4444import org .opensearch .common .settings .Settings ;
4545import org .opensearch .core .common .io .stream .StreamOutput ;
4646import org .opensearch .gateway .remote .ClusterMetadataManifest ;
47+ import org .opensearch .gateway .remote .ClusterStateDiffManifest ;
4748import org .opensearch .gateway .remote .RemoteClusterStateService ;
49+ import org .opensearch .gateway .remote .RemoteDownloadStats ;
4850import org .opensearch .node .Node ;
4951import org .opensearch .telemetry .tracing .noop .NoopTracer ;
5052import org .opensearch .test .OpenSearchTestCase ;
6264import static org .hamcrest .Matchers .containsString ;
6365import static org .hamcrest .Matchers .instanceOf ;
6466import static org .hamcrest .Matchers .is ;
67+ import static org .mockito .ArgumentMatchers .any ;
68+ import static org .mockito .Mockito .doAnswer ;
6569import static org .mockito .Mockito .mock ;
6670import static org .mockito .Mockito .times ;
71+ import static org .mockito .Mockito .verify ;
72+ import static org .mockito .Mockito .verifyNoMoreInteractions ;
6773import static org .mockito .Mockito .when ;
6874
6975public class PublicationTransportHandlerTests extends OpenSearchTestCase {
@@ -160,7 +166,8 @@ public void testHandleIncomingRemotePublishRequestWhenNoCurrentPublishRequest()
160166 () -> handler .handleIncomingRemotePublishRequest (remotePublishRequest )
161167 );
162168 assertThat (e .getMessage (), containsString ("publication to self failed" ));
163- Mockito .verifyNoInteractions (remoteClusterStateService );
169+ verify (remoteClusterStateService , times (1 )).readMetadataFailed ();
170+ verifyNoMoreInteractions (remoteClusterStateService );
164171 }
165172
166173 public void testHandleIncomingRemotePublishRequestWhenTermMismatch () {
@@ -185,7 +192,8 @@ public void testHandleIncomingRemotePublishRequestWhenTermMismatch() {
185192 () -> handler .handleIncomingRemotePublishRequest (remotePublishRequest )
186193 );
187194 assertThat (e .getMessage (), containsString ("publication to self failed" ));
188- Mockito .verifyNoInteractions (remoteClusterStateService );
195+ verify (remoteClusterStateService , times (1 )).readMetadataFailed ();
196+ verifyNoMoreInteractions (remoteClusterStateService );
189197 }
190198
191199 public void testHandleIncomingRemotePublishRequestWhenVersionMismatch () {
@@ -210,7 +218,8 @@ public void testHandleIncomingRemotePublishRequestWhenVersionMismatch() {
210218 () -> handler .handleIncomingRemotePublishRequest (remotePublishRequest )
211219 );
212220 assertThat (e .getMessage (), containsString ("publication to self failed" ));
213- Mockito .verifyNoInteractions (remoteClusterStateService );
221+ verify (remoteClusterStateService , times (1 )).readMetadataFailed ();
222+ verifyNoMoreInteractions (remoteClusterStateService );
214223 }
215224
216225 public void testHandleIncomingRemotePublishRequestForLocalNode () throws IOException {
@@ -235,6 +244,119 @@ public void testHandleIncomingRemotePublishRequestForLocalNode() throws IOExcept
235244 Mockito .verifyNoInteractions (remoteClusterStateService );
236245 }
237246
247+ public void testDownloadRemotePersistedFailedStats () throws IOException {
248+ RemoteDownloadStats remoteDownloadStats = new RemoteDownloadStats ();
249+ RemoteClusterStateService remoteClusterStateService = mock (RemoteClusterStateService .class );
250+ when (remoteClusterStateService .getDownloadStats ()).thenReturn (remoteDownloadStats );
251+
252+ doAnswer ((i ) -> {
253+ remoteDownloadStats .stateFailed ();
254+ return null ;
255+ }).when (remoteClusterStateService ).readMetadataFailed ();
256+
257+ PublishWithJoinResponse expectedPublishResponse = new PublishWithJoinResponse (new PublishResponse (TERM , VERSION ), Optional .empty ());
258+ Function <PublishRequest , PublishWithJoinResponse > handlePublishRequest = p -> expectedPublishResponse ;
259+ final PublicationTransportHandler handler = getPublicationTransportHandler (handlePublishRequest , remoteClusterStateService );
260+ RemotePublishRequest remotePublishRequest = new RemotePublishRequest (
261+ secondNode ,
262+ TERM ,
263+ VERSION ,
264+ CLUSTER_NAME ,
265+ CLUSTER_UUID ,
266+ MANIFEST_FILE
267+ );
268+ ClusterState clusterState = buildClusterState (TERM , VERSION );
269+ PublishRequest publishRequest = new PublishRequest (clusterState );
270+ handler .setCurrentPublishRequestToSelf (publishRequest );
271+
272+ assertThrows (IllegalStateException .class , () -> handler .handleIncomingRemotePublishRequest (remotePublishRequest ));
273+ assertEquals (1 , remoteClusterStateService .getDownloadStats ().getFailedCount ());
274+ }
275+
276+ public void testDownloadRemotePersistedDiffStats () throws IOException {
277+ RemoteDownloadStats remoteDownloadStats = new RemoteDownloadStats ();
278+ RemoteClusterStateService remoteClusterStateService = mock (RemoteClusterStateService .class );
279+ when (remoteClusterStateService .getDownloadStats ()).thenReturn (remoteDownloadStats );
280+ ClusterMetadataManifest metadataManifest = new ClusterMetadataManifest .Builder ().diffManifest (
281+ new ClusterStateDiffManifest .Builder ().fromStateUUID ("state-uuid" ).build ()
282+ ).build ();
283+ when (remoteClusterStateService .getClusterMetadataManifestByFileName (any (), any ())).thenReturn (metadataManifest );
284+
285+ doAnswer ((i ) -> {
286+ remoteDownloadStats .diffDownloadState ();
287+ return null ;
288+ }).when (remoteClusterStateService ).diffDownloadState ();
289+
290+ doAnswer ((i ) -> {
291+ remoteDownloadStats .fullDownloadState ();
292+ return null ;
293+ }).when (remoteClusterStateService ).fullDownloadState ();
294+
295+ PublishWithJoinResponse expectedPublishResponse = new PublishWithJoinResponse (new PublishResponse (TERM , VERSION ), Optional .empty ());
296+ Function <PublishRequest , PublishWithJoinResponse > handlePublishRequest = p -> expectedPublishResponse ;
297+ final PublicationTransportHandler handler = getPublicationTransportHandler (handlePublishRequest , remoteClusterStateService );
298+ ClusterState clusterState = mock (ClusterState .class );
299+ handler .setLastSeenClusterState (clusterState );
300+ when (clusterState .stateUUID ()).thenReturn ("state-uuid" );
301+
302+ RemotePublishRequest remotePublishRequest = new RemotePublishRequest (
303+ secondNode ,
304+ TERM ,
305+ VERSION ,
306+ CLUSTER_NAME ,
307+ CLUSTER_UUID ,
308+ MANIFEST_FILE
309+ );
310+ clusterState = buildClusterState (TERM , VERSION );
311+ PublishRequest publishRequest = new PublishRequest (clusterState );
312+ handler .setCurrentPublishRequestToSelf (publishRequest );
313+ assertThrows (NullPointerException .class , () -> handler .handleIncomingRemotePublishRequest (remotePublishRequest ));
314+ assertEquals (1 , remoteDownloadStats .getDiffDownloadCount ());
315+ assertEquals (0 , remoteDownloadStats .getFullDownloadCount ());
316+ }
317+
318+ public void testDownloadRemotePersistedFullStats () throws IOException {
319+ RemoteDownloadStats remoteDownloadStats = new RemoteDownloadStats ();
320+ RemoteClusterStateService remoteClusterStateService = mock (RemoteClusterStateService .class );
321+ when (remoteClusterStateService .getDownloadStats ()).thenReturn (remoteDownloadStats );
322+ ClusterMetadataManifest metadataManifest = new ClusterMetadataManifest .Builder ().diffManifest (
323+ new ClusterStateDiffManifest .Builder ().fromStateUUID ("state-uuid2" ).build ()
324+ ).build ();
325+ when (remoteClusterStateService .getClusterMetadataManifestByFileName (any (), any ())).thenReturn (metadataManifest );
326+
327+ doAnswer ((i ) -> {
328+ remoteDownloadStats .diffDownloadState ();
329+ return null ;
330+ }).when (remoteClusterStateService ).diffDownloadState ();
331+
332+ doAnswer ((i ) -> {
333+ remoteDownloadStats .fullDownloadState ();
334+ return null ;
335+ }).when (remoteClusterStateService ).fullDownloadState ();
336+
337+ PublishWithJoinResponse expectedPublishResponse = new PublishWithJoinResponse (new PublishResponse (TERM , VERSION ), Optional .empty ());
338+ Function <PublishRequest , PublishWithJoinResponse > handlePublishRequest = p -> expectedPublishResponse ;
339+ final PublicationTransportHandler handler = getPublicationTransportHandler (handlePublishRequest , remoteClusterStateService );
340+ ClusterState clusterState = mock (ClusterState .class );
341+ handler .setLastSeenClusterState (clusterState );
342+ when (clusterState .stateUUID ()).thenReturn ("state-uuid" );
343+
344+ RemotePublishRequest remotePublishRequest = new RemotePublishRequest (
345+ secondNode ,
346+ TERM ,
347+ VERSION ,
348+ CLUSTER_NAME ,
349+ CLUSTER_UUID ,
350+ MANIFEST_FILE
351+ );
352+ clusterState = buildClusterState (TERM , VERSION );
353+ PublishRequest publishRequest = new PublishRequest (clusterState );
354+ handler .setCurrentPublishRequestToSelf (publishRequest );
355+ assertThrows (NullPointerException .class , () -> handler .handleIncomingRemotePublishRequest (remotePublishRequest ));
356+ assertEquals (0 , remoteDownloadStats .getDiffDownloadCount ());
357+ assertEquals (1 , remoteDownloadStats .getFullDownloadCount ());
358+ }
359+
238360 public void testHandleIncomingRemotePublishRequestWhenManifestNotFound () throws IOException {
239361 RemoteClusterStateService remoteClusterStateService = mock (RemoteClusterStateService .class );
240362
0 commit comments