Skip to content

Commit 4a33821

Browse files
committed
[CCR] Change AutofollowCoordinator to use wait_for_metadata_version (#36264)
Changed AutoFollowCoordinator to make use of the wait_for_metadata_version feature in the cluster state API and removed the hard-coded poll interval. Originates from #35895. Relates to #33007.
1 parent 8d3dec5 commit 4a33821

File tree

5 files changed

+183
-55
lines changed

5 files changed

+183
-55
lines changed

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ public Collection<Object> createComponents(
161161

162162
return Arrays.asList(
163163
ccrLicenseChecker,
164-
new AutoFollowCoordinator(client, threadPool, clusterService, ccrLicenseChecker)
164+
new AutoFollowCoordinator(client, clusterService, ccrLicenseChecker)
165165
);
166166
}
167167

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -122,8 +122,9 @@ public void checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs(
122122
client.getRemoteClusterClient(clusterAlias),
123123
request,
124124
onFailure,
125-
leaderClusterState -> {
126-
IndexMetaData leaderIndexMetaData = leaderClusterState.getMetaData().index(leaderIndex);
125+
remoteClusterStateResponse -> {
126+
ClusterState remoteClusterState = remoteClusterStateResponse.getState();
127+
IndexMetaData leaderIndexMetaData = remoteClusterState.getMetaData().index(leaderIndex);
127128
if (leaderIndexMetaData == null) {
128129
onFailure.accept(new IndexNotFoundException(leaderIndex));
129130
return;
@@ -160,7 +161,7 @@ public void checkRemoteClusterLicenseAndFetchClusterState(
160161
final String clusterAlias,
161162
final ClusterStateRequest request,
162163
final Consumer<Exception> onFailure,
163-
final Consumer<ClusterState> leaderClusterStateConsumer) {
164+
final Consumer<ClusterStateResponse> leaderClusterStateConsumer) {
164165
try {
165166
Client remoteClient = systemClient(client.getRemoteClusterClient(clusterAlias));
166167
checkRemoteClusterLicenseAndFetchClusterState(
@@ -200,7 +201,7 @@ private void checkRemoteClusterLicenseAndFetchClusterState(
200201
final Client remoteClient,
201202
final ClusterStateRequest request,
202203
final Consumer<Exception> onFailure,
203-
final Consumer<ClusterState> leaderClusterStateConsumer,
204+
final Consumer<ClusterStateResponse> leaderClusterStateConsumer,
204205
final Function<RemoteClusterLicenseChecker.LicenseCheck, ElasticsearchStatusException> nonCompliantLicense,
205206
final Function<Exception, ElasticsearchStatusException> unknownLicense) {
206207
// we have to check the license on the remote cluster
@@ -212,7 +213,7 @@ private void checkRemoteClusterLicenseAndFetchClusterState(
212213
public void onResponse(final RemoteClusterLicenseChecker.LicenseCheck licenseCheck) {
213214
if (licenseCheck.isSuccess()) {
214215
final ActionListener<ClusterStateResponse> clusterStateListener =
215-
ActionListener.wrap(s -> leaderClusterStateConsumer.accept(s.getState()), onFailure);
216+
ActionListener.wrap(leaderClusterStateConsumer::accept, onFailure);
216217
// following an index in remote cluster, so use remote client to fetch leader index metadata
217218
remoteClient.admin().cluster().state(request, clusterStateListener);
218219
} else {

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.elasticsearch.ExceptionsHelper;
1414
import org.elasticsearch.action.ActionListener;
1515
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
16+
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
1617
import org.elasticsearch.client.Client;
1718
import org.elasticsearch.cluster.ClusterChangedEvent;
1819
import org.elasticsearch.cluster.ClusterState;
@@ -24,13 +25,11 @@
2425
import org.elasticsearch.cluster.service.ClusterService;
2526
import org.elasticsearch.common.collect.CopyOnWriteHashMap;
2627
import org.elasticsearch.common.collect.Tuple;
27-
import org.elasticsearch.common.unit.TimeValue;
2828
import org.elasticsearch.common.util.concurrent.AtomicArray;
2929
import org.elasticsearch.common.util.concurrent.CountDown;
3030
import org.elasticsearch.index.Index;
3131
import org.elasticsearch.index.IndexSettings;
3232
import org.elasticsearch.license.LicenseUtils;
33-
import org.elasticsearch.threadpool.ThreadPool;
3433
import org.elasticsearch.xpack.ccr.CcrLicenseChecker;
3534
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata;
3635
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata.AutoFollowPattern;
@@ -64,7 +63,6 @@ public class AutoFollowCoordinator implements ClusterStateListener {
6463
private static final int MAX_AUTO_FOLLOW_ERRORS = 256;
6564

6665
private final Client client;
67-
private final ThreadPool threadPool;
6866
private final ClusterService clusterService;
6967
private final CcrLicenseChecker ccrLicenseChecker;
7068

@@ -78,11 +76,9 @@ public class AutoFollowCoordinator implements ClusterStateListener {
7876

7977
public AutoFollowCoordinator(
8078
Client client,
81-
ThreadPool threadPool,
8279
ClusterService clusterService,
8380
CcrLicenseChecker ccrLicenseChecker) {
8481
this.client = client;
85-
this.threadPool = threadPool;
8682
this.clusterService = clusterService;
8783
this.ccrLicenseChecker = Objects.requireNonNull(ccrLicenseChecker, "ccrLicenseChecker");
8884
clusterService.addListener(this);
@@ -148,22 +144,24 @@ void updateAutoFollowers(ClusterState followerClusterState) {
148144

149145
Map<String, AutoFollower> newAutoFollowers = new HashMap<>(newRemoteClusters.size());
150146
for (String remoteCluster : newRemoteClusters) {
151-
AutoFollower autoFollower = new AutoFollower(remoteCluster, threadPool, this::updateStats, clusterService::state) {
147+
AutoFollower autoFollower = new AutoFollower(remoteCluster, this::updateStats, clusterService::state) {
152148

153149
@Override
154150
void getRemoteClusterState(final String remoteCluster,
155-
final BiConsumer<ClusterState, Exception> handler) {
151+
final long metadataVersion,
152+
final BiConsumer<ClusterStateResponse, Exception> handler) {
156153
final ClusterStateRequest request = new ClusterStateRequest();
157154
request.clear();
158155
request.metaData(true);
159156
request.routingTable(true);
157+
request.waitForMetaDataVersion(metadataVersion);
160158
// TODO: set non-compliant status on auto-follow coordination that can be viewed via a stats API
161159
ccrLicenseChecker.checkRemoteClusterLicenseAndFetchClusterState(
162160
client,
163161
remoteCluster,
164162
request,
165163
e -> handler.accept(null, e),
166-
remoteClusterState -> handler.accept(remoteClusterState, null));
164+
remoteClusterStateResponse -> handler.accept(remoteClusterStateResponse, null));
167165
}
168166

169167
@Override
@@ -237,19 +235,17 @@ public void clusterChanged(ClusterChangedEvent event) {
237235
abstract static class AutoFollower {
238236

239237
private final String remoteCluster;
240-
private final ThreadPool threadPool;
241238
private final Consumer<List<AutoFollowResult>> statsUpdater;
242239
private final Supplier<ClusterState> followerClusterStateSupplier;
243240

241+
private volatile long metadataVersion = 0;
244242
private volatile CountDown autoFollowPatternsCountDown;
245243
private volatile AtomicArray<AutoFollowResult> autoFollowResults;
246244

247245
AutoFollower(final String remoteCluster,
248-
final ThreadPool threadPool,
249246
final Consumer<List<AutoFollowResult>> statsUpdater,
250247
final Supplier<ClusterState> followerClusterStateSupplier) {
251248
this.remoteCluster = remoteCluster;
252-
this.threadPool = threadPool;
253249
this.statsUpdater = statsUpdater;
254250
this.followerClusterStateSupplier = followerClusterStateSupplier;
255251
}
@@ -274,9 +270,15 @@ void start() {
274270
this.autoFollowPatternsCountDown = new CountDown(patterns.size());
275271
this.autoFollowResults = new AtomicArray<>(patterns.size());
276272

277-
getRemoteClusterState(remoteCluster, (remoteClusterState, remoteError) -> {
278-
if (remoteClusterState != null) {
273+
getRemoteClusterState(remoteCluster, metadataVersion + 1, (remoteClusterStateResponse, remoteError) -> {
274+
if (remoteClusterStateResponse != null) {
279275
assert remoteError == null;
276+
if (remoteClusterStateResponse.isWaitForTimedOut()) {
277+
start();
278+
return;
279+
}
280+
ClusterState remoteClusterState = remoteClusterStateResponse.getState();
281+
metadataVersion = remoteClusterState.metaData().version();
280282
autoFollowIndices(autoFollowMetadata, clusterState, remoteClusterState, patterns);
281283
} else {
282284
assert remoteError != null;
@@ -400,8 +402,7 @@ private void finalise(int slot, AutoFollowResult result) {
400402
autoFollowResults.set(slot, result);
401403
if (autoFollowPatternsCountDown.countDown()) {
402404
statsUpdater.accept(autoFollowResults.asList());
403-
// TODO: Remove scheduling here with using cluster state API's waitForMetadataVersion:
404-
threadPool.schedule(TimeValue.timeValueMillis(2500), ThreadPool.Names.GENERIC, this::start);
405+
start();
405406
}
406407
}
407408

@@ -520,13 +521,15 @@ static Function<ClusterState, ClusterState> cleanFollowedRemoteIndices(
520521
}
521522

522523
/**
523-
* Fetch the cluster state from the leader with the specified cluster alias
524+
* Fetch the cluster state from the remote cluster with the specified cluster alias
524525
* @param remoteCluster the name of the leader cluster
526+
* @param metadataVersion the remote metadata version to wait for (callers pass the last seen metadata version plus one)
525527
* @param handler the callback to invoke
526528
*/
527529
abstract void getRemoteClusterState(
528530
String remoteCluster,
529-
BiConsumer<ClusterState, Exception> handler
531+
long metadataVersion,
532+
BiConsumer<ClusterStateResponse, Exception> handler
530533
);
531534

532535
abstract void createAndFollow(

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import org.elasticsearch.action.ActionListener;
99
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
10+
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
1011
import org.elasticsearch.action.support.ActionFilters;
1112
import org.elasticsearch.action.support.master.AcknowledgedResponse;
1213
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
@@ -82,7 +83,7 @@ protected void masterOperation(PutAutoFollowPatternAction.Request request,
8283
.filter(e -> ShardFollowTask.HEADER_FILTERS.contains(e.getKey()))
8384
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
8485

85-
Consumer<ClusterState> consumer = remoteClusterState -> {
86+
Consumer<ClusterStateResponse> consumer = remoteClusterState -> {
8687
String[] indices = request.getLeaderIndexPatterns().toArray(new String[0]);
8788
ccrLicenseChecker.hasPrivilegesToFollowIndices(remoteClient, indices, e -> {
8889
if (e == null) {
@@ -96,7 +97,7 @@ protected AcknowledgedResponse newResponse(boolean acknowledged) {
9697

9798
@Override
9899
public ClusterState execute(ClusterState currentState) throws Exception {
99-
return innerPut(request, filteredHeaders, currentState, remoteClusterState);
100+
return innerPut(request, filteredHeaders, currentState, remoteClusterState.getState());
100101
}
101102
});
102103
} else {

0 commit comments

Comments
 (0)