storj/storagenode/pieces/cache.go
Isaac Hess 14fd6a9ef0 storagenode/pieces: Track total piece size
This change updates the storagenode piecestore apis to expose access to
the full piece size stored on disk. Previously we only had access to
(and only kept a cache of) the content size used for all pieces. This
was inaccurate when reporting the amount of disk space used by nodes.

We now have access to the total content size, as well as the total disk
usage, of all pieces. The pieces cache also keeps a cache of the total
piece size along with the content size.

Change-Id: I4fffe7e1257e04c46021a2e37c5adc6fe69bee55
2020-01-23 11:00:24 -07:00

454 lines
14 KiB
Go

// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package pieces
import (
"context"
"sync"
"time"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/common/storj"
"storj.io/common/sync2"
"storj.io/storj/storage"
)
// CacheService updates the space used cache
//
// architecture: Chore
type CacheService struct {
log *zap.Logger
usageCache *BlobsUsageCache // in-memory totals, kept current by blob operations
store *Store // used to recalculate totals and to persist them via spaceUsedDB
loop sync2.Cycle // drives the periodic persist of cache totals
}
// NewService constructs a CacheService that recalculates the space usage
// cache on startup and flushes the cached totals to persistent storage
// every interval.
func NewService(log *zap.Logger, usageCache *BlobsUsageCache, pieces *Store, interval time.Duration) *CacheService {
	service := &CacheService{
		log:        log,
		usageCache: usageCache,
		store:      pieces,
		loop:       *sync2.NewCycle(interval),
	}
	return service
}
// Run recalculates the space used cache once and also runs a loop to sync the space used cache
// to persistent storage on an interval.
//
// Calculation and persistence errors are logged rather than returned so that
// startup and the sync loop keep going; the node can operate with stale
// totals until a later recalculation or sync succeeds.
func (service *CacheService) Run(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
// capture the cache totals before the (potentially long) disk iteration so
// Recalculate can estimate churn that happened while iterating
totalsAtStart := service.usageCache.copyCacheTotals()
// recalculate the cache once
piecesTotal, piecesContentSize, totalsBySatellite, err := service.store.SpaceUsedTotalAndBySatellite(ctx)
if err != nil {
service.log.Error("error getting current space used calculation: ", zap.Error(err))
}
trashTotal, err := service.store.SpaceUsedForTrash(ctx)
if err != nil {
service.log.Error("error getting current space for trash: ", zap.Error(err))
}
service.usageCache.Recalculate(
piecesTotal,
totalsAtStart.piecesTotal,
piecesContentSize,
totalsAtStart.piecesContentSize,
trashTotal,
totalsAtStart.trashTotal,
totalsBySatellite,
totalsAtStart.spaceUsedBySatellite,
)
if err = service.store.spaceUsedDB.Init(ctx); err != nil {
service.log.Error("error during init space usage db: ", zap.Error(err))
}
return service.loop.Run(ctx, func(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
// on a loop sync the cache values to the db so that we have them saved
// in the case that the storagenode restarts
if err := service.PersistCacheTotals(ctx); err != nil {
service.log.Error("error persisting cache totals to the database: ", zap.Error(err))
}
// the inner err above is shadowed on purpose: persistence failures are
// logged but nil is returned here so the sync loop keeps running
return err
})
}
// PersistCacheTotals saves the current totals of the space used cache to the database
// so that if the storagenode restarts it can retrieve the latest space used
// values without needing to recalculate since that could take a long time.
//
// The cache mutex is held across all three database writes so the persisted
// piece, per-satellite, and trash totals form a consistent snapshot; blob
// operations that update the cache block for the duration.
func (service *CacheService) PersistCacheTotals(ctx context.Context) error {
cache := service.usageCache
cache.mu.Lock()
defer cache.mu.Unlock()
if err := service.store.spaceUsedDB.UpdatePieceTotals(ctx, cache.piecesTotal, cache.piecesContentSize); err != nil {
return err
}
if err := service.store.spaceUsedDB.UpdatePieceTotalsForAllSatellites(ctx, cache.spaceUsedBySatellite); err != nil {
return err
}
if err := service.store.spaceUsedDB.UpdateTrashTotal(ctx, cache.trashTotal); err != nil {
return err
}
return nil
}
// Init initializes the space used cache with the most recent values that were stored persistently,
// loading the piece totals, per-satellite totals, and trash total from the
// space-used database and installing them in the cache in one step.
// Any load failure is logged and returned, leaving the cache unchanged.
func (service *CacheService) Init(ctx context.Context) (err error) {
piecesTotal, piecesContentSize, err := service.store.spaceUsedDB.GetPieceTotals(ctx)
if err != nil {
service.log.Error("CacheServiceInit error during initializing space usage cache GetTotal:", zap.Error(err))
return err
}
totalsBySatellite, err := service.store.spaceUsedDB.GetPieceTotalsForAllSatellites(ctx)
if err != nil {
service.log.Error("CacheServiceInit error during initializing space usage cache GetTotalsForAllSatellites:", zap.Error(err))
return err
}
trashTotal, err := service.store.spaceUsedDB.GetTrashTotal(ctx)
if err != nil {
service.log.Error("CacheServiceInit error during initializing space usage cache GetTrashTotal:", zap.Error(err))
return err
}
service.usageCache.init(piecesTotal, piecesContentSize, trashTotal, totalsBySatellite)
return nil
}
// Close closes the loop, stopping the periodic persist cycle.
// It always returns nil.
func (service *CacheService) Close() (err error) {
service.loop.Close()
return nil
}
// BlobsUsageCache is a blob storage with a cache for storing
// totals of current space used
//
// architecture: Database
type BlobsUsageCache struct {
storage.Blobs // underlying blob store; cache methods wrap its operations
mu sync.Mutex // guards all fields below
piecesTotal int64 // total on-disk size of all pieces
piecesContentSize int64 // content size of all pieces (excludes per-piece overhead)
trashTotal int64 // total size of pieces currently in the trash
spaceUsedBySatellite map[storj.NodeID]SatelliteUsage // per-satellite piece totals
}
// NewBlobsUsageCache wraps the given blob store with an in-memory cache of
// space-used totals. All totals start at zero and the per-satellite map
// starts empty.
func NewBlobsUsageCache(blob storage.Blobs) *BlobsUsageCache {
	cache := &BlobsUsageCache{
		Blobs:                blob,
		spaceUsedBySatellite: make(map[storj.NodeID]SatelliteUsage),
	}
	return cache
}
// NewBlobsUsageCacheTest creates a new disk blob store with a space used cache
// whose totals are pre-seeded with the given values rather than starting at
// zero; the Test suffix indicates it is intended for test setups.
func NewBlobsUsageCacheTest(blob storage.Blobs, piecesTotal, piecesContentSize, trashTotal int64, spaceUsedBySatellite map[storj.NodeID]SatelliteUsage) *BlobsUsageCache {
return &BlobsUsageCache{
Blobs: blob,
piecesTotal: piecesTotal,
piecesContentSize: piecesContentSize,
trashTotal: trashTotal,
spaceUsedBySatellite: spaceUsedBySatellite,
}
}
// init replaces the cache totals with the given persisted values.
// Parameter names mirror the struct fields they populate (the previous
// names pieceTotal/contentSize were inconsistent with piecesTotal/
// piecesContentSize). Note that totalsBySatellite is stored by reference,
// not copied, so the caller must not mutate it afterwards.
func (blobs *BlobsUsageCache) init(piecesTotal, piecesContentSize, trashTotal int64, totalsBySatellite map[storj.NodeID]SatelliteUsage) {
	blobs.mu.Lock()
	defer blobs.mu.Unlock()
	blobs.piecesTotal = piecesTotal
	blobs.piecesContentSize = piecesContentSize
	blobs.trashTotal = trashTotal
	blobs.spaceUsedBySatellite = totalsBySatellite
}
// SpaceUsedBySatellite returns the current total space used for a specific
// satellite for all pieces. A satellite with no recorded usage yields zeros,
// since the map lookup returns the zero SatelliteUsage. The error is always nil.
func (blobs *BlobsUsageCache) SpaceUsedBySatellite(ctx context.Context, satelliteID storj.NodeID) (piecesTotal int64, piecesContentSize int64, err error) {
blobs.mu.Lock()
defer blobs.mu.Unlock()
values := blobs.spaceUsedBySatellite[satelliteID]
return values.Total, values.ContentSize, nil
}
// SpaceUsedForPieces returns the current total used space for all pieces:
// the total on-disk size followed by the content size. The error is always nil.
func (blobs *BlobsUsageCache) SpaceUsedForPieces(ctx context.Context) (int64, int64, error) {
blobs.mu.Lock()
defer blobs.mu.Unlock()
return blobs.piecesTotal, blobs.piecesContentSize, nil
}
// SpaceUsedForTrash returns the current total used space for the trash dir.
// The error is always nil.
func (blobs *BlobsUsageCache) SpaceUsedForTrash(ctx context.Context) (int64, error) {
blobs.mu.Lock()
defer blobs.mu.Unlock()
return blobs.trashTotal, nil
}
// Delete gets the size of the piece that is going to be deleted then deletes it and
// updates the space used cache accordingly.
//
// The sizes must be captured before deletion since the blob is gone afterwards;
// the cache is only adjusted once the underlying delete has succeeded.
func (blobs *BlobsUsageCache) Delete(ctx context.Context, blobRef storage.BlobRef) error {
	pieceTotal, pieceContentSize, err := blobs.pieceSizes(ctx, blobRef)
	if err != nil {
		return Error.Wrap(err)
	}
	if err := blobs.Blobs.Delete(ctx, blobRef); err != nil {
		return Error.Wrap(err)
	}
	satelliteID, err := storj.NodeIDFromBytes(blobRef.Namespace)
	if err != nil {
		// wrap for consistency with the other error paths here and with
		// Trash, which wraps the same namespace-parse failure
		return Error.Wrap(err)
	}
	blobs.Update(ctx, satelliteID, -pieceTotal, -pieceContentSize, 0)
	return nil
}
// pieceSizes stats the referenced blob and returns its total piece size on
// disk together with its content size, as reported by the stored piece
// access for that blob.
func (blobs *BlobsUsageCache) pieceSizes(ctx context.Context, blobRef storage.BlobRef) (pieceTotal int64, pieceContentSize int64, err error) {
blobInfo, err := blobs.Stat(ctx, blobRef)
if err != nil {
return 0, 0, err
}
// nil store: only size information is needed from the piece access here
pieceAccess, err := newStoredPieceAccess(nil, blobInfo)
if err != nil {
return 0, 0, err
}
return pieceAccess.Size(ctx)
}
// Update applies the given deltas to the cached node-wide totals and to the
// totals tracked for the given satellite. Negative deltas shrink the totals.
func (blobs *BlobsUsageCache) Update(ctx context.Context, satelliteID storj.NodeID, piecesTotalDelta, piecesContentSizeDelta, trashDelta int64) {
	blobs.mu.Lock()
	defer blobs.mu.Unlock()
	current := blobs.spaceUsedBySatellite[satelliteID]
	blobs.spaceUsedBySatellite[satelliteID] = SatelliteUsage{
		Total:       current.Total + piecesTotalDelta,
		ContentSize: current.ContentSize + piecesContentSizeDelta,
	}
	blobs.piecesTotal += piecesTotalDelta
	blobs.piecesContentSize += piecesContentSizeDelta
	blobs.trashTotal += trashDelta
}
// Trash moves the ref to the trash and updates the cache:
// the piece's sizes are removed from the piece totals and its full on-disk
// size is added to the trash total.
func (blobs *BlobsUsageCache) Trash(ctx context.Context, blobRef storage.BlobRef) error {
// capture sizes before the move, while the blob is still in place
pieceTotal, pieceContentSize, err := blobs.pieceSizes(ctx, blobRef)
if err != nil {
return Error.Wrap(err)
}
err = blobs.Blobs.Trash(ctx, blobRef)
if err != nil {
return Error.Wrap(err)
}
satelliteID, err := storj.NodeIDFromBytes(blobRef.Namespace)
if err != nil {
return Error.Wrap(err)
}
blobs.Update(ctx, satelliteID, -pieceTotal, -pieceContentSize, pieceTotal)
return nil
}
// EmptyTrash empties the trash for the given namespace and updates the cache,
// returning the number of bytes emptied and the keys that were removed.
func (blobs *BlobsUsageCache) EmptyTrash(ctx context.Context, namespace []byte, trashedBefore time.Time) (int64, [][]byte, error) {
satelliteID, err := storj.NodeIDFromBytes(namespace)
if err != nil {
return 0, nil, err
}
bytesEmptied, keys, err := blobs.Blobs.EmptyTrash(ctx, namespace, trashedBefore)
if err != nil {
return 0, nil, err
}
// only the trash total shrinks; the piece totals were already reduced
// when these pieces were moved to the trash (see Trash)
blobs.Update(ctx, satelliteID, 0, 0, -bytesEmptied)
return bytesEmptied, keys, nil
}
// RestoreTrash restores the trash for the namespace and updates the cache,
// moving each restored piece's size from the trash total back to the piece
// totals. Failures to stat individual restored pieces are accumulated and
// returned alongside the restored keys without aborting the loop.
func (blobs *BlobsUsageCache) RestoreTrash(ctx context.Context, namespace []byte) ([][]byte, error) {
satelliteID, err := storj.NodeIDFromBytes(namespace)
if err != nil {
return nil, err
}
keysRestored, err := blobs.Blobs.RestoreTrash(ctx, namespace)
if err != nil {
return nil, err
}
for _, key := range keysRestored {
pieceTotal, pieceContentSize, sizeErr := blobs.pieceSizes(ctx, storage.BlobRef{
Key: key,
Namespace: namespace,
})
if sizeErr != nil {
// record the failure but keep updating the cache for the rest
err = errs.Combine(err, sizeErr)
continue
}
// the restored piece rejoins the piece totals and leaves the trash total
blobs.Update(ctx, satelliteID, pieceTotal, pieceContentSize, -pieceTotal)
}
return keysRestored, err
}
// copyCacheTotals returns a snapshot of the current cache totals, including
// a copy of the per-satellite map so the snapshot is unaffected by later
// cache updates. The returned value has no embedded Blobs and no usable mutex.
func (blobs *BlobsUsageCache) copyCacheTotals() BlobsUsageCache {
	blobs.mu.Lock()
	defer blobs.mu.Unlock()
	satellites := make(map[storj.NodeID]SatelliteUsage, len(blobs.spaceUsedBySatellite))
	for id, usage := range blobs.spaceUsedBySatellite {
		satellites[id] = usage
	}
	return BlobsUsageCache{
		piecesTotal:          blobs.piecesTotal,
		piecesContentSize:    blobs.piecesContentSize,
		trashTotal:           blobs.trashTotal,
		spaceUsedBySatellite: satellites,
	}
}
// Recalculate estimates new totals for the space used cache. In order to get new totals for the
// space used cache, we had to iterate over all the pieces on disk. Since that can potentially take
// a long time, here we need to check if we missed any additions/deletions while we were iterating and
// estimate how many bytes missed then add those to the space used result of iteration.
//
// Parameters come in pairs: the value produced by the disk iteration and the
// cached value captured when the iteration started. Comparing each pair with
// the live cache value at the end of iteration lets estimate() apportion the
// churn that happened mid-iteration.
func (blobs *BlobsUsageCache) Recalculate(
piecesTotal,
piecesTotalAtStart,
piecesContentSize,
piecesContentSizeAtStart,
trashTotal,
trashTotalAtStart int64,
totalsBySatellite,
totalsBySatelliteAtStart map[storj.NodeID]SatelliteUsage,
) {
// snapshot the live cache, which may have changed while iterating
totalsAtEnd := blobs.copyCacheTotals()
estimatedPiecesTotal := estimate(
piecesTotal,
piecesTotalAtStart,
totalsAtEnd.piecesTotal,
)
estimatedTotalTrash := estimate(
trashTotal,
trashTotalAtStart,
totalsAtEnd.trashTotal,
)
estimatedPiecesContentSize := estimate(
piecesContentSize,
piecesContentSizeAtStart,
totalsAtEnd.piecesContentSize,
)
// estimate per-satellite totals for every satellite the iteration saw
var estimatedTotalsBySatellite = map[storj.NodeID]SatelliteUsage{}
for ID, values := range totalsBySatellite {
estimatedTotal := estimate(
values.Total,
totalsBySatelliteAtStart[ID].Total,
totalsAtEnd.spaceUsedBySatellite[ID].Total,
)
estimatedPiecesContentSize := estimate(
values.ContentSize,
totalsBySatelliteAtStart[ID].ContentSize,
totalsAtEnd.spaceUsedBySatellite[ID].ContentSize,
)
// if the estimatedTotal is zero then there is no data stored
// for this satelliteID so don't add it to the cache
if estimatedTotal == 0 && estimatedPiecesContentSize == 0 {
continue
}
estimatedTotalsBySatellite[ID] = SatelliteUsage{
Total: estimatedTotal,
ContentSize: estimatedPiecesContentSize,
}
}
// find any saIDs that are in totalsAtEnd but not in totalsBySatellite:
// satellites the iteration never saw but that received data meanwhile
missedWhenIterationEnded := getMissed(totalsAtEnd.spaceUsedBySatellite,
totalsBySatellite,
)
if len(missedWhenIterationEnded) > 0 {
for ID := range missedWhenIterationEnded {
// the iteration found nothing for these satellites, so estimate
// from zero using only the cache movement during iteration
estimatedTotal := estimate(
0,
totalsBySatelliteAtStart[ID].Total,
totalsAtEnd.spaceUsedBySatellite[ID].Total,
)
estimatedPiecesContentSize := estimate(
0,
totalsBySatelliteAtStart[ID].ContentSize,
totalsAtEnd.spaceUsedBySatellite[ID].ContentSize,
)
if estimatedTotal == 0 && estimatedPiecesContentSize == 0 {
continue
}
estimatedTotalsBySatellite[ID] = SatelliteUsage{
Total: estimatedTotal,
ContentSize: estimatedPiecesContentSize,
}
}
}
// install all estimated totals atomically with respect to the mutex
blobs.mu.Lock()
blobs.piecesTotal = estimatedPiecesTotal
blobs.piecesContentSize = estimatedPiecesContentSize
blobs.trashTotal = estimatedTotalTrash
blobs.spaceUsedBySatellite = estimatedTotalsBySatellite
blobs.mu.Unlock()
}
// estimate adjusts a freshly-iterated total for writes/deletes that occurred
// while the iteration was running. If the cache did not move during the
// iteration relative to the new total, the new total is trusted as-is.
// Otherwise half of the observed cache delta is attributed to pieces the
// iteration had already passed, and added on. The result never goes below zero.
func estimate(newSpaceUsedTotal, totalAtIterationStart, totalAtIterationEnd int64) int64 {
	if newSpaceUsedTotal == totalAtIterationEnd {
		return newSpaceUsedTotal
	}
	// We assume half of the missed writes/deletes occurred before the
	// iteration visited the affected pieces and half after, so only half
	// of the delta observed during iteration is added to the result.
	missedDelta := totalAtIterationEnd - totalAtIterationStart
	if estimated := newSpaceUsedTotal + missedDelta/2; estimated >= 0 {
		return estimated
	}
	return 0
}
// getMissed returns the entries that appear in endTotals but are absent
// from newTotals, keyed by satellite ID.
func getMissed(endTotals, newTotals map[storj.NodeID]SatelliteUsage) map[storj.NodeID]SatelliteUsage {
	missed := make(map[storj.NodeID]SatelliteUsage)
	for id, usage := range endTotals {
		if _, found := newTotals[id]; found {
			continue
		}
		missed[id] = usage
	}
	return missed
}
// Close satisfies the pieces interface; the cache holds no resources of its
// own, so this is a no-op that always returns nil.
func (blobs *BlobsUsageCache) Close() error {
return nil
}
// TestCreateV0 creates a new V0 blob that can be written. This is only appropriate in test situations.
// The unchecked type assertion panics if the underlying Blobs implementation
// does not provide TestCreateV0, which is acceptable in tests.
func (blobs *BlobsUsageCache) TestCreateV0(ctx context.Context, ref storage.BlobRef) (_ storage.BlobWriter, err error) {
fStore := blobs.Blobs.(interface {
TestCreateV0(ctx context.Context, ref storage.BlobRef) (_ storage.BlobWriter, err error)
})
return fStore.TestCreateV0(ctx, ref)
}