satellites/orders: populate egress_dead in project_bandwidth_daily_rollups

Populate the egress_dead column for taking into account allocated bandwidth that can be removed because orders have been sent by the storage nodes. The bandwidth not used in these orders can be allocated again.

Change-Id: I78c333a03945cd7330aec052edd3562ec671118e
This commit is contained in:
Fadila Khadar 2021-05-30 00:16:12 +02:00 committed by Fadila
parent 6ab2647adb
commit fb0d055a41
11 changed files with 74 additions and 45 deletions

View File

@ -184,7 +184,7 @@ type ProjectAccounting interface {
// GetProjectBandwidth returns project allocated bandwidth for the specified year, month and day.
GetProjectBandwidth(ctx context.Context, projectID uuid.UUID, year int, month time.Month, day int, asOfSystemInterval time.Duration) (int64, error)
// GetProjectDailyBandwidth returns bandwidth (allocated and settled) for the specified day.
GetProjectDailyBandwidth(ctx context.Context, projectID uuid.UUID, year int, month time.Month, day int) (int64, int64, error)
GetProjectDailyBandwidth(ctx context.Context, projectID uuid.UUID, year int, month time.Month, day int) (int64, int64, int64, error)
// DeleteProjectBandwidthBefore deletes project bandwidth rollups before the given time
DeleteProjectBandwidthBefore(ctx context.Context, before time.Time) error

View File

@ -286,7 +286,7 @@ func createBucketBandwidthRollupsForPast4Days(ctx *testcontext.Context, satellit
return expectedSum, err
}
err = ordersDB.UpdateBucketBandwidthSettle(ctx,
projectID, []byte(bucketName), pb.PieceAction_GET, amount, intervalStart,
projectID, []byte(bucketName), pb.PieceAction_GET, amount, 0, intervalStart,
)
if err != nil {
return expectedSum, err
@ -421,7 +421,7 @@ func TestUsageRollups(t *testing.T) {
err := db.Orders().UpdateBucketBandwidthAllocation(ctx, project1, []byte(bucketName), action, value*6, now)
require.NoError(t, err)
err = db.Orders().UpdateBucketBandwidthSettle(ctx, project1, []byte(bucketName), action, value*3, now)
err = db.Orders().UpdateBucketBandwidthSettle(ctx, project1, []byte(bucketName), action, value*3, 0, now)
require.NoError(t, err)
err = db.Orders().UpdateBucketBandwidthInline(ctx, project1, []byte(bucketName), action, value, now)
@ -438,7 +438,7 @@ func TestUsageRollups(t *testing.T) {
err := db.Orders().UpdateBucketBandwidthAllocation(ctx, project2, []byte(bucketName), action, value*6, now)
require.NoError(t, err)
err = db.Orders().UpdateBucketBandwidthSettle(ctx, project2, []byte(bucketName), action, value*3, now)
err = db.Orders().UpdateBucketBandwidthSettle(ctx, project2, []byte(bucketName), action, value*3, 0, now)
require.NoError(t, err)
err = db.Orders().UpdateBucketBandwidthInline(ctx, project2, []byte(bucketName), action, value, now)

View File

@ -53,7 +53,7 @@ func TestRollupArchiveChore(t *testing.T) {
bucketName = fmt.Sprintf("%s%d", "testbucket", i)
err := satellite.DB.Orders().UpdateBucketBandwidthSettle(ctx,
projectID, []byte(bucketName), pb.PieceAction_GET, bwAmount, timestamp,
projectID, []byte(bucketName), pb.PieceAction_GET, bwAmount, 0, timestamp,
)
require.NoError(t, err)

View File

@ -196,11 +196,11 @@ func createData(ctx *testcontext.Context, t *testing.T, db satellite.DB, testDat
projectAccoutingDB := db.ProjectAccounting()
orderDB := db.Orders()
err := orderDB.UpdateBucketBandwidthSettle(ctx, testData.projectID, testData.bucketName, pb.PieceAction_GET, testData.egressSize, testData.bwStart)
err := orderDB.UpdateBucketBandwidthSettle(ctx, testData.projectID, testData.bucketName, pb.PieceAction_GET, testData.egressSize, 0, testData.bwStart)
require.NoError(t, err)
// Only GET should be counted. So this should not effect results
err = orderDB.UpdateBucketBandwidthSettle(ctx, testData.projectID, testData.bucketName, pb.PieceAction_GET_AUDIT, testData.egressSize, testData.bwStart)
err = orderDB.UpdateBucketBandwidthSettle(ctx, testData.projectID, testData.bucketName, pb.PieceAction_GET_AUDIT, testData.egressSize, 0, testData.bwStart)
require.NoError(t, err)
testData.bwStart = testData.bwStart.Add(time.Hour)

View File

@ -33,7 +33,7 @@ type DB interface {
// UpdateBucketBandwidthAllocation updates 'allocated' bandwidth for given bucket
UpdateBucketBandwidthAllocation(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, amount int64, intervalStart time.Time) error
// UpdateBucketBandwidthSettle updates 'settled' bandwidth for given bucket
UpdateBucketBandwidthSettle(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, amount int64, intervalStart time.Time) error
UpdateBucketBandwidthSettle(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, settledAmount, deadAmount int64, intervalStart time.Time) error
// UpdateBucketBandwidthInline updates 'inline' bandwidth for given bucket
UpdateBucketBandwidthInline(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, amount int64, intervalStart time.Time) error
// UpdateBucketBandwidthBatch updates all the bandwidth rollups in the database
@ -90,6 +90,7 @@ type BucketBandwidthRollup struct {
Inline int64
Allocated int64
Settled int64
Dead int64
}
// SortBucketBandwidthRollups sorts the rollups.
@ -232,8 +233,14 @@ func (endpoint *Endpoint) SettlementWithWindowFinal(stream pb.DRPCOrders_Settlem
log := endpoint.log.Named(peer.ID.String())
log.Debug("SettlementWithWindow")
type bandwidthAmount struct {
Settled int64
Allocated int64
Dead int64
}
storagenodeSettled := map[int32]int64{}
bucketSettled := map[bucketIDAction]int64{}
bucketSettled := map[bucketIDAction]bandwidthAmount{}
seenSerials := map[storj.SerialNumber]struct{}{}
var window int64
@ -317,11 +324,16 @@ func (endpoint *Endpoint) SettlementWithWindowFinal(stream pb.DRPCOrders_Settlem
continue
}
bucketSettled[bucketIDAction{
currentBucketIDAction := bucketIDAction{
bucketname: bucketInfo.BucketName,
projectID: bucketInfo.ProjectID,
action: orderLimit.Action,
}] += order.Amount
}
bucketSettled[currentBucketIDAction] = bandwidthAmount{
Settled: bucketSettled[currentBucketIDAction].Settled + order.Amount,
Allocated: bucketSettled[currentBucketIDAction].Allocated + orderLimit.Limit,
Dead: bucketSettled[currentBucketIDAction].Dead + orderLimit.Limit - order.Amount,
}
}
if len(storagenodeSettled) == 0 {
@ -346,9 +358,9 @@ func (endpoint *Endpoint) SettlementWithWindowFinal(stream pb.DRPCOrders_Settlem
)
if status == pb.SettlementWithWindowResponse_ACCEPTED && !alreadyProcessed {
for bucketIDAction, amount := range bucketSettled {
for bucketIDAction, bwAmount := range bucketSettled {
err = endpoint.DB.UpdateBucketBandwidthSettle(ctx,
bucketIDAction.projectID, []byte(bucketIDAction.bucketname), bucketIDAction.action, amount, time.Unix(0, window),
bucketIDAction.projectID, []byte(bucketIDAction.bucketname), bucketIDAction.action, bwAmount.Settled, bwAmount.Dead, time.Unix(0, window),
)
if err != nil {
log.Info("err updating bucket bandwidth settle", zap.Error(err))
@ -404,5 +416,10 @@ func (endpoint *Endpoint) isValid(ctx context.Context, log *zap.Logger, order *p
mon.Event("order_not_valid_window_mismatch")
return false
}
if orderLimit.Limit < order.Amount {
log.Debug("invalid settlement: amounts mismatch")
mon.Event("order_not_valid_amounts_mismatch")
return false
}
return true
}

View File

@ -393,9 +393,10 @@ func TestProjectBandwidthDailyRollups(t *testing.T) {
projectAccountingDB := planet.Satellites[0].DB.ProjectAccounting()
year, month, day := now.Year(), now.Month(), now.Day()
allocated, settled, err := projectAccountingDB.GetProjectDailyBandwidth(ctx, planet.Uplinks[0].Projects[0].ID, year, month, day)
allocated, settled, dead, err := projectAccountingDB.GetProjectDailyBandwidth(ctx, planet.Uplinks[0].Projects[0].ID, year, month, day)
require.NoError(t, err)
require.NotZero(t, allocated)
require.Equal(t, allocated, settled)
require.Zero(t, dead)
})
}

View File

@ -21,6 +21,7 @@ type CacheData struct {
Inline int64
Allocated int64
Settled int64
Dead int64
}
// CacheKey is the key information for the cached map below.
@ -62,17 +63,17 @@ func NewRollupsWriteCache(log *zap.Logger, db DB, batchSize int) *RollupsWriteCa
// UpdateBucketBandwidthAllocation updates the rollups cache adding allocated data for a bucket bandwidth rollup.
func (cache *RollupsWriteCache) UpdateBucketBandwidthAllocation(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, amount int64, intervalStart time.Time) error {
return cache.updateCacheValue(ctx, projectID, bucketName, action, amount, 0, 0, intervalStart.UTC())
return cache.updateCacheValue(ctx, projectID, bucketName, action, amount, 0, 0, 0, intervalStart.UTC())
}
// UpdateBucketBandwidthInline updates the rollups cache adding inline data for a bucket bandwidth rollup.
func (cache *RollupsWriteCache) UpdateBucketBandwidthInline(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, amount int64, intervalStart time.Time) error {
return cache.updateCacheValue(ctx, projectID, bucketName, action, 0, amount, 0, intervalStart.UTC())
return cache.updateCacheValue(ctx, projectID, bucketName, action, 0, amount, 0, 0, intervalStart.UTC())
}
// UpdateBucketBandwidthSettle updates the rollups cache adding settled data for a bucket bandwidth rollup.
func (cache *RollupsWriteCache) UpdateBucketBandwidthSettle(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, amount int64, intervalStart time.Time) error {
return cache.updateCacheValue(ctx, projectID, bucketName, action, 0, 0, amount, intervalStart.UTC())
// UpdateBucketBandwidthSettle updates the rollups cache adding settled data for a bucket bandwidth rollup - deadAmount is not used.
func (cache *RollupsWriteCache) UpdateBucketBandwidthSettle(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, settledAmount, deadAmount int64, intervalStart time.Time) error {
return cache.updateCacheValue(ctx, projectID, bucketName, action, 0, 0, settledAmount, deadAmount, intervalStart.UTC())
}
// resetCache should only be called after you have acquired the cache lock. It
@ -142,6 +143,7 @@ func (cache *RollupsWriteCache) flush(ctx context.Context, pendingRollups Rollup
Inline: cacheData.Inline,
Allocated: cacheData.Allocated,
Settled: cacheData.Settled,
Dead: cacheData.Dead,
})
}
@ -160,7 +162,7 @@ func (cache *RollupsWriteCache) flush(ctx context.Context, pendingRollups Rollup
cache.flushing = false
}
func (cache *RollupsWriteCache) updateCacheValue(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, allocated, inline, settled int64, intervalStart time.Time) error {
func (cache *RollupsWriteCache) updateCacheValue(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, allocated, inline, settled, dead int64, intervalStart time.Time) error {
defer mon.Task()(&ctx)(nil)
cache.mu.Lock()
@ -190,6 +192,7 @@ func (cache *RollupsWriteCache) updateCacheValue(ctx context.Context, projectID
data.Allocated += allocated
data.Inline += inline
data.Settled += settled
data.Dead += dead
cache.pendingRollups[key] = data
}

View File

@ -150,7 +150,7 @@ func TestUpdateBucketBandwidth(t *testing.T) {
amount := (memory.MB * 500).Int64()
err := ordersDB.UpdateBucketBandwidthAllocation(ctx, projectID, bucketName, pb.PieceAction_GET, amount, time.Now())
require.NoError(t, err)
err = ordersDB.UpdateBucketBandwidthSettle(ctx, projectID, bucketName, pb.PieceAction_PUT, amount, time.Now())
err = ordersDB.UpdateBucketBandwidthSettle(ctx, projectID, bucketName, pb.PieceAction_PUT, amount, 0, time.Now())
require.NoError(t, err)
// test: confirm there is one item in the cache now
@ -185,7 +185,7 @@ func TestUpdateBucketBandwidth(t *testing.T) {
amount2 := (memory.MB * 10).Int64()
err = ordersDB.UpdateBucketBandwidthAllocation(ctx, projectID2, bucketName, pb.PieceAction_GET, amount2, time.Now())
require.NoError(t, err)
err = ordersDB.UpdateBucketBandwidthSettle(ctx, projectID2, bucketName, pb.PieceAction_GET, amount2, time.Now())
err = ordersDB.UpdateBucketBandwidthSettle(ctx, projectID2, bucketName, pb.PieceAction_GET, amount2, 0, time.Now())
require.NoError(t, err)
size = cache.CurrentSize()
require.Equal(t, 3, size)

View File

@ -55,7 +55,7 @@ func TestService_InvoiceElementsProcessing(t *testing.T) {
require.NoError(t, err)
err = satellite.DB.Orders().UpdateBucketBandwidthSettle(ctx, project.ID, []byte("testbucket"),
pb.PieceAction_GET, int64(i+10)*memory.GiB.Int64(), period)
pb.PieceAction_GET, int64(i+10)*memory.GiB.Int64(), 0, period)
require.NoError(t, err)
}
@ -137,7 +137,7 @@ func TestService_InvoiceUserWithManyProjects(t *testing.T) {
// generate egress
projectsEgress[i] = int64(i+10) * memory.GiB.Int64()
err = satellite.DB.Orders().UpdateBucketBandwidthSettle(ctx, projects[i].ID, []byte("testbucket"),
pb.PieceAction_GET, projectsEgress[i], period)
pb.PieceAction_GET, projectsEgress[i], 0, period)
require.NoError(t, err)
// generate storage
@ -249,7 +249,7 @@ func TestService_InvoiceUserWithManyCoupons(t *testing.T) {
{
// generate egress
err = satellite.DB.Orders().UpdateBucketBandwidthSettle(ctx, project.ID, []byte("testbucket"),
pb.PieceAction_GET, 10*memory.GiB.Int64(), period)
pb.PieceAction_GET, 10*memory.GiB.Int64(), 0, period)
require.NoError(t, err)
// generate storage
@ -362,7 +362,7 @@ func TestService_ApplyCouponsInTheOrder(t *testing.T) {
{
// generate egress - 48 cents
err = satellite.DB.Orders().UpdateBucketBandwidthSettle(ctx, project.ID, []byte("testbucket"),
pb.PieceAction_GET, 10*memory.GiB.Int64(), period)
pb.PieceAction_GET, 10*memory.GiB.Int64(), 0, period)
require.NoError(t, err)
}
@ -499,7 +499,7 @@ func TestService_CouponStatus(t *testing.T) {
// generate egress
err = satellite.DB.Orders().UpdateBucketBandwidthSettle(ctx, project.ID, []byte("testbucket"),
pb.PieceAction_GET, tt.egress.Int64(), period)
pb.PieceAction_GET, tt.egress.Int64(), 0, period)
require.NoError(t, err, errTag)
satellite.API.Payments.Service.SetNow(func() time.Time {

View File

@ -65,12 +65,12 @@ func (db *ordersDB) UpdateBucketBandwidthAllocation(ctx context.Context, project
if action == pb.PieceAction_GET {
dailyInterval := time.Date(intervalStart.Year(), intervalStart.Month(), intervalStart.Day(), 0, 0, 0, 0, time.UTC)
statement = db.db.Rebind(
`INSERT INTO project_bandwidth_daily_rollups (project_id, interval_day, egress_allocated, egress_settled)
VALUES (?, ?, ?, ?)
`INSERT INTO project_bandwidth_daily_rollups (project_id, interval_day, egress_allocated, egress_settled, egress_dead)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT(project_id, interval_day)
DO UPDATE SET egress_allocated = project_bandwidth_daily_rollups.egress_allocated + EXCLUDED.egress_allocated::BIGINT`,
)
batch.Queue(statement, projectID[:], dailyInterval, uint64(amount), 0)
batch.Queue(statement, projectID[:], dailyInterval, uint64(amount), 0, 0)
}
batch.Queue(`COMMIT TRANSACTION`)
@ -89,7 +89,7 @@ func (db *ordersDB) UpdateBucketBandwidthAllocation(ctx context.Context, project
}
// UpdateBucketBandwidthSettle updates 'settled' bandwidth for given bucket.
func (db *ordersDB) UpdateBucketBandwidthSettle(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, amount int64, intervalStart time.Time) (err error) {
func (db *ordersDB) UpdateBucketBandwidthSettle(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, settledAmount, deadAmount int64, intervalStart time.Time) (err error) {
defer mon.Task()(&ctx)(&err)
return db.db.WithTx(ctx, func(ctx context.Context, tx *dbx.Tx) error {
@ -100,7 +100,7 @@ func (db *ordersDB) UpdateBucketBandwidthSettle(ctx context.Context, projectID u
DO UPDATE SET settled = bucket_bandwidth_rollups.settled + ?`,
)
_, err = db.db.ExecContext(ctx, statement,
bucketName, projectID[:], intervalStart.UTC(), defaultIntervalSeconds, action, 0, 0, uint64(amount), uint64(amount),
bucketName, projectID[:], intervalStart.UTC(), defaultIntervalSeconds, action, 0, 0, uint64(settledAmount), uint64(settledAmount),
)
if err != nil {
return ErrUpdateBucketBandwidthSettle.Wrap(err)
@ -109,12 +109,14 @@ func (db *ordersDB) UpdateBucketBandwidthSettle(ctx context.Context, projectID u
if action == pb.PieceAction_GET {
dailyInterval := time.Date(intervalStart.Year(), intervalStart.Month(), intervalStart.Day(), 0, 0, 0, 0, time.UTC)
statement = tx.Rebind(
`INSERT INTO project_bandwidth_daily_rollups (project_id, interval_day, egress_allocated, egress_settled)
VALUES (?, ?, ?, ?)
`INSERT INTO project_bandwidth_daily_rollups (project_id, interval_day, egress_allocated, egress_settled, egress_dead)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT(project_id, interval_day)
DO UPDATE SET egress_settled = project_bandwidth_daily_rollups.egress_settled + EXCLUDED.egress_settled::BIGINT`,
DO UPDATE SET
egress_settled = project_bandwidth_daily_rollups.egress_settled + EXCLUDED.egress_settled::BIGINT,
egress_dead = project_bandwidth_daily_rollups.egress_dead + EXCLUDED.egress_dead::BIGINT`,
)
_, err = tx.Tx.ExecContext(ctx, statement, projectID[:], dailyInterval, 0, uint64(amount))
_, err = tx.Tx.ExecContext(ctx, statement, projectID[:], dailyInterval, 0, uint64(settledAmount), uint64(deadAmount))
if err != nil {
return err
}
@ -230,6 +232,7 @@ func (db *ordersDB) UpdateBucketBandwidthBatch(ctx context.Context, intervalStar
type bandwidth struct {
Allocated int64
Settled int64
Dead int64
}
projectRUMap := make(map[uuid.UUID]bandwidth)
@ -246,6 +249,7 @@ func (db *ordersDB) UpdateBucketBandwidthBatch(ctx context.Context, intervalStar
b := projectRUMap[rollup.ProjectID]
b.Allocated += rollup.Allocated
b.Settled += rollup.Settled
b.Dead += rollup.Dead
projectRUMap[rollup.ProjectID] = b
}
}
@ -274,23 +278,27 @@ func (db *ordersDB) UpdateBucketBandwidthBatch(ctx context.Context, intervalStar
projectRUIDs := make([]uuid.UUID, 0, len(projectRUMap))
var projectRUAllocated []int64
var projectRUSettled []int64
var projectRUDead []int64
dailyInterval := time.Date(intervalStart.Year(), intervalStart.Month(), intervalStart.Day(), 0, 0, 0, 0, time.UTC)
for projectID, v := range projectRUMap {
projectRUIDs = append(projectRUIDs, projectID)
projectRUAllocated = append(projectRUAllocated, v.Allocated)
projectRUSettled = append(projectRUSettled, v.Settled)
projectRUDead = append(projectRUDead, v.Dead)
}
if len(projectRUIDs) > 0 {
_, err = tx.Tx.ExecContext(ctx, `
INSERT INTO project_bandwidth_daily_rollups(project_id, interval_day, egress_allocated, egress_settled)
SELECT unnest($1::bytea[]), $2, unnest($3::bigint[]), unnest($4::bigint[])
INSERT INTO project_bandwidth_daily_rollups(project_id, interval_day, egress_allocated, egress_settled, egress_dead)
SELECT unnest($1::bytea[]), $2, unnest($3::bigint[]), unnest($4::bigint[]), unnest($5::bigint[])
ON CONFLICT(project_id, interval_day)
DO UPDATE SET
egress_allocated = project_bandwidth_daily_rollups.egress_allocated + EXCLUDED.egress_allocated::bigint,
egress_settled = project_bandwidth_daily_rollups.egress_settled + EXCLUDED.egress_settled::bigint
`, pgutil.UUIDArray(projectRUIDs), dailyInterval, pgutil.Int8Array(projectRUAllocated), pgutil.Int8Array(projectRUSettled))
egress_settled = project_bandwidth_daily_rollups.egress_settled + EXCLUDED.egress_settled::bigint,
egress_dead = project_bandwidth_daily_rollups.egress_dead + EXCLUDED.egress_dead::bigint
`, pgutil.UUIDArray(projectRUIDs), dailyInterval, pgutil.Int8Array(projectRUAllocated), pgutil.Int8Array(projectRUSettled), pgutil.Int8Array(projectRUDead))
if err != nil {
db.db.log.Error("Project bandwidth daily rollup batch flush failed.", zap.Error(err))
}

View File

@ -185,18 +185,18 @@ func (db *ProjectAccounting) GetProjectBandwidth(ctx context.Context, projectID
}
// GetProjectDailyBandwidth returns project bandwidth (allocated and settled) for the specified day.
func (db *ProjectAccounting) GetProjectDailyBandwidth(ctx context.Context, projectID uuid.UUID, year int, month time.Month, day int) (allocated int64, settled int64, err error) {
func (db *ProjectAccounting) GetProjectDailyBandwidth(ctx context.Context, projectID uuid.UUID, year int, month time.Month, day int) (allocated int64, settled, dead int64, err error) {
defer mon.Task()(&ctx)(&err)
interval := time.Date(year, month, day, 0, 0, 0, 0, time.UTC)
query := `SELECT egress_allocated, egress_settled FROM project_bandwidth_daily_rollups WHERE project_id = ? AND interval_day = ?;`
err = db.db.QueryRow(ctx, db.db.Rebind(query), projectID[:], interval).Scan(&allocated, &settled)
query := `SELECT egress_allocated, egress_settled, egress_dead FROM project_bandwidth_daily_rollups WHERE project_id = ? AND interval_day = ?;`
err = db.db.QueryRow(ctx, db.db.Rebind(query), projectID[:], interval).Scan(&allocated, &settled, &dead)
if errors.Is(err, sql.ErrNoRows) {
return 0, 0, nil
return 0, 0, 0, nil
}
return allocated, settled, err
return allocated, settled, dead, err
}
// DeleteProjectBandwidthBefore deletes project bandwidth rollups before the given time.