Skip to content

Commit 819ffae

Browse files
committed
Skip segrep lag computation for shard copies on docrep nodes
Signed-off-by: Shourya Dutta Biswas <[email protected]>
1 parent 1b36ee4 commit 819ffae

File tree

2 files changed

+49
-2
lines changed

2 files changed

+49
-2
lines changed

server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteDualReplicationIT.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand;
1919
import org.opensearch.common.settings.Settings;
2020
import org.opensearch.index.IndexService;
21+
import org.opensearch.index.ReplicationStats;
2122
import org.opensearch.index.remote.RemoteSegmentStats;
2223
import org.opensearch.index.seqno.RetentionLease;
2324
import org.opensearch.index.seqno.RetentionLeases;
@@ -665,6 +666,43 @@ public void testFailoverRemotePrimaryToDocrepReplicaReseedToRemotePrimary() thro
665666
});
666667
}
667668

669+
/*
670+
Runs the same scenario as testRemotePrimaryDocRepReplica (it is invoked as the first step of this test).
671+
672+
That scenario ensures that the primary shard for the index has moved over to a remote
673+
enabled node whereas the replica copy is still left behind on the docrep nodes.
674+
675+
At this stage, segrep lag computation shouldn't consider the docrep shard copy while calculating bytes lag, so the remote node's replication stats are expected to be all zero.
676+
*/
677+
public void testZeroSegrepLagForShardsWithMixedReplicationGroup() throws Exception {
678+
testRemotePrimaryDocRepReplica(); // set up the mixed replication group described above
679+
String remoteNodeName = internalCluster().client() // first stats call is only used to discover a remote store node's name
680+
.admin()
681+
.cluster()
682+
.prepareNodesStats()
683+
.get()
684+
.getNodes()
685+
.stream()
686+
.filter(nodeStats -> nodeStats.getNode().isRemoteStoreNode()) // keep only nodes flagged as remote store nodes
687+
.findFirst()
688+
.get() // unchecked Optional access: throws if no node matched the filter
689+
.getNode()
690+
.getName();
691+
ReplicationStats replicationStats = internalCluster().client() // second stats call targets only that remote node
692+
.admin()
693+
.cluster()
694+
.prepareNodesStats(remoteNodeName)
695+
.get()
696+
.getNodes()
697+
.get(0) // stats were requested for a single node name, so the first entry is used
698+
.getIndices()
699+
.getSegments()
700+
.getReplicationStats();
701+
assertEquals(0, replicationStats.getMaxBytesBehind()); // no copy may be reported as lagging in bytes
702+
assertEquals(0, replicationStats.getTotalBytesBehind()); // summed bytes lag must also be zero
703+
assertEquals(0, replicationStats.getMaxReplicationLag()); // and no replication lag may be reported
704+
}
705+
668706
private void assertReplicaAndPrimaryConsistency(String indexName, int firstBatch, int secondBatch) throws Exception {
669707
assertBusy(() -> {
670708
Map<ShardRouting, ShardStats> shardStatsMap = internalCluster().client()

server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1319,14 +1319,23 @@ && isPrimaryRelocation(e.getKey()) == false
13191319
public synchronized Set<SegmentReplicationShardStats> getSegmentReplicationStats() {
13201320
assert indexSettings.isSegRepEnabledOrRemoteNode();
13211321
if (primaryMode) {
1322+
// Check whether this primary shard is already on a remote enabled node while the index is still migrating,
1323+
// i.e. not all of the other shard copies of the same index have moved over
1324+
// to the remote enabled nodes yet
1325+
boolean migratingPrimaryOnRemoteNode = indexSettings.isAssignedOnRemoteNode() && indexSettings.isRemoteStoreEnabled() == false;
13221326
return this.checkpoints.entrySet()
13231327
.stream()
1324-
// filter out this shard's allocation id, any shards that are out of sync or unavailable (shard marked in-sync but has not
1325-
// been assigned to a node).
1328+
/* Filter out:
1329+
- This shard's allocation id
1330+
- Any shards that are out of sync or unavailable (shard marked in-sync but has not been assigned to a node).
1331+
- (For remote store enabled clusters) Any shard that is not yet migrated to remote store enabled nodes during migration
1332+
*/
13261333
.filter(
13271334
entry -> entry.getKey().equals(this.shardAllocationId) == false
13281335
&& entry.getValue().inSync
13291336
&& replicationGroup.getUnavailableInSyncShards().contains(entry.getKey()) == false
1337+
&& (migratingPrimaryOnRemoteNode == false // not migrating: keep every in-sync copy, as before this change
1338+
|| isShardOnRemoteEnabledNode.apply(routingTable.getByAllocationId(entry.getKey()).currentNodeId())) // migrating: keep only copies already on remote enabled nodes
13301339
&& isPrimaryRelocation(entry.getKey()) == false
13311340
)
13321341
.map(entry -> buildShardStats(entry.getKey(), entry.getValue()))

0 commit comments

Comments
 (0)