Skip to content
This repository was archived by the owner on Nov 6, 2020. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 10 additions & 7 deletions ethcore/light/src/net/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,14 +75,17 @@ const RECALCULATE_COSTS_INTERVAL: Duration = Duration::from_secs(60 * 60);
// minimum interval between updates.
const UPDATE_INTERVAL: Duration = Duration::from_millis(5000);

/// Packet count for PIP.
const PACKET_COUNT_V1: u8 = 9;

/// Supported protocol versions.
pub const PROTOCOL_VERSIONS: &'static [u8] = &[1];
pub const PROTOCOL_VERSIONS: &'static [(u8, u8)] = &[
(1, PACKET_COUNT_V1),
];

/// Max protocol version.
pub const MAX_PROTOCOL_VERSION: u8 = 1;

/// Packet count for PIP.
pub const PACKET_COUNT: u8 = 9;

// packet ID definitions.
mod packet {
Expand Down Expand Up @@ -111,9 +114,9 @@ mod packet {
mod timeout {
use std::time::Duration;

pub const HANDSHAKE: Duration = Duration::from_millis(2500);
pub const ACKNOWLEDGE_UPDATE: Duration = Duration::from_millis(5000);
pub const BASE: u64 = 1500; // base timeout for packet.
pub const HANDSHAKE: Duration = Duration::from_millis(4_000);
pub const ACKNOWLEDGE_UPDATE: Duration = Duration::from_millis(5_000);
pub const BASE: u64 = 2_500; // base timeout for packet.

// timeouts per request within packet.
pub const HEADERS: u64 = 250; // per header?
Expand Down Expand Up @@ -688,7 +691,7 @@ impl LightProtocol {
Err(e) => { punish(*peer, io, e); return }
};

if PROTOCOL_VERSIONS.iter().find(|x| **x == proto_version).is_none() {
if PROTOCOL_VERSIONS.iter().find(|x| x.0 == proto_version).is_none() {
punish(*peer, io, Error::UnsupportedProtocolVersion(proto_version));
return;
}
Expand Down
17 changes: 7 additions & 10 deletions ethcore/sync/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use chain::{ChainSync, SyncStatus as EthSyncStatus};
use std::net::{SocketAddr, AddrParseError};
use std::str::FromStr;
use parking_lot::RwLock;
use chain::{ETH_PACKET_COUNT, SNAPSHOT_SYNC_PACKET_COUNT, ETH_PROTOCOL_VERSION_63, ETH_PROTOCOL_VERSION_62,
use chain::{ETH_PROTOCOL_VERSION_63, ETH_PROTOCOL_VERSION_62,
PAR_PROTOCOL_VERSION_1, PAR_PROTOCOL_VERSION_2, PAR_PROTOCOL_VERSION_3};
use light::client::AsLightClient;
use light::Provider;
Expand Down Expand Up @@ -202,18 +202,15 @@ pub struct AttachedProtocol {
pub handler: Arc<NetworkProtocolHandler + Send + Sync>,
/// 3-character ID for the protocol.
pub protocol_id: ProtocolId,
/// Packet count.
pub packet_count: u8,
/// Supported versions.
pub versions: &'static [u8],
/// Supported versions and their packet counts.
pub versions: &'static [(u8, u8)],
}

impl AttachedProtocol {
fn register(&self, network: &NetworkService) {
let res = network.register_protocol(
self.handler.clone(),
self.protocol_id,
self.packet_count,
self.versions
);

Expand Down Expand Up @@ -459,15 +456,15 @@ impl ChainNotify for EthSync {
Err(err) => warn!("Error starting network: {}", err),
_ => {},
}
self.network.register_protocol(self.eth_handler.clone(), self.subprotocol_name, ETH_PACKET_COUNT, &[ETH_PROTOCOL_VERSION_62, ETH_PROTOCOL_VERSION_63])
self.network.register_protocol(self.eth_handler.clone(), self.subprotocol_name, &[ETH_PROTOCOL_VERSION_62, ETH_PROTOCOL_VERSION_63])
.unwrap_or_else(|e| warn!("Error registering ethereum protocol: {:?}", e));
// register the warp sync subprotocol
self.network.register_protocol(self.eth_handler.clone(), WARP_SYNC_PROTOCOL_ID, SNAPSHOT_SYNC_PACKET_COUNT, &[PAR_PROTOCOL_VERSION_1, PAR_PROTOCOL_VERSION_2, PAR_PROTOCOL_VERSION_3])
self.network.register_protocol(self.eth_handler.clone(), WARP_SYNC_PROTOCOL_ID, &[PAR_PROTOCOL_VERSION_1, PAR_PROTOCOL_VERSION_2, PAR_PROTOCOL_VERSION_3])
.unwrap_or_else(|e| warn!("Error registering snapshot sync protocol: {:?}", e));

// register the light protocol.
if let Some(light_proto) = self.light_proto.as_ref().map(|x| x.clone()) {
self.network.register_protocol(light_proto, self.light_subprotocol_name, ::light::net::PACKET_COUNT, ::light::net::PROTOCOL_VERSIONS)
self.network.register_protocol(light_proto, self.light_subprotocol_name, ::light::net::PROTOCOL_VERSIONS)
.unwrap_or_else(|e| warn!("Error registering light client protocol: {:?}", e));
}

Expand Down Expand Up @@ -827,7 +824,7 @@ impl ManageNetwork for LightSync {

let light_proto = self.proto.clone();

self.network.register_protocol(light_proto, self.subprotocol_name, ::light::net::PACKET_COUNT, ::light::net::PROTOCOL_VERSIONS)
self.network.register_protocol(light_proto, self.subprotocol_name, ::light::net::PROTOCOL_VERSIONS)
.unwrap_or_else(|e| warn!("Error registering light client protocol: {:?}", e));

for proto in &self.attached_protos { proto.register(&self.network) }
Expand Down
8 changes: 5 additions & 3 deletions ethcore/sync/src/chain/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ use super::{
MAX_NEW_BLOCK_AGE,
MAX_NEW_HASHES,
PAR_PROTOCOL_VERSION_1,
PAR_PROTOCOL_VERSION_2,
PAR_PROTOCOL_VERSION_3,
BLOCK_BODIES_PACKET,
BLOCK_HEADERS_PACKET,
Expand Down Expand Up @@ -641,8 +640,11 @@ impl SyncHandler {
trace!(target: "sync", "Peer {} network id mismatch (ours: {}, theirs: {})", peer_id, sync.network_id, peer.network_id);
return Ok(());
}
if (warp_protocol && peer.protocol_version != PAR_PROTOCOL_VERSION_1 && peer.protocol_version != PAR_PROTOCOL_VERSION_2 && peer.protocol_version != PAR_PROTOCOL_VERSION_3)
|| (!warp_protocol && peer.protocol_version != ETH_PROTOCOL_VERSION_63 && peer.protocol_version != ETH_PROTOCOL_VERSION_62) {

if false
|| (warp_protocol && (peer.protocol_version < PAR_PROTOCOL_VERSION_1.0 || peer.protocol_version > PAR_PROTOCOL_VERSION_3.0))
|| (!warp_protocol && (peer.protocol_version < ETH_PROTOCOL_VERSION_62.0 || peer.protocol_version > ETH_PROTOCOL_VERSION_63.0))
{
io.disable_peer(peer_id);
trace!(target: "sync", "Peer {} unsupported eth protocol ({})", peer_id, peer.protocol_version);
return Ok(());
Expand Down
24 changes: 10 additions & 14 deletions ethcore/sync/src/chain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,15 +127,15 @@ known_heap_size!(0, PeerInfo);
pub type PacketDecodeError = DecoderError;

/// 63 version of Ethereum protocol.
pub const ETH_PROTOCOL_VERSION_63: u8 = 63;
pub const ETH_PROTOCOL_VERSION_63: (u8, u8) = (63, 0x11);
/// 62 version of Ethereum protocol.
pub const ETH_PROTOCOL_VERSION_62: u8 = 62;
/// 1 version of Parity protocol.
pub const PAR_PROTOCOL_VERSION_1: u8 = 1;
pub const ETH_PROTOCOL_VERSION_62: (u8, u8) = (62, 0x11);
/// 1 version of Parity protocol and the packet count.
pub const PAR_PROTOCOL_VERSION_1: (u8, u8) = (1, 0x15);
/// 2 version of Parity protocol (consensus messages added).
pub const PAR_PROTOCOL_VERSION_2: u8 = 2;
pub const PAR_PROTOCOL_VERSION_2: (u8, u8) = (2, 0x16);
/// 3 version of Parity protocol (private transactions messages added).
pub const PAR_PROTOCOL_VERSION_3: u8 = 3;
pub const PAR_PROTOCOL_VERSION_3: (u8, u8) = (3, 0x18);

pub const MAX_BODIES_TO_SEND: usize = 256;
pub const MAX_HEADERS_TO_SEND: usize = 512;
Expand Down Expand Up @@ -169,8 +169,6 @@ pub const NODE_DATA_PACKET: u8 = 0x0e;
pub const GET_RECEIPTS_PACKET: u8 = 0x0f;
pub const RECEIPTS_PACKET: u8 = 0x10;

pub const ETH_PACKET_COUNT: u8 = 0x11;

pub const GET_SNAPSHOT_MANIFEST_PACKET: u8 = 0x11;
pub const SNAPSHOT_MANIFEST_PACKET: u8 = 0x12;
pub const GET_SNAPSHOT_DATA_PACKET: u8 = 0x13;
Expand All @@ -179,8 +177,6 @@ pub const CONSENSUS_DATA_PACKET: u8 = 0x15;
const PRIVATE_TRANSACTION_PACKET: u8 = 0x16;
const SIGNED_PRIVATE_TRANSACTION_PACKET: u8 = 0x17;

pub const SNAPSHOT_SYNC_PACKET_COUNT: u8 = 0x18;

const MAX_SNAPSHOT_CHUNKS_DOWNLOAD_AHEAD: usize = 3;

const WAIT_PEERS_TIMEOUT: Duration = Duration::from_secs(5);
Expand Down Expand Up @@ -453,7 +449,7 @@ impl ChainSync {
let last_imported_number = self.new_blocks.last_imported_block_number();
SyncStatus {
state: self.state.clone(),
protocol_version: ETH_PROTOCOL_VERSION_63,
protocol_version: ETH_PROTOCOL_VERSION_63.0,
network_id: self.network_id,
start_block_number: self.starting_block,
last_imported_block_number: Some(last_imported_number),
Expand Down Expand Up @@ -855,7 +851,7 @@ impl ChainSync {
fn send_status(&mut self, io: &mut SyncIo, peer: PeerId) -> Result<(), network::Error> {
let warp_protocol_version = io.protocol_version(&WARP_SYNC_PROTOCOL_ID, peer);
let warp_protocol = warp_protocol_version != 0;
let protocol = if warp_protocol { warp_protocol_version } else { ETH_PROTOCOL_VERSION_63 };
let protocol = if warp_protocol { warp_protocol_version } else { ETH_PROTOCOL_VERSION_63.0 };
trace!(target: "sync", "Sending status to {}, protocol version {}", peer, protocol);
let mut packet = RlpStream::new_list(if warp_protocol { 7 } else { 5 });
let chain = io.chain().chain_info();
Expand Down Expand Up @@ -1019,11 +1015,11 @@ impl ChainSync {
}

fn get_consensus_peers(&self) -> Vec<PeerId> {
self.peers.iter().filter_map(|(id, p)| if p.protocol_version >= PAR_PROTOCOL_VERSION_2 { Some(*id) } else { None }).collect()
self.peers.iter().filter_map(|(id, p)| if p.protocol_version >= PAR_PROTOCOL_VERSION_2.0 { Some(*id) } else { None }).collect()
}

fn get_private_transaction_peers(&self) -> Vec<PeerId> {
self.peers.iter().filter_map(|(id, p)| if p.protocol_version >= PAR_PROTOCOL_VERSION_3 { Some(*id) } else { None }).collect()
self.peers.iter().filter_map(|(id, p)| if p.protocol_version >= PAR_PROTOCOL_VERSION_3.0 { Some(*id) } else { None }).collect()
}

/// Maintain other peers. Send out any new blocks and transactions
Expand Down
5 changes: 3 additions & 2 deletions ethcore/sync/src/chain/requester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use super::{
BlockSet,
ChainSync,
PeerAsking,
ETH_PACKET_COUNT,
ETH_PROTOCOL_VERSION_63,
GET_BLOCK_BODIES_PACKET,
GET_BLOCK_HEADERS_PACKET,
GET_RECEIPTS_PACKET,
Expand Down Expand Up @@ -140,7 +140,8 @@ impl SyncRequester {
}
peer.asking = asking;
peer.ask_time = Instant::now();
let result = if packet_id >= ETH_PACKET_COUNT {
// TODO [ToDr] This seems quite fragile. Be careful when protocol is updated.
let result = if packet_id >= ETH_PROTOCOL_VERSION_63.1 {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps a test here to help our future selves?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Logged a separate issue.

io.send_protocol(WARP_SYNC_PROTOCOL_ID, peer_id, packet_id, packet)
} else {
io.send(peer_id, packet_id, packet)
Expand Down
2 changes: 1 addition & 1 deletion ethcore/sync/src/light_sync/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

//! Helpers for decoding and verifying responses for headers.

use ethcore::{self, encoded, header::Header};
use ethcore::{encoded, header::Header};
use ethereum_types::H256;
use light::request::{HashOrNumber, CompleteHeadersRequest as HeadersRequest};
use rlp::DecoderError;
Expand Down
4 changes: 2 additions & 2 deletions ethcore/sync/src/tests/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,11 +134,11 @@ impl<'p, C> SyncIo for TestIo<'p, C> where C: FlushingBlockChainClient, C: 'p {
}

fn eth_protocol_version(&self, _peer: PeerId) -> u8 {
ETH_PROTOCOL_VERSION_63
ETH_PROTOCOL_VERSION_63.0
}

fn protocol_version(&self, protocol: &ProtocolId, peer_id: PeerId) -> u8 {
if protocol == &WARP_SYNC_PROTOCOL_ID { PAR_PROTOCOL_VERSION_3 } else { self.eth_protocol_version(peer_id) }
if protocol == &WARP_SYNC_PROTOCOL_ID { PAR_PROTOCOL_VERSION_3.0 } else { self.eth_protocol_version(peer_id) }
}

fn chain_overlay(&self) -> &RwLock<HashMap<BlockNumber, Bytes>> {
Expand Down
2 changes: 0 additions & 2 deletions parity/whisper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,15 +89,13 @@ pub fn setup(target_pool_size: usize, protos: &mut Vec<AttachedProtocol>)

protos.push(AttachedProtocol {
handler: net.clone() as Arc<_>,
packet_count: whisper_net::PACKET_COUNT,
versions: whisper_net::SUPPORTED_VERSIONS,
protocol_id: whisper_net::PROTOCOL_ID,
});

// parity-only extensions to whisper.
protos.push(AttachedProtocol {
handler: Arc::new(whisper_net::ParityExtensions),
packet_count: whisper_net::PACKET_COUNT,
versions: whisper_net::SUPPORTED_VERSIONS,
protocol_id: whisper_net::PARITY_PROTOCOL_ID,
});
Expand Down
13 changes: 9 additions & 4 deletions util/network-devp2p/src/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,9 @@ const NODE_TABLE_TIMEOUT: Duration = Duration::from_secs(300);
#[derive(Debug, PartialEq, Eq)]
/// Protocol info
pub struct CapabilityInfo {
/// Protocol ID
pub protocol: ProtocolId,
/// Protocol version
pub version: u8,
/// Total number of packet IDs this protocol support.
pub packet_count: u8,
Expand Down Expand Up @@ -687,7 +689,7 @@ impl Host {
Err(e) => {
let s = session.lock();
trace!(target: "network", "Session read error: {}:{:?} ({:?}) {:?}", token, s.id(), s.remote_addr(), e);
if let ErrorKind::Disconnect(DisconnectReason::UselessPeer) = *e.kind() {
if let ErrorKind::Disconnect(DisconnectReason::IncompatibleProtocol) = *e.kind() {
if let Some(id) = s.id() {
if !self.reserved_nodes.read().contains(id) {
let mut nodes = self.nodes.write();
Expand Down Expand Up @@ -990,7 +992,6 @@ impl IoHandler<NetworkIoMessage> for Host {
ref handler,
ref protocol,
ref versions,
ref packet_count,
} => {
let h = handler.clone();
let reserved = self.reserved_nodes.read();
Expand All @@ -1000,8 +1001,12 @@ impl IoHandler<NetworkIoMessage> for Host {
);
self.handlers.write().insert(*protocol, h);
let mut info = self.info.write();
for v in versions {
info.capabilities.push(CapabilityInfo { protocol: *protocol, version: *v, packet_count: *packet_count });
for &(version, packet_count) in versions {
info.capabilities.push(CapabilityInfo {
protocol: *protocol,
version,
packet_count,
});
}
},
NetworkIoMessage::AddTimer {
Expand Down
2 changes: 1 addition & 1 deletion util/network-devp2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
//! fn main () {
//! let mut service = NetworkService::new(NetworkConfiguration::new_local(), None).expect("Error creating network service");
//! service.start().expect("Error starting service");
//! service.register_protocol(Arc::new(MyHandler), *b"myp", 1, &[1u8]);
//! service.register_protocol(Arc::new(MyHandler), *b"myp", &[(1u8, 1u8)]);
//!
//! // Wait for quit condition
//! // ...
Expand Down
13 changes: 9 additions & 4 deletions util/network-devp2p/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,17 @@ impl NetworkService {
}

/// Regiter a new protocol handler with the event loop.
pub fn register_protocol(&self, handler: Arc<NetworkProtocolHandler + Send + Sync>, protocol: ProtocolId, packet_count: u8, versions: &[u8]) -> Result<(), Error> {
pub fn register_protocol(
&self,
handler: Arc<NetworkProtocolHandler + Send + Sync>,
protocol: ProtocolId,
// version id + packet count
versions: &[(u8, u8)]
) -> Result<(), Error> {
self.io_service.send_message(NetworkIoMessage::AddHandler {
handler: handler,
protocol: protocol,
handler,
protocol,
versions: versions.to_vec(),
packet_count: packet_count,
})?;
Ok(())
}
Expand Down
4 changes: 2 additions & 2 deletions util/network-devp2p/tests/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ impl TestProtocol {
/// Creates and register protocol with the network service
pub fn register(service: &mut NetworkService, drop_session: bool) -> Arc<TestProtocol> {
let handler = Arc::new(TestProtocol::new(drop_session));
service.register_protocol(handler.clone(), *b"tst", 1, &[42u8, 43u8]).expect("Error registering test protocol handler");
service.register_protocol(handler.clone(), *b"tst", &[(42u8, 1u8), (43u8, 1u8)]).expect("Error registering test protocol handler");
handler
}

Expand Down Expand Up @@ -104,7 +104,7 @@ impl NetworkProtocolHandler for TestProtocol {
fn net_service() {
let service = NetworkService::new(NetworkConfiguration::new_local(), None).expect("Error creating network service");
service.start().unwrap();
service.register_protocol(Arc::new(TestProtocol::new(false)), *b"myp", 1, &[1u8]).unwrap();
service.register_protocol(Arc::new(TestProtocol::new(false)), *b"myp", &[(1u8, 1u8)]).unwrap();
}

#[test]
Expand Down
6 changes: 2 additions & 4 deletions util/network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,8 @@ pub enum NetworkIoMessage {
handler: Arc<NetworkProtocolHandler + Sync>,
/// Protocol Id.
protocol: ProtocolId,
/// Supported protocol versions.
versions: Vec<u8>,
/// Number of packet IDs reserved by the protocol.
packet_count: u8,
/// Supported protocol versions and number of packet IDs reserved by the protocol (packet count).
versions: Vec<(u8, u8)>,
},
/// Register a new protocol timer
AddTimer {
Expand Down
4 changes: 2 additions & 2 deletions whisper/cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,10 +218,10 @@ fn execute<S, I>(command: I) -> Result<(), Error> where I: IntoIterator<Item=S>,
network.start()?;

// Attach whisper protocol to the network service
network.register_protocol(whisper_network_handler.clone(), whisper::net::PROTOCOL_ID, whisper::net::PACKET_COUNT,
network.register_protocol(whisper_network_handler.clone(), whisper::net::PROTOCOL_ID,
whisper::net::SUPPORTED_VERSIONS)?;
network.register_protocol(Arc::new(whisper::net::ParityExtensions), whisper::net::PARITY_PROTOCOL_ID,
whisper::net::PACKET_COUNT, whisper::net::SUPPORTED_VERSIONS)?;
whisper::net::SUPPORTED_VERSIONS)?;

// Request handler
let mut io = MetaIoHandler::default();
Expand Down
10 changes: 6 additions & 4 deletions whisper/src/net/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,17 @@ const RALLY_TIMEOUT: Duration = Duration::from_millis(2500);
/// Current protocol version.
pub const PROTOCOL_VERSION: usize = 6;

/// Number of packets. A bunch are reserved.
const PACKET_COUNT: u8 = 128;

/// Supported protocol versions.
pub const SUPPORTED_VERSIONS: &'static [u8] = &[PROTOCOL_VERSION as u8];
pub const SUPPORTED_VERSIONS: &'static [(u8, u8)] = &[
(PROTOCOL_VERSION as u8, PACKET_COUNT)
];

// maximum tolerated delay between messages packets.
const MAX_TOLERATED_DELAY: Duration = Duration::from_millis(5000);

/// Number of packets. A bunch are reserved.
pub const PACKET_COUNT: u8 = 128;

/// Whisper protocol ID
pub const PROTOCOL_ID: ::network::ProtocolId = *b"shh";

Expand Down