55 */
66package org .elasticsearch .xpack .ccr .action ;
77
8+ import com .carrotsearch .hppc .predicates .ObjectPredicate ;
89import org .apache .logging .log4j .LogManager ;
910import org .apache .logging .log4j .Logger ;
1011import org .apache .logging .log4j .message .ParameterizedMessage ;
4243import java .util .ArrayList ;
4344import java .util .Collections ;
4445import java .util .HashMap ;
46+ import java .util .HashSet ;
4547import java .util .LinkedHashMap ;
4648import java .util .List ;
4749import java .util .Map ;
@@ -151,7 +153,7 @@ void updateAutoFollowers(ClusterState followerClusterState) {
151153 AutoFollower autoFollower = new AutoFollower (remoteCluster , threadPool , this ::updateStats , clusterService ::state ) {
152154
153155 @ Override
154- void getLeaderClusterState (final String remoteCluster ,
156+ void getRemoteClusterState (final String remoteCluster ,
155157 final BiConsumer <ClusterState , Exception > handler ) {
156158 final ClusterStateRequest request = new ClusterStateRequest ();
157159 request .clear ();
@@ -163,7 +165,7 @@ void getLeaderClusterState(final String remoteCluster,
163165 remoteCluster ,
164166 request ,
165167 e -> handler .accept (null , e ),
166- leaderClusterState -> handler .accept (leaderClusterState , null ));
168+ remoteClusterState -> handler .accept (remoteClusterState , null ));
167169 }
168170
169171 @ Override
@@ -203,7 +205,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
203205
204206 };
205207 newAutoFollowers .put (remoteCluster , autoFollower );
206- autoFollower .autoFollowIndices ();
208+ autoFollower .start ();
207209 }
208210
209211 List <String > removedRemoteClusters = new ArrayList <>();
@@ -254,9 +256,9 @@ abstract static class AutoFollower {
254256 this .followerClusterStateSupplier = followerClusterStateSupplier ;
255257 }
256258
257- void autoFollowIndices () {
258- final ClusterState followerClusterState = followerClusterStateSupplier .get ();
259- final AutoFollowMetadata autoFollowMetadata = followerClusterState .metaData ().custom (AutoFollowMetadata .TYPE );
259+ void start () {
260+ final ClusterState clusterState = followerClusterStateSupplier .get ();
261+ final AutoFollowMetadata autoFollowMetadata = clusterState .metaData ().custom (AutoFollowMetadata .TYPE );
260262 if (autoFollowMetadata == null ) {
261263 LOGGER .info ("AutoFollower for cluster [{}] has stopped, because there is no autofollow metadata" , remoteCluster );
262264 return ;
@@ -274,51 +276,58 @@ void autoFollowIndices() {
274276 this .autoFollowPatternsCountDown = new CountDown (patterns .size ());
275277 this .autoFollowResults = new AtomicArray <>(patterns .size ());
276278
277- getLeaderClusterState (remoteCluster , (leaderClusterState , e ) -> {
278- if (leaderClusterState != null ) {
279- assert e == null ;
280-
281- int i = 0 ;
282- for (String autoFollowPatternName : patterns ) {
283- final int slot = i ;
284- AutoFollowPattern autoFollowPattern = autoFollowMetadata .getPatterns ().get (autoFollowPatternName );
285- Map <String , String > headers = autoFollowMetadata .getHeaders ().get (autoFollowPatternName );
286- List <String > followedIndices = autoFollowMetadata .getFollowedLeaderIndexUUIDs ().get (autoFollowPatternName );
287-
288- final List <Index > leaderIndicesToFollow = getLeaderIndicesToFollow (autoFollowPattern , leaderClusterState ,
289- followerClusterState , followedIndices );
290- if (leaderIndicesToFollow .isEmpty ()) {
291- finalise (slot , new AutoFollowResult (autoFollowPatternName ));
292- } else {
293- List <Tuple <String , AutoFollowPattern >> patternsForTheSameLeaderCluster = autoFollowMetadata .getPatterns ()
294- .entrySet ().stream ()
295- .filter (item -> autoFollowPatternName .equals (item .getKey ()) == false )
296- .filter (item -> remoteCluster .equals (item .getValue ().getRemoteCluster ()))
297- .map (item -> new Tuple <>(item .getKey (), item .getValue ()))
298- .collect (Collectors .toList ());
299-
300- Consumer <AutoFollowResult > resultHandler = result -> finalise (slot , result );
301- checkAutoFollowPattern (autoFollowPatternName , remoteCluster , autoFollowPattern , leaderIndicesToFollow , headers ,
302- patternsForTheSameLeaderCluster , resultHandler );
303- }
304- i ++;
305- }
279+ getRemoteClusterState (remoteCluster , (remoteClusterState , remoteError ) -> {
280+ if (remoteClusterState != null ) {
281+ assert remoteError == null ;
282+ autoFollowIndices (autoFollowMetadata , clusterState , remoteClusterState , patterns );
306283 } else {
307- List <AutoFollowResult > results = new ArrayList <>(patterns .size ());
308- for (String autoFollowPatternName : patterns ) {
309- results .add (new AutoFollowResult (autoFollowPatternName , e ));
284+ assert remoteError != null ;
285+ for (int i = 0 ; i < patterns .size (); i ++) {
286+ String autoFollowPatternName = patterns .get (i );
287+ finalise (i , new AutoFollowResult (autoFollowPatternName , remoteError ));
310288 }
311- statsUpdater .accept (results );
312289 }
313290 });
314291 }
315292
293+ private void autoFollowIndices (final AutoFollowMetadata autoFollowMetadata ,
294+ final ClusterState clusterState ,
295+ final ClusterState remoteClusterState ,
296+ final List <String > patterns ) {
297+ int i = 0 ;
298+ for (String autoFollowPatternName : patterns ) {
299+ final int slot = i ;
300+ AutoFollowPattern autoFollowPattern = autoFollowMetadata .getPatterns ().get (autoFollowPatternName );
301+ Map <String , String > headers = autoFollowMetadata .getHeaders ().get (autoFollowPatternName );
302+ List <String > followedIndices = autoFollowMetadata .getFollowedLeaderIndexUUIDs ().get (autoFollowPatternName );
303+
304+ final List <Index > leaderIndicesToFollow = getLeaderIndicesToFollow (autoFollowPattern , remoteClusterState ,
305+ clusterState , followedIndices );
306+ if (leaderIndicesToFollow .isEmpty ()) {
307+ finalise (slot , new AutoFollowResult (autoFollowPatternName ));
308+ } else {
309+ List <Tuple <String , AutoFollowPattern >> patternsForTheSameRemoteCluster = autoFollowMetadata .getPatterns ()
310+ .entrySet ().stream ()
311+ .filter (item -> autoFollowPatternName .equals (item .getKey ()) == false )
312+ .filter (item -> remoteCluster .equals (item .getValue ().getRemoteCluster ()))
313+ .map (item -> new Tuple <>(item .getKey (), item .getValue ()))
314+ .collect (Collectors .toList ());
315+
316+ Consumer <AutoFollowResult > resultHandler = result -> finalise (slot , result );
317+ checkAutoFollowPattern (autoFollowPatternName , remoteCluster , autoFollowPattern , leaderIndicesToFollow , headers ,
318+ patternsForTheSameRemoteCluster , resultHandler );
319+ }
320+ i ++;
321+ }
322+ cleanFollowedRemoteIndices (remoteClusterState , patterns );
323+ }
324+
316325 private void checkAutoFollowPattern (String autoFollowPattenName ,
317- String leaderCluster ,
326+ String remoteCluster ,
318327 AutoFollowPattern autoFollowPattern ,
319328 List <Index > leaderIndicesToFollow ,
320329 Map <String , String > headers ,
321- List <Tuple <String , AutoFollowPattern >> patternsForTheSameLeaderCluster ,
330+ List <Tuple <String , AutoFollowPattern >> patternsForTheSameRemoteCluster ,
322331 Consumer <AutoFollowResult > resultHandler ) {
323332
324333 final CountDown leaderIndicesCountDown = new CountDown (leaderIndicesToFollow .size ());
@@ -327,7 +336,7 @@ private void checkAutoFollowPattern(String autoFollowPattenName,
327336 final Index indexToFollow = leaderIndicesToFollow .get (i );
328337 final int slot = i ;
329338
330- List <String > otherMatchingPatterns = patternsForTheSameLeaderCluster .stream ()
339+ List <String > otherMatchingPatterns = patternsForTheSameRemoteCluster .stream ()
331340 .filter (otherPattern -> otherPattern .v2 ().match (indexToFollow .getName ()))
332341 .map (Tuple ::v1 )
333342 .collect (Collectors .toList ());
@@ -338,14 +347,13 @@ private void checkAutoFollowPattern(String autoFollowPattenName,
338347 resultHandler .accept (new AutoFollowResult (autoFollowPattenName , results .asList ()));
339348 }
340349 } else {
341- followLeaderIndex (autoFollowPattenName , leaderCluster , indexToFollow , autoFollowPattern , headers , error -> {
350+ followLeaderIndex (autoFollowPattenName , remoteCluster , indexToFollow , autoFollowPattern , headers , error -> {
342351 results .set (slot , new Tuple <>(indexToFollow , error ));
343352 if (leaderIndicesCountDown .countDown ()) {
344353 resultHandler .accept (new AutoFollowResult (autoFollowPattenName , results .asList ()));
345354 }
346355 });
347356 }
348-
349357 }
350358 }
351359
@@ -395,18 +403,18 @@ private void finalise(int slot, AutoFollowResult result) {
395403 if (autoFollowPatternsCountDown .countDown ()) {
396404 statsUpdater .accept (autoFollowResults .asList ());
397405 // TODO: Remove scheduling here with using cluster state API's waitForMetadataVersion:
398- threadPool .schedule (TimeValue .timeValueMillis (2500 ), ThreadPool .Names .GENERIC , this ::autoFollowIndices );
406+ threadPool .schedule (TimeValue .timeValueMillis (2500 ), ThreadPool .Names .GENERIC , this ::start );
399407 }
400408 }
401409
402410 static List <Index > getLeaderIndicesToFollow (AutoFollowPattern autoFollowPattern ,
403- ClusterState leaderClusterState ,
411+ ClusterState remoteClusterState ,
404412 ClusterState followerClusterState ,
405413 List <String > followedIndexUUIDs ) {
406414 List <Index > leaderIndicesToFollow = new ArrayList <>();
407- for (IndexMetaData leaderIndexMetaData : leaderClusterState .getMetaData ()) {
415+ for (IndexMetaData leaderIndexMetaData : remoteClusterState .getMetaData ()) {
408416 if (autoFollowPattern .match (leaderIndexMetaData .getIndex ().getName ())) {
409- IndexRoutingTable indexRoutingTable = leaderClusterState .routingTable ().index (leaderIndexMetaData .getIndex ());
417+ IndexRoutingTable indexRoutingTable = remoteClusterState .routingTable ().index (leaderIndexMetaData .getIndex ());
410418 if (indexRoutingTable != null &&
411419 // Leader indices can be in the cluster state, but not all primary shards may be ready yet.
412420 // This checks ensures all primary shards have started, so that index following does not fail.
@@ -465,12 +473,63 @@ static Function<ClusterState, ClusterState> recordLeaderIndexAsFollowFunction(St
465473 };
466474 }
467475
476+ void cleanFollowedRemoteIndices (final ClusterState remoteClusterState , final List <String > patterns ) {
477+ updateAutoFollowMetadata (cleanFollowedRemoteIndices (remoteClusterState .metaData (), patterns ), e -> {
478+ if (e != null ) {
479+ LOGGER .warn ("Error occured while cleaning followed leader indices" , e );
480+ }
481+ });
482+ }
483+
484+ static Function <ClusterState , ClusterState > cleanFollowedRemoteIndices (
485+ final MetaData remoteMetadata , final List <String > autoFollowPatternNames ) {
486+ return currentState -> {
487+ AutoFollowMetadata currentAutoFollowMetadata = currentState .metaData ().custom (AutoFollowMetadata .TYPE );
488+ Map <String , List <String >> autoFollowPatternNameToFollowedIndexUUIDs =
489+ new HashMap <>(currentAutoFollowMetadata .getFollowedLeaderIndexUUIDs ());
490+ Set <String > remoteIndexUUIDS = new HashSet <>();
491+ remoteMetadata .getIndices ().values ()
492+ .forEach ((ObjectPredicate <IndexMetaData >) value -> remoteIndexUUIDS .add (value .getIndexUUID ()));
493+
494+ boolean requiresCSUpdate = false ;
495+ for (String autoFollowPatternName : autoFollowPatternNames ) {
496+ if (autoFollowPatternNameToFollowedIndexUUIDs .containsKey (autoFollowPatternName ) == false ) {
497+ // A delete auto follow pattern request can have removed the auto follow pattern while we want to update
498+ // the auto follow metadata with the fact that an index was successfully auto followed. If this
499+ // happens, we can just skip this step.
500+ continue ;
501+ }
502+
503+ List <String > followedIndexUUIDs =
504+ new ArrayList <>(autoFollowPatternNameToFollowedIndexUUIDs .get (autoFollowPatternName ));
505+ // Remove leader indices that no longer exist in the remote cluster:
506+ boolean entriesRemoved = followedIndexUUIDs .removeIf (
507+ followedLeaderIndexUUID -> remoteIndexUUIDS .contains (followedLeaderIndexUUID ) == false );
508+ if (entriesRemoved ) {
509+ requiresCSUpdate = true ;
510+ }
511+ autoFollowPatternNameToFollowedIndexUUIDs .put (autoFollowPatternName , followedIndexUUIDs );
512+ }
513+
514+ if (requiresCSUpdate ) {
515+ final AutoFollowMetadata newAutoFollowMetadata = new AutoFollowMetadata (currentAutoFollowMetadata .getPatterns (),
516+ autoFollowPatternNameToFollowedIndexUUIDs , currentAutoFollowMetadata .getHeaders ());
517+ return ClusterState .builder (currentState )
518+ .metaData (MetaData .builder (currentState .getMetaData ())
519+ .putCustom (AutoFollowMetadata .TYPE , newAutoFollowMetadata ).build ())
520+ .build ();
521+ } else {
522+ return currentState ;
523+ }
524+ };
525+ }
526+
468527 /**
469528 * Fetch the cluster state from the leader with the specified cluster alias
470529 * @param remoteCluster the name of the leader cluster
471530 * @param handler the callback to invoke
472531 */
473- abstract void getLeaderClusterState (
532+ abstract void getRemoteClusterState (
474533 String remoteCluster ,
475534 BiConsumer <ClusterState , Exception > handler
476535 );
0 commit comments