storj/satellite/metainfo/delete_pieces_service.go
Ivan Fraixedes c4fd84ad3e satellite/metainfo: Add metrics and traces to DeletePieces
Trace the calls to the DeletePiecesService.DeletePieces method and add
metrics to gather statistics about the rate at which each specific storage
node is dialed and the time spent dialing storage nodes.

These statistics will help us to find out if we should implement
connection queues to storage nodes for reducing the deletion time in case
we see that we're spending too much time dialing frequently used storage
nodes.

Ticket: https://storjlabs.atlassian.net/browse/SM-85
Change-Id: I9601676c3a8ad96c73c93833145929e4817755e2
2020-02-12 15:38:50 +00:00

198 lines
5.7 KiB
Go

// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package metainfo
import (
"context"
"fmt"
"time"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/common/pb"
"storj.io/common/rpc"
"storj.io/common/storj"
"storj.io/common/sync2"
"storj.io/uplink/piecestore"
)
// ErrDeletePieces is the general error class for DeletePiecesService.
var ErrDeletePieces = errs.Class("metainfo storage node service")
// piecesToDeleteLimit is the maximum number of piece IDs that can be sent to a
// storage node in a single request. Currently, the calculation is based on the
// DRPC maximum message size, 4MB.
const piecesToDeleteLimit = 1000
// DeletePiecesService is the metainfo service in charge of deleting pieces of
// storage nodes.
//
// architecture: Service
type DeletePiecesService struct {
// log receives the warnings emitted when dialing a node, deleting pieces or
// closing a node connection fails.
log *zap.Logger
// dialer is used to open the connections to the storage nodes.
dialer rpc.Dialer
// TODO: v3-3406 this value is currently only used to limit the concurrent
// connections made by each single method call.
maxConns int
}
// NewDeletePiecesService builds a DeletePiecesService that reaches storage
// nodes through dialer and reports problems through log. maxConcurrentConns
// caps the number of connections that each single method call uses.
//
// An error is returned when maxConcurrentConns is not greater than 0, when
// dialer is its zero value, or when log is nil.
func NewDeletePiecesService(log *zap.Logger, dialer rpc.Dialer, maxConcurrentConns int) (*DeletePiecesService, error) {
	// TODO: v3-3476 should we have an upper limit?
	// The cases are evaluated top to bottom, so the first invalid argument
	// found determines the reported error.
	switch {
	case maxConcurrentConns <= 0:
		return nil, ErrDeletePieces.New(
			"max concurrent connections must be greater than 0, got %d", maxConcurrentConns,
		)
	case dialer == (rpc.Dialer{}):
		return nil, ErrDeletePieces.New("%s", "dialer cannot be its zero value")
	case log == nil:
		return nil, ErrDeletePieces.New("%s", "logger cannot be nil")
	}

	svc := &DeletePiecesService{
		log:      log,
		dialer:   dialer,
		maxConns: maxConcurrentConns,
	}
	return svc, nil
}
// DeletePieces deletes all the indicated pieces from the nodes which are
// online. It stops 200 milliseconds after the successThreshold of the total
// number of nodes has been reached; otherwise it stops when the attempts to
// delete all the pieces have finished.
//
// Each node is dialed once and its piece IDs are sent in requests of at most
// piecesToDeleteLimit IDs. A node counts as a failure when it cannot be dialed
// or when any of its requests fails; the pieces left behind are expected to be
// removed by the garbage collector.
//
// It only returns an error if sync2.NewSuccessThreshold returns an error.
func (service *DeletePiecesService) DeletePieces(
	ctx context.Context, nodes NodesPieces, successThreshold float64,
) (err error) {
	defer mon.Task()(&ctx, len(nodes), nodes.NumPieces(), successThreshold)(&err)

	// The threshold is computed over the number of nodes, not over the number
	// of pieces.
	threshold, err := sync2.NewSuccessThreshold(len(nodes), successThreshold)
	if err != nil {
		return err
	}

	// TODO: v3-3476 This timeout will go away in a second commit
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	// TODO: v3-3406 this limiter will be global to the service instance if we
	// decide to do so
	limiter := sync2.NewLimiter(service.maxConns)
	for _, n := range nodes {
		// node and pieceBatches are declared per iteration so that the closure
		// below captures the values of this node only.
		node := n.Node
		pieceBatches := splitPieceBatches(n.Pieces)

		limiter.Go(ctx, func() {
			// Track the rate that each single node is dialed
			mon.Event(fmt.Sprintf("DeletePieces_node_%s", node.Id.String()))

			// Track the low/high/recent/average/quantiles of successful nodes dialing.
			// Not stopping the timer doesn't leak resources.
			timerDialSuccess := mon.Timer("DeletePieces_nodes_dial_success").Start()
			client, err := piecestore.Dial(
				ctx, service.dialer, node, service.log, piecestore.Config{},
			)
			if err != nil {
				service.log.Warn("unable to dial storage node",
					zap.Stringer("node_id", node.Id),
					zap.Stringer("node_info", node),
					zap.Error(err),
				)

				threshold.Failure()

				// Pieces will be collected by garbage collector
				return
			}
			timerDialSuccess.Stop()

			defer func() {
				err := client.Close()
				if err != nil {
					service.log.Warn("error closing the storage node client connection",
						zap.Stringer("node_id", node.Id),
						zap.Stringer("node_info", node),
						zap.Error(err),
					)
				}
			}()

			for _, batch := range pieceBatches {
				err := client.DeletePieces(ctx, batch...)
				if err != nil {
					// piece will be collected by garbage collector
					service.log.Warn("unable to delete pieces of a storage node",
						zap.Stringer("node_id", node.Id),
						zap.Error(err),
					)

					// mark the node as failure if one error is returned since only
					// authentication errors are returned by DeletePieces
					threshold.Failure()

					// Pieces will be collected by garbage collector
					return
				}
			}

			threshold.Success()
		})
	}

	threshold.Wait(ctx)

	// return to the client after the success threshold but wait some time before
	// canceling the remaining deletes
	timer := time.AfterFunc(200*time.Millisecond, cancel)
	defer timer.Stop()

	limiter.Wait()
	return nil
}

// splitPieceBatches cuts pieces into consecutive batches of at most
// piecesToDeleteLimit piece IDs, preserving their order. The returned slice is
// empty when pieces is empty.
func splitPieceBatches(pieces []storj.PieceID) [][]storj.PieceID {
	batches := make([][]storj.PieceID, 0, (len(pieces)+piecesToDeleteLimit-1)/piecesToDeleteLimit)
	for len(pieces) > piecesToDeleteLimit {
		// Cap the capacity of each full batch so that an append on it can never
		// overwrite the IDs that belong to the following batch.
		batches = append(batches, pieces[:piecesToDeleteLimit:piecesToDeleteLimit])
		pieces = pieces[piecesToDeleteLimit:]
	}
	if len(pieces) > 0 {
		batches = append(batches, pieces)
	}
	return batches
}
// Close is meant to wait until all the resources used by the service are
// closed before returning.
//
// The current implementation does not wait for anything and always returns
// nil.
func (service *DeletePiecesService) Close() error {
// TODO: orange/v3-3476 it will wait until all the goroutines run by the
// DeletePieces finish rather than using the current timeout.
return nil
}
// NodePieces indicates a list of pieces that belong to a storage node.
type NodePieces struct {
// Node is the storage node that the pieces belong to.
Node *pb.Node
// Pieces are the IDs of the pieces that belong to Node.
Pieces []storj.PieceID
}
// NodesPieces is a slice of NodePieces.
type NodesPieces []NodePieces
// NumPieces returns the total count of pieces across every storage node of
// the slice. It returns 0 for an empty or nil slice.
func (nodes NodesPieces) NumPieces() int {
	var count int
	for i := range nodes {
		count += len(nodes[i].Pieces)
	}
	return count
}