diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java index 8b740994e3b6c..09006db1d8e04 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java @@ -1417,6 +1417,38 @@ private Resp internalPerformRequest(Req request, throw new IOException("Unable to parse response body for " + response, e); } } + + /** + * Defines a helper method for requests that can 404 and in which case will return an empty Optional + * otherwise tries to parse the response body + */ + protected final Optional performRequestAndParseOptionalEntity(Req request, + CheckedFunction requestConverter, + RequestOptions options, + CheckedFunction entityParser + ) throws IOException { + Optional validationException = request.validate(); + if (validationException != null && validationException.isPresent()) { + throw validationException.get(); + } + Request req = requestConverter.apply(request); + req.setOptions(options); + Response response; + try { + response = client.performRequest(req); + } catch (ResponseException e) { + if (RestStatus.NOT_FOUND.getStatus() == e.getResponse().getStatusLine().getStatusCode()) { + return Optional.empty(); + } + throw parseResponseException(e); + } + + try { + return Optional.of(parseEntity(response.getEntity(), entityParser)); + } catch (Exception e) { + throw new IOException("Unable to parse response body for " + response, e); + } + } /** * @deprecated If creating a new HLRC ReST API call, consider creating new actions instead of reusing server actions. The Validation @@ -1538,6 +1570,62 @@ public void onFailure(Exception exception) { } }; } + + /** + * Async request which returns empty Optionals in the case of 404s or parses entity into an Optional + */ + protected final void performRequestAsyncAndParseOptionalEntity(Req request, + CheckedFunction requestConverter, + RequestOptions options, + CheckedFunction entityParser, + ActionListener> listener) { + Optional validationException = request.validate(); + if (validationException != null && validationException.isPresent()) { + listener.onFailure(validationException.get()); + return; + } + Request req; + try { + req = requestConverter.apply(request); + } catch (Exception e) { + listener.onFailure(e); + return; + } + req.setOptions(options); + ResponseListener responseListener = wrapResponseListener404sOptional(response -> parseEntity(response.getEntity(), + entityParser), listener); + client.performRequestAsync(req, responseListener); + } + + final ResponseListener wrapResponseListener404sOptional(CheckedFunction responseConverter, + ActionListener> actionListener) { + return new ResponseListener() { + @Override + public void onSuccess(Response response) { + try { + actionListener.onResponse(Optional.of(responseConverter.apply(response))); + } catch (Exception e) { + IOException ioe = new IOException("Unable to parse response body for " + response, e); + onFailure(ioe); + } + } + + @Override + public void onFailure(Exception exception) { + if (exception instanceof ResponseException) { + ResponseException responseException = (ResponseException) exception; + Response response = responseException.getResponse(); + if (RestStatus.NOT_FOUND.getStatus() == response.getStatusLine().getStatusCode()) { + actionListener.onResponse(Optional.empty()); + } else { + actionListener.onFailure(parseResponseException(responseException)); + } + } else { + actionListener.onFailure(exception); + } + } + }; + } /** * Converts a {@link ResponseException} obtained from the low level REST client into an {@link ElasticsearchException}. diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/TasksClient.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/TasksClient.java index 3b957b2defb0d..4bf7565222a73 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/TasksClient.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/TasksClient.java @@ -24,8 +24,11 @@ import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; +import org.elasticsearch.client.tasks.GetTaskRequest; +import org.elasticsearch.client.tasks.GetTaskResponse; import java.io.IOException; +import java.util.Optional; import static java.util.Collections.emptySet; @@ -67,6 +70,34 @@ public void listAsync(ListTasksRequest request, RequestOptions options, ActionLi restHighLevelClient.performRequestAsyncAndParseEntity(request, TasksRequestConverters::listTasks, options, ListTasksResponse::fromXContent, listener, emptySet()); } + + /** + * Get a task using the Task Management API. + * See + * Task Management API on elastic.co + * @param request the request + * @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized + * @return the response + * @throws IOException in case there is a problem sending the request or parsing back the response + */ + public Optional get(GetTaskRequest request, RequestOptions options) throws IOException { + return restHighLevelClient.performRequestAndParseOptionalEntity(request, TasksRequestConverters::getTask, options, + GetTaskResponse::fromXContent); + } + + /** + * Get a task using the Task Management API. + * See + * Task Management API on elastic.co + * @param request the request + * @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized + * @param listener an actionlistener that takes an optional response (404s are returned as an empty Optional) + */ + public void getAsync(GetTaskRequest request, RequestOptions options, ActionListener> listener) { + + restHighLevelClient.performRequestAsyncAndParseOptionalEntity(request, TasksRequestConverters::getTask, options, + GetTaskResponse::fromXContent, listener); + } /** * Cancel one or more cluster tasks using the Task Management API. diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/TasksRequestConverters.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/TasksRequestConverters.java index 45723dcc938c5..f0e9cf4e025f6 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/TasksRequestConverters.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/TasksRequestConverters.java @@ -23,6 +23,8 @@ import org.apache.http.client.methods.HttpPost; import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; +import org.elasticsearch.client.RequestConverters.EndpointBuilder; +import org.elasticsearch.client.tasks.GetTaskRequest; final class TasksRequestConverters { @@ -54,4 +56,16 @@ static Request listTasks(ListTasksRequest listTaskRequest) { .putParam("group_by", "none"); return request; } + + static Request getTask(GetTaskRequest getTaskRequest) { + String endpoint = new EndpointBuilder().addPathPartAsIs("_tasks") + .addPathPartAsIs(getTaskRequest.getNodeId() + ":" + Long.toString(getTaskRequest.getTaskId())) + .build(); + Request request = new Request(HttpGet.METHOD_NAME, endpoint); + RequestConverters.Params params = new RequestConverters.Params(request); + params.withTimeout(getTaskRequest.getTimeout()) + .withWaitForCompletion(getTaskRequest.getWaitForCompletion()); + return request; + } + } diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/tasks/GetTaskRequest.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/tasks/GetTaskRequest.java new file mode 100644 index 0000000000000..0dc3168937573 --- /dev/null +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/tasks/GetTaskRequest.java @@ -0,0 +1,108 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.client.tasks; + +import org.elasticsearch.client.Validatable; +import org.elasticsearch.client.ValidationException; +import org.elasticsearch.common.unit.TimeValue; + +import java.util.Objects; +import java.util.Optional; + +public class GetTaskRequest implements Validatable { + private final String nodeId; + private final long taskId; + private boolean waitForCompletion = false; + private TimeValue timeout = null; + + public GetTaskRequest(String nodeId, long taskId) { + this.nodeId = nodeId; + this.taskId = taskId; + } + + public String getNodeId() { + return nodeId; + } + + public long getTaskId() { + return taskId; + } + + /** + * Should this request wait for all found tasks to complete? + */ + public boolean getWaitForCompletion() { + return waitForCompletion; + } + + /** + * Should this request wait for all found tasks to complete? + */ + public GetTaskRequest setWaitForCompletion(boolean waitForCompletion) { + this.waitForCompletion = waitForCompletion; + return this; + } + + /** + * Timeout to wait for any async actions this request must take. It must take anywhere from 0 to 2. + */ + public TimeValue getTimeout() { + return timeout; + } + + /** + * Timeout to wait for any async actions this request must take. + */ + public GetTaskRequest setTimeout(TimeValue timeout) { + this.timeout = timeout; + return this; + } + + @Override + public Optional validate() { + final ValidationException validationException = new ValidationException(); + if (timeout != null && !waitForCompletion) { + validationException.addValidationError("Timeout settings are only accepted if waitForCompletion is also set"); + } + if (validationException.validationErrors().isEmpty()) { + return Optional.empty(); + } + return Optional.of(validationException); + } + + @Override + public int hashCode() { + return Objects.hash(nodeId, taskId, waitForCompletion, timeout); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + GetTaskRequest other = (GetTaskRequest) obj; + return Objects.equals(nodeId, other.nodeId) && + taskId == other.taskId && + waitForCompletion == other.waitForCompletion && + Objects.equals(timeout, other.timeout); + } +} diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/tasks/GetTaskResponse.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/tasks/GetTaskResponse.java new file mode 100644 index 0000000000000..05d40f12f5b21 --- /dev/null +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/tasks/GetTaskResponse.java @@ -0,0 +1,58 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.client.tasks; + +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.tasks.TaskInfo; + +import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg; + +public class GetTaskResponse { + private final boolean completed; + private final TaskInfo taskInfo; + public static final ParseField COMPLETED = new ParseField("completed"); + public static final ParseField TASK = new ParseField("task"); + + public GetTaskResponse(boolean completed, TaskInfo taskInfo) { + this.completed = completed; + this.taskInfo = taskInfo; + } + + public boolean isCompleted() { + return completed; + } + + public TaskInfo getTaskInfo() { + return taskInfo; + } + + private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>("get_task", + true, a -> new GetTaskResponse((boolean) a[0], (TaskInfo) a[1])); + static { + PARSER.declareBoolean(constructorArg(), COMPLETED); + PARSER.declareObject(constructorArg(), (p, c) -> TaskInfo.fromXContent(p), TASK); + } + + public static GetTaskResponse fromXContent(XContentParser parser) { + return PARSER.apply(parser, null); + } +} diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/CustomRestHighLevelClientTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/CustomRestHighLevelClientTests.java index ff27fe21c27e6..3b69f10344a0d 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/CustomRestHighLevelClientTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/CustomRestHighLevelClientTests.java @@ -126,8 +126,11 @@ public void testMethodsVisibility() { "parseResponseException", "performRequest", "performRequestAndParseEntity", + "performRequestAndParseOptionalEntity", "performRequestAsync", - "performRequestAsyncAndParseEntity"}; + "performRequestAsyncAndParseEntity", + "performRequestAsyncAndParseOptionalEntity" + }; final Set protectedMethods = Arrays.stream(RestHighLevelClient.class.getDeclaredMethods()) .filter(method -> Modifier.isProtected(method.getModifiers())) diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java index 2c5d279592f48..62e6afe6ffb97 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java @@ -97,6 +97,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -675,8 +676,7 @@ public void testApiNamingConventions() throws Exception { "indices.put_alias", "mtermvectors", "render_search_template", - "scripts_painless_execute", - "tasks.get" + "scripts_painless_execute" }; //These API are not required for high-level client feature completeness String[] notRequiredApi = new String[] { @@ -777,8 +777,11 @@ private void assertSyncMethod(Method method, String apiName) { assertThat("the return type for method [" + method + "] is incorrect", method.getReturnType().getSimpleName(), equalTo("boolean")); } else { - assertThat("the return type for method [" + method + "] is incorrect", - method.getReturnType().getSimpleName(), endsWith("Response")); + // It's acceptable for 404s to be represented as empty Optionals + if (!method.getReturnType().isAssignableFrom(Optional.class)) { + assertThat("the return type for method [" + method + "] is incorrect", + method.getReturnType().getSimpleName(), endsWith("Response")); + } } assertEquals("incorrect number of exceptions for method [" + method + "]", 1, method.getExceptionTypes().length); diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/TasksIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/TasksIT.java index baa97cfa5b4ef..52e6d7d3bd756 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/TasksIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/TasksIT.java @@ -24,10 +24,21 @@ import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskGroup; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.support.WriteRequest.RefreshPolicy; +import org.elasticsearch.client.tasks.GetTaskRequest; +import org.elasticsearch.client.tasks.GetTaskResponse; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.tasks.TaskInfo; import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import java.util.Optional; import static java.util.Collections.emptyList; import static org.hamcrest.Matchers.equalTo; @@ -60,6 +71,56 @@ public void testListTasks() throws IOException { } assertTrue("List tasks were not found", listTasksFound); } + + public void testGetValidTask() throws IOException { + + // Run a Reindex to create a task + + final String sourceIndex = "source1"; + final String destinationIndex = "dest"; + Settings settings = Settings.builder().put("number_of_shards", 1).put("number_of_replicas", 0).build(); + createIndex(sourceIndex, settings); + createIndex(destinationIndex, settings); + BulkRequest bulkRequest = new BulkRequest() + .add(new IndexRequest(sourceIndex, "type", "1").source(Collections.singletonMap("foo", "bar"), XContentType.JSON)) + .add(new IndexRequest(sourceIndex, "type", "2").source(Collections.singletonMap("foo2", "bar2"), XContentType.JSON)) + .setRefreshPolicy(RefreshPolicy.IMMEDIATE); + assertEquals(RestStatus.OK, highLevelClient().bulk(bulkRequest, RequestOptions.DEFAULT).status()); + + // (need to use low level client because currently high level client + // doesn't support async return of task id - needs + // https://github.com/elastic/elasticsearch/pull/35202 ) + RestClient lowClient = highLevelClient().getLowLevelClient(); + Request request = new Request("POST", "_reindex"); + request.addParameter("wait_for_completion", "false"); + request.setJsonEntity("{" + " \"source\": {\n" + " \"index\": \"source1\"\n" + " },\n" + " \"dest\": {\n" + + " \"index\": \"dest\"\n" + " }" + "}"); + Response response = lowClient.performRequest(request); + Map map = entityAsMap(response); + Object taskId = map.get("task"); + assertNotNull(taskId); + + TaskId childTaskId = new TaskId(taskId.toString()); + GetTaskRequest gtr = new GetTaskRequest(childTaskId.getNodeId(), childTaskId.getId()); + gtr.setWaitForCompletion(randomBoolean()); + Optional getTaskResponse = execute(gtr, highLevelClient().tasks()::get, highLevelClient().tasks()::getAsync); + assertTrue(getTaskResponse.isPresent()); + GetTaskResponse taskResponse = getTaskResponse.get(); + if (gtr.getWaitForCompletion()) { + assertTrue(taskResponse.isCompleted()); + } + TaskInfo info = taskResponse.getTaskInfo(); + assertTrue(info.isCancellable()); + assertEquals("reindex from [source1] to [dest]", info.getDescription()); + assertEquals("indices:data/write/reindex", info.getAction()); + } + + public void testGetInvalidTask() throws IOException { + // Check 404s are returned as empty Optionals + GetTaskRequest gtr = new GetTaskRequest("doesNotExistNodeName", 123); + Optional getTaskResponse = execute(gtr, highLevelClient().tasks()::get, highLevelClient().tasks()::getAsync); + assertFalse(getTaskResponse.isPresent()); + } public void testCancelTasks() throws IOException { ListTasksRequest listRequest = new ListTasksRequest(); diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/core/tasks/GetTaskResponseTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/core/tasks/GetTaskResponseTests.java new file mode 100644 index 0000000000000..60a52a7ff3d37 --- /dev/null +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/core/tasks/GetTaskResponseTests.java @@ -0,0 +1,107 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.client.core.tasks; + +import org.elasticsearch.client.Requests; +import org.elasticsearch.client.tasks.GetTaskResponse; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.tasks.RawTaskStatus; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.tasks.TaskInfo; +import org.elasticsearch.test.ESTestCase; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; + +import static org.elasticsearch.test.AbstractXContentTestCase.xContentTester; + +public class GetTaskResponseTests extends ESTestCase { + + public void testFromXContent() throws IOException { + xContentTester( + this::createParser, + this::createTestInstance, + this::toXContent, + GetTaskResponse::fromXContent) + .supportsUnknownFields(true) + .assertEqualsConsumer(this::assertEqualInstances) + .assertToXContentEquivalence(true) + .randomFieldsExcludeFilter(field ->field.endsWith("headers") || field.endsWith("status")) + .test(); + } + + private GetTaskResponse createTestInstance() { + return new GetTaskResponse(randomBoolean(), randomTaskInfo()); + } + + private void toXContent(GetTaskResponse response, XContentBuilder builder) throws IOException { + builder.startObject(); + { + builder.field(GetTaskResponse.COMPLETED.getPreferredName(), response.isCompleted()); + builder.startObject(GetTaskResponse.TASK.getPreferredName()); + response.getTaskInfo().toXContent(builder, ToXContent.EMPTY_PARAMS); + builder.endObject(); + } + builder.endObject(); + } + + private void assertEqualInstances(GetTaskResponse expectedInstance, GetTaskResponse newInstance) { + assertEquals(expectedInstance.isCompleted(), newInstance.isCompleted()); + assertEquals(expectedInstance.getTaskInfo(), newInstance.getTaskInfo()); + } + + static TaskInfo randomTaskInfo() { + TaskId taskId = randomTaskId(); + String type = randomAlphaOfLength(5); + String action = randomAlphaOfLength(5); + Task.Status status = randomBoolean() ? randomRawTaskStatus() : null; + String description = randomBoolean() ? randomAlphaOfLength(5) : null; + long startTime = randomLong(); + long runningTimeNanos = randomLong(); + boolean cancellable = randomBoolean(); + TaskId parentTaskId = randomBoolean() ? TaskId.EMPTY_TASK_ID : randomTaskId(); + Map headers = randomBoolean() ? + Collections.emptyMap() : + Collections.singletonMap(randomAlphaOfLength(5), randomAlphaOfLength(5)); + return new TaskInfo(taskId, type, action, description, status, startTime, runningTimeNanos, cancellable, parentTaskId, headers); + } + + private static TaskId randomTaskId() { + return new TaskId(randomAlphaOfLength(5), randomLong()); + } + + private static RawTaskStatus randomRawTaskStatus() { + try (XContentBuilder builder = XContentBuilder.builder(Requests.INDEX_CONTENT_TYPE.xContent())) { + builder.startObject(); + int fields = between(0, 10); + for (int f = 0; f < fields; f++) { + builder.field(randomAlphaOfLength(5), randomAlphaOfLength(5)); + } + builder.endObject(); + return new RawTaskStatus(BytesReference.bytes(builder)); + } catch (IOException e) { + throw new IllegalStateException(e); + } + } +}