From 73e31f49774b7d62a49e3f6c7f742a738cffd7c3 Mon Sep 17 00:00:00 2001 From: Sergei Tikhomirov Date: Wed, 26 Feb 2025 16:30:15 +0100 Subject: [PATCH 1/2] integrate reputation manager into Lightpush behind a feature flag --- tests/incentivization/test_poc_reputation.nim | 2 +- tests/waku_lightpush/lightpush_utils.nim | 6 ++- waku/incentivization/reputation_manager.nim | 13 +++-- waku/waku_lightpush/client.nim | 47 ++++++++++++++++--- 4 files changed, 52 insertions(+), 16 deletions(-) diff --git a/tests/incentivization/test_poc_reputation.nim b/tests/incentivization/test_poc_reputation.nim index b35c4b92fc..5c0bdca0ca 100644 --- a/tests/incentivization/test_poc_reputation.nim +++ b/tests/incentivization/test_poc_reputation.nim @@ -46,7 +46,7 @@ suite "Waku Incentivization PoC Reputation": let peerId = "peerWithInvalidResponse" let invalidResp = PushResponse(isSuccess: false, info: none(string)) manager.updateReputationFromResponse(peerId, invalidResp) - check manager.getReputation(peerId) == some(false) + check manager.getReputation(peerId) == some(false) test "incentivization PoC: reputation: default is None": let unknownPeerId = "unknown_peer" diff --git a/tests/waku_lightpush/lightpush_utils.nim b/tests/waku_lightpush/lightpush_utils.nim index 45bbe125c6..a2a13a415b 100644 --- a/tests/waku_lightpush/lightpush_utils.nim +++ b/tests/waku_lightpush/lightpush_utils.nim @@ -8,7 +8,8 @@ import waku/waku_lightpush, waku/waku_lightpush/[client, common], waku/common/rate_limit/setting, - ../testlib/[common, wakucore] + ../testlib/[common, wakucore], + waku/incentivization/reputation_manager proc newTestWakuLightpushNode*( switch: Switch, @@ -26,4 +27,5 @@ proc newTestWakuLightpushNode*( proc newTestWakuLightpushClient*(switch: Switch): WakuLightPushClient = let peerManager = PeerManager.new(switch) - WakuLightPushClient.new(peerManager, rng) + let reputationManager = ReputationManager.new() + WakuLightPushClient.new(peerManager, reputationManager, rng) diff --git a/waku/incentivization/reputation_manager.nim b/waku/incentivization/reputation_manager.nim index d5097b711b..95de0a8cb4 100644 --- a/waku/incentivization/reputation_manager.nim +++ b/waku/incentivization/reputation_manager.nim @@ -1,9 +1,8 @@ import tables, std/options import waku/waku_lightpush/rpc +import libp2p/peerid type - PeerId = string - ResponseQuality* = enum BadResponse GoodResponse @@ -13,17 +12,17 @@ type # some(false) => BadRep # none(bool) => unknown / not set ReputationManager* = ref object - reputationOf*: Table[PeerId, Option[bool]] + reputationOf*: Table[PeerID, Option[bool]] proc init*(T: type ReputationManager): ReputationManager = - return ReputationManager(reputationOf: initTable[PeerId, Option[bool]]()) + return ReputationManager(reputationOf: initTable[PeerID, Option[bool]]()) proc setReputation*( - manager: var ReputationManager, peer: PeerId, repValue: Option[bool] + manager: var ReputationManager, peer: PeerID, repValue: Option[bool] ) = manager.reputationOf[peer] = repValue -proc getReputation*(manager: ReputationManager, peer: PeerId): Option[bool] = +proc getReputation*(manager: ReputationManager, peer: PeerID): Option[bool] = if peer in manager.reputationOf: result = manager.reputationOf[peer] else: @@ -38,7 +37,7 @@ proc evaluateResponse*(response: PushResponse): ResponseQuality = # Update reputation of the peer based on the quality of the response proc updateReputationFromResponse*( - manager: var ReputationManager, peer: PeerId, response: PushResponse + manager: var ReputationManager, peer: PeerID, response: PushResponse ) = let respQuality = evaluateResponse(response) case respQuality diff --git a/waku/waku_lightpush/client.nim b/waku/waku_lightpush/client.nim index 3e20bf9e35..66e9fa8564 100644 --- a/waku/waku_lightpush/client.nim +++ b/waku/waku_lightpush/client.nim @@ -11,20 +11,27 @@ import ./common, ./protocol_metrics, ./rpc, - ./rpc_codec + ./rpc_codec, + ../incentivization/reputation_manager logScope: topics = "waku lightpush client" type WakuLightPushClient* = ref object peerManager*: PeerManager + reputationManager*: ReputationManager rng*: ref rand.HmacDrbgContext publishObservers: seq[PublishObserver] proc new*( - T: type WakuLightPushClient, peerManager: PeerManager, rng: ref rand.HmacDrbgContext + T: type WakuLightPushClient, + peerManager: PeerManager, + reputationManager: ReputationManager, + rng: ref rand.HmacDrbgContext, ): T = - WakuLightPushClient(peerManager: peerManager, rng: rng) + WakuLightPushClient( + peerManager: peerManager, reputationManager: reputationManager, rng: rng + ) proc addPublishObserver*(wl: WakuLightPushClient, obs: PublishObserver) = wl.publishObservers.add(obs) @@ -65,6 +72,9 @@ proc sendPushRequest( else: return err("unknown failure") + when defined(reputation): + wl.reputationManager.updateReputationFromResponse(peer.peerId, response) + return ok() proc publish*( @@ -82,13 +92,20 @@ proc publish*( obs.onMessagePublished(pubSubTopic, message) notice "publishing message with lightpush", - pubsubTopic = pubsubTopic, + pubsubTopic = pubSubTopic, contentTopic = message.contentTopic, target_peer_id = peer.peerId, msg_hash = msg_hash_hex_str return ok(msg_hash_hex_str) +proc selectPeerFromPeerManager( + wl: WakuLightPushClient +): Future[Result[RemotePeerInfo, string]] {.async, gcsafe.} = + let peer = wl.peerManager.selectPeer(WakuLightPushCodec, none(PubsubTopic)).valueOr: + return err("could not retrieve a peer supporting WakuLightPushCodec") + return ok(peer) + proc publishToAny*( wl: WakuLightPushClient, pubSubTopic: PubsubTopic, message: WakuMessage ): Future[WakuLightPushResult[void]] {.async, gcsafe.} = @@ -97,8 +114,26 @@ proc publishToAny*( info "publishToAny", msg_hash = computeMessageHash(pubsubTopic, message).to0xHex - let peer = wl.peerManager.selectPeer(WakuLightPushCodec).valueOr: - return err("could not retrieve a peer supporting WakuLightPushCodec") + var peer: RemotePeerInfo + + when defined(reputation): + const maxReputationAttempts = 10 + var attempts = 0 + while attempts < maxReputationAttempts: + let peerResult = await wl.selectPeerFromPeerManager() + if peerResult.isErr: + return err(peerResult.error) + peer = peerResult.get() + if not (wl.reputationManager.getReputation(peer.peerId) == some(false)): + break + attempts += 1 + if attempts >= maxReputationAttempts: + warn "Maximum reputation-based retries exceeded; continuing with a bad-reputation peer." + else: + let peerResult = await wl.selectPeerFromPeerManager() + if peerResult.isErr: + return err(peerResult.error) + peer = peerResult.get() let pushRequest = PushRequest(pubSubTopic: pubSubTopic, message: message) ?await wl.sendPushRequest(pushRequest, peer) From eee3c9585414811d432b76fd1d8fe5bbfffa667e Mon Sep 17 00:00:00 2001 From: Sergei Tikhomirov Date: Wed, 26 Feb 2025 18:59:54 +0100 Subject: [PATCH 2/2] use PeerId properly for reputation tests --- tests/incentivization/test_poc_reputation.nim | 23 +++++++++---------- waku/incentivization/reputation_manager.nim | 10 ++++---- 2 files changed, 16 insertions(+), 17 deletions(-) diff --git a/tests/incentivization/test_poc_reputation.nim b/tests/incentivization/test_poc_reputation.nim index 5c0bdca0ca..72333f101e 100644 --- a/tests/incentivization/test_poc_reputation.nim +++ b/tests/incentivization/test_poc_reputation.nim @@ -6,7 +6,8 @@ import stew/byteutils, stint, strutils, - tests/testlib/testasync + tests/testlib/testasync, + libp2p/[peerid, crypto/crypto] import waku/[node/peer_manager, waku_core], @@ -15,16 +16,18 @@ import suite "Waku Incentivization PoC Reputation": var manager {.threadvar.}: ReputationManager + var peerId1 {.threadvar.}: PeerId setup: manager = ReputationManager.init() + peerId1 = PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet() test "incentivization PoC: reputation: reputation table is empty after initialization": check manager.reputationOf.len == 0 test "incentivization PoC: reputation: set and get reputation": - manager.setReputation("peer1", some(true)) # Encodes GoodRep - check manager.getReputation("peer1") == some(true) + manager.setReputation(peerId1, some(true)) # Encodes GoodRep + check manager.getReputation(peerId1) == some(true) test "incentivization PoC: reputation: evaluate PushResponse valid": let validLightpushResponse = @@ -37,18 +40,14 @@ suite "Waku Incentivization PoC Reputation": check evaluateResponse(invalidLightpushResponse) == BadResponse test "incentivization PoC: reputation: updateReputationFromResponse valid": - let peerId = "peerWithValidResponse" let validResp = PushResponse(isSuccess: true, info: some("All good")) - manager.updateReputationFromResponse(peerId, validResp) - check manager.getReputation(peerId) == some(true) + manager.updateReputationFromResponse(peerId1, validResp) + check manager.getReputation(peerId1) == some(true) test "incentivization PoC: reputation: updateReputationFromResponse invalid": - let peerId = "peerWithInvalidResponse" let invalidResp = PushResponse(isSuccess: false, info: none(string)) - manager.updateReputationFromResponse(peerId, invalidResp) - check manager.getReputation(peerId) == some(false) + manager.updateReputationFromResponse(peerId1, invalidResp) + check manager.getReputation(peerId1) == some(false) test "incentivization PoC: reputation: default is None": - let unknownPeerId = "unknown_peer" - # The peer is not in the table yet - check manager.getReputation(unknownPeerId) == none(bool) + check manager.getReputation(peerId1) == none(bool) diff --git a/waku/incentivization/reputation_manager.nim b/waku/incentivization/reputation_manager.nim index 95de0a8cb4..f911a5351a 100644 --- a/waku/incentivization/reputation_manager.nim +++ b/waku/incentivization/reputation_manager.nim @@ -12,17 +12,17 @@ type # some(false) => BadRep # none(bool) => unknown / not set ReputationManager* = ref object - reputationOf*: Table[PeerID, Option[bool]] + reputationOf*: Table[PeerId, Option[bool]] proc init*(T: type ReputationManager): ReputationManager = - return ReputationManager(reputationOf: initTable[PeerID, Option[bool]]()) + return ReputationManager(reputationOf: initTable[PeerId, Option[bool]]()) proc setReputation*( - manager: var ReputationManager, peer: PeerID, repValue: Option[bool] + manager: var ReputationManager, peer: PeerId, repValue: Option[bool] ) = manager.reputationOf[peer] = repValue -proc getReputation*(manager: ReputationManager, peer: PeerID): Option[bool] = +proc getReputation*(manager: ReputationManager, peer: PeerId): Option[bool] = if peer in manager.reputationOf: result = manager.reputationOf[peer] else: @@ -37,7 +37,7 @@ proc evaluateResponse*(response: PushResponse): ResponseQuality = # Update reputation of the peer based on the quality of the response proc updateReputationFromResponse*( - manager: var ReputationManager, peer: PeerID, response: PushResponse + manager: var ReputationManager, peer: PeerId, response: PushResponse ) = let respQuality = evaluateResponse(response) case respQuality