Stream retry support part 2: Introduce a new split method in AsyncRequestBody that returns an SdkP… #6346
base: feature/master/mpu-stream-retry
Pull Request Overview
This PR introduces a new splitV2 method in AsyncRequestBody that returns an SdkPublisher of ClosableAsyncRequestBody instead of AsyncRequestBody, enabling proper resource management for split request bodies. This change is designed to support streaming retry for S3 multipart uploads.
- Adds ClosableAsyncRequestBody interface extending AsyncRequestBody with close capability
- Updates S3 multipart client implementation to use the new splitV2 method
- Maintains backward compatibility by deprecating the original split method
Reviewed Changes
Copilot reviewed 17 out of 18 changed files in this pull request and generated 4 comments.
File | Description |
---|---|
ClosableAsyncRequestBody.java | New interface extending AsyncRequestBody with close capability |
AsyncRequestBody.java | Adds splitV2 method and deprecates original split method |
SplittingPublisher.java | Updates to return ClosableAsyncRequestBody and improve resource management |
UploadWithUnknownContentLengthHelper.java | Updated to use splitV2 and properly close request bodies |
UploadWithKnownContentLengthHelper.java | Updated to use splitV2 method |
KnownContentLengthAsyncRequestBodySubscriber.java | Updated to handle ClosableAsyncRequestBody and close resources |
Test files | Updated tests to use new interfaces and verify retry behavior |
…ublisher of ClosableAsyncRequestBody and use it in s3 multipart client
}

private void stubUploadFailsInitialAttemptCalls(int partNumber, ResponseDefinitionBuilder responseDefinitionBuilder) {
Can we have end-to-end test cases with WireMock stubs that cover the retryable error cases?
Alternatively, we can open a separate task to cover end-to-end tests of multipart client error cases with different part sizes and part numbers.
I've added a WireMock test for retryable error cases, and this method is actually used in that test.
@Override
public void close() {
    // no op
Why would the "no op" comment be helpful?
It makes it explicit that the method is deliberately left empty, not by mistake.
if (subscribeCalled.compareAndSet(false, true)) {
    delegate.subscribe(s);
} else {
    s.onSubscribe(new NoopSubscription(s));
Why do we need to do this?
Per the Reactive Streams specification, onSubscribe must be signaled before anything else:
"Publisher.subscribe MUST call onSubscribe on the provided Subscriber prior to any other signals to that Subscriber"
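To illustrate the rule quoted above, here is a minimal sketch of a publisher that allows only one subscription. It is not the SDK's implementation: it uses the JDK's java.util.concurrent.Flow interfaces instead of org.reactivestreams, and SingleUsePublisher, the emitted item, and the error message are all hypothetical. On a second subscribe it hands over a no-op subscription first and only then signals onError.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;

public class SingleUsePublisherSketch {

    /** Hypothetical publisher that emits one item and allows only one subscription. */
    static final class SingleUsePublisher implements Flow.Publisher<String> {
        private final AtomicBoolean subscribeCalled = new AtomicBoolean(false);

        @Override
        public void subscribe(Flow.Subscriber<? super String> s) {
            if (subscribeCalled.compareAndSet(false, true)) {
                s.onSubscribe(new Flow.Subscription() {
                    private final AtomicBoolean done = new AtomicBoolean(false);

                    // Simplified: emits the single item on the first positive request.
                    @Override public void request(long n) {
                        if (n > 0 && done.compareAndSet(false, true)) {
                            s.onNext("data");
                            s.onComplete();
                        }
                    }

                    @Override public void cancel() { done.set(true); }
                });
            } else {
                // onSubscribe must precede any other signal, so pass a no-op subscription first.
                s.onSubscribe(new Flow.Subscription() {
                    @Override public void request(long n) { }
                    @Override public void cancel() { }
                });
                s.onError(new IllegalStateException("already subscribed"));
            }
        }
    }

    /** Subscribes once and records the signals in the order they arrive. */
    static List<String> record(Flow.Publisher<String> publisher) {
        List<String> signals = new ArrayList<>();
        publisher.subscribe(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription sub) { signals.add("onSubscribe"); sub.request(1); }
            @Override public void onNext(String item) { signals.add("onNext:" + item); }
            @Override public void onError(Throwable t) { signals.add("onError:" + t.getMessage()); }
            @Override public void onComplete() { signals.add("onComplete"); }
        });
        return signals;
    }

    public static void main(String[] args) {
        SingleUsePublisher publisher = new SingleUsePublisher();
        System.out.println(record(publisher)); // [onSubscribe, onNext:data, onComplete]
        System.out.println(record(publisher)); // [onSubscribe, onError:already subscribed]
    }
}
```

Without the no-op subscription, the second subscriber would see onError as its first signal, which is what the quoted rule forbids.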
    delegate.subscribe(s);
} else {
    s.onSubscribe(new NoopSubscription(s));
    s.onError(NonRetryableException.create(
Is retry the only use case where this resubscription can occur?
Yes for the SDK use case, but I suppose it could happen in some other use case if customers use split directly (although we don't expect users to call it directly). I can update the message to make it clearer.
        log.debug(() -> "requesting data after closing" + partNumber);
    }
} catch (Throwable e) {
    log.warn(() -> String.format("Unexpected error thrown from cleaning up AsyncRequestBody for part number %d, "
Why is this logged at WARN rather than ERROR?
I used warn because it's not an immediate request failure per se. WDYT?
log.debug(() -> "Received complete() for part number: " + partNumber);
// ByteBuffersAsyncRequestBody MUST be created before we complete the current
// request because retry may happen right after
bufferedAsyncRequestBody = ByteBuffersAsyncRequestBody.of(buffers, bufferedLength);
We are passing buffers as-is to ByteBuffersAsyncRequestBody. Should we pass a copy, new ArrayList<>(buffers), and also make it thread-safe using synchronized (buffersLock)?
Extra copy/buffering is not necessary because it's entirely used internally and we know we will not modify it. Yup, will add synchronized
// For unknown content length, we always create a new DownstreamBody because we don't know if there is data
// left or not, so we need to only send the body if there is actually data
long bufferedLength = currentBody.receivedBytesLength();
if (bufferedLength == 0) {
If possible, just in reply to this comment, can you tell me how we can end up with bufferedLength == 0 here? For example, for touch files (0-byte files)?
It's explained above :) Will move it down and clarify:
"For unknown content length, we always create a new DownstreamBody because we don't know if there is data left or not, so we need to only send the body if there is actually data"
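The situation described above can be sketched as follows. This is a simplified illustration, not the SDK's SplittingPublisher: the Part class, the split method, and the int[] of chunk sizes are hypothetical stand-ins. Because the splitter cannot know whether more data will arrive, it opens the next part as soon as the current one fills up, so the last part can be empty when the total length is an exact multiple of the part size. How a zero-byte upload is handled is outside this sketch.

```java
import java.util.ArrayList;
import java.util.List;

public class UnknownLengthSplitSketch {

    /** Hypothetical stand-in for a downstream body that counts the bytes it has received. */
    static final class Part {
        private long receivedBytes;

        void add(int n) { receivedBytes += n; }

        long receivedBytesLength() { return receivedBytes; }
    }

    /** Splits chunks of unknown total length into parts of at most partSize bytes; returns the sent part sizes. */
    static List<Long> split(int[] chunkSizes, int partSize) {
        List<Long> sent = new ArrayList<>();
        Part current = new Part();
        for (int chunk : chunkSizes) {
            int remaining = chunk;
            while (remaining > 0) {
                int space = (int) (partSize - current.receivedBytesLength());
                int n = Math.min(space, remaining);
                current.add(n);
                remaining -= n;
                if (current.receivedBytesLength() == partSize) {
                    // Part is full: send it and eagerly open the next one, since more data may follow.
                    sent.add(current.receivedBytesLength());
                    current = new Part();
                }
            }
        }
        // Upstream completed. The eagerly created part may be empty; only send it if it holds data.
        if (current.receivedBytesLength() > 0) {
            sent.add(current.receivedBytesLength());
        }
        return sent;
    }

    public static void main(String[] args) {
        System.out.println(split(new int[] {4, 4}, 4)); // [4, 4]  (trailing empty part is skipped)
        System.out.println(split(new int[] {4, 3}, 4)); // [4, 3]
    }
}
```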
onNumBytesConsumed.accept((long) length);
if (t != null) {
    error(t);
}
Just for my understanding, why is onNumBytesConsumed always updated, instead of only when t == null?
The data is considered consumed once the future completes (exceptionally or not) since there's no retry anyway.
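A small sketch of that accounting pattern, assuming the excerpt runs inside a CompletableFuture.whenComplete callback (the excerpt does not show the enclosing call, so that is an assumption). The track method and the consumer names are hypothetical. The byte count is reported whether the future succeeds or fails, and the error is forwarded only on failure.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

public class ConsumedBytesSketch {

    /** Reports `length` bytes as consumed once the send future completes, whether or not it failed. */
    static void track(CompletableFuture<Void> sendFuture, int length,
                      Consumer<Long> onNumBytesConsumed, Consumer<Throwable> onError) {
        sendFuture.whenComplete((r, t) -> {
            onNumBytesConsumed.accept((long) length);
            if (t != null) {
                onError.accept(t);
            }
        });
    }

    public static void main(String[] args) {
        AtomicLong consumed = new AtomicLong();
        AtomicReference<Throwable> failure = new AtomicReference<>();

        CompletableFuture<Void> ok = new CompletableFuture<>();
        track(ok, 10, consumed::addAndGet, failure::set);
        ok.complete(null);

        CompletableFuture<Void> failed = new CompletableFuture<>();
        track(failed, 5, consumed::addAndGet, failure::set);
        failed.completeExceptionally(new RuntimeException("send failed"));

        // Both sends count as consumed; only the second one records an error.
        System.out.println(consumed.get() + " bytes consumed, failure=" + failure.get().getMessage());
    }
}
```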
Motivation and Context
This PR introduces a new split method, splitClosable, in AsyncRequestBody that returns an SdkPublisher of ClosableAsyncRequestBody instead of AsyncRequestBody, enabling proper resource management for split request bodies. The change is primarily implemented to support streaming retry for the S3 multipart client (see "Support retries for individual parts of a multipart upload in Java SDK V2" #6198).
- Introduces ClosableAsyncRequestBody interface extending AsyncRequestBody with close capability
- Updates S3 multipart client implementation to use the new splitClosable method
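As a rough illustration of why a closable part helps with retries, the sketch below buffers a part's bytes so they can be re-read on each attempt and releases them once the upload has finished. It does not use the SDK's types: BufferedPart, Uploader, and uploadWithRetry are hypothetical stand-ins, and the retry loop is deliberately naive.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;

public class ClosablePartSketch {

    /** Hypothetical stand-in for a closable request body: buffered data that can be re-read until closed. */
    static final class BufferedPart implements AutoCloseable {
        private ByteBuffer data;

        BufferedPart(byte[] bytes) { this.data = ByteBuffer.wrap(bytes); }

        /** Returns an independent read-only view, so every attempt reads from the start. */
        ByteBuffer content() {
            if (data == null) {
                throw new IllegalStateException("part is closed");
            }
            return data.asReadOnlyBuffer();
        }

        boolean isClosed() { return data == null; }

        @Override public void close() { data = null; } // release the buffered bytes
    }

    /** Hypothetical upload call; it may throw to simulate a retryable failure. */
    interface Uploader {
        void upload(ByteBuffer content);
    }

    /** Retries the upload up to maxAttempts times and closes the part afterwards; returns the attempts used. */
    static int uploadWithRetry(BufferedPart part, Uploader uploader, int maxAttempts) {
        try (BufferedPart p = part) {
            for (int attempt = 1; ; attempt++) {
                try {
                    uploader.upload(p.content());
                    return attempt;
                } catch (RuntimeException e) {
                    if (attempt == maxAttempts) {
                        throw e;
                    }
                }
            }
        }
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        BufferedPart part = new BufferedPart(new byte[] {1, 2, 3});
        int attempts = uploadWithRetry(part, content -> {
            if (calls.incrementAndGet() < 3) {
                throw new RuntimeException("transient failure");
            }
        }, 5);
        System.out.println(attempts + " attempts, closed=" + part.isClosed()); // 3 attempts, closed=true
    }
}
```

The point of the explicit close is that the buffered bytes must stay available while a retry is still possible, so the consumer, not the publisher, decides when they can be released.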
Testing
Added WireMock tests.
Screenshots (if appropriate)
Types of changes
Checklist
mvn install succeeds
scripts/new-change script and following the instructions. Commit the new file created by the script in .changes/next-release with your changes.
License