Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

8342075: HttpClient: improve HTTP/2 flow control checks #3005

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
import jdk.internal.net.http.common.MinimalFuture;
import jdk.internal.net.http.common.Utils;
import jdk.internal.net.http.frame.SettingsFrame;

import static jdk.internal.net.http.frame.SettingsFrame.INITIAL_CONNECTION_WINDOW_SIZE;
import static jdk.internal.net.http.frame.SettingsFrame.INITIAL_WINDOW_SIZE;
import static jdk.internal.net.http.frame.SettingsFrame.ENABLE_PUSH;
import static jdk.internal.net.http.frame.SettingsFrame.HEADER_TABLE_SIZE;
Expand Down Expand Up @@ -247,9 +249,13 @@ int getConnectionWindowSize(SettingsFrame clientSettings) {
int defaultValue = Math.min(Integer.MAX_VALUE,
Math.max(streamWindow, K*K*32));

// The min value is the max between the streamWindow and
// the initial connection window size
int minValue = Math.max(INITIAL_CONNECTION_WINDOW_SIZE, streamWindow);

return getParameter(
"jdk.httpclient.connectionWindowSize",
streamWindow, Integer.MAX_VALUE, defaultValue);
minValue, Integer.MAX_VALUE, defaultValue);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -969,6 +969,34 @@ private String checkMaxOrphanedHeadersExceeded(HeaderFrame hf) {
return null;
}

// This method is called when a DataFrame that was added
// to a Stream::inputQ is later dropped from the queue
// without being consumed.
//
// Before adding a frame to the queue, the Stream calls
// connection.windowUpdater.canBufferUnprocessedBytes(), which
// increases the count of unprocessed bytes in the connection.
// After consuming the frame, it calls connection.windowUpdater::processed,
// which decrements the count of unprocessed bytes, and possibly
// sends a window update to the peer.
//
// This method is called when connection.windowUpdater::processed
// will not be called, which can happen when consuming the frame
// fails, or when an empty DataFrame terminates the stream,
// or when the stream is cancelled while data is still
// sitting in its inputQ. In the later case, it is called for
// each frame that is dropped from the queue.
final void releaseUnconsumed(DataFrame df) {
windowUpdater.released(df.payloadLength());
dropDataFrame(df);
}

// This method can be called directly when a DataFrame is dropped
// before/without having been added to any Stream::inputQ.
// In that case, the number of unprocessed bytes hasn't been incremented
// by the stream, and does not need to be decremented.
// Otherwise, if the frame is dropped after having been added to the
// inputQ, releaseUnconsumed above should be called.
final void dropDataFrame(DataFrame df) {
if (closed) return;
if (debug.on()) {
Expand Down Expand Up @@ -1087,6 +1115,10 @@ private void handleConnectionFrame(Http2Frame frame)
}
}

boolean isOpen() {
return !closed && connection.channel().isOpen();
}

void resetStream(int streamid, int code) {
try {
if (connection.channel().isOpen()) {
Expand Down Expand Up @@ -1277,11 +1309,12 @@ private void sendConnectionPreface() throws IOException {
// Note that the default initial window size, not to be confused
// with the initial window size, is defined by RFC 7540 as
// 64K -1.
final int len = windowUpdater.initialWindowSize - DEFAULT_INITIAL_WINDOW_SIZE;
if (len != 0) {
final int len = windowUpdater.initialWindowSize - INITIAL_CONNECTION_WINDOW_SIZE;
assert len >= 0;
if (len > 0) {
if (Log.channel()) {
Log.logChannel("Sending initial connection window update frame: {0} ({1} - {2})",
len, windowUpdater.initialWindowSize, DEFAULT_INITIAL_WINDOW_SIZE);
len, windowUpdater.initialWindowSize, INITIAL_CONNECTION_WINDOW_SIZE);
}
windowUpdater.sendWindowUpdate(len);
}
Expand Down Expand Up @@ -1658,6 +1691,19 @@ public ConnectionWindowUpdateSender(Http2Connection connection,
int getStreamId() {
return 0;
}

@Override
protected boolean windowSizeExceeded(long received) {
if (connection.isOpen()) {
try {
connection.protocolError(ErrorFrame.FLOW_CONTROL_ERROR,
"connection window exceeded");
} catch (IOException io) {
connection.shutdown(io);
}
}
return true;
}
}

/**
Expand Down
61 changes: 49 additions & 12 deletions src/java.net.http/share/classes/jdk/internal/net/http/Stream.java
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ class Stream<T> extends ExchangeImpl<T> {
* sending any data. Will be null for PushStreams, as they cannot send data.
*/
private final WindowController windowController;
private final WindowUpdateSender windowUpdater;
private final WindowUpdateSender streamWindowUpdater;

@Override
HttpConnection connection() {
Expand Down Expand Up @@ -203,7 +203,8 @@ private void schedule() {
int size = Utils.remaining(dsts, Integer.MAX_VALUE);
if (size == 0 && finished) {
inputQ.remove();
connection.ensureWindowUpdated(df); // must update connection window
// consumed will not be called
connection.releaseUnconsumed(df); // must update connection window
Log.logTrace("responseSubscriber.onComplete");
if (debug.on()) debug.log("incoming: onComplete");
sched.stop();
Expand All @@ -219,7 +220,11 @@ private void schedule() {
try {
subscriber.onNext(dsts);
} catch (Throwable t) {
connection.dropDataFrame(df); // must update connection window
// Data frames that have been added to the inputQ
// must be released using releaseUnconsumed() to
// account for the amount of unprocessed bytes
// tracked by the connection.windowUpdater.
connection.releaseUnconsumed(df);
throw t;
}
if (consumed(df)) {
Expand Down Expand Up @@ -272,7 +277,11 @@ private void drainInputQueue() {
Http2Frame frame;
while ((frame = inputQ.poll()) != null) {
if (frame instanceof DataFrame) {
connection.dropDataFrame((DataFrame)frame);
// Data frames that have been added to the inputQ
// must be released using releaseUnconsumed() to
// account for the amount of unprocessed bytes
// tracked by the connection.windowUpdater.
connection.releaseUnconsumed((DataFrame)frame);
}
}
}
Expand All @@ -297,12 +306,13 @@ private boolean consumed(DataFrame df) {
boolean endStream = df.getFlag(DataFrame.END_STREAM);
if (len == 0) return endStream;

connection.windowUpdater.update(len);

connection.windowUpdater.processed(len);
if (!endStream) {
streamWindowUpdater.processed(len);
} else {
// Don't send window update on a stream which is
// closed or half closed.
windowUpdater.update(len);
streamWindowUpdater.released(len);
}

// true: end of stream; false: more data coming
Expand Down Expand Up @@ -343,8 +353,21 @@ public String toString() {
}

private void receiveDataFrame(DataFrame df) {
inputQ.add(df);
sched.runOrSchedule();
try {
int len = df.payloadLength();
if (len > 0) {
// we return from here if the connection is being closed.
if (!connection.windowUpdater.canBufferUnprocessedBytes(len)) return;
// we return from here if the stream is being closed.
if (closed || !streamWindowUpdater.canBufferUnprocessedBytes(len)) {
connection.releaseUnconsumed(df);
return;
}
}
inputQ.add(df);
} finally {
sched.runOrSchedule();
}
}

/** Handles a RESET frame. RESET is always handled inline in the queue. */
Expand Down Expand Up @@ -429,7 +452,7 @@ CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {
this.responseHeadersBuilder = new HttpHeadersBuilder();
this.rspHeadersConsumer = new HeadersConsumer();
this.requestPseudoHeaders = createPseudoHeaders(request);
this.windowUpdater = new StreamWindowUpdateSender(connection);
this.streamWindowUpdater = new StreamWindowUpdateSender(connection);
}

/**
Expand Down Expand Up @@ -1281,12 +1304,18 @@ void cancel(IOException cause) {

@Override
void onProtocolError(final IOException cause) {
onProtocolError(cause, ResetFrame.PROTOCOL_ERROR);
}

void onProtocolError(final IOException cause, int code) {
if (debug.on()) {
debug.log("cancelling exchange on stream %d due to protocol error: %s", streamid, cause.getMessage());
debug.log("cancelling exchange on stream %d due to protocol error [%s]: %s",
streamid, ErrorFrame.stringForCode(code),
cause.getMessage());
}
Log.logError("cancelling exchange on stream {0} due to protocol error: {1}\n", streamid, cause);
// send a RESET frame and close the stream
cancelImpl(cause, ResetFrame.PROTOCOL_ERROR);
cancelImpl(cause, code);
}

void connectionClosing(Throwable cause) {
Expand Down Expand Up @@ -1554,6 +1583,14 @@ String dbgString() {
return dbgString = dbg;
}
}

@Override
protected boolean windowSizeExceeded(long received) {
onProtocolError(new ProtocolException("stream " + streamid +
" flow control window exceeded"),
ResetFrame.FLOW_CONTROL_ERROR);
return true;
}
}

/**
Expand Down
Loading