Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add overload constructor for Translog to accept Channel Factory as a parameter ([#18918](https://github.com/opensearch-project/OpenSearch/pull/18918))
- Add subdirectory-aware store module with recovery support ([#19132](https://github.com/opensearch-project/OpenSearch/pull/19132))
- Add a dynamic cluster setting to control the enablement of the merged segment warmer ([#18929](https://github.com/opensearch-project/OpenSearch/pull/18929))
- Add metrics for the merged segment warmer feature ([#18929](https://github.com/opensearch-project/OpenSearch/pull/18929))

### Changed
- Add CompletionStage variants to methods in the Client Interface and default to ActionListener impl ([#18998](https://github.com/opensearch-project/OpenSearch/pull/18998))
- IllegalArgumentException when scroll ID references a node not found in Cluster ([#19031](https://github.com/opensearch-project/OpenSearch/pull/19031))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,4 +358,8 @@ static <Response> void completeWith(ActionListener<Response> listener, CheckedSu
throw ex;
}
}

static <T> ActionListener<T> noOp() {
return ActionListener.wrap(response -> {}, exception -> {});
}
}

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.opensearch.index.flush.FlushStats;
import org.opensearch.index.get.GetStats;
import org.opensearch.index.merge.MergeStats;
import org.opensearch.index.merge.MergedSegmentWarmerStats;
import org.opensearch.index.recovery.RecoveryStats;
import org.opensearch.index.refresh.RefreshStats;
import org.opensearch.index.search.stats.SearchStats;
Expand Down Expand Up @@ -92,6 +93,9 @@ public class CommonStats implements Writeable, ToXContentFragment {
@Nullable
public MergeStats merge;

@Nullable
public MergedSegmentWarmerStats mergedSegmentWarmerStats;

@Nullable
public RefreshStats refresh;

Expand Down Expand Up @@ -179,6 +183,9 @@ public CommonStats(CommonStatsFlags flags) {
case Recovery:
recoveryStats = new RecoveryStats();
break;
case MergedSegmentWarmer:
mergedSegmentWarmerStats = new MergedSegmentWarmerStats();
break;
default:
throw new IllegalStateException("Unknown Flag: " + flag);
}
Expand Down Expand Up @@ -238,6 +245,9 @@ public CommonStats(IndicesQueryCache indicesQueryCache, IndexShard indexShard, C
case Recovery:
recoveryStats = indexShard.recoveryStats();
break;
case MergedSegmentWarmer:
mergedSegmentWarmerStats = indexShard.mergedSegmentWarmerStats();
break;
default:
throw new IllegalStateException("Unknown Flag: " + flag);
}
Expand All @@ -264,6 +274,7 @@ public CommonStats(StreamInput in) throws IOException {
translog = in.readOptionalWriteable(TranslogStats::new);
requestCache = in.readOptionalWriteable(RequestCacheStats::new);
recoveryStats = in.readOptionalWriteable(RecoveryStats::new);
mergedSegmentWarmerStats = in.readOptionalWriteable(MergedSegmentWarmerStats::new);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add version checks here. Like this: https://github.com/opensearch-project/OpenSearch/blob/main/server/src/main/java/org/opensearch/action/admin/indices/stats/ShardStats.java#L168-L170

The payload sent from the data node running lower version would not have this field, without version check based ser-de, the stats API would break during version upgrades

}

@Override
Expand All @@ -284,6 +295,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalWriteable(translog);
out.writeOptionalWriteable(requestCache);
out.writeOptionalWriteable(recoveryStats);
out.writeOptionalWriteable(mergedSegmentWarmerStats);
}

public void add(CommonStats stats) {
Expand Down Expand Up @@ -416,6 +428,14 @@ public void add(CommonStats stats) {
} else {
recoveryStats.add(stats.getRecoveryStats());
}
if (mergedSegmentWarmerStats == null) {
if (stats.getMergedSegmentWarmer() != null) {
mergedSegmentWarmerStats = new MergedSegmentWarmerStats();
mergedSegmentWarmerStats.add(stats.getMergedSegmentWarmer());
}
} else {
mergedSegmentWarmerStats.add(stats.getMergedSegmentWarmer());
}
}

@Nullable
Expand Down Expand Up @@ -498,6 +518,11 @@ public RecoveryStats getRecoveryStats() {
return recoveryStats;
}

@Nullable
public MergedSegmentWarmerStats getMergedSegmentWarmer() {
return mergedSegmentWarmerStats;
}

/**
* Utility method which computes total memory by adding
* FieldData, PercolatorCache, Segments (index writer, version map)
Expand Down Expand Up @@ -537,7 +562,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
segments,
translog,
requestCache,
recoveryStats }
recoveryStats,
mergedSegmentWarmerStats }
).filter(Objects::nonNull);
for (ToXContent toXContent : ((Iterable<ToXContent>) stream::iterator)) {
toXContent.toXContent(builder, params);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,8 @@ public enum Flag {
Translog("translog", 13),
// 14 was previously used for Suggest
RequestCache("request_cache", 15),
Recovery("recovery", 16);
Recovery("recovery", 16),
MergedSegmentWarmer("merged_segment_warmer", 17);

private final String restName;
private final int index;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,10 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
IndexSettings.INDEX_DERIVED_SOURCE_SETTING,
IndexSettings.INDEX_DERIVED_SOURCE_TRANSLOG_ENABLED_SETTING,

// Settings for merged segment warmer backpressure
IndexSettings.INDEX_MERGED_SEGMENT_WARMER_PRESSURE_ENABLED,
IndexSettings.INDEX_MERGED_SEGMENT_WARMER_MAX_CONCURRENT_WARMS_FACTOR,

// validate that built-in similarities don't get redefined
Setting.groupSetting("index.similarity.", (s) -> {
Map<String, Settings> groups = s.getAsGroups();
Expand Down
41 changes: 41 additions & 0 deletions server/src/main/java/org/opensearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -796,6 +796,22 @@ public static IndexMergePolicy fromString(String text) {
Property.IndexScope
);

public static final Setting<Boolean> INDEX_MERGED_SEGMENT_WARMER_PRESSURE_ENABLED = Setting.boolSetting(
"index.merged_segment_warmer.pressure.enabled",
true,
Setting.Property.Dynamic,
Setting.Property.IndexScope
);

public static final Setting<Double> INDEX_MERGED_SEGMENT_WARMER_MAX_CONCURRENT_WARMS_FACTOR = Setting.doubleSetting(
"index.merged_segment_warmer.max_concurrent_warms_factor",
0.5d,
0,
1,
Setting.Property.Dynamic,
Setting.Property.IndexScope
);

public static final Setting<Boolean> INDEX_DERIVED_SOURCE_SETTING = Setting.boolSetting(
"index.derived_source.enabled",
false,
Expand Down Expand Up @@ -863,6 +879,8 @@ public static IndexMergePolicy fromString(String text) {
private volatile boolean allowDerivedField;
private final boolean derivedSourceEnabled;
private volatile boolean derivedSourceEnabledForTranslog;
private volatile double maxConcurrentMergedSegmentWarmsFactor;
private volatile boolean mergedSegmentWarmerPressureEnabled;

/**
* The maximum age of a retention lease before it is considered expired.
Expand Down Expand Up @@ -1103,6 +1121,8 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
derivedSourceEnabled = scopedSettings.get(INDEX_DERIVED_SOURCE_SETTING);
derivedSourceEnabledForTranslog = scopedSettings.get(INDEX_DERIVED_SOURCE_TRANSLOG_ENABLED_SETTING);
scopedSettings.addSettingsUpdateConsumer(INDEX_DERIVED_SOURCE_TRANSLOG_ENABLED_SETTING, this::setDerivedSourceEnabledForTranslog);
this.mergedSegmentWarmerPressureEnabled = scopedSettings.get(INDEX_MERGED_SEGMENT_WARMER_PRESSURE_ENABLED);
this.maxConcurrentMergedSegmentWarmsFactor = scopedSettings.get(INDEX_MERGED_SEGMENT_WARMER_MAX_CONCURRENT_WARMS_FACTOR);
/* There was unintentional breaking change got introduced with [OpenSearch-6424](https://github.com/opensearch-project/OpenSearch/pull/6424) (version 2.7).
* For indices created prior version (prior to 2.7) which has IndexSort type, they used to type cast the SortField.Type
* to higher bytes size like integer to long. This behavior was changed from OpenSearch 2.7 version not to
Expand Down Expand Up @@ -1248,6 +1268,27 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
this::setRemoteStoreTranslogRepository
);
scopedSettings.addSettingsUpdateConsumer(StarTreeIndexSettings.STAR_TREE_SEARCH_ENABLED_SETTING, this::setStarTreeIndexEnabled);
scopedSettings.addSettingsUpdateConsumer(INDEX_MERGED_SEGMENT_WARMER_PRESSURE_ENABLED, this::setMergedSegmentWarmerPressureEnabled);
scopedSettings.addSettingsUpdateConsumer(
INDEX_MERGED_SEGMENT_WARMER_MAX_CONCURRENT_WARMS_FACTOR,
this::setMaxConcurrentMergedSegmentWarmsFactor
);
}

private void setMergedSegmentWarmerPressureEnabled(Boolean value) {
this.mergedSegmentWarmerPressureEnabled = value;
}

public boolean isMergedSegmentWarmerPressureEnabled() {
return this.mergedSegmentWarmerPressureEnabled;
}

private void setMaxConcurrentMergedSegmentWarmsFactor(Double value) {
this.maxConcurrentMergedSegmentWarmsFactor = value;
}

public double getMaxConcurrentMergedSegmentWarmsFactor() {
return this.maxConcurrentMergedSegmentWarmsFactor;
}

private void setSearchIdleAfter(TimeValue searchIdleAfter) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
import org.opensearch.index.mapper.ParsedDocument;
import org.opensearch.index.mapper.SeqNoFieldMapper;
import org.opensearch.index.merge.MergeStats;
import org.opensearch.index.merge.MergedSegmentTransferTracker;
import org.opensearch.index.seqno.SeqNoStats;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.shard.DocsStats;
Expand Down Expand Up @@ -213,9 +214,17 @@ public MergeStats getMergeStats() {
return new MergeStats();
}

public MergedSegmentTransferTracker getMergedSegmentTransferTracker() {
return engineConfig.getMergedSegmentTransferTracker();
}

/** returns the history uuid for the engine */
public abstract String getHistoryUUID();

public int getMaxMergesCount() {
return 0;
}

/**
* Reads the current stored history ID from commit data.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.opensearch.index.codec.CodecSettings;
import org.opensearch.index.mapper.DocumentMapperForType;
import org.opensearch.index.mapper.ParsedDocument;
import org.opensearch.index.merge.MergedSegmentTransferTracker;
import org.opensearch.index.seqno.RetentionLeases;
import org.opensearch.index.store.Store;
import org.opensearch.index.translog.InternalTranslogFactory;
Expand Down Expand Up @@ -115,6 +116,7 @@ public final class EngineConfig {
private final Comparator<LeafReader> leafSorter;
private final Supplier<DocumentMapperForType> documentMapperForTypeSupplier;
private final ClusterApplierService clusterApplierService;
private final MergedSegmentTransferTracker mergedSegmentTransferTracker;

/**
* A supplier of the outstanding retention leases. This is used during merged operations to determine which operations that have been
Expand Down Expand Up @@ -306,6 +308,7 @@ private EngineConfig(Builder builder) {
this.documentMapperForTypeSupplier = builder.documentMapperForTypeSupplier;
this.indexReaderWarmer = builder.indexReaderWarmer;
this.clusterApplierService = builder.clusterApplierService;
this.mergedSegmentTransferTracker = builder.mergedSegmentTransferTracker;
}

/**
Expand Down Expand Up @@ -625,6 +628,13 @@ public ClusterApplierService getClusterApplierService() {
return this.clusterApplierService;
}

/**
* Returns the MergedSegmentTransferTracker instance.
*/
public MergedSegmentTransferTracker getMergedSegmentTransferTracker() {
return this.mergedSegmentTransferTracker;
}

/**
* Builder for EngineConfig class
*
Expand Down Expand Up @@ -662,6 +672,7 @@ public static class Builder {
Comparator<LeafReader> leafSorter;
private IndexWriter.IndexReaderWarmer indexReaderWarmer;
private ClusterApplierService clusterApplierService;
private MergedSegmentTransferTracker mergedSegmentTransferTracker;

public Builder shardId(ShardId shardId) {
this.shardId = shardId;
Expand Down Expand Up @@ -813,6 +824,11 @@ public Builder clusterApplierService(ClusterApplierService clusterApplierService
return this;
}

public Builder mergedSegmentTransferTracker(MergedSegmentTransferTracker mergedSegmentTransferTracker) {
this.mergedSegmentTransferTracker = mergedSegmentTransferTracker;
return this;
}

public EngineConfig build() {
return new EngineConfig(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.opensearch.index.codec.CodecServiceFactory;
import org.opensearch.index.mapper.DocumentMapperForType;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.merge.MergedSegmentTransferTracker;
import org.opensearch.index.seqno.RetentionLeases;
import org.opensearch.index.store.Store;
import org.opensearch.index.translog.TranslogConfig;
Expand Down Expand Up @@ -160,7 +161,8 @@ public EngineConfig newEngineConfig(
Comparator<LeafReader> leafSorter,
Supplier<DocumentMapperForType> documentMapperForTypeSupplier,
IndexWriter.IndexReaderWarmer indexReaderWarmer,
ClusterApplierService clusterApplierService
ClusterApplierService clusterApplierService,
MergedSegmentTransferTracker mergedSegmentTransferTracker
) {
CodecService codecServiceToUse = codecService;
if (codecService == null && this.codecServiceFactory != null) {
Expand Down Expand Up @@ -197,6 +199,7 @@ public EngineConfig newEngineConfig(
.documentMapperForTypeSupplier(documentMapperForTypeSupplier)
.indexReaderWarmer(indexReaderWarmer)
.clusterApplierService(clusterApplierService)
.mergedSegmentTransferTracker(mergedSegmentTransferTracker)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@
import org.opensearch.index.mapper.SourceFieldMapper;
import org.opensearch.index.mapper.Uid;
import org.opensearch.index.merge.MergeStats;
import org.opensearch.index.merge.MergedSegmentTransferTracker;
import org.opensearch.index.merge.OnGoingMerge;
import org.opensearch.index.seqno.LocalCheckpointTracker;
import org.opensearch.index.seqno.SeqNoStats;
Expand Down Expand Up @@ -258,7 +259,11 @@ public TranslogManager translogManager() {
boolean success = false;
try {
this.lastDeleteVersionPruneTimeMSec = engineConfig.getThreadPool().relativeTimeInMillis();
mergeScheduler = scheduler = new EngineMergeScheduler(engineConfig.getShardId(), engineConfig.getIndexSettings());
mergeScheduler = scheduler = new EngineMergeScheduler(
engineConfig.getShardId(),
engineConfig.getIndexSettings(),
getMergedSegmentTransferTracker()
);
throttle = new IndexThrottle();
try {
store.trimUnsafeCommits(engineConfig.getTranslogConfig().getTranslogPath());
Expand Down Expand Up @@ -2475,8 +2480,8 @@ private final class EngineMergeScheduler extends OpenSearchConcurrentMergeSchedu
private final AtomicInteger numMergesInFlight = new AtomicInteger(0);
private final AtomicBoolean isThrottling = new AtomicBoolean();

EngineMergeScheduler(ShardId shardId, IndexSettings indexSettings) {
super(shardId, indexSettings);
EngineMergeScheduler(ShardId shardId, IndexSettings indexSettings, MergedSegmentTransferTracker mergedSegmentTransferTracker) {
super(shardId, indexSettings, mergedSegmentTransferTracker);
}

@Override
Expand Down Expand Up @@ -2630,6 +2635,10 @@ public MergeStats getMergeStats() {
return mergeScheduler.stats();
}

public int getMaxMergesCount() {
return mergeScheduler.getMaxMergeCount();
}

LocalCheckpointTracker getLocalCheckpointTracker() {
return localCheckpointTracker;
}
Expand Down
Loading
Loading