From a785d3715731b30cef96580a81505c1d0b3d322a Mon Sep 17 00:00:00 2001 From: Isaac Hess Date: Mon, 20 Apr 2020 14:29:18 -0600 Subject: [PATCH] storagenode/pieces: Process deletes asynchronously To improve delete performance, we want to process deletes asynchronously once the message has been received from the satellite. This change makes it so that storagenodes will send the delete request to a piece Deleter, which will process a "best-effort" delete asynchronously and return a success message to the satellite. There is a configurable number of max delete workers and a max delete queue size. Change-Id: I016b68031f9065a9b09224f161b6783e18cf21e5 --- satellite/metainfo/endpoint_test.go | 84 +++++---- .../metainfo/piecedeletion/service_test.go | 37 +++- storagenode/peer.go | 9 + storagenode/pieces/deleter.go | 170 ++++++++++++++++++ storagenode/pieces/deleter_test.go | 74 ++++++++ storagenode/piecestore/endpoint.go | 40 ++--- storagenode/piecestore/endpoint_test.go | 10 +- 7 files changed, 353 insertions(+), 71 deletions(-) create mode 100644 storagenode/pieces/deleter.go create mode 100644 storagenode/pieces/deleter_test.go diff --git a/satellite/metainfo/endpoint_test.go b/satellite/metainfo/endpoint_test.go index 0b2ec1d9c..6cc9de048 100644 --- a/satellite/metainfo/endpoint_test.go +++ b/satellite/metainfo/endpoint_test.go @@ -25,38 +25,44 @@ func TestEndpoint_DeleteObjectPieces(t *testing.T) { t.Run("all nodes up", func(t *testing.T) { t.Parallel() - testplanet.Run(t, testplanet.Config{ - SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1, - Reconfigure: testplanet.Reconfigure{ - // Reconfigure RS for ensuring that we don't have long-tail cancellations - // and the upload doesn't leave garbage in the SNs - Satellite: testplanet.ReconfigureRS(2, 2, 4, 4), - }, - }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { - var ( - uplnk = planet.Uplinks[0] - satelliteSys = planet.Satellites[0] - ) + var testCases = []struct { + caseDescription string + objData []byte + hasRemote bool + }{ + {caseDescription: "one remote segment", objData: testrand.Bytes(10 * memory.KiB)}, + {caseDescription: "one inline segment", objData: testrand.Bytes(3 * memory.KiB)}, + {caseDescription: "several segments (all remote)", objData: testrand.Bytes(50 * memory.KiB)}, + {caseDescription: "several segments (remote + inline)", objData: testrand.Bytes(33 * memory.KiB)}, + } - var testCases = []struct { - caseDescription string - objData []byte - }{ - {caseDescription: "one remote segment", objData: testrand.Bytes(10 * memory.KiB)}, - {caseDescription: "one inline segment", objData: testrand.Bytes(3 * memory.KiB)}, - {caseDescription: "several segments (all remote)", objData: testrand.Bytes(50 * memory.KiB)}, - {caseDescription: "several segments (remote + inline)", objData: testrand.Bytes(33 * memory.KiB)}, - } + for i, tc := range testCases { + i := i + tc := tc + t.Run(tc.caseDescription, func(t *testing.T) { + testplanet.Run(t, testplanet.Config{ + SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1, + Reconfigure: testplanet.Reconfigure{ + // Reconfigure RS for ensuring that we don't have long-tail cancellations + // and the upload doesn't leave garbage in the SNs + Satellite: testplanet.ReconfigureRS(2, 2, 4, 4), + }, + }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { + var ( + uplnk = planet.Uplinks[0] + satelliteSys = planet.Satellites[0] + ) - for i, tc := range testCases { - i := i - tc := tc - t.Run(tc.caseDescription, func(t *testing.T) { var ( bucketName = "a-bucket" objectName = "object-filename" + strconv.Itoa(i) + percentExp = 0.75 ) + for _, sn := range planet.StorageNodes { + sn.Peer.Storage2.PieceDeleter.SetupTest() + } + err := uplnk.UploadWithClientConfig(ctx, satelliteSys, testplanet.UplinkConfig{ Client: testplanet.ClientConfig{ SegmentSize: 10 * memory.KiB, @@ -83,6 +89,7 @@ func TestEndpoint_DeleteObjectPieces(t *testing.T) { // calculate the SNs used space after delete the pieces var totalUsedSpaceAfterDelete int64 for _, sn := range planet.StorageNodes { + sn.Peer.Storage2.PieceDeleter.Wait() piecesTotal, _, err := sn.Storage2.Store.SpaceUsedForPieces(ctx) require.NoError(t, err) totalUsedSpaceAfterDelete += piecesTotal @@ -91,12 +98,14 @@ func TestEndpoint_DeleteObjectPieces(t *testing.T) { // At this point we can only guarantee that the 75% of the SNs pieces // are delete due to the success threshold deletedUsedSpace := float64(totalUsedSpace-totalUsedSpaceAfterDelete) / float64(totalUsedSpace) - if deletedUsedSpace < 0.75 { - t.Fatalf("deleted used space is less than 0.75%%. Got %f", deletedUsedSpace) + if deletedUsedSpace < percentExp { + t.Fatalf("deleted used space is less than %f%%. Got %f", percentExp, deletedUsedSpace) } + }) - } - }) + + }) + } }) t.Run("some nodes down", func(t *testing.T) { @@ -128,10 +137,17 @@ func TestEndpoint_DeleteObjectPieces(t *testing.T) { Satellite: testplanet.ReconfigureRS(2, 2, 4, 4), }, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { + numToShutdown := 2 + var ( uplnk = planet.Uplinks[0] satelliteSys = planet.Satellites[0] ) + + for _, sn := range planet.StorageNodes { + sn.Peer.Storage2.PieceDeleter.SetupTest() + } + err := uplnk.UploadWithClientConfig(ctx, satelliteSys, testplanet.UplinkConfig{ Client: testplanet.ClientConfig{ SegmentSize: 10 * memory.KiB, @@ -139,7 +155,7 @@ func TestEndpoint_DeleteObjectPieces(t *testing.T) { }, bucketName, objectName, tc.objData) require.NoError(t, err) - // Shutdown the first 2 storage nodes before we delete the pieces + // Shutdown the first numToShutdown storage nodes before we delete the pieces require.NoError(t, planet.StopPeer(planet.StorageNodes[0])) require.NoError(t, planet.StopPeer(planet.StorageNodes[1])) @@ -149,10 +165,14 @@ func TestEndpoint_DeleteObjectPieces(t *testing.T) { ) require.NoError(t, err) + for _, sn := range planet.StorageNodes { + sn.Peer.Storage2.PieceDeleter.Wait() + } + // Check that storage nodes that were offline when deleting the pieces // they are still holding data var totalUsedSpace int64 - for i := 0; i < 2; i++ { + for i := 0; i < numToShutdown; i++ { piecesTotal, _, err := planet.StorageNodes[i].Storage2.Store.SpaceUsedForPieces(ctx) require.NoError(t, err) totalUsedSpace += piecesTotal @@ -163,7 +183,7 @@ func TestEndpoint_DeleteObjectPieces(t *testing.T) { // Check that storage nodes which are online when deleting pieces don't // hold any piece totalUsedSpace = 0 - for i := 2; i < len(planet.StorageNodes); i++ { + for i := numToShutdown; i < len(planet.StorageNodes); i++ { piecesTotal, _, err := planet.StorageNodes[i].Storage2.Store.SpaceUsedForPieces(ctx) require.NoError(t, err) totalUsedSpace += piecesTotal diff --git a/satellite/metainfo/piecedeletion/service_test.go b/satellite/metainfo/piecedeletion/service_test.go index 2511677ae..9d5112332 100644 --- a/satellite/metainfo/piecedeletion/service_test.go +++ b/satellite/metainfo/piecedeletion/service_test.go @@ -87,6 +87,12 @@ func TestService_DeletePieces_AllNodesUp(t *testing.T) { uplnk := planet.Uplinks[0] satelliteSys := planet.Satellites[0] + percentExp := 0.75 + + for _, sn := range planet.StorageNodes { + sn.Peer.Storage2.PieceDeleter.SetupTest() + } + { data := testrand.Bytes(10 * memory.KiB) err := uplnk.UploadWithClientConfig(ctx, satelliteSys, testplanet.UplinkConfig{ @@ -99,8 +105,8 @@ func TestService_DeletePieces_AllNodesUp(t *testing.T) { require.NoError(t, err) } - // ensure that no requests doesn't return an error - err := satelliteSys.API.Metainfo.PieceDeletion.Delete(ctx, nil, 0.75) + // ensure that no requests return an error + err := satelliteSys.API.Metainfo.PieceDeletion.Delete(ctx, nil, percentExp) require.NoError(t, err) var ( @@ -132,12 +138,13 @@ func TestService_DeletePieces_AllNodesUp(t *testing.T) { requests = append(requests, nodePieces) } - err = satelliteSys.API.Metainfo.PieceDeletion.Delete(ctx, requests, 0.75) + err = satelliteSys.API.Metainfo.PieceDeletion.Delete(ctx, requests, percentExp) require.NoError(t, err) // calculate the SNs used space after delete the pieces var totalUsedSpaceAfterDelete int64 for _, sn := range planet.StorageNodes { + sn.Peer.Storage2.PieceDeleter.Wait() piecesTotal, _, err := sn.Storage2.Store.SpaceUsedForPieces(ctx) require.NoError(t, err) totalUsedSpaceAfterDelete += piecesTotal @@ -146,8 +153,8 @@ func TestService_DeletePieces_AllNodesUp(t *testing.T) { // At this point we can only guarantee that the 75% of the SNs pieces // are delete due to the success threshold deletedUsedSpace := float64(totalUsedSpace-totalUsedSpaceAfterDelete) / float64(totalUsedSpace) - if deletedUsedSpace < 0.75 { - t.Fatalf("deleted used space is less than 0.75%%. Got %f", deletedUsedSpace) + if deletedUsedSpace < percentExp { + t.Fatalf("deleted used space is less than %e%%. Got %f", percentExp, deletedUsedSpace) } }) } @@ -163,6 +170,11 @@ func TestService_DeletePieces_SomeNodesDown(t *testing.T) { }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { uplnk := planet.Uplinks[0] satelliteSys := planet.Satellites[0] + numToShutdown := 2 + + for _, sn := range planet.StorageNodes { + sn.Peer.Storage2.PieceDeleter.SetupTest() + } { data := testrand.Bytes(10 * memory.KiB) @@ -197,8 +209,8 @@ func TestService_DeletePieces_SomeNodesDown(t *testing.T) { requests = append(requests, nodePieces) - // stop the first 2 SNs before deleting pieces - if i < 2 { + // stop the first numToShutdown SNs before deleting pieces + if i < numToShutdown { require.NoError(t, planet.StopPeer(sn)) } } @@ -206,10 +218,14 @@ func TestService_DeletePieces_SomeNodesDown(t *testing.T) { err := satelliteSys.API.Metainfo.PieceDeletion.Delete(ctx, requests, 0.9999) require.NoError(t, err) + for _, sn := range planet.StorageNodes { + sn.Peer.Storage2.PieceDeleter.Wait() + } + // Check that storage nodes which are online when deleting pieces don't // hold any piece var totalUsedSpace int64 - for i := 2; i < len(planet.StorageNodes); i++ { + for i := numToShutdown; i < len(planet.StorageNodes); i++ { piecesTotal, _, err := planet.StorageNodes[i].Storage2.Store.SpaceUsedForPieces(ctx) require.NoError(t, err) totalUsedSpace += piecesTotal @@ -231,6 +247,10 @@ func TestService_DeletePieces_AllNodesDown(t *testing.T) { uplnk := planet.Uplinks[0] satelliteSys := planet.Satellites[0] + for _, sn := range planet.StorageNodes { + sn.Peer.Storage2.PieceDeleter.SetupTest() + } + { data := testrand.Bytes(10 * memory.KiB) err := uplnk.UploadWithClientConfig(ctx, satelliteSys, testplanet.UplinkConfig{ @@ -278,6 +298,7 @@ func TestService_DeletePieces_AllNodesDown(t *testing.T) { var totalUsedSpace int64 for _, sn := range planet.StorageNodes { + sn.Peer.Storage2.PieceDeleter.Wait() // calculate the SNs total used space after data upload piecesTotal, _, err := sn.Storage2.Store.SpaceUsedForPieces(ctx) require.NoError(t, err) diff --git a/storagenode/peer.go b/storagenode/peer.go index 98f1a606d..2048b5035 100644 --- a/storagenode/peer.go +++ b/storagenode/peer.go @@ -204,6 +204,7 @@ type Peer struct { BlobsCache *pieces.BlobsUsageCache CacheService *pieces.CacheService RetainService *retain.Service + PieceDeleter *pieces.Deleter Endpoint *piecestore.Endpoint Inspector *inspector.Endpoint Monitor *monitor.Service @@ -392,6 +393,13 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten peer.DB.PieceSpaceUsedDB(), ) + peer.Storage2.PieceDeleter = pieces.NewDeleter(log.Named("piecedeleter"), peer.Storage2.Store, config.Storage2.DeleteWorkers, config.Storage2.DeleteQueueSize) + peer.Services.Add(lifecycle.Item{ + Name: "PieceDeleter", + Run: peer.Storage2.PieceDeleter.Run, + Close: peer.Storage2.PieceDeleter.Close, + }) + peer.Storage2.TrashChore = pieces.NewTrashChore( log.Named("pieces:trash"), 24*time.Hour, // choreInterval: how often to run the chore @@ -457,6 +465,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten peer.Storage2.RetainService, peer.Contact.PingStats, peer.Storage2.Store, + peer.Storage2.PieceDeleter, peer.DB.Orders(), peer.DB.Bandwidth(), peer.DB.UsedSerials(), diff --git a/storagenode/pieces/deleter.go b/storagenode/pieces/deleter.go new file mode 100644 index 000000000..6c3d9aa87 --- /dev/null +++ b/storagenode/pieces/deleter.go @@ -0,0 +1,170 @@ +// Copyright (C) 2020 Storj Labs, Inc. +// See LICENSE for copying information. + +package pieces + +import ( + "context" + "sync" + + "github.com/zeebo/errs" + "go.uber.org/zap" + "golang.org/x/sync/errgroup" + + "storj.io/common/storj" +) + +// DeleteRequest contains information to delete piece. +type DeleteRequest struct { + SatelliteID storj.NodeID + PieceID storj.PieceID +} + +// Deleter is a worker that processes requests to delete groups of pieceIDs. +// Deletes are processed "best-effort" asynchronously, and any errors are +// logged. +type Deleter struct { + mu sync.Mutex + ch chan DeleteRequest + numWorkers int + eg *errgroup.Group + log *zap.Logger + stop func() + store *Store + closed bool + + // The test variables are only used when testing. + testToDelete int + testCond *sync.Cond +} + +// NewDeleter creates a new Deleter. +func NewDeleter(log *zap.Logger, store *Store, numWorkers int, queueSize int) *Deleter { + if numWorkers == 0 { + numWorkers = 1 + } + if queueSize == 0 { + // Default queueSize is chosen as a large number that uses a manageable + // amount of memory. + queueSize = 10000 + } + return &Deleter{ + ch: make(chan DeleteRequest, queueSize), + numWorkers: numWorkers, + log: log, + store: store, + } +} + +// Run starts the delete workers. +func (d *Deleter) Run(ctx context.Context) error { + d.mu.Lock() + defer d.mu.Unlock() + + if d.closed { + return errs.New("already closed") + } + + if d.stop != nil { + return errs.New("already started") + } + + ctx, d.stop = context.WithCancel(ctx) + d.eg = &errgroup.Group{} + + for i := 0; i < d.numWorkers; i++ { + d.eg.Go(func() error { + return d.work(ctx) + }) + } + + return nil +} + +// Enqueue adds the pieceIDs to the delete queue. If the queue is full deletes +// are not processed and will be left for garbage collection. +func (d *Deleter) Enqueue(ctx context.Context, satelliteID storj.NodeID, pieceIDs []storj.PieceID) { + for _, pieceID := range pieceIDs { + select { + case d.ch <- DeleteRequest{satelliteID, pieceID}: + default: + mon.Counter("piecedeleter-queue-full").Inc(1) + return + } + } + + // If we are in testMode add the number of pieceIDs waiting to be processed. + if d.testCond != nil { + d.mu.Lock() + d.testToDelete += len(pieceIDs) + d.mu.Unlock() + } +} + +func (d *Deleter) work(ctx context.Context) error { + for { + select { + case <-ctx.Done(): + return nil + case r := <-d.ch: + err := d.store.Delete(ctx, r.SatelliteID, r.PieceID) + if err != nil { + // If a piece cannot be deleted, we just log the error. + d.log.Error("delete failed", + zap.Stringer("Satellite ID", r.SatelliteID), + zap.Stringer("Piece ID", r.PieceID), + zap.Error(err), + ) + } else { + d.log.Info("deleted", + zap.Stringer("Satellite ID", r.SatelliteID), + zap.Stringer("Piece ID", r.PieceID), + ) + } + + // If we are in test mode, check whether we have processed all known + // deletes, and if so broadcast on the cond. + if d.testCond != nil { + d.mu.Lock() + if d.testToDelete > 0 { + d.testToDelete-- + } + if d.testToDelete == 0 { + d.testCond.Broadcast() + } + d.mu.Unlock() + } + } + } +} + +// Close stops all the workers and waits for them to finish. +func (d *Deleter) Close() error { + d.mu.Lock() + d.closed = true + stop := d.stop + eg := d.eg + d.mu.Unlock() + + if stop != nil { + stop() + } + if eg != nil { + return eg.Wait() + } + return nil +} + +// Wait blocks until the queue is empty. This can only be called after SetupTest. +func (d *Deleter) Wait() { + d.mu.Lock() + for d.testToDelete > 0 { + d.testCond.Wait() + } + d.mu.Unlock() +} + +// SetupTest puts the deleter in test mode. This should only be called in tests. +func (d *Deleter) SetupTest() { + d.testCond = sync.NewCond(&d.mu) +} diff --git a/storagenode/pieces/deleter_test.go b/storagenode/pieces/deleter_test.go new file mode 100644 index 000000000..140198b5c --- /dev/null +++ b/storagenode/pieces/deleter_test.go @@ -0,0 +1,74 @@ +// Copyright (C) 2020 Storj Labs, Inc. +// See LICENSE for copying information. + +package pieces_test + +import ( + "io/ioutil" + "strings" + "testing" + + "github.com/stretchr/testify/require" + "go.uber.org/zap/zaptest" + + "storj.io/common/memory" + "storj.io/common/pb" + "storj.io/common/testcontext" + "storj.io/common/testrand" + "storj.io/storj/storage/filestore" + "storj.io/storj/storagenode/pieces" +) + +func TestDeleter(t *testing.T) { + ctx := testcontext.New(t) + defer ctx.Cleanup() + + dir, err := filestore.NewDir(ctx.Dir("piecedeleter")) + require.NoError(t, err) + + blobs := filestore.New(zaptest.NewLogger(t), dir) + defer ctx.Check(blobs.Close) + + store := pieces.NewStore(zaptest.NewLogger(t), blobs, nil, nil, nil) + + // Also test that 0 works for maxWorkers + deleter := pieces.NewDeleter(zaptest.NewLogger(t), store, 0, 0) + defer ctx.Check(deleter.Close) + deleter.SetupTest() + + require.NoError(t, deleter.Run(ctx)) + + satelliteID := testrand.NodeID() + pieceID := testrand.PieceID() + + data := testrand.Bytes(memory.KB) + w, err := store.Writer(ctx, satelliteID, pieceID) + require.NoError(t, err) + _, err = w.Write(data) + require.NoError(t, err) + require.NoError(t, w.Commit(ctx, &pb.PieceHeader{})) + + // confirm we can read the data before delete + r, err := store.Reader(ctx, satelliteID, pieceID) + require.NoError(t, err) + + buf, err := ioutil.ReadAll(r) + require.NoError(t, err) + require.Equal(t, data, buf) + + // Delete the piece we've created + deleter.Enqueue(ctx, satelliteID, []pb.PieceID{pieceID}) + + // Also delete a random non-existent piece, so we know it doesn't blow up + // when this happens + deleter.Enqueue(ctx, satelliteID, []pb.PieceID{testrand.PieceID()}) + + // wait for test hook to fire twice + deleter.Wait() + + _, err = store.Reader(ctx, satelliteID, pieceID) + require.Condition(t, func() bool { + return strings.Contains(err.Error(), "file does not exist") || + strings.Contains(err.Error(), "The system cannot find the path specified") + }, "unexpected error message") +} diff --git a/storagenode/piecestore/endpoint.go b/storagenode/piecestore/endpoint.go index aceaac331..efd1cb365 100644 --- a/storagenode/piecestore/endpoint.go +++ b/storagenode/piecestore/endpoint.go @@ -55,6 +55,8 @@ type OldConfig struct { type Config struct { ExpirationGracePeriod time.Duration `help:"how soon before expiration date should things be considered expired" default:"48h0m0s"` MaxConcurrentRequests int `help:"how many concurrent requests are allowed, before uploads are rejected. 0 represents unlimited." default:"0"` + DeleteWorkers int `help:"how many piece delete workers" default:"0"` + DeleteQueueSize int `help:"size of the piece delete queue" default:"0"` OrderLimitGracePeriod time.Duration `help:"how long after OrderLimit creation date are OrderLimits no longer accepted" default:"24h0m0s"` CacheSyncInterval time.Duration `help:"how often the space used cache is synced to persistent storage" releaseDefault:"1h0m0s" devDefault:"0h1m0s"` StreamOperationTimeout time.Duration `help:"how long to spend waiting for a stream operation before canceling" default:"30m"` @@ -85,10 +87,11 @@ type Endpoint struct { retain *retain.Service pingStats pingStatsSource - store *pieces.Store - orders orders.DB - usage bandwidth.DB - usedSerials UsedSerials + store *pieces.Store + orders orders.DB + usage bandwidth.DB + usedSerials UsedSerials + pieceDeleter *pieces.Deleter // liveRequests tracks the total number of incoming rpc requests. For gRPC // requests only, this number is compared to config.MaxConcurrentRequests @@ -104,7 +107,7 @@ type drpcEndpoint struct{ *Endpoint } func (endpoint *Endpoint) DRPC() pb.DRPCPiecestoreServer { return &drpcEndpoint{Endpoint: endpoint} } // NewEndpoint creates a new piecestore endpoint. -func NewEndpoint(log *zap.Logger, signer signing.Signer, trust *trust.Pool, monitor *monitor.Service, retain *retain.Service, pingStats pingStatsSource, store *pieces.Store, orders orders.DB, usage bandwidth.DB, usedSerials UsedSerials, config Config) (*Endpoint, error) { +func NewEndpoint(log *zap.Logger, signer signing.Signer, trust *trust.Pool, monitor *monitor.Service, retain *retain.Service, pingStats pingStatsSource, store *pieces.Store, pieceDeleter *pieces.Deleter, orders orders.DB, usage bandwidth.DB, usedSerials UsedSerials, config Config) (*Endpoint, error) { // If config.MaxConcurrentRequests is set we want to repsect it for grpc. // However, if it is 0 (unlimited) we force a limit. grpcReqLimit := config.MaxConcurrentRequests @@ -123,10 +126,11 @@ func NewEndpoint(log *zap.Logger, signer signing.Signer, trust *trust.Pool, moni retain: retain, pingStats: pingStats, - store: store, - orders: orders, - usage: usage, - usedSerials: usedSerials, + store: store, + orders: orders, + usage: usage, + usedSerials: usedSerials, + pieceDeleter: pieceDeleter, liveRequests: 0, }, nil @@ -186,23 +190,7 @@ func (endpoint *Endpoint) DeletePieces( return nil, rpcstatus.Error(rpcstatus.PermissionDenied, "delete pieces called with untrusted ID") } - for _, pieceID := range req.PieceIds { - err = endpoint.store.Delete(ctx, peer.ID, pieceID) - if err != nil { - // If a piece cannot be deleted, we just log the error. - // No error is returned to the caller. - endpoint.log.Error("delete failed", - zap.Stringer("Satellite ID", peer.ID), - zap.Stringer("Piece ID", pieceID), - zap.Error(err), - ) - } else { - endpoint.log.Info("deleted", - zap.Stringer("Satellite ID", peer.ID), - zap.Stringer("Piece ID", pieceID), - ) - } - } + endpoint.pieceDeleter.Enqueue(ctx, peer.ID, req.PieceIds) return &pb.DeletePiecesResponse{}, nil } diff --git a/storagenode/piecestore/endpoint_test.go b/storagenode/piecestore/endpoint_test.go index ceba09839..753441530 100644 --- a/storagenode/piecestore/endpoint_test.go +++ b/storagenode/piecestore/endpoint_test.go @@ -383,7 +383,7 @@ func TestDeletePieces(t *testing.T) { } t.Run("Ok", func(t *testing.T) { - pieceIDs := []storj.PieceID{{1}, {2}, {3}, {4}} + pieceIDs := []storj.PieceID{testrand.PieceID(), testrand.PieceID(), testrand.PieceID(), testrand.PieceID()} dataArray := make([][]byte, len(pieceIDs)) for i, pieceID := range pieceIDs { dataArray[i], _, _ = uploadPiece(t, ctx, pieceID, planetSN, planet.Uplinks[0], planetSat) @@ -403,8 +403,8 @@ func TestDeletePieces(t *testing.T) { }) t.Run("Ok: one piece to delete is missing", func(t *testing.T) { - missingPieceID := storj.PieceID{12} - pieceIDs := []storj.PieceID{{1}, {2}, {3}, {4}} + missingPieceID := testrand.PieceID() + pieceIDs := []storj.PieceID{testrand.PieceID(), testrand.PieceID(), testrand.PieceID(), testrand.PieceID()} dataArray := make([][]byte, len(pieceIDs)) for i, pieceID := range pieceIDs { dataArray[i], _, _ = uploadPiece(t, ctx, pieceID, planetSN, planet.Uplinks[0], planetSat) @@ -424,7 +424,7 @@ func TestDeletePieces(t *testing.T) { }) t.Run("Ok: no piece deleted", func(t *testing.T) { - pieceID := storj.PieceID{10} + pieceID := testrand.PieceID() data, _, _ := uploadPiece(t, ctx, pieceID, planetSN, planet.Uplinks[0], planetSat) err := client.DeletePieces(ctx.Context) @@ -436,7 +436,7 @@ func TestDeletePieces(t *testing.T) { }) t.Run("error: permission denied", func(t *testing.T) { - pieceID := storj.PieceID{11} + pieceID := testrand.PieceID() data, _, _ := uploadPiece(t, ctx, pieceID, planetSN, planet.Uplinks[0], planetSat) client, err := planet.Uplinks[0].DialPiecestore(ctx, planetSN)