storagenode/pieces: introduce FileWalker

FileWalker implements methods to walk over pieces in
in a storage directory.

This is just a refactor to separate filewalker functions
from pieces.Store. This is needed to simplify the work
to create a separate filewalker subprocess and reduce the
number of config flags passed to the subprocess.

You might want to check https://review.dev.storj.io/c/storj/storj/+/9773

Change-Id: I4e9567024e54fc7c0bb21a7c27182ef745839fff
This commit is contained in:
Clement Sam 2023-03-01 03:59:53 +00:00 committed by Storj Robot
parent f8e8f3a4cc
commit e5c43722dc
9 changed files with 171 additions and 90 deletions

View File

@ -18,6 +18,7 @@ import (
"unicode" "unicode"
"github.com/jackc/pgconn" "github.com/jackc/pgconn"
"storj.io/private/tagsql" "storj.io/private/tagsql"
) )

View File

@ -253,6 +253,7 @@ type Peer struct {
Inspector *inspector.Endpoint Inspector *inspector.Endpoint
Monitor *monitor.Service Monitor *monitor.Service
Orders *orders.Service Orders *orders.Service
FileWalker *pieces.FileWalker
} }
Collector *collector.Service Collector *collector.Service
@ -454,8 +455,10 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten
{ // setup storage { // setup storage
peer.Storage2.BlobsCache = pieces.NewBlobsUsageCache(peer.Log.Named("blobscache"), peer.DB.Pieces()) peer.Storage2.BlobsCache = pieces.NewBlobsUsageCache(peer.Log.Named("blobscache"), peer.DB.Pieces())
peer.Storage2.FileWalker = pieces.NewFileWalker(peer.Log.Named("filewalker"), peer.Storage2.BlobsCache, peer.DB.V0PieceInfo())
peer.Storage2.Store = pieces.NewStore(peer.Log.Named("pieces"), peer.Storage2.Store = pieces.NewStore(peer.Log.Named("pieces"),
peer.Storage2.FileWalker,
peer.Storage2.BlobsCache, peer.Storage2.BlobsCache,
peer.DB.V0PieceInfo(), peer.DB.V0PieceInfo(),
peer.DB.PieceExpirationDB(), peer.DB.PieceExpirationDB(),

View File

@ -113,7 +113,7 @@ func TestCacheInit(t *testing.T) {
cache := pieces.NewBlobsUsageCacheTest(log, nil, 0, 0, 0, nil) cache := pieces.NewBlobsUsageCacheTest(log, nil, 0, 0, 0, nil)
cacheService := pieces.NewService(log, cacheService := pieces.NewService(log,
cache, cache,
pieces.NewStore(log, cache, nil, nil, spaceUsedDB, pieces.DefaultConfig), pieces.NewStore(log, pieces.NewFileWalker(log, cache, nil), cache, nil, nil, spaceUsedDB, pieces.DefaultConfig),
1*time.Hour, 1*time.Hour,
true, true,
) )
@ -152,7 +152,7 @@ func TestCacheInit(t *testing.T) {
cache = pieces.NewBlobsUsageCacheTest(log, nil, expectedPiecesTotal, expectedPiecesContentSize, expectedTrash, expectedTotalBySA) cache = pieces.NewBlobsUsageCacheTest(log, nil, expectedPiecesTotal, expectedPiecesContentSize, expectedTrash, expectedTotalBySA)
cacheService = pieces.NewService(log, cacheService = pieces.NewService(log,
cache, cache,
pieces.NewStore(log, cache, nil, nil, spaceUsedDB, pieces.DefaultConfig), pieces.NewStore(log, pieces.NewFileWalker(log, cache, nil), cache, nil, nil, spaceUsedDB, pieces.DefaultConfig),
1*time.Hour, 1*time.Hour,
true, true,
) )
@ -163,7 +163,7 @@ func TestCacheInit(t *testing.T) {
cache = pieces.NewBlobsUsageCacheTest(log, nil, 0, 0, 0, nil) cache = pieces.NewBlobsUsageCacheTest(log, nil, 0, 0, 0, nil)
cacheService = pieces.NewService(log, cacheService = pieces.NewService(log,
cache, cache,
pieces.NewStore(log, cache, nil, nil, spaceUsedDB, pieces.DefaultConfig), pieces.NewStore(log, pieces.NewFileWalker(log, cache, nil), cache, nil, nil, spaceUsedDB, pieces.DefaultConfig),
1*time.Hour, 1*time.Hour,
true, true,
) )
@ -227,7 +227,7 @@ func TestCachServiceRun(t *testing.T) {
cache := pieces.NewBlobsUsageCache(log, blobstore) cache := pieces.NewBlobsUsageCache(log, blobstore)
cacheService := pieces.NewService(log, cacheService := pieces.NewService(log,
cache, cache,
pieces.NewStore(log, cache, nil, nil, spaceUsedDB, pieces.DefaultConfig), pieces.NewStore(log, pieces.NewFileWalker(log, cache, nil), cache, nil, nil, spaceUsedDB, pieces.DefaultConfig),
1*time.Hour, 1*time.Hour,
true, true,
) )
@ -314,7 +314,7 @@ func TestPersistCacheTotals(t *testing.T) {
cache := pieces.NewBlobsUsageCacheTest(log, nil, expectedPiecesTotal, expectedPiecesContentSize, expectedTrash, expectedTotalsBySA) cache := pieces.NewBlobsUsageCacheTest(log, nil, expectedPiecesTotal, expectedPiecesContentSize, expectedTrash, expectedTotalsBySA)
cacheService := pieces.NewService(log, cacheService := pieces.NewService(log,
cache, cache,
pieces.NewStore(log, cache, nil, nil, spaceUsedDB, pieces.DefaultConfig), pieces.NewStore(log, pieces.NewFileWalker(log, cache, nil), cache, nil, nil, spaceUsedDB, pieces.DefaultConfig),
1*time.Hour, 1*time.Hour,
true, true,
) )

View File

@ -38,10 +38,11 @@ func TestDeleter(t *testing.T) {
testCase := c testCase := c
t.Run(testCase.testID, func(t *testing.T) { t.Run(testCase.testID, func(t *testing.T) {
storagenodedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db storagenode.DB) { storagenodedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db storagenode.DB) {
dir, err := filestore.NewDir(zaptest.NewLogger(t), ctx.Dir("piecedeleter")) log := zaptest.NewLogger(t)
dir, err := filestore.NewDir(log, ctx.Dir("piecedeleter"))
require.NoError(t, err) require.NoError(t, err)
blobs := filestore.New(zaptest.NewLogger(t), dir, filestore.DefaultConfig) blobs := filestore.New(log, dir, filestore.DefaultConfig)
defer ctx.Check(blobs.Close) defer ctx.Check(blobs.Close)
v0PieceInfo, ok := db.V0PieceInfo().(pieces.V0PieceInfoDBForTest) v0PieceInfo, ok := db.V0PieceInfo().(pieces.V0PieceInfoDBForTest)
@ -51,8 +52,8 @@ func TestDeleter(t *testing.T) {
WritePreallocSize: 4 * memory.MiB, WritePreallocSize: 4 * memory.MiB,
DeleteToTrash: testCase.deleteToTrash, DeleteToTrash: testCase.deleteToTrash,
} }
store := pieces.NewStore(zaptest.NewLogger(t), blobs, v0PieceInfo, db.PieceExpirationDB(), nil, conf) store := pieces.NewStore(log, pieces.NewFileWalker(log, blobs, v0PieceInfo), blobs, v0PieceInfo, db.PieceExpirationDB(), nil, conf)
deleter := pieces.NewDeleter(zaptest.NewLogger(t), store, 1, 10000) deleter := pieces.NewDeleter(log, store, 1, 10000)
defer ctx.Check(deleter.Close) defer ctx.Check(deleter.Close)
deleter.SetupTest() deleter.SetupTest()

View File

@ -0,0 +1,63 @@
// Copyright (C) 2023 Storj Labs, Inc.
// See LICENSE for copying information.
package pieces
import (
"context"
"go.uber.org/zap"
"storj.io/common/storj"
"storj.io/storj/storage"
"storj.io/storj/storage/filestore"
)
// FileWalker implements methods to walk over pieces in a storage directory.
type FileWalker struct {
log *zap.Logger
blobs storage.Blobs
v0PieceInfo V0PieceInfoDB
}
// NewFileWalker creates a new FileWalker.
func NewFileWalker(log *zap.Logger, blobs storage.Blobs, db V0PieceInfoDB) *FileWalker {
return &FileWalker{
log: log,
blobs: blobs,
v0PieceInfo: db,
}
}
// WalkSatellitePieces executes walkFunc for each locally stored piece in the namespace of the
// given satellite. If walkFunc returns a non-nil error, WalkSatellitePieces will stop iterating
// and return the error immediately. The ctx parameter is intended specifically to allow canceling
// iteration early.
//
// Note that this method includes all locally stored pieces, both V0 and higher.
func (fw *FileWalker) WalkSatellitePieces(ctx context.Context, satellite storj.NodeID, fn func(StoredPieceAccess) error) (err error) {
defer mon.Task()(&ctx)(&err)
// iterate over all in V1 storage, skipping v0 pieces
err = fw.blobs.WalkNamespace(ctx, satellite.Bytes(), func(blobInfo storage.BlobInfo) error {
if blobInfo.StorageFormatVersion() < filestore.FormatV1 {
// skip v0 pieces, which are handled separately
return nil
}
pieceAccess, err := newStoredPieceAccess(fw.blobs, blobInfo)
if err != nil {
// this is not a real piece blob. the blob store can't distinguish between actual piece
// blobs and stray files whose names happen to decode as valid base32. skip this
// "blob".
return nil //nolint: nilerr // we ignore other files
}
return fn(pieceAccess)
})
if err == nil && fw.v0PieceInfo != nil {
// iterate over all in V0 storage
err = fw.v0PieceInfo.WalkSatelliteV0Pieces(ctx, fw.blobs, satellite, fn)
}
return err
}

View File

@ -33,7 +33,7 @@ func BenchmarkReadWrite(b *testing.B) {
blobs := filestore.New(zap.NewNop(), dir, filestore.DefaultConfig) blobs := filestore.New(zap.NewNop(), dir, filestore.DefaultConfig)
defer ctx.Check(blobs.Close) defer ctx.Check(blobs.Close)
store := pieces.NewStore(zap.NewNop(), blobs, nil, nil, nil, pieces.DefaultConfig) store := pieces.NewStore(zap.NewNop(), pieces.NewFileWalker(zap.NewNop(), blobs, nil), blobs, nil, nil, nil, pieces.DefaultConfig)
// setup test parameters // setup test parameters
const blockSize = int(256 * memory.KiB) const blockSize = int(256 * memory.KiB)
@ -94,12 +94,14 @@ func readAndWritePiece(t *testing.T, content []byte) {
ctx := testcontext.New(t) ctx := testcontext.New(t)
defer ctx.Cleanup() defer ctx.Cleanup()
dir, err := filestore.NewDir(zaptest.NewLogger(t), ctx.Dir("pieces")) log := zaptest.NewLogger(t)
dir, err := filestore.NewDir(log, ctx.Dir("pieces"))
require.NoError(t, err) require.NoError(t, err)
blobs := filestore.New(zaptest.NewLogger(t), dir, filestore.DefaultConfig) blobs := filestore.New(log, dir, filestore.DefaultConfig)
defer ctx.Check(blobs.Close) defer ctx.Check(blobs.Close)
store := pieces.NewStore(zaptest.NewLogger(t), blobs, nil, nil, nil, pieces.DefaultConfig) store := pieces.NewStore(log, pieces.NewFileWalker(log, blobs, nil), blobs, nil, nil, nil, pieces.DefaultConfig)
// test parameters // test parameters
satelliteID := testrand.NodeID() satelliteID := testrand.NodeID()

View File

@ -172,9 +172,11 @@ type Store struct {
config Config config Config
blobs storage.Blobs blobs storage.Blobs
v0PieceInfo V0PieceInfoDB
expirationInfo PieceExpirationDB expirationInfo PieceExpirationDB
spaceUsedDB PieceSpaceUsedDB spaceUsedDB PieceSpaceUsedDB
v0PieceInfo V0PieceInfoDB
Filewalker *FileWalker
} }
// StoreForTest is a wrapper around Store to be used only in test scenarios. It enables writing // StoreForTest is a wrapper around Store to be used only in test scenarios. It enables writing
@ -184,16 +186,15 @@ type StoreForTest struct {
} }
// NewStore creates a new piece store. // NewStore creates a new piece store.
func NewStore(log *zap.Logger, blobs storage.Blobs, v0PieceInfo V0PieceInfoDB, func NewStore(log *zap.Logger, fw *FileWalker, blobs storage.Blobs, v0PieceInfo V0PieceInfoDB, expirationInfo PieceExpirationDB, spaceUsedDB PieceSpaceUsedDB, config Config) *Store {
expirationInfo PieceExpirationDB, pieceSpaceUsedDB PieceSpaceUsedDB, config Config) *Store {
return &Store{ return &Store{
log: log, log: log,
config: config, config: config,
blobs: blobs, blobs: blobs,
v0PieceInfo: v0PieceInfo,
expirationInfo: expirationInfo, expirationInfo: expirationInfo,
spaceUsedDB: pieceSpaceUsedDB, spaceUsedDB: spaceUsedDB,
v0PieceInfo: v0PieceInfo,
Filewalker: fw,
} }
} }
@ -276,13 +277,14 @@ func (store StoreForTest) WriterForFormatVersion(ctx context.Context, satellite
return writer, Error.Wrap(err) return writer, Error.Wrap(err)
} }
// Reader returns a new piece reader. // ReaderWithStorageFormat returns a new piece reader for a located piece, which avoids the
func (store *Store) Reader(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID) (_ *Reader, err error) { // potential need to check multiple storage formats to find the right blob.
func (store *StoreForTest) ReaderWithStorageFormat(ctx context.Context, satellite storj.NodeID,
pieceID storj.PieceID, formatVersion storage.FormatVersion) (_ *Reader, err error) {
defer mon.Task()(&ctx)(&err) defer mon.Task()(&ctx)(&err)
blob, err := store.blobs.Open(ctx, storage.BlobRef{ ref := storage.BlobRef{Namespace: satellite.Bytes(), Key: pieceID.Bytes()}
Namespace: satellite.Bytes(), blob, err := store.blobs.OpenWithStorageFormat(ctx, ref, formatVersion)
Key: pieceID.Bytes(),
})
if err != nil { if err != nil {
if os.IsNotExist(err) { if os.IsNotExist(err) {
return nil, err return nil, err
@ -294,14 +296,13 @@ func (store *Store) Reader(ctx context.Context, satellite storj.NodeID, pieceID
return reader, Error.Wrap(err) return reader, Error.Wrap(err)
} }
// ReaderWithStorageFormat returns a new piece reader for a located piece, which avoids the // Reader returns a new piece reader.
// potential need to check multiple storage formats to find the right blob. func (store *Store) Reader(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID) (_ *Reader, err error) {
func (store *Store) ReaderWithStorageFormat(ctx context.Context, satellite storj.NodeID,
pieceID storj.PieceID, formatVersion storage.FormatVersion) (_ *Reader, err error) {
defer mon.Task()(&ctx)(&err) defer mon.Task()(&ctx)(&err)
ref := storage.BlobRef{Namespace: satellite.Bytes(), Key: pieceID.Bytes()} blob, err := store.blobs.Open(ctx, storage.BlobRef{
blob, err := store.blobs.OpenWithStorageFormat(ctx, ref, formatVersion) Namespace: satellite.Bytes(),
Key: pieceID.Bytes(),
})
if err != nil { if err != nil {
if os.IsNotExist(err) { if os.IsNotExist(err) {
return nil, err return nil, err
@ -520,33 +521,11 @@ func (store *Store) GetHashAndLimit(ctx context.Context, satellite storj.NodeID,
return pieceHash, header.OrderLimit, nil return pieceHash, header.OrderLimit, nil
} }
// WalkSatellitePieces executes walkFunc for each locally stored piece in the namespace of the // WalkSatellitePieces wraps FileWalker.WalkSatellitePieces.
// given satellite. If walkFunc returns a non-nil error, WalkSatellitePieces will stop iterating
// and return the error immediately. The ctx parameter is intended specifically to allow canceling
// iteration early.
//
// Note that this method includes all locally stored pieces, both V0 and higher.
func (store *Store) WalkSatellitePieces(ctx context.Context, satellite storj.NodeID, walkFunc func(StoredPieceAccess) error) (err error) { func (store *Store) WalkSatellitePieces(ctx context.Context, satellite storj.NodeID, walkFunc func(StoredPieceAccess) error) (err error) {
defer mon.Task()(&ctx)(&err) defer mon.Task()(&ctx)(&err)
// first iterate over all in V1 storage, then all in V0
err = store.blobs.WalkNamespace(ctx, satellite.Bytes(), func(blobInfo storage.BlobInfo) error { return store.Filewalker.WalkSatellitePieces(ctx, satellite, walkFunc)
if blobInfo.StorageFormatVersion() < filestore.FormatV1 {
// we'll address this piece while iterating over the V0 pieces below.
return nil
}
pieceAccess, err := newStoredPieceAccess(store, blobInfo)
if err != nil {
// this is not a real piece blob. the blob store can't distinguish between actual piece
// blobs and stray files whose names happen to decode as valid base32. skip this
// "blob".
return nil //nolint: nilerr // we ignore other files
}
return walkFunc(pieceAccess)
})
if err == nil && store.v0PieceInfo != nil {
err = store.v0PieceInfo.WalkSatelliteV0Pieces(ctx, store.blobs, satellite, walkFunc)
}
return err
} }
// GetExpired gets piece IDs that are expired and were created before the given time. // GetExpired gets piece IDs that are expired and were created before the given time.
@ -781,18 +760,20 @@ func (store *Store) Stat(ctx context.Context, satellite storj.NodeID, pieceID st
type storedPieceAccess struct { type storedPieceAccess struct {
storage.BlobInfo storage.BlobInfo
store *Store
pieceID storj.PieceID pieceID storj.PieceID
blobs storage.Blobs
} }
func newStoredPieceAccess(store *Store, blobInfo storage.BlobInfo) (storedPieceAccess, error) { func newStoredPieceAccess(blobs storage.Blobs, blobInfo storage.BlobInfo) (storedPieceAccess, error) {
pieceID, err := storj.PieceIDFromBytes(blobInfo.BlobRef().Key) ref := blobInfo.BlobRef()
pieceID, err := storj.PieceIDFromBytes(ref.Key)
if err != nil { if err != nil {
return storedPieceAccess{}, err return storedPieceAccess{}, err
} }
return storedPieceAccess{ return storedPieceAccess{
BlobInfo: blobInfo, BlobInfo: blobInfo,
store: store, blobs: blobs,
pieceID: pieceID, pieceID: pieceID,
}, nil }, nil
} }
@ -827,11 +808,16 @@ func (access storedPieceAccess) Size(ctx context.Context) (size, contentSize int
// header. If exact precision is not required, ModTime() may be a better solution. // header. If exact precision is not required, ModTime() may be a better solution.
func (access storedPieceAccess) CreationTime(ctx context.Context) (cTime time.Time, err error) { func (access storedPieceAccess) CreationTime(ctx context.Context) (cTime time.Time, err error) {
defer mon.Task()(&ctx)(&err) defer mon.Task()(&ctx)(&err)
satellite, err := access.Satellite()
blob, err := access.blobs.OpenWithStorageFormat(ctx, access.BlobInfo.BlobRef(), access.BlobInfo.StorageFormatVersion())
if err != nil { if err != nil {
if os.IsNotExist(err) {
return time.Time{}, err
}
return time.Time{}, err return time.Time{}, err
} }
reader, err := access.store.ReaderWithStorageFormat(ctx, satellite, access.PieceID(), access.StorageFormatVersion())
reader, err := NewReader(blob)
if err != nil { if err != nil {
return time.Time{}, err return time.Time{}, err
} }

View File

@ -38,13 +38,16 @@ func TestPieces(t *testing.T) {
ctx := testcontext.New(t) ctx := testcontext.New(t)
defer ctx.Cleanup() defer ctx.Cleanup()
dir, err := filestore.NewDir(zaptest.NewLogger(t), ctx.Dir("pieces")) log := zaptest.NewLogger(t)
dir, err := filestore.NewDir(log, ctx.Dir("pieces"))
require.NoError(t, err) require.NoError(t, err)
blobs := filestore.New(zaptest.NewLogger(t), dir, filestore.DefaultConfig) blobs := filestore.New(log, dir, filestore.DefaultConfig)
defer ctx.Check(blobs.Close) defer ctx.Check(blobs.Close)
store := pieces.NewStore(zaptest.NewLogger(t), blobs, nil, nil, nil, pieces.DefaultConfig) fw := pieces.NewFileWalker(log, blobs, nil)
store := pieces.NewStore(log, fw, blobs, nil, nil, nil, pieces.DefaultConfig)
satelliteID := testidentity.MustPregeneratedSignedIdentity(0, storj.LatestIDVersion()).ID satelliteID := testidentity.MustPregeneratedSignedIdentity(0, storj.LatestIDVersion()).ID
pieceID := storj.NewPieceID() pieceID := storj.NewPieceID()
@ -173,7 +176,7 @@ func verifyPieceHandle(t testing.TB, reader *pieces.Reader, expectDataLen int, e
} }
} }
func tryOpeningAPiece(ctx context.Context, t testing.TB, store *pieces.Store, satelliteID storj.NodeID, pieceID storj.PieceID, expectDataLen int, expectTime time.Time, expectFormat storage.FormatVersion) { func tryOpeningAPiece(ctx context.Context, t testing.TB, store *pieces.StoreForTest, satelliteID storj.NodeID, pieceID storj.PieceID, expectDataLen int, expectTime time.Time, expectFormat storage.FormatVersion) {
reader, err := store.Reader(ctx, satelliteID, pieceID) reader, err := store.Reader(ctx, satelliteID, pieceID)
require.NoError(t, err) require.NoError(t, err)
verifyPieceHandle(t, reader, expectDataLen, expectTime, expectFormat) verifyPieceHandle(t, reader, expectDataLen, expectTime, expectFormat)
@ -310,17 +313,20 @@ func TestTrashAndRestore(t *testing.T) {
} }
storagenodedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db storagenode.DB) { storagenodedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db storagenode.DB) {
dir, err := filestore.NewDir(zaptest.NewLogger(t), ctx.Dir("store")) log := zaptest.NewLogger(t)
dir, err := filestore.NewDir(log, ctx.Dir("store"))
require.NoError(t, err) require.NoError(t, err)
blobs := filestore.New(zaptest.NewLogger(t), dir, filestore.DefaultConfig) blobs := filestore.New(log, dir, filestore.DefaultConfig)
require.NoError(t, err) require.NoError(t, err)
defer ctx.Check(blobs.Close) defer ctx.Check(blobs.Close)
v0PieceInfo, ok := db.V0PieceInfo().(pieces.V0PieceInfoDBForTest) v0PieceInfo, ok := db.V0PieceInfo().(pieces.V0PieceInfoDBForTest)
require.True(t, ok, "V0PieceInfoDB can not satisfy V0PieceInfoDBForTest") require.True(t, ok, "V0PieceInfoDB can not satisfy V0PieceInfoDBForTest")
store := pieces.NewStore(zaptest.NewLogger(t), blobs, v0PieceInfo, db.PieceExpirationDB(), nil, pieces.DefaultConfig) fw := pieces.NewFileWalker(log, blobs, v0PieceInfo)
store := pieces.NewStore(log, fw, blobs, v0PieceInfo, db.PieceExpirationDB(), nil, pieces.DefaultConfig)
tStore := &pieces.StoreForTest{store} tStore := &pieces.StoreForTest{store}
var satelliteURLs []trust.SatelliteURL var satelliteURLs []trust.SatelliteURL
@ -374,7 +380,7 @@ func TestTrashAndRestore(t *testing.T) {
} }
// Verify piece matches data, has correct signature and expiration // Verify piece matches data, has correct signature and expiration
verifyPieceData(ctx, t, store, satellite.satelliteID, piece.pieceID, file.formatVer, file.data, piece.expiration, publicKey) verifyPieceData(ctx, t, tStore, satellite.satelliteID, piece.pieceID, file.formatVer, file.data, piece.expiration, publicKey)
} }
@ -412,14 +418,14 @@ func TestTrashAndRestore(t *testing.T) {
for _, satelliteURL := range satelliteURLs { for _, satelliteURL := range satelliteURLs {
poolConfig.Sources = append(poolConfig.Sources, &trust.StaticURLSource{URL: satelliteURL}) poolConfig.Sources = append(poolConfig.Sources, &trust.StaticURLSource{URL: satelliteURL})
} }
trust, err := trust.NewPool(zaptest.NewLogger(t), trust.Dialer(rpc.Dialer{}), poolConfig, db.Satellites()) trust, err := trust.NewPool(log, trust.Dialer(rpc.Dialer{}), poolConfig, db.Satellites())
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, trust.Refresh(ctx)) require.NoError(t, trust.Refresh(ctx))
// Empty trash by running the chore once // Empty trash by running the chore once
trashDur := 4 * 24 * time.Hour trashDur := 4 * 24 * time.Hour
chorectx, chorecancel := context.WithCancel(ctx) chorectx, chorecancel := context.WithCancel(ctx)
chore := pieces.NewTrashChore(zaptest.NewLogger(t), 24*time.Hour, trashDur, trust, store) chore := pieces.NewTrashChore(log, 24*time.Hour, trashDur, trust, store)
ctx.Go(func() error { ctx.Go(func() error {
return chore.Run(chorectx) return chore.Run(chorectx)
}) })
@ -437,7 +443,7 @@ func TestTrashAndRestore(t *testing.T) {
if piece.trashDur < trashDur { if piece.trashDur < trashDur {
// Expect the piece to be there // Expect the piece to be there
lastFile := piece.files[len(piece.files)-1] lastFile := piece.files[len(piece.files)-1]
verifyPieceData(ctx, t, store, satellites[0].satelliteID, piece.pieceID, filestore.MaxFormatVersionSupported, lastFile.data, piece.expiration, publicKey) verifyPieceData(ctx, t, tStore, satellites[0].satelliteID, piece.pieceID, filestore.MaxFormatVersionSupported, lastFile.data, piece.expiration, publicKey)
} else { } else {
// Expect the piece to be missing, it should be removed from the trash on EmptyTrash // Expect the piece to be missing, it should be removed from the trash on EmptyTrash
r, err := store.Reader(ctx, satellites[1].satelliteID, piece.pieceID) r, err := store.Reader(ctx, satellites[1].satelliteID, piece.pieceID)
@ -459,7 +465,7 @@ func TestTrashAndRestore(t *testing.T) {
if piece.trashDur < trashDur { if piece.trashDur < trashDur {
// Expect the piece to be there // Expect the piece to be there
lastFile := piece.files[len(piece.files)-1] lastFile := piece.files[len(piece.files)-1]
verifyPieceData(ctx, t, store, satellites[1].satelliteID, piece.pieceID, filestore.MaxFormatVersionSupported, lastFile.data, piece.expiration, publicKey) verifyPieceData(ctx, t, tStore, satellites[1].satelliteID, piece.pieceID, filestore.MaxFormatVersionSupported, lastFile.data, piece.expiration, publicKey)
} else { } else {
// Expect the piece to be missing, it should be removed from the trash on EmptyTrash // Expect the piece to be missing, it should be removed from the trash on EmptyTrash
r, err := store.Reader(ctx, satellites[1].satelliteID, piece.pieceID) r, err := store.Reader(ctx, satellites[1].satelliteID, piece.pieceID)
@ -470,7 +476,7 @@ func TestTrashAndRestore(t *testing.T) {
}) })
} }
func verifyPieceData(ctx context.Context, t testing.TB, store *pieces.Store, satelliteID storj.NodeID, pieceID storj.PieceID, formatVer storage.FormatVersion, expected []byte, expiration time.Time, publicKey storj.PiecePublicKey) { func verifyPieceData(ctx context.Context, t testing.TB, store *pieces.StoreForTest, satelliteID storj.NodeID, pieceID storj.PieceID, formatVer storage.FormatVersion, expected []byte, expiration time.Time, publicKey storj.PiecePublicKey) {
r, err := store.ReaderWithStorageFormat(ctx, satelliteID, pieceID, formatVer) r, err := store.ReaderWithStorageFormat(ctx, satelliteID, pieceID, formatVer)
require.NoError(t, err) require.NoError(t, err)
@ -553,11 +559,14 @@ func TestPieceVersionMigrate(t *testing.T) {
v0PieceInfo, ok := db.V0PieceInfo().(pieces.V0PieceInfoDBForTest) v0PieceInfo, ok := db.V0PieceInfo().(pieces.V0PieceInfoDBForTest)
require.True(t, ok, "V0PieceInfoDB can not satisfy V0PieceInfoDBForTest") require.True(t, ok, "V0PieceInfoDB can not satisfy V0PieceInfoDBForTest")
blobs, err := filestore.NewAt(zaptest.NewLogger(t), ctx.Dir("store"), filestore.DefaultConfig) log := zaptest.NewLogger(t)
blobs, err := filestore.NewAt(log, ctx.Dir("store"), filestore.DefaultConfig)
require.NoError(t, err) require.NoError(t, err)
defer ctx.Check(blobs.Close) defer ctx.Check(blobs.Close)
store := pieces.NewStore(zaptest.NewLogger(t), blobs, v0PieceInfo, nil, nil, pieces.DefaultConfig) fw := pieces.NewFileWalker(log, blobs, v0PieceInfo)
store := pieces.NewStore(log, fw, blobs, v0PieceInfo, nil, nil, pieces.DefaultConfig)
// write as a v0 piece // write as a v0 piece
tStore := &pieces.StoreForTest{store} tStore := &pieces.StoreForTest{store}
@ -597,16 +606,16 @@ func TestPieceVersionMigrate(t *testing.T) {
require.NoError(t, v0PieceInfo.Add(ctx, &pieceInfo)) require.NoError(t, v0PieceInfo.Add(ctx, &pieceInfo))
// verify piece can be opened as v0 // verify piece can be opened as v0
tryOpeningAPiece(ctx, t, store, satelliteID, pieceID, len(data), now, filestore.FormatV0) tryOpeningAPiece(ctx, t, tStore, satelliteID, pieceID, len(data), now, filestore.FormatV0)
// run migration // run migration
require.NoError(t, store.MigrateV0ToV1(ctx, satelliteID, pieceID)) require.NoError(t, store.MigrateV0ToV1(ctx, satelliteID, pieceID))
// open as v1 piece // open as v1 piece
tryOpeningAPiece(ctx, t, store, satelliteID, pieceID, len(data), now, filestore.FormatV1) tryOpeningAPiece(ctx, t, tStore, satelliteID, pieceID, len(data), now, filestore.FormatV1)
// manually read v1 piece // manually read v1 piece
reader, err := store.ReaderWithStorageFormat(ctx, satelliteID, pieceID, filestore.FormatV1) reader, err := tStore.ReaderWithStorageFormat(ctx, satelliteID, pieceID, filestore.FormatV1)
require.NoError(t, err) require.NoError(t, err)
// generate v1 pieceHash and verify signature is still valid // generate v1 pieceHash and verify signature is still valid
@ -636,11 +645,15 @@ func TestMultipleStorageFormatVersions(t *testing.T) {
ctx := testcontext.New(t) ctx := testcontext.New(t)
defer ctx.Cleanup() defer ctx.Cleanup()
blobs, err := filestore.NewAt(zaptest.NewLogger(t), ctx.Dir("store"), filestore.DefaultConfig) log := zaptest.NewLogger(t)
blobs, err := filestore.NewAt(log, ctx.Dir("store"), filestore.DefaultConfig)
require.NoError(t, err) require.NoError(t, err)
defer ctx.Check(blobs.Close) defer ctx.Check(blobs.Close)
store := pieces.NewStore(zaptest.NewLogger(t), blobs, nil, nil, nil, pieces.DefaultConfig) fw := pieces.NewFileWalker(log, blobs, nil)
store := pieces.NewStore(log, fw, blobs, nil, nil, nil, pieces.DefaultConfig)
tStore := &pieces.StoreForTest{store}
const pieceSize = 1024 const pieceSize = 1024
@ -659,8 +672,8 @@ func TestMultipleStorageFormatVersions(t *testing.T) {
writeAPiece(ctx, t, store, satellite, v1PieceID, data, now, nil, filestore.FormatV1) writeAPiece(ctx, t, store, satellite, v1PieceID, data, now, nil, filestore.FormatV1)
// look up the different pieces with Reader and ReaderWithStorageFormat // look up the different pieces with Reader and ReaderWithStorageFormat
tryOpeningAPiece(ctx, t, store, satellite, v0PieceID, len(data), now, filestore.FormatV0) tryOpeningAPiece(ctx, t, tStore, satellite, v0PieceID, len(data), now, filestore.FormatV0)
tryOpeningAPiece(ctx, t, store, satellite, v1PieceID, len(data), now, filestore.FormatV1) tryOpeningAPiece(ctx, t, tStore, satellite, v1PieceID, len(data), now, filestore.FormatV1)
// write a V1 piece with the same ID as the V0 piece (to simulate it being rewritten as // write a V1 piece with the same ID as the V0 piece (to simulate it being rewritten as
// V1 during a migration) // V1 during a migration)
@ -668,10 +681,10 @@ func TestMultipleStorageFormatVersions(t *testing.T) {
writeAPiece(ctx, t, store, satellite, v0PieceID, differentData, now, nil, filestore.FormatV1) writeAPiece(ctx, t, store, satellite, v0PieceID, differentData, now, nil, filestore.FormatV1)
// if we try to access the piece at that key, we should see only the V1 piece // if we try to access the piece at that key, we should see only the V1 piece
tryOpeningAPiece(ctx, t, store, satellite, v0PieceID, len(differentData), now, filestore.FormatV1) tryOpeningAPiece(ctx, t, tStore, satellite, v0PieceID, len(differentData), now, filestore.FormatV1)
// unless we ask specifically for a V0 piece // unless we ask specifically for a V0 piece
reader, err := store.ReaderWithStorageFormat(ctx, satellite, v0PieceID, filestore.FormatV0) reader, err := tStore.ReaderWithStorageFormat(ctx, satellite, v0PieceID, filestore.FormatV0)
require.NoError(t, err) require.NoError(t, err)
verifyPieceHandle(t, reader, len(data), now, filestore.FormatV0) verifyPieceHandle(t, reader, len(data), now, filestore.FormatV0)
require.NoError(t, reader.Close()) require.NoError(t, reader.Close())
@ -692,7 +705,11 @@ func TestGetExpired(t *testing.T) {
require.True(t, ok, "V0PieceInfoDB can not satisfy V0PieceInfoDBForTest") require.True(t, ok, "V0PieceInfoDB can not satisfy V0PieceInfoDBForTest")
expirationInfo := db.PieceExpirationDB() expirationInfo := db.PieceExpirationDB()
store := pieces.NewStore(zaptest.NewLogger(t), db.Pieces(), v0PieceInfo, expirationInfo, db.PieceSpaceUsedDB(), pieces.DefaultConfig) log := zaptest.NewLogger(t)
blobs := db.Pieces()
fw := pieces.NewFileWalker(log, blobs, v0PieceInfo)
store := pieces.NewStore(log, fw, blobs, v0PieceInfo, expirationInfo, db.PieceSpaceUsedDB(), pieces.DefaultConfig)
now := time.Now() now := time.Now()
testDates := []struct { testDates := []struct {
@ -759,7 +776,11 @@ func TestOverwriteV0WithV1(t *testing.T) {
require.True(t, ok, "V0PieceInfoDB can not satisfy V0PieceInfoDBForTest") require.True(t, ok, "V0PieceInfoDB can not satisfy V0PieceInfoDBForTest")
expirationInfo := db.PieceExpirationDB() expirationInfo := db.PieceExpirationDB()
store := pieces.NewStore(zaptest.NewLogger(t), db.Pieces(), v0PieceInfo, expirationInfo, db.PieceSpaceUsedDB(), pieces.DefaultConfig) log := zaptest.NewLogger(t)
blobs := db.Pieces()
fw := pieces.NewFileWalker(log, blobs, v0PieceInfo)
store := pieces.NewStore(log, fw, blobs, v0PieceInfo, expirationInfo, db.PieceSpaceUsedDB(), pieces.DefaultConfig)
satelliteID := testrand.NodeID() satelliteID := testrand.NodeID()
pieceID := testrand.PieceID() pieceID := testrand.PieceID()

View File

@ -31,7 +31,11 @@ import (
func TestRetainPieces(t *testing.T) { func TestRetainPieces(t *testing.T) {
storagenodedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db storagenode.DB) { storagenodedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db storagenode.DB) {
store := pieces.NewStore(zaptest.NewLogger(t), db.Pieces(), db.V0PieceInfo(), db.PieceExpirationDB(), db.PieceSpaceUsedDB(), pieces.DefaultConfig) log := zaptest.NewLogger(t)
blobs := db.Pieces()
v0PieceInfo := db.V0PieceInfo()
fw := pieces.NewFileWalker(log, blobs, v0PieceInfo)
store := pieces.NewStore(log, fw, blobs, v0PieceInfo, db.PieceExpirationDB(), db.PieceSpaceUsedDB(), pieces.DefaultConfig)
testStore := pieces.StoreForTest{Store: store} testStore := pieces.StoreForTest{Store: store}
const numPieces = 100 const numPieces = 100