From 4dfaf52460fde63effbc4c01f84e074ee596cba3 Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Fri, 4 Jan 2019 15:42:00 +0100 Subject: [PATCH 1/2] Add support for providing absolute start time to SearchRequest We have recently added support for providing a local cluster alias to a SearchRequest through a package protected constructor. When executing cross-cluster search requests with local reduction on each cluster, the CCS coordinating node will have to provide such cluster alias to each remote cluster, as well as the absolute start time of the search action in milliseconds from the time epoch, to be used when evaluating date math expressions both while executing queries / scripts as well as when resolving index names. This commit adds support for providing the start time together with the cluster alias. It is a final member in the search request, which will only be set when using cross-cluster search with local reduction (also known as alternate execution mode). When not provided, the coordinating node will determine the current time and pass it through (by calling `System.currentTimeMillis`). Relates to #32125 --- .../action/search/SearchRequest.java | 64 ++++++++++++------- .../action/search/TransportSearchAction.java | 3 +- .../action/search/SearchRequestTests.java | 15 ++++- .../TransportSearchActionSingleNodeTests.java | 62 +++++++++++++++++- 4 files changed, 116 insertions(+), 28 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java b/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java index 9789e03c83641..6b8fe1cde6561 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java @@ -62,7 +62,10 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest public static final int DEFAULT_PRE_FILTER_SHARD_SIZE = 128; public static final int DEFAULT_BATCHED_REDUCE_SIZE = 512; + private static final long DEFAULT_ABSOLUTE_START_MILLIS = -1; + private final String localClusterAlias; + private final long absoluteStartMillis; private SearchType searchType = SearchType.DEFAULT; @@ -95,6 +98,7 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest public SearchRequest() { this.localClusterAlias = null; + this.absoluteStartMillis = DEFAULT_ABSOLUTE_START_MILLIS; } /** @@ -115,6 +119,7 @@ public SearchRequest(SearchRequest searchRequest) { this.source = searchRequest.source; this.types = searchRequest.types; this.localClusterAlias = searchRequest.localClusterAlias; + this.absoluteStartMillis = searchRequest.absoluteStartMillis; } /** @@ -138,12 +143,17 @@ public SearchRequest(String[] indices, SearchSourceBuilder source) { } /** - * Creates a new search request by providing the alias of the cluster where it will be executed. Used when a {@link SearchRequest} - * is created and executed as part of a cross-cluster search request performing local reduction on each cluster. - * The coordinating CCS node provides the alias to prefix index names with in the returned search results. + * Creates a new search request by providing the alias of the cluster where it will be executed, as well as the current time in + * milliseconds from the epoch time. Used when a {@link SearchRequest} is created and executed as part of a cross-cluster search + * request performing local reduction on each cluster. The coordinating CCS node provides the alias to prefix index names with in + * the returned search results, and the current time to be used on the remote clusters to ensure that the same value is used. */ - SearchRequest(String localClusterAlias) { + SearchRequest(String localClusterAlias, long absoluteStartMillis) { this.localClusterAlias = Objects.requireNonNull(localClusterAlias, "cluster alias must not be null"); + if (absoluteStartMillis < 0) { + throw new IllegalArgumentException("absoluteStartMillis must not be negative but was [" + absoluteStartMillis + "]"); + } + this.absoluteStartMillis = absoluteStartMillis; } /** @@ -155,10 +165,7 @@ public SearchRequest(String[] indices, SearchSourceBuilder source) { public SearchRequest(StreamInput in) throws IOException { super(in); searchType = SearchType.fromId(in.readByte()); - indices = new String[in.readVInt()]; - for (int i = 0; i < indices.length; i++) { - indices[i] = in.readString(); - } + indices = in.readStringArray(); routing = in.readOptionalString(); preference = in.readOptionalString(); scroll = in.readOptionalWriteable(Scroll::new); @@ -175,8 +182,14 @@ public SearchRequest(StreamInput in) throws IOException { //TODO update version after backport if (in.getVersion().onOrAfter(Version.V_7_0_0)) { localClusterAlias = in.readOptionalString(); + if (localClusterAlias != null) { + absoluteStartMillis = in.readVLong(); + } else { + absoluteStartMillis = DEFAULT_ABSOLUTE_START_MILLIS; + } } else { localClusterAlias = null; + absoluteStartMillis = DEFAULT_ABSOLUTE_START_MILLIS; } } @@ -184,10 +197,7 @@ public SearchRequest(StreamInput in) throws IOException { public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeByte(searchType.id()); - out.writeVInt(indices.length); - for (String index : indices) { - out.writeString(index); - } + out.writeStringArray(indices); out.writeOptionalString(routing); out.writeOptionalString(preference); out.writeOptionalWriteable(scroll); @@ -204,6 +214,9 @@ public void writeTo(StreamOutput out) throws IOException { //TODO update version after backport if (out.getVersion().onOrAfter(Version.V_7_0_0)) { out.writeOptionalString(localClusterAlias); + if (localClusterAlias != null) { + out.writeVLong(absoluteStartMillis); + } } } @@ -243,6 +256,17 @@ String getLocalClusterAlias() { return localClusterAlias; } + /** + * Returns the current time in milliseconds from the time epoch, to be used for the execution of this search request. A non-null + * value (expected to be greater or equal than 0) indicates that this search request is being executed as part of a locally reduced + * cross-cluster search request. The provided current time is used to ensure that the same value, determined by the CCS coordinating + * node, is used on all clusters involved in the execution of the search request. + */ + @Nullable + Long getAbsoluteStartMillis() { + return absoluteStartMillis == DEFAULT_ABSOLUTE_START_MILLIS ? null : absoluteStartMillis; + } + /** * Sets the indices the search will be executed on. */ @@ -435,7 +459,6 @@ public Boolean allowPartialSearchResults() { return this.allowPartialSearchResults; } - /** * Sets the number of shard results that should be reduced at once on the coordinating node. This value should be used as a protection * mechanism to reduce the memory overhead per search request if the potential number of shards in the request can be large. @@ -498,13 +521,6 @@ public int getPreFilterShardSize() { return preFilterShardSize; } - /** - * Returns true iff the maxConcurrentShardRequest is set. - */ - boolean isMaxConcurrentShardRequestsSet() { - return maxConcurrentShardRequests != 0; - } - /** * @return true if the request only has suggest */ @@ -538,7 +554,7 @@ public String getDescription() { } @Override - public void readFrom(StreamInput in) throws IOException { + public void readFrom(StreamInput in) { throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); } @@ -564,14 +580,15 @@ public boolean equals(Object o) { Objects.equals(preFilterShardSize, that.preFilterShardSize) && Objects.equals(indicesOptions, that.indicesOptions) && Objects.equals(allowPartialSearchResults, that.allowPartialSearchResults) && - Objects.equals(localClusterAlias, that.localClusterAlias); + Objects.equals(localClusterAlias, that.localClusterAlias) && + absoluteStartMillis == that.absoluteStartMillis; } @Override public int hashCode() { return Objects.hash(searchType, Arrays.hashCode(indices), routing, preference, source, requestCache, scroll, Arrays.hashCode(types), indicesOptions, batchedReduceSize, maxConcurrentShardRequests, preFilterShardSize, - allowPartialSearchResults, localClusterAlias); + allowPartialSearchResults, localClusterAlias, absoluteStartMillis); } @Override @@ -590,6 +607,7 @@ public String toString() { ", preFilterShardSize=" + preFilterShardSize + ", allowPartialSearchResults=" + allowPartialSearchResults + ", localClusterAlias=" + localClusterAlias + + ", absoluteStartMillis=" + absoluteStartMillis + ", source=" + source + '}'; } } diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index db6867dbb3b38..3c5d192a13c4a 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -180,7 +180,8 @@ long getRelativeCurrentNanos() { @Override protected void doExecute(Task task, SearchRequest searchRequest, ActionListener listener) { - final long absoluteStartMillis = System.currentTimeMillis(); + final long absoluteStartMillis = searchRequest.getAbsoluteStartMillis() != null ? + searchRequest.getAbsoluteStartMillis() : System.currentTimeMillis(); final long relativeStartNanos = System.nanoTime(); final SearchTimeProvider timeProvider = new SearchTimeProvider(absoluteStartMillis, relativeStartNanos, System::nanoTime); diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchRequestTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchRequestTests.java index 719a14491ae1d..c6f5c86b5092a 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchRequestTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchRequestTests.java @@ -48,12 +48,19 @@ protected SearchRequest createSearchRequest() throws IOException { if (randomBoolean()) { return super.createSearchRequest(); } - //clusterAlias does not have public getter/setter hence we randomize it only in this test specifically. - SearchRequest searchRequest = new SearchRequest(randomAlphaOfLengthBetween(5, 10)); + //clusterAlias and absoluteStartMillis do not have public getters/setters hence we randomize them only in this test specifically. + SearchRequest searchRequest = new SearchRequest(randomAlphaOfLengthBetween(5, 10), randomNonNegativeLong()); RandomSearchRequestGenerator.randomSearchRequest(searchRequest, this::createSearchSourceBuilder); return searchRequest; } + public void testClusterAliasValidation() { + expectThrows(NullPointerException.class, () -> new SearchRequest(null, 0)); + expectThrows(IllegalArgumentException.class, () -> new SearchRequest("", -1)); + SearchRequest searchRequest = new SearchRequest("", 0); + assertNull(searchRequest.validate()); + } + public void testSerialization() throws Exception { SearchRequest searchRequest = createSearchRequest(); SearchRequest deserializedRequest = copyWriteable(searchRequest, namedWriteableRegistry, SearchRequest::new); @@ -69,8 +76,10 @@ public void testClusterAliasSerialization() throws IOException { //TODO update version after backport if (version.before(Version.V_7_0_0)) { assertNull(deserializedRequest.getLocalClusterAlias()); + assertNull(deserializedRequest.getAbsoluteStartMillis()); } else { assertEquals(searchRequest.getLocalClusterAlias(), deserializedRequest.getLocalClusterAlias()); + assertEquals(searchRequest.getAbsoluteStartMillis(), deserializedRequest.getAbsoluteStartMillis()); } } @@ -78,9 +87,11 @@ public void testClusterAliasSerialization() throws IOException { public void testReadFromPre7_0_0() throws IOException { String msg = "AAEBBWluZGV4AAAAAQACAAAA/////w8AAAAAAAAA/////w8AAAAAAAACAAAAAAABAAMCBAUBAAKABACAAQIAAA=="; try (StreamInput in = StreamInput.wrap(Base64.getDecoder().decode(msg))) { + in.setVersion(VersionUtils.randomVersionBetween(random(), Version.V_6_4_0, VersionUtils.getPreviousVersion(Version.V_7_0_0))); SearchRequest searchRequest = new SearchRequest(in); assertArrayEquals(new String[]{"index"}, searchRequest.indices()); assertNull(searchRequest.getLocalClusterAlias()); + assertNull(searchRequest.getAbsoluteStartMillis()); } } diff --git a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionSingleNodeTests.java b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionSingleNodeTests.java index a67f80dd48f7d..19bd76ec09da2 100644 --- a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionSingleNodeTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionSingleNodeTests.java @@ -21,14 +21,18 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.index.query.RangeQueryBuilder; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.test.ESSingleNodeTestCase; public class TransportSearchActionSingleNodeTests extends ESSingleNodeTestCase { public void testLocalClusterAlias() { + long nowInMillis = System.currentTimeMillis(); IndexRequest indexRequest = new IndexRequest("test"); indexRequest.id("1"); indexRequest.source("field", "value"); @@ -37,7 +41,7 @@ public void testLocalClusterAlias() { assertEquals(RestStatus.CREATED, indexResponse.status()); { - SearchRequest searchRequest = new SearchRequest("local"); + SearchRequest searchRequest = new SearchRequest("local", nowInMillis); SearchResponse searchResponse = client().search(searchRequest).actionGet(); assertEquals(1, searchResponse.getHits().getTotalHits().value); SearchHit[] hits = searchResponse.getHits().getHits(); @@ -48,7 +52,7 @@ public void testLocalClusterAlias() { assertEquals("1", hit.getId()); } { - SearchRequest searchRequest = new SearchRequest(""); + SearchRequest searchRequest = new SearchRequest("", nowInMillis); SearchResponse searchResponse = client().search(searchRequest).actionGet(); assertEquals(1, searchResponse.getHits().getTotalHits().value); SearchHit[] hits = searchResponse.getHits().getHits(); @@ -59,4 +63,58 @@ public void testLocalClusterAlias() { assertEquals("1", hit.getId()); } } + + public void testAbsoluteStartMillis() { + { + IndexRequest indexRequest = new IndexRequest("test-1970.01.01"); + indexRequest.id("1"); + indexRequest.source("date", "1970-01-01"); + indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); + IndexResponse indexResponse = client().index(indexRequest).actionGet(); + assertEquals(RestStatus.CREATED, indexResponse.status()); + } + { + IndexRequest indexRequest = new IndexRequest("test-1982.01.01"); + indexRequest.id("1"); + indexRequest.source("date", "1982-01-01"); + indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); + IndexResponse indexResponse = client().index(indexRequest).actionGet(); + assertEquals(RestStatus.CREATED, indexResponse.status()); + } + { + SearchRequest searchRequest = new SearchRequest(); + SearchResponse searchResponse = client().search(searchRequest).actionGet(); + assertEquals(2, searchResponse.getHits().getTotalHits().value); + } + { + SearchRequest searchRequest = new SearchRequest(""); + searchRequest.indicesOptions(IndicesOptions.fromOptions(true, true, true, true)); + SearchResponse searchResponse = client().search(searchRequest).actionGet(); + assertEquals(0, searchResponse.getTotalShards()); + } + { + SearchRequest searchRequest = new SearchRequest("", 0); + SearchResponse searchResponse = client().search(searchRequest).actionGet(); + assertEquals(2, searchResponse.getHits().getTotalHits().value); + } + { + SearchRequest searchRequest = new SearchRequest("", 0); + searchRequest.indices(""); + SearchResponse searchResponse = client().search(searchRequest).actionGet(); + assertEquals(1, searchResponse.getHits().getTotalHits().value); + assertEquals("test-1970.01.01", searchResponse.getHits().getHits()[0].getIndex()); + } + { + SearchRequest searchRequest = new SearchRequest("", 0); + SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); + RangeQueryBuilder rangeQuery = new RangeQueryBuilder("date"); + rangeQuery.gte("1970-01-01"); + rangeQuery.lt("1982-01-01"); + sourceBuilder.query(rangeQuery); + searchRequest.source(sourceBuilder); + SearchResponse searchResponse = client().search(searchRequest).actionGet(); + assertEquals(1, searchResponse.getHits().getTotalHits().value); + assertEquals("test-1970.01.01", searchResponse.getHits().getHits()[0].getIndex()); + } + } } From ec6e90d95670ed8e3a71ebf8b8fdf97048c5e0b9 Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Fri, 4 Jan 2019 22:23:18 +0100 Subject: [PATCH 2/2] address review comment --- .../action/search/SearchRequest.java | 16 ++++++++-------- .../action/search/TransportSearchAction.java | 4 +--- .../action/search/SearchRequestTests.java | 16 +++++++++++++--- 3 files changed, 22 insertions(+), 14 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java b/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java index 6b8fe1cde6561..fd996b0aa5cdd 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java @@ -257,14 +257,14 @@ String getLocalClusterAlias() { } /** - * Returns the current time in milliseconds from the time epoch, to be used for the execution of this search request. A non-null - * value (expected to be greater or equal than 0) indicates that this search request is being executed as part of a locally reduced - * cross-cluster search request. The provided current time is used to ensure that the same value, determined by the CCS coordinating - * node, is used on all clusters involved in the execution of the search request. + * Returns the current time in milliseconds from the time epoch, to be used for the execution of this search request. Used to + * ensure that the same value, determined by the coordinating node, is used on all nodes involved in the execution of the search + * request. When created through {@link #SearchRequest(String, long)}, this method returns the provided current time, otherwise + * it will return {@link System#currentTimeMillis()}. + * */ - @Nullable - Long getAbsoluteStartMillis() { - return absoluteStartMillis == DEFAULT_ABSOLUTE_START_MILLIS ? null : absoluteStartMillis; + long getOrCreateAbsoluteStartMillis() { + return absoluteStartMillis == DEFAULT_ABSOLUTE_START_MILLIS ? System.currentTimeMillis() : absoluteStartMillis; } /** @@ -607,7 +607,7 @@ public String toString() { ", preFilterShardSize=" + preFilterShardSize + ", allowPartialSearchResults=" + allowPartialSearchResults + ", localClusterAlias=" + localClusterAlias + - ", absoluteStartMillis=" + absoluteStartMillis + + ", getOrCreateAbsoluteStartMillis=" + absoluteStartMillis + ", source=" + source + '}'; } } diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index 3c5d192a13c4a..463d1487d0307 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -180,11 +180,9 @@ long getRelativeCurrentNanos() { @Override protected void doExecute(Task task, SearchRequest searchRequest, ActionListener listener) { - final long absoluteStartMillis = searchRequest.getAbsoluteStartMillis() != null ? - searchRequest.getAbsoluteStartMillis() : System.currentTimeMillis(); final long relativeStartNanos = System.nanoTime(); final SearchTimeProvider timeProvider = - new SearchTimeProvider(absoluteStartMillis, relativeStartNanos, System::nanoTime); + new SearchTimeProvider(searchRequest.getOrCreateAbsoluteStartMillis(), relativeStartNanos, System::nanoTime); ActionListener rewriteListener = ActionListener.wrap(source -> { if (source != searchRequest.source()) { // only set it if it changed - we don't allow null values to be set but it might be already null be we want to catch diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchRequestTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchRequestTests.java index c6f5c86b5092a..3fb9b6ae4eb16 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchRequestTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchRequestTests.java @@ -40,6 +40,9 @@ import java.util.List; import static org.elasticsearch.test.EqualsHashCodeTestUtils.checkEqualsAndHashCode; +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.lessThanOrEqualTo; public class SearchRequestTests extends AbstractSearchTestCase { @@ -76,10 +79,10 @@ public void testClusterAliasSerialization() throws IOException { //TODO update version after backport if (version.before(Version.V_7_0_0)) { assertNull(deserializedRequest.getLocalClusterAlias()); - assertNull(deserializedRequest.getAbsoluteStartMillis()); + assertAbsoluteStartMillisIsCurrentTime(deserializedRequest); } else { assertEquals(searchRequest.getLocalClusterAlias(), deserializedRequest.getLocalClusterAlias()); - assertEquals(searchRequest.getAbsoluteStartMillis(), deserializedRequest.getAbsoluteStartMillis()); + assertEquals(searchRequest.getOrCreateAbsoluteStartMillis(), deserializedRequest.getOrCreateAbsoluteStartMillis()); } } @@ -91,10 +94,17 @@ public void testReadFromPre7_0_0() throws IOException { SearchRequest searchRequest = new SearchRequest(in); assertArrayEquals(new String[]{"index"}, searchRequest.indices()); assertNull(searchRequest.getLocalClusterAlias()); - assertNull(searchRequest.getAbsoluteStartMillis()); + assertAbsoluteStartMillisIsCurrentTime(searchRequest); } } + private static void assertAbsoluteStartMillisIsCurrentTime(SearchRequest searchRequest) { + long before = System.currentTimeMillis(); + long absoluteStartMillis = searchRequest.getOrCreateAbsoluteStartMillis(); + long after = System.currentTimeMillis(); + assertThat(absoluteStartMillis, allOf(greaterThanOrEqualTo(before), lessThanOrEqualTo(after))); + } + public void testIllegalArguments() { SearchRequest searchRequest = new SearchRequest(); assertNotNull(searchRequest.indices());