From 8e1971684b006c1d1a574ac7d64fe3337f640f85 Mon Sep 17 00:00:00 2001 From: Alexander Reelsen Date: Wed, 16 May 2018 10:17:40 +0200 Subject: [PATCH] Watcher: Prevent duplicate watch triggering during upgrade If a user is putting a watch, while upgrading from 5.x to 6.x, this can lead to the watch being triggered on the node receiving the put watch request. Note, that this can only happen when watcher is not running in its distributed fashion. The condition for this is, that there are still nodes running on version 5 in a 6.x cluster. --- .../actions/put/TransportPutWatchAction.java | 7 +- .../put/TransportPutWatchActionTests.java | 112 ++++++++++++++++-- 2 files changed, 105 insertions(+), 14 deletions(-) diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/put/TransportPutWatchAction.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/put/TransportPutWatchAction.java index d836507596b74..45144c2d34683 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/put/TransportPutWatchAction.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/put/TransportPutWatchAction.java @@ -63,6 +63,7 @@ public class TransportPutWatchAction extends WatcherTransportActionwrap(response -> { boolean created = response.getResult() == DocWriteResponse.Result.CREATED; - if (localExecute(request) == false && watch.status().state().isActive()) { + // if not yet in distributed mode (mixed 5/6 version in cluster), only trigger on the master node + if (localExecute(request) == false && + this.clusterService.state().nodes().isLocalNodeElectedMaster() && + watch.status().state().isActive()) { triggerService.add(watch); } listener.onResponse(new PutWatchResponse(response.getId(), response.getVersion(), created)); diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/transport/actions/put/TransportPutWatchActionTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/transport/actions/put/TransportPutWatchActionTests.java index 9a41c9b7aabbf..2585659a91670 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/transport/actions/put/TransportPutWatchActionTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/transport/actions/put/TransportPutWatchActionTests.java @@ -5,13 +5,18 @@ */ package org.elasticsearch.xpack.watcher.transport.actions.put; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; @@ -22,18 +27,24 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.watcher.transport.actions.put.PutWatchRequest; +import org.elasticsearch.xpack.core.watcher.transport.actions.put.PutWatchResponse; import org.elasticsearch.xpack.core.watcher.watch.ClockMock; import org.elasticsearch.xpack.core.watcher.watch.Watch; +import org.elasticsearch.xpack.core.watcher.watch.WatchStatus; import org.elasticsearch.xpack.watcher.Watcher; import org.elasticsearch.xpack.watcher.test.WatchExecutionContextMockBuilder; import org.elasticsearch.xpack.watcher.trigger.TriggerService; import org.elasticsearch.xpack.watcher.watch.WatchParser; +import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; import org.junit.Before; import org.mockito.ArgumentCaptor; import java.util.Collections; +import java.util.HashSet; import java.util.Map; +import static java.util.Arrays.asList; import static org.hamcrest.Matchers.hasKey; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; @@ -45,18 +56,20 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyZeroInteractions; import static org.mockito.Mockito.when; public class TransportPutWatchActionTests extends ESTestCase { private TransportPutWatchAction action; - private Watch watch = new WatchExecutionContextMockBuilder("_id").buildMock().watch(); - private ThreadContext threadContext = new ThreadContext(Settings.EMPTY); + private final Watch watch = new WatchExecutionContextMockBuilder("_id").buildMock().watch(); + private final ThreadContext threadContext = new ThreadContext(Settings.EMPTY); + private final ClusterService clusterService = mock(ClusterService.class); + private final TriggerService triggerService = mock(TriggerService.class); + private final ActionListener listener = ActionListener.wrap(r -> {}, e -> assertThat(e, is(nullValue()))); @Before public void setupAction() throws Exception { - TriggerService triggerService = mock(TriggerService.class); - ClusterService clusterService = mock(ClusterService.class); ThreadPool threadPool = mock(ThreadPool.class); when(threadPool.getThreadContext()).thenReturn(threadContext); @@ -64,19 +77,24 @@ public void setupAction() throws Exception { WatchParser parser = mock(WatchParser.class); when(parser.parseWithSecrets(eq("_id"), eq(false), anyObject(), anyObject(), anyObject(), anyBoolean())).thenReturn(watch); + WatchStatus status = mock(WatchStatus.class); + WatchStatus.State state = new WatchStatus.State(true, DateTime.now(DateTimeZone.UTC)); + when(status.state()).thenReturn(state); + when(watch.status()).thenReturn(status); Client client = mock(Client.class); when(client.threadPool()).thenReturn(threadPool); // mock an index response that calls the listener doAnswer(invocation -> { - IndexRequest request = (IndexRequest) invocation.getArguments()[1]; - ActionListener listener = (ActionListener) invocation.getArguments()[2]; + UpdateRequest request = (UpdateRequest) invocation.getArguments()[0]; + ActionListener listener = (ActionListener) invocation.getArguments()[1]; ShardId shardId = new ShardId(new Index(Watch.INDEX, "uuid"), 0); - listener.onResponse(new IndexResponse(shardId, request.type(), request.id(), 1, 1, 1, true)); + listener.onResponse(new UpdateResponse(shardId, request.type(), request.id(), request.version(), + DocWriteResponse.Result.UPDATED)); return null; - }).when(client).execute(any(), any(), any()); + }).when(client).update(any(), any()); action = new TransportPutWatchAction(Settings.EMPTY, transportService, threadPool, new ActionFilters(Collections.emptySet()), new IndexNameExpressionResolver(Settings.EMPTY), new ClockMock(), @@ -84,7 +102,6 @@ public void setupAction() throws Exception { } public void testHeadersAreFilteredWhenPuttingWatches() throws Exception { - ClusterState state = mock(ClusterState.class); // set up threadcontext with some arbitrary info String headerName = randomFrom(Watcher.HEADER_FILTERS); threadContext.putHeader(headerName, randomAlphaOfLength(10)); @@ -92,7 +109,17 @@ public void testHeadersAreFilteredWhenPuttingWatches() throws Exception { PutWatchRequest putWatchRequest = new PutWatchRequest(); putWatchRequest.setId("_id"); - action.masterOperation(putWatchRequest, state, ActionListener.wrap(r -> {}, e -> assertThat(e, is(nullValue())))); + + ClusterState state = ClusterState.builder(new ClusterName("my_cluster")) + .nodes(DiscoveryNodes.builder() + .masterNodeId("node_1") + .localNodeId(randomFrom("node_1", "node_2")) + .add(newNode("node_1", Version.CURRENT)) + .add(newNode("node_2", Version.CURRENT))) + .build(); + when(clusterService.state()).thenReturn(state); + + action.masterOperation(putWatchRequest, state, listener); ArgumentCaptor captor = ArgumentCaptor.forClass(Map.class); verify(watch.status()).setHeaders(captor.capture()); @@ -100,4 +127,63 @@ public void testHeadersAreFilteredWhenPuttingWatches() throws Exception { assertThat(capturedHeaders.keySet(), hasSize(1)); assertThat(capturedHeaders, hasKey(headerName)); } -} \ No newline at end of file + + public void testWatchesAreNeverTriggeredWhenDistributed() throws Exception { + PutWatchRequest putWatchRequest = new PutWatchRequest(); + putWatchRequest.setId("_id"); + + ClusterState clusterState = ClusterState.builder(new ClusterName("my_cluster")) + .nodes(DiscoveryNodes.builder() + .masterNodeId("node_1") + .localNodeId(randomFrom("node_1", "node_2")) + .add(newNode("node_1", Version.CURRENT)) + .add(newNode("node_2", Version.CURRENT))) + .build(); + when(clusterService.state()).thenReturn(clusterState); + + action.masterOperation(putWatchRequest, clusterState, listener); + + verifyZeroInteractions(triggerService); + } + + public void testWatchesAreNotTriggeredOnNonMasterWhenNotDistributed() throws Exception { + PutWatchRequest putWatchRequest = new PutWatchRequest(); + putWatchRequest.setId("_id"); + + ClusterState clusterState = ClusterState.builder(new ClusterName("my_cluster")) + .nodes(DiscoveryNodes.builder() + .masterNodeId("node_2") + .localNodeId("node_1") + .add(newNode("node_1", Version.CURRENT)) + .add(newNode("node_2", Version.V_5_6_10))) + .build(); + when(clusterService.state()).thenReturn(clusterState); + + action.masterOperation(putWatchRequest, clusterState, listener); + + verifyZeroInteractions(triggerService); + } + + public void testWatchesAreTriggeredOnMasterWhenNotDistributed() throws Exception { + PutWatchRequest putWatchRequest = new PutWatchRequest(); + putWatchRequest.setId("_id"); + + ClusterState clusterState = ClusterState.builder(new ClusterName("my_cluster")) + .nodes(DiscoveryNodes.builder() + .masterNodeId("node_1") + .localNodeId("node_1") + .add(newNode("node_1", Version.CURRENT)) + .add(newNode("node_2", Version.V_5_6_10))) + .build(); + when(clusterService.state()).thenReturn(clusterState); + + action.masterOperation(putWatchRequest, clusterState, listener); + + verify(triggerService).add(eq(watch)); + } + + private static DiscoveryNode newNode(String nodeId, Version version) { + return new DiscoveryNode(nodeId, ESTestCase.buildNewFakeTransportAddress(), Collections.emptyMap(), + new HashSet<>(asList(DiscoveryNode.Role.values())), version); + } +}