storagenode/pieces: Process deletes asynchronously

To improve delete performance, we want to process deletes asynchronously
once the message has been received from the satellite. This change makes
it so that storagenodes will send the delete request to a piece Deleter,
which will process a "best-effort" delete asynchronously and return a
success message to the satellite.

There is a configurable number of max delete workers and a max delete
queue size.

Change-Id: I016b68031f9065a9b09224f161b6783e18cf21e5
This commit is contained in:
Isaac Hess 2020-04-20 14:29:18 -06:00
parent 1336070fec
commit a785d37157
7 changed files with 353 additions and 71 deletions

View File

@ -25,6 +25,21 @@ func TestEndpoint_DeleteObjectPieces(t *testing.T) {
t.Run("all nodes up", func(t *testing.T) { t.Run("all nodes up", func(t *testing.T) {
t.Parallel() t.Parallel()
var testCases = []struct {
caseDescription string
objData []byte
hasRemote bool
}{
{caseDescription: "one remote segment", objData: testrand.Bytes(10 * memory.KiB)},
{caseDescription: "one inline segment", objData: testrand.Bytes(3 * memory.KiB)},
{caseDescription: "several segments (all remote)", objData: testrand.Bytes(50 * memory.KiB)},
{caseDescription: "several segments (remote + inline)", objData: testrand.Bytes(33 * memory.KiB)},
}
for i, tc := range testCases {
i := i
tc := tc
t.Run(tc.caseDescription, func(t *testing.T) {
testplanet.Run(t, testplanet.Config{ testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1, SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{ Reconfigure: testplanet.Reconfigure{
@ -38,25 +53,16 @@ func TestEndpoint_DeleteObjectPieces(t *testing.T) {
satelliteSys = planet.Satellites[0] satelliteSys = planet.Satellites[0]
) )
var testCases = []struct {
caseDescription string
objData []byte
}{
{caseDescription: "one remote segment", objData: testrand.Bytes(10 * memory.KiB)},
{caseDescription: "one inline segment", objData: testrand.Bytes(3 * memory.KiB)},
{caseDescription: "several segments (all remote)", objData: testrand.Bytes(50 * memory.KiB)},
{caseDescription: "several segments (remote + inline)", objData: testrand.Bytes(33 * memory.KiB)},
}
for i, tc := range testCases {
i := i
tc := tc
t.Run(tc.caseDescription, func(t *testing.T) {
var ( var (
bucketName = "a-bucket" bucketName = "a-bucket"
objectName = "object-filename" + strconv.Itoa(i) objectName = "object-filename" + strconv.Itoa(i)
percentExp = 0.75
) )
for _, sn := range planet.StorageNodes {
sn.Peer.Storage2.PieceDeleter.SetupTest()
}
err := uplnk.UploadWithClientConfig(ctx, satelliteSys, testplanet.UplinkConfig{ err := uplnk.UploadWithClientConfig(ctx, satelliteSys, testplanet.UplinkConfig{
Client: testplanet.ClientConfig{ Client: testplanet.ClientConfig{
SegmentSize: 10 * memory.KiB, SegmentSize: 10 * memory.KiB,
@ -83,6 +89,7 @@ func TestEndpoint_DeleteObjectPieces(t *testing.T) {
// calculate the SNs used space after delete the pieces // calculate the SNs used space after delete the pieces
var totalUsedSpaceAfterDelete int64 var totalUsedSpaceAfterDelete int64
for _, sn := range planet.StorageNodes { for _, sn := range planet.StorageNodes {
sn.Peer.Storage2.PieceDeleter.Wait()
piecesTotal, _, err := sn.Storage2.Store.SpaceUsedForPieces(ctx) piecesTotal, _, err := sn.Storage2.Store.SpaceUsedForPieces(ctx)
require.NoError(t, err) require.NoError(t, err)
totalUsedSpaceAfterDelete += piecesTotal totalUsedSpaceAfterDelete += piecesTotal
@ -91,13 +98,15 @@ func TestEndpoint_DeleteObjectPieces(t *testing.T) {
// At this point we can only guarantee that the 75% of the SNs pieces // At this point we can only guarantee that the 75% of the SNs pieces
// are delete due to the success threshold // are delete due to the success threshold
deletedUsedSpace := float64(totalUsedSpace-totalUsedSpaceAfterDelete) / float64(totalUsedSpace) deletedUsedSpace := float64(totalUsedSpace-totalUsedSpaceAfterDelete) / float64(totalUsedSpace)
if deletedUsedSpace < 0.75 { if deletedUsedSpace < percentExp {
t.Fatalf("deleted used space is less than 0.75%%. Got %f", deletedUsedSpace) t.Fatalf("deleted used space is less than %f%%. Got %f", percentExp, deletedUsedSpace)
} }
})
}) })
} }
}) })
})
t.Run("some nodes down", func(t *testing.T) { t.Run("some nodes down", func(t *testing.T) {
t.Parallel() t.Parallel()
@ -128,10 +137,17 @@ func TestEndpoint_DeleteObjectPieces(t *testing.T) {
Satellite: testplanet.ReconfigureRS(2, 2, 4, 4), Satellite: testplanet.ReconfigureRS(2, 2, 4, 4),
}, },
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
numToShutdown := 2
var ( var (
uplnk = planet.Uplinks[0] uplnk = planet.Uplinks[0]
satelliteSys = planet.Satellites[0] satelliteSys = planet.Satellites[0]
) )
for _, sn := range planet.StorageNodes {
sn.Peer.Storage2.PieceDeleter.SetupTest()
}
err := uplnk.UploadWithClientConfig(ctx, satelliteSys, testplanet.UplinkConfig{ err := uplnk.UploadWithClientConfig(ctx, satelliteSys, testplanet.UplinkConfig{
Client: testplanet.ClientConfig{ Client: testplanet.ClientConfig{
SegmentSize: 10 * memory.KiB, SegmentSize: 10 * memory.KiB,
@ -139,7 +155,7 @@ func TestEndpoint_DeleteObjectPieces(t *testing.T) {
}, bucketName, objectName, tc.objData) }, bucketName, objectName, tc.objData)
require.NoError(t, err) require.NoError(t, err)
// Shutdown the first 2 storage nodes before we delete the pieces // Shutdown the first numToShutdown storage nodes before we delete the pieces
require.NoError(t, planet.StopPeer(planet.StorageNodes[0])) require.NoError(t, planet.StopPeer(planet.StorageNodes[0]))
require.NoError(t, planet.StopPeer(planet.StorageNodes[1])) require.NoError(t, planet.StopPeer(planet.StorageNodes[1]))
@ -149,10 +165,14 @@ func TestEndpoint_DeleteObjectPieces(t *testing.T) {
) )
require.NoError(t, err) require.NoError(t, err)
for _, sn := range planet.StorageNodes {
sn.Peer.Storage2.PieceDeleter.Wait()
}
// Check that storage nodes that were offline when deleting the pieces // Check that storage nodes that were offline when deleting the pieces
// they are still holding data // they are still holding data
var totalUsedSpace int64 var totalUsedSpace int64
for i := 0; i < 2; i++ { for i := 0; i < numToShutdown; i++ {
piecesTotal, _, err := planet.StorageNodes[i].Storage2.Store.SpaceUsedForPieces(ctx) piecesTotal, _, err := planet.StorageNodes[i].Storage2.Store.SpaceUsedForPieces(ctx)
require.NoError(t, err) require.NoError(t, err)
totalUsedSpace += piecesTotal totalUsedSpace += piecesTotal
@ -163,7 +183,7 @@ func TestEndpoint_DeleteObjectPieces(t *testing.T) {
// Check that storage nodes which are online when deleting pieces don't // Check that storage nodes which are online when deleting pieces don't
// hold any piece // hold any piece
totalUsedSpace = 0 totalUsedSpace = 0
for i := 2; i < len(planet.StorageNodes); i++ { for i := numToShutdown; i < len(planet.StorageNodes); i++ {
piecesTotal, _, err := planet.StorageNodes[i].Storage2.Store.SpaceUsedForPieces(ctx) piecesTotal, _, err := planet.StorageNodes[i].Storage2.Store.SpaceUsedForPieces(ctx)
require.NoError(t, err) require.NoError(t, err)
totalUsedSpace += piecesTotal totalUsedSpace += piecesTotal

View File

@ -87,6 +87,12 @@ func TestService_DeletePieces_AllNodesUp(t *testing.T) {
uplnk := planet.Uplinks[0] uplnk := planet.Uplinks[0]
satelliteSys := planet.Satellites[0] satelliteSys := planet.Satellites[0]
percentExp := 0.75
for _, sn := range planet.StorageNodes {
sn.Peer.Storage2.PieceDeleter.SetupTest()
}
{ {
data := testrand.Bytes(10 * memory.KiB) data := testrand.Bytes(10 * memory.KiB)
err := uplnk.UploadWithClientConfig(ctx, satelliteSys, testplanet.UplinkConfig{ err := uplnk.UploadWithClientConfig(ctx, satelliteSys, testplanet.UplinkConfig{
@ -99,8 +105,8 @@ func TestService_DeletePieces_AllNodesUp(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
} }
// ensure that no requests doesn't return an error // ensure that no requests return an error
err := satelliteSys.API.Metainfo.PieceDeletion.Delete(ctx, nil, 0.75) err := satelliteSys.API.Metainfo.PieceDeletion.Delete(ctx, nil, percentExp)
require.NoError(t, err) require.NoError(t, err)
var ( var (
@ -132,12 +138,13 @@ func TestService_DeletePieces_AllNodesUp(t *testing.T) {
requests = append(requests, nodePieces) requests = append(requests, nodePieces)
} }
err = satelliteSys.API.Metainfo.PieceDeletion.Delete(ctx, requests, 0.75) err = satelliteSys.API.Metainfo.PieceDeletion.Delete(ctx, requests, percentExp)
require.NoError(t, err) require.NoError(t, err)
// calculate the SNs used space after delete the pieces // calculate the SNs used space after delete the pieces
var totalUsedSpaceAfterDelete int64 var totalUsedSpaceAfterDelete int64
for _, sn := range planet.StorageNodes { for _, sn := range planet.StorageNodes {
sn.Peer.Storage2.PieceDeleter.Wait()
piecesTotal, _, err := sn.Storage2.Store.SpaceUsedForPieces(ctx) piecesTotal, _, err := sn.Storage2.Store.SpaceUsedForPieces(ctx)
require.NoError(t, err) require.NoError(t, err)
totalUsedSpaceAfterDelete += piecesTotal totalUsedSpaceAfterDelete += piecesTotal
@ -146,8 +153,8 @@ func TestService_DeletePieces_AllNodesUp(t *testing.T) {
// At this point we can only guarantee that the 75% of the SNs pieces // At this point we can only guarantee that the 75% of the SNs pieces
// are delete due to the success threshold // are delete due to the success threshold
deletedUsedSpace := float64(totalUsedSpace-totalUsedSpaceAfterDelete) / float64(totalUsedSpace) deletedUsedSpace := float64(totalUsedSpace-totalUsedSpaceAfterDelete) / float64(totalUsedSpace)
if deletedUsedSpace < 0.75 { if deletedUsedSpace < percentExp {
t.Fatalf("deleted used space is less than 0.75%%. Got %f", deletedUsedSpace) t.Fatalf("deleted used space is less than %f%%. Got %f", percentExp, deletedUsedSpace)
} }
}) })
} }
@ -163,6 +170,11 @@ func TestService_DeletePieces_SomeNodesDown(t *testing.T) {
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
uplnk := planet.Uplinks[0] uplnk := planet.Uplinks[0]
satelliteSys := planet.Satellites[0] satelliteSys := planet.Satellites[0]
numToShutdown := 2
for _, sn := range planet.StorageNodes {
sn.Peer.Storage2.PieceDeleter.SetupTest()
}
{ {
data := testrand.Bytes(10 * memory.KiB) data := testrand.Bytes(10 * memory.KiB)
@ -197,8 +209,8 @@ func TestService_DeletePieces_SomeNodesDown(t *testing.T) {
requests = append(requests, nodePieces) requests = append(requests, nodePieces)
// stop the first 2 SNs before deleting pieces // stop the first numToShutdown SNs before deleting pieces
if i < 2 { if i < numToShutdown {
require.NoError(t, planet.StopPeer(sn)) require.NoError(t, planet.StopPeer(sn))
} }
} }
@ -206,10 +218,14 @@ func TestService_DeletePieces_SomeNodesDown(t *testing.T) {
err := satelliteSys.API.Metainfo.PieceDeletion.Delete(ctx, requests, 0.9999) err := satelliteSys.API.Metainfo.PieceDeletion.Delete(ctx, requests, 0.9999)
require.NoError(t, err) require.NoError(t, err)
for _, sn := range planet.StorageNodes {
sn.Peer.Storage2.PieceDeleter.Wait()
}
// Check that storage nodes which are online when deleting pieces don't // Check that storage nodes which are online when deleting pieces don't
// hold any piece // hold any piece
var totalUsedSpace int64 var totalUsedSpace int64
for i := 2; i < len(planet.StorageNodes); i++ { for i := numToShutdown; i < len(planet.StorageNodes); i++ {
piecesTotal, _, err := planet.StorageNodes[i].Storage2.Store.SpaceUsedForPieces(ctx) piecesTotal, _, err := planet.StorageNodes[i].Storage2.Store.SpaceUsedForPieces(ctx)
require.NoError(t, err) require.NoError(t, err)
totalUsedSpace += piecesTotal totalUsedSpace += piecesTotal
@ -231,6 +247,10 @@ func TestService_DeletePieces_AllNodesDown(t *testing.T) {
uplnk := planet.Uplinks[0] uplnk := planet.Uplinks[0]
satelliteSys := planet.Satellites[0] satelliteSys := planet.Satellites[0]
for _, sn := range planet.StorageNodes {
sn.Peer.Storage2.PieceDeleter.SetupTest()
}
{ {
data := testrand.Bytes(10 * memory.KiB) data := testrand.Bytes(10 * memory.KiB)
err := uplnk.UploadWithClientConfig(ctx, satelliteSys, testplanet.UplinkConfig{ err := uplnk.UploadWithClientConfig(ctx, satelliteSys, testplanet.UplinkConfig{
@ -278,6 +298,7 @@ func TestService_DeletePieces_AllNodesDown(t *testing.T) {
var totalUsedSpace int64 var totalUsedSpace int64
for _, sn := range planet.StorageNodes { for _, sn := range planet.StorageNodes {
sn.Peer.Storage2.PieceDeleter.Wait()
// calculate the SNs total used space after data upload // calculate the SNs total used space after data upload
piecesTotal, _, err := sn.Storage2.Store.SpaceUsedForPieces(ctx) piecesTotal, _, err := sn.Storage2.Store.SpaceUsedForPieces(ctx)
require.NoError(t, err) require.NoError(t, err)

View File

@ -204,6 +204,7 @@ type Peer struct {
BlobsCache *pieces.BlobsUsageCache BlobsCache *pieces.BlobsUsageCache
CacheService *pieces.CacheService CacheService *pieces.CacheService
RetainService *retain.Service RetainService *retain.Service
PieceDeleter *pieces.Deleter
Endpoint *piecestore.Endpoint Endpoint *piecestore.Endpoint
Inspector *inspector.Endpoint Inspector *inspector.Endpoint
Monitor *monitor.Service Monitor *monitor.Service
@ -392,6 +393,13 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten
peer.DB.PieceSpaceUsedDB(), peer.DB.PieceSpaceUsedDB(),
) )
peer.Storage2.PieceDeleter = pieces.NewDeleter(log.Named("piecedeleter"), peer.Storage2.Store, config.Storage2.DeleteWorkers, config.Storage2.DeleteQueueSize)
peer.Services.Add(lifecycle.Item{
Name: "PieceDeleter",
Run: peer.Storage2.PieceDeleter.Run,
Close: peer.Storage2.PieceDeleter.Close,
})
peer.Storage2.TrashChore = pieces.NewTrashChore( peer.Storage2.TrashChore = pieces.NewTrashChore(
log.Named("pieces:trash"), log.Named("pieces:trash"),
24*time.Hour, // choreInterval: how often to run the chore 24*time.Hour, // choreInterval: how often to run the chore
@ -457,6 +465,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten
peer.Storage2.RetainService, peer.Storage2.RetainService,
peer.Contact.PingStats, peer.Contact.PingStats,
peer.Storage2.Store, peer.Storage2.Store,
peer.Storage2.PieceDeleter,
peer.DB.Orders(), peer.DB.Orders(),
peer.DB.Bandwidth(), peer.DB.Bandwidth(),
peer.DB.UsedSerials(), peer.DB.UsedSerials(),

View File

@ -0,0 +1,170 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package pieces
import (
"context"
"sync"
"github.com/zeebo/errs"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"storj.io/common/storj"
)
// DeleteRequest contains information to delete piece.
type DeleteRequest struct {
	// SatelliteID identifies the satellite on whose behalf the piece is deleted.
	SatelliteID storj.NodeID
	// PieceID identifies the piece to remove from this node's store.
	PieceID storj.PieceID
}
// Deleter is a worker that processes requests to delete groups of pieceIDs.
// Deletes are processed "best-effort" asynchronously, and any errors are
// logged.
type Deleter struct {
	// mu guards stop, eg, closed, and the test counters below.
	mu sync.Mutex
	// ch is the bounded queue of pending delete requests; Enqueue drops
	// requests (rather than blocking) when it is full.
	ch chan DeleteRequest
	// numWorkers is how many worker goroutines Run starts.
	numWorkers int
	// eg collects the worker goroutines so Close can wait for them.
	eg  *errgroup.Group
	log *zap.Logger
	// stop cancels the workers' context; non-nil once Run has been called.
	stop  func()
	store *Store
	// closed is set by Close so a later Run fails instead of starting workers.
	closed bool

	// The test variables are only used when testing.
	testToDelete int
	testCond     *sync.Cond
}
// NewDeleter creates a new Deleter.
//
// numWorkers is the number of concurrent delete workers; queueSize is the
// capacity of the delete queue. Non-positive values fall back to defaults
// (previously only exactly 0 was defaulted, so a negative numWorkers would
// start no workers at all and a negative queueSize would panic in make).
func NewDeleter(log *zap.Logger, store *Store, numWorkers int, queueSize int) *Deleter {
	if numWorkers <= 0 {
		numWorkers = 1
	}
	if queueSize <= 0 {
		// Default queueSize is chosen as a large number that uses a manageable
		// amount of memory.
		queueSize = 10000
	}
	return &Deleter{
		ch:         make(chan DeleteRequest, queueSize),
		numWorkers: numWorkers,
		log:        log,
		store:      store,
	}
}
// Run starts the delete workers.
//
// It may be called at most once: a second call, or a call after Close,
// returns an error. The workers run until ctx is canceled or Close is called.
func (d *Deleter) Run(ctx context.Context) error {
	d.mu.Lock()
	defer d.mu.Unlock()

	switch {
	case d.closed:
		return errs.New("already closed")
	case d.stop != nil:
		return errs.New("already started")
	}

	// Derive a cancelable context so Close can stop the workers.
	ctx, d.stop = context.WithCancel(ctx)

	d.eg = &errgroup.Group{}
	for n := 0; n < d.numWorkers; n++ {
		d.eg.Go(func() error { return d.work(ctx) })
	}
	return nil
}
// Enqueue adds the pieceIDs to the delete queue. If the queue is full deletes
// are not processed and will be left for garbage collection.
func (d *Deleter) Enqueue(ctx context.Context, satelliteID storj.NodeID, pieceIDs []storj.PieceID) {
	// Track how many requests were actually queued so the test bookkeeping
	// below matches what the workers will process. The previous version
	// returned without counting anything when the queue filled up, even though
	// earlier pieces in this batch had already been queued, which let Wait
	// return before those deletes completed.
	queued := 0
loop:
	for _, pieceID := range pieceIDs {
		select {
		case d.ch <- DeleteRequest{satelliteID, pieceID}:
			queued++
		default:
			mon.Counter("piecedeleter-queue-full").Inc(1)
			// Remaining pieces are left for garbage collection.
			break loop
		}
	}

	// If we are in testMode add the number of pieceIDs waiting to be processed.
	// NOTE(review): d.testCond is read without holding d.mu; this assumes
	// SetupTest is called before any concurrent Enqueue — confirm in callers.
	if queued > 0 && d.testCond != nil {
		d.mu.Lock()
		d.testToDelete += queued
		d.mu.Unlock()
	}
}
// work runs until ctx is canceled, pulling requests off the queue and
// deleting the corresponding pieces. Delete errors are logged and never
// returned, so a failed delete is simply left for garbage collection.
func (d *Deleter) work(ctx context.Context) error {
	for {
		select {
		case <-ctx.Done():
			return nil
		case r := <-d.ch:
			err := d.store.Delete(ctx, r.SatelliteID, r.PieceID)
			if err != nil {
				// If a piece cannot be deleted, we just log the error.
				d.log.Error("delete failed",
					zap.Stringer("Satellite ID", r.SatelliteID),
					zap.Stringer("Piece ID", r.PieceID),
					zap.Error(err),
				)
			} else {
				d.log.Info("deleted",
					zap.Stringer("Satellite ID", r.SatelliteID),
					zap.Stringer("Piece ID", r.PieceID),
				)
			}

			// If we are in test mode, check whether we have processed all known
			// deletes, and if so broadcast on the cond.
			// The counter is guarded against going negative because requests
			// may be consumed before Enqueue records them.
			if d.testCond != nil {
				d.mu.Lock()
				if d.testToDelete > 0 {
					d.testToDelete--
				}
				if d.testToDelete == 0 {
					d.testCond.Broadcast()
				}
				d.mu.Unlock()
			}
		}
	}
}
// Close stops all the workers and waits for them to finish.
//
// It marks the deleter closed so Run cannot be called afterwards. Safe to
// call even if Run was never invoked.
func (d *Deleter) Close() error {
	// Snapshot stop/eg under the lock, then operate on them outside it so we
	// never hold the mutex while waiting on the workers.
	d.mu.Lock()
	d.closed = true
	stop, eg := d.stop, d.eg
	d.mu.Unlock()

	if stop != nil {
		stop()
	}
	if eg == nil {
		return nil
	}
	return eg.Wait()
}
// Wait blocks until the queue is empty. This can only be called after SetupTest.
func (d *Deleter) Wait() {
	d.mu.Lock()
	defer d.mu.Unlock()
	// testToDelete is decremented by the workers, which broadcast on testCond
	// when it reaches zero.
	for d.testToDelete > 0 {
		d.testCond.Wait()
	}
}
// SetupTest puts the deleter in test mode. This should only be called in tests.
//
// It installs a condition variable tied to d.mu that Enqueue and work use to
// track outstanding deletes, which enables Wait to block until the queue
// drains. Call it before Run to avoid racing with the workers.
func (d *Deleter) SetupTest() {
	d.testCond = sync.NewCond(&d.mu)
}

View File

@ -0,0 +1,74 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package pieces_test
import (
"io/ioutil"
"strings"
"testing"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
"storj.io/common/memory"
"storj.io/common/pb"
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/storj/storage/filestore"
"storj.io/storj/storagenode/pieces"
)
// TestDeleter exercises the async piece deleter end to end against a real
// filestore: write a piece, enqueue its deletion (plus a non-existent piece
// to verify best-effort behavior), wait for the workers, and confirm the
// piece is gone.
func TestDeleter(t *testing.T) {
	ctx := testcontext.New(t)
	defer ctx.Cleanup()

	dir, err := filestore.NewDir(ctx.Dir("piecedeleter"))
	require.NoError(t, err)

	blobs := filestore.New(zaptest.NewLogger(t), dir)
	defer ctx.Check(blobs.Close)

	store := pieces.NewStore(zaptest.NewLogger(t), blobs, nil, nil, nil)

	// Also test that 0 works for maxWorkers
	deleter := pieces.NewDeleter(zaptest.NewLogger(t), store, 0, 0)
	defer ctx.Check(deleter.Close)
	deleter.SetupTest()

	require.NoError(t, deleter.Run(ctx))

	satelliteID := testrand.NodeID()
	pieceID := testrand.PieceID()

	data := testrand.Bytes(memory.KB)
	w, err := store.Writer(ctx, satelliteID, pieceID)
	require.NoError(t, err)
	_, err = w.Write(data)
	require.NoError(t, err)
	require.NoError(t, w.Commit(ctx, &pb.PieceHeader{}))

	// confirm we can read the data before delete
	r, err := store.Reader(ctx, satelliteID, pieceID)
	require.NoError(t, err)
	buf, err := ioutil.ReadAll(r)
	require.NoError(t, err)
	// Close the reader so we don't leak an open handle to the piece file
	// (an open handle can also prevent the delete below on some platforms).
	require.NoError(t, r.Close())
	require.Equal(t, data, buf)

	// Delete the piece we've created
	deleter.Enqueue(ctx, satelliteID, []pb.PieceID{pieceID})

	// Also delete a random non-existent piece, so we know it doesn't blow up
	// when this happens
	deleter.Enqueue(ctx, satelliteID, []pb.PieceID{testrand.PieceID()})

	// wait for test hook to fire twice
	deleter.Wait()

	_, err = store.Reader(ctx, satelliteID, pieceID)
	// Guard against a nil err before calling err.Error() below, which would
	// otherwise panic and obscure the real failure (piece not deleted).
	require.Error(t, err)
	require.Condition(t, func() bool {
		return strings.Contains(err.Error(), "file does not exist") ||
			strings.Contains(err.Error(), "The system cannot find the path specified")
	}, "unexpected error message")
}

View File

@ -55,6 +55,8 @@ type OldConfig struct {
type Config struct { type Config struct {
ExpirationGracePeriod time.Duration `help:"how soon before expiration date should things be considered expired" default:"48h0m0s"` ExpirationGracePeriod time.Duration `help:"how soon before expiration date should things be considered expired" default:"48h0m0s"`
MaxConcurrentRequests int `help:"how many concurrent requests are allowed, before uploads are rejected. 0 represents unlimited." default:"0"` MaxConcurrentRequests int `help:"how many concurrent requests are allowed, before uploads are rejected. 0 represents unlimited." default:"0"`
DeleteWorkers int `help:"how many piece delete workers" default:"0"`
DeleteQueueSize int `help:"size of the piece delete queue" default:"0"`
OrderLimitGracePeriod time.Duration `help:"how long after OrderLimit creation date are OrderLimits no longer accepted" default:"24h0m0s"` OrderLimitGracePeriod time.Duration `help:"how long after OrderLimit creation date are OrderLimits no longer accepted" default:"24h0m0s"`
CacheSyncInterval time.Duration `help:"how often the space used cache is synced to persistent storage" releaseDefault:"1h0m0s" devDefault:"0h1m0s"` CacheSyncInterval time.Duration `help:"how often the space used cache is synced to persistent storage" releaseDefault:"1h0m0s" devDefault:"0h1m0s"`
StreamOperationTimeout time.Duration `help:"how long to spend waiting for a stream operation before canceling" default:"30m"` StreamOperationTimeout time.Duration `help:"how long to spend waiting for a stream operation before canceling" default:"30m"`
@ -89,6 +91,7 @@ type Endpoint struct {
orders orders.DB orders orders.DB
usage bandwidth.DB usage bandwidth.DB
usedSerials UsedSerials usedSerials UsedSerials
pieceDeleter *pieces.Deleter
// liveRequests tracks the total number of incoming rpc requests. For gRPC // liveRequests tracks the total number of incoming rpc requests. For gRPC
// requests only, this number is compared to config.MaxConcurrentRequests // requests only, this number is compared to config.MaxConcurrentRequests
@ -104,7 +107,7 @@ type drpcEndpoint struct{ *Endpoint }
func (endpoint *Endpoint) DRPC() pb.DRPCPiecestoreServer { return &drpcEndpoint{Endpoint: endpoint} } func (endpoint *Endpoint) DRPC() pb.DRPCPiecestoreServer { return &drpcEndpoint{Endpoint: endpoint} }
// NewEndpoint creates a new piecestore endpoint. // NewEndpoint creates a new piecestore endpoint.
func NewEndpoint(log *zap.Logger, signer signing.Signer, trust *trust.Pool, monitor *monitor.Service, retain *retain.Service, pingStats pingStatsSource, store *pieces.Store, orders orders.DB, usage bandwidth.DB, usedSerials UsedSerials, config Config) (*Endpoint, error) { func NewEndpoint(log *zap.Logger, signer signing.Signer, trust *trust.Pool, monitor *monitor.Service, retain *retain.Service, pingStats pingStatsSource, store *pieces.Store, pieceDeleter *pieces.Deleter, orders orders.DB, usage bandwidth.DB, usedSerials UsedSerials, config Config) (*Endpoint, error) {
// If config.MaxConcurrentRequests is set we want to repsect it for grpc. // If config.MaxConcurrentRequests is set we want to repsect it for grpc.
// However, if it is 0 (unlimited) we force a limit. // However, if it is 0 (unlimited) we force a limit.
grpcReqLimit := config.MaxConcurrentRequests grpcReqLimit := config.MaxConcurrentRequests
@ -127,6 +130,7 @@ func NewEndpoint(log *zap.Logger, signer signing.Signer, trust *trust.Pool, moni
orders: orders, orders: orders,
usage: usage, usage: usage,
usedSerials: usedSerials, usedSerials: usedSerials,
pieceDeleter: pieceDeleter,
liveRequests: 0, liveRequests: 0,
}, nil }, nil
@ -186,23 +190,7 @@ func (endpoint *Endpoint) DeletePieces(
return nil, rpcstatus.Error(rpcstatus.PermissionDenied, "delete pieces called with untrusted ID") return nil, rpcstatus.Error(rpcstatus.PermissionDenied, "delete pieces called with untrusted ID")
} }
for _, pieceID := range req.PieceIds { endpoint.pieceDeleter.Enqueue(ctx, peer.ID, req.PieceIds)
err = endpoint.store.Delete(ctx, peer.ID, pieceID)
if err != nil {
// If a piece cannot be deleted, we just log the error.
// No error is returned to the caller.
endpoint.log.Error("delete failed",
zap.Stringer("Satellite ID", peer.ID),
zap.Stringer("Piece ID", pieceID),
zap.Error(err),
)
} else {
endpoint.log.Info("deleted",
zap.Stringer("Satellite ID", peer.ID),
zap.Stringer("Piece ID", pieceID),
)
}
}
return &pb.DeletePiecesResponse{}, nil return &pb.DeletePiecesResponse{}, nil
} }

View File

@ -383,7 +383,7 @@ func TestDeletePieces(t *testing.T) {
} }
t.Run("Ok", func(t *testing.T) { t.Run("Ok", func(t *testing.T) {
pieceIDs := []storj.PieceID{{1}, {2}, {3}, {4}} pieceIDs := []storj.PieceID{testrand.PieceID(), testrand.PieceID(), testrand.PieceID(), testrand.PieceID()}
dataArray := make([][]byte, len(pieceIDs)) dataArray := make([][]byte, len(pieceIDs))
for i, pieceID := range pieceIDs { for i, pieceID := range pieceIDs {
dataArray[i], _, _ = uploadPiece(t, ctx, pieceID, planetSN, planet.Uplinks[0], planetSat) dataArray[i], _, _ = uploadPiece(t, ctx, pieceID, planetSN, planet.Uplinks[0], planetSat)
@ -403,8 +403,8 @@ func TestDeletePieces(t *testing.T) {
}) })
t.Run("Ok: one piece to delete is missing", func(t *testing.T) { t.Run("Ok: one piece to delete is missing", func(t *testing.T) {
missingPieceID := storj.PieceID{12} missingPieceID := testrand.PieceID()
pieceIDs := []storj.PieceID{{1}, {2}, {3}, {4}} pieceIDs := []storj.PieceID{testrand.PieceID(), testrand.PieceID(), testrand.PieceID(), testrand.PieceID()}
dataArray := make([][]byte, len(pieceIDs)) dataArray := make([][]byte, len(pieceIDs))
for i, pieceID := range pieceIDs { for i, pieceID := range pieceIDs {
dataArray[i], _, _ = uploadPiece(t, ctx, pieceID, planetSN, planet.Uplinks[0], planetSat) dataArray[i], _, _ = uploadPiece(t, ctx, pieceID, planetSN, planet.Uplinks[0], planetSat)
@ -424,7 +424,7 @@ func TestDeletePieces(t *testing.T) {
}) })
t.Run("Ok: no piece deleted", func(t *testing.T) { t.Run("Ok: no piece deleted", func(t *testing.T) {
pieceID := storj.PieceID{10} pieceID := testrand.PieceID()
data, _, _ := uploadPiece(t, ctx, pieceID, planetSN, planet.Uplinks[0], planetSat) data, _, _ := uploadPiece(t, ctx, pieceID, planetSN, planet.Uplinks[0], planetSat)
err := client.DeletePieces(ctx.Context) err := client.DeletePieces(ctx.Context)
@ -436,7 +436,7 @@ func TestDeletePieces(t *testing.T) {
}) })
t.Run("error: permission denied", func(t *testing.T) { t.Run("error: permission denied", func(t *testing.T) {
pieceID := storj.PieceID{11} pieceID := testrand.PieceID()
data, _, _ := uploadPiece(t, ctx, pieceID, planetSN, planet.Uplinks[0], planetSat) data, _, _ := uploadPiece(t, ctx, pieceID, planetSN, planet.Uplinks[0], planetSat)
client, err := planet.Uplinks[0].DialPiecestore(ctx, planetSN) client, err := planet.Uplinks[0].DialPiecestore(ctx, planetSN)