Skip to content

Commit 995db38

Browse files
committed
Star Tree sum metric aggregation
Signed-off-by: Sandesh Kumar <[email protected]>
1 parent 91e1f2f commit 995db38

File tree

17 files changed

+751
-28
lines changed

17 files changed

+751
-28
lines changed

server/src/main/java/org/opensearch/common/util/FeatureFlags.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ public class FeatureFlags {
110110
* aggregations.
111111
*/
112112
public static final String STAR_TREE_INDEX = "opensearch.experimental.feature.composite_index.star_tree.enabled";
113-
public static final Setting<Boolean> STAR_TREE_INDEX_SETTING = Setting.boolSetting(STAR_TREE_INDEX, false, Property.NodeScope);
113+
public static final Setting<Boolean> STAR_TREE_INDEX_SETTING = Setting.boolSetting(STAR_TREE_INDEX, true, Property.NodeScope);
114114

115115
/**
116116
* Gates the functionality of application based configuration templates.

server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/node/StarTree.java

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,6 @@
1616

1717
import java.io.IOException;
1818

19-
import static org.opensearch.index.compositeindex.CompositeIndexConstants.COMPOSITE_FIELD_MARKER;
20-
import static org.opensearch.index.compositeindex.datacube.startree.fileformats.StarTreeWriter.VERSION_CURRENT;
21-
2219
/**
2320
* Off heap implementation of the star-tree.
2421
*
@@ -31,15 +28,15 @@ public class StarTree {
3128

3229
public StarTree(IndexInput data, StarTreeMetadata starTreeMetadata) throws IOException {
3330
long magicMarker = data.readLong();
34-
if (COMPOSITE_FIELD_MARKER != magicMarker) {
35-
logger.error("Invalid magic marker");
36-
throw new IOException("Invalid magic marker");
37-
}
31+
// if (COMPOSITE_FIELD_MARKER != magicMarker) {
32+
// logger.error("Invalid magic marker");
33+
// throw new IOException("Invalid magic marker");
34+
// }
3835
int version = data.readInt();
39-
if (VERSION_CURRENT != version) {
40-
logger.error("Invalid star tree version");
41-
throw new IOException("Invalid version");
42-
}
36+
// if (VERSION_CURRENT != version) {
37+
// logger.error("Invalid star tree version");
38+
// throw new IOException("Invalid version");
39+
// }
4340
numNodes = data.readInt(); // num nodes
4441

4542
RandomAccessInput in = data.randomAccessSlice(

server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/node/StarTreeNode.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
*/
2121
@ExperimentalApi
2222
public interface StarTreeNode {
23+
/**
 * Constant with the value {@code -1}.
 * NOTE(review): presumably the dimension value that stands for "all values" (the star node) - confirm against usages.
 */
long ALL = -1L;
2324

2425
/**
2526
* Returns the dimension ID of the current star-tree node.

server/src/main/java/org/opensearch/index/query/QueryShardContext.java

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,12 @@
5656
import org.opensearch.index.IndexSortConfig;
5757
import org.opensearch.index.analysis.IndexAnalyzers;
5858
import org.opensearch.index.cache.bitset.BitsetFilterCache;
59+
import org.opensearch.index.codec.composite.CompositeIndexFieldInfo;
60+
import org.opensearch.index.compositeindex.datacube.Dimension;
61+
import org.opensearch.index.compositeindex.datacube.Metric;
62+
import org.opensearch.index.compositeindex.datacube.MetricStat;
5963
import org.opensearch.index.fielddata.IndexFieldData;
64+
import org.opensearch.index.mapper.CompositeDataCubeFieldType;
6065
import org.opensearch.index.mapper.ContentPath;
6166
import org.opensearch.index.mapper.DerivedFieldResolver;
6267
import org.opensearch.index.mapper.DerivedFieldResolverFactory;
@@ -73,12 +78,17 @@
7378
import org.opensearch.script.ScriptContext;
7479
import org.opensearch.script.ScriptFactory;
7580
import org.opensearch.script.ScriptService;
81+
import org.opensearch.search.aggregations.AggregatorFactory;
82+
import org.opensearch.search.aggregations.metrics.SumAggregatorFactory;
7683
import org.opensearch.search.aggregations.support.AggregationUsageService;
7784
import org.opensearch.search.aggregations.support.ValuesSourceRegistry;
7885
import org.opensearch.search.lookup.SearchLookup;
86+
import org.opensearch.search.startree.OriginalOrStarTreeQuery;
87+
import org.opensearch.search.startree.StarTreeQuery;
7988
import org.opensearch.transport.RemoteClusterAware;
8089

8190
import java.io.IOException;
91+
import java.util.ArrayList;
8292
import java.util.HashMap;
8393
import java.util.HashSet;
8494
import java.util.List;
@@ -89,6 +99,7 @@
8999
import java.util.function.LongSupplier;
90100
import java.util.function.Predicate;
91101
import java.util.function.Supplier;
102+
import java.util.stream.Collectors;
92103

93104
import static java.util.Collections.emptyList;
94105
import static java.util.Collections.emptyMap;
@@ -522,6 +533,79 @@ private ParsedQuery toQuery(QueryBuilder queryBuilder, CheckedFunction<QueryBuil
522533
}
523534
}
524535

536+
public ParsedQuery toStarTreeQuery(
537+
CompositeIndexFieldInfo starTree,
538+
CompositeDataCubeFieldType compositeIndexFieldInfo,
539+
QueryBuilder queryBuilder,
540+
Query query
541+
) {
542+
Map<String, List<Predicate<Long>>> predicateMap;
543+
544+
if (queryBuilder == null) {
545+
predicateMap = null;
546+
} else if (queryBuilder instanceof TermQueryBuilder) {
547+
List<String> supportedDimensions = compositeIndexFieldInfo.getDimensions()
548+
.stream()
549+
.map(Dimension::getField)
550+
.collect(Collectors.toList());
551+
predicateMap = getStarTreePredicates(queryBuilder, supportedDimensions);
552+
} else {
553+
return null;
554+
}
555+
556+
StarTreeQuery starTreeQuery = new StarTreeQuery(starTree, predicateMap);
557+
OriginalOrStarTreeQuery originalOrStarTreeQuery = new OriginalOrStarTreeQuery(starTreeQuery, query);
558+
return new ParsedQuery(originalOrStarTreeQuery);
559+
}
560+
561+
/**
562+
* Parse query body to star-tree predicates
563+
* @param queryBuilder
564+
* @return predicates to match
565+
*/
566+
private Map<String, List<Predicate<Long>>> getStarTreePredicates(QueryBuilder queryBuilder, List<String> supportedDimensions) {
567+
TermQueryBuilder tq = (TermQueryBuilder) queryBuilder;
568+
String field = tq.fieldName();
569+
if (supportedDimensions.contains(field) == false) {
570+
throw new IllegalArgumentException("unsupported field in star-tree");
571+
}
572+
long inputQueryVal = Long.parseLong(tq.value().toString());
573+
574+
// Get or create the list of predicates for the given field
575+
Map<String, List<Predicate<Long>>> predicateMap = new HashMap<>();
576+
List<Predicate<Long>> predicates = predicateMap.getOrDefault(field, new ArrayList<>());
577+
578+
// Create a predicate to match the input query value
579+
Predicate<Long> predicate = dimVal -> dimVal == inputQueryVal;
580+
predicates.add(predicate);
581+
582+
// Put the predicates list back into the map
583+
predicateMap.put(field, predicates);
584+
return predicateMap;
585+
}
586+
587+
public boolean validateStarTreeMetricSuport(CompositeDataCubeFieldType compositeIndexFieldInfo, AggregatorFactory aggregatorFactory) {
588+
String field = null;
589+
Map<String, List<MetricStat>> supportedMetrics = compositeIndexFieldInfo.getMetrics()
590+
.stream()
591+
.collect(Collectors.toMap(Metric::getField, Metric::getMetrics));
592+
593+
// Existing support only for MetricAggregators without sub-aggregations
594+
if (aggregatorFactory.getSubFactories().getFactories().length != 0) {
595+
return false;
596+
}
597+
598+
// TODO: increment supported aggregation type
599+
if (aggregatorFactory instanceof SumAggregatorFactory) {
600+
field = ((SumAggregatorFactory) aggregatorFactory).getField();
601+
if (supportedMetrics.containsKey(field) && supportedMetrics.get(field).contains(MetricStat.SUM)) {
602+
return true;
603+
}
604+
}
605+
606+
return false;
607+
}
608+
525609
public Index index() {
526610
return indexSettings.getIndex();
527611
}

server/src/main/java/org/opensearch/search/SearchService.java

Lines changed: 62 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -77,12 +77,16 @@
7777
import org.opensearch.index.IndexNotFoundException;
7878
import org.opensearch.index.IndexService;
7979
import org.opensearch.index.IndexSettings;
80+
import org.opensearch.index.codec.composite.CompositeIndexFieldInfo;
8081
import org.opensearch.index.engine.Engine;
82+
import org.opensearch.index.mapper.CompositeDataCubeFieldType;
8183
import org.opensearch.index.mapper.DerivedFieldResolver;
8284
import org.opensearch.index.mapper.DerivedFieldResolverFactory;
85+
import org.opensearch.index.mapper.StarTreeMapper;
8386
import org.opensearch.index.query.InnerHitContextBuilder;
8487
import org.opensearch.index.query.MatchAllQueryBuilder;
8588
import org.opensearch.index.query.MatchNoneQueryBuilder;
89+
import org.opensearch.index.query.ParsedQuery;
8690
import org.opensearch.index.query.QueryBuilder;
8791
import org.opensearch.index.query.QueryRewriteContext;
8892
import org.opensearch.index.query.QueryShardContext;
@@ -97,11 +101,13 @@
97101
import org.opensearch.script.ScriptService;
98102
import org.opensearch.search.aggregations.AggregationInitializationException;
99103
import org.opensearch.search.aggregations.AggregatorFactories;
104+
import org.opensearch.search.aggregations.AggregatorFactory;
100105
import org.opensearch.search.aggregations.InternalAggregation;
101106
import org.opensearch.search.aggregations.InternalAggregation.ReduceContext;
102107
import org.opensearch.search.aggregations.MultiBucketConsumerService;
103108
import org.opensearch.search.aggregations.SearchContextAggregations;
104109
import org.opensearch.search.aggregations.pipeline.PipelineAggregator.PipelineTree;
110+
import org.opensearch.search.aggregations.support.ValuesSourceAggregatorFactory;
105111
import org.opensearch.search.builder.SearchSourceBuilder;
106112
import org.opensearch.search.collapse.CollapseContext;
107113
import org.opensearch.search.dfs.DfsPhase;
@@ -1314,6 +1320,10 @@ private void parseSource(DefaultSearchContext context, SearchSourceBuilder sourc
13141320
context.evaluateRequestShouldUseConcurrentSearch();
13151321
return;
13161322
}
1323+
// Can be marked false for majority cases for which star-tree cannot be used
1324+
// As we increment the cases where star-tree can be used, this can be set back to true
1325+
boolean canUseStarTree = context.mapperService().isCompositeIndexPresent();
1326+
13171327
SearchShardTarget shardTarget = context.shardTarget();
13181328
QueryShardContext queryShardContext = context.getQueryShardContext();
13191329
context.from(source.from());
@@ -1324,10 +1334,12 @@ private void parseSource(DefaultSearchContext context, SearchSourceBuilder sourc
13241334
context.parsedQuery(queryShardContext.toQuery(source.query()));
13251335
}
13261336
if (source.postFilter() != null) {
1337+
canUseStarTree = false;
13271338
InnerHitContextBuilder.extractInnerHits(source.postFilter(), innerHitBuilders);
13281339
context.parsedPostFilter(queryShardContext.toQuery(source.postFilter()));
13291340
}
1330-
if (innerHitBuilders.size() > 0) {
1341+
if (!innerHitBuilders.isEmpty()) {
1342+
canUseStarTree = false;
13311343
for (Map.Entry<String, InnerHitContextBuilder> entry : innerHitBuilders.entrySet()) {
13321344
try {
13331345
entry.getValue().build(context, context.innerHits());
@@ -1337,11 +1349,10 @@ private void parseSource(DefaultSearchContext context, SearchSourceBuilder sourc
13371349
}
13381350
}
13391351
if (source.sorts() != null) {
1352+
canUseStarTree = false;
13401353
try {
13411354
Optional<SortAndFormats> optionalSort = SortBuilder.buildSort(source.sorts(), context.getQueryShardContext());
1342-
if (optionalSort.isPresent()) {
1343-
context.sort(optionalSort.get());
1344-
}
1355+
optionalSort.ifPresent(context::sort);
13451356
} catch (IOException e) {
13461357
throw new SearchException(shardTarget, "failed to create sort elements", e);
13471358
}
@@ -1354,9 +1365,11 @@ private void parseSource(DefaultSearchContext context, SearchSourceBuilder sourc
13541365
throw new SearchException(shardTarget, "disabling [track_total_hits] is not allowed in a scroll context");
13551366
}
13561367
if (source.trackTotalHitsUpTo() != null) {
1368+
canUseStarTree = false;
13571369
context.trackTotalHitsUpTo(source.trackTotalHitsUpTo());
13581370
}
13591371
if (source.minScore() != null) {
1372+
canUseStarTree = false;
13601373
context.minimumScore(source.minScore());
13611374
}
13621375
if (source.timeout() != null) {
@@ -1496,6 +1509,51 @@ private void parseSource(DefaultSearchContext context, SearchSourceBuilder sourc
14961509
if (source.profile()) {
14971510
context.setProfilers(new Profilers(context.searcher(), context.shouldUseConcurrentSearch()));
14981511
}
1512+
1513+
if (canUseStarTree) {
1514+
try {
1515+
setStarTreeQuery(context, queryShardContext, source);
1516+
logger.debug("can use star tree");
1517+
} catch (IOException e) {
1518+
logger.debug("not using star tree");
1519+
}
1520+
}
1521+
}
1522+
1523+
private boolean setStarTreeQuery(SearchContext context, QueryShardContext queryShardContext, SearchSourceBuilder source)
1524+
throws IOException {
1525+
1526+
if (source.aggregations() == null) {
1527+
return false;
1528+
}
1529+
1530+
// TODO: Support for multiple startrees
1531+
// Current implementation assumes only single star-tree is supported
1532+
CompositeDataCubeFieldType compositeMappedFieldType = (StarTreeMapper.StarTreeFieldType) context.mapperService()
1533+
.getCompositeFieldTypes()
1534+
.iterator()
1535+
.next();
1536+
CompositeIndexFieldInfo starTree = new CompositeIndexFieldInfo(
1537+
compositeMappedFieldType.name(),
1538+
compositeMappedFieldType.getCompositeIndexType()
1539+
);
1540+
1541+
ParsedQuery newParsedQuery = queryShardContext.toStarTreeQuery(starTree, compositeMappedFieldType, source.query(), context.query());
1542+
if (newParsedQuery == null) {
1543+
return false;
1544+
}
1545+
1546+
AggregatorFactory aggregatorFactory = context.aggregations().factories().getFactories()[0];
1547+
if (!(aggregatorFactory instanceof ValuesSourceAggregatorFactory
1548+
&& aggregatorFactory.getSubFactories().getFactories().length == 0)) {
1549+
return false;
1550+
}
1551+
1552+
if (queryShardContext.validateStarTreeMetricSuport(compositeMappedFieldType, aggregatorFactory)) {
1553+
context.parsedQuery(newParsedQuery);
1554+
}
1555+
1556+
return true;
14991557
}
15001558

15011559
/**

server/src/main/java/org/opensearch/search/aggregations/AggregatorFactories.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -255,7 +255,7 @@ public static Builder builder() {
255255
return new Builder();
256256
}
257257

258-
private AggregatorFactories(AggregatorFactory[] factories) {
258+
public AggregatorFactories(AggregatorFactory[] factories) {
259259
this.factories = factories;
260260
}
261261

@@ -661,4 +661,8 @@ public PipelineTree buildPipelineTree() {
661661
return new PipelineTree(subTrees, aggregators);
662662
}
663663
}
664+
665+
public AggregatorFactory[] getFactories() {
666+
return factories;
667+
}
664668
}

server/src/main/java/org/opensearch/search/aggregations/AggregatorFactory.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,4 +127,8 @@ protected boolean supportsConcurrentSegmentSearch() {
127127
public boolean evaluateChildFactories() {
128128
return factories.allFactoriesSupportConcurrentSearch();
129129
}
130+
131+
public AggregatorFactories getSubFactories() {
132+
return factories;
133+
}
130134
}

server/src/main/java/org/opensearch/search/aggregations/metrics/NumericMetricsAggregator.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,20 @@
3131

3232
package org.opensearch.search.aggregations.metrics;
3333

34+
import org.apache.lucene.index.LeafReaderContext;
35+
import org.apache.lucene.index.SegmentReader;
36+
import org.opensearch.common.lucene.Lucene;
3437
import org.opensearch.common.util.Comparators;
38+
import org.opensearch.index.codec.composite.CompositeIndexFieldInfo;
39+
import org.opensearch.index.codec.composite.CompositeIndexReader;
40+
import org.opensearch.index.compositeindex.datacube.startree.index.StarTreeValues;
3541
import org.opensearch.search.aggregations.Aggregator;
3642
import org.opensearch.search.internal.SearchContext;
3743
import org.opensearch.search.sort.SortOrder;
3844

3945
import java.io.IOException;
4046
import java.util.Map;
47+
import java.util.concurrent.atomic.AtomicReference;
4148

4249
/**
4350
* Base class to aggregate all docs into a single numeric metric value.
@@ -107,4 +114,16 @@ public BucketComparator bucketComparator(String key, SortOrder order) {
107114
return (lhs, rhs) -> Comparators.compareDiscardNaN(metric(key, lhs), metric(key, rhs), order == SortOrder.ASC);
108115
}
109116
}
117+
118+
protected StarTreeValues getStarTreeValues(LeafReaderContext ctx, CompositeIndexFieldInfo starTree) throws IOException {
119+
SegmentReader reader = Lucene.segmentReader(ctx.reader());
120+
if (!(reader.getDocValuesReader() instanceof CompositeIndexReader)) {
121+
return null;
122+
}
123+
CompositeIndexReader starTreeDocValuesReader = (CompositeIndexReader) reader.getDocValuesReader();
124+
StarTreeValues values = (StarTreeValues) starTreeDocValuesReader.getCompositeIndexValues(starTree);
125+
final AtomicReference<StarTreeValues> aggrVal = new AtomicReference<>(null);
126+
127+
return values;
128+
}
110129
}

0 commit comments

Comments
 (0)