storj/satellite/metainfo/piecedeletion/service.go

242 lines
6.9 KiB
Go
Raw Normal View History

// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package piecedeletion
import (
"context"
"time"
"github.com/zeebo/errs"
"go.uber.org/zap"
"golang.org/x/sync/semaphore"
"storj.io/common/pb"
"storj.io/common/rpc"
"storj.io/common/storj"
"storj.io/common/sync2"
)
// Config defines configuration options for Service.
type Config struct {
MaxConcurrency int `help:"maximum number of concurrent requests to storage nodes" default:"100"`
MaxConcurrentPieces int `help:"maximum number of concurrent pieces can be processed" default:"1000000"`
MaxPiecesPerBatch int `help:"maximum number of pieces per batch" default:"5000"`
MaxPiecesPerRequest int `help:"maximum number pieces per single request" default:"1000"`
DialTimeout time.Duration `help:"timeout for dialing nodes (0 means satellite default)" default:"0"`
FailThreshold time.Duration `help:"threshold for retrying a failed node" releaseDefault:"5m" devDefault:"2s"`
RequestTimeout time.Duration `help:"timeout for a single delete request" releaseDefault:"1m" devDefault:"2s"`
}
const (
minTimeout = 5 * time.Millisecond
maxTimeout = 5 * time.Minute
)
// Verify verifies configuration sanity.
func (config *Config) Verify() errs.Group {
var errlist errs.Group
if config.MaxConcurrency <= 0 {
errlist.Add(Error.New("concurrency %d must be greater than 0", config.MaxConcurrency))
}
if config.MaxConcurrentPieces <= 0 {
errlist.Add(Error.New("max concurrent pieces %d must be greater than 0", config.MaxConcurrentPieces))
}
if config.MaxPiecesPerBatch < config.MaxPiecesPerRequest {
errlist.Add(Error.New("max pieces per batch %d should be larger than max pieces per request %d", config.MaxPiecesPerBatch, config.MaxPiecesPerRequest))
}
if config.MaxPiecesPerBatch <= 0 {
errlist.Add(Error.New("max pieces per batch %d must be greater than 0", config.MaxPiecesPerBatch))
}
if config.MaxPiecesPerRequest <= 0 {
errlist.Add(Error.New("max pieces per request %d must be greater than 0", config.MaxPiecesPerRequest))
}
if config.DialTimeout != 0 && (config.DialTimeout <= minTimeout || maxTimeout <= config.DialTimeout) {
errlist.Add(Error.New("dial timeout %v must be between %v and %v", config.DialTimeout, minTimeout, maxTimeout))
}
if config.RequestTimeout < minTimeout || maxTimeout < config.RequestTimeout {
errlist.Add(Error.New("request timeout %v should be between %v and %v", config.RequestTimeout, minTimeout, maxTimeout))
}
return errlist
}
// Nodes stores reliable nodes information.
type Nodes interface {
KnownReliable(ctx context.Context, nodeIDs storj.NodeIDList) ([]*pb.Node, error)
}
// Service handles combining piece deletion requests.
//
// architecture: Service
type Service struct {
log *zap.Logger
config Config
concurrentRequests *semaphore.Weighted
rpcDialer rpc.Dialer
nodesDB Nodes
running sync2.Fence
combiner *Combiner
dialer *Dialer
limited *LimitedHandler
}
// NewService creates a new service.
func NewService(log *zap.Logger, dialer rpc.Dialer, nodesDB Nodes, config Config) (*Service, error) {
var errlist errs.Group
if log == nil {
errlist.Add(Error.New("log is nil"))
}
if dialer == (rpc.Dialer{}) {
errlist.Add(Error.New("dialer is zero"))
}
if nodesDB == nil {
errlist.Add(Error.New("nodesDB is nil"))
}
if errs := config.Verify(); len(errs) > 0 {
errlist.Add(errs...)
}
if err := errlist.Err(); err != nil {
return nil, Error.Wrap(err)
}
dialerClone := dialer
if config.DialTimeout > 0 {
dialerClone.DialTimeout = config.DialTimeout
}
return &Service{
log: log,
config: config,
concurrentRequests: semaphore.NewWeighted(int64(config.MaxConcurrentPieces)),
rpcDialer: dialerClone,
nodesDB: nodesDB,
}, nil
}
// newQueue creates the configured queue.
func (service *Service) newQueue() Queue {
return NewLimitedJobs(service.config.MaxPiecesPerBatch)
}
// Run initializes the service.
func (service *Service) Run(ctx context.Context) error {
defer service.running.Release()
config := service.config
service.dialer = NewDialer(service.log.Named("dialer"), service.rpcDialer, config.RequestTimeout, config.FailThreshold, config.MaxPiecesPerRequest)
service.limited = NewLimitedHandler(service.dialer, config.MaxConcurrency)
service.combiner = NewCombiner(ctx, service.limited, service.newQueue)
return nil
}
// Close shuts down the service.
func (service *Service) Close() error {
<-service.running.Done()
service.combiner.Close()
return nil
}
// Delete deletes the pieces specified in the requests waiting until success threshold is reached.
func (service *Service) Delete(ctx context.Context, requests []Request, successThreshold float64) (err error) {
defer mon.Task()(&ctx, len(requests), requestsPieceCount(requests), successThreshold)(&err)
if len(requests) == 0 {
return nil
}
// wait for combiner and dialer to set themselves up.
if !service.running.Wait(ctx) {
return Error.Wrap(ctx.Err())
}
for i, req := range requests {
if !req.IsValid() {
return Error.New("request #%d is invalid", i)
}
}
// When number of pieces are more than the maximum limit, we let it overflow,
// so we don't have to split requests in to separate batches.
totalPieceCount := requestsPieceCount(requests)
if totalPieceCount > service.config.MaxConcurrentPieces {
totalPieceCount = service.config.MaxConcurrentPieces
}
if err := service.concurrentRequests.Acquire(ctx, int64(totalPieceCount)); err != nil {
return Error.Wrap(err)
}
defer service.concurrentRequests.Release(int64(totalPieceCount))
// Create a map for matching node information with the corresponding
// request.
nodesReqs := make(map[storj.NodeID]Request, len(requests))
nodeIDs := []storj.NodeID{}
for _, req := range requests {
if req.Node.Address == "" {
nodeIDs = append(nodeIDs, req.Node.ID)
}
nodesReqs[req.Node.ID] = req
}
if len(nodeIDs) > 0 {
nodes, err := service.nodesDB.KnownReliable(ctx, nodeIDs)
if err != nil {
// Pieces will be collected by garbage collector
return Error.Wrap(err)
}
for _, node := range nodes {
req := nodesReqs[node.Id]
nodesReqs[node.Id] = Request{
Node: storj.NodeURL{
ID: node.Id,
Address: node.Address.Address,
},
Pieces: req.Pieces,
}
}
}
threshold, err := sync2.NewSuccessThreshold(len(nodesReqs), successThreshold)
if err != nil {
return Error.Wrap(err)
}
for _, req := range nodesReqs {
service.combiner.Enqueue(req.Node, Job{
Pieces: req.Pieces,
Resolve: threshold,
})
}
threshold.Wait(ctx)
return nil
}
// Request defines a deletion requests for a node.
type Request struct {
Node storj.NodeURL
Pieces []storj.PieceID
}
// IsValid returns whether the request is valid.
func (req *Request) IsValid() bool {
return !req.Node.ID.IsZero() && len(req.Pieces) > 0
}
func requestsPieceCount(requests []Request) int {
total := 0
for _, r := range requests {
total += len(r.Pieces)
}
return total
}