diff --git a/.changes/next-release/feature-AmazonS3-d7fcc2e.json b/.changes/next-release/feature-AmazonS3-d7fcc2e.json
new file mode 100644
index 00000000000..2556bbdcb39
--- /dev/null
+++ b/.changes/next-release/feature-AmazonS3-d7fcc2e.json
@@ -0,0 +1,6 @@
+{
+ "type": "feature",
+ "category": "Amazon S3",
+ "contributor": "",
+ "description": "Added BufferedSplittableAsyncRequestBody.builder() with bufferBeforeSend option that fully buffers each multipart upload part before sending, fixing NonRetryableException when retrying parts from slow streaming sources."
+}
diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/BufferedSplittableAsyncRequestBody.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/BufferedSplittableAsyncRequestBody.java
index a1c46238dfe..8d874bb5bc1 100644
--- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/BufferedSplittableAsyncRequestBody.java
+++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/BufferedSplittableAsyncRequestBody.java
@@ -22,6 +22,7 @@
import software.amazon.awssdk.core.internal.async.SplittingPublisher;
import software.amazon.awssdk.utils.Validate;
+
/**
* An {@link AsyncRequestBody} decorator that enables splitting into retryable sub-request bodies.
*
@@ -34,11 +35,19 @@
* If the first subscriber fails to consume all the data (e.g., due to early cancellation or errors),
* subsequent retry attempts will fail since the complete data set is not available for resubscription.
*
- * Usage Example:
+ * Usage Examples:
* {@snippet :
+ * // Simple usage (default behavior, immediate send):
* AsyncRequestBody originalBody = AsyncRequestBody.fromString("Hello World");
* BufferedSplittableAsyncRequestBody retryableBody =
* BufferedSplittableAsyncRequestBody.create(originalBody);
+ *
+ * // With buffer-before-send enabled for slow streaming sources:
+ * BufferedSplittableAsyncRequestBody fullBufferedBody =
+ * BufferedSplittableAsyncRequestBody.builder()
+ * .asyncRequestBody(originalBody)
+ * .bufferBeforeSend(true)
+ * .build();
* }
*
* Performance Considerations:
@@ -54,21 +63,35 @@
@SdkPublicApi
public final class BufferedSplittableAsyncRequestBody implements AsyncRequestBody {
private final AsyncRequestBody delegate;
+ private final boolean bufferBeforeSend;
- private BufferedSplittableAsyncRequestBody(AsyncRequestBody delegate) {
- this.delegate = delegate;
+ private BufferedSplittableAsyncRequestBody(Builder builder) {
+ this.delegate = Validate.paramNotNull(builder.asyncRequestBody, "asyncRequestBody");
+ this.bufferBeforeSend = Validate.getOrDefault(builder.bufferBeforeSend, () -> false);
}
/**
* Creates a new {@link BufferedSplittableAsyncRequestBody} that wraps the provided {@link AsyncRequestBody}.
*
+ * Full buffering is disabled by default. Each part is sent to the downstream subscriber immediately
+ * upon initialization in the known-content-length path.
+ *
* @param delegate the {@link AsyncRequestBody} to wrap and make retryable. Must not be null.
* @return a new {@link BufferedSplittableAsyncRequestBody} instance
* @throws NullPointerException if delegate is null
*/
public static BufferedSplittableAsyncRequestBody create(AsyncRequestBody delegate) {
Validate.paramNotNull(delegate, "delegate");
- return new BufferedSplittableAsyncRequestBody(delegate);
+ return builder().asyncRequestBody(delegate).build();
+ }
+
+ /**
+ * Returns a new {@link Builder} for creating a {@link BufferedSplittableAsyncRequestBody} with configuration options.
+ *
+ * @return a new builder instance
+ */
+ public static Builder builder() {
+ return new Builder();
}
@Override
@@ -98,6 +121,7 @@ public SdkPublisher splitCloseable(AsyncRequestBodySp
.asyncRequestBody(this)
.splitConfiguration(splitConfiguration)
.retryableSubAsyncRequestBodyEnabled(true)
+ .bufferBeforeSend(bufferBeforeSend)
.build();
}
@@ -110,4 +134,61 @@ public void subscribe(Subscriber super ByteBuffer> s) {
public String body() {
return delegate.body();
}
+
+ /**
+ * Builder for {@link BufferedSplittableAsyncRequestBody}.
+ */
+ public static final class Builder {
+ private AsyncRequestBody asyncRequestBody;
+ private Boolean bufferBeforeSend = null;
+
+ private Builder() {
+ }
+
+ /**
+ * Sets the {@link AsyncRequestBody} to wrap and make retryable.
+ *
+ * @param asyncRequestBody the request body to wrap. Must not be null.
+ * @return this builder for method chaining
+ */
+ public Builder asyncRequestBody(AsyncRequestBody asyncRequestBody) {
+ this.asyncRequestBody = asyncRequestBody;
+ return this;
+ }
+
+ /**
+ * Configures whether to fully buffer each part before sending it downstream.
+ *
+ * When enabled, each part is fully buffered before being sent to the downstream subscriber.
+ * This guarantees that the retry buffer is always populated before the HTTP layer subscribes,
+ * making per-part retry deterministically successful for slow streaming sources (e.g., SFTP).
+ *
+ *
When disabled (the default), each part is sent immediately upon initialization in the
+ * known-content-length path, allowing the HTTP connection to open while data is still arriving.
+ *
+ *
Memory usage: Enabling this option does not increase the maximum memory footprint.
+ * Buffered data remains bounded by the {@code bufferSizeInBytes} configured in the
+ * {@link AsyncRequestBodySplitConfiguration}. The only behavioral difference is timing: data is
+ * held in the buffer slightly longer (until the part is complete) before being sent downstream,
+ * rather than being streamed out concurrently.
+ *
+ * @param bufferBeforeSend whether to enable full buffering before sending parts downstream.
+ * Defaults to {@code false}.
+ * @return this builder for method chaining
+ */
+ public Builder bufferBeforeSend(Boolean bufferBeforeSend) {
+ this.bufferBeforeSend = bufferBeforeSend;
+ return this;
+ }
+
+ /**
+ * Builds a new {@link BufferedSplittableAsyncRequestBody} instance.
+ *
+ * @return a new instance configured by this builder
+ * @throws NullPointerException if asyncRequestBody is null
+ */
+ public BufferedSplittableAsyncRequestBody build() {
+ return new BufferedSplittableAsyncRequestBody(this);
+ }
+ }
}
diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingPublisher.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingPublisher.java
index 82e749c14cc..7aad91b5aea 100644
--- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingPublisher.java
+++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingPublisher.java
@@ -36,6 +36,8 @@
*
*
If content length is known, each {@link AsyncRequestBody} is sent to the subscriber right after it's initialized.
* Otherwise, it is sent after the entire content for that chunk is buffered. This is required to get content length.
+ * When {@code bufferBeforeSend} is set to {@code true}, the known-content-length path also defers sending until the
+ * part is fully buffered, guaranteeing that the retry buffer is populated before the downstream subscriber receives it.
*/
@SdkInternalApi
public class SplittingPublisher implements SdkPublisher {
@@ -46,6 +48,7 @@ public class SplittingPublisher implements SdkPublisher= chunkSizeInBytes,
"bufferSizeInBytes must be larger than or equal to " +
- "chunkSizeInBytes if the content length is unknown");
+ "chunkSizeInBytes when the content length is unknown or bufferBeforeSend is enabled");
}
}
@@ -136,7 +140,7 @@ private SubAsyncRequestBody initializeNextDownstreamBody(boolean contentLengthKn
}
currentBodySent.set(false);
- if (contentLengthKnown) {
+ if (contentLengthKnown && !bufferBeforeSend) {
sendCurrentBody(body);
}
return body;
@@ -234,7 +238,7 @@ private void completeCurrentBody() {
// Current body could be completed in either onNext or onComplete, so we need to guard against sending the last body
// twice.
- if (upstreamSize == null && currentBodySent.compareAndSet(false, true)) {
+ if ((upstreamSize == null || bufferBeforeSend) && currentBodySent.compareAndSet(false, true)) {
sendCurrentBody(currentBody);
}
}
@@ -307,6 +311,7 @@ public static final class Builder {
private AsyncRequestBody asyncRequestBody;
private AsyncRequestBodySplitConfiguration splitConfiguration;
private Boolean retryableSubAsyncRequestBodyEnabled;
+ private boolean bufferBeforeSend = false;
private Builder() {
}
@@ -335,6 +340,19 @@ public Builder retryableSubAsyncRequestBodyEnabled(Boolean retryableSubAsyncRequ
return this;
}
+ /**
+ * Sets whether to enable full buffering before sending parts downstream.
+ * When enabled, parts are only sent to the downstream subscriber after
+ * all data for that part has been received and complete() has been called.
+ *
+ * This does not increase the maximum memory footprint. Buffered data remains
+ * bounded by {@code bufferSizeInBytes} in the split configuration.
+ */
+ public Builder bufferBeforeSend(boolean bufferBeforeSend) {
+ this.bufferBeforeSend = bufferBeforeSend;
+ return this;
+ }
+
/**
* Builds a {@link SplittingPublisher} object based on the values held by this builder.
*/
diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/SplittingPublisherTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/SplittingPublisherTest.java
index 39326ac4167..4bd0ffb3ef1 100644
--- a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/SplittingPublisherTest.java
+++ b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/SplittingPublisherTest.java
@@ -48,6 +48,9 @@
import org.reactivestreams.Subscription;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.async.AsyncRequestBodySplitConfiguration;
+import software.amazon.awssdk.core.async.BufferedSplittableAsyncRequestBody;
+import software.amazon.awssdk.core.async.CloseableAsyncRequestBody;
+import software.amazon.awssdk.core.async.SdkPublisher;
import software.amazon.awssdk.utils.BinaryUtils;
import software.amazon.awssdk.utils.Pair;
@@ -298,6 +301,203 @@ public void onComplete() {
assertThat(error).isEqualTo(upstreamError);
}
+ @Test
+ void bufferedSplittable_createWithFullBufferingTrue_defersPartEmission() throws Exception {
+ // Create a controlled async request body with known content length that delivers data in two chunks
+ byte[] data = new byte[10];
+ for (int i = 0; i < data.length; i++) {
+ data[i] = (byte) i;
+ }
+ AsyncRequestBody sourceBody = new AsyncRequestBody() {
+ @Override
+ public Optional contentLength() {
+ return Optional.of((long) data.length);
+ }
+
+ @Override
+ public void subscribe(Subscriber super ByteBuffer> s) {
+ s.onSubscribe(new Subscription() {
+ private boolean done = false;
+
+ @Override
+ public void request(long n) {
+ if (!done) {
+ done = true;
+ s.onNext(ByteBuffer.wrap(data));
+ s.onComplete();
+ }
+ }
+
+ @Override
+ public void cancel() {
+ }
+ });
+ }
+ };
+
+ // Use builder to enable full buffering
+ BufferedSplittableAsyncRequestBody bufferedBody = BufferedSplittableAsyncRequestBody.builder()
+ .asyncRequestBody(sourceBody)
+ .bufferBeforeSend(true)
+ .build();
+
+ // Verify that content length is propagated
+ assertThat(bufferedBody.contentLength()).hasValue((long) data.length);
+
+ // Split with a chunk size of 5 (two parts of 5 bytes each)
+ AsyncRequestBodySplitConfiguration splitConfig = AsyncRequestBodySplitConfiguration.builder()
+ .chunkSizeInBytes(5L)
+ .bufferSizeInBytes(20L)
+ .build();
+
+ SdkPublisher publisher = bufferedBody.splitCloseable(splitConfig);
+
+ // Track when parts are received by the downstream subscriber
+ List> partFutures = new ArrayList<>();
+
+ CompletableFuture subscribeFuture = publisher.subscribe(requestBody -> {
+ // Each part should arrive with data already available (fully buffered)
+ CompletableFuture partFuture = new CompletableFuture<>();
+ partFutures.add(partFuture);
+ requestBody.subscribe(new BaosSubscriber(partFuture));
+ partFuture.whenComplete((r, t) -> requestBody.close());
+ });
+
+ subscribeFuture.get(5, TimeUnit.SECONDS);
+
+ // Verify we received 2 parts with correct data
+ assertThat(partFutures.size()).isEqualTo(2);
+
+ byte[] firstPart = partFutures.get(0).get(5, TimeUnit.SECONDS);
+ byte[] secondPart = partFutures.get(1).get(5, TimeUnit.SECONDS);
+
+ byte[] expectedFirst = new byte[]{0, 1, 2, 3, 4};
+ byte[] expectedSecond = new byte[]{5, 6, 7, 8, 9};
+
+ assertThat(firstPart).isEqualTo(expectedFirst);
+ assertThat(secondPart).isEqualTo(expectedSecond);
+ }
+
+ @Test
+ void bufferedSplittable_createDefault_sendsPartsImmediately() throws Exception {
+ // Create a body with known content length
+ byte[] data = new byte[10];
+ for (int i = 0; i < data.length; i++) {
+ data[i] = (byte) i;
+ }
+ AsyncRequestBody sourceBody = new AsyncRequestBody() {
+ @Override
+ public Optional contentLength() {
+ return Optional.of((long) data.length);
+ }
+
+ @Override
+ public void subscribe(Subscriber super ByteBuffer> s) {
+ s.onSubscribe(new Subscription() {
+ private boolean done = false;
+
+ @Override
+ public void request(long n) {
+ if (!done) {
+ done = true;
+ s.onNext(ByteBuffer.wrap(data));
+ s.onComplete();
+ }
+ }
+
+ @Override
+ public void cancel() {
+ }
+ });
+ }
+ };
+
+ // Use default create(body) - full buffering should be disabled
+ BufferedSplittableAsyncRequestBody bufferedBody = BufferedSplittableAsyncRequestBody.create(sourceBody);
+
+ // Verify content length is propagated
+ assertThat(bufferedBody.contentLength()).hasValue((long) data.length);
+
+ // Split with chunk size of 5 (two parts of 5 bytes each)
+ AsyncRequestBodySplitConfiguration splitConfig = AsyncRequestBodySplitConfiguration.builder()
+ .chunkSizeInBytes(5L)
+ .bufferSizeInBytes(20L)
+ .build();
+
+ SdkPublisher publisher = bufferedBody.splitCloseable(splitConfig);
+
+ // Track parts received
+ List> partFutures = new ArrayList<>();
+
+ CompletableFuture subscribeFuture = publisher.subscribe(requestBody -> {
+ CompletableFuture partFuture = new CompletableFuture<>();
+ partFutures.add(partFuture);
+ requestBody.subscribe(new BaosSubscriber(partFuture));
+ partFuture.whenComplete((r, t) -> requestBody.close());
+ });
+
+ subscribeFuture.get(5, TimeUnit.SECONDS);
+
+ // Verify we received 2 parts with correct data (existing behavior preserved)
+ assertThat(partFutures.size()).isEqualTo(2);
+
+ byte[] firstPart = partFutures.get(0).get(5, TimeUnit.SECONDS);
+ byte[] secondPart = partFutures.get(1).get(5, TimeUnit.SECONDS);
+
+ byte[] expectedFirst = new byte[]{0, 1, 2, 3, 4};
+ byte[] expectedSecond = new byte[]{5, 6, 7, 8, 9};
+
+ assertThat(firstPart).isEqualTo(expectedFirst);
+ assertThat(secondPart).isEqualTo(expectedSecond);
+ }
+
+ @Test
+ void bufferedSplittable_createWithFullBufferingTrue_partsAreRetryable() throws Exception {
+ // Verify that create(body, true) produces retryable sub-bodies (retry buffer is populated)
+ byte[] data = new byte[10];
+ for (int i = 0; i < data.length; i++) {
+ data[i] = (byte) i;
+ }
+ AsyncRequestBody sourceBody = AsyncRequestBody.fromBytes(data);
+
+ BufferedSplittableAsyncRequestBody bufferedBody = BufferedSplittableAsyncRequestBody.builder()
+ .asyncRequestBody(sourceBody)
+ .bufferBeforeSend(true)
+ .build();
+
+ AsyncRequestBodySplitConfiguration splitConfig = AsyncRequestBodySplitConfiguration.builder()
+ .chunkSizeInBytes(5L)
+ .bufferSizeInBytes(20L)
+ .build();
+
+ SdkPublisher publisher = bufferedBody.splitCloseable(splitConfig);
+
+ // Subscribe, read each part, then resubscribe to verify retry buffer is available
+ Map, CompletableFuture>> futures = new HashMap<>();
+ AtomicInteger index = new AtomicInteger();
+
+ publisher.subscribe(requestBody -> {
+ int i = index.getAndIncrement();
+ CompletableFuture firstRead = new CompletableFuture<>();
+ requestBody.subscribe(new BaosSubscriber(firstRead));
+
+ firstRead.whenComplete((r, t) -> {
+ // Resubscribe to verify retry works
+ CompletableFuture secondRead = new CompletableFuture<>();
+ requestBody.subscribe(new BaosSubscriber(secondRead));
+ futures.put(i, Pair.of(firstRead, secondRead));
+ secondRead.whenComplete((res, throwable) -> requestBody.close());
+ });
+ }).get(5, TimeUnit.SECONDS);
+
+ // Verify all parts can be re-read (retry buffer is populated before downstream subscription)
+ for (int i = 0; i < futures.size(); i++) {
+ byte[] firstReadData = futures.get(i).left().get(5, TimeUnit.SECONDS);
+ byte[] secondReadData = futures.get(i).right().get(5, TimeUnit.SECONDS);
+ assertThat(firstReadData).isEqualTo(secondReadData);
+ }
+ }
+
private static void verifySplitContent(AsyncRequestBody asyncRequestBody, int chunkSize) throws Exception {
SplittingPublisher splittingPublisher = SplittingPublisher.builder()
.asyncRequestBody(asyncRequestBody)
@@ -356,6 +556,393 @@ public void cancel() {
}
}
+ // ==================== Tests for bufferBeforeSend ====================
+
+ @Test
+ void bufferBeforeSend_knownContentLength_defersBodyUntilComplete() throws Exception {
+ // When bufferBeforeSend=true and content length is known, the downstream subscriber
+ // should NOT receive the body until completeCurrentBody() is invoked (i.e., after the part is fully buffered).
+ byte[] data = new byte[10];
+ for (int i = 0; i < data.length; i++) {
+ data[i] = (byte) i;
+ }
+
+ // Use a controlled upstream that sends data in pieces
+ ControlledAsyncRequestBody controlledBody = new ControlledAsyncRequestBody(Optional.of((long) data.length));
+
+ SplittingPublisher splittingPublisher = SplittingPublisher.builder()
+ .asyncRequestBody(controlledBody)
+ .splitConfiguration(AsyncRequestBodySplitConfiguration.builder()
+ .chunkSizeInBytes(10L)
+ .bufferSizeInBytes(20L)
+ .build())
+ .retryableSubAsyncRequestBodyEnabled(true)
+ .bufferBeforeSend(true)
+ .build();
+
+ List> receivedBodies = new ArrayList<>();
+ CompletableFuture subscribeFuture = splittingPublisher.subscribe(requestBody -> {
+ CompletableFuture bodyFuture = new CompletableFuture<>();
+ receivedBodies.add(bodyFuture);
+ BaosSubscriber subscriber = new BaosSubscriber(bodyFuture);
+ requestBody.subscribe(subscriber);
+ });
+
+ // Give time for subscription to be set up
+ Thread.sleep(100);
+
+ // Send partial data — the body should NOT have been emitted yet
+ controlledBody.sendData(ByteBuffer.wrap(data, 0, 5));
+ Thread.sleep(100);
+ assertThat(receivedBodies.size()).isEqualTo(0);
+
+ // Send remaining data and complete
+ controlledBody.sendData(ByteBuffer.wrap(data, 5, 5));
+ controlledBody.complete();
+
+ subscribeFuture.get(5, TimeUnit.SECONDS);
+
+ // Now the body should have been emitted and completed
+ assertThat(receivedBodies.size()).isEqualTo(1);
+ byte[] result = receivedBodies.get(0).get(5, TimeUnit.SECONDS);
+ assertThat(result).isEqualTo(data);
+ }
+
+ @Test
+ void bufferBeforeSendDisabled_knownContentLength_sendsImmediately() throws Exception {
+ // When bufferBeforeSend=false (default) and content length is known, the body should
+ // be sent to the downstream subscriber immediately upon initialization.
+ byte[] data = new byte[10];
+ for (int i = 0; i < data.length; i++) {
+ data[i] = (byte) i;
+ }
+
+ ControlledAsyncRequestBody controlledBody = new ControlledAsyncRequestBody(Optional.of((long) data.length));
+
+ SplittingPublisher splittingPublisher = SplittingPublisher.builder()
+ .asyncRequestBody(controlledBody)
+ .splitConfiguration(AsyncRequestBodySplitConfiguration.builder()
+ .chunkSizeInBytes(10L)
+ .bufferSizeInBytes(20L)
+ .build())
+ .retryableSubAsyncRequestBodyEnabled(true)
+ .bufferBeforeSend(false)
+ .build();
+
+ List> receivedBodies = new ArrayList<>();
+ splittingPublisher.subscribe(requestBody -> {
+ CompletableFuture bodyFuture = new CompletableFuture<>();
+ receivedBodies.add(bodyFuture);
+ BaosSubscriber subscriber = new BaosSubscriber(bodyFuture);
+ requestBody.subscribe(subscriber);
+ });
+
+ // Give time for subscription to be set up — the body should be sent immediately
+ Thread.sleep(100);
+ assertThat(receivedBodies.size()).isEqualTo(1);
+
+ // Now send data and complete
+ controlledBody.sendData(ByteBuffer.wrap(data));
+ controlledBody.complete();
+
+ byte[] result = receivedBodies.get(0).get(5, TimeUnit.SECONDS);
+ assertThat(result).isEqualTo(data);
+ }
+
+ @Test
+ void bufferBeforeSend_unknownContentLength_behaviorUnchanged() throws Exception {
+ // When content length is unknown, behavior is unchanged regardless of bufferBeforeSend.
+ // The body is always deferred until complete (existing behavior).
+ byte[] data = new byte[10];
+ for (int i = 0; i < data.length; i++) {
+ data[i] = (byte) i;
+ }
+
+ ControlledAsyncRequestBody controlledBody = new ControlledAsyncRequestBody(Optional.empty());
+
+ SplittingPublisher splittingPublisher = SplittingPublisher.builder()
+ .asyncRequestBody(controlledBody)
+ .splitConfiguration(AsyncRequestBodySplitConfiguration.builder()
+ .chunkSizeInBytes(10L)
+ .bufferSizeInBytes(20L)
+ .build())
+ .retryableSubAsyncRequestBodyEnabled(true)
+ .bufferBeforeSend(true)
+ .build();
+
+ List> receivedBodies = new ArrayList<>();
+ splittingPublisher.subscribe(requestBody -> {
+ CompletableFuture bodyFuture = new CompletableFuture<>();
+ receivedBodies.add(bodyFuture);
+ BaosSubscriber subscriber = new BaosSubscriber(bodyFuture);
+ requestBody.subscribe(subscriber);
+ });
+
+ // Give time for subscription to be set up
+ Thread.sleep(100);
+
+ // Send partial data — the body should NOT have been emitted yet (unknown-length path defers)
+ controlledBody.sendData(ByteBuffer.wrap(data, 0, 5));
+ Thread.sleep(100);
+ assertThat(receivedBodies.size()).isEqualTo(0);
+
+ // Send remaining data and complete
+ controlledBody.sendData(ByteBuffer.wrap(data, 5, 5));
+ controlledBody.complete();
+
+ // Now body should be emitted
+ Thread.sleep(200);
+ assertThat(receivedBodies.size()).isEqualTo(1);
+ byte[] result = receivedBodies.get(0).get(5, TimeUnit.SECONDS);
+ assertThat(result).isEqualTo(data);
+ }
+
+ @Test
+ void bufferBeforeSend_multiPart_allPartsDeferred() throws Exception {
+ // When splitting into multiple parts with bufferBeforeSend=true, all parts are deferred
+ // until fully buffered.
+ byte[] data = new byte[20];
+ for (int i = 0; i < data.length; i++) {
+ data[i] = (byte) i;
+ }
+
+ ControlledAsyncRequestBody controlledBody = new ControlledAsyncRequestBody(Optional.of((long) data.length));
+
+ SplittingPublisher splittingPublisher = SplittingPublisher.builder()
+ .asyncRequestBody(controlledBody)
+ .splitConfiguration(AsyncRequestBodySplitConfiguration.builder()
+ .chunkSizeInBytes(10L)
+ .bufferSizeInBytes(30L)
+ .build())
+ .retryableSubAsyncRequestBodyEnabled(true)
+ .bufferBeforeSend(true)
+ .build();
+
+ List> receivedBodies = new ArrayList<>();
+ CompletableFuture subscribeFuture = splittingPublisher.subscribe(requestBody -> {
+ CompletableFuture bodyFuture = new CompletableFuture<>();
+ receivedBodies.add(bodyFuture);
+ BaosSubscriber subscriber = new BaosSubscriber(bodyFuture);
+ requestBody.subscribe(subscriber);
+ });
+
+ // Give time for subscription
+ Thread.sleep(100);
+
+ // Send first chunk partially — no body should be emitted yet
+ controlledBody.sendData(ByteBuffer.wrap(data, 0, 5));
+ Thread.sleep(100);
+ assertThat(receivedBodies.size()).isEqualTo(0);
+
+ // Complete first chunk (10 bytes) — first body should now be emitted
+ controlledBody.sendData(ByteBuffer.wrap(data, 5, 5));
+ Thread.sleep(100);
+ assertThat(receivedBodies.size()).isEqualTo(1);
+
+ // Send second chunk partially — second body should not be emitted yet
+ controlledBody.sendData(ByteBuffer.wrap(data, 10, 5));
+ Thread.sleep(100);
+ assertThat(receivedBodies.size()).isEqualTo(1);
+
+ // Complete second chunk and signal upstream complete
+ controlledBody.sendData(ByteBuffer.wrap(data, 15, 5));
+ controlledBody.complete();
+
+ subscribeFuture.get(5, TimeUnit.SECONDS);
+
+ // Both bodies should now be emitted
+ assertThat(receivedBodies.size()).isEqualTo(2);
+
+ // Verify content of first part
+ byte[] firstPart = receivedBodies.get(0).get(5, TimeUnit.SECONDS);
+ byte[] expectedFirst = new byte[10];
+ System.arraycopy(data, 0, expectedFirst, 0, 10);
+ assertThat(firstPart).isEqualTo(expectedFirst);
+
+ // Verify content of second part
+ byte[] secondPart = receivedBodies.get(1).get(5, TimeUnit.SECONDS);
+ byte[] expectedSecond = new byte[10];
+ System.arraycopy(data, 10, expectedSecond, 0, 10);
+ assertThat(secondPart).isEqualTo(expectedSecond);
+ }
+
+ @Test
+ void bufferBeforeSend_upstreamError_doesNotSendIncompleteBody() throws Exception {
+ // When bufferBeforeSend=true and the upstream signals onError() before a part is fully buffered,
+ // the incomplete part body should NOT be sent downstream.
+ ControlledAsyncRequestBody controlledBody = new ControlledAsyncRequestBody(Optional.of(20L));
+
+ SplittingPublisher splittingPublisher = SplittingPublisher.builder()
+ .asyncRequestBody(controlledBody)
+ .splitConfiguration(AsyncRequestBodySplitConfiguration.builder()
+ .chunkSizeInBytes(10L)
+ .bufferSizeInBytes(20L)
+ .build())
+ .retryableSubAsyncRequestBodyEnabled(true)
+ .bufferBeforeSend(true)
+ .build();
+
+ List> receivedBodies = new ArrayList<>();
+ CompletableFuture downstreamError = new CompletableFuture<>();
+ splittingPublisher.subscribe(new Subscriber() {
+ @Override
+ public void onSubscribe(Subscription s) {
+ s.request(Long.MAX_VALUE);
+ }
+
+ @Override
+ public void onNext(CloseableAsyncRequestBody requestBody) {
+ CompletableFuture bodyFuture = new CompletableFuture<>();
+ receivedBodies.add(bodyFuture);
+ BaosSubscriber subscriber = new BaosSubscriber(bodyFuture);
+ requestBody.subscribe(subscriber);
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ downstreamError.complete(t);
+ }
+
+ @Override
+ public void onComplete() {
+ }
+ });
+
+ // Give time for subscription
+ Thread.sleep(100);
+
+ // Send partial data (less than chunk size of 10)
+ controlledBody.sendData(ByteBuffer.wrap(new byte[5]));
+ Thread.sleep(100);
+
+ // No body should have been emitted (bufferBeforeSend defers until complete)
+ assertThat(receivedBodies.size()).isEqualTo(0);
+
+ // Signal upstream error
+ RuntimeException error = new RuntimeException("upstream failure");
+ controlledBody.sendError(error);
+
+ // The error should propagate to downstream subscriber
+ Throwable receivedError = downstreamError.get(5, TimeUnit.SECONDS);
+ assertThat(receivedError).isEqualTo(error);
+
+ // The incomplete body should NOT have been sent downstream
+ assertThat(receivedBodies.size()).isEqualTo(0);
+ }
+
+ // ==================== Tests for bufferBeforeSend validation ====================
+
+ @Test
+ void bufferBeforeSend_bufferSizeLessThanChunkSize_throwsException() {
+ // When bufferBeforeSend=true and bufferSizeInBytes < chunkSizeInBytes, construction should fail
+ // because the buffer cannot hold a full chunk, which would cause a deadlock.
+ assertThatThrownBy(() -> SplittingPublisher.builder()
+ .asyncRequestBody(AsyncRequestBody.fromString("hello"))
+ .splitConfiguration(AsyncRequestBodySplitConfiguration.builder()
+ .chunkSizeInBytes(10L)
+ .bufferSizeInBytes(5L)
+ .build())
+ .retryableSubAsyncRequestBodyEnabled(true)
+ .bufferBeforeSend(true)
+ .build())
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("bufferSizeInBytes must be larger than or equal to chunkSizeInBytes");
+ }
+
+ @Test
+ void bufferBeforeSend_bufferSizeEqualToChunkSize_succeeds() {
+ // When bufferBeforeSend=true and bufferSizeInBytes == chunkSizeInBytes, construction should succeed.
+ SplittingPublisher publisher = SplittingPublisher.builder()
+ .asyncRequestBody(AsyncRequestBody.fromString("hello"))
+ .splitConfiguration(AsyncRequestBodySplitConfiguration.builder()
+ .chunkSizeInBytes(10L)
+ .bufferSizeInBytes(10L)
+ .build())
+ .retryableSubAsyncRequestBodyEnabled(true)
+ .bufferBeforeSend(true)
+ .build();
+ assertThat(publisher).isNotNull();
+ }
+
+ @Test
+ void bufferBeforeSendDisabled_bufferSizeLessThanChunkSize_knownContentLength_succeeds() {
+ // When bufferBeforeSend=false and content length is known, bufferSizeInBytes < chunkSizeInBytes
+ // is allowed because data flows through without full buffering.
+ SplittingPublisher publisher = SplittingPublisher.builder()
+ .asyncRequestBody(AsyncRequestBody.fromString("hello"))
+ .splitConfiguration(AsyncRequestBodySplitConfiguration.builder()
+ .chunkSizeInBytes(10L)
+ .bufferSizeInBytes(5L)
+ .build())
+ .retryableSubAsyncRequestBodyEnabled(true)
+ .bufferBeforeSend(false)
+ .build();
+ assertThat(publisher).isNotNull();
+ }
+
+ @Test
+ void unknownContentLength_bufferSizeLessThanChunkSize_throwsException() {
+ // Existing behavior: unknown content length with bufferSizeInBytes < chunkSizeInBytes should fail.
+ ControlledAsyncRequestBody unknownLengthBody = new ControlledAsyncRequestBody(Optional.empty());
+ assertThatThrownBy(() -> SplittingPublisher.builder()
+ .asyncRequestBody(unknownLengthBody)
+ .splitConfiguration(AsyncRequestBodySplitConfiguration.builder()
+ .chunkSizeInBytes(10L)
+ .bufferSizeInBytes(5L)
+ .build())
+ .retryableSubAsyncRequestBodyEnabled(true)
+ .bufferBeforeSend(false)
+ .build())
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("bufferSizeInBytes must be larger than or equal to chunkSizeInBytes");
+ }
+
+ /**
+ * A controlled AsyncRequestBody that allows tests to send data, complete, and signal errors
+ * at specific times to test deferred/immediate behavior.
+ */
+ private static class ControlledAsyncRequestBody implements AsyncRequestBody {
+ private final Optional contentLength;
+ private volatile Subscriber super ByteBuffer> subscriber;
+ private volatile Subscription subscription;
+
+ ControlledAsyncRequestBody(Optional contentLength) {
+ this.contentLength = contentLength;
+ }
+
+ @Override
+ public Optional contentLength() {
+ return contentLength;
+ }
+
+ @Override
+ public void subscribe(Subscriber super ByteBuffer> s) {
+ this.subscriber = s;
+ s.onSubscribe(new Subscription() {
+ @Override
+ public void request(long n) {
+ // Controlled — data is sent explicitly via sendData()
+ }
+
+ @Override
+ public void cancel() {
+ }
+ });
+ }
+
+ void sendData(ByteBuffer data) {
+ subscriber.onNext(data);
+ }
+
+ void complete() {
+ subscriber.onComplete();
+ }
+
+ void sendError(Throwable t) {
+ subscriber.onError(t);
+ }
+ }
+
private static final class BaosSubscriber implements Subscriber {
private final CompletableFuture resultFuture;
diff --git a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/S3MultipartClientPutObjectWiremockTest.java b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/S3MultipartClientPutObjectWiremockTest.java
index 815abd982ef..42bb4f715c8 100644
--- a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/S3MultipartClientPutObjectWiremockTest.java
+++ b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/S3MultipartClientPutObjectWiremockTest.java
@@ -27,6 +27,7 @@
import static com.github.tomakehurst.wiremock.client.WireMock.stubFor;
import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
import static com.github.tomakehurst.wiremock.client.WireMock.verify;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import com.github.tomakehurst.wiremock.client.ResponseDefinitionBuilder;
@@ -52,6 +53,7 @@
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.core.SdkBytes;
@@ -241,5 +243,78 @@ public int read() {
}
};
}
+
+ /**
+ * Verifies that with full buffering enabled, a slow-streaming body with known content length
+ * can successfully retry a failed part upload. This simulates the SFTP scenario where data
+ * arrives slowly and the retry buffer must be populated before the HTTP layer subscribes.
+ */
+ @Test
+ void mpuWithBufferBeforeSend_slowStreamingKnownLength_retriesSuccessfullyOn500() {
+ // Stub CreateMultipartUpload (POST) and CompleteMultipartUpload (POST)
+ stubFor(post(anyUrl()).willReturn(aResponse().withStatus(200).withBody(CREATE_MULTIPART_PAYLOAD)));
+ // Part 1: first attempt returns 500, retry returns 200
+ stubUploadFailsInitialAttemptCalls(1, aResponse().withStatus(500));
+ // Part 2: succeeds on first attempt
+ stubFor(put(anyUrl())
+ .withQueryParam("partNumber", matching(String.valueOf(2)))
+ .willReturn(aResponse().withStatus(200)));
+
+ // Create a slow-streaming body with KNOWN content length (20 bytes total = 2 parts of 10 bytes)
+ int partSize = 10;
+ int totalSize = partSize * 2;
+ AsyncRequestBody slowStreamingBody = new AsyncRequestBody() {
+ @Override
+ public Optional contentLength() {
+ return Optional.of((long) totalSize);
+ }
+
+ @Override
+ public void subscribe(Subscriber super ByteBuffer> subscriber) {
+ subscriber.onSubscribe(new Subscription() {
+ private int bytesEmitted = 0;
+
+ @Override
+ public void request(long n) {
+ // Emit data in small chunks to simulate slow streaming (like SFTP)
+ for (long i = 0; i < n && bytesEmitted < totalSize; i++) {
+ int chunkSize = Math.min(5, totalSize - bytesEmitted);
+ ByteBuffer buffer = ByteBuffer.allocate(chunkSize);
+ for (int j = 0; j < chunkSize; j++) {
+ buffer.put((byte) ('a' + (bytesEmitted + j) % 26));
+ }
+ buffer.flip();
+ bytesEmitted += chunkSize;
+ subscriber.onNext(buffer);
+ }
+ if (bytesEmitted >= totalSize) {
+ subscriber.onComplete();
+ }
+ }
+
+ @Override
+ public void cancel() {
+ // no-op
+ }
+ });
+ }
+ };
+
+ // Wrap with full buffering enabled — this ensures retry buffer is populated before HTTP subscribe
+ BufferedSplittableAsyncRequestBody bufferedBody =
+ BufferedSplittableAsyncRequestBody.builder()
+ .asyncRequestBody(slowStreamingBody)
+ .bufferBeforeSend(true)
+ .build();
+
+ // The upload should complete successfully — retry works because full buffering
+ // ensures the retry buffer is populated before the HTTP layer subscribes
+ PutObjectResponse response = s3AsyncClient.putObject(b -> b.bucket(BUCKET).key(KEY), bufferedBody).join();
+ assertThat(response).isNotNull();
+
+ // Verify part 1 was attempted more than once (retry happened)
+ verify(moreThan(1), putRequestedFor(anyUrl()).withQueryParam("partNumber", matching(String.valueOf(1))));
+ verify(lessThanOrExactly(3), putRequestedFor(anyUrl()).withQueryParam("partNumber", matching(String.valueOf(1))));
+ }
}