Skip to content

Commit aa4fd2b

Browse files
committed
add additional tests for searchBackpressureService and refactor code
Signed-off-by: Kaushal Kumar <[email protected]>
1 parent f4e1d6e commit aa4fd2b

18 files changed

+459
-212
lines changed

server/src/main/java/org/opensearch/search/backpressure/SearchBackpressureService.java

Lines changed: 80 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,11 @@
4242
import org.opensearch.threadpool.ThreadPool;
4343

4444
import java.io.IOException;
45-
import java.util.*;
45+
import java.util.ArrayList;
46+
import java.util.HashMap;
47+
import java.util.List;
48+
import java.util.Map;
49+
import java.util.Optional;
4650
import java.util.function.DoubleSupplier;
4751
import java.util.function.LongSupplier;
4852
import java.util.stream.Collectors;
@@ -166,44 +170,24 @@ void doRun() {
166170

167171
List<CancellableTask> searchTasks = getTaskByType(SearchTask.class);
168172
List<CancellableTask> searchShardTasks = getTaskByType(SearchShardTask.class);
169-
List<CancellableTask> cancellableTasks = new ArrayList<>();
170173

171174
// Force-refresh usage stats of these tasks before making a cancellation decision.
172175
taskResourceTrackingService.refreshResourceStats(searchTasks.toArray(new Task[0]));
173176
taskResourceTrackingService.refreshResourceStats(searchShardTasks.toArray(new Task[0]));
174177

175178
List<TaskCancellation> taskCancellations = new ArrayList<>();
176179

177-
addHeapBasedTaskCancellations(taskCancellations, searchTasks, searchShardTasks);
180+
taskCancellations = addHeapBasedTaskCancellations(taskCancellations, searchTasks, searchShardTasks);
178181

179-
addCPUBasedTaskCancellations(taskCancellations, searchTasks, searchShardTasks);
182+
taskCancellations = addCPUBasedTaskCancellations(taskCancellations, searchTasks, searchShardTasks);
180183

181-
addElapsedTimeBasedTaskCancellations(taskCancellations, searchTasks, searchShardTasks);
184+
taskCancellations = addElapsedTimeBasedTaskCancellations(taskCancellations, searchTasks, searchShardTasks);
182185

183186
// Since these cancellations might be duplicate due to multiple trackers causing cancellation for same task
184187
// We need to merge them
185-
taskCancellations = mergeTaskCancellations(taskCancellations);
186-
187-
// // Check if increase in heap usage is due to SearchTasks
188-
// if (HeapUsageTracker.isHeapUsageDominatedBySearch(
189-
// searchTasks,
190-
// getSettings().getSearchTaskSettings().getTotalHeapPercentThreshold()
191-
// )) {
192-
// cancellableTasks.addAll(searchTasks);
193-
// }
194-
//
195-
// // Check if increase in heap usage is due to SearchShardTasks
196-
// if (HeapUsageTracker.isHeapUsageDominatedBySearch(
197-
// searchShardTasks,
198-
// getSettings().getSearchShardTaskSettings().getTotalHeapPercentThreshold()
199-
// )) {
200-
// cancellableTasks.addAll(searchShardTasks);
201-
// }
202-
203-
// none of the task type is breaching the heap usage thresholds and hence we do not cancel any tasks
204-
// if (taskCancellations.isEmpty()) {
205-
// return;
206-
// }
188+
taskCancellations = mergeTaskCancellations(taskCancellations).stream()
189+
.filter(TaskCancellation::isEligibleForCancellation)
190+
.collect(Collectors.toList());
207191

208192
for (TaskCancellation taskCancellation : taskCancellations) {
209193
logger.warn(
@@ -235,51 +219,86 @@ void doRun() {
235219
}
236220
}
237221

238-
private void addElapsedTimeBasedTaskCancellations(List<TaskCancellation> taskCancellations, List<CancellableTask> searchTasks, List<CancellableTask> searchShardTasks) {
239-
final TaskResourceUsageTrackers.TaskResourceUsageTracker searchTaskElapsedTimeTracker = getTaskResourceUsageTrackersByType(SearchTask.class).getElapsedTimeTracker();
240-
final TaskResourceUsageTrackers.TaskResourceUsageTracker searchShardTaskElapsedTimeTracker = getTaskResourceUsageTrackersByType(SearchShardTask.class).getElapsedTimeTracker();
241-
242-
taskCancellations.addAll(
243-
searchTaskElapsedTimeTracker.getTaskCancellations(searchTasks, searchBackpressureStates.get(SearchTask.class)::incrementCancellationCount)
222+
private List<TaskCancellation> addElapsedTimeBasedTaskCancellations(
223+
List<TaskCancellation> taskCancellations,
224+
List<CancellableTask> searchTasks,
225+
List<CancellableTask> searchShardTasks
226+
) {
227+
final Optional<TaskResourceUsageTrackers.TaskResourceUsageTracker> searchTaskElapsedTimeTracker =
228+
getTaskResourceUsageTrackersByType(SearchTask.class).getElapsedTimeTracker();
229+
final Optional<TaskResourceUsageTrackers.TaskResourceUsageTracker> searchShardTaskElapsedTimeTracker =
230+
getTaskResourceUsageTrackersByType(SearchShardTask.class).getElapsedTimeTracker();
231+
232+
addTaskCancellationsFromTaskResourceUsageTracker(taskCancellations, searchTasks, searchTaskElapsedTimeTracker, SearchTask.class);
233+
234+
addTaskCancellationsFromTaskResourceUsageTracker(
235+
taskCancellations,
236+
searchShardTasks,
237+
searchShardTaskElapsedTimeTracker,
238+
SearchShardTask.class
244239
);
245240

246-
taskCancellations.addAll(
247-
searchShardTaskElapsedTimeTracker.getTaskCancellations(searchShardTasks, searchBackpressureStates.get(SearchShardTask.class)::incrementCancellationCount)
248-
);
241+
return taskCancellations;
249242
}
250243

251-
private void addCPUBasedTaskCancellations(List<TaskCancellation> taskCancellations, List<CancellableTask> searchTasks, List<CancellableTask> searchShardTasks) {
244+
private List<TaskCancellation> addCPUBasedTaskCancellations(
245+
List<TaskCancellation> taskCancellations,
246+
List<CancellableTask> searchTasks,
247+
List<CancellableTask> searchShardTasks
248+
) {
252249
if (nodeDuressTrackers.isCPUInDuress()) {
253-
final TaskResourceUsageTrackers.TaskResourceUsageTracker searchTaskCPUUsageTracker = getTaskResourceUsageTrackersByType(SearchTask.class).getCpuUsageTracker();
254-
final TaskResourceUsageTrackers.TaskResourceUsageTracker searchShardTaskCPUUsageTracker = getTaskResourceUsageTrackersByType(SearchShardTask.class).getCpuUsageTracker();
255-
256-
taskCancellations.addAll(
257-
searchTaskCPUUsageTracker
258-
.getTaskCancellations(searchTasks, searchBackpressureStates.get(SearchTask.class)::incrementCancellationCount)
259-
);
260-
261-
taskCancellations.addAll(
262-
searchShardTaskCPUUsageTracker
263-
.getTaskCancellations(searchShardTasks, searchBackpressureStates.get(SearchTask.class)::incrementCancellationCount)
250+
final Optional<TaskResourceUsageTrackers.TaskResourceUsageTracker> searchTaskCPUUsageTracker =
251+
getTaskResourceUsageTrackersByType(SearchTask.class).getCpuUsageTracker();
252+
final Optional<TaskResourceUsageTrackers.TaskResourceUsageTracker> searchShardTaskCPUUsageTracker =
253+
getTaskResourceUsageTrackersByType(SearchShardTask.class).getCpuUsageTracker();
254+
255+
addTaskCancellationsFromTaskResourceUsageTracker(taskCancellations, searchTasks, searchTaskCPUUsageTracker, SearchTask.class);
256+
257+
addTaskCancellationsFromTaskResourceUsageTracker(
258+
taskCancellations,
259+
searchShardTasks,
260+
searchShardTaskCPUUsageTracker,
261+
SearchShardTask.class
264262
);
265263
}
264+
return taskCancellations;
266265
}
267266

268-
private void addHeapBasedTaskCancellations(List<TaskCancellation> taskCancellations, List<CancellableTask> searchTasks, List<CancellableTask> searchShardTasks) {
267+
private List<TaskCancellation> addHeapBasedTaskCancellations(
268+
List<TaskCancellation> taskCancellations,
269+
List<CancellableTask> searchTasks,
270+
List<CancellableTask> searchShardTasks
271+
) {
269272
if (isHeapTrackingSupported() && nodeDuressTrackers.isHeapInDuress()) {
270-
final TaskResourceUsageTrackers.TaskResourceUsageTracker searchTaskHeapUsageTracker = getTaskResourceUsageTrackersByType(SearchTask.class).getHeapUsageTracker();
271-
final TaskResourceUsageTrackers.TaskResourceUsageTracker searchShardTaskHeapUsageTracker = getTaskResourceUsageTrackersByType(SearchShardTask.class).getHeapUsageTracker();
272-
273-
taskCancellations = searchTaskHeapUsageTracker
274-
.getTaskCancellations(searchTasks, searchBackpressureStates.get(SearchTask.class)::incrementCancellationCount);
275-
276-
taskCancellations.addAll(
277-
searchShardTaskHeapUsageTracker
278-
.getTaskCancellations(searchShardTasks, searchBackpressureStates.get(SearchShardTask.class)::incrementCompletionCount)
273+
final Optional<TaskResourceUsageTrackers.TaskResourceUsageTracker> searchTaskHeapUsageTracker =
274+
getTaskResourceUsageTrackersByType(SearchTask.class).getHeapUsageTracker();
275+
final Optional<TaskResourceUsageTrackers.TaskResourceUsageTracker> searchShardTaskHeapUsageTracker =
276+
getTaskResourceUsageTrackersByType(SearchShardTask.class).getHeapUsageTracker();
277+
278+
addTaskCancellationsFromTaskResourceUsageTracker(taskCancellations, searchTasks, searchTaskHeapUsageTracker, SearchTask.class);
279+
280+
addTaskCancellationsFromTaskResourceUsageTracker(
281+
taskCancellations,
282+
searchShardTasks,
283+
searchShardTaskHeapUsageTracker,
284+
SearchShardTask.class
279285
);
280286
}
287+
return taskCancellations;
281288
}
282289

290+
private void addTaskCancellationsFromTaskResourceUsageTracker(
291+
List<TaskCancellation> taskCancellations,
292+
List<CancellableTask> tasks,
293+
Optional<TaskResourceUsageTrackers.TaskResourceUsageTracker> taskResourceUsageTracker,
294+
Class<?> type
295+
) {
296+
taskResourceUsageTracker.ifPresent(
297+
tracker -> taskCancellations.addAll(
298+
tracker.getTaskCancellations(tasks, searchBackpressureStates.get(type)::incrementCancellationCount)
299+
)
300+
);
301+
}
283302

284303
/**
285304
* returns the taskTrackers for given type
@@ -290,7 +309,6 @@ private TaskResourceUsageTrackers getTaskResourceUsageTrackersByType(Class<? ext
290309
return taskTrackers.get(type);
291310
}
292311

293-
294312
/**
295313
* Method to reduce the taskCancellations into unique bunch
296314
* @param taskCancellations
@@ -299,14 +317,14 @@ private TaskResourceUsageTrackers getTaskResourceUsageTrackersByType(Class<? ext
299317
private List<TaskCancellation> mergeTaskCancellations(final List<TaskCancellation> taskCancellations) {
300318
final Map<Long, TaskCancellation> uniqueTaskCancellations = new HashMap<>();
301319

302-
for (TaskCancellation taskCancellation: taskCancellations) {
320+
for (TaskCancellation taskCancellation : taskCancellations) {
303321
final long taskId = taskCancellation.getTask().getId();
304-
uniqueTaskCancellations.put(taskId,
305-
uniqueTaskCancellations.getOrDefault(taskId, taskCancellation).merge(taskCancellation));
322+
uniqueTaskCancellations.put(taskId, uniqueTaskCancellations.getOrDefault(taskId, taskCancellation).merge(taskCancellation));
306323
}
307324

308325
return new ArrayList<>(uniqueTaskCancellations.values());
309326
}
327+
310328
/**
311329
* Given a task, returns the type of the task
312330
*/

server/src/main/java/org/opensearch/search/backpressure/stats/SearchShardTaskStats.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@
1818
import org.opensearch.search.backpressure.trackers.CpuUsageTracker;
1919
import org.opensearch.search.backpressure.trackers.ElapsedTimeTracker;
2020
import org.opensearch.search.backpressure.trackers.HeapUsageTracker;
21-
import org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackers;
2221
import org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackerType;
22+
import org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackers;
2323

2424
import java.io.IOException;
2525
import java.util.Map;
@@ -67,7 +67,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
6767
builder.startObject();
6868

6969
builder.startObject("resource_tracker_stats");
70-
for (Map.Entry<TaskResourceUsageTrackerType, TaskResourceUsageTrackers.TaskResourceUsageTracker.Stats> entry : resourceUsageTrackerStats.entrySet()) {
70+
for (Map.Entry<
71+
TaskResourceUsageTrackerType,
72+
TaskResourceUsageTrackers.TaskResourceUsageTracker.Stats> entry : resourceUsageTrackerStats.entrySet()) {
7173
builder.field(entry.getKey().getName(), entry.getValue());
7274
}
7375
builder.endObject();

server/src/main/java/org/opensearch/search/backpressure/stats/SearchTaskStats.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
6868
builder.startObject();
6969

7070
builder.startObject("resource_tracker_stats");
71-
for (Map.Entry<TaskResourceUsageTrackerType, TaskResourceUsageTrackers.TaskResourceUsageTracker.Stats> entry : resourceUsageTrackerStats.entrySet()) {
71+
for (Map.Entry<
72+
TaskResourceUsageTrackerType,
73+
TaskResourceUsageTrackers.TaskResourceUsageTracker.Stats> entry : resourceUsageTrackerStats.entrySet()) {
7274
builder.field(entry.getKey().getName(), entry.getValue());
7375
}
7476
builder.endObject();

server/src/main/java/org/opensearch/search/backpressure/trackers/CpuUsageTracker.java

Lines changed: 23 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -34,35 +34,37 @@ public class CpuUsageTracker extends TaskResourceUsageTrackers.TaskResourceUsage
3434
private final LongSupplier thresholdSupplier;
3535

3636
public CpuUsageTracker(LongSupplier thresholdSupplier) {
37+
this(thresholdSupplier, (task) -> {
38+
long usage = task.getTotalResourceStats().getCpuTimeInNanos();
39+
long threshold = thresholdSupplier.getAsLong();
40+
41+
if (usage < threshold) {
42+
return Optional.empty();
43+
}
44+
45+
return Optional.of(
46+
new TaskCancellation.Reason(
47+
"cpu usage exceeded ["
48+
+ new TimeValue(usage, TimeUnit.NANOSECONDS)
49+
+ " >= "
50+
+ new TimeValue(threshold, TimeUnit.NANOSECONDS)
51+
+ "]",
52+
1 // TODO: fine-tune the cancellation score/weight
53+
)
54+
);
55+
});
56+
}
57+
58+
public CpuUsageTracker(LongSupplier thresholdSupplier, ResourceUsageBreachEvaluator resourceUsageBreachEvaluator) {
3759
this.thresholdSupplier = thresholdSupplier;
60+
this.resourceUsageBreachEvaluator = resourceUsageBreachEvaluator;
3861
}
3962

4063
@Override
4164
public String name() {
4265
return CPU_USAGE_TRACKER.getName();
4366
}
4467

45-
@Override
46-
public Optional<TaskCancellation.Reason> checkAndMaybeGetCancellationReason(Task task) {
47-
long usage = task.getTotalResourceStats().getCpuTimeInNanos();
48-
long threshold = thresholdSupplier.getAsLong();
49-
50-
if (usage < threshold) {
51-
return Optional.empty();
52-
}
53-
54-
return Optional.of(
55-
new TaskCancellation.Reason(
56-
"cpu usage exceeded ["
57-
+ new TimeValue(usage, TimeUnit.NANOSECONDS)
58-
+ " >= "
59-
+ new TimeValue(threshold, TimeUnit.NANOSECONDS)
60-
+ "]",
61-
1 // TODO: fine-tune the cancellation score/weight
62-
)
63-
);
64-
}
65-
6668
@Override
6769
public TaskResourceUsageTrackers.TaskResourceUsageTracker.Stats stats(List<? extends Task> activeTasks) {
6870
long currentMax = activeTasks.stream().mapToLong(t -> t.getTotalResourceStats().getCpuTimeInNanos()).max().orElse(0);

server/src/main/java/org/opensearch/search/backpressure/trackers/ElapsedTimeTracker.java

Lines changed: 27 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -34,36 +34,42 @@ public class ElapsedTimeTracker extends TaskResourceUsageTrackers.TaskResourceUs
3434
private final LongSupplier timeNanosSupplier;
3535

3636
public ElapsedTimeTracker(LongSupplier thresholdSupplier, LongSupplier timeNanosSupplier) {
37+
this(thresholdSupplier, timeNanosSupplier, (Task task) -> {
38+
long usage = timeNanosSupplier.getAsLong() - task.getStartTimeNanos();
39+
long threshold = thresholdSupplier.getAsLong();
40+
41+
if (usage < threshold) {
42+
return Optional.empty();
43+
}
44+
45+
return Optional.of(
46+
new TaskCancellation.Reason(
47+
"elapsed time exceeded ["
48+
+ new TimeValue(usage, TimeUnit.NANOSECONDS)
49+
+ " >= "
50+
+ new TimeValue(threshold, TimeUnit.NANOSECONDS)
51+
+ "]",
52+
1 // TODO: fine-tune the cancellation score/weight
53+
)
54+
);
55+
});
56+
}
57+
58+
public ElapsedTimeTracker(
59+
LongSupplier thresholdSupplier,
60+
LongSupplier timeNanosSupplier,
61+
ResourceUsageBreachEvaluator resourceUsageBreachEvaluator
62+
) {
3763
this.thresholdSupplier = thresholdSupplier;
3864
this.timeNanosSupplier = timeNanosSupplier;
65+
this.resourceUsageBreachEvaluator = resourceUsageBreachEvaluator;
3966
}
4067

4168
@Override
4269
public String name() {
4370
return ELAPSED_TIME_TRACKER.getName();
4471
}
4572

46-
@Override
47-
public Optional<TaskCancellation.Reason> checkAndMaybeGetCancellationReason(Task task) {
48-
long usage = timeNanosSupplier.getAsLong() - task.getStartTimeNanos();
49-
long threshold = thresholdSupplier.getAsLong();
50-
51-
if (usage < threshold) {
52-
return Optional.empty();
53-
}
54-
55-
return Optional.of(
56-
new TaskCancellation.Reason(
57-
"elapsed time exceeded ["
58-
+ new TimeValue(usage, TimeUnit.NANOSECONDS)
59-
+ " >= "
60-
+ new TimeValue(threshold, TimeUnit.NANOSECONDS)
61-
+ "]",
62-
1 // TODO: fine-tune the cancellation score/weight
63-
)
64-
);
65-
}
66-
6773
@Override
6874
public TaskResourceUsageTrackers.TaskResourceUsageTracker.Stats stats(List<? extends Task> activeTasks) {
6975
long now = timeNanosSupplier.getAsLong();

0 commit comments

Comments
 (0)