From f5020de57c9200e61aaa842013263075de229333 Mon Sep 17 00:00:00 2001 From: Egon Elbre Date: Wed, 5 Apr 2023 20:03:06 +0300 Subject: [PATCH] storagenode/blobstore: move blob store logic The blobstore implementation is entirely related to storagenode, so the rightful place is together with the storagenode implementation. Fixes https://github.com/storj/storj/issues/5754 Change-Id: Ie6637b0262cf37af6c3e558556c7604d9dc3613d --- private/testplanet/storagenode.go | 2 +- satellite/audit/reverify_test.go | 2 +- satellite/audit/verifier_test.go | 8 +-- satellite/gc/gc_test.go | 8 +-- satellite/gracefulexit/endpoint_test.go | 2 +- satellite/metainfo/endpoint_object_test.go | 6 +- .../metainfo/piecedeletion/service_test.go | 2 +- satellite/repair/repair_test.go | 6 +- {storage => storagenode/blobstore}/blob.go | 2 +- .../blobstore}/filestore/blob.go | 20 +++--- .../blobstore}/filestore/dir.go | 60 ++++++++-------- .../blobstore}/filestore/dir_test.go | 0 .../blobstore}/filestore/dir_unix.go | 0 .../blobstore}/filestore/dir_windows.go | 0 .../blobstore}/filestore/error.go | 0 .../blobstore}/filestore/errors_other.go | 0 .../blobstore}/filestore/errors_unix.go | 0 .../blobstore}/filestore/store.go | 34 ++++----- .../blobstore}/filestore/store_test.go | 70 +++++++++---------- .../blobstore}/testblobs/bad.go | 30 ++++---- .../blobstore}/testblobs/limitedspace.go | 8 +-- .../blobstore}/testblobs/slow.go | 26 +++---- storagenode/gracefulexit/chore_test.go | 6 +- storagenode/gracefulexit/worker_test.go | 2 +- storagenode/peer.go | 6 +- storagenode/pieces/cache.go | 20 +++--- storagenode/pieces/cache_test.go | 18 ++--- storagenode/pieces/deleter_test.go | 2 +- storagenode/pieces/filewalker.go | 10 +-- storagenode/pieces/readwrite.go | 22 +++--- storagenode/pieces/readwrite_test.go | 6 +- storagenode/pieces/store.go | 48 ++++++------- storagenode/pieces/store_test.go | 16 ++--- storagenode/piecestore/endpoint_test.go | 2 +- storagenode/retain/retain_test.go | 6 +- storagenode/storagenodedb/database.go | 8 +-- storagenode/storagenodedb/migrations_test.go | 2 +- storagenode/storagenodedb/pieceinfo.go | 20 +++--- .../storagenodedbtest/run_test.go | 2 +- .../storagenodedbtest/snapshot_test.go | 2 +- 40 files changed, 243 insertions(+), 241 deletions(-) rename {storage => storagenode/blobstore}/blob.go (99%) rename {storage => storagenode/blobstore}/filestore/blob.go (84%) rename {storage => storagenode/blobstore}/filestore/dir.go (91%) rename {storage => storagenode/blobstore}/filestore/dir_test.go (100%) rename {storage => storagenode/blobstore}/filestore/dir_unix.go (100%) rename {storage => storagenode/blobstore}/filestore/dir_windows.go (100%) rename {storage => storagenode/blobstore}/filestore/error.go (100%) rename {storage => storagenode/blobstore}/filestore/errors_other.go (100%) rename {storage => storagenode/blobstore}/filestore/errors_unix.go (100%) rename {storage => storagenode/blobstore}/filestore/store.go (85%) rename {storage => storagenode/blobstore}/filestore/store_test.go (92%) rename {private => storagenode/blobstore}/testblobs/bad.go (86%) rename {private => storagenode/blobstore}/testblobs/limitedspace.go (85%) rename {private => storagenode/blobstore}/testblobs/slow.go (88%) diff --git a/private/testplanet/storagenode.go b/private/testplanet/storagenode.go index c77868708..742c76d9e 100644 --- a/private/testplanet/storagenode.go +++ b/private/testplanet/storagenode.go @@ -23,10 +23,10 @@ import ( "storj.io/private/debug" "storj.io/storj/private/revocation" "storj.io/storj/private/server" - "storj.io/storj/storage/filestore" "storj.io/storj/storagenode" "storj.io/storj/storagenode/apikeys" "storj.io/storj/storagenode/bandwidth" + "storj.io/storj/storagenode/blobstore/filestore" "storj.io/storj/storagenode/collector" "storj.io/storj/storagenode/console/consoleserver" "storj.io/storj/storagenode/contact" diff --git a/satellite/audit/reverify_test.go b/satellite/audit/reverify_test.go index 89cb9c64f..9b4dad61f 100644 --- a/satellite/audit/reverify_test.go +++ b/satellite/audit/reverify_test.go @@ -19,12 +19,12 @@ import ( "storj.io/common/sync2" "storj.io/common/testcontext" "storj.io/common/testrand" - "storj.io/storj/private/testblobs" "storj.io/storj/private/testplanet" "storj.io/storj/satellite" "storj.io/storj/satellite/audit" "storj.io/storj/satellite/metabase" "storj.io/storj/storagenode" + "storj.io/storj/storagenode/blobstore/testblobs" ) func TestReverifySuccess(t *testing.T) { diff --git a/satellite/audit/verifier_test.go b/satellite/audit/verifier_test.go index f81aca6ca..995ad96ec 100644 --- a/satellite/audit/verifier_test.go +++ b/satellite/audit/verifier_test.go @@ -26,13 +26,13 @@ import ( "storj.io/common/testcontext" "storj.io/common/testrand" "storj.io/common/uuid" - "storj.io/storj/private/testblobs" "storj.io/storj/private/testplanet" "storj.io/storj/satellite" "storj.io/storj/satellite/audit" "storj.io/storj/satellite/metabase" - "storj.io/storj/storage" "storj.io/storj/storagenode" + "storj.io/storj/storagenode/blobstore" + "storj.io/storj/storagenode/blobstore/testblobs" ) // TestDownloadSharesHappyPath checks that the Share.Error field of all shares @@ -1108,7 +1108,7 @@ func getRemoteSegment( func corruptPieceData(ctx context.Context, t *testing.T, planet *testplanet.Planet, corruptedNode *testplanet.StorageNode, corruptedPieceID storj.PieceID) { t.Helper() - blobRef := storage.BlobRef{ + blobRef := blobstore.BlobRef{ Namespace: planet.Satellites[0].ID().Bytes(), Key: corruptedPieceID.Bytes(), } @@ -1566,7 +1566,7 @@ func newBadBlobsAllowVerify(log *zap.Logger, nodeDB storagenode.DB) storagenode. type badBlobsAllowVerify struct { testblobs.ErrorBlobs - goodBlobs storage.Blobs + goodBlobs blobstore.Blobs } func (b *badBlobsAllowVerify) VerifyStorageDir(ctx context.Context, id storj.NodeID) error { diff --git a/satellite/gc/gc_test.go b/satellite/gc/gc_test.go index 70cc229e9..a797010f4 100644 --- a/satellite/gc/gc_test.go +++ b/satellite/gc/gc_test.go @@ -23,8 +23,8 @@ import ( "storj.io/storj/private/testplanet" "storj.io/storj/satellite/gc/bloomfilter" "storj.io/storj/satellite/metabase" - "storj.io/storj/storage" "storj.io/storj/storagenode" + "storj.io/storj/storagenode/blobstore" "storj.io/uplink/private/eestream" "storj.io/uplink/private/testuplink" ) @@ -102,7 +102,7 @@ func TestGarbageCollection(t *testing.T) { require.NoError(t, err) // Check that piece of the deleted object is on the storagenode - pieceAccess, err := targetNode.DB.Pieces().Stat(ctx, storage.BlobRef{ + pieceAccess, err := targetNode.DB.Pieces().Stat(ctx, blobstore.BlobRef{ Namespace: satellite.ID().Bytes(), Key: deletedPieceID.Bytes(), }) @@ -128,7 +128,7 @@ func TestGarbageCollection(t *testing.T) { targetNode.Storage2.RetainService.TestWaitUntilEmpty() // Check that piece of the deleted object is not on the storagenode - pieceAccess, err = targetNode.DB.Pieces().Stat(ctx, storage.BlobRef{ + pieceAccess, err = targetNode.DB.Pieces().Stat(ctx, blobstore.BlobRef{ Namespace: satellite.ID().Bytes(), Key: deletedPieceID.Bytes(), }) @@ -136,7 +136,7 @@ func TestGarbageCollection(t *testing.T) { require.Nil(t, pieceAccess) // Check that piece of the kept object is on the storagenode - pieceAccess, err = targetNode.DB.Pieces().Stat(ctx, storage.BlobRef{ + pieceAccess, err = targetNode.DB.Pieces().Stat(ctx, blobstore.BlobRef{ Namespace: satellite.ID().Bytes(), Key: keptPieceID.Bytes(), }) diff --git a/satellite/gracefulexit/endpoint_test.go b/satellite/gracefulexit/endpoint_test.go index f81cecf11..19111c08e 100644 --- a/satellite/gracefulexit/endpoint_test.go +++ b/satellite/gracefulexit/endpoint_test.go @@ -26,13 +26,13 @@ import ( "storj.io/common/sync2" "storj.io/common/testcontext" "storj.io/common/testrand" - "storj.io/storj/private/testblobs" "storj.io/storj/private/testplanet" "storj.io/storj/satellite" "storj.io/storj/satellite/metabase" "storj.io/storj/satellite/metainfo" "storj.io/storj/satellite/overlay" "storj.io/storj/storagenode" + "storj.io/storj/storagenode/blobstore/testblobs" "storj.io/storj/storagenode/gracefulexit" ) diff --git a/satellite/metainfo/endpoint_object_test.go b/satellite/metainfo/endpoint_object_test.go index 44424115a..7e7271f16 100644 --- a/satellite/metainfo/endpoint_object_test.go +++ b/satellite/metainfo/endpoint_object_test.go @@ -35,7 +35,7 @@ import ( "storj.io/storj/satellite/internalpb" "storj.io/storj/satellite/metabase" "storj.io/storj/satellite/metainfo" - "storj.io/storj/storage" + "storj.io/storj/storagenode/blobstore" "storj.io/uplink" "storj.io/uplink/private/metaclient" "storj.io/uplink/private/object" @@ -1372,7 +1372,7 @@ func TestEndpoint_Object_With_StorageNodes(t *testing.T) { node := planet.FindNode(piece.StorageNode) pieceID := segments[0].RootPieceID.Derive(piece.StorageNode, int32(piece.Number)) - piece, err := node.DB.Pieces().Stat(ctx, storage.BlobRef{ + piece, err := node.DB.Pieces().Stat(ctx, blobstore.BlobRef{ Namespace: planet.Satellites[0].ID().Bytes(), Key: pieceID.Bytes(), }) @@ -1392,7 +1392,7 @@ func TestEndpoint_Object_With_StorageNodes(t *testing.T) { node := planet.FindNode(piece.StorageNode) pieceID := segments[0].RootPieceID.Derive(piece.StorageNode, int32(piece.Number)) - piece, err := node.DB.Pieces().Stat(ctx, storage.BlobRef{ + piece, err := node.DB.Pieces().Stat(ctx, blobstore.BlobRef{ Namespace: planet.Satellites[0].ID().Bytes(), Key: pieceID.Bytes(), }) diff --git a/satellite/metainfo/piecedeletion/service_test.go b/satellite/metainfo/piecedeletion/service_test.go index b8cf5be6d..e98b9e5f4 100644 --- a/satellite/metainfo/piecedeletion/service_test.go +++ b/satellite/metainfo/piecedeletion/service_test.go @@ -18,12 +18,12 @@ import ( "storj.io/common/storj" "storj.io/common/testcontext" "storj.io/common/testrand" - "storj.io/storj/private/testblobs" "storj.io/storj/private/testplanet" "storj.io/storj/satellite" "storj.io/storj/satellite/metainfo/piecedeletion" "storj.io/storj/satellite/overlay" "storj.io/storj/storagenode" + "storj.io/storj/storagenode/blobstore/testblobs" "storj.io/storj/storagenode/pieces" ) diff --git a/satellite/repair/repair_test.go b/satellite/repair/repair_test.go index 33f941b59..852fbe87d 100644 --- a/satellite/repair/repair_test.go +++ b/satellite/repair/repair_test.go @@ -26,7 +26,6 @@ import ( "storj.io/common/testcontext" "storj.io/common/testrand" "storj.io/common/uuid" - "storj.io/storj/private/testblobs" "storj.io/storj/private/testplanet" "storj.io/storj/satellite" "storj.io/storj/satellite/accounting" @@ -35,8 +34,9 @@ import ( "storj.io/storj/satellite/repair/checker" "storj.io/storj/satellite/repair/repairer" "storj.io/storj/satellite/reputation" - "storj.io/storj/storage" "storj.io/storj/storagenode" + "storj.io/storj/storagenode/blobstore" + "storj.io/storj/storagenode/blobstore/testblobs" "storj.io/uplink/private/eestream" "storj.io/uplink/private/piecestore" ) @@ -2378,7 +2378,7 @@ func getRemoteSegment( func corruptPieceData(ctx context.Context, t *testing.T, planet *testplanet.Planet, corruptedNode *testplanet.StorageNode, corruptedPieceID storj.PieceID) { t.Helper() - blobRef := storage.BlobRef{ + blobRef := blobstore.BlobRef{ Namespace: planet.Satellites[0].ID().Bytes(), Key: corruptedPieceID.Bytes(), } diff --git a/storage/blob.go b/storagenode/blobstore/blob.go similarity index 99% rename from storage/blob.go rename to storagenode/blobstore/blob.go index 6be446664..2635ed43e 100644 --- a/storage/blob.go +++ b/storagenode/blobstore/blob.go @@ -1,7 +1,7 @@ // Copyright (C) 2019 Storj Labs, Inc. // See LICENSE for copying information. -package storage +package blobstore import ( "context" diff --git a/storage/filestore/blob.go b/storagenode/blobstore/filestore/blob.go similarity index 84% rename from storage/filestore/blob.go rename to storagenode/blobstore/filestore/blob.go index 89a848c11..1694ae447 100644 --- a/storage/filestore/blob.go +++ b/storagenode/blobstore/filestore/blob.go @@ -11,15 +11,15 @@ import ( "github.com/zeebo/errs" - "storj.io/storj/storage" + "storj.io/storj/storagenode/blobstore" ) const ( // FormatV0 is the identifier for storage format v0, which also corresponds to an absence of // format version information. - FormatV0 storage.FormatVersion = 0 + FormatV0 blobstore.FormatVersion = 0 // FormatV1 is the identifier for storage format v1. - FormatV1 storage.FormatVersion = 1 + FormatV1 blobstore.FormatVersion = 1 // Note: New FormatVersion values should be consecutive, as certain parts of this blob store // iterate over them numerically and check for blobs stored with each version. @@ -42,10 +42,10 @@ const ( // blobReader implements reading blobs. type blobReader struct { *os.File - formatVersion storage.FormatVersion + formatVersion blobstore.FormatVersion } -func newBlobReader(file *os.File, formatVersion storage.FormatVersion) *blobReader { +func newBlobReader(file *os.File, formatVersion blobstore.FormatVersion) *blobReader { return &blobReader{file, formatVersion} } @@ -59,21 +59,21 @@ func (blob *blobReader) Size() (int64, error) { } // StorageFormatVersion gets the storage format version being used by the blob. -func (blob *blobReader) StorageFormatVersion() storage.FormatVersion { +func (blob *blobReader) StorageFormatVersion() blobstore.FormatVersion { return blob.formatVersion } // blobWriter implements writing blobs. type blobWriter struct { - ref storage.BlobRef + ref blobstore.BlobRef store *blobStore closed bool - formatVersion storage.FormatVersion + formatVersion blobstore.FormatVersion buffer *bufio.Writer fh *os.File } -func newBlobWriter(ref storage.BlobRef, store *blobStore, formatVersion storage.FormatVersion, file *os.File, bufferSize int) *blobWriter { +func newBlobWriter(ref blobstore.BlobRef, store *blobStore, formatVersion blobstore.FormatVersion, file *os.File, bufferSize int) *blobWriter { return &blobWriter{ ref: ref, store: store, @@ -140,6 +140,6 @@ func (blob *blobWriter) Size() (int64, error) { } // StorageFormatVersion indicates what storage format version the blob is using. -func (blob *blobWriter) StorageFormatVersion() storage.FormatVersion { +func (blob *blobWriter) StorageFormatVersion() blobstore.FormatVersion { return blob.formatVersion } diff --git a/storage/filestore/dir.go b/storagenode/blobstore/filestore/dir.go similarity index 91% rename from storage/filestore/dir.go rename to storagenode/blobstore/filestore/dir.go index 2c1ed7cdb..7a5910443 100644 --- a/storage/filestore/dir.go +++ b/storagenode/blobstore/filestore/dir.go @@ -22,7 +22,7 @@ import ( "storj.io/common/experiment" "storj.io/common/storj" - "storj.io/storj/storage" + "storj.io/storj/storagenode/blobstore" ) const ( @@ -162,14 +162,14 @@ func (dir *Dir) DeleteTemporary(ctx context.Context, file *os.File) (err error) // entire path; blobPathForFormatVersion() must also be used. This is a separate call because this // part of the filepath is constant, and blobPathForFormatVersion may need to be called multiple // times with different storage.FormatVersion values. -func (dir *Dir) blobToBasePath(ref storage.BlobRef) (string, error) { +func (dir *Dir) blobToBasePath(ref blobstore.BlobRef) (string, error) { return dir.refToDirPath(ref, dir.blobsdir()) } // refToDirPath converts a blob reference to a filepath in the specified sub-directory. -func (dir *Dir) refToDirPath(ref storage.BlobRef, subDir string) (string, error) { +func (dir *Dir) refToDirPath(ref blobstore.BlobRef, subDir string) (string, error) { if !ref.IsValid() { - return "", storage.ErrInvalidBlobRef.New("") + return "", blobstore.ErrInvalidBlobRef.New("") } namespace := pathEncoding.EncodeToString(ref.Namespace) @@ -183,7 +183,7 @@ func (dir *Dir) refToDirPath(ref storage.BlobRef, subDir string) (string, error) // fileConfirmedInTrash returns true if it is able to confirm the file is in // the trash. On errors, or if the file is not in the trash, it returns false. -func (dir *Dir) fileConfirmedInTrash(ctx context.Context, ref storage.BlobRef, formatVer storage.FormatVersion) bool { +func (dir *Dir) fileConfirmedInTrash(ctx context.Context, ref blobstore.BlobRef, formatVer blobstore.FormatVersion) bool { trashBasePath, err := dir.refToDirPath(ref, dir.trashdir()) if err != nil { return false @@ -195,7 +195,7 @@ func (dir *Dir) fileConfirmedInTrash(ctx context.Context, ref storage.BlobRef, f // blobPathForFormatVersion adjusts a bare blob path (as might have been generated by a call to // blobToBasePath()) to what it should be for the given storage format version. -func blobPathForFormatVersion(path string, formatVersion storage.FormatVersion) string { +func blobPathForFormatVersion(path string, formatVersion blobstore.FormatVersion) string { switch formatVersion { case FormatV0: return path + v0PieceFileSuffix @@ -208,7 +208,7 @@ func blobPathForFormatVersion(path string, formatVersion storage.FormatVersion) // blobToGarbagePath converts a blob reference to a filepath in transient // storage. The files in garbage are deleted on an interval (in case the // initial deletion didn't work for some reason). -func (dir *Dir) blobToGarbagePath(ref storage.BlobRef) string { +func (dir *Dir) blobToGarbagePath(ref blobstore.BlobRef) string { var name []byte name = append(name, ref.Namespace...) name = append(name, ref.Key...) @@ -216,7 +216,7 @@ func (dir *Dir) blobToGarbagePath(ref storage.BlobRef) string { } // Commit commits the temporary file to permanent storage. -func (dir *Dir) Commit(ctx context.Context, file *os.File, ref storage.BlobRef, formatVersion storage.FormatVersion) (err error) { +func (dir *Dir) Commit(ctx context.Context, file *os.File, ref blobstore.BlobRef, formatVersion blobstore.FormatVersion) (err error) { defer mon.Task()(&ctx)(&err) position, seekErr := file.Seek(0, io.SeekCurrent) truncErr := file.Truncate(position) @@ -264,7 +264,7 @@ func (dir *Dir) Commit(ctx context.Context, file *os.File, ref storage.BlobRef, // order to find the blob, if it was stored with an older version of the storage node software. // In cases where the storage format version of a blob is already known, OpenWithStorageFormat() // will generally be a better choice. -func (dir *Dir) Open(ctx context.Context, ref storage.BlobRef) (_ *os.File, _ storage.FormatVersion, err error) { +func (dir *Dir) Open(ctx context.Context, ref blobstore.BlobRef) (_ *os.File, _ blobstore.FormatVersion, err error) { defer mon.Task()(&ctx)(&err) path, err := dir.blobToBasePath(ref) if err != nil { @@ -290,7 +290,7 @@ func (dir *Dir) Open(ctx context.Context, ref storage.BlobRef) (_ *os.File, _ st // OpenWithStorageFormat opens an already-located blob file with a known storage format version, // which avoids the potential need to search through multiple storage formats to find the blob. -func (dir *Dir) OpenWithStorageFormat(ctx context.Context, ref storage.BlobRef, formatVer storage.FormatVersion) (_ *os.File, err error) { +func (dir *Dir) OpenWithStorageFormat(ctx context.Context, ref blobstore.BlobRef, formatVer blobstore.FormatVersion) (_ *os.File, err error) { defer mon.Task()(&ctx)(&err) path, err := dir.blobToBasePath(ref) if err != nil { @@ -315,7 +315,7 @@ func (dir *Dir) OpenWithStorageFormat(ctx context.Context, ref storage.BlobRef, // in order to find the blob, if it was stored with an older version of the storage node software. // In cases where the storage format version of a blob is already known, StatWithStorageFormat() // will generally be a better choice. -func (dir *Dir) Stat(ctx context.Context, ref storage.BlobRef) (_ storage.BlobInfo, err error) { +func (dir *Dir) Stat(ctx context.Context, ref blobstore.BlobRef) (_ blobstore.BlobInfo, err error) { defer mon.Task()(&ctx)(&err) path, err := dir.blobToBasePath(ref) if err != nil { @@ -337,7 +337,7 @@ func (dir *Dir) Stat(ctx context.Context, ref storage.BlobRef) (_ storage.BlobIn // StatWithStorageFormat looks up disk metadata on the blob file with the given storage format // version. This avoids the need for checking for the file in multiple different storage format // types. -func (dir *Dir) StatWithStorageFormat(ctx context.Context, ref storage.BlobRef, formatVer storage.FormatVersion) (_ storage.BlobInfo, err error) { +func (dir *Dir) StatWithStorageFormat(ctx context.Context, ref blobstore.BlobRef, formatVer blobstore.FormatVersion) (_ blobstore.BlobInfo, err error) { defer mon.Task()(&ctx)(&err) path, err := dir.blobToBasePath(ref) if err != nil { @@ -355,13 +355,13 @@ func (dir *Dir) StatWithStorageFormat(ctx context.Context, ref storage.BlobRef, } // Trash moves the piece specified by ref to the trashdir for every format version. -func (dir *Dir) Trash(ctx context.Context, ref storage.BlobRef) (err error) { +func (dir *Dir) Trash(ctx context.Context, ref blobstore.BlobRef) (err error) { defer mon.Task()(&ctx)(&err) return dir.iterateStorageFormatVersions(ctx, ref, dir.TrashWithStorageFormat) } // TrashWithStorageFormat moves the piece specified by ref to the trashdir for the specified format version. -func (dir *Dir) TrashWithStorageFormat(ctx context.Context, ref storage.BlobRef, formatVer storage.FormatVersion) (err error) { +func (dir *Dir) TrashWithStorageFormat(ctx context.Context, ref blobstore.BlobRef, formatVer blobstore.FormatVersion) (err error) { // Ensure trashdir exists so that we know any os.IsNotExist errors below // are not from a missing trash dir _, err = os.Stat(dir.trashdir()) @@ -426,7 +426,7 @@ func (dir *Dir) ReplaceTrashnow(trashnow func() time.Time) { // RestoreTrash moves every piece in the trash folder back into blobsdir. func (dir *Dir) RestoreTrash(ctx context.Context, namespace []byte) (keysRestored [][]byte, err error) { var errorsEncountered errs.Group - err = dir.walkNamespaceInPath(ctx, namespace, dir.trashdir(), func(info storage.BlobInfo) error { + err = dir.walkNamespaceInPath(ctx, namespace, dir.trashdir(), func(info blobstore.BlobInfo) error { blobsBasePath, err := dir.blobToBasePath(info.BlobRef()) if err != nil { errorsEncountered.Add(err) @@ -476,7 +476,7 @@ func (dir *Dir) RestoreTrash(ctx context.Context, namespace []byte) (keysRestore func (dir *Dir) EmptyTrash(ctx context.Context, namespace []byte, trashedBefore time.Time) (bytesEmptied int64, deletedKeys [][]byte, err error) { defer mon.Task()(&ctx)(&err) var errorsEncountered errs.Group - err = dir.walkNamespaceInPath(ctx, namespace, dir.trashdir(), func(info storage.BlobInfo) error { + err = dir.walkNamespaceInPath(ctx, namespace, dir.trashdir(), func(info blobstore.BlobInfo) error { fileInfo, err := info.Stat(ctx) if err != nil { if os.IsNotExist(err) { @@ -523,7 +523,7 @@ func (dir *Dir) EmptyTrash(ctx context.Context, namespace []byte, trashedBefore // // f will be executed for every storage formate version regardless of the // result, and will aggregate errors into a single returned error. -func (dir *Dir) iterateStorageFormatVersions(ctx context.Context, ref storage.BlobRef, f func(ctx context.Context, ref storage.BlobRef, i storage.FormatVersion) error) (err error) { +func (dir *Dir) iterateStorageFormatVersions(ctx context.Context, ref blobstore.BlobRef, f func(ctx context.Context, ref blobstore.BlobRef, i blobstore.FormatVersion) error) (err error) { defer mon.Task()(&ctx)(&err) var combinedErrors errs.Group for i := MinFormatVersionSupported; i <= MaxFormatVersionSupported; i++ { @@ -536,7 +536,7 @@ func (dir *Dir) iterateStorageFormatVersions(ctx context.Context, ref storage.Bl // // It doesn't return an error if the blob is not found for any reason or it // cannot be deleted at this moment and it's delayed. -func (dir *Dir) Delete(ctx context.Context, ref storage.BlobRef) (err error) { +func (dir *Dir) Delete(ctx context.Context, ref blobstore.BlobRef) (err error) { defer mon.Task()(&ctx)(&err) return dir.iterateStorageFormatVersions(ctx, ref, dir.DeleteWithStorageFormat) } @@ -550,7 +550,7 @@ func (dir *Dir) Delete(ctx context.Context, ref storage.BlobRef) (err error) { // * push the blobs to queue for retrying later. // // It doesn't return an error if the piece isn't found for any reason. -func (dir *Dir) DeleteWithStorageFormat(ctx context.Context, ref storage.BlobRef, formatVer storage.FormatVersion) (err error) { +func (dir *Dir) DeleteWithStorageFormat(ctx context.Context, ref blobstore.BlobRef, formatVer blobstore.FormatVersion) (err error) { defer mon.Task()(&ctx)(&err) return dir.deleteWithStorageFormatInPath(ctx, dir.blobsdir(), ref, formatVer) } @@ -561,7 +561,7 @@ func (dir *Dir) DeleteNamespace(ctx context.Context, ref []byte) (err error) { return dir.deleteNamespace(ctx, dir.blobsdir(), ref) } -func (dir *Dir) deleteWithStorageFormatInPath(ctx context.Context, path string, ref storage.BlobRef, formatVer storage.FormatVersion) (err error) { +func (dir *Dir) deleteWithStorageFormatInPath(ctx context.Context, path string, ref blobstore.BlobRef, formatVer blobstore.FormatVersion) (err error) { defer mon.Task()(&ctx)(&err) // Ensure garbage dir exists so that we know any os.IsNotExist errors below @@ -705,12 +705,12 @@ func (dir *Dir) listNamespacesInPath(ctx context.Context, path string) (ids [][] // greater, in the given namespace. If walkFunc returns a non-nil error, WalkNamespace will stop // iterating and return the error immediately. The ctx parameter is intended specifically to allow // canceling iteration early. -func (dir *Dir) WalkNamespace(ctx context.Context, namespace []byte, walkFunc func(storage.BlobInfo) error) (err error) { +func (dir *Dir) WalkNamespace(ctx context.Context, namespace []byte, walkFunc func(blobstore.BlobInfo) error) (err error) { defer mon.Task()(&ctx)(&err) return dir.walkNamespaceInPath(ctx, namespace, dir.blobsdir(), walkFunc) } -func (dir *Dir) walkNamespaceInPath(ctx context.Context, namespace []byte, path string, walkFunc func(storage.BlobInfo) error) (err error) { +func (dir *Dir) walkNamespaceInPath(ctx context.Context, namespace []byte, path string, walkFunc func(blobstore.BlobInfo) error) (err error) { defer mon.Task()(&ctx)(&err) namespaceDir := pathEncoding.EncodeToString(namespace) nsDir := filepath.Join(path, namespaceDir) @@ -755,7 +755,7 @@ func (dir *Dir) walkNamespaceInPath(ctx context.Context, namespace []byte, path } } -func decodeBlobInfo(namespace []byte, keyPrefix, keyDir, name string) (info storage.BlobInfo, ok bool) { +func decodeBlobInfo(namespace []byte, keyPrefix, keyDir, name string) (info blobstore.BlobInfo, ok bool) { blobFileName := name encodedKey := keyPrefix + blobFileName formatVer := FormatV0 @@ -767,14 +767,14 @@ func decodeBlobInfo(namespace []byte, keyPrefix, keyDir, name string) (info stor if err != nil { return nil, false } - ref := storage.BlobRef{ + ref := blobstore.BlobRef{ Namespace: namespace, Key: key, } return newBlobInfo(ref, filepath.Join(keyDir, blobFileName), nil, formatVer), true } -func walkNamespaceWithPrefix(ctx context.Context, log *zap.Logger, namespace []byte, nsDir, keyPrefix string, walkFunc func(storage.BlobInfo) error) (err error) { +func walkNamespaceWithPrefix(ctx context.Context, log *zap.Logger, namespace []byte, nsDir, keyPrefix string, walkFunc func(blobstore.BlobInfo) error) (err error) { keyDir := filepath.Join(nsDir, keyPrefix) openDir, err := os.Open(keyDir) if err != nil { @@ -852,13 +852,13 @@ func (dir *Dir) Info(ctx context.Context) (DiskInfo, error) { } type blobInfo struct { - ref storage.BlobRef + ref blobstore.BlobRef path string fileInfo os.FileInfo - formatVersion storage.FormatVersion + formatVersion blobstore.FormatVersion } -func newBlobInfo(ref storage.BlobRef, path string, fileInfo os.FileInfo, formatVer storage.FormatVersion) storage.BlobInfo { +func newBlobInfo(ref blobstore.BlobRef, path string, fileInfo os.FileInfo, formatVer blobstore.FormatVersion) blobstore.BlobInfo { return &blobInfo{ ref: ref, path: path, @@ -867,11 +867,11 @@ func newBlobInfo(ref storage.BlobRef, path string, fileInfo os.FileInfo, formatV } } -func (info *blobInfo) BlobRef() storage.BlobRef { +func (info *blobInfo) BlobRef() blobstore.BlobRef { return info.ref } -func (info *blobInfo) StorageFormatVersion() storage.FormatVersion { +func (info *blobInfo) StorageFormatVersion() blobstore.FormatVersion { return info.formatVersion } diff --git a/storage/filestore/dir_test.go b/storagenode/blobstore/filestore/dir_test.go similarity index 100% rename from storage/filestore/dir_test.go rename to storagenode/blobstore/filestore/dir_test.go diff --git a/storage/filestore/dir_unix.go b/storagenode/blobstore/filestore/dir_unix.go similarity index 100% rename from storage/filestore/dir_unix.go rename to storagenode/blobstore/filestore/dir_unix.go diff --git a/storage/filestore/dir_windows.go b/storagenode/blobstore/filestore/dir_windows.go similarity index 100% rename from storage/filestore/dir_windows.go rename to storagenode/blobstore/filestore/dir_windows.go diff --git a/storage/filestore/error.go b/storagenode/blobstore/filestore/error.go similarity index 100% rename from storage/filestore/error.go rename to storagenode/blobstore/filestore/error.go diff --git a/storage/filestore/errors_other.go b/storagenode/blobstore/filestore/errors_other.go similarity index 100% rename from storage/filestore/errors_other.go rename to storagenode/blobstore/filestore/errors_other.go diff --git a/storage/filestore/errors_unix.go b/storagenode/blobstore/filestore/errors_unix.go similarity index 100% rename from storage/filestore/errors_unix.go rename to storagenode/blobstore/filestore/errors_unix.go diff --git a/storage/filestore/store.go b/storagenode/blobstore/filestore/store.go similarity index 85% rename from storage/filestore/store.go rename to storagenode/blobstore/filestore/store.go index fdfb9b2a3..269a99a28 100644 --- a/storage/filestore/store.go +++ b/storagenode/blobstore/filestore/store.go @@ -18,7 +18,7 @@ import ( "storj.io/common/memory" "storj.io/common/storj" - "storj.io/storj/storage" + "storj.io/storj/storagenode/blobstore" ) var ( @@ -29,12 +29,14 @@ var ( ErrIsDir = Error.New("file is a directory") mon = monkit.Package() + // for backwards compatibility. + monStorage = monkit.ScopeNamed("storj.io/storj/storage/filestore") - _ storage.Blobs = (*blobStore)(nil) + _ blobstore.Blobs = (*blobStore)(nil) ) func monFileInTrash(namespace []byte) *monkit.Meter { - return mon.Meter("open_file_in_trash", monkit.NewSeriesTag("namespace", hex.EncodeToString(namespace))) //mon:locked + return monStorage.Meter("open_file_in_trash", monkit.NewSeriesTag("namespace", hex.EncodeToString(namespace))) //mon:locked } // Config is configuration for the blob store. @@ -55,12 +57,12 @@ type blobStore struct { } // New creates a new disk blob store in the specified directory. -func New(log *zap.Logger, dir *Dir, config Config) storage.Blobs { +func New(log *zap.Logger, dir *Dir, config Config) blobstore.Blobs { return &blobStore{dir: dir, log: log, config: config} } // NewAt creates a new disk blob store in the specified directory. -func NewAt(log *zap.Logger, path string, config Config) (storage.Blobs, error) { +func NewAt(log *zap.Logger, path string, config Config) (blobstore.Blobs, error) { dir, err := NewDir(log, path) if err != nil { return nil, Error.Wrap(err) @@ -72,7 +74,7 @@ func NewAt(log *zap.Logger, path string, config Config) (storage.Blobs, error) { func (store *blobStore) Close() error { return nil } // Open loads blob with the specified hash. -func (store *blobStore) Open(ctx context.Context, ref storage.BlobRef) (_ storage.BlobReader, err error) { +func (store *blobStore) Open(ctx context.Context, ref blobstore.BlobRef) (_ blobstore.BlobReader, err error) { defer mon.Task()(&ctx)(&err) file, formatVer, err := store.dir.Open(ctx, ref) if err != nil { @@ -86,7 +88,7 @@ func (store *blobStore) Open(ctx context.Context, ref storage.BlobRef) (_ storag // OpenWithStorageFormat loads the already-located blob, avoiding the potential need to check multiple // storage formats to find the blob. -func (store *blobStore) OpenWithStorageFormat(ctx context.Context, blobRef storage.BlobRef, formatVer storage.FormatVersion) (_ storage.BlobReader, err error) { +func (store *blobStore) OpenWithStorageFormat(ctx context.Context, blobRef blobstore.BlobRef, formatVer blobstore.FormatVersion) (_ blobstore.BlobReader, err error) { defer mon.Task()(&ctx)(&err) file, err := store.dir.OpenWithStorageFormat(ctx, blobRef, formatVer) if err != nil { @@ -99,14 +101,14 @@ func (store *blobStore) OpenWithStorageFormat(ctx context.Context, blobRef stora } // Stat looks up disk metadata on the blob file. -func (store *blobStore) Stat(ctx context.Context, ref storage.BlobRef) (_ storage.BlobInfo, err error) { +func (store *blobStore) Stat(ctx context.Context, ref blobstore.BlobRef) (_ blobstore.BlobInfo, err error) { defer mon.Task()(&ctx)(&err) info, err := store.dir.Stat(ctx, ref) return info, Error.Wrap(err) } // StatWithStorageFormat looks up disk metadata on the blob file with the given storage format version. -func (store *blobStore) StatWithStorageFormat(ctx context.Context, ref storage.BlobRef, formatVer storage.FormatVersion) (_ storage.BlobInfo, err error) { +func (store *blobStore) StatWithStorageFormat(ctx context.Context, ref blobstore.BlobRef, formatVer blobstore.FormatVersion) (_ blobstore.BlobInfo, err error) { defer mon.Task()(&ctx)(&err) info, err := store.dir.StatWithStorageFormat(ctx, ref, formatVer) return info, Error.Wrap(err) @@ -116,14 +118,14 @@ func (store *blobStore) StatWithStorageFormat(ctx context.Context, ref storage.B // // It doesn't return an error if the blob isn't found for any reason or it cannot // be deleted at this moment and it's delayed. -func (store *blobStore) Delete(ctx context.Context, ref storage.BlobRef) (err error) { +func (store *blobStore) Delete(ctx context.Context, ref blobstore.BlobRef) (err error) { defer mon.Task()(&ctx)(&err) err = store.dir.Delete(ctx, ref) return Error.Wrap(err) } // DeleteWithStorageFormat deletes blobs with the specified ref and storage format version. -func (store *blobStore) DeleteWithStorageFormat(ctx context.Context, ref storage.BlobRef, formatVer storage.FormatVersion) (err error) { +func (store *blobStore) DeleteWithStorageFormat(ctx context.Context, ref blobstore.BlobRef, formatVer blobstore.FormatVersion) (err error) { defer mon.Task()(&ctx)(&err) err = store.dir.DeleteWithStorageFormat(ctx, ref, formatVer) return Error.Wrap(err) @@ -137,7 +139,7 @@ func (store *blobStore) DeleteNamespace(ctx context.Context, ref []byte) (err er } // Trash moves the ref to a trash directory. -func (store *blobStore) Trash(ctx context.Context, ref storage.BlobRef) (err error) { +func (store *blobStore) Trash(ctx context.Context, ref blobstore.BlobRef) (err error) { defer mon.Task()(&ctx)(&err) return Error.Wrap(store.dir.Trash(ctx, ref)) } @@ -165,7 +167,7 @@ func (store *blobStore) GarbageCollect(ctx context.Context) (err error) { // Create creates a new blob that can be written. // Optionally takes a size argument for performance improvements, -1 is unknown size. -func (store *blobStore) Create(ctx context.Context, ref storage.BlobRef, size int64) (_ storage.BlobWriter, err error) { +func (store *blobStore) Create(ctx context.Context, ref blobstore.BlobRef, size int64) (_ blobstore.BlobWriter, err error) { defer mon.Task()(&ctx)(&err) file, err := store.dir.CreateTemporaryFile(ctx, size) if err != nil { @@ -196,7 +198,7 @@ func (store *blobStore) SpaceUsedForBlobs(ctx context.Context) (space int64, err // SpaceUsedForBlobsInNamespace adds up how much is used in the given namespace for blob storage. func (store *blobStore) SpaceUsedForBlobsInNamespace(ctx context.Context, namespace []byte) (int64, error) { var totalUsed int64 - err := store.WalkNamespace(ctx, namespace, func(info storage.BlobInfo) error { + err := store.WalkNamespace(ctx, namespace, func(info blobstore.BlobInfo) error { statInfo, statErr := info.Stat(ctx) if statErr != nil { store.log.Error("failed to stat blob", zap.Binary("namespace", namespace), zap.Binary("key", info.BlobRef().Key), zap.Error(statErr)) @@ -282,12 +284,12 @@ func (store *blobStore) ListNamespaces(ctx context.Context) (ids [][]byte, err e // WalkNamespace executes walkFunc for each locally stored blob in the given namespace. If walkFunc // returns a non-nil error, WalkNamespace will stop iterating and return the error immediately. The // ctx parameter is intended specifically to allow canceling iteration early. -func (store *blobStore) WalkNamespace(ctx context.Context, namespace []byte, walkFunc func(storage.BlobInfo) error) (err error) { +func (store *blobStore) WalkNamespace(ctx context.Context, namespace []byte, walkFunc func(blobstore.BlobInfo) error) (err error) { return store.dir.WalkNamespace(ctx, namespace, walkFunc) } // TestCreateV0 creates a new V0 blob that can be written. This is ONLY appropriate in test situations. -func (store *blobStore) TestCreateV0(ctx context.Context, ref storage.BlobRef) (_ storage.BlobWriter, err error) { +func (store *blobStore) TestCreateV0(ctx context.Context, ref blobstore.BlobRef) (_ blobstore.BlobWriter, err error) { defer mon.Task()(&ctx)(&err) file, err := store.dir.CreateTemporaryFile(ctx, -1) diff --git a/storage/filestore/store_test.go b/storagenode/blobstore/filestore/store_test.go similarity index 92% rename from storage/filestore/store_test.go rename to storagenode/blobstore/filestore/store_test.go index 5b461f2df..d7ff2678b 100644 --- a/storage/filestore/store_test.go +++ b/storagenode/blobstore/filestore/store_test.go @@ -23,8 +23,8 @@ import ( "storj.io/common/memory" "storj.io/common/testcontext" "storj.io/common/testrand" - "storj.io/storj/storage" - "storj.io/storj/storage/filestore" + "storj.io/storj/storagenode/blobstore" + "storj.io/storj/storagenode/blobstore/filestore" ) const ( @@ -46,13 +46,13 @@ func TestStoreLoad(t *testing.T) { data := testrand.Bytes(blobSize) temp := make([]byte, len(data)) - refs := []storage.BlobRef{} + refs := []blobstore.BlobRef{} namespace := testrand.Bytes(32) // store without size for i := 0; i < repeatCount; i++ { - ref := storage.BlobRef{ + ref := blobstore.BlobRef{ Namespace: namespace, Key: testrand.Bytes(32), } @@ -75,7 +75,7 @@ func TestStoreLoad(t *testing.T) { namespace = testrand.Bytes(32) // store with size for i := 0; i < repeatCount; i++ { - ref := storage.BlobRef{ + ref := blobstore.BlobRef{ Namespace: namespace, Key: testrand.Bytes(32), } @@ -94,7 +94,7 @@ func TestStoreLoad(t *testing.T) { namespace = testrand.Bytes(32) // store with larger size { - ref := storage.BlobRef{ + ref := blobstore.BlobRef{ Namespace: namespace, Key: testrand.Bytes(32), } @@ -113,7 +113,7 @@ func TestStoreLoad(t *testing.T) { namespace = testrand.Bytes(32) // store with error { - ref := storage.BlobRef{ + ref := blobstore.BlobRef{ Namespace: namespace, Key: testrand.Bytes(32), } @@ -174,7 +174,7 @@ func TestDeleteWhileReading(t *testing.T) { data := testrand.Bytes(blobSize) - ref := storage.BlobRef{ + ref := blobstore.BlobRef{ Namespace: []byte{0}, Key: []byte{1}, } @@ -237,15 +237,15 @@ func TestDeleteWhileReading(t *testing.T) { } } -func writeABlob(ctx context.Context, t testing.TB, store storage.Blobs, blobRef storage.BlobRef, data []byte, formatVersion storage.FormatVersion) { +func writeABlob(ctx context.Context, t testing.TB, store blobstore.Blobs, blobRef blobstore.BlobRef, data []byte, formatVersion blobstore.FormatVersion) { var ( - blobWriter storage.BlobWriter + blobWriter blobstore.BlobWriter err error ) switch formatVersion { case filestore.FormatV0: fStore, ok := store.(interface { - TestCreateV0(ctx context.Context, ref storage.BlobRef) (_ storage.BlobWriter, err error) + TestCreateV0(ctx context.Context, ref blobstore.BlobRef) (_ blobstore.BlobWriter, err error) }) require.Truef(t, ok, "can't make a WriterForFormatVersion with this blob store (%T)", store) blobWriter, err = fStore.TestCreateV0(ctx, blobRef) @@ -265,21 +265,21 @@ func writeABlob(ctx context.Context, t testing.TB, store storage.Blobs, blobRef require.NoError(t, err) } -func verifyBlobHandle(t testing.TB, reader storage.BlobReader, expectDataLen int, expectFormat storage.FormatVersion) { +func verifyBlobHandle(t testing.TB, reader blobstore.BlobReader, expectDataLen int, expectFormat blobstore.FormatVersion) { assert.Equal(t, expectFormat, reader.StorageFormatVersion()) size, err := reader.Size() require.NoError(t, err) assert.Equal(t, int64(expectDataLen), size) } -func verifyBlobInfo(ctx context.Context, t testing.TB, blobInfo storage.BlobInfo, expectDataLen int, expectFormat storage.FormatVersion) { +func verifyBlobInfo(ctx context.Context, t testing.TB, blobInfo blobstore.BlobInfo, expectDataLen int, expectFormat blobstore.FormatVersion) { assert.Equal(t, expectFormat, blobInfo.StorageFormatVersion()) stat, err := blobInfo.Stat(ctx) require.NoError(t, err) assert.Equal(t, int64(expectDataLen), stat.Size()) } -func tryOpeningABlob(ctx context.Context, t testing.TB, store storage.Blobs, blobRef storage.BlobRef, expectDataLen int, expectFormat storage.FormatVersion) { +func tryOpeningABlob(ctx context.Context, t testing.TB, store blobstore.Blobs, blobRef blobstore.BlobRef, expectDataLen int, expectFormat blobstore.FormatVersion) { reader, err := store.Open(ctx, blobRef) require.NoError(t, err) verifyBlobHandle(t, reader, expectDataLen, expectFormat) @@ -315,8 +315,8 @@ func TestMultipleStorageFormatVersions(t *testing.T) { v0BlobKey = testrand.Bytes(keySize) v1BlobKey = testrand.Bytes(keySize) - v0Ref = storage.BlobRef{Namespace: namespace, Key: v0BlobKey} - v1Ref = storage.BlobRef{Namespace: namespace, Key: v1BlobKey} + v0Ref = blobstore.BlobRef{Namespace: namespace, Key: v0BlobKey} + v1Ref = blobstore.BlobRef{Namespace: namespace, Key: v1BlobKey} ) // write a V0 blob @@ -386,7 +386,7 @@ func TestStoreSpaceUsed(t *testing.T) { var totalSoFar memory.Size for _, size := range sizesToStore { contents := testrand.Bytes(size) - blobRef := storage.BlobRef{Namespace: namespace, Key: testrand.Bytes(keySize)} + blobRef := blobstore.BlobRef{Namespace: namespace, Key: testrand.Bytes(keySize)} blobWriter, err := store.Create(ctx, blobRef, int64(len(contents))) require.NoError(t, err) @@ -420,7 +420,7 @@ func TestStoreTraversals(t *testing.T) { // invent some namespaces and store stuff in them type namespaceWithBlobs struct { namespace []byte - blobs []storage.BlobRef + blobs []blobstore.BlobRef } const numNamespaces = 4 recordsToInsert := make([]namespaceWithBlobs, numNamespaces) @@ -433,9 +433,9 @@ func TestStoreTraversals(t *testing.T) { recordsToInsert[i].namespace[len(namespaceBase)-1] = byte(i) // put varying numbers of blobs in the namespaces - recordsToInsert[i].blobs = make([]storage.BlobRef, i+1) + recordsToInsert[i].blobs = make([]blobstore.BlobRef, i+1) for j := range recordsToInsert[i].blobs { - recordsToInsert[i].blobs[j] = storage.BlobRef{ + recordsToInsert[i].blobs[j] = blobstore.BlobRef{ Namespace: recordsToInsert[i].namespace, Key: testrand.Bytes(keySize), } @@ -471,7 +471,7 @@ func TestStoreTraversals(t *testing.T) { // keep track of which blobs we visit with WalkNamespace found := make([]bool, len(expected.blobs)) - err = store.WalkNamespace(ctx, expected.namespace, func(info storage.BlobInfo) error { + err = store.WalkNamespace(ctx, expected.namespace, func(info blobstore.BlobInfo) error { gotBlobRef := info.BlobRef() assert.Equal(t, expected.namespace, gotBlobRef.Namespace) // find which blob this is in expected.blobs @@ -511,7 +511,7 @@ func TestStoreTraversals(t *testing.T) { // test WalkNamespace on a nonexistent namespace also namespaceBase[len(namespaceBase)-1] = byte(numNamespaces) - err = store.WalkNamespace(ctx, namespaceBase, func(_ storage.BlobInfo) error { + err = store.WalkNamespace(ctx, namespaceBase, func(_ blobstore.BlobInfo) error { t.Fatal("this should not have been called") return nil }) @@ -520,7 +520,7 @@ func TestStoreTraversals(t *testing.T) { // check that WalkNamespace stops iterating after an error return iterations := 0 expectedErr := errs.New("an expected error") - err = store.WalkNamespace(ctx, recordsToInsert[numNamespaces-1].namespace, func(_ storage.BlobInfo) error { + err = store.WalkNamespace(ctx, recordsToInsert[numNamespaces-1].namespace, func(_ blobstore.BlobInfo) error { iterations++ if iterations == 2 { return expectedErr @@ -544,7 +544,7 @@ func TestEmptyTrash(t *testing.T) { type testfile struct { data []byte - formatVer storage.FormatVersion + formatVer blobstore.FormatVersion } type testref struct { key []byte @@ -614,16 +614,16 @@ func TestEmptyTrash(t *testing.T) { for _, namespace := range namespaces { for _, ref := range namespace.refs { - blobref := storage.BlobRef{ + blobref := blobstore.BlobRef{ Namespace: namespace.namespace, Key: ref.key, } for _, file := range ref.files { - var w storage.BlobWriter + var w blobstore.BlobWriter if file.formatVer == filestore.FormatV0 { fStore, ok := store.(interface { - TestCreateV0(ctx context.Context, ref storage.BlobRef) (_ storage.BlobWriter, err error) + TestCreateV0(ctx context.Context, ref blobstore.BlobRef) (_ blobstore.BlobWriter, err error) }) require.Truef(t, ok, "can't make TestCreateV0 with this blob store (%T)", store) w, err = fStore.TestCreateV0(ctx, blobref) @@ -669,7 +669,7 @@ func TestTrashAndRestore(t *testing.T) { type testfile struct { data []byte - formatVer storage.FormatVersion + formatVer blobstore.FormatVersion } type testref struct { key []byte @@ -739,16 +739,16 @@ func TestTrashAndRestore(t *testing.T) { for _, namespace := range namespaces { for _, ref := range namespace.refs { - blobref := storage.BlobRef{ + blobref := blobstore.BlobRef{ Namespace: namespace.namespace, Key: ref.key, } for _, file := range ref.files { - var w storage.BlobWriter + var w blobstore.BlobWriter if file.formatVer == filestore.FormatV0 { fStore, ok := store.(interface { - TestCreateV0(ctx context.Context, ref storage.BlobRef) (_ storage.BlobWriter, err error) + TestCreateV0(ctx context.Context, ref blobstore.BlobRef) (_ blobstore.BlobWriter, err error) }) require.Truef(t, ok, "can't make TestCreateV0 with this blob store (%T)", store) w, err = fStore.TestCreateV0(ctx, blobref) @@ -791,7 +791,7 @@ func TestTrashAndRestore(t *testing.T) { // Verify pieces are back and look good for first namespace for _, ref := range namespaces[0].refs { - blobref := storage.BlobRef{ + blobref := blobstore.BlobRef{ Namespace: namespaces[0].namespace, Key: ref.key, } @@ -802,7 +802,7 @@ func TestTrashAndRestore(t *testing.T) { // Verify pieces in second namespace are still missing (were not restored) for _, ref := range namespaces[1].refs { - blobref := storage.BlobRef{ + blobref := blobstore.BlobRef{ Namespace: namespaces[1].namespace, Key: ref.key, } @@ -814,7 +814,7 @@ func TestTrashAndRestore(t *testing.T) { } } -func requireFileMatches(ctx context.Context, t *testing.T, store storage.Blobs, data []byte, ref storage.BlobRef, formatVer storage.FormatVersion) { +func requireFileMatches(ctx context.Context, t *testing.T, store blobstore.Blobs, data []byte, ref blobstore.BlobRef, formatVer blobstore.FormatVersion) { r, err := store.OpenWithStorageFormat(ctx, ref, formatVer) require.NoError(t, err) @@ -838,7 +838,7 @@ func TestBlobMemoryBuffer(t *testing.T) { require.NoError(t, err) defer ctx.Check(store.Close) - ref := storage.BlobRef{ + ref := blobstore.BlobRef{ Namespace: testrand.Bytes(32), Key: testrand.Bytes(32), } diff --git a/private/testblobs/bad.go b/storagenode/blobstore/testblobs/bad.go similarity index 86% rename from private/testblobs/bad.go rename to storagenode/blobstore/testblobs/bad.go index a9aec3181..790c96879 100644 --- a/private/testblobs/bad.go +++ b/storagenode/blobstore/testblobs/bad.go @@ -11,15 +11,15 @@ import ( "go.uber.org/zap" "storj.io/common/storj" - "storj.io/storj/storage" "storj.io/storj/storagenode" + "storj.io/storj/storagenode/blobstore" ) -// ErrorBlobs is the interface of storage.Blobs with the SetError method added. +// ErrorBlobs is the interface of blobstore.Blobs with the SetError method added. // This allows the BadDB{}.Blobs member to be replaced with something that has // specific behavior changes. type ErrorBlobs interface { - storage.Blobs + blobstore.Blobs SetError(err error) } @@ -41,7 +41,7 @@ func NewBadDB(log *zap.Logger, db storagenode.DB) *BadDB { } // Pieces returns the blob store. -func (bad *BadDB) Pieces() storage.Blobs { +func (bad *BadDB) Pieces() blobstore.Blobs { return bad.Blobs } @@ -53,7 +53,7 @@ func (bad *BadDB) SetError(err error) { // BadBlobs implements a bad blob store. type BadBlobs struct { err lockedErr - blobs storage.Blobs + blobs blobstore.Blobs log *zap.Logger } @@ -78,7 +78,7 @@ func (m *lockedErr) Set(err error) { // newBadBlobs creates a new bad blob store wrapping the provided blobs. // Use SetError to manually configure the error returned by all operations. -func newBadBlobs(log *zap.Logger, blobs storage.Blobs) *BadBlobs { +func newBadBlobs(log *zap.Logger, blobs blobstore.Blobs) *BadBlobs { return &BadBlobs{ log: log, blobs: blobs, @@ -92,7 +92,7 @@ func (bad *BadBlobs) SetError(err error) { // Create creates a new blob that can be written optionally takes a size // argument for performance improvements, -1 is unknown size. -func (bad *BadBlobs) Create(ctx context.Context, ref storage.BlobRef, size int64) (storage.BlobWriter, error) { +func (bad *BadBlobs) Create(ctx context.Context, ref blobstore.BlobRef, size int64) (blobstore.BlobWriter, error) { if err := bad.err.Err(); err != nil { return nil, err } @@ -108,7 +108,7 @@ func (bad *BadBlobs) Close() error { } // Open opens a reader with the specified namespace and key. -func (bad *BadBlobs) Open(ctx context.Context, ref storage.BlobRef) (storage.BlobReader, error) { +func (bad *BadBlobs) Open(ctx context.Context, ref blobstore.BlobRef) (blobstore.BlobReader, error) { if err := bad.err.Err(); err != nil { return nil, err } @@ -117,7 +117,7 @@ func (bad *BadBlobs) Open(ctx context.Context, ref storage.BlobRef) (storage.Blo // OpenWithStorageFormat opens a reader for the already-located blob, avoiding the potential need // to check multiple storage formats to find the blob. -func (bad *BadBlobs) OpenWithStorageFormat(ctx context.Context, ref storage.BlobRef, formatVer storage.FormatVersion) (storage.BlobReader, error) { +func (bad *BadBlobs) OpenWithStorageFormat(ctx context.Context, ref blobstore.BlobRef, formatVer blobstore.FormatVersion) (blobstore.BlobReader, error) { if err := bad.err.Err(); err != nil { return nil, err } @@ -125,7 +125,7 @@ func (bad *BadBlobs) OpenWithStorageFormat(ctx context.Context, ref storage.Blob } // Trash deletes the blob with the namespace and key. -func (bad *BadBlobs) Trash(ctx context.Context, ref storage.BlobRef) error { +func (bad *BadBlobs) Trash(ctx context.Context, ref blobstore.BlobRef) error { if err := bad.err.Err(); err != nil { return err } @@ -149,7 +149,7 @@ func (bad *BadBlobs) EmptyTrash(ctx context.Context, namespace []byte, trashedBe } // Delete deletes the blob with the namespace and key. -func (bad *BadBlobs) Delete(ctx context.Context, ref storage.BlobRef) error { +func (bad *BadBlobs) Delete(ctx context.Context, ref blobstore.BlobRef) error { if err := bad.err.Err(); err != nil { return err } @@ -157,7 +157,7 @@ func (bad *BadBlobs) Delete(ctx context.Context, ref storage.BlobRef) error { } // DeleteWithStorageFormat deletes the blob with the namespace, key, and format version. -func (bad *BadBlobs) DeleteWithStorageFormat(ctx context.Context, ref storage.BlobRef, formatVer storage.FormatVersion) error { +func (bad *BadBlobs) DeleteWithStorageFormat(ctx context.Context, ref blobstore.BlobRef, formatVer blobstore.FormatVersion) error { if err := bad.err.Err(); err != nil { return err } @@ -173,7 +173,7 @@ func (bad *BadBlobs) DeleteNamespace(ctx context.Context, ref []byte) (err error } // Stat looks up disk metadata on the blob file. -func (bad *BadBlobs) Stat(ctx context.Context, ref storage.BlobRef) (storage.BlobInfo, error) { +func (bad *BadBlobs) Stat(ctx context.Context, ref blobstore.BlobRef) (blobstore.BlobInfo, error) { if err := bad.err.Err(); err != nil { return nil, err } @@ -183,7 +183,7 @@ func (bad *BadBlobs) Stat(ctx context.Context, ref storage.BlobRef) (storage.Blo // StatWithStorageFormat looks up disk metadata for the blob file with the given storage format // version. This avoids the potential need to check multiple storage formats for the blob // when the format is already known. -func (bad *BadBlobs) StatWithStorageFormat(ctx context.Context, ref storage.BlobRef, formatVer storage.FormatVersion) (storage.BlobInfo, error) { +func (bad *BadBlobs) StatWithStorageFormat(ctx context.Context, ref blobstore.BlobRef, formatVer blobstore.FormatVersion) (blobstore.BlobInfo, error) { if err := bad.err.Err(); err != nil { return nil, err } @@ -193,7 +193,7 @@ func (bad *BadBlobs) StatWithStorageFormat(ctx context.Context, ref storage.Blob // WalkNamespace executes walkFunc for each locally stored blob in the given namespace. // If walkFunc returns a non-nil error, WalkNamespace will stop iterating and return the // error immediately. -func (bad *BadBlobs) WalkNamespace(ctx context.Context, namespace []byte, walkFunc func(storage.BlobInfo) error) error { +func (bad *BadBlobs) WalkNamespace(ctx context.Context, namespace []byte, walkFunc func(blobstore.BlobInfo) error) error { if err := bad.err.Err(); err != nil { return err } diff --git a/private/testblobs/limitedspace.go b/storagenode/blobstore/testblobs/limitedspace.go similarity index 85% rename from private/testblobs/limitedspace.go rename to storagenode/blobstore/testblobs/limitedspace.go index 43d209299..8cd1e94b2 100644 --- a/private/testblobs/limitedspace.go +++ b/storagenode/blobstore/testblobs/limitedspace.go @@ -8,8 +8,8 @@ import ( "go.uber.org/zap" - "storj.io/storj/storage" "storj.io/storj/storagenode" + "storj.io/storj/storagenode/blobstore" ) // ensures that limitedSpaceDB implements storagenode.DB. @@ -32,19 +32,19 @@ func NewLimitedSpaceDB(log *zap.Logger, db storagenode.DB, freeSpace int64) stor } // Pieces returns the blob store. -func (lim *limitedSpaceDB) Pieces() storage.Blobs { +func (lim *limitedSpaceDB) Pieces() blobstore.Blobs { return lim.blobs } // LimitedSpaceBlobs implements a limited space blob store. type LimitedSpaceBlobs struct { - storage.Blobs + blobstore.Blobs log *zap.Logger freeSpace int64 } // newLimitedSpaceBlobs creates a new limited space blob store wrapping the provided blobs. -func newLimitedSpaceBlobs(log *zap.Logger, blobs storage.Blobs, freeSpace int64) *LimitedSpaceBlobs { +func newLimitedSpaceBlobs(log *zap.Logger, blobs blobstore.Blobs, freeSpace int64) *LimitedSpaceBlobs { return &LimitedSpaceBlobs{ log: log, Blobs: blobs, diff --git a/private/testblobs/slow.go b/storagenode/blobstore/testblobs/slow.go similarity index 88% rename from private/testblobs/slow.go rename to storagenode/blobstore/testblobs/slow.go index 5689f6c3e..558e9ec66 100644 --- a/private/testblobs/slow.go +++ b/storagenode/blobstore/testblobs/slow.go @@ -12,8 +12,8 @@ import ( "go.uber.org/zap" "storj.io/common/storj" - "storj.io/storj/storage" "storj.io/storj/storagenode" + "storj.io/storj/storagenode/blobstore" ) // SlowDB implements slow storage node DB. @@ -34,7 +34,7 @@ func NewSlowDB(log *zap.Logger, db storagenode.DB) *SlowDB { } // Pieces returns the blob store. -func (slow *SlowDB) Pieces() storage.Blobs { +func (slow *SlowDB) Pieces() blobstore.Blobs { return slow.blobs } @@ -47,13 +47,13 @@ func (slow *SlowDB) SetLatency(delay time.Duration) { // SlowBlobs implements a slow blob store. type SlowBlobs struct { delay int64 // time.Duration - blobs storage.Blobs + blobs blobstore.Blobs log *zap.Logger } // newSlowBlobs creates a new slow blob store wrapping the provided blobs. // Use SetLatency to dynamically configure the latency of all operations. -func newSlowBlobs(log *zap.Logger, blobs storage.Blobs) *SlowBlobs { +func newSlowBlobs(log *zap.Logger, blobs blobstore.Blobs) *SlowBlobs { return &SlowBlobs{ log: log, blobs: blobs, @@ -62,7 +62,7 @@ func newSlowBlobs(log *zap.Logger, blobs storage.Blobs) *SlowBlobs { // Create creates a new blob that can be written optionally takes a size // argument for performance improvements, -1 is unknown size. -func (slow *SlowBlobs) Create(ctx context.Context, ref storage.BlobRef, size int64) (storage.BlobWriter, error) { +func (slow *SlowBlobs) Create(ctx context.Context, ref blobstore.BlobRef, size int64) (blobstore.BlobWriter, error) { if err := slow.sleep(ctx); err != nil { return nil, errs.Wrap(err) } @@ -75,7 +75,7 @@ func (slow *SlowBlobs) Close() error { } // Open opens a reader with the specified namespace and key. -func (slow *SlowBlobs) Open(ctx context.Context, ref storage.BlobRef) (storage.BlobReader, error) { +func (slow *SlowBlobs) Open(ctx context.Context, ref blobstore.BlobRef) (blobstore.BlobReader, error) { if err := slow.sleep(ctx); err != nil { return nil, errs.Wrap(err) } @@ -84,7 +84,7 @@ func (slow *SlowBlobs) Open(ctx context.Context, ref storage.BlobRef) (storage.B // OpenWithStorageFormat opens a reader for the already-located blob, avoiding the potential need // to check multiple storage formats to find the blob. -func (slow *SlowBlobs) OpenWithStorageFormat(ctx context.Context, ref storage.BlobRef, formatVer storage.FormatVersion) (storage.BlobReader, error) { +func (slow *SlowBlobs) OpenWithStorageFormat(ctx context.Context, ref blobstore.BlobRef, formatVer blobstore.FormatVersion) (blobstore.BlobReader, error) { if err := slow.sleep(ctx); err != nil { return nil, errs.Wrap(err) } @@ -92,7 +92,7 @@ func (slow *SlowBlobs) OpenWithStorageFormat(ctx context.Context, ref storage.Bl } // Trash deletes the blob with the namespace and key. -func (slow *SlowBlobs) Trash(ctx context.Context, ref storage.BlobRef) error { +func (slow *SlowBlobs) Trash(ctx context.Context, ref blobstore.BlobRef) error { if err := slow.sleep(ctx); err != nil { return errs.Wrap(err) } @@ -116,7 +116,7 @@ func (slow *SlowBlobs) EmptyTrash(ctx context.Context, namespace []byte, trashed } // Delete deletes the blob with the namespace and key. -func (slow *SlowBlobs) Delete(ctx context.Context, ref storage.BlobRef) error { +func (slow *SlowBlobs) Delete(ctx context.Context, ref blobstore.BlobRef) error { if err := slow.sleep(ctx); err != nil { return errs.Wrap(err) } @@ -124,7 +124,7 @@ func (slow *SlowBlobs) Delete(ctx context.Context, ref storage.BlobRef) error { } // DeleteWithStorageFormat deletes the blob with the namespace, key, and format version. -func (slow *SlowBlobs) DeleteWithStorageFormat(ctx context.Context, ref storage.BlobRef, formatVer storage.FormatVersion) error { +func (slow *SlowBlobs) DeleteWithStorageFormat(ctx context.Context, ref blobstore.BlobRef, formatVer blobstore.FormatVersion) error { if err := slow.sleep(ctx); err != nil { return errs.Wrap(err) } @@ -140,7 +140,7 @@ func (slow *SlowBlobs) DeleteNamespace(ctx context.Context, ref []byte) (err err } // Stat looks up disk metadata on the blob file. -func (slow *SlowBlobs) Stat(ctx context.Context, ref storage.BlobRef) (storage.BlobInfo, error) { +func (slow *SlowBlobs) Stat(ctx context.Context, ref blobstore.BlobRef) (blobstore.BlobInfo, error) { if err := slow.sleep(ctx); err != nil { return nil, errs.Wrap(err) } @@ -150,7 +150,7 @@ func (slow *SlowBlobs) Stat(ctx context.Context, ref storage.BlobRef) (storage.B // StatWithStorageFormat looks up disk metadata for the blob file with the given storage format // version. This avoids the potential need to check multiple storage formats for the blob // when the format is already known. -func (slow *SlowBlobs) StatWithStorageFormat(ctx context.Context, ref storage.BlobRef, formatVer storage.FormatVersion) (storage.BlobInfo, error) { +func (slow *SlowBlobs) StatWithStorageFormat(ctx context.Context, ref blobstore.BlobRef, formatVer blobstore.FormatVersion) (blobstore.BlobInfo, error) { if err := slow.sleep(ctx); err != nil { return nil, errs.Wrap(err) } @@ -160,7 +160,7 @@ func (slow *SlowBlobs) StatWithStorageFormat(ctx context.Context, ref storage.Bl // WalkNamespace executes walkFunc for each locally stored blob in the given namespace. // If walkFunc returns a non-nil error, WalkNamespace will stop iterating and return the // error immediately. -func (slow *SlowBlobs) WalkNamespace(ctx context.Context, namespace []byte, walkFunc func(storage.BlobInfo) error) error { +func (slow *SlowBlobs) WalkNamespace(ctx context.Context, namespace []byte, walkFunc func(blobstore.BlobInfo) error) error { if err := slow.sleep(ctx); err != nil { return errs.Wrap(err) } diff --git a/storagenode/gracefulexit/chore_test.go b/storagenode/gracefulexit/chore_test.go index ce72f3d94..2ef36688f 100644 --- a/storagenode/gracefulexit/chore_test.go +++ b/storagenode/gracefulexit/chore_test.go @@ -17,7 +17,7 @@ import ( "storj.io/common/testrand" "storj.io/storj/private/testplanet" "storj.io/storj/satellite/overlay" - "storj.io/storj/storage" + "storj.io/storj/storagenode/blobstore" ) func TestChore(t *testing.T) { @@ -129,7 +129,7 @@ func exitSatellite(ctx context.Context, t *testing.T, planet *testplanet.Planet, namespaces, err := exitingNode.DB.Pieces().ListNamespaces(ctx) require.NoError(t, err) for _, ns := range namespaces { - err = exitingNode.DB.Pieces().WalkNamespace(ctx, ns, func(blobInfo storage.BlobInfo) error { + err = exitingNode.DB.Pieces().WalkNamespace(ctx, ns, func(blobInfo blobstore.BlobInfo) error { return errs.New("found a piece on the node. this shouldn't happen.") }) require.NoError(t, err) @@ -146,7 +146,7 @@ func getNodePieceCounts(ctx context.Context, planet *testplanet.Planet) (_ map[s return nil, err } for _, ns := range namespaces { - err = node.DB.Pieces().WalkNamespace(ctx, ns, func(blobInfo storage.BlobInfo) error { + err = node.DB.Pieces().WalkNamespace(ctx, ns, func(blobInfo blobstore.BlobInfo) error { nodePieceCounts[node.ID()]++ return nil }) diff --git a/storagenode/gracefulexit/worker_test.go b/storagenode/gracefulexit/worker_test.go index 83891d386..a62f68c73 100644 --- a/storagenode/gracefulexit/worker_test.go +++ b/storagenode/gracefulexit/worker_test.go @@ -16,11 +16,11 @@ import ( "storj.io/common/rpc/rpcstatus" "storj.io/common/testcontext" "storj.io/common/testrand" - "storj.io/storj/private/testblobs" "storj.io/storj/private/testplanet" "storj.io/storj/satellite" "storj.io/storj/satellite/overlay" "storj.io/storj/storagenode" + "storj.io/storj/storagenode/blobstore/testblobs" "storj.io/storj/storagenode/gracefulexit" ) diff --git a/storagenode/peer.go b/storagenode/peer.go index 809d37131..35970224f 100644 --- a/storagenode/peer.go +++ b/storagenode/peer.go @@ -30,10 +30,10 @@ import ( "storj.io/storj/private/multinodepb" "storj.io/storj/private/server" "storj.io/storj/private/version/checker" - "storj.io/storj/storage" - "storj.io/storj/storage/filestore" "storj.io/storj/storagenode/apikeys" "storj.io/storj/storagenode/bandwidth" + "storj.io/storj/storagenode/blobstore" + "storj.io/storj/storagenode/blobstore/filestore" "storj.io/storj/storagenode/collector" "storj.io/storj/storagenode/console" "storj.io/storj/storagenode/console/consoleserver" @@ -80,7 +80,7 @@ type DB interface { // Close closes the database Close() error - Pieces() storage.Blobs + Pieces() blobstore.Blobs Orders() orders.DB V0PieceInfo() pieces.V0PieceInfoDB diff --git a/storagenode/pieces/cache.go b/storagenode/pieces/cache.go index b40e13657..2f60884f4 100644 --- a/storagenode/pieces/cache.go +++ b/storagenode/pieces/cache.go @@ -14,7 +14,7 @@ import ( "storj.io/common/storj" "storj.io/common/sync2" - "storj.io/storj/storage" + "storj.io/storj/storagenode/blobstore" ) // CacheService updates the space used cache. @@ -157,7 +157,7 @@ func (service *CacheService) Close() (err error) { // // architecture: Database type BlobsUsageCache struct { - storage.Blobs + blobstore.Blobs log *zap.Logger mu sync.Mutex @@ -168,7 +168,7 @@ type BlobsUsageCache struct { } // NewBlobsUsageCache creates a new disk blob store with a space used cache. -func NewBlobsUsageCache(log *zap.Logger, blob storage.Blobs) *BlobsUsageCache { +func NewBlobsUsageCache(log *zap.Logger, blob blobstore.Blobs) *BlobsUsageCache { return &BlobsUsageCache{ log: log, Blobs: blob, @@ -177,7 +177,7 @@ func NewBlobsUsageCache(log *zap.Logger, blob storage.Blobs) *BlobsUsageCache { } // NewBlobsUsageCacheTest creates a new disk blob store with a space used cache. -func NewBlobsUsageCacheTest(log *zap.Logger, blob storage.Blobs, piecesTotal, piecesContentSize, trashTotal int64, spaceUsedBySatellite map[storj.NodeID]SatelliteUsage) *BlobsUsageCache { +func NewBlobsUsageCacheTest(log *zap.Logger, blob blobstore.Blobs, piecesTotal, piecesContentSize, trashTotal int64, spaceUsedBySatellite map[storj.NodeID]SatelliteUsage) *BlobsUsageCache { return &BlobsUsageCache{ log: log, Blobs: blob, @@ -222,7 +222,7 @@ func (blobs *BlobsUsageCache) SpaceUsedForTrash(ctx context.Context) (int64, err // Delete gets the size of the piece that is going to be deleted then deletes it and // updates the space used cache accordingly. -func (blobs *BlobsUsageCache) Delete(ctx context.Context, blobRef storage.BlobRef) error { +func (blobs *BlobsUsageCache) Delete(ctx context.Context, blobRef blobstore.BlobRef) error { pieceTotal, pieceContentSize, err := blobs.pieceSizes(ctx, blobRef) if err != nil { return Error.Wrap(err) @@ -241,7 +241,7 @@ func (blobs *BlobsUsageCache) Delete(ctx context.Context, blobRef storage.BlobRe return nil } -func (blobs *BlobsUsageCache) pieceSizes(ctx context.Context, blobRef storage.BlobRef) (pieceTotal int64, pieceContentSize int64, err error) { +func (blobs *BlobsUsageCache) pieceSizes(ctx context.Context, blobRef blobstore.BlobRef) (pieceTotal int64, pieceContentSize int64, err error) { blobInfo, err := blobs.Stat(ctx, blobRef) if err != nil { return 0, 0, err @@ -286,7 +286,7 @@ func (blobs *BlobsUsageCache) ensurePositiveCacheValue(value *int64, name string } // Trash moves the ref to the trash and updates the cache. -func (blobs *BlobsUsageCache) Trash(ctx context.Context, blobRef storage.BlobRef) error { +func (blobs *BlobsUsageCache) Trash(ctx context.Context, blobRef blobstore.BlobRef) error { pieceTotal, pieceContentSize, err := blobs.pieceSizes(ctx, blobRef) if err != nil { return Error.Wrap(err) @@ -336,7 +336,7 @@ func (blobs *BlobsUsageCache) RestoreTrash(ctx context.Context, namespace []byte } for _, key := range keysRestored { - pieceTotal, pieceContentSize, sizeErr := blobs.pieceSizes(ctx, storage.BlobRef{ + pieceTotal, pieceContentSize, sizeErr := blobs.pieceSizes(ctx, blobstore.BlobRef{ Key: key, Namespace: namespace, }) @@ -492,9 +492,9 @@ func (blobs *BlobsUsageCache) Close() error { } // TestCreateV0 creates a new V0 blob that can be written. This is only appropriate in test situations. -func (blobs *BlobsUsageCache) TestCreateV0(ctx context.Context, ref storage.BlobRef) (_ storage.BlobWriter, err error) { +func (blobs *BlobsUsageCache) TestCreateV0(ctx context.Context, ref blobstore.BlobRef) (_ blobstore.BlobWriter, err error) { fStore := blobs.Blobs.(interface { - TestCreateV0(ctx context.Context, ref storage.BlobRef) (_ storage.BlobWriter, err error) + TestCreateV0(ctx context.Context, ref blobstore.BlobRef) (_ blobstore.BlobWriter, err error) }) return fStore.TestCreateV0(ctx, ref) } diff --git a/storagenode/pieces/cache_test.go b/storagenode/pieces/cache_test.go index 333d4b3ee..91608fa37 100644 --- a/storagenode/pieces/cache_test.go +++ b/storagenode/pieces/cache_test.go @@ -18,9 +18,9 @@ import ( "storj.io/common/testcontext" "storj.io/common/testrand" "storj.io/storj/private/testplanet" - "storj.io/storj/storage" - "storj.io/storj/storage/filestore" "storj.io/storj/storagenode" + "storj.io/storj/storagenode/blobstore" + "storj.io/storj/storagenode/blobstore/filestore" "storj.io/storj/storagenode/pieces" "storj.io/storj/storagenode/storagenodedb/storagenodedbtest" ) @@ -195,13 +195,13 @@ func TestCachServiceRun(t *testing.T) { storagenodedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db storagenode.DB) { spaceUsedDB := db.PieceSpaceUsedDB() - blobstore, err := filestore.NewAt(log, ctx.Dir(), filestore.DefaultConfig) + store, err := filestore.NewAt(log, ctx.Dir(), filestore.DefaultConfig) require.NoError(t, err) // Prior to initializing the cache service (which should walk the files), // write a single file so something exists to be counted expBlobSize := memory.KB - w, err := blobstore.Create(ctx, storage.BlobRef{ + w, err := store.Create(ctx, blobstore.BlobRef{ Namespace: testrand.NodeID().Bytes(), Key: testrand.PieceID().Bytes(), }, -1) @@ -212,19 +212,19 @@ func TestCachServiceRun(t *testing.T) { // Now write a piece that we are going to trash expTrashSize := 2 * memory.KB - trashRef := storage.BlobRef{ + trashRef := blobstore.BlobRef{ Namespace: testrand.NodeID().Bytes(), Key: testrand.PieceID().Bytes(), } - w, err = blobstore.Create(ctx, trashRef, -1) + w, err = store.Create(ctx, trashRef, -1) require.NoError(t, err) _, err = w.Write(testrand.Bytes(expTrashSize)) require.NoError(t, err) require.NoError(t, w.Commit(ctx)) - require.NoError(t, blobstore.Trash(ctx, trashRef)) // trash it + require.NoError(t, store.Trash(ctx, trashRef)) // trash it // Now instantiate the cache - cache := pieces.NewBlobsUsageCache(log, blobstore) + cache := pieces.NewBlobsUsageCache(log, store) cacheService := pieces.NewService(log, cache, pieces.NewStore(log, pieces.NewFileWalker(log, cache, nil), cache, nil, nil, spaceUsedDB, pieces.DefaultConfig), @@ -591,7 +591,7 @@ func TestCacheCreateDeleteAndTrash(t *testing.T) { cache := pieces.NewBlobsUsageCache(zaptest.NewLogger(t), db.Pieces()) pieceContent := []byte("stuff") satelliteID := testrand.NodeID() - refs := []storage.BlobRef{ + refs := []blobstore.BlobRef{ { Namespace: satelliteID.Bytes(), Key: testrand.Bytes(32), diff --git a/storagenode/pieces/deleter_test.go b/storagenode/pieces/deleter_test.go index 8880a1e79..0dd3285e9 100644 --- a/storagenode/pieces/deleter_test.go +++ b/storagenode/pieces/deleter_test.go @@ -15,8 +15,8 @@ import ( "storj.io/common/storj" "storj.io/common/testcontext" "storj.io/common/testrand" - "storj.io/storj/storage/filestore" "storj.io/storj/storagenode" + "storj.io/storj/storagenode/blobstore/filestore" "storj.io/storj/storagenode/pieces" "storj.io/storj/storagenode/storagenodedb/storagenodedbtest" ) diff --git a/storagenode/pieces/filewalker.go b/storagenode/pieces/filewalker.go index 155df623c..ce38c52fb 100644 --- a/storagenode/pieces/filewalker.go +++ b/storagenode/pieces/filewalker.go @@ -9,20 +9,20 @@ import ( "go.uber.org/zap" "storj.io/common/storj" - "storj.io/storj/storage" - "storj.io/storj/storage/filestore" + "storj.io/storj/storagenode/blobstore" + "storj.io/storj/storagenode/blobstore/filestore" ) // FileWalker implements methods to walk over pieces in a storage directory. type FileWalker struct { log *zap.Logger - blobs storage.Blobs + blobs blobstore.Blobs v0PieceInfo V0PieceInfoDB } // NewFileWalker creates a new FileWalker. -func NewFileWalker(log *zap.Logger, blobs storage.Blobs, db V0PieceInfoDB) *FileWalker { +func NewFileWalker(log *zap.Logger, blobs blobstore.Blobs, db V0PieceInfoDB) *FileWalker { return &FileWalker{ log: log, blobs: blobs, @@ -39,7 +39,7 @@ func NewFileWalker(log *zap.Logger, blobs storage.Blobs, db V0PieceInfoDB) *File func (fw *FileWalker) WalkSatellitePieces(ctx context.Context, satellite storj.NodeID, fn func(StoredPieceAccess) error) (err error) { defer mon.Task()(&ctx)(&err) // iterate over all in V1 storage, skipping v0 pieces - err = fw.blobs.WalkNamespace(ctx, satellite.Bytes(), func(blobInfo storage.BlobInfo) error { + err = fw.blobs.WalkNamespace(ctx, satellite.Bytes(), func(blobInfo blobstore.BlobInfo) error { if blobInfo.StorageFormatVersion() < filestore.FormatV1 { // skip v0 pieces, which are handled separately return nil diff --git a/storagenode/pieces/readwrite.go b/storagenode/pieces/readwrite.go index 49c71eb3b..9c9883ea7 100644 --- a/storagenode/pieces/readwrite.go +++ b/storagenode/pieces/readwrite.go @@ -15,8 +15,8 @@ import ( "storj.io/common/pb" "storj.io/common/storj" - "storj.io/storj/storage" - "storj.io/storj/storage/filestore" + "storj.io/storj/storagenode/blobstore" + "storj.io/storj/storagenode/blobstore/filestore" ) const ( @@ -63,16 +63,16 @@ var BadFormatVersion = errs.Class("Incompatible storage format version") type Writer struct { log *zap.Logger hash hash.Hash - blob storage.BlobWriter + blob blobstore.BlobWriter pieceSize int64 // piece size only; i.e., not including piece header - blobs storage.Blobs + blobs blobstore.Blobs satellite storj.NodeID closed bool } -// NewWriter creates a new writer for storage.BlobWriter. -func NewWriter(log *zap.Logger, blobWriter storage.BlobWriter, blobs storage.Blobs, satellite storj.NodeID, hashAlgorithm pb.PieceHashAlgorithm) (*Writer, error) { +// NewWriter creates a new writer for blobstore.BlobWriter. +func NewWriter(log *zap.Logger, blobWriter blobstore.BlobWriter, blobs blobstore.Blobs, satellite storj.NodeID, hashAlgorithm pb.PieceHashAlgorithm) (*Writer, error) { w := &Writer{log: log} if blobWriter.StorageFormatVersion() >= filestore.FormatV1 { // We skip past the reserved header area for now- we want the header to be at the @@ -211,15 +211,15 @@ func (w *Writer) Cancel(ctx context.Context) (err error) { // Reader implements a piece reader that reads content from blob store. type Reader struct { - formatVersion storage.FormatVersion + formatVersion blobstore.FormatVersion - blob storage.BlobReader + blob blobstore.BlobReader pos int64 // relative to file start; i.e., it includes piece header pieceSize int64 // piece size only; i.e., not including piece header } -// NewReader creates a new reader for storage.BlobReader. -func NewReader(blob storage.BlobReader) (*Reader, error) { +// NewReader creates a new reader for blobstore.BlobReader. +func NewReader(blob blobstore.BlobReader) (*Reader, error) { size, err := blob.Size() if err != nil { return nil, Error.Wrap(err) @@ -241,7 +241,7 @@ func NewReader(blob storage.BlobReader) (*Reader, error) { } // StorageFormatVersion returns the storage format version of the piece being read. -func (r *Reader) StorageFormatVersion() storage.FormatVersion { +func (r *Reader) StorageFormatVersion() blobstore.FormatVersion { return r.formatVersion } diff --git a/storagenode/pieces/readwrite_test.go b/storagenode/pieces/readwrite_test.go index 7cd8a3ec4..76c13d090 100644 --- a/storagenode/pieces/readwrite_test.go +++ b/storagenode/pieces/readwrite_test.go @@ -19,8 +19,8 @@ import ( "storj.io/common/storj" "storj.io/common/testcontext" "storj.io/common/testrand" - "storj.io/storj/storage" - "storj.io/storj/storage/filestore" + "storj.io/storj/storagenode/blobstore" + "storj.io/storj/storagenode/blobstore/filestore" "storj.io/storj/storagenode/pieces" ) @@ -158,7 +158,7 @@ func readAndWritePiece(t *testing.T, content []byte) { assert.Truef(t, header.OrderLimit.PieceExpiration.Equal(expirationTime), "*header.ExpirationTime = %s, but expected expirationTime = %s", header.OrderLimit.PieceExpiration, expirationTime) assert.Equal(t, pb.OrderLimit{PieceExpiration: expirationTime.UTC()}, header.OrderLimit) - assert.Equal(t, filestore.FormatV1, storage.FormatVersion(header.FormatVersion)) + assert.Equal(t, filestore.FormatV1, blobstore.FormatVersion(header.FormatVersion)) // make sure seek-nowhere works as expected after piece header is read too // (from the point of view of the piece store, the file position has not moved) diff --git a/storagenode/pieces/store.go b/storagenode/pieces/store.go index d7cc61955..c304ff211 100644 --- a/storagenode/pieces/store.go +++ b/storagenode/pieces/store.go @@ -17,8 +17,8 @@ import ( "storj.io/common/memory" "storj.io/common/pb" "storj.io/common/storj" - "storj.io/storj/storage" - "storj.io/storj/storage/filestore" + "storj.io/storj/storagenode/blobstore" + "storj.io/storj/storagenode/blobstore/filestore" ) var ( @@ -72,7 +72,7 @@ type PieceExpirationDB interface { // V0PieceInfoDB stores meta information about pieces stored with storage format V0 (where // metadata goes in the "pieceinfo" table in the storagenodedb). The actual pieces are stored -// behind something providing the storage.Blobs interface. +// behind something providing the blobstore.Blobs interface. // // architecture: Database type V0PieceInfoDB interface { @@ -90,7 +90,7 @@ type V0PieceInfoDB interface { // non-nil error, WalkSatelliteV0Pieces will stop iterating and return the error // immediately. The ctx parameter is intended specifically to allow canceling iteration // early. - WalkSatelliteV0Pieces(ctx context.Context, blobStore storage.Blobs, satellite storj.NodeID, walkFunc func(StoredPieceAccess) error) error + WalkSatelliteV0Pieces(ctx context.Context, blobStore blobstore.Blobs, satellite storj.NodeID, walkFunc func(StoredPieceAccess) error) error } // V0PieceInfoDBForTest is like V0PieceInfoDB, but adds on the Add() method so @@ -127,7 +127,7 @@ type PieceSpaceUsedDB interface { // StoredPieceAccess allows inspection and manipulation of a piece during iteration with // WalkSatellitePieces-type methods. type StoredPieceAccess interface { - storage.BlobInfo + blobstore.BlobInfo // PieceID gives the pieceID of the piece PieceID() storj.PieceID @@ -171,7 +171,7 @@ type Store struct { log *zap.Logger config Config - blobs storage.Blobs + blobs blobstore.Blobs expirationInfo PieceExpirationDB spaceUsedDB PieceSpaceUsedDB v0PieceInfo V0PieceInfoDB @@ -186,7 +186,7 @@ type StoreForTest struct { } // NewStore creates a new piece store. -func NewStore(log *zap.Logger, fw *FileWalker, blobs storage.Blobs, v0PieceInfo V0PieceInfoDB, expirationInfo PieceExpirationDB, spaceUsedDB PieceSpaceUsedDB, config Config) *Store { +func NewStore(log *zap.Logger, fw *FileWalker, blobs blobstore.Blobs, v0PieceInfo V0PieceInfoDB, expirationInfo PieceExpirationDB, spaceUsedDB PieceSpaceUsedDB, config Config) *Store { return &Store{ log: log, config: config, @@ -231,7 +231,7 @@ func (store *Store) VerifyStorageDirWithTimeout(ctx context.Context, id storj.No // Writer returns a new piece writer. func (store *Store) Writer(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID, hashAlgorithm pb.PieceHashAlgorithm) (_ *Writer, err error) { defer mon.Task()(&ctx)(&err) - blobWriter, err := store.blobs.Create(ctx, storage.BlobRef{ + blobWriter, err := store.blobs.Create(ctx, blobstore.BlobRef{ Namespace: satellite.Bytes(), Key: pieceID.Bytes(), }, store.config.WritePreallocSize.Int64()) @@ -247,19 +247,19 @@ func (store *Store) Writer(ctx context.Context, satellite storj.NodeID, pieceID // This is meant to be used externally only in test situations (thus the StoreForTest receiver // type). func (store StoreForTest) WriterForFormatVersion(ctx context.Context, satellite storj.NodeID, - pieceID storj.PieceID, formatVersion storage.FormatVersion, hashAlgorithm pb.PieceHashAlgorithm) (_ *Writer, err error) { + pieceID storj.PieceID, formatVersion blobstore.FormatVersion, hashAlgorithm pb.PieceHashAlgorithm) (_ *Writer, err error) { defer mon.Task()(&ctx)(&err) - blobRef := storage.BlobRef{ + blobRef := blobstore.BlobRef{ Namespace: satellite.Bytes(), Key: pieceID.Bytes(), } - var blobWriter storage.BlobWriter + var blobWriter blobstore.BlobWriter switch formatVersion { case filestore.FormatV0: fStore, ok := store.blobs.(interface { - TestCreateV0(ctx context.Context, ref storage.BlobRef) (_ storage.BlobWriter, err error) + TestCreateV0(ctx context.Context, ref blobstore.BlobRef) (_ blobstore.BlobWriter, err error) }) if !ok { return nil, Error.New("can't make a WriterForFormatVersion with this blob store (%T)", store.blobs) @@ -280,10 +280,10 @@ func (store StoreForTest) WriterForFormatVersion(ctx context.Context, satellite // ReaderWithStorageFormat returns a new piece reader for a located piece, which avoids the // potential need to check multiple storage formats to find the right blob. func (store *StoreForTest) ReaderWithStorageFormat(ctx context.Context, satellite storj.NodeID, - pieceID storj.PieceID, formatVersion storage.FormatVersion) (_ *Reader, err error) { + pieceID storj.PieceID, formatVersion blobstore.FormatVersion) (_ *Reader, err error) { defer mon.Task()(&ctx)(&err) - ref := storage.BlobRef{Namespace: satellite.Bytes(), Key: pieceID.Bytes()} + ref := blobstore.BlobRef{Namespace: satellite.Bytes(), Key: pieceID.Bytes()} blob, err := store.blobs.OpenWithStorageFormat(ctx, ref, formatVersion) if err != nil { if os.IsNotExist(err) { @@ -299,7 +299,7 @@ func (store *StoreForTest) ReaderWithStorageFormat(ctx context.Context, satellit // Reader returns a new piece reader. func (store *Store) Reader(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID) (_ *Reader, err error) { defer mon.Task()(&ctx)(&err) - blob, err := store.blobs.Open(ctx, storage.BlobRef{ + blob, err := store.blobs.Open(ctx, blobstore.BlobRef{ Namespace: satellite.Bytes(), Key: pieceID.Bytes(), }) @@ -317,7 +317,7 @@ func (store *Store) Reader(ctx context.Context, satellite storj.NodeID, pieceID // Delete deletes the specified piece. func (store *Store) Delete(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID) (err error) { defer mon.Task()(&ctx)(&err) - err = store.blobs.Delete(ctx, storage.BlobRef{ + err = store.blobs.Delete(ctx, blobstore.BlobRef{ Namespace: satellite.Bytes(), Key: pieceID.Bytes(), }) @@ -366,7 +366,7 @@ func (store *Store) Trash(ctx context.Context, satellite storj.NodeID, pieceID s // Check if the MaxFormatVersionSupported piece exists. If not, we assume // this is an old piece version and attempt to migrate it. - _, err = store.blobs.StatWithStorageFormat(ctx, storage.BlobRef{ + _, err = store.blobs.StatWithStorageFormat(ctx, blobstore.BlobRef{ Namespace: satellite.Bytes(), Key: pieceID.Bytes(), }, filestore.MaxFormatVersionSupported) @@ -386,7 +386,7 @@ func (store *Store) Trash(ctx context.Context, satellite storj.NodeID, pieceID s } err = store.expirationInfo.Trash(ctx, satellite, pieceID) - err = errs.Combine(err, store.blobs.Trash(ctx, storage.BlobRef{ + err = errs.Combine(err, store.blobs.Trash(ctx, blobstore.BlobRef{ Namespace: satellite.Bytes(), Key: pieceID.Bytes(), })) @@ -471,7 +471,7 @@ func (store *Store) MigrateV0ToV1(ctx context.Context, satelliteID storj.NodeID, return Error.Wrap(err) } - err = store.blobs.DeleteWithStorageFormat(ctx, storage.BlobRef{ + err = store.blobs.DeleteWithStorageFormat(ctx, blobstore.BlobRef{ Namespace: satelliteID.Bytes(), Key: pieceID.Bytes(), }, filestore.FormatV0) @@ -751,20 +751,20 @@ func (store *Store) CheckWritabilityWithTimeout(ctx context.Context, timeout tim } // Stat looks up disk metadata on the blob file. -func (store *Store) Stat(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID) (storage.BlobInfo, error) { - return store.blobs.Stat(ctx, storage.BlobRef{ +func (store *Store) Stat(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID) (blobstore.BlobInfo, error) { + return store.blobs.Stat(ctx, blobstore.BlobRef{ Namespace: satellite.Bytes(), Key: pieceID.Bytes(), }) } type storedPieceAccess struct { - storage.BlobInfo + blobstore.BlobInfo pieceID storj.PieceID - blobs storage.Blobs + blobs blobstore.Blobs } -func newStoredPieceAccess(blobs storage.Blobs, blobInfo storage.BlobInfo) (storedPieceAccess, error) { +func newStoredPieceAccess(blobs blobstore.Blobs, blobInfo blobstore.BlobInfo) (storedPieceAccess, error) { ref := blobInfo.BlobRef() pieceID, err := storj.PieceIDFromBytes(ref.Key) if err != nil { diff --git a/storagenode/pieces/store_test.go b/storagenode/pieces/store_test.go index d18fab0d7..c0c69acf6 100644 --- a/storagenode/pieces/store_test.go +++ b/storagenode/pieces/store_test.go @@ -26,9 +26,9 @@ import ( "storj.io/common/storj" "storj.io/common/testcontext" "storj.io/common/testrand" - "storj.io/storj/storage" - "storj.io/storj/storage/filestore" "storj.io/storj/storagenode" + "storj.io/storj/storagenode/blobstore" + "storj.io/storj/storagenode/blobstore/filestore" "storj.io/storj/storagenode/pieces" "storj.io/storj/storagenode/storagenodedb/storagenodedbtest" "storj.io/storj/storagenode/trust" @@ -144,7 +144,7 @@ func TestPieces(t *testing.T) { } } -func writeAPiece(ctx context.Context, t testing.TB, store *pieces.Store, satelliteID storj.NodeID, pieceID storj.PieceID, data []byte, atTime time.Time, expireTime *time.Time, formatVersion storage.FormatVersion) { +func writeAPiece(ctx context.Context, t testing.TB, store *pieces.Store, satelliteID storj.NodeID, pieceID storj.PieceID, data []byte, atTime time.Time, expireTime *time.Time, formatVersion blobstore.FormatVersion) { tStore := &pieces.StoreForTest{store} writer, err := tStore.WriterForFormatVersion(ctx, satelliteID, pieceID, formatVersion, pb.PieceHashAlgorithm_SHA256) require.NoError(t, err) @@ -165,18 +165,18 @@ func writeAPiece(ctx context.Context, t testing.TB, store *pieces.Store, satelli require.NoError(t, err) } -func verifyPieceHandle(t testing.TB, reader *pieces.Reader, expectDataLen int, expectCreateTime time.Time, expectFormat storage.FormatVersion) { +func verifyPieceHandle(t testing.TB, reader *pieces.Reader, expectDataLen int, expectCreateTime time.Time, expectFormat blobstore.FormatVersion) { assert.Equal(t, expectFormat, reader.StorageFormatVersion()) assert.Equal(t, int64(expectDataLen), reader.Size()) if expectFormat != filestore.FormatV0 { pieceHeader, err := reader.GetPieceHeader() require.NoError(t, err) - assert.Equal(t, expectFormat, storage.FormatVersion(pieceHeader.FormatVersion)) + assert.Equal(t, expectFormat, blobstore.FormatVersion(pieceHeader.FormatVersion)) assert.True(t, expectCreateTime.Equal(pieceHeader.CreationTime)) } } -func tryOpeningAPiece(ctx context.Context, t testing.TB, store *pieces.StoreForTest, satelliteID storj.NodeID, pieceID storj.PieceID, expectDataLen int, expectTime time.Time, expectFormat storage.FormatVersion) { +func tryOpeningAPiece(ctx context.Context, t testing.TB, store *pieces.StoreForTest, satelliteID storj.NodeID, pieceID storj.PieceID, expectDataLen int, expectTime time.Time, expectFormat blobstore.FormatVersion) { reader, err := store.Reader(ctx, satelliteID, pieceID) require.NoError(t, err) verifyPieceHandle(t, reader, expectDataLen, expectTime, expectFormat) @@ -191,7 +191,7 @@ func tryOpeningAPiece(ctx context.Context, t testing.TB, store *pieces.StoreForT func TestTrashAndRestore(t *testing.T) { type testfile struct { data []byte - formatVer storage.FormatVersion + formatVer blobstore.FormatVersion } type testpiece struct { pieceID storj.PieceID @@ -476,7 +476,7 @@ func TestTrashAndRestore(t *testing.T) { }) } -func verifyPieceData(ctx context.Context, t testing.TB, store *pieces.StoreForTest, satelliteID storj.NodeID, pieceID storj.PieceID, formatVer storage.FormatVersion, expected []byte, expiration time.Time, publicKey storj.PiecePublicKey) { +func verifyPieceData(ctx context.Context, t testing.TB, store *pieces.StoreForTest, satelliteID storj.NodeID, pieceID storj.PieceID, formatVer blobstore.FormatVersion, expected []byte, expiration time.Time, publicKey storj.PiecePublicKey) { r, err := store.ReaderWithStorageFormat(ctx, satelliteID, pieceID, formatVer) require.NoError(t, err) diff --git a/storagenode/piecestore/endpoint_test.go b/storagenode/piecestore/endpoint_test.go index 6787f6497..b6be95af2 100644 --- a/storagenode/piecestore/endpoint_test.go +++ b/storagenode/piecestore/endpoint_test.go @@ -26,10 +26,10 @@ import ( "storj.io/common/storj" "storj.io/common/testcontext" "storj.io/common/testrand" - "storj.io/storj/private/testblobs" "storj.io/storj/private/testplanet" "storj.io/storj/storagenode" "storj.io/storj/storagenode/bandwidth" + "storj.io/storj/storagenode/blobstore/testblobs" "storj.io/uplink/private/piecestore" ) diff --git a/storagenode/retain/retain_test.go b/storagenode/retain/retain_test.go index 071cca46b..85756678a 100644 --- a/storagenode/retain/retain_test.go +++ b/storagenode/retain/retain_test.go @@ -21,9 +21,9 @@ import ( "storj.io/common/storj" "storj.io/common/testcontext" "storj.io/common/testrand" - "storj.io/storj/storage" - "storj.io/storj/storage/filestore" "storj.io/storj/storagenode" + "storj.io/storj/storagenode/blobstore" + "storj.io/storj/storagenode/blobstore/filestore" "storj.io/storj/storagenode/pieces" "storj.io/storj/storagenode/retain" "storj.io/storj/storagenode/storagenodedb/storagenodedbtest" @@ -59,7 +59,7 @@ func TestRetainPieces(t *testing.T) { // keep pieceIDs[numPiecesToKeep+numOldPieces : numPieces] (recent + not in filter) // add all pieces to the node pieces info DB - but only count piece ids in filter for index, id := range pieceIDs { - var formatVer storage.FormatVersion + var formatVer blobstore.FormatVersion if index%2 == 0 { formatVer = filestore.FormatV0 } else { diff --git a/storagenode/storagenodedb/database.go b/storagenode/storagenodedb/database.go index 7ef01e1a7..c92d953a4 100644 --- a/storagenode/storagenodedb/database.go +++ b/storagenode/storagenodedb/database.go @@ -23,10 +23,10 @@ import ( "storj.io/private/dbutil/sqliteutil" "storj.io/private/tagsql" "storj.io/storj/private/migrate" - "storj.io/storj/storage" - "storj.io/storj/storage/filestore" "storj.io/storj/storagenode/apikeys" "storj.io/storj/storagenode/bandwidth" + "storj.io/storj/storagenode/blobstore" + "storj.io/storj/storagenode/blobstore/filestore" "storj.io/storj/storagenode/notifications" "storj.io/storj/storagenode/orders" "storj.io/storj/storagenode/payouts" @@ -92,7 +92,7 @@ type DB struct { log *zap.Logger config Config - pieces storage.Blobs + pieces blobstore.Blobs dbDirectory string @@ -512,7 +512,7 @@ func (db *DB) Orders() orders.DB { } // Pieces returns blob storage for pieces. -func (db *DB) Pieces() storage.Blobs { +func (db *DB) Pieces() blobstore.Blobs { return db.pieces } diff --git a/storagenode/storagenodedb/migrations_test.go b/storagenode/storagenodedb/migrations_test.go index ba0f97203..601885e18 100644 --- a/storagenode/storagenodedb/migrations_test.go +++ b/storagenode/storagenodedb/migrations_test.go @@ -16,7 +16,7 @@ import ( "storj.io/common/testcontext" "storj.io/private/dbutil/dbschema" "storj.io/private/dbutil/sqliteutil" - "storj.io/storj/storage/filestore" + "storj.io/storj/storagenode/blobstore/filestore" "storj.io/storj/storagenode/storagenodedb" "storj.io/storj/storagenode/storagenodedb/testdata" ) diff --git a/storagenode/storagenodedb/pieceinfo.go b/storagenode/storagenodedb/pieceinfo.go index d38874c10..ba7e13655 100644 --- a/storagenode/storagenodedb/pieceinfo.go +++ b/storagenode/storagenodedb/pieceinfo.go @@ -12,8 +12,8 @@ import ( "storj.io/common/pb" "storj.io/common/storj" - "storj.io/storj/storage" - "storj.io/storj/storage/filestore" + "storj.io/storj/storagenode/blobstore" + "storj.io/storj/storagenode/blobstore/filestore" "storj.io/storj/storagenode/pieces" ) @@ -57,7 +57,7 @@ func (db *v0PieceInfoDB) Add(ctx context.Context, info *pieces.Info) (err error) return ErrPieceInfo.Wrap(err) } -func (db *v0PieceInfoDB) getAllPiecesOwnedBy(ctx context.Context, blobStore storage.Blobs, satelliteID storj.NodeID) ([]v0StoredPieceAccess, error) { +func (db *v0PieceInfoDB) getAllPiecesOwnedBy(ctx context.Context, blobStore blobstore.Blobs, satelliteID storj.NodeID) ([]v0StoredPieceAccess, error) { rows, err := db.QueryContext(ctx, ` SELECT piece_id, piece_size, piece_creation, piece_expiration FROM pieceinfo_ @@ -90,7 +90,7 @@ func (db *v0PieceInfoDB) getAllPiecesOwnedBy(ctx context.Context, blobStore stor // // If blobStore is nil, the .Stat() and .FullPath() methods of the provided StoredPieceAccess // instances will not work, but otherwise everything should be ok. -func (db *v0PieceInfoDB) WalkSatelliteV0Pieces(ctx context.Context, blobStore storage.Blobs, satelliteID storj.NodeID, walkFunc func(pieces.StoredPieceAccess) error) (err error) { +func (db *v0PieceInfoDB) WalkSatelliteV0Pieces(ctx context.Context, blobStore blobstore.Blobs, satelliteID storj.NodeID, walkFunc func(pieces.StoredPieceAccess) error) (err error) { defer mon.Task()(&ctx)(&err) // TODO: is it worth paging this query? we hope that SNs will not yet have too many V0 pieces. @@ -206,13 +206,13 @@ func (db *v0PieceInfoDB) GetExpired(ctx context.Context, expiredAt time.Time, li } type v0StoredPieceAccess struct { - blobStore storage.Blobs + blobStore blobstore.Blobs satellite storj.NodeID pieceID storj.PieceID pieceSize int64 creationTime time.Time expirationTime *time.Time - blobInfo storage.BlobInfo + blobInfo blobstore.BlobInfo } // PieceID returns the piece ID for the piece. @@ -225,9 +225,9 @@ func (v0Access *v0StoredPieceAccess) Satellite() (storj.NodeID, error) { return v0Access.satellite, nil } -// BlobRef returns the relevant storage.BlobRef locator for the piece. -func (v0Access *v0StoredPieceAccess) BlobRef() storage.BlobRef { - return storage.BlobRef{ +// BlobRef returns the relevant blobstore.BlobRef locator for the piece. +func (v0Access *v0StoredPieceAccess) BlobRef() blobstore.BlobRef { + return blobstore.BlobRef{ Namespace: v0Access.satellite.Bytes(), Key: v0Access.pieceID.Bytes(), } @@ -274,7 +274,7 @@ func (v0Access *v0StoredPieceAccess) FullPath(ctx context.Context) (string, erro } // StorageFormatVersion indicates the storage format version used to store the piece. -func (v0Access *v0StoredPieceAccess) StorageFormatVersion() storage.FormatVersion { +func (v0Access *v0StoredPieceAccess) StorageFormatVersion() blobstore.FormatVersion { return filestore.FormatV0 } diff --git a/storagenode/storagenodedb/storagenodedbtest/run_test.go b/storagenode/storagenodedb/storagenodedbtest/run_test.go index 96ed2ccb1..08810dead 100644 --- a/storagenode/storagenodedb/storagenodedbtest/run_test.go +++ b/storagenode/storagenodedb/storagenodedbtest/run_test.go @@ -22,8 +22,8 @@ import ( "storj.io/common/testcontext" "storj.io/common/testrand" "storj.io/common/uuid" - "storj.io/storj/storage/filestore" "storj.io/storj/storagenode" + "storj.io/storj/storagenode/blobstore/filestore" "storj.io/storj/storagenode/orders/ordersfile" "storj.io/storj/storagenode/storagenodedb" "storj.io/storj/storagenode/storagenodedb/storagenodedbtest" diff --git a/storagenode/storagenodedb/storagenodedbtest/snapshot_test.go b/storagenode/storagenodedb/storagenodedbtest/snapshot_test.go index 854b5667c..80a2e29e3 100644 --- a/storagenode/storagenodedb/storagenodedbtest/snapshot_test.go +++ b/storagenode/storagenodedb/storagenodedbtest/snapshot_test.go @@ -16,7 +16,7 @@ import ( "storj.io/common/testcontext" "storj.io/private/tagsql" - "storj.io/storj/storage/filestore" + "storj.io/storj/storagenode/blobstore/filestore" "storj.io/storj/storagenode/storagenodedb" )