2121import org .opensearch .cluster .routing .ShardRouting ;
2222import org .opensearch .cluster .routing .ShardRoutingState ;
2323import org .opensearch .cluster .routing .allocation .command .MoveAllocationCommand ;
24+ import org .opensearch .common .SetOnce ;
2425import org .opensearch .common .settings .Settings ;
2526import org .opensearch .index .IndexSettings ;
2627import org .opensearch .index .remote .RemoteSegmentTransferTracker ;
@@ -269,92 +270,59 @@ public void testDownloadStatsCorrectnessSinglePrimarySingleReplica() throws Exce
269270 // - Assert that download stats == upload stats
270271 // - Repeat this step for random times (between 5 and 10)
271272
272- // Create index with 1 pri and 1 replica and refresh interval disabled
273- createIndex (
274- INDEX_NAME ,
275- Settings .builder ().put (remoteStoreIndexSettings (1 , 1 )).put (IndexSettings .INDEX_REFRESH_INTERVAL_SETTING .getKey (), -1 ).build ()
276- );
277- ensureGreen (INDEX_NAME );
273+ // Prepare settings with single replica
274+ Settings .Builder settings = Settings .builder ()
275+ .put (remoteStoreIndexSettings (1 , 1 ))
276+ .put (IndexSettings .INDEX_REFRESH_INTERVAL_SETTING .getKey (), -1 );
278277
279- // Manually invoke a refresh
280- refresh ( INDEX_NAME );
278+ // Retrieve zero state stats
279+ SetOnce < RemoteSegmentTransferTracker . Stats > zeroStatePrimaryStats = prepareZeroStateStats ( settings , false );
281280
282- // Get zero state values
283- // Extract and assert zero state primary stats
284- RemoteStoreStatsResponse zeroStateResponse = client ().admin ().cluster ().prepareRemoteStoreStats (INDEX_NAME , "0" ).get ();
285- RemoteSegmentTransferTracker .Stats zeroStatePrimaryStats = Arrays .stream (zeroStateResponse .getRemoteStoreStats ())
286- .filter (remoteStoreStats -> remoteStoreStats .getShardRouting ().primary ())
287- .collect (Collectors .toList ())
288- .get (0 )
289- .getSegmentStats ();
290- logger .info (
291- "Zero state primary stats: {}ms refresh time lag, {}b bytes lag, {}b upload bytes started, {}b upload bytes failed , {} uploads succeeded, {} upload byes succeeded." ,
292- zeroStatePrimaryStats .refreshTimeLagMs ,
293- zeroStatePrimaryStats .bytesLag ,
294- zeroStatePrimaryStats .uploadBytesStarted ,
295- zeroStatePrimaryStats .uploadBytesFailed ,
296- zeroStatePrimaryStats .totalUploadsSucceeded ,
297- zeroStatePrimaryStats .uploadBytesSucceeded
298- );
299- assertTrue (
300- zeroStatePrimaryStats .totalUploadsStarted == zeroStatePrimaryStats .totalUploadsSucceeded
301- && zeroStatePrimaryStats .totalUploadsSucceeded == 1
302- );
303- assertTrue (
304- zeroStatePrimaryStats .uploadBytesStarted == zeroStatePrimaryStats .uploadBytesSucceeded
305- && zeroStatePrimaryStats .uploadBytesSucceeded > 0
306- );
307- assertTrue (zeroStatePrimaryStats .totalUploadsFailed == 0 && zeroStatePrimaryStats .uploadBytesFailed == 0 );
308-
309- // Extract and assert zero state replica stats
310- RemoteSegmentTransferTracker .Stats zeroStateReplicaStats = Arrays .stream (zeroStateResponse .getRemoteStoreStats ())
311- .filter (remoteStoreStats -> !remoteStoreStats .getShardRouting ().primary ())
312- .collect (Collectors .toList ())
313- .get (0 )
314- .getSegmentStats ();
315- assertTrue (
316- zeroStateReplicaStats .directoryFileTransferTrackerStats .transferredBytesStarted == 0
317- && zeroStateReplicaStats .directoryFileTransferTrackerStats .transferredBytesSucceeded == 0
318- );
319-
320- // Index documents
281+ // Iteration logic
321282 for (int i = 1 ; i <= randomIntBetween (5 , 10 ); i ++) {
322283 indexSingleDoc (INDEX_NAME );
323- // Running Flush & Refresh manually
324284 flushAndRefresh (INDEX_NAME );
325285 ensureGreen (INDEX_NAME );
286+ waitForReplication ();
326287
327- // Poll for RemoteStore Stats
328288 assertBusy (() -> {
329289 RemoteStoreStatsResponse response = client ().admin ().cluster ().prepareRemoteStoreStats (INDEX_NAME , "0" ).get ();
330- // Iterate through the response and extract the relevant segment upload and download stats
290+
291+ // Existing validation logic
331292 List <RemoteStoreStats > primaryStatsList = Arrays .stream (response .getRemoteStoreStats ())
332293 .filter (remoteStoreStats -> remoteStoreStats .getShardRouting ().primary ())
333294 .collect (Collectors .toList ());
334295 assertEquals (1 , primaryStatsList .size ());
296+
335297 List <RemoteStoreStats > replicaStatsList = Arrays .stream (response .getRemoteStoreStats ())
336298 .filter (remoteStoreStats -> !remoteStoreStats .getShardRouting ().primary ())
337299 .collect (Collectors .toList ());
338300 assertEquals (1 , replicaStatsList .size ());
301+
339302 RemoteSegmentTransferTracker .Stats primaryStats = primaryStatsList .get (0 ).getSegmentStats ();
340303 RemoteSegmentTransferTracker .Stats replicaStats = replicaStatsList .get (0 ).getSegmentStats ();
341- // Assert Upload syncs - zero state uploads == download syncs
304+
305+ // Existing assertions
342306 assertTrue (primaryStats .totalUploadsStarted > 0 );
343307 assertTrue (primaryStats .totalUploadsSucceeded > 0 );
308+ assertTrue (replicaStats .directoryFileTransferTrackerStats .transferredBytesStarted > 0 );
309+
344310 assertTrue (
345- replicaStats .directoryFileTransferTrackerStats .transferredBytesStarted > 0
346- && primaryStats .uploadBytesStarted
347- - zeroStatePrimaryStats .uploadBytesStarted >= replicaStats .directoryFileTransferTrackerStats .transferredBytesStarted
311+ primaryStats .uploadBytesStarted - zeroStatePrimaryStats
312+ .get ().uploadBytesStarted >= replicaStats .directoryFileTransferTrackerStats .transferredBytesStarted
348313 );
314+
315+ assertTrue (replicaStats .directoryFileTransferTrackerStats .transferredBytesSucceeded > 0 );
316+
349317 assertTrue (
350- replicaStats .directoryFileTransferTrackerStats .transferredBytesSucceeded > 0
351- && primaryStats .uploadBytesSucceeded
352- - zeroStatePrimaryStats .uploadBytesSucceeded >= replicaStats .directoryFileTransferTrackerStats .transferredBytesSucceeded
318+ primaryStats .uploadBytesSucceeded - zeroStatePrimaryStats
319+ .get ().uploadBytesSucceeded >= replicaStats .directoryFileTransferTrackerStats .transferredBytesSucceeded
353320 );
321+
354322 // Assert zero failures
355323 assertEquals (0 , primaryStats .uploadBytesFailed );
356324 assertEquals (0 , replicaStats .directoryFileTransferTrackerStats .transferredBytesFailed );
357- }, 60 , TimeUnit . SECONDS );
325+ });
358326 }
359327 }
360328
@@ -369,76 +337,42 @@ public void testDownloadStatsCorrectnessSinglePrimaryMultipleReplicaShards() thr
369337 // - Assert that download stats == upload stats
370338 // - Repeat this step for random times (between 5 and 10)
371339
372- // Create index
340+ // Get number of data nodes
373341 int dataNodeCount = client ().admin ().cluster ().prepareHealth ().get ().getNumberOfDataNodes ();
374- createIndex (
375- INDEX_NAME ,
376- Settings .builder ()
377- .put (remoteStoreIndexSettings (dataNodeCount - 1 , 1 ))
378- .put (IndexSettings .INDEX_REFRESH_INTERVAL_SETTING .getKey (), -1 )
379- .build ()
380- );
381- ensureGreen (INDEX_NAME );
382342
383- // Manually invoke a refresh
384- refresh (INDEX_NAME );
343+ // Prepare settings with multiple replicas
344+ Settings .Builder settings = Settings .builder ()
345+ .put (remoteStoreIndexSettings (dataNodeCount - 1 , 1 ))
346+ .put (IndexSettings .INDEX_REFRESH_INTERVAL_SETTING .getKey (), -1 );
385347
386- // Get zero state values
387- // Extract and assert zero state primary stats
388- RemoteStoreStatsResponse zeroStateResponse = client ().admin ().cluster ().prepareRemoteStoreStats (INDEX_NAME , "0" ).get ();
389- RemoteSegmentTransferTracker .Stats zeroStatePrimaryStats = Arrays .stream (zeroStateResponse .getRemoteStoreStats ())
390- .filter (remoteStoreStats -> remoteStoreStats .getShardRouting ().primary ())
391- .collect (Collectors .toList ())
392- .get (0 )
393- .getSegmentStats ();
394- logger .info (
395- "Zero state primary stats: {}ms refresh time lag, {}b bytes lag, {}b upload bytes started, {}b upload bytes failed , {} uploads succeeded, {} upload byes succeeded." ,
396- zeroStatePrimaryStats .refreshTimeLagMs ,
397- zeroStatePrimaryStats .bytesLag ,
398- zeroStatePrimaryStats .uploadBytesStarted ,
399- zeroStatePrimaryStats .uploadBytesFailed ,
400- zeroStatePrimaryStats .totalUploadsSucceeded ,
401- zeroStatePrimaryStats .uploadBytesSucceeded
402- );
403- assertTrue (
404- zeroStatePrimaryStats .totalUploadsStarted == zeroStatePrimaryStats .totalUploadsSucceeded
405- && zeroStatePrimaryStats .totalUploadsSucceeded == 1
406- );
407- assertTrue (
408- zeroStatePrimaryStats .uploadBytesStarted == zeroStatePrimaryStats .uploadBytesSucceeded
409- && zeroStatePrimaryStats .uploadBytesSucceeded > 0
410- );
411- assertTrue (zeroStatePrimaryStats .totalUploadsFailed == 0 && zeroStatePrimaryStats .uploadBytesFailed == 0 );
412-
413- // Extract and assert zero state replica stats
414- List <RemoteStoreStats > zeroStateReplicaStats = Arrays .stream (zeroStateResponse .getRemoteStoreStats ())
415- .filter (remoteStoreStats -> !remoteStoreStats .getShardRouting ().primary ())
416- .collect (Collectors .toList ());
417- zeroStateReplicaStats .forEach (stats -> {
418- assertTrue (
419- stats .getSegmentStats ().directoryFileTransferTrackerStats .transferredBytesStarted == 0
420- && stats .getSegmentStats ().directoryFileTransferTrackerStats .transferredBytesSucceeded == 0
421- );
422- });
348+ // Retrieve zero state stats
349+ SetOnce <RemoteSegmentTransferTracker .Stats > zeroStatePrimaryStats = prepareZeroStateStats (settings , true );
423350
351+ // Get current nodes in cluster
424352 int currentNodesInCluster = client ().admin ().cluster ().prepareHealth ().get ().getNumberOfDataNodes ();
353+
354+ // Iteration logic
425355 for (int i = 0 ; i < randomIntBetween (5 , 10 ); i ++) {
426356 indexSingleDoc (INDEX_NAME );
427- // Running Flush & Refresh manually
428357 flushAndRefresh (INDEX_NAME );
358+ ensureGreen (INDEX_NAME );
359+ waitForReplication ();
429360
430361 assertBusy (() -> {
431362 RemoteStoreStatsResponse response = client ().admin ().cluster ().prepareRemoteStoreStats (INDEX_NAME , "0" ).get ();
363+
364+ // Validate total and successful shards
432365 assertEquals (currentNodesInCluster , response .getSuccessfulShards ());
433- long uploadsStarted = 0 , uploadsSucceeded = 0 , uploadsFailed = 0 ;
434- long uploadBytesStarted = 0 , uploadBytesSucceeded = 0 , uploadBytesFailed = 0 ;
435- List <Long > downloadBytesStarted = new ArrayList <>(), downloadBytesSucceeded = new ArrayList <>(), downloadBytesFailed =
436- new ArrayList <>();
437366
438- // Assert that stats for primary shard and replica shard set are equal
439- for (RemoteStoreStats eachStatsObject : response .getRemoteStoreStats ()) {
440- RemoteSegmentTransferTracker .Stats stats = eachStatsObject .getSegmentStats ();
441- if (eachStatsObject .getShardRouting ().primary ()) {
367+ long uploadBytesStarted = 0 , uploadBytesSucceeded = 0 , uploadBytesFailed = 0 ;
368+ List <Long > downloadBytesStarted = new ArrayList <>();
369+ List <Long > downloadBytesSucceeded = new ArrayList <>();
370+ List <Long > downloadBytesFailed = new ArrayList <>();
371+
372+ // Collect stats for primary and replica shards
373+ for (RemoteStoreStats statsObject : response .getRemoteStoreStats ()) {
374+ RemoteSegmentTransferTracker .Stats stats = statsObject .getSegmentStats ();
375+ if (statsObject .getShardRouting ().primary ()) {
442376 uploadBytesStarted = stats .uploadBytesStarted ;
443377 uploadBytesSucceeded = stats .uploadBytesSucceeded ;
444378 uploadBytesFailed = stats .uploadBytesFailed ;
@@ -449,17 +383,78 @@ public void testDownloadStatsCorrectnessSinglePrimaryMultipleReplicaShards() thr
449383 }
450384 }
451385
452- assertEquals ( 0 , uploadsFailed );
386+ // Assertions
453387 assertEquals (0 , uploadBytesFailed );
454388 for (int j = 0 ; j < response .getSuccessfulShards () - 1 ; j ++) {
455- assertTrue (uploadBytesStarted - zeroStatePrimaryStats .uploadBytesStarted > downloadBytesStarted .get (j ));
456- assertTrue (uploadBytesSucceeded - zeroStatePrimaryStats .uploadBytesSucceeded > downloadBytesSucceeded .get (j ));
389+ assertTrue (uploadBytesStarted - zeroStatePrimaryStats .get (). uploadBytesStarted > downloadBytesStarted .get (j ));
390+ assertTrue (uploadBytesSucceeded - zeroStatePrimaryStats .get (). uploadBytesSucceeded > downloadBytesSucceeded .get (j ));
457391 assertEquals (0 , (long ) downloadBytesFailed .get (j ));
458392 }
459- }, 60 , TimeUnit .SECONDS );
393+ });
394+ }
395+ }
396+
397+ // New helper method to validate zero state primary stats
398+ private void validateZeroStatePrimaryStats (RemoteSegmentTransferTracker .Stats primaryStats ) {
399+ logger .info ("Zero state primary stats: {}" , primaryStats );
400+ assertEquals (primaryStats .totalUploadsStarted , primaryStats .totalUploadsSucceeded );
401+ assertTrue (primaryStats .totalUploadsSucceeded >= 1 );
402+ assertEquals (primaryStats .uploadBytesStarted , primaryStats .uploadBytesSucceeded );
403+ assertTrue (primaryStats .uploadBytesSucceeded > 0 );
404+ assertEquals (0 , primaryStats .totalUploadsFailed );
405+ assertEquals (0 , primaryStats .uploadBytesFailed );
406+ }
407+
408+ // helper method to validate zero state replica stats
409+ private void validateZeroStateReplicaStats (RemoteStoreStatsResponse zeroStateResponse , boolean multipleShardsExpected ) {
410+ List <RemoteStoreStats > zeroStateReplicaStats = Arrays .stream (zeroStateResponse .getRemoteStoreStats ())
411+ .filter (remoteStoreStats -> !remoteStoreStats .getShardRouting ().primary ())
412+ .collect (Collectors .toList ());
413+
414+ if (multipleShardsExpected ) {
415+ zeroStateReplicaStats .forEach (stats -> {
416+ assertEquals (0 , stats .getSegmentStats ().directoryFileTransferTrackerStats .transferredBytesStarted );
417+ assertEquals (0 , stats .getSegmentStats ().directoryFileTransferTrackerStats .transferredBytesSucceeded );
418+ });
419+ } else {
420+ RemoteSegmentTransferTracker .Stats replicaStats = zeroStateReplicaStats .get (0 ).getSegmentStats ();
421+ assertEquals (0 , replicaStats .directoryFileTransferTrackerStats .transferredBytesStarted );
422+ assertEquals (0 , replicaStats .directoryFileTransferTrackerStats .transferredBytesSucceeded );
460423 }
461424 }
462425
426+ // New helper method for common test setup and zero state stats retrieval
427+ private SetOnce <RemoteSegmentTransferTracker .Stats > prepareZeroStateStats (
428+ Settings .Builder additionalSettings ,
429+ boolean multipleShardsExpected
430+ ) throws Exception {
431+ SetOnce <RemoteSegmentTransferTracker .Stats > zeroStatePrimaryStats = new SetOnce <>();
432+
433+ // Create index with specified settings
434+ createIndex (INDEX_NAME , additionalSettings .build ());
435+ ensureGreen (INDEX_NAME );
436+
437+ // Manually invoke a refresh
438+ refresh (INDEX_NAME );
439+
440+ assertBusy (() -> {
441+ RemoteStoreStatsResponse zeroStateResponse = client ().admin ().cluster ().prepareRemoteStoreStats (INDEX_NAME , "0" ).get ();
442+
443+ RemoteSegmentTransferTracker .Stats primaryStats = Arrays .stream (zeroStateResponse .getRemoteStoreStats ())
444+ .filter (remoteStoreStats -> remoteStoreStats .getShardRouting ().primary ())
445+ .collect (Collectors .toList ())
446+ .get (0 )
447+ .getSegmentStats ();
448+
449+ validateZeroStatePrimaryStats (primaryStats );
450+ validateZeroStateReplicaStats (zeroStateResponse , multipleShardsExpected );
451+
452+ zeroStatePrimaryStats .set (primaryStats );
453+ });
454+
455+ return zeroStatePrimaryStats ;
456+ }
457+
463458 public void testStatsOnShardRelocation () {
464459 setup ();
465460 // Scenario:
0 commit comments