From 6bf8c29c9e2761ca82e3749be7ba725096a2c8ef Mon Sep 17 00:00:00 2001 From: Alex Woods Date: Thu, 11 Jun 2026 09:37:03 -0700 Subject: [PATCH 1/4] Add fullBufferingEnabled opt-in option to BufferedSplittableAsyncRequestBody --- .../BufferedSplittableAsyncRequestBody.java | 30 +- .../internal/async/SplittingPublisher.java | 17 +- .../async/SplittingPublisherTest.java | 514 ++++++++++++++++++ ...3MultipartClientPutObjectWiremockTest.java | 72 +++ 4 files changed, 629 insertions(+), 4 deletions(-) diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/BufferedSplittableAsyncRequestBody.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/BufferedSplittableAsyncRequestBody.java index a1c46238dfe..2c8478a2e39 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/BufferedSplittableAsyncRequestBody.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/BufferedSplittableAsyncRequestBody.java @@ -54,21 +54,46 @@ @SdkPublicApi public final class BufferedSplittableAsyncRequestBody implements AsyncRequestBody { private final AsyncRequestBody delegate; + private final boolean fullBufferingEnabled; - private BufferedSplittableAsyncRequestBody(AsyncRequestBody delegate) { + private BufferedSplittableAsyncRequestBody(AsyncRequestBody delegate, boolean fullBufferingEnabled) { this.delegate = delegate; + this.fullBufferingEnabled = fullBufferingEnabled; } /** * Creates a new {@link BufferedSplittableAsyncRequestBody} that wraps the provided {@link AsyncRequestBody}. * + *

Full buffering is disabled by default. Each part is sent to the downstream subscriber immediately + * upon initialization in the known-content-length path (existing behavior). + * * @param delegate the {@link AsyncRequestBody} to wrap and make retryable. Must not be null. * @return a new {@link BufferedSplittableAsyncRequestBody} instance * @throws NullPointerException if delegate is null */ public static BufferedSplittableAsyncRequestBody create(AsyncRequestBody delegate) { Validate.paramNotNull(delegate, "delegate"); - return new BufferedSplittableAsyncRequestBody(delegate); + return new BufferedSplittableAsyncRequestBody(delegate, false); + } + + /** + * Creates a new {@link BufferedSplittableAsyncRequestBody} that wraps the provided {@link AsyncRequestBody}, + * with an option to enable full buffering before sending parts downstream. + * + *

When {@code fullBufferingEnabled} is {@code true}, each part is fully buffered before being sent to the + * downstream subscriber. This guarantees that the retry buffer is always populated before the HTTP layer + * subscribes, making per-part retry deterministically successful for slow streaming sources (e.g., SFTP). + * + *

When {@code fullBufferingEnabled} is {@code false}, behavior is identical to {@link #create(AsyncRequestBody)}. + * + * @param delegate the {@link AsyncRequestBody} to wrap and make retryable. Must not be null. + * @param fullBufferingEnabled whether to enable full buffering before sending parts downstream + * @return a new {@link BufferedSplittableAsyncRequestBody} instance + * @throws NullPointerException if delegate is null + */ + public static BufferedSplittableAsyncRequestBody create(AsyncRequestBody delegate, boolean fullBufferingEnabled) { + Validate.paramNotNull(delegate, "delegate"); + return new BufferedSplittableAsyncRequestBody(delegate, fullBufferingEnabled); } @Override @@ -98,6 +123,7 @@ public SdkPublisher splitCloseable(AsyncRequestBodySp .asyncRequestBody(this) .splitConfiguration(splitConfiguration) .retryableSubAsyncRequestBodyEnabled(true) + .fullBufferingEnabled(fullBufferingEnabled) .build(); } diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingPublisher.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingPublisher.java index 82e749c14cc..4a167ee1171 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingPublisher.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingPublisher.java @@ -46,6 +46,7 @@ public class SplittingPublisher implements SdkPublisher= chunkSizeInBytes, @@ -136,7 +138,7 @@ private SubAsyncRequestBody initializeNextDownstreamBody(boolean contentLengthKn } currentBodySent.set(false); - if (contentLengthKnown) { + if (contentLengthKnown && !fullBufferingEnabled) { sendCurrentBody(body); } return body; @@ -234,7 +236,7 @@ private void completeCurrentBody() { // Current body could be completed in either onNext or onComplete, so we need to guard against sending the last body // twice. 
- if (upstreamSize == null && currentBodySent.compareAndSet(false, true)) { + if ((upstreamSize == null || fullBufferingEnabled) && currentBodySent.compareAndSet(false, true)) { sendCurrentBody(currentBody); } } @@ -307,6 +309,7 @@ public static final class Builder { private AsyncRequestBody asyncRequestBody; private AsyncRequestBodySplitConfiguration splitConfiguration; private Boolean retryableSubAsyncRequestBodyEnabled; + private boolean fullBufferingEnabled = false; private Builder() { } @@ -335,6 +338,16 @@ public Builder retryableSubAsyncRequestBodyEnabled(Boolean retryableSubAsyncRequ return this; } + /** + * Sets whether to enable full buffering before sending parts downstream. + * When enabled, parts are only sent to the downstream subscriber after + * all data for that part has been received and complete() has been called. + */ + public Builder fullBufferingEnabled(boolean fullBufferingEnabled) { + this.fullBufferingEnabled = fullBufferingEnabled; + return this; + } + /** * Builds a {@link SplittingPublisher} object based on the values held by this builder. 
*/ diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/SplittingPublisherTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/SplittingPublisherTest.java index 39326ac4167..42cc9f3e6fb 100644 --- a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/SplittingPublisherTest.java +++ b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/SplittingPublisherTest.java @@ -48,6 +48,9 @@ import org.reactivestreams.Subscription; import software.amazon.awssdk.core.async.AsyncRequestBody; import software.amazon.awssdk.core.async.AsyncRequestBodySplitConfiguration; +import software.amazon.awssdk.core.async.BufferedSplittableAsyncRequestBody; +import software.amazon.awssdk.core.async.CloseableAsyncRequestBody; +import software.amazon.awssdk.core.async.SdkPublisher; import software.amazon.awssdk.utils.BinaryUtils; import software.amazon.awssdk.utils.Pair; @@ -298,6 +301,197 @@ public void onComplete() { assertThat(error).isEqualTo(upstreamError); } + @Test + void bufferedSplittable_createWithFullBufferingTrue_defersPartEmission() throws Exception { + // Create a controlled async request body with known content length that delivers data in two chunks + byte[] data = new byte[10]; + for (int i = 0; i < data.length; i++) { + data[i] = (byte) i; + } + AsyncRequestBody sourceBody = new AsyncRequestBody() { + @Override + public Optional contentLength() { + return Optional.of((long) data.length); + } + + @Override + public void subscribe(Subscriber s) { + s.onSubscribe(new Subscription() { + private boolean done = false; + + @Override + public void request(long n) { + if (!done) { + done = true; + s.onNext(ByteBuffer.wrap(data)); + s.onComplete(); + } + } + + @Override + public void cancel() { + } + }); + } + }; + + // Use create(body, true) to enable full buffering + BufferedSplittableAsyncRequestBody bufferedBody = BufferedSplittableAsyncRequestBody.create(sourceBody, true); + + // 
Verify that content length is propagated + assertThat(bufferedBody.contentLength()).hasValue((long) data.length); + + // Split with a chunk size of 5 (two parts of 5 bytes each) + AsyncRequestBodySplitConfiguration splitConfig = AsyncRequestBodySplitConfiguration.builder() + .chunkSizeInBytes(5L) + .bufferSizeInBytes(20L) + .build(); + + SdkPublisher publisher = bufferedBody.splitCloseable(splitConfig); + + // Track when parts are received by the downstream subscriber + List> partFutures = new ArrayList<>(); + + CompletableFuture subscribeFuture = publisher.subscribe(requestBody -> { + // Each part should arrive with data already available (fully buffered) + CompletableFuture partFuture = new CompletableFuture<>(); + partFutures.add(partFuture); + requestBody.subscribe(new BaosSubscriber(partFuture)); + partFuture.whenComplete((r, t) -> requestBody.close()); + }); + + subscribeFuture.get(5, TimeUnit.SECONDS); + + // Verify we received 2 parts with correct data + assertThat(partFutures.size()).isEqualTo(2); + + byte[] firstPart = partFutures.get(0).get(5, TimeUnit.SECONDS); + byte[] secondPart = partFutures.get(1).get(5, TimeUnit.SECONDS); + + byte[] expectedFirst = new byte[]{0, 1, 2, 3, 4}; + byte[] expectedSecond = new byte[]{5, 6, 7, 8, 9}; + + assertThat(firstPart).isEqualTo(expectedFirst); + assertThat(secondPart).isEqualTo(expectedSecond); + } + + @Test + void bufferedSplittable_createDefault_sendsPartsImmediately() throws Exception { + // Create a body with known content length + byte[] data = new byte[10]; + for (int i = 0; i < data.length; i++) { + data[i] = (byte) i; + } + AsyncRequestBody sourceBody = new AsyncRequestBody() { + @Override + public Optional contentLength() { + return Optional.of((long) data.length); + } + + @Override + public void subscribe(Subscriber s) { + s.onSubscribe(new Subscription() { + private boolean done = false; + + @Override + public void request(long n) { + if (!done) { + done = true; + s.onNext(ByteBuffer.wrap(data)); + 
s.onComplete(); + } + } + + @Override + public void cancel() { + } + }); + } + }; + + // Use default create(body) - full buffering should be disabled + BufferedSplittableAsyncRequestBody bufferedBody = BufferedSplittableAsyncRequestBody.create(sourceBody); + + // Verify content length is propagated + assertThat(bufferedBody.contentLength()).hasValue((long) data.length); + + // Split with chunk size of 5 (two parts of 5 bytes each) + AsyncRequestBodySplitConfiguration splitConfig = AsyncRequestBodySplitConfiguration.builder() + .chunkSizeInBytes(5L) + .bufferSizeInBytes(20L) + .build(); + + SdkPublisher publisher = bufferedBody.splitCloseable(splitConfig); + + // Track parts received + List> partFutures = new ArrayList<>(); + + CompletableFuture subscribeFuture = publisher.subscribe(requestBody -> { + CompletableFuture partFuture = new CompletableFuture<>(); + partFutures.add(partFuture); + requestBody.subscribe(new BaosSubscriber(partFuture)); + partFuture.whenComplete((r, t) -> requestBody.close()); + }); + + subscribeFuture.get(5, TimeUnit.SECONDS); + + // Verify we received 2 parts with correct data (existing behavior preserved) + assertThat(partFutures.size()).isEqualTo(2); + + byte[] firstPart = partFutures.get(0).get(5, TimeUnit.SECONDS); + byte[] secondPart = partFutures.get(1).get(5, TimeUnit.SECONDS); + + byte[] expectedFirst = new byte[]{0, 1, 2, 3, 4}; + byte[] expectedSecond = new byte[]{5, 6, 7, 8, 9}; + + assertThat(firstPart).isEqualTo(expectedFirst); + assertThat(secondPart).isEqualTo(expectedSecond); + } + + @Test + void bufferedSplittable_createWithFullBufferingTrue_partsAreRetryable() throws Exception { + // Verify that create(body, true) produces retryable sub-bodies (retry buffer is populated) + byte[] data = new byte[10]; + for (int i = 0; i < data.length; i++) { + data[i] = (byte) i; + } + AsyncRequestBody sourceBody = AsyncRequestBody.fromBytes(data); + + BufferedSplittableAsyncRequestBody bufferedBody = 
BufferedSplittableAsyncRequestBody.create(sourceBody, true); + + AsyncRequestBodySplitConfiguration splitConfig = AsyncRequestBodySplitConfiguration.builder() + .chunkSizeInBytes(5L) + .bufferSizeInBytes(20L) + .build(); + + SdkPublisher publisher = bufferedBody.splitCloseable(splitConfig); + + // Subscribe, read each part, then resubscribe to verify retry buffer is available + Map, CompletableFuture>> futures = new HashMap<>(); + AtomicInteger index = new AtomicInteger(); + + publisher.subscribe(requestBody -> { + int i = index.getAndIncrement(); + CompletableFuture firstRead = new CompletableFuture<>(); + requestBody.subscribe(new BaosSubscriber(firstRead)); + + firstRead.whenComplete((r, t) -> { + // Resubscribe to verify retry works + CompletableFuture secondRead = new CompletableFuture<>(); + requestBody.subscribe(new BaosSubscriber(secondRead)); + futures.put(i, Pair.of(firstRead, secondRead)); + secondRead.whenComplete((res, throwable) -> requestBody.close()); + }); + }).get(5, TimeUnit.SECONDS); + + // Verify all parts can be re-read (retry buffer is populated before downstream subscription) + for (int i = 0; i < futures.size(); i++) { + byte[] firstReadData = futures.get(i).left().get(5, TimeUnit.SECONDS); + byte[] secondReadData = futures.get(i).right().get(5, TimeUnit.SECONDS); + assertThat(firstReadData).isEqualTo(secondReadData); + } + } + private static void verifySplitContent(AsyncRequestBody asyncRequestBody, int chunkSize) throws Exception { SplittingPublisher splittingPublisher = SplittingPublisher.builder() .asyncRequestBody(asyncRequestBody) @@ -356,6 +550,326 @@ public void cancel() { } } + // ==================== Tests for fullBufferingEnabled ==================== + + @Test + void fullBufferingEnabled_knownContentLength_defersBodyUntilComplete() throws Exception { + // When fullBufferingEnabled=true and content length is known, the downstream subscriber + // should NOT receive the body until completeCurrentBody() is invoked (i.e., after the 
part is fully buffered). + byte[] data = new byte[10]; + for (int i = 0; i < data.length; i++) { + data[i] = (byte) i; + } + + // Use a controlled upstream that sends data in pieces + ControlledAsyncRequestBody controlledBody = new ControlledAsyncRequestBody(Optional.of((long) data.length)); + + SplittingPublisher splittingPublisher = SplittingPublisher.builder() + .asyncRequestBody(controlledBody) + .splitConfiguration(AsyncRequestBodySplitConfiguration.builder() + .chunkSizeInBytes(10L) + .bufferSizeInBytes(20L) + .build()) + .retryableSubAsyncRequestBodyEnabled(true) + .fullBufferingEnabled(true) + .build(); + + List> receivedBodies = new ArrayList<>(); + CompletableFuture subscribeFuture = splittingPublisher.subscribe(requestBody -> { + CompletableFuture bodyFuture = new CompletableFuture<>(); + receivedBodies.add(bodyFuture); + BaosSubscriber subscriber = new BaosSubscriber(bodyFuture); + requestBody.subscribe(subscriber); + }); + + // Give time for subscription to be set up + Thread.sleep(100); + + // Send partial data — the body should NOT have been emitted yet + controlledBody.sendData(ByteBuffer.wrap(data, 0, 5)); + Thread.sleep(100); + assertThat(receivedBodies.size()).isEqualTo(0); + + // Send remaining data and complete + controlledBody.sendData(ByteBuffer.wrap(data, 5, 5)); + controlledBody.complete(); + + subscribeFuture.get(5, TimeUnit.SECONDS); + + // Now the body should have been emitted and completed + assertThat(receivedBodies.size()).isEqualTo(1); + byte[] result = receivedBodies.get(0).get(5, TimeUnit.SECONDS); + assertThat(result).isEqualTo(data); + } + + @Test + void fullBufferingDisabled_knownContentLength_sendsImmediately() throws Exception { + // When fullBufferingEnabled=false (default) and content length is known, the body should + // be sent to the downstream subscriber immediately upon initialization. 
+ byte[] data = new byte[10]; + for (int i = 0; i < data.length; i++) { + data[i] = (byte) i; + } + + ControlledAsyncRequestBody controlledBody = new ControlledAsyncRequestBody(Optional.of((long) data.length)); + + SplittingPublisher splittingPublisher = SplittingPublisher.builder() + .asyncRequestBody(controlledBody) + .splitConfiguration(AsyncRequestBodySplitConfiguration.builder() + .chunkSizeInBytes(10L) + .bufferSizeInBytes(20L) + .build()) + .retryableSubAsyncRequestBodyEnabled(true) + .fullBufferingEnabled(false) + .build(); + + List> receivedBodies = new ArrayList<>(); + splittingPublisher.subscribe(requestBody -> { + CompletableFuture bodyFuture = new CompletableFuture<>(); + receivedBodies.add(bodyFuture); + BaosSubscriber subscriber = new BaosSubscriber(bodyFuture); + requestBody.subscribe(subscriber); + }); + + // Give time for subscription to be set up — the body should be sent immediately + Thread.sleep(100); + assertThat(receivedBodies.size()).isEqualTo(1); + + // Now send data and complete + controlledBody.sendData(ByteBuffer.wrap(data)); + controlledBody.complete(); + + byte[] result = receivedBodies.get(0).get(5, TimeUnit.SECONDS); + assertThat(result).isEqualTo(data); + } + + @Test + void fullBufferingEnabled_unknownContentLength_behaviorUnchanged() throws Exception { + // When content length is unknown, behavior is unchanged regardless of fullBufferingEnabled. + // The body is always deferred until complete (existing behavior). 
+ byte[] data = new byte[10]; + for (int i = 0; i < data.length; i++) { + data[i] = (byte) i; + } + + ControlledAsyncRequestBody controlledBody = new ControlledAsyncRequestBody(Optional.empty()); + + SplittingPublisher splittingPublisher = SplittingPublisher.builder() + .asyncRequestBody(controlledBody) + .splitConfiguration(AsyncRequestBodySplitConfiguration.builder() + .chunkSizeInBytes(10L) + .bufferSizeInBytes(20L) + .build()) + .retryableSubAsyncRequestBodyEnabled(true) + .fullBufferingEnabled(true) + .build(); + + List> receivedBodies = new ArrayList<>(); + splittingPublisher.subscribe(requestBody -> { + CompletableFuture bodyFuture = new CompletableFuture<>(); + receivedBodies.add(bodyFuture); + BaosSubscriber subscriber = new BaosSubscriber(bodyFuture); + requestBody.subscribe(subscriber); + }); + + // Give time for subscription to be set up + Thread.sleep(100); + + // Send partial data — the body should NOT have been emitted yet (unknown-length path defers) + controlledBody.sendData(ByteBuffer.wrap(data, 0, 5)); + Thread.sleep(100); + assertThat(receivedBodies.size()).isEqualTo(0); + + // Send remaining data and complete + controlledBody.sendData(ByteBuffer.wrap(data, 5, 5)); + controlledBody.complete(); + + // Now body should be emitted + Thread.sleep(200); + assertThat(receivedBodies.size()).isEqualTo(1); + byte[] result = receivedBodies.get(0).get(5, TimeUnit.SECONDS); + assertThat(result).isEqualTo(data); + } + + @Test + void fullBufferingEnabled_multiPart_allPartsDeferred() throws Exception { + // When splitting into multiple parts with fullBufferingEnabled=true, all parts are deferred + // until fully buffered. 
+ byte[] data = new byte[20]; + for (int i = 0; i < data.length; i++) { + data[i] = (byte) i; + } + + ControlledAsyncRequestBody controlledBody = new ControlledAsyncRequestBody(Optional.of((long) data.length)); + + SplittingPublisher splittingPublisher = SplittingPublisher.builder() + .asyncRequestBody(controlledBody) + .splitConfiguration(AsyncRequestBodySplitConfiguration.builder() + .chunkSizeInBytes(10L) + .bufferSizeInBytes(30L) + .build()) + .retryableSubAsyncRequestBodyEnabled(true) + .fullBufferingEnabled(true) + .build(); + + List> receivedBodies = new ArrayList<>(); + CompletableFuture subscribeFuture = splittingPublisher.subscribe(requestBody -> { + CompletableFuture bodyFuture = new CompletableFuture<>(); + receivedBodies.add(bodyFuture); + BaosSubscriber subscriber = new BaosSubscriber(bodyFuture); + requestBody.subscribe(subscriber); + }); + + // Give time for subscription + Thread.sleep(100); + + // Send first chunk partially — no body should be emitted yet + controlledBody.sendData(ByteBuffer.wrap(data, 0, 5)); + Thread.sleep(100); + assertThat(receivedBodies.size()).isEqualTo(0); + + // Complete first chunk (10 bytes) — first body should now be emitted + controlledBody.sendData(ByteBuffer.wrap(data, 5, 5)); + Thread.sleep(100); + assertThat(receivedBodies.size()).isEqualTo(1); + + // Send second chunk partially — second body should not be emitted yet + controlledBody.sendData(ByteBuffer.wrap(data, 10, 5)); + Thread.sleep(100); + assertThat(receivedBodies.size()).isEqualTo(1); + + // Complete second chunk and signal upstream complete + controlledBody.sendData(ByteBuffer.wrap(data, 15, 5)); + controlledBody.complete(); + + subscribeFuture.get(5, TimeUnit.SECONDS); + + // Both bodies should now be emitted + assertThat(receivedBodies.size()).isEqualTo(2); + + // Verify content of first part + byte[] firstPart = receivedBodies.get(0).get(5, TimeUnit.SECONDS); + byte[] expectedFirst = new byte[10]; + System.arraycopy(data, 0, expectedFirst, 0, 10); + 
assertThat(firstPart).isEqualTo(expectedFirst); + + // Verify content of second part + byte[] secondPart = receivedBodies.get(1).get(5, TimeUnit.SECONDS); + byte[] expectedSecond = new byte[10]; + System.arraycopy(data, 10, expectedSecond, 0, 10); + assertThat(secondPart).isEqualTo(expectedSecond); + } + + @Test + void fullBufferingEnabled_upstreamError_doesNotSendIncompleteBody() throws Exception { + // When fullBufferingEnabled=true and the upstream signals onError() before a part is fully buffered, + // the incomplete part body should NOT be sent downstream. + ControlledAsyncRequestBody controlledBody = new ControlledAsyncRequestBody(Optional.of(20L)); + + SplittingPublisher splittingPublisher = SplittingPublisher.builder() + .asyncRequestBody(controlledBody) + .splitConfiguration(AsyncRequestBodySplitConfiguration.builder() + .chunkSizeInBytes(10L) + .bufferSizeInBytes(20L) + .build()) + .retryableSubAsyncRequestBodyEnabled(true) + .fullBufferingEnabled(true) + .build(); + + List> receivedBodies = new ArrayList<>(); + CompletableFuture downstreamError = new CompletableFuture<>(); + splittingPublisher.subscribe(new Subscriber() { + @Override + public void onSubscribe(Subscription s) { + s.request(Long.MAX_VALUE); + } + + @Override + public void onNext(CloseableAsyncRequestBody requestBody) { + CompletableFuture bodyFuture = new CompletableFuture<>(); + receivedBodies.add(bodyFuture); + BaosSubscriber subscriber = new BaosSubscriber(bodyFuture); + requestBody.subscribe(subscriber); + } + + @Override + public void onError(Throwable t) { + downstreamError.complete(t); + } + + @Override + public void onComplete() { + } + }); + + // Give time for subscription + Thread.sleep(100); + + // Send partial data (less than chunk size of 10) + controlledBody.sendData(ByteBuffer.wrap(new byte[5])); + Thread.sleep(100); + + // No body should have been emitted (fullBufferingEnabled defers until complete) + assertThat(receivedBodies.size()).isEqualTo(0); + + // Signal upstream 
error + RuntimeException error = new RuntimeException("upstream failure"); + controlledBody.sendError(error); + + // The error should propagate to downstream subscriber + Throwable receivedError = downstreamError.get(5, TimeUnit.SECONDS); + assertThat(receivedError).isEqualTo(error); + + // The incomplete body should NOT have been sent downstream + assertThat(receivedBodies.size()).isEqualTo(0); + } + + /** + * A controlled AsyncRequestBody that allows tests to send data, complete, and signal errors + * at specific times to test deferred/immediate behavior. + */ + private static class ControlledAsyncRequestBody implements AsyncRequestBody { + private final Optional contentLength; + private volatile Subscriber subscriber; + private volatile Subscription subscription; + + ControlledAsyncRequestBody(Optional contentLength) { + this.contentLength = contentLength; + } + + @Override + public Optional contentLength() { + return contentLength; + } + + @Override + public void subscribe(Subscriber s) { + this.subscriber = s; + s.onSubscribe(new Subscription() { + @Override + public void request(long n) { + // Controlled — data is sent explicitly via sendData() + } + + @Override + public void cancel() { + } + }); + } + + void sendData(ByteBuffer data) { + subscriber.onNext(data); + } + + void complete() { + subscriber.onComplete(); + } + + void sendError(Throwable t) { + subscriber.onError(t); + } + } + private static final class BaosSubscriber implements Subscriber { private final CompletableFuture resultFuture; diff --git a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/S3MultipartClientPutObjectWiremockTest.java b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/S3MultipartClientPutObjectWiremockTest.java index 815abd982ef..0f9bec28054 100644 --- a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/S3MultipartClientPutObjectWiremockTest.java +++ 
b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/S3MultipartClientPutObjectWiremockTest.java @@ -27,6 +27,7 @@ import static com.github.tomakehurst.wiremock.client.WireMock.stubFor; import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo; import static com.github.tomakehurst.wiremock.client.WireMock.verify; +import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import com.github.tomakehurst.wiremock.client.ResponseDefinitionBuilder; @@ -52,6 +53,7 @@ import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; import software.amazon.awssdk.core.SdkBytes; @@ -241,5 +243,75 @@ public int read() { } }; } + + /** + * Verifies that with full buffering enabled, a slow-streaming body with known content length + * can successfully retry a failed part upload. This simulates the SFTP scenario where data + * arrives slowly and the retry buffer must be populated before the HTTP layer subscribes. 
+ */ + @Test + void mpuWithFullBufferingEnabled_slowStreamingKnownLength_retriesSuccessfullyOn500() { + // Stub CreateMultipartUpload (POST) and CompleteMultipartUpload (POST) + stubFor(post(anyUrl()).willReturn(aResponse().withStatus(200).withBody(CREATE_MULTIPART_PAYLOAD))); + // Part 1: first attempt returns 500, retry returns 200 + stubUploadFailsInitialAttemptCalls(1, aResponse().withStatus(500)); + // Part 2: succeeds on first attempt + stubFor(put(anyUrl()) + .withQueryParam("partNumber", matching(String.valueOf(2))) + .willReturn(aResponse().withStatus(200))); + + // Create a slow-streaming body with KNOWN content length (20 bytes total = 2 parts of 10 bytes) + int partSize = 10; + int totalSize = partSize * 2; + AsyncRequestBody slowStreamingBody = new AsyncRequestBody() { + @Override + public Optional contentLength() { + return Optional.of((long) totalSize); + } + + @Override + public void subscribe(Subscriber subscriber) { + subscriber.onSubscribe(new Subscription() { + private int bytesEmitted = 0; + + @Override + public void request(long n) { + // Emit data in small chunks to simulate slow streaming (like SFTP) + for (long i = 0; i < n && bytesEmitted < totalSize; i++) { + int chunkSize = Math.min(5, totalSize - bytesEmitted); + ByteBuffer buffer = ByteBuffer.allocate(chunkSize); + for (int j = 0; j < chunkSize; j++) { + buffer.put((byte) ('a' + (bytesEmitted + j) % 26)); + } + buffer.flip(); + bytesEmitted += chunkSize; + subscriber.onNext(buffer); + } + if (bytesEmitted >= totalSize) { + subscriber.onComplete(); + } + } + + @Override + public void cancel() { + // no-op + } + }); + } + }; + + // Wrap with full buffering enabled — this ensures retry buffer is populated before HTTP subscribe + BufferedSplittableAsyncRequestBody bufferedBody = + BufferedSplittableAsyncRequestBody.create(slowStreamingBody, true); + + // The upload should complete successfully — retry works because full buffering + // ensures the retry buffer is populated before the HTTP 
layer subscribes + PutObjectResponse response = s3AsyncClient.putObject(b -> b.bucket(BUCKET).key(KEY), bufferedBody).join(); + assertThat(response).isNotNull(); + + // Verify part 1 was attempted more than once (retry happened) + verify(moreThan(1), putRequestedFor(anyUrl()).withQueryParam("partNumber", matching(String.valueOf(1)))); + verify(lessThanOrExactly(3), putRequestedFor(anyUrl()).withQueryParam("partNumber", matching(String.valueOf(1)))); + } } From 0dae378240a31dc1491b638090312001c4b630a7 Mon Sep 17 00:00:00 2001 From: Alex Woods Date: Thu, 11 Jun 2026 10:06:59 -0700 Subject: [PATCH 2/4] Minor cleanups + add changelog --- .../feature-AmazonS3-d7fcc2e.json | 6 ++ .../BufferedSplittableAsyncRequestBody.java | 90 ++++++++++++++----- .../internal/async/SplittingPublisher.java | 2 + .../async/SplittingPublisherTest.java | 12 ++- ...3MultipartClientPutObjectWiremockTest.java | 5 +- 5 files changed, 90 insertions(+), 25 deletions(-) create mode 100644 .changes/next-release/feature-AmazonS3-d7fcc2e.json diff --git a/.changes/next-release/feature-AmazonS3-d7fcc2e.json b/.changes/next-release/feature-AmazonS3-d7fcc2e.json new file mode 100644 index 00000000000..3afe77264e3 --- /dev/null +++ b/.changes/next-release/feature-AmazonS3-d7fcc2e.json @@ -0,0 +1,6 @@ +{ + "type": "feature", + "category": "Amazon S3", + "contributor": "", + "description": "Added BufferedSplittableAsyncRequestBody.builder() with fullBufferingEnabled option that fully buffers each multipart upload part before sending, fixing NonRetryableException when retrying parts from slow streaming sources." 
+}
diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/BufferedSplittableAsyncRequestBody.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/BufferedSplittableAsyncRequestBody.java
index 2c8478a2e39..112806025bd 100644
--- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/BufferedSplittableAsyncRequestBody.java
+++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/BufferedSplittableAsyncRequestBody.java
@@ -34,11 +34,19 @@
  * If the first subscriber fails to consume all the data (e.g., due to early cancellation or errors),
  * subsequent retry attempts will fail since the complete data set is not available for resubscription.</p>
  *
- * <p><b>Usage Example:</b></p>
+ * <p><b>Usage Examples:</b></p>
  * {@snippet :
+ * // Simple usage (default behavior, immediate send):
  * AsyncRequestBody originalBody = AsyncRequestBody.fromString("Hello World");
  * BufferedSplittableAsyncRequestBody retryableBody =
  *     BufferedSplittableAsyncRequestBody.create(originalBody);
+ *
+ * // With full buffering enabled for slow streaming sources:
+ * BufferedSplittableAsyncRequestBody fullBufferedBody =
+ *     BufferedSplittableAsyncRequestBody.builder()
+ *         .asyncRequestBody(originalBody)
+ *         .fullBufferingEnabled(true)
+ *         .build();
  * }
  *
  * <p><b>Performance Considerations:</b></p>
@@ -56,16 +64,16 @@ public final class BufferedSplittableAsyncRequestBody implements AsyncRequestBod private final AsyncRequestBody delegate; private final boolean fullBufferingEnabled; - private BufferedSplittableAsyncRequestBody(AsyncRequestBody delegate, boolean fullBufferingEnabled) { - this.delegate = delegate; - this.fullBufferingEnabled = fullBufferingEnabled; + private BufferedSplittableAsyncRequestBody(Builder builder) { + this.delegate = Validate.paramNotNull(builder.asyncRequestBody, "asyncRequestBody"); + this.fullBufferingEnabled = builder.fullBufferingEnabled; } /** * Creates a new {@link BufferedSplittableAsyncRequestBody} that wraps the provided {@link AsyncRequestBody}. * *

Full buffering is disabled by default. Each part is sent to the downstream subscriber immediately - * upon initialization in the known-content-length path (existing behavior). + * upon initialization in the known-content-length path. * * @param delegate the {@link AsyncRequestBody} to wrap and make retryable. Must not be null. * @return a new {@link BufferedSplittableAsyncRequestBody} instance @@ -73,27 +81,16 @@ private BufferedSplittableAsyncRequestBody(AsyncRequestBody delegate, boolean fu */ public static BufferedSplittableAsyncRequestBody create(AsyncRequestBody delegate) { Validate.paramNotNull(delegate, "delegate"); - return new BufferedSplittableAsyncRequestBody(delegate, false); + return builder().asyncRequestBody(delegate).build(); } /** - * Creates a new {@link BufferedSplittableAsyncRequestBody} that wraps the provided {@link AsyncRequestBody}, - * with an option to enable full buffering before sending parts downstream. - * - *

<p>When {@code fullBufferingEnabled} is {@code true}, each part is fully buffered before being sent to the - * downstream subscriber. This guarantees that the retry buffer is always populated before the HTTP layer - * subscribes, making per-part retry deterministically successful for slow streaming sources (e.g., SFTP). + * Returns a new {@link Builder} for creating a {@link BufferedSplittableAsyncRequestBody} with configuration options. * - *

When {@code fullBufferingEnabled} is {@code false}, behavior is identical to {@link #create(AsyncRequestBody)}. - * - * @param delegate the {@link AsyncRequestBody} to wrap and make retryable. Must not be null. - * @param fullBufferingEnabled whether to enable full buffering before sending parts downstream - * @return a new {@link BufferedSplittableAsyncRequestBody} instance - * @throws NullPointerException if delegate is null + * @return a new builder instance */ - public static BufferedSplittableAsyncRequestBody create(AsyncRequestBody delegate, boolean fullBufferingEnabled) { - Validate.paramNotNull(delegate, "delegate"); - return new BufferedSplittableAsyncRequestBody(delegate, fullBufferingEnabled); + public static Builder builder() { + return new Builder(); } @Override @@ -136,4 +133,55 @@ public void subscribe(Subscriber s) { public String body() { return delegate.body(); } + + /** + * Builder for {@link BufferedSplittableAsyncRequestBody}. + */ + public static final class Builder { + private AsyncRequestBody asyncRequestBody; + private boolean fullBufferingEnabled = false; + + private Builder() { + } + + /** + * Sets the {@link AsyncRequestBody} to wrap and make retryable. + * + * @param asyncRequestBody the request body to wrap. Must not be null. + * @return this builder for method chaining + */ + public Builder asyncRequestBody(AsyncRequestBody asyncRequestBody) { + this.asyncRequestBody = asyncRequestBody; + return this; + } + + /** + * Configures whether to fully buffer each part before sending it downstream. + * + *

<p>When enabled, each part is fully buffered before being sent to the downstream subscriber. + * This guarantees that the retry buffer is always populated before the HTTP layer subscribes, + * making per-part retry deterministically successful for slow streaming sources (e.g., SFTP). + * + *

When disabled (the default), each part is sent immediately upon initialization in the + * known-content-length path, allowing the HTTP connection to open while data is still arriving. + * + * @param fullBufferingEnabled whether to enable full buffering before sending parts downstream. + * Defaults to {@code false}. + * @return this builder for method chaining + */ + public Builder fullBufferingEnabled(boolean fullBufferingEnabled) { + this.fullBufferingEnabled = fullBufferingEnabled; + return this; + } + + /** + * Builds a new {@link BufferedSplittableAsyncRequestBody} instance. + * + * @return a new instance configured by this builder + * @throws NullPointerException if asyncRequestBody is null + */ + public BufferedSplittableAsyncRequestBody build() { + return new BufferedSplittableAsyncRequestBody(this); + } + } } diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingPublisher.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingPublisher.java index 4a167ee1171..b3b1422904a 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingPublisher.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingPublisher.java @@ -36,6 +36,8 @@ * *

If content length is known, each {@link AsyncRequestBody} is sent to the subscriber right after it's initialized. * Otherwise, it is sent after the entire content for that chunk is buffered. This is required to get content length. + * When {@code fullBufferingEnabled} is set to {@code true}, the known-content-length path also defers sending until the + * part is fully buffered, guaranteeing that the retry buffer is populated before the downstream subscriber receives it. */ @SdkInternalApi public class SplittingPublisher implements SdkPublisher { diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/SplittingPublisherTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/SplittingPublisherTest.java index 42cc9f3e6fb..36e72d6b64d 100644 --- a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/SplittingPublisherTest.java +++ b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/SplittingPublisherTest.java @@ -335,8 +335,11 @@ public void cancel() { } }; - // Use create(body, true) to enable full buffering - BufferedSplittableAsyncRequestBody bufferedBody = BufferedSplittableAsyncRequestBody.create(sourceBody, true); + // Use builder to enable full buffering + BufferedSplittableAsyncRequestBody bufferedBody = BufferedSplittableAsyncRequestBody.builder() + .asyncRequestBody(sourceBody) + .fullBufferingEnabled(true) + .build(); // Verify that content length is propagated assertThat(bufferedBody.contentLength()).hasValue((long) data.length); @@ -457,7 +460,10 @@ void bufferedSplittable_createWithFullBufferingTrue_partsAreRetryable() throws E } AsyncRequestBody sourceBody = AsyncRequestBody.fromBytes(data); - BufferedSplittableAsyncRequestBody bufferedBody = BufferedSplittableAsyncRequestBody.create(sourceBody, true); + BufferedSplittableAsyncRequestBody bufferedBody = BufferedSplittableAsyncRequestBody.builder() + .asyncRequestBody(sourceBody) + .fullBufferingEnabled(true) + 
.build(); AsyncRequestBodySplitConfiguration splitConfig = AsyncRequestBodySplitConfiguration.builder() .chunkSizeInBytes(5L) diff --git a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/S3MultipartClientPutObjectWiremockTest.java b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/S3MultipartClientPutObjectWiremockTest.java index 0f9bec28054..547d91a60de 100644 --- a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/S3MultipartClientPutObjectWiremockTest.java +++ b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/S3MultipartClientPutObjectWiremockTest.java @@ -302,7 +302,10 @@ public void cancel() { // Wrap with full buffering enabled — this ensures retry buffer is populated before HTTP subscribe BufferedSplittableAsyncRequestBody bufferedBody = - BufferedSplittableAsyncRequestBody.create(slowStreamingBody, true); + BufferedSplittableAsyncRequestBody.builder() + .asyncRequestBody(slowStreamingBody) + .fullBufferingEnabled(true) + .build(); // The upload should complete successfully — retry works because full buffering // ensures the retry buffer is populated before the HTTP layer subscribes From 195f16e4740dc25d42106d0366a731a04b55e34f Mon Sep 17 00:00:00 2001 From: Alex Woods Date: Fri, 12 Jun 2026 08:32:18 -0700 Subject: [PATCH 3/4] Change fullBufferingEnabled to bufferBeforeSend --- .../feature-AmazonS3-d7fcc2e.json | 2 +- .../BufferedSplittableAsyncRequestBody.java | 21 +++++----- .../internal/async/SplittingPublisher.java | 16 ++++---- .../async/SplittingPublisherTest.java | 38 +++++++++---------- ...3MultipartClientPutObjectWiremockTest.java | 4 +- 5 files changed, 41 insertions(+), 40 deletions(-) diff --git a/.changes/next-release/feature-AmazonS3-d7fcc2e.json b/.changes/next-release/feature-AmazonS3-d7fcc2e.json index 3afe77264e3..2556bbdcb39 100644 --- a/.changes/next-release/feature-AmazonS3-d7fcc2e.json +++ 
b/.changes/next-release/feature-AmazonS3-d7fcc2e.json @@ -2,5 +2,5 @@ "type": "feature", "category": "Amazon S3", "contributor": "", - "description": "Added BufferedSplittableAsyncRequestBody.builder() with fullBufferingEnabled option that fully buffers each multipart upload part before sending, fixing NonRetryableException when retrying parts from slow streaming sources." + "description": "Added BufferedSplittableAsyncRequestBody.builder() with bufferBeforeSend option that fully buffers each multipart upload part before sending, fixing NonRetryableException when retrying parts from slow streaming sources." } diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/BufferedSplittableAsyncRequestBody.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/BufferedSplittableAsyncRequestBody.java index 112806025bd..5741165d2eb 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/BufferedSplittableAsyncRequestBody.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/BufferedSplittableAsyncRequestBody.java @@ -22,6 +22,7 @@ import software.amazon.awssdk.core.internal.async.SplittingPublisher; import software.amazon.awssdk.utils.Validate; + /** * An {@link AsyncRequestBody} decorator that enables splitting into retryable sub-request bodies. 
* @@ -41,11 +42,11 @@ * BufferedSplittableAsyncRequestBody retryableBody = * BufferedSplittableAsyncRequestBody.create(originalBody); * - * // With full buffering enabled for slow streaming sources: + * // With buffer-before-send enabled for slow streaming sources: * BufferedSplittableAsyncRequestBody fullBufferedBody = * BufferedSplittableAsyncRequestBody.builder() * .asyncRequestBody(originalBody) - * .fullBufferingEnabled(true) + * .bufferBeforeSend(true) * .build(); * } * @@ -62,11 +63,11 @@ @SdkPublicApi public final class BufferedSplittableAsyncRequestBody implements AsyncRequestBody { private final AsyncRequestBody delegate; - private final boolean fullBufferingEnabled; + private final boolean bufferBeforeSend; private BufferedSplittableAsyncRequestBody(Builder builder) { this.delegate = Validate.paramNotNull(builder.asyncRequestBody, "asyncRequestBody"); - this.fullBufferingEnabled = builder.fullBufferingEnabled; + this.bufferBeforeSend = Validate.getOrDefault(builder.bufferBeforeSend, () -> false); } /** @@ -120,7 +121,7 @@ public SdkPublisher splitCloseable(AsyncRequestBodySp .asyncRequestBody(this) .splitConfiguration(splitConfiguration) .retryableSubAsyncRequestBodyEnabled(true) - .fullBufferingEnabled(fullBufferingEnabled) + .bufferBeforeSend(bufferBeforeSend) .build(); } @@ -139,7 +140,7 @@ public String body() { */ public static final class Builder { private AsyncRequestBody asyncRequestBody; - private boolean fullBufferingEnabled = false; + private Boolean bufferBeforeSend = null; private Builder() { } @@ -165,12 +166,12 @@ public Builder asyncRequestBody(AsyncRequestBody asyncRequestBody) { *

When disabled (the default), each part is sent immediately upon initialization in the * known-content-length path, allowing the HTTP connection to open while data is still arriving. * - * @param fullBufferingEnabled whether to enable full buffering before sending parts downstream. - * Defaults to {@code false}. + * @param bufferBeforeSend whether to enable full buffering before sending parts downstream. + * Defaults to {@code false}. * @return this builder for method chaining */ - public Builder fullBufferingEnabled(boolean fullBufferingEnabled) { - this.fullBufferingEnabled = fullBufferingEnabled; + public Builder bufferBeforeSend(Boolean bufferBeforeSend) { + this.bufferBeforeSend = bufferBeforeSend; return this; } diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingPublisher.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingPublisher.java index b3b1422904a..6e41aee1abb 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingPublisher.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingPublisher.java @@ -36,7 +36,7 @@ * *

If content length is known, each {@link AsyncRequestBody} is sent to the subscriber right after it's initialized. * Otherwise, it is sent after the entire content for that chunk is buffered. This is required to get content length. - * When {@code fullBufferingEnabled} is set to {@code true}, the known-content-length path also defers sending until the + * When {@code bufferBeforeSend} is set to {@code true}, the known-content-length path also defers sending until the * part is fully buffered, guaranteeing that the retry buffer is populated before the downstream subscriber receives it. */ @SdkInternalApi @@ -48,7 +48,7 @@ public class SplittingPublisher implements SdkPublisher= chunkSizeInBytes, @@ -140,7 +140,7 @@ private SubAsyncRequestBody initializeNextDownstreamBody(boolean contentLengthKn } currentBodySent.set(false); - if (contentLengthKnown && !fullBufferingEnabled) { + if (contentLengthKnown && !bufferBeforeSend) { sendCurrentBody(body); } return body; @@ -238,7 +238,7 @@ private void completeCurrentBody() { // Current body could be completed in either onNext or onComplete, so we need to guard against sending the last body // twice. - if ((upstreamSize == null || fullBufferingEnabled) && currentBodySent.compareAndSet(false, true)) { + if ((upstreamSize == null || bufferBeforeSend) && currentBodySent.compareAndSet(false, true)) { sendCurrentBody(currentBody); } } @@ -311,7 +311,7 @@ public static final class Builder { private AsyncRequestBody asyncRequestBody; private AsyncRequestBodySplitConfiguration splitConfiguration; private Boolean retryableSubAsyncRequestBodyEnabled; - private boolean fullBufferingEnabled = false; + private boolean bufferBeforeSend = false; private Builder() { } @@ -345,8 +345,8 @@ public Builder retryableSubAsyncRequestBodyEnabled(Boolean retryableSubAsyncRequ * When enabled, parts are only sent to the downstream subscriber after * all data for that part has been received and complete() has been called. 
*/ - public Builder fullBufferingEnabled(boolean fullBufferingEnabled) { - this.fullBufferingEnabled = fullBufferingEnabled; + public Builder bufferBeforeSend(boolean bufferBeforeSend) { + this.bufferBeforeSend = bufferBeforeSend; return this; } diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/SplittingPublisherTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/SplittingPublisherTest.java index 36e72d6b64d..6903e6c4d0e 100644 --- a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/SplittingPublisherTest.java +++ b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/SplittingPublisherTest.java @@ -338,7 +338,7 @@ public void cancel() { // Use builder to enable full buffering BufferedSplittableAsyncRequestBody bufferedBody = BufferedSplittableAsyncRequestBody.builder() .asyncRequestBody(sourceBody) - .fullBufferingEnabled(true) + .bufferBeforeSend(true) .build(); // Verify that content length is propagated @@ -462,7 +462,7 @@ void bufferedSplittable_createWithFullBufferingTrue_partsAreRetryable() throws E BufferedSplittableAsyncRequestBody bufferedBody = BufferedSplittableAsyncRequestBody.builder() .asyncRequestBody(sourceBody) - .fullBufferingEnabled(true) + .bufferBeforeSend(true) .build(); AsyncRequestBodySplitConfiguration splitConfig = AsyncRequestBodySplitConfiguration.builder() @@ -556,11 +556,11 @@ public void cancel() { } } - // ==================== Tests for fullBufferingEnabled ==================== + // ==================== Tests for bufferBeforeSend ==================== @Test - void fullBufferingEnabled_knownContentLength_defersBodyUntilComplete() throws Exception { - // When fullBufferingEnabled=true and content length is known, the downstream subscriber + void bufferBeforeSend_knownContentLength_defersBodyUntilComplete() throws Exception { + // When bufferBeforeSend=true and content length is known, the downstream subscriber // should NOT receive 
the body until completeCurrentBody() is invoked (i.e., after the part is fully buffered). byte[] data = new byte[10]; for (int i = 0; i < data.length; i++) { @@ -577,7 +577,7 @@ void fullBufferingEnabled_knownContentLength_defersBodyUntilComplete() throws Ex .bufferSizeInBytes(20L) .build()) .retryableSubAsyncRequestBodyEnabled(true) - .fullBufferingEnabled(true) + .bufferBeforeSend(true) .build(); List> receivedBodies = new ArrayList<>(); @@ -609,8 +609,8 @@ void fullBufferingEnabled_knownContentLength_defersBodyUntilComplete() throws Ex } @Test - void fullBufferingDisabled_knownContentLength_sendsImmediately() throws Exception { - // When fullBufferingEnabled=false (default) and content length is known, the body should + void bufferBeforeSendDisabled_knownContentLength_sendsImmediately() throws Exception { + // When bufferBeforeSend=false (default) and content length is known, the body should // be sent to the downstream subscriber immediately upon initialization. byte[] data = new byte[10]; for (int i = 0; i < data.length; i++) { @@ -626,7 +626,7 @@ void fullBufferingDisabled_knownContentLength_sendsImmediately() throws Exceptio .bufferSizeInBytes(20L) .build()) .retryableSubAsyncRequestBodyEnabled(true) - .fullBufferingEnabled(false) + .bufferBeforeSend(false) .build(); List> receivedBodies = new ArrayList<>(); @@ -650,8 +650,8 @@ void fullBufferingDisabled_knownContentLength_sendsImmediately() throws Exceptio } @Test - void fullBufferingEnabled_unknownContentLength_behaviorUnchanged() throws Exception { - // When content length is unknown, behavior is unchanged regardless of fullBufferingEnabled. + void bufferBeforeSend_unknownContentLength_behaviorUnchanged() throws Exception { + // When content length is unknown, behavior is unchanged regardless of bufferBeforeSend. // The body is always deferred until complete (existing behavior). 
byte[] data = new byte[10]; for (int i = 0; i < data.length; i++) { @@ -667,7 +667,7 @@ void fullBufferingEnabled_unknownContentLength_behaviorUnchanged() throws Except .bufferSizeInBytes(20L) .build()) .retryableSubAsyncRequestBodyEnabled(true) - .fullBufferingEnabled(true) + .bufferBeforeSend(true) .build(); List> receivedBodies = new ArrayList<>(); @@ -698,8 +698,8 @@ void fullBufferingEnabled_unknownContentLength_behaviorUnchanged() throws Except } @Test - void fullBufferingEnabled_multiPart_allPartsDeferred() throws Exception { - // When splitting into multiple parts with fullBufferingEnabled=true, all parts are deferred + void bufferBeforeSend_multiPart_allPartsDeferred() throws Exception { + // When splitting into multiple parts with bufferBeforeSend=true, all parts are deferred // until fully buffered. byte[] data = new byte[20]; for (int i = 0; i < data.length; i++) { @@ -715,7 +715,7 @@ void fullBufferingEnabled_multiPart_allPartsDeferred() throws Exception { .bufferSizeInBytes(30L) .build()) .retryableSubAsyncRequestBodyEnabled(true) - .fullBufferingEnabled(true) + .bufferBeforeSend(true) .build(); List> receivedBodies = new ArrayList<>(); @@ -767,8 +767,8 @@ void fullBufferingEnabled_multiPart_allPartsDeferred() throws Exception { } @Test - void fullBufferingEnabled_upstreamError_doesNotSendIncompleteBody() throws Exception { - // When fullBufferingEnabled=true and the upstream signals onError() before a part is fully buffered, + void bufferBeforeSend_upstreamError_doesNotSendIncompleteBody() throws Exception { + // When bufferBeforeSend=true and the upstream signals onError() before a part is fully buffered, // the incomplete part body should NOT be sent downstream. 
ControlledAsyncRequestBody controlledBody = new ControlledAsyncRequestBody(Optional.of(20L)); @@ -779,7 +779,7 @@ void fullBufferingEnabled_upstreamError_doesNotSendIncompleteBody() throws Excep .bufferSizeInBytes(20L) .build()) .retryableSubAsyncRequestBodyEnabled(true) - .fullBufferingEnabled(true) + .bufferBeforeSend(true) .build(); List> receivedBodies = new ArrayList<>(); @@ -815,7 +815,7 @@ public void onComplete() { controlledBody.sendData(ByteBuffer.wrap(new byte[5])); Thread.sleep(100); - // No body should have been emitted (fullBufferingEnabled defers until complete) + // No body should have been emitted (bufferBeforeSend defers until complete) assertThat(receivedBodies.size()).isEqualTo(0); // Signal upstream error diff --git a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/S3MultipartClientPutObjectWiremockTest.java b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/S3MultipartClientPutObjectWiremockTest.java index 547d91a60de..42bb4f715c8 100644 --- a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/S3MultipartClientPutObjectWiremockTest.java +++ b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/S3MultipartClientPutObjectWiremockTest.java @@ -250,7 +250,7 @@ public int read() { * arrives slowly and the retry buffer must be populated before the HTTP layer subscribes. 
*/ @Test - void mpuWithFullBufferingEnabled_slowStreamingKnownLength_retriesSuccessfullyOn500() { + void mpuWithBufferBeforeSend_slowStreamingKnownLength_retriesSuccessfullyOn500() { // Stub CreateMultipartUpload (POST) and CompleteMultipartUpload (POST) stubFor(post(anyUrl()).willReturn(aResponse().withStatus(200).withBody(CREATE_MULTIPART_PAYLOAD))); // Part 1: first attempt returns 500, retry returns 200 @@ -304,7 +304,7 @@ public void cancel() { BufferedSplittableAsyncRequestBody bufferedBody = BufferedSplittableAsyncRequestBody.builder() .asyncRequestBody(slowStreamingBody) - .fullBufferingEnabled(true) + .bufferBeforeSend(true) .build(); // The upload should complete successfully — retry works because full buffering From 583f253b5aef8525c896670f652a2cf586b2bd0a Mon Sep 17 00:00:00 2001 From: Alex Woods Date: Fri, 12 Jun 2026 12:23:21 -0700 Subject: [PATCH 4/4] Add validation on buffer size --- .../BufferedSplittableAsyncRequestBody.java | 6 ++ .../internal/async/SplittingPublisher.java | 7 +- .../async/SplittingPublisherTest.java | 67 +++++++++++++++++++ 3 files changed, 78 insertions(+), 2 deletions(-) diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/BufferedSplittableAsyncRequestBody.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/BufferedSplittableAsyncRequestBody.java index 5741165d2eb..8d874bb5bc1 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/BufferedSplittableAsyncRequestBody.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/BufferedSplittableAsyncRequestBody.java @@ -166,6 +166,12 @@ public Builder asyncRequestBody(AsyncRequestBody asyncRequestBody) { *

When disabled (the default), each part is sent immediately upon initialization in the * known-content-length path, allowing the HTTP connection to open while data is still arriving. * + *

Memory usage: Enabling this option does not increase the maximum memory footprint. + * Buffered data remains bounded by the {@code bufferSizeInBytes} configured in the + * {@link AsyncRequestBodySplitConfiguration}. The only behavioral difference is timing: data is + * held in the buffer slightly longer (until the part is complete) before being sent downstream, + * rather than being streamed out concurrently. + * * @param bufferBeforeSend whether to enable full buffering before sending parts downstream. * Defaults to {@code false}. * @return this builder for method chaining diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingPublisher.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingPublisher.java index 6e41aee1abb..7aad91b5aea 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingPublisher.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingPublisher.java @@ -69,10 +69,10 @@ private SplittingPublisher(Builder builder) { "retryableSubAsyncRequestBodyEnabled"); this.bufferBeforeSend = builder.bufferBeforeSend; this.sourceBodyName = builder.asyncRequestBody.body(); - if (!upstreamPublisher.contentLength().isPresent()) { + if (!upstreamPublisher.contentLength().isPresent() || bufferBeforeSend) { Validate.isTrue(bufferSizeInBytes >= chunkSizeInBytes, "bufferSizeInBytes must be larger than or equal to " + - "chunkSizeInBytes if the content length is unknown"); + "chunkSizeInBytes when the content length is unknown or bufferBeforeSend is enabled"); } } @@ -344,6 +344,9 @@ public Builder retryableSubAsyncRequestBodyEnabled(Boolean retryableSubAsyncRequ * Sets whether to enable full buffering before sending parts downstream. * When enabled, parts are only sent to the downstream subscriber after * all data for that part has been received and complete() has been called. + * + *

This does not increase the maximum memory footprint. Buffered data remains + * bounded by {@code bufferSizeInBytes} in the split configuration. */ public Builder bufferBeforeSend(boolean bufferBeforeSend) { this.bufferBeforeSend = bufferBeforeSend; diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/SplittingPublisherTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/SplittingPublisherTest.java index 6903e6c4d0e..4bd0ffb3ef1 100644 --- a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/SplittingPublisherTest.java +++ b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/SplittingPublisherTest.java @@ -830,6 +830,73 @@ public void onComplete() { assertThat(receivedBodies.size()).isEqualTo(0); } + // ==================== Tests for bufferBeforeSend validation ==================== + + @Test + void bufferBeforeSend_bufferSizeLessThanChunkSize_throwsException() { + // When bufferBeforeSend=true and bufferSizeInBytes < chunkSizeInBytes, construction should fail + // because the buffer cannot hold a full chunk, which would cause a deadlock. + assertThatThrownBy(() -> SplittingPublisher.builder() + .asyncRequestBody(AsyncRequestBody.fromString("hello")) + .splitConfiguration(AsyncRequestBodySplitConfiguration.builder() + .chunkSizeInBytes(10L) + .bufferSizeInBytes(5L) + .build()) + .retryableSubAsyncRequestBodyEnabled(true) + .bufferBeforeSend(true) + .build()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("bufferSizeInBytes must be larger than or equal to chunkSizeInBytes"); + } + + @Test + void bufferBeforeSend_bufferSizeEqualToChunkSize_succeeds() { + // When bufferBeforeSend=true and bufferSizeInBytes == chunkSizeInBytes, construction should succeed. 
+ SplittingPublisher publisher = SplittingPublisher.builder() + .asyncRequestBody(AsyncRequestBody.fromString("hello")) + .splitConfiguration(AsyncRequestBodySplitConfiguration.builder() + .chunkSizeInBytes(10L) + .bufferSizeInBytes(10L) + .build()) + .retryableSubAsyncRequestBodyEnabled(true) + .bufferBeforeSend(true) + .build(); + assertThat(publisher).isNotNull(); + } + + @Test + void bufferBeforeSendDisabled_bufferSizeLessThanChunkSize_knownContentLength_succeeds() { + // When bufferBeforeSend=false and content length is known, bufferSizeInBytes < chunkSizeInBytes + // is allowed because data flows through without full buffering. + SplittingPublisher publisher = SplittingPublisher.builder() + .asyncRequestBody(AsyncRequestBody.fromString("hello")) + .splitConfiguration(AsyncRequestBodySplitConfiguration.builder() + .chunkSizeInBytes(10L) + .bufferSizeInBytes(5L) + .build()) + .retryableSubAsyncRequestBodyEnabled(true) + .bufferBeforeSend(false) + .build(); + assertThat(publisher).isNotNull(); + } + + @Test + void unknownContentLength_bufferSizeLessThanChunkSize_throwsException() { + // Existing behavior: unknown content length with bufferSizeInBytes < chunkSizeInBytes should fail. + ControlledAsyncRequestBody unknownLengthBody = new ControlledAsyncRequestBody(Optional.empty()); + assertThatThrownBy(() -> SplittingPublisher.builder() + .asyncRequestBody(unknownLengthBody) + .splitConfiguration(AsyncRequestBodySplitConfiguration.builder() + .chunkSizeInBytes(10L) + .bufferSizeInBytes(5L) + .build()) + .retryableSubAsyncRequestBodyEnabled(true) + .bufferBeforeSend(false) + .build()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("bufferSizeInBytes must be larger than or equal to chunkSizeInBytes"); + } + /** * A controlled AsyncRequestBody that allows tests to send data, complete, and signal errors * at specific times to test deferred/immediate behavior.