Skip to content

Commit b6a2b8d

Browse files
committed
Track EWMA[1] of task execution time in search threadpool executor
This is the first step towards adaptive replica selection (#24915). This PR tracks the execution time, also known as the "service time", of a task in the threadpool. The `QueueResizingEsThreadPoolExecutor` then stores a moving average of these task times, which can be retrieved from the executor. There is currently no functionality using the EWMA (other than tests); this is only a bite-sized building block so that it's easier to review. [1]: EWMA = Exponentially Weighted Moving Average
1 parent f2a23e3 commit b6a2b8d

File tree

5 files changed

+219
-9
lines changed

5 files changed

+219
-9
lines changed
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.common;
21+
22+
23+
import java.util.concurrent.atomic.AtomicLong;
24+
25+
/**
 * Implements exponentially weighted moving averages (commonly abbreviated EWMA) for a single value.
 * This class is safe to share between threads.
 * <p>
 * The running average is kept as the raw {@code long} bits of a {@code double} inside an
 * {@link AtomicLong}, so it can be read and updated atomically without locking.
 */
public class ExponentiallyWeightedMovingAverage {

    private final double alpha;
    private final AtomicLong averageBits;

    /**
     * Create a new EWMA with a given {@code alpha} and {@code initialAvg}. A smaller alpha means
     * that new data points will have less weight, where a high alpha means older data points will
     * have a lower influence.
     *
     * @param alpha      weight given to each newly added value, between 0 and 1 inclusive
     * @param initialAvg value reported by {@link #getAverage()} until the first value is added
     * @throws IllegalArgumentException if {@code alpha} is NaN or lies outside of [0, 1]
     */
    public ExponentiallyWeightedMovingAverage(double alpha, double initialAvg) {
        // Expressed as a negated in-range test so that NaN, for which every comparison is
        // false, is rejected as well instead of silently turning the average into NaN.
        if ((alpha >= 0 && alpha <= 1) == false) {
            throw new IllegalArgumentException("alpha must be greater or equal to 0 and less than or equal to 1");
        }
        this.alpha = alpha;
        this.averageBits = new AtomicLong(Double.doubleToLongBits(initialAvg));
    }

    /**
     * Returns the current value of the moving average.
     */
    public double getAverage() {
        return Double.longBitsToDouble(this.averageBits.get());
    }

    /**
     * Folds {@code newValue} into the moving average:
     * {@code average = alpha * newValue + (1 - alpha) * average}.
     *
     * @param newValue the new data point
     */
    public void addValue(double newValue) {
        // The previous average is decoded from the very bits the compare-and-set is performed
        // against, so the new average is always derived from the value that gets replaced.
        averageBits.updateAndGet(currentBits -> {
            final double currentAvg = Double.longBitsToDouble(currentBits);
            final double newAvg = (alpha * newValue) + ((1 - alpha) * currentAvg);
            return Double.doubleToLongBits(newAvg);
        });
    }
}

core/src/main/java/org/elasticsearch/common/util/concurrent/QueueResizingEsThreadPoolExecutor.java

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import org.apache.logging.log4j.Logger;
2323
import org.apache.logging.log4j.message.ParameterizedMessage;
24+
import org.elasticsearch.common.ExponentiallyWeightedMovingAverage;
2425
import org.elasticsearch.common.collect.Tuple;
2526
import org.elasticsearch.common.logging.ESLoggerFactory;
2627
import org.elasticsearch.common.unit.TimeValue;
@@ -43,17 +44,21 @@
4344
*/
4445
public final class QueueResizingEsThreadPoolExecutor extends EsThreadPoolExecutor {
4546

47+
// This is a random starting point alpha. TODO: revisit this with actual testing and/or make it configurable
48+
public static double EWMA_ALPHA = 0.3;
49+
4650
private static final Logger logger =
4751
ESLoggerFactory.getLogger(QueueResizingEsThreadPoolExecutor.class);
52+
// The amount the queue size is adjusted by for each calculation
53+
private static final int QUEUE_ADJUSTMENT_AMOUNT = 50;
4854

4955
private final Function<Runnable, Runnable> runnableWrapper;
5056
private final ResizableBlockingQueue<Runnable> workQueue;
5157
private final int tasksPerFrame;
5258
private final int minQueueSize;
5359
private final int maxQueueSize;
5460
private final long targetedResponseTimeNanos;
55-
// The amount the queue size is adjusted by for each calcuation
56-
private static final int QUEUE_ADJUSTMENT_AMOUNT = 50;
61+
private final ExponentiallyWeightedMovingAverage executionEWMA;
5762

5863
private final AtomicLong totalTaskNanos = new AtomicLong(0);
5964
private final AtomicInteger taskCount = new AtomicInteger(0);
@@ -74,6 +79,9 @@ public final class QueueResizingEsThreadPoolExecutor extends EsThreadPoolExecuto
7479
this.minQueueSize = minQueueSize;
7580
this.maxQueueSize = maxQueueSize;
7681
this.targetedResponseTimeNanos = targetedResponseTime.getNanos();
82+
// We choose to start the EWMA with the targeted response time, reasoning that it is a
83+
// better start point for a realistic task execution time than starting at 0
84+
this.executionEWMA = new ExponentiallyWeightedMovingAverage(EWMA_ALPHA, targetedResponseTimeNanos);
7785
logger.debug("thread pool [{}] will adjust queue by [{}] when determining automatic queue size",
7886
name, QUEUE_ADJUSTMENT_AMOUNT);
7987
}
@@ -126,6 +134,13 @@ public int getCurrentCapacity() {
126134
return workQueue.capacity();
127135
}
128136

137+
/**
138+
* Returns the exponentially weighted moving average of the task execution time
139+
*/
140+
public double getTaskExecutionEWMA() {
141+
return executionEWMA.getAverage();
142+
}
143+
129144
@Override
130145
protected void afterExecute(Runnable r, Throwable t) {
131146
super.afterExecute(r, t);
@@ -136,6 +151,11 @@ protected void afterExecute(Runnable r, Throwable t) {
136151
assert r instanceof TimedRunnable : "expected only TimedRunnables in queue";
137152
final long taskNanos = ((TimedRunnable) r).getTotalNanos();
138153
final long totalNanos = totalTaskNanos.addAndGet(taskNanos);
154+
155+
final long taskExecutionNanos = ((TimedRunnable) r).getTotalExecutionNanos();
156+
assert taskExecutionNanos >= 0 : "expected task to always take longer than 0 nanoseconds, got: " + taskExecutionNanos;
157+
executionEWMA.addValue(taskExecutionNanos);
158+
139159
if (taskCount.incrementAndGet() == this.tasksPerFrame) {
140160
final long endTimeNs = System.nanoTime();
141161
final long totalRuntime = endTimeNs - this.startNs;
@@ -149,20 +169,22 @@ protected void afterExecute(Runnable r, Throwable t) {
149169
try {
150170
final double lambda = calculateLambda(tasksPerFrame, totalNanos);
151171
final int desiredQueueSize = calculateL(lambda, targetedResponseTimeNanos);
172+
final int oldCapacity = workQueue.capacity();
173+
152174
if (logger.isDebugEnabled()) {
153175
final long avgTaskTime = totalNanos / tasksPerFrame;
154-
logger.debug("[{}]: there were [{}] tasks in [{}], avg task time: [{}], [{} tasks/s], " +
155-
"optimal queue is [{}]",
176+
logger.debug("[{}]: there were [{}] tasks in [{}], avg task time [{}], EWMA task execution [{}], " +
177+
"[{} tasks/s], optimal queue is [{}], current capacity [{}]",
156178
name,
157179
tasksPerFrame,
158180
TimeValue.timeValueNanos(totalRuntime),
159181
TimeValue.timeValueNanos(avgTaskTime),
182+
TimeValue.timeValueNanos((long)executionEWMA.getAverage()),
160183
String.format(Locale.ROOT, "%.2f", lambda * TimeValue.timeValueSeconds(1).nanos()),
161-
desiredQueueSize);
184+
desiredQueueSize,
185+
oldCapacity);
162186
}
163187

164-
final int oldCapacity = workQueue.capacity();
165-
166188
// Adjust the queue size towards the desired capacity using an adjust of
167189
// QUEUE_ADJUSTMENT_AMOUNT (either up or down), keeping in mind the min and max
168190
// values the queue size can have.
@@ -223,6 +245,7 @@ public String toString() {
223245
b.append("max queue capacity = ").append(maxQueueSize).append(", ");
224246
b.append("frame size = ").append(tasksPerFrame).append(", ");
225247
b.append("targeted response rate = ").append(TimeValue.timeValueNanos(targetedResponseTimeNanos)).append(", ");
248+
b.append("task execution EWMA = ").append(TimeValue.timeValueNanos((long)executionEWMA.getAverage())).append(", ");
226249
b.append("adjustment amount = ").append(QUEUE_ADJUSTMENT_AMOUNT).append(", ");
227250
/*
228251
* ThreadPoolExecutor has some nice information in its toString but we

core/src/main/java/org/elasticsearch/common/util/concurrent/TimedRunnable.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,13 @@
2020
package org.elasticsearch.common.util.concurrent;
2121

2222
/**
23-
* A class used to wrap a {@code Runnable} that allows capturing the time the task since creation
24-
* through execution.
23+
* A class used to wrap a {@code Runnable} that allows capturing the time of the task since creation
24+
* through execution as well as only execution time.
2525
*/
2626
class TimedRunnable implements Runnable {
2727
private final Runnable original;
2828
private final long creationTimeNanos;
29+
private long startTimeNanos;
2930
private long finishTimeNanos = -1;
3031

3132
TimedRunnable(Runnable original) {
@@ -36,6 +37,7 @@ class TimedRunnable implements Runnable {
3637
@Override
3738
public void run() {
3839
try {
40+
startTimeNanos = System.nanoTime();
3941
original.run();
4042
} finally {
4143
finishTimeNanos = System.nanoTime();
@@ -53,4 +55,16 @@ long getTotalNanos() {
5355
}
5456
return finishTimeNanos - creationTimeNanos;
5557
}
58+
59+
/**
60+
* Return the time this task spent being run.
61+
* If the task is still running or has not yet been run, returns -1.
62+
*/
63+
long getTotalExecutionNanos() {
64+
if (startTimeNanos == -1 || finishTimeNanos == -1) {
65+
// There must have been an exception thrown, the total time is unknown (-1)
66+
return -1;
67+
}
68+
return finishTimeNanos - startTimeNanos;
69+
}
5670
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.common;
21+
22+
import org.elasticsearch.test.ESTestCase;
23+
24+
import static org.hamcrest.Matchers.equalTo;
25+
import static org.hamcrest.Matchers.lessThan;
26+
import static org.junit.Assert.assertThat;
27+
28+
/**
29+
* Implements exponentially weighted moving averages (commonly abbreviated EWMA) for a single value.
30+
*/
31+
public class ExponentiallyWeightedMovingAverageTests extends ESTestCase {
32+
33+
public void testEWMA() {
34+
final ExponentiallyWeightedMovingAverage ewma = new ExponentiallyWeightedMovingAverage(0.5, 10);
35+
ewma.addValue(12);
36+
assertThat(ewma.getAverage(), equalTo(11.0));
37+
ewma.addValue(10);
38+
ewma.addValue(15);
39+
ewma.addValue(13);
40+
assertThat(ewma.getAverage(), equalTo(12.875));
41+
}
42+
43+
public void testInvalidAlpha() {
44+
try {
45+
ExponentiallyWeightedMovingAverage ewma = new ExponentiallyWeightedMovingAverage(-0.5, 10);
46+
fail("should have failed to create EWMA");
47+
} catch (IllegalArgumentException e) {
48+
assertThat(e.getMessage(), equalTo("alpha must be greater or equal to 0 and less than or equal to 1"));
49+
}
50+
51+
try {
52+
ExponentiallyWeightedMovingAverage ewma = new ExponentiallyWeightedMovingAverage(1.5, 10);
53+
fail("should have failed to create EWMA");
54+
} catch (IllegalArgumentException e) {
55+
assertThat(e.getMessage(), equalTo("alpha must be greater or equal to 0 and less than or equal to 1"));
56+
}
57+
}
58+
59+
public void testConvergingToValue() {
60+
final ExponentiallyWeightedMovingAverage ewma = new ExponentiallyWeightedMovingAverage(0.5, 10000);
61+
for (int i = 0; i < 100000; i++) {
62+
ewma.addValue(1);
63+
}
64+
assertThat(ewma.getAverage(), lessThan(2.0));
65+
}
66+
}

core/src/test/java/org/elasticsearch/common/util/concurrent/QueueResizingEsThreadPoolExecutorTests.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,47 @@ public void testAutoQueueSizingWithMax() throws Exception {
184184
context.close();
185185
}
186186

187+
public void testExecutionEWMACalculation() throws Exception {
188+
ThreadContext context = new ThreadContext(Settings.EMPTY);
189+
ResizableBlockingQueue<Runnable> queue =
190+
new ResizableBlockingQueue<>(ConcurrentCollections.<Runnable>newBlockingQueue(),
191+
100);
192+
193+
QueueResizingEsThreadPoolExecutor executor =
194+
new QueueResizingEsThreadPoolExecutor(
195+
"test-threadpool", 1, 1, 1000,
196+
TimeUnit.MILLISECONDS, queue, 10, 200, fastWrapper(), 10, TimeValue.timeValueMillis(1),
197+
EsExecutors.daemonThreadFactory("queuetest"), new EsAbortPolicy(), context);
198+
executor.prestartAllCoreThreads();
199+
logger.info("--> executor: {}", executor);
200+
201+
assertThat((long)executor.getTaskExecutionEWMA(), equalTo(1000000L));
202+
executeTask(executor, 1);
203+
assertBusy(() -> {
204+
assertThat((long)executor.getTaskExecutionEWMA(), equalTo(700030L));
205+
});
206+
executeTask(executor, 1);
207+
assertBusy(() -> {
208+
assertThat((long)executor.getTaskExecutionEWMA(), equalTo(490050L));
209+
});
210+
executeTask(executor, 1);
211+
assertBusy(() -> {
212+
assertThat((long)executor.getTaskExecutionEWMA(), equalTo(343065L));
213+
});
214+
executeTask(executor, 1);
215+
assertBusy(() -> {
216+
assertThat((long)executor.getTaskExecutionEWMA(), equalTo(240175L));
217+
});
218+
executeTask(executor, 1);
219+
assertBusy(() -> {
220+
assertThat((long)executor.getTaskExecutionEWMA(), equalTo(168153L));
221+
});
222+
223+
executor.shutdown();
224+
executor.awaitTermination(10, TimeUnit.SECONDS);
225+
context.close();
226+
}
227+
187228
private Function<Runnable, Runnable> randomBetweenLimitsWrapper(final int minNs, final int maxNs) {
188229
return (runnable) -> {
189230
return new SettableTimedRunnable(randomIntBetween(minNs, maxNs));
@@ -222,5 +263,10 @@ public SettableTimedRunnable(long timeTaken) {
222263
public long getTotalNanos() {
223264
return timeTaken;
224265
}
266+
267+
@Override
268+
public long getTotalExecutionNanos() {
269+
return timeTaken; // same timeTaken value that getTotalNanos() reports, not a measured execution time
270+
}
225271
}
226272
}

0 commit comments

Comments
 (0)