Skip to content

Commit 65601fa

Browse files
opensearch-trigger-bot[bot]github-actions[bot]rishabhmaurya
authored
Optimization in Numeric Terms Aggregation query for Large Bucket Counts (#18702) (#18974)
* optimize num agg using quick select for topN when applicable (cherry picked from commit 130d890) * Updated the numeric term aggregation logic to select topN * Updated the algorithm selection logic * Added a feature flag for the implementation * Added profile debug information * use priority queue method for significant terms * Refactored the selection strategy * Added tests case with proper assertions * Added cluster settings for selection strategy --------- (cherry picked from commit 7db7a5a) Signed-off-by: Rishabh Maurya <[email protected]> Signed-off-by: Vinay Krishna Pudyodu <[email protected]> Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com> Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com> Co-authored-by: Rishabh Maurya <[email protected]>
1 parent 00c39e9 commit 65601fa

File tree

8 files changed

+430
-33
lines changed

8 files changed

+430
-33
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
3535
- Add fetch phase profiling. ([#18664](https://github.com/opensearch-project/OpenSearch/pull/18664))
3636
- Include named queries from rescore contexts in matched_queries array ([#18697](https://github.com/opensearch-project/OpenSearch/pull/18697))
3737
- Add the configurable limit on rule cardinality ([#18663](https://github.com/opensearch-project/OpenSearch/pull/18663))
38+
- Optimization in Numeric Terms Aggregation query for Large Bucket Counts ([#18702](https://github.com/opensearch-project/OpenSearch/pull/18702))
3839
- Disable approximation framework when dealing with multiple sorts ([#18763](https://github.com/opensearch-project/OpenSearch/pull/18763))
3940
- [Experimental] Start in "clusterless" mode if a clusterless ClusterPlugin is loaded ([#18479](https://github.com/opensearch-project/OpenSearch/pull/18479))
4041
- [Star-Tree] Add star-tree search related stats ([#18707](https://github.com/opensearch-project/OpenSearch/pull/18707))

server/src/main/java/org/opensearch/common/settings/ClusterSettings.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -556,6 +556,7 @@ public void apply(Settings value, Settings current, Settings previous) {
556556
SearchService.MAX_KEEPALIVE_SETTING,
557557
SearchService.ALLOW_EXPENSIVE_QUERIES,
558558
MultiBucketConsumerService.MAX_BUCKET_SETTING,
559+
SearchService.BUCKET_SELECTION_STRATEGY_FACTOR_SETTING,
559560
SearchService.LOW_LEVEL_CANCELLATION_SETTING,
560561
SearchService.MAX_OPEN_SCROLL_CONTEXT,
561562
SearchService.MAX_OPEN_PIT_CONTEXT,

server/src/main/java/org/opensearch/search/DefaultSearchContext.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@
121121
import java.util.function.LongSupplier;
122122

123123
import static org.opensearch.search.SearchService.AGGREGATION_REWRITE_FILTER_SEGMENT_THRESHOLD;
124+
import static org.opensearch.search.SearchService.BUCKET_SELECTION_STRATEGY_FACTOR_SETTING;
124125
import static org.opensearch.search.SearchService.CARDINALITY_AGGREGATION_PRUNING_THRESHOLD;
125126
import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_MODE;
126127
import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING;
@@ -215,6 +216,7 @@ final class DefaultSearchContext extends SearchContext {
215216
private final int maxAggRewriteFilters;
216217
private final int filterRewriteSegmentThreshold;
217218
private final int cardinalityAggregationPruningThreshold;
219+
private final int bucketSelectionStrategyFactor;
218220
private final boolean keywordIndexOrDocValuesEnabled;
219221

220222
private final boolean isStreamSearch;
@@ -280,6 +282,7 @@ final class DefaultSearchContext extends SearchContext {
280282
this.maxAggRewriteFilters = evaluateFilterRewriteSetting();
281283
this.filterRewriteSegmentThreshold = evaluateAggRewriteFilterSegThreshold();
282284
this.cardinalityAggregationPruningThreshold = evaluateCardinalityAggregationPruningThreshold();
285+
this.bucketSelectionStrategyFactor = evaluateBucketSelectionStrategyFactor();
283286
this.concurrentSearchDeciderFactories = concurrentSearchDeciderFactories;
284287
this.keywordIndexOrDocValuesEnabled = evaluateKeywordIndexOrDocValuesEnabled();
285288
this.isStreamSearch = isStreamSearch;
@@ -1230,6 +1233,11 @@ public int cardinalityAggregationPruningThreshold() {
12301233
return cardinalityAggregationPruningThreshold;
12311234
}
12321235

1236+
@Override
1237+
public int bucketSelectionStrategyFactor() {
1238+
return bucketSelectionStrategyFactor;
1239+
}
1240+
12331241
@Override
12341242
public boolean keywordIndexOrDocValuesEnabled() {
12351243
return keywordIndexOrDocValuesEnabled;
@@ -1242,6 +1250,13 @@ private int evaluateCardinalityAggregationPruningThreshold() {
12421250
return 0;
12431251
}
12441252

1253+
private int evaluateBucketSelectionStrategyFactor() {
1254+
if (clusterService != null) {
1255+
return clusterService.getClusterSettings().get(BUCKET_SELECTION_STRATEGY_FACTOR_SETTING);
1256+
}
1257+
return SearchService.DEFAULT_BUCKET_SELECTION_STRATEGY_FACTOR;
1258+
}
1259+
12451260
public boolean evaluateKeywordIndexOrDocValuesEnabled() {
12461261
if (clusterService != null) {
12471262
return clusterService.getClusterSettings().get(KEYWORD_INDEX_OR_DOC_VALUES_ENABLED);

server/src/main/java/org/opensearch/search/SearchService.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -369,6 +369,16 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
369369
Property.NodeScope
370370
);
371371

372+
public static final int DEFAULT_BUCKET_SELECTION_STRATEGY_FACTOR = 5;
373+
public static final Setting<Integer> BUCKET_SELECTION_STRATEGY_FACTOR_SETTING = Setting.intSetting(
374+
"search.aggregation.bucket_selection_strategy_factor",
375+
DEFAULT_BUCKET_SELECTION_STRATEGY_FACTOR,
376+
0,
377+
10,
378+
Setting.Property.NodeScope,
379+
Setting.Property.Dynamic
380+
);
381+
372382
public static final int DEFAULT_SIZE = 10;
373383
public static final int DEFAULT_FROM = 0;
374384

Lines changed: 238 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,238 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.search.aggregations.bucket.terms;
10+
11+
import org.apache.lucene.util.ArrayUtil;
12+
import org.apache.lucene.util.PriorityQueue;
13+
import org.opensearch.search.aggregations.BucketOrder;
14+
import org.opensearch.search.aggregations.InternalMultiBucketAggregation;
15+
import org.opensearch.search.aggregations.bucket.LocalBucketCountThresholds;
16+
import org.opensearch.search.aggregations.bucket.terms.LongKeyedBucketOrds.BucketOrdsEnum;
17+
18+
import java.io.IOException;
19+
import java.util.Arrays;
20+
import java.util.Comparator;
21+
import java.util.Iterator;
22+
import java.util.function.Supplier;
23+
24+
import static org.opensearch.search.aggregations.InternalOrder.isKeyOrder;
25+
26+
/**
27+
* Strategy for selecting top buckets from aggregation results.
28+
*
29+
*/
30+
enum BucketSelectionStrategy {
31+
PRIORITY_QUEUE {
32+
@Override
33+
public <B extends InternalMultiBucketAggregation.InternalBucket> SelectionResult<B> selectTopBuckets(SelectionInput<B> input)
34+
throws IOException {
35+
PriorityQueue<B> ordered = input.buildPriorityQueue.buildPriorityQueue(input.size);
36+
B spare = null;
37+
long otherDocCount = 0;
38+
39+
while (input.ordsEnum.next()) {
40+
long docCount = input.bucketDocCountFunction.bucketDocCount(input.ordsEnum.ord());
41+
otherDocCount += docCount;
42+
if (docCount < input.localBucketCountThresholds.getMinDocCount()) {
43+
continue;
44+
}
45+
if (spare == null) {
46+
spare = input.emptyBucketBuilder.get();
47+
}
48+
input.bucketUpdateFunction.updateBucket(spare, input.ordsEnum, docCount);
49+
spare = ordered.insertWithOverflow(spare);
50+
}
51+
52+
B[] topBuckets = input.bucketArrayBuilder.buildBuckets(ordered.size());
53+
if (isKeyOrder(input.order)) {
54+
for (int b = ordered.size() - 1; b >= 0; --b) {
55+
topBuckets[b] = ordered.pop();
56+
otherDocCount -= topBuckets[b].getDocCount();
57+
}
58+
} else {
59+
Iterator<B> itr = ordered.iterator();
60+
for (int b = ordered.size() - 1; b >= 0; --b) {
61+
topBuckets[b] = itr.next();
62+
otherDocCount -= topBuckets[b].getDocCount();
63+
}
64+
}
65+
66+
return new SelectionResult<>(topBuckets, otherDocCount, "priority_queue");
67+
}
68+
},
69+
70+
QUICK_SELECT_OR_SELECT_ALL {
71+
@Override
72+
public <B extends InternalMultiBucketAggregation.InternalBucket> SelectionResult<B> selectTopBuckets(SelectionInput<B> input)
73+
throws IOException {
74+
B[] bucketsForOrd = input.bucketArrayBuilder.buildBuckets((int) input.bucketsInOrd);
75+
int validBucketCount = 0;
76+
long otherDocCount = 0;
77+
78+
// Collect all valid buckets
79+
while (input.ordsEnum.next()) {
80+
long docCount = input.bucketDocCountFunction.bucketDocCount(input.ordsEnum.ord());
81+
otherDocCount += docCount;
82+
if (docCount < input.localBucketCountThresholds.getMinDocCount()) {
83+
continue;
84+
}
85+
86+
B spare = input.emptyBucketBuilder.get();
87+
input.bucketUpdateFunction.updateBucket(spare, input.ordsEnum, docCount);
88+
bucketsForOrd[validBucketCount++] = spare;
89+
}
90+
91+
B[] topBuckets;
92+
String actualStrategy;
93+
if (validBucketCount > input.size) {
94+
ArrayUtil.select(
95+
bucketsForOrd,
96+
0,
97+
validBucketCount,
98+
input.size,
99+
(b1, b2) -> input.partiallyBuiltBucketComparator.compare((InternalTerms.Bucket<?>) b1, (InternalTerms.Bucket<?>) b2)
100+
);
101+
topBuckets = Arrays.copyOf(bucketsForOrd, input.size);
102+
for (int b = 0; b < input.size; b++) {
103+
otherDocCount -= topBuckets[b].getDocCount();
104+
}
105+
actualStrategy = "quick_select";
106+
} else {
107+
// Return all buckets (no selection needed)
108+
topBuckets = Arrays.copyOf(bucketsForOrd, validBucketCount);
109+
otherDocCount = 0L;
110+
actualStrategy = "select_all";
111+
}
112+
113+
return new SelectionResult<>(topBuckets, otherDocCount, actualStrategy);
114+
}
115+
};
116+
117+
public static BucketSelectionStrategy determine(
118+
int size,
119+
long bucketsInOrd,
120+
BucketOrder order,
121+
Comparator<InternalTerms.Bucket<?>> partiallyBuiltBucketComparator,
122+
int factor
123+
) {
124+
/*
125+
We select the strategy based on the following condition with configurable threshold factor:
126+
case 1: size is less than 20% of bucketsInOrd: PRIORITY_QUEUE
127+
case 2: size is greater than 20% of bucketsInOrd: QUICK_SELECT
128+
case 3: size == bucketsInOrd : return all buckets
129+
case 2 and 3 are encapsulated in QUICK_SELECT_OR_SELECT_ALL method.
130+
131+
Along with the above conditions, we also go with the original PRIORITY_QUEUE based approach
132+
if isKeyOrder or its significant term aggregation.
133+
134+
if factor is 0, always use PRIORITY_QUEUE strategy (since 0 < bucketsInOrd is always true).
135+
*/
136+
if (((long) size * factor < bucketsInOrd) || isKeyOrder(order) || partiallyBuiltBucketComparator == null) {
137+
return PRIORITY_QUEUE;
138+
} else {
139+
return QUICK_SELECT_OR_SELECT_ALL;
140+
}
141+
}
142+
143+
public abstract <B extends InternalMultiBucketAggregation.InternalBucket> SelectionResult<B> selectTopBuckets(SelectionInput<B> input)
144+
throws IOException;
145+
146+
/**
147+
* Represents the inputs for strategy execution to select buckets
148+
*/
149+
public static class SelectionInput<B extends InternalMultiBucketAggregation.InternalBucket> {
150+
public final int size;
151+
public final long bucketsInOrd;
152+
public final BucketOrdsEnum ordsEnum;
153+
public final Supplier<B> emptyBucketBuilder;
154+
public final LocalBucketCountThresholds localBucketCountThresholds;
155+
public final int ordIdx;
156+
public final BucketOrder order;
157+
public final PriorityQueueBuilder<B> buildPriorityQueue;
158+
public final BucketArrayBuilder<B> bucketArrayBuilder;
159+
public final BucketUpdateFunction<B> bucketUpdateFunction;
160+
public final BucketDocCountFunction bucketDocCountFunction;
161+
public final Comparator<InternalTerms.Bucket<?>> partiallyBuiltBucketComparator;
162+
163+
public SelectionInput(
164+
int size,
165+
long bucketsInOrd,
166+
BucketOrdsEnum ordsEnum,
167+
Supplier<B> emptyBucketBuilder,
168+
LocalBucketCountThresholds localBucketCountThresholds,
169+
int ordIdx,
170+
BucketOrder order,
171+
PriorityQueueBuilder<B> buildPriorityQueue,
172+
BucketArrayBuilder<B> bucketArrayBuilder,
173+
BucketUpdateFunction<B> bucketUpdateFunction,
174+
BucketDocCountFunction bucketDocCountFunction,
175+
Comparator<InternalTerms.Bucket<?>> partiallyBuiltBucketComparator
176+
) {
177+
this.size = size;
178+
this.bucketsInOrd = bucketsInOrd;
179+
this.ordsEnum = ordsEnum;
180+
this.emptyBucketBuilder = emptyBucketBuilder;
181+
this.localBucketCountThresholds = localBucketCountThresholds;
182+
this.ordIdx = ordIdx;
183+
this.order = order;
184+
this.buildPriorityQueue = buildPriorityQueue;
185+
this.bucketArrayBuilder = bucketArrayBuilder;
186+
this.bucketUpdateFunction = bucketUpdateFunction;
187+
this.bucketDocCountFunction = bucketDocCountFunction;
188+
this.partiallyBuiltBucketComparator = partiallyBuiltBucketComparator;
189+
}
190+
}
191+
192+
/**
193+
* Represents the results strategy execution to select buckets
194+
*/
195+
public static class SelectionResult<B extends InternalMultiBucketAggregation.InternalBucket> {
196+
public final B[] topBuckets;
197+
public final long otherDocCount;
198+
public final String actualStrategyUsed;
199+
200+
public SelectionResult(B[] topBuckets, long otherDocCount, String actualStrategyUsed) {
201+
this.topBuckets = topBuckets;
202+
this.otherDocCount = otherDocCount;
203+
this.actualStrategyUsed = actualStrategyUsed;
204+
}
205+
}
206+
207+
/**
208+
* Interface for bucketDocCount method
209+
*/
210+
@FunctionalInterface
211+
public interface BucketDocCountFunction {
212+
long bucketDocCount(long ord) throws IOException;
213+
}
214+
215+
/**
216+
* Interface for updateBucket method
217+
*/
218+
@FunctionalInterface
219+
public interface BucketUpdateFunction<B> {
220+
void updateBucket(B spare, BucketOrdsEnum ordsEnum, long docCount) throws IOException;
221+
}
222+
223+
/**
224+
* Interface for buildBuckets method
225+
*/
226+
@FunctionalInterface
227+
public interface BucketArrayBuilder<B> {
228+
B[] buildBuckets(int size);
229+
}
230+
231+
/**
232+
* Interface for buildPriorityQueue method
233+
*/
234+
@FunctionalInterface
235+
public interface PriorityQueueBuilder<B> {
236+
PriorityQueue<B> buildPriorityQueue(int size);
237+
}
238+
}

0 commit comments

Comments
 (0)