Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.search.TransportSearchAction.SearchTimeProvider;
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.common.Nullable;
Expand All @@ -43,7 +44,6 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
Expand All @@ -70,7 +70,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
private final Object shardFailuresMutex = new Object();
private final AtomicInteger successfulOps = new AtomicInteger();
private final AtomicInteger skippedOps = new AtomicInteger();
private final TransportSearchAction.SearchTimeProvider timeProvider;
private final SearchTimeProvider timeProvider;
private final SearchResponse.Clusters clusters;

AbstractSearchAsyncAction(String name, Logger logger, SearchTransportService searchTransportService,
Expand All @@ -79,7 +79,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
Map<String, Set<String>> indexRoutings,
Executor executor, SearchRequest request,
ActionListener<SearchResponse> listener, GroupShardsIterator<SearchShardIterator> shardsIts,
TransportSearchAction.SearchTimeProvider timeProvider, long clusterStateVersion,
SearchTimeProvider timeProvider, long clusterStateVersion,
SearchTask task, SearchPhaseResults<Result> resultConsumer, int maxConcurrentRequestsPerNode,
SearchResponse.Clusters clusters) {
super(name, request, shardsIts, logger, maxConcurrentRequestsPerNode, executor);
Expand All @@ -103,8 +103,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
* Builds how long it took to execute the search.
*/
long buildTookInMillis() {
return TimeUnit.NANOSECONDS.toMillis(
timeProvider.getRelativeCurrentNanos() - timeProvider.getRelativeStartNanos());
return timeProvider.buildTookInMillis();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -766,7 +766,7 @@ TotalHits getTotalHits() {
if (totalHits < trackTotalHitsUpTo) {
return new TotalHits(totalHits, totalHitsRelation);
} else {
/**
/*
* The user requested to count the total hits up to <code>trackTotalHitsUpTo</code>
* so we return this lower bound when the total hits is greater than this value.
* This can happen when multiple shards are merged since the limit to track total hits
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,297 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.action.search;

import org.apache.lucene.search.FieldDoc;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TopFieldDocs;
import org.apache.lucene.search.TotalHits;
import org.apache.lucene.search.grouping.CollapseTopFieldDocs;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.search.TransportSearchAction.SearchTimeProvider;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.profile.ProfileShardResult;
import org.elasticsearch.search.profile.SearchProfileShardResults;
import org.elasticsearch.search.suggest.Suggest;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Function;

import static org.elasticsearch.action.search.SearchResponse.Clusters;

/**
* Merges multiple search responses into one. Used in cross-cluster search when reduction is performed locally on each cluster.
* The CCS coordinating node sends one search request per remote cluster involved and gets one search response back from each one of them.
* Such responses contain all the info to be able to perform an additional reduction and return results back to the user.
* Preconditions are that only non final reduction has been performed on each cluster, meaning that buckets have not been pruned locally
* and pipeline aggregations have not yet been executed. Also, from+size search hits need to be requested to each cluster.
*/
//TODO it may make sense to investigate reusing existing merge code in SearchPhaseController#reducedQueryPhase, the logic is similar
//yet there are substantial differences in terms of the objects exchanged and logic in the sortDocs method.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's more than just reusing the reducedQueryPhase, as we discussed earlier we could integrate the remote cluster response as a shard response in the initial search phase and ignore hits coming from the remote cluster in the fetch phase. This would be identical to the removed QueryAndFetch strategy except that only the remote cluster response would have the fetch results. This is really a nice to have so no need to follow up on this but it would be nice if the TODO mentions this.

final class SearchResponseMerger {
private final int from;
private final int size;
private final SearchTimeProvider searchTimeProvider;
private final Clusters clusters;
private final Function<Boolean, ReduceContext> reduceContextFunction;
private final List<SearchResponse> searchResponses = new CopyOnWriteArrayList<>();

SearchResponseMerger(int from, int size, SearchTimeProvider searchTimeProvider, Clusters clusters,
Function<Boolean, ReduceContext> reduceContextFunction) {
this.from = from;
this.size = size;
this.searchTimeProvider = Objects.requireNonNull(searchTimeProvider);
this.clusters = Objects.requireNonNull(clusters);
this.reduceContextFunction = Objects.requireNonNull(reduceContextFunction);
}

/**
* Add a search response to the list of responses to be merged together into one.
* Merges currently happen at once when all responses are available and {@link #getMergedResponse()} is called. That may change
* in the future as it's possible to introduce incremental merges as responses come in if necessary.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think incremental merges is appealing here. The number of remote clusters should be low so there is no benefit to do the reduce incrementally.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we may come back to this and have a laugh one day about the "number of remote clusters should be low" assumption :) but I agree we are not talking about hundreds at the moment. I thought it may be useful to incrementally reduce given the size of the responses from each cluster, but we should first measure what the benefit is if any.

*/
void add(SearchResponse searchResponse) {
searchResponses.add(searchResponse);
}

/**
* Returns the merged response. To be called once all responses have been added through {@link #add(SearchResponse)}
* so that all responses are merged into a single one.
*/
SearchResponse getMergedResponse() {
assert searchResponses.size() > 1;
int totalShards = 0;
int skippedShards = 0;
int successfulShards = 0;
boolean timedOut = false;
Boolean terminatedEarly = null;
//the current reduce phase counts as one
int numReducePhases = 1;
float maxScore = Float.NEGATIVE_INFINITY;
List<ShardSearchFailure> failures = new ArrayList<>();
Map<String, ProfileShardResult> profileResults = new HashMap<>();
List<InternalAggregations> aggs = new ArrayList<>();
Map<ShardId, Integer> shards = new TreeMap<>();
List<TopDocs> topDocsList = new ArrayList<>(searchResponses.size());
Map<String, List<Suggest.Suggestion>> groupedSuggestions = new HashMap<>();
Boolean trackTotalHits = null;

for (SearchResponse searchResponse : searchResponses) {
totalShards += searchResponse.getTotalShards();
skippedShards += searchResponse.getSkippedShards();
successfulShards += searchResponse.getSuccessfulShards();
timedOut = timedOut || searchResponse.isTimedOut();
if (searchResponse.isTerminatedEarly() != null && searchResponse.isTerminatedEarly()) {
terminatedEarly = true;
}
numReducePhases += searchResponse.getNumReducePhases();

Collections.addAll(failures, searchResponse.getShardFailures());

profileResults.putAll(searchResponse.getProfileResults());

if (searchResponse.getAggregations() != null) {
InternalAggregations internalAggs = (InternalAggregations) searchResponse.getAggregations();
aggs.add(internalAggs);
}

Suggest suggest = searchResponse.getSuggest();
if (suggest != null) {
for (Suggest.Suggestion<? extends Suggest.Suggestion.Entry<? extends Suggest.Suggestion.Entry.Option>> entries : suggest) {
List<Suggest.Suggestion> suggestionList = groupedSuggestions.computeIfAbsent(entries.getName(), s -> new ArrayList<>());
suggestionList.add(entries);
}
}

SearchHits searchHits = searchResponse.getHits();
if (Float.isNaN(searchHits.getMaxScore()) == false) {
maxScore = Math.max(maxScore, searchHits.getMaxScore());
}
final TotalHits totalHits;
if (searchHits.getTotalHits() == null) {
//in case we did't track total hits, we get null from each cluster, but we need to set 0 eq to the TopDocs
totalHits = new TotalHits(0, TotalHits.Relation.EQUAL_TO);
assert trackTotalHits == null || trackTotalHits == false;
trackTotalHits = false;
} else {
totalHits = searchHits.getTotalHits();
assert trackTotalHits == null || trackTotalHits;
trackTotalHits = true;
}
topDocsList.add(searchHitsToTopDocs(searchHits, totalHits, shards));
}

//now that we've gone through all the hits and we collected all the shards they come from, we can assign shardIndex to each shard
int shardIndex = 0;
for (Map.Entry<ShardId, Integer> shard : shards.entrySet()) {
shard.setValue(shardIndex++);
}
//and go through all the scoreDocs from each cluster and set their corresponding shardIndex
for (TopDocs topDocs : topDocsList) {
for (ScoreDoc scoreDoc : topDocs.scoreDocs) {
FieldDocAndSearchHit fieldDocAndSearchHit = (FieldDocAndSearchHit) scoreDoc;
ShardId shardId = fieldDocAndSearchHit.searchHit.getShard().getShardId();
fieldDocAndSearchHit.shardIndex = shards.get(shardId);
}
}

TopDocs topDocs = SearchPhaseController.mergeTopDocs(topDocsList, size, from);
SearchHits mergedSearchHits = topDocsToSearchHits(topDocs, Float.isInfinite(maxScore) ? Float.NaN : maxScore, trackTotalHits);
Suggest suggest = groupedSuggestions.isEmpty() ? null : new Suggest(Suggest.reduce(groupedSuggestions));
InternalAggregations reducedAggs = InternalAggregations.reduce(aggs, reduceContextFunction.apply(true));
ShardSearchFailure[] shardFailures = failures.toArray(ShardSearchFailure.EMPTY_ARRAY);
//make failures ordering consistent with ordinary search and CCS
Arrays.sort(shardFailures, FAILURES_COMPARATOR);
InternalSearchResponse response = new InternalSearchResponse(mergedSearchHits, reducedAggs, suggest,
new SearchProfileShardResults(profileResults), timedOut, terminatedEarly, numReducePhases);
long tookInMillis = searchTimeProvider.buildTookInMillis();
return new SearchResponse(response, null, totalShards, successfulShards, skippedShards, tookInMillis, shardFailures, clusters);
}

private static final Comparator<ShardSearchFailure> FAILURES_COMPARATOR = new Comparator<ShardSearchFailure>() {
@Override
public int compare(ShardSearchFailure o1, ShardSearchFailure o2) {
ShardId shardId1 = extractShardId(o1);
ShardId shardId2 = extractShardId(o2);
if (shardId1 == null && shardId2 == null) {
return 0;
}
if (shardId1 == null) {
return -1;
}
if (shardId2 == null) {
return 1;
}
return shardId1.compareTo(shardId2);
}

private ShardId extractShardId(ShardSearchFailure failure) {
SearchShardTarget shard = failure.shard();
if (shard != null) {
return shard.getShardId();
}
Throwable cause = failure.getCause();
if (cause instanceof ElasticsearchException) {
ElasticsearchException e = (ElasticsearchException) cause;
return e.getShardId();
}
return null;
}
};

private static TopDocs searchHitsToTopDocs(SearchHits searchHits, TotalHits totalHits, Map<ShardId, Integer> shards) {
SearchHit[] hits = searchHits.getHits();
ScoreDoc[] scoreDocs = new ScoreDoc[hits.length];
final TopDocs topDocs;
if (searchHits.getSortFields() != null) {
if (searchHits.getCollapseField() != null) {
assert searchHits.getCollapseValues() != null;
topDocs = new CollapseTopFieldDocs(searchHits.getCollapseField(), totalHits, scoreDocs,
searchHits.getSortFields(), searchHits.getCollapseValues());
} else {
topDocs = new TopFieldDocs(totalHits, scoreDocs, searchHits.getSortFields());
}
} else {
topDocs = new TopDocs(totalHits, scoreDocs);
}

for (int i = 0; i < hits.length; i++) {
SearchHit hit = hits[i];
ShardId shardId = hit.getShard().getShardId();
shards.putIfAbsent(shardId, null);
final SortField[] sortFields = searchHits.getSortFields();
final Object[] sortValues;
if (sortFields == null) {
sortValues = null;
} else {
if (sortFields.length == 1 && sortFields[0].getType() == SortField.Type.SCORE) {
sortValues = new Object[]{hit.getScore()};
} else {
sortValues = hit.getRawSortValues();
}
}
scoreDocs[i] = new FieldDocAndSearchHit(hit.docId(), hit.getScore(), sortValues, hit);
}
return topDocs;
}

private static SearchHits topDocsToSearchHits(TopDocs topDocs, float maxScore, boolean trackTotalHits) {
SearchHit[] searchHits = new SearchHit[topDocs.scoreDocs.length];
for (int i = 0; i < topDocs.scoreDocs.length; i++) {
FieldDocAndSearchHit scoreDoc = (FieldDocAndSearchHit)topDocs.scoreDocs[i];
searchHits[i] = scoreDoc.searchHit;
}

SortField[] sortFields = null;
String collapseField = null;
Object[] collapseValues = null;
if (topDocs instanceof TopFieldDocs) {
sortFields = ((TopFieldDocs)topDocs).fields;
if (topDocs instanceof CollapseTopFieldDocs) {
CollapseTopFieldDocs collapseTopFieldDocs = (CollapseTopFieldDocs)topDocs;
collapseField = collapseTopFieldDocs.field;
collapseValues = collapseTopFieldDocs.collapseValues;
}
}
//in case we didn't track total hits, we got null from each cluster, and we need to set null to the final response
final TotalHits totalHits = trackTotalHits ? topDocs.totalHits : null;
return new SearchHits(searchHits, totalHits, maxScore, sortFields, collapseField, collapseValues);
}

private static void setShardIndex(Collection<List<FieldDoc>> shardResults) {
//every group of hits comes from a different shard. When hits come from the same index on multiple clusters and same
//shard identifier, we rely on such indices to have a different uuid across multiple clusters.
int i = 0;
for (List<FieldDoc> shardHits : shardResults) {
for (FieldDoc shardHit : shardHits) {
shardHit.shardIndex = i;
}
i++;
}
}

private static final class FieldDocAndSearchHit extends FieldDoc {
private final SearchHit searchHit;

//to simplify things, we use a FieldDoc all the time, even when only a ScoreDoc is needed, in which case fields are null.
FieldDocAndSearchHit(int doc, float score, Object[] fields, SearchHit searchHit) {
super(doc, score, fields);
this.searchHit = searchHit;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.LongSupplier;
Expand Down Expand Up @@ -140,7 +141,7 @@ private Map<String, Float> resolveIndexBoosts(SearchRequest searchRequest, Clust
* to moving backwards due to NTP and other such complexities, etc.). There are also issues with
* using a relative clock for reporting real time. Thus, we simply separate these two uses.
*/
static class SearchTimeProvider {
static final class SearchTimeProvider {

private final long absoluteStartMillis;
private final long relativeStartNanos;
Expand Down Expand Up @@ -170,12 +171,8 @@ long getAbsoluteStartMillis() {
return absoluteStartMillis;
}

long getRelativeStartNanos() {
return relativeStartNanos;
}

long getRelativeCurrentNanos() {
return relativeCurrentNanosProvider.getAsLong();
long buildTookInMillis() {
return TimeUnit.NANOSECONDS.toMillis(relativeCurrentNanosProvider.getAsLong() - relativeStartNanos);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ public final class SearchHit implements Streamable, ToXContentObject, Iterable<D

}

//used only in tests
public SearchHit(int docId) {
this(docId, null, null, null);
}
Expand Down
Loading