Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: integrate reputation manager into Lightpush behind a feature flag [WIP] #3309

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 11 additions & 12 deletions tests/incentivization/test_poc_reputation.nim
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ import
stew/byteutils,
stint,
strutils,
tests/testlib/testasync
tests/testlib/testasync,
libp2p/[peerid, crypto/crypto]

import
waku/[node/peer_manager, waku_core],
Expand All @@ -15,16 +16,18 @@ import

suite "Waku Incentivization PoC Reputation":
var manager {.threadvar.}: ReputationManager
var peerId1 {.threadvar.}: PeerId

setup:
manager = ReputationManager.init()
peerId1 = PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet()

test "incentivization PoC: reputation: reputation table is empty after initialization":
check manager.reputationOf.len == 0

test "incentivization PoC: reputation: set and get reputation":
manager.setReputation("peer1", some(true)) # Encodes GoodRep
check manager.getReputation("peer1") == some(true)
manager.setReputation(peerId1, some(true)) # Encodes GoodRep
check manager.getReputation(peerId1) == some(true)

test "incentivization PoC: reputation: evaluate PushResponse valid":
let validLightpushResponse =
Expand All @@ -37,18 +40,14 @@ suite "Waku Incentivization PoC Reputation":
check evaluateResponse(invalidLightpushResponse) == BadResponse

test "incentivization PoC: reputation: updateReputationFromResponse valid":
let peerId = "peerWithValidResponse"
let validResp = PushResponse(isSuccess: true, info: some("All good"))
manager.updateReputationFromResponse(peerId, validResp)
check manager.getReputation(peerId) == some(true)
manager.updateReputationFromResponse(peerId1, validResp)
check manager.getReputation(peerId1) == some(true)

test "incentivization PoC: reputation: updateReputationFromResponse invalid":
let peerId = "peerWithInvalidResponse"
let invalidResp = PushResponse(isSuccess: false, info: none(string))
manager.updateReputationFromResponse(peerId, invalidResp)
check manager.getReputation(peerId) == some(false)
manager.updateReputationFromResponse(peerId1, invalidResp)
check manager.getReputation(peerId1) == some(false)

test "incentivization PoC: reputation: default is None":
let unknownPeerId = "unknown_peer"
# The peer is not in the table yet
check manager.getReputation(unknownPeerId) == none(bool)
check manager.getReputation(peerId1) == none(bool)
6 changes: 4 additions & 2 deletions tests/waku_lightpush/lightpush_utils.nim
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ import
waku/waku_lightpush,
waku/waku_lightpush/[client, common],
waku/common/rate_limit/setting,
../testlib/[common, wakucore]
../testlib/[common, wakucore],
waku/incentivization/reputation_manager

proc newTestWakuLightpushNode*(
switch: Switch,
Expand All @@ -26,4 +27,5 @@ proc newTestWakuLightpushNode*(

proc newTestWakuLightpushClient*(switch: Switch): WakuLightPushClient =
let peerManager = PeerManager.new(switch)
WakuLightPushClient.new(peerManager, rng)
let reputationManager = ReputationManager.new()
WakuLightPushClient.new(peerManager, reputationManager, rng)
3 changes: 1 addition & 2 deletions waku/incentivization/reputation_manager.nim
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import tables, std/options
import waku/waku_lightpush/rpc
import libp2p/peerid

type
PeerId = string

ResponseQuality* = enum
BadResponse
GoodResponse
Expand Down
47 changes: 41 additions & 6 deletions waku/waku_lightpush/client.nim
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,27 @@ import
./common,
./protocol_metrics,
./rpc,
./rpc_codec
./rpc_codec,
../incentivization/reputation_manager

logScope:
topics = "waku lightpush client"

type WakuLightPushClient* = ref object
peerManager*: PeerManager
reputationManager*: ReputationManager
rng*: ref rand.HmacDrbgContext
publishObservers: seq[PublishObserver]

proc new*(
T: type WakuLightPushClient, peerManager: PeerManager, rng: ref rand.HmacDrbgContext
T: type WakuLightPushClient,
peerManager: PeerManager,
reputationManager: ReputationManager,
rng: ref rand.HmacDrbgContext,
): T =
WakuLightPushClient(peerManager: peerManager, rng: rng)
WakuLightPushClient(
peerManager: peerManager, reputationManager: reputationManager, rng: rng
)

proc addPublishObserver*(wl: WakuLightPushClient, obs: PublishObserver) =
wl.publishObservers.add(obs)
Expand Down Expand Up @@ -65,6 +72,9 @@ proc sendPushRequest(
else:
return err("unknown failure")

when defined(reputation):
wl.reputationManager.updateReputationFromResponse(peer.peerId, response)

return ok()

proc publish*(
Expand All @@ -82,13 +92,20 @@ proc publish*(
obs.onMessagePublished(pubSubTopic, message)

notice "publishing message with lightpush",
pubsubTopic = pubsubTopic,
pubsubTopic = pubSubTopic,
contentTopic = message.contentTopic,
target_peer_id = peer.peerId,
msg_hash = msg_hash_hex_str

return ok(msg_hash_hex_str)

proc selectPeerFromPeerManager(
wl: WakuLightPushClient
): Future[Result[RemotePeerInfo, string]] {.async, gcsafe.} =
let peer = wl.peerManager.selectPeer(WakuLightPushCodec, none(PubsubTopic)).valueOr:
return err("could not retrieve a peer supporting WakuLightPushCodec")
return ok(peer)

proc publishToAny*(
wl: WakuLightPushClient, pubSubTopic: PubsubTopic, message: WakuMessage
): Future[WakuLightPushResult[void]] {.async, gcsafe.} =
Expand All @@ -97,8 +114,26 @@ proc publishToAny*(

info "publishToAny", msg_hash = computeMessageHash(pubsubTopic, message).to0xHex

let peer = wl.peerManager.selectPeer(WakuLightPushCodec).valueOr:
return err("could not retrieve a peer supporting WakuLightPushCodec")
var peer: RemotePeerInfo

when defined(reputation):
const maxReputationAttempts = 10
var attempts = 0
while attempts < maxReputationAttempts:
let peerResult = await wl.selectPeerFromPeerManager()
if peerResult.isErr:
return err(peerResult.error)
peer = peerResult.get()
if not (wl.reputationManager.getReputation(peer.peerId) == some(false)):
break
attempts += 1
if attempts >= maxReputationAttempts:
warn "Maximum reputation-based retries exceeded; continuing with a bad-reputation peer."
else:
let peerResult = await wl.selectPeerFromPeerManager()
if peerResult.isErr:
return err(peerResult.error)
peer = peerResult.get()

let pushRequest = PushRequest(pubSubTopic: pubSubTopic, message: message)
?await wl.sendPushRequest(pushRequest, peer)
Expand Down
Loading