diff --git a/tests/incentivization/test_poc_reputation.nim b/tests/incentivization/test_poc_reputation.nim index b35c4b92fc..72333f101e 100644 --- a/tests/incentivization/test_poc_reputation.nim +++ b/tests/incentivization/test_poc_reputation.nim @@ -6,7 +6,8 @@ import stew/byteutils, stint, strutils, - tests/testlib/testasync + tests/testlib/testasync, + libp2p/[peerid, crypto/crypto] import waku/[node/peer_manager, waku_core], @@ -15,16 +16,18 @@ import suite "Waku Incentivization PoC Reputation": var manager {.threadvar.}: ReputationManager + var peerId1 {.threadvar.}: PeerId setup: manager = ReputationManager.init() + peerId1 = PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet() test "incentivization PoC: reputation: reputation table is empty after initialization": check manager.reputationOf.len == 0 test "incentivization PoC: reputation: set and get reputation": - manager.setReputation("peer1", some(true)) # Encodes GoodRep - check manager.getReputation("peer1") == some(true) + manager.setReputation(peerId1, some(true)) # Encodes GoodRep + check manager.getReputation(peerId1) == some(true) test "incentivization PoC: reputation: evaluate PushResponse valid": let validLightpushResponse = @@ -37,18 +40,14 @@ suite "Waku Incentivization PoC Reputation": check evaluateResponse(invalidLightpushResponse) == BadResponse test "incentivization PoC: reputation: updateReputationFromResponse valid": - let peerId = "peerWithValidResponse" let validResp = PushResponse(isSuccess: true, info: some("All good")) - manager.updateReputationFromResponse(peerId, validResp) - check manager.getReputation(peerId) == some(true) + manager.updateReputationFromResponse(peerId1, validResp) + check manager.getReputation(peerId1) == some(true) test "incentivization PoC: reputation: updateReputationFromResponse invalid": - let peerId = "peerWithInvalidResponse" let invalidResp = PushResponse(isSuccess: false, info: none(string)) - manager.updateReputationFromResponse(peerId, invalidResp) - check manager.getReputation(peerId) == some(false) + manager.updateReputationFromResponse(peerId1, invalidResp) + check manager.getReputation(peerId1) == some(false) test "incentivization PoC: reputation: default is None": - let unknownPeerId = "unknown_peer" - # The peer is not in the table yet - check manager.getReputation(unknownPeerId) == none(bool) + check manager.getReputation(peerId1) == none(bool) diff --git a/tests/waku_lightpush/lightpush_utils.nim b/tests/waku_lightpush/lightpush_utils.nim index 45bbe125c6..a2a13a415b 100644 --- a/tests/waku_lightpush/lightpush_utils.nim +++ b/tests/waku_lightpush/lightpush_utils.nim @@ -8,7 +8,8 @@ import waku/waku_lightpush, waku/waku_lightpush/[client, common], waku/common/rate_limit/setting, - ../testlib/[common, wakucore] + ../testlib/[common, wakucore], + waku/incentivization/reputation_manager proc newTestWakuLightpushNode*( switch: Switch, @@ -26,4 +27,5 @@ proc newTestWakuLightpushNode*( proc newTestWakuLightpushClient*(switch: Switch): WakuLightPushClient = let peerManager = PeerManager.new(switch) - WakuLightPushClient.new(peerManager, rng) + let reputationManager = ReputationManager.new() + WakuLightPushClient.new(peerManager, reputationManager, rng) diff --git a/waku/incentivization/reputation_manager.nim b/waku/incentivization/reputation_manager.nim index d5097b711b..f911a5351a 100644 --- a/waku/incentivization/reputation_manager.nim +++ b/waku/incentivization/reputation_manager.nim @@ -1,9 +1,8 @@ import tables, std/options import waku/waku_lightpush/rpc +import libp2p/peerid type - PeerId = string - ResponseQuality* = enum BadResponse GoodResponse diff --git a/waku/waku_lightpush/client.nim b/waku/waku_lightpush/client.nim index 3e20bf9e35..66e9fa8564 100644 --- a/waku/waku_lightpush/client.nim +++ b/waku/waku_lightpush/client.nim @@ -11,20 +11,27 @@ import ./common, ./protocol_metrics, ./rpc, - ./rpc_codec + ./rpc_codec, + ../incentivization/reputation_manager logScope: topics = "waku lightpush client" type WakuLightPushClient* = ref object peerManager*: PeerManager + reputationManager*: ReputationManager rng*: ref rand.HmacDrbgContext publishObservers: seq[PublishObserver] proc new*( - T: type WakuLightPushClient, peerManager: PeerManager, rng: ref rand.HmacDrbgContext + T: type WakuLightPushClient, + peerManager: PeerManager, + reputationManager: ReputationManager, + rng: ref rand.HmacDrbgContext, ): T = - WakuLightPushClient(peerManager: peerManager, rng: rng) + WakuLightPushClient( + peerManager: peerManager, reputationManager: reputationManager, rng: rng + ) proc addPublishObserver*(wl: WakuLightPushClient, obs: PublishObserver) = wl.publishObservers.add(obs) @@ -65,6 +72,9 @@ proc sendPushRequest( else: return err("unknown failure") + when defined(reputation): + wl.reputationManager.updateReputationFromResponse(peer.peerId, response) + return ok() proc publish*( @@ -82,13 +92,20 @@ proc publish*( obs.onMessagePublished(pubSubTopic, message) notice "publishing message with lightpush", - pubsubTopic = pubsubTopic, + pubsubTopic = pubSubTopic, contentTopic = message.contentTopic, target_peer_id = peer.peerId, msg_hash = msg_hash_hex_str return ok(msg_hash_hex_str) +proc selectPeerFromPeerManager( + wl: WakuLightPushClient +): Future[Result[RemotePeerInfo, string]] {.async, gcsafe.} = + let peer = wl.peerManager.selectPeer(WakuLightPushCodec, none(PubsubTopic)).valueOr: + return err("could not retrieve a peer supporting WakuLightPushCodec") + return ok(peer) + proc publishToAny*( wl: WakuLightPushClient, pubSubTopic: PubsubTopic, message: WakuMessage ): Future[WakuLightPushResult[void]] {.async, gcsafe.} = @@ -97,8 +114,26 @@ proc publishToAny*( info "publishToAny", msg_hash = computeMessageHash(pubsubTopic, message).to0xHex - let peer = wl.peerManager.selectPeer(WakuLightPushCodec).valueOr: - return err("could not retrieve a peer supporting WakuLightPushCodec") + var peer: RemotePeerInfo + + when defined(reputation): + const maxReputationAttempts = 10 + var attempts = 0 + while attempts < maxReputationAttempts: + let peerResult = await wl.selectPeerFromPeerManager() + if peerResult.isErr: + return err(peerResult.error) + peer = peerResult.get() + if not (wl.reputationManager.getReputation(peer.peerId) == some(false)): + break + attempts += 1 + if attempts >= maxReputationAttempts: + warn "Maximum reputation-based retries exceeded; continuing with a bad-reputation peer." + else: + let peerResult = await wl.selectPeerFromPeerManager() + if peerResult.isErr: + return err(peerResult.error) + peer = peerResult.get() let pushRequest = PushRequest(pubSubTopic: pubSubTopic, message: message) ?await wl.sendPushRequest(pushRequest, peer)