88
99package org .opensearch .index .codec .composite ;
1010
11+ import org .apache .logging .log4j .LogManager ;
12+ import org .apache .logging .log4j .Logger ;
1113import org .apache .lucene .codecs .DocValuesConsumer ;
1214import org .apache .lucene .codecs .DocValuesProducer ;
15+ import org .apache .lucene .index .DocValues ;
16+ import org .apache .lucene .index .DocValuesType ;
17+ import org .apache .lucene .index .EmptyDocValuesProducer ;
1318import org .apache .lucene .index .FieldInfo ;
1419import org .apache .lucene .index .MergeState ;
1520import org .apache .lucene .index .SegmentWriteState ;
21+ import org .apache .lucene .index .SortedNumericDocValues ;
1622import org .opensearch .common .annotation .ExperimentalApi ;
23+ import org .opensearch .index .codec .composite .datacube .startree .StarTreeValues ;
24+ import org .opensearch .index .compositeindex .datacube .startree .StarTreeField ;
1725import org .opensearch .index .compositeindex .datacube .startree .builder .StarTreesBuilder ;
1826import org .opensearch .index .mapper .CompositeMappedFieldType ;
1927import org .opensearch .index .mapper .MapperService ;
20- import org .opensearch .index .mapper .StarTreeMapper ;
2128
2229import java .io .IOException ;
30+ import java .util .Collections ;
2331import java .util .HashMap ;
2432import java .util .HashSet ;
33+ import java .util .List ;
2534import java .util .Map ;
2635import java .util .Set ;
2736import java .util .concurrent .atomic .AtomicReference ;
@@ -40,8 +49,10 @@ public class Composite99DocValuesWriter extends DocValuesConsumer {
4049 AtomicReference <MergeState > mergeState = new AtomicReference <>();
4150 private final Set <CompositeMappedFieldType > compositeMappedFieldTypes ;
4251 private final Set <String > compositeFieldSet ;
52+ private final Set <String > segmentFieldSet ;
4353
4454 private final Map <String , DocValuesProducer > fieldProducerMap = new HashMap <>();
55+ private static final Logger logger = LogManager .getLogger (Composite99DocValuesWriter .class );
4556
4657 public Composite99DocValuesWriter (DocValuesConsumer delegate , SegmentWriteState segmentWriteState , MapperService mapperService ) {
4758
@@ -50,6 +61,12 @@ public Composite99DocValuesWriter(DocValuesConsumer delegate, SegmentWriteState
5061 this .mapperService = mapperService ;
5162 this .compositeMappedFieldTypes = mapperService .getCompositeFieldTypes ();
5263 compositeFieldSet = new HashSet <>();
64+ segmentFieldSet = new HashSet <>();
65+ for (FieldInfo fi : segmentWriteState .fieldInfos ) {
66+ if (DocValuesType .SORTED_NUMERIC .equals (fi .getDocValuesType ())) {
67+ segmentFieldSet .add (fi .name );
68+ }
69+ }
5370 for (CompositeMappedFieldType type : compositeMappedFieldTypes ) {
5471 compositeFieldSet .addAll (type .fields ());
5572 }
@@ -95,23 +112,91 @@ private void createCompositeIndicesIfPossible(DocValuesProducer valuesProducer,
95112 fieldProducerMap .put (field .name , valuesProducer );
96113 compositeFieldSet .remove (field .name );
97114 }
115+ segmentFieldSet .remove (field .name );
116+ if (segmentFieldSet .isEmpty ()) {
117+ Set <String > compositeFieldSetCopy = new HashSet <>(compositeFieldSet );
118+ for (String compositeField : compositeFieldSetCopy ) {
119+ fieldProducerMap .put (compositeField , new EmptyDocValuesProducer () {
120+ @ Override
121+ public SortedNumericDocValues getSortedNumeric (FieldInfo field ) {
122+ return DocValues .emptySortedNumeric ();
123+ }
124+ });
125+ compositeFieldSet .remove (compositeField );
126+ }
127+ }
98128 // we have all the required fields to build composite fields
99129 if (compositeFieldSet .isEmpty ()) {
100130 for (CompositeMappedFieldType mappedType : compositeMappedFieldTypes ) {
101- if (mappedType instanceof StarTreeMapper . StarTreeFieldType ) {
102- try (StarTreesBuilder starTreesBuilder = new StarTreesBuilder (fieldProducerMap , state , mapperService )) {
103- starTreesBuilder .build ();
131+ if (mappedType . getCompositeIndexType (). equals ( CompositeMappedFieldType . CompositeFieldType . STAR_TREE ) ) {
132+ try (StarTreesBuilder starTreesBuilder = new StarTreesBuilder (state , mapperService )) {
133+ starTreesBuilder .build (fieldProducerMap );
104134 }
105135 }
106136 }
107137 }
138+
108139 }
109140
110141 @ Override
111142 public void merge (MergeState mergeState ) throws IOException {
112143 this .mergeState .compareAndSet (null , mergeState );
113144 super .merge (mergeState );
114- // TODO : handle merge star tree
115- // mergeStarTreeFields(mergeState);
145+ mergeCompositeFields (mergeState );
146+ }
147+
148+ /**
149+ * Merges composite fields from multiple segments
150+ * @param mergeState merge state
151+ */
152+ private void mergeCompositeFields (MergeState mergeState ) throws IOException {
153+ mergeStarTreeFields (mergeState );
154+ }
155+
156+ /**
157+ * Merges star tree data fields from multiple segments
158+ * @param mergeState merge state
159+ */
160+ private void mergeStarTreeFields (MergeState mergeState ) throws IOException {
161+ Map <String , List <StarTreeValues >> starTreeSubsPerField = new HashMap <>();
162+ StarTreeField starTreeField = null ;
163+ for (int i = 0 ; i < mergeState .docValuesProducers .length ; i ++) {
164+ CompositeIndexReader reader = null ;
165+ if (mergeState .docValuesProducers [i ] == null ) {
166+ continue ;
167+ }
168+ if (mergeState .docValuesProducers [i ] instanceof CompositeIndexReader ) {
169+ reader = (CompositeIndexReader ) mergeState .docValuesProducers [i ];
170+ } else {
171+ continue ;
172+ }
173+
174+ List <CompositeIndexFieldInfo > compositeFieldInfo = reader .getCompositeIndexFields ();
175+ for (CompositeIndexFieldInfo fieldInfo : compositeFieldInfo ) {
176+ if (fieldInfo .getType ().equals (CompositeMappedFieldType .CompositeFieldType .STAR_TREE )) {
177+ CompositeIndexValues compositeIndexValues = reader .getCompositeIndexValues (fieldInfo );
178+ if (compositeIndexValues instanceof StarTreeValues ) {
179+ StarTreeValues starTreeValues = (StarTreeValues ) compositeIndexValues ;
180+ List <StarTreeValues > fieldsList = starTreeSubsPerField .getOrDefault (fieldInfo .getField (), Collections .emptyList ());
181+ if (starTreeField == null ) {
182+ starTreeField = starTreeValues .getStarTreeField ();
183+ }
184+ // assert star tree configuration is same across segments
185+ else {
186+ if (starTreeField .equals (starTreeValues .getStarTreeField ()) == false ) {
187+ throw new IllegalArgumentException (
188+ "star tree field configuration must match the configuration of the field being merged"
189+ );
190+ }
191+ }
192+ fieldsList .add (starTreeValues );
193+ starTreeSubsPerField .put (fieldInfo .getField (), fieldsList );
194+ }
195+ }
196+ }
197+ }
198+ try (StarTreesBuilder starTreesBuilder = new StarTreesBuilder (state , mapperService )) {
199+ starTreesBuilder .buildDuringMerge (starTreeSubsPerField );
200+ }
116201 }
117202}
0 commit comments