storagenode/pieces: Add migration from v0 piece to v1 piece (#3401)

This commit is contained in:
Isaac Hess 2019-11-04 09:59:45 -07:00 committed by Stefan Benten
parent 0c2e498f09
commit 4d26d0a6a6
6 changed files with 254 additions and 48 deletions

View File

@ -84,6 +84,12 @@ func (slow *SlowBlobs) Delete(ctx context.Context, ref storage.BlobRef) error {
return slow.blobs.Delete(ctx, ref)
}
// DeleteWithStorageFormat removes the blob identified by ref that is stored
// with the given storage format version, after first injecting the configured
// artificial delay via sleep.
func (slow *SlowBlobs) DeleteWithStorageFormat(ctx context.Context, ref storage.BlobRef, formatVer storage.FormatVersion) error {
	slow.sleep()
	err := slow.blobs.DeleteWithStorageFormat(ctx, ref, formatVer)
	return err
}
// Stat looks up disk metadata on the blob file
func (slow *SlowBlobs) Stat(ctx context.Context, ref storage.BlobRef) (storage.BlobInfo, error) {
slow.sleep()

View File

@ -76,6 +76,8 @@ type Blobs interface {
OpenWithStorageFormat(ctx context.Context, ref BlobRef, formatVer FormatVersion) (BlobReader, error)
// Delete deletes the blob with the namespace and key
Delete(ctx context.Context, ref BlobRef) error
// DeleteWithStorageFormat deletes a blob of a specific storage format
DeleteWithStorageFormat(ctx context.Context, ref BlobRef, formatVer FormatVersion) error
// Stat looks up disk metadata on the blob file
Stat(ctx context.Context, ref BlobRef) (BlobInfo, error)
// StatWithStorageFormat looks up disk metadata for the blob file with the given storage format

View File

@ -254,16 +254,8 @@ func (dir *Dir) StatWithStorageFormat(ctx context.Context, ref storage.BlobRef,
// Delete deletes blobs with the specified ref (in all supported storage formats).
func (dir *Dir) Delete(ctx context.Context, ref storage.BlobRef) (err error) {
defer mon.Task()(&ctx)(&err)
pathBase, err := dir.blobToBasePath(ref)
if err != nil {
return err
}
trashPath := dir.blobToTrashPath(ref)
var (
moveErr error
combinedErrors errs.Group
)
var combinedErrors errs.Group
// Try deleting all possible paths, starting with the oldest format version. It is more
// likely, in the general case, that we will find the piece with the newest format version
@ -273,50 +265,70 @@ func (dir *Dir) Delete(ctx context.Context, ref storage.BlobRef) (err error) {
// _forwards_, this race should not occur because it is assumed that pieces are never
// rewritten with an _older_ storage format version.
for i := MinFormatVersionSupported; i <= MaxFormatVersionSupported; i++ {
verPath := blobPathForFormatVersion(pathBase, i)
// move to trash folder, this is allowed for some OS-es
moveErr = rename(verPath, trashPath)
if os.IsNotExist(moveErr) {
// no piece at that path; either it has a different storage format version or there
// was a concurrent delete. (this function is expected by callers to return a nil
// error in the case of concurrent deletes.)
continue
}
if moveErr != nil {
// piece could not be moved into the trash dir; we'll try removing it directly
trashPath = verPath
}
// try removing the file
err = os.Remove(trashPath)
// ignore concurrent deletes
if os.IsNotExist(err) {
// something is happening at the same time as this; possibly a concurrent delete,
// or possibly a rewrite of the blob. keep checking for more versions.
continue
}
// the remove may have failed because of an open file handle. put it in a queue to be
// retried later.
if err != nil {
dir.mu.Lock()
dir.deleteQueue = append(dir.deleteQueue, trashPath)
dir.mu.Unlock()
}
// ignore is-busy errors, they are still in the queue
// but no need to notify
if isBusy(err) {
err = nil
}
combinedErrors.Add(err)
combinedErrors.Add(dir.DeleteWithStorageFormat(ctx, ref, i))
}
return combinedErrors.Err()
}
// DeleteWithStorageFormat deletes the blob with the specified ref for one specific format version
//
// Deletion is two-step: first try to rename the blob into the trash path
// (allowed even with open handles on some OSes), then remove that file. If
// removal fails (e.g. open file handle), the path is queued in
// dir.deleteQueue to be retried later by GarbageCollect. Callers expect a nil
// error when the blob is already gone (concurrent delete), so os.IsNotExist
// is swallowed at both steps.
func (dir *Dir) DeleteWithStorageFormat(ctx context.Context, ref storage.BlobRef, formatVer storage.FormatVersion) (err error) {
	defer mon.Task()(&ctx)(&err)

	// Ensure garbage dir exists so that we know any os.IsNotExist errors below
	// are not from a missing garbage dir
	// NOTE(review): presumably the trash path lives under garbagedir(); confirm
	// against blobToTrashPath.
	_, err = os.Stat(dir.garbagedir())
	if err != nil {
		return err
	}

	pathBase, err := dir.blobToBasePath(ref)
	if err != nil {
		return err
	}

	trashPath := dir.blobToTrashPath(ref)
	verPath := blobPathForFormatVersion(pathBase, formatVer)

	// move to trash folder, this is allowed for some OS-es
	moveErr := rename(verPath, trashPath)
	if os.IsNotExist(moveErr) {
		// no piece at that path; either it has a different storage format
		// version or there was a concurrent delete. (this function is expected
		// by callers to return a nil error in the case of concurrent deletes.)
		return nil
	}
	if moveErr != nil {
		// piece could not be moved into the trash dir; we'll try removing it
		// directly
		trashPath = verPath
	}

	// try removing the file
	err = os.Remove(trashPath)

	// ignore concurrent deletes
	if os.IsNotExist(err) {
		// something is happening at the same time as this; possibly a
		// concurrent delete, or possibly a rewrite of the blob.
		return nil
	}

	// the remove may have failed because of an open file handle. put it in a
	// queue to be retried later.
	if err != nil {
		dir.mu.Lock()
		dir.deleteQueue = append(dir.deleteQueue, trashPath)
		dir.mu.Unlock()
	}

	// ignore is-busy errors, they are still in the queue but no need to notify
	if isBusy(err) {
		err = nil
	}
	return err
}
// GarbageCollect collects files that are pending deletion.
func (dir *Dir) GarbageCollect(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)

View File

@ -94,6 +94,13 @@ func (store *Store) Delete(ctx context.Context, ref storage.BlobRef) (err error)
return Error.Wrap(err)
}
// DeleteWithStorageFormat deletes blobs with the specified ref and storage format version
// by delegating to the underlying directory, wrapping any failure in the
// package error class.
func (store *Store) DeleteWithStorageFormat(ctx context.Context, ref storage.BlobRef, formatVer storage.FormatVersion) (err error) {
	defer mon.Task()(&ctx)(&err)
	if deleteErr := store.dir.DeleteWithStorageFormat(ctx, ref, formatVer); deleteErr != nil {
		return Error.Wrap(deleteErr)
	}
	return nil
}
// GarbageCollect tries to delete any files that haven't yet been deleted
func (store *Store) GarbageCollect(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)

View File

@ -5,6 +5,7 @@ package pieces
import (
"context"
"io"
"os"
"time"
@ -270,6 +271,64 @@ func (store *Store) Delete(ctx context.Context, satellite storj.NodeID, pieceID
if store.v0PieceInfo != nil {
err = errs.Combine(err, store.v0PieceInfo.Delete(ctx, satellite, pieceID))
}
return Error.Wrap(err)
}
// MigrateV0ToV1 will migrate a piece stored with storage format v0 to storage
// format v1. If the piece is not stored as a v0 piece it will return an error.
// The following failures are possible:
//   - Fail to open or read v0 piece. In this case no artifacts remain.
//   - Fail to Write or Commit v1 piece. In this case no artifacts remain.
//   - Fail to Delete v0 piece. In this case v0 piece may remain, but v1 piece
//     will exist and be preferred in future calls.
func (store *Store) MigrateV0ToV1(ctx context.Context, satelliteID storj.NodeID, pieceID storj.PieceID) (err error) {
	defer mon.Task()(&ctx)(&err)

	// The original only nil-checked v0PieceInfo at the delete step, after the
	// unguarded Get below had already dereferenced it. Guard up front instead:
	// without the v0 info db there is nothing to migrate from.
	if store.v0PieceInfo == nil {
		return Error.New("v0 piece info database not available")
	}

	info, err := store.v0PieceInfo.Get(ctx, satelliteID, pieceID)
	if err != nil {
		return Error.Wrap(err)
	}

	// Copy the v0 piece contents into a new v1 piece, attaching the header
	// reconstructed from the v0 piece info row. Scoped in a closure so the
	// reader Close error is folded into the result via the named return.
	err = func() (err error) {
		r, err := store.Reader(ctx, satelliteID, pieceID)
		if err != nil {
			return err
		}
		defer func() { err = errs.Combine(err, r.Close()) }()

		w, err := store.Writer(ctx, satelliteID, pieceID)
		if err != nil {
			return err
		}
		_, err = io.Copy(w, r)
		if err != nil {
			// abandon the partially written v1 piece so no artifacts remain
			return errs.Combine(err, w.Cancel(ctx))
		}
		header := &pb.PieceHeader{
			Hash:         w.Hash(),
			CreationTime: info.PieceCreation,
			Signature:    info.UplinkPieceHash.GetSignature(),
			OrderLimit:   *info.OrderLimit,
		}
		return w.Commit(ctx, header)
	}()
	if err != nil {
		return Error.Wrap(err)
	}

	// v1 piece is committed; remove the v0 blob and its info row. Errors here
	// leave a stale v0 copy behind, but the v1 piece is preferred from now on.
	err = store.blobs.DeleteWithStorageFormat(ctx, storage.BlobRef{
		Namespace: satelliteID.Bytes(),
		Key:       pieceID.Bytes(),
	}, filestore.FormatV0)
	err = errs.Combine(err, store.v0PieceInfo.Delete(ctx, satelliteID, pieceID))
	return Error.Wrap(err)
}

View File

@ -6,6 +6,7 @@ package pieces_test
import (
"bytes"
"context"
"encoding/hex"
"io"
"io/ioutil"
"os"
@ -22,6 +23,7 @@ import (
"storj.io/storj/internal/testrand"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/pkcrypto"
"storj.io/storj/pkg/signing"
"storj.io/storj/pkg/storj"
"storj.io/storj/storage"
"storj.io/storj/storage/filestore"
@ -181,6 +183,124 @@ func tryOpeningAPiece(ctx context.Context, t testing.TB, store *pieces.Store, sa
require.NoError(t, reader.Close())
}
// TestPieceVersionMigrate writes a piece in storage format v0 (with its info
// row in the v0 piece info db), migrates it to v1, and verifies the v1 piece
// is readable, carries a still-valid uplink signature and the original order
// limit, and that the v0 info row was removed.
//
// Fixes vs. original: the manually opened v1 reader was never closed (leaked
// file handle); also corrects the "pice"/"retrivedInfo" typos.
func TestPieceVersionMigrate(t *testing.T) {
	storagenodedbtest.Run(t, func(t *testing.T, db storagenode.DB) {
		const pieceSize = 1024

		ctx := testcontext.New(t)
		defer ctx.Cleanup()

		var (
			data        = testrand.Bytes(pieceSize)
			satelliteID = testrand.NodeID()
			pieceID     = testrand.PieceID()
			now         = time.Now().UTC()
		)

		// Initialize pub/priv keys for signing piece hash
		publicKeyBytes, err := hex.DecodeString("01eaebcb418cd629d4c01f365f33006c9de3ce70cf04da76c39cdc993f48fe53")
		require.NoError(t, err)
		privateKeyBytes, err := hex.DecodeString("afefcccadb3d17b1f241b7c83f88c088b54c01b5a25409c13cbeca6bfa22b06901eaebcb418cd629d4c01f365f33006c9de3ce70cf04da76c39cdc993f48fe53")
		require.NoError(t, err)
		publicKey, err := storj.PiecePublicKeyFromBytes(publicKeyBytes)
		require.NoError(t, err)
		privateKey, err := storj.PiecePrivateKeyFromBytes(privateKeyBytes)
		require.NoError(t, err)

		ol := &pb.OrderLimit{
			SerialNumber:       testrand.SerialNumber(),
			SatelliteId:        satelliteID,
			StorageNodeId:      testrand.NodeID(),
			PieceId:            pieceID,
			SatelliteSignature: []byte("sig"),
			Limit:              100,
			Action:             pb.PieceAction_GET,
			PieceExpiration:    now,
			OrderExpiration:    now,
			OrderCreation:      now,
		}
		olPieceInfo := *ol

		v0PieceInfo, ok := db.V0PieceInfo().(pieces.V0PieceInfoDBForTest)
		require.True(t, ok, "V0PieceInfoDB can not satisfy V0PieceInfoDBForTest")

		blobs, err := filestore.NewAt(zaptest.NewLogger(t), ctx.Dir("store"))
		require.NoError(t, err)
		defer ctx.Check(blobs.Close)

		store := pieces.NewStore(zaptest.NewLogger(t), blobs, v0PieceInfo, nil, nil)

		// write as a v0 piece
		tStore := &pieces.StoreForTest{store}
		writer, err := tStore.WriterForFormatVersion(ctx, satelliteID, pieceID, filestore.FormatV0)
		require.NoError(t, err)
		_, err = writer.Write(data)
		require.NoError(t, err)
		assert.Equal(t, int64(len(data)), writer.Size())
		err = writer.Commit(ctx, &pb.PieceHeader{
			Hash:         writer.Hash(),
			CreationTime: now,
			OrderLimit:   olPieceInfo,
		})
		require.NoError(t, err)

		// Create PieceHash from the v0 piece written
		ph := &pb.PieceHash{
			PieceId:   pieceID,
			Hash:      writer.Hash(),
			PieceSize: writer.Size(),
			Timestamp: now,
		}

		// sign v0 piece hash
		signedPhPieceInfo, err := signing.SignUplinkPieceHash(ctx, privateKey, ph)
		require.NoError(t, err)

		// Create v0 pieces.Info and add to v0 store
		pieceInfo := pieces.Info{
			SatelliteID:     satelliteID,
			PieceID:         pieceID,
			PieceSize:       writer.Size(),
			OrderLimit:      &olPieceInfo,
			PieceCreation:   now,
			UplinkPieceHash: signedPhPieceInfo,
		}
		require.NoError(t, v0PieceInfo.Add(ctx, &pieceInfo))

		// verify piece can be opened as v0
		tryOpeningAPiece(ctx, t, store, satelliteID, pieceID, len(data), now, filestore.FormatV0)

		// run migration
		require.NoError(t, store.MigrateV0ToV1(ctx, satelliteID, pieceID))

		// open as v1 piece
		tryOpeningAPiece(ctx, t, store, satelliteID, pieceID, len(data), now, filestore.FormatV1)

		// manually read v1 piece
		reader, err := store.ReaderWithStorageFormat(ctx, satelliteID, pieceID, filestore.FormatV1)
		require.NoError(t, err)
		// close the reader when done with it (was leaked in the original)
		defer ctx.Check(reader.Close)

		// generate v1 pieceHash and verify signature is still valid
		v1Header, err := reader.GetPieceHeader()
		require.NoError(t, err)
		v1PieceHash := pb.PieceHash{
			PieceId:   v1Header.OrderLimit.PieceId,
			Hash:      v1Header.GetHash(),
			PieceSize: reader.Size(),
			Timestamp: v1Header.GetCreationTime(),
			Signature: v1Header.GetSignature(),
		}
		require.NoError(t, signing.VerifyUplinkPieceHashSignature(ctx, publicKey, &v1PieceHash))
		require.Equal(t, signedPhPieceInfo.GetSignature(), v1PieceHash.GetSignature())
		require.Equal(t, *ol, v1Header.OrderLimit)

		// Verify that it was deleted from v0PieceInfo
		retrievedInfo, err := v0PieceInfo.Get(ctx, satelliteID, pieceID)
		require.Error(t, err)
		require.Nil(t, retrievedInfo)
	})
}
// Test that the piece store can still read V0 pieces that might be left over from a previous
// version, as well as V1 pieces.
func TestMultipleStorageFormatVersions(t *testing.T) {