Description
Describe the bug
Retries within RemoteStoreRefreshListener run outside of the refresh thread. Publishing checkpoints is part of the retried logic, so the same checkpoint can be published twice: once from the refresh thread and again from the retry thread. This has no impact on replica shards, because they dedupe the checkpoint. On the primary, however, it invokes onCheckpointPublished within IndexShard a second time. That method starts the timers used to compute replication lag, so the second invocation attempts to .start() the same timer twice. In tests, where assertions are enabled, this throws an AssertionError ("already started"). Outside of tests, it restarts the timer from the current timestamp, which makes the measured lag shorter than the real lag and skews the lag metrics.
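To make the failure mode concrete, here is a minimal, self-contained Java sketch. `DoubleStartDemo`, `SketchTimer`, and the `strict` flag are hypothetical and are not OpenSearch's `ReplicationTimer`. The flag stands in for running with assertions enabled (tests) versus disabled (production), and the clock values are arbitrary. It only illustrates that a second `start()` either fails or silently moves the start time forward.

```java
import java.util.function.LongSupplier;

public class DoubleStartDemo {

    /**
     * Hypothetical, simplified stand-in for a replication lag timer; NOT
     * OpenSearch's ReplicationTimer. strict=true models assertions enabled (tests),
     * strict=false models assertions disabled (production).
     */
    static final class SketchTimer {
        private final LongSupplier clock;
        private final boolean strict;
        private boolean started = false;
        private long startMillis;

        SketchTimer(LongSupplier clock, boolean strict) {
            this.clock = clock;
            this.strict = strict;
        }

        void start() {
            if (started && strict) {
                // Test behavior: a second start is rejected.
                throw new IllegalStateException("already started");
            }
            // Production behavior: a second start silently overwrites the start time.
            started = true;
            startMillis = clock.getAsLong();
        }

        long lagMillis() {
            return clock.getAsLong() - startMillis;
        }
    }

    /** Publishes the same checkpoint twice (refresh thread, then retry thread) and returns the reported lag. */
    static long lagAfterDoublePublish() {
        long[] now = {100};                  // arbitrary clock values
        SketchTimer timer = new SketchTimer(() -> now[0], false);
        timer.start();                       // refresh thread publishes at t=100
        now[0] = 109;
        timer.start();                       // retry thread republishes the same checkpoint at t=109
        now[0] = 150;                        // replica catches up at t=150
        return timer.lagMillis();            // 150 - 109 = 41, while the true lag is 50
    }

    public static void main(String[] args) {
        System.out.println("reported lag = " + lagAfterDoublePublish() + ", true lag = 50");
    }
}
```

With these clock values the reported lag is 41 instead of 50, because the second publish moved the start time from 100 to 109.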
[2023-10-19T18:23:41,312][INFO ][o.o.i.s.RemoteStoreRefreshListener] [node_s1] [test-idx-1][6] PUBLISHED CHECKPOINT ReplicationCheckpoint{shardId=[test-idx-1][6], primaryTerm=1, segmentsGen=3, version=7, size=0, codec=Lucene95} THREAD opensearch[node_s1][refresh][T#1]
[2023-10-19T18:23:41,321][INFO ][o.o.i.s.RemoteStoreRefreshListener] [node_s1] [test-idx-1][6] PUBLISHED CHECKPOINT ReplicationCheckpoint{shardId=[test-idx-1][6], primaryTerm=1, segmentsGen=3, version=7, size=0, codec=Lucene95} THREAD opensearch[node_s1][remote_refresh_retry][T#1]
[2023-10-19T18:23:41,321][ERROR][o.o.i.s.RemoteStoreRefreshListener] [node_s2] [test-idx-1][1] Exception in RemoteStoreRefreshListener.afterRefresh()
java.lang.AssertionError: already started
at org.opensearch.indices.replication.common.ReplicationTimer.start(ReplicationTimer.java:49) ~[classes/:?]
at org.opensearch.index.seqno.ReplicationTracker.lambda$startReplicationLagTimers$21(ReplicationTracker.java:1274) ~[classes/:?]
at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183) ~[?:?]
at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179) ~[?:?]
at java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1858) ~[?:?]
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509) ~[?:?]
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499) ~[?:?]
at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150) ~[?:?]
at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173) ~[?:?]
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[?:?]
at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596) ~[?:?]
at org.opensearch.index.seqno.ReplicationTracker.startReplicationLagTimers(ReplicationTracker.java:1265) ~[classes/:?]
at org.opensearch.index.shard.IndexShard.onCheckpointPublished(IndexShard.java:1910) ~[classes/:?]
at org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher.publish(SegmentReplicationCheckpointPublisher.java:37) ~[classes/:?]
at org.opensearch.index.shard.RemoteStoreRefreshListener.onSuccessfulSegmentsSync(RemoteStoreRefreshListener.java:299) ~[classes/:?]
at org.opensearch.index.shard.RemoteStoreRefreshListener$1.onResponse(RemoteStoreRefreshListener.java:219) ~[classes/:?]
at org.opensearch.index.shard.RemoteStoreRefreshListener$1.onResponse(RemoteStoreRefreshListener.java:210) ~[classes/:?]
at org.opensearch.action.LatchedActionListener.onResponse(LatchedActionListener.java:58) ~[classes/:?]
at org.opensearch.index.shard.RemoteStoreRefreshListener.uploadNewSegments(RemoteStoreRefreshListener.java:356) ~[classes/:?]
at org.opensearch.index.shard.RemoteStoreRefreshListener.syncSegments(RemoteStoreRefreshListener.java:244) ~[classes/:?]
at org.opensearch.index.shard.RemoteStoreRefreshListener.performAfterRefreshWithPermit(RemoteStoreRefreshListener.java:155) ~[classes/:?]
at org.opensearch.index.shard.CloseableRetryableRefreshListener.runAfterRefreshWithPermit(CloseableRetryableRefreshListener.java:157) ~[classes/:?]
at org.opensearch.index.shard.CloseableRetryableRefreshListener.lambda$scheduleRetry$2(CloseableRetryableRefreshListener.java:123) ~[classes/:?]
at org.opensearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:849) [classes/:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) [?:?]
at java.lang.Thread.run(Thread.java:1589) [?:?]
To Reproduce
I have been running SegmentReplicationUsingRemoteStoreITs on repeat and see this in the logs of various test cases. The tests do not fail, because the error is caught and logged in afterRefresh(), but outside of tests the timer would be reset.
Expected behavior
Publishing the same checkpoint more than once should not reset a replication lag timer that has already started.
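One possible way to meet this expectation is for the primary to ignore a checkpoint that is not newer than the last one it published, so a retry cannot restart the lag timers. The sketch below is a hypothetical illustration only: `PublishOnceDemo`, `Checkpoint`, and `PublishGuard` are invented names, and ordering checkpoints by primary term and then version is an assumption, not OpenSearch's actual comparison logic. Making the timer's start idempotent would be an alternative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

public class PublishOnceDemo {

    /** Hypothetical minimal checkpoint identity: primary term plus version. */
    record Checkpoint(long primaryTerm, long version) {
        boolean isAheadOf(Checkpoint other) {
            return other == null
                || primaryTerm > other.primaryTerm
                || (primaryTerm == other.primaryTerm && version > other.version);
        }
    }

    /** Hypothetical guard: runs the publish action only for checkpoints newer than the last one published. */
    static final class PublishGuard {
        private final AtomicReference<Checkpoint> lastPublished = new AtomicReference<>();

        boolean publishIfNew(Checkpoint cp, Runnable onPublish) {
            while (true) {
                Checkpoint prev = lastPublished.get();
                if (!cp.isAheadOf(prev)) {
                    return false; // duplicate or stale checkpoint: do not restart lag timers
                }
                if (lastPublished.compareAndSet(prev, cp)) {
                    onPublish.run(); // e.g. start replication lag timers
                    return true;
                }
                // Another thread published concurrently; re-read and re-check.
            }
        }
    }

    /** Refresh thread publishes v7, the retry thread republishes v7, then a new v8 arrives. */
    static List<String> simulate() {
        PublishGuard guard = new PublishGuard();
        List<String> timerStarts = new ArrayList<>();
        Checkpoint v7 = new Checkpoint(1, 7);
        guard.publishIfNew(v7, () -> timerStarts.add("start v7"));                   // refresh thread
        guard.publishIfNew(v7, () -> timerStarts.add("start v7"));                   // retry thread: skipped
        guard.publishIfNew(new Checkpoint(1, 8), () -> timerStarts.add("start v8")); // next refresh
        return timerStarts;
    }

    public static void main(String[] args) {
        System.out.println(simulate()); // [start v7, start v8]
    }
}
```

The retry thread's second publish of version 7 is skipped, so the timers are started once per checkpoint. Using compareAndSet keeps the check-and-record step atomic when the refresh thread and the retry thread race.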