Skip to content

Commit

Permalink
Add more comments
Browse files Browse the repository at this point in the history
  • Loading branch information
dfuch committed Nov 11, 2024
1 parent 318b181 commit 9d21e58
Showing 1 changed file with 23 additions and 2 deletions.
25 changes: 23 additions & 2 deletions src/java.net.http/share/classes/jdk/internal/net/http/Stream.java
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ class Stream<T> extends ExchangeImpl<T> {
// send lock: prevent sending DataFrames after reset occurred.
private final Lock sendLock = new ReentrantLock();
private final Lock stateLock = new ReentrantLock();
// inputQ lock: methods that take from the inputQ
// must not run concurrently.
private final Lock inputQLock = new ReentrantLock();

/**
Expand All @@ -185,6 +187,7 @@ HttpConnection connection() {
private void schedule() {
boolean onCompleteCalled = false;
HttpResponse.BodySubscriber<T> subscriber = responseSubscriber;
// prevents drainInputQueue() from running concurrently
inputQLock.lock();
try {
if (subscriber == null) {
Expand Down Expand Up @@ -284,12 +287,17 @@ private void schedule() {
}
}

// must only be called from the scheduler schedule() loop.
// ensure that all received data frames are accounted for
// Called from the scheduler schedule() loop,
// or after resetting the stream.
// Ensures that all received data frames are accounted for
// in the connection window flow control if the scheduler
// is stopped before all the data is consumed.
// The inputQLock is used to prevent concurrently taking
// from the queue.
private void drainInputQueue() {
Http2Frame frame;
// will wait until schedule() has finished taking
// from the queue, if needed.
inputQLock.lock();
try {
while ((frame = inputQ.poll()) != null) {
Expand Down Expand Up @@ -420,6 +428,19 @@ private void receiveDataFrame(DataFrame df) {
}
}

// Ensures that no data frame is pushed on the inputQ
// after the stream is closed.
// Changes to the `closed` boolean are guarded by the
// stateLock. Contention should be low as only one
// thread at a time adds to the inputQ, and
// we can only contend when closing the stream.
// Note that this method can run concurrently with
// methods holding the inputQLock: that is OK.
// The inputQLock is there to ensure that methods
// taking from the queue are not running concurrently
// with each others, but concurrently adding at the
// end of the queue while peeking/polling at the head
// is OK.
private void pushDataFrame(int len, DataFrame df) {
boolean closed = false;
stateLock.lock();
Expand Down

0 comments on commit 9d21e58

Please sign in to comment.