Skip to content

Commit 6b14155

Browse files
Bugfix in RemoteFsTimestampAwareTranslog.trimUnreferencedReaders (#16078)
Signed-off-by: Sachin Kale <[email protected]>
(cherry picked from commit 1bddf2f)
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
1 parent b41be86 commit 6b14155

File tree

4 files changed: 186 additions and 1 deletion.

server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStorePinnedTimestampsGarbageCollectionIT.java

Lines changed: 75 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -8,6 +8,7 @@
88

99
package org.opensearch.remotestore;
1010

11+
import org.opensearch.action.support.IndicesOptions;
1112
import org.opensearch.common.blobstore.BlobPath;
1213
import org.opensearch.common.collect.Tuple;
1314
import org.opensearch.common.settings.Settings;
@@ -32,6 +33,7 @@
3233
import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.TRANSLOG;
3334
import static org.opensearch.index.remote.RemoteStoreEnums.DataType.DATA;
3435
import static org.opensearch.index.remote.RemoteStoreEnums.DataType.METADATA;
36+
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertNoFailures;
3537

3638
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
3739
public class RemoteStorePinnedTimestampsGarbageCollectionIT extends RemoteStoreBaseIntegTestCase {
@@ -288,6 +290,79 @@ public void testLiveIndexWithPinnedTimestamps() throws Exception {
288290
});
289291
}
290292

293+
public void testLiveIndexWithPinnedTimestampsMultiplePrimaryTerms() throws Exception {
294+
prepareCluster(1, 2, Settings.EMPTY);
295+
Settings indexSettings = Settings.builder()
296+
.put(remoteStoreIndexSettings(1, 1))
297+
.put(INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING.getKey(), 3)
298+
.build();
299+
createIndex(INDEX_NAME, indexSettings);
300+
ensureYellowAndNoInitializingShards(INDEX_NAME);
301+
ensureGreen(INDEX_NAME);
302+
303+
RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO);
304+
305+
RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance(
306+
RemoteStorePinnedTimestampService.class,
307+
primaryNodeName(INDEX_NAME)
308+
);
309+
310+
remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1));
311+
312+
int numDocs = randomIntBetween(5, 10);
313+
for (int i = 0; i < numDocs; i++) {
314+
keepPinnedTimestampSchedulerUpdated();
315+
indexSingleDoc(INDEX_NAME, true);
316+
if (i == 2) {
317+
RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.timeValueMinutes(1));
318+
remoteStorePinnedTimestampService.pinTimestamp(System.currentTimeMillis(), "xyz", noOpActionListener);
319+
RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO);
320+
}
321+
}
322+
323+
ingestDocs();
324+
325+
internalCluster().restartNode(primaryNodeName(INDEX_NAME));
326+
ensureGreen(INDEX_NAME);
327+
328+
ingestDocs();
329+
330+
String translogPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_PATH_PREFIX.get(getNodeSettings());
331+
String shardDataPath = getShardLevelBlobPath(
332+
client(),
333+
INDEX_NAME,
334+
BlobPath.cleanPath(),
335+
"0",
336+
TRANSLOG,
337+
DATA,
338+
translogPathFixedPrefix
339+
).buildAsString();
340+
Path translogDataPath = Path.of(translogRepoPath + "/" + shardDataPath + "/1");
341+
342+
assertBusy(() -> {
343+
List<Path> dataFiles = Files.list(translogDataPath).collect(Collectors.toList());
344+
assertFalse(dataFiles.isEmpty());
345+
});
346+
}
347+
348+
private void ingestDocs() {
349+
int numDocs = randomIntBetween(15, 20);
350+
for (int i = 0; i < numDocs; i++) {
351+
indexSingleDoc(INDEX_NAME, false);
352+
}
353+
354+
assertNoFailures(client().admin().indices().prepareRefresh(INDEX_NAME).setIndicesOptions(IndicesOptions.lenientExpandOpen()).get());
355+
flushAndRefresh(INDEX_NAME);
356+
357+
int numDocsPostFailover = randomIntBetween(15, 20);
358+
for (int i = 0; i < numDocsPostFailover; i++) {
359+
indexSingleDoc(INDEX_NAME, false);
360+
}
361+
362+
flushAndRefresh(INDEX_NAME);
363+
assertNoFailures(client().admin().indices().prepareRefresh(INDEX_NAME).setIndicesOptions(IndicesOptions.lenientExpandOpen()).get());
364+
}
365+
291366
public void testIndexDeletionNoPinnedTimestamps() throws Exception {
292367
prepareCluster(1, 1, Settings.EMPTY);
293368
Settings indexSettings = Settings.builder()

server/src/internalClusterTest/java/org/opensearch/snapshots/DeleteSnapshotV2IT.java

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import java.util.concurrent.TimeUnit;
3131
import java.util.stream.Collectors;
3232

33+
import static org.opensearch.index.IndexSettings.INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING;
3334
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
3435
import static org.hamcrest.Matchers.equalTo;
3536
import static org.hamcrest.Matchers.greaterThan;
@@ -312,6 +313,107 @@ public void testRemoteStoreCleanupForDeletedIndexForSnapshotV2MultipleSnapshots(
312313
// translogPostDeletionOfSnapshot1.size()), 60, TimeUnit.SECONDS);
313314
}
314315

316+
public void testRemoteStoreCleanupMultiplePrimaryOnSnapshotDeletion() throws Exception {
317+
disableRepoConsistencyCheck("Remote store repository is being used in the test");
318+
final Path remoteStoreRepoPath = randomRepoPath();
319+
Settings settings = remoteStoreClusterSettings(REMOTE_REPO_NAME, remoteStoreRepoPath);
320+
settings = Settings.builder()
321+
.put(settings)
322+
.put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED.getKey(), true)
323+
.put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING.getKey(), RemoteStoreEnums.PathType.FIXED.toString())
324+
.build();
325+
String clusterManagerName = internalCluster().startClusterManagerOnlyNode(settings);
326+
internalCluster().startDataOnlyNodes(3, settings);
327+
final Client clusterManagerClient = internalCluster().clusterManagerClient();
328+
ensureStableCluster(4);
329+
330+
RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance(
331+
RemoteStorePinnedTimestampService.class,
332+
clusterManagerName
333+
);
334+
remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1));
335+
RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO);
336+
337+
final String snapshotRepoName = "snapshot-repo-name";
338+
final Path snapshotRepoPath = randomRepoPath();
339+
createRepository(snapshotRepoName, "mock", snapshotRepoSettingsForShallowV2(snapshotRepoPath));
340+
341+
final String remoteStoreEnabledIndexName = "remote-index-1";
342+
final Settings remoteStoreEnabledIndexSettings = Settings.builder()
343+
.put(getRemoteStoreBackedIndexSettings())
344+
.put(INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING.getKey(), 2)
345+
.build();
346+
createIndex(remoteStoreEnabledIndexName, remoteStoreEnabledIndexSettings);
347+
ensureGreen(remoteStoreEnabledIndexName);
348+
349+
// Create 2 snapshots for primary term 1
350+
keepPinnedTimestampSchedulerUpdated();
351+
indexRandomDocs(remoteStoreEnabledIndexName, 5);
352+
createSnapshot(snapshotRepoName, "snap1");
353+
keepPinnedTimestampSchedulerUpdated();
354+
indexRandomDocs(remoteStoreEnabledIndexName, 5);
355+
createSnapshot(snapshotRepoName, "snap2");
356+
357+
// Restart current primary to change the primary term
358+
internalCluster().restartNode(primaryNodeName(remoteStoreEnabledIndexName));
359+
ensureGreen(remoteStoreEnabledIndexName);
360+
361+
// Create 2 snapshots for primary term 2
362+
keepPinnedTimestampSchedulerUpdated();
363+
indexRandomDocs(remoteStoreEnabledIndexName, 5);
364+
createSnapshot(snapshotRepoName, "snap3");
365+
keepPinnedTimestampSchedulerUpdated();
366+
indexRandomDocs(remoteStoreEnabledIndexName, 5);
367+
createSnapshot(snapshotRepoName, "snap4");
368+
369+
String indexUUID = client().admin()
370+
.indices()
371+
.prepareGetSettings(remoteStoreEnabledIndexName)
372+
.get()
373+
.getSetting(remoteStoreEnabledIndexName, IndexMetadata.SETTING_INDEX_UUID);
374+
375+
Path indexPath = Path.of(String.valueOf(remoteStoreRepoPath), indexUUID);
376+
Path shardPath = Path.of(String.valueOf(indexPath), "0");
377+
Path translogPath = Path.of(String.valueOf(shardPath), "translog", "data", "1");
378+
379+
// Deleting snap1 will still keep files in primary term 1 due to snap2
380+
deleteSnapshot(clusterManagerClient, snapshotRepoName, "snap1");
381+
assertTrue(RemoteStoreBaseIntegTestCase.getFileCount(translogPath) > 0);
382+
383+
// Deleting snap2 will not remove primary term 1 as we need to trigger trimUnreferencedReaders once
384+
deleteSnapshot(clusterManagerClient, snapshotRepoName, "snap2");
385+
assertTrue(RemoteStoreBaseIntegTestCase.getFileCount(translogPath) > 0);
386+
387+
// Index a doc to trigger trimUnreferencedReaders
388+
RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO);
389+
keepPinnedTimestampSchedulerUpdated();
390+
indexRandomDocs(remoteStoreEnabledIndexName, 5);
391+
392+
assertBusy(() -> assertFalse(Files.exists(translogPath)), 30, TimeUnit.SECONDS);
393+
}
394+
395+
private void createSnapshot(String repoName, String snapshotName) {
396+
CreateSnapshotResponse createSnapshotResponse = client().admin()
397+
.cluster()
398+
.prepareCreateSnapshot(repoName, snapshotName)
399+
.setWaitForCompletion(true)
400+
.get();
401+
SnapshotInfo snapshotInfo = createSnapshotResponse.getSnapshotInfo();
402+
403+
assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS));
404+
assertThat(snapshotInfo.successfulShards(), greaterThan(0));
405+
assertThat(snapshotInfo.successfulShards(), equalTo(snapshotInfo.totalShards()));
406+
assertThat(snapshotInfo.snapshotId().getName(), equalTo(snapshotName));
407+
}
408+
409+
private void deleteSnapshot(Client clusterManagerClient, String repoName, String snapshotName) {
410+
AcknowledgedResponse deleteSnapshotResponse = clusterManagerClient.admin()
411+
.cluster()
412+
.prepareDeleteSnapshot(repoName, snapshotName)
413+
.get();
414+
assertAcked(deleteSnapshotResponse);
415+
}
416+
315417
private Settings snapshotV2Settings(Path remoteStoreRepoPath) {
316418
Settings settings = Settings.builder()
317419
.put(remoteStoreClusterSettings(REMOTE_REPO_NAME, remoteStoreRepoPath))

server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ public void trimUnreferencedReaders() throws IOException {
121121
protected void trimUnreferencedReaders(boolean indexDeleted, boolean trimLocal) throws IOException {
122122
if (trimLocal) {
123123
// clean up local translog files and updates readers
124-
super.trimUnreferencedReaders();
124+
super.trimUnreferencedReaders(true);
125125
}
126126

127127
// Update file tracker to reflect local translog state

server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -549,9 +549,17 @@ protected Releasable drainSync() {
549549

550550
@Override
551551
public void trimUnreferencedReaders() throws IOException {
552+
trimUnreferencedReaders(false);
553+
}
554+
555+
protected void trimUnreferencedReaders(boolean onlyTrimLocal) throws IOException {
552556
// clean up local translog files and updates readers
553557
super.trimUnreferencedReaders();
554558

559+
if (onlyTrimLocal) {
560+
return;
561+
}
562+
555563
// This is to ensure that after the permits are acquired during primary relocation, there are no further modification on remote
556564
// store.
557565
if (startedPrimarySupplier.getAsBoolean() == false || pauseSync.get()) {

0 commit comments

Comments
 (0)