Skip to content

Commit

Permalink
chore: refactor filter to react when the remote peer closes the stream (
Browse files Browse the repository at this point in the history
#3281)

Better control when the remote peer closes the WakuFilterPushCodec
stream.
For example, go-waku closes the stream for every received message.
On the other hand, js-waku keeps the stream opened.
Therefore, we support both scenarios.
  • Loading branch information
Ivansete-status authored Feb 6, 2025
1 parent 8a7e602 commit ce7f09a
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 25 deletions.
3 changes: 2 additions & 1 deletion waku/incentivization/eligibility_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ type EligibilityManager* = ref object # FIXME: make web3 private?
proc init*(
T: type EligibilityManager, ethClient: string
): Future[EligibilityManager] {.async.} =
return EligibilityManager(web3: await newWeb3(ethClient), seenTxIds: initHashSet[TxHash]())
return
EligibilityManager(web3: await newWeb3(ethClient), seenTxIds: initHashSet[TxHash]())
# TODO: handle error if web3 instance is not established

# Clean up the web3 instance
Expand Down
36 changes: 36 additions & 0 deletions waku/node/peer_manager/peer_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,42 @@ proc connectedPeers*(

return (inPeers, outPeers)

proc getStreamByPeerIdAndProtocol*(
pm: PeerManager, peerId: PeerId, protocol: string
): Future[Result[Connection, string]] {.async.} =
## Establishes a new stream to the given peer and protocol or returns the existing stream, if any.
## Notice that the "Connection" type represents a stream within a transport connection
## (we will need to adapt this term.)

let peerIdsMuxers: Table[PeerId, seq[Muxer]] = pm.switch.connManager.getConnections()
if not peerIdsMuxers.contains(peerId):
return err("peerId not found in connManager: " & $peerId)

let muxers = peerIdsMuxers[peerId]

var streams = newSeq[Connection](0)
for m in muxers:
for s in m.getStreams():
## getStreams is defined in nim-libp2p
streams.add(s)

## Try to get the opened streams for the given protocol
let streamsOfInterest = streams.filterIt(
it.protocol == protocol and not LPStream(it).isClosed and
not LPStream(it).isClosedRemotely
)

if streamsOfInterest.len > 0:
## In theory there should be one stream per protocol. Then we just pick up the 1st
return ok(streamsOfInterest[0])

## There isn't still a stream. Let's dial to create one
let streamRes = await pm.dialPeer(peerId, protocol)
if streamRes.isNone():
return err("getStreamByPeerIdProto no connection to peer: " & $peerId)

return ok(streamRes.get())

proc connectToRelayPeers*(pm: PeerManager) {.async.} =
var (inRelayPeers, outRelayPeers) = pm.connectedPeers(WakuRelayCodec)
let totalRelayPeers = inRelayPeers.len + outRelayPeers.len
Expand Down
32 changes: 9 additions & 23 deletions waku/waku_filter_v2/protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -172,29 +172,15 @@ proc pushToPeer(
): Future[Result[void, string]] {.async.} =
debug "pushing message to subscribed peer", peerId = shortLog(peerId)

if not wf.peerManager.wakuPeerStore.hasPeer(peerId, WakuFilterPushCodec):
# Check that peer has not been removed from peer store
error "no addresses for peer", peerId = shortLog(peerId)
return err("no addresses for peer: " & $peerId)

let conn =
if wf.peerConnections.contains(peerId):
wf.peerConnections[peerId]
else:
## we never pushed a message before, let's dial then
let connRes = await wf.peerManager.dialPeer(peerId, WakuFilterPushCodec)
if connRes.isNone():
## We do not remove this peer, but allow the underlying peer manager
## to do so if it is deemed necessary
error "pushToPeer no connection to peer", peerId = shortLog(peerId)
return err("pushToPeer no connection to peer: " & shortLog(peerId))

let newConn = connRes.get()
wf.peerConnections[peerId] = newConn
newConn

await conn.writeLp(buffer)
debug "published successful", peerId = shortLog(peerId), conn
let stream = (
await wf.peerManager.getStreamByPeerIdAndProtocol(peerId, WakuFilterPushCodec)
).valueOr:
error "pushToPeer failed", error
return err("pushToPeer failed: " & $error)

await stream.writeLp(buffer)

debug "published successful", peerId = shortLog(peerId), stream
waku_service_network_bytes.inc(
amount = buffer.len().int64, labelValues = [WakuFilterPushCodec, "out"]
)
Expand Down

0 comments on commit ce7f09a

Please sign in to comment.