Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ public Collection<Object> createComponents(

return Arrays.asList(
ccrLicenseChecker,
new AutoFollowCoordinator(client, threadPool, clusterService, ccrLicenseChecker)
new AutoFollowCoordinator(client, clusterService, ccrLicenseChecker)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,9 @@ public void checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs(
client.getRemoteClusterClient(clusterAlias),
request,
onFailure,
leaderClusterState -> {
IndexMetaData leaderIndexMetaData = leaderClusterState.getMetaData().index(leaderIndex);
remoteClusterStateRsp -> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess “Rsp is Response? Sorry to be a nit, can we spell it out?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was my attempt to keep line length at bay. I will change it.

ClusterState remoteClusterState = remoteClusterStateRsp.getState();
IndexMetaData leaderIndexMetaData = remoteClusterState.getMetaData().index(leaderIndex);
if (leaderIndexMetaData == null) {
onFailure.accept(new IndexNotFoundException(leaderIndex));
return;
Expand Down Expand Up @@ -159,7 +160,7 @@ public void checkRemoteClusterLicenseAndFetchClusterState(
final String clusterAlias,
final ClusterStateRequest request,
final Consumer<Exception> onFailure,
final Consumer<ClusterState> leaderClusterStateConsumer) {
final Consumer<ClusterStateResponse> leaderClusterStateConsumer) {
try {
Client remoteClient = systemClient(client.getRemoteClusterClient(clusterAlias));
checkRemoteClusterLicenseAndFetchClusterState(
Expand Down Expand Up @@ -199,7 +200,7 @@ private void checkRemoteClusterLicenseAndFetchClusterState(
final Client remoteClient,
final ClusterStateRequest request,
final Consumer<Exception> onFailure,
final Consumer<ClusterState> leaderClusterStateConsumer,
final Consumer<ClusterStateResponse> leaderClusterStateConsumer,
final Function<RemoteClusterLicenseChecker.LicenseCheck, ElasticsearchStatusException> nonCompliantLicense,
final Function<Exception, ElasticsearchStatusException> unknownLicense) {
// we have to check the license on the remote cluster
Expand All @@ -211,7 +212,7 @@ private void checkRemoteClusterLicenseAndFetchClusterState(
public void onResponse(final RemoteClusterLicenseChecker.LicenseCheck licenseCheck) {
if (licenseCheck.isSuccess()) {
final ActionListener<ClusterStateResponse> clusterStateListener =
ActionListener.wrap(s -> leaderClusterStateConsumer.accept(s.getState()), onFailure);
ActionListener.wrap(leaderClusterStateConsumer::accept, onFailure);
// following an index in remote cluster, so use remote client to fetch leader index metadata
remoteClient.admin().cluster().state(request, clusterStateListener);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
Expand All @@ -23,13 +24,11 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.CopyOnWriteHashMap;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.license.LicenseUtils;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ccr.CcrLicenseChecker;
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata;
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata.AutoFollowPattern;
Expand Down Expand Up @@ -62,7 +61,6 @@ public class AutoFollowCoordinator implements ClusterStateListener {
private static final int MAX_AUTO_FOLLOW_ERRORS = 256;

private final Client client;
private final ThreadPool threadPool;
private final ClusterService clusterService;
private final CcrLicenseChecker ccrLicenseChecker;

Expand All @@ -76,11 +74,9 @@ public class AutoFollowCoordinator implements ClusterStateListener {

public AutoFollowCoordinator(
Client client,
ThreadPool threadPool,
ClusterService clusterService,
CcrLicenseChecker ccrLicenseChecker) {
this.client = client;
this.threadPool = threadPool;
this.clusterService = clusterService;
this.ccrLicenseChecker = Objects.requireNonNull(ccrLicenseChecker, "ccrLicenseChecker");
clusterService.addListener(this);
Expand Down Expand Up @@ -146,22 +142,24 @@ void updateAutoFollowers(ClusterState followerClusterState) {

Map<String, AutoFollower> newAutoFollowers = new HashMap<>(newRemoteClusters.size());
for (String remoteCluster : newRemoteClusters) {
AutoFollower autoFollower = new AutoFollower(remoteCluster, threadPool, this::updateStats, clusterService::state) {
AutoFollower autoFollower = new AutoFollower(remoteCluster, this::updateStats, clusterService::state) {

@Override
void getLeaderClusterState(final String remoteCluster,
final BiConsumer<ClusterState, Exception> handler) {
void getRemoteClusterState(final String remoteCluster,
final long metadataVersion,
final BiConsumer<ClusterStateResponse, Exception> handler) {
final ClusterStateRequest request = new ClusterStateRequest();
request.clear();
request.metaData(true);
request.routingTable(true);
request.waitForMetaDataVersion(metadataVersion);
// TODO: set non-compliant status on auto-follow coordination that can be viewed via a stats API
ccrLicenseChecker.checkRemoteClusterLicenseAndFetchClusterState(
client,
remoteCluster,
request,
e -> handler.accept(null, e),
leaderClusterState -> handler.accept(leaderClusterState, null));
remoteClusterStateRsp -> handler.accept(remoteClusterStateRsp, null));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment here about spelling out to Response.

}

@Override
Expand Down Expand Up @@ -235,19 +233,17 @@ public void clusterChanged(ClusterChangedEvent event) {
abstract static class AutoFollower {

private final String remoteCluster;
private final ThreadPool threadPool;
private final Consumer<List<AutoFollowResult>> statsUpdater;
private final Supplier<ClusterState> followerClusterStateSupplier;

private volatile long metadataVersion = 0;
private volatile CountDown autoFollowPatternsCountDown;
private volatile AtomicArray<AutoFollowResult> autoFollowResults;

AutoFollower(final String remoteCluster,
final ThreadPool threadPool,
final Consumer<List<AutoFollowResult>> statsUpdater,
final Supplier<ClusterState> followerClusterStateSupplier) {
this.remoteCluster = remoteCluster;
this.threadPool = threadPool;
this.statsUpdater = statsUpdater;
this.followerClusterStateSupplier = followerClusterStateSupplier;
}
Expand All @@ -272,10 +268,15 @@ void autoFollowIndices() {
this.autoFollowPatternsCountDown = new CountDown(patterns.size());
this.autoFollowResults = new AtomicArray<>(patterns.size());

getLeaderClusterState(remoteCluster, (leaderClusterState, e) -> {
if (leaderClusterState != null) {
getRemoteClusterState(remoteCluster, metadataVersion + 1, (leaderClusterStateRsp, e) -> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remote instead of leader.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And of course same comment to spell out.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#36408 already changes this method, so when I merge in master after that pr is merged it should be good.

if (leaderClusterStateRsp != null) {
assert e == null;
if (leaderClusterStateRsp.isWaitForTimedOut()) {
autoFollowIndices();
return;
}

ClusterState leaderClusterState = leaderClusterStateRsp.getState();
int i = 0;
for (String autoFollowPatternName : patterns) {
final int slot = i;
Expand Down Expand Up @@ -392,8 +393,7 @@ private void finalise(int slot, AutoFollowResult result) {
autoFollowResults.set(slot, result);
if (autoFollowPatternsCountDown.countDown()) {
statsUpdater.accept(autoFollowResults.asList());
// TODO: Remove scheduling here with using cluster state API's waitForMetadataVersion:
threadPool.schedule(TimeValue.timeValueMillis(2500), ThreadPool.Names.GENERIC, this::autoFollowIndices);
autoFollowIndices();
}
}

Expand Down Expand Up @@ -461,13 +461,15 @@ static Function<ClusterState, ClusterState> recordLeaderIndexAsFollowFunction(St
}

/**
* Fetch the cluster state from the leader with the specified cluster alias
* Fetch a remote cluster state from with the specified cluster alias
* @param remoteCluster the name of the leader cluster
* @param metadataVersion the last seen metadata version
* @param handler the callback to invoke
*/
abstract void getLeaderClusterState(
abstract void getRemoteClusterState(
String remoteCluster,
BiConsumer<ClusterState, Exception> handler
long metadataVersion,
BiConsumer<ClusterStateResponse, Exception> handler
);

abstract void createAndFollow(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
Expand Down Expand Up @@ -80,7 +81,7 @@ protected void masterOperation(PutAutoFollowPatternAction.Request request,
.filter(e -> ShardFollowTask.HEADER_FILTERS.contains(e.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

Consumer<ClusterState> consumer = remoteClusterState -> {
Consumer<ClusterStateResponse> consumer = remoteClusterState -> {
String[] indices = request.getLeaderIndexPatterns().toArray(new String[0]);
ccrLicenseChecker.hasPrivilegesToFollowIndices(remoteClient, indices, e -> {
if (e == null) {
Expand All @@ -94,7 +95,7 @@ protected AcknowledgedResponse newResponse(boolean acknowledged) {

@Override
public ClusterState execute(ClusterState currentState) throws Exception {
return innerPut(request, filteredHeaders, currentState, remoteClusterState);
return innerPut(request, filteredHeaders, currentState, remoteClusterState.getState());
}
});
} else {
Expand Down
Loading