storagenode/piecestore: implement trash recovery for download requests
This change allows a node to look for a piece in the trash when serving a download request. If the piece is found in the trash, it restores it to the blobs directory and continue to serve the request as expected. Resolves https://github.com/storj/storj/issues/6145 Change-Id: Ibfa3c0b4954875fa977bc995fc4dd2705ca3ce42
This commit is contained in:
parent
3f1ea4a0b9
commit
9ab934e2ae
@ -89,6 +89,10 @@ type Blobs interface {
|
||||
RestoreTrash(ctx context.Context, namespace []byte) ([][]byte, error)
|
||||
// EmptyTrash removes all files in trash that were moved to trash prior to trashedBefore and returns the total bytes emptied and keys deleted.
|
||||
EmptyTrash(ctx context.Context, namespace []byte, trashedBefore time.Time) (int64, [][]byte, error)
|
||||
// TryRestoreTrashPiece attempts to restore a piece from the trash.
|
||||
// It returns nil if the piece was restored, or an error if the piece was not
|
||||
// in the trash or could not be restored.
|
||||
TryRestoreTrashPiece(ctx context.Context, ref BlobRef) error
|
||||
// Stat looks up disk metadata on the blob file.
|
||||
Stat(ctx context.Context, ref BlobRef) (BlobInfo, error)
|
||||
// StatWithStorageFormat looks up disk metadata for the blob file with the given storage format
|
||||
|
@ -10,6 +10,7 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/fs"
|
||||
"math"
|
||||
"os"
|
||||
"path/filepath"
|
||||
@ -470,6 +471,35 @@ func (dir *Dir) RestoreTrash(ctx context.Context, namespace []byte) (keysRestore
|
||||
return keysRestored, errorsEncountered.Err()
|
||||
}
|
||||
|
||||
// TryRestoreTrashPiece attempts to restore a piece from the trash if it exists.
|
||||
// It returns nil if the piece was restored, or an error if the piece was not
|
||||
// in the trash or could not be restored.
|
||||
func (dir *Dir) TryRestoreTrashPiece(ctx context.Context, ref blobstore.BlobRef) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
blobsBasePath, err := dir.blobToBasePath(ref)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
trashBasePath, err := dir.refToDirPath(ref, dir.trashdir())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// ensure the dirs exist for blobs path
|
||||
blobsVerPath := blobPathForFormatVersion(blobsBasePath, MaxFormatVersionSupported)
|
||||
err = os.MkdirAll(filepath.Dir(blobsVerPath), dirPermission)
|
||||
if err != nil && !errors.Is(err, fs.ErrExist) {
|
||||
return err
|
||||
}
|
||||
|
||||
trashVerPath := blobPathForFormatVersion(trashBasePath, MaxFormatVersionSupported)
|
||||
|
||||
// move back to blobsdir
|
||||
return rename(trashVerPath, blobsVerPath)
|
||||
}
|
||||
|
||||
// EmptyTrash walks the trash files for the given namespace and deletes any
|
||||
// file whose mtime is older than trashedBefore. The mtime is modified when
|
||||
// Trash is called.
|
||||
|
@ -154,7 +154,15 @@ func (store *blobStore) RestoreTrash(ctx context.Context, namespace []byte) (key
|
||||
return keysRestored, Error.Wrap(err)
|
||||
}
|
||||
|
||||
// // EmptyTrash removes all files in trash that have been there longer than trashExpiryDur.
|
||||
// TryRestoreTrashPiece attempts to restore a piece from the trash if it exists.
|
||||
// It returns nil if the piece was restored, or an error if the piece was not
|
||||
// in the trash or could not be restored.
|
||||
func (store *blobStore) TryRestoreTrashPiece(ctx context.Context, ref blobstore.BlobRef) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
return Error.Wrap(store.dir.TryRestoreTrashPiece(ctx, ref))
|
||||
}
|
||||
|
||||
// EmptyTrash removes all files in trash that have been there longer than trashExpiryDur.
|
||||
func (store *blobStore) EmptyTrash(ctx context.Context, namespace []byte, trashedBefore time.Time) (bytesEmptied int64, keys [][]byte, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
bytesEmptied, keys, err = store.dir.EmptyTrash(ctx, namespace, trashedBefore)
|
||||
|
@ -160,6 +160,14 @@ func (bad *BadBlobs) EmptyTrash(ctx context.Context, namespace []byte, trashedBe
|
||||
return bad.blobs.EmptyTrash(ctx, namespace, trashedBefore)
|
||||
}
|
||||
|
||||
// TryRestoreTrashPiece attempts to restore a piece from the trash.
|
||||
func (bad *BadBlobs) TryRestoreTrashPiece(ctx context.Context, ref blobstore.BlobRef) error {
|
||||
if err := bad.err.Err(); err != nil {
|
||||
return err
|
||||
}
|
||||
return bad.blobs.TryRestoreTrashPiece(ctx, ref)
|
||||
}
|
||||
|
||||
// Delete deletes the blob with the namespace and key.
|
||||
func (bad *BadBlobs) Delete(ctx context.Context, ref blobstore.BlobRef) error {
|
||||
if err := bad.err.Err(); err != nil {
|
||||
|
@ -157,6 +157,14 @@ func (slow *SlowBlobs) StatWithStorageFormat(ctx context.Context, ref blobstore.
|
||||
return slow.blobs.StatWithStorageFormat(ctx, ref, formatVer)
|
||||
}
|
||||
|
||||
// TryRestoreTrashPiece attempts to restore a piece from trash.
|
||||
func (slow *SlowBlobs) TryRestoreTrashPiece(ctx context.Context, ref blobstore.BlobRef) error {
|
||||
if err := slow.sleep(ctx); err != nil {
|
||||
return errs.Wrap(err)
|
||||
}
|
||||
return slow.blobs.TryRestoreTrashPiece(ctx, ref)
|
||||
}
|
||||
|
||||
// WalkNamespace executes walkFunc for each locally stored blob in the given namespace.
|
||||
// If walkFunc returns a non-nil error, WalkNamespace will stop iterating and return the
|
||||
// error immediately.
|
||||
|
@ -319,6 +319,16 @@ func (store *Store) Reader(ctx context.Context, satellite storj.NodeID, pieceID
|
||||
return reader, Error.Wrap(err)
|
||||
}
|
||||
|
||||
// TryRestoreTrashPiece attempts to restore a piece from the trash.
|
||||
// It returns nil if the piece was restored, or an error if the piece was not in the trash.
|
||||
func (store *Store) TryRestoreTrashPiece(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
return Error.Wrap(store.blobs.TryRestoreTrashPiece(ctx, blobstore.BlobRef{
|
||||
Namespace: satellite.Bytes(),
|
||||
Key: pieceID.Bytes(),
|
||||
}))
|
||||
}
|
||||
|
||||
// Delete deletes the specified piece.
|
||||
func (store *Store) Delete(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
@ -8,6 +8,7 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/fs"
|
||||
"net"
|
||||
"os"
|
||||
"sync"
|
||||
@ -666,12 +667,31 @@ func (endpoint *Endpoint) Download(stream pb.DRPCPiecestore_DownloadStream) (err
|
||||
|
||||
pieceReader, err = endpoint.store.Reader(ctx, limit.SatelliteId, limit.PieceId)
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
endpoint.monitor.VerifyDirReadableLoop.TriggerWait()
|
||||
return rpcstatus.Wrap(rpcstatus.NotFound, err)
|
||||
}
|
||||
if !errs.Is(err, fs.ErrNotExist) {
|
||||
return rpcstatus.Wrap(rpcstatus.Internal, err)
|
||||
}
|
||||
|
||||
// check if the file is in trash, if so, restore it and
|
||||
// continue serving the download request.
|
||||
tryRestoreErr := endpoint.store.TryRestoreTrashPiece(ctx, limit.SatelliteId, limit.PieceId)
|
||||
if tryRestoreErr != nil {
|
||||
if !errs.Is(tryRestoreErr, fs.ErrNotExist) {
|
||||
// file is not in trash, and we don't want to return a file rename error,
|
||||
// so we return the original "file does not exist" error
|
||||
tryRestoreErr = err
|
||||
}
|
||||
endpoint.monitor.VerifyDirReadableLoop.TriggerWait()
|
||||
return rpcstatus.Wrap(rpcstatus.NotFound, tryRestoreErr)
|
||||
}
|
||||
mon.Meter("download_file_in_trash", monkit.NewSeriesTag("namespace", limit.SatelliteId.String()), monkit.NewSeriesTag("piece_id", limit.PieceId.String())).Mark(1)
|
||||
endpoint.log.Warn("file found in trash", zap.Stringer("Piece ID", limit.PieceId), zap.Stringer("Satellite ID", limit.SatelliteId), zap.Stringer("Action", limit.Action), zap.String("Remote Address", remoteAddr))
|
||||
|
||||
// try to open the file again
|
||||
pieceReader, err = endpoint.store.Reader(ctx, limit.SatelliteId, limit.PieceId)
|
||||
if err != nil {
|
||||
return rpcstatus.Wrap(rpcstatus.Internal, err)
|
||||
}
|
||||
}
|
||||
defer func() {
|
||||
err := pieceReader.Close() // similarly how transcation Rollback works
|
||||
if err != nil {
|
||||
|
@ -6,6 +6,7 @@ package piecestore_test
|
||||
import (
|
||||
"bytes"
|
||||
"io"
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
@ -345,36 +346,69 @@ func TestDownload(t *testing.T) {
|
||||
SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 1,
|
||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||
pieceID := storj.PieceID{1}
|
||||
expectedData, _, _ := uploadPiece(t, ctx, pieceID, planet.StorageNodes[0], planet.Uplinks[0], planet.Satellites[0])
|
||||
data, _, _ := uploadPiece(t, ctx, pieceID, planet.StorageNodes[0], planet.Uplinks[0], planet.Satellites[0])
|
||||
|
||||
// upload another piece that we will trash
|
||||
trashPieceID := storj.PieceID{3}
|
||||
trashPieceData, _, _ := uploadPiece(t, ctx, trashPieceID, planet.StorageNodes[0], planet.Uplinks[0], planet.Satellites[0])
|
||||
err := planet.StorageNodes[0].Storage2.Store.Trash(ctx, planet.Satellites[0].ID(), trashPieceID)
|
||||
require.NoError(t, err)
|
||||
_, err = planet.StorageNodes[0].Storage2.Store.Stat(ctx, planet.Satellites[0].ID(), trashPieceID)
|
||||
require.Equal(t, true, errs.Is(err, os.ErrNotExist))
|
||||
|
||||
client, err := planet.Uplinks[0].DialPiecestore(ctx, planet.StorageNodes[0])
|
||||
require.NoError(t, err)
|
||||
|
||||
for _, tt := range []struct {
|
||||
name string
|
||||
pieceID storj.PieceID
|
||||
action pb.PieceAction
|
||||
// downloadData is data we are trying to download
|
||||
downloadData []byte
|
||||
errs []string
|
||||
finalChecks func()
|
||||
}{
|
||||
{ // should successfully download data
|
||||
name: "download successful",
|
||||
pieceID: pieceID,
|
||||
action: pb.PieceAction_GET,
|
||||
downloadData: data,
|
||||
},
|
||||
{ // should restore from trash and successfully download data
|
||||
name: "restore trash and successfully download",
|
||||
pieceID: trashPieceID,
|
||||
action: pb.PieceAction_GET,
|
||||
downloadData: trashPieceData,
|
||||
finalChecks: func() {
|
||||
blobInfo, err := planet.StorageNodes[0].Storage2.Store.Stat(ctx, planet.Satellites[0].ID(), trashPieceID)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, trashPieceID.Bytes(), blobInfo.BlobRef().Key)
|
||||
},
|
||||
},
|
||||
{ // should err with piece ID not specified
|
||||
name: "piece id not specified",
|
||||
pieceID: storj.PieceID{},
|
||||
action: pb.PieceAction_GET,
|
||||
downloadData: data,
|
||||
errs: []string{"missing piece id"},
|
||||
},
|
||||
{ // should err with piece ID not specified
|
||||
name: "file does not exist",
|
||||
pieceID: storj.PieceID{2},
|
||||
action: pb.PieceAction_GET,
|
||||
downloadData: data,
|
||||
errs: []string{"file does not exist", "The system cannot find the path specified"},
|
||||
},
|
||||
{ // should err with invalid action
|
||||
name: "invalid action",
|
||||
pieceID: pieceID,
|
||||
downloadData: data,
|
||||
action: pb.PieceAction_PUT,
|
||||
errs: []string{"expected get or get repair or audit action got PUT"},
|
||||
},
|
||||
} {
|
||||
tt := tt
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
serialNumber := testrand.SerialNumber()
|
||||
|
||||
orderLimit, piecePrivateKey := GenerateOrderLimit(
|
||||
@ -386,22 +420,22 @@ func TestDownload(t *testing.T) {
|
||||
serialNumber,
|
||||
24*time.Hour,
|
||||
24*time.Hour,
|
||||
int64(len(expectedData)),
|
||||
int64(len(tt.downloadData)),
|
||||
)
|
||||
signer := signing.SignerFromFullIdentity(planet.Satellites[0].Identity)
|
||||
orderLimit, err = signing.SignOrderLimit(ctx, signer, orderLimit)
|
||||
require.NoError(t, err)
|
||||
|
||||
downloader, err := client.Download(ctx, orderLimit, piecePrivateKey, 0, int64(len(expectedData)))
|
||||
downloader, err := client.Download(ctx, orderLimit, piecePrivateKey, 0, int64(len(tt.downloadData)))
|
||||
require.NoError(t, err)
|
||||
|
||||
buffer := make([]byte, len(expectedData))
|
||||
buffer := make([]byte, len(data))
|
||||
n, readErr := downloader.Read(buffer)
|
||||
|
||||
if len(tt.errs) > 0 {
|
||||
} else {
|
||||
require.NoError(t, readErr)
|
||||
require.Equal(t, expectedData, buffer[:n])
|
||||
require.Equal(t, tt.downloadData, buffer[:n])
|
||||
}
|
||||
|
||||
closeErr := downloader.Close()
|
||||
@ -427,6 +461,11 @@ func TestDownload(t *testing.T) {
|
||||
hash, originalLimit := downloader.GetHashAndLimit()
|
||||
require.Nil(t, hash)
|
||||
require.Nil(t, originalLimit)
|
||||
|
||||
if tt.finalChecks != nil {
|
||||
tt.finalChecks()
|
||||
}
|
||||
})
|
||||
}
|
||||
})
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user