Skip to content

Commit 1fcdf52

Browse files
committed
Add UTs
Signed-off-by: Ashish Singh <[email protected]>
1 parent ffc81c0 commit 1fcdf52

File tree

4 files changed

+338
-14
lines changed

4 files changed

+338
-14
lines changed

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/StatsMetricPublisher.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,10 @@ public void publish(MetricCollection metricCollection) {
9595
public void close() {}
9696
};
9797

98+
public MetricPublisher getDeleteObjectsMetricPublisher() {
99+
return deleteObjectsMetricPublisher;
100+
}
101+
98102
public MetricPublisher getObjectMetricPublisher = new MetricPublisher() {
99103
@Override
100104
public void publish(MetricCollection metricCollection) {

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/S3AsyncDeleteHelper.java

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -50,11 +50,7 @@ static List<List<String>> createDeleteBatches(List<String> keys, int bulkDeleteS
5050
return batches;
5151
}
5252

53-
private static CompletableFuture<Void> executeDeleteBatches(
54-
S3AsyncClient s3AsyncClient,
55-
S3BlobStore blobStore,
56-
List<List<String>> batches
57-
) {
53+
static CompletableFuture<Void> executeDeleteBatches(S3AsyncClient s3AsyncClient, S3BlobStore blobStore, List<List<String>> batches) {
5854
CompletableFuture<Void> allDeletesFuture = CompletableFuture.completedFuture(null);
5955

6056
for (List<String> batch : batches) {
@@ -64,16 +60,12 @@ private static CompletableFuture<Void> executeDeleteBatches(
6460
return allDeletesFuture;
6561
}
6662

67-
private static CompletableFuture<Void> executeSingleDeleteBatch(
68-
S3AsyncClient s3AsyncClient,
69-
S3BlobStore blobStore,
70-
List<String> batch
71-
) {
63+
static CompletableFuture<Void> executeSingleDeleteBatch(S3AsyncClient s3AsyncClient, S3BlobStore blobStore, List<String> batch) {
7264
DeleteObjectsRequest deleteRequest = bulkDelete(blobStore.bucket(), batch, blobStore);
7365
return s3AsyncClient.deleteObjects(deleteRequest).thenApply(S3AsyncDeleteHelper::processDeleteResponse);
7466
}
7567

76-
private static Void processDeleteResponse(DeleteObjectsResponse deleteObjectsResponse) {
68+
static Void processDeleteResponse(DeleteObjectsResponse deleteObjectsResponse) {
7769
if (!deleteObjectsResponse.errors().isEmpty()) {
7870
logger.warn(
7971
() -> new ParameterizedMessage(
@@ -88,7 +80,7 @@ private static Void processDeleteResponse(DeleteObjectsResponse deleteObjectsRes
8880
return null;
8981
}
9082

91-
private static DeleteObjectsRequest bulkDelete(String bucket, List<String> blobs, S3BlobStore blobStore) {
83+
static DeleteObjectsRequest bulkDelete(String bucket, List<String> blobs, S3BlobStore blobStore) {
9284
return DeleteObjectsRequest.builder()
9385
.bucket(bucket)
9486
.delete(
@@ -97,7 +89,7 @@ private static DeleteObjectsRequest bulkDelete(String bucket, List<String> blobs
9789
.quiet(true)
9890
.build()
9991
)
100-
.overrideConfiguration(o -> o.addMetricPublisher(blobStore.getStatsMetricPublisher().deleteObjectsMetricPublisher))
92+
.overrideConfiguration(o -> o.addMetricPublisher(blobStore.getStatsMetricPublisher().getDeleteObjectsMetricPublisher()))
10193
.build();
10294
}
10395
}

plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobStoreContainerTests.java

Lines changed: 93 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@
6969
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
7070
import software.amazon.awssdk.services.s3.model.UploadPartResponse;
7171
import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Iterable;
72+
import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Publisher;
7273

7374
import org.opensearch.action.LatchedActionListener;
7475
import org.opensearch.common.blobstore.BlobContainer;
@@ -101,16 +102,19 @@
101102
import java.util.concurrent.CompletableFuture;
102103
import java.util.concurrent.CountDownLatch;
103104
import java.util.concurrent.atomic.AtomicInteger;
105+
import java.util.concurrent.atomic.AtomicReference;
104106
import java.util.stream.Collectors;
105107
import java.util.stream.IntStream;
106108

107109
import org.mockito.ArgumentCaptor;
108110
import org.mockito.ArgumentMatchers;
111+
import org.reactivestreams.Subscriber;
112+
import org.reactivestreams.Subscription;
109113

110114
import static org.hamcrest.Matchers.equalTo;
111115
import static org.hamcrest.Matchers.instanceOf;
116+
import static org.mockito.ArgumentMatchers.any;
112117
import static org.mockito.ArgumentMatchers.eq;
113-
import static org.mockito.Mockito.any;
114118
import static org.mockito.Mockito.doAnswer;
115119
import static org.mockito.Mockito.mock;
116120
import static org.mockito.Mockito.times;
@@ -1275,6 +1279,94 @@ public void testTransformResponseToInputStreamContainer() throws Exception {
12751279
assertEquals(inputStream.available(), inputStreamContainer.getInputStream().available());
12761280
}
12771281

1282+
public void testDeleteAsync() throws Exception {
1283+
final String bucketName = randomAlphaOfLengthBetween(1, 10);
1284+
final BlobPath blobPath = new BlobPath();
1285+
1286+
final S3BlobStore blobStore = mock(S3BlobStore.class);
1287+
when(blobStore.bucket()).thenReturn(bucketName);
1288+
when(blobStore.getStatsMetricPublisher()).thenReturn(new StatsMetricPublisher());
1289+
when(blobStore.getBulkDeletesSize()).thenReturn(1);
1290+
1291+
final S3AsyncClient s3AsyncClient = mock(S3AsyncClient.class);
1292+
final AmazonAsyncS3Reference asyncClientReference = mock(AmazonAsyncS3Reference.class);
1293+
when(blobStore.asyncClientReference()).thenReturn(asyncClientReference);
1294+
AmazonAsyncS3WithCredentials amazonAsyncS3WithCredentials = AmazonAsyncS3WithCredentials.create(
1295+
s3AsyncClient,
1296+
s3AsyncClient,
1297+
s3AsyncClient,
1298+
null
1299+
);
1300+
when(asyncClientReference.get()).thenReturn(amazonAsyncS3WithCredentials);
1301+
1302+
final List<S3Object> s3Objects = Arrays.asList(
1303+
S3Object.builder().key("key1").size(100L).build(),
1304+
S3Object.builder().key("key2").size(200L).build(),
1305+
S3Object.builder().key("key3").size(300L).build()
1306+
);
1307+
1308+
final ListObjectsV2Response response1 = ListObjectsV2Response.builder().contents(s3Objects.subList(0, 2)).build();
1309+
final ListObjectsV2Response response2 = ListObjectsV2Response.builder().contents(s3Objects.subList(2, 3)).build();
1310+
1311+
final ListObjectsV2Publisher listPublisher = mock(ListObjectsV2Publisher.class);
1312+
AtomicInteger counter = new AtomicInteger();
1313+
doAnswer(invocation -> {
1314+
Subscriber<ListObjectsV2Response> subscriber = invocation.getArgument(0);
1315+
subscriber.onSubscribe(new Subscription() {
1316+
@Override
1317+
public void request(long n) {
1318+
int currentCounter = counter.getAndIncrement();
1319+
if (currentCounter == 0) {
1320+
subscriber.onNext(response1);
1321+
}
1322+
if (currentCounter == 1) {
1323+
subscriber.onNext(response2);
1324+
}
1325+
if (currentCounter == 2) {
1326+
subscriber.onComplete();
1327+
}
1328+
}
1329+
1330+
@Override
1331+
public void cancel() {}
1332+
});
1333+
return null;
1334+
}).when(listPublisher).subscribe(any(Subscriber.class));
1335+
1336+
when(s3AsyncClient.listObjectsV2Paginator(any(ListObjectsV2Request.class))).thenReturn(listPublisher);
1337+
1338+
when(s3AsyncClient.deleteObjects(any(DeleteObjectsRequest.class))).thenReturn(
1339+
CompletableFuture.completedFuture(DeleteObjectsResponse.builder().build())
1340+
);
1341+
1342+
final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore);
1343+
1344+
CountDownLatch latch = new CountDownLatch(1);
1345+
AtomicReference<DeleteResult> deleteResultRef = new AtomicReference<>();
1346+
blobContainer.deleteAsync(new ActionListener<>() {
1347+
@Override
1348+
public void onResponse(DeleteResult deleteResult) {
1349+
deleteResultRef.set(deleteResult);
1350+
latch.countDown();
1351+
}
1352+
1353+
@Override
1354+
public void onFailure(Exception e) {
1355+
logger.error("exception during deleteAsync", e);
1356+
fail("Unexpected failure: " + e.getMessage());
1357+
}
1358+
});
1359+
1360+
latch.await();
1361+
1362+
DeleteResult deleteResult = deleteResultRef.get();
1363+
assertEquals(3, deleteResult.blobsDeleted());
1364+
assertEquals(600, deleteResult.bytesDeleted());
1365+
1366+
verify(s3AsyncClient, times(1)).listObjectsV2Paginator(any(ListObjectsV2Request.class));
1367+
verify(s3AsyncClient, times(3)).deleteObjects(any(DeleteObjectsRequest.class));
1368+
}
1369+
12781370
private void mockObjectResponse(S3AsyncClient s3AsyncClient, String bucketName, String blobName, int objectSize) {
12791371

12801372
final InputStream inputStream = new ByteArrayInputStream(randomByteArrayOfLength(objectSize));

0 commit comments

Comments
 (0)