Skip to content

Commit

Permalink
zb: Replace futures-util runtime dependency with futures-lite
Browse files Browse the repository at this point in the history
  • Loading branch information
jplatte committed Feb 4, 2025
1 parent c0fbcb5 commit cec38e6
Show file tree
Hide file tree
Showing 13 changed files with 37 additions and 30 deletions.
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 2 additions & 3 deletions zbus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ async-io = [
# features for only specific target OS: https://github.com/rust-lang/cargo/issues/1197.
"async-process",
"blocking",
"futures-util/io",
]
tokio = ["dep:tokio"]
vsock = ["dep:vsock", "dep:async-io"]
Expand All @@ -56,7 +55,7 @@ serde = { version = "1.0.200", features = ["derive"] }
serde_repr = "0.1.19"
enumflags2 = { version = "0.7.9", features = ["serde"] }
futures-core = "0.3.30"
futures-util = { version = "0.3.30", default-features = false, features = [
futures-lite = { version = "2.6.0", default-features = false, features = [
"std",
] }
async-broadcast = "0.7.0"
Expand Down Expand Up @@ -121,7 +120,7 @@ async-recursion = "1.1.1"
[dev-dependencies]
zbus_xml = { path = "../zbus_xml", version = "5.0.0" }
doc-comment = "0.3.3"
futures-util = "0.3.30" # activate default features
futures-util = { version = "0.3.30", features = ["io"] }
ntest = "0.9.2"
test-log = { version = "0.2.16", features = [
"trace",
Expand Down
2 changes: 1 addition & 1 deletion zbus/src/blocking/message_iterator.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use futures_util::StreamExt;
use futures_lite::StreamExt;
use static_assertions::assert_impl_all;

use crate::{
Expand Down
2 changes: 1 addition & 1 deletion zbus/src/blocking/proxy/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! The client-side proxy API.
use enumflags2::BitFlags;
use futures_util::StreamExt;
use futures_lite::StreamExt;
use static_assertions::assert_impl_all;
use std::{fmt, ops::Deref};
use zbus_names::{BusName, InterfaceName, MemberName, UniqueName};
Expand Down
14 changes: 9 additions & 5 deletions zbus/src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use zbus_names::{BusName, ErrorName, InterfaceName, MemberName, OwnedUniqueName,
use zvariant::ObjectPath;

use futures_core::Future;
use futures_util::StreamExt;
use futures_lite::StreamExt;

use crate::{
async_lock::{Mutex, Semaphore, SemaphorePermit},
Expand Down Expand Up @@ -1541,14 +1541,14 @@ mod tests {
#[cfg(feature = "p2p")]
#[cfg(test)]
mod p2p_tests {
use futures_util::stream::TryStreamExt;
use event_listener::Event;
use futures_util::TryStreamExt;
use ntest::timeout;
use test_log::test;
use zvariant::{Endian, NATIVE_ENDIAN};

use crate::{conn::AuthMechanism, Guid};

use super::*;
use super::{socket, Builder, Connection};
use crate::{conn::AuthMechanism, Guid, Message, MessageStream, Result};

// Same numbered client and server are already paired up.
async fn test_p2p(
Expand Down Expand Up @@ -1740,6 +1740,9 @@ mod p2p_tests {
feature = "tokio-vsock"
))]
async fn test_vsock_connect() -> Result<(Connection, Connection)> {
#[cfg(feature = "tokio-vsock")]
use futures_util::StreamExt;

let guid = Guid::generate();

#[cfg(all(feature = "vsock", not(feature = "tokio")))]
Expand Down Expand Up @@ -1814,6 +1817,7 @@ mod p2p_tests {

#[cfg(feature = "tokio-vsock")]
async fn vsock_p2p_pipe() -> Result<(Connection, Connection)> {
use futures_util::StreamExt;
use tokio_vsock::VsockAddr;

let guid = Guid::generate();
Expand Down
6 changes: 3 additions & 3 deletions zbus/src/connection/socket/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ impl TryFrom<&mut Child> for Command {
#[async_trait::async_trait]
impl ReadHalf for ChildStdout {
async fn recvmsg(&mut self, buf: &mut [u8]) -> RecvmsgResult {
match futures_util::AsyncReadExt::read(&mut self, buf).await {
match futures_lite::AsyncReadExt::read(&mut self, buf).await {
Err(e) => Err(e),
Ok(len) => {
#[cfg(unix)]
Expand Down Expand Up @@ -104,11 +104,11 @@ impl WriteHalf for ChildStdin {
));
}

futures_util::AsyncWriteExt::write(&mut self, buf).await
futures_lite::AsyncWriteExt::write(&mut self, buf).await
}

async fn close(&mut self) -> io::Result<()> {
futures_util::AsyncWriteExt::close(&mut self).await
futures_lite::AsyncWriteExt::close(&mut self).await
}
}

Expand Down
4 changes: 2 additions & 2 deletions zbus/src/connection/socket/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use super::{Socket, Split};
#[async_trait::async_trait]
impl ReadHalf for Arc<Async<TcpStream>> {
async fn recvmsg(&mut self, buf: &mut [u8]) -> RecvmsgResult {
match futures_util::AsyncReadExt::read(&mut self.as_ref(), buf).await {
match futures_lite::AsyncReadExt::read(&mut self.as_ref(), buf).await {
Err(e) => Err(e),
Ok(len) => {
#[cfg(unix)]
Expand Down Expand Up @@ -69,7 +69,7 @@ impl WriteHalf for Arc<Async<TcpStream>> {
));
}

futures_util::AsyncWriteExt::write(&mut self.as_ref(), buf).await
futures_lite::AsyncWriteExt::write(&mut self.as_ref(), buf).await
}

async fn close(&mut self) -> io::Result<()> {
Expand Down
4 changes: 2 additions & 2 deletions zbus/src/connection/socket/unix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ impl super::WriteHalf for tokio::net::unix::OwnedWriteHalf {
#[async_trait::async_trait]
impl super::ReadHalf for Arc<Async<UnixStream>> {
async fn recvmsg(&mut self, buf: &mut [u8]) -> super::RecvmsgResult {
match futures_util::AsyncReadExt::read(&mut self.as_ref(), buf).await {
match futures_lite::AsyncReadExt::read(&mut self.as_ref(), buf).await {
Err(e) => Err(e),
Ok(len) => {
#[cfg(unix)]
Expand Down Expand Up @@ -247,7 +247,7 @@ impl super::WriteHalf for Arc<Async<UnixStream>> {
buf: &[u8],
#[cfg(unix)] _fds: &[BorrowedFd<'_>],
) -> std::io::Result<usize> {
futures_util::AsyncWriteExt::write(&mut self.as_ref(), buf).await
futures_lite::AsyncWriteExt::write(&mut self.as_ref(), buf).await
}

async fn close(&mut self) -> std::io::Result<()> {
Expand Down
4 changes: 2 additions & 2 deletions zbus/src/connection/socket/vsock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use super::{Socket, Split};
#[async_trait::async_trait]
impl super::ReadHalf for std::sync::Arc<async_io::Async<vsock::VsockStream>> {
async fn recvmsg(&mut self, buf: &mut [u8]) -> super::RecvmsgResult {
match futures_util::AsyncReadExt::read(&mut self.as_ref(), buf).await {
match futures_lite::AsyncReadExt::read(&mut self.as_ref(), buf).await {
Err(e) => Err(e),
Ok(len) => {
#[cfg(unix)]
Expand Down Expand Up @@ -40,7 +40,7 @@ impl super::WriteHalf for std::sync::Arc<async_io::Async<vsock::VsockStream>> {
));
}

futures_util::AsyncWriteExt::write(&mut self.as_ref(), buf).await
futures_lite::AsyncWriteExt::write(&mut self.as_ref(), buf).await
}

async fn close(&mut self) -> std::io::Result<()> {
Expand Down
2 changes: 1 addition & 1 deletion zbus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ extern crate self as zbus;
pub mod export {
pub use async_trait;
pub use futures_core;
pub use futures_util;
pub use futures_lite;
pub use ordered_stream;
pub use serde;
pub use static_assertions;
Expand Down
3 changes: 1 addition & 2 deletions zbus/src/message_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@ use std::{
};

use async_broadcast::Receiver as ActiveReceiver;
use futures_core::stream;
use futures_util::stream::FusedStream;
use futures_core::stream::{self, FusedStream};
use ordered_stream::{OrderedStream, PollResult};
use static_assertions::assert_impl_all;
use tracing::warn;
Expand Down
14 changes: 9 additions & 5 deletions zbus/src/proxy/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use enumflags2::{bitflags, BitFlags};
use event_listener::{Event, EventListener};
use futures_core::{ready, stream};
use futures_util::{future::Either, stream::Map};
use futures_lite::stream::Map;
use ordered_stream::{join as join_streams, FromFuture, Join, OrderedStream, PollResult};
use static_assertions::assert_impl_all;
use std::{
Expand Down Expand Up @@ -401,7 +401,7 @@ impl PropertiesCache {
interface: InterfaceName<'static>,
uncached_properties: HashSet<zvariant::Str<'static>>,
) -> Result<()> {
use futures_util::StreamExt;
use futures_lite::StreamExt;

trace!("Listening for property changes on {interface}...");
while let Some(update) = prop_changes.next().await {
Expand Down Expand Up @@ -1004,7 +1004,7 @@ impl<'a> Proxy<'a> {
/// Note that zbus doesn't queue the updates. If the listener is slower than the receiver, it
/// will only receive the last update.
pub async fn receive_owner_changed(&self) -> Result<OwnerChangedStream<'a>> {
use futures_util::StreamExt;
use futures_lite::StreamExt;
let dbus_proxy = fdo::DBusProxy::builder(self.connection())
.cache_properties(CacheProperties::No)
.build()
Expand Down Expand Up @@ -1100,8 +1100,7 @@ impl stream::Stream for OwnerChangedStream<'_> {
type Item = Option<UniqueName<'static>>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
use futures_util::StreamExt;
self.get_mut().stream.poll_next_unpin(cx)
Pin::new(&mut self.get_mut().stream).poll_next(cx)
}
}

Expand Down Expand Up @@ -1357,6 +1356,11 @@ where
fn inner(&self) -> &Proxy<'c>;
}

enum Either<L, R> {
Left(L),
Right(R),
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
2 changes: 1 addition & 1 deletion zbus_macros/src/iface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,7 @@ pub fn expand(args: Punctuated<Meta, Token![,]>, mut input: ItemImpl) -> syn::Re
quote!(self.#ident(#args_names)#method_await)
} else if is_async {
quote!(
#zbus::export::futures_util::future::FutureExt::map(
#zbus::export::futures_lite::future::FutureExt::map(
self.#ident(#args_names),
::std::result::Result::Ok,
)
Expand Down

0 comments on commit cec38e6

Please sign in to comment.