From 7f64dc03aab1e9b89c7311ff1ea6bde8d69c4dc4 Mon Sep 17 00:00:00 2001 From: Simon-Pierre Vivier Date: Fri, 24 Jan 2025 11:46:11 -0500 Subject: [PATCH] feat: waku store sync 2.0 config & setup (#3217) --- tests/all_tests_waku.nim | 3 ++ tests/waku_store_sync/test_all.nim | 3 ++ tests/waku_store_sync/test_protocol.nim | 14 ++++-- waku/factory/node_factory.nim | 8 +++ waku/node/waku_node.nim | 51 ++++++++++++++++++++ waku/waku_archive/archive.nim | 6 +-- waku/waku_store_sync.nim | 8 +++ waku/waku_store_sync/reconciliation.nim | 8 +-- waku/waku_store_sync/storage/seq_storage.nim | 3 ++ waku/waku_store_sync/transfer.nim | 8 ++- waku/waku_sync/common.nim | 34 ------------- 11 files changed, 96 insertions(+), 50 deletions(-) create mode 100644 tests/waku_store_sync/test_all.nim create mode 100644 waku/waku_store_sync.nim delete mode 100644 waku/waku_sync/common.nim diff --git a/tests/all_tests_waku.nim b/tests/all_tests_waku.nim index 212001b393..46c235b513 100644 --- a/tests/all_tests_waku.nim +++ b/tests/all_tests_waku.nim @@ -54,6 +54,9 @@ import ./waku_store_legacy/test_waku_store, ./waku_store_legacy/test_wakunode_store +# Waku store sync suite +import ./waku_store_sync/test_all + when defined(waku_exp_store_resume): # TODO: Review store resume test cases (#1282) import ./waku_store_legacy/test_resume diff --git a/tests/waku_store_sync/test_all.nim b/tests/waku_store_sync/test_all.nim new file mode 100644 index 0000000000..82daa388b7 --- /dev/null +++ b/tests/waku_store_sync/test_all.nim @@ -0,0 +1,3 @@ +{.used.} + +import ./test_protocol, ./test_storage, ./test_codec diff --git a/tests/waku_store_sync/test_protocol.nim b/tests/waku_store_sync/test_protocol.nim index 7ca783152f..d3ffa187f3 100644 --- a/tests/waku_store_sync/test_protocol.nim +++ b/tests/waku_store_sync/test_protocol.nim @@ -57,7 +57,9 @@ suite "Waku Sync: reconciliation": clientPeerInfo = clientSwitch.peerInfo.toRemotePeerInfo() asyncTeardown: - await allFutures(server.stop(), client.stop()) + server.stop() + client.stop() + await allFutures(serverSwitch.stop(), clientSwitch.stop()) asyncTest "sync 2 nodes both empty": @@ -345,7 +347,9 @@ suite "Waku Sync: transfer": clientPeermanager.addPeer(serverPeerInfo) asyncTeardown: - await allFutures(server.stopWait(), client.stopWait()) + server.stop() + client.stop() + await allFutures(serverSwitch.stop(), clientSwitch.stop()) asyncTest "transfer 1 message": @@ -364,7 +368,7 @@ suite "Waku Sync: transfer": await serverRemoteNeeds.put(need) # give time for transfer to happen - await sleepAsync(250.milliseconds) + await sleepAsync(500.milliseconds) var query = ArchiveQuery() query.includeData = true @@ -373,5 +377,7 @@ suite "Waku Sync: transfer": let res = await clientArchive.findMessages(query) assert res.isOk(), $res.error + let response = res.get() + check: - msg == res.get().messages[0] + response.messages.len > 0 diff --git a/waku/factory/node_factory.nim b/waku/factory/node_factory.nim index 33e84f8dee..852ec460dd 100644 --- a/waku/factory/node_factory.nim +++ b/waku/factory/node_factory.nim @@ -396,6 +396,14 @@ proc setupProtocols( else: return err("failed to set node waku filter peer: " & filterNode.error) + if conf.storeSync: + ( + await node.mountStoreSync( + conf.storeSyncRange, conf.storeSyncInterval, conf.storeSyncRelayJitter + ) + ).isOkOr: + return err("failed to mount waku store sync protocol: " & $error) + # waku peer exchange setup if conf.peerExchange: try: diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 2158cb62ac..6dc68ea2bf 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -34,6 +34,7 @@ import ../waku_store/client as store_client, ../waku_store/common as store_common, ../waku_store/resume, + ../waku_store_sync, ../waku_filter_v2, ../waku_filter_v2/client as filter_client, ../waku_filter_v2/subscriptions as filter_subscriptions, @@ -99,6 +100,8 @@ type wakuStore*: store.WakuStore wakuStoreClient*: store_client.WakuStoreClient wakuStoreResume*: StoreResume + wakuStoreReconciliation*: SyncReconciliation + wakuStoreTransfer*: SyncTransfer wakuFilter*: waku_filter_v2.WakuFilter wakuFilterClient*: filter_client.WakuFilterClient wakuRlnRelay*: WakuRLNRelay @@ -201,6 +204,35 @@ proc mountSharding*( node.wakuSharding = Sharding(clusterId: clusterId, shardCountGenZero: shardCount) return ok() +## Waku Sync + +proc mountStoreSync*( + node: WakuNode, + storeSyncRange = 3600, + storeSyncInterval = 300, + storeSyncRelayJitter = 20, +): Future[Result[void, string]] {.async.} = + let idsChannel = newAsyncQueue[SyncID](100) + let wantsChannel = newAsyncQueue[(PeerId, WakuMessageHash)](100) + let needsChannel = newAsyncQueue[(PeerId, WakuMessageHash)](100) + + let recon = + ?await SyncReconciliation.new( + node.peerManager, node.wakuArchive, storeSyncRange.seconds, + storeSyncInterval.seconds, storeSyncRelayJitter.seconds, idsChannel, wantsChannel, + needsChannel, + ) + + node.wakuStoreReconciliation = recon + + let transfer = SyncTransfer.new( + node.peerManager, node.wakuArchive, idsChannel, wantsChannel, needsChannel + ) + + node.wakuStoreTransfer = transfer + + return ok() + ## Waku relay proc registerRelayDefaultHandler(node: WakuNode, topic: PubsubTopic) = @@ -231,12 +263,19 @@ proc registerRelayDefaultHandler(node: WakuNode, topic: PubsubTopic) = await node.wakuArchive.handleMessage(topic, msg) + proc syncHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} = + if node.wakuStoreReconciliation.isNil(): + return + + node.wakuStoreReconciliation.messageIngress(topic, msg) + let defaultHandler = proc( topic: PubsubTopic, msg: WakuMessage ): Future[void] {.async, gcsafe.} = await traceHandler(topic, msg) await filterHandler(topic, msg) await archiveHandler(topic, msg) + await syncHandler(topic, msg) discard node.wakuRelay.subscribe(topic, defaultHandler) @@ -1295,6 +1334,12 @@ proc start*(node: WakuNode) {.async.} = if not node.wakuRendezvous.isNil(): await node.wakuRendezvous.start() + if not node.wakuStoreReconciliation.isNil(): + node.wakuStoreReconciliation.start() + + if not node.wakuStoreTransfer.isNil(): + node.wakuStoreTransfer.start() + ## The switch uses this mapper to update peer info addrs ## with announced addrs after start let addressMapper = proc( @@ -1334,6 +1379,12 @@ proc stop*(node: WakuNode) {.async.} = if not node.wakuStoreResume.isNil(): await node.wakuStoreResume.stopWait() + if not node.wakuStoreReconciliation.isNil(): + node.wakuStoreReconciliation.stop() + + if not node.wakuStoreTransfer.isNil(): + node.wakuStoreTransfer.stop() + if not node.wakuPeerExchange.isNil() and not node.wakuPeerExchange.pxLoopHandle.isNil(): await node.wakuPeerExchange.pxLoopHandle.cancelAndWait() diff --git a/waku/waku_archive/archive.nim b/waku/waku_archive/archive.nim index b1f30d6bd3..c357d71d0d 100644 --- a/waku/waku_archive/archive.nim +++ b/waku/waku_archive/archive.nim @@ -128,12 +128,12 @@ proc syncMessageIngress*( (await self.driver.put(msgHash, pubsubTopic, msg)).isOkOr: waku_archive_errors.inc(labelValues = [insertFailure]) trace "failed to insert message", - msg_hash = msgHash.to0xHex(), + msg_hash = msgHash.toHex(), pubsubTopic = pubsubTopic, contentTopic = msg.contentTopic, timestamp = msg.timestamp, - error = error - return err("failed to insert message") + error = $error + return err(error) trace "message archived", msg_hash = msgHash.to0xHex(), diff --git a/waku/waku_store_sync.nim b/waku/waku_store_sync.nim new file mode 100644 index 0000000000..06699d9fd0 --- /dev/null +++ b/waku/waku_store_sync.nim @@ -0,0 +1,8 @@ +{.push raises: [].} + +import + ./waku_store_sync/reconciliation, + ./waku_store_sync/transfer, + ./waku_store_sync/common + +export reconciliation, transfer, common diff --git a/waku/waku_store_sync/reconciliation.nim b/waku/waku_store_sync/reconciliation.nim index d2ade2ab39..5ad6260c9d 100644 --- a/waku/waku_store_sync/reconciliation.nim +++ b/waku/waku_store_sync/reconciliation.nim @@ -355,13 +355,13 @@ proc start*(self: SyncReconciliation) = info "Store Sync Reconciliation protocol started" -proc stopWait*(self: SyncReconciliation) {.async.} = +proc stop*(self: SyncReconciliation) = if self.syncInterval > ZeroDuration: - await self.periodicSyncFut.cancelAndWait() + self.periodicSyncFut.cancelSoon() if self.syncInterval > ZeroDuration: - await self.periodicPruneFut.cancelAndWait() + self.periodicPruneFut.cancelSoon() - await self.idsReceiverFut.cancelAndWait() + self.idsReceiverFut.cancelSoon() info "Store Sync Reconciliation protocol stopped" diff --git a/waku/waku_store_sync/storage/seq_storage.nim b/waku/waku_store_sync/storage/seq_storage.nim index 690d41abcf..b1782c22af 100644 --- a/waku/waku_store_sync/storage/seq_storage.nim +++ b/waku/waku_store_sync/storage/seq_storage.nim @@ -54,6 +54,9 @@ method prune*(self: SeqStorage, timestamp: Timestamp): int {.raises: [].} = ## Remove all elements before the timestamp. ## Returns # of elements pruned. + if self.elements.len == 0: + return 0 + let bound = SyncID(time: timestamp, hash: EmptyWakuMessageHash) let idx = self.elements.lowerBound(bound, common.cmp) diff --git a/waku/waku_store_sync/transfer.nim b/waku/waku_store_sync/transfer.nim index e384f04a51..4613c64cd1 100644 --- a/waku/waku_store_sync/transfer.nim +++ b/waku/waku_store_sync/transfer.nim @@ -16,8 +16,6 @@ import ../common/protobuf, ../waku_enr, ../waku_core/codecs, - ../waku_core/time, - ../waku_core/topics/pubsub_topic, ../waku_core/message/digest, ../waku_core/message/message, ../waku_core/message/default_values, @@ -215,10 +213,10 @@ proc start*(self: SyncTransfer) = info "Store Sync Transfer protocol started" -proc stopWait*(self: SyncTransfer) {.async.} = +proc stop*(self: SyncTransfer) = self.started = false - await self.localWantsRxFut.cancelAndWait() - await self.remoteNeedsRxFut.cancelAndWait() + self.localWantsRxFut.cancelSoon() + self.remoteNeedsRxFut.cancelSoon() info "Store Sync Transfer protocol stopped" diff --git a/waku/waku_sync/common.nim b/waku/waku_sync/common.nim deleted file mode 100644 index 4d26ff5069..0000000000 --- a/waku/waku_sync/common.nim +++ /dev/null @@ -1,34 +0,0 @@ -{.push raises: [].} - -import std/[options], chronos, libp2p/peerId -import ../waku_core - -from ../waku_core/codecs import WakuSyncCodec -export WakuSyncCodec - -const - DefaultSyncInterval*: Duration = 5.minutes - DefaultSyncRange*: Duration = 1.hours - RetryDelay*: Duration = 30.seconds - DefaultMaxFrameSize* = 1048576 # 1 MiB - DefaultGossipSubJitter*: Duration = 20.seconds - -type - TransferCallback* = proc( - hashes: seq[WakuMessageHash], peerId: PeerId - ): Future[Result[void, string]] {.async: (raises: []), closure.} - - PruneCallback* = proc( - startTime: Timestamp, endTime: Timestamp, cursor = none(WakuMessageHash) - ): Future[ - Result[(seq[(WakuMessageHash, Timestamp)], Option[WakuMessageHash]), string] - ] {.async: (raises: []), closure.} - - SyncPayload* = object - syncRange*: Option[(uint64, uint64)] - - frameSize*: Option[uint64] - - negentropy*: seq[byte] # negentropy protocol payload - - hashes*: seq[WakuMessageHash]