diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java index ab0f995803762..8ab9e396b4dc1 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java @@ -179,8 +179,7 @@ public List> getPersistentTasksExecutor(ClusterServic ThreadPool threadPool, Client client, SettingsModule settingsModule) { - IndexScopedSettings indexScopedSettings = settingsModule.getIndexScopedSettings(); - return Collections.singletonList(new ShardFollowTasksExecutor(client, threadPool, clusterService, indexScopedSettings)); + return Collections.singletonList(new ShardFollowTasksExecutor(client, threadPool, clusterService, settingsModule)); } public List> getActions() { diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java index 26089ab46952d..d6262a7711dad 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java @@ -30,11 +30,17 @@ public final class CcrSettings { Setting.boolSetting("index.xpack.ccr.following_index", false, Property.IndexScope, Property.InternalIndex); /** - * Dynamic node setting for specifying the wait_for_timeout that the auto follow coordinator should be using. + * Dynamic node setting for specifying the wait_for_timeout that the auto follow coordinator and shard follow task should be using. */ - public static final Setting CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT = Setting.timeSetting( - "ccr.auto_follow.wait_for_metadata_timeout", TimeValue.timeValueSeconds(60), Property.NodeScope, Property.Dynamic); + public static final Setting CCR_WAIT_FOR_METADATA_TIMEOUT = Setting.timeSetting( + "ccr.wait_for_metadata_timeout", TimeValue.timeValueSeconds(60), Property.NodeScope, Property.Dynamic); + /** + * Dynamic node setting for specifying the wait_for_timeout that the auto follow coordinator should be using. + * TODO: Deprecate and remove this setting + */ + private static final Setting CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT = Setting.timeSetting( + "ccr.auto_follow.wait_for_metadata_timeout", CCR_WAIT_FOR_METADATA_TIMEOUT, Property.NodeScope, Property.Dynamic); /** * Max bytes a node can recover per second. @@ -62,7 +68,8 @@ static List> getSettings() { CCR_FOLLOWING_INDEX_SETTING, RECOVERY_MAX_BYTES_PER_SECOND, INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING, - CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT); + CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT, + CCR_WAIT_FOR_METADATA_TIMEOUT); } private final CombinedRateLimiter ccrRateLimiter; diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java index e3b008efc5657..19d2b9ce4797e 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java @@ -113,8 +113,8 @@ protected boolean removeEldestEntry(final Map.Entry { + updateMapping(0L, followerMappingVersion -> { synchronized (ShardFollowNodeTask.this) { currentMappingVersion = followerMappingVersion; } @@ -370,7 +370,7 @@ private synchronized void handleWriteResponse(final BulkShardOperationsResponse coordinateReads(); } - private synchronized void maybeUpdateMapping(Long minimumRequiredMappingVersion, Runnable task) { + private synchronized void maybeUpdateMapping(long minimumRequiredMappingVersion, Runnable task) { if (currentMappingVersion >= minimumRequiredMappingVersion) { LOGGER.trace("{} mapping version [{}] is higher or equal than minimum required mapping version [{}]", params.getFollowShardId(), currentMappingVersion, minimumRequiredMappingVersion); @@ -378,7 +378,7 @@ private synchronized void maybeUpdateMapping(Long minimumRequiredMappingVersion, } else { LOGGER.trace("{} updating mapping, mapping version [{}] is lower than minimum required mapping version [{}]", params.getFollowShardId(), currentMappingVersion, minimumRequiredMappingVersion); - updateMapping(mappingVersion -> { + updateMapping(minimumRequiredMappingVersion, mappingVersion -> { currentMappingVersion = mappingVersion; task.run(); }); @@ -400,12 +400,13 @@ private synchronized void maybeUpdateSettings(final Long minimumRequiredSettings } } - private void updateMapping(LongConsumer handler) { - updateMapping(handler, new AtomicInteger(0)); + private void updateMapping(long minRequiredMappingVersion, LongConsumer handler) { + updateMapping(minRequiredMappingVersion, handler, new AtomicInteger(0)); } - private void updateMapping(LongConsumer handler, AtomicInteger retryCounter) { - innerUpdateMapping(handler, e -> handleFailure(e, retryCounter, () -> updateMapping(handler, retryCounter))); + private void updateMapping(long minRequiredMappingVersion, LongConsumer handler, AtomicInteger retryCounter) { + innerUpdateMapping(minRequiredMappingVersion, handler, + e -> handleFailure(e, retryCounter, () -> updateMapping(minRequiredMappingVersion, handler, retryCounter))); } private void updateSettings(final LongConsumer handler) { @@ -471,7 +472,7 @@ static boolean shouldRetry(String remoteCluster, Exception e) { } // These methods are protected for testing purposes: - protected abstract void innerUpdateMapping(LongConsumer handler, Consumer errorHandler); + protected abstract void innerUpdateMapping(long minRequiredMappingVersion, LongConsumer handler, Consumer errorHandler); protected abstract void innerUpdateSettings(LongConsumer handler, Consumer errorHandler); diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java index 97308126ffb3f..956171ba9b7c3 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java @@ -24,11 +24,13 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.CheckedConsumer; import org.elasticsearch.common.settings.IndexScopedSettings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.settings.SettingsModule; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.index.Index; @@ -46,6 +48,7 @@ import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.ccr.Ccr; +import org.elasticsearch.xpack.ccr.CcrSettings; import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsAction; import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsRequest; import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse; @@ -69,16 +72,20 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor this.waitForMetadataTimeOut = newVal); } @Override @@ -112,33 +119,25 @@ protected AllocatedPersistentTask createTask(long id, String type, String action scheduler, System::nanoTime) { @Override - protected void innerUpdateMapping(LongConsumer handler, Consumer errorHandler) { - Index leaderIndex = params.getLeaderShardId().getIndex(); - Index followIndex = params.getFollowShardId().getIndex(); - - ClusterStateRequest clusterStateRequest = CcrRequests.metaDataRequest(leaderIndex.getName()); - CheckedConsumer onResponse = clusterStateResponse -> { - IndexMetaData indexMetaData = clusterStateResponse.getState().metaData().getIndexSafe(leaderIndex); - if (indexMetaData.getMappings().isEmpty()) { - assert indexMetaData.getMappingVersion() == 1; - handler.accept(indexMetaData.getMappingVersion()); - return; - } - - assert indexMetaData.getMappings().size() == 1 : "expected exactly one mapping, but got [" + - indexMetaData.getMappings().size() + "]"; - MappingMetaData mappingMetaData = indexMetaData.getMappings().iterator().next().value; - - PutMappingRequest putMappingRequest = CcrRequests.putMappingRequest(followIndex.getName(), mappingMetaData); - followerClient.admin().indices().putMapping(putMappingRequest, ActionListener.wrap( - putMappingResponse -> handler.accept(indexMetaData.getMappingVersion()), - errorHandler)); - }; - try { - remoteClient(params).admin().cluster().state(clusterStateRequest, ActionListener.wrap(onResponse, errorHandler)); - } catch (Exception e) { - errorHandler.accept(e); - } + protected void innerUpdateMapping(long minRequiredMappingVersion, LongConsumer handler, Consumer errorHandler) { + final Index followerIndex = params.getFollowShardId().getIndex(); + getIndexMetadata(minRequiredMappingVersion, 0L, params, ActionListener.wrap( + indexMetaData -> { + if (indexMetaData.getMappings().isEmpty()) { + assert indexMetaData.getMappingVersion() == 1; + handler.accept(indexMetaData.getMappingVersion()); + return; + } + assert indexMetaData.getMappings().size() == 1 : "expected exactly one mapping, but got [" + + indexMetaData.getMappings().size() + "]"; + MappingMetaData mappingMetaData = indexMetaData.getMappings().iterator().next().value; + PutMappingRequest putMappingRequest = CcrRequests.putMappingRequest(followerIndex.getName(), mappingMetaData); + followerClient.admin().indices().putMapping(putMappingRequest, ActionListener.wrap( + putMappingResponse -> handler.accept(indexMetaData.getMappingVersion()), + errorHandler)); + }, + errorHandler + )); } @Override @@ -257,6 +256,39 @@ private Client remoteClient(ShardFollowTask params) { return wrapClient(client.getRemoteClusterClient(params.getRemoteCluster()), params.getHeaders()); } + private void getIndexMetadata(long minRequiredMappingVersion, long minRequiredMetadataVersion, + ShardFollowTask params, ActionListener listener) { + final Index leaderIndex = params.getLeaderShardId().getIndex(); + final ClusterStateRequest clusterStateRequest = CcrRequests.metaDataRequest(leaderIndex.getName()); + if (minRequiredMetadataVersion > 0) { + clusterStateRequest.waitForMetaDataVersion(minRequiredMetadataVersion).waitForTimeout(waitForMetadataTimeOut); + } + try { + remoteClient(params).admin().cluster().state(clusterStateRequest, ActionListener.wrap( + r -> { + // if wait_for_metadata_version timeout, the response is empty + if (r.getState() == null) { + assert minRequiredMetadataVersion > 0; + getIndexMetadata(minRequiredMappingVersion, minRequiredMetadataVersion, params, listener); + return; + } + final MetaData metaData = r.getState().metaData(); + final IndexMetaData indexMetaData = metaData.getIndexSafe(leaderIndex); + if (indexMetaData.getMappingVersion() < minRequiredMappingVersion) { + // ask for the next version. + getIndexMetadata(minRequiredMappingVersion, metaData.version() + 1, params, listener); + } else { + assert metaData.version() >= minRequiredMetadataVersion : metaData.version() + " < " + minRequiredMetadataVersion; + listener.onResponse(indexMetaData); + } + }, + listener::onFailure + )); + } catch (Exception e) { + listener.onFailure(e); + } + } + interface FollowerStatsInfoHandler { void accept(String followerHistoryUUID, long globalCheckpoint, long maxSeqNo); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java index 7af3d690e3a99..65fd80325e716 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java @@ -201,7 +201,7 @@ private NodeConfigurationSource createNodeConfigurationSource(String leaderSeedA builder.put(XPackSettings.LOGSTASH_ENABLED.getKey(), false); builder.put(LicenseService.SELF_GENERATED_LICENSE_TYPE.getKey(), "trial"); // Let cluster state api return quickly in order to speed up auto follow tests: - builder.put(CcrSettings.CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT.getKey(), TimeValue.timeValueMillis(100)); + builder.put(CcrSettings.CCR_WAIT_FOR_METADATA_TIMEOUT.getKey(), TimeValue.timeValueMillis(100)); if (configureRemoteClusterViaNodeSettings() && leaderSeedAddress != null) { builder.put("cluster.remote.leader_cluster.seeds", leaderSeedAddress); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrSingleNodeTestCase.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrSingleNodeTestCase.java index e77a672f1fddb..ad8f545fa9dc0 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrSingleNodeTestCase.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrSingleNodeTestCase.java @@ -46,7 +46,7 @@ protected Settings nodeSettings() { builder.put(XPackSettings.LOGSTASH_ENABLED.getKey(), false); builder.put(LicenseService.SELF_GENERATED_LICENSE_TYPE.getKey(), "trial"); // Let cluster state api return quickly in order to speed up auto follow tests: - builder.put(CcrSettings.CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT.getKey(), TimeValue.timeValueMillis(100)); + builder.put(CcrSettings.CCR_WAIT_FOR_METADATA_TIMEOUT.getKey(), TimeValue.timeValueMillis(100)); return builder.build(); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/FollowerFailOverIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/FollowerFailOverIT.java index d58a2d0a0f18d..f03eeaaa03648 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/FollowerFailOverIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/FollowerFailOverIT.java @@ -6,25 +6,34 @@ package org.elasticsearch.xpack.ccr; +import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.indices.IndicesService; import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.xpack.CcrIntegTestCase; +import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction; import org.elasticsearch.xpack.core.ccr.action.PutFollowAction; +import org.elasticsearch.xpack.core.ccr.client.CcrClient; +import org.hamcrest.Matchers; import java.util.Locale; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -32,6 +41,7 @@ import static java.util.Collections.singletonMap; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.equalTo; public class FollowerFailOverIT extends CcrIntegTestCase { @@ -220,4 +230,56 @@ public void testAddNewReplicasOnFollower() throws Exception { pauseFollow("follower-index"); } + public void testReadRequestsReturnsLatestMappingVersion() throws Exception { + InternalTestCluster leaderCluster = getLeaderCluster(); + Settings nodeAttributes = Settings.builder().put("node.attr.box", "large").build(); + String dataNode = leaderCluster.startDataOnlyNode(nodeAttributes); + assertAcked( + leaderClient().admin().indices().prepareCreate("leader-index") + .setSettings(Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true") + .put("index.routing.allocation.require.box", "large")) + .get() + ); + ClusterService clusterService = leaderCluster.clusterService(dataNode); + ShardId shardId = clusterService.state().routingTable().index("leader-index").shard(0).shardId(); + IndicesService indicesService = leaderCluster.getInstance(IndicesService.class, dataNode); + IndexShard indexShard = indicesService.getShardOrNull(shardId); + // Block the ClusterService from exposing the cluster state with the mapping change. This makes the ClusterService + // have an older mapping version than the actual mapping version that IndexService will use to index "doc1". + final CountDownLatch latch = new CountDownLatch(1); + clusterService.addLowPriorityApplier(event -> { + IndexMetaData imd = event.state().metaData().index("leader-index"); + if (imd != null && imd.mapping() != null && + XContentMapValues.extractValue("properties.balance.type", imd.mapping().sourceAsMap()) != null) { + try { + logger.info("--> block ClusterService from exposing new mapping version"); + latch.await(); + } catch (Exception e) { + throw new AssertionError(e); + } + } + }); + leaderCluster.client().admin().indices().preparePutMapping().setType("doc") + .setSource("balance", "type=long").setTimeout(TimeValue.ZERO).get(); + IndexResponse indexResp = leaderCluster.client(dataNode).prepareIndex("leader-index", "doc", "1") + .setSource("{\"balance\": 100}", XContentType.JSON).setTimeout(TimeValue.ZERO).get(); + assertThat(indexResp.getResult(), equalTo(DocWriteResponse.Result.CREATED)); + assertThat(indexShard.getGlobalCheckpoint(), equalTo(0L)); + getFollowerCluster().startDataOnlyNode(nodeAttributes); + followerClient().execute(PutFollowAction.INSTANCE, putFollow("leader-index", "follower-index")).get(); + ensureFollowerGreen("follower-index"); + // Make sure at least one read-request which requires mapping sync is completed. + assertBusy(() -> { + CcrClient ccrClient = new CcrClient(followerClient()); + FollowStatsAction.StatsResponses responses = ccrClient.followStats(new FollowStatsAction.StatsRequest()).actionGet(); + long bytesRead = responses.getStatsResponses().stream().mapToLong(r -> r.status().bytesRead()).sum(); + assertThat(bytesRead, Matchers.greaterThan(0L)); + }, 60, TimeUnit.SECONDS); + latch.countDown(); + assertIndexFullyReplicatedToFollower("leader-index", "follower-index"); + pauseFollow("follower-index"); + } } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java index 2ac67a65fc1c6..4d4603d022f7d 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java @@ -1016,7 +1016,7 @@ private static Supplier localClusterStateSupplier(ClusterState... private ClusterService mockClusterService() { ClusterService clusterService = mock(ClusterService.class); ClusterSettings clusterSettings = - new ClusterSettings(Settings.EMPTY, Collections.singleton(CcrSettings.CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT)); + new ClusterSettings(Settings.EMPTY, Collections.singleton(CcrSettings.CCR_WAIT_FOR_METADATA_TIMEOUT)); when(clusterService.getClusterSettings()).thenReturn(clusterSettings); return clusterService; } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java index dde869f80bef9..4af9e7c23a276 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java @@ -111,7 +111,7 @@ private ShardFollowNodeTask createShardFollowTask(int concurrency, TestRun testR private final Map fromToSlot = new HashMap<>(); @Override - protected void innerUpdateMapping(LongConsumer handler, Consumer errorHandler) { + protected void innerUpdateMapping(long minRequiredMappingVersion, LongConsumer handler, Consumer errorHandler) { handler.accept(mappingVersion); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java index 9929241fc23c8..a7d07b6066732 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java @@ -988,7 +988,7 @@ private ShardFollowNodeTask createShardFollowTask(ShardFollowTaskParams params) 1L, "type", ShardFollowTask.NAME, "description", null, Collections.emptyMap(), followTask, scheduler, System::nanoTime) { @Override - protected void innerUpdateMapping(LongConsumer handler, Consumer errorHandler) { + protected void innerUpdateMapping(long minRequiredMappingVersion, LongConsumer handler, Consumer errorHandler) { Exception failure = mappingUpdateFailures.poll(); if (failure != null) { errorHandler.accept(failure); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java index 9dc7c6648eebc..8a3e374a24c5a 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java @@ -396,7 +396,7 @@ protected synchronized void onOperationsFetched(Translog.Operation[] operations) } @Override - protected void innerUpdateMapping(LongConsumer handler, Consumer errorHandler) { + protected void innerUpdateMapping(long minRequiredMappingVersion, LongConsumer handler, Consumer errorHandler) { // noop, as mapping updates are not tested handler.accept(1L); }