Skip to content

Commit

Permalink
fix: code review
Browse files Browse the repository at this point in the history
  • Loading branch information
richard-ramos committed Jun 18, 2024
1 parent 7174378 commit ffabc2b
Show file tree
Hide file tree
Showing 11 changed files with 115 additions and 52 deletions.
16 changes: 16 additions & 0 deletions tests/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,3 +437,19 @@ func WaitForTimeout(t *testing.T, ctx context.Context, timeout time.Duration, wg

wg.Wait()
}

type MockOnlineChecker struct {
online bool
}

func NewMockOnlineChecker(online bool) *MockOnlineChecker {
return &MockOnlineChecker{online: online}
}

func (o *MockOnlineChecker) SetOnline(online bool) {
o.online = online
}

func (o *MockOnlineChecker) IsOnline() bool {
return o.online
}
19 changes: 9 additions & 10 deletions waku/v2/api/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/google/uuid"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/waku-org/go-waku/waku/v2/onlinechecker"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
"github.com/waku-org/go-waku/waku/v2/protocol/subscription"
Expand Down Expand Up @@ -39,13 +40,13 @@ type Sub struct {
cancel context.CancelFunc
log *zap.Logger
closing chan string
isNodeOnline bool //indicates if node has connectivity, this helps subscribe loop takes decision as to resubscribe or not.
onlineChecker onlinechecker.OnlineChecker
resubscribeInProgress bool
id string
}

// Subscribe
func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilter protocol.ContentFilter, config FilterConfig, log *zap.Logger, online bool) (*Sub, error) {
func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilter protocol.ContentFilter, config FilterConfig, log *zap.Logger) (*Sub, error) {
sub := new(Sub)
sub.id = uuid.NewString()
sub.wf = wf
Expand All @@ -56,14 +57,16 @@ func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilte
sub.Config = config
sub.log = log.Named("filter-api").With(zap.String("apisub-id", sub.id), zap.Stringer("content-filter", sub.ContentFilter))
sub.log.Debug("filter subscribe params", zap.Int("max-peers", config.MaxPeers))
sub.isNodeOnline = online
sub.closing = make(chan string, config.MaxPeers)
if online {

sub.onlineChecker = wf.OnlineChecker()
if wf.OnlineChecker().IsOnline() {
subs, err := sub.subscribe(contentFilter, sub.Config.MaxPeers)
if err == nil {
sub.multiplex(subs)
}
}

go sub.subscriptionLoop()
return sub, nil
}
Expand All @@ -72,17 +75,13 @@ func (apiSub *Sub) Unsubscribe() {
apiSub.cancel()
}

func (apiSub *Sub) SetNodeState(online bool) {
apiSub.isNodeOnline = online
}

func (apiSub *Sub) subscriptionLoop() {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if apiSub.isNodeOnline && len(apiSub.subs) < apiSub.Config.MaxPeers &&
if apiSub.onlineChecker.IsOnline() && len(apiSub.subs) < apiSub.Config.MaxPeers &&
!apiSub.resubscribeInProgress && len(apiSub.closing) < apiSub.Config.MaxPeers {
apiSub.closing <- ""
}
Expand All @@ -109,7 +108,7 @@ func (apiSub *Sub) checkAndResubscribe(subId string) {
delete(apiSub.subs, subId)
}
apiSub.log.Debug("subscription status", zap.Int("sub-count", len(apiSub.subs)), zap.Stringer("content-filter", apiSub.ContentFilter))
if apiSub.isNodeOnline && len(apiSub.subs) < apiSub.Config.MaxPeers {
if apiSub.onlineChecker.IsOnline() && len(apiSub.subs) < apiSub.Config.MaxPeers {
apiSub.resubscribe(failedPeer)
}
apiSub.resubscribeInProgress = false
Expand Down
2 changes: 1 addition & 1 deletion waku/v2/api/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (s *FilterApiTestSuite) TestSubscribe() {
s.Require().Equal(contentFilter.PubsubTopic, s.TestTopic)

s.Log.Info("About to perform API Subscribe()")
apiSub, err := Subscribe(context.Background(), s.LightNode, contentFilter, apiConfig, s.Log, true)
apiSub, err := Subscribe(context.Background(), s.LightNode, contentFilter, apiConfig, s.Log)
s.Require().NoError(err)
s.Require().Equal(apiSub.ContentFilter, contentFilter)
s.Log.Info("Subscribed")
Expand Down
8 changes: 2 additions & 6 deletions waku/v2/node/wakunode2.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
//Initialize peer manager.
w.peermanager = peermanager.NewPeerManager(w.opts.maxPeerConnections, w.opts.peerStoreCapacity, metadata, w.log)

w.peerConnector, err = peermanager.NewPeerConnectionStrategy(w.peermanager, discoveryConnectTimeout, w.log)
w.peerConnector, err = peermanager.NewPeerConnectionStrategy(w.peermanager, w.opts.onlineChecker, discoveryConnectTimeout, w.log)
if err != nil {
w.log.Error("creating peer connection strategy", zap.Error(err))
}
Expand Down Expand Up @@ -290,7 +290,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
w.opts.filterOpts = append(w.opts.filterOpts, filter.WithPeerManager(w.peermanager))

w.filterFullNode = filter.NewWakuFilterFullNode(w.timesource, w.opts.prometheusReg, w.log, w.opts.filterOpts...)
w.filterLightNode = filter.NewWakuFilterLightNode(w.bcaster, w.peermanager, w.timesource, w.opts.prometheusReg, w.log)
w.filterLightNode = filter.NewWakuFilterLightNode(w.bcaster, w.peermanager, w.timesource, w.opts.onlineChecker, w.opts.prometheusReg, w.log)
w.lightPush = lightpush.NewWakuLightPush(w.Relay(), w.peermanager, w.opts.prometheusReg, w.log, w.opts.lightpushOpts...)

w.store = store.NewWakuStore(w.peermanager, w.timesource, w.log)
Expand Down Expand Up @@ -957,7 +957,3 @@ func GetDiscv5Option(dnsDiscoveredNodes []dnsdisc.DiscoveredNode, discv5Nodes []
func (w *WakuNode) ClusterID() uint16 {
return w.opts.clusterID
}

func (w *WakuNode) PeerConnector() *peermanager.PeerConnectionStrategy {
return w.peerConnector
}
14 changes: 14 additions & 0 deletions waku/v2/node/wakuoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
"github.com/prometheus/client_golang/prometheus"
"github.com/waku-org/go-waku/waku/v2/onlinechecker"
"github.com/waku-org/go-waku/waku/v2/peermanager"
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_store"
Expand Down Expand Up @@ -61,6 +62,8 @@ type WakuNodeParameters struct {
circuitRelayMinInterval time.Duration
circuitRelayBootDelay time.Duration

onlineChecker onlinechecker.OnlineChecker

enableNTP bool
ntpURLs []string

Expand Down Expand Up @@ -127,6 +130,7 @@ var DefaultWakuNodeOptions = []WakuNodeOption{
WithMaxPeerConnections(50),
WithMaxConnectionsPerIP(DefaultMaxConnectionsPerIP),
WithCircuitRelayParams(2*time.Second, 3*time.Minute),
WithOnlineChecker(onlinechecker.NewDefaultOnlineChecker(true)),
}

// MultiAddresses return the list of multiaddresses configured in the node
Expand Down Expand Up @@ -551,6 +555,16 @@ func WithTopicHealthStatusChannel(ch chan<- peermanager.TopicHealthStatus) WakuN
}
}

// WithOnlineChecker sets up an OnlineChecker which will be used to determine whether the node
// is online or not. The OnlineChecker must be implemented by consumers of go-waku since they
// have additional context to determine what it means for a node to be online/offline
func WithOnlineChecker(onlineChecker onlinechecker.OnlineChecker) WakuNodeOption {
return func(params *WakuNodeParameters) error {
params.onlineChecker = onlineChecker
return nil
}
}

// Default options used in the libp2p node
var DefaultLibP2POptions = []libp2p.Option{
libp2p.ChainOptions(
Expand Down
24 changes: 24 additions & 0 deletions waku/v2/onlinechecker/online.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package onlinechecker

// OnlineChecker is used to determine if node has connectivity.
type OnlineChecker interface {
IsOnline() bool
}

type DefaultOnlineChecker struct {
online bool
}

func NewDefaultOnlineChecker(online bool) OnlineChecker {
return &DefaultOnlineChecker{
online: online,
}
}

func (o *DefaultOnlineChecker) SetOnline(online bool) {
o.online = online
}

func (o *DefaultOnlineChecker) IsOnline() bool {
return o.online
}
55 changes: 28 additions & 27 deletions waku/v2/peermanager/peer_connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/libp2p/go-libp2p/p2p/discovery/backoff"
"github.com/waku-org/go-waku/logging"
"github.com/waku-org/go-waku/waku/v2/onlinechecker"
wps "github.com/waku-org/go-waku/waku/v2/peerstore"
waku_proto "github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/service"
Expand All @@ -28,10 +29,11 @@ import (
// PeerConnectionStrategy is a utility to connect to peers,
// but only if we have not recently tried connecting to them already
type PeerConnectionStrategy struct {
mux sync.Mutex
cache *lru.TwoQueueCache
host host.Host
pm *PeerManager
mux sync.Mutex
cache *lru.TwoQueueCache
host host.Host
pm *PeerManager
onlineChecker onlinechecker.OnlineChecker

paused atomic.Bool
dialTimeout time.Duration
Expand Down Expand Up @@ -60,8 +62,12 @@ func getBackOff() backoff.BackoffFactory {
//
// dialTimeout is how long we attempt to connect to a peer before giving up
// minPeers is the minimum number of peers that the node should have
func NewPeerConnectionStrategy(pm *PeerManager,
dialTimeout time.Duration, logger *zap.Logger) (*PeerConnectionStrategy, error) {
func NewPeerConnectionStrategy(
pm *PeerManager,
onlineChecker onlinechecker.OnlineChecker,
dialTimeout time.Duration,
logger *zap.Logger,
) (*PeerConnectionStrategy, error) {
// cacheSize is the size of a TwoQueueCache
cacheSize := 600
cache, err := lru.New2Q(cacheSize)
Expand All @@ -73,6 +79,7 @@ func NewPeerConnectionStrategy(pm *PeerManager,
cache: cache,
dialTimeout: dialTimeout,
CommonDiscoveryService: service.NewCommonDiscoveryService(),
onlineChecker: onlineChecker,
pm: pm,
backoff: getBackOff(),
logger: logger.Named("discovery-connector"),
Expand Down Expand Up @@ -242,32 +249,26 @@ func (c *PeerConnectionStrategy) dialPeers() {
select {
case <-c.Context().Done():
return
default:
if c.isPaused() {
time.Sleep(100 * time.Millisecond) // Sleep for a while to avoid busy waiting
case pd, ok := <-c.GetListeningChan():
if !ok {
return
}

if !c.onlineChecker.IsOnline() {
continue
}

select {
case <-c.Context().Done():
return
case pd, ok := <-c.GetListeningChan():
if !ok {
return
}
addrInfo := pd.AddrInfo
addrInfo := pd.AddrInfo

if addrInfo.ID == c.host.ID() || addrInfo.ID == "" ||
c.host.Network().Connectedness(addrInfo.ID) == network.Connected {
continue
}
if addrInfo.ID == c.host.ID() || addrInfo.ID == "" ||
c.host.Network().Connectedness(addrInfo.ID) == network.Connected {
continue
}

if c.canDialPeer(addrInfo) {
sem <- struct{}{}
c.WaitGroup().Add(1)
go c.dialPeer(addrInfo, sem)
}
default:
if c.canDialPeer(addrInfo) {
sem <- struct{}{}
c.WaitGroup().Add(1)
go c.dialPeer(addrInfo, sem)
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions waku/v2/peermanager/peer_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func TestAdditionAndRemovalOfPeer(t *testing.T) {
func TestConnectToRelayPeers(t *testing.T) {

ctx, pm, deferFn := initTest(t)
pc, err := NewPeerConnectionStrategy(pm, 120*time.Second, pm.logger)
pc, err := NewPeerConnectionStrategy(pm, tests.NewMockOnlineChecker(true), 120*time.Second, pm.logger)
require.NoError(t, err)
err = pc.Start(ctx)
require.NoError(t, err)
Expand Down Expand Up @@ -254,7 +254,7 @@ func createHostWithDiscv5AndPM(t *testing.T, hostName string, topic string, enrF
require.NoError(t, err)
pm := NewPeerManager(10, 20, nil, logger)
pm.SetHost(host)
peerconn, err := NewPeerConnectionStrategy(pm, 30*time.Second, logger)
peerconn, err := NewPeerConnectionStrategy(pm, tests.NewMockOnlineChecker(true), 30*time.Second, logger)
require.NoError(t, err)
discv5, err := discv5.NewDiscoveryV5(prvKey1, localNode, peerconn, prometheus.DefaultRegisterer, logger, discv5.WithUDPPort(uint(udpPort)), discv5.WithBootnodes(bootnode))
require.NoError(t, err)
Expand Down
19 changes: 16 additions & 3 deletions waku/v2/protocol/filter/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/libp2p/go-msgio/pbio"
"github.com/prometheus/client_golang/prometheus"
"github.com/waku-org/go-waku/logging"
"github.com/waku-org/go-waku/waku/v2/onlinechecker"
"github.com/waku-org/go-waku/waku/v2/peermanager"
"github.com/waku-org/go-waku/waku/v2/peerstore"
"github.com/waku-org/go-waku/waku/v2/protocol"
Expand Down Expand Up @@ -45,7 +46,8 @@ var (
type WakuFilterLightNode struct {
*service.CommonService
h host.Host
broadcaster relay.Broadcaster //TODO: Move the broadcast functionality outside of relay client to a higher SDK layer.s
broadcaster relay.Broadcaster //TODO: Move the broadcast functionality outside of relay client to a higher SDK layer.
onlineChecker onlinechecker.OnlineChecker
timesource timesource.Timesource
metrics Metrics
log *zap.Logger
Expand Down Expand Up @@ -79,12 +81,19 @@ func (arr *WakuFilterPushResult) Errors() []WakuFilterPushError {
// Note that broadcaster is optional.
// Takes an optional peermanager if WakuFilterLightnode is being created along with WakuNode.
// If using libp2p host, then pass peermanager as nil
func NewWakuFilterLightNode(broadcaster relay.Broadcaster, pm *peermanager.PeerManager,
timesource timesource.Timesource, reg prometheus.Registerer, log *zap.Logger) *WakuFilterLightNode {
func NewWakuFilterLightNode(
broadcaster relay.Broadcaster,
pm *peermanager.PeerManager,
timesource timesource.Timesource,
onlineChecker onlinechecker.OnlineChecker,
reg prometheus.Registerer,
log *zap.Logger,
) *WakuFilterLightNode {
wf := new(WakuFilterLightNode)
wf.log = log.Named("filterv2-lightnode")
wf.broadcaster = broadcaster
wf.timesource = timesource
wf.onlineChecker = onlineChecker
wf.pm = pm
wf.CommonService = service.NewCommonService()
wf.metrics = newMetrics(reg)
Expand Down Expand Up @@ -701,3 +710,7 @@ func (wf *WakuFilterLightNode) UnsubscribeAll(ctx context.Context, opts ...Filte

return wf.unsubscribeAll(ctx, opts...)
}

func (wf *WakuFilterLightNode) OnlineChecker() onlinechecker.OnlineChecker {
return wf.onlineChecker
}
2 changes: 1 addition & 1 deletion waku/v2/protocol/filter/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func (s *FilterTestSuite) GetWakuFilterLightNode() LightNodeData {
b := relay.NewBroadcaster(10)
s.Require().NoError(b.Start(context.Background()))
pm := peermanager.NewPeerManager(5, 5, nil, s.Log)
filterPush := NewWakuFilterLightNode(b, pm, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, s.Log)
filterPush := NewWakuFilterLightNode(b, pm, timesource.NewDefaultClock(), tests.NewMockOnlineChecker(true), prometheus.DefaultRegisterer, s.Log)
filterPush.SetHost(host)
pm.SetHost(host)
return LightNodeData{filterPush, host}
Expand Down
4 changes: 2 additions & 2 deletions waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ func TestRetrieveProvidePeerExchangeWithPMAndPeerAddr(t *testing.T) {
// Prepare peer manager for host3
pm3 := peermanager.NewPeerManager(10, 20, nil, log)
pm3.SetHost(host3)
pxPeerConn3, err := peermanager.NewPeerConnectionStrategy(pm3, 30*time.Second, utils.Logger())
pxPeerConn3, err := peermanager.NewPeerConnectionStrategy(pm3, tests.NewMockOnlineChecker(true), 30*time.Second, utils.Logger())
require.NoError(t, err)
pxPeerConn3.SetHost(host3)
err = pxPeerConn3.Start(context.Background())
Expand Down Expand Up @@ -368,7 +368,7 @@ func TestRetrieveProvidePeerExchangeWithPMOnly(t *testing.T) {
// Prepare peer manager for host3
pm3 := peermanager.NewPeerManager(10, 20, nil, log)
pm3.SetHost(host3)
pxPeerConn3, err := peermanager.NewPeerConnectionStrategy(pm3, 30*time.Second, utils.Logger())
pxPeerConn3, err := peermanager.NewPeerConnectionStrategy(pm3, tests.NewMockOnlineChecker(true), 30*time.Second, utils.Logger())
require.NoError(t, err)
pxPeerConn3.SetHost(host3)
err = pxPeerConn3.Start(context.Background())
Expand Down

0 comments on commit ffabc2b

Please sign in to comment.