From 827dc3822d574481807a1848dbacb4c5c577969d Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Sat, 25 May 2024 06:12:12 +0530 Subject: [PATCH] fix: allow peer-selection without pubsubTopics --- waku/v2/peermanager/peer_connector.go | 2 +- waku/v2/peermanager/peer_selection.go | 35 +++++++++++++-------------- 2 files changed, 18 insertions(+), 19 deletions(-) diff --git a/waku/v2/peermanager/peer_connector.go b/waku/v2/peermanager/peer_connector.go index 6f40a1711..5de826466 100644 --- a/waku/v2/peermanager/peer_connector.go +++ b/waku/v2/peermanager/peer_connector.go @@ -130,7 +130,7 @@ func (c *PeerConnectionStrategy) consumeSubscription(s subscription) { if len(c.host.Network().Peers()) < waku_proto.GossipSubDMin { triggerImmediateConnection = true } - c.logger.Debug("adding discovered peer", logging.HostID("peerID", p.AddrInfo.ID)) + c.logger.Debug("adding discovered peer", logging.HostID("peerID", p.AddrInfo.ID), zap.Int64("origin", int64(p.Origin))) c.pm.AddDiscoveredPeer(p, triggerImmediateConnection) case <-time.After(1 * time.Second): diff --git a/waku/v2/peermanager/peer_selection.go b/waku/v2/peermanager/peer_selection.go index 6726ebee8..ee592f059 100644 --- a/waku/v2/peermanager/peer_selection.go +++ b/waku/v2/peermanager/peer_selection.go @@ -67,7 +67,6 @@ func (pm *PeerManager) SelectRandom(criteria PeerSelectionCriteria) (peer.IDSlic peerIDs = make(PeerSet) } // if not found in serviceSlots or proto == WakuRelayIDv200 - pm.logger.Debug("looking for peers in peerStore", zap.String("proto", string(criteria.Proto))) filteredPeers, err := pm.FilterPeersByProto(criteria.SpecificPeers, criteria.ExcludePeers, criteria.Proto) if err != nil { return nil, err @@ -130,14 +129,16 @@ func (pm *PeerManager) selectServicePeer(criteria PeerSelectionCriteria) (PeerSe if len(criteria.PubsubTopics) == 0 || (len(criteria.PubsubTopics) == 1 && criteria.PubsubTopics[0] == "") { return slot.getRandom(criteria.MaxPeers, criteria.ExcludePeers) } else { //PubsubTopic based selection - keys := make([]peer.ID, 0, len(slot.m)) + selectedPeers := make([]peer.ID, 0, len(slot.m)) for i := range slot.m { if PeerInSet(criteria.ExcludePeers, i) { continue } - keys = append(keys, i) + selectedPeers = append(selectedPeers, i) + } + if len(criteria.PubsubTopics) > 0 { + selectedPeers = pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopics(criteria.PubsubTopics, selectedPeers...) } - selectedPeers := pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopics(criteria.PubsubTopics, keys...) tmpPeers, err := selectRandomPeers(selectedPeers, criteria.ExcludePeers, criteria.MaxPeers) for tmpPeer := range tmpPeers { peers[tmpPeer] = struct{}{} @@ -145,12 +146,16 @@ func (pm *PeerManager) selectServicePeer(criteria PeerSelectionCriteria) (PeerSe if err == nil && len(peers) == criteria.MaxPeers { return peers, nil } else { - pm.logger.Debug("discovering peers by pubsubTopic", zap.Strings("pubsubTopics", criteria.PubsubTopics)) - //Trigger on-demand discovery for this topic and connect to peer immediately. - //For now discover atleast 1 peer for the criteria - pm.discoverPeersByPubsubTopics(criteria.PubsubTopics, criteria.Proto, criteria.Ctx, 1) - //Try to fetch peers again. - continue + if len(criteria.PubsubTopics) > 0 { + + pm.logger.Debug("discovering peers by pubsubTopic", zap.Strings("pubsubTopics", criteria.PubsubTopics)) + //Trigger on-demand discovery for this topic and connect to peer immediately. + //For now discover atleast 1 peer for the criteria + pm.discoverPeersByPubsubTopics(criteria.PubsubTopics, criteria.Proto, criteria.Ctx, 1) + //Try to fetch peers again. + continue + } + break } } } @@ -186,12 +191,7 @@ func (pm *PeerManager) SelectPeers(criteria PeerSelectionCriteria) (peer.IDSlice if criteria.MaxPeers == 0 { criteria.MaxPeers = 1 } - excPeers := maps.Keys(criteria.ExcludePeers) - var excPeer peer.ID - if len(excPeers) > 0 { - excPeer = excPeers[0] - } - pm.logger.Debug("Select Peers", zap.Stringer("selectionCriteria", criteria), zap.Stringer("excludedPeers", excPeer)) + switch criteria.SelectionType { case Automatic: return pm.SelectRandom(criteria) @@ -220,7 +220,7 @@ func (pm *PeerManager) SelectPeerWithLowestRTT(criteria PeerSelectionCriteria) ( criteria.Ctx = context.Background() } - if len(criteria.PubsubTopics) == 0 || (len(criteria.PubsubTopics) == 1 && criteria.PubsubTopics[0] == "") { + if len(criteria.PubsubTopics) > 0 { peers = pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopics(criteria.PubsubTopics, criteria.SpecificPeers...) } @@ -253,6 +253,5 @@ func (pm *PeerManager) FilterPeersByProto(specificPeers peer.IDSlice, excludePee peers = append(peers, peer) } } - pm.logger.Debug("peers selected", zap.Int("peerCnt", len(peers))) return peers, nil }