satellite/metainfo: Create service for deleting pieces

Create a service for deleting pieces of storage nodes.

Currently the DeletePieces method returns once the success threshold is
reached, all the deletions complete, or a timeout expires.

The end goal is to return as soon as the success threshold is reached,
leave the remaining goroutines running after the DeletePieces method
returns, and add a life cycle to the service so that it waits for them
when it closes.
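
As a rough sketch of that end goal (not part of this change; it uses only the
standard library sync.WaitGroup and a hypothetical running field), the service
could track its in-flight goroutines and drain them on shutdown:

type DeletePiecesService struct {
	// ... existing fields ...
	running sync.WaitGroup // tracks goroutines spawned by DeletePieces
}

func (service *DeletePiecesService) DeletePieces(ctx context.Context, nodes NodesPieces, successThreshold float64) error {
	// launch the deletion goroutines, registering each one with
	// service.running.Add(1) and defer service.running.Done()
	// ...
	// return to the caller once the success threshold is reached, leaving
	// the remaining goroutines running in the background
	return nil
}

func (service *DeletePiecesService) Close() error {
	// wait for every outstanding deletion goroutine before shutting down
	service.running.Wait()
	return nil
}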

This is the first commit for ticket:
https://storjlabs.atlassian.net/browse/V3-3476

Change-Id: If740bbf57c741f880449980b8176b036dd956c7b
Ivan Fraixedes 2020-01-09 18:09:22 +01:00
parent 02ed2b5a19
commit 6dc948da46
4 changed files with 672 additions and 99 deletions


@@ -50,6 +50,9 @@ import (
"storj.io/storj/satellite/vouchers"
)
// TODO: orange/v3-3406 this value may change once it's used in production
const metainfoDeletePiecesConcurrencyLimit = 100
// API is the satellite API process
//
// architecture: Peer
@@ -85,9 +88,10 @@ type API struct {
}
Metainfo struct {
Database metainfo.PointerDB
Service *metainfo.Service
Endpoint2 *metainfo.Endpoint
Database metainfo.PointerDB
Service *metainfo.Service
DeletePiecesService *metainfo.DeletePiecesService
Endpoint2 *metainfo.Endpoint
}
Inspector struct {
@@ -293,15 +297,25 @@ func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB, pointerDB metai
peer.Metainfo.Database,
peer.DB.Buckets(),
)
peer.Metainfo.DeletePiecesService, err = metainfo.NewDeletePiecesService(
peer.Log.Named("metainfo:DeletePiecesService"),
peer.Dialer,
metainfoDeletePiecesConcurrencyLimit,
)
if err != nil {
return nil, errs.Combine(err, peer.Close())
}
peer.Metainfo.Endpoint2 = metainfo.NewEndpoint(
peer.Log.Named("metainfo:endpoint"),
peer.Metainfo.Service,
peer.Metainfo.DeletePiecesService,
peer.Orders.Service,
peer.Overlay.Service,
peer.DB.Attribution(),
peer.Marketing.PartnersService,
peer.DB.PeerIdentities(),
peer.Dialer,
peer.DB.Console().APIKeys(),
peer.Accounting.ProjectUsage,
config.Metainfo.RS,
@@ -572,6 +586,9 @@ func (peer *API) Close() error {
if peer.Orders.Chore.Loop != nil {
errlist.Add(peer.Orders.Chore.Close())
}
if peer.Metainfo.DeletePiecesService != nil {
errlist.Add(peer.Metainfo.DeletePiecesService.Close())
}
return errlist.Err()
}


@@ -0,0 +1,173 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package metainfo
import (
"context"
"time"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/common/pb"
"storj.io/common/rpc"
"storj.io/common/storj"
"storj.io/common/sync2"
"storj.io/uplink/piecestore"
)
// ErrDeletePieces is the general error class for DeletePiecesService
var ErrDeletePieces = errs.Class("metainfo storage node service")
// DeletePiecesService is the metainfo service in charge of deleting pieces of
// storage nodes.
//
// architecture: Service
type DeletePiecesService struct {
log *zap.Logger
dialer rpc.Dialer
// TODO: v3-3406 this value is currently only used to limit the concurrent
// connections by each single method call.
maxConns int
}
// NewDeletePiecesService creates a new DeletePiecesService. maxConcurrentConns
// is the maximum number of connections that each single method call uses.
//
// It returns an error if maxConcurrentConns is less than or equal to 0, dialer is
// a zero value or log is nil.
func NewDeletePiecesService(log *zap.Logger, dialer rpc.Dialer, maxConcurrentConns int) (*DeletePiecesService, error) {
// TODO: v3-3476 should we have an upper limit?
if maxConcurrentConns <= 0 {
return nil, ErrDeletePieces.New(
"max concurrent connections must be greater than 0, got %d", maxConcurrentConns,
)
}
if dialer == (rpc.Dialer{}) {
return nil, ErrDeletePieces.New("%s", "dialer cannot be its zero value")
}
if log == nil {
return nil, ErrDeletePieces.New("%s", "logger cannot be nil")
}
return &DeletePiecesService{
maxConns: maxConcurrentConns,
dialer: dialer,
log: log,
}, nil
}
// DeletePieces deletes all the indicated pieces of the nodes which are online,
// stopping 200 milliseconds after reaching the successThreshold of the total
// number of pieces, or when all the deletion attempts have finished.
//
// It only returns an error if sync2.NewSuccessThreshold returns an error.
func (service *DeletePiecesService) DeletePieces(
ctx context.Context, nodes NodesPieces, successThreshold float64,
) error {
threshold, err := sync2.NewSuccessThreshold(nodes.NumPieces(), successThreshold)
if err != nil {
return err
}
// TODO: v3-3476 This timeout will go away in a second commit
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// TODO: v3-3406 this limiter will be global to the service instance if we
// decide to do so
limiter := sync2.NewLimiter(service.maxConns)
for _, n := range nodes {
node := n.Node
pieces := n.Pieces
limiter.Go(ctx, func() {
client, err := piecestore.Dial(
ctx, service.dialer, node, service.log, piecestore.Config{},
)
if err != nil {
service.log.Warn("unable to dial storage node",
zap.Stringer("node_id", node.Id),
zap.Stringer("node_info", node),
zap.Error(err),
)
// Mark all the pieces of this node as failure in the success threshold
for range pieces {
threshold.Failure()
}
// Pieces will be collected by garbage collector
return
}
defer func() {
err := client.Close()
if err != nil {
service.log.Warn("error closing the storage node client connection",
zap.Stringer("node_id", node.Id),
zap.Stringer("node_info", node),
zap.Error(err),
)
}
}()
for _, id := range pieces {
err := client.DeletePiece(ctx, id)
if err != nil {
// piece will be collected by garbage collector
service.log.Warn("unable to delete piece of a storage node",
zap.Stringer("node_id", node.Id),
zap.Stringer("piece_id", id),
zap.Error(err),
)
threshold.Failure()
continue
}
threshold.Success()
}
})
}
threshold.Wait(ctx)
// return to the client after the success threshold but wait some time before
// canceling the remaining deletes
timer := time.AfterFunc(200*time.Millisecond, cancel)
defer timer.Stop()
limiter.Wait()
return nil
}
// Close waits until all the resources used by the service are closed before
// returning.
func (service *DeletePiecesService) Close() error {
// TODO: orange/v3-3476 it will wait until all the goroutines spawned by
// DeletePieces finish rather than using the current timeout.
return nil
}
// NodePieces indicates a list of pieces that belong to a storage node.
type NodePieces struct {
Node *pb.Node
Pieces []storj.PieceID
}
// NodesPieces is a slice of NodePieces
type NodesPieces []NodePieces
// NumPieces sums the number of pieces of all the storage nodes of the slice and
// returns it.
func (nodes NodesPieces) NumPieces() int {
total := 0
for _, node := range nodes {
total += len(node.Pieces)
}
return total
}
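
Putting the pieces of this file together, a caller would use the service roughly
as follows (a minimal sketch: deleteFromNodes, the 100-connection limit, and the
0.75 threshold are illustrative, and construction of the logger and dialer is
omitted; the remaining identifiers come from the code above):

func deleteFromNodes(ctx context.Context, log *zap.Logger, dialer rpc.Dialer, node *pb.Node, pieceIDs []storj.PieceID) (err error) {
	// limit each DeletePieces call to at most 100 concurrent node connections
	service, err := metainfo.NewDeletePiecesService(log, dialer, 100)
	if err != nil {
		return err
	}
	defer func() { err = errs.Combine(err, service.Close()) }()

	// group the pieces by the storage node that holds them
	nodesPieces := metainfo.NodesPieces{
		{Node: node, Pieces: pieceIDs},
	}

	// return once 75% of the pieces are deleted or every attempt has finished
	return service.DeletePieces(ctx, nodesPieces, 0.75)
}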


@@ -0,0 +1,466 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package metainfo_test
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
"storj.io/common/memory"
"storj.io/common/rpc"
"storj.io/common/storj"
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/storj/cmd/uplink/cmd"
"storj.io/storj/private/testplanet"
"storj.io/storj/satellite/metainfo"
"storj.io/storj/storagenode/pieces"
)
func TestNewDeletePiecesService(t *testing.T) {
type params struct {
maxConcurrentConns int
dialer rpc.Dialer
log *zap.Logger
}
var testCases = []struct {
desc string
args params
errMsg string
}{
{
desc: "ok",
args: params{
maxConcurrentConns: 10,
dialer: rpc.NewDefaultDialer(nil),
log: zaptest.NewLogger(t),
},
},
{
desc: "error: 0 maxConcurrentCons",
args: params{
maxConcurrentConns: 0,
dialer: rpc.NewDefaultDialer(nil),
log: zaptest.NewLogger(t),
},
errMsg: "greater than 0",
},
{
desc: "error: negative maxConcurrentCons",
args: params{
maxConcurrentConns: -3,
dialer: rpc.NewDefaultDialer(nil),
log: zaptest.NewLogger(t),
},
errMsg: "greater than 0",
},
{
desc: "error: zero dialer",
args: params{
maxConcurrentConns: 87,
dialer: rpc.Dialer{},
log: zaptest.NewLogger(t),
},
errMsg: "dialer cannot be its zero value",
},
{
desc: "error: nil logger",
args: params{
maxConcurrentConns: 2,
dialer: rpc.NewDefaultDialer(nil),
log: nil,
},
errMsg: "logger cannot be nil",
},
}
for _, tc := range testCases {
tc := tc
t.Run(tc.desc, func(t *testing.T) {
t.Parallel()
svc, err := metainfo.NewDeletePiecesService(tc.args.log, tc.args.dialer, tc.args.maxConcurrentConns)
if tc.errMsg != "" {
require.Error(t, err)
require.True(t, metainfo.ErrDeletePieces.Has(err), "unexpected error class")
require.Contains(t, err.Error(), tc.errMsg)
return
}
require.NoError(t, err)
require.NotNil(t, svc)
})
}
}
func TestDeletePiecesService_DeletePieces(t *testing.T) {
t.Run("all nodes up", func(t *testing.T) {
t.Parallel()
ctx := testcontext.New(t)
defer ctx.Cleanup()
planet, err := testplanet.New(t, 1, 4, 1)
require.NoError(t, err)
defer ctx.Check(planet.Shutdown)
planet.Start(ctx)
var (
uplnk = planet.Uplinks[0]
satelliteSys = planet.Satellites[0]
)
{
data := testrand.Bytes(10 * memory.KiB)
// Use RSConfig for ensuring that we don't have long-tail cancellations
// and the upload doesn't leave garbage in the SNs
err = uplnk.UploadWithClientConfig(ctx, satelliteSys, cmd.Config{
Client: cmd.ClientConfig{
SegmentSize: 10 * memory.KiB,
},
RS: cmd.RSConfig{
MinThreshold: 2,
RepairThreshold: 2,
SuccessThreshold: 4,
MaxThreshold: 4,
},
},
"a-bucket", "object-filename", data,
)
require.NoError(t, err)
}
var (
totalUsedSpace int64
nodesPieces metainfo.NodesPieces
)
for _, sn := range planet.StorageNodes {
// calculate the SNs total used space after data upload
usedSpace, err := sn.Storage2.Store.SpaceUsedForPieces(ctx)
require.NoError(t, err)
totalUsedSpace += usedSpace
// Get pb node and all the pieces of the storage node
dossier, err := satelliteSys.Overlay.Service.Get(ctx, sn.ID())
require.NoError(t, err)
nodePieces := metainfo.NodePieces{
Node: &dossier.Node,
}
err = sn.Storage2.Store.WalkSatellitePieces(ctx, satelliteSys.ID(),
func(store pieces.StoredPieceAccess) error {
nodePieces.Pieces = append(nodePieces.Pieces, store.PieceID())
return nil
},
)
require.NoError(t, err)
nodesPieces = append(nodesPieces, nodePieces)
}
err = satelliteSys.API.Metainfo.DeletePiecesService.DeletePieces(ctx, nodesPieces, 0.75)
require.NoError(t, err)
// calculate the SNs used space after deleting the pieces
var totalUsedSpaceAfterDelete int64
for _, sn := range planet.StorageNodes {
usedSpace, err := sn.Storage2.Store.SpaceUsedForPieces(ctx)
require.NoError(t, err)
totalUsedSpaceAfterDelete += usedSpace
}
// At this point we can only guarantee that 75% of the SNs pieces
// are deleted due to the success threshold
deletedUsedSpace := float64(totalUsedSpace-totalUsedSpaceAfterDelete) / float64(totalUsedSpace)
if deletedUsedSpace < 0.75 {
t.Fatalf("deleted used space is less than 0.75%%. Got %f", deletedUsedSpace)
}
})
t.Run("some nodes down", func(t *testing.T) {
t.Parallel()
ctx := testcontext.New(t)
defer ctx.Cleanup()
planet, err := testplanet.New(t, 1, 4, 1)
require.NoError(t, err)
defer ctx.Check(planet.Shutdown)
planet.Start(ctx)
var (
uplnk = planet.Uplinks[0]
satelliteSys = planet.Satellites[0]
)
{
data := testrand.Bytes(10 * memory.KiB)
// Use RSConfig for ensuring that we don't have long-tail cancellations
// and the upload doesn't leave garbage in the SNs
err = uplnk.UploadWithClientConfig(ctx, satelliteSys, cmd.Config{
Client: cmd.ClientConfig{
SegmentSize: 10 * memory.KiB,
},
RS: cmd.RSConfig{
MinThreshold: 2,
RepairThreshold: 2,
SuccessThreshold: 4,
MaxThreshold: 4,
},
},
"a-bucket", "object-filename", data,
)
require.NoError(t, err)
}
var nodesPieces metainfo.NodesPieces
for i, sn := range planet.StorageNodes {
// Get pb node and all the pieces of the storage node
dossier, err := satelliteSys.Overlay.Service.Get(ctx, sn.ID())
require.NoError(t, err)
nodePieces := metainfo.NodePieces{
Node: &dossier.Node,
}
err = sn.Storage2.Store.WalkSatellitePieces(ctx, satelliteSys.ID(),
func(store pieces.StoredPieceAccess) error {
nodePieces.Pieces = append(nodePieces.Pieces, store.PieceID())
return nil
},
)
require.NoError(t, err)
nodesPieces = append(nodesPieces, nodePieces)
// stop the first 2 SNs before deleting pieces
if i < 2 {
require.NoError(t, planet.StopPeer(sn))
}
}
err = satelliteSys.API.Metainfo.DeletePiecesService.DeletePieces(ctx, nodesPieces, 0.9999)
require.NoError(t, err)
// Check that storage nodes which are online when deleting pieces don't
// hold any piece
var totalUsedSpace int64
for i := 2; i < len(planet.StorageNodes); i++ {
usedSpace, err := planet.StorageNodes[i].Storage2.Store.SpaceUsedForPieces(ctx)
require.NoError(t, err)
totalUsedSpace += usedSpace
}
require.Zero(t, totalUsedSpace, "totalUsedSpace online nodes")
})
t.Run("all nodes down", func(t *testing.T) {
t.Parallel()
ctx := testcontext.New(t)
defer ctx.Cleanup()
planet, err := testplanet.New(t, 1, 4, 1)
require.NoError(t, err)
defer ctx.Check(planet.Shutdown)
planet.Start(ctx)
var (
uplnk = planet.Uplinks[0]
satelliteSys = planet.Satellites[0]
)
{
data := testrand.Bytes(10 * memory.KiB)
// Use RSConfig for ensuring that we don't have long-tail cancellations
// and the upload doesn't leave garbage in the SNs
err = uplnk.UploadWithClientConfig(ctx, satelliteSys, cmd.Config{
Client: cmd.ClientConfig{
SegmentSize: 10 * memory.KiB,
},
RS: cmd.RSConfig{
MinThreshold: 2,
RepairThreshold: 2,
SuccessThreshold: 4,
MaxThreshold: 4,
},
},
"a-bucket", "object-filename", data,
)
require.NoError(t, err)
}
var (
expectedTotalUsedSpace int64
nodesPieces metainfo.NodesPieces
)
for _, sn := range planet.StorageNodes {
// calculate the SNs total used space after data upload
usedSpace, err := sn.Storage2.Store.SpaceUsedForPieces(ctx)
require.NoError(t, err)
expectedTotalUsedSpace += usedSpace
// Get pb node and all the pieces of the storage node
dossier, err := satelliteSys.Overlay.Service.Get(ctx, sn.ID())
require.NoError(t, err)
nodePieces := metainfo.NodePieces{
Node: &dossier.Node,
}
err = sn.Storage2.Store.WalkSatellitePieces(ctx, satelliteSys.ID(),
func(store pieces.StoredPieceAccess) error {
nodePieces.Pieces = append(nodePieces.Pieces, store.PieceID())
return nil
},
)
require.NoError(t, err)
nodesPieces = append(nodesPieces, nodePieces)
require.NoError(t, planet.StopPeer(sn))
}
err = satelliteSys.API.Metainfo.DeletePiecesService.DeletePieces(ctx, nodesPieces, 0.9999)
require.NoError(t, err)
var totalUsedSpace int64
for _, sn := range planet.StorageNodes {
// calculate the SNs total used space after the delete requests
usedSpace, err := sn.Storage2.Store.SpaceUsedForPieces(ctx)
require.NoError(t, err)
totalUsedSpace += usedSpace
}
require.Equal(t, expectedTotalUsedSpace, totalUsedSpace, "totalUsedSpace")
})
t.Run("invalid dialer", func(t *testing.T) {
t.Parallel()
ctx := testcontext.New(t)
defer ctx.Cleanup()
planet, err := testplanet.New(t, 1, 4, 1)
require.NoError(t, err)
defer ctx.Check(planet.Shutdown)
planet.Start(ctx)
var (
uplnk = planet.Uplinks[0]
satelliteSys = planet.Satellites[0]
)
{
data := testrand.Bytes(10 * memory.KiB)
// Use RSConfig for ensuring that we don't have long-tail cancellations
// and the upload doesn't leave garbage in the SNs
err = uplnk.UploadWithClientConfig(ctx, satelliteSys, cmd.Config{
Client: cmd.ClientConfig{
SegmentSize: 10 * memory.KiB,
},
RS: cmd.RSConfig{
MinThreshold: 2,
RepairThreshold: 2,
SuccessThreshold: 4,
MaxThreshold: 4,
},
},
"a-bucket", "object-filename", data,
)
require.NoError(t, err)
}
var (
expectedTotalUsedSpace int64
nodesPieces metainfo.NodesPieces
)
for _, sn := range planet.StorageNodes {
// calculate the SNs total used space after data upload
usedSpace, err := sn.Storage2.Store.SpaceUsedForPieces(ctx)
require.NoError(t, err)
expectedTotalUsedSpace += usedSpace
// Get pb node and all the pieces of the storage node
dossier, err := satelliteSys.Overlay.Service.Get(ctx, sn.ID())
require.NoError(t, err)
nodePieces := metainfo.NodePieces{
Node: &dossier.Node,
}
err = sn.Storage2.Store.WalkSatellitePieces(ctx, satelliteSys.ID(),
func(store pieces.StoredPieceAccess) error {
nodePieces.Pieces = append(nodePieces.Pieces, store.PieceID())
return nil
},
)
require.NoError(t, err)
nodesPieces = append(nodesPieces, nodePieces)
}
// The passed dialer cannot dial nodes because it doesn't have TLSOptions
dialer := satelliteSys.API.Dialer
dialer.TLSOptions = nil
service, err := metainfo.NewDeletePiecesService(
zaptest.NewLogger(t), dialer, len(nodesPieces)-1,
)
require.NoError(t, err)
err = service.DeletePieces(ctx, nodesPieces, 0.75)
require.NoError(t, err)
var totalUsedSpaceAfterDelete int64
for _, sn := range planet.StorageNodes {
usedSpace, err := sn.Storage2.Store.SpaceUsedForPieces(ctx)
require.NoError(t, err)
totalUsedSpaceAfterDelete += usedSpace
}
// because no node can be dialed the SNs used space should be the same
require.Equal(t, expectedTotalUsedSpace, totalUsedSpaceAfterDelete)
})
t.Run("empty nodes pieces", func(t *testing.T) {
t.Parallel()
ctx := testcontext.New(t)
defer ctx.Cleanup()
planet, err := testplanet.New(t, 1, 4, 1)
require.NoError(t, err)
defer ctx.Check(planet.Shutdown)
planet.Start(ctx)
err = planet.Satellites[0].API.Metainfo.DeletePiecesService.DeletePieces(ctx, metainfo.NodesPieces{}, 0.75)
require.Error(t, err)
assert.False(t, metainfo.ErrDeletePieces.Has(err), "unexpected error class")
assert.Contains(t, err.Error(), "invalid number of tasks")
})
t.Run("invalid threshold", func(t *testing.T) {
t.Parallel()
ctx := testcontext.New(t)
defer ctx.Cleanup()
planet, err := testplanet.New(t, 1, 4, 1)
require.NoError(t, err)
defer ctx.Check(planet.Shutdown)
planet.Start(ctx)
nodesPieces := make(metainfo.NodesPieces, 1)
nodesPieces[0].Pieces = make([]storj.PieceID, 2)
err = planet.Satellites[0].API.Metainfo.DeletePiecesService.DeletePieces(ctx, nodesPieces, 1)
require.Error(t, err)
assert.False(t, metainfo.ErrDeletePieces.Has(err), "unexpected error class")
assert.Contains(t, err.Error(), "invalid successThreshold")
})
}


@@ -21,11 +21,9 @@ import (
"storj.io/common/errs2"
"storj.io/common/identity"
"storj.io/common/pb"
"storj.io/common/rpc"
"storj.io/common/rpc/rpcstatus"
"storj.io/common/signing"
"storj.io/common/storj"
"storj.io/common/sync2"
"storj.io/storj/pkg/macaroon"
"storj.io/storj/private/context2"
"storj.io/storj/private/dbutil"
@@ -36,7 +34,6 @@ import (
"storj.io/storj/satellite/overlay"
"storj.io/storj/satellite/rewards"
"storj.io/uplink/eestream"
"storj.io/uplink/piecestore"
"storj.io/uplink/storage/meta"
)
@@ -46,8 +43,6 @@ const (
lastSegment = -1
listLimit = 1000
// TODO: orange/v3-3406 this value may change once it's used in production
deleteObjectPiecesConcurrencyLimit = 100
deleteObjectPiecesSuccessThreshold = 0.75
)
@@ -79,13 +74,13 @@ type Revocations interface {
type Endpoint struct {
log *zap.Logger
metainfo *Service
deletePieces *DeletePiecesService
orders *orders.Service
overlay *overlay.Service
attributions attribution.DB
partners *rewards.PartnersService
peerIdentities overlay.PeerIdentities
projectUsage *accounting.Service
dialer rpc.Dialer
apiKeys APIKeys
createRequests *createRequests
requiredRSConfig RSConfig
@@ -94,19 +89,21 @@ }
}
// NewEndpoint creates new metainfo endpoint instance.
func NewEndpoint(log *zap.Logger, metainfo *Service, orders *orders.Service, cache *overlay.Service,
attributions attribution.DB, partners *rewards.PartnersService, peerIdentities overlay.PeerIdentities,
dialer rpc.Dialer, apiKeys APIKeys, projectUsage *accounting.Service, rsConfig RSConfig, satellite signing.Signer, maxCommitInterval time.Duration) *Endpoint {
func NewEndpoint(log *zap.Logger, metainfo *Service, deletePieces *DeletePiecesService,
orders *orders.Service, cache *overlay.Service, attributions attribution.DB,
partners *rewards.PartnersService, peerIdentities overlay.PeerIdentities,
apiKeys APIKeys, projectUsage *accounting.Service, rsConfig RSConfig,
satellite signing.Signer, maxCommitInterval time.Duration) *Endpoint {
// TODO do something with too many params
return &Endpoint{
log: log,
metainfo: metainfo,
deletePieces: deletePieces,
orders: orders,
overlay: cache,
attributions: attributions,
partners: partners,
peerIdentities: peerIdentities,
dialer: dialer,
apiKeys: apiKeys,
projectUsage: projectUsage,
createRequests: newCreateRequests(),
@@ -2386,95 +2383,15 @@ func (endpoint *Endpoint) DeleteObjectPieces(
return nil
}
var successThreshold *sync2.SuccessThreshold
{
var numPieces int
for _, node := range nodes {
numPieces += len(nodesPieces[node.Id])
}
var err error
successThreshold, err = sync2.NewSuccessThreshold(numPieces, deleteObjectPiecesSuccessThreshold)
if err != nil {
endpoint.log.Error("error creating success threshold",
zap.Int("num_tasks", numPieces),
zap.Float32("success_threshold", deleteObjectPiecesSuccessThreshold),
zap.Error(err),
)
return rpcstatus.Errorf(rpcstatus.Internal,
"error creating success threshold: %+v", err.Error(),
)
}
}
// TODO: v3-3476 This timeout will go away when the service is implemented
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// TODO: v3-3406 Should we use a global limiter?
limiter := sync2.NewLimiter(deleteObjectPiecesConcurrencyLimit)
var nodesPiecesList NodesPieces
for _, node := range nodes {
node := node
nodePieces := nodesPieces[node.Id]
limiter.Go(ctx, func() {
client, err := piecestore.Dial(
ctx, endpoint.dialer, node, endpoint.log, piecestore.Config{},
)
if err != nil {
endpoint.log.Warn("unable to dial storage node",
zap.Stringer("node_id", node.Id),
zap.Stringer("node_info", node),
zap.Error(err),
)
// Mark all the pieces of this node as failure in the success threshold
for range nodePieces {
successThreshold.Failure()
}
// Pieces will be collected by garbage collector
return
}
defer func() {
err := client.Close()
if err != nil {
endpoint.log.Warn("error closing the storage node client connection",
zap.Stringer("node_id", node.Id),
zap.Stringer("node_info", node),
zap.Error(err),
)
}
}()
for _, pieceID := range nodePieces {
err := client.DeletePiece(ctx, pieceID)
if err != nil {
// piece will be collected by garbage collector
endpoint.log.Warn("unable to delete piece of a storage node",
zap.Stringer("node_id", node.Id),
zap.Stringer("piece_id", pieceID),
zap.Error(err),
)
successThreshold.Failure()
continue
}
successThreshold.Success()
}
nodesPiecesList = append(nodesPiecesList, NodePieces{
Node: node,
Pieces: nodesPieces[node.Id],
})
}
successThreshold.Wait(ctx)
// return to the client after the success threshold but wait some time before
// canceling the remaining deletes
timer := time.AfterFunc(200*time.Millisecond, cancel)
defer timer.Stop()
limiter.Wait()
return nil
return endpoint.deletePieces.DeletePieces(ctx, nodesPiecesList, deleteObjectPiecesSuccessThreshold)
}
// deletePointer deletes a pointer returning the deleted pointer.
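
For readability, the body of DeleteObjectPieces that survives this change reduces
to collecting the pieces per node and delegating to the new service, roughly:

	var nodesPiecesList NodesPieces
	for _, node := range nodes {
		nodesPiecesList = append(nodesPiecesList, NodePieces{
			Node:   node,
			Pieces: nodesPieces[node.Id],
		})
	}
	// the dialing, concurrency limit, success threshold, and timeout handling
	// now live in DeletePiecesService
	return endpoint.deletePieces.DeletePieces(ctx, nodesPiecesList, deleteObjectPiecesSuccessThreshold)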