satellite/orders: use project daily bandwidth rollups

Replace GetProjectAllocatedBandwidth by GetProjectBandwidth which calculates
used bandwidth from allocated and settled bandwidth recorded in the
project_bandwidth_daily_rollups table.
For each day in the month, if the allocated bandwidth is expired, it uses the
settled bandwidth for computing used bandwidth.

Change-Id: Ife723c6d5275338f470619631acb25930d39ac3c
This commit is contained in:
Fadila Khadar 2021-05-25 15:12:01 +02:00 committed by Michal Niewrzal
parent 59eabcca24
commit 63cfc8fbe0
6 changed files with 62 additions and 25 deletions

View File

@ -183,12 +183,12 @@ type ProjectAccounting interface {
CreateStorageTally(ctx context.Context, tally BucketStorageTally) error CreateStorageTally(ctx context.Context, tally BucketStorageTally) error
// GetAllocatedBandwidthTotal returns the sum of GET bandwidth usage allocated for a projectID in the past time frame // GetAllocatedBandwidthTotal returns the sum of GET bandwidth usage allocated for a projectID in the past time frame
GetAllocatedBandwidthTotal(ctx context.Context, projectID uuid.UUID, from time.Time) (int64, error) GetAllocatedBandwidthTotal(ctx context.Context, projectID uuid.UUID, from time.Time) (int64, error)
// GetProjectAllocatedBandwidth returns project allocated bandwidth for the specified year and month. // GetProjectBandwidth returns project allocated bandwidth for the specified year, month and day.
GetProjectAllocatedBandwidth(ctx context.Context, projectID uuid.UUID, year int, month time.Month) (int64, error) GetProjectBandwidth(ctx context.Context, projectID uuid.UUID, year int, month time.Month, day int) (int64, error)
// GetProjectDailyBandwidth returns bandwidth (allocated and settled) for the specified day. // GetProjectDailyBandwidth returns bandwidth (allocated and settled) for the specified day.
GetProjectDailyBandwidth(ctx context.Context, projectID uuid.UUID, year int, month time.Month, day int) (int64, int64, error) GetProjectDailyBandwidth(ctx context.Context, projectID uuid.UUID, year int, month time.Month, day int) (int64, int64, error)
// DeleteProjectAllocatedBandwidthBefore deletes project bandwidth rollups before the given time // DeleteProjectBandwidthBefore deletes project bandwidth rollups before the given time
DeleteProjectAllocatedBandwidthBefore(ctx context.Context, before time.Time) error DeleteProjectBandwidthBefore(ctx context.Context, before time.Time) error
// GetStorageTotals returns the current inline and remote storage usage for a projectID // GetStorageTotals returns the current inline and remote storage usage for a projectID
GetStorageTotals(ctx context.Context, projectID uuid.UUID) (int64, int64, error) GetStorageTotals(ctx context.Context, projectID uuid.UUID) (int64, int64, error)

View File

@ -69,7 +69,7 @@ func (chore *Chore) RunOnce(ctx context.Context) (err error) {
now := time.Now().UTC() now := time.Now().UTC()
beforeMonth := time.Date(now.Year(), now.Month()-time.Month(chore.config.RetainMonths), 1, 0, 0, 0, 0, time.UTC) beforeMonth := time.Date(now.Year(), now.Month()-time.Month(chore.config.RetainMonths), 1, 0, 0, 0, 0, time.UTC)
return chore.db.DeleteProjectAllocatedBandwidthBefore(ctx, beforeMonth) return chore.db.DeleteProjectBandwidthBefore(ctx, beforeMonth)
} }
// Close stops the chore. // Close stops the chore.

View File

@ -63,7 +63,7 @@ func testProjectAllocatedBandwidthRetain(t *testing.T, retain int) {
} }
for i := 0; i <= months; i++ { for i := 0; i <= months; i++ {
newDate := time.Date(now.Year(), now.Month()-time.Month(i), 15, 12, 0, 0, 0, time.UTC) newDate := time.Date(now.Year(), now.Month()-time.Month(i), 15, 12, 0, 0, 0, time.UTC)
bytes, err := satellite.Accounting.ProjectUsage.GetProjectAllocatedBandwidth(ctx, projectID, newDate.Year(), newDate.Month()) bytes, err := satellite.Accounting.ProjectUsage.GetProjectBandwidth(ctx, projectID, newDate.Year(), newDate.Month(), newDate.Day())
require.NoError(t, err) require.NoError(t, err)
require.EqualValues(t, testBytes, bytes) require.EqualValues(t, testBytes, bytes)
} }
@ -72,7 +72,7 @@ func testProjectAllocatedBandwidthRetain(t *testing.T, retain int) {
for i := 0; i <= months; i++ { for i := 0; i <= months; i++ {
newDate := time.Date(now.Year(), now.Month()-time.Month(i), 15, 12, 0, 0, 0, time.UTC) newDate := time.Date(now.Year(), now.Month()-time.Month(i), 15, 12, 0, 0, 0, time.UTC)
bytes, err := satellite.Accounting.ProjectUsage.GetProjectAllocatedBandwidth(ctx, projectID, newDate.Year(), newDate.Month()) bytes, err := satellite.Accounting.ProjectUsage.GetProjectBandwidth(ctx, projectID, newDate.Year(), newDate.Month(), newDate.Day())
if i < months || retain < 0 { // there should always be the current month if i < months || retain < 0 { // there should always be the current month
require.NoError(t, err) require.NoError(t, err)

View File

@ -74,7 +74,7 @@ func (usage *Service) ExceedsBandwidthUsage(ctx context.Context, projectID uuid.
// Get current bandwidth value from database. // Get current bandwidth value from database.
now := usage.nowFn() now := usage.nowFn()
bandwidthGetTotal, err = usage.GetProjectAllocatedBandwidth(ctx, projectID, now.Year(), now.Month()) bandwidthGetTotal, err = usage.GetProjectBandwidth(ctx, projectID, now.Year(), now.Month(), now.Day())
if err != nil { if err != nil {
return err return err
} }
@ -162,11 +162,11 @@ func (usage *Service) GetProjectBandwidthTotals(ctx context.Context, projectID u
return total, ErrProjectUsage.Wrap(err) return total, ErrProjectUsage.Wrap(err)
} }
// GetProjectAllocatedBandwidth returns project allocated bandwidth for the specified year and month. // GetProjectBandwidth returns project allocated bandwidth for the specified year, month and day.
func (usage *Service) GetProjectAllocatedBandwidth(ctx context.Context, projectID uuid.UUID, year int, month time.Month) (_ int64, err error) { func (usage *Service) GetProjectBandwidth(ctx context.Context, projectID uuid.UUID, year int, month time.Month, day int) (_ int64, err error) {
defer mon.Task()(&ctx, projectID)(&err) defer mon.Task()(&ctx, projectID)(&err)
total, err := usage.projectAccountingDB.GetProjectAllocatedBandwidth(ctx, projectID, year, month) total, err := usage.projectAccountingDB.GetProjectBandwidth(ctx, projectID, year, month, day)
return total, ErrProjectUsage.Wrap(err) return total, ErrProjectUsage.Wrap(err)
} }

View File

@ -119,21 +119,31 @@ func TestProjectUsageBandwidth(t *testing.T) {
saDB := planet.Satellites[0].DB saDB := planet.Satellites[0].DB
orderDB := saDB.Orders() orderDB := saDB.Orders()
now := time.Now()
// make sure we don't end up with a flaky test if we are in the beginning of the month as we have to add expired bandwidth allocations
if now.Day() < 5 {
now = time.Date(now.Year(), now.Month(), 5, now.Hour(), now.Minute(), now.Second(), now.Nanosecond(), now.Location())
}
bucket := metabase.BucketLocation{ProjectID: planet.Uplinks[0].Projects[0].ID, BucketName: "testbucket"} bucket := metabase.BucketLocation{ProjectID: planet.Uplinks[0].Projects[0].ID, BucketName: "testbucket"}
projectUsage := planet.Satellites[0].Accounting.ProjectUsage projectUsage := planet.Satellites[0].Accounting.ProjectUsage
// Setup: create a BucketBandwidthRollup record to test exceeding bandwidth project limit // Setup: create a BucketBandwidthRollup record to test exceeding bandwidth project limit
if testCase.expectedResource == "bandwidth" { if testCase.expectedResource == "bandwidth" {
now := time.Now()
err := setUpBucketBandwidthAllocations(ctx, bucket.ProjectID, orderDB, now) err := setUpBucketBandwidthAllocations(ctx, bucket.ProjectID, orderDB, now)
require.NoError(t, err) require.NoError(t, err)
} }
// Setup: create a BucketBandwidthRollup record that should not be taken into account as
// it is expired.
err := setUpBucketBandwidthAllocations(ctx, bucket.ProjectID, orderDB, now.Add(-72*time.Hour))
require.NoError(t, err)
// Setup: create some bytes for the uplink to upload to test the download later // Setup: create some bytes for the uplink to upload to test the download later
expectedData := testrand.Bytes(50 * memory.KiB) expectedData := testrand.Bytes(50 * memory.KiB)
filePath := "test/path" filePath := "test/path"
err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], bucket.BucketName, filePath, expectedData) err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], bucket.BucketName, filePath, expectedData)
require.NoError(t, err) require.NoError(t, err)
actualExceeded, _, err := projectUsage.ExceedsBandwidthUsage(ctx, bucket.ProjectID) actualExceeded, _, err := projectUsage.ExceedsBandwidthUsage(ctx, bucket.ProjectID)
@ -167,8 +177,13 @@ func TestProjectBandwidthRollups(t *testing.T) {
time.Sleep(timeBuf) time.Sleep(timeBuf)
now = time.Now().UTC() now = time.Now().UTC()
} }
// make sure we don't end up with a flaky test if we are in the beginning of the month as we have to add expired bandwidth allocations
if now.Day() < 5 {
now = time.Date(now.Year(), now.Month(), 5, now.Hour(), now.Minute(), now.Second(), now.Nanosecond(), now.Location())
}
hour := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, now.Location()) hour := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, now.Location())
expired := time.Date(now.Year(), now.Month(), now.Day()-3, now.Hour(), 0, 0, 0, now.Location())
// things that should be counted // things that should be counted
err := db.Orders().UpdateBucketBandwidthAllocation(ctx, p1, b1, pb.PieceAction_GET, 1000, hour) err := db.Orders().UpdateBucketBandwidthAllocation(ctx, p1, b1, pb.PieceAction_GET, 1000, hour)
require.NoError(t, err) require.NoError(t, err)
@ -207,6 +222,11 @@ func TestProjectBandwidthRollups(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
err = db.Orders().UpdateBucketBandwidthAllocation(ctx, p2, b2, pb.PieceAction_GET_REPAIR, 1000, hour) err = db.Orders().UpdateBucketBandwidthAllocation(ctx, p2, b2, pb.PieceAction_GET_REPAIR, 1000, hour)
require.NoError(t, err) require.NoError(t, err)
// these two should not be counted. They are expired and have no corresponding rollup
err = db.Orders().UpdateBucketBandwidthAllocation(ctx, p1, b1, pb.PieceAction_GET, 1000, expired)
require.NoError(t, err)
err = db.Orders().UpdateBucketBandwidthAllocation(ctx, p1, b2, pb.PieceAction_GET, 1000, expired)
require.NoError(t, err)
rollups = []orders.BucketBandwidthRollup{ rollups = []orders.BucketBandwidthRollup{
{ProjectID: p1, BucketName: string(b1), Action: pb.PieceAction_PUT, Inline: 1000, Allocated: 1000, Settled: 1000}, {ProjectID: p1, BucketName: string(b1), Action: pb.PieceAction_PUT, Inline: 1000, Allocated: 1000, Settled: 1000},
@ -225,7 +245,7 @@ func TestProjectBandwidthRollups(t *testing.T) {
err = db.Orders().UpdateBucketBandwidthBatch(ctx, hour, rollups) err = db.Orders().UpdateBucketBandwidthBatch(ctx, hour, rollups)
require.NoError(t, err) require.NoError(t, err)
alloc, err := db.ProjectAccounting().GetProjectAllocatedBandwidth(ctx, p1, now.Year(), now.Month()) alloc, err := db.ProjectAccounting().GetProjectBandwidth(ctx, p1, now.Year(), now.Month(), now.Day())
require.NoError(t, err) require.NoError(t, err)
require.EqualValues(t, 4000, alloc) require.EqualValues(t, 4000, alloc)
}) })
@ -301,8 +321,7 @@ func setUpBucketBandwidthAllocations(ctx *testcontext.Context, projectID uuid.UU
// that sum greater than the defaultMaxUsage // that sum greater than the defaultMaxUsage
amount := 15 * memory.GB.Int64() amount := 15 * memory.GB.Int64()
action := pb.PieceAction_GET action := pb.PieceAction_GET
intervalStart := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, now.Location()) err := orderDB.UpdateBucketBandwidthAllocation(ctx, projectID, []byte(bucketName), action, amount, now)
err := orderDB.UpdateBucketBandwidthAllocation(ctx, projectID, []byte(bucketName), action, amount, intervalStart)
if err != nil { if err != nil {
return err return err
} }

View File

@ -26,6 +26,8 @@ import (
// ensure that ProjectAccounting implements accounting.ProjectAccounting. // ensure that ProjectAccounting implements accounting.ProjectAccounting.
var _ accounting.ProjectAccounting = (*ProjectAccounting)(nil) var _ accounting.ProjectAccounting = (*ProjectAccounting)(nil)
var allocatedExpirationInDays = 2
// ProjectAccounting implements the accounting/db ProjectAccounting interface. // ProjectAccounting implements the accounting/db ProjectAccounting interface.
type ProjectAccounting struct { type ProjectAccounting struct {
db *satelliteDB db *satelliteDB
@ -135,15 +137,31 @@ func (db *ProjectAccounting) GetAllocatedBandwidthTotal(ctx context.Context, pro
return *sum, err return *sum, err
} }
// GetProjectAllocatedBandwidth returns allocated bandwidth for the specified year and month. // GetProjectBandwidth returns the used bandwidth (settled or allocated) for the specified year, month and day.
func (db *ProjectAccounting) GetProjectAllocatedBandwidth(ctx context.Context, projectID uuid.UUID, year int, month time.Month) (_ int64, err error) { func (db *ProjectAccounting) GetProjectBandwidth(ctx context.Context, projectID uuid.UUID, year int, month time.Month, day int) (_ int64, err error) {
defer mon.Task()(&ctx)(&err) defer mon.Task()(&ctx)(&err)
var egress *int64 var egress *int64
interval := time.Date(year, month, 1, 0, 0, 0, 0, time.UTC) startOfMonth := time.Date(year, month, 1, 0, 0, 0, 0, time.UTC)
query := `SELECT egress_allocated FROM project_bandwidth_rollups WHERE project_id = ? AND interval_month = ?;` var expiredSince time.Time
err = db.db.QueryRow(ctx, db.db.Rebind(query), projectID[:], interval).Scan(&egress) if day < allocatedExpirationInDays {
expiredSince = startOfMonth
} else {
expiredSince = time.Date(year, month, day-allocatedExpirationInDays+1, 0, 0, 0, 0, time.UTC)
}
periodEnd := time.Date(year, month, day, 0, 0, 0, 0, time.UTC)
query := ` WITH egress AS (
SELECT
CASE WHEN interval_day < ?
THEN egress_settled
ELSE egress_allocated
END AS amount
FROM project_bandwidth_daily_rollups
WHERE project_id = ? AND interval_day >= ? AND interval_day <= ?
) SELECT sum(amount) from egress;`
err = db.db.QueryRow(ctx, db.db.Rebind(query), expiredSince, projectID[:], startOfMonth, periodEnd).Scan(&egress)
if errors.Is(err, sql.ErrNoRows) || egress == nil { if errors.Is(err, sql.ErrNoRows) || egress == nil {
return 0, nil return 0, nil
} }
@ -166,11 +184,11 @@ func (db *ProjectAccounting) GetProjectDailyBandwidth(ctx context.Context, proje
return allocated, settled, err return allocated, settled, err
} }
// DeleteProjectAllocatedBandwidthBefore deletes project bandwidth rollups before the given time. // DeleteProjectBandwidthBefore deletes project bandwidth rollups before the given time.
func (db *ProjectAccounting) DeleteProjectAllocatedBandwidthBefore(ctx context.Context, before time.Time) (err error) { func (db *ProjectAccounting) DeleteProjectBandwidthBefore(ctx context.Context, before time.Time) (err error) {
defer mon.Task()(&ctx)(&err) defer mon.Task()(&ctx)(&err)
_, err = db.db.DB.ExecContext(ctx, db.db.Rebind("DELETE FROM project_bandwidth_rollups WHERE interval_month < ?"), before) _, err = db.db.DB.ExecContext(ctx, db.db.Rebind("DELETE FROM project_bandwidth_daily_rollups WHERE interval_day < ?"), before)
return err return err
} }