Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,9 @@ public static CodeBlock create(OperationModel operationModel, IntermediateModel
return CodeBlock.of("");
}

// TODO : remove once request compression for streaming operations is supported
if (operationModel.isStreaming()) {
throw new IllegalStateException("Request compression for streaming operations is not yet supported in the AWS SDK "
+ "for Java.");
}

// TODO : remove once S3 checksum interceptors are moved to occur after CompressRequestStage
// TODO : remove once:
// 1) S3 checksum interceptors are moved to occur after CompressRequestStage
// 2) Transfer-Encoding:chunked is supported in S3
if (model.getMetadata().getServiceName().equals("S3")) {
throw new IllegalStateException("Request compression for S3 is not yet supported in the AWS SDK for Java.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
@SdkInternalApi
public final class AwsSignedChunkedEncodingInputStream extends AwsChunkedEncodingInputStream {

private static final String CRLF = "\r\n";
private static final String CHUNK_SIGNATURE_HEADER = ";chunk-signature=";
private static final String CHECKSUM_SIGNATURE_HEADER = "x-amz-trailer-signature:";
private String previousChunkSignature;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.core.internal.async;

import static software.amazon.awssdk.core.HttpChecksumConstant.DEFAULT_ASYNC_CHUNK_SIZE;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.utils.builder.SdkBuilder;

/**
* Class that will buffer incoming BufferBytes with unknown total length to chunks of bufferSize
*/
@SdkInternalApi
public final class ChunkBufferWithUnknownLength {
private ByteBuffer currentBuffer;

private ChunkBufferWithUnknownLength(Integer bufferSize) {
int chunkSize = bufferSize != null ? bufferSize : DEFAULT_ASYNC_CHUNK_SIZE;
this.currentBuffer = ByteBuffer.allocate(chunkSize);
}

public static Builder builder() {
return new DefaultBuilder();
}

public synchronized Iterable<ByteBuffer> bufferAndCreateChunks(ByteBuffer buffer) {
List<ByteBuffer> bufferedList = new ArrayList<>();
while (buffer.hasRemaining()) {
int bytesToCopy = Math.min(buffer.remaining(), currentBuffer.remaining());
byte[] bytes = new byte[bytesToCopy];
buffer.get(bytes);
currentBuffer.put(bytes);

if (!currentBuffer.hasRemaining() || !buffer.hasRemaining()) {
currentBuffer.flip();
ByteBuffer bufferToSend = ByteBuffer.allocate(currentBuffer.limit());
bufferToSend.put(currentBuffer);
bufferToSend.flip();
bufferedList.add(bufferToSend);
currentBuffer.clear();
}
}
return bufferedList;
}

public interface Builder extends SdkBuilder<Builder, ChunkBufferWithUnknownLength> {
Builder bufferSize(int bufferSize);
}

private static final class DefaultBuilder implements Builder {
private Integer bufferSize;

@Override
public ChunkBufferWithUnknownLength build() {
return new ChunkBufferWithUnknownLength(bufferSize);
}

@Override
public Builder bufferSize(int bufferSize) {
this.bufferSize = bufferSize;
return this;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.core.internal.async;

import java.nio.ByteBuffer;
import java.util.Optional;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.internal.compression.Compressor;
import software.amazon.awssdk.utils.Validate;
import software.amazon.awssdk.utils.builder.SdkBuilder;

/**
* Wrapper class to wrap an AsyncRequestBody.
* This will chunk and compress the payload with the provided {@link Compressor}.
*/
@SdkInternalApi
public class CompressionAsyncRequestBody implements AsyncRequestBody {

private static final int COMPRESSION_CHUNK_SIZE = 128 * 1024;
private final AsyncRequestBody wrapped;
private final Compressor compressor;

private CompressionAsyncRequestBody(DefaultBuilder builder) {
Validate.notNull(builder.asyncRequestBody, "wrapped AsyncRequestBody cannot be null");
Validate.notNull(builder.compressor, "compressor cannot be null");
this.wrapped = builder.asyncRequestBody;
this.compressor = builder.compressor;
}

/**
* @return Builder instance to construct a {@link CompressionAsyncRequestBody}.
*/
public static Builder builder() {
return new DefaultBuilder();
}

public interface Builder extends SdkBuilder<CompressionAsyncRequestBody.Builder, CompressionAsyncRequestBody> {

/**
* Sets the AsyncRequestBody that will be wrapped.
* @param asyncRequestBody
* @return This builder for method chaining.
*/
Builder asyncRequestBody(AsyncRequestBody asyncRequestBody);

/**
* Sets the compressor to compress the request.
* @param compressor
* @return This builder for method chaining.
*/
Builder compressor(Compressor compressor);
}

private static final class DefaultBuilder implements Builder {

private AsyncRequestBody asyncRequestBody;
private Compressor compressor;

@Override
public CompressionAsyncRequestBody build() {
return new CompressionAsyncRequestBody(this);
}

@Override
public Builder asyncRequestBody(AsyncRequestBody asyncRequestBody) {
this.asyncRequestBody = asyncRequestBody;
return this;
}

@Override
public Builder compressor(Compressor compressor) {
this.compressor = compressor;
return this;
}
}

@Override
public Optional<Long> contentLength() {
return Optional.empty();
}

@Override
public String contentType() {
return wrapped.contentType();
}

@Override
public void subscribe(Subscriber<? super ByteBuffer> s) {
Validate.notNull(s, "Subscription MUST NOT be null.");

ChunkBufferWithUnknownLength chunkBuffer = ChunkBufferWithUnknownLength.builder()
.bufferSize(COMPRESSION_CHUNK_SIZE)
.build();

wrapped.flatMapIterable(chunkBuffer::bufferAndCreateChunks)
.subscribe(new CompressionSubscriber(s, compressor));
}

private static final class CompressionSubscriber implements Subscriber<ByteBuffer> {

private final Subscriber<? super ByteBuffer> wrapped;
private final Compressor compressor;

CompressionSubscriber(Subscriber<? super ByteBuffer> wrapped, Compressor compressor) {
this.wrapped = wrapped;
this.compressor = compressor;
}

@Override
public void onSubscribe(Subscription subscription) {
wrapped.onSubscribe(subscription);
}

@Override
public void onNext(ByteBuffer byteBuffer) {
ByteBuffer compressedBuffer = compressor.compress(byteBuffer);
wrapped.onNext(compressedBuffer);
}

@Override
public void onError(Throwable t) {
wrapped.onError(t);
}

@Override
public void onComplete() {
wrapped.onComplete();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,13 @@
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
import software.amazon.awssdk.core.interceptor.SdkInternalExecutionAttribute;
import software.amazon.awssdk.core.internal.async.CompressionAsyncRequestBody;
import software.amazon.awssdk.core.internal.compression.Compressor;
import software.amazon.awssdk.core.internal.compression.CompressorType;
import software.amazon.awssdk.core.internal.http.HttpClientDependencies;
import software.amazon.awssdk.core.internal.http.RequestExecutionContext;
import software.amazon.awssdk.core.internal.http.pipeline.MutableRequestToRequestPipeline;
import software.amazon.awssdk.core.internal.sync.CompressionContentStreamProvider;
import software.amazon.awssdk.http.ContentStreamProvider;
import software.amazon.awssdk.http.SdkHttpFullRequest;
import software.amazon.awssdk.utils.IoUtils;
Expand Down Expand Up @@ -67,10 +69,25 @@ public SdkHttpFullRequest.Builder execute(SdkHttpFullRequest.Builder input, Requ
compressEntirePayload(input, compressor);
updateContentEncodingHeader(input, compressor);
updateContentLengthHeader(input);
return input;
}

if (!isTransferEncodingChunked(input)) {
return input;
}

// TODO : streaming - sync & async
if (context.requestProvider() == null) {
// sync streaming
input.contentStreamProvider(new CompressionContentStreamProvider(input.contentStreamProvider(), compressor));
} else {
// async streaming
context.requestProvider(CompressionAsyncRequestBody.builder()
.asyncRequestBody(context.requestProvider())
.compressor(compressor)
.build());
}

updateContentEncodingHeader(input, compressor);
return input;
}

Expand Down Expand Up @@ -123,6 +140,12 @@ private void updateContentLengthHeader(SdkHttpFullRequest.Builder input) {
}
}

private boolean isTransferEncodingChunked(SdkHttpFullRequest.Builder input) {
return input.firstMatchingHeader("Transfer-Encoding")
.map(headerValue -> headerValue.equals("chunked"))
.orElse(false);
}

private Compressor resolveCompressorType(ExecutionAttributes executionAttributes) {
List<String> encodings =
executionAttributes.getAttribute(SdkInternalExecutionAttribute.REQUEST_COMPRESSION).getEncodings();
Expand Down
Loading