feat: draft changes for optimizing filter ping
chaitanyaprem committed May 16, 2024
1 parent 6e47bd1 commit c636567
Showing 5 changed files with 99 additions and 104 deletions.
112 changes: 24 additions & 88 deletions waku/v2/api/filter.go
@@ -3,15 +3,13 @@ package api
import (
"context"
"encoding/json"
"sync"
"time"

"github.com/libp2p/go-libp2p/core/peer"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
"github.com/waku-org/go-waku/waku/v2/protocol/subscription"
"go.uber.org/zap"
"golang.org/x/exp/maps"
)

const FilterPingTimeout = 5 * time.Second
@@ -39,6 +37,7 @@ type Sub struct {
ctx context.Context
cancel context.CancelFunc
log *zap.Logger
closing chan string
}

// Subscribe
@@ -77,13 +76,17 @@ func (apiSub *Sub) healthCheckLoop() {
apiSub.log.Debug("healthCheckLoop: Done()")
apiSub.cleanup()
return
case <-ticker.C:
apiSub.log.Debug("healthCheckLoop: checkAliveness()")
topicCounts := apiSub.getTopicCounts()
apiSub.resubscribe(topicCounts)
case subId := <-apiSub.closing:
//trigger closing and resubscribe flow for subscription.
apiSub.closeAndResubscribe(subId)
}
}
}

func (apiSub *Sub) closeAndResubscribe(subId string) {
apiSub.subs[subId].Close()
apiSub.resubscribe()
delete(apiSub.subs, subId)
}

func (apiSub *Sub) cleanup() {
@@ -103,93 +106,19 @@ func (apiSub *Sub) cleanup() {

}

// Returns active sub counts for each pubsub topic
func (apiSub *Sub) getTopicCounts() map[string]int {
// Buffered chan for sub aliveness results
type CheckResult struct {
sub *subscription.SubscriptionDetails
alive bool
}
checkResults := make(chan CheckResult, len(apiSub.subs))
var wg sync.WaitGroup

// Run pings asynchronously
for _, s := range apiSub.subs {
wg.Add(1)
go func(sub *subscription.SubscriptionDetails) {
defer wg.Done()
ctx, cancelFunc := context.WithTimeout(apiSub.ctx, FilterPingTimeout)
defer cancelFunc()
err := apiSub.wf.IsSubscriptionAlive(ctx, sub)

apiSub.log.Debug("Check result:", zap.Any("subID", sub.ID), zap.Bool("result", err == nil))
checkResults <- CheckResult{sub, err == nil}
}(s)
}

// Collect healthy topic counts
topicCounts := make(map[string]int)

topicMap, _ := protocol.ContentFilterToPubSubTopicMap(apiSub.ContentFilter)
for _, t := range maps.Keys(topicMap) {
topicCounts[t] = 0
}
wg.Wait()
close(checkResults)
for s := range checkResults {
if !s.alive {
// Close inactive subs
s.sub.Close()
delete(apiSub.subs, s.sub.ID)
} else {
topicCounts[s.sub.ContentFilter.PubsubTopic]++
}
}

return topicCounts
}

// Attempts to resubscribe on topics that lack subscriptions
func (apiSub *Sub) resubscribe(topicCounts map[string]int) {

// Delete healthy topics
for t, cnt := range topicCounts {
if cnt == apiSub.Config.MaxPeers {
delete(topicCounts, t)
}
}

if len(topicCounts) == 0 {
// All topics healthy, return
return
}
var wg sync.WaitGroup

// Re-subscribe asynchronously
newSubs := make(chan []*subscription.SubscriptionDetails)

for t, cnt := range topicCounts {
cFilter := protocol.ContentFilter{PubsubTopic: t, ContentTopics: apiSub.ContentFilter.ContentTopics}
wg.Add(1)
go func(count int) {
defer wg.Done()
subs, err := apiSub.subscribe(cFilter, apiSub.Config.MaxPeers-count)
if err != nil {
return
} //Not handling scenario where all requested subs are not received as that will get handled in next cycle.
newSubs <- subs
}(cnt)
}
wg.Wait()
close(newSubs)
apiSub.log.Debug("resubscribe(): before range newSubs")
for subs := range newSubs {
if subs != nil {
apiSub.multiplex(subs)
}
}
apiSub.log.Debug("checkAliveness(): close(newSubs)")
//close(newSubs)
}

func (apiSub *Sub) resubscribe() {
// Re-subscribe asynchronously
count := len(apiSub.subs) - 1

subs, err := apiSub.subscribe(apiSub.ContentFilter, apiSub.Config.MaxPeers-count)
if err != nil {
return
} //Not handling scenario where all requested subs are not received as that will get handled in next cycle.

apiSub.multiplex(subs)
}

func (apiSub *Sub) subscribe(contentFilter protocol.ContentFilter, peerCount int) ([]*subscription.SubscriptionDetails, error) {
@@ -229,5 +158,12 @@ func (apiSub *Sub) multiplex(subs []*subscription.SubscriptionDetails) {
apiSub.DataCh <- env
}
}(subDetails)
go func(subDetails *subscription.SubscriptionDetails) {
_, ok := <-subDetails.Closing
if !ok {
return
}
apiSub.closing <- subDetails.ID
}(subDetails)
}
}
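Taken together, the api/filter.go changes replace the old per-tick aliveness sweep with a push-based loop: the protocol layer reports a failing subscription ID on `closing`, and `healthCheckLoop` closes and re-establishes just that subscription. Below is a minimal, self-contained sketch of that pattern; the names (`sub`, `closing`, `closeAndResubscribe`) are simplified stand-ins, not the exact types from this diff.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// sub is a simplified stand-in for the api-layer Sub type: it tracks active
// subscriptions and listens for IDs of subscriptions reported as failing.
type sub struct {
	subs    map[string]bool
	closing chan string // IDs of failing subscriptions, sent by the protocol layer
}

// healthCheckLoop mirrors the reworked loop: instead of pinging every
// subscription on a ticker, it blocks until a failure is reported and then
// replaces only the affected subscription.
func (s *sub) healthCheckLoop(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			fmt.Println("healthCheckLoop: done")
			return
		case id := <-s.closing:
			s.closeAndResubscribe(id)
		}
	}
}

func (s *sub) closeAndResubscribe(id string) {
	delete(s.subs, id)
	newID := id + "-replacement" // a real implementation would request a fresh subscription here
	s.subs[newID] = true
	fmt.Println("resubscribed:", id, "->", newID)
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()

	s := &sub{subs: map[string]bool{"sub-1": true}, closing: make(chan string, 1)}
	s.closing <- "sub-1" // simulate a failed ping being reported
	s.healthCheckLoop(ctx)
}
```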
23 changes: 12 additions & 11 deletions waku/v2/protocol/filter/client.go
@@ -9,6 +9,7 @@ import (
"net/http"
"strings"
"sync"
"time"

"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
@@ -43,13 +44,14 @@ var (

type WakuFilterLightNode struct {
*service.CommonService
h host.Host
broadcaster relay.Broadcaster //TODO: Move the broadcast functionality outside of relay client to a higher SDK layer.s
timesource timesource.Timesource
metrics Metrics
log *zap.Logger
subscriptions *subscription.SubscriptionsMap
pm *peermanager.PeerManager
h host.Host
broadcaster relay.Broadcaster //TODO: Move the broadcast functionality outside of relay client to a higher SDK layer.s
timesource timesource.Timesource
metrics Metrics
log *zap.Logger
subscriptions *subscription.SubscriptionsMap
pm *peermanager.PeerManager
peerPingInterval time.Duration
}

type WakuFilterPushError struct {
@@ -86,7 +88,6 @@ func NewWakuFilterLightNode(broadcaster relay.Broadcaster, pm *peermanager.PeerM
wf.pm = pm
wf.CommonService = service.NewCommonService()
wf.metrics = newMetrics(reg)

return wf
}

@@ -97,13 +98,13 @@ func (wf *WakuFilterLightNode) SetHost(h host.Host) {

func (wf *WakuFilterLightNode) Start(ctx context.Context) error {
return wf.CommonService.Start(ctx, wf.start)

}

func (wf *WakuFilterLightNode) start() error {
wf.subscriptions = subscription.NewSubscriptionMap(wf.log)
wf.h.SetStreamHandlerMatch(FilterPushID_v20beta1, protocol.PrefixTextMatch(string(FilterPushID_v20beta1)), wf.onRequest(wf.Context()))

//Start Filter liveness check
go wf.FilterHealthCheckLoop()
wf.log.Info("filter-push protocol started")
return nil
}
@@ -463,7 +464,7 @@ func (wf *WakuFilterLightNode) IsSubscriptionAlive(ctx context.Context, subscrip
if err := wf.ErrOnNotRunning(); err != nil {
return err
}

//TODO: Don't ping, rather check status of last ping and return status?? or something else.
return wf.Ping(ctx, subscription.PeerID)
}

32 changes: 32 additions & 0 deletions waku/v2/protocol/filter/filter_health_check.go
@@ -0,0 +1,32 @@
package filter

import (
"context"
"time"
)

func (wf *WakuFilterLightNode) PingPeers() {
//Send a ping to all the peers and report their status to corresponding subscriptions
// Alive or not or set state of subscription??
for _, peer := range wf.subscriptions.GetSubscribedPeers() {
err := wf.Ping(context.TODO(), peer)
if err != nil {
subscriptions := wf.subscriptions.GetAllSubscriptionsForPeer(peer)
for _, subscription := range subscriptions {
//Indicating that subscription is closing
//This feels like a hack, but taking this approach for now so as to avoid refactoring.
subscription.Closing <- true
}
}
}
}

func (wf *WakuFilterLightNode) FilterHealthCheckLoop() {
wf.CommonService.WaitGroup().Add(1)
defer wf.WaitGroup().Done()
for {
//TODO: Do we have to wait for wf.ctx context completion and exit as well?
time.Sleep(wf.peerPingInterval)
wf.PingPeers()
}
}
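The TODO in `FilterHealthCheckLoop` asks whether the loop should also observe context completion. A common way to do that is to select over a ticker and the context; the variant below is only a hypothetical sketch (it assumes the `peerPingInterval` field and `PingPeers` method added in this commit, plus a `"context"` import), not what the commit implements.

```go
// Hypothetical variant: exits when ctx is cancelled instead of sleeping forever.
func (wf *WakuFilterLightNode) filterHealthCheckLoopWithCtx(ctx context.Context) {
	wf.CommonService.WaitGroup().Add(1)
	defer wf.WaitGroup().Done()

	ticker := time.NewTicker(wf.peerPingInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			// Node is shutting down; stop pinging peers.
			return
		case <-ticker.C:
			wf.PingPeers()
		}
	}
}
```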
11 changes: 6 additions & 5 deletions waku/v2/protocol/subscription/subscription_details.go
@@ -25,10 +25,11 @@ type PeerContentFilter struct {
type SubscriptionDetails struct {
sync.RWMutex

ID string `json:"subscriptionID"`
mapRef *SubscriptionsMap
Closed bool `json:"-"`
once sync.Once
ID string `json:"subscriptionID"`
mapRef *SubscriptionsMap
Closed bool `json:"-"`
once sync.Once
Closing chan bool

PeerID peer.ID `json:"peerID"`
ContentFilter protocol.ContentFilter `json:"contentFilters"`
@@ -96,7 +97,7 @@ func (s *SubscriptionDetails) CloseC() {
s.once.Do(func() {
s.Lock()
defer s.Unlock()

close(s.Closing)
s.Closed = true
close(s.C)
})
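The new `Closing` channel carries two distinct signals: the health check sends a value when a peer fails a ping (see `PingPeers` above), while `CloseC` closes the channel on a deliberate shutdown, so a receiver can tell the cases apart via the second return value. A standalone sketch of that convention, with placeholder names:

```go
package main

import "fmt"

// watch drains a Closing-style channel: a received value means "peer failed a
// ping, resubscribe"; a closed channel means "subscription was shut down".
func watch(closing <-chan bool, onFailure func()) {
	if _, ok := <-closing; !ok {
		fmt.Println("subscription closed, watcher exiting")
		return
	}
	onFailure()
}

func main() {
	// Case 1: the health check reports a failure by sending on the channel.
	failed := make(chan bool, 1)
	failed <- true
	watch(failed, func() { fmt.Println("trigger close-and-resubscribe") })

	// Case 2: a CloseC-style shutdown closes the channel.
	done := make(chan bool)
	close(done)
	watch(done, func() { fmt.Println("unexpected") })
}
```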
25 changes: 25 additions & 0 deletions waku/v2/protocol/subscription/subscriptions_map.go
@@ -75,6 +75,7 @@ func (sub *SubscriptionsMap) NewSubscription(peerID peer.ID, cf protocol.Content
PeerID: peerID,
C: make(chan *protocol.Envelope, 1024),
ContentFilter: protocol.ContentFilter{PubsubTopic: cf.PubsubTopic, ContentTopics: maps.Clone(cf.ContentTopics)},
Closing: make(chan bool),
}

// Increase the number of subscriptions for this (pubsubTopic, contentTopic) pair
@@ -218,6 +219,30 @@ func (m *SubscriptionsMap) GetSubscriptionsForPeer(peerID peer.ID, contentFilter
return output
}

func (m *SubscriptionsMap) GetAllSubscriptionsForPeer(peerID peer.ID) []*SubscriptionDetails {
m.RLock()
defer m.RUnlock()

var output []*SubscriptionDetails
for _, peerSubs := range m.items {
if peerSubs.PeerID == peerID {
for _, subs := range peerSubs.SubsPerPubsubTopic {
for _, subscriptionDetail := range subs {
output = append(output, subscriptionDetail)
}
}
break
}
}
return output
}

func (m *SubscriptionsMap) GetSubscribedPeers() peer.IDSlice {
m.RLock()
defer m.RUnlock()
return maps.Keys(m.items)
}

func (m *SubscriptionsMap) GetAllSubscriptions() []*SubscriptionDetails {
return m.GetSubscriptionsForPeer("", protocol.ContentFilter{})
}
