Skip to content

Commit d20bbda

Browse files
authored
Bulk Api support for global parameters (#35701)
The Bulk Request in the High Level REST Client should be consistent with what is possible in the REST API, and should therefore support global parameters. In the REST API, global parameters are passed in the URL. Some parameters (index, type) are mandatory and will fail validation if they are not provided before the bulk is executed; others (routing, pipeline) are optional. The usage of these parameters should be consistent across sync/async execution, the bulk processor, and BulkRequestBuilder. Closes #26026. Backport of #34528.
1 parent fa7679e commit d20bbda

File tree

21 files changed

+712
-41
lines changed

21 files changed

+712
-41
lines changed

client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,8 @@ static Request bulk(BulkRequest bulkRequest) throws IOException {
122122
Params parameters = new Params(request);
123123
parameters.withTimeout(bulkRequest.timeout());
124124
parameters.withRefreshPolicy(bulkRequest.getRefreshPolicy());
125-
125+
parameters.withPipeline(bulkRequest.pipeline());
126+
parameters.withRouting(bulkRequest.routing());
126127
// Bulk API only supports newline delimited JSON or Smile. Before executing
127128
// the bulk, we need to check that all requests have the same content-type
128129
// and this content-type is supported by the Bulk API.

client/rest-high-level/src/test/java/org/elasticsearch/client/BulkProcessorIT.java

Lines changed: 125 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -28,14 +28,18 @@
2828
import org.elasticsearch.action.get.MultiGetRequest;
2929
import org.elasticsearch.action.get.MultiGetResponse;
3030
import org.elasticsearch.action.index.IndexRequest;
31+
import org.elasticsearch.action.search.SearchRequest;
3132
import org.elasticsearch.common.Strings;
3233
import org.elasticsearch.common.bytes.BytesArray;
3334
import org.elasticsearch.common.unit.ByteSizeUnit;
3435
import org.elasticsearch.common.unit.ByteSizeValue;
3536
import org.elasticsearch.common.unit.TimeValue;
3637
import org.elasticsearch.common.xcontent.XContentType;
37-
import org.elasticsearch.common.xcontent.json.JsonXContent;
38+
import org.elasticsearch.search.SearchHit;
39+
import org.hamcrest.Matcher;
40+
import org.hamcrest.Matchers;
3841

42+
import java.io.IOException;
3943
import java.util.Arrays;
4044
import java.util.HashSet;
4145
import java.util.List;
@@ -44,10 +48,19 @@
4448
import java.util.concurrent.CountDownLatch;
4549
import java.util.concurrent.TimeUnit;
4650
import java.util.concurrent.atomic.AtomicInteger;
47-
51+
import java.util.stream.IntStream;
52+
53+
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
54+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.fieldFromSource;
55+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.hasId;
56+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.hasIndex;
57+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.hasProperty;
58+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.hasType;
4859
import static org.hamcrest.Matchers.both;
60+
import static org.hamcrest.Matchers.containsInAnyOrder;
4961
import static org.hamcrest.Matchers.either;
5062
import static org.hamcrest.Matchers.equalTo;
63+
import static org.hamcrest.Matchers.everyItem;
5164
import static org.hamcrest.Matchers.greaterThan;
5265
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
5366
import static org.hamcrest.Matchers.is;
@@ -268,23 +281,124 @@ public void testBulkProcessorConcurrentRequestsReadOnlyIndex() throws Exception
268281
assertMultiGetResponse(highLevelClient().mget(multiGetRequest, RequestOptions.DEFAULT), testDocs);
269282
}
270283

271-
private static MultiGetRequest indexDocs(BulkProcessor processor, int numDocs) throws Exception {
284+
@SuppressWarnings("unchecked")
285+
public void testGlobalParametersAndSingleRequest() throws Exception {
286+
createIndexWithMultipleShards("test");
287+
288+
final CountDownLatch latch = new CountDownLatch(1);
289+
BulkProcessorTestListener listener = new BulkProcessorTestListener(latch);
290+
createFieldAddingPipleine("pipeline_id", "fieldNameXYZ", "valueXYZ");
291+
292+
// tag::bulk-processor-mix-parameters
293+
try (BulkProcessor processor = initBulkProcessorBuilder(listener)
294+
.setGlobalIndex("tweets")
295+
.setGlobalType("_doc")
296+
.setGlobalRouting("routing")
297+
.setGlobalPipeline("pipeline_id")
298+
.build()) {
299+
300+
301+
processor.add(new IndexRequest() // <1>
302+
.source(XContentType.JSON, "user", "some user"));
303+
processor.add(new IndexRequest("blogs", "post_type", "1") // <2>
304+
.source(XContentType.JSON, "title", "some title"));
305+
}
306+
// end::bulk-request-mix-pipeline
307+
latch.await();
308+
309+
Iterable<SearchHit> hits = searchAll(new SearchRequest("tweets").routing("routing"));
310+
assertThat(hits, everyItem(hasProperty(fieldFromSource("user"), equalTo("some user"))));
311+
assertThat(hits, everyItem(hasProperty(fieldFromSource("fieldNameXYZ"), equalTo("valueXYZ"))));
312+
313+
314+
Iterable<SearchHit> blogs = searchAll(new SearchRequest("blogs").routing("routing"));
315+
assertThat(blogs, everyItem(hasProperty(fieldFromSource("title"), equalTo("some title"))));
316+
assertThat(blogs, everyItem(hasProperty(fieldFromSource("fieldNameXYZ"), equalTo("valueXYZ"))));
317+
}
318+
319+
@SuppressWarnings("unchecked")
320+
public void testGlobalParametersAndBulkProcessor() throws Exception {
321+
createIndexWithMultipleShards("test");
322+
323+
final CountDownLatch latch = new CountDownLatch(1);
324+
BulkProcessorTestListener listener = new BulkProcessorTestListener(latch);
325+
createFieldAddingPipleine("pipeline_id", "fieldNameXYZ", "valueXYZ");
326+
327+
int numDocs = randomIntBetween(10, 10);
328+
try (BulkProcessor processor = initBulkProcessorBuilder(listener)
329+
//let's make sure that the bulk action limit trips, one single execution will index all the documents
330+
.setConcurrentRequests(randomIntBetween(0, 1)).setBulkActions(numDocs)
331+
.setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))
332+
.setGlobalIndex("test")
333+
.setGlobalType("test")
334+
.setGlobalRouting("routing")
335+
.setGlobalPipeline("pipeline_id")
336+
.build()) {
337+
338+
indexDocs(processor, numDocs, null, null, "test", "test", "pipeline_id");
339+
latch.await();
340+
341+
assertThat(listener.beforeCounts.get(), equalTo(1));
342+
assertThat(listener.afterCounts.get(), equalTo(1));
343+
assertThat(listener.bulkFailures.size(), equalTo(0));
344+
assertResponseItems(listener.bulkItems, numDocs);
345+
346+
Iterable<SearchHit> hits = searchAll(new SearchRequest("test").routing("routing"));
347+
348+
assertThat(hits, everyItem(hasProperty(fieldFromSource("fieldNameXYZ"), equalTo("valueXYZ"))));
349+
assertThat(hits, everyItem(Matchers.allOf(hasIndex("test"), hasType("test"))));
350+
assertThat(hits, containsInAnyOrder(expectedIds(numDocs)));
351+
}
352+
}
353+
354+
@SuppressWarnings("unchecked")
355+
private Matcher<SearchHit>[] expectedIds(int numDocs) {
356+
return IntStream.rangeClosed(1, numDocs)
357+
.boxed()
358+
.map(n -> hasId(n.toString()))
359+
.<Matcher<SearchHit>>toArray(Matcher[]::new);
360+
}
361+
362+
/**
 * Adds {@code numDocs} index operations (ids 1..numDocs) to the given bulk processor and
 * returns a multi-get request that references every added document.
 *
 * For each id a random choice is made between adding an {@code IndexRequest} and adding a
 * raw bulk payload built by {@code bytesBulkRequest}. In the raw-payload case the global
 * index, type and pipeline are handed to {@code processor.add(...)} alongside the bytes.
 *
 * NOTE(review): this is a diff hunk - lines preceded by a "NNN-" marker appear to be the
 * removed pre-change code (hard-coded "test" index/type), lines preceded by "NNN+" the
 * added replacement. Confirm against the rendered commit before reading it as one method.
 *
 * @param processor      bulk processor receiving the operations
 * @param numDocs        number of documents to add
 * @param localIndex     index set on each individual operation (may be null)
 * @param localType      type set on each individual operation (may be null)
 * @param globalIndex    index passed to the processor for raw payloads
 * @param globalType     type passed to the processor for raw payloads
 * @param globalPipeline pipeline passed to the processor for raw payloads
 * @return multi-get request with one item per added document
 */
private static MultiGetRequest indexDocs(BulkProcessor processor, int numDocs, String localIndex, String localType,
363+
String globalIndex, String globalType, String globalPipeline) throws Exception {
272364
MultiGetRequest multiGetRequest = new MultiGetRequest();
273365
for (int i = 1; i <= numDocs; i++) {
274366
if (randomBoolean()) {
275-
processor.add(new IndexRequest("test", "test", Integer.toString(i))
276-
.source(XContentType.JSON, "field", randomRealisticUnicodeOfLengthBetween(1, 30)));
367+
processor.add(new IndexRequest(localIndex, localType, Integer.toString(i))
368+
.source(XContentType.JSON, "field", randomRealisticUnicodeOfLengthBetween(1, 30)));
277369
} else {
278-
final String source = "{ \"index\":{\"_index\":\"test\",\"_type\":\"test\",\"_id\":\"" + Integer.toString(i) + "\"} }\n"
279-
+ Strings.toString(JsonXContent.contentBuilder()
280-
.startObject().field("field", randomRealisticUnicodeOfLengthBetween(1, 30)).endObject()) + "\n";
281-
processor.add(new BytesArray(source), null, null, XContentType.JSON);
370+
BytesArray data = bytesBulkRequest(localIndex, localType, i);
371+
processor.add(data, globalIndex, globalType, globalPipeline, null, XContentType.JSON);
282372
}
283-
multiGetRequest.add("test", "test", Integer.toString(i));
373+
multiGetRequest.add(localIndex, localType, Integer.toString(i));
284374
}
285375
return multiGetRequest;
286376
}
287377

378+
private static BytesArray bytesBulkRequest(String localIndex, String localType, int id) throws IOException {
379+
String action = Strings.toString(jsonBuilder()
380+
.startObject()
381+
.startObject("index")
382+
.field("_index", localIndex)
383+
.field("_type", localType)
384+
.field("_id", Integer.toString(id))
385+
.endObject()
386+
.endObject()
387+
);
388+
String source = Strings.toString(jsonBuilder()
389+
.startObject()
390+
.field("field", randomRealisticUnicodeOfLengthBetween(1, 30))
391+
.endObject()
392+
);
393+
394+
String request = action + "\n" + source + "\n";
395+
return new BytesArray(request);
396+
}
397+
398+
private static MultiGetRequest indexDocs(BulkProcessor processor, int numDocs) throws Exception {
399+
return indexDocs(processor, numDocs, "test", "test", null, null, null);
400+
}
401+
288402
private static void assertResponseItems(List<BulkItemResponse> bulkItemResponses, int numDocs) {
289403
assertThat(bulkItemResponses.size(), is(numDocs));
290404
int i = 1;
@@ -343,4 +457,5 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
343457
}
344458
}
345459

460+
346461
}

0 commit comments

Comments
 (0)