Skip to content

Commit f7b31b0

Browse files
committed
Merge from main
Signed-off-by: Harsha Vamsi Kalluri <[email protected]>
1 parent 9ff84a8 commit f7b31b0

File tree

10 files changed

+316
-19
lines changed

10 files changed

+316
-19
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
5757
- Fix Unified highlighter for nested fields when using matchPhrasePrefixQuery ([#19442](https://github.com/opensearch-project/OpenSearch/pull/19442))
5858
- Add S3Repository.LEGACY_MD5_CHECKSUM_CALCULATION to list of repository-s3 settings ([#19788](https://github.com/opensearch-project/OpenSearch/pull/19788))
5959
- Fix NPE of ScriptScoreQuery ([#19650](https://github.com/opensearch-project/OpenSearch/pull/19650))
60+
- Fix `addEmptyBuckets` while reducing histograms from consuming too much memory and tripping CircuitBreaker ([#17718](https://github.com/opensearch-project/OpenSearch/pull/17718))
6061

6162
### Dependencies
6263
- Update to Gradle 9.2 ([#19575](https://github.com/opensearch-project/OpenSearch/pull/19575)) ([#19856](https://github.com/opensearch-project/OpenSearch/pull/19856))

rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/10_histogram.yml

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -682,3 +682,74 @@ setup:
682682
- match: { profile.shards.0.aggregations.0.debug.unoptimized_segments: 0 }
683683
- match: { profile.shards.0.aggregations.0.debug.leaf_visited: 1 }
684684
- match: { profile.shards.0.aggregations.0.debug.inner_visited: 0 }
685+
686+
---
# Regression test for PR #17718: when extended_bounds forces a huge number of empty
# buckets, the reduce phase must fail fast (breaker / max_buckets) instead of
# materializing the buckets and exhausting the heap.
"histogram creating empty buckets":
  - skip:
      version: " - 2.99.99"
      reason: fix currently only in 3.0

  - do:
      indices.create:
        index: test_cb
        body:
          settings:
            number_of_replicas: 0
            number_of_shards: 1
            refresh_interval: -1
          mappings:
            properties:
              number:
                type: integer

  - do:
      bulk:
        index: test_cb
        refresh: true
        body:
          - '{"index": {}}'
          - '{"number": 1}'
          - '{"index": {}}'
          - '{"number": 500}'
          - '{"index": {}}'
          - '{"number": 5000}'
          - '{"index": {}}'
          - '{"number": 500000}'

  # interval 10 over [0, 174155895372] implies billions of empty buckets; their
  # estimated memory is now charged up front, so the request breaker must trip.
  - do:
      catch: /circuit_breaking_exception/
      search:
        index: test_cb
        body:
          size: 0
          aggs:
            histo:
              histogram:
                field: number
                interval: 10
                extended_bounds:
                  min: 0
                  max: 174155895372

  - match: { error.type: "search_phase_execution_exception" }
  - match: { error.caused_by.type: "circuit_breaking_exception"}
  - match: { status: 429 }

  # interval 100000 implies ~1.7M buckets: small enough for the breaker but above
  # search.max_buckets, so the multi-bucket consumer must reject it (503).
  - do:
      catch: /too_many_buckets_exception/
      search:
        index: test_cb
        body:
          size: 0
          aggs:
            histo:
              histogram:
                field: number
                interval: 100000
                extended_bounds:
                  min: 0
                  max: 174155895372

  - match: { error.type: "search_phase_execution_exception" }
  - match: { error.caused_by.type: "too_many_buckets_exception"}
  - match: { status: 503 }

rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/360_date_histogram.yml

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,3 +160,55 @@ setup:
160160
- match: { profile.shards.0.aggregations.0.debug.unoptimized_segments: 0 }
161161
- match: { profile.shards.0.aggregations.0.debug.leaf_visited: 0 }
162162
- match: { profile.shards.0.aggregations.0.debug.inner_visited: 0 }
163+
164+
---
# Regression test for PR #17718: a date_histogram whose extended_bounds span an
# enormous range of empty buckets must trip the circuit breaker during reduce
# instead of building the buckets and running out of memory.
"date_histogram creating empty buckets":
  - skip:
      version: " - 2.99.99"
      reason: fix currently only in 3.0

  - do:
      indices.create:
        index: test_cb
        body:
          settings:
            number_of_replicas: 0
            number_of_shards: 1
            refresh_interval: -1
          mappings:
            properties:
              date:
                type: date

  - do:
      bulk:
        index: test_cb
        refresh: true
        body:
          - '{"index": {}}'
          - '{"date": "2016-01-01"}'
          - '{"index": {}}'
          - '{"date": "2016-01-02"}'
          - '{"index": {}}'
          - '{"date": "2016-02-01"}'
          - '{"index": {}}'
          - '{"date": "2016-03-01"}'

  # 10s intervals over epoch-millis bounds [0, 174155895372] (~1970..1975) imply
  # roughly 17 million empty buckets; the up-front estimate must trip the breaker.
  - do:
      catch: /circuit_breaking_exception/
      search:
        index: test_cb
        body:
          size: 0
          aggs:
            histo:
              date_histogram:
                field: date
                interval: 10s
                extended_bounds:
                  min: 0
                  max: 174155895372

  - match: { error.type: "search_phase_execution_exception" }
  - match: { error.caused_by.type: "circuit_breaking_exception"}
  - match: { status: 429 }

server/src/main/java/org/opensearch/search/SearchService.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1981,7 +1981,8 @@ public ReduceContext forFinalReduction() {
19811981
bigArrays,
19821982
scriptService,
19831983
multiBucketConsumerService.create(),
1984-
pipelineTree
1984+
pipelineTree,
1985+
multiBucketConsumerService.getBreaker()
19851986
);
19861987
}
19871988
};

server/src/main/java/org/opensearch/search/aggregations/InternalAggregation.java

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.opensearch.common.annotation.PublicApi;
3535
import org.opensearch.common.util.BigArrays;
3636
import org.opensearch.core.common.Strings;
37+
import org.opensearch.core.common.breaker.CircuitBreaker;
3738
import org.opensearch.core.common.io.stream.NamedWriteable;
3839
import org.opensearch.core.common.io.stream.StreamInput;
3940
import org.opensearch.core.common.io.stream.StreamOutput;
@@ -97,6 +98,7 @@ public static class ReduceContext {
9798
private final PipelineTree pipelineTreeRoot;
9899

99100
private boolean isSliceLevel;
101+
private final CircuitBreaker breaker;
100102
/**
101103
* Supplies the pipelines when the result of the reduce is serialized
102104
* to node versions that need pipeline aggregators to be serialized
@@ -112,13 +114,30 @@ public static ReduceContext forPartialReduction(
112114
ScriptService scriptService,
113115
Supplier<PipelineTree> pipelineTreeForBwcSerialization
114116
) {
115-
return new ReduceContext(bigArrays, scriptService, (s) -> {}, null, pipelineTreeForBwcSerialization);
117+
return new ReduceContext(bigArrays, scriptService, (s) -> {}, null, pipelineTreeForBwcSerialization, null);
116118
}
117119

118120
/**
119121
* Build a {@linkplain ReduceContext} to perform the final reduction.
120122
* @param pipelineTreeRoot The root of tree of pipeline aggregations for this request
121123
*/
124+
public static ReduceContext forFinalReduction(
125+
BigArrays bigArrays,
126+
ScriptService scriptService,
127+
IntConsumer multiBucketConsumer,
128+
PipelineTree pipelineTreeRoot,
129+
CircuitBreaker breaker
130+
) {
131+
return new ReduceContext(
132+
bigArrays,
133+
scriptService,
134+
multiBucketConsumer,
135+
requireNonNull(pipelineTreeRoot, "prefer EMPTY to null"),
136+
() -> pipelineTreeRoot,
137+
breaker
138+
);
139+
}
140+
122141
public static ReduceContext forFinalReduction(
123142
BigArrays bigArrays,
124143
ScriptService scriptService,
@@ -130,7 +149,8 @@ public static ReduceContext forFinalReduction(
130149
scriptService,
131150
multiBucketConsumer,
132151
requireNonNull(pipelineTreeRoot, "prefer EMPTY to null"),
133-
() -> pipelineTreeRoot
152+
() -> pipelineTreeRoot,
153+
null
134154
);
135155
}
136156

@@ -139,14 +159,16 @@ private ReduceContext(
139159
ScriptService scriptService,
140160
IntConsumer multiBucketConsumer,
141161
PipelineTree pipelineTreeRoot,
142-
Supplier<PipelineTree> pipelineTreeForBwcSerialization
162+
Supplier<PipelineTree> pipelineTreeForBwcSerialization,
163+
CircuitBreaker breaker
143164
) {
144165
this.bigArrays = bigArrays;
145166
this.scriptService = scriptService;
146167
this.multiBucketConsumer = multiBucketConsumer;
147168
this.pipelineTreeRoot = pipelineTreeRoot;
148169
this.pipelineTreeForBwcSerialization = pipelineTreeForBwcSerialization;
149170
this.isSliceLevel = false;
171+
this.breaker = breaker;
150172
}
151173

152174
/**
@@ -210,6 +232,9 @@ public void consumeBucketsAndMaybeBreak(int size) {
210232
multiBucketConsumer.accept(size);
211233
}
212234

235+
/**
 * @return the circuit breaker charged while reducing, or {@code null} when this
 *         context performs no breaker accounting (e.g. partial reductions)
 */
public CircuitBreaker getBreaker() {
    return breaker;
}
213238
}
214239

215240
protected final String name;

server/src/main/java/org/opensearch/search/aggregations/MultiBucketConsumerService.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,4 +218,9 @@ public int getLimit() {
218218
public MultiBucketConsumer create() {
219219
return new MultiBucketConsumer(maxBucket, breaker);
220220
}
221+
222+
/**
 * Exposes the request circuit breaker backing the consumers created by this service,
 * so reduce contexts can charge estimated bucket memory against it.
 *
 * @return the request-scoped {@link CircuitBreaker}
 */
public CircuitBreaker getBreaker() {
    return breaker;
}
225+
221226
}

server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/InternalDateHistogram.java

Lines changed: 50 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.apache.lucene.util.CollectionUtil;
3535
import org.apache.lucene.util.PriorityQueue;
3636
import org.opensearch.common.Rounding;
37+
import org.opensearch.core.common.breaker.CircuitBreaker;
3738
import org.opensearch.core.common.io.stream.StreamInput;
3839
import org.opensearch.core.common.io.stream.StreamOutput;
3940
import org.opensearch.core.xcontent.XContentBuilder;
@@ -58,6 +59,9 @@
5859
import java.util.Map;
5960
import java.util.Objects;
6061

62+
import static java.lang.Math.max;
63+
import static java.lang.Math.min;
64+
6165
/**
6266
* Implementation of {@link Histogram}.
6367
*
@@ -322,7 +326,7 @@ public Bucket createBucket(InternalAggregations aggregations, Bucket prototype)
322326
return new Bucket(prototype.key, prototype.docCount, prototype.keyed, prototype.format, aggregations);
323327
}
324328

325-
private List<Bucket> reduceBuckets(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
329+
List<Bucket> reduceBuckets(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
326330
final PriorityQueue<IteratorAndCurrent<Bucket>> pq = new PriorityQueue<IteratorAndCurrent<Bucket>>(aggregations.size()) {
327331
@Override
328332
protected boolean lessThan(IteratorAndCurrent<Bucket> a, IteratorAndCurrent<Bucket> b) {
@@ -394,9 +398,48 @@ protected Bucket reduceBucket(List<Bucket> buckets, ReduceContext context) {
394398
return createBucket(buckets.get(0).key, docCount, aggs);
395399
}
396400

397-
private void addEmptyBuckets(List<Bucket> list, ReduceContext reduceContext) {
401+
private int estimateTotalBucketCount(List<Bucket> list) {
402+
LongBounds bounds = emptyBucketInfo.bounds;
403+
int bucketCount = 0;
404+
if (bounds != null && bounds.getMin() != null && bounds.getMax() != null) {
405+
long min = min(bounds.getMin() + offset, list.getFirst().key);
406+
long max = max(bounds.getMax() + offset, list.getLast().key);
407+
long intervalWidth = 0;
408+
int i = 0;
409+
long key = min;
410+
while (key < max && i++ < 10) {
411+
bucketCount++;
412+
long nextKey = nextKey(key).longValue();
413+
intervalWidth = max(intervalWidth, nextKey - key);
414+
key = nextKey;
415+
}
416+
if (bucketCount < 10) {
417+
return bucketCount;
418+
}
419+
long estimatedBuckets = Math.round(Math.ceil((double) (max - min) / intervalWidth));
420+
if (estimatedBuckets > Integer.MAX_VALUE) {
421+
return Integer.MAX_VALUE;
422+
}
423+
return (int) estimatedBuckets;
424+
}
425+
return list.size();
426+
}
427+
428+
void addEmptyBuckets(List<Bucket> list, ReduceContext reduceContext) {
398429
Bucket lastBucket = null;
399430
LongBounds bounds = emptyBucketInfo.bounds;
431+
final int originalSize = list.size();
432+
// we use counts here only to add those values to the CircuitBreaker, list's count has already been added in #reduce, so we only
433+
// need to add emptyBucketCount
434+
final int estimateEmptyBucketCount = estimateTotalBucketCount(list) - originalSize;
435+
assert estimateEmptyBucketCount >= 0;
436+
CircuitBreaker breaker = reduceContext.getBreaker();
437+
if (breaker != null) {
438+
// 50 bytes memory usage for each empty bucket
439+
breaker.addEstimateBytesAndMaybeBreak(50L * estimateEmptyBucketCount, "empty date histogram buckets");
440+
}
441+
reduceContext.consumeBucketsAndMaybeBreak(estimateEmptyBucketCount);
442+
400443
ListIterator<Bucket> iter = list.listIterator();
401444

402445
// first adding all the empty buckets *before* the actual data (based on th extended_bounds.min the user requested)
@@ -452,11 +495,16 @@ private void addEmptyBuckets(List<Bucket> list, ReduceContext reduceContext) {
452495
key = nextKey(key).longValue();
453496
}
454497
}
498+
int postAddEmptyBucketCount = list.size() - estimateEmptyBucketCount - originalSize;
499+
if (postAddEmptyBucketCount > 0) {
500+
reduceContext.consumeBucketsAndMaybeBreak(postAddEmptyBucketCount);
501+
}
455502
}
456503

457504
@Override
458505
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
459506
List<Bucket> reducedBuckets = reduceBuckets(aggregations, reduceContext);
507+
reduceContext.consumeBucketsAndMaybeBreak(reducedBuckets.size());
460508
if (reduceContext.isFinalReduce()) {
461509
if (minDocCount == 0) {
462510
addEmptyBuckets(reducedBuckets, reduceContext);
@@ -473,7 +521,6 @@ public InternalAggregation reduce(List<InternalAggregation> aggregations, Reduce
473521
CollectionUtil.introSort(reducedBuckets, order.comparator());
474522
}
475523
}
476-
reduceContext.consumeBucketsAndMaybeBreak(reducedBuckets.size());
477524
return new InternalDateHistogram(
478525
getName(),
479526
reducedBuckets,

0 commit comments

Comments
 (0)