diff --git a/.changes/next-release/bugfix-AWSCRTbasedHTTPClient-d1be626.json b/.changes/next-release/bugfix-AWSCRTbasedHTTPClient-d1be626.json new file mode 100644 index 00000000000..bf864237642 --- /dev/null +++ b/.changes/next-release/bugfix-AWSCRTbasedHTTPClient-d1be626.json @@ -0,0 +1,6 @@ +{ + "type": "bugfix", + "category": "AWS CRT-based HTTP Client", + "contributor": "", + "description": "Fixed an issue where AwsCrtHttpClient (sync) could deadlock when a request body was sourced from an InputStream that depends on the same CRT event loop, for example when piping a GetObject ResponseInputStream into a PutObject body. The InputStream read now happens on the caller thread instead of the CRT event-loop thread." +} diff --git a/build-tools/src/main/resources/software/amazon/awssdk/checkstyle-suppressions.xml b/build-tools/src/main/resources/software/amazon/awssdk/checkstyle-suppressions.xml index 4e81373b0be..8988a3bcc74 100644 --- a/build-tools/src/main/resources/software/amazon/awssdk/checkstyle-suppressions.xml +++ b/build-tools/src/main/resources/software/amazon/awssdk/checkstyle-suppressions.xml @@ -72,4 +72,9 @@ + + + diff --git a/build-tools/src/main/resources/software/amazon/awssdk/spotbugs-suppressions.xml b/build-tools/src/main/resources/software/amazon/awssdk/spotbugs-suppressions.xml index 05606dc6d57..446d747b435 100644 --- a/build-tools/src/main/resources/software/amazon/awssdk/spotbugs-suppressions.xml +++ b/build-tools/src/main/resources/software/amazon/awssdk/spotbugs-suppressions.xml @@ -348,6 +348,15 @@ + + + + + + diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtHttpClient.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtHttpClient.java index 9c6d769e48f..1fa66cb940a 100644 --- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtHttpClient.java +++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtHttpClient.java @@ -21,9 +21,14 @@ import java.time.Duration; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.function.Consumer; import software.amazon.awssdk.annotations.SdkPublicApi; import software.amazon.awssdk.crt.http.HttpException; +import software.amazon.awssdk.crt.http.HttpStreamBase; import software.amazon.awssdk.crt.http.HttpStreamManager; import software.amazon.awssdk.http.ExecutableHttpRequest; import software.amazon.awssdk.http.HttpExecuteRequest; @@ -35,8 +40,10 @@ import software.amazon.awssdk.http.crt.internal.AwsCrtClientBuilderBase; import software.amazon.awssdk.http.crt.internal.CrtRequestContext; import software.amazon.awssdk.http.crt.internal.CrtRequestExecutor; +import software.amazon.awssdk.http.crt.internal.request.SyncRequestBodyPump; import software.amazon.awssdk.utils.AttributeMap; import software.amazon.awssdk.utils.CompletableFutureUtils; +import software.amazon.awssdk.utils.Logger; /** * An implementation of {@link SdkHttpClient} that uses the AWS Common Runtime (CRT) Http Client to communicate with @@ -98,43 +105,103 @@ public ExecutableHttpRequest prepareRequest(HttpExecuteRequest request) { * request) */ HttpStreamManager streamManager = getOrCreateConnectionPool(poolKey(request.httpRequest())); + // Tests may override via x-aws-sdk-test-id so surefire output can be grep'd by request. + String reqId = request.httpRequest() + .firstMatchingHeader("x-aws-sdk-test-id") + .orElseGet(() -> String.format("%08x", ThreadLocalRandom.current().nextInt())); CrtRequestContext context = CrtRequestContext.builder() .streamManager(streamManager) .readBufferSize(this.readBufferSize) .request(request) + .connectionAcquisitionTimeoutMillis(this.connectionAcquisitionTimeout) + .reqId(reqId) .build(); return new CrtHttpRequest(context); } private static final class CrtHttpRequest implements ExecutableHttpRequest { + private static final Logger LOG = Logger.loggerFor(CrtHttpRequest.class); + private final CrtRequestContext context; + private final String reqId; + private final String tag; private volatile CompletableFuture responseFuture; + private volatile SyncRequestBodyPump pump; private CrtHttpRequest(CrtRequestContext context) { this.context = context; + this.reqId = context.reqId(); + this.tag = "[reqId=" + reqId + "] "; } @Override public HttpExecuteResponse call() throws IOException { HttpExecuteResponse.Builder builder = HttpExecuteResponse.builder(); + boolean hasBody = context.sdkRequest().contentStreamProvider().isPresent(); + LOG.info(() -> tag + "call() entered, hasBody=" + hasBody); try { - responseFuture = new CrtRequestExecutor().execute(context); + CrtRequestExecutor.Result result = new CrtRequestExecutor().execute(context); + responseFuture = result.responseFuture(); + pump = result.pump(); + LOG.info(() -> tag + "call() executor.execute() returned, streamFuture pending, pump=" + + (pump != null ? "non-null" : "null")); + + if (pump != null) { + SyncRequestBodyPump pumpRef = pump; + responseFuture.whenComplete((r, t) -> { + if (t != null) { + LOG.info(() -> tag + "responseFuture hook: invoking pump.abort() (cause=" + + t.getClass().getSimpleName() + ")"); + pumpRef.abort(); + } + }); + } + + LOG.info(() -> tag + "call() entering waitForStreamAcquired, timeoutMillis=" + + context.connectionAcquisitionTimeoutMillis()); + boolean streamAcquired = waitForStreamAcquired(result.streamFuture(), + context.connectionAcquisitionTimeoutMillis()); + LOG.info(() -> tag + "call() waitForStreamAcquired returned " + streamAcquired); + + if (pump != null) { + if (streamAcquired) { + LOG.info(() -> tag + "call() entering pump.pump()"); + try { + pump.pump(); + LOG.info(() -> tag + "call() pump.pump() returned"); + } catch (IOException ioe) { + LOG.info(() -> tag + "call() pump.pump() threw IOException: " + ioe.getMessage()); + responseFuture.completeExceptionally(ioe); + throw ioe; + } + } else { + LOG.info(() -> tag + "call() invoking pump.abort() (post-wait, streamAcquired=false)"); + pump.abort(); + } + } + + LOG.info(() -> tag + "call() entering joinInterruptibly(responseFuture)"); SdkHttpFullResponse response = CompletableFutureUtils.joinInterruptibly(responseFuture); + LOG.info(() -> tag + "call() responseFuture joined: success"); builder.response(response); builder.responseBody(response.content().orElse(null)); + LOG.info(() -> tag + "call() exiting normally"); return builder.build(); } catch (CompletionException e) { Throwable cause = e.getCause(); + LOG.info(() -> tag + "call() catch CompletionException, cause=" + + (cause == null ? "" : cause.getClass().getName() + ": " + cause.getMessage())); - // Complete the future exceptionally to trigger connection cleanup in the response handler. - // Handles thread-interrupt case where joinInterruptibly throws due to - // InterruptedException. Without this, the - // Ensures that closeConnection() is invoked to prevent leaking the connection from the pool. if (responseFuture != null) { responseFuture.completeExceptionally(cause != null ? cause : e); } + if (pump != null) { + LOG.info(() -> tag + "call() catch invoking pump.abort()"); + pump.abort(); + } + if (cause instanceof IOException) { throw (IOException) cause; } @@ -153,9 +220,40 @@ public HttpExecuteResponse call() throws IOException { @Override public void abort() { + LOG.info(() -> tag + "abort() called externally"); if (responseFuture != null) { responseFuture.completeExceptionally(new IOException("Request was cancelled")); } + if (pump != null) { + LOG.info(() -> tag + "abort() invoking pump.abort()"); + pump.abort(); + } + } + + private boolean waitForStreamAcquired(CompletableFuture streamFuture, long timeoutMillis) { + if (streamFuture == null) { + LOG.info(() -> tag + "waitForStreamAcquired: streamFuture==null, returning false"); + return false; + } + LOG.info(() -> tag + "waitForStreamAcquired: starting, timeout=" + timeoutMillis + "ms"); + try { + streamFuture.get(timeoutMillis, TimeUnit.MILLISECONDS); + LOG.info(() -> tag + "waitForStreamAcquired: streamFuture completed normally"); + return true; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.info(() -> tag + "waitForStreamAcquired: interrupted"); + return false; + } catch (TimeoutException e) { + LOG.warn(() -> tag + "waitForStreamAcquired: timed out after " + timeoutMillis + + "ms - streamFuture still pending"); + return false; + } catch (ExecutionException e) { + Throwable cause = e.getCause(); + LOG.info(() -> tag + "waitForStreamAcquired: streamFuture completed exceptionally: " + + (cause == null ? e.getMessage() : cause.getClass().getName() + ": " + cause.getMessage())); + return false; + } } } diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtHttpClientBase.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtHttpClientBase.java index 50689d2236d..afe19159e3c 100644 --- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtHttpClientBase.java +++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtHttpClientBase.java @@ -61,6 +61,7 @@ abstract class AwsCrtHttpClientBase implements SdkAutoCloseable { protected final long readBufferSize; protected final Protocol protocol; + protected final long connectionAcquisitionTimeout; private final Map connectionPools = new ConcurrentHashMap<>(); private final LinkedList ownedSubResources = new LinkedList<>(); private final ClientBootstrap bootstrap; @@ -70,7 +71,6 @@ abstract class AwsCrtHttpClientBase implements SdkAutoCloseable { private final HttpMonitoringOptions monitoringOptions; private final long maxConnectionIdleInMilliseconds; private final int maxStreamsPerEndpoint; - private final long connectionAcquisitionTimeout; private final TlsContextOptions tlsContextOptions; private boolean isClosed = false; diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtAsyncRequestExecutor.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtAsyncRequestExecutor.java index ca9e04fa229..b3b6c232261 100644 --- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtAsyncRequestExecutor.java +++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtAsyncRequestExecutor.java @@ -82,7 +82,9 @@ private void doExecute(CrtAsyncRequestContext executionContext, long finalAcquireStartTime = acquireStartTime; streamFuture.whenComplete((stream, throwable) -> { - crtResponseHandler.onAcquireStream(stream); + if (throwable == null) { + crtResponseHandler.onAcquireStream(stream); + } if (shouldPublishMetrics) { reportMetrics(executionContext.streamManager(), metricCollector, finalAcquireStartTime); } diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtRequestContext.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtRequestContext.java index ba97cc3466e..ced8fd55508 100644 --- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtRequestContext.java +++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtRequestContext.java @@ -26,12 +26,16 @@ public final class CrtRequestContext { private final long readBufferSize; private final HttpStreamManager streamManager; private final MetricCollector metricCollector; + private final long connectionAcquisitionTimeoutMillis; + private final String reqId; private CrtRequestContext(Builder builder) { this.request = builder.request; this.readBufferSize = builder.readBufferSize; this.streamManager = builder.streamManager; this.metricCollector = request.metricCollector().orElse(null); + this.connectionAcquisitionTimeoutMillis = builder.connectionAcquisitionTimeoutMillis; + this.reqId = builder.reqId; } public static Builder builder() { @@ -54,10 +58,20 @@ public MetricCollector metricCollector() { return metricCollector; } + public long connectionAcquisitionTimeoutMillis() { + return connectionAcquisitionTimeoutMillis; + } + + public String reqId() { + return reqId; + } + public static final class Builder { private HttpExecuteRequest request; private long readBufferSize; private HttpStreamManager streamManager; + private long connectionAcquisitionTimeoutMillis; + private String reqId; private Builder() { } @@ -77,6 +91,16 @@ public Builder streamManager(HttpStreamManager streamManager) { return this; } + public Builder connectionAcquisitionTimeoutMillis(long connectionAcquisitionTimeoutMillis) { + this.connectionAcquisitionTimeoutMillis = connectionAcquisitionTimeoutMillis; + return this; + } + + public Builder reqId(String reqId) { + this.reqId = reqId; + return this; + } + public CrtRequestContext build() { return new CrtRequestContext(this); } diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutor.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutor.java index a1283236dc1..9c151477400 100644 --- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutor.java +++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutor.java @@ -20,69 +20,106 @@ import java.util.concurrent.CompletableFuture; import software.amazon.awssdk.annotations.SdkInternalApi; -import software.amazon.awssdk.crt.http.HttpRequest; import software.amazon.awssdk.crt.http.HttpStreamBase; import software.amazon.awssdk.http.SdkHttpFullResponse; import software.amazon.awssdk.http.crt.internal.request.CrtRequestAdapter; +import software.amazon.awssdk.http.crt.internal.request.CrtRequestAdapter.SyncCrtRequest; +import software.amazon.awssdk.http.crt.internal.request.SyncRequestBodyPump; import software.amazon.awssdk.http.crt.internal.response.InputStreamAdaptingHttpStreamResponseHandler; import software.amazon.awssdk.metrics.MetricCollector; import software.amazon.awssdk.metrics.NoOpMetricCollector; +import software.amazon.awssdk.utils.Logger; @SdkInternalApi public final class CrtRequestExecutor { + private static final Logger LOG = Logger.loggerFor(CrtRequestExecutor.class); - public CompletableFuture execute(CrtRequestContext executionContext) { + public Result execute(CrtRequestContext executionContext) { CompletableFuture requestFuture = new CompletableFuture<>(); + MetricCollector metricCollector = executionContext.metricCollector(); + boolean shouldPublishMetrics = metricCollector != null && !(metricCollector instanceof NoOpMetricCollector); + String tag = "[reqId=" + executionContext.reqId() + "] "; + + // get acquireStartTime as early as possible for the concurrency timer, but only when metrics are + // enabled since clock_gettime() is a full sys call barrier (multiple mutexes and a hw interrupt). + long acquireStartTime = shouldPublishMetrics ? System.nanoTime() : 0; try { - doExecute(executionContext, requestFuture); + InputStreamAdaptingHttpStreamResponseHandler crtResponseHandler = + new InputStreamAdaptingHttpStreamResponseHandler(requestFuture); + SyncCrtRequest syncCrtRequest = CrtRequestAdapter.toCrtRequest(executionContext); + LOG.info(() -> tag + "execute() acquireStream invoked"); + CompletableFuture streamFuture = + executionContext.streamManager().acquireStream(syncCrtRequest.httpRequest(), crtResponseHandler); + + // Evict the connection from the pool on failure so it is not reused. + requestFuture.whenComplete((r, t) -> { + if (t != null) { + LOG.info(() -> tag + "execute() requestFuture exceptional: closeConnection() (cause=" + + t.getClass().getSimpleName() + ")"); + crtResponseHandler.closeConnection(); + } + }); + + long finalAcquireStartTime = acquireStartTime; + streamFuture.whenComplete((streamBase, throwable) -> { + if (throwable == null) { + LOG.info(() -> tag + "execute() streamFuture.whenComplete fired with success"); + crtResponseHandler.onAcquireStream(streamBase); + } else { + LOG.info(() -> tag + "execute() streamFuture.whenComplete fired with throwable=" + + throwable.getClass().getName() + ": " + throwable.getMessage()); + } + if (shouldPublishMetrics) { + reportMetrics(executionContext.streamManager(), metricCollector, finalAcquireStartTime); + } + if (throwable != null) { + LOG.info(() -> tag + "execute() completing requestFuture exceptionally from streamFuture failure"); + requestFuture.completeExceptionally(wrapCrtException(throwable)); + } + }); + + return new Result(requestFuture, syncCrtRequest.pump(), streamFuture); } catch (Throwable t) { + LOG.info(() -> tag + "execute() outer catch, completing requestFuture exceptionally: " + t.getClass().getName()); requestFuture.completeExceptionally(t); + return new Result(requestFuture, null, null); } - - return requestFuture; } - private void doExecute(CrtRequestContext executionContext, CompletableFuture requestFuture) { - MetricCollector metricCollector = executionContext.metricCollector(); - boolean shouldPublishMetrics = metricCollector != null && !(metricCollector instanceof NoOpMetricCollector); - - long acquireStartTime = 0; - - if (shouldPublishMetrics) { - // go ahead and get acquireStartTime for the concurrency timer as early as possible, - // so it's as accurate as possible, but only do it in a branch since clock_gettime() - // results in a full sys call barrier (multiple mutexes and a hw interrupt). - acquireStartTime = System.nanoTime(); + /** + * Result of {@link #execute(CrtRequestContext)}: bundles the response future with the optional + * caller-thread body pump (null when the request has no body) and the future that completes + * when the CRT stream has been acquired from the connection pool. + */ + public static final class Result { + private final CompletableFuture responseFuture; + private final SyncRequestBodyPump pump; + private final CompletableFuture streamFuture; + + Result(CompletableFuture responseFuture, + SyncRequestBodyPump pump, + CompletableFuture streamFuture) { + this.responseFuture = responseFuture; + this.pump = pump; + this.streamFuture = streamFuture; } - InputStreamAdaptingHttpStreamResponseHandler crtResponseHandler = - new InputStreamAdaptingHttpStreamResponseHandler(requestFuture); - - HttpRequest crtRequest = CrtRequestAdapter.toCrtRequest(executionContext); - - CompletableFuture streamFuture = - executionContext.streamManager().acquireStream(crtRequest, crtResponseHandler); - - // Evict the connection from the pool on failure so it is not reused. - requestFuture.whenComplete((r, t) -> { - if (t != null) { - crtResponseHandler.closeConnection(); - } - }); - - long finalAcquireStartTime = acquireStartTime; + public CompletableFuture responseFuture() { + return responseFuture; + } - streamFuture.whenComplete((streamBase, throwable) -> { - crtResponseHandler.onAcquireStream(streamBase); - if (shouldPublishMetrics) { - reportMetrics(executionContext.streamManager(), metricCollector, finalAcquireStartTime); - } + public SyncRequestBodyPump pump() { + return pump; + } - if (throwable != null) { - Throwable toThrow = wrapCrtException(throwable); - requestFuture.completeExceptionally(toThrow); - } - }); + /** + * Future that completes when the CRT stream has been acquired (or acquisition has failed). + * The caller blocks on this before running the body pump so per-request body buffers are + * not allocated while a request is queued on the connection pool. + */ + public CompletableFuture streamFuture() { + return streamFuture; + } } } diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/BodyChunkPipe.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/BodyChunkPipe.java new file mode 100644 index 00000000000..986eff9fe1a --- /dev/null +++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/BodyChunkPipe.java @@ -0,0 +1,254 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.http.crt.internal.request; + +import java.nio.ByteBuffer; +import java.util.ArrayDeque; +import java.util.Deque; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.atomic.AtomicReference; +import software.amazon.awssdk.annotations.SdkInternalApi; +import software.amazon.awssdk.annotations.SdkTestInternalApi; +import software.amazon.awssdk.utils.Logger; + +/** + * Bounded producer/consumer hand-off between the caller thread (producer) and the CRT event-loop thread (consumer). + * + *

The producer reads from the customer's {@code InputStream} and {@link #publish(Chunk) publishes} chunks + * into a bounded {@link ArrayBlockingQueue}. The consumer drains those chunks via {@link #pollDrain(ByteBuffer)}, + * which is non-blocking: if no data is ready the consumer returns 0 bytes and CRT reschedules itself via + * {@code aws_channel_schedule_task_now}. + * + *

Drained chunks are returned to a free {@link ArrayDeque} (LIFO for cache hotness) guarded by this + * monitor. The producer parks on this monitor when the free deque is empty, providing back-pressure. + * + *

Chunk byte[] buffers are allocated lazily on the producer's first {@link #acquireForFill()}, not in + * the constructor. This keeps per-request heap minimal while a request is queued on the CRT connection + * pool waiting for a stream: the pipe object exists but its backing buffers do not. + * + *

State machine: {@code OPEN -> {EOF | ERROR | ABORTED}}. Transitions are one-way. + */ +@SdkInternalApi +final class BodyChunkPipe { + private static final Logger LOG = Logger.loggerFor(BodyChunkPipe.class); + + enum State { + OPEN, + EOF, + ERROR, + ABORTED + } + + /** + * Defense-in-depth wait timeout for {@link #acquireForFill()}. Even if a code path forgets + * to call {@link #abort()}, a parked producer wakes every {@value} ms to re-check state. + * Spurious wakeups are harmless. + */ + private static final long ACQUIRE_WAIT_TIMEOUT_MS = 50L; + + private final int depth; + private final int chunkSize; + private final ArrayBlockingQueue ready; + private final Deque free; + private final AtomicReference state = new AtomicReference<>(State.OPEN); + private final String tag; + /** + * Guards the free deque, allocated counter, and producer wait/notify protocol. Kept private + * so external code cannot synchronize on the pipe instance and stall the producer. + */ + private final Object freeLock = new Object(); + + private int allocated; + private volatile Throwable error; + private Chunk pendingDrain; + + BodyChunkPipe(int depth, int chunkSize) { + this(depth, chunkSize, "-"); + } + + BodyChunkPipe(int depth, int chunkSize, String reqId) { + if (depth < 1) { + throw new IllegalArgumentException("depth must be >= 1"); + } + if (chunkSize < 1) { + throw new IllegalArgumentException("chunkSize must be >= 1"); + } + this.depth = depth; + this.chunkSize = chunkSize; + this.ready = new ArrayBlockingQueue<>(depth); + this.free = new ArrayDeque<>(depth); + this.tag = "[reqId=" + reqId + "] "; + } + + /** + * Producer side: acquire a chunk to fill. Blocks if all chunks are currently in flight. + * Returns {@code null} only if the pipe was aborted while the producer was waiting. + * + *

Allocates the chunk's backing byte[] on first use up to the configured depth. This keeps the + * per-request footprint minimal until the producer actually starts pumping (i.e., until after the + * CRT stream has been acquired). + */ + Chunk acquireForFill() throws InterruptedException { + synchronized (freeLock) { + while (true) { + State s = state.get(); + if (s == State.ABORTED || s == State.ERROR) { + LOG.debug(() -> tag + "acquireForFill returning null, state=" + s); + return null; + } + Chunk c = free.pollFirst(); + if (c != null) { + return c; + } + if (allocated < depth) { + allocated++; + return new Chunk(chunkSize); + } + freeLock.wait(ACQUIRE_WAIT_TIMEOUT_MS); + } + } + } + + /** + * Producer side: publish a filled chunk to the consumer. Caller must have set + * {@link Chunk#len(int)} before calling. + * + *

If the chunk is empty (zero-length read), it is recycled back to the free deque rather than + * pushed to the ready queue: an empty chunk would otherwise be leaked from the bounded pool, and + * the consumer would interpret it as a no-op anyway. + */ + void publish(Chunk chunk) throws InterruptedException { + if (chunk.len() == 0) { + recycle(chunk); + return; + } + // ready.put() blocks if the queue is full, but the queue capacity == pool size, + // so this can only block briefly while the consumer drains. + ready.put(chunk); + } + + /** + * Producer side: signal end-of-stream. Idempotent. + */ + void signalEof() { + if (state.compareAndSet(State.OPEN, State.EOF)) { + LOG.debug(() -> tag + "state OPEN -> EOF"); + } + } + + /** + * Producer side: signal a fatal producer-side error. Idempotent. + */ + void signalError(Throwable t) { + synchronized (freeLock) { + // Publish the cause BEFORE flipping state so a consumer's lock-free read in pollDrain + // never observes state==ERROR with error==null. The volatile write to `error` is + // harmless if the CAS later loses (idempotent signal). + error = t; + if (state.compareAndSet(State.OPEN, State.ERROR)) { + LOG.debug(() -> tag + "state OPEN -> ERROR (" + t.getClass().getSimpleName() + ")"); + } + freeLock.notifyAll(); + } + } + + /** + * External-cancel: clear ready queue, flip state, wake producer. + */ + void abort() { + synchronized (freeLock) { + if (state.compareAndSet(State.OPEN, State.ABORTED)) { + LOG.debug(() -> tag + "state OPEN -> ABORTED"); + ready.clear(); + } + freeLock.notifyAll(); + } + } + + /** + * Consumer side: drain bytes into {@code dst}. NEVER blocks. + * + *

Single-consumer: CRT invokes this only on the request's outgoing-stream task, which is + * scheduled serially on one event-loop thread per stream. {@code pendingDrain} is therefore + * not volatile - it is written and read by that single consumer thread. + * + * @return number of bytes drained, or {@code -1} on EOF with no remaining data. + * @throws RuntimeException if the pipe is in ERROR or ABORTED state with no remaining data. + */ + int pollDrain(ByteBuffer dst) { + int totalBytesConsumed = 0; + while (dst.hasRemaining()) { + if (pendingDrain == null) { + pendingDrain = ready.poll(); + } + if (pendingDrain == null) { + switch (state.get()) { + case ERROR: + throw new RuntimeException("Producer failed", error); + case ABORTED: + throw new RuntimeException("Request body stream was aborted"); + case EOF: + return totalBytesConsumed > 0 ? totalBytesConsumed : -1; + case OPEN: + default: + // OPEN with empty queue: return what we have (possibly 0); CRT will retry. + return totalBytesConsumed; + } + } + int n = Math.min(dst.remaining(), pendingDrain.len() - pendingDrain.pos()); + dst.put(pendingDrain.data(), pendingDrain.pos(), n); + pendingDrain.pos(pendingDrain.pos() + n); + totalBytesConsumed += n; + if (pendingDrain.pos() >= pendingDrain.len()) { + // The chunk has been fully copied into dst, so we return it to the free deque + // (and notify the producer in case it was waiting). This is what bounds the pool: + // chunks only re-enter the producer pool after the consumer has drained them. + Chunk drained = pendingDrain; + pendingDrain = null; + recycle(drained); + } + } + return totalBytesConsumed; + } + + /** + * Visible-for-test / test-only helper: current pipe state. + */ + @SdkTestInternalApi + State state() { + return state.get(); + } + + /** + * Visible-for-test / test-only helper: number of {@link Chunk} buffers minted so far. The pipe + * lazily allocates chunks on the producer's first {@link #acquireForFill()}, so this is 0 until + * the producer starts pumping and grows up to {@code depth}. + */ + @SdkTestInternalApi + int allocatedForTest() { + synchronized (freeLock) { + return allocated; + } + } + + private void recycle(Chunk c) { + c.reset(); + synchronized (freeLock) { + free.push(c); + freeLock.notifyAll(); + } + } +} diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/Chunk.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/Chunk.java new file mode 100644 index 00000000000..0797b8cbc67 --- /dev/null +++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/Chunk.java @@ -0,0 +1,63 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.http.crt.internal.request; + +import software.amazon.awssdk.annotations.SdkInternalApi; + +/** + * A reusable byte buffer carrying request body data through the {@link BodyChunkPipe}. + * + *

Internal to this package; used only by {@link BodyChunkPipe} and {@link SyncRequestBodyPump}. + * + *

{@code pos} and {@code len} are intentionally non-volatile: hand-off between the producer and + * consumer always goes through the {@code ArrayBlockingQueue} (or the {@code freeLock} monitor on + * recycle), both of which provide release/acquire happens-before for the field writes. + */ +@SdkInternalApi +final class Chunk { + private final byte[] data; + private int pos; + private int len; + + Chunk(int chunkSize) { + this.data = new byte[chunkSize]; + } + + byte[] data() { + return data; + } + + int pos() { + return pos; + } + + void pos(int pos) { + this.pos = pos; + } + + int len() { + return len; + } + + void len(int len) { + this.len = len; + } + + void reset() { + this.pos = 0; + this.len = 0; + } +} diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestAdapter.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestAdapter.java index 8672d80b0d1..734c5a35222 100644 --- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestAdapter.java +++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestAdapter.java @@ -23,6 +23,7 @@ import software.amazon.awssdk.crt.http.HttpHeader; import software.amazon.awssdk.crt.http.HttpRequest; import software.amazon.awssdk.crt.http.HttpRequestBase; +import software.amazon.awssdk.http.ContentStreamProvider; import software.amazon.awssdk.http.Header; import software.amazon.awssdk.http.HttpExecuteRequest; import software.amazon.awssdk.http.Protocol; @@ -33,6 +34,16 @@ @SdkInternalApi public final class CrtRequestAdapter { + /** + * Per-chunk size used by the sync request-body pipe. + */ + private static final int CHUNK_SIZE = 128 * 1024; + + /** + * Number of in-flight chunks the pipe holds. + */ + private static final int PIPE_DEPTH = 4; + private CrtRequestAdapter() { } @@ -60,7 +71,12 @@ public static HttpRequestBase toAsyncCrtRequest(CrtAsyncRequestContext request) crtRequestBodyAdapter); } - public static HttpRequest toCrtRequest(CrtRequestContext request) { + /** + * Build the CRT request for the sync path. When the SDK request has a body, this also constructs the + * {@link BodyChunkPipe} and a {@link SyncRequestBodyPump}; the caller thread is expected to drive + * the pump after the stream is activated. + */ + public static SyncCrtRequest toCrtRequest(CrtRequestContext request) { HttpExecuteRequest sdkExecuteRequest = request.sdkRequest(); SdkHttpRequest sdkRequest = sdkExecuteRequest.httpRequest(); @@ -78,14 +94,40 @@ public static HttpRequest toCrtRequest(CrtRequestContext request) { HttpHeader[] crtHeaderArray = asArray(createHttpHeaderList(sdkRequest.getUri(), sdkExecuteRequest)); String finalEncodedPath = encodedPath + encodedQueryString; - return sdkExecuteRequest.contentStreamProvider() - .map(provider -> new HttpRequest(method, - finalEncodedPath, - crtHeaderArray, - new CrtRequestInputStreamAdapter(provider))) - .orElse(new HttpRequest(method, - finalEncodedPath, - crtHeaderArray, null)); + + Optional providerOpt = sdkExecuteRequest.contentStreamProvider(); + if (!providerOpt.isPresent()) { + return new SyncCrtRequest(new HttpRequest(method, finalEncodedPath, crtHeaderArray, null), null); + } + + String reqId = request.reqId(); + BodyChunkPipe pipe = new BodyChunkPipe(PIPE_DEPTH, CHUNK_SIZE, reqId); + PipeBackedRequestBodyStream bodyStream = new PipeBackedRequestBodyStream(pipe); + SyncRequestBodyPump pump = new SyncRequestBodyPump(providerOpt.get(), pipe, reqId); + HttpRequest crtRequest = new HttpRequest(method, finalEncodedPath, crtHeaderArray, bodyStream); + return new SyncCrtRequest(crtRequest, pump); + } + + /** + * Holder returned from {@link #toCrtRequest(CrtRequestContext)} bundling the CRT-side request and the + * caller-thread producer pump (null when the SDK request has no body). + */ + public static final class SyncCrtRequest { + private final HttpRequest httpRequest; + private final SyncRequestBodyPump pump; + + SyncCrtRequest(HttpRequest httpRequest, SyncRequestBodyPump pump) { + this.httpRequest = httpRequest; + this.pump = pump; + } + + public HttpRequest httpRequest() { + return httpRequest; + } + + public SyncRequestBodyPump pump() { + return pump; + } } private static HttpHeader[] asArray(List crtHeaderList) { diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestInputStreamAdapter.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestInputStreamAdapter.java deleted file mode 100644 index 68f418b9e1d..00000000000 --- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestInputStreamAdapter.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://aws.amazon.com/apache2.0 - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - */ - -package software.amazon.awssdk.http.crt.internal.request; - -import static java.lang.Math.min; - -import java.io.IOException; -import java.io.InputStream; -import java.nio.ByteBuffer; -import software.amazon.awssdk.annotations.SdkInternalApi; -import software.amazon.awssdk.crt.http.HttpRequestBodyStream; -import software.amazon.awssdk.http.ContentStreamProvider; - -@SdkInternalApi -final class CrtRequestInputStreamAdapter implements HttpRequestBodyStream { - private static final int READ_BUFFER_SIZE = 16 * 1024; - - private final ContentStreamProvider provider; - private volatile InputStream providerStream; - private final byte[] readBuffer = new byte[READ_BUFFER_SIZE]; - - CrtRequestInputStreamAdapter(ContentStreamProvider provider) { - this.provider = provider; - } - - @Override - public boolean sendRequestBody(ByteBuffer bodyBytesOut) { - int read; - - try { - if (providerStream == null) { - createNewStream(); - } - - int toRead = min(READ_BUFFER_SIZE, bodyBytesOut.remaining()); - read = providerStream.read(readBuffer, 0, toRead); - - if (read > 0) { - bodyBytesOut.put(readBuffer, 0, read); - } - } catch (IOException ioe) { - throw new RuntimeException(ioe); - } - - return read < 0; - } - - @Override - public boolean resetPosition() { - try { - createNewStream(); - } catch (IOException ioe) { - throw new RuntimeException(ioe); - } - - return true; - } - - private void createNewStream() throws IOException { - if (providerStream != null) { - providerStream.close(); - } - providerStream = provider.newStream(); - } -} diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/PipeBackedRequestBodyStream.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/PipeBackedRequestBodyStream.java new file mode 100644 index 00000000000..a2dd78eea0c --- /dev/null +++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/PipeBackedRequestBodyStream.java @@ -0,0 +1,49 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.http.crt.internal.request; + +import java.nio.ByteBuffer; +import software.amazon.awssdk.annotations.SdkInternalApi; +import software.amazon.awssdk.crt.http.HttpRequestBodyStream; + +/** + * A {@link HttpRequestBodyStream} adapter whose {@link #sendRequestBody(ByteBuffer)} drains bytes from a + * {@link BodyChunkPipe} that is fed by the caller thread. The pull callback NEVER blocks: if no data is ready, + * it returns 0 bytes and CRT reschedules the outgoing-stream task via {@code aws_channel_schedule_task_now}, + * allowing other event-loop tasks (such as a concurrent GET response delivery) to run before the retry. + */ +@SdkInternalApi +final class PipeBackedRequestBodyStream implements HttpRequestBodyStream { + + private final BodyChunkPipe pipe; + + PipeBackedRequestBodyStream(BodyChunkPipe pipe) { + this.pipe = pipe; + } + + @Override + public boolean sendRequestBody(ByteBuffer bodyBytesOut) { + int drained = pipe.pollDrain(bodyBytesOut); + return drained < 0; + } + + @Override + public boolean resetPosition() { + // The SDK retry layer (RetryableStage) handles request-level retries by calling prepareRequest() again, + // CRT does not currently exercise resetPosition for HTTP/1.1, so opting out is safe in practice. + return false; + } +} diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/SyncRequestBodyPump.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/SyncRequestBodyPump.java new file mode 100644 index 00000000000..254b4127ea3 --- /dev/null +++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/SyncRequestBodyPump.java @@ -0,0 +1,92 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.http.crt.internal.request; + +import java.io.IOException; +import java.io.InputStream; +import software.amazon.awssdk.annotations.SdkInternalApi; +import software.amazon.awssdk.http.ContentStreamProvider; +import software.amazon.awssdk.utils.Logger; + +/** + * Caller-thread producer that reads from the customer's {@link InputStream} and publishes chunks to a + * {@link BodyChunkPipe}. Runs on the caller (sync) thread between stream activation and + * {@code responseFuture.join()}, ensuring the blocking {@code read()} happens off the CRT event loop. + */ +@SdkInternalApi +public final class SyncRequestBodyPump { + private static final Logger LOG = Logger.loggerFor(SyncRequestBodyPump.class); + + private final ContentStreamProvider contentStreamProvider; + private final BodyChunkPipe pipe; + private final String tag; + + SyncRequestBodyPump(ContentStreamProvider contentStreamProvider, BodyChunkPipe pipe) { + this(contentStreamProvider, pipe, "-"); + } + + SyncRequestBodyPump(ContentStreamProvider contentStreamProvider, BodyChunkPipe pipe, String reqId) { + this.contentStreamProvider = contentStreamProvider; + this.pipe = pipe; + this.tag = "[reqId=" + reqId + "] "; + } + + /** + * Pump the entire input stream into the pipe. Runs on the caller thread; never invoked on the CRT + * event-loop thread. On EOF signals the pipe normally; on {@link IOException} signals an error and rethrows. + */ + public void pump() throws IOException { + LOG.info(() -> tag + "pump() entered"); + try (InputStream in = contentStreamProvider.newStream()) { + while (true) { + Chunk chunk = pipe.acquireForFill(); + if (chunk == null) { + LOG.info(() -> tag + "pump() exiting due to abort (acquireForFill returned null)"); + return; + } + int read; + try { + read = in.read(chunk.data(), 0, chunk.data().length); + } catch (IOException ioe) { + LOG.info(() -> tag + "pump() exiting due to error: " + ioe.getMessage()); + pipe.signalError(ioe); + throw ioe; + } + if (read < 0) { + LOG.info(() -> tag + "pump() exiting due to eof"); + pipe.signalEof(); + return; + } + chunk.pos(0); + chunk.len(read); + pipe.publish(chunk); + } + } catch (InterruptedException ie) { + LOG.info(() -> tag + "pump() exiting due to interrupt"); + pipe.abort(); + Thread.currentThread().interrupt(); + throw new IOException("Interrupted while writing request body", ie); + } + } + + /** + * Abort the underlying pipe (e.g., when the caller's {@code call()} is cancelled). + */ + public void abort() { + LOG.info(() -> tag + "pump.abort() called"); + pipe.abort(); + } +} diff --git a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/AwsCrtHttpClientWireMockTest.java b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/AwsCrtHttpClientWireMockTest.java index ce5d778f06a..5171451a4f0 100644 --- a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/AwsCrtHttpClientWireMockTest.java +++ b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/AwsCrtHttpClientWireMockTest.java @@ -17,8 +17,13 @@ import static com.github.tomakehurst.wiremock.client.WireMock.aResponse; import static com.github.tomakehurst.wiremock.client.WireMock.any; +import static com.github.tomakehurst.wiremock.client.WireMock.equalTo; +import static com.github.tomakehurst.wiremock.client.WireMock.equalToIgnoreCase; +import static com.github.tomakehurst.wiremock.client.WireMock.put; +import static com.github.tomakehurst.wiremock.client.WireMock.putRequestedFor; import static com.github.tomakehurst.wiremock.client.WireMock.stubFor; import static com.github.tomakehurst.wiremock.client.WireMock.urlPathEqualTo; +import static com.github.tomakehurst.wiremock.client.WireMock.verify; import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig; import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic; import static org.assertj.core.api.Assertions.assertThat; @@ -26,13 +31,21 @@ import static software.amazon.awssdk.http.SdkHttpConfigurationOption.PROTOCOL; import static software.amazon.awssdk.http.crt.CrtHttpClientTestUtils.createRequest; +import com.github.tomakehurst.wiremock.http.Fault; import com.github.tomakehurst.wiremock.junit.WireMockRule; import java.io.ByteArrayInputStream; import java.io.IOException; +import java.io.InputStream; import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Rule; @@ -45,6 +58,8 @@ import software.amazon.awssdk.http.HttpMetric; import software.amazon.awssdk.http.Protocol; import software.amazon.awssdk.http.SdkHttpClient; +import software.amazon.awssdk.http.SdkHttpFullRequest; +import software.amazon.awssdk.http.SdkHttpMethod; import software.amazon.awssdk.http.SdkHttpRequest; import software.amazon.awssdk.metrics.MetricCollection; import software.amazon.awssdk.metrics.MetricCollector; @@ -133,6 +148,294 @@ public void abortRequest_shouldFailTheExceptionWithIOException() throws Exceptio } } + @Test + public void putRequest_withInputStreamBody_serverReceivesBody() throws Exception { + try (SdkHttpClient client = AwsCrtHttpClient.create()) { + String body = "hello pull pump"; + byte[] bodyBytes = body.getBytes(StandardCharsets.UTF_8); + URI uri = URI.create("http://localhost:" + mockServer.port()); + stubFor(put(urlPathEqualTo("/sink")).willReturn(aResponse().withStatus(200))); + + SdkHttpFullRequest request = SdkHttpFullRequest.builder() + .uri(uri) + .method(SdkHttpMethod.PUT) + .encodedPath("/sink") + .putHeader("Host", uri.getHost()) + .putHeader("Content-Length", Integer.toString(bodyBytes.length)) + .build(); + + HttpExecuteRequest executeRequest = HttpExecuteRequest.builder() + .request(request) + .contentStreamProvider(() -> new ByteArrayInputStream(bodyBytes)) + .build(); + + HttpExecuteResponse response = client.prepareRequest(executeRequest).call(); + + assertThat(response.httpResponse().statusCode()).isEqualTo(200); + verify(putRequestedFor(urlPathEqualTo("/sink")) + .withHeader("Content-Length", equalTo(Integer.toString(bodyBytes.length))) + .withRequestBody(equalToIgnoreCase(body))); + } + } + + @Test + public void inputStreamThrows_connectionReturnedToPool_subsequentRequestSucceeds() throws Exception { + // Bound the pool to a single connection: if the failed request leaks its connection, the + // second call() either fails to acquire (with the explicit timeout below) or blocks until + // the test framework times out. Either manifests as a deterministic failure rather than a hang. + try (SdkHttpClient client = AwsCrtHttpClient.builder() + .maxConcurrency(1) + .connectionAcquisitionTimeout(Duration.ofSeconds(10)) + .build()) { + URI uri = URI.create("http://localhost:" + mockServer.port()); + stubFor(put(urlPathEqualTo("/sink")).willReturn(aResponse().withStatus(200))); + + IOException expected = new IOException("simulated upstream failure"); + SdkHttpFullRequest failingRequest = SdkHttpFullRequest.builder() + .uri(uri) + .method(SdkHttpMethod.PUT) + .encodedPath("/sink") + .putHeader("Host", uri.getHost()) + .putHeader("Content-Length", "100") + .build(); + HttpExecuteRequest failingExecute = + HttpExecuteRequest.builder() + .request(failingRequest) + .contentStreamProvider(() -> new InputStream() { + @Override + public int read() throws IOException { + throw expected; + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + throw expected; + } + }) + .build(); + + assertThatThrownBy(() -> client.prepareRequest(failingExecute).call()) + .isInstanceOf(IOException.class); + + // If the previous failure leaked the connection, this second call would fail to acquire + // (bounded by the connectionAcquisitionTimeout configured above) instead of hanging. + String body = "second request body"; + byte[] bodyBytes = body.getBytes(StandardCharsets.UTF_8); + SdkHttpFullRequest okRequest = SdkHttpFullRequest.builder() + .uri(uri) + .method(SdkHttpMethod.PUT) + .encodedPath("/sink") + .putHeader("Host", uri.getHost()) + .putHeader("Content-Length", Integer.toString(bodyBytes.length)) + .build(); + HttpExecuteRequest okExecute = HttpExecuteRequest.builder() + .request(okRequest) + .contentStreamProvider(() -> new ByteArrayInputStream(bodyBytes)) + .build(); + + HttpExecuteResponse response = client.prepareRequest(okExecute).call(); + assertThat(response.httpResponse().statusCode()).isEqualTo(200); + } + } + + @Test + public void abortMidRequest_connectionReturnedToPool_subsequentRequestSucceeds() throws Exception { + try (SdkHttpClient client = AwsCrtHttpClient.builder() + .maxConcurrency(1) + .connectionAcquisitionTimeout(Duration.ofSeconds(10)) + .build()) { + URI uri = URI.create("http://localhost:" + mockServer.port()); + stubFor(any(urlPathEqualTo("/")).willReturn(aResponse().withFixedDelay(2000).withBody("hello"))); + stubFor(put(urlPathEqualTo("/sink")).willReturn(aResponse().withStatus(200))); + + SdkHttpRequest delayedRequest = createRequest(uri); + HttpExecuteRequest delayedExecute = HttpExecuteRequest.builder() + .request(delayedRequest) + .contentStreamProvider(() -> new ByteArrayInputStream(new byte[0])) + .build(); + + ExecutableHttpRequest abortable = client.prepareRequest(delayedExecute); + executorService.schedule(abortable::abort, 100, TimeUnit.MILLISECONDS); + assertThatThrownBy(abortable::call).isInstanceOf(IOException.class).hasMessageContaining("cancelled"); + + String body = "after abort"; + byte[] bodyBytes = body.getBytes(StandardCharsets.UTF_8); + SdkHttpFullRequest okRequest = SdkHttpFullRequest.builder() + .uri(uri) + .method(SdkHttpMethod.PUT) + .encodedPath("/sink") + .putHeader("Host", uri.getHost()) + .putHeader("Content-Length", Integer.toString(bodyBytes.length)) + .build(); + HttpExecuteRequest okExecute = HttpExecuteRequest.builder() + .request(okRequest) + .contentStreamProvider(() -> new ByteArrayInputStream(bodyBytes)) + .build(); + HttpExecuteResponse response = client.prepareRequest(okExecute).call(); + assertThat(response.httpResponse().statusCode()).isEqualTo(200); + } + } + + @Test + public void serverResetsConnection_connectionReturnedToPool_subsequentRequestSucceeds() throws Exception { + try (SdkHttpClient client = AwsCrtHttpClient.builder() + .maxConcurrency(1) + .connectionAcquisitionTimeout(Duration.ofSeconds(10)) + .build()) { + URI uri = URI.create("http://localhost:" + mockServer.port()); + stubFor(put(urlPathEqualTo("/sink")) + .willReturn(aResponse().withFault(Fault.CONNECTION_RESET_BY_PEER))); + + byte[] bodyBytes = randomAlphabetic(64).getBytes(StandardCharsets.UTF_8); + SdkHttpFullRequest failingRequest = SdkHttpFullRequest.builder() + .uri(uri) + .method(SdkHttpMethod.PUT) + .encodedPath("/sink") + .putHeader("Host", uri.getHost()) + .putHeader("Content-Length", Integer.toString(bodyBytes.length)) + .build(); + HttpExecuteRequest failingExecute = HttpExecuteRequest.builder() + .request(failingRequest) + .contentStreamProvider(() -> new ByteArrayInputStream(bodyBytes)) + .build(); + + assertThatThrownBy(() -> client.prepareRequest(failingExecute).call()) + .isInstanceOf(IOException.class); + + stubFor(put(urlPathEqualTo("/sink2")).willReturn(aResponse().withStatus(200))); + byte[] okBytes = "ok".getBytes(StandardCharsets.UTF_8); + SdkHttpFullRequest okRequest = SdkHttpFullRequest.builder() + .uri(uri) + .method(SdkHttpMethod.PUT) + .encodedPath("/sink2") + .putHeader("Host", uri.getHost()) + .putHeader("Content-Length", Integer.toString(okBytes.length)) + .build(); + HttpExecuteRequest okExecute = HttpExecuteRequest.builder() + .request(okRequest) + .contentStreamProvider(() -> new ByteArrayInputStream(okBytes)) + .build(); + + HttpExecuteResponse response = client.prepareRequest(okExecute).call(); + assertThat(response.httpResponse().statusCode()).isEqualTo(200); + } + } + + /** + * Regression test for the deadlock the pull-pump fix addresses. On master, the request body's + * {@code InputStream.read(...)} ran on the CRT event-loop thread (via the body callback), which + * meant a body sourced from a {@code GET}'s {@code ResponseInputStream} on the same event loop + * could deadlock: the GET held the event loop while the PUT body waited for it. + * + *

Pull-pump moves the read to the caller (sync) thread. This test verifies that load-bearing + * claim by recording the thread that performs the body read and asserting it is the caller + * thread - not a CRT event-loop thread. Failure of either the assertion or the test timeout + * (a hang) is the deadlock signal. + */ + @Test + public void putBodyReadHappensOnCallerThread_notOnCrtEventLoop() throws Exception { + try (SdkHttpClient client = AwsCrtHttpClient.builder() + .maxConcurrency(1) + .connectionAcquisitionTimeout(Duration.ofSeconds(10)) + .build()) { + URI uri = URI.create("http://localhost:" + mockServer.port()); + stubFor(put(urlPathEqualTo("/sink")).willReturn(aResponse().withStatus(200))); + + byte[] bodyBytes = "body-on-caller".getBytes(StandardCharsets.UTF_8); + AtomicReference readThreadName = new AtomicReference<>(); + SdkHttpFullRequest request = SdkHttpFullRequest.builder() + .uri(uri) + .method(SdkHttpMethod.PUT) + .encodedPath("/sink") + .putHeader("Host", uri.getHost()) + .putHeader("Content-Length", Integer.toString(bodyBytes.length)) + .build(); + HttpExecuteRequest executeRequest = + HttpExecuteRequest.builder() + .request(request) + .contentStreamProvider(() -> new ByteArrayInputStream(bodyBytes) { + @Override + public synchronized int read(byte[] b, int off, int len) { + readThreadName.compareAndSet(null, Thread.currentThread().getName()); + return super.read(b, off, len); + } + }) + .build(); + + String callerThreadName = Thread.currentThread().getName(); + HttpExecuteResponse response = client.prepareRequest(executeRequest).call(); + assertThat(response.httpResponse().statusCode()).isEqualTo(200); + + String observed = readThreadName.get(); + assertThat(observed) + .as("body read should happen on the caller thread, not the CRT event loop") + .isNotNull() + .isEqualTo(callerThreadName) + .doesNotContainIgnoringCase("AwsEventLoop") + .doesNotContainIgnoringCase("aws-event-loop"); + } + } + + /** + * Stress companion to {@link #putBodyReadHappensOnCallerThread_notOnCrtEventLoop}. Issues a + * delayed GET (response delayed server-side) and a PUT in parallel through the same + * {@code maxConcurrency(1)} client. On master, sequencing them through a single connection + * with the body read tied to the event-loop thread could deadlock; here both calls must + * complete within the test timeout. + */ + @Test + public void getInFlight_concurrentPut_bothComplete() throws Exception { + try (SdkHttpClient client = AwsCrtHttpClient.builder() + .maxConcurrency(1) + .connectionAcquisitionTimeout(Duration.ofSeconds(15)) + .build()) { + URI uri = URI.create("http://localhost:" + mockServer.port()); + stubFor(any(urlPathEqualTo("/slow")) + .willReturn(aResponse().withFixedDelay(2_000).withBody("hello"))); + stubFor(put(urlPathEqualTo("/sink")).willReturn(aResponse().withStatus(200))); + + SdkHttpRequest getRequest = SdkHttpFullRequest.builder() + .uri(uri) + .method(SdkHttpMethod.GET) + .encodedPath("/slow") + .putHeader("Host", uri.getHost()) + .build(); + HttpExecuteRequest getExecute = HttpExecuteRequest.builder() + .request(getRequest) + .contentStreamProvider(() -> new ByteArrayInputStream(new byte[0])) + .build(); + + byte[] putBytes = "put-body".getBytes(StandardCharsets.UTF_8); + SdkHttpFullRequest putRequest = SdkHttpFullRequest.builder() + .uri(uri) + .method(SdkHttpMethod.PUT) + .encodedPath("/sink") + .putHeader("Host", uri.getHost()) + .putHeader("Content-Length", Integer.toString(putBytes.length)) + .build(); + HttpExecuteRequest putExecute = HttpExecuteRequest.builder() + .request(putRequest) + .contentStreamProvider(() -> new ByteArrayInputStream(putBytes)) + .build(); + + ExecutorService pool = Executors.newFixedThreadPool(2); + try { + Callable getTask = () -> client.prepareRequest(getExecute).call(); + Callable putTask = () -> client.prepareRequest(putExecute).call(); + Future getFuture = pool.submit(getTask); + Future putFuture = pool.submit(putTask); + + HttpExecuteResponse getResponse = getFuture.get(15, TimeUnit.SECONDS); + HttpExecuteResponse putResponse = putFuture.get(15, TimeUnit.SECONDS); + assertThat(getResponse.httpResponse().statusCode()).isEqualTo(200); + assertThat(putResponse.httpResponse().statusCode()).isEqualTo(200); + } finally { + pool.shutdownNow(); + pool.awaitTermination(5, TimeUnit.SECONDS); + } + } + } + /** * Make a simple request and wait for it to finish. * diff --git a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutorTest.java b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutorTest.java index 456000ac115..8b916ba1ee9 100644 --- a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutorTest.java +++ b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutorTest.java @@ -87,7 +87,7 @@ public void execute_requestConversionFails_failsFuture() { .request(HttpExecuteRequest.builder().build()) .build(); - CompletableFuture executeFuture = requestExecutor.execute(context); + CompletableFuture executeFuture = requestExecutor.execute(context).responseFuture(); assertThat(executeFuture).hasFailedWithThrowableThat().isInstanceOf(NullPointerException.class); } @@ -102,7 +102,7 @@ public void execute_acquireStreamFails_wrapsWithIOException() { .thenReturn(completableFuture); completableFuture.completeExceptionally(exception); - CompletableFuture executeFuture = requestExecutor.execute(context); + CompletableFuture executeFuture = requestExecutor.execute(context).responseFuture(); assertThat(executeFuture).hasFailedWithThrowableThat().hasCause(exception).isInstanceOf(IOException.class); } @@ -116,7 +116,7 @@ public void execute_retryableException_wrapsWithIOException(Throwable throwable) Mockito.when(streamManager.acquireStream(Mockito.any(HttpRequest.class), Mockito.any(HttpStreamBaseResponseHandler.class))) .thenReturn(completableFuture); - CompletableFuture executeFuture = requestExecutor.execute(context); + CompletableFuture executeFuture = requestExecutor.execute(context).responseFuture(); assertThat(executeFuture).hasFailedWithThrowableThat().hasCause(throwable).isInstanceOf(IOException.class); } @@ -133,7 +133,7 @@ public void execute_httpException_mapsToCorrectException(Entry executeFuture = requestExecutor.execute(context); + CompletableFuture executeFuture = requestExecutor.execute(context).responseFuture(); assertThatThrownBy(executeFuture::join).hasCauseInstanceOf(expectedExceptionClass); } @@ -146,7 +146,7 @@ public void execute_nonRetryableHttpException_doesNotWrapWithIOException() { Mockito.when(streamManager.acquireStream(Mockito.any(HttpRequest.class), Mockito.any(HttpStreamBaseResponseHandler.class))) .thenReturn(completableFuture); - CompletableFuture executeFuture = requestExecutor.execute(context); + CompletableFuture executeFuture = requestExecutor.execute(context).responseFuture(); assertThatThrownBy(executeFuture::join).hasCause(exception); } diff --git a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/request/BodyChunkPipeTest.java b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/request/BodyChunkPipeTest.java new file mode 100644 index 00000000000..72f29278589 --- /dev/null +++ b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/request/BodyChunkPipeTest.java @@ -0,0 +1,396 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.http.crt.internal.request; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.nio.ByteBuffer; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import org.junit.jupiter.api.RepeatedTest; +import org.junit.jupiter.api.Test; + +class BodyChunkPipeTest { + + @Test + void pollDrain_emptyOpenPipe_returnsZero() { + BodyChunkPipe pipe = new BodyChunkPipe(2, 8); + ByteBuffer dst = ByteBuffer.allocate(8); + + int n = pipe.pollDrain(dst); + + assertThat(n).isZero(); + assertThat(dst.position()).isZero(); + } + + @Test + void pollDrain_afterEofWithEmptyQueue_returnsMinusOne() { + BodyChunkPipe pipe = new BodyChunkPipe(2, 8); + pipe.signalEof(); + + int n = pipe.pollDrain(ByteBuffer.allocate(8)); + + assertThat(n).isEqualTo(-1); + } + + @Test + void publish_thenDrain_consumerSeesProducerBytes() throws Exception { + BodyChunkPipe pipe = new BodyChunkPipe(2, 8); + Chunk c = pipe.acquireForFill(); + byte[] payload = {1, 2, 3, 4, 5}; + System.arraycopy(payload, 0, c.data(), 0, payload.length); + c.pos(0); + c.len(payload.length); + pipe.publish(c); + pipe.signalEof(); + ByteBuffer dst = ByteBuffer.allocate(16); + + int first = pipe.pollDrain(dst); + int second = pipe.pollDrain(dst); + + assertThat(first).isEqualTo(payload.length); + assertThat(second).isEqualTo(-1); + dst.flip(); + byte[] out = new byte[dst.remaining()]; + dst.get(out); + assertThat(out).containsExactly(payload); + } + + @Test + void signalError_pollDrainThrows() { + BodyChunkPipe pipe = new BodyChunkPipe(2, 8); + pipe.signalError(new RuntimeException("boom")); + + assertThatThrownBy(() -> pipe.pollDrain(ByteBuffer.allocate(8))) + .hasMessageContaining("Producer failed") + .hasRootCauseMessage("boom"); + } + + @Test + void abort_emptiesReadyAndChangesState() throws Exception { + BodyChunkPipe pipe = new BodyChunkPipe(2, 8); + Chunk c = pipe.acquireForFill(); + c.len(4); + pipe.publish(c); + + pipe.abort(); + + assertThatThrownBy(() -> pipe.pollDrain(ByteBuffer.allocate(8))) + .hasMessageContaining("aborted"); + } + + @Test + void pollDrain_signalErrorWithQueuedChunks_drainsThenThrows() throws Exception { + BodyChunkPipe pipe = new BodyChunkPipe(2, 8); + Chunk c = pipe.acquireForFill(); + byte[] payload = {7, 8, 9}; + System.arraycopy(payload, 0, c.data(), 0, payload.length); + c.len(payload.length); + pipe.publish(c); + pipe.signalError(new RuntimeException("boom")); + + ByteBuffer dst = ByteBuffer.allocate(payload.length); + int drained = pipe.pollDrain(dst); + + assertThat(drained).isEqualTo(payload.length); + dst.flip(); + byte[] out = new byte[dst.remaining()]; + dst.get(out); + assertThat(out).containsExactly(payload); + assertThatThrownBy(() -> pipe.pollDrain(ByteBuffer.allocate(8))) + .hasMessageContaining("Producer failed") + .hasRootCauseMessage("boom"); + } + + @Test + void pollDrain_signalEofWithQueuedChunks_drainsThenReturnsMinusOne() throws Exception { + BodyChunkPipe pipe = new BodyChunkPipe(2, 8); + Chunk c = pipe.acquireForFill(); + byte[] payload = {10, 20, 30}; + System.arraycopy(payload, 0, c.data(), 0, payload.length); + c.len(payload.length); + pipe.publish(c); + pipe.signalEof(); + + ByteBuffer dst = ByteBuffer.allocate(payload.length); + int drained = pipe.pollDrain(dst); + int afterDrain = pipe.pollDrain(ByteBuffer.allocate(8)); + + assertThat(drained).isEqualTo(payload.length); + dst.flip(); + byte[] out = new byte[dst.remaining()]; + dst.get(out); + assertThat(out).containsExactly(payload); + assertThat(afterDrain).isEqualTo(-1); + } + + @Test + void abort_afterSignalEof_leavesStateAsEof() { + BodyChunkPipe pipe = new BodyChunkPipe(2, 8); + pipe.signalEof(); + + pipe.abort(); + + assertThat(pipe.state()).isEqualTo(BodyChunkPipe.State.EOF); + assertThat(pipe.pollDrain(ByteBuffer.allocate(8))).isEqualTo(-1); + } + + @Test + void abort_afterSignalEofWithQueuedChunks_doesNotClearReady() throws Exception { + BodyChunkPipe pipe = new BodyChunkPipe(2, 8); + Chunk c = pipe.acquireForFill(); + byte[] payload = {1, 2, 3}; + System.arraycopy(payload, 0, c.data(), 0, payload.length); + c.len(payload.length); + pipe.publish(c); + pipe.signalEof(); + + pipe.abort(); + + assertThat(pipe.state()).isEqualTo(BodyChunkPipe.State.EOF); + ByteBuffer dst = ByteBuffer.allocate(payload.length); + int drained = pipe.pollDrain(dst); + assertThat(drained).isEqualTo(payload.length); + assertThat(pipe.pollDrain(ByteBuffer.allocate(8))).isEqualTo(-1); + } + + @Test + void recycle_intoEofPipe_doesNotThrowAndDoesNotCorruptPool() throws Exception { + BodyChunkPipe pipe = new BodyChunkPipe(2, 8); + Chunk c = pipe.acquireForFill(); + c.len(4); + pipe.publish(c); + pipe.signalEof(); + + ByteBuffer dst = ByteBuffer.allocate(8); + int drained = pipe.pollDrain(dst); + int afterDrain = pipe.pollDrain(ByteBuffer.allocate(8)); + + assertThat(drained).isEqualTo(4); + assertThat(afterDrain).isEqualTo(-1); + assertThat(pipe.allocatedForTest()).isEqualTo(1); + } + + @Test + void recycle_intoAbortedPipe_doesNotThrow() throws Exception { + BodyChunkPipe pipe = new BodyChunkPipe(2, 8); + Chunk c = pipe.acquireForFill(); + pipe.abort(); + + c.len(0); + pipe.publish(c); + + assertThat(pipe.state()).isEqualTo(BodyChunkPipe.State.ABORTED); + } + + @Test + void recycle_intoErrorPipe_doesNotThrow() throws Exception { + BodyChunkPipe pipe = new BodyChunkPipe(2, 8); + Chunk c = pipe.acquireForFill(); + pipe.signalError(new RuntimeException("boom")); + + c.len(0); + pipe.publish(c); + + assertThat(pipe.state()).isEqualTo(BodyChunkPipe.State.ERROR); + } + + @Test + void constructor_doesNotAllocateChunks() { + BodyChunkPipe pipe = new BodyChunkPipe(4, 16); + + assertThat(pipe.allocatedForTest()).isZero(); + } + + @Test + void acquireForFill_firstCall_allocatesOneChunk() throws Exception { + BodyChunkPipe pipe = new BodyChunkPipe(4, 16); + + Chunk c = pipe.acquireForFill(); + + assertThat(c).isNotNull(); + assertThat(c.data()).hasSize(16); + assertThat(pipe.allocatedForTest()).isEqualTo(1); + } + + @Test + void acquireForFill_uniqueChunksUpToDepth_thenStopsAllocating() throws Exception { + BodyChunkPipe pipe = new BodyChunkPipe(3, 8); + Chunk c1 = pipe.acquireForFill(); + Chunk c2 = pipe.acquireForFill(); + Chunk c3 = pipe.acquireForFill(); + + c1.len(1); + pipe.publish(c1); + pipe.pollDrain(ByteBuffer.allocate(8)); + Chunk reused = pipe.acquireForFill(); + + assertThat(c1).isNotSameAs(c2).isNotSameAs(c3); + assertThat(c2).isNotSameAs(c3); + assertThat(pipe.allocatedForTest()).isEqualTo(3); + assertThat(reused).isSameAs(c1); + } + + @Test + void acquireForFill_recycledChunkReused_noNewAllocation() throws Exception { + BodyChunkPipe pipe = new BodyChunkPipe(2, 8); + Chunk c = pipe.acquireForFill(); + c.len(3); + pipe.publish(c); + pipe.pollDrain(ByteBuffer.allocate(8)); + + Chunk reused = pipe.acquireForFill(); + + assertThat(reused).isSameAs(c); + assertThat(pipe.allocatedForTest()).isEqualTo(1); + } + + @Test + void acquireForFill_afterAbort_returnsNull() throws Exception { + BodyChunkPipe pipe = new BodyChunkPipe(2, 8); + pipe.abort(); + + Chunk c = pipe.acquireForFill(); + + assertThat(c).isNull(); + } + + @Test + void acquireForFill_afterSignalError_returnsNull() throws Exception { + BodyChunkPipe pipe = new BodyChunkPipe(2, 8); + pipe.signalError(new RuntimeException("boom")); + + Chunk c = pipe.acquireForFill(); + + assertThat(c).isNull(); + } + + @Test + void constructor_invalidDepth_throws() { + assertThatThrownBy(() -> new BodyChunkPipe(0, 8)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("depth"); + } + + @Test + void constructor_invalidChunkSize_throws() { + assertThatThrownBy(() -> new BodyChunkPipe(2, 0)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("chunkSize"); + } + + /** + * Multi-threaded ordering test: producer races to call {@link BodyChunkPipe#signalError(Throwable)} + * while a consumer is concurrently spinning on {@link BodyChunkPipe#pollDrain(java.nio.ByteBuffer)}. + * The contract is that whenever the consumer observes the ERROR state, the cause must already + * be visible (no {@code RuntimeException("Producer failed", null)}). RepeatedTest amplifies the + * race window. With the cause published before the CAS, this should pass on every iteration. + */ + @RepeatedTest(50) + void signalError_concurrentPollDrain_consumerNeverSeesNullCause() throws Exception { + BodyChunkPipe pipe = new BodyChunkPipe(2, 16); + IllegalStateException expected = new IllegalStateException("boom"); + CountDownLatch start = new CountDownLatch(1); + AtomicReference consumerError = new AtomicReference<>(); + AtomicReference nullCauseSighting = new AtomicReference<>(); + + Thread consumer = new Thread(() -> { + try { + start.await(); + ByteBuffer dst = ByteBuffer.allocate(16); + long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5); + while (System.nanoTime() < deadline) { + try { + int n = pipe.pollDrain(dst); + if (n < 0) { + return; + } + dst.clear(); + } catch (RuntimeException re) { + if (re.getCause() == null) { + nullCauseSighting.set(re); + } + return; + } + } + } catch (Throwable t) { + consumerError.set(t); + } + }, "pipe-consumer"); + + Thread producer = new Thread(() -> { + try { + start.await(); + pipe.signalError(expected); + } catch (Throwable t) { + consumerError.set(t); + } + }, "pipe-producer"); + + consumer.start(); + producer.start(); + start.countDown(); + producer.join(5_000); + consumer.join(5_000); + + assertThat(consumer.isAlive()).isFalse(); + assertThat(producer.isAlive()).isFalse(); + assertThat(consumerError.get()).isNull(); + assertThat(nullCauseSighting.get()).isNull(); + } + + /** + * Multi-threaded test for the recycle/notify path: producer is forced to block on + * {@link BodyChunkPipe#acquireForFill()} because all chunks are in flight, then the consumer + * drains a chunk which {@code recycle()}s and notifies the producer to wake. This exercises the + * full {@code freeLock.notifyAll()} hand-off rather than relying on the defensive 50ms wakeup. + */ + @Test + void acquireForFill_blocksUntilConsumerRecycles_thenWakesAndCompletes() throws Exception { + BodyChunkPipe pipe = new BodyChunkPipe(1, 8); + Chunk first = pipe.acquireForFill(); + first.len(4); + pipe.publish(first); + + CountDownLatch producerEntered = new CountDownLatch(1); + AtomicReference reused = new AtomicReference<>(); + AtomicReference producerError = new AtomicReference<>(); + Thread producer = new Thread(() -> { + try { + producerEntered.countDown(); + Chunk c = pipe.acquireForFill(); + reused.set(c); + } catch (Throwable t) { + producerError.set(t); + } + }, "pipe-producer"); + + producer.start(); + producerEntered.await(); + // Drain so the chunk is recycled and the producer is woken via notifyAll. + long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5); + while (reused.get() == null && System.nanoTime() < deadline) { + pipe.pollDrain(ByteBuffer.allocate(8)); + producer.join(50); + } + + assertThat(producerError.get()).isNull(); + assertThat(reused.get()).isSameAs(first); + assertThat(pipe.allocatedForTest()).isEqualTo(1); + } +} diff --git a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/request/PipeBackedRequestBodyStreamTest.java b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/request/PipeBackedRequestBodyStreamTest.java new file mode 100644 index 00000000000..1add84bb4cd --- /dev/null +++ b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/request/PipeBackedRequestBodyStreamTest.java @@ -0,0 +1,141 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.http.crt.internal.request; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.nio.ByteBuffer; +import org.junit.jupiter.api.Test; + +class PipeBackedRequestBodyStreamTest { + + @Test + void sendRequestBody_emptyOpenPipe_returnsFalseAndCopiesNothing() { + BodyChunkPipe pipe = new BodyChunkPipe(2, 8); + PipeBackedRequestBodyStream stream = new PipeBackedRequestBodyStream(pipe); + ByteBuffer dst = ByteBuffer.allocate(8); + + boolean done = stream.sendRequestBody(dst); + + assertThat(done).isFalse(); + assertThat(dst.position()).isZero(); + } + + @Test + void sendRequestBody_afterEofAndDrained_returnsTrue() throws Exception { + BodyChunkPipe pipe = new BodyChunkPipe(2, 8); + Chunk c = pipe.acquireForFill(); + byte[] payload = {1, 2, 3}; + System.arraycopy(payload, 0, c.data(), 0, payload.length); + c.len(payload.length); + pipe.publish(c); + pipe.signalEof(); + PipeBackedRequestBodyStream stream = new PipeBackedRequestBodyStream(pipe); + + ByteBuffer first = ByteBuffer.allocate(8); + boolean firstDone = stream.sendRequestBody(first); + ByteBuffer second = ByteBuffer.allocate(8); + boolean secondDone = stream.sendRequestBody(second); + + assertThat(firstDone).isFalse(); + assertThat(first.position()).isEqualTo(payload.length); + assertThat(secondDone).isTrue(); + assertThat(second.position()).isZero(); + } + + @Test + void sendRequestBody_pipeInError_throwsRuntimeExceptionWithCause() { + BodyChunkPipe pipe = new BodyChunkPipe(2, 8); + IllegalStateException cause = new IllegalStateException("upstream broke"); + pipe.signalError(cause); + PipeBackedRequestBodyStream stream = new PipeBackedRequestBodyStream(pipe); + + assertThatThrownBy(() -> stream.sendRequestBody(ByteBuffer.allocate(8))) + .isInstanceOf(RuntimeException.class) + .hasMessageContaining("Producer failed") + .hasRootCauseMessage("upstream broke"); + } + + @Test + void sendRequestBody_pipeAborted_throwsRuntimeException() { + BodyChunkPipe pipe = new BodyChunkPipe(2, 8); + pipe.abort(); + PipeBackedRequestBodyStream stream = new PipeBackedRequestBodyStream(pipe); + + assertThatThrownBy(() -> stream.sendRequestBody(ByteBuffer.allocate(8))) + .isInstanceOf(RuntimeException.class) + .hasMessageContaining("aborted"); + } + + @Test + void resetPosition_returnsFalse() { + BodyChunkPipe pipe = new BodyChunkPipe(2, 8); + PipeBackedRequestBodyStream stream = new PipeBackedRequestBodyStream(pipe); + + assertThat(stream.resetPosition()).isFalse(); + } + + /** + * When CRT's destination buffer is smaller than the chunk size, draining a single chunk + * requires multiple {@code sendRequestBody} calls. This exercises {@link BodyChunkPipe#pollDrain}'s + * {@code pendingDrain} state being carried across consumer invocations. + */ + @Test + void sendRequestBody_destinationSmallerThanChunk_drainsAcrossMultipleCalls() throws Exception { + BodyChunkPipe pipe = new BodyChunkPipe(2, 16); + Chunk c = pipe.acquireForFill(); + byte[] payload = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}; + System.arraycopy(payload, 0, c.data(), 0, payload.length); + c.len(payload.length); + pipe.publish(c); + pipe.signalEof(); + PipeBackedRequestBodyStream stream = new PipeBackedRequestBodyStream(pipe); + + ByteBuffer first = ByteBuffer.allocate(3); + ByteBuffer second = ByteBuffer.allocate(3); + ByteBuffer third = ByteBuffer.allocate(3); + ByteBuffer fourth = ByteBuffer.allocate(3); + ByteBuffer fifth = ByteBuffer.allocate(3); + boolean firstDone = stream.sendRequestBody(first); + boolean secondDone = stream.sendRequestBody(second); + boolean thirdDone = stream.sendRequestBody(third); + boolean fourthDone = stream.sendRequestBody(fourth); + boolean fifthDone = stream.sendRequestBody(fifth); + + assertThat(firstDone).isFalse(); + assertThat(secondDone).isFalse(); + assertThat(thirdDone).isFalse(); + assertThat(fourthDone).isFalse(); + assertThat(fifthDone).isTrue(); + assertThat(first.position()).isEqualTo(3); + assertThat(second.position()).isEqualTo(3); + assertThat(third.position()).isEqualTo(3); + assertThat(fourth.position()).isEqualTo(1); + assertThat(fifth.position()).isZero(); + + byte[] reassembled = new byte[payload.length]; + first.flip(); + first.get(reassembled, 0, 3); + second.flip(); + second.get(reassembled, 3, 3); + third.flip(); + third.get(reassembled, 6, 3); + fourth.flip(); + fourth.get(reassembled, 9, 1); + assertThat(reassembled).containsExactly(payload); + } +} diff --git a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/request/SyncRequestBodyPumpTest.java b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/request/SyncRequestBodyPumpTest.java new file mode 100644 index 00000000000..21d0d4fa1ec --- /dev/null +++ b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/request/SyncRequestBodyPumpTest.java @@ -0,0 +1,239 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.http.crt.internal.request; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.LockSupport; +import org.junit.jupiter.api.Test; +import software.amazon.awssdk.http.ContentStreamProvider; +import software.amazon.awssdk.http.SdkHttpFullResponse; + +class SyncRequestBodyPumpTest { + + @Test + void pump_happyPath_consumerSeesAllProducerBytes() throws Exception { + byte[] payload = new byte[200]; + for (int i = 0; i < payload.length; i++) { + payload[i] = (byte) (i & 0xFF); + } + BodyChunkPipe pipe = new BodyChunkPipe(2, 32); + SyncRequestBodyPump pump = new SyncRequestBodyPump(ContentStreamProvider.fromByteArray(payload), pipe); + AtomicReference producerError = new AtomicReference<>(); + Thread producer = new Thread(() -> { + try { + pump.pump(); + } catch (Throwable t) { + producerError.set(t); + } + }, "pump-producer"); + + producer.start(); + byte[] consumed = drainAll(pipe, payload.length); + producer.join(5_000); + + assertThat(producerError.get()).isNull(); + assertThat(producer.isAlive()).isFalse(); + assertThat(consumed).containsExactly(payload); + assertThat(pipe.state()).isEqualTo(BodyChunkPipe.State.EOF); + } + + @Test + void pump_emptyStream_signalsEofWithoutPublish() throws Exception { + BodyChunkPipe pipe = new BodyChunkPipe(2, 16); + SyncRequestBodyPump pump = new SyncRequestBodyPump(ContentStreamProvider.fromByteArray(new byte[0]), pipe); + + pump.pump(); + + assertThat(pipe.state()).isEqualTo(BodyChunkPipe.State.EOF); + assertThat(pipe.pollDrain(ByteBuffer.allocate(8))).isEqualTo(-1); + } + + @Test + void pump_inputStreamThrowsIoException_pumpSignalsErrorAndRethrows() { + IOException ioe = new IOException("disk gone"); + BodyChunkPipe pipe = new BodyChunkPipe(2, 16); + ContentStreamProvider provider = () -> new InputStream() { + @Override + public int read() { + throw new UnsupportedOperationException(); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + throw ioe; + } + }; + SyncRequestBodyPump pump = new SyncRequestBodyPump(provider, pipe); + + assertThatThrownBy(pump::pump).isSameAs(ioe); + assertThat(pipe.state()).isEqualTo(BodyChunkPipe.State.ERROR); + assertThatThrownBy(() -> pipe.pollDrain(ByteBuffer.allocate(8))) + .hasMessageContaining("Producer failed") + .hasRootCauseMessage("disk gone"); + } + + @Test + void pump_abortedWhilePumping_returnsWithoutSignalingEof() throws Exception { + // pipe depth 1 + payload larger than chunk forces producer to block on second acquireForFill, + // giving the test thread a deterministic point to call abort(). + BodyChunkPipe pipe = new BodyChunkPipe(1, 8); + byte[] payload = new byte[64]; + SyncRequestBodyPump pump = new SyncRequestBodyPump(ContentStreamProvider.fromByteArray(payload), pipe); + AtomicReference producerError = new AtomicReference<>(); + Thread producer = new Thread(() -> { + try { + pump.pump(); + } catch (Throwable t) { + producerError.set(t); + } + }, "pump-producer"); + + producer.start(); + waitUntilStateIsOpenWithChunkInFlight(pipe); + pump.abort(); + producer.join(5_000); + + assertThat(producer.isAlive()).isFalse(); + assertThat(producerError.get()).isNull(); + assertThat(pipe.state()).isEqualTo(BodyChunkPipe.State.ABORTED); + } + + /** + * Regression test for the producer-livelock-on-CRT-failure path. + * + *

If CRT signals request failure (network error, idle/health timeout, etc.) while the + * producer is parked in {@link BodyChunkPipe#acquireForFill()}, nothing in the pipe's normal + * contract wakes it without a recycle/abort. The fix in {@code AwsCrtHttpClient.CrtHttpRequest.call()} + * registers a {@code responseFuture.whenComplete(...)} hook that calls {@code pump.abort()} + * when the response future completes exceptionally. This test reproduces that wiring + * at the unit level: a pump runs against a pipe with no consumer, the producer parks once the + * pipe is full, and we then complete a separate response future exceptionally with the same + * hook to verify the producer unblocks and {@code pump()} returns. + * + *

Without the hook (or equivalent abort path), {@code producer.join(5_000)} would time out + * and the test would fail. + */ + @Test + void pump_responseFutureFailsExceptionally_whileProducerParked_unblocksProducerViaAbortHook() throws Exception { + BodyChunkPipe pipe = new BodyChunkPipe(2, 8); + // Payload larger than depth*chunkSize forces the producer to park on acquireForFill once + // both chunks are sitting in the ready queue with no consumer draining. + byte[] payload = new byte[128]; + SyncRequestBodyPump pump = new SyncRequestBodyPump(ContentStreamProvider.fromByteArray(payload), pipe); + CompletableFuture responseFuture = new CompletableFuture<>(); + responseFuture.whenComplete((r, t) -> { + if (t != null) { + pump.abort(); + } + }); + + AtomicReference producerError = new AtomicReference<>(); + Thread producer = new Thread(() -> { + try { + pump.pump(); + } catch (Throwable t) { + producerError.set(t); + } + }, "pump-producer"); + + producer.start(); + waitUntilProducerIsParked(pipe); + responseFuture.completeExceptionally(new IOException("simulated CRT failure")); + producer.join(5_000); + + assertThat(producer.isAlive()).isFalse(); + assertThat(producerError.get()).isNull(); + assertThat(pipe.state()).isEqualTo(BodyChunkPipe.State.ABORTED); + } + + @Test + void abort_propagatesToPipe() { + BodyChunkPipe pipe = new BodyChunkPipe(2, 8); + SyncRequestBodyPump pump = new SyncRequestBodyPump( + ContentStreamProvider.fromByteArray(new byte[0]), pipe); + + pump.abort(); + + assertThat(pipe.state()).isEqualTo(BodyChunkPipe.State.ABORTED); + } + + private static byte[] drainAll(BodyChunkPipe pipe, int expected) { + byte[] out = new byte[expected]; + int written = 0; + long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5); + while (written < expected && System.nanoTime() < deadline) { + ByteBuffer scratch = ByteBuffer.allocate(Math.min(64, expected - written)); + int n = pipe.pollDrain(scratch); + if (n < 0) { + break; + } + if (n == 0) { + LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(1)); + continue; + } + scratch.flip(); + scratch.get(out, written, n); + written += n; + } + if (written < expected) { + throw new AssertionError("Drained only " + written + " of " + expected + " bytes"); + } + return out; + } + + private static void waitUntilStateIsOpenWithChunkInFlight(BodyChunkPipe pipe) throws InterruptedException { + long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5); + while (System.nanoTime() < deadline) { + if (pipe.allocatedForTest() >= 1) { + return; + } + Thread.sleep(1); + } + throw new AssertionError("Producer did not allocate a chunk within timeout"); + } + + /** + * Wait for the producer to park on {@code acquireForFill}. Detected by the pipe reaching its + * configured depth in allocations and then staying there for a couple of consecutive observations + * (the producer can't make further progress without a recycle). + */ + private static void waitUntilProducerIsParked(BodyChunkPipe pipe) throws InterruptedException { + long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5); + int stableObservations = 0; + int lastAllocated = -1; + while (System.nanoTime() < deadline) { + int allocated = pipe.allocatedForTest(); + if (allocated == lastAllocated && allocated > 0) { + if (++stableObservations >= 3) { + return; + } + } else { + stableObservations = 0; + lastAllocated = allocated; + } + Thread.sleep(20); + } + throw new AssertionError("Producer did not park within timeout"); + } +} diff --git a/services/s3/src/it/java/software/amazon/awssdk/services/s3/crthttpclient/S3WithCrtHttpClientsIntegrationTests.java b/services/s3/src/it/java/software/amazon/awssdk/services/s3/crthttpclient/S3WithCrtHttpClientsIntegrationTests.java index f1eb1c6e4f2..943635dd41f 100644 --- a/services/s3/src/it/java/software/amazon/awssdk/services/s3/crthttpclient/S3WithCrtHttpClientsIntegrationTests.java +++ b/services/s3/src/it/java/software/amazon/awssdk/services/s3/crthttpclient/S3WithCrtHttpClientsIntegrationTests.java @@ -16,12 +16,13 @@ package software.amazon.awssdk.services.s3.crthttpclient; import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively; import static software.amazon.awssdk.testutils.service.S3BucketUtils.temporaryBucketName; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; -import org.assertj.core.api.Assertions; +import java.time.Duration; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -32,6 +33,7 @@ import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.S3IntegrationTestBase; import software.amazon.awssdk.services.s3.model.GetObjectResponse; +import software.amazon.awssdk.services.s3.model.PutObjectResponse; import software.amazon.awssdk.services.s3.utils.ChecksumUtils; import software.amazon.awssdk.testutils.RandomTempFile; import software.amazon.awssdk.utils.Md5Utils; @@ -80,4 +82,22 @@ void getObject_toFile_objectSentCorrectly() throws Exception { assertThat(Md5Utils.md5AsBase64(destination.toFile())).isEqualTo(Md5Utils.md5AsBase64(testFile)); } + + @Test + void getObject_responseStreamPipedIntoPutObject_completesWithoutDeadlock() throws Exception { + String destinationKey = "piped-" + TEST_KEY; + try (ResponseInputStream sourceStream = + s3WithCrtHttpClient.getObject(r -> r.bucket(TEST_BUCKET).key(TEST_KEY), + ResponseTransformer.toInputStream())) { + long contentLength = sourceStream.response().contentLength(); + + PutObjectResponse putResponse = assertTimeoutPreemptively( + Duration.ofSeconds(120), + () -> s3WithCrtHttpClient.putObject( + r -> r.bucket(TEST_BUCKET).key(destinationKey).contentLength(contentLength), + RequestBody.fromInputStream(sourceStream, contentLength))); + + assertThat(putResponse.eTag()).isNotBlank(); + } + } } diff --git a/test/http-client-tests/src/main/java/software/amazon/awssdk/http/LongRunningRequestTestSupport.java b/test/http-client-tests/src/main/java/software/amazon/awssdk/http/LongRunningRequestTestSupport.java index 723869ad852..21d04a5c944 100644 --- a/test/http-client-tests/src/main/java/software/amazon/awssdk/http/LongRunningRequestTestSupport.java +++ b/test/http-client-tests/src/main/java/software/amazon/awssdk/http/LongRunningRequestTestSupport.java @@ -20,11 +20,21 @@ import static com.github.tomakehurst.wiremock.client.WireMock.urlPathEqualTo; import com.github.tomakehurst.wiremock.junit5.WireMockExtension; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.lang.management.ManagementFactory; +import java.lang.management.ThreadInfo; +import java.lang.management.ThreadMXBean; +import java.net.URI; +import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import software.amazon.awssdk.utils.Logger; /** * Shared helpers for the long-running request test suites. @@ -36,6 +46,8 @@ public final class LongRunningRequestTestSupport { public static final Duration TIME_BOUND_SAFETY_MARGIN = Duration.ofSeconds(10); public static final Duration HANG_DELAY = Duration.ofMinutes(1); + private static final Logger log = Logger.loggerFor(LongRunningRequestTestSupport.class); + private LongRunningRequestTestSupport() { } @@ -72,22 +84,117 @@ public static void stubHanging(WireMockExtension mockServer) { .withFixedDelay((int) HANG_DELAY.toMillis()))); } + /** + * Async-executes a POST against {@code mockServer} on a worker thread. A per-call random + * {@code testReqId} is logged BEFORE the request is dispatched and propagated to the SDK via the + * {@code x-aws-sdk-test-id} header so the SDK can prefix its lifecycle logs with the same id. + */ + public static TestRequestExecution executeAsync(SdkHttpClient client, WireMockExtension mockServer) { + String testReqId = "test-" + String.format("%08x", ThreadLocalRandom.current().nextInt()); + log.info(() -> "TEST REQUEST ID: " + testReqId + " (dispatching)"); + CompletableFuture future = CompletableFuture.supplyAsync(() -> { + executeRequest(client, mockServer, testReqId); + return null; + }); + return new TestRequestExecution(testReqId, future); + } + + private static void executeRequest(SdkHttpClient client, WireMockExtension mockServer, String testReqId) { + URI uri = URI.create("http://localhost:" + mockServer.getPort()); + SdkHttpFullRequest request = SdkHttpFullRequest.builder() + .uri(uri) + .method(SdkHttpMethod.POST) + .putHeader("Host", uri.getHost()) + .putHeader("Content-Length", "4") + .putHeader("x-aws-sdk-test-id", testReqId) + .contentStreamProvider(() -> new ByteArrayInputStream( + "Body".getBytes(StandardCharsets.UTF_8))) + .build(); + try { + HttpExecuteResponse response = client.prepareRequest(HttpExecuteRequest.builder() + .request(request) + .contentStreamProvider( + request.contentStreamProvider() + .orElse(null)) + .build()) + .call(); + response.responseBody().ifPresent(body -> { + try { + while (body.read() != -1) { + // drain body so mid-body timeouts surface + } + body.close(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + public static void assertFailsWithinTimeBound(TestRequestExecution execution, Duration expectedTimeout) { + assertFailsWithinTimeBound(execution.future(), execution.testReqId(), expectedTimeout); + } + public static void assertFailsWithinTimeBound(CompletableFuture future, Duration expectedTimeout) { + assertFailsWithinTimeBound(future, "(unknown)", expectedTimeout); + } + + private static void assertFailsWithinTimeBound(CompletableFuture future, String testReqId, Duration expectedTimeout) { Duration maxWait = expectedTimeout.plus(TIME_BOUND_SAFETY_MARGIN); try { future.get(maxWait.toMillis(), TimeUnit.MILLISECONDS); - throw new AssertionError("Expected request to throw an exception but it completed successfully"); + throw new AssertionError("Expected request " + testReqId + + " to throw an exception but it completed successfully"); } catch (TimeoutException e) { + // Bookend the thread dump with the test reqId so surefire output can be grep'd by request. + log.error(() -> "TEST REQUEST ID: " + testReqId + " (timed out, dumping threads)"); + log.error(() -> dumpAllThreads()); future.cancel(true); throw new AssertionError( - "Expected request to fail within " + maxWait + " but it was still running - client appears to hang", + "Expected request " + testReqId + " to fail within " + maxWait + + " but it was still running - client appears to hang", e); } catch (InterruptedException e) { Thread.currentThread().interrupt(); - throw new AssertionError("Unexpected interruption while waiting for request to fail", e); + throw new AssertionError("Unexpected interruption while waiting for request " + testReqId + " to fail", e); } catch (ExecutionException e) { // expected } } + + /** + * Bundles a worker future with the per-call test reqId so the assertion helper can reference it in + * failure messages. + */ + public static final class TestRequestExecution { + private final String testReqId; + private final CompletableFuture future; + + TestRequestExecution(String testReqId, CompletableFuture future) { + this.testReqId = testReqId; + this.future = future; + } + + public String testReqId() { + return testReqId; + } + + public CompletableFuture future() { + return future; + } + } + + static String dumpAllThreads() { + ThreadMXBean tmx = ManagementFactory.getThreadMXBean(); + ThreadInfo[] infos = tmx.dumpAllThreads(true, true); + StringBuilder sb = new StringBuilder("=== THREAD DUMP ===\n"); + for (ThreadInfo info : infos) { + sb.append(info.toString()).append('\n'); + } + sb.append("=== END THREAD DUMP ===\n"); + return sb.toString(); + } } diff --git a/test/http-client-tests/src/main/java/software/amazon/awssdk/http/SdkHttpClientLongRunningRequestTestSuite.java b/test/http-client-tests/src/main/java/software/amazon/awssdk/http/SdkHttpClientLongRunningRequestTestSuite.java index 0e52eb19eb4..4403db37a42 100644 --- a/test/http-client-tests/src/main/java/software/amazon/awssdk/http/SdkHttpClientLongRunningRequestTestSuite.java +++ b/test/http-client-tests/src/main/java/software/amazon/awssdk/http/SdkHttpClientLongRunningRequestTestSuite.java @@ -19,19 +19,15 @@ import static software.amazon.awssdk.http.LongRunningRequestTestSupport.CONFIGURED_TIMEOUT; import static software.amazon.awssdk.http.LongRunningRequestTestSupport.HANG_DELAY; import static software.amazon.awssdk.http.LongRunningRequestTestSupport.assertFailsWithinTimeBound; +import static software.amazon.awssdk.http.LongRunningRequestTestSupport.executeAsync; import static software.amazon.awssdk.http.LongRunningRequestTestSupport.stubHanging; import static software.amazon.awssdk.http.LongRunningRequestTestSupport.stubLongPolling; import static software.amazon.awssdk.http.LongRunningRequestTestSupport.stubStreamingWithPauses; import com.github.tomakehurst.wiremock.junit5.WireMockExtension; -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.UncheckedIOException; -import java.net.URI; -import java.nio.charset.StandardCharsets; -import java.util.concurrent.CompletableFuture; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import software.amazon.awssdk.http.LongRunningRequestTestSupport.TestRequestExecution; import software.amazon.awssdk.utils.AttributeMap; /** @@ -56,7 +52,7 @@ public void executeWhenReadTimeoutAndServerDelaysResponseFailsWithinTimeoutBound CONFIGURED_TIMEOUT) .build()); try { - assertFailsWithinTimeBound(executeAsync(client), CONFIGURED_TIMEOUT); + assertFailsWithinTimeBound(executeAsync(client, mockServer), CONFIGURED_TIMEOUT); } finally { client.close(); } @@ -71,7 +67,7 @@ public void executeWhenReadTimeoutAndStreamingResponsePausesFailsWithinTimeoutBo CONFIGURED_TIMEOUT) .build()); try { - assertFailsWithinTimeBound(executeAsync(client), CONFIGURED_TIMEOUT); + assertFailsWithinTimeBound(executeAsync(client, mockServer), CONFIGURED_TIMEOUT); } finally { client.close(); } @@ -89,54 +85,14 @@ public void executeWhenConnectionAcquireTimeoutAndPoolExhaustedFailsWithinTimeou CONFIGURED_TIMEOUT) .build()); try { - CompletableFuture firstRequest = executeAsync(client); + TestRequestExecution firstRequest = executeAsync(client, mockServer); Thread.sleep(500); - assertFailsWithinTimeBound(executeAsync(client), CONFIGURED_TIMEOUT); + assertFailsWithinTimeBound(executeAsync(client, mockServer), CONFIGURED_TIMEOUT); - firstRequest.cancel(true); + firstRequest.future().cancel(true); } finally { client.close(); } } - - private CompletableFuture executeAsync(SdkHttpClient client) { - return CompletableFuture.supplyAsync(() -> { - executeRequest(client); - return null; - }); - } - - private void executeRequest(SdkHttpClient client) { - URI uri = URI.create("http://localhost:" + mockServer.getPort()); - SdkHttpFullRequest request = SdkHttpFullRequest.builder() - .uri(uri) - .method(SdkHttpMethod.POST) - .putHeader("Host", uri.getHost()) - .putHeader("Content-Length", "4") - .contentStreamProvider(() -> new ByteArrayInputStream( - "Body".getBytes(StandardCharsets.UTF_8))) - .build(); - try { - HttpExecuteResponse response = client.prepareRequest(HttpExecuteRequest.builder() - .request(request) - .contentStreamProvider( - request.contentStreamProvider() - .orElse(null)) - .build()) - .call(); - response.responseBody().ifPresent(body -> { - try { - while (body.read() != -1) { - // drain body so mid-body timeouts surface - } - body.close(); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - }); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - } }