satellite/contact: rate limit node checkins

Change-Id: Ied386a2350aa073de46443e5259b56d49ec61dbf
This commit is contained in:
JT Olio 2021-05-16 10:36:53 -06:00 committed by Ivan Fraixedes
parent 8f15f975a2
commit 1852773e3e
7 changed files with 88 additions and 14 deletions

View File

@ -383,6 +383,10 @@ func (planet *Planet) newSatellite(ctx context.Context, prefix string, index int
return nil, err
}
// TODO: it is a huge surprise that this doesn't use the config
// parsing `default` or `devDefault` struct tag values.
// we should use storj.io/private/cfgstruct to autopopulate default
// config values and then only override ones in special cases.
config := satellite.Config{
Server: server.Config{
Address: "127.0.0.1:0",
@ -406,7 +410,10 @@ func (planet *Planet) newSatellite(ctx context.Context, prefix string, index int
Address: "127.0.0.1:0",
},
Contact: contact.Config{
Timeout: 1 * time.Minute,
Timeout: 1 * time.Minute,
RateLimitInterval: time.Nanosecond,
RateLimitBurst: 1000,
RateLimitCacheSize: 1000,
},
Overlay: overlay.Config{
Node: overlay.NodeSelectionConfig{

View File

@ -280,7 +280,7 @@ func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB,
Type: pb.NodeType_SATELLITE,
Version: *pbVersion,
}
peer.Contact.Service = contact.NewService(peer.Log.Named("contact:service"), self, peer.Overlay.Service, peer.DB.PeerIdentities(), peer.Dialer, config.Contact.Timeout)
peer.Contact.Service = contact.NewService(peer.Log.Named("contact:service"), self, peer.Overlay.Service, peer.DB.PeerIdentities(), peer.Dialer, config.Contact)
peer.Contact.Endpoint = contact.NewEndpoint(peer.Log.Named("contact:endpoint"), peer.Contact.Service)
if err := pb.DRPCRegisterNode(peer.Server.DRPC(), peer.Contact.Endpoint); err != nil {
return nil, errs.Combine(err, peer.Close())

View File

@ -19,9 +19,10 @@ import (
)
var (
errPingBackDial = errs.Class("pingback dialing")
errCheckInIdentity = errs.Class("check-in identity")
errCheckInNetwork = errs.Class("check-in network")
errPingBackDial = errs.Class("pingback dialing")
errCheckInIdentity = errs.Class("check-in identity")
errCheckInRateLimit = errs.Class("check-in ratelimit")
errCheckInNetwork = errs.Class("check-in network")
)
// Endpoint implements the contact service Endpoints.
@ -54,6 +55,13 @@ func (endpoint *Endpoint) CheckIn(ctx context.Context, req *pb.CheckInRequest) (
}
nodeID := peerID.ID
// we need a string as a key for the limiter, but nodeID.String() has base58 encoding overhead
nodeIDBytesAsString := string(nodeID.Bytes())
if !endpoint.service.idLimiter.IsAllowed(nodeIDBytesAsString) {
endpoint.log.Info("node rate limited by id", zap.String("node address", req.Address), zap.Stringer("Node ID", nodeID))
return nil, rpcstatus.Error(rpcstatus.ResourceExhausted, errCheckInRateLimit.New("node rate limited by id").Error())
}
err = endpoint.service.peerIDs.Set(ctx, nodeID, peerID)
if err != nil {
endpoint.log.Info("failed to add peer identity entry for ID", zap.String("node address", req.Address), zap.Stringer("Node ID", nodeID), zap.Error(err))

View File

@ -0,0 +1,44 @@
// Copyright (C) 2021 Storj Labs, Inc.
// See LICENSE for copying information.
package contact
import (
"fmt"
"time"
"golang.org/x/time/rate"
"storj.io/storj/private/lrucache"
)
// RateLimiter allows to prevent multiple events in fixed period of time.
type RateLimiter struct {
limiters *lrucache.ExpiringLRU
interval time.Duration // interval during which events are not limiting.
burst int // maximum number of events allowed during duration.
}
// NewRateLimiter is a constructor for RateLimiter.
func NewRateLimiter(interval time.Duration, burst, numLimits int) *RateLimiter {
return &RateLimiter{
limiters: lrucache.New(lrucache.Options{Expiration: -1, Capacity: numLimits}),
interval: interval,
burst: burst,
}
}
// IsAllowed indicates if event is allowed to happen.
func (rateLimiter *RateLimiter) IsAllowed(key string) bool {
limiter, err := rateLimiter.limiters.Get(key, func() (interface{}, error) {
return rate.NewLimiter(
rate.Limit(time.Second)/rate.Limit(rateLimiter.interval),
rateLimiter.burst,
), nil
})
if err != nil {
panic(fmt.Sprintf("unreachable: %+v", err))
}
return limiter.(*rate.Limiter).Allow()
}

View File

@ -24,6 +24,10 @@ import (
type Config struct {
ExternalAddress string `user:"true" help:"the public address of the node, useful for nodes behind NAT" default:""`
Timeout time.Duration `help:"timeout for pinging storage nodes" default:"10m0s"`
RateLimitInterval time.Duration `help:"the amount of time that should happen between contact attempts usually" releaseDefault:"10m0s" devDefault:"1ns"`
RateLimitBurst int `help:"the maximum burst size for the contact rate limit token bucket" releaseDefault:"2" devDefault:"1000"`
RateLimitCacheSize int `help:"the number of nodes or addresses to keep token buckets for" default:"1000"`
}
// Service is the contact service between storage nodes and satellites.
@ -41,18 +45,20 @@ type Service struct {
peerIDs overlay.PeerIdentities
dialer rpc.Dialer
timeout time.Duration
timeout time.Duration
idLimiter *RateLimiter
}
// NewService creates a new contact service.
func NewService(log *zap.Logger, self *overlay.NodeDossier, overlay *overlay.Service, peerIDs overlay.PeerIdentities, dialer rpc.Dialer, timeout time.Duration) *Service {
func NewService(log *zap.Logger, self *overlay.NodeDossier, overlay *overlay.Service, peerIDs overlay.PeerIdentities, dialer rpc.Dialer, config Config) *Service {
return &Service{
log: log,
self: self,
overlay: overlay,
peerIDs: peerIDs,
dialer: dialer,
timeout: timeout,
log: log,
self: self,
overlay: overlay,
peerIDs: peerIDs,
dialer: dialer,
timeout: config.Timeout,
idLimiter: NewRateLimiter(config.RateLimitInterval, config.RateLimitBurst, config.RateLimitCacheSize),
}
}

View File

@ -217,7 +217,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB,
Type: pb.NodeType_SATELLITE,
Version: *pbVersion,
}
peer.Contact.Service = contact.NewService(peer.Log.Named("contact:service"), self, peer.Overlay.Service, peer.DB.PeerIdentities(), peer.Dialer, config.Contact.Timeout)
peer.Contact.Service = contact.NewService(peer.Log.Named("contact:service"), self, peer.Overlay.Service, peer.DB.PeerIdentities(), peer.Dialer, config.Contact)
peer.Services.Add(lifecycle.Item{
Name: "contact:service",
Close: peer.Contact.Service.Close,

View File

@ -172,6 +172,15 @@ compensation.withheld-percents: 75,75,75,50,50,50,25,25,25,0,0,0,0,0,0
# the public address of the node, useful for nodes behind NAT
contact.external-address: ""
# the maximum burst size for the contact rate limit token bucket
# contact.rate-limit-burst: 2
# the number of nodes or addresses to keep token buckets for
# contact.rate-limit-cache-size: 1000
# the amount of time that should happen between contact attempts usually
# contact.rate-limit-interval: 10m0s
# timeout for pinging storage nodes
# contact.timeout: 10m0s