Skip to content

Commit 71ebed6

Browse files
authored
Watcher: Mark watcher as started only after loading watches (#30403)
Starting watcher should wait for the watcher to be started before marking the status as started, which is now done via a callback. Also, reloading watcher could set the execution service to paused. This could lead to watches not being executed, when run in tests. This fix does not change the paused flag in the execution service, just clears out the current queue and executions. Closes #30381
1 parent 3962340 commit 71ebed6

File tree

6 files changed

+77
-20
lines changed

6 files changed

+77
-20
lines changed

x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -110,8 +110,7 @@ public void clusterChanged(ClusterChangedEvent event) {
110110
// if this is not a data node, we need to start it ourselves possibly
111111
if (event.state().nodes().getLocalNode().isDataNode() == false &&
112112
isWatcherStoppedManually == false && this.state.get() == WatcherState.STOPPED) {
113-
watcherService.start(event.state());
114-
this.state.set(WatcherState.STARTED);
113+
watcherService.start(event.state(), () -> this.state.set(WatcherState.STARTED));
115114
return;
116115
}
117116

@@ -157,8 +156,8 @@ public void clusterChanged(ClusterChangedEvent event) {
157156
if (state.get() == WatcherState.STARTED) {
158157
watcherService.reload(event.state(), "new local watcher shard allocation ids");
159158
} else if (state.get() == WatcherState.STOPPED) {
160-
watcherService.start(event.state());
161-
this.state.set(WatcherState.STARTED);
159+
this.state.set(WatcherState.STARTING);
160+
watcherService.start(event.state(), () -> this.state.set(WatcherState.STARTED));
162161
}
163162
} else {
164163
clearAllocationIds();

x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -183,23 +183,40 @@ void reload(ClusterState state, String reason) {
183183
// by checking the cluster state version before and after loading the watches we can potentially just exit without applying the
184184
// changes
185185
processedClusterStateVersion.set(state.getVersion());
186-
pauseExecution(reason);
187186
triggerService.pauseExecution();
187+
int cancelledTaskCount = executionService.clearExecutionsAndQueue();
188+
logger.info("reloading watcher, reason [{}], cancelled [{}] queued tasks", reason, cancelledTaskCount);
188189

189190
executor.execute(wrapWatcherService(() -> reloadInner(state, reason, false),
190191
e -> logger.error("error reloading watcher", e)));
191192
}
192193

193-
public void start(ClusterState state) {
194+
/**
195+
* start the watcher service, load watches in the background
196+
*
197+
* @param state the current cluster state
198+
* @param postWatchesLoadedCallback the callback to be triggered when watches were loaded successfully
199+
*/
200+
public void start(ClusterState state, Runnable postWatchesLoadedCallback) {
201+
executionService.unPause();
194202
processedClusterStateVersion.set(state.getVersion());
195-
executor.execute(wrapWatcherService(() -> reloadInner(state, "starting", true),
203+
executor.execute(wrapWatcherService(() -> {
204+
if (reloadInner(state, "starting", true)) {
205+
postWatchesLoadedCallback.run();
206+
}
207+
},
196208
e -> logger.error("error starting watcher", e)));
197209
}
198210

199211
/**
200-
* reload the watches and start scheduling them
212+
* reload watches and start scheduling them
213+
*
214+
* @param state the current cluster state
215+
* @param reason the reason for reloading, will be logged
216+
* @param loadTriggeredWatches should triggered watches be loaded in this run, not needed for reloading, only for starting
217+
* @return true if no other loading of a newer cluster state happened in parallel, false otherwise
201218
*/
202-
private synchronized void reloadInner(ClusterState state, String reason, boolean loadTriggeredWatches) {
219+
private synchronized boolean reloadInner(ClusterState state, String reason, boolean loadTriggeredWatches) {
203220
// exit early if another thread has come in between
204221
if (processedClusterStateVersion.get() != state.getVersion()) {
205222
logger.debug("watch service has not been reloaded for state [{}], another reload for state [{}] in progress",
@@ -221,9 +238,11 @@ private synchronized void reloadInner(ClusterState state, String reason, boolean
221238
executionService.executeTriggeredWatches(triggeredWatches);
222239
}
223240
logger.debug("watch service has been reloaded, reason [{}]", reason);
241+
return true;
224242
} else {
225243
logger.debug("watch service has not been reloaded for state [{}], another reload for state [{}] in progress",
226244
state.getVersion(), processedClusterStateVersion.get());
245+
return false;
227246
}
228247
}
229248

x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,11 +121,25 @@ public void unPause() {
121121
}
122122

123123
/**
124-
* Pause the execution of the watcher executor
124+
* Pause the execution of the watcher executor, and empty the state.
125+
* Pausing means that no new watch executions will be done unless this pausing is explicitly unset.
126+
* This is important when watcher is stopped, so that scheduled watches do not accidentally get executed.
127+
* This should not be used when we need to reload watcher based on some cluster state changes, then just calling
128+
* {@link #clearExecutionsAndQueue()} is the way to go
129+
*
125130
* @return the number of tasks that have been removed
126131
*/
127132
public int pause() {
128133
paused.set(true);
134+
return clearExecutionsAndQueue();
135+
}
136+
137+
/**
138+
* Empty the currently queued tasks and wait for current executions to finish.
139+
*
140+
* @return the number of tasks that have been removed
141+
*/
142+
public int clearExecutionsAndQueue() {
129143
int cancelledTaskCount = executor.queue().drainTo(new ArrayList<>());
130144
this.clearExecutions();
131145
return cancelledTaskCount;

x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ public void testManualStartStop() {
180180
reset(watcherService);
181181
when(watcherService.validate(clusterState)).thenReturn(true);
182182
lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterState, stoppedClusterState));
183-
verify(watcherService, times(1)).start(eq(clusterState));
183+
verify(watcherService, times(1)).start(eq(clusterState), anyObject());
184184

185185
// no change, keep going
186186
reset(watcherService);
@@ -423,7 +423,7 @@ public void testWatcherServiceDoesNotStartIfIndexTemplatesAreMissing() throws Ex
423423
when(watcherService.validate(eq(state))).thenReturn(true);
424424

425425
lifeCycleService.clusterChanged(new ClusterChangedEvent("any", state, state));
426-
verify(watcherService, times(0)).start(any(ClusterState.class));
426+
verify(watcherService, times(0)).start(any(ClusterState.class), anyObject());
427427
}
428428

429429
public void testWatcherStopsWhenMasterNodeIsMissing() {

x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherServiceTests.java

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@
6868
import static org.mockito.Matchers.any;
6969
import static org.mockito.Matchers.eq;
7070
import static org.mockito.Mockito.mock;
71+
import static org.mockito.Mockito.never;
7172
import static org.mockito.Mockito.verify;
7273
import static org.mockito.Mockito.when;
7374

@@ -199,7 +200,7 @@ void stopExecutor() {
199200
when(client.clearScroll(any(ClearScrollRequest.class))).thenReturn(clearScrollFuture);
200201
clearScrollFuture.onResponse(new ClearScrollResponse(true, 1));
201202

202-
service.start(clusterState);
203+
service.start(clusterState, () -> {});
203204

204205
ArgumentCaptor<List> captor = ArgumentCaptor.forClass(List.class);
205206
verify(triggerService).start(captor.capture());
@@ -238,6 +239,27 @@ void stopExecutor() {
238239
verify(triggerEngine).pauseExecution();
239240
}
240241

242+
// if we have to reload the watcher service, the execution service should not be paused, as this might
243+
// result in missing executions
244+
public void testReloadingWatcherDoesNotPauseExecutionService() {
245+
ExecutionService executionService = mock(ExecutionService.class);
246+
TriggerService triggerService = mock(TriggerService.class);
247+
WatcherService service = new WatcherService(Settings.EMPTY, triggerService, mock(TriggeredWatchStore.class),
248+
executionService, mock(WatchParser.class), mock(Client.class), executorService) {
249+
@Override
250+
void stopExecutor() {
251+
}
252+
};
253+
254+
ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name"));
255+
csBuilder.metaData(MetaData.builder());
256+
257+
service.reload(csBuilder.build(), "whatever");
258+
verify(executionService).clearExecutionsAndQueue();
259+
verify(executionService, never()).pause();
260+
verify(triggerService).pauseExecution();
261+
}
262+
241263
private static DiscoveryNode newNode() {
242264
return new DiscoveryNode("node", ESTestCase.buildNewFakeTransportAddress(), Collections.emptyMap(),
243265
new HashSet<>(asList(DiscoveryNode.Role.values())), Version.CURRENT);

x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/ExecutionVarsIntegrationTests.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.Map;
2424
import java.util.function.Function;
2525

26+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
2627
import static org.elasticsearch.xpack.watcher.actions.ActionBuilders.loggingAction;
2728
import static org.elasticsearch.xpack.watcher.client.WatchSourceBuilders.watchBuilder;
2829
import static org.elasticsearch.xpack.watcher.input.InputBuilders.simpleInput;
@@ -36,6 +37,8 @@
3637

3738
public class ExecutionVarsIntegrationTests extends AbstractWatcherIntegrationTestCase {
3839

40+
private String watchId = randomAlphaOfLength(20);
41+
3942
@Override
4043
protected List<Class<? extends Plugin>> pluginTypes() {
4144
List<Class<? extends Plugin>> types = super.pluginTypes();
@@ -107,7 +110,7 @@ protected Map<String, Function<Map<String, Object>, Object>> pluginScripts() {
107110
public void testVars() throws Exception {
108111
WatcherClient watcherClient = watcherClient();
109112

110-
PutWatchResponse putWatchResponse = watcherClient.preparePutWatch("_id").setSource(watchBuilder()
113+
PutWatchResponse putWatchResponse = watcherClient.preparePutWatch(watchId).setSource(watchBuilder()
111114
.trigger(schedule(cron("0/1 * * * * ?")))
112115
.input(simpleInput("value", 5))
113116
.condition(new ScriptCondition(
@@ -126,7 +129,7 @@ public void testVars() throws Exception {
126129

127130
assertThat(putWatchResponse.isCreated(), is(true));
128131

129-
timeWarp().trigger("_id");
132+
timeWarp().trigger(watchId);
130133

131134
flush();
132135
refresh();
@@ -135,11 +138,11 @@ public void testVars() throws Exception {
135138
// defaults to match all;
136139
});
137140

138-
assertThat(searchResponse.getHits().getTotalHits(), is(1L));
141+
assertHitCount(searchResponse, 1L);
139142

140143
Map<String, Object> source = searchResponse.getHits().getAt(0).getSourceAsMap();
141144

142-
assertValue(source, "watch_id", is("_id"));
145+
assertValue(source, "watch_id", is(watchId));
143146
assertValue(source, "state", is("executed"));
144147

145148
// we don't store the computed vars in history
@@ -171,7 +174,7 @@ public void testVars() throws Exception {
171174
public void testVarsManual() throws Exception {
172175
WatcherClient watcherClient = watcherClient();
173176

174-
PutWatchResponse putWatchResponse = watcherClient.preparePutWatch("_id").setSource(watchBuilder()
177+
PutWatchResponse putWatchResponse = watcherClient.preparePutWatch(watchId).setSource(watchBuilder()
175178
.trigger(schedule(cron("0/1 * * * * ? 2020")))
176179
.input(simpleInput("value", 5))
177180
.condition(new ScriptCondition(
@@ -193,13 +196,13 @@ public void testVarsManual() throws Exception {
193196
boolean debug = randomBoolean();
194197

195198
ExecuteWatchResponse executeWatchResponse = watcherClient
196-
.prepareExecuteWatch("_id")
199+
.prepareExecuteWatch(watchId)
197200
.setDebug(debug)
198201
.get();
199202
assertThat(executeWatchResponse.getRecordId(), notNullValue());
200203
XContentSource source = executeWatchResponse.getRecordSource();
201204

202-
assertValue(source, "watch_id", is("_id"));
205+
assertValue(source, "watch_id", is(watchId));
203206
assertValue(source, "state", is("executed"));
204207

205208
if (debug) {

0 commit comments

Comments
 (0)