Skip to content

Commit b93e4c1

Browse files
committed
fix sub-aggregator casting when used with profile=true
Signed-off-by: Sandesh Kumar <[email protected]>
1 parent 387e663 commit b93e4c1

File tree

10 files changed

+38
-34
lines changed

10 files changed

+38
-34
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
2525
- Fix flaky test FieldDataLoadingIT.testIndicesFieldDataCacheSizeSetting ([#19571](https://github.com/opensearch-project/OpenSearch/pull/19571))
2626
- Avoid primary shard failure caused by merged segment warmer exceptions ([#19436](https://github.com/opensearch-project/OpenSearch/pull/19436))
2727
- Fix pull-based ingestion out-of-bounds offset scenarios and remove persisted offsets ([#19607](https://github.com/opensearch-project/OpenSearch/pull/19607))
28+
- [Star Tree] Fix sub-aggregator casting for search with profile=true ([19652](https://github.com/opensearch-project/OpenSearch/pull/19652))
2829

2930
### Dependencies
3031
- Update to Gradle 9.1 ([#19575](https://github.com/opensearch-project/OpenSearch/pull/19575))

server/src/main/java/org/opensearch/search/aggregations/Aggregator.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,15 @@ public BucketComparator bucketComparator(String key, SortOrder order) {
170170
);
171171
}
172172

173+
/**
174+
* Returns the underlying Aggregator responsible for creating the bucket collector.
175+
* For most aggregators, this is the aggregator itself.
176+
* For wrappers like ProfilingAggregator, it's the delegate.
177+
*/
178+
public Aggregator unwrapAggregator() {
179+
return this;
180+
}
181+
173182
/**
174183
* Compare two buckets by their ordinal.
175184
*

server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -335,7 +335,9 @@ public StarTreeBucketCollector getStarTreeBucketCollector(
335335
@Override
336336
public void setSubCollectors() throws IOException {
337337
for (Aggregator aggregator : subAggregators) {
338-
this.subCollectors.add(((StarTreePreComputeCollector) aggregator).getStarTreeBucketCollector(ctx, starTree, this));
338+
this.subCollectors.add(
339+
((StarTreePreComputeCollector) aggregator.unwrapAggregator()).getStarTreeBucketCollector(ctx, starTree, this)
340+
);
339341
}
340342
}
341343

server/src/main/java/org/opensearch/search/aggregations/bucket/range/RangeAggregator.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -401,7 +401,9 @@ public StarTreeBucketCollector getStarTreeBucketCollector(
401401
@Override
402402
public void setSubCollectors() throws IOException {
403403
for (Aggregator aggregator : subAggregators) {
404-
this.subCollectors.add(((StarTreePreComputeCollector) aggregator).getStarTreeBucketCollector(ctx, starTree, this));
404+
this.subCollectors.add(
405+
((StarTreePreComputeCollector) aggregator.unwrapAggregator()).getStarTreeBucketCollector(ctx, starTree, this)
406+
);
405407
}
406408
}
407409

server/src/main/java/org/opensearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -361,7 +361,9 @@ public StarTreeBucketCollector getStarTreeBucketCollector(
361361
@Override
362362
public void setSubCollectors() throws IOException {
363363
for (Aggregator aggregator : subAggregators) {
364-
this.subCollectors.add(((StarTreePreComputeCollector) aggregator).getStarTreeBucketCollector(ctx, starTree, this));
364+
this.subCollectors.add(
365+
((StarTreePreComputeCollector) aggregator.unwrapAggregator()).getStarTreeBucketCollector(ctx, starTree, this)
366+
);
365367
}
366368
}
367369

server/src/main/java/org/opensearch/search/aggregations/bucket/terms/MultiTermsAggregator.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -311,9 +311,9 @@ public StarTreeBucketCollector getStarTreeBucketCollector(
311311
@Override
312312
public void setSubCollectors() throws IOException {
313313
for (Aggregator aggregator : subAggregators) {
314-
if (aggregator instanceof StarTreePreComputeCollector collector) {
315-
this.subCollectors.add(collector.getStarTreeBucketCollector(ctx, starTree, this));
316-
}
314+
this.subCollectors.add(
315+
((StarTreePreComputeCollector) aggregator.unwrapAggregator()).getStarTreeBucketCollector(ctx, starTree, this)
316+
);
317317
}
318318
}
319319

server/src/main/java/org/opensearch/search/aggregations/bucket/terms/NumericTermsAggregator.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,9 @@ public StarTreeBucketCollector getStarTreeBucketCollector(
191191
@Override
192192
public void setSubCollectors() throws IOException {
193193
for (Aggregator aggregator : subAggregators) {
194-
this.subCollectors.add(((StarTreePreComputeCollector) aggregator).getStarTreeBucketCollector(ctx, starTree, this));
194+
this.subCollectors.add(
195+
((StarTreePreComputeCollector) aggregator.unwrapAggregator()).getStarTreeBucketCollector(ctx, starTree, this)
196+
);
195197
}
196198
}
197199

server/src/main/java/org/opensearch/search/aggregations/support/AggregationPath.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@
4040
import org.opensearch.search.aggregations.InternalAggregations;
4141
import org.opensearch.search.aggregations.bucket.SingleBucketAggregator;
4242
import org.opensearch.search.aggregations.metrics.NumericMetricsAggregator;
43-
import org.opensearch.search.profile.aggregation.ProfilingAggregator;
4443
import org.opensearch.search.sort.SortOrder;
4544

4645
import java.util.ArrayList;
@@ -236,7 +235,7 @@ public Aggregator resolveAggregator(Aggregator root) {
236235
public Aggregator resolveTopmostAggregator(Aggregator root) {
237236
AggregationPath.PathElement token = pathElements.get(0);
238237
// TODO both unwrap and subAggregator are only used here!
239-
Aggregator aggregator = ProfilingAggregator.unwrap(root.subAggregator(token.name));
238+
Aggregator aggregator = root.subAggregator(token.name).unwrapAggregator();
240239
assert (aggregator instanceof SingleBucketAggregator) || (aggregator instanceof NumericMetricsAggregator)
241240
: "this should be picked up before aggregation execution - on validate";
242241
return aggregator;

server/src/main/java/org/opensearch/search/profile/aggregation/ProfilingAggregator.java

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public class ProfilingAggregator extends Aggregator implements Streamable {
5656
private final AggregationProfiler profiler;
5757
private AggregationProfileBreakdown profileBreakdown;
5858

59-
public ProfilingAggregator(Aggregator delegate, AggregationProfiler profiler) throws IOException {
59+
public ProfilingAggregator(Aggregator delegate, AggregationProfiler profiler) {
6060
this.profiler = profiler;
6161
this.delegate = delegate;
6262
}
@@ -164,19 +164,12 @@ public String toString() {
164164
return delegate.toString();
165165
}
166166

167-
public static Aggregator unwrap(Aggregator agg) {
168-
if (agg instanceof ProfilingAggregator) {
169-
return ((ProfilingAggregator) agg).delegate;
170-
}
171-
return agg;
172-
}
173-
174-
public Aggregator getDelegate() {
175-
return delegate;
176-
}
177-
178167
@Override
179168
public StreamingCostMetrics getStreamingCostMetrics() {
180169
return delegate instanceof Streamable ? ((Streamable) delegate).getStreamingCostMetrics() : StreamingCostMetrics.nonStreamable();
181170
}
171+
172+
public Aggregator unwrapAggregator() {
173+
return delegate;
174+
}
182175
}

server/src/main/java/org/opensearch/search/streaming/FlushModeResolver.java

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -139,19 +139,13 @@ private static StreamingCostMetrics collectMetrics(Collector collector) {
139139
}
140140

141141
private static Collector[] getChildren(Collector collector) {
142-
if (collector instanceof AggregatorBase) {
143-
return ((AggregatorBase) collector).subAggregators();
144-
}
145-
if (collector instanceof MultiCollector) {
146-
return ((MultiCollector) collector).getCollectors();
147-
}
148-
if (collector instanceof MultiBucketCollector) {
149-
return ((MultiBucketCollector) collector).getCollectors();
150-
}
151-
if (collector instanceof ProfilingAggregator) {
152-
return getChildren(((ProfilingAggregator) collector).getDelegate());
153-
}
154-
return new Collector[0];
142+
return switch (collector) {
143+
case AggregatorBase aggregatorBase -> aggregatorBase.subAggregators();
144+
case MultiCollector multiCollector -> multiCollector.getCollectors();
145+
case MultiBucketCollector multiBucketCollector -> multiBucketCollector.getCollectors();
146+
case ProfilingAggregator profilingAggregator -> getChildren(profilingAggregator.unwrapAggregator());
147+
default -> new Collector[0];
148+
};
155149
}
156150

157151
/**

0 commit comments

Comments
 (0)