Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Implement GRPC Search params `Highlight`and `Sort` ([#19868](https://github.com/opensearch-project/OpenSearch/pull/19868))
- Implement GRPC ConstantScoreQuery, FuzzyQuery, MatchBoolPrefixQuery, MatchPhrasePrefix, PrefixQuery, MatchQuery ([#19854](https://github.com/opensearch-project/OpenSearch/pull/19854))
- Add async periodic flush task support for pull-based ingestion ([#19878](https://github.com/opensearch-project/OpenSearch/pull/19878))
- Add support for context aware segments ([#19098](https://github.com/opensearch-project/OpenSearch/pull/19098))

### Changed
- Faster `terms` query creation for `keyword` field with index and docValues enabled ([#19350](https://github.com/opensearch-project/OpenSearch/pull/19350))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.opensearch.index.shard.ShardPath;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.plugins.IndexStorePlugin;

import java.io.IOException;
import java.nio.file.FileVisitResult;
Expand Down Expand Up @@ -79,6 +80,37 @@ public SubdirectoryAwareStore(
super(shardId, indexSettings, new SubdirectoryAwareDirectory(directory, shardPath), shardLock, onClose, shardPath);
}

/**
* Constructor for SubdirectoryAwareStore.
*
* @param shardId the shard ID
* @param indexSettings the index settings
* @param directory the directory to use for the store
* @param shardLock the shard lock
* @param onClose the on close callback
* @param shardPath the shard path
* @param directoryFactory the directory factory
*/
public SubdirectoryAwareStore(
ShardId shardId,
IndexSettings indexSettings,
Directory directory,
ShardLock shardLock,
OnClose onClose,
ShardPath shardPath,
IndexStorePlugin.DirectoryFactory directoryFactory
) {
super(
shardId,
indexSettings,
new SubdirectoryAwareDirectory(directory, shardPath),
shardLock,
onClose,
shardPath,
directoryFactory
);
}

@Override
public MetadataSnapshot getMetadata(IndexCommit commit) throws IOException {
long totalNumDocs = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,5 +81,31 @@ public Store newStore(
) {
return new SubdirectoryAwareStore(shardId, indexSettings, directory, shardLock, onClose, shardPath);
}

/**
* Creates a new {@link SubdirectoryAwareStore} instance.
*
* @param shardId the shard identifier
* @param indexSettings the index settings
* @param directory the underlying Lucene directory
* @param shardLock the shard lock
* @param onClose callback to execute when the store is closed
* @param shardPath the path information for the shard
* @param directoryFactory the directory factory to create child level directory.
* Used for Context Aware Segments enabled indices.
* @return a new SubdirectoryAwareStore instance
*/
@Override
public Store newStore(
ShardId shardId,
IndexSettings indexSettings,
Directory directory,
ShardLock shardLock,
Store.OnClose onClose,
ShardPath shardPath,
DirectoryFactory directoryFactory
) {
return new SubdirectoryAwareStore(shardId, indexSettings, directory, shardLock, onClose, shardPath, directoryFactory);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.index.store.FsDirectoryFactory;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreStats;
import org.opensearch.plugins.IndexStorePlugin;
Expand Down Expand Up @@ -67,7 +68,8 @@ public void testStats() throws IOException {
SubdirectoryStorePluginTests.newFSDirectory(path.resolve("index")),
new DummyShardLock(shardId),
Store.OnClose.EMPTY,
new ShardPath(false, path, path, shardId)
new ShardPath(false, path, path, shardId),
new FsDirectoryFactory()
);

long initialStoreSize = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
public final class SmbMmapFsDirectoryFactory extends FsDirectoryFactory {

@Override
protected Directory newFSDirectory(Path location, LockFactory lockFactory, IndexSettings indexSettings) throws IOException {
public Directory newFSDirectory(Path location, LockFactory lockFactory, IndexSettings indexSettings) throws IOException {
return new SmbDirectoryWrapper(
setPreload(
new MMapDirectory(location, lockFactory),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
public final class SmbNIOFsDirectoryFactory extends FsDirectoryFactory {

@Override
protected Directory newFSDirectory(Path location, LockFactory lockFactory, IndexSettings indexSettings) throws IOException {
public Directory newFSDirectory(Path location, LockFactory lockFactory, IndexSettings indexSettings) throws IOException {
return new SmbDirectoryWrapper(new NIOFSDirectory(location, lockFactory));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static org.opensearch.Version.V_2_7_0;
import static org.opensearch.Version.V_3_0_0;
import static org.opensearch.Version.V_3_2_0;
import static org.opensearch.Version.V_3_3_0;

/**
* Utility class to register server exceptions
Expand Down Expand Up @@ -1241,5 +1242,13 @@ public static void registerExceptions() {
V_3_2_0
)
);
registerExceptionHandle(
new OpenSearchExceptionHandle(
org.opensearch.index.engine.LookupMapLockAcquisitionException.class,
org.opensearch.index.engine.LookupMapLockAcquisitionException::new,
CUSTOM_ELASTICSEARCH_EXCEPTIONS_BASE_ID + 2,
V_3_3_0
)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
import org.opensearch.index.IndexingPressureService;
import org.opensearch.index.SegmentReplicationPressureService;
import org.opensearch.index.engine.Engine;
import org.opensearch.index.engine.LookupMapLockAcquisitionException;
import org.opensearch.index.engine.VersionConflictEngineException;
import org.opensearch.index.get.GetResult;
import org.opensearch.index.mapper.MapperException;
Expand Down Expand Up @@ -727,7 +728,15 @@ && isConflictException(executionResult.getFailure().getCause())
&& context.getRetryCounter() < ((UpdateRequest) docWriteRequest).retryOnConflict()) {
context.resetForExecutionForRetry();
return;
}
} else if (isFailed
&& context.getPrimary() != null
&& context.getPrimary().indexSettings() != null
&& context.getPrimary().indexSettings().isContextAwareEnabled()
&& isLookupMapLockAcquisitionException(executionResult.getFailure().getCause())
&& context.getRetryCounter() < context.getPrimary().indexSettings().getMaxRetryOnLookupMapAcquisitionException()) {
context.resetForExecutionForRetry();
return;
}
final BulkItemResponse response;
if (isUpdate) {
response = processUpdateResponse((UpdateRequest) docWriteRequest, context.getConcreteIndex(), executionResult, updateResult);
Expand Down Expand Up @@ -756,6 +765,10 @@ private static boolean isConflictException(final Exception e) {
return ExceptionsHelper.unwrapCause(e) instanceof VersionConflictEngineException;
}

private static boolean isLookupMapLockAcquisitionException(final Exception e) {
return ExceptionsHelper.unwrapCause(e) instanceof LookupMapLockAcquisitionException;
}

/**
* Creates a new bulk item result from the given requests and result of performing the update operation on the shard.
*/
Expand Down
16 changes: 15 additions & 1 deletion server/src/main/java/org/opensearch/common/lucene/Lucene.java
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.index.analysis.AnalyzerScope;
import org.opensearch.index.analysis.NamedAnalyzer;
import org.opensearch.index.codec.CriteriaBasedCodec;
import org.opensearch.index.fielddata.IndexFieldData;
import org.opensearch.index.fielddata.plain.NonPruningSortedSetOrdinalsIndexFieldData.NonPruningSortField;
import org.opensearch.search.sort.SortedWiderNumericSortField;
Expand Down Expand Up @@ -939,10 +940,23 @@ public LeafReader wrap(LeafReader leaf) {
// Two scenarios that we have hard-deletes: (1) from old segments where soft-deletes was disabled,
// (2) when IndexWriter hits non-aborted exceptions. These two cases, IW flushes SegmentInfos
// before exposing the hard-deletes, thus we can use the hard-delete count of SegmentInfos.
final int numDocs = segmentReader.maxDoc() - segmentReader.getSegmentInfo().getDelCount();

// With CAS enabled segments, hard deletes can also be present, so correcting numDocs.
// We are using attribute value here to identify whether segment has CAS enabled or not.
int numDocs;
if (isContextAwareEnabled(segmentReader)) {
numDocs = popCount(hardLiveDocs);
} else {
numDocs = segmentReader.maxDoc() - segmentReader.getSegmentInfo().getDelCount();
}

assert numDocs == popCount(hardLiveDocs) : numDocs + " != " + popCount(hardLiveDocs);
return new LeafReaderWithLiveDocs(segmentReader, hardLiveDocs, numDocs);
}

private boolean isContextAwareEnabled(SegmentReader reader) {
return reader.getSegmentInfo().info.getAttribute(CriteriaBasedCodec.BUCKET_NAME) != null;
}
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,8 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
ShardsLimitAllocationDecider.INDEX_TOTAL_REMOTE_CAPABLE_PRIMARY_SHARDS_PER_NODE_SETTING,
IndexSettings.INDEX_GC_DELETES_SETTING,
IndexSettings.INDEX_SOFT_DELETES_SETTING,
IndexSettings.INDEX_CONTEXT_AWARE_ENABLED_SETTING,
IndexSettings.INDEX_MAX_RETRY_ON_LOOKUP_MAP_LOCK_ACQUISITION_EXCEPTION,
IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING,
IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_PERIOD_SETTING,
IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ public class FeatureFlags {
*/
public static final String REMOTE_STORE_MIGRATION_EXPERIMENTAL = FEATURE_FLAG_PREFIX + "remote_store.migration.enabled";

/**
* Gates the visibility of the context aware segments.
*/
public static final String CONTEXT_AWARE_MIGRATION_EXPERIMENTAL_FLAG = FEATURE_FLAG_PREFIX + "context_aware.migration.enabled";

/**
* Gates the functionality of extensions.
* Once the feature is ready for production release, this feature flag can be removed.
Expand Down Expand Up @@ -69,6 +74,12 @@ public class FeatureFlags {
Property.NodeScope
);

public static final Setting<Boolean> CONTEXT_AWARE_MIGRATION_EXPERIMENTAL_SETTING = Setting.boolSetting(
CONTEXT_AWARE_MIGRATION_EXPERIMENTAL_FLAG,
false,
Property.NodeScope
);

public static final Setting<Boolean> EXTENSIONS_SETTING = Setting.boolSetting(EXTENSIONS, false, Property.NodeScope);

public static final Setting<Boolean> TELEMETRY_SETTING = Setting.boolSetting(TELEMETRY, false, Property.NodeScope);
Expand Down Expand Up @@ -133,6 +144,7 @@ static class FeatureFlagsImpl {
put(TERM_VERSION_PRECOMMIT_ENABLE_SETTING, TERM_VERSION_PRECOMMIT_ENABLE_SETTING.getDefault(Settings.EMPTY));
put(ARROW_STREAMS_SETTING, ARROW_STREAMS_SETTING.getDefault(Settings.EMPTY));
put(STREAM_TRANSPORT_SETTING, STREAM_TRANSPORT_SETTING.getDefault(Settings.EMPTY));
put(CONTEXT_AWARE_MIGRATION_EXPERIMENTAL_SETTING, CONTEXT_AWARE_MIGRATION_EXPERIMENTAL_SETTING.getDefault(Settings.EMPTY));
}
};

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index;

import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;

import java.io.IOException;
import java.util.Arrays;

/**
* Directory wrapper used to filter out child level directory for context aware enabled indices.
*
*/
public class BucketedCompositeDirectory extends FilterDirectory {

public static final String CHILD_DIRECTORY_PREFIX = "temp_";

protected BucketedCompositeDirectory(Directory in) {
super(in);
}

/**
* List all files within directory filtering out child level directory.
* @return files excluding child level directory.
*
* @throws IOException in case of I/O error
*/
@Override
public String[] listAll() throws IOException {
return Arrays.stream(super.listAll())
.filter(fileName -> !fileName.startsWith(CHILD_DIRECTORY_PREFIX))
.distinct()
.toArray(String[]::new);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index;

import org.apache.lucene.index.FilterMergePolicy;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.MergeTrigger;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfos;
import org.opensearch.index.codec.CriteriaBasedCodec;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
* Wrapper merge policy which is used for context aware enabled indices. This merge policy merges segments that belongs
* to same bucket.
*
*/
public class CriteriaBasedMergePolicy extends FilterMergePolicy {

protected final MergePolicy in;

public CriteriaBasedMergePolicy(MergePolicy in) {
super(in);
this.in = in;
}

/**
* Merges the segments belonging to same group.
*
* @param mergeTrigger the event that triggered the merge
* @param infos the total set of segments in the index
* @param mergeContext the IndexWriter to find the merges on
* @return
* @throws IOException
*/
@Override
public MergeSpecification findMerges(MergeTrigger mergeTrigger, SegmentInfos infos, MergeContext mergeContext) throws IOException {
final Set<SegmentCommitInfo> merging = mergeContext.getMergingSegments();
MergeSpecification spec = null;
final Map<String, List<SegmentCommitInfo>> commitInfos = new HashMap<>();
for (SegmentCommitInfo si : infos) {
if (merging.contains(si)) {
continue;
}

final String dwptGroupNumber = si.info.getAttribute(CriteriaBasedCodec.BUCKET_NAME);
commitInfos.computeIfAbsent(dwptGroupNumber, k -> new ArrayList<>()).add(si);
}

for (String dwptGroupNumber : commitInfos.keySet()) {
if (commitInfos.get(dwptGroupNumber).size() > 1) {
final SegmentInfos newSIS = new SegmentInfos(infos.getIndexCreatedVersionMajor());
for (SegmentCommitInfo info : commitInfos.get(dwptGroupNumber)) {
newSIS.add(info);
}

final MergeSpecification tieredMergePolicySpec = in.findMerges(mergeTrigger, newSIS, mergeContext);
if (tieredMergePolicySpec != null) {
if (spec == null) {
spec = new MergeSpecification();
}

spec.merges.addAll(tieredMergePolicySpec.merges);
}
}
}

return spec;
}
}
Loading
Loading