@@ -90,15 +90,10 @@ impl IntoConnectionHandler for Proto {
9090 }
9191}
9292
93- /// A pending reply to an inbound identification request.
94- enum Pending {
95- /// The reply is queued for sending.
96- Queued ( ReplySubstream < NegotiatedSubstream > ) ,
97- /// The reply is being sent.
98- Sending {
99- peer : PeerId ,
100- io : Pin < Box < dyn Future < Output = Result < ( ) , UpgradeError > > + Send > > ,
101- } ,
93+ /// A reply to an inbound identification request.
94+ struct Sending {
95+ peer : PeerId ,
96+ io : Pin < Box < dyn Future < Output = Result < ( ) , UpgradeError > > + Send > > ,
10297}
10398
10499/// Protocol handler for sending and receiving identification requests.
@@ -119,8 +114,11 @@ pub struct Handler {
119114 > ; 4 ] ,
120115 > ,
121116
122- /// Pending replies to send.
123- pending_replies : VecDeque < Pending > ,
117+ /// Streams awaiting `BehaviourInfo` to then send identify requests.
118+ reply_streams : VecDeque < ReplySubstream < NegotiatedSubstream > > ,
119+
120+ /// Pending identification replies, awaiting being sent.
121+ pending_replies : VecDeque < Sending > ,
124122
125123 /// Future that fires when we need to identify the node again.
126124 trigger_next_identify : Delay ,
@@ -144,9 +142,6 @@ pub struct Handler {
144142
145143 /// Address observed by or for the remote.
146144 observed_addr : Multiaddr ,
147-
148- /// Information provided by the `Behaviour` upon requesting.
149- behaviour_info : Option < BehaviourInfo > ,
150145}
151146
152147/// Information provided by the `Behaviour` upon requesting.
@@ -199,6 +194,7 @@ impl Handler {
199194 remote_peer_id,
200195 inbound_identify_push : Default :: default ( ) ,
201196 events : SmallVec :: new ( ) ,
197+ reply_streams : VecDeque :: new ( ) ,
202198 pending_replies : VecDeque :: new ( ) ,
203199 trigger_next_identify : Delay :: new ( initial_delay) ,
204200 keep_alive : KeepAlive :: Yes ,
@@ -207,7 +203,6 @@ impl Handler {
207203 protocol_version,
208204 agent_version,
209205 observed_addr,
210- behaviour_info : None ,
211206 }
212207 }
213208
@@ -222,20 +217,16 @@ impl Handler {
222217 ) {
223218 match output {
224219 EitherOutput :: First ( substream) => {
225- // If we already have `BehaviourInfo` we can proceed responding to the Identify request,
226- // if not, we request it .
227- if self . behaviour_info . is_none ( ) {
228- self . events
229- . push ( ConnectionHandlerEvent :: Custom ( Event :: Identify ) ) ;
230- }
231- if !self . pending_replies . is_empty ( ) {
220+ self . events
221+ . push ( ConnectionHandlerEvent :: Custom ( Event :: Identify ) ) ;
222+ if !self . reply_streams . is_empty ( ) {
232223 warn ! (
233224 "New inbound identify request from {} while a previous one \
234225 is still pending. Queueing the new one.",
235226 self . remote_peer_id,
236227 ) ;
237228 }
238- self . pending_replies . push_back ( Pending :: Queued ( substream) ) ;
229+ self . reply_streams . push_back ( substream) ;
239230 }
240231 EitherOutput :: Second ( fut) => {
241232 if self . inbound_identify_push . replace ( fut) . is_some ( ) {
@@ -319,8 +310,24 @@ impl ConnectionHandler for Handler {
319310 ) ,
320311 } ) ;
321312 }
322- InEvent :: Identify ( info) => {
323- self . behaviour_info = Some ( info) ;
313+ InEvent :: Identify ( behaviour_info) => {
314+ let info = Info {
315+ public_key : self . public_key . clone ( ) ,
316+ protocol_version : self . protocol_version . clone ( ) ,
317+ agent_version : self . agent_version . clone ( ) ,
318+ listen_addrs : behaviour_info. listen_addrs ,
319+ protocols : behaviour_info. protocols ,
320+ observed_addr : self . observed_addr . clone ( ) ,
321+ } ;
322+ let substream = self
323+ . reply_streams
324+ . pop_front ( )
325+ . expect ( "A BehaviourInfo reply should have a matching substream." ) ;
326+ let io = Box :: pin ( substream. send ( info) ) ;
327+ self . pending_replies . push_back ( Sending {
328+ peer : self . remote_peer_id ,
329+ io,
330+ } ) ;
324331 }
325332 }
326333 }
@@ -364,49 +371,23 @@ impl ConnectionHandler for Handler {
364371 }
365372
366373 // Check for pending replies to send.
367- if let Some ( ref info) = self . behaviour_info {
368- if let Some ( mut pending) = self . pending_replies . pop_front ( ) {
369- loop {
370- match pending {
371- Pending :: Queued ( io) => {
372- let info = Info {
373- public_key : self . public_key . clone ( ) ,
374- protocol_version : self . protocol_version . clone ( ) ,
375- agent_version : self . agent_version . clone ( ) ,
376- listen_addrs : info. listen_addrs . clone ( ) ,
377- protocols : info. protocols . clone ( ) ,
378- observed_addr : self . observed_addr . clone ( ) ,
379- } ;
380- let io = Box :: pin ( io. send ( info. clone ( ) ) ) ;
381- pending = Pending :: Sending {
382- peer : self . remote_peer_id ,
383- io,
384- } ;
385- }
386- Pending :: Sending { peer, mut io } => {
387- match Future :: poll ( Pin :: new ( & mut io) , cx) {
388- Poll :: Pending => {
389- self . pending_replies
390- . push_front ( Pending :: Sending { peer, io } ) ;
391- return Poll :: Pending ;
392- }
393- Poll :: Ready ( Ok ( ( ) ) ) => {
394- return Poll :: Ready ( ConnectionHandlerEvent :: Custom (
395- Event :: Identification ( peer) ,
396- ) ) ;
397- }
398- Poll :: Ready ( Err ( err) ) => {
399- return Poll :: Ready ( ConnectionHandlerEvent :: Custom (
400- Event :: IdentificationError (
401- ConnectionHandlerUpgrErr :: Upgrade (
402- libp2p_core:: upgrade:: UpgradeError :: Apply ( err) ,
403- ) ,
404- ) ,
405- ) )
406- }
407- }
408- }
409- }
374+ if let Some ( mut sending) = self . pending_replies . pop_front ( ) {
375+ match Future :: poll ( Pin :: new ( & mut sending. io ) , cx) {
376+ Poll :: Pending => {
377+ self . pending_replies . push_front ( sending) ;
378+ return Poll :: Pending ;
379+ }
380+ Poll :: Ready ( Ok ( ( ) ) ) => {
381+ return Poll :: Ready ( ConnectionHandlerEvent :: Custom ( Event :: Identification (
382+ sending. peer ,
383+ ) ) ) ;
384+ }
385+ Poll :: Ready ( Err ( err) ) => {
386+ return Poll :: Ready ( ConnectionHandlerEvent :: Custom ( Event :: IdentificationError (
387+ ConnectionHandlerUpgrErr :: Upgrade (
388+ libp2p_core:: upgrade:: UpgradeError :: Apply ( err) ,
389+ ) ,
390+ ) ) )
410391 }
411392 }
412393 }
0 commit comments