c4fd84ad3e
Trace the calls to the DeletePiecesService.DeletePieces method and add metrics to gather statistics about the rate at which specific storage nodes are dialed and the time spent dialing them. These statistics will help us find out whether we should implement connection queues to storage nodes to reduce deletion time, in case we see that we are spending too much time dialing frequently used storage nodes. Ticket: https://storjlabs.atlassian.net/browse/SM-85 Change-Id: I9601676c3a8ad96c73c93833145929e4817755e2
198 lines
5.7 KiB
Go
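The change boils down to two monkit instruments around each dial: a per-node event for the dial rate and a timer for the duration of successful dials. A minimal sketch of the pattern, using a hypothetical trackDial helper and assuming the metainfo package's existing monkit scope `mon` and imports:

// trackDial is a hypothetical helper illustrating the two instruments this
// change adds; it is not part of the commit.
func trackDial(nodeID storj.NodeID, dial func() error) error {
	// Rate: one event series per storage node, so frequently dialed nodes
	// stand out in the collected stats.
	mon.Event(fmt.Sprintf("DeletePieces_node_%s", nodeID.String()))

	// Duration: the timer only covers dials that succeed.
	timer := mon.Timer("DeletePieces_nodes_dial_success").Start()
	if err := dial(); err != nil {
		// Failed dials are excluded by never stopping the timer, which
		// doesn't leak resources.
		return err
	}
	timer.Stop()
	return nil
}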
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.

package metainfo

import (
	"context"
	"fmt"
	"time"

	"github.com/zeebo/errs"
	"go.uber.org/zap"

	"storj.io/common/pb"
	"storj.io/common/rpc"
	"storj.io/common/storj"
	"storj.io/common/sync2"
	"storj.io/uplink/piecestore"
)

// ErrDeletePieces is the general error class for DeletePiecesService.
var ErrDeletePieces = errs.Class("metainfo storage node service")

// piecesToDeleteLimit is the maximum number of piece IDs that can be sent to a
// storage node in a single request. The calculation is currently based on the
// DRPC maximum message size, 4 MB.
const piecesToDeleteLimit = 1000

// DeletePiecesService is the metainfo service in charge of deleting pieces of
// storage nodes.
//
// architecture: Service
type DeletePiecesService struct {
	log    *zap.Logger
	dialer rpc.Dialer

	// TODO: v3-3406 this value is currently only used to limit the concurrent
	// connections by each single method call.
	maxConns int
}

// NewDeletePiecesService creates a new DeletePiecesService. maxConcurrentConns
// is the maximum number of connections that each single method call uses.
//
// It returns an error if maxConcurrentConns is less than or equal to 0, dialer
// is a zero value, or log is nil.
func NewDeletePiecesService(log *zap.Logger, dialer rpc.Dialer, maxConcurrentConns int) (*DeletePiecesService, error) {
	// TODO: v3-3476 should we have an upper limit?
	if maxConcurrentConns <= 0 {
		return nil, ErrDeletePieces.New(
			"max concurrent connections must be greater than 0, got %d", maxConcurrentConns,
		)
	}

	if dialer == (rpc.Dialer{}) {
		return nil, ErrDeletePieces.New("%s", "dialer cannot be its zero value")
	}

	if log == nil {
		return nil, ErrDeletePieces.New("%s", "logger cannot be nil")
	}

	return &DeletePiecesService{
		maxConns: maxConcurrentConns,
		dialer:   dialer,
		log:      log,
	}, nil
}

// DeletePieces deletes all the indicated pieces of the nodes which are online,
// returning 200 milliseconds after the successThreshold of the total number of
// nodes has been reached, or once the attempt to delete all the pieces
// finishes, whichever comes first.
//
// It only returns an error if sync2.NewSuccessThreshold returns an error.
func (service *DeletePiecesService) DeletePieces(
	ctx context.Context, nodes NodesPieces, successThreshold float64,
) (err error) {
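	// mon.Task traces each call to this method, recording its duration and
	// whether it returned an error.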
	defer mon.Task()(&ctx, len(nodes), nodes.NumPieces(), successThreshold)(&err)

	threshold, err := sync2.NewSuccessThreshold(len(nodes), successThreshold)
	if err != nil {
		return err
	}

	// TODO: v3-3476 This timeout will go away in a second commit
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	// TODO: v3-3406 this limiter will be global to the service instance if we
	// decide to do so
	limiter := sync2.NewLimiter(service.maxConns)
	for _, n := range nodes {
		node := n.Node
		pieces := n.Pieces

		// Create batches when the number of pieces exceeds the maximum number
		// of piece IDs that can be sent in a single request.
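		// For example, with piecesToDeleteLimit = 1000, 2500 pieces yield
		// ceil(2500/1000) = 3 batches: 1000, 1000, and 500.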
		pieceBatches := make([][]storj.PieceID, 0, (len(pieces)+piecesToDeleteLimit-1)/piecesToDeleteLimit)
		for len(pieces) > piecesToDeleteLimit {
			pieceBatches = append(pieceBatches, pieces[0:piecesToDeleteLimit])
			pieces = pieces[piecesToDeleteLimit:]
		}
		if len(pieces) > 0 {
			pieceBatches = append(pieceBatches, pieces)
		}

		limiter.Go(ctx, func() {
			// Track the rate at which each single node is dialed.
			mon.Event(fmt.Sprintf("DeletePieces_node_%s", node.Id.String()))

			// Track the low/high/recent/average/quantiles of the time spent
			// on successful dials. Not stopping the timer doesn't leak
			// resources.
			timerDialSuccess := mon.Timer("DeletePieces_nodes_dial_success").Start()
			client, err := piecestore.Dial(
				ctx, service.dialer, node, service.log, piecestore.Config{},
			)
			if err != nil {
				service.log.Warn("unable to dial storage node",
					zap.Stringer("node_id", node.Id),
					zap.Stringer("node_info", node),
					zap.Error(err),
				)

				threshold.Failure()

				// Pieces will be collected by the garbage collector.
				return
			}
			timerDialSuccess.Stop()

			defer func() {
				err := client.Close()
				if err != nil {
					service.log.Warn("error closing the storage node client connection",
						zap.Stringer("node_id", node.Id),
						zap.Stringer("node_info", node),
						zap.Error(err),
					)
				}
			}()

			for _, batch := range pieceBatches {
				err := client.DeletePieces(ctx, batch...)
				if err != nil {
					service.log.Warn("unable to delete pieces of a storage node",
						zap.Stringer("node_id", node.Id),
						zap.Error(err),
					)

					// Mark the node as failed on the first error because
					// DeletePieces only returns authentication errors.
					threshold.Failure()

					// Pieces will be collected by the garbage collector.
					return
				}
			}

			threshold.Success()
		})
	}

	threshold.Wait(ctx)
	// Return to the client after the success threshold has been reached, but
	// wait some time before canceling the remaining deletes.
	timer := time.AfterFunc(200*time.Millisecond, cancel)
	defer timer.Stop()

	limiter.Wait()
	return nil
}

// Close waits until all the resources used by the service are closed before
// returning.
func (service *DeletePiecesService) Close() error {
	// TODO: orange/v3-3476 it will wait until all the goroutines run by
	// DeletePieces finish rather than using the current timeout.
	return nil
}

// NodePieces indicates a list of pieces that belong to a storage node.
type NodePieces struct {
	Node   *pb.Node
	Pieces []storj.PieceID
}

// NodesPieces is a slice of NodePieces.
type NodesPieces []NodePieces

// NumPieces sums the number of pieces of all the storage nodes of the slice
// and returns it.
func (nodes NodesPieces) NumPieces() int {
	total := 0
	for _, node := range nodes {
		total += len(node.Pieces)
	}

	return total
}
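A usage sketch, not part of the file: how a caller might wire the service together. The logger, dialer, and nodes values are assumed to come from the satellite's existing setup; the concurrency limit of 100 and the 0.75 threshold are illustrative only.

// deleteFromNodes is a hypothetical caller of DeletePiecesService.
func deleteFromNodes(ctx context.Context, log *zap.Logger, dialer rpc.Dialer, nodes NodesPieces) error {
	service, err := NewDeletePiecesService(log, dialer, 100)
	if err != nil {
		return err
	}
	defer func() { _ = service.Close() }()

	// Return as soon as 75% of the nodes have confirmed their deletions;
	// the rest are canceled 200ms later and left to garbage collection.
	return service.DeletePieces(ctx, nodes, 0.75)
}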