satellite/orders: Add write cache for bw rollups

Change-Id: I8ba454cb2ab4742cafd6ed09120e4240874831fc
Isaac Hess 2020-01-10 11:53:42 -07:00 committed by Jess G
parent e1ba3931ec
commit 4950d7106a
12 changed files with 659 additions and 11 deletions
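
At a high level, this change puts an in-memory write cache (RollupsWriteCache) in front of the orders database and adds a chore that flushes it periodically, so bucket bandwidth rollups reach the database in batches instead of one update per request. The sketch below is reconstructed from the hunks that follow, not code taken from this commit, and assumes a satellite.DB, a *zap.Logger, and an orders.Config are already in scope.

package example // illustrative sketch only

import (
	"context"

	"go.uber.org/zap"

	"storj.io/storj/satellite"
	"storj.io/storj/satellite/orders"
)

// setupOrders mirrors the wiring added to the API, Core, and Repairer peers:
// the write cache decorates the orders DB, and the chore flushes it on a timer.
func setupOrders(ctx context.Context, log *zap.Logger, db satellite.DB, cfg orders.Config) (orders.DB, *orders.Chore) {
	cache := orders.NewRollupsWriteCache(log, db.Orders(), cfg.FlushBatchSize)
	chore := orders.NewChore(log.Named("orders chore"), cache, cfg)

	// Callers see a plain orders.DB; bucket bandwidth updates are buffered in
	// memory and flushed when FlushBatchSize distinct keys accumulate, or at
	// every FlushInterval tick, whichever comes first.
	go func() { _ = chore.Run(ctx) }()
	return cache, chore
}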

View File

@@ -97,8 +97,10 @@ type SatelliteSystem struct {
}
Orders struct {
DB orders.DB
Endpoint *orders.Endpoint
Service *orders.Service
Chore *orders.Chore
}
Repair struct {
@@ -320,7 +322,10 @@ func (planet *Planet) newSatellites(count int) ([]*SatelliteSystem, error) {
},
},
Orders: orders.Config{
Expiration: 7 * 24 * time.Hour,
Expiration: 7 * 24 * time.Hour,
SettlementBatchSize: 10,
FlushBatchSize: 10,
FlushInterval: 2 * time.Second,
},
Checker: checker.Config{
Interval: defaultInterval,
@@ -490,8 +495,10 @@ func createNewSystem(log *zap.Logger, peer *satellite.Core, api *satellite.API,
system.Inspector.Endpoint = api.Inspector.Endpoint
system.Orders.DB = api.Orders.DB
system.Orders.Endpoint = api.Orders.Endpoint
system.Orders.Service = peer.Orders.Service
system.Orders.Chore = api.Orders.Chore
system.Repair.Checker = peer.Repair.Checker
system.Repair.Repairer = repairerPeer.Repairer

View File

@@ -78,8 +78,10 @@ type API struct {
}
Orders struct {
DB orders.DB
Endpoint *orders.Endpoint
Service *orders.Service
Chore *orders.Chore
}
Metainfo struct {
@@ -231,18 +233,21 @@ func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB, pointerDB metai
}
{ // setup orders
ordersWriteCache := orders.NewRollupsWriteCache(log, peer.DB.Orders(), config.Orders.FlushBatchSize)
peer.Orders.DB = ordersWriteCache
peer.Orders.Chore = orders.NewChore(log.Named("orders chore"), ordersWriteCache, config.Orders)
satelliteSignee := signing.SigneeFromPeerIdentity(peer.Identity.PeerIdentity())
peer.Orders.Endpoint = orders.NewEndpoint(
peer.Log.Named("orders:endpoint"),
satelliteSignee,
peer.DB.Orders(),
peer.Orders.DB,
config.Orders.SettlementBatchSize,
)
peer.Orders.Service = orders.NewService(
peer.Log.Named("orders:service"),
signing.SignerFromFullIdentity(peer.Identity),
peer.Overlay.Service,
peer.DB.Orders(),
peer.Orders.DB,
config.Orders.Expiration,
&pb.NodeAddress{
Transport: pb.NodeTransport_TCP_TLS_GRPC,
@@ -524,6 +529,9 @@ func (peer *API) Run(ctx context.Context) (err error) {
group.Go(func() error {
return errs2.IgnoreCanceled(peer.Marketing.Endpoint.Run(ctx))
})
group.Go(func() error {
return errs2.IgnoreCanceled(peer.Orders.Chore.Run(ctx))
})
return group.Wait()
}
@@ -561,6 +569,9 @@ func (peer *API) Close() error {
if peer.Overlay.Service != nil {
errlist.Add(peer.Overlay.Service.Close())
}
if peer.Orders.Chore.Loop != nil {
errlist.Add(peer.Orders.Chore.Close())
}
return errlist.Err()
}

View File

@@ -70,7 +70,9 @@ type Core struct {
}
Orders struct {
DB orders.DB
Service *orders.Service
Chore *orders.Chore
}
Repair struct {
@@ -190,11 +192,14 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, pointerDB metainfo
}
{ // setup orders
ordersWriteCache := orders.NewRollupsWriteCache(log, peer.DB.Orders(), config.Orders.FlushBatchSize)
peer.Orders.DB = ordersWriteCache
peer.Orders.Chore = orders.NewChore(log.Named("orders chore"), ordersWriteCache, config.Orders)
peer.Orders.Service = orders.NewService(
peer.Log.Named("orders:service"),
signing.SignerFromFullIdentity(peer.Identity),
peer.Overlay.Service,
peer.DB.Orders(),
peer.Orders.DB,
config.Orders.Expiration,
&pb.NodeAddress{
Transport: pb.NodeTransport_TCP_TLS_GRPC,
@@ -431,6 +436,9 @@ func (peer *Core) Run(ctx context.Context) (err error) {
group.Go(func() error {
return errs2.IgnoreCanceled(peer.DowntimeTracking.EstimationChore.Run(ctx))
})
group.Go(func() error {
return errs2.IgnoreCanceled(peer.Orders.Chore.Run(ctx))
})
return group.Wait()
}
@@ -493,6 +501,9 @@ func (peer *Core) Close() error {
if peer.Metainfo.Loop != nil {
errlist.Add(peer.Metainfo.Loop.Close())
}
if peer.Orders.Chore.Loop != nil {
errlist.Add(peer.Orders.Chore.Close())
}
return errlist.Err()
}

satellite/orders/chore.go (new file, +45 lines)
View File

@@ -0,0 +1,45 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package orders
import (
"context"
"go.uber.org/zap"
"storj.io/common/sync2"
)
// Chore for flushing orders write cache to the database.
//
// architecture: Chore
type Chore struct {
log *zap.Logger
ordersWriteCache *RollupsWriteCache
Loop *sync2.Cycle
}
// NewChore creates a new chore for flushing the orders write cache to the database.
func NewChore(log *zap.Logger, ordersWriteCache *RollupsWriteCache, config Config) *Chore {
return &Chore{
log: log,
ordersWriteCache: ordersWriteCache,
Loop: sync2.NewCycle(config.FlushInterval),
}
}
// Run starts the orders write cache chore.
func (chore *Chore) Run(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
return chore.Loop.Run(ctx, func(ctx context.Context) error {
chore.ordersWriteCache.FlushToDB(ctx)
return nil
})
}
// Close stops the orders write cache chore.
func (chore *Chore) Close() error {
chore.Loop.Close()
return nil
}
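Because Loop is an exported *sync2.Cycle, callers and tests can drive flushing deterministically instead of waiting out the interval. A brief sketch of that pattern, matching the testplanet usage later in this commit (chore is assumed to be a running *orders.Chore):

chore.Loop.Pause()       // stop interval-based flushing
// ... queue some bandwidth updates through the cache ...
chore.Loop.Restart()     // resume the cycle
chore.Loop.TriggerWait() // run one iteration now and wait for the callback to return;
                         // the database write itself finishes asynchronously, so pair
                         // this with the cache's OnNextFlush to observe completion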

View File

@@ -51,6 +51,18 @@ type DB interface {
// ProcessOrders takes a list of order requests and processes them in a batch
ProcessOrders(ctx context.Context, requests []*ProcessOrderRequest) (responses []*ProcessOrderResponse, err error)
// UpdateBucketBandwidthBatch updates all the bandwidth rollups in the database
UpdateBucketBandwidthBatch(ctx context.Context, intervalStart time.Time, rollups []BandwidthRollup) error
}
// BandwidthRollup contains all the info needed for a bucket bandwidth rollup
type BandwidthRollup struct {
ProjectID uuid.UUID
BucketName string
Action pb.PieceAction
Inline int64
Allocated int64
}
var (
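For illustration, a direct call to the new batch method could look like the sketch below; the write cache normally builds and sorts this slice itself, and projectID, hour, and the byte counts here are made-up values:

rollups := []orders.BandwidthRollup{
	{ProjectID: projectID, BucketName: "photos", Action: pb.PieceAction_GET, Allocated: 512},
	{ProjectID: projectID, BucketName: "videos", Action: pb.PieceAction_GET, Inline: 128},
}
// Sorting keeps equal project IDs and bucket names adjacent, which lets the
// SQL builder in UpdateBucketBandwidthBatch reuse their placeholders.
orders.SortRollups(rollups)
if err := ordersDB.UpdateBucketBandwidthBatch(ctx, hour, rollups); err != nil {
	return err
}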

View File

@@ -0,0 +1,196 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package orders
import (
"bytes"
"context"
"sort"
"sync"
"time"
"github.com/skyrings/skyring-common/tools/uuid"
"go.uber.org/zap"
"storj.io/common/pb"
"storj.io/common/sync2"
)
// CacheData stores the amount of inline and allocated data
// for a bucket bandwidth rollup
type CacheData struct {
Inline int64
Allocated int64
}
// CacheKey is the key information for the cached map below
type CacheKey struct {
ProjectID uuid.UUID
BucketName string
Action pb.PieceAction
}
// RollupData contains the pending rollups waiting to be flushed to the db
type RollupData map[CacheKey]CacheData
// RollupsWriteCache stores information needed to update bucket bandwidth rollups
type RollupsWriteCache struct {
DB
batchSize int
currentSize int
latestTime time.Time
log *zap.Logger
mu sync.Mutex
pendingRollups RollupData
nextFlushCompletion *sync2.Fence
}
// NewRollupsWriteCache creates a RollupsWriteCache
func NewRollupsWriteCache(log *zap.Logger, db DB, batchSize int) *RollupsWriteCache {
return &RollupsWriteCache{
DB: db,
batchSize: batchSize,
log: log,
pendingRollups: make(RollupData),
nextFlushCompletion: new(sync2.Fence),
}
}
// UpdateBucketBandwidthAllocation updates the rollups cache adding allocated data for a bucket bandwidth rollup
func (cache *RollupsWriteCache) UpdateBucketBandwidthAllocation(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, amount int64, intervalStart time.Time) error {
cache.updateCacheValue(ctx, projectID, bucketName, action, amount, 0, intervalStart.UTC())
return nil
}
// UpdateBucketBandwidthInline updates the rollups cache adding inline data for a bucket bandwidth rollup
func (cache *RollupsWriteCache) UpdateBucketBandwidthInline(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, amount int64, intervalStart time.Time) error {
cache.updateCacheValue(ctx, projectID, bucketName, action, 0, amount, intervalStart.UTC())
return nil
}
// FlushToDB resets the cache, then flushes everything that was in the rollups write cache to the database
func (cache *RollupsWriteCache) FlushToDB(ctx context.Context) {
cache.mu.Lock()
defer cache.mu.Unlock()
pendingRollups := cache.pendingRollups
cache.pendingRollups = make(RollupData)
oldSize := cache.currentSize
cache.currentSize = 0
latestTime := cache.latestTime
cache.latestTime = time.Time{}
go cache.flushToDB(ctx, pendingRollups, latestTime, oldSize)
}
// flushToDB flushes everything in the rollups write cache to the database
func (cache *RollupsWriteCache) flushToDB(ctx context.Context, pendingRollups RollupData, latestTime time.Time, oldSize int) {
rollups := make([]BandwidthRollup, 0, oldSize)
for cacheKey, cacheData := range pendingRollups {
rollups = append(rollups, BandwidthRollup{
ProjectID: cacheKey.ProjectID,
BucketName: cacheKey.BucketName,
Action: cacheKey.Action,
Inline: cacheData.Inline,
Allocated: cacheData.Allocated,
})
}
SortRollups(rollups)
err := cache.DB.UpdateBucketBandwidthBatch(ctx, latestTime.UTC(), rollups)
if err != nil {
cache.log.Error("MONEY LOST! Bucket bandwidth rollup batch flush failed.", zap.Error(err))
}
var completion *sync2.Fence
cache.mu.Lock()
cache.nextFlushCompletion, completion = new(sync2.Fence), cache.nextFlushCompletion
cache.mu.Unlock()
completion.Release()
}
// SortRollups sorts the rollups by project ID, then bucket name, then action
func SortRollups(rollups []BandwidthRollup) {
sort.SliceStable(rollups, func(i, j int) bool {
uuidCompare := bytes.Compare(rollups[i].ProjectID[:], rollups[j].ProjectID[:])
switch {
case uuidCompare == -1:
return true
case uuidCompare == 1:
return false
case rollups[i].BucketName < rollups[j].BucketName:
return true
case rollups[i].BucketName > rollups[j].BucketName:
return false
case rollups[i].Action < rollups[j].Action:
return true
case rollups[i].Action > rollups[j].Action:
return false
default:
return false
}
})
}
func (cache *RollupsWriteCache) updateCacheValue(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, allocated, inline int64, intervalStart time.Time) {
cache.mu.Lock()
defer cache.mu.Unlock()
if intervalStart.After(cache.latestTime) {
cache.latestTime = intervalStart
}
key := CacheKey{
ProjectID: projectID,
BucketName: string(bucketName),
Action: action,
}
data, ok := cache.pendingRollups[key]
if !ok {
cache.currentSize++
}
data.Allocated += allocated
data.Inline += inline
cache.pendingRollups[key] = data
if cache.currentSize < cache.batchSize {
return
}
pendingRollups := cache.pendingRollups
cache.pendingRollups = make(RollupData)
oldSize := cache.currentSize
cache.currentSize = 0
latestTime := cache.latestTime
cache.latestTime = time.Time{}
go cache.flushToDB(ctx, pendingRollups, latestTime, oldSize)
}
// OnNextFlush returns a channel that is closed once the next flushToDB call
// completes.
func (cache *RollupsWriteCache) OnNextFlush() <-chan struct{} {
cache.mu.Lock()
fence := cache.nextFlushCompletion
cache.mu.Unlock()
return fence.Done()
}
// CurrentSize returns the current size of the cache.
func (cache *RollupsWriteCache) CurrentSize() int {
cache.mu.Lock()
defer cache.mu.Unlock()
return cache.currentSize
}
// CurrentData returns the contents of the cache.
func (cache *RollupsWriteCache) CurrentData() RollupData {
cache.mu.Lock()
defer cache.mu.Unlock()
copyCache := RollupData{}
for k, v := range cache.pendingRollups {
copyCache[k] = v
}
return copyCache
}
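FlushToDB hands the actual database write to a goroutine, so callers that need to observe a completed flush grab the fence with OnNextFlush first, as the tests below do. A condensed sketch (cache is assumed to be a *orders.RollupsWriteCache):

done := cache.OnNextFlush() // take the fence before triggering the flush
cache.FlushToDB(ctx)        // returns immediately; the batch write runs in a goroutine
select {
case <-done:
	// the pending rollups have been written to bucket_bandwidth_rollups
case <-ctx.Done():
	return ctx.Err()
}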

View File

@@ -0,0 +1,277 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package orders_test
import (
"context"
"fmt"
"testing"
"time"
"github.com/skyrings/skyring-common/tools/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
"storj.io/common/memory"
"storj.io/common/pb"
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/storj/private/testplanet"
"storj.io/storj/satellite"
"storj.io/storj/satellite/accounting"
"storj.io/storj/satellite/orders"
"storj.io/storj/satellite/satellitedb/satellitedbtest"
)
// unfortunately, GB is apparently the only unit in which we can get this data, so we need to
// arrange for the test to produce a total value that won't lose too much precision
// in the conversion to GB.
func getTotalBandwidthInGB(ctx context.Context, accountingDB accounting.ProjectAccounting, projectID uuid.UUID, since time.Time) (int64, error) {
total, err := accountingDB.GetAllocatedBandwidthTotal(ctx, projectID, since.Add(-time.Hour))
if err != nil {
return 0, err
}
return total, nil
}
// TestOrdersWriteCacheBatchLimitReached makes sure bandwidth rollup values are not written to the
// db until the batch size is reached.
func TestOrdersWriteCacheBatchLimitReached(t *testing.T) {
satellitedbtest.Run(t, func(t *testing.T, db satellite.DB) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
useBatchSize := 10
amount := (memory.MB * 500).Int64()
projectID := testrand.UUID()
startTime := time.Now().UTC()
owc := orders.NewRollupsWriteCache(zaptest.NewLogger(t), db.Orders(), useBatchSize)
accountingDB := db.ProjectAccounting()
// use different bucketName for each write, so they don't get aggregated yet
for i := 0; i < useBatchSize-1; i++ {
bucketName := fmt.Sprintf("my_files_%d", i)
err := owc.UpdateBucketBandwidthAllocation(ctx, projectID, []byte(bucketName), pb.PieceAction_GET, amount, startTime)
require.NoError(t, err)
// check that nothing was actually written since it should just be stored
total, err := getTotalBandwidthInGB(ctx, accountingDB, projectID, startTime)
require.NoError(t, err)
require.Equal(t, int64(0), total)
}
whenDone := owc.OnNextFlush()
// write one more rollup record to hit the threshold
err := owc.UpdateBucketBandwidthAllocation(ctx, projectID, []byte("my_files_last"), pb.PieceAction_GET, amount, startTime)
require.NoError(t, err)
// make sure flushing is done
select {
case <-whenDone:
break
case <-ctx.Done():
t.Fatal(ctx.Err())
}
total, err := getTotalBandwidthInGB(ctx, accountingDB, projectID, startTime)
require.NoError(t, err)
require.Equal(t, amount*int64(useBatchSize), total)
})
}
// TestOrdersWriteCacheBatchChore makes sure bandwidth rollup values are not written to the
// db until the chore flushes the cache to the db (assuming the batch size is not reached).
func TestOrdersWriteCacheBatchChore(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1,
},
func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
useBatchSize := 10
amount := (memory.MB * 500).Int64()
projectID := testrand.UUID()
startTime := time.Now().UTC()
planet.Satellites[0].Orders.Chore.Loop.Pause()
accountingDB := planet.Satellites[0].DB.ProjectAccounting()
ordersDB := planet.Satellites[0].Orders.DB
// use a different bucketName for each write, so they don't get aggregated yet
for i := 0; i < useBatchSize-1; i++ {
bucketName := fmt.Sprintf("my_files_%d", i)
err := ordersDB.UpdateBucketBandwidthAllocation(ctx, projectID, []byte(bucketName), pb.PieceAction_GET, amount, startTime)
require.NoError(t, err)
// check that nothing was actually written
total, err := getTotalBandwidthInGB(ctx, accountingDB, projectID, startTime)
require.NoError(t, err)
require.Equal(t, int64(0), total)
}
owc := ordersDB.(*orders.RollupsWriteCache)
whenDone := owc.OnNextFlush()
// wait for Loop to complete
planet.Satellites[0].Orders.Chore.Loop.Restart()
planet.Satellites[0].Orders.Chore.Loop.TriggerWait()
// make sure flushing is done
select {
case <-whenDone:
break
case <-ctx.Done():
t.Fatal(ctx.Err())
}
total, err := getTotalBandwidthInGB(ctx, accountingDB, projectID, startTime)
require.NoError(t, err)
require.Equal(t, amount*int64(useBatchSize-1), total)
},
)
}
func TestUpdateBucketBandwidthAllocation(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1,
},
func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
ordersDB := planet.Satellites[0].Orders.DB
// setup: check there is nothing in the cache to start
cache, ok := ordersDB.(*orders.RollupsWriteCache)
require.True(t, ok)
size := cache.CurrentSize()
require.Equal(t, size, 0)
// setup: add one item to the cache
projectID := testrand.UUID()
bucketName := []byte("testbucketname")
amount := (memory.MB * 500).Int64()
err := ordersDB.UpdateBucketBandwidthAllocation(ctx, projectID, bucketName, pb.PieceAction_GET, amount, time.Now().UTC())
require.NoError(t, err)
// test: confirm there is one item in the cache now
size = cache.CurrentSize()
require.Equal(t, size, 1)
projectMap := cache.CurrentData()
expectedKey := orders.CacheKey{
ProjectID: projectID,
BucketName: string(bucketName),
Action: pb.PieceAction_GET,
}
expected := orders.RollupData{
expectedKey: {
Inline: 0,
Allocated: amount,
},
}
require.Equal(t, projectMap, expected)
// setup: add another item to the cache but with a different projectID
projectID2 := testrand.UUID()
amount2 := (memory.MB * 10).Int64()
err = ordersDB.UpdateBucketBandwidthAllocation(ctx, projectID2, bucketName, pb.PieceAction_GET, amount2, time.Now().UTC())
require.NoError(t, err)
size = cache.CurrentSize()
require.Equal(t, size, 2)
projectMap2 := cache.CurrentData()
// test: confirm there are two items in the cache now for different projectIDs
expectedKey = orders.CacheKey{
ProjectID: projectID2,
BucketName: string(bucketName),
Action: pb.PieceAction_GET,
}
expected[expectedKey] = orders.CacheData{
Inline: 0,
Allocated: amount2,
}
require.Equal(t, projectMap2, expected)
},
)
}
func TestSortRollups(t *testing.T) {
rollups := []orders.BandwidthRollup{
{
ProjectID: uuid.UUID{1},
BucketName: "a",
Action: pb.PieceAction_GET, // GET is 2
Inline: 1,
Allocated: 2,
},
{
ProjectID: uuid.UUID{2},
BucketName: "a",
Action: pb.PieceAction_GET,
Inline: 1,
Allocated: 2,
},
{
ProjectID: uuid.UUID{1},
BucketName: "b",
Action: pb.PieceAction_GET,
Inline: 1,
Allocated: 2,
},
{
ProjectID: uuid.UUID{1},
BucketName: "a",
Action: pb.PieceAction_GET_AUDIT,
Inline: 1,
Allocated: 2,
},
{
ProjectID: uuid.UUID{1},
BucketName: "a",
Action: pb.PieceAction_GET,
Inline: 1,
Allocated: 2,
},
}
expRollups := []orders.BandwidthRollup{
{
ProjectID: uuid.UUID{1},
BucketName: "a",
Action: pb.PieceAction_GET, // GET is 2
Inline: 1,
Allocated: 2,
},
{
ProjectID: uuid.UUID{1},
BucketName: "a",
Action: pb.PieceAction_GET,
Inline: 1,
Allocated: 2,
},
{
ProjectID: uuid.UUID{1},
BucketName: "a",
Action: pb.PieceAction_GET_AUDIT,
Inline: 1,
Allocated: 2,
},
{
ProjectID: uuid.UUID{1},
BucketName: "b",
Action: pb.PieceAction_GET,
Inline: 1,
Allocated: 2,
},
{
ProjectID: uuid.UUID{2},
BucketName: "a",
Action: pb.PieceAction_GET,
Inline: 1,
Allocated: 2,
},
}
assert.NotEqual(t, expRollups, rollups)
orders.SortRollups(rollups)
assert.Equal(t, expRollups, rollups)
}

View File

@@ -27,6 +27,8 @@ var ErrDownloadFailedNotEnoughPieces = errs.Class("not enough pieces for downloa
type Config struct {
Expiration time.Duration `help:"how long until an order expires" default:"168h"` // 7 days
SettlementBatchSize int `help:"how many orders to batch per transaction" default:"250"`
FlushBatchSize int `help:"how many items in the rollups write cache before they are flushed to the database" devDefault:"20" releaseDefault:"10000"`
FlushInterval time.Duration `help:"how often to flush the rollups write cache to the database" devDefault:"30s" releaseDefault:"1m"`
}
// Service for creating order limits.

View File

@@ -37,9 +37,13 @@ type Repairer struct {
Dialer rpc.Dialer
Version *version_checker.Service
Metainfo *metainfo.Service
Overlay *overlay.Service
Orders *orders.Service
Metainfo *metainfo.Service
Overlay *overlay.Service
Orders struct {
DB orders.DB
Service *orders.Service
Chore *orders.Chore
}
SegmentRepairer *repairer.SegmentRepairer
Repairer *repairer.Service
}
@@ -80,11 +84,14 @@ func NewRepairer(log *zap.Logger, full *identity.FullIdentity, pointerDB metainf
}
{ // setup orders
peer.Orders = orders.NewService(
ordersWriteCache := orders.NewRollupsWriteCache(log, ordersDB, config.Orders.FlushBatchSize)
peer.Orders.DB = ordersWriteCache
peer.Orders.Chore = orders.NewChore(log.Named("orders chore"), ordersWriteCache, config.Orders)
peer.Orders.Service = orders.NewService(
log.Named("orders"),
signing.SignerFromFullIdentity(peer.Identity),
peer.Overlay,
ordersDB,
peer.Orders.DB,
config.Orders.Expiration,
&pb.NodeAddress{
Transport: pb.NodeTransport_TCP_TLS_GRPC,
@@ -98,7 +105,7 @@ func NewRepairer(log *zap.Logger, full *identity.FullIdentity, pointerDB metainf
peer.SegmentRepairer = repairer.NewSegmentRepairer(
log.Named("segment-repair"),
peer.Metainfo,
peer.Orders,
peer.Orders.Service,
peer.Overlay,
peer.Dialer,
config.Repairer.Timeout,
@@ -122,6 +129,9 @@ func (peer *Repairer) Run(ctx context.Context) (err error) {
group.Go(func() error {
return errs2.IgnoreCanceled(peer.Version.Run(ctx))
})
group.Go(func() error {
return errs2.IgnoreCanceled(peer.Orders.Chore.Run(ctx))
})
group.Go(func() error {
peer.Log.Info("Repairer started")
return errs2.IgnoreCanceled(peer.Repairer.Run(ctx))
@@ -142,6 +152,9 @@ func (peer *Repairer) Close() error {
if peer.Repairer != nil {
errlist.Add(peer.Repairer.Close())
}
if peer.Orders.Chore != nil {
errlist.Add(peer.Orders.Chore.Close())
}
return errlist.Err()
}

View File

@@ -7,11 +7,14 @@ import (
"bytes"
"context"
"database/sql"
"fmt"
"sort"
"strings"
"time"
"github.com/skyrings/skyring-common/tools/uuid"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/common/pb"
"storj.io/common/storj"
@@ -361,3 +364,67 @@ func (db *ordersDB) processOrdersInTx(ctx context.Context, requests []*orders.Pr
}
return responses, nil
}
func (db *ordersDB) UpdateBucketBandwidthBatch(ctx context.Context, intervalStart time.Time, rollups []orders.BandwidthRollup) error {
if len(rollups) == 0 {
return nil
}
const stmtBegin = `
INSERT INTO bucket_bandwidth_rollups (bucket_name, project_id, interval_start, interval_seconds, action, inline, allocated, settled)
VALUES
`
const stmtEnd = `
ON CONFLICT(bucket_name, project_id, interval_start, action)
DO UPDATE SET allocated = bucket_bandwidth_rollups.allocated + EXCLUDED.allocated, inline = bucket_bandwidth_rollups.inline + EXCLUDED.inline
`
intervalStart = intervalStart.UTC()
intervalStart = time.Date(intervalStart.Year(), intervalStart.Month(), intervalStart.Day(), intervalStart.Hour(), 0, 0, 0, time.UTC)
var lastProjectID uuid.UUID
var lastBucketName string
var projectIDArgNum int
var bucketNameArgNum int
var args []interface{}
var stmt strings.Builder
stmt.WriteString(stmtBegin)
args = append(args, intervalStart)
for i, rollup := range rollups {
if i > 0 {
stmt.WriteString(",")
}
if lastProjectID != rollup.ProjectID {
lastProjectID = rollup.ProjectID
// take the slice over rollup.ProjectID, because it is going to stay
// the same up to the ExecContext call, whereas lastProjectID is likely
// to be overwritten
args = append(args, rollup.ProjectID[:])
projectIDArgNum = len(args)
}
if lastBucketName != rollup.BucketName {
lastBucketName = rollup.BucketName
args = append(args, lastBucketName)
bucketNameArgNum = len(args)
}
args = append(args, rollup.Action, rollup.Inline, rollup.Allocated)
stmt.WriteString(fmt.Sprintf(
"($%d,$%d,$1,%d,$%d,$%d,$%d,0)",
bucketNameArgNum,
projectIDArgNum,
defaultIntervalSeconds,
len(args)-2,
len(args)-1,
len(args),
))
}
stmt.WriteString(stmtEnd)
_, err := db.db.ExecContext(ctx, stmt.String(), args...)
if err != nil {
db.db.log.Error("Bucket bandwidth rollup batch flush failed.", zap.Error(err))
}
return err
}
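To make the placeholder bookkeeping above concrete: for two rollups that share a project ID but use buckets "a" and "b", the loop builds a statement shaped like the sketch below, with args = [intervalStart, projectID, "a", action1, inline1, allocated1, "b", action2, inline2, allocated2]. $1 is always interval_start, and the interval_seconds column receives the defaultIntervalSeconds constant, shown symbolically here because its value is not part of this diff. Reusing $2 for the shared project ID (and one placeholder per distinct bucket) is why flushToDB sorts the rollups before calling this method.

INSERT INTO bucket_bandwidth_rollups (bucket_name, project_id, interval_start, interval_seconds, action, inline, allocated, settled)
VALUES
($3,$2,$1,<defaultIntervalSeconds>,$4,$5,$6,0),
($7,$2,$1,<defaultIntervalSeconds>,$8,$9,$10,0)
ON CONFLICT(bucket_name, project_id, interval_start, action)
DO UPDATE SET allocated = bucket_bandwidth_rollups.allocated + EXCLUDED.allocated, inline = bucket_bandwidth_rollups.inline + EXCLUDED.inline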

View File

@@ -287,7 +287,8 @@ func (db *ProjectAccounting) getTotalEgress(ctx context.Context, projectID uuid.
// GetBucketUsageRollups retrieves summed usage rollups for every bucket of particular project for a given period
func (db *ProjectAccounting) GetBucketUsageRollups(ctx context.Context, projectID uuid.UUID, since, before time.Time) (_ []accounting.BucketUsageRollup, err error) {
defer mon.Task()(&ctx)(&err)
since = timeTruncateDown(since)
since = timeTruncateDown(since.UTC())
before = before.UTC()
buckets, err := db.getBuckets(ctx, projectID, since, before)
if err != nil {

View File

@@ -298,6 +298,12 @@ identity.key-path: /root/.local/share/storj/identity/satellite/identity.key
# how long until an order expires
# orders.expiration: 168h0m0s
# how many items in the rollups write cache before they are flushed to the database
# orders.flush-batch-size: 10000
# how often to flush the rollups write cache to the database
# orders.flush-interval: 1m0s
# how many orders to batch per transaction
# orders.settlement-batch-size: 250