Skip to content

Commit 4527bf2

Browse files
authored
feat(handle): is_connected command (#43)
1 parent 4c72494 commit 4527bf2

File tree

5 files changed

+144
-7
lines changed

5 files changed

+144
-7
lines changed

crates/p2p/src/commands.rs

+35-1
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,11 @@ use libp2p::{identity::secp256k1, Multiaddr, PeerId};
55
use musig2::{PartialSignature, PubNonce};
66
use strata_p2p_types::{P2POperatorPubKey, Scope, SessionId, StakeChainId, WotsPublicKeys};
77
use strata_p2p_wire::p2p::v1::{GetMessageRequest, GossipsubMsg, UnsignedGossipsubMsg};
8+
use tokio::sync::oneshot;
89

910
/// Ask P2P implementation to distribute some data across network.
1011
#[expect(clippy::large_enum_variant)]
11-
#[derive(Debug, Clone)]
12+
#[derive(Debug)]
1213
pub enum Command {
1314
/// Publishes message through gossip sub network of peers.
1415
PublishMessage(PublishMessage),
@@ -21,6 +22,9 @@ pub enum Command {
2122

2223
/// Connects to a peer, whitelists peer, and adds peer to the gossip sub network.
2324
ConnectToPeer(ConnectToPeerCommand),
25+
26+
/// Directly queries P2P state (doesn't produce events)
27+
QueryP2PState(QueryP2PStateCommand),
2428
}
2529

2630
#[derive(Debug, Clone)]
@@ -192,6 +196,36 @@ pub struct ConnectToPeerCommand {
192196
pub peer_addr: Multiaddr,
193197
}
194198

199+
impl From<ConnectToPeerCommand> for Command {
200+
fn from(v: ConnectToPeerCommand) -> Self {
201+
Self::ConnectToPeer(v)
202+
}
203+
}
204+
205+
/// Commands to directly query P2P state information
206+
#[derive(Debug)]
207+
pub enum QueryP2PStateCommand {
208+
/// Query if we're connected to a specific peer
209+
IsConnected {
210+
/// Peer ID to check
211+
peer_id: PeerId,
212+
/// Channel to send the response back
213+
response_sender: oneshot::Sender<bool>,
214+
},
215+
216+
/// Get all connected peers
217+
GetConnectedPeers {
218+
/// Channel to send the response back
219+
response_sender: oneshot::Sender<Vec<PeerId>>,
220+
},
221+
}
222+
223+
impl From<QueryP2PStateCommand> for Command {
224+
fn from(v: QueryP2PStateCommand) -> Self {
225+
Self::QueryP2PState(v)
226+
}
227+
}
228+
195229
/// Commands P2P to clean entries from internal key-value storage by
196230
/// session IDs, scopes and operator pubkeys.
197231
#[derive(Debug, Clone)]

crates/p2p/src/swarm/handle.rs

+62-5
Original file line numberDiff line numberDiff line change
@@ -5,19 +5,23 @@ use std::{
55
fmt::Display,
66
pin::Pin,
77
task::{Context, Poll},
8+
time::Duration,
89
};
910

1011
use futures::{FutureExt, Stream};
11-
use libp2p::identity::secp256k1::Keypair;
12+
use libp2p::{identity::secp256k1::Keypair, PeerId};
1213
use thiserror::Error;
13-
use tokio::sync::{
14-
broadcast::{self, error::RecvError},
15-
mpsc,
14+
use tokio::{
15+
sync::{
16+
broadcast::{self, error::RecvError},
17+
mpsc, oneshot,
18+
},
19+
time::timeout,
1620
};
1721
use tracing::warn;
1822

1923
use crate::{
20-
commands::{Command, PublishMessage, UnsignedPublishMessage},
24+
commands::{Command, PublishMessage, QueryP2PStateCommand, UnsignedPublishMessage},
2125
events::Event,
2226
};
2327

@@ -76,6 +80,59 @@ impl P2PHandle {
7680
pub fn sign_message(&self, msg: UnsignedPublishMessage) -> PublishMessage {
7781
msg.sign_secp256k1(&self.keypair)
7882
}
83+
84+
/// Checks if the P2P node is connected to the specified peer.
85+
/// Returns true if connected, false otherwise.
86+
pub async fn is_connected(&self, peer_id: PeerId) -> bool {
87+
let (sender, receiver) = oneshot::channel();
88+
89+
// Send the command to check connection
90+
let cmd = Command::QueryP2PState(QueryP2PStateCommand::IsConnected {
91+
peer_id,
92+
response_sender: sender,
93+
});
94+
95+
// Use a cloned sender to avoid borrow issues
96+
let cmd_sender = self.commands.clone();
97+
98+
// Send the command
99+
if cmd_sender.send(cmd).await.is_err() {
100+
// If the command channel is closed, assume not connected
101+
return false;
102+
}
103+
104+
// Wait for response with timeout
105+
// TODO: make this configurable
106+
match timeout(Duration::from_secs(1), receiver).await {
107+
Ok(Ok(is_connected)) => is_connected,
108+
_ => false, // Timeout or channel closed
109+
}
110+
}
111+
112+
/// Gets the list of all currently connected peers.
113+
pub async fn get_connected_peers(&self) -> Vec<PeerId> {
114+
let (sender, receiver) = oneshot::channel();
115+
116+
// Send the command
117+
let cmd = Command::QueryP2PState(QueryP2PStateCommand::GetConnectedPeers {
118+
response_sender: sender,
119+
});
120+
121+
// Use a cloned sender to avoid borrow issues
122+
let cmd_sender = self.commands.clone();
123+
124+
// If sending fails, return empty list
125+
if cmd_sender.send(cmd).await.is_err() {
126+
return Vec::new();
127+
}
128+
129+
// Wait for response with timeout
130+
// TODO: make this configurable
131+
match timeout(Duration::from_secs(1), receiver).await {
132+
Ok(Ok(peers)) => peers,
133+
_ => Vec::new(), // Timeout or channel closed
134+
}
135+
}
79136
}
80137

81138
impl Clone for P2PHandle {

crates/p2p/src/swarm/mod.rs

+19-1
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,10 @@ use tokio::{
3838
use tokio_util::sync::CancellationToken;
3939
use tracing::{debug, error, info, instrument};
4040

41-
use crate::{commands::Command, events::Event};
41+
use crate::{
42+
commands::{Command, QueryP2PStateCommand},
43+
events::Event,
44+
};
4245

4346
mod behavior;
4447
mod codec;
@@ -522,6 +525,21 @@ impl<DB: RepositoryExt> P2P<DB> {
522525

523526
Ok(())
524527
}
528+
Command::QueryP2PState(query) => match query {
529+
QueryP2PStateCommand::IsConnected {
530+
peer_id,
531+
response_sender,
532+
} => {
533+
let is_connected = self.swarm.is_connected(&peer_id);
534+
let _ = response_sender.send(is_connected);
535+
Ok(())
536+
}
537+
QueryP2PStateCommand::GetConnectedPeers { response_sender } => {
538+
let peers = self.swarm.connected_peers().cloned().collect();
539+
let _ = response_sender.send(peers);
540+
Ok(())
541+
}
542+
},
525543
}
526544
}
527545

crates/p2p/src/tests/is_connected.rs

+27
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
//! Tests for the `is_connected` method.
2+
3+
use crate::tests::common::Setup;
4+
5+
#[tokio::test]
6+
async fn test_is_connected() -> anyhow::Result<()> {
7+
// Set up two connected operators
8+
let Setup {
9+
operators,
10+
cancel,
11+
tasks,
12+
} = Setup::all_to_all(2).await?;
13+
14+
// Verify operator 0 is connected to operator 1
15+
let is_connected = operators[0].handle.is_connected(operators[1].peer_id).await;
16+
assert!(is_connected);
17+
18+
// Also test the get_connected_peers API
19+
let connected_peers = operators[0].handle.get_connected_peers().await;
20+
assert!(connected_peers.contains(&operators[1].peer_id));
21+
22+
// Cleanup
23+
cancel.cancel();
24+
tasks.wait().await;
25+
26+
Ok(())
27+
}

crates/p2p/src/tests/mod.rs

+1
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,5 @@ pub(crate) mod common;
22
pub(crate) mod connect_peer;
33
pub(crate) mod db;
44
pub(crate) mod gossipsub;
5+
pub(crate) mod is_connected;
56
pub(crate) mod request;

0 commit comments

Comments
 (0)