5353import software .amazon .awssdk .services .s3 .model .NoSuchKeyException ;
5454import software .amazon .awssdk .services .s3 .model .ObjectAttributes ;
5555import software .amazon .awssdk .services .s3 .model .PutObjectRequest ;
56+ import software .amazon .awssdk .services .s3 .model .PutObjectResponse ;
57+ import software .amazon .awssdk .services .s3 .model .S3Exception ;
5658import software .amazon .awssdk .services .s3 .model .UploadPartRequest ;
5759import software .amazon .awssdk .services .s3 .model .UploadPartResponse ;
5860import software .amazon .awssdk .services .s3 .paginators .ListObjectsV2Iterable ;
6264import org .apache .logging .log4j .LogManager ;
6365import org .apache .logging .log4j .Logger ;
6466import org .apache .logging .log4j .message .ParameterizedMessage ;
67+ import org .opensearch .OpenSearchException ;
6568import org .opensearch .action .support .PlainActionFuture ;
6669import org .opensearch .common .Nullable ;
6770import org .opensearch .common .SetOnce ;
7275import org .opensearch .common .blobstore .BlobMetadata ;
7376import org .opensearch .common .blobstore .BlobPath ;
7477import org .opensearch .common .blobstore .BlobStoreException ;
78+ import org .opensearch .common .blobstore .ConditionalWrite .ConditionalWriteOptions ;
79+ import org .opensearch .common .blobstore .ConditionalWrite .ConditionalWriteResponse ;
7580import org .opensearch .common .blobstore .DeleteResult ;
7681import org .opensearch .common .blobstore .InputStreamWithMetadata ;
7782import org .opensearch .common .blobstore .stream .read .ReadContext ;
96101import java .io .InputStream ;
97102import java .util .ArrayList ;
98103import java .util .List ;
104+ import java .util .Locale ;
99105import java .util .Map ;
100106import java .util .concurrent .CompletableFuture ;
101107import java .util .concurrent .ExecutionException ;
@@ -117,6 +123,7 @@ class S3BlobContainer extends AbstractBlobContainer implements AsyncMultiStreamB
117123
118124 private final S3BlobStore blobStore ;
119125 private final String keyPath ;
126+ public static final int HTTP_STATUS_PRECONDITION_FAILED = 412 ;
120127
121128 S3BlobContainer (BlobPath path , S3BlobStore blobStore ) {
122129 super (path );
@@ -521,6 +528,99 @@ private String buildKey(String blobName) {
521528 return keyPath + blobName ;
522529 }
523530
531+ /**
532+ * Executes a upload to S3 using conditional write options.
533+ * The upload can proceed based on various conditional scenarios like If-Match, If-None-Match, etc.
534+ *
535+ * @param blobStore the S3 blob store
536+ * @param blobName the key (name) of the blob
537+ * @param input the input stream containing the blob data
538+ * @param blobSize the size of the blob in bytes
539+ * @param metadata optional metadata to be associated with the blob
540+ * @param options conditional write options for the upload
541+ * @param listener listener to handle the resulting response or error notifications
542+ * @throws IOException if an error occurs during upload or if validations fail
543+ */
544+ void executeSingleUploadConditionally (
545+ final S3BlobStore blobStore ,
546+ final String blobName ,
547+ final InputStream input ,
548+ final long blobSize ,
549+ final Map <String , String > metadata ,
550+ final ConditionalWriteOptions options ,
551+ final ActionListener <ConditionalWriteResponse > listener
552+ ) throws IOException {
553+ // Extra safety checks remain the same
554+ if (blobSize > MAX_FILE_SIZE .getBytes ()) {
555+ throw new IllegalArgumentException ("Upload request size [" + blobSize + "] can't be larger than " + MAX_FILE_SIZE );
556+ }
557+ if (blobSize > blobStore .bufferSizeInBytes ()) {
558+ throw new IllegalArgumentException ("Upload request size [" + blobSize + "] can't be larger than buffer size" );
559+ }
560+
561+ PutObjectRequest .Builder putObjectRequestBuilder = PutObjectRequest .builder ()
562+ .bucket (blobStore .bucket ())
563+ .key (blobName )
564+ .contentLength (blobSize )
565+ .storageClass (blobStore .getStorageClass ())
566+ .acl (blobStore .getCannedACL ())
567+ .overrideConfiguration (o -> o .addMetricPublisher (blobStore .getStatsMetricPublisher ().putObjectMetricPublisher ))
568+ .expectedBucketOwner (blobStore .expectedBucketOwner ());
569+
570+ // Apply conditional logic based on options
571+ if (options .isIfMatch ()) {
572+ putObjectRequestBuilder .ifMatch (options .getVersionIdentifier ());
573+ } else if (options .isIfNotExists ()) {
574+ putObjectRequestBuilder .ifNoneMatch ("*" );
575+ }
576+
577+ if (CollectionUtils .isNotEmpty (metadata )) {
578+ putObjectRequestBuilder = putObjectRequestBuilder .metadata (metadata );
579+ }
580+
581+ // Use extracted encryption configuration helper
582+ configureEncryptionSettings (putObjectRequestBuilder , blobStore );
583+
584+ PutObjectRequest putObjectRequest = putObjectRequestBuilder .build ();
585+
586+ try (AmazonS3Reference clientReference = blobStore .clientReference ()) {
587+ final InputStream requestInputStream = blobStore .isUploadRetryEnabled ()
588+ ? new BufferedInputStream (input , (int ) (blobSize + 1 ))
589+ : input ;
590+
591+ PutObjectResponse response = SocketAccess .doPrivileged (
592+ () -> clientReference .get ().putObject (putObjectRequest , RequestBody .fromInputStream (requestInputStream , blobSize ))
593+ );
594+
595+ if (response .eTag () != null ) {
596+ listener .onResponse (ConditionalWriteResponse .success (response .eTag ()));
597+ } else {
598+ IOException exception = new IOException (
599+ "S3 upload for [" + blobName + "] returned null ETag, violating data integrity expectations"
600+ );
601+ listener .onFailure (exception );
602+ throw exception ;
603+ }
604+
605+ } catch (S3Exception e ) {
606+ if (e .statusCode () == HTTP_STATUS_PRECONDITION_FAILED ) {
607+ listener .onFailure (new OpenSearchException ("stale_primary_shard" , e , "Precondition Failed : Etag Mismatch" , blobName ));
608+ throw new IOException ("Unable to upload object [" + blobName + "] due to ETag mismatch" , e );
609+ } else {
610+ IOException exception = new IOException (
611+ String .format (Locale .ROOT , "S3 error during upload [%s]: %s" , blobName , e .getMessage ()),
612+ e
613+ );
614+ listener .onFailure (exception );
615+ throw exception ;
616+ }
617+ } catch (SdkException e ) {
618+ IOException exception = new IOException (String .format (Locale .ROOT , "S3 upload failed for [%s]" , blobName ), e );
619+ listener .onFailure (exception );
620+ throw exception ;
621+ }
622+ }
623+
524624 /**
525625 * Uploads a blob using a single upload request
526626 */
0 commit comments