diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java b/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java
index 9789e03c83641..fd996b0aa5cdd 100644
--- a/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java
+++ b/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java
@@ -62,7 +62,10 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
public static final int DEFAULT_PRE_FILTER_SHARD_SIZE = 128;
public static final int DEFAULT_BATCHED_REDUCE_SIZE = 512;
+ private static final long DEFAULT_ABSOLUTE_START_MILLIS = -1;
+
private final String localClusterAlias;
+ private final long absoluteStartMillis;
private SearchType searchType = SearchType.DEFAULT;
@@ -95,6 +98,7 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
public SearchRequest() {
this.localClusterAlias = null;
+ this.absoluteStartMillis = DEFAULT_ABSOLUTE_START_MILLIS;
}
/**
@@ -115,6 +119,7 @@ public SearchRequest(SearchRequest searchRequest) {
this.source = searchRequest.source;
this.types = searchRequest.types;
this.localClusterAlias = searchRequest.localClusterAlias;
+ this.absoluteStartMillis = searchRequest.absoluteStartMillis;
}
/**
@@ -138,12 +143,17 @@ public SearchRequest(String[] indices, SearchSourceBuilder source) {
}
/**
- * Creates a new search request by providing the alias of the cluster where it will be executed. Used when a {@link SearchRequest}
- * is created and executed as part of a cross-cluster search request performing local reduction on each cluster.
- * The coordinating CCS node provides the alias to prefix index names with in the returned search results.
+ * Creates a new search request by providing the alias of the cluster where it will be executed, as well as the current time in
+ * milliseconds from the epoch time. Used when a {@link SearchRequest} is created and executed as part of a cross-cluster search
+ * request performing local reduction on each cluster. The coordinating CCS node provides the alias to prefix index names with in
+ * the returned search results, and the current time to be used on the remote clusters to ensure that the same value is used.
*/
- SearchRequest(String localClusterAlias) {
+ SearchRequest(String localClusterAlias, long absoluteStartMillis) {
this.localClusterAlias = Objects.requireNonNull(localClusterAlias, "cluster alias must not be null");
+ if (absoluteStartMillis < 0) {
+ throw new IllegalArgumentException("absoluteStartMillis must not be negative but was [" + absoluteStartMillis + "]");
+ }
+ this.absoluteStartMillis = absoluteStartMillis;
}
/**
@@ -155,10 +165,7 @@ public SearchRequest(String[] indices, SearchSourceBuilder source) {
public SearchRequest(StreamInput in) throws IOException {
super(in);
searchType = SearchType.fromId(in.readByte());
- indices = new String[in.readVInt()];
- for (int i = 0; i < indices.length; i++) {
- indices[i] = in.readString();
- }
+ indices = in.readStringArray();
routing = in.readOptionalString();
preference = in.readOptionalString();
scroll = in.readOptionalWriteable(Scroll::new);
@@ -175,8 +182,14 @@ public SearchRequest(StreamInput in) throws IOException {
//TODO update version after backport
if (in.getVersion().onOrAfter(Version.V_7_0_0)) {
localClusterAlias = in.readOptionalString();
+ if (localClusterAlias != null) {
+ absoluteStartMillis = in.readVLong();
+ } else {
+ absoluteStartMillis = DEFAULT_ABSOLUTE_START_MILLIS;
+ }
} else {
localClusterAlias = null;
+ absoluteStartMillis = DEFAULT_ABSOLUTE_START_MILLIS;
}
}
@@ -184,10 +197,7 @@ public SearchRequest(StreamInput in) throws IOException {
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeByte(searchType.id());
- out.writeVInt(indices.length);
- for (String index : indices) {
- out.writeString(index);
- }
+ out.writeStringArray(indices);
out.writeOptionalString(routing);
out.writeOptionalString(preference);
out.writeOptionalWriteable(scroll);
@@ -204,6 +214,9 @@ public void writeTo(StreamOutput out) throws IOException {
//TODO update version after backport
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
out.writeOptionalString(localClusterAlias);
+ if (localClusterAlias != null) {
+ out.writeVLong(absoluteStartMillis);
+ }
}
}
@@ -243,6 +256,17 @@ String getLocalClusterAlias() {
return localClusterAlias;
}
+ /**
+ * Returns the current time in milliseconds from the time epoch, to be used for the execution of this search request. Used to
+ * ensure that the same value, determined by the coordinating node, is used on all nodes involved in the execution of the search
+ * request. When created through {@link #SearchRequest(String, long)}, this method returns the provided current time, otherwise
+ * it will return {@link System#currentTimeMillis()}.
+ *
+ */
+ long getOrCreateAbsoluteStartMillis() {
+ return absoluteStartMillis == DEFAULT_ABSOLUTE_START_MILLIS ? System.currentTimeMillis() : absoluteStartMillis;
+ }
+
/**
* Sets the indices the search will be executed on.
*/
@@ -435,7 +459,6 @@ public Boolean allowPartialSearchResults() {
return this.allowPartialSearchResults;
}
-
/**
* Sets the number of shard results that should be reduced at once on the coordinating node. This value should be used as a protection
* mechanism to reduce the memory overhead per search request if the potential number of shards in the request can be large.
@@ -498,13 +521,6 @@ public int getPreFilterShardSize() {
return preFilterShardSize;
}
- /**
- * Returns true iff the maxConcurrentShardRequest is set.
- */
- boolean isMaxConcurrentShardRequestsSet() {
- return maxConcurrentShardRequests != 0;
- }
-
/**
* @return true if the request only has suggest
*/
@@ -538,7 +554,7 @@ public String getDescription() {
}
@Override
- public void readFrom(StreamInput in) throws IOException {
+ public void readFrom(StreamInput in) {
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@@ -564,14 +580,15 @@ public boolean equals(Object o) {
Objects.equals(preFilterShardSize, that.preFilterShardSize) &&
Objects.equals(indicesOptions, that.indicesOptions) &&
Objects.equals(allowPartialSearchResults, that.allowPartialSearchResults) &&
- Objects.equals(localClusterAlias, that.localClusterAlias);
+ Objects.equals(localClusterAlias, that.localClusterAlias) &&
+ absoluteStartMillis == that.absoluteStartMillis;
}
@Override
public int hashCode() {
return Objects.hash(searchType, Arrays.hashCode(indices), routing, preference, source, requestCache,
scroll, Arrays.hashCode(types), indicesOptions, batchedReduceSize, maxConcurrentShardRequests, preFilterShardSize,
- allowPartialSearchResults, localClusterAlias);
+ allowPartialSearchResults, localClusterAlias, absoluteStartMillis);
}
@Override
@@ -590,6 +607,7 @@ public String toString() {
", preFilterShardSize=" + preFilterShardSize +
", allowPartialSearchResults=" + allowPartialSearchResults +
", localClusterAlias=" + localClusterAlias +
+ ", getOrCreateAbsoluteStartMillis=" + absoluteStartMillis +
", source=" + source + '}';
}
}
diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java
index db6867dbb3b38..463d1487d0307 100644
--- a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java
+++ b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java
@@ -180,10 +180,9 @@ long getRelativeCurrentNanos() {
@Override
protected void doExecute(Task task, SearchRequest searchRequest, ActionListener listener) {
- final long absoluteStartMillis = System.currentTimeMillis();
final long relativeStartNanos = System.nanoTime();
final SearchTimeProvider timeProvider =
- new SearchTimeProvider(absoluteStartMillis, relativeStartNanos, System::nanoTime);
+ new SearchTimeProvider(searchRequest.getOrCreateAbsoluteStartMillis(), relativeStartNanos, System::nanoTime);
ActionListener rewriteListener = ActionListener.wrap(source -> {
if (source != searchRequest.source()) {
// only set it if it changed - we don't allow null values to be set but it might be already null be we want to catch
diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchRequestTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchRequestTests.java
index ed32c0a1dd309..3fb9b6ae4eb16 100644
--- a/server/src/test/java/org/elasticsearch/action/search/SearchRequestTests.java
+++ b/server/src/test/java/org/elasticsearch/action/search/SearchRequestTests.java
@@ -40,6 +40,9 @@
import java.util.List;
import static org.elasticsearch.test.EqualsHashCodeTestUtils.checkEqualsAndHashCode;
+import static org.hamcrest.Matchers.allOf;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
public class SearchRequestTests extends AbstractSearchTestCase {
@@ -48,12 +51,19 @@ protected SearchRequest createSearchRequest() throws IOException {
if (randomBoolean()) {
return super.createSearchRequest();
}
- //clusterAlias does not have public getter/setter hence we randomize it only in this test specifically.
- SearchRequest searchRequest = new SearchRequest(randomAlphaOfLengthBetween(5, 10));
+ //clusterAlias and absoluteStartMillis do not have public getters/setters hence we randomize them only in this test specifically.
+ SearchRequest searchRequest = new SearchRequest(randomAlphaOfLengthBetween(5, 10), randomNonNegativeLong());
RandomSearchRequestGenerator.randomSearchRequest(searchRequest, this::createSearchSourceBuilder);
return searchRequest;
}
+ public void testClusterAliasValidation() {
+ expectThrows(NullPointerException.class, () -> new SearchRequest(null, 0));
+ expectThrows(IllegalArgumentException.class, () -> new SearchRequest("", -1));
+ SearchRequest searchRequest = new SearchRequest("", 0);
+ assertNull(searchRequest.validate());
+ }
+
public void testSerialization() throws Exception {
SearchRequest searchRequest = createSearchRequest();
SearchRequest deserializedRequest = copyWriteable(searchRequest, namedWriteableRegistry, SearchRequest::new);
@@ -69,8 +79,10 @@ public void testClusterAliasSerialization() throws IOException {
//TODO update version after backport
if (version.before(Version.V_7_0_0)) {
assertNull(deserializedRequest.getLocalClusterAlias());
+ assertAbsoluteStartMillisIsCurrentTime(deserializedRequest);
} else {
assertEquals(searchRequest.getLocalClusterAlias(), deserializedRequest.getLocalClusterAlias());
+ assertEquals(searchRequest.getOrCreateAbsoluteStartMillis(), deserializedRequest.getOrCreateAbsoluteStartMillis());
}
}
@@ -78,13 +90,21 @@ public void testClusterAliasSerialization() throws IOException {
public void testReadFromPre7_0_0() throws IOException {
String msg = "AAEBBWluZGV4AAAAAQACAAAA/////w8AAAAAAAAA/////w8AAAAAAAACAAAAAAABAAMCBAUBAAKABACAAQIAAA==";
try (StreamInput in = StreamInput.wrap(Base64.getDecoder().decode(msg))) {
- in.setVersion(Version.V_6_6_0);
+ in.setVersion(VersionUtils.randomVersionBetween(random(), Version.V_6_4_0, VersionUtils.getPreviousVersion(Version.V_7_0_0)));
SearchRequest searchRequest = new SearchRequest(in);
assertArrayEquals(new String[]{"index"}, searchRequest.indices());
assertNull(searchRequest.getLocalClusterAlias());
+ assertAbsoluteStartMillisIsCurrentTime(searchRequest);
}
}
+ private static void assertAbsoluteStartMillisIsCurrentTime(SearchRequest searchRequest) {
+ long before = System.currentTimeMillis();
+ long absoluteStartMillis = searchRequest.getOrCreateAbsoluteStartMillis();
+ long after = System.currentTimeMillis();
+ assertThat(absoluteStartMillis, allOf(greaterThanOrEqualTo(before), lessThanOrEqualTo(after)));
+ }
+
public void testIllegalArguments() {
SearchRequest searchRequest = new SearchRequest();
assertNotNull(searchRequest.indices());
diff --git a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionSingleNodeTests.java b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionSingleNodeTests.java
index a67f80dd48f7d..19bd76ec09da2 100644
--- a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionSingleNodeTests.java
+++ b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionSingleNodeTests.java
@@ -21,14 +21,18 @@
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
+import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.WriteRequest;
+import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.test.ESSingleNodeTestCase;
public class TransportSearchActionSingleNodeTests extends ESSingleNodeTestCase {
public void testLocalClusterAlias() {
+ long nowInMillis = System.currentTimeMillis();
IndexRequest indexRequest = new IndexRequest("test");
indexRequest.id("1");
indexRequest.source("field", "value");
@@ -37,7 +41,7 @@ public void testLocalClusterAlias() {
assertEquals(RestStatus.CREATED, indexResponse.status());
{
- SearchRequest searchRequest = new SearchRequest("local");
+ SearchRequest searchRequest = new SearchRequest("local", nowInMillis);
SearchResponse searchResponse = client().search(searchRequest).actionGet();
assertEquals(1, searchResponse.getHits().getTotalHits().value);
SearchHit[] hits = searchResponse.getHits().getHits();
@@ -48,7 +52,7 @@ public void testLocalClusterAlias() {
assertEquals("1", hit.getId());
}
{
- SearchRequest searchRequest = new SearchRequest("");
+ SearchRequest searchRequest = new SearchRequest("", nowInMillis);
SearchResponse searchResponse = client().search(searchRequest).actionGet();
assertEquals(1, searchResponse.getHits().getTotalHits().value);
SearchHit[] hits = searchResponse.getHits().getHits();
@@ -59,4 +63,58 @@ public void testLocalClusterAlias() {
assertEquals("1", hit.getId());
}
}
+
+ public void testAbsoluteStartMillis() {
+ {
+ IndexRequest indexRequest = new IndexRequest("test-1970.01.01");
+ indexRequest.id("1");
+ indexRequest.source("date", "1970-01-01");
+ indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
+ IndexResponse indexResponse = client().index(indexRequest).actionGet();
+ assertEquals(RestStatus.CREATED, indexResponse.status());
+ }
+ {
+ IndexRequest indexRequest = new IndexRequest("test-1982.01.01");
+ indexRequest.id("1");
+ indexRequest.source("date", "1982-01-01");
+ indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
+ IndexResponse indexResponse = client().index(indexRequest).actionGet();
+ assertEquals(RestStatus.CREATED, indexResponse.status());
+ }
+ {
+ SearchRequest searchRequest = new SearchRequest();
+ SearchResponse searchResponse = client().search(searchRequest).actionGet();
+ assertEquals(2, searchResponse.getHits().getTotalHits().value);
+ }
+ {
+ SearchRequest searchRequest = new SearchRequest("");
+ searchRequest.indicesOptions(IndicesOptions.fromOptions(true, true, true, true));
+ SearchResponse searchResponse = client().search(searchRequest).actionGet();
+ assertEquals(0, searchResponse.getTotalShards());
+ }
+ {
+ SearchRequest searchRequest = new SearchRequest("", 0);
+ SearchResponse searchResponse = client().search(searchRequest).actionGet();
+ assertEquals(2, searchResponse.getHits().getTotalHits().value);
+ }
+ {
+ SearchRequest searchRequest = new SearchRequest("", 0);
+ searchRequest.indices("");
+ SearchResponse searchResponse = client().search(searchRequest).actionGet();
+ assertEquals(1, searchResponse.getHits().getTotalHits().value);
+ assertEquals("test-1970.01.01", searchResponse.getHits().getHits()[0].getIndex());
+ }
+ {
+ SearchRequest searchRequest = new SearchRequest("", 0);
+ SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
+ RangeQueryBuilder rangeQuery = new RangeQueryBuilder("date");
+ rangeQuery.gte("1970-01-01");
+ rangeQuery.lt("1982-01-01");
+ sourceBuilder.query(rangeQuery);
+ searchRequest.source(sourceBuilder);
+ SearchResponse searchResponse = client().search(searchRequest).actionGet();
+ assertEquals(1, searchResponse.getHits().getTotalHits().value);
+ assertEquals("test-1970.01.01", searchResponse.getHits().getHits()[0].getIndex());
+ }
+ }
}