1111import org .apache .logging .log4j .LogManager ;
1212import org .apache .logging .log4j .Logger ;
1313import org .apache .logging .log4j .message .ParameterizedMessage ;
14- import org .apache .lucene .index .CorruptIndexException ;
1514import org .opensearch .ExceptionsHelper ;
16- import org .opensearch .OpenSearchCorruptionException ;
1715import org .opensearch .action .support .ChannelActionListener ;
1816import org .opensearch .cluster .ClusterChangedEvent ;
1917import org .opensearch .cluster .ClusterStateListener ;
2422import org .opensearch .common .lifecycle .AbstractLifecycleComponent ;
2523import org .opensearch .common .settings .Settings ;
2624import org .opensearch .common .util .CancellableThreads ;
27- import org .opensearch .common .util .concurrent .AbstractRunnable ;
2825import org .opensearch .common .util .concurrent .ConcurrentCollections ;
2926import org .opensearch .core .action .ActionListener ;
3027import org .opensearch .core .index .shard .ShardId ;
3330import org .opensearch .index .shard .IndexEventListener ;
3431import org .opensearch .index .shard .IndexShard ;
3532import org .opensearch .index .shard .IndexShardState ;
36- import org .opensearch .index .store .Store ;
3733import org .opensearch .indices .IndicesService ;
3834import org .opensearch .indices .recovery .FileChunkRequest ;
3935import org .opensearch .indices .recovery .ForceSyncRequest ;
6157import static org .opensearch .indices .replication .SegmentReplicationSourceService .Actions .UPDATE_VISIBLE_CHECKPOINT ;
6258
6359/**
64- * Service class that orchestrates replication events on replicas.
60+ * Service class that handles incoming checkpoints to initiate replication events on replicas.
6561 *
6662 * @opensearch.internal
6763 */
@@ -72,17 +68,14 @@ public class SegmentReplicationTargetService extends AbstractLifecycleComponent
7268 private final ThreadPool threadPool ;
7369 private final RecoverySettings recoverySettings ;
7470
75- private final ReplicationCollection <SegmentReplicationTarget > onGoingReplications ;
76-
77- private final Map <ShardId , SegmentReplicationState > completedReplications = ConcurrentCollections .newConcurrentMap ();
78-
7971 private final SegmentReplicationSourceFactory sourceFactory ;
8072
8173 protected final Map <ShardId , ReplicationCheckpoint > latestReceivedCheckpoint = ConcurrentCollections .newConcurrentMap ();
8274
8375 private final IndicesService indicesService ;
8476 private final ClusterService clusterService ;
8577 private final TransportService transportService ;
78+ private final SegmentReplicator replicator ;
8679
8780 /**
8881 * The internal actions
@@ -94,6 +87,7 @@ public static class Actions {
9487 public static final String FORCE_SYNC = "internal:index/shard/replication/segments_sync" ;
9588 }
9689
90+ @ Deprecated
9791 public SegmentReplicationTargetService (
9892 final ThreadPool threadPool ,
9993 final RecoverySettings recoverySettings ,
@@ -113,6 +107,7 @@ public SegmentReplicationTargetService(
113107 );
114108 }
115109
110+ @ Deprecated
116111 public SegmentReplicationTargetService (
117112 final ThreadPool threadPool ,
118113 final RecoverySettings recoverySettings ,
@@ -121,14 +116,34 @@ public SegmentReplicationTargetService(
121116 final IndicesService indicesService ,
122117 final ClusterService clusterService ,
123118 final ReplicationCollection <SegmentReplicationTarget > ongoingSegmentReplications
119+ ) {
120+ this (
121+ threadPool ,
122+ recoverySettings ,
123+ transportService ,
124+ sourceFactory ,
125+ indicesService ,
126+ clusterService ,
127+ new SegmentReplicator (threadPool )
128+ );
129+ }
130+
131+ public SegmentReplicationTargetService (
132+ final ThreadPool threadPool ,
133+ final RecoverySettings recoverySettings ,
134+ final TransportService transportService ,
135+ final SegmentReplicationSourceFactory sourceFactory ,
136+ final IndicesService indicesService ,
137+ final ClusterService clusterService ,
138+ final SegmentReplicator replicator
124139 ) {
125140 this .threadPool = threadPool ;
126141 this .recoverySettings = recoverySettings ;
127- this .onGoingReplications = ongoingSegmentReplications ;
128142 this .sourceFactory = sourceFactory ;
129143 this .indicesService = indicesService ;
130144 this .clusterService = clusterService ;
131145 this .transportService = transportService ;
146+ this .replicator = replicator ;
132147
133148 transportService .registerRequestHandler (
134149 Actions .FILE_CHUNK ,
@@ -154,7 +169,7 @@ protected void doStart() {
154169 @ Override
155170 protected void doStop () {
156171 if (DiscoveryNode .isDataNode (clusterService .getSettings ())) {
157- assert onGoingReplications .size () == 0 : "Replication collection should be empty on shutdown" ;
172+ assert replicator .size () == 0 : "Replication collection should be empty on shutdown" ;
158173 clusterService .removeListener (this );
159174 }
160175 }
@@ -199,7 +214,7 @@ public void clusterChanged(ClusterChangedEvent event) {
199214 @ Override
200215 public void beforeIndexShardClosed (ShardId shardId , @ Nullable IndexShard indexShard , Settings indexSettings ) {
201216 if (indexShard != null && indexShard .indexSettings ().isSegRepEnabledOrRemoteNode ()) {
202- onGoingReplications . cancelForShard (indexShard .shardId (), "Shard closing" );
217+ replicator . cancel (indexShard .shardId (), "Shard closing" );
203218 latestReceivedCheckpoint .remove (shardId );
204219 }
205220 }
@@ -224,7 +239,7 @@ public void shardRoutingChanged(IndexShard indexShard, @Nullable ShardRouting ol
224239 && indexShard .indexSettings ().isSegRepEnabledOrRemoteNode ()
225240 && oldRouting .primary () == false
226241 && newRouting .primary ()) {
227- onGoingReplications . cancelForShard (indexShard .shardId (), "Shard has been promoted to primary" );
242+ replicator . cancel (indexShard .shardId (), "Shard has been promoted to primary" );
228243 latestReceivedCheckpoint .remove (indexShard .shardId ());
229244 }
230245 }
@@ -234,17 +249,15 @@ public void shardRoutingChanged(IndexShard indexShard, @Nullable ShardRouting ol
234249 */
235250 @ Nullable
236251 public SegmentReplicationState getOngoingEventSegmentReplicationState (ShardId shardId ) {
237- return Optional .ofNullable (onGoingReplications .getOngoingReplicationTarget (shardId ))
238- .map (SegmentReplicationTarget ::state )
239- .orElse (null );
252+ return Optional .ofNullable (replicator .get (shardId )).map (SegmentReplicationTarget ::state ).orElse (null );
240253 }
241254
242255 /**
243256 * returns SegmentReplicationState of latest completed segment replication events.
244257 */
245258 @ Nullable
246259 public SegmentReplicationState getlatestCompletedEventSegmentReplicationState (ShardId shardId ) {
247- return completedReplications . get (shardId );
260+ return replicator . getCompleted (shardId );
248261 }
249262
250263 /**
@@ -257,11 +270,11 @@ public SegmentReplicationState getSegmentReplicationState(ShardId shardId) {
257270 }
258271
259272 public ReplicationRef <SegmentReplicationTarget > get (long replicationId ) {
260- return onGoingReplications .get (replicationId );
273+ return replicator .get (replicationId );
261274 }
262275
263276 public SegmentReplicationTarget get (ShardId shardId ) {
264- return onGoingReplications . getOngoingReplicationTarget (shardId );
277+ return replicator . get (shardId );
265278 }
266279
267280 /**
@@ -285,7 +298,7 @@ public synchronized void onNewCheckpoint(final ReplicationCheckpoint receivedChe
285298 // checkpoint to be replayed once the shard is Active.
286299 if (replicaShard .state ().equals (IndexShardState .STARTED ) == true ) {
287300 // Checks if received checkpoint is already present and ahead then it replaces old received checkpoint
288- SegmentReplicationTarget ongoingReplicationTarget = onGoingReplications . getOngoingReplicationTarget (replicaShard .shardId ());
301+ SegmentReplicationTarget ongoingReplicationTarget = replicator . get (replicaShard .shardId ());
289302 if (ongoingReplicationTarget != null ) {
290303 if (ongoingReplicationTarget .getCheckpoint ().getPrimaryTerm () < receivedCheckpoint .getPrimaryTerm ()) {
291304 logger .debug (
@@ -504,28 +517,12 @@ public SegmentReplicationTarget startReplication(
504517 final ReplicationCheckpoint checkpoint ,
505518 final SegmentReplicationListener listener
506519 ) {
507- final SegmentReplicationTarget target = new SegmentReplicationTarget (
508- indexShard ,
509- checkpoint ,
510- sourceFactory .get (indexShard ),
511- listener
512- );
513- startReplication (target );
514- return target ;
520+ return replicator .startReplication (indexShard , checkpoint , sourceFactory .get (indexShard ), listener );
515521 }
516522
517523 // pkg-private for integration tests
518524 void startReplication (final SegmentReplicationTarget target ) {
519- final long replicationId ;
520- try {
521- replicationId = onGoingReplications .startSafe (target , recoverySettings .activityTimeout ());
522- } catch (ReplicationFailedException e ) {
523- // replication already running for shard.
524- target .fail (e , false );
525- return ;
526- }
527- logger .trace (() -> new ParameterizedMessage ("Added new replication to collection {}" , target .description ()));
528- threadPool .generic ().execute (new ReplicationRunner (replicationId ));
525+ replicator .startReplication (target , recoverySettings .activityTimeout ());
529526 }
530527
531528 /**
@@ -550,89 +547,14 @@ default void onFailure(ReplicationState state, ReplicationFailedException e, boo
550547 void onReplicationFailure (SegmentReplicationState state , ReplicationFailedException e , boolean sendShardFailure );
551548 }
552549
553- /**
554- * Runnable implementation to trigger a replication event.
555- */
556- private class ReplicationRunner extends AbstractRunnable {
557-
558- final long replicationId ;
559-
560- public ReplicationRunner (long replicationId ) {
561- this .replicationId = replicationId ;
562- }
563-
564- @ Override
565- public void onFailure (Exception e ) {
566- onGoingReplications .fail (replicationId , new ReplicationFailedException ("Unexpected Error during replication" , e ), false );
567- }
568-
569- @ Override
570- public void doRun () {
571- start (replicationId );
572- }
573- }
574-
575- private void start (final long replicationId ) {
576- final SegmentReplicationTarget target ;
577- try (ReplicationRef <SegmentReplicationTarget > replicationRef = onGoingReplications .get (replicationId )) {
578- // This check is for handling edge cases where the reference is removed before the ReplicationRunner is started by the
579- // threadpool.
580- if (replicationRef == null ) {
581- return ;
582- }
583- target = replicationRef .get ();
584- }
585- target .startReplication (new ActionListener <>() {
586- @ Override
587- public void onResponse (Void o ) {
588- logger .debug (() -> new ParameterizedMessage ("Finished replicating {} marking as done." , target .description ()));
589- onGoingReplications .markAsDone (replicationId );
590- if (target .state ().getIndex ().recoveredFileCount () != 0 && target .state ().getIndex ().recoveredBytes () != 0 ) {
591- completedReplications .put (target .shardId (), target .state ());
592- }
593- }
594-
595- @ Override
596- public void onFailure (Exception e ) {
597- logger .debug ("Replication failed {}" , target .description ());
598- if (isStoreCorrupt (target ) || e instanceof CorruptIndexException || e instanceof OpenSearchCorruptionException ) {
599- onGoingReplications .fail (replicationId , new ReplicationFailedException ("Store corruption during replication" , e ), true );
600- return ;
601- }
602- onGoingReplications .fail (replicationId , new ReplicationFailedException ("Segment Replication failed" , e ), false );
603- }
604- });
605- }
606-
607- private boolean isStoreCorrupt (SegmentReplicationTarget target ) {
608- // ensure target is not already closed. In that case
609- // we can assume the store is not corrupt and that the replication
610- // event completed successfully.
611- if (target .refCount () > 0 ) {
612- final Store store = target .store ();
613- if (store .tryIncRef ()) {
614- try {
615- return store .isMarkedCorrupted ();
616- } catch (IOException ex ) {
617- logger .warn ("Unable to determine if store is corrupt" , ex );
618- return false ;
619- } finally {
620- store .decRef ();
621- }
622- }
623- }
624- // store already closed.
625- return false ;
626- }
627-
628550 private class FileChunkTransportRequestHandler implements TransportRequestHandler <FileChunkRequest > {
629551
630552 // How many bytes we've copied since we last called RateLimiter.pause
631553 final AtomicLong bytesSinceLastPause = new AtomicLong ();
632554
633555 @ Override
634556 public void messageReceived (final FileChunkRequest request , TransportChannel channel , Task task ) throws Exception {
635- try (ReplicationRef <SegmentReplicationTarget > ref = onGoingReplications . getSafe (request .recoveryId (), request .shardId ())) {
557+ try (ReplicationRef <SegmentReplicationTarget > ref = replicator . get (request .recoveryId (), request .shardId ())) {
636558 final SegmentReplicationTarget target = ref .get ();
637559 final ActionListener <Void > listener = target .createOrFinishListener (channel , Actions .FILE_CHUNK , request );
638560 target .handleFileChunk (request , target , bytesSinceLastPause , recoverySettings .replicationRateLimiter (), listener );
0 commit comments