Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Increase the default segment counter step size when replica promoting ([#17568](https://github.com/opensearch-project/OpenSearch/pull/17568))
- [WLM] Rename QueryGroup to WorkloadGroup ([#17901](https://github.com/opensearch-project/OpenSearch/pull/17901))
- Relaxes jarHell check for optionally extended plugins([#17893](https://github.com/opensearch-project/OpenSearch/pull/17893)))
- Add cluster setting for retry timeout of publish checkpoint tx action ([#17749](https://github.com/opensearch-project/OpenSearch/pull/17749))

### Dependencies
- Bump `com.nimbusds:nimbus-jose-jwt` from 9.41.1 to 10.0.2 ([#17607](https://github.com/opensearch-project/OpenSearch/pull/17607), [#17669](https://github.com/opensearch-project/OpenSearch/pull/17669))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.search.SearchType;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.action.support.replication.TransportReplicationAction;
import org.opensearch.action.termvectors.TermVectorsRequestBuilder;
import org.opensearch.action.termvectors.TermVectorsResponse;
import org.opensearch.action.update.UpdateResponse;
Expand All @@ -59,6 +60,7 @@
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.set.Sets;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.index.IndexModule;
Expand All @@ -72,6 +74,7 @@
import org.opensearch.index.engine.NRTReplicationReaderManager;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.indices.recovery.FileChunkRequest;
import org.opensearch.indices.replication.checkpoint.PublishCheckpointAction;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.node.NodeClosedException;
import org.opensearch.search.SearchService;
Expand All @@ -83,6 +86,7 @@
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.junit.annotations.TestLogging;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.transport.RemoteTransportException;
import org.opensearch.transport.TransportService;
import org.opensearch.transport.client.Requests;
import org.junit.Before;
Expand All @@ -98,6 +102,7 @@
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -131,6 +136,56 @@ private static String indexOrAlias() {
return randomBoolean() ? INDEX_NAME : "alias";
}

public void testRetryPublishCheckPoint() throws Exception {
// Reproduce the case where the replica shard cannot synchronize data from the primary shard when there is a network exception.
// Test update of configuration PublishCheckpointAction#PUBLISH_CHECK_POINT_RETRY_TIMEOUT.
Settings mockNodeSetting = Settings.builder()
.put(TransportReplicationAction.REPLICATION_RETRY_TIMEOUT.getKey(), TimeValue.timeValueSeconds(0))
.put(PublishCheckpointAction.PUBLISH_CHECK_POINT_RETRY_TIMEOUT.getKey(), TimeValue.timeValueSeconds(0))
.build();

final String primaryNode = internalCluster().startDataOnlyNode(mockNodeSetting);
createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put("index.refresh_interval", -1).build());
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replicaNode = internalCluster().startDataOnlyNode(mockNodeSetting);
ensureGreen(INDEX_NAME);

// update publish checkpoint retry time out
client().admin()
.cluster()
.prepareUpdateSettings()
.setPersistentSettings(
Settings.builder().put(PublishCheckpointAction.PUBLISH_CHECK_POINT_RETRY_TIMEOUT.getKey(), TimeValue.timeValueMinutes(10))
)
.get();

// mock network exception
MockTransportService replicaTransportService = ((MockTransportService) internalCluster().getInstance(
TransportService.class,
replicaNode
));
AtomicBoolean mockReplicaReceivePublishCheckpointException = new AtomicBoolean(true);
replicaTransportService.addRequestHandlingBehavior(
PublishCheckpointAction.ACTION_NAME + TransportReplicationAction.REPLICA_ACTION_SUFFIX,
(handler, request, channel, task) -> {
if (mockReplicaReceivePublishCheckpointException.get()) {
logger.info("mock remote transport exception");
throw new RemoteTransportException("mock remote transport exception", new OpenSearchRejectedExecutionException());
}
logger.info("replica receive publish checkpoint request");
handler.messageReceived(request, channel, task);
}
);

client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
waitForSearchableDocs(0, replicaNode);
logger.info("ensure publish checkpoint request can be process");
mockReplicaReceivePublishCheckpointException.set(false);

waitForSearchableDocs(1, primaryNode, replicaNode);
replicaTransportService.clearAllRules();
}

public void testPrimaryStopped_ReplicaPromoted() throws Exception {
final String primary = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ protected TransportReplicationAction(
this.transportReplicaAction = actionName + REPLICA_ACTION_SUFFIX;

this.initialRetryBackoffBound = REPLICATION_INITIAL_RETRY_BACKOFF_BOUND.get(settings);
this.retryTimeout = REPLICATION_RETRY_TIMEOUT.get(settings);
this.retryTimeout = getRetryTimeoutSetting().get(settings);
this.forceExecutionOnPrimary = forceExecutionOnPrimary;

transportService.registerRequestHandler(actionName, ThreadPool.Names.SAME, requestReader, this::handleOperationRequest);
Expand All @@ -273,7 +273,11 @@ protected TransportReplicationAction(

ClusterSettings clusterSettings = clusterService.getClusterSettings();
clusterSettings.addSettingsUpdateConsumer(REPLICATION_INITIAL_RETRY_BACKOFF_BOUND, (v) -> initialRetryBackoffBound = v);
clusterSettings.addSettingsUpdateConsumer(REPLICATION_RETRY_TIMEOUT, (v) -> retryTimeout = v);
clusterSettings.addSettingsUpdateConsumer(getRetryTimeoutSetting(), (v) -> retryTimeout = v);
}

protected Setting<TimeValue> getRetryTimeoutSetting() {
return REPLICATION_RETRY_TIMEOUT;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@
import org.opensearch.indices.breaker.HierarchyCircuitBreakerService;
import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.indices.replication.checkpoint.PublishCheckpointAction;
import org.opensearch.indices.store.IndicesStore;
import org.opensearch.ingest.IngestService;
import org.opensearch.monitor.fs.FsHealthService;
Expand Down Expand Up @@ -440,6 +441,7 @@ public void apply(Settings value, Settings current, Settings previous) {
HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_TYPE_SETTING,
TransportReplicationAction.REPLICATION_INITIAL_RETRY_BACKOFF_BOUND,
TransportReplicationAction.REPLICATION_RETRY_TIMEOUT,
PublishCheckpointAction.PUBLISH_CHECK_POINT_RETRY_TIMEOUT,
TransportSettings.HOST,
TransportSettings.PUBLISH_HOST,
TransportSettings.PUBLISH_HOST_PROFILE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.common.util.concurrent.ThreadContextAccess;
import org.opensearch.core.action.ActionListener;
Expand Down Expand Up @@ -60,6 +62,16 @@ public class PublishCheckpointAction extends TransportReplicationAction<

private final SegmentReplicationTargetService replicationService;

/**
* The timeout for retrying publish checkpoint requests.
*/
public static final Setting<TimeValue> PUBLISH_CHECK_POINT_RETRY_TIMEOUT = Setting.timeSetting(
"indices.publish_check_point.retry_timeout",
TimeValue.timeValueMinutes(5),
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

@Inject
public PublishCheckpointAction(
Settings settings,
Expand Down Expand Up @@ -87,6 +99,11 @@ public PublishCheckpointAction(
this.replicationService = targetService;
}

@Override
protected Setting<TimeValue> getRetryTimeoutSetting() {
return PUBLISH_CHECK_POINT_RETRY_TIMEOUT;
}

@Override
protected ReplicationResponse newResponseInstance(StreamInput in) throws IOException {
return new ReplicationResponse(in);
Expand Down
Loading