@@ -65,6 +65,8 @@ export abstract class PubSubBaseProtocol<Events = PubSubEvents> extends EventEmi
6565
6666 private _registrarTopologyIds : string [ ] | undefined
6767 protected enabled : boolean
68+ private readonly maxInboundStreams : number
69+ private readonly maxOutboundStreams : number
6870
6971 constructor ( props : PubSubInit ) {
7072 super ( )
@@ -74,7 +76,9 @@ export abstract class PubSubBaseProtocol<Events = PubSubEvents> extends EventEmi
7476 globalSignaturePolicy = 'StrictSign' ,
7577 canRelayMessage = false ,
7678 emitSelf = false ,
77- messageProcessingConcurrency = 10
79+ messageProcessingConcurrency = 10 ,
80+ maxInboundStreams = 1 ,
81+ maxOutboundStreams = 1
7882 } = props
7983
8084 this . multicodecs = ensureArray ( multicodecs )
@@ -88,6 +92,8 @@ export abstract class PubSubBaseProtocol<Events = PubSubEvents> extends EventEmi
8892 this . emitSelf = emitSelf
8993 this . topicValidators = new Map ( )
9094 this . queue = new Queue ( { concurrency : messageProcessingConcurrency } )
95+ this . maxInboundStreams = maxInboundStreams
96+ this . maxOutboundStreams = maxOutboundStreams
9197
9298 this . _onIncomingStream = this . _onIncomingStream . bind ( this )
9399 this . _onPeerConnected = this . _onPeerConnected . bind ( this )
@@ -115,7 +121,10 @@ export abstract class PubSubBaseProtocol<Events = PubSubEvents> extends EventEmi
115121 const registrar = this . components . getRegistrar ( )
116122 // Incoming streams
117123 // Called after a peer dials us
118- await Promise . all ( this . multicodecs . map ( async multicodec => await registrar . handle ( multicodec , this . _onIncomingStream ) ) )
124+ await Promise . all ( this . multicodecs . map ( async multicodec => await registrar . handle ( multicodec , this . _onIncomingStream , {
125+ maxInboundStreams : this . maxInboundStreams ,
126+ maxOutboundStreams : this . maxOutboundStreams
127+ } ) ) )
119128
120129 // register protocol with topology
121130 // Topology callbacks called on connection manager changes
0 commit comments