Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.ccr.action;

import com.carrotsearch.hppc.predicates.ObjectPredicate;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
Expand Down Expand Up @@ -42,6 +43,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -151,7 +153,7 @@ void updateAutoFollowers(ClusterState followerClusterState) {
AutoFollower autoFollower = new AutoFollower(remoteCluster, threadPool, this::updateStats, clusterService::state) {

@Override
void getLeaderClusterState(final String remoteCluster,
void getRemoteClusterState(final String remoteCluster,
final BiConsumer<ClusterState, Exception> handler) {
final ClusterStateRequest request = new ClusterStateRequest();
request.clear();
Expand All @@ -163,7 +165,7 @@ void getLeaderClusterState(final String remoteCluster,
remoteCluster,
request,
e -> handler.accept(null, e),
leaderClusterState -> handler.accept(leaderClusterState, null));
remoteClusterState -> handler.accept(remoteClusterState, null));
}

@Override
Expand Down Expand Up @@ -203,7 +205,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS

};
newAutoFollowers.put(remoteCluster, autoFollower);
autoFollower.autoFollowIndices();
autoFollower.start();
}

List<String> removedRemoteClusters = new ArrayList<>();
Expand Down Expand Up @@ -254,9 +256,9 @@ abstract static class AutoFollower {
this.followerClusterStateSupplier = followerClusterStateSupplier;
}

void autoFollowIndices() {
final ClusterState followerClusterState = followerClusterStateSupplier.get();
final AutoFollowMetadata autoFollowMetadata = followerClusterState.metaData().custom(AutoFollowMetadata.TYPE);
void start() {
final ClusterState clusterState = followerClusterStateSupplier.get();
final AutoFollowMetadata autoFollowMetadata = clusterState.metaData().custom(AutoFollowMetadata.TYPE);
if (autoFollowMetadata == null) {
LOGGER.info("AutoFollower for cluster [{}] has stopped, because there is no autofollow metadata", remoteCluster);
return;
Expand All @@ -274,51 +276,58 @@ void autoFollowIndices() {
this.autoFollowPatternsCountDown = new CountDown(patterns.size());
this.autoFollowResults = new AtomicArray<>(patterns.size());

getLeaderClusterState(remoteCluster, (leaderClusterState, e) -> {
if (leaderClusterState != null) {
assert e == null;

int i = 0;
for (String autoFollowPatternName : patterns) {
final int slot = i;
AutoFollowPattern autoFollowPattern = autoFollowMetadata.getPatterns().get(autoFollowPatternName);
Map<String, String> headers = autoFollowMetadata.getHeaders().get(autoFollowPatternName);
List<String> followedIndices = autoFollowMetadata.getFollowedLeaderIndexUUIDs().get(autoFollowPatternName);

final List<Index> leaderIndicesToFollow = getLeaderIndicesToFollow(autoFollowPattern, leaderClusterState,
followerClusterState, followedIndices);
if (leaderIndicesToFollow.isEmpty()) {
finalise(slot, new AutoFollowResult(autoFollowPatternName));
} else {
List<Tuple<String, AutoFollowPattern>> patternsForTheSameLeaderCluster = autoFollowMetadata.getPatterns()
.entrySet().stream()
.filter(item -> autoFollowPatternName.equals(item.getKey()) == false)
.filter(item -> remoteCluster.equals(item.getValue().getRemoteCluster()))
.map(item -> new Tuple<>(item.getKey(), item.getValue()))
.collect(Collectors.toList());

Consumer<AutoFollowResult> resultHandler = result -> finalise(slot, result);
checkAutoFollowPattern(autoFollowPatternName, remoteCluster, autoFollowPattern, leaderIndicesToFollow, headers,
patternsForTheSameLeaderCluster, resultHandler);
}
i++;
}
getRemoteClusterState(remoteCluster, (remoteClusterState, remoteError) -> {
if (remoteClusterState != null) {
assert remoteError == null;
autoFollowIndices(autoFollowMetadata, clusterState, remoteClusterState, patterns);
} else {
List<AutoFollowResult> results = new ArrayList<>(patterns.size());
for (String autoFollowPatternName : patterns) {
results.add(new AutoFollowResult(autoFollowPatternName, e));
assert remoteError != null;
for (int i = 0; i < patterns.size(); i++) {
String autoFollowPatternName = patterns.get(i);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe an assertion that remoteError is not null?

finalise(i, new AutoFollowResult(autoFollowPatternName, remoteError));
}
statsUpdater.accept(results);
}
});
}

private void autoFollowIndices(final AutoFollowMetadata autoFollowMetadata,
final ClusterState clusterState,
final ClusterState remoteClusterState,
final List<String> patterns) {
int i = 0;
for (String autoFollowPatternName : patterns) {
final int slot = i;
AutoFollowPattern autoFollowPattern = autoFollowMetadata.getPatterns().get(autoFollowPatternName);
Map<String, String> headers = autoFollowMetadata.getHeaders().get(autoFollowPatternName);
List<String> followedIndices = autoFollowMetadata.getFollowedLeaderIndexUUIDs().get(autoFollowPatternName);

final List<Index> leaderIndicesToFollow = getLeaderIndicesToFollow(autoFollowPattern, remoteClusterState,
clusterState, followedIndices);
if (leaderIndicesToFollow.isEmpty()) {
finalise(slot, new AutoFollowResult(autoFollowPatternName));
} else {
List<Tuple<String, AutoFollowPattern>> patternsForTheSameRemoteCluster = autoFollowMetadata.getPatterns()
.entrySet().stream()
.filter(item -> autoFollowPatternName.equals(item.getKey()) == false)
.filter(item -> remoteCluster.equals(item.getValue().getRemoteCluster()))
.map(item -> new Tuple<>(item.getKey(), item.getValue()))
.collect(Collectors.toList());

Consumer<AutoFollowResult> resultHandler = result -> finalise(slot, result);
checkAutoFollowPattern(autoFollowPatternName, remoteCluster, autoFollowPattern, leaderIndicesToFollow, headers,
patternsForTheSameRemoteCluster, resultHandler);
}
i++;
}
cleanFollowedRemoteIndices(remoteClusterState, patterns);
}

private void checkAutoFollowPattern(String autoFollowPattenName,
String leaderCluster,
String remoteCluster,
AutoFollowPattern autoFollowPattern,
List<Index> leaderIndicesToFollow,
Map<String, String> headers,
List<Tuple<String, AutoFollowPattern>> patternsForTheSameLeaderCluster,
List<Tuple<String, AutoFollowPattern>> patternsForTheSameRemoteCluster,
Consumer<AutoFollowResult> resultHandler) {

final CountDown leaderIndicesCountDown = new CountDown(leaderIndicesToFollow.size());
Expand All @@ -327,7 +336,7 @@ private void checkAutoFollowPattern(String autoFollowPattenName,
final Index indexToFollow = leaderIndicesToFollow.get(i);
final int slot = i;

List<String> otherMatchingPatterns = patternsForTheSameLeaderCluster.stream()
List<String> otherMatchingPatterns = patternsForTheSameRemoteCluster.stream()
.filter(otherPattern -> otherPattern.v2().match(indexToFollow.getName()))
.map(Tuple::v1)
.collect(Collectors.toList());
Expand All @@ -338,14 +347,13 @@ private void checkAutoFollowPattern(String autoFollowPattenName,
resultHandler.accept(new AutoFollowResult(autoFollowPattenName, results.asList()));
}
} else {
followLeaderIndex(autoFollowPattenName, leaderCluster, indexToFollow, autoFollowPattern, headers, error -> {
followLeaderIndex(autoFollowPattenName, remoteCluster, indexToFollow, autoFollowPattern, headers, error -> {
results.set(slot, new Tuple<>(indexToFollow, error));
if (leaderIndicesCountDown.countDown()) {
resultHandler.accept(new AutoFollowResult(autoFollowPattenName, results.asList()));
}
});
}

}
}

Expand Down Expand Up @@ -395,18 +403,18 @@ private void finalise(int slot, AutoFollowResult result) {
if (autoFollowPatternsCountDown.countDown()) {
statsUpdater.accept(autoFollowResults.asList());
// TODO: Remove scheduling here with using cluster state API's waitForMetadataVersion:
threadPool.schedule(TimeValue.timeValueMillis(2500), ThreadPool.Names.GENERIC, this::autoFollowIndices);
threadPool.schedule(TimeValue.timeValueMillis(2500), ThreadPool.Names.GENERIC, this::start);
}
}

static List<Index> getLeaderIndicesToFollow(AutoFollowPattern autoFollowPattern,
ClusterState leaderClusterState,
ClusterState remoteClusterState,
ClusterState followerClusterState,
List<String> followedIndexUUIDs) {
List<Index> leaderIndicesToFollow = new ArrayList<>();
for (IndexMetaData leaderIndexMetaData : leaderClusterState.getMetaData()) {
for (IndexMetaData leaderIndexMetaData : remoteClusterState.getMetaData()) {
if (autoFollowPattern.match(leaderIndexMetaData.getIndex().getName())) {
IndexRoutingTable indexRoutingTable = leaderClusterState.routingTable().index(leaderIndexMetaData.getIndex());
IndexRoutingTable indexRoutingTable = remoteClusterState.routingTable().index(leaderIndexMetaData.getIndex());
if (indexRoutingTable != null &&
// Leader indices can be in the cluster state, but not all primary shards may be ready yet.
// This checks ensures all primary shards have started, so that index following does not fail.
Expand Down Expand Up @@ -465,12 +473,63 @@ static Function<ClusterState, ClusterState> recordLeaderIndexAsFollowFunction(St
};
}

void cleanFollowedRemoteIndices(final ClusterState remoteClusterState, final List<String> patterns) {
updateAutoFollowMetadata(cleanFollowedRemoteIndices(remoteClusterState.metaData(), patterns), e -> {
if (e != null) {
LOGGER.warn("Error occured while cleaning followed leader indices", e);
}
});
}

static Function<ClusterState, ClusterState> cleanFollowedRemoteIndices(
final MetaData remoteMetadata, final List<String> autoFollowPatternNames) {
return currentState -> {
AutoFollowMetadata currentAutoFollowMetadata = currentState.metaData().custom(AutoFollowMetadata.TYPE);
Map<String, List<String>> autoFollowPatternNameToFollowedIndexUUIDs =
new HashMap<>(currentAutoFollowMetadata.getFollowedLeaderIndexUUIDs());
Set<String> remoteIndexUUIDS = new HashSet<>();
remoteMetadata.getIndices().values()
.forEach((ObjectPredicate<IndexMetaData>) value -> remoteIndexUUIDS.add(value.getIndexUUID()));

boolean requiresCSUpdate = false;
for (String autoFollowPatternName : autoFollowPatternNames) {
if (autoFollowPatternNameToFollowedIndexUUIDs.containsKey(autoFollowPatternName) == false) {
// A delete auto follow pattern request can have removed the auto follow pattern while we want to update
// the auto follow metadata with the fact that an index was successfully auto followed. If this
// happens, we can just skip this step.
continue;
}

List<String> followedIndexUUIDs =
new ArrayList<>(autoFollowPatternNameToFollowedIndexUUIDs.get(autoFollowPatternName));
// Remove leader indices that no longer exist in the remote cluster:
boolean entriesRemoved = followedIndexUUIDs.removeIf(
followedLeaderIndexUUID -> remoteIndexUUIDS.contains(followedLeaderIndexUUID) == false);
if (entriesRemoved) {
requiresCSUpdate = true;
}
autoFollowPatternNameToFollowedIndexUUIDs.put(autoFollowPatternName, followedIndexUUIDs);
}

if (requiresCSUpdate) {
final AutoFollowMetadata newAutoFollowMetadata = new AutoFollowMetadata(currentAutoFollowMetadata.getPatterns(),
autoFollowPatternNameToFollowedIndexUUIDs, currentAutoFollowMetadata.getHeaders());
return ClusterState.builder(currentState)
.metaData(MetaData.builder(currentState.getMetaData())
.putCustom(AutoFollowMetadata.TYPE, newAutoFollowMetadata).build())
.build();
} else {
return currentState;
}
};
}

/**
* Fetch the cluster state from the leader with the specified cluster alias
* @param remoteCluster the name of the leader cluster
* @param handler the callback to invoke
*/
abstract void getLeaderClusterState(
abstract void getRemoteClusterState(
String remoteCluster,
BiConsumer<ClusterState, Exception> handler
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
Expand All @@ -18,14 +19,17 @@
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.xpack.CcrIntegTestCase;
import org.elasticsearch.xpack.ccr.action.ShardFollowTask;
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata;
import org.elasticsearch.xpack.core.ccr.AutoFollowStats;
import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction;
import org.elasticsearch.xpack.core.ccr.action.DeleteAutoFollowPatternAction;
import org.elasticsearch.xpack.core.ccr.action.PutAutoFollowPatternAction;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.notNullValue;
Expand Down Expand Up @@ -76,6 +80,46 @@ public void testAutoFollow() throws Exception {
assertFalse(followerClient().admin().indices().exists(request).actionGet().isExists());
}

public void testCleanFollowedLeaderIndexUUIDs() throws Exception {
Settings leaderIndexSettings = Settings.builder()
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
.put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
.put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0)
.build();

putAutoFollowPatterns("my-pattern", new String[] {"logs-*"});
createLeaderIndex("logs-201901", leaderIndexSettings);
assertBusy(() -> {
AutoFollowStats autoFollowStats = getAutoFollowStats();
assertThat(autoFollowStats.getNumberOfSuccessfulFollowIndices(), equalTo(1L));

IndicesExistsRequest request = new IndicesExistsRequest("copy-logs-201901");
assertTrue(followerClient().admin().indices().exists(request).actionGet().isExists());

MetaData metaData = getFollowerCluster().clusterService().state().metaData();
String leaderIndexUUID = metaData.index("copy-logs-201901")
.getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY)
.get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_UUID_KEY);
AutoFollowMetadata autoFollowMetadata = metaData.custom(AutoFollowMetadata.TYPE);
assertThat(autoFollowMetadata, notNullValue());
List<String> followedLeaderIndixUUIDs = autoFollowMetadata.getFollowedLeaderIndexUUIDs().get("my-pattern");
assertThat(followedLeaderIndixUUIDs.size(), equalTo(1));
assertThat(followedLeaderIndixUUIDs.get(0), equalTo(leaderIndexUUID));
});

DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest("logs-201901");
assertAcked(leaderClient().admin().indices().delete(deleteIndexRequest).actionGet());

assertBusy(() -> {
AutoFollowMetadata autoFollowMetadata = getFollowerCluster().clusterService().state()
.metaData()
.custom(AutoFollowMetadata.TYPE);
assertThat(autoFollowMetadata, notNullValue());
List<String> followedLeaderIndixUUIDs = autoFollowMetadata.getFollowedLeaderIndexUUIDs().get("my-pattern");
assertThat(followedLeaderIndixUUIDs.size(), equalTo(0));
});
}

public void testAutoFollowManyIndices() throws Exception {
Settings leaderIndexSettings = Settings.builder()
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
Expand Down
Loading