streamFuture() {
+ return streamFuture;
+ }
}
}
diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/BodyChunkPipe.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/BodyChunkPipe.java
new file mode 100644
index 00000000000..986eff9fe1a
--- /dev/null
+++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/BodyChunkPipe.java
@@ -0,0 +1,254 @@
+/*
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ * http://aws.amazon.com/apache2.0
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package software.amazon.awssdk.http.crt.internal.request;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.atomic.AtomicReference;
+import software.amazon.awssdk.annotations.SdkInternalApi;
+import software.amazon.awssdk.annotations.SdkTestInternalApi;
+import software.amazon.awssdk.utils.Logger;
+
+/**
+ * Bounded producer/consumer hand-off between the caller thread (producer) and the CRT event-loop thread (consumer).
+ *
+ * The producer reads from the customer's {@code InputStream} and {@link #publish(Chunk) publishes} chunks
+ * into a bounded {@link ArrayBlockingQueue}. The consumer drains those chunks via {@link #pollDrain(ByteBuffer)},
+ * which is non-blocking: if no data is ready the consumer returns 0 bytes and CRT reschedules itself via
+ * {@code aws_channel_schedule_task_now}.
+ *
+ *
Drained chunks are returned to a free {@link ArrayDeque} (LIFO for cache hotness) guarded by this
+ * monitor. The producer parks on this monitor when the free deque is empty, providing back-pressure.
+ *
+ *
Chunk byte[] buffers are allocated lazily on the producer's first {@link #acquireForFill()}, not in
+ * the constructor. This keeps per-request heap minimal while a request is queued on the CRT connection
+ * pool waiting for a stream: the pipe object exists but its backing buffers do not.
+ *
+ *
State machine: {@code OPEN -> {EOF | ERROR | ABORTED}}. Transitions are one-way.
+ */
+@SdkInternalApi
+final class BodyChunkPipe {
+ private static final Logger LOG = Logger.loggerFor(BodyChunkPipe.class);
+
+ enum State {
+ OPEN,
+ EOF,
+ ERROR,
+ ABORTED
+ }
+
+ /**
+ * Defense-in-depth wait timeout for {@link #acquireForFill()}. Even if a code path forgets
+ * to call {@link #abort()}, a parked producer wakes every {@value} ms to re-check state.
+ * Spurious wakeups are harmless.
+ */
+ private static final long ACQUIRE_WAIT_TIMEOUT_MS = 50L;
+
+ private final int depth;
+ private final int chunkSize;
+ private final ArrayBlockingQueue ready;
+ private final Deque free;
+ private final AtomicReference state = new AtomicReference<>(State.OPEN);
+ private final String tag;
+ /**
+ * Guards the free deque, allocated counter, and producer wait/notify protocol. Kept private
+ * so external code cannot synchronize on the pipe instance and stall the producer.
+ */
+ private final Object freeLock = new Object();
+
+ private int allocated;
+ private volatile Throwable error;
+ private Chunk pendingDrain;
+
+ BodyChunkPipe(int depth, int chunkSize) {
+ this(depth, chunkSize, "-");
+ }
+
+ BodyChunkPipe(int depth, int chunkSize, String reqId) {
+ if (depth < 1) {
+ throw new IllegalArgumentException("depth must be >= 1");
+ }
+ if (chunkSize < 1) {
+ throw new IllegalArgumentException("chunkSize must be >= 1");
+ }
+ this.depth = depth;
+ this.chunkSize = chunkSize;
+ this.ready = new ArrayBlockingQueue<>(depth);
+ this.free = new ArrayDeque<>(depth);
+ this.tag = "[reqId=" + reqId + "] ";
+ }
+
+ /**
+ * Producer side: acquire a chunk to fill. Blocks if all chunks are currently in flight.
+ * Returns {@code null} only if the pipe was aborted while the producer was waiting.
+ *
+ * Allocates the chunk's backing byte[] on first use up to the configured depth. This keeps the
+ * per-request footprint minimal until the producer actually starts pumping (i.e., until after the
+ * CRT stream has been acquired).
+ */
+ Chunk acquireForFill() throws InterruptedException {
+ synchronized (freeLock) {
+ while (true) {
+ State s = state.get();
+ if (s == State.ABORTED || s == State.ERROR) {
+ LOG.debug(() -> tag + "acquireForFill returning null, state=" + s);
+ return null;
+ }
+ Chunk c = free.pollFirst();
+ if (c != null) {
+ return c;
+ }
+ if (allocated < depth) {
+ allocated++;
+ return new Chunk(chunkSize);
+ }
+ freeLock.wait(ACQUIRE_WAIT_TIMEOUT_MS);
+ }
+ }
+ }
+
+ /**
+ * Producer side: publish a filled chunk to the consumer. Caller must have set
+ * {@link Chunk#len(int)} before calling.
+ *
+ *
If the chunk is empty (zero-length read), it is recycled back to the free deque rather than
+ * pushed to the ready queue: an empty chunk would otherwise be leaked from the bounded pool, and
+ * the consumer would interpret it as a no-op anyway.
+ */
+ void publish(Chunk chunk) throws InterruptedException {
+ if (chunk.len() == 0) {
+ recycle(chunk);
+ return;
+ }
+ // ready.put() blocks if the queue is full, but the queue capacity == pool size,
+ // so this can only block briefly while the consumer drains.
+ ready.put(chunk);
+ }
+
+ /**
+ * Producer side: signal end-of-stream. Idempotent.
+ */
+ void signalEof() {
+ if (state.compareAndSet(State.OPEN, State.EOF)) {
+ LOG.debug(() -> tag + "state OPEN -> EOF");
+ }
+ }
+
+ /**
+ * Producer side: signal a fatal producer-side error. Idempotent.
+ */
+ void signalError(Throwable t) {
+ synchronized (freeLock) {
+ // Publish the cause BEFORE flipping state so a consumer's lock-free read in pollDrain
+ // never observes state==ERROR with error==null. The volatile write to `error` is
+ // harmless if the CAS later loses (idempotent signal).
+ error = t;
+ if (state.compareAndSet(State.OPEN, State.ERROR)) {
+ LOG.debug(() -> tag + "state OPEN -> ERROR (" + t.getClass().getSimpleName() + ")");
+ }
+ freeLock.notifyAll();
+ }
+ }
+
+ /**
+ * External-cancel: clear ready queue, flip state, wake producer.
+ */
+ void abort() {
+ synchronized (freeLock) {
+ if (state.compareAndSet(State.OPEN, State.ABORTED)) {
+ LOG.debug(() -> tag + "state OPEN -> ABORTED");
+ ready.clear();
+ }
+ freeLock.notifyAll();
+ }
+ }
+
+ /**
+ * Consumer side: drain bytes into {@code dst}. NEVER blocks.
+ *
+ *
Single-consumer: CRT invokes this only on the request's outgoing-stream task, which is
+ * scheduled serially on one event-loop thread per stream. {@code pendingDrain} is therefore
+ * not volatile - it is written and read by that single consumer thread.
+ *
+ * @return number of bytes drained, or {@code -1} on EOF with no remaining data.
+ * @throws RuntimeException if the pipe is in ERROR or ABORTED state with no remaining data.
+ */
+ int pollDrain(ByteBuffer dst) {
+ int totalBytesConsumed = 0;
+ while (dst.hasRemaining()) {
+ if (pendingDrain == null) {
+ pendingDrain = ready.poll();
+ }
+ if (pendingDrain == null) {
+ switch (state.get()) {
+ case ERROR:
+ throw new RuntimeException("Producer failed", error);
+ case ABORTED:
+ throw new RuntimeException("Request body stream was aborted");
+ case EOF:
+ return totalBytesConsumed > 0 ? totalBytesConsumed : -1;
+ case OPEN:
+ default:
+ // OPEN with empty queue: return what we have (possibly 0); CRT will retry.
+ return totalBytesConsumed;
+ }
+ }
+ int n = Math.min(dst.remaining(), pendingDrain.len() - pendingDrain.pos());
+ dst.put(pendingDrain.data(), pendingDrain.pos(), n);
+ pendingDrain.pos(pendingDrain.pos() + n);
+ totalBytesConsumed += n;
+ if (pendingDrain.pos() >= pendingDrain.len()) {
+ // The chunk has been fully copied into dst, so we return it to the free deque
+ // (and notify the producer in case it was waiting). This is what bounds the pool:
+ // chunks only re-enter the producer pool after the consumer has drained them.
+ Chunk drained = pendingDrain;
+ pendingDrain = null;
+ recycle(drained);
+ }
+ }
+ return totalBytesConsumed;
+ }
+
+ /**
+ * Visible-for-test / test-only helper: current pipe state.
+ */
+ @SdkTestInternalApi
+ State state() {
+ return state.get();
+ }
+
+ /**
+ * Visible-for-test / test-only helper: number of {@link Chunk} buffers minted so far. The pipe
+ * lazily allocates chunks on the producer's first {@link #acquireForFill()}, so this is 0 until
+ * the producer starts pumping and grows up to {@code depth}.
+ */
+ @SdkTestInternalApi
+ int allocatedForTest() {
+ synchronized (freeLock) {
+ return allocated;
+ }
+ }
+
+ private void recycle(Chunk c) {
+ c.reset();
+ synchronized (freeLock) {
+ free.push(c);
+ freeLock.notifyAll();
+ }
+ }
+}
diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/Chunk.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/Chunk.java
new file mode 100644
index 00000000000..0797b8cbc67
--- /dev/null
+++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/Chunk.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ * http://aws.amazon.com/apache2.0
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package software.amazon.awssdk.http.crt.internal.request;
+
+import software.amazon.awssdk.annotations.SdkInternalApi;
+
+/**
+ * A reusable byte buffer carrying request body data through the {@link BodyChunkPipe}.
+ *
+ *
Internal to this package; used only by {@link BodyChunkPipe} and {@link SyncRequestBodyPump}.
+ *
+ *
{@code pos} and {@code len} are intentionally non-volatile: hand-off between the producer and
+ * consumer always goes through the {@code ArrayBlockingQueue} (or the {@code freeLock} monitor on
+ * recycle), both of which provide release/acquire happens-before for the field writes.
+ */
+@SdkInternalApi
+final class Chunk {
+ private final byte[] data;
+ private int pos;
+ private int len;
+
+ Chunk(int chunkSize) {
+ this.data = new byte[chunkSize];
+ }
+
+ byte[] data() {
+ return data;
+ }
+
+ int pos() {
+ return pos;
+ }
+
+ void pos(int pos) {
+ this.pos = pos;
+ }
+
+ int len() {
+ return len;
+ }
+
+ void len(int len) {
+ this.len = len;
+ }
+
+ void reset() {
+ this.pos = 0;
+ this.len = 0;
+ }
+}
diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestAdapter.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestAdapter.java
index 8672d80b0d1..734c5a35222 100644
--- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestAdapter.java
+++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestAdapter.java
@@ -23,6 +23,7 @@
import software.amazon.awssdk.crt.http.HttpHeader;
import software.amazon.awssdk.crt.http.HttpRequest;
import software.amazon.awssdk.crt.http.HttpRequestBase;
+import software.amazon.awssdk.http.ContentStreamProvider;
import software.amazon.awssdk.http.Header;
import software.amazon.awssdk.http.HttpExecuteRequest;
import software.amazon.awssdk.http.Protocol;
@@ -33,6 +34,16 @@
@SdkInternalApi
public final class CrtRequestAdapter {
+ /**
+ * Per-chunk size used by the sync request-body pipe.
+ */
+ private static final int CHUNK_SIZE = 128 * 1024;
+
+ /**
+ * Number of in-flight chunks the pipe holds.
+ */
+ private static final int PIPE_DEPTH = 4;
+
private CrtRequestAdapter() {
}
@@ -60,7 +71,12 @@ public static HttpRequestBase toAsyncCrtRequest(CrtAsyncRequestContext request)
crtRequestBodyAdapter);
}
- public static HttpRequest toCrtRequest(CrtRequestContext request) {
+ /**
+ * Build the CRT request for the sync path. When the SDK request has a body, this also constructs the
+ * {@link BodyChunkPipe} and a {@link SyncRequestBodyPump}; the caller thread is expected to drive
+ * the pump after the stream is activated.
+ */
+ public static SyncCrtRequest toCrtRequest(CrtRequestContext request) {
HttpExecuteRequest sdkExecuteRequest = request.sdkRequest();
SdkHttpRequest sdkRequest = sdkExecuteRequest.httpRequest();
@@ -78,14 +94,40 @@ public static HttpRequest toCrtRequest(CrtRequestContext request) {
HttpHeader[] crtHeaderArray = asArray(createHttpHeaderList(sdkRequest.getUri(), sdkExecuteRequest));
String finalEncodedPath = encodedPath + encodedQueryString;
- return sdkExecuteRequest.contentStreamProvider()
- .map(provider -> new HttpRequest(method,
- finalEncodedPath,
- crtHeaderArray,
- new CrtRequestInputStreamAdapter(provider)))
- .orElse(new HttpRequest(method,
- finalEncodedPath,
- crtHeaderArray, null));
+
+ Optional providerOpt = sdkExecuteRequest.contentStreamProvider();
+ if (!providerOpt.isPresent()) {
+ return new SyncCrtRequest(new HttpRequest(method, finalEncodedPath, crtHeaderArray, null), null);
+ }
+
+ String reqId = request.reqId();
+ BodyChunkPipe pipe = new BodyChunkPipe(PIPE_DEPTH, CHUNK_SIZE, reqId);
+ PipeBackedRequestBodyStream bodyStream = new PipeBackedRequestBodyStream(pipe);
+ SyncRequestBodyPump pump = new SyncRequestBodyPump(providerOpt.get(), pipe, reqId);
+ HttpRequest crtRequest = new HttpRequest(method, finalEncodedPath, crtHeaderArray, bodyStream);
+ return new SyncCrtRequest(crtRequest, pump);
+ }
+
+ /**
+ * Holder returned from {@link #toCrtRequest(CrtRequestContext)} bundling the CRT-side request and the
+ * caller-thread producer pump (null when the SDK request has no body).
+ */
+ public static final class SyncCrtRequest {
+ private final HttpRequest httpRequest;
+ private final SyncRequestBodyPump pump;
+
+ SyncCrtRequest(HttpRequest httpRequest, SyncRequestBodyPump pump) {
+ this.httpRequest = httpRequest;
+ this.pump = pump;
+ }
+
+ public HttpRequest httpRequest() {
+ return httpRequest;
+ }
+
+ public SyncRequestBodyPump pump() {
+ return pump;
+ }
}
private static HttpHeader[] asArray(List crtHeaderList) {
diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestInputStreamAdapter.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestInputStreamAdapter.java
deleted file mode 100644
index 68f418b9e1d..00000000000
--- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestInputStreamAdapter.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License").
- * You may not use this file except in compliance with the License.
- * A copy of the License is located at
- *
- * http://aws.amazon.com/apache2.0
- *
- * or in the "license" file accompanying this file. This file is distributed
- * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
- * express or implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package software.amazon.awssdk.http.crt.internal.request;
-
-import static java.lang.Math.min;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-import software.amazon.awssdk.annotations.SdkInternalApi;
-import software.amazon.awssdk.crt.http.HttpRequestBodyStream;
-import software.amazon.awssdk.http.ContentStreamProvider;
-
-@SdkInternalApi
-final class CrtRequestInputStreamAdapter implements HttpRequestBodyStream {
- private static final int READ_BUFFER_SIZE = 16 * 1024;
-
- private final ContentStreamProvider provider;
- private volatile InputStream providerStream;
- private final byte[] readBuffer = new byte[READ_BUFFER_SIZE];
-
- CrtRequestInputStreamAdapter(ContentStreamProvider provider) {
- this.provider = provider;
- }
-
- @Override
- public boolean sendRequestBody(ByteBuffer bodyBytesOut) {
- int read;
-
- try {
- if (providerStream == null) {
- createNewStream();
- }
-
- int toRead = min(READ_BUFFER_SIZE, bodyBytesOut.remaining());
- read = providerStream.read(readBuffer, 0, toRead);
-
- if (read > 0) {
- bodyBytesOut.put(readBuffer, 0, read);
- }
- } catch (IOException ioe) {
- throw new RuntimeException(ioe);
- }
-
- return read < 0;
- }
-
- @Override
- public boolean resetPosition() {
- try {
- createNewStream();
- } catch (IOException ioe) {
- throw new RuntimeException(ioe);
- }
-
- return true;
- }
-
- private void createNewStream() throws IOException {
- if (providerStream != null) {
- providerStream.close();
- }
- providerStream = provider.newStream();
- }
-}
diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/PipeBackedRequestBodyStream.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/PipeBackedRequestBodyStream.java
new file mode 100644
index 00000000000..a2dd78eea0c
--- /dev/null
+++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/PipeBackedRequestBodyStream.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ * http://aws.amazon.com/apache2.0
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package software.amazon.awssdk.http.crt.internal.request;
+
+import java.nio.ByteBuffer;
+import software.amazon.awssdk.annotations.SdkInternalApi;
+import software.amazon.awssdk.crt.http.HttpRequestBodyStream;
+
+/**
+ * A {@link HttpRequestBodyStream} adapter whose {@link #sendRequestBody(ByteBuffer)} drains bytes from a
+ * {@link BodyChunkPipe} that is fed by the caller thread. The pull callback NEVER blocks: if no data is ready,
+ * it returns 0 bytes and CRT reschedules the outgoing-stream task via {@code aws_channel_schedule_task_now},
+ * allowing other event-loop tasks (such as a concurrent GET response delivery) to run before the retry.
+ */
+@SdkInternalApi
+final class PipeBackedRequestBodyStream implements HttpRequestBodyStream {
+
+ private final BodyChunkPipe pipe;
+
+ PipeBackedRequestBodyStream(BodyChunkPipe pipe) {
+ this.pipe = pipe;
+ }
+
+ @Override
+ public boolean sendRequestBody(ByteBuffer bodyBytesOut) {
+ int drained = pipe.pollDrain(bodyBytesOut);
+ return drained < 0;
+ }
+
+ @Override
+ public boolean resetPosition() {
+ // The SDK retry layer (RetryableStage) handles request-level retries by calling prepareRequest() again,
+ // CRT does not currently exercise resetPosition for HTTP/1.1, so opting out is safe in practice.
+ return false;
+ }
+}
diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/SyncRequestBodyPump.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/SyncRequestBodyPump.java
new file mode 100644
index 00000000000..254b4127ea3
--- /dev/null
+++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/SyncRequestBodyPump.java
@@ -0,0 +1,92 @@
+/*
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ * http://aws.amazon.com/apache2.0
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package software.amazon.awssdk.http.crt.internal.request;
+
+import java.io.IOException;
+import java.io.InputStream;
+import software.amazon.awssdk.annotations.SdkInternalApi;
+import software.amazon.awssdk.http.ContentStreamProvider;
+import software.amazon.awssdk.utils.Logger;
+
+/**
+ * Caller-thread producer that reads from the customer's {@link InputStream} and publishes chunks to a
+ * {@link BodyChunkPipe}. Runs on the caller (sync) thread between stream activation and
+ * {@code responseFuture.join()}, ensuring the blocking {@code read()} happens off the CRT event loop.
+ */
+@SdkInternalApi
+public final class SyncRequestBodyPump {
+ private static final Logger LOG = Logger.loggerFor(SyncRequestBodyPump.class);
+
+ private final ContentStreamProvider contentStreamProvider;
+ private final BodyChunkPipe pipe;
+ private final String tag;
+
+ SyncRequestBodyPump(ContentStreamProvider contentStreamProvider, BodyChunkPipe pipe) {
+ this(contentStreamProvider, pipe, "-");
+ }
+
+ SyncRequestBodyPump(ContentStreamProvider contentStreamProvider, BodyChunkPipe pipe, String reqId) {
+ this.contentStreamProvider = contentStreamProvider;
+ this.pipe = pipe;
+ this.tag = "[reqId=" + reqId + "] ";
+ }
+
+ /**
+ * Pump the entire input stream into the pipe. Runs on the caller thread; never invoked on the CRT
+ * event-loop thread. On EOF signals the pipe normally; on {@link IOException} signals an error and rethrows.
+ */
+ public void pump() throws IOException {
+ LOG.info(() -> tag + "pump() entered");
+ try (InputStream in = contentStreamProvider.newStream()) {
+ while (true) {
+ Chunk chunk = pipe.acquireForFill();
+ if (chunk == null) {
+ LOG.info(() -> tag + "pump() exiting due to abort (acquireForFill returned null)");
+ return;
+ }
+ int read;
+ try {
+ read = in.read(chunk.data(), 0, chunk.data().length);
+ } catch (IOException ioe) {
+ LOG.info(() -> tag + "pump() exiting due to error: " + ioe.getMessage());
+ pipe.signalError(ioe);
+ throw ioe;
+ }
+ if (read < 0) {
+ LOG.info(() -> tag + "pump() exiting due to eof");
+ pipe.signalEof();
+ return;
+ }
+ chunk.pos(0);
+ chunk.len(read);
+ pipe.publish(chunk);
+ }
+ } catch (InterruptedException ie) {
+ LOG.info(() -> tag + "pump() exiting due to interrupt");
+ pipe.abort();
+ Thread.currentThread().interrupt();
+ throw new IOException("Interrupted while writing request body", ie);
+ }
+ }
+
+ /**
+ * Abort the underlying pipe (e.g., when the caller's {@code call()} is cancelled).
+ */
+ public void abort() {
+ LOG.info(() -> tag + "pump.abort() called");
+ pipe.abort();
+ }
+}
diff --git a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/AwsCrtHttpClientWireMockTest.java b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/AwsCrtHttpClientWireMockTest.java
index ce5d778f06a..5171451a4f0 100644
--- a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/AwsCrtHttpClientWireMockTest.java
+++ b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/AwsCrtHttpClientWireMockTest.java
@@ -17,8 +17,13 @@
import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
import static com.github.tomakehurst.wiremock.client.WireMock.any;
+import static com.github.tomakehurst.wiremock.client.WireMock.equalTo;
+import static com.github.tomakehurst.wiremock.client.WireMock.equalToIgnoreCase;
+import static com.github.tomakehurst.wiremock.client.WireMock.put;
+import static com.github.tomakehurst.wiremock.client.WireMock.putRequestedFor;
import static com.github.tomakehurst.wiremock.client.WireMock.stubFor;
import static com.github.tomakehurst.wiremock.client.WireMock.urlPathEqualTo;
+import static com.github.tomakehurst.wiremock.client.WireMock.verify;
import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig;
import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
import static org.assertj.core.api.Assertions.assertThat;
@@ -26,13 +31,21 @@
import static software.amazon.awssdk.http.SdkHttpConfigurationOption.PROTOCOL;
import static software.amazon.awssdk.http.crt.CrtHttpClientTestUtils.createRequest;
+import com.github.tomakehurst.wiremock.http.Fault;
import com.github.tomakehurst.wiremock.junit.WireMockRule;
import java.io.ByteArrayInputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
@@ -45,6 +58,8 @@
import software.amazon.awssdk.http.HttpMetric;
import software.amazon.awssdk.http.Protocol;
import software.amazon.awssdk.http.SdkHttpClient;
+import software.amazon.awssdk.http.SdkHttpFullRequest;
+import software.amazon.awssdk.http.SdkHttpMethod;
import software.amazon.awssdk.http.SdkHttpRequest;
import software.amazon.awssdk.metrics.MetricCollection;
import software.amazon.awssdk.metrics.MetricCollector;
@@ -133,6 +148,294 @@ public void abortRequest_shouldFailTheExceptionWithIOException() throws Exceptio
}
}
+ @Test
+ public void putRequest_withInputStreamBody_serverReceivesBody() throws Exception {
+ try (SdkHttpClient client = AwsCrtHttpClient.create()) {
+ String body = "hello pull pump";
+ byte[] bodyBytes = body.getBytes(StandardCharsets.UTF_8);
+ URI uri = URI.create("http://localhost:" + mockServer.port());
+ stubFor(put(urlPathEqualTo("/sink")).willReturn(aResponse().withStatus(200)));
+
+ SdkHttpFullRequest request = SdkHttpFullRequest.builder()
+ .uri(uri)
+ .method(SdkHttpMethod.PUT)
+ .encodedPath("/sink")
+ .putHeader("Host", uri.getHost())
+ .putHeader("Content-Length", Integer.toString(bodyBytes.length))
+ .build();
+
+ HttpExecuteRequest executeRequest = HttpExecuteRequest.builder()
+ .request(request)
+ .contentStreamProvider(() -> new ByteArrayInputStream(bodyBytes))
+ .build();
+
+ HttpExecuteResponse response = client.prepareRequest(executeRequest).call();
+
+ assertThat(response.httpResponse().statusCode()).isEqualTo(200);
+ verify(putRequestedFor(urlPathEqualTo("/sink"))
+ .withHeader("Content-Length", equalTo(Integer.toString(bodyBytes.length)))
+ .withRequestBody(equalToIgnoreCase(body)));
+ }
+ }
+
+ @Test
+ public void inputStreamThrows_connectionReturnedToPool_subsequentRequestSucceeds() throws Exception {
+ // Bound the pool to a single connection: if the failed request leaks its connection, the
+ // second call() either fails to acquire (with the explicit timeout below) or blocks until
+ // the test framework times out. Either manifests as a deterministic failure rather than a hang.
+ try (SdkHttpClient client = AwsCrtHttpClient.builder()
+ .maxConcurrency(1)
+ .connectionAcquisitionTimeout(Duration.ofSeconds(10))
+ .build()) {
+ URI uri = URI.create("http://localhost:" + mockServer.port());
+ stubFor(put(urlPathEqualTo("/sink")).willReturn(aResponse().withStatus(200)));
+
+ IOException expected = new IOException("simulated upstream failure");
+ SdkHttpFullRequest failingRequest = SdkHttpFullRequest.builder()
+ .uri(uri)
+ .method(SdkHttpMethod.PUT)
+ .encodedPath("/sink")
+ .putHeader("Host", uri.getHost())
+ .putHeader("Content-Length", "100")
+ .build();
+ HttpExecuteRequest failingExecute =
+ HttpExecuteRequest.builder()
+ .request(failingRequest)
+ .contentStreamProvider(() -> new InputStream() {
+ @Override
+ public int read() throws IOException {
+ throw expected;
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ throw expected;
+ }
+ })
+ .build();
+
+ assertThatThrownBy(() -> client.prepareRequest(failingExecute).call())
+ .isInstanceOf(IOException.class);
+
+ // If the previous failure leaked the connection, this second call would fail to acquire
+ // (bounded by the connectionAcquisitionTimeout configured above) instead of hanging.
+ String body = "second request body";
+ byte[] bodyBytes = body.getBytes(StandardCharsets.UTF_8);
+ SdkHttpFullRequest okRequest = SdkHttpFullRequest.builder()
+ .uri(uri)
+ .method(SdkHttpMethod.PUT)
+ .encodedPath("/sink")
+ .putHeader("Host", uri.getHost())
+ .putHeader("Content-Length", Integer.toString(bodyBytes.length))
+ .build();
+ HttpExecuteRequest okExecute = HttpExecuteRequest.builder()
+ .request(okRequest)
+ .contentStreamProvider(() -> new ByteArrayInputStream(bodyBytes))
+ .build();
+
+ HttpExecuteResponse response = client.prepareRequest(okExecute).call();
+ assertThat(response.httpResponse().statusCode()).isEqualTo(200);
+ }
+ }
+
+ @Test
+ public void abortMidRequest_connectionReturnedToPool_subsequentRequestSucceeds() throws Exception {
+ try (SdkHttpClient client = AwsCrtHttpClient.builder()
+ .maxConcurrency(1)
+ .connectionAcquisitionTimeout(Duration.ofSeconds(10))
+ .build()) {
+ URI uri = URI.create("http://localhost:" + mockServer.port());
+ stubFor(any(urlPathEqualTo("/")).willReturn(aResponse().withFixedDelay(2000).withBody("hello")));
+ stubFor(put(urlPathEqualTo("/sink")).willReturn(aResponse().withStatus(200)));
+
+ SdkHttpRequest delayedRequest = createRequest(uri);
+ HttpExecuteRequest delayedExecute = HttpExecuteRequest.builder()
+ .request(delayedRequest)
+ .contentStreamProvider(() -> new ByteArrayInputStream(new byte[0]))
+ .build();
+
+ ExecutableHttpRequest abortable = client.prepareRequest(delayedExecute);
+ executorService.schedule(abortable::abort, 100, TimeUnit.MILLISECONDS);
+ assertThatThrownBy(abortable::call).isInstanceOf(IOException.class).hasMessageContaining("cancelled");
+
+ String body = "after abort";
+ byte[] bodyBytes = body.getBytes(StandardCharsets.UTF_8);
+ SdkHttpFullRequest okRequest = SdkHttpFullRequest.builder()
+ .uri(uri)
+ .method(SdkHttpMethod.PUT)
+ .encodedPath("/sink")
+ .putHeader("Host", uri.getHost())
+ .putHeader("Content-Length", Integer.toString(bodyBytes.length))
+ .build();
+ HttpExecuteRequest okExecute = HttpExecuteRequest.builder()
+ .request(okRequest)
+ .contentStreamProvider(() -> new ByteArrayInputStream(bodyBytes))
+ .build();
+ HttpExecuteResponse response = client.prepareRequest(okExecute).call();
+ assertThat(response.httpResponse().statusCode()).isEqualTo(200);
+ }
+ }
+
+ @Test
+ public void serverResetsConnection_connectionReturnedToPool_subsequentRequestSucceeds() throws Exception {
+ try (SdkHttpClient client = AwsCrtHttpClient.builder()
+ .maxConcurrency(1)
+ .connectionAcquisitionTimeout(Duration.ofSeconds(10))
+ .build()) {
+ URI uri = URI.create("http://localhost:" + mockServer.port());
+ stubFor(put(urlPathEqualTo("/sink"))
+ .willReturn(aResponse().withFault(Fault.CONNECTION_RESET_BY_PEER)));
+
+ byte[] bodyBytes = randomAlphabetic(64).getBytes(StandardCharsets.UTF_8);
+ SdkHttpFullRequest failingRequest = SdkHttpFullRequest.builder()
+ .uri(uri)
+ .method(SdkHttpMethod.PUT)
+ .encodedPath("/sink")
+ .putHeader("Host", uri.getHost())
+ .putHeader("Content-Length", Integer.toString(bodyBytes.length))
+ .build();
+ HttpExecuteRequest failingExecute = HttpExecuteRequest.builder()
+ .request(failingRequest)
+ .contentStreamProvider(() -> new ByteArrayInputStream(bodyBytes))
+ .build();
+
+ assertThatThrownBy(() -> client.prepareRequest(failingExecute).call())
+ .isInstanceOf(IOException.class);
+
+ stubFor(put(urlPathEqualTo("/sink2")).willReturn(aResponse().withStatus(200)));
+ byte[] okBytes = "ok".getBytes(StandardCharsets.UTF_8);
+ SdkHttpFullRequest okRequest = SdkHttpFullRequest.builder()
+ .uri(uri)
+ .method(SdkHttpMethod.PUT)
+ .encodedPath("/sink2")
+ .putHeader("Host", uri.getHost())
+ .putHeader("Content-Length", Integer.toString(okBytes.length))
+ .build();
+ HttpExecuteRequest okExecute = HttpExecuteRequest.builder()
+ .request(okRequest)
+ .contentStreamProvider(() -> new ByteArrayInputStream(okBytes))
+ .build();
+
+ HttpExecuteResponse response = client.prepareRequest(okExecute).call();
+ assertThat(response.httpResponse().statusCode()).isEqualTo(200);
+ }
+ }
+
+ /**
+ * Regression test for the deadlock the pull-pump fix addresses. On master, the request body's
+ * {@code InputStream.read(...)} ran on the CRT event-loop thread (via the body callback), which
+ * meant a body sourced from a {@code GET}'s {@code ResponseInputStream} on the same event loop
+ * could deadlock: the GET held the event loop while the PUT body waited for it.
+ *
+ * Pull-pump moves the read to the caller (sync) thread. This test verifies that load-bearing
+ * claim by recording the thread that performs the body read and asserting it is the caller
+ * thread - not a CRT event-loop thread. Failure of either the assertion or the test timeout
+ * (a hang) is the deadlock signal.
+ */
+ @Test
+ public void putBodyReadHappensOnCallerThread_notOnCrtEventLoop() throws Exception {
+ try (SdkHttpClient client = AwsCrtHttpClient.builder()
+ .maxConcurrency(1)
+ .connectionAcquisitionTimeout(Duration.ofSeconds(10))
+ .build()) {
+ URI uri = URI.create("http://localhost:" + mockServer.port());
+ stubFor(put(urlPathEqualTo("/sink")).willReturn(aResponse().withStatus(200)));
+
+ byte[] bodyBytes = "body-on-caller".getBytes(StandardCharsets.UTF_8);
+ AtomicReference readThreadName = new AtomicReference<>();
+ SdkHttpFullRequest request = SdkHttpFullRequest.builder()
+ .uri(uri)
+ .method(SdkHttpMethod.PUT)
+ .encodedPath("/sink")
+ .putHeader("Host", uri.getHost())
+ .putHeader("Content-Length", Integer.toString(bodyBytes.length))
+ .build();
+ HttpExecuteRequest executeRequest =
+ HttpExecuteRequest.builder()
+ .request(request)
+ .contentStreamProvider(() -> new ByteArrayInputStream(bodyBytes) {
+ @Override
+ public synchronized int read(byte[] b, int off, int len) {
+ readThreadName.compareAndSet(null, Thread.currentThread().getName());
+ return super.read(b, off, len);
+ }
+ })
+ .build();
+
+ String callerThreadName = Thread.currentThread().getName();
+ HttpExecuteResponse response = client.prepareRequest(executeRequest).call();
+ assertThat(response.httpResponse().statusCode()).isEqualTo(200);
+
+ String observed = readThreadName.get();
+ assertThat(observed)
+ .as("body read should happen on the caller thread, not the CRT event loop")
+ .isNotNull()
+ .isEqualTo(callerThreadName)
+ .doesNotContainIgnoringCase("AwsEventLoop")
+ .doesNotContainIgnoringCase("aws-event-loop");
+ }
+ }
+
+ /**
+ * Stress companion to {@link #putBodyReadHappensOnCallerThread_notOnCrtEventLoop}. Issues a
+ * delayed GET (response delayed server-side) and a PUT in parallel through the same
+ * {@code maxConcurrency(1)} client. On master, sequencing them through a single connection
+ * with the body read tied to the event-loop thread could deadlock; here both calls must
+ * complete within the test timeout.
+ */
+ @Test
+ public void getInFlight_concurrentPut_bothComplete() throws Exception {
+ try (SdkHttpClient client = AwsCrtHttpClient.builder()
+ .maxConcurrency(1)
+ .connectionAcquisitionTimeout(Duration.ofSeconds(15))
+ .build()) {
+ URI uri = URI.create("http://localhost:" + mockServer.port());
+ stubFor(any(urlPathEqualTo("/slow"))
+ .willReturn(aResponse().withFixedDelay(2_000).withBody("hello")));
+ stubFor(put(urlPathEqualTo("/sink")).willReturn(aResponse().withStatus(200)));
+
+ SdkHttpRequest getRequest = SdkHttpFullRequest.builder()
+ .uri(uri)
+ .method(SdkHttpMethod.GET)
+ .encodedPath("/slow")
+ .putHeader("Host", uri.getHost())
+ .build();
+ HttpExecuteRequest getExecute = HttpExecuteRequest.builder()
+ .request(getRequest)
+ .contentStreamProvider(() -> new ByteArrayInputStream(new byte[0]))
+ .build();
+
+ byte[] putBytes = "put-body".getBytes(StandardCharsets.UTF_8);
+ SdkHttpFullRequest putRequest = SdkHttpFullRequest.builder()
+ .uri(uri)
+ .method(SdkHttpMethod.PUT)
+ .encodedPath("/sink")
+ .putHeader("Host", uri.getHost())
+ .putHeader("Content-Length", Integer.toString(putBytes.length))
+ .build();
+ HttpExecuteRequest putExecute = HttpExecuteRequest.builder()
+ .request(putRequest)
+ .contentStreamProvider(() -> new ByteArrayInputStream(putBytes))
+ .build();
+
+ ExecutorService pool = Executors.newFixedThreadPool(2);
+ try {
+ Callable getTask = () -> client.prepareRequest(getExecute).call();
+ Callable putTask = () -> client.prepareRequest(putExecute).call();
+ Future getFuture = pool.submit(getTask);
+ Future putFuture = pool.submit(putTask);
+
+ HttpExecuteResponse getResponse = getFuture.get(15, TimeUnit.SECONDS);
+ HttpExecuteResponse putResponse = putFuture.get(15, TimeUnit.SECONDS);
+ assertThat(getResponse.httpResponse().statusCode()).isEqualTo(200);
+ assertThat(putResponse.httpResponse().statusCode()).isEqualTo(200);
+ } finally {
+ pool.shutdownNow();
+ pool.awaitTermination(5, TimeUnit.SECONDS);
+ }
+ }
+ }
+
/**
* Make a simple request and wait for it to finish.
*
diff --git a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutorTest.java b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutorTest.java
index 456000ac115..8b916ba1ee9 100644
--- a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutorTest.java
+++ b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutorTest.java
@@ -87,7 +87,7 @@ public void execute_requestConversionFails_failsFuture() {
.request(HttpExecuteRequest.builder().build())
.build();
- CompletableFuture executeFuture = requestExecutor.execute(context);
+ CompletableFuture executeFuture = requestExecutor.execute(context).responseFuture();
assertThat(executeFuture).hasFailedWithThrowableThat().isInstanceOf(NullPointerException.class);
}
@@ -102,7 +102,7 @@ public void execute_acquireStreamFails_wrapsWithIOException() {
.thenReturn(completableFuture);
completableFuture.completeExceptionally(exception);
- CompletableFuture executeFuture = requestExecutor.execute(context);
+ CompletableFuture executeFuture = requestExecutor.execute(context).responseFuture();
assertThat(executeFuture).hasFailedWithThrowableThat().hasCause(exception).isInstanceOf(IOException.class);
}
@@ -116,7 +116,7 @@ public void execute_retryableException_wrapsWithIOException(Throwable throwable)
Mockito.when(streamManager.acquireStream(Mockito.any(HttpRequest.class), Mockito.any(HttpStreamBaseResponseHandler.class)))
.thenReturn(completableFuture);
- CompletableFuture executeFuture = requestExecutor.execute(context);
+ CompletableFuture executeFuture = requestExecutor.execute(context).responseFuture();
assertThat(executeFuture).hasFailedWithThrowableThat().hasCause(throwable).isInstanceOf(IOException.class);
}
@@ -133,7 +133,7 @@ public void execute_httpException_mapsToCorrectException(Entry executeFuture = requestExecutor.execute(context);
+ CompletableFuture executeFuture = requestExecutor.execute(context).responseFuture();
assertThatThrownBy(executeFuture::join).hasCauseInstanceOf(expectedExceptionClass);
}
@@ -146,7 +146,7 @@ public void execute_nonRetryableHttpException_doesNotWrapWithIOException() {
Mockito.when(streamManager.acquireStream(Mockito.any(HttpRequest.class), Mockito.any(HttpStreamBaseResponseHandler.class)))
.thenReturn(completableFuture);
- CompletableFuture executeFuture = requestExecutor.execute(context);
+ CompletableFuture executeFuture = requestExecutor.execute(context).responseFuture();
assertThatThrownBy(executeFuture::join).hasCause(exception);
}
diff --git a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/request/BodyChunkPipeTest.java b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/request/BodyChunkPipeTest.java
new file mode 100644
index 00000000000..72f29278589
--- /dev/null
+++ b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/request/BodyChunkPipeTest.java
@@ -0,0 +1,396 @@
+/*
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ * http://aws.amazon.com/apache2.0
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package software.amazon.awssdk.http.crt.internal.request;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import org.junit.jupiter.api.RepeatedTest;
+import org.junit.jupiter.api.Test;
+
+class BodyChunkPipeTest {
+
+ @Test
+ void pollDrain_emptyOpenPipe_returnsZero() {
+ BodyChunkPipe pipe = new BodyChunkPipe(2, 8);
+ ByteBuffer dst = ByteBuffer.allocate(8);
+
+ int n = pipe.pollDrain(dst);
+
+ assertThat(n).isZero();
+ assertThat(dst.position()).isZero();
+ }
+
+ @Test
+ void pollDrain_afterEofWithEmptyQueue_returnsMinusOne() {
+ BodyChunkPipe pipe = new BodyChunkPipe(2, 8);
+ pipe.signalEof();
+
+ int n = pipe.pollDrain(ByteBuffer.allocate(8));
+
+ assertThat(n).isEqualTo(-1);
+ }
+
+ @Test
+ void publish_thenDrain_consumerSeesProducerBytes() throws Exception {
+ BodyChunkPipe pipe = new BodyChunkPipe(2, 8);
+ Chunk c = pipe.acquireForFill();
+ byte[] payload = {1, 2, 3, 4, 5};
+ System.arraycopy(payload, 0, c.data(), 0, payload.length);
+ c.pos(0);
+ c.len(payload.length);
+ pipe.publish(c);
+ pipe.signalEof();
+ ByteBuffer dst = ByteBuffer.allocate(16);
+
+ int first = pipe.pollDrain(dst);
+ int second = pipe.pollDrain(dst);
+
+ assertThat(first).isEqualTo(payload.length);
+ assertThat(second).isEqualTo(-1);
+ dst.flip();
+ byte[] out = new byte[dst.remaining()];
+ dst.get(out);
+ assertThat(out).containsExactly(payload);
+ }
+
+ @Test
+ void signalError_pollDrainThrows() {
+ BodyChunkPipe pipe = new BodyChunkPipe(2, 8);
+ pipe.signalError(new RuntimeException("boom"));
+
+ assertThatThrownBy(() -> pipe.pollDrain(ByteBuffer.allocate(8)))
+ .hasMessageContaining("Producer failed")
+ .hasRootCauseMessage("boom");
+ }
+
+ @Test
+ void abort_emptiesReadyAndChangesState() throws Exception {
+ BodyChunkPipe pipe = new BodyChunkPipe(2, 8);
+ Chunk c = pipe.acquireForFill();
+ c.len(4);
+ pipe.publish(c);
+
+ pipe.abort();
+
+ assertThatThrownBy(() -> pipe.pollDrain(ByteBuffer.allocate(8)))
+ .hasMessageContaining("aborted");
+ }
+
+ @Test
+ void pollDrain_signalErrorWithQueuedChunks_drainsThenThrows() throws Exception {
+ BodyChunkPipe pipe = new BodyChunkPipe(2, 8);
+ Chunk c = pipe.acquireForFill();
+ byte[] payload = {7, 8, 9};
+ System.arraycopy(payload, 0, c.data(), 0, payload.length);
+ c.len(payload.length);
+ pipe.publish(c);
+ pipe.signalError(new RuntimeException("boom"));
+
+ ByteBuffer dst = ByteBuffer.allocate(payload.length);
+ int drained = pipe.pollDrain(dst);
+
+ assertThat(drained).isEqualTo(payload.length);
+ dst.flip();
+ byte[] out = new byte[dst.remaining()];
+ dst.get(out);
+ assertThat(out).containsExactly(payload);
+ assertThatThrownBy(() -> pipe.pollDrain(ByteBuffer.allocate(8)))
+ .hasMessageContaining("Producer failed")
+ .hasRootCauseMessage("boom");
+ }
+
+ @Test
+ void pollDrain_signalEofWithQueuedChunks_drainsThenReturnsMinusOne() throws Exception {
+ BodyChunkPipe pipe = new BodyChunkPipe(2, 8);
+ Chunk c = pipe.acquireForFill();
+ byte[] payload = {10, 20, 30};
+ System.arraycopy(payload, 0, c.data(), 0, payload.length);
+ c.len(payload.length);
+ pipe.publish(c);
+ pipe.signalEof();
+
+ ByteBuffer dst = ByteBuffer.allocate(payload.length);
+ int drained = pipe.pollDrain(dst);
+ int afterDrain = pipe.pollDrain(ByteBuffer.allocate(8));
+
+ assertThat(drained).isEqualTo(payload.length);
+ dst.flip();
+ byte[] out = new byte[dst.remaining()];
+ dst.get(out);
+ assertThat(out).containsExactly(payload);
+ assertThat(afterDrain).isEqualTo(-1);
+ }
+
+ @Test
+ void abort_afterSignalEof_leavesStateAsEof() {
+ BodyChunkPipe pipe = new BodyChunkPipe(2, 8);
+ pipe.signalEof();
+
+ pipe.abort();
+
+ assertThat(pipe.state()).isEqualTo(BodyChunkPipe.State.EOF);
+ assertThat(pipe.pollDrain(ByteBuffer.allocate(8))).isEqualTo(-1);
+ }
+
+ @Test
+ void abort_afterSignalEofWithQueuedChunks_doesNotClearReady() throws Exception {
+ BodyChunkPipe pipe = new BodyChunkPipe(2, 8);
+ Chunk c = pipe.acquireForFill();
+ byte[] payload = {1, 2, 3};
+ System.arraycopy(payload, 0, c.data(), 0, payload.length);
+ c.len(payload.length);
+ pipe.publish(c);
+ pipe.signalEof();
+
+ pipe.abort();
+
+ assertThat(pipe.state()).isEqualTo(BodyChunkPipe.State.EOF);
+ ByteBuffer dst = ByteBuffer.allocate(payload.length);
+ int drained = pipe.pollDrain(dst);
+ assertThat(drained).isEqualTo(payload.length);
+ assertThat(pipe.pollDrain(ByteBuffer.allocate(8))).isEqualTo(-1);
+ }
+
+ @Test
+ void recycle_intoEofPipe_doesNotThrowAndDoesNotCorruptPool() throws Exception {
+ BodyChunkPipe pipe = new BodyChunkPipe(2, 8);
+ Chunk c = pipe.acquireForFill();
+ c.len(4);
+ pipe.publish(c);
+ pipe.signalEof();
+
+ ByteBuffer dst = ByteBuffer.allocate(8);
+ int drained = pipe.pollDrain(dst);
+ int afterDrain = pipe.pollDrain(ByteBuffer.allocate(8));
+
+ assertThat(drained).isEqualTo(4);
+ assertThat(afterDrain).isEqualTo(-1);
+ assertThat(pipe.allocatedForTest()).isEqualTo(1);
+ }
+
+ @Test
+ void recycle_intoAbortedPipe_doesNotThrow() throws Exception {
+ BodyChunkPipe pipe = new BodyChunkPipe(2, 8);
+ Chunk c = pipe.acquireForFill();
+ pipe.abort();
+
+ c.len(0);
+ pipe.publish(c);
+
+ assertThat(pipe.state()).isEqualTo(BodyChunkPipe.State.ABORTED);
+ }
+
+ @Test
+ void recycle_intoErrorPipe_doesNotThrow() throws Exception {
+ BodyChunkPipe pipe = new BodyChunkPipe(2, 8);
+ Chunk c = pipe.acquireForFill();
+ pipe.signalError(new RuntimeException("boom"));
+
+ c.len(0);
+ pipe.publish(c);
+
+ assertThat(pipe.state()).isEqualTo(BodyChunkPipe.State.ERROR);
+ }
+
+ @Test
+ void constructor_doesNotAllocateChunks() {
+ BodyChunkPipe pipe = new BodyChunkPipe(4, 16);
+
+ assertThat(pipe.allocatedForTest()).isZero();
+ }
+
+ @Test
+ void acquireForFill_firstCall_allocatesOneChunk() throws Exception {
+ BodyChunkPipe pipe = new BodyChunkPipe(4, 16);
+
+ Chunk c = pipe.acquireForFill();
+
+ assertThat(c).isNotNull();
+ assertThat(c.data()).hasSize(16);
+ assertThat(pipe.allocatedForTest()).isEqualTo(1);
+ }
+
+ @Test
+ void acquireForFill_uniqueChunksUpToDepth_thenStopsAllocating() throws Exception {
+ BodyChunkPipe pipe = new BodyChunkPipe(3, 8);
+ Chunk c1 = pipe.acquireForFill();
+ Chunk c2 = pipe.acquireForFill();
+ Chunk c3 = pipe.acquireForFill();
+
+ c1.len(1);
+ pipe.publish(c1);
+ pipe.pollDrain(ByteBuffer.allocate(8));
+ Chunk reused = pipe.acquireForFill();
+
+ assertThat(c1).isNotSameAs(c2).isNotSameAs(c3);
+ assertThat(c2).isNotSameAs(c3);
+ assertThat(pipe.allocatedForTest()).isEqualTo(3);
+ assertThat(reused).isSameAs(c1);
+ }
+
+ @Test
+ void acquireForFill_recycledChunkReused_noNewAllocation() throws Exception {
+ BodyChunkPipe pipe = new BodyChunkPipe(2, 8);
+ Chunk c = pipe.acquireForFill();
+ c.len(3);
+ pipe.publish(c);
+ pipe.pollDrain(ByteBuffer.allocate(8));
+
+ Chunk reused = pipe.acquireForFill();
+
+ assertThat(reused).isSameAs(c);
+ assertThat(pipe.allocatedForTest()).isEqualTo(1);
+ }
+
+ @Test
+ void acquireForFill_afterAbort_returnsNull() throws Exception {
+ BodyChunkPipe pipe = new BodyChunkPipe(2, 8);
+ pipe.abort();
+
+ Chunk c = pipe.acquireForFill();
+
+ assertThat(c).isNull();
+ }
+
+ @Test
+ void acquireForFill_afterSignalError_returnsNull() throws Exception {
+ BodyChunkPipe pipe = new BodyChunkPipe(2, 8);
+ pipe.signalError(new RuntimeException("boom"));
+
+ Chunk c = pipe.acquireForFill();
+
+ assertThat(c).isNull();
+ }
+
+ @Test
+ void constructor_invalidDepth_throws() {
+ assertThatThrownBy(() -> new BodyChunkPipe(0, 8))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("depth");
+ }
+
+ @Test
+ void constructor_invalidChunkSize_throws() {
+ assertThatThrownBy(() -> new BodyChunkPipe(2, 0))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("chunkSize");
+ }
+
+ /**
+ * Multi-threaded ordering test: producer races to call {@link BodyChunkPipe#signalError(Throwable)}
+ * while a consumer is concurrently spinning on {@link BodyChunkPipe#pollDrain(java.nio.ByteBuffer)}.
+ * The contract is that whenever the consumer observes the ERROR state, the cause must already
+ * be visible (no {@code RuntimeException("Producer failed", null)}). RepeatedTest amplifies the
+ * race window. With the cause published before the CAS, this should pass on every iteration.
+ */
+ @RepeatedTest(50)
+ void signalError_concurrentPollDrain_consumerNeverSeesNullCause() throws Exception {
+ BodyChunkPipe pipe = new BodyChunkPipe(2, 16);
+ IllegalStateException expected = new IllegalStateException("boom");
+ CountDownLatch start = new CountDownLatch(1);
+ AtomicReference consumerError = new AtomicReference<>();
+ AtomicReference nullCauseSighting = new AtomicReference<>();
+
+ Thread consumer = new Thread(() -> {
+ try {
+ start.await();
+ ByteBuffer dst = ByteBuffer.allocate(16);
+ long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
+ while (System.nanoTime() < deadline) {
+ try {
+ int n = pipe.pollDrain(dst);
+ if (n < 0) {
+ return;
+ }
+ dst.clear();
+ } catch (RuntimeException re) {
+ if (re.getCause() == null) {
+ nullCauseSighting.set(re);
+ }
+ return;
+ }
+ }
+ } catch (Throwable t) {
+ consumerError.set(t);
+ }
+ }, "pipe-consumer");
+
+ Thread producer = new Thread(() -> {
+ try {
+ start.await();
+ pipe.signalError(expected);
+ } catch (Throwable t) {
+ consumerError.set(t);
+ }
+ }, "pipe-producer");
+
+ consumer.start();
+ producer.start();
+ start.countDown();
+ producer.join(5_000);
+ consumer.join(5_000);
+
+ assertThat(consumer.isAlive()).isFalse();
+ assertThat(producer.isAlive()).isFalse();
+ assertThat(consumerError.get()).isNull();
+ assertThat(nullCauseSighting.get()).isNull();
+ }
+
+ /**
+ * Multi-threaded test for the recycle/notify path: producer is forced to block on
+ * {@link BodyChunkPipe#acquireForFill()} because all chunks are in flight, then the consumer
+ * drains a chunk which {@code recycle()}s and notifies the producer to wake. This exercises the
+ * full {@code freeLock.notifyAll()} hand-off rather than relying on the defensive 50ms wakeup.
+ */
+ @Test
+ void acquireForFill_blocksUntilConsumerRecycles_thenWakesAndCompletes() throws Exception {
+ BodyChunkPipe pipe = new BodyChunkPipe(1, 8);
+ Chunk first = pipe.acquireForFill();
+ first.len(4);
+ pipe.publish(first);
+
+ CountDownLatch producerEntered = new CountDownLatch(1);
+ AtomicReference reused = new AtomicReference<>();
+ AtomicReference producerError = new AtomicReference<>();
+ Thread producer = new Thread(() -> {
+ try {
+ producerEntered.countDown();
+ Chunk c = pipe.acquireForFill();
+ reused.set(c);
+ } catch (Throwable t) {
+ producerError.set(t);
+ }
+ }, "pipe-producer");
+
+ producer.start();
+ producerEntered.await();
+ // Drain so the chunk is recycled and the producer is woken via notifyAll.
+ long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
+ while (reused.get() == null && System.nanoTime() < deadline) {
+ pipe.pollDrain(ByteBuffer.allocate(8));
+ producer.join(50);
+ }
+
+ assertThat(producerError.get()).isNull();
+ assertThat(reused.get()).isSameAs(first);
+ assertThat(pipe.allocatedForTest()).isEqualTo(1);
+ }
+}
diff --git a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/request/PipeBackedRequestBodyStreamTest.java b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/request/PipeBackedRequestBodyStreamTest.java
new file mode 100644
index 00000000000..1add84bb4cd
--- /dev/null
+++ b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/request/PipeBackedRequestBodyStreamTest.java
@@ -0,0 +1,141 @@
+/*
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ * http://aws.amazon.com/apache2.0
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package software.amazon.awssdk.http.crt.internal.request;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.nio.ByteBuffer;
+import org.junit.jupiter.api.Test;
+
+class PipeBackedRequestBodyStreamTest {
+
+ @Test
+ void sendRequestBody_emptyOpenPipe_returnsFalseAndCopiesNothing() {
+ BodyChunkPipe pipe = new BodyChunkPipe(2, 8);
+ PipeBackedRequestBodyStream stream = new PipeBackedRequestBodyStream(pipe);
+ ByteBuffer dst = ByteBuffer.allocate(8);
+
+ boolean done = stream.sendRequestBody(dst);
+
+ assertThat(done).isFalse();
+ assertThat(dst.position()).isZero();
+ }
+
+ @Test
+ void sendRequestBody_afterEofAndDrained_returnsTrue() throws Exception {
+ BodyChunkPipe pipe = new BodyChunkPipe(2, 8);
+ Chunk c = pipe.acquireForFill();
+ byte[] payload = {1, 2, 3};
+ System.arraycopy(payload, 0, c.data(), 0, payload.length);
+ c.len(payload.length);
+ pipe.publish(c);
+ pipe.signalEof();
+ PipeBackedRequestBodyStream stream = new PipeBackedRequestBodyStream(pipe);
+
+ ByteBuffer first = ByteBuffer.allocate(8);
+ boolean firstDone = stream.sendRequestBody(first);
+ ByteBuffer second = ByteBuffer.allocate(8);
+ boolean secondDone = stream.sendRequestBody(second);
+
+ assertThat(firstDone).isFalse();
+ assertThat(first.position()).isEqualTo(payload.length);
+ assertThat(secondDone).isTrue();
+ assertThat(second.position()).isZero();
+ }
+
+ @Test
+ void sendRequestBody_pipeInError_throwsRuntimeExceptionWithCause() {
+ BodyChunkPipe pipe = new BodyChunkPipe(2, 8);
+ IllegalStateException cause = new IllegalStateException("upstream broke");
+ pipe.signalError(cause);
+ PipeBackedRequestBodyStream stream = new PipeBackedRequestBodyStream(pipe);
+
+ assertThatThrownBy(() -> stream.sendRequestBody(ByteBuffer.allocate(8)))
+ .isInstanceOf(RuntimeException.class)
+ .hasMessageContaining("Producer failed")
+ .hasRootCauseMessage("upstream broke");
+ }
+
+ @Test
+ void sendRequestBody_pipeAborted_throwsRuntimeException() {
+ BodyChunkPipe pipe = new BodyChunkPipe(2, 8);
+ pipe.abort();
+ PipeBackedRequestBodyStream stream = new PipeBackedRequestBodyStream(pipe);
+
+ assertThatThrownBy(() -> stream.sendRequestBody(ByteBuffer.allocate(8)))
+ .isInstanceOf(RuntimeException.class)
+ .hasMessageContaining("aborted");
+ }
+
+ @Test
+ void resetPosition_returnsFalse() {
+ BodyChunkPipe pipe = new BodyChunkPipe(2, 8);
+ PipeBackedRequestBodyStream stream = new PipeBackedRequestBodyStream(pipe);
+
+ assertThat(stream.resetPosition()).isFalse();
+ }
+
+ /**
+ * When CRT's destination buffer is smaller than the chunk size, draining a single chunk
+ * requires multiple {@code sendRequestBody} calls. This exercises {@link BodyChunkPipe#pollDrain}'s
+ * {@code pendingDrain} state being carried across consumer invocations.
+ */
+ @Test
+ void sendRequestBody_destinationSmallerThanChunk_drainsAcrossMultipleCalls() throws Exception {
+ BodyChunkPipe pipe = new BodyChunkPipe(2, 16);
+ Chunk c = pipe.acquireForFill();
+ byte[] payload = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
+ System.arraycopy(payload, 0, c.data(), 0, payload.length);
+ c.len(payload.length);
+ pipe.publish(c);
+ pipe.signalEof();
+ PipeBackedRequestBodyStream stream = new PipeBackedRequestBodyStream(pipe);
+
+ ByteBuffer first = ByteBuffer.allocate(3);
+ ByteBuffer second = ByteBuffer.allocate(3);
+ ByteBuffer third = ByteBuffer.allocate(3);
+ ByteBuffer fourth = ByteBuffer.allocate(3);
+ ByteBuffer fifth = ByteBuffer.allocate(3);
+ boolean firstDone = stream.sendRequestBody(first);
+ boolean secondDone = stream.sendRequestBody(second);
+ boolean thirdDone = stream.sendRequestBody(third);
+ boolean fourthDone = stream.sendRequestBody(fourth);
+ boolean fifthDone = stream.sendRequestBody(fifth);
+
+ assertThat(firstDone).isFalse();
+ assertThat(secondDone).isFalse();
+ assertThat(thirdDone).isFalse();
+ assertThat(fourthDone).isFalse();
+ assertThat(fifthDone).isTrue();
+ assertThat(first.position()).isEqualTo(3);
+ assertThat(second.position()).isEqualTo(3);
+ assertThat(third.position()).isEqualTo(3);
+ assertThat(fourth.position()).isEqualTo(1);
+ assertThat(fifth.position()).isZero();
+
+ byte[] reassembled = new byte[payload.length];
+ first.flip();
+ first.get(reassembled, 0, 3);
+ second.flip();
+ second.get(reassembled, 3, 3);
+ third.flip();
+ third.get(reassembled, 6, 3);
+ fourth.flip();
+ fourth.get(reassembled, 9, 1);
+ assertThat(reassembled).containsExactly(payload);
+ }
+}
diff --git a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/request/SyncRequestBodyPumpTest.java b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/request/SyncRequestBodyPumpTest.java
new file mode 100644
index 00000000000..21d0d4fa1ec
--- /dev/null
+++ b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/request/SyncRequestBodyPumpTest.java
@@ -0,0 +1,239 @@
+/*
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ * http://aws.amazon.com/apache2.0
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package software.amazon.awssdk.http.crt.internal.request;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.LockSupport;
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.http.ContentStreamProvider;
+import software.amazon.awssdk.http.SdkHttpFullResponse;
+
+class SyncRequestBodyPumpTest {
+
+ @Test
+ void pump_happyPath_consumerSeesAllProducerBytes() throws Exception {
+ byte[] payload = new byte[200];
+ for (int i = 0; i < payload.length; i++) {
+ payload[i] = (byte) (i & 0xFF);
+ }
+ BodyChunkPipe pipe = new BodyChunkPipe(2, 32);
+ SyncRequestBodyPump pump = new SyncRequestBodyPump(ContentStreamProvider.fromByteArray(payload), pipe);
+ AtomicReference producerError = new AtomicReference<>();
+ Thread producer = new Thread(() -> {
+ try {
+ pump.pump();
+ } catch (Throwable t) {
+ producerError.set(t);
+ }
+ }, "pump-producer");
+
+ producer.start();
+ byte[] consumed = drainAll(pipe, payload.length);
+ producer.join(5_000);
+
+ assertThat(producerError.get()).isNull();
+ assertThat(producer.isAlive()).isFalse();
+ assertThat(consumed).containsExactly(payload);
+ assertThat(pipe.state()).isEqualTo(BodyChunkPipe.State.EOF);
+ }
+
+ @Test
+ void pump_emptyStream_signalsEofWithoutPublish() throws Exception {
+ BodyChunkPipe pipe = new BodyChunkPipe(2, 16);
+ SyncRequestBodyPump pump = new SyncRequestBodyPump(ContentStreamProvider.fromByteArray(new byte[0]), pipe);
+
+ pump.pump();
+
+ assertThat(pipe.state()).isEqualTo(BodyChunkPipe.State.EOF);
+ assertThat(pipe.pollDrain(ByteBuffer.allocate(8))).isEqualTo(-1);
+ }
+
+ @Test
+ void pump_inputStreamThrowsIoException_pumpSignalsErrorAndRethrows() {
+ IOException ioe = new IOException("disk gone");
+ BodyChunkPipe pipe = new BodyChunkPipe(2, 16);
+ ContentStreamProvider provider = () -> new InputStream() {
+ @Override
+ public int read() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ throw ioe;
+ }
+ };
+ SyncRequestBodyPump pump = new SyncRequestBodyPump(provider, pipe);
+
+ assertThatThrownBy(pump::pump).isSameAs(ioe);
+ assertThat(pipe.state()).isEqualTo(BodyChunkPipe.State.ERROR);
+ assertThatThrownBy(() -> pipe.pollDrain(ByteBuffer.allocate(8)))
+ .hasMessageContaining("Producer failed")
+ .hasRootCauseMessage("disk gone");
+ }
+
+ @Test
+ void pump_abortedWhilePumping_returnsWithoutSignalingEof() throws Exception {
+ // pipe depth 1 + payload larger than chunk forces producer to block on second acquireForFill,
+ // giving the test thread a deterministic point to call abort().
+ BodyChunkPipe pipe = new BodyChunkPipe(1, 8);
+ byte[] payload = new byte[64];
+ SyncRequestBodyPump pump = new SyncRequestBodyPump(ContentStreamProvider.fromByteArray(payload), pipe);
+ AtomicReference producerError = new AtomicReference<>();
+ Thread producer = new Thread(() -> {
+ try {
+ pump.pump();
+ } catch (Throwable t) {
+ producerError.set(t);
+ }
+ }, "pump-producer");
+
+ producer.start();
+ waitUntilStateIsOpenWithChunkInFlight(pipe);
+ pump.abort();
+ producer.join(5_000);
+
+ assertThat(producer.isAlive()).isFalse();
+ assertThat(producerError.get()).isNull();
+ assertThat(pipe.state()).isEqualTo(BodyChunkPipe.State.ABORTED);
+ }
+
+ /**
+ * Regression test for the producer-livelock-on-CRT-failure path.
+ *
+ * If CRT signals request failure (network error, idle/health timeout, etc.) while the
+ * producer is parked in {@link BodyChunkPipe#acquireForFill()}, nothing in the pipe's normal
+ * contract wakes it without a recycle/abort. The fix in {@code AwsCrtHttpClient.CrtHttpRequest.call()}
+ * registers a {@code responseFuture.whenComplete(...)} hook that calls {@code pump.abort()}
+ * when the response future completes exceptionally. This test reproduces that wiring
+ * at the unit level: a pump runs against a pipe with no consumer, the producer parks once the
+ * pipe is full, and we then complete a separate response future exceptionally with the same
+ * hook to verify the producer unblocks and {@code pump()} returns.
+ *
+ *
Without the hook (or equivalent abort path), {@code producer.join(5_000)} would time out
+ * and the test would fail.
+ */
+ @Test
+ void pump_responseFutureFailsExceptionally_whileProducerParked_unblocksProducerViaAbortHook() throws Exception {
+ BodyChunkPipe pipe = new BodyChunkPipe(2, 8);
+ // Payload larger than depth*chunkSize forces the producer to park on acquireForFill once
+ // both chunks are sitting in the ready queue with no consumer draining.
+ byte[] payload = new byte[128];
+ SyncRequestBodyPump pump = new SyncRequestBodyPump(ContentStreamProvider.fromByteArray(payload), pipe);
+ CompletableFuture responseFuture = new CompletableFuture<>();
+ responseFuture.whenComplete((r, t) -> {
+ if (t != null) {
+ pump.abort();
+ }
+ });
+
+ AtomicReference producerError = new AtomicReference<>();
+ Thread producer = new Thread(() -> {
+ try {
+ pump.pump();
+ } catch (Throwable t) {
+ producerError.set(t);
+ }
+ }, "pump-producer");
+
+ producer.start();
+ waitUntilProducerIsParked(pipe);
+ responseFuture.completeExceptionally(new IOException("simulated CRT failure"));
+ producer.join(5_000);
+
+ assertThat(producer.isAlive()).isFalse();
+ assertThat(producerError.get()).isNull();
+ assertThat(pipe.state()).isEqualTo(BodyChunkPipe.State.ABORTED);
+ }
+
+ @Test
+ void abort_propagatesToPipe() {
+ BodyChunkPipe pipe = new BodyChunkPipe(2, 8);
+ SyncRequestBodyPump pump = new SyncRequestBodyPump(
+ ContentStreamProvider.fromByteArray(new byte[0]), pipe);
+
+ pump.abort();
+
+ assertThat(pipe.state()).isEqualTo(BodyChunkPipe.State.ABORTED);
+ }
+
+ private static byte[] drainAll(BodyChunkPipe pipe, int expected) {
+ byte[] out = new byte[expected];
+ int written = 0;
+ long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
+ while (written < expected && System.nanoTime() < deadline) {
+ ByteBuffer scratch = ByteBuffer.allocate(Math.min(64, expected - written));
+ int n = pipe.pollDrain(scratch);
+ if (n < 0) {
+ break;
+ }
+ if (n == 0) {
+ LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(1));
+ continue;
+ }
+ scratch.flip();
+ scratch.get(out, written, n);
+ written += n;
+ }
+ if (written < expected) {
+ throw new AssertionError("Drained only " + written + " of " + expected + " bytes");
+ }
+ return out;
+ }
+
+ private static void waitUntilStateIsOpenWithChunkInFlight(BodyChunkPipe pipe) throws InterruptedException {
+ long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
+ while (System.nanoTime() < deadline) {
+ if (pipe.allocatedForTest() >= 1) {
+ return;
+ }
+ Thread.sleep(1);
+ }
+ throw new AssertionError("Producer did not allocate a chunk within timeout");
+ }
+
+ /**
+ * Wait for the producer to park on {@code acquireForFill}. Detected by the pipe reaching its
+ * configured depth in allocations and then staying there for a couple of consecutive observations
+ * (the producer can't make further progress without a recycle).
+ */
+ private static void waitUntilProducerIsParked(BodyChunkPipe pipe) throws InterruptedException {
+ long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
+ int stableObservations = 0;
+ int lastAllocated = -1;
+ while (System.nanoTime() < deadline) {
+ int allocated = pipe.allocatedForTest();
+ if (allocated == lastAllocated && allocated > 0) {
+ if (++stableObservations >= 3) {
+ return;
+ }
+ } else {
+ stableObservations = 0;
+ lastAllocated = allocated;
+ }
+ Thread.sleep(20);
+ }
+ throw new AssertionError("Producer did not park within timeout");
+ }
+}
diff --git a/services/s3/src/it/java/software/amazon/awssdk/services/s3/crthttpclient/S3WithCrtHttpClientsIntegrationTests.java b/services/s3/src/it/java/software/amazon/awssdk/services/s3/crthttpclient/S3WithCrtHttpClientsIntegrationTests.java
index f1eb1c6e4f2..943635dd41f 100644
--- a/services/s3/src/it/java/software/amazon/awssdk/services/s3/crthttpclient/S3WithCrtHttpClientsIntegrationTests.java
+++ b/services/s3/src/it/java/software/amazon/awssdk/services/s3/crthttpclient/S3WithCrtHttpClientsIntegrationTests.java
@@ -16,12 +16,13 @@
package software.amazon.awssdk.services.s3.crthttpclient;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively;
import static software.amazon.awssdk.testutils.service.S3BucketUtils.temporaryBucketName;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
-import org.assertj.core.api.Assertions;
+import java.time.Duration;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
@@ -32,6 +33,7 @@
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3IntegrationTestBase;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
+import software.amazon.awssdk.services.s3.model.PutObjectResponse;
import software.amazon.awssdk.services.s3.utils.ChecksumUtils;
import software.amazon.awssdk.testutils.RandomTempFile;
import software.amazon.awssdk.utils.Md5Utils;
@@ -80,4 +82,22 @@ void getObject_toFile_objectSentCorrectly() throws Exception {
assertThat(Md5Utils.md5AsBase64(destination.toFile())).isEqualTo(Md5Utils.md5AsBase64(testFile));
}
+
+ @Test
+ void getObject_responseStreamPipedIntoPutObject_completesWithoutDeadlock() throws Exception {
+ String destinationKey = "piped-" + TEST_KEY;
+ try (ResponseInputStream sourceStream =
+ s3WithCrtHttpClient.getObject(r -> r.bucket(TEST_BUCKET).key(TEST_KEY),
+ ResponseTransformer.toInputStream())) {
+ long contentLength = sourceStream.response().contentLength();
+
+ PutObjectResponse putResponse = assertTimeoutPreemptively(
+ Duration.ofSeconds(120),
+ () -> s3WithCrtHttpClient.putObject(
+ r -> r.bucket(TEST_BUCKET).key(destinationKey).contentLength(contentLength),
+ RequestBody.fromInputStream(sourceStream, contentLength)));
+
+ assertThat(putResponse.eTag()).isNotBlank();
+ }
+ }
}
diff --git a/test/http-client-tests/src/main/java/software/amazon/awssdk/http/LongRunningRequestTestSupport.java b/test/http-client-tests/src/main/java/software/amazon/awssdk/http/LongRunningRequestTestSupport.java
index 723869ad852..21d04a5c944 100644
--- a/test/http-client-tests/src/main/java/software/amazon/awssdk/http/LongRunningRequestTestSupport.java
+++ b/test/http-client-tests/src/main/java/software/amazon/awssdk/http/LongRunningRequestTestSupport.java
@@ -20,11 +20,21 @@
import static com.github.tomakehurst.wiremock.client.WireMock.urlPathEqualTo;
import com.github.tomakehurst.wiremock.junit5.WireMockExtension;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import software.amazon.awssdk.utils.Logger;
/**
* Shared helpers for the long-running request test suites.
@@ -36,6 +46,8 @@ public final class LongRunningRequestTestSupport {
public static final Duration TIME_BOUND_SAFETY_MARGIN = Duration.ofSeconds(10);
public static final Duration HANG_DELAY = Duration.ofMinutes(1);
+ private static final Logger log = Logger.loggerFor(LongRunningRequestTestSupport.class);
+
private LongRunningRequestTestSupport() {
}
@@ -72,22 +84,117 @@ public static void stubHanging(WireMockExtension mockServer) {
.withFixedDelay((int) HANG_DELAY.toMillis())));
}
+ /**
+ * Async-executes a POST against {@code mockServer} on a worker thread. A per-call random
+ * {@code testReqId} is logged BEFORE the request is dispatched and propagated to the SDK via the
+ * {@code x-aws-sdk-test-id} header so the SDK can prefix its lifecycle logs with the same id.
+ */
+ public static TestRequestExecution executeAsync(SdkHttpClient client, WireMockExtension mockServer) {
+ String testReqId = "test-" + String.format("%08x", ThreadLocalRandom.current().nextInt());
+ log.info(() -> "TEST REQUEST ID: " + testReqId + " (dispatching)");
+ CompletableFuture future = CompletableFuture.supplyAsync(() -> {
+ executeRequest(client, mockServer, testReqId);
+ return null;
+ });
+ return new TestRequestExecution(testReqId, future);
+ }
+
+ private static void executeRequest(SdkHttpClient client, WireMockExtension mockServer, String testReqId) {
+ URI uri = URI.create("http://localhost:" + mockServer.getPort());
+ SdkHttpFullRequest request = SdkHttpFullRequest.builder()
+ .uri(uri)
+ .method(SdkHttpMethod.POST)
+ .putHeader("Host", uri.getHost())
+ .putHeader("Content-Length", "4")
+ .putHeader("x-aws-sdk-test-id", testReqId)
+ .contentStreamProvider(() -> new ByteArrayInputStream(
+ "Body".getBytes(StandardCharsets.UTF_8)))
+ .build();
+ try {
+ HttpExecuteResponse response = client.prepareRequest(HttpExecuteRequest.builder()
+ .request(request)
+ .contentStreamProvider(
+ request.contentStreamProvider()
+ .orElse(null))
+ .build())
+ .call();
+ response.responseBody().ifPresent(body -> {
+ try {
+ while (body.read() != -1) {
+ // drain body so mid-body timeouts surface
+ }
+ body.close();
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ });
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ public static void assertFailsWithinTimeBound(TestRequestExecution execution, Duration expectedTimeout) {
+ assertFailsWithinTimeBound(execution.future(), execution.testReqId(), expectedTimeout);
+ }
+
public static void assertFailsWithinTimeBound(CompletableFuture> future, Duration expectedTimeout) {
+ assertFailsWithinTimeBound(future, "(unknown)", expectedTimeout);
+ }
+
+ private static void assertFailsWithinTimeBound(CompletableFuture> future, String testReqId, Duration expectedTimeout) {
Duration maxWait = expectedTimeout.plus(TIME_BOUND_SAFETY_MARGIN);
try {
future.get(maxWait.toMillis(), TimeUnit.MILLISECONDS);
- throw new AssertionError("Expected request to throw an exception but it completed successfully");
+ throw new AssertionError("Expected request " + testReqId
+ + " to throw an exception but it completed successfully");
} catch (TimeoutException e) {
+ // Bookend the thread dump with the test reqId so surefire output can be grep'd by request.
+ log.error(() -> "TEST REQUEST ID: " + testReqId + " (timed out, dumping threads)");
+ log.error(() -> dumpAllThreads());
future.cancel(true);
throw new AssertionError(
- "Expected request to fail within " + maxWait + " but it was still running - client appears to hang",
+ "Expected request " + testReqId + " to fail within " + maxWait
+ + " but it was still running - client appears to hang",
e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
- throw new AssertionError("Unexpected interruption while waiting for request to fail", e);
+ throw new AssertionError("Unexpected interruption while waiting for request " + testReqId + " to fail", e);
} catch (ExecutionException e) {
// expected
}
}
+
+ /**
+ * Bundles a worker future with the per-call test reqId so the assertion helper can reference it in
+ * failure messages.
+ */
+ public static final class TestRequestExecution {
+ private final String testReqId;
+ private final CompletableFuture future;
+
+ TestRequestExecution(String testReqId, CompletableFuture future) {
+ this.testReqId = testReqId;
+ this.future = future;
+ }
+
+ public String testReqId() {
+ return testReqId;
+ }
+
+ public CompletableFuture future() {
+ return future;
+ }
+ }
+
+ static String dumpAllThreads() {
+ ThreadMXBean tmx = ManagementFactory.getThreadMXBean();
+ ThreadInfo[] infos = tmx.dumpAllThreads(true, true);
+ StringBuilder sb = new StringBuilder("=== THREAD DUMP ===\n");
+ for (ThreadInfo info : infos) {
+ sb.append(info.toString()).append('\n');
+ }
+ sb.append("=== END THREAD DUMP ===\n");
+ return sb.toString();
+ }
}
diff --git a/test/http-client-tests/src/main/java/software/amazon/awssdk/http/SdkHttpClientLongRunningRequestTestSuite.java b/test/http-client-tests/src/main/java/software/amazon/awssdk/http/SdkHttpClientLongRunningRequestTestSuite.java
index 0e52eb19eb4..4403db37a42 100644
--- a/test/http-client-tests/src/main/java/software/amazon/awssdk/http/SdkHttpClientLongRunningRequestTestSuite.java
+++ b/test/http-client-tests/src/main/java/software/amazon/awssdk/http/SdkHttpClientLongRunningRequestTestSuite.java
@@ -19,19 +19,15 @@
import static software.amazon.awssdk.http.LongRunningRequestTestSupport.CONFIGURED_TIMEOUT;
import static software.amazon.awssdk.http.LongRunningRequestTestSupport.HANG_DELAY;
import static software.amazon.awssdk.http.LongRunningRequestTestSupport.assertFailsWithinTimeBound;
+import static software.amazon.awssdk.http.LongRunningRequestTestSupport.executeAsync;
import static software.amazon.awssdk.http.LongRunningRequestTestSupport.stubHanging;
import static software.amazon.awssdk.http.LongRunningRequestTestSupport.stubLongPolling;
import static software.amazon.awssdk.http.LongRunningRequestTestSupport.stubStreamingWithPauses;
import com.github.tomakehurst.wiremock.junit5.WireMockExtension;
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.UncheckedIOException;
-import java.net.URI;
-import java.nio.charset.StandardCharsets;
-import java.util.concurrent.CompletableFuture;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
+import software.amazon.awssdk.http.LongRunningRequestTestSupport.TestRequestExecution;
import software.amazon.awssdk.utils.AttributeMap;
/**
@@ -56,7 +52,7 @@ public void executeWhenReadTimeoutAndServerDelaysResponseFailsWithinTimeoutBound
CONFIGURED_TIMEOUT)
.build());
try {
- assertFailsWithinTimeBound(executeAsync(client), CONFIGURED_TIMEOUT);
+ assertFailsWithinTimeBound(executeAsync(client, mockServer), CONFIGURED_TIMEOUT);
} finally {
client.close();
}
@@ -71,7 +67,7 @@ public void executeWhenReadTimeoutAndStreamingResponsePausesFailsWithinTimeoutBo
CONFIGURED_TIMEOUT)
.build());
try {
- assertFailsWithinTimeBound(executeAsync(client), CONFIGURED_TIMEOUT);
+ assertFailsWithinTimeBound(executeAsync(client, mockServer), CONFIGURED_TIMEOUT);
} finally {
client.close();
}
@@ -89,54 +85,14 @@ public void executeWhenConnectionAcquireTimeoutAndPoolExhaustedFailsWithinTimeou
CONFIGURED_TIMEOUT)
.build());
try {
- CompletableFuture> firstRequest = executeAsync(client);
+ TestRequestExecution firstRequest = executeAsync(client, mockServer);
Thread.sleep(500);
- assertFailsWithinTimeBound(executeAsync(client), CONFIGURED_TIMEOUT);
+ assertFailsWithinTimeBound(executeAsync(client, mockServer), CONFIGURED_TIMEOUT);
- firstRequest.cancel(true);
+ firstRequest.future().cancel(true);
} finally {
client.close();
}
}
-
- private CompletableFuture executeAsync(SdkHttpClient client) {
- return CompletableFuture.supplyAsync(() -> {
- executeRequest(client);
- return null;
- });
- }
-
- private void executeRequest(SdkHttpClient client) {
- URI uri = URI.create("http://localhost:" + mockServer.getPort());
- SdkHttpFullRequest request = SdkHttpFullRequest.builder()
- .uri(uri)
- .method(SdkHttpMethod.POST)
- .putHeader("Host", uri.getHost())
- .putHeader("Content-Length", "4")
- .contentStreamProvider(() -> new ByteArrayInputStream(
- "Body".getBytes(StandardCharsets.UTF_8)))
- .build();
- try {
- HttpExecuteResponse response = client.prepareRequest(HttpExecuteRequest.builder()
- .request(request)
- .contentStreamProvider(
- request.contentStreamProvider()
- .orElse(null))
- .build())
- .call();
- response.responseBody().ifPresent(body -> {
- try {
- while (body.read() != -1) {
- // drain body so mid-body timeouts surface
- }
- body.close();
- } catch (IOException e) {
- throw new UncheckedIOException(e);
- }
- });
- } catch (IOException e) {
- throw new UncheckedIOException(e);
- }
- }
}