From 1affefe2d45aac16ca0e69913a752565864df5cf Mon Sep 17 00:00:00 2001 From: drHuangMHT Date: Fri, 6 Dec 2024 14:48:46 +0800 Subject: [PATCH 01/26] template implementation --- Cargo.lock | 8 +++++ Cargo.toml | 2 +- misc/peer-store/Cargo.toml | 12 +++++++ misc/peer-store/src/behaviour.rs | 62 ++++++++++++++++++++++++++++++++ misc/peer-store/src/lib.rs | 5 +++ misc/peer-store/src/store.rs | 5 +++ 6 files changed, 93 insertions(+), 1 deletion(-) create mode 100644 misc/peer-store/Cargo.toml create mode 100644 misc/peer-store/src/behaviour.rs create mode 100644 misc/peer-store/src/lib.rs create mode 100644 misc/peer-store/src/store.rs diff --git a/Cargo.lock b/Cargo.lock index 45f185d9780..b02abda6bdc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4298,6 +4298,14 @@ version = "1.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "de3145af08024dea9fa9914f381a17b8fc6034dfb00f3a84013f7ff43f29ed4c" +[[package]] +name = "peer-store" +version = "0.1.0" +dependencies = [ + "libp2p-core", + "libp2p-swarm", +] + [[package]] name = "pem" version = "3.0.2" diff --git a/Cargo.toml b/Cargo.toml index 7f7b601ab82..dea37485d56 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -64,7 +64,7 @@ members = [ "transports/websocket-websys", "transports/websocket", "transports/webtransport-websys", - "wasm-tests/webtransport-tests", + "wasm-tests/webtransport-tests", "misc/peer-store", ] resolver = "2" diff --git a/misc/peer-store/Cargo.toml b/misc/peer-store/Cargo.toml new file mode 100644 index 00000000000..c9013491301 --- /dev/null +++ b/misc/peer-store/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "peer-store" +version = "0.1.0" +edition = "2021" +rust-version.workspace = true + +[dependencies] +libp2p-core = { workspace = true } +libp2p-swarm = { workspace = true } + +[lints] +workspace = true diff --git a/misc/peer-store/src/behaviour.rs b/misc/peer-store/src/behaviour.rs new file mode 100644 index 00000000000..f121f2ab110 --- /dev/null +++ b/misc/peer-store/src/behaviour.rs @@ -0,0 +1,62 @@ +use libp2p_swarm::{dummy, NetworkBehaviour}; + +use crate::store::Store; + +pub enum Event{ + RecordUpdated +} + +pub struct Behaviour { + store: S, +} + +impl NetworkBehaviour for Behaviour +where + S: Store + Send + Sync + 'static, +{ + type ConnectionHandler = dummy::ConnectionHandler; + + type ToSwarm = Event; + + fn handle_established_inbound_connection( + &mut self, + _connection_id: libp2p_swarm::ConnectionId, + _peer: libp2p_core::PeerId, + _local_addr: &libp2p_core::Multiaddr, + _remote_addr: &libp2p_core::Multiaddr, + ) -> Result, libp2p_swarm::ConnectionDenied> { + Ok(dummy::ConnectionHandler) + } + + fn handle_established_outbound_connection( + &mut self, + _connection_id: libp2p_swarm::ConnectionId, + _peer: libp2p_core::PeerId, + _addr: &libp2p_core::Multiaddr, + _role_override: libp2p_core::Endpoint, + _port_use: libp2p_core::transport::PortUse, + ) -> Result, libp2p_swarm::ConnectionDenied> { + Ok(dummy::ConnectionHandler) + } + + fn on_swarm_event(&mut self, event: libp2p_swarm::FromSwarm) { + todo!() + } + + fn on_connection_handler_event( + &mut self, + _peer_id: libp2p_core::PeerId, + _connection_id: libp2p_swarm::ConnectionId, + _event: libp2p_swarm::THandlerOutEvent, + ) { + todo!() + } + + fn poll( + &mut self, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll>> + { + todo!() + } +} diff --git a/misc/peer-store/src/lib.rs b/misc/peer-store/src/lib.rs new file mode 100644 index 00000000000..fed9de9cb54 --- /dev/null +++ b/misc/peer-store/src/lib.rs @@ -0,0 +1,5 @@ +mod behaviour; +mod store; + +pub use behaviour::{Behaviour, Event}; +pub use store::{MemoryStore, Store}; diff --git a/misc/peer-store/src/store.rs b/misc/peer-store/src/store.rs new file mode 100644 index 00000000000..b4e1b34ca6b --- /dev/null +++ b/misc/peer-store/src/store.rs @@ -0,0 +1,5 @@ +pub trait Store {} + +pub struct MemoryStore {} + +impl Store for MemoryStore {} From 8d4930a4af54a57886238ea0c5e796d88eba6e74 Mon Sep 17 00:00:00 2001 From: drHuangMHT Date: Sat, 7 Dec 2024 15:30:50 +0800 Subject: [PATCH 02/26] implement in-memory store --- misc/peer-store/src/behaviour.rs | 78 +++++++++++++++++++++++---- misc/peer-store/src/lib.rs | 19 +++++++ misc/peer-store/src/store.rs | 92 ++++++++++++++++++++++++++++++-- 3 files changed, 176 insertions(+), 13 deletions(-) diff --git a/misc/peer-store/src/behaviour.rs b/misc/peer-store/src/behaviour.rs index f121f2ab110..f66fc7186d9 100644 --- a/misc/peer-store/src/behaviour.rs +++ b/misc/peer-store/src/behaviour.rs @@ -1,18 +1,50 @@ -use libp2p_swarm::{dummy, NetworkBehaviour}; +use std::{collections::VecDeque, task::Poll}; + +use libp2p_core::{Multiaddr, PeerId}; +use libp2p_swarm::{dummy, FromSwarm, NetworkBehaviour}; use crate::store::Store; -pub enum Event{ - RecordUpdated +pub enum Event { + RecordUpdated { peer: PeerId }, } pub struct Behaviour { store: S, + pending_events: VecDeque, +} + +impl Behaviour +where + S: Store + 'static, +{ + pub fn new(store: S) -> Self { + Self { + store, + pending_events: VecDeque::new(), + } + } + /// List peers that are currently connected to this peer. + pub fn list_connected(&self) -> Box<[&PeerId]> { + self.store.list_connected() + } + /// Try to get all observed address of the given peer. + /// Returns `None` when the peer is not in the store. + pub fn address_of_peer(&self, peer: &PeerId) -> Option> { + self.store.addresses_of_peer(peer) + } + /// Manually update a record. + /// This will always cause an `Event::RecordUpdated` to be emitted. + pub fn update_record(&mut self, peer: &PeerId, address: &Multiaddr) { + self.store.on_address_update(peer, address); + self.pending_events + .push_back(Event::RecordUpdated { peer: *peer }); + } } impl NetworkBehaviour for Behaviour where - S: Store + Send + Sync + 'static, + S: Store + 'static, { type ConnectionHandler = dummy::ConnectionHandler; @@ -21,26 +53,49 @@ where fn handle_established_inbound_connection( &mut self, _connection_id: libp2p_swarm::ConnectionId, - _peer: libp2p_core::PeerId, + peer: libp2p_core::PeerId, _local_addr: &libp2p_core::Multiaddr, - _remote_addr: &libp2p_core::Multiaddr, + remote_addr: &libp2p_core::Multiaddr, ) -> Result, libp2p_swarm::ConnectionDenied> { + self.store.on_peer_connect(&peer); + if self.store.on_address_update(&peer, remote_addr) { + self.pending_events.push_back(Event::RecordUpdated { peer }); + } Ok(dummy::ConnectionHandler) } fn handle_established_outbound_connection( &mut self, _connection_id: libp2p_swarm::ConnectionId, - _peer: libp2p_core::PeerId, + peer: libp2p_core::PeerId, _addr: &libp2p_core::Multiaddr, _role_override: libp2p_core::Endpoint, _port_use: libp2p_core::transport::PortUse, ) -> Result, libp2p_swarm::ConnectionDenied> { + self.store.on_peer_connect(&peer); Ok(dummy::ConnectionHandler) } fn on_swarm_event(&mut self, event: libp2p_swarm::FromSwarm) { - todo!() + match event { + FromSwarm::ConnectionClosed(info) => { + if info.remaining_established < 1 { + self.store.on_peer_disconnect(&info.peer_id); + } + } + FromSwarm::ConnectionEstablished(info) => { + if info.other_established == 0 { + self.store.on_peer_connect(&info.peer_id); + } + } + FromSwarm::NewExternalAddrOfPeer(info) => { + if self.store.on_address_update(&info.peer_id, info.addr) { + self.pending_events + .push_back(Event::RecordUpdated { peer: info.peer_id }); + } + } + _ => {} + } } fn on_connection_handler_event( @@ -54,9 +109,12 @@ where fn poll( &mut self, - cx: &mut std::task::Context<'_>, + _cx: &mut std::task::Context<'_>, ) -> std::task::Poll>> { - todo!() + if let Some(ev) = self.pending_events.pop_front() { + return Poll::Ready(libp2p_swarm::ToSwarm::GenerateEvent(ev)); + } + Poll::Pending } } diff --git a/misc/peer-store/src/lib.rs b/misc/peer-store/src/lib.rs index fed9de9cb54..df40decd5d7 100644 --- a/misc/peer-store/src/lib.rs +++ b/misc/peer-store/src/lib.rs @@ -1,5 +1,24 @@ mod behaviour; mod store; +use std::time::SystemTime; + pub use behaviour::{Behaviour, Event}; +use libp2p_core::Multiaddr; pub use store::{MemoryStore, Store}; + +pub struct AddressRecord<'a>{ + address: &'a Multiaddr, + last_seen: &'a SystemTime, +} +impl<'a> AddressRecord<'a>{ + /// The address of this record. + pub fn address(&self)->&Multiaddr{ + self.address + } + /// How much time has passed since the address is last reported wrt. current time. + /// This may fail because of system time change. + pub fn last_seen(&self)->Result{ + self.last_seen.elapsed() + } +} \ No newline at end of file diff --git a/misc/peer-store/src/store.rs b/misc/peer-store/src/store.rs index b4e1b34ca6b..d56a730a596 100644 --- a/misc/peer-store/src/store.rs +++ b/misc/peer-store/src/store.rs @@ -1,5 +1,91 @@ -pub trait Store {} +use std::{ + collections::{HashMap, HashSet}, + time::SystemTime, +}; -pub struct MemoryStore {} +use libp2p_core::{Multiaddr, PeerId}; -impl Store for MemoryStore {} +pub trait Store { + fn on_peer_connect(&mut self, peer: &PeerId); + fn on_peer_disconnect(&mut self, peer: &PeerId); + /// Update an address record. + /// Return `true` when the address is new. + fn on_address_update(&mut self, peer: &PeerId, address: &Multiaddr) -> bool; + fn list_connected(&self) -> Box<[&PeerId]>; + fn addresses_of_peer(&self, peer: &PeerId) -> Option>; +} + +pub(crate) struct PeerAddressRecord { + addresses: HashMap, +} +impl PeerAddressRecord { + pub(crate) fn records(&self) -> Box<[super::AddressRecord]> { + self.addresses + .iter() + .map(|(address, record)| super::AddressRecord { + address, + last_seen: &record.last_seen, + }) + .collect() + } + pub(crate) fn new(address: &Multiaddr) -> Self { + let mut address_book = HashMap::new(); + address_book.insert(address.clone(), AddressRecord::new()); + Self { + addresses: address_book, + } + } + pub(crate) fn on_address_update(&mut self, address: &Multiaddr) -> bool { + if let Some(record) = self.addresses.get_mut(address) { + record.update_last_seen(); + false + } else { + self.addresses.insert(address.clone(), AddressRecord::new()); + true + } + } +} + +pub(crate) struct AddressRecord { + /// The time when the address is last seen, in seconds. + last_seen: SystemTime, +} +impl AddressRecord { + pub(crate) fn new() -> Self { + Self { + last_seen: SystemTime::now(), + } + } + pub(crate) fn update_last_seen(&mut self) { + self.last_seen = SystemTime::now(); + } +} + +pub struct MemoryStore { + connected_peers: HashSet, + address_book: HashMap, +} + +impl Store for MemoryStore { + fn on_peer_connect(&mut self, peer: &PeerId) { + self.connected_peers.insert(*peer); + } + fn on_peer_disconnect(&mut self, peer: &PeerId) { + self.connected_peers.remove(peer); + } + fn on_address_update(&mut self, peer: &PeerId, address: &Multiaddr) -> bool { + if let Some(record) = self.address_book.get_mut(peer) { + record.on_address_update(address) + } else { + self.address_book + .insert(*peer, PeerAddressRecord::new(address)); + true + } + } + fn list_connected(&self) -> Box<[&PeerId]> { + self.connected_peers.iter().collect() + } + fn addresses_of_peer(&self, peer: &PeerId) -> Option> { + self.address_book.get(peer).map(|record| record.records()) + } +} From 9cb59c39c6e3320215f156cccf634d4c15691e7e Mon Sep 17 00:00:00 2001 From: drHuangMHT Date: Sat, 7 Dec 2024 15:33:01 +0800 Subject: [PATCH 03/26] manifest update --- Cargo.lock | 16 ++++++++-------- misc/peer-store/Cargo.toml | 2 +- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b02abda6bdc..de03b741875 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2999,6 +2999,14 @@ dependencies = [ "zeroize", ] +[[package]] +name = "libp2p-peer-store" +version = "0.1.0" +dependencies = [ + "libp2p-core", + "libp2p-swarm", +] + [[package]] name = "libp2p-perf" version = "0.4.0" @@ -4298,14 +4306,6 @@ version = "1.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "de3145af08024dea9fa9914f381a17b8fc6034dfb00f3a84013f7ff43f29ed4c" -[[package]] -name = "peer-store" -version = "0.1.0" -dependencies = [ - "libp2p-core", - "libp2p-swarm", -] - [[package]] name = "pem" version = "3.0.2" diff --git a/misc/peer-store/Cargo.toml b/misc/peer-store/Cargo.toml index c9013491301..ee6bace01fe 100644 --- a/misc/peer-store/Cargo.toml +++ b/misc/peer-store/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "peer-store" +name = "libp2p-peer-store" version = "0.1.0" edition = "2021" rust-version.workspace = true From fbc134461e75ce9e92bf32daff76b985fad7e8c3 Mon Sep 17 00:00:00 2001 From: drHuangMHT Date: Sat, 7 Dec 2024 15:34:57 +0800 Subject: [PATCH 04/26] formatting --- misc/peer-store/src/lib.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/misc/peer-store/src/lib.rs b/misc/peer-store/src/lib.rs index df40decd5d7..0d86a6e6d10 100644 --- a/misc/peer-store/src/lib.rs +++ b/misc/peer-store/src/lib.rs @@ -7,18 +7,18 @@ pub use behaviour::{Behaviour, Event}; use libp2p_core::Multiaddr; pub use store::{MemoryStore, Store}; -pub struct AddressRecord<'a>{ +pub struct AddressRecord<'a> { address: &'a Multiaddr, last_seen: &'a SystemTime, } -impl<'a> AddressRecord<'a>{ +impl<'a> AddressRecord<'a> { /// The address of this record. - pub fn address(&self)->&Multiaddr{ + pub fn address(&self) -> &Multiaddr { self.address } /// How much time has passed since the address is last reported wrt. current time. /// This may fail because of system time change. - pub fn last_seen(&self)->Result{ + pub fn last_seen(&self) -> Result { self.last_seen.elapsed() } -} \ No newline at end of file +} From 2bcfa7f32e8d45d50ffa4edb1cea26192c3816c4 Mon Sep 17 00:00:00 2001 From: drHuangMHT Date: Sat, 7 Dec 2024 15:48:16 +0800 Subject: [PATCH 05/26] docs --- misc/peer-store/src/behaviour.rs | 2 ++ misc/peer-store/src/store.rs | 10 +++++++++- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/misc/peer-store/src/behaviour.rs b/misc/peer-store/src/behaviour.rs index f66fc7186d9..d69536614c5 100644 --- a/misc/peer-store/src/behaviour.rs +++ b/misc/peer-store/src/behaviour.rs @@ -5,12 +5,14 @@ use libp2p_swarm::{dummy, FromSwarm, NetworkBehaviour}; use crate::store::Store; +/// Events of this behaviour that will be emmitted to the swarm. pub enum Event { RecordUpdated { peer: PeerId }, } pub struct Behaviour { store: S, + /// Events that will be emitted. pending_events: VecDeque, } diff --git a/misc/peer-store/src/store.rs b/misc/peer-store/src/store.rs index d56a730a596..67bca3be782 100644 --- a/misc/peer-store/src/store.rs +++ b/misc/peer-store/src/store.rs @@ -5,8 +5,13 @@ use std::{ use libp2p_core::{Multiaddr, PeerId}; +/// A store that +/// - keep track of currently connected peers; +/// - contains all observed addresses of peers; pub trait Store { + /// Called when a peer connects. fn on_peer_connect(&mut self, peer: &PeerId); + /// Called when a peer disconnects. fn on_peer_disconnect(&mut self, peer: &PeerId); /// Update an address record. /// Return `true` when the address is new. @@ -47,7 +52,7 @@ impl PeerAddressRecord { } pub(crate) struct AddressRecord { - /// The time when the address is last seen, in seconds. + /// The time when the address is last seen. last_seen: SystemTime, } impl AddressRecord { @@ -61,8 +66,11 @@ impl AddressRecord { } } +/// A in-memory store. pub struct MemoryStore { + /// Peers that are currently connected. connected_peers: HashSet, + /// An address book of peers regardless of their status(connected or not). address_book: HashMap, } From 6df6ee5a87f0901a8b7b015ed73575d918ae0bf5 Mon Sep 17 00:00:00 2001 From: drHuangMHT Date: Sun, 8 Dec 2024 11:14:31 +0800 Subject: [PATCH 06/26] return iterator of references instead of heap allocation --- misc/peer-store/src/behaviour.rs | 9 ++++++--- misc/peer-store/src/store.rs | 13 ++++++------- 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/misc/peer-store/src/behaviour.rs b/misc/peer-store/src/behaviour.rs index d69536614c5..f76ff0a7e75 100644 --- a/misc/peer-store/src/behaviour.rs +++ b/misc/peer-store/src/behaviour.rs @@ -27,12 +27,15 @@ where } } /// List peers that are currently connected to this peer. - pub fn list_connected(&self) -> Box<[&PeerId]> { + pub fn list_connected(&self) -> impl Iterator { self.store.list_connected() } /// Try to get all observed address of the given peer. /// Returns `None` when the peer is not in the store. - pub fn address_of_peer(&self, peer: &PeerId) -> Option> { + pub fn address_of_peer<'a, 'b>( + &'a self, + peer: &'b PeerId, + ) -> Option> + use<'a, 'b, S>> { self.store.addresses_of_peer(peer) } /// Manually update a record. @@ -106,7 +109,7 @@ where _connection_id: libp2p_swarm::ConnectionId, _event: libp2p_swarm::THandlerOutEvent, ) { - todo!() + unreachable!("No event will be produced by a dummy handler.") } fn poll( diff --git a/misc/peer-store/src/store.rs b/misc/peer-store/src/store.rs index 67bca3be782..7426c509cf2 100644 --- a/misc/peer-store/src/store.rs +++ b/misc/peer-store/src/store.rs @@ -16,22 +16,21 @@ pub trait Store { /// Update an address record. /// Return `true` when the address is new. fn on_address_update(&mut self, peer: &PeerId, address: &Multiaddr) -> bool; - fn list_connected(&self) -> Box<[&PeerId]>; - fn addresses_of_peer(&self, peer: &PeerId) -> Option>; + fn list_connected(&self) -> impl Iterator; + fn addresses_of_peer(&self, peer: &PeerId) -> Option>; } pub(crate) struct PeerAddressRecord { addresses: HashMap, } impl PeerAddressRecord { - pub(crate) fn records(&self) -> Box<[super::AddressRecord]> { + pub(crate) fn records(&self) -> impl Iterator { self.addresses .iter() .map(|(address, record)| super::AddressRecord { address, last_seen: &record.last_seen, }) - .collect() } pub(crate) fn new(address: &Multiaddr) -> Self { let mut address_book = HashMap::new(); @@ -90,10 +89,10 @@ impl Store for MemoryStore { true } } - fn list_connected(&self) -> Box<[&PeerId]> { - self.connected_peers.iter().collect() + fn list_connected(&self) -> impl Iterator { + self.connected_peers.iter() } - fn addresses_of_peer(&self, peer: &PeerId) -> Option> { + fn addresses_of_peer(&self, peer: &PeerId) -> Option> { self.address_book.get(peer).map(|record| record.records()) } } From 99a6bfd56ed3191914cb24cfdb16070833001eb9 Mon Sep 17 00:00:00 2001 From: drHuangMHT Date: Sun, 8 Dec 2024 11:21:00 +0800 Subject: [PATCH 07/26] move conencted_peers out of Store simplify Store trait, don't report conencted unless confirmed by swarm event --- misc/peer-store/src/behaviour.rs | 24 +++++++++++++++++------- misc/peer-store/src/store.rs | 31 +++++++++---------------------- 2 files changed, 26 insertions(+), 29 deletions(-) diff --git a/misc/peer-store/src/behaviour.rs b/misc/peer-store/src/behaviour.rs index f76ff0a7e75..81e54256dd9 100644 --- a/misc/peer-store/src/behaviour.rs +++ b/misc/peer-store/src/behaviour.rs @@ -1,4 +1,7 @@ -use std::{collections::VecDeque, task::Poll}; +use std::{ + collections::{HashSet, VecDeque}, + task::Poll, +}; use libp2p_core::{Multiaddr, PeerId}; use libp2p_swarm::{dummy, FromSwarm, NetworkBehaviour}; @@ -12,6 +15,8 @@ pub enum Event { pub struct Behaviour { store: S, + /// Peers that are currently connected. + connected_peers: HashSet, /// Events that will be emitted. pending_events: VecDeque, } @@ -23,12 +28,13 @@ where pub fn new(store: S) -> Self { Self { store, + connected_peers: HashSet::new(), pending_events: VecDeque::new(), } } /// List peers that are currently connected to this peer. pub fn list_connected(&self) -> impl Iterator { - self.store.list_connected() + self.connected_peers.iter() } /// Try to get all observed address of the given peer. /// Returns `None` when the peer is not in the store. @@ -45,6 +51,12 @@ where self.pending_events .push_back(Event::RecordUpdated { peer: *peer }); } + fn on_peer_connect(&mut self, peer: &PeerId) { + self.connected_peers.insert(*peer); + } + fn on_peer_disconnect(&mut self, peer: &PeerId) { + self.connected_peers.remove(peer); + } } impl NetworkBehaviour for Behaviour @@ -62,7 +74,6 @@ where _local_addr: &libp2p_core::Multiaddr, remote_addr: &libp2p_core::Multiaddr, ) -> Result, libp2p_swarm::ConnectionDenied> { - self.store.on_peer_connect(&peer); if self.store.on_address_update(&peer, remote_addr) { self.pending_events.push_back(Event::RecordUpdated { peer }); } @@ -72,12 +83,11 @@ where fn handle_established_outbound_connection( &mut self, _connection_id: libp2p_swarm::ConnectionId, - peer: libp2p_core::PeerId, + _peer: libp2p_core::PeerId, _addr: &libp2p_core::Multiaddr, _role_override: libp2p_core::Endpoint, _port_use: libp2p_core::transport::PortUse, ) -> Result, libp2p_swarm::ConnectionDenied> { - self.store.on_peer_connect(&peer); Ok(dummy::ConnectionHandler) } @@ -85,12 +95,12 @@ where match event { FromSwarm::ConnectionClosed(info) => { if info.remaining_established < 1 { - self.store.on_peer_disconnect(&info.peer_id); + self.on_peer_disconnect(&info.peer_id); } } FromSwarm::ConnectionEstablished(info) => { if info.other_established == 0 { - self.store.on_peer_connect(&info.peer_id); + self.on_peer_connect(&info.peer_id); } } FromSwarm::NewExternalAddrOfPeer(info) => { diff --git a/misc/peer-store/src/store.rs b/misc/peer-store/src/store.rs index 7426c509cf2..6a7e5ac7298 100644 --- a/misc/peer-store/src/store.rs +++ b/misc/peer-store/src/store.rs @@ -1,7 +1,4 @@ -use std::{ - collections::{HashMap, HashSet}, - time::SystemTime, -}; +use std::{collections::HashMap, time::SystemTime}; use libp2p_core::{Multiaddr, PeerId}; @@ -9,15 +6,13 @@ use libp2p_core::{Multiaddr, PeerId}; /// - keep track of currently connected peers; /// - contains all observed addresses of peers; pub trait Store { - /// Called when a peer connects. - fn on_peer_connect(&mut self, peer: &PeerId); - /// Called when a peer disconnects. - fn on_peer_disconnect(&mut self, peer: &PeerId); /// Update an address record. /// Return `true` when the address is new. fn on_address_update(&mut self, peer: &PeerId, address: &Multiaddr) -> bool; - fn list_connected(&self) -> impl Iterator; - fn addresses_of_peer(&self, peer: &PeerId) -> Option>; + fn addresses_of_peer( + &self, + peer: &PeerId, + ) -> Option>; } pub(crate) struct PeerAddressRecord { @@ -67,19 +62,11 @@ impl AddressRecord { /// A in-memory store. pub struct MemoryStore { - /// Peers that are currently connected. - connected_peers: HashSet, /// An address book of peers regardless of their status(connected or not). address_book: HashMap, } impl Store for MemoryStore { - fn on_peer_connect(&mut self, peer: &PeerId) { - self.connected_peers.insert(*peer); - } - fn on_peer_disconnect(&mut self, peer: &PeerId) { - self.connected_peers.remove(peer); - } fn on_address_update(&mut self, peer: &PeerId, address: &Multiaddr) -> bool { if let Some(record) = self.address_book.get_mut(peer) { record.on_address_update(address) @@ -89,10 +76,10 @@ impl Store for MemoryStore { true } } - fn list_connected(&self) -> impl Iterator { - self.connected_peers.iter() - } - fn addresses_of_peer(&self, peer: &PeerId) -> Option> { + fn addresses_of_peer( + &self, + peer: &PeerId, + ) -> Option> { self.address_book.get(peer).map(|record| record.records()) } } From 5c7ba32fbd4a18dfb289f1a971f703332a47c3c4 Mon Sep 17 00:00:00 2001 From: drHuangMHT Date: Sun, 8 Dec 2024 11:26:31 +0800 Subject: [PATCH 08/26] use capped LruCache instead of uncapped HashMap for address records --- Cargo.lock | 1 + misc/peer-store/Cargo.toml | 1 + misc/peer-store/src/store.rs | 11 ++++++----- 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index de03b741875..c016dc66185 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3005,6 +3005,7 @@ version = "0.1.0" dependencies = [ "libp2p-core", "libp2p-swarm", + "lru", ] [[package]] diff --git a/misc/peer-store/Cargo.toml b/misc/peer-store/Cargo.toml index ee6bace01fe..70ad378b8df 100644 --- a/misc/peer-store/Cargo.toml +++ b/misc/peer-store/Cargo.toml @@ -7,6 +7,7 @@ rust-version.workspace = true [dependencies] libp2p-core = { workspace = true } libp2p-swarm = { workspace = true } +lru = "*" [lints] workspace = true diff --git a/misc/peer-store/src/store.rs b/misc/peer-store/src/store.rs index 6a7e5ac7298..1b1aa572b40 100644 --- a/misc/peer-store/src/store.rs +++ b/misc/peer-store/src/store.rs @@ -1,6 +1,7 @@ -use std::{collections::HashMap, time::SystemTime}; +use std::{collections::HashMap, num::NonZeroUsize, time::SystemTime}; use libp2p_core::{Multiaddr, PeerId}; +use lru::LruCache; /// A store that /// - keep track of currently connected peers; @@ -16,7 +17,7 @@ pub trait Store { } pub(crate) struct PeerAddressRecord { - addresses: HashMap, + addresses: LruCache, } impl PeerAddressRecord { pub(crate) fn records(&self) -> impl Iterator { @@ -28,8 +29,8 @@ impl PeerAddressRecord { }) } pub(crate) fn new(address: &Multiaddr) -> Self { - let mut address_book = HashMap::new(); - address_book.insert(address.clone(), AddressRecord::new()); + let mut address_book = LruCache::new(NonZeroUsize::new(8).expect("8 is greater than 0")); + address_book.get_or_insert(address.clone(), AddressRecord::new); Self { addresses: address_book, } @@ -39,7 +40,7 @@ impl PeerAddressRecord { record.update_last_seen(); false } else { - self.addresses.insert(address.clone(), AddressRecord::new()); + self.addresses.get_or_insert(address.clone(), AddressRecord::new); true } } From b6dcd591024c84cfb5a615e4a62d82d577bab32a Mon Sep 17 00:00:00 2001 From: drHuangMHT Date: Sun, 8 Dec 2024 11:35:23 +0800 Subject: [PATCH 09/26] update address book when a connection is established regardless of future denial --- misc/peer-store/src/behaviour.rs | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/misc/peer-store/src/behaviour.rs b/misc/peer-store/src/behaviour.rs index 81e54256dd9..3ae42f3f389 100644 --- a/misc/peer-store/src/behaviour.rs +++ b/misc/peer-store/src/behaviour.rs @@ -57,6 +57,12 @@ where fn on_peer_disconnect(&mut self, peer: &PeerId) { self.connected_peers.remove(peer); } + fn on_address_update(&mut self, peer: &PeerId, address: &Multiaddr) { + if self.store.on_address_update(peer, address) { + self.pending_events + .push_back(Event::RecordUpdated { peer: *peer }); + } + } } impl NetworkBehaviour for Behaviour @@ -74,20 +80,19 @@ where _local_addr: &libp2p_core::Multiaddr, remote_addr: &libp2p_core::Multiaddr, ) -> Result, libp2p_swarm::ConnectionDenied> { - if self.store.on_address_update(&peer, remote_addr) { - self.pending_events.push_back(Event::RecordUpdated { peer }); - } + self.on_address_update(&peer, remote_addr); Ok(dummy::ConnectionHandler) } fn handle_established_outbound_connection( &mut self, _connection_id: libp2p_swarm::ConnectionId, - _peer: libp2p_core::PeerId, - _addr: &libp2p_core::Multiaddr, + peer: libp2p_core::PeerId, + addr: &libp2p_core::Multiaddr, _role_override: libp2p_core::Endpoint, _port_use: libp2p_core::transport::PortUse, ) -> Result, libp2p_swarm::ConnectionDenied> { + self.on_address_update(&peer, addr); Ok(dummy::ConnectionHandler) } @@ -104,10 +109,7 @@ where } } FromSwarm::NewExternalAddrOfPeer(info) => { - if self.store.on_address_update(&info.peer_id, info.addr) { - self.pending_events - .push_back(Event::RecordUpdated { peer: info.peer_id }); - } + self.on_address_update(&info.peer_id, info.addr); } _ => {} } From cbb1906321a17feb379a2a7af802452cdf153813 Mon Sep 17 00:00:00 2001 From: drHuangMHT Date: Mon, 9 Dec 2024 14:14:36 +0800 Subject: [PATCH 10/26] provide address for dial --- misc/peer-store/src/behaviour.rs | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/misc/peer-store/src/behaviour.rs b/misc/peer-store/src/behaviour.rs index 3ae42f3f389..5eadfe2a77f 100644 --- a/misc/peer-store/src/behaviour.rs +++ b/misc/peer-store/src/behaviour.rs @@ -84,6 +84,25 @@ where Ok(dummy::ConnectionHandler) } + fn handle_pending_outbound_connection( + &mut self, + _connection_id: libp2p_swarm::ConnectionId, + maybe_peer: Option, + _addresses: &[Multiaddr], + _effective_role: libp2p_core::Endpoint, + ) -> Result, libp2p_swarm::ConnectionDenied> { + if maybe_peer.is_none() { + return Ok(Vec::with_capacity(0)); + } + if let Some(i) = self + .store + .addresses_of_peer(&maybe_peer.expect("already handled")) + { + return Ok(i.map(|r| r.address).cloned().collect()); + } + Ok(Vec::with_capacity(0)) + } + fn handle_established_outbound_connection( &mut self, _connection_id: libp2p_swarm::ConnectionId, From 6270259c4124c5afb0d74e63967c0cc59aaa0d92 Mon Sep 17 00:00:00 2001 From: drHuangMHT Date: Fri, 13 Dec 2024 15:33:25 +0800 Subject: [PATCH 11/26] apply suggestions Store now handles FromSwarm directly; rename some on_* methods; allow removing address; update records on FromSwarm::ConnectionEstablished --- misc/peer-store/src/behaviour.rs | 20 +++-- misc/peer-store/src/lib.rs | 4 +- misc/peer-store/src/memory_store.rs | 109 ++++++++++++++++++++++++++++ misc/peer-store/src/store.rs | 83 +++------------------ 4 files changed, 136 insertions(+), 80 deletions(-) create mode 100644 misc/peer-store/src/memory_store.rs diff --git a/misc/peer-store/src/behaviour.rs b/misc/peer-store/src/behaviour.rs index 5eadfe2a77f..4313d8415e1 100644 --- a/misc/peer-store/src/behaviour.rs +++ b/misc/peer-store/src/behaviour.rs @@ -8,7 +8,7 @@ use libp2p_swarm::{dummy, FromSwarm, NetworkBehaviour}; use crate::store::Store; -/// Events of this behaviour that will be emmitted to the swarm. +/// Events generated by [`Behaviour`] and emitted back to the [`libp2p_swarm::Swarm`]. pub enum Event { RecordUpdated { peer: PeerId }, } @@ -17,7 +17,7 @@ pub struct Behaviour { store: S, /// Peers that are currently connected. connected_peers: HashSet, - /// Events that will be emitted. + /// Pending Events to be emitted back to the [`libp2p_swarm::Swarm`]. pending_events: VecDeque, } @@ -47,7 +47,7 @@ where /// Manually update a record. /// This will always cause an `Event::RecordUpdated` to be emitted. pub fn update_record(&mut self, peer: &PeerId, address: &Multiaddr) { - self.store.on_address_update(peer, address); + self.store.update_address(peer, address); self.pending_events .push_back(Event::RecordUpdated { peer: *peer }); } @@ -58,11 +58,17 @@ where self.connected_peers.remove(peer); } fn on_address_update(&mut self, peer: &PeerId, address: &Multiaddr) { - if self.store.on_address_update(peer, address) { + if self.store.update_address(peer, address) { self.pending_events .push_back(Event::RecordUpdated { peer: *peer }); } } + fn handle_store_event(&mut self, event: super::store::Event) { + use super::store::Event::*; + match event { + RecordUpdated(peer) => self.pending_events.push_back(Event::RecordUpdated { peer }), + } + } } impl NetworkBehaviour for Behaviour @@ -116,6 +122,9 @@ where } fn on_swarm_event(&mut self, event: libp2p_swarm::FromSwarm) { + if let Some(ev) = self.store.on_swarm_event(&event) { + self.handle_store_event(ev); + }; match event { FromSwarm::ConnectionClosed(info) => { if info.remaining_established < 1 { @@ -127,9 +136,6 @@ where self.on_peer_connect(&info.peer_id); } } - FromSwarm::NewExternalAddrOfPeer(info) => { - self.on_address_update(&info.peer_id, info.addr); - } _ => {} } } diff --git a/misc/peer-store/src/lib.rs b/misc/peer-store/src/lib.rs index 0d86a6e6d10..6fc9aab2db9 100644 --- a/misc/peer-store/src/lib.rs +++ b/misc/peer-store/src/lib.rs @@ -1,11 +1,13 @@ mod behaviour; +mod memory_store; mod store; use std::time::SystemTime; pub use behaviour::{Behaviour, Event}; use libp2p_core::Multiaddr; -pub use store::{MemoryStore, Store}; +pub use store::Store; +pub use memory_store::MemoryStore; pub struct AddressRecord<'a> { address: &'a Multiaddr, diff --git a/misc/peer-store/src/memory_store.rs b/misc/peer-store/src/memory_store.rs new file mode 100644 index 00000000000..09e2ea225ce --- /dev/null +++ b/misc/peer-store/src/memory_store.rs @@ -0,0 +1,109 @@ +use std::{collections::HashMap, num::NonZeroUsize, time::SystemTime}; + +use libp2p_core::{Multiaddr, PeerId}; +use libp2p_swarm::FromSwarm; +use lru::LruCache; + +use super::{store::Event, Store}; + +pub(crate) struct PeerAddressRecord { + addresses: LruCache, +} +impl PeerAddressRecord { + pub(crate) fn records(&self) -> impl Iterator { + self.addresses + .iter() + .map(|(address, record)| super::AddressRecord { + address, + last_seen: &record.last_seen, + }) + } + pub(crate) fn new(address: &Multiaddr) -> Self { + let mut address_book = LruCache::new(NonZeroUsize::new(8).expect("8 is greater than 0")); + address_book.get_or_insert(address.clone(), AddressRecord::new); + Self { + addresses: address_book, + } + } + pub(crate) fn update_address(&mut self, address: &Multiaddr) -> bool { + if let Some(record) = self.addresses.get_mut(address) { + record.update_last_seen(); + return false; + } + // reduce syscall(new record won't call `SystemTime::now()` twice) + self.addresses + .get_or_insert(address.clone(), AddressRecord::new); + true + } + pub(crate) fn remove_address(&mut self, address: &Multiaddr) -> bool { + self.addresses.pop(address).is_some() + } +} + +pub(crate) struct AddressRecord { + /// The time when the address is last seen. + last_seen: SystemTime, +} +impl AddressRecord { + pub(crate) fn new() -> Self { + Self { + last_seen: SystemTime::now(), + } + } + pub(crate) fn update_last_seen(&mut self) { + self.last_seen = SystemTime::now(); + } +} + +/// A in-memory store. +pub struct MemoryStore { + /// An address book of peers regardless of their status(connected or not). + address_book: HashMap, +} + +impl Store for MemoryStore { + fn update_address(&mut self, peer: &PeerId, address: &Multiaddr) -> bool { + if let Some(record) = self.address_book.get_mut(peer) { + record.update_address(address) + } else { + self.address_book + .insert(*peer, PeerAddressRecord::new(address)); + true + } + } + fn remove_address(&mut self, peer: &PeerId, address: &Multiaddr) -> bool { + if let Some(record) = self.address_book.get_mut(peer) { + return record.remove_address(address); + } + false + } + fn on_swarm_event(&mut self, swarm_event: &FromSwarm) -> Option { + match swarm_event { + FromSwarm::NewExternalAddrOfPeer(info) => { + if self.update_address(&info.peer_id, info.addr) { + return Some(Event::RecordUpdated(info.peer_id)); + } + None + } + FromSwarm::ConnectionEstablished(info) => { + let mut is_record_updated = false; + for failed_addr in info.failed_addresses { + is_record_updated |= self.remove_address(&info.peer_id, failed_addr); + } + is_record_updated |= + self.update_address(&info.peer_id, info.endpoint.get_remote_address()); + if is_record_updated { + return Some(Event::RecordUpdated(info.peer_id)); + } + None + } + _ => None, + } + } + fn addresses_of_peer( + &self, + peer: &PeerId, + ) -> Option> { + self.address_book.get(peer).map(|record| record.records()) + } +} diff --git a/misc/peer-store/src/store.rs b/misc/peer-store/src/store.rs index 1b1aa572b40..3b9af8f74dd 100644 --- a/misc/peer-store/src/store.rs +++ b/misc/peer-store/src/store.rs @@ -1,86 +1,25 @@ -use std::{collections::HashMap, num::NonZeroUsize, time::SystemTime}; - use libp2p_core::{Multiaddr, PeerId}; -use lru::LruCache; +use libp2p_swarm::FromSwarm; /// A store that /// - keep track of currently connected peers; /// - contains all observed addresses of peers; pub trait Store { /// Update an address record. - /// Return `true` when the address is new. - fn on_address_update(&mut self, peer: &PeerId, address: &Multiaddr) -> bool; + /// Returns `true` when the address is new. + fn update_address(&mut self, peer: &PeerId, address: &Multiaddr) -> bool; + /// Remove an address record. + /// Returns `true` when the address is removed. + fn remove_address(&mut self, peer: &PeerId, address: &Multiaddr) -> bool; + /// How this store handles events from the swarm. + fn on_swarm_event(&mut self, event: &FromSwarm) -> Option; + /// Get all stored addresses of the peer. fn addresses_of_peer( &self, peer: &PeerId, ) -> Option>; } -pub(crate) struct PeerAddressRecord { - addresses: LruCache, -} -impl PeerAddressRecord { - pub(crate) fn records(&self) -> impl Iterator { - self.addresses - .iter() - .map(|(address, record)| super::AddressRecord { - address, - last_seen: &record.last_seen, - }) - } - pub(crate) fn new(address: &Multiaddr) -> Self { - let mut address_book = LruCache::new(NonZeroUsize::new(8).expect("8 is greater than 0")); - address_book.get_or_insert(address.clone(), AddressRecord::new); - Self { - addresses: address_book, - } - } - pub(crate) fn on_address_update(&mut self, address: &Multiaddr) -> bool { - if let Some(record) = self.addresses.get_mut(address) { - record.update_last_seen(); - false - } else { - self.addresses.get_or_insert(address.clone(), AddressRecord::new); - true - } - } -} - -pub(crate) struct AddressRecord { - /// The time when the address is last seen. - last_seen: SystemTime, -} -impl AddressRecord { - pub(crate) fn new() -> Self { - Self { - last_seen: SystemTime::now(), - } - } - pub(crate) fn update_last_seen(&mut self) { - self.last_seen = SystemTime::now(); - } -} - -/// A in-memory store. -pub struct MemoryStore { - /// An address book of peers regardless of their status(connected or not). - address_book: HashMap, -} - -impl Store for MemoryStore { - fn on_address_update(&mut self, peer: &PeerId, address: &Multiaddr) -> bool { - if let Some(record) = self.address_book.get_mut(peer) { - record.on_address_update(address) - } else { - self.address_book - .insert(*peer, PeerAddressRecord::new(address)); - true - } - } - fn addresses_of_peer( - &self, - peer: &PeerId, - ) -> Option> { - self.address_book.get(peer).map(|record| record.records()) - } +pub enum Event { + RecordUpdated(PeerId), } From cc91ab5e2260849b7e40004eee0dc682d819de31 Mon Sep 17 00:00:00 2001 From: drHuangMHT Date: Tue, 17 Dec 2024 20:36:38 +0800 Subject: [PATCH 12/26] garbage collect records, test --- Cargo.lock | 3 + misc/peer-store/Cargo.toml | 5 + misc/peer-store/src/behaviour.rs | 69 ++++-- misc/peer-store/src/lib.rs | 21 +- misc/peer-store/src/memory_store.rs | 341 +++++++++++++++++++++++----- misc/peer-store/src/store.rs | 32 ++- 6 files changed, 370 insertions(+), 101 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c016dc66185..b99cf30a625 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3003,7 +3003,10 @@ dependencies = [ name = "libp2p-peer-store" version = "0.1.0" dependencies = [ + "futures-timer", + "futures-util", "libp2p-core", + "libp2p-identity", "libp2p-swarm", "lru", ] diff --git a/misc/peer-store/Cargo.toml b/misc/peer-store/Cargo.toml index 70ad378b8df..072ad6979d3 100644 --- a/misc/peer-store/Cargo.toml +++ b/misc/peer-store/Cargo.toml @@ -8,6 +8,11 @@ rust-version.workspace = true libp2p-core = { workspace = true } libp2p-swarm = { workspace = true } lru = "*" +futures-timer = "*" +futures-util = "*" + +[dev-dependencies] +libp2p-identity = { workspace = true, features = ["rand"] } [lints] workspace = true diff --git a/misc/peer-store/src/behaviour.rs b/misc/peer-store/src/behaviour.rs index 4313d8415e1..6d212fdae5f 100644 --- a/misc/peer-store/src/behaviour.rs +++ b/misc/peer-store/src/behaviour.rs @@ -1,35 +1,47 @@ use std::{ collections::{HashSet, VecDeque}, task::Poll, + time::Duration, }; +use futures_timer::Delay; +use futures_util::FutureExt; use libp2p_core::{Multiaddr, PeerId}; use libp2p_swarm::{dummy, FromSwarm, NetworkBehaviour}; -use crate::store::Store; +use crate::store::{AddressSource, Store}; /// Events generated by [`Behaviour`] and emitted back to the [`libp2p_swarm::Swarm`]. pub enum Event { RecordUpdated { peer: PeerId }, } +pub struct Config { + /// The interval for garbage collecting records. + check_record_ttl_interval: Duration, +} + pub struct Behaviour { store: S, /// Peers that are currently connected. connected_peers: HashSet, /// Pending Events to be emitted back to the [`libp2p_swarm::Swarm`]. pending_events: VecDeque, + record_ttl_timer: Option, + config: Config, } -impl Behaviour +impl<'a, S> Behaviour where - S: Store + 'static, + S: Store<'a> + 'static, { - pub fn new(store: S) -> Self { + pub fn new(store: S, config: Config) -> Self { Self { store, connected_peers: HashSet::new(), pending_events: VecDeque::new(), + record_ttl_timer: Some(Delay::new(config.check_record_ttl_interval)), + config, } } /// List peers that are currently connected to this peer. @@ -38,16 +50,17 @@ where } /// Try to get all observed address of the given peer. /// Returns `None` when the peer is not in the store. - pub fn address_of_peer<'a, 'b>( + pub fn address_of_peer<'b>( &'a self, peer: &'b PeerId, - ) -> Option> + use<'a, 'b, S>> { + ) -> Option + use<'a, 'b, S>> { self.store.addresses_of_peer(peer) } /// Manually update a record. /// This will always cause an `Event::RecordUpdated` to be emitted. pub fn update_record(&mut self, peer: &PeerId, address: &Multiaddr) { - self.store.update_address(peer, address); + self.store + .update_address(peer, address, AddressSource::Manual, false); self.pending_events .push_back(Event::RecordUpdated { peer: *peer }); } @@ -57,8 +70,17 @@ where fn on_peer_disconnect(&mut self, peer: &PeerId) { self.connected_peers.remove(peer); } - fn on_address_update(&mut self, peer: &PeerId, address: &Multiaddr) { - if self.store.update_address(peer, address) { + fn on_address_update( + &mut self, + peer: &PeerId, + address: &Multiaddr, + source: AddressSource, + should_expire: bool, + ) { + if self + .store + .update_address(peer, address, source, should_expire) + { self.pending_events .push_back(Event::RecordUpdated { peer: *peer }); } @@ -71,9 +93,9 @@ where } } -impl NetworkBehaviour for Behaviour +impl<'a, S> NetworkBehaviour for Behaviour where - S: Store + 'static, + S: Store<'a> + 'static, { type ConnectionHandler = dummy::ConnectionHandler; @@ -86,7 +108,7 @@ where _local_addr: &libp2p_core::Multiaddr, remote_addr: &libp2p_core::Multiaddr, ) -> Result, libp2p_swarm::ConnectionDenied> { - self.on_address_update(&peer, remote_addr); + self.on_address_update(&peer, remote_addr, AddressSource::DirectConnection, false); Ok(dummy::ConnectionHandler) } @@ -104,7 +126,7 @@ where .store .addresses_of_peer(&maybe_peer.expect("already handled")) { - return Ok(i.map(|r| r.address).cloned().collect()); + return Ok(i.cloned().collect()); } Ok(Vec::with_capacity(0)) } @@ -117,7 +139,7 @@ where _role_override: libp2p_core::Endpoint, _port_use: libp2p_core::transport::PortUse, ) -> Result, libp2p_swarm::ConnectionDenied> { - self.on_address_update(&peer, addr); + self.on_address_update(&peer, addr, AddressSource::DirectConnection, false); Ok(dummy::ConnectionHandler) } @@ -151,12 +173,29 @@ where fn poll( &mut self, - _cx: &mut std::task::Context<'_>, + cx: &mut std::task::Context<'_>, ) -> std::task::Poll>> { if let Some(ev) = self.pending_events.pop_front() { return Poll::Ready(libp2p_swarm::ToSwarm::GenerateEvent(ev)); } + self.poll_record_ttl(cx); Poll::Pending } } + +impl<'a, S> Behaviour +where + S: Store<'a>, +{ + /// Garbage collect records. + fn poll_record_ttl(&mut self, cx: &mut std::task::Context<'_>) { + if let Some(mut timer) = self.record_ttl_timer.take() { + if let Poll::Ready(()) = timer.poll_unpin(cx) { + self.store.check_ttl(); + self.record_ttl_timer = Some(Delay::new(self.config.check_record_ttl_interval)); + } + self.record_ttl_timer = Some(timer) + } + } +} diff --git a/misc/peer-store/src/lib.rs b/misc/peer-store/src/lib.rs index 6fc9aab2db9..afe7e457413 100644 --- a/misc/peer-store/src/lib.rs +++ b/misc/peer-store/src/lib.rs @@ -2,25 +2,6 @@ mod behaviour; mod memory_store; mod store; -use std::time::SystemTime; - pub use behaviour::{Behaviour, Event}; -use libp2p_core::Multiaddr; +pub use memory_store::{AddressRecord, Config, MemoryStore}; pub use store::Store; -pub use memory_store::MemoryStore; - -pub struct AddressRecord<'a> { - address: &'a Multiaddr, - last_seen: &'a SystemTime, -} -impl<'a> AddressRecord<'a> { - /// The address of this record. - pub fn address(&self) -> &Multiaddr { - self.address - } - /// How much time has passed since the address is last reported wrt. current time. - /// This may fail because of system time change. - pub fn last_seen(&self) -> Result { - self.last_seen.elapsed() - } -} diff --git a/misc/peer-store/src/memory_store.rs b/misc/peer-store/src/memory_store.rs index 09e2ea225ce..4d5fc47d5b1 100644 --- a/misc/peer-store/src/memory_store.rs +++ b/misc/peer-store/src/memory_store.rs @@ -1,73 +1,47 @@ -use std::{collections::HashMap, num::NonZeroUsize, time::SystemTime}; +use std::{ + collections::HashMap, + num::NonZeroUsize, + time::{Duration, Instant}, +}; use libp2p_core::{Multiaddr, PeerId}; use libp2p_swarm::FromSwarm; -use lru::LruCache; use super::{store::Event, Store}; +use crate::store::AddressSource; -pub(crate) struct PeerAddressRecord { - addresses: LruCache, -} -impl PeerAddressRecord { - pub(crate) fn records(&self) -> impl Iterator { - self.addresses - .iter() - .map(|(address, record)| super::AddressRecord { - address, - last_seen: &record.last_seen, - }) - } - pub(crate) fn new(address: &Multiaddr) -> Self { - let mut address_book = LruCache::new(NonZeroUsize::new(8).expect("8 is greater than 0")); - address_book.get_or_insert(address.clone(), AddressRecord::new); - Self { - addresses: address_book, - } - } - pub(crate) fn update_address(&mut self, address: &Multiaddr) -> bool { - if let Some(record) = self.addresses.get_mut(address) { - record.update_last_seen(); - return false; - } - // reduce syscall(new record won't call `SystemTime::now()` twice) - self.addresses - .get_or_insert(address.clone(), AddressRecord::new); - true - } - pub(crate) fn remove_address(&mut self, address: &Multiaddr) -> bool { - self.addresses.pop(address).is_some() - } +/// A in-memory store. +#[derive(Default)] +pub struct MemoryStore { + /// An address book of peers regardless of their status(connected or not). + address_book: HashMap, + config: Config, } -pub(crate) struct AddressRecord { - /// The time when the address is last seen. - last_seen: SystemTime, -} -impl AddressRecord { - pub(crate) fn new() -> Self { +impl MemoryStore { + pub fn new(config: Config) -> Self { Self { - last_seen: SystemTime::now(), + config, + ..Default::default() } } - pub(crate) fn update_last_seen(&mut self) { - self.last_seen = SystemTime::now(); - } -} - -/// A in-memory store. -pub struct MemoryStore { - /// An address book of peers regardless of their status(connected or not). - address_book: HashMap, } -impl Store for MemoryStore { - fn update_address(&mut self, peer: &PeerId, address: &Multiaddr) -> bool { +impl<'a> Store<'a> for MemoryStore { + type AddressRecord = AddressRecord<'a>; + fn update_address( + &mut self, + peer: &PeerId, + address: &Multiaddr, + source: AddressSource, + should_expire: bool, + ) -> bool { if let Some(record) = self.address_book.get_mut(peer) { - record.update_address(address) + record.update_address(address, source, should_expire) } else { - self.address_book - .insert(*peer, PeerAddressRecord::new(address)); + let mut new_record = record::PeerAddressRecord::new(self.config.record_capacity); + new_record.update_address(address, source, should_expire); + self.address_book.insert(*peer, new_record); true } } @@ -77,10 +51,11 @@ impl Store for MemoryStore { } false } + fn on_swarm_event(&mut self, swarm_event: &FromSwarm) -> Option { match swarm_event { FromSwarm::NewExternalAddrOfPeer(info) => { - if self.update_address(&info.peer_id, info.addr) { + if self.update_address(&info.peer_id, info.addr, AddressSource::Behaviour, true) { return Some(Event::RecordUpdated(info.peer_id)); } None @@ -90,8 +65,12 @@ impl Store for MemoryStore { for failed_addr in info.failed_addresses { is_record_updated |= self.remove_address(&info.peer_id, failed_addr); } - is_record_updated |= - self.update_address(&info.peer_id, info.endpoint.get_remote_address()); + is_record_updated |= self.update_address( + &info.peer_id, + info.endpoint.get_remote_address(), + AddressSource::DirectConnection, + false, + ); if is_record_updated { return Some(Event::RecordUpdated(info.peer_id)); } @@ -100,10 +79,250 @@ impl Store for MemoryStore { _ => None, } } - fn addresses_of_peer( - &self, + fn addresses_of_peer<'b>(&self, peer: &'b PeerId) -> Option> { + self.address_book + .get(peer) + .map(|record| record.records().map(|r| r.address)) + } + fn address_record_of_peer( + &'a self, peer: &PeerId, - ) -> Option> { + ) -> Option>> { self.address_book.get(peer).map(|record| record.records()) } + fn check_ttl(&mut self) { + let now = Instant::now(); + for (_, r) in &mut self.address_book { + r.check_ttl(now, self.config.record_ttl); + } + } +} + +pub struct Config { + record_ttl: Duration, + record_capacity: NonZeroUsize, +} + +impl Default for Config { + fn default() -> Self { + Self { + record_ttl: Duration::from_secs(600), + record_capacity: NonZeroUsize::try_from(8).expect("8 > 0"), + } + } +} + +pub struct AddressRecord<'a> { + /// The address of this record. + address: &'a Multiaddr, + last_seen: &'a Instant, + pub source: AddressSource, + pub should_expire: bool, +} +impl<'a> AddressRecord<'a> { + /// How much time has passed since the address is last reported wrt. current time. + /// This may fail because of system time change. + pub fn last_seen(&self, now: Instant) -> std::time::Duration { + now.duration_since(self.last_seen.clone()) + } +} + +mod record { + use lru::LruCache; + + use super::*; + + pub(crate) struct PeerAddressRecord { + addresses: LruCache, + } + impl PeerAddressRecord { + pub(crate) fn records(&self) -> impl Iterator { + self.addresses + .iter() + .map(|(address, record)| super::AddressRecord { + address, + last_seen: &record.last_seen, + source: record.source, + should_expire: record.should_expire, + }) + } + pub(crate) fn new(capacity: NonZeroUsize) -> Self { + Self { + addresses: LruCache::new(capacity), + } + } + pub(crate) fn update_address( + &mut self, + address: &Multiaddr, + source: AddressSource, + should_expire: bool, + ) -> bool { + if let Some(record) = self.addresses.get_mut(address) { + record.update_last_seen(); + return false; + } + // new record won't call `Instant::now()` twice + self.addresses.get_or_insert(address.clone(), || { + AddressRecord::new(source, should_expire) + }); + true + } + pub(crate) fn remove_address(&mut self, address: &Multiaddr) -> bool { + self.addresses.pop(address).is_some() + } + pub(crate) fn check_ttl(&mut self, now: Instant, ttl: Duration) { + let mut records_to_be_deleted = Vec::new(); + for (k, record) in self.addresses.iter() { + if record.is_expired(now, ttl) { + records_to_be_deleted.push(k.clone()); + } + } + for k in records_to_be_deleted { + self.addresses.pop(&k); + } + } + } + + pub(crate) struct AddressRecord { + /// The time when the address is last seen. + last_seen: Instant, + source: AddressSource, + should_expire: bool, + } + impl AddressRecord { + pub(crate) fn new(source: AddressSource, should_expire: bool) -> Self { + Self { + last_seen: Instant::now(), + source, + should_expire, + } + } + pub(crate) fn update_last_seen(&mut self) { + self.last_seen = Instant::now(); + } + pub(crate) fn is_expired(&self, now: Instant, ttl: Duration) -> bool { + self.should_expire && now.duration_since(self.last_seen) > ttl + } + } +} + +#[cfg(test)] +mod test { + use std::{num::NonZeroUsize, str::FromStr, thread, time::Duration}; + + use libp2p_core::{Multiaddr, PeerId}; + + use super::{Config, MemoryStore}; + use crate::Store; + + #[test] + fn record_expire() { + let config = Config { + record_capacity: NonZeroUsize::try_from(4).expect("4 > 0"), + record_ttl: Duration::from_millis(1), + }; + let mut store = MemoryStore::new(config); + let fake_peer = PeerId::random(); + let addr_no_expire = Multiaddr::from_str("/ip4/127.0.0.1").expect("parsing to succeed"); + let addr_should_expire = Multiaddr::from_str("/ip4/127.0.0.2").expect("parsing to succeed"); + store.update_address( + &fake_peer, + &addr_no_expire, + crate::store::AddressSource::Manual, + false, + ); + store.update_address( + &fake_peer, + &addr_should_expire, + crate::store::AddressSource::Manual, + true, + ); + thread::sleep(Duration::from_millis(2)); + store.check_ttl(); + assert!(store + .addresses_of_peer(&fake_peer) + .expect("peer to be in the store") + .find(|r| **r == addr_should_expire) + .is_none()); + assert!(store + .addresses_of_peer(&fake_peer) + .expect("peer to be in the store") + .find(|r| **r == addr_no_expire) + .is_some()); + } + + #[test] + fn recent_use_bubble_up() { + let mut store = MemoryStore::new(Default::default()); + let fake_peer = PeerId::random(); + let addr1 = Multiaddr::from_str("/ip4/127.0.0.1").expect("parsing to succeed"); + let addr2 = Multiaddr::from_str("/ip4/127.0.0.2").expect("parsing to succeed"); + store.update_address( + &fake_peer, + &addr1, + crate::store::AddressSource::Manual, + false, + ); + store.update_address( + &fake_peer, + &addr2, + crate::store::AddressSource::Manual, + false, + ); + assert!( + *store + .address_book + .get(&fake_peer) + .expect("peer to be in the store") + .records() + .last() + .expect("addr in the record") + .address + == addr1 + ); + store.update_address( + &fake_peer, + &addr1, + crate::store::AddressSource::Manual, + false, + ); + assert!( + *store + .address_book + .get(&fake_peer) + .expect("peer to be in the store") + .records() + .last() + .expect("addr in the record") + .address + == addr2 + ); + } + + #[test] + fn bounded_store() { + let mut store = MemoryStore::new(Default::default()); + let fake_peer = PeerId::random(); + for i in 1..10 { + let addr_string = format!("/ip4/127.0.0.{}", i); + store.update_address( + &fake_peer, + &Multiaddr::from_str(&addr_string).expect("parsing to succeed"), + crate::store::AddressSource::Manual, + false, + ); + } + let first_record = Multiaddr::from_str("/ip4/127.0.0.1").expect("parsing to succeed"); + assert!(store + .addresses_of_peer(&fake_peer) + .expect("peer to be in the store") + .find(|addr| **addr == first_record) + .is_none()); + let second_record = Multiaddr::from_str("/ip4/127.0.0.2").expect("parsing to succeed"); + assert!(store + .addresses_of_peer(&fake_peer) + .expect("peer to be in the store") + .find(|addr| **addr == second_record) + .is_some()); + } } diff --git a/misc/peer-store/src/store.rs b/misc/peer-store/src/store.rs index 3b9af8f74dd..1bb9ae51040 100644 --- a/misc/peer-store/src/store.rs +++ b/misc/peer-store/src/store.rs @@ -4,22 +4,44 @@ use libp2p_swarm::FromSwarm; /// A store that /// - keep track of currently connected peers; /// - contains all observed addresses of peers; -pub trait Store { +pub trait Store<'a> { + type AddressRecord; /// Update an address record. /// Returns `true` when the address is new. - fn update_address(&mut self, peer: &PeerId, address: &Multiaddr) -> bool; + fn update_address( + &mut self, + peer: &PeerId, + address: &Multiaddr, + source: AddressSource, + should_expire: bool, + ) -> bool; /// Remove an address record. /// Returns `true` when the address is removed. fn remove_address(&mut self, peer: &PeerId, address: &Multiaddr) -> bool; /// How this store handles events from the swarm. fn on_swarm_event(&mut self, event: &FromSwarm) -> Option; /// Get all stored addresses of the peer. - fn addresses_of_peer( - &self, + fn addresses_of_peer<'b>(&self, peer: &'b PeerId) -> Option>; + /// Get all stored address records of the peer. + fn address_record_of_peer( + &'a self, peer: &PeerId, - ) -> Option>; + ) -> Option>; + /// C + fn check_ttl(&mut self); } pub enum Event { RecordUpdated(PeerId), } + +/// How the address is discovered. +#[derive(Debug, Clone, Copy)] +pub enum AddressSource { + /// The address is discovered from a behaviour(e.g. kadelima, identify). + Behaviour, + /// We have direct connection to the address. + DirectConnection, + /// The address is manually added. + Manual, +} From f9b040e71d7eed9c4868c96e595514407adc9a3c Mon Sep 17 00:00:00 2001 From: drHuangMHT Date: Wed, 18 Dec 2024 10:15:59 +0800 Subject: [PATCH 13/26] documentation and formatting --- misc/peer-store/src/behaviour.rs | 10 ++++-- misc/peer-store/src/memory_store.rs | 54 +++++++++++++++++++---------- misc/peer-store/src/store.rs | 7 +++- 3 files changed, 50 insertions(+), 21 deletions(-) diff --git a/misc/peer-store/src/behaviour.rs b/misc/peer-store/src/behaviour.rs index 6d212fdae5f..ed078f6afa0 100644 --- a/misc/peer-store/src/behaviour.rs +++ b/misc/peer-store/src/behaviour.rs @@ -13,6 +13,9 @@ use crate::store::{AddressSource, Store}; /// Events generated by [`Behaviour`] and emitted back to the [`libp2p_swarm::Swarm`]. pub enum Event { + /// The peer's record has been updated. + /// Manually updating a record will always emit this event + /// even if it provides no new information. RecordUpdated { peer: PeerId }, } @@ -44,10 +47,12 @@ where config, } } + /// List peers that are currently connected to this peer. pub fn list_connected(&self) -> impl Iterator { self.connected_peers.iter() } + /// Try to get all observed address of the given peer. /// Returns `None` when the peer is not in the store. pub fn address_of_peer<'b>( @@ -56,9 +61,10 @@ where ) -> Option + use<'a, 'b, S>> { self.store.addresses_of_peer(peer) } + /// Manually update a record. - /// This will always cause an `Event::RecordUpdated` to be emitted. - pub fn update_record(&mut self, peer: &PeerId, address: &Multiaddr) { + /// This will always emit an `Event::RecordUpdated`. + pub fn update_address(&mut self, peer: &PeerId, address: &Multiaddr) { self.store .update_address(peer, address, AddressSource::Manual, false); self.pending_events diff --git a/misc/peer-store/src/memory_store.rs b/misc/peer-store/src/memory_store.rs index 4d5fc47d5b1..e3a585b28de 100644 --- a/misc/peer-store/src/memory_store.rs +++ b/misc/peer-store/src/memory_store.rs @@ -29,6 +29,7 @@ impl MemoryStore { impl<'a> Store<'a> for MemoryStore { type AddressRecord = AddressRecord<'a>; + fn update_address( &mut self, peer: &PeerId, @@ -37,14 +38,14 @@ impl<'a> Store<'a> for MemoryStore { should_expire: bool, ) -> bool { if let Some(record) = self.address_book.get_mut(peer) { - record.update_address(address, source, should_expire) - } else { - let mut new_record = record::PeerAddressRecord::new(self.config.record_capacity); - new_record.update_address(address, source, should_expire); - self.address_book.insert(*peer, new_record); - true + return record.update_address(address, source, should_expire); } + let mut new_record = record::PeerAddressRecord::new(self.config.record_capacity); + new_record.update_address(address, source, should_expire); + self.address_book.insert(*peer, new_record); + true } + fn remove_address(&mut self, peer: &PeerId, address: &Multiaddr) -> bool { if let Some(record) = self.address_book.get_mut(peer) { return record.remove_address(address); @@ -79,27 +80,33 @@ impl<'a> Store<'a> for MemoryStore { _ => None, } } + fn addresses_of_peer<'b>(&self, peer: &'b PeerId) -> Option> { self.address_book .get(peer) .map(|record| record.records().map(|r| r.address)) } + fn address_record_of_peer( &'a self, peer: &PeerId, ) -> Option>> { self.address_book.get(peer).map(|record| record.records()) } + fn check_ttl(&mut self) { let now = Instant::now(); - for (_, r) in &mut self.address_book { + for r in &mut self.address_book.values_mut() { r.check_ttl(now, self.config.record_ttl); } } } pub struct Config { + /// TTL for a record. record_ttl: Duration, + /// The capacaity of a record store. + /// The least used record will be discarded when the store is full. record_capacity: NonZeroUsize, } @@ -113,17 +120,24 @@ impl Default for Config { } pub struct AddressRecord<'a> { - /// The address of this record. - address: &'a Multiaddr, + /// The last time we saw this address. last_seen: &'a Instant, + /// The address of this record. + pub address: &'a Multiaddr, + /// How we observed the address. pub source: AddressSource, + /// Whether the address expires. pub should_expire: bool, } impl<'a> AddressRecord<'a> { - /// How much time has passed since the address is last reported wrt. current time. - /// This may fail because of system time change. - pub fn last_seen(&self, now: Instant) -> std::time::Duration { - now.duration_since(self.last_seen.clone()) + /// How much time has passed since the address is last reported wrt. the given instant. + pub fn last_seen_since(&self, now: Instant) -> Duration { + now.duration_since(*self.last_seen) + } + /// How much time has passed since the address is last reported wrt. current time. + pub fn last_seen(&self) -> Duration { + let now = Instant::now(); + now.duration_since(*self.last_seen) } } @@ -133,9 +147,16 @@ mod record { use super::*; pub(crate) struct PeerAddressRecord { + /// A LRU(Least Recently Used) cache for addresses. + /// Will delete the least-recently-used record when full. addresses: LruCache, } impl PeerAddressRecord { + pub(crate) fn new(capacity: NonZeroUsize) -> Self { + Self { + addresses: LruCache::new(capacity), + } + } pub(crate) fn records(&self) -> impl Iterator { self.addresses .iter() @@ -146,11 +167,6 @@ mod record { should_expire: record.should_expire, }) } - pub(crate) fn new(capacity: NonZeroUsize) -> Self { - Self { - addresses: LruCache::new(capacity), - } - } pub(crate) fn update_address( &mut self, address: &Multiaddr, @@ -186,7 +202,9 @@ mod record { pub(crate) struct AddressRecord { /// The time when the address is last seen. last_seen: Instant, + /// How the address is discovered. source: AddressSource, + /// Whether the address will expire. should_expire: bool, } impl AddressRecord { diff --git a/misc/peer-store/src/store.rs b/misc/peer-store/src/store.rs index 1bb9ae51040..86721f4a237 100644 --- a/misc/peer-store/src/store.rs +++ b/misc/peer-store/src/store.rs @@ -2,10 +2,10 @@ use libp2p_core::{Multiaddr, PeerId}; use libp2p_swarm::FromSwarm; /// A store that -/// - keep track of currently connected peers; /// - contains all observed addresses of peers; pub trait Store<'a> { type AddressRecord; + /// Update an address record. /// Returns `true` when the address is new. fn update_address( @@ -15,18 +15,23 @@ pub trait Store<'a> { source: AddressSource, should_expire: bool, ) -> bool; + /// Remove an address record. /// Returns `true` when the address is removed. fn remove_address(&mut self, peer: &PeerId, address: &Multiaddr) -> bool; + /// How this store handles events from the swarm. fn on_swarm_event(&mut self, event: &FromSwarm) -> Option; + /// Get all stored addresses of the peer. fn addresses_of_peer<'b>(&self, peer: &'b PeerId) -> Option>; + /// Get all stored address records of the peer. fn address_record_of_peer( &'a self, peer: &PeerId, ) -> Option>; + /// C fn check_ttl(&mut self); } From a9597daee5dd1f39a5c45ab52e9df80063e7dd0e Mon Sep 17 00:00:00 2001 From: drHuangMHT Date: Wed, 18 Dec 2024 10:28:11 +0800 Subject: [PATCH 14/26] clippy lint --- misc/peer-store/src/behaviour.rs | 2 +- misc/peer-store/src/memory_store.rs | 20 ++++++++------------ misc/peer-store/src/store.rs | 2 +- 3 files changed, 10 insertions(+), 14 deletions(-) diff --git a/misc/peer-store/src/behaviour.rs b/misc/peer-store/src/behaviour.rs index ed078f6afa0..8401a4911e8 100644 --- a/misc/peer-store/src/behaviour.rs +++ b/misc/peer-store/src/behaviour.rs @@ -58,7 +58,7 @@ where pub fn address_of_peer<'b>( &'a self, peer: &'b PeerId, - ) -> Option + use<'a, 'b, S>> { + ) -> Option + use<'a, 'b, S>> { self.store.addresses_of_peer(peer) } diff --git a/misc/peer-store/src/memory_store.rs b/misc/peer-store/src/memory_store.rs index e3a585b28de..0b47eedec69 100644 --- a/misc/peer-store/src/memory_store.rs +++ b/misc/peer-store/src/memory_store.rs @@ -81,7 +81,7 @@ impl<'a> Store<'a> for MemoryStore { } } - fn addresses_of_peer<'b>(&self, peer: &'b PeerId) -> Option> { + fn addresses_of_peer(&self, peer: &PeerId) -> Option> { self.address_book .get(peer) .map(|record| record.records().map(|r| r.address)) @@ -129,7 +129,7 @@ pub struct AddressRecord<'a> { /// Whether the address expires. pub should_expire: bool, } -impl<'a> AddressRecord<'a> { +impl AddressRecord<'_> { /// How much time has passed since the address is last reported wrt. the given instant. pub fn last_seen_since(&self, now: Instant) -> Duration { now.duration_since(*self.last_seen) @@ -257,16 +257,14 @@ mod test { ); thread::sleep(Duration::from_millis(2)); store.check_ttl(); - assert!(store + assert!(!store .addresses_of_peer(&fake_peer) .expect("peer to be in the store") - .find(|r| **r == addr_should_expire) - .is_none()); + .any(|r| *r == addr_should_expire)); assert!(store .addresses_of_peer(&fake_peer) .expect("peer to be in the store") - .find(|r| **r == addr_no_expire) - .is_some()); + .any(|r| *r == addr_no_expire)); } #[test] @@ -331,16 +329,14 @@ mod test { ); } let first_record = Multiaddr::from_str("/ip4/127.0.0.1").expect("parsing to succeed"); - assert!(store + assert!(!store .addresses_of_peer(&fake_peer) .expect("peer to be in the store") - .find(|addr| **addr == first_record) - .is_none()); + .any(|addr| *addr == first_record)); let second_record = Multiaddr::from_str("/ip4/127.0.0.2").expect("parsing to succeed"); assert!(store .addresses_of_peer(&fake_peer) .expect("peer to be in the store") - .find(|addr| **addr == second_record) - .is_some()); + .any(|addr| *addr == second_record)); } } diff --git a/misc/peer-store/src/store.rs b/misc/peer-store/src/store.rs index 86721f4a237..5ac5d5082e3 100644 --- a/misc/peer-store/src/store.rs +++ b/misc/peer-store/src/store.rs @@ -24,7 +24,7 @@ pub trait Store<'a> { fn on_swarm_event(&mut self, event: &FromSwarm) -> Option; /// Get all stored addresses of the peer. - fn addresses_of_peer<'b>(&self, peer: &'b PeerId) -> Option>; + fn addresses_of_peer(&self, peer: &PeerId) -> Option>; /// Get all stored address records of the peer. fn address_record_of_peer( From 0e6b2802c2d8c31013814ed790530fd0cf6751f3 Mon Sep 17 00:00:00 2001 From: drHuangMHT Date: Fri, 20 Dec 2024 14:36:30 +0800 Subject: [PATCH 15/26] simplify Store trait --- misc/peer-store/src/behaviour.rs | 11 +++++++++++ misc/peer-store/src/memory_store.rs | 22 ++++++++++++++-------- misc/peer-store/src/store.rs | 8 +------- 3 files changed, 26 insertions(+), 15 deletions(-) diff --git a/misc/peer-store/src/behaviour.rs b/misc/peer-store/src/behaviour.rs index 8401a4911e8..e512a5f47dc 100644 --- a/misc/peer-store/src/behaviour.rs +++ b/misc/peer-store/src/behaviour.rs @@ -70,6 +70,17 @@ where self.pending_events .push_back(Event::RecordUpdated { peer: *peer }); } + + /// Get a immutable reference to the internal store. + pub fn store(&self) -> &S { + &self.store + } + + /// Get a mutable reference to the internal store. + pub fn store_mut(&mut self) -> &mut S { + &mut self.store + } + fn on_peer_connect(&mut self, peer: &PeerId) { self.connected_peers.insert(*peer); } diff --git a/misc/peer-store/src/memory_store.rs b/misc/peer-store/src/memory_store.rs index 0b47eedec69..8e4fd0792f0 100644 --- a/misc/peer-store/src/memory_store.rs +++ b/misc/peer-store/src/memory_store.rs @@ -8,7 +8,7 @@ use libp2p_core::{Multiaddr, PeerId}; use libp2p_swarm::FromSwarm; use super::{store::Event, Store}; -use crate::store::AddressSource; +use crate::{store::AddressSource, Behaviour}; /// A in-memory store. #[derive(Default)] @@ -87,13 +87,6 @@ impl<'a> Store<'a> for MemoryStore { .map(|record| record.records().map(|r| r.address)) } - fn address_record_of_peer( - &'a self, - peer: &PeerId, - ) -> Option>> { - self.address_book.get(peer).map(|record| record.records()) - } - fn check_ttl(&mut self) { let now = Instant::now(); for r in &mut self.address_book.values_mut() { @@ -102,6 +95,19 @@ impl<'a> Store<'a> for MemoryStore { } } +impl Behaviour { + /// Get all stored address records of the peer. + pub fn address_record_of_peer( + &self, + peer: &PeerId, + ) -> Option> { + self.store() + .address_book + .get(peer) + .map(|record| record.records()) + } +} + pub struct Config { /// TTL for a record. record_ttl: Duration, diff --git a/misc/peer-store/src/store.rs b/misc/peer-store/src/store.rs index 5ac5d5082e3..86d8c14832a 100644 --- a/misc/peer-store/src/store.rs +++ b/misc/peer-store/src/store.rs @@ -26,13 +26,7 @@ pub trait Store<'a> { /// Get all stored addresses of the peer. fn addresses_of_peer(&self, peer: &PeerId) -> Option>; - /// Get all stored address records of the peer. - fn address_record_of_peer( - &'a self, - peer: &PeerId, - ) -> Option>; - - /// C + /// Trigger grabage collection for records. fn check_ttl(&mut self); } From 92ac2fb6b749546d7fc65f34d63f59355b21f25b Mon Sep 17 00:00:00 2001 From: drHuangMHT Date: Fri, 20 Dec 2024 14:46:29 +0800 Subject: [PATCH 16/26] manifest and changelog --- misc/peer-store/CHANGELOG.md | 4 ++++ misc/peer-store/Cargo.toml | 5 ++++- 2 files changed, 8 insertions(+), 1 deletion(-) create mode 100644 misc/peer-store/CHANGELOG.md diff --git a/misc/peer-store/CHANGELOG.md b/misc/peer-store/CHANGELOG.md new file mode 100644 index 00000000000..8353d77b5cf --- /dev/null +++ b/misc/peer-store/CHANGELOG.md @@ -0,0 +1,4 @@ +## 0.1.0 + +- Introduce `libp2p-peer-store`. + See [PR 5724](https://github.com/libp2p/rust-libp2p/pull/5724). diff --git a/misc/peer-store/Cargo.toml b/misc/peer-store/Cargo.toml index 072ad6979d3..94b4ce4ffd5 100644 --- a/misc/peer-store/Cargo.toml +++ b/misc/peer-store/Cargo.toml @@ -1,7 +1,10 @@ [package] name = "libp2p-peer-store" -version = "0.1.0" edition = "2021" +version = "0.1.0" +authors = ["drHuangMHT "] +license = "MIT" +repository = "https://github.com/libp2p/rust-libp2p" rust-version.workspace = true [dependencies] From b8a7114e2bf758b9b13a62a9bc834b2f09a423df Mon Sep 17 00:00:00 2001 From: drHuangMHT Date: Fri, 20 Dec 2024 15:08:52 +0800 Subject: [PATCH 17/26] export at libp2p crate root --- Cargo.lock | 1 + Cargo.toml | 2 ++ libp2p/Cargo.toml | 2 ++ libp2p/src/lib.rs | 2 ++ misc/peer-store/src/lib.rs | 5 ++--- misc/peer-store/src/memory_store.rs | 2 +- 6 files changed, 10 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7c08ce48a72..989259ddb7c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2680,6 +2680,7 @@ dependencies = [ "libp2p-metrics", "libp2p-mplex", "libp2p-noise", + "libp2p-peer-store", "libp2p-ping", "libp2p-plaintext", "libp2p-pnet", diff --git a/Cargo.toml b/Cargo.toml index a72c4c3403e..f2328d7e388 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,6 +26,7 @@ members = [ "misc/memory-connection-limits", "misc/metrics", "misc/multistream-select", + "misc/peer-store", "misc/quick-protobuf-codec", "misc/quickcheck-ext", "misc/rw-stream-sink", @@ -89,6 +90,7 @@ libp2p-memory-connection-limits = { version = "0.3.1", path = "misc/memory-conne libp2p-metrics = { version = "0.15.0", path = "misc/metrics" } libp2p-mplex = { version = "0.42.0", path = "muxers/mplex" } libp2p-noise = { version = "0.45.1", path = "transports/noise" } +libp2p-peer-store = { version = "0.1.0", path = "misc/peer-store" } libp2p-perf = { version = "0.4.0", path = "protocols/perf" } libp2p-ping = { version = "0.45.1", path = "protocols/ping" } libp2p-plaintext = { version = "0.42.0", path = "transports/plaintext" } diff --git a/libp2p/Cargo.toml b/libp2p/Cargo.toml index 3d44e0bc43c..ac05152f6e4 100644 --- a/libp2p/Cargo.toml +++ b/libp2p/Cargo.toml @@ -68,6 +68,7 @@ mdns = ["dep:libp2p-mdns"] memory-connection-limits = ["dep:libp2p-memory-connection-limits"] metrics = ["dep:libp2p-metrics"] noise = ["dep:libp2p-noise"] +# peer-store = ["dep:libp2p-peer-store"] ping = ["dep:libp2p-ping", "libp2p-metrics?/ping"] plaintext = ["dep:libp2p-plaintext"] pnet = ["dep:libp2p-pnet"] @@ -110,6 +111,7 @@ libp2p-identity = { workspace = true, features = ["rand"] } libp2p-kad = { workspace = true, optional = true } libp2p-metrics = { workspace = true, optional = true } libp2p-noise = { workspace = true, optional = true } +libp2p-peer-store = { workspace = true } libp2p-ping = { workspace = true, optional = true } libp2p-plaintext = { workspace = true, optional = true } libp2p-pnet = { workspace = true, optional = true } diff --git a/libp2p/src/lib.rs b/libp2p/src/lib.rs index 47e1142d0e9..13193b48eb6 100644 --- a/libp2p/src/lib.rs +++ b/libp2p/src/lib.rs @@ -81,6 +81,8 @@ pub use libp2p_metrics as metrics; #[cfg(feature = "noise")] #[doc(inline)] pub use libp2p_noise as noise; +#[doc(inline)] +pub use libp2p_peer_store as peer_store; #[cfg(feature = "ping")] #[doc(inline)] pub use libp2p_ping as ping; diff --git a/misc/peer-store/src/lib.rs b/misc/peer-store/src/lib.rs index afe7e457413..db771ed09f0 100644 --- a/misc/peer-store/src/lib.rs +++ b/misc/peer-store/src/lib.rs @@ -1,7 +1,6 @@ mod behaviour; -mod memory_store; +pub mod memory_store; mod store; -pub use behaviour::{Behaviour, Event}; -pub use memory_store::{AddressRecord, Config, MemoryStore}; +pub use behaviour::{Behaviour, Config, Event}; pub use store::Store; diff --git a/misc/peer-store/src/memory_store.rs b/misc/peer-store/src/memory_store.rs index 8e4fd0792f0..2567c4e0b16 100644 --- a/misc/peer-store/src/memory_store.rs +++ b/misc/peer-store/src/memory_store.rs @@ -100,7 +100,7 @@ impl Behaviour { pub fn address_record_of_peer( &self, peer: &PeerId, - ) -> Option> { + ) -> Option> { self.store() .address_book .get(peer) From 5f628fb485f810ed27677cd00d9fee1925b8d91c Mon Sep 17 00:00:00 2001 From: drHuangMHT Date: Fri, 20 Dec 2024 15:15:48 +0800 Subject: [PATCH 18/26] changelog for libp2p --- libp2p/CHANGELOG.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/libp2p/CHANGELOG.md b/libp2p/CHANGELOG.md index 8b7bf0ff55f..5579af66b5a 100644 --- a/libp2p/CHANGELOG.md +++ b/libp2p/CHANGELOG.md @@ -1,3 +1,8 @@ +## 0.55.0(unreleased) + +- Introduce `libp2p-peer-store`. + See [PR 5724](https://github.com/libp2p/rust-libp2p/pull/5724). + ## 0.54.2 - Add `with_connection_timeout` on `SwarmBuilder` to allow configuration of the connection_timeout parameter. From 121e91b4e2cb21b744c4ac05605666a5b5527696 Mon Sep 17 00:00:00 2001 From: drHuangMHT Date: Tue, 31 Dec 2024 11:43:22 +0800 Subject: [PATCH 19/26] introduce PeerRecord --- misc/peer-store/src/behaviour.rs | 29 ++++++--- misc/peer-store/src/memory_store.rs | 93 +++++++++++++++++++++++++---- misc/peer-store/src/store.rs | 13 ++++ 3 files changed, 118 insertions(+), 17 deletions(-) diff --git a/misc/peer-store/src/behaviour.rs b/misc/peer-store/src/behaviour.rs index e512a5f47dc..fe733997a5b 100644 --- a/misc/peer-store/src/behaviour.rs +++ b/misc/peer-store/src/behaviour.rs @@ -71,6 +71,20 @@ where .push_back(Event::RecordUpdated { peer: *peer }); } + /// Should be called when other protocol emits a [`PeerRecord`](libp2p_core::PeerRecord). + /// This will always emit an `Event::RecordUpdated`. + pub fn on_signed_peer_record( + &mut self, + peer: &PeerId, + record: libp2p_core::PeerRecord, + source: AddressSource, + ) { + self.store + .update_certified_address(peer, record, source, false); + self.pending_events + .push_back(Event::RecordUpdated { peer: *peer }); + } + /// Get a immutable reference to the internal store. pub fn store(&self) -> &S { &self.store @@ -137,15 +151,16 @@ where _effective_role: libp2p_core::Endpoint, ) -> Result, libp2p_swarm::ConnectionDenied> { if maybe_peer.is_none() { - return Ok(Vec::with_capacity(0)); + return Ok(Vec::new()); } - if let Some(i) = self - .store - .addresses_of_peer(&maybe_peer.expect("already handled")) - { - return Ok(i.cloned().collect()); + let peer = maybe_peer.expect("already handled"); + if let Some(unsigned) = self.store.addresses_of_peer(&peer) { + if let Some(signed) = self.store.certified_addresses_of_peer(&peer) { + return Ok(signed.chain(unsigned).cloned().collect()); + } + return Ok(unsigned.cloned().collect()); } - Ok(Vec::with_capacity(0)) + Ok(Vec::new()) } fn handle_established_outbound_connection( diff --git a/misc/peer-store/src/memory_store.rs b/misc/peer-store/src/memory_store.rs index 2567c4e0b16..dbd58479d7a 100644 --- a/misc/peer-store/src/memory_store.rs +++ b/misc/peer-store/src/memory_store.rs @@ -46,6 +46,22 @@ impl<'a> Store<'a> for MemoryStore { true } + fn update_certified_address( + &mut self, + peer: &PeerId, + signed_address: libp2p_core::PeerRecord, + source: AddressSource, + should_expire: bool, + ) -> bool { + if let Some(record) = self.address_book.get_mut(peer) { + return record.update_certified_address(signed_address, source, should_expire); + } + let mut new_record = record::PeerAddressRecord::new(self.config.record_capacity); + new_record.update_certified_address(signed_address, source, should_expire); + self.address_book.insert(*peer, new_record); + true + } + fn remove_address(&mut self, peer: &PeerId, address: &Multiaddr) -> bool { if let Some(record) = self.address_book.get_mut(peer) { return record.remove_address(address); @@ -87,6 +103,20 @@ impl<'a> Store<'a> for MemoryStore { .map(|record| record.records().map(|r| r.address)) } + fn certified_addresses_of_peer( + &self, + peer: &PeerId, + ) -> Option> { + self.address_book.get(peer).and_then(|record| { + Some( + record + .records() + .filter(|r| r.signed.is_some()) + .map(|r| r.address), + ) + }) + } + fn check_ttl(&mut self) { let now = Instant::now(); for r in &mut self.address_book.values_mut() { @@ -104,7 +134,7 @@ impl Behaviour { self.store() .address_book .get(peer) - .map(|record| record.records()) + .and_then(|r| Some(r.records())) } } @@ -134,6 +164,8 @@ pub struct AddressRecord<'a> { pub source: AddressSource, /// Whether the address expires. pub should_expire: bool, + /// Whether the address is signed. + pub signed: Option<&'a libp2p_core::PeerRecord>, } impl AddressRecord<'_> { /// How much time has passed since the address is last reported wrt. the given instant. @@ -148,6 +180,9 @@ impl AddressRecord<'_> { } mod record { + use std::rc::Rc; + + use libp2p_core::PeerRecord; use lru::LruCache; use super::*; @@ -164,14 +199,21 @@ mod record { } } pub(crate) fn records(&self) -> impl Iterator { - self.addresses - .iter() - .map(|(address, record)| super::AddressRecord { + self.addresses.iter().map(|(address, record)| { + let AddressRecord { + last_seen, + source, + should_expire, + signature: signed, + } = record; + super::AddressRecord { + last_seen, address, - last_seen: &record.last_seen, - source: record.source, - should_expire: record.should_expire, - }) + should_expire: *should_expire, + source: *source, + signed: signed.as_ref().map(|i| i.as_ref()), + } + }) } pub(crate) fn update_address( &mut self, @@ -185,10 +227,32 @@ mod record { } // new record won't call `Instant::now()` twice self.addresses.get_or_insert(address.clone(), || { - AddressRecord::new(source, should_expire) + AddressRecord::new(source, should_expire, None) }); true } + pub(crate) fn update_certified_address( + &mut self, + signed_record: PeerRecord, + source: AddressSource, + should_expire: bool, + ) -> bool { + let mut is_updated = false; + let signed_record = Rc::new(signed_record); + for address in signed_record.addresses() { + // promote the address or update with the latest signature. + if let Some(r) = self.addresses.get_mut(address) { + r.signature = Some(signed_record.clone()); + continue; + } + // the address is not present. this defers cloning. + self.addresses.get_or_insert(address.clone(), || { + AddressRecord::new(source, should_expire, Some(signed_record.clone())) + }); + is_updated = true; + } + is_updated + } pub(crate) fn remove_address(&mut self, address: &Multiaddr) -> bool { self.addresses.pop(address).is_some() } @@ -212,13 +276,22 @@ mod record { source: AddressSource, /// Whether the address will expire. should_expire: bool, + /// Reference to the `PeerRecord` that contains this address. + /// The inner `PeerRecord` will be dropped automatically + /// when there is no living reference to it. + signature: Option>, } impl AddressRecord { - pub(crate) fn new(source: AddressSource, should_expire: bool) -> Self { + pub(crate) fn new( + source: AddressSource, + should_expire: bool, + signed: Option>, + ) -> Self { Self { last_seen: Instant::now(), source, should_expire, + signature: signed, } } pub(crate) fn update_last_seen(&mut self) { diff --git a/misc/peer-store/src/store.rs b/misc/peer-store/src/store.rs index 86d8c14832a..1bc7bedc353 100644 --- a/misc/peer-store/src/store.rs +++ b/misc/peer-store/src/store.rs @@ -16,6 +16,16 @@ pub trait Store<'a> { should_expire: bool, ) -> bool; + /// Update an address record. + /// Returns `true` when the address is new. + fn update_certified_address( + &mut self, + peer: &PeerId, + record: libp2p_core::PeerRecord, + source: AddressSource, + should_expire: bool, + ) -> bool; + /// Remove an address record. /// Returns `true` when the address is removed. fn remove_address(&mut self, peer: &PeerId, address: &Multiaddr) -> bool; @@ -26,6 +36,9 @@ pub trait Store<'a> { /// Get all stored addresses of the peer. fn addresses_of_peer(&self, peer: &PeerId) -> Option>; + /// Get addresses of the peer that have been signed. + fn certified_addresses_of_peer(&self, peer: &PeerId) -> Option>; + /// Trigger grabage collection for records. fn check_ttl(&mut self); } From 47048ec41413886549e3027d9a3e9cae7f8538a7 Mon Sep 17 00:00:00 2001 From: drHuangMHT Date: Tue, 31 Dec 2024 19:47:03 +0800 Subject: [PATCH 20/26] remove borrowed type and unused associated type --- Cargo.toml | 4 +- misc/peer-store/src/behaviour.rs | 24 ++++---- misc/peer-store/src/memory_store.rs | 90 ++++++----------------------- misc/peer-store/src/store.rs | 9 +-- 4 files changed, 34 insertions(+), 93 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index f2328d7e388..11a209ab4c5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -65,7 +65,7 @@ members = [ "transports/websocket-websys", "transports/websocket", "transports/webtransport-websys", - "wasm-tests/webtransport-tests", "misc/peer-store", + "wasm-tests/webtransport-tests", ] resolver = "2" @@ -102,7 +102,7 @@ libp2p-request-response = { version = "0.28.0", path = "protocols/request-respon libp2p-server = { version = "0.12.8", path = "misc/server" } libp2p-stream = { version = "0.2.0-alpha.1", path = "protocols/stream" } libp2p-swarm = { version = "0.45.2", path = "swarm" } -libp2p-swarm-derive = { version = "=0.35.0", path = "swarm-derive" } # `libp2p-swarm-derive` may not be compatible with different `libp2p-swarm` non-breaking releases. E.g. `libp2p-swarm` might introduce a new enum variant `FromSwarm` (which is `#[non-exhaustive]`) in a non-breaking release. Older versions of `libp2p-swarm-derive` would not forward this enum variant within the `NetworkBehaviour` hierarchy. Thus the version pinning is required. +libp2p-swarm-derive = { version = "=0.35.0", path = "swarm-derive" } # `libp2p-swarm-derive` may not be compatible with different `libp2p-swarm` non-breaking releases. E.g. `libp2p-swarm` might introduce a new enum variant `FromSwarm` (which is `#[non-exhaustive]`) in a non-breaking release. Older versions of `libp2p-swarm-derive` would not forward this enum variant within the `NetworkBehaviour` hierarchy. Thus the version pinning is required. libp2p-swarm-test = { version = "0.5.0", path = "swarm-test" } libp2p-tcp = { version = "0.42.0", path = "transports/tcp" } libp2p-tls = { version = "0.5.0", path = "transports/tls" } diff --git a/misc/peer-store/src/behaviour.rs b/misc/peer-store/src/behaviour.rs index fe733997a5b..596ba6b1a02 100644 --- a/misc/peer-store/src/behaviour.rs +++ b/misc/peer-store/src/behaviour.rs @@ -36,7 +36,7 @@ pub struct Behaviour { impl<'a, S> Behaviour where - S: Store<'a> + 'static, + S: Store + 'static, { pub fn new(store: S, config: Config) -> Self { Self { @@ -124,9 +124,9 @@ where } } -impl<'a, S> NetworkBehaviour for Behaviour +impl NetworkBehaviour for Behaviour where - S: Store<'a> + 'static, + S: Store + 'static, { type ConnectionHandler = dummy::ConnectionHandler; @@ -154,13 +154,11 @@ where return Ok(Vec::new()); } let peer = maybe_peer.expect("already handled"); - if let Some(unsigned) = self.store.addresses_of_peer(&peer) { - if let Some(signed) = self.store.certified_addresses_of_peer(&peer) { - return Ok(signed.chain(unsigned).cloned().collect()); - } - return Ok(unsigned.cloned().collect()); - } - Ok(Vec::new()) + Ok(self + .store + .addresses_of_peer(&peer) + .map(|i| i.cloned().collect()) + .unwrap_or_default()) } fn handle_established_outbound_connection( @@ -216,15 +214,15 @@ where } } -impl<'a, S> Behaviour +impl Behaviour where - S: Store<'a>, + S: Store, { /// Garbage collect records. fn poll_record_ttl(&mut self, cx: &mut std::task::Context<'_>) { if let Some(mut timer) = self.record_ttl_timer.take() { if let Poll::Ready(()) = timer.poll_unpin(cx) { - self.store.check_ttl(); + self.store.poll(); self.record_ttl_timer = Some(Delay::new(self.config.check_record_ttl_interval)); } self.record_ttl_timer = Some(timer) diff --git a/misc/peer-store/src/memory_store.rs b/misc/peer-store/src/memory_store.rs index dbd58479d7a..7338284bb56 100644 --- a/misc/peer-store/src/memory_store.rs +++ b/misc/peer-store/src/memory_store.rs @@ -27,8 +27,8 @@ impl MemoryStore { } } -impl<'a> Store<'a> for MemoryStore { - type AddressRecord = AddressRecord<'a>; +impl Store for MemoryStore { + type Event = (); fn update_address( &mut self, @@ -100,28 +100,15 @@ impl<'a> Store<'a> for MemoryStore { fn addresses_of_peer(&self, peer: &PeerId) -> Option> { self.address_book .get(peer) - .map(|record| record.records().map(|r| r.address)) + .map(|record| record.records().map(|(addr, _)| addr)) } - fn certified_addresses_of_peer( - &self, - peer: &PeerId, - ) -> Option> { - self.address_book.get(peer).and_then(|record| { - Some( - record - .records() - .filter(|r| r.signed.is_some()) - .map(|r| r.address), - ) - }) - } - - fn check_ttl(&mut self) { + fn poll(&mut self) -> Option<()> { let now = Instant::now(); for r in &mut self.address_book.values_mut() { r.check_ttl(now, self.config.record_ttl); } + None } } @@ -130,11 +117,8 @@ impl Behaviour { pub fn address_record_of_peer( &self, peer: &PeerId, - ) -> Option> { - self.store() - .address_book - .get(peer) - .and_then(|r| Some(r.records())) + ) -> Option> { + self.store().address_book.get(peer).map(|r| r.records()) } } @@ -155,30 +139,6 @@ impl Default for Config { } } -pub struct AddressRecord<'a> { - /// The last time we saw this address. - last_seen: &'a Instant, - /// The address of this record. - pub address: &'a Multiaddr, - /// How we observed the address. - pub source: AddressSource, - /// Whether the address expires. - pub should_expire: bool, - /// Whether the address is signed. - pub signed: Option<&'a libp2p_core::PeerRecord>, -} -impl AddressRecord<'_> { - /// How much time has passed since the address is last reported wrt. the given instant. - pub fn last_seen_since(&self, now: Instant) -> Duration { - now.duration_since(*self.last_seen) - } - /// How much time has passed since the address is last reported wrt. current time. - pub fn last_seen(&self) -> Duration { - let now = Instant::now(); - now.duration_since(*self.last_seen) - } -} - mod record { use std::rc::Rc; @@ -198,22 +158,8 @@ mod record { addresses: LruCache::new(capacity), } } - pub(crate) fn records(&self) -> impl Iterator { - self.addresses.iter().map(|(address, record)| { - let AddressRecord { - last_seen, - source, - should_expire, - signature: signed, - } = record; - super::AddressRecord { - last_seen, - address, - should_expire: *should_expire, - source: *source, - signed: signed.as_ref().map(|i| i.as_ref()), - } - }) + pub(crate) fn records(&self) -> impl Iterator { + self.addresses.iter() } pub(crate) fn update_address( &mut self, @@ -269,17 +215,17 @@ mod record { } } - pub(crate) struct AddressRecord { + pub struct AddressRecord { /// The time when the address is last seen. - last_seen: Instant, + pub last_seen: Instant, /// How the address is discovered. - source: AddressSource, + pub source: AddressSource, /// Whether the address will expire. - should_expire: bool, + pub should_expire: bool, /// Reference to the `PeerRecord` that contains this address. - /// The inner `PeerRecord` will be dropped automatically + /// The inner `PeerRecord` will be dropped automatically /// when there is no living reference to it. - signature: Option>, + pub signature: Option>, } impl AddressRecord { pub(crate) fn new( @@ -335,7 +281,7 @@ mod test { true, ); thread::sleep(Duration::from_millis(2)); - store.check_ttl(); + store.poll(); assert!(!store .addresses_of_peer(&fake_peer) .expect("peer to be in the store") @@ -372,7 +318,7 @@ mod test { .records() .last() .expect("addr in the record") - .address + .0 == addr1 ); store.update_address( @@ -389,7 +335,7 @@ mod test { .records() .last() .expect("addr in the record") - .address + .0 == addr2 ); } diff --git a/misc/peer-store/src/store.rs b/misc/peer-store/src/store.rs index 1bc7bedc353..eb8bdd00562 100644 --- a/misc/peer-store/src/store.rs +++ b/misc/peer-store/src/store.rs @@ -3,8 +3,8 @@ use libp2p_swarm::FromSwarm; /// A store that /// - contains all observed addresses of peers; -pub trait Store<'a> { - type AddressRecord; +pub trait Store { + type Event; /// Update an address record. /// Returns `true` when the address is new. @@ -36,11 +36,8 @@ pub trait Store<'a> { /// Get all stored addresses of the peer. fn addresses_of_peer(&self, peer: &PeerId) -> Option>; - /// Get addresses of the peer that have been signed. - fn certified_addresses_of_peer(&self, peer: &PeerId) -> Option>; - /// Trigger grabage collection for records. - fn check_ttl(&mut self); + fn poll(&mut self) -> Option; } pub enum Event { From 9da96ff264db2862721c03dd77a3c1ca8435f683 Mon Sep 17 00:00:00 2001 From: drHuangMHT Date: Thu, 2 Jan 2025 15:56:14 +0800 Subject: [PATCH 21/26] move garbage collection to Store level --- misc/peer-store/src/behaviour.rs | 32 ++--------------------------- misc/peer-store/src/lib.rs | 2 +- misc/peer-store/src/memory_store.rs | 29 +++++++++++++++++++++----- misc/peer-store/src/store.rs | 4 +++- 4 files changed, 30 insertions(+), 37 deletions(-) diff --git a/misc/peer-store/src/behaviour.rs b/misc/peer-store/src/behaviour.rs index 596ba6b1a02..6d119dd46b6 100644 --- a/misc/peer-store/src/behaviour.rs +++ b/misc/peer-store/src/behaviour.rs @@ -1,11 +1,8 @@ use std::{ collections::{HashSet, VecDeque}, task::Poll, - time::Duration, }; -use futures_timer::Delay; -use futures_util::FutureExt; use libp2p_core::{Multiaddr, PeerId}; use libp2p_swarm::{dummy, FromSwarm, NetworkBehaviour}; @@ -19,32 +16,23 @@ pub enum Event { RecordUpdated { peer: PeerId }, } -pub struct Config { - /// The interval for garbage collecting records. - check_record_ttl_interval: Duration, -} - pub struct Behaviour { store: S, /// Peers that are currently connected. connected_peers: HashSet, /// Pending Events to be emitted back to the [`libp2p_swarm::Swarm`]. pending_events: VecDeque, - record_ttl_timer: Option, - config: Config, } impl<'a, S> Behaviour where S: Store + 'static, { - pub fn new(store: S, config: Config) -> Self { + pub fn new(store: S) -> Self { Self { store, connected_peers: HashSet::new(), pending_events: VecDeque::new(), - record_ttl_timer: Some(Delay::new(config.check_record_ttl_interval)), - config, } } @@ -209,23 +197,7 @@ where if let Some(ev) = self.pending_events.pop_front() { return Poll::Ready(libp2p_swarm::ToSwarm::GenerateEvent(ev)); } - self.poll_record_ttl(cx); + self.store.poll(cx); Poll::Pending } } - -impl Behaviour -where - S: Store, -{ - /// Garbage collect records. - fn poll_record_ttl(&mut self, cx: &mut std::task::Context<'_>) { - if let Some(mut timer) = self.record_ttl_timer.take() { - if let Poll::Ready(()) = timer.poll_unpin(cx) { - self.store.poll(); - self.record_ttl_timer = Some(Delay::new(self.config.check_record_ttl_interval)); - } - self.record_ttl_timer = Some(timer) - } - } -} diff --git a/misc/peer-store/src/lib.rs b/misc/peer-store/src/lib.rs index db771ed09f0..819fe41cd2f 100644 --- a/misc/peer-store/src/lib.rs +++ b/misc/peer-store/src/lib.rs @@ -2,5 +2,5 @@ mod behaviour; pub mod memory_store; mod store; -pub use behaviour::{Behaviour, Config, Event}; +pub use behaviour::{Behaviour, Event}; pub use store::Store; diff --git a/misc/peer-store/src/memory_store.rs b/misc/peer-store/src/memory_store.rs index 7338284bb56..4ff99f794ae 100644 --- a/misc/peer-store/src/memory_store.rs +++ b/misc/peer-store/src/memory_store.rs @@ -1,9 +1,12 @@ use std::{ collections::HashMap, num::NonZeroUsize, + task::{Context, Poll}, time::{Duration, Instant}, }; +use futures_timer::Delay; +use futures_util::FutureExt; use libp2p_core::{Multiaddr, PeerId}; use libp2p_swarm::FromSwarm; @@ -15,6 +18,7 @@ use crate::{store::AddressSource, Behaviour}; pub struct MemoryStore { /// An address book of peers regardless of their status(connected or not). address_book: HashMap, + record_ttl_timer: Option, config: Config, } @@ -25,6 +29,13 @@ impl MemoryStore { ..Default::default() } } + + fn check_record_ttl(&mut self) { + let now = Instant::now(); + for r in &mut self.address_book.values_mut() { + r.check_ttl(now, self.config.record_ttl); + } + } } impl Store for MemoryStore { @@ -103,10 +114,13 @@ impl Store for MemoryStore { .map(|record| record.records().map(|(addr, _)| addr)) } - fn poll(&mut self) -> Option<()> { - let now = Instant::now(); - for r in &mut self.address_book.values_mut() { - r.check_ttl(now, self.config.record_ttl); + fn poll(&mut self, cx: &mut Context<'_>) -> Option<()> { + if let Some(mut timer) = self.record_ttl_timer.take() { + if let Poll::Ready(()) = timer.poll_unpin(cx) { + self.check_record_ttl(); + self.record_ttl_timer = Some(Delay::new(self.config.check_record_ttl_interval)); + } + self.record_ttl_timer = Some(timer) } None } @@ -125,9 +139,12 @@ impl Behaviour { pub struct Config { /// TTL for a record. record_ttl: Duration, + /// The capacaity of a record store. /// The least used record will be discarded when the store is full. record_capacity: NonZeroUsize, + /// The interval for garbage collecting records. + check_record_ttl_interval: Duration, } impl Default for Config { @@ -135,6 +152,7 @@ impl Default for Config { Self { record_ttl: Duration::from_secs(600), record_capacity: NonZeroUsize::try_from(8).expect("8 > 0"), + check_record_ttl_interval: Duration::from_secs(5), } } } @@ -263,6 +281,7 @@ mod test { let config = Config { record_capacity: NonZeroUsize::try_from(4).expect("4 > 0"), record_ttl: Duration::from_millis(1), + ..Default::default() }; let mut store = MemoryStore::new(config); let fake_peer = PeerId::random(); @@ -281,7 +300,7 @@ mod test { true, ); thread::sleep(Duration::from_millis(2)); - store.poll(); + store.check_record_ttl(); assert!(!store .addresses_of_peer(&fake_peer) .expect("peer to be in the store") diff --git a/misc/peer-store/src/store.rs b/misc/peer-store/src/store.rs index eb8bdd00562..61301e3b994 100644 --- a/misc/peer-store/src/store.rs +++ b/misc/peer-store/src/store.rs @@ -1,3 +1,5 @@ +use std::task::Context; + use libp2p_core::{Multiaddr, PeerId}; use libp2p_swarm::FromSwarm; @@ -37,7 +39,7 @@ pub trait Store { fn addresses_of_peer(&self, peer: &PeerId) -> Option>; /// Trigger grabage collection for records. - fn poll(&mut self) -> Option; + fn poll(&mut self, cx: &mut Context<'_>) -> Option; } pub enum Event { From 9945fd04aa56e1e93870719fabf3c38a258c016c Mon Sep 17 00:00:00 2001 From: drHuangMHT Date: Thu, 2 Jan 2025 16:02:42 +0800 Subject: [PATCH 22/26] delay cloning of PeerRecord --- misc/peer-store/src/behaviour.rs | 406 ++++++++++++++-------------- misc/peer-store/src/memory_store.rs | 16 +- misc/peer-store/src/store.rs | 3 +- 3 files changed, 212 insertions(+), 213 deletions(-) diff --git a/misc/peer-store/src/behaviour.rs b/misc/peer-store/src/behaviour.rs index 6d119dd46b6..b381a6b9101 100644 --- a/misc/peer-store/src/behaviour.rs +++ b/misc/peer-store/src/behaviour.rs @@ -1,203 +1,203 @@ -use std::{ - collections::{HashSet, VecDeque}, - task::Poll, -}; - -use libp2p_core::{Multiaddr, PeerId}; -use libp2p_swarm::{dummy, FromSwarm, NetworkBehaviour}; - -use crate::store::{AddressSource, Store}; - -/// Events generated by [`Behaviour`] and emitted back to the [`libp2p_swarm::Swarm`]. -pub enum Event { - /// The peer's record has been updated. - /// Manually updating a record will always emit this event - /// even if it provides no new information. - RecordUpdated { peer: PeerId }, -} - -pub struct Behaviour { - store: S, - /// Peers that are currently connected. - connected_peers: HashSet, - /// Pending Events to be emitted back to the [`libp2p_swarm::Swarm`]. - pending_events: VecDeque, -} - -impl<'a, S> Behaviour -where - S: Store + 'static, -{ - pub fn new(store: S) -> Self { - Self { - store, - connected_peers: HashSet::new(), - pending_events: VecDeque::new(), - } - } - - /// List peers that are currently connected to this peer. - pub fn list_connected(&self) -> impl Iterator { - self.connected_peers.iter() - } - - /// Try to get all observed address of the given peer. - /// Returns `None` when the peer is not in the store. - pub fn address_of_peer<'b>( - &'a self, - peer: &'b PeerId, - ) -> Option + use<'a, 'b, S>> { - self.store.addresses_of_peer(peer) - } - - /// Manually update a record. - /// This will always emit an `Event::RecordUpdated`. - pub fn update_address(&mut self, peer: &PeerId, address: &Multiaddr) { - self.store - .update_address(peer, address, AddressSource::Manual, false); - self.pending_events - .push_back(Event::RecordUpdated { peer: *peer }); - } - - /// Should be called when other protocol emits a [`PeerRecord`](libp2p_core::PeerRecord). - /// This will always emit an `Event::RecordUpdated`. - pub fn on_signed_peer_record( - &mut self, - peer: &PeerId, - record: libp2p_core::PeerRecord, - source: AddressSource, - ) { - self.store - .update_certified_address(peer, record, source, false); - self.pending_events - .push_back(Event::RecordUpdated { peer: *peer }); - } - - /// Get a immutable reference to the internal store. - pub fn store(&self) -> &S { - &self.store - } - - /// Get a mutable reference to the internal store. - pub fn store_mut(&mut self) -> &mut S { - &mut self.store - } - - fn on_peer_connect(&mut self, peer: &PeerId) { - self.connected_peers.insert(*peer); - } - fn on_peer_disconnect(&mut self, peer: &PeerId) { - self.connected_peers.remove(peer); - } - fn on_address_update( - &mut self, - peer: &PeerId, - address: &Multiaddr, - source: AddressSource, - should_expire: bool, - ) { - if self - .store - .update_address(peer, address, source, should_expire) - { - self.pending_events - .push_back(Event::RecordUpdated { peer: *peer }); - } - } - fn handle_store_event(&mut self, event: super::store::Event) { - use super::store::Event::*; - match event { - RecordUpdated(peer) => self.pending_events.push_back(Event::RecordUpdated { peer }), - } - } -} - -impl NetworkBehaviour for Behaviour -where - S: Store + 'static, -{ - type ConnectionHandler = dummy::ConnectionHandler; - - type ToSwarm = Event; - - fn handle_established_inbound_connection( - &mut self, - _connection_id: libp2p_swarm::ConnectionId, - peer: libp2p_core::PeerId, - _local_addr: &libp2p_core::Multiaddr, - remote_addr: &libp2p_core::Multiaddr, - ) -> Result, libp2p_swarm::ConnectionDenied> { - self.on_address_update(&peer, remote_addr, AddressSource::DirectConnection, false); - Ok(dummy::ConnectionHandler) - } - - fn handle_pending_outbound_connection( - &mut self, - _connection_id: libp2p_swarm::ConnectionId, - maybe_peer: Option, - _addresses: &[Multiaddr], - _effective_role: libp2p_core::Endpoint, - ) -> Result, libp2p_swarm::ConnectionDenied> { - if maybe_peer.is_none() { - return Ok(Vec::new()); - } - let peer = maybe_peer.expect("already handled"); - Ok(self - .store - .addresses_of_peer(&peer) - .map(|i| i.cloned().collect()) - .unwrap_or_default()) - } - - fn handle_established_outbound_connection( - &mut self, - _connection_id: libp2p_swarm::ConnectionId, - peer: libp2p_core::PeerId, - addr: &libp2p_core::Multiaddr, - _role_override: libp2p_core::Endpoint, - _port_use: libp2p_core::transport::PortUse, - ) -> Result, libp2p_swarm::ConnectionDenied> { - self.on_address_update(&peer, addr, AddressSource::DirectConnection, false); - Ok(dummy::ConnectionHandler) - } - - fn on_swarm_event(&mut self, event: libp2p_swarm::FromSwarm) { - if let Some(ev) = self.store.on_swarm_event(&event) { - self.handle_store_event(ev); - }; - match event { - FromSwarm::ConnectionClosed(info) => { - if info.remaining_established < 1 { - self.on_peer_disconnect(&info.peer_id); - } - } - FromSwarm::ConnectionEstablished(info) => { - if info.other_established == 0 { - self.on_peer_connect(&info.peer_id); - } - } - _ => {} - } - } - - fn on_connection_handler_event( - &mut self, - _peer_id: libp2p_core::PeerId, - _connection_id: libp2p_swarm::ConnectionId, - _event: libp2p_swarm::THandlerOutEvent, - ) { - unreachable!("No event will be produced by a dummy handler.") - } - - fn poll( - &mut self, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll>> - { - if let Some(ev) = self.pending_events.pop_front() { - return Poll::Ready(libp2p_swarm::ToSwarm::GenerateEvent(ev)); - } - self.store.poll(cx); - Poll::Pending - } -} +use std::{ + collections::{HashSet, VecDeque}, + task::Poll, +}; + +use libp2p_core::{Multiaddr, PeerId}; +use libp2p_swarm::{dummy, FromSwarm, NetworkBehaviour}; + +use crate::store::{AddressSource, Store}; + +/// Events generated by [`Behaviour`] and emitted back to the [`libp2p_swarm::Swarm`]. +pub enum Event { + /// The peer's record has been updated. + /// Manually updating a record will always emit this event + /// even if it provides no new information. + RecordUpdated { peer: PeerId }, +} + +pub struct Behaviour { + store: S, + /// Peers that are currently connected. + connected_peers: HashSet, + /// Pending Events to be emitted back to the [`libp2p_swarm::Swarm`]. + pending_events: VecDeque, +} + +impl<'a, S> Behaviour +where + S: Store + 'static, +{ + pub fn new(store: S) -> Self { + Self { + store, + connected_peers: HashSet::new(), + pending_events: VecDeque::new(), + } + } + + /// List peers that are currently connected to this peer. + pub fn list_connected(&self) -> impl Iterator { + self.connected_peers.iter() + } + + /// Try to get all observed address of the given peer. + /// Returns `None` when the peer is not in the store. + pub fn address_of_peer<'b>( + &'a self, + peer: &'b PeerId, + ) -> Option + use<'a, 'b, S>> { + self.store.addresses_of_peer(peer) + } + + /// Manually update a record. + /// This will always emit an `Event::RecordUpdated`. + pub fn update_address(&mut self, peer: &PeerId, address: &Multiaddr) { + self.store + .update_address(peer, address, AddressSource::Manual, false); + self.pending_events + .push_back(Event::RecordUpdated { peer: *peer }); + } + + /// Should be called when other protocol emits a [`PeerRecord`](libp2p_core::PeerRecord). + /// This will always emit an `Event::RecordUpdated`. + pub fn on_signed_peer_record( + &mut self, + signed_record: &libp2p_core::PeerRecord, + source: AddressSource, + ) { + self.store + .update_certified_address(signed_record, source, false); + self.pending_events.push_back(Event::RecordUpdated { + peer: signed_record.peer_id(), + }); + } + + /// Get a immutable reference to the internal store. + pub fn store(&self) -> &S { + &self.store + } + + /// Get a mutable reference to the internal store. + pub fn store_mut(&mut self) -> &mut S { + &mut self.store + } + + fn on_peer_connect(&mut self, peer: &PeerId) { + self.connected_peers.insert(*peer); + } + fn on_peer_disconnect(&mut self, peer: &PeerId) { + self.connected_peers.remove(peer); + } + fn on_address_update( + &mut self, + peer: &PeerId, + address: &Multiaddr, + source: AddressSource, + should_expire: bool, + ) { + if self + .store + .update_address(peer, address, source, should_expire) + { + self.pending_events + .push_back(Event::RecordUpdated { peer: *peer }); + } + } + fn handle_store_event(&mut self, event: super::store::Event) { + use super::store::Event::*; + match event { + RecordUpdated(peer) => self.pending_events.push_back(Event::RecordUpdated { peer }), + } + } +} + +impl NetworkBehaviour for Behaviour +where + S: Store + 'static, +{ + type ConnectionHandler = dummy::ConnectionHandler; + + type ToSwarm = Event; + + fn handle_established_inbound_connection( + &mut self, + _connection_id: libp2p_swarm::ConnectionId, + peer: libp2p_core::PeerId, + _local_addr: &libp2p_core::Multiaddr, + remote_addr: &libp2p_core::Multiaddr, + ) -> Result, libp2p_swarm::ConnectionDenied> { + self.on_address_update(&peer, remote_addr, AddressSource::DirectConnection, false); + Ok(dummy::ConnectionHandler) + } + + fn handle_pending_outbound_connection( + &mut self, + _connection_id: libp2p_swarm::ConnectionId, + maybe_peer: Option, + _addresses: &[Multiaddr], + _effective_role: libp2p_core::Endpoint, + ) -> Result, libp2p_swarm::ConnectionDenied> { + if maybe_peer.is_none() { + return Ok(Vec::new()); + } + let peer = maybe_peer.expect("already handled"); + Ok(self + .store + .addresses_of_peer(&peer) + .map(|i| i.cloned().collect()) + .unwrap_or_default()) + } + + fn handle_established_outbound_connection( + &mut self, + _connection_id: libp2p_swarm::ConnectionId, + peer: libp2p_core::PeerId, + addr: &libp2p_core::Multiaddr, + _role_override: libp2p_core::Endpoint, + _port_use: libp2p_core::transport::PortUse, + ) -> Result, libp2p_swarm::ConnectionDenied> { + self.on_address_update(&peer, addr, AddressSource::DirectConnection, false); + Ok(dummy::ConnectionHandler) + } + + fn on_swarm_event(&mut self, event: libp2p_swarm::FromSwarm) { + if let Some(ev) = self.store.on_swarm_event(&event) { + self.handle_store_event(ev); + }; + match event { + FromSwarm::ConnectionClosed(info) => { + if info.remaining_established < 1 { + self.on_peer_disconnect(&info.peer_id); + } + } + FromSwarm::ConnectionEstablished(info) => { + if info.other_established == 0 { + self.on_peer_connect(&info.peer_id); + } + } + _ => {} + } + } + + fn on_connection_handler_event( + &mut self, + _peer_id: libp2p_core::PeerId, + _connection_id: libp2p_swarm::ConnectionId, + _event: libp2p_swarm::THandlerOutEvent, + ) { + unreachable!("No event will be produced by a dummy handler.") + } + + fn poll( + &mut self, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll>> + { + if let Some(ev) = self.pending_events.pop_front() { + return Poll::Ready(libp2p_swarm::ToSwarm::GenerateEvent(ev)); + } + self.store.poll(cx); + Poll::Pending + } +} diff --git a/misc/peer-store/src/memory_store.rs b/misc/peer-store/src/memory_store.rs index 4ff99f794ae..005441c5fe2 100644 --- a/misc/peer-store/src/memory_store.rs +++ b/misc/peer-store/src/memory_store.rs @@ -59,17 +59,17 @@ impl Store for MemoryStore { fn update_certified_address( &mut self, - peer: &PeerId, - signed_address: libp2p_core::PeerRecord, + signed_record: &libp2p_core::PeerRecord, source: AddressSource, should_expire: bool, ) -> bool { - if let Some(record) = self.address_book.get_mut(peer) { - return record.update_certified_address(signed_address, source, should_expire); + let peer = signed_record.peer_id(); + if let Some(record) = self.address_book.get_mut(&peer) { + return record.update_certified_address(signed_record, source, should_expire); } let mut new_record = record::PeerAddressRecord::new(self.config.record_capacity); - new_record.update_certified_address(signed_address, source, should_expire); - self.address_book.insert(*peer, new_record); + new_record.update_certified_address(signed_record, source, should_expire); + self.address_book.insert(peer, new_record); true } @@ -197,12 +197,12 @@ mod record { } pub(crate) fn update_certified_address( &mut self, - signed_record: PeerRecord, + signed_record: &PeerRecord, source: AddressSource, should_expire: bool, ) -> bool { let mut is_updated = false; - let signed_record = Rc::new(signed_record); + let signed_record = Rc::new(signed_record.clone()); for address in signed_record.addresses() { // promote the address or update with the latest signature. if let Some(r) = self.addresses.get_mut(address) { diff --git a/misc/peer-store/src/store.rs b/misc/peer-store/src/store.rs index 61301e3b994..cece6bf2826 100644 --- a/misc/peer-store/src/store.rs +++ b/misc/peer-store/src/store.rs @@ -22,8 +22,7 @@ pub trait Store { /// Returns `true` when the address is new. fn update_certified_address( &mut self, - peer: &PeerId, - record: libp2p_core::PeerRecord, + signed_record: &libp2p_core::PeerRecord, source: AddressSource, should_expire: bool, ) -> bool; From cb294c6070166d9281500e950d0f64820932e313 Mon Sep 17 00:00:00 2001 From: drHuangMHT Date: Fri, 3 Jan 2025 11:00:54 +0800 Subject: [PATCH 23/26] MemoryStore strict mode only provide signed address when strict_mode is set to true --- misc/peer-store/src/memory_store.rs | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/misc/peer-store/src/memory_store.rs b/misc/peer-store/src/memory_store.rs index 005441c5fe2..97255dfd8b4 100644 --- a/misc/peer-store/src/memory_store.rs +++ b/misc/peer-store/src/memory_store.rs @@ -109,9 +109,12 @@ impl Store for MemoryStore { } fn addresses_of_peer(&self, peer: &PeerId) -> Option> { - self.address_book - .get(peer) - .map(|record| record.records().map(|(addr, _)| addr)) + self.address_book.get(peer).map(|record| { + record + .records() + .filter(|(_, r)| !self.config.strict_mode || r.signature.is_some()) + .map(|(addr, _)| addr) + }) } fn poll(&mut self, cx: &mut Context<'_>) -> Option<()> { @@ -127,7 +130,7 @@ impl Store for MemoryStore { } impl Behaviour { - /// Get all stored address records of the peer. + /// Get all stored address records of the peer, not affected by `strict_mode`. pub fn address_record_of_peer( &self, peer: &PeerId, @@ -139,12 +142,13 @@ impl Behaviour { pub struct Config { /// TTL for a record. record_ttl: Duration, - /// The capacaity of a record store. /// The least used record will be discarded when the store is full. record_capacity: NonZeroUsize, /// The interval for garbage collecting records. check_record_ttl_interval: Duration, + /// Only provide signed addresses to the behaviour when set to true. + strict_mode: bool, } impl Default for Config { @@ -153,6 +157,7 @@ impl Default for Config { record_ttl: Duration::from_secs(600), record_capacity: NonZeroUsize::try_from(8).expect("8 > 0"), check_record_ttl_interval: Duration::from_secs(5), + strict_mode: false, } } } From a9dc9aa4611006aa3fb120d38bada6770bd43b3b Mon Sep 17 00:00:00 2001 From: drHuangMHT Date: Wed, 8 Jan 2025 11:24:00 +0800 Subject: [PATCH 24/26] allow Store to report to Swarm --- misc/peer-store/src/behaviour.rs | 18 ++++++++++++------ misc/peer-store/src/memory_store.rs | 2 +- misc/peer-store/src/store.rs | 7 +++++-- 3 files changed, 18 insertions(+), 9 deletions(-) diff --git a/misc/peer-store/src/behaviour.rs b/misc/peer-store/src/behaviour.rs index b381a6b9101..8a04b3c5526 100644 --- a/misc/peer-store/src/behaviour.rs +++ b/misc/peer-store/src/behaviour.rs @@ -9,19 +9,22 @@ use libp2p_swarm::{dummy, FromSwarm, NetworkBehaviour}; use crate::store::{AddressSource, Store}; /// Events generated by [`Behaviour`] and emitted back to the [`libp2p_swarm::Swarm`]. -pub enum Event { +pub enum Event { /// The peer's record has been updated. /// Manually updating a record will always emit this event /// even if it provides no new information. - RecordUpdated { peer: PeerId }, + RecordUpdated { + peer: PeerId, + }, + Store(S::ToSwarm), } -pub struct Behaviour { +pub struct Behaviour { store: S, /// Peers that are currently connected. connected_peers: HashSet, /// Pending Events to be emitted back to the [`libp2p_swarm::Swarm`]. - pending_events: VecDeque, + pending_events: VecDeque>, } impl<'a, S> Behaviour @@ -115,10 +118,11 @@ where impl NetworkBehaviour for Behaviour where S: Store + 'static, + ::ToSwarm: Send + Sync, { type ConnectionHandler = dummy::ConnectionHandler; - type ToSwarm = Event; + type ToSwarm = Event; fn handle_established_inbound_connection( &mut self, @@ -197,7 +201,9 @@ where if let Some(ev) = self.pending_events.pop_front() { return Poll::Ready(libp2p_swarm::ToSwarm::GenerateEvent(ev)); } - self.store.poll(cx); + if let Some(ev) = self.store.poll(cx) { + self.pending_events.push_back(Event::Store(ev)); + }; Poll::Pending } } diff --git a/misc/peer-store/src/memory_store.rs b/misc/peer-store/src/memory_store.rs index 97255dfd8b4..594b5cd2097 100644 --- a/misc/peer-store/src/memory_store.rs +++ b/misc/peer-store/src/memory_store.rs @@ -39,7 +39,7 @@ impl MemoryStore { } impl Store for MemoryStore { - type Event = (); + type ToSwarm = (); fn update_address( &mut self, diff --git a/misc/peer-store/src/store.rs b/misc/peer-store/src/store.rs index cece6bf2826..24cce5a9d2a 100644 --- a/misc/peer-store/src/store.rs +++ b/misc/peer-store/src/store.rs @@ -6,7 +6,10 @@ use libp2p_swarm::FromSwarm; /// A store that /// - contains all observed addresses of peers; pub trait Store { - type Event; + + /// Event generated by the store and emitted to [`Swarm`](libp2p_swarm::Swarm). + /// The behaviour cannot handle this event. + type ToSwarm; /// Update an address record. /// Returns `true` when the address is new. @@ -38,7 +41,7 @@ pub trait Store { fn addresses_of_peer(&self, peer: &PeerId) -> Option>; /// Trigger grabage collection for records. - fn poll(&mut self, cx: &mut Context<'_>) -> Option; + fn poll(&mut self, cx: &mut Context<'_>) -> Option; } pub enum Event { From c94f63698be114aa2861792d4db9e3f45f6a31c6 Mon Sep 17 00:00:00 2001 From: drHuangMHT Date: Thu, 23 Jan 2025 20:28:12 +0800 Subject: [PATCH 25/26] don't store connected peers --- misc/peer-store/src/behaviour.rs | 34 ++------------------------------ 1 file changed, 2 insertions(+), 32 deletions(-) diff --git a/misc/peer-store/src/behaviour.rs b/misc/peer-store/src/behaviour.rs index 8a04b3c5526..107fbd252fc 100644 --- a/misc/peer-store/src/behaviour.rs +++ b/misc/peer-store/src/behaviour.rs @@ -1,10 +1,7 @@ -use std::{ - collections::{HashSet, VecDeque}, - task::Poll, -}; +use std::{collections::VecDeque, task::Poll}; use libp2p_core::{Multiaddr, PeerId}; -use libp2p_swarm::{dummy, FromSwarm, NetworkBehaviour}; +use libp2p_swarm::{dummy, NetworkBehaviour}; use crate::store::{AddressSource, Store}; @@ -21,8 +18,6 @@ pub enum Event { pub struct Behaviour { store: S, - /// Peers that are currently connected. - connected_peers: HashSet, /// Pending Events to be emitted back to the [`libp2p_swarm::Swarm`]. pending_events: VecDeque>, } @@ -34,16 +29,10 @@ where pub fn new(store: S) -> Self { Self { store, - connected_peers: HashSet::new(), pending_events: VecDeque::new(), } } - /// List peers that are currently connected to this peer. - pub fn list_connected(&self) -> impl Iterator { - self.connected_peers.iter() - } - /// Try to get all observed address of the given peer. /// Returns `None` when the peer is not in the store. pub fn address_of_peer<'b>( @@ -86,12 +75,6 @@ where &mut self.store } - fn on_peer_connect(&mut self, peer: &PeerId) { - self.connected_peers.insert(*peer); - } - fn on_peer_disconnect(&mut self, peer: &PeerId) { - self.connected_peers.remove(peer); - } fn on_address_update( &mut self, peer: &PeerId, @@ -169,19 +152,6 @@ where if let Some(ev) = self.store.on_swarm_event(&event) { self.handle_store_event(ev); }; - match event { - FromSwarm::ConnectionClosed(info) => { - if info.remaining_established < 1 { - self.on_peer_disconnect(&info.peer_id); - } - } - FromSwarm::ConnectionEstablished(info) => { - if info.other_established == 0 { - self.on_peer_connect(&info.peer_id); - } - } - _ => {} - } } fn on_connection_handler_event( From a02f088dffb9a6bb1d24331767ea443dec2d2ac8 Mon Sep 17 00:00:00 2001 From: drHuangMHT Date: Fri, 31 Jan 2025 20:45:47 +0800 Subject: [PATCH 26/26] allow attaching custom data --- misc/peer-store/src/memory_store.rs | 122 +++++++++++++++++++--------- misc/peer-store/src/store.rs | 121 +++++++++++++-------------- 2 files changed, 144 insertions(+), 99 deletions(-) diff --git a/misc/peer-store/src/memory_store.rs b/misc/peer-store/src/memory_store.rs index 594b5cd2097..9d8c27be6c0 100644 --- a/misc/peer-store/src/memory_store.rs +++ b/misc/peer-store/src/memory_store.rs @@ -1,5 +1,5 @@ use std::{ - collections::HashMap, + collections::{HashMap, VecDeque}, num::NonZeroUsize, task::{Context, Poll}, time::{Duration, Instant}, @@ -9,37 +9,65 @@ use futures_timer::Delay; use futures_util::FutureExt; use libp2p_core::{Multiaddr, PeerId}; use libp2p_swarm::FromSwarm; +use record::PeerRecord; -use super::{store::Event, Store}; +use super::Store; use crate::{store::AddressSource, Behaviour}; +#[derive(Debug, Clone)] +pub enum Event { + CustomDataUpdated(PeerId), +} + /// A in-memory store. #[derive(Default)] -pub struct MemoryStore { - /// An address book of peers regardless of their status(connected or not). - address_book: HashMap, +pub struct MemoryStore { + /// The internal store. + records: HashMap>, + pending_events: VecDeque, record_ttl_timer: Option, config: Config, } -impl MemoryStore { +impl MemoryStore { pub fn new(config: Config) -> Self { Self { config, - ..Default::default() + records: HashMap::new(), + record_ttl_timer: None, + pending_events: VecDeque::default(), } } fn check_record_ttl(&mut self) { let now = Instant::now(); - for r in &mut self.address_book.values_mut() { - r.check_ttl(now, self.config.record_ttl); + for r in &mut self.records.values_mut() { + r.check_addresses_ttl(now, self.config.record_ttl); + } + } + + pub fn get_custom_data(&self, peer: &PeerId) -> Option<&T> { + self.records.get(peer).and_then(|r| r.get_custom_data()) + } + pub fn take_custom_data(&mut self, peer: &PeerId) -> Option { + self.records + .get_mut(peer) + .and_then(|r| r.take_custom_data()) + } + pub fn insert_custom_data(&mut self, peer: &PeerId, custom_data: T) { + if let Some(r) = self.records.get_mut(peer) { + return r.insert_custom_data(custom_data); } + let mut new_record = PeerRecord::new(self.config.record_capacity); + new_record.insert_custom_data(custom_data); + self.records.insert(*peer, new_record); + self.pending_events + .push_back(Event::CustomDataUpdated(*peer)); } } -impl Store for MemoryStore { - type ToSwarm = (); +impl Store for MemoryStore { + type ToSwarm = Event; fn update_address( &mut self, @@ -48,12 +76,12 @@ impl Store for MemoryStore { source: AddressSource, should_expire: bool, ) -> bool { - if let Some(record) = self.address_book.get_mut(peer) { + if let Some(record) = self.records.get_mut(peer) { return record.update_address(address, source, should_expire); } - let mut new_record = record::PeerAddressRecord::new(self.config.record_capacity); + let mut new_record = record::PeerRecord::new(self.config.record_capacity); new_record.update_address(address, source, should_expire); - self.address_book.insert(*peer, new_record); + self.records.insert(*peer, new_record); true } @@ -64,27 +92,27 @@ impl Store for MemoryStore { should_expire: bool, ) -> bool { let peer = signed_record.peer_id(); - if let Some(record) = self.address_book.get_mut(&peer) { + if let Some(record) = self.records.get_mut(&peer) { return record.update_certified_address(signed_record, source, should_expire); } - let mut new_record = record::PeerAddressRecord::new(self.config.record_capacity); + let mut new_record = record::PeerRecord::new(self.config.record_capacity); new_record.update_certified_address(signed_record, source, should_expire); - self.address_book.insert(peer, new_record); + self.records.insert(peer, new_record); true } fn remove_address(&mut self, peer: &PeerId, address: &Multiaddr) -> bool { - if let Some(record) = self.address_book.get_mut(peer) { + if let Some(record) = self.records.get_mut(peer) { return record.remove_address(address); } false } - fn on_swarm_event(&mut self, swarm_event: &FromSwarm) -> Option { + fn on_swarm_event(&mut self, swarm_event: &FromSwarm) -> Option { match swarm_event { FromSwarm::NewExternalAddrOfPeer(info) => { if self.update_address(&info.peer_id, info.addr, AddressSource::Behaviour, true) { - return Some(Event::RecordUpdated(info.peer_id)); + return Some(super::store::Event::RecordUpdated(info.peer_id)); } None } @@ -100,7 +128,7 @@ impl Store for MemoryStore { false, ); if is_record_updated { - return Some(Event::RecordUpdated(info.peer_id)); + return Some(super::store::Event::RecordUpdated(info.peer_id)); } None } @@ -109,15 +137,15 @@ impl Store for MemoryStore { } fn addresses_of_peer(&self, peer: &PeerId) -> Option> { - self.address_book.get(peer).map(|record| { + self.records.get(peer).map(|record| { record - .records() + .addresses() .filter(|(_, r)| !self.config.strict_mode || r.signature.is_some()) .map(|(addr, _)| addr) }) } - fn poll(&mut self, cx: &mut Context<'_>) -> Option<()> { + fn poll(&mut self, cx: &mut Context<'_>) -> Option { if let Some(mut timer) = self.record_ttl_timer.take() { if let Poll::Ready(()) = timer.poll_unpin(cx) { self.check_record_ttl(); @@ -125,17 +153,23 @@ impl Store for MemoryStore { } self.record_ttl_timer = Some(timer) } + if let Some(ev) = self.pending_events.pop_front() { + return Some(ev); + } None } } -impl Behaviour { +impl Behaviour> +where + T: 'static, +{ /// Get all stored address records of the peer, not affected by `strict_mode`. pub fn address_record_of_peer( &self, peer: &PeerId, ) -> Option> { - self.store().address_book.get(peer).map(|r| r.records()) + self.store().records.get(peer).map(|r| r.addresses()) } } @@ -165,23 +199,24 @@ impl Default for Config { mod record { use std::rc::Rc; - use libp2p_core::PeerRecord; use lru::LruCache; use super::*; - pub(crate) struct PeerAddressRecord { + pub(crate) struct PeerRecord { /// A LRU(Least Recently Used) cache for addresses. /// Will delete the least-recently-used record when full. addresses: LruCache, + custom: Option, } - impl PeerAddressRecord { + impl PeerRecord { pub(crate) fn new(capacity: NonZeroUsize) -> Self { Self { addresses: LruCache::new(capacity), + custom: None, } } - pub(crate) fn records(&self) -> impl Iterator { + pub(crate) fn addresses(&self) -> impl Iterator { self.addresses.iter() } pub(crate) fn update_address( @@ -202,7 +237,7 @@ mod record { } pub(crate) fn update_certified_address( &mut self, - signed_record: &PeerRecord, + signed_record: &libp2p_core::PeerRecord, source: AddressSource, should_expire: bool, ) -> bool { @@ -225,7 +260,7 @@ mod record { pub(crate) fn remove_address(&mut self, address: &Multiaddr) -> bool { self.addresses.pop(address).is_some() } - pub(crate) fn check_ttl(&mut self, now: Instant, ttl: Duration) { + pub(crate) fn check_addresses_ttl(&mut self, now: Instant, ttl: Duration) { let mut records_to_be_deleted = Vec::new(); for (k, record) in self.addresses.iter() { if record.is_expired(now, ttl) { @@ -236,6 +271,15 @@ mod record { self.addresses.pop(&k); } } + pub(crate) fn get_custom_data(&self) -> Option<&T> { + self.custom.as_ref() + } + pub(crate) fn take_custom_data(&mut self) -> Option { + self.custom.take() + } + pub(crate) fn insert_custom_data(&mut self, custom_data: T) { + let _ = self.custom.insert(custom_data); + } } pub struct AddressRecord { @@ -288,7 +332,7 @@ mod test { record_ttl: Duration::from_millis(1), ..Default::default() }; - let mut store = MemoryStore::new(config); + let mut store: MemoryStore<()> = MemoryStore::new(config); let fake_peer = PeerId::random(); let addr_no_expire = Multiaddr::from_str("/ip4/127.0.0.1").expect("parsing to succeed"); let addr_should_expire = Multiaddr::from_str("/ip4/127.0.0.2").expect("parsing to succeed"); @@ -318,7 +362,7 @@ mod test { #[test] fn recent_use_bubble_up() { - let mut store = MemoryStore::new(Default::default()); + let mut store: MemoryStore<()> = MemoryStore::new(Default::default()); let fake_peer = PeerId::random(); let addr1 = Multiaddr::from_str("/ip4/127.0.0.1").expect("parsing to succeed"); let addr2 = Multiaddr::from_str("/ip4/127.0.0.2").expect("parsing to succeed"); @@ -336,10 +380,10 @@ mod test { ); assert!( *store - .address_book + .records .get(&fake_peer) .expect("peer to be in the store") - .records() + .addresses() .last() .expect("addr in the record") .0 @@ -353,10 +397,10 @@ mod test { ); assert!( *store - .address_book + .records .get(&fake_peer) .expect("peer to be in the store") - .records() + .addresses() .last() .expect("addr in the record") .0 @@ -366,7 +410,7 @@ mod test { #[test] fn bounded_store() { - let mut store = MemoryStore::new(Default::default()); + let mut store: MemoryStore<()> = MemoryStore::new(Default::default()); let fake_peer = PeerId::random(); for i in 1..10 { let addr_string = format!("/ip4/127.0.0.{}", i); diff --git a/misc/peer-store/src/store.rs b/misc/peer-store/src/store.rs index 24cce5a9d2a..39ee2095e7c 100644 --- a/misc/peer-store/src/store.rs +++ b/misc/peer-store/src/store.rs @@ -1,60 +1,61 @@ -use std::task::Context; - -use libp2p_core::{Multiaddr, PeerId}; -use libp2p_swarm::FromSwarm; - -/// A store that -/// - contains all observed addresses of peers; -pub trait Store { - - /// Event generated by the store and emitted to [`Swarm`](libp2p_swarm::Swarm). - /// The behaviour cannot handle this event. - type ToSwarm; - - /// Update an address record. - /// Returns `true` when the address is new. - fn update_address( - &mut self, - peer: &PeerId, - address: &Multiaddr, - source: AddressSource, - should_expire: bool, - ) -> bool; - - /// Update an address record. - /// Returns `true` when the address is new. - fn update_certified_address( - &mut self, - signed_record: &libp2p_core::PeerRecord, - source: AddressSource, - should_expire: bool, - ) -> bool; - - /// Remove an address record. - /// Returns `true` when the address is removed. - fn remove_address(&mut self, peer: &PeerId, address: &Multiaddr) -> bool; - - /// How this store handles events from the swarm. - fn on_swarm_event(&mut self, event: &FromSwarm) -> Option; - - /// Get all stored addresses of the peer. - fn addresses_of_peer(&self, peer: &PeerId) -> Option>; - - /// Trigger grabage collection for records. - fn poll(&mut self, cx: &mut Context<'_>) -> Option; -} - -pub enum Event { - RecordUpdated(PeerId), -} - -/// How the address is discovered. -#[derive(Debug, Clone, Copy)] -pub enum AddressSource { - /// The address is discovered from a behaviour(e.g. kadelima, identify). - Behaviour, - /// We have direct connection to the address. - DirectConnection, - /// The address is manually added. - Manual, -} +use std::task::Context; + +use libp2p_core::{Multiaddr, PeerId}; +use libp2p_swarm::FromSwarm; + +/// A store that +/// - contains all observed addresses of peers; +pub trait Store { + /// Event generated by the store and emitted to [`Swarm`](libp2p_swarm::Swarm). + /// The behaviour cannot handle this event. + type ToSwarm; + + /// Update an address record. + /// Returns `true` when the address is new. + fn update_address( + &mut self, + peer: &PeerId, + address: &Multiaddr, + source: AddressSource, + should_expire: bool, + ) -> bool; + + /// Update an address record. + /// Returns `true` when the address is new. + fn update_certified_address( + &mut self, + signed_record: &libp2p_core::PeerRecord, + source: AddressSource, + should_expire: bool, + ) -> bool; + + /// Remove an address record. + /// Returns `true` when the address is removed. + fn remove_address(&mut self, peer: &PeerId, address: &Multiaddr) -> bool; + + /// How this store handles events from the swarm. + fn on_swarm_event(&mut self, event: &FromSwarm) -> Option; + + /// Get all stored addresses of the peer. + fn addresses_of_peer(&self, peer: &PeerId) -> Option>; + + /// Trigger grabage collection for records. + fn poll(&mut self, cx: &mut Context<'_>) -> Option; +} + +/// Event that will be handled by the behaviour. +/// `Store::ToSwarm` should be a superset of this event. +pub enum Event { + RecordUpdated(PeerId), +} + +/// How the address is discovered. +#[derive(Debug, Clone, Copy)] +pub enum AddressSource { + /// The address is discovered from a behaviour(e.g. kadelima, identify). + Behaviour, + /// We have direct connection to the address. + DirectConnection, + /// The address is manually added. + Manual, +}