storagenode/orders: adding jitter to sending (#3725)
This commit is contained in:
parent
82ee13b00b
commit
758fe35aba
@ -115,6 +115,7 @@ func (planet *Planet) newStorageNodes(count int, whitelistedSatellites storj.Nod
|
||||
SenderTimeout: 10 * time.Minute,
|
||||
CleanupInterval: defaultInterval,
|
||||
ArchiveTTL: time.Hour,
|
||||
MaxSleep: 0,
|
||||
},
|
||||
Monitor: monitor.Config{
|
||||
MinimumBandwidth: 100 * memory.MB,
|
||||
|
@ -6,6 +6,7 @@ package orders
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"math/rand"
|
||||
"time"
|
||||
|
||||
"github.com/zeebo/errs"
|
||||
@ -82,6 +83,7 @@ type DB interface {
|
||||
|
||||
// Config defines configuration for sending orders.
|
||||
type Config struct {
|
||||
MaxSleep time.Duration `help:"maximum duration to wait before trying to send orders" releaseDefault:"300s" devDefault:"1s"`
|
||||
SenderInterval time.Duration `help:"duration between sending" default:"1h0m0s"`
|
||||
SenderTimeout time.Duration `help:"timeout for sending" default:"1h0m0s"`
|
||||
SenderDialTimeout time.Duration `help:"timeout for dialing satellite during sending orders" default:"1m0s"`
|
||||
@ -123,8 +125,31 @@ func (service *Service) Run(ctx context.Context) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
var group errgroup.Group
|
||||
service.Sender.Start(ctx, &group, service.sendOrders)
|
||||
service.Cleanup.Start(ctx, &group, service.cleanArchive)
|
||||
|
||||
service.Sender.Start(ctx, &group, func(ctx context.Context) error {
|
||||
if err := service.sleep(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err := service.sendOrders(ctx)
|
||||
if err != nil {
|
||||
service.log.Error("sending orders failed", zap.Error(err))
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
service.Cleanup.Start(ctx, &group, func(ctx context.Context) error {
|
||||
if err := service.sleep(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err := service.cleanArchive(ctx)
|
||||
if err != nil {
|
||||
service.log.Error("clean archive failed", zap.Error(err))
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
return group.Wait()
|
||||
}
|
||||
@ -336,6 +361,21 @@ func (service *Service) settle(ctx context.Context, log *zap.Logger, satelliteID
|
||||
return errList.Err()
|
||||
}
|
||||
|
||||
// sleep for random interval in [0;maxSleep)
|
||||
// returns error if context was cancelled
|
||||
func (service *Service) sleep(ctx context.Context) error {
|
||||
if service.config.MaxSleep <= 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
jitter := time.Duration(rand.Int63n(int64(service.config.MaxSleep)))
|
||||
if !sync2.Sleep(ctx, jitter) {
|
||||
return ctx.Err()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close stops the sending service.
|
||||
func (service *Service) Close() error {
|
||||
service.Sender.Close()
|
||||
|
Loading…
Reference in New Issue
Block a user