Skip to content

Commit

Permalink
check shard set when syncing
Browse files Browse the repository at this point in the history
  • Loading branch information
SionoiS committed Feb 3, 2025
1 parent 81a19c3 commit 4258696
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 13 deletions.
50 changes: 46 additions & 4 deletions waku/waku_store_sync/reconciliation.nim
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,46 @@ proc messageIngress*(self: SyncReconciliation, id: SyncID) =
self.storage.insert(id).isOkOr:
error "failed to insert new message", msg_hash = id.hash.toHex(), err = error

proc preProcessPayload(
self: SyncReconciliation, payload: RangesData
): Option[RangesData] =
## Check the received payload for cluster, shards and/or time mismatch.

var payload = payload

if self.cluster != payload.cluster:
return none(RangesData)

let shardsIntersection = self.shards * payload.shards.toPackedSet()

if shardsIntersection.len < 1:
return none(RangesData)

payload.shards = shardsIntersection.toSeq()

let timeRange = calculateTimeRange(self.relayJitter, self.syncRange)
let selfLowerBound = timeRange.a

# for non skip ranges check if they happen before any of our ranges
# convert to skip range in that case
for i in 0 ..< payload.ranges.len:
let rangeType = payload.ranges[i][1]
if rangeType != RangeType.Skip:
continue

let upperBound = payload.ranges[i][0].b.time
if selfLowerBound > upperBound:
payload.ranges[i][1] = RangeType.Skip

if rangeType == RangeType.Fingerprint:
payload.fingerprints.delete(0)
elif rangeType == RangeType.ItemSet:
payload.itemSets.delete(0)
else:
break

return some(payload)

proc processRequest(
self: SyncReconciliation, conn: Connection
): Future[Result[void, string]] {.async.} =
Expand Down Expand Up @@ -120,10 +160,12 @@ proc processRequest(
sendPayload: RangesData
rawPayload: seq[byte]

# Only process the ranges IF the shards and cluster matches
if self.cluster == recvPayload.cluster and
recvPayload.shards.toPackedSet() == self.shards:
sendPayload = self.storage.processPayload(recvPayload, hashToSend, hashToRecv)
let preProcessedPayloadRes = self.preProcessPayload(recvPayload)
if preProcessedPayloadRes.isSome():
let preProcessedPayload = preProcessedPayloadRes.get()

sendPayload =
self.storage.processPayload(preProcessedPayload, hashToSend, hashToRecv)

sendPayload.cluster = self.cluster
sendPayload.shards = self.shards.toSeq()
Expand Down
38 changes: 29 additions & 9 deletions waku/waku_store_sync/storage/seq_storage.nim
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import std/[algorithm, sequtils, math, options], results, chronos, stew/arrayops
import
std/[algorithm, sequtils, math, options, packedsets], results, chronos, stew/arrayops

import
../../waku_core/time,
Expand All @@ -9,6 +10,7 @@ import

type SeqStorage* = ref object of SyncStorage
elements: seq[SyncID]
shards: seq[uint16]

# Numer of parts a range will be splitted into.
partitionCount: int
Expand Down Expand Up @@ -66,7 +68,7 @@ method prune*(self: SeqStorage, timestamp: Timestamp): int {.raises: [].} =
return idx

proc computefingerprintFromSlice(
self: SeqStorage, sliceOpt: Option[Slice[int]]
self: SeqStorage, sliceOpt: Option[Slice[int]], shardSet: PackedSet[uint16]
): Fingerprint =
## XOR all hashes of a slice of the storage.

Expand All @@ -77,7 +79,15 @@ proc computefingerprintFromSlice(

let idxSlice = sliceOpt.get()

for id in self.elements[idxSlice]:
let slice = self.elements[idxSlice]

for i in 0 ..< slice.len:
let id = slice[i]
let shard = self.shards[i]

if not shardSet.contains(shard):
continue

fingerprint = fingerprint xor id.hash

return fingerprint
Expand All @@ -101,21 +111,22 @@ proc findIdxBounds(self: SeqStorage, slice: Slice[SyncID]): Option[Slice[int]] =
return some(lower ..< upper)

method computeFingerprint*(
self: SeqStorage, bounds: Slice[SyncID]
self: SeqStorage, bounds: Slice[SyncID], shardSet: PackedSet[uint16]
): Fingerprint {.raises: [].} =
let idxSliceOpt = self.findIdxBounds(bounds)
return self.computefingerprintFromSlice(idxSliceOpt)
return self.computefingerprintFromSlice(idxSliceOpt, shardSet)

proc processFingerprintRange*(
self: SeqStorage,
inputBounds: Slice[SyncID],
shardSet: PackedSet[uint16],
inputFingerprint: Fingerprint,
output: var RangesData,
) {.raises: [].} =
## Compares fingerprints and partition new ranges.

let idxSlice = self.findIdxBounds(inputBounds)
let ourFingerprint = self.computeFingerprintFromSlice(idxSlice)
let ourFingerprint = self.computeFingerprintFromSlice(idxSlice, shardSet)

if ourFingerprint == inputFingerprint:
output.ranges.add((inputBounds, RangeType.Skip))
Expand Down Expand Up @@ -153,14 +164,15 @@ proc processFingerprintRange*(
output.itemSets.add(state)
continue

let fingerprint = self.computeFingerprintFromSlice(some(slice))
let fingerprint = self.computeFingerprintFromSlice(some(slice), shardSet)
output.ranges.add((partitionBounds, RangeType.Fingerprint))
output.fingerprints.add(fingerprint)
continue

proc processItemSetRange*(
self: SeqStorage,
inputBounds: Slice[SyncID],
shardSet: PackedSet[uint16],
inputItemSet: ItemSet,
hashToSend: var seq[Fingerprint],
hashToRecv: var seq[Fingerprint],
Expand Down Expand Up @@ -190,6 +202,10 @@ proc processItemSetRange*(

while (j < m):
let ourElement = self.elements[j]
let shard = self.shards[j]

if not shardSet.contains(shard):
continue

if i >= n:
# in case we have more elements
Expand Down Expand Up @@ -234,6 +250,8 @@ method processPayload*(
i = 0
j = 0

let shardSet = input.shards.toPackedSet()

for (bounds, rangeType) in input.ranges:
case rangeType
of RangeType.Skip:
Expand All @@ -244,14 +262,16 @@ method processPayload*(
let fingerprint = input.fingerprints[i]
i.inc()

self.processFingerprintRange(bounds, fingerprint, output)
self.processFingerprintRange(bounds, shardSet, fingerprint, output)

continue
of RangeType.ItemSet:
let itemSet = input.itemsets[j]
j.inc()

self.processItemSetRange(bounds, itemSet, hashToSend, hashToRecv, output)
self.processItemSetRange(
bounds, shardSet, itemSet, hashToSend, hashToRecv, output
)

continue

Expand Down

0 comments on commit 4258696

Please sign in to comment.