|
21 | 21 | import org.elasticsearch.action.ActionListener; |
22 | 22 | import org.elasticsearch.action.LatchedActionListener; |
23 | 23 | import org.elasticsearch.client.ESRestHighLevelClientTestCase; |
| 24 | +import org.elasticsearch.client.Request; |
24 | 25 | import org.elasticsearch.client.RequestOptions; |
| 26 | +import org.elasticsearch.client.Response; |
25 | 27 | import org.elasticsearch.client.RestHighLevelClient; |
| 28 | +import org.elasticsearch.client.watcher.AckWatchRequest; |
| 29 | +import org.elasticsearch.client.watcher.AckWatchResponse; |
| 30 | +import org.elasticsearch.client.watcher.ActionStatus; |
| 31 | +import org.elasticsearch.client.watcher.ActionStatus.AckStatus; |
| 32 | +import org.elasticsearch.client.watcher.WatchStatus; |
26 | 33 | import org.elasticsearch.common.bytes.BytesArray; |
27 | 34 | import org.elasticsearch.common.bytes.BytesReference; |
28 | 35 | import org.elasticsearch.common.xcontent.XContentType; |
29 | 36 | import org.elasticsearch.protocol.xpack.watcher.DeleteWatchRequest; |
30 | 37 | import org.elasticsearch.protocol.xpack.watcher.DeleteWatchResponse; |
31 | 38 | import org.elasticsearch.protocol.xpack.watcher.PutWatchRequest; |
32 | 39 | import org.elasticsearch.protocol.xpack.watcher.PutWatchResponse; |
| 40 | +import org.elasticsearch.rest.RestStatus; |
33 | 41 |
|
34 | 42 | import java.util.concurrent.CountDownLatch; |
35 | 43 | import java.util.concurrent.TimeUnit; |
@@ -132,4 +140,67 @@ public void onFailure(Exception e) { |
132 | 140 | } |
133 | 141 | } |
134 | 142 |
|
| 143 | + public void testAckWatch() throws Exception { |
| 144 | + RestHighLevelClient client = highLevelClient(); |
| 145 | + |
| 146 | + { |
| 147 | + BytesReference watch = new BytesArray("{ \n" + |
| 148 | + " \"trigger\": { \"schedule\": { \"interval\": \"10h\" } },\n" + |
| 149 | + " \"input\": { \"simple\": { \"foo\" : \"bar\" } },\n" + |
| 150 | + " \"actions\": { \"logme\": { \"logging\": { \"text\": \"{{ctx.payload}}\" } } }\n" + |
| 151 | + "}"); |
| 152 | + PutWatchRequest putWatchRequest = new PutWatchRequest("my_watch_id", watch, XContentType.JSON); |
| 153 | + client.watcher().putWatch(putWatchRequest, RequestOptions.DEFAULT); |
| 154 | + |
| 155 | + // TODO: use the high-level REST client here once it supports 'execute watch'. |
| 156 | + Request executeWatchRequest = new Request("POST", "_xpack/watcher/watch/my_watch_id/_execute"); |
| 157 | + executeWatchRequest.setJsonEntity("{ \"record_execution\": true }"); |
| 158 | + Response executeResponse = client().performRequest(executeWatchRequest); |
| 159 | + assertEquals(RestStatus.OK.getStatus(), executeResponse.getStatusLine().getStatusCode()); |
| 160 | + } |
| 161 | + |
| 162 | + { |
| 163 | + //tag::ack-watch-execute |
| 164 | + AckWatchRequest request = new AckWatchRequest("my_watch_id", // <1> |
| 165 | + "logme", "emailme"); // <2> |
| 166 | + AckWatchResponse response = client.watcher().ackWatch(request, RequestOptions.DEFAULT); |
| 167 | + //end::ack-watch-execute |
| 168 | + |
| 169 | + //tag::ack-watch-response |
| 170 | + WatchStatus watchStatus = response.getStatus(); |
| 171 | + ActionStatus actionStatus = watchStatus.actionStatus("logme"); // <1> |
| 172 | + AckStatus.State ackState = actionStatus.ackStatus().state(); // <2> |
| 173 | + //end::ack-watch-response |
| 174 | + |
| 175 | + assertEquals(AckStatus.State.ACKED, ackState); |
| 176 | + } |
| 177 | + |
| 178 | + { |
| 179 | + AckWatchRequest request = new AckWatchRequest("my_watch_id"); |
| 180 | + // tag::ack-watch-execute-listener |
| 181 | + ActionListener<AckWatchResponse> listener = new ActionListener<AckWatchResponse>() { |
| 182 | + @Override |
| 183 | + public void onResponse(AckWatchResponse response) { |
| 184 | + // <1> |
| 185 | + } |
| 186 | + |
| 187 | + @Override |
| 188 | + public void onFailure(Exception e) { |
| 189 | + // <2> |
| 190 | + } |
| 191 | + }; |
| 192 | + // end::ack-watch-execute-listener |
| 193 | + |
| 194 | + // For testing, replace the empty listener by a blocking listener. |
| 195 | + final CountDownLatch latch = new CountDownLatch(1); |
| 196 | + listener = new LatchedActionListener<>(listener, latch); |
| 197 | + |
| 198 | + // tag::ack-watch-execute-async |
| 199 | + client.watcher().ackWatchAsync(request, RequestOptions.DEFAULT, listener); // <1> |
| 200 | + // end::ack-watch-execute-async |
| 201 | + |
| 202 | + assertTrue(latch.await(30L, TimeUnit.SECONDS)); |
| 203 | + } |
| 204 | + } |
| 205 | + |
135 | 206 | } |
0 commit comments