Skip to content

Commit d209210

Browse files
authored
Introduce deterministic task queue (#32197)
The cluster coordination layer relies on timeouts to ensure that a cluster can successfully form, and must also deal with concurrent activity in the cluster. This commit introduces some infrastructure that will help us to deterministically test components that use concurrency and/or timeouts.
1 parent 384cc54 commit d209210

File tree

4 files changed

+718
-3
lines changed

4 files changed

+718
-3
lines changed
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.cluster.coordination;
21+
22+
import org.elasticsearch.common.unit.TimeValue;
23+
24+
/**
25+
* Device which supports running a task after some delay has elapsed.
26+
*/
27+
public interface FutureExecutor {
28+
/**
29+
* Schedule the given task for execution after the given delay has elapsed.
30+
*/
31+
void schedule(Runnable task, TimeValue delay);
32+
}
33+

server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -362,9 +362,13 @@ public Runnable preserveContext(Runnable command) {
362362
return getThreadContext().preserveContext(command);
363363
}
364364

365-
public void shutdown() {
365+
protected final void stopCachedTimeThread() {
366366
cachedTimeThread.running = false;
367367
cachedTimeThread.interrupt();
368+
}
369+
370+
public void shutdown() {
371+
stopCachedTimeThread();
368372
scheduler.shutdown();
369373
for (ExecutorHolder executor : executors.values()) {
370374
if (executor.executor() instanceof ThreadPoolExecutor) {
@@ -374,8 +378,7 @@ public void shutdown() {
374378
}
375379

376380
public void shutdownNow() {
377-
cachedTimeThread.running = false;
378-
cachedTimeThread.interrupt();
381+
stopCachedTimeThread();
379382
scheduler.shutdownNow();
380383
for (ExecutorHolder executor : executors.values()) {
381384
if (executor.executor() instanceof ThreadPoolExecutor) {

0 commit comments

Comments
 (0)