Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
package org.opensearch.common.blobstore;

import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.common.blobstore.ConditionalWrite.ConditionalWriteOptions;
import org.opensearch.common.blobstore.ConditionalWrite.ConditionalWriteResponse;
import org.opensearch.common.blobstore.stream.read.ReadContext;
import org.opensearch.common.blobstore.stream.write.WriteContext;
import org.opensearch.core.action.ActionListener;
Expand All @@ -35,6 +37,24 @@ public interface AsyncMultiStreamBlobContainer extends BlobContainer {
*/
void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> completionListener) throws IOException;

/**
* Reads blob content basis a preconditional requirement, from multiple streams each from a specific part of the file, which is provided by the
* StreamContextSupplier in the WriteContext passed to this method. An {@link IOException} is thrown if reading
* any of the input streams fails, or writing to the target blob fails
*
* @param writeContext A WriteContext object encapsulating all information needed to perform the upload
* @param options The {@link ConditionalWriteOptions} specifying the preconditions that must be met for the upload to proceed.
* @param completionListener The {@link ActionListener} to which upload events and the result will be published.
* @throws IOException if any of the input streams could not be read, or the target blob could not be written to
*/
default void asyncBlobUploadConditionally(
WriteContext writeContext,
ConditionalWriteOptions options,
ActionListener<ConditionalWriteResponse> completionListener
) throws IOException {
throw new UnsupportedOperationException("asyncBlobUploadConditionally is not implemented yet");
};

/**
* Creates an async callback of a {@link ReadContext} containing the multipart streams for a specified blob within the container.
* @param blobName The name of the blob for which the {@link ReadContext} needs to be fetched.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
package org.opensearch.common.blobstore;

import org.opensearch.common.StreamContext;
import org.opensearch.common.blobstore.ConditionalWrite.ConditionalWriteOptions;
import org.opensearch.common.blobstore.ConditionalWrite.ConditionalWriteResponse;
import org.opensearch.common.blobstore.stream.read.ReadContext;
import org.opensearch.common.blobstore.stream.write.WriteContext;
import org.opensearch.common.crypto.CryptoHandler;
Expand Down Expand Up @@ -44,6 +46,16 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> comp
blobContainer.asyncBlobUpload(encryptedWriteContext, completionListener);
}

@Override
public void asyncBlobUploadConditionally(
WriteContext writeContext,
ConditionalWriteOptions options,
ActionListener<ConditionalWriteResponse> completionListener
) throws IOException {
EncryptedWriteContext<T, U> encryptedWriteContext = new EncryptedWriteContext<>(writeContext, cryptoHandler);
blobContainer.asyncBlobUploadConditionally(encryptedWriteContext, options, completionListener);
}

@Override
public void readBlobAsync(String blobName, ActionListener<ReadContext> listener) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@

import org.opensearch.common.Nullable;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.common.blobstore.ConditionalWrite.ConditionalWriteOptions;
import org.opensearch.common.blobstore.ConditionalWrite.ConditionalWriteResponse;
import org.opensearch.core.action.ActionListener;

import java.io.IOException;
Expand Down Expand Up @@ -125,6 +127,40 @@ default long readBlobPreferredLength() {
throw new UnsupportedOperationException(); // NORELEASE
}

/**
* Reads blob content from the input stream and writes it to the container in a new blob with the given name,
applying any conditional checks specified in {@code options}. This operation is atomic: if any precondition fails or the write
* encounters an error, no data will be persisted and the failure will be signaled via {@link ActionListener}
* The {@code ActionListener<ConditionalWriteResponse>} is invoked with the new identifier if the operation succeeds.
*
* @param blobName
* The name of the blob to write the contents of the input stream to.
* @param inputStream
* The input stream from which to retrieve the bytes to write to the blob.
* @param blobSize
* The size of the blob to be written, in bytes. It is implementation dependent whether
* this value is used in writing the blob to the repository.
* @param failIfAlreadyExists
* whether to throw a FileAlreadyExistsException if the given blob already exists
* @param options
* The {@link ConditionalWriteOptions} specifying the preconditions that must be met for the upload to proceed.
* @param listener
* The {@link ActionListener} to which upload events and the result will be published.
* @throws FileAlreadyExistsException if failIfAlreadyExists is true and a blob by the same name already exists
* @throws IOException if the input stream could not be read, the upload fails (including identifier mismatches), or the target blob
* could not be written to
*/
default void writeBlobConditionally(
String blobName,
InputStream inputStream,
long blobSize,
boolean failIfAlreadyExists,
ConditionalWriteOptions options,
ActionListener<ConditionalWriteResponse> listener
) throws IOException {
throw new UnsupportedOperationException("writeBlobConditionally is not implemented yet");
}

/**
* Reads blob content from the input stream and writes it to the container in a new blob with the given name.
* This method assumes the container does not already contain a blob of the same blobName. If a blob by the
Expand All @@ -144,6 +180,45 @@ default long readBlobPreferredLength() {
*/
void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException;

/**
* Reads blob content from the input stream and writes it to the container in a new blob with the given name,
* attaching the provided metadata and applying any conditional checks specified in {@code options}. this operation is atomic:
* if any precondition fails or the write encounters an error, no data will be persisted and the failure will be signaled via {@link ActionListener}
* The {@code ActionListener<ConditionalWriteResponse>} is invoked with the new identifier if the operation succeeds.
*
* @param blobName
* The name of the blob to write the contents of the input stream to.
* @param inputStream
* The input stream from which to retrieve the bytes to write to the blob.
* @param metadata
* The metadata to be associate with the blob upload.
* @param blobSize
* The size of the blob to be written, in bytes. It is implementation dependent whether
* this value is used in writing the blob to the repository.
* @param failIfAlreadyExists
* whether to throw a FileAlreadyExistsException if the given blob already exists
* @param options
* The {@link ConditionalWriteOptions} specifying the preconditions that must be met for the upload to proceed.
* @param listener
* The {@link ActionListener} to which upload events and the result will be published.
* @throws FileAlreadyExistsException
* if failIfAlreadyExists is true and a blob by the same name already exists
* @throws IOException if the input stream could not be read, the upload fails (including identifier mismatches), or the target blob
* could not be written to
*/
@ExperimentalApi
default void writeBlobWithMetadataConditionally(
String blobName,
InputStream inputStream,
long blobSize,
boolean failIfAlreadyExists,
@Nullable Map<String, String> metadata,
ConditionalWriteOptions options,
ActionListener<ConditionalWriteResponse> listener
) throws IOException {
throw new UnsupportedOperationException("writeBlobWithMetadataConditionally is not implemented yet");
}

/**
* Reads blob content from the input stream and writes it to the container in a new blob with the given name, and metadata.
* This method assumes the container does not already contain a blob of the same blobName. If a blob by the
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@

/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.blobstore;

import java.time.Instant;

/**
* Utility classes supporting conditional write operations on a {@link BlobContainer}.
* The main entry points are {@link ConditionalWriteOptions} for specifying conditions, and
* {@link ConditionalWriteResponse} for receiving the result of a conditional write.
*/
public final class ConditionalWrite {
private ConditionalWrite() {}

/**
* Encapsulates options controlling preconditions to be deployed when a blob is to be written to the remote store.
* Immutable and thread-safe. Use the provided static factory methods or the {@link Builder}
* to construct instances with the desired conditions. These options can be supplied to
* blob store write operations to enforce preconditions
*
*/
public static final class ConditionalWriteOptions {

private final boolean ifNotExists;
private final boolean ifMatch;
private final boolean ifUnmodifiedSince;
private final String versionIdentifier;
private final Instant unmodifiedSince;

private ConditionalWriteOptions(Builder builder) {
this.ifNotExists = builder.ifNotExists;
this.ifMatch = builder.ifMatch;
this.ifUnmodifiedSince = builder.ifUnmodifiedSince;
this.versionIdentifier = builder.versionIdentifier;
this.unmodifiedSince = builder.unmodifiedSince;
}
Comment on lines +37 to +43
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can there be case where the conditionalWriteOptions has multiple conditions and are contradicting? If yes do we need to have validation for same?

Copy link
Contributor Author

@x-INFiN1TY-x x-INFiN1TY-x Jun 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


public static ConditionalWriteOptions none() {
return new Builder().build();
}

public static ConditionalWriteOptions ifNotExists() {
return new Builder().setIfNotExists(true).build();
}

public static ConditionalWriteOptions ifMatch(String versionIdentifier) {
return new Builder().setIfMatch(true).setVersionIdentifier(versionIdentifier).build();
}

public static ConditionalWriteOptions ifUnmodifiedSince(Instant ts) {
return new Builder().setIfUnmodifiedSince(true).setUnmodifiedSince(ts).build();
}

/**
* Returns a new {@link Builder} for constructing custom conditional write options.
*/
public static Builder builder() {
return new Builder();
}

public boolean isIfNotExists() {
return ifNotExists;
}

public boolean isIfMatch() {
return ifMatch;
}

public boolean isIfUnmodifiedSince() {
return ifUnmodifiedSince;
}

public String getVersionIdentifier() {
return versionIdentifier;
}

public Instant getUnmodifiedSince() {
return unmodifiedSince;
}

/**
* Builder for {@link ConditionalWriteOptions}.
* Allows fine-grained construction of conditional write criteria.
*/
public static final class Builder {
private boolean ifNotExists = false;
private boolean ifMatch = false;
private boolean ifUnmodifiedSince = false;
private String versionIdentifier = null;
private Instant unmodifiedSince = null;

private Builder() {}

/**
* Sets the write to succeed only if the blob does not exist.
* @param flag true to enable this condition
* @return this builder
*/
public Builder setIfNotExists(boolean flag) {
this.ifNotExists = flag;
return this;
}

/**
* Sets the write to succeed only if the blob matches the expected version.
* @param flag true to enable this condition
* @return this builder
*/
public Builder setIfMatch(boolean flag) {
this.ifMatch = flag;
return this;
}

/**
* Sets the write to succeed only if the blob was not modified since a given instant.
* @param flag true to enable this condition
* @return this builder
*/
public Builder setIfUnmodifiedSince(boolean flag) {
this.ifUnmodifiedSince = flag;
return this;
}

/**
* Sets the timestamp before which the blob must remain unmodified.
* @param ts the instant to check
* @return this builder
*/
public Builder setUnmodifiedSince(Instant ts) {
this.unmodifiedSince = ts;
return this;
}

public Builder setVersionIdentifier(String versionIdentifier) {
this.versionIdentifier = versionIdentifier;
return this;
}

public ConditionalWriteOptions build() {
return new ConditionalWriteOptions(this);
}
}
}

/**
* encapsulates the result of a conditional write operation.
* Contains the new version identifier (such as an ETag or version string) retrieved from the remote store
* after a successful write.
*/
public static final class ConditionalWriteResponse {
private final String newVersionIdentifier;

private ConditionalWriteResponse(String versionIdentifier) {
this.newVersionIdentifier = versionIdentifier;
}

public static ConditionalWriteResponse success(String versionIdentifier) {
return new ConditionalWriteResponse(versionIdentifier);
}

public String getVersionIdentifier() {
return newVersionIdentifier;
}
}
}
Loading