Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,11 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add support for `matched_fields` with the unified highlighter ([#18164](https://github.com/opensearch-project/OpenSearch/issues/18164))
- [repository-s3] Add support for SSE-KMS and S3 bucket owner verification ([#18312](https://github.com/opensearch-project/OpenSearch/pull/18312))
- Added File Cache Stats - Involves Block level as well as full file level stats ([#17538](https://github.com/opensearch-project/OpenSearch/issues/17479))
- Added time_in_execution attribute to /_cluster/pending_tasks response ([#17780](https://github.com/opensearch-project/OpenSearch/pull/17780))

### Changed
- Create generic DocRequest to better categorize ActionRequests ([#18269](https://github.com/opensearch-project/OpenSearch/pull/18269)))

### Dependencies
- Update Apache Lucene from 10.1.0 to 10.2.1 ([#17961](https://github.com/opensearch-project/OpenSearch/pull/17961))
- Bump `com.google.code.gson:gson` from 2.12.1 to 2.13.1 ([#17923](https://github.com/opensearch-project/OpenSearch/pull/17923), [#18266](https://github.com/opensearch-project/OpenSearch/pull/18266))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -351,8 +351,9 @@ public void testPendingUpdateTask() throws Exception {
clusterService.submitStateUpdateTask("1", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
invoked1.countDown();
try {
Thread.sleep(50);
invoked1.countDown();
block1.await();
} catch (InterruptedException e) {
fail();
Expand Down Expand Up @@ -395,6 +396,9 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
assertThat(pendingClusterTasks.size(), greaterThanOrEqualTo(10));
assertThat(pendingClusterTasks.get(0).getSource().string(), equalTo("1"));
assertThat(pendingClusterTasks.get(0).isExecuting(), equalTo(true));
assertThat(pendingClusterTasks.get(0).getTimeInExecutionInMillis(), greaterThan(0L));
assertThat(pendingClusterTasks.get(1).isExecuting(), equalTo(false));
assertThat(pendingClusterTasks.get(1).getTimeInExecutionInMillis(), equalTo(0L));
for (PendingClusterTask task : pendingClusterTasks) {
controlSources.remove(task.getSource().string());
}
Expand All @@ -405,6 +409,9 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
assertThat(response.pendingTasks().size(), greaterThanOrEqualTo(10));
assertThat(response.pendingTasks().get(0).getSource().string(), equalTo("1"));
assertThat(response.pendingTasks().get(0).isExecuting(), equalTo(true));
assertThat(response.pendingTasks().get(0).getTimeInExecutionInMillis(), greaterThan(0L));
assertThat(response.pendingTasks().get(1).isExecuting(), equalTo(false));
assertThat(response.pendingTasks().get(1).getTimeInExecutionInMillis(), equalTo(0L));
for (PendingClusterTask task : response) {
controlSources.remove(task.getSource().string());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@
.append(pendingClusterTask.getSource())
.append("/")
.append(pendingClusterTask.getTimeInQueue())
.append("/")
.append(pendingClusterTask.getTimeInExecution())

Check warning on line 95 in server/src/main/java/org/opensearch/action/admin/cluster/tasks/PendingClusterTasksResponse.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/cluster/tasks/PendingClusterTasksResponse.java#L94-L95

Added lines #L94 - L95 were not covered by tests
.append("\n");
}
return sb.toString();
Expand All @@ -108,6 +110,8 @@
builder.field(Fields.EXECUTING, pendingClusterTask.isExecuting());
builder.field(Fields.TIME_IN_QUEUE_MILLIS, pendingClusterTask.getTimeInQueueInMillis());
builder.field(Fields.TIME_IN_QUEUE, pendingClusterTask.getTimeInQueue());
builder.field(Fields.TIME_IN_EXECUTION_MILLIS, pendingClusterTask.getTimeInExecutionInMillis());
builder.field(Fields.TIME_IN_EXECUTION, pendingClusterTask.getTimeInExecution());

Check warning on line 114 in server/src/main/java/org/opensearch/action/admin/cluster/tasks/PendingClusterTasksResponse.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/cluster/tasks/PendingClusterTasksResponse.java#L113-L114

Added lines #L113 - L114 were not covered by tests
builder.endObject();
}
builder.endArray();
Expand All @@ -129,6 +133,8 @@
static final String SOURCE = "source";
static final String TIME_IN_QUEUE_MILLIS = "time_in_queue_millis";
static final String TIME_IN_QUEUE = "time_in_queue";
static final String TIME_IN_EXECUTION_MILLIS = "time_in_execution_millis";
static final String TIME_IN_EXECUTION = "time_in_execution";

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -609,7 +609,8 @@ public List<PendingClusterTask> pendingTasks() {
pending.priority,
new Text(task.source()),
task.getAgeInMillis(),
pending.executing
pending.executing,
pending.executionTimeInMillis
);
}).collect(Collectors.toList());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

package org.opensearch.cluster.service;

import org.opensearch.Version;
import org.opensearch.common.Priority;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.unit.TimeValue;
Expand All @@ -55,23 +56,29 @@
private Text source;
private long timeInQueue;
private boolean executing;
private long timeInExecution;

public PendingClusterTask(StreamInput in) throws IOException {
insertOrder = in.readVLong();
priority = Priority.readFrom(in);
source = in.readText();
timeInQueue = in.readLong();
executing = in.readBoolean();
if (in.getVersion().onOrAfter(Version.V_3_1_0)) {
timeInExecution = in.readLong();

Check warning on line 68 in server/src/main/java/org/opensearch/cluster/service/PendingClusterTask.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/cluster/service/PendingClusterTask.java#L68

Added line #L68 was not covered by tests
}
}

public PendingClusterTask(long insertOrder, Priority priority, Text source, long timeInQueue, boolean executing) {
public PendingClusterTask(long insertOrder, Priority priority, Text source, long timeInQueue, boolean executing, long timeInExecution) {

Check warning on line 72 in server/src/main/java/org/opensearch/cluster/service/PendingClusterTask.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/cluster/service/PendingClusterTask.java#L72

Added line #L72 was not covered by tests
assert timeInQueue >= 0 : "got a negative timeInQueue [" + timeInQueue + "]";
assert insertOrder >= 0 : "got a negative insertOrder [" + insertOrder + "]";
assert timeInExecution >= 0 : "got a negative timeInExecution [" + timeInExecution + "]";
this.insertOrder = insertOrder;
this.priority = priority;
this.source = source;
this.timeInQueue = timeInQueue;
this.executing = executing;
this.timeInExecution = timeInExecution;

Check warning on line 81 in server/src/main/java/org/opensearch/cluster/service/PendingClusterTask.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/cluster/service/PendingClusterTask.java#L81

Added line #L81 was not covered by tests
}

public long getInsertOrder() {
Expand All @@ -90,10 +97,18 @@
return timeInQueue;
}

public long getTimeInExecutionInMillis() {
return timeInExecution;

Check warning on line 101 in server/src/main/java/org/opensearch/cluster/service/PendingClusterTask.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/cluster/service/PendingClusterTask.java#L101

Added line #L101 was not covered by tests
}

public TimeValue getTimeInQueue() {
return new TimeValue(getTimeInQueueInMillis());
}

public TimeValue getTimeInExecution() {
return new TimeValue(getTimeInExecutionInMillis());

Check warning on line 109 in server/src/main/java/org/opensearch/cluster/service/PendingClusterTask.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/cluster/service/PendingClusterTask.java#L109

Added line #L109 was not covered by tests
}

public boolean isExecuting() {
return executing;
}
Expand All @@ -105,5 +120,8 @@
out.writeText(source);
out.writeLong(timeInQueue);
out.writeBoolean(executing);
if (out.getVersion().onOrAfter(Version.V_3_1_0)) {
out.writeLong(timeInExecution);

Check warning on line 124 in server/src/main/java/org/opensearch/cluster/service/PendingClusterTask.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/cluster/service/PendingClusterTask.java#L124

Added line #L124 was not covered by tests
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.FutureTask;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RunnableFuture;
Expand All @@ -62,6 +64,7 @@
private final AtomicLong insertionOrder = new AtomicLong();
private final Queue<Runnable> current = ConcurrentCollections.newQueue();
private final ScheduledExecutorService timer;
private final ConcurrentMap<Runnable, TaskMetrics> taskMetrics = new ConcurrentHashMap<>();

public PrioritizedOpenSearchThreadPoolExecutor(
String name,
Expand Down Expand Up @@ -114,6 +117,15 @@

private void addPending(List<Runnable> runnables, List<Pending> pending, boolean executing) {
for (Runnable runnable : runnables) {
long executionTimeInMillis = 0;

if (executing) {
TaskMetrics metrics = taskMetrics.get(runnable);
if (metrics != null) {
executionTimeInMillis = metrics.getExecutionTimeMillis();
}
}

if (runnable instanceof TieBreakingPrioritizedRunnable) {
TieBreakingPrioritizedRunnable t = (TieBreakingPrioritizedRunnable) runnable;
Runnable innerRunnable = t.runnable;
Expand All @@ -122,28 +134,30 @@
innerRunnable can be null if task is finished but not removed from executor yet,
see {@link TieBreakingPrioritizedRunnable#run} and {@link TieBreakingPrioritizedRunnable#runAndClean}
*/
pending.add(new Pending(super.unwrap(innerRunnable), t.priority(), t.insertionOrder, executing));
pending.add(new Pending(super.unwrap(innerRunnable), t.priority(), t.insertionOrder, executing, executionTimeInMillis));
}
} else if (runnable instanceof PrioritizedFutureTask) {
PrioritizedFutureTask t = (PrioritizedFutureTask) runnable;
Object task = t.task;
if (t.task instanceof Runnable) {
task = super.unwrap((Runnable) t.task);
}
pending.add(new Pending(task, t.priority, t.insertionOrder, executing));
pending.add(new Pending(task, t.priority, t.insertionOrder, executing, executionTimeInMillis));

Check warning on line 145 in server/src/main/java/org/opensearch/common/util/concurrent/PrioritizedOpenSearchThreadPoolExecutor.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/common/util/concurrent/PrioritizedOpenSearchThreadPoolExecutor.java#L145

Added line #L145 was not covered by tests
}
}
}

@Override
protected void beforeExecute(Thread t, Runnable r) {
current.add(r);
taskMetrics.put(r, new TaskMetrics());
}

@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
current.remove(r);
taskMetrics.remove(r); // Clean up metrics when task completes
}

public void execute(Runnable command, final TimeValue timeout, final Runnable timeoutCallback) {
Expand Down Expand Up @@ -211,12 +225,14 @@
public final Priority priority;
public final long insertionOrder;
public final boolean executing;
public final long executionTimeInMillis;

public Pending(Object task, Priority priority, long insertionOrder, boolean executing) {
public Pending(Object task, Priority priority, long insertionOrder, boolean executing, long executionTimeInMillis) {
this.task = task;
this.priority = priority;
this.insertionOrder = insertionOrder;
this.executing = executing;
this.executionTimeInMillis = executionTimeInMillis;
}
}

Expand Down Expand Up @@ -291,7 +307,6 @@
public Runnable unwrap() {
return runnable;
}

}

private static final class PrioritizedFutureTask<T> extends FutureTask<T> implements Comparable<PrioritizedFutureTask> {
Expand Down Expand Up @@ -324,4 +339,24 @@
}
}

/**
* Generic class to track various task metrics.
* This implementation tracks task execution time, but can be extended
* to include additional metrics.
*/
private static class TaskMetrics {
private final long startTimeNanos;

TaskMetrics() {
this.startTimeNanos = System.nanoTime();
}

/**
* Get the task execution time in milliseconds.
*/
long getExecutionTimeMillis() {
return TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTimeNanos, TimeUnit.NANOSECONDS);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import java.util.concurrent.atomic.AtomicBoolean;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;

public class PrioritizedExecutorsTests extends OpenSearchTestCase {
Expand Down Expand Up @@ -231,6 +232,7 @@ public void testTimeout() throws Exception {
@Override
public void run() {
try {
Thread.sleep(50);
invoked.countDown();
block.await();
} catch (InterruptedException e) {
Expand All @@ -248,6 +250,7 @@ public String toString() {
assertThat(pending.length, equalTo(1));
assertThat(pending[0].task.toString(), equalTo("the blocking"));
assertThat(pending[0].executing, equalTo(true));
assertThat(pending[0].executionTimeInMillis, greaterThan(0L));

final AtomicBoolean executeCalled = new AtomicBoolean();
final CountDownLatch timedOut = new CountDownLatch(1);
Expand All @@ -272,8 +275,10 @@ public void run() {
assertThat(pending.length, equalTo(2));
assertThat(pending[0].task.toString(), equalTo("the blocking"));
assertThat(pending[0].executing, equalTo(true));
assertThat(pending[0].executionTimeInMillis, greaterThan(0L));
assertThat(pending[1].task.toString(), equalTo("the waiting"));
assertThat(pending[1].executing, equalTo(false));
assertThat(pending[1].executionTimeInMillis, equalTo(0L));

assertThat(timedOut.await(2, TimeUnit.SECONDS), equalTo(true));
block.countDown();
Expand Down
Loading