Skip to content

Commit

Permalink
feat: waku store sync 2.0 config & setup (#3217)
Browse files Browse the repository at this point in the history
  • Loading branch information
SionoiS authored Jan 24, 2025
1 parent 81f24cc commit 7f64dc0
Show file tree
Hide file tree
Showing 11 changed files with 96 additions and 50 deletions.
3 changes: 3 additions & 0 deletions tests/all_tests_waku.nim
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ import
./waku_store_legacy/test_waku_store,
./waku_store_legacy/test_wakunode_store

# Waku store sync suite
import ./waku_store_sync/test_all

when defined(waku_exp_store_resume):
# TODO: Review store resume test cases (#1282)
import ./waku_store_legacy/test_resume
Expand Down
3 changes: 3 additions & 0 deletions tests/waku_store_sync/test_all.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{.used.}

import ./test_protocol, ./test_storage, ./test_codec
14 changes: 10 additions & 4 deletions tests/waku_store_sync/test_protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@ suite "Waku Sync: reconciliation":
clientPeerInfo = clientSwitch.peerInfo.toRemotePeerInfo()

asyncTeardown:
await allFutures(server.stop(), client.stop())
server.stop()
client.stop()

await allFutures(serverSwitch.stop(), clientSwitch.stop())

asyncTest "sync 2 nodes both empty":
Expand Down Expand Up @@ -345,7 +347,9 @@ suite "Waku Sync: transfer":
clientPeermanager.addPeer(serverPeerInfo)

asyncTeardown:
await allFutures(server.stopWait(), client.stopWait())
server.stop()
client.stop()

await allFutures(serverSwitch.stop(), clientSwitch.stop())

asyncTest "transfer 1 message":
Expand All @@ -364,7 +368,7 @@ suite "Waku Sync: transfer":
await serverRemoteNeeds.put(need)

# give time for transfer to happen
await sleepAsync(250.milliseconds)
await sleepAsync(500.milliseconds)

var query = ArchiveQuery()
query.includeData = true
Expand All @@ -373,5 +377,7 @@ suite "Waku Sync: transfer":
let res = await clientArchive.findMessages(query)
assert res.isOk(), $res.error

let response = res.get()

check:
msg == res.get().messages[0]
response.messages.len > 0
8 changes: 8 additions & 0 deletions waku/factory/node_factory.nim
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,14 @@ proc setupProtocols(
else:
return err("failed to set node waku filter peer: " & filterNode.error)

if conf.storeSync:
(
await node.mountStoreSync(
conf.storeSyncRange, conf.storeSyncInterval, conf.storeSyncRelayJitter
)
).isOkOr:
return err("failed to mount waku store sync protocol: " & $error)

# waku peer exchange setup
if conf.peerExchange:
try:
Expand Down
51 changes: 51 additions & 0 deletions waku/node/waku_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import
../waku_store/client as store_client,
../waku_store/common as store_common,
../waku_store/resume,
../waku_store_sync,
../waku_filter_v2,
../waku_filter_v2/client as filter_client,
../waku_filter_v2/subscriptions as filter_subscriptions,
Expand Down Expand Up @@ -99,6 +100,8 @@ type
wakuStore*: store.WakuStore
wakuStoreClient*: store_client.WakuStoreClient
wakuStoreResume*: StoreResume
wakuStoreReconciliation*: SyncReconciliation
wakuStoreTransfer*: SyncTransfer
wakuFilter*: waku_filter_v2.WakuFilter
wakuFilterClient*: filter_client.WakuFilterClient
wakuRlnRelay*: WakuRLNRelay
Expand Down Expand Up @@ -201,6 +204,35 @@ proc mountSharding*(
node.wakuSharding = Sharding(clusterId: clusterId, shardCountGenZero: shardCount)
return ok()

## Waku Sync

proc mountStoreSync*(
node: WakuNode,
storeSyncRange = 3600,
storeSyncInterval = 300,
storeSyncRelayJitter = 20,
): Future[Result[void, string]] {.async.} =
let idsChannel = newAsyncQueue[SyncID](100)
let wantsChannel = newAsyncQueue[(PeerId, WakuMessageHash)](100)
let needsChannel = newAsyncQueue[(PeerId, WakuMessageHash)](100)

let recon =
?await SyncReconciliation.new(
node.peerManager, node.wakuArchive, storeSyncRange.seconds,
storeSyncInterval.seconds, storeSyncRelayJitter.seconds, idsChannel, wantsChannel,
needsChannel,
)

node.wakuStoreReconciliation = recon

let transfer = SyncTransfer.new(
node.peerManager, node.wakuArchive, idsChannel, wantsChannel, needsChannel
)

node.wakuStoreTransfer = transfer

return ok()

## Waku relay

proc registerRelayDefaultHandler(node: WakuNode, topic: PubsubTopic) =
Expand Down Expand Up @@ -231,12 +263,19 @@ proc registerRelayDefaultHandler(node: WakuNode, topic: PubsubTopic) =

await node.wakuArchive.handleMessage(topic, msg)

proc syncHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
if node.wakuStoreReconciliation.isNil():
return

node.wakuStoreReconciliation.messageIngress(topic, msg)

let defaultHandler = proc(
topic: PubsubTopic, msg: WakuMessage
): Future[void] {.async, gcsafe.} =
await traceHandler(topic, msg)
await filterHandler(topic, msg)
await archiveHandler(topic, msg)
await syncHandler(topic, msg)

discard node.wakuRelay.subscribe(topic, defaultHandler)

Expand Down Expand Up @@ -1295,6 +1334,12 @@ proc start*(node: WakuNode) {.async.} =
if not node.wakuRendezvous.isNil():
await node.wakuRendezvous.start()

if not node.wakuStoreReconciliation.isNil():
node.wakuStoreReconciliation.start()

if not node.wakuStoreTransfer.isNil():
node.wakuStoreTransfer.start()

## The switch uses this mapper to update peer info addrs
## with announced addrs after start
let addressMapper = proc(
Expand Down Expand Up @@ -1334,6 +1379,12 @@ proc stop*(node: WakuNode) {.async.} =
if not node.wakuStoreResume.isNil():
await node.wakuStoreResume.stopWait()

if not node.wakuStoreReconciliation.isNil():
node.wakuStoreReconciliation.stop()

if not node.wakuStoreTransfer.isNil():
node.wakuStoreTransfer.stop()

if not node.wakuPeerExchange.isNil() and not node.wakuPeerExchange.pxLoopHandle.isNil():
await node.wakuPeerExchange.pxLoopHandle.cancelAndWait()

Expand Down
6 changes: 3 additions & 3 deletions waku/waku_archive/archive.nim
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,12 @@ proc syncMessageIngress*(
(await self.driver.put(msgHash, pubsubTopic, msg)).isOkOr:
waku_archive_errors.inc(labelValues = [insertFailure])
trace "failed to insert message",
msg_hash = msgHash.to0xHex(),
msg_hash = msgHash.toHex(),
pubsubTopic = pubsubTopic,
contentTopic = msg.contentTopic,
timestamp = msg.timestamp,
error = error
return err("failed to insert message")
error = $error
return err(error)

trace "message archived",
msg_hash = msgHash.to0xHex(),
Expand Down
8 changes: 8 additions & 0 deletions waku/waku_store_sync.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{.push raises: [].}

import
./waku_store_sync/reconciliation,
./waku_store_sync/transfer,
./waku_store_sync/common

export reconciliation, transfer, common
8 changes: 4 additions & 4 deletions waku/waku_store_sync/reconciliation.nim
Original file line number Diff line number Diff line change
Expand Up @@ -355,13 +355,13 @@ proc start*(self: SyncReconciliation) =

info "Store Sync Reconciliation protocol started"

proc stopWait*(self: SyncReconciliation) {.async.} =
proc stop*(self: SyncReconciliation) =
if self.syncInterval > ZeroDuration:
await self.periodicSyncFut.cancelAndWait()
self.periodicSyncFut.cancelSoon()

if self.syncInterval > ZeroDuration:
await self.periodicPruneFut.cancelAndWait()
self.periodicPruneFut.cancelSoon()

await self.idsReceiverFut.cancelAndWait()
self.idsReceiverFut.cancelSoon()

info "Store Sync Reconciliation protocol stopped"
3 changes: 3 additions & 0 deletions waku/waku_store_sync/storage/seq_storage.nim
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ method prune*(self: SeqStorage, timestamp: Timestamp): int {.raises: [].} =
## Remove all elements before the timestamp.
## Returns # of elements pruned.

if self.elements.len == 0:
return 0

let bound = SyncID(time: timestamp, hash: EmptyWakuMessageHash)

let idx = self.elements.lowerBound(bound, common.cmp)
Expand Down
8 changes: 3 additions & 5 deletions waku/waku_store_sync/transfer.nim
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ import
../common/protobuf,
../waku_enr,
../waku_core/codecs,
../waku_core/time,
../waku_core/topics/pubsub_topic,
../waku_core/message/digest,
../waku_core/message/message,
../waku_core/message/default_values,
Expand Down Expand Up @@ -215,10 +213,10 @@ proc start*(self: SyncTransfer) =

info "Store Sync Transfer protocol started"

proc stopWait*(self: SyncTransfer) {.async.} =
proc stop*(self: SyncTransfer) =
self.started = false

await self.localWantsRxFut.cancelAndWait()
await self.remoteNeedsRxFut.cancelAndWait()
self.localWantsRxFut.cancelSoon()
self.remoteNeedsRxFut.cancelSoon()

info "Store Sync Transfer protocol stopped"
34 changes: 0 additions & 34 deletions waku/waku_sync/common.nim

This file was deleted.

0 comments on commit 7f64dc0

Please sign in to comment.