Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,18 @@
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import software.amazon.awssdk.annotations.SdkPublicApi;
import software.amazon.awssdk.core.internal.async.ByteArrayAsyncRequestBody;
import software.amazon.awssdk.core.internal.async.FileAsyncRequestBody;
import software.amazon.awssdk.core.internal.async.InputStreamWithExecutorAsyncRequestBody;
import software.amazon.awssdk.core.internal.async.SplittingPublisher;
import software.amazon.awssdk.core.internal.util.Mimetype;
import software.amazon.awssdk.utils.BinaryUtils;
import software.amazon.awssdk.utils.Validate;

/**
* Interface to allow non-blocking streaming of request content. This follows the reactive streams pattern where
Expand Down Expand Up @@ -246,4 +249,42 @@ static BlockingOutputStreamAsyncRequestBody forBlockingOutputStream(Long content
static AsyncRequestBody empty() {
    // Delegates to fromBytes with a zero-length array, i.e. a body that has no bytes to publish.
    byte[] noContent = new byte[0];
    return fromBytes(noContent);
}


/**
 * Splits this {@link AsyncRequestBody} into a publisher of smaller {@link AsyncRequestBody}s, each of which publishes a
 * specific portion of the original data, based on the configured {@code chunkSizeInBytes}.
 *
 * <p>
 * If the content length of this {@link AsyncRequestBody} is present, each divided {@link AsyncRequestBody} is delivered
 * to the subscriber right after it's initialized.
 *
 * <p>
 * If the content length is not present, each divided {@link AsyncRequestBody} is delivered after the entire content for
 * that chunk has been buffered. In this case, the configured {@code maxMemoryUsageInBytes} must be larger than or equal
 * to {@code chunkSizeInBytes}.
 *
 * @param chunkSizeInBytes the size for each divided chunk; must be positive. The last chunk may be smaller than the
 * configured size.
 * @param maxMemoryUsageInBytes the max memory the SDK will use to buffer the content; must be positive
 * @return a {@link SplitAsyncRequestBodyResponse} holding the publisher of the divided bodies and the future that is
 * handed to that publisher as its result future
 */
default SplitAsyncRequestBodyResponse split(long chunkSizeInBytes, long maxMemoryUsageInBytes) {
    Validate.isPositive(chunkSizeInBytes, "chunkSizeInBytes");
    Validate.isPositive(maxMemoryUsageInBytes, "maxMemoryUsageInBytes");

    // TODO: API Surface Area review: should we make this behavior configurable?
    // Without a known content length a whole chunk is buffered before it is emitted, so one chunk has to fit into
    // the memory budget. The check is skipped entirely when the content length is known.
    boolean contentLengthKnown = this.contentLength().isPresent();
    Validate.isTrue(contentLengthKnown || maxMemoryUsageInBytes >= chunkSizeInBytes,
                    "maxMemoryUsageInBytes must be larger than or equal to " +
                    "chunkSizeInBytes if the content length is unknown");

    // The same future instance is given to the publisher and returned to the caller.
    CompletableFuture<Void> resultFuture = new CompletableFuture<>();
    return SplitAsyncRequestBodyResponse.create(
        SplittingPublisher.builder()
                          .asyncRequestBody(this)
                          .chunkSizeInBytes(chunkSizeInBytes)
                          .maxMemoryUsageInBytes(maxMemoryUsageInBytes)
                          .resultFuture(resultFuture)
                          .build(),
        resultFuture);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.core.async;


import java.util.concurrent.CompletableFuture;
import software.amazon.awssdk.annotations.SdkPublicApi;
import software.amazon.awssdk.utils.Validate;

/**
 * Holds the result of {@link AsyncRequestBody#split(long, long)}: the publisher of the divided
 * {@link AsyncRequestBody}s together with the {@link CompletableFuture} associated with the split.
 */
@SdkPublicApi
public final class SplitAsyncRequestBodyResponse {
    private final SdkPublisher<AsyncRequestBody> asyncRequestBody;
    private final CompletableFuture<Void> future;

    private SplitAsyncRequestBodyResponse(SdkPublisher<AsyncRequestBody> asyncRequestBody, CompletableFuture<Void> future) {
        // Both members are mandatory; equals() and hashCode() below rely on them being non-null.
        this.asyncRequestBody = Validate.paramNotNull(asyncRequestBody, "asyncRequestBody");
        this.future = Validate.paramNotNull(future, "future");
    }

    /**
     * Creates a {@link SplitAsyncRequestBodyResponse}.
     *
     * @param asyncRequestBody the publisher of the divided {@link AsyncRequestBody}s; must not be null
     * @param future the future associated with the split; must not be null
     * @return a new instance wrapping the two arguments
     */
    public static SplitAsyncRequestBodyResponse create(SdkPublisher<AsyncRequestBody> asyncRequestBody,
                                                       CompletableFuture<Void> future) {
        return new SplitAsyncRequestBodyResponse(asyncRequestBody, future);
    }

    /**
     * Returns the converted {@link SdkPublisher} of {@link AsyncRequestBody}s. Each {@link AsyncRequestBody} publishes a
     * specific portion of the original data.
     */
    public SdkPublisher<AsyncRequestBody> asyncRequestBodyPublisher() {
        return asyncRequestBody;
    }

    /**
     * Returns the {@link CompletableFuture} that will be notified when all data has been consumed or if an error occurs.
     */
    public CompletableFuture<Void> future() {
        return future;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }

        SplitAsyncRequestBodyResponse other = (SplitAsyncRequestBodyResponse) o;
        // Publisher is compared first, then the future; both must match.
        return asyncRequestBody.equals(other.asyncRequestBody) && future.equals(other.future);
    }

    @Override
    public int hashCode() {
        return 31 * asyncRequestBody.hashCode() + future.hashCode();
    }
}

Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,11 @@
import software.amazon.awssdk.utils.async.SimplePublisher;

/**
* Splits an {@link SdkPublisher} to multiple smaller {@link AsyncRequestBody}s, each of which publishes a specific portion of the
* original data.
* Splits an {@link AsyncRequestBody} to multiple smaller {@link AsyncRequestBody}s, each of which publishes a specific portion of
* the original data.
*
* <p>If content length is known, each {@link AsyncRequestBody} is sent to the subscriber right after it's initialized.
* Otherwise, it is sent after the entire content for that chunk is buffered. This is required to get content length.
*
* // TODO: create a default method in AsyncRequestBody for this
*/
@SdkInternalApi
public class SplittingPublisher implements SdkPublisher<AsyncRequestBody> {
Expand All @@ -51,9 +49,9 @@ public class SplittingPublisher implements SdkPublisher<AsyncRequestBody> {

private SplittingPublisher(Builder builder) {
this.upstreamPublisher = Validate.paramNotNull(builder.asyncRequestBody, "asyncRequestBody");
this.chunkSizeInBytes = Validate.paramNotNull(builder.chunkSizeInBytes, "chunkSizeInBytes");
this.chunkSizeInBytes = Validate.isPositive(builder.chunkSizeInBytes, "chunkSizeInBytes");
this.splittingSubscriber = new SplittingSubscriber(upstreamPublisher.contentLength().orElse(null));
this.maxMemoryUsageInBytes = builder.maxMemoryUsageInBytes == null ? Long.MAX_VALUE : builder.maxMemoryUsageInBytes;
this.maxMemoryUsageInBytes = Validate.isPositive(builder.maxMemoryUsageInBytes, "maxMemoryUsageInBytes");
this.future = builder.future;

// We need to cancel upstream subscription if the future gets cancelled.
Expand Down Expand Up @@ -304,13 +302,13 @@ public Builder asyncRequestBody(AsyncRequestBody asyncRequestBody) {
* @param chunkSizeInBytes The new chunkSizeInBytes value.
* @return This object for method chaining.
*/
public Builder chunkSizeInBytes(Long chunkSizeInBytes) {
public Builder chunkSizeInBytes(long chunkSizeInBytes) {
// Stored without validation here; the SplittingPublisher constructor checks it with Validate.isPositive.
this.chunkSizeInBytes = chunkSizeInBytes;
return this;
}

/**
* Sets the maximum memory usage in bytes. By default, it uses unlimited memory.
* Sets the maximum memory usage in bytes.
*
* @param maxMemoryUsageInBytes The new maxMemoryUsageInBytes value.
* @return This object for method chaining.
Expand All @@ -319,7 +317,7 @@ public Builder chunkSizeInBytes(Long chunkSizeInBytes) {
// on a new byte buffer. But we don't know for sure what the size of a buffer we request will be (we do use the size
// for the last byte buffer as a hint), so I don't think we can have a truly accurate max. Maybe we call it minimum
// buffer size instead?
public Builder maxMemoryUsageInBytes(Long maxMemoryUsageInBytes) {
public Builder maxMemoryUsageInBytes(long maxMemoryUsageInBytes) {
// Stored without validation here; the SplittingPublisher constructor checks it with Validate.isPositive.
this.maxMemoryUsageInBytes = maxMemoryUsageInBytes;
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,39 +15,30 @@

package software.amazon.awssdk.core.async;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import com.google.common.jimfs.Configuration;
import com.google.common.jimfs.Jimfs;
import io.reactivex.Flowable;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileSystem;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Instant;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;
import org.assertj.core.util.Lists;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import software.amazon.awssdk.core.internal.util.Mimetype;
import software.amazon.awssdk.http.async.SimpleSubscriber;
import software.amazon.awssdk.utils.BinaryUtils;
import software.amazon.awssdk.utils.StringInputStream;

@RunWith(Parameterized.class)
public class AsyncRequestBodyTest {
Expand Down Expand Up @@ -177,4 +168,25 @@ public void fromBytes_byteArrayNotNull_createsCopy() {
ByteBuffer publishedBb = Flowable.fromPublisher(body).toList().blockingGet().get(0);
assertThat(BinaryUtils.copyAllBytesFrom(publishedBb)).isEqualTo(original);
}

@Test
public void split_nonPositiveInput_shouldThrowException() {
    String expectedMessage = "must be positive";
    AsyncRequestBody requestBody = AsyncRequestBody.fromString("test");

    // Zero and negative chunk sizes are rejected.
    assertThatThrownBy(() -> requestBody.split(0, 4)).hasMessageContaining(expectedMessage);
    assertThatThrownBy(() -> requestBody.split(-1, 4)).hasMessageContaining(expectedMessage);

    // Zero and negative memory limits are rejected.
    assertThatThrownBy(() -> requestBody.split(5, 0)).hasMessageContaining(expectedMessage);
    assertThatThrownBy(() -> requestBody.split(5, -1)).hasMessageContaining(expectedMessage);
}

@Test
public void split_contentUnknownMaxMemorySmallerThanChunkSize_shouldThrowException() {
    Publisher<ByteBuffer> silentPublisher = new Publisher<ByteBuffer>() {
        @Override
        public void subscribe(Subscriber<? super ByteBuffer> subscriber) {
            // Intentionally never signals; split() is expected to fail before anything subscribes.
        }
    };
    // NOTE(review): presumably a body created from a raw publisher reports no content length, which is the
    // precondition this test relies on - confirm against AsyncRequestBody.fromPublisher.
    AsyncRequestBody unknownLengthBody = AsyncRequestBody.fromPublisher(silentPublisher);

    // Chunk size (10) exceeds the memory limit (4).
    assertThatThrownBy(() -> unknownLengthBody.split(10, 4))
        .hasMessageContaining("must be larger than or equal");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.core.async;

import nl.jqno.equalsverifier.EqualsVerifier;
import org.junit.jupiter.api.Test;

/**
* Verifies the {@code equals}/{@code hashCode} contract of {@link SplitAsyncRequestBodyResponse}.
*/
public class SplitAsyncRequestBodyResponseTest {

@Test
void equalsHashcode() {
// Both fields are declared non-null to EqualsVerifier: the class's constructor rejects null for each of them,
// and its equals/hashCode dereference them without null checks.
EqualsVerifier.forClass(SplitAsyncRequestBodyResponse.class)
.withNonnullFields("asyncRequestBody", "future")
.verify();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import java.util.function.Function;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.internal.async.SplittingPublisher;
import software.amazon.awssdk.core.async.SplitAsyncRequestBodyResponse;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.CompletedPart;
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
Expand Down Expand Up @@ -169,26 +169,26 @@ private CompletableFuture<Void> sendUploadPartRequests(MpuRequestContext mpuRequ
CompletableFuture<PutObjectResponse> returnFuture,
Collection<CompletableFuture<CompletedPart>> futures) {

CompletableFuture<Void> splittingPublisherFuture = new CompletableFuture<>();


AsyncRequestBody asyncRequestBody = mpuRequestContext.request.right();
SplittingPublisher splittingPublisher = SplittingPublisher.builder()
.asyncRequestBody(asyncRequestBody)
.chunkSizeInBytes(mpuRequestContext.partSize)
.maxMemoryUsageInBytes(maxMemoryUsageInBytes)
.resultFuture(splittingPublisherFuture)
.build();

splittingPublisher.map(new BodyToRequestConverter(mpuRequestContext.request.left(), mpuRequestContext.uploadId))
.subscribe(pair -> sendIndividualUploadPartRequest(mpuRequestContext.uploadId,
completedParts,
futures,
pair,
splittingPublisherFuture))
.exceptionally(throwable -> {
returnFuture.completeExceptionally(throwable);
return null;
});

SplitAsyncRequestBodyResponse result = asyncRequestBody.split(mpuRequestContext.partSize, maxMemoryUsageInBytes);

CompletableFuture<Void> splittingPublisherFuture = result.future();

result.asyncRequestBodyPublisher()
.map(new BodyToRequestConverter(mpuRequestContext.request.left(),
mpuRequestContext.uploadId))
.subscribe(pair -> sendIndividualUploadPartRequest(mpuRequestContext.uploadId,
completedParts,
futures,
pair,
splittingPublisherFuture))
.exceptionally(throwable -> {
returnFuture.completeExceptionally(throwable);
return null;
});
return splittingPublisherFuture;
}

Expand Down