From bcbec677a3c46f88b356b778b0950d91880caf46 Mon Sep 17 00:00:00 2001 From: Daniel Fuchs Date: Tue, 22 Oct 2024 16:02:44 +0100 Subject: [PATCH] Integrated review feedback --- .../internal/net/http/Http2Connection.java | 12 ++++++---- .../classes/jdk/internal/net/http/Stream.java | 2 +- .../internal/net/http/WindowUpdateSender.java | 24 +++++++++++-------- 3 files changed, 22 insertions(+), 16 deletions(-) diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java b/src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java index aa7784e96d135..2e6b42049840e 100644 --- a/src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java +++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java @@ -1103,10 +1103,12 @@ final void releaseUnconsumed(DataFrame df) { dropDataFrame(df); } - // This method is called when a DataFrame is dropped before/without - // having been added to any Stream::inputQ. In that case, the number - // of unprocessed bytes hasn't been incremented by the stream, and - // does not need to be decremented. + // This method can be called directly when a DataFrame is dropped + // before/without having been added to any Stream::inputQ. + // In that case, the number of unprocessed bytes hasn't been incremented + // by the stream, and does not need to be decremented. + // Otherwise, if the frame is dropped after having been added to the + // inputQ, releaseUnconsumed above should be called. final void dropDataFrame(DataFrame df) { if (isMarked(closedState, SHUTDOWN_REQUESTED)) return; if (debug.on()) { @@ -1954,7 +1956,7 @@ int getStreamId() { } @Override - protected boolean windowSizeExceeded(int received) { + protected boolean windowSizeExceeded(long received) { if (connection.isOpen()) { try { connection.protocolError(ErrorFrame.FLOW_CONTROL_ERROR, diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java b/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java index 06d00f12a5b97..3e657ae40f51b 100644 --- a/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java +++ b/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java @@ -1767,7 +1767,7 @@ String dbgString() { } @Override - protected boolean windowSizeExceeded(int received) { + protected boolean windowSizeExceeded(long received) { onProtocolError(new ProtocolException("stream %s flow control window exceeded" .formatted(streamid)), ResetFrame.FLOW_CONTROL_ERROR); return true; diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/WindowUpdateSender.java b/src/java.net.http/share/classes/jdk/internal/net/http/WindowUpdateSender.java index 10d309d957bf7..0affadddf1518 100644 --- a/src/java.net.http/share/classes/jdk/internal/net/http/WindowUpdateSender.java +++ b/src/java.net.http/share/classes/jdk/internal/net/http/WindowUpdateSender.java @@ -31,6 +31,7 @@ import jdk.internal.net.http.frame.WindowUpdateFrame; import jdk.internal.net.http.common.Utils; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; /** @@ -50,11 +51,11 @@ abstract class WindowUpdateSender { // The amount of flow controlled data received and processed, in bytes, // since the start of the window. // The window is exhausted when received + unprocessed >= windowSize - final AtomicInteger received = new AtomicInteger(); + final AtomicLong received = new AtomicLong(); // The amount of flow controlled data received and unprocessed, in bytes, // since the start of the window. // The window is exhausted when received + unprocessed >= windowSize - final AtomicInteger unprocessed = new AtomicInteger(); + final AtomicLong unprocessed = new AtomicLong(); final ReentrantLock sendLock = new ReentrantLock(); WindowUpdateSender(Http2Connection connection) { @@ -123,8 +124,11 @@ boolean canBufferUnprocessedBytes(int len) { // received and processed bytes and checks whether the // flow control window is exceeded. If so, take // corrective actions and return true. - private boolean checkWindowSizeExceeded(int len) { - int rcv = Math.addExact(received.get(), len); + private boolean checkWindowSizeExceeded(long len) { + // because windowSize is bound by Integer.MAX_VALUE + // we will never reach the point where received.get() + len + // could overflow + long rcv = received.get() + len; return rcv > windowSize && windowSizeExceeded(rcv); } @@ -139,7 +143,7 @@ private boolean checkWindowSizeExceeded(int len) { * @param delta the amount of processed bytes to release */ void processed(int delta) { - int rest = unprocessed.addAndGet(-delta); + long rest = unprocessed.addAndGet(-delta); assert rest >= 0; update(delta); } @@ -161,8 +165,8 @@ void processed(int delta) { * * @return the amount of remaining unprocessed bytes */ - int released(int delta) { - int rest = unprocessed.addAndGet(-delta); + long released(int delta) { + long rest = unprocessed.addAndGet(-delta); assert rest >= 0; return rest; } @@ -181,7 +185,7 @@ int released(int delta) { * @param delta the amount of bytes released from the window. */ void update(int delta) { - int rcv = received.addAndGet(delta); + long rcv = received.addAndGet(delta); if (debug.on()) debug.log("update: %d, received: %d, limit: %d", delta, rcv, limit); if (rcv > windowSize && windowSizeExceeded(rcv)) { return; @@ -189,7 +193,7 @@ void update(int delta) { if (rcv > limit) { sendLock.lock(); try { - int tosend = received.get(); + int tosend = (int)Math.min(received.get(), Integer.MAX_VALUE); if (tosend > limit) { received.getAndAdd(-tosend); sendWindowUpdate(tosend); @@ -230,6 +234,6 @@ String dbgString() { * @return {@code true} if the error was reported to the peer * and no further window update should be sent. */ - protected abstract boolean windowSizeExceeded(int received); + protected abstract boolean windowSizeExceeded(long received); }