2019-08-12 22:43:05 +01:00
|
|
|
// Copyright (C) 2019 Storj Labs, Inc.
|
|
|
|
// See LICENSE for copying information.
|
|
|
|
|
|
|
|
package pieces
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
2020-01-30 22:25:04 +00:00
|
|
|
"fmt"
|
2019-08-12 22:43:05 +01:00
|
|
|
"sync"
|
|
|
|
"time"
|
|
|
|
|
2019-12-21 13:11:24 +00:00
|
|
|
"github.com/zeebo/errs"
|
2019-08-12 22:43:05 +01:00
|
|
|
"go.uber.org/zap"
|
|
|
|
|
2019-12-27 11:48:47 +00:00
|
|
|
"storj.io/common/storj"
|
|
|
|
"storj.io/common/sync2"
|
2019-08-12 22:43:05 +01:00
|
|
|
"storj.io/storj/storage"
|
|
|
|
)
|
|
|
|
|
|
|
|
// CacheService updates the space used cache
|
2019-09-10 14:24:16 +01:00
|
|
|
//
|
|
|
|
// architecture: Chore
|
2019-08-12 22:43:05 +01:00
|
|
|
type CacheService struct {
|
|
|
|
log *zap.Logger
|
|
|
|
usageCache *BlobsUsageCache
|
|
|
|
store *Store
|
2020-01-29 15:37:50 +00:00
|
|
|
Loop *sync2.Cycle
|
2019-08-12 22:43:05 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
// NewService creates a new cache service that updates the space usage cache on startup and syncs the cache values to
|
|
|
|
// persistent storage on an interval
|
|
|
|
func NewService(log *zap.Logger, usageCache *BlobsUsageCache, pieces *Store, interval time.Duration) *CacheService {
|
|
|
|
return &CacheService{
|
|
|
|
log: log,
|
|
|
|
usageCache: usageCache,
|
|
|
|
store: pieces,
|
2020-01-29 15:37:50 +00:00
|
|
|
Loop: sync2.NewCycle(interval),
|
2019-08-12 22:43:05 +01:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Run recalculates the space used cache once and also runs a loop to sync the space used cache
|
|
|
|
// to persistent storage on an interval
|
|
|
|
func (service *CacheService) Run(ctx context.Context) (err error) {
|
|
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
|
2020-01-07 23:34:51 +00:00
|
|
|
totalsAtStart := service.usageCache.copyCacheTotals()
|
2019-08-12 22:43:05 +01:00
|
|
|
|
|
|
|
// recalculate the cache once
|
2020-01-07 23:34:51 +00:00
|
|
|
piecesTotal, piecesContentSize, totalsBySatellite, err := service.store.SpaceUsedTotalAndBySatellite(ctx)
|
2019-08-12 22:43:05 +01:00
|
|
|
if err != nil {
|
|
|
|
service.log.Error("error getting current space used calculation: ", zap.Error(err))
|
|
|
|
}
|
2020-01-30 18:30:48 +00:00
|
|
|
trashTotal, err := service.usageCache.Blobs.SpaceUsedForTrash(ctx)
|
2019-12-21 13:11:24 +00:00
|
|
|
if err != nil {
|
|
|
|
service.log.Error("error getting current space for trash: ", zap.Error(err))
|
2019-08-12 22:43:05 +01:00
|
|
|
}
|
2020-01-07 23:34:51 +00:00
|
|
|
service.usageCache.Recalculate(
|
|
|
|
piecesTotal,
|
|
|
|
totalsAtStart.piecesTotal,
|
|
|
|
piecesContentSize,
|
|
|
|
totalsAtStart.piecesContentSize,
|
|
|
|
trashTotal,
|
|
|
|
totalsAtStart.trashTotal,
|
|
|
|
totalsBySatellite,
|
|
|
|
totalsAtStart.spaceUsedBySatellite,
|
2019-12-21 13:11:24 +00:00
|
|
|
)
|
2019-08-12 22:43:05 +01:00
|
|
|
|
|
|
|
if err = service.store.spaceUsedDB.Init(ctx); err != nil {
|
|
|
|
service.log.Error("error during init space usage db: ", zap.Error(err))
|
|
|
|
}
|
|
|
|
|
2020-01-29 15:37:50 +00:00
|
|
|
return service.Loop.Run(ctx, func(ctx context.Context) (err error) {
|
2019-08-12 22:43:05 +01:00
|
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
|
|
|
|
// on a loop sync the cache values to the db so that we have the them saved
|
|
|
|
// in the case that the storagenode restarts
|
|
|
|
if err := service.PersistCacheTotals(ctx); err != nil {
|
|
|
|
service.log.Error("error persisting cache totals to the database: ", zap.Error(err))
|
|
|
|
}
|
|
|
|
return err
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
// PersistCacheTotals saves the current totals of the space used cache to the database
|
|
|
|
// so that if the storagenode restarts it can retrieve the latest space used
|
|
|
|
// values without needing to recalculate since that could take a long time
|
|
|
|
func (service *CacheService) PersistCacheTotals(ctx context.Context) error {
|
|
|
|
cache := service.usageCache
|
2019-09-12 17:42:39 +01:00
|
|
|
cache.mu.Lock()
|
|
|
|
defer cache.mu.Unlock()
|
2020-01-07 23:34:51 +00:00
|
|
|
if err := service.store.spaceUsedDB.UpdatePieceTotals(ctx, cache.piecesTotal, cache.piecesContentSize); err != nil {
|
2019-08-12 22:43:05 +01:00
|
|
|
return err
|
|
|
|
}
|
2019-12-21 13:11:24 +00:00
|
|
|
if err := service.store.spaceUsedDB.UpdatePieceTotalsForAllSatellites(ctx, cache.spaceUsedBySatellite); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2020-01-07 23:34:51 +00:00
|
|
|
if err := service.store.spaceUsedDB.UpdateTrashTotal(ctx, cache.trashTotal); err != nil {
|
2019-08-12 22:43:05 +01:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Init initializes the space used cache with the most recent values that were stored persistently
|
|
|
|
func (service *CacheService) Init(ctx context.Context) (err error) {
|
2020-01-07 23:34:51 +00:00
|
|
|
piecesTotal, piecesContentSize, err := service.store.spaceUsedDB.GetPieceTotals(ctx)
|
2019-08-12 22:43:05 +01:00
|
|
|
if err != nil {
|
|
|
|
service.log.Error("CacheServiceInit error during initializing space usage cache GetTotal:", zap.Error(err))
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2020-01-07 23:34:51 +00:00
|
|
|
totalsBySatellite, err := service.store.spaceUsedDB.GetPieceTotalsForAllSatellites(ctx)
|
2019-08-12 22:43:05 +01:00
|
|
|
if err != nil {
|
|
|
|
service.log.Error("CacheServiceInit error during initializing space usage cache GetTotalsForAllSatellites:", zap.Error(err))
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2020-01-02 18:15:23 +00:00
|
|
|
trashTotal, err := service.store.spaceUsedDB.GetTrashTotal(ctx)
|
|
|
|
if err != nil {
|
|
|
|
service.log.Error("CacheServiceInit error during initializing space usage cache GetTrashTotal:", zap.Error(err))
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2020-01-07 23:34:51 +00:00
|
|
|
service.usageCache.init(piecesTotal, piecesContentSize, trashTotal, totalsBySatellite)
|
2019-08-12 22:43:05 +01:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Close closes the loop
|
|
|
|
func (service *CacheService) Close() (err error) {
|
2020-01-29 15:37:50 +00:00
|
|
|
service.Loop.Close()
|
2019-08-12 22:43:05 +01:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// BlobsUsageCache is a blob storage with a cache for storing
|
2020-01-23 17:47:20 +00:00
|
|
|
// totals of current space used.
|
|
|
|
//
|
|
|
|
// The following names have the following meaning:
|
|
|
|
// - piecesTotal: the total space used by pieces, including headers
|
|
|
|
// - piecesContentSize: the space used by piece content, not including headers
|
|
|
|
// - trashTotal: the total space used in the trash, including headers
|
|
|
|
// - pieceTotal and pieceContentSize are the corollary for a single file
|
2019-09-10 14:24:16 +01:00
|
|
|
//
|
|
|
|
// architecture: Database
|
2019-08-12 22:43:05 +01:00
|
|
|
type BlobsUsageCache struct {
|
|
|
|
storage.Blobs
|
2020-01-30 18:01:50 +00:00
|
|
|
log *zap.Logger
|
2019-08-12 22:43:05 +01:00
|
|
|
|
2019-12-21 13:11:24 +00:00
|
|
|
mu sync.Mutex
|
2020-01-07 23:34:51 +00:00
|
|
|
piecesTotal int64
|
|
|
|
piecesContentSize int64
|
|
|
|
trashTotal int64
|
|
|
|
spaceUsedBySatellite map[storj.NodeID]SatelliteUsage
|
2019-08-12 22:43:05 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
// NewBlobsUsageCache creates a new disk blob store with a space used cache
|
2020-01-30 18:01:50 +00:00
|
|
|
func NewBlobsUsageCache(log *zap.Logger, blob storage.Blobs) *BlobsUsageCache {
|
2019-08-12 22:43:05 +01:00
|
|
|
return &BlobsUsageCache{
|
2020-01-30 18:01:50 +00:00
|
|
|
log: log,
|
2019-12-21 13:11:24 +00:00
|
|
|
Blobs: blob,
|
2020-01-07 23:34:51 +00:00
|
|
|
spaceUsedBySatellite: map[storj.NodeID]SatelliteUsage{},
|
2019-08-12 22:43:05 +01:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// NewBlobsUsageCacheTest creates a new disk blob store with a space used cache
|
2020-01-30 19:30:34 +00:00
|
|
|
func NewBlobsUsageCacheTest(log *zap.Logger, blob storage.Blobs, piecesTotal, piecesContentSize, trashTotal int64, spaceUsedBySatellite map[storj.NodeID]SatelliteUsage) *BlobsUsageCache {
|
2019-08-12 22:43:05 +01:00
|
|
|
return &BlobsUsageCache{
|
2020-01-30 19:30:34 +00:00
|
|
|
log: log,
|
2019-12-21 13:11:24 +00:00
|
|
|
Blobs: blob,
|
2020-01-07 23:34:51 +00:00
|
|
|
piecesTotal: piecesTotal,
|
|
|
|
piecesContentSize: piecesContentSize,
|
|
|
|
trashTotal: trashTotal,
|
2019-12-21 13:11:24 +00:00
|
|
|
spaceUsedBySatellite: spaceUsedBySatellite,
|
2019-08-12 22:43:05 +01:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-01-07 23:34:51 +00:00
|
|
|
func (blobs *BlobsUsageCache) init(pieceTotal, contentSize, trashTotal int64, totalsBySatellite map[storj.NodeID]SatelliteUsage) {
|
2019-08-12 22:43:05 +01:00
|
|
|
blobs.mu.Lock()
|
|
|
|
defer blobs.mu.Unlock()
|
2020-01-07 23:34:51 +00:00
|
|
|
blobs.piecesTotal = pieceTotal
|
|
|
|
blobs.piecesContentSize = contentSize
|
|
|
|
blobs.trashTotal = trashTotal
|
|
|
|
blobs.spaceUsedBySatellite = totalsBySatellite
|
2019-08-12 22:43:05 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
// SpaceUsedBySatellite returns the current total space used for a specific
|
2020-01-07 23:34:51 +00:00
|
|
|
// satellite for all pieces
|
|
|
|
func (blobs *BlobsUsageCache) SpaceUsedBySatellite(ctx context.Context, satelliteID storj.NodeID) (piecesTotal int64, piecesContentSize int64, err error) {
|
2019-08-12 22:43:05 +01:00
|
|
|
blobs.mu.Lock()
|
|
|
|
defer blobs.mu.Unlock()
|
2020-01-07 23:34:51 +00:00
|
|
|
values := blobs.spaceUsedBySatellite[satelliteID]
|
|
|
|
return values.Total, values.ContentSize, nil
|
2019-08-12 22:43:05 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
// SpaceUsedForPieces returns the current total used space for
|
2020-01-07 23:34:51 +00:00
|
|
|
//// all pieces
|
|
|
|
func (blobs *BlobsUsageCache) SpaceUsedForPieces(ctx context.Context) (int64, int64, error) {
|
2019-08-12 22:43:05 +01:00
|
|
|
blobs.mu.Lock()
|
|
|
|
defer blobs.mu.Unlock()
|
2020-01-07 23:34:51 +00:00
|
|
|
return blobs.piecesTotal, blobs.piecesContentSize, nil
|
2019-12-21 13:11:24 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// SpaceUsedForTrash returns the current total used space for the trash dir
|
|
|
|
func (blobs *BlobsUsageCache) SpaceUsedForTrash(ctx context.Context) (int64, error) {
|
|
|
|
blobs.mu.Lock()
|
|
|
|
defer blobs.mu.Unlock()
|
2020-01-07 23:34:51 +00:00
|
|
|
return blobs.trashTotal, nil
|
2019-08-12 22:43:05 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
// Delete gets the size of the piece that is going to be deleted then deletes it and
|
|
|
|
// updates the space used cache accordingly
|
|
|
|
func (blobs *BlobsUsageCache) Delete(ctx context.Context, blobRef storage.BlobRef) error {
|
2020-01-07 23:34:51 +00:00
|
|
|
pieceTotal, pieceContentSize, err := blobs.pieceSizes(ctx, blobRef)
|
2019-12-21 13:11:24 +00:00
|
|
|
if err != nil {
|
|
|
|
return Error.Wrap(err)
|
|
|
|
}
|
|
|
|
|
|
|
|
if err := blobs.Blobs.Delete(ctx, blobRef); err != nil {
|
|
|
|
return Error.Wrap(err)
|
|
|
|
}
|
|
|
|
|
|
|
|
satelliteID, err := storj.NodeIDFromBytes(blobRef.Namespace)
|
2019-08-12 22:43:05 +01:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2020-01-07 23:34:51 +00:00
|
|
|
blobs.Update(ctx, satelliteID, -pieceTotal, -pieceContentSize, 0)
|
2019-12-21 13:11:24 +00:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2020-01-07 23:34:51 +00:00
|
|
|
func (blobs *BlobsUsageCache) pieceSizes(ctx context.Context, blobRef storage.BlobRef) (pieceTotal int64, pieceContentSize int64, err error) {
|
2019-12-21 13:11:24 +00:00
|
|
|
blobInfo, err := blobs.Stat(ctx, blobRef)
|
|
|
|
if err != nil {
|
|
|
|
return 0, 0, err
|
|
|
|
}
|
2019-08-12 22:43:05 +01:00
|
|
|
pieceAccess, err := newStoredPieceAccess(nil, blobInfo)
|
|
|
|
if err != nil {
|
2019-12-21 13:11:24 +00:00
|
|
|
return 0, 0, err
|
|
|
|
}
|
|
|
|
return pieceAccess.Size(ctx)
|
|
|
|
}
|
|
|
|
|
2020-01-07 23:34:51 +00:00
|
|
|
// Update updates the cache totals
|
|
|
|
func (blobs *BlobsUsageCache) Update(ctx context.Context, satelliteID storj.NodeID, piecesTotalDelta, piecesContentSizeDelta, trashDelta int64) {
|
2019-12-21 13:11:24 +00:00
|
|
|
blobs.mu.Lock()
|
|
|
|
defer blobs.mu.Unlock()
|
2020-01-30 18:01:50 +00:00
|
|
|
|
2020-01-30 22:25:04 +00:00
|
|
|
blobs.piecesTotal += piecesTotalDelta
|
|
|
|
blobs.piecesContentSize += piecesContentSizeDelta
|
|
|
|
blobs.trashTotal += trashDelta
|
2020-01-30 18:01:50 +00:00
|
|
|
|
2020-01-30 22:25:04 +00:00
|
|
|
blobs.ensurePositiveCacheValue(&blobs.piecesTotal, "piecesTotal")
|
|
|
|
blobs.ensurePositiveCacheValue(&blobs.piecesContentSize, "piecesContentSize")
|
|
|
|
blobs.ensurePositiveCacheValue(&blobs.trashTotal, "trashTotal")
|
2020-01-30 18:01:50 +00:00
|
|
|
|
2020-01-07 23:34:51 +00:00
|
|
|
oldVals := blobs.spaceUsedBySatellite[satelliteID]
|
2020-01-30 22:25:04 +00:00
|
|
|
newVals := SatelliteUsage{
|
|
|
|
Total: oldVals.Total + piecesTotalDelta,
|
|
|
|
ContentSize: oldVals.ContentSize + piecesContentSizeDelta,
|
2020-01-30 18:01:50 +00:00
|
|
|
}
|
2020-01-30 22:25:04 +00:00
|
|
|
blobs.ensurePositiveCacheValue(&newVals.Total, "satPiecesTotal")
|
|
|
|
blobs.ensurePositiveCacheValue(&newVals.ContentSize, "satPiecesContentSize")
|
|
|
|
blobs.spaceUsedBySatellite[satelliteID] = newVals
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
func (blobs *BlobsUsageCache) ensurePositiveCacheValue(value *int64, name string) {
|
|
|
|
if *value >= 0 {
|
|
|
|
return
|
2020-01-07 23:34:51 +00:00
|
|
|
}
|
2020-01-30 22:25:04 +00:00
|
|
|
blobs.log.Error(fmt.Sprintf("%s < 0", name), zap.Int64(name, *value))
|
|
|
|
*value = 0
|
2019-12-21 13:11:24 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Trash moves the ref to the trash and updates the cache
|
|
|
|
func (blobs *BlobsUsageCache) Trash(ctx context.Context, blobRef storage.BlobRef) error {
|
2020-01-07 23:34:51 +00:00
|
|
|
pieceTotal, pieceContentSize, err := blobs.pieceSizes(ctx, blobRef)
|
2019-12-21 13:11:24 +00:00
|
|
|
if err != nil {
|
|
|
|
return Error.Wrap(err)
|
2019-08-12 22:43:05 +01:00
|
|
|
}
|
2019-12-21 13:11:24 +00:00
|
|
|
|
|
|
|
err = blobs.Blobs.Trash(ctx, blobRef)
|
2019-08-12 22:43:05 +01:00
|
|
|
if err != nil {
|
|
|
|
return Error.Wrap(err)
|
|
|
|
}
|
|
|
|
|
2019-12-21 13:11:24 +00:00
|
|
|
satelliteID, err := storj.NodeIDFromBytes(blobRef.Namespace)
|
|
|
|
if err != nil {
|
2019-08-12 22:43:05 +01:00
|
|
|
return Error.Wrap(err)
|
|
|
|
}
|
|
|
|
|
2020-01-07 23:34:51 +00:00
|
|
|
blobs.Update(ctx, satelliteID, -pieceTotal, -pieceContentSize, pieceTotal)
|
2019-08-12 22:43:05 +01:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2019-12-21 13:11:24 +00:00
|
|
|
// EmptyTrash empties the trash and updates the cache
|
|
|
|
func (blobs *BlobsUsageCache) EmptyTrash(ctx context.Context, namespace []byte, trashedBefore time.Time) (int64, [][]byte, error) {
|
|
|
|
satelliteID, err := storj.NodeIDFromBytes(namespace)
|
|
|
|
if err != nil {
|
|
|
|
return 0, nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
bytesEmptied, keys, err := blobs.Blobs.EmptyTrash(ctx, namespace, trashedBefore)
|
|
|
|
if err != nil {
|
|
|
|
return 0, nil, err
|
|
|
|
}
|
|
|
|
|
2020-01-07 23:34:51 +00:00
|
|
|
blobs.Update(ctx, satelliteID, 0, 0, -bytesEmptied)
|
2019-12-21 13:11:24 +00:00
|
|
|
|
|
|
|
return bytesEmptied, keys, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// RestoreTrash restores the trash for the namespace and updates the cache
|
|
|
|
func (blobs *BlobsUsageCache) RestoreTrash(ctx context.Context, namespace []byte) ([][]byte, error) {
|
|
|
|
satelliteID, err := storj.NodeIDFromBytes(namespace)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
keysRestored, err := blobs.Blobs.RestoreTrash(ctx, namespace)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
for _, key := range keysRestored {
|
2020-01-07 23:34:51 +00:00
|
|
|
pieceTotal, pieceContentSize, sizeErr := blobs.pieceSizes(ctx, storage.BlobRef{
|
2019-12-21 13:11:24 +00:00
|
|
|
Key: key,
|
|
|
|
Namespace: namespace,
|
|
|
|
})
|
|
|
|
if sizeErr != nil {
|
|
|
|
err = errs.Combine(err, sizeErr)
|
|
|
|
continue
|
|
|
|
}
|
2020-01-07 23:34:51 +00:00
|
|
|
blobs.Update(ctx, satelliteID, pieceTotal, pieceContentSize, -pieceTotal)
|
2019-12-21 13:11:24 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
return keysRestored, err
|
2019-08-12 22:43:05 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
func (blobs *BlobsUsageCache) copyCacheTotals() BlobsUsageCache {
|
2019-09-12 17:42:39 +01:00
|
|
|
blobs.mu.Lock()
|
|
|
|
defer blobs.mu.Unlock()
|
2020-01-07 23:34:51 +00:00
|
|
|
var copyMap = map[storj.NodeID]SatelliteUsage{}
|
2019-12-21 13:11:24 +00:00
|
|
|
for k, v := range blobs.spaceUsedBySatellite {
|
2019-08-12 22:43:05 +01:00
|
|
|
copyMap[k] = v
|
|
|
|
}
|
|
|
|
return BlobsUsageCache{
|
2020-01-07 23:34:51 +00:00
|
|
|
piecesTotal: blobs.piecesTotal,
|
|
|
|
piecesContentSize: blobs.piecesContentSize,
|
|
|
|
trashTotal: blobs.trashTotal,
|
2019-12-21 13:11:24 +00:00
|
|
|
spaceUsedBySatellite: copyMap,
|
2019-08-12 22:43:05 +01:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Recalculate estimates new totals for the space used cache. In order to get new totals for the
|
|
|
|
// space used cache, we had to iterate over all the pieces on disk. Since that can potentially take
|
|
|
|
// a long time, here we need to check if we missed any additions/deletions while we were iterating and
|
|
|
|
// estimate how many bytes missed then add those to the space used result of iteration.
|
2020-01-07 23:34:51 +00:00
|
|
|
func (blobs *BlobsUsageCache) Recalculate(
|
|
|
|
piecesTotal,
|
|
|
|
piecesTotalAtStart,
|
|
|
|
piecesContentSize,
|
|
|
|
piecesContentSizeAtStart,
|
|
|
|
trashTotal,
|
|
|
|
trashTotalAtStart int64,
|
|
|
|
totalsBySatellite,
|
|
|
|
totalsBySatelliteAtStart map[storj.NodeID]SatelliteUsage,
|
|
|
|
) {
|
|
|
|
|
|
|
|
totalsAtEnd := blobs.copyCacheTotals()
|
|
|
|
|
|
|
|
estimatedPiecesTotal := estimate(
|
|
|
|
piecesTotal,
|
|
|
|
piecesTotalAtStart,
|
|
|
|
totalsAtEnd.piecesTotal,
|
|
|
|
)
|
2019-08-12 22:43:05 +01:00
|
|
|
|
2020-01-07 23:34:51 +00:00
|
|
|
estimatedTotalTrash := estimate(
|
|
|
|
trashTotal,
|
|
|
|
trashTotalAtStart,
|
|
|
|
totalsAtEnd.trashTotal,
|
2019-08-12 22:43:05 +01:00
|
|
|
)
|
|
|
|
|
2020-01-07 23:34:51 +00:00
|
|
|
estimatedPiecesContentSize := estimate(
|
|
|
|
piecesContentSize,
|
|
|
|
piecesContentSizeAtStart,
|
|
|
|
totalsAtEnd.piecesContentSize,
|
|
|
|
)
|
2019-12-21 13:11:24 +00:00
|
|
|
|
2020-01-07 23:34:51 +00:00
|
|
|
var estimatedTotalsBySatellite = map[storj.NodeID]SatelliteUsage{}
|
|
|
|
for ID, values := range totalsBySatellite {
|
|
|
|
estimatedTotal := estimate(
|
|
|
|
values.Total,
|
|
|
|
totalsBySatelliteAtStart[ID].Total,
|
|
|
|
totalsAtEnd.spaceUsedBySatellite[ID].Total,
|
2019-08-12 22:43:05 +01:00
|
|
|
)
|
2020-01-07 23:34:51 +00:00
|
|
|
estimatedPiecesContentSize := estimate(
|
|
|
|
values.ContentSize,
|
|
|
|
totalsBySatelliteAtStart[ID].ContentSize,
|
|
|
|
totalsAtEnd.spaceUsedBySatellite[ID].ContentSize,
|
|
|
|
)
|
|
|
|
// if the estimatedTotal is zero then there is no data stored
|
2019-08-12 22:43:05 +01:00
|
|
|
// for this satelliteID so don't add it to the cache
|
2020-01-07 23:34:51 +00:00
|
|
|
if estimatedTotal == 0 && estimatedPiecesContentSize == 0 {
|
2019-08-12 22:43:05 +01:00
|
|
|
continue
|
|
|
|
}
|
2020-01-07 23:34:51 +00:00
|
|
|
estimatedTotalsBySatellite[ID] = SatelliteUsage{
|
|
|
|
Total: estimatedTotal,
|
|
|
|
ContentSize: estimatedPiecesContentSize,
|
|
|
|
}
|
2019-08-12 22:43:05 +01:00
|
|
|
}
|
|
|
|
|
2020-01-07 23:34:51 +00:00
|
|
|
// find any saIDs that are in totalsAtEnd but not in totalsBySatellite
|
|
|
|
missedWhenIterationEnded := getMissed(totalsAtEnd.spaceUsedBySatellite,
|
|
|
|
totalsBySatellite,
|
2019-08-12 22:43:05 +01:00
|
|
|
)
|
|
|
|
if len(missedWhenIterationEnded) > 0 {
|
|
|
|
for ID := range missedWhenIterationEnded {
|
2020-01-07 23:34:51 +00:00
|
|
|
estimatedTotal := estimate(
|
|
|
|
0,
|
|
|
|
totalsBySatelliteAtStart[ID].Total,
|
|
|
|
totalsAtEnd.spaceUsedBySatellite[ID].Total,
|
2019-08-12 22:43:05 +01:00
|
|
|
)
|
2020-01-07 23:34:51 +00:00
|
|
|
estimatedPiecesContentSize := estimate(
|
|
|
|
0,
|
|
|
|
totalsBySatelliteAtStart[ID].ContentSize,
|
|
|
|
totalsAtEnd.spaceUsedBySatellite[ID].ContentSize,
|
|
|
|
)
|
|
|
|
if estimatedTotal == 0 && estimatedPiecesContentSize == 0 {
|
2019-08-12 22:43:05 +01:00
|
|
|
continue
|
|
|
|
}
|
2020-01-07 23:34:51 +00:00
|
|
|
estimatedTotalsBySatellite[ID] = SatelliteUsage{
|
|
|
|
Total: estimatedTotal,
|
|
|
|
ContentSize: estimatedPiecesContentSize,
|
|
|
|
}
|
2019-08-12 22:43:05 +01:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
blobs.mu.Lock()
|
2020-01-07 23:34:51 +00:00
|
|
|
blobs.piecesTotal = estimatedPiecesTotal
|
|
|
|
blobs.piecesContentSize = estimatedPiecesContentSize
|
|
|
|
blobs.trashTotal = estimatedTotalTrash
|
2019-12-21 13:11:24 +00:00
|
|
|
blobs.spaceUsedBySatellite = estimatedTotalsBySatellite
|
2019-08-12 22:43:05 +01:00
|
|
|
blobs.mu.Unlock()
|
|
|
|
}
|
|
|
|
|
|
|
|
func estimate(newSpaceUsedTotal, totalAtIterationStart, totalAtIterationEnd int64) int64 {
|
|
|
|
if newSpaceUsedTotal == totalAtIterationEnd {
|
2020-01-30 22:25:04 +00:00
|
|
|
if newSpaceUsedTotal < 0 {
|
|
|
|
return 0
|
|
|
|
}
|
2019-08-12 22:43:05 +01:00
|
|
|
return newSpaceUsedTotal
|
|
|
|
}
|
|
|
|
|
|
|
|
// If we missed writes/deletes while iterating, we will assume that half of those missed occurred before
|
|
|
|
// the iteration and half occurred after. So here we add half of the delta to the result space used totals
|
|
|
|
// from the iteration to account for those missed.
|
|
|
|
spaceUsedDeltaDuringIteration := totalAtIterationEnd - totalAtIterationStart
|
|
|
|
estimatedTotal := newSpaceUsedTotal + (spaceUsedDeltaDuringIteration / 2)
|
|
|
|
if estimatedTotal < 0 {
|
|
|
|
return 0
|
|
|
|
}
|
|
|
|
return estimatedTotal
|
|
|
|
}
|
|
|
|
|
2020-01-07 23:34:51 +00:00
|
|
|
func getMissed(endTotals, newTotals map[storj.NodeID]SatelliteUsage) map[storj.NodeID]SatelliteUsage {
|
|
|
|
var missed = map[storj.NodeID]SatelliteUsage{}
|
|
|
|
for id, vals := range endTotals {
|
2019-08-12 22:43:05 +01:00
|
|
|
if _, ok := newTotals[id]; !ok {
|
2020-01-07 23:34:51 +00:00
|
|
|
missed[id] = vals
|
2019-08-12 22:43:05 +01:00
|
|
|
}
|
|
|
|
}
|
|
|
|
return missed
|
|
|
|
}
|
|
|
|
|
|
|
|
// Close satisfies the pieces interface
|
|
|
|
func (blobs *BlobsUsageCache) Close() error {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// TestCreateV0 creates a new V0 blob that can be written. This is only appropriate in test situations.
|
|
|
|
func (blobs *BlobsUsageCache) TestCreateV0(ctx context.Context, ref storage.BlobRef) (_ storage.BlobWriter, err error) {
|
|
|
|
fStore := blobs.Blobs.(interface {
|
|
|
|
TestCreateV0(ctx context.Context, ref storage.BlobRef) (_ storage.BlobWriter, err error)
|
|
|
|
})
|
|
|
|
return fStore.TestCreateV0(ctx, ref)
|
|
|
|
}
|