storagenode/pieces: send piece deletions to trash
This is a temporary precaution to avoid incorrectly auditing nodes for pieces that were deleted between database backups if we have to restore from a previous backup. Here we send pieces to trash rather than directly deleting them from storage nodes so we can restore from trash after a db restoration. Change-Id: Icd979d2a9a755e7428190c0129c9bc969649d544
This commit is contained in:
parent
7e4e1040f2
commit
71072eb593
@ -132,21 +132,7 @@ func (d *Deleter) work(ctx context.Context) error {
|
||||
case r := <-d.ch:
|
||||
mon.IntVal("piecedeleter-queue-time").Observe(int64(time.Since(r.QueueTime)))
|
||||
mon.IntVal("piecedeleter-queue-size").Observe(int64(len(d.ch)))
|
||||
err := d.store.Delete(ctx, r.SatelliteID, r.PieceID)
|
||||
if err != nil {
|
||||
// If a piece cannot be deleted, we just log the error.
|
||||
d.log.Error("delete failed",
|
||||
zap.Stringer("Satellite ID", r.SatelliteID),
|
||||
zap.Stringer("Piece ID", r.PieceID),
|
||||
zap.Error(err),
|
||||
)
|
||||
} else {
|
||||
d.log.Info("deleted",
|
||||
zap.Stringer("Satellite ID", r.SatelliteID),
|
||||
zap.Stringer("Piece ID", r.PieceID),
|
||||
)
|
||||
}
|
||||
|
||||
d.deleteOrTrash(ctx, r.SatelliteID, r.PieceID)
|
||||
// If we are in test mode, check if we are done processing deletes
|
||||
if d.testMode {
|
||||
d.checkDone(-1)
|
||||
@ -190,3 +176,30 @@ func (d *Deleter) Wait(ctx context.Context) {
|
||||
func (d *Deleter) SetupTest() {
|
||||
d.testMode = true
|
||||
}
|
||||
|
||||
func (d *Deleter) deleteOrTrash(ctx context.Context, satelliteID storj.NodeID, pieceID storj.PieceID) {
|
||||
var err error
|
||||
var errMsg string
|
||||
var infoMsg string
|
||||
if d.store.config.DeleteToTrash {
|
||||
err = d.store.Trash(ctx, satelliteID, pieceID)
|
||||
errMsg = "could not send delete piece to trash"
|
||||
infoMsg = "delete piece sent to trash"
|
||||
} else {
|
||||
err = d.store.Delete(ctx, satelliteID, pieceID)
|
||||
errMsg = "delete failed"
|
||||
infoMsg = "deleted"
|
||||
}
|
||||
if err != nil {
|
||||
d.log.Error(errMsg,
|
||||
zap.Stringer("Satellite ID", satelliteID),
|
||||
zap.Stringer("Piece ID", pieceID),
|
||||
zap.Error(err),
|
||||
)
|
||||
} else {
|
||||
d.log.Info(infoMsg,
|
||||
zap.Stringer("Satellite ID", satelliteID),
|
||||
zap.Stringer("Piece ID", pieceID),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
@ -5,8 +5,6 @@ package pieces_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io/ioutil"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
@ -18,63 +16,100 @@ import (
|
||||
"storj.io/common/testcontext"
|
||||
"storj.io/common/testrand"
|
||||
"storj.io/storj/storage/filestore"
|
||||
"storj.io/storj/storagenode"
|
||||
"storj.io/storj/storagenode/pieces"
|
||||
"storj.io/storj/storagenode/storagenodedb/storagenodedbtest"
|
||||
)
|
||||
|
||||
func TestDeleter(t *testing.T) {
|
||||
ctx := testcontext.New(t)
|
||||
defer ctx.Cleanup()
|
||||
cases := []struct {
|
||||
testID string
|
||||
deleteToTrash bool
|
||||
}{
|
||||
{
|
||||
testID: "regular-delete",
|
||||
deleteToTrash: false,
|
||||
}, {
|
||||
testID: "trash-delete",
|
||||
deleteToTrash: true,
|
||||
},
|
||||
}
|
||||
for _, c := range cases {
|
||||
testCase := c
|
||||
t.Run(testCase.testID, func(t *testing.T) {
|
||||
storagenodedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db storagenode.DB) {
|
||||
dir, err := filestore.NewDir(zaptest.NewLogger(t), ctx.Dir("piecedeleter"))
|
||||
require.NoError(t, err)
|
||||
|
||||
dir, err := filestore.NewDir(zaptest.NewLogger(t), ctx.Dir("piecedeleter"))
|
||||
require.NoError(t, err)
|
||||
blobs := filestore.New(zaptest.NewLogger(t), dir, filestore.DefaultConfig)
|
||||
defer ctx.Check(blobs.Close)
|
||||
|
||||
blobs := filestore.New(zaptest.NewLogger(t), dir, filestore.DefaultConfig)
|
||||
defer ctx.Check(blobs.Close)
|
||||
v0PieceInfo, ok := db.V0PieceInfo().(pieces.V0PieceInfoDBForTest)
|
||||
require.True(t, ok, "V0PieceInfoDB can not satisfy V0PieceInfoDBForTest")
|
||||
|
||||
store := pieces.NewStore(zaptest.NewLogger(t), blobs, nil, nil, nil, pieces.DefaultConfig)
|
||||
conf := pieces.Config{
|
||||
WritePreallocSize: 4 * memory.MiB,
|
||||
DeleteToTrash: testCase.deleteToTrash,
|
||||
}
|
||||
store := pieces.NewStore(zaptest.NewLogger(t), blobs, v0PieceInfo, db.PieceExpirationDB(), nil, conf)
|
||||
deleter := pieces.NewDeleter(zaptest.NewLogger(t), store, 1, 10000)
|
||||
defer ctx.Check(deleter.Close)
|
||||
deleter.SetupTest()
|
||||
|
||||
// Also test that 0 works for maxWorkers
|
||||
deleter := pieces.NewDeleter(zaptest.NewLogger(t), store, 1, 10000)
|
||||
defer ctx.Check(deleter.Close)
|
||||
deleter.SetupTest()
|
||||
require.NoError(t, deleter.Run(ctx))
|
||||
satelliteID := testrand.NodeID()
|
||||
pieceID := testrand.PieceID()
|
||||
|
||||
require.NoError(t, deleter.Run(ctx))
|
||||
data := testrand.Bytes(memory.KB)
|
||||
w, err := store.Writer(ctx, satelliteID, pieceID)
|
||||
require.NoError(t, err)
|
||||
_, err = w.Write(data)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, w.Commit(ctx, &pb.PieceHeader{}))
|
||||
|
||||
satelliteID := testrand.NodeID()
|
||||
pieceID := testrand.PieceID()
|
||||
// Delete the piece we've created
|
||||
unhandled := deleter.Enqueue(ctx, satelliteID, []pb.PieceID{pieceID})
|
||||
require.Equal(t, 0, unhandled)
|
||||
|
||||
data := testrand.Bytes(memory.KB)
|
||||
w, err := store.Writer(ctx, satelliteID, pieceID)
|
||||
require.NoError(t, err)
|
||||
_, err = w.Write(data)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, w.Commit(ctx, &pb.PieceHeader{}))
|
||||
// wait for test hook to fire twice
|
||||
deleter.Wait(ctx)
|
||||
|
||||
// confirm we can read the data before delete
|
||||
r, err := store.Reader(ctx, satelliteID, pieceID)
|
||||
require.NoError(t, err)
|
||||
// check that piece is not available
|
||||
r1, err := store.Reader(ctx, satelliteID, pieceID)
|
||||
require.Error(t, err)
|
||||
require.Nil(t, r1)
|
||||
|
||||
buf, err := ioutil.ReadAll(r)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, data, buf)
|
||||
defer func() {
|
||||
if r1 != nil {
|
||||
ctx.Check(r1.Close)
|
||||
}
|
||||
}()
|
||||
|
||||
// Delete the piece we've created
|
||||
unhandled := deleter.Enqueue(ctx, satelliteID, []pb.PieceID{pieceID})
|
||||
require.Equal(t, 0, unhandled)
|
||||
// check the trash
|
||||
err = store.RestoreTrash(ctx, satelliteID)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Also delete a random non-existent piece, so we know it doesn't blow up
|
||||
// when this happens
|
||||
unhandled = deleter.Enqueue(ctx, satelliteID, []pb.PieceID{testrand.PieceID()})
|
||||
require.Equal(t, 0, unhandled)
|
||||
r2, err := store.Reader(ctx, satelliteID, pieceID)
|
||||
defer func() {
|
||||
if r2 != nil {
|
||||
ctx.Check(r2.Close)
|
||||
}
|
||||
}()
|
||||
if !testCase.deleteToTrash {
|
||||
require.Error(t, err)
|
||||
require.Nil(t, r2)
|
||||
}
|
||||
if testCase.deleteToTrash {
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, r2)
|
||||
}
|
||||
|
||||
// wait for test hook to fire twice
|
||||
deleter.Wait(ctx)
|
||||
|
||||
_, err = store.Reader(ctx, satelliteID, pieceID)
|
||||
require.Condition(t, func() bool {
|
||||
return strings.Contains(err.Error(), "file does not exist") ||
|
||||
strings.Contains(err.Error(), "The system cannot find the path specified")
|
||||
}, "unexpected error message")
|
||||
// Also delete a random non-existent piece, so we know it doesn't blow up when this happens
|
||||
unhandled = deleter.Enqueue(ctx, satelliteID, []pb.PieceID{testrand.PieceID()})
|
||||
require.Equal(t, 0, unhandled)
|
||||
})
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestEnqueueUnhandled(t *testing.T) {
|
||||
|
@ -155,6 +155,7 @@ type SatelliteUsage struct {
|
||||
// Config is configuration for Store.
|
||||
type Config struct {
|
||||
WritePreallocSize memory.Size `help:"file preallocated for uploading" default:"4MiB"`
|
||||
DeleteToTrash bool `help:"move pieces to trash upon deletion. Warning: if set to false, you risk disqualification for failed audits if a satellite database is restored from backup." default:"true"`
|
||||
}
|
||||
|
||||
// DefaultConfig is the default value for the Config.
|
||||
|
Loading…
Reference in New Issue
Block a user