diff --git a/CHANGELOG.md b/CHANGELOG.md index 60371158d4933..ec1c7b626dc5f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -52,6 +52,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Increase the default segment counter step size when replica promoting ([#17568](https://github.com/opensearch-project/OpenSearch/pull/17568)) - [WLM] Rename QueryGroup to WorkloadGroup ([#17901](https://github.com/opensearch-project/OpenSearch/pull/17901)) - Relaxes jarHell check for optionally extended plugins([#17893](https://github.com/opensearch-project/OpenSearch/pull/17893))) +- Add a dynamic cluster setting for the retry timeout of the publish checkpoint transport action ([#17749](https://github.com/opensearch-project/OpenSearch/pull/17749)) ### Dependencies - Bump `com.nimbusds:nimbus-jose-jwt` from 9.41.1 to 10.0.2 ([#17607](https://github.com/opensearch-project/OpenSearch/pull/17607), [#17669](https://github.com/opensearch-project/OpenSearch/pull/17669)) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index 608ae2e215b31..ae093620c25b4 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -41,6 +41,7 @@ import org.opensearch.action.search.SearchResponse; import org.opensearch.action.search.SearchType; import org.opensearch.action.support.WriteRequest; +import org.opensearch.action.support.replication.TransportReplicationAction; import org.opensearch.action.termvectors.TermVectorsRequestBuilder; import org.opensearch.action.termvectors.TermVectorsResponse; import org.opensearch.action.update.UpdateResponse; @@ -59,6 +60,7 @@ import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.set.Sets; import 
org.opensearch.core.common.io.stream.NamedWriteableRegistry; +import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; import org.opensearch.core.index.shard.ShardId; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.index.IndexModule; @@ -72,6 +74,7 @@ import org.opensearch.index.engine.NRTReplicationReaderManager; import org.opensearch.index.shard.IndexShard; import org.opensearch.indices.recovery.FileChunkRequest; +import org.opensearch.indices.replication.checkpoint.PublishCheckpointAction; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.node.NodeClosedException; import org.opensearch.search.SearchService; @@ -83,6 +86,7 @@ import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.junit.annotations.TestLogging; import org.opensearch.test.transport.MockTransportService; +import org.opensearch.transport.RemoteTransportException; import org.opensearch.transport.TransportService; import org.opensearch.transport.client.Requests; import org.junit.Before; @@ -98,6 +102,7 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; @@ -131,6 +136,56 @@ private static String indexOrAlias() { return randomBoolean() ? INDEX_NAME : "alias"; } + public void testRetryPublishCheckPoint() throws Exception { + // Reproduce the case where the replica shard cannot synchronize data from the primary shard when there is a network exception. + // Test update of configuration PublishCheckpointAction#PUBLISH_CHECK_POINT_RETRY_TIMEOUT. 
+ Settings mockNodeSetting = Settings.builder() + .put(TransportReplicationAction.REPLICATION_RETRY_TIMEOUT.getKey(), TimeValue.timeValueSeconds(0)) + .put(PublishCheckpointAction.PUBLISH_CHECK_POINT_RETRY_TIMEOUT.getKey(), TimeValue.timeValueSeconds(0)) + .build(); + + final String primaryNode = internalCluster().startDataOnlyNode(mockNodeSetting); + createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put("index.refresh_interval", -1).build()); + ensureYellowAndNoInitializingShards(INDEX_NAME); + final String replicaNode = internalCluster().startDataOnlyNode(mockNodeSetting); + ensureGreen(INDEX_NAME); + + // Dynamically raise the publish checkpoint retry timeout from 0s to 10 minutes. + client().admin() + .cluster() + .prepareUpdateSettings() + .setPersistentSettings( + Settings.builder().put(PublishCheckpointAction.PUBLISH_CHECK_POINT_RETRY_TIMEOUT.getKey(), TimeValue.timeValueMinutes(10)) + ) + .get(); + + // Simulate a network failure: the replica rejects publish checkpoint requests while the flag is set. + MockTransportService replicaTransportService = ((MockTransportService) internalCluster().getInstance( + TransportService.class, + replicaNode + )); + AtomicBoolean mockReplicaReceivePublishCheckpointException = new AtomicBoolean(true); + replicaTransportService.addRequestHandlingBehavior( + PublishCheckpointAction.ACTION_NAME + TransportReplicationAction.REPLICA_ACTION_SUFFIX, + (handler, request, channel, task) -> { + if (mockReplicaReceivePublishCheckpointException.get()) { + logger.info("mock remote transport exception"); + throw new RemoteTransportException("mock remote transport exception", new OpenSearchRejectedExecutionException()); + } + logger.info("replica receive publish checkpoint request"); + handler.messageReceived(request, channel, task); + } + ); + + client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); + waitForSearchableDocs(0, replicaNode); + logger.info("ensure publish checkpoint request can be process"); + 
mockReplicaReceivePublishCheckpointException.set(false); + + waitForSearchableDocs(1, primaryNode, replicaNode); + replicaTransportService.clearAllRules(); + } + public void testPrimaryStopped_ReplicaPromoted() throws Exception { final String primary = internalCluster().startDataOnlyNode(); createIndex(INDEX_NAME); diff --git a/server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java index 29cf4003ac679..c81754b33fa62 100644 --- a/server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java @@ -249,7 +249,7 @@ protected TransportReplicationAction( this.transportReplicaAction = actionName + REPLICA_ACTION_SUFFIX; this.initialRetryBackoffBound = REPLICATION_INITIAL_RETRY_BACKOFF_BOUND.get(settings); - this.retryTimeout = REPLICATION_RETRY_TIMEOUT.get(settings); + this.retryTimeout = getRetryTimeoutSetting().get(settings); this.forceExecutionOnPrimary = forceExecutionOnPrimary; transportService.registerRequestHandler(actionName, ThreadPool.Names.SAME, requestReader, this::handleOperationRequest); @@ -273,7 +273,11 @@ protected TransportReplicationAction( ClusterSettings clusterSettings = clusterService.getClusterSettings(); clusterSettings.addSettingsUpdateConsumer(REPLICATION_INITIAL_RETRY_BACKOFF_BOUND, (v) -> initialRetryBackoffBound = v); - clusterSettings.addSettingsUpdateConsumer(REPLICATION_RETRY_TIMEOUT, (v) -> retryTimeout = v); + clusterSettings.addSettingsUpdateConsumer(getRetryTimeoutSetting(), (v) -> retryTimeout = v); + } + + protected Setting<TimeValue> getRetryTimeoutSetting() { + return REPLICATION_RETRY_TIMEOUT; } /** diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 
d0722b4f3a942..137260c82e6f5 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -132,6 +132,7 @@ import org.opensearch.indices.breaker.HierarchyCircuitBreakerService; import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache; import org.opensearch.indices.recovery.RecoverySettings; +import org.opensearch.indices.replication.checkpoint.PublishCheckpointAction; import org.opensearch.indices.store.IndicesStore; import org.opensearch.ingest.IngestService; import org.opensearch.monitor.fs.FsHealthService; @@ -440,6 +441,7 @@ public void apply(Settings value, Settings current, Settings previous) { HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_TYPE_SETTING, TransportReplicationAction.REPLICATION_INITIAL_RETRY_BACKOFF_BOUND, TransportReplicationAction.REPLICATION_RETRY_TIMEOUT, + PublishCheckpointAction.PUBLISH_CHECK_POINT_RETRY_TIMEOUT, TransportSettings.HOST, TransportSettings.PUBLISH_HOST, TransportSettings.PUBLISH_HOST_PROFILE, diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java index d1e2884956f5c..7181355333be7 100644 --- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java +++ b/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java @@ -22,7 +22,9 @@ import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.annotation.PublicApi; import org.opensearch.common.inject.Inject; +import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.common.util.concurrent.ThreadContextAccess; import 
org.opensearch.core.action.ActionListener; @@ -60,6 +62,16 @@ public class PublishCheckpointAction extends TransportReplicationAction< private final SegmentReplicationTargetService replicationService; + /** + * The timeout for retrying publish checkpoint requests. + */ + public static final Setting<TimeValue> PUBLISH_CHECK_POINT_RETRY_TIMEOUT = Setting.timeSetting( + "indices.publish_check_point.retry_timeout", + TimeValue.timeValueMinutes(5), + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + @Inject public PublishCheckpointAction( Settings settings, @@ -87,6 +99,11 @@ public PublishCheckpointAction( this.replicationService = targetService; } + @Override + protected Setting<TimeValue> getRetryTimeoutSetting() { + return PUBLISH_CHECK_POINT_RETRY_TIMEOUT; + } + @Override protected ReplicationResponse newResponseInstance(StreamInput in) throws IOException { return new ReplicationResponse(in);