diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/activate/TransportActivateWatchAction.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/activate/TransportActivateWatchAction.java index 776c76524796b..7178773f61624 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/activate/TransportActivateWatchAction.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/activate/TransportActivateWatchAction.java @@ -54,6 +54,7 @@ public class TransportActivateWatchAction extends WatcherTransportAction { + UpdateRequest request = (UpdateRequest) invocation.getArguments()[0]; + ActionListener listener = (ActionListener) invocation.getArguments()[1]; + + ShardId shardId = new ShardId(new Index(Watch.INDEX, "uuid"), 0); + listener.onResponse(new UpdateResponse(shardId, request.type(), request.id(), request.version(), + DocWriteResponse.Result.UPDATED)); + + return null; + }).when(client).update(any(), any()); + + // mock an get response that calls the listener + doAnswer(invocation -> { + GetRequest request = (GetRequest) invocation.getArguments()[0]; + ActionListener listener = (ActionListener) invocation.getArguments()[1]; + + GetResult getResult = new GetResult(request.index(), request.type(), request.id(), request.version(), true, null, + Collections.emptyMap()); + listener.onResponse(new GetResponse(getResult)); + + return null; + }).when(client).get(any(), any()); + + action = new TransportActivateWatchAction(Settings.EMPTY, transportService, threadPool, + new ActionFilters(Collections.emptySet()), new IndexNameExpressionResolver(Settings.EMPTY), new ClockMock(), + new XPackLicenseState(Settings.EMPTY), parser, clusterService, client, triggerService); + } + + // when running in distributed mode, watches are only triggered by the indexing operation listener + public void testWatchesAreNotTriggeredWhenDistributed() throws Exception { + boolean watchActivated = randomBoolean(); + ActivateWatchRequest request = new ActivateWatchRequest("watch_id", watchActivated); + ActionListener listener = PlainActionFuture.newFuture(); + + // add a few nodes, with current versions + ClusterState clusterState = ClusterState.builder(new ClusterName("my_cluster")) + .nodes(DiscoveryNodes.builder() + .masterNodeId("node_1") + .localNodeId(randomFrom("node_1", "node_2")) + .add(newNode("node_1", Version.CURRENT)) + .add(newNode("node_2", Version.CURRENT))) + .build(); + when(clusterService.state()).thenReturn(clusterState); + mockWatchStatus(watchActivated); + + action.masterOperation(request, clusterState, listener); + + verifyNoMoreInteractions(triggerService); + } + + public void testWatchesAreNotTriggeredOnNonMasterWhenNotDistributed() throws Exception { + boolean watchActivated = randomBoolean(); + ActivateWatchRequest request = new ActivateWatchRequest("watch_id", watchActivated); + ActionListener listener = PlainActionFuture.newFuture(); + + // add a few nodes, with current versions + ClusterState clusterState = ClusterState.builder(new ClusterName("my_cluster")) + .nodes(DiscoveryNodes.builder() + .masterNodeId("node_2") + .localNodeId("node_1") + .add(newNode("node_1", Version.CURRENT)) + .add(newNode("node_2", Version.V_5_6_10))) + .build(); + when(clusterService.state()).thenReturn(clusterState); + mockWatchStatus(watchActivated); + + action.masterOperation(request, clusterState, listener); + + verifyNoMoreInteractions(triggerService); + } + + // we trigger on the master node only, not on any other node + public void testWatchesAreTriggeredOnMasterWhenNotDistributed() throws Exception { + boolean watchActivated = randomBoolean(); + ActivateWatchRequest request = new ActivateWatchRequest("watch_id", watchActivated); + ActionListener listener = PlainActionFuture.newFuture(); + + // add a few nodes, with current versions + ClusterState clusterState = ClusterState.builder(new ClusterName("my_cluster")) + .nodes(DiscoveryNodes.builder() + .masterNodeId("node_1") + .localNodeId("node_1") + .add(newNode("node_1", Version.CURRENT)) + .add(newNode("node_2", Version.V_5_6_10))) + .build(); + when(clusterService.state()).thenReturn(clusterState); + mockWatchStatus(watchActivated); + + action.masterOperation(request, clusterState, listener); + + if (watchActivated) { + verify(triggerService).add(eq(watch)); + } else { + verify(triggerService).remove(eq("watch_id")); + } + } + + private void mockWatchStatus(boolean active) { + WatchStatus status = mock(WatchStatus.class); + WatchStatus.State state = new WatchStatus.State(active, DateTime.now(DateTimeZone.UTC)); + when(status.state()).thenReturn(state); + when(watch.status()).thenReturn(status); + } + + private static DiscoveryNode newNode(String nodeId, Version version) { + return new DiscoveryNode(nodeId, ESTestCase.buildNewFakeTransportAddress(), Collections.emptyMap(), + new HashSet<>(asList(DiscoveryNode.Role.values())), version); + } +}