Changes from 3 of 21 commits

Commits
70ee923
git commit -m "UTs for : executeMultipartUploadIfEtagMatches"
tqranjan Apr 27, 2025
2c31c83
git commit -m "Implementation for : executeMultipartUploadIfEtagMatches"
tqranjan Apr 27, 2025
eb3778c
Adds a constant HTTP_STATUS_PRECONDITION_FAILED with value 412
tqranjan Apr 28, 2025
44d45e8
UploadRequest : Add ConditionalWriteOptions
tqranjan May 27, 2025
a31d15e
Add ConditionalWrite utility for conditional blob store writes
tqranjan May 27, 2025
a1df80a
Adds logic to Upload an object to S3 conditionally using the async cl…
tqranjan May 27, 2025
2c2c72a
UTs for async Conditional upload logic
tqranjan May 27, 2025
f0857dc
Test Suite for ASync Conditional upload process
tqranjan May 28, 2025
062a76d
S3 Blobcontainer Implementation for Async Conditional upload
tqranjan May 28, 2025
2f8f4de
Javadoc for builder
tqranjan May 28, 2025
9fdefaa
asyncBlobUploadConditionally : Interfaces for AsyncMultiStreamBlobCon…
tqranjan May 28, 2025
d8d8f9a
Log fix
tqranjan May 28, 2025
7a7e240
Merge branch 'main' into pr-branch-conditionalMultipartUpload
x-INFiN1TY-x May 29, 2025
5cfeb6e
SpotlessApply post Conflict resolution
tqranjan May 29, 2025
0a90680
Conflict resolution in executeMultipartUploadConditionally
tqranjan May 29, 2025
cbc0188
Conflict resolution in executeMultipartUploadConditionally's UTs
tqranjan May 29, 2025
0eedd2c
Conflict resolution in S3BlobContainerMockClientTests & AsyncTransfer…
tqranjan May 29, 2025
0e31c62
S3BlobContainer Refactor
tqranjan Jun 11, 2025
50498ac
AsyncTransferManager Refactor
tqranjan Jun 11, 2025
656a3a1
removed stale_primary_shard : generic rewording
tqranjan Jun 11, 2025
11dc264
AsyncTransferManagerTests.java refactor
tqranjan Jun 12, 2025
@@ -40,6 +40,7 @@
import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CommonPrefix;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse;
import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
import software.amazon.awssdk.services.s3.model.CompletedPart;
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
@@ -53,6 +54,7 @@
import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
import software.amazon.awssdk.services.s3.model.ObjectAttributes;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.S3Exception;
import software.amazon.awssdk.services.s3.model.ServerSideEncryption;
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
import software.amazon.awssdk.services.s3.model.UploadPartResponse;
@@ -63,6 +65,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.OpenSearchException;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.common.Nullable;
import org.opensearch.common.SetOnce;
@@ -97,6 +100,7 @@
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@@ -117,6 +121,7 @@ class S3BlobContainer extends AbstractBlobContainer implements AsyncMultiStreamB

private final S3BlobStore blobStore;
private final String keyPath;
public static final int HTTP_STATUS_PRECONDITION_FAILED = 412;

S3BlobContainer(BlobPath path, S3BlobStore blobStore) {
super(path);
@@ -509,6 +514,169 @@ private String buildKey(String blobName) {
return keyPath + blobName;
}

public void executeMultipartUploadIfEtagMatches(
Contributor
@Bukhtawar Bukhtawar May 12, 2025

Where is this code getting consumed? I don't see a corresponding change in the generic blob store interface.

Contributor Author

Sorry, missed this. The executeMultipartUploadIfEtagMatches method (now renamed to executeMultipartUploadConditionally) is meant to be consumed via the writeBlobWithMetadataConditionally and writeBlobConditionally methods in the generic blob store interface. Those interfaces are introduced in this PR, currently under review. (A rough sketch of what they might look like appears after the diff below.)

final S3BlobStore blobStore,
final String blobName,
final InputStream input,
final long blobSize,
final Map<String, String> metadata,
final String eTag,
final ActionListener<String> etagListener
) throws IOException {

ensureMultiPartUploadSize(blobSize);

final long partSize = blobStore.bufferSizeInBytes();
final Tuple<Long, Long> multiparts = numberOfMultiparts(blobSize, partSize);
if (multiparts.v1() > Integer.MAX_VALUE) {
throw new IllegalArgumentException("Too many multipart upload parts; consider a larger buffer size.");
}
final int nbParts = multiparts.v1().intValue();
final long lastPartSize = multiparts.v2();
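// Example: blobSize = 1,050 MiB with partSize = 100 MiB gives nbParts = 11,
// i.e. ten full 100 MiB parts plus a final 50 MiB part (lastPartSize).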
assert blobSize == (((nbParts - 1) * partSize) + lastPartSize) : "blobSize does not match multipart sizes";
CreateMultipartUploadRequest.Builder createRequestBuilder = CreateMultipartUploadRequest.builder()
.bucket(blobStore.bucket())
.key(blobName)
.storageClass(blobStore.getStorageClass())
.acl(blobStore.getCannedACL())
.overrideConfiguration(o -> o.addMetricPublisher(blobStore.getStatsMetricPublisher().multipartUploadMetricCollector));

if (metadata != null && !metadata.isEmpty()) {
createRequestBuilder.metadata(metadata);
}
if (blobStore.serverSideEncryption()) {
createRequestBuilder.serverSideEncryption(ServerSideEncryption.AES256);
}

final CreateMultipartUploadRequest createMultipartUploadRequest = createRequestBuilder.build();
final SetOnce<String> uploadId = new SetOnce<>();
final String bucketName = blobStore.bucket();
boolean success = false;

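// When upload retries are enabled, wrap the stream so the SDK can mark/reset and
// replay a part after a transient failure; the buffer is one byte larger than a part.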
final InputStream requestInputStream = blobStore.isUploadRetryEnabled()
? new BufferedInputStream(input, (int) (partSize + 1))
: input;

try (AmazonS3Reference clientReference = blobStore.clientReference()) {
uploadId.set(
SocketAccess.doPrivileged(() -> clientReference.get().createMultipartUpload(createMultipartUploadRequest).uploadId())
);
if (Strings.isEmpty(uploadId.get())) {
IOException exception = new IOException("Failed to initialize multipart upload for " + blobName);
etagListener.onFailure(exception);
throw exception;
}

final List<CompletedPart> parts = new ArrayList<>(nbParts);
long bytesCount = 0;

for (int i = 1; i <= nbParts; i++) {
long currentPartSize = (i < nbParts) ? partSize : lastPartSize;
final UploadPartRequest uploadPartRequest = UploadPartRequest.builder()
.bucket(bucketName)
.key(blobName)
.uploadId(uploadId.get())
.partNumber(i)
.contentLength(currentPartSize)
.overrideConfiguration(o -> o.addMetricPublisher(blobStore.getStatsMetricPublisher().multipartUploadMetricCollector))
.build();

bytesCount += currentPartSize;

final UploadPartResponse uploadResponse = SocketAccess.doPrivileged(
() -> clientReference.get()
.uploadPart(uploadPartRequest, RequestBody.fromInputStream(requestInputStream, currentPartSize))
);

String partETag = uploadResponse.eTag();
if (partETag == null) {
IOException exception = new IOException(
String.format(Locale.ROOT, "S3 part upload for [%s] part [%d] returned null ETag", blobName, i)
);
etagListener.onFailure(exception);
throw exception;
}

parts.add(CompletedPart.builder().partNumber(i).eTag(partETag).build());
}

if (bytesCount != blobSize) {
IOException exception = new IOException(
String.format(Locale.ROOT, "Multipart upload for [%s] sent %d bytes; expected %d bytes", blobName, bytesCount, blobSize)
);
etagListener.onFailure(exception);
throw exception;
}

CompleteMultipartUploadRequest completeRequest = CompleteMultipartUploadRequest.builder()
.bucket(bucketName)
.key(blobName)
.uploadId(uploadId.get())
.multipartUpload(CompletedMultipartUpload.builder().parts(parts).build())
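// If-Match makes completion conditional: S3 fails the request with HTTP 412
// (precondition failed) when the object's current ETag differs from the expected one.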
.ifMatch(eTag)
.overrideConfiguration(o -> o.addMetricPublisher(blobStore.getStatsMetricPublisher().multipartUploadMetricCollector))
.build();

CompleteMultipartUploadResponse completeResponse = SocketAccess.doPrivileged(
() -> clientReference.get().completeMultipartUpload(completeRequest)
);

if (completeResponse.eTag() != null) {
success = true;
etagListener.onResponse(completeResponse.eTag());
} else {
IOException exception = new IOException(
"S3 multipart upload for [" + blobName + "] returned null ETag, violating data integrity expectations"
);
etagListener.onFailure(exception);
throw exception;
}

} catch (S3Exception e) {
if (e.statusCode() == HTTP_STATUS_PRECONDITION_FAILED) {
etagListener.onFailure(new OpenSearchException("stale_primary_shard", e, "Precondition Failed: ETag mismatch", blobName));
throw new IOException("Unable to upload object [" + blobName + "] due to ETag mismatch", e);
} else {
IOException exception = new IOException(
String.format(Locale.ROOT, "S3 error during multipart upload [%s]: %s", blobName, e.getMessage()),
e
);
etagListener.onFailure(exception);
throw exception;
}
} catch (SdkException e) {
IOException exception = new IOException(String.format(Locale.ROOT, "S3 multipart upload failed for [%s]", blobName), e);
etagListener.onFailure(exception);
throw exception;
} catch (Exception e) {
IOException exception = new IOException(
String.format(Locale.ROOT, "Unexpected error during multipart upload [%s]: %s", blobName, e.getMessage()),
e
);
etagListener.onFailure(exception);
throw exception;
} finally {
if (!success && Strings.hasLength(uploadId.get())) {
AbortMultipartUploadRequest abortRequest = AbortMultipartUploadRequest.builder()
.bucket(bucketName)
.key(blobName)
.uploadId(uploadId.get())
.build();
try (AmazonS3Reference abortClient = blobStore.clientReference()) {
SocketAccess.doPrivilegedVoid(() -> abortClient.get().abortMultipartUpload(abortRequest));
} catch (Exception abortException) {
logger.warn(
() -> new ParameterizedMessage(
"Failed to abort incomplete multipart upload [{}] with ID [{}]. "
+ "This may result in orphaned S3 data and charges.",
blobName,
uploadId.get()
),
abortException
);
}
}
}
}

/**
* Uploads a blob using a single upload request
*/
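For readers following the review thread above: the generic blob-store entry points the author mentions (writeBlobConditionally, writeBlobWithMetadataConditionally) are not part of this diff. The following is only a rough sketch of what they might look like, inferred from the discussion and the commit messages; the interface name, parameter order, and the ActionListener import path are assumptions, not the merged API.

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;

import org.opensearch.core.action.ActionListener; // assumed import path

/**
 * Hypothetical sketch of the conditional-write additions to the generic blob
 * store interface discussed in the review thread; names and shapes are guesses.
 */
public interface ConditionalWriteBlobContainer {

    /**
     * Writes a blob only if the stored object's current ETag equals {@code expectedETag}.
     * On success the listener receives the new ETag; on an ETag mismatch it receives a
     * failure wrapping the HTTP 412 (precondition failed) response.
     */
    void writeBlobConditionally(String blobName, InputStream inputStream, long blobSize,
        String expectedETag, ActionListener<String> etagListener) throws IOException;

    /**
     * Same as {@link #writeBlobConditionally}, additionally attaching user metadata.
     */
    void writeBlobWithMetadataConditionally(String blobName, InputStream inputStream, long blobSize,
        Map<String, String> metadata, String expectedETag, ActionListener<String> etagListener) throws IOException;
}
```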
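And a minimal caller-side sketch of the method added in this diff, showing how the 412 path surfaces through the listener. Only executeMultipartUploadIfEtagMatches, S3BlobContainer, S3BlobStore, and ActionListener come from the code above; the blob name, class, and wiring here are illustrative, and the sketch assumes it lives in the same package as S3BlobContainer.

```java
package org.opensearch.repositories.s3; // assumed: same package as S3BlobContainer

import java.io.IOException;
import java.io.InputStream;

import org.opensearch.core.action.ActionListener;

final class ConditionalUploadExample {

    // Container/store wiring is omitted; this only illustrates the listener contract.
    static void upload(S3BlobContainer container, S3BlobStore blobStore,
                       InputStream input, long blobSize, String expectedETag) throws IOException {
        container.executeMultipartUploadIfEtagMatches(
            blobStore,
            "indices/0/segments_1",   // hypothetical blob name
            input,
            blobSize,
            null,                     // no user metadata
            expectedETag,
            ActionListener.wrap(
                newETag -> System.out.println("conditional upload succeeded, new ETag " + newETag),
                e -> {
                    // An ETag mismatch surfaces here as the OpenSearchException built from the
                    // HTTP 412 S3Exception in the catch block of executeMultipartUploadIfEtagMatches.
                    System.err.println("conditional upload rejected or failed: " + e);
                }
            )
        );
    }
}
```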