Skip to content

Commit a0f202c

Browse files
fjljorgemmsilva
authored andcommitted
beacon/light/api: improve handling of event stream setup failures (ethereum#29308)
The StartHeadListener method will only be called once. So it can't just make one attempt to connect to the eventsource endpoint, it has to keep trying. Note that once the stream is established, the eventsource implementation itself will keep retrying.
1 parent 1915ba9 commit a0f202c

File tree

1 file changed

+74
-32
lines changed

1 file changed

+74
-32
lines changed

beacon/light/api/light_api.go

Lines changed: 74 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,13 @@
1717
package api
1818

1919
import (
20+
"context"
2021
"encoding/json"
2122
"errors"
2223
"fmt"
2324
"io"
2425
"net/http"
26+
"sync"
2527
"time"
2628

2729
"github.com/donovanhide/eventsource"
@@ -416,39 +418,34 @@ type HeadEventListener struct {
416418
// The callbacks are also called for the current head and optimistic head at startup.
417419
// They are never called concurrently.
418420
func (api *BeaconLightApi) StartHeadListener(listener HeadEventListener) func() {
419-
closeCh := make(chan struct{}) // initiate closing the stream
420-
closedCh := make(chan struct{}) // stream closed (or failed to create)
421-
stoppedCh := make(chan struct{}) // sync loop stopped
422-
streamCh := make(chan *eventsource.Stream, 1)
421+
var (
422+
ctx, closeCtx = context.WithCancel(context.Background())
423+
streamCh = make(chan *eventsource.Stream, 1)
424+
wg sync.WaitGroup
425+
)
426+
427+
// When connected to a Lodestar node the subscription blocks until the first actual
428+
// event arrives; therefore we create the subscription in a separate goroutine while
429+
// letting the main goroutine sync up to the current head.
430+
wg.Add(1)
423431
go func() {
424-
defer close(closedCh)
425-
// when connected to a Lodestar node the subscription blocks until the
426-
// first actual event arrives; therefore we create the subscription in
427-
// a separate goroutine while letting the main goroutine sync up to the
428-
// current head
429-
req, err := http.NewRequest("GET", api.url+
430-
"/eth/v1/events?topics=head&topics=light_client_optimistic_update&topics=light_client_finality_update", nil)
431-
if err != nil {
432-
listener.OnError(fmt.Errorf("error creating event subscription request: %v", err))
433-
return
434-
}
435-
for k, v := range api.customHeaders {
436-
req.Header.Set(k, v)
437-
}
438-
stream, err := eventsource.SubscribeWithRequest("", req)
439-
if err != nil {
440-
listener.OnError(fmt.Errorf("error creating event subscription: %v", err))
441-
close(streamCh)
432+
defer wg.Done()
433+
stream := api.startEventStream(ctx, &listener)
434+
if stream == nil {
435+
// This case happens when the context was closed.
442436
return
443437
}
438+
// Stream was opened, wait for close signal.
444439
streamCh <- stream
445-
<-closeCh
440+
<-ctx.Done()
446441
stream.Close()
447442
}()
448443

444+
wg.Add(1)
449445
go func() {
450-
defer close(stoppedCh)
446+
defer wg.Done()
451447

448+
// Request initial data.
452449
if head, err := api.GetHeader(common.Hash{}); err == nil {
453450
listener.OnNewHead(head.Slot, head.Hash())
454451
}
@@ -458,39 +455,50 @@ func (api *BeaconLightApi) StartHeadListener(listener HeadEventListener) func()
458455
if finalityUpdate, err := api.GetFinalityUpdate(); err == nil {
459456
listener.OnFinality(finalityUpdate)
460457
}
461-
stream := <-streamCh
462-
if stream == nil {
458+
459+
// Receive the stream.
460+
var stream *eventsource.Stream
461+
select {
462+
case stream = <-streamCh:
463+
case <-ctx.Done():
463464
return
464465
}
465466

466467
for {
467468
select {
469+
case <-ctx.Done():
470+
stream.Close()
471+
468472
case event, ok := <-stream.Events:
469473
if !ok {
470474
return
471475
}
472476
switch event.Event() {
473477
case "head":
474-
if slot, blockRoot, err := decodeHeadEvent([]byte(event.Data())); err == nil {
478+
slot, blockRoot, err := decodeHeadEvent([]byte(event.Data()))
479+
if err == nil {
475480
listener.OnNewHead(slot, blockRoot)
476481
} else {
477482
listener.OnError(fmt.Errorf("error decoding head event: %v", err))
478483
}
479484
case "light_client_optimistic_update":
480-
if signedHead, err := decodeOptimisticHeadUpdate([]byte(event.Data())); err == nil {
485+
signedHead, err := decodeOptimisticHeadUpdate([]byte(event.Data()))
486+
if err == nil {
481487
listener.OnSignedHead(signedHead)
482488
} else {
483489
listener.OnError(fmt.Errorf("error decoding optimistic update event: %v", err))
484490
}
485491
case "light_client_finality_update":
486-
if finalityUpdate, err := decodeFinalityUpdate([]byte(event.Data())); err == nil {
492+
finalityUpdate, err := decodeFinalityUpdate([]byte(event.Data()))
493+
if err == nil {
487494
listener.OnFinality(finalityUpdate)
488495
} else {
489496
listener.OnError(fmt.Errorf("error decoding finality update event: %v", err))
490497
}
491498
default:
492499
listener.OnError(fmt.Errorf("unexpected event: %s", event.Event()))
493500
}
501+
494502
case err, ok := <-stream.Errors:
495503
if !ok {
496504
return
@@ -499,9 +507,43 @@ func (api *BeaconLightApi) StartHeadListener(listener HeadEventListener) func()
499507
}
500508
}
501509
}()
510+
502511
return func() {
503-
close(closeCh)
504-
<-closedCh
505-
<-stoppedCh
512+
closeCtx()
513+
wg.Wait()
514+
}
515+
}
516+
517+
// startEventStream establishes an event stream. This will keep retrying until the stream has been
518+
// established. It can only return nil when the context is canceled.
519+
func (api *BeaconLightApi) startEventStream(ctx context.Context, listener *HeadEventListener) *eventsource.Stream {
520+
for retry := true; retry; retry = ctxSleep(ctx, 5*time.Second) {
521+
path := "/eth/v1/events?topics=head&topics=light_client_optimistic_update&topics=light_client_finality_update"
522+
req, err := http.NewRequestWithContext(ctx, "GET", api.url+path, nil)
523+
if err != nil {
524+
listener.OnError(fmt.Errorf("error creating event subscription request: %v", err))
525+
continue
526+
}
527+
for k, v := range api.customHeaders {
528+
req.Header.Set(k, v)
529+
}
530+
stream, err := eventsource.SubscribeWithRequest("", req)
531+
if err != nil {
532+
listener.OnError(fmt.Errorf("error creating event subscription: %v", err))
533+
continue
534+
}
535+
return stream
536+
}
537+
return nil
538+
}
539+
540+
func ctxSleep(ctx context.Context, timeout time.Duration) (ok bool) {
541+
timer := time.NewTimer(timeout)
542+
defer timer.Stop()
543+
select {
544+
case <-timer.C:
545+
return true
546+
case <-ctx.Done():
547+
return false
506548
}
507549
}

0 commit comments

Comments
 (0)