Skip to content

Commit 70f48ee

Browse files
authored
Watcher: Prevent duplicate watch triggering during upgrade (#30643)
If a user is putting a watch, while upgrading from 5.x to 6.x, this can lead to the watch being triggered on the node receiving the put watch request. Note, that this can only happen when watcher is not running in its distributed fashion. The condition for this is, that there are still nodes running on version 5 in a 6.x cluster.
1 parent 7f48df0 commit 70f48ee

File tree

2 files changed

+105
-14
lines changed

2 files changed

+105
-14
lines changed

x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/put/TransportPutWatchAction.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ public class TransportPutWatchAction extends WatcherTransportAction<PutWatchRequ
6363
private final WatchParser parser;
6464
private final TriggerService triggerService;
6565
private final Client client;
66+
private final ClusterService clusterService;
6667
private static final ToXContent.Params DEFAULT_PARAMS =
6768
WatcherParams.builder().hideSecrets(false).hideHeaders(false).includeStatus(true).build();
6869

@@ -76,6 +77,7 @@ public TransportPutWatchAction(Settings settings, TransportService transportServ
7677
this.clock = clock;
7778
this.parser = parser;
7879
this.client = client;
80+
this.clusterService = clusterService;
7981
this.triggerService = triggerService;
8082
}
8183

@@ -106,7 +108,10 @@ protected void masterOperation(PutWatchRequest request, ClusterState state,
106108
executeAsyncWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN, updateRequest,
107109
ActionListener.<UpdateResponse>wrap(response -> {
108110
boolean created = response.getResult() == DocWriteResponse.Result.CREATED;
109-
if (localExecute(request) == false && watch.status().state().isActive()) {
111+
// if not yet in distributed mode (mixed 5/6 version in cluster), only trigger on the master node
112+
if (localExecute(request) == false &&
113+
this.clusterService.state().nodes().isLocalNodeElectedMaster() &&
114+
watch.status().state().isActive()) {
110115
triggerService.add(watch);
111116
}
112117
listener.onResponse(new PutWatchResponse(response.getId(), response.getVersion(), created));

x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/transport/actions/put/TransportPutWatchActionTests.java

Lines changed: 99 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,18 @@
55
*/
66
package org.elasticsearch.xpack.watcher.transport.actions.put;
77

8+
import org.elasticsearch.Version;
89
import org.elasticsearch.action.ActionListener;
9-
import org.elasticsearch.action.index.IndexRequest;
10-
import org.elasticsearch.action.index.IndexResponse;
10+
import org.elasticsearch.action.DocWriteResponse;
1111
import org.elasticsearch.action.support.ActionFilters;
12+
import org.elasticsearch.action.update.UpdateRequest;
13+
import org.elasticsearch.action.update.UpdateResponse;
1214
import org.elasticsearch.client.Client;
15+
import org.elasticsearch.cluster.ClusterName;
1316
import org.elasticsearch.cluster.ClusterState;
1417
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
18+
import org.elasticsearch.cluster.node.DiscoveryNode;
19+
import org.elasticsearch.cluster.node.DiscoveryNodes;
1520
import org.elasticsearch.cluster.service.ClusterService;
1621
import org.elasticsearch.common.settings.Settings;
1722
import org.elasticsearch.common.util.concurrent.ThreadContext;
@@ -23,17 +28,23 @@
2328
import org.elasticsearch.transport.TransportService;
2429
import org.elasticsearch.xpack.core.ClientHelper;
2530
import org.elasticsearch.xpack.core.watcher.transport.actions.put.PutWatchRequest;
31+
import org.elasticsearch.xpack.core.watcher.transport.actions.put.PutWatchResponse;
2632
import org.elasticsearch.xpack.core.watcher.watch.ClockMock;
2733
import org.elasticsearch.xpack.core.watcher.watch.Watch;
34+
import org.elasticsearch.xpack.core.watcher.watch.WatchStatus;
2835
import org.elasticsearch.xpack.watcher.test.WatchExecutionContextMockBuilder;
2936
import org.elasticsearch.xpack.watcher.trigger.TriggerService;
3037
import org.elasticsearch.xpack.watcher.watch.WatchParser;
38+
import org.joda.time.DateTime;
39+
import org.joda.time.DateTimeZone;
3140
import org.junit.Before;
3241
import org.mockito.ArgumentCaptor;
3342

3443
import java.util.Collections;
44+
import java.util.HashSet;
3545
import java.util.Map;
3646

47+
import static java.util.Arrays.asList;
3748
import static org.hamcrest.Matchers.hasKey;
3849
import static org.hamcrest.Matchers.hasSize;
3950
import static org.hamcrest.Matchers.is;
@@ -45,59 +56,134 @@
4556
import static org.mockito.Mockito.doAnswer;
4657
import static org.mockito.Mockito.mock;
4758
import static org.mockito.Mockito.verify;
59+
import static org.mockito.Mockito.verifyZeroInteractions;
4860
import static org.mockito.Mockito.when;
4961

5062
public class TransportPutWatchActionTests extends ESTestCase {
5163

5264
private TransportPutWatchAction action;
53-
private Watch watch = new WatchExecutionContextMockBuilder("_id").buildMock().watch();
54-
private ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
65+
private final Watch watch = new WatchExecutionContextMockBuilder("_id").buildMock().watch();
66+
private final ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
67+
private final ClusterService clusterService = mock(ClusterService.class);
68+
private final TriggerService triggerService = mock(TriggerService.class);
69+
private final ActionListener<PutWatchResponse> listener = ActionListener.wrap(r -> {}, e -> assertThat(e, is(nullValue())));
5570

5671
@Before
5772
public void setupAction() throws Exception {
58-
TriggerService triggerService = mock(TriggerService.class);
59-
ClusterService clusterService = mock(ClusterService.class);
6073
ThreadPool threadPool = mock(ThreadPool.class);
6174
when(threadPool.getThreadContext()).thenReturn(threadContext);
6275

6376
TransportService transportService = mock(TransportService.class);
6477

6578
WatchParser parser = mock(WatchParser.class);
6679
when(parser.parseWithSecrets(eq("_id"), eq(false), anyObject(), anyObject(), anyObject(), anyBoolean())).thenReturn(watch);
80+
WatchStatus status = mock(WatchStatus.class);
81+
WatchStatus.State state = new WatchStatus.State(true, DateTime.now(DateTimeZone.UTC));
82+
when(status.state()).thenReturn(state);
83+
when(watch.status()).thenReturn(status);
6784

6885
Client client = mock(Client.class);
6986
when(client.threadPool()).thenReturn(threadPool);
7087
// mock an index response that calls the listener
7188
doAnswer(invocation -> {
72-
IndexRequest request = (IndexRequest) invocation.getArguments()[1];
73-
ActionListener<IndexResponse> listener = (ActionListener) invocation.getArguments()[2];
89+
UpdateRequest request = (UpdateRequest) invocation.getArguments()[0];
90+
ActionListener<UpdateResponse> listener = (ActionListener) invocation.getArguments()[1];
7491

7592
ShardId shardId = new ShardId(new Index(Watch.INDEX, "uuid"), 0);
76-
listener.onResponse(new IndexResponse(shardId, request.type(), request.id(), 1, 1, 1, true));
93+
listener.onResponse(new UpdateResponse(shardId, request.type(), request.id(), request.version(),
94+
DocWriteResponse.Result.UPDATED));
7795

7896
return null;
79-
}).when(client).execute(any(), any(), any());
97+
}).when(client).update(any(), any());
8098

8199
action = new TransportPutWatchAction(Settings.EMPTY, transportService, threadPool,
82100
new ActionFilters(Collections.emptySet()), new IndexNameExpressionResolver(Settings.EMPTY), new ClockMock(),
83101
new XPackLicenseState(Settings.EMPTY), parser, client, clusterService, triggerService);
84102
}
85103

86104
public void testHeadersAreFilteredWhenPuttingWatches() throws Exception {
87-
ClusterState state = mock(ClusterState.class);
88105
// set up threadcontext with some arbitrary info
89106
String headerName = randomFrom(ClientHelper.SECURITY_HEADER_FILTERS);
90107
threadContext.putHeader(headerName, randomAlphaOfLength(10));
91108
threadContext.putHeader(randomAlphaOfLength(10), "doesntmatter");
92109

93110
PutWatchRequest putWatchRequest = new PutWatchRequest();
94111
putWatchRequest.setId("_id");
95-
action.masterOperation(putWatchRequest, state, ActionListener.wrap(r -> {}, e -> assertThat(e, is(nullValue()))));
112+
113+
ClusterState state = ClusterState.builder(new ClusterName("my_cluster"))
114+
.nodes(DiscoveryNodes.builder()
115+
.masterNodeId("node_1")
116+
.localNodeId(randomFrom("node_1", "node_2"))
117+
.add(newNode("node_1", Version.CURRENT))
118+
.add(newNode("node_2", Version.CURRENT)))
119+
.build();
120+
when(clusterService.state()).thenReturn(state);
121+
122+
action.masterOperation(putWatchRequest, state, listener);
96123

97124
ArgumentCaptor<Map> captor = ArgumentCaptor.forClass(Map.class);
98125
verify(watch.status()).setHeaders(captor.capture());
99126
Map<String, String> capturedHeaders = captor.getValue();
100127
assertThat(capturedHeaders.keySet(), hasSize(1));
101128
assertThat(capturedHeaders, hasKey(headerName));
102129
}
103-
}
130+
131+
public void testWatchesAreNeverTriggeredWhenDistributed() throws Exception {
132+
PutWatchRequest putWatchRequest = new PutWatchRequest();
133+
putWatchRequest.setId("_id");
134+
135+
ClusterState clusterState = ClusterState.builder(new ClusterName("my_cluster"))
136+
.nodes(DiscoveryNodes.builder()
137+
.masterNodeId("node_1")
138+
.localNodeId(randomFrom("node_1", "node_2"))
139+
.add(newNode("node_1", Version.CURRENT))
140+
.add(newNode("node_2", Version.CURRENT)))
141+
.build();
142+
when(clusterService.state()).thenReturn(clusterState);
143+
144+
action.masterOperation(putWatchRequest, clusterState, listener);
145+
146+
verifyZeroInteractions(triggerService);
147+
}
148+
149+
public void testWatchesAreNotTriggeredOnNonMasterWhenNotDistributed() throws Exception {
150+
PutWatchRequest putWatchRequest = new PutWatchRequest();
151+
putWatchRequest.setId("_id");
152+
153+
ClusterState clusterState = ClusterState.builder(new ClusterName("my_cluster"))
154+
.nodes(DiscoveryNodes.builder()
155+
.masterNodeId("node_2")
156+
.localNodeId("node_1")
157+
.add(newNode("node_1", Version.CURRENT))
158+
.add(newNode("node_2", Version.V_5_6_10)))
159+
.build();
160+
when(clusterService.state()).thenReturn(clusterState);
161+
162+
action.masterOperation(putWatchRequest, clusterState, listener);
163+
164+
verifyZeroInteractions(triggerService);
165+
}
166+
167+
public void testWatchesAreTriggeredOnMasterWhenNotDistributed() throws Exception {
168+
PutWatchRequest putWatchRequest = new PutWatchRequest();
169+
putWatchRequest.setId("_id");
170+
171+
ClusterState clusterState = ClusterState.builder(new ClusterName("my_cluster"))
172+
.nodes(DiscoveryNodes.builder()
173+
.masterNodeId("node_1")
174+
.localNodeId("node_1")
175+
.add(newNode("node_1", Version.CURRENT))
176+
.add(newNode("node_2", Version.V_5_6_10)))
177+
.build();
178+
when(clusterService.state()).thenReturn(clusterState);
179+
180+
action.masterOperation(putWatchRequest, clusterState, listener);
181+
182+
verify(triggerService).add(eq(watch));
183+
}
184+
185+
private static DiscoveryNode newNode(String nodeId, Version version) {
186+
return new DiscoveryNode(nodeId, ESTestCase.buildNewFakeTransportAddress(), Collections.emptyMap(),
187+
new HashSet<>(asList(DiscoveryNode.Role.values())), version);
188+
}
189+
}

0 commit comments

Comments
 (0)