@@ -95,6 +95,26 @@ func (s *funcSub) Err() <-chan error {
9595// Resubscribe applies backoff between calls to fn. The time between calls is adapted
9696// based on the error rate, but will never exceed backoffMax.
9797func Resubscribe (backoffMax time.Duration , fn ResubscribeFunc ) Subscription {
98+ return ResubscribeErr (backoffMax , func (ctx context.Context , _ error ) (Subscription , error ) {
99+ return fn (ctx )
100+ })
101+ }
102+
103+ // A ResubscribeFunc attempts to establish a subscription.
104+ type ResubscribeFunc func (context.Context ) (Subscription , error )
105+
106+ // ResubscribeErr calls fn repeatedly to keep a subscription established. When the
107+ // subscription is established, ResubscribeErr waits for it to fail and calls fn again. This
108+ // process repeats until Unsubscribe is called or the active subscription ends
109+ // successfully.
110+ //
111+ // The difference between Resubscribe and ResubscribeErr is that with ResubscribeErr,
112+ // the error of the failing subscription is available to the callback for logging
113+ // purposes.
114+ //
115+ // ResubscribeErr applies backoff between calls to fn. The time between calls is adapted
116+ // based on the error rate, but will never exceed backoffMax.
117+ func ResubscribeErr (backoffMax time.Duration , fn ResubscribeErrFunc ) Subscription {
98118 s := & resubscribeSub {
99119 waitTime : backoffMax / 10 ,
100120 backoffMax : backoffMax ,
@@ -106,15 +126,18 @@ func Resubscribe(backoffMax time.Duration, fn ResubscribeFunc) Subscription {
106126 return s
107127}
108128
109- // A ResubscribeFunc attempts to establish a subscription.
110- type ResubscribeFunc func (context.Context ) (Subscription , error )
129+ // A ResubscribeErrFunc attempts to establish a subscription.
130+ // For every call but the first, the second argument to this function is
131+ // the error that occurred with the previous subscription.
132+ type ResubscribeErrFunc func (context.Context , error ) (Subscription , error )
111133
112134type resubscribeSub struct {
113- fn ResubscribeFunc
135+ fn ResubscribeErrFunc
114136 err chan error
115137 unsub chan struct {}
116138 unsubOnce sync.Once
117139 lastTry mclock.AbsTime
140+ lastSubErr error
118141 waitTime , backoffMax time.Duration
119142}
120143
@@ -149,7 +172,7 @@ func (s *resubscribeSub) subscribe() Subscription {
149172 s .lastTry = mclock .Now ()
150173 ctx , cancel := context .WithCancel (context .Background ())
151174 go func () {
152- rsub , err := s .fn (ctx )
175+ rsub , err := s .fn (ctx , s . lastSubErr )
153176 sub = rsub
154177 subscribed <- err
155178 }()
@@ -178,6 +201,7 @@ func (s *resubscribeSub) waitForError(sub Subscription) bool {
178201 defer sub .Unsubscribe ()
179202 select {
180203 case err := <- sub .Err ():
204+ s .lastSubErr = err
181205 return err == nil
182206 case <- s .unsub :
183207 return true
0 commit comments