Skip to content
This repository was archived by the owner on Nov 6, 2020. It is now read-only.

Commit 08abf67

Browse files
tomusdrw5chdn
authored andcommitted
Fix packet count when talking with PAR2 peers (#8555)
* Support diferent packet counts in different protocol versions. * Fix light timeouts and eclipse protection. * Fix devp2p tests. * Fix whisper-cli compilation. * Fix compilation. * Fix ethcore-sync tests. * Revert "Fix light timeouts and eclipse protection." This reverts commit 06285ea. * Increase timeouts.
1 parent 981554c commit 08abf67

File tree

14 files changed

+68
-61
lines changed

14 files changed

+68
-61
lines changed

ethcore/light/src/net/mod.rs

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -75,14 +75,17 @@ const RECALCULATE_COSTS_INTERVAL: Duration = Duration::from_secs(60 * 60);
7575
// minimum interval between updates.
7676
const UPDATE_INTERVAL: Duration = Duration::from_millis(5000);
7777

78+
/// Packet count for PIP.
79+
const PACKET_COUNT_V1: u8 = 9;
80+
7881
/// Supported protocol versions.
79-
pub const PROTOCOL_VERSIONS: &'static [u8] = &[1];
82+
pub const PROTOCOL_VERSIONS: &'static [(u8, u8)] = &[
83+
(1, PACKET_COUNT_V1),
84+
];
8085

8186
/// Max protocol version.
8287
pub const MAX_PROTOCOL_VERSION: u8 = 1;
8388

84-
/// Packet count for PIP.
85-
pub const PACKET_COUNT: u8 = 9;
8689

8790
// packet ID definitions.
8891
mod packet {
@@ -111,9 +114,9 @@ mod packet {
111114
mod timeout {
112115
use std::time::Duration;
113116

114-
pub const HANDSHAKE: Duration = Duration::from_millis(2500);
115-
pub const ACKNOWLEDGE_UPDATE: Duration = Duration::from_millis(5000);
116-
pub const BASE: u64 = 1500; // base timeout for packet.
117+
pub const HANDSHAKE: Duration = Duration::from_millis(4_000);
118+
pub const ACKNOWLEDGE_UPDATE: Duration = Duration::from_millis(5_000);
119+
pub const BASE: u64 = 2_500; // base timeout for packet.
117120

118121
// timeouts per request within packet.
119122
pub const HEADERS: u64 = 250; // per header?
@@ -688,7 +691,7 @@ impl LightProtocol {
688691
Err(e) => { punish(*peer, io, e); return }
689692
};
690693

691-
if PROTOCOL_VERSIONS.iter().find(|x| **x == proto_version).is_none() {
694+
if PROTOCOL_VERSIONS.iter().find(|x| x.0 == proto_version).is_none() {
692695
punish(*peer, io, Error::UnsupportedProtocolVersion(proto_version));
693696
return;
694697
}

ethcore/sync/src/api.rs

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ use chain::{ChainSync, SyncStatus as EthSyncStatus};
3333
use std::net::{SocketAddr, AddrParseError};
3434
use std::str::FromStr;
3535
use parking_lot::RwLock;
36-
use chain::{ETH_PACKET_COUNT, SNAPSHOT_SYNC_PACKET_COUNT, ETH_PROTOCOL_VERSION_63, ETH_PROTOCOL_VERSION_62,
36+
use chain::{ETH_PROTOCOL_VERSION_63, ETH_PROTOCOL_VERSION_62,
3737
PAR_PROTOCOL_VERSION_1, PAR_PROTOCOL_VERSION_2, PAR_PROTOCOL_VERSION_3};
3838
use light::client::AsLightClient;
3939
use light::Provider;
@@ -202,18 +202,15 @@ pub struct AttachedProtocol {
202202
pub handler: Arc<NetworkProtocolHandler + Send + Sync>,
203203
/// 3-character ID for the protocol.
204204
pub protocol_id: ProtocolId,
205-
/// Packet count.
206-
pub packet_count: u8,
207-
/// Supported versions.
208-
pub versions: &'static [u8],
205+
/// Supported versions and their packet counts.
206+
pub versions: &'static [(u8, u8)],
209207
}
210208

211209
impl AttachedProtocol {
212210
fn register(&self, network: &NetworkService) {
213211
let res = network.register_protocol(
214212
self.handler.clone(),
215213
self.protocol_id,
216-
self.packet_count,
217214
self.versions
218215
);
219216

@@ -459,15 +456,15 @@ impl ChainNotify for EthSync {
459456
Err(err) => warn!("Error starting network: {}", err),
460457
_ => {},
461458
}
462-
self.network.register_protocol(self.eth_handler.clone(), self.subprotocol_name, ETH_PACKET_COUNT, &[ETH_PROTOCOL_VERSION_62, ETH_PROTOCOL_VERSION_63])
459+
self.network.register_protocol(self.eth_handler.clone(), self.subprotocol_name, &[ETH_PROTOCOL_VERSION_62, ETH_PROTOCOL_VERSION_63])
463460
.unwrap_or_else(|e| warn!("Error registering ethereum protocol: {:?}", e));
464461
// register the warp sync subprotocol
465-
self.network.register_protocol(self.eth_handler.clone(), WARP_SYNC_PROTOCOL_ID, SNAPSHOT_SYNC_PACKET_COUNT, &[PAR_PROTOCOL_VERSION_1, PAR_PROTOCOL_VERSION_2, PAR_PROTOCOL_VERSION_3])
462+
self.network.register_protocol(self.eth_handler.clone(), WARP_SYNC_PROTOCOL_ID, &[PAR_PROTOCOL_VERSION_1, PAR_PROTOCOL_VERSION_2, PAR_PROTOCOL_VERSION_3])
466463
.unwrap_or_else(|e| warn!("Error registering snapshot sync protocol: {:?}", e));
467464

468465
// register the light protocol.
469466
if let Some(light_proto) = self.light_proto.as_ref().map(|x| x.clone()) {
470-
self.network.register_protocol(light_proto, self.light_subprotocol_name, ::light::net::PACKET_COUNT, ::light::net::PROTOCOL_VERSIONS)
467+
self.network.register_protocol(light_proto, self.light_subprotocol_name, ::light::net::PROTOCOL_VERSIONS)
471468
.unwrap_or_else(|e| warn!("Error registering light client protocol: {:?}", e));
472469
}
473470

@@ -827,7 +824,7 @@ impl ManageNetwork for LightSync {
827824

828825
let light_proto = self.proto.clone();
829826

830-
self.network.register_protocol(light_proto, self.subprotocol_name, ::light::net::PACKET_COUNT, ::light::net::PROTOCOL_VERSIONS)
827+
self.network.register_protocol(light_proto, self.subprotocol_name, ::light::net::PROTOCOL_VERSIONS)
831828
.unwrap_or_else(|e| warn!("Error registering light client protocol: {:?}", e));
832829

833830
for proto in &self.attached_protos { proto.register(&self.network) }

ethcore/sync/src/chain/handler.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@ use super::{
4545
MAX_NEW_BLOCK_AGE,
4646
MAX_NEW_HASHES,
4747
PAR_PROTOCOL_VERSION_1,
48-
PAR_PROTOCOL_VERSION_2,
4948
PAR_PROTOCOL_VERSION_3,
5049
BLOCK_BODIES_PACKET,
5150
BLOCK_HEADERS_PACKET,
@@ -641,8 +640,11 @@ impl SyncHandler {
641640
trace!(target: "sync", "Peer {} network id mismatch (ours: {}, theirs: {})", peer_id, sync.network_id, peer.network_id);
642641
return Ok(());
643642
}
644-
if (warp_protocol && peer.protocol_version != PAR_PROTOCOL_VERSION_1 && peer.protocol_version != PAR_PROTOCOL_VERSION_2 && peer.protocol_version != PAR_PROTOCOL_VERSION_3)
645-
|| (!warp_protocol && peer.protocol_version != ETH_PROTOCOL_VERSION_63 && peer.protocol_version != ETH_PROTOCOL_VERSION_62) {
643+
644+
if false
645+
|| (warp_protocol && (peer.protocol_version < PAR_PROTOCOL_VERSION_1.0 || peer.protocol_version > PAR_PROTOCOL_VERSION_3.0))
646+
|| (!warp_protocol && (peer.protocol_version < ETH_PROTOCOL_VERSION_62.0 || peer.protocol_version > ETH_PROTOCOL_VERSION_63.0))
647+
{
646648
io.disable_peer(peer_id);
647649
trace!(target: "sync", "Peer {} unsupported eth protocol ({})", peer_id, peer.protocol_version);
648650
return Ok(());

ethcore/sync/src/chain/mod.rs

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -127,15 +127,15 @@ known_heap_size!(0, PeerInfo);
127127
pub type PacketDecodeError = DecoderError;
128128

129129
/// 63 version of Ethereum protocol.
130-
pub const ETH_PROTOCOL_VERSION_63: u8 = 63;
130+
pub const ETH_PROTOCOL_VERSION_63: (u8, u8) = (63, 0x11);
131131
/// 62 version of Ethereum protocol.
132-
pub const ETH_PROTOCOL_VERSION_62: u8 = 62;
133-
/// 1 version of Parity protocol.
134-
pub const PAR_PROTOCOL_VERSION_1: u8 = 1;
132+
pub const ETH_PROTOCOL_VERSION_62: (u8, u8) = (62, 0x11);
133+
/// 1 version of Parity protocol and the packet count.
134+
pub const PAR_PROTOCOL_VERSION_1: (u8, u8) = (1, 0x15);
135135
/// 2 version of Parity protocol (consensus messages added).
136-
pub const PAR_PROTOCOL_VERSION_2: u8 = 2;
136+
pub const PAR_PROTOCOL_VERSION_2: (u8, u8) = (2, 0x16);
137137
/// 3 version of Parity protocol (private transactions messages added).
138-
pub const PAR_PROTOCOL_VERSION_3: u8 = 3;
138+
pub const PAR_PROTOCOL_VERSION_3: (u8, u8) = (3, 0x18);
139139

140140
pub const MAX_BODIES_TO_SEND: usize = 256;
141141
pub const MAX_HEADERS_TO_SEND: usize = 512;
@@ -169,8 +169,6 @@ pub const NODE_DATA_PACKET: u8 = 0x0e;
169169
pub const GET_RECEIPTS_PACKET: u8 = 0x0f;
170170
pub const RECEIPTS_PACKET: u8 = 0x10;
171171

172-
pub const ETH_PACKET_COUNT: u8 = 0x11;
173-
174172
pub const GET_SNAPSHOT_MANIFEST_PACKET: u8 = 0x11;
175173
pub const SNAPSHOT_MANIFEST_PACKET: u8 = 0x12;
176174
pub const GET_SNAPSHOT_DATA_PACKET: u8 = 0x13;
@@ -179,8 +177,6 @@ pub const CONSENSUS_DATA_PACKET: u8 = 0x15;
179177
const PRIVATE_TRANSACTION_PACKET: u8 = 0x16;
180178
const SIGNED_PRIVATE_TRANSACTION_PACKET: u8 = 0x17;
181179

182-
pub const SNAPSHOT_SYNC_PACKET_COUNT: u8 = 0x18;
183-
184180
const MAX_SNAPSHOT_CHUNKS_DOWNLOAD_AHEAD: usize = 3;
185181

186182
const WAIT_PEERS_TIMEOUT: Duration = Duration::from_secs(5);
@@ -453,7 +449,7 @@ impl ChainSync {
453449
let last_imported_number = self.new_blocks.last_imported_block_number();
454450
SyncStatus {
455451
state: self.state.clone(),
456-
protocol_version: ETH_PROTOCOL_VERSION_63,
452+
protocol_version: ETH_PROTOCOL_VERSION_63.0,
457453
network_id: self.network_id,
458454
start_block_number: self.starting_block,
459455
last_imported_block_number: Some(last_imported_number),
@@ -855,7 +851,7 @@ impl ChainSync {
855851
fn send_status(&mut self, io: &mut SyncIo, peer: PeerId) -> Result<(), network::Error> {
856852
let warp_protocol_version = io.protocol_version(&WARP_SYNC_PROTOCOL_ID, peer);
857853
let warp_protocol = warp_protocol_version != 0;
858-
let protocol = if warp_protocol { warp_protocol_version } else { ETH_PROTOCOL_VERSION_63 };
854+
let protocol = if warp_protocol { warp_protocol_version } else { ETH_PROTOCOL_VERSION_63.0 };
859855
trace!(target: "sync", "Sending status to {}, protocol version {}", peer, protocol);
860856
let mut packet = RlpStream::new_list(if warp_protocol { 7 } else { 5 });
861857
let chain = io.chain().chain_info();
@@ -1019,11 +1015,11 @@ impl ChainSync {
10191015
}
10201016

10211017
fn get_consensus_peers(&self) -> Vec<PeerId> {
1022-
self.peers.iter().filter_map(|(id, p)| if p.protocol_version >= PAR_PROTOCOL_VERSION_2 { Some(*id) } else { None }).collect()
1018+
self.peers.iter().filter_map(|(id, p)| if p.protocol_version >= PAR_PROTOCOL_VERSION_2.0 { Some(*id) } else { None }).collect()
10231019
}
10241020

10251021
fn get_private_transaction_peers(&self) -> Vec<PeerId> {
1026-
self.peers.iter().filter_map(|(id, p)| if p.protocol_version >= PAR_PROTOCOL_VERSION_3 { Some(*id) } else { None }).collect()
1022+
self.peers.iter().filter_map(|(id, p)| if p.protocol_version >= PAR_PROTOCOL_VERSION_3.0 { Some(*id) } else { None }).collect()
10271023
}
10281024

10291025
/// Maintain other peers. Send out any new blocks and transactions

ethcore/sync/src/chain/requester.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use super::{
2828
BlockSet,
2929
ChainSync,
3030
PeerAsking,
31-
ETH_PACKET_COUNT,
31+
ETH_PROTOCOL_VERSION_63,
3232
GET_BLOCK_BODIES_PACKET,
3333
GET_BLOCK_HEADERS_PACKET,
3434
GET_RECEIPTS_PACKET,
@@ -140,7 +140,8 @@ impl SyncRequester {
140140
}
141141
peer.asking = asking;
142142
peer.ask_time = Instant::now();
143-
let result = if packet_id >= ETH_PACKET_COUNT {
143+
// TODO [ToDr] This seems quite fragile. Be careful when protocol is updated.
144+
let result = if packet_id >= ETH_PROTOCOL_VERSION_63.1 {
144145
io.send_protocol(WARP_SYNC_PROTOCOL_ID, peer_id, packet_id, packet)
145146
} else {
146147
io.send(peer_id, packet_id, packet)

ethcore/sync/src/tests/helpers.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -134,11 +134,11 @@ impl<'p, C> SyncIo for TestIo<'p, C> where C: FlushingBlockChainClient, C: 'p {
134134
}
135135

136136
fn eth_protocol_version(&self, _peer: PeerId) -> u8 {
137-
ETH_PROTOCOL_VERSION_63
137+
ETH_PROTOCOL_VERSION_63.0
138138
}
139139

140140
fn protocol_version(&self, protocol: &ProtocolId, peer_id: PeerId) -> u8 {
141-
if protocol == &WARP_SYNC_PROTOCOL_ID { PAR_PROTOCOL_VERSION_3 } else { self.eth_protocol_version(peer_id) }
141+
if protocol == &WARP_SYNC_PROTOCOL_ID { PAR_PROTOCOL_VERSION_3.0 } else { self.eth_protocol_version(peer_id) }
142142
}
143143

144144
fn chain_overlay(&self) -> &RwLock<HashMap<BlockNumber, Bytes>> {

parity/whisper.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,15 +89,13 @@ pub fn setup(target_pool_size: usize, protos: &mut Vec<AttachedProtocol>)
8989

9090
protos.push(AttachedProtocol {
9191
handler: net.clone() as Arc<_>,
92-
packet_count: whisper_net::PACKET_COUNT,
9392
versions: whisper_net::SUPPORTED_VERSIONS,
9493
protocol_id: whisper_net::PROTOCOL_ID,
9594
});
9695

9796
// parity-only extensions to whisper.
9897
protos.push(AttachedProtocol {
9998
handler: Arc::new(whisper_net::ParityExtensions),
100-
packet_count: whisper_net::PACKET_COUNT,
10199
versions: whisper_net::SUPPORTED_VERSIONS,
102100
protocol_id: whisper_net::PARITY_PROTOCOL_ID,
103101
});

util/network-devp2p/src/host.rs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,9 @@ const NODE_TABLE_TIMEOUT: Duration = Duration::from_secs(300);
7979
#[derive(Debug, PartialEq, Eq)]
8080
/// Protocol info
8181
pub struct CapabilityInfo {
82+
/// Protocol ID
8283
pub protocol: ProtocolId,
84+
/// Protocol version
8385
pub version: u8,
8486
/// Total number of packet IDs this protocol support.
8587
pub packet_count: u8,
@@ -687,7 +689,7 @@ impl Host {
687689
Err(e) => {
688690
let s = session.lock();
689691
trace!(target: "network", "Session read error: {}:{:?} ({:?}) {:?}", token, s.id(), s.remote_addr(), e);
690-
if let ErrorKind::Disconnect(DisconnectReason::UselessPeer) = *e.kind() {
692+
if let ErrorKind::Disconnect(DisconnectReason::IncompatibleProtocol) = *e.kind() {
691693
if let Some(id) = s.id() {
692694
if !self.reserved_nodes.read().contains(id) {
693695
let mut nodes = self.nodes.write();
@@ -990,7 +992,6 @@ impl IoHandler<NetworkIoMessage> for Host {
990992
ref handler,
991993
ref protocol,
992994
ref versions,
993-
ref packet_count,
994995
} => {
995996
let h = handler.clone();
996997
let reserved = self.reserved_nodes.read();
@@ -1000,8 +1001,12 @@ impl IoHandler<NetworkIoMessage> for Host {
10001001
);
10011002
self.handlers.write().insert(*protocol, h);
10021003
let mut info = self.info.write();
1003-
for v in versions {
1004-
info.capabilities.push(CapabilityInfo { protocol: *protocol, version: *v, packet_count: *packet_count });
1004+
for &(version, packet_count) in versions {
1005+
info.capabilities.push(CapabilityInfo {
1006+
protocol: *protocol,
1007+
version,
1008+
packet_count,
1009+
});
10051010
}
10061011
},
10071012
NetworkIoMessage::AddTimer {

util/network-devp2p/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@
4949
//! fn main () {
5050
//! let mut service = NetworkService::new(NetworkConfiguration::new_local(), None).expect("Error creating network service");
5151
//! service.start().expect("Error starting service");
52-
//! service.register_protocol(Arc::new(MyHandler), *b"myp", 1, &[1u8]);
52+
//! service.register_protocol(Arc::new(MyHandler), *b"myp", &[(1u8, 1u8)]);
5353
//!
5454
//! // Wait for quit condition
5555
//! // ...

util/network-devp2p/src/service.rs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -67,12 +67,17 @@ impl NetworkService {
6767
}
6868

6969
/// Regiter a new protocol handler with the event loop.
70-
pub fn register_protocol(&self, handler: Arc<NetworkProtocolHandler + Send + Sync>, protocol: ProtocolId, packet_count: u8, versions: &[u8]) -> Result<(), Error> {
70+
pub fn register_protocol(
71+
&self,
72+
handler: Arc<NetworkProtocolHandler + Send + Sync>,
73+
protocol: ProtocolId,
74+
// version id + packet count
75+
versions: &[(u8, u8)]
76+
) -> Result<(), Error> {
7177
self.io_service.send_message(NetworkIoMessage::AddHandler {
72-
handler: handler,
73-
protocol: protocol,
78+
handler,
79+
protocol,
7480
versions: versions.to_vec(),
75-
packet_count: packet_count,
7681
})?;
7782
Ok(())
7883
}

0 commit comments

Comments
 (0)