storj/satellite/orders/rollups_write_cache.go

240 lines
6.8 KiB
Go
Raw Normal View History

// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package orders
import (
"context"
"sync"
"time"
"go.uber.org/zap"
"storj.io/common/pb"
"storj.io/common/sync2"
"storj.io/common/uuid"
)
// CacheData stores the amount of inline and allocated data
// for a bucket bandwidth rollup.
type CacheData struct {
Inline int64
Allocated int64
Settled int64
Dead int64
}
// CacheKey is the key information for the cached map below.
type CacheKey struct {
ProjectID uuid.UUID
BucketName string
Action pb.PieceAction
IntervalStart int64
}
// RollupData contains the pending rollups waiting to be flushed to the db.
type RollupData map[CacheKey]CacheData
// RollupsWriteCache stores information needed to update bucket bandwidth rollups.
type RollupsWriteCache struct {
DB
batchSize int
wg sync.WaitGroup
log *zap.Logger
mu sync.Mutex
pendingRollups RollupData
stopped bool
flushing bool
nextFlushCompletion *sync2.Fence
}
// NewRollupsWriteCache creates an RollupsWriteCache.
func NewRollupsWriteCache(log *zap.Logger, db DB, batchSize int) *RollupsWriteCache {
return &RollupsWriteCache{
DB: db,
batchSize: batchSize,
log: log,
pendingRollups: make(RollupData),
nextFlushCompletion: new(sync2.Fence),
}
}
// UpdateBucketBandwidthAllocation updates the rollups cache adding allocated data for a bucket bandwidth rollup.
func (cache *RollupsWriteCache) UpdateBucketBandwidthAllocation(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, amount int64, intervalStart time.Time) error {
return cache.updateCacheValue(ctx, projectID, bucketName, action, amount, 0, 0, 0, intervalStart.UTC())
}
// UpdateBucketBandwidthInline updates the rollups cache adding inline data for a bucket bandwidth rollup.
func (cache *RollupsWriteCache) UpdateBucketBandwidthInline(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, amount int64, intervalStart time.Time) error {
return cache.updateCacheValue(ctx, projectID, bucketName, action, 0, amount, 0, 0, intervalStart.UTC())
}
// UpdateBucketBandwidthSettle updates the rollups cache adding settled data for a bucket bandwidth rollup - deadAmount is not used.
func (cache *RollupsWriteCache) UpdateBucketBandwidthSettle(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, settledAmount, deadAmount int64, intervalStart time.Time) error {
return cache.updateCacheValue(ctx, projectID, bucketName, action, 0, 0, settledAmount, deadAmount, intervalStart.UTC())
}
// resetCache should only be called after you have acquired the cache lock. It
// will reset the various cache values and return the pendingRollups.
func (cache *RollupsWriteCache) resetCache() RollupData {
pendingRollups := cache.pendingRollups
cache.pendingRollups = make(RollupData)
return pendingRollups
}
// Flush resets cache then flushes the everything in the rollups write cache to the database.
func (cache *RollupsWriteCache) Flush(ctx context.Context) {
defer mon.Task()(&ctx)(nil)
cache.mu.Lock()
// while we're already flushing, wait for it to complete.
for cache.flushing {
done := cache.nextFlushCompletion.Done()
cache.mu.Unlock()
select {
case <-done:
case <-ctx.Done():
return
}
cache.mu.Lock()
}
cache.flushing = true
pendingRollups := cache.resetCache()
cache.mu.Unlock()
cache.flush(ctx, pendingRollups)
}
// CloseAndFlush flushes anything in the cache and marks the cache as stopped.
func (cache *RollupsWriteCache) CloseAndFlush(ctx context.Context) error {
cache.mu.Lock()
cache.stopped = true
cache.mu.Unlock()
cache.wg.Wait()
cache.Flush(ctx)
return nil
}
// flush flushes the everything in the rollups write cache to the database.
func (cache *RollupsWriteCache) flush(ctx context.Context, pendingRollups RollupData) {
defer mon.Task()(&ctx)(nil)
if len(pendingRollups) > 0 {
rollups := make([]BucketBandwidthRollup, 0, len(pendingRollups))
for cacheKey, cacheData := range pendingRollups {
rollups = append(rollups, BucketBandwidthRollup{
ProjectID: cacheKey.ProjectID,
BucketName: cacheKey.BucketName,
IntervalStart: time.Unix(cacheKey.IntervalStart, 0),
Action: cacheKey.Action,
Inline: cacheData.Inline,
Allocated: cacheData.Allocated,
Settled: cacheData.Settled,
Dead: cacheData.Dead,
})
}
err := cache.DB.UpdateBandwidthBatch(ctx, rollups)
if err != nil {
mon.Event("rollups_write_cache_flush_lost")
cache.log.Error("MONEY LOST! Bucket bandwidth rollup batch flush failed", zap.Error(err))
}
}
cache.mu.Lock()
defer cache.mu.Unlock()
cache.nextFlushCompletion.Release()
cache.nextFlushCompletion = new(sync2.Fence)
cache.flushing = false
}
func (cache *RollupsWriteCache) updateCacheValue(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, allocated, inline, settled, dead int64, intervalStart time.Time) error {
defer mon.Task()(&ctx)(nil)
cache.mu.Lock()
defer cache.mu.Unlock()
if cache.stopped {
return Error.New("RollupsWriteCache is stopped")
}
key := CacheKey{
ProjectID: projectID,
BucketName: string(bucketName),
Action: action,
IntervalStart: time.Date(intervalStart.Year(), intervalStart.Month(), intervalStart.Day(), intervalStart.Hour(), 0, 0, 0, intervalStart.Location()).Unix(),
}
// pevent unbounded memory memory growth if we're not flushing fast enough
// to keep up with incoming writes.
data, ok := cache.pendingRollups[key]
if !ok && len(cache.pendingRollups) >= cache.batchSize {
mon.Event("rollups_write_cache_update_lost")
cache.log.Error("MONEY LOST! Flushing too slow to keep up with demand")
} else {
data.Allocated += allocated
data.Inline += inline
data.Settled += settled
data.Dead += dead
cache.pendingRollups[key] = data
}
if len(cache.pendingRollups) < cache.batchSize {
return nil
}
if !cache.flushing {
cache.flushing = true
pendingRollups := cache.resetCache()
cache.wg.Add(1)
go func() {
defer cache.wg.Done()
cache.flush(ctx, pendingRollups)
}()
}
return nil
}
// OnNextFlush waits until the next time a flush call is made, then closes
// the returned channel.
func (cache *RollupsWriteCache) OnNextFlush() <-chan struct{} {
cache.mu.Lock()
defer cache.mu.Unlock()
return cache.nextFlushCompletion.Done()
}
// CurrentSize returns the current size of the cache.
func (cache *RollupsWriteCache) CurrentSize() int {
cache.mu.Lock()
defer cache.mu.Unlock()
return len(cache.pendingRollups)
}
// CurrentData returns the contents of the cache.
func (cache *RollupsWriteCache) CurrentData() RollupData {
cache.mu.Lock()
defer cache.mu.Unlock()
copyCache := RollupData{}
for k, v := range cache.pendingRollups {
copyCache[k] = v
}
return copyCache
}