Skip to content

Commit 7e7e775

Browse files
authored
Reduce logging in DEBUG for MasterService:run (#14795) (#14864)
* Reduce logging in DEBUG for MasterService:run by introducing short and long summaries in TaskBatcher. Signed-off-by: Sumit Bansal <[email protected]> (cherry picked from commit b35690c) Signed-off-by: Sumit Bansal <[email protected]>
1 parent 10bdfee commit 7e7e775

File tree

5 files changed

+309
-63
lines changed

5 files changed

+309
-63
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
2525
- Add SplitResponseProcessor to Search Pipelines ([#14800](https://github.com/opensearch-project/OpenSearch/issues/14800))
2626
- Optimize TransportNodesAction to not send DiscoveryNodes for NodeStats, NodesInfo and ClusterStats call ([#14749](https://github.com/opensearch-project/OpenSearch/pull/14749))
2727
- Refactor remote-routing-table service in line with remote state interfaces ([#14668](https://github.com/opensearch-project/OpenSearch/pull/14668))
28+
- Reduce logging in DEBUG for MasterService:run ([#14795](https://github.com/opensearch-project/OpenSearch/pull/14795))
2829

2930
### Dependencies
3031
- Update to Apache Lucene 9.11.1 ([#14042](https://github.com/opensearch-project/OpenSearch/pull/14042), [#14576](https://github.com/opensearch-project/OpenSearch/pull/14576))

server/src/main/java/org/opensearch/cluster/service/MasterService.java

Lines changed: 34 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@
8484
import java.util.Objects;
8585
import java.util.Optional;
8686
import java.util.concurrent.TimeUnit;
87+
import java.util.function.Function;
8788
import java.util.function.Supplier;
8889
import java.util.stream.Collectors;
8990

@@ -221,10 +222,10 @@ protected void onTimeout(List<? extends BatchedTask> tasks, TimeValue timeout) {
221222
}
222223

223224
@Override
224-
protected void run(Object batchingKey, List<? extends BatchedTask> tasks, String tasksSummary) {
225+
protected void run(Object batchingKey, List<? extends BatchedTask> tasks, Function<Boolean, String> taskSummaryGenerator) {
225226
ClusterStateTaskExecutor<Object> taskExecutor = (ClusterStateTaskExecutor<Object>) batchingKey;
226227
List<UpdateTask> updateTasks = (List<UpdateTask>) tasks;
227-
runTasks(new TaskInputs(taskExecutor, updateTasks, tasksSummary));
228+
runTasks(new TaskInputs(taskExecutor, updateTasks, taskSummaryGenerator));
228229
}
229230

230231
class UpdateTask extends BatchedTask {
@@ -297,26 +298,33 @@ public static boolean assertNotMasterUpdateThread(String reason) {
297298
}
298299

299300
private void runTasks(TaskInputs taskInputs) {
300-
final String summary = taskInputs.summary;
301+
final String longSummary = logger.isTraceEnabled() ? taskInputs.taskSummaryGenerator.apply(true) : "";
302+
final String shortSummary = taskInputs.taskSummaryGenerator.apply(false);
303+
301304
if (!lifecycle.started()) {
302-
logger.debug("processing [{}]: ignoring, cluster-manager service not started", summary);
305+
logger.debug("processing [{}]: ignoring, cluster-manager service not started", shortSummary);
303306
return;
304307
}
305308

306-
logger.debug("executing cluster state update for [{}]", summary);
309+
if (logger.isTraceEnabled()) {
310+
logger.trace("executing cluster state update for [{}]", longSummary);
311+
} else {
312+
logger.debug("executing cluster state update for [{}]", shortSummary);
313+
}
314+
307315
final ClusterState previousClusterState = state();
308316

309317
if (!previousClusterState.nodes().isLocalNodeElectedClusterManager() && taskInputs.runOnlyWhenClusterManager()) {
310-
logger.debug("failing [{}]: local node is no longer cluster-manager", summary);
318+
logger.debug("failing [{}]: local node is no longer cluster-manager", shortSummary);
311319
taskInputs.onNoLongerClusterManager();
312320
return;
313321
}
314322

315323
final long computationStartTime = threadPool.preciseRelativeTimeInNanos();
316-
final TaskOutputs taskOutputs = calculateTaskOutputs(taskInputs, previousClusterState);
324+
final TaskOutputs taskOutputs = calculateTaskOutputs(taskInputs, previousClusterState, shortSummary);
317325
taskOutputs.notifyFailedTasks();
318326
final TimeValue computationTime = getTimeSince(computationStartTime);
319-
logExecutionTime(computationTime, "compute cluster state update", summary);
327+
logExecutionTime(computationTime, "compute cluster state update", shortSummary);
320328

321329
clusterManagerMetrics.recordLatency(
322330
clusterManagerMetrics.clusterStateComputeHistogram,
@@ -328,25 +336,25 @@ private void runTasks(TaskInputs taskInputs) {
328336
final long notificationStartTime = threadPool.preciseRelativeTimeInNanos();
329337
taskOutputs.notifySuccessfulTasksOnUnchangedClusterState();
330338
final TimeValue executionTime = getTimeSince(notificationStartTime);
331-
logExecutionTime(executionTime, "notify listeners on unchanged cluster state", summary);
339+
logExecutionTime(executionTime, "notify listeners on unchanged cluster state", shortSummary);
332340
} else {
333341
final ClusterState newClusterState = taskOutputs.newClusterState;
334342
if (logger.isTraceEnabled()) {
335-
logger.trace("cluster state updated, source [{}]\n{}", summary, newClusterState);
343+
logger.trace("cluster state updated, source [{}]\n{}", longSummary, newClusterState);
336344
} else {
337-
logger.debug("cluster state updated, version [{}], source [{}]", newClusterState.version(), summary);
345+
logger.debug("cluster state updated, version [{}], source [{}]", newClusterState.version(), shortSummary);
338346
}
339347
final long publicationStartTime = threadPool.preciseRelativeTimeInNanos();
340348
try {
341-
ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent(summary, newClusterState, previousClusterState);
349+
ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent(shortSummary, newClusterState, previousClusterState);
342350
// new cluster state, notify all listeners
343351
final DiscoveryNodes.Delta nodesDelta = clusterChangedEvent.nodesDelta();
344352
if (nodesDelta.hasChanges() && logger.isInfoEnabled()) {
345353
String nodesDeltaSummary = nodesDelta.shortSummary();
346354
if (nodesDeltaSummary.length() > 0) {
347355
logger.info(
348356
"{}, term: {}, version: {}, delta: {}",
349-
summary,
357+
shortSummary,
350358
newClusterState.term(),
351359
newClusterState.version(),
352360
nodesDeltaSummary
@@ -357,7 +365,7 @@ private void runTasks(TaskInputs taskInputs) {
357365
logger.debug("publishing cluster state version [{}]", newClusterState.version());
358366
publish(clusterChangedEvent, taskOutputs, publicationStartTime);
359367
} catch (Exception e) {
360-
handleException(summary, publicationStartTime, newClusterState, e);
368+
handleException(shortSummary, publicationStartTime, newClusterState, e);
361369
}
362370
}
363371
}
@@ -452,8 +460,8 @@ private void handleException(String summary, long startTimeMillis, ClusterState
452460
// TODO: do we want to call updateTask.onFailure here?
453461
}
454462

455-
private TaskOutputs calculateTaskOutputs(TaskInputs taskInputs, ClusterState previousClusterState) {
456-
ClusterTasksResult<Object> clusterTasksResult = executeTasks(taskInputs, previousClusterState);
463+
private TaskOutputs calculateTaskOutputs(TaskInputs taskInputs, ClusterState previousClusterState, String taskSummary) {
464+
ClusterTasksResult<Object> clusterTasksResult = executeTasks(taskInputs, previousClusterState, taskSummary);
457465
ClusterState newClusterState = patchVersions(previousClusterState, clusterTasksResult);
458466
return new TaskOutputs(
459467
taskInputs,
@@ -897,7 +905,7 @@ public void onTimeout() {
897905
}
898906
}
899907

900-
private ClusterTasksResult<Object> executeTasks(TaskInputs taskInputs, ClusterState previousClusterState) {
908+
private ClusterTasksResult<Object> executeTasks(TaskInputs taskInputs, ClusterState previousClusterState, String taskSummary) {
901909
ClusterTasksResult<Object> clusterTasksResult;
902910
try {
903911
List<Object> inputs = taskInputs.updateTasks.stream().map(tUpdateTask -> tUpdateTask.task).collect(Collectors.toList());
@@ -913,7 +921,7 @@ private ClusterTasksResult<Object> executeTasks(TaskInputs taskInputs, ClusterSt
913921
"failed to execute cluster state update (on version: [{}], uuid: [{}]) for [{}]\n{}{}{}",
914922
previousClusterState.version(),
915923
previousClusterState.stateUUID(),
916-
taskInputs.summary,
924+
taskSummary,
917925
previousClusterState.nodes(),
918926
previousClusterState.routingTable(),
919927
previousClusterState.getRoutingNodes()
@@ -955,14 +963,19 @@ private List<Batcher.UpdateTask> getNonFailedTasks(TaskInputs taskInputs, Cluste
955963
* Represents a set of tasks to be processed together with their executor
956964
*/
957965
private class TaskInputs {
958-
final String summary;
966+
959967
final List<Batcher.UpdateTask> updateTasks;
960968
final ClusterStateTaskExecutor<Object> executor;
969+
final Function<Boolean, String> taskSummaryGenerator;
961970

962-
TaskInputs(ClusterStateTaskExecutor<Object> executor, List<Batcher.UpdateTask> updateTasks, String summary) {
963-
this.summary = summary;
971+
TaskInputs(
972+
ClusterStateTaskExecutor<Object> executor,
973+
List<Batcher.UpdateTask> updateTasks,
974+
final Function<Boolean, String> taskSummaryGenerator
975+
) {
964976
this.executor = executor;
965977
this.updateTasks = updateTasks;
978+
this.taskSummaryGenerator = taskSummaryGenerator;
966979
}
967980

968981
boolean runOnlyWhenClusterManager() {

server/src/main/java/org/opensearch/cluster/service/TaskBatcher.java

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,6 @@ void runIfNotProcessed(BatchedTask updateTask) {
177177
// to give other tasks with different batching key a chance to execute.
178178
if (updateTask.processed.get() == false) {
179179
final List<BatchedTask> toExecute = new ArrayList<>();
180-
final Map<String, List<BatchedTask>> processTasksBySource = new HashMap<>();
181180
// While removing task, need to remove task first from taskMap and then remove identity from identityMap.
182181
// Changing this order might lead to duplicate task during submission.
183182
LinkedHashSet<BatchedTask> pending = tasksPerBatchingKey.remove(updateTask.batchingKey);
@@ -187,30 +186,41 @@ void runIfNotProcessed(BatchedTask updateTask) {
187186
if (task.processed.getAndSet(true) == false) {
188187
logger.trace("will process {}", task);
189188
toExecute.add(task);
190-
processTasksBySource.computeIfAbsent(task.source, s -> new ArrayList<>()).add(task);
191189
} else {
192190
logger.trace("skipping {}, already processed", task);
193191
}
194192
}
195193
}
196194

197195
if (toExecute.isEmpty() == false) {
198-
final String tasksSummary = processTasksBySource.entrySet().stream().map(entry -> {
199-
String tasks = updateTask.describeTasks(entry.getValue());
200-
return tasks.isEmpty() ? entry.getKey() : entry.getKey() + "[" + tasks + "]";
201-
}).reduce((s1, s2) -> s1 + ", " + s2).orElse("");
202-
196+
Function<Boolean, String> taskSummaryGenerator = (longSummaryRequired) -> {
197+
if (longSummaryRequired == null || !longSummaryRequired) {
198+
return buildShortSummary(updateTask.batchingKey, toExecute.size());
199+
}
200+
final Map<String, List<BatchedTask>> processTasksBySource = new HashMap<>();
201+
for (final BatchedTask task : toExecute) {
202+
processTasksBySource.computeIfAbsent(task.source, s -> new ArrayList<>()).add(task);
203+
}
204+
return processTasksBySource.entrySet().stream().map(entry -> {
205+
String tasks = updateTask.describeTasks(entry.getValue());
206+
return tasks.isEmpty() ? entry.getKey() : entry.getKey() + "[" + tasks + "]";
207+
}).reduce((s1, s2) -> s1 + ", " + s2).orElse("");
208+
};
203209
taskBatcherListener.onBeginProcessing(toExecute);
204-
run(updateTask.batchingKey, toExecute, tasksSummary);
210+
run(updateTask.batchingKey, toExecute, taskSummaryGenerator);
205211
}
206212
}
207213
}
208214

215+
private String buildShortSummary(final Object batchingKey, final int taskCount) {
216+
return "Tasks batched with key: " + batchingKey.toString().split("\\$")[0] + " and count: " + taskCount;
217+
}
218+
209219
/**
210220
* Action to be implemented by the specific batching implementation
211221
* All tasks have the given batching key.
212222
*/
213-
protected abstract void run(Object batchingKey, List<? extends BatchedTask> tasks, String tasksSummary);
223+
protected abstract void run(Object batchingKey, List<? extends BatchedTask> tasks, Function<Boolean, String> taskSummaryGenerator);
214224

215225
/**
216226
* Represents a runnable task that supports batching.

0 commit comments

Comments
 (0)