Skip to content

Commit

Permalink
8338569
Browse files Browse the repository at this point in the history
  • Loading branch information
dfuch committed Aug 19, 2024
1 parent ddbc0b6 commit 0436091
Show file tree
Hide file tree
Showing 6 changed files with 159 additions and 57 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2015, 2023, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2015, 2024, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
Expand Down Expand Up @@ -44,6 +44,7 @@

import jdk.internal.net.http.common.Deadline;
import jdk.internal.net.http.common.FlowTube;
import jdk.internal.net.http.common.Log;
import jdk.internal.net.http.common.Logger;
import jdk.internal.net.http.common.TimeLine;
import jdk.internal.net.http.common.TimeSource;
Expand Down Expand Up @@ -492,13 +493,13 @@ void clear() {

// Remove a connection from the pool.
// should only be called while holding the ConnectionPool stateLock.
private void removeFromPool(HttpConnection c) {
private boolean removeFromPool(HttpConnection c) {
assert stateLock.isHeldByCurrentThread();
if (c instanceof PlainHttpConnection) {
removeFromPool(c, plainPool);
return removeFromPool(c, plainPool);
} else {
assert c.isSecure() : "connection " + c + " is not secure!";
removeFromPool(c, sslPool);
return removeFromPool(c, sslPool);
}
}

Expand All @@ -524,18 +525,37 @@ private boolean contains0(HttpConnection c) {
return false;
}

void cleanup(HttpConnection c, Throwable error) {
void cleanup(HttpConnection c, long pendingData, Throwable error) {
if (debug.on())
debug.log("%s : ConnectionPool.cleanup(%s)",
String.valueOf(c.getConnectionFlow()), error);
stateLock.lock();
boolean removed;
try {
removeFromPool(c);
removed = removeFromPool(c);
expiryList.remove(c);
} finally {
stateLock.unlock();
}
c.close();
if (!removed && pendingData != 0) {
// this should not happen; the cleanup may have consumed
// some data that wasn't supposed to be consumed, so
// the only thing we can do is log it and close the
// connection.
if (Log.errors()) {
Log.logError("WARNING: CleanupTrigger triggered for" +
" a connection not found in the pool: closing {0}", c.dbgString());
}
if (debug.on()) {
debug.log("WARNING: CleanupTrigger triggered for" +
" a connection not found in the pool: closing %s", c.dbgString());
}
Throwable cause = new IOException("Unexpected cleanup triggered for non pooled connection", error);
cause.printStackTrace();
c.close(cause);
} else {
c.close();
}
}

/**
Expand All @@ -549,32 +569,37 @@ private final class CleanupTrigger implements

private final HttpConnection connection;
private volatile boolean done;
private volatile boolean dropped;

public CleanupTrigger(HttpConnection connection) {
this.connection = connection;
}

public boolean isDone() { return done;}

private void triggerCleanup(Throwable error) {
private void triggerCleanup(long pendingData, Throwable error) {
done = true;
cleanup(connection, error);
if (debug.on()) {
debug.log("Cleanup triggered for %s: pendingData:%s error:%s", this, pendingData, error);
}
cleanup(connection, pendingData, error);
}

@Override public void request(long n) {}
@Override public void cancel() {}

@Override
public void onSubscribe(Flow.Subscription subscription) {
if (dropped || done) return;
subscription.request(1);
}
@Override
public void onError(Throwable error) { triggerCleanup(error); }
public void onError(Throwable error) { triggerCleanup(0, error); }
@Override
public void onComplete() { triggerCleanup(null); }
public void onComplete() { triggerCleanup(0, null); }
@Override
public void onNext(List<ByteBuffer> item) {
triggerCleanup(new IOException("Data received while in pool"));
triggerCleanup(Utils.remaining(item), new IOException("Data received while in pool"));
}

@Override
Expand All @@ -586,5 +611,10 @@ public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
public String toString() {
return "CleanupTrigger(" + connection.getConnectionFlow() + ")";
}

@Override
public void dropSubscription() {
dropped = true;
}
}
}
104 changes: 71 additions & 33 deletions src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2023, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2017, 2024, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
Expand Down Expand Up @@ -29,6 +29,7 @@
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -557,31 +558,33 @@ private final class InternalReadPublisher
implements Flow.Publisher<List<ByteBuffer>> {
private final InternalReadSubscription subscriptionImpl
= new InternalReadSubscription();
AtomicReference<ReadSubscription> pendingSubscription = new AtomicReference<>();
ConcurrentLinkedQueue<ReadSubscription> pendingSubscriptions = new ConcurrentLinkedQueue<>();
private volatile ReadSubscription subscription;

@Override
public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> s) {
Objects.requireNonNull(s);

TubeSubscriber sub = FlowTube.asTubeSubscriber(s);
ReadSubscription target = new ReadSubscription(subscriptionImpl, sub);
ReadSubscription previous = pendingSubscription.getAndSet(target);

if (previous != null && previous != target) {
ReadSubscription previous;
while ((previous = pendingSubscriptions.poll()) != null) {
if (debug.on())
debug.log("read publisher: dropping pending subscriber: "
+ previous.subscriber);
previous.errorRef.compareAndSet(null, errorRef.get());
// make sure no data will be routed to the old subscriber.
previous.stopReading();
previous.signalOnSubscribe();
if (subscriptionImpl.completed) {
previous.signalCompletion();
} else {
previous.subscriber.dropSubscription();
}
}
ReadSubscription target = new ReadSubscription(subscriptionImpl, sub);
pendingSubscriptions.offer(target);

if (debug.on()) debug.log("read publisher got subscriber");
if (debug.on()) debug.log("read publisher got new subscriber: " + s);
subscriptionImpl.signalSubscribe();
debugState("leaving read.subscribe: ");
}
Expand All @@ -606,6 +609,7 @@ final class ReadSubscription implements Flow.Subscription {
volatile boolean subscribed;
volatile boolean cancelled;
volatile boolean completed;
private volatile boolean stopped;

public ReadSubscription(InternalReadSubscription impl,
TubeSubscriber subscriber) {
Expand All @@ -623,11 +627,12 @@ public void cancel() {

@Override
public void request(long n) {
if (!cancelled) {
if (!cancelled && !stopped) {
// should be safe to not synchronize here.
impl.request(n);
} else {
if (debug.on())
debug.log("subscription cancelled, ignoring request %d", n);
debug.log("subscription stopped or cancelled, ignoring request %d", n);
}
}

Expand Down Expand Up @@ -661,6 +666,32 @@ void signalOnSubscribe() {
signalCompletion();
}
}

/**
* Called when switching subscriber on the {@link InternalReadSubscription}.
* This subscriber is the old subscriber. Demand on the internal
* subscription will be reset and reading will be paused until the
* new subscriber is subscribed.
* This should ensure that no data is routed to this subscriber
* until the new subscriber is subscribed.
*/
synchronized void stopReading() {
stopped = true;
impl.demand.reset();
}

synchronized boolean tryDecrementDemand() {
if (stopped) return false;
return impl.demand.tryDecrement();
}

synchronized boolean isStopped() {
return stopped;
}

synchronized void increaseDemand(long n) {
if (!stopped) impl.demand.increase(n);
}
}

final class InternalReadSubscription implements Flow.Subscription {
Expand Down Expand Up @@ -835,7 +866,7 @@ final void read() {

// If we reach here then we must be in the selector thread.
assert client.isSelectorThread();
if (demand.tryDecrement()) {
if (current.tryDecrementDemand()) {
// we have demand.
try {
List<ByteBuffer> bytes = readAvailable(current.bufferSource);
Expand Down Expand Up @@ -881,8 +912,10 @@ final void read() {
// event. This ensures that this loop is
// executed again when the socket becomes
// readable again.
demand.increase(1);
resumeReadEvent();
if (!current.isStopped()) {
current.increaseDemand(1);
resumeReadEvent();
}
if (errorRef.get() != null) continue;
debugState("leaving read() loop with no bytes");
return;
Expand Down Expand Up @@ -922,30 +955,35 @@ final void read() {
}

boolean handlePending() {
ReadSubscription pending = pendingSubscription.getAndSet(null);
if (pending == null) return false;
if (debug.on())
debug.log("handling pending subscription for %s",
ReadSubscription pending;
boolean subscribed = false;
while ((pending = pendingSubscriptions.poll()) != null) {
subscribed = true;
if (debug.on())
debug.log("handling pending subscription for %s",
pending.subscriber);
ReadSubscription current = subscription;
if (current != null && current != pending && !completed) {
current.subscriber.dropSubscription();
}
if (debug.on()) debug.log("read demand reset to 0");
subscriptionImpl.demand.reset(); // subscriber will increase demand if it needs to.
pending.errorRef.compareAndSet(null, errorRef.get());
if (!readScheduler.isStopped()) {
subscription = pending;
} else {
if (debug.on()) debug.log("socket tube is already stopped");
}
if (debug.on()) debug.log("calling onSubscribe");
pending.signalOnSubscribe();
if (completed) {
ReadSubscription current = subscription;
if (current != null && current != pending && !completed) {
debug.log("dropping pending subscription for current %s",
current.subscriber);
current.subscriber.dropSubscription();
}
if (debug.on()) debug.log("read demand reset to 0");
subscriptionImpl.demand.reset(); // subscriber will increase demand if it needs to.
pending.errorRef.compareAndSet(null, errorRef.get());
pending.signalCompletion();
if (!readScheduler.isStopped()) {
subscription = pending;
} else {
if (debug.on()) debug.log("socket tube is already stopped");
}
if (debug.on()) debug.log("calling onSubscribe on " + pending.subscriber);
pending.signalOnSubscribe();
if (completed) {
pending.errorRef.compareAndSet(null, errorRef.get());
pending.signalCompletion();
}
}
return true;
return subscribed;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,10 @@ public void onError(Throwable throwable) {
public void onComplete() {
delegate.onComplete();
}
@Override
public String toString() {
return "TubeSubscriberWrapper("+delegate.toString()+")";
}
}

}
Expand Down
4 changes: 2 additions & 2 deletions test/jdk/java/net/httpclient/DigestEchoClient.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018, 2023, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2018, 2024, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
Expand Down Expand Up @@ -64,7 +64,7 @@
* @test
* @summary this test verifies that a client may provides authorization
* headers directly when connecting with a server.
* @bug 8087112
* @bug 8087112 8336655 8338569
* @library /test/lib /test/jdk/java/net/httpclient/lib
* @build jdk.httpclient.test.lib.common.HttpServerAdapters jdk.test.lib.net.SimpleSSLContext
* DigestEchoServer ReferenceTracker DigestEchoClient
Expand Down
10 changes: 8 additions & 2 deletions test/jdk/java/net/httpclient/ShutdownNow.java
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,10 @@ void testConcurrent(String uriString) throws Exception {
if (client.awaitTermination(Duration.ofMillis(2000))) {
out.println("Client terminated within expected delay");
} else {
throw new AssertionError("client still running");
System.out.println(TRACKER.diagnose(client));
if (client.awaitTermination(Duration.ofMillis(500))) {
throw new AssertionError("client still running: " + TRACKER.diagnose(client));
}
}
assertTrue(client.isTerminated());
}
Expand Down Expand Up @@ -275,7 +278,10 @@ void testSequential(String uriString) throws Exception {
if (client.awaitTermination(Duration.ofMillis(2000))) {
out.println("Client terminated within expected delay");
} else {
throw new AssertionError("client still running");
System.out.println(TRACKER.diagnose(client));
if (client.awaitTermination(Duration.ofMillis(500))) {
throw new AssertionError("client still running: " + TRACKER.diagnose(client));
}
}
assertTrue(client.isTerminated());
}
Expand Down
Loading

0 comments on commit 0436091

Please sign in to comment.