
Commit 62f0be4

Fixing assertion in TranslogWriter for the derived source case, in which the recovery source could differ in display form but be logically the same
Signed-off-by: Tanik Pansuriya <[email protected]>
1 parent 31d701e commit 62f0be4

10 files changed, +167 −69 lines changed


CHANGELOG.md

Lines changed: 3 additions & 37 deletions

@@ -7,43 +7,6 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 ### Added
 - Add support for Warm Indices Write Block on Flood Watermark breach ([#18375](https://github.com/opensearch-project/OpenSearch/pull/18375))
 - Ability to run Code Coverage with Gradle and produce the jacoco reports locally ([#18509](https://github.com/opensearch-project/OpenSearch/issues/18509))
-- Add support for linux riscv64 platform ([#18156](https://github.com/opensearch-project/OpenSearch/pull/18156))
-- [Rule based auto-tagging] Add get rule API ([#17336](https://github.com/opensearch-project/OpenSearch/pull/17336))
-- [Rule based auto-tagging] Add Delete Rule API ([#18184](https://github.com/opensearch-project/OpenSearch/pull/18184))
-- Add paginated wlm/stats API ([#17638](https://github.com/opensearch-project/OpenSearch/pull/17638))
-- [Rule based auto-tagging] Add Create rule API ([#17792](https://github.com/opensearch-project/OpenSearch/pull/17792))
-- Implement parallel shard refresh behind cluster settings ([#17782](https://github.com/opensearch-project/OpenSearch/pull/17782))
-- Bump OpenSearch Core main branch to 3.0.0 ([#18039](https://github.com/opensearch-project/OpenSearch/pull/18039))
-- [Rule based Auto-tagging] Add wlm `ActionFilter` ([#17791](https://github.com/opensearch-project/OpenSearch/pull/17791))
-- Update API of Message in index to add the timestamp for lag calculation in ingestion polling ([#17977](https://github.com/opensearch-project/OpenSearch/pull/17977/))
-- Add Warm Disk Threshold Allocation Decider for Warm shards ([#18082](https://github.com/opensearch-project/OpenSearch/pull/18082))
-- Add composite directory factory ([#17988](https://github.com/opensearch-project/OpenSearch/pull/17988))
-- [Rule based auto-tagging] Add refresh based synchronization service for `Rule`s ([#18128](https://github.com/opensearch-project/OpenSearch/pull/18128))
-- Add pull-based ingestion error metrics and make internal queue size configurable ([#18088](https://github.com/opensearch-project/OpenSearch/pull/18088))
-- [Derive Source] Adding support for derive source feature and implementing it for various type of field mappers ([#17759](https://github.com/opensearch-project/OpenSearch/pull/17759))
-- [Derive Source] Adding integration of derived source feature across diff paths ([#18054](https://github.com/opensearch-project/OpenSearch/pull/18054))
-- [Security Manager Replacement] Enhance Java Agent to intercept newByteChannel ([#17989](https://github.com/opensearch-project/OpenSearch/pull/17989))
-- Enabled Async Shard Batch Fetch by default ([#18139](https://github.com/opensearch-project/OpenSearch/pull/18139))
-- Allow to get the search request from the QueryCoordinatorContext ([#17818](https://github.com/opensearch-project/OpenSearch/pull/17818))
-- Reject close index requests, while remote store migration is in progress.([#18327](https://github.com/opensearch-project/OpenSearch/pull/18327))
-- Improve sort-query performance by retaining the default `totalHitsThreshold` for approximated `match_all` queries ([#18189](https://github.com/opensearch-project/OpenSearch/pull/18189))
-- Enable testing for ExtensiblePlugins using classpath plugins ([#16908](https://github.com/opensearch-project/OpenSearch/pull/16908))
-- Introduce system generated ingest pipeline ([#17817](https://github.com/opensearch-project/OpenSearch/pull/17817)))
-- Apply cluster state metadata and routing table diff when building cluster state from remote([#18256](https://github.com/opensearch-project/OpenSearch/pull/18256))
-- Support create mode in pull-based ingestion and add retries for transient failures ([#18250](https://github.com/opensearch-project/OpenSearch/pull/18250)))
-- Decouple the init of Crypto Plugin and KeyProvider in CryptoRegistry ([18270](https://github.com/opensearch-project/OpenSearch/pull18270)))
-- Support cluster write block in pull-based ingestion ([#18280](https://github.com/opensearch-project/OpenSearch/pull/18280)))
-- Use QueryCoordinatorContext for the rewrite in validate API. ([#18272](https://github.com/opensearch-project/OpenSearch/pull/18272))
-- Upgrade crypto kms plugin dependencies for AWS SDK v2.x. ([#18268](https://github.com/opensearch-project/OpenSearch/pull/18268))
-- Add support for `matched_fields` with the unified highlighter ([#18164](https://github.com/opensearch-project/OpenSearch/issues/18164))
-- Add BooleanQuery rewrite for must_not RangeQuery clauses ([#17655](https://github.com/opensearch-project/OpenSearch/pull/17655))
-- [repository-s3] Add support for SSE-KMS and S3 bucket owner verification ([#18312](https://github.com/opensearch-project/OpenSearch/pull/18312))
-- Optimize gRPC perf by passing by reference ([#18303](https://github.com/opensearch-project/OpenSearch/pull/18303))
-- Added File Cache Stats - Involves Block level as well as full file level stats ([#17538](https://github.com/opensearch-project/OpenSearch/issues/17479))
-- Added time_in_execution attribute to /_cluster/pending_tasks response ([#17780](https://github.com/opensearch-project/OpenSearch/pull/17780))
-- Added File Cache Pinning ([#17617](https://github.com/opensearch-project/OpenSearch/issues/13648))
-- Support consumer reset in Resume API for pull-based ingestion. This PR includes a breaking change for the experimental pull-based ingestion feature. ([#18332](https://github.com/opensearch-project/OpenSearch/pull/18332))
-- Add FIPS build tooling ([#4254](https://github.com/opensearch-project/security/issues/4254))
 
 ### Changed
 
@@ -60,3 +23,6 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 ### Security
 
 [Unreleased 3.x]: https://github.com/opensearch-project/OpenSearch/compare/3.1...main
+### Added
+[Derive Source] Adding back "Adding integration of derived source feature across diff paths" with fixes for IT #18565
+([#18565](https://github.com/opensearch-project/OpenSearch/pull/18565))

server/src/internalClusterTest/java/org/opensearch/recovery/RecoveryWhileUnderLoadIT.java

Lines changed: 29 additions & 10 deletions

@@ -533,7 +533,11 @@ public void testRecoveryWithDerivedSourceEnabled() throws Exception {
 
         final int totalNumDocs = scaledRandomIntBetween(200, 1000);
         for (int i = 0; i < totalNumDocs; i++) {
-            client().prepareIndex("test").setId(String.valueOf(i)).setSource("name", "test" + i, "age", i).get();
+            if (i % 2 == 0) {
+                client().prepareIndex("test").setId(String.valueOf(i)).setSource("name", "test" + i, "age", i).get();
+            } else {
+                client().prepareIndex("test").setId(String.valueOf(i)).setSource("age", i, "name", "test" + i).get();
+            }
 
             if (i % 100 == 0) {
                 // Occasionally flush to create new segments
@@ -592,10 +596,17 @@ public void testReplicaRecoveryWithDerivedSourceBeforeRefresh() throws Exception
         // Index documents without refresh
         int docCount = randomIntBetween(100, 200);
         for (int i = 0; i < docCount; i++) {
-            client().prepareIndex("test")
-                .setId(String.valueOf(i))
-                .setSource("timestamp", "2023-01-01T01:20:30." + String.valueOf(i % 10).repeat(3) + "Z", "ip", "192.168.1." + i)
-                .get();
+            if (i % 2 == 0) {
+                client().prepareIndex("test")
+                    .setId(String.valueOf(i))
+                    .setSource("timestamp", "2023-01-01T01:20:30." + String.valueOf(i % 10).repeat(3) + "Z", "ip", "192.168.1." + i)
+                    .get();
+            } else {
+                client().prepareIndex("test")
+                    .setId(String.valueOf(i))
+                    .setSource("ip", "192.168.1." + i, "timestamp", "2023-01-01T01:20:30." + String.valueOf(i % 10).repeat(3) + "Z")
+                    .get();
+            }
         }
 
         // Add replica before refresh
@@ -657,11 +668,19 @@ public void testReplicaRecoveryWithDerivedSourceFromTranslog() throws Exception
         // Index documents with immediate visibility
         int docCount = randomIntBetween(100, 200);
         for (int i = 0; i < docCount; i++) {
-            client().prepareIndex("test")
-                .setId(String.valueOf(i))
-                .setSource("coordinates", Geohash.stringEncode(40.0 + i, 75.0 + i) + i, "value", "fox_" + i + " in the field")
-                .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
-                .get();
+            if (i % 2 == 0) {
+                client().prepareIndex("test")
+                    .setId(String.valueOf(i))
+                    .setSource("coordinates", Geohash.stringEncode(40.0 + i, 75.0 + i) + i, "value", "fox_" + i + " in the field")
+                    .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
+                    .get();
+            } else {
+                client().prepareIndex("test")
+                    .setId(String.valueOf(i))
+                    .setSource("value", "fox_" + i + " in the field", "coordinates", Geohash.stringEncode(40.0 + i, 75.0 + i) + i)
+                    .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
+                    .get();
+            }
         }
 
         // Force flush to ensure documents are in segments
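The alternating field order in these tests exercises exactly the property named in the commit message: two serializations of the same document differ byte-for-byte yet are logically equal. A minimal stdlib-only sketch of that distinction (hypothetical class name, not OpenSearch code):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration (not OpenSearch code): the same logical document
// rendered with its fields in two different orders, as the tests above index it.
public class SourceOrderSketch {

    // Build {"name": "test1", "age": 1} with a chosen field order.
    public static Map<String, Object> doc(boolean nameFirst) {
        Map<String, Object> m = new LinkedHashMap<>();
        if (nameFirst) {
            m.put("name", "test1");
            m.put("age", 1);
        } else {
            m.put("age", 1);
            m.put("name", "test1");
        }
        return m;
    }

    public static void main(String[] args) {
        // The display forms differ...
        System.out.println(doc(true));   // {name=test1, age=1}
        System.out.println(doc(false));  // {age=1, name=test1}
        // ...but the parsed documents are equal, which is what a recovery
        // assertion has to compare when source is derived.
        System.out.println(doc(true).equals(doc(false))); // true
    }
}
```

This is why a byte-for-byte assertion on recovered source is too strict for derived-source indices: derivation may emit fields in a different order than the client sent them.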

server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java

Lines changed: 1 addition & 3 deletions

@@ -29,7 +29,6 @@
 import org.opensearch.index.translog.TranslogDeletionPolicy;
 import org.opensearch.index.translog.TranslogException;
 import org.opensearch.index.translog.TranslogManager;
-import org.opensearch.index.translog.TranslogOperationHelper;
 import org.opensearch.index.translog.WriteOnlyTranslogManager;
 import org.opensearch.index.translog.listener.TranslogEventListener;
 import org.opensearch.search.suggest.completion.CompletionStats;
@@ -126,8 +125,7 @@ public void onAfterTranslogSync() {
             },
             this,
             engineConfig.getTranslogFactory(),
-            engineConfig.getStartedPrimarySupplier(),
-            TranslogOperationHelper.create(engineConfig)
+            engineConfig.getStartedPrimarySupplier()
         );
         this.translogManager = translogManagerRef;
         success = true;

server/src/main/java/org/opensearch/index/translog/InternalTranslogFactory.java

Lines changed: 25 additions & 0 deletions

@@ -20,6 +20,31 @@
  */
 public class InternalTranslogFactory implements TranslogFactory {
 
+    @Override
+    public Translog newTranslog(
+        TranslogConfig translogConfig,
+        String translogUUID,
+        TranslogDeletionPolicy translogDeletionPolicy,
+        LongSupplier globalCheckpointSupplier,
+        LongSupplier primaryTermSupplier,
+        LongConsumer persistedSequenceNumberConsumer,
+        BooleanSupplier startedPrimarySupplier
+    ) throws IOException {
+
+        assert translogConfig.getIndexSettings().isDerivedSourceEnabled() == false; // For derived source supported index, primary method
+        // must be used
+
+        return new LocalTranslog(
+            translogConfig,
+            translogUUID,
+            translogDeletionPolicy,
+            globalCheckpointSupplier,
+            primaryTermSupplier,
+            persistedSequenceNumberConsumer,
+            TranslogOperationHelper.EMPTY
+        );
+    }
+
     @Override
     public Translog newTranslog(
         TranslogConfig translogConfig,

server/src/main/java/org/opensearch/index/translog/RemoteBlobStoreInternalTranslogFactory.java

Lines changed: 24 additions & 0 deletions

@@ -56,6 +56,30 @@ public RemoteBlobStoreInternalTranslogFactory(
         this.remoteStoreSettings = remoteStoreSettings;
     }
 
+    @Override
+    public Translog newTranslog(
+        TranslogConfig config,
+        String translogUUID,
+        TranslogDeletionPolicy deletionPolicy,
+        LongSupplier globalCheckpointSupplier,
+        LongSupplier primaryTermSupplier,
+        LongConsumer persistedSequenceNumberConsumer,
+        BooleanSupplier startedPrimarySupplier
+    ) throws IOException {
+        assert config.getIndexSettings().isDerivedSourceEnabled() == false; // For derived source supported index, primary method must be
+        // used
+        return this.newTranslog(
+            config,
+            translogUUID,
+            deletionPolicy,
+            globalCheckpointSupplier,
+            primaryTermSupplier,
+            persistedSequenceNumberConsumer,
+            startedPrimarySupplier,
+            TranslogOperationHelper.EMPTY
+        );
+    }
+
     @Override
     public Translog newTranslog(
         TranslogConfig config,

server/src/main/java/org/opensearch/index/translog/Translog.java

Lines changed: 35 additions & 10 deletions

@@ -162,17 +162,18 @@ public abstract class Translog extends AbstractIndexShardComponent implements In
      * generation referenced from already committed data. This means all operations that have not yet been committed should be in the
      * translog file referenced by this generation. The translog creation will fail if this generation can't be opened.
      *
-     * @param config the configuration of this translog
-     * @param translogUUID the translog uuid to open, null for a new translog
-     * @param deletionPolicy an instance of {@link TranslogDeletionPolicy} that controls when a translog file can be safely
-     *                       deleted
-     * @param globalCheckpointSupplier a supplier for the global checkpoint
-     * @param primaryTermSupplier a supplier for the latest value of primary term of the owning index shard. The latest term value is
-     *                            examined and stored in the header whenever a new generation is rolled. It's guaranteed from outside
-     *                            that a new generation is rolled when the term is increased. This guarantee allows to us to validate
-     *                            and reject operation whose term is higher than the primary term stored in the translog header.
+     * @param config                          the configuration of this translog
+     * @param translogUUID                    the translog uuid to open, null for a new translog
+     * @param deletionPolicy                  an instance of {@link TranslogDeletionPolicy} that controls when a translog file can be safely
+     *                                        deleted
+     * @param globalCheckpointSupplier        a supplier for the global checkpoint
+     * @param primaryTermSupplier             a supplier for the latest value of primary term of the owning index shard. The latest term value is
+     *                                        examined and stored in the header whenever a new generation is rolled. It's guaranteed from outside
+     *                                        that a new generation is rolled when the term is increased. This guarantee allows to us to validate
+     *                                        and reject operation whose term is higher than the primary term stored in the translog header.
      * @param persistedSequenceNumberConsumer a callback that's called whenever an operation with a given sequence number is successfully
      *                                        persisted.
+     * @param translogOperationHelper         a helper method to validate translog operations with the support of derived source
      */
     public Translog(
         final TranslogConfig config,
@@ -199,6 +200,30 @@ public Translog(
         this.translogOperationHelper = translogOperationHelper;
     }
 
+    /**
+     * Secondary constructor, this should only be called if index is normal and not for derived source
+     */
+    public Translog(
+        final TranslogConfig config,
+        final String translogUUID,
+        TranslogDeletionPolicy deletionPolicy,
+        final LongSupplier globalCheckpointSupplier,
+        final LongSupplier primaryTermSupplier,
+        final LongConsumer persistedSequenceNumberConsumer
+    ) throws IOException {
+        this(
+            config,
+            translogUUID,
+            deletionPolicy,
+            globalCheckpointSupplier,
+            primaryTermSupplier,
+            persistedSequenceNumberConsumer,
+            TranslogOperationHelper.EMPTY
+        );
+        assert config.getIndexSettings().isDerivedSourceEnabled() == false; // For derived source supported index, it is incorrect to use
+        // this constructor
+    }
+
     /** recover all translog files found on disk */
     protected ArrayList<TranslogReader> recoverFromFiles(Checkpoint checkpoint) throws IOException {
         boolean success = false;
@@ -2102,7 +2127,7 @@ public static String createEmptyTranslog(
             },
             BigArrays.NON_RECYCLING_INSTANCE,
             null,
-            null
+            TranslogOperationHelper.EMPTY
         );
         writer.close();
         return uuid;
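The secondary constructor above pairs convenience delegation with an assert so it cannot silently be used for derived-source indices. The pattern, reduced to a stand-alone sketch (class and field names here are illustrative, not OpenSearch's):

```java
// Illustrative sketch of the guarded-convenience-constructor pattern:
// a secondary constructor defaults to a neutral helper and asserts
// (when assertions are enabled) that the index does not need the richer one.
public class GuardSketch {

    static class Settings {
        final boolean derivedSourceEnabled;
        Settings(boolean enabled) { this.derivedSourceEnabled = enabled; }
    }

    final String helper;

    // Primary constructor: caller supplies the operation helper explicitly.
    GuardSketch(Settings settings, String helper) {
        this.helper = helper;
    }

    // Secondary constructor: defaults to the neutral EMPTY helper, then
    // asserts that derived source is disabled for this index.
    GuardSketch(Settings settings) {
        this(settings, "EMPTY");
        assert settings.derivedSourceEnabled == false : "use the primary constructor for derived-source indices";
    }
}
```

The check is an assert rather than a hard exception, so it trips in tests (run with `-ea`) without adding a production-path branch.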

server/src/main/java/org/opensearch/index/translog/TranslogFactory.java

Lines changed: 10 additions & 1 deletion

@@ -21,10 +21,19 @@
  *
  * @opensearch.api
  */
-@FunctionalInterface
 @PublicApi(since = "1.0.0")
 public interface TranslogFactory {
 
+    Translog newTranslog(
+        final TranslogConfig config,
+        final String translogUUID,
+        final TranslogDeletionPolicy deletionPolicy,
+        final LongSupplier globalCheckpointSupplier,
+        final LongSupplier primaryTermSupplier,
+        final LongConsumer persistedSequenceNumberConsumer,
+        final BooleanSupplier startedPrimarySupplier
+    ) throws IOException;
+
     Translog newTranslog(
         final TranslogConfig config,
         final String translogUUID,
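Dropping `@FunctionalInterface` follows directly from this change: the annotation only compiles on interfaces with exactly one abstract method, and the diff adds a second `newTranslog` overload. A compact sketch of the same evolution, with hypothetical names (the delegation-with-a-neutral-default mirrors how the factories pass `TranslogOperationHelper.EMPTY`):

```java
// Hypothetical sketch: a factory interface that stops being "functional"
// once a second abstract overload is added, as TranslogFactory does here.
public class FactorySketch {

    interface Factory {
        // @FunctionalInterface would no longer compile: two abstract methods.
        String newThing(String uuid);
        String newThing(String uuid, String helper);
    }

    static final Factory DEFAULT = new Factory() {
        @Override
        public String newThing(String uuid) {
            // Delegate to the richer overload with a neutral default helper.
            return newThing(uuid, "EMPTY");
        }

        @Override
        public String newThing(String uuid, String helper) {
            return uuid + ":" + helper;
        }
    };
}
```

Existing implementations are unaffected only if they already extend a concrete base or implement both methods; lambdas that previously satisfied the interface would break, which is why the annotation had to go.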

server/src/main/java/org/opensearch/index/translog/TranslogOperationHelper.java

Lines changed: 1 addition & 5 deletions

@@ -8,8 +8,6 @@
 
 package org.opensearch.index.translog;
 
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
 import org.opensearch.common.annotation.PublicApi;
 import org.opensearch.common.collect.Tuple;
 import org.opensearch.common.xcontent.XContentHelper;
@@ -30,12 +28,11 @@
  *
  * @opensearch.internal
  */
-@PublicApi(since = "1.0.0")
+@PublicApi(since = "3.1.0")
 public abstract class TranslogOperationHelper {
 
     public static final TranslogOperationHelper EMPTY = new TranslogOperationHelper() {
     };
-    private static final Logger logger = LogManager.getLogger(TranslogOperationHelper.class);
 
     private TranslogOperationHelper() {}
 
@@ -70,7 +67,6 @@ public boolean hasSameIndexOperation(Translog.Index op1, Translog.Index op2) {
     }
 
     private static BytesReference deriveSource(Translog.Index op, EngineConfig engineConfig) {
-        logger.warn("Fetching derived source for {}", op.id());
         try (TranslogLeafReader leafReader = new TranslogLeafReader(op, engineConfig)) {
             final FieldsVisitor visitor = new FieldsVisitor(true);
             leafReader.storedFields().document(0, visitor); // As reader will have only a single document, segment level doc id will be 0
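The comparison that `TranslogOperationHelper.hasSameIndexOperation` enables can be summarized as: cheap byte equality first, then logical equality of the parsed documents. A self-contained sketch under that assumption (not the actual OpenSearch implementation; the parser is injected here, while OpenSearch derives the source via a `TranslogLeafReader` over the engine config):

```java
import java.util.Arrays;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch of the relaxed source comparison for translog
// index operations; names and shape are illustrative only.
public class RelaxedSourceEquality {

    public static boolean sameSource(byte[] left, byte[] right,
                                     Function<byte[], Map<String, Object>> parser) {
        if (Arrays.equals(left, right)) {
            return true; // identical bytes: trivially the same document
        }
        // Different display form; fall back to comparing parsed structure.
        return parser.apply(left).equals(parser.apply(right));
    }
}
```

The fast path keeps the common case cheap; only operations whose serialized bytes diverge, such as a derived source re-rendered in a different field order, pay for the parse.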

server/src/main/java/org/opensearch/index/translog/TranslogWriter.java

Lines changed: 37 additions & 0 deletions

@@ -168,6 +168,43 @@ private TranslogWriter(
         this.translogOperationHelper = translogOperationHelper;
     }
 
+    public static TranslogWriter create(
+        ShardId shardId,
+        String translogUUID,
+        long fileGeneration,
+        Path file,
+        ChannelFactory channelFactory,
+        ByteSizeValue bufferSize,
+        final long initialMinTranslogGen,
+        long initialGlobalCheckpoint,
+        final LongSupplier globalCheckpointSupplier,
+        final LongSupplier minTranslogGenerationSupplier,
+        final long primaryTerm,
+        TragicExceptionHolder tragedy,
+        final LongConsumer persistedSequenceNumberConsumer,
+        final BigArrays bigArrays,
+        Boolean remoteTranslogEnabled
+    ) throws IOException {
+        return create(
+            shardId,
+            translogUUID,
+            fileGeneration,
+            file,
+            channelFactory,
+            bufferSize,
+            initialMinTranslogGen,
+            initialGlobalCheckpoint,
+            globalCheckpointSupplier,
+            minTranslogGenerationSupplier,
+            primaryTerm,
+            tragedy,
+            persistedSequenceNumberConsumer,
+            bigArrays,
+            remoteTranslogEnabled,
+            TranslogOperationHelper.EMPTY
+        );
+    }
+
     public static TranslogWriter create(
         ShardId shardId,
         String translogUUID,
