Skip to content

Commit

Permalink
8349662: SSLTube SSLSubscriptionWrapper has potential races when swit…
Browse files Browse the repository at this point in the history
…ching subscriptions
  • Loading branch information
dfuch committed Feb 7, 2025
1 parent 86cec4e commit acf81c1
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ void connect(Flow.Subscriber<? super List<ByteBuffer>> downReader,
// Connect the read sink first. That's the left-hand side
// downstream subscriber from the HttpConnection (or more
// accurately, the SSLSubscriberWrapper that will wrap it
// when SSLTube::connectFlows is called.
// when SSLTube::connectFlows is called).
reader.subscribe(downReader);

// Connect the right hand side tube (the socket tube).
Expand Down Expand Up @@ -191,7 +191,7 @@ public boolean isFinished() {
private volatile Flow.Subscription readSubscription;

// The DelegateWrapper wraps a subscribed {@code Flow.Subscriber} and
// tracks the subscriber's state. In particular it makes sure that
// tracks the subscriber's state. In particular, it makes sure that
// onComplete/onError are not called before onSubscribed.
static final class DelegateWrapper implements FlowTube.TubeSubscriber {
private final FlowTube.TubeSubscriber delegate;
Expand Down Expand Up @@ -302,7 +302,7 @@ public String toString() {

// Used to read data from the SSLTube.
final class SSLSubscriberWrapper implements FlowTube.TubeSubscriber {
private AtomicReference<DelegateWrapper> pendingDelegate =
private final AtomicReference<DelegateWrapper> pendingDelegate =
new AtomicReference<>();
private volatile DelegateWrapper subscribed;
private volatile boolean onCompleteReceived;
Expand Down Expand Up @@ -353,15 +353,15 @@ void setDelegate(Flow.Subscriber<? super List<ByteBuffer>> delegate) {
return;
}
// sslDelegate field should have been initialized by the
// the time we reach here, as there can be no subscriber
// time we reach here, as there can be no subscriber
// until SSLTube is fully constructed.
if (handleNow || !sslDelegate.resumeReader()) {
processPendingSubscriber();
}
}

// Can be called outside of the flow if an error has already been
// raise. Otherwise, must be called within the SSLFlowDelegate
// Can be called outside the flow if an error has already been
// raised. Otherwise, must be called within the SSLFlowDelegate
// downstream reader flow.
// If there is a subscription, and if there is a pending delegate,
// calls dropSubscription() on the previous delegate (if any),
Expand Down Expand Up @@ -619,43 +619,74 @@ final class SSLSubscriptionWrapper implements Flow.Subscription {
private volatile boolean cancelled;

void setSubscription(Flow.Subscription sub) {
long demand = writeDemand.get(); // FIXME: isn't it a racy way of passing the demand?
delegate = sub;
if (debug.on())
debug.log("setSubscription: demand=%d, cancelled:%s", demand, cancelled);
long demand;
// Avoid race condition and requesting demand twice if
// request() runs concurrently with setSubscription()
boolean cancelled;
synchronized (this) {
demand = writeDemand.get();
delegate = sub;
cancelled = this.cancelled;
}
if (debug.on()) {
debug.log("setSubscription: demand=%d, cancelled:%s, new subscription %s",
demand, cancelled, sub);
}

if (cancelled)
delegate.cancel();
sub.cancel();
else if (demand > 0)
sub.request(demand);
}

@Override
public void request(long n) {
writeDemand.increase(n);
if (debug.on()) debug.log("request: n=%d", n);
Flow.Subscription sub = delegate;
if (sub != null && n > 0) {
sub.request(n);
final long demand = n;
// Avoid race condition and requesting demand twice if
// request() runs concurrently with setSubscription()
Flow.Subscription sub;
long demanded;
synchronized (this) {
sub = delegate;
demanded = writeDemand.get();
writeDemand.increase(n);
}
if (debug.on()) {
debug.log("request: n=%s to %s (%s already demanded)",
demand, sub, demanded);
}
if (sub != null && demand > 0) {
if (debug.on()) debug.log("requesting %s from %s", demand, sub);
sub.request(demand);
}
}

@Override
public void cancel() {
cancelled = true;
if (delegate != null)
delegate.cancel();
Flow.Subscription sub;
synchronized (this) {
cancelled = true;
sub = delegate;
}
if (debug.on()) debug.log("cancel: cancelling subscription: " + sub);
if (sub != null) sub.cancel();
}
}

/* Subscriber - writing side */
@Override
public void onSubscribe(Flow.Subscription subscription) {
Objects.requireNonNull(subscription);
Flow.Subscription x = writeSubscription.delegate;
if (x != null)
x.cancel();
Flow.Subscription old;
synchronized (this) {
old = writeSubscription.delegate;
}
if (old != null && old != subscription) {
if (debug.on()) debug.log("onSubscribe: cancelling old subscription: " + old);
old.cancel();
}

if (debug.on()) debug.log("onSubscribe: new subscription: " + subscription);
writeSubscription.setSubscription(subscription);
}

Expand All @@ -664,8 +695,10 @@ public void onNext(List<ByteBuffer> item) {
Objects.requireNonNull(item);
boolean decremented = writeDemand.tryDecrement();
assert decremented : "Unexpected writeDemand: ";
if (debug.on())
debug.log("sending %d buffers to SSL flow delegate", item.size());
if (debug.on()) {
debug.log("sending %s buffers to SSL flow delegate (%s bytes)",
item.size(), Utils.remaining(item));
}
sslDelegate.upstreamWriter().onNext(item);
}

Expand Down
8 changes: 1 addition & 7 deletions test/jdk/java/net/httpclient/CookieHeaderTest.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018, 2023, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2018, 2025, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
Expand Down Expand Up @@ -33,9 +33,6 @@
* CookieHeaderTest
*/

import com.sun.net.httpserver.HttpServer;
import com.sun.net.httpserver.HttpsConfigurator;
import com.sun.net.httpserver.HttpsServer;
import jdk.test.lib.net.SimpleSSLContext;
import org.testng.annotations.AfterTest;
import org.testng.annotations.BeforeTest;
Expand All @@ -51,7 +48,6 @@
import java.io.PrintWriter;
import java.io.Writer;
import java.net.CookieHandler;
import java.net.CookieManager;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
Expand All @@ -65,7 +61,6 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
Expand All @@ -76,7 +71,6 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;
import jdk.httpclient.test.lib.common.HttpServerAdapters;
import jdk.httpclient.test.lib.http2.Http2TestServer;

import static java.lang.System.out;
import static java.net.http.HttpClient.Version.HTTP_1_1;
Expand Down
Loading

0 comments on commit acf81c1

Please sign in to comment.