Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParseException;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.discovery.FailedToCommitClusterStateException;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.rest.RestStatus;
Expand Down Expand Up @@ -1006,8 +1007,8 @@ private enum ElasticsearchExceptionHandle {
UNKNOWN_VERSION_ADDED),
TYPE_MISSING_EXCEPTION(org.elasticsearch.indices.TypeMissingException.class,
org.elasticsearch.indices.TypeMissingException::new, 137, UNKNOWN_VERSION_ADDED),
FAILED_TO_COMMIT_CLUSTER_STATE_EXCEPTION(org.elasticsearch.discovery.Discovery.FailedToCommitClusterStateException.class,
org.elasticsearch.discovery.Discovery.FailedToCommitClusterStateException::new, 140, UNKNOWN_VERSION_ADDED),
FAILED_TO_COMMIT_CLUSTER_STATE_EXCEPTION(FailedToCommitClusterStateException.class,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Almost all of these registrations use the fully-qualified class name (except CoordinationStateRejectedException, oops) so it looks like this should too.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, I've also fixed this for CoordinationStateRejectedException in d3c5a3d.

FailedToCommitClusterStateException::new, 140, UNKNOWN_VERSION_ADDED),
QUERY_SHARD_EXCEPTION(org.elasticsearch.index.query.QueryShardException.class,
org.elasticsearch.index.query.QueryShardException::new, 141, UNKNOWN_VERSION_ADDED),
NO_LONGER_PRIMARY_SHARD_EXCEPTION(ShardStateAction.NoLongerPrimaryShardException.class,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.FailedToCommitClusterStateException;
import org.elasticsearch.discovery.MasterNotDiscoveredException;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.tasks.Task;
Expand Down Expand Up @@ -174,7 +174,7 @@ public void onResponse(Response response) {

@Override
public void onFailure(Exception t) {
if (t instanceof Discovery.FailedToCommitClusterStateException
if (t instanceof FailedToCommitClusterStateException
|| (t instanceof NotMasterException)) {
logger.debug(() -> new ParameterizedMessage("master could not publish cluster state or stepped down before publishing action [{}], scheduling a retry", actionName), t);
retry(t, masterChangePredicate);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.FailedToCommitClusterStateException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.tasks.Task;
Expand Down Expand Up @@ -132,7 +132,7 @@ public void handleException(TransportException exp) {
private static Class[] MASTER_CHANNEL_EXCEPTIONS = new Class[]{
NotMasterException.class,
ConnectTransportException.class,
Discovery.FailedToCommitClusterStateException.class
FailedToCommitClusterStateException.class
};

private static boolean isMasterChannelException(TransportException exp) {
Expand Down Expand Up @@ -625,7 +625,7 @@ default void onSuccess() {
* are:
* - {@link NotMasterException}
* - {@link NodeDisconnectedException}
* - {@link Discovery.FailedToCommitClusterStateException}
* - {@link FailedToCommitClusterStateException}
*
* Any other exception is communicated to the requester via
* this notification.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,25 +59,31 @@ public class ClusterService extends AbstractLifecycleComponent {
private final OperationRouting operationRouting;

private final ClusterSettings clusterSettings;
private final Map<String, Supplier<ClusterState.Custom>> initialClusterStateCustoms;

public ClusterService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool,
Map<String, Supplier<ClusterState.Custom>> initialClusterStateCustoms) {
this(settings, clusterSettings, new MasterService(settings, threadPool),
new ClusterApplierService(settings, clusterSettings, threadPool,
() -> ClusterService.newClusterStateBuilder(settings, initialClusterStateCustoms)));
}

public ClusterService(Settings settings, ClusterSettings clusterSettings,
MasterService masterService, ClusterApplierService clusterApplierService) {
super(settings);
this.masterService = new MasterService(settings, threadPool);
this.masterService = masterService;
this.operationRouting = new OperationRouting(settings, clusterSettings);
this.clusterSettings = clusterSettings;
this.clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
this.clusterSettings.addSettingsUpdateConsumer(CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING,
this::setSlowTaskLoggingThreshold);
this.initialClusterStateCustoms = initialClusterStateCustoms;
this.clusterApplierService = new ClusterApplierService(settings, clusterSettings, threadPool, this::newClusterStateBuilder);
this.clusterApplierService = clusterApplierService;
}

/**
* Creates a new cluster state builder that is initialized with the cluster name and all initial cluster state customs.
*/
public ClusterState.Builder newClusterStateBuilder() {
private static ClusterState.Builder newClusterStateBuilder(Settings settings,
Map<String, Supplier<ClusterState.Custom>> initialClusterStateCustoms) {
ClusterState.Builder builder = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(settings));
for (Map.Entry<String, Supplier<ClusterState.Custom>> entry : initialClusterStateCustoms.entrySet()) {
builder.putCustom(entry.getKey(), entry.getValue().get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.Assertions;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.AckedClusterStateTaskListener;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
Expand All @@ -48,7 +49,9 @@
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.discovery.ClusterStatePublisher;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.FailedToCommitClusterStateException;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.Arrays;
Expand All @@ -57,9 +60,9 @@
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;

Expand All @@ -70,7 +73,7 @@ public class MasterService extends AbstractLifecycleComponent {

public static final String MASTER_UPDATE_THREAD_NAME = "masterService#updateTask";

private BiConsumer<ClusterChangedEvent, Discovery.AckListener> clusterStatePublisher;
protected ClusterStatePublisher clusterStatePublisher;

private java.util.function.Supplier<ClusterState> clusterStateSupplier;

Expand All @@ -92,7 +95,7 @@ public void setSlowTaskLoggingThreshold(TimeValue slowTaskLoggingThreshold) {
this.slowTaskLoggingThreshold = slowTaskLoggingThreshold;
}

public synchronized void setClusterStatePublisher(BiConsumer<ClusterChangedEvent, Discovery.AckListener> publisher) {
public synchronized void setClusterStatePublisher(ClusterStatePublisher publisher) {
clusterStatePublisher = publisher;
}

Expand All @@ -104,12 +107,16 @@ public synchronized void setClusterStateSupplier(java.util.function.Supplier<Clu
protected synchronized void doStart() {
Objects.requireNonNull(clusterStatePublisher, "please set a cluster state publisher before starting");
Objects.requireNonNull(clusterStateSupplier, "please set a cluster state supplier before starting");
threadPoolExecutor = EsExecutors.newSinglePrioritizing(
threadPoolExecutor = createThreadPoolExecutor();
taskBatcher = new Batcher(logger, threadPoolExecutor);
}

protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() {
return EsExecutors.newSinglePrioritizing(
nodeName() + "/" + MASTER_UPDATE_THREAD_NAME,
daemonThreadFactory(settings, MASTER_UPDATE_THREAD_NAME),
threadPool.getThreadContext(),
threadPool.scheduler());
taskBatcher = new Batcher(logger, threadPoolExecutor);
}

class Batcher extends TaskBatcher {
Expand Down Expand Up @@ -222,46 +229,87 @@ protected void runTasks(TaskInputs taskInputs) {
}

logger.debug("publishing cluster state version [{}]", newClusterState.version());
try {
clusterStatePublisher.accept(clusterChangedEvent, taskOutputs.createAckListener(threadPool, newClusterState));
} catch (Discovery.FailedToCommitClusterStateException t) {
final long version = newClusterState.version();
logger.warn(() -> new ParameterizedMessage(
"failing [{}]: failed to commit cluster state version [{}]", summary, version), t);
taskOutputs.publishingFailed(t);
return;
}
publish(clusterChangedEvent, taskOutputs, startTimeNS);
} catch (Exception e) {
handleException(summary, startTimeNS, newClusterState, e);
}
}
}

protected void publish(ClusterChangedEvent clusterChangedEvent, TaskOutputs taskOutputs, long startTimeNS) throws Exception {
CompletableFuture<Void> fut = new CompletableFuture<>();
clusterStatePublisher.publish(clusterChangedEvent, new ActionListener<Void>() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can use ActionListener#wrap to make this slightly more compact. Also, I presume you consciously chose a CompletableFuture over a PlainActionFuture?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did not choose PlainActionFuture because it asserts that we're not blocking on the MasterServiceUpdateThread (which this future deliberately does). Unfortunately, CompletableFuture has other problems (#32512 (comment)), so I've gone back to PlainActionFuture and added a hook that allows disabling some of its assertion checks; see 526511d.

@Override
public void onResponse(Void aVoid) {
fut.complete(aVoid);
}

@Override
public void onFailure(Exception e) {
fut.completeExceptionally(e);
}
}, taskOutputs.createAckListener(threadPool, clusterChangedEvent.state()));

final ActionListener<Void> publishListener = getPublishListener(clusterChangedEvent, taskOutputs, startTimeNS);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need this extra listener construct? At the moment it's invoked fully sequentially. Wouldn't it be simpler to just process the result of the future inline?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The extra listener construct is not needed. I've changed this in c84ddf7

// indefinitely wait for publication to complete
try {
FutureUtils.get(fut);
publishListener.onResponse(null);
} catch (Exception e) {
publishListener.onFailure(e);
}
}

taskOutputs.processedDifferentClusterState(previousClusterState, newClusterState);
protected ActionListener<Void> getPublishListener(ClusterChangedEvent clusterChangedEvent, TaskOutputs taskOutputs, long startTimeNS) {
return new ActionListener<Void>() {

@Override
public void onResponse(Void ignore) {
taskOutputs.processedDifferentClusterState(clusterChangedEvent.previousState(), clusterChangedEvent.state());

try {
taskOutputs.clusterStatePublished(clusterChangedEvent);
} catch (Exception e) {
logger.error(() -> new ParameterizedMessage(
"exception thrown while notifying executor of new cluster state publication [{}]",
summary), e);
"exception thrown while notifying executor of new cluster state publication [{}]",
clusterChangedEvent.source()), e);
}
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS)));
logger.debug("processing [{}]: took [{}] done publishing updated cluster state (version: {}, uuid: {})", summary,
executionTime, newClusterState.version(),
newClusterState.stateUUID());
warnAboutSlowTaskIfNeeded(executionTime, summary);
} catch (Exception e) {
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS)));
final long version = newClusterState.version();
final String stateUUID = newClusterState.stateUUID();
final String fullState = newClusterState.toString();
logger.warn(() -> new ParameterizedMessage(
"failed to publish updated cluster state in [{}]:\nversion [{}], uuid [{}], source [{}]\n{}",
executionTime,
version,
stateUUID,
summary,
fullState),
e);
// TODO: do we want to call updateTask.onFailure here?
logger.debug("processing [{}]: took [{}] done publishing updated cluster state (version: {}, uuid: {})",
clusterChangedEvent.source(),
executionTime, clusterChangedEvent.state().version(),
clusterChangedEvent.state().stateUUID());
warnAboutSlowTaskIfNeeded(executionTime, clusterChangedEvent.source());
}
}

@Override
public void onFailure(Exception exception) {
if (exception instanceof FailedToCommitClusterStateException) {
final long version = clusterChangedEvent.state().version();
logger.warn(() -> new ParameterizedMessage(
"failing [{}]: failed to commit cluster state version [{}]", clusterChangedEvent.source(), version), exception);
taskOutputs.publishingFailed((FailedToCommitClusterStateException) exception);
} else {
handleException(clusterChangedEvent.source(), startTimeNS, clusterChangedEvent.state(), exception);
}
}
};
}

/**
 * Logs, at WARN level, that publishing the given cluster state failed.
 * <p>
 * The message reports the time elapsed since {@code startTimeNS} (clamped to be non-negative), the
 * state's version and UUID, the source summary, and the full textual dump of the state. The
 * exception is attached to the log entry. Nothing besides logging happens here.
 *
 * @param summary         description of the source of the update, reported as "source" in the message
 * @param startTimeNS     start timestamp in nanoseconds, compared against {@code currentTimeInNanos()}
 * @param newClusterState the cluster state whose publication failed
 * @param e               the failure that interrupted publication
 */
private void handleException(String summary, long startTimeNS, ClusterState newClusterState, Exception e) {
    // Resolve every value before building the lazily evaluated message, so the lambda only captures plain finals.
    final long elapsedNanos = currentTimeInNanos() - startTimeNS;
    final long elapsedMillis = Math.max(0, TimeValue.nsecToMSec(elapsedNanos));
    final TimeValue took = TimeValue.timeValueMillis(elapsedMillis);
    final long failedVersion = newClusterState.version();
    final String failedStateUUID = newClusterState.stateUUID();
    final String stateDump = newClusterState.toString();

    final Supplier<?> failureMessage = () -> new ParameterizedMessage(
        "failed to publish updated cluster state in [{}]:\nversion [{}], uuid [{}], source [{}]\n{}",
        took, failedVersion, failedStateUUID, summary, stateDump);
    logger.warn(failureMessage, e);
    // TODO: do we want to call updateTask.onFailure here?
}

public TaskOutputs calculateTaskOutputs(TaskInputs taskInputs, ClusterState previousClusterState, long startTimeNS) {
Expand All @@ -276,7 +324,7 @@ private ClusterState patchVersions(ClusterState previousClusterState, ClusterTas

if (previousClusterState != newClusterState) {
// only the master controls the version numbers
Builder builder = ClusterState.builder(newClusterState).incrementVersion();
Builder builder = incrementVersion(newClusterState);
if (previousClusterState.routingTable() != newClusterState.routingTable()) {
builder.routingTable(RoutingTable.builder(newClusterState.routingTable())
.version(newClusterState.routingTable().version() + 1).build());
Expand All @@ -291,6 +339,10 @@ private ClusterState patchVersions(ClusterState previousClusterState, ClusterTas
return newClusterState;
}

public Builder incrementVersion(ClusterState clusterState) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this can be protected

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed in 526511d

return ClusterState.builder(clusterState).incrementVersion();
}

/**
* Submits a cluster state update task; unlike {@link #submitStateUpdateTask(String, Object, ClusterStateTaskConfig,
* ClusterStateTaskExecutor, ClusterStateTaskListener)}, submitted updates will not be batched.
Expand Down Expand Up @@ -335,7 +387,7 @@ public <T> void submitStateUpdateTask(String source, T task,
/**
* Output created by executing a set of tasks provided as TaskInputs
*/
class TaskOutputs {
protected class TaskOutputs {
public final TaskInputs taskInputs;
public final ClusterState previousClusterState;
public final ClusterState newClusterState;
Expand All @@ -353,7 +405,7 @@ class TaskOutputs {
this.executionResults = executionResults;
}

public void publishingFailed(Discovery.FailedToCommitClusterStateException t) {
public void publishingFailed(FailedToCommitClusterStateException t) {
nonFailedTasks.forEach(task -> task.listener.onFailure(task.source(), t));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor {
private final Queue<Runnable> current = ConcurrentCollections.newQueue();
private final ScheduledExecutorService timer;

PrioritizedEsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
public PrioritizedEsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
ThreadFactory threadFactory, ThreadContext contextHolder, ScheduledExecutorService timer) {
super(name, corePoolSize, maximumPoolSize, keepAliveTime, unit, new PriorityBlockingQueue<>(), threadFactory, contextHolder);
this.timer = timer;
Expand Down
Loading