storj/storagenode/pieces/store_test.go
Egon Elbre ee71fbb41d storagenode/piecestore: start restore trash in the background
Starting restore trash in the background allows the satellite to
continue to the next storagenode without needing to wait until
completion.

Of course, this means the satellite doesn't get feedback on whether the
restore succeeds or not. As a result, the restore-trash needs to
be executed several times.

Change-Id: I62d43f6f2e4a07854f6d083a65badf897338083b
2022-12-16 18:15:52 +02:00

881 lines
30 KiB
Go

// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package pieces_test
import (
"bytes"
"context"
"encoding/hex"
"errors"
"io"
"os"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
"storj.io/common/identity/testidentity"
"storj.io/common/memory"
"storj.io/common/pb"
"storj.io/common/pkcrypto"
"storj.io/common/rpc"
"storj.io/common/signing"
"storj.io/common/storj"
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/storj/storage"
"storj.io/storj/storage/filestore"
"storj.io/storj/storagenode"
"storj.io/storj/storagenode/pieces"
"storj.io/storj/storagenode/storagenodedb/storagenodedbtest"
"storj.io/storj/storagenode/trust"
)
// TestPieces exercises the basic piece lifecycle against a filestore-backed
// store: write and commit, ranged reads, reading until io.EOF, delete, and a
// cancelled write.
func TestPieces(t *testing.T) {
	ctx := testcontext.New(t)
	defer ctx.Cleanup()

	dir, err := filestore.NewDir(zaptest.NewLogger(t), ctx.Dir("pieces"))
	require.NoError(t, err)

	blobs := filestore.New(zaptest.NewLogger(t), dir, filestore.DefaultConfig)
	defer ctx.Check(blobs.Close)

	store := pieces.NewStore(zaptest.NewLogger(t), blobs, nil, nil, nil, pieces.DefaultConfig)

	satelliteID := testidentity.MustPregeneratedSignedIdentity(0, storj.LatestIDVersion()).ID
	pieceID := storj.NewPieceID()
	source := testrand.Bytes(8000)
	total := len(source)

	{ // write the piece and commit it
		w, err := store.Writer(ctx, satelliteID, pieceID, pb.PieceHashAlgorithm_SHA256)
		require.NoError(t, err)

		copied, err := io.Copy(w, bytes.NewReader(source))
		require.NoError(t, err)
		assert.Equal(t, total, int(copied))
		assert.Equal(t, total, int(w.Size()))

		// the writer's hash must equal one computed independently over source
		expected := pkcrypto.NewHash()
		_, _ = expected.Write(source)
		assert.Equal(t, expected.Sum(nil), w.Hash())

		require.NoError(t, w.Commit(ctx, &pb.PieceHeader{}))

		// calling Cancel after a successful Commit must not report an error
		require.NoError(t, w.Cancel(ctx))
	}

	{ // ranged reads return the matching slice of the source
		// readRange opens a fresh reader, seeks to off and reads exactly size bytes.
		readRange := func(off, size int64) []byte {
			r, err := store.Reader(ctx, satelliteID, pieceID)
			require.NoError(t, err)

			at, err := r.Seek(off, io.SeekStart)
			require.NoError(t, err)
			require.Equal(t, off, at)

			out := make([]byte, size)
			got, err := io.ReadFull(r, out)
			require.NoError(t, err)
			require.Equal(t, int(size), got)

			require.NoError(t, r.Close())
			return out
		}

		spans := []struct{ off, size int64 }{
			{off: 10, size: 1},
			{off: 10, size: 1000},
			{off: 0, size: int64(total)},
		}
		for _, span := range spans {
			require.Equal(t, source[span.off:span.off+span.size], readRange(span.off, span.size))
		}
	}

	{ // sequential reads eventually terminate with io.EOF
		r, err := store.Reader(ctx, satelliteID, pieceID)
		require.NoError(t, err)

		chunk := make([]byte, 111)
		for {
			_, readErr := r.Read(chunk)
			if errors.Is(readErr, io.EOF) {
				break
			}
			// any error other than io.EOF fails the test
			require.NoError(t, readErr)
		}

		require.NoError(t, r.Close())
	}

	{ // delete the piece
		assert.NoError(t, store.Delete(ctx, satelliteID, pieceID))

		// opening the deleted piece must now fail
		_, err := store.Reader(ctx, satelliteID, pieceID)
		assert.Error(t, err)
	}

	{ // cancel a write before committing it
		cancelledPieceID := storj.NewPieceID()

		w, err := store.Writer(ctx, satelliteID, cancelledPieceID, pb.PieceHashAlgorithm_SHA256)
		require.NoError(t, err)

		copied, err := io.Copy(w, bytes.NewReader(source))
		require.NoError(t, err)
		assert.Equal(t, total, int(copied))
		assert.Equal(t, total, int(w.Size()))

		require.NoError(t, w.Cancel(ctx))

		// committing a cancelled writer is expected to return an error
		require.Error(t, w.Commit(ctx, &pb.PieceHeader{}))

		// the cancelled piece must not be readable
		_, err = store.Reader(ctx, satelliteID, cancelledPieceID)
		assert.Error(t, err)
	}
}
// writeAPiece writes data as a piece in the requested storage format and
// commits it with a header carrying the writer's hash and atTime as the
// creation time. When expireTime is non-nil it is recorded as the piece
// expiration of the header's order limit.
func writeAPiece(ctx context.Context, t testing.TB, store *pieces.Store, satelliteID storj.NodeID, pieceID storj.PieceID, data []byte, atTime time.Time, expireTime *time.Time, formatVersion storage.FormatVersion) {
	testStore := &pieces.StoreForTest{store}

	w, err := testStore.WriterForFormatVersion(ctx, satelliteID, pieceID, formatVersion, pb.PieceHashAlgorithm_SHA256)
	require.NoError(t, err)

	_, err = w.Write(data)
	require.NoError(t, err)
	assert.Equal(t, int64(len(data)), w.Size())

	header := &pb.PieceHeader{
		Hash:         w.Hash(),
		CreationTime: atTime,
	}
	if expireTime != nil {
		header.OrderLimit.PieceExpiration = *expireTime
	}

	require.NoError(t, w.Commit(ctx, header))
}
// verifyPieceHandle asserts that reader reports the expected storage format
// and data size. For any format other than V0 it also reads the piece header
// and checks its format version and creation time.
func verifyPieceHandle(t testing.TB, reader *pieces.Reader, expectDataLen int, expectCreateTime time.Time, expectFormat storage.FormatVersion) {
	assert.Equal(t, expectFormat, reader.StorageFormatVersion())
	assert.Equal(t, int64(expectDataLen), reader.Size())

	// the header is not inspected for V0 pieces
	if expectFormat == filestore.FormatV0 {
		return
	}

	header, err := reader.GetPieceHeader()
	require.NoError(t, err)
	assert.Equal(t, expectFormat, storage.FormatVersion(header.FormatVersion))
	assert.True(t, expectCreateTime.Equal(header.CreationTime))
}
// tryOpeningAPiece opens the piece twice, first through Reader and then
// through ReaderWithStorageFormat with expectFormat, and runs
// verifyPieceHandle on each handle before closing it.
func tryOpeningAPiece(ctx context.Context, t testing.TB, store *pieces.Store, satelliteID storj.NodeID, pieceID storj.PieceID, expectDataLen int, expectTime time.Time, expectFormat storage.FormatVersion) {
	openers := []func() (*pieces.Reader, error){
		func() (*pieces.Reader, error) {
			return store.Reader(ctx, satelliteID, pieceID)
		},
		func() (*pieces.Reader, error) {
			return store.ReaderWithStorageFormat(ctx, satelliteID, pieceID, expectFormat)
		},
	}

	for _, open := range openers {
		handle, err := open()
		require.NoError(t, err)
		verifyPieceHandle(t, handle, expectDataLen, expectTime, expectFormat)
		require.NoError(t, handle.Close())
	}
}
// TestTrashAndRestore writes pieces for two satellites, trashes every piece
// with a faked trash time, runs the trash chore once with a 4-day window, and
// then restores the trash one satellite at a time. Pieces whose faked trash
// age is below the window are expected to come back; the others are expected
// to stay missing after restore.
func TestTrashAndRestore(t *testing.T) {
	type testfile struct {
		data      []byte
		formatVer storage.FormatVersion
	}
	type testpiece struct {
		pieceID    storj.PieceID
		files      []testfile
		expiration time.Time
		trashDur   time.Duration
	}
	type testsatellite struct {
		satelliteID storj.NodeID
		pieces      []testpiece
	}

	size := memory.KB

	// Initialize pub/priv keys for signing piece hash
	publicKeyBytes, err := hex.DecodeString("01eaebcb418cd629d4c01f365f33006c9de3ce70cf04da76c39cdc993f48fe53")
	require.NoError(t, err)
	privateKeyBytes, err := hex.DecodeString("afefcccadb3d17b1f241b7c83f88c088b54c01b5a25409c13cbeca6bfa22b06901eaebcb418cd629d4c01f365f33006c9de3ce70cf04da76c39cdc993f48fe53")
	require.NoError(t, err)
	publicKey, err := storj.PiecePublicKeyFromBytes(publicKeyBytes)
	require.NoError(t, err)
	privateKey, err := storj.PiecePrivateKeyFromBytes(privateKeyBytes)
	require.NoError(t, err)

	// The chore below runs with a 4-day window: 7 days is above it, 3 days
	// is below it.
	trashDurToBeEmptied := 7 * 24 * time.Hour
	trashDurToBeKept := 3 * 24 * time.Hour

	satellites := []testsatellite{
		{
			satelliteID: testrand.NodeID(),
			pieces: []testpiece{
				{
					expiration: time.Time{}, // no expiration
					pieceID:    testrand.PieceID(),
					trashDur:   trashDurToBeEmptied,
					files: []testfile{
						{
							data:      testrand.Bytes(size),
							formatVer: filestore.FormatV0,
						},
					},
				},
				{
					pieceID:    testrand.PieceID(),
					expiration: time.Now().Add(24 * time.Hour),
					trashDur:   trashDurToBeEmptied,
					files: []testfile{
						{
							data:      testrand.Bytes(size),
							formatVer: filestore.FormatV1,
						},
					},
				},
				{
					pieceID:    testrand.PieceID(),
					expiration: time.Now().Add(24 * time.Hour),
					trashDur:   trashDurToBeKept,
					files: []testfile{
						{
							data:      testrand.Bytes(size),
							formatVer: filestore.FormatV1,
						},
					},
				},
				{
					pieceID:    testrand.PieceID(),
					expiration: time.Now().Add(24 * time.Hour),
					trashDur:   trashDurToBeKept,
					files: []testfile{
						{
							data:      testrand.Bytes(size),
							formatVer: filestore.FormatV1,
						},
					},
				},
				{
					pieceID:    testrand.PieceID(),
					expiration: time.Now().Add(24 * time.Hour),
					trashDur:   trashDurToBeKept,
					files: []testfile{
						{
							data:      testrand.Bytes(size),
							formatVer: filestore.FormatV0,
						},
						{
							data:      testrand.Bytes(size),
							formatVer: filestore.FormatV1,
						},
					},
				},
			},
		},
		{
			satelliteID: testrand.NodeID(),
			pieces: []testpiece{
				{
					pieceID:    testrand.PieceID(),
					expiration: time.Now().Add(24 * time.Hour),
					trashDur:   trashDurToBeKept,
					files: []testfile{
						{
							data:      testrand.Bytes(size),
							formatVer: filestore.FormatV1,
						},
					},
				},
				{
					pieceID:    testrand.PieceID(),
					expiration: time.Now().Add(24 * time.Hour),
					trashDur:   trashDurToBeEmptied,
					files: []testfile{
						{
							data:      testrand.Bytes(size),
							formatVer: filestore.FormatV1,
						},
					},
				},
			},
		},
	}

	storagenodedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db storagenode.DB) {
		dir, err := filestore.NewDir(zaptest.NewLogger(t), ctx.Dir("store"))
		require.NoError(t, err)

		blobs := filestore.New(zaptest.NewLogger(t), dir, filestore.DefaultConfig)
		defer ctx.Check(blobs.Close)

		v0PieceInfo, ok := db.V0PieceInfo().(pieces.V0PieceInfoDBForTest)
		require.True(t, ok, "V0PieceInfoDB can not satisfy V0PieceInfoDBForTest")

		store := pieces.NewStore(zaptest.NewLogger(t), blobs, v0PieceInfo, db.PieceExpirationDB(), nil, pieces.DefaultConfig)
		tStore := &pieces.StoreForTest{store}

		var satelliteURLs []trust.SatelliteURL
		for i, satellite := range satellites {
			// host:port pair must be unique or the trust pool will aggregate
			// them into a single entry with the first one "winning".
			satelliteURLs = append(satelliteURLs, trust.SatelliteURL{
				ID:   satellite.satelliteID,
				Host: "localhost",
				Port: i,
			})

			now := time.Now()
			for _, piece := range satellite.pieces {
				// If test has expiration, add to expiration db
				if !piece.expiration.IsZero() {
					require.NoError(t, store.SetExpiration(ctx, satellite.satelliteID, piece.pieceID, piece.expiration))
				}

				for _, file := range piece.files {
					w, err := tStore.WriterForFormatVersion(ctx, satellite.satelliteID, piece.pieceID, file.formatVer, pb.PieceHashAlgorithm_SHA256)
					require.NoError(t, err)

					_, err = w.Write(file.data)
					require.NoError(t, err)

					// Create, sign, and commit piece hash (to piece or v0PieceInfo)
					pieceHash := &pb.PieceHash{
						PieceId:   piece.pieceID,
						Hash:      w.Hash(),
						PieceSize: w.Size(),
						Timestamp: now,
					}
					signedPieceHash, err := signing.SignUplinkPieceHash(ctx, privateKey, pieceHash)
					require.NoError(t, err)
					require.NoError(t, w.Commit(ctx, &pb.PieceHeader{
						Hash:         signedPieceHash.GetHash(),
						CreationTime: signedPieceHash.GetTimestamp(),
						Signature:    signedPieceHash.GetSignature(),
					}))

					// V0 pieces additionally get a row in the v0 piece info db
					if file.formatVer == filestore.FormatV0 {
						err = v0PieceInfo.Add(ctx, &pieces.Info{
							SatelliteID:     satellite.satelliteID,
							PieceID:         piece.pieceID,
							PieceSize:       signedPieceHash.GetPieceSize(),
							PieceCreation:   signedPieceHash.GetTimestamp(),
							OrderLimit:      &pb.OrderLimit{},
							UplinkPieceHash: signedPieceHash,
						})
						require.NoError(t, err)
					}

					// Verify piece matches data, has correct signature and expiration
					verifyPieceData(ctx, t, store, satellite.satelliteID, piece.pieceID, file.formatVer, file.data, piece.expiration, publicKey)
				}

				// Make the blob dir report a trash time trashDur in the past
				// for the Trash call below.
				trashDurToUse := piece.trashDur
				dir.ReplaceTrashnow(func() time.Time {
					return time.Now().Add(-trashDurToUse)
				})

				// Trash the piece
				require.NoError(t, store.Trash(ctx, satellite.satelliteID, piece.pieceID))

				// Confirm is missing
				r, err := store.Reader(ctx, satellite.satelliteID, piece.pieceID)
				require.Error(t, err)
				require.Nil(t, r)

				// Verify no expiry information is returned for this piece
				if !piece.expiration.IsZero() {
					infos, err := store.GetExpired(ctx, time.Now().Add(720*time.Hour), 1000)
					require.NoError(t, err)
					var found bool
					for _, info := range infos {
						if info.SatelliteID == satellite.satelliteID && info.PieceID == piece.pieceID {
							found = true
						}
					}
					require.False(t, found)
				}
			}
		}

		// Initialize a trust pool
		poolConfig := trust.Config{
			CachePath: ctx.File("trust-cache.json"),
		}
		for _, satelliteURL := range satelliteURLs {
			poolConfig.Sources = append(poolConfig.Sources, &trust.StaticURLSource{URL: satelliteURL})
		}
		// Named trustPool so the local does not shadow the trust package.
		trustPool, err := trust.NewPool(zaptest.NewLogger(t), trust.Dialer(rpc.Dialer{}), poolConfig, db.Satellites())
		require.NoError(t, err)
		require.NoError(t, trustPool.Refresh(ctx))

		// Empty trash by running the chore once
		trashDur := 4 * 24 * time.Hour
		chorectx, chorecancel := context.WithCancel(ctx)
		chore := pieces.NewTrashChore(zaptest.NewLogger(t), 24*time.Hour, trashDur, trustPool, store)
		ctx.Go(func() error {
			return chore.Run(chorectx)
		})
		chore.Cycle.TriggerWait()
		require.NoError(t, chore.Close())
		chorecancel()

		// verifyAfterRestore checks every piece of sat once its trash has been
		// restored. Pieces trashed for less than the chore window must be
		// readable as MaxFormatVersionSupported (regardless of which version
		// they began with) with a matching signature. All other pieces must
		// be missing under that same satellite, because they should have been
		// removed from the trash on EmptyTrash.
		verifyAfterRestore := func(sat testsatellite) {
			for _, piece := range sat.pieces {
				if piece.trashDur < trashDur {
					// Expect the piece to be there
					lastFile := piece.files[len(piece.files)-1]
					verifyPieceData(ctx, t, store, sat.satelliteID, piece.pieceID, filestore.MaxFormatVersionSupported, lastFile.data, piece.expiration, publicKey)
					continue
				}

				// Expect the piece to be missing for its own satellite
				r, err := store.Reader(ctx, sat.satelliteID, piece.pieceID)
				require.Error(t, err)
				require.Nil(t, r)
			}
		}

		// Restore all pieces in the first satellite
		require.NoError(t, store.RestoreTrash(ctx, satellites[0].satelliteID))
		verifyAfterRestore(satellites[0])

		// Confirm 2nd satellite pieces are still in the trash
		for _, piece := range satellites[1].pieces {
			r, err := store.Reader(ctx, satellites[1].satelliteID, piece.pieceID)
			require.Error(t, err)
			require.Nil(t, r)
		}

		// Restore satellite[1] and make sure they're back (confirming they were not deleted on EmptyTrash)
		require.NoError(t, store.RestoreTrash(ctx, satellites[1].satelliteID))
		verifyAfterRestore(satellites[1])
	})
}
// verifyPieceData opens the piece in the given storage format and checks
// three things: the uplink piece hash signature verifies against publicKey,
// the stored bytes equal expected, and the piece shows up in GetExpired
// exactly when expiration is non-zero.
func verifyPieceData(ctx context.Context, t testing.TB, store *pieces.Store, satelliteID storj.NodeID, pieceID storj.PieceID, formatVer storage.FormatVersion, expected []byte, expiration time.Time, publicKey storj.PiecePublicKey) {
	reader, err := store.ReaderWithStorageFormat(ctx, satelliteID, pieceID, formatVer)
	require.NoError(t, err)

	// V0 pieces take their signed hash from the v0 piece info; newer formats
	// rebuild it from the piece header.
	var signedHash *pb.PieceHash
	if formatVer <= filestore.FormatV0 {
		v0Info, err := store.GetV0PieceInfo(ctx, satelliteID, pieceID)
		require.NoError(t, err)
		signedHash = v0Info.UplinkPieceHash
	} else {
		hdr, err := reader.GetPieceHeader()
		require.NoError(t, err)
		signedHash = &pb.PieceHash{
			PieceId:   pieceID,
			Hash:      hdr.GetHash(),
			PieceSize: reader.Size(),
			Timestamp: hdr.GetCreationTime(),
			Signature: hdr.GetSignature(),
		}
	}
	require.NoError(t, signing.VerifyUplinkPieceHashSignature(ctx, publicKey, signedHash))

	// the stored content must match what was written
	content, err := io.ReadAll(reader)
	require.NoError(t, err)
	require.NoError(t, reader.Close())
	assert.True(t, bytes.Equal(content, expected))

	// look for this piece among everything expiring within the next 720 hours
	expiring, err := store.GetExpired(ctx, time.Now().Add(720*time.Hour), 1000)
	require.NoError(t, err)

	listed := false
	for _, entry := range expiring {
		if entry.SatelliteID == satelliteID && entry.PieceID == pieceID {
			listed = true
			break
		}
	}

	if !expiration.IsZero() {
		require.True(t, listed)
	} else {
		require.False(t, listed)
	}
}
// TestPieceVersionMigrate writes a piece in the V0 format, registers it in the
// v0 piece info db, runs MigrateV0ToV1, and then checks that the piece opens
// as V1, that the uplink signature and order limit survived the migration, and
// that the v0 piece info entry is gone.
func TestPieceVersionMigrate(t *testing.T) {
	storagenodedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db storagenode.DB) {
		const pieceSize = 1024

		var (
			data        = testrand.Bytes(pieceSize)
			satelliteID = testrand.NodeID()
			pieceID     = testrand.PieceID()
			now         = time.Now().UTC() // we use UTC here to make struct equality testing easier
		)

		// Initialize pub/priv keys for signing piece hash
		publicKeyBytes, err := hex.DecodeString("01eaebcb418cd629d4c01f365f33006c9de3ce70cf04da76c39cdc993f48fe53")
		require.NoError(t, err)
		privateKeyBytes, err := hex.DecodeString("afefcccadb3d17b1f241b7c83f88c088b54c01b5a25409c13cbeca6bfa22b06901eaebcb418cd629d4c01f365f33006c9de3ce70cf04da76c39cdc993f48fe53")
		require.NoError(t, err)
		publicKey, err := storj.PiecePublicKeyFromBytes(publicKeyBytes)
		require.NoError(t, err)
		privateKey, err := storj.PiecePrivateKeyFromBytes(privateKeyBytes)
		require.NoError(t, err)

		ol := &pb.OrderLimit{
			SerialNumber:       testrand.SerialNumber(),
			SatelliteId:        satelliteID,
			StorageNodeId:      testrand.NodeID(),
			PieceId:            pieceID,
			SatelliteSignature: []byte("sig"),
			Limit:              100,
			Action:             pb.PieceAction_GET,
			PieceExpiration:    now,
			OrderExpiration:    now,
			OrderCreation:      now,
		}
		// Work on a copy so *ol stays available as the reference value for
		// the comparison at the end of the test.
		olPieceInfo := *ol

		v0PieceInfo, ok := db.V0PieceInfo().(pieces.V0PieceInfoDBForTest)
		require.True(t, ok, "V0PieceInfoDB can not satisfy V0PieceInfoDBForTest")

		blobs, err := filestore.NewAt(zaptest.NewLogger(t), ctx.Dir("store"), filestore.DefaultConfig)
		require.NoError(t, err)
		defer ctx.Check(blobs.Close)

		store := pieces.NewStore(zaptest.NewLogger(t), blobs, v0PieceInfo, nil, nil, pieces.DefaultConfig)

		// write as a v0 piece
		tStore := &pieces.StoreForTest{store}
		writer, err := tStore.WriterForFormatVersion(ctx, satelliteID, pieceID, filestore.FormatV0, pb.PieceHashAlgorithm_SHA256)
		require.NoError(t, err)
		_, err = writer.Write(data)
		require.NoError(t, err)
		assert.Equal(t, int64(len(data)), writer.Size())
		err = writer.Commit(ctx, &pb.PieceHeader{
			Hash:         writer.Hash(),
			CreationTime: now,
			OrderLimit:   olPieceInfo,
		})
		require.NoError(t, err)

		// Create PieceHash from the v0 piece written
		ph := &pb.PieceHash{
			PieceId:   pieceID,
			Hash:      writer.Hash(),
			PieceSize: writer.Size(),
			Timestamp: now,
		}

		// sign v0 piece hash
		signedPhPieceInfo, err := signing.SignUplinkPieceHash(ctx, privateKey, ph)
		require.NoError(t, err)

		// Create v0 pieces.Info and add to v0 store
		pieceInfo := pieces.Info{
			SatelliteID:     satelliteID,
			PieceID:         pieceID,
			PieceSize:       writer.Size(),
			OrderLimit:      &olPieceInfo,
			PieceCreation:   now,
			UplinkPieceHash: signedPhPieceInfo,
		}
		require.NoError(t, v0PieceInfo.Add(ctx, &pieceInfo))

		// verify piece can be opened as v0
		tryOpeningAPiece(ctx, t, store, satelliteID, pieceID, len(data), now, filestore.FormatV0)

		// run migration
		require.NoError(t, store.MigrateV0ToV1(ctx, satelliteID, pieceID))

		// open as v1 piece
		tryOpeningAPiece(ctx, t, store, satelliteID, pieceID, len(data), now, filestore.FormatV1)

		// manually read v1 piece
		reader, err := store.ReaderWithStorageFormat(ctx, satelliteID, pieceID, filestore.FormatV1)
		require.NoError(t, err)
		// Close the reader when the test ends. Deferred calls run in reverse
		// order, so this runs before the deferred blobs.Close above.
		defer ctx.Check(reader.Close)

		// generate v1 pieceHash and verify signature is still valid
		v1Header, err := reader.GetPieceHeader()
		require.NoError(t, err)
		v1PieceHash := pb.PieceHash{
			PieceId:   v1Header.OrderLimit.PieceId,
			Hash:      v1Header.GetHash(),
			PieceSize: reader.Size(),
			Timestamp: v1Header.GetCreationTime(),
			Signature: v1Header.GetSignature(),
		}
		require.NoError(t, signing.VerifyUplinkPieceHashSignature(ctx, publicKey, &v1PieceHash))
		require.Equal(t, signedPhPieceInfo.GetSignature(), v1PieceHash.GetSignature())
		require.Equal(t, *ol, v1Header.OrderLimit)

		// Verify that it was deleted from v0PieceInfo
		retrievedInfo, err := v0PieceInfo.Get(ctx, satelliteID, pieceID)
		require.Error(t, err)
		require.Nil(t, retrievedInfo)
	})
}
// TestMultipleStorageFormatVersions checks that V0 and V1 pieces can be read
// from the same store, that a V1 piece written under the ID of an existing V0
// piece takes precedence on a plain read while the V0 piece remains reachable
// when its format is requested explicitly, and that deleting the ID makes the
// piece unreadable.
func TestMultipleStorageFormatVersions(t *testing.T) {
	ctx := testcontext.New(t)
	defer ctx.Cleanup()

	blobs, err := filestore.NewAt(zaptest.NewLogger(t), ctx.Dir("store"), filestore.DefaultConfig)
	require.NoError(t, err)
	defer ctx.Check(blobs.Close)

	store := pieces.NewStore(zaptest.NewLogger(t), blobs, nil, nil, nil, pieces.DefaultConfig)

	const pieceSize = 1024

	var (
		data      = testrand.Bytes(pieceSize)
		satellite = testrand.NodeID()
		v0PieceID = testrand.PieceID()
		v1PieceID = testrand.PieceID()
		now       = time.Now()
	)
	origLen := len(data)

	// one piece in each storage format
	writeAPiece(ctx, t, store, satellite, v0PieceID, data, now, nil, filestore.FormatV0)
	writeAPiece(ctx, t, store, satellite, v1PieceID, data, now, nil, filestore.FormatV1)

	// both must be reachable through Reader and ReaderWithStorageFormat
	tryOpeningAPiece(ctx, t, store, satellite, v0PieceID, origLen, now, filestore.FormatV0)
	tryOpeningAPiece(ctx, t, store, satellite, v1PieceID, origLen, now, filestore.FormatV1)

	// Write a V1 piece under the V0 piece's ID (to simulate it being rewritten
	// as V1 during a migration). Four extra bytes make the two payloads
	// distinguishable by length.
	differentData := make([]byte, 0, origLen+4)
	differentData = append(differentData, data...)
	differentData = append(differentData, 'o', 'h', 'a', 'i')
	writeAPiece(ctx, t, store, satellite, v0PieceID, differentData, now, nil, filestore.FormatV1)

	// a plain lookup of that ID now yields only the V1 piece
	tryOpeningAPiece(ctx, t, store, satellite, v0PieceID, len(differentData), now, filestore.FormatV1)

	// while the V0 piece is still there when asked for by format
	v0Reader, err := store.ReaderWithStorageFormat(ctx, satellite, v0PieceID, filestore.FormatV0)
	require.NoError(t, err)
	verifyPieceHandle(t, v0Reader, origLen, now, filestore.FormatV0)
	require.NoError(t, v0Reader.Close())

	// delete the v0PieceID; both the V0 and the V1 pieces should go away
	require.NoError(t, store.Delete(ctx, satellite, v0PieceID))

	gone, err := store.Reader(ctx, satellite, v0PieceID)
	require.Error(t, err)
	require.True(t, os.IsNotExist(err))
	assert.Nil(t, gone)
}
// TestGetExpired stores four pieces with different expiration times, two in
// the v0 piece info db and two in the piece expirations db, and checks which
// of them GetExpired returns for the current time at limits 0, 1 and 1000.
func TestGetExpired(t *testing.T) {
	storagenodedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db storagenode.DB) {
		v0PieceInfo, ok := db.V0PieceInfo().(pieces.V0PieceInfoDBForTest)
		require.True(t, ok, "V0PieceInfoDB can not satisfy V0PieceInfoDBForTest")

		expirationInfo := db.PieceExpirationDB()
		store := pieces.NewStore(zaptest.NewLogger(t), db.Pieces(), v0PieceInfo, expirationInfo, db.PieceSpaceUsedDB(), pieces.DefaultConfig)

		now := time.Now()

		// Offsets from now for each piece's expiration. Entries 0 and 2 lie
		// in the past, entries 1 and 3 in the future.
		testDates := []struct {
			years, months, days int
		}{
			{-20, -1, -2},
			{1, 6, 14},
			{0, -1, 0},
			{0, 0, 1},
		}

		testPieces := make([]pieces.Info, 0, len(testDates))
		for _, offset := range testDates {
			testPieces = append(testPieces, pieces.Info{
				SatelliteID:     testrand.NodeID(),
				PieceID:         testrand.PieceID(),
				OrderLimit:      &pb.OrderLimit{},
				UplinkPieceHash: &pb.PieceHash{},
				PieceExpiration: now.AddDate(offset.years, offset.months, offset.days),
			})
		}

		// testPieces 0 and 1 go into the v0 pieceinfo db
		for i := 0; i < 2; i++ {
			require.NoError(t, v0PieceInfo.Add(ctx, &testPieces[i]))
		}

		// testPieces 2 and 3 go into the piece_expirations db
		for i := 2; i < 4; i++ {
			tp := testPieces[i]
			require.NoError(t, expirationInfo.SetExpiration(ctx, tp.SatelliteID, tp.PieceID, tp.PieceExpiration))
		}

		fromV0, fromExpirations := testPieces[0], testPieces[2]

		// limit 0: nothing is returned
		expired, err := store.GetExpired(ctx, now, 0)
		require.NoError(t, err)
		assert.Empty(t, expired)

		// limit 1: a single result, although two pieces have expired
		expired, err = store.GetExpired(ctx, now, 1)
		require.NoError(t, err)
		require.Len(t, expired, 1)
		assert.Equal(t, fromExpirations.PieceID, expired[0].PieceID)
		assert.Equal(t, fromExpirations.SatelliteID, expired[0].SatelliteID)
		assert.False(t, expired[0].InPieceInfo)

		// a limit of 2 or more: both expired pieces, the one from
		// piece_expirations first and the one from pieceinfo second
		expired, err = store.GetExpired(ctx, now, 1000)
		require.NoError(t, err)
		require.Len(t, expired, 2)

		first, second := expired[0], expired[1]
		assert.Equal(t, fromExpirations.PieceID, first.PieceID)
		assert.Equal(t, fromExpirations.SatelliteID, first.SatelliteID)
		assert.False(t, first.InPieceInfo)
		assert.Equal(t, fromV0.PieceID, second.PieceID)
		assert.Equal(t, fromV0.SatelliteID, second.SatelliteID)
		assert.True(t, second.InPieceInfo)
	})
}
// TestOverwriteV0WithV1 writes a V0 piece, then writes a V1 piece under the
// same piece ID, and checks what Reader and WalkSatellitePieces report after
// each step. Finally it deletes the piece ID and expects the walk to visit
// nothing.
func TestOverwriteV0WithV1(t *testing.T) {
	storagenodedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db storagenode.DB) {
		v0PieceInfo, ok := db.V0PieceInfo().(pieces.V0PieceInfoDBForTest)
		require.True(t, ok, "V0PieceInfoDB can not satisfy V0PieceInfoDBForTest")

		expirationInfo := db.PieceExpirationDB()
		store := pieces.NewStore(zaptest.NewLogger(t), db.Pieces(), v0PieceInfo, expirationInfo, db.PieceSpaceUsedDB(), pieces.DefaultConfig)

		satelliteID := testrand.NodeID()
		pieceID := testrand.PieceID()
		v0Data := testrand.Bytes(4 * memory.MiB)
		v1Data := testrand.Bytes(3 * memory.MiB)

		// expectReadable asserts that store.Reader returns the piece with the
		// given content and storage format.
		expectReadable := func(wantData []byte, wantFormat storage.FormatVersion) {
			reader, err := store.Reader(ctx, satelliteID, pieceID)
			require.NoError(t, err)
			assert.Equal(t, int64(len(wantData)), reader.Size())
			assert.Equal(t, wantFormat, reader.StorageFormatVersion())
			gotData, err := io.ReadAll(reader)
			require.NoError(t, err)
			assert.Equal(t, wantData, gotData)
			require.NoError(t, reader.Close())
		}

		// write the piece as V0. We can't provide the expireTime via writeAPiece, because
		// BlobWriter.Commit only knows how to store expiration times in piece_expirations.
		v0CreateTime := time.Now()
		v0ExpireTime := v0CreateTime.AddDate(5, 0, 0)
		writeAPiece(ctx, t, store, satelliteID, pieceID, v0Data, v0CreateTime, nil, filestore.FormatV0)

		// now put the piece in the pieceinfo db directly, because store won't do that for us.
		// this is where the expireTime takes effect.
		err := v0PieceInfo.Add(ctx, &pieces.Info{
			SatelliteID:     satelliteID,
			PieceID:         pieceID,
			PieceSize:       int64(len(v0Data)),
			PieceCreation:   v0CreateTime,
			PieceExpiration: v0ExpireTime,
			OrderLimit:      &pb.OrderLimit{},
			UplinkPieceHash: &pb.PieceHash{},
		})
		require.NoError(t, err)

		// ensure we can see it via store.Reader
		expectReadable(v0Data, filestore.FormatV0)

		// ensure we can see it via WalkSatellitePieces
		calledTimes := 0
		err = store.WalkSatellitePieces(ctx, satelliteID, func(access pieces.StoredPieceAccess) error {
			calledTimes++
			require.Equal(t, 1, calledTimes)

			gotCreateTime, err := access.CreationTime(ctx)
			require.NoError(t, err)
			assert.True(t, v0CreateTime.Equal(gotCreateTime))

			_, gotSize, err := access.Size(ctx)
			require.NoError(t, err)
			assert.Equal(t, int64(len(v0Data)), gotSize)
			return nil
		})
		require.NoError(t, err)
		// The callback only asserts when it is invoked, so also require that
		// it ran; otherwise an empty walk would pass silently.
		require.Equal(t, 1, calledTimes)

		// now "overwrite" the piece (write a new blob with the same id, but with V1 storage)
		v1CreateTime := time.Now()
		v1ExpireTime := v1CreateTime.AddDate(5, 0, 0)
		writeAPiece(ctx, t, store, satelliteID, pieceID, v1Data, v1CreateTime, &v1ExpireTime, filestore.FormatV1)

		// ensure we can see it (the new piece) via store.Reader
		expectReadable(v1Data, filestore.FormatV1)

		// now _both_ pieces should show up under WalkSatellitePieces. this may
		// be counter-intuitive, but the V0 piece still exists for now (so we can avoid
		// hitting the pieceinfo db with every new piece write). I believe this is OK, because
		// (a) I don't think that writing different pieces with the same piece ID is a normal
		// use case, unless we make a V0->V1 migrator tool, which should know about these
		// semantics; (b) the V0 piece should not ever become visible again to the user; it
		// should not be possible under normal conditions to delete one without deleting the
		// other.
		calledTimes = 0
		err = store.WalkSatellitePieces(ctx, satelliteID, func(access pieces.StoredPieceAccess) error {
			calledTimes++
			switch calledTimes {
			case 1:
				// expect the V1 piece
				assert.Equal(t, pieceID, access.PieceID())
				assert.Equal(t, filestore.FormatV1, access.StorageFormatVersion())

				gotCreateTime, err := access.CreationTime(ctx)
				require.NoError(t, err)
				assert.True(t, v1CreateTime.Equal(gotCreateTime))

				_, gotSize, err := access.Size(ctx)
				require.NoError(t, err)
				assert.Equal(t, int64(len(v1Data)), gotSize)
			case 2:
				// expect the V0 piece
				assert.Equal(t, pieceID, access.PieceID())
				assert.Equal(t, filestore.FormatV0, access.StorageFormatVersion())

				gotCreateTime, err := access.CreationTime(ctx)
				require.NoError(t, err)
				assert.True(t, v0CreateTime.Equal(gotCreateTime))

				_, gotSize, err := access.Size(ctx)
				require.NoError(t, err)
				assert.Equal(t, int64(len(v0Data)), gotSize)
			default:
				t.Fatalf("calledTimes should be 1 or 2, but it is %d", calledTimes)
			}
			return nil
		})
		require.NoError(t, err)
		// Both the V1 and the V0 piece must actually have been visited.
		require.Equal(t, 2, calledTimes)

		// delete the pieceID; this should get both V0 and V1
		err = store.Delete(ctx, satelliteID, pieceID)
		require.NoError(t, err)

		err = store.WalkSatellitePieces(ctx, satelliteID, func(access pieces.StoredPieceAccess) error {
			t.Fatalf("this should not have been called. pieceID=%x, format=%d", access.PieceID(), access.StorageFormatVersion())
			return nil
		})
		require.NoError(t, err)
	})
}