Skip to content

Commit b109803

Browse files
authored
Fix reclaiming reserved capacity (#832)
Reclaiming requested capacity that has not been actually reserved yet is wrong, as this capacity never existed to begin with. Fixes #607
1 parent d7c56f4 commit b109803

File tree

3 files changed

+109
-6
lines changed

3 files changed

+109
-6
lines changed

src/proto/streams/prioritize.rs

+11-6
Original file line numberDiff line numberDiff line change
@@ -347,13 +347,18 @@ impl Prioritize {
347347
/// Reclaim just reserved capacity, not buffered capacity, and re-assign
348348
/// it to the connection
349349
pub fn reclaim_reserved_capacity(&mut self, stream: &mut store::Ptr, counts: &mut Counts) {
350-
// only reclaim requested capacity that isn't already buffered
351-
if stream.requested_send_capacity as usize > stream.buffered_send_data {
352-
let reserved = stream.requested_send_capacity - stream.buffered_send_data as WindowSize;
350+
// only reclaim reserved capacity that isn't already buffered
351+
if stream.send_flow.available().as_size() as usize > stream.buffered_send_data {
352+
let reserved =
353+
stream.send_flow.available().as_size() - stream.buffered_send_data as WindowSize;
354+
355+
// Panic safety: due to how `reserved` is computed it can't be greater
356+
// than what's available.
357+
stream
358+
.send_flow
359+
.claim_capacity(reserved)
360+
.expect("window size should be greater than reserved");
353361

354-
// TODO: proper error handling
355-
let _res = stream.send_flow.claim_capacity(reserved);
356-
debug_assert!(_res.is_ok());
357362
self.assign_connection_capacity(reserved, stream, counts);
358363
}
359364
}

tests/h2-support/src/frames.rs

+5
Original file line numberDiff line numberDiff line change
@@ -365,6 +365,11 @@ impl Mock<frame::Settings> {
365365
self
366366
}
367367

368+
pub fn max_frame_size(mut self, val: u32) -> Self {
369+
self.0.set_max_frame_size(Some(val));
370+
self
371+
}
372+
368373
pub fn initial_window_size(mut self, val: u32) -> Self {
369374
self.0.set_initial_window_size(Some(val));
370375
self

tests/h2-tests/tests/flow_control.rs

+93
Original file line numberDiff line numberDiff line change
@@ -1898,3 +1898,96 @@ async fn window_size_decremented_past_zero() {
18981898

18991899
join(client, srv).await;
19001900
}
1901+
1902+
#[tokio::test]
1903+
async fn reclaim_reserved_capacity() {
1904+
use futures::channel::oneshot;
1905+
1906+
h2_support::trace_init!();
1907+
1908+
let (io, mut srv) = mock::new();
1909+
let (depleted_tx, depleted_rx) = oneshot::channel();
1910+
1911+
let mock = async move {
1912+
let settings = srv.assert_client_handshake().await;
1913+
assert_default_settings!(settings);
1914+
1915+
srv.recv_frame(frames::headers(1).request("POST", "https://www.example.com/"))
1916+
.await;
1917+
srv.send_frame(frames::headers(1).response(200)).await;
1918+
1919+
srv.recv_frame(frames::data(1, vec![0; 16384])).await;
1920+
srv.recv_frame(frames::data(1, vec![0; 16384])).await;
1921+
srv.recv_frame(frames::data(1, vec![0; 16384])).await;
1922+
srv.recv_frame(frames::data(1, vec![0; 16383])).await;
1923+
depleted_tx.send(()).unwrap();
1924+
1925+
// By now, this peer's connection window is completely depleted.
1926+
1927+
srv.recv_frame(frames::headers(3).request("POST", "https://www.example.com/"))
1928+
.await;
1929+
srv.send_frame(frames::headers(3).response(200)).await;
1930+
1931+
srv.recv_frame(frames::reset(1).cancel()).await;
1932+
};
1933+
1934+
let h2 = async move {
1935+
let (mut client, mut h2) = client::handshake(io).await.unwrap();
1936+
1937+
let mut depleting_stream = {
1938+
let request = Request::builder()
1939+
.method(Method::POST)
1940+
.uri("https://www.example.com/")
1941+
.body(())
1942+
.unwrap();
1943+
1944+
let (resp, stream) = client.send_request(request, false).unwrap();
1945+
1946+
{
1947+
let resp = h2.drive(resp).await.unwrap();
1948+
assert_eq!(resp.status(), StatusCode::OK);
1949+
}
1950+
1951+
stream
1952+
};
1953+
1954+
depleting_stream
1955+
.send_data(vec![0; 65535].into(), false)
1956+
.unwrap();
1957+
h2.drive(depleted_rx).await.unwrap();
1958+
1959+
// By now, the client knows it has completely depleted the server's
1960+
// connection window.
1961+
1962+
depleting_stream.reserve_capacity(1);
1963+
1964+
let mut starved_stream = {
1965+
let request = Request::builder()
1966+
.method(Method::POST)
1967+
.uri("https://www.example.com/")
1968+
.body(())
1969+
.unwrap();
1970+
1971+
let (resp, stream) = client.send_request(request, false).unwrap();
1972+
1973+
{
1974+
let resp = h2.drive(resp).await.unwrap();
1975+
assert_eq!(resp.status(), StatusCode::OK);
1976+
}
1977+
1978+
stream
1979+
};
1980+
1981+
// The following call puts starved_stream in pending_send, as the
1982+
// server's connection window is completely empty.
1983+
starved_stream.send_data(vec![0; 1].into(), false).unwrap();
1984+
1985+
// This drop should change nothing, as it didn't actually reserve
1986+
// any available connection window, only requested it.
1987+
drop(depleting_stream);
1988+
1989+
h2.await.unwrap();
1990+
};
1991+
1992+
join(mock, h2).await;
1993+
}

0 commit comments

Comments
 (0)