Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Throttle #65

Draft
wants to merge 6 commits into
base: master
Choose a base branch
from
Draft

Conversation

programatik29
Copy link
Contributor

Adds Throttle which can slow down outgoing data.

Copy link
Contributor

@neoeinstein neoeinstein left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added some thoughts.

/// Will panic if milliseconds in `duration` is larger than `u32::MAX`.
pub fn new(body: B, duration: Duration, bytes: u32) -> Self {
let bytes = f64::from(bytes);
let duration = f64::from(u32::try_from(duration.as_millis()).expect("duration too large"));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could instead use .as_secs_f64(). This is a units change, but by using this function below, you can keep things aligned.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense.

State::Waiting(sleep, time) => match sleep.as_mut().poll(cx) {
Poll::Ready(()) => {
let byte_rate = *this.byte_rate;
let mut elapsed = to_f64(time.elapsed().as_millis());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.as_secs_f64()

}
Poll::Pending => return Poll::Pending,
},
State::Ready(time) => match this.inner.as_mut().poll_data(cx) {
Copy link
Contributor

@neoeinstein neoeinstein Jun 7, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Of note, if we get a really large, single chunk, then no real throttling of data is done. Instead, this does throttling of chunk pulls, which may be pretty coarse in practice, as the max buffer size in hyper for HTTP/1 is 408 kiB. With a Full inner response body, Throttle would send everything as a single chunk without any throttling, probably not what a user of Throttle in a response would expect.

To implement throttling regardless of chunk size, you may need to hold the underlying data as buffer to enable re-chunking the data on the way through, potentially avoiding floating-point rate calculations.

One way would be: call poll_data(), split off up to quota and send (or all if below quota), if quota reached, then save away remaining bytes and halt sending until the next time horizon. On reaching next time horizon, send quota out of remaining bytes. If bytes are exhausted, poll data again and repeat. There’s some decision to be made here between number and size of chunks and the ability to track the requested throttle rate. With large time buckets, you can keep chunks relatively large, but will end up with highly-variable instantaneous throughput. With small time buckets, you may have a smoother throughput profile, but have more overhead in the number of chunks being sent downstream.

Thoughts?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To implement throttling regardless of chunk size, you may need to hold the underlying data as buffer to enable re-chunking the data on the way through. potentially avoiding floating-point rate calculations.

I think this buffering cost should be optional to users. Maybe a Buffer body utility can be added.

Instead, this does throttling of chunk pulls, which may be pretty coarse in practice, as the max buffer size in hyper for HTTP/1 is 408 kiB.

Can't really get around that except documenting this setting and having users set it.

Copy link
Contributor

@neoeinstein neoeinstein Jun 7, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this buffering cost should be optional to users. Maybe a Buffer body utility can be added.

In practice, as the chunks are Bytes, the split operation is cheap, zero-copy, and doesn’t actually require allocating any distinct memory.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't know bytes::buf::Buf::copy_to_bytes was optimized for Bytes.

One way would be: call poll_data(), split off up to quota and send (or all if below quota), if quota reached, then save away remaining bytes and halt sending until the next time horizon. On reaching next time horizon, send quota out of remaining bytes. If bytes are exhausted, poll data again and repeat. There’s some decision to be made here between number and size of chunks and the ability to track the requested throttle rate. With large time buckets, you can keep chunks relatively large, but will end up with highly-variable instantaneous throughput. With small time buckets, you may have a smoother throughput profile, but have more overhead in the number of chunks being sent downstream.

I will work on this.

use std::{convert::Infallible, time::Duration};
use tokio::time::Instant;

#[tokio::test(start_paused = true)]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do so love the auto-advancing clock for tokio testing.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants