Skip to content

Commit 350a0f5

Browse files
bowenlan-amznPeter Alfonsi
authored andcommitted
Apply fast date histogram optimization at the segment level (opensearch-project#12073)
--------- Signed-off-by: bowenlan-amzn <[email protected]>
1 parent 5fa068e commit 350a0f5

File tree

8 files changed

+624
-180
lines changed

8 files changed

+624
-180
lines changed

server/src/internalClusterTest/java/org/opensearch/search/aggregations/bucket/DateHistogramIT.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -177,9 +177,9 @@ public void setupSuiteScopeCluster() throws Exception {
177177
indexDoc(2, 15, 3), // date: Feb 15, dates: Feb 15, Mar 16
178178
indexDoc(3, 2, 4), // date: Mar 2, dates: Mar 2, Apr 3
179179
indexDoc(3, 15, 5), // date: Mar 15, dates: Mar 15, Apr 16
180-
indexDoc(3, 23, 6)
180+
indexDoc(3, 23, 6) // date: Mar 23, dates: Mar 23, Apr 24
181181
)
182-
); // date: Mar 23, dates: Mar 23, Apr 24
182+
);
183183
indexRandom(true, builders);
184184
ensureSearchable();
185185
}

server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java

Lines changed: 237 additions & 123 deletions
Large diffs are not rendered by default.

server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java

Lines changed: 46 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -164,24 +164,55 @@ final class CompositeAggregator extends BucketsAggregator {
164164
this.queue = new CompositeValuesCollectorQueue(context.bigArrays(), sources, size, rawAfterKey);
165165
this.rawAfterKey = rawAfterKey;
166166

167-
fastFilterContext = new FastFilterRewriteHelper.FastFilterContext();
167+
fastFilterContext = new FastFilterRewriteHelper.FastFilterContext(context);
168168
if (!FastFilterRewriteHelper.isCompositeAggRewriteable(sourceConfigs)) return;
169-
fastFilterContext.setAggregationType(
170-
new FastFilterRewriteHelper.CompositeAggregationType(sourceConfigs, rawAfterKey, formats, size)
171-
);
169+
fastFilterContext.setAggregationType(new CompositeAggregationType());
172170
if (fastFilterContext.isRewriteable(parent, subAggregators.length)) {
173-
// bucketOrds is the data structure for saving date histogram results
171+
// bucketOrds is used for saving date histogram results
174172
bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), CardinalityUpperBound.ONE);
175-
// Currently the filter rewrite is only supported for date histograms
176-
FastFilterRewriteHelper.CompositeAggregationType aggregationType =
177-
(FastFilterRewriteHelper.CompositeAggregationType) fastFilterContext.aggregationType;
178-
preparedRounding = aggregationType.getRoundingPreparer();
179-
fastFilterContext.buildFastFilter(
180-
context,
181-
fc -> FastFilterRewriteHelper.getAggregationBounds(context, fc.getFieldType().name()),
182-
x -> aggregationType.getRounding(),
183-
() -> preparedRounding
184-
);
173+
preparedRounding = ((CompositeAggregationType) fastFilterContext.getAggregationType()).getRoundingPrepared();
174+
fastFilterContext.buildFastFilter();
175+
}
176+
}
177+
178+
/**
179+
* Currently the filter rewrite is only supported for date histograms
180+
*/
181+
private class CompositeAggregationType extends FastFilterRewriteHelper.AbstractDateHistogramAggregationType {
182+
private final RoundingValuesSource valuesSource;
183+
private long afterKey = -1L;
184+
185+
public CompositeAggregationType() {
186+
super(sourceConfigs[0].fieldType(), sourceConfigs[0].missingBucket(), sourceConfigs[0].hasScript());
187+
this.valuesSource = (RoundingValuesSource) sourceConfigs[0].valuesSource();
188+
if (rawAfterKey != null) {
189+
assert rawAfterKey.size() == 1 && formats.size() == 1;
190+
this.afterKey = formats.get(0).parseLong(rawAfterKey.get(0).toString(), false, () -> {
191+
throw new IllegalArgumentException("now() is not supported in [after] key");
192+
});
193+
}
194+
}
195+
196+
public Rounding getRounding(final long low, final long high) {
197+
return valuesSource.getRounding();
198+
}
199+
200+
public Rounding.Prepared getRoundingPrepared() {
201+
return valuesSource.getPreparedRounding();
202+
}
203+
204+
@Override
205+
protected void processAfterKey(long[] bound, long interval) {
206+
// afterKey is the last bucket key in previous response, and the bucket key
207+
// is the minimum of all values in the bucket, so need to add the interval
208+
if (afterKey != -1L) {
209+
bound[0] = afterKey + interval;
210+
}
211+
}
212+
213+
@Override
214+
public int getSize() {
215+
return size;
185216
}
186217
}
187218

server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java

Lines changed: 35 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import org.opensearch.common.util.IntArray;
4343
import org.opensearch.common.util.LongArray;
4444
import org.opensearch.core.common.util.ByteArray;
45+
import org.opensearch.index.mapper.MappedFieldType;
4546
import org.opensearch.search.DocValueFormat;
4647
import org.opensearch.search.aggregations.Aggregator;
4748
import org.opensearch.search.aggregations.AggregatorFactories;
@@ -156,45 +157,53 @@ private AutoDateHistogramAggregator(
156157
this.roundingPreparer = roundingPreparer;
157158
this.preparedRounding = prepareRounding(0);
158159

159-
fastFilterContext = new FastFilterRewriteHelper.FastFilterContext();
160+
fastFilterContext = new FastFilterRewriteHelper.FastFilterContext(context);
160161
fastFilterContext.setAggregationType(
161-
new FastFilterRewriteHelper.DateHistogramAggregationType(
162+
new AutoHistogramAggregationType(
162163
valuesSourceConfig.fieldType(),
163164
valuesSourceConfig.missing() != null,
164165
valuesSourceConfig.script() != null
165166
)
166167
);
167168
if (fastFilterContext.isRewriteable(parent, subAggregators.length)) {
168-
fastFilterContext.buildFastFilter(
169-
context,
170-
fc -> FastFilterRewriteHelper.getAggregationBounds(context, fc.getFieldType().name()),
171-
b -> getMinimumRounding(b[0], b[1]),
172-
// Passing prepared rounding as supplier to ensure the correct prepared
173-
// rounding is set as it is done during getMinimumRounding
174-
() -> preparedRounding
175-
);
169+
fastFilterContext.buildFastFilter();
176170
}
177171
}
178172

179-
private Rounding getMinimumRounding(final long low, final long high) {
180-
// max - min / targetBuckets = bestDuration
181-
// find the right innerInterval this bestDuration belongs to
182-
// since we cannot exceed targetBuckets, bestDuration should go up,
183-
// so the right innerInterval should be an upper bound
184-
long bestDuration = (high - low) / targetBuckets;
185-
while (roundingIdx < roundingInfos.length - 1) {
186-
final RoundingInfo curRoundingInfo = roundingInfos[roundingIdx];
187-
final int temp = curRoundingInfo.innerIntervals[curRoundingInfo.innerIntervals.length - 1];
188-
// If the interval duration is covered by the maximum inner interval,
189-
// we can start with this outer interval for creating the buckets
190-
if (bestDuration <= temp * curRoundingInfo.roughEstimateDurationMillis) {
191-
break;
173+
private class AutoHistogramAggregationType extends FastFilterRewriteHelper.AbstractDateHistogramAggregationType {
174+
175+
public AutoHistogramAggregationType(MappedFieldType fieldType, boolean missing, boolean hasScript) {
176+
super(fieldType, missing, hasScript);
177+
}
178+
179+
@Override
180+
protected Rounding getRounding(final long low, final long high) {
181+
// max - min / targetBuckets = bestDuration
182+
// find the right innerInterval this bestDuration belongs to
183+
// since we cannot exceed targetBuckets, bestDuration should go up,
184+
// so the right innerInterval should be an upper bound
185+
long bestDuration = (high - low) / targetBuckets;
186+
// reset so this function is idempotent
187+
roundingIdx = 0;
188+
while (roundingIdx < roundingInfos.length - 1) {
189+
final RoundingInfo curRoundingInfo = roundingInfos[roundingIdx];
190+
final int temp = curRoundingInfo.innerIntervals[curRoundingInfo.innerIntervals.length - 1];
191+
// If the interval duration is covered by the maximum inner interval,
192+
// we can start with this outer interval for creating the buckets
193+
if (bestDuration <= temp * curRoundingInfo.roughEstimateDurationMillis) {
194+
break;
195+
}
196+
roundingIdx++;
192197
}
193-
roundingIdx++;
198+
199+
preparedRounding = prepareRounding(roundingIdx);
200+
return roundingInfos[roundingIdx].rounding;
194201
}
195202

196-
preparedRounding = prepareRounding(roundingIdx);
197-
return roundingInfos[roundingIdx].rounding;
203+
@Override
204+
protected Prepared getRoundingPrepared() {
205+
return preparedRounding;
206+
}
198207
}
199208

200209
protected abstract LongKeyedBucketOrds getBucketOrds();

server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.opensearch.common.Nullable;
4040
import org.opensearch.common.Rounding;
4141
import org.opensearch.common.lease.Releasables;
42+
import org.opensearch.index.mapper.MappedFieldType;
4243
import org.opensearch.search.DocValueFormat;
4344
import org.opensearch.search.aggregations.Aggregator;
4445
import org.opensearch.search.aggregations.AggregatorFactories;
@@ -115,29 +116,35 @@ class DateHistogramAggregator extends BucketsAggregator implements SizedBucketAg
115116

116117
bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), cardinality);
117118

118-
fastFilterContext = new FastFilterRewriteHelper.FastFilterContext();
119+
fastFilterContext = new FastFilterRewriteHelper.FastFilterContext(context);
119120
fastFilterContext.setAggregationType(
120-
new FastFilterRewriteHelper.DateHistogramAggregationType(
121+
new DateHistogramAggregationType(
121122
valuesSourceConfig.fieldType(),
122123
valuesSourceConfig.missing() != null,
123-
valuesSourceConfig.script() != null
124+
valuesSourceConfig.script() != null,
125+
hardBounds
124126
)
125127
);
126128
if (fastFilterContext.isRewriteable(parent, subAggregators.length)) {
127-
fastFilterContext.buildFastFilter(context, this::computeBounds, x -> rounding, () -> preparedRounding);
129+
fastFilterContext.buildFastFilter();
128130
}
129131
}
130132

131-
private long[] computeBounds(final FastFilterRewriteHelper.DateHistogramAggregationType fieldContext) throws IOException {
132-
final long[] bounds = FastFilterRewriteHelper.getAggregationBounds(context, fieldContext.getFieldType().name());
133-
if (bounds != null) {
134-
// Update min/max limit if user specified any hard bounds
135-
if (hardBounds != null) {
136-
bounds[0] = Math.max(bounds[0], hardBounds.getMin());
137-
bounds[1] = Math.min(bounds[1], hardBounds.getMax() - 1); // hard bounds max is exclusive
138-
}
133+
private class DateHistogramAggregationType extends FastFilterRewriteHelper.AbstractDateHistogramAggregationType {
134+
135+
public DateHistogramAggregationType(MappedFieldType fieldType, boolean missing, boolean hasScript, LongBounds hardBounds) {
136+
super(fieldType, missing, hasScript, hardBounds);
137+
}
138+
139+
@Override
140+
protected Rounding getRounding(long low, long high) {
141+
return rounding;
142+
}
143+
144+
@Override
145+
protected Rounding.Prepared getRoundingPrepared() {
146+
return preparedRounding;
139147
}
140-
return bounds;
141148
}
142149

143150
@Override

server/src/test/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregatorTests.java

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.apache.lucene.document.LongPoint;
3636
import org.apache.lucene.index.Term;
3737
import org.apache.lucene.search.DocValuesFieldExistsQuery;
38+
import org.apache.lucene.search.FieldExistsQuery;
3839
import org.apache.lucene.search.MatchAllDocsQuery;
3940
import org.apache.lucene.search.TermQuery;
4041
import org.opensearch.OpenSearchParseException;
@@ -1256,6 +1257,74 @@ public void testMultiValuedWithKeywordLongAndDouble() throws Exception {
12561257
);
12571258
}
12581259

1260+
public void testDateHistogramSourceWithSize() throws IOException {
1261+
final List<Map<String, List<Object>>> dataset = new ArrayList<>(
1262+
Arrays.asList(
1263+
createDocument("date", asLong("2017-10-20T03:08:45")),
1264+
createDocument("date", asLong("2016-09-20T09:00:34")),
1265+
createDocument("date", asLong("2016-09-20T11:34:00")),
1266+
createDocument("date", asLong("2017-10-20T06:09:24")),
1267+
createDocument("date", asLong("2017-10-19T06:09:24")),
1268+
createDocument("long", 4L)
1269+
)
1270+
);
1271+
testSearchCase(
1272+
Arrays.asList(
1273+
new MatchAllDocsQuery(),
1274+
new FieldExistsQuery("date"),
1275+
LongPoint.newRangeQuery("date", asLong("2016-09-20T09:00:34"), asLong("2017-10-20T06:09:24"))
1276+
),
1277+
dataset,
1278+
() -> {
1279+
DateHistogramValuesSourceBuilder histo = new DateHistogramValuesSourceBuilder("date").field("date")
1280+
.calendarInterval(DateHistogramInterval.days(1));
1281+
return new CompositeAggregationBuilder("name", Collections.singletonList(histo)).size(1);
1282+
},
1283+
(result) -> {
1284+
assertEquals(1, result.getBuckets().size());
1285+
assertEquals("{date=1474329600000}", result.afterKey().toString()); // 2017-10-20T00:00:00
1286+
assertEquals("{date=1474329600000}", result.getBuckets().get(0).getKeyAsString());
1287+
assertEquals(2L, result.getBuckets().get(0).getDocCount());
1288+
}
1289+
);
1290+
}
1291+
1292+
public void testDateHistogramSourceWithDocCountField() throws IOException {
1293+
final List<Map<String, List<Object>>> dataset = new ArrayList<>(
1294+
Arrays.asList(
1295+
createDocument("date", asLong("2017-10-20T03:08:45"), "_doc_count", 5),
1296+
createDocument("date", asLong("2016-09-20T09:00:34")),
1297+
createDocument("date", asLong("2016-09-20T11:34:00"), "_doc_count", 2),
1298+
createDocument("date", asLong("2017-10-20T06:09:24")),
1299+
createDocument("date", asLong("2017-10-19T06:09:24"), "_doc_count", 3),
1300+
createDocument("long", 4L)
1301+
)
1302+
);
1303+
testSearchCase(
1304+
Arrays.asList(
1305+
new MatchAllDocsQuery(),
1306+
new FieldExistsQuery("date"),
1307+
LongPoint.newRangeQuery("date", asLong("2016-09-20T09:00:34"), asLong("2017-10-20T06:09:24"))
1308+
),
1309+
dataset,
1310+
() -> {
1311+
DateHistogramValuesSourceBuilder histo = new DateHistogramValuesSourceBuilder("date").field("date")
1312+
.calendarInterval(DateHistogramInterval.days(1));
1313+
return new CompositeAggregationBuilder("name", Collections.singletonList(histo));
1314+
},
1315+
(result) -> {
1316+
assertEquals(3, result.getBuckets().size());
1317+
assertEquals("{date=1508457600000}", result.afterKey().toString());
1318+
assertEquals("{date=1474329600000}", result.getBuckets().get(0).getKeyAsString());
1319+
assertEquals(3L, result.getBuckets().get(0).getDocCount());
1320+
assertEquals("{date=1508371200000}", result.getBuckets().get(1).getKeyAsString());
1321+
assertEquals(3L, result.getBuckets().get(1).getDocCount());
1322+
assertEquals("{date=1508457600000}", result.getBuckets().get(2).getKeyAsString());
1323+
assertEquals(6L, result.getBuckets().get(2).getDocCount());
1324+
}
1325+
);
1326+
}
1327+
12591328
public void testWithDateHistogram() throws IOException {
12601329
final List<Map<String, List<Object>>> dataset = new ArrayList<>();
12611330
dataset.addAll(

0 commit comments

Comments
 (0)