Skip to content

Commit 603d1a4

Browse files
author
Christoph Büscher
authored
Add stop rollup job support to HL REST Client (#34702)
This change adds support for stoping a rollup job to the High Level REST Client. Relates to #29827
1 parent c346a0f commit 603d1a4

File tree

12 files changed

+349
-20
lines changed

12 files changed

+349
-20
lines changed

client/rest-high-level/src/main/java/org/elasticsearch/client/RollupClient.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,14 @@
2828
import org.elasticsearch.client.rollup.GetRollupJobResponse;
2929
import org.elasticsearch.client.rollup.GetRollupCapsRequest;
3030
import org.elasticsearch.client.rollup.GetRollupCapsResponse;
31+
import org.elasticsearch.client.rollup.GetRollupJobRequest;
32+
import org.elasticsearch.client.rollup.GetRollupJobResponse;
3133
import org.elasticsearch.client.rollup.PutRollupJobRequest;
3234
import org.elasticsearch.client.rollup.PutRollupJobResponse;
3335
import org.elasticsearch.client.rollup.StartRollupJobRequest;
3436
import org.elasticsearch.client.rollup.StartRollupJobResponse;
37+
import org.elasticsearch.client.rollup.StopRollupJobRequest;
38+
import org.elasticsearch.client.rollup.StopRollupJobResponse;
3539

3640
import java.io.IOException;
3741
import java.util.Collections;
@@ -118,6 +122,40 @@ public void startRollupJobAsync(StartRollupJobRequest request, RequestOptions op
118122
listener, Collections.emptySet());
119123
}
120124

125+
/**
126+
* Stop a rollup job
127+
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/rollup-stop-job.html">
128+
* the docs</a> for more.
129+
* @param request the request
130+
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
131+
* @return the response
132+
* @throws IOException in case there is a problem sending the request or parsing back the response
133+
*/
134+
public StopRollupJobResponse stopRollupJob(StopRollupJobRequest request, RequestOptions options) throws IOException {
135+
return restHighLevelClient.performRequestAndParseEntity(request,
136+
RollupRequestConverters::stopJob,
137+
options,
138+
StopRollupJobResponse::fromXContent,
139+
Collections.emptySet());
140+
}
141+
142+
/**
143+
* Asynchronously stop a rollup job
144+
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/rollup-stop-job.html">
145+
* the docs</a> for more.
146+
* @param request the request
147+
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
148+
* @param listener the listener to be notified upon request completion
149+
*/
150+
public void stopRollupJobAsync(StopRollupJobRequest request, RequestOptions options,
151+
ActionListener<StopRollupJobResponse> listener) {
152+
restHighLevelClient.performRequestAsyncAndParseEntity(request,
153+
RollupRequestConverters::stopJob,
154+
options,
155+
StopRollupJobResponse::fromXContent,
156+
listener, Collections.emptySet());
157+
}
158+
121159
/**
122160
* Delete a rollup job from the cluster
123161
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/rollup-delete-job.html">

client/rest-high-level/src/main/java/org/elasticsearch/client/RollupRequestConverters.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.elasticsearch.client.rollup.GetRollupJobRequest;
2929
import org.elasticsearch.client.rollup.PutRollupJobRequest;
3030
import org.elasticsearch.client.rollup.StartRollupJobRequest;
31+
import org.elasticsearch.client.rollup.StopRollupJobRequest;
3132

3233
import java.io.IOException;
3334

@@ -55,8 +56,16 @@ static Request startJob(final StartRollupJobRequest startRollupJobRequest) throw
5556
.addPathPart(startRollupJobRequest.getJobId())
5657
.addPathPartAsIs("_start")
5758
.build();
58-
Request request = new Request(HttpPost.METHOD_NAME, endpoint);
59-
return request;
59+
return new Request(HttpPost.METHOD_NAME, endpoint);
60+
}
61+
62+
static Request stopJob(final StopRollupJobRequest stopRollupJobRequest) throws IOException {
63+
String endpoint = new RequestConverters.EndpointBuilder()
64+
.addPathPartAsIs("_xpack", "rollup", "job")
65+
.addPathPart(stopRollupJobRequest.getJobId())
66+
.addPathPartAsIs("_stop")
67+
.build();
68+
return new Request(HttpPost.METHOD_NAME, endpoint);
6069
}
6170

6271
static Request getJob(final GetRollupJobRequest getRollupJobRequest) {

client/rest-high-level/src/main/java/org/elasticsearch/client/rollup/StartRollupJobResponse.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ public class StartRollupJobResponse extends AcknowledgedResponse {
3030
private static final String PARSE_FIELD_NAME = "started";
3131

3232
private static final ConstructingObjectParser<StartRollupJobResponse, Void> PARSER = AcknowledgedResponse
33-
.generateParser("delete_rollup_job_response", StartRollupJobResponse::new, PARSE_FIELD_NAME);
33+
.generateParser("start_rollup_job_response", StartRollupJobResponse::new, PARSE_FIELD_NAME);
3434

3535
public StartRollupJobResponse(boolean acknowledged) {
3636
super(acknowledged);
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.client.rollup;
20+
21+
import org.elasticsearch.client.Validatable;
22+
23+
import java.util.Objects;
24+
25+
public class StopRollupJobRequest implements Validatable {
26+
27+
private final String jobId;
28+
29+
public StopRollupJobRequest(final String jobId) {
30+
this.jobId = Objects.requireNonNull(jobId, "id parameter must not be null");
31+
}
32+
33+
public String getJobId() {
34+
return jobId;
35+
}
36+
37+
@Override
38+
public boolean equals(Object o) {
39+
if (this == o) return true;
40+
if (o == null || getClass() != o.getClass()) return false;
41+
final StopRollupJobRequest that = (StopRollupJobRequest) o;
42+
return Objects.equals(jobId, that.jobId);
43+
}
44+
45+
@Override
46+
public int hashCode() {
47+
return Objects.hash(jobId);
48+
}
49+
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.client.rollup;
21+
22+
import org.elasticsearch.client.core.AcknowledgedResponse;
23+
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
24+
import org.elasticsearch.common.xcontent.XContentParser;
25+
26+
import java.io.IOException;
27+
28+
public class StopRollupJobResponse extends AcknowledgedResponse {
29+
30+
private static final String PARSE_FIELD_NAME = "stopped";
31+
32+
private static final ConstructingObjectParser<StopRollupJobResponse, Void> PARSER = AcknowledgedResponse
33+
.generateParser("stop_rollup_job_response", StopRollupJobResponse::new, PARSE_FIELD_NAME);
34+
35+
public StopRollupJobResponse(boolean acknowledged) {
36+
super(acknowledged);
37+
}
38+
39+
public static StopRollupJobResponse fromXContent(final XContentParser parser) throws IOException {
40+
return PARSER.parse(parser, null);
41+
}
42+
43+
@Override
44+
protected String getFieldName() {
45+
return PARSE_FIELD_NAME;
46+
}
47+
}

client/rest-high-level/src/test/java/org/elasticsearch/client/RollupIT.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,12 @@
4141
import org.elasticsearch.client.rollup.GetRollupJobResponse.JobWrapper;
4242
import org.elasticsearch.client.rollup.PutRollupJobRequest;
4343
import org.elasticsearch.client.rollup.PutRollupJobResponse;
44-
import org.elasticsearch.client.rollup.StartRollupJobRequest;
45-
import org.elasticsearch.client.rollup.StartRollupJobResponse;
4644
import org.elasticsearch.client.rollup.RollableIndexCaps;
4745
import org.elasticsearch.client.rollup.RollupJobCaps;
46+
import org.elasticsearch.client.rollup.StartRollupJobRequest;
47+
import org.elasticsearch.client.rollup.StartRollupJobResponse;
48+
import org.elasticsearch.client.rollup.StopRollupJobRequest;
49+
import org.elasticsearch.client.rollup.StopRollupJobResponse;
4850
import org.elasticsearch.client.rollup.job.config.DateHistogramGroupConfig;
4951
import org.elasticsearch.client.rollup.job.config.GroupConfig;
5052
import org.elasticsearch.client.rollup.job.config.MetricConfig;
@@ -230,6 +232,11 @@ public void testPutStartAndGetRollupJob() throws Exception {
230232
assertThat(job.getStatus().getState(), either(equalTo(IndexerState.STARTED)).or(equalTo(IndexerState.INDEXING)));
231233
assertThat(job.getStatus().getCurrentPosition(), hasKey("date.date_histogram"));
232234
assertEquals(true, job.getStatus().getUpgradedDocumentId());
235+
236+
// stop the job
237+
StopRollupJobRequest stopRequest = new StopRollupJobRequest(id);
238+
StopRollupJobResponse stopResponse = execute(stopRequest, rollupClient::stopRollupJob, rollupClient::stopRollupJobAsync);
239+
assertTrue(stopResponse.isAcknowledged());
233240
}
234241

235242
public void testGetMissingRollupJob() throws Exception {

client/rest-high-level/src/test/java/org/elasticsearch/client/RollupRequestConvertersTests.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.elasticsearch.client.rollup.GetRollupJobRequest;
2626
import org.elasticsearch.client.rollup.PutRollupJobRequest;
2727
import org.elasticsearch.client.rollup.StartRollupJobRequest;
28+
import org.elasticsearch.client.rollup.StopRollupJobRequest;
2829
import org.elasticsearch.client.rollup.job.config.RollupJobConfig;
2930
import org.elasticsearch.client.rollup.job.config.RollupJobConfigTests;
3031
import org.elasticsearch.test.ESTestCase;
@@ -61,6 +62,18 @@ public void testStartJob() throws IOException {
6162
assertThat(request.getEntity(), nullValue());
6263
}
6364

65+
public void testStopJob() throws IOException {
66+
String jobId = randomAlphaOfLength(5);
67+
68+
StopRollupJobRequest stopJob = new StopRollupJobRequest(jobId);
69+
70+
Request request = RollupRequestConverters.stopJob(stopJob);
71+
assertThat(request.getEndpoint(), equalTo("/_xpack/rollup/job/" + jobId + "/_stop"));
72+
assertThat(HttpPost.METHOD_NAME, equalTo(request.getMethod()));
73+
assertThat(request.getParameters().keySet(), empty());
74+
assertNull(request.getEntity());
75+
}
76+
6477
public void testGetJob() {
6578
boolean getAll = randomBoolean();
6679
String job = getAll ? "_all" : RequestConvertersTests.randomIndicesNames(1, 1)[0];

client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/RollupDocumentationIT.java

Lines changed: 52 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,7 @@
2929
import org.elasticsearch.action.index.IndexRequest;
3030
import org.elasticsearch.action.support.WriteRequest;
3131
import org.elasticsearch.client.ESRestHighLevelClientTestCase;
32-
import org.elasticsearch.client.Request;
3332
import org.elasticsearch.client.RequestOptions;
34-
import org.elasticsearch.client.Response;
3533
import org.elasticsearch.client.RestHighLevelClient;
3634
import org.elasticsearch.client.RollupClient;
3735
import org.elasticsearch.client.rollup.DeleteRollupJobRequest;
@@ -51,6 +49,8 @@
5149
import org.elasticsearch.client.rollup.RollupJobCaps;
5250
import org.elasticsearch.client.rollup.StartRollupJobRequest;
5351
import org.elasticsearch.client.rollup.StartRollupJobResponse;
52+
import org.elasticsearch.client.rollup.StopRollupJobRequest;
53+
import org.elasticsearch.client.rollup.StopRollupJobResponse;
5454
import org.elasticsearch.client.rollup.job.config.DateHistogramGroupConfig;
5555
import org.elasticsearch.client.rollup.job.config.GroupConfig;
5656
import org.elasticsearch.client.rollup.job.config.HistogramGroupConfig;
@@ -237,59 +237,96 @@ public void onFailure(Exception e) {
237237
assertTrue(latch.await(30L, TimeUnit.SECONDS));
238238
}
239239

240-
241240
@SuppressWarnings("unused")
242241
public void testStartRollupJob() throws Exception {
243242
testCreateRollupJob();
244243
RestHighLevelClient client = highLevelClient();
245-
246244
String id = "job_1";
247245
// tag::rollup-start-job-request
248246
StartRollupJobRequest request = new StartRollupJobRequest(id); // <1>
249247
// end::rollup-start-job-request
250-
251-
252248
try {
253249
// tag::rollup-start-job-execute
254250
RollupClient rc = client.rollup();
255251
StartRollupJobResponse response = rc.startRollupJob(request, RequestOptions.DEFAULT);
256252
// end::rollup-start-job-execute
257-
258253
// tag::rollup-start-job-response
259254
response.isAcknowledged(); // <1>
260255
// end::rollup-start-job-response
261256
} catch (Exception e) {
262257
// Swallow any exception, this test does not test actually cancelling.
263258
}
264-
265259
// tag::rollup-start-job-execute-listener
266260
ActionListener<StartRollupJobResponse> listener = new ActionListener<StartRollupJobResponse>() {
267261
@Override
268262
public void onResponse(StartRollupJobResponse response) {
269263
// <1>
270264
}
271-
272265
@Override
273266
public void onFailure(Exception e) {
274267
// <2>
275268
}
276269
};
277270
// end::rollup-start-job-execute-listener
278-
279271
final CountDownLatch latch = new CountDownLatch(1);
280272
listener = new LatchedActionListener<>(listener, latch);
281-
282273
// tag::rollup-start-job-execute-async
283274
RollupClient rc = client.rollup();
284275
rc.startRollupJobAsync(request, RequestOptions.DEFAULT, listener); // <1>
285276
// end::rollup-start-job-execute-async
286-
287277
assertTrue(latch.await(30L, TimeUnit.SECONDS));
288278

289279
// stop job so it can correctly be deleted by the test teardown
290-
// TODO Replace this with the Rollup Stop Job API
291-
Response stoptResponse = client().performRequest(new Request("POST", "/_xpack/rollup/job/" + id + "/_stop"));
292-
assertEquals(RestStatus.OK.getStatus(), stoptResponse.getStatusLine().getStatusCode());
280+
rc.stopRollupJob(new StopRollupJobRequest(id), RequestOptions.DEFAULT);
281+
}
282+
283+
@SuppressWarnings("unused")
284+
public void testStopRollupJob() throws Exception {
285+
testCreateRollupJob();
286+
RestHighLevelClient client = highLevelClient();
287+
288+
String id = "job_1";
289+
// tag::rollup-stop-job-request
290+
StopRollupJobRequest request = new StopRollupJobRequest(id); // <1>
291+
// end::rollup-stop-job-request
292+
293+
294+
try {
295+
// tag::rollup-stop-job-execute
296+
RollupClient rc = client.rollup();
297+
StopRollupJobResponse response = rc.stopRollupJob(request, RequestOptions.DEFAULT);
298+
// end::rollup-stop-job-execute
299+
300+
// tag::rollup-stop-job-response
301+
response.isAcknowledged(); // <1>
302+
// end::rollup-stop-job-response
303+
} catch (Exception e) {
304+
// Swallow any exception, this test does not test actually cancelling.
305+
}
306+
307+
// tag::rollup-stop-job-execute-listener
308+
ActionListener<StopRollupJobResponse> listener = new ActionListener<StopRollupJobResponse>() {
309+
@Override
310+
public void onResponse(StopRollupJobResponse response) {
311+
// <1>
312+
}
313+
314+
@Override
315+
public void onFailure(Exception e) {
316+
// <2>
317+
}
318+
};
319+
// end::rollup-stop-job-execute-listener
320+
321+
final CountDownLatch latch = new CountDownLatch(1);
322+
listener = new LatchedActionListener<>(listener, latch);
323+
324+
// tag::rollup-stop-job-execute-async
325+
RollupClient rc = client.rollup();
326+
rc.stopRollupJobAsync(request, RequestOptions.DEFAULT, listener); // <1>
327+
// end::rollup-stop-job-execute-async
328+
329+
assertTrue(latch.await(30L, TimeUnit.SECONDS));
293330
}
294331

295332
@SuppressWarnings("unused")

0 commit comments

Comments
 (0)