@@ -91,12 +91,12 @@ public interface GlobalCheckpointListener {
9191 */
9292 synchronized void add (final long currentGlobalCheckpoint , final GlobalCheckpointListener listener ) {
9393 if (closed ) {
94- executor .execute (() -> listener . accept ( UNASSIGNED_SEQ_NO , new IndexShardClosedException (shardId )));
94+ executor .execute (() -> notifyListener ( listener , UNASSIGNED_SEQ_NO , new IndexShardClosedException (shardId )));
9595 return ;
9696 }
9797 if (lastKnownGlobalCheckpoint > currentGlobalCheckpoint ) {
9898 // notify directly
99- executor .execute (() -> listener . accept ( lastKnownGlobalCheckpoint , null ));
99+ executor .execute (() -> notifyListener ( listener , lastKnownGlobalCheckpoint , null ));
100100 return ;
101101 } else {
102102 if (listeners == null ) {
@@ -107,10 +107,8 @@ synchronized void add(final long currentGlobalCheckpoint, final GlobalCheckpoint
107107 }
108108
109109 @ Override
110- public void close () throws IOException {
111- synchronized (this ) {
112- closed = true ;
113- }
110+ public synchronized void close () throws IOException {
111+ closed = true ;
114112 notifyListeners (UNASSIGNED_SEQ_NO , new IndexShardClosedException (shardId ));
115113 }
116114
@@ -123,25 +121,22 @@ synchronized int pendingListeners() {
123121 *
124122 * @param globalCheckpoint the updated global checkpoint
125123 */
126- void globalCheckpointUpdated (final long globalCheckpoint ) {
124+ synchronized void globalCheckpointUpdated (final long globalCheckpoint ) {
127125 assert globalCheckpoint >= NO_OPS_PERFORMED ;
128- synchronized (this ) {
129- assert globalCheckpoint > lastKnownGlobalCheckpoint
130- : "updated global checkpoint [" + globalCheckpoint + "]"
131- + " is not more than the last known global checkpoint [" + lastKnownGlobalCheckpoint + "]" ;
132- lastKnownGlobalCheckpoint = globalCheckpoint ;
133- }
126+ assert globalCheckpoint > lastKnownGlobalCheckpoint
127+ : "updated global checkpoint [" + globalCheckpoint + "]"
128+ + " is not more than the last known global checkpoint [" + lastKnownGlobalCheckpoint + "]" ;
129+ lastKnownGlobalCheckpoint = globalCheckpoint ;
134130 notifyListeners (globalCheckpoint , null );
135131 }
136132
137133 private void notifyListeners (final long globalCheckpoint , final IndexShardClosedException e ) {
134+ assert Thread .holdsLock (this );
138135 assert (globalCheckpoint == UNASSIGNED_SEQ_NO && e != null ) || (globalCheckpoint >= NO_OPS_PERFORMED && e == null );
139136 if (listeners != null ) {
140- final List <GlobalCheckpointListener > currentListeners ;
141- synchronized (this ) {
142- currentListeners = listeners ;
143- listeners = null ;
144- }
137+ // capture the current listeners
138+ final List <GlobalCheckpointListener > currentListeners = listeners ;
139+ listeners = null ;
145140 if (currentListeners != null ) {
146141 executor .execute (() -> {
147142 for (final GlobalCheckpointListener listener : currentListeners ) {
0 commit comments