From 6dc948da46cdaa63c687bbe8a7794f8a63c457bc Mon Sep 17 00:00:00 2001
From: Ivan Fraixedes
Date: Thu, 9 Jan 2020 18:09:22 +0100
Subject: [PATCH] satellite/metainfo: Create service for deleting pieces

Create a service for deleting pieces of storage nodes.

Currently the DeletePieces method returns after reaching the success
threshold, after all the deletes have been attempted, or after a timeout.

The end goal is to return as soon as the success threshold is reached,
leave the remaining goroutines running after DeletePieces returns, and
add a life cycle to the service so that it waits for those goroutines
when it closes.

This is the first commit for ticket:
https://storjlabs.atlassian.net/browse/V3-3476

Change-Id: If740bbf57c741f880449980b8176b036dd956c7b
---
 satellite/api.go                             |  25 +-
 satellite/metainfo/delete_pieces_service.go  | 173 +++++
 .../metainfo/delete_pieces_service_test.go   | 466 ++++++++++++++++++
 satellite/metainfo/metainfo.go               | 107 +---
 4 files changed, 672 insertions(+), 99 deletions(-)
 create mode 100644 satellite/metainfo/delete_pieces_service.go
 create mode 100644 satellite/metainfo/delete_pieces_service_test.go

diff --git a/satellite/api.go b/satellite/api.go
index 45a0e8c4c..6617d4a00 100644
--- a/satellite/api.go
+++ b/satellite/api.go
@@ -50,6 +50,9 @@ import (
 	"storj.io/storj/satellite/vouchers"
 )
 
+// TODO: orange/v3-3406 this value may change once it's used in production
+const metainfoDeletePiecesConcurrencyLimit = 100
+
 // API is the satellite API process
 //
 // architecture: Peer
@@ -85,9 +88,10 @@ type API struct {
 	}
 
 	Metainfo struct {
-		Database  metainfo.PointerDB
-		Service   *metainfo.Service
-		Endpoint2 *metainfo.Endpoint
+		Database            metainfo.PointerDB
+		Service             *metainfo.Service
+		DeletePiecesService *metainfo.DeletePiecesService
+		Endpoint2           *metainfo.Endpoint
 	}
 
 	Inspector struct {
@@ -293,15 +297,25 @@ func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB, pointerDB metai
 			peer.Metainfo.Database,
 			peer.DB.Buckets(),
 		)
+
+		peer.Metainfo.DeletePiecesService, err = metainfo.NewDeletePiecesService(
+			peer.Log.Named("metainfo:DeletePiecesService"),
+			peer.Dialer,
+			metainfoDeletePiecesConcurrencyLimit,
+		)
+		if err != nil {
+			return nil, errs.Combine(err, peer.Close())
+		}
+
 		peer.Metainfo.Endpoint2 = metainfo.NewEndpoint(
 			peer.Log.Named("metainfo:endpoint"),
 			peer.Metainfo.Service,
+			peer.Metainfo.DeletePiecesService,
 			peer.Orders.Service,
 			peer.Overlay.Service,
 			peer.DB.Attribution(),
 			peer.Marketing.PartnersService,
 			peer.DB.PeerIdentities(),
-			peer.Dialer,
 			peer.DB.Console().APIKeys(),
 			peer.Accounting.ProjectUsage,
 			config.Metainfo.RS,
@@ -572,6 +586,9 @@ func (peer *API) Close() error {
 	if peer.Orders.Chore.Loop != nil {
 		errlist.Add(peer.Orders.Chore.Close())
 	}
+	if peer.Metainfo.DeletePiecesService != nil {
+		errlist.Add(peer.Metainfo.DeletePiecesService.Close())
+	}
 	return errlist.Err()
 }
 
diff --git a/satellite/metainfo/delete_pieces_service.go b/satellite/metainfo/delete_pieces_service.go
new file mode 100644
index 000000000..6bc037d54
--- /dev/null
+++ b/satellite/metainfo/delete_pieces_service.go
@@ -0,0 +1,173 @@
+// Copyright (C) 2020 Storj Labs, Inc.
+// See LICENSE for copying information.
+
+package metainfo
+
+import (
+	"context"
+	"time"
+
+	"github.com/zeebo/errs"
+	"go.uber.org/zap"
+
+	"storj.io/common/pb"
+	"storj.io/common/rpc"
+	"storj.io/common/storj"
+	"storj.io/common/sync2"
+	"storj.io/uplink/piecestore"
+)
+
+// ErrDeletePieces is the general error class for DeletePiecesService.
+var ErrDeletePieces = errs.Class("metainfo storage node service")
+
+// DeletePiecesService is the metainfo service in charge of deleting pieces of
+// storage nodes.
+//
+// architecture: Service
+type DeletePiecesService struct {
+	log    *zap.Logger
+	dialer rpc.Dialer
+
+	// TODO: v3-3406 this value is currently only used to limit the concurrent
+	// connections of each single method call.
+	maxConns int
+}
+
+// NewDeletePiecesService creates a new DeletePiecesService. maxConcurrentConns
+// is the maximum number of connections that each single method call uses.
+//
+// It returns an error if maxConcurrentConns is less than or equal to 0, dialer
+// is a zero value, or log is nil.
+func NewDeletePiecesService(log *zap.Logger, dialer rpc.Dialer, maxConcurrentConns int) (*DeletePiecesService, error) {
+	// TODO: v3-3476 should we have an upper limit?
+	if maxConcurrentConns <= 0 {
+		return nil, ErrDeletePieces.New(
+			"max concurrent connections must be greater than 0, got %d", maxConcurrentConns,
+		)
+	}
+
+	if dialer == (rpc.Dialer{}) {
+		return nil, ErrDeletePieces.New("%s", "dialer cannot be its zero value")
+	}
+
+	if log == nil {
+		return nil, ErrDeletePieces.New("%s", "logger cannot be nil")
+	}
+
+	return &DeletePiecesService{
+		maxConns: maxConcurrentConns,
+		dialer:   dialer,
+		log:      log,
+	}, nil
+}
+
+// DeletePieces deletes the indicated pieces from the nodes which are online.
+// It stops waiting for the remaining deletes about 200 milliseconds after the
+// successThreshold fraction of the total number of pieces has been reached;
+// otherwise it returns once all the deletes have been attempted.
+//
+// It only returns an error if sync2.NewSuccessThreshold returns an error.
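+//
+// Illustrative call (a sketch, not part of this patch; nodesPieces and the
+// 0.75 threshold are placeholder values chosen by the caller):
+//
+//	if err := service.DeletePieces(ctx, nodesPieces, 0.75); err != nil {
+//		return err // the threshold arguments were invalid
+//	}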
+func (service *DeletePiecesService) DeletePieces(
+	ctx context.Context, nodes NodesPieces, successThreshold float64,
+) error {
+	threshold, err := sync2.NewSuccessThreshold(nodes.NumPieces(), successThreshold)
+	if err != nil {
+		return err
+	}
+
+	// TODO: v3-3476 This timeout will go away in a second commit
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
+	// TODO: v3-3406 this limiter will be global to the service instance if we
+	// decide to do so
+	limiter := sync2.NewLimiter(service.maxConns)
+	for _, n := range nodes {
+		node := n.Node
+		pieces := n.Pieces
+
+		limiter.Go(ctx, func() {
+			client, err := piecestore.Dial(
+				ctx, service.dialer, node, service.log, piecestore.Config{},
+			)
+			if err != nil {
+				service.log.Warn("unable to dial storage node",
+					zap.Stringer("node_id", node.Id),
+					zap.Stringer("node_info", node),
+					zap.Error(err),
+				)
+
+				// Mark all the pieces of this node as failed in the success threshold
+				for range pieces {
+					threshold.Failure()
+				}
+
+				// Pieces will be collected by the garbage collector
+				return
+			}
+			defer func() {
+				err := client.Close()
+				if err != nil {
+					service.log.Warn("error closing the storage node client connection",
+						zap.Stringer("node_id", node.Id),
+						zap.Stringer("node_info", node),
+						zap.Error(err),
+					)
+				}
+			}()
+
+			for _, id := range pieces {
+				err := client.DeletePiece(ctx, id)
+				if err != nil {
+					// The piece will be collected by the garbage collector
+					service.log.Warn("unable to delete piece of a storage node",
+						zap.Stringer("node_id", node.Id),
+						zap.Stringer("piece_id", id),
+						zap.Error(err),
+					)
+
+					threshold.Failure()
+					continue
+				}
+
+				threshold.Success()
+			}
+		})
+	}
+
+	threshold.Wait(ctx)
+	// Return to the client once the success threshold is reached, but wait some
+	// time before canceling the remaining deletes.
+	timer := time.AfterFunc(200*time.Millisecond, cancel)
+	defer timer.Stop()
+
+	limiter.Wait()
+	return nil
+}
+
+// Close waits until all the resources used by the service are closed before
+// returning.
+func (service *DeletePiecesService) Close() error {
+	// TODO: orange/v3-3476 this will wait until all the goroutines run by
+	// DeletePieces finish, rather than relying on the current timeout.
+	return nil
+}
+
+// NodePieces indicates a list of pieces that belong to a storage node.
+type NodePieces struct {
+	Node   *pb.Node
+	Pieces []storj.PieceID
+}
+
+// NodesPieces is a slice of NodePieces.
+type NodesPieces []NodePieces
+
+// NumPieces sums the number of pieces of all the storage nodes of the slice and
+// returns it.
+func (nodes NodesPieces) NumPieces() int {
+	total := 0
+	for _, node := range nodes {
+		total += len(node.Pieces)
+	}
+
+	return total
+}
diff --git a/satellite/metainfo/delete_pieces_service_test.go b/satellite/metainfo/delete_pieces_service_test.go
new file mode 100644
index 000000000..7d02b8ef2
--- /dev/null
+++ b/satellite/metainfo/delete_pieces_service_test.go
@@ -0,0 +1,466 @@
+// Copyright (C) 2020 Storj Labs, Inc.
+// See LICENSE for copying information.
+ +package metainfo_test + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + "go.uber.org/zap/zaptest" + + "storj.io/common/memory" + "storj.io/common/rpc" + "storj.io/common/storj" + "storj.io/common/testcontext" + "storj.io/common/testrand" + "storj.io/storj/cmd/uplink/cmd" + "storj.io/storj/private/testplanet" + "storj.io/storj/satellite/metainfo" + "storj.io/storj/storagenode/pieces" +) + +func TestNewDeletePiecesService(t *testing.T) { + type params struct { + maxConcurrentConns int + dialer rpc.Dialer + log *zap.Logger + } + var testCases = []struct { + desc string + args params + errMsg string + }{ + { + desc: "ok", + args: params{ + maxConcurrentConns: 10, + dialer: rpc.NewDefaultDialer(nil), + log: zaptest.NewLogger(t), + }, + }, + { + desc: "error: 0 maxConcurrentCons", + args: params{ + maxConcurrentConns: 0, + dialer: rpc.NewDefaultDialer(nil), + log: zaptest.NewLogger(t), + }, + errMsg: "greater than 0", + }, + { + desc: "error: negative maxConcurrentCons", + args: params{ + maxConcurrentConns: -3, + dialer: rpc.NewDefaultDialer(nil), + log: zaptest.NewLogger(t), + }, + errMsg: "greater than 0", + }, + { + desc: "error: zero dialer", + args: params{ + maxConcurrentConns: 87, + dialer: rpc.Dialer{}, + log: zaptest.NewLogger(t), + }, + errMsg: "dialer cannot be its zero value", + }, + { + desc: "error: nil logger", + args: params{ + maxConcurrentConns: 2, + dialer: rpc.NewDefaultDialer(nil), + log: nil, + }, + errMsg: "logger cannot be nil", + }, + } + + for _, tc := range testCases { + tc := tc + t.Run(tc.desc, func(t *testing.T) { + t.Parallel() + + svc, err := metainfo.NewDeletePiecesService(tc.args.log, tc.args.dialer, tc.args.maxConcurrentConns) + if tc.errMsg != "" { + require.Error(t, err) + require.True(t, metainfo.ErrDeletePieces.Has(err), "unexpected error class") + require.Contains(t, err.Error(), tc.errMsg) + return + } + + require.NoError(t, err) + require.NotNil(t, svc) + + }) + } +} + +func TestDeletePiecesService_DeletePieces(t *testing.T) { + t.Run("all nodes up", func(t *testing.T) { + t.Parallel() + + ctx := testcontext.New(t) + defer ctx.Cleanup() + + planet, err := testplanet.New(t, 1, 4, 1) + require.NoError(t, err) + defer ctx.Check(planet.Shutdown) + planet.Start(ctx) + + var ( + uplnk = planet.Uplinks[0] + satelliteSys = planet.Satellites[0] + ) + + { + data := testrand.Bytes(10 * memory.KiB) + // Use RSConfig for ensuring that we don't have long-tail cancellations + // and the upload doesn't leave garbage in the SNs + err = uplnk.UploadWithClientConfig(ctx, satelliteSys, cmd.Config{ + Client: cmd.ClientConfig{ + SegmentSize: 10 * memory.KiB, + }, + RS: cmd.RSConfig{ + MinThreshold: 2, + RepairThreshold: 2, + SuccessThreshold: 4, + MaxThreshold: 4, + }, + }, + "a-bucket", "object-filename", data, + ) + require.NoError(t, err) + } + + var ( + totalUsedSpace int64 + nodesPieces metainfo.NodesPieces + ) + for _, sn := range planet.StorageNodes { + // calculate the SNs total used space after data upload + usedSpace, err := sn.Storage2.Store.SpaceUsedForPieces(ctx) + require.NoError(t, err) + totalUsedSpace += usedSpace + + // Get pb node and all the pieces of the storage node + dossier, err := satelliteSys.Overlay.Service.Get(ctx, sn.ID()) + require.NoError(t, err) + + nodePieces := metainfo.NodePieces{ + Node: &dossier.Node, + } + + err = sn.Storage2.Store.WalkSatellitePieces(ctx, satelliteSys.ID(), + func(store pieces.StoredPieceAccess) error { + nodePieces.Pieces = 
append(nodePieces.Pieces, store.PieceID()) + return nil + }, + ) + require.NoError(t, err) + + nodesPieces = append(nodesPieces, nodePieces) + } + + err = satelliteSys.API.Metainfo.DeletePiecesService.DeletePieces(ctx, nodesPieces, 0.75) + require.NoError(t, err) + + // calculate the SNs used space after delete the pieces + var totalUsedSpaceAfterDelete int64 + for _, sn := range planet.StorageNodes { + usedSpace, err := sn.Storage2.Store.SpaceUsedForPieces(ctx) + require.NoError(t, err) + totalUsedSpaceAfterDelete += usedSpace + } + + // At this point we can only guarantee that the 75% of the SNs pieces + // are delete due to the success threshold + deletedUsedSpace := float64(totalUsedSpace-totalUsedSpaceAfterDelete) / float64(totalUsedSpace) + if deletedUsedSpace < 0.75 { + t.Fatalf("deleted used space is less than 0.75%%. Got %f", deletedUsedSpace) + } + }) + + t.Run("some nodes down", func(t *testing.T) { + t.Parallel() + + ctx := testcontext.New(t) + defer ctx.Cleanup() + + planet, err := testplanet.New(t, 1, 4, 1) + require.NoError(t, err) + defer ctx.Check(planet.Shutdown) + planet.Start(ctx) + + var ( + uplnk = planet.Uplinks[0] + satelliteSys = planet.Satellites[0] + ) + + { + data := testrand.Bytes(10 * memory.KiB) + // Use RSConfig for ensuring that we don't have long-tail cancellations + // and the upload doesn't leave garbage in the SNs + err = uplnk.UploadWithClientConfig(ctx, satelliteSys, cmd.Config{ + Client: cmd.ClientConfig{ + SegmentSize: 10 * memory.KiB, + }, + RS: cmd.RSConfig{ + MinThreshold: 2, + RepairThreshold: 2, + SuccessThreshold: 4, + MaxThreshold: 4, + }, + }, + "a-bucket", "object-filename", data, + ) + require.NoError(t, err) + } + + var nodesPieces metainfo.NodesPieces + for i, sn := range planet.StorageNodes { + // Get pb node and all the pieces of the storage node + dossier, err := satelliteSys.Overlay.Service.Get(ctx, sn.ID()) + require.NoError(t, err) + + nodePieces := metainfo.NodePieces{ + Node: &dossier.Node, + } + + err = sn.Storage2.Store.WalkSatellitePieces(ctx, satelliteSys.ID(), + func(store pieces.StoredPieceAccess) error { + nodePieces.Pieces = append(nodePieces.Pieces, store.PieceID()) + return nil + }, + ) + require.NoError(t, err) + + nodesPieces = append(nodesPieces, nodePieces) + + // stop the first 2 SNs before deleting pieces + if i < 2 { + require.NoError(t, planet.StopPeer(sn)) + } + } + + err = satelliteSys.API.Metainfo.DeletePiecesService.DeletePieces(ctx, nodesPieces, 0.9999) + require.NoError(t, err) + + // Check that storage nodes which are online when deleting pieces don't + // hold any piece + var totalUsedSpace int64 + for i := 2; i < len(planet.StorageNodes); i++ { + usedSpace, err := planet.StorageNodes[i].Storage2.Store.SpaceUsedForPieces(ctx) + require.NoError(t, err) + totalUsedSpace += usedSpace + } + + require.Zero(t, totalUsedSpace, "totalUsedSpace online nodes") + }) + + t.Run("all nodes down", func(t *testing.T) { + t.Parallel() + + ctx := testcontext.New(t) + defer ctx.Cleanup() + + planet, err := testplanet.New(t, 1, 4, 1) + require.NoError(t, err) + defer ctx.Check(planet.Shutdown) + planet.Start(ctx) + + var ( + uplnk = planet.Uplinks[0] + satelliteSys = planet.Satellites[0] + ) + + { + data := testrand.Bytes(10 * memory.KiB) + // Use RSConfig for ensuring that we don't have long-tail cancellations + // and the upload doesn't leave garbage in the SNs + err = uplnk.UploadWithClientConfig(ctx, satelliteSys, cmd.Config{ + Client: cmd.ClientConfig{ + SegmentSize: 10 * memory.KiB, + }, + RS: cmd.RSConfig{ + 
MinThreshold: 2, + RepairThreshold: 2, + SuccessThreshold: 4, + MaxThreshold: 4, + }, + }, + "a-bucket", "object-filename", data, + ) + require.NoError(t, err) + } + + var ( + expectedTotalUsedSpace int64 + nodesPieces metainfo.NodesPieces + ) + for _, sn := range planet.StorageNodes { + // calculate the SNs total used space after data upload + usedSpace, err := sn.Storage2.Store.SpaceUsedForPieces(ctx) + require.NoError(t, err) + expectedTotalUsedSpace += usedSpace + + // Get pb node and all the pieces of the storage node + dossier, err := satelliteSys.Overlay.Service.Get(ctx, sn.ID()) + require.NoError(t, err) + + nodePieces := metainfo.NodePieces{ + Node: &dossier.Node, + } + + err = sn.Storage2.Store.WalkSatellitePieces(ctx, satelliteSys.ID(), + func(store pieces.StoredPieceAccess) error { + nodePieces.Pieces = append(nodePieces.Pieces, store.PieceID()) + return nil + }, + ) + require.NoError(t, err) + + nodesPieces = append(nodesPieces, nodePieces) + require.NoError(t, planet.StopPeer(sn)) + } + + err = satelliteSys.API.Metainfo.DeletePiecesService.DeletePieces(ctx, nodesPieces, 0.9999) + require.NoError(t, err) + + var totalUsedSpace int64 + for _, sn := range planet.StorageNodes { + // calculate the SNs total used space after data upload + usedSpace, err := sn.Storage2.Store.SpaceUsedForPieces(ctx) + require.NoError(t, err) + totalUsedSpace += usedSpace + } + + require.Equal(t, expectedTotalUsedSpace, totalUsedSpace, "totalUsedSpace") + }) + + t.Run("invalid dialer", func(t *testing.T) { + t.Parallel() + + ctx := testcontext.New(t) + defer ctx.Cleanup() + + planet, err := testplanet.New(t, 1, 4, 1) + require.NoError(t, err) + defer ctx.Check(planet.Shutdown) + planet.Start(ctx) + + var ( + uplnk = planet.Uplinks[0] + satelliteSys = planet.Satellites[0] + ) + + { + data := testrand.Bytes(10 * memory.KiB) + // Use RSConfig for ensuring that we don't have long-tail cancellations + // and the upload doesn't leave garbage in the SNs + err = uplnk.UploadWithClientConfig(ctx, satelliteSys, cmd.Config{ + Client: cmd.ClientConfig{ + SegmentSize: 10 * memory.KiB, + }, + RS: cmd.RSConfig{ + MinThreshold: 2, + RepairThreshold: 2, + SuccessThreshold: 4, + MaxThreshold: 4, + }, + }, + "a-bucket", "object-filename", data, + ) + require.NoError(t, err) + } + + var ( + expectedTotalUsedSpace int64 + nodesPieces metainfo.NodesPieces + ) + for _, sn := range planet.StorageNodes { + // calculate the SNs total used space after data upload + usedSpace, err := sn.Storage2.Store.SpaceUsedForPieces(ctx) + require.NoError(t, err) + expectedTotalUsedSpace += usedSpace + + // Get pb node and all the pieces of the storage node + dossier, err := satelliteSys.Overlay.Service.Get(ctx, sn.ID()) + require.NoError(t, err) + + nodePieces := metainfo.NodePieces{ + Node: &dossier.Node, + } + + err = sn.Storage2.Store.WalkSatellitePieces(ctx, satelliteSys.ID(), + func(store pieces.StoredPieceAccess) error { + nodePieces.Pieces = append(nodePieces.Pieces, store.PieceID()) + return nil + }, + ) + require.NoError(t, err) + + nodesPieces = append(nodesPieces, nodePieces) + } + + // The passed dialer cannot dial nodes because it doesn't have TLSOptions + dialer := satelliteSys.API.Dialer + dialer.TLSOptions = nil + service, err := metainfo.NewDeletePiecesService( + zaptest.NewLogger(t), dialer, len(nodesPieces)-1, + ) + require.NoError(t, err) + + err = service.DeletePieces(ctx, nodesPieces, 0.75) + require.NoError(t, err) + + var totalUsedSpaceAfterDelete int64 + for _, sn := range planet.StorageNodes { + usedSpace, err := 
sn.Storage2.Store.SpaceUsedForPieces(ctx) + require.NoError(t, err) + totalUsedSpaceAfterDelete += usedSpace + } + + // because no node can be dialed the SNs used space should be the same + require.Equal(t, expectedTotalUsedSpace, totalUsedSpaceAfterDelete) + }) + + t.Run("empty nodes pieces", func(t *testing.T) { + t.Parallel() + + ctx := testcontext.New(t) + defer ctx.Cleanup() + + planet, err := testplanet.New(t, 1, 4, 1) + require.NoError(t, err) + defer ctx.Check(planet.Shutdown) + planet.Start(ctx) + + err = planet.Satellites[0].API.Metainfo.DeletePiecesService.DeletePieces(ctx, metainfo.NodesPieces{}, 0.75) + require.Error(t, err) + assert.False(t, metainfo.ErrDeletePieces.Has(err), "unexpected error class") + assert.Contains(t, err.Error(), "invalid number of tasks") + }) + + t.Run("invalid threshold", func(t *testing.T) { + t.Parallel() + + ctx := testcontext.New(t) + defer ctx.Cleanup() + + planet, err := testplanet.New(t, 1, 4, 1) + require.NoError(t, err) + defer ctx.Check(planet.Shutdown) + planet.Start(ctx) + + nodesPieces := make(metainfo.NodesPieces, 1) + nodesPieces[0].Pieces = make([]storj.PieceID, 2) + err = planet.Satellites[0].API.Metainfo.DeletePiecesService.DeletePieces(ctx, nodesPieces, 1) + require.Error(t, err) + assert.False(t, metainfo.ErrDeletePieces.Has(err), "unexpected error class") + assert.Contains(t, err.Error(), "invalid successThreshold") + }) +} diff --git a/satellite/metainfo/metainfo.go b/satellite/metainfo/metainfo.go index adf1e9440..d07348393 100644 --- a/satellite/metainfo/metainfo.go +++ b/satellite/metainfo/metainfo.go @@ -21,11 +21,9 @@ import ( "storj.io/common/errs2" "storj.io/common/identity" "storj.io/common/pb" - "storj.io/common/rpc" "storj.io/common/rpc/rpcstatus" "storj.io/common/signing" "storj.io/common/storj" - "storj.io/common/sync2" "storj.io/storj/pkg/macaroon" "storj.io/storj/private/context2" "storj.io/storj/private/dbutil" @@ -36,7 +34,6 @@ import ( "storj.io/storj/satellite/overlay" "storj.io/storj/satellite/rewards" "storj.io/uplink/eestream" - "storj.io/uplink/piecestore" "storj.io/uplink/storage/meta" ) @@ -46,8 +43,6 @@ const ( lastSegment = -1 listLimit = 1000 - // TODO: orange/v3-3406 this value may change once it's used in production - deleteObjectPiecesConcurrencyLimit = 100 deleteObjectPiecesSuccessThreshold = 0.75 ) @@ -79,13 +74,13 @@ type Revocations interface { type Endpoint struct { log *zap.Logger metainfo *Service + deletePieces *DeletePiecesService orders *orders.Service overlay *overlay.Service attributions attribution.DB partners *rewards.PartnersService peerIdentities overlay.PeerIdentities projectUsage *accounting.Service - dialer rpc.Dialer apiKeys APIKeys createRequests *createRequests requiredRSConfig RSConfig @@ -94,19 +89,21 @@ type Endpoint struct { } // NewEndpoint creates new metainfo endpoint instance. 
-func NewEndpoint(log *zap.Logger, metainfo *Service, orders *orders.Service, cache *overlay.Service, - attributions attribution.DB, partners *rewards.PartnersService, peerIdentities overlay.PeerIdentities, - dialer rpc.Dialer, apiKeys APIKeys, projectUsage *accounting.Service, rsConfig RSConfig, satellite signing.Signer, maxCommitInterval time.Duration) *Endpoint { +func NewEndpoint(log *zap.Logger, metainfo *Service, deletePieces *DeletePiecesService, + orders *orders.Service, cache *overlay.Service, attributions attribution.DB, + partners *rewards.PartnersService, peerIdentities overlay.PeerIdentities, + apiKeys APIKeys, projectUsage *accounting.Service, rsConfig RSConfig, + satellite signing.Signer, maxCommitInterval time.Duration) *Endpoint { // TODO do something with too many params return &Endpoint{ log: log, metainfo: metainfo, + deletePieces: deletePieces, orders: orders, overlay: cache, attributions: attributions, partners: partners, peerIdentities: peerIdentities, - dialer: dialer, apiKeys: apiKeys, projectUsage: projectUsage, createRequests: newCreateRequests(), @@ -2386,95 +2383,15 @@ func (endpoint *Endpoint) DeleteObjectPieces( return nil } - var successThreshold *sync2.SuccessThreshold - { - var numPieces int - for _, node := range nodes { - numPieces += len(nodesPieces[node.Id]) - } - - var err error - successThreshold, err = sync2.NewSuccessThreshold(numPieces, deleteObjectPiecesSuccessThreshold) - if err != nil { - endpoint.log.Error("error creating success threshold", - zap.Int("num_tasks", numPieces), - zap.Float32("success_threshold", deleteObjectPiecesSuccessThreshold), - zap.Error(err), - ) - - return rpcstatus.Errorf(rpcstatus.Internal, - "error creating success threshold: %+v", err.Error(), - ) - } - } - - // TODO: v3-3476 This timeout will go away when the service is implemented - ctx, cancel := context.WithCancel(ctx) - defer cancel() - - // TODO: v3-3406 Should we use a global limiter? 
- limiter := sync2.NewLimiter(deleteObjectPiecesConcurrencyLimit) + var nodesPiecesList NodesPieces for _, node := range nodes { - node := node - nodePieces := nodesPieces[node.Id] - - limiter.Go(ctx, func() { - client, err := piecestore.Dial( - ctx, endpoint.dialer, node, endpoint.log, piecestore.Config{}, - ) - if err != nil { - endpoint.log.Warn("unable to dial storage node", - zap.Stringer("node_id", node.Id), - zap.Stringer("node_info", node), - zap.Error(err), - ) - - // Mark all the pieces of this node as failure in the success threshold - for range nodePieces { - successThreshold.Failure() - } - - // Pieces will be collected by garbage collector - return - } - defer func() { - err := client.Close() - if err != nil { - endpoint.log.Warn("error closing the storage node client connection", - zap.Stringer("node_id", node.Id), - zap.Stringer("node_info", node), - zap.Error(err), - ) - } - }() - - for _, pieceID := range nodePieces { - err := client.DeletePiece(ctx, pieceID) - if err != nil { - // piece will be collected by garbage collector - endpoint.log.Warn("unable to delete piece of a storage node", - zap.Stringer("node_id", node.Id), - zap.Stringer("piece_id", pieceID), - zap.Error(err), - ) - - successThreshold.Failure() - continue - } - - successThreshold.Success() - } + nodesPiecesList = append(nodesPiecesList, NodePieces{ + Node: node, + Pieces: nodesPieces[node.Id], }) } - successThreshold.Wait(ctx) - // return to the client after the success threshold but wait some time before - // canceling the remaining deletes - timer := time.AfterFunc(200*time.Millisecond, cancel) - defer timer.Stop() - - limiter.Wait() - return nil + return endpoint.deletePieces.DeletePieces(ctx, nodesPiecesList, deleteObjectPiecesSuccessThreshold) } // deletePointer deletes a pointer returning the deleted pointer.
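
A minimal caller-side sketch of the API introduced above (illustrative only and
not part of the patch; log, dialer, and nodesPieces are placeholders supplied by
the caller):

package example // hypothetical wiring that mirrors satellite/api.go above

import (
	"context"

	"go.uber.org/zap"

	"storj.io/common/rpc"
	"storj.io/storj/satellite/metainfo"
)

// deleteNodePieces constructs the service, deletes the given pieces while
// tolerating long-tail failures, and releases the service's resources when done.
func deleteNodePieces(ctx context.Context, log *zap.Logger, dialer rpc.Dialer, nodesPieces metainfo.NodesPieces) error {
	// 100 mirrors metainfoDeletePiecesConcurrencyLimit in satellite/api.go.
	service, err := metainfo.NewDeletePiecesService(log.Named("metainfo:DeletePiecesService"), dialer, 100)
	if err != nil {
		return err
	}
	// The satellite closes the service in API.Close; a defer is enough here.
	defer func() { _ = service.Close() }()

	// 0.75 mirrors deleteObjectPiecesSuccessThreshold: DeletePieces returns
	// shortly after 75% of the pieces have been handled, leaving the rest to
	// the storage nodes' garbage collection.
	return service.DeletePieces(ctx, nodesPieces, 0.75)
}

This is the same call pattern the patch uses in Endpoint.DeleteObjectPieces,
which now builds a NodesPieces list and delegates to the service with the 0.75
success threshold.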