Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Avoid skewed segment replication lag metric ([#17831](https://github.com/opensearch-project/OpenSearch/pull/17831))
- Increase the default segment counter step size when replica promoting ([#17568](https://github.com/opensearch-project/OpenSearch/pull/17568))
- [WLM] Rename QueryGroup to WorkloadGroup ([#17901](https://github.com/opensearch-project/OpenSearch/pull/17901))
- Add cluster setting for retry timeout of publish checkpoint tx action ([#17749](https://github.com/opensearch-project/OpenSearch/pull/17749))

### Dependencies
- Bump `com.nimbusds:nimbus-jose-jwt` from 9.41.1 to 10.0.2 ([#17607](https://github.com/opensearch-project/OpenSearch/pull/17607), [#17669](https://github.com/opensearch-project/OpenSearch/pull/17669))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.search.SearchType;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.action.support.replication.TransportReplicationAction;
import org.opensearch.action.termvectors.TermVectorsRequestBuilder;
import org.opensearch.action.termvectors.TermVectorsResponse;
import org.opensearch.action.update.UpdateResponse;
Expand All @@ -59,6 +60,7 @@
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.set.Sets;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.index.IndexModule;
Expand All @@ -72,6 +74,7 @@
import org.opensearch.index.engine.NRTReplicationReaderManager;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.indices.recovery.FileChunkRequest;
import org.opensearch.indices.replication.checkpoint.PublishCheckpointAction;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.node.NodeClosedException;
import org.opensearch.search.SearchService;
Expand All @@ -83,6 +86,7 @@
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.junit.annotations.TestLogging;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.transport.RemoteTransportException;
import org.opensearch.transport.TransportService;
import org.opensearch.transport.client.Requests;
import org.junit.Before;
Expand All @@ -98,6 +102,7 @@
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -131,6 +136,56 @@ private static String indexOrAlias() {
return randomBoolean() ? INDEX_NAME : "alias";
}

public void testRetryPublishCheckPoint() throws Exception {
// Reproduce the situation where the replica shard cannot synchronize data from the primary shard when there is a network exception.
// Test update of configuration PublishCheckpointAction#PUBLISH_CHECK_POINT_RETRY_TIMEOUT.
Settings mockNodeSetting = Settings.builder()
.put(TransportReplicationAction.REPLICATION_RETRY_TIMEOUT.getKey(), TimeValue.timeValueSeconds(0))
.put(PublishCheckpointAction.PUBLISH_CHECK_POINT_RETRY_TIMEOUT.getKey(), TimeValue.timeValueSeconds(0))
.build();

final String primaryNode = internalCluster().startDataOnlyNode(mockNodeSetting);
createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put("index.refresh_interval", -1).build());
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replicaNode = internalCluster().startDataOnlyNode(mockNodeSetting);
ensureGreen(INDEX_NAME);

// update publish checkpoint retry time out
client().admin()
.cluster()
.prepareUpdateSettings()
.setPersistentSettings(
Settings.builder().put(PublishCheckpointAction.PUBLISH_CHECK_POINT_RETRY_TIMEOUT.getKey(), TimeValue.timeValueMinutes(10))
)
.get();

// mock network exception
MockTransportService replicaTransportService = ((MockTransportService) internalCluster().getInstance(
TransportService.class,
replicaNode
));
AtomicBoolean mockReplicaReceivePublishCheckpointException = new AtomicBoolean(true);
replicaTransportService.addRequestHandlingBehavior(
PublishCheckpointAction.ACTION_NAME + TransportReplicationAction.REPLICA_ACTION_SUFFIX,
(handler, request, channel, task) -> {
if (mockReplicaReceivePublishCheckpointException.get()) {
logger.info("mock remote transport exception");
throw new RemoteTransportException("mock remote transport exception", new OpenSearchRejectedExecutionException());
}
logger.info("replica receive publish checkpoint request");
handler.messageReceived(request, channel, task);
}
);

client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
waitForSearchableDocs(0, replicaNode);
logger.info("ensure publish checkpoint request can be process");
mockReplicaReceivePublishCheckpointException.set(false);

waitForSearchableDocs(1, primaryNode, replicaNode);
replicaTransportService.clearAllRules();
}

public void testPrimaryStopped_ReplicaPromoted() throws Exception {
final String primary = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ protected TransportReplicationAction(
this.transportReplicaAction = actionName + REPLICA_ACTION_SUFFIX;

this.initialRetryBackoffBound = REPLICATION_INITIAL_RETRY_BACKOFF_BOUND.get(settings);
this.retryTimeout = REPLICATION_RETRY_TIMEOUT.get(settings);
this.retryTimeout = getRetryTimeoutSetting().get(settings);
this.forceExecutionOnPrimary = forceExecutionOnPrimary;

transportService.registerRequestHandler(actionName, ThreadPool.Names.SAME, requestReader, this::handleOperationRequest);
Expand All @@ -273,7 +273,11 @@ protected TransportReplicationAction(

ClusterSettings clusterSettings = clusterService.getClusterSettings();
clusterSettings.addSettingsUpdateConsumer(REPLICATION_INITIAL_RETRY_BACKOFF_BOUND, (v) -> initialRetryBackoffBound = v);
clusterSettings.addSettingsUpdateConsumer(REPLICATION_RETRY_TIMEOUT, (v) -> retryTimeout = v);
clusterSettings.addSettingsUpdateConsumer(getRetryTimeoutSetting(), (v) -> retryTimeout = v);
}

protected Setting<TimeValue> getRetryTimeoutSetting() {
return REPLICATION_RETRY_TIMEOUT;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@
import org.opensearch.indices.breaker.HierarchyCircuitBreakerService;
import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.indices.replication.checkpoint.PublishCheckpointAction;
import org.opensearch.indices.store.IndicesStore;
import org.opensearch.ingest.IngestService;
import org.opensearch.monitor.fs.FsHealthService;
Expand Down Expand Up @@ -440,6 +441,7 @@ public void apply(Settings value, Settings current, Settings previous) {
HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_TYPE_SETTING,
TransportReplicationAction.REPLICATION_INITIAL_RETRY_BACKOFF_BOUND,
TransportReplicationAction.REPLICATION_RETRY_TIMEOUT,
PublishCheckpointAction.PUBLISH_CHECK_POINT_RETRY_TIMEOUT,
TransportSettings.HOST,
TransportSettings.PUBLISH_HOST,
TransportSettings.PUBLISH_HOST_PROFILE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.common.util.concurrent.ThreadContextAccess;
import org.opensearch.core.action.ActionListener;
Expand Down Expand Up @@ -60,6 +62,16 @@ public class PublishCheckpointAction extends TransportReplicationAction<

private final SegmentReplicationTargetService replicationService;

/**
* The timeout for retrying publish checkpoint requests.
*/
public static final Setting<TimeValue> PUBLISH_CHECK_POINT_RETRY_TIMEOUT = Setting.timeSetting(
"indices.publish_check_point.retry_timeout",
TimeValue.timeValueMinutes(5),
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

@Inject
public PublishCheckpointAction(
Settings settings,
Expand Down Expand Up @@ -87,6 +99,11 @@ public PublishCheckpointAction(
this.replicationService = targetService;
}

@Override
protected Setting<TimeValue> getRetryTimeoutSetting() {
return PUBLISH_CHECK_POINT_RETRY_TIMEOUT;
}

@Override
protected ReplicationResponse newResponseInstance(StreamInput in) throws IOException {
return new ReplicationResponse(in);
Expand Down
Loading