Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add option to specify preferred peers for filter #1250

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion waku/v2/api/filter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type Sub struct {
type subscribeParameters struct {
batchInterval time.Duration
multiplexChannelBuffer int
preferredPeers peer.IDSlice
}

type SubscribeOptions func(*subscribeParameters)
Expand All @@ -75,6 +76,12 @@ func defaultOptions() []SubscribeOptions {
}
}

func WithPreferredServiceNodes(peers peer.IDSlice) SubscribeOptions {
return func(params *subscribeParameters) {
params.preferredPeers = peers
}
}

// Subscribe
func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilter protocol.ContentFilter, config FilterConfig, log *zap.Logger, params *subscribeParameters) (*Sub, error) {
sub := new(Sub)
Expand Down Expand Up @@ -197,7 +204,16 @@ func (apiSub *Sub) subscribe(contentFilter protocol.ContentFilter, peerCount int
options := make([]filter.FilterSubscribeOption, 0)
options = append(options, filter.WithMaxPeersPerContentFilter(int(peerCount)))
for _, p := range apiSub.Config.Peers {
options = append(options, filter.WithPeer(p))
isExcludedPeer := false
for _, px := range peersToExclude { // configured peer can be excluded if sub fails with it.
if p == px {
isExcludedPeer = true
break
}
}
if !isExcludedPeer {
options = append(options, filter.WithPeer(p))
}
}
if len(peersToExclude) > 0 {
apiSub.log.Debug("subscribing with peers to exclude", zap.Stringers("excluded-peers", peersToExclude))
Expand Down
11 changes: 10 additions & 1 deletion waku/v2/api/filter/filter_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package filter

import (
"context"
"math/rand"
"sync"
"time"

"github.com/google/uuid"
"github.com/libp2p/go-libp2p/core/peer"

"go.uber.org/zap"
"golang.org/x/exp/maps"
Expand Down Expand Up @@ -61,7 +63,8 @@ type EnevelopeProcessor interface {
OnNewEnvelope(env *protocol.Envelope) error
}

func NewFilterManager(ctx context.Context, logger *zap.Logger, minPeersPerFilter int, envProcessor EnevelopeProcessor, node *filter.WakuFilterLightNode, opts ...SubscribeOptions) *FilterManager {
func NewFilterManager(ctx context.Context, logger *zap.Logger, minPeersPerFilter int,
envProcessor EnevelopeProcessor, node *filter.WakuFilterLightNode, opts ...SubscribeOptions) *FilterManager {
// This fn is being mocked in test
mgr := new(FilterManager)
mgr.ctx = ctx
Expand Down Expand Up @@ -162,6 +165,12 @@ func (mgr *FilterManager) subscribeAndRunLoop(f filterConfig) {
defer utils.LogOnPanic()
ctx, cancel := context.WithCancel(mgr.ctx)
config := FilterConfig{MaxPeers: mgr.minPeersPerFilter}
if len(mgr.params.preferredPeers) > 0 {
//use one peer which is from preferred peers.
randomIndex := rand.Intn(len(mgr.params.preferredPeers) - 1)
randomPreferredPeer := mgr.params.preferredPeers[randomIndex]
config.Peers = []peer.ID{randomPreferredPeer}
}
sub, err := Subscribe(ctx, mgr.node, f.contentFilter, config, mgr.logger, mgr.params)
mgr.Lock()
mgr.filterSubscriptions[f.ID] = SubDetails{cancel, sub}
Expand Down
2 changes: 1 addition & 1 deletion waku/v2/api/filter/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (s *FilterApiTestSuite) TestSubscribe() {
s.Require().Equal(contentFilter.PubsubTopic, s.TestTopic)
ctx, cancel := context.WithCancel(context.Background())
s.Log.Info("About to perform API Subscribe()")
params := subscribeParameters{300 * time.Second, 1024}
params := subscribeParameters{300 * time.Second, 1024, nil}
apiSub, err := Subscribe(ctx, s.LightNode, contentFilter, apiConfig, s.Log, &params)
s.Require().NoError(err)
s.Require().Equal(apiSub.ContentFilter, contentFilter)
Expand Down
18 changes: 16 additions & 2 deletions waku/v2/protocol/filter/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,11 +333,20 @@ func (wf *WakuFilterLightNode) handleFilterSubscribeOptions(ctx context.Context,
params.selectedPeers = append(params.selectedPeers, pData.AddrInfo.ID)
}
reqPeerCount := params.maxPeers - len(params.selectedPeers)
for _, p := range params.selectedPeers {
if params.peersToExclude == nil {
params.peersToExclude = make(peermanager.PeerSet)
}
//exclude peers that are preferredpeers so that they don't get selected again.
if _, ok := params.peersToExclude[p]; !ok {
params.peersToExclude[p] = struct{}{}
}
}

if params.pm != nil && reqPeerCount > 0 {

wf.log.Debug("handleFilterSubscribeOptions", zap.Int("peerCount", reqPeerCount), zap.Int("excludePeersLen", len(params.peersToExclude)))
params.selectedPeers, err = wf.pm.SelectPeers(
selectedPeers, err := wf.pm.SelectPeers(
peermanager.PeerSelectionCriteria{
SelectionType: params.peerSelectionType,
Proto: FilterSubscribeID_v20beta1,
Expand All @@ -350,7 +359,12 @@ func (wf *WakuFilterLightNode) handleFilterSubscribeOptions(ctx context.Context,
)
if err != nil {
wf.log.Error("peer selection returned err", zap.Error(err))
return nil, nil, err
if len(params.selectedPeers) == 0 {
return nil, nil, err
}
}
if len(selectedPeers) > 0 {
params.selectedPeers = append(params.selectedPeers, selectedPeers...)
}
}
wf.log.Debug("handleFilterSubscribeOptions exit", zap.Int("selectedPeerCount", len(params.selectedPeers)))
Expand Down
Loading