diff --git a/.changes/next-release/feature-AmazonS3-d7fcc2e.json b/.changes/next-release/feature-AmazonS3-d7fcc2e.json new file mode 100644 index 00000000000..2556bbdcb39 --- /dev/null +++ b/.changes/next-release/feature-AmazonS3-d7fcc2e.json @@ -0,0 +1,6 @@ +{ + "type": "feature", + "category": "Amazon S3", + "contributor": "", + "description": "Added BufferedSplittableAsyncRequestBody.builder() with bufferBeforeSend option that fully buffers each multipart upload part before sending, fixing NonRetryableException when retrying parts from slow streaming sources." +} diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/BufferedSplittableAsyncRequestBody.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/BufferedSplittableAsyncRequestBody.java index a1c46238dfe..8d874bb5bc1 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/BufferedSplittableAsyncRequestBody.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/BufferedSplittableAsyncRequestBody.java @@ -22,6 +22,7 @@ import software.amazon.awssdk.core.internal.async.SplittingPublisher; import software.amazon.awssdk.utils.Validate; + /** * An {@link AsyncRequestBody} decorator that enables splitting into retryable sub-request bodies. * @@ -34,11 +35,19 @@ * If the first subscriber fails to consume all the data (e.g., due to early cancellation or errors), * subsequent retry attempts will fail since the complete data set is not available for resubscription.

* - *

Usage Example:

+ *

Usage Examples:

* {@snippet : + * // Simple usage (default behavior, immediate send): * AsyncRequestBody originalBody = AsyncRequestBody.fromString("Hello World"); * BufferedSplittableAsyncRequestBody retryableBody = * BufferedSplittableAsyncRequestBody.create(originalBody); + * + * // With buffer-before-send enabled for slow streaming sources: + * BufferedSplittableAsyncRequestBody fullBufferedBody = + * BufferedSplittableAsyncRequestBody.builder() + * .asyncRequestBody(originalBody) + * .bufferBeforeSend(true) + * .build(); * } * *

Performance Considerations:

@@ -54,21 +63,35 @@ @SdkPublicApi public final class BufferedSplittableAsyncRequestBody implements AsyncRequestBody { private final AsyncRequestBody delegate; + private final boolean bufferBeforeSend; - private BufferedSplittableAsyncRequestBody(AsyncRequestBody delegate) { - this.delegate = delegate; + private BufferedSplittableAsyncRequestBody(Builder builder) { + this.delegate = Validate.paramNotNull(builder.asyncRequestBody, "asyncRequestBody"); + this.bufferBeforeSend = Validate.getOrDefault(builder.bufferBeforeSend, () -> false); } /** * Creates a new {@link BufferedSplittableAsyncRequestBody} that wraps the provided {@link AsyncRequestBody}. * + *

Full buffering is disabled by default. Each part is sent to the downstream subscriber immediately + * upon initialization in the known-content-length path. + * * @param delegate the {@link AsyncRequestBody} to wrap and make retryable. Must not be null. * @return a new {@link BufferedSplittableAsyncRequestBody} instance * @throws NullPointerException if delegate is null */ public static BufferedSplittableAsyncRequestBody create(AsyncRequestBody delegate) { Validate.paramNotNull(delegate, "delegate"); - return new BufferedSplittableAsyncRequestBody(delegate); + return builder().asyncRequestBody(delegate).build(); + } + + /** + * Returns a new {@link Builder} for creating a {@link BufferedSplittableAsyncRequestBody} with configuration options. + * + * @return a new builder instance + */ + public static Builder builder() { + return new Builder(); } @Override @@ -98,6 +121,7 @@ public SdkPublisher splitCloseable(AsyncRequestBodySp .asyncRequestBody(this) .splitConfiguration(splitConfiguration) .retryableSubAsyncRequestBodyEnabled(true) + .bufferBeforeSend(bufferBeforeSend) .build(); } @@ -110,4 +134,61 @@ public void subscribe(Subscriber s) { public String body() { return delegate.body(); } + + /** + * Builder for {@link BufferedSplittableAsyncRequestBody}. + */ + public static final class Builder { + private AsyncRequestBody asyncRequestBody; + private Boolean bufferBeforeSend = null; + + private Builder() { + } + + /** + * Sets the {@link AsyncRequestBody} to wrap and make retryable. + * + * @param asyncRequestBody the request body to wrap. Must not be null. + * @return this builder for method chaining + */ + public Builder asyncRequestBody(AsyncRequestBody asyncRequestBody) { + this.asyncRequestBody = asyncRequestBody; + return this; + } + + /** + * Configures whether to fully buffer each part before sending it downstream. + * + *

When enabled, each part is fully buffered before being sent to the downstream subscriber. + * This guarantees that the retry buffer is always populated before the HTTP layer subscribes, + * making per-part retry deterministically successful for slow streaming sources (e.g., SFTP). + * + *

When disabled (the default), each part is sent immediately upon initialization in the + * known-content-length path, allowing the HTTP connection to open while data is still arriving. + * + *

Memory usage: Enabling this option does not increase the maximum memory footprint. + * Buffered data remains bounded by the {@code bufferSizeInBytes} configured in the + * {@link AsyncRequestBodySplitConfiguration}. The only behavioral difference is timing: data is + * held in the buffer slightly longer (until the part is complete) before being sent downstream, + * rather than being streamed out concurrently. + * + * @param bufferBeforeSend whether to enable full buffering before sending parts downstream. + * Defaults to {@code false}. + * @return this builder for method chaining + */ + public Builder bufferBeforeSend(Boolean bufferBeforeSend) { + this.bufferBeforeSend = bufferBeforeSend; + return this; + } + + /** + * Builds a new {@link BufferedSplittableAsyncRequestBody} instance. + * + * @return a new instance configured by this builder + * @throws NullPointerException if asyncRequestBody is null + */ + public BufferedSplittableAsyncRequestBody build() { + return new BufferedSplittableAsyncRequestBody(this); + } + } } diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingPublisher.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingPublisher.java index 82e749c14cc..7aad91b5aea 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingPublisher.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingPublisher.java @@ -36,6 +36,8 @@ * *

If content length is known, each {@link AsyncRequestBody} is sent to the subscriber right after it's initialized. * Otherwise, it is sent after the entire content for that chunk is buffered. This is required to get content length. + * When {@code bufferBeforeSend} is set to {@code true}, the known-content-length path also defers sending until the + * part is fully buffered, guaranteeing that the retry buffer is populated before the downstream subscriber receives it. */ @SdkInternalApi public class SplittingPublisher implements SdkPublisher { @@ -46,6 +48,7 @@ public class SplittingPublisher implements SdkPublisher= chunkSizeInBytes, "bufferSizeInBytes must be larger than or equal to " + - "chunkSizeInBytes if the content length is unknown"); + "chunkSizeInBytes when the content length is unknown or bufferBeforeSend is enabled"); } } @@ -136,7 +140,7 @@ private SubAsyncRequestBody initializeNextDownstreamBody(boolean contentLengthKn } currentBodySent.set(false); - if (contentLengthKnown) { + if (contentLengthKnown && !bufferBeforeSend) { sendCurrentBody(body); } return body; @@ -234,7 +238,7 @@ private void completeCurrentBody() { // Current body could be completed in either onNext or onComplete, so we need to guard against sending the last body // twice. - if (upstreamSize == null && currentBodySent.compareAndSet(false, true)) { + if ((upstreamSize == null || bufferBeforeSend) && currentBodySent.compareAndSet(false, true)) { sendCurrentBody(currentBody); } } @@ -307,6 +311,7 @@ public static final class Builder { private AsyncRequestBody asyncRequestBody; private AsyncRequestBodySplitConfiguration splitConfiguration; private Boolean retryableSubAsyncRequestBodyEnabled; + private boolean bufferBeforeSend = false; private Builder() { } @@ -335,6 +340,19 @@ public Builder retryableSubAsyncRequestBodyEnabled(Boolean retryableSubAsyncRequ return this; } + /** + * Sets whether to enable full buffering before sending parts downstream. + * When enabled, parts are only sent to the downstream subscriber after + * all data for that part has been received and complete() has been called. + * + *

This does not increase the maximum memory footprint. Buffered data remains + * bounded by {@code bufferSizeInBytes} in the split configuration. + */ + public Builder bufferBeforeSend(boolean bufferBeforeSend) { + this.bufferBeforeSend = bufferBeforeSend; + return this; + } + /** * Builds a {@link SplittingPublisher} object based on the values held by this builder. */ diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/SplittingPublisherTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/SplittingPublisherTest.java index 39326ac4167..4bd0ffb3ef1 100644 --- a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/SplittingPublisherTest.java +++ b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/SplittingPublisherTest.java @@ -48,6 +48,9 @@ import org.reactivestreams.Subscription; import software.amazon.awssdk.core.async.AsyncRequestBody; import software.amazon.awssdk.core.async.AsyncRequestBodySplitConfiguration; +import software.amazon.awssdk.core.async.BufferedSplittableAsyncRequestBody; +import software.amazon.awssdk.core.async.CloseableAsyncRequestBody; +import software.amazon.awssdk.core.async.SdkPublisher; import software.amazon.awssdk.utils.BinaryUtils; import software.amazon.awssdk.utils.Pair; @@ -298,6 +301,203 @@ public void onComplete() { assertThat(error).isEqualTo(upstreamError); } + @Test + void bufferedSplittable_createWithFullBufferingTrue_defersPartEmission() throws Exception { + // Create a controlled async request body with known content length that delivers data in two chunks + byte[] data = new byte[10]; + for (int i = 0; i < data.length; i++) { + data[i] = (byte) i; + } + AsyncRequestBody sourceBody = new AsyncRequestBody() { + @Override + public Optional contentLength() { + return Optional.of((long) data.length); + } + + @Override + public void subscribe(Subscriber s) { + s.onSubscribe(new Subscription() { + private boolean done = false; + + @Override + public void request(long n) { + if (!done) { + done = true; + s.onNext(ByteBuffer.wrap(data)); + s.onComplete(); + } + } + + @Override + public void cancel() { + } + }); + } + }; + + // Use builder to enable full buffering + BufferedSplittableAsyncRequestBody bufferedBody = BufferedSplittableAsyncRequestBody.builder() + .asyncRequestBody(sourceBody) + .bufferBeforeSend(true) + .build(); + + // Verify that content length is propagated + assertThat(bufferedBody.contentLength()).hasValue((long) data.length); + + // Split with a chunk size of 5 (two parts of 5 bytes each) + AsyncRequestBodySplitConfiguration splitConfig = AsyncRequestBodySplitConfiguration.builder() + .chunkSizeInBytes(5L) + .bufferSizeInBytes(20L) + .build(); + + SdkPublisher publisher = bufferedBody.splitCloseable(splitConfig); + + // Track when parts are received by the downstream subscriber + List> partFutures = new ArrayList<>(); + + CompletableFuture subscribeFuture = publisher.subscribe(requestBody -> { + // Each part should arrive with data already available (fully buffered) + CompletableFuture partFuture = new CompletableFuture<>(); + partFutures.add(partFuture); + requestBody.subscribe(new BaosSubscriber(partFuture)); + partFuture.whenComplete((r, t) -> requestBody.close()); + }); + + subscribeFuture.get(5, TimeUnit.SECONDS); + + // Verify we received 2 parts with correct data + assertThat(partFutures.size()).isEqualTo(2); + + byte[] firstPart = partFutures.get(0).get(5, TimeUnit.SECONDS); + byte[] secondPart = partFutures.get(1).get(5, TimeUnit.SECONDS); + + byte[] expectedFirst = new byte[]{0, 1, 2, 3, 4}; + byte[] expectedSecond = new byte[]{5, 6, 7, 8, 9}; + + assertThat(firstPart).isEqualTo(expectedFirst); + assertThat(secondPart).isEqualTo(expectedSecond); + } + + @Test + void bufferedSplittable_createDefault_sendsPartsImmediately() throws Exception { + // Create a body with known content length + byte[] data = new byte[10]; + for (int i = 0; i < data.length; i++) { + data[i] = (byte) i; + } + AsyncRequestBody sourceBody = new AsyncRequestBody() { + @Override + public Optional contentLength() { + return Optional.of((long) data.length); + } + + @Override + public void subscribe(Subscriber s) { + s.onSubscribe(new Subscription() { + private boolean done = false; + + @Override + public void request(long n) { + if (!done) { + done = true; + s.onNext(ByteBuffer.wrap(data)); + s.onComplete(); + } + } + + @Override + public void cancel() { + } + }); + } + }; + + // Use default create(body) - full buffering should be disabled + BufferedSplittableAsyncRequestBody bufferedBody = BufferedSplittableAsyncRequestBody.create(sourceBody); + + // Verify content length is propagated + assertThat(bufferedBody.contentLength()).hasValue((long) data.length); + + // Split with chunk size of 5 (two parts of 5 bytes each) + AsyncRequestBodySplitConfiguration splitConfig = AsyncRequestBodySplitConfiguration.builder() + .chunkSizeInBytes(5L) + .bufferSizeInBytes(20L) + .build(); + + SdkPublisher publisher = bufferedBody.splitCloseable(splitConfig); + + // Track parts received + List> partFutures = new ArrayList<>(); + + CompletableFuture subscribeFuture = publisher.subscribe(requestBody -> { + CompletableFuture partFuture = new CompletableFuture<>(); + partFutures.add(partFuture); + requestBody.subscribe(new BaosSubscriber(partFuture)); + partFuture.whenComplete((r, t) -> requestBody.close()); + }); + + subscribeFuture.get(5, TimeUnit.SECONDS); + + // Verify we received 2 parts with correct data (existing behavior preserved) + assertThat(partFutures.size()).isEqualTo(2); + + byte[] firstPart = partFutures.get(0).get(5, TimeUnit.SECONDS); + byte[] secondPart = partFutures.get(1).get(5, TimeUnit.SECONDS); + + byte[] expectedFirst = new byte[]{0, 1, 2, 3, 4}; + byte[] expectedSecond = new byte[]{5, 6, 7, 8, 9}; + + assertThat(firstPart).isEqualTo(expectedFirst); + assertThat(secondPart).isEqualTo(expectedSecond); + } + + @Test + void bufferedSplittable_createWithFullBufferingTrue_partsAreRetryable() throws Exception { + // Verify that create(body, true) produces retryable sub-bodies (retry buffer is populated) + byte[] data = new byte[10]; + for (int i = 0; i < data.length; i++) { + data[i] = (byte) i; + } + AsyncRequestBody sourceBody = AsyncRequestBody.fromBytes(data); + + BufferedSplittableAsyncRequestBody bufferedBody = BufferedSplittableAsyncRequestBody.builder() + .asyncRequestBody(sourceBody) + .bufferBeforeSend(true) + .build(); + + AsyncRequestBodySplitConfiguration splitConfig = AsyncRequestBodySplitConfiguration.builder() + .chunkSizeInBytes(5L) + .bufferSizeInBytes(20L) + .build(); + + SdkPublisher publisher = bufferedBody.splitCloseable(splitConfig); + + // Subscribe, read each part, then resubscribe to verify retry buffer is available + Map, CompletableFuture>> futures = new HashMap<>(); + AtomicInteger index = new AtomicInteger(); + + publisher.subscribe(requestBody -> { + int i = index.getAndIncrement(); + CompletableFuture firstRead = new CompletableFuture<>(); + requestBody.subscribe(new BaosSubscriber(firstRead)); + + firstRead.whenComplete((r, t) -> { + // Resubscribe to verify retry works + CompletableFuture secondRead = new CompletableFuture<>(); + requestBody.subscribe(new BaosSubscriber(secondRead)); + futures.put(i, Pair.of(firstRead, secondRead)); + secondRead.whenComplete((res, throwable) -> requestBody.close()); + }); + }).get(5, TimeUnit.SECONDS); + + // Verify all parts can be re-read (retry buffer is populated before downstream subscription) + for (int i = 0; i < futures.size(); i++) { + byte[] firstReadData = futures.get(i).left().get(5, TimeUnit.SECONDS); + byte[] secondReadData = futures.get(i).right().get(5, TimeUnit.SECONDS); + assertThat(firstReadData).isEqualTo(secondReadData); + } + } + private static void verifySplitContent(AsyncRequestBody asyncRequestBody, int chunkSize) throws Exception { SplittingPublisher splittingPublisher = SplittingPublisher.builder() .asyncRequestBody(asyncRequestBody) @@ -356,6 +556,393 @@ public void cancel() { } } + // ==================== Tests for bufferBeforeSend ==================== + + @Test + void bufferBeforeSend_knownContentLength_defersBodyUntilComplete() throws Exception { + // When bufferBeforeSend=true and content length is known, the downstream subscriber + // should NOT receive the body until completeCurrentBody() is invoked (i.e., after the part is fully buffered). + byte[] data = new byte[10]; + for (int i = 0; i < data.length; i++) { + data[i] = (byte) i; + } + + // Use a controlled upstream that sends data in pieces + ControlledAsyncRequestBody controlledBody = new ControlledAsyncRequestBody(Optional.of((long) data.length)); + + SplittingPublisher splittingPublisher = SplittingPublisher.builder() + .asyncRequestBody(controlledBody) + .splitConfiguration(AsyncRequestBodySplitConfiguration.builder() + .chunkSizeInBytes(10L) + .bufferSizeInBytes(20L) + .build()) + .retryableSubAsyncRequestBodyEnabled(true) + .bufferBeforeSend(true) + .build(); + + List> receivedBodies = new ArrayList<>(); + CompletableFuture subscribeFuture = splittingPublisher.subscribe(requestBody -> { + CompletableFuture bodyFuture = new CompletableFuture<>(); + receivedBodies.add(bodyFuture); + BaosSubscriber subscriber = new BaosSubscriber(bodyFuture); + requestBody.subscribe(subscriber); + }); + + // Give time for subscription to be set up + Thread.sleep(100); + + // Send partial data — the body should NOT have been emitted yet + controlledBody.sendData(ByteBuffer.wrap(data, 0, 5)); + Thread.sleep(100); + assertThat(receivedBodies.size()).isEqualTo(0); + + // Send remaining data and complete + controlledBody.sendData(ByteBuffer.wrap(data, 5, 5)); + controlledBody.complete(); + + subscribeFuture.get(5, TimeUnit.SECONDS); + + // Now the body should have been emitted and completed + assertThat(receivedBodies.size()).isEqualTo(1); + byte[] result = receivedBodies.get(0).get(5, TimeUnit.SECONDS); + assertThat(result).isEqualTo(data); + } + + @Test + void bufferBeforeSendDisabled_knownContentLength_sendsImmediately() throws Exception { + // When bufferBeforeSend=false (default) and content length is known, the body should + // be sent to the downstream subscriber immediately upon initialization. + byte[] data = new byte[10]; + for (int i = 0; i < data.length; i++) { + data[i] = (byte) i; + } + + ControlledAsyncRequestBody controlledBody = new ControlledAsyncRequestBody(Optional.of((long) data.length)); + + SplittingPublisher splittingPublisher = SplittingPublisher.builder() + .asyncRequestBody(controlledBody) + .splitConfiguration(AsyncRequestBodySplitConfiguration.builder() + .chunkSizeInBytes(10L) + .bufferSizeInBytes(20L) + .build()) + .retryableSubAsyncRequestBodyEnabled(true) + .bufferBeforeSend(false) + .build(); + + List> receivedBodies = new ArrayList<>(); + splittingPublisher.subscribe(requestBody -> { + CompletableFuture bodyFuture = new CompletableFuture<>(); + receivedBodies.add(bodyFuture); + BaosSubscriber subscriber = new BaosSubscriber(bodyFuture); + requestBody.subscribe(subscriber); + }); + + // Give time for subscription to be set up — the body should be sent immediately + Thread.sleep(100); + assertThat(receivedBodies.size()).isEqualTo(1); + + // Now send data and complete + controlledBody.sendData(ByteBuffer.wrap(data)); + controlledBody.complete(); + + byte[] result = receivedBodies.get(0).get(5, TimeUnit.SECONDS); + assertThat(result).isEqualTo(data); + } + + @Test + void bufferBeforeSend_unknownContentLength_behaviorUnchanged() throws Exception { + // When content length is unknown, behavior is unchanged regardless of bufferBeforeSend. + // The body is always deferred until complete (existing behavior). + byte[] data = new byte[10]; + for (int i = 0; i < data.length; i++) { + data[i] = (byte) i; + } + + ControlledAsyncRequestBody controlledBody = new ControlledAsyncRequestBody(Optional.empty()); + + SplittingPublisher splittingPublisher = SplittingPublisher.builder() + .asyncRequestBody(controlledBody) + .splitConfiguration(AsyncRequestBodySplitConfiguration.builder() + .chunkSizeInBytes(10L) + .bufferSizeInBytes(20L) + .build()) + .retryableSubAsyncRequestBodyEnabled(true) + .bufferBeforeSend(true) + .build(); + + List> receivedBodies = new ArrayList<>(); + splittingPublisher.subscribe(requestBody -> { + CompletableFuture bodyFuture = new CompletableFuture<>(); + receivedBodies.add(bodyFuture); + BaosSubscriber subscriber = new BaosSubscriber(bodyFuture); + requestBody.subscribe(subscriber); + }); + + // Give time for subscription to be set up + Thread.sleep(100); + + // Send partial data — the body should NOT have been emitted yet (unknown-length path defers) + controlledBody.sendData(ByteBuffer.wrap(data, 0, 5)); + Thread.sleep(100); + assertThat(receivedBodies.size()).isEqualTo(0); + + // Send remaining data and complete + controlledBody.sendData(ByteBuffer.wrap(data, 5, 5)); + controlledBody.complete(); + + // Now body should be emitted + Thread.sleep(200); + assertThat(receivedBodies.size()).isEqualTo(1); + byte[] result = receivedBodies.get(0).get(5, TimeUnit.SECONDS); + assertThat(result).isEqualTo(data); + } + + @Test + void bufferBeforeSend_multiPart_allPartsDeferred() throws Exception { + // When splitting into multiple parts with bufferBeforeSend=true, all parts are deferred + // until fully buffered. + byte[] data = new byte[20]; + for (int i = 0; i < data.length; i++) { + data[i] = (byte) i; + } + + ControlledAsyncRequestBody controlledBody = new ControlledAsyncRequestBody(Optional.of((long) data.length)); + + SplittingPublisher splittingPublisher = SplittingPublisher.builder() + .asyncRequestBody(controlledBody) + .splitConfiguration(AsyncRequestBodySplitConfiguration.builder() + .chunkSizeInBytes(10L) + .bufferSizeInBytes(30L) + .build()) + .retryableSubAsyncRequestBodyEnabled(true) + .bufferBeforeSend(true) + .build(); + + List> receivedBodies = new ArrayList<>(); + CompletableFuture subscribeFuture = splittingPublisher.subscribe(requestBody -> { + CompletableFuture bodyFuture = new CompletableFuture<>(); + receivedBodies.add(bodyFuture); + BaosSubscriber subscriber = new BaosSubscriber(bodyFuture); + requestBody.subscribe(subscriber); + }); + + // Give time for subscription + Thread.sleep(100); + + // Send first chunk partially — no body should be emitted yet + controlledBody.sendData(ByteBuffer.wrap(data, 0, 5)); + Thread.sleep(100); + assertThat(receivedBodies.size()).isEqualTo(0); + + // Complete first chunk (10 bytes) — first body should now be emitted + controlledBody.sendData(ByteBuffer.wrap(data, 5, 5)); + Thread.sleep(100); + assertThat(receivedBodies.size()).isEqualTo(1); + + // Send second chunk partially — second body should not be emitted yet + controlledBody.sendData(ByteBuffer.wrap(data, 10, 5)); + Thread.sleep(100); + assertThat(receivedBodies.size()).isEqualTo(1); + + // Complete second chunk and signal upstream complete + controlledBody.sendData(ByteBuffer.wrap(data, 15, 5)); + controlledBody.complete(); + + subscribeFuture.get(5, TimeUnit.SECONDS); + + // Both bodies should now be emitted + assertThat(receivedBodies.size()).isEqualTo(2); + + // Verify content of first part + byte[] firstPart = receivedBodies.get(0).get(5, TimeUnit.SECONDS); + byte[] expectedFirst = new byte[10]; + System.arraycopy(data, 0, expectedFirst, 0, 10); + assertThat(firstPart).isEqualTo(expectedFirst); + + // Verify content of second part + byte[] secondPart = receivedBodies.get(1).get(5, TimeUnit.SECONDS); + byte[] expectedSecond = new byte[10]; + System.arraycopy(data, 10, expectedSecond, 0, 10); + assertThat(secondPart).isEqualTo(expectedSecond); + } + + @Test + void bufferBeforeSend_upstreamError_doesNotSendIncompleteBody() throws Exception { + // When bufferBeforeSend=true and the upstream signals onError() before a part is fully buffered, + // the incomplete part body should NOT be sent downstream. + ControlledAsyncRequestBody controlledBody = new ControlledAsyncRequestBody(Optional.of(20L)); + + SplittingPublisher splittingPublisher = SplittingPublisher.builder() + .asyncRequestBody(controlledBody) + .splitConfiguration(AsyncRequestBodySplitConfiguration.builder() + .chunkSizeInBytes(10L) + .bufferSizeInBytes(20L) + .build()) + .retryableSubAsyncRequestBodyEnabled(true) + .bufferBeforeSend(true) + .build(); + + List> receivedBodies = new ArrayList<>(); + CompletableFuture downstreamError = new CompletableFuture<>(); + splittingPublisher.subscribe(new Subscriber() { + @Override + public void onSubscribe(Subscription s) { + s.request(Long.MAX_VALUE); + } + + @Override + public void onNext(CloseableAsyncRequestBody requestBody) { + CompletableFuture bodyFuture = new CompletableFuture<>(); + receivedBodies.add(bodyFuture); + BaosSubscriber subscriber = new BaosSubscriber(bodyFuture); + requestBody.subscribe(subscriber); + } + + @Override + public void onError(Throwable t) { + downstreamError.complete(t); + } + + @Override + public void onComplete() { + } + }); + + // Give time for subscription + Thread.sleep(100); + + // Send partial data (less than chunk size of 10) + controlledBody.sendData(ByteBuffer.wrap(new byte[5])); + Thread.sleep(100); + + // No body should have been emitted (bufferBeforeSend defers until complete) + assertThat(receivedBodies.size()).isEqualTo(0); + + // Signal upstream error + RuntimeException error = new RuntimeException("upstream failure"); + controlledBody.sendError(error); + + // The error should propagate to downstream subscriber + Throwable receivedError = downstreamError.get(5, TimeUnit.SECONDS); + assertThat(receivedError).isEqualTo(error); + + // The incomplete body should NOT have been sent downstream + assertThat(receivedBodies.size()).isEqualTo(0); + } + + // ==================== Tests for bufferBeforeSend validation ==================== + + @Test + void bufferBeforeSend_bufferSizeLessThanChunkSize_throwsException() { + // When bufferBeforeSend=true and bufferSizeInBytes < chunkSizeInBytes, construction should fail + // because the buffer cannot hold a full chunk, which would cause a deadlock. + assertThatThrownBy(() -> SplittingPublisher.builder() + .asyncRequestBody(AsyncRequestBody.fromString("hello")) + .splitConfiguration(AsyncRequestBodySplitConfiguration.builder() + .chunkSizeInBytes(10L) + .bufferSizeInBytes(5L) + .build()) + .retryableSubAsyncRequestBodyEnabled(true) + .bufferBeforeSend(true) + .build()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("bufferSizeInBytes must be larger than or equal to chunkSizeInBytes"); + } + + @Test + void bufferBeforeSend_bufferSizeEqualToChunkSize_succeeds() { + // When bufferBeforeSend=true and bufferSizeInBytes == chunkSizeInBytes, construction should succeed. + SplittingPublisher publisher = SplittingPublisher.builder() + .asyncRequestBody(AsyncRequestBody.fromString("hello")) + .splitConfiguration(AsyncRequestBodySplitConfiguration.builder() + .chunkSizeInBytes(10L) + .bufferSizeInBytes(10L) + .build()) + .retryableSubAsyncRequestBodyEnabled(true) + .bufferBeforeSend(true) + .build(); + assertThat(publisher).isNotNull(); + } + + @Test + void bufferBeforeSendDisabled_bufferSizeLessThanChunkSize_knownContentLength_succeeds() { + // When bufferBeforeSend=false and content length is known, bufferSizeInBytes < chunkSizeInBytes + // is allowed because data flows through without full buffering. + SplittingPublisher publisher = SplittingPublisher.builder() + .asyncRequestBody(AsyncRequestBody.fromString("hello")) + .splitConfiguration(AsyncRequestBodySplitConfiguration.builder() + .chunkSizeInBytes(10L) + .bufferSizeInBytes(5L) + .build()) + .retryableSubAsyncRequestBodyEnabled(true) + .bufferBeforeSend(false) + .build(); + assertThat(publisher).isNotNull(); + } + + @Test + void unknownContentLength_bufferSizeLessThanChunkSize_throwsException() { + // Existing behavior: unknown content length with bufferSizeInBytes < chunkSizeInBytes should fail. + ControlledAsyncRequestBody unknownLengthBody = new ControlledAsyncRequestBody(Optional.empty()); + assertThatThrownBy(() -> SplittingPublisher.builder() + .asyncRequestBody(unknownLengthBody) + .splitConfiguration(AsyncRequestBodySplitConfiguration.builder() + .chunkSizeInBytes(10L) + .bufferSizeInBytes(5L) + .build()) + .retryableSubAsyncRequestBodyEnabled(true) + .bufferBeforeSend(false) + .build()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("bufferSizeInBytes must be larger than or equal to chunkSizeInBytes"); + } + + /** + * A controlled AsyncRequestBody that allows tests to send data, complete, and signal errors + * at specific times to test deferred/immediate behavior. + */ + private static class ControlledAsyncRequestBody implements AsyncRequestBody { + private final Optional contentLength; + private volatile Subscriber subscriber; + private volatile Subscription subscription; + + ControlledAsyncRequestBody(Optional contentLength) { + this.contentLength = contentLength; + } + + @Override + public Optional contentLength() { + return contentLength; + } + + @Override + public void subscribe(Subscriber s) { + this.subscriber = s; + s.onSubscribe(new Subscription() { + @Override + public void request(long n) { + // Controlled — data is sent explicitly via sendData() + } + + @Override + public void cancel() { + } + }); + } + + void sendData(ByteBuffer data) { + subscriber.onNext(data); + } + + void complete() { + subscriber.onComplete(); + } + + void sendError(Throwable t) { + subscriber.onError(t); + } + } + private static final class BaosSubscriber implements Subscriber { private final CompletableFuture resultFuture; diff --git a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/S3MultipartClientPutObjectWiremockTest.java b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/S3MultipartClientPutObjectWiremockTest.java index 815abd982ef..42bb4f715c8 100644 --- a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/S3MultipartClientPutObjectWiremockTest.java +++ b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/S3MultipartClientPutObjectWiremockTest.java @@ -27,6 +27,7 @@ import static com.github.tomakehurst.wiremock.client.WireMock.stubFor; import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo; import static com.github.tomakehurst.wiremock.client.WireMock.verify; +import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import com.github.tomakehurst.wiremock.client.ResponseDefinitionBuilder; @@ -52,6 +53,7 @@ import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; import software.amazon.awssdk.core.SdkBytes; @@ -241,5 +243,78 @@ public int read() { } }; } + + /** + * Verifies that with full buffering enabled, a slow-streaming body with known content length + * can successfully retry a failed part upload. This simulates the SFTP scenario where data + * arrives slowly and the retry buffer must be populated before the HTTP layer subscribes. + */ + @Test + void mpuWithBufferBeforeSend_slowStreamingKnownLength_retriesSuccessfullyOn500() { + // Stub CreateMultipartUpload (POST) and CompleteMultipartUpload (POST) + stubFor(post(anyUrl()).willReturn(aResponse().withStatus(200).withBody(CREATE_MULTIPART_PAYLOAD))); + // Part 1: first attempt returns 500, retry returns 200 + stubUploadFailsInitialAttemptCalls(1, aResponse().withStatus(500)); + // Part 2: succeeds on first attempt + stubFor(put(anyUrl()) + .withQueryParam("partNumber", matching(String.valueOf(2))) + .willReturn(aResponse().withStatus(200))); + + // Create a slow-streaming body with KNOWN content length (20 bytes total = 2 parts of 10 bytes) + int partSize = 10; + int totalSize = partSize * 2; + AsyncRequestBody slowStreamingBody = new AsyncRequestBody() { + @Override + public Optional contentLength() { + return Optional.of((long) totalSize); + } + + @Override + public void subscribe(Subscriber subscriber) { + subscriber.onSubscribe(new Subscription() { + private int bytesEmitted = 0; + + @Override + public void request(long n) { + // Emit data in small chunks to simulate slow streaming (like SFTP) + for (long i = 0; i < n && bytesEmitted < totalSize; i++) { + int chunkSize = Math.min(5, totalSize - bytesEmitted); + ByteBuffer buffer = ByteBuffer.allocate(chunkSize); + for (int j = 0; j < chunkSize; j++) { + buffer.put((byte) ('a' + (bytesEmitted + j) % 26)); + } + buffer.flip(); + bytesEmitted += chunkSize; + subscriber.onNext(buffer); + } + if (bytesEmitted >= totalSize) { + subscriber.onComplete(); + } + } + + @Override + public void cancel() { + // no-op + } + }); + } + }; + + // Wrap with full buffering enabled — this ensures retry buffer is populated before HTTP subscribe + BufferedSplittableAsyncRequestBody bufferedBody = + BufferedSplittableAsyncRequestBody.builder() + .asyncRequestBody(slowStreamingBody) + .bufferBeforeSend(true) + .build(); + + // The upload should complete successfully — retry works because full buffering + // ensures the retry buffer is populated before the HTTP layer subscribes + PutObjectResponse response = s3AsyncClient.putObject(b -> b.bucket(BUCKET).key(KEY), bufferedBody).join(); + assertThat(response).isNotNull(); + + // Verify part 1 was attempted more than once (retry happened) + verify(moreThan(1), putRequestedFor(anyUrl()).withQueryParam("partNumber", matching(String.valueOf(1)))); + verify(lessThanOrExactly(3), putRequestedFor(anyUrl()).withQueryParam("partNumber", matching(String.valueOf(1)))); + } }