Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: filter rate limit #1258

Merged
merged 3 commits into from
Nov 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions cmd/waku/server/rest/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/utils"
"golang.org/x/time/rate"
)

func createNode(t *testing.T, opts ...node.WakuNodeOption) *node.WakuNode {
Expand All @@ -37,8 +38,8 @@ func createNode(t *testing.T, opts ...node.WakuNodeOption) *node.WakuNode {

// node2 connects to node1
func twoFilterConnectedNodes(t *testing.T, pubSubTopics ...string) (*node.WakuNode, *node.WakuNode) {
node1 := createNode(t, node.WithWakuFilterFullNode()) // full node filter
node2 := createNode(t, node.WithWakuFilterLightNode()) // light node filter
node1 := createNode(t, node.WithWakuFilterFullNode(filter.WithFullNodeRateLimiter(rate.Inf, 0))) // full node filter
node2 := createNode(t, node.WithWakuFilterLightNode()) // light node filter

node2.Host().Peerstore().AddAddr(node1.Host().ID(), tests.GetHostAddress(node1.Host()), peerstore.PermanentAddrTTL)
err := node2.Host().Peerstore().AddProtocols(node1.Host().ID(), filter.FilterSubscribeID_v20beta1)
Expand Down
2 changes: 1 addition & 1 deletion default.nix
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pkgs.buildGo121Module {
'' else "";

# FIXME: This needs to be manually changed when updating modules.
vendorHash = "sha256-TrKlv3UHhFl+1HviEYFTmOpF+UiVdL6h53IkJXBFsRo=";
vendorHash = "sha256-yQ3anfZ/PU0M0KHiXqA9Ri8zFkg1nTYIk43jmcdGZYU=";

# Fix for 'nix run' trying to execute 'go-waku'.
meta = { mainProgram = "waku"; };
Expand Down
1 change: 1 addition & 0 deletions examples/basic-light-client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ require (
github.com/ipfs/go-log/v2 v2.5.1 // indirect
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect
github.com/jellydator/ttlcache/v3 v3.3.0 // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/klauspost/cpuid/v2 v2.2.8 // indirect
github.com/koron/go-ssdp v0.0.4 // indirect
Expand Down
2 changes: 2 additions & 0 deletions examples/basic-light-client/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,8 @@ github.com/jbenet/go-temp-err-catcher v0.1.0 h1:zpb3ZH6wIE8Shj2sKS+khgRvf7T7RABo
github.com/jbenet/go-temp-err-catcher v0.1.0/go.mod h1:0kJRvmDZXNMIiJirNPEYfhpPwbGVtZVWC34vc5WLsDk=
github.com/jedisct1/go-minisign v0.0.0-20190909160543-45766022959e/go.mod h1:G1CVv03EnqU1wYL2dFwXxW2An0az9JTl/ZsqXQeBlkU=
github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU=
github.com/jellydator/ttlcache/v3 v3.3.0 h1:BdoC9cE81qXfrxeb9eoJi9dWrdhSuwXMAnHTbnBm4Wc=
github.com/jellydator/ttlcache/v3 v3.3.0/go.mod h1:bj2/e0l4jRnQdrnSTaGTsh4GSXvMjQcy41i7th0GVGw=
github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
Expand Down
1 change: 1 addition & 0 deletions examples/basic-relay/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ require (
github.com/ipfs/go-log/v2 v2.5.1 // indirect
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect
github.com/jellydator/ttlcache/v3 v3.3.0 // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/klauspost/cpuid/v2 v2.2.8 // indirect
github.com/koron/go-ssdp v0.0.4 // indirect
Expand Down
2 changes: 2 additions & 0 deletions examples/basic-relay/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,8 @@ github.com/jbenet/go-temp-err-catcher v0.1.0 h1:zpb3ZH6wIE8Shj2sKS+khgRvf7T7RABo
github.com/jbenet/go-temp-err-catcher v0.1.0/go.mod h1:0kJRvmDZXNMIiJirNPEYfhpPwbGVtZVWC34vc5WLsDk=
github.com/jedisct1/go-minisign v0.0.0-20190909160543-45766022959e/go.mod h1:G1CVv03EnqU1wYL2dFwXxW2An0az9JTl/ZsqXQeBlkU=
github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU=
github.com/jellydator/ttlcache/v3 v3.3.0 h1:BdoC9cE81qXfrxeb9eoJi9dWrdhSuwXMAnHTbnBm4Wc=
github.com/jellydator/ttlcache/v3 v3.3.0/go.mod h1:bj2/e0l4jRnQdrnSTaGTsh4GSXvMjQcy41i7th0GVGw=
github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
Expand Down
1 change: 1 addition & 0 deletions examples/chat2/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ require (
github.com/ipfs/go-cid v0.4.1 // indirect
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect
github.com/jellydator/ttlcache/v3 v3.3.0 // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/klauspost/cpuid/v2 v2.2.8 // indirect
github.com/koron/go-ssdp v0.0.4 // indirect
Expand Down
2 changes: 2 additions & 0 deletions examples/chat2/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,8 @@ github.com/jbenet/go-temp-err-catcher v0.1.0 h1:zpb3ZH6wIE8Shj2sKS+khgRvf7T7RABo
github.com/jbenet/go-temp-err-catcher v0.1.0/go.mod h1:0kJRvmDZXNMIiJirNPEYfhpPwbGVtZVWC34vc5WLsDk=
github.com/jedisct1/go-minisign v0.0.0-20190909160543-45766022959e/go.mod h1:G1CVv03EnqU1wYL2dFwXxW2An0az9JTl/ZsqXQeBlkU=
github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU=
github.com/jellydator/ttlcache/v3 v3.3.0 h1:BdoC9cE81qXfrxeb9eoJi9dWrdhSuwXMAnHTbnBm4Wc=
github.com/jellydator/ttlcache/v3 v3.3.0/go.mod h1:bj2/e0l4jRnQdrnSTaGTsh4GSXvMjQcy41i7th0GVGw=
github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
Expand Down
1 change: 1 addition & 0 deletions examples/filter2/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ require (
github.com/ipfs/go-cid v0.4.1 // indirect
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect
github.com/jellydator/ttlcache/v3 v3.3.0 // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/klauspost/cpuid/v2 v2.2.8 // indirect
github.com/koron/go-ssdp v0.0.4 // indirect
Expand Down
2 changes: 2 additions & 0 deletions examples/filter2/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,8 @@ github.com/jbenet/go-temp-err-catcher v0.1.0 h1:zpb3ZH6wIE8Shj2sKS+khgRvf7T7RABo
github.com/jbenet/go-temp-err-catcher v0.1.0/go.mod h1:0kJRvmDZXNMIiJirNPEYfhpPwbGVtZVWC34vc5WLsDk=
github.com/jedisct1/go-minisign v0.0.0-20190909160543-45766022959e/go.mod h1:G1CVv03EnqU1wYL2dFwXxW2An0az9JTl/ZsqXQeBlkU=
github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU=
github.com/jellydator/ttlcache/v3 v3.3.0 h1:BdoC9cE81qXfrxeb9eoJi9dWrdhSuwXMAnHTbnBm4Wc=
github.com/jellydator/ttlcache/v3 v3.3.0/go.mod h1:bj2/e0l4jRnQdrnSTaGTsh4GSXvMjQcy41i7th0GVGw=
github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
Expand Down
1 change: 1 addition & 0 deletions examples/noise/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ require (
github.com/ipfs/go-cid v0.4.1 // indirect
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect
github.com/jellydator/ttlcache/v3 v3.3.0 // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/klauspost/cpuid/v2 v2.2.8 // indirect
github.com/koron/go-ssdp v0.0.4 // indirect
Expand Down
2 changes: 2 additions & 0 deletions examples/noise/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,8 @@ github.com/jbenet/go-temp-err-catcher v0.1.0 h1:zpb3ZH6wIE8Shj2sKS+khgRvf7T7RABo
github.com/jbenet/go-temp-err-catcher v0.1.0/go.mod h1:0kJRvmDZXNMIiJirNPEYfhpPwbGVtZVWC34vc5WLsDk=
github.com/jedisct1/go-minisign v0.0.0-20190909160543-45766022959e/go.mod h1:G1CVv03EnqU1wYL2dFwXxW2An0az9JTl/ZsqXQeBlkU=
github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU=
github.com/jellydator/ttlcache/v3 v3.3.0 h1:BdoC9cE81qXfrxeb9eoJi9dWrdhSuwXMAnHTbnBm4Wc=
github.com/jellydator/ttlcache/v3 v3.3.0/go.mod h1:bj2/e0l4jRnQdrnSTaGTsh4GSXvMjQcy41i7th0GVGw=
github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
Expand Down
1 change: 1 addition & 0 deletions examples/rln/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ require (
github.com/ipfs/go-log/v2 v2.5.1 // indirect
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect
github.com/jellydator/ttlcache/v3 v3.3.0 // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/klauspost/cpuid/v2 v2.2.8 // indirect
github.com/koron/go-ssdp v0.0.4 // indirect
Expand Down
2 changes: 2 additions & 0 deletions examples/rln/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,8 @@ github.com/jbenet/go-temp-err-catcher v0.1.0 h1:zpb3ZH6wIE8Shj2sKS+khgRvf7T7RABo
github.com/jbenet/go-temp-err-catcher v0.1.0/go.mod h1:0kJRvmDZXNMIiJirNPEYfhpPwbGVtZVWC34vc5WLsDk=
github.com/jedisct1/go-minisign v0.0.0-20190909160543-45766022959e/go.mod h1:G1CVv03EnqU1wYL2dFwXxW2An0az9JTl/ZsqXQeBlkU=
github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU=
github.com/jellydator/ttlcache/v3 v3.3.0 h1:BdoC9cE81qXfrxeb9eoJi9dWrdhSuwXMAnHTbnBm4Wc=
github.com/jellydator/ttlcache/v3 v3.3.0/go.mod h1:bj2/e0l4jRnQdrnSTaGTsh4GSXvMjQcy41i7th0GVGw=
github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ require (
github.com/dustin/go-humanize v1.0.1
github.com/go-chi/chi/v5 v5.0.0
github.com/jackc/pgx/v5 v5.4.1
github.com/jellydator/ttlcache/v3 v3.3.0
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0
github.com/waku-org/go-noise v0.0.4
github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -949,6 +949,8 @@ github.com/jbenet/goprocess v0.1.4 h1:DRGOFReOMqqDNXwW70QkacFW0YN9QnwLV0Vqk+3oU0
github.com/jbenet/goprocess v0.1.4/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4=
github.com/jedisct1/go-minisign v0.0.0-20190909160543-45766022959e/go.mod h1:G1CVv03EnqU1wYL2dFwXxW2An0az9JTl/ZsqXQeBlkU=
github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU=
github.com/jellydator/ttlcache/v3 v3.3.0 h1:BdoC9cE81qXfrxeb9eoJi9dWrdhSuwXMAnHTbnBm4Wc=
github.com/jellydator/ttlcache/v3 v3.3.0/go.mod h1:bj2/e0l4jRnQdrnSTaGTsh4GSXvMjQcy41i7th0GVGw=
github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc=
github.com/jinzhu/now v1.1.1/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
Expand Down
20 changes: 19 additions & 1 deletion waku/v2/protocol/filter/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type WakuFilterLightNode struct {
log *zap.Logger
subscriptions *subscription.SubscriptionsMap
pm *peermanager.PeerManager
limiter *utils.RateLimiter
peerPingInterval time.Duration
}

Expand Down Expand Up @@ -89,6 +90,7 @@ func NewWakuFilterLightNode(
onlineChecker onlinechecker.OnlineChecker,
reg prometheus.Registerer,
log *zap.Logger,
opts ...LightNodeOption,
) *WakuFilterLightNode {
wf := new(WakuFilterLightNode)
wf.log = log.Named("filterv2-lightnode")
Expand All @@ -99,6 +101,14 @@ func NewWakuFilterLightNode(
wf.CommonService = service.NewCommonService()
wf.metrics = newMetrics(reg)
wf.peerPingInterval = 1 * time.Minute

params := &LightNodeParameters{}
opts = append(DefaultLightNodeOptions(), opts...)
for _, opt := range opts {
opt(params)
}
wf.limiter = utils.NewRateLimiter(params.limitR, params.limitB)

return wf
}

Expand Down Expand Up @@ -155,6 +165,14 @@ func (wf *WakuFilterLightNode) onRequest(ctx context.Context) func(network.Strea

logger := wf.log.With(logging.HostID("peerID", peerID))

if !wf.limiter.Allow(peerID) {
wf.metrics.RecordError(rateLimitFailure)
if err := stream.Reset(); err != nil {
wf.log.Error("resetting connection", zap.Error(err))
}
return
}

if !wf.subscriptions.IsSubscribedTo(peerID) {
logger.Warn("received message push from unknown peer", logging.HostID("peerID", peerID))
wf.metrics.RecordError(unknownPeerMessagePush)
Expand Down Expand Up @@ -287,7 +305,7 @@ func (wf *WakuFilterLightNode) request(ctx context.Context, requestID []byte,

}

if filterSubscribeResponse.RequestId != request.RequestId {
if filterSubscribeResponse.RequestId != "N/A" && filterSubscribeResponse.RequestId != request.RequestId {
wf.log.Error("requestID mismatch", zap.String("expected", request.RequestId), zap.String("received", filterSubscribeResponse.RequestId))
wf.metrics.RecordError(requestIDMismatch)
err := NewFilterError(300, "request_id_mismatch")
Expand Down
1 change: 1 addition & 0 deletions waku/v2/protocol/filter/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ var (
peerNotFoundFailure metricsErrCategory = "peer_not_found_failure"
writeResponseFailure metricsErrCategory = "write_response_failure"
pushTimeoutFailure metricsErrCategory = "push_timeout_failure"
rateLimitFailure metricsErrCategory = "ratelimit_failure"
)

// RecordError increases the counter for different error types
Expand Down
31 changes: 31 additions & 0 deletions waku/v2/protocol/filter/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/waku-org/go-waku/waku/v2/peermanager"
"github.com/waku-org/go-waku/waku/v2/protocol"
"go.uber.org/zap"
"golang.org/x/time/rate"
)

func (old *FilterSubscribeParameters) Copy() *FilterSubscribeParameters {
Expand Down Expand Up @@ -57,13 +58,35 @@ type (
Timeout time.Duration
MaxSubscribers int
pm *peermanager.PeerManager
limitR rate.Limit
limitB int
}

Option func(*FilterParameters)

LightNodeParameters struct {
limitR rate.Limit
limitB int
}

LightNodeOption func(*LightNodeParameters)

FilterSubscribeOption func(*FilterSubscribeParameters) error
)

func WithLightNodeRateLimiter(r rate.Limit, b int) LightNodeOption {
return func(params *LightNodeParameters) {
params.limitR = r
params.limitB = b
}
}

func DefaultLightNodeOptions() []LightNodeOption {
return []LightNodeOption{
WithLightNodeRateLimiter(1, 1),
}
}

func WithTimeout(timeout time.Duration) Option {
return func(params *FilterParameters) {
params.Timeout = timeout
Expand Down Expand Up @@ -202,9 +225,17 @@ func WithPeerManager(pm *peermanager.PeerManager) Option {
}
}

func WithFullNodeRateLimiter(r rate.Limit, b int) Option {
return func(params *FilterParameters) {
params.limitR = r
params.limitB = b
}
}

func DefaultOptions() []Option {
return []Option{
WithTimeout(DefaultIdleSubscriptionTimeout),
WithMaxSubscribers(DefaultMaxSubscribers),
WithFullNodeRateLimiter(1, 1),
}
}
16 changes: 12 additions & 4 deletions waku/v2/protocol/filter/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ type (
metrics Metrics
log *zap.Logger
*service.CommonService
subscriptions *SubscribersMap
pm *peermanager.PeerManager

subscriptions *SubscribersMap
pm *peermanager.PeerManager
limiter *utils.RateLimiter
maxSubscriptions int
}
)
Expand All @@ -56,6 +56,7 @@ func NewWakuFilterFullNode(timesource timesource.Timesource, reg prometheus.Regi
opt(params)
}

wf.limiter = utils.NewRateLimiter(params.limitR, params.limitB)
wf.CommonService = service.NewCommonService()
wf.metrics = newMetrics(reg)
wf.subscriptions = NewSubscribersMap(params.Timeout)
Expand Down Expand Up @@ -93,7 +94,14 @@ func (wf *WakuFilterFullNode) start(sub *relay.Subscription) error {

func (wf *WakuFilterFullNode) onRequest(ctx context.Context) func(network.Stream) {
return func(stream network.Stream) {
logger := wf.log.With(logging.HostID("peer", stream.Conn().RemotePeer()))
peerID := stream.Conn().RemotePeer()
logger := wf.log.With(logging.HostID("peer", peerID))

if !wf.limiter.Allow(peerID) {
wf.metrics.RecordError(rateLimitFailure)
wf.reply(ctx, stream, &pb.FilterSubscribeRequest{RequestId: "N/A"}, http.StatusTooManyRequests, "filter request rejected due rate limit exceeded")
return
}

reader := pbio.NewDelimitedReader(stream, math.MaxInt32)

Expand Down
5 changes: 3 additions & 2 deletions waku/v2/protocol/filter/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/waku-org/go-waku/waku/v2/timesource"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
"golang.org/x/time/rate"
)

type LightNodeData struct {
Expand Down Expand Up @@ -133,7 +134,7 @@ func (s *FilterTestSuite) GetWakuFilterFullNode(topic string, withRegisterAll bo

nodeData := s.GetWakuRelay(topic)

node2Filter := NewWakuFilterFullNode(timesource.NewDefaultClock(), prometheus.DefaultRegisterer, s.Log)
node2Filter := NewWakuFilterFullNode(timesource.NewDefaultClock(), prometheus.DefaultRegisterer, s.Log, WithFullNodeRateLimiter(rate.Inf, 0))
node2Filter.SetHost(nodeData.FullNodeHost)

var sub *relay.Subscription
Expand Down Expand Up @@ -166,7 +167,7 @@ func (s *FilterTestSuite) GetWakuFilterLightNode() LightNodeData {
b := relay.NewBroadcaster(10)
s.Require().NoError(b.Start(context.Background()))
pm := peermanager.NewPeerManager(5, 5, nil, nil, true, s.Log)
filterPush := NewWakuFilterLightNode(b, pm, timesource.NewDefaultClock(), onlinechecker.NewDefaultOnlineChecker(true), prometheus.DefaultRegisterer, s.Log)
filterPush := NewWakuFilterLightNode(b, pm, timesource.NewDefaultClock(), onlinechecker.NewDefaultOnlineChecker(true), prometheus.DefaultRegisterer, s.Log, WithLightNodeRateLimiter(rate.Inf, 0))
filterPush.SetHost(host)
pm.SetHost(host)
return LightNodeData{filterPush, host}
Expand Down
8 changes: 4 additions & 4 deletions waku/v2/protocol/lightpush/waku_lightpush.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
"golang.org/x/time/rate"
)

// LightPushID_v20beta1 is the current Waku LightPush protocol identifier
Expand All @@ -40,7 +39,7 @@ var (
type WakuLightPush struct {
h host.Host
relay *relay.WakuRelay
limiter *rate.Limiter
limiter *utils.RateLimiter
cancel context.CancelFunc
pm *peermanager.PeerManager
metrics Metrics
Expand All @@ -59,11 +58,12 @@ func NewWakuLightPush(relay *relay.WakuRelay, pm *peermanager.PeerManager, reg p
wakuLP.metrics = newMetrics(reg)

params := &LightpushParameters{}
opts = append(DefaultLightpushOptions(), opts...)
for _, opt := range opts {
opt(params)
}

wakuLP.limiter = params.limiter
wakuLP.limiter = utils.NewRateLimiter(params.limitR, params.limitB)

return wakuLP
}
Expand Down Expand Up @@ -106,7 +106,7 @@ func (wakuLP *WakuLightPush) onRequest(ctx context.Context) func(network.Stream)
Response: &pb.PushResponse{},
}

if wakuLP.limiter != nil && !wakuLP.limiter.Allow() {
if !wakuLP.limiter.Allow(stream.Conn().RemotePeer()) {
wakuLP.metrics.RecordError(rateLimitFailure)
responseMsg := "exceeds the rate limit"
responsePushRPC.Response.Info = &responseMsg
Expand Down
Loading
Loading