Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wasmtime-wasi: Fix spurious wake-ups in blocking_ of InputStream and OutputStream #10113

Merged
merged 3 commits into from
Feb 3, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 40 additions & 4 deletions crates/wasi-io/src/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,15 @@ use alloc::boxed::Box;
use anyhow::Result;
use bytes::Bytes;

/// `Pollable::ready()` for `InputStream` and `OutputStream` may return
/// prematurely due to `io::ErrorKind::WouldBlock`.
///
/// To ensure that `blocking_` functions return a valid non-empty result,
/// we use a loop with a maximum iteration limit.
///
/// This constant defines the maximum number of loop attempts allowed.
const MAX_BLOCKING_ATTEMPTS: u8 = 10;

/// Host trait for implementing the `wasi:io/streams.input-stream` resource: A
/// bytestream which can be read from.
#[async_trait::async_trait]
Expand All @@ -24,8 +33,24 @@ pub trait InputStream: Pollable {
/// Similar to `read`, except that it blocks until at least one byte can be
/// read.
async fn blocking_read(&mut self, size: usize) -> StreamResult<Bytes> {
self.ready().await;
self.read(size)
if size == 0 {
self.ready().await;
return self.read(size);
}

let mut i = 0;
loop {
// This `ready` call may return prematurely due to `io::ErrorKind::WouldBlock`.
self.ready().await;
let data = self.read(size)?;
if !data.is_empty() {
return Ok(data);
}
if i >= MAX_BLOCKING_ATTEMPTS {
return Err(StreamError::trap("max blocking attempts exceeded"));
}
i += 1;
}
}

/// Same as the `read` method except that bytes are skipped.
Expand Down Expand Up @@ -239,8 +264,19 @@ pub trait OutputStream: Pollable {
/// Simultaneously waits for this stream to be writable and then returns how
/// much may be written or the last error that happened.
async fn write_ready(&mut self) -> StreamResult<usize> {
self.ready().await;
self.check_write()
let mut i = 0;
loop {
// This `ready` call may return prematurely due to `io::ErrorKind::WouldBlock`.
self.ready().await;
let n = self.check_write()?;
if n > 0 {
return Ok(n);
}
if i >= MAX_BLOCKING_ATTEMPTS {
return Err(StreamError::trap("max blocking attempts exceeded"));
}
i += 1;
}
}

/// Cancel any asynchronous work and wait for it to wrap up.
Expand Down