@@ -114,43 +114,73 @@ func (q *ByteFIFOQueue) Run(atShutdown, atTerminate func(context.Context, func()
114114}
115115
116116func (q * ByteFIFOQueue ) readToChan () {
117+ // handle quick cancels
118+ select {
119+ case <- q .closed :
120+ // tell the pool to shutdown.
121+ q .cancel ()
122+ return
123+ default :
124+ }
125+
126+ backOffTime := time .Millisecond * 100
127+ maxBackOffTime := time .Second * 3
117128 for {
118- select {
119- case <- q .closed :
120- // tell the pool to shutdown.
121- q .cancel ()
122- return
123- default :
124- q .lock .Lock ()
125- bs , err := q .byteFIFO .Pop ()
126- if err != nil {
127- q .lock .Unlock ()
128- log .Error ("%s: %s Error on Pop: %v" , q .typ , q .name , err )
129- time .Sleep (time .Millisecond * 100 )
130- continue
131- }
129+ success , resetBackoff := q .doPop ()
130+ if resetBackoff {
131+ backOffTime = 100 * time .Millisecond
132+ }
132133
133- if len (bs ) == 0 {
134- q .lock .Unlock ()
135- time .Sleep (time .Millisecond * 100 )
136- continue
134+ if success {
135+ select {
136+ case <- q .closed :
137+ // tell the pool to shutdown.
138+ q .cancel ()
139+ return
140+ default :
137141 }
138-
139- data , err := unmarshalAs (bs , q .exemplar )
140- if err != nil {
141- log .Error ("%s: %s Failed to unmarshal with error: %v" , q .typ , q .name , err )
142- q .lock .Unlock ()
143- time .Sleep (time .Millisecond * 100 )
144- continue
142+ } else {
143+ select {
144+ case <- q .closed :
145+ // tell the pool to shutdown.
146+ q .cancel ()
147+ return
148+ case <- time .After (backOffTime ):
149+ }
150+ backOffTime += backOffTime / 2
151+ if backOffTime > maxBackOffTime {
152+ backOffTime = maxBackOffTime
145153 }
146-
147- log .Trace ("%s %s: Task found: %#v" , q .typ , q .name , data )
148- q .WorkerPool .Push (data )
149- q .lock .Unlock ()
150154 }
151155 }
152156}
153157
158+ func (q * ByteFIFOQueue ) doPop () (success , resetBackoff bool ) {
159+ q .lock .Lock ()
160+ defer q .lock .Unlock ()
161+ bs , err := q .byteFIFO .Pop ()
162+ if err != nil {
163+ log .Error ("%s: %s Error on Pop: %v" , q .typ , q .name , err )
164+ return
165+ }
166+ if len (bs ) == 0 {
167+ return
168+ }
169+
170+ resetBackoff = true
171+
172+ data , err := unmarshalAs (bs , q .exemplar )
173+ if err != nil {
174+ log .Error ("%s: %s Failed to unmarshal with error: %v" , q .typ , q .name , err )
175+ return
176+ }
177+
178+ log .Trace ("%s %s: Task found: %#v" , q .typ , q .name , data )
179+ q .WorkerPool .Push (data )
180+ success = true
181+ return
182+ }
183+
154184// Shutdown processing from this queue
155185func (q * ByteFIFOQueue ) Shutdown () {
156186 log .Trace ("%s: %s Shutting down" , q .typ , q .name )
0 commit comments