Commit 028084c

Add a new merge policy that interleaves old and new segments on force merge (#48533)
This change adds a new merge policy that interleaves the oldest and newest segments picked by MergePolicy#findForcedMerges and MergePolicy#findForcedDeletesMerges. This allows time-based indices, which usually have the oldest documents first, to be efficient at finding the most recent documents too. We wrap this merge policy for all indices even though it is mostly useful for time-based indices; there should be no overhead for other types of indices, so this is simpler than adding a setting to enable it. This change is needed to ensure that the optimizations we are working on in # remain efficient even after running a force merge. Relates #37043
1 parent abddf51 commit 028084c

6 files changed, +228 −2 lines changed

Lines changed: 119 additions & 0 deletions
@@ -0,0 +1,119 @@
/*
 * Licensed to Elasticsearch under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.lucene.index;

import org.elasticsearch.common.lucene.Lucene;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * A {@link FilterMergePolicy} that interleaves the oldest and newest segments picked by {@link MergePolicy#findForcedMerges}
 * and {@link MergePolicy#findForcedDeletesMerges}. This allows time-based indices, which usually have the oldest documents
 * first, to be efficient at finding the most recent documents too.
 */
public class ShuffleForcedMergePolicy extends FilterMergePolicy {
    private static final String SHUFFLE_MERGE_KEY = "es.shuffle_merge";

    public ShuffleForcedMergePolicy(MergePolicy in) {
        super(in);
    }

    /**
     * Return <code>true</code> if the provided reader was merged with interleaved segments.
     */
    public static boolean isInterleavedSegment(LeafReader reader) {
        SegmentReader segReader = Lucene.segmentReader(reader);
        return segReader.getSegmentInfo().info.getDiagnostics().containsKey(SHUFFLE_MERGE_KEY);
    }

    @Override
    public MergeSpecification findForcedDeletesMerges(SegmentInfos segmentInfos, MergeContext mergeContext) throws IOException {
        return wrap(in.findForcedDeletesMerges(segmentInfos, mergeContext));
    }

    @Override
    public MergeSpecification findForcedMerges(SegmentInfos segmentInfos, int maxSegmentCount,
                                               Map<SegmentCommitInfo, Boolean> segmentsToMerge,
                                               MergeContext mergeContext) throws IOException {
        return wrap(in.findForcedMerges(segmentInfos, maxSegmentCount, segmentsToMerge, mergeContext));
    }

    private MergeSpecification wrap(MergeSpecification mergeSpec) throws IOException {
        if (mergeSpec == null) {
            return null;
        }
        MergeSpecification newMergeSpec = new MergeSpecification();
        for (OneMerge toWrap : mergeSpec.merges) {
            List<SegmentCommitInfo> newInfos = interleaveList(new ArrayList<>(toWrap.segments));
            newMergeSpec.add(new OneMerge(newInfos) {
                @Override
                public CodecReader wrapForMerge(CodecReader reader) throws IOException {
                    return toWrap.wrapForMerge(reader);
                }

                @Override
                public void setMergeInfo(SegmentCommitInfo info) {
                    // Record in the segment diagnostics that this merge interleaved old and new segments:
                    Map<String, String> copy = new HashMap<>(info.info.getDiagnostics());
                    copy.put(SHUFFLE_MERGE_KEY, "");
                    info.info.setDiagnostics(copy);
                    super.setMergeInfo(info);
                }
            });
        }

        return newMergeSpec;
    }

    // Return a new list that sorts the segments of the original list by name (older first)
    // and then interleaves them to colocate the oldest and most recent segments.
    private List<SegmentCommitInfo> interleaveList(List<SegmentCommitInfo> infos) throws IOException {
        List<SegmentCommitInfo> newInfos = new ArrayList<>(infos.size());
        Collections.sort(infos, Comparator.comparing(a -> a.info.name));
        int left = 0;
        int right = infos.size() - 1;
        while (left <= right) {
            SegmentCommitInfo leftInfo = infos.get(left);
            if (left == right) {
                newInfos.add(leftInfo);
            } else {
                SegmentCommitInfo rightInfo = infos.get(right);
                // smaller segment first
                if (leftInfo.sizeInBytes() < rightInfo.sizeInBytes()) {
                    newInfos.add(leftInfo);
                    newInfos.add(rightInfo);
                } else {
                    newInfos.add(rightInfo);
                    newInfos.add(leftInfo);
                }
            }
            left++;
            right--;
        }
        return newInfos;
    }
}
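
As a rough illustration of the ordering interleaveList produces (not part of the commit; the segment names _0 through _5 and the equal-size assumption are hypothetical), the same sort-then-pair-the-ends walk applied to plain names puts the oldest and newest segments side by side:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch of the interleave order, assuming six segments named _0.._5
// (oldest first) of roughly equal size.
public class InterleaveOrderExample {
    public static void main(String[] args) {
        List<String> segments = new ArrayList<>(List.of("_3", "_0", "_5", "_1", "_4", "_2"));
        Collections.sort(segments); // sort by name, oldest segment first
        List<String> interleaved = new ArrayList<>();
        int left = 0;
        int right = segments.size() - 1;
        while (left <= right) {
            if (left == right) {
                interleaved.add(segments.get(left));
            } else {
                // the real policy puts the smaller of the two segments first;
                // with equal sizes assumed, keep oldest-then-newest order
                interleaved.add(segments.get(left));
                interleaved.add(segments.get(right));
            }
            left++;
            right--;
        }
        // Prints [_0, _5, _1, _4, _2, _3]: oldest and newest segments end up adjacent.
        System.out.println(interleaved);
    }
}

With more segments the pairing continues inward (_0, _9, _1, _8, ...), so after a force merge the most recent documents are spread through the merged segment rather than concentrated at its end.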

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 9 additions & 0 deletions
@@ -36,6 +36,7 @@
 import org.apache.lucene.index.MergePolicy;
 import org.apache.lucene.index.SegmentCommitInfo;
 import org.apache.lucene.index.SegmentInfos;
+import org.apache.lucene.index.ShuffleForcedMergePolicy;
 import org.apache.lucene.index.SoftDeletesRetentionMergePolicy;
 import org.apache.lucene.index.Term;
 import org.apache.lucene.search.DocIdSetIterator;
@@ -56,6 +57,7 @@
 import org.elasticsearch.Assertions;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.common.Booleans;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.SuppressForbidden;
 import org.elasticsearch.common.lease.Releasable;
@@ -2226,6 +2228,13 @@ private IndexWriterConfig getIndexWriterConfig() {
                 new SoftDeletesRetentionMergePolicy(Lucene.SOFT_DELETES_FIELD, softDeletesPolicy::getRetentionQuery,
                     new PrunePostingsMergePolicy(mergePolicy, IdFieldMapper.NAME)));
         }
+        boolean shuffleForcedMerge = Booleans.parseBoolean(System.getProperty("es.shuffle_forced_merge", Boolean.TRUE.toString()));
+        if (shuffleForcedMerge) {
+            // We wrap the merge policy for all indices even though it is mostly useful for time-based indices;
+            // there should be no overhead for other types of indices, so this is simpler than adding a setting
+            // to enable it.
+            mergePolicy = new ShuffleForcedMergePolicy(mergePolicy);
+        }
         iwc.setMergePolicy(new ElasticsearchMergePolicy(mergePolicy));
         iwc.setSimilarity(engineConfig.getSimilarity());
         iwc.setRAMBufferSizeMB(engineConfig.getIndexingBufferSize().getMbFrac());
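
The wrapping is gated on the es.shuffle_forced_merge JVM system property, which defaults to true. As a minimal opt-out sketch (the class name and call site are hypothetical, not part of this commit), the property only needs to be set to "false" before the engine builds its IndexWriterConfig, for example from a test setup; on a running node the same property would typically be supplied as a -D JVM option instead.

// Hypothetical sketch: disable segment shuffling on force merge by overriding the
// system property read in getIndexWriterConfig() before the engine is created.
public final class DisableShuffleForcedMergeExample {
    public static void disableShuffleForcedMerge() {
        System.setProperty("es.shuffle_forced_merge", "false");
    }

    public static void main(String[] args) {
        disableShuffleForcedMerge();
        // ... create the engine afterwards; its merge policy will no longer be
        // wrapped in ShuffleForcedMergePolicy.
    }
}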

server/src/main/java/org/elasticsearch/index/shard/ElasticsearchMergePolicy.java

Lines changed: 5 additions & 0 deletions
@@ -61,6 +61,11 @@ public ElasticsearchMergePolicy(MergePolicy delegate) {
         super(delegate);
     }
 
+    /** return the wrapped merge policy */
+    public MergePolicy getDelegate() {
+        return in;
+    }
+
     private boolean shouldUpgrade(SegmentCommitInfo info) {
         org.apache.lucene.util.Version old = info.info.getVersion();
         org.apache.lucene.util.Version cur = Version.CURRENT.luceneVersion;
Lines changed: 91 additions & 0 deletions
@@ -0,0 +1,91 @@
/*
 * Licensed to Elasticsearch under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.lucene.index;

import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.store.Directory;

import java.io.IOException;
import java.util.function.Consumer;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;

public class ShuffleForcedMergePolicyTests extends BaseMergePolicyTestCase {
    public void testDiagnostics() throws IOException {
        try (Directory dir = newDirectory()) {
            IndexWriterConfig iwc = newIndexWriterConfig();
            MergePolicy mp = new ShuffleForcedMergePolicy(newLogMergePolicy());
            iwc.setMergePolicy(mp);
            boolean sorted = random().nextBoolean();
            if (sorted) {
                iwc.setIndexSort(new Sort(new SortField("sort", SortField.Type.INT)));
            }
            int numDocs = atLeast(100);

            try (IndexWriter writer = new IndexWriter(dir, iwc)) {
                for (int i = 0; i < numDocs; i++) {
                    if (i % 10 == 0) {
                        writer.flush();
                    }
                    Document doc = new Document();
                    doc.add(new StringField("id", "" + i, Field.Store.NO));
                    doc.add(new NumericDocValuesField("sort", random().nextInt()));
                    writer.addDocument(doc);
                }
                try (DirectoryReader reader = DirectoryReader.open(writer)) {
                    assertThat(reader.leaves().size(), greaterThan(2));
                    assertSegmentReaders(reader, leaf -> {
                        assertFalse(ShuffleForcedMergePolicy.isInterleavedSegment(leaf));
                    });
                }
                writer.forceMerge(1);
                try (DirectoryReader reader = DirectoryReader.open(writer)) {
                    assertThat(reader.leaves().size(), equalTo(1));
                    assertSegmentReaders(reader, leaf -> {
                        assertTrue(ShuffleForcedMergePolicy.isInterleavedSegment(leaf));
                    });
                }
            }
        }
    }

    private void assertSegmentReaders(DirectoryReader reader, Consumer<LeafReader> checkLeaf) {
        for (LeafReaderContext leaf : reader.leaves()) {
            checkLeaf.accept(leaf.reader());
        }
    }

    @Override
    protected MergePolicy mergePolicy() {
        return new ShuffleForcedMergePolicy(newLogMergePolicy());
    }

    @Override
    protected void assertSegmentInfos(MergePolicy policy, SegmentInfos infos) throws IOException {}

    @Override
    protected void assertMerge(MergePolicy policy, MergePolicy.MergeSpecification merge) throws IOException {}
}

server/src/test/java/org/elasticsearch/index/engine/PrunePostingsMergePolicyTests.java

Lines changed: 2 additions & 1 deletion
@@ -29,6 +29,7 @@
 import org.apache.lucene.index.LeafReader;
 import org.apache.lucene.index.MergePolicy;
 import org.apache.lucene.index.NumericDocValues;
+import org.apache.lucene.index.ShuffleForcedMergePolicy;
 import org.apache.lucene.index.SoftDeletesRetentionMergePolicy;
 import org.apache.lucene.index.Term;
 import org.apache.lucene.index.Terms;
@@ -51,7 +52,7 @@ public void testPrune() throws IOException {
         iwc.setSoftDeletesField("_soft_deletes");
         MergePolicy mp = new SoftDeletesRetentionMergePolicy("_soft_deletes", MatchAllDocsQuery::new,
             new PrunePostingsMergePolicy(newLogMergePolicy(), "id"));
-        iwc.setMergePolicy(mp);
+        iwc.setMergePolicy(new ShuffleForcedMergePolicy(mp));
         boolean sorted = randomBoolean();
         if (sorted) {
             iwc.setIndexSort(new Sort(new SortField("sort", SortField.Type.INT)));

server/src/test/java/org/elasticsearch/index/engine/RecoverySourcePruneMergePolicyTests.java

Lines changed: 2 additions & 1 deletion
@@ -34,6 +34,7 @@
 import org.apache.lucene.index.NumericDocValues;
 import org.apache.lucene.index.SegmentCommitInfo;
 import org.apache.lucene.index.SegmentInfos;
+import org.apache.lucene.index.ShuffleForcedMergePolicy;
 import org.apache.lucene.index.StandardDirectoryReader;
 import org.apache.lucene.index.Term;
 import org.apache.lucene.search.DocIdSetIterator;
@@ -57,7 +58,7 @@ public void testPruneAll() throws IOException {
         IndexWriterConfig iwc = newIndexWriterConfig();
         RecoverySourcePruneMergePolicy mp = new RecoverySourcePruneMergePolicy("extra_source", MatchNoDocsQuery::new,
             newLogMergePolicy());
-        iwc.setMergePolicy(mp);
+        iwc.setMergePolicy(new ShuffleForcedMergePolicy(mp));
         try (IndexWriter writer = new IndexWriter(dir, iwc)) {
             for (int i = 0; i < 20; i++) {
                 if (i > 0 && randomBoolean()) {
