From 1852773e3e1c4b75a3276440f95d7233aa43a877 Mon Sep 17 00:00:00 2001 From: JT Olio Date: Sun, 16 May 2021 10:36:53 -0600 Subject: [PATCH] satellite/contact: rate limit node checkins Change-Id: Ied386a2350aa073de46443e5259b56d49ec61dbf --- private/testplanet/satellite.go | 9 ++++- satellite/api.go | 2 +- satellite/contact/endpoint.go | 14 +++++-- satellite/contact/ratelimit.go | 44 +++++++++++++++++++++ satellite/contact/service.go | 22 +++++++---- satellite/core.go | 2 +- scripts/testdata/satellite-config.yaml.lock | 9 +++++ 7 files changed, 88 insertions(+), 14 deletions(-) create mode 100644 satellite/contact/ratelimit.go diff --git a/private/testplanet/satellite.go b/private/testplanet/satellite.go index af15e9e00..272c16b87 100644 --- a/private/testplanet/satellite.go +++ b/private/testplanet/satellite.go @@ -383,6 +383,10 @@ func (planet *Planet) newSatellite(ctx context.Context, prefix string, index int return nil, err } + // TODO: it is a huge surprise that this doesn't use the config + // parsing `default` or `devDefault` struct tag values. + // we should use storj.io/private/cfgstruct to autopopulate default + // config values and then only override ones in special cases. 
config := satellite.Config{ Server: server.Config{ Address: "127.0.0.1:0", @@ -406,7 +410,10 @@ func (planet *Planet) newSatellite(ctx context.Context, prefix string, index int Address: "127.0.0.1:0", }, Contact: contact.Config{ - Timeout: 1 * time.Minute, + Timeout: 1 * time.Minute, + RateLimitInterval: time.Nanosecond, + RateLimitBurst: 1000, + RateLimitCacheSize: 1000, }, Overlay: overlay.Config{ Node: overlay.NodeSelectionConfig{ diff --git a/satellite/api.go b/satellite/api.go index a9072ecfb..1872aaa1c 100644 --- a/satellite/api.go +++ b/satellite/api.go @@ -280,7 +280,7 @@ func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB, Type: pb.NodeType_SATELLITE, Version: *pbVersion, } - peer.Contact.Service = contact.NewService(peer.Log.Named("contact:service"), self, peer.Overlay.Service, peer.DB.PeerIdentities(), peer.Dialer, config.Contact.Timeout) + peer.Contact.Service = contact.NewService(peer.Log.Named("contact:service"), self, peer.Overlay.Service, peer.DB.PeerIdentities(), peer.Dialer, config.Contact) peer.Contact.Endpoint = contact.NewEndpoint(peer.Log.Named("contact:endpoint"), peer.Contact.Service) if err := pb.DRPCRegisterNode(peer.Server.DRPC(), peer.Contact.Endpoint); err != nil { return nil, errs.Combine(err, peer.Close()) diff --git a/satellite/contact/endpoint.go b/satellite/contact/endpoint.go index c2b2bcc11..50c6f8c01 100644 --- a/satellite/contact/endpoint.go +++ b/satellite/contact/endpoint.go @@ -19,9 +19,10 @@ import ( ) var ( - errPingBackDial = errs.Class("pingback dialing") - errCheckInIdentity = errs.Class("check-in identity") - errCheckInNetwork = errs.Class("check-in network") + errPingBackDial = errs.Class("pingback dialing") + errCheckInIdentity = errs.Class("check-in identity") + errCheckInRateLimit = errs.Class("check-in ratelimit") + errCheckInNetwork = errs.Class("check-in network") ) // Endpoint implements the contact service Endpoints. 
@@ -54,6 +55,13 @@ func (endpoint *Endpoint) CheckIn(ctx context.Context, req *pb.CheckInRequest) ( } nodeID := peerID.ID + // we need a string as a key for the limiter, but nodeID.String() has base58 encoding overhead + nodeIDBytesAsString := string(nodeID.Bytes()) + if !endpoint.service.idLimiter.IsAllowed(nodeIDBytesAsString) { + endpoint.log.Info("node rate limited by id", zap.String("node address", req.Address), zap.Stringer("Node ID", nodeID)) + return nil, rpcstatus.Error(rpcstatus.ResourceExhausted, errCheckInRateLimit.New("node rate limited by id").Error()) + } + err = endpoint.service.peerIDs.Set(ctx, nodeID, peerID) if err != nil { endpoint.log.Info("failed to add peer identity entry for ID", zap.String("node address", req.Address), zap.Stringer("Node ID", nodeID), zap.Error(err)) diff --git a/satellite/contact/ratelimit.go b/satellite/contact/ratelimit.go new file mode 100644 index 000000000..58c4cf95f --- /dev/null +++ b/satellite/contact/ratelimit.go @@ -0,0 +1,44 @@ +// Copyright (C) 2021 Storj Labs, Inc. +// See LICENSE for copying information. + +package contact + +import ( + "fmt" + "time" + + "golang.org/x/time/rate" + + "storj.io/storj/private/lrucache" +) + +// RateLimiter restricts, per key, how many events may happen in a period of time. +type RateLimiter struct { + limiters *lrucache.ExpiringLRU + interval time.Duration // time that should usually pass between two events for one key. + burst int // maximum number of events allowed in a single burst. +} + +// NewRateLimiter is a constructor for RateLimiter; numLimits is the capacity of the per-key limiter cache. +func NewRateLimiter(interval time.Duration, burst, numLimits int) *RateLimiter { + return &RateLimiter{ + limiters: lrucache.New(lrucache.Options{Expiration: -1, Capacity: numLimits}), + interval: interval, + burst: burst, + } +} + +// IsAllowed reports whether an event for the given key is allowed to happen now. 
+func (rateLimiter *RateLimiter) IsAllowed(key string) bool { + limiter, err := rateLimiter.limiters.Get(key, func() (interface{}, error) { + return rate.NewLimiter( + rate.Limit(time.Second)/rate.Limit(rateLimiter.interval), + rateLimiter.burst, + ), nil + }) + if err != nil { + panic(fmt.Sprintf("unreachable: %+v", err)) + } + + return limiter.(*rate.Limiter).Allow() +} diff --git a/satellite/contact/service.go b/satellite/contact/service.go index 41addf8f5..220370a2b 100644 --- a/satellite/contact/service.go +++ b/satellite/contact/service.go @@ -24,6 +24,10 @@ import ( type Config struct { ExternalAddress string `user:"true" help:"the public address of the node, useful for nodes behind NAT" default:""` Timeout time.Duration `help:"timeout for pinging storage nodes" default:"10m0s"` + + RateLimitInterval time.Duration `help:"the amount of time that should happen between contact attempts usually" releaseDefault:"10m0s" devDefault:"1ns"` + RateLimitBurst int `help:"the maximum burst size for the contact rate limit token bucket" releaseDefault:"2" devDefault:"1000"` + RateLimitCacheSize int `help:"the number of nodes or addresses to keep token buckets for" default:"1000"` } // Service is the contact service between storage nodes and satellites. @@ -41,18 +45,20 @@ type Service struct { peerIDs overlay.PeerIdentities dialer rpc.Dialer - timeout time.Duration + timeout time.Duration + idLimiter *RateLimiter } // NewService creates a new contact service. 
-func NewService(log *zap.Logger, self *overlay.NodeDossier, overlay *overlay.Service, peerIDs overlay.PeerIdentities, dialer rpc.Dialer, timeout time.Duration) *Service { +func NewService(log *zap.Logger, self *overlay.NodeDossier, overlay *overlay.Service, peerIDs overlay.PeerIdentities, dialer rpc.Dialer, config Config) *Service { return &Service{ - log: log, - self: self, - overlay: overlay, - peerIDs: peerIDs, - dialer: dialer, - timeout: timeout, + log: log, + self: self, + overlay: overlay, + peerIDs: peerIDs, + dialer: dialer, + timeout: config.Timeout, + idLimiter: NewRateLimiter(config.RateLimitInterval, config.RateLimitBurst, config.RateLimitCacheSize), } } diff --git a/satellite/core.go b/satellite/core.go index fdcff7496..c162aee31 100644 --- a/satellite/core.go +++ b/satellite/core.go @@ -217,7 +217,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, Type: pb.NodeType_SATELLITE, Version: *pbVersion, } - peer.Contact.Service = contact.NewService(peer.Log.Named("contact:service"), self, peer.Overlay.Service, peer.DB.PeerIdentities(), peer.Dialer, config.Contact.Timeout) + peer.Contact.Service = contact.NewService(peer.Log.Named("contact:service"), self, peer.Overlay.Service, peer.DB.PeerIdentities(), peer.Dialer, config.Contact) peer.Services.Add(lifecycle.Item{ Name: "contact:service", Close: peer.Contact.Service.Close, diff --git a/scripts/testdata/satellite-config.yaml.lock b/scripts/testdata/satellite-config.yaml.lock index cb5477fcb..c398252e0 100755 --- a/scripts/testdata/satellite-config.yaml.lock +++ b/scripts/testdata/satellite-config.yaml.lock @@ -172,6 +172,15 @@ compensation.withheld-percents: 75,75,75,50,50,50,25,25,25,0,0,0,0,0,0 # the public address of the node, useful for nodes behind NAT contact.external-address: "" +# the maximum burst size for the contact rate limit token bucket +# contact.rate-limit-burst: 2 + +# the number of nodes or addresses to keep token buckets for +# contact.rate-limit-cache-size: 1000 + +# the 
amount of time that should happen between contact attempts usually +# contact.rate-limit-interval: 10m0s + # timeout for pinging storage nodes # contact.timeout: 10m0s