streamFuture() {
+ return streamFuture;
+ }
}
}
diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/BodyChunkPipe.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/BodyChunkPipe.java
new file mode 100644
index 00000000000..fdaed9a7bc6
--- /dev/null
+++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/BodyChunkPipe.java
@@ -0,0 +1,241 @@
+/*
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ * http://aws.amazon.com/apache2.0
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package software.amazon.awssdk.http.crt.internal.request;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.atomic.AtomicReference;
+import software.amazon.awssdk.annotations.SdkInternalApi;
+import software.amazon.awssdk.annotations.SdkTestInternalApi;
+
+/**
+ * Bounded producer/consumer hand-off between the caller thread (producer) and the CRT event-loop thread (consumer).
+ *
+ * The producer reads from the customer's {@code InputStream} and {@link #publish(Chunk) publishes} chunks
+ * into a bounded {@link ArrayBlockingQueue}. The consumer drains those chunks via {@link #pollDrain(ByteBuffer)},
+ * which is non-blocking: if no data is ready the consumer returns 0 bytes and CRT reschedules itself via
+ * {@code aws_channel_schedule_task_now}.
+ *
+ *
Drained chunks are returned to a free {@link ArrayDeque} (LIFO for cache hotness) guarded by this
+ * monitor. The producer parks on this monitor when the free deque is empty, providing back-pressure.
+ *
+ *
Chunk byte[] buffers are allocated lazily on the producer's first {@link #acquireForFill()}, not in
+ * the constructor. This keeps per-request heap minimal while a request is queued on the CRT connection
+ * pool waiting for a stream: the pipe object exists but its backing buffers do not.
+ *
+ *
State machine: {@code OPEN -> {EOF | ERROR | ABORTED}}. Transitions are one-way.
+ */
+@SdkInternalApi
+final class BodyChunkPipe {
+
+ enum State {
+ OPEN,
+ EOF,
+ ERROR,
+ ABORTED
+ }
+
+ /**
+ * Defense-in-depth wait timeout for {@link #acquireForFill()}. Even if a code path forgets
+ * to call {@link #abort()}, a parked producer wakes every {@value} ms to re-check state.
+ * Spurious wakeups are harmless.
+ */
+ private static final long ACQUIRE_WAIT_TIMEOUT_MS = 50L;
+
+ private final int depth;
+ private final int chunkSize;
+ private final ArrayBlockingQueue ready;
+ private final Deque free;
+ private final AtomicReference state = new AtomicReference<>(State.OPEN);
+ /**
+ * Guards the free deque, allocated counter, and producer wait/notify protocol. Kept private
+ * so external code cannot synchronize on the pipe instance and stall the producer.
+ */
+ private final Object freeLock = new Object();
+
+ private int allocated;
+ private volatile Throwable error;
+ private Chunk pendingDrain;
+
+ BodyChunkPipe(int depth, int chunkSize) {
+ if (depth < 1) {
+ throw new IllegalArgumentException("depth must be >= 1");
+ }
+ if (chunkSize < 1) {
+ throw new IllegalArgumentException("chunkSize must be >= 1");
+ }
+ this.depth = depth;
+ this.chunkSize = chunkSize;
+ this.ready = new ArrayBlockingQueue<>(depth);
+ this.free = new ArrayDeque<>(depth);
+ }
+
+ /**
+ * Producer side: acquire a chunk to fill. Blocks if all chunks are currently in flight.
+ * Returns {@code null} only if the pipe was aborted while the producer was waiting.
+ *
+ * Allocates the chunk's backing byte[] on first use up to the configured depth. This keeps the
+ * per-request footprint minimal until the producer actually starts pumping (i.e., until after the
+ * CRT stream has been acquired).
+ */
+ Chunk acquireForFill() throws InterruptedException {
+ synchronized (freeLock) {
+ while (true) {
+ State s = state.get();
+ if (s == State.ABORTED || s == State.ERROR) {
+ return null;
+ }
+ Chunk c = free.pollFirst();
+ if (c != null) {
+ return c;
+ }
+ if (allocated < depth) {
+ allocated++;
+ return new Chunk(chunkSize);
+ }
+ freeLock.wait(ACQUIRE_WAIT_TIMEOUT_MS);
+ }
+ }
+ }
+
+ /**
+ * Producer side: publish a filled chunk to the consumer. Caller must have set
+ * {@link Chunk#len(int)} before calling.
+ *
+ *
If the chunk is empty (zero-length read), it is recycled back to the free deque rather than
+ * pushed to the ready queue: an empty chunk would otherwise be leaked from the bounded pool, and
+ * the consumer would interpret it as a no-op anyway.
+ */
+ void publish(Chunk chunk) throws InterruptedException {
+ if (chunk.len() == 0) {
+ recycle(chunk);
+ return;
+ }
+ // ready.put() blocks if the queue is full, but the queue capacity == pool size,
+ // so this can only block briefly while the consumer drains.
+ ready.put(chunk);
+ }
+
+ /**
+ * Producer side: signal end-of-stream. Idempotent.
+ */
+ void signalEof() {
+ state.compareAndSet(State.OPEN, State.EOF);
+ }
+
+ /**
+ * Producer side: signal a fatal producer-side error. Idempotent.
+ */
+ void signalError(Throwable t) {
+ synchronized (freeLock) {
+ // Publish the cause BEFORE flipping state so a consumer's lock-free read in pollDrain
+ // never observes state==ERROR with error==null. The volatile write to `error` is
+ // harmless if the CAS later loses (idempotent signal).
+ error = t;
+ state.compareAndSet(State.OPEN, State.ERROR);
+ freeLock.notifyAll();
+ }
+ }
+
+ /**
+ * External-cancel: clear ready queue, flip state, wake producer.
+ */
+ void abort() {
+ synchronized (freeLock) {
+ if (state.compareAndSet(State.OPEN, State.ABORTED)) {
+ ready.clear();
+ }
+ freeLock.notifyAll();
+ }
+ }
+
+ /**
+ * Consumer side: drain bytes into {@code dst}. NEVER blocks.
+ *
+ *
Single-consumer: CRT invokes this only on the request's outgoing-stream task, which is
+ * scheduled serially on one event-loop thread per stream. {@code pendingDrain} is therefore
+ * not volatile - it is written and read by that single consumer thread.
+ *
+ * @return number of bytes drained, or {@code -1} on EOF with no remaining data.
+ * @throws RuntimeException if the pipe is in ERROR or ABORTED state with no remaining data.
+ */
+ int pollDrain(ByteBuffer dst) {
+ int totalBytesConsumed = 0;
+ while (dst.hasRemaining()) {
+ if (pendingDrain == null) {
+ pendingDrain = ready.poll();
+ }
+ if (pendingDrain == null) {
+ switch (state.get()) {
+ case ERROR:
+ throw new RuntimeException("Producer failed", error);
+ case ABORTED:
+ throw new RuntimeException("Request body stream was aborted");
+ case EOF:
+ return totalBytesConsumed > 0 ? totalBytesConsumed : -1;
+ case OPEN:
+ default:
+ // OPEN with empty queue: return what we have (possibly 0); CRT will retry.
+ return totalBytesConsumed;
+ }
+ }
+ int n = Math.min(dst.remaining(), pendingDrain.len() - pendingDrain.pos());
+ dst.put(pendingDrain.data(), pendingDrain.pos(), n);
+ pendingDrain.pos(pendingDrain.pos() + n);
+ totalBytesConsumed += n;
+ if (pendingDrain.pos() >= pendingDrain.len()) {
+ // The chunk has been fully copied into dst, so we return it to the free deque
+ // (and notify the producer in case it was waiting). This is what bounds the pool:
+ // chunks only re-enter the producer pool after the consumer has drained them.
+ Chunk drained = pendingDrain;
+ pendingDrain = null;
+ recycle(drained);
+ }
+ }
+ return totalBytesConsumed;
+ }
+
+ /**
+ * Visible-for-test / test-only helper: current pipe state.
+ */
+ @SdkTestInternalApi
+ State state() {
+ return state.get();
+ }
+
+ /**
+ * Visible-for-test / test-only helper: number of {@link Chunk} buffers minted so far. The pipe
+ * lazily allocates chunks on the producer's first {@link #acquireForFill()}, so this is 0 until
+ * the producer starts pumping and grows up to {@code depth}.
+ */
+ @SdkTestInternalApi
+ int allocatedForTest() {
+ synchronized (freeLock) {
+ return allocated;
+ }
+ }
+
+ private void recycle(Chunk c) {
+ c.pos(0);
+ c.len(0);
+ synchronized (freeLock) {
+ free.push(c);
+ freeLock.notifyAll();
+ }
+ }
+}
diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/Chunk.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/Chunk.java
new file mode 100644
index 00000000000..a7e97b292aa
--- /dev/null
+++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/Chunk.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ * http://aws.amazon.com/apache2.0
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package software.amazon.awssdk.http.crt.internal.request;
+
+import software.amazon.awssdk.annotations.SdkInternalApi;
+
+/**
+ * A reusable byte buffer carrying request body data through the {@link BodyChunkPipe}.
+ *
+ *
Internal to this package; used only by {@link BodyChunkPipe} and {@link SyncRequestBodyPump}.
+ *
+ *
{@code pos} and {@code len} are intentionally non-volatile: hand-off between the producer and
+ * consumer always goes through the {@code ArrayBlockingQueue} (or the {@code freeLock} monitor on
+ * recycle), both of which provide release/acquire happens-before for the field writes.
+ */
+@SdkInternalApi
+final class Chunk {
+ private final byte[] data;
+ private int pos;
+ private int len;
+
+ Chunk(int chunkSize) {
+ this.data = new byte[chunkSize];
+ }
+
+ byte[] data() {
+ return data;
+ }
+
+ int pos() {
+ return pos;
+ }
+
+ void pos(int pos) {
+ this.pos = pos;
+ }
+
+ int len() {
+ return len;
+ }
+
+ void len(int len) {
+ this.len = len;
+ }
+}
diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestAdapter.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestAdapter.java
index 8672d80b0d1..4a3991c5d44 100644
--- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestAdapter.java
+++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestAdapter.java
@@ -23,6 +23,7 @@
import software.amazon.awssdk.crt.http.HttpHeader;
import software.amazon.awssdk.crt.http.HttpRequest;
import software.amazon.awssdk.crt.http.HttpRequestBase;
+import software.amazon.awssdk.http.ContentStreamProvider;
import software.amazon.awssdk.http.Header;
import software.amazon.awssdk.http.HttpExecuteRequest;
import software.amazon.awssdk.http.Protocol;
@@ -33,6 +34,16 @@
@SdkInternalApi
public final class CrtRequestAdapter {
+ /**
+ * Per-chunk size used by the sync request-body pipe.
+ */
+ private static final int CHUNK_SIZE = 128 * 1024;
+
+ /**
+ * Number of in-flight chunks the pipe holds.
+ */
+ private static final int PIPE_DEPTH = 4;
+
private CrtRequestAdapter() {
}
@@ -60,7 +71,12 @@ public static HttpRequestBase toAsyncCrtRequest(CrtAsyncRequestContext request)
crtRequestBodyAdapter);
}
- public static HttpRequest toCrtRequest(CrtRequestContext request) {
+ /**
+ * Build the CRT request for the sync path. When the SDK request has a body, this also constructs the
+ * {@link BodyChunkPipe} and a {@link SyncRequestBodyPump}; the caller thread is expected to drive
+ * the pump after the stream is activated.
+ */
+ public static SyncCrtRequest toCrtRequest(CrtRequestContext request) {
HttpExecuteRequest sdkExecuteRequest = request.sdkRequest();
SdkHttpRequest sdkRequest = sdkExecuteRequest.httpRequest();
@@ -78,14 +94,39 @@ public static HttpRequest toCrtRequest(CrtRequestContext request) {
HttpHeader[] crtHeaderArray = asArray(createHttpHeaderList(sdkRequest.getUri(), sdkExecuteRequest));
String finalEncodedPath = encodedPath + encodedQueryString;
- return sdkExecuteRequest.contentStreamProvider()
- .map(provider -> new HttpRequest(method,
- finalEncodedPath,
- crtHeaderArray,
- new CrtRequestInputStreamAdapter(provider)))
- .orElse(new HttpRequest(method,
- finalEncodedPath,
- crtHeaderArray, null));
+
+ Optional providerOpt = sdkExecuteRequest.contentStreamProvider();
+ if (!providerOpt.isPresent()) {
+ return new SyncCrtRequest(new HttpRequest(method, finalEncodedPath, crtHeaderArray, null), null);
+ }
+
+ BodyChunkPipe pipe = new BodyChunkPipe(PIPE_DEPTH, CHUNK_SIZE);
+ PipeBackedRequestBodyStream bodyStream = new PipeBackedRequestBodyStream(pipe);
+ SyncRequestBodyPump pump = new SyncRequestBodyPump(providerOpt.get(), pipe);
+ HttpRequest crtRequest = new HttpRequest(method, finalEncodedPath, crtHeaderArray, bodyStream);
+ return new SyncCrtRequest(crtRequest, pump);
+ }
+
+ /**
+ * Holder returned from {@link #toCrtRequest(CrtRequestContext)} bundling the CRT-side request and the
+ * caller-thread producer pump (null when the SDK request has no body).
+ */
+ public static final class SyncCrtRequest {
+ private final HttpRequest httpRequest;
+ private final SyncRequestBodyPump pump;
+
+ SyncCrtRequest(HttpRequest httpRequest, SyncRequestBodyPump pump) {
+ this.httpRequest = httpRequest;
+ this.pump = pump;
+ }
+
+ public HttpRequest httpRequest() {
+ return httpRequest;
+ }
+
+ public SyncRequestBodyPump pump() {
+ return pump;
+ }
}
private static HttpHeader[] asArray(List crtHeaderList) {
diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestInputStreamAdapter.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestInputStreamAdapter.java
deleted file mode 100644
index 68f418b9e1d..00000000000
--- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestInputStreamAdapter.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License").
- * You may not use this file except in compliance with the License.
- * A copy of the License is located at
- *
- * http://aws.amazon.com/apache2.0
- *
- * or in the "license" file accompanying this file. This file is distributed
- * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
- * express or implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package software.amazon.awssdk.http.crt.internal.request;
-
-import static java.lang.Math.min;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-import software.amazon.awssdk.annotations.SdkInternalApi;
-import software.amazon.awssdk.crt.http.HttpRequestBodyStream;
-import software.amazon.awssdk.http.ContentStreamProvider;
-
-@SdkInternalApi
-final class CrtRequestInputStreamAdapter implements HttpRequestBodyStream {
- private static final int READ_BUFFER_SIZE = 16 * 1024;
-
- private final ContentStreamProvider provider;
- private volatile InputStream providerStream;
- private final byte[] readBuffer = new byte[READ_BUFFER_SIZE];
-
- CrtRequestInputStreamAdapter(ContentStreamProvider provider) {
- this.provider = provider;
- }
-
- @Override
- public boolean sendRequestBody(ByteBuffer bodyBytesOut) {
- int read;
-
- try {
- if (providerStream == null) {
- createNewStream();
- }
-
- int toRead = min(READ_BUFFER_SIZE, bodyBytesOut.remaining());
- read = providerStream.read(readBuffer, 0, toRead);
-
- if (read > 0) {
- bodyBytesOut.put(readBuffer, 0, read);
- }
- } catch (IOException ioe) {
- throw new RuntimeException(ioe);
- }
-
- return read < 0;
- }
-
- @Override
- public boolean resetPosition() {
- try {
- createNewStream();
- } catch (IOException ioe) {
- throw new RuntimeException(ioe);
- }
-
- return true;
- }
-
- private void createNewStream() throws IOException {
- if (providerStream != null) {
- providerStream.close();
- }
- providerStream = provider.newStream();
- }
-}
diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/PipeBackedRequestBodyStream.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/PipeBackedRequestBodyStream.java
new file mode 100644
index 00000000000..a2dd78eea0c
--- /dev/null
+++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/PipeBackedRequestBodyStream.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ * http://aws.amazon.com/apache2.0
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package software.amazon.awssdk.http.crt.internal.request;
+
+import java.nio.ByteBuffer;
+import software.amazon.awssdk.annotations.SdkInternalApi;
+import software.amazon.awssdk.crt.http.HttpRequestBodyStream;
+
+/**
+ * A {@link HttpRequestBodyStream} adapter whose {@link #sendRequestBody(ByteBuffer)} drains bytes from a
+ * {@link BodyChunkPipe} that is fed by the caller thread. The pull callback NEVER blocks: if no data is ready,
+ * it returns 0 bytes and CRT reschedules the outgoing-stream task via {@code aws_channel_schedule_task_now},
+ * allowing other event-loop tasks (such as a concurrent GET response delivery) to run before the retry.
+ */
+@SdkInternalApi
+final class PipeBackedRequestBodyStream implements HttpRequestBodyStream {
+
+ private final BodyChunkPipe pipe;
+
+ PipeBackedRequestBodyStream(BodyChunkPipe pipe) {
+ this.pipe = pipe;
+ }
+
+ @Override
+ public boolean sendRequestBody(ByteBuffer bodyBytesOut) {
+ int drained = pipe.pollDrain(bodyBytesOut);
+ return drained < 0;
+ }
+
+ @Override
+ public boolean resetPosition() {
+ // The SDK retry layer (RetryableStage) handles request-level retries by calling prepareRequest() again,
+ // CRT does not currently exercise resetPosition for HTTP/1.1, so opting out is safe in practice.
+ return false;
+ }
+}
diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/SyncRequestBodyPump.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/SyncRequestBodyPump.java
new file mode 100644
index 00000000000..dbacf58eede
--- /dev/null
+++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/SyncRequestBodyPump.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ * http://aws.amazon.com/apache2.0
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package software.amazon.awssdk.http.crt.internal.request;
+
+import java.io.IOException;
+import java.io.InputStream;
+import software.amazon.awssdk.annotations.SdkInternalApi;
+import software.amazon.awssdk.http.ContentStreamProvider;
+
+/**
+ * Caller-thread producer that reads from the customer's {@link InputStream} and publishes chunks to a
+ * {@link BodyChunkPipe}. Runs on the caller (sync) thread between stream activation and
+ * {@code responseFuture.join()}, ensuring the blocking {@code read()} happens off the CRT event loop.
+ */
+@SdkInternalApi
+public final class SyncRequestBodyPump {
+
+ private final ContentStreamProvider contentStreamProvider;
+ private final BodyChunkPipe pipe;
+
+ SyncRequestBodyPump(ContentStreamProvider contentStreamProvider, BodyChunkPipe pipe) {
+ this.contentStreamProvider = contentStreamProvider;
+ this.pipe = pipe;
+ }
+
+ /**
+ * Pump the entire input stream into the pipe. Runs on the caller thread; never invoked on the CRT
+ * event-loop thread. On EOF signals the pipe normally; on {@link IOException} signals an error and rethrows.
+ */
+ public void pump() throws IOException {
+ try (InputStream in = contentStreamProvider.newStream()) {
+ while (true) {
+ Chunk chunk = pipe.acquireForFill();
+ if (chunk == null) {
+ // pipe was aborted while we were waiting; stop without signaling EOF.
+ return;
+ }
+ int read;
+ try {
+ read = in.read(chunk.data(), 0, chunk.data().length);
+ } catch (IOException ioe) {
+ pipe.signalError(ioe);
+ throw ioe;
+ }
+ if (read < 0) {
+ pipe.signalEof();
+ return;
+ }
+ chunk.pos(0);
+ chunk.len(read);
+ pipe.publish(chunk);
+ }
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ pipe.abort();
+ throw new IOException("Interrupted while writing request body", ie);
+ }
+ }
+
+ /**
+ * Abort the underlying pipe (e.g., when the caller's {@code call()} is cancelled).
+ */
+ public void abort() {
+ pipe.abort();
+ }
+}
diff --git a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/AwsCrtHttpClientWireMockTest.java b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/AwsCrtHttpClientWireMockTest.java
index ce5d778f06a..5171451a4f0 100644
--- a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/AwsCrtHttpClientWireMockTest.java
+++ b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/AwsCrtHttpClientWireMockTest.java
@@ -17,8 +17,13 @@
import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
import static com.github.tomakehurst.wiremock.client.WireMock.any;
+import static com.github.tomakehurst.wiremock.client.WireMock.equalTo;
+import static com.github.tomakehurst.wiremock.client.WireMock.equalToIgnoreCase;
+import static com.github.tomakehurst.wiremock.client.WireMock.put;
+import static com.github.tomakehurst.wiremock.client.WireMock.putRequestedFor;
import static com.github.tomakehurst.wiremock.client.WireMock.stubFor;
import static com.github.tomakehurst.wiremock.client.WireMock.urlPathEqualTo;
+import static com.github.tomakehurst.wiremock.client.WireMock.verify;
import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig;
import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
import static org.assertj.core.api.Assertions.assertThat;
@@ -26,13 +31,21 @@
import static software.amazon.awssdk.http.SdkHttpConfigurationOption.PROTOCOL;
import static software.amazon.awssdk.http.crt.CrtHttpClientTestUtils.createRequest;
+import com.github.tomakehurst.wiremock.http.Fault;
import com.github.tomakehurst.wiremock.junit.WireMockRule;
import java.io.ByteArrayInputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
@@ -45,6 +58,8 @@
import software.amazon.awssdk.http.HttpMetric;
import software.amazon.awssdk.http.Protocol;
import software.amazon.awssdk.http.SdkHttpClient;
+import software.amazon.awssdk.http.SdkHttpFullRequest;
+import software.amazon.awssdk.http.SdkHttpMethod;
import software.amazon.awssdk.http.SdkHttpRequest;
import software.amazon.awssdk.metrics.MetricCollection;
import software.amazon.awssdk.metrics.MetricCollector;
@@ -133,6 +148,294 @@ public void abortRequest_shouldFailTheExceptionWithIOException() throws Exceptio
}
}
+ @Test
+ public void putRequest_withInputStreamBody_serverReceivesBody() throws Exception {
+ try (SdkHttpClient client = AwsCrtHttpClient.create()) {
+ String body = "hello pull pump";
+ byte[] bodyBytes = body.getBytes(StandardCharsets.UTF_8);
+ URI uri = URI.create("http://localhost:" + mockServer.port());
+ stubFor(put(urlPathEqualTo("/sink")).willReturn(aResponse().withStatus(200)));
+
+ SdkHttpFullRequest request = SdkHttpFullRequest.builder()
+ .uri(uri)
+ .method(SdkHttpMethod.PUT)
+ .encodedPath("/sink")
+ .putHeader("Host", uri.getHost())
+ .putHeader("Content-Length", Integer.toString(bodyBytes.length))
+ .build();
+
+ HttpExecuteRequest executeRequest = HttpExecuteRequest.builder()
+ .request(request)
+ .contentStreamProvider(() -> new ByteArrayInputStream(bodyBytes))
+ .build();
+
+ HttpExecuteResponse response = client.prepareRequest(executeRequest).call();
+
+ assertThat(response.httpResponse().statusCode()).isEqualTo(200);
+ verify(putRequestedFor(urlPathEqualTo("/sink"))
+ .withHeader("Content-Length", equalTo(Integer.toString(bodyBytes.length)))
+ .withRequestBody(equalToIgnoreCase(body)));
+ }
+ }
+
+ @Test
+ public void inputStreamThrows_connectionReturnedToPool_subsequentRequestSucceeds() throws Exception {
+ // Bound the pool to a single connection: if the failed request leaks its connection, the
+ // second call() either fails to acquire (with the explicit timeout below) or blocks until
+ // the test framework times out. Either manifests as a deterministic failure rather than a hang.
+ try (SdkHttpClient client = AwsCrtHttpClient.builder()
+ .maxConcurrency(1)
+ .connectionAcquisitionTimeout(Duration.ofSeconds(10))
+ .build()) {
+ URI uri = URI.create("http://localhost:" + mockServer.port());
+ stubFor(put(urlPathEqualTo("/sink")).willReturn(aResponse().withStatus(200)));
+
+ IOException expected = new IOException("simulated upstream failure");
+ SdkHttpFullRequest failingRequest = SdkHttpFullRequest.builder()
+ .uri(uri)
+ .method(SdkHttpMethod.PUT)
+ .encodedPath("/sink")
+ .putHeader("Host", uri.getHost())
+ .putHeader("Content-Length", "100")
+ .build();
+ HttpExecuteRequest failingExecute =
+ HttpExecuteRequest.builder()
+ .request(failingRequest)
+ .contentStreamProvider(() -> new InputStream() {
+ @Override
+ public int read() throws IOException {
+ throw expected;
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ throw expected;
+ }
+ })
+ .build();
+
+ assertThatThrownBy(() -> client.prepareRequest(failingExecute).call())
+ .isInstanceOf(IOException.class);
+
+ // If the previous failure leaked the connection, this second call would fail to acquire
+ // (bounded by the connectionAcquisitionTimeout configured above) instead of hanging.
+ String body = "second request body";
+ byte[] bodyBytes = body.getBytes(StandardCharsets.UTF_8);
+ SdkHttpFullRequest okRequest = SdkHttpFullRequest.builder()
+ .uri(uri)
+ .method(SdkHttpMethod.PUT)
+ .encodedPath("/sink")
+ .putHeader("Host", uri.getHost())
+ .putHeader("Content-Length", Integer.toString(bodyBytes.length))
+ .build();
+ HttpExecuteRequest okExecute = HttpExecuteRequest.builder()
+ .request(okRequest)
+ .contentStreamProvider(() -> new ByteArrayInputStream(bodyBytes))
+ .build();
+
+ HttpExecuteResponse response = client.prepareRequest(okExecute).call();
+ assertThat(response.httpResponse().statusCode()).isEqualTo(200);
+ }
+ }
+
+ @Test
+ public void abortMidRequest_connectionReturnedToPool_subsequentRequestSucceeds() throws Exception {
+ try (SdkHttpClient client = AwsCrtHttpClient.builder()
+ .maxConcurrency(1)
+ .connectionAcquisitionTimeout(Duration.ofSeconds(10))
+ .build()) {
+ URI uri = URI.create("http://localhost:" + mockServer.port());
+ stubFor(any(urlPathEqualTo("/")).willReturn(aResponse().withFixedDelay(2000).withBody("hello")));
+ stubFor(put(urlPathEqualTo("/sink")).willReturn(aResponse().withStatus(200)));
+
+ SdkHttpRequest delayedRequest = createRequest(uri);
+ HttpExecuteRequest delayedExecute = HttpExecuteRequest.builder()
+ .request(delayedRequest)
+ .contentStreamProvider(() -> new ByteArrayInputStream(new byte[0]))
+ .build();
+
+ ExecutableHttpRequest abortable = client.prepareRequest(delayedExecute);
+ executorService.schedule(abortable::abort, 100, TimeUnit.MILLISECONDS);
+ assertThatThrownBy(abortable::call).isInstanceOf(IOException.class).hasMessageContaining("cancelled");
+
+ String body = "after abort";
+ byte[] bodyBytes = body.getBytes(StandardCharsets.UTF_8);
+ SdkHttpFullRequest okRequest = SdkHttpFullRequest.builder()
+ .uri(uri)
+ .method(SdkHttpMethod.PUT)
+ .encodedPath("/sink")
+ .putHeader("Host", uri.getHost())
+ .putHeader("Content-Length", Integer.toString(bodyBytes.length))
+ .build();
+ HttpExecuteRequest okExecute = HttpExecuteRequest.builder()
+ .request(okRequest)
+ .contentStreamProvider(() -> new ByteArrayInputStream(bodyBytes))
+ .build();
+ HttpExecuteResponse response = client.prepareRequest(okExecute).call();
+ assertThat(response.httpResponse().statusCode()).isEqualTo(200);
+ }
+ }
+
+ @Test
+ public void serverResetsConnection_connectionReturnedToPool_subsequentRequestSucceeds() throws Exception {
+ try (SdkHttpClient client = AwsCrtHttpClient.builder()
+ .maxConcurrency(1)
+ .connectionAcquisitionTimeout(Duration.ofSeconds(10))
+ .build()) {
+ URI uri = URI.create("http://localhost:" + mockServer.port());
+ stubFor(put(urlPathEqualTo("/sink"))
+ .willReturn(aResponse().withFault(Fault.CONNECTION_RESET_BY_PEER)));
+
+ byte[] bodyBytes = randomAlphabetic(64).getBytes(StandardCharsets.UTF_8);
+ SdkHttpFullRequest failingRequest = SdkHttpFullRequest.builder()
+ .uri(uri)
+ .method(SdkHttpMethod.PUT)
+ .encodedPath("/sink")
+ .putHeader("Host", uri.getHost())
+ .putHeader("Content-Length", Integer.toString(bodyBytes.length))
+ .build();
+ HttpExecuteRequest failingExecute = HttpExecuteRequest.builder()
+ .request(failingRequest)
+ .contentStreamProvider(() -> new ByteArrayInputStream(bodyBytes))
+ .build();
+
+ assertThatThrownBy(() -> client.prepareRequest(failingExecute).call())
+ .isInstanceOf(IOException.class);
+
+ stubFor(put(urlPathEqualTo("/sink2")).willReturn(aResponse().withStatus(200)));
+ byte[] okBytes = "ok".getBytes(StandardCharsets.UTF_8);
+ SdkHttpFullRequest okRequest = SdkHttpFullRequest.builder()
+ .uri(uri)
+ .method(SdkHttpMethod.PUT)
+ .encodedPath("/sink2")
+ .putHeader("Host", uri.getHost())
+ .putHeader("Content-Length", Integer.toString(okBytes.length))
+ .build();
+ HttpExecuteRequest okExecute = HttpExecuteRequest.builder()
+ .request(okRequest)
+ .contentStreamProvider(() -> new ByteArrayInputStream(okBytes))
+ .build();
+
+ HttpExecuteResponse response = client.prepareRequest(okExecute).call();
+ assertThat(response.httpResponse().statusCode()).isEqualTo(200);
+ }
+ }
+
+ /**
+ * Regression test for the deadlock the pull-pump fix addresses. On master, the request body's
+ * {@code InputStream.read(...)} ran on the CRT event-loop thread (via the body callback), which
+ * meant a body sourced from a {@code GET}'s {@code ResponseInputStream} on the same event loop
+ * could deadlock: the GET held the event loop while the PUT body waited for it.
+ *
+ * Pull-pump moves the read to the caller (sync) thread. This test verifies that load-bearing
+ * claim by recording the thread that performs the body read and asserting it is the caller
+ * thread - not a CRT event-loop thread. Failure of either the assertion or the test timeout
+ * (a hang) is the deadlock signal.
+ */
+ @Test
+ public void putBodyReadHappensOnCallerThread_notOnCrtEventLoop() throws Exception {
+ try (SdkHttpClient client = AwsCrtHttpClient.builder()
+ .maxConcurrency(1)
+ .connectionAcquisitionTimeout(Duration.ofSeconds(10))
+ .build()) {
+ URI uri = URI.create("http://localhost:" + mockServer.port());
+ stubFor(put(urlPathEqualTo("/sink")).willReturn(aResponse().withStatus(200)));
+
+ byte[] bodyBytes = "body-on-caller".getBytes(StandardCharsets.UTF_8);
+ AtomicReference readThreadName = new AtomicReference<>();
+ SdkHttpFullRequest request = SdkHttpFullRequest.builder()
+ .uri(uri)
+ .method(SdkHttpMethod.PUT)
+ .encodedPath("/sink")
+ .putHeader("Host", uri.getHost())
+ .putHeader("Content-Length", Integer.toString(bodyBytes.length))
+ .build();
+ HttpExecuteRequest executeRequest =
+ HttpExecuteRequest.builder()
+ .request(request)
+ .contentStreamProvider(() -> new ByteArrayInputStream(bodyBytes) {
+ @Override
+ public synchronized int read(byte[] b, int off, int len) {
+ readThreadName.compareAndSet(null, Thread.currentThread().getName());
+ return super.read(b, off, len);
+ }
+ })
+ .build();
+
+ String callerThreadName = Thread.currentThread().getName();
+ HttpExecuteResponse response = client.prepareRequest(executeRequest).call();
+ assertThat(response.httpResponse().statusCode()).isEqualTo(200);
+
+ String observed = readThreadName.get();
+ assertThat(observed)
+ .as("body read should happen on the caller thread, not the CRT event loop")
+ .isNotNull()
+ .isEqualTo(callerThreadName)
+ .doesNotContainIgnoringCase("AwsEventLoop")
+ .doesNotContainIgnoringCase("aws-event-loop");
+ }
+ }
+
+ /**
+ * Stress companion to {@link #putBodyReadHappensOnCallerThread_notOnCrtEventLoop}. Issues a
+ * delayed GET (response delayed server-side) and a PUT in parallel through the same
+ * {@code maxConcurrency(1)} client. On master, sequencing them through a single connection
+ * with the body read tied to the event-loop thread could deadlock; here both calls must
+ * complete within the test timeout.
+ */
+ @Test
+ public void getInFlight_concurrentPut_bothComplete() throws Exception {
+ try (SdkHttpClient client = AwsCrtHttpClient.builder()
+ .maxConcurrency(1)
+ .connectionAcquisitionTimeout(Duration.ofSeconds(15))
+ .build()) {
+ URI uri = URI.create("http://localhost:" + mockServer.port());
+ stubFor(any(urlPathEqualTo("/slow"))
+ .willReturn(aResponse().withFixedDelay(2_000).withBody("hello")));
+ stubFor(put(urlPathEqualTo("/sink")).willReturn(aResponse().withStatus(200)));
+
+ SdkHttpRequest getRequest = SdkHttpFullRequest.builder()
+ .uri(uri)
+ .method(SdkHttpMethod.GET)
+ .encodedPath("/slow")
+ .putHeader("Host", uri.getHost())
+ .build();
+ HttpExecuteRequest getExecute = HttpExecuteRequest.builder()
+ .request(getRequest)
+ .contentStreamProvider(() -> new ByteArrayInputStream(new byte[0]))
+ .build();
+
+ byte[] putBytes = "put-body".getBytes(StandardCharsets.UTF_8);
+ SdkHttpFullRequest putRequest = SdkHttpFullRequest.builder()
+ .uri(uri)
+ .method(SdkHttpMethod.PUT)
+ .encodedPath("/sink")
+ .putHeader("Host", uri.getHost())
+ .putHeader("Content-Length", Integer.toString(putBytes.length))
+ .build();
+ HttpExecuteRequest putExecute = HttpExecuteRequest.builder()
+ .request(putRequest)
+ .contentStreamProvider(() -> new ByteArrayInputStream(putBytes))
+ .build();
+
+ ExecutorService pool = Executors.newFixedThreadPool(2);
+ try {
+ Callable getTask = () -> client.prepareRequest(getExecute).call();
+ Callable putTask = () -> client.prepareRequest(putExecute).call();
+ Future getFuture = pool.submit(getTask);
+ Future putFuture = pool.submit(putTask);
+
+ HttpExecuteResponse getResponse = getFuture.get(15, TimeUnit.SECONDS);
+ HttpExecuteResponse putResponse = putFuture.get(15, TimeUnit.SECONDS);
+ assertThat(getResponse.httpResponse().statusCode()).isEqualTo(200);
+ assertThat(putResponse.httpResponse().statusCode()).isEqualTo(200);
+ } finally {
+ pool.shutdownNow();
+ pool.awaitTermination(5, TimeUnit.SECONDS);
+ }
+ }
+ }
+
/**
* Make a simple request and wait for it to finish.
*
diff --git a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutorTest.java b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutorTest.java
index 456000ac115..8b916ba1ee9 100644
--- a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutorTest.java
+++ b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutorTest.java
@@ -87,7 +87,7 @@ public void execute_requestConversionFails_failsFuture() {
.request(HttpExecuteRequest.builder().build())
.build();
- CompletableFuture executeFuture = requestExecutor.execute(context);
+ CompletableFuture executeFuture = requestExecutor.execute(context).responseFuture();
assertThat(executeFuture).hasFailedWithThrowableThat().isInstanceOf(NullPointerException.class);
}
@@ -102,7 +102,7 @@ public void execute_acquireStreamFails_wrapsWithIOException() {
.thenReturn(completableFuture);
completableFuture.completeExceptionally(exception);
- CompletableFuture executeFuture = requestExecutor.execute(context);
+ CompletableFuture executeFuture = requestExecutor.execute(context).responseFuture();
assertThat(executeFuture).hasFailedWithThrowableThat().hasCause(exception).isInstanceOf(IOException.class);
}
@@ -116,7 +116,7 @@ public void execute_retryableException_wrapsWithIOException(Throwable throwable)
Mockito.when(streamManager.acquireStream(Mockito.any(HttpRequest.class), Mockito.any(HttpStreamBaseResponseHandler.class)))
.thenReturn(completableFuture);
- CompletableFuture executeFuture = requestExecutor.execute(context);
+ CompletableFuture executeFuture = requestExecutor.execute(context).responseFuture();
assertThat(executeFuture).hasFailedWithThrowableThat().hasCause(throwable).isInstanceOf(IOException.class);
}
@@ -133,7 +133,7 @@ public void execute_httpException_mapsToCorrectException(Entry executeFuture = requestExecutor.execute(context);
+ CompletableFuture executeFuture = requestExecutor.execute(context).responseFuture();
assertThatThrownBy(executeFuture::join).hasCauseInstanceOf(expectedExceptionClass);
}
@@ -146,7 +146,7 @@ public void execute_nonRetryableHttpException_doesNotWrapWithIOException() {
Mockito.when(streamManager.acquireStream(Mockito.any(HttpRequest.class), Mockito.any(HttpStreamBaseResponseHandler.class)))
.thenReturn(completableFuture);
- CompletableFuture executeFuture = requestExecutor.execute(context);
+ CompletableFuture executeFuture = requestExecutor.execute(context).responseFuture();
assertThatThrownBy(executeFuture::join).hasCause(exception);
}
diff --git a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/request/BodyChunkPipeTest.java b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/request/BodyChunkPipeTest.java
new file mode 100644
index 00000000000..72f29278589
--- /dev/null
+++ b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/request/BodyChunkPipeTest.java
@@ -0,0 +1,396 @@
+/*
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ * http://aws.amazon.com/apache2.0
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package software.amazon.awssdk.http.crt.internal.request;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import org.junit.jupiter.api.RepeatedTest;
+import org.junit.jupiter.api.Test;
+
+class BodyChunkPipeTest {
+
+ @Test
+ void pollDrain_emptyOpenPipe_returnsZero() {
+ BodyChunkPipe pipe = new BodyChunkPipe(2, 8);
+ ByteBuffer dst = ByteBuffer.allocate(8);
+
+ int n = pipe.pollDrain(dst);
+
+ assertThat(n).isZero();
+ assertThat(dst.position()).isZero();
+ }
+
+ @Test
+ void pollDrain_afterEofWithEmptyQueue_returnsMinusOne() {
+ BodyChunkPipe pipe = new BodyChunkPipe(2, 8);
+ pipe.signalEof();
+
+ int n = pipe.pollDrain(ByteBuffer.allocate(8));
+
+ assertThat(n).isEqualTo(-1);
+ }
+
+ @Test
+ void publish_thenDrain_consumerSeesProducerBytes() throws Exception {
+ BodyChunkPipe pipe = new BodyChunkPipe(2, 8);
+ Chunk c = pipe.acquireForFill();
+ byte[] payload = {1, 2, 3, 4, 5};
+ System.arraycopy(payload, 0, c.data(), 0, payload.length);
+ c.pos(0);
+ c.len(payload.length);
+ pipe.publish(c);
+ pipe.signalEof();
+ ByteBuffer dst = ByteBuffer.allocate(16);
+
+ int first = pipe.pollDrain(dst);
+ int second = pipe.pollDrain(dst);
+
+ assertThat(first).isEqualTo(payload.length);
+ assertThat(second).isEqualTo(-1);
+ dst.flip();
+ byte[] out = new byte[dst.remaining()];
+ dst.get(out);
+ assertThat(out).containsExactly(payload);
+ }
+
+ @Test
+ void signalError_pollDrainThrows() {
+ BodyChunkPipe pipe = new BodyChunkPipe(2, 8);
+ pipe.signalError(new RuntimeException("boom"));
+
+ assertThatThrownBy(() -> pipe.pollDrain(ByteBuffer.allocate(8)))
+ .hasMessageContaining("Producer failed")
+ .hasRootCauseMessage("boom");
+ }
+
+ @Test
+ void abort_emptiesReadyAndChangesState() throws Exception {
+ BodyChunkPipe pipe = new BodyChunkPipe(2, 8);
+ Chunk c = pipe.acquireForFill();
+ c.len(4);
+ pipe.publish(c);
+
+ pipe.abort();
+
+ assertThatThrownBy(() -> pipe.pollDrain(ByteBuffer.allocate(8)))
+ .hasMessageContaining("aborted");
+ }
+
+ @Test
+ void pollDrain_signalErrorWithQueuedChunks_drainsThenThrows() throws Exception {
+ BodyChunkPipe pipe = new BodyChunkPipe(2, 8);
+ Chunk c = pipe.acquireForFill();
+ byte[] payload = {7, 8, 9};
+ System.arraycopy(payload, 0, c.data(), 0, payload.length);
+ c.len(payload.length);
+ pipe.publish(c);
+ pipe.signalError(new RuntimeException("boom"));
+
+ ByteBuffer dst = ByteBuffer.allocate(payload.length);
+ int drained = pipe.pollDrain(dst);
+
+ assertThat(drained).isEqualTo(payload.length);
+ dst.flip();
+ byte[] out = new byte[dst.remaining()];
+ dst.get(out);
+ assertThat(out).containsExactly(payload);
+ assertThatThrownBy(() -> pipe.pollDrain(ByteBuffer.allocate(8)))
+ .hasMessageContaining("Producer failed")
+ .hasRootCauseMessage("boom");
+ }
+
+ @Test
+ void pollDrain_signalEofWithQueuedChunks_drainsThenReturnsMinusOne() throws Exception {
+ BodyChunkPipe pipe = new BodyChunkPipe(2, 8);
+ Chunk c = pipe.acquireForFill();
+ byte[] payload = {10, 20, 30};
+ System.arraycopy(payload, 0, c.data(), 0, payload.length);
+ c.len(payload.length);
+ pipe.publish(c);
+ pipe.signalEof();
+
+ ByteBuffer dst = ByteBuffer.allocate(payload.length);
+ int drained = pipe.pollDrain(dst);
+ int afterDrain = pipe.pollDrain(ByteBuffer.allocate(8));
+
+ assertThat(drained).isEqualTo(payload.length);
+ dst.flip();
+ byte[] out = new byte[dst.remaining()];
+ dst.get(out);
+ assertThat(out).containsExactly(payload);
+ assertThat(afterDrain).isEqualTo(-1);
+ }
+
+ @Test
+ void abort_afterSignalEof_leavesStateAsEof() {
+ BodyChunkPipe pipe = new BodyChunkPipe(2, 8);
+ pipe.signalEof();
+
+ pipe.abort();
+
+ assertThat(pipe.state()).isEqualTo(BodyChunkPipe.State.EOF);
+ assertThat(pipe.pollDrain(ByteBuffer.allocate(8))).isEqualTo(-1);
+ }
+
+ @Test
+ void abort_afterSignalEofWithQueuedChunks_doesNotClearReady() throws Exception {
+ BodyChunkPipe pipe = new BodyChunkPipe(2, 8);
+ Chunk c = pipe.acquireForFill();
+ byte[] payload = {1, 2, 3};
+ System.arraycopy(payload, 0, c.data(), 0, payload.length);
+ c.len(payload.length);
+ pipe.publish(c);
+ pipe.signalEof();
+
+ pipe.abort();
+
+ assertThat(pipe.state()).isEqualTo(BodyChunkPipe.State.EOF);
+ ByteBuffer dst = ByteBuffer.allocate(payload.length);
+ int drained = pipe.pollDrain(dst);
+ assertThat(drained).isEqualTo(payload.length);
+ assertThat(pipe.pollDrain(ByteBuffer.allocate(8))).isEqualTo(-1);
+ }
+
+ @Test
+ void recycle_intoEofPipe_doesNotThrowAndDoesNotCorruptPool() throws Exception {
+ BodyChunkPipe pipe = new BodyChunkPipe(2, 8);
+ Chunk c = pipe.acquireForFill();
+ c.len(4);
+ pipe.publish(c);
+ pipe.signalEof();
+
+ ByteBuffer dst = ByteBuffer.allocate(8);
+ int drained = pipe.pollDrain(dst);
+ int afterDrain = pipe.pollDrain(ByteBuffer.allocate(8));
+
+ assertThat(drained).isEqualTo(4);
+ assertThat(afterDrain).isEqualTo(-1);
+ assertThat(pipe.allocatedForTest()).isEqualTo(1);
+ }
+
+ @Test
+ void recycle_intoAbortedPipe_doesNotThrow() throws Exception {
+ BodyChunkPipe pipe = new BodyChunkPipe(2, 8);
+ Chunk c = pipe.acquireForFill();
+ pipe.abort();
+
+ c.len(0);
+ pipe.publish(c);
+
+ assertThat(pipe.state()).isEqualTo(BodyChunkPipe.State.ABORTED);
+ }
+
+ @Test
+ void recycle_intoErrorPipe_doesNotThrow() throws Exception {
+ BodyChunkPipe pipe = new BodyChunkPipe(2, 8);
+ Chunk c = pipe.acquireForFill();
+ pipe.signalError(new RuntimeException("boom"));
+
+ c.len(0);
+ pipe.publish(c);
+
+ assertThat(pipe.state()).isEqualTo(BodyChunkPipe.State.ERROR);
+ }
+
+ @Test
+ void constructor_doesNotAllocateChunks() {
+ BodyChunkPipe pipe = new BodyChunkPipe(4, 16);
+
+ assertThat(pipe.allocatedForTest()).isZero();
+ }
+
+ @Test
+ void acquireForFill_firstCall_allocatesOneChunk() throws Exception {
+ BodyChunkPipe pipe = new BodyChunkPipe(4, 16);
+
+ Chunk c = pipe.acquireForFill();
+
+ assertThat(c).isNotNull();
+ assertThat(c.data()).hasSize(16);
+ assertThat(pipe.allocatedForTest()).isEqualTo(1);
+ }
+
+ @Test
+ void acquireForFill_uniqueChunksUpToDepth_thenStopsAllocating() throws Exception {
+ BodyChunkPipe pipe = new BodyChunkPipe(3, 8);
+ Chunk c1 = pipe.acquireForFill();
+ Chunk c2 = pipe.acquireForFill();
+ Chunk c3 = pipe.acquireForFill();
+
+ c1.len(1);
+ pipe.publish(c1);
+ pipe.pollDrain(ByteBuffer.allocate(8));
+ Chunk reused = pipe.acquireForFill();
+
+ assertThat(c1).isNotSameAs(c2).isNotSameAs(c3);
+ assertThat(c2).isNotSameAs(c3);
+ assertThat(pipe.allocatedForTest()).isEqualTo(3);
+ assertThat(reused).isSameAs(c1);
+ }
+
+ @Test
+ void acquireForFill_recycledChunkReused_noNewAllocation() throws Exception {
+ BodyChunkPipe pipe = new BodyChunkPipe(2, 8);
+ Chunk c = pipe.acquireForFill();
+ c.len(3);
+ pipe.publish(c);
+ pipe.pollDrain(ByteBuffer.allocate(8));
+
+ Chunk reused = pipe.acquireForFill();
+
+ assertThat(reused).isSameAs(c);
+ assertThat(pipe.allocatedForTest()).isEqualTo(1);
+ }
+
+ @Test
+ void acquireForFill_afterAbort_returnsNull() throws Exception {
+ BodyChunkPipe pipe = new BodyChunkPipe(2, 8);
+ pipe.abort();
+
+ Chunk c = pipe.acquireForFill();
+
+ assertThat(c).isNull();
+ }
+
+ @Test
+ void acquireForFill_afterSignalError_returnsNull() throws Exception {
+ BodyChunkPipe pipe = new BodyChunkPipe(2, 8);
+ pipe.signalError(new RuntimeException("boom"));
+
+ Chunk c = pipe.acquireForFill();
+
+ assertThat(c).isNull();
+ }
+
+ @Test
+ void constructor_invalidDepth_throws() {
+ assertThatThrownBy(() -> new BodyChunkPipe(0, 8))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("depth");
+ }
+
+ @Test
+ void constructor_invalidChunkSize_throws() {
+ assertThatThrownBy(() -> new BodyChunkPipe(2, 0))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("chunkSize");
+ }
+
+ /**
+ * Multi-threaded ordering test: producer races to call {@link BodyChunkPipe#signalError(Throwable)}
+ * while a consumer is concurrently spinning on {@link BodyChunkPipe#pollDrain(java.nio.ByteBuffer)}.
+ * The contract is that whenever the consumer observes the ERROR state, the cause must already
+ * be visible (no {@code RuntimeException("Producer failed", null)}). RepeatedTest amplifies the
+ * race window. With the cause published before the CAS, this should pass on every iteration.
+ */
+ @RepeatedTest(50)
+ void signalError_concurrentPollDrain_consumerNeverSeesNullCause() throws Exception {
+ BodyChunkPipe pipe = new BodyChunkPipe(2, 16);
+ IllegalStateException expected = new IllegalStateException("boom");
+ CountDownLatch start = new CountDownLatch(1);
+ AtomicReference consumerError = new AtomicReference<>();
+ AtomicReference nullCauseSighting = new AtomicReference<>();
+
+ Thread consumer = new Thread(() -> {
+ try {
+ start.await();
+ ByteBuffer dst = ByteBuffer.allocate(16);
+ long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
+ while (System.nanoTime() < deadline) {
+ try {
+ int n = pipe.pollDrain(dst);
+ if (n < 0) {
+ return;
+ }
+ dst.clear();
+ } catch (RuntimeException re) {
+ if (re.getCause() == null) {
+ nullCauseSighting.set(re);
+ }
+ return;
+ }
+ }
+ } catch (Throwable t) {
+ consumerError.set(t);
+ }
+ }, "pipe-consumer");
+
+ Thread producer = new Thread(() -> {
+ try {
+ start.await();
+ pipe.signalError(expected);
+ } catch (Throwable t) {
+ consumerError.set(t);
+ }
+ }, "pipe-producer");
+
+ consumer.start();
+ producer.start();
+ start.countDown();
+ producer.join(5_000);
+ consumer.join(5_000);
+
+ assertThat(consumer.isAlive()).isFalse();
+ assertThat(producer.isAlive()).isFalse();
+ assertThat(consumerError.get()).isNull();
+ assertThat(nullCauseSighting.get()).isNull();
+ }
+
+ /**
+ * Multi-threaded test for the recycle/notify path: producer is forced to block on
+ * {@link BodyChunkPipe#acquireForFill()} because all chunks are in flight, then the consumer
+ * drains a chunk which {@code recycle()}s and notifies the producer to wake. This exercises the
+ * full {@code freeLock.notifyAll()} hand-off rather than relying on the defensive 50ms wakeup.
+ */
+ @Test
+ void acquireForFill_blocksUntilConsumerRecycles_thenWakesAndCompletes() throws Exception {
+ BodyChunkPipe pipe = new BodyChunkPipe(1, 8);
+ Chunk first = pipe.acquireForFill();
+ first.len(4);
+ pipe.publish(first);
+
+ CountDownLatch producerEntered = new CountDownLatch(1);
+ AtomicReference reused = new AtomicReference<>();
+ AtomicReference producerError = new AtomicReference<>();
+ Thread producer = new Thread(() -> {
+ try {
+ producerEntered.countDown();
+ Chunk c = pipe.acquireForFill();
+ reused.set(c);
+ } catch (Throwable t) {
+ producerError.set(t);
+ }
+ }, "pipe-producer");
+
+ producer.start();
+ producerEntered.await();
+ // Drain so the chunk is recycled and the producer is woken via notifyAll.
+ long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
+ while (reused.get() == null && System.nanoTime() < deadline) {
+ pipe.pollDrain(ByteBuffer.allocate(8));
+ producer.join(50);
+ }
+
+ assertThat(producerError.get()).isNull();
+ assertThat(reused.get()).isSameAs(first);
+ assertThat(pipe.allocatedForTest()).isEqualTo(1);
+ }
+}
diff --git a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/request/PipeBackedRequestBodyStreamTest.java b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/request/PipeBackedRequestBodyStreamTest.java
new file mode 100644
index 00000000000..1add84bb4cd
--- /dev/null
+++ b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/request/PipeBackedRequestBodyStreamTest.java
@@ -0,0 +1,141 @@
+/*
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ * http://aws.amazon.com/apache2.0
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package software.amazon.awssdk.http.crt.internal.request;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.nio.ByteBuffer;
+import org.junit.jupiter.api.Test;
+
+class PipeBackedRequestBodyStreamTest {
+
+ @Test
+ void sendRequestBody_emptyOpenPipe_returnsFalseAndCopiesNothing() {
+ BodyChunkPipe pipe = new BodyChunkPipe(2, 8);
+ PipeBackedRequestBodyStream stream = new PipeBackedRequestBodyStream(pipe);
+ ByteBuffer dst = ByteBuffer.allocate(8);
+
+ boolean done = stream.sendRequestBody(dst);
+
+ assertThat(done).isFalse();
+ assertThat(dst.position()).isZero();
+ }
+
+ @Test
+ void sendRequestBody_afterEofAndDrained_returnsTrue() throws Exception {
+ BodyChunkPipe pipe = new BodyChunkPipe(2, 8);
+ Chunk c = pipe.acquireForFill();
+ byte[] payload = {1, 2, 3};
+ System.arraycopy(payload, 0, c.data(), 0, payload.length);
+ c.len(payload.length);
+ pipe.publish(c);
+ pipe.signalEof();
+ PipeBackedRequestBodyStream stream = new PipeBackedRequestBodyStream(pipe);
+
+ ByteBuffer first = ByteBuffer.allocate(8);
+ boolean firstDone = stream.sendRequestBody(first);
+ ByteBuffer second = ByteBuffer.allocate(8);
+ boolean secondDone = stream.sendRequestBody(second);
+
+ assertThat(firstDone).isFalse();
+ assertThat(first.position()).isEqualTo(payload.length);
+ assertThat(secondDone).isTrue();
+ assertThat(second.position()).isZero();
+ }
+
+ @Test
+ void sendRequestBody_pipeInError_throwsRuntimeExceptionWithCause() {
+ BodyChunkPipe pipe = new BodyChunkPipe(2, 8);
+ IllegalStateException cause = new IllegalStateException("upstream broke");
+ pipe.signalError(cause);
+ PipeBackedRequestBodyStream stream = new PipeBackedRequestBodyStream(pipe);
+
+ assertThatThrownBy(() -> stream.sendRequestBody(ByteBuffer.allocate(8)))
+ .isInstanceOf(RuntimeException.class)
+ .hasMessageContaining("Producer failed")
+ .hasRootCauseMessage("upstream broke");
+ }
+
+ @Test
+ void sendRequestBody_pipeAborted_throwsRuntimeException() {
+ BodyChunkPipe pipe = new BodyChunkPipe(2, 8);
+ pipe.abort();
+ PipeBackedRequestBodyStream stream = new PipeBackedRequestBodyStream(pipe);
+
+ assertThatThrownBy(() -> stream.sendRequestBody(ByteBuffer.allocate(8)))
+ .isInstanceOf(RuntimeException.class)
+ .hasMessageContaining("aborted");
+ }
+
+ @Test
+ void resetPosition_returnsFalse() {
+ BodyChunkPipe pipe = new BodyChunkPipe(2, 8);
+ PipeBackedRequestBodyStream stream = new PipeBackedRequestBodyStream(pipe);
+
+ assertThat(stream.resetPosition()).isFalse();
+ }
+
+ /**
+ * When CRT's destination buffer is smaller than the chunk size, draining a single chunk
+ * requires multiple {@code sendRequestBody} calls. This exercises {@link BodyChunkPipe#pollDrain}'s
+ * {@code pendingDrain} state being carried across consumer invocations.
+ */
+ @Test
+ void sendRequestBody_destinationSmallerThanChunk_drainsAcrossMultipleCalls() throws Exception {
+ BodyChunkPipe pipe = new BodyChunkPipe(2, 16);
+ Chunk c = pipe.acquireForFill();
+ byte[] payload = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
+ System.arraycopy(payload, 0, c.data(), 0, payload.length);
+ c.len(payload.length);
+ pipe.publish(c);
+ pipe.signalEof();
+ PipeBackedRequestBodyStream stream = new PipeBackedRequestBodyStream(pipe);
+
+ ByteBuffer first = ByteBuffer.allocate(3);
+ ByteBuffer second = ByteBuffer.allocate(3);
+ ByteBuffer third = ByteBuffer.allocate(3);
+ ByteBuffer fourth = ByteBuffer.allocate(3);
+ ByteBuffer fifth = ByteBuffer.allocate(3);
+ boolean firstDone = stream.sendRequestBody(first);
+ boolean secondDone = stream.sendRequestBody(second);
+ boolean thirdDone = stream.sendRequestBody(third);
+ boolean fourthDone = stream.sendRequestBody(fourth);
+ boolean fifthDone = stream.sendRequestBody(fifth);
+
+ assertThat(firstDone).isFalse();
+ assertThat(secondDone).isFalse();
+ assertThat(thirdDone).isFalse();
+ assertThat(fourthDone).isFalse();
+ assertThat(fifthDone).isTrue();
+ assertThat(first.position()).isEqualTo(3);
+ assertThat(second.position()).isEqualTo(3);
+ assertThat(third.position()).isEqualTo(3);
+ assertThat(fourth.position()).isEqualTo(1);
+ assertThat(fifth.position()).isZero();
+
+ byte[] reassembled = new byte[payload.length];
+ first.flip();
+ first.get(reassembled, 0, 3);
+ second.flip();
+ second.get(reassembled, 3, 3);
+ third.flip();
+ third.get(reassembled, 6, 3);
+ fourth.flip();
+ fourth.get(reassembled, 9, 1);
+ assertThat(reassembled).containsExactly(payload);
+ }
+}
diff --git a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/request/SyncRequestBodyPumpTest.java b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/request/SyncRequestBodyPumpTest.java
new file mode 100644
index 00000000000..21d0d4fa1ec
--- /dev/null
+++ b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/request/SyncRequestBodyPumpTest.java
@@ -0,0 +1,239 @@
+/*
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ * http://aws.amazon.com/apache2.0
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package software.amazon.awssdk.http.crt.internal.request;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.LockSupport;
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.http.ContentStreamProvider;
+import software.amazon.awssdk.http.SdkHttpFullResponse;
+
+class SyncRequestBodyPumpTest {
+
+ @Test
+ void pump_happyPath_consumerSeesAllProducerBytes() throws Exception {
+ byte[] payload = new byte[200];
+ for (int i = 0; i < payload.length; i++) {
+ payload[i] = (byte) (i & 0xFF);
+ }
+ BodyChunkPipe pipe = new BodyChunkPipe(2, 32);
+ SyncRequestBodyPump pump = new SyncRequestBodyPump(ContentStreamProvider.fromByteArray(payload), pipe);
+ AtomicReference producerError = new AtomicReference<>();
+ Thread producer = new Thread(() -> {
+ try {
+ pump.pump();
+ } catch (Throwable t) {
+ producerError.set(t);
+ }
+ }, "pump-producer");
+
+ producer.start();
+ byte[] consumed = drainAll(pipe, payload.length);
+ producer.join(5_000);
+
+ assertThat(producerError.get()).isNull();
+ assertThat(producer.isAlive()).isFalse();
+ assertThat(consumed).containsExactly(payload);
+ assertThat(pipe.state()).isEqualTo(BodyChunkPipe.State.EOF);
+ }
+
+ @Test
+ void pump_emptyStream_signalsEofWithoutPublish() throws Exception {
+ BodyChunkPipe pipe = new BodyChunkPipe(2, 16);
+ SyncRequestBodyPump pump = new SyncRequestBodyPump(ContentStreamProvider.fromByteArray(new byte[0]), pipe);
+
+ pump.pump();
+
+ assertThat(pipe.state()).isEqualTo(BodyChunkPipe.State.EOF);
+ assertThat(pipe.pollDrain(ByteBuffer.allocate(8))).isEqualTo(-1);
+ }
+
+ @Test
+ void pump_inputStreamThrowsIoException_pumpSignalsErrorAndRethrows() {
+ IOException ioe = new IOException("disk gone");
+ BodyChunkPipe pipe = new BodyChunkPipe(2, 16);
+ ContentStreamProvider provider = () -> new InputStream() {
+ @Override
+ public int read() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ throw ioe;
+ }
+ };
+ SyncRequestBodyPump pump = new SyncRequestBodyPump(provider, pipe);
+
+ assertThatThrownBy(pump::pump).isSameAs(ioe);
+ assertThat(pipe.state()).isEqualTo(BodyChunkPipe.State.ERROR);
+ assertThatThrownBy(() -> pipe.pollDrain(ByteBuffer.allocate(8)))
+ .hasMessageContaining("Producer failed")
+ .hasRootCauseMessage("disk gone");
+ }
+
+ @Test
+ void pump_abortedWhilePumping_returnsWithoutSignalingEof() throws Exception {
+ // pipe depth 1 + payload larger than chunk forces producer to block on second acquireForFill,
+ // giving the test thread a deterministic point to call abort().
+ BodyChunkPipe pipe = new BodyChunkPipe(1, 8);
+ byte[] payload = new byte[64];
+ SyncRequestBodyPump pump = new SyncRequestBodyPump(ContentStreamProvider.fromByteArray(payload), pipe);
+ AtomicReference producerError = new AtomicReference<>();
+ Thread producer = new Thread(() -> {
+ try {
+ pump.pump();
+ } catch (Throwable t) {
+ producerError.set(t);
+ }
+ }, "pump-producer");
+
+ producer.start();
+ waitUntilStateIsOpenWithChunkInFlight(pipe);
+ pump.abort();
+ producer.join(5_000);
+
+ assertThat(producer.isAlive()).isFalse();
+ assertThat(producerError.get()).isNull();
+ assertThat(pipe.state()).isEqualTo(BodyChunkPipe.State.ABORTED);
+ }
+
+ /**
+ * Regression test for the producer-livelock-on-CRT-failure path.
+ *
+ * If CRT signals request failure (network error, idle/health timeout, etc.) while the
+ * producer is parked in {@link BodyChunkPipe#acquireForFill()}, nothing in the pipe's normal
+ * contract wakes it without a recycle/abort. The fix in {@code AwsCrtHttpClient.CrtHttpRequest.call()}
+ * registers a {@code responseFuture.whenComplete(...)} hook that calls {@code pump.abort()}
+ * when the response future completes exceptionally. This test reproduces that wiring
+ * at the unit level: a pump runs against a pipe with no consumer, the producer parks once the
+ * pipe is full, and we then complete a separate response future exceptionally with the same
+ * hook to verify the producer unblocks and {@code pump()} returns.
+ *
+ *
Without the hook (or equivalent abort path), {@code producer.join(5_000)} would time out
+ * and the test would fail.
+ */
+ @Test
+ void pump_responseFutureFailsExceptionally_whileProducerParked_unblocksProducerViaAbortHook() throws Exception {
+ BodyChunkPipe pipe = new BodyChunkPipe(2, 8);
+ // Payload larger than depth*chunkSize forces the producer to park on acquireForFill once
+ // both chunks are sitting in the ready queue with no consumer draining.
+ byte[] payload = new byte[128];
+ SyncRequestBodyPump pump = new SyncRequestBodyPump(ContentStreamProvider.fromByteArray(payload), pipe);
+ CompletableFuture responseFuture = new CompletableFuture<>();
+ responseFuture.whenComplete((r, t) -> {
+ if (t != null) {
+ pump.abort();
+ }
+ });
+
+ AtomicReference producerError = new AtomicReference<>();
+ Thread producer = new Thread(() -> {
+ try {
+ pump.pump();
+ } catch (Throwable t) {
+ producerError.set(t);
+ }
+ }, "pump-producer");
+
+ producer.start();
+ waitUntilProducerIsParked(pipe);
+ responseFuture.completeExceptionally(new IOException("simulated CRT failure"));
+ producer.join(5_000);
+
+ assertThat(producer.isAlive()).isFalse();
+ assertThat(producerError.get()).isNull();
+ assertThat(pipe.state()).isEqualTo(BodyChunkPipe.State.ABORTED);
+ }
+
+ @Test
+ void abort_propagatesToPipe() {
+ BodyChunkPipe pipe = new BodyChunkPipe(2, 8);
+ SyncRequestBodyPump pump = new SyncRequestBodyPump(
+ ContentStreamProvider.fromByteArray(new byte[0]), pipe);
+
+ pump.abort();
+
+ assertThat(pipe.state()).isEqualTo(BodyChunkPipe.State.ABORTED);
+ }
+
+ private static byte[] drainAll(BodyChunkPipe pipe, int expected) {
+ byte[] out = new byte[expected];
+ int written = 0;
+ long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
+ while (written < expected && System.nanoTime() < deadline) {
+ ByteBuffer scratch = ByteBuffer.allocate(Math.min(64, expected - written));
+ int n = pipe.pollDrain(scratch);
+ if (n < 0) {
+ break;
+ }
+ if (n == 0) {
+ LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(1));
+ continue;
+ }
+ scratch.flip();
+ scratch.get(out, written, n);
+ written += n;
+ }
+ if (written < expected) {
+ throw new AssertionError("Drained only " + written + " of " + expected + " bytes");
+ }
+ return out;
+ }
+
+ private static void waitUntilStateIsOpenWithChunkInFlight(BodyChunkPipe pipe) throws InterruptedException {
+ long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
+ while (System.nanoTime() < deadline) {
+ if (pipe.allocatedForTest() >= 1) {
+ return;
+ }
+ Thread.sleep(1);
+ }
+ throw new AssertionError("Producer did not allocate a chunk within timeout");
+ }
+
+ /**
+ * Wait for the producer to park on {@code acquireForFill}. Detected by the pipe reaching its
+ * configured depth in allocations and then staying there for a couple of consecutive observations
+ * (the producer can't make further progress without a recycle).
+ */
+ private static void waitUntilProducerIsParked(BodyChunkPipe pipe) throws InterruptedException {
+ long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
+ int stableObservations = 0;
+ int lastAllocated = -1;
+ while (System.nanoTime() < deadline) {
+ int allocated = pipe.allocatedForTest();
+ if (allocated == lastAllocated && allocated > 0) {
+ if (++stableObservations >= 3) {
+ return;
+ }
+ } else {
+ stableObservations = 0;
+ lastAllocated = allocated;
+ }
+ Thread.sleep(20);
+ }
+ throw new AssertionError("Producer did not park within timeout");
+ }
+}
diff --git a/services/s3/src/it/java/software/amazon/awssdk/services/s3/crthttpclient/S3WithCrtHttpClientsIntegrationTests.java b/services/s3/src/it/java/software/amazon/awssdk/services/s3/crthttpclient/S3WithCrtHttpClientsIntegrationTests.java
index f1eb1c6e4f2..943635dd41f 100644
--- a/services/s3/src/it/java/software/amazon/awssdk/services/s3/crthttpclient/S3WithCrtHttpClientsIntegrationTests.java
+++ b/services/s3/src/it/java/software/amazon/awssdk/services/s3/crthttpclient/S3WithCrtHttpClientsIntegrationTests.java
@@ -16,12 +16,13 @@
package software.amazon.awssdk.services.s3.crthttpclient;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively;
import static software.amazon.awssdk.testutils.service.S3BucketUtils.temporaryBucketName;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
-import org.assertj.core.api.Assertions;
+import java.time.Duration;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
@@ -32,6 +33,7 @@
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3IntegrationTestBase;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
+import software.amazon.awssdk.services.s3.model.PutObjectResponse;
import software.amazon.awssdk.services.s3.utils.ChecksumUtils;
import software.amazon.awssdk.testutils.RandomTempFile;
import software.amazon.awssdk.utils.Md5Utils;
@@ -80,4 +82,22 @@ void getObject_toFile_objectSentCorrectly() throws Exception {
assertThat(Md5Utils.md5AsBase64(destination.toFile())).isEqualTo(Md5Utils.md5AsBase64(testFile));
}
+
+ @Test
+ void getObject_responseStreamPipedIntoPutObject_completesWithoutDeadlock() throws Exception {
+ String destinationKey = "piped-" + TEST_KEY;
+ try (ResponseInputStream sourceStream =
+ s3WithCrtHttpClient.getObject(r -> r.bucket(TEST_BUCKET).key(TEST_KEY),
+ ResponseTransformer.toInputStream())) {
+ long contentLength = sourceStream.response().contentLength();
+
+ PutObjectResponse putResponse = assertTimeoutPreemptively(
+ Duration.ofSeconds(120),
+ () -> s3WithCrtHttpClient.putObject(
+ r -> r.bucket(TEST_BUCKET).key(destinationKey).contentLength(contentLength),
+ RequestBody.fromInputStream(sourceStream, contentLength)));
+
+ assertThat(putResponse.eTag()).isNotBlank();
+ }
+ }
}
From 3fe06e311ac06b10b9f6ec8c61462babe69165fb Mon Sep 17 00:00:00 2001
From: Zoe Wang <33073555+zoewangg@users.noreply.github.com>
Date: Wed, 10 Jun 2026 11:50:09 -0700
Subject: [PATCH 2/8] Add changelog entry for sync CRT pull-pump fix
---
.../next-release/bugfix-AWSCRTbasedHTTPClient-d1be626.json | 6 ++++++
1 file changed, 6 insertions(+)
create mode 100644 .changes/next-release/bugfix-AWSCRTbasedHTTPClient-d1be626.json
diff --git a/.changes/next-release/bugfix-AWSCRTbasedHTTPClient-d1be626.json b/.changes/next-release/bugfix-AWSCRTbasedHTTPClient-d1be626.json
new file mode 100644
index 00000000000..bf864237642
--- /dev/null
+++ b/.changes/next-release/bugfix-AWSCRTbasedHTTPClient-d1be626.json
@@ -0,0 +1,6 @@
+{
+ "type": "bugfix",
+ "category": "AWS CRT-based HTTP Client",
+ "contributor": "",
+ "description": "Fixed an issue where AwsCrtHttpClient (sync) could deadlock when a request body was sourced from an InputStream that depends on the same CRT event loop, for example when piping a GetObject ResponseInputStream into a PutObject body. The InputStream read now happens on the caller thread instead of the CRT event-loop thread."
+}
From ba13c6884ed15a01f6a8bbe69eb6041bfd21d892 Mon Sep 17 00:00:00 2001
From: Zoe Wang <33073555+zoewangg@users.noreply.github.com>
Date: Wed, 10 Jun 2026 14:07:45 -0700
Subject: [PATCH 3/8] Add timeout and diagnostic logging to
waitForStreamAcquired
---
.../amazon/awssdk/spotbugs-suppressions.xml | 4 ++
.../awssdk/http/crt/AwsCrtHttpClient.java | 59 +++++++++++++++----
.../awssdk/http/crt/AwsCrtHttpClientBase.java | 2 +-
.../http/crt/internal/CrtRequestContext.java | 12 ++++
.../http/crt/internal/CrtRequestExecutor.java | 13 ++++
.../crt/internal/request/BodyChunkPipe.java | 12 +++-
.../internal/request/SyncRequestBodyPump.java | 9 ++-
7 files changed, 95 insertions(+), 16 deletions(-)
diff --git a/build-tools/src/main/resources/software/amazon/awssdk/spotbugs-suppressions.xml b/build-tools/src/main/resources/software/amazon/awssdk/spotbugs-suppressions.xml
index 0f6ab0d22c6..446d747b435 100644
--- a/build-tools/src/main/resources/software/amazon/awssdk/spotbugs-suppressions.xml
+++ b/build-tools/src/main/resources/software/amazon/awssdk/spotbugs-suppressions.xml
@@ -353,6 +353,10 @@
caller (sync) thread, never on the CRT event loop. -->
+
+
+
diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtHttpClient.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtHttpClient.java
index 709cc912152..dbf58ef21f5 100644
--- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtHttpClient.java
+++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtHttpClient.java
@@ -21,6 +21,9 @@
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import software.amazon.awssdk.annotations.SdkPublicApi;
import software.amazon.awssdk.crt.http.HttpException;
@@ -39,6 +42,7 @@
import software.amazon.awssdk.http.crt.internal.request.SyncRequestBodyPump;
import software.amazon.awssdk.utils.AttributeMap;
import software.amazon.awssdk.utils.CompletableFutureUtils;
+import software.amazon.awssdk.utils.Logger;
/**
* An implementation of {@link SdkHttpClient} that uses the AWS Common Runtime (CRT) Http Client to communicate with
@@ -104,11 +108,14 @@ public ExecutableHttpRequest prepareRequest(HttpExecuteRequest request) {
.streamManager(streamManager)
.readBufferSize(this.readBufferSize)
.request(request)
+ .connectionAcquisitionTimeoutMillis(this.connectionAcquisitionTimeout)
.build();
return new CrtHttpRequest(context);
}
private static final class CrtHttpRequest implements ExecutableHttpRequest {
+ private static final Logger LOG = Logger.loggerFor(CrtHttpRequest.class);
+
private final CrtRequestContext context;
private volatile CompletableFuture responseFuture;
private volatile SyncRequestBodyPump pump;
@@ -120,56 +127,68 @@ private CrtHttpRequest(CrtRequestContext context) {
@Override
public HttpExecuteResponse call() throws IOException {
HttpExecuteResponse.Builder builder = HttpExecuteResponse.builder();
+ boolean hasBody = context.sdkRequest().contentStreamProvider().isPresent();
+ LOG.info(() -> "call() entered, hasBody=" + hasBody);
try {
CrtRequestExecutor.Result result = new CrtRequestExecutor().execute(context);
responseFuture = result.responseFuture();
pump = result.pump();
+ LOG.info(() -> "call() executor.execute() returned, streamFuture pending, pump="
+ + (pump != null ? "non-null" : "null"));
- // Abort the pump to unblock a parked producer when CRT signals request failure
- // via responseFuture.
if (pump != null) {
SyncRequestBodyPump pumpRef = pump;
responseFuture.whenComplete((r, t) -> {
if (t != null) {
+ LOG.info(() -> "responseFuture hook: invoking pump.abort() (cause="
+ + t.getClass().getSimpleName() + ")");
pumpRef.abort();
}
});
}
- boolean streamAcquired = waitForStreamAcquired(result.streamFuture());
+ LOG.info(() -> "call() entering waitForStreamAcquired, timeoutMillis="
+ + context.connectionAcquisitionTimeoutMillis());
+ boolean streamAcquired = waitForStreamAcquired(result.streamFuture(),
+ context.connectionAcquisitionTimeoutMillis());
+ LOG.info(() -> "call() waitForStreamAcquired returned " + streamAcquired);
- // No body case (pump == null): CRT writes the request line + headers when the stream
- // is acquired and emits no body callbacks. We only need to join responseFuture below.
if (pump != null) {
if (streamAcquired) {
+ LOG.info(() -> "call() entering pump.pump()");
try {
pump.pump();
+ LOG.info(() -> "call() pump.pump() returned");
} catch (IOException ioe) {
+ LOG.info(() -> "call() pump.pump() threw IOException: " + ioe.getMessage());
responseFuture.completeExceptionally(ioe);
throw ioe;
}
} else {
+ LOG.info(() -> "call() invoking pump.abort() (post-wait, streamAcquired=false)");
pump.abort();
}
}
+ LOG.info(() -> "call() entering joinInterruptibly(responseFuture)");
SdkHttpFullResponse response = CompletableFutureUtils.joinInterruptibly(responseFuture);
+ LOG.info(() -> "call() responseFuture joined: success");
builder.response(response);
builder.responseBody(response.content().orElse(null));
+ LOG.info(() -> "call() exiting normally");
return builder.build();
} catch (CompletionException e) {
Throwable cause = e.getCause();
+ LOG.info(() -> "call() catch CompletionException, cause="
+ + (cause == null ? "" : cause.getClass().getName() + ": " + cause.getMessage()));
- // Complete the future exceptionally to trigger connection cleanup in the response handler.
- // Handles the thread-interrupt case where joinInterruptibly throws due to
- // InterruptedException, ensuring closeConnection() is invoked to prevent leaking the
- // connection from the pool.
if (responseFuture != null) {
responseFuture.completeExceptionally(cause != null ? cause : e);
}
if (pump != null) {
+ LOG.info(() -> "call() catch invoking pump.abort()");
pump.abort();
}
@@ -191,22 +210,38 @@ public HttpExecuteResponse call() throws IOException {
@Override
public void abort() {
+ LOG.info(() -> "abort() called externally");
if (responseFuture != null) {
responseFuture.completeExceptionally(new IOException("Request was cancelled"));
}
if (pump != null) {
+ LOG.info(() -> "abort() invoking pump.abort()");
pump.abort();
}
}
- private boolean waitForStreamAcquired(CompletableFuture streamFuture) {
+ private boolean waitForStreamAcquired(CompletableFuture streamFuture, long timeoutMillis) {
if (streamFuture == null) {
+ LOG.info(() -> "waitForStreamAcquired: streamFuture==null, returning false");
return false;
}
+ LOG.info(() -> "waitForStreamAcquired: starting, timeout=" + timeoutMillis + "ms");
try {
- CompletableFutureUtils.joinInterruptibly(streamFuture);
+ streamFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
+ LOG.info(() -> "waitForStreamAcquired: streamFuture completed normally");
return true;
- } catch (CompletionException e) {
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOG.info(() -> "waitForStreamAcquired: interrupted");
+ return false;
+ } catch (TimeoutException e) {
+ LOG.warn(() -> "waitForStreamAcquired: timed out after " + timeoutMillis
+ + "ms - streamFuture still pending");
+ return false;
+ } catch (ExecutionException e) {
+ Throwable cause = e.getCause();
+ LOG.info(() -> "waitForStreamAcquired: streamFuture completed exceptionally: "
+ + (cause == null ? e.getMessage() : cause.getClass().getName() + ": " + cause.getMessage()));
return false;
}
}
diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtHttpClientBase.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtHttpClientBase.java
index 50689d2236d..afe19159e3c 100644
--- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtHttpClientBase.java
+++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtHttpClientBase.java
@@ -61,6 +61,7 @@ abstract class AwsCrtHttpClientBase implements SdkAutoCloseable {
protected final long readBufferSize;
protected final Protocol protocol;
+ protected final long connectionAcquisitionTimeout;
private final Map connectionPools = new ConcurrentHashMap<>();
private final LinkedList ownedSubResources = new LinkedList<>();
private final ClientBootstrap bootstrap;
@@ -70,7 +71,6 @@ abstract class AwsCrtHttpClientBase implements SdkAutoCloseable {
private final HttpMonitoringOptions monitoringOptions;
private final long maxConnectionIdleInMilliseconds;
private final int maxStreamsPerEndpoint;
- private final long connectionAcquisitionTimeout;
private final TlsContextOptions tlsContextOptions;
private boolean isClosed = false;
diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtRequestContext.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtRequestContext.java
index ba97cc3466e..76771f859d0 100644
--- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtRequestContext.java
+++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtRequestContext.java
@@ -26,12 +26,14 @@ public final class CrtRequestContext {
private final long readBufferSize;
private final HttpStreamManager streamManager;
private final MetricCollector metricCollector;
+ private final long connectionAcquisitionTimeoutMillis;
private CrtRequestContext(Builder builder) {
this.request = builder.request;
this.readBufferSize = builder.readBufferSize;
this.streamManager = builder.streamManager;
this.metricCollector = request.metricCollector().orElse(null);
+ this.connectionAcquisitionTimeoutMillis = builder.connectionAcquisitionTimeoutMillis;
}
public static Builder builder() {
@@ -54,10 +56,15 @@ public MetricCollector metricCollector() {
return metricCollector;
}
+ public long connectionAcquisitionTimeoutMillis() {
+ return connectionAcquisitionTimeoutMillis;
+ }
+
public static final class Builder {
private HttpExecuteRequest request;
private long readBufferSize;
private HttpStreamManager streamManager;
+ private long connectionAcquisitionTimeoutMillis;
private Builder() {
}
@@ -77,6 +84,11 @@ public Builder streamManager(HttpStreamManager streamManager) {
return this;
}
+ public Builder connectionAcquisitionTimeoutMillis(long connectionAcquisitionTimeoutMillis) {
+ this.connectionAcquisitionTimeoutMillis = connectionAcquisitionTimeoutMillis;
+ return this;
+ }
+
public CrtRequestContext build() {
return new CrtRequestContext(this);
}
diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutor.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutor.java
index 03fccd4ccd7..5838a0216b0 100644
--- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutor.java
+++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutor.java
@@ -28,9 +28,11 @@
import software.amazon.awssdk.http.crt.internal.response.InputStreamAdaptingHttpStreamResponseHandler;
import software.amazon.awssdk.metrics.MetricCollector;
import software.amazon.awssdk.metrics.NoOpMetricCollector;
+import software.amazon.awssdk.utils.Logger;
@SdkInternalApi
public final class CrtRequestExecutor {
+ private static final Logger LOG = Logger.loggerFor(CrtRequestExecutor.class);
public Result execute(CrtRequestContext executionContext) {
CompletableFuture requestFuture = new CompletableFuture<>();
@@ -45,29 +47,40 @@ public Result execute(CrtRequestContext executionContext) {
InputStreamAdaptingHttpStreamResponseHandler crtResponseHandler =
new InputStreamAdaptingHttpStreamResponseHandler(requestFuture);
SyncCrtRequest syncCrtRequest = CrtRequestAdapter.toCrtRequest(executionContext);
+ LOG.info(() -> "execute() acquireStream invoked");
CompletableFuture streamFuture =
executionContext.streamManager().acquireStream(syncCrtRequest.httpRequest(), crtResponseHandler);
// Evict the connection from the pool on failure so it is not reused.
requestFuture.whenComplete((r, t) -> {
if (t != null) {
+ LOG.info(() -> "execute() requestFuture exceptional: closeConnection() (cause="
+ + t.getClass().getSimpleName() + ")");
crtResponseHandler.closeConnection();
}
});
long finalAcquireStartTime = acquireStartTime;
streamFuture.whenComplete((streamBase, throwable) -> {
+ if (throwable == null) {
+ LOG.info(() -> "execute() streamFuture.whenComplete fired with success");
+ } else {
+ LOG.info(() -> "execute() streamFuture.whenComplete fired with throwable=" + throwable.getClass().getName()
+ + ": " + throwable.getMessage());
+ }
crtResponseHandler.onAcquireStream(streamBase);
if (shouldPublishMetrics) {
reportMetrics(executionContext.streamManager(), metricCollector, finalAcquireStartTime);
}
if (throwable != null) {
+ LOG.info(() -> "execute() completing requestFuture exceptionally from streamFuture failure");
requestFuture.completeExceptionally(wrapCrtException(throwable));
}
});
return new Result(requestFuture, syncCrtRequest.pump(), streamFuture);
} catch (Throwable t) {
+ LOG.info(() -> "execute() outer catch, completing requestFuture exceptionally: " + t.getClass().getName());
requestFuture.completeExceptionally(t);
return new Result(requestFuture, null, null);
}
diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/BodyChunkPipe.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/BodyChunkPipe.java
index fdaed9a7bc6..7aa6d31b2d6 100644
--- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/BodyChunkPipe.java
+++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/BodyChunkPipe.java
@@ -22,6 +22,7 @@
import java.util.concurrent.atomic.AtomicReference;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.annotations.SdkTestInternalApi;
+import software.amazon.awssdk.utils.Logger;
/**
* Bounded producer/consumer hand-off between the caller thread (producer) and the CRT event-loop thread (consumer).
@@ -42,6 +43,7 @@
*/
@SdkInternalApi
final class BodyChunkPipe {
+ private static final Logger LOG = Logger.loggerFor(BodyChunkPipe.class);
enum State {
OPEN,
@@ -98,6 +100,7 @@ Chunk acquireForFill() throws InterruptedException {
while (true) {
State s = state.get();
if (s == State.ABORTED || s == State.ERROR) {
+ LOG.debug(() -> "acquireForFill returning null, state=" + s);
return null;
}
Chunk c = free.pollFirst();
@@ -135,7 +138,9 @@ void publish(Chunk chunk) throws InterruptedException {
* Producer side: signal end-of-stream. Idempotent.
*/
void signalEof() {
- state.compareAndSet(State.OPEN, State.EOF);
+ if (state.compareAndSet(State.OPEN, State.EOF)) {
+ LOG.debug(() -> "state OPEN -> EOF");
+ }
}
/**
@@ -147,7 +152,9 @@ void signalError(Throwable t) {
// never observes state==ERROR with error==null. The volatile write to `error` is
// harmless if the CAS later loses (idempotent signal).
error = t;
- state.compareAndSet(State.OPEN, State.ERROR);
+ if (state.compareAndSet(State.OPEN, State.ERROR)) {
+ LOG.debug(() -> "state OPEN -> ERROR (" + t.getClass().getSimpleName() + ")");
+ }
freeLock.notifyAll();
}
}
@@ -158,6 +165,7 @@ void signalError(Throwable t) {
void abort() {
synchronized (freeLock) {
if (state.compareAndSet(State.OPEN, State.ABORTED)) {
+ LOG.debug(() -> "state OPEN -> ABORTED");
ready.clear();
}
freeLock.notifyAll();
diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/SyncRequestBodyPump.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/SyncRequestBodyPump.java
index dbacf58eede..015dd0d0ea3 100644
--- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/SyncRequestBodyPump.java
+++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/SyncRequestBodyPump.java
@@ -19,6 +19,7 @@
import java.io.InputStream;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.http.ContentStreamProvider;
+import software.amazon.awssdk.utils.Logger;
/**
* Caller-thread producer that reads from the customer's {@link InputStream} and publishes chunks to a
@@ -27,6 +28,7 @@
*/
@SdkInternalApi
public final class SyncRequestBodyPump {
+ private static final Logger LOG = Logger.loggerFor(SyncRequestBodyPump.class);
private final ContentStreamProvider contentStreamProvider;
private final BodyChunkPipe pipe;
@@ -41,21 +43,24 @@ public final class SyncRequestBodyPump {
* event-loop thread. On EOF signals the pipe normally; on {@link IOException} signals an error and rethrows.
*/
public void pump() throws IOException {
+ LOG.info(() -> "pump() entered");
try (InputStream in = contentStreamProvider.newStream()) {
while (true) {
Chunk chunk = pipe.acquireForFill();
if (chunk == null) {
- // pipe was aborted while we were waiting; stop without signaling EOF.
+ LOG.info(() -> "pump() exiting due to abort (acquireForFill returned null)");
return;
}
int read;
try {
read = in.read(chunk.data(), 0, chunk.data().length);
} catch (IOException ioe) {
+ LOG.info(() -> "pump() exiting due to error: " + ioe.getMessage());
pipe.signalError(ioe);
throw ioe;
}
if (read < 0) {
+ LOG.info(() -> "pump() exiting due to eof");
pipe.signalEof();
return;
}
@@ -65,6 +70,7 @@ public void pump() throws IOException {
}
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
+ LOG.info(() -> "pump() exiting due to interrupt");
pipe.abort();
throw new IOException("Interrupted while writing request body", ie);
}
@@ -74,6 +80,7 @@ public void pump() throws IOException {
* Abort the underlying pipe (e.g., when the caller's {@code call()} is cancelled).
*/
public void abort() {
+ LOG.info(() -> "pump.abort() called");
pipe.abort();
}
}
From f2172b43c8804bd7b3396f4c7a2c1bbd7057dc60 Mon Sep 17 00:00:00 2001
From: Zoe Wang <33073555+zoewangg@users.noreply.github.com>
Date: Thu, 11 Jun 2026 10:06:32 -0700
Subject: [PATCH 4/8] Dump threads on long-running-request test timeout
(diagnostic)
When assertFailsWithinTimeBound times out waiting for the request future,
print a full thread dump (via ThreadMXBean.dumpAllThreads with locked
monitors and synchronizers) to stderr before throwing the AssertionError.
Surefire captures the forked-JVM stderr to surefire-reports, so the dump
survives Catapult's report-export step. Search the per-class -output.txt
for the marker "=== THREAD DUMP ===".
Adds checkstyle-suppressions entries for LongRunningRequestTestSupport so
the java.lang.management imports and System.err usage do not trip
NonJavaBaseModuleCheck and the System-console Regexp rule.
---
.../amazon/awssdk/checkstyle-suppressions.xml | 7 +++++++
.../http/LongRunningRequestTestSupport.java | 16 ++++++++++++++++
2 files changed, 23 insertions(+)
diff --git a/build-tools/src/main/resources/software/amazon/awssdk/checkstyle-suppressions.xml b/build-tools/src/main/resources/software/amazon/awssdk/checkstyle-suppressions.xml
index 4e81373b0be..d09f3d00488 100644
--- a/build-tools/src/main/resources/software/amazon/awssdk/checkstyle-suppressions.xml
+++ b/build-tools/src/main/resources/software/amazon/awssdk/checkstyle-suppressions.xml
@@ -72,4 +72,11 @@
+
+
+
+
diff --git a/test/http-client-tests/src/main/java/software/amazon/awssdk/http/LongRunningRequestTestSupport.java b/test/http-client-tests/src/main/java/software/amazon/awssdk/http/LongRunningRequestTestSupport.java
index 723869ad852..e8ffd490c5c 100644
--- a/test/http-client-tests/src/main/java/software/amazon/awssdk/http/LongRunningRequestTestSupport.java
+++ b/test/http-client-tests/src/main/java/software/amazon/awssdk/http/LongRunningRequestTestSupport.java
@@ -20,6 +20,9 @@
import static com.github.tomakehurst.wiremock.client.WireMock.urlPathEqualTo;
import com.github.tomakehurst.wiremock.junit5.WireMockExtension;
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@@ -79,6 +82,8 @@ public static void assertFailsWithinTimeBound(CompletableFuture> future, Durat
future.get(maxWait.toMillis(), TimeUnit.MILLISECONDS);
throw new AssertionError("Expected request to throw an exception but it completed successfully");
} catch (TimeoutException e) {
+ // Dump threads BEFORE cancelling the future so the snapshot reflects the hang.
+ System.err.println(dumpAllThreads());
future.cancel(true);
throw new AssertionError(
"Expected request to fail within " + maxWait + " but it was still running - client appears to hang",
@@ -90,4 +95,15 @@ public static void assertFailsWithinTimeBound(CompletableFuture> future, Durat
// expected
}
}
+
+ static String dumpAllThreads() {
+ ThreadMXBean tmx = ManagementFactory.getThreadMXBean();
+ ThreadInfo[] infos = tmx.dumpAllThreads(true, true);
+ StringBuilder sb = new StringBuilder("=== THREAD DUMP ===\n");
+ for (ThreadInfo info : infos) {
+ sb.append(info.toString()).append('\n');
+ }
+ sb.append("=== END THREAD DUMP ===\n");
+ return sb.toString();
+ }
}
From 31bb55bdc27c3eacb54f08bfc964683f08503b93 Mon Sep 17 00:00:00 2001
From: Zoe Wang <33073555+zoewangg@users.noreply.github.com>
Date: Thu, 11 Jun 2026 11:45:07 -0700
Subject: [PATCH 5/8] Use logger instead of System.err for thread dump
Routes the hang-time thread dump through Logger.error so it lands in the
SDK's configured appender alongside the lifecycle logs, instead of racing
with CRT native stderr writes (the surefire "Corrupted channel by directly
writing to native stream" warning). Drops the now-unneeded Regexp
suppression for LongRunningRequestTestSupport; keeps the
NonJavaBaseModuleCheck suppression because java.lang.management is still
used by dumpAllThreads().
---
.../software/amazon/awssdk/checkstyle-suppressions.xml | 6 ++----
.../amazon/awssdk/http/LongRunningRequestTestSupport.java | 5 ++++-
2 files changed, 6 insertions(+), 5 deletions(-)
diff --git a/build-tools/src/main/resources/software/amazon/awssdk/checkstyle-suppressions.xml b/build-tools/src/main/resources/software/amazon/awssdk/checkstyle-suppressions.xml
index d09f3d00488..8988a3bcc74 100644
--- a/build-tools/src/main/resources/software/amazon/awssdk/checkstyle-suppressions.xml
+++ b/build-tools/src/main/resources/software/amazon/awssdk/checkstyle-suppressions.xml
@@ -73,10 +73,8 @@
-
+
-
diff --git a/test/http-client-tests/src/main/java/software/amazon/awssdk/http/LongRunningRequestTestSupport.java b/test/http-client-tests/src/main/java/software/amazon/awssdk/http/LongRunningRequestTestSupport.java
index e8ffd490c5c..098ad470b96 100644
--- a/test/http-client-tests/src/main/java/software/amazon/awssdk/http/LongRunningRequestTestSupport.java
+++ b/test/http-client-tests/src/main/java/software/amazon/awssdk/http/LongRunningRequestTestSupport.java
@@ -28,6 +28,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import software.amazon.awssdk.utils.Logger;
/**
* Shared helpers for the long-running request test suites.
@@ -39,6 +40,8 @@ public final class LongRunningRequestTestSupport {
public static final Duration TIME_BOUND_SAFETY_MARGIN = Duration.ofSeconds(10);
public static final Duration HANG_DELAY = Duration.ofMinutes(1);
+ private static final Logger log = Logger.loggerFor(LongRunningRequestTestSupport.class);
+
private LongRunningRequestTestSupport() {
}
@@ -83,7 +86,7 @@ public static void assertFailsWithinTimeBound(CompletableFuture> future, Durat
throw new AssertionError("Expected request to throw an exception but it completed successfully");
} catch (TimeoutException e) {
// Dump threads BEFORE cancelling the future so the snapshot reflects the hang.
- System.err.println(dumpAllThreads());
+ log.error(() -> dumpAllThreads());
future.cancel(true);
throw new AssertionError(
"Expected request to fail within " + maxWait + " but it was still running - client appears to hang",
From 06c4063baf2937fa4fed556ea94f5f3925dafeaf Mon Sep 17 00:00:00 2001
From: Zoe Wang <33073555+zoewangg@users.noreply.github.com>
Date: Thu, 11 Jun 2026 12:42:57 -0700
Subject: [PATCH 6/8] Address PR review comments from alextwoods
- Add Chunk.reset() and use it in BodyChunkPipe.recycle so the position+length
reset is a single named call.
- Swap the order in SyncRequestBodyPump's InterruptedException handler so
cleanup (pipe.abort()) runs before re-asserting the thread interrupt,
matching the conventional "do work, then re-interrupt last" idiom.
---
.../awssdk/http/crt/internal/request/BodyChunkPipe.java | 3 +--
.../amazon/awssdk/http/crt/internal/request/Chunk.java | 5 +++++
.../http/crt/internal/request/SyncRequestBodyPump.java | 2 +-
3 files changed, 7 insertions(+), 3 deletions(-)
diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/BodyChunkPipe.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/BodyChunkPipe.java
index 7aa6d31b2d6..c6848f7872c 100644
--- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/BodyChunkPipe.java
+++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/BodyChunkPipe.java
@@ -239,8 +239,7 @@ int allocatedForTest() {
}
private void recycle(Chunk c) {
- c.pos(0);
- c.len(0);
+ c.reset();
synchronized (freeLock) {
free.push(c);
freeLock.notifyAll();
diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/Chunk.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/Chunk.java
index a7e97b292aa..0797b8cbc67 100644
--- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/Chunk.java
+++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/Chunk.java
@@ -55,4 +55,9 @@ int len() {
void len(int len) {
this.len = len;
}
+
+ void reset() {
+ this.pos = 0;
+ this.len = 0;
+ }
}
diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/SyncRequestBodyPump.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/SyncRequestBodyPump.java
index 015dd0d0ea3..40ebe39902e 100644
--- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/SyncRequestBodyPump.java
+++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/SyncRequestBodyPump.java
@@ -69,9 +69,9 @@ public void pump() throws IOException {
pipe.publish(chunk);
}
} catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
LOG.info(() -> "pump() exiting due to interrupt");
pipe.abort();
+ Thread.currentThread().interrupt();
throw new IOException("Interrupted while writing request body", ie);
}
}
From 3a60e4c503f9346db2341d7908561fdc756cff88 Mon Sep 17 00:00:00 2001
From: Zoe Wang <33073555+zoewangg@users.noreply.github.com>
Date: Thu, 11 Jun 2026 17:34:51 -0700
Subject: [PATCH 7/8] Don't call onAcquireStream when streamFuture failed
Gates crtResponseHandler.onAcquireStream(stream) on throwable == null in
both CrtRequestExecutor and CrtAsyncRequestExecutor. On the failure path,
streamBase is null and onAcquireStream(null) is wrong even though it
happens to be a no-op today: passing null to a method that expects a real
stream is a contract violation. Knowingly diverges from origin/master,
which has the same issue.
---
.../awssdk/http/crt/internal/CrtAsyncRequestExecutor.java | 4 +++-
.../amazon/awssdk/http/crt/internal/CrtRequestExecutor.java | 2 +-
2 files changed, 4 insertions(+), 2 deletions(-)
diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtAsyncRequestExecutor.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtAsyncRequestExecutor.java
index ca9e04fa229..b3b6c232261 100644
--- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtAsyncRequestExecutor.java
+++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtAsyncRequestExecutor.java
@@ -82,7 +82,9 @@ private void doExecute(CrtAsyncRequestContext executionContext,
long finalAcquireStartTime = acquireStartTime;
streamFuture.whenComplete((stream, throwable) -> {
- crtResponseHandler.onAcquireStream(stream);
+ if (throwable == null) {
+ crtResponseHandler.onAcquireStream(stream);
+ }
if (shouldPublishMetrics) {
reportMetrics(executionContext.streamManager(), metricCollector, finalAcquireStartTime);
}
diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutor.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutor.java
index 5838a0216b0..53ee36bb172 100644
--- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutor.java
+++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutor.java
@@ -64,11 +64,11 @@ public Result execute(CrtRequestContext executionContext) {
streamFuture.whenComplete((streamBase, throwable) -> {
if (throwable == null) {
LOG.info(() -> "execute() streamFuture.whenComplete fired with success");
+ crtResponseHandler.onAcquireStream(streamBase);
} else {
LOG.info(() -> "execute() streamFuture.whenComplete fired with throwable=" + throwable.getClass().getName()
+ ": " + throwable.getMessage());
}
- crtResponseHandler.onAcquireStream(streamBase);
if (shouldPublishMetrics) {
reportMetrics(executionContext.streamManager(), metricCollector, finalAcquireStartTime);
}
From 54ceeb2364f98a9b69878440fe933e22fa22edba Mon Sep 17 00:00:00 2001
From: Zoe Wang <33073555+zoewangg@users.noreply.github.com>
Date: Thu, 11 Jun 2026 17:54:05 -0700
Subject: [PATCH 8/8] Add request correlation ID to diagnostic logs
(test-controllable)
---
.../awssdk/http/crt/AwsCrtHttpClient.java | 54 ++++++-----
.../http/crt/internal/CrtRequestContext.java | 12 +++
.../http/crt/internal/CrtRequestExecutor.java | 15 +--
.../crt/internal/request/BodyChunkPipe.java | 14 ++-
.../internal/request/CrtRequestAdapter.java | 5 +-
.../internal/request/SyncRequestBodyPump.java | 18 ++--
.../http/LongRunningRequestTestSupport.java | 96 ++++++++++++++++++-
...HttpClientLongRunningRequestTestSuite.java | 58 ++---------
8 files changed, 176 insertions(+), 96 deletions(-)
diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtHttpClient.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtHttpClient.java
index dbf58ef21f5..1fa66cb940a 100644
--- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtHttpClient.java
+++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtHttpClient.java
@@ -22,6 +22,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
@@ -104,11 +105,16 @@ public ExecutableHttpRequest prepareRequest(HttpExecuteRequest request) {
* request)
*/
HttpStreamManager streamManager = getOrCreateConnectionPool(poolKey(request.httpRequest()));
+ // Tests may override via x-aws-sdk-test-id so surefire output can be grep'd by request.
+ String reqId = request.httpRequest()
+ .firstMatchingHeader("x-aws-sdk-test-id")
+ .orElseGet(() -> String.format("%08x", ThreadLocalRandom.current().nextInt()));
CrtRequestContext context = CrtRequestContext.builder()
.streamManager(streamManager)
.readBufferSize(this.readBufferSize)
.request(request)
.connectionAcquisitionTimeoutMillis(this.connectionAcquisitionTimeout)
+ .reqId(reqId)
.build();
return new CrtHttpRequest(context);
}
@@ -117,70 +123,74 @@ private static final class CrtHttpRequest implements ExecutableHttpRequest {
private static final Logger LOG = Logger.loggerFor(CrtHttpRequest.class);
private final CrtRequestContext context;
+ private final String reqId;
+ private final String tag;
private volatile CompletableFuture responseFuture;
private volatile SyncRequestBodyPump pump;
private CrtHttpRequest(CrtRequestContext context) {
this.context = context;
+ this.reqId = context.reqId();
+ this.tag = "[reqId=" + reqId + "] ";
}
@Override
public HttpExecuteResponse call() throws IOException {
HttpExecuteResponse.Builder builder = HttpExecuteResponse.builder();
boolean hasBody = context.sdkRequest().contentStreamProvider().isPresent();
- LOG.info(() -> "call() entered, hasBody=" + hasBody);
+ LOG.info(() -> tag + "call() entered, hasBody=" + hasBody);
try {
CrtRequestExecutor.Result result = new CrtRequestExecutor().execute(context);
responseFuture = result.responseFuture();
pump = result.pump();
- LOG.info(() -> "call() executor.execute() returned, streamFuture pending, pump="
+ LOG.info(() -> tag + "call() executor.execute() returned, streamFuture pending, pump="
+ (pump != null ? "non-null" : "null"));
if (pump != null) {
SyncRequestBodyPump pumpRef = pump;
responseFuture.whenComplete((r, t) -> {
if (t != null) {
- LOG.info(() -> "responseFuture hook: invoking pump.abort() (cause="
+ LOG.info(() -> tag + "responseFuture hook: invoking pump.abort() (cause="
+ t.getClass().getSimpleName() + ")");
pumpRef.abort();
}
});
}
- LOG.info(() -> "call() entering waitForStreamAcquired, timeoutMillis="
+ LOG.info(() -> tag + "call() entering waitForStreamAcquired, timeoutMillis="
+ context.connectionAcquisitionTimeoutMillis());
boolean streamAcquired = waitForStreamAcquired(result.streamFuture(),
context.connectionAcquisitionTimeoutMillis());
- LOG.info(() -> "call() waitForStreamAcquired returned " + streamAcquired);
+ LOG.info(() -> tag + "call() waitForStreamAcquired returned " + streamAcquired);
if (pump != null) {
if (streamAcquired) {
- LOG.info(() -> "call() entering pump.pump()");
+ LOG.info(() -> tag + "call() entering pump.pump()");
try {
pump.pump();
- LOG.info(() -> "call() pump.pump() returned");
+ LOG.info(() -> tag + "call() pump.pump() returned");
} catch (IOException ioe) {
- LOG.info(() -> "call() pump.pump() threw IOException: " + ioe.getMessage());
+ LOG.info(() -> tag + "call() pump.pump() threw IOException: " + ioe.getMessage());
responseFuture.completeExceptionally(ioe);
throw ioe;
}
} else {
- LOG.info(() -> "call() invoking pump.abort() (post-wait, streamAcquired=false)");
+ LOG.info(() -> tag + "call() invoking pump.abort() (post-wait, streamAcquired=false)");
pump.abort();
}
}
- LOG.info(() -> "call() entering joinInterruptibly(responseFuture)");
+ LOG.info(() -> tag + "call() entering joinInterruptibly(responseFuture)");
SdkHttpFullResponse response = CompletableFutureUtils.joinInterruptibly(responseFuture);
- LOG.info(() -> "call() responseFuture joined: success");
+ LOG.info(() -> tag + "call() responseFuture joined: success");
builder.response(response);
builder.responseBody(response.content().orElse(null));
- LOG.info(() -> "call() exiting normally");
+ LOG.info(() -> tag + "call() exiting normally");
return builder.build();
} catch (CompletionException e) {
Throwable cause = e.getCause();
- LOG.info(() -> "call() catch CompletionException, cause="
+ LOG.info(() -> tag + "call() catch CompletionException, cause="
+ (cause == null ? "" : cause.getClass().getName() + ": " + cause.getMessage()));
if (responseFuture != null) {
@@ -188,7 +198,7 @@ public HttpExecuteResponse call() throws IOException {
}
if (pump != null) {
- LOG.info(() -> "call() catch invoking pump.abort()");
+ LOG.info(() -> tag + "call() catch invoking pump.abort()");
pump.abort();
}
@@ -210,37 +220,37 @@ public HttpExecuteResponse call() throws IOException {
@Override
public void abort() {
- LOG.info(() -> "abort() called externally");
+ LOG.info(() -> tag + "abort() called externally");
if (responseFuture != null) {
responseFuture.completeExceptionally(new IOException("Request was cancelled"));
}
if (pump != null) {
- LOG.info(() -> "abort() invoking pump.abort()");
+ LOG.info(() -> tag + "abort() invoking pump.abort()");
pump.abort();
}
}
private boolean waitForStreamAcquired(CompletableFuture streamFuture, long timeoutMillis) {
if (streamFuture == null) {
- LOG.info(() -> "waitForStreamAcquired: streamFuture==null, returning false");
+ LOG.info(() -> tag + "waitForStreamAcquired: streamFuture==null, returning false");
return false;
}
- LOG.info(() -> "waitForStreamAcquired: starting, timeout=" + timeoutMillis + "ms");
+ LOG.info(() -> tag + "waitForStreamAcquired: starting, timeout=" + timeoutMillis + "ms");
try {
streamFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
- LOG.info(() -> "waitForStreamAcquired: streamFuture completed normally");
+ LOG.info(() -> tag + "waitForStreamAcquired: streamFuture completed normally");
return true;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
- LOG.info(() -> "waitForStreamAcquired: interrupted");
+ LOG.info(() -> tag + "waitForStreamAcquired: interrupted");
return false;
} catch (TimeoutException e) {
- LOG.warn(() -> "waitForStreamAcquired: timed out after " + timeoutMillis
+ LOG.warn(() -> tag + "waitForStreamAcquired: timed out after " + timeoutMillis
+ "ms - streamFuture still pending");
return false;
} catch (ExecutionException e) {
Throwable cause = e.getCause();
- LOG.info(() -> "waitForStreamAcquired: streamFuture completed exceptionally: "
+ LOG.info(() -> tag + "waitForStreamAcquired: streamFuture completed exceptionally: "
+ (cause == null ? e.getMessage() : cause.getClass().getName() + ": " + cause.getMessage()));
return false;
}
diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtRequestContext.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtRequestContext.java
index 76771f859d0..ced8fd55508 100644
--- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtRequestContext.java
+++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtRequestContext.java
@@ -27,6 +27,7 @@ public final class CrtRequestContext {
private final HttpStreamManager streamManager;
private final MetricCollector metricCollector;
private final long connectionAcquisitionTimeoutMillis;
+ private final String reqId;
private CrtRequestContext(Builder builder) {
this.request = builder.request;
@@ -34,6 +35,7 @@ private CrtRequestContext(Builder builder) {
this.streamManager = builder.streamManager;
this.metricCollector = request.metricCollector().orElse(null);
this.connectionAcquisitionTimeoutMillis = builder.connectionAcquisitionTimeoutMillis;
+ this.reqId = builder.reqId;
}
public static Builder builder() {
@@ -60,11 +62,16 @@ public long connectionAcquisitionTimeoutMillis() {
return connectionAcquisitionTimeoutMillis;
}
+ public String reqId() {
+ return reqId;
+ }
+
public static final class Builder {
private HttpExecuteRequest request;
private long readBufferSize;
private HttpStreamManager streamManager;
private long connectionAcquisitionTimeoutMillis;
+ private String reqId;
private Builder() {
}
@@ -89,6 +96,11 @@ public Builder connectionAcquisitionTimeoutMillis(long connectionAcquisitionTime
return this;
}
+ public Builder reqId(String reqId) {
+ this.reqId = reqId;
+ return this;
+ }
+
public CrtRequestContext build() {
return new CrtRequestContext(this);
}
diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutor.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutor.java
index 53ee36bb172..9c151477400 100644
--- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutor.java
+++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutor.java
@@ -38,6 +38,7 @@ public Result execute(CrtRequestContext executionContext) {
CompletableFuture requestFuture = new CompletableFuture<>();
MetricCollector metricCollector = executionContext.metricCollector();
boolean shouldPublishMetrics = metricCollector != null && !(metricCollector instanceof NoOpMetricCollector);
+ String tag = "[reqId=" + executionContext.reqId() + "] ";
// get acquireStartTime as early as possible for the concurrency timer, but only when metrics are
// enabled since clock_gettime() is a full sys call barrier (multiple mutexes and a hw interrupt).
@@ -47,14 +48,14 @@ public Result execute(CrtRequestContext executionContext) {
InputStreamAdaptingHttpStreamResponseHandler crtResponseHandler =
new InputStreamAdaptingHttpStreamResponseHandler(requestFuture);
SyncCrtRequest syncCrtRequest = CrtRequestAdapter.toCrtRequest(executionContext);
- LOG.info(() -> "execute() acquireStream invoked");
+ LOG.info(() -> tag + "execute() acquireStream invoked");
CompletableFuture streamFuture =
executionContext.streamManager().acquireStream(syncCrtRequest.httpRequest(), crtResponseHandler);
// Evict the connection from the pool on failure so it is not reused.
requestFuture.whenComplete((r, t) -> {
if (t != null) {
- LOG.info(() -> "execute() requestFuture exceptional: closeConnection() (cause="
+ LOG.info(() -> tag + "execute() requestFuture exceptional: closeConnection() (cause="
+ t.getClass().getSimpleName() + ")");
crtResponseHandler.closeConnection();
}
@@ -63,24 +64,24 @@ public Result execute(CrtRequestContext executionContext) {
long finalAcquireStartTime = acquireStartTime;
streamFuture.whenComplete((streamBase, throwable) -> {
if (throwable == null) {
- LOG.info(() -> "execute() streamFuture.whenComplete fired with success");
+ LOG.info(() -> tag + "execute() streamFuture.whenComplete fired with success");
crtResponseHandler.onAcquireStream(streamBase);
} else {
- LOG.info(() -> "execute() streamFuture.whenComplete fired with throwable=" + throwable.getClass().getName()
- + ": " + throwable.getMessage());
+ LOG.info(() -> tag + "execute() streamFuture.whenComplete fired with throwable="
+ + throwable.getClass().getName() + ": " + throwable.getMessage());
}
if (shouldPublishMetrics) {
reportMetrics(executionContext.streamManager(), metricCollector, finalAcquireStartTime);
}
if (throwable != null) {
- LOG.info(() -> "execute() completing requestFuture exceptionally from streamFuture failure");
+ LOG.info(() -> tag + "execute() completing requestFuture exceptionally from streamFuture failure");
requestFuture.completeExceptionally(wrapCrtException(throwable));
}
});
return new Result(requestFuture, syncCrtRequest.pump(), streamFuture);
} catch (Throwable t) {
- LOG.info(() -> "execute() outer catch, completing requestFuture exceptionally: " + t.getClass().getName());
+ LOG.info(() -> tag + "execute() outer catch, completing requestFuture exceptionally: " + t.getClass().getName());
requestFuture.completeExceptionally(t);
return new Result(requestFuture, null, null);
}
diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/BodyChunkPipe.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/BodyChunkPipe.java
index c6848f7872c..986eff9fe1a 100644
--- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/BodyChunkPipe.java
+++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/BodyChunkPipe.java
@@ -64,6 +64,7 @@ enum State {
private final ArrayBlockingQueue ready;
private final Deque free;
private final AtomicReference state = new AtomicReference<>(State.OPEN);
+ private final String tag;
/**
* Guards the free deque, allocated counter, and producer wait/notify protocol. Kept private
* so external code cannot synchronize on the pipe instance and stall the producer.
@@ -75,6 +76,10 @@ enum State {
private Chunk pendingDrain;
BodyChunkPipe(int depth, int chunkSize) {
+ this(depth, chunkSize, "-");
+ }
+
+ BodyChunkPipe(int depth, int chunkSize, String reqId) {
if (depth < 1) {
throw new IllegalArgumentException("depth must be >= 1");
}
@@ -85,6 +90,7 @@ enum State {
this.chunkSize = chunkSize;
this.ready = new ArrayBlockingQueue<>(depth);
this.free = new ArrayDeque<>(depth);
+ this.tag = "[reqId=" + reqId + "] ";
}
/**
@@ -100,7 +106,7 @@ Chunk acquireForFill() throws InterruptedException {
while (true) {
State s = state.get();
if (s == State.ABORTED || s == State.ERROR) {
- LOG.debug(() -> "acquireForFill returning null, state=" + s);
+ LOG.debug(() -> tag + "acquireForFill returning null, state=" + s);
return null;
}
Chunk c = free.pollFirst();
@@ -139,7 +145,7 @@ void publish(Chunk chunk) throws InterruptedException {
*/
void signalEof() {
if (state.compareAndSet(State.OPEN, State.EOF)) {
- LOG.debug(() -> "state OPEN -> EOF");
+ LOG.debug(() -> tag + "state OPEN -> EOF");
}
}
@@ -153,7 +159,7 @@ void signalError(Throwable t) {
// harmless if the CAS later loses (idempotent signal).
error = t;
if (state.compareAndSet(State.OPEN, State.ERROR)) {
- LOG.debug(() -> "state OPEN -> ERROR (" + t.getClass().getSimpleName() + ")");
+ LOG.debug(() -> tag + "state OPEN -> ERROR (" + t.getClass().getSimpleName() + ")");
}
freeLock.notifyAll();
}
@@ -165,7 +171,7 @@ void signalError(Throwable t) {
void abort() {
synchronized (freeLock) {
if (state.compareAndSet(State.OPEN, State.ABORTED)) {
- LOG.debug(() -> "state OPEN -> ABORTED");
+ LOG.debug(() -> tag + "state OPEN -> ABORTED");
ready.clear();
}
freeLock.notifyAll();
diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestAdapter.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestAdapter.java
index 4a3991c5d44..734c5a35222 100644
--- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestAdapter.java
+++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestAdapter.java
@@ -100,9 +100,10 @@ public static SyncCrtRequest toCrtRequest(CrtRequestContext request) {
return new SyncCrtRequest(new HttpRequest(method, finalEncodedPath, crtHeaderArray, null), null);
}
- BodyChunkPipe pipe = new BodyChunkPipe(PIPE_DEPTH, CHUNK_SIZE);
+ String reqId = request.reqId();
+ BodyChunkPipe pipe = new BodyChunkPipe(PIPE_DEPTH, CHUNK_SIZE, reqId);
PipeBackedRequestBodyStream bodyStream = new PipeBackedRequestBodyStream(pipe);
- SyncRequestBodyPump pump = new SyncRequestBodyPump(providerOpt.get(), pipe);
+ SyncRequestBodyPump pump = new SyncRequestBodyPump(providerOpt.get(), pipe, reqId);
HttpRequest crtRequest = new HttpRequest(method, finalEncodedPath, crtHeaderArray, bodyStream);
return new SyncCrtRequest(crtRequest, pump);
}
diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/SyncRequestBodyPump.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/SyncRequestBodyPump.java
index 40ebe39902e..254b4127ea3 100644
--- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/SyncRequestBodyPump.java
+++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/SyncRequestBodyPump.java
@@ -32,10 +32,16 @@ public final class SyncRequestBodyPump {
private final ContentStreamProvider contentStreamProvider;
private final BodyChunkPipe pipe;
+ private final String tag;
SyncRequestBodyPump(ContentStreamProvider contentStreamProvider, BodyChunkPipe pipe) {
+ this(contentStreamProvider, pipe, "-");
+ }
+
+ SyncRequestBodyPump(ContentStreamProvider contentStreamProvider, BodyChunkPipe pipe, String reqId) {
this.contentStreamProvider = contentStreamProvider;
this.pipe = pipe;
+ this.tag = "[reqId=" + reqId + "] ";
}
/**
@@ -43,24 +49,24 @@ public final class SyncRequestBodyPump {
* event-loop thread. On EOF signals the pipe normally; on {@link IOException} signals an error and rethrows.
*/
public void pump() throws IOException {
- LOG.info(() -> "pump() entered");
+ LOG.info(() -> tag + "pump() entered");
try (InputStream in = contentStreamProvider.newStream()) {
while (true) {
Chunk chunk = pipe.acquireForFill();
if (chunk == null) {
- LOG.info(() -> "pump() exiting due to abort (acquireForFill returned null)");
+ LOG.info(() -> tag + "pump() exiting due to abort (acquireForFill returned null)");
return;
}
int read;
try {
read = in.read(chunk.data(), 0, chunk.data().length);
} catch (IOException ioe) {
- LOG.info(() -> "pump() exiting due to error: " + ioe.getMessage());
+ LOG.info(() -> tag + "pump() exiting due to error: " + ioe.getMessage());
pipe.signalError(ioe);
throw ioe;
}
if (read < 0) {
- LOG.info(() -> "pump() exiting due to eof");
+ LOG.info(() -> tag + "pump() exiting due to eof");
pipe.signalEof();
return;
}
@@ -69,7 +75,7 @@ public void pump() throws IOException {
pipe.publish(chunk);
}
} catch (InterruptedException ie) {
- LOG.info(() -> "pump() exiting due to interrupt");
+ LOG.info(() -> tag + "pump() exiting due to interrupt");
pipe.abort();
Thread.currentThread().interrupt();
throw new IOException("Interrupted while writing request body", ie);
@@ -80,7 +86,7 @@ public void pump() throws IOException {
* Abort the underlying pipe (e.g., when the caller's {@code call()} is cancelled).
*/
public void abort() {
- LOG.info(() -> "pump.abort() called");
+ LOG.info(() -> tag + "pump.abort() called");
pipe.abort();
}
}
diff --git a/test/http-client-tests/src/main/java/software/amazon/awssdk/http/LongRunningRequestTestSupport.java b/test/http-client-tests/src/main/java/software/amazon/awssdk/http/LongRunningRequestTestSupport.java
index 098ad470b96..21d04a5c944 100644
--- a/test/http-client-tests/src/main/java/software/amazon/awssdk/http/LongRunningRequestTestSupport.java
+++ b/test/http-client-tests/src/main/java/software/amazon/awssdk/http/LongRunningRequestTestSupport.java
@@ -20,12 +20,18 @@
import static com.github.tomakehurst.wiremock.client.WireMock.urlPathEqualTo;
import com.github.tomakehurst.wiremock.junit5.WireMockExtension;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.UncheckedIOException;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import software.amazon.awssdk.utils.Logger;
@@ -78,27 +84,109 @@ public static void stubHanging(WireMockExtension mockServer) {
.withFixedDelay((int) HANG_DELAY.toMillis())));
}
+ /**
+ * Async-executes a POST against {@code mockServer} on a worker thread. A per-call random
+ * {@code testReqId} is logged BEFORE the request is dispatched and propagated to the SDK via the
+ * {@code x-aws-sdk-test-id} header so the SDK can prefix its lifecycle logs with the same id.
+ */
+ public static TestRequestExecution executeAsync(SdkHttpClient client, WireMockExtension mockServer) {
+ String testReqId = "test-" + String.format("%08x", ThreadLocalRandom.current().nextInt());
+ log.info(() -> "TEST REQUEST ID: " + testReqId + " (dispatching)");
+ CompletableFuture future = CompletableFuture.supplyAsync(() -> {
+ executeRequest(client, mockServer, testReqId);
+ return null;
+ });
+ return new TestRequestExecution(testReqId, future);
+ }
+
+ private static void executeRequest(SdkHttpClient client, WireMockExtension mockServer, String testReqId) {
+ URI uri = URI.create("http://localhost:" + mockServer.getPort());
+ SdkHttpFullRequest request = SdkHttpFullRequest.builder()
+ .uri(uri)
+ .method(SdkHttpMethod.POST)
+ .putHeader("Host", uri.getHost())
+ .putHeader("Content-Length", "4")
+ .putHeader("x-aws-sdk-test-id", testReqId)
+ .contentStreamProvider(() -> new ByteArrayInputStream(
+ "Body".getBytes(StandardCharsets.UTF_8)))
+ .build();
+ try {
+ HttpExecuteResponse response = client.prepareRequest(HttpExecuteRequest.builder()
+ .request(request)
+ .contentStreamProvider(
+ request.contentStreamProvider()
+ .orElse(null))
+ .build())
+ .call();
+ response.responseBody().ifPresent(body -> {
+ try {
+ while (body.read() != -1) {
+ // drain body so mid-body timeouts surface
+ }
+ body.close();
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ });
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ public static void assertFailsWithinTimeBound(TestRequestExecution execution, Duration expectedTimeout) {
+ assertFailsWithinTimeBound(execution.future(), execution.testReqId(), expectedTimeout);
+ }
+
public static void assertFailsWithinTimeBound(CompletableFuture> future, Duration expectedTimeout) {
+ assertFailsWithinTimeBound(future, "(unknown)", expectedTimeout);
+ }
+
+ private static void assertFailsWithinTimeBound(CompletableFuture> future, String testReqId, Duration expectedTimeout) {
Duration maxWait = expectedTimeout.plus(TIME_BOUND_SAFETY_MARGIN);
try {
future.get(maxWait.toMillis(), TimeUnit.MILLISECONDS);
- throw new AssertionError("Expected request to throw an exception but it completed successfully");
+ throw new AssertionError("Expected request " + testReqId
+ + " to throw an exception but it completed successfully");
} catch (TimeoutException e) {
- // Dump threads BEFORE cancelling the future so the snapshot reflects the hang.
+ // Bookend the thread dump with the test reqId so surefire output can be grep'd by request.
+ log.error(() -> "TEST REQUEST ID: " + testReqId + " (timed out, dumping threads)");
log.error(() -> dumpAllThreads());
future.cancel(true);
throw new AssertionError(
- "Expected request to fail within " + maxWait + " but it was still running - client appears to hang",
+ "Expected request " + testReqId + " to fail within " + maxWait
+ + " but it was still running - client appears to hang",
e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
- throw new AssertionError("Unexpected interruption while waiting for request to fail", e);
+ throw new AssertionError("Unexpected interruption while waiting for request " + testReqId + " to fail", e);
} catch (ExecutionException e) {
// expected
}
}
+ /**
+ * Bundles a worker future with the per-call test reqId so the assertion helper can reference it in
+ * failure messages.
+ */
+ public static final class TestRequestExecution {
+ private final String testReqId;
+ private final CompletableFuture future;
+
+ TestRequestExecution(String testReqId, CompletableFuture future) {
+ this.testReqId = testReqId;
+ this.future = future;
+ }
+
+ public String testReqId() {
+ return testReqId;
+ }
+
+ public CompletableFuture future() {
+ return future;
+ }
+ }
+
static String dumpAllThreads() {
ThreadMXBean tmx = ManagementFactory.getThreadMXBean();
ThreadInfo[] infos = tmx.dumpAllThreads(true, true);
diff --git a/test/http-client-tests/src/main/java/software/amazon/awssdk/http/SdkHttpClientLongRunningRequestTestSuite.java b/test/http-client-tests/src/main/java/software/amazon/awssdk/http/SdkHttpClientLongRunningRequestTestSuite.java
index 0e52eb19eb4..4403db37a42 100644
--- a/test/http-client-tests/src/main/java/software/amazon/awssdk/http/SdkHttpClientLongRunningRequestTestSuite.java
+++ b/test/http-client-tests/src/main/java/software/amazon/awssdk/http/SdkHttpClientLongRunningRequestTestSuite.java
@@ -19,19 +19,15 @@
import static software.amazon.awssdk.http.LongRunningRequestTestSupport.CONFIGURED_TIMEOUT;
import static software.amazon.awssdk.http.LongRunningRequestTestSupport.HANG_DELAY;
import static software.amazon.awssdk.http.LongRunningRequestTestSupport.assertFailsWithinTimeBound;
+import static software.amazon.awssdk.http.LongRunningRequestTestSupport.executeAsync;
import static software.amazon.awssdk.http.LongRunningRequestTestSupport.stubHanging;
import static software.amazon.awssdk.http.LongRunningRequestTestSupport.stubLongPolling;
import static software.amazon.awssdk.http.LongRunningRequestTestSupport.stubStreamingWithPauses;
import com.github.tomakehurst.wiremock.junit5.WireMockExtension;
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.UncheckedIOException;
-import java.net.URI;
-import java.nio.charset.StandardCharsets;
-import java.util.concurrent.CompletableFuture;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
+import software.amazon.awssdk.http.LongRunningRequestTestSupport.TestRequestExecution;
import software.amazon.awssdk.utils.AttributeMap;
/**
@@ -56,7 +52,7 @@ public void executeWhenReadTimeoutAndServerDelaysResponseFailsWithinTimeoutBound
CONFIGURED_TIMEOUT)
.build());
try {
- assertFailsWithinTimeBound(executeAsync(client), CONFIGURED_TIMEOUT);
+ assertFailsWithinTimeBound(executeAsync(client, mockServer), CONFIGURED_TIMEOUT);
} finally {
client.close();
}
@@ -71,7 +67,7 @@ public void executeWhenReadTimeoutAndStreamingResponsePausesFailsWithinTimeoutBo
CONFIGURED_TIMEOUT)
.build());
try {
- assertFailsWithinTimeBound(executeAsync(client), CONFIGURED_TIMEOUT);
+ assertFailsWithinTimeBound(executeAsync(client, mockServer), CONFIGURED_TIMEOUT);
} finally {
client.close();
}
@@ -89,54 +85,14 @@ public void executeWhenConnectionAcquireTimeoutAndPoolExhaustedFailsWithinTimeou
CONFIGURED_TIMEOUT)
.build());
try {
- CompletableFuture> firstRequest = executeAsync(client);
+ TestRequestExecution firstRequest = executeAsync(client, mockServer);
Thread.sleep(500);
- assertFailsWithinTimeBound(executeAsync(client), CONFIGURED_TIMEOUT);
+ assertFailsWithinTimeBound(executeAsync(client, mockServer), CONFIGURED_TIMEOUT);
- firstRequest.cancel(true);
+ firstRequest.future().cancel(true);
} finally {
client.close();
}
}
-
- private CompletableFuture executeAsync(SdkHttpClient client) {
- return CompletableFuture.supplyAsync(() -> {
- executeRequest(client);
- return null;
- });
- }
-
- private void executeRequest(SdkHttpClient client) {
- URI uri = URI.create("http://localhost:" + mockServer.getPort());
- SdkHttpFullRequest request = SdkHttpFullRequest.builder()
- .uri(uri)
- .method(SdkHttpMethod.POST)
- .putHeader("Host", uri.getHost())
- .putHeader("Content-Length", "4")
- .contentStreamProvider(() -> new ByteArrayInputStream(
- "Body".getBytes(StandardCharsets.UTF_8)))
- .build();
- try {
- HttpExecuteResponse response = client.prepareRequest(HttpExecuteRequest.builder()
- .request(request)
- .contentStreamProvider(
- request.contentStreamProvider()
- .orElse(null))
- .build())
- .call();
- response.responseBody().ifPresent(body -> {
- try {
- while (body.read() != -1) {
- // drain body so mid-body timeouts surface
- }
- body.close();
- } catch (IOException e) {
- throw new UncheckedIOException(e);
- }
- });
- } catch (IOException e) {
- throw new UncheckedIOException(e);
- }
- }
}