Skip to content

Commit 4c89efb

Browse files
Harsh-87 authored and tandonks committed
Add Auto Force Merge Manager (opensearch-project#18229)
Signed-off-by: Harsh Kothari <[email protected]>
1 parent 328ceb1 commit 4c89efb

File tree

12 files changed

+1769
-11
lines changed

12 files changed

+1769
-11
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
2727
- Improve sort-query performance by retaining the default `totalHitsThreshold` for approximated `match_all` queries ([#18189](https://github.com/opensearch-project/OpenSearch/pull/18189))
2828
- Enable testing for ExtensiblePlugins using classpath plugins ([#16908](https://github.com/opensearch-project/OpenSearch/pull/16908))
2929
- Introduce system generated ingest pipeline ([#17817](https://github.com/opensearch-project/OpenSearch/pull/17817))
30+
- Added Auto Force Merge Manager to enhance hot to warm migration experience ([#18229](https://github.com/opensearch-project/OpenSearch/pull/18229))
3031
- Apply cluster state metadata and routing table diff when building cluster state from remote([#18256](https://github.com/opensearch-project/OpenSearch/pull/18256))
3132
- Support create mode in pull-based ingestion and add retries for transient failures ([#18250](https://github.com/opensearch-project/OpenSearch/pull/18250))
3233
- Decouple the init of Crypto Plugin and KeyProvider in CryptoRegistry ([#18270](https://github.com/opensearch-project/OpenSearch/pull/18270))
Lines changed: 247 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,247 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.index.autoforcemerge;
10+
11+
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
12+
13+
import org.opensearch.cluster.metadata.IndexMetadata;
14+
import org.opensearch.common.settings.Settings;
15+
import org.opensearch.common.unit.TimeValue;
16+
import org.opensearch.core.common.unit.ByteSizeUnit;
17+
import org.opensearch.core.common.unit.ByteSizeValue;
18+
import org.opensearch.index.IndexSettings;
19+
import org.opensearch.index.engine.SegmentsStats;
20+
import org.opensearch.index.shard.IndexShard;
21+
import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter;
22+
import org.opensearch.node.Node;
23+
import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
24+
import org.opensearch.test.InternalTestCluster;
25+
import org.opensearch.test.OpenSearchIntegTestCase;
26+
27+
import java.util.concurrent.ExecutionException;
28+
import java.util.concurrent.TimeUnit;
29+
import java.util.concurrent.atomic.AtomicLong;
30+
31+
import static org.opensearch.common.util.concurrent.OpenSearchExecutors.NODE_PROCESSORS_SETTING;
32+
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
33+
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
34+
35+
@ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class)
36+
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, supportsDedicatedMasters = false)
37+
public class AutoForceMergeManagerIT extends RemoteStoreBaseIntegTestCase {
38+
39+
private static final String INDEX_NAME_1 = "test-auto-forcemerge-one";
40+
private static final String INDEX_NAME_2 = "test-auto-forcemerge-two";
41+
private static final int NUM_DOCS_IN_BULK = 1000;
42+
private static final int INGESTION_COUNT = 3;
43+
private static final String SCHEDULER_INTERVAL = "1s";
44+
private static final String TRANSLOG_AGE = "1s";
45+
private static final String MERGE_DELAY = "1s";
46+
private static final Integer SEGMENT_COUNT = 1;
47+
48+
/**
 * Always returns {@code false}.
 * NOTE(review): judging by its name, this presumably stops the test framework from installing the
 * mock index store plugin for this suite — confirm against the base test case.
 */
@Override
49+
protected boolean addMockIndexStorePlugin() {
50+
return false;
51+
}
52+
53+
/**
 * Builds the per-node settings used by this suite on top of the parent's settings.
 * <p>
 * In addition to whatever {@code super.nodeSettings(nodeOrdinal)} provides, this enables remote
 * cluster state, sets the node search cache size to 16 GB, sets the node processor count to 32,
 * and applies the scheduler interval, translog age, segment count and merge delay constants
 * declared on this class to the corresponding {@code ForceMergeManagerSettings} keys.
 *
 * @param nodeOrdinal ordinal of the node being configured; only forwarded to the parent
 * @return the combined node settings
 */
@Override
54+
protected Settings nodeSettings(int nodeOrdinal) {
55+
ByteSizeValue cacheSize = new ByteSizeValue(16, ByteSizeUnit.GB);
56+
return Settings.builder()
57+
.put(super.nodeSettings(nodeOrdinal))
58+
.put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true)
59+
.put(Node.NODE_SEARCH_CACHE_SIZE_SETTING.getKey(), cacheSize.toString())
60+
.put(NODE_PROCESSORS_SETTING.getKey(), 32)
61+
.put(ForceMergeManagerSettings.AUTO_FORCE_MERGE_SCHEDULER_INTERVAL.getKey(), SCHEDULER_INTERVAL)
62+
.put(ForceMergeManagerSettings.TRANSLOG_AGE_AUTO_FORCE_MERGE.getKey(), TRANSLOG_AGE)
63+
.put(ForceMergeManagerSettings.SEGMENT_COUNT_FOR_AUTO_FORCE_MERGE.getKey(), SEGMENT_COUNT)
64+
.put(ForceMergeManagerSettings.MERGE_DELAY_BETWEEN_SHARDS_FOR_AUTO_FORCE_MERGE.getKey(), MERGE_DELAY)
65+
.build();
66+
}
67+
68+
/**
 * Checks that the segment count of a shard stays the same when the auto force merge setting is
 * {@code false}.
 * <p>
 * Starts a cluster-manager-only node, one data-only node and one warm-only node with
 * {@code AUTO_FORCE_MERGE_SETTING} disabled, creates a single-shard, zero-replica index, ingests
 * {@code INGESTION_COUNT} bulks (flushing and refreshing after each one), then compares the
 * shard's segment count before and after sleeping for three scheduler intervals.
 */
public void testAutoForceMergeFeatureFlagDisabled() throws InterruptedException, ExecutionException {
69+
70+
Settings clusterSettings = Settings.builder()
71+
.put(super.nodeSettings(0))
72+
.put(ForceMergeManagerSettings.AUTO_FORCE_MERGE_SETTING.getKey(), false)
73+
.build();
74+
InternalTestCluster internalTestCluster = internalCluster();
75+
internalTestCluster.startClusterManagerOnlyNode(clusterSettings);
76+
String dataNode = internalTestCluster.startDataOnlyNodes(1, clusterSettings).getFirst();
77+
internalCluster().startWarmOnlyNodes(1, clusterSettings).getFirst();
78+
79+
Settings settings = Settings.builder()
80+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
81+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
82+
.build();
83+
assertAcked(client().admin().indices().prepareCreate(INDEX_NAME_1).setSettings(settings).get());
84+
85+
// Each ingestion request is followed by a flush and refresh, so each one is expected to leave its own segment
86+
for (int i = 0; i < INGESTION_COUNT; i++) {
87+
indexBulk(INDEX_NAME_1, NUM_DOCS_IN_BULK);
88+
flushAndRefresh(INDEX_NAME_1);
89+
}
90+
IndexShard shard = getIndexShard(dataNode, INDEX_NAME_1);
91+
assertNotNull(shard);
92+
93+
// Segment stats captured before waiting
94+
SegmentsStats segmentsStatsBefore = shard.segmentStats(false, false);
95+
96+
// Wait for three scheduler intervals so that an enabled auto force merge would have had several chances to run.
97+
Thread.sleep(TimeValue.parseTimeValue(SCHEDULER_INTERVAL, "test").getMillis() * 3);
98+
// refresh to clear old segments
99+
flushAndRefresh(INDEX_NAME_1);
100+
101+
// Segment stats captured after waiting; the count must be unchanged because the feature is disabled
102+
SegmentsStats segmentsStatsAfter = shard.segmentStats(false, false);
103+
assertEquals(segmentsStatsBefore.getCount(), segmentsStatsAfter.getCount());
104+
105+
// Deleting the index (so that ref count drops to zero for all the files) to avoid any file leaks.
106+
// NOTE(review): an earlier wording of this comment mentioned pruning the cache; no pruning happens in this method.
107+
assertAcked(client().admin().indices().prepareDelete(INDEX_NAME_1).get());
108+
}
109+
110+
/**
 * Checks that a shard whose index has {@code INDEX_AUTO_FORCE_MERGES_ENABLED} set to
 * {@code false} keeps its segment count even though the node-level auto force merge setting is
 * {@code true}.
 * <p>
 * The segment count is read before and after sleeping for three scheduler intervals and the two
 * values are asserted to be equal.
 */
public void testAutoForceMergeTriggeringWithOneShardOfNonWarmCandidate() throws Exception {
111+
Settings clusterSettings = Settings.builder()
112+
.put(super.nodeSettings(0))
113+
.put(ForceMergeManagerSettings.AUTO_FORCE_MERGE_SETTING.getKey(), true)
114+
.build();
115+
InternalTestCluster internalTestCluster = internalCluster();
116+
internalTestCluster.startClusterManagerOnlyNode(clusterSettings);
117+
String dataNode = internalTestCluster.startDataOnlyNodes(1, clusterSettings).getFirst();
118+
internalCluster().startWarmOnlyNodes(1, clusterSettings).getFirst();
119+
// Index-level opt-out: auto force merges are disabled for this index only
Settings settings = Settings.builder()
120+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
121+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
122+
.put(IndexSettings.INDEX_AUTO_FORCE_MERGES_ENABLED.getKey(), false)
123+
.build();
124+
assertAcked(client().admin().indices().prepareCreate(INDEX_NAME_1).setSettings(settings).get());
125+
for (int i = 0; i < INGESTION_COUNT; i++) {
126+
indexBulk(INDEX_NAME_1, NUM_DOCS_IN_BULK);
127+
flushAndRefresh(INDEX_NAME_1);
128+
}
129+
IndexShard shard = getIndexShard(dataNode, INDEX_NAME_1);
130+
assertNotNull(shard);
131+
SegmentsStats segmentsStatsBefore = shard.segmentStats(false, false);
132+
// Sleep for three scheduler intervals before re-reading the stats
Thread.sleep(TimeValue.parseTimeValue(SCHEDULER_INTERVAL, "test").getMillis() * 3);
133+
SegmentsStats segmentsStatsAfter = shard.segmentStats(false, false);
134+
assertEquals(segmentsStatsBefore.getCount(), segmentsStatsAfter.getCount());
135+
assertAcked(client().admin().indices().prepareDelete(INDEX_NAME_1).get());
136+
}
137+
138+
/**
 * Checks that, with auto force merge enabled, a single shard is merged down to
 * {@code SEGMENT_COUNT} segments.
 * <p>
 * The test ingests {@code INGESTION_COUNT} bulks (flushing and refreshing after each), then polls
 * the shard's segment count for up to one minute. The result of the wait is asserted, so the test
 * fails if the target count is never reached instead of silently passing.
 */
public void testAutoForceMergeTriggeringBasicWithOneShard() throws Exception {
139+
Settings clusterSettings = Settings.builder()
140+
.put(super.nodeSettings(0))
141+
.put(ForceMergeManagerSettings.AUTO_FORCE_MERGE_SETTING.getKey(), true)
142+
.build();
143+
InternalTestCluster internalTestCluster = internalCluster();
144+
internalTestCluster.startClusterManagerOnlyNode(clusterSettings);
145+
String dataNode = internalTestCluster.startDataOnlyNodes(1, clusterSettings).getFirst();
146+
internalCluster().startWarmOnlyNodes(1, clusterSettings).getFirst();
147+
Settings settings = Settings.builder()
148+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
149+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
150+
.build();
151+
assertAcked(client().admin().indices().prepareCreate(INDEX_NAME_1).setSettings(settings).get());
152+
for (int i = 0; i < INGESTION_COUNT; i++) {
153+
indexBulk(INDEX_NAME_1, NUM_DOCS_IN_BULK);
154+
flushAndRefresh(INDEX_NAME_1);
155+
}
156+
IndexShard shard = getIndexShard(dataNode, INDEX_NAME_1);
157+
assertNotNull(shard);
158+
SegmentsStats segmentsStatsBefore = shard.segmentStats(false, false);
159+
assertTrue("shard was not merged down to the target segment count in time", waitUntil(() -> shard.segmentStats(false, false).getCount() == SEGMENT_COUNT, 1, TimeUnit.MINUTES));
160+
SegmentsStats segmentsStatsAfter = shard.segmentStats(false, false);
161+
// Non-strict comparison: the merge may already have run before the "before" stats were captured.
assertTrue(segmentsStatsBefore.getCount() >= segmentsStatsAfter.getCount());
162+
assertEquals(SEGMENT_COUNT.longValue(), segmentsStatsAfter.getCount());
163+
assertAcked(client().admin().indices().prepareDelete(INDEX_NAME_1).get());
164+
}
165+
166+
/**
 * Checks that, with auto force merge enabled, all five shards spread over two indices (three
 * shards in the first, two in the second) are merged down to {@code SEGMENT_COUNT} segments each.
 * <p>
 * After ingesting into both indices the test records the combined segment count, then polls each
 * shard for up to one minute. Every wait result is asserted, and the combined count is checked
 * again at the end, so the test fails if any shard is left unmerged.
 */
public void testAutoForceMergeTriggeringBasicWithFiveShardsOfTwoIndex() throws Exception {
167+
168+
Settings clusterSettings = Settings.builder()
169+
.put(super.nodeSettings(0))
170+
.put(ForceMergeManagerSettings.AUTO_FORCE_MERGE_SETTING.getKey(), true)
171+
.build();
172+
InternalTestCluster internalTestCluster = internalCluster();
173+
internalTestCluster.startClusterManagerOnlyNode(clusterSettings);
174+
String dataNode = internalTestCluster.startDataOnlyNodes(1, clusterSettings).getFirst();
175+
internalCluster().startWarmOnlyNodes(1, clusterSettings).getFirst();
176+
assertAcked(
177+
client().admin()
178+
.indices()
179+
.prepareCreate(INDEX_NAME_1)
180+
.setSettings(
181+
Settings.builder()
182+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 3)
183+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
184+
.build()
185+
)
186+
.get()
187+
);
188+
assertAcked(
189+
client().admin()
190+
.indices()
191+
.prepareCreate(INDEX_NAME_2)
192+
.setSettings(
193+
Settings.builder()
194+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 2)
195+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
196+
.build()
197+
)
198+
.get()
199+
);
200+
for (int i = 0; i < INGESTION_COUNT; i++) {
201+
indexBulk(INDEX_NAME_1, NUM_DOCS_IN_BULK);
202+
flushAndRefresh(INDEX_NAME_1);
203+
}
204+
IndexShard shard1 = getIndexShardFromShardId(dataNode, INDEX_NAME_1, 0);
205+
IndexShard shard2 = getIndexShardFromShardId(dataNode, INDEX_NAME_1, 1);
206+
IndexShard shard3 = getIndexShardFromShardId(dataNode, INDEX_NAME_1, 2);
207+
assertNotNull(shard1);
208+
assertNotNull(shard2);
209+
assertNotNull(shard3);
210+
for (int i = 0; i < INGESTION_COUNT; i++) {
211+
indexBulk(INDEX_NAME_2, NUM_DOCS_IN_BULK);
212+
flushAndRefresh(INDEX_NAME_2);
213+
}
214+
IndexShard shard4 = getIndexShardFromShardId(dataNode, INDEX_NAME_2, 0);
215+
IndexShard shard5 = getIndexShardFromShardId(dataNode, INDEX_NAME_2, 1);
216+
assertNotNull(shard4);
217+
assertNotNull(shard5);
218+
219+
SegmentsStats segmentsStatsForShard1Before = shard1.segmentStats(false, false);
220+
SegmentsStats segmentsStatsForShard2Before = shard2.segmentStats(false, false);
221+
SegmentsStats segmentsStatsForShard3Before = shard3.segmentStats(false, false);
222+
SegmentsStats segmentsStatsForShard4Before = shard4.segmentStats(false, false);
223+
SegmentsStats segmentsStatsForShard5Before = shard5.segmentStats(false, false);
224+
AtomicLong totalSegments = new AtomicLong(
225+
segmentsStatsForShard1Before.getCount() + segmentsStatsForShard2Before.getCount() + segmentsStatsForShard3Before.getCount()
226+
+ segmentsStatsForShard4Before.getCount() + segmentsStatsForShard5Before.getCount()
227+
);
228+
// Sanity check: at least one shard still holds more than one segment before the waits begin.
assertTrue(totalSegments.get() > 5);
229+
// Each wait result is asserted so an unmerged shard fails the test instead of timing out silently.
assertTrue("shard1 was not merged in time", waitUntil(() -> shard1.segmentStats(false, false).getCount() == SEGMENT_COUNT, 1, TimeUnit.MINUTES));
230+
assertTrue("shard2 was not merged in time", waitUntil(() -> shard2.segmentStats(false, false).getCount() == SEGMENT_COUNT, 1, TimeUnit.MINUTES));
231+
assertTrue("shard3 was not merged in time", waitUntil(() -> shard3.segmentStats(false, false).getCount() == SEGMENT_COUNT, 1, TimeUnit.MINUTES));
232+
assertTrue("shard4 was not merged in time", waitUntil(() -> shard4.segmentStats(false, false).getCount() == SEGMENT_COUNT, 1, TimeUnit.MINUTES));
233+
assertTrue("shard5 was not merged in time", waitUntil(() -> shard5.segmentStats(false, false).getCount() == SEGMENT_COUNT, 1, TimeUnit.MINUTES));
234+
SegmentsStats segmentsStatsForShard1After = shard1.segmentStats(false, false);
235+
SegmentsStats segmentsStatsForShard2After = shard2.segmentStats(false, false);
236+
SegmentsStats segmentsStatsForShard3After = shard3.segmentStats(false, false);
237+
SegmentsStats segmentsStatsForShard4After = shard4.segmentStats(false, false);
238+
SegmentsStats segmentsStatsForShard5After = shard5.segmentStats(false, false);
239+
totalSegments.set(
240+
segmentsStatsForShard1After.getCount() + segmentsStatsForShard2After.getCount() + segmentsStatsForShard3After.getCount()
241+
+ segmentsStatsForShard4After.getCount() + segmentsStatsForShard5After.getCount()
242+
);
243+
assertEquals(5, totalSegments.get());
244+
assertAcked(client().admin().indices().prepareDelete(INDEX_NAME_1).get());
245+
assertAcked(client().admin().indices().prepareDelete(INDEX_NAME_2).get());
246+
}
247+
}

server/src/main/java/org/opensearch/common/settings/ClusterSettings.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@
116116
import org.opensearch.index.ShardIndexingPressureMemoryManager;
117117
import org.opensearch.index.ShardIndexingPressureSettings;
118118
import org.opensearch.index.ShardIndexingPressureStore;
119+
import org.opensearch.index.autoforcemerge.ForceMergeManagerSettings;
119120
import org.opensearch.index.compositeindex.CompositeIndexSettings;
120121
import org.opensearch.index.remote.RemoteStorePressureSettings;
121122
import org.opensearch.index.remote.RemoteStoreStatsTrackerFactory;
@@ -830,7 +831,18 @@ public void apply(Settings value, Settings current, Settings previous) {
830831

831832
// Setting related to refresh optimisations
832833
IndicesService.CLUSTER_REFRESH_FIXED_INTERVAL_SCHEDULE_ENABLED_SETTING,
833-
IndicesService.CLUSTER_REFRESH_SHARD_LEVEL_ENABLED_SETTING
834+
IndicesService.CLUSTER_REFRESH_SHARD_LEVEL_ENABLED_SETTING,
835+
836+
// Settings related to Auto Force Merge Manager
837+
ForceMergeManagerSettings.AUTO_FORCE_MERGE_SETTING,
838+
ForceMergeManagerSettings.AUTO_FORCE_MERGE_SCHEDULER_INTERVAL,
839+
ForceMergeManagerSettings.TRANSLOG_AGE_AUTO_FORCE_MERGE,
840+
ForceMergeManagerSettings.SEGMENT_COUNT_FOR_AUTO_FORCE_MERGE,
841+
ForceMergeManagerSettings.MERGE_DELAY_BETWEEN_SHARDS_FOR_AUTO_FORCE_MERGE,
842+
ForceMergeManagerSettings.CPU_THRESHOLD_PERCENTAGE_FOR_AUTO_FORCE_MERGE,
843+
ForceMergeManagerSettings.DISK_THRESHOLD_PERCENTAGE_FOR_AUTO_FORCE_MERGE,
844+
ForceMergeManagerSettings.JVM_THRESHOLD_PERCENTAGE_FOR_AUTO_FORCE_MERGE,
845+
ForceMergeManagerSettings.CONCURRENCY_MULTIPLIER
834846
)
835847
)
836848
);

server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -278,6 +278,8 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
278278
// Settings for search replica
279279
IndexMetadata.INDEX_NUMBER_OF_SEARCH_REPLICAS_SETTING,
280280

281+
// Settings for Auto Force Merge
282+
IndexSettings.INDEX_AUTO_FORCE_MERGES_ENABLED,
281283
// Setting for derived source feature
282284
IndexSettings.INDEX_DERIVED_SOURCE_SETTING,
283285

0 commit comments

Comments
 (0)