@@ -40,29 +40,32 @@ func newSpanBlockProducersTracker(
4040 }
4141
4242 return & spanBlockProducersTracker {
43- logger : logger ,
44- borConfig : borConfig ,
45- store : store ,
46- recentSelections : recentSelectionsLru ,
47- newSpans : make (chan * Span ),
48- idleSignal : make (chan struct {}),
43+ logger : logger ,
44+ borConfig : borConfig ,
45+ store : store ,
46+ recentSelections : recentSelectionsLru ,
47+ newSpans : make (chan * Span ),
48+ idleSignal : make (chan struct {}),
49+ spanProcessedSignal : make (chan struct {}),
4950 }
5051}
5152
5253type spanBlockProducersTracker struct {
53- logger log.Logger
54- borConfig * borcfg.BorConfig
55- store EntityStore [* SpanBlockProducerSelection ]
56- recentSelections * lru.Cache [uint64 , SpanBlockProducerSelection ] // sprint number -> SpanBlockProducerSelection
57- newSpans chan * Span
58- queued atomic.Int32
59- idleSignal chan struct {}
54+ logger log.Logger
55+ borConfig * borcfg.BorConfig
56+ store EntityStore [* SpanBlockProducerSelection ]
57+ recentSelections * lru.Cache [uint64 , SpanBlockProducerSelection ] // sprint number -> SpanBlockProducerSelection
58+ newSpans chan * Span
59+ queued atomic.Int32
60+ idleSignal chan struct {}
61+ spanProcessedSignal chan struct {} // signal that a new span was fully processed
6062}
6163
6264func (t * spanBlockProducersTracker ) Run (ctx context.Context ) error {
6365 t .logger .Info (heimdallLogPrefix ("running span block producers tracker component" ))
6466
6567 defer close (t .idleSignal )
68+ defer close (t .spanProcessedSignal )
6669 for {
6770 select {
6871 case <- ctx .Done ():
@@ -73,6 +76,12 @@ func (t *spanBlockProducersTracker) Run(ctx context.Context) error {
7376 return err
7477 }
7578
79+ // signal that the span was observed (non-blocking)
80+ select {
81+ case t .spanProcessedSignal <- struct {}{}:
82+ default :
83+ }
84+
7685 t .queued .Add (- 1 )
7786 if t .queued .Load () == 0 {
7887 select {
@@ -84,6 +93,23 @@ func (t *spanBlockProducersTracker) Run(ctx context.Context) error {
8493 }
8594}
8695
96+ // Anticipates a new span to be observe and fully processed withing the given timeout period.
97+ // Returns true if a new span was processed, false if no new span was processed
98+ func (t * spanBlockProducersTracker ) AnticipateNewSpanWithTimeout (ctx context.Context , timeout time.Duration ) (bool , error ) {
99+ select {
100+ case <- ctx .Done ():
101+ return false , ctx .Err ()
102+ case _ , ok := <- t .spanProcessedSignal :
103+ if ! ok {
104+ return false , errors .New ("spanProcessed channel was closed" )
105+ }
106+ return true , nil
107+
108+ case <- time .After (timeout ): // timeout
109+ }
110+ return false , nil
111+ }
112+
87113func (t * spanBlockProducersTracker ) Synchronize (ctx context.Context ) error {
88114 if t .queued .Load () == 0 {
89115 return nil
@@ -111,7 +137,7 @@ func (t *spanBlockProducersTracker) ObserveSpanAsync(ctx context.Context, span *
111137}
112138
113139func (t * spanBlockProducersTracker ) ObserveSpan (ctx context.Context , newSpan * Span ) error {
114- t .logger .Debug (heimdallLogPrefix ("block producers tracker observing span" ), "id " , newSpan . Id )
140+ t .logger .Debug (heimdallLogPrefix ("block producers tracker observing span" ), "newSpan " , newSpan )
115141
116142 lastProducerSelection , ok , err := t .store .LastEntity (ctx )
117143 if err != nil {
@@ -204,11 +230,6 @@ func (t *spanBlockProducersTracker) Producers(ctx context.Context, blockNum uint
204230func (t * spanBlockProducersTracker ) producers (ctx context.Context , blockNum uint64 ) (* ValidatorSet , int , error ) {
205231 currentSprintNum := t .borConfig .CalculateSprintNumber (blockNum )
206232
207- // have we previously calculated the producers for the same sprint num (chain tip optimisation)
208- if selection , ok := t .recentSelections .Get (currentSprintNum ); ok {
209- return selection .Producers .Copy (), 0 , nil
210- }
211-
212233 // have we previously calculated the producers for the previous sprint num of the same span (chain tip optimisation)
213234 spanId , ok , err := t .store .EntityIdFromBlockNum (ctx , blockNum )
214235 if err != nil {
@@ -217,20 +238,7 @@ func (t *spanBlockProducersTracker) producers(ctx context.Context, blockNum uint
217238 if ! ok {
218239 return nil , 0 , fmt .Errorf ("could not get spanId from blockNum=%d" , blockNum )
219240 }
220- var prevSprintNum uint64
221- if currentSprintNum > 0 {
222- prevSprintNum = currentSprintNum - 1
223- }
224- if selection , ok := t .recentSelections .Get (prevSprintNum ); ok && SpanId (spanId ) == selection .SpanId {
225- producersCopy := selection .Producers .Copy ()
226- producersCopy .IncrementProposerPriority (1 )
227- selectionCopy := selection
228- selectionCopy .Producers = producersCopy
229- t .recentSelections .Add (currentSprintNum , selectionCopy )
230- return producersCopy , 1 , nil
231- }
232241
233- // no recent selection that we can easily use, re-calculate from DB
234242 producerSelection , ok , err := t .store .Entity (ctx , spanId )
235243 if err != nil {
236244 return nil , 0 , err
@@ -252,7 +260,5 @@ func (t *spanBlockProducersTracker) producers(ctx context.Context, blockNum uint
252260 producers = GetUpdatedValidatorSet (producers , producers .Validators , t .logger )
253261 producers .IncrementProposerPriority (1 )
254262 }
255-
256- t .recentSelections .Add (currentSprintNum , * producerSelection )
257263 return producers , increments , nil
258264}
0 commit comments