Skip to content

Commit 2a150a0

Browse files
committed
Adding the MergedSegmentWarmerPressureService
Signed-off-by: kh3ra <[email protected]>
1 parent 12721aa commit 2a150a0

File tree

11 files changed

+263
-26
lines changed

11 files changed

+263
-26
lines changed

server/src/main/java/org/opensearch/index/IndexSettings.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -796,6 +796,22 @@ public static IndexMergePolicy fromString(String text) {
796796
Property.IndexScope
797797
);
798798

799+
public static final Setting<Boolean> INDEX_MERGED_SEGMENT_WARMER_PRESSURE_ENABLED = Setting.boolSetting(
800+
"index.merged_segment_warmer.pressure.enabled",
801+
true,
802+
Setting.Property.Dynamic,
803+
Setting.Property.NodeScope
804+
);
805+
806+
public static final Setting<Double> INDEX_MERGED_SEGMENT_WARMER_MAX_CONCURRENT_WARMS_FACTOR = Setting.doubleSetting(
807+
"index.merged_segment_warmer.max_concurrent_warms_factor",
808+
0.5,
809+
0,
810+
1,
811+
Setting.Property.Dynamic,
812+
Setting.Property.NodeScope
813+
);
814+
799815
private final Index index;
800816
private final Version version;
801817
private final Logger logger;
@@ -846,6 +862,8 @@ public static IndexMergePolicy fromString(String text) {
846862
private final RemoteStorePathStrategy remoteStorePathStrategy;
847863
private final boolean isTranslogMetadataEnabled;
848864
private volatile boolean allowDerivedField;
865+
private volatile double maxConcurrentMergedSegmentWarmsFactor;
866+
private volatile boolean mergedSegmentWarmerPressureEnabled;
849867

850868
/**
851869
* The maximum age of a retention lease before it is considered expired.
@@ -1225,6 +1243,24 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
12251243
this::setRemoteStoreTranslogRepository
12261244
);
12271245
scopedSettings.addSettingsUpdateConsumer(StarTreeIndexSettings.STAR_TREE_SEARCH_ENABLED_SETTING, this::setStarTreeIndexEnabled);
1246+
scopedSettings.addSettingsUpdateConsumer(INDEX_MERGED_SEGMENT_WARMER_PRESSURE_ENABLED, this::setMergedSegmentWarmerPressureEnabled);
1247+
scopedSettings.addSettingsUpdateConsumer(INDEX_MERGED_SEGMENT_WARMER_MAX_CONCURRENT_WARMS_FACTOR, this::setMaxConcurrentMergedSegmentWarmsFactor);
1248+
}
1249+
1250+
private void setMergedSegmentWarmerPressureEnabled(Boolean value) {
1251+
this.mergedSegmentWarmerPressureEnabled = value;
1252+
}
1253+
1254+
public boolean isMergedSegmentWarmerPressureEnabled() {
1255+
return this.mergedSegmentWarmerPressureEnabled;
1256+
}
1257+
1258+
private void setMaxConcurrentMergedSegmentWarmsFactor(Double value) {
1259+
this.maxConcurrentMergedSegmentWarmsFactor = value;
1260+
}
1261+
1262+
public double getMaxConcurrentMergedSegmentWarmsFactor() {
1263+
return this.maxConcurrentMergedSegmentWarmsFactor;
12281264
}
12291265

12301266
private void setSearchIdleAfter(TimeValue searchIdleAfter) {

server/src/main/java/org/opensearch/index/engine/Engine.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,10 @@ public MergeStats getMergeStats() {
216216
/** returns the history uuid for the engine */
217217
public abstract String getHistoryUUID();
218218

219+
public int getMaxMergesCount() {
220+
return 0;
221+
}
222+
219223
/**
220224
* Reads the current stored history ID from commit data.
221225
*/

server/src/main/java/org/opensearch/index/engine/InternalEngine.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2624,6 +2624,10 @@ public MergeStats getMergeStats() {
26242624
return mergeScheduler.stats();
26252625
}
26262626

2627+
public int getMaxMergesCount() {
2628+
return mergeScheduler.getMaxMergeCount();
2629+
}
2630+
26272631
LocalCheckpointTracker getLocalCheckpointTracker() {
26282632
return localCheckpointTracker;
26292633
}

server/src/main/java/org/opensearch/index/engine/MergedSegmentWarmer.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import org.opensearch.cluster.service.ClusterService;
1818
import org.opensearch.common.logging.Loggers;
1919
import org.opensearch.index.merge.MergedSegmentReplicationTracker;
20+
import org.opensearch.index.merge.MergedSegmentWarmerPressureService;
2021
import org.opensearch.index.shard.IndexShard;
2122
import org.opensearch.indices.recovery.RecoverySettings;
2223
import org.opensearch.transport.TransportService;
@@ -35,6 +36,7 @@ public class MergedSegmentWarmer implements IndexWriter.IndexReaderWarmer {
3536
private final ClusterService clusterService;
3637
private final IndexShard indexShard;
3738
private final MergedSegmentReplicationTracker mergedSegmentReplicationTracker;
39+
private final MergedSegmentWarmerPressureService mergedSegmentWarmerPressureService;
3840
private final Logger logger;
3941

4042
public MergedSegmentWarmer(
@@ -47,12 +49,16 @@ public MergedSegmentWarmer(
4749
this.recoverySettings = recoverySettings;
4850
this.clusterService = clusterService;
4951
this.indexShard = indexShard;
52+
this.mergedSegmentWarmerPressureService = indexShard.mergedSegmentWarmerPressureService();
5053
this.mergedSegmentReplicationTracker = indexShard.mergedSegmentReplicationTracker();
5154
this.logger = Loggers.getLogger(getClass(), indexShard.shardId());
5255
}
5356

5457
@Override
5558
public void warm(LeafReader leafReader) throws IOException {
59+
if (shouldWarm() == false) {
60+
return;
61+
}
5662
mergedSegmentReplicationTracker.incrementTotalWarmInvocationsCount();
5763
mergedSegmentReplicationTracker.incrementOngoingWarms();
5864
// IndexWriter.IndexReaderWarmer#warm is called by IndexWriter#mergeMiddle. The type of leafReader should be SegmentReader.
@@ -84,4 +90,14 @@ public void warm(LeafReader leafReader) throws IOException {
8490
mergedSegmentReplicationTracker.decrementOngoingWarms();
8591
}
8692
}
93+
94+
private boolean shouldWarm() {
95+
if (mergedSegmentWarmerPressureService.isEnabled() &&
96+
mergedSegmentWarmerPressureService.shouldWarm(mergedSegmentReplicationTracker.stats()) == false) {
97+
mergedSegmentReplicationTracker.incrementTotalRejectedWarms();
98+
return false;
99+
}
100+
101+
return true;
102+
}
87103
}

server/src/main/java/org/opensearch/index/merge/MergedSegmentReplicationTracker.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ public class MergedSegmentReplicationTracker extends AbstractIndexShardComponent
3030
private final CounterMetric totalBytesDownloaded = new CounterMetric();
3131
private final CounterMetric totalUploadTimeMillis = new CounterMetric();
3232
private final CounterMetric totalDownloadTimeMillis = new CounterMetric();
33+
private final CounterMetric totalRejectedWarms = new CounterMetric();
3334
private final CounterMetric ongoingWarms = new CounterMetric();
3435

3536
public MergedSegmentReplicationTracker(ShardId shardId, IndexSettings indexSettings) {
@@ -52,6 +53,10 @@ public void incrementTotalWarmFailureCount() {
5253
totalWarmFailureCount.inc();
5354
}
5455

56+
public void incrementTotalRejectedWarms() {
57+
totalRejectedWarms.inc();
58+
}
59+
5560
public void addTotalWarmTimeMillis(long time) {
5661
totalWarmTimeMillis.inc(time);
5762
}
@@ -82,6 +87,7 @@ public MergedSegmentWarmerStats stats() {
8287
totalBytesDownloaded.count(),
8388
totalUploadTimeMillis.count(),
8489
totalDownloadTimeMillis.count(),
90+
totalRejectedWarms.count(),
8591
ongoingWarms.count()
8692
);
8793
return stats;
Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.index.merge;
10+
11+
import org.apache.logging.log4j.Logger;
12+
import org.opensearch.common.annotation.ExperimentalApi;
13+
import org.opensearch.common.logging.Loggers;
14+
import org.opensearch.index.IndexSettings;
15+
import org.opensearch.index.shard.IndexShard;
16+
17+
import java.util.List;
18+
import java.util.Locale;
19+
import java.util.function.Predicate;
20+
21+
/**
22+
* Service that applies throttling predicates to determine if merged segment warming should proceed.
23+
* Evaluates conditions like concurrency limits and applies backpressure when thresholds are exceeded.
24+
*
25+
* @opensearch.internal
26+
*/
27+
@ExperimentalApi
28+
public class MergedSegmentWarmerPressureService {
29+
30+
private final Logger logger;
31+
32+
private final PressureSettings pressureSettings;
33+
34+
private final List<ThrottlePredicate> throttlePredicates;
35+
36+
public MergedSegmentWarmerPressureService(
37+
IndexShard indexShard
38+
) {
39+
this.pressureSettings = new PressureSettings(indexShard);
40+
this.throttlePredicates = List.of(
41+
new ConcurrencyLimiterPredicate(indexShard, pressureSettings)
42+
);
43+
this.logger = Loggers.getLogger(MergedSegmentWarmerPressureService.class, indexShard.shardId());
44+
}
45+
46+
public boolean isEnabled() {
47+
return pressureSettings.isMergedSegmentWarmerPressureServiceEnabled();
48+
}
49+
50+
/**
51+
* Determines if warming should proceed by evaluating all throttle conditions.
52+
* Returns false on the first failing predicate and logs the rejection reason.
53+
*
54+
* @param stats MergedSegmentWarmerStats snapshot at the time of invocation of warm
55+
* @return true if all predicates pass, false if any predicate fails
56+
*/
57+
public boolean shouldWarm(MergedSegmentWarmerStats stats) {
58+
return throttlePredicates.stream().allMatch(throttlePredicate -> {
59+
boolean res = throttlePredicate.test(stats);
60+
if(res == false && logger.isTraceEnabled()) logger.trace(throttlePredicate.rejectionMessage(stats));
61+
return res;
62+
});
63+
}
64+
65+
/**
66+
* Abstract class to check if merged segment warm needs to be throttled.
67+
*
68+
* @opensearch.internal
69+
*/
70+
private static abstract class ThrottlePredicate implements Predicate<MergedSegmentWarmerStats> {
71+
72+
final PressureSettings pressureSettings;
73+
final IndexShard indexShard;
74+
75+
private ThrottlePredicate(IndexShard indexShard, PressureSettings pressureSettings) {
76+
this.pressureSettings = pressureSettings;
77+
this.indexShard = indexShard;
78+
}
79+
80+
/**
81+
* Returns the name of the predicate.
82+
*
83+
* @return the name using class name.
84+
*/
85+
abstract String name();
86+
87+
String rejectionMessage(MergedSegmentWarmerStats statsSnapshot) {
88+
return String.format(Locale.ROOT,
89+
"Merged segment warm rejected for shard [%s] by predicate: %s ",
90+
indexShard.shardId(),
91+
name()
92+
);
93+
}
94+
}
95+
96+
private static class ConcurrencyLimiterPredicate extends ThrottlePredicate{
97+
private final String NAME = "Concurrency limiter predicate for merged segment warmer throttling";
98+
99+
private ConcurrencyLimiterPredicate(IndexShard indexShard, PressureSettings pressureSettings) {
100+
super(indexShard, pressureSettings);
101+
}
102+
103+
private long calculateMaxAllowedConcurrentWarms(int maxConcurrentMerges) {
104+
return (long) (pressureSettings.getMaxConcurrentWarmsFactor() * maxConcurrentMerges);
105+
}
106+
107+
@Override
108+
String name() {
109+
return NAME;
110+
}
111+
112+
@Override
113+
String rejectionMessage(MergedSegmentWarmerStats stats) {
114+
long maxAllowed = calculateMaxAllowedConcurrentWarms(indexShard.getMaxMergesAllowed());
115+
return super.rejectionMessage(stats) +
116+
String.format("\nCurrent ongoing warms: %d, max allowed: %d",
117+
stats.getOngoingWarms(),
118+
maxAllowed
119+
);
120+
}
121+
122+
@Override
123+
public boolean test(MergedSegmentWarmerStats statsSnapshot) {
124+
long onGoingWarms = statsSnapshot.getOngoingWarms();
125+
long maxAllowedWarms = calculateMaxAllowedConcurrentWarms(indexShard.getMaxMergesAllowed());
126+
127+
return maxAllowedWarms > onGoingWarms;
128+
}
129+
}
130+
131+
132+
/**
133+
* Settings related to back pressure for MergedSegmentWarmer throttling.
134+
*
135+
* @opensearch.internal
136+
*/
137+
private static class PressureSettings {
138+
IndexShard indexShard;
139+
140+
PressureSettings(IndexShard indexShard) {
141+
this.indexShard = indexShard;
142+
}
143+
144+
private IndexSettings indexSettings() {
145+
return indexShard.indexSettings();
146+
}
147+
148+
boolean isMergedSegmentWarmerPressureServiceEnabled() {
149+
return indexSettings().isMergedSegmentWarmerPressureEnabled();
150+
}
151+
152+
public double getMaxConcurrentWarmsFactor() {
153+
return indexSettings().getMaxConcurrentMergedSegmentWarmsFactor();
154+
}
155+
}
156+
}

0 commit comments

Comments
 (0)