storagenode: Include trash space when calculating space used

This commit adds functionality to include the space used in the trash
directory when calculating available space on the node.

It also includes this trash value in the space used cache, with methods
to keep the cache up-to-date as files are trashed, restored, and
emptied.

As part of the commit, the RestoreTrash and EmptyTrash methods have
slightly changed signatures. RestoreTrash now also returns the keys that
were restored, while EmptyTrash also returns the total disk space
recovered. Each of these changes makes it possible to keep the cache
up-to-date and know how much space is being used/recovered.

Also changed is the signature of the PieceStoreAccess.ContentSize method.
Previously this method returned only the content size of the blob,
excluding the size of any header data. The method has been renamed
`Size` and now returns both the full disk size and the content size of
the blob. This allows us to stat the file only once, and in some
instances (e.g. the cache) knowing the full file size is useful.

Note: This commit simply adds the trash size data to the piece size data
we were already collecting. The piece size data is not accurate for all
use-cases (e.g. because it does not contain piece header data); however,
this commit does not fix that problem. Now that the ContentSize (Size)
method returns the full size of the file, it should be easier to fix
this problem in a future commit.

Change-Id: I4a6cae09e262c8452a618116d1dc66b687f59f85
This commit is contained in:
Isaac Hess 2019-12-21 06:11:24 -07:00
parent 3849b9a21a
commit 7d1e28ea30
17 changed files with 678 additions and 229 deletions

View File

@ -99,17 +99,17 @@ func (bad *BadBlobs) Trash(ctx context.Context, ref storage.BlobRef) error {
}
// RestoreTrash restores all files in the trash.
func (bad *BadBlobs) RestoreTrash(ctx context.Context, namespace []byte) error {
func (bad *BadBlobs) RestoreTrash(ctx context.Context, namespace []byte) ([][]byte, error) {
if bad.err != nil {
return bad.err
return nil, bad.err
}
return bad.blobs.RestoreTrash(ctx, namespace)
}
// EmptyTrash empties the trash
func (bad *BadBlobs) EmptyTrash(ctx context.Context, namespace []byte, trashedBefore time.Time) ([][]byte, error) {
func (bad *BadBlobs) EmptyTrash(ctx context.Context, namespace []byte, trashedBefore time.Time) (int64, [][]byte, error) {
if bad.err != nil {
return nil, bad.err
return 0, nil, bad.err
}
return bad.blobs.EmptyTrash(ctx, namespace, trashedBefore)
}
@ -174,20 +174,28 @@ func (bad *BadBlobs) FreeSpace() (int64, error) {
return bad.blobs.FreeSpace()
}
// SpaceUsed adds up how much is used in all namespaces.
func (bad *BadBlobs) SpaceUsed(ctx context.Context) (int64, error) {
// SpaceUsedForBlobs adds up how much is used in all namespaces.
func (bad *BadBlobs) SpaceUsedForBlobs(ctx context.Context) (int64, error) {
if bad.err != nil {
return 0, bad.err
}
return bad.blobs.SpaceUsed(ctx)
return bad.blobs.SpaceUsedForBlobs(ctx)
}
// SpaceUsedInNamespace adds up how much is used in the given namespace.
func (bad *BadBlobs) SpaceUsedInNamespace(ctx context.Context, namespace []byte) (int64, error) {
// SpaceUsedForBlobsInNamespace adds up how much is used in the given namespace.
func (bad *BadBlobs) SpaceUsedForBlobsInNamespace(ctx context.Context, namespace []byte) (int64, error) {
if bad.err != nil {
return 0, bad.err
}
return bad.blobs.SpaceUsedInNamespace(ctx, namespace)
return bad.blobs.SpaceUsedForBlobsInNamespace(ctx, namespace)
}
// SpaceUsedForTrash returns the total space used by the trash, delegating
// to the wrapped blob store unless a forced error has been configured.
func (bad *BadBlobs) SpaceUsedForTrash(ctx context.Context) (int64, error) {
	if err := bad.err; err != nil {
		return 0, err
	}
	return bad.blobs.SpaceUsedForTrash(ctx)
}
// SetError configures the blob store to return a specific error for all operations.

View File

@ -90,13 +90,13 @@ func (slow *SlowBlobs) Trash(ctx context.Context, ref storage.BlobRef) error {
}
// RestoreTrash restores all files in the trash
func (slow *SlowBlobs) RestoreTrash(ctx context.Context, namespace []byte) error {
func (slow *SlowBlobs) RestoreTrash(ctx context.Context, namespace []byte) ([][]byte, error) {
slow.sleep()
return slow.blobs.RestoreTrash(ctx, namespace)
}
// EmptyTrash empties the trash
func (slow *SlowBlobs) EmptyTrash(ctx context.Context, namespace []byte, trashedBefore time.Time) ([][]byte, error) {
func (slow *SlowBlobs) EmptyTrash(ctx context.Context, namespace []byte, trashedBefore time.Time) (int64, [][]byte, error) {
slow.sleep()
return slow.blobs.EmptyTrash(ctx, namespace, trashedBefore)
}
@ -146,16 +146,22 @@ func (slow *SlowBlobs) FreeSpace() (int64, error) {
return slow.blobs.FreeSpace()
}
// SpaceUsed adds up how much is used in all namespaces
func (slow *SlowBlobs) SpaceUsed(ctx context.Context) (int64, error) {
// SpaceUsedForBlobs adds up how much is used in all namespaces
func (slow *SlowBlobs) SpaceUsedForBlobs(ctx context.Context) (int64, error) {
slow.sleep()
return slow.blobs.SpaceUsed(ctx)
return slow.blobs.SpaceUsedForBlobs(ctx)
}
// SpaceUsedInNamespace adds up how much is used in the given namespace
func (slow *SlowBlobs) SpaceUsedInNamespace(ctx context.Context, namespace []byte) (int64, error) {
// SpaceUsedForBlobsInNamespace adds up how much is used in the given namespace
func (slow *SlowBlobs) SpaceUsedForBlobsInNamespace(ctx context.Context, namespace []byte) (int64, error) {
slow.sleep()
return slow.blobs.SpaceUsedInNamespace(ctx, namespace)
return slow.blobs.SpaceUsedForBlobsInNamespace(ctx, namespace)
}
// SpaceUsedForTrash reports the total space consumed by the trash,
// injecting the configured latency before delegating to the wrapped store.
func (slow *SlowBlobs) SpaceUsedForTrash(ctx context.Context) (total int64, err error) {
	slow.sleep()
	total, err = slow.blobs.SpaceUsedForTrash(ctx)
	return total, err
}
// SetLatency configures the blob store to sleep for delay duration for all

View File

@ -81,22 +81,24 @@ type Blobs interface {
DeleteWithStorageFormat(ctx context.Context, ref BlobRef, formatVer FormatVersion) error
// Trash marks a file for pending deletion
Trash(ctx context.Context, ref BlobRef) error
// RestoreTrash restores all files in the trash for a given namespace
RestoreTrash(ctx context.Context, namespace []byte) error
// EmptyTrash removes all files in trash that were moved to trash prior to trashedBefore
EmptyTrash(ctx context.Context, namespace []byte, trashedBefore time.Time) ([][]byte, error)
// RestoreTrash restores all files in the trash for a given namespace and returns the keys restored
RestoreTrash(ctx context.Context, namespace []byte) ([][]byte, error)
// EmptyTrash removes all files in trash that were moved to trash prior to trashedBefore and returns the total bytes emptied and keys deleted
EmptyTrash(ctx context.Context, namespace []byte, trashedBefore time.Time) (int64, [][]byte, error)
// Stat looks up disk metadata on the blob file
Stat(ctx context.Context, ref BlobRef) (BlobInfo, error)
// StatWithStorageFormat looks up disk metadata for the blob file with the given storage format
// version. This avoids the potential need to check multiple storage formats for the blob
// when the format is already known.
StatWithStorageFormat(ctx context.Context, ref BlobRef, formatVer FormatVersion) (BlobInfo, error)
// FreeSpace return how much free space left for writing
// FreeSpace return how much free space is available to the blobstore
FreeSpace() (int64, error)
// SpaceUsed adds up how much is used in all namespaces
SpaceUsed(ctx context.Context) (int64, error)
// SpaceUsedInNamespace adds up how much is used in the given namespace
SpaceUsedInNamespace(ctx context.Context, namespace []byte) (int64, error)
// SpaceUsedForTrash returns the total space used by the trash
SpaceUsedForTrash(ctx context.Context) (int64, error)
// SpaceUsedForBlobs adds up how much is used in all namespaces
SpaceUsedForBlobs(ctx context.Context) (int64, error)
// SpaceUsedForBlobsInNamespace adds up how much is used in the given namespace
SpaceUsedForBlobsInNamespace(ctx context.Context, namespace []byte) (int64, error)
// ListNamespaces finds all namespaces in which keys might currently be stored.
ListNamespaces(ctx context.Context) ([][]byte, error)
// WalkNamespace executes walkFunc for each locally stored blob, stored with

View File

@ -359,8 +359,8 @@ func (dir *Dir) ReplaceTrashnow(trashnow func() time.Time) {
}
// RestoreTrash moves every piece in the trash folder back into blobsdir
func (dir *Dir) RestoreTrash(ctx context.Context, namespace []byte) (err error) {
return dir.walkNamespaceInPath(ctx, namespace, dir.trashdir(), func(info storage.BlobInfo) error {
func (dir *Dir) RestoreTrash(ctx context.Context, namespace []byte) (keysRestored [][]byte, err error) {
err = dir.walkNamespaceInPath(ctx, namespace, dir.trashdir(), func(info storage.BlobInfo) error {
blobsBasePath, err := dir.blobToBasePath(info.BlobRef())
if err != nil {
return err
@ -389,14 +389,20 @@ func (dir *Dir) RestoreTrash(ctx context.Context, namespace []byte) (err error)
// by callers to return a nil error in the case of concurrent calls.)
return nil
}
return err
if err != nil {
return err
}
keysRestored = append(keysRestored, info.BlobRef().Key)
return nil
})
return keysRestored, err
}
// EmptyTrash walks the trash files for the given namespace and deletes any
// file whose mtime is older than trashedBefore. The mtime is modified when
// Trash is called.
func (dir *Dir) EmptyTrash(ctx context.Context, namespace []byte, trashedBefore time.Time) (deletedKeys [][]byte, err error) {
func (dir *Dir) EmptyTrash(ctx context.Context, namespace []byte, trashedBefore time.Time) (bytesEmptied int64, deletedKeys [][]byte, err error) {
defer mon.Task()(&ctx)(&err)
err = dir.walkNamespaceInPath(ctx, namespace, dir.trashdir(), func(blobInfo storage.BlobInfo) error {
fileInfo, err := blobInfo.Stat(ctx)
@ -411,13 +417,14 @@ func (dir *Dir) EmptyTrash(ctx context.Context, namespace []byte, trashedBefore
return err
}
deletedKeys = append(deletedKeys, blobInfo.BlobRef().Key)
bytesEmptied += fileInfo.Size()
}
return nil
})
if err != nil {
return nil, err
return 0, nil, err
}
return deletedKeys, nil
return bytesEmptied, deletedKeys, nil
}
// iterateStorageFormatVersions executes f for all storage format versions,

View File

@ -6,6 +6,7 @@ package filestore
import (
"context"
"os"
"path/filepath"
"time"
"github.com/zeebo/errs"
@ -109,25 +110,21 @@ func (store *blobStore) DeleteWithStorageFormat(ctx context.Context, ref storage
// Trash moves the ref to a trash directory
func (store *blobStore) Trash(ctx context.Context, ref storage.BlobRef) (err error) {
defer mon.Task()(&ctx)(&err)
err = store.dir.Trash(ctx, ref)
return Error.Wrap(err)
return Error.Wrap(store.dir.Trash(ctx, ref))
}
// RestoreTrash moves every piece in the trash back into the regular location
func (store *blobStore) RestoreTrash(ctx context.Context, namespace []byte) (err error) {
func (store *blobStore) RestoreTrash(ctx context.Context, namespace []byte) (keysRestored [][]byte, err error) {
defer mon.Task()(&ctx)(&err)
err = store.dir.RestoreTrash(ctx, namespace)
return Error.Wrap(err)
keysRestored, err = store.dir.RestoreTrash(ctx, namespace)
return keysRestored, Error.Wrap(err)
}
// // EmptyTrash removes all files in trash that have been there longer than trashExpiryDur
func (store *blobStore) EmptyTrash(ctx context.Context, namespace []byte, trashedBefore time.Time) (keys [][]byte, err error) {
func (store *blobStore) EmptyTrash(ctx context.Context, namespace []byte, trashedBefore time.Time) (bytesEmptied int64, keys [][]byte, err error) {
defer mon.Task()(&ctx)(&err)
keys, err = store.dir.EmptyTrash(ctx, namespace, trashedBefore)
if err != nil {
return nil, Error.Wrap(err)
}
return keys, nil
bytesEmptied, keys, err = store.dir.EmptyTrash(ctx, namespace, trashedBefore)
return bytesEmptied, keys, Error.Wrap(err)
}
// GarbageCollect tries to delete any files that haven't yet been deleted
@ -148,8 +145,8 @@ func (store *blobStore) Create(ctx context.Context, ref storage.BlobRef, size in
return newBlobWriter(ref, store, MaxFormatVersionSupported, file), nil
}
// SpaceUsed adds up the space used in all namespaces for blob storage
func (store *blobStore) SpaceUsed(ctx context.Context) (space int64, err error) {
// SpaceUsedForBlobs adds up the space used in all namespaces for blob storage
func (store *blobStore) SpaceUsedForBlobs(ctx context.Context) (space int64, err error) {
defer mon.Task()(&ctx)(&err)
var totalSpaceUsed int64
@ -158,7 +155,7 @@ func (store *blobStore) SpaceUsed(ctx context.Context) (space int64, err error)
return 0, Error.New("failed to enumerate namespaces: %v", err)
}
for _, namespace := range namespaces {
used, err := store.SpaceUsedInNamespace(ctx, namespace)
used, err := store.SpaceUsedForBlobsInNamespace(ctx, namespace)
if err != nil {
return 0, Error.New("failed to sum space used: %v", err)
}
@ -167,8 +164,8 @@ func (store *blobStore) SpaceUsed(ctx context.Context) (space int64, err error)
return totalSpaceUsed, nil
}
// SpaceUsedInNamespace adds up how much is used in the given namespace for blob storage
func (store *blobStore) SpaceUsedInNamespace(ctx context.Context, namespace []byte) (int64, error) {
// SpaceUsedForBlobsInNamespace adds up how much is used in the given namespace for blob storage
func (store *blobStore) SpaceUsedForBlobsInNamespace(ctx context.Context, namespace []byte) (int64, error) {
var totalUsed int64
err := store.WalkNamespace(ctx, namespace, func(info storage.BlobInfo) error {
statInfo, statErr := info.Stat(ctx)
@ -186,6 +183,20 @@ func (store *blobStore) SpaceUsedInNamespace(ctx context.Context, namespace []by
return totalUsed, nil
}
// SpaceUsedForTrash returns the total space used by the trash by summing
// the size of every file under the trash directory.
func (store *blobStore) SpaceUsedForTrash(ctx context.Context) (total int64, err error) {
	defer mon.Task()(&ctx)(&err)

	// Collect walk failures in a separate slice: filepath.Walk returns nil
	// when the callback answers SkipDir, so assigning its result straight to
	// the named err (as the previous version did) silently discarded every
	// error gathered inside the callback.
	var walkErrors []error
	walkErr := filepath.Walk(store.dir.trashdir(), func(path string, info os.FileInfo, innerErr error) error {
		if innerErr != nil {
			walkErrors = append(walkErrors, innerErr)
			// Skip the rest of this directory; its entries are likely
			// unreadable for the same reason.
			return filepath.SkipDir
		}
		total += info.Size()
		return nil
	})
	if walkErr != nil {
		walkErrors = append(walkErrors, walkErr)
	}
	return total, errs.Combine(walkErrors...)
}
// FreeSpace returns how much space left in underlying directory
func (store *blobStore) FreeSpace() (int64, error) {
info, err := store.dir.Info()

View File

@ -12,6 +12,7 @@ import (
"path/filepath"
"sort"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@ -355,8 +356,8 @@ func TestMultipleStorageFormatVersions(t *testing.T) {
assert.Nil(t, reader)
}
// Check that the SpaceUsed and SpaceUsedInNamespace methods on filestore.blobStore
// work as expected.
// Check that the SpaceUsedForBlobs and SpaceUsedForBlobsInNamespace methods on
// filestore.blobStore work as expected.
func TestStoreSpaceUsed(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
@ -371,13 +372,13 @@ func TestStoreSpaceUsed(t *testing.T) {
sizesToStore = []memory.Size{4093, 0, 512, 1, memory.MB}
)
spaceUsed, err := store.SpaceUsed(ctx)
spaceUsed, err := store.SpaceUsedForBlobs(ctx)
require.NoError(t, err)
assert.Equal(t, int64(0), spaceUsed)
spaceUsed, err = store.SpaceUsedInNamespace(ctx, namespace)
spaceUsed, err = store.SpaceUsedForBlobsInNamespace(ctx, namespace)
require.NoError(t, err)
assert.Equal(t, int64(0), spaceUsed)
spaceUsed, err = store.SpaceUsedInNamespace(ctx, otherNamespace)
spaceUsed, err = store.SpaceUsedForBlobsInNamespace(ctx, otherNamespace)
require.NoError(t, err)
assert.Equal(t, int64(0), spaceUsed)
@ -394,13 +395,13 @@ func TestStoreSpaceUsed(t *testing.T) {
require.NoError(t, err)
totalSoFar += size
spaceUsed, err := store.SpaceUsed(ctx)
spaceUsed, err := store.SpaceUsedForBlobs(ctx)
require.NoError(t, err)
assert.Equal(t, int64(totalSoFar), spaceUsed)
spaceUsed, err = store.SpaceUsedInNamespace(ctx, namespace)
spaceUsed, err = store.SpaceUsedForBlobsInNamespace(ctx, namespace)
require.NoError(t, err)
assert.Equal(t, int64(totalSoFar), spaceUsed)
spaceUsed, err = store.SpaceUsedInNamespace(ctx, otherNamespace)
spaceUsed, err = store.SpaceUsedForBlobsInNamespace(ctx, otherNamespace)
require.NoError(t, err)
assert.Equal(t, int64(0), spaceUsed)
}
@ -530,6 +531,131 @@ func TestStoreTraversals(t *testing.T) {
assert.Equal(t, 2, iterations)
}
// TestEmptyTrash verifies that EmptyTrash removes trashed pieces older than
// the cutoff and reports the total bytes and keys that were deleted.
func TestEmptyTrash(t *testing.T) {
	ctx := testcontext.New(t)
	defer ctx.Cleanup()

	store, err := filestore.NewAt(zaptest.NewLogger(t), ctx.Dir("store"))
	require.NoError(t, err)
	// Close the store when the test finishes; without defer the store would
	// be closed immediately, before any of the operations below run.
	defer ctx.Check(store.Close)

	size := memory.KB

	type testfile struct {
		data      []byte
		formatVer storage.FormatVersion
	}
	type testref struct {
		key   []byte
		files []testfile
	}
	type testnamespace struct {
		namespace []byte
		refs      []testref
	}

	namespaces := []testnamespace{
		{
			namespace: testrand.Bytes(namespaceSize),
			refs: []testref{
				{
					// Has v0 and v1
					key: testrand.Bytes(keySize),
					files: []testfile{
						{
							data:      testrand.Bytes(size),
							formatVer: filestore.FormatV0,
						},
						{
							data:      testrand.Bytes(size),
							formatVer: filestore.FormatV1,
						},
					},
				},
				{
					// Has v0 only
					key: testrand.Bytes(keySize),
					files: []testfile{
						{
							data:      testrand.Bytes(size),
							formatVer: filestore.FormatV0,
						},
					},
				},
				{
					// Has v1 only
					key: testrand.Bytes(keySize),
					files: []testfile{
						{
							data:      testrand.Bytes(size),
							formatVer: filestore.FormatV1,
						},
					},
				},
			},
		},
		{
			namespace: testrand.Bytes(namespaceSize),
			refs: []testref{
				{
					// Has v1 only
					key: testrand.Bytes(keySize),
					files: []testfile{
						{
							data:      testrand.Bytes(size),
							formatVer: filestore.FormatV1,
						},
					},
				},
			},
		},
	}

	// Write every file and then trash each ref so the trash dir is populated.
	for _, namespace := range namespaces {
		for _, ref := range namespace.refs {
			blobref := storage.BlobRef{
				Namespace: namespace.namespace,
				Key:       ref.key,
			}

			for _, file := range ref.files {
				var w storage.BlobWriter
				if file.formatVer == filestore.FormatV0 {
					// V0 blobs can only be created through the test hook.
					fStore, ok := store.(interface {
						TestCreateV0(ctx context.Context, ref storage.BlobRef) (_ storage.BlobWriter, err error)
					})
					require.Truef(t, ok, "can't make TestCreateV0 with this blob store (%T)", store)
					w, err = fStore.TestCreateV0(ctx, blobref)
				} else if file.formatVer == filestore.FormatV1 {
					w, err = store.Create(ctx, blobref, int64(size))
				}
				require.NoError(t, err)
				require.NotNil(t, w)
				_, err = w.Write(file.data)
				require.NoError(t, err)
				require.NoError(t, w.Commit(ctx))
				requireFileMatches(ctx, t, store, file.data, blobref, file.formatVer)
			}

			// Trash the ref
			require.NoError(t, store.Trash(ctx, blobref))
		}
	}

	// Empty the trash for the first namespace. Every file there was trashed
	// before the cutoff (an hour from now), so all of them must be deleted.
	var expectedFilesEmptied int64
	for _, ref := range namespaces[0].refs {
		for range ref.files {
			expectedFilesEmptied++
		}
	}

	emptiedBytes, keys, err := store.EmptyTrash(ctx, namespaces[0].namespace, time.Now().Add(time.Hour))
	require.NoError(t, err)
	assert.Equal(t, expectedFilesEmptied*int64(size), emptiedBytes)
	assert.Equal(t, int(expectedFilesEmptied), len(keys))
}
func TestTrashAndRestore(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
@ -650,7 +776,17 @@ func TestTrashAndRestore(t *testing.T) {
}
// Restore the first namespace
require.NoError(t, store.RestoreTrash(ctx, namespaces[0].namespace))
var expKeysRestored [][]byte
for _, ref := range namespaces[0].refs {
for range ref.files {
expKeysRestored = append(expKeysRestored, ref.key)
}
}
sort.Slice(expKeysRestored, func(i int, j int) bool { return expKeysRestored[i][0] < expKeysRestored[j][0] })
restoredKeys, err := store.RestoreTrash(ctx, namespaces[0].namespace)
sort.Slice(restoredKeys, func(i int, j int) bool { return restoredKeys[i][0] < restoredKeys[j][0] })
require.NoError(t, err)
assert.Equal(t, expKeysRestored, restoredKeys)
// Verify pieces are back and look good for first namespace
for _, ref := range namespaces[0].refs {

View File

@ -55,7 +55,7 @@ func TestCollector(t *testing.T) {
usedSerials := storageNode.DB.UsedSerials()
// verify that we actually have some data on storage nodes
used, err := pieceStore.SpaceUsed(ctx)
used, err := pieceStore.SpaceUsedForBlobs(ctx)
require.NoError(t, err)
if used == 0 {
// this storage node didn't get picked for storing data
@ -109,7 +109,7 @@ func TestCollector(t *testing.T) {
require.NoError(t, err)
// verify that we deleted everything
used, err := pieceStore.SpaceUsed(ctx)
used, err := pieceStore.SpaceUsedForBlobs(ctx)
require.NoError(t, err)
require.Equal(t, int64(0), used)

View File

@ -270,7 +270,7 @@ func (worker *Worker) deleteOnePieceOrAll(ctx context.Context, pieceID *storj.Pi
pieceMap := make(map[pb.PieceID]int64)
ctxWithCancel, cancel := context.WithCancel(ctx)
err := worker.store.WalkSatellitePieces(ctxWithCancel, worker.satelliteID, func(piece pieces.StoredPieceAccess) error {
size, err := piece.ContentSize(ctxWithCancel)
_, size, err := piece.Size(ctxWithCancel)
if err != nil {
worker.log.Debug("failed to retrieve piece info", zap.Stringer("Satellite ID", worker.satelliteID), zap.Error(err))
}

View File

@ -162,7 +162,7 @@ func (service *Service) updateNodeInformation(ctx context.Context) (err error) {
func (service *Service) usedSpace(ctx context.Context) (_ int64, err error) {
defer mon.Task()(&ctx)(&err)
usedSpace, err := service.store.SpaceUsedForPieces(ctx)
usedSpace, err := service.store.SpaceUsedForPiecesAndTrash(ctx)
if err != nil {
return 0, err
}
@ -181,7 +181,7 @@ func (service *Service) usedBandwidth(ctx context.Context) (_ int64, err error)
// AvailableSpace returns available disk space for upload
func (service *Service) AvailableSpace(ctx context.Context) (_ int64, err error) {
defer mon.Task()(&ctx)(&err)
usedSpace, err := service.store.SpaceUsedForPieces(ctx)
usedSpace, err := service.usedSpace(ctx)
if err != nil {
return 0, Error.Wrap(err)
}

View File

@ -8,6 +8,7 @@ import (
"sync"
"time"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/storj/pkg/storj"
@ -48,13 +49,17 @@ func (service *CacheService) Run(ctx context.Context) (err error) {
if err != nil {
service.log.Error("error getting current space used calculation: ", zap.Error(err))
}
if err = service.usageCache.Recalculate(ctx, newTotal,
totalAtStart.totalSpaceUsed,
newTotalBySatellite,
totalAtStart.totalSpaceUsedBySatellite,
); err != nil {
service.log.Error("error during recalculating space usage cache: ", zap.Error(err))
newTrashTotal, err := service.store.SpaceUsedForTrash(ctx)
if err != nil {
service.log.Error("error getting current space for trash: ", zap.Error(err))
}
service.usageCache.Recalculate(ctx, newTotal,
totalAtStart.spaceUsedForPieces,
newTotalBySatellite,
totalAtStart.spaceUsedBySatellite,
newTrashTotal,
totalAtStart.spaceUsedForTrash,
)
if err = service.store.spaceUsedDB.Init(ctx); err != nil {
service.log.Error("error during init space usage db: ", zap.Error(err))
@ -79,10 +84,13 @@ func (service *CacheService) PersistCacheTotals(ctx context.Context) error {
cache := service.usageCache
cache.mu.Lock()
defer cache.mu.Unlock()
if err := service.store.spaceUsedDB.UpdateTotal(ctx, cache.totalSpaceUsed); err != nil {
if err := service.store.spaceUsedDB.UpdatePieceTotal(ctx, cache.spaceUsedForPieces); err != nil {
return err
}
if err := service.store.spaceUsedDB.UpdateTotalsForAllSatellites(ctx, cache.totalSpaceUsedBySatellite); err != nil {
if err := service.store.spaceUsedDB.UpdatePieceTotalsForAllSatellites(ctx, cache.spaceUsedBySatellite); err != nil {
return err
}
if err := service.store.spaceUsedDB.UpdateTrashTotal(ctx, cache.spaceUsedForTrash); err != nil {
return err
}
return nil
@ -90,13 +98,13 @@ func (service *CacheService) PersistCacheTotals(ctx context.Context) error {
// Init initializes the space used cache with the most recent values that were stored persistently
func (service *CacheService) Init(ctx context.Context) (err error) {
total, err := service.store.spaceUsedDB.GetTotal(ctx)
total, err := service.store.spaceUsedDB.GetPieceTotal(ctx)
if err != nil {
service.log.Error("CacheServiceInit error during initializing space usage cache GetTotal:", zap.Error(err))
return err
}
totalBySatellite, err := service.store.spaceUsedDB.GetTotalsForAllSatellites(ctx)
totalBySatellite, err := service.store.spaceUsedDB.GetPieceTotalsForAllSatellites(ctx)
if err != nil {
service.log.Error("CacheServiceInit error during initializing space usage cache GetTotalsForAllSatellites:", zap.Error(err))
return err
@ -119,33 +127,35 @@ func (service *CacheService) Close() (err error) {
type BlobsUsageCache struct {
storage.Blobs
mu sync.Mutex
totalSpaceUsed int64
totalSpaceUsedBySatellite map[storj.NodeID]int64
mu sync.Mutex
spaceUsedForPieces int64
spaceUsedForTrash int64
spaceUsedBySatellite map[storj.NodeID]int64
}
// NewBlobsUsageCache creates a new disk blob store with a space used cache
func NewBlobsUsageCache(blob storage.Blobs) *BlobsUsageCache {
return &BlobsUsageCache{
Blobs: blob,
totalSpaceUsedBySatellite: map[storj.NodeID]int64{},
Blobs: blob,
spaceUsedBySatellite: map[storj.NodeID]int64{},
}
}
// NewBlobsUsageCacheTest creates a new disk blob store with a space used cache
func NewBlobsUsageCacheTest(blob storage.Blobs, total int64, totalSpaceUsedBySatellite map[storj.NodeID]int64) *BlobsUsageCache {
func NewBlobsUsageCacheTest(blob storage.Blobs, piecesTotal, trashTotal int64, spaceUsedBySatellite map[storj.NodeID]int64) *BlobsUsageCache {
return &BlobsUsageCache{
Blobs: blob,
totalSpaceUsed: total,
totalSpaceUsedBySatellite: totalSpaceUsedBySatellite,
Blobs: blob,
spaceUsedForPieces: piecesTotal,
spaceUsedForTrash: trashTotal,
spaceUsedBySatellite: spaceUsedBySatellite,
}
}
func (blobs *BlobsUsageCache) init(total int64, totalBySatellite map[storj.NodeID]int64) {
blobs.mu.Lock()
defer blobs.mu.Unlock()
blobs.totalSpaceUsed = total
blobs.totalSpaceUsedBySatellite = totalBySatellite
blobs.spaceUsedForPieces = total
blobs.spaceUsedBySatellite = totalBySatellite
}
// SpaceUsedBySatellite returns the current total space used for a specific
@ -153,7 +163,7 @@ func (blobs *BlobsUsageCache) init(total int64, totalBySatellite map[storj.NodeI
func (blobs *BlobsUsageCache) SpaceUsedBySatellite(ctx context.Context, satelliteID storj.NodeID) (int64, error) {
blobs.mu.Lock()
defer blobs.mu.Unlock()
return blobs.totalSpaceUsedBySatellite[satelliteID], nil
return blobs.spaceUsedBySatellite[satelliteID], nil
}
// SpaceUsedForPieces returns the current total used space for
@ -161,21 +171,20 @@ func (blobs *BlobsUsageCache) SpaceUsedBySatellite(ctx context.Context, satellit
func (blobs *BlobsUsageCache) SpaceUsedForPieces(ctx context.Context) (int64, error) {
blobs.mu.Lock()
defer blobs.mu.Unlock()
return blobs.totalSpaceUsed, nil
return blobs.spaceUsedForPieces, nil
}
// SpaceUsedForTrash returns the cached total of space currently occupied by
// the trash directory.
func (blobs *BlobsUsageCache) SpaceUsedForTrash(ctx context.Context) (int64, error) {
	blobs.mu.Lock()
	trashTotal := blobs.spaceUsedForTrash
	blobs.mu.Unlock()
	return trashTotal, nil
}
// Delete gets the size of the piece that is going to be deleted then deletes it and
// updates the space used cache accordingly
func (blobs *BlobsUsageCache) Delete(ctx context.Context, blobRef storage.BlobRef) error {
blobInfo, err := blobs.Stat(ctx, blobRef)
if err != nil {
return err
}
pieceAccess, err := newStoredPieceAccess(nil, blobInfo)
if err != nil {
return err
}
pieceContentSize, err := pieceAccess.ContentSize(ctx)
_, pieceContentSize, err := blobs.pieceContentSize(ctx, blobRef)
if err != nil {
return Error.Wrap(err)
}
@ -184,30 +193,111 @@ func (blobs *BlobsUsageCache) Delete(ctx context.Context, blobRef storage.BlobRe
return Error.Wrap(err)
}
satelliteID := storj.NodeID{}
copy(satelliteID[:], blobRef.Namespace)
blobs.Update(ctx, satelliteID, -pieceContentSize)
satelliteID, err := storj.NodeIDFromBytes(blobRef.Namespace)
if err != nil {
return err
}
blobs.Update(ctx, satelliteID, -pieceContentSize, 0)
return nil
}
// pieceContentSize stats the blob behind blobRef once and reports both its
// full on-disk size and its content size (full size minus any piece header).
func (blobs *BlobsUsageCache) pieceContentSize(ctx context.Context, blobRef storage.BlobRef) (size int64, contentSize int64, err error) {
	info, err := blobs.Stat(ctx, blobRef)
	if err != nil {
		return 0, 0, err
	}
	access, err := newStoredPieceAccess(nil, info)
	if err != nil {
		return 0, 0, err
	}
	return access.Size(ctx)
}
// Update updates the cache totals with the piece content size
func (blobs *BlobsUsageCache) Update(ctx context.Context, satelliteID storj.NodeID, pieceContentSize int64) {
func (blobs *BlobsUsageCache) Update(ctx context.Context, satelliteID storj.NodeID, piecesDelta, trashDelta int64) {
blobs.mu.Lock()
defer blobs.mu.Unlock()
blobs.totalSpaceUsed += pieceContentSize
blobs.totalSpaceUsedBySatellite[satelliteID] += pieceContentSize
blobs.spaceUsedForPieces += piecesDelta
blobs.spaceUsedBySatellite[satelliteID] += piecesDelta
blobs.spaceUsedForTrash += trashDelta
}
// Trash moves the ref into the trash directory and updates the space used
// cache: the pieces total drops by the piece content size while the trash
// total grows by the full on-disk size of the file.
func (blobs *BlobsUsageCache) Trash(ctx context.Context, blobRef storage.BlobRef) error {
	size, pieceContentSize, err := blobs.pieceContentSize(ctx, blobRef)
	if err != nil {
		return Error.Wrap(err)
	}

	// Resolve the satellite ID before mutating anything on disk. Previously
	// it was parsed after the move, so an unparsable namespace could leave
	// the blob trashed with the cache never updated.
	satelliteID, err := storj.NodeIDFromBytes(blobRef.Namespace)
	if err != nil {
		return Error.Wrap(err)
	}

	if err := blobs.Blobs.Trash(ctx, blobRef); err != nil {
		return Error.Wrap(err)
	}

	blobs.Update(ctx, satelliteID, -pieceContentSize, size)
	return nil
}
// EmptyTrash empties the trash for the namespace and subtracts the emptied
// bytes from the cached trash total.
func (blobs *BlobsUsageCache) EmptyTrash(ctx context.Context, namespace []byte, trashedBefore time.Time) (int64, [][]byte, error) {
	// The namespace doubles as the satellite ID.
	satelliteID, idErr := storj.NodeIDFromBytes(namespace)
	if idErr != nil {
		return 0, nil, idErr
	}

	emptied, deletedKeys, emptyErr := blobs.Blobs.EmptyTrash(ctx, namespace, trashedBefore)
	if emptyErr != nil {
		return 0, nil, emptyErr
	}

	blobs.Update(ctx, satelliteID, 0, -emptied)
	return emptied, deletedKeys, nil
}
// RestoreTrash restores the trash for the namespace and updates the cache
//
// For each restored key the pieces total grows by the piece's content size
// while the trash total shrinks by the piece's full on-disk size, mirroring
// the deltas applied when the piece was trashed.
func (blobs *BlobsUsageCache) RestoreTrash(ctx context.Context, namespace []byte) ([][]byte, error) {
	// The namespace is the satellite ID; if it cannot be parsed we could not
	// attribute the restored space, so fail before touching the trash.
	satelliteID, err := storj.NodeIDFromBytes(namespace)
	if err != nil {
		return nil, err
	}
	keysRestored, err := blobs.Blobs.RestoreTrash(ctx, namespace)
	if err != nil {
		return nil, err
	}
	for _, key := range keysRestored {
		size, contentSize, sizeErr := blobs.pieceContentSize(ctx, storage.BlobRef{
			Key:       key,
			Namespace: namespace,
		})
		if sizeErr != nil {
			// Best effort: a failure to stat one restored piece must not stop
			// the cache updates for the rest; accumulate and report at the end.
			err = errs.Combine(err, sizeErr)
			continue
		}
		// contentSize goes to the pieces total; the full file size comes off
		// the trash total (the trash tracks whole files on disk).
		blobs.Update(ctx, satelliteID, contentSize, -size)
	}
	return keysRestored, err
}
func (blobs *BlobsUsageCache) copyCacheTotals() BlobsUsageCache {
blobs.mu.Lock()
defer blobs.mu.Unlock()
var copyMap = map[storj.NodeID]int64{}
for k, v := range blobs.totalSpaceUsedBySatellite {
for k, v := range blobs.spaceUsedBySatellite {
copyMap[k] = v
}
return BlobsUsageCache{
totalSpaceUsed: blobs.totalSpaceUsed,
totalSpaceUsedBySatellite: copyMap,
spaceUsedForPieces: blobs.spaceUsedForPieces,
spaceUsedForTrash: blobs.spaceUsedForTrash,
spaceUsedBySatellite: copyMap,
}
}
@ -215,19 +305,25 @@ func (blobs *BlobsUsageCache) copyCacheTotals() BlobsUsageCache {
// space used cache, we had to iterate over all the pieces on disk. Since that can potentially take
// a long time, here we need to check if we missed any additions/deletions while we were iterating and
// estimate how many bytes missed then add those to the space used result of iteration.
func (blobs *BlobsUsageCache) Recalculate(ctx context.Context, newTotal, totalAtIterationStart int64, newTotalBySatellite, totalBySatelliteAtIterationStart map[storj.NodeID]int64) error {
func (blobs *BlobsUsageCache) Recalculate(ctx context.Context, newTotal, totalAtIterationStart int64, newTotalBySatellite,
totalBySatelliteAtIterationStart map[storj.NodeID]int64, newTrashTotal, trashTotalAtIterationStart int64) {
totalsAtIterationEnd := blobs.copyCacheTotals()
estimatedTotals := estimate(newTotal,
totalAtIterationStart,
totalsAtIterationEnd.totalSpaceUsed,
totalsAtIterationEnd.spaceUsedForPieces,
)
estimatedTrash := estimate(newTrashTotal,
trashTotalAtIterationStart,
totalsAtIterationEnd.spaceUsedForTrash)
var estimatedTotalsBySatellite = map[storj.NodeID]int64{}
for ID, newTotal := range newTotalBySatellite {
estimatedNewTotal := estimate(newTotal,
totalBySatelliteAtIterationStart[ID],
totalsAtIterationEnd.totalSpaceUsedBySatellite[ID],
totalsAtIterationEnd.spaceUsedBySatellite[ID],
)
// if the estimatedNewTotal is zero then there is no data stored
// for this satelliteID so don't add it to the cache
@ -238,14 +334,14 @@ func (blobs *BlobsUsageCache) Recalculate(ctx context.Context, newTotal, totalAt
}
// find any saIDs that are in totalsAtIterationEnd but not in newTotalSpaceUsedBySatellite
missedWhenIterationEnded := getMissed(totalsAtIterationEnd.totalSpaceUsedBySatellite,
missedWhenIterationEnded := getMissed(totalsAtIterationEnd.spaceUsedBySatellite,
newTotalBySatellite,
)
if len(missedWhenIterationEnded) > 0 {
for ID := range missedWhenIterationEnded {
estimatedNewTotal := estimate(0,
totalBySatelliteAtIterationStart[ID],
totalsAtIterationEnd.totalSpaceUsedBySatellite[ID],
totalsAtIterationEnd.spaceUsedBySatellite[ID],
)
if estimatedNewTotal == 0 {
continue
@ -255,10 +351,10 @@ func (blobs *BlobsUsageCache) Recalculate(ctx context.Context, newTotal, totalAt
}
blobs.mu.Lock()
blobs.totalSpaceUsed = estimatedTotals
blobs.totalSpaceUsedBySatellite = estimatedTotalsBySatellite
blobs.spaceUsedForPieces = estimatedTotals
blobs.spaceUsedForTrash = estimatedTrash
blobs.spaceUsedBySatellite = estimatedTotalsBySatellite
blobs.mu.Unlock()
return nil
}
func estimate(newSpaceUsedTotal, totalAtIterationStart, totalAtIterationEnd int64) int64 {

View File

@ -30,28 +30,41 @@ func TestDBInit(t *testing.T) {
defer ctx.Cleanup()
spaceUsedDB := db.PieceSpaceUsedDB()
total, err := spaceUsedDB.GetTotal(ctx)
total, err := spaceUsedDB.GetPieceTotal(ctx)
require.NoError(t, err)
require.Equal(t, total, int64(0))
// Expect that no total record exists since we haven't
// initialized yet
total, err = spaceUsedDB.GetTotal(ctx)
total, err = spaceUsedDB.GetPieceTotal(ctx)
require.NoError(t, err)
require.Equal(t, total, int64(0))
// Expect no record for trash total
trashTotal, err := spaceUsedDB.GetTrashTotal(ctx)
require.NoError(t, err)
require.Equal(t, int64(0), trashTotal)
// Now initialize the db to create the total record
err = spaceUsedDB.Init(ctx)
require.NoError(t, err)
// Now that a total record exists, we can update it
err = spaceUsedDB.UpdateTotal(ctx, int64(100))
err = spaceUsedDB.UpdatePieceTotal(ctx, int64(100))
require.NoError(t, err)
// Confirm the total record is now 100
total, err = spaceUsedDB.GetTotal(ctx)
err = spaceUsedDB.UpdateTrashTotal(ctx, int64(150))
require.NoError(t, err)
// Confirm the total record has been updated
total, err = spaceUsedDB.GetPieceTotal(ctx)
require.NoError(t, err)
require.Equal(t, total, int64(100))
// Confirm the trash total record has been updated
total, err = spaceUsedDB.GetTrashTotal(ctx)
require.NoError(t, err)
require.Equal(t, total, int64(150))
})
}
func TestCacheInit(t *testing.T) {
@ -64,7 +77,7 @@ func TestCacheInit(t *testing.T) {
require.NoError(t, err)
// setup the cache with zero values
cache := pieces.NewBlobsUsageCacheTest(nil, 0, nil)
cache := pieces.NewBlobsUsageCacheTest(nil, 0, 0, nil)
cacheService := pieces.NewService(zap.L(),
cache,
pieces.NewStore(zap.L(), cache, nil, nil, spaceUsedDB),
@ -75,18 +88,22 @@ func TestCacheInit(t *testing.T) {
// that the cache gets initialized with zero values
err = cacheService.Init(ctx)
require.NoError(t, err)
actualTotal, err := cache.SpaceUsedForPieces(ctx)
piecesTotal, err := cache.SpaceUsedForPieces(ctx)
require.NoError(t, err)
require.Equal(t, int64(0), actualTotal)
require.Equal(t, int64(0), piecesTotal)
actualTotalBySA, err := cache.SpaceUsedBySatellite(ctx, storj.NodeID{1})
require.NoError(t, err)
require.Equal(t, int64(0), actualTotalBySA)
trashTotal, err := cache.SpaceUsedForTrash(ctx)
require.NoError(t, err)
require.Equal(t, int64(0), trashTotal)
// setup: update the cache then sync those cache values
// to the database
expectedTotal := int64(150)
expectedPieces := int64(150)
expectedTotalBySA := map[storj.NodeID]int64{{1}: 100, {2}: 50}
cache = pieces.NewBlobsUsageCacheTest(nil, expectedTotal, expectedTotalBySA)
expectedTrash := int64(127)
cache = pieces.NewBlobsUsageCacheTest(nil, expectedPieces, expectedTrash, expectedTotalBySA)
cacheService = pieces.NewService(zap.L(),
cache,
pieces.NewStore(zap.L(), cache, nil, nil, spaceUsedDB),
@ -99,15 +116,18 @@ func TestCacheInit(t *testing.T) {
// that the cache gets initialized with the values from the database
err = cacheService.Init(ctx)
require.NoError(t, err)
actualTotal, err = cache.SpaceUsedForPieces(ctx)
piecesTotal, err = cache.SpaceUsedForPieces(ctx)
require.NoError(t, err)
require.Equal(t, expectedTotal, actualTotal)
require.Equal(t, expectedPieces, piecesTotal)
actualTotalBySA, err = cache.SpaceUsedBySatellite(ctx, storj.NodeID{1})
require.NoError(t, err)
require.Equal(t, int64(100), actualTotalBySA)
actualTotalBySA, err = cache.SpaceUsedBySatellite(ctx, storj.NodeID{2})
require.NoError(t, err)
require.Equal(t, int64(50), actualTotalBySA)
actualTrash, err := cache.SpaceUsedForTrash(ctx)
require.NoError(t, err)
require.Equal(t, int64(127), actualTrash)
})
}
@ -118,25 +138,33 @@ func TestPersistCacheTotals(t *testing.T) {
defer ctx.Cleanup()
// The database should start out with 0 for all totals
var expectedTotal int64
var expectedPieces int64
spaceUsedDB := db.PieceSpaceUsedDB()
err := spaceUsedDB.Init(ctx)
require.NoError(t, err)
actualTotal, err := spaceUsedDB.GetTotal(ctx)
piecesTotal, err := spaceUsedDB.GetPieceTotal(ctx)
require.NoError(t, err)
assert.Equal(t, expectedTotal, actualTotal)
assert.Equal(t, expectedPieces, piecesTotal)
var expectedTrash int64
err = spaceUsedDB.Init(ctx)
require.NoError(t, err)
trashTotal, err := spaceUsedDB.GetTrashTotal(ctx)
require.NoError(t, err)
assert.Equal(t, expectedTrash, trashTotal)
var expectedTotalBySA = map[storj.NodeID]int64{}
actualTotalBySA, err := spaceUsedDB.GetTotalsForAllSatellites(ctx)
actualTotalBySA, err := spaceUsedDB.GetPieceTotalsForAllSatellites(ctx)
require.NoError(t, err)
assert.Equal(t, expectedTotalBySA, actualTotalBySA)
// setup: update the cache then sync those cache values
// to the database
// setup the cache with zero values
expectedTotal = 150
expectedPieces = 150
expectedTotalBySA = map[storj.NodeID]int64{{1}: 100, {2}: 50}
cache := pieces.NewBlobsUsageCacheTest(nil, expectedTotal, expectedTotalBySA)
expectedTrash = 127
cache := pieces.NewBlobsUsageCacheTest(nil, expectedPieces, expectedTrash, expectedTotalBySA)
cacheService := pieces.NewService(zap.L(),
cache,
pieces.NewStore(zap.L(), cache, nil, nil, spaceUsedDB),
@ -146,27 +174,36 @@ func TestPersistCacheTotals(t *testing.T) {
require.NoError(t, err)
// Confirm those cache values are now saved persistently in the database
actualTotal, err = spaceUsedDB.GetTotal(ctx)
piecesTotal, err = spaceUsedDB.GetPieceTotal(ctx)
require.NoError(t, err)
assert.Equal(t, expectedTotal, actualTotal)
assert.Equal(t, expectedPieces, piecesTotal)
actualTotalBySA, err = spaceUsedDB.GetTotalsForAllSatellites(ctx)
actualTotalBySA, err = spaceUsedDB.GetPieceTotalsForAllSatellites(ctx)
require.NoError(t, err)
assert.Equal(t, expectedTotalBySA, actualTotalBySA)
trashTotal, err = spaceUsedDB.GetTrashTotal(ctx)
require.NoError(t, err)
assert.Equal(t, expectedTrash, trashTotal)
// Delete some piece content
pieceContentSize := -int64(100)
cache.Update(ctx, storj.NodeID{1}, pieceContentSize)
trashDelta := int64(104)
cache.Update(ctx, storj.NodeID{1}, pieceContentSize, trashDelta)
err = cacheService.PersistCacheTotals(ctx)
require.NoError(t, err)
// Confirm that the deleted stuff is not in the database anymore
actualTotal, err = spaceUsedDB.GetTotal(ctx)
piecesTotal, err = spaceUsedDB.GetPieceTotal(ctx)
require.NoError(t, err)
assert.Equal(t, expectedTotal+pieceContentSize, actualTotal)
assert.Equal(t, expectedPieces+pieceContentSize, piecesTotal)
trashTotal, err = spaceUsedDB.GetTrashTotal(ctx)
require.NoError(t, err)
assert.Equal(t, expectedTrash+trashDelta, trashTotal)
expectedTotalBySA = map[storj.NodeID]int64{{2}: 50}
actualTotalBySA, err = spaceUsedDB.GetTotalsForAllSatellites(ctx)
actualTotalBySA, err = spaceUsedDB.GetPieceTotalsForAllSatellites(ctx)
require.NoError(t, err)
assert.Equal(t, expectedTotalBySA, actualTotalBySA)
@ -180,18 +217,23 @@ func TestRecalculateCache(t *testing.T) {
end int64
new int64
expected int64
startTrash int64
endTrash int64
newTrash int64
expectedTrash int64
}{
{"1", 0, 0, 0, 0},
{"2", 0, 100, 0, 50},
{"3", 0, 100, 90, 140},
{"4", 0, 100, 110, 160},
{"5", 0, 100, -10, 40},
{"6", 0, 100, -200, 0},
{"7", 100, 0, 0, 0},
{"8", 100, 0, 90, 40},
{"9", 100, 0, 30, 0},
{"10", 100, 0, 110, 60},
{"11", 100, 0, -10, 0},
{"1", 0, 0, 0, 0, 0, 0, 0, 0},
{"2", 0, 100, 0, 50, 100, 110, 50, 55},
{"3", 0, 100, 90, 140, 0, 100, 50, 100},
{"4", 0, 100, 110, 160, 0, 100, -10, 40},
{"5", 0, 100, -10, 40, 0, 0, 0, 0},
{"6", 0, 100, -200, 0, 0, 0, 0, 0},
{"7", 100, 0, 0, 0, 0, 0, 0, 0},
{"8", 100, 0, 90, 40, 0, 0, 0, 0},
{"9", 100, 0, 30, 0, 0, 0, 0, 0},
{"10", 100, 0, 110, 60, 0, 0, 0, 0},
{"11", 100, 0, -10, 0, 0, 0, 0, 0},
}
for _, tt := range testCases {
@ -203,16 +245,18 @@ func TestRecalculateCache(t *testing.T) {
ID1 := storj.NodeID{1, 1}
cache := pieces.NewBlobsUsageCacheTest(nil,
tt.end,
tt.endTrash,
map[storj.NodeID]int64{ID1: tt.end},
)
err := cache.Recalculate(ctx,
cache.Recalculate(ctx,
tt.new,
tt.start,
map[storj.NodeID]int64{ID1: tt.new},
map[storj.NodeID]int64{ID1: tt.start},
tt.newTrash,
tt.startTrash,
)
require.NoError(t, err)
// Test: confirm correct cache values
actualTotalSpaceUsed, err := cache.SpaceUsedForPieces(ctx)
@ -222,6 +266,10 @@ func TestRecalculateCache(t *testing.T) {
actualTotalSpaceUsedBySA, err := cache.SpaceUsedBySatellite(ctx, ID1)
require.NoError(t, err)
assert.Equal(t, tt.expected, actualTotalSpaceUsedBySA)
actualTrash, err := cache.SpaceUsedForTrash(ctx)
require.NoError(t, err)
assert.Equal(t, tt.expectedTrash, actualTrash)
})
}
}
@ -238,16 +286,18 @@ func TestRecalculateCacheMissed(t *testing.T) {
// new recalculated values
cache := pieces.NewBlobsUsageCacheTest(nil,
int64(150),
int64(100),
map[storj.NodeID]int64{ID1: int64(100), ID2: int64(50)},
)
err := cache.Recalculate(ctx,
cache.Recalculate(ctx,
int64(100),
int64(0),
map[storj.NodeID]int64{ID1: int64(100)},
map[storj.NodeID]int64{ID1: int64(0)},
200,
0,
)
require.NoError(t, err)
// Test: confirm correct cache values
actualTotalSpaceUsed, err := cache.SpaceUsedForPieces(ctx)
@ -257,50 +307,86 @@ func TestRecalculateCacheMissed(t *testing.T) {
actualTotalSpaceUsedBySA, err := cache.SpaceUsedBySatellite(ctx, ID2)
require.NoError(t, err)
assert.Equal(t, int64(25), actualTotalSpaceUsedBySA)
actualTrash, err := cache.SpaceUsedForTrash(ctx)
require.NoError(t, err)
assert.Equal(t, int64(250), actualTrash)
}
func TestCacheCreateDelete(t *testing.T) {
func TestCacheCreateDeleteAndTrash(t *testing.T) {
storagenodedbtest.Run(t, func(t *testing.T, db storagenode.DB) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
cache := pieces.NewBlobsUsageCache(db.Pieces())
satelliteID := testrand.Bytes(32)
ref := storage.BlobRef{
Namespace: satelliteID,
Key: testrand.Bytes(32),
}
blob, err := cache.Create(ctx, ref, int64(4096))
require.NoError(t, err)
saID := storj.NodeID{}
copy(saID[:], satelliteID)
blobWriter, err := pieces.NewWriter(blob, cache, saID)
require.NoError(t, err)
pieceContent := []byte("stuff")
_, err = blobWriter.Write(pieceContent)
require.NoError(t, err)
header := pb.PieceHeader{}
err = blobWriter.Commit(ctx, &header)
require.NoError(t, err)
satelliteID := testrand.NodeID()
refs := []storage.BlobRef{
{
Namespace: satelliteID.Bytes(),
Key: testrand.Bytes(32),
},
{
Namespace: satelliteID.Bytes(),
Key: testrand.Bytes(32),
},
}
for _, ref := range refs {
blob, err := cache.Create(ctx, ref, int64(4096))
require.NoError(t, err)
blobWriter, err := pieces.NewWriter(blob, cache, satelliteID)
require.NoError(t, err)
_, err = blobWriter.Write(pieceContent)
require.NoError(t, err)
header := pb.PieceHeader{}
err = blobWriter.Commit(ctx, &header)
require.NoError(t, err)
}
// Expect that the cache has those bytes written for the piece
actualTotal, err := cache.SpaceUsedForPieces(ctx)
assertValues := func(msg string, satID storj.NodeID, expPiece, expTrash int) {
piecesTotal, err := cache.SpaceUsedForPieces(ctx)
require.NoError(t, err, msg)
assert.Equal(t, expPiece, int(piecesTotal), msg)
actualTotalBySA, err := cache.SpaceUsedBySatellite(ctx, satelliteID)
require.NoError(t, err, msg)
assert.Equal(t, expPiece, int(actualTotalBySA), msg)
trashTotal, err := cache.SpaceUsedForTrash(ctx)
require.NoError(t, err, msg)
assert.Equal(t, expTrash, int(trashTotal), msg)
}
assertValues("first write", satelliteID, len(pieceContent)*2, 0)
// Trash one piece
blobInfo, err := cache.Stat(ctx, refs[0])
require.NoError(t, err)
require.Equal(t, len(pieceContent), int(actualTotal))
actualTotalBySA, err := cache.SpaceUsedBySatellite(ctx, saID)
fileInfo, err := blobInfo.Stat(ctx)
require.NoError(t, err)
require.Equal(t, len(pieceContent), int(actualTotalBySA))
ref0Size := fileInfo.Size()
err = cache.Trash(ctx, refs[0])
require.NoError(t, err)
assertValues("trashed refs[0]", satelliteID, len(pieceContent), int(ref0Size))
// Restore one piece
_, err = cache.RestoreTrash(ctx, satelliteID.Bytes())
require.NoError(t, err)
assertValues("restore trash for satellite", satelliteID, len(pieceContent)*2, 0)
// Trash piece again
err = cache.Trash(ctx, refs[0])
require.NoError(t, err)
assertValues("trashed refs[0]", satelliteID, len(pieceContent), int(ref0Size))
// Empty trash
_, _, err = cache.EmptyTrash(ctx, satelliteID.Bytes(), time.Now().Add(24*time.Hour))
require.NoError(t, err)
assertValues("trashed refs[0]", satelliteID, len(pieceContent), 0)
// Delete that piece and confirm the cache is updated
err = cache.Delete(ctx, ref)
err = cache.Delete(ctx, refs[1])
require.NoError(t, err)
actualTotal, err = cache.SpaceUsedForPieces(ctx)
require.NoError(t, err)
require.Equal(t, 0, int(actualTotal))
actualTotalBySA, err = cache.SpaceUsedBySatellite(ctx, saID)
require.NoError(t, err)
require.Equal(t, 0, int(actualTotalBySA))
assertValues("delete refs[0]", satelliteID, 0, 0)
})
}
@ -347,7 +433,7 @@ func TestConcurrency(t *testing.T) {
var group errgroup.Group
group.Go(func() error {
node.Storage2.BlobsCache.Update(ctx, satellite.ID(), 1000)
node.Storage2.BlobsCache.Update(ctx, satellite.ID(), 1000, 0)
return nil
})
err := node.Storage2.CacheService.PersistCacheTotals(ctx)

View File

@ -119,7 +119,7 @@ func (w *Writer) Commit(ctx context.Context, pieceHeader *pb.PieceHeader) (err e
return Error.New("already closed")
}
if cache, ok := w.blobs.(*BlobsUsageCache); ok {
cache.Update(ctx, w.satellite, w.Size())
cache.Update(ctx, w.satellite, w.Size(), 0)
}
// point of no return: after this we definitely either commit or cancel
w.closed = true

View File

@ -111,16 +111,20 @@ type V0PieceInfoDBForTest interface {
//
// architecture: Database
type PieceSpaceUsedDB interface {
// Init creates the one total record if it doesn't already exist
// Init creates the one total and trash record if it doesn't already exist
Init(ctx context.Context) error
// GetTotal returns the total space used by all pieces stored on disk
GetTotal(ctx context.Context) (int64, error)
// GetPieceTotal returns the total space used by all pieces stored on disk
GetPieceTotal(ctx context.Context) (int64, error)
// UpdatePieceTotal updates the record for total spaced used for pieces with a new value
UpdatePieceTotal(ctx context.Context, newTotal int64) error
// GetTotalsForAllSatellites returns how much total space used by pieces stored on disk for each satelliteID
GetTotalsForAllSatellites(ctx context.Context) (map[storj.NodeID]int64, error)
// UpdateTotal updates the record for total spaced used with a new value
UpdateTotal(ctx context.Context, newTotal int64) error
// UpdateTotalsForAllSatellites updates each record for total spaced used with a new value for each satelliteID
UpdateTotalsForAllSatellites(ctx context.Context, newTotalsBySatellites map[storj.NodeID]int64) error
GetPieceTotalsForAllSatellites(ctx context.Context) (map[storj.NodeID]int64, error)
// UpdatePieceTotalsForAllSatellites updates each record for total spaced used with a new value for each satelliteID
UpdatePieceTotalsForAllSatellites(ctx context.Context, newTotalsBySatellites map[storj.NodeID]int64) error
// GetTrashTotal returns the total space used by trash
GetTrashTotal(ctx context.Context) (int64, error)
// UpdateTrashTotal updates the record for total spaced used for trash with a new value
UpdateTrashTotal(ctx context.Context, newTotal int64) error
}
// StoredPieceAccess allows inspection and manipulation of a piece during iteration with
@ -132,9 +136,9 @@ type StoredPieceAccess interface {
PieceID() storj.PieceID
// Satellite gives the nodeID of the satellite which owns the piece
Satellite() (storj.NodeID, error)
// ContentSize gives the size of the piece content (not including the piece header, if
// applicable)
ContentSize(ctx context.Context) (int64, error)
// Size gives the size of the piece on disk, and the size of the piece
// content (not including the piece header, if applicable)
Size(ctx context.Context) (int64, int64, error)
// CreationTime returns the piece creation time as given in the original PieceHash (which is
// likely not the same as the file mtime). For non-FormatV0 pieces, this requires opening
// the file and unmarshaling the piece header. If exact precision is not required, ModTime()
@ -316,7 +320,7 @@ func (store *Store) Trash(ctx context.Context, satellite storj.NodeID, pieceID s
func (store *Store) EmptyTrash(ctx context.Context, satelliteID storj.NodeID, trashedBefore time.Time) (err error) {
defer mon.Task()(&ctx)(&err)
deletedIDs, err := store.blobs.EmptyTrash(ctx, satelliteID[:], trashedBefore)
_, deletedIDs, err := store.blobs.EmptyTrash(ctx, satelliteID[:], trashedBefore)
if err != nil {
return Error.Wrap(err)
}
@ -336,7 +340,7 @@ func (store *Store) EmptyTrash(ctx context.Context, satelliteID storj.NodeID, tr
func (store *Store) RestoreTrash(ctx context.Context, satelliteID storj.NodeID) (err error) {
defer mon.Task()(&ctx)(&err)
err = store.blobs.RestoreTrash(ctx, satelliteID.Bytes())
_, err = store.blobs.RestoreTrash(ctx, satelliteID.Bytes())
if err != nil {
return Error.Wrap(err)
}
@ -505,7 +509,7 @@ func (store *Store) DeleteFailed(ctx context.Context, expired ExpiredInfo, when
// traversal could cause this count to be undersized.
//
// Important note: this metric does not include space used by piece headers, whereas
// storj/filestore/store.(*Store).SpaceUsed() *does* include all space used by the blobs.
// storj/filestore/store.(*Store).SpaceUsedForBlobs() *does* include all space used by the blobs.
func (store *Store) SpaceUsedForPieces(ctx context.Context) (int64, error) {
if cache, ok := store.blobs.(*BlobsUsageCache); ok {
return cache.SpaceUsedForPieces(ctx)
@ -525,6 +529,28 @@ func (store *Store) SpaceUsedForPieces(ctx context.Context) (int64, error) {
return total, nil
}
// SpaceUsedForTrash returns the total space used by the the piece store's trash
func (store *Store) SpaceUsedForTrash(ctx context.Context) (int64, error) {
// If the blobs is cached, it will return the cached value
return store.blobs.SpaceUsedForTrash(ctx)
}
// SpaceUsedForPiecesAndTrash returns the total space used by both active
// pieces and the trash directory
func (store *Store) SpaceUsedForPiecesAndTrash(ctx context.Context) (int64, error) {
pieces, err := store.SpaceUsedForPieces(ctx)
if err != nil {
return 0, err
}
trash, err := store.SpaceUsedForTrash(ctx)
if err != nil {
return 0, err
}
return pieces + trash, nil
}
func (store *Store) getAllStoringSatellites(ctx context.Context) ([]storj.NodeID, error) {
namespaces, err := store.blobs.ListNamespaces(ctx)
if err != nil {
@ -546,7 +572,7 @@ func (store *Store) getAllStoringSatellites(ctx context.Context) ([]storj.NodeID
// that various errors in directory traversal could cause this count to be undersized.
//
// Important note: this metric does not include space used by piece headers, whereas
// storj/filestore/store.(*Store).SpaceUsedInNamespace() *does* include all space used by the
// storj/filestore/store.(*Store).SpaceUsedForBlobsInNamespace() *does* include all space used by the
// blobs.
func (store *Store) SpaceUsedBySatellite(ctx context.Context, satelliteID storj.NodeID) (int64, error) {
if cache, ok := store.blobs.(*BlobsUsageCache); ok {
@ -555,7 +581,7 @@ func (store *Store) SpaceUsedBySatellite(ctx context.Context, satelliteID storj.
var totalUsed int64
err := store.WalkSatellitePieces(ctx, satelliteID, func(access StoredPieceAccess) error {
contentSize, statErr := access.ContentSize(ctx)
_, contentSize, statErr := access.Size(ctx)
if statErr != nil {
store.log.Error("failed to stat", zap.Error(statErr), zap.Stringer("Piece ID", access.PieceID()), zap.Stringer("Satellite ID", satelliteID))
// keep iterating; we want a best effort total here.
@ -584,7 +610,7 @@ func (store *Store) SpaceUsedTotalAndBySatellite(ctx context.Context) (total int
var totalUsed int64
err := store.WalkSatellitePieces(ctx, satelliteID, func(access StoredPieceAccess) error {
contentSize, err := access.ContentSize(ctx)
_, contentSize, err := access.Size(ctx)
if err != nil {
return err
}
@ -654,18 +680,19 @@ func (access storedPieceAccess) Satellite() (storj.NodeID, error) {
return storj.NodeIDFromBytes(access.BlobRef().Namespace)
}
// ContentSize gives the size of the piece content (not including the piece header, if applicable)
func (access storedPieceAccess) ContentSize(ctx context.Context) (size int64, err error) {
// Size gives the size of the piece on disk, and the size of the content (not including the piece header, if applicable)
func (access storedPieceAccess) Size(ctx context.Context) (size, contentSize int64, err error) {
defer mon.Task()(&ctx)(&err)
stat, err := access.Stat(ctx)
if err != nil {
return 0, err
return 0, 0, err
}
size = stat.Size()
contentSize = size
if access.StorageFormatVersion() >= filestore.FormatV1 {
size -= V1PieceHeaderReservedArea
contentSize -= V1PieceHeaderReservedArea
}
return size, nil
return size, contentSize, nil
}
// CreationTime returns the piece creation time as given in the original PieceHash (which is likely

View File

@ -814,7 +814,7 @@ func TestOverwriteV0WithV1(t *testing.T) {
gotCreateTime, err := access.CreationTime(ctx)
require.NoError(t, err)
assert.Equal(t, v0CreateTime, gotCreateTime)
gotSize, err := access.ContentSize(ctx)
_, gotSize, err := access.Size(ctx)
require.NoError(t, err)
assert.Equal(t, int64(len(v0Data)), gotSize)
return nil
@ -857,7 +857,7 @@ func TestOverwriteV0WithV1(t *testing.T) {
gotCreateTime, err := access.CreationTime(ctx)
require.NoError(t, err)
assert.Equal(t, v1CreateTime, gotCreateTime)
gotSize, err := access.ContentSize(ctx)
_, gotSize, err := access.Size(ctx)
require.NoError(t, err)
assert.Equal(t, int64(len(v1Data)), gotSize)
case 2:
@ -867,7 +867,7 @@ func TestOverwriteV0WithV1(t *testing.T) {
gotCreateTime, err := access.CreationTime(ctx)
require.NoError(t, err)
assert.Equal(t, v0CreateTime, gotCreateTime)
gotSize, err := access.ContentSize(ctx)
_, gotSize, err := access.Size(ctx)
require.NoError(t, err)
assert.Equal(t, int64(len(v0Data)), gotSize)
default:

View File

@ -321,7 +321,7 @@ func setSpace(ctx context.Context, t *testing.T, planet *testplanet.Planet, spac
require.NoError(t, err)
// add these bytes to the space used cache so that we can test what happens
// when we exceeded available space on the storagenode
err = storageNode.DB.PieceSpaceUsedDB().UpdateTotal(ctx, availableSpace-space)
err = storageNode.DB.PieceSpaceUsedDB().UpdatePieceTotal(ctx, availableSpace-space)
require.NoError(t, err)
err = storageNode.Storage2.CacheService.Init(ctx)
require.NoError(t, err)

View File

@ -248,9 +248,9 @@ func (v0Access v0StoredPieceAccess) fillInBlobAccess(ctx context.Context) error
return nil
}
// ContentSize gives the size of the piece content (not including the piece header, if applicable)
func (v0Access v0StoredPieceAccess) ContentSize(ctx context.Context) (int64, error) {
return v0Access.pieceSize, nil
// Size gives the size of the piece, and the piece content size (not including the piece header, if applicable)
func (v0Access v0StoredPieceAccess) Size(ctx context.Context) (int64, int64, error) {
return v0Access.pieceSize, v0Access.pieceSize, nil
}
// CreationTime returns the piece creation time as given in the original order (which is not

View File

@ -18,40 +18,75 @@ var ErrPieceSpaceUsed = errs.Class("piece space used error")
// PieceSpaceUsedDBName represents the database name.
const PieceSpaceUsedDBName = "piece_spaced_used"
// trashTotalRowName is the special "satellite_id" used in the db to represent
// the total stored for trash. Similar to how we use NULL as a special value
// for satellite_id to represent the total for pieces, this value is used to
// identify the row storing the total for trash.
//
// It is intentionally an otherwise-invalid satellite_id (not 32 bytes) so that
// it cannot conflict with real satellite_id names
const trashTotalRowName = "trashtotal"
type pieceSpaceUsedDB struct {
dbContainerImpl
}
// Init creates the one total record if it doesn't already exist
// Init creates the total pieces and total trash records if they don't already exist
func (db *pieceSpaceUsedDB) Init(ctx context.Context) (err error) {
row := db.QueryRow(`
totalPiecesRow := db.QueryRow(`
SELECT total
FROM piece_space_used
WHERE satellite_id IS NULL;
`)
WHERE satellite_id IS NULL
AND satellite_id IS NOT ?;
`, trashTotalRowName)
var total int64
err = row.Scan(&total)
var piecesTotal int64
err = totalPiecesRow.Scan(&piecesTotal)
if err != nil {
if err == sql.ErrNoRows {
err = db.createInitTotal(ctx)
err = db.createInitTotalPieces(ctx)
if err != nil {
return ErrPieceSpaceUsed.Wrap(err)
}
}
}
totalTrashRow := db.QueryRow(`
SELECT total
FROM piece_space_used
WHERE satellite_id = ?;
`, trashTotalRowName)
var trashTotal int64
err = totalTrashRow.Scan(&trashTotal)
if err != nil {
if err == sql.ErrNoRows {
err = db.createInitTotalTrash(ctx)
if err != nil {
return ErrPieceSpaceUsed.Wrap(err)
}
}
}
return ErrPieceSpaceUsed.Wrap(err)
}
func (db *pieceSpaceUsedDB) createInitTotal(ctx context.Context) (err error) {
func (db *pieceSpaceUsedDB) createInitTotalPieces(ctx context.Context) (err error) {
_, err = db.Exec(`
INSERT INTO piece_space_used (total) VALUES (0)
`)
return ErrPieceSpaceUsed.Wrap(err)
}
// GetTotal returns the total space used by all pieces stored on disk
func (db *pieceSpaceUsedDB) GetTotal(ctx context.Context) (_ int64, err error) {
func (db *pieceSpaceUsedDB) createInitTotalTrash(ctx context.Context) (err error) {
_, err = db.Exec(`
INSERT INTO piece_space_used (total, satellite_id) VALUES (0, ?)
`, trashTotalRowName)
return ErrPieceSpaceUsed.Wrap(err)
}
// GetPieceTotal returns the total space used by all pieces stored on disk
func (db *pieceSpaceUsedDB) GetPieceTotal(ctx context.Context) (_ int64, err error) {
defer mon.Task()(&ctx)(&err)
row := db.QueryRow(`
@ -71,15 +106,37 @@ func (db *pieceSpaceUsedDB) GetTotal(ctx context.Context) (_ int64, err error) {
return total, nil
}
// GetTotalsForAllSatellites returns how much total space used by pieces stored on disk for each satelliteID
func (db *pieceSpaceUsedDB) GetTotalsForAllSatellites(ctx context.Context) (_ map[storj.NodeID]int64, err error) {
// GetTrashTotal returns the total space used by all trash
func (db *pieceSpaceUsedDB) GetTrashTotal(ctx context.Context) (_ int64, err error) {
defer mon.Task()(&ctx)(&err)
row := db.QueryRow(`
SELECT total
FROM piece_space_used
WHERE satellite_id = ?
`, trashTotalRowName)
var total int64
err = row.Scan(&total)
if err != nil {
if err == sql.ErrNoRows {
return total, nil
}
return total, ErrPieceSpaceUsed.Wrap(err)
}
return total, nil
}
// GetPieceTotalsForAllSatellites returns how much total space used by pieces stored on disk for each satelliteID
func (db *pieceSpaceUsedDB) GetPieceTotalsForAllSatellites(ctx context.Context) (_ map[storj.NodeID]int64, err error) {
defer mon.Task()(&ctx)(&err)
rows, err := db.QueryContext(ctx, `
SELECT total, satellite_id
FROM piece_space_used
WHERE satellite_id IS NOT NULL
`)
AND satellite_id IS NOT ?
`, trashTotalRowName)
if err != nil {
if err == sql.ErrNoRows {
return nil, nil
@ -102,8 +159,8 @@ func (db *pieceSpaceUsedDB) GetTotalsForAllSatellites(ctx context.Context) (_ ma
return totalBySatellite, nil
}
// UpdateTotal updates the record for total spaced used with a new value
func (db *pieceSpaceUsedDB) UpdateTotal(ctx context.Context, newTotal int64) (err error) {
// UpdatePieceTotal updates the record for total spaced used with a new value
func (db *pieceSpaceUsedDB) UpdatePieceTotal(ctx context.Context, newTotal int64) (err error) {
defer mon.Task()(&ctx)(&err)
_, err = db.ExecContext(ctx, `
@ -115,8 +172,21 @@ func (db *pieceSpaceUsedDB) UpdateTotal(ctx context.Context, newTotal int64) (er
return ErrPieceSpaceUsed.Wrap(err)
}
// UpdateTotalsForAllSatellites updates each record for total spaced used with a new value for each satelliteID
func (db *pieceSpaceUsedDB) UpdateTotalsForAllSatellites(ctx context.Context, newTotalsBySatellites map[storj.NodeID]int64) (err error) {
// UpdateTrashTotal updates the record for total spaced used with a new value
func (db *pieceSpaceUsedDB) UpdateTrashTotal(ctx context.Context, newTotal int64) (err error) {
defer mon.Task()(&ctx)(&err)
_, err = db.ExecContext(ctx, `
UPDATE piece_space_used
SET total = ?
WHERE satellite_id = ?
`, newTotal, trashTotalRowName)
return ErrPieceSpaceUsed.Wrap(err)
}
// UpdatePieceTotalsForAllSatellites updates each record for total spaced used with a new value for each satelliteID
func (db *pieceSpaceUsedDB) UpdatePieceTotalsForAllSatellites(ctx context.Context, newTotalsBySatellites map[storj.NodeID]int64) (err error) {
defer mon.Task()(&ctx)(&err)
for satelliteID, newTotal := range newTotalsBySatellites {