Skip to content

Commit

Permalink
8336655
Browse files Browse the repository at this point in the history
  • Loading branch information
dfuch committed Aug 13, 2024
1 parent 877fd5a commit eb1e350
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 11 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2015, 2023, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2015, 2024, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
Expand Down Expand Up @@ -44,6 +44,7 @@

import jdk.internal.net.http.common.Deadline;
import jdk.internal.net.http.common.FlowTube;
import jdk.internal.net.http.common.Log;
import jdk.internal.net.http.common.Logger;
import jdk.internal.net.http.common.TimeLine;
import jdk.internal.net.http.common.TimeSource;
Expand Down Expand Up @@ -492,13 +493,13 @@ void clear() {

// Remove a connection from the pool.
// should only be called while holding the ConnectionPool stateLock.
private void removeFromPool(HttpConnection c) {
private boolean removeFromPool(HttpConnection c) {
assert stateLock.isHeldByCurrentThread();
if (c instanceof PlainHttpConnection) {
removeFromPool(c, plainPool);
return removeFromPool(c, plainPool);
} else {
assert c.isSecure() : "connection " + c + " is not secure!";
removeFromPool(c, sslPool);
return removeFromPool(c, sslPool);
}
}

Expand Down Expand Up @@ -529,13 +530,29 @@ void cleanup(HttpConnection c, Throwable error) {
debug.log("%s : ConnectionPool.cleanup(%s)",
String.valueOf(c.getConnectionFlow()), error);
stateLock.lock();
boolean removed;
try {
removeFromPool(c);
removed = removeFromPool(c);
expiryList.remove(c);
} finally {
stateLock.unlock();
}
c.close();
if (!removed) {
// this should not happen; the cleanup may have consumed
// some data that wasn't supposed to be consumed, so
// the only thing we can do is log it and close the
// connection.
if (Log.errors()) {
Log.logError("WARNING: CleanupTrigger triggered for" +
" a connection not found in the pool: closing {0}", c);
} else if (debug.on()) {
debug.log("WARNING: CleanupTrigger triggered for" +
" a connection not found in the pool: closing %s", c);
}
c.close(new IOException("Unexpected cleanup triggered for non pooled connection"));
} else {
c.close();
}
}

/**
Expand All @@ -549,6 +566,7 @@ private final class CleanupTrigger implements

private final HttpConnection connection;
private volatile boolean done;
private volatile boolean dropped;

public CleanupTrigger(HttpConnection connection) {
this.connection = connection;
Expand All @@ -566,6 +584,7 @@ private void triggerCleanup(Throwable error) {

@Override
public void onSubscribe(Flow.Subscription subscription) {
if (dropped || done) return;
subscription.request(1);
}
@Override
Expand All @@ -586,5 +605,10 @@ public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
public String toString() {
return "CleanupTrigger(" + connection.getConnectionFlow() + ")";
}

@Override
public void dropSubscription() {
dropped = true;
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2023, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2017, 2024, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
Expand Down Expand Up @@ -573,6 +573,8 @@ public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> s) {
debug.log("read publisher: dropping pending subscriber: "
+ previous.subscriber);
previous.errorRef.compareAndSet(null, errorRef.get());
// make sure no data will be routed to the old subscriber.
previous.stopReading();
previous.signalOnSubscribe();
if (subscriptionImpl.completed) {
previous.signalCompletion();
Expand Down Expand Up @@ -606,6 +608,7 @@ final class ReadSubscription implements Flow.Subscription {
volatile boolean subscribed;
volatile boolean cancelled;
volatile boolean completed;
volatile boolean stopped;

public ReadSubscription(InternalReadSubscription impl,
TubeSubscriber subscriber) {
Expand All @@ -623,11 +626,11 @@ public void cancel() {

@Override
public void request(long n) {
if (!cancelled) {
if (!cancelled && !stopped) {
impl.request(n);
} else {
if (debug.on())
debug.log("subscription cancelled, ignoring request %d", n);
debug.log("subscription stopped or cancelled, ignoring request %d", n);
}
}

Expand Down Expand Up @@ -661,6 +664,20 @@ void signalOnSubscribe() {
signalCompletion();
}
}

/**
* Called when switching subscriber on the {@link InternalReadSubscription}.
* This subscriber is the old subscriber. Demand on the internal
* subscription will be reset and reading will be paused until the
* new subscriber is subscribed.
* This should ensure that no data is routed to this subscriber
* until the new subscriber is subscribed.
*/
void stopReading() {
stopped = true;
impl.demand.reset();
impl.pauseReadEvent();
}
}

final class InternalReadSubscription implements Flow.Subscription {
Expand Down
4 changes: 2 additions & 2 deletions test/jdk/java/net/httpclient/DigestEchoClient.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018, 2023, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2018, 2024, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
Expand Down Expand Up @@ -64,7 +64,7 @@
* @test
* @summary this test verifies that a client may provides authorization
* headers directly when connecting with a server.
* @bug 8087112
* @bug 8087112 8336655
* @library /test/lib /test/jdk/java/net/httpclient/lib
* @build jdk.httpclient.test.lib.common.HttpServerAdapters jdk.test.lib.net.SimpleSSLContext
* DigestEchoServer ReferenceTracker DigestEchoClient
Expand Down

0 comments on commit eb1e350

Please sign in to comment.