2525import org .elasticsearch .ElasticsearchException ;
2626import org .elasticsearch .ExceptionsHelper ;
2727import org .elasticsearch .Version ;
28+ import org .elasticsearch .action .ActionListener ;
2829import org .elasticsearch .cluster .ClusterChangedEvent ;
2930import org .elasticsearch .cluster .ClusterState ;
3031import org .elasticsearch .cluster .ClusterStateObserver ;
4748import org .elasticsearch .common .io .stream .StreamInput ;
4849import org .elasticsearch .common .io .stream .StreamOutput ;
4950import org .elasticsearch .common .unit .TimeValue ;
50- import org .elasticsearch .common .util .concurrent .ConcurrentCollections ;
5151import org .elasticsearch .discovery .Discovery ;
5252import org .elasticsearch .index .shard .ShardId ;
5353import org .elasticsearch .node .NodeClosedException ;
5454import org .elasticsearch .threadpool .ThreadPool ;
5555import org .elasticsearch .transport .ConnectTransportException ;
5656import org .elasticsearch .transport .EmptyTransportResponseHandler ;
57- import org .elasticsearch .transport .NodeDisconnectedException ;
5857import org .elasticsearch .transport .RemoteTransportException ;
5958import org .elasticsearch .transport .TransportChannel ;
6059import org .elasticsearch .transport .TransportException ;
6160import org .elasticsearch .transport .TransportRequest ;
61+ import org .elasticsearch .transport .TransportRequestDeduplicator ;
6262import org .elasticsearch .transport .TransportRequestHandler ;
6363import org .elasticsearch .transport .TransportResponse ;
6464import org .elasticsearch .transport .TransportService ;
7070import java .util .Locale ;
7171import java .util .Objects ;
7272import java .util .Set ;
73- import java .util .concurrent .ConcurrentMap ;
7473import java .util .function .Predicate ;
7574
7675import static org .elasticsearch .index .seqno .SequenceNumbers .UNASSIGNED_PRIMARY_TERM ;
@@ -88,7 +87,7 @@ public class ShardStateAction {
8887
8988 // a list of shards that failed during replication
9089 // we keep track of these shards in order to avoid sending duplicate failed shard requests for a single failing shard.
91- private final ConcurrentMap <FailedShardEntry , CompositeListener > remoteFailedShardsCache = ConcurrentCollections . newConcurrentMap ();
90+ private final TransportRequestDeduplicator <FailedShardEntry > remoteFailedShardsDeduplicator = new TransportRequestDeduplicator <> ();
9291
9392 @ Inject
9493 public ShardStateAction (ClusterService clusterService , TransportService transportService ,
@@ -105,7 +104,7 @@ public ShardStateAction(ClusterService clusterService, TransportService transpor
105104 }
106105
107106 private void sendShardAction (final String actionName , final ClusterState currentState ,
108- final TransportRequest request , final Listener listener ) {
107+ final TransportRequest request , final ActionListener < Void > listener ) {
109108 ClusterStateObserver observer =
110109 new ClusterStateObserver (currentState , clusterService , null , logger , threadPool .getThreadContext ());
111110 DiscoveryNode masterNode = currentState .nodes ().getMasterNode ();
@@ -119,7 +118,7 @@ private void sendShardAction(final String actionName, final ClusterState current
119118 actionName , request , new EmptyTransportResponseHandler (ThreadPool .Names .SAME ) {
120119 @ Override
121120 public void handleResponse (TransportResponse .Empty response ) {
122- listener .onSuccess ( );
121+ listener .onResponse ( null );
123122 }
124123
125124 @ Override
@@ -162,60 +161,39 @@ private static boolean isMasterChannelException(TransportException exp) {
162161 * @param listener callback upon completion of the request
163162 */
164163 public void remoteShardFailed (final ShardId shardId , String allocationId , long primaryTerm , boolean markAsStale , final String message ,
165- @ Nullable final Exception failure , Listener listener ) {
164+ @ Nullable final Exception failure , ActionListener < Void > listener ) {
166165 assert primaryTerm > 0L : "primary term should be strictly positive" ;
167- final FailedShardEntry shardEntry = new FailedShardEntry (shardId , allocationId , primaryTerm , message , failure , markAsStale );
168- final CompositeListener compositeListener = new CompositeListener (listener );
169- final CompositeListener existingListener = remoteFailedShardsCache .putIfAbsent (shardEntry , compositeListener );
170- if (existingListener == null ) {
171- sendShardAction (SHARD_FAILED_ACTION_NAME , clusterService .state (), shardEntry , new Listener () {
172- @ Override
173- public void onSuccess () {
174- try {
175- compositeListener .onSuccess ();
176- } finally {
177- remoteFailedShardsCache .remove (shardEntry );
178- }
179- }
180- @ Override
181- public void onFailure (Exception e ) {
182- try {
183- compositeListener .onFailure (e );
184- } finally {
185- remoteFailedShardsCache .remove (shardEntry );
186- }
187- }
188- });
189- } else {
190- existingListener .addListener (listener );
191- }
166+ remoteFailedShardsDeduplicator .executeOnce (
167+ new FailedShardEntry (shardId , allocationId , primaryTerm , message , failure , markAsStale ), listener ,
168+ (req , reqListener ) -> sendShardAction (SHARD_FAILED_ACTION_NAME , clusterService .state (), req , reqListener ));
192169 }
193170
194171 int remoteShardFailedCacheSize () {
195- return remoteFailedShardsCache .size ();
172+ return remoteFailedShardsDeduplicator .size ();
196173 }
197174
198175 /**
199176 * Send a shard failed request to the master node to update the cluster state when a shard on the local node failed.
200177 */
201178 public void localShardFailed (final ShardRouting shardRouting , final String message ,
202- @ Nullable final Exception failure , Listener listener ) {
179+ @ Nullable final Exception failure , ActionListener < Void > listener ) {
203180 localShardFailed (shardRouting , message , failure , listener , clusterService .state ());
204181 }
205182
206183 /**
207184 * Send a shard failed request to the master node to update the cluster state when a shard on the local node failed.
208185 */
209186 public void localShardFailed (final ShardRouting shardRouting , final String message , @ Nullable final Exception failure ,
210- Listener listener , final ClusterState currentState ) {
187+ ActionListener < Void > listener , final ClusterState currentState ) {
211188 FailedShardEntry shardEntry = new FailedShardEntry (shardRouting .shardId (), shardRouting .allocationId ().getId (),
212189 0L , message , failure , true );
213190 sendShardAction (SHARD_FAILED_ACTION_NAME , currentState , shardEntry , listener );
214191 }
215192
216193 // visible for testing
217194 protected void waitForNewMasterAndRetry (String actionName , ClusterStateObserver observer ,
218- TransportRequest request , Listener listener , Predicate <ClusterState > changePredicate ) {
195+ TransportRequest request , ActionListener <Void > listener ,
196+ Predicate <ClusterState > changePredicate ) {
219197 observer .waitForNextChange (new ClusterStateObserver .Listener () {
220198 @ Override
221199 public void onNewClusterState (ClusterState state ) {
@@ -496,14 +474,14 @@ public int hashCode() {
496474 public void shardStarted (final ShardRouting shardRouting ,
497475 final long primaryTerm ,
498476 final String message ,
499- final Listener listener ) {
477+ final ActionListener < Void > listener ) {
500478 shardStarted (shardRouting , primaryTerm , message , listener , clusterService .state ());
501479 }
502480
503481 public void shardStarted (final ShardRouting shardRouting ,
504482 final long primaryTerm ,
505483 final String message ,
506- final Listener listener ,
484+ final ActionListener < Void > listener ,
507485 final ClusterState currentState ) {
508486 StartedShardEntry entry = new StartedShardEntry (shardRouting .shardId (), shardRouting .allocationId ().getId (), primaryTerm , message );
509487 sendShardAction (SHARD_STARTED_ACTION_NAME , currentState , entry , listener );
@@ -669,97 +647,6 @@ public String toString() {
669647 }
670648 }
671649
672- public interface Listener {
673-
674- default void onSuccess () {
675- }
676-
677- /**
678- * Notification for non-channel exceptions that are not handled
679- * by {@link ShardStateAction}.
680- *
681- * The exceptions that are handled by {@link ShardStateAction}
682- * are:
683- * - {@link NotMasterException}
684- * - {@link NodeDisconnectedException}
685- * - {@link Discovery.FailedToCommitClusterStateException}
686- *
687- * Any other exception is communicated to the requester via
688- * this notification.
689- *
690- * @param e the unexpected cause of the failure on the master
691- */
692- default void onFailure (final Exception e ) {
693- }
694-
695- }
696-
697- /**
698- * A composite listener that allows registering multiple listeners dynamically.
699- */
700- static final class CompositeListener implements Listener {
701- private boolean isNotified = false ;
702- private Exception failure = null ;
703- private final List <Listener > listeners = new ArrayList <>();
704-
705- CompositeListener (Listener listener ) {
706- listeners .add (listener );
707- }
708-
709- void addListener (Listener listener ) {
710- final boolean ready ;
711- synchronized (this ) {
712- ready = this .isNotified ;
713- if (ready == false ) {
714- listeners .add (listener );
715- }
716- }
717- if (ready ) {
718- if (failure != null ) {
719- listener .onFailure (failure );
720- } else {
721- listener .onSuccess ();
722- }
723- }
724- }
725-
726- private void onCompleted (Exception failure ) {
727- synchronized (this ) {
728- this .failure = failure ;
729- this .isNotified = true ;
730- }
731- RuntimeException firstException = null ;
732- for (Listener listener : listeners ) {
733- try {
734- if (failure != null ) {
735- listener .onFailure (failure );
736- } else {
737- listener .onSuccess ();
738- }
739- } catch (RuntimeException innerEx ) {
740- if (firstException == null ) {
741- firstException = innerEx ;
742- } else {
743- firstException .addSuppressed (innerEx );
744- }
745- }
746- }
747- if (firstException != null ) {
748- throw firstException ;
749- }
750- }
751-
752- @ Override
753- public void onSuccess () {
754- onCompleted (null );
755- }
756-
757- @ Override
758- public void onFailure (Exception failure ) {
759- onCompleted (failure );
760- }
761- }
762-
763650 public static class NoLongerPrimaryShardException extends ElasticsearchException {
764651
765652 public NoLongerPrimaryShardException (ShardId shardId , String msg ) {
0 commit comments