Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions x-pack/plugin/ccr/qa/chain/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ followClusterTestCluster {
numNodes = 1
clusterName = 'follow-cluster'
setting 'xpack.license.self_generated.type', 'trial'
setting 'cluster.remote.leader_cluster.seeds', "\"${-> leaderClusterTest.nodes.get(0).transportUri()}\""
setting 'cluster.remote.middle_cluster.seeds', "\"${-> middleClusterTest.nodes.get(0).transportUri()}\""
setting 'node.name', 'follow'
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,17 @@

package org.elasticsearch.xpack.ccr;

import org.elasticsearch.client.Request;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;

import java.util.Comparator;
import java.util.List;
import java.util.Map;

import static org.hamcrest.Matchers.equalTo;

public class ChainIT extends ESCCRRestTestCase {

public void testFollowIndex() throws Exception {
Expand Down Expand Up @@ -71,4 +79,68 @@ public void testFollowIndex() throws Exception {
}
}

public void testAutoFollowPatterns() throws Exception {
if ("follow".equals(targetCluster) == false) {
return;
}

Request putPatternRequest = new Request("PUT", "/_ccr/auto_follow/leader_cluster_pattern");
putPatternRequest.setJsonEntity("{\"leader_index_patterns\": [\"logs-*\"], \"remote_cluster\": \"leader_cluster\"}");
assertOK(client().performRequest(putPatternRequest));

putPatternRequest = new Request("PUT", "/_ccr/auto_follow/middle_cluster_pattern");
putPatternRequest.setJsonEntity("{\"leader_index_patterns\": [\"logs-*\"], \"remote_cluster\": \"middle_cluster\"}");
assertOK(client().performRequest(putPatternRequest));

try (RestClient leaderClient = buildLeaderClient()) {
Settings settings = Settings.builder()
.put("index.soft_deletes.enabled", true)
.build();
Request request = new Request("PUT", "/logs-20190101");
request.setJsonEntity("{\"settings\": " + Strings.toString(settings) +
", \"mappings\": {\"_doc\": {\"properties\": {\"field\": {\"type\": \"keyword\"}}}} }");
assertOK(leaderClient.performRequest(request));

for (int i = 0; i < 5; i++) {
String id = Integer.toString(i);
index(leaderClient, "logs-20190101", id, "field", i, "filtered_field", "true");
}
}

try (RestClient middleClient = buildMiddleClient()) {
Settings settings = Settings.builder()
.put("index.soft_deletes.enabled", true)
.build();
Request request = new Request("PUT", "/logs-20200101");
request.setJsonEntity("{\"settings\": " + Strings.toString(settings) +
", \"mappings\": {\"_doc\": {\"properties\": {\"field\": {\"type\": \"keyword\"}}}} }");
assertOK(middleClient.performRequest(request));

for (int i = 0; i < 5; i++) {
String id = Integer.toString(i);
index(middleClient, "logs-20200101", id, "field", i, "filtered_field", "true");
}
}

assertBusy(() -> {
Request statsRequest = new Request("GET", "/_ccr/stats");
Map<?, ?> response = toMap(client().performRequest(statsRequest));
Map<?, ?> autoFollowStats = (Map<?, ?>) response.get("auto_follow_stats");
assertThat(autoFollowStats.get("number_of_successful_follow_indices"), equalTo(2));

@SuppressWarnings("unchecked")
List<Map<String, ?>> trackingRemoteClusters =
(List<Map<String, ?>>) autoFollowStats.get("tracking_remote_clusters");
trackingRemoteClusters.sort(Comparator.comparing(o -> ((String) o.get("cluster_name"))));
assertThat(trackingRemoteClusters.size(), equalTo(2));
assertThat(trackingRemoteClusters.get(0).get("cluster_name"), equalTo("leader_cluster"));
assertThat(trackingRemoteClusters.get(1).get("cluster_name"), equalTo("middle_cluster"));

ensureYellow("logs-20190101");
ensureYellow("logs-20200101");
verifyDocuments("logs-20190101", 5, "filtered_field:true");
verifyDocuments("logs-20200101", 5, "filtered_field:true");
});
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ public Collection<Object> createComponents(

return Arrays.asList(
ccrLicenseChecker,
new AutoFollowCoordinator(settings, client, threadPool, clusterService, ccrLicenseChecker)
new AutoFollowCoordinator(client, clusterService, ccrLicenseChecker, System::nanoTime)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,8 @@ public void checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs(
client.getRemoteClusterClient(clusterAlias),
request,
onFailure,
leaderClusterState -> {
leaderClusterStateResponse -> {
ClusterState leaderClusterState = leaderClusterStateResponse.getState();
IndexMetaData leaderIndexMetaData = leaderClusterState.getMetaData().index(leaderIndex);
if (leaderIndexMetaData == null) {
onFailure.accept(new IndexNotFoundException(leaderIndex));
Expand Down Expand Up @@ -159,7 +160,7 @@ public void checkRemoteClusterLicenseAndFetchClusterState(
final String clusterAlias,
final ClusterStateRequest request,
final Consumer<Exception> onFailure,
final Consumer<ClusterState> leaderClusterStateConsumer) {
final Consumer<ClusterStateResponse> leaderClusterStateConsumer) {
checkRemoteClusterLicenseAndFetchClusterState(
client,
clusterAlias,
Expand Down Expand Up @@ -192,7 +193,7 @@ private void checkRemoteClusterLicenseAndFetchClusterState(
final Client remoteClient,
final ClusterStateRequest request,
final Consumer<Exception> onFailure,
final Consumer<ClusterState> leaderClusterStateConsumer,
final Consumer<ClusterStateResponse> leaderClusterStateConsumer,
final Function<RemoteClusterLicenseChecker.LicenseCheck, ElasticsearchStatusException> nonCompliantLicense,
final Function<Exception, ElasticsearchStatusException> unknownLicense) {
// we have to check the license on the remote cluster
Expand All @@ -204,7 +205,7 @@ private void checkRemoteClusterLicenseAndFetchClusterState(
public void onResponse(final RemoteClusterLicenseChecker.LicenseCheck licenseCheck) {
if (licenseCheck.isSuccess()) {
final ActionListener<ClusterStateResponse> clusterStateListener =
ActionListener.wrap(s -> leaderClusterStateConsumer.accept(s.getState()), onFailure);
ActionListener.wrap(leaderClusterStateConsumer::accept, onFailure);
// following an index in remote cluster, so use remote client to fetch leader index metadata
remoteClient.admin().cluster().state(request, clusterStateListener);
} else {
Expand All @@ -228,9 +229,7 @@ public void onFailure(final Exception e) {
* @param onFailure the failure consumer
* @param historyUUIDConsumer the leader index history uuid and consumer
*/
// NOTE: Placed this method here; in order to avoid duplication of logic for fetching history UUIDs
// in case of following a local or a remote cluster.
public void fetchLeaderHistoryUUIDs(
private void fetchLeaderHistoryUUIDs(
final Client remoteClient,
final IndexMetaData leaderIndexMetaData,
final Consumer<Exception> onFailure,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.xpack.core.XPackSettings;

import java.util.Arrays;
Expand All @@ -29,12 +28,6 @@ private CcrSettings() {
public static final Setting<Boolean> CCR_FOLLOWING_INDEX_SETTING =
Setting.boolSetting("index.xpack.ccr.following_index", false, Property.IndexScope, Property.InternalIndex);

/**
* Setting for controlling the interval in between polling leader clusters to check whether there are indices to follow
*/
public static final Setting<TimeValue> CCR_AUTO_FOLLOW_POLL_INTERVAL =
Setting.timeSetting("xpack.ccr.auto_follow.poll_interval", TimeValue.timeValueMillis(2500), Property.NodeScope);

/**
* The settings defined by CCR.
*
Expand All @@ -43,8 +36,8 @@ private CcrSettings() {
static List<Setting<?>> getSettings() {
return Arrays.asList(
XPackSettings.CCR_ENABLED_SETTING,
CCR_FOLLOWING_INDEX_SETTING,
CCR_AUTO_FOLLOW_POLL_INTERVAL);
CCR_FOLLOWING_INDEX_SETTING
);
}

}
Loading