Skip to content

Commit 08154aa

Browse files
committed
Integrate async deletion in the snapshot interactions
Signed-off-by: Ashish Singh <[email protected]>
1 parent 932a03e commit 08154aa

File tree

4 files changed

+61
-4
lines changed

4 files changed

+61
-4
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
1010
- MultiTermQueries in keyword fields now default to `indexed` approach and gated behind cluster setting ([#15637](https://github.com/opensearch-project/OpenSearch/pull/15637))
1111
- Making _cat/allocation API use indexLevelStats ([#15292](https://github.com/opensearch-project/OpenSearch/pull/15292))
1212
- Memory optimisations in _cluster/health API ([#15492](https://github.com/opensearch-project/OpenSearch/pull/15492))
13+
- Add support for async deletion in S3BlobContainer ([#15621](https://github.com/opensearch-project/OpenSearch/pull/15621))
1314

1415
### Dependencies
1516
- Bump `com.azure:azure-identity` from 1.13.0 to 1.13.2 ([#15578](https://github.com/opensearch-project/OpenSearch/pull/15578))

plugins/repository-s3/src/internalClusterTest/java/org/opensearch/repositories/s3/S3BlobStoreRepositoryTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,7 @@ protected Settings nodeSettings(int nodeOrdinal) {
153153
// Disable request throttling because some random values in tests might generate too many failures for the S3 client
154154
.put(S3ClientSettings.USE_THROTTLE_RETRIES_SETTING.getConcreteSettingForNamespace("test").getKey(), false)
155155
.put(S3ClientSettings.PROXY_TYPE_SETTING.getConcreteSettingForNamespace("test").getKey(), ProxySettings.ProxyType.DIRECT)
156+
.put(BlobStoreRepository.SNAPSHOT_ASYNC_DELETION_ENABLE_SETTING.getKey(), false)
156157
.put(super.nodeSettings(nodeOrdinal))
157158
.setSecureSettings(secureSettings);
158159

server/src/main/java/org/opensearch/common/settings/ClusterSettings.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -779,7 +779,10 @@ public void apply(Settings value, Settings current, Settings previous) {
779779
RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED,
780780
RemoteStoreSettings.CLUSTER_REMOTE_STORE_SEGMENTS_PATH_PREFIX,
781781
RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_PATH_PREFIX,
782+
783+
// Snapshot related Settings
782784
BlobStoreRepository.SNAPSHOT_SHARD_PATH_PREFIX_SETTING,
785+
BlobStoreRepository.SNAPSHOT_ASYNC_DELETION_ENABLE_SETTING,
783786

784787
SearchService.CLUSTER_ALLOW_DERIVED_FIELD_SETTING,
785788

server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java

Lines changed: 56 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import org.opensearch.action.ActionRunnable;
5151
import org.opensearch.action.StepListener;
5252
import org.opensearch.action.support.GroupedActionListener;
53+
import org.opensearch.action.support.PlainActionFuture;
5354
import org.opensearch.cluster.ClusterState;
5455
import org.opensearch.cluster.ClusterStateUpdateTask;
5556
import org.opensearch.cluster.RepositoryCleanupInProgress;
@@ -69,6 +70,7 @@
6970
import org.opensearch.common.Randomness;
7071
import org.opensearch.common.SetOnce;
7172
import org.opensearch.common.UUIDs;
73+
import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer;
7274
import org.opensearch.common.blobstore.BlobContainer;
7375
import org.opensearch.common.blobstore.BlobMetadata;
7476
import org.opensearch.common.blobstore.BlobPath;
@@ -180,6 +182,7 @@
180182
import java.util.Set;
181183
import java.util.concurrent.BlockingQueue;
182184
import java.util.concurrent.ConcurrentHashMap;
185+
import java.util.concurrent.ExecutionException;
183186
import java.util.concurrent.Executor;
184187
import java.util.concurrent.LinkedBlockingQueue;
185188
import java.util.concurrent.TimeUnit;
@@ -353,6 +356,16 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
353356
Setting.Property.Final
354357
);
355358

359+
/**
360+
* Controls whether snapshot blob deletions use the asynchronous deletion API when the blob container supports it. Setting key: cluster.snapshot.async-deletion.enable
361+
*/
362+
public static final Setting<Boolean> SNAPSHOT_ASYNC_DELETION_ENABLE_SETTING = Setting.boolSetting(
363+
"cluster.snapshot.async-deletion.enable",
364+
true,
365+
Setting.Property.NodeScope,
366+
Setting.Property.Dynamic
367+
);
368+
356369
protected volatile boolean supportURLRepo;
357370

358371
private volatile int maxShardBlobDeleteBatch;
@@ -446,6 +459,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
446459

447460
private final String snapshotShardPathPrefix;
448461

462+
private volatile boolean enableAsyncDeletion;
463+
449464
/**
450465
* Flag that is set to {@code true} if this instance is started with {@link #metadata} that has a higher value for
451466
* {@link RepositoryMetadata#pendingGeneration()} than for {@link RepositoryMetadata#generation()} indicating a full cluster restart
@@ -498,6 +513,8 @@ protected BlobStoreRepository(
498513
this.recoverySettings = recoverySettings;
499514
this.remoteStoreSettings = new RemoteStoreSettings(clusterService.getSettings(), clusterService.getClusterSettings());
500515
this.snapshotShardPathPrefix = SNAPSHOT_SHARD_PATH_PREFIX_SETTING.get(clusterService.getSettings());
516+
this.enableAsyncDeletion = SNAPSHOT_ASYNC_DELETION_ENABLE_SETTING.get(clusterService.getSettings());
517+
clusterService.getClusterSettings().addSettingsUpdateConsumer(SNAPSHOT_ASYNC_DELETION_ENABLE_SETTING, this::setEnableAsyncDeletion);
501518
}
502519

503520
@Override
@@ -2082,7 +2099,7 @@ private void executeOneStaleIndexDelete(
20822099
}
20832100

20842101
// Finally, we delete the [base_path]/indexId folder
2085-
deleteResult = deleteResult.add(indexEntry.getValue().delete()); // Deleting the index folder
2102+
deleteResult = deleteResult.add(deleteContainer(indexEntry.getValue())); // Deleting the index folder
20862103
logger.debug("[{}] Cleaned up stale index [{}]", metadata.name(), indexSnId);
20872104
return deleteResult;
20882105
} catch (IOException e) {
@@ -2115,6 +2132,21 @@ private void executeOneStaleIndexDelete(
21152132
}));
21162133
}
21172134

2135+
private DeleteResult deleteContainer(BlobContainer container) throws IOException {
2136+
long startTime = System.nanoTime();
2137+
DeleteResult deleteResult;
2138+
if (enableAsyncDeletion && container instanceof AsyncMultiStreamBlobContainer) {
2139+
// Use deleteAsync and wait for the result
2140+
PlainActionFuture<DeleteResult> future = new PlainActionFuture<>();
2141+
((AsyncMultiStreamBlobContainer) container).deleteAsync(future);
2142+
deleteResult = future.actionGet();
2143+
} else {
2144+
deleteResult = container.delete();
2145+
}
2146+
logger.debug(new ParameterizedMessage("[{}] Deleted {} in {}ns", metadata.name(), container.path(), startTime - System.nanoTime()));
2147+
return deleteResult;
2148+
}
2149+
21182150
/**
21192151
* Cleans up the remote store directory if needed.
21202152
* <p> This method cleans up segments in the remote store directory for deleted indices.
@@ -2318,7 +2350,7 @@ void releaseRemoteStoreLocksAndCleanup(
23182350
* @return A DeleteResult object representing the result of the deletion operation.
23192351
* @throws IOException If an I/O error occurs during the deletion process.
23202352
*/
2321-
private DeleteResult deleteShardData(ShardInfo shardInfo) throws IOException {
2353+
private DeleteResult deleteShardData(ShardInfo shardInfo) throws IOException, ExecutionException, InterruptedException {
23222354
// If the provided ShardInfo is null, return a zero DeleteResult
23232355
if (shardInfo == null) {
23242356
return DeleteResult.ZERO;
@@ -2330,7 +2362,7 @@ private DeleteResult deleteShardData(ShardInfo shardInfo) throws IOException {
23302362
// Iterate over the shards and delete each shard's data
23312363
for (int i = 0; i < shardInfo.getShardCount(); i++) {
23322364
// Call the delete method on the shardContainer and accumulate the result
2333-
deleteResult = deleteResult.add(shardContainer(shardInfo.getIndexId(), i).delete());
2365+
deleteResult = deleteResult.add(deleteContainer(shardContainer(shardInfo.getIndexId(), i)));
23342366
}
23352367

23362368
// Return the accumulated DeleteResult
@@ -2714,7 +2746,23 @@ public IndexMetadata getSnapshotIndexMetaData(RepositoryData repositoryData, Sna
27142746

27152747
private void deleteFromContainer(BlobContainer container, List<String> blobs) throws IOException {
27162748
logger.trace(() -> new ParameterizedMessage("[{}] Deleting {} from [{}]", metadata.name(), blobs, container.path()));
2717-
container.deleteBlobsIgnoringIfNotExists(blobs);
2749+
long startTime = System.nanoTime();
2750+
if (enableAsyncDeletion && container instanceof AsyncMultiStreamBlobContainer) {
2751+
PlainActionFuture<Void> future = new PlainActionFuture<>();
2752+
((AsyncMultiStreamBlobContainer) container).deleteBlobsAsyncIgnoringIfNotExists(blobs, future);
2753+
future.actionGet();
2754+
} else {
2755+
container.deleteBlobsIgnoringIfNotExists(blobs);
2756+
}
2757+
logger.debug(
2758+
() -> new ParameterizedMessage(
2759+
"[{}] Deletion {} from [{}] took {}ns",
2760+
metadata.name(),
2761+
blobs,
2762+
container.path(),
2763+
System.nanoTime() - startTime
2764+
)
2765+
);
27182766
}
27192767

27202768
private BlobPath indicesPath() {
@@ -4565,4 +4613,8 @@ public String toString() {
45654613
return name;
45664614
}
45674615
}
4616+
4617+
/**
 * Updates the flag that decides whether blob deletions go through the async container API.
 * Registered in the constructor as the update consumer for {@code SNAPSHOT_ASYNC_DELETION_ENABLE_SETTING}.
 *
 * @param enableAsyncDeletion the new value of the flag
 */
public void setEnableAsyncDeletion(boolean enableAsyncDeletion) {
4618+
this.enableAsyncDeletion = enableAsyncDeletion;
4619+
}
45684620
}

0 commit comments

Comments
 (0)