@@ -42,13 +42,9 @@ public static CodeBlock create(OperationModel operationModel, IntermediateModel
return CodeBlock.of("");
}

// TODO : remove once request compression for streaming operations is supported
if (operationModel.isStreaming()) {
throw new IllegalStateException("Request compression for streaming operations is not yet supported in the AWS SDK "
+ "for Java.");
}

// TODO : remove once S3 checksum interceptors are moved to occur after CompressRequestStage
// TODO : remove once:
// 1) S3 checksum interceptors are moved to occur after CompressRequestStage
// 2) Transfer-Encoding:chunked is supported in S3
if (model.getMetadata().getServiceName().equals("S3")) {
throw new IllegalStateException("Request compression for S3 is not yet supported in the AWS SDK for Java.");
}
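For orientation: create(...) here belongs to the codegen layer and returns a JavaPoet CodeBlock that gets spliced into the generated client, while the guards above run at generation time and fail the build for unsupported cases. Below is a minimal sketch of that empty-or-content CodeBlock pattern; every name besides CodeBlock.of is hypothetical, not the SDK's actual codegen structure.

```java
import com.squareup.javapoet.CodeBlock;

// Hypothetical trait sketch, not the SDK's real codegen: return an empty
// block when the feature does not apply, otherwise emit a code fragment.
final class CompressionTraitSketch {
    static CodeBlock create(boolean operationSupportsCompression) {
        if (!operationSupportsCompression) {
            return CodeBlock.of("");  // contributes nothing to the generated client
        }
        // $S formats a Java string literal; the emitted fragment is illustrative only.
        return CodeBlock.of("requestCompressionConfiguration($S)", "gzip");
    }
}
```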
@@ -40,7 +40,6 @@
@SdkInternalApi
public final class AwsSignedChunkedEncodingInputStream extends AwsChunkedEncodingInputStream {

private static final String CRLF = "\r\n";
private static final String CHUNK_SIGNATURE_HEADER = ";chunk-signature=";
private static final String CHECKSUM_SIGNATURE_HEADER = "x-amz-trailer-signature:";
private String previousChunkSignature;
@@ -35,6 +35,7 @@
import software.amazon.awssdk.core.internal.http.HttpClientDependencies;
import software.amazon.awssdk.core.internal.http.RequestExecutionContext;
import software.amazon.awssdk.core.internal.http.pipeline.MutableRequestToRequestPipeline;
import software.amazon.awssdk.core.internal.sync.CompressionContentStreamProvider;
import software.amazon.awssdk.http.ContentStreamProvider;
import software.amazon.awssdk.http.SdkHttpFullRequest;
import software.amazon.awssdk.utils.IoUtils;
@@ -67,10 +68,21 @@ public SdkHttpFullRequest.Builder execute(SdkHttpFullRequest.Builder input, Requ
compressEntirePayload(input, compressor);
updateContentEncodingHeader(input, compressor);
updateContentLengthHeader(input);
return input;
}

if (!isTransferEncodingChunked(input)) {
return input;
}

// TODO : streaming - sync & async
if (context.requestProvider() == null) {
// sync streaming
input.contentStreamProvider(new CompressionContentStreamProvider(input.contentStreamProvider(), compressor));
}

// TODO : streaming - async

updateContentEncodingHeader(input, compressor);
return input;
}

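The sync-streaming branch above replaces the request's ContentStreamProvider with the new CompressionContentStreamProvider, so every newStream() call (including retries) yields a freshly compressed stream. That class's body is not shown in this diff; the following is a sketch of its likely shape, where the nested Compressor stand-in and its compress(InputStream) signature are assumptions.

```java
import java.io.InputStream;
import software.amazon.awssdk.http.ContentStreamProvider;

// Sketch only: the real CompressionContentStreamProvider lives in
// software.amazon.awssdk.core.internal.sync and may differ from this.
final class CompressionContentStreamProviderSketch implements ContentStreamProvider {

    // Stand-in for the SDK's compressor abstraction; the streaming
    // compress(InputStream) signature is an assumption for illustration.
    interface Compressor {
        InputStream compress(InputStream in);
    }

    private final ContentStreamProvider delegate;
    private final Compressor compressor;

    CompressionContentStreamProviderSketch(ContentStreamProvider delegate, Compressor compressor) {
        this.delegate = delegate;
        this.compressor = compressor;
    }

    @Override
    public InputStream newStream() {
        // Re-wrap a fresh underlying stream on every call so retried requests
        // reproduce the compressed payload from the beginning.
        return compressor.compress(delegate.newStream());
    }
}
```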
@@ -123,6 +135,12 @@ private void updateContentLengthHeader(SdkHttpFullRequest.Builder input) {
}
}

private boolean isTransferEncodingChunked(SdkHttpFullRequest.Builder input) {
return input.firstMatchingHeader("Transfer-Encoding")
.map(headerValue -> headerValue.equals("chunked"))
.orElse(false);
}

private Compressor resolveCompressorType(ExecutionAttributes executionAttributes) {
List<String> encodings =
executionAttributes.getAttribute(SdkInternalExecutionAttribute.REQUEST_COMPRESSION).getEncodings();
@@ -22,8 +22,6 @@
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.core.checksums.SdkChecksum;
import software.amazon.awssdk.core.internal.chunked.AwsChunkedEncodingConfig;
import software.amazon.awssdk.core.io.SdkInputStream;
import software.amazon.awssdk.utils.Logger;
import software.amazon.awssdk.utils.Validate;

/**
@@ -37,37 +35,18 @@
* the wrapped stream.
*/
@SdkInternalApi
public abstract class AwsChunkedEncodingInputStream extends SdkInputStream {
public abstract class AwsChunkedEncodingInputStream extends AwsChunkedInputStream {

public static final int DEFAULT_CHUNK_SIZE = 128 * 1024;
protected static final int SKIP_BUFFER_SIZE = 256 * 1024;
protected static final String CRLF = "\r\n";
protected static final byte[] FINAL_CHUNK = new byte[0];
protected static final String HEADER_COLON_SEPARATOR = ":";
private static final Logger log = Logger.loggerFor(AwsChunkedEncodingInputStream.class);
protected byte[] calculatedChecksum = null;
protected final String checksumHeaderForTrailer;
protected boolean isTrailingTerminated = true;
private InputStream is = null;
private final int chunkSize;
private final int maxBufferSize;
private final SdkChecksum sdkChecksum;
private boolean isLastTrailingCrlf;
/**
* Iterator on the current chunk.
*/
private ChunkContentIterator currentChunkIterator;

/**
* Iterator on the buffer of the decoded stream,
* Null if the wrapped stream is marksupported,
* otherwise it will be initialized when this wrapper is marked.
*/
private DecodedStreamBuffer decodedStreamBuffer;

private boolean isAtStart = true;
private boolean isTerminating = false;


/**
* Creates a chunked encoding input stream initialized with the originating stream. The configuration allows
@@ -89,10 +68,10 @@ protected AwsChunkedEncodingInputStream(InputStream in,
AwsChunkedEncodingInputStream originalChunkedStream = (AwsChunkedEncodingInputStream) in;
providedMaxBufferSize = Math.max(originalChunkedStream.maxBufferSize, providedMaxBufferSize);
is = originalChunkedStream.is;
decodedStreamBuffer = originalChunkedStream.decodedStreamBuffer;
underlyingStreamBuffer = originalChunkedStream.underlyingStreamBuffer;
} else {
is = in;
decodedStreamBuffer = null;
underlyingStreamBuffer = null;
}
this.chunkSize = awsChunkedEncodingConfig.chunkSize();
this.maxBufferSize = providedMaxBufferSize;
@@ -153,19 +132,6 @@ public T checksumHeaderForTrailer(String checksumHeaderForTrailer) {

}

@Override
public int read() throws IOException {
byte[] tmp = new byte[1];
int count = read(tmp, 0, 1);
if (count > 0) {
log.debug(() -> "One byte read from the stream.");
int unsignedByte = (int) tmp[0] & 0xFF;
return unsignedByte;
} else {
return count;
}
}

@Override
public int read(byte[] b, int off, int len) throws IOException {
abortIfNeeded();
@@ -211,32 +177,6 @@ private boolean setUpTrailingChunks() {
return true;
}

@Override
public long skip(long n) throws IOException {
if (n <= 0) {
return 0;
}
long remaining = n;
int toskip = (int) Math.min(SKIP_BUFFER_SIZE, n);
byte[] temp = new byte[toskip];
while (remaining > 0) {
int count = read(temp, 0, toskip);
if (count < 0) {
break;
}
remaining -= count;
}
return n - remaining;
}

/**
* @see java.io.InputStream#markSupported()
*/
@Override
public boolean markSupported() {
return true;
}

/**
* The readlimit parameter is ignored.
*/
@@ -256,7 +196,7 @@ public void mark(int readlimit) {
} else {
log.debug(() -> "AwsChunkedEncodingInputStream marked at the start of the stream "
+ "(initializing the buffer since the wrapped stream is not mark-supported).");
decodedStreamBuffer = new DecodedStreamBuffer(maxBufferSize);
underlyingStreamBuffer = new UnderlyingStreamBuffer(maxBufferSize);
}
}

@@ -280,8 +220,8 @@ public void reset() throws IOException {
is.reset();
} else {
log.debug(() -> "AwsChunkedEncodingInputStream reset (will use the buffer of the underlying stream).");
Validate.notNull(decodedStreamBuffer, "Cannot reset the stream because the mark is not set.");
decodedStreamBuffer.startReadBuffer();
Validate.notNull(underlyingStreamBuffer, "Cannot reset the stream because the mark is not set.");
underlyingStreamBuffer.startReadBuffer();
}
isAtStart = true;
isTerminating = false;
@@ -298,14 +238,14 @@ private boolean setUpNextChunk() throws IOException {
int chunkSizeInBytes = 0;
while (chunkSizeInBytes < chunkSize) {
/** Read from the buffer of the underlying stream */
if (null != decodedStreamBuffer && decodedStreamBuffer.hasNext()) {
chunkData[chunkSizeInBytes++] = decodedStreamBuffer.next();
if (null != underlyingStreamBuffer && underlyingStreamBuffer.hasNext()) {
chunkData[chunkSizeInBytes++] = underlyingStreamBuffer.next();
} else { /** Read from the wrapped stream */
int bytesToRead = chunkSize - chunkSizeInBytes;
int count = is.read(chunkData, chunkSizeInBytes, bytesToRead);
if (count != -1) {
if (null != decodedStreamBuffer) {
decodedStreamBuffer.buffer(chunkData, chunkSizeInBytes, count);
if (null != underlyingStreamBuffer) {
underlyingStreamBuffer.buffer(chunkData, chunkSizeInBytes, count);
}
chunkSizeInBytes += count;
} else {
@@ -333,13 +273,6 @@ private boolean setUpNextChunk() throws IOException {
}
}


@Override
protected InputStream getWrappedInputStream() {
return is;
}


/**
* The final chunk.
*
@@ -361,5 +294,4 @@ protected InputStream getWrappedInputStream() {
* @return ChecksumChunkHeader in bytes based on the Header name field.
*/
protected abstract byte[] createChecksumChunkHeader();

}
}
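For context on what these subclasses write: each chunk on the wire follows the aws-chunked framing of a hex chunk length, an extension such as the ;chunk-signature= header seen earlier, CRLF, the chunk data, and a trailing CRLF, ending with a zero-length final chunk. A small sketch of assembling one signed chunk header follows; the signature value itself would come from the SigV4 chunk-signing routine and is passed in here as an opaque string.

```java
import java.nio.charset.StandardCharsets;

// Sketch of the aws-chunked header framing produced by the signed subclass;
// computing the chunk signature is out of scope and stubbed as a parameter.
final class ChunkFramingSketch {
    private static final String CRLF = "\r\n";
    private static final String CHUNK_SIGNATURE_HEADER = ";chunk-signature=";

    // Builds "<hex-size>;chunk-signature=<sig>\r\n"; the chunk data and a
    // trailing CRLF follow it on the wire.
    static byte[] chunkHeader(int chunkDataLength, String chunkSignature) {
        String header = Integer.toHexString(chunkDataLength)
                        + CHUNK_SIGNATURE_HEADER
                        + chunkSignature
                        + CRLF;
        return header.getBytes(StandardCharsets.UTF_8);
    }
}
```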
@@ -0,0 +1,90 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.core.internal.io;

import java.io.IOException;
import java.io.InputStream;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.core.io.SdkInputStream;
import software.amazon.awssdk.utils.Logger;

/**
* A wrapper of InputStream that implements streaming in chunks.
*/
@SdkInternalApi
public abstract class AwsChunkedInputStream extends SdkInputStream {
public static final int DEFAULT_CHUNK_SIZE = 128 * 1024;
protected static final int SKIP_BUFFER_SIZE = 256 * 1024;
protected static final Logger log = Logger.loggerFor(AwsChunkedInputStream.class);
protected InputStream is;
/**
* Iterator on the current chunk.
*/
protected ChunkContentIterator currentChunkIterator;

/**
* Iterator on the buffer of the underlying stream;
* null if the wrapped stream is mark-supported,
* otherwise it will be initialized when this wrapper is marked.
*/
protected UnderlyingStreamBuffer underlyingStreamBuffer;
protected boolean isAtStart = true;
protected boolean isTerminating = false;

@Override
public int read() throws IOException {
byte[] tmp = new byte[1];
int count = read(tmp, 0, 1);
if (count > 0) {
log.debug(() -> "One byte read from the stream.");
int unsignedByte = (int) tmp[0] & 0xFF;
return unsignedByte;
} else {
return count;
}
}

@Override
public long skip(long n) throws IOException {
if (n <= 0) {
return 0;
}
long remaining = n;
byte[] temp = new byte[(int) Math.min(SKIP_BUFFER_SIZE, n)];
while (remaining > 0) {
// Cap each read at the bytes still to be skipped so we never consume more than n.
int count = read(temp, 0, (int) Math.min(temp.length, remaining));
if (count < 0) {
break;
}
remaining -= count;
}
return n - remaining;
}

/**
* @see InputStream#markSupported()
*/
@Override
public boolean markSupported() {
return true;
}

@Override
protected InputStream getWrappedInputStream() {
return is;
}
}
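Since this base class now carries the generic read/skip/mark plumbing, a subclass only has to supply the framing logic in read(byte[], int, int); note that an override is required, because the base single-byte read() delegates to it. A hypothetical pass-through subclass for demonstration only (the real subclasses add chunk headers, checksums, and signatures):

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import software.amazon.awssdk.core.internal.io.AwsChunkedInputStream;

// Demonstration-only subclass; it applies no aws-chunked framing at all.
final class PassThroughChunkedStream extends AwsChunkedInputStream {
    PassThroughChunkedStream(InputStream in) {
        this.is = in;  // 'is' is the protected wrapped-stream field above
    }

    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        return is.read(b, off, len);  // real subclasses add framing here
    }

    public static void main(String[] args) throws IOException {
        InputStream stream = new PassThroughChunkedStream(
            new ByteArrayInputStream("hello".getBytes(StandardCharsets.UTF_8)));
        System.out.println(stream.read());  // 104 ('h'), via the base single-byte read()
        System.out.println(stream.skip(3)); // 3, skipped through the base skip() buffer
    }
}
```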