Skip to content

Commit 1e7b824

Browse files
committed
Adding support for context aware segments
Signed-off-by: RS146BIJAY <[email protected]>
1 parent 9ff84a8 commit 1e7b824

File tree

51 files changed

+4426
-219
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

51 files changed

+4426
-219
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
2323
- Implement GRPC Search params `Highlight`and `Sort` ([#19868](https://github.com/opensearch-project/OpenSearch/pull/19868))
2424
- Implement GRPC ConstantScoreQuery, FuzzyQuery, MatchBoolPrefixQuery, MatchPhrasePrefix, PrefixQuery, MatchQuery ([#19854](https://github.com/opensearch-project/OpenSearch/pull/19854))
2525
- Add async periodic flush task support for pull-based ingestion ([#19878](https://github.com/opensearch-project/OpenSearch/pull/19878))
26+
- Add support for context aware segments ([#19098](https://github.com/opensearch-project/OpenSearch/pull/19098))
2627

2728
### Changed
2829
- Faster `terms` query creation for `keyword` field with index and docValues enabled ([#19350](https://github.com/opensearch-project/OpenSearch/pull/19350))

modules/store-subdirectory/src/main/java/org/opensearch/plugin/store/subdirectory/SubdirectoryAwareStore.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.opensearch.index.shard.ShardPath;
2727
import org.opensearch.index.store.Store;
2828
import org.opensearch.index.store.StoreFileMetadata;
29+
import org.opensearch.plugins.IndexStorePlugin;
2930

3031
import java.io.IOException;
3132
import java.nio.file.FileVisitResult;
@@ -79,6 +80,37 @@ public SubdirectoryAwareStore(
7980
super(shardId, indexSettings, new SubdirectoryAwareDirectory(directory, shardPath), shardLock, onClose, shardPath);
8081
}
8182

83+
/**
84+
* Constructor for SubdirectoryAwareStore.
85+
*
86+
* @param shardId the shard ID
87+
* @param indexSettings the index settings
88+
* @param directory the directory to use for the store
89+
* @param shardLock the shard lock
90+
* @param onClose the on close callback
91+
* @param shardPath the shard path
92+
* @param directoryFactory the directory factory
93+
*/
94+
public SubdirectoryAwareStore(
95+
ShardId shardId,
96+
IndexSettings indexSettings,
97+
Directory directory,
98+
ShardLock shardLock,
99+
OnClose onClose,
100+
ShardPath shardPath,
101+
IndexStorePlugin.DirectoryFactory directoryFactory
102+
) {
103+
super(
104+
shardId,
105+
indexSettings,
106+
new SubdirectoryAwareDirectory(directory, shardPath),
107+
shardLock,
108+
onClose,
109+
shardPath,
110+
directoryFactory
111+
);
112+
}
113+
82114
@Override
83115
public MetadataSnapshot getMetadata(IndexCommit commit) throws IOException {
84116
long totalNumDocs = 0;

modules/store-subdirectory/src/main/java/org/opensearch/plugin/store/subdirectory/SubdirectoryStorePlugin.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,5 +81,31 @@ public Store newStore(
8181
) {
8282
return new SubdirectoryAwareStore(shardId, indexSettings, directory, shardLock, onClose, shardPath);
8383
}
84+
85+
/**
86+
* Creates a new {@link SubdirectoryAwareStore} instance.
87+
*
88+
* @param shardId the shard identifier
89+
* @param indexSettings the index settings
90+
* @param directory the underlying Lucene directory
91+
* @param shardLock the shard lock
92+
* @param onClose callback to execute when the store is closed
93+
* @param shardPath the path information for the shard
94+
* @param directoryFactory the directory factory to create child level directory.
95+
* Used for Context Aware Segments enabled indices.
96+
* @return a new SubdirectoryAwareStore instance
97+
*/
98+
@Override
99+
public Store newStore(
100+
ShardId shardId,
101+
IndexSettings indexSettings,
102+
Directory directory,
103+
ShardLock shardLock,
104+
Store.OnClose onClose,
105+
ShardPath shardPath,
106+
DirectoryFactory directoryFactory
107+
) {
108+
return new SubdirectoryAwareStore(shardId, indexSettings, directory, shardLock, onClose, shardPath, directoryFactory);
109+
}
84110
}
85111
}

modules/store-subdirectory/src/test/java/org/opensearch/plugin/store/subdirectory/SubdirectoryStorePluginTests.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.opensearch.common.util.io.IOUtils;
2121
import org.opensearch.core.index.shard.ShardId;
2222
import org.opensearch.index.shard.ShardPath;
23+
import org.opensearch.index.store.FsDirectoryFactory;
2324
import org.opensearch.index.store.Store;
2425
import org.opensearch.index.store.StoreStats;
2526
import org.opensearch.plugins.IndexStorePlugin;
@@ -67,7 +68,8 @@ public void testStats() throws IOException {
6768
SubdirectoryStorePluginTests.newFSDirectory(path.resolve("index")),
6869
new DummyShardLock(shardId),
6970
Store.OnClose.EMPTY,
70-
new ShardPath(false, path, path, shardId)
71+
new ShardPath(false, path, path, shardId),
72+
new FsDirectoryFactory()
7173
);
7274

7375
long initialStoreSize = 0;

plugins/store-smb/src/main/java/org/opensearch/index/store/smbmmapfs/SmbMmapFsDirectoryFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@
4747
public final class SmbMmapFsDirectoryFactory extends FsDirectoryFactory {
4848

4949
@Override
50-
protected Directory newFSDirectory(Path location, LockFactory lockFactory, IndexSettings indexSettings) throws IOException {
50+
public Directory newFSDirectory(Path location, LockFactory lockFactory, IndexSettings indexSettings) throws IOException {
5151
return new SmbDirectoryWrapper(
5252
setPreload(
5353
new MMapDirectory(location, lockFactory),

plugins/store-smb/src/main/java/org/opensearch/index/store/smbniofs/SmbNIOFsDirectoryFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
public final class SmbNIOFsDirectoryFactory extends FsDirectoryFactory {
2525

2626
@Override
27-
protected Directory newFSDirectory(Path location, LockFactory lockFactory, IndexSettings indexSettings) throws IOException {
27+
public Directory newFSDirectory(Path location, LockFactory lockFactory, IndexSettings indexSettings) throws IOException {
2828
return new SmbDirectoryWrapper(new NIOFSDirectory(location, lockFactory));
2929
}
3030
}

server/src/main/java/org/opensearch/OpenSearchServerException.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import static org.opensearch.Version.V_2_7_0;
2525
import static org.opensearch.Version.V_3_0_0;
2626
import static org.opensearch.Version.V_3_2_0;
27+
import static org.opensearch.Version.V_3_3_0;
2728

2829
/**
2930
* Utility class to register server exceptions
@@ -1241,5 +1242,13 @@ public static void registerExceptions() {
12411242
V_3_2_0
12421243
)
12431244
);
1245+
registerExceptionHandle(
1246+
new OpenSearchExceptionHandle(
1247+
org.opensearch.index.engine.LookupMapLockAcquisitionException.class,
1248+
org.opensearch.index.engine.LookupMapLockAcquisitionException::new,
1249+
CUSTOM_ELASTICSEARCH_EXCEPTIONS_BASE_ID + 2,
1250+
V_3_3_0
1251+
)
1252+
);
12441253
}
12451254
}

server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@
8787
import org.opensearch.index.IndexingPressureService;
8888
import org.opensearch.index.SegmentReplicationPressureService;
8989
import org.opensearch.index.engine.Engine;
90+
import org.opensearch.index.engine.LookupMapLockAcquisitionException;
9091
import org.opensearch.index.engine.VersionConflictEngineException;
9192
import org.opensearch.index.get.GetResult;
9293
import org.opensearch.index.mapper.MapperException;
@@ -727,7 +728,15 @@ && isConflictException(executionResult.getFailure().getCause())
727728
&& context.getRetryCounter() < ((UpdateRequest) docWriteRequest).retryOnConflict()) {
728729
context.resetForExecutionForRetry();
729730
return;
730-
}
731+
} else if (isFailed
732+
&& context.getPrimary() != null
733+
&& context.getPrimary().indexSettings() != null
734+
&& context.getPrimary().indexSettings().isContextAwareEnabled()
735+
&& isLookupMapLockAcquisitionException(executionResult.getFailure().getCause())
736+
&& context.getRetryCounter() < context.getPrimary().indexSettings().getMaxRetryOnLookupMapAcquisitionException()) {
737+
context.resetForExecutionForRetry();
738+
return;
739+
}
731740
final BulkItemResponse response;
732741
if (isUpdate) {
733742
response = processUpdateResponse((UpdateRequest) docWriteRequest, context.getConcreteIndex(), executionResult, updateResult);
@@ -756,6 +765,10 @@ private static boolean isConflictException(final Exception e) {
756765
return ExceptionsHelper.unwrapCause(e) instanceof VersionConflictEngineException;
757766
}
758767

768+
private static boolean isLookupMapLockAcquisitionException(final Exception e) {
769+
return ExceptionsHelper.unwrapCause(e) instanceof LookupMapLockAcquisitionException;
770+
}
771+
759772
/**
760773
* Creates a new bulk item result from the given requests and result of performing the update operation on the shard.
761774
*/

server/src/main/java/org/opensearch/common/lucene/Lucene.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@
9191
import org.opensearch.core.common.io.stream.StreamOutput;
9292
import org.opensearch.index.analysis.AnalyzerScope;
9393
import org.opensearch.index.analysis.NamedAnalyzer;
94+
import org.opensearch.index.codec.CriteriaBasedCodec;
9495
import org.opensearch.index.fielddata.IndexFieldData;
9596
import org.opensearch.index.fielddata.plain.NonPruningSortedSetOrdinalsIndexFieldData.NonPruningSortField;
9697
import org.opensearch.search.sort.SortedWiderNumericSortField;
@@ -939,10 +940,23 @@ public LeafReader wrap(LeafReader leaf) {
939940
// Two scenarios that we have hard-deletes: (1) from old segments where soft-deletes was disabled,
940941
// (2) when IndexWriter hits non-aborted exceptions. These two cases, IW flushes SegmentInfos
941942
// before exposing the hard-deletes, thus we can use the hard-delete count of SegmentInfos.
942-
final int numDocs = segmentReader.maxDoc() - segmentReader.getSegmentInfo().getDelCount();
943+
944+
// With CAS enabled segments, hard deletes can also be present, so correcting numDocs.
945+
// We are using attribute value here to identify whether segment has CAS enabled or not.
946+
int numDocs;
947+
if (isContextAwareEnabled(segmentReader)) {
948+
numDocs = popCount(hardLiveDocs);
949+
} else {
950+
numDocs = segmentReader.maxDoc() - segmentReader.getSegmentInfo().getDelCount();
951+
}
952+
943953
assert numDocs == popCount(hardLiveDocs) : numDocs + " != " + popCount(hardLiveDocs);
944954
return new LeafReaderWithLiveDocs(segmentReader, hardLiveDocs, numDocs);
945955
}
956+
957+
private boolean isContextAwareEnabled(SegmentReader reader) {
958+
return reader.getSegmentInfo().info.getAttribute(CriteriaBasedCodec.BUCKET_NAME) != null;
959+
}
946960
});
947961
}
948962

server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,8 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
176176
ShardsLimitAllocationDecider.INDEX_TOTAL_REMOTE_CAPABLE_PRIMARY_SHARDS_PER_NODE_SETTING,
177177
IndexSettings.INDEX_GC_DELETES_SETTING,
178178
IndexSettings.INDEX_SOFT_DELETES_SETTING,
179+
IndexSettings.INDEX_CONTEXT_AWARE_ENABLED_SETTING,
180+
IndexSettings.INDEX_MAX_RETRY_ON_LOOKUP_MAP_LOCK_ACQUISITION_EXCEPTION,
179181
IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING,
180182
IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_PERIOD_SETTING,
181183
IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING,

0 commit comments

Comments
 (0)