Skip to content

Commit

Permalink
8347597: HttpClient: improve exception reporting when closing connection
Browse files Browse the repository at this point in the history
  • Loading branch information
dfuch committed Jan 13, 2025
1 parent 0612636 commit 0be4389
Show file tree
Hide file tree
Showing 15 changed files with 182 additions and 122 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2015, 2023, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2015, 2025, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
Expand Down Expand Up @@ -72,7 +72,7 @@ public CompletableFuture<Void> finishConnect() {
if (ex == null) {
return plainConnection.finishConnect();
} else {
plainConnection.close();
plainConnection.close(ex);
return MinimalFuture.<Void>failedFuture(ex);
} })
.thenCompose(Function.identity());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2015, 2023, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2015, 2025, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
Expand Down Expand Up @@ -81,7 +81,7 @@ public CompletableFuture<Void> finishConnect() {
if (ex == null) {
return plainConnection.finishConnect();
} else {
plainConnection.close();
plainConnection.close(ex);
return MinimalFuture.<Void>failedFuture(ex);
} })
.thenCompose(Function.identity());
Expand Down
75 changes: 43 additions & 32 deletions src/java.net.http/share/classes/jdk/internal/net/http/Exchange.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2015, 2024, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2015, 2025, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
Expand Down Expand Up @@ -33,11 +33,13 @@
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.net.http.HttpClient;
import java.net.http.HttpResponse;
import java.net.http.HttpTimeoutException;

import jdk.internal.net.http.HttpClientImpl.DelegatingExecutor;
import jdk.internal.net.http.common.Logger;
import jdk.internal.net.http.common.MinimalFuture;
import jdk.internal.net.http.common.Utils;
Expand All @@ -63,9 +65,9 @@ final class Exchange<T> {

// used to record possible cancellation raised before the exchImpl
// has been established.
private volatile IOException failed;
private final AtomicReference<IOException> failed = new AtomicReference<>();
final MultiExchange<T> multi;
final Executor parentExecutor;
final DelegatingExecutor parentExecutor;
volatile boolean upgrading; // to HTTP/2
volatile boolean upgraded; // to HTTP/2
final PushGroup<T> pushGroup;
Expand All @@ -91,7 +93,7 @@ PushGroup<T> getPushGroup() {
return pushGroup;
}

Executor executor() {
DelegatingExecutor executor() {
return parentExecutor;
}

Expand Down Expand Up @@ -236,26 +238,26 @@ public void cancel(IOException cause) {
// If the impl is non null, propagate the exception right away.
// Otherwise record it so that it can be propagated once the
// exchange impl has been established.
ExchangeImpl<?> impl = exchImpl;
ExchangeImpl<?> impl;
IOException closeReason = null;
synchronized (this) {
impl = exchImpl;
if (impl == null) {
// no impl yet. record the exception
failed.compareAndSet(null, cause);
}
}
if (impl != null) {
// propagate the exception to the impl
if (debug.on()) debug.log("Cancelling exchImpl: %s", exchImpl);
impl.cancel(cause);
} else {
// no impl yet. record the exception
IOException failed = this.failed;
if (failed == null) {
synchronized (this) {
failed = this.failed;
if (failed == null) {
failed = this.failed = cause;
}
}
}

// abort/close the connection if setting up the exchange. This can
// abort/close the connection if setting up the exchange. This can
// be important when setting up HTTP/2
connectionAborter.closeConnection(failed);
closeReason = failed.get();
if (closeReason != null) {
connectionAborter.closeConnection(closeReason);
}

// now call checkCancelled to recheck the impl.
// if the failed state is set and the impl is not null, reset
Expand All @@ -274,9 +276,9 @@ private void checkCancelled() {
ExchangeImpl<?> impl = null;
IOException cause = null;
CompletableFuture<? extends ExchangeImpl<T>> cf = null;
if (failed != null) {
if (failed.get() != null) {
synchronized (this) {
cause = failed;
cause = failed.get();
impl = exchImpl;
cf = exchangeCF;
}
Expand All @@ -286,7 +288,11 @@ private void checkCancelled() {
// The exception is raised by propagating it to the impl.
if (debug.on()) debug.log("Cancelling exchImpl: %s", impl);
impl.cancel(cause);
failed = null;
synchronized (this) {
if (impl == exchImpl) {
failed.compareAndSet(cause, null);
}
}
} else {
Log.logTrace("Exchange: request [{0}/timeout={1}ms] no impl is set."
+ "\n\tCan''t cancel yet with {2}",
Expand All @@ -313,7 +319,7 @@ <U> CompletableFuture<U> checkCancelled(CompletableFuture<U> cf, HttpConnection
if (t == null) t = new IOException("Request cancelled");
if (debug.on()) debug.log("exchange cancelled during connect: " + t);
try {
connection.close();
connection.close(t);
} catch (Throwable x) {
if (debug.on()) debug.log("Failed to close connection", x);
}
Expand All @@ -330,8 +336,13 @@ public void h2Upgrade() {
request.setH2Upgrade(this);
}

synchronized IOException failed(IOException io) {
IOException cause = failed.compareAndExchange(null, io);
return cause == null ? io : cause;
}

synchronized IOException getCancelCause() {
return failed;
return failed.get();
}

// get/set the exchange impl, solving race condition issues with
Expand Down Expand Up @@ -409,6 +420,11 @@ private CompletableFuture<Response> checkFor407(ExchangeImpl<T> ex, Throwable t,
}
}

private CompletableFuture<Response> startSendingBody(DelegatingExecutor executor) {
return exchImpl.sendBodyAsync()
.thenCompose(exIm -> exIm.getResponseAsync(executor));
}

// After sending the request headers, if no ProxyAuthorizationRequired
// was raised and the expectContinue flag is on, we need to wait
// for the 100-Continue response
Expand All @@ -430,9 +446,7 @@ private CompletableFuture<Response> expectContinue(ExchangeImpl<T> ex) {
if (debug.on())
debug.log("Setting ExpectTimeoutRaised and sending request body");
exchImpl.setExpectTimeoutRaised();
CompletableFuture<Response> cf =
exchImpl.sendBodyAsync()
.thenCompose(exIm -> exIm.getResponseAsync(parentExecutor));
CompletableFuture<Response> cf = startSendingBody(parentExecutor);
cf = wrapForUpgrade(cf);
cf = wrapForLog(cf);
return cf;
Expand All @@ -444,9 +458,7 @@ private CompletableFuture<Response> expectContinue(ExchangeImpl<T> ex) {
nonFinalResponses.incrementAndGet();
Log.logTrace("Received 100-Continue: sending body");
if (debug.on()) debug.log("Received 100-Continue for %s", r1);
CompletableFuture<Response> cf =
exchImpl.sendBodyAsync()
.thenCompose(exIm -> exIm.getResponseAsync(parentExecutor));
CompletableFuture<Response> cf = startSendingBody(parentExecutor);
cf = wrapForUpgrade(cf);
cf = wrapForLog(cf);
return cf;
Expand All @@ -471,8 +483,7 @@ private CompletableFuture<Response> expectContinue(ExchangeImpl<T> ex) {
private CompletableFuture<Response> sendRequestBody(ExchangeImpl<T> ex) {
assert !request.expectContinue();
if (debug.on()) debug.log("sendRequestBody");
CompletableFuture<Response> cf = ex.sendBodyAsync()
.thenCompose(exIm -> exIm.getResponseAsync(parentExecutor));
CompletableFuture<Response> cf = startSendingBody(parentExecutor);
cf = wrapForUpgrade(cf);
// after 101 is handled we check for other 1xx responses
cf = cf.thenCompose(this::ignore1xxResponse);
Expand Down Expand Up @@ -669,7 +680,7 @@ HttpResponse.BodySubscriber<T> ignoreBody(HttpResponse.ResponseInfo hdrs) {
// Either way, we need to relay it to s.
synchronized (this) {
exchImpl = s;
t = failed;
t = failed.get();
}
// Check whether the HTTP/1.1 was cancelled.
if (t == null) t = e.getCancelCause();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2023, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2017, 2025, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
Expand Down Expand Up @@ -259,8 +259,10 @@ private void flush() {
checkRequestMore();

} catch (Throwable t) {
Throwable x = error;
if (x == null) error = t; // will be handled in the finally block
synchronized (this) {
Throwable x = error;
if (x == null) error = t; // will be handled in the finally block
}
if (debug.on()) debug.log("Unexpected error caught in flush()", t);
} finally {
// Handles any pending error.
Expand Down Expand Up @@ -312,7 +314,7 @@ private void checkForErrors() {
// close the upstream connection.
Http1Exchange<?> exchg = owner;
stop();
if (exchg != null) exchg.connection().close();
if (exchg != null) exchg.connection().close(x);
}
}
}
Expand Down Expand Up @@ -346,7 +348,7 @@ private boolean hasDemand(Http1AsyncDelegate delegate) {
AbstractSubscription subscription = delegate.subscription();
long demand = subscription.demand().get();
if (debug.on())
debug.log("downstream subscription demand is %s", demand);
debug.log("downstream subscription demand is %s for %s", demand, delegate);
return demand > 0;
}

Expand Down Expand Up @@ -573,7 +575,7 @@ void requestMore() {
if (canRequestMore.compareAndSet(true, false)) {
if (!completed && !dropped) {
if (debug.on())
debug.log("Http1TubeSubscriber: requesting one more from upstream");
debug.log("Http1TubeSubscriber: requesting one more from upstream: " + s);
s.request(1);
return;
}
Expand Down
Loading

0 comments on commit 0be4389

Please sign in to comment.