diff --git a/CHANGELOG.md b/CHANGELOG.md index a1485c47e279a..12497a48fe21b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -33,6 +33,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - [repository-s3] Add support for SSE-KMS and S3 bucket owner verification ([#18312](https://github.com/opensearch-project/OpenSearch/pull/18312)) - Optimize gRPC perf by passing by reference ([#18303](https://github.com/opensearch-project/OpenSearch/pull/18303)) - Added File Cache Stats - Involves Block level as well as full file level stats ([#17538](https://github.com/opensearch-project/OpenSearch/issues/17479)) +- Added `time_in_execution` and `time_in_execution_millis` attributes to the `/_cluster/pending_tasks` response ([#17780](https://github.com/opensearch-project/OpenSearch/pull/17780)) - Added File Cache Pinning ([#17617](https://github.com/opensearch-project/OpenSearch/issues/13648)) - Support consumer reset in Resume API for pull-based ingestion. This PR includes a breaking change for the experimental pull-based ingestion feature. 
([#18332](https://github.com/opensearch-project/OpenSearch/pull/18332)) @@ -40,6 +41,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Create generic DocRequest to better categorize ActionRequests ([#18269](https://github.com/opensearch-project/OpenSearch/pull/18269))) - Change implementation for `percentiles` aggregation for latency improvement [#18124](https://github.com/opensearch-project/OpenSearch/pull/18124) + ### Dependencies - Update Apache Lucene from 10.1.0 to 10.2.1 ([#17961](https://github.com/opensearch-project/OpenSearch/pull/17961)) - Bump `com.google.code.gson:gson` from 2.12.1 to 2.13.1 ([#17923](https://github.com/opensearch-project/OpenSearch/pull/17923), [#18266](https://github.com/opensearch-project/OpenSearch/pull/18266)) diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/service/ClusterServiceIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/service/ClusterServiceIT.java index 7d9ffb23a2cf7..d82d9c0ed05e3 100644 --- a/server/src/internalClusterTest/java/org/opensearch/cluster/service/ClusterServiceIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/cluster/service/ClusterServiceIT.java @@ -351,8 +351,9 @@ public void testPendingUpdateTask() throws Exception { clusterService.submitStateUpdateTask("1", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { - invoked1.countDown(); try { + Thread.sleep(50); + invoked1.countDown(); block1.await(); } catch (InterruptedException e) { fail(); @@ -395,6 +396,9 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS assertThat(pendingClusterTasks.size(), greaterThanOrEqualTo(10)); assertThat(pendingClusterTasks.get(0).getSource().string(), equalTo("1")); assertThat(pendingClusterTasks.get(0).isExecuting(), equalTo(true)); + assertThat(pendingClusterTasks.get(0).getTimeInExecutionInMillis(), greaterThan(0L)); + 
assertThat(pendingClusterTasks.get(1).isExecuting(), equalTo(false)); + assertThat(pendingClusterTasks.get(1).getTimeInExecutionInMillis(), equalTo(0L)); for (PendingClusterTask task : pendingClusterTasks) { controlSources.remove(task.getSource().string()); } @@ -405,6 +409,9 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS assertThat(response.pendingTasks().size(), greaterThanOrEqualTo(10)); assertThat(response.pendingTasks().get(0).getSource().string(), equalTo("1")); assertThat(response.pendingTasks().get(0).isExecuting(), equalTo(true)); + assertThat(response.pendingTasks().get(0).getTimeInExecutionInMillis(), greaterThan(0L)); + assertThat(response.pendingTasks().get(1).isExecuting(), equalTo(false)); + assertThat(response.pendingTasks().get(1).getTimeInExecutionInMillis(), equalTo(0L)); for (PendingClusterTask task : response) { controlSources.remove(task.getSource().string()); } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/tasks/PendingClusterTasksResponse.java b/server/src/main/java/org/opensearch/action/admin/cluster/tasks/PendingClusterTasksResponse.java index 9f4568c88b273..138538df0de00 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/tasks/PendingClusterTasksResponse.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/tasks/PendingClusterTasksResponse.java @@ -91,6 +91,8 @@ public String toString() { .append(pendingClusterTask.getSource()) .append("/") .append(pendingClusterTask.getTimeInQueue()) + .append("/") + .append(pendingClusterTask.getTimeInExecution()) .append("\n"); } return sb.toString(); @@ -108,6 +110,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field(Fields.EXECUTING, pendingClusterTask.isExecuting()); builder.field(Fields.TIME_IN_QUEUE_MILLIS, pendingClusterTask.getTimeInQueueInMillis()); builder.field(Fields.TIME_IN_QUEUE, pendingClusterTask.getTimeInQueue()); + 
builder.field(Fields.TIME_IN_EXECUTION_MILLIS, pendingClusterTask.getTimeInExecutionInMillis()); + builder.field(Fields.TIME_IN_EXECUTION, pendingClusterTask.getTimeInExecution()); builder.endObject(); } builder.endArray(); @@ -129,6 +133,8 @@ static final class Fields { static final String SOURCE = "source"; static final String TIME_IN_QUEUE_MILLIS = "time_in_queue_millis"; static final String TIME_IN_QUEUE = "time_in_queue"; + static final String TIME_IN_EXECUTION_MILLIS = "time_in_execution_millis"; + static final String TIME_IN_EXECUTION = "time_in_execution"; } diff --git a/server/src/main/java/org/opensearch/cluster/service/ClusterManagerService.java b/server/src/main/java/org/opensearch/cluster/service/ClusterManagerService.java index 4019145e2f823..de2a4a998f2b0 100644 --- a/server/src/main/java/org/opensearch/cluster/service/ClusterManagerService.java +++ b/server/src/main/java/org/opensearch/cluster/service/ClusterManagerService.java @@ -609,7 +609,8 @@ public List pendingTasks() { pending.priority, new Text(task.source()), task.getAgeInMillis(), - pending.executing + pending.executing, + pending.executionTimeInMillis ); }).collect(Collectors.toList()); } diff --git a/server/src/main/java/org/opensearch/cluster/service/PendingClusterTask.java b/server/src/main/java/org/opensearch/cluster/service/PendingClusterTask.java index b06c537e7bac5..c6bc184ee9af8 100644 --- a/server/src/main/java/org/opensearch/cluster/service/PendingClusterTask.java +++ b/server/src/main/java/org/opensearch/cluster/service/PendingClusterTask.java @@ -32,6 +32,7 @@ package org.opensearch.cluster.service; +import org.opensearch.Version; import org.opensearch.common.Priority; import org.opensearch.common.annotation.PublicApi; import org.opensearch.common.unit.TimeValue; @@ -55,6 +56,7 @@ public class PendingClusterTask implements Writeable { private Text source; private long timeInQueue; private boolean executing; + private long timeInExecution; public PendingClusterTask(StreamInput 
in) throws IOException { insertOrder = in.readVLong(); @@ -62,16 +64,21 @@ public PendingClusterTask(StreamInput in) throws IOException { source = in.readText(); timeInQueue = in.readLong(); executing = in.readBoolean(); + if (in.getVersion().onOrAfter(Version.V_3_1_0)) { + timeInExecution = in.readLong(); + } } - public PendingClusterTask(long insertOrder, Priority priority, Text source, long timeInQueue, boolean executing) { + public PendingClusterTask(long insertOrder, Priority priority, Text source, long timeInQueue, boolean executing, long timeInExecution) { assert timeInQueue >= 0 : "got a negative timeInQueue [" + timeInQueue + "]"; assert insertOrder >= 0 : "got a negative insertOrder [" + insertOrder + "]"; + assert timeInExecution >= 0 : "got a negative timeInExecution [" + timeInExecution + "]"; this.insertOrder = insertOrder; this.priority = priority; this.source = source; this.timeInQueue = timeInQueue; this.executing = executing; + this.timeInExecution = timeInExecution; } public long getInsertOrder() { @@ -90,10 +97,18 @@ public long getTimeInQueueInMillis() { return timeInQueue; } + public long getTimeInExecutionInMillis() { + return timeInExecution; + } + public TimeValue getTimeInQueue() { return new TimeValue(getTimeInQueueInMillis()); } + public TimeValue getTimeInExecution() { + return new TimeValue(getTimeInExecutionInMillis()); + } + public boolean isExecuting() { return executing; } @@ -105,5 +120,8 @@ public void writeTo(StreamOutput out) throws IOException { out.writeText(source); out.writeLong(timeInQueue); out.writeBoolean(executing); + if (out.getVersion().onOrAfter(Version.V_3_1_0)) { + out.writeLong(timeInExecution); + } } } diff --git a/server/src/main/java/org/opensearch/common/util/concurrent/PrioritizedOpenSearchThreadPoolExecutor.java b/server/src/main/java/org/opensearch/common/util/concurrent/PrioritizedOpenSearchThreadPoolExecutor.java index 95df4486b9d7b..c9bc466d128ec 100644 --- 
a/server/src/main/java/org/opensearch/common/util/concurrent/PrioritizedOpenSearchThreadPoolExecutor.java +++ b/server/src/main/java/org/opensearch/common/util/concurrent/PrioritizedOpenSearchThreadPoolExecutor.java @@ -38,6 +38,8 @@ import java.util.List; import java.util.Queue; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.FutureTask; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.RunnableFuture; @@ -62,6 +64,7 @@ public class PrioritizedOpenSearchThreadPoolExecutor extends OpenSearchThreadPoo private final AtomicLong insertionOrder = new AtomicLong(); private final Queue current = ConcurrentCollections.newQueue(); private final ScheduledExecutorService timer; + private final ConcurrentMap taskMetrics = new ConcurrentHashMap<>(); public PrioritizedOpenSearchThreadPoolExecutor( String name, @@ -114,6 +117,15 @@ public TimeValue getMaxTaskWaitTime() { private void addPending(List runnables, List pending, boolean executing) { for (Runnable runnable : runnables) { + long executionTimeInMillis = 0; + + if (executing) { + TaskMetrics metrics = taskMetrics.get(runnable); + if (metrics != null) { + executionTimeInMillis = metrics.getExecutionTimeMillis(); + } + } + if (runnable instanceof TieBreakingPrioritizedRunnable) { TieBreakingPrioritizedRunnable t = (TieBreakingPrioritizedRunnable) runnable; Runnable innerRunnable = t.runnable; @@ -122,7 +134,7 @@ private void addPending(List runnables, List pending, boolean innerRunnable can be null if task is finished but not removed from executor yet, see {@link TieBreakingPrioritizedRunnable#run} and {@link TieBreakingPrioritizedRunnable#runAndClean} */ - pending.add(new Pending(super.unwrap(innerRunnable), t.priority(), t.insertionOrder, executing)); + pending.add(new Pending(super.unwrap(innerRunnable), t.priority(), t.insertionOrder, executing, executionTimeInMillis)); } } else if 
(runnable instanceof PrioritizedFutureTask) { PrioritizedFutureTask t = (PrioritizedFutureTask) runnable; @@ -130,7 +142,7 @@ private void addPending(List runnables, List pending, boolean if (t.task instanceof Runnable) { task = super.unwrap((Runnable) t.task); } - pending.add(new Pending(task, t.priority, t.insertionOrder, executing)); + pending.add(new Pending(task, t.priority, t.insertionOrder, executing, executionTimeInMillis)); } } } @@ -138,12 +150,14 @@ private void addPending(List runnables, List pending, boolean @Override protected void beforeExecute(Thread t, Runnable r) { current.add(r); + taskMetrics.put(r, new TaskMetrics()); } @Override protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); current.remove(r); + taskMetrics.remove(r); // Clean up metrics when task completes } public void execute(Runnable command, final TimeValue timeout, final Runnable timeoutCallback) { @@ -211,12 +225,14 @@ public static class Pending { public final Priority priority; public final long insertionOrder; public final boolean executing; + public final long executionTimeInMillis; - public Pending(Object task, Priority priority, long insertionOrder, boolean executing) { + public Pending(Object task, Priority priority, long insertionOrder, boolean executing, long executionTimeInMillis) { this.task = task; this.priority = priority; this.insertionOrder = insertionOrder; this.executing = executing; + this.executionTimeInMillis = executionTimeInMillis; } } @@ -291,7 +307,6 @@ private void runAndClean(Runnable run) { public Runnable unwrap() { return runnable; } - } private static final class PrioritizedFutureTask extends FutureTask implements Comparable { @@ -324,4 +339,24 @@ public int compareTo(PrioritizedFutureTask pft) { } } + /** + * Generic class to track various task metrics. + * This implementation tracks task execution time, but can be extended + * to include additional metrics. 
+ */ + private static class TaskMetrics { + private final long startTimeNanos; + + TaskMetrics() { + this.startTimeNanos = System.nanoTime(); + } + + /** + * Get the task execution time in milliseconds. + */ + long getExecutionTimeMillis() { + return TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTimeNanos, TimeUnit.NANOSECONDS); + } + } + } diff --git a/server/src/test/java/org/opensearch/common/util/concurrent/PrioritizedExecutorsTests.java b/server/src/test/java/org/opensearch/common/util/concurrent/PrioritizedExecutorsTests.java index 0e9a2f0819408..a6cdc4de8cfaf 100644 --- a/server/src/test/java/org/opensearch/common/util/concurrent/PrioritizedExecutorsTests.java +++ b/server/src/test/java/org/opensearch/common/util/concurrent/PrioritizedExecutorsTests.java @@ -52,6 +52,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.is; public class PrioritizedExecutorsTests extends OpenSearchTestCase { @@ -231,6 +232,7 @@ public void testTimeout() throws Exception { @Override public void run() { try { + Thread.sleep(50); invoked.countDown(); block.await(); } catch (InterruptedException e) { @@ -248,6 +250,7 @@ public String toString() { assertThat(pending.length, equalTo(1)); assertThat(pending[0].task.toString(), equalTo("the blocking")); assertThat(pending[0].executing, equalTo(true)); + assertThat(pending[0].executionTimeInMillis, greaterThan(0L)); final AtomicBoolean executeCalled = new AtomicBoolean(); final CountDownLatch timedOut = new CountDownLatch(1); @@ -272,8 +275,10 @@ public void run() { assertThat(pending.length, equalTo(2)); assertThat(pending[0].task.toString(), equalTo("the blocking")); assertThat(pending[0].executing, equalTo(true)); + assertThat(pending[0].executionTimeInMillis, greaterThan(0L)); assertThat(pending[1].task.toString(), equalTo("the waiting")); assertThat(pending[1].executing, equalTo(false)); + 
assertThat(pending[1].executionTimeInMillis, equalTo(0L)); assertThat(timedOut.await(2, TimeUnit.SECONDS), equalTo(true)); block.countDown();