diff --git a/server/src/main/java/org/opensearch/common/blobstore/AsyncMultiStreamBlobContainer.java b/server/src/main/java/org/opensearch/common/blobstore/AsyncMultiStreamBlobContainer.java index b769cdc2fe7ab..08bdd90f617af 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/AsyncMultiStreamBlobContainer.java +++ b/server/src/main/java/org/opensearch/common/blobstore/AsyncMultiStreamBlobContainer.java @@ -9,6 +9,8 @@ package org.opensearch.common.blobstore; import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.common.blobstore.ConditionalWrite.ConditionalWriteOptions; +import org.opensearch.common.blobstore.ConditionalWrite.ConditionalWriteResponse; import org.opensearch.common.blobstore.stream.read.ReadContext; import org.opensearch.common.blobstore.stream.write.WriteContext; import org.opensearch.core.action.ActionListener; @@ -35,6 +37,24 @@ public interface AsyncMultiStreamBlobContainer extends BlobContainer { */ void asyncBlobUpload(WriteContext writeContext, ActionListener completionListener) throws IOException; + /** + * Reads blob content basis a preconditional requirement, from multiple streams each from a specific part of the file, which is provided by the + * StreamContextSupplier in the WriteContext passed to this method. An {@link IOException} is thrown if reading + * any of the input streams fails, or writing to the target blob fails + * + * @param writeContext A WriteContext object encapsulating all information needed to perform the upload + * @param options The {@link ConditionalWriteOptions} specifying the preconditions that must be met for the upload to proceed. + * @param completionListener The {@link ActionListener} to which upload events and the result will be published. + * @throws IOException if any of the input streams could not be read, or the target blob could not be written to + */ + default void asyncBlobUploadConditionally( + WriteContext writeContext, + ConditionalWriteOptions options, + ActionListener completionListener + ) throws IOException { + throw new UnsupportedOperationException("asyncBlobUploadConditionally is not implemented yet"); + }; + /** * Creates an async callback of a {@link ReadContext} containing the multipart streams for a specified blob within the container. * @param blobName The name of the blob for which the {@link ReadContext} needs to be fetched. diff --git a/server/src/main/java/org/opensearch/common/blobstore/AsyncMultiStreamEncryptedBlobContainer.java b/server/src/main/java/org/opensearch/common/blobstore/AsyncMultiStreamEncryptedBlobContainer.java index 286c01f9dca44..2c47452f99a83 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/AsyncMultiStreamEncryptedBlobContainer.java +++ b/server/src/main/java/org/opensearch/common/blobstore/AsyncMultiStreamEncryptedBlobContainer.java @@ -9,6 +9,8 @@ package org.opensearch.common.blobstore; import org.opensearch.common.StreamContext; +import org.opensearch.common.blobstore.ConditionalWrite.ConditionalWriteOptions; +import org.opensearch.common.blobstore.ConditionalWrite.ConditionalWriteResponse; import org.opensearch.common.blobstore.stream.read.ReadContext; import org.opensearch.common.blobstore.stream.write.WriteContext; import org.opensearch.common.crypto.CryptoHandler; @@ -44,6 +46,16 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener comp blobContainer.asyncBlobUpload(encryptedWriteContext, completionListener); } + @Override + public void asyncBlobUploadConditionally( + WriteContext writeContext, + ConditionalWriteOptions options, + ActionListener completionListener + ) throws IOException { + EncryptedWriteContext encryptedWriteContext = new EncryptedWriteContext<>(writeContext, cryptoHandler); + blobContainer.asyncBlobUploadConditionally(encryptedWriteContext, options, completionListener); + } + @Override public void readBlobAsync(String blobName, ActionListener listener) { try { diff --git a/server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java b/server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java index a2e4199029ef4..796fd475ac6fc 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java +++ b/server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java @@ -34,6 +34,8 @@ import org.opensearch.common.Nullable; import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.common.blobstore.ConditionalWrite.ConditionalWriteOptions; +import org.opensearch.common.blobstore.ConditionalWrite.ConditionalWriteResponse; import org.opensearch.core.action.ActionListener; import java.io.IOException; @@ -125,6 +127,40 @@ default long readBlobPreferredLength() { throw new UnsupportedOperationException(); // NORELEASE } + /** + * Reads blob content from the input stream and writes it to the container in a new blob with the given name, + applying any conditional checks specified in {@code options}. This operation is atomic: if any precondition fails or the write + * encounters an error, no data will be persisted and the failure will be signaled via {@link ActionListener} + * The {@code ActionListener} is invoked with the new identifier if the operation succeeds. + * + * @param blobName + * The name of the blob to write the contents of the input stream to. + * @param inputStream + * The input stream from which to retrieve the bytes to write to the blob. + * @param blobSize + * The size of the blob to be written, in bytes. It is implementation dependent whether + * this value is used in writing the blob to the repository. + * @param failIfAlreadyExists + * whether to throw a FileAlreadyExistsException if the given blob already exists + * @param options + * The {@link ConditionalWriteOptions} specifying the preconditions that must be met for the upload to proceed. + * @param listener + * The {@link ActionListener} to which upload events and the result will be published. + * @throws FileAlreadyExistsException if failIfAlreadyExists is true and a blob by the same name already exists + * @throws IOException if the input stream could not be read, the upload fails (including identifier mismatches), or the target blob + * could not be written to + */ + default void writeBlobConditionally( + String blobName, + InputStream inputStream, + long blobSize, + boolean failIfAlreadyExists, + ConditionalWriteOptions options, + ActionListener listener + ) throws IOException { + throw new UnsupportedOperationException("writeBlobConditionally is not implemented yet"); + } + /** * Reads blob content from the input stream and writes it to the container in a new blob with the given name. * This method assumes the container does not already contain a blob of the same blobName. If a blob by the @@ -144,6 +180,45 @@ default long readBlobPreferredLength() { */ void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException; + /** + * Reads blob content from the input stream and writes it to the container in a new blob with the given name, + * attaching the provided metadata and applying any conditional checks specified in {@code options}. this operation is atomic: + * if any precondition fails or the write encounters an error, no data will be persisted and the failure will be signaled via {@link ActionListener} + * The {@code ActionListener} is invoked with the new identifier if the operation succeeds. + * + * @param blobName + * The name of the blob to write the contents of the input stream to. + * @param inputStream + * The input stream from which to retrieve the bytes to write to the blob. + * @param metadata + * The metadata to be associate with the blob upload. + * @param blobSize + * The size of the blob to be written, in bytes. It is implementation dependent whether + * this value is used in writing the blob to the repository. + * @param failIfAlreadyExists + * whether to throw a FileAlreadyExistsException if the given blob already exists + * @param options + * The {@link ConditionalWriteOptions} specifying the preconditions that must be met for the upload to proceed. + * @param listener + * The {@link ActionListener} to which upload events and the result will be published. + * @throws FileAlreadyExistsException + * if failIfAlreadyExists is true and a blob by the same name already exists + * @throws IOException if the input stream could not be read, the upload fails (including identifier mismatches), or the target blob + * could not be written to + */ + @ExperimentalApi + default void writeBlobWithMetadataConditionally( + String blobName, + InputStream inputStream, + long blobSize, + boolean failIfAlreadyExists, + @Nullable Map metadata, + ConditionalWriteOptions options, + ActionListener listener + ) throws IOException { + throw new UnsupportedOperationException("writeBlobWithMetadataConditionally is not implemented yet"); + } + /** * Reads blob content from the input stream and writes it to the container in a new blob with the given name, and metadata. * This method assumes the container does not already contain a blob of the same blobName. If a blob by the diff --git a/server/src/main/java/org/opensearch/common/blobstore/ConditionalWrite.java b/server/src/main/java/org/opensearch/common/blobstore/ConditionalWrite.java new file mode 100644 index 0000000000000..6d0f491be5592 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/blobstore/ConditionalWrite.java @@ -0,0 +1,172 @@ + +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.blobstore; + +import java.time.Instant; + +/** + * Utility classes supporting conditional write operations on a {@link BlobContainer}. + * The main entry points are {@link ConditionalWriteOptions} for specifying conditions, and + * {@link ConditionalWriteResponse} for receiving the result of a conditional write. + */ +public final class ConditionalWrite { + private ConditionalWrite() {} + + /** + * Encapsulates options controlling preconditions to be deployed when a blob is to be written to the remote store. + * Immutable and thread-safe. Use the provided static factory methods or the {@link Builder} + * to construct instances with the desired conditions. These options can be supplied to + * blob store write operations to enforce preconditions + * + */ + public static final class ConditionalWriteOptions { + + private final boolean ifNotExists; + private final boolean ifMatch; + private final boolean ifUnmodifiedSince; + private final String versionIdentifier; + private final Instant unmodifiedSince; + + private ConditionalWriteOptions(Builder builder) { + this.ifNotExists = builder.ifNotExists; + this.ifMatch = builder.ifMatch; + this.ifUnmodifiedSince = builder.ifUnmodifiedSince; + this.versionIdentifier = builder.versionIdentifier; + this.unmodifiedSince = builder.unmodifiedSince; + } + + public static ConditionalWriteOptions none() { + return new Builder().build(); + } + + public static ConditionalWriteOptions ifNotExists() { + return new Builder().setIfNotExists(true).build(); + } + + public static ConditionalWriteOptions ifMatch(String versionIdentifier) { + return new Builder().setIfMatch(true).setVersionIdentifier(versionIdentifier).build(); + } + + public static ConditionalWriteOptions ifUnmodifiedSince(Instant ts) { + return new Builder().setIfUnmodifiedSince(true).setUnmodifiedSince(ts).build(); + } + + /** + * Returns a new {@link Builder} for constructing custom conditional write options. + */ + public static Builder builder() { + return new Builder(); + } + + public boolean isIfNotExists() { + return ifNotExists; + } + + public boolean isIfMatch() { + return ifMatch; + } + + public boolean isIfUnmodifiedSince() { + return ifUnmodifiedSince; + } + + public String getVersionIdentifier() { + return versionIdentifier; + } + + public Instant getUnmodifiedSince() { + return unmodifiedSince; + } + + /** + * Builder for {@link ConditionalWriteOptions}. + * Allows fine-grained construction of conditional write criteria. + */ + public static final class Builder { + private boolean ifNotExists = false; + private boolean ifMatch = false; + private boolean ifUnmodifiedSince = false; + private String versionIdentifier = null; + private Instant unmodifiedSince = null; + + private Builder() {} + + /** + * Sets the write to succeed only if the blob does not exist. + * @param flag true to enable this condition + * @return this builder + */ + public Builder setIfNotExists(boolean flag) { + this.ifNotExists = flag; + return this; + } + + /** + * Sets the write to succeed only if the blob matches the expected version. + * @param flag true to enable this condition + * @return this builder + */ + public Builder setIfMatch(boolean flag) { + this.ifMatch = flag; + return this; + } + + /** + * Sets the write to succeed only if the blob was not modified since a given instant. + * @param flag true to enable this condition + * @return this builder + */ + public Builder setIfUnmodifiedSince(boolean flag) { + this.ifUnmodifiedSince = flag; + return this; + } + + /** + * Sets the timestamp before which the blob must remain unmodified. + * @param ts the instant to check + * @return this builder + */ + public Builder setUnmodifiedSince(Instant ts) { + this.unmodifiedSince = ts; + return this; + } + + public Builder setVersionIdentifier(String versionIdentifier) { + this.versionIdentifier = versionIdentifier; + return this; + } + + public ConditionalWriteOptions build() { + return new ConditionalWriteOptions(this); + } + } + } + + /** + * encapsulates the result of a conditional write operation. + * Contains the new version identifier (such as an ETag or version string) retrieved from the remote store + * after a successful write. + */ + public static final class ConditionalWriteResponse { + private final String newVersionIdentifier; + + private ConditionalWriteResponse(String versionIdentifier) { + this.newVersionIdentifier = versionIdentifier; + } + + public static ConditionalWriteResponse success(String versionIdentifier) { + return new ConditionalWriteResponse(versionIdentifier); + } + + public String getVersionIdentifier() { + return newVersionIdentifier; + } + } +}