Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.opensearch.common.SetOnce;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.remote.RemoteSegmentTransferTracker;
Expand Down Expand Up @@ -261,92 +262,59 @@ public void testDownloadStatsCorrectnessSinglePrimarySingleReplica() throws Exce
// - Assert that download stats == upload stats
// - Repeat this step a random number of times (between 5 and 10)

// Create index with 1 pri and 1 replica and refresh interval disabled
createIndex(
INDEX_NAME,
Settings.builder().put(remoteStoreIndexSettings(1, 1)).put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), -1).build()
);
ensureGreen(INDEX_NAME);

// Manually invoke a refresh
refresh(INDEX_NAME);

// Get zero state values
// Extract and assert zero state primary stats
RemoteStoreStatsResponse zeroStateResponse = client().admin().cluster().prepareRemoteStoreStats(INDEX_NAME, "0").get();
RemoteSegmentTransferTracker.Stats zeroStatePrimaryStats = Arrays.stream(zeroStateResponse.getRemoteStoreStats())
.filter(remoteStoreStats -> remoteStoreStats.getShardRouting().primary())
.collect(Collectors.toList())
.get(0)
.getSegmentStats();
logger.info(
"Zero state primary stats: {}ms refresh time lag, {}b bytes lag, {}b upload bytes started, {}b upload bytes failed , {} uploads succeeded, {} upload byes succeeded.",
zeroStatePrimaryStats.refreshTimeLagMs,
zeroStatePrimaryStats.bytesLag,
zeroStatePrimaryStats.uploadBytesStarted,
zeroStatePrimaryStats.uploadBytesFailed,
zeroStatePrimaryStats.totalUploadsSucceeded,
zeroStatePrimaryStats.uploadBytesSucceeded
);
assertTrue(
zeroStatePrimaryStats.totalUploadsStarted == zeroStatePrimaryStats.totalUploadsSucceeded
&& zeroStatePrimaryStats.totalUploadsSucceeded == 1
);
assertTrue(
zeroStatePrimaryStats.uploadBytesStarted == zeroStatePrimaryStats.uploadBytesSucceeded
&& zeroStatePrimaryStats.uploadBytesSucceeded > 0
);
assertTrue(zeroStatePrimaryStats.totalUploadsFailed == 0 && zeroStatePrimaryStats.uploadBytesFailed == 0);
// Prepare settings with single replica
Settings.Builder settings = Settings.builder()
.put(remoteStoreIndexSettings(1, 1))
.put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), -1);

// Extract and assert zero state replica stats
RemoteSegmentTransferTracker.Stats zeroStateReplicaStats = Arrays.stream(zeroStateResponse.getRemoteStoreStats())
.filter(remoteStoreStats -> !remoteStoreStats.getShardRouting().primary())
.collect(Collectors.toList())
.get(0)
.getSegmentStats();
assertTrue(
zeroStateReplicaStats.directoryFileTransferTrackerStats.transferredBytesStarted == 0
&& zeroStateReplicaStats.directoryFileTransferTrackerStats.transferredBytesSucceeded == 0
);
// Retrieve zero state stats
SetOnce<RemoteSegmentTransferTracker.Stats> zeroStatePrimaryStats = prepareZeroStateStats(settings, false);

// Index documents
// Iteration logic
for (int i = 1; i <= randomIntBetween(5, 10); i++) {
indexSingleDoc(INDEX_NAME);
// Running Flush & Refresh manually
flushAndRefresh(INDEX_NAME);
ensureGreen(INDEX_NAME);
waitForReplication();

// Poll for RemoteStore Stats
assertBusy(() -> {
RemoteStoreStatsResponse response = client().admin().cluster().prepareRemoteStoreStats(INDEX_NAME, "0").get();
// Iterate through the response and extract the relevant segment upload and download stats

// Existing validation logic
List<RemoteStoreStats> primaryStatsList = Arrays.stream(response.getRemoteStoreStats())
.filter(remoteStoreStats -> remoteStoreStats.getShardRouting().primary())
.collect(Collectors.toList());
.toList();
assertEquals(1, primaryStatsList.size());

List<RemoteStoreStats> replicaStatsList = Arrays.stream(response.getRemoteStoreStats())
.filter(remoteStoreStats -> !remoteStoreStats.getShardRouting().primary())
.collect(Collectors.toList());
.toList();
assertEquals(1, replicaStatsList.size());
RemoteSegmentTransferTracker.Stats primaryStats = primaryStatsList.get(0).getSegmentStats();
RemoteSegmentTransferTracker.Stats replicaStats = replicaStatsList.get(0).getSegmentStats();
// Assert Upload syncs - zero state uploads == download syncs

RemoteSegmentTransferTracker.Stats primaryStats = primaryStatsList.getFirst().getSegmentStats();
RemoteSegmentTransferTracker.Stats replicaStats = replicaStatsList.getFirst().getSegmentStats();

// Existing assertions
assertTrue(primaryStats.totalUploadsStarted > 0);
assertTrue(primaryStats.totalUploadsSucceeded > 0);
assertTrue(replicaStats.directoryFileTransferTrackerStats.transferredBytesStarted > 0);

assertTrue(
replicaStats.directoryFileTransferTrackerStats.transferredBytesStarted > 0
&& primaryStats.uploadBytesStarted
- zeroStatePrimaryStats.uploadBytesStarted >= replicaStats.directoryFileTransferTrackerStats.transferredBytesStarted
primaryStats.uploadBytesStarted - zeroStatePrimaryStats
.get().uploadBytesStarted >= replicaStats.directoryFileTransferTrackerStats.transferredBytesStarted
);

assertTrue(replicaStats.directoryFileTransferTrackerStats.transferredBytesSucceeded > 0);

assertTrue(
replicaStats.directoryFileTransferTrackerStats.transferredBytesSucceeded > 0
&& primaryStats.uploadBytesSucceeded
- zeroStatePrimaryStats.uploadBytesSucceeded >= replicaStats.directoryFileTransferTrackerStats.transferredBytesSucceeded
primaryStats.uploadBytesSucceeded - zeroStatePrimaryStats
.get().uploadBytesSucceeded >= replicaStats.directoryFileTransferTrackerStats.transferredBytesSucceeded
);

// Assert zero failures
assertEquals(0, primaryStats.uploadBytesFailed);
assertEquals(0, replicaStats.directoryFileTransferTrackerStats.transferredBytesFailed);
}, 60, TimeUnit.SECONDS);
});
}
}

Expand All @@ -361,76 +329,42 @@ public void testDownloadStatsCorrectnessSinglePrimaryMultipleReplicaShards() thr
// - Assert that download stats == upload stats
// - Repeat this step a random number of times (between 5 and 10)

// Create index
// Get number of data nodes
int dataNodeCount = client().admin().cluster().prepareHealth().get().getNumberOfDataNodes();
createIndex(
INDEX_NAME,
Settings.builder()
.put(remoteStoreIndexSettings(dataNodeCount - 1, 1))
.put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), -1)
.build()
);
ensureGreen(INDEX_NAME);

// Manually invoke a refresh
refresh(INDEX_NAME);

// Get zero state values
// Extract and assert zero state primary stats
RemoteStoreStatsResponse zeroStateResponse = client().admin().cluster().prepareRemoteStoreStats(INDEX_NAME, "0").get();
RemoteSegmentTransferTracker.Stats zeroStatePrimaryStats = Arrays.stream(zeroStateResponse.getRemoteStoreStats())
.filter(remoteStoreStats -> remoteStoreStats.getShardRouting().primary())
.collect(Collectors.toList())
.get(0)
.getSegmentStats();
logger.info(
"Zero state primary stats: {}ms refresh time lag, {}b bytes lag, {}b upload bytes started, {}b upload bytes failed , {} uploads succeeded, {} upload byes succeeded.",
zeroStatePrimaryStats.refreshTimeLagMs,
zeroStatePrimaryStats.bytesLag,
zeroStatePrimaryStats.uploadBytesStarted,
zeroStatePrimaryStats.uploadBytesFailed,
zeroStatePrimaryStats.totalUploadsSucceeded,
zeroStatePrimaryStats.uploadBytesSucceeded
);
assertTrue(
zeroStatePrimaryStats.totalUploadsStarted == zeroStatePrimaryStats.totalUploadsSucceeded
&& zeroStatePrimaryStats.totalUploadsSucceeded == 1
);
assertTrue(
zeroStatePrimaryStats.uploadBytesStarted == zeroStatePrimaryStats.uploadBytesSucceeded
&& zeroStatePrimaryStats.uploadBytesSucceeded > 0
);
assertTrue(zeroStatePrimaryStats.totalUploadsFailed == 0 && zeroStatePrimaryStats.uploadBytesFailed == 0);
// Prepare settings with multiple replicas
Settings.Builder settings = Settings.builder()
.put(remoteStoreIndexSettings(dataNodeCount - 1, 1))
.put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), -1);

// Extract and assert zero state replica stats
List<RemoteStoreStats> zeroStateReplicaStats = Arrays.stream(zeroStateResponse.getRemoteStoreStats())
.filter(remoteStoreStats -> !remoteStoreStats.getShardRouting().primary())
.collect(Collectors.toList());
zeroStateReplicaStats.forEach(stats -> {
assertTrue(
stats.getSegmentStats().directoryFileTransferTrackerStats.transferredBytesStarted == 0
&& stats.getSegmentStats().directoryFileTransferTrackerStats.transferredBytesSucceeded == 0
);
});
// Retrieve zero state stats
SetOnce<RemoteSegmentTransferTracker.Stats> zeroStatePrimaryStats = prepareZeroStateStats(settings, true);

// Get current nodes in cluster
int currentNodesInCluster = client().admin().cluster().prepareHealth().get().getNumberOfDataNodes();

// Iteration logic
for (int i = 0; i < randomIntBetween(5, 10); i++) {
indexSingleDoc(INDEX_NAME);
// Running Flush & Refresh manually
flushAndRefresh(INDEX_NAME);
ensureGreen(INDEX_NAME);
waitForReplication();

assertBusy(() -> {
RemoteStoreStatsResponse response = client().admin().cluster().prepareRemoteStoreStats(INDEX_NAME, "0").get();

// Validate total and successful shards
assertEquals(currentNodesInCluster, response.getSuccessfulShards());
long uploadsStarted = 0, uploadsSucceeded = 0, uploadsFailed = 0;
long uploadBytesStarted = 0, uploadBytesSucceeded = 0, uploadBytesFailed = 0;
List<Long> downloadBytesStarted = new ArrayList<>(), downloadBytesSucceeded = new ArrayList<>(), downloadBytesFailed =
new ArrayList<>();

// Assert that stats for primary shard and replica shard set are equal
for (RemoteStoreStats eachStatsObject : response.getRemoteStoreStats()) {
RemoteSegmentTransferTracker.Stats stats = eachStatsObject.getSegmentStats();
if (eachStatsObject.getShardRouting().primary()) {
long uploadBytesStarted = 0, uploadBytesSucceeded = 0, uploadBytesFailed = 0;
List<Long> downloadBytesStarted = new ArrayList<>();
List<Long> downloadBytesSucceeded = new ArrayList<>();
List<Long> downloadBytesFailed = new ArrayList<>();

// Collect stats for primary and replica shards
for (RemoteStoreStats statsObject : response.getRemoteStoreStats()) {
RemoteSegmentTransferTracker.Stats stats = statsObject.getSegmentStats();
if (statsObject.getShardRouting().primary()) {
uploadBytesStarted = stats.uploadBytesStarted;
uploadBytesSucceeded = stats.uploadBytesSucceeded;
uploadBytesFailed = stats.uploadBytesFailed;
Expand All @@ -441,17 +375,78 @@ public void testDownloadStatsCorrectnessSinglePrimaryMultipleReplicaShards() thr
}
}

assertEquals(0, uploadsFailed);
// Assertions
assertEquals(0, uploadBytesFailed);
for (int j = 0; j < response.getSuccessfulShards() - 1; j++) {
assertTrue(uploadBytesStarted - zeroStatePrimaryStats.uploadBytesStarted > downloadBytesStarted.get(j));
assertTrue(uploadBytesSucceeded - zeroStatePrimaryStats.uploadBytesSucceeded > downloadBytesSucceeded.get(j));
assertTrue(uploadBytesStarted - zeroStatePrimaryStats.get().uploadBytesStarted > downloadBytesStarted.get(j));
assertTrue(uploadBytesSucceeded - zeroStatePrimaryStats.get().uploadBytesSucceeded > downloadBytesSucceeded.get(j));
assertEquals(0, (long) downloadBytesFailed.get(j));
}
}, 60, TimeUnit.SECONDS);
});
}
}

/**
 * Logs the given primary-shard stats and asserts that they describe a clean "zero state":
 * every upload that was started has succeeded, at least one upload with a non-zero byte
 * count has happened, and nothing has failed.
 *
 * @param primaryStats segment transfer stats taken from the primary shard
 */
private void validateZeroStatePrimaryStats(RemoteSegmentTransferTracker.Stats primaryStats) {
    logger.info("Zero state primary stats: {}", primaryStats);

    final long uploadsSucceeded = primaryStats.totalUploadsSucceeded;
    final long bytesSucceeded = primaryStats.uploadBytesSucceeded;

    // Upload counts: started and succeeded must match, and at least one upload must have completed.
    assertEquals(primaryStats.totalUploadsStarted, uploadsSucceeded);
    assertTrue(uploadsSucceeded >= 1);

    // Upload bytes: started and succeeded must match, and some bytes must have been uploaded.
    assertEquals(primaryStats.uploadBytesStarted, bytesSucceeded);
    assertTrue(bytesSucceeded > 0);

    // No failures, neither by count nor by bytes.
    assertEquals(0, primaryStats.totalUploadsFailed);
    assertEquals(0, primaryStats.uploadBytesFailed);
}

/**
 * Asserts that replica shards in the given stats response have not started or completed
 * any segment downloads yet.
 *
 * <p>When {@code multipleShardsExpected} is {@code true}, every replica entry in the response
 * is checked. Otherwise only the first replica entry is checked, and the response must contain
 * at least one replica entry.
 *
 * @param zeroStateResponse      remote store stats response captured before any document is indexed
 * @param multipleShardsExpected whether all replica entries should be validated, or only the first
 */
private void validateZeroStateReplicaStats(RemoteStoreStatsResponse zeroStateResponse, boolean multipleShardsExpected) {
    List<RemoteStoreStats> zeroStateReplicaStats = Arrays.stream(zeroStateResponse.getRemoteStoreStats())
        .filter(remoteStoreStats -> !remoteStoreStats.getShardRouting().primary())
        .toList();

    List<RemoteStoreStats> replicasToCheck = zeroStateReplicaStats;
    if (multipleShardsExpected == false) {
        // Fail with an AssertionError rather than a NoSuchElementException when the replica entry is
        // missing: this helper is invoked from inside assertBusy, which is expected to retry on
        // assertion failures only (see OpenSearchTestCase#assertBusy).
        assertFalse("Expected at least one replica entry in the remote store stats response", zeroStateReplicaStats.isEmpty());
        replicasToCheck = zeroStateReplicaStats.subList(0, 1);
    }

    for (RemoteStoreStats stats : replicasToCheck) {
        RemoteSegmentTransferTracker.Stats replicaStats = stats.getSegmentStats();
        assertEquals(0, replicaStats.directoryFileTransferTrackerStats.transferredBytesStarted);
        assertEquals(0, replicaStats.directoryFileTransferTrackerStats.transferredBytesSucceeded);
    }
}

/**
 * Common test setup: creates {@code INDEX_NAME} with the supplied settings, waits for it to go
 * green, triggers a refresh, and then polls the remote store stats for shard "0" until the
 * zero-state assertions for the primary and replica shards hold.
 *
 * @param additionalSettings     settings builder used to create the index
 * @param multipleShardsExpected forwarded to {@code validateZeroStateReplicaStats}; whether all
 *                               replica entries should be validated, or only the first
 * @return a {@link SetOnce} holding the primary shard stats that passed the zero-state validation
 * @throws Exception if index creation or the busy-wait on the stats fails
 */
private SetOnce<RemoteSegmentTransferTracker.Stats> prepareZeroStateStats(
    Settings.Builder additionalSettings,
    boolean multipleShardsExpected
) throws Exception {
    SetOnce<RemoteSegmentTransferTracker.Stats> zeroStatePrimaryStats = new SetOnce<>();

    // Create index with specified settings
    createIndex(INDEX_NAME, additionalSettings.build());
    ensureGreen(INDEX_NAME);

    // Manually invoke a refresh
    refresh(INDEX_NAME);

    assertBusy(() -> {
        RemoteStoreStatsResponse zeroStateResponse = client().admin().cluster().prepareRemoteStoreStats(INDEX_NAME, "0").get();

        List<RemoteStoreStats> primaryStatsList = Arrays.stream(zeroStateResponse.getRemoteStoreStats())
            .filter(remoteStoreStats -> remoteStoreStats.getShardRouting().primary())
            .toList();
        // Surface a missing primary entry as an AssertionError so that assertBusy can retry it,
        // instead of letting a NoSuchElementException escape (see OpenSearchTestCase#assertBusy).
        assertFalse("Expected a primary entry in the remote store stats response", primaryStatsList.isEmpty());
        RemoteSegmentTransferTracker.Stats primaryStats = primaryStatsList.get(0).getSegmentStats();

        validateZeroStatePrimaryStats(primaryStats);
        validateZeroStateReplicaStats(zeroStateResponse, multipleShardsExpected);

        // Reached only once every assertion above has passed, so the value is set exactly once.
        zeroStatePrimaryStats.set(primaryStats);
    });

    return zeroStatePrimaryStats;
}

public void testStatsOnShardRelocation() {
setup();
// Scenario:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -580,5 +580,53 @@
directoryFileTransferTrackerStats
);
}

/**
 * Builds a human-readable representation of these stats.
 *
 * @return a string of the form {@code Stats{shardId=..., ..., directoryFileTransferTrackerStats=...}}
 *         with each field appended as {@code name=value} in the order written below
 */
@Override
public String toString() {
return "Stats{"
// NOTE(review): the lines between here and the next `+` are not Java; they look like a
// code-coverage annotation carried over from the page this text was copied from. Confirm and remove.

Check warning on line 586 in server/src/main/java/org/opensearch/index/remote/RemoteSegmentTransferTracker.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/remote/RemoteSegmentTransferTracker.java#L586

Added line #L586 was not covered by tests
+ "shardId="
+ shardId
+ ", localRefreshClockTimeMs="
+ localRefreshClockTimeMs
+ ", remoteRefreshClockTimeMs="
+ remoteRefreshClockTimeMs
+ ", refreshTimeLagMs="
+ refreshTimeLagMs
+ ", localRefreshNumber="
+ localRefreshNumber
+ ", remoteRefreshNumber="
+ remoteRefreshNumber
+ ", uploadBytesStarted="
+ uploadBytesStarted
+ ", uploadBytesFailed="
+ uploadBytesFailed
+ ", uploadBytesSucceeded="
+ uploadBytesSucceeded
+ ", totalUploadsStarted="
+ totalUploadsStarted
+ ", totalUploadsFailed="
+ totalUploadsFailed
+ ", totalUploadsSucceeded="
+ totalUploadsSucceeded
+ ", rejectionCount="
+ rejectionCount
+ ", consecutiveFailuresCount="
+ consecutiveFailuresCount
+ ", lastSuccessfulRemoteRefreshBytes="
+ lastSuccessfulRemoteRefreshBytes
+ ", uploadBytesMovingAverage="
+ uploadBytesMovingAverage
+ ", uploadBytesPerSecMovingAverage="
+ uploadBytesPerSecMovingAverage
+ ", totalUploadTimeInMs="
+ totalUploadTimeInMs
+ ", uploadTimeMovingAverage="
+ uploadTimeMovingAverage
+ ", bytesLag="
+ bytesLag
+ ", directoryFileTransferTrackerStats="
+ directoryFileTransferTrackerStats
+ '}';
}
}
}
Loading