Skip to content
Open
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"type": "bugfix",
"category": "AWS CRT-based HTTP Client",
"contributor": "",
"description": "Fixed an issue where AwsCrtHttpClient (sync) could deadlock when a request body was sourced from an InputStream that depends on the same CRT event loop, for example when piping a GetObject ResponseInputStream into a PutObject body. The InputStream read now happens on the caller thread instead of the CRT event-loop thread."
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,9 @@
<!-- ResponseHandlerHelper has helper method closeConnection() which handles safe closing of connection -->
<suppress id="NoCrtStreamCancel"
files=".*ResponseHandlerHelper\.java$"/>

<!-- LongRunningRequestTestSupport uses java.lang.management on hang to capture a thread dump
that surfaces the failure cause in CI logs. -->
<suppress checks="software.amazon.awssdk.buildtools.checkstyle.NonJavaBaseModuleCheck"
files=".*LongRunningRequestTestSupport\.java$"/>
</suppressions>
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,15 @@

<Class name="~software\.amazon\.awssdk\.messagemanager\.sns\.internal\.SnsHostProvider" />

<!-- BodyChunkPipe is the producer/consumer pipe for the sync CRT client; producer-side
acquireForFill is intentionally blocking on back-pressure and only ever runs on the
caller (sync) thread, never on the CRT event loop. -->
<Class name="~software\.amazon\.awssdk\.http\.crt\.internal\.request\.BodyChunkPipe" />

<!-- CrtHttpRequest.waitForStreamAcquired blocks the caller (sync) thread on the stream
acquisition future with a hard timeout; never runs on the CRT event loop. -->
<Class name="~software\.amazon\.awssdk\.http\.crt\.AwsCrtHttpClient\$CrtHttpRequest" />

<!-- test modules are allowed to make blocking calls as part of their testing -->
<Class name="~.*testutils.*" />
<Class name="~.*s3benchmarks.*" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,14 @@
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import software.amazon.awssdk.annotations.SdkPublicApi;
import software.amazon.awssdk.crt.http.HttpException;
import software.amazon.awssdk.crt.http.HttpStreamBase;
import software.amazon.awssdk.crt.http.HttpStreamManager;
import software.amazon.awssdk.http.ExecutableHttpRequest;
import software.amazon.awssdk.http.HttpExecuteRequest;
Expand All @@ -35,8 +40,10 @@
import software.amazon.awssdk.http.crt.internal.AwsCrtClientBuilderBase;
import software.amazon.awssdk.http.crt.internal.CrtRequestContext;
import software.amazon.awssdk.http.crt.internal.CrtRequestExecutor;
import software.amazon.awssdk.http.crt.internal.request.SyncRequestBodyPump;
import software.amazon.awssdk.utils.AttributeMap;
import software.amazon.awssdk.utils.CompletableFutureUtils;
import software.amazon.awssdk.utils.Logger;

/**
* An implementation of {@link SdkHttpClient} that uses the AWS Common Runtime (CRT) Http Client to communicate with
Expand Down Expand Up @@ -98,43 +105,103 @@ public ExecutableHttpRequest prepareRequest(HttpExecuteRequest request) {
* request)
*/
HttpStreamManager streamManager = getOrCreateConnectionPool(poolKey(request.httpRequest()));
// Tests may override via x-aws-sdk-test-id so surefire output can be grep'd by request.
String reqId = request.httpRequest()
.firstMatchingHeader("x-aws-sdk-test-id")
.orElseGet(() -> String.format("%08x", ThreadLocalRandom.current().nextInt()));
CrtRequestContext context = CrtRequestContext.builder()
.streamManager(streamManager)
.readBufferSize(this.readBufferSize)
.request(request)
.connectionAcquisitionTimeoutMillis(this.connectionAcquisitionTimeout)
.reqId(reqId)
.build();
return new CrtHttpRequest(context);
}

private static final class CrtHttpRequest implements ExecutableHttpRequest {
private static final Logger LOG = Logger.loggerFor(CrtHttpRequest.class);

private final CrtRequestContext context;
private final String reqId;
private final String tag;
private volatile CompletableFuture<SdkHttpFullResponse> responseFuture;
private volatile SyncRequestBodyPump pump;

/**
 * Creates a request bound to the given context and precomputes the log prefix
 * ({@code "[reqId=<id>] "}) that is prepended to every message this request logs.
 *
 * @param context the per-request context; its {@code reqId()} is read once here
 */
private CrtHttpRequest(CrtRequestContext context) {
    String requestId = context.reqId();
    this.context = context;
    this.reqId = requestId;
    this.tag = "[reqId=" + requestId + "] ";
}

@Override
public HttpExecuteResponse call() throws IOException {
HttpExecuteResponse.Builder builder = HttpExecuteResponse.builder();
boolean hasBody = context.sdkRequest().contentStreamProvider().isPresent();
LOG.info(() -> tag + "call() entered, hasBody=" + hasBody);

try {
responseFuture = new CrtRequestExecutor().execute(context);
CrtRequestExecutor.Result result = new CrtRequestExecutor().execute(context);
responseFuture = result.responseFuture();
pump = result.pump();
LOG.info(() -> tag + "call() executor.execute() returned, streamFuture pending, pump="
+ (pump != null ? "non-null" : "null"));

if (pump != null) {
SyncRequestBodyPump pumpRef = pump;
responseFuture.whenComplete((r, t) -> {
if (t != null) {
LOG.info(() -> tag + "responseFuture hook: invoking pump.abort() (cause="
+ t.getClass().getSimpleName() + ")");
pumpRef.abort();
}
});
}

LOG.info(() -> tag + "call() entering waitForStreamAcquired, timeoutMillis="
+ context.connectionAcquisitionTimeoutMillis());
boolean streamAcquired = waitForStreamAcquired(result.streamFuture(),
context.connectionAcquisitionTimeoutMillis());
LOG.info(() -> tag + "call() waitForStreamAcquired returned " + streamAcquired);

if (pump != null) {
if (streamAcquired) {
LOG.info(() -> tag + "call() entering pump.pump()");
try {
pump.pump();
LOG.info(() -> tag + "call() pump.pump() returned");
} catch (IOException ioe) {
LOG.info(() -> tag + "call() pump.pump() threw IOException: " + ioe.getMessage());
responseFuture.completeExceptionally(ioe);
throw ioe;
}
} else {
LOG.info(() -> tag + "call() invoking pump.abort() (post-wait, streamAcquired=false)");
pump.abort();
}
}

LOG.info(() -> tag + "call() entering joinInterruptibly(responseFuture)");
SdkHttpFullResponse response = CompletableFutureUtils.joinInterruptibly(responseFuture);
LOG.info(() -> tag + "call() responseFuture joined: success");
builder.response(response);
builder.responseBody(response.content().orElse(null));
LOG.info(() -> tag + "call() exiting normally");
return builder.build();
} catch (CompletionException e) {
Throwable cause = e.getCause();
LOG.info(() -> tag + "call() catch CompletionException, cause="
+ (cause == null ? "<null>" : cause.getClass().getName() + ": " + cause.getMessage()));

// Complete the future exceptionally to trigger connection cleanup in the response handler.
// This handles the thread-interrupt case, where joinInterruptibly throws because of an
// InterruptedException, and ensures that closeConnection() is invoked so the connection
// is not leaked from the pool.
if (responseFuture != null) {
responseFuture.completeExceptionally(cause != null ? cause : e);
}

if (pump != null) {
LOG.info(() -> tag + "call() catch invoking pump.abort()");
pump.abort();
}

if (cause instanceof IOException) {
throw (IOException) cause;
}
Expand All @@ -153,9 +220,40 @@ public HttpExecuteResponse call() throws IOException {

/**
 * Cancels this request. If a response future exists it is completed exceptionally with an
 * {@link IOException}, and if a body pump exists it is aborted. Either step is skipped when
 * the corresponding field has not been assigned yet.
 */
@Override
public void abort() {
    LOG.info(() -> tag + "abort() called externally");

    // Snapshot each volatile field once so the null check and the call use the same reference.
    CompletableFuture<SdkHttpFullResponse> pendingResponse = responseFuture;
    if (pendingResponse != null) {
        pendingResponse.completeExceptionally(new IOException("Request was cancelled"));
    }

    SyncRequestBodyPump bodyPump = pump;
    if (bodyPump != null) {
        LOG.info(() -> tag + "abort() invoking pump.abort()");
        bodyPump.abort();
    }
}

/**
 * Blocks the calling thread until the stream acquisition future completes or the timeout elapses.
 *
 * <p>Every failure mode is reported as {@code false} rather than thrown, so the caller can take a
 * single "stream not acquired" path. If the wait is interrupted, the thread's interrupt flag is
 * restored before returning.
 *
 * @param streamFuture future tracking stream acquisition; may be {@code null}
 * @param timeoutMillis maximum time to wait, in milliseconds
 * @return {@code true} only if the future completed normally within the timeout; {@code false} if the
 *         future was {@code null}, timed out, completed exceptionally, was cancelled, or the wait was
 *         interrupted
 */
private boolean waitForStreamAcquired(CompletableFuture<HttpStreamBase> streamFuture, long timeoutMillis) {
    if (streamFuture == null) {
        LOG.info(() -> tag + "waitForStreamAcquired: streamFuture==null, returning false");
        return false;
    }
    LOG.info(() -> tag + "waitForStreamAcquired: starting, timeout=" + timeoutMillis + "ms");
    try {
        streamFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
        LOG.info(() -> tag + "waitForStreamAcquired: streamFuture completed normally");
        return true;
    } catch (InterruptedException e) {
        // Restore the flag so callers further up the stack can still observe the interrupt.
        Thread.currentThread().interrupt();
        LOG.info(() -> tag + "waitForStreamAcquired: interrupted");
        return false;
    } catch (TimeoutException e) {
        LOG.warn(() -> tag + "waitForStreamAcquired: timed out after " + timeoutMillis
                       + "ms - streamFuture still pending");
        return false;
    } catch (ExecutionException e) {
        Throwable cause = e.getCause();
        LOG.info(() -> tag + "waitForStreamAcquired: streamFuture completed exceptionally: "
                       + (cause == null ? e.getMessage() : cause.getClass().getName() + ": " + cause.getMessage()));
        return false;
    } catch (java.util.concurrent.CancellationException e) {
        // get() signals a cancelled future with this unchecked exception rather than an
        // ExecutionException; report it as a failed acquisition like the other failure paths
        // instead of letting it escape to the caller.
        LOG.info(() -> tag + "waitForStreamAcquired: streamFuture was cancelled");
        return false;
    }
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ abstract class AwsCrtHttpClientBase implements SdkAutoCloseable {

protected final long readBufferSize;
protected final Protocol protocol;
protected final long connectionAcquisitionTimeout;
private final Map<URI, HttpStreamManager> connectionPools = new ConcurrentHashMap<>();
private final LinkedList<CrtResource> ownedSubResources = new LinkedList<>();
private final ClientBootstrap bootstrap;
Expand All @@ -70,7 +71,6 @@ abstract class AwsCrtHttpClientBase implements SdkAutoCloseable {
private final HttpMonitoringOptions monitoringOptions;
private final long maxConnectionIdleInMilliseconds;
private final int maxStreamsPerEndpoint;
private final long connectionAcquisitionTimeout;
private final TlsContextOptions tlsContextOptions;
private boolean isClosed = false;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,9 @@ private void doExecute(CrtAsyncRequestContext executionContext,
long finalAcquireStartTime = acquireStartTime;

streamFuture.whenComplete((stream, throwable) -> {
crtResponseHandler.onAcquireStream(stream);
if (throwable == null) {
crtResponseHandler.onAcquireStream(stream);
}
if (shouldPublishMetrics) {
reportMetrics(executionContext.streamManager(), metricCollector, finalAcquireStartTime);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,16 @@ public final class CrtRequestContext {
private final long readBufferSize;
private final HttpStreamManager streamManager;
private final MetricCollector metricCollector;
private final long connectionAcquisitionTimeoutMillis;
private final String reqId;

/**
 * Copies the builder's settings into this context. The metric collector is taken from the
 * request when it carries one; otherwise it is left {@code null}.
 *
 * @param builder the builder holding the values to copy
 */
private CrtRequestContext(Builder builder) {
    HttpExecuteRequest executeRequest = builder.request;
    this.request = executeRequest;
    this.metricCollector = executeRequest.metricCollector().orElse(null);
    this.streamManager = builder.streamManager;
    this.readBufferSize = builder.readBufferSize;
    this.connectionAcquisitionTimeoutMillis = builder.connectionAcquisitionTimeoutMillis;
    this.reqId = builder.reqId;
}

public static Builder builder() {
Expand All @@ -54,10 +58,20 @@ public MetricCollector metricCollector() {
return metricCollector;
}

/**
 * @return the connection acquisition timeout, in milliseconds, that was supplied to the builder
 */
public long connectionAcquisitionTimeoutMillis() {
return connectionAcquisitionTimeoutMillis;
}

/**
 * @return the request identifier that was supplied to the builder, or {@code null} if none was set
 */
public String reqId() {
return reqId;
}

public static final class Builder {
private HttpExecuteRequest request;
private long readBufferSize;
private HttpStreamManager streamManager;
private long connectionAcquisitionTimeoutMillis;
private String reqId;

private Builder() {
}
Expand All @@ -77,6 +91,16 @@ public Builder streamManager(HttpStreamManager streamManager) {
return this;
}

/**
 * Sets the connection acquisition timeout.
 *
 * @param connectionAcquisitionTimeoutMillis the timeout, in milliseconds
 * @return this builder, for chaining
 */
public Builder connectionAcquisitionTimeoutMillis(long connectionAcquisitionTimeoutMillis) {
this.connectionAcquisitionTimeoutMillis = connectionAcquisitionTimeoutMillis;
return this;
}

/**
 * Sets the identifier carried by the built context for this request.
 *
 * @param reqId the request identifier
 * @return this builder, for chaining
 */
public Builder reqId(String reqId) {
this.reqId = reqId;
return this;
}

/**
 * @return a new {@link CrtRequestContext} populated from this builder's current values
 */
public CrtRequestContext build() {
return new CrtRequestContext(this);
}
Expand Down
Loading
Loading