Skip to content

Commit cd5fcc7

Browse files
committed
Fix remote recovery IT
Signed-off-by: Gaurav Bafna <[email protected]>
1 parent cf2c31f commit cd5fcc7

File tree

6 files changed

+108
-64
lines changed

6 files changed

+108
-64
lines changed

server/src/internalClusterTest/java/org/opensearch/indices/recovery/IndexRecoveryIT.java

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,6 @@
7878
import org.opensearch.core.common.Strings;
7979
import org.opensearch.core.common.breaker.CircuitBreaker;
8080
import org.opensearch.core.common.breaker.CircuitBreakingException;
81-
import org.opensearch.core.common.unit.ByteSizeUnit;
8281
import org.opensearch.core.common.unit.ByteSizeValue;
8382
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
8483
import org.opensearch.core.index.Index;
@@ -250,24 +249,6 @@ private void assertOnGoingRecoveryState(
250249
assertThat(state.getStage(), not(equalTo(Stage.DONE)));
251250
}
252251

253-
private void slowDownRecovery(ByteSizeValue shardSize) {
254-
long chunkSize = Math.max(1, shardSize.getBytes() / 10);
255-
assertTrue(
256-
client().admin()
257-
.cluster()
258-
.prepareUpdateSettings()
259-
.setTransientSettings(
260-
Settings.builder()
261-
// one chunk per sec..
262-
.put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(), chunkSize, ByteSizeUnit.BYTES)
263-
// small chunks
264-
.put(INDICES_RECOVERY_CHUNK_SIZE_SETTING.getKey(), new ByteSizeValue(chunkSize, ByteSizeUnit.BYTES))
265-
)
266-
.get()
267-
.isAcknowledged()
268-
);
269-
}
270-
271252
private void restoreRecoverySpeed() {
272253
assertTrue(
273254
client().admin()

server/src/internalClusterTest/java/org/opensearch/remotemigration/MigrationBaseTestCase.java

Lines changed: 38 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,9 @@ public class MigrationBaseTestCase extends OpenSearchIntegTestCase {
4949
boolean addRemote = false;
5050
Settings extraSettings = Settings.EMPTY;
5151

52+
private static final int MIN_DOC_COUNT = 50;
53+
private static final int MAX_DOC_COUNT = 100;
54+
5255
private final List<String> documentKeys = List.of(
5356
randomAlphaOfLength(5),
5457
randomAlphaOfLength(5),
@@ -126,6 +129,10 @@ public BulkResponse indexBulk(String indexName, int numDocs) {
126129
return client().bulk(bulkRequest).actionGet();
127130
}
128131

132+
protected int numDocs() {
133+
return between(MIN_DOC_COUNT, MAX_DOC_COUNT);
134+
}
135+
129136
Map<String, Integer> getShardCountByNodeId() {
130137
final Map<String, Integer> shardCountByNodeId = new HashMap<>();
131138
final ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
@@ -148,9 +155,17 @@ private void indexSingleDoc(String indexName) {
148155
client().prepareIndex(indexName).setSource("auto", true).get();
149156
}
150157

158+
private int bulkIndex(String indexName) throws InterruptedException {
159+
final int numDocs = numDocs();
160+
indexBulk(indexName, numDocs);
161+
return numDocs;
162+
}
163+
151164
public class AsyncIndexingService {
152-
private String indexName;
153-
private AtomicLong indexedDocs = new AtomicLong(0);
165+
private final String indexName;
166+
private final AtomicLong indexedDocs = new AtomicLong(0);
167+
168+
private final AtomicLong singleIndexedDocs = new AtomicLong(0);
154169
private AtomicBoolean finished = new AtomicBoolean();
155170
private Thread indexingThread;
156171

@@ -174,17 +189,34 @@ public long getIndexedDocs() {
174189
return indexedDocs.get();
175190
}
176191

192+
public long getSingleIndexedDocs() {
193+
return singleIndexedDocs.get();
194+
}
195+
177196
private Thread getIndexingThread() {
178197
return new Thread(() -> {
198+
int iteration = 0;
179199
while (finished.get() == false) {
180-
indexSingleDoc(indexName);
181-
long currentDocCount = indexedDocs.incrementAndGet();
200+
long currentDocCount;
201+
if (rarely()) {
202+
indexSingleDoc(indexName);
203+
currentDocCount = indexedDocs.incrementAndGet();
204+
singleIndexedDocs.incrementAndGet();
205+
} else {
206+
try {
207+
currentDocCount = indexedDocs.addAndGet(bulkIndex(indexName));
208+
} catch (InterruptedException e) {
209+
throw new RuntimeException(e);
210+
}
211+
}
212+
iteration++;
213+
182214
if (currentDocCount > 0 && currentDocCount % refreshFrequency == 0) {
183215
if (rarely()) {
184-
logger.info("--> [iteration {}] flushing index", currentDocCount);
216+
logger.info("--> [iteration {}] flushing index", iteration);
185217
client().admin().indices().prepareFlush(indexName).get();
186218
} else {
187-
logger.info("--> [iteration {}] refreshing index", currentDocCount);
219+
logger.info("--> [iteration {}] refreshing index", iteration);
188220
client().admin().indices().prepareRefresh(indexName).get();
189221
}
190222
}

server/src/internalClusterTest/java/org/opensearch/remotemigration/RemotePrimaryRelocationIT.java

Lines changed: 33 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.opensearch.common.Priority;
2020
import org.opensearch.common.settings.Settings;
2121
import org.opensearch.common.unit.TimeValue;
22+
import org.opensearch.core.common.unit.ByteSizeValue;
2223
import org.opensearch.index.query.QueryBuilders;
2324
import org.opensearch.indices.recovery.RecoverySettings;
2425
import org.opensearch.plugins.Plugin;
@@ -38,6 +39,9 @@
3839

3940
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
4041
public class RemotePrimaryRelocationIT extends MigrationBaseTestCase {
42+
43+
private final String INDEX_NAME = "primary-relocation";
44+
4145
protected int maximumNumberOfShards() {
4246
return 1;
4347
}
@@ -58,14 +62,14 @@ public void testRemotePrimaryRelocation() throws Exception {
5862
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
5963

6064
// create shard with 0 replica and 1 shard
61-
client().admin().indices().prepareCreate("test").setSettings(indexSettings()).setMapping("field", "type=text").get();
62-
ensureGreen("test");
65+
client().admin().indices().prepareCreate(INDEX_NAME).setSettings(indexSettings()).setMapping("field", "type=text").get();
66+
ensureGreen(INDEX_NAME);
6367

6468
AtomicInteger numAutoGenDocs = new AtomicInteger();
6569
final AtomicBoolean finished = new AtomicBoolean(false);
66-
AsyncIndexingService asyncIndexingService = new AsyncIndexingService("test");
70+
AsyncIndexingService asyncIndexingService = new AsyncIndexingService(INDEX_NAME);
6771
asyncIndexingService.startIndexing();
68-
refresh("test");
72+
refresh(INDEX_NAME);
6973

7074
// add remote node in mixed mode cluster
7175
setAddRemote(true);
@@ -86,7 +90,12 @@ public void testRemotePrimaryRelocation() throws Exception {
8690
// Index some more docs
8791
int currentDoc = numAutoGenDocs.get();
8892
int finalCurrentDoc1 = currentDoc;
89-
waitUntil(() -> numAutoGenDocs.get() > finalCurrentDoc1 + 5);
93+
waitUntil(() -> numAutoGenDocs.get() > finalCurrentDoc1 + 100);
94+
95+
ByteSizeValue shardSize = client().admin().indices().prepareStats(INDEX_NAME).execute().actionGet().getShards()[0].getStats()
96+
.getStore()
97+
.size();
98+
slowDownRecovery(shardSize);
9099

91100
// Change direction to remote store
92101
updateSettingsRequest.persistentSettings(Settings.builder().put(MIGRATION_DIRECTION_SETTING.getKey(), "remote_store"));
@@ -96,7 +105,7 @@ public void testRemotePrimaryRelocation() throws Exception {
96105
client().admin()
97106
.cluster()
98107
.prepareReroute()
99-
.add(new MoveAllocationCommand("test", 0, primaryNodeName("test"), remoteNode))
108+
.add(new MoveAllocationCommand(INDEX_NAME, 0, primaryNodeName(INDEX_NAME), remoteNode))
100109
.execute()
101110
.actionGet();
102111
ClusterHealthResponse clusterHealthResponse = client().admin()
@@ -109,7 +118,7 @@ public void testRemotePrimaryRelocation() throws Exception {
109118
.actionGet();
110119

111120
assertEquals(0, clusterHealthResponse.getRelocatingShards());
112-
assertEquals(remoteNode, primaryNodeName("test"));
121+
assertEquals(remoteNode, primaryNodeName(INDEX_NAME));
113122
logger.info("--> relocation from docrep to remote complete");
114123

115124
// Index some more docs
@@ -120,7 +129,7 @@ public void testRemotePrimaryRelocation() throws Exception {
120129
client().admin()
121130
.cluster()
122131
.prepareReroute()
123-
.add(new MoveAllocationCommand("test", 0, remoteNode, remoteNode2))
132+
.add(new MoveAllocationCommand(INDEX_NAME, 0, remoteNode, remoteNode2))
124133
.execute()
125134
.actionGet();
126135
clusterHealthResponse = client().admin()
@@ -133,23 +142,23 @@ public void testRemotePrimaryRelocation() throws Exception {
133142
.actionGet();
134143

135144
assertEquals(0, clusterHealthResponse.getRelocatingShards());
136-
assertEquals(remoteNode2, primaryNodeName("test"));
145+
assertEquals(remoteNode2, primaryNodeName(INDEX_NAME));
137146

138147
logger.info("--> relocation from remote to remote complete");
139148

140149
finished.set(true);
141150
asyncIndexingService.stopIndexing();
142-
refresh("test");
151+
refresh(INDEX_NAME);
143152
OpenSearchAssertions.assertHitCount(
144-
client().prepareSearch("test").setTrackTotalHits(true).get(),
153+
client().prepareSearch(INDEX_NAME).setTrackTotalHits(true).get(),
145154
asyncIndexingService.getIndexedDocs()
146155
);
147156
OpenSearchAssertions.assertHitCount(
148-
client().prepareSearch("test")
157+
client().prepareSearch(INDEX_NAME)
149158
.setTrackTotalHits(true)// extra paranoia ;)
150159
.setQuery(QueryBuilders.termQuery("auto", true))
151160
.get(),
152-
asyncIndexingService.getIndexedDocs()
161+
asyncIndexingService.getSingleIndexedDocs()
153162
);
154163
}
155164

@@ -161,13 +170,13 @@ public void testMixedModeRelocation_RemoteSeedingFail() throws Exception {
161170
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
162171

163172
// create shard with 0 replica and 1 shard
164-
client().admin().indices().prepareCreate("test").setSettings(indexSettings()).setMapping("field", "type=text").get();
165-
ensureGreen("test");
173+
client().admin().indices().prepareCreate(INDEX_NAME).setSettings(indexSettings()).setMapping("field", "type=text").get();
174+
ensureGreen(INDEX_NAME);
166175

167-
AsyncIndexingService asyncIndexingService = new AsyncIndexingService("test");
176+
AsyncIndexingService asyncIndexingService = new AsyncIndexingService(INDEX_NAME);
168177
asyncIndexingService.startIndexing();
169178

170-
refresh("test");
179+
refresh(INDEX_NAME);
171180

172181
// add remote node in mixed mode cluster
173182
setAddRemote(true);
@@ -186,7 +195,12 @@ public void testMixedModeRelocation_RemoteSeedingFail() throws Exception {
186195
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
187196

188197
logger.info("--> relocating from {} to {} ", docRepNode, remoteNode);
189-
client().admin().cluster().prepareReroute().add(new MoveAllocationCommand("test", 0, docRepNode, remoteNode)).execute().actionGet();
198+
client().admin()
199+
.cluster()
200+
.prepareReroute()
201+
.add(new MoveAllocationCommand(INDEX_NAME, 0, docRepNode, remoteNode))
202+
.execute()
203+
.actionGet();
190204
ClusterHealthResponse clusterHealthResponse = client().admin()
191205
.cluster()
192206
.prepareHealth()
@@ -205,7 +219,7 @@ public void testMixedModeRelocation_RemoteSeedingFail() throws Exception {
205219
.waitForNoInitializingShards(true);
206220
ClusterHealthResponse actionGet = client().admin().cluster().health(healthRequest).actionGet();
207221
assertEquals(actionGet.getRelocatingShards(), 0);
208-
assertEquals(docRepNode, primaryNodeName("test"));
222+
assertEquals(docRepNode, primaryNodeName(INDEX_NAME));
209223

210224
asyncIndexingService.stopIndexing();
211225
client().admin()

server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteReplicaRecoveryIT.java

Lines changed: 16 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, autoManageMasterNodes = false)
3232
public class RemoteReplicaRecoveryIT extends MigrationBaseTestCase {
3333

34+
private final String INDEX_NAME = "replica-recovery";
35+
3436
protected int maximumNumberOfShards() {
3537
return 1;
3638
}
@@ -47,7 +49,6 @@ protected int minimumNumberOfReplicas() {
4749
Brings up new replica copies on remote and docrep nodes, when primary is on a remote node
4850
Live indexing is happening meanwhile
4951
*/
50-
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/13473")
5152
public void testReplicaRecovery() throws Exception {
5253
internalCluster().setBootstrapClusterManagerNodeIndex(0);
5354
String primaryNode = internalCluster().startNode();
@@ -56,13 +57,13 @@ public void testReplicaRecovery() throws Exception {
5657
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
5758

5859
// create shard with 0 replica and 1 shard
59-
client().admin().indices().prepareCreate("test").setSettings(indexSettings()).setMapping("field", "type=text").get();
60-
String replicaNode = internalCluster().startNode();
61-
ensureGreen("test");
62-
AsyncIndexingService asyncIndexingService = new AsyncIndexingService("test");
60+
client().admin().indices().prepareCreate(INDEX_NAME).setSettings(indexSettings()).setMapping("field", "type=text").get();
61+
internalCluster().startNode();
62+
ensureGreen(INDEX_NAME);
63+
AsyncIndexingService asyncIndexingService = new AsyncIndexingService(INDEX_NAME);
6364
asyncIndexingService.startIndexing();
6465

65-
refresh("test");
66+
refresh(INDEX_NAME);
6667

6768
// add remote node in mixed mode cluster
6869
setAddRemote(true);
@@ -80,7 +81,7 @@ public void testReplicaRecovery() throws Exception {
8081
client().admin()
8182
.cluster()
8283
.prepareReroute()
83-
.add(new MoveAllocationCommand("test", 0, primaryNode, remoteNode))
84+
.add(new MoveAllocationCommand(INDEX_NAME, 0, primaryNode, remoteNode))
8485
.execute()
8586
.actionGet();
8687
ClusterHealthResponse clusterHealthResponse = client().admin()
@@ -100,12 +101,7 @@ public void testReplicaRecovery() throws Exception {
100101
client().admin()
101102
.indices()
102103
.updateSettings(
103-
new UpdateSettingsRequest("test").settings(
104-
Settings.builder()
105-
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 3)
106-
.put("index.routing.allocation.exclude._name", remoteNode)
107-
.build()
108-
)
104+
new UpdateSettingsRequest(INDEX_NAME).settings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 3).build())
109105
)
110106
.get();
111107

@@ -121,31 +117,32 @@ public void testReplicaRecovery() throws Exception {
121117

122118
assertEquals(0, clusterHealthResponse.getRelocatingShards());
123119
asyncIndexingService.stopIndexing();
124-
refresh("test");
120+
refreshAndWaitForReplication(INDEX_NAME);
121+
ensureGreen(INDEX_NAME);
125122

126123
// segrep lag should be zero
127124
assertBusy(() -> {
128125
SegmentReplicationStatsResponse segmentReplicationStatsResponse = dataNodeClient().admin()
129126
.indices()
130-
.prepareSegmentReplicationStats("test")
127+
.prepareSegmentReplicationStats(INDEX_NAME)
131128
.setDetailed(true)
132129
.execute()
133130
.actionGet();
134-
SegmentReplicationPerGroupStats perGroupStats = segmentReplicationStatsResponse.getReplicationStats().get("test").get(0);
131+
SegmentReplicationPerGroupStats perGroupStats = segmentReplicationStatsResponse.getReplicationStats().get(INDEX_NAME).get(0);
135132
assertEquals(segmentReplicationStatsResponse.getReplicationStats().size(), 1);
136133
perGroupStats.getReplicaStats().stream().forEach(e -> assertEquals(e.getCurrentReplicationLagMillis(), 0));
137134
}, 20, TimeUnit.SECONDS);
138135

139136
OpenSearchAssertions.assertHitCount(
140-
client().prepareSearch("test").setTrackTotalHits(true).get(),
137+
client().prepareSearch(INDEX_NAME).setTrackTotalHits(true).get(),
141138
asyncIndexingService.getIndexedDocs()
142139
);
143140
OpenSearchAssertions.assertHitCount(
144-
client().prepareSearch("test")
141+
client().prepareSearch(INDEX_NAME)
145142
.setTrackTotalHits(true)// extra paranoia ;)
146143
.setQuery(QueryBuilders.termQuery("auto", true))
147144
.get(),
148-
asyncIndexingService.getIndexedDocs()
145+
asyncIndexingService.getSingleIndexedDocs()
149146
);
150147

151148
}

server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteStoreMigrationTestCase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,7 @@ public void testEndToEndRemoteMigration() throws Exception {
215215
.setTrackTotalHits(true)// extra paranoia ;)
216216
.setQuery(QueryBuilders.termQuery("auto", true))
217217
.get(),
218-
asyncIndexingService.getIndexedDocs()
218+
asyncIndexingService.getSingleIndexedDocs()
219219
);
220220
}
221221
}

0 commit comments

Comments
 (0)