Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ public final class AutoFollowStats {
static final ParseField RECENT_AUTO_FOLLOW_ERRORS = new ParseField("recent_auto_follow_errors");
static final ParseField LEADER_INDEX = new ParseField("leader_index");
static final ParseField AUTO_FOLLOW_EXCEPTION = new ParseField("auto_follow_exception");
static final ParseField AUTO_FOLLOWED_CLUSTERS = new ParseField("auto_followed_clusters");
static final ParseField CLUSTER_NAME = new ParseField("cluster_name");
static final ParseField TIME_SINCE_LAST_AUTO_FOLLOW_MILLIS = new ParseField("time_since_last_auto_follow_millis");
static final ParseField LAST_SEEN_METADATA_VERSION = new ParseField("last_seen_metadata_version");

@SuppressWarnings("unchecked")
static final ConstructingObjectParser<AutoFollowStats, Void> STATS_PARSER = new ConstructingObjectParser<>("auto_follow_stats",
Expand All @@ -48,6 +52,10 @@ public final class AutoFollowStats {
(Long) args[2],
new TreeMap<>(
((List<Map.Entry<String, ElasticsearchException>>) args[3])
.stream()
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))),
new TreeMap<>(
((List<Map.Entry<String, AutoFollowedCluster>>) args[4])
.stream()
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)))
));
Expand All @@ -57,33 +65,47 @@ public final class AutoFollowStats {
"auto_follow_stats_errors",
args -> new AbstractMap.SimpleEntry<>((String) args[0], (ElasticsearchException) args[1]));

private static final ConstructingObjectParser<Map.Entry<String, AutoFollowedCluster>, Void> AUTO_FOLLOWED_CLUSTERS_PARSER =
new ConstructingObjectParser<>(
"auto_followed_clusters",
args -> new AbstractMap.SimpleEntry<>((String) args[0], new AutoFollowedCluster((Long) args[1], (Long) args[2])));

static {
AUTO_FOLLOW_EXCEPTIONS_PARSER.declareString(ConstructingObjectParser.constructorArg(), LEADER_INDEX);
AUTO_FOLLOW_EXCEPTIONS_PARSER.declareObject(
ConstructingObjectParser.constructorArg(),
(p, c) -> ElasticsearchException.fromXContent(p),
AUTO_FOLLOW_EXCEPTION);

AUTO_FOLLOWED_CLUSTERS_PARSER.declareString(ConstructingObjectParser.constructorArg(), CLUSTER_NAME);
AUTO_FOLLOWED_CLUSTERS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), TIME_SINCE_LAST_AUTO_FOLLOW_MILLIS);
AUTO_FOLLOWED_CLUSTERS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), LAST_SEEN_METADATA_VERSION);

STATS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_FAILED_INDICES_AUTO_FOLLOWED);
STATS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_FAILED_REMOTE_CLUSTER_STATE_REQUESTS);
STATS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_SUCCESSFUL_INDICES_AUTO_FOLLOWED);
STATS_PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), AUTO_FOLLOW_EXCEPTIONS_PARSER,
RECENT_AUTO_FOLLOW_ERRORS);
STATS_PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), AUTO_FOLLOWED_CLUSTERS_PARSER,
AUTO_FOLLOWED_CLUSTERS);
}

private final long numberOfFailedFollowIndices;
private final long numberOfFailedRemoteClusterStateRequests;
private final long numberOfSuccessfulFollowIndices;
private final NavigableMap<String, ElasticsearchException> recentAutoFollowErrors;
private final NavigableMap<String, AutoFollowedCluster> autoFollowedClusters;

AutoFollowStats(long numberOfFailedFollowIndices,
long numberOfFailedRemoteClusterStateRequests,
long numberOfSuccessfulFollowIndices,
NavigableMap<String, ElasticsearchException> recentAutoFollowErrors) {
NavigableMap<String, ElasticsearchException> recentAutoFollowErrors,
NavigableMap<String, AutoFollowedCluster> autoFollowedClusters) {
this.numberOfFailedFollowIndices = numberOfFailedFollowIndices;
this.numberOfFailedRemoteClusterStateRequests = numberOfFailedRemoteClusterStateRequests;
this.numberOfSuccessfulFollowIndices = numberOfSuccessfulFollowIndices;
this.recentAutoFollowErrors = recentAutoFollowErrors;
this.autoFollowedClusters = autoFollowedClusters;
}

public long getNumberOfFailedFollowIndices() {
Expand All @@ -102,4 +124,27 @@ public NavigableMap<String, ElasticsearchException> getRecentAutoFollowErrors()
return recentAutoFollowErrors;
}

public NavigableMap<String, AutoFollowedCluster> getAutoFollowedClusters() {
return autoFollowedClusters;
}

public static class AutoFollowedCluster {

private final long timeSinceLastAutoFollowMillis;
private final long lastSeenMetadataVersion;

public AutoFollowedCluster(long timeSinceLastAutoFollowMillis, long lastSeenMetadataVersion) {
this.timeSinceLastAutoFollowMillis = timeSinceLastAutoFollowMillis;
this.lastSeenMetadataVersion = lastSeenMetadataVersion;
}

public long getTimeSinceLastAutoFollowMillis() {
return timeSinceLastAutoFollowMillis;
}

public long getLastSeenMetadataVersion() {
return lastSeenMetadataVersion;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.client.ccr;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.client.ccr.AutoFollowStats.AutoFollowedCluster;
import org.elasticsearch.client.ccr.IndicesFollowStats.ShardFollowStats;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.unit.ByteSizeUnit;
Expand Down Expand Up @@ -185,6 +186,19 @@ private static void toXContent(CcrStatsResponse response, XContentBuilder builde
builder.endObject();
}
builder.endArray();
builder.startArray(AutoFollowStats.AUTO_FOLLOWED_CLUSTERS.getPreferredName());
for (Map.Entry<String, AutoFollowedCluster> entry : autoFollowStats.getAutoFollowedClusters().entrySet()) {
builder.startObject();
{
builder.field(AutoFollowStats.CLUSTER_NAME.getPreferredName(), entry.getKey());
builder.field(AutoFollowStats.TIME_SINCE_LAST_AUTO_FOLLOW_MILLIS.getPreferredName(),
entry.getValue().getTimeSinceLastAutoFollowMillis());
builder.field(AutoFollowStats.LAST_SEEN_METADATA_VERSION.getPreferredName(),
entry.getValue().getLastSeenMetadataVersion());
}
builder.endObject();
}
builder.endArray();
}
builder.endObject();

Expand Down Expand Up @@ -315,11 +329,16 @@ private static AutoFollowStats randomAutoFollowStats() {
for (int i = 0; i < count; i++) {
readExceptions.put("" + i, new ElasticsearchException(new IllegalStateException("index [" + i + "]")));
}
final NavigableMap<String, AutoFollowedCluster> autoFollowClusters = new TreeMap<>();
for (int i = 0; i < count; i++) {
autoFollowClusters.put("" + i, new AutoFollowedCluster(randomLong(), randomNonNegativeLong()));
}
return new AutoFollowStats(
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
readExceptions
readExceptions,
autoFollowClusters
);
}

Expand Down
4 changes: 3 additions & 1 deletion docs/reference/ccr/apis/get-ccr-stats.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ The API returns the following results:
"number_of_failed_follow_indices" : 0,
"number_of_failed_remote_cluster_state_requests" : 0,
"number_of_successful_follow_indices" : 1,
"recent_auto_follow_errors" : []
"recent_auto_follow_errors" : [],
"auto_followed_clusters" : []
},
"follow_stats" : {
"indices" : [
Expand Down Expand Up @@ -151,6 +152,7 @@ The API returns the following results:
// TESTRESPONSE[s/"number_of_failed_remote_cluster_state_requests" : 0/"number_of_failed_remote_cluster_state_requests" : $body.auto_follow_stats.number_of_failed_remote_cluster_state_requests/]
// TESTRESPONSE[s/"number_of_successful_follow_indices" : 1/"number_of_successful_follow_indices" : $body.auto_follow_stats.number_of_successful_follow_indices/]
// TESTRESPONSE[s/"recent_auto_follow_errors" : \[\]/"recent_auto_follow_errors" : $body.auto_follow_stats.recent_auto_follow_errors/]
// TESTRESPONSE[s/"auto_followed_clusters" : \[\]/"auto_followed_clusters" : $body.auto_follow_stats.auto_followed_clusters/]
// TESTRESPONSE[s/"leader_global_checkpoint" : 1024/"leader_global_checkpoint" : $body.follow_stats.indices.0.shards.0.leader_global_checkpoint/]
// TESTRESPONSE[s/"leader_max_seq_no" : 1536/"leader_max_seq_no" : $body.follow_stats.indices.0.shards.0.leader_max_seq_no/]
// TESTRESPONSE[s/"follower_global_checkpoint" : 768/"follower_global_checkpoint" : $body.follow_stats.indices.0.shards.0.follower_global_checkpoint/]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ public Collection<Object> createComponents(

return Arrays.asList(
ccrLicenseChecker,
new AutoFollowCoordinator(client, clusterService, ccrLicenseChecker)
new AutoFollowCoordinator(client, clusterService, ccrLicenseChecker, threadPool::relativeTimeInMillis)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,12 @@
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static org.elasticsearch.xpack.core.ccr.AutoFollowStats.AutoFollowedCluster;

/**
* A component that runs only on the elected master node and follows leader indices automatically
* if they match with a auto follow pattern that is defined in {@link AutoFollowMetadata}.
Expand All @@ -67,6 +70,7 @@ public class AutoFollowCoordinator implements ClusterStateListener {
private final Client client;
private final ClusterService clusterService;
private final CcrLicenseChecker ccrLicenseChecker;
private final LongSupplier relativeMillisTimeProvider;

private volatile Map<String, AutoFollower> autoFollowers = Collections.emptyMap();

Expand All @@ -79,10 +83,13 @@ public class AutoFollowCoordinator implements ClusterStateListener {
public AutoFollowCoordinator(
Client client,
ClusterService clusterService,
CcrLicenseChecker ccrLicenseChecker) {
CcrLicenseChecker ccrLicenseChecker,
LongSupplier relativeMillisTimeProvider) {

this.client = client;
this.clusterService = clusterService;
this.ccrLicenseChecker = Objects.requireNonNull(ccrLicenseChecker, "ccrLicenseChecker");
this.relativeMillisTimeProvider = relativeMillisTimeProvider;
clusterService.addListener(this);
this.recentAutoFollowErrors = new LinkedHashMap<String, ElasticsearchException>() {
@Override
Expand All @@ -93,11 +100,26 @@ protected boolean removeEldestEntry(final Map.Entry<String, ElasticsearchExcepti
}

public synchronized AutoFollowStats getStats() {
final Map<String, AutoFollower> autoFollowers = this.autoFollowers;
final TreeMap<String, AutoFollowedCluster> timesSinceLastAutoFollowPerRemoteCluster = new TreeMap<>();
for (Map.Entry<String, AutoFollower> entry : autoFollowers.entrySet()) {
long lastAutoFollowTimeInMillis = entry.getValue().lastAutoFollowTimeInMillis;
long lastSeenMetadataVersion = entry.getValue().metadataVersion;
if (lastAutoFollowTimeInMillis != -1) {
long timeSinceLastAutoFollowInMillis = relativeMillisTimeProvider.getAsLong() - lastAutoFollowTimeInMillis;
timesSinceLastAutoFollowPerRemoteCluster.put(entry.getKey(),
new AutoFollowedCluster(timeSinceLastAutoFollowInMillis, lastSeenMetadataVersion));
} else {
timesSinceLastAutoFollowPerRemoteCluster.put(entry.getKey(), new AutoFollowedCluster(-1L, lastSeenMetadataVersion));
}
}

return new AutoFollowStats(
numberOfFailedIndicesAutoFollowed,
numberOfFailedRemoteClusterStateRequests,
numberOfSuccessfulIndicesAutoFollowed,
new TreeMap<>(recentAutoFollowErrors)
new TreeMap<>(recentAutoFollowErrors),
timesSinceLastAutoFollowPerRemoteCluster
);
}

Expand Down Expand Up @@ -146,7 +168,8 @@ void updateAutoFollowers(ClusterState followerClusterState) {

Map<String, AutoFollower> newAutoFollowers = new HashMap<>(newRemoteClusters.size());
for (String remoteCluster : newRemoteClusters) {
AutoFollower autoFollower = new AutoFollower(remoteCluster, this::updateStats, clusterService::state) {
AutoFollower autoFollower =
new AutoFollower(remoteCluster, this::updateStats, clusterService::state, relativeMillisTimeProvider) {

@Override
void getRemoteClusterState(final String remoteCluster,
Expand Down Expand Up @@ -239,20 +262,25 @@ abstract static class AutoFollower {
private final String remoteCluster;
private final Consumer<List<AutoFollowResult>> statsUpdater;
private final Supplier<ClusterState> followerClusterStateSupplier;
private final LongSupplier relativeTimeProvider;

private volatile long lastAutoFollowTimeInMillis = -1;
private volatile long metadataVersion = 0;
private volatile CountDown autoFollowPatternsCountDown;
private volatile AtomicArray<AutoFollowResult> autoFollowResults;

AutoFollower(final String remoteCluster,
final Consumer<List<AutoFollowResult>> statsUpdater,
final Supplier<ClusterState> followerClusterStateSupplier) {
final Supplier<ClusterState> followerClusterStateSupplier,
LongSupplier relativeTimeProvider) {
this.remoteCluster = remoteCluster;
this.statsUpdater = statsUpdater;
this.followerClusterStateSupplier = followerClusterStateSupplier;
this.relativeTimeProvider = relativeTimeProvider;
}

void start() {
lastAutoFollowTimeInMillis = relativeTimeProvider.getAsLong();
final ClusterState clusterState = followerClusterStateSupplier.get();
final AutoFollowMetadata autoFollowMetadata = clusterState.metaData().custom(AutoFollowMetadata.TYPE);
if (autoFollowMetadata == null) {
Expand Down
Loading