Timing out on SinkExt::send still causes the item to be re-inserted later into the bounded channel #2894

Open
NumberFour8 opened this issue Oct 27, 2024 · 0 comments

When inserting an element into the bounded MPSC channel that is at full capacity, the SinkExt::send future correctly waits until there is space in the channel.

However, if the SinkExt::send future is selected against a timeout (so the future should be dropped if the timeout elapses first), the value it attempted to send is still inserted into the channel later.

The expectation is that if the send future is terminated early, the item it attempted to insert should never appear in the channel.

Instead, it seems that the last item that send attempted to deliver is cached (when polling for channel readiness starts inside send) and re-delivered in the next send. That seems like a bug, or at best undocumented behavior, if I'm not mistaken.

Minimal example for reproducing the behavior:

use std::time::Duration;

use futures::prelude::*;
use futures::pin_mut;
use tokio;
use anyhow;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let (snd, recv) = futures::channel::mpsc::channel(3);
    pin_mut!(snd);
    pin_mut!(recv);
    
    snd.send(1).await?;
    snd.send(2).await?;
    snd.send(3).await?;
    // Channel now contains 1,2,3
    
    // Insertion of 4 times out, because the channel is at full capacity
    {
        let insert_4 = std::pin::pin!(snd.send(4));
        let timeout = std::pin::pin!(tokio::time::sleep(Duration::from_millis(500)));
        match futures::future::select(insert_4, timeout).await {
            futures::future::Either::Left(_) => anyhow::bail!("must timeout when at full capacity"),
            futures::future::Either::Right(_) => {}
        }
    }
    
    // Free up some capacity by consuming 1,2
    assert_eq!(Some(1), recv.next().await);
    assert_eq!(Some(2), recv.next().await);
    // Channel should now contain only 3
    
    // Now send 5 into the channel and close it
    snd.send(5).await?;
    snd.close_channel();
    
    // Channel now should contain 3,5 (but actually seems to contain 3,4,5)
    assert_eq!(Some(3), recv.next().await);
    assert_eq!(Some(5), recv.next().await); // Panics here: 4 is in the channel despite the timeout
    assert_eq!(None, recv.next().await);
    
    
    Ok(())
}

Playground link: https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=e72c947af414fc813345d71533065414
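
To make the expectation concrete, here is a minimal sketch of the all-or-nothing behavior I would expect. It uses the standard library's synchronous bounded channel (std::sync::mpsc::sync_channel), not the futures channel, so it only illustrates the semantics and is not a fix or workaround for futures. The helper send_with_timeout and the function demo are hypothetical names made up for this illustration, and the 5 ms retry interval is an arbitrary choice.

```rust
use std::sync::mpsc::{sync_channel, SyncSender, TrySendError};
use std::thread;
use std::time::{Duration, Instant};

/// Hypothetical helper: retry `try_send` until it succeeds or `timeout` elapses.
/// On timeout (or if the receiver is gone) the item is handed back to the
/// caller, so it can never show up in the channel afterwards.
fn send_with_timeout<T>(tx: &SyncSender<T>, mut item: T, timeout: Duration) -> Result<(), T> {
    let deadline = Instant::now() + timeout;
    loop {
        item = match tx.try_send(item) {
            Ok(()) => return Ok(()),
            // Receiver dropped: give the item back immediately.
            Err(TrySendError::Disconnected(back)) => return Err(back),
            // Channel full: keep the item and retry below.
            Err(TrySendError::Full(back)) => back,
        };
        if Instant::now() >= deadline {
            return Err(item);
        }
        // Arbitrary retry interval, chosen only for this sketch.
        thread::sleep(Duration::from_millis(5));
    }
}

/// Mirrors the scenario from the reproduction above and returns what is
/// left in the channel after the timed-out insertion of 4.
fn demo() -> Vec<i32> {
    let (tx, rx) = sync_channel(3);
    for i in 1..=3 {
        tx.send(i).unwrap();
    }
    // Channel is full, so inserting 4 times out and 4 is returned to us.
    assert_eq!(send_with_timeout(&tx, 4, Duration::from_millis(50)), Err(4));

    // Free up some capacity by consuming 1 and 2.
    assert_eq!(rx.recv(), Ok(1));
    assert_eq!(rx.recv(), Ok(2));

    // Send 5 and close the channel by dropping the only sender.
    tx.send(5).unwrap();
    drop(tx);

    // Drain whatever remains in the channel.
    rx.iter().collect()
}
```

In this sketch the item is either accepted by the channel or returned to the caller, which is the behavior I expected from a send future that is dropped on timeout.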
