Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/next-release/feature-AmazonS3-d7fcc2e.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"type": "feature",
"category": "Amazon S3",
"contributor": "",
"description": "Added BufferedSplittableAsyncRequestBody.builder() with bufferBeforeSend option that fully buffers each multipart upload part before sending, fixing NonRetryableException when retrying parts from slow streaming sources."
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import software.amazon.awssdk.core.internal.async.SplittingPublisher;
import software.amazon.awssdk.utils.Validate;


/**
* An {@link AsyncRequestBody} decorator that enables splitting into retryable sub-request bodies.
*
Expand All @@ -34,11 +35,19 @@
* If the first subscriber fails to consume all the data (e.g., due to early cancellation or errors),
* subsequent retry attempts will fail since the complete data set is not available for resubscription.</p>
*
* <p><b>Usage Example:</b></p>
* <p><b>Usage Examples:</b></p>
* {@snippet :
* // Simple usage (default behavior, immediate send):
* AsyncRequestBody originalBody = AsyncRequestBody.fromString("Hello World");
* BufferedSplittableAsyncRequestBody retryableBody =
* BufferedSplittableAsyncRequestBody.create(originalBody);
*
* // With buffer-before-send enabled for slow streaming sources:
* BufferedSplittableAsyncRequestBody fullBufferedBody =
* BufferedSplittableAsyncRequestBody.builder()
* .asyncRequestBody(originalBody)
* .bufferBeforeSend(true)
* .build();
* }
*
* <p><b>Performance Considerations:</b></p>
Expand All @@ -54,21 +63,35 @@
@SdkPublicApi
public final class BufferedSplittableAsyncRequestBody implements AsyncRequestBody {
private final AsyncRequestBody delegate;
private final boolean bufferBeforeSend;

private BufferedSplittableAsyncRequestBody(AsyncRequestBody delegate) {
this.delegate = delegate;
private BufferedSplittableAsyncRequestBody(Builder builder) {
this.delegate = Validate.paramNotNull(builder.asyncRequestBody, "asyncRequestBody");
this.bufferBeforeSend = Validate.getOrDefault(builder.bufferBeforeSend, () -> false);
}

/**
* Creates a new {@link BufferedSplittableAsyncRequestBody} that wraps the provided {@link AsyncRequestBody}.
*
* <p>Full buffering is disabled by default. Each part is sent to the downstream subscriber immediately
* upon initialization in the known-content-length path.
*
* @param delegate the {@link AsyncRequestBody} to wrap and make retryable. Must not be null.
* @return a new {@link BufferedSplittableAsyncRequestBody} instance
* @throws NullPointerException if delegate is null
*/
public static BufferedSplittableAsyncRequestBody create(AsyncRequestBody delegate) {
Validate.paramNotNull(delegate, "delegate");
return new BufferedSplittableAsyncRequestBody(delegate);
return builder().asyncRequestBody(delegate).build();
}

/**
* Returns a new {@link Builder} for creating a {@link BufferedSplittableAsyncRequestBody} with configuration options.
*
* @return a new builder instance
*/
public static Builder builder() {
return new Builder();
}

@Override
Expand Down Expand Up @@ -98,6 +121,7 @@ public SdkPublisher<CloseableAsyncRequestBody> splitCloseable(AsyncRequestBodySp
.asyncRequestBody(this)
.splitConfiguration(splitConfiguration)
.retryableSubAsyncRequestBodyEnabled(true)
.bufferBeforeSend(bufferBeforeSend)
.build();
}

Expand All @@ -110,4 +134,55 @@ public void subscribe(Subscriber<? super ByteBuffer> s) {
public String body() {
return delegate.body();
}

/**
* Builder for {@link BufferedSplittableAsyncRequestBody}.
*/
public static final class Builder {
private AsyncRequestBody asyncRequestBody;
private Boolean bufferBeforeSend = null;

private Builder() {
}

/**
* Sets the {@link AsyncRequestBody} to wrap and make retryable.
*
* @param asyncRequestBody the request body to wrap. Must not be null.
* @return this builder for method chaining
*/
public Builder asyncRequestBody(AsyncRequestBody asyncRequestBody) {
this.asyncRequestBody = asyncRequestBody;
return this;
}

/**
* Configures whether to fully buffer each part before sending it downstream.
*
* <p>When enabled, each part is fully buffered before being sent to the downstream subscriber.
* This guarantees that the retry buffer is always populated before the HTTP layer subscribes,
* making per-part retry deterministically successful for slow streaming sources (e.g., SFTP).
*
* <p>When disabled (the default), each part is sent immediately upon initialization in the
* known-content-length path, allowing the HTTP connection to open while data is still arriving.
*
* @param bufferBeforeSend whether to enable full buffering before sending parts downstream.
* Defaults to {@code false}.
* @return this builder for method chaining
*/
public Builder bufferBeforeSend(Boolean bufferBeforeSend) {
this.bufferBeforeSend = bufferBeforeSend;
return this;
}

/**
* Builds a new {@link BufferedSplittableAsyncRequestBody} instance.
*
* @return a new instance configured by this builder
* @throws NullPointerException if asyncRequestBody is null
*/
public BufferedSplittableAsyncRequestBody build() {
return new BufferedSplittableAsyncRequestBody(this);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
*
* <p>If content length is known, each {@link AsyncRequestBody} is sent to the subscriber right after it's initialized.
* Otherwise, it is sent after the entire content for that chunk is buffered. This is required to get content length.
* When {@code bufferBeforeSend} is set to {@code true}, the known-content-length path also defers sending until the
* part is fully buffered, guaranteeing that the retry buffer is populated before the downstream subscriber receives it.
*/
@SdkInternalApi
public class SplittingPublisher implements SdkPublisher<CloseableAsyncRequestBody> {
Expand All @@ -46,6 +48,7 @@ public class SplittingPublisher implements SdkPublisher<CloseableAsyncRequestBod
private final long chunkSizeInBytes;
private final long bufferSizeInBytes;
private final boolean retryableSubAsyncRequestBodyEnabled;
private final boolean bufferBeforeSend;
private final AtomicBoolean currentBodySent = new AtomicBoolean(false);
private final String sourceBodyName;

Expand All @@ -64,6 +67,7 @@ private SplittingPublisher(Builder builder) {

this.retryableSubAsyncRequestBodyEnabled = Validate.paramNotNull(builder.retryableSubAsyncRequestBodyEnabled,
"retryableSubAsyncRequestBodyEnabled");
this.bufferBeforeSend = builder.bufferBeforeSend;
this.sourceBodyName = builder.asyncRequestBody.body();
if (!upstreamPublisher.contentLength().isPresent()) {
Validate.isTrue(bufferSizeInBytes >= chunkSizeInBytes,
Expand Down Expand Up @@ -136,7 +140,7 @@ private SubAsyncRequestBody initializeNextDownstreamBody(boolean contentLengthKn
}

currentBodySent.set(false);
if (contentLengthKnown) {
if (contentLengthKnown && !bufferBeforeSend) {
sendCurrentBody(body);
}
return body;
Expand Down Expand Up @@ -234,7 +238,7 @@ private void completeCurrentBody() {

// Current body could be completed in either onNext or onComplete, so we need to guard against sending the last body
// twice.
if (upstreamSize == null && currentBodySent.compareAndSet(false, true)) {
if ((upstreamSize == null || bufferBeforeSend) && currentBodySent.compareAndSet(false, true)) {
sendCurrentBody(currentBody);
}
}
Expand Down Expand Up @@ -307,6 +311,7 @@ public static final class Builder {
private AsyncRequestBody asyncRequestBody;
private AsyncRequestBodySplitConfiguration splitConfiguration;
private Boolean retryableSubAsyncRequestBodyEnabled;
private boolean bufferBeforeSend = false;

private Builder() {
}
Expand Down Expand Up @@ -335,6 +340,16 @@ public Builder retryableSubAsyncRequestBodyEnabled(Boolean retryableSubAsyncRequ
return this;
}

/**
* Sets whether to enable full buffering before sending parts downstream.
* When enabled, parts are only sent to the downstream subscriber after
* all data for that part has been received and complete() has been called.
*/
public Builder bufferBeforeSend(boolean bufferBeforeSend) {
this.bufferBeforeSend = bufferBeforeSend;
return this;
}

/**
* Builds a {@link SplittingPublisher} object based on the values held by this builder.
*/
Expand Down
Loading
Loading