Merged
@@ -0,0 +1,221 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client;

import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.get.MultiGetRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.rest.RestStatus;

import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.lessThan;

public class BulkProcessorRetryIT extends ESRestHighLevelClientTestCase {

    private static final String INDEX_NAME = "index";
    private static final String TYPE_NAME = "type";

    static {
        System.setProperty("tests.rest.cluster", "localhost:9200");
    }
Review comment (Member):
this static block should go away. I don't see why it's needed.

    private static BulkProcessor.Builder initBulkProcessorBuilder(BulkProcessor.Listener listener) {
        return BulkProcessor.builder(highLevelClient()::bulkAsync, listener);
    }

    public void testBulkRejectionLoadWithoutBackoff() throws Exception {
        boolean rejectedExecutionExpected = true;
        executeBulkRejectionLoad(BackoffPolicy.noBackoff(), rejectedExecutionExpected);
    }

    public void testBulkRejectionLoadWithBackoff() throws Throwable {
        boolean rejectedExecutionExpected = false;
        executeBulkRejectionLoad(BackoffPolicy.exponentialBackoff(), rejectedExecutionExpected);
    }

    private void executeBulkRejectionLoad(BackoffPolicy backoffPolicy, boolean rejectedExecutionExpected) throws Exception {
        final CorrelatingBackoffPolicy internalPolicy = new CorrelatingBackoffPolicy(backoffPolicy);
        final int numberOfAsyncOps = randomIntBetween(600, 700);
        final CountDownLatch latch = new CountDownLatch(numberOfAsyncOps);
        final Set<Object> responses = Collections.newSetFromMap(new ConcurrentHashMap<>());

        BulkProcessor bulkProcessor = initBulkProcessorBuilder(new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) {
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                internalPolicy.logResponse(response);
                responses.add(response);
                latch.countDown();
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                responses.add(failure);
                latch.countDown();
            }
        }).setBulkActions(1)
                .setConcurrentRequests(randomIntBetween(0, 100))
                .setBackoffPolicy(internalPolicy)
                .build();

        MultiGetRequest multiGetRequest = indexDocs(bulkProcessor, numberOfAsyncOps);
        latch.await(10, TimeUnit.SECONDS);
        bulkProcessor.close();

        assertEquals(responses.size(), numberOfAsyncOps);

        boolean rejectedAfterAllRetries = false;
        for (Object response : responses) {
            if (response instanceof BulkResponse) {
                BulkResponse bulkResponse = (BulkResponse) response;
                for (BulkItemResponse bulkItemResponse : bulkResponse.getItems()) {
                    if (bulkItemResponse.isFailed()) {
                        BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
                        if (failure.getStatus() == RestStatus.TOO_MANY_REQUESTS) {
                            if (!rejectedExecutionExpected) {
                                Throwable rootCause = ExceptionsHelper.unwrapCause(failure.getCause());
                                Iterator<TimeValue> backoffState = internalPolicy.backoffStateFor(bulkResponse);
                                assertNotNull("backoffState is null (indicates a bulk request got rejected without retry)", backoffState);
                                if (backoffState.hasNext()) {
                                    // a rejection while the backoff policy still allows retries means the request was not retried as expected
                                    throw new AssertionError("Got rejected although backoff policy would allow more retries", rootCause);
                                } else {
                                    rejectedAfterAllRetries = true;
                                    logger.debug("We maxed out the number of bulk retries and got rejected (this is ok).");
                                }
                            }
                        } else {
                            throw new AssertionError("Unexpected failure with status: " + failure.getStatus());
                        }
                    }
                }
            } else {
                Throwable t = (Throwable) response;
                // we're not expecting any other errors
                throw new AssertionError("Unexpected failure", t);
            }
        }

        highLevelClient().indices().refresh(new RefreshRequest());
        int searchResultCount = highLevelClient().multiGet(multiGetRequest).getResponses().length;
Review comment (Member):
Can you rename this variable? It's not about search anymore, but rather the result of a multi_get.
        if (rejectedAfterAllRetries) {
            assertThat(searchResultCount, lessThan(numberOfAsyncOps));
        } else {
            assertThat(searchResultCount, equalTo(numberOfAsyncOps));
        }
Review comment (Member):
The assertions seem more accurate here, thanks! Would you mind making the same change in the original test for the transport client? It should work there too, right?

Reply (Contributor, author):
Yes!
    }
Review comment (Member):
Do you think we should also keep the last check based on the search API and the returned total hits? Or maybe, now that we are using multi_get, that step is not necessary?

Reply (Contributor, author):
Yeah, I think the multi get request we prepare while indexing the documents already does the same thing: it fetches all the indexed documents at the end so the counts can be compared. It seems that in the rest high level tests we use multi get rather than search. (A sketch of the search-based alternative follows below.)

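A minimal sketch of the search-based count check mentioned above, assuming the 6.x high-level client search API (SearchRequest, SearchResponse and SearchSourceBuilder would need to be imported); the multi_get request built in indexDocs below already serves the same purpose, so this is illustrative only:

    SearchRequest searchRequest = new SearchRequest(INDEX_NAME);
    searchRequest.source(new SearchSourceBuilder().size(0)); // only the total hit count is needed
    SearchResponse searchResponse = highLevelClient().search(searchRequest);
    long totalHits = searchResponse.getHits().getTotalHits(); // compare against numberOfAsyncOps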

    private static MultiGetRequest indexDocs(BulkProcessor processor, int numDocs) {
        MultiGetRequest multiGetRequest = new MultiGetRequest();
        for (int i = 1; i <= numDocs; i++) {
            processor.add(new IndexRequest(INDEX_NAME, TYPE_NAME, Integer.toString(i))
                    .source(XContentType.JSON, "field", randomRealisticUnicodeOfCodepointLengthBetween(1, 30)));
            multiGetRequest.add(INDEX_NAME, TYPE_NAME, Integer.toString(i));
        }
        return multiGetRequest;
    }

    /**
     * Internal helper class to correlate backoff states with bulk responses. This is needed to check whether we maxed out the number
     * of retries but still got rejected (which is perfectly fine and can also happen from time to time under heavy load).
     *
     * This implementation relies on an implementation detail in Retry, namely that the bulk listener is notified on the same thread
     * as the last call to the backoff policy's iterator. The advantage is that this is non-invasive to the rest of the production code.
     */
    private static class CorrelatingBackoffPolicy extends BackoffPolicy {
        private final Map<BulkResponse, Iterator<TimeValue>> correlations = new ConcurrentHashMap<>();
        // this is intentionally *not* static final. We will only ever have one instance of this class per test case and want the
        // thread local to be eligible for garbage collection right after the test to avoid leaks.
        private final ThreadLocal<Iterator<TimeValue>> iterators = new ThreadLocal<>();

        private final BackoffPolicy delegate;

        private CorrelatingBackoffPolicy(BackoffPolicy delegate) {
            this.delegate = delegate;
        }

        public Iterator<TimeValue> backoffStateFor(BulkResponse response) {
            return correlations.get(response);
        }

        // Assumption: This method is called from the same thread as the last call to the internal iterator's #hasNext() / #next(),
        // see also Retry.AbstractRetryHandler#onResponse().
        public void logResponse(BulkResponse response) {
            Iterator<TimeValue> iterator = iterators.get();
            // did we ever retry?
            if (iterator != null) {
                // we should correlate any iterator only once
                iterators.remove();
                correlations.put(response, iterator);
            }
        }

        @Override
        public Iterator<TimeValue> iterator() {
            return new CorrelatingIterator(iterators, delegate.iterator());
        }

        private static class CorrelatingIterator implements Iterator<TimeValue> {
            private final Iterator<TimeValue> delegate;
            private final ThreadLocal<Iterator<TimeValue>> iterators;

            private CorrelatingIterator(ThreadLocal<Iterator<TimeValue>> iterators, Iterator<TimeValue> delegate) {
                this.iterators = iterators;
                this.delegate = delegate;
            }

            @Override
            public boolean hasNext() {
                // update on every invocation as we might get rescheduled on a different thread. Unfortunately, there is a chance that
                // we pollute the thread local map with stale values. Due to the implementation of Retry and the life cycle of the
                // enclosing class CorrelatingBackoffPolicy this should not pose a major problem though.
                iterators.set(this);
                return delegate.hasNext();
            }

            @Override
            public TimeValue next() {
                // update on every invocation
                iterators.set(this);
                return delegate.next();
            }
        }
    }
}
@@ -32,7 +32,6 @@
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.bulk.Retry;
import org.elasticsearch.index.reindex.ScrollableHitSource.SearchFailure;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.ParentTaskAssigningClient;
@@ -41,7 +40,6 @@
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.mapper.IdFieldMapper;
import org.elasticsearch.index.mapper.IndexFieldMapper;
@@ -50,6 +48,8 @@
import org.elasticsearch.index.mapper.SourceFieldMapper;
import org.elasticsearch.index.mapper.TypeFieldMapper;
import org.elasticsearch.index.mapper.VersionFieldMapper;
import org.elasticsearch.index.reindex.ScrollableHitSource.SearchFailure;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.script.ExecutableScript;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptService;
@@ -76,8 +76,8 @@
import static java.util.Collections.emptyList;
import static java.util.Collections.unmodifiableList;
import static org.elasticsearch.action.bulk.BackoffPolicy.exponentialBackoff;
import static org.elasticsearch.index.reindex.AbstractBulkByScrollRequest.SIZE_ALL_MATCHES;
import static org.elasticsearch.common.unit.TimeValue.timeValueNanos;
import static org.elasticsearch.index.reindex.AbstractBulkByScrollRequest.SIZE_ALL_MATCHES;
import static org.elasticsearch.rest.RestStatus.CONFLICT;
import static org.elasticsearch.search.sort.SortBuilders.fieldSort;

@@ -140,7 +140,7 @@ public AbstractAsyncBulkByScrollAction(BulkByScrollTask task, Logger logger, Par
        this.mainRequest = mainRequest;
        this.listener = listener;
        BackoffPolicy backoffPolicy = buildBackoffPolicy();
        bulkRetry = new Retry(EsRejectedExecutionException.class, BackoffPolicy.wrap(backoffPolicy, worker::countBulkRetry), threadPool);
        bulkRetry = new Retry(RestStatus.TOO_MANY_REQUESTS, BackoffPolicy.wrap(backoffPolicy, worker::countBulkRetry), threadPool);
        scrollSource = buildScrollableResultSource(backoffPolicy);
        scriptApplier = Objects.requireNonNull(buildScriptApplier(), "script applier must not be null");
        /*
@@ -34,6 +34,7 @@
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.Netty4Plugin;
@@ -183,7 +184,7 @@ private void testCase(
            bulk.add(client().prepareIndex("source", "test").setSource("foo", "bar " + i));
        }

        Retry retry = new Retry(EsRejectedExecutionException.class, BackoffPolicy.exponentialBackoff(), client().threadPool());
        Retry retry = new Retry(RestStatus.TOO_MANY_REQUESTS, BackoffPolicy.exponentialBackoff(), client().threadPool());
        BulkResponse initialBulkResponse = retry.withBackoff(client()::bulk, bulk.request(), client().settings()).actionGet();
        assertFalse(initialBulkResponse.buildFailureMessage(), initialBulkResponse.hasFailures());
        client().admin().indices().prepareRefresh("source").get();
@@ -23,7 +23,7 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.Scheduler;

import java.util.concurrent.CountDownLatch;
Expand All @@ -49,7 +49,7 @@ public final class BulkRequestHandler {
this.consumer = consumer;
this.listener = listener;
this.concurrentRequests = concurrentRequests;
this.retry = new Retry(EsRejectedExecutionException.class, backoffPolicy, scheduler);
this.retry = new Retry(RestStatus.TOO_MANY_REQUESTS, backoffPolicy, scheduler);
this.semaphore = new Semaphore(concurrentRequests > 0 ? concurrentRequests : 1);
}

21 changes: 10 additions & 11 deletions server/src/main/java/org/elasticsearch/action/bulk/Retry.java
@@ -19,13 +19,13 @@
package org.elasticsearch.action.bulk;

import org.apache.logging.log4j.Logger;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;

@@ -40,12 +40,12 @@
 * Encapsulates synchronous and asynchronous retry logic.
 */
public class Retry {
    private final Class<? extends Throwable> retryOnThrowable;
    private final RestStatus retryOnStatus;
    private final BackoffPolicy backoffPolicy;
    private final Scheduler scheduler;

    public Retry(Class<? extends Throwable> retryOnThrowable, BackoffPolicy backoffPolicy, Scheduler scheduler) {
        this.retryOnThrowable = retryOnThrowable;
    public Retry(RestStatus retryOnStatus, BackoffPolicy backoffPolicy, Scheduler scheduler) {
        this.retryOnStatus = retryOnStatus;
Review comment (Member):
Shall we consider making this hardcoded rather than an argument, given that we always pass in the same value for it?

Reply (Contributor, author):
Yes, as it's only used here in bulk for this case. (A sketch of that variant follows the constructor below.)
        this.backoffPolicy = backoffPolicy;
        this.scheduler = scheduler;
    }
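A minimal sketch of the hardcoded variant discussed above; the constant name RETRY_ON_STATUS is a hypothetical choice, not the actual follow-up change:

public class Retry {
    // bulk retries only ever apply to 429 TOO_MANY_REQUESTS, so the status can be
    // a constant instead of a constructor argument
    private static final RestStatus RETRY_ON_STATUS = RestStatus.TOO_MANY_REQUESTS;

    private final BackoffPolicy backoffPolicy;
    private final Scheduler scheduler;

    public Retry(BackoffPolicy backoffPolicy, Scheduler scheduler) {
        this.backoffPolicy = backoffPolicy;
        this.scheduler = scheduler;
    }
}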
@@ -60,7 +60,7 @@ public Retry(Class<? extends Throwable> retryOnThrowable, BackoffPolicy backoffP
     */
    public void withBackoff(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, BulkRequest bulkRequest,
                            ActionListener<BulkResponse> listener, Settings settings) {
        RetryHandler r = new RetryHandler(retryOnThrowable, backoffPolicy, consumer, listener, settings, scheduler);
        RetryHandler r = new RetryHandler(retryOnStatus, backoffPolicy, consumer, listener, settings, scheduler);
        r.execute(bulkRequest);
    }

@@ -86,7 +86,7 @@ static class RetryHandler implements ActionListener<BulkResponse> {
        private final BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer;
        private final ActionListener<BulkResponse> listener;
        private final Iterator<TimeValue> backoff;
        private final Class<? extends Throwable> retryOnThrowable;
        private final RestStatus retryOnStatus;
        // Access only when holding a client-side lock, see also #addResponses()
        private final List<BulkItemResponse> responses = new ArrayList<>();
        private final long startTimestampNanos;
@@ -95,10 +95,10 @@ static class RetryHandler implements ActionListener<BulkResponse> {
        private volatile BulkRequest currentBulkRequest;
        private volatile ScheduledFuture<?> scheduledRequestFuture;

        RetryHandler(Class<? extends Throwable> retryOnThrowable, BackoffPolicy backoffPolicy,
        RetryHandler(RestStatus retryOnStatus, BackoffPolicy backoffPolicy,
                     BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, ActionListener<BulkResponse> listener,
                     Settings settings, Scheduler scheduler) {
            this.retryOnThrowable = retryOnThrowable;
            this.retryOnStatus = retryOnStatus;
            this.backoff = backoffPolicy.iterator();
            this.consumer = consumer;
            this.listener = listener;
@@ -160,9 +160,8 @@ private boolean canRetry(BulkResponse bulkItemResponses) {
            }
            for (BulkItemResponse bulkItemResponse : bulkItemResponses) {
                if (bulkItemResponse.isFailed()) {
                    final Throwable cause = bulkItemResponse.getFailure().getCause();
                    final Throwable rootCause = ExceptionsHelper.unwrapCause(cause);
                    if (!rootCause.getClass().equals(retryOnThrowable)) {
                    final RestStatus status = bulkItemResponse.getFailure().getStatus();
Review comment (Member):
Just to double check: we don't need to unwrap here anymore because the status of the root cause is propagated to its ancestor?

Reply (Contributor, author):
Here we read the status field of BulkItemResponse's Failure, which is a separate field from the exception cause in the Failure class, so it always has the right value (RestStatus.TOO_MANY_REQUESTS). The exception type is only changed by toXContent/fromXContent of BulkItemResponse, and there the status is parsed separately. So the status should always be correct, unless the BulkItemResponse were transferred again using readFrom/writeTo after a toXContent/fromXContent round trip, which I don't think happens.

Since that is the case, I changed bulkItemResponse.getFailure().getStatus() to bulkItemResponse.status(), because they are the same.

Review comment (Member):
I think this is correct. I also don't follow why readFrom/writeTo would cause issues; the exception type does change, but the status stays the same, right? (An illustrative sketch follows at the end of this hunk.)

                    if (this.retryOnStatus != status) {
                        return false;
                    }
                }
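An illustrative sketch of the status behaviour discussed above, assuming the 6.x BulkItemResponse.Failure constructor that derives the status from the cause:

    BulkItemResponse.Failure failure = new BulkItemResponse.Failure(
            "index", "type", "1", new EsRejectedExecutionException("bulk queue is full"));
    // the status is computed from the cause at construction time and stored in its own
    // field, so it survives an XContent round trip even though the exception type does not
    assert failure.getStatus() == RestStatus.TOO_MANY_REQUESTS;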