3b99f03780
Change-Id: Ib14303fc9f97a133410e2d6e2cf532e468b3dcee
203 lines
5.7 KiB
Go
203 lines
5.7 KiB
Go
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
|
|
|
|
package orders
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"sort"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/skyrings/skyring-common/tools/uuid"
|
|
"go.uber.org/zap"
|
|
|
|
"storj.io/common/pb"
|
|
"storj.io/common/sync2"
|
|
)
|
|
|
|
// CacheData holds the byte totals accumulated for a single bucket bandwidth
// rollup entry while it waits in the write cache.
type CacheData struct {
	// Inline is the accumulated amount of inline data.
	Inline int64
	// Allocated is the accumulated amount of allocated data.
	Allocated int64
}
|
|
|
|
// CacheKey is the key information for the cached map below
|
|
type CacheKey struct {
|
|
ProjectID uuid.UUID
|
|
BucketName string
|
|
Action pb.PieceAction
|
|
}
|
|
|
|
// RollupData contains the pending rollups waiting to be flushed to the db
|
|
type RollupData map[CacheKey]CacheData
|
|
|
|
// RollupsWriteCache stores information needed to update bucket bandwidth rollups
|
|
type RollupsWriteCache struct {
|
|
DB
|
|
batchSize int
|
|
currentSize int
|
|
latestTime time.Time
|
|
|
|
log *zap.Logger
|
|
mu sync.Mutex
|
|
pendingRollups RollupData
|
|
|
|
nextFlushCompletion *sync2.Fence
|
|
}
|
|
|
|
// NewRollupsWriteCache creates an RollupsWriteCache
|
|
func NewRollupsWriteCache(log *zap.Logger, db DB, batchSize int) *RollupsWriteCache {
|
|
return &RollupsWriteCache{
|
|
DB: db,
|
|
batchSize: batchSize,
|
|
log: log,
|
|
pendingRollups: make(RollupData),
|
|
nextFlushCompletion: new(sync2.Fence),
|
|
}
|
|
}
|
|
|
|
// UpdateBucketBandwidthAllocation updates the rollups cache adding allocated data for a bucket bandwidth rollup
|
|
func (cache *RollupsWriteCache) UpdateBucketBandwidthAllocation(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, amount int64, intervalStart time.Time) error {
|
|
cache.updateCacheValue(ctx, projectID, bucketName, action, amount, 0, intervalStart.UTC())
|
|
return nil
|
|
}
|
|
|
|
// UpdateBucketBandwidthInline updates the rollups cache adding inline data for a bucket bandwidth rollup
|
|
func (cache *RollupsWriteCache) UpdateBucketBandwidthInline(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, amount int64, intervalStart time.Time) error {
|
|
cache.updateCacheValue(ctx, projectID, bucketName, action, 0, amount, intervalStart.UTC())
|
|
return nil
|
|
}
|
|
|
|
// FlushToDB resets cache then flushes the everything in the rollups write cache to the database
|
|
func (cache *RollupsWriteCache) FlushToDB(ctx context.Context) {
|
|
defer mon.Task()(&ctx)(nil)
|
|
|
|
cache.mu.Lock()
|
|
defer cache.mu.Unlock()
|
|
pendingRollups := cache.pendingRollups
|
|
cache.pendingRollups = make(RollupData)
|
|
oldSize := cache.currentSize
|
|
cache.currentSize = 0
|
|
latestTime := cache.latestTime
|
|
cache.latestTime = time.Time{}
|
|
go cache.flushToDB(ctx, pendingRollups, latestTime, oldSize)
|
|
}
|
|
|
|
// flushToDB flushes the everything in the rollups write cache to the database
|
|
func (cache *RollupsWriteCache) flushToDB(ctx context.Context, pendingRollups RollupData, latestTime time.Time, oldSize int) {
|
|
defer mon.Task()(&ctx)(nil)
|
|
|
|
rollups := make([]BandwidthRollup, 0, oldSize)
|
|
for cacheKey, cacheData := range pendingRollups {
|
|
rollups = append(rollups, BandwidthRollup{
|
|
ProjectID: cacheKey.ProjectID,
|
|
BucketName: cacheKey.BucketName,
|
|
Action: cacheKey.Action,
|
|
Inline: cacheData.Inline,
|
|
Allocated: cacheData.Allocated,
|
|
})
|
|
}
|
|
|
|
SortRollups(rollups)
|
|
|
|
err := cache.DB.UpdateBucketBandwidthBatch(ctx, latestTime.UTC(), rollups)
|
|
if err != nil {
|
|
cache.log.Error("MONEY LOST! Bucket bandwidth rollup batch flush failed.", zap.Error(err))
|
|
}
|
|
|
|
var completion *sync2.Fence
|
|
cache.mu.Lock()
|
|
cache.nextFlushCompletion, completion = new(sync2.Fence), cache.nextFlushCompletion
|
|
cache.mu.Unlock()
|
|
completion.Release()
|
|
}
|
|
|
|
// SortRollups sorts the rollups
|
|
func SortRollups(rollups []BandwidthRollup) {
|
|
sort.SliceStable(rollups, func(i, j int) bool {
|
|
uuidCompare := bytes.Compare(rollups[i].ProjectID[:], rollups[j].ProjectID[:])
|
|
switch {
|
|
case uuidCompare == -1:
|
|
return true
|
|
case uuidCompare == 1:
|
|
return false
|
|
case rollups[i].BucketName < rollups[j].BucketName:
|
|
return true
|
|
case rollups[i].BucketName > rollups[j].BucketName:
|
|
return false
|
|
case rollups[i].Action < rollups[j].Action:
|
|
return true
|
|
case rollups[i].Action > rollups[j].Action:
|
|
return false
|
|
default:
|
|
return false
|
|
}
|
|
})
|
|
}
|
|
|
|
func (cache *RollupsWriteCache) updateCacheValue(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, allocated, inline int64, intervalStart time.Time) {
|
|
defer mon.Task()(&ctx)(nil)
|
|
|
|
cache.mu.Lock()
|
|
defer cache.mu.Unlock()
|
|
|
|
if intervalStart.After(cache.latestTime) {
|
|
cache.latestTime = intervalStart
|
|
}
|
|
|
|
key := CacheKey{
|
|
ProjectID: projectID,
|
|
BucketName: string(bucketName),
|
|
Action: action,
|
|
}
|
|
|
|
data, ok := cache.pendingRollups[key]
|
|
if !ok {
|
|
cache.currentSize++
|
|
}
|
|
data.Allocated += allocated
|
|
data.Inline += inline
|
|
cache.pendingRollups[key] = data
|
|
|
|
if cache.currentSize < cache.batchSize {
|
|
return
|
|
}
|
|
pendingRollups := cache.pendingRollups
|
|
cache.pendingRollups = make(RollupData)
|
|
oldSize := cache.currentSize
|
|
cache.currentSize = 0
|
|
latestTime := cache.latestTime
|
|
cache.latestTime = time.Time{}
|
|
go cache.flushToDB(ctx, pendingRollups, latestTime, oldSize)
|
|
}
|
|
|
|
// OnNextFlush waits until the next time a flushToDB call is made, then closes
|
|
// the returned channel.
|
|
func (cache *RollupsWriteCache) OnNextFlush() <-chan struct{} {
|
|
cache.mu.Lock()
|
|
fence := cache.nextFlushCompletion
|
|
cache.mu.Unlock()
|
|
return fence.Done()
|
|
}
|
|
|
|
// CurrentSize returns the current size of the cache.
|
|
func (cache *RollupsWriteCache) CurrentSize() int {
|
|
cache.mu.Lock()
|
|
defer cache.mu.Unlock()
|
|
return cache.currentSize
|
|
}
|
|
|
|
// CurrentData returns the contents of the cache.
|
|
func (cache *RollupsWriteCache) CurrentData() RollupData {
|
|
cache.mu.Lock()
|
|
defer cache.mu.Unlock()
|
|
copyCache := RollupData{}
|
|
for k, v := range cache.pendingRollups {
|
|
copyCache[k] = v
|
|
}
|
|
return copyCache
|
|
}
|