storj/storage/filestore/store_test.go
Cameron Ayer 586e6f2f13 private/testblobs, storage, storage/filestore: add storage dir verification to filestore
Sometimes SNOs fail to properly configure or lose connection to their storage directory
which can result in DQ. This causes unnecessary repair and is unfortunate for all parties.

This change introduces the creation of a special file in the storage directory at runtime
containing the node ID. While the storage node runs, it periodically verifies that it can
find said file with the correct contents in the correct location. If not, the node will
shut down with an error message.

This change will solve the issue of nodes losing access to the storage directory, but it will not
solve the issue of nodes pointing to the wrong directory, as the identifying file is created each
time the node starts up. After this change has been the minimum version for a few releases, we will
remove the creation of the directory-identifying file from the storage node run command and add it
to the setup command.

Change-Id: Ib7b10e96ac07373219835e39239e93957e7667a4
2020-08-19 17:18:14 +00:00

921 lines
25 KiB
Go

// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package filestore_test
import (
"bytes"
"context"
"io"
"io/ioutil"
"math/rand"
"os"
"path/filepath"
"sort"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/zeebo/errs"
"go.uber.org/zap/zaptest"
"storj.io/common/identity/testidentity"
"storj.io/common/memory"
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/storj/storage"
"storj.io/storj/storage/filestore"
)
const (
namespaceSize = 32
keySize = 32
)
func TestStoreLoad(t *testing.T) {
const blobSize = 8 << 10
const repeatCount = 16
ctx := testcontext.New(t)
defer ctx.Cleanup()
store, err := filestore.NewAt(zaptest.NewLogger(t), ctx.Dir("store"), filestore.DefaultConfig)
require.NoError(t, err)
ctx.Check(store.Close)
data := testrand.Bytes(blobSize)
temp := make([]byte, len(data))
refs := []storage.BlobRef{}
namespace := testrand.Bytes(32)
// store without size
for i := 0; i < repeatCount; i++ {
ref := storage.BlobRef{
Namespace: namespace,
Key: testrand.Bytes(32),
}
refs = append(refs, ref)
writer, err := store.Create(ctx, ref, -1)
require.NoError(t, err)
n, err := writer.Write(data)
require.NoError(t, err)
require.Equal(t, n, len(data))
require.NoError(t, writer.Commit(ctx))
// after committing we should be able to call cancel without an error
require.NoError(t, writer.Cancel(ctx))
// two commits should fail
require.Error(t, writer.Commit(ctx))
}
namespace = testrand.Bytes(32)
// store with size
for i := 0; i < repeatCount; i++ {
ref := storage.BlobRef{
Namespace: namespace,
Key: testrand.Bytes(32),
}
refs = append(refs, ref)
writer, err := store.Create(ctx, ref, int64(len(data)))
require.NoError(t, err)
n, err := writer.Write(data)
require.NoError(t, err)
require.Equal(t, n, len(data))
require.NoError(t, writer.Commit(ctx))
}
namespace = testrand.Bytes(32)
// store with larger size
{
ref := storage.BlobRef{
Namespace: namespace,
Key: testrand.Bytes(32),
}
refs = append(refs, ref)
writer, err := store.Create(ctx, ref, int64(len(data)*2))
require.NoError(t, err)
n, err := writer.Write(data)
require.NoError(t, err)
require.Equal(t, n, len(data))
require.NoError(t, writer.Commit(ctx))
}
namespace = testrand.Bytes(32)
// store with error
{
ref := storage.BlobRef{
Namespace: namespace,
Key: testrand.Bytes(32),
}
writer, err := store.Create(ctx, ref, -1)
require.NoError(t, err)
n, err := writer.Write(data)
require.NoError(t, err)
require.Equal(t, n, len(data))
require.NoError(t, writer.Cancel(ctx))
// commit after cancel should return an error
require.Error(t, writer.Commit(ctx))
_, err = store.Open(ctx, ref)
require.Error(t, err)
}
// try reading all the blobs
for _, ref := range refs {
reader, err := store.Open(ctx, ref)
require.NoError(t, err)
size, err := reader.Size()
require.NoError(t, err)
require.Equal(t, size, int64(len(data)))
_, err = io.ReadFull(reader, temp)
require.NoError(t, err)
require.NoError(t, reader.Close())
require.Equal(t, data, temp)
}
// delete the blobs
for _, ref := range refs {
err := store.Delete(ctx, ref)
require.NoError(t, err)
}
// try reading all the blobs
for _, ref := range refs {
_, err := store.Open(ctx, ref)
require.Error(t, err)
}
}
func TestDeleteWhileReading(t *testing.T) {
const blobSize = 8 << 10
ctx := testcontext.New(t)
defer ctx.Cleanup()
store, err := filestore.NewAt(zaptest.NewLogger(t), ctx.Dir("store"), filestore.DefaultConfig)
require.NoError(t, err)
ctx.Check(store.Close)
data := testrand.Bytes(blobSize)
ref := storage.BlobRef{
Namespace: []byte{0},
Key: []byte{1},
}
writer, err := store.Create(ctx, ref, -1)
require.NoError(t, err)
_, err = writer.Write(data)
require.NoError(t, err)
// loading uncommitted file should fail
_, err = store.Open(ctx, ref)
require.Error(t, err, "loading uncommitted file should fail")
// commit the file
err = writer.Commit(ctx)
require.NoError(t, err, "commit the file")
// open a reader
reader, err := store.Open(ctx, ref)
require.NoError(t, err, "open a reader")
// double close, just in case
defer func() { _ = reader.Close() }()
// delete while reading
err = store.Delete(ctx, ref)
require.NoError(t, err, "delete while reading")
// opening deleted file should fail
_, err = store.Open(ctx, ref)
require.Error(t, err, "opening deleted file should fail")
// read all content
result, err := ioutil.ReadAll(reader)
require.NoError(t, err, "read all content")
// finally close reader
err = reader.Close()
require.NoError(t, err)
// should be able to read the full content
require.Equal(t, data, result)
// collect trash
gStore := store.(interface {
GarbageCollect(ctx context.Context) error
})
_ = gStore.GarbageCollect(ctx)
// flaky test, for checking whether files have been actually deleted from disk
err = filepath.Walk(ctx.Dir("store"), func(path string, info os.FileInfo, _ error) error {
if info.IsDir() {
return nil
}
return errs.New("found file %q", path)
})
if err != nil {
t.Fatal(err)
}
}
func writeABlob(ctx context.Context, t testing.TB, store storage.Blobs, blobRef storage.BlobRef, data []byte, formatVersion storage.FormatVersion) {
var (
blobWriter storage.BlobWriter
err error
)
switch formatVersion {
case filestore.FormatV0:
fStore, ok := store.(interface {
TestCreateV0(ctx context.Context, ref storage.BlobRef) (_ storage.BlobWriter, err error)
})
require.Truef(t, ok, "can't make a WriterForFormatVersion with this blob store (%T)", store)
blobWriter, err = fStore.TestCreateV0(ctx, blobRef)
case filestore.FormatV1:
blobWriter, err = store.Create(ctx, blobRef, int64(len(data)))
default:
t.Fatalf("please teach me how to make a V%d blob", formatVersion)
}
require.NoError(t, err)
require.Equal(t, formatVersion, blobWriter.StorageFormatVersion())
_, err = blobWriter.Write(data)
require.NoError(t, err)
size, err := blobWriter.Size()
require.NoError(t, err)
assert.Equal(t, int64(len(data)), size)
err = blobWriter.Commit(ctx)
require.NoError(t, err)
}
func verifyBlobHandle(t testing.TB, reader storage.BlobReader, expectDataLen int, expectFormat storage.FormatVersion) {
assert.Equal(t, expectFormat, reader.StorageFormatVersion())
size, err := reader.Size()
require.NoError(t, err)
assert.Equal(t, int64(expectDataLen), size)
}
func verifyBlobInfo(ctx context.Context, t testing.TB, blobInfo storage.BlobInfo, expectDataLen int, expectFormat storage.FormatVersion) {
assert.Equal(t, expectFormat, blobInfo.StorageFormatVersion())
stat, err := blobInfo.Stat(ctx)
require.NoError(t, err)
assert.Equal(t, int64(expectDataLen), stat.Size())
}
func tryOpeningABlob(ctx context.Context, t testing.TB, store storage.Blobs, blobRef storage.BlobRef, expectDataLen int, expectFormat storage.FormatVersion) {
reader, err := store.Open(ctx, blobRef)
require.NoError(t, err)
verifyBlobHandle(t, reader, expectDataLen, expectFormat)
require.NoError(t, reader.Close())
blobInfo, err := store.Stat(ctx, blobRef)
require.NoError(t, err)
verifyBlobInfo(ctx, t, blobInfo, expectDataLen, expectFormat)
blobInfo, err = store.StatWithStorageFormat(ctx, blobRef, expectFormat)
require.NoError(t, err)
verifyBlobInfo(ctx, t, blobInfo, expectDataLen, expectFormat)
reader, err = store.OpenWithStorageFormat(ctx, blobInfo.BlobRef(), blobInfo.StorageFormatVersion())
require.NoError(t, err)
verifyBlobHandle(t, reader, expectDataLen, expectFormat)
require.NoError(t, reader.Close())
}
func TestMultipleStorageFormatVersions(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
store, err := filestore.NewAt(zaptest.NewLogger(t), ctx.Dir("store"), filestore.DefaultConfig)
require.NoError(t, err)
ctx.Check(store.Close)
const blobSize = 1024
var (
data = testrand.Bytes(blobSize)
namespace = testrand.Bytes(namespaceSize)
v0BlobKey = testrand.Bytes(keySize)
v1BlobKey = testrand.Bytes(keySize)
v0Ref = storage.BlobRef{Namespace: namespace, Key: v0BlobKey}
v1Ref = storage.BlobRef{Namespace: namespace, Key: v1BlobKey}
)
// write a V0 blob
writeABlob(ctx, t, store, v0Ref, data, filestore.FormatV0)
// write a V1 blob
writeABlob(ctx, t, store, v1Ref, data, filestore.FormatV1)
// look up the different blobs with Open and Stat and OpenWithStorageFormat
tryOpeningABlob(ctx, t, store, v0Ref, len(data), filestore.FormatV0)
tryOpeningABlob(ctx, t, store, v1Ref, len(data), filestore.FormatV1)
// write a V1 blob with the same ID as the V0 blob (to simulate it being rewritten as
// V1 during a migration), with different data so we can distinguish them
differentData := make([]byte, len(data)+2)
copy(differentData, data)
copy(differentData[len(data):], "\xff\x00")
writeABlob(ctx, t, store, v0Ref, differentData, filestore.FormatV1)
// if we try to access the blob at that key, we should see only the V1 blob
tryOpeningABlob(ctx, t, store, v0Ref, len(differentData), filestore.FormatV1)
// unless we ask specifically for a V0 blob
blobInfo, err := store.StatWithStorageFormat(ctx, v0Ref, filestore.FormatV0)
require.NoError(t, err)
verifyBlobInfo(ctx, t, blobInfo, len(data), filestore.FormatV0)
reader, err := store.OpenWithStorageFormat(ctx, blobInfo.BlobRef(), blobInfo.StorageFormatVersion())
require.NoError(t, err)
verifyBlobHandle(t, reader, len(data), filestore.FormatV0)
require.NoError(t, reader.Close())
// delete the v0BlobKey; both the V0 and the V1 blobs should go away
err = store.Delete(ctx, v0Ref)
require.NoError(t, err)
reader, err = store.Open(ctx, v0Ref)
require.Error(t, err)
assert.Nil(t, reader)
}
// Check that the SpaceUsedForBlobs and SpaceUsedForBlobsInNamespace methods on
// filestore.blobStore work as expected.
func TestStoreSpaceUsed(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
store, err := filestore.NewAt(zaptest.NewLogger(t), ctx.Dir("store"), filestore.DefaultConfig)
require.NoError(t, err)
ctx.Check(store.Close)
var (
namespace = testrand.Bytes(namespaceSize)
otherNamespace = testrand.Bytes(namespaceSize)
sizesToStore = []memory.Size{4093, 0, 512, 1, memory.MB}
)
spaceUsed, err := store.SpaceUsedForBlobs(ctx)
require.NoError(t, err)
assert.Equal(t, int64(0), spaceUsed)
spaceUsed, err = store.SpaceUsedForBlobsInNamespace(ctx, namespace)
require.NoError(t, err)
assert.Equal(t, int64(0), spaceUsed)
spaceUsed, err = store.SpaceUsedForBlobsInNamespace(ctx, otherNamespace)
require.NoError(t, err)
assert.Equal(t, int64(0), spaceUsed)
var totalSoFar memory.Size
for _, size := range sizesToStore {
contents := testrand.Bytes(size)
blobRef := storage.BlobRef{Namespace: namespace, Key: testrand.Bytes(keySize)}
blobWriter, err := store.Create(ctx, blobRef, int64(len(contents)))
require.NoError(t, err)
_, err = blobWriter.Write(contents)
require.NoError(t, err)
err = blobWriter.Commit(ctx)
require.NoError(t, err)
totalSoFar += size
spaceUsed, err := store.SpaceUsedForBlobs(ctx)
require.NoError(t, err)
assert.Equal(t, int64(totalSoFar), spaceUsed)
spaceUsed, err = store.SpaceUsedForBlobsInNamespace(ctx, namespace)
require.NoError(t, err)
assert.Equal(t, int64(totalSoFar), spaceUsed)
spaceUsed, err = store.SpaceUsedForBlobsInNamespace(ctx, otherNamespace)
require.NoError(t, err)
assert.Equal(t, int64(0), spaceUsed)
}
}
// Check that ListNamespaces and WalkNamespace work as expected.
func TestStoreTraversals(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
store, err := filestore.NewAt(zaptest.NewLogger(t), ctx.Dir("store"), filestore.DefaultConfig)
require.NoError(t, err)
ctx.Check(store.Close)
// invent some namespaces and store stuff in them
type namespaceWithBlobs struct {
namespace []byte
blobs []storage.BlobRef
}
const numNamespaces = 4
recordsToInsert := make([]namespaceWithBlobs, numNamespaces)
var namespaceBase = testrand.Bytes(namespaceSize)
for i := range recordsToInsert {
// give each namespace a similar ID but modified in the last byte to distinguish
recordsToInsert[i].namespace = make([]byte, len(namespaceBase))
copy(recordsToInsert[i].namespace, namespaceBase)
recordsToInsert[i].namespace[len(namespaceBase)-1] = byte(i)
// put varying numbers of blobs in the namespaces
recordsToInsert[i].blobs = make([]storage.BlobRef, i+1)
for j := range recordsToInsert[i].blobs {
recordsToInsert[i].blobs[j] = storage.BlobRef{
Namespace: recordsToInsert[i].namespace,
Key: testrand.Bytes(keySize),
}
blobWriter, err := store.Create(ctx, recordsToInsert[i].blobs[j], 0)
require.NoError(t, err)
// also vary the sizes of the blobs so we can check Stat results
_, err = blobWriter.Write(testrand.Bytes(memory.Size(j)))
require.NoError(t, err)
err = blobWriter.Commit(ctx)
require.NoError(t, err)
}
}
// test ListNamespaces
gotNamespaces, err := store.ListNamespaces(ctx)
require.NoError(t, err)
sort.Slice(gotNamespaces, func(i, j int) bool {
return bytes.Compare(gotNamespaces[i], gotNamespaces[j]) < 0
})
sort.Slice(recordsToInsert, func(i, j int) bool {
return bytes.Compare(recordsToInsert[i].namespace, recordsToInsert[j].namespace) < 0
})
for i, expected := range recordsToInsert {
require.Equalf(t, expected.namespace, gotNamespaces[i], "mismatch at index %d: recordsToInsert is %+v and gotNamespaces is %v", i, recordsToInsert, gotNamespaces)
}
// test WalkNamespace
for _, expected := range recordsToInsert {
// this isn't strictly necessary, since the function closure below is not persisted
// past the end of a loop iteration, but this keeps the linter from complaining.
expected := expected
// keep track of which blobs we visit with WalkNamespace
found := make([]bool, len(expected.blobs))
err = store.WalkNamespace(ctx, expected.namespace, func(info storage.BlobInfo) error {
gotBlobRef := info.BlobRef()
assert.Equal(t, expected.namespace, gotBlobRef.Namespace)
// find which blob this is in expected.blobs
blobIdentified := -1
for i, expectedBlobRef := range expected.blobs {
if bytes.Equal(gotBlobRef.Key, expectedBlobRef.Key) {
found[i] = true
blobIdentified = i
}
}
// make sure this is a blob we actually put in
require.NotEqualf(t, -1, blobIdentified,
"WalkNamespace gave BlobRef %v, but I don't remember storing that",
gotBlobRef)
// check BlobInfo sanity
stat, err := info.Stat(ctx)
require.NoError(t, err)
nameFromStat := stat.Name()
fullPath, err := info.FullPath(ctx)
require.NoError(t, err)
basePath := filepath.Base(fullPath)
assert.Equal(t, nameFromStat, basePath)
assert.Equal(t, int64(blobIdentified), stat.Size())
assert.False(t, stat.IsDir())
return nil
})
require.NoError(t, err)
// make sure all blobs were visited
for i := range found {
assert.True(t, found[i],
"WalkNamespace never yielded blob at index %d: %v",
i, expected.blobs[i])
}
}
// test WalkNamespace on a nonexistent namespace also
namespaceBase[len(namespaceBase)-1] = byte(numNamespaces)
err = store.WalkNamespace(ctx, namespaceBase, func(_ storage.BlobInfo) error {
t.Fatal("this should not have been called")
return nil
})
require.NoError(t, err)
// check that WalkNamespace stops iterating after an error return
iterations := 0
expectedErr := errs.New("an expected error")
err = store.WalkNamespace(ctx, recordsToInsert[numNamespaces-1].namespace, func(_ storage.BlobInfo) error {
iterations++
if iterations == 2 {
return expectedErr
}
return nil
})
assert.Error(t, err)
assert.Equal(t, err, expectedErr)
assert.Equal(t, 2, iterations)
}
func TestEmptyTrash(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
store, err := filestore.NewAt(zaptest.NewLogger(t), ctx.Dir("store"), filestore.DefaultConfig)
require.NoError(t, err)
ctx.Check(store.Close)
size := memory.KB
type testfile struct {
data []byte
formatVer storage.FormatVersion
}
type testref struct {
key []byte
files []testfile
}
type testnamespace struct {
namespace []byte
refs []testref
}
namespaces := []testnamespace{
{
namespace: testrand.Bytes(namespaceSize),
refs: []testref{
{
// Has v0 and v1
key: testrand.Bytes(keySize),
files: []testfile{
{
data: testrand.Bytes(size),
formatVer: filestore.FormatV0,
},
{
data: testrand.Bytes(size),
formatVer: filestore.FormatV1,
},
},
},
{
// Has v0 only
key: testrand.Bytes(keySize),
files: []testfile{
{
data: testrand.Bytes(size),
formatVer: filestore.FormatV0,
},
},
},
{
// Has v1 only
key: testrand.Bytes(keySize),
files: []testfile{
{
data: testrand.Bytes(size),
formatVer: filestore.FormatV0,
},
},
},
},
},
{
namespace: testrand.Bytes(namespaceSize),
refs: []testref{
{
// Has v1 only
key: testrand.Bytes(keySize),
files: []testfile{
{
data: testrand.Bytes(size),
formatVer: filestore.FormatV0,
},
},
},
},
},
}
for _, namespace := range namespaces {
for _, ref := range namespace.refs {
blobref := storage.BlobRef{
Namespace: namespace.namespace,
Key: ref.key,
}
for _, file := range ref.files {
var w storage.BlobWriter
if file.formatVer == filestore.FormatV0 {
fStore, ok := store.(interface {
TestCreateV0(ctx context.Context, ref storage.BlobRef) (_ storage.BlobWriter, err error)
})
require.Truef(t, ok, "can't make TestCreateV0 with this blob store (%T)", store)
w, err = fStore.TestCreateV0(ctx, blobref)
} else if file.formatVer == filestore.FormatV1 {
w, err = store.Create(ctx, blobref, int64(size))
}
require.NoError(t, err)
require.NotNil(t, w)
_, err = w.Write(file.data)
require.NoError(t, err)
require.NoError(t, w.Commit(ctx))
requireFileMatches(ctx, t, store, file.data, blobref, file.formatVer)
}
// Trash the ref
require.NoError(t, store.Trash(ctx, blobref))
}
}
// Restore the first namespace
var expectedFilesEmptied int64
for _, ref := range namespaces[0].refs {
for range ref.files {
expectedFilesEmptied++
}
}
emptiedBytes, keys, err := store.EmptyTrash(ctx, namespaces[0].namespace, time.Now().Add(time.Hour))
require.NoError(t, err)
assert.Equal(t, expectedFilesEmptied*int64(size), emptiedBytes)
assert.Equal(t, int(expectedFilesEmptied), len(keys))
}
func TestTrashAndRestore(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
store, err := filestore.NewAt(zaptest.NewLogger(t), ctx.Dir("store"), filestore.DefaultConfig)
require.NoError(t, err)
ctx.Check(store.Close)
size := memory.KB
type testfile struct {
data []byte
formatVer storage.FormatVersion
}
type testref struct {
key []byte
files []testfile
}
type testnamespace struct {
namespace []byte
refs []testref
}
namespaces := []testnamespace{
{
namespace: testrand.Bytes(namespaceSize),
refs: []testref{
{
// Has v0 and v1
key: testrand.Bytes(keySize),
files: []testfile{
{
data: testrand.Bytes(size),
formatVer: filestore.FormatV0,
},
{
data: testrand.Bytes(size),
formatVer: filestore.FormatV1,
},
},
},
{
// Has v0 only
key: testrand.Bytes(keySize),
files: []testfile{
{
data: testrand.Bytes(size),
formatVer: filestore.FormatV0,
},
},
},
{
// Has v1 only
key: testrand.Bytes(keySize),
files: []testfile{
{
data: testrand.Bytes(size),
formatVer: filestore.FormatV0,
},
},
},
},
},
{
namespace: testrand.Bytes(namespaceSize),
refs: []testref{
{
// Has v1 only
key: testrand.Bytes(keySize),
files: []testfile{
{
data: testrand.Bytes(size),
formatVer: filestore.FormatV0,
},
},
},
},
},
}
for _, namespace := range namespaces {
for _, ref := range namespace.refs {
blobref := storage.BlobRef{
Namespace: namespace.namespace,
Key: ref.key,
}
for _, file := range ref.files {
var w storage.BlobWriter
if file.formatVer == filestore.FormatV0 {
fStore, ok := store.(interface {
TestCreateV0(ctx context.Context, ref storage.BlobRef) (_ storage.BlobWriter, err error)
})
require.Truef(t, ok, "can't make TestCreateV0 with this blob store (%T)", store)
w, err = fStore.TestCreateV0(ctx, blobref)
} else if file.formatVer == filestore.FormatV1 {
w, err = store.Create(ctx, blobref, int64(size))
}
require.NoError(t, err)
require.NotNil(t, w)
_, err = w.Write(file.data)
require.NoError(t, err)
require.NoError(t, w.Commit(ctx))
requireFileMatches(ctx, t, store, file.data, blobref, file.formatVer)
}
// Trash the ref
require.NoError(t, store.Trash(ctx, blobref))
// Verify files are gone
for _, file := range ref.files {
_, err = store.OpenWithStorageFormat(ctx, blobref, file.formatVer)
require.Error(t, err)
require.True(t, os.IsNotExist(err))
}
}
}
// Restore the first namespace
var expKeysRestored [][]byte
for _, ref := range namespaces[0].refs {
for range ref.files {
expKeysRestored = append(expKeysRestored, ref.key)
}
}
sort.Slice(expKeysRestored, func(i int, j int) bool { return expKeysRestored[i][0] < expKeysRestored[j][0] })
restoredKeys, err := store.RestoreTrash(ctx, namespaces[0].namespace)
sort.Slice(restoredKeys, func(i int, j int) bool { return restoredKeys[i][0] < restoredKeys[j][0] })
require.NoError(t, err)
assert.Equal(t, expKeysRestored, restoredKeys)
// Verify pieces are back and look good for first namespace
for _, ref := range namespaces[0].refs {
blobref := storage.BlobRef{
Namespace: namespaces[0].namespace,
Key: ref.key,
}
for _, file := range ref.files {
requireFileMatches(ctx, t, store, file.data, blobref, file.formatVer)
}
}
// Verify pieces in second namespace are still missing (were not restored)
for _, ref := range namespaces[1].refs {
blobref := storage.BlobRef{
Namespace: namespaces[1].namespace,
Key: ref.key,
}
for _, file := range ref.files {
r, err := store.OpenWithStorageFormat(ctx, blobref, file.formatVer)
require.Error(t, err)
require.Nil(t, r)
}
}
}
func requireFileMatches(ctx context.Context, t *testing.T, store storage.Blobs, data []byte, ref storage.BlobRef, formatVer storage.FormatVersion) {
r, err := store.OpenWithStorageFormat(ctx, ref, formatVer)
require.NoError(t, err)
buf, err := ioutil.ReadAll(r)
require.NoError(t, err)
require.Equal(t, buf, data)
}
// TestBlobMemoryBuffer ensures that buffering doesn't have problems with
// small writes randomly seeked through the file.
func TestBlobMemoryBuffer(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
const size = 2048
store, err := filestore.NewAt(zaptest.NewLogger(t), ctx.Dir("store"), filestore.Config{
WriteBufferSize: 1 * memory.KiB,
})
require.NoError(t, err)
ctx.Check(store.Close)
ref := storage.BlobRef{
Namespace: testrand.Bytes(32),
Key: testrand.Bytes(32),
}
writer, err := store.Create(ctx, ref, size)
require.NoError(t, err)
for _, v := range rand.Perm(size) {
_, err := writer.Seek(int64(v), io.SeekStart)
require.NoError(t, err)
n, err := writer.Write([]byte{byte(v)})
require.NoError(t, err)
require.Equal(t, n, 1)
}
_, err = writer.Seek(size, io.SeekStart)
require.NoError(t, err)
require.NoError(t, writer.Commit(ctx))
reader, err := store.Open(ctx, ref)
require.NoError(t, err)
buf, err := ioutil.ReadAll(reader)
require.NoError(t, err)
for i := range buf {
require.Equal(t, byte(i), buf[i])
}
require.Equal(t, size, len(buf))
}
func TestStorageDirVerification(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
log := zaptest.NewLogger(t)
dir, err := filestore.NewDir(log, ctx.Dir("store"))
require.NoError(t, err)
ident0, err := testidentity.NewTestIdentity(ctx)
require.NoError(t, err)
ident1, err := testidentity.NewTestIdentity(ctx)
require.NoError(t, err)
store := filestore.New(log, dir, filestore.Config{
WriteBufferSize: 1 * memory.KiB,
})
// test nonexistent file returns error
require.Error(t, store.VerifyStorageDir(ident0.ID))
require.NoError(t, dir.CreateVerificationFile(ident0.ID))
// test correct ID returns no error
require.NoError(t, store.VerifyStorageDir(ident0.ID))
// test incorrect ID returns error
err = store.VerifyStorageDir(ident1.ID)
require.Contains(t, err.Error(), "does not match running node's ID")
// test invalid node ID returns error
f, err := os.Create(filepath.Join(dir.Path(), "storage-dir-verification"))
require.NoError(t, err)
defer func() {
require.NoError(t, f.Close())
}()
_, err = f.Write([]byte{0, 1, 2, 3})
require.NoError(t, err)
err = store.VerifyStorageDir(ident0.ID)
require.Contains(t, err.Error(), "content of file is not a valid node ID")
// test file overwrite returns no error
require.NoError(t, dir.CreateVerificationFile(ident0.ID))
require.NoError(t, store.VerifyStorageDir(ident0.ID))
}