satellite/{accounting,orders,satellitedb}: group bucket bandwidth rollups by time window

Batching of order submissions can combine the allocated traffic totals
for two completely different time windows, resulting in incorrect
customer accounting. This change groups the batched order submissions
by projectID as well as time window, producing distinct updates of a
bucket's bandwidth rollup based on the hour window in which the order
was created.

Change-Id: Ifb4d67923eec8a533b9758379914f17ff7abea32
Author:    dlamarmorgan 2021-11-23 14:50:42 -08:00
Committer: Damein Morgan
Parent:    5d3085f38d
Commit:    ab37b65cfc

9 changed files with 279 additions and 130 deletions
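For illustration only, a minimal standalone Go sketch of the grouping idea (simplified types; toHourlyInterval mirrors the helper this commit adds, everything else is a stand-in for the real orders types):

package main

import (
	"fmt"
	"time"
)

// rollup mirrors only the fields needed for grouping; the satellite's
// orders.BucketBandwidthRollup carries more (Inline, Settled, Dead).
type rollup struct {
	ProjectID     string // uuid.UUID in the real code
	BucketName    string
	Action        int32 // pb.PieceAction in the real code
	IntervalStart time.Time
	Allocated     int64
}

// groupKey includes the hour-truncated interval so that totals from two
// different time windows can never be combined.
type groupKey struct {
	ProjectID     string
	BucketName    string
	Action        int32
	IntervalStart int64
}

// toHourlyInterval rounds a timestamp down to the start of its hour,
// matching the helper introduced by this commit.
func toHourlyInterval(t time.Time) int64 {
	return time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), 0, 0, 0, t.Location()).Unix()
}

func main() {
	now := time.Now()
	batch := []rollup{
		{"p1", "b1", 2, now, 1000},                 // 2 stands in for an action code (e.g. a GET)
		{"p1", "b1", 2, now, 500},                  // same hour window: summed
		{"p1", "b1", 2, now.Add(-time.Hour), 1000}, // earlier window: kept distinct
	}

	grouped := make(map[groupKey]int64)
	for _, r := range batch {
		key := groupKey{r.ProjectID, r.BucketName, r.Action, toHourlyInterval(r.IntervalStart)}
		grouped[key] += r.Allocated
	}
	fmt.Println(len(grouped), grouped) // 2 windows, not 1 combined total
}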

go.mod

@@ -51,7 +51,7 @@ require (
storj.io/common v0.0.0-20211217122906-6be0b96ce7e0
storj.io/drpc v0.0.26
storj.io/monkit-jaeger v0.0.0-20210426161729-debb1cbcbbd7
storj.io/private v0.0.0-20211029202355-a7eae71c382a
storj.io/private v0.0.0-20211209191323-6595d4aa0cfe
storj.io/uplink v1.7.1-0.20211031201307-b30e004c1ccb
)

go.sum

@@ -894,7 +894,7 @@ storj.io/drpc v0.0.26/go.mod h1:ofQUDPQbbIymRDKE0tms48k8bLP5Y+dsI9CbXGv3gko=
storj.io/monkit-jaeger v0.0.0-20210225162224-66fb37637bf6/go.mod h1:gj4vuCeyCRjRmH8LIrgoyU9Dc9uR6H+/GcDUXmTbf80=
storj.io/monkit-jaeger v0.0.0-20210426161729-debb1cbcbbd7 h1:zi0w9zoBfvuqysSAqxJT1Ton2YB5IhyMM3/3CISjlrQ=
storj.io/monkit-jaeger v0.0.0-20210426161729-debb1cbcbbd7/go.mod h1:gj4vuCeyCRjRmH8LIrgoyU9Dc9uR6H+/GcDUXmTbf80=
storj.io/private v0.0.0-20211029202355-a7eae71c382a h1:cWPChMGma5Cw5rdGuKrlc+XFxjisRVAXfa5Ny9/nxzw=
storj.io/private v0.0.0-20211029202355-a7eae71c382a/go.mod h1:BoSaGSvsC8C6Gy0FyjrHfsElJA623hLsNIyexs6vGno=
storj.io/private v0.0.0-20211209191323-6595d4aa0cfe h1:gPf2s3d247JWd/Iqzw7g8mvpdlqdBpVTsBhe6oPMkKU=
storj.io/private v0.0.0-20211209191323-6595d4aa0cfe/go.mod h1:BoSaGSvsC8C6Gy0FyjrHfsElJA623hLsNIyexs6vGno=
storj.io/uplink v1.7.1-0.20211031201307-b30e004c1ccb h1:+WaWPmWvm12StirZ/b2xgbL9lnB7UmRhvC8fckNN1ZI=
storj.io/uplink v1.7.1-0.20211031201307-b30e004c1ccb/go.mod h1:XvtDUEQplm3nG76Qo+uR59J7Rrwm9Q40XOwGpPSpF2g=

@@ -203,10 +203,10 @@ func TestProjectBandwidthRollups(t *testing.T) {
require.NoError(t, err)
rollups := []orders.BucketBandwidthRollup{
{ProjectID: p1, BucketName: string(b1), Action: pb.PieceAction_GET, Inline: 1000, Allocated: 1000 /* counted */, Settled: 1000},
{ProjectID: p1, BucketName: string(b2), Action: pb.PieceAction_GET, Inline: 1000, Allocated: 1000 /* counted */, Settled: 1000},
{ProjectID: p1, BucketName: string(b1), Action: pb.PieceAction_GET, Inline: 1000, Allocated: 1000 /* counted */, Settled: 1000, IntervalStart: hour},
{ProjectID: p1, BucketName: string(b2), Action: pb.PieceAction_GET, Inline: 1000, Allocated: 1000 /* counted */, Settled: 1000, IntervalStart: hour},
}
err = db.Orders().UpdateBucketBandwidthBatch(ctx, hour, rollups)
err = db.Orders().UpdateBandwidthBatch(ctx, rollups)
require.NoError(t, err)
// allocated bandwidth should correspond to the sum of bandwidth for GET actions (4000 here)
@@ -246,20 +246,20 @@ func TestProjectBandwidthRollups(t *testing.T) {
require.NoError(t, err)
rollups = []orders.BucketBandwidthRollup{
{ProjectID: p1, BucketName: string(b1), Action: pb.PieceAction_PUT, Inline: 1000, Allocated: 1000, Settled: 1000},
{ProjectID: p1, BucketName: string(b2), Action: pb.PieceAction_PUT, Inline: 1000, Allocated: 1000, Settled: 1000},
{ProjectID: p1, BucketName: string(b1), Action: pb.PieceAction_PUT_GRACEFUL_EXIT, Inline: 1000, Allocated: 1000, Settled: 1000},
{ProjectID: p1, BucketName: string(b2), Action: pb.PieceAction_PUT_REPAIR, Inline: 1000, Allocated: 1000, Settled: 1000},
{ProjectID: p1, BucketName: string(b1), Action: pb.PieceAction_GET_AUDIT, Inline: 1000, Allocated: 1000, Settled: 1000},
{ProjectID: p1, BucketName: string(b2), Action: pb.PieceAction_GET_REPAIR, Inline: 1000, Allocated: 1000, Settled: 1000},
{ProjectID: p2, BucketName: string(b1), Action: pb.PieceAction_PUT, Inline: 1000, Allocated: 1000, Settled: 1000},
{ProjectID: p2, BucketName: string(b2), Action: pb.PieceAction_PUT, Inline: 1000, Allocated: 1000, Settled: 1000},
{ProjectID: p2, BucketName: string(b1), Action: pb.PieceAction_PUT_GRACEFUL_EXIT, Inline: 1000, Allocated: 1000, Settled: 1000},
{ProjectID: p2, BucketName: string(b2), Action: pb.PieceAction_PUT_REPAIR, Inline: 1000, Allocated: 1000, Settled: 1000},
{ProjectID: p2, BucketName: string(b1), Action: pb.PieceAction_GET_AUDIT, Inline: 1000, Allocated: 1000, Settled: 1000},
{ProjectID: p2, BucketName: string(b2), Action: pb.PieceAction_GET_REPAIR, Inline: 1000, Allocated: 1000, Settled: 1000},
{ProjectID: p1, BucketName: string(b1), Action: pb.PieceAction_PUT, Inline: 1000, Allocated: 1000, Settled: 1000, IntervalStart: hour},
{ProjectID: p1, BucketName: string(b2), Action: pb.PieceAction_PUT, Inline: 1000, Allocated: 1000, Settled: 1000, IntervalStart: hour},
{ProjectID: p1, BucketName: string(b1), Action: pb.PieceAction_PUT_GRACEFUL_EXIT, Inline: 1000, Allocated: 1000, Settled: 1000, IntervalStart: hour},
{ProjectID: p1, BucketName: string(b2), Action: pb.PieceAction_PUT_REPAIR, Inline: 1000, Allocated: 1000, Settled: 1000, IntervalStart: hour},
{ProjectID: p1, BucketName: string(b1), Action: pb.PieceAction_GET_AUDIT, Inline: 1000, Allocated: 1000, Settled: 1000, IntervalStart: hour},
{ProjectID: p1, BucketName: string(b2), Action: pb.PieceAction_GET_REPAIR, Inline: 1000, Allocated: 1000, Settled: 1000, IntervalStart: hour},
{ProjectID: p2, BucketName: string(b1), Action: pb.PieceAction_PUT, Inline: 1000, Allocated: 1000, Settled: 1000, IntervalStart: hour},
{ProjectID: p2, BucketName: string(b2), Action: pb.PieceAction_PUT, Inline: 1000, Allocated: 1000, Settled: 1000, IntervalStart: hour},
{ProjectID: p2, BucketName: string(b1), Action: pb.PieceAction_PUT_GRACEFUL_EXIT, Inline: 1000, Allocated: 1000, Settled: 1000, IntervalStart: hour},
{ProjectID: p2, BucketName: string(b2), Action: pb.PieceAction_PUT_REPAIR, Inline: 1000, Allocated: 1000, Settled: 1000, IntervalStart: hour},
{ProjectID: p2, BucketName: string(b1), Action: pb.PieceAction_GET_AUDIT, Inline: 1000, Allocated: 1000, Settled: 1000, IntervalStart: hour},
{ProjectID: p2, BucketName: string(b2), Action: pb.PieceAction_GET_REPAIR, Inline: 1000, Allocated: 1000, Settled: 1000, IntervalStart: hour},
}
err = db.Orders().UpdateBucketBandwidthBatch(ctx, hour, rollups)
err = db.Orders().UpdateBandwidthBatch(ctx, rollups)
require.NoError(t, err)
// things that should be partially counted (settled amount lower than allocated amount)
@@ -269,10 +269,10 @@ func TestProjectBandwidthRollups(t *testing.T) {
require.NoError(t, err)
rollups = []orders.BucketBandwidthRollup{
{ProjectID: p1, BucketName: string(b1), Action: pb.PieceAction_GET, Inline: 1000, Allocated: 1000, Settled: 300, Dead: 700},
{ProjectID: p1, BucketName: string(b2), Action: pb.PieceAction_GET, Inline: 1000, Allocated: 1000, Settled: 500, Dead: 500},
{ProjectID: p1, BucketName: string(b1), Action: pb.PieceAction_GET, Inline: 1000, Allocated: 1000, Settled: 300, Dead: 700, IntervalStart: hour},
{ProjectID: p1, BucketName: string(b2), Action: pb.PieceAction_GET, Inline: 1000, Allocated: 1000, Settled: 500, Dead: 500, IntervalStart: hour},
}
err = db.Orders().UpdateBucketBandwidthBatch(ctx, hour, rollups)
err = db.Orders().UpdateBandwidthBatch(ctx, rollups)
require.NoError(t, err)
alloc, err = db.ProjectAccounting().GetProjectBandwidth(ctx, p1, now.Year(), now.Month(), now.Day(), 0)
@@ -399,7 +399,7 @@ func TestProjectUsageCustomLimit(t *testing.T) {
func TestUsageRollups(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 0, UplinkCount: 2,
SatelliteCount: 1, StorageNodeCount: 0, UplinkCount: 3,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
const (
numBuckets = 5
@@ -414,9 +414,11 @@ func TestUsageRollups(t *testing.T) {
project1 := planet.Uplinks[0].Projects[0].ID
project2 := planet.Uplinks[1].Projects[0].ID
project3 := planet.Uplinks[2].Projects[0].ID
p1base := binary.BigEndian.Uint64(project1[:8]) >> 48
p2base := binary.BigEndian.Uint64(project2[:8]) >> 48
p3base := binary.BigEndian.Uint64(project3[:8]) >> 48
getValue := func(i, j int, base uint64) int64 {
a := uint64((i+1)*(j+1)) ^ base
@@ -430,6 +432,8 @@ func TestUsageRollups(t *testing.T) {
pb.PieceAction_GET_REPAIR,
}
var rollups []orders.BucketBandwidthRollup
var buckets []string
for i := 0; i < numBuckets; i++ {
bucketName := fmt.Sprintf("bucket-%d", i)
@@ -468,9 +472,40 @@ func TestUsageRollups(t *testing.T) {
require.NoError(t, err)
}
err = planet.Uplinks[2].CreateBucket(ctx, planet.Satellites[0], bucketName)
require.NoError(t, err)
// project 3
for _, action := range actions {
value := getValue(1, i, p2base)
rollups = append(rollups, orders.BucketBandwidthRollup{
ProjectID: project3,
BucketName: bucketName,
Action: action,
IntervalStart: now.Add(-time.Hour * 2),
Inline: value,
Allocated: value * 6,
Settled: value * 3,
})
rollups = append(rollups, orders.BucketBandwidthRollup{
ProjectID: project3,
BucketName: bucketName,
Action: action,
IntervalStart: now,
Inline: value,
Allocated: value * 6,
Settled: value * 3,
})
}
buckets = append(buckets, bucketName)
}
err := db.Orders().UpdateBandwidthBatch(ctx, rollups)
require.NoError(t, err)
for i := 0; i < tallyIntervals; i++ {
interval := start.Add(tallyInterval * time.Duration(i))
@@ -484,8 +519,13 @@ func TestUsageRollups(t *testing.T) {
ProjectID: project2,
BucketName: bucket,
}
bucketLoc3 := metabase.BucketLocation{
ProjectID: project3,
BucketName: bucket,
}
value1 := getValue(i, j, p1base) * 10
value2 := getValue(i, j, p2base) * 10
value3 := getValue(i, j, p3base) * 10
tally1 := &accounting.BucketTally{
BucketLocation: bucketLoc1,
@@ -503,8 +543,17 @@ func TestUsageRollups(t *testing.T) {
MetadataSize: value2,
}
tally3 := &accounting.BucketTally{
BucketLocation: bucketLoc3,
ObjectCount: value3,
TotalSegments: value3 + value3,
TotalBytes: value3 + value3,
MetadataSize: value3,
}
bucketTallies[bucketLoc1] = tally1
bucketTallies[bucketLoc2] = tally2
bucketTallies[bucketLoc3] = tally3
}
err := db.ProjectAccounting().SaveTallies(ctx, interval, bucketTallies)
@@ -521,6 +570,20 @@ func TestUsageRollups(t *testing.T) {
projTotal2, err := usageRollups.GetProjectTotal(ctx, project2, start, now)
require.NoError(t, err)
require.NotNil(t, projTotal2)
projTotal3, err := usageRollups.GetProjectTotal(ctx, project3, start, now)
require.NoError(t, err)
require.NotNil(t, projTotal3)
projTotal3Prev2Hours, err := usageRollups.GetProjectTotal(ctx, project3, now.Add(-time.Hour*2), now.Add(-time.Hour*1))
require.NoError(t, err)
require.NotNil(t, projTotal3Prev2Hours)
require.NotZero(t, projTotal3Prev2Hours.Egress)
projTotal3Prev3Hours, err := usageRollups.GetProjectTotal(ctx, project3, now.Add(-time.Hour*3), now.Add(-time.Hour*2))
require.NoError(t, err)
require.NotNil(t, projTotal3Prev3Hours)
require.NotZero(t, projTotal3Prev3Hours.Egress)
})
t.Run("test bucket usage rollups", func(t *testing.T) {
@@ -531,6 +594,18 @@ func TestUsageRollups(t *testing.T) {
rollups2, err := usageRollups.GetBucketUsageRollups(ctx, project2, start, now)
require.NoError(t, err)
require.NotNil(t, rollups2)
rollups3, err := usageRollups.GetBucketUsageRollups(ctx, project3, start, now)
require.NoError(t, err)
require.NotNil(t, rollups3)
rollups3Prev2Hours, err := usageRollups.GetBucketUsageRollups(ctx, project3, now.Add(-time.Hour*2), now.Add(-time.Hour*1))
require.NoError(t, err)
require.NotNil(t, rollups3Prev2Hours)
rollups3Prev3Hours, err := usageRollups.GetBucketUsageRollups(ctx, project3, now.Add(-time.Hour*3), now.Add(-time.Hour*2))
require.NoError(t, err)
require.NotNil(t, rollups3Prev3Hours)
})
t.Run("test bucket totals", func(t *testing.T) {
@@ -546,6 +621,18 @@ func TestUsageRollups(t *testing.T) {
totals2, err := usageRollups.GetBucketTotals(ctx, project2, cursor, start, now)
require.NoError(t, err)
require.NotNil(t, totals2)
totals3, err := usageRollups.GetBucketTotals(ctx, project3, cursor, start, now)
require.NoError(t, err)
require.NotNil(t, totals3)
totals3Prev2Hours, err := usageRollups.GetBucketTotals(ctx, project3, cursor, now.Add(-time.Hour*2), now.Add(-time.Hour*1))
require.NoError(t, err)
require.NotNil(t, totals3Prev2Hours)
totals3Prev3Hours, err := usageRollups.GetBucketTotals(ctx, project3, cursor, now.Add(-time.Hour*3), now.Add(-time.Hour*2))
require.NoError(t, err)
require.NotNil(t, totals3Prev3Hours)
})
t.Run("Get paged", func(t *testing.T) {

@@ -36,8 +36,8 @@ type DB interface {
UpdateBucketBandwidthSettle(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, settledAmount, deadAmount int64, intervalStart time.Time) error
// UpdateBucketBandwidthInline updates 'inline' bandwidth for given bucket
UpdateBucketBandwidthInline(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, amount int64, intervalStart time.Time) error
// UpdateBucketBandwidthBatch updates all the bandwidth rollups in the database
UpdateBucketBandwidthBatch(ctx context.Context, intervalStart time.Time, rollups []BucketBandwidthRollup) error
// UpdateBandwidthBatch updates bucket and project bandwidth rollups in the database
UpdateBandwidthBatch(ctx context.Context, rollups []BucketBandwidthRollup) error
// UpdateStoragenodeBandwidthSettle updates 'settled' bandwidth for given storage node
UpdateStoragenodeBandwidthSettle(ctx context.Context, storageNode storj.NodeID, action pb.PieceAction, amount int64, intervalStart time.Time) error
@@ -84,13 +84,14 @@ var (
// BucketBandwidthRollup contains all the info needed for a bucket bandwidth rollup.
type BucketBandwidthRollup struct {
ProjectID uuid.UUID
BucketName string
Action pb.PieceAction
Inline int64
Allocated int64
Settled int64
Dead int64
ProjectID uuid.UUID
BucketName string
Action pb.PieceAction
IntervalStart time.Time
Inline int64
Allocated int64
Settled int64
Dead int64
}
// SortBucketBandwidthRollups sorts the rollups.

@@ -26,9 +26,10 @@ type CacheData struct {
// CacheKey is the key information for the cached map below.
type CacheKey struct {
ProjectID uuid.UUID
BucketName string
Action pb.PieceAction
ProjectID uuid.UUID
BucketName string
Action pb.PieceAction
IntervalStart int64
}
// RollupData contains the pending rollups waiting to be flushed to the db.
@@ -43,7 +44,6 @@ type RollupsWriteCache struct {
mu sync.Mutex
pendingRollups RollupData
latestTime time.Time
stopped bool
flushing bool
@@ -77,16 +77,12 @@ func (cache *RollupsWriteCache) UpdateBucketBandwidthSettle(ctx context.Context,
}
// resetCache should only be called after you have acquired the cache lock. It
// will reset the various cache values and return the pendingRollups,
// latestTime, and currentSize.
func (cache *RollupsWriteCache) resetCache() (RollupData, time.Time) {
// will reset the various cache values and return the pendingRollups.
func (cache *RollupsWriteCache) resetCache() RollupData {
pendingRollups := cache.pendingRollups
cache.pendingRollups = make(RollupData)
latestTime := cache.latestTime
cache.latestTime = time.Time{}
return pendingRollups, latestTime
return pendingRollups
}
// Flush resets the cache, then flushes everything in the rollups write cache to the database.
@@ -110,11 +106,11 @@ func (cache *RollupsWriteCache) Flush(ctx context.Context) {
}
cache.flushing = true
pendingRollups, latestTime := cache.resetCache()
pendingRollups := cache.resetCache()
cache.mu.Unlock()
cache.flush(ctx, pendingRollups, latestTime)
cache.flush(ctx, pendingRollups)
}
// CloseAndFlush flushes anything in the cache and marks the cache as stopped.
@@ -130,24 +126,25 @@ func (cache *RollupsWriteCache) CloseAndFlush(ctx context.Context) error {
}
// flush flushes everything in the rollups write cache to the database.
func (cache *RollupsWriteCache) flush(ctx context.Context, pendingRollups RollupData, latestTime time.Time) {
func (cache *RollupsWriteCache) flush(ctx context.Context, pendingRollups RollupData) {
defer mon.Task()(&ctx)(nil)
if len(pendingRollups) > 0 {
rollups := make([]BucketBandwidthRollup, 0, len(pendingRollups))
for cacheKey, cacheData := range pendingRollups {
rollups = append(rollups, BucketBandwidthRollup{
ProjectID: cacheKey.ProjectID,
BucketName: cacheKey.BucketName,
Action: cacheKey.Action,
Inline: cacheData.Inline,
Allocated: cacheData.Allocated,
Settled: cacheData.Settled,
Dead: cacheData.Dead,
ProjectID: cacheKey.ProjectID,
BucketName: cacheKey.BucketName,
IntervalStart: time.Unix(cacheKey.IntervalStart, 0),
Action: cacheKey.Action,
Inline: cacheData.Inline,
Allocated: cacheData.Allocated,
Settled: cacheData.Settled,
Dead: cacheData.Dead,
})
}
err := cache.DB.UpdateBucketBandwidthBatch(ctx, latestTime, rollups)
err := cache.DB.UpdateBandwidthBatch(ctx, rollups)
if err != nil {
mon.Event("rollups_write_cache_flush_lost")
cache.log.Error("MONEY LOST! Bucket bandwidth rollup batch flush failed.", zap.Error(err))
@@ -173,9 +170,10 @@ func (cache *RollupsWriteCache) updateCacheValue(ctx context.Context, projectID
}
key := CacheKey{
ProjectID: projectID,
BucketName: string(bucketName),
Action: action,
ProjectID: projectID,
BucketName: string(bucketName),
Action: action,
IntervalStart: time.Date(intervalStart.Year(), intervalStart.Month(), intervalStart.Day(), intervalStart.Hour(), 0, 0, 0, intervalStart.Location()).Unix(),
}
// prevent unbounded memory growth if we're not flushing fast enough
@@ -185,9 +183,6 @@ func (cache *RollupsWriteCache) updateCacheValue(ctx context.Context, projectID
mon.Event("rollups_write_cache_update_lost")
cache.log.Error("MONEY LOST! Flushing too slow to keep up with demand.")
} else {
if cache.latestTime.IsZero() || intervalStart.After(cache.latestTime) {
cache.latestTime = intervalStart
}
data.Allocated += allocated
data.Inline += inline
@@ -202,12 +197,12 @@ func (cache *RollupsWriteCache) updateCacheValue(ctx context.Context, projectID
if !cache.flushing {
cache.flushing = true
pendingRollups, latestTime := cache.resetCache()
pendingRollups := cache.resetCache()
cache.wg.Add(1)
go func() {
defer cache.wg.Done()
cache.flush(ctx, pendingRollups, latestTime)
cache.flush(ctx, pendingRollups)
}()
}

@@ -148,9 +148,10 @@ func TestUpdateBucketBandwidth(t *testing.T) {
projectID := testrand.UUID()
bucketName := []byte("testbucketname")
amount := (memory.MB * 500).Int64()
err := ordersDB.UpdateBucketBandwidthAllocation(ctx, projectID, bucketName, pb.PieceAction_GET, amount, time.Now())
intervalStart := time.Now()
err := ordersDB.UpdateBucketBandwidthAllocation(ctx, projectID, bucketName, pb.PieceAction_GET, amount, intervalStart)
require.NoError(t, err)
err = ordersDB.UpdateBucketBandwidthSettle(ctx, projectID, bucketName, pb.PieceAction_PUT, amount, 0, time.Now())
err = ordersDB.UpdateBucketBandwidthSettle(ctx, projectID, bucketName, pb.PieceAction_PUT, amount, 0, intervalStart)
require.NoError(t, err)
// test: confirm there are two items in the cache now
@@ -158,14 +159,16 @@ func TestUpdateBucketBandwidth(t *testing.T) {
require.Equal(t, 2, size)
projectMap := cache.CurrentData()
expectedKeyAllocated := orders.CacheKey{
ProjectID: projectID,
BucketName: string(bucketName),
Action: pb.PieceAction_GET,
ProjectID: projectID,
BucketName: string(bucketName),
Action: pb.PieceAction_GET,
IntervalStart: time.Date(intervalStart.Year(), intervalStart.Month(), intervalStart.Day(), intervalStart.Hour(), 0, 0, 0, intervalStart.Location()).Unix(),
}
expectedKeySettled := orders.CacheKey{
ProjectID: projectID,
BucketName: string(bucketName),
Action: pb.PieceAction_PUT,
ProjectID: projectID,
BucketName: string(bucketName),
Action: pb.PieceAction_PUT,
IntervalStart: time.Date(intervalStart.Year(), intervalStart.Month(), intervalStart.Day(), intervalStart.Hour(), 0, 0, 0, intervalStart.Location()).Unix(),
}
expectedCacheDataAllocated := orders.CacheData{
Inline: 0,
@@ -177,15 +180,15 @@ func TestUpdateBucketBandwidth(t *testing.T) {
Allocated: 0,
Settled: amount,
}
require.Equal(t, projectMap[expectedKeyAllocated], expectedCacheDataAllocated)
require.Equal(t, projectMap[expectedKeySettled], expectedCacheDataSettled)
require.Equal(t, expectedCacheDataAllocated, projectMap[expectedKeyAllocated])
require.Equal(t, expectedCacheDataSettled, projectMap[expectedKeySettled])
// setup: add another item to the cache but with a different projectID
projectID2 := testrand.UUID()
amount2 := (memory.MB * 10).Int64()
err = ordersDB.UpdateBucketBandwidthAllocation(ctx, projectID2, bucketName, pb.PieceAction_GET, amount2, time.Now())
err = ordersDB.UpdateBucketBandwidthAllocation(ctx, projectID2, bucketName, pb.PieceAction_GET, amount2, intervalStart)
require.NoError(t, err)
err = ordersDB.UpdateBucketBandwidthSettle(ctx, projectID2, bucketName, pb.PieceAction_GET, amount2, 0, time.Now())
err = ordersDB.UpdateBucketBandwidthSettle(ctx, projectID2, bucketName, pb.PieceAction_GET, amount2, 0, intervalStart)
require.NoError(t, err)
size = cache.CurrentSize()
require.Equal(t, 3, size)
@@ -193,9 +196,10 @@ func TestUpdateBucketBandwidth(t *testing.T) {
// test: confirm there are 3 items in the cache now with different projectIDs
expectedKey := orders.CacheKey{
ProjectID: projectID2,
BucketName: string(bucketName),
Action: pb.PieceAction_GET,
ProjectID: projectID2,
BucketName: string(bucketName),
Action: pb.PieceAction_GET,
IntervalStart: time.Date(intervalStart.Year(), intervalStart.Month(), intervalStart.Day(), intervalStart.Hour(), 0, 0, 0, intervalStart.Location()).Unix(),
}
expectedData := orders.CacheData{
Inline: 0,

@@ -44,6 +44,19 @@ type ordersDB struct {
db *satelliteDB
}
type bandwidth struct {
Allocated int64
Settled int64
Inline int64
Dead int64
}
type bandwidthRollupKey struct {
BucketName string
ProjectID uuid.UUID
IntervalStart int64
Action pb.PieceAction
}
// UpdateBucketBandwidthAllocation updates 'allocated' bandwidth for given bucket.
func (db *ordersDB) UpdateBucketBandwidthAllocation(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, amount int64, intervalStart time.Time) (err error) {
defer mon.Task()(&ctx)(&err)
@@ -207,7 +220,8 @@ func (db *ordersDB) GetStorageNodeBandwidth(ctx context.Context, nodeID storj.No
return sum1 + sum2, nil
}
func (db *ordersDB) UpdateBucketBandwidthBatch(ctx context.Context, intervalStart time.Time, rollups []orders.BucketBandwidthRollup) (err error) {
// UpdateBandwidthBatch updates bucket and project bandwidth rollups in the database.
func (db *ordersDB) UpdateBandwidthBatch(ctx context.Context, rollups []orders.BucketBandwidthRollup) (err error) {
defer mon.Task()(&ctx)(&err)
return db.db.WithTx(ctx, func(ctx context.Context, tx *dbx.Tx) error {
@@ -219,39 +233,24 @@ func (db *ordersDB) UpdateBucketBandwidthBatch(ctx context.Context, intervalStar
orders.SortBucketBandwidthRollups(rollups)
intervalStart = intervalStart.UTC()
intervalStart = time.Date(intervalStart.Year(), intervalStart.Month(), intervalStart.Day(), intervalStart.Hour(), 0, 0, 0, time.UTC)
bucketRUMap := rollupBandwidth(rollups, toHourlyInterval, getBucketRollupKey)
var bucketNames [][]byte
var projectIDs [][]byte
var actionSlice []int32
var inlineSlice []int64
var allocatedSlice []int64
var settledSlice []int64
inlineSlice := make([]int64, 0, len(bucketRUMap))
allocatedSlice := make([]int64, 0, len(bucketRUMap))
settledSlice := make([]int64, 0, len(bucketRUMap))
bucketNames := make([][]byte, 0, len(bucketRUMap))
projectIDs := make([]uuid.UUID, 0, len(bucketRUMap))
intervalStartSlice := make([]time.Time, 0, len(bucketRUMap))
actionSlice := make([]int32, 0, len(bucketRUMap))
type bandwidth struct {
Allocated int64
Settled int64
Dead int64
}
projectRUMap := make(map[uuid.UUID]bandwidth)
for _, rollup := range rollups {
rollup := rollup
bucketNames = append(bucketNames, []byte(rollup.BucketName))
projectIDs = append(projectIDs, rollup.ProjectID[:])
actionSlice = append(actionSlice, int32(rollup.Action))
inlineSlice = append(inlineSlice, rollup.Inline)
allocatedSlice = append(allocatedSlice, rollup.Allocated)
settledSlice = append(settledSlice, rollup.Settled)
if rollup.Action == pb.PieceAction_GET {
b := projectRUMap[rollup.ProjectID]
b.Allocated += rollup.Allocated
b.Settled += rollup.Settled
b.Dead += rollup.Dead
projectRUMap[rollup.ProjectID] = b
}
for rollupInfo, usage := range bucketRUMap {
inlineSlice = append(inlineSlice, usage.Inline)
allocatedSlice = append(allocatedSlice, usage.Allocated)
settledSlice = append(settledSlice, usage.Settled)
bucketNames = append(bucketNames, []byte(rollupInfo.BucketName))
projectIDs = append(projectIDs, rollupInfo.ProjectID)
intervalStartSlice = append(intervalStartSlice, time.Unix(rollupInfo.IntervalStart, 0))
actionSlice = append(actionSlice, int32(rollupInfo.Action))
}
_, err = tx.Tx.ExecContext(ctx, `
@@ -260,44 +259,50 @@ func (db *ordersDB) UpdateBucketBandwidthBatch(ctx context.Context, intervalStar
interval_start, interval_seconds,
action, inline, allocated, settled)
SELECT
unnest($1::bytea[]), unnest($2::bytea[]),
$3, $4,
unnest($1::bytea[]), unnest($2::bytea[]), unnest($3::timestamptz[]),
$4,
unnest($5::int4[]), unnest($6::bigint[]), unnest($7::bigint[]), unnest($8::bigint[])
ON CONFLICT(bucket_name, project_id, interval_start, action)
DO UPDATE SET
allocated = bucket_bandwidth_rollups.allocated + EXCLUDED.allocated,
inline = bucket_bandwidth_rollups.inline + EXCLUDED.inline,
settled = bucket_bandwidth_rollups.settled + EXCLUDED.settled`,
pgutil.ByteaArray(bucketNames), pgutil.ByteaArray(projectIDs),
intervalStart, defaultIntervalSeconds,
pgutil.ByteaArray(bucketNames), pgutil.UUIDArray(projectIDs), pgutil.TimestampTZArray(intervalStartSlice),
defaultIntervalSeconds,
pgutil.Int4Array(actionSlice), pgutil.Int8Array(inlineSlice), pgutil.Int8Array(allocatedSlice), pgutil.Int8Array(settledSlice))
if err != nil {
db.db.log.Error("Bucket bandwidth rollup batch flush failed.", zap.Error(err))
}
projectRUIDs := make([]uuid.UUID, 0, len(projectRUMap))
var projectRUAllocated []int64
var projectRUSettled []int64
var projectRUDead []int64
dailyInterval := time.Date(intervalStart.Year(), intervalStart.Month(), intervalStart.Day(), 0, 0, 0, 0, time.UTC)
projectRUMap := rollupBandwidth(rollups, toDailyInterval, getProjectRollupKey)
for projectID, v := range projectRUMap {
projectRUIDs = append(projectRUIDs, projectID)
projectRUAllocated = append(projectRUAllocated, v.Allocated)
projectRUSettled = append(projectRUSettled, v.Settled)
projectRUDead = append(projectRUDead, v.Dead)
projectIDs = make([]uuid.UUID, 0, len(projectRUMap))
intervalStartSlice = make([]time.Time, 0, len(projectRUMap))
allocatedSlice = make([]int64, 0, len(projectRUMap))
settledSlice = make([]int64, 0, len(projectRUMap))
deadSlice := make([]int64, 0, len(projectRUMap))
for rollupInfo, usage := range projectRUMap {
if rollupInfo.Action == pb.PieceAction_GET {
allocatedSlice = append(allocatedSlice, usage.Allocated)
settledSlice = append(settledSlice, usage.Settled)
deadSlice = append(deadSlice, usage.Dead)
projectIDs = append(projectIDs, rollupInfo.ProjectID)
intervalStartSlice = append(intervalStartSlice, time.Unix(rollupInfo.IntervalStart, 0))
}
}
if len(projectRUIDs) > 0 {
if len(projectIDs) > 0 {
// TODO: explore updating project_bandwidth_daily_rollups table to use "timestamp with time zone" for interval_day
_, err = tx.Tx.ExecContext(ctx, `
INSERT INTO project_bandwidth_daily_rollups(project_id, interval_day, egress_allocated, egress_settled, egress_dead)
SELECT unnest($1::bytea[]), $2, unnest($3::bigint[]), unnest($4::bigint[]), unnest($5::bigint[])
SELECT unnest($1::bytea[]), unnest($2::date[]), unnest($3::bigint[]), unnest($4::bigint[]), unnest($5::bigint[])
ON CONFLICT(project_id, interval_day)
DO UPDATE SET
egress_allocated = project_bandwidth_daily_rollups.egress_allocated + EXCLUDED.egress_allocated::bigint,
egress_settled = project_bandwidth_daily_rollups.egress_settled + EXCLUDED.egress_settled::bigint,
egress_dead = project_bandwidth_daily_rollups.egress_dead + EXCLUDED.egress_dead::bigint
`, pgutil.UUIDArray(projectRUIDs), dailyInterval, pgutil.Int8Array(projectRUAllocated), pgutil.Int8Array(projectRUSettled), pgutil.Int8Array(projectRUDead))
`, pgutil.UUIDArray(projectIDs), pgutil.DateArray(intervalStartSlice), pgutil.Int8Array(allocatedSlice), pgutil.Int8Array(settledSlice), pgutil.Int8Array(deadSlice))
if err != nil {
db.db.log.Error("Project bandwidth daily rollup batch flush failed.", zap.Error(err))
}
@@ -391,3 +396,60 @@ func SettledAmountsMatch(rows []*dbx.StoragenodeBandwidthRollup, orderActionAmou
return reflect.DeepEqual(rowsSumByAction, orderActionAmounts)
}
// toDailyInterval rounds the time stamp down to the start of the day and converts it to unix time.
func toDailyInterval(timeInterval time.Time) int64 {
return time.Date(timeInterval.Year(), timeInterval.Month(), timeInterval.Day(), 0, 0, 0, 0, timeInterval.Location()).Unix()
}
// toHourlyInterval rounds the time stamp down to the start of the hour and converts it to unix time.
func toHourlyInterval(timeInterval time.Time) int64 {
return time.Date(timeInterval.Year(), timeInterval.Month(), timeInterval.Day(), timeInterval.Hour(), 0, 0, 0, timeInterval.Location()).Unix()
}
// rollupBandwidth rolls up the bandwidth statistics into a map based on the provided key and interval functions.
func rollupBandwidth(rollups []orders.BucketBandwidthRollup,
toInterval func(time.Time) int64,
getKey func(orders.BucketBandwidthRollup, func(time.Time) int64) bandwidthRollupKey) map[bandwidthRollupKey]bandwidth {
projectRUMap := make(map[bandwidthRollupKey]bandwidth)
for _, rollup := range rollups {
rollup := rollup
projectKey := getKey(rollup, toInterval)
if b, ok := projectRUMap[projectKey]; ok {
b.Allocated += rollup.Allocated
b.Settled += rollup.Settled
b.Inline += rollup.Inline
b.Dead += rollup.Dead
projectRUMap[projectKey] = b
} else {
projectRUMap[projectKey] = bandwidth{
Allocated: rollup.Allocated,
Settled: rollup.Settled,
Inline: rollup.Inline,
Dead: rollup.Dead,
}
}
}
return projectRUMap
}
// getBucketRollupKey returns a key for use in bucket bandwidth rollup statistics.
func getBucketRollupKey(rollup orders.BucketBandwidthRollup, toInterval func(time.Time) int64) bandwidthRollupKey {
return bandwidthRollupKey{
BucketName: rollup.BucketName,
ProjectID: rollup.ProjectID,
IntervalStart: toInterval(rollup.IntervalStart),
Action: rollup.Action,
}
}
// getProjectRollupKey returns a key for use in project bandwidth rollup statistics.
func getProjectRollupKey(rollup orders.BucketBandwidthRollup, toInterval func(time.Time) int64) bandwidthRollupKey {
return bandwidthRollupKey{
ProjectID: rollup.ProjectID,
IntervalStart: toInterval(rollup.IntervalStart),
Action: rollup.Action,
}
}
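As an aside (not part of the commit), a small self-contained sketch of how the two interval helpers bucket the same timestamps: orders an hour apart stay distinct in the hourly bucket rollup but merge in the daily project rollup, which per the loop above accumulates only PieceAction_GET usage:

package main

import (
	"fmt"
	"time"
)

// Copies of the two helpers above, for a self-contained example.
func toHourlyInterval(t time.Time) int64 {
	return time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), 0, 0, 0, t.Location()).Unix()
}

func toDailyInterval(t time.Time) int64 {
	return time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, t.Location()).Unix()
}

func main() {
	a := time.Date(2021, 11, 23, 10, 15, 0, 0, time.UTC)
	b := time.Date(2021, 11, 23, 11, 45, 0, 0, time.UTC)

	// Different hourly windows: bucket_bandwidth_rollups keeps them distinct.
	fmt.Println(toHourlyInterval(a) == toHourlyInterval(b)) // false

	// Same daily window: project_bandwidth_daily_rollups combines them,
	// and per the loop above only PieceAction_GET usage is included.
	fmt.Println(toDailyInterval(a) == toDailyInterval(b)) // true
}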

@@ -12,7 +12,7 @@ require (
go.uber.org/zap v1.17.0
storj.io/common v0.0.0-20211217122906-6be0b96ce7e0
storj.io/gateway-mt v1.14.4-0.20211015103214-01eddbc864fb
storj.io/private v0.0.0-20211029202355-a7eae71c382a
storj.io/private v0.0.0-20211209191323-6595d4aa0cfe
storj.io/storj v0.12.1-0.20211102170500-1de8a695e84a
)

@@ -956,7 +956,7 @@ github.com/zeebo/assert v0.0.0-20181109011804-10f827ce2ed6/go.mod h1:yssERNPivll
github.com/zeebo/assert v1.1.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0=
github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ=
github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0=
github.com/zeebo/clingy v0.0.0-20211209163509-9715de867439/go.mod h1:MHEhXvEfewflU7SSVKHI7nkdU+fpyxZ5XPPzj+5gYNw=
github.com/zeebo/clingy v0.0.0-20211216163256-6aa562dbf46b/go.mod h1:MHEhXvEfewflU7SSVKHI7nkdU+fpyxZ5XPPzj+5gYNw=
github.com/zeebo/errs v1.1.1/go.mod h1:Yj8dHrUQwls1bF3dr/vcSIu+qf4mI7idnTcHfoACc6I=
github.com/zeebo/errs v1.2.2 h1:5NFypMTuSdoySVTqlNs1dEoU21QVamMQJxW/Fii5O7g=
github.com/zeebo/errs v1.2.2/go.mod h1:sgbWHsvVuTPHcqJJGQ1WhI5KbWlHYz+2+2C/LSEtCw4=
@@ -1417,8 +1417,8 @@ storj.io/monkit-jaeger v0.0.0-20210225162224-66fb37637bf6/go.mod h1:gj4vuCeyCRjR
storj.io/monkit-jaeger v0.0.0-20210426161729-debb1cbcbbd7 h1:zi0w9zoBfvuqysSAqxJT1Ton2YB5IhyMM3/3CISjlrQ=
storj.io/monkit-jaeger v0.0.0-20210426161729-debb1cbcbbd7/go.mod h1:gj4vuCeyCRjRmH8LIrgoyU9Dc9uR6H+/GcDUXmTbf80=
storj.io/private v0.0.0-20210810102517-434aeab3f17d/go.mod h1:wvudoWSyOyB2daZNaMykjjqsSUad/ttFlUwgelg9+qg=
storj.io/private v0.0.0-20211029202355-a7eae71c382a h1:cWPChMGma5Cw5rdGuKrlc+XFxjisRVAXfa5Ny9/nxzw=
storj.io/private v0.0.0-20211029202355-a7eae71c382a/go.mod h1:BoSaGSvsC8C6Gy0FyjrHfsElJA623hLsNIyexs6vGno=
storj.io/private v0.0.0-20211209191323-6595d4aa0cfe h1:gPf2s3d247JWd/Iqzw7g8mvpdlqdBpVTsBhe6oPMkKU=
storj.io/private v0.0.0-20211209191323-6595d4aa0cfe/go.mod h1:BoSaGSvsC8C6Gy0FyjrHfsElJA623hLsNIyexs6vGno=
storj.io/uplink v1.6.0/go.mod h1:zqj/LFDxa6RMaSRSHOmukg3mMgesOry0iHSjNldDMGo=
storj.io/uplink v1.7.0/go.mod h1:zqj/LFDxa6RMaSRSHOmukg3mMgesOry0iHSjNldDMGo=
storj.io/uplink v1.7.1-0.20211031201307-b30e004c1ccb h1:+WaWPmWvm12StirZ/b2xgbL9lnB7UmRhvC8fckNN1ZI=