storage/filestore: Add Trash and RestoreTrash to Blobs (#3529)

* storage/filestore: Add Trash and RestoreTrash to Blobs

* Change restore to be satellite-specific

* Fix comment

* Fix merge rename conflict
This commit is contained in:
Isaac Hess 2019-11-14 15:19:15 -07:00 committed by GitHub
parent ee6c1cac8a
commit 2166c2a21b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 334 additions and 35 deletions

View File

@ -83,6 +83,18 @@ func (slow *SlowBlobs) OpenWithStorageFormat(ctx context.Context, ref storage.Bl
return slow.blobs.OpenWithStorageFormat(ctx, ref, formatVer) return slow.blobs.OpenWithStorageFormat(ctx, ref, formatVer)
} }
// Trash deletes the blob with the namespace and key.
func (slow *SlowBlobs) Trash(ctx context.Context, ref storage.BlobRef) error {
slow.sleep()
return slow.blobs.Trash(ctx, ref)
}
// RestoreTrash restores all files in the trash
func (slow *SlowBlobs) RestoreTrash(ctx context.Context, namespace []byte) error {
slow.sleep()
return slow.blobs.RestoreTrash(ctx, namespace)
}
// Delete deletes the blob with the namespace and key. // Delete deletes the blob with the namespace and key.
func (slow *SlowBlobs) Delete(ctx context.Context, ref storage.BlobRef) error { func (slow *SlowBlobs) Delete(ctx context.Context, ref storage.BlobRef) error {
slow.sleep() slow.sleep()

View File

@ -78,6 +78,10 @@ type Blobs interface {
Delete(ctx context.Context, ref BlobRef) error Delete(ctx context.Context, ref BlobRef) error
// DeleteWithStorageFormat deletes a blob of a specific storage format // DeleteWithStorageFormat deletes a blob of a specific storage format
DeleteWithStorageFormat(ctx context.Context, ref BlobRef, formatVer FormatVersion) error DeleteWithStorageFormat(ctx context.Context, ref BlobRef, formatVer FormatVersion) error
// Trash marks a file for pending deletion
Trash(ctx context.Context, ref BlobRef) error
// RestoreTrash restores all files in the trash for a given namespace
RestoreTrash(ctx context.Context, namespace []byte) error
// Stat looks up disk metadata on the blob file // Stat looks up disk metadata on the blob file
Stat(ctx context.Context, ref BlobRef) (BlobInfo, error) Stat(ctx context.Context, ref BlobRef) (BlobInfo, error)
// StatWithStorageFormat looks up disk metadata for the blob file with the given storage format // StatWithStorageFormat looks up disk metadata for the blob file with the given storage format

View File

@ -48,16 +48,25 @@ func NewDir(path string) (*Dir, error) {
os.MkdirAll(dir.blobsdir(), dirPermission), os.MkdirAll(dir.blobsdir(), dirPermission),
os.MkdirAll(dir.tempdir(), dirPermission), os.MkdirAll(dir.tempdir(), dirPermission),
os.MkdirAll(dir.garbagedir(), dirPermission), os.MkdirAll(dir.garbagedir(), dirPermission),
os.MkdirAll(dir.trashdir(), dirPermission),
) )
} }
// Path returns the directory path // Path returns the directory path
func (dir *Dir) Path() string { return dir.path } func (dir *Dir) Path() string { return dir.path }
func (dir *Dir) blobsdir() string { return filepath.Join(dir.path, "blobs") } // blobsdir is the sub-directory containing the blobs
func (dir *Dir) tempdir() string { return filepath.Join(dir.path, "temp") } func (dir *Dir) blobsdir() string { return filepath.Join(dir.path, "blobs") }
// tempdir is used for temp files prior to being moved into blobsdir
func (dir *Dir) tempdir() string { return filepath.Join(dir.path, "temp") }
// garbagedir contains files that failed to delete but should be deleted
func (dir *Dir) garbagedir() string { return filepath.Join(dir.path, "garbage") } func (dir *Dir) garbagedir() string { return filepath.Join(dir.path, "garbage") }
// trashdir contains files staged for deletion for a period of time
func (dir *Dir) trashdir() string { return filepath.Join(dir.path, "trash") }
// CreateTemporaryFile creates a preallocated temporary file in the temp directory // CreateTemporaryFile creates a preallocated temporary file in the temp directory
// prealloc preallocates file to make writing faster // prealloc preallocates file to make writing faster
func (dir *Dir) CreateTemporaryFile(ctx context.Context, prealloc int64) (_ *os.File, err error) { func (dir *Dir) CreateTemporaryFile(ctx context.Context, prealloc int64) (_ *os.File, err error) {
@ -91,6 +100,11 @@ func (dir *Dir) DeleteTemporary(ctx context.Context, file *os.File) (err error)
// part of the filepath is constant, and blobPathForFormatVersion may need to be called multiple // part of the filepath is constant, and blobPathForFormatVersion may need to be called multiple
// times with different storage.FormatVersion values. // times with different storage.FormatVersion values.
func (dir *Dir) blobToBasePath(ref storage.BlobRef) (string, error) { func (dir *Dir) blobToBasePath(ref storage.BlobRef) (string, error) {
return dir.refToDirPath(ref, dir.blobsdir())
}
// refToDirPath converts a blob reference to a filepath in the specified sub-directory.
func (dir *Dir) refToDirPath(ref storage.BlobRef, subDir string) (string, error) {
if !ref.IsValid() { if !ref.IsValid() {
return "", storage.ErrInvalidBlobRef.New("") return "", storage.ErrInvalidBlobRef.New("")
} }
@ -101,7 +115,7 @@ func (dir *Dir) blobToBasePath(ref storage.BlobRef) (string, error) {
// ensure we always have enough characters to split [:2] and [2:] // ensure we always have enough characters to split [:2] and [2:]
key = "11" + key key = "11" + key
} }
return filepath.Join(dir.blobsdir(), namespace, key[:2], key[2:]), nil return filepath.Join(subDir, namespace, key[:2], key[2:]), nil
} }
// blobPathForFormatVersion adjusts a bare blob path (as might have been generated by a call to // blobPathForFormatVersion adjusts a bare blob path (as might have been generated by a call to
@ -116,10 +130,10 @@ func blobPathForFormatVersion(path string, formatVersion storage.FormatVersion)
return path + unknownPieceFileSuffix return path + unknownPieceFileSuffix
} }
// blobToTrashPath converts a blob reference to a filepath in transient storage. // blobToGarbagePath converts a blob reference to a filepath in transient
// The files in trash are deleted on an interval (in case the initial deletion didn't work for // storage. The files in garbage are deleted on an interval (in case the
// some reason). // initial deletion didn't work for some reason).
func (dir *Dir) blobToTrashPath(ref storage.BlobRef) string { func (dir *Dir) blobToGarbagePath(ref storage.BlobRef) string {
var name []byte var name []byte
name = append(name, ref.Namespace...) name = append(name, ref.Namespace...)
name = append(name, ref.Key...) name = append(name, ref.Key...)
@ -251,24 +265,112 @@ func (dir *Dir) StatWithStorageFormat(ctx context.Context, ref storage.BlobRef,
return nil, Error.New("unable to stat %q: %v", vPath, err) return nil, Error.New("unable to stat %q: %v", vPath, err)
} }
// Trash moves the piece specified by ref to the trashdir for every format version
func (dir *Dir) Trash(ctx context.Context, ref storage.BlobRef) (err error) {
defer mon.Task()(&ctx)(&err)
return dir.iterateStorageFormatVersions(ctx, ref, dir.TrashWithStorageFormat)
}
// TrashWithStorageFormat moves the piece specified by ref to the trashdir for the specified format version
func (dir *Dir) TrashWithStorageFormat(ctx context.Context, ref storage.BlobRef, formatVer storage.FormatVersion) (err error) {
// Ensure trashdir exists so that we know any os.IsNotExist errors below
// are not from a missing trash dir
_, err = os.Stat(dir.trashdir())
if err != nil {
return err
}
blobsBasePath, err := dir.blobToBasePath(ref)
if err != nil {
return err
}
blobsVerPath := blobPathForFormatVersion(blobsBasePath, formatVer)
trashBasePath, err := dir.refToDirPath(ref, dir.trashdir())
if err != nil {
return err
}
trashVerPath := blobPathForFormatVersion(trashBasePath, formatVer)
// ensure the dirs exist for trash path
err = os.MkdirAll(filepath.Dir(trashVerPath), dirPermission)
if err != nil && !os.IsExist(err) {
return err
}
// move to trash
err = rename(blobsVerPath, trashVerPath)
if os.IsNotExist(err) {
// no piece at that path; either it has a different storage format
// version or there was a concurrent call. (This function is expected
// by callers to return a nil error in the case of concurrent calls.)
return nil
}
return err
}
// RestoreTrash moves every piece in the trash folder back into blobsdir
func (dir *Dir) RestoreTrash(ctx context.Context, namespace []byte) (err error) {
return dir.walkNamespaceInPath(ctx, namespace, dir.trashdir(), func(info storage.BlobInfo) error {
blobsBasePath, err := dir.blobToBasePath(info.BlobRef())
if err != nil {
return err
}
blobsVerPath := blobPathForFormatVersion(blobsBasePath, info.StorageFormatVersion())
trashBasePath, err := dir.refToDirPath(info.BlobRef(), dir.trashdir())
if err != nil {
return err
}
trashVerPath := blobPathForFormatVersion(trashBasePath, info.StorageFormatVersion())
// ensure the dirs exist for blobs path
err = os.MkdirAll(filepath.Dir(blobsVerPath), dirPermission)
if err != nil && !os.IsExist(err) {
return err
}
// move back to blobsdir
err = rename(trashVerPath, blobsVerPath)
if os.IsNotExist(err) {
// no piece at that path; either it has a different storage format
// version or there was a concurrent call. (This function is expected
// by callers to return a nil error in the case of concurrent calls.)
return nil
}
return err
})
}
// iterateStorageFormatVersions executes f for all storage format versions,
// starting with the oldest format version. It is more likely, in the general
// case, that we will find the piece with the newest format version instead,
// but if we iterate backward here then we run the risk of a race condition:
// the piece might have existed with _SomeOldVer before the call, and could
// then have been updated atomically with _MaxVer concurrently while we were
// iterating. If we iterate _forwards_, this race should not occur because it
// is assumed that pieces are never rewritten with an _older_ storage format
// version.
//
// f will be executed for every storage formate version regardless of the
// result, and will aggregate errors into a single returned error
func (dir *Dir) iterateStorageFormatVersions(ctx context.Context, ref storage.BlobRef, f func(ctx context.Context, ref storage.BlobRef, i storage.FormatVersion) error) (err error) {
defer mon.Task()(&ctx)(&err)
var combinedErrors errs.Group
for i := MinFormatVersionSupported; i <= MaxFormatVersionSupported; i++ {
combinedErrors.Add(f(ctx, ref, i))
}
return combinedErrors.Err()
}
// Delete deletes blobs with the specified ref (in all supported storage formats). // Delete deletes blobs with the specified ref (in all supported storage formats).
func (dir *Dir) Delete(ctx context.Context, ref storage.BlobRef) (err error) { func (dir *Dir) Delete(ctx context.Context, ref storage.BlobRef) (err error) {
defer mon.Task()(&ctx)(&err) defer mon.Task()(&ctx)(&err)
return dir.iterateStorageFormatVersions(ctx, ref, dir.DeleteWithStorageFormat)
var combinedErrors errs.Group
// Try deleting all possible paths, starting with the oldest format version. It is more
// likely, in the general case, that we will find the piece with the newest format version
// instead, but if we iterate backward here then we run the risk of a race condition: the
// piece might have existed with _SomeOldVer before the Delete call, and could then have
// been updated atomically with _MaxVer concurrently while we were iterating. If we iterate
// _forwards_, this race should not occur because it is assumed that pieces are never
// rewritten with an _older_ storage format version.
for i := MinFormatVersionSupported; i <= MaxFormatVersionSupported; i++ {
combinedErrors.Add(dir.DeleteWithStorageFormat(ctx, ref, i))
}
return combinedErrors.Err()
} }
// DeleteWithStorageFormat deletes the blob with the specified ref for one specific format version // DeleteWithStorageFormat deletes the blob with the specified ref for one specific format version
@ -287,11 +389,11 @@ func (dir *Dir) DeleteWithStorageFormat(ctx context.Context, ref storage.BlobRef
return err return err
} }
trashPath := dir.blobToTrashPath(ref) garbagePath := dir.blobToGarbagePath(ref)
verPath := blobPathForFormatVersion(pathBase, formatVer) verPath := blobPathForFormatVersion(pathBase, formatVer)
// move to trash folder, this is allowed for some OS-es // move to garbage folder, this is allowed for some OS-es
moveErr := rename(verPath, trashPath) moveErr := rename(verPath, garbagePath)
if os.IsNotExist(moveErr) { if os.IsNotExist(moveErr) {
// no piece at that path; either it has a different storage format // no piece at that path; either it has a different storage format
// version or there was a concurrent delete. (this function is expected // version or there was a concurrent delete. (this function is expected
@ -299,13 +401,13 @@ func (dir *Dir) DeleteWithStorageFormat(ctx context.Context, ref storage.BlobRef
return nil return nil
} }
if moveErr != nil { if moveErr != nil {
// piece could not be moved into the trash dir; we'll try removing it // piece could not be moved into the garbage dir; we'll try removing it
// directly // directly
trashPath = verPath garbagePath = verPath
} }
// try removing the file // try removing the file
err = os.Remove(trashPath) err = os.Remove(garbagePath)
// ignore concurrent deletes // ignore concurrent deletes
if os.IsNotExist(err) { if os.IsNotExist(err) {
@ -318,7 +420,7 @@ func (dir *Dir) DeleteWithStorageFormat(ctx context.Context, ref storage.BlobRef
// queue to be retried later. // queue to be retried later.
if err != nil { if err != nil {
dir.mu.Lock() dir.mu.Lock()
dir.deleteQueue = append(dir.deleteQueue, trashPath) dir.deleteQueue = append(dir.deleteQueue, garbagePath)
dir.mu.Unlock() dir.mu.Unlock()
mon.Event("delete_deferred_to_queue") mon.Event("delete_deferred_to_queue")
} }
@ -357,7 +459,7 @@ func (dir *Dir) GarbageCollect(ctx context.Context) (err error) {
dir.mu.Unlock() dir.mu.Unlock()
} }
// remove anything left in the trashdir // remove anything left in the garbagedir
_ = removeAllContent(ctx, dir.garbagedir()) _ = removeAllContent(ctx, dir.garbagedir())
return nil return nil
} }
@ -368,8 +470,12 @@ const nameBatchSize = 1024
// guaranteed to contain any blobs. // guaranteed to contain any blobs.
func (dir *Dir) ListNamespaces(ctx context.Context) (ids [][]byte, err error) { func (dir *Dir) ListNamespaces(ctx context.Context) (ids [][]byte, err error) {
defer mon.Task()(&ctx)(&err) defer mon.Task()(&ctx)(&err)
topBlobDir := dir.blobsdir() return dir.listNamespacesInPath(ctx, dir.blobsdir())
openDir, err := os.Open(topBlobDir) }
func (dir *Dir) listNamespacesInPath(ctx context.Context, path string) (ids [][]byte, err error) {
defer mon.Task()(&ctx)(&err)
openDir, err := os.Open(path)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -399,8 +505,14 @@ func (dir *Dir) ListNamespaces(ctx context.Context) (ids [][]byte, err error) {
// iterating and return the error immediately. The ctx parameter is intended specifically to allow // iterating and return the error immediately. The ctx parameter is intended specifically to allow
// canceling iteration early. // canceling iteration early.
func (dir *Dir) WalkNamespace(ctx context.Context, namespace []byte, walkFunc func(storage.BlobInfo) error) (err error) { func (dir *Dir) WalkNamespace(ctx context.Context, namespace []byte, walkFunc func(storage.BlobInfo) error) (err error) {
defer mon.Task()(&ctx)(&err)
return dir.walkNamespaceInPath(ctx, namespace, dir.blobsdir(), walkFunc)
}
func (dir *Dir) walkNamespaceInPath(ctx context.Context, namespace []byte, path string, walkFunc func(storage.BlobInfo) error) (err error) {
defer mon.Task()(&ctx)(&err)
namespaceDir := pathEncoding.EncodeToString(namespace) namespaceDir := pathEncoding.EncodeToString(namespace)
nsDir := filepath.Join(dir.blobsdir(), namespaceDir) nsDir := filepath.Join(path, namespaceDir)
openDir, err := os.Open(nsDir) openDir, err := os.Open(nsDir)
if err != nil { if err != nil {
if os.IsNotExist(err) { if os.IsNotExist(err) {
@ -431,7 +543,7 @@ func (dir *Dir) WalkNamespace(ctx context.Context, namespace []byte, walkFunc fu
// don't need to pass on this error // don't need to pass on this error
continue continue
} }
err := dir.walkNamespaceWithPrefix(ctx, namespace, nsDir, keyPrefix, walkFunc) err := walkNamespaceWithPrefix(ctx, namespace, nsDir, keyPrefix, walkFunc)
if err != nil { if err != nil {
return err return err
} }
@ -458,7 +570,7 @@ func decodeBlobInfo(namespace []byte, keyPrefix, keyDir string, keyInfo os.FileI
return newBlobInfo(ref, filepath.Join(keyDir, blobFileName), keyInfo, formatVer), true return newBlobInfo(ref, filepath.Join(keyDir, blobFileName), keyInfo, formatVer), true
} }
func (dir *Dir) walkNamespaceWithPrefix(ctx context.Context, namespace []byte, nsDir, keyPrefix string, walkFunc func(storage.BlobInfo) error) (err error) { func walkNamespaceWithPrefix(ctx context.Context, namespace []byte, nsDir, keyPrefix string, walkFunc func(storage.BlobInfo) error) (err error) {
keyDir := filepath.Join(nsDir, keyPrefix) keyDir := filepath.Join(nsDir, keyPrefix)
openDir, err := os.Open(keyDir) openDir, err := os.Open(keyDir)
if err != nil { if err != nil {

View File

@ -101,6 +101,20 @@ func (store *blobStore) DeleteWithStorageFormat(ctx context.Context, ref storage
return Error.Wrap(err) return Error.Wrap(err)
} }
// Trash moves the ref to a trash directory
func (store *blobStore) Trash(ctx context.Context, ref storage.BlobRef) (err error) {
defer mon.Task()(&ctx)(&err)
err = store.dir.Trash(ctx, ref)
return Error.Wrap(err)
}
// RestoreTrash moves every piece in the trash back into the regular location
func (store *blobStore) RestoreTrash(ctx context.Context, namespace []byte) (err error) {
defer mon.Task()(&ctx)(&err)
err = store.dir.RestoreTrash(ctx, namespace)
return Error.Wrap(err)
}
// GarbageCollect tries to delete any files that haven't yet been deleted // GarbageCollect tries to delete any files that haven't yet been deleted
func (store *blobStore) GarbageCollect(ctx context.Context) (err error) { func (store *blobStore) GarbageCollect(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err) defer mon.Task()(&ctx)(&err)

View File

@ -529,3 +529,160 @@ func TestStoreTraversals(t *testing.T) {
assert.Equal(t, err, expectedErr) assert.Equal(t, err, expectedErr)
assert.Equal(t, 2, iterations) assert.Equal(t, 2, iterations)
} }
func TestTrashAndRestore(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
store, err := filestore.NewAt(zaptest.NewLogger(t), ctx.Dir("store"))
require.NoError(t, err)
ctx.Check(store.Close)
size := memory.KB
type testfile struct {
data []byte
formatVer storage.FormatVersion
}
type testref struct {
key []byte
files []testfile
}
type testnamespace struct {
namespace []byte
refs []testref
}
namespaces := []testnamespace{
{
namespace: testrand.Bytes(namespaceSize),
refs: []testref{
{
// Has v0 and v1
key: testrand.Bytes(keySize),
files: []testfile{
{
data: testrand.Bytes(size),
formatVer: filestore.FormatV0,
},
{
data: testrand.Bytes(size),
formatVer: filestore.FormatV1,
},
},
},
{
// Has v0 only
key: testrand.Bytes(keySize),
files: []testfile{
{
data: testrand.Bytes(size),
formatVer: filestore.FormatV0,
},
},
},
{
// Has v1 only
key: testrand.Bytes(keySize),
files: []testfile{
{
data: testrand.Bytes(size),
formatVer: filestore.FormatV0,
},
},
},
},
},
{
namespace: testrand.Bytes(namespaceSize),
refs: []testref{
{
// Has v1 only
key: testrand.Bytes(keySize),
files: []testfile{
{
data: testrand.Bytes(size),
formatVer: filestore.FormatV0,
},
},
},
},
},
}
for _, namespace := range namespaces {
for _, ref := range namespace.refs {
blobref := storage.BlobRef{
Namespace: namespace.namespace,
Key: ref.key,
}
for _, file := range ref.files {
var w storage.BlobWriter
if file.formatVer == filestore.FormatV0 {
fStore, ok := store.(interface {
TestCreateV0(ctx context.Context, ref storage.BlobRef) (_ storage.BlobWriter, err error)
})
require.Truef(t, ok, "can't make TestCreateV0 with this blob store (%T)", store)
w, err = fStore.TestCreateV0(ctx, blobref)
} else if file.formatVer == filestore.FormatV1 {
w, err = store.Create(ctx, blobref, int64(size))
}
require.NoError(t, err)
require.NotNil(t, w)
_, err = w.Write(file.data)
require.NoError(t, err)
require.NoError(t, w.Commit(ctx))
requireFileMatches(ctx, t, store, file.data, blobref, file.formatVer)
}
// Trash the ref
require.NoError(t, store.Trash(ctx, blobref))
// Verify files are gone
for _, file := range ref.files {
_, err = store.OpenWithStorageFormat(ctx, blobref, file.formatVer)
require.Error(t, err)
require.True(t, os.IsNotExist(err))
}
}
}
// Restore the first namespace
require.NoError(t, store.RestoreTrash(ctx, namespaces[0].namespace))
// Verify pieces are back and look good for first namespace
for _, ref := range namespaces[0].refs {
blobref := storage.BlobRef{
Namespace: namespaces[0].namespace,
Key: ref.key,
}
for _, file := range ref.files {
requireFileMatches(ctx, t, store, file.data, blobref, file.formatVer)
}
}
// Verify pieces in second namespace are still missing (were not restored)
for _, ref := range namespaces[1].refs {
blobref := storage.BlobRef{
Namespace: namespaces[1].namespace,
Key: ref.key,
}
for _, file := range ref.files {
r, err := store.OpenWithStorageFormat(ctx, blobref, file.formatVer)
require.Error(t, err)
require.Nil(t, r)
}
}
}
func requireFileMatches(ctx context.Context, t *testing.T, store storage.Blobs, data []byte, ref storage.BlobRef, formatVer storage.FormatVersion) {
r, err := store.OpenWithStorageFormat(ctx, ref, formatVer)
require.NoError(t, err)
buf, err := ioutil.ReadAll(r)
require.NoError(t, err)
require.Equal(t, buf, data)
}