14ad7a4f1c
services: This PR adds a limiter on the number of concurrent object deletions that can be handled, so we don't run out of memory. Change-Id: Id2ce368af6f86845fcdfd34cb2f5e460efe9b272
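At its core the limiter is a weighted semaphore sized by `MaxConcurrentPieces`: each `Delete` call acquires capacity for its piece count before doing any work and releases it afterwards. Below is a minimal, self-contained sketch of that backpressure pattern; the `main` package, the limit of 4, and the piece count of 3 are hypothetical values for illustration, not the service's real wiring:

```go
package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/semaphore"
)

func main() {
	// Hypothetical small limit for illustration; the service defaults
	// MaxConcurrentPieces to 1000000.
	sem := semaphore.NewWeighted(4)
	ctx := context.Background()

	// Acquire blocks until enough capacity is free, so callers are
	// backpressured instead of growing memory without bound.
	pieces := int64(3)
	if err := sem.Acquire(ctx, pieces); err != nil {
		fmt.Println("acquire failed:", err)
		return
	}
	defer sem.Release(pieces)

	fmt.Println("processing", pieces, "pieces")
}
```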
242 lines
6.9 KiB
Go
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.

package piecedeletion

import (
	"context"
	"time"

	"github.com/zeebo/errs"
	"go.uber.org/zap"
	"golang.org/x/sync/semaphore"

	"storj.io/common/pb"
	"storj.io/common/rpc"
	"storj.io/common/storj"
	"storj.io/common/sync2"
)

// Config defines configuration options for Service.
type Config struct {
	MaxConcurrency      int `help:"maximum number of concurrent requests to storage nodes" default:"100"`
	MaxConcurrentPieces int `help:"maximum number of concurrent pieces that can be processed" default:"1000000"`

	MaxPiecesPerBatch   int `help:"maximum number of pieces per batch" default:"5000"`
	MaxPiecesPerRequest int `help:"maximum number of pieces per single request" default:"1000"`

	DialTimeout    time.Duration `help:"timeout for dialing nodes (0 means satellite default)" default:"0"`
	FailThreshold  time.Duration `help:"threshold for retrying a failed node" releaseDefault:"5m" devDefault:"2s"`
	RequestTimeout time.Duration `help:"timeout for a single delete request" releaseDefault:"1m" devDefault:"2s"`
}

const (
	minTimeout = 5 * time.Millisecond
	maxTimeout = 5 * time.Minute
)

// Verify verifies configuration sanity.
func (config *Config) Verify() errs.Group {
	var errlist errs.Group
	if config.MaxConcurrency <= 0 {
		errlist.Add(Error.New("concurrency %d must be greater than 0", config.MaxConcurrency))
	}
	if config.MaxConcurrentPieces <= 0 {
		errlist.Add(Error.New("max concurrent pieces %d must be greater than 0", config.MaxConcurrentPieces))
	}
	if config.MaxPiecesPerBatch < config.MaxPiecesPerRequest {
		errlist.Add(Error.New("max pieces per batch %d must be greater than or equal to max pieces per request %d", config.MaxPiecesPerBatch, config.MaxPiecesPerRequest))
	}
	if config.MaxPiecesPerBatch <= 0 {
		errlist.Add(Error.New("max pieces per batch %d must be greater than 0", config.MaxPiecesPerBatch))
	}
	if config.MaxPiecesPerRequest <= 0 {
		errlist.Add(Error.New("max pieces per request %d must be greater than 0", config.MaxPiecesPerRequest))
	}
	if config.DialTimeout != 0 && (config.DialTimeout <= minTimeout || maxTimeout <= config.DialTimeout) {
		errlist.Add(Error.New("dial timeout %v must be between %v and %v", config.DialTimeout, minTimeout, maxTimeout))
	}
	if config.RequestTimeout < minTimeout || maxTimeout < config.RequestTimeout {
		errlist.Add(Error.New("request timeout %v must be between %v and %v", config.RequestTimeout, minTimeout, maxTimeout))
	}
	return errlist
}

// Nodes stores reliable nodes information.
type Nodes interface {
	KnownReliable(ctx context.Context, nodeIDs storj.NodeIDList) ([]*pb.Node, error)
}

// Service handles combining piece deletion requests.
//
// architecture: Service
type Service struct {
	log    *zap.Logger
	config Config
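
	// concurrentRequests limits the total number of pieces being
	// processed at once; Delete blocks on it so a burst of deletion
	// requests cannot exhaust the satellite's memory.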
	concurrentRequests *semaphore.Weighted

	rpcDialer rpc.Dialer
	nodesDB   Nodes

	running  sync2.Fence
	combiner *Combiner
	dialer   *Dialer
	limited  *LimitedHandler
}

// NewService creates a new service.
func NewService(log *zap.Logger, dialer rpc.Dialer, nodesDB Nodes, config Config) (*Service, error) {
	var errlist errs.Group
	if log == nil {
		errlist.Add(Error.New("log is nil"))
	}
	if dialer == (rpc.Dialer{}) {
		errlist.Add(Error.New("dialer is zero"))
	}
	if nodesDB == nil {
		errlist.Add(Error.New("nodesDB is nil"))
	}
	if errs := config.Verify(); len(errs) > 0 {
		errlist.Add(errs...)
	}
	if err := errlist.Err(); err != nil {
		return nil, Error.Wrap(err)
	}
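
	// Clone the dialer so that overriding DialTimeout does not modify
	// the dialer shared with the rest of the satellite.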
	dialerClone := dialer
	if config.DialTimeout > 0 {
		dialerClone.DialTimeout = config.DialTimeout
	}

	return &Service{
		log:                log,
		config:             config,
		concurrentRequests: semaphore.NewWeighted(int64(config.MaxConcurrentPieces)),
		rpcDialer:          dialerClone,
		nodesDB:            nodesDB,
	}, nil
}

// newQueue creates the configured queue.
func (service *Service) newQueue() Queue {
	return NewLimitedJobs(service.config.MaxPiecesPerBatch)
}

// Run initializes the service.
func (service *Service) Run(ctx context.Context) error {
	defer service.running.Release()

	config := service.config
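
	// Build the deletion pipeline: the dialer sends batched delete
	// requests to storage nodes, the limited handler caps concurrent
	// requests at MaxConcurrency, and the combiner groups enqueued
	// jobs per node.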
	service.dialer = NewDialer(service.log.Named("dialer"), service.rpcDialer, config.RequestTimeout, config.FailThreshold, config.MaxPiecesPerRequest)
	service.limited = NewLimitedHandler(service.dialer, config.MaxConcurrency)
	service.combiner = NewCombiner(ctx, service.limited, service.newQueue)

	return nil
}

// Close shuts down the service.
func (service *Service) Close() error {
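	// Wait for Run to finish initializing, so that combiner is set.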
	<-service.running.Done()
	service.combiner.Close()
	return nil
}

// Delete deletes the pieces specified in the requests, waiting until the success threshold is reached.
func (service *Service) Delete(ctx context.Context, requests []Request, successThreshold float64) (err error) {
	defer mon.Task()(&ctx, len(requests), requestsPieceCount(requests), successThreshold)(&err)

	if len(requests) == 0 {
		return nil
	}

	// wait for combiner and dialer to set themselves up.
	if !service.running.Wait(ctx) {
		return Error.Wrap(ctx.Err())
	}

	for i, req := range requests {
		if !req.IsValid() {
			return Error.New("request #%d is invalid", i)
		}
	}

	// When the number of pieces exceeds the maximum limit, we clamp the
	// acquired count and let the batch overflow the limit, so we don't
	// have to split requests into separate batches.
	totalPieceCount := requestsPieceCount(requests)
	if totalPieceCount > service.config.MaxConcurrentPieces {
		totalPieceCount = service.config.MaxConcurrentPieces
	}

	if err := service.concurrentRequests.Acquire(ctx, int64(totalPieceCount)); err != nil {
		return Error.Wrap(err)
	}
	defer service.concurrentRequests.Release(int64(totalPieceCount))

	threshold, err := sync2.NewSuccessThreshold(len(requests), successThreshold)
	if err != nil {
		return Error.Wrap(err)
	}

	// Create a map for matching node information with the corresponding
	// request.
	nodesReqs := make(map[storj.NodeID]Request, len(requests))
	nodeIDs := []storj.NodeID{}
	for _, req := range requests {
		if req.Node.Address == "" {
			nodeIDs = append(nodeIDs, req.Node.ID)
		}
		nodesReqs[req.Node.ID] = req
	}
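
	// Look up addresses for the nodes that were requested by ID only;
	// nodesDB returns only the nodes it still considers reliable.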
	if len(nodeIDs) > 0 {
		nodes, err := service.nodesDB.KnownReliable(ctx, nodeIDs)
		if err != nil {
			// Pieces will be collected by the garbage collector.
			return Error.Wrap(err)
		}

		for _, node := range nodes {
			req := nodesReqs[node.Id]

			nodesReqs[node.Id] = Request{
				Node: storj.NodeURL{
					ID:      node.Id,
					Address: node.Address.Address,
				},
				Pieces: req.Pieces,
			}
		}
	}

	for _, req := range nodesReqs {
		service.combiner.Enqueue(req.Node, Job{
			Pieces:  req.Pieces,
			Resolve: threshold,
		})
	}
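
	// Block until enough jobs have succeeded, every job has resolved,
	// or the context is canceled.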
	threshold.Wait(ctx)

	return nil
}

// Request defines a deletion request for a node.
type Request struct {
	Node   storj.NodeURL
	Pieces []storj.PieceID
}

// IsValid returns whether the request is valid.
func (req *Request) IsValid() bool {
	return !req.Node.ID.IsZero() && len(req.Pieces) > 0
}
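
// requestsPieceCount returns the total number of pieces across all requests.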
func requestsPieceCount(requests []Request) int {
	total := 0
	for _, r := range requests {
		total += len(r.Pieces)
	}
	return total
}