@@ -181,52 +181,53 @@ func (f *clientPool) stop() {
181181 f .lock .Unlock ()
182182}
183183
184- // registerPeer implements peerSetNotify
185- func (f * clientPool ) registerPeer (p * peer ) {
186- c := f .connect (p , 0 )
187- if c != nil {
188- p .balanceTracker = & c .balanceTracker
189- }
190- }
191-
192184// connect should be called after a successful handshake. If the connection was
193185// rejected, there is no need to call disconnect.
194- func (f * clientPool ) connect (peer clientPeer , capacity uint64 ) * clientInfo {
186+ func (f * clientPool ) connect (peer clientPeer , capacity uint64 ) bool {
195187 f .lock .Lock ()
196188 defer f .lock .Unlock ()
197189
190+ // Short circuit is clientPool is already closed.
198191 if f .closed {
199- return nil
192+ return false
200193 }
201- address := peer .freeClientId ()
202- id := peer .ID ()
203- idStr := peerIdToString (id )
194+ // Dedup connected peers.
195+ id , freeID := peer .ID (), peer .freeClientId ()
204196 if _ , ok := f .connectedMap [id ]; ok {
205197 clientRejectedMeter .Mark (1 )
206- log .Debug ("Client already connected" , "address" , address , "id" , idStr )
207- return nil
198+ log .Debug ("Client already connected" , "address" , freeID , "id" , peerIdToString ( id ) )
199+ return false
208200 }
201+ // Create a clientInfo but do not add it yet
209202 now := f .clock .Now ()
210- // create a clientInfo but do not add it yet
211- e := & clientInfo {pool : f , peer : peer , address : address , queueIndex : - 1 , id : id }
212203 posBalance := f .getPosBalance (id ).value
213- e .priority = posBalance != 0
204+ e := & clientInfo {pool : f , peer : peer , address : freeID , queueIndex : - 1 , id : id , priority : posBalance != 0 }
205+
214206 var negBalance uint64
215- nb := f .negBalanceMap [address ]
207+ nb := f .negBalanceMap [freeID ]
216208 if nb != nil {
217209 negBalance = uint64 (math .Exp (float64 (nb .logValue - f .logOffset (now )) / fixedPointMultiplier ))
218210 }
211+ // If the client is a free client, assign with a low free capacity,
212+ // Otherwise assign with the given value(priority client)
219213 if ! e .priority {
220214 capacity = f .freeClientCap
221215 }
222- // check whether it fits into connectedQueue
216+ // Ensure the capacity will never lower than the free capacity.
223217 if capacity < f .freeClientCap {
224218 capacity = f .freeClientCap
225219 }
226220 e .capacity = capacity
221+
227222 e .balanceTracker .init (f .clock , capacity )
228223 e .balanceTracker .setBalance (posBalance , negBalance )
229224 f .setClientPriceFactors (e )
225+
226+ // If the number of clients already connected in the clientpool exceeds its
227+ // capacity, evict some clients with lowest priority.
228+ //
229+ // If the priority of the newly added client is lower than the priority of
230+ // all connected clients, the client is rejected.
230231 newCapacity := f .connectedCapacity + capacity
231232 newCount := f .connectedQueue .Size () + 1
232233 if newCapacity > f .capacityLimit || newCount > f .countLimit {
@@ -248,8 +249,8 @@ func (f *clientPool) connect(peer clientPeer, capacity uint64) *clientInfo {
248249 f .connectedQueue .Push (c )
249250 }
250251 clientRejectedMeter .Mark (1 )
251- log .Debug ("Client rejected" , "address" , address , "id" , idStr )
252- return nil
252+ log .Debug ("Client rejected" , "address" , freeID , "id" , peerIdToString ( id ) )
253+ return false
253254 }
254255 // accept new client, drop old ones
255256 for _ , c := range kickList {
@@ -258,7 +259,7 @@ func (f *clientPool) connect(peer clientPeer, capacity uint64) *clientInfo {
258259 }
259260 // client accepted, finish setting it up
260261 if nb != nil {
261- delete (f .negBalanceMap , address )
262+ delete (f .negBalanceMap , freeID )
262263 f .negBalanceQueue .Remove (nb .queueIndex )
263264 }
264265 if e .priority {
@@ -272,13 +273,8 @@ func (f *clientPool) connect(peer clientPeer, capacity uint64) *clientInfo {
272273 e .peer .updateCapacity (e .capacity )
273274 }
274275 clientConnectedMeter .Mark (1 )
275- log .Debug ("Client accepted" , "address" , address )
276- return e
277- }
278-
279- // unregisterPeer implements peerSetNotify
280- func (f * clientPool ) unregisterPeer (p * peer ) {
281- f .disconnect (p )
276+ log .Debug ("Client accepted" , "address" , freeID )
277+ return true
282278}
283279
284280// disconnect should be called when a connection is terminated. If the disconnection
@@ -378,6 +374,18 @@ func (f *clientPool) setLimits(count int, totalCap uint64) {
378374 })
379375}
380376
377+ // requestCost feeds request cost after serving a request from the given peer.
378+ func (f * clientPool ) requestCost (p * peer , cost uint64 ) {
379+ f .lock .Lock ()
380+ defer f .lock .Unlock ()
381+
382+ info , exist := f .connectedMap [p .ID ()]
383+ if ! exist || f .closed {
384+ return
385+ }
386+ info .balanceTracker .requestCost (cost )
387+ }
388+
381389// logOffset calculates the time-dependent offset for the logarithmic
382390// representation of negative balance
383391func (f * clientPool ) logOffset (now mclock.AbsTime ) int64 {
0 commit comments