satellite/metainfo: Delete all the pieces of a storage node in a single request

Change-Id: Ia8758d36f1a113b545e4f746d74d172421f14b24
Yingrong Zhao 2020-02-06 13:21:58 -05:00 committed by Yingrong Zhao
parent 3900dadafd
commit 3331b443e7
2 changed files with 27 additions and 24 deletions

View File

@@ -20,6 +20,9 @@ import (
// ErrDeletePieces is the general error class for DeletePiecesService
var ErrDeletePieces = errs.Class("metainfo storage node service")
// piecesToDeleteLimit is the maximum number of piece IDs that can be sent to a storagenode in a single request. Currently, the calculation is based on the DRPC maximum message size, 4MB
const piecesToDeleteLimit = 1000
// DeletePiecesService is the metainfo service in charge of deleting pieces of
// storage nodes.
//
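The 1000-ID limit leaves a wide margin under the quoted 4MB DRPC message cap: a storj.PieceID is a 32-byte array, so a full batch is roughly 32 KB of raw ID bytes before protobuf framing. A rough back-of-envelope check (the 4 MiB cap is taken from the comment above, not measured):

package main

import "fmt"

func main() {
	const (
		pieceIDSize         = 32      // storj.PieceID is a 32-byte array
		piecesToDeleteLimit = 1000    // batch size introduced above
		drpcMaxMessageSize  = 4 << 20 // 4 MiB cap quoted in the comment above
	)

	batchBytes := pieceIDSize * piecesToDeleteLimit
	fmt.Printf("one full batch carries about %d KiB of piece IDs (cap %d MiB)\n",
		batchBytes/1024, drpcMaxMessageSize/(1<<20))
}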
@@ -69,7 +72,7 @@ func NewDeletePiecesService(log *zap.Logger, dialer rpc.Dialer, maxConcurrentCon
func (service *DeletePiecesService) DeletePieces(
ctx context.Context, nodes NodesPieces, successThreshold float64,
) error {
threshold, err := sync2.NewSuccessThreshold(nodes.NumPieces(), successThreshold)
threshold, err := sync2.NewSuccessThreshold(len(nodes), successThreshold)
if err != nil {
return err
}
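With len(nodes) as the task count, the threshold is now evaluated per storage node rather than per piece: each node contributes exactly one Success or Failure call, no matter how many pieces it holds. A minimal illustration of what such a counter does (an illustrative successCounter, not the actual sync2.SuccessThreshold implementation):

package main

import "fmt"

// successCounter is a simplified stand-in for a success threshold: it
// records one outcome per node and reports whether the required
// fraction of nodes has succeeded.
type successCounter struct {
	total     int
	threshold float64
	successes int
	failures  int
}

func (c *successCounter) Success() { c.successes++ }
func (c *successCounter) Failure() { c.failures++ }

// Reached reports whether enough nodes have succeeded so far.
func (c *successCounter) Reached() bool {
	return float64(c.successes) >= c.threshold*float64(c.total)
}

func main() {
	// Three storage nodes; 2/3 of them must confirm the deletion.
	c := &successCounter{total: 3, threshold: 0.66}
	c.Success()
	c.Failure()
	c.Success()
	fmt.Println("threshold reached:", c.Reached()) // true: 2 of 3 nodes succeeded
}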
@@ -85,6 +88,16 @@ func (service *DeletePiecesService) DeletePieces(
node := n.Node
pieces := n.Pieces
// create batches if the number of pieces is more than the maximum number of piece IDs that can be sent in a single request
pieceBatches := make([][]storj.PieceID, 0, (len(pieces)+piecesToDeleteLimit-1)/piecesToDeleteLimit)
for len(pieces) > piecesToDeleteLimit {
pieceBatches = append(pieceBatches, pieces[0:piecesToDeleteLimit])
pieces = pieces[piecesToDeleteLimit:]
}
if len(pieces) > 0 {
pieceBatches = append(pieceBatches, pieces)
}
limiter.Go(ctx, func() {
client, err := piecestore.Dial(
ctx, service.dialer, node, service.log, piecestore.Config{},
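The batching block above is the usual slice-chunking pattern: the ceiling division (len(pieces)+piecesToDeleteLimit-1)/piecesToDeleteLimit pre-sizes the batch slice so the appends never reallocate, the loop slices off full batches, and any remainder becomes a final short batch. The same pattern as a self-contained sketch (a hypothetical chunk helper over strings, with a small limit for readability):

package main

import "fmt"

// chunk splits ids into batches of at most limit elements. The capacity
// (len(ids)+limit-1)/limit is ceiling division, so the batches slice
// never needs to grow; each batch shares the backing array of ids.
func chunk(ids []string, limit int) [][]string {
	batches := make([][]string, 0, (len(ids)+limit-1)/limit)
	for len(ids) > limit {
		batches = append(batches, ids[0:limit])
		ids = ids[limit:]
	}
	if len(ids) > 0 {
		batches = append(batches, ids)
	}
	return batches
}

func main() {
	ids := []string{"a", "b", "c", "d", "e", "f", "g"}
	for i, batch := range chunk(ids, 3) {
		fmt.Println(i, batch) // 0 [a b c], 1 [d e f], 2 [g]
	}
}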
@@ -96,10 +109,7 @@
zap.Error(err),
)
// Mark all the pieces of this node as failure in the success threshold
for range pieces {
threshold.Failure()
}
threshold.Failure()
// Pieces will be collected by garbage collector
return
@@ -115,22 +125,25 @@
}
}()
for _, id := range pieces {
err := client.DeletePiece(ctx, id)
for _, batch := range pieceBatches {
err := client.DeletePieces(ctx, batch...)
if err != nil {
// piece will be collected by garbage collector
service.log.Warn("unable to delete piece of a storage node",
service.log.Warn("unable to delete pieces of a storage node",
zap.Stringer("node_id", node.Id),
zap.Stringer("piece_id", id),
zap.Error(err),
)
// mark the node as a failure if an error is returned, since only authentication errors are returned by DeletePieces
threshold.Failure()
continue
}
threshold.Success()
// Pieces will be collected by garbage collector
return
}
}
threshold.Success()
})
}
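Assembled from the hunks above, the per-node work is now: dial once, send one DeletePieces request per batch, and stop at the first error, leaving the remaining pieces to garbage collection while the node counts as a single failure. A self-contained sketch of that loop, with a hypothetical fakeClient standing in for the piecestore client:

package main

import (
	"context"
	"errors"
	"fmt"
)

// fakeClient stands in for the piecestore client: DeletePieces removes a
// whole batch in one request and starts failing after a fixed number of
// successful calls.
type fakeClient struct{ callsBeforeFailure int }

func (c *fakeClient) DeletePieces(ctx context.Context, ids ...string) error {
	if c.callsBeforeFailure == 0 {
		return errors.New("untrusted peer") // e.g. an authentication error
	}
	c.callsBeforeFailure--
	fmt.Printf("deleted %d pieces in one request\n", len(ids))
	return nil
}

func main() {
	ctx := context.Background()
	batches := [][]string{{"a", "b"}, {"c", "d"}, {"e"}}
	client := &fakeClient{callsBeforeFailure: 1}

	// Mirrors the loop above: one request per batch, and the first error
	// aborts the whole node, since only authentication-style errors are
	// expected; the leftover pieces are left for the garbage collector.
	nodeSucceeded := true
	for _, batch := range batches {
		if err := client.DeletePieces(ctx, batch...); err != nil {
			nodeSucceeded = false
			break
		}
	}
	fmt.Println("node succeeded:", nodeSucceeded) // false: second batch failed
}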
@@ -160,14 +173,3 @@ type NodePieces struct {
// NodesPieces is a slice of NodePieces
type NodesPieces []NodePieces
// NumPieces sums the number of pieces of all the storage nodes of the slice and
// returns it.
func (nodes NodesPieces) NumPieces() int {
total := 0
for _, node := range nodes {
total += len(node.Pieces)
}
return total
}
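With the threshold sized by len(nodes), the per-request piece total is no longer needed, so the NumPieces helper goes away. A simplified stand-in for the data shape, showing what the task count is based on after this change (illustrative types, not the metainfo package's definitions):

package main

import "fmt"

// PieceID, NodePieces and NodesPieces mirror the shapes used above in a
// reduced, illustrative form.
type PieceID [32]byte

type NodePieces struct {
	NodeID string
	Pieces []PieceID
}

type NodesPieces []NodePieces

func main() {
	nodes := NodesPieces{
		{NodeID: "node-a", Pieces: make([]PieceID, 2500)}, // 3 batches of at most 1000
		{NodeID: "node-b", Pieces: make([]PieceID, 30)},   // 1 batch
	}
	// The success threshold is now sized by the number of nodes,
	// not by the total piece count the removed NumPieces reported.
	fmt.Println("threshold tasks:", len(nodes)) // 2
}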

View File

@@ -356,7 +356,8 @@ func TestDeletePiecesService_DeletePieces_Invalid(t *testing.T) {
t.Run("invalid threshold", func(t *testing.T) {
t.Parallel()
nodesPieces := metainfo.NodesPieces{
{Pieces: make([]storj.PieceID, 2)},
{Pieces: make([]storj.PieceID, 1)},
{Pieces: make([]storj.PieceID, 1)},
}
err := deletePiecesService.DeletePieces(ctx, nodesPieces, 1)
require.Error(t, err)