storj/storagenode/pieces/store_test.go

558 lines
19 KiB
Go
Raw Normal View History

// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package pieces_test
import (
"bytes"
"context"
"encoding/hex"
"io"
"io/ioutil"
"os"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
"storj.io/storj/internal/memory"
"storj.io/storj/internal/testcontext"
2019-04-08 19:15:19 +01:00
"storj.io/storj/internal/testidentity"
"storj.io/storj/internal/testrand"
"storj.io/storj/pkg/pb"
2019-04-08 19:15:19 +01:00
"storj.io/storj/pkg/pkcrypto"
"storj.io/storj/pkg/signing"
2019-04-08 19:15:19 +01:00
"storj.io/storj/pkg/storj"
"storj.io/storj/storage"
"storj.io/storj/storage/filestore"
"storj.io/storj/storagenode"
"storj.io/storj/storagenode/pieces"
"storj.io/storj/storagenode/storagenodedb/storagenodedbtest"
)
func TestPieces(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
dir, err := filestore.NewDir(ctx.Dir("pieces"))
require.NoError(t, err)
blobs := filestore.New(zaptest.NewLogger(t), dir)
defer ctx.Check(blobs.Close)
store := pieces.NewStore(zaptest.NewLogger(t), blobs, nil, nil, nil)
2019-04-08 19:15:19 +01:00
satelliteID := testidentity.MustPregeneratedSignedIdentity(0, storj.LatestIDVersion()).ID
pieceID := storj.NewPieceID()
source := testrand.Bytes(8000)
{ // write data
writer, err := store.Writer(ctx, satelliteID, pieceID)
require.NoError(t, err)
n, err := io.Copy(writer, bytes.NewReader(source))
require.NoError(t, err)
assert.Equal(t, len(source), int(n))
assert.Equal(t, len(source), int(writer.Size()))
// verify hash
hash := pkcrypto.NewHash()
_, _ = hash.Write(source)
assert.Equal(t, hash.Sum(nil), writer.Hash())
// commit
require.NoError(t, writer.Commit(ctx, &pb.PieceHeader{}))
// after commit we should be able to call cancel without an error
require.NoError(t, writer.Cancel(ctx))
}
{ // valid reads
read := func(offset, length int64) []byte {
reader, err := store.Reader(ctx, satelliteID, pieceID)
require.NoError(t, err)
pos, err := reader.Seek(offset, io.SeekStart)
require.NoError(t, err)
require.Equal(t, offset, pos)
data := make([]byte, length)
n, err := io.ReadFull(reader, data)
require.NoError(t, err)
require.Equal(t, int(length), n)
require.NoError(t, reader.Close())
return data
}
require.Equal(t, source[10:11], read(10, 1))
require.Equal(t, source[10:1010], read(10, 1000))
require.Equal(t, source, read(0, int64(len(source))))
}
{ // reading ends with io.EOF
reader, err := store.Reader(ctx, satelliteID, pieceID)
require.NoError(t, err)
data := make([]byte, 111)
for {
_, err := reader.Read(data)
if err != nil {
if err == io.EOF {
break
}
require.NoError(t, err)
}
}
require.NoError(t, reader.Close())
}
{ // test delete
assert.NoError(t, store.Delete(ctx, satelliteID, pieceID))
// read should now fail
_, err := store.Reader(ctx, satelliteID, pieceID)
assert.Error(t, err)
}
{ // write cancel
cancelledPieceID := storj.NewPieceID()
writer, err := store.Writer(ctx, satelliteID, cancelledPieceID)
require.NoError(t, err)
n, err := io.Copy(writer, bytes.NewReader(source))
require.NoError(t, err)
assert.Equal(t, len(source), int(n))
assert.Equal(t, len(source), int(writer.Size()))
// cancel writing
require.NoError(t, writer.Cancel(ctx))
// commit should not fail
require.Error(t, writer.Commit(ctx, &pb.PieceHeader{}))
// read should fail
_, err = store.Reader(ctx, satelliteID, cancelledPieceID)
assert.Error(t, err)
}
}
func writeAPiece(ctx context.Context, t testing.TB, store *pieces.Store, satelliteID storj.NodeID, pieceID storj.PieceID, data []byte, atTime time.Time, expireTime *time.Time, formatVersion storage.FormatVersion) {
tStore := &pieces.StoreForTest{store}
writer, err := tStore.WriterForFormatVersion(ctx, satelliteID, pieceID, formatVersion)
require.NoError(t, err)
_, err = writer.Write(data)
require.NoError(t, err)
size := writer.Size()
assert.Equal(t, int64(len(data)), size)
limit := pb.OrderLimit{}
if expireTime != nil {
limit.PieceExpiration = *expireTime
}
err = writer.Commit(ctx, &pb.PieceHeader{
Hash: writer.Hash(),
CreationTime: atTime,
OrderLimit: limit,
})
require.NoError(t, err)
}
func verifyPieceHandle(t testing.TB, reader *pieces.Reader, expectDataLen int, expectCreateTime time.Time, expectFormat storage.FormatVersion) {
assert.Equal(t, expectFormat, reader.StorageFormatVersion())
assert.Equal(t, int64(expectDataLen), reader.Size())
if expectFormat != filestore.FormatV0 {
pieceHeader, err := reader.GetPieceHeader()
require.NoError(t, err)
assert.Equal(t, expectFormat, storage.FormatVersion(pieceHeader.FormatVersion))
assert.Equal(t, expectCreateTime.UTC(), pieceHeader.CreationTime.UTC())
}
}
func tryOpeningAPiece(ctx context.Context, t testing.TB, store *pieces.Store, satelliteID storj.NodeID, pieceID storj.PieceID, expectDataLen int, expectTime time.Time, expectFormat storage.FormatVersion) {
reader, err := store.Reader(ctx, satelliteID, pieceID)
require.NoError(t, err)
verifyPieceHandle(t, reader, expectDataLen, expectTime, expectFormat)
require.NoError(t, reader.Close())
reader, err = store.ReaderWithStorageFormat(ctx, satelliteID, pieceID, expectFormat)
require.NoError(t, err)
verifyPieceHandle(t, reader, expectDataLen, expectTime, expectFormat)
require.NoError(t, reader.Close())
}
func TestPieceVersionMigrate(t *testing.T) {
storagenodedbtest.Run(t, func(t *testing.T, db storagenode.DB) {
const pieceSize = 1024
ctx := testcontext.New(t)
defer ctx.Cleanup()
var (
data = testrand.Bytes(pieceSize)
satelliteID = testrand.NodeID()
pieceID = testrand.PieceID()
now = time.Now().UTC()
)
// Initialize pub/priv keys for signing piece hash
publicKeyBytes, err := hex.DecodeString("01eaebcb418cd629d4c01f365f33006c9de3ce70cf04da76c39cdc993f48fe53")
require.NoError(t, err)
privateKeyBytes, err := hex.DecodeString("afefcccadb3d17b1f241b7c83f88c088b54c01b5a25409c13cbeca6bfa22b06901eaebcb418cd629d4c01f365f33006c9de3ce70cf04da76c39cdc993f48fe53")
require.NoError(t, err)
publicKey, err := storj.PiecePublicKeyFromBytes(publicKeyBytes)
require.NoError(t, err)
privateKey, err := storj.PiecePrivateKeyFromBytes(privateKeyBytes)
require.NoError(t, err)
ol := &pb.OrderLimit{
SerialNumber: testrand.SerialNumber(),
SatelliteId: satelliteID,
StorageNodeId: testrand.NodeID(),
PieceId: pieceID,
SatelliteSignature: []byte("sig"),
Limit: 100,
Action: pb.PieceAction_GET,
PieceExpiration: now,
OrderExpiration: now,
OrderCreation: now,
}
olPieceInfo := *ol
v0PieceInfo, ok := db.V0PieceInfo().(pieces.V0PieceInfoDBForTest)
require.True(t, ok, "V0PieceInfoDB can not satisfy V0PieceInfoDBForTest")
blobs, err := filestore.NewAt(zaptest.NewLogger(t), ctx.Dir("store"))
require.NoError(t, err)
defer ctx.Check(blobs.Close)
store := pieces.NewStore(zaptest.NewLogger(t), blobs, v0PieceInfo, nil, nil)
// write as a v0 piece
tStore := &pieces.StoreForTest{store}
writer, err := tStore.WriterForFormatVersion(ctx, satelliteID, pieceID, filestore.FormatV0)
require.NoError(t, err)
_, err = writer.Write(data)
require.NoError(t, err)
assert.Equal(t, int64(len(data)), writer.Size())
err = writer.Commit(ctx, &pb.PieceHeader{
Hash: writer.Hash(),
CreationTime: now,
OrderLimit: olPieceInfo,
})
require.NoError(t, err)
// Create PieceHash from the v0 piece written
ph := &pb.PieceHash{
PieceId: pieceID,
Hash: writer.Hash(),
PieceSize: writer.Size(),
Timestamp: now,
}
// sign v0 pice hash
signedPhPieceInfo, err := signing.SignUplinkPieceHash(ctx, privateKey, ph)
require.NoError(t, err)
// Create v0 pieces.Info and add to v0 store
pieceInfo := pieces.Info{
SatelliteID: satelliteID,
PieceID: pieceID,
PieceSize: writer.Size(),
OrderLimit: &olPieceInfo,
PieceCreation: now,
UplinkPieceHash: signedPhPieceInfo,
}
require.NoError(t, v0PieceInfo.Add(ctx, &pieceInfo))
// verify piece can be opened as v0
tryOpeningAPiece(ctx, t, store, satelliteID, pieceID, len(data), now, filestore.FormatV0)
// run migration
require.NoError(t, store.MigrateV0ToV1(ctx, satelliteID, pieceID))
// open as v1 piece
tryOpeningAPiece(ctx, t, store, satelliteID, pieceID, len(data), now, filestore.FormatV1)
// manually read v1 piece
reader, err := store.ReaderWithStorageFormat(ctx, satelliteID, pieceID, filestore.FormatV1)
require.NoError(t, err)
// generate v1 pieceHash and verify signature is still valid
v1Header, err := reader.GetPieceHeader()
require.NoError(t, err)
v1PieceHash := pb.PieceHash{
PieceId: v1Header.OrderLimit.PieceId,
Hash: v1Header.GetHash(),
PieceSize: reader.Size(),
Timestamp: v1Header.GetCreationTime(),
Signature: v1Header.GetSignature(),
}
require.NoError(t, signing.VerifyUplinkPieceHashSignature(ctx, publicKey, &v1PieceHash))
require.Equal(t, signedPhPieceInfo.GetSignature(), v1PieceHash.GetSignature())
require.Equal(t, *ol, v1Header.OrderLimit)
// Verify that it was deleted from v0PieceInfo
retrivedInfo, err := v0PieceInfo.Get(ctx, satelliteID, pieceID)
require.Error(t, err)
require.Nil(t, retrivedInfo)
})
}
// Test that the piece store can still read V0 pieces that might be left over from a previous
// version, as well as V1 pieces.
func TestMultipleStorageFormatVersions(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
blobs, err := filestore.NewAt(zaptest.NewLogger(t), ctx.Dir("store"))
require.NoError(t, err)
defer ctx.Check(blobs.Close)
store := pieces.NewStore(zaptest.NewLogger(t), blobs, nil, nil, nil)
const pieceSize = 1024
var (
data = testrand.Bytes(pieceSize)
satellite = testrand.NodeID()
v0PieceID = testrand.PieceID()
v1PieceID = testrand.PieceID()
now = time.Now().UTC()
)
// write a V0 piece
writeAPiece(ctx, t, store, satellite, v0PieceID, data, now, nil, filestore.FormatV0)
// write a V1 piece
writeAPiece(ctx, t, store, satellite, v1PieceID, data, now, nil, filestore.FormatV1)
// look up the different pieces with Reader and ReaderWithStorageFormat
tryOpeningAPiece(ctx, t, store, satellite, v0PieceID, len(data), now, filestore.FormatV0)
tryOpeningAPiece(ctx, t, store, satellite, v1PieceID, len(data), now, filestore.FormatV1)
// write a V1 piece with the same ID as the V0 piece (to simulate it being rewritten as
// V1 during a migration)
differentData := append(data, 111, 104, 97, 105)
writeAPiece(ctx, t, store, satellite, v0PieceID, differentData, now, nil, filestore.FormatV1)
// if we try to access the piece at that key, we should see only the V1 piece
tryOpeningAPiece(ctx, t, store, satellite, v0PieceID, len(differentData), now, filestore.FormatV1)
// unless we ask specifically for a V0 piece
reader, err := store.ReaderWithStorageFormat(ctx, satellite, v0PieceID, filestore.FormatV0)
require.NoError(t, err)
verifyPieceHandle(t, reader, len(data), now, filestore.FormatV0)
require.NoError(t, reader.Close())
// delete the v0PieceID; both the V0 and the V1 pieces should go away
err = store.Delete(ctx, satellite, v0PieceID)
require.NoError(t, err)
reader, err = store.Reader(ctx, satellite, v0PieceID)
require.Error(t, err)
require.True(t, os.IsNotExist(err))
assert.Nil(t, reader)
}
func TestGetExpired(t *testing.T) {
storagenodedbtest.Run(t, func(t *testing.T, db storagenode.DB) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
v0PieceInfo, ok := db.V0PieceInfo().(pieces.V0PieceInfoDBForTest)
require.True(t, ok, "V0PieceInfoDB can not satisfy V0PieceInfoDBForTest")
expirationInfo := db.PieceExpirationDB()
store := pieces.NewStore(zaptest.NewLogger(t), db.Pieces(), v0PieceInfo, expirationInfo, db.PieceSpaceUsedDB())
now := time.Now().UTC()
testDates := []struct {
years, months, days int
}{
{-20, -1, -2},
{1, 6, 14},
{0, -1, 0},
{0, 0, 1},
}
testPieces := make([]pieces.Info, 4)
for p := range testPieces {
testPieces[p] = pieces.Info{
SatelliteID: testrand.NodeID(),
PieceID: testrand.PieceID(),
OrderLimit: &pb.OrderLimit{},
UplinkPieceHash: &pb.PieceHash{},
PieceExpiration: now.AddDate(testDates[p].years, testDates[p].months, testDates[p].days),
}
}
// put testPieces 0 and 1 in the v0 pieceinfo db
err := v0PieceInfo.Add(ctx, &testPieces[0])
require.NoError(t, err)
err = v0PieceInfo.Add(ctx, &testPieces[1])
require.NoError(t, err)
// put testPieces 2 and 3 in the piece_expirations db
err = expirationInfo.SetExpiration(ctx, testPieces[2].SatelliteID, testPieces[2].PieceID, testPieces[2].PieceExpiration)
require.NoError(t, err)
err = expirationInfo.SetExpiration(ctx, testPieces[3].SatelliteID, testPieces[3].PieceID, testPieces[3].PieceExpiration)
require.NoError(t, err)
// GetExpired with limit 0 gives empty result
expired, err := store.GetExpired(ctx, now, 0)
require.NoError(t, err)
assert.Empty(t, expired)
// GetExpired with limit 1 gives only 1 result, although there are 2 possible
expired, err = store.GetExpired(ctx, now, 1)
require.NoError(t, err)
require.Len(t, expired, 1)
assert.Equal(t, testPieces[2].PieceID, expired[0].PieceID)
assert.Equal(t, testPieces[2].SatelliteID, expired[0].SatelliteID)
assert.False(t, expired[0].InPieceInfo)
// GetExpired with 2 or more gives all expired results correctly; one from
// piece_expirations, and one from pieceinfo
expired, err = store.GetExpired(ctx, now, 1000)
require.NoError(t, err)
require.Len(t, expired, 2)
assert.Equal(t, testPieces[2].PieceID, expired[0].PieceID)
assert.Equal(t, testPieces[2].SatelliteID, expired[0].SatelliteID)
assert.False(t, expired[0].InPieceInfo)
assert.Equal(t, testPieces[0].PieceID, expired[1].PieceID)
assert.Equal(t, testPieces[0].SatelliteID, expired[1].SatelliteID)
assert.True(t, expired[1].InPieceInfo)
})
}
func TestOverwriteV0WithV1(t *testing.T) {
storagenodedbtest.Run(t, func(t *testing.T, db storagenode.DB) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
v0PieceInfo, ok := db.V0PieceInfo().(pieces.V0PieceInfoDBForTest)
require.True(t, ok, "V0PieceInfoDB can not satisfy V0PieceInfoDBForTest")
expirationInfo := db.PieceExpirationDB()
store := pieces.NewStore(zaptest.NewLogger(t), db.Pieces(), v0PieceInfo, expirationInfo, db.PieceSpaceUsedDB())
satelliteID := testrand.NodeID()
pieceID := testrand.PieceID()
v0Data := testrand.Bytes(4 * memory.MiB)
v1Data := testrand.Bytes(3 * memory.MiB)
// write the piece as V0. We can't provide the expireTime via writeAPiece, because
// BlobWriter.Commit only knows how to store expiration times in piece_expirations.
v0CreateTime := time.Now().UTC()
v0ExpireTime := v0CreateTime.AddDate(5, 0, 0)
writeAPiece(ctx, t, store, satelliteID, pieceID, v0Data, v0CreateTime, nil, filestore.FormatV0)
// now put the piece in the pieceinfo db directly, because store won't do that for us.
// this is where the expireTime takes effect.
err := v0PieceInfo.Add(ctx, &pieces.Info{
SatelliteID: satelliteID,
PieceID: pieceID,
PieceSize: int64(len(v0Data)),
PieceCreation: v0CreateTime,
PieceExpiration: v0ExpireTime,
OrderLimit: &pb.OrderLimit{},
UplinkPieceHash: &pb.PieceHash{},
})
require.NoError(t, err)
// ensure we can see it via store.Reader
{
reader, err := store.Reader(ctx, satelliteID, pieceID)
require.NoError(t, err)
assert.Equal(t, int64(len(v0Data)), reader.Size())
assert.Equal(t, filestore.FormatV0, reader.StorageFormatVersion())
gotData, err := ioutil.ReadAll(reader)
require.NoError(t, err)
assert.Equal(t, v0Data, gotData)
require.NoError(t, reader.Close())
}
// ensure we can see it via WalkSatellitePieces
calledTimes := 0
err = store.WalkSatellitePieces(ctx, satelliteID, func(access pieces.StoredPieceAccess) error {
calledTimes++
require.Equal(t, 1, calledTimes)
gotCreateTime, err := access.CreationTime(ctx)
require.NoError(t, err)
assert.Equal(t, v0CreateTime, gotCreateTime)
gotSize, err := access.ContentSize(ctx)
require.NoError(t, err)
assert.Equal(t, int64(len(v0Data)), gotSize)
return nil
})
require.NoError(t, err)
// now "overwrite" the piece (write a new blob with the same id, but with V1 storage)
v1CreateTime := time.Now().UTC()
v1ExpireTime := v1CreateTime.AddDate(5, 0, 0)
writeAPiece(ctx, t, store, satelliteID, pieceID, v1Data, v1CreateTime, &v1ExpireTime, filestore.FormatV1)
// ensure we can see it (the new piece) via store.Reader
{
reader, err := store.Reader(ctx, satelliteID, pieceID)
require.NoError(t, err)
assert.Equal(t, int64(len(v1Data)), reader.Size())
assert.Equal(t, filestore.FormatV1, reader.StorageFormatVersion())
gotData, err := ioutil.ReadAll(reader)
require.NoError(t, err)
assert.Equal(t, v1Data, gotData)
require.NoError(t, reader.Close())
}
// now _both_ pieces should show up under WalkSatellitePieces. this may
// be counter-intuitive, but the V0 piece still exists for now (so we can avoid
// hitting the pieceinfo db with every new piece write). I believe this is OK, because
// (a) I don't think that writing different pieces with the same piece ID is a normal
// use case, unless we make a V0->V1 migrator tool, which should know about these
// semantics; (b) the V0 piece should not ever become visible again to the user; it
// should not be possible under normal conditions to delete one without deleting the
// other.
calledTimes = 0
err = store.WalkSatellitePieces(ctx, satelliteID, func(access pieces.StoredPieceAccess) error {
calledTimes++
switch calledTimes {
case 1:
// expect the V1 piece
assert.Equal(t, pieceID, access.PieceID())
assert.Equal(t, filestore.FormatV1, access.StorageFormatVersion())
gotCreateTime, err := access.CreationTime(ctx)
require.NoError(t, err)
assert.Equal(t, v1CreateTime, gotCreateTime)
gotSize, err := access.ContentSize(ctx)
require.NoError(t, err)
assert.Equal(t, int64(len(v1Data)), gotSize)
case 2:
// expect the V0 piece
assert.Equal(t, pieceID, access.PieceID())
assert.Equal(t, filestore.FormatV0, access.StorageFormatVersion())
gotCreateTime, err := access.CreationTime(ctx)
require.NoError(t, err)
assert.Equal(t, v0CreateTime, gotCreateTime)
gotSize, err := access.ContentSize(ctx)
require.NoError(t, err)
assert.Equal(t, int64(len(v0Data)), gotSize)
default:
t.Fatalf("calledTimes should be 1 or 2, but it is %d", calledTimes)
}
return nil
})
require.NoError(t, err)
// delete the pieceID; this should get both V0 and V1
err = store.Delete(ctx, satelliteID, pieceID)
require.NoError(t, err)
err = store.WalkSatellitePieces(ctx, satelliteID, func(access pieces.StoredPieceAccess) error {
t.Fatalf("this should not have been called. pieceID=%x, format=%d", access.PieceID(), access.StorageFormatVersion())
return nil
})
require.NoError(t, err)
})
}