4040import org .opensearch .cluster .ClusterState ;
4141import org .opensearch .cluster .Diff ;
4242import org .opensearch .cluster .IncompatibleClusterStateVersionException ;
43+ import org .opensearch .cluster .coordination .PersistedStateRegistry .PersistedStateType ;
4344import org .opensearch .cluster .node .DiscoveryNode ;
4445import org .opensearch .cluster .node .DiscoveryNodes ;
4546import org .opensearch .core .action .ActionListener ;
4647import org .opensearch .core .common .bytes .BytesReference ;
4748import org .opensearch .core .common .io .stream .NamedWriteableRegistry ;
4849import org .opensearch .core .common .io .stream .StreamInput ;
4950import org .opensearch .core .transport .TransportResponse ;
51+ import org .opensearch .gateway .GatewayMetaState .RemotePersistedState ;
52+ import org .opensearch .gateway .remote .ClusterMetadataManifest ;
53+ import org .opensearch .gateway .remote .RemoteClusterStateService ;
5054import org .opensearch .threadpool .ThreadPool ;
5155import org .opensearch .transport .BytesTransportRequest ;
5256import org .opensearch .transport .TransportChannel ;
@@ -74,6 +78,7 @@ public class PublicationTransportHandler {
7478 private static final Logger logger = LogManager .getLogger (PublicationTransportHandler .class );
7579
7680 public static final String PUBLISH_STATE_ACTION_NAME = "internal:cluster/coordination/publish_state" ;
81+ public static final String PUBLISH_REMOTE_STATE_ACTION_NAME = "internal:cluster/coordination/publish_remote_state" ;
7782 public static final String COMMIT_STATE_ACTION_NAME = "internal:cluster/coordination/commit_state" ;
7883
7984 private final TransportService transportService ;
@@ -97,16 +102,19 @@ public class PublicationTransportHandler {
97102 private final TransportRequestOptions stateRequestOptions = TransportRequestOptions .builder ()
98103 .withType (TransportRequestOptions .Type .STATE )
99104 .build ();
105+ private final RemoteClusterStateService remoteClusterStateService ;
100106
101107 public PublicationTransportHandler (
102108 TransportService transportService ,
103109 NamedWriteableRegistry namedWriteableRegistry ,
104110 Function <PublishRequest , PublishWithJoinResponse > handlePublishRequest ,
105- BiConsumer <ApplyCommitRequest , ActionListener <Void >> handleApplyCommit
111+ BiConsumer <ApplyCommitRequest , ActionListener <Void >> handleApplyCommit ,
112+ RemoteClusterStateService remoteClusterStateService
106113 ) {
107114 this .transportService = transportService ;
108115 this .namedWriteableRegistry = namedWriteableRegistry ;
109116 this .handlePublishRequest = handlePublishRequest ;
117+ this .remoteClusterStateService = remoteClusterStateService ;
110118
111119 transportService .registerRequestHandler (
112120 PUBLISH_STATE_ACTION_NAME ,
@@ -117,6 +125,15 @@ public PublicationTransportHandler(
117125 (request , channel , task ) -> channel .sendResponse (handleIncomingPublishRequest (request ))
118126 );
119127
128+ transportService .registerRequestHandler (
129+ PUBLISH_REMOTE_STATE_ACTION_NAME ,
130+ ThreadPool .Names .GENERIC ,
131+ false ,
132+ false ,
133+ RemotePublishRequest ::new ,
134+ (request , channel , task ) -> channel .sendResponse (handleIncomingRemotePublishRequest (request ))
135+ );
136+
120137 transportService .registerRequestHandler (
121138 COMMIT_STATE_ACTION_NAME ,
122139 ThreadPool .Names .GENERIC ,
@@ -211,6 +228,74 @@ private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportReque
211228 }
212229 }
213230
231+ // package private for testing
232+ PublishWithJoinResponse handleIncomingRemotePublishRequest (RemotePublishRequest request ) throws IOException {
233+ if (transportService .getLocalNode ().equals (request .getSourceNode ())) {
234+ return acceptRemoteStateOnLocalNode (request );
235+ }
236+ // TODO Make cluster state download non-blocking: https://github.com/opensearch-project/OpenSearch/issues/14102
237+ ClusterMetadataManifest manifest = remoteClusterStateService .getClusterMetadataManifestByFileName (
238+ request .getClusterUUID (),
239+ request .getManifestFile ()
240+ );
241+ if (manifest == null ) {
242+ throw new IllegalStateException ("Publication failed as manifest was not found for " + request );
243+ }
244+ boolean applyFullState = false ;
245+ final ClusterState lastSeen = lastSeenClusterState .get ();
246+ if (lastSeen == null ) {
247+ logger .debug (() -> "Diff cannot be applied as there is no last cluster state" );
248+ applyFullState = true ;
249+ } else if (manifest .getDiffManifest () == null ) {
250+ logger .trace (() -> "There is no diff in the manifest" );
251+ applyFullState = true ;
252+ } else if (manifest .getDiffManifest ().getFromStateUUID ().equals (lastSeen .stateUUID ()) == false ) {
253+ logger .debug (() -> "Last cluster state not compatible with the diff" );
254+ applyFullState = true ;
255+ }
256+
257+ if (applyFullState == true ) {
258+ logger .debug (
259+ () -> new ParameterizedMessage (
260+ "Downloading full cluster state for term {}, version {}, stateUUID {}" ,
261+ manifest .getClusterTerm (),
262+ manifest .getStateVersion (),
263+ manifest .getStateUUID ()
264+ )
265+ );
266+ ClusterState clusterState = remoteClusterStateService .getClusterStateForManifest (
267+ request .getClusterName (),
268+ manifest ,
269+ transportService .getLocalNode ().getId (),
270+ true
271+ );
272+ fullClusterStateReceivedCount .incrementAndGet ();
273+ final PublishWithJoinResponse response = acceptState (clusterState );
274+ lastSeenClusterState .set (clusterState );
275+ return response ;
276+ } else {
277+ logger .debug (
278+ () -> new ParameterizedMessage (
279+ "Downloading diff cluster state for term {}, version {}, previousUUID {}, current UUID {}" ,
280+ manifest .getClusterTerm (),
281+ manifest .getStateVersion (),
282+ manifest .getDiffManifest ().getFromStateUUID (),
283+ manifest .getStateUUID ()
284+ )
285+ );
286+ ClusterState clusterState = remoteClusterStateService .getClusterStateUsingDiff (
287+ request .getClusterName (),
288+ manifest ,
289+ lastSeen ,
290+ transportService .getLocalNode ().getId ()
291+ );
292+ compatibleClusterStateDiffReceivedCount .incrementAndGet ();
293+ final PublishWithJoinResponse response = acceptState (clusterState );
294+ lastSeenClusterState .compareAndSet (lastSeen , clusterState );
295+ return response ;
296+ }
297+ }
298+
214299 private PublishWithJoinResponse acceptState (ClusterState incomingState ) {
215300 // if the state is coming from the current node, use original request instead (see currentPublishRequestToSelf for explanation)
216301 if (transportService .getLocalNode ().equals (incomingState .nodes ().getClusterManagerNode ())) {
@@ -224,8 +309,35 @@ private PublishWithJoinResponse acceptState(ClusterState incomingState) {
224309 return handlePublishRequest .apply (new PublishRequest (incomingState ));
225310 }
226311
227- public PublicationContext newPublicationContext (ClusterChangedEvent clusterChangedEvent ) {
228- final PublicationContext publicationContext = new PublicationContext (clusterChangedEvent );
312+ private PublishWithJoinResponse acceptRemoteStateOnLocalNode (RemotePublishRequest remotePublishRequest ) {
313+ final PublishRequest publishRequest = currentPublishRequestToSelf .get ();
314+ if (publishRequest == null
315+ || publishRequest .getAcceptedState ().coordinationMetadata ().term () != remotePublishRequest .term
316+ || publishRequest .getAcceptedState ().version () != remotePublishRequest .version ) {
317+ logger .debug (
318+ () -> new ParameterizedMessage (
319+ "Publication failure for current publish request : {} and remote publish request: {}" ,
320+ publishRequest ,
321+ remotePublishRequest
322+ )
323+ );
324+ throw new IllegalStateException ("publication to self failed for " + remotePublishRequest );
325+ }
326+ PublishWithJoinResponse publishWithJoinResponse = handlePublishRequest .apply (publishRequest );
327+ lastSeenClusterState .set (publishRequest .getAcceptedState ());
328+ return publishWithJoinResponse ;
329+ }
330+
331+ public PublicationContext newPublicationContext (
332+ ClusterChangedEvent clusterChangedEvent ,
333+ boolean isRemotePublicationEnabled ,
334+ PersistedStateRegistry persistedStateRegistry
335+ ) {
336+ final PublicationContext publicationContext = new PublicationContext (
337+ clusterChangedEvent ,
338+ isRemotePublicationEnabled ,
339+ persistedStateRegistry
340+ );
229341
230342 // Build the serializations we expect to need now, early in the process, so that an error during serialization fails the publication
231343 // straight away. This isn't watertight since we send diffs on a best-effort basis and may fall back to sending a full state (and
@@ -234,6 +346,16 @@ public PublicationContext newPublicationContext(ClusterChangedEvent clusterChang
234346 return publicationContext ;
235347 }
236348
349+ // package private for testing
350+ void setCurrentPublishRequestToSelf (PublishRequest publishRequest ) {
351+ this .currentPublishRequestToSelf .set (publishRequest );
352+ }
353+
354+ // package private for testing
355+ void setLastSeenClusterState (ClusterState clusterState ) {
356+ this .lastSeenClusterState .set (clusterState );
357+ }
358+
237359 private static BytesReference serializeFullClusterState (ClusterState clusterState , Version nodeVersion ) throws IOException {
238360 final BytesReference serializedState = CompressedStreamUtils .createCompressedStream (nodeVersion , stream -> {
239361 stream .writeBoolean (true );
@@ -270,12 +392,20 @@ public class PublicationContext {
270392 private final boolean sendFullVersion ;
271393 private final Map <Version , BytesReference > serializedStates = new HashMap <>();
272394 private final Map <Version , BytesReference > serializedDiffs = new HashMap <>();
395+ private final boolean sendRemoteState ;
396+ private final PersistedStateRegistry persistedStateRegistry ;
273397
274- PublicationContext (ClusterChangedEvent clusterChangedEvent ) {
398+ PublicationContext (
399+ ClusterChangedEvent clusterChangedEvent ,
400+ boolean isRemotePublicationEnabled ,
401+ PersistedStateRegistry persistedStateRegistry
402+ ) {
275403 discoveryNodes = clusterChangedEvent .state ().nodes ();
276404 newState = clusterChangedEvent .state ();
277405 previousState = clusterChangedEvent .previousState ();
278406 sendFullVersion = previousState .getBlocks ().disableStatePersistence ();
407+ sendRemoteState = isRemotePublicationEnabled ;
408+ this .persistedStateRegistry = persistedStateRegistry ;
279409 }
280410
281411 void buildDiffAndSerializeStates () {
@@ -339,7 +469,11 @@ public void onFailure(Exception e) {
339469 } else {
340470 responseActionListener = listener ;
341471 }
342- if (sendFullVersion || previousState .nodes ().nodeExists (destination ) == false ) {
472+ // TODO Decide to send remote state before starting publication by checking remote publication on all nodes
473+ if (sendRemoteState && destination .isRemoteStatePublicationEnabled ()) {
474+ logger .trace ("sending remote cluster state version [{}] to [{}]" , newState .version (), destination );
475+ sendRemoteClusterState (destination , publishRequest .getAcceptedState (), responseActionListener );
476+ } else if (sendFullVersion || previousState .nodes ().nodeExists (destination ) == false ) {
343477 logger .trace ("sending full cluster state version [{}] to [{}]" , newState .version (), destination );
344478 sendFullClusterState (destination , responseActionListener );
345479 } else {
@@ -384,6 +518,61 @@ public String executor() {
384518 );
385519 }
386520
521+ private void sendRemoteClusterState (
522+ final DiscoveryNode destination ,
523+ final ClusterState clusterState ,
524+ final ActionListener <PublishWithJoinResponse > listener
525+ ) {
526+ try {
527+ final String manifestFileName = ((RemotePersistedState ) persistedStateRegistry .getPersistedState (PersistedStateType .REMOTE ))
528+ .getLastUploadedManifestFile ();
529+ final RemotePublishRequest remotePublishRequest = new RemotePublishRequest (
530+ discoveryNodes .getLocalNode (),
531+ clusterState .term (),
532+ clusterState .getVersion (),
533+ clusterState .getClusterName ().value (),
534+ clusterState .metadata ().clusterUUID (),
535+ manifestFileName
536+ );
537+ final Consumer <TransportException > transportExceptionHandler = exp -> {
538+ logger .debug (() -> new ParameterizedMessage ("failed to send remote cluster state to {}" , destination ), exp );
539+ listener .onFailure (exp );
540+ };
541+ final TransportResponseHandler <PublishWithJoinResponse > responseHandler = new TransportResponseHandler <>() {
542+
543+ @ Override
544+ public PublishWithJoinResponse read (StreamInput in ) throws IOException {
545+ return new PublishWithJoinResponse (in );
546+ }
547+
548+ @ Override
549+ public void handleResponse (PublishWithJoinResponse response ) {
550+ listener .onResponse (response );
551+ }
552+
553+ @ Override
554+ public void handleException (TransportException exp ) {
555+ transportExceptionHandler .accept (exp );
556+ }
557+
558+ @ Override
559+ public String executor () {
560+ return ThreadPool .Names .GENERIC ;
561+ }
562+ };
563+ transportService .sendRequest (
564+ destination ,
565+ PUBLISH_REMOTE_STATE_ACTION_NAME ,
566+ remotePublishRequest ,
567+ stateRequestOptions ,
568+ responseHandler
569+ );
570+ } catch (Exception e ) {
571+ logger .warn (() -> new ParameterizedMessage ("error sending remote cluster state to {}" , destination ), e );
572+ listener .onFailure (e );
573+ }
574+ }
575+
387576 private void sendFullClusterState (DiscoveryNode destination , ActionListener <PublishWithJoinResponse > listener ) {
388577 BytesReference bytes = serializedStates .get (destination .getVersion ());
389578 if (bytes == null ) {
0 commit comments