Skip to content
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.repositories.s3;

import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.Delete;
import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse;
import software.amazon.awssdk.services.s3.model.ObjectIdentifier;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.stream.Collectors;

/**
 * Helper for asynchronously deleting S3 objects in bulk batches.
 *
 * <p>Deletions are chained onto a single {@link CompletableFuture} pipeline so that
 * at most one bulk-delete request is in flight per chain at any time.
 */
public class S3AsyncDeleteHelper {
    private static final Logger logger = LogManager.getLogger(S3AsyncDeleteHelper.class);

    /**
     * Appends the deletion of {@code objectsToDelete} onto {@code currentChain}.
     *
     * @param s3AsyncClient     client used to issue the bulk-delete requests
     * @param blobStore         provides the bucket name, bulk-delete batch size and metric publisher
     * @param objectsToDelete   fully-qualified object keys to delete; may be empty
     * @param currentChain      future the new deletions are sequenced after
     * @param ignoreIfNotExists when {@code true}, per-object delete errors are logged and swallowed;
     *                          when {@code false}, they fail the returned future
     * @param afterDeleteAction optional action run after all batches complete successfully; may be null
     * @return a future completing when {@code currentChain}, all delete batches and
     *         {@code afterDeleteAction} (if any) have finished
     */
    static CompletableFuture<Void> executeDeleteChain(
        S3AsyncClient s3AsyncClient,
        S3BlobStore blobStore,
        List<String> objectsToDelete,
        CompletableFuture<Void> currentChain,
        boolean ignoreIfNotExists,
        Runnable afterDeleteAction
    ) {
        // Snapshot the batches eagerly so later mutation of objectsToDelete by the
        // caller cannot affect deletions that run after currentChain completes.
        List<List<String>> batches = createDeleteBatches(objectsToDelete, blobStore.getBulkDeletesSize());
        CompletableFuture<Void> newChain = currentChain.thenCompose(
            v -> executeDeleteBatches(s3AsyncClient, blobStore, batches, ignoreIfNotExists)
        );
        if (afterDeleteAction != null) {
            newChain = newChain.thenRun(afterDeleteAction);
        }
        return newChain;
    }

    /**
     * Splits {@code keys} into batches of at most {@code bulkDeleteSize} entries.
     * Each batch is an independent copy (not a {@link List#subList} view), so the
     * caller may freely mutate {@code keys} after this method returns.
     *
     * @throws IllegalArgumentException if {@code bulkDeleteSize < 1} (a non-positive
     *         size would otherwise loop forever)
     */
    static List<List<String>> createDeleteBatches(List<String> keys, int bulkDeleteSize) {
        if (bulkDeleteSize < 1) {
            throw new IllegalArgumentException("bulkDeleteSize must be positive, was [" + bulkDeleteSize + "]");
        }
        List<List<String>> batches = new ArrayList<>((keys.size() + bulkDeleteSize - 1) / bulkDeleteSize);
        for (int i = 0; i < keys.size(); i += bulkDeleteSize) {
            // Copy the slice: a subList view would alias the caller's list.
            batches.add(new ArrayList<>(keys.subList(i, Math.min(keys.size(), i + bulkDeleteSize))));
        }
        return batches;
    }

    /**
     * Sequences one bulk-delete request per batch; each batch starts only after the
     * previous one has completed.
     */
    private static CompletableFuture<Void> executeDeleteBatches(
        S3AsyncClient s3AsyncClient,
        S3BlobStore blobStore,
        List<List<String>> batches,
        boolean ignoreIfNotExists
    ) {
        CompletableFuture<Void> allDeletesFuture = CompletableFuture.completedFuture(null);

        for (List<String> batch : batches) {
            allDeletesFuture = allDeletesFuture.thenCompose(
                v -> executeSingleDeleteBatch(s3AsyncClient, blobStore, batch, ignoreIfNotExists)
            );
        }

        return allDeletesFuture;
    }

    /**
     * Issues a single DeleteObjects request for {@code batch}. Request-level failures
     * are swallowed (with a warning) when {@code ignoreIfNotExists} is set, otherwise
     * rethrown as a {@link CompletionException}.
     */
    private static CompletableFuture<Void> executeSingleDeleteBatch(
        S3AsyncClient s3AsyncClient,
        S3BlobStore blobStore,
        List<String> batch,
        boolean ignoreIfNotExists
    ) {
        DeleteObjectsRequest deleteRequest = bulkDelete(blobStore.bucket(), batch, blobStore);
        return s3AsyncClient.deleteObjects(deleteRequest)
            .thenApply(response -> processDeleteResponse(response, ignoreIfNotExists))
            .exceptionally(e -> {
                if (!ignoreIfNotExists) {
                    throw new CompletionException(e);
                }
                logger.warn("Error during batch deletion", e);
                return null;
            });
    }

    /**
     * Inspects per-object errors in a DeleteObjects response: logs them when
     * {@code ignoreIfNotExists} is set, otherwise fails the stage with an
     * {@link IOException} wrapped in {@link CompletionException}.
     */
    private static Void processDeleteResponse(DeleteObjectsResponse deleteObjectsResponse, boolean ignoreIfNotExists) {
        if (!deleteObjectsResponse.errors().isEmpty()) {
            if (ignoreIfNotExists) {
                logger.warn(
                    () -> new ParameterizedMessage(
                        "Failed to delete some blobs {}",
                        deleteObjectsResponse.errors()
                            .stream()
                            .map(s3Error -> "[" + s3Error.key() + "][" + s3Error.code() + "][" + s3Error.message() + "]")
                            .collect(Collectors.toList())
                    )
                );
            } else {
                throw new CompletionException(new IOException("Failed to delete some blobs: " + deleteObjectsResponse.errors()));
            }
        }
        return null;
    }

    /**
     * Builds a quiet bulk DeleteObjects request for {@code blobs}, wiring in the
     * store's delete-objects metric publisher.
     */
    private static DeleteObjectsRequest bulkDelete(String bucket, List<String> blobs, S3BlobStore blobStore) {
        return DeleteObjectsRequest.builder()
            .bucket(bucket)
            .delete(
                Delete.builder()
                    .objects(blobs.stream().map(blob -> ObjectIdentifier.builder().key(blob).build()).collect(Collectors.toList()))
                    .quiet(true) // only report failures, not every deleted key
                    .build()
            )
            .overrideConfiguration(o -> o.addMetricPublisher(blobStore.getStatsMetricPublisher().deleteObjectsMetricPublisher))
            .build();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
import software.amazon.awssdk.services.s3.model.UploadPartResponse;
import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Iterable;
import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Publisher;
import software.amazon.awssdk.utils.CollectionUtils;

import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -109,6 +110,9 @@
import java.util.function.Function;
import java.util.stream.Collectors;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import static org.opensearch.repositories.s3.S3Repository.MAX_FILE_SIZE;
import static org.opensearch.repositories.s3.S3Repository.MAX_FILE_SIZE_USING_MULTIPART;
import static org.opensearch.repositories.s3.S3Repository.MIN_PART_SIZE_USING_MULTIPART;
Expand Down Expand Up @@ -875,4 +879,131 @@ CompletableFuture<GetObjectAttributesResponse> getBlobMetadata(S3AsyncClient s3A

return SocketAccess.doPrivileged(() -> s3AsyncClient.getObjectAttributes(getObjectAttributesRequest));
}

/**
 * Asynchronously deletes every object under this container's {@code keyPath} prefix,
 * notifying {@code completionListener} with the number of blobs and bytes removed.
 *
 * Listing pages and bulk deletions are pipelined: a new listing page is requested
 * only once the deletes for the previous full batches have completed, bounding the
 * number of keys buffered in memory.
 */
@Override
public void deleteAsync(ActionListener<DeleteResult> completionListener) {
    // NOTE(review): this try-with-resources releases the client reference when the
    // method returns, while listing/deletion continue asynchronously — confirm the
    // underlying S3 client outlives this reference (e.g. is ref-counted by the store).
    try (AmazonAsyncS3Reference asyncClientReference = blobStore.asyncClientReference()) {
        S3AsyncClient s3AsyncClient = asyncClientReference.get().client();

        // Prefix scan over everything stored under this container's path.
        ListObjectsV2Request listRequest = ListObjectsV2Request.builder().bucket(blobStore.bucket()).prefix(keyPath).build();
        ListObjectsV2Publisher listPublisher = s3AsyncClient.listObjectsV2Paginator(listRequest);

        // Totals reported on success; incremented at listing time, before the
        // corresponding delete requests have actually completed.
        AtomicLong deletedBlobs = new AtomicLong();
        AtomicLong deletedBytes = new AtomicLong();

        // Completed only after listing has finished AND the deletion chain drained.
        CompletableFuture<Void> listingFuture = new CompletableFuture<>();

        listPublisher.subscribe(new Subscriber<ListObjectsV2Response>() {
            private Subscription subscription;
            // Keys listed but not yet handed off to a bulk-delete batch.
            private final List<String> objectsToDelete = new ArrayList<>();
            // Serializes bulk deletes: each new batch is chained after the previous one.
            private CompletableFuture<Void> deletionChain = CompletableFuture.completedFuture(null);

            @Override
            public void onSubscribe(Subscription s) {
                this.subscription = s;
                // Pull one listing page at a time (explicit backpressure).
                subscription.request(1);
            }

            @Override
            public void onNext(ListObjectsV2Response response) {
                // Account for and queue every key on this page.
                response.contents().forEach(s3Object -> {
                    deletedBlobs.incrementAndGet();
                    deletedBytes.addAndGet(s3Object.size());
                    objectsToDelete.add(s3Object.key());
                });

                int bulkDeleteSize = blobStore.getBulkDeletesSize();
                if (objectsToDelete.size() >= bulkDeleteSize) {
                    // Flush all complete batches; keep any remainder for the next page.
                    int fullBatchesCount = objectsToDelete.size() / bulkDeleteSize;
                    int itemsToDelete = fullBatchesCount * bulkDeleteSize;

                    List<String> batchToDelete = new ArrayList<>(objectsToDelete.subList(0, itemsToDelete));
                    objectsToDelete.subList(0, itemsToDelete).clear();

                    // Request the next listing page only after this batch has been
                    // deleted, so buffered keys stay bounded by ~2x bulkDeleteSize.
                    deletionChain = S3AsyncDeleteHelper.executeDeleteChain(
                        s3AsyncClient,
                        blobStore,
                        batchToDelete,
                        deletionChain,
                        false,
                        () -> subscription.request(1)
                    );
                } else {
                    subscription.request(1);
                }
            }

            @Override
            public void onError(Throwable t) {
                // Listing failed; surface as an IOException through listingFuture.
                // NOTE(review): any in-flight deletionChain is not awaited here.
                listingFuture.completeExceptionally(new IOException("Failed to list objects for deletion", t));
            }

            @Override
            public void onComplete() {
                // Flush the final partial batch, then complete the outer future once
                // the entire deletion chain has drained (propagating any failure).
                if (!objectsToDelete.isEmpty()) {
                    deletionChain = S3AsyncDeleteHelper.executeDeleteChain(
                        s3AsyncClient,
                        blobStore,
                        objectsToDelete,
                        deletionChain,
                        false,
                        null
                    );
                }
                deletionChain.whenComplete((v, throwable) -> {
                    if (throwable != null) {
                        listingFuture.completeExceptionally(throwable);
                    } else {
                        listingFuture.complete(null);
                    }
                });
            }
        });

        listingFuture.whenComplete((v, throwable) -> {
            if (throwable != null) {
                completionListener.onFailure(
                    throwable instanceof Exception
                        ? (Exception) throwable
                        : new IOException("Unexpected error during async deletion", throwable)
                );
            } else {
                completionListener.onResponse(new DeleteResult(deletedBlobs.get(), deletedBytes.get()));
            }
        });
    } catch (Exception e) {
        // Synchronous setup failure (e.g. acquiring the client reference).
        completionListener.onFailure(new IOException("Failed to initiate async deletion", e));
    }
}

/**
 * Asynchronously deletes the named blobs from this container, ignoring entries
 * that do not exist, and notifies {@code completionListener} when done.
 */
@Override
public void deleteBlobsAsyncIgnoringIfNotExists(List<String> blobNames, ActionListener<Void> completionListener) {
    // Nothing to delete: report success immediately without touching S3.
    if (blobNames.isEmpty()) {
        completionListener.onResponse(null);
        return;
    }

    try (AmazonAsyncS3Reference clientReference = blobStore.asyncClientReference()) {
        final S3AsyncClient client = clientReference.get().client();

        // Qualify each blob name with this container's key path.
        final List<String> fullKeys = new ArrayList<>(blobNames.size());
        for (String blobName : blobNames) {
            fullKeys.add(buildKey(blobName));
        }

        final CompletableFuture<Void> chain = S3AsyncDeleteHelper.executeDeleteChain(
            client,
            blobStore,
            fullKeys,
            CompletableFuture.completedFuture(null),
            true, // missing blobs are not an error
            null
        );
        chain.whenComplete((ignored, throwable) -> {
            if (throwable == null) {
                completionListener.onResponse(null);
            } else {
                completionListener.onFailure(new IOException("Failed to delete blobs", throwable));
            }
        });
    } catch (Exception e) {
        // Synchronous setup failure (e.g. acquiring the client reference).
        completionListener.onFailure(new IOException("Failed to initiate async blob deletion", e));
    }
}
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@

/*
* SPDX-License-Identifier: Apache-2.0
*
Expand All @@ -12,6 +13,7 @@
import org.opensearch.common.StreamContext;
import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.DeleteResult;
import org.opensearch.common.blobstore.fs.FsBlobContainer;
import org.opensearch.common.blobstore.fs.FsBlobStore;
import org.opensearch.common.blobstore.stream.read.ReadContext;
Expand Down Expand Up @@ -146,4 +148,14 @@ public boolean remoteIntegrityCheckSupported() {
private boolean isSegmentFile(String filename) {
return !filename.endsWith(".tlog") && !filename.endsWith(".ckp");
}

// Async deletion is not supported by this container implementation.
@Override
public void deleteAsync(ActionListener<DeleteResult> completionListener) {
    throw new UnsupportedOperationException("deleteAsync");
}

// Async bulk blob deletion is not supported by this container implementation.
@Override
public void deleteBlobsAsyncIgnoringIfNotExists(List<String> blobNames, ActionListener<Void> completionListener) {
    throw new UnsupportedOperationException("deleteBlobsAsyncIgnoringIfNotExists");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.opensearch.core.action.ActionListener;

import java.io.IOException;
import java.util.List;

/**
* An extension of {@link BlobContainer} that adds {@link AsyncMultiStreamBlobContainer#asyncBlobUpload} to allow
Expand Down Expand Up @@ -48,4 +49,8 @@ public interface AsyncMultiStreamBlobContainer extends BlobContainer {
* by underlying blobContainer. In this case, caller doesn't need to ensure integrity of data.
*/
boolean remoteIntegrityCheckSupported();

/**
 * Asynchronously deletes this container's contents, notifying the listener with a
 * {@link DeleteResult} summarizing what was removed, or with a failure.
 *
 * @param completionListener listener invoked on completion or failure
 */
void deleteAsync(ActionListener<DeleteResult> completionListener);

/**
 * Asynchronously deletes the given blobs from this container; blobs that do not
 * exist are skipped rather than treated as errors.
 *
 * @param blobNames          names of the blobs to delete, relative to this container
 * @param completionListener listener invoked with {@code null} on success, or the failure
 */
void deleteBlobsAsyncIgnoringIfNotExists(List<String> blobNames, ActionListener<Void> completionListener);
}
Original file line number Diff line number Diff line change
Expand Up @@ -171,4 +171,14 @@ private InputStreamContainer decryptInputStreamContainer(InputStreamContainer in
return new InputStreamContainer(decryptedStream, adjustedLength, adjustedPos);
}
}

// Pass-through: deletion needs no encryption handling, so delegate directly and
// let the listener receive the wrapped container's result unchanged.
@Override
public void deleteAsync(ActionListener<DeleteResult> completionListener) {
    blobContainer.deleteAsync(completionListener);
}

// Pass-through: blob names are forwarded as-is to the wrapped container.
@Override
public void deleteBlobsAsyncIgnoringIfNotExists(List<String> blobNames, ActionListener<Void> completionListener) {
    blobContainer.deleteBlobsAsyncIgnoringIfNotExists(blobNames, completionListener);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.BlobStore;
import org.opensearch.common.blobstore.DeleteResult;
import org.opensearch.common.blobstore.fs.FsBlobContainer;
import org.opensearch.common.blobstore.fs.FsBlobStore;
import org.opensearch.common.blobstore.stream.read.ReadContext;
Expand Down Expand Up @@ -51,6 +52,7 @@
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Base64;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -340,5 +342,15 @@ public boolean remoteIntegrityCheckSupported() {
public BlobContainer getDelegate() {
return delegate;
}

// Test double: async bulk deletion is intentionally unimplemented here.
@Override
public void deleteBlobsAsyncIgnoringIfNotExists(List<String> blobNames, ActionListener<Void> completionListener) {
    throw new RuntimeException("deleteBlobsAsyncIgnoringIfNotExists not supported");
}

// Test double: async container deletion is intentionally unimplemented here.
@Override
public void deleteAsync(ActionListener<DeleteResult> completionListener) {
    throw new RuntimeException("deleteAsync not supported");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.opensearch.common.blobstore.BlobMetadata;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.BlobStore;
import org.opensearch.common.blobstore.DeleteResult;
import org.opensearch.common.blobstore.fs.FsBlobContainer;
import org.opensearch.common.blobstore.fs.FsBlobStore;
import org.opensearch.common.blobstore.stream.read.ReadContext;
Expand All @@ -63,6 +64,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;

Expand Down Expand Up @@ -335,5 +337,15 @@ public boolean remoteIntegrityCheckSupported() {
public BlobContainer getDelegate() {
return delegate;
}

// Test double: async container deletion is intentionally unimplemented here.
@Override
public void deleteAsync(ActionListener<DeleteResult> completionListener) {
    throw new RuntimeException("deleteAsync not supported");
}

// Test double: async bulk deletion is intentionally unimplemented here.
@Override
public void deleteBlobsAsyncIgnoringIfNotExists(List<String> blobNames, ActionListener<Void> completionListener) {
    throw new RuntimeException("deleteBlobsAsyncIgnoringIfNotExists not supported");
}
}
}