Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,7 @@ public void respond(ActionListener<Response> listener) {
}
}

protected static class ReplicaResult {
public static class ReplicaResult {
final Exception finalFailure;

public ReplicaResult(Exception finalFailure) {
Expand Down
53 changes: 48 additions & 5 deletions server/src/main/java/org/elasticsearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
private volatile AsyncRefreshTask refreshTask;
private volatile AsyncTranslogFSync fsyncTask;
private volatile AsyncGlobalCheckpointTask globalCheckpointTask;
private volatile AsyncRetentionLeaseBackgroundSyncTask retentionLeaseBackgroundSyncTask;

// don't convert to Setting<> and register... we only set this in tests and register via a plugin
private final String INDEX_TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING = "index.translog.retention.check_interval";
Expand Down Expand Up @@ -197,6 +198,7 @@ public IndexService(
this.refreshTask = new AsyncRefreshTask(this);
this.trimTranslogTask = new AsyncTrimTranslogTask(this);
this.globalCheckpointTask = new AsyncGlobalCheckpointTask(this);
this.retentionLeaseBackgroundSyncTask = new AsyncRetentionLeaseBackgroundSyncTask(this);
rescheduleFsyncTask(indexSettings.getTranslogDurability());
}

Expand Down Expand Up @@ -286,7 +288,8 @@ public synchronized void close(final String reason, boolean delete) throws IOExc
refreshTask,
fsyncTask,
trimTranslogTask,
globalCheckpointTask);
globalCheckpointTask,
retentionLeaseBackgroundSyncTask);
}
}
}
Expand Down Expand Up @@ -403,7 +406,7 @@ public synchronized IndexShard createShard(
searchOperationListeners,
indexingOperationListeners,
() -> globalCheckpointSyncer.accept(shardId),
(retentionLeases, listener) -> retentionLeaseSyncer.syncRetentionLeasesForShard(shardId, retentionLeases, listener),
retentionLeaseSyncer,
circuitBreakerService);
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
eventListener.afterIndexShardCreated(indexShard);
Expand Down Expand Up @@ -782,6 +785,14 @@ private void maybeTrimTranslog() {
}

/**
 * Hands a global checkpoint sync to {@code sync}: for each shard that {@code sync} selects, the shard's
 * {@code maybeSyncGlobalCheckpoint("background")} is invoked. The label {@code "global checkpoint"} identifies the
 * operation to {@code sync}.
 */
private void maybeSyncGlobalCheckpoints() {
    final Consumer<IndexShard> globalCheckpointSync = shard -> shard.maybeSyncGlobalCheckpoint("background");
    sync(globalCheckpointSync, "global checkpoint");
}

/**
 * Hands a retention lease background sync to {@code sync}: for each shard that {@code sync} selects, the shard's
 * {@code backgroundSyncRetentionLeases()} is invoked. The label {@code "retention lease"} identifies the operation
 * to {@code sync}.
 */
private void backgroundSyncRetentionLeases() {
    sync(shard -> shard.backgroundSyncRetentionLeases(), "retention lease");
}

private void sync(final Consumer<IndexShard> sync, final String source) {
for (final IndexShard shard : this.shards.values()) {
if (shard.routingEntry().active() && shard.routingEntry().primary()) {
switch (shard.state()) {
Expand All @@ -795,17 +806,17 @@ private void maybeSyncGlobalCheckpoints() {
case STARTED:
try {
shard.runUnderPrimaryPermit(
() -> shard.maybeSyncGlobalCheckpoint("background"),
() -> sync.accept(shard),
e -> {
if (e instanceof AlreadyClosedException == false
&& e instanceof IndexShardClosedException == false) {
logger.warn(
new ParameterizedMessage(
"{} failed to execute background global checkpoint sync", shard.shardId()), e);
"{} failed to execute background {} sync", shard.shardId(), source), e);
}
},
ThreadPool.Names.SAME,
"background global checkpoint sync");
"background " + source + " sync");
} catch (final AlreadyClosedException | IndexShardClosedException e) {
// the shard was closed concurrently, continue
}
Expand Down Expand Up @@ -911,6 +922,15 @@ public String toString() {
Property.Dynamic,
Property.IndexScope);

// this setting is intentionally not registered, it is only used in tests
/**
 * Time setting stored under the key {@code index.soft_deletes.retention_lease.sync_interval}. Its default value is five minutes, and
 * it is declared with the {@code Dynamic} and {@code IndexScope} properties. {@code AsyncRetentionLeaseBackgroundSyncTask} reads this
 * setting from the index settings to obtain the interval it passes to its base task.
 *
 * NOTE(review): the zero-millisecond value is presumably the minimum allowed value for the setting; confirm against
 * {@code Setting.timeSetting}.
 */
public static final Setting<TimeValue> RETENTION_LEASE_SYNC_INTERVAL_SETTING =
Setting.timeSetting(
"index.soft_deletes.retention_lease.sync_interval",
new TimeValue(5, TimeUnit.MINUTES),
new TimeValue(0, TimeUnit.MILLISECONDS),
Property.Dynamic,
Property.IndexScope);

/**
* Background task that syncs the global checkpoint to replicas.
*/
Expand All @@ -937,6 +957,29 @@ public String toString() {
}
}

/**
 * Background task that triggers the retention lease background sync of the owning index service. The interval handed to
 * {@code BaseAsyncTask} is read from {@code RETENTION_LEASE_SYNC_INTERVAL_SETTING} in the index settings, and the task names
 * the management thread pool as its thread pool.
 */
final class AsyncRetentionLeaseBackgroundSyncTask extends BaseAsyncTask {

    AsyncRetentionLeaseBackgroundSyncTask(final IndexService indexService) {
        super(
            indexService,
            RETENTION_LEASE_SYNC_INTERVAL_SETTING.get(indexService.getIndexSettings().getSettings()));
    }

    /** Thread pool name reported for this task. */
    @Override
    protected String getThreadPool() {
        return ThreadPool.Names.MANAGEMENT;
    }

    /** Delegates each run to the index service's retention lease background sync. */
    @Override
    protected void runInternal() {
        indexService.backgroundSyncRetentionLeases();
    }

    @Override
    public String toString() {
        return "retention_lease_background_sync";
    }
}

AsyncRefreshTask getRefreshTask() { // for tests
return refreshTask;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,8 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
private RetentionLeases retentionLeases = RetentionLeases.EMPTY;

/**
* Get all non-expired retention leases tracked on this shard. An unmodifiable copy of the retention leases is returned. Note that only
* the primary shard calculates which leases are expired, and if any have expired, syncs the retention leases to any replicas.
* Get all non-expired retention leases tracked on this shard. Note that only the primary shard calculates which leases are expired,
* and if any have expired, syncs the retention leases to any replicas.
*
* @return the retention leases
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.index.seqno;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.replication.ReplicationRequest;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.action.support.replication.TransportReplicationAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardClosedException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.Objects;

/**
* Replication action responsible for background syncing retention leases to replicas. This action is deliberately a replication action so
* that if a replica misses a background retention lease sync then that shard will not be marked as stale. We have some tolerance for a
* shard copy missing renewals of retention leases since the background sync interval is much smaller than the expected lifetime of
* retention leases.
*/
public class RetentionLeaseBackgroundSyncAction extends TransportReplicationAction<
RetentionLeaseBackgroundSyncAction.Request,
RetentionLeaseBackgroundSyncAction.Request,
ReplicationResponse> {

/** Name under which this action is registered; passed to the superclass constructor as the action name. */
public static final String ACTION_NAME = "indices:admin/seq_no/retention_lease_background_sync";

// Bound to this class (not RetentionLeaseSyncAction) so that log output and logger-level configuration are attributed
// to the background sync action.
private static final Logger LOGGER = LogManager.getLogger(RetentionLeaseBackgroundSyncAction.class);

/**
 * Returns the logger used to report failed background syncs. It is an overridable accessor rather than a direct field
 * reference, presumably so that subclasses (e.g. in tests) can substitute their own logger.
 *
 * @return the logger for this action
 */
protected Logger getLogger() {
    return LOGGER;
}

/**
 * Creates the background sync action, registered under {@link #ACTION_NAME}. Every collaborator is passed through unchanged
 * to the {@code TransportReplicationAction} constructor. {@code Request::new} is supplied twice because this action uses the
 * same {@link Request} type for both of the superclass's request type parameters.
 *
 * NOTE(review): the trailing {@code ThreadPool.Names.MANAGEMENT} argument is presumably the executor on which the operation
 * runs; confirm against the superclass constructor.
 */
@Inject
public RetentionLeaseBackgroundSyncAction(
final Settings settings,
final TransportService transportService,
final ClusterService clusterService,
final IndicesService indicesService,
final ThreadPool threadPool,
final ShardStateAction shardStateAction,
final ActionFilters actionFilters,
final IndexNameExpressionResolver indexNameExpressionResolver) {
super(
settings,
ACTION_NAME,
transportService,
clusterService,
indicesService,
threadPool,
shardStateAction,
actionFilters,
indexNameExpressionResolver,
Request::new,
Request::new,
ThreadPool.Names.MANAGEMENT);
}

/**
 * Executes a background sync of the given retention leases for the given shard. The call does not report an outcome to the
 * caller: a successful response is ignored, and a failure is logged at warn level unless it unwraps to an
 * {@link AlreadyClosedException} or an {@link IndexShardClosedException}.
 *
 * @param shardId         the shard to sync
 * @param retentionLeases the retention leases to sync
 */
public void backgroundSync(
        final ShardId shardId,
        final RetentionLeases retentionLeases) {
    Objects.requireNonNull(shardId);
    Objects.requireNonNull(retentionLeases);
    final ThreadContext context = threadPool.getThreadContext();
    try (ThreadContext.StoredContext ignored = context.stashContext()) {
        // we have to execute under the system context so that if security is enabled the sync is authorized
        context.markAsSystemContext();
        final Request request = new Request(shardId, retentionLeases);
        final ActionListener<ReplicationResponse> listener = ActionListener.wrap(
            response -> {},
            failure -> {
                final Throwable shardClosed =
                    ExceptionsHelper.unwrap(failure, AlreadyClosedException.class, IndexShardClosedException.class);
                if (shardClosed != null) {
                    // the shard was closed while the sync was in flight; nothing worth reporting
                    return;
                }
                getLogger().warn(new ParameterizedMessage("{} retention lease background sync failed", shardId), failure);
            });
        execute(request, listener);
    }
}

/**
 * Primary-side step of the background sync. After null-checking both arguments, it calls {@code afterWriteOperation()} on
 * the primary and returns a result that wraps the same request together with a new {@link ReplicationResponse}. The
 * retention leases carried by the request are not applied here.
 *
 * @param request the background sync request
 * @param primary the primary shard
 * @return the primary result holding the request and a fresh response
 */
@Override
protected PrimaryResult<Request, ReplicationResponse> shardOperationOnPrimary(final Request request, final IndexShard primary) {
Objects.requireNonNull(request);
Objects.requireNonNull(primary);
// NOTE(review): presumably intended to trigger a periodic flush (see the review discussion in this change); to be confirmed
primary.afterWriteOperation();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure if this call is useful for it does not change translog nor Lucene.

return new PrimaryResult<>(request, new ReplicationResponse());
}

/**
 * Replica-side step of the background sync. After null-checking both arguments, it applies the retention leases carried by
 * the request to the replica via {@code updateRetentionLeasesOnReplica}, calls {@code afterWriteOperation()}, and returns
 * an empty {@code ReplicaResult}.
 *
 * @param request the background sync request carrying the retention leases
 * @param replica the replica shard to update
 * @return a new replica result
 */
@Override
protected ReplicaResult shardOperationOnReplica(final Request request, final IndexShard replica){
Objects.requireNonNull(request);
Objects.requireNonNull(replica);
replica.updateRetentionLeasesOnReplica(request.getRetentionLeases());
// per the review discussion below, the goal of this call is to trigger a periodic flush; exact mechanics are deferred
replica.afterWriteOperation();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here - I am not sure if this call is useful for it does not change translog nor Lucene.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The goal is to trigger periodic flush. Let us discuss the exact mechanics of this in a follow-up as that work develops.

return new ReplicaResult();
}

/**
 * Replication request that carries the retention leases to be synced in the background. The leases are written after, and
 * read back after, the state serialized by {@link ReplicationRequest}.
 */
public static final class Request extends ReplicationRequest<Request> {

    // not final: a request created through the no-arg constructor receives its leases in readFrom
    private RetentionLeases retentionLeases;

    /** Creates an empty request whose retention leases are filled in by {@link #readFrom(StreamInput)}. */
    public Request() {
    }

    /**
     * Creates a request for the given shard and retention leases.
     *
     * @param shardId         the shard to sync; must not be null
     * @param retentionLeases the retention leases to sync; must not be null
     */
    public Request(final ShardId shardId, final RetentionLeases retentionLeases) {
        super(Objects.requireNonNull(shardId));
        Objects.requireNonNull(retentionLeases);
        this.retentionLeases = retentionLeases;
    }

    /**
     * @return the retention leases carried by this request
     */
    public RetentionLeases getRetentionLeases() {
        return retentionLeases;
    }

    @Override
    public void readFrom(final StreamInput in) throws IOException {
        super.readFrom(in);
        this.retentionLeases = new RetentionLeases(in);
    }

    @Override
    public void writeTo(final StreamOutput out) throws IOException {
        Objects.requireNonNull(out);
        super.writeTo(out);
        retentionLeases.writeTo(out);
    }

    @Override
    public String toString() {
        final StringBuilder description = new StringBuilder();
        description.append("Request{");
        description.append("retentionLeases=").append(retentionLeases);
        description.append(", shardId=").append(shardId);
        description.append(", timeout=").append(timeout);
        description.append(", index='").append(index).append('\'');
        description.append(", waitForActiveShards=").append(waitForActiveShards);
        description.append('}');
        return description.toString();
    }
}

/**
 * Supplies a new, empty response object for this action.
 *
 * @return a fresh {@link ReplicationResponse}
 */
@Override
protected ReplicationResponse newResponseInstance() {
    final ReplicationResponse emptyResponse = new ReplicationResponse();
    return emptyResponse;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public RetentionLeaseSyncAction(
* @param retentionLeases the retention leases to sync
* @param listener the callback to invoke when the sync completes normally or abnormally
*/
public void syncRetentionLeasesForShard(
public void sync(
final ShardId shardId,
final RetentionLeases retentionLeases,
final ActionListener<ReplicationResponse> listener) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
* An interface that represents the methods for syncing retention leases to replica shards: a sync issued after a new retention lease
* is added on the primary, and a background sync.
*/
@FunctionalInterface
public interface RetentionLeaseSyncer {

/**
Expand All @@ -38,9 +37,20 @@ public interface RetentionLeaseSyncer {
* @param retentionLeases the retention leases to sync
* @param listener the callback when sync completes
*/
void syncRetentionLeasesForShard(
ShardId shardId,
RetentionLeases retentionLeases,
ActionListener<ReplicationResponse> listener);
void sync(ShardId shardId, RetentionLeases retentionLeases, ActionListener<ReplicationResponse> listener);

void backgroundSync(ShardId shardId, RetentionLeases retentionLeases);

RetentionLeaseSyncer EMPTY = new RetentionLeaseSyncer() {
@Override
public void sync(final ShardId shardId, final RetentionLeases retentionLeases, final ActionListener<ReplicationResponse> listener) {

}

@Override
public void backgroundSync(final ShardId shardId, final RetentionLeases retentionLeases) {

}
};

}
Loading