Skip to content
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"type": "bugfix",
"category": "AWS CRT-based HTTP Client",
"contributor": "",
"description": "Fixed an issue where AwsCrtHttpClient (sync) could deadlock when a request body was sourced from an InputStream that depends on the same CRT event loop, for example when piping a GetObject ResponseInputStream into a PutObject body. The InputStream read now happens on the caller thread instead of the CRT event-loop thread."
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,9 @@
<!-- ResponseHandlerHelper has helper method closeConnection() which handles safe closing of connection -->
<suppress id="NoCrtStreamCancel"
files=".*ResponseHandlerHelper\.java$"/>

<!-- LongRunningRequestTestSupport uses java.lang.management on hang to capture a thread dump
that surfaces the failure cause in CI logs. -->
<suppress checks="software.amazon.awssdk.buildtools.checkstyle.NonJavaBaseModuleCheck"
files=".*LongRunningRequestTestSupport\.java$"/>
</suppressions>
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,15 @@

<Class name="~software\.amazon\.awssdk\.messagemanager\.sns\.internal\.SnsHostProvider" />

<!-- BodyChunkPipe is the producer/consumer pipe for the sync CRT client; producer-side
acquireForFill is intentionally blocking on back-pressure and only ever runs on the
caller (sync) thread, never on the CRT event loop. -->
<Class name="~software\.amazon\.awssdk\.http\.crt\.internal\.request\.BodyChunkPipe" />

<!-- CrtHttpRequest.waitForStreamAcquired blocks the caller (sync) thread on the stream
acquisition future with a hard timeout; never runs on the CRT event loop. -->
<Class name="~software\.amazon\.awssdk\.http\.crt\.AwsCrtHttpClient\$CrtHttpRequest" />

<!-- test modules are allowed to make blocking call as parts of their testing -->
<Class name="~.*testutils.*" />
<Class name="~.*s3benchmarks.*" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,14 @@
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import software.amazon.awssdk.annotations.SdkPublicApi;
import software.amazon.awssdk.crt.http.HttpException;
import software.amazon.awssdk.crt.http.HttpStreamBase;
import software.amazon.awssdk.crt.http.HttpStreamManager;
import software.amazon.awssdk.http.ExecutableHttpRequest;
import software.amazon.awssdk.http.HttpExecuteRequest;
Expand All @@ -35,8 +40,10 @@
import software.amazon.awssdk.http.crt.internal.AwsCrtClientBuilderBase;
import software.amazon.awssdk.http.crt.internal.CrtRequestContext;
import software.amazon.awssdk.http.crt.internal.CrtRequestExecutor;
import software.amazon.awssdk.http.crt.internal.request.SyncRequestBodyPump;
import software.amazon.awssdk.utils.AttributeMap;
import software.amazon.awssdk.utils.CompletableFutureUtils;
import software.amazon.awssdk.utils.Logger;

/**
* An implementation of {@link SdkHttpClient} that uses the AWS Common Runtime (CRT) Http Client to communicate with
Expand Down Expand Up @@ -98,43 +105,103 @@ public ExecutableHttpRequest prepareRequest(HttpExecuteRequest request) {
* request)
*/
HttpStreamManager streamManager = getOrCreateConnectionPool(poolKey(request.httpRequest()));
// Tests may override via x-aws-sdk-test-id so surefire output can be grep'd by request.
String reqId = request.httpRequest()
.firstMatchingHeader("x-aws-sdk-test-id")
.orElseGet(() -> String.format("%08x", ThreadLocalRandom.current().nextInt()));
CrtRequestContext context = CrtRequestContext.builder()
.streamManager(streamManager)
.readBufferSize(this.readBufferSize)
.request(request)
.connectionAcquisitionTimeoutMillis(this.connectionAcquisitionTimeout)
.reqId(reqId)
.build();
return new CrtHttpRequest(context);
}

private static final class CrtHttpRequest implements ExecutableHttpRequest {
private static final Logger LOG = Logger.loggerFor(CrtHttpRequest.class);

private final CrtRequestContext context;
private final String reqId;
private final String tag;
private volatile CompletableFuture<SdkHttpFullResponse> responseFuture;
private volatile SyncRequestBodyPump pump;

private CrtHttpRequest(CrtRequestContext context) {
this.context = context;
this.reqId = context.reqId();
this.tag = "[reqId=" + reqId + "] ";
}

@Override
public HttpExecuteResponse call() throws IOException {
HttpExecuteResponse.Builder builder = HttpExecuteResponse.builder();
boolean hasBody = context.sdkRequest().contentStreamProvider().isPresent();
LOG.info(() -> tag + "call() entered, hasBody=" + hasBody);

try {
responseFuture = new CrtRequestExecutor().execute(context);
CrtRequestExecutor.Result result = new CrtRequestExecutor().execute(context);
responseFuture = result.responseFuture();
pump = result.pump();
LOG.info(() -> tag + "call() executor.execute() returned, streamFuture pending, pump="
+ (pump != null ? "non-null" : "null"));

if (pump != null) {
SyncRequestBodyPump pumpRef = pump;
responseFuture.whenComplete((r, t) -> {
if (t != null) {
LOG.info(() -> tag + "responseFuture hook: invoking pump.abort() (cause="
+ t.getClass().getSimpleName() + ")");
pumpRef.abort();
}
});
}

LOG.info(() -> tag + "call() entering waitForStreamAcquired, timeoutMillis="
+ context.connectionAcquisitionTimeoutMillis());
boolean streamAcquired = waitForStreamAcquired(result.streamFuture(),
context.connectionAcquisitionTimeoutMillis());
LOG.info(() -> tag + "call() waitForStreamAcquired returned " + streamAcquired);

if (pump != null) {
if (streamAcquired) {
LOG.info(() -> tag + "call() entering pump.pump()");
try {
pump.pump();
LOG.info(() -> tag + "call() pump.pump() returned");
} catch (IOException ioe) {
LOG.info(() -> tag + "call() pump.pump() threw IOException: " + ioe.getMessage());
responseFuture.completeExceptionally(ioe);
throw ioe;
}
} else {
LOG.info(() -> tag + "call() invoking pump.abort() (post-wait, streamAcquired=false)");
pump.abort();
}
}

LOG.info(() -> tag + "call() entering joinInterruptibly(responseFuture)");
SdkHttpFullResponse response = CompletableFutureUtils.joinInterruptibly(responseFuture);
LOG.info(() -> tag + "call() responseFuture joined: success");
builder.response(response);
builder.responseBody(response.content().orElse(null));
LOG.info(() -> tag + "call() exiting normally");
return builder.build();
} catch (CompletionException e) {
Throwable cause = e.getCause();
LOG.info(() -> tag + "call() catch CompletionException, cause="
+ (cause == null ? "<null>" : cause.getClass().getName() + ": " + cause.getMessage()));

// Complete the future exceptionally to trigger connection cleanup in the response handler.
// Handles thread-interrupt case where joinInterruptibly throws due to
// InterruptedException. Without this, the
// Ensures that closeConnection() is invoked to prevent leaking the connection from the pool.
if (responseFuture != null) {
responseFuture.completeExceptionally(cause != null ? cause : e);
}

if (pump != null) {
LOG.info(() -> tag + "call() catch invoking pump.abort()");
pump.abort();
}

if (cause instanceof IOException) {
throw (IOException) cause;
}
Expand All @@ -153,9 +220,40 @@ public HttpExecuteResponse call() throws IOException {

@Override
public void abort() {
LOG.info(() -> tag + "abort() called externally");
if (responseFuture != null) {
responseFuture.completeExceptionally(new IOException("Request was cancelled"));
}
if (pump != null) {
LOG.info(() -> tag + "abort() invoking pump.abort()");
pump.abort();
}
}

private boolean waitForStreamAcquired(CompletableFuture<HttpStreamBase> streamFuture, long timeoutMillis) {
if (streamFuture == null) {
LOG.info(() -> tag + "waitForStreamAcquired: streamFuture==null, returning false");
return false;
}
LOG.info(() -> tag + "waitForStreamAcquired: starting, timeout=" + timeoutMillis + "ms");
try {
streamFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
LOG.info(() -> tag + "waitForStreamAcquired: streamFuture completed normally");
return true;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.info(() -> tag + "waitForStreamAcquired: interrupted");
return false;
} catch (TimeoutException e) {
LOG.warn(() -> tag + "waitForStreamAcquired: timed out after " + timeoutMillis
+ "ms - streamFuture still pending");
return false;
} catch (ExecutionException e) {
Throwable cause = e.getCause();
LOG.info(() -> tag + "waitForStreamAcquired: streamFuture completed exceptionally: "
+ (cause == null ? e.getMessage() : cause.getClass().getName() + ": " + cause.getMessage()));
return false;
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ abstract class AwsCrtHttpClientBase implements SdkAutoCloseable {

protected final long readBufferSize;
protected final Protocol protocol;
protected final long connectionAcquisitionTimeout;
private final Map<URI, HttpStreamManager> connectionPools = new ConcurrentHashMap<>();
private final LinkedList<CrtResource> ownedSubResources = new LinkedList<>();
private final ClientBootstrap bootstrap;
Expand All @@ -70,7 +71,6 @@ abstract class AwsCrtHttpClientBase implements SdkAutoCloseable {
private final HttpMonitoringOptions monitoringOptions;
private final long maxConnectionIdleInMilliseconds;
private final int maxStreamsPerEndpoint;
private final long connectionAcquisitionTimeout;
private final TlsContextOptions tlsContextOptions;
private boolean isClosed = false;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,9 @@ private void doExecute(CrtAsyncRequestContext executionContext,
long finalAcquireStartTime = acquireStartTime;

streamFuture.whenComplete((stream, throwable) -> {
crtResponseHandler.onAcquireStream(stream);
if (throwable == null) {
crtResponseHandler.onAcquireStream(stream);
}
if (shouldPublishMetrics) {
reportMetrics(executionContext.streamManager(), metricCollector, finalAcquireStartTime);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,16 @@ public final class CrtRequestContext {
private final long readBufferSize;
private final HttpStreamManager streamManager;
private final MetricCollector metricCollector;
private final long connectionAcquisitionTimeoutMillis;
private final String reqId;

private CrtRequestContext(Builder builder) {
this.request = builder.request;
this.readBufferSize = builder.readBufferSize;
this.streamManager = builder.streamManager;
this.metricCollector = request.metricCollector().orElse(null);
this.connectionAcquisitionTimeoutMillis = builder.connectionAcquisitionTimeoutMillis;
this.reqId = builder.reqId;
}

public static Builder builder() {
Expand All @@ -54,10 +58,20 @@ public MetricCollector metricCollector() {
return metricCollector;
}

public long connectionAcquisitionTimeoutMillis() {
return connectionAcquisitionTimeoutMillis;
}

public String reqId() {
return reqId;
}

public static final class Builder {
private HttpExecuteRequest request;
private long readBufferSize;
private HttpStreamManager streamManager;
private long connectionAcquisitionTimeoutMillis;
private String reqId;

private Builder() {
}
Expand All @@ -77,6 +91,16 @@ public Builder streamManager(HttpStreamManager streamManager) {
return this;
}

public Builder connectionAcquisitionTimeoutMillis(long connectionAcquisitionTimeoutMillis) {
this.connectionAcquisitionTimeoutMillis = connectionAcquisitionTimeoutMillis;
return this;
}

public Builder reqId(String reqId) {
this.reqId = reqId;
return this;
}

public CrtRequestContext build() {
return new CrtRequestContext(this);
}
Expand Down
Loading
Loading