Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ban buggy or malicious relays #733

Draft
wants to merge 9 commits into
base: master
Choose a base branch
from
Prev Previous commit
Next Next commit
pool: auto-closing subscription guard
Signed-off-by: Yuki Kishimoto <[email protected]>
yukibtc committed Jan 30, 2025
commit 5f933f6b1c8c3b66848a2896bf552c24b1f76c52
95 changes: 79 additions & 16 deletions crates/nostr-relay-pool/src/relay/inner.rs
Original file line number Diff line number Diff line change
@@ -120,6 +120,58 @@ impl SubscriptionData {
}
}

/// Automatically removes the subscription when dropped
pub(crate) struct AutoClosingSubscriptionGuard<'a> {
url: &'a RelayUrl,
id: SubscriptionId,
subscriptions: &'a RwLock<HashMap<SubscriptionId, SubscriptionData>>,
dropped: bool,
}

impl<'a> AutoClosingSubscriptionGuard<'a> {
fn new(
url: &'a RelayUrl,
id: SubscriptionId,
subscriptions: &'a RwLock<HashMap<SubscriptionId, SubscriptionData>>,
) -> Self {
Self {
url,
id,
subscriptions,
dropped: false,
}
}
}

impl Drop for AutoClosingSubscriptionGuard<'_> {
fn drop(&mut self) {
// Check if already dropped
if self.dropped {
return;
}

#[cfg(debug_assertions)]
tracing::debug!(url = %self.url, id = %self.id, "Dropping auto-closing subscription guard.");

loop {
// Remove from map
let mut subscriptions = match self.subscriptions.try_write() {
Ok(subscriptions) => subscriptions,
Err(_) => continue,
};

subscriptions.remove(&self.id);
break;
}

#[cfg(debug_assertions)]
tracing::debug!(url = %self.url, id = %self.id, "Auto-closing subscription guard dropped.");

// Mark as dropped
self.dropped = true;
}
}

// Instead of wrap every field in an `Arc<T>`, which increases the number of atomic operations,
// put all fields that require an `Arc` here.
#[derive(Debug)]
@@ -305,16 +357,25 @@ impl InnerRelay {
})
}

pub(crate) async fn add_auto_closing_subscription(
/// Save the subscription in the map and when [`AutoClosingSubscriptionGuard`] drops, auto-remove it.
#[must_use]
pub(crate) async fn register_auto_closing_subscription(
&self,
id: SubscriptionId,
filter: Filter,
) {
let mut subscriptions = self.atomic.subscriptions.write().await;
subscriptions.insert(
id,
SubscriptionData::AutoClosing { filter },
);
) -> AutoClosingSubscriptionGuard {
// Insert into the subscription map
{
let mut subscriptions = self.atomic.subscriptions.write().await;
subscriptions.insert(
id.clone(),
SubscriptionData::AutoClosing {
filter,
},
);
}

AutoClosingSubscriptionGuard::new(&self.url, id, &self.atomic.subscriptions)
}

pub(crate) async fn update_long_lived_subscription(
@@ -387,11 +448,6 @@ impl InnerRelay {
}
}

pub(crate) async fn remove_subscription(&self, id: &SubscriptionId) {
let mut subscriptions = self.atomic.subscriptions.write().await;
subscriptions.remove(id);
}

#[inline]
pub fn queue(&self) -> usize {
self.atomic.channels.nostr_queue()
@@ -1262,6 +1318,11 @@ impl InnerRelay {
) {
let relay = self.clone(); // <-- FULL RELAY CLONE HERE
task::spawn(async move {
// Register auto-closing subscription
let _guard: AutoClosingSubscriptionGuard = relay
.register_auto_closing_subscription(id.clone(), filter.clone())
.await;

// Check if CLOSE needed
let to_close: bool = match relay
.handle_auto_closing(&id, filter, opts, notifications)
@@ -1285,9 +1346,6 @@ impl InnerRelay {
}
};

// Remove subscription
relay.remove_subscription(&id).await;

// Close subscription
if to_close {
tracing::debug!(id = %id, "Auto-closing subscription.");
@@ -1438,16 +1496,21 @@ impl InnerRelay {

pub async fn unsubscribe(&self, id: SubscriptionId) -> Result<(), Error> {
// Remove subscription
self.remove_subscription(&id).await;
{
let mut subscriptions = self.atomic.subscriptions.write().await;
subscriptions.remove(&id);
}

// Send CLOSE message
self.send_msg(ClientMessage::close(id))
}

pub async fn unsubscribe_all_long_lived(&self) -> Result<(), Error> {
// TODO: don't clone the map
let subscriptions = self.long_lived_subscriptions().await;

for id in subscriptions.into_keys() {
// TODO: use `subscriptions.remove(id)` and then `self.send_msg(ClientMessage::close(id))`
self.unsubscribe(id).await?;
}

11 changes: 7 additions & 4 deletions crates/nostr-relay-pool/src/relay/mod.rs
Original file line number Diff line number Diff line change
@@ -489,10 +489,6 @@ impl Relay {
// Send REQ message
self.inner.send_msg(msg)?;

self.inner
.add_auto_closing_subscription(id.clone(), filter.clone())
.await;

// Spawn auto-closing handler
self.inner
.spawn_auto_closing_handler(id, filter, opts, notifications)
@@ -683,6 +679,13 @@ impl Relay {
// Generate subscription ID for getting events
let down_sub_id: SubscriptionId = SubscriptionId::generate();

// Register auto-closing subscription
let _guard = self
.inner
.register_auto_closing_subscription(down_sub_id.clone(), filter.clone())
.await;

// Sync
match self
.inner
.sync_new(down_sub_id.clone(), filter.clone(), items.clone(), opts, &mut output)