2424import org .elasticsearch .cluster .ClusterState ;
2525import org .elasticsearch .cluster .metadata .IndexMetaData ;
2626import org .elasticsearch .cluster .metadata .MappingMetaData ;
27+ import org .elasticsearch .cluster .metadata .MetaData ;
2728import org .elasticsearch .cluster .routing .IndexRoutingTable ;
2829import org .elasticsearch .cluster .service .ClusterService ;
2930import org .elasticsearch .common .CheckedConsumer ;
3031import org .elasticsearch .common .settings .IndexScopedSettings ;
3132import org .elasticsearch .common .settings .Settings ;
33+ import org .elasticsearch .common .settings .SettingsModule ;
3234import org .elasticsearch .common .unit .TimeValue ;
3335import org .elasticsearch .common .util .concurrent .EsRejectedExecutionException ;
3436import org .elasticsearch .index .Index ;
4648import org .elasticsearch .tasks .TaskId ;
4749import org .elasticsearch .threadpool .ThreadPool ;
4850import org .elasticsearch .xpack .ccr .Ccr ;
51+ import org .elasticsearch .xpack .ccr .CcrSettings ;
4952import org .elasticsearch .xpack .ccr .action .bulk .BulkShardOperationsAction ;
5053import org .elasticsearch .xpack .ccr .action .bulk .BulkShardOperationsRequest ;
5154import org .elasticsearch .xpack .ccr .action .bulk .BulkShardOperationsResponse ;
@@ -69,16 +72,20 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
6972 private final ThreadPool threadPool ;
7073 private final ClusterService clusterService ;
7174 private final IndexScopedSettings indexScopedSettings ;
75+ private volatile TimeValue waitForMetadataTimeOut ;
7276
7377 public ShardFollowTasksExecutor (Client client ,
7478 ThreadPool threadPool ,
7579 ClusterService clusterService ,
76- IndexScopedSettings indexScopedSettings ) {
80+ SettingsModule settingsModule ) {
7781 super (ShardFollowTask .NAME , Ccr .CCR_THREAD_POOL_NAME );
7882 this .client = client ;
7983 this .threadPool = threadPool ;
8084 this .clusterService = clusterService ;
81- this .indexScopedSettings = indexScopedSettings ;
85+ this .indexScopedSettings = settingsModule .getIndexScopedSettings ();
86+ this .waitForMetadataTimeOut = CcrSettings .CCR_WAIT_FOR_METADATA_TIMEOUT .get (settingsModule .getSettings ());
87+ clusterService .getClusterSettings ().addSettingsUpdateConsumer (CcrSettings .CCR_WAIT_FOR_METADATA_TIMEOUT ,
88+ newVal -> this .waitForMetadataTimeOut = newVal );
8289 }
8390
8491 @ Override
@@ -112,33 +119,25 @@ protected AllocatedPersistentTask createTask(long id, String type, String action
112119 scheduler , System ::nanoTime ) {
113120
114121 @ Override
115- protected void innerUpdateMapping (LongConsumer handler , Consumer <Exception > errorHandler ) {
116- Index leaderIndex = params .getLeaderShardId ().getIndex ();
117- Index followIndex = params .getFollowShardId ().getIndex ();
118-
119- ClusterStateRequest clusterStateRequest = CcrRequests .metaDataRequest (leaderIndex .getName ());
120- CheckedConsumer <ClusterStateResponse , Exception > onResponse = clusterStateResponse -> {
121- IndexMetaData indexMetaData = clusterStateResponse .getState ().metaData ().getIndexSafe (leaderIndex );
122- if (indexMetaData .getMappings ().isEmpty ()) {
123- assert indexMetaData .getMappingVersion () == 1 ;
124- handler .accept (indexMetaData .getMappingVersion ());
125- return ;
126- }
127-
128- assert indexMetaData .getMappings ().size () == 1 : "expected exactly one mapping, but got [" +
129- indexMetaData .getMappings ().size () + "]" ;
130- MappingMetaData mappingMetaData = indexMetaData .getMappings ().iterator ().next ().value ;
131-
132- PutMappingRequest putMappingRequest = CcrRequests .putMappingRequest (followIndex .getName (), mappingMetaData );
133- followerClient .admin ().indices ().putMapping (putMappingRequest , ActionListener .wrap (
134- putMappingResponse -> handler .accept (indexMetaData .getMappingVersion ()),
135- errorHandler ));
136- };
137- try {
138- remoteClient (params ).admin ().cluster ().state (clusterStateRequest , ActionListener .wrap (onResponse , errorHandler ));
139- } catch (Exception e ) {
140- errorHandler .accept (e );
141- }
122+ protected void innerUpdateMapping (long minRequiredMappingVersion , LongConsumer handler , Consumer <Exception > errorHandler ) {
123+ final Index followerIndex = params .getFollowShardId ().getIndex ();
124+ getIndexMetadata (minRequiredMappingVersion , 0L , params , ActionListener .wrap (
125+ indexMetaData -> {
126+ if (indexMetaData .getMappings ().isEmpty ()) {
127+ assert indexMetaData .getMappingVersion () == 1 ;
128+ handler .accept (indexMetaData .getMappingVersion ());
129+ return ;
130+ }
131+ assert indexMetaData .getMappings ().size () == 1 : "expected exactly one mapping, but got [" +
132+ indexMetaData .getMappings ().size () + "]" ;
133+ MappingMetaData mappingMetaData = indexMetaData .getMappings ().iterator ().next ().value ;
134+ PutMappingRequest putMappingRequest = CcrRequests .putMappingRequest (followerIndex .getName (), mappingMetaData );
135+ followerClient .admin ().indices ().putMapping (putMappingRequest , ActionListener .wrap (
136+ putMappingResponse -> handler .accept (indexMetaData .getMappingVersion ()),
137+ errorHandler ));
138+ },
139+ errorHandler
140+ ));
142141 }
143142
144143 @ Override
@@ -257,6 +256,39 @@ private Client remoteClient(ShardFollowTask params) {
257256 return wrapClient (client .getRemoteClusterClient (params .getRemoteCluster ()), params .getHeaders ());
258257 }
259258
259+ private void getIndexMetadata (long minRequiredMappingVersion , long minRequiredMetadataVersion ,
260+ ShardFollowTask params , ActionListener <IndexMetaData > listener ) {
261+ final Index leaderIndex = params .getLeaderShardId ().getIndex ();
262+ final ClusterStateRequest clusterStateRequest = CcrRequests .metaDataRequest (leaderIndex .getName ());
263+ if (minRequiredMetadataVersion > 0 ) {
264+ clusterStateRequest .waitForMetaDataVersion (minRequiredMetadataVersion ).waitForTimeout (waitForMetadataTimeOut );
265+ }
266+ try {
267+ remoteClient (params ).admin ().cluster ().state (clusterStateRequest , ActionListener .wrap (
268+ r -> {
269+ // if wait_for_metadata_version timeout, the response is empty
270+ if (r .getState () == null ) {
271+ assert minRequiredMetadataVersion > 0 ;
272+ getIndexMetadata (minRequiredMappingVersion , minRequiredMetadataVersion , params , listener );
273+ return ;
274+ }
275+ final MetaData metaData = r .getState ().metaData ();
276+ final IndexMetaData indexMetaData = metaData .getIndexSafe (leaderIndex );
277+ if (indexMetaData .getMappingVersion () < minRequiredMappingVersion ) {
278+ // ask for the next version.
279+ getIndexMetadata (minRequiredMappingVersion , metaData .version () + 1 , params , listener );
280+ } else {
281+ assert metaData .version () >= minRequiredMetadataVersion : metaData .version () + " < " + minRequiredMetadataVersion ;
282+ listener .onResponse (indexMetaData );
283+ }
284+ },
285+ listener ::onFailure
286+ ));
287+ } catch (Exception e ) {
288+ listener .onFailure (e );
289+ }
290+ }
291+
260292 interface FollowerStatsInfoHandler {
261293 void accept (String followerHistoryUUID , long globalCheckpoint , long maxSeqNo );
262294 }
0 commit comments