Skip to content

Commit

Permalink
Integrated review feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
dfuch committed Oct 22, 2024
1 parent 6ac7f5c commit bcbec67
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1103,10 +1103,12 @@ final void releaseUnconsumed(DataFrame df) {
dropDataFrame(df);
}

// This method is called when a DataFrame is dropped before/without
// having been added to any Stream::inputQ. In that case, the number
// of unprocessed bytes hasn't been incremented by the stream, and
// does not need to be decremented.
// This method can be called directly when a DataFrame is dropped
// before/without having been added to any Stream::inputQ.
// In that case, the number of unprocessed bytes hasn't been incremented
// by the stream, and does not need to be decremented.
// Otherwise, if the frame is dropped after having been added to the
// inputQ, releaseUnconsumed above should be called.
final void dropDataFrame(DataFrame df) {
if (isMarked(closedState, SHUTDOWN_REQUESTED)) return;
if (debug.on()) {
Expand Down Expand Up @@ -1954,7 +1956,7 @@ int getStreamId() {
}

@Override
protected boolean windowSizeExceeded(int received) {
protected boolean windowSizeExceeded(long received) {
if (connection.isOpen()) {
try {
connection.protocolError(ErrorFrame.FLOW_CONTROL_ERROR,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1767,7 +1767,7 @@ String dbgString() {
}

@Override
protected boolean windowSizeExceeded(int received) {
protected boolean windowSizeExceeded(long received) {
onProtocolError(new ProtocolException("stream %s flow control window exceeded"
.formatted(streamid)), ResetFrame.FLOW_CONTROL_ERROR);
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import jdk.internal.net.http.frame.WindowUpdateFrame;
import jdk.internal.net.http.common.Utils;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;

/**
Expand All @@ -50,11 +51,11 @@ abstract class WindowUpdateSender {
// The amount of flow controlled data received and processed, in bytes,
// since the start of the window.
// The window is exhausted when received + unprocessed >= windowSize
final AtomicInteger received = new AtomicInteger();
final AtomicLong received = new AtomicLong();
// The amount of flow controlled data received and unprocessed, in bytes,
// since the start of the window.
// The window is exhausted when received + unprocessed >= windowSize
final AtomicInteger unprocessed = new AtomicInteger();
final AtomicLong unprocessed = new AtomicLong();
final ReentrantLock sendLock = new ReentrantLock();

WindowUpdateSender(Http2Connection connection) {
Expand Down Expand Up @@ -123,8 +124,11 @@ boolean canBufferUnprocessedBytes(int len) {
// received and processed bytes and checks whether the
// flow control window is exceeded. If so, take
// corrective actions and return true.
private boolean checkWindowSizeExceeded(int len) {
int rcv = Math.addExact(received.get(), len);
private boolean checkWindowSizeExceeded(long len) {
// because windowSize is bound by Integer.MAX_VALUE
// we will never reach the point where received.get() + len
// could overflow
long rcv = received.get() + len;
return rcv > windowSize && windowSizeExceeded(rcv);
}

Expand All @@ -139,7 +143,7 @@ private boolean checkWindowSizeExceeded(int len) {
* @param delta the amount of processed bytes to release
*/
void processed(int delta) {
int rest = unprocessed.addAndGet(-delta);
long rest = unprocessed.addAndGet(-delta);
assert rest >= 0;
update(delta);
}
Expand All @@ -161,8 +165,8 @@ void processed(int delta) {
*
* @return the amount of remaining unprocessed bytes
*/
int released(int delta) {
int rest = unprocessed.addAndGet(-delta);
long released(int delta) {
long rest = unprocessed.addAndGet(-delta);
assert rest >= 0;
return rest;
}
Expand All @@ -181,15 +185,15 @@ int released(int delta) {
* @param delta the amount of bytes released from the window.
*/
void update(int delta) {
int rcv = received.addAndGet(delta);
long rcv = received.addAndGet(delta);
if (debug.on()) debug.log("update: %d, received: %d, limit: %d", delta, rcv, limit);
if (rcv > windowSize && windowSizeExceeded(rcv)) {
return;
}
if (rcv > limit) {
sendLock.lock();
try {
int tosend = received.get();
int tosend = (int)Math.min(received.get(), Integer.MAX_VALUE);
if (tosend > limit) {
received.getAndAdd(-tosend);
sendWindowUpdate(tosend);
Expand Down Expand Up @@ -230,6 +234,6 @@ String dbgString() {
* @return {@code true} if the error was reported to the peer
* and no further window update should be sent.
*/
protected abstract boolean windowSizeExceeded(int received);
protected abstract boolean windowSizeExceeded(long received);

}

0 comments on commit bcbec67

Please sign in to comment.