2121
2222import org .apache .logging .log4j .Logger ;
2323import org .apache .logging .log4j .message .ParameterizedMessage ;
24+ import org .elasticsearch .common .unit .TimeValue ;
25+ import org .elasticsearch .common .util .concurrent .FutureUtils ;
2426
2527import java .io .Closeable ;
2628import java .io .IOException ;
27- import java .util .ArrayList ;
28- import java .util .List ;
29+ import java .util .LinkedHashMap ;
30+ import java .util .Map ;
2931import java .util .Objects ;
3032import java .util .concurrent .Executor ;
33+ import java .util .concurrent .ScheduledExecutorService ;
34+ import java .util .concurrent .ScheduledFuture ;
35+ import java .util .concurrent .TimeUnit ;
36+ import java .util .concurrent .TimeoutException ;
3137
3238import static org .elasticsearch .index .seqno .SequenceNumbers .NO_OPS_PERFORMED ;
3339import static org .elasticsearch .index .seqno .SequenceNumbers .UNASSIGNED_SEQ_NO ;
@@ -45,51 +51,59 @@ public class GlobalCheckpointListeners implements Closeable {
4551 public interface GlobalCheckpointListener {
4652 /**
4753 * Callback when the global checkpoint is updated or the shard is closed. If the shard is closed, the value of the global checkpoint
48- * will be set to {@link org.elasticsearch.index.seqno.SequenceNumbers#UNASSIGNED_SEQ_NO} and the exception will be non-null. If the
49- * global checkpoint is updated, the exception will be null.
54+ * will be set to {@link org.elasticsearch.index.seqno.SequenceNumbers#UNASSIGNED_SEQ_NO} and the exception will be non-null and an
55+ * instance of {@link IndexShardClosedException}. If the listener timed out waiting for notification then the exception will be
56+ * non-null and an instance of {@link TimeoutException}. If the global checkpoint is updated, the exception will be null.
5057 *
5158 * @param globalCheckpoint the updated global checkpoint
52- * @param e if non-null, the shard is closed
59+ * @param e if non-null, the shard is closed or the listener timed out
5360 */
54- void accept (long globalCheckpoint , IndexShardClosedException e );
61+ void accept (long globalCheckpoint , Exception e );
5562 }
5663
5764 // guarded by this
5865 private boolean closed ;
59- private volatile List <GlobalCheckpointListener > listeners ;
66+ private volatile Map <GlobalCheckpointListener , ScheduledFuture <?> > listeners ;
6067 private long lastKnownGlobalCheckpoint = UNASSIGNED_SEQ_NO ;
6168
6269 private final ShardId shardId ;
6370 private final Executor executor ;
71+ private final ScheduledExecutorService scheduler ;
6472 private final Logger logger ;
6573
6674 /**
6775 * Construct a global checkpoint listeners collection.
6876 *
69- * @param shardId the shard ID on which global checkpoint updates can be listened to
70- * @param executor the executor for listener notifications
71- * @param logger a shard-level logger
77+ * @param shardId the shard ID on which global checkpoint updates can be listened to
78+ * @param executor the executor for listener notifications
79+ * @param scheduler the executor used for scheduling timeouts
80+ * @param logger a shard-level logger
7281 */
7382 GlobalCheckpointListeners (
7483 final ShardId shardId ,
7584 final Executor executor ,
85+ final ScheduledExecutorService scheduler ,
7686 final Logger logger ) {
77- this .shardId = Objects .requireNonNull (shardId );
78- this .executor = Objects .requireNonNull (executor );
79- this .logger = Objects .requireNonNull (logger );
87+ this .shardId = Objects .requireNonNull (shardId , "shardId" );
88+ this .executor = Objects .requireNonNull (executor , "executor" );
89+ this .scheduler = Objects .requireNonNull (scheduler , "scheduler" );
90+ this .logger = Objects .requireNonNull (logger , "logger" );
8091 }
8192
8293 /**
8394 * Add a global checkpoint listener. If the global checkpoint is above the current global checkpoint known to the listener then the
8495 * listener will be asynchronously notified on the executor used to construct this collection of global checkpoint listeners. If the
8596 * shard is closed then the listener will be asynchronously notified on the executor used to construct this collection of global
8697 * checkpoint listeners. The listener will only be notified of at most one event, either the global checkpoint is updated or the shard
87- * is closed. A listener must re-register after one of these events to receive subsequent events.
98+ * is closed. A listener must re-register after one of these events to receive subsequent events. Callers may add a timeout to be
99+ * notified if the timeout elapses. In this case, the listener will be notified with a {@link TimeoutException}. Passing null for
100+ * the timeout means no timeout will be associated with the listener.
88101 *
89102 * @param currentGlobalCheckpoint the current global checkpoint known to the listener
90103 * @param listener the listener
104+ * @param timeout the listener timeout, or null if no timeout
91105 */
92- synchronized void add (final long currentGlobalCheckpoint , final GlobalCheckpointListener listener ) {
106+ synchronized void add (final long currentGlobalCheckpoint , final GlobalCheckpointListener listener , final TimeValue timeout ) {
93107 if (closed ) {
94108 executor .execute (() -> notifyListener (listener , UNASSIGNED_SEQ_NO , new IndexShardClosedException (shardId )));
95109 return ;
@@ -99,9 +113,41 @@ synchronized void add(final long currentGlobalCheckpoint, final GlobalCheckpoint
99113 executor .execute (() -> notifyListener (listener , lastKnownGlobalCheckpoint , null ));
100114 } else {
101115 if (listeners == null ) {
102- listeners = new ArrayList <>();
116+ listeners = new LinkedHashMap <>();
117+ }
118+ if (timeout == null ) {
119+ listeners .put (listener , null );
120+ } else {
121+ listeners .put (
122+ listener ,
123+ scheduler .schedule (
124+ () -> {
125+ final boolean removed ;
126+ synchronized (this ) {
127+ /*
128+ * Note that the listeners map can be null if a notification nulled out the map reference when
129+ * notifying listeners, and then our scheduled execution occurred before we could be cancelled by
130+ * the notification. In this case, we would have blocked waiting for access to this critical
131+ * section.
132+ *
133+ * What is more, we know that this listener has a timeout associated with it (otherwise we would
134+ * not be here) so the return value from remove being null is an indication that we are not in the
135+ * map. This can happen if a notification nulled out the listeners, and then our scheduled execution
136+ * occurred before we could be cancelled by the notification, and then another thread added a
137+ * listener causing the listeners map reference to be non-null again. In this case, our listener
138+ * here would not be in the map and we should not fire the timeout logic.
139+ */
140+ removed = listeners != null && listeners .remove (listener ) != null ;
141+ }
142+ if (removed ) {
143+ final TimeoutException e = new TimeoutException (timeout .getStringRep ());
144+ logger .trace ("global checkpoint listener timed out" , e );
145+ executor .execute (() -> notifyListener (listener , UNASSIGNED_SEQ_NO , e ));
146+ }
147+ },
148+ timeout .nanos (),
149+ TimeUnit .NANOSECONDS ));
103150 }
104- listeners .add (listener );
105151 }
106152 }
107153
@@ -111,10 +157,25 @@ public synchronized void close() throws IOException {
111157 notifyListeners (UNASSIGNED_SEQ_NO , new IndexShardClosedException (shardId ));
112158 }
113159
160+ /**
161+ * The number of listeners currently pending for notification.
162+ *
163+ * @return the number of listeners pending notification
164+ */
114165 synchronized int pendingListeners () {
115166 return listeners == null ? 0 : listeners .size (); // a null map means no listener is currently registered
116167 }
117168
169+ /**
170+ * The scheduled future for a listener that has a timeout associated with it, otherwise null.
171+ *
172+ * @param listener the listener to get the scheduled future for
173+ * @return a scheduled future representing the timeout future for the listener, otherwise null
174+ */
175+ synchronized ScheduledFuture <?> getTimeoutFuture (final GlobalCheckpointListener listener ) {
176+ return listeners == null ? null : listeners .get (listener ); // the map is null until a listener is added and after each notification
177+ }
178+
118179 /**
119180 * Invoke to notify all registered listeners of an updated global checkpoint.
120181 *
@@ -134,19 +195,24 @@ private void notifyListeners(final long globalCheckpoint, final IndexShardClosed
134195 assert (globalCheckpoint == UNASSIGNED_SEQ_NO && e != null ) || (globalCheckpoint >= NO_OPS_PERFORMED && e == null );
135196 if (listeners != null ) {
136197 // capture the current listeners
137- final List <GlobalCheckpointListener > currentListeners = listeners ;
198+ final Map <GlobalCheckpointListener , ScheduledFuture <?> > currentListeners = listeners ;
138199 listeners = null ;
139200 if (currentListeners != null ) {
140201 executor .execute (() -> {
141- for (final GlobalCheckpointListener listener : currentListeners ) {
142- notifyListener (listener , globalCheckpoint , e );
202+ for (final Map .Entry <GlobalCheckpointListener , ScheduledFuture <?>> listener : currentListeners .entrySet ()) {
203+ /*
204+ * We do not want to interrupt any timeouts that have already fired; these will detect that the listener has been notified
205+ * and will not trigger the timeout.
206+ */
207+ FutureUtils .cancel (listener .getValue ());
208+ notifyListener (listener .getKey (), globalCheckpoint , e );
143209 }
144210 });
145211 }
146212 }
147213 }
148214
149- private void notifyListener (final GlobalCheckpointListener listener , final long globalCheckpoint , final IndexShardClosedException e ) {
215+ private void notifyListener (final GlobalCheckpointListener listener , final long globalCheckpoint , final Exception e ) {
150216 try {
151217 listener .accept (globalCheckpoint , e );
152218 } catch (final Exception caught ) {
@@ -156,8 +222,11 @@ private void notifyListener(final GlobalCheckpointListener listener, final long
156222 "error notifying global checkpoint listener of updated global checkpoint [{}]" ,
157223 globalCheckpoint ),
158224 caught );
159- } else {
225+ } else if ( e instanceof IndexShardClosedException ) {
160226 logger .warn ("error notifying global checkpoint listener of closed shard" , caught );
227+ } else {
228+ assert e instanceof TimeoutException : e ;
229+ logger .warn ("error notifying global checkpoint listener of timeout" , caught );
161230 }
162231 }
163232 }
0 commit comments