Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@
import org.opensearch.core.common.Strings;
import org.opensearch.core.common.breaker.CircuitBreaker;
import org.opensearch.core.common.breaker.CircuitBreakingException;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.core.index.Index;
Expand Down Expand Up @@ -250,24 +249,6 @@ private void assertOnGoingRecoveryState(
assertThat(state.getStage(), not(equalTo(Stage.DONE)));
}

private void slowDownRecovery(ByteSizeValue shardSize) {
long chunkSize = Math.max(1, shardSize.getBytes() / 10);
assertTrue(
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(
Settings.builder()
// one chunk per sec..
.put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(), chunkSize, ByteSizeUnit.BYTES)
// small chunks
.put(INDICES_RECOVERY_CHUNK_SIZE_SETTING.getKey(), new ByteSizeValue(chunkSize, ByteSizeUnit.BYTES))
)
.get()
.isAcknowledged()
);
}

private void restoreRecoverySpeed() {
assertTrue(
client().admin()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ public class MigrationBaseTestCase extends OpenSearchIntegTestCase {
boolean addRemote = false;
Settings extraSettings = Settings.EMPTY;

private static final int MIN_DOC_COUNT = 50;
private static final int MAX_DOC_COUNT = 100;

private final List<String> documentKeys = List.of(
randomAlphaOfLength(5),
randomAlphaOfLength(5),
Expand Down Expand Up @@ -145,6 +148,10 @@ public BulkResponse indexBulk(String indexName, int numDocs) {
return client().bulk(bulkRequest).actionGet();
}

protected int numDocs() {
return between(MIN_DOC_COUNT, MAX_DOC_COUNT);
}

Map<String, Integer> getShardCountByNodeId() {
final Map<String, Integer> shardCountByNodeId = new HashMap<>();
final ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
Expand All @@ -167,9 +174,17 @@ private void indexSingleDoc(String indexName) {
client().prepareIndex(indexName).setSource("auto", true).get();
}

private int bulkIndex(String indexName) throws InterruptedException {
final int numDocs = numDocs();
indexBulk(indexName, numDocs);
return numDocs;
}

public class AsyncIndexingService {
private String indexName;
private AtomicLong indexedDocs = new AtomicLong(0);
private final String indexName;
private final AtomicLong indexedDocs = new AtomicLong(0);

private final AtomicLong singleIndexedDocs = new AtomicLong(0);
private AtomicBoolean finished = new AtomicBoolean();
private Thread indexingThread;

Expand All @@ -193,11 +208,28 @@ public long getIndexedDocs() {
return indexedDocs.get();
}

public long getSingleIndexedDocs() {
return singleIndexedDocs.get();
}

private Thread getIndexingThread() {
return new Thread(() -> {
int iteration = 0;
while (finished.get() == false) {
indexSingleDoc(indexName);
long currentDocCount = indexedDocs.incrementAndGet();
long currentDocCount;
if (rarely()) {
indexSingleDoc(indexName);
currentDocCount = indexedDocs.incrementAndGet();
singleIndexedDocs.incrementAndGet();
} else {
try {
currentDocCount = indexedDocs.addAndGet(bulkIndex(indexName));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
iteration++;

if (currentDocCount > 0 && currentDocCount % refreshFrequency == 0) {
if (rarely()) {
client().admin().indices().prepareFlush(indexName).get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.opensearch.client.Client;
import org.opensearch.client.Requests;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.opensearch.common.Priority;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.plugins.Plugin;
Expand All @@ -38,6 +40,9 @@

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemotePrimaryRelocationIT extends MigrationBaseTestCase {

private final String INDEX_NAME = "primary-relocation";

protected int maximumNumberOfShards() {
return 1;
}
Expand All @@ -57,15 +62,17 @@ public void testRemotePrimaryRelocation() throws Exception {
updateSettingsRequest.persistentSettings(Settings.builder().put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed"));
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());

// create shard with 0 replica and 1 shard
client().admin().indices().prepareCreate("test").setSettings(indexSettings()).setMapping("field", "type=text").get();
ensureGreen("test");
// create shard with 1 replica and 1 shard
Settings settings = super.indexSettings();
settings = Settings.builder().put(settings).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build();
client().admin().indices().prepareCreate(INDEX_NAME).setSettings(settings).setMapping("field", "type=text").get();
ensureGreen(INDEX_NAME);

AtomicInteger numAutoGenDocs = new AtomicInteger();
final AtomicBoolean finished = new AtomicBoolean(false);
AsyncIndexingService asyncIndexingService = new AsyncIndexingService("test");
AsyncIndexingService asyncIndexingService = new AsyncIndexingService(INDEX_NAME);
asyncIndexingService.startIndexing();
refresh("test");
refresh(INDEX_NAME);

// add remote node in mixed mode cluster
setAddRemote(true);
Expand All @@ -86,7 +93,13 @@ public void testRemotePrimaryRelocation() throws Exception {
// Index some more docs
int currentDoc = numAutoGenDocs.get();
int finalCurrentDoc1 = currentDoc;
waitUntil(() -> numAutoGenDocs.get() > finalCurrentDoc1 + 5);
waitUntil(() -> numAutoGenDocs.get() > finalCurrentDoc1 + 100);

ByteSizeValue shardSize = client().admin().indices().prepareStats(INDEX_NAME).execute().actionGet().getShards()[0].getStats()
.getStore()
.size();
logger.info("Shard size after migration is {}", shardSize);
slowDownRecovery(shardSize);

// Change direction to remote store
updateSettingsRequest.persistentSettings(Settings.builder().put(MIGRATION_DIRECTION_SETTING.getKey(), "remote_store"));
Expand All @@ -96,42 +109,44 @@ public void testRemotePrimaryRelocation() throws Exception {
client().admin()
.cluster()
.prepareReroute()
.add(new MoveAllocationCommand("test", 0, primaryNodeName("test"), remoteNode))
.add(new MoveAllocationCommand(INDEX_NAME, 0, primaryNodeName(INDEX_NAME), remoteNode))
.execute()
.actionGet();
waitForRelocation();
assertEquals(remoteNode, primaryNodeName("test"));
logger.info("--> relocation from docrep to remote complete");
assertEquals(remoteNode, primaryNodeName(INDEX_NAME));

// Index some more docs
currentDoc = numAutoGenDocs.get();
int finalCurrentDoc = currentDoc;
waitUntil(() -> numAutoGenDocs.get() > finalCurrentDoc + 5);

// increase recovery speed a bit to account for current size
shardSize = client().admin().indices().prepareStats(INDEX_NAME).execute().actionGet().getShards()[0].getStats().getStore().size();
slowDownRecovery(shardSize);

client().admin()
.cluster()
.prepareReroute()
.add(new MoveAllocationCommand("test", 0, remoteNode, remoteNode2))
.add(new MoveAllocationCommand(INDEX_NAME, 0, remoteNode, remoteNode2))
.execute()
.actionGet();
waitForRelocation();
assertEquals(remoteNode2, primaryNodeName("test"));

assertEquals(remoteNode2, primaryNodeName(INDEX_NAME));
logger.info("--> relocation from remote to remote complete");

finished.set(true);
asyncIndexingService.stopIndexing();
refresh("test");
refresh(INDEX_NAME);
OpenSearchAssertions.assertHitCount(
client().prepareSearch("test").setTrackTotalHits(true).get(),
client().prepareSearch(INDEX_NAME).setTrackTotalHits(true).get(),
asyncIndexingService.getIndexedDocs()
);
OpenSearchAssertions.assertHitCount(
client().prepareSearch("test")
client().prepareSearch(INDEX_NAME)
.setTrackTotalHits(true)// extra paranoia ;)
.setQuery(QueryBuilders.termQuery("auto", true))
.get(),
asyncIndexingService.getIndexedDocs()
asyncIndexingService.getSingleIndexedDocs()
);
}

Expand All @@ -142,13 +157,13 @@ public void testMixedModeRelocation_RemoteSeedingFail() throws Exception {
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());

// create shard with 0 replica and 1 shard
client().admin().indices().prepareCreate("test").setSettings(indexSettings()).setMapping("field", "type=text").get();
ensureGreen("test");
client().admin().indices().prepareCreate(INDEX_NAME).setSettings(indexSettings()).setMapping("field", "type=text").get();
ensureGreen(INDEX_NAME);

AsyncIndexingService asyncIndexingService = new AsyncIndexingService("test");
AsyncIndexingService asyncIndexingService = new AsyncIndexingService(INDEX_NAME);
asyncIndexingService.startIndexing();

refresh("test");
refresh(INDEX_NAME);

// add remote node in mixed mode cluster
setAddRemote(true);
Expand All @@ -167,7 +182,12 @@ public void testMixedModeRelocation_RemoteSeedingFail() throws Exception {
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());

logger.info("--> relocating from {} to {} ", docRepNode, remoteNode);
client().admin().cluster().prepareReroute().add(new MoveAllocationCommand("test", 0, docRepNode, remoteNode)).execute().actionGet();
client().admin()
.cluster()
.prepareReroute()
.add(new MoveAllocationCommand(INDEX_NAME, 0, docRepNode, remoteNode))
.execute()
.actionGet();
ClusterHealthResponse clusterHealthResponse = client().admin()
.cluster()
.prepareHealth()
Expand All @@ -186,7 +206,7 @@ public void testMixedModeRelocation_RemoteSeedingFail() throws Exception {
.waitForNoInitializingShards(true);
ClusterHealthResponse actionGet = client().admin().cluster().health(healthRequest).actionGet();
assertEquals(actionGet.getRelocatingShards(), 0);
assertEquals(docRepNode, primaryNodeName("test"));
assertEquals(docRepNode, primaryNodeName(INDEX_NAME));

asyncIndexingService.stopIndexing();
client().admin()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, autoManageMasterNodes = false)
public class RemoteReplicaRecoveryIT extends MigrationBaseTestCase {

private final String INDEX_NAME = "replica-recovery";

protected int maximumNumberOfShards() {
return 1;
}
Expand All @@ -44,7 +46,6 @@ protected int minimumNumberOfReplicas() {
Brings up new replica copies on remote and docrep nodes, when primary is on a remote node
Live indexing is happening meanwhile
*/
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/13473")
public void testReplicaRecovery() throws Exception {
internalCluster().setBootstrapClusterManagerNodeIndex(0);
String primaryNode = internalCluster().startNode();
Expand All @@ -53,13 +54,13 @@ public void testReplicaRecovery() throws Exception {
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());

// create shard with 0 replica and 1 shard
client().admin().indices().prepareCreate("test").setSettings(indexSettings()).setMapping("field", "type=text").get();
String replicaNode = internalCluster().startNode();
ensureGreen("test");
AsyncIndexingService asyncIndexingService = new AsyncIndexingService("test");
client().admin().indices().prepareCreate(INDEX_NAME).setSettings(indexSettings()).setMapping("field", "type=text").get();
internalCluster().startNode();
ensureGreen(INDEX_NAME);
AsyncIndexingService asyncIndexingService = new AsyncIndexingService(INDEX_NAME);
asyncIndexingService.startIndexing();

refresh("test");
refresh(INDEX_NAME);

// add remote node in mixed mode cluster
setAddRemote(true);
Expand All @@ -77,7 +78,7 @@ public void testReplicaRecovery() throws Exception {
client().admin()
.cluster()
.prepareReroute()
.add(new MoveAllocationCommand("test", 0, primaryNode, remoteNode))
.add(new MoveAllocationCommand(INDEX_NAME, 0, primaryNode, remoteNode))
.execute()
.actionGet();

Expand All @@ -89,42 +90,38 @@ public void testReplicaRecovery() throws Exception {
client().admin()
.indices()
.updateSettings(
new UpdateSettingsRequest("test").settings(
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 3)
.put("index.routing.allocation.exclude._name", remoteNode)
.build()
)
new UpdateSettingsRequest(INDEX_NAME).settings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 3).build())
)
.get();

waitForRelocation();
asyncIndexingService.stopIndexing();
refresh("test");
refreshAndWaitForReplication(INDEX_NAME);
ensureGreen(INDEX_NAME);

// segrep lag should be zero
assertBusy(() -> {
SegmentReplicationStatsResponse segmentReplicationStatsResponse = dataNodeClient().admin()
.indices()
.prepareSegmentReplicationStats("test")
.prepareSegmentReplicationStats(INDEX_NAME)
.setDetailed(true)
.execute()
.actionGet();
SegmentReplicationPerGroupStats perGroupStats = segmentReplicationStatsResponse.getReplicationStats().get("test").get(0);
SegmentReplicationPerGroupStats perGroupStats = segmentReplicationStatsResponse.getReplicationStats().get(INDEX_NAME).get(0);
assertEquals(segmentReplicationStatsResponse.getReplicationStats().size(), 1);
perGroupStats.getReplicaStats().stream().forEach(e -> assertEquals(e.getCurrentReplicationLagMillis(), 0));
}, 20, TimeUnit.SECONDS);

OpenSearchAssertions.assertHitCount(
client().prepareSearch("test").setTrackTotalHits(true).get(),
client().prepareSearch(INDEX_NAME).setTrackTotalHits(true).get(),
asyncIndexingService.getIndexedDocs()
);
OpenSearchAssertions.assertHitCount(
client().prepareSearch("test")
client().prepareSearch(INDEX_NAME)
.setTrackTotalHits(true)// extra paranoia ;)
.setQuery(QueryBuilders.termQuery("auto", true))
.get(),
asyncIndexingService.getIndexedDocs()
asyncIndexingService.getSingleIndexedDocs()
);

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ public void testEndToEndRemoteMigration() throws Exception {
.setTrackTotalHits(true)// extra paranoia ;)
.setQuery(QueryBuilders.termQuery("auto", true))
.get(),
asyncIndexingService.getIndexedDocs()
asyncIndexingService.getSingleIndexedDocs()
);
}

Expand Down
Loading
Loading