storj/storagenode/pieces/store_test.go

879 lines
29 KiB
Go
Raw Normal View History

// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package pieces_test
import (
"bytes"
"context"
"encoding/hex"
"io"
"io/ioutil"
"os"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
"storj.io/common/identity/testidentity"
"storj.io/common/memory"
"storj.io/common/pb"
"storj.io/common/pkcrypto"
"storj.io/common/rpc"
"storj.io/common/signing"
"storj.io/common/storj"
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/storj/storage"
"storj.io/storj/storage/filestore"
"storj.io/storj/storagenode"
"storj.io/storj/storagenode/pieces"
"storj.io/storj/storagenode/storagenodedb/storagenodedbtest"
"storj.io/storj/storagenode/trust"
)
func TestPieces(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
dir, err := filestore.NewDir(ctx.Dir("pieces"))
require.NoError(t, err)
blobs := filestore.New(zaptest.NewLogger(t), dir)
defer ctx.Check(blobs.Close)
store := pieces.NewStore(zaptest.NewLogger(t), blobs, nil, nil, nil)
2019-04-08 19:15:19 +01:00
satelliteID := testidentity.MustPregeneratedSignedIdentity(0, storj.LatestIDVersion()).ID
pieceID := storj.NewPieceID()
source := testrand.Bytes(8000)
{ // write data
writer, err := store.Writer(ctx, satelliteID, pieceID)
require.NoError(t, err)
n, err := io.Copy(writer, bytes.NewReader(source))
require.NoError(t, err)
assert.Equal(t, len(source), int(n))
assert.Equal(t, len(source), int(writer.Size()))
// verify hash
hash := pkcrypto.NewHash()
_, _ = hash.Write(source)
assert.Equal(t, hash.Sum(nil), writer.Hash())
// commit
require.NoError(t, writer.Commit(ctx, &pb.PieceHeader{}))
// after commit we should be able to call cancel without an error
require.NoError(t, writer.Cancel(ctx))
}
{ // valid reads
read := func(offset, length int64) []byte {
reader, err := store.Reader(ctx, satelliteID, pieceID)
require.NoError(t, err)
pos, err := reader.Seek(offset, io.SeekStart)
require.NoError(t, err)
require.Equal(t, offset, pos)
data := make([]byte, length)
n, err := io.ReadFull(reader, data)
require.NoError(t, err)
require.Equal(t, int(length), n)
require.NoError(t, reader.Close())
return data
}
require.Equal(t, source[10:11], read(10, 1))
require.Equal(t, source[10:1010], read(10, 1000))
require.Equal(t, source, read(0, int64(len(source))))
}
{ // reading ends with io.EOF
reader, err := store.Reader(ctx, satelliteID, pieceID)
require.NoError(t, err)
data := make([]byte, 111)
for {
_, err := reader.Read(data)
if err != nil {
if err == io.EOF {
break
}
require.NoError(t, err)
}
}
require.NoError(t, reader.Close())
}
{ // test delete
assert.NoError(t, store.Delete(ctx, satelliteID, pieceID))
// read should now fail
_, err := store.Reader(ctx, satelliteID, pieceID)
assert.Error(t, err)
}
{ // write cancel
cancelledPieceID := storj.NewPieceID()
writer, err := store.Writer(ctx, satelliteID, cancelledPieceID)
require.NoError(t, err)
n, err := io.Copy(writer, bytes.NewReader(source))
require.NoError(t, err)
assert.Equal(t, len(source), int(n))
assert.Equal(t, len(source), int(writer.Size()))
// cancel writing
require.NoError(t, writer.Cancel(ctx))
// commit should not fail
require.Error(t, writer.Commit(ctx, &pb.PieceHeader{}))
// read should fail
_, err = store.Reader(ctx, satelliteID, cancelledPieceID)
assert.Error(t, err)
}
}
func writeAPiece(ctx context.Context, t testing.TB, store *pieces.Store, satelliteID storj.NodeID, pieceID storj.PieceID, data []byte, atTime time.Time, expireTime *time.Time, formatVersion storage.FormatVersion) {
tStore := &pieces.StoreForTest{store}
writer, err := tStore.WriterForFormatVersion(ctx, satelliteID, pieceID, formatVersion)
require.NoError(t, err)
_, err = writer.Write(data)
require.NoError(t, err)
size := writer.Size()
assert.Equal(t, int64(len(data)), size)
limit := pb.OrderLimit{}
if expireTime != nil {
limit.PieceExpiration = *expireTime
}
err = writer.Commit(ctx, &pb.PieceHeader{
Hash: writer.Hash(),
CreationTime: atTime,
OrderLimit: limit,
})
require.NoError(t, err)
}
func verifyPieceHandle(t testing.TB, reader *pieces.Reader, expectDataLen int, expectCreateTime time.Time, expectFormat storage.FormatVersion) {
assert.Equal(t, expectFormat, reader.StorageFormatVersion())
assert.Equal(t, int64(expectDataLen), reader.Size())
if expectFormat != filestore.FormatV0 {
pieceHeader, err := reader.GetPieceHeader()
require.NoError(t, err)
assert.Equal(t, expectFormat, storage.FormatVersion(pieceHeader.FormatVersion))
assert.True(t, expectCreateTime.Equal(pieceHeader.CreationTime))
}
}
func tryOpeningAPiece(ctx context.Context, t testing.TB, store *pieces.Store, satelliteID storj.NodeID, pieceID storj.PieceID, expectDataLen int, expectTime time.Time, expectFormat storage.FormatVersion) {
reader, err := store.Reader(ctx, satelliteID, pieceID)
require.NoError(t, err)
verifyPieceHandle(t, reader, expectDataLen, expectTime, expectFormat)
require.NoError(t, reader.Close())
reader, err = store.ReaderWithStorageFormat(ctx, satelliteID, pieceID, expectFormat)
require.NoError(t, err)
verifyPieceHandle(t, reader, expectDataLen, expectTime, expectFormat)
require.NoError(t, reader.Close())
}
func TestTrashAndRestore(t *testing.T) {
type testfile struct {
data []byte
formatVer storage.FormatVersion
}
type testpiece struct {
pieceID storj.PieceID
files []testfile
expiration time.Time
trashDur time.Duration
}
type testsatellite struct {
satelliteID storj.NodeID
pieces []testpiece
}
size := memory.KB
// Initialize pub/priv keys for signing piece hash
publicKeyBytes, err := hex.DecodeString("01eaebcb418cd629d4c01f365f33006c9de3ce70cf04da76c39cdc993f48fe53")
require.NoError(t, err)
privateKeyBytes, err := hex.DecodeString("afefcccadb3d17b1f241b7c83f88c088b54c01b5a25409c13cbeca6bfa22b06901eaebcb418cd629d4c01f365f33006c9de3ce70cf04da76c39cdc993f48fe53")
require.NoError(t, err)
publicKey, err := storj.PiecePublicKeyFromBytes(publicKeyBytes)
require.NoError(t, err)
privateKey, err := storj.PiecePrivateKeyFromBytes(privateKeyBytes)
require.NoError(t, err)
trashDurToBeEmptied := 7 * 24 * time.Hour
trashDurToBeKept := 3 * 24 * time.Hour
satellites := []testsatellite{
{
satelliteID: testrand.NodeID(),
pieces: []testpiece{
{
expiration: time.Time{}, // no expiration
pieceID: testrand.PieceID(),
trashDur: trashDurToBeEmptied,
files: []testfile{
{
data: testrand.Bytes(size),
formatVer: filestore.FormatV0,
},
},
},
{
pieceID: testrand.PieceID(),
expiration: time.Now().Add(24 * time.Hour),
trashDur: trashDurToBeEmptied,
files: []testfile{
{
data: testrand.Bytes(size),
formatVer: filestore.FormatV1,
},
},
},
{
pieceID: testrand.PieceID(),
expiration: time.Now().Add(24 * time.Hour),
trashDur: trashDurToBeKept,
files: []testfile{
{
data: testrand.Bytes(size),
formatVer: filestore.FormatV1,
},
},
},
{
pieceID: testrand.PieceID(),
expiration: time.Now().Add(24 * time.Hour),
trashDur: trashDurToBeKept,
files: []testfile{
{
data: testrand.Bytes(size),
formatVer: filestore.FormatV1,
},
},
},
{
pieceID: testrand.PieceID(),
expiration: time.Now().Add(24 * time.Hour),
trashDur: trashDurToBeKept,
files: []testfile{
{
data: testrand.Bytes(size),
formatVer: filestore.FormatV0,
},
{
data: testrand.Bytes(size),
formatVer: filestore.FormatV1,
},
},
},
},
},
{
satelliteID: testrand.NodeID(),
pieces: []testpiece{
{
pieceID: testrand.PieceID(),
expiration: time.Now().Add(24 * time.Hour),
trashDur: trashDurToBeKept,
files: []testfile{
{
data: testrand.Bytes(size),
formatVer: filestore.FormatV1,
},
},
},
{
pieceID: testrand.PieceID(),
expiration: time.Now().Add(24 * time.Hour),
trashDur: trashDurToBeEmptied,
files: []testfile{
{
data: testrand.Bytes(size),
formatVer: filestore.FormatV1,
},
},
},
},
},
}
storagenodedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db storagenode.DB) {
dir, err := filestore.NewDir(ctx.Dir("store"))
require.NoError(t, err)
blobs := filestore.New(zaptest.NewLogger(t), dir)
require.NoError(t, err)
defer ctx.Check(blobs.Close)
v0PieceInfo, ok := db.V0PieceInfo().(pieces.V0PieceInfoDBForTest)
require.True(t, ok, "V0PieceInfoDB can not satisfy V0PieceInfoDBForTest")
store := pieces.NewStore(zaptest.NewLogger(t), blobs, v0PieceInfo, db.PieceExpirationDB(), nil)
tStore := &pieces.StoreForTest{store}
var satelliteURLs []trust.SatelliteURL
for i, satellite := range satellites {
// host:port pair must be unique or the trust pool will aggregate
// them into a single entry with the first one "winning".
satelliteURLs = append(satelliteURLs, trust.SatelliteURL{
ID: satellite.satelliteID,
Host: "localhost",
Port: i,
})
now := time.Now()
for _, piece := range satellite.pieces {
// If test has expiration, add to expiration db
if !piece.expiration.IsZero() {
require.NoError(t, store.SetExpiration(ctx, satellite.satelliteID, piece.pieceID, piece.expiration))
}
for _, file := range piece.files {
w, err := tStore.WriterForFormatVersion(ctx, satellite.satelliteID, piece.pieceID, file.formatVer)
require.NoError(t, err)
_, err = w.Write(file.data)
require.NoError(t, err)
// Create, sign, and commit piece hash (to piece or v0PieceInfo)
pieceHash := &pb.PieceHash{
PieceId: piece.pieceID,
Hash: w.Hash(),
PieceSize: w.Size(),
Timestamp: now,
}
signedPieceHash, err := signing.SignUplinkPieceHash(ctx, privateKey, pieceHash)
require.NoError(t, err)
require.NoError(t, w.Commit(ctx, &pb.PieceHeader{
Hash: signedPieceHash.GetHash(),
CreationTime: signedPieceHash.GetTimestamp(),
Signature: signedPieceHash.GetSignature(),
}))
if file.formatVer == filestore.FormatV0 {
err = v0PieceInfo.Add(ctx, &pieces.Info{
SatelliteID: satellite.satelliteID,
PieceID: piece.pieceID,
PieceSize: signedPieceHash.GetPieceSize(),
PieceCreation: signedPieceHash.GetTimestamp(),
OrderLimit: &pb.OrderLimit{},
UplinkPieceHash: signedPieceHash,
})
require.NoError(t, err)
}
// Verify piece matches data, has correct signature and expiration
verifyPieceData(ctx, t, store, satellite.satelliteID, piece.pieceID, file.formatVer, file.data, piece.expiration, publicKey)
}
trashDurToUse := piece.trashDur
dir.ReplaceTrashnow(func() time.Time {
return time.Now().Add(-trashDurToUse)
})
// Trash the piece
require.NoError(t, store.Trash(ctx, satellite.satelliteID, piece.pieceID))
// Confirm is missing
r, err := store.Reader(ctx, satellite.satelliteID, piece.pieceID)
require.Error(t, err)
require.Nil(t, r)
// Verify no expiry information is returned for this piece
if !piece.expiration.IsZero() {
infos, err := store.GetExpired(ctx, time.Now().Add(720*time.Hour), 1000)
require.NoError(t, err)
var found bool
for _, info := range infos {
if info.SatelliteID == satellite.satelliteID && info.PieceID == piece.pieceID {
found = true
}
}
require.False(t, found)
}
}
}
// Initialize a trust pool
poolConfig := trust.Config{
CachePath: ctx.File("trust-cache.json"),
}
for _, satelliteURL := range satelliteURLs {
poolConfig.Sources = append(poolConfig.Sources, &trust.StaticURLSource{URL: satelliteURL})
}
trust, err := trust.NewPool(zaptest.NewLogger(t), trust.Dialer(rpc.Dialer{}), poolConfig)
require.NoError(t, err)
require.NoError(t, trust.Refresh(ctx))
// Empty trash by running the chore once
trashDur := 4 * 24 * time.Hour
chore := pieces.NewTrashChore(zaptest.NewLogger(t), 24*time.Hour, trashDur, trust, store)
ctx.Go(func() error {
return chore.Run(ctx)
})
chore.TriggerWait(ctx)
require.NoError(t, chore.Close())
// Restore all pieces in the first satellite
require.NoError(t, store.RestoreTrash(ctx, satellites[0].satelliteID))
// Check that each piece for first satellite is back, that they are
// MaxFormatVersionSupported (regardless of which version they began
// with), and that signature matches.
for _, piece := range satellites[0].pieces {
if piece.trashDur < trashDur {
// Expect the piece to be there
lastFile := piece.files[len(piece.files)-1]
verifyPieceData(ctx, t, store, satellites[0].satelliteID, piece.pieceID, filestore.MaxFormatVersionSupported, lastFile.data, piece.expiration, publicKey)
} else {
// Expect the piece to be missing, it should be removed from the trash on EmptyTrash
r, err := store.Reader(ctx, satellites[1].satelliteID, piece.pieceID)
require.Error(t, err)
require.Nil(t, r)
}
}
// Confirm 2nd satellite pieces are still in the trash
for _, piece := range satellites[1].pieces {
r, err := store.Reader(ctx, satellites[1].satelliteID, piece.pieceID)
require.Error(t, err)
require.Nil(t, r)
}
// Restore satellite[1] and make sure they're back (confirming they were not deleted on EmptyTrash)
require.NoError(t, store.RestoreTrash(ctx, satellites[1].satelliteID))
for _, piece := range satellites[1].pieces {
if piece.trashDur < trashDur {
// Expect the piece to be there
lastFile := piece.files[len(piece.files)-1]
verifyPieceData(ctx, t, store, satellites[1].satelliteID, piece.pieceID, filestore.MaxFormatVersionSupported, lastFile.data, piece.expiration, publicKey)
} else {
// Expect the piece to be missing, it should be removed from the trash on EmptyTrash
r, err := store.Reader(ctx, satellites[1].satelliteID, piece.pieceID)
require.Error(t, err)
require.Nil(t, r)
}
}
})
}
func verifyPieceData(ctx context.Context, t testing.TB, store *pieces.Store, satelliteID storj.NodeID, pieceID storj.PieceID, formatVer storage.FormatVersion, expected []byte, expiration time.Time, publicKey storj.PiecePublicKey) {
r, err := store.ReaderWithStorageFormat(ctx, satelliteID, pieceID, formatVer)
require.NoError(t, err)
// Get piece hash, verify signature
var pieceHash *pb.PieceHash
if formatVer > filestore.FormatV0 {
header, err := r.GetPieceHeader()
require.NoError(t, err)
pieceHash = &pb.PieceHash{
PieceId: pieceID,
Hash: header.GetHash(),
PieceSize: r.Size(),
Timestamp: header.GetCreationTime(),
Signature: header.GetSignature(),
}
} else {
info, err := store.GetV0PieceInfo(ctx, satelliteID, pieceID)
require.NoError(t, err)
pieceHash = info.UplinkPieceHash
}
require.NoError(t, signing.VerifyUplinkPieceHashSignature(ctx, publicKey, pieceHash))
// Require piece data to match expected
buf, err := ioutil.ReadAll(r)
require.NoError(t, err)
require.NoError(t, r.Close())
assert.True(t, bytes.Equal(buf, expected))
// Require expiration to match expected
infos, err := store.GetExpired(ctx, time.Now().Add(720*time.Hour), 1000)
require.NoError(t, err)
var found bool
for _, info := range infos {
if info.SatelliteID == satelliteID && info.PieceID == pieceID {
found = true
}
}
if expiration.IsZero() {
require.False(t, found)
} else {
require.True(t, found)
}
}
func TestPieceVersionMigrate(t *testing.T) {
storagenodedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db storagenode.DB) {
const pieceSize = 1024
var (
data = testrand.Bytes(pieceSize)
satelliteID = testrand.NodeID()
pieceID = testrand.PieceID()
now = time.Now().UTC() // we use UTC here to make struct equality testing easier
)
// Initialize pub/priv keys for signing piece hash
publicKeyBytes, err := hex.DecodeString("01eaebcb418cd629d4c01f365f33006c9de3ce70cf04da76c39cdc993f48fe53")
require.NoError(t, err)
privateKeyBytes, err := hex.DecodeString("afefcccadb3d17b1f241b7c83f88c088b54c01b5a25409c13cbeca6bfa22b06901eaebcb418cd629d4c01f365f33006c9de3ce70cf04da76c39cdc993f48fe53")
require.NoError(t, err)
publicKey, err := storj.PiecePublicKeyFromBytes(publicKeyBytes)
require.NoError(t, err)
privateKey, err := storj.PiecePrivateKeyFromBytes(privateKeyBytes)
require.NoError(t, err)
ol := &pb.OrderLimit{
SerialNumber: testrand.SerialNumber(),
SatelliteId: satelliteID,
StorageNodeId: testrand.NodeID(),
PieceId: pieceID,
SatelliteSignature: []byte("sig"),
Limit: 100,
Action: pb.PieceAction_GET,
PieceExpiration: now,
OrderExpiration: now,
OrderCreation: now,
}
olPieceInfo := *ol
v0PieceInfo, ok := db.V0PieceInfo().(pieces.V0PieceInfoDBForTest)
require.True(t, ok, "V0PieceInfoDB can not satisfy V0PieceInfoDBForTest")
blobs, err := filestore.NewAt(zaptest.NewLogger(t), ctx.Dir("store"))
require.NoError(t, err)
defer ctx.Check(blobs.Close)
store := pieces.NewStore(zaptest.NewLogger(t), blobs, v0PieceInfo, nil, nil)
// write as a v0 piece
tStore := &pieces.StoreForTest{store}
writer, err := tStore.WriterForFormatVersion(ctx, satelliteID, pieceID, filestore.FormatV0)
require.NoError(t, err)
_, err = writer.Write(data)
require.NoError(t, err)
assert.Equal(t, int64(len(data)), writer.Size())
err = writer.Commit(ctx, &pb.PieceHeader{
Hash: writer.Hash(),
CreationTime: now,
OrderLimit: olPieceInfo,
})
require.NoError(t, err)
// Create PieceHash from the v0 piece written
ph := &pb.PieceHash{
PieceId: pieceID,
Hash: writer.Hash(),
PieceSize: writer.Size(),
Timestamp: now,
}
// sign v0 pice hash
signedPhPieceInfo, err := signing.SignUplinkPieceHash(ctx, privateKey, ph)
require.NoError(t, err)
// Create v0 pieces.Info and add to v0 store
pieceInfo := pieces.Info{
SatelliteID: satelliteID,
PieceID: pieceID,
PieceSize: writer.Size(),
OrderLimit: &olPieceInfo,
PieceCreation: now,
UplinkPieceHash: signedPhPieceInfo,
}
require.NoError(t, v0PieceInfo.Add(ctx, &pieceInfo))
// verify piece can be opened as v0
tryOpeningAPiece(ctx, t, store, satelliteID, pieceID, len(data), now, filestore.FormatV0)
// run migration
require.NoError(t, store.MigrateV0ToV1(ctx, satelliteID, pieceID))
// open as v1 piece
tryOpeningAPiece(ctx, t, store, satelliteID, pieceID, len(data), now, filestore.FormatV1)
// manually read v1 piece
reader, err := store.ReaderWithStorageFormat(ctx, satelliteID, pieceID, filestore.FormatV1)
require.NoError(t, err)
// generate v1 pieceHash and verify signature is still valid
v1Header, err := reader.GetPieceHeader()
require.NoError(t, err)
v1PieceHash := pb.PieceHash{
PieceId: v1Header.OrderLimit.PieceId,
Hash: v1Header.GetHash(),
PieceSize: reader.Size(),
Timestamp: v1Header.GetCreationTime(),
Signature: v1Header.GetSignature(),
}
require.NoError(t, signing.VerifyUplinkPieceHashSignature(ctx, publicKey, &v1PieceHash))
require.Equal(t, signedPhPieceInfo.GetSignature(), v1PieceHash.GetSignature())
require.Equal(t, *ol, v1Header.OrderLimit)
// Verify that it was deleted from v0PieceInfo
retrivedInfo, err := v0PieceInfo.Get(ctx, satelliteID, pieceID)
require.Error(t, err)
require.Nil(t, retrivedInfo)
})
}
// Test that the piece store can still read V0 pieces that might be left over from a previous
// version, as well as V1 pieces.
func TestMultipleStorageFormatVersions(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
blobs, err := filestore.NewAt(zaptest.NewLogger(t), ctx.Dir("store"))
require.NoError(t, err)
defer ctx.Check(blobs.Close)
store := pieces.NewStore(zaptest.NewLogger(t), blobs, nil, nil, nil)
const pieceSize = 1024
var (
data = testrand.Bytes(pieceSize)
satellite = testrand.NodeID()
v0PieceID = testrand.PieceID()
v1PieceID = testrand.PieceID()
now = time.Now()
)
// write a V0 piece
writeAPiece(ctx, t, store, satellite, v0PieceID, data, now, nil, filestore.FormatV0)
// write a V1 piece
writeAPiece(ctx, t, store, satellite, v1PieceID, data, now, nil, filestore.FormatV1)
// look up the different pieces with Reader and ReaderWithStorageFormat
tryOpeningAPiece(ctx, t, store, satellite, v0PieceID, len(data), now, filestore.FormatV0)
tryOpeningAPiece(ctx, t, store, satellite, v1PieceID, len(data), now, filestore.FormatV1)
// write a V1 piece with the same ID as the V0 piece (to simulate it being rewritten as
// V1 during a migration)
differentData := append(data, 111, 104, 97, 105)
writeAPiece(ctx, t, store, satellite, v0PieceID, differentData, now, nil, filestore.FormatV1)
// if we try to access the piece at that key, we should see only the V1 piece
tryOpeningAPiece(ctx, t, store, satellite, v0PieceID, len(differentData), now, filestore.FormatV1)
// unless we ask specifically for a V0 piece
reader, err := store.ReaderWithStorageFormat(ctx, satellite, v0PieceID, filestore.FormatV0)
require.NoError(t, err)
verifyPieceHandle(t, reader, len(data), now, filestore.FormatV0)
require.NoError(t, reader.Close())
// delete the v0PieceID; both the V0 and the V1 pieces should go away
err = store.Delete(ctx, satellite, v0PieceID)
require.NoError(t, err)
reader, err = store.Reader(ctx, satellite, v0PieceID)
require.Error(t, err)
require.True(t, os.IsNotExist(err))
assert.Nil(t, reader)
}
func TestGetExpired(t *testing.T) {
storagenodedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db storagenode.DB) {
v0PieceInfo, ok := db.V0PieceInfo().(pieces.V0PieceInfoDBForTest)
require.True(t, ok, "V0PieceInfoDB can not satisfy V0PieceInfoDBForTest")
expirationInfo := db.PieceExpirationDB()
store := pieces.NewStore(zaptest.NewLogger(t), db.Pieces(), v0PieceInfo, expirationInfo, db.PieceSpaceUsedDB())
now := time.Now()
testDates := []struct {
years, months, days int
}{
{-20, -1, -2},
{1, 6, 14},
{0, -1, 0},
{0, 0, 1},
}
testPieces := make([]pieces.Info, 4)
for p := range testPieces {
testPieces[p] = pieces.Info{
SatelliteID: testrand.NodeID(),
PieceID: testrand.PieceID(),
OrderLimit: &pb.OrderLimit{},
UplinkPieceHash: &pb.PieceHash{},
PieceExpiration: now.AddDate(testDates[p].years, testDates[p].months, testDates[p].days),
}
}
// put testPieces 0 and 1 in the v0 pieceinfo db
err := v0PieceInfo.Add(ctx, &testPieces[0])
require.NoError(t, err)
err = v0PieceInfo.Add(ctx, &testPieces[1])
require.NoError(t, err)
// put testPieces 2 and 3 in the piece_expirations db
err = expirationInfo.SetExpiration(ctx, testPieces[2].SatelliteID, testPieces[2].PieceID, testPieces[2].PieceExpiration)
require.NoError(t, err)
err = expirationInfo.SetExpiration(ctx, testPieces[3].SatelliteID, testPieces[3].PieceID, testPieces[3].PieceExpiration)
require.NoError(t, err)
// GetExpired with limit 0 gives empty result
expired, err := store.GetExpired(ctx, now, 0)
require.NoError(t, err)
assert.Empty(t, expired)
// GetExpired with limit 1 gives only 1 result, although there are 2 possible
expired, err = store.GetExpired(ctx, now, 1)
require.NoError(t, err)
require.Len(t, expired, 1)
assert.Equal(t, testPieces[2].PieceID, expired[0].PieceID)
assert.Equal(t, testPieces[2].SatelliteID, expired[0].SatelliteID)
assert.False(t, expired[0].InPieceInfo)
// GetExpired with 2 or more gives all expired results correctly; one from
// piece_expirations, and one from pieceinfo
expired, err = store.GetExpired(ctx, now, 1000)
require.NoError(t, err)
require.Len(t, expired, 2)
assert.Equal(t, testPieces[2].PieceID, expired[0].PieceID)
assert.Equal(t, testPieces[2].SatelliteID, expired[0].SatelliteID)
assert.False(t, expired[0].InPieceInfo)
assert.Equal(t, testPieces[0].PieceID, expired[1].PieceID)
assert.Equal(t, testPieces[0].SatelliteID, expired[1].SatelliteID)
assert.True(t, expired[1].InPieceInfo)
})
}
func TestOverwriteV0WithV1(t *testing.T) {
storagenodedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db storagenode.DB) {
v0PieceInfo, ok := db.V0PieceInfo().(pieces.V0PieceInfoDBForTest)
require.True(t, ok, "V0PieceInfoDB can not satisfy V0PieceInfoDBForTest")
expirationInfo := db.PieceExpirationDB()
store := pieces.NewStore(zaptest.NewLogger(t), db.Pieces(), v0PieceInfo, expirationInfo, db.PieceSpaceUsedDB())
satelliteID := testrand.NodeID()
pieceID := testrand.PieceID()
v0Data := testrand.Bytes(4 * memory.MiB)
v1Data := testrand.Bytes(3 * memory.MiB)
// write the piece as V0. We can't provide the expireTime via writeAPiece, because
// BlobWriter.Commit only knows how to store expiration times in piece_expirations.
v0CreateTime := time.Now()
v0ExpireTime := v0CreateTime.AddDate(5, 0, 0)
writeAPiece(ctx, t, store, satelliteID, pieceID, v0Data, v0CreateTime, nil, filestore.FormatV0)
// now put the piece in the pieceinfo db directly, because store won't do that for us.
// this is where the expireTime takes effect.
err := v0PieceInfo.Add(ctx, &pieces.Info{
SatelliteID: satelliteID,
PieceID: pieceID,
PieceSize: int64(len(v0Data)),
PieceCreation: v0CreateTime,
PieceExpiration: v0ExpireTime,
OrderLimit: &pb.OrderLimit{},
UplinkPieceHash: &pb.PieceHash{},
})
require.NoError(t, err)
// ensure we can see it via store.Reader
{
reader, err := store.Reader(ctx, satelliteID, pieceID)
require.NoError(t, err)
assert.Equal(t, int64(len(v0Data)), reader.Size())
assert.Equal(t, filestore.FormatV0, reader.StorageFormatVersion())
gotData, err := ioutil.ReadAll(reader)
require.NoError(t, err)
assert.Equal(t, v0Data, gotData)
require.NoError(t, reader.Close())
}
// ensure we can see it via WalkSatellitePieces
calledTimes := 0
err = store.WalkSatellitePieces(ctx, satelliteID, func(access pieces.StoredPieceAccess) error {
calledTimes++
require.Equal(t, 1, calledTimes)
gotCreateTime, err := access.CreationTime(ctx)
require.NoError(t, err)
assert.True(t, v0CreateTime.Equal(gotCreateTime))
storagenode: Include trash space when calculating space used This commit adds functionality to include the space used in the trash directory when calculating available space on the node. It also includes this trash value in the space used cache, with methods to keep the cache up-to-date as files are trashed, restored, and emptied. As part of the commit, the RestoreTrash and EmptyTrash methods have slightly changed signatures. RestoreTrash now also returns the keys that were restored, while EmptyTrash also returns the total disk space recovered. Each of these changes makes it possible to keep the cache up-to-date and know how much space is being used/recovered. Also changed is the signature of PieceStoreAccess.ContentSize method. Previously this method returns only the content size of the blob, removing the size of any header data. This method has been renamed `Size` and returns both the full disk size and content size of the blob. This allows us to only stat the file once, and in some instances (i.e. cache) knowing the full file size is useful. Note: This commit simply adds the trash size data to the piece size data we were already collecting. The piece size data is not accurate for all use-cases (e.g. because it does not contain piece header data); however, this commit does not fix that problem. Now that the ContentSize (Size) method returns the full size of the file, it should be easier to fix this problem in a future commit. Change-Id: I4a6cae09e262c8452a618116d1dc66b687f59f85
2019-12-21 13:11:24 +00:00
_, gotSize, err := access.Size(ctx)
require.NoError(t, err)
assert.Equal(t, int64(len(v0Data)), gotSize)
return nil
})
require.NoError(t, err)
// now "overwrite" the piece (write a new blob with the same id, but with V1 storage)
v1CreateTime := time.Now()
v1ExpireTime := v1CreateTime.AddDate(5, 0, 0)
writeAPiece(ctx, t, store, satelliteID, pieceID, v1Data, v1CreateTime, &v1ExpireTime, filestore.FormatV1)
// ensure we can see it (the new piece) via store.Reader
{
reader, err := store.Reader(ctx, satelliteID, pieceID)
require.NoError(t, err)
assert.Equal(t, int64(len(v1Data)), reader.Size())
assert.Equal(t, filestore.FormatV1, reader.StorageFormatVersion())
gotData, err := ioutil.ReadAll(reader)
require.NoError(t, err)
assert.Equal(t, v1Data, gotData)
require.NoError(t, reader.Close())
}
// now _both_ pieces should show up under WalkSatellitePieces. this may
// be counter-intuitive, but the V0 piece still exists for now (so we can avoid
// hitting the pieceinfo db with every new piece write). I believe this is OK, because
// (a) I don't think that writing different pieces with the same piece ID is a normal
// use case, unless we make a V0->V1 migrator tool, which should know about these
// semantics; (b) the V0 piece should not ever become visible again to the user; it
// should not be possible under normal conditions to delete one without deleting the
// other.
calledTimes = 0
err = store.WalkSatellitePieces(ctx, satelliteID, func(access pieces.StoredPieceAccess) error {
calledTimes++
switch calledTimes {
case 1:
// expect the V1 piece
assert.Equal(t, pieceID, access.PieceID())
assert.Equal(t, filestore.FormatV1, access.StorageFormatVersion())
gotCreateTime, err := access.CreationTime(ctx)
require.NoError(t, err)
assert.True(t, v1CreateTime.Equal(gotCreateTime))
storagenode: Include trash space when calculating space used This commit adds functionality to include the space used in the trash directory when calculating available space on the node. It also includes this trash value in the space used cache, with methods to keep the cache up-to-date as files are trashed, restored, and emptied. As part of the commit, the RestoreTrash and EmptyTrash methods have slightly changed signatures. RestoreTrash now also returns the keys that were restored, while EmptyTrash also returns the total disk space recovered. Each of these changes makes it possible to keep the cache up-to-date and know how much space is being used/recovered. Also changed is the signature of PieceStoreAccess.ContentSize method. Previously this method returns only the content size of the blob, removing the size of any header data. This method has been renamed `Size` and returns both the full disk size and content size of the blob. This allows us to only stat the file once, and in some instances (i.e. cache) knowing the full file size is useful. Note: This commit simply adds the trash size data to the piece size data we were already collecting. The piece size data is not accurate for all use-cases (e.g. because it does not contain piece header data); however, this commit does not fix that problem. Now that the ContentSize (Size) method returns the full size of the file, it should be easier to fix this problem in a future commit. Change-Id: I4a6cae09e262c8452a618116d1dc66b687f59f85
2019-12-21 13:11:24 +00:00
_, gotSize, err := access.Size(ctx)
require.NoError(t, err)
assert.Equal(t, int64(len(v1Data)), gotSize)
case 2:
// expect the V0 piece
assert.Equal(t, pieceID, access.PieceID())
assert.Equal(t, filestore.FormatV0, access.StorageFormatVersion())
gotCreateTime, err := access.CreationTime(ctx)
require.NoError(t, err)
assert.True(t, v0CreateTime.Equal(gotCreateTime))
storagenode: Include trash space when calculating space used This commit adds functionality to include the space used in the trash directory when calculating available space on the node. It also includes this trash value in the space used cache, with methods to keep the cache up-to-date as files are trashed, restored, and emptied. As part of the commit, the RestoreTrash and EmptyTrash methods have slightly changed signatures. RestoreTrash now also returns the keys that were restored, while EmptyTrash also returns the total disk space recovered. Each of these changes makes it possible to keep the cache up-to-date and know how much space is being used/recovered. Also changed is the signature of PieceStoreAccess.ContentSize method. Previously this method returns only the content size of the blob, removing the size of any header data. This method has been renamed `Size` and returns both the full disk size and content size of the blob. This allows us to only stat the file once, and in some instances (i.e. cache) knowing the full file size is useful. Note: This commit simply adds the trash size data to the piece size data we were already collecting. The piece size data is not accurate for all use-cases (e.g. because it does not contain piece header data); however, this commit does not fix that problem. Now that the ContentSize (Size) method returns the full size of the file, it should be easier to fix this problem in a future commit. Change-Id: I4a6cae09e262c8452a618116d1dc66b687f59f85
2019-12-21 13:11:24 +00:00
_, gotSize, err := access.Size(ctx)
require.NoError(t, err)
assert.Equal(t, int64(len(v0Data)), gotSize)
default:
t.Fatalf("calledTimes should be 1 or 2, but it is %d", calledTimes)
}
return nil
})
require.NoError(t, err)
// delete the pieceID; this should get both V0 and V1
err = store.Delete(ctx, satelliteID, pieceID)
require.NoError(t, err)
err = store.WalkSatellitePieces(ctx, satelliteID, func(access pieces.StoredPieceAccess) error {
t.Fatalf("this should not have been called. pieceID=%x, format=%d", access.PieceID(), access.StorageFormatVersion())
return nil
})
require.NoError(t, err)
})
}