satellite/orders: populate

project_bandwidth_daily_rollups table

We want to calculate used bandwidth more accurately, so we need to derive it
from both allocated and settled bandwidth. To do this, we first need to
populate this new table.

https://storjlabs.atlassian.net/browse/PG-56

Change-Id: I308b737bf08ee48ce4e46a3605697ab2095f7257
This commit is contained in:
Michał Niewrzał 2021-05-17 16:07:59 +02:00 committed by Michal Niewrzal
parent 02fc87e98b
commit 59eabcca24
4 changed files with 168 additions and 50 deletions

View File

@ -185,6 +185,8 @@ type ProjectAccounting interface {
GetAllocatedBandwidthTotal(ctx context.Context, projectID uuid.UUID, from time.Time) (int64, error)
// GetProjectAllocatedBandwidth returns project allocated bandwidth for the specified year and month.
GetProjectAllocatedBandwidth(ctx context.Context, projectID uuid.UUID, year int, month time.Month) (int64, error)
// GetProjectDailyBandwidth returns bandwidth (allocated and settled) for the specified day.
GetProjectDailyBandwidth(ctx context.Context, projectID uuid.UUID, year int, month time.Month, day int) (int64, int64, error)
// DeleteProjectAllocatedBandwidthBefore deletes project bandwidth rollups before the given time
DeleteProjectAllocatedBandwidthBefore(ctx context.Context, before time.Time) error

View File

@ -352,3 +352,50 @@ func TestSettledAmountsMatch(t *testing.T) {
})
}
}
// TestProjectBandwidthDailyRollups uploads and downloads a single object, lets
// the storage nodes send their orders, flushes the rollups write cache, and
// verifies that the daily project rollup for the current day has a non-zero
// allocated value that equals the settled value.
func TestProjectBandwidthDailyRollups(t *testing.T) {
	config := testplanet.Config{
		SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
		Reconfigure: testplanet.Reconfigure{
			Satellite: testplanet.ReconfigureRS(2, 3, 4, 4),
		},
	}

	testplanet.Run(t, config, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
		sat := planet.Satellites[0]
		up := planet.Uplinks[0]

		// Pause asynchronous flushes so the test controls exactly when values
		// get written; otherwise the assertions below would race with them.
		sat.Orders.Chore.Loop.Pause()

		startedAt := time.Now()
		settleTime := startedAt.Add(24 * time.Hour)

		sat.Audit.Worker.Loop.Pause()
		for _, node := range planet.StorageNodes {
			node.Storage2.Orders.Sender.Pause()
		}

		// Round-trip one object so download orders exist.
		payload := testrand.Bytes(50 * memory.KiB)
		uploadErr := up.Upload(ctx, sat, "testbucket0", "test/path", payload)
		require.NoError(t, uploadErr)

		downloaded, downloadErr := up.Download(ctx, sat, "testbucket0", "test/path")
		require.NoError(t, downloadErr)
		require.Equal(t, payload, downloaded)

		// Wait for storage nodes to propagate all information.
		require.NoError(t, planet.WaitForStorageNodeEndpoints(ctx))

		// Have the nodes send up the orders.
		for _, node := range planet.StorageNodes {
			node.Storage2.Orders.SendOrders(ctx, settleTime)
		}

		// Flush the rollups write cache.
		sat.Orders.Chore.Loop.TriggerWait()

		accounting := sat.DB.ProjectAccounting()
		allocated, settled, err := accounting.GetProjectDailyBandwidth(
			ctx, up.Projects[0].ID,
			startedAt.Year(), startedAt.Month(), startedAt.Day(),
		)
		require.NoError(t, err)
		require.NotZero(t, allocated)
		require.Equal(t, allocated, settled)
	})
}

View File

@ -10,6 +10,7 @@ import (
"reflect"
"time"
"github.com/jackc/pgx/v4"
"github.com/zeebo/errs"
"go.uber.org/zap"
@ -17,6 +18,7 @@ import (
"storj.io/common/storj"
"storj.io/common/uuid"
"storj.io/private/dbutil/pgutil"
"storj.io/private/dbutil/pgxutil"
"storj.io/storj/satellite/orders"
"storj.io/storj/satellite/satellitedb/dbx"
)
@ -46,56 +48,89 @@ type ordersDB struct {
func (db *ordersDB) UpdateBucketBandwidthAllocation(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, amount int64, intervalStart time.Time) (err error) {
defer mon.Task()(&ctx)(&err)
err = db.db.WithTx(ctx, func(ctx context.Context, tx *dbx.Tx) error {
statement := tx.Rebind(
return pgxutil.Conn(ctx, db.db, func(conn *pgx.Conn) error {
var batch pgx.Batch
// TODO decide if we need to have transaction here
batch.Queue(`START TRANSACTION`)
statement := db.db.Rebind(
`INSERT INTO bucket_bandwidth_rollups (bucket_name, project_id, interval_start, interval_seconds, action, inline, allocated, settled)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(bucket_name, project_id, interval_start, action)
DO UPDATE SET allocated = bucket_bandwidth_rollups.allocated + ?`,
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(bucket_name, project_id, interval_start, action)
DO UPDATE SET allocated = bucket_bandwidth_rollups.allocated + ?`,
)
_, err = tx.Tx.ExecContext(ctx, statement,
bucketName, projectID[:], intervalStart.UTC(), defaultIntervalSeconds, action, 0, uint64(amount), 0, uint64(amount),
)
if err != nil {
return err
}
batch.Queue(statement, bucketName, projectID[:], intervalStart.UTC(), defaultIntervalSeconds, action, 0, uint64(amount), 0, uint64(amount))
if action == pb.PieceAction_GET {
// TODO remove when project_bandwidth_daily_rollups will be used
projectInterval := time.Date(intervalStart.Year(), intervalStart.Month(), 1, 0, 0, 0, 0, time.UTC)
statement = tx.Rebind(
statement = db.db.Rebind(
`INSERT INTO project_bandwidth_rollups (project_id, interval_month, egress_allocated)
VALUES (?, ?, ?)
ON CONFLICT(project_id, interval_month)
DO UPDATE SET egress_allocated = project_bandwidth_rollups.egress_allocated + EXCLUDED.egress_allocated::bigint`,
)
_, err = tx.Tx.ExecContext(ctx, statement, projectID[:], projectInterval, uint64(amount))
if err != nil {
return err
}
}
return nil
})
batch.Queue(statement, projectID[:], projectInterval, uint64(amount))
return nil
dailyInterval := time.Date(intervalStart.Year(), intervalStart.Month(), intervalStart.Day(), 0, 0, 0, 0, time.UTC)
statement = db.db.Rebind(
`INSERT INTO project_bandwidth_daily_rollups (project_id, interval_day, egress_allocated, egress_settled)
VALUES (?, ?, ?, ?)
ON CONFLICT(project_id, interval_day)
DO UPDATE SET egress_allocated = project_bandwidth_daily_rollups.egress_allocated + EXCLUDED.egress_allocated::BIGINT`,
)
batch.Queue(statement, projectID[:], dailyInterval, uint64(amount), 0)
}
batch.Queue(`COMMIT TRANSACTION`)
results := conn.SendBatch(ctx, &batch)
defer func() { err = errs.Combine(err, results.Close()) }()
var errlist errs.Group
for i := 0; i < batch.Len(); i++ {
_, err := results.Exec()
errlist.Add(err)
}
return errlist.Err()
})
}
// UpdateBucketBandwidthSettle updates 'settled' bandwidth for given bucket.
//
// Inside a single transaction it adds amount to
// bucket_bandwidth_rollups.settled for the bucket, project, interval and action
// (inserting the row when it does not exist yet) and, for GET actions only,
// adds amount to project_bandwidth_daily_rollups.egress_settled for the
// project and day. Statement failures are returned wrapped in
// ErrUpdateBucketBandwidthSettle.
func (db *ordersDB) UpdateBucketBandwidthSettle(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, amount int64, intervalStart time.Time) (err error) {
	defer mon.Task()(&ctx)(&err)

	return db.db.WithTx(ctx, func(ctx context.Context, tx *dbx.Tx) error {
		// Every statement must be executed on tx. Running it on db.db would
		// write outside the transaction, so the two rollups would no longer be
		// committed or rolled back together.
		statement := tx.Rebind(
			`INSERT INTO bucket_bandwidth_rollups (bucket_name, project_id, interval_start, interval_seconds, action, inline, allocated, settled)
			VALUES (?, ?, ?, ?, ?, ?, ?, ?)
			ON CONFLICT(bucket_name, project_id, interval_start, action)
			DO UPDATE SET settled = bucket_bandwidth_rollups.settled + ?`,
		)
		_, err := tx.Tx.ExecContext(ctx, statement,
			bucketName, projectID[:], intervalStart.UTC(), defaultIntervalSeconds, action, 0, 0, uint64(amount), uint64(amount),
		)
		if err != nil {
			return ErrUpdateBucketBandwidthSettle.Wrap(err)
		}

		// Only GET actions contribute to the daily project rollup.
		if action != pb.PieceAction_GET {
			return nil
		}

		// NOTE(review): the day is taken from intervalStart's own calendar
		// fields and labelled UTC, while the bucket rollup above converts with
		// intervalStart.UTC() — confirm callers always pass UTC.
		dailyInterval := time.Date(intervalStart.Year(), intervalStart.Month(), intervalStart.Day(), 0, 0, 0, 0, time.UTC)
		statement = tx.Rebind(
			`INSERT INTO project_bandwidth_daily_rollups (project_id, interval_day, egress_allocated, egress_settled)
			VALUES (?, ?, ?, ?)
			ON CONFLICT(project_id, interval_day)
			DO UPDATE SET egress_settled = project_bandwidth_daily_rollups.egress_settled + EXCLUDED.egress_settled::BIGINT`,
		)
		_, err = tx.Tx.ExecContext(ctx, statement, projectID[:], dailyInterval, 0, uint64(amount))
		if err != nil {
			return ErrUpdateBucketBandwidthSettle.Wrap(err)
		}

		return nil
	})
}
// UpdateBucketBandwidthInline updates 'inline' bandwidth for given bucket.
@ -201,7 +236,12 @@ func (db *ordersDB) UpdateBucketBandwidthBatch(ctx context.Context, intervalStar
var inlineSlice []int64
var allocatedSlice []int64
var settledSlice []int64
projectRUMap := make(map[string]int64)
type bandwidth struct {
Allocated int64
Settled int64
}
projectRUMap := make(map[uuid.UUID]bandwidth)
for _, rollup := range rollups {
rollup := rollup
@ -213,7 +253,10 @@ func (db *ordersDB) UpdateBucketBandwidthBatch(ctx context.Context, intervalStar
settledSlice = append(settledSlice, rollup.Settled)
if rollup.Action == pb.PieceAction_GET {
projectRUMap[rollup.ProjectID.String()] += rollup.Allocated
b := projectRUMap[rollup.ProjectID]
b.Allocated += rollup.Allocated
b.Settled += rollup.Settled
projectRUMap[rollup.ProjectID] = b
}
}
@ -238,31 +281,42 @@ func (db *ordersDB) UpdateBucketBandwidthBatch(ctx context.Context, intervalStar
db.db.log.Error("Bucket bandwidth rollup batch flush failed.", zap.Error(err))
}
var projectRUIDs [][]byte
projectRUIDs := make([]uuid.UUID, 0, len(projectRUMap))
var projectRUAllocated []int64
var projectRUSettled []int64
projectInterval := time.Date(intervalStart.Year(), intervalStart.Month(), 1, intervalStart.Hour(), 0, 0, 0, time.UTC)
dailyInterval := time.Date(intervalStart.Year(), intervalStart.Month(), intervalStart.Day(), 0, 0, 0, 0, time.UTC)
for k, v := range projectRUMap {
projectID, err := uuid.FromString(k)
if err != nil {
db.db.log.Error("Could not parse project UUID.", zap.Error(err))
continue
}
projectRUIDs = append(projectRUIDs, projectID[:])
projectRUAllocated = append(projectRUAllocated, v)
for projectID, v := range projectRUMap {
projectRUIDs = append(projectRUIDs, projectID)
projectRUAllocated = append(projectRUAllocated, v.Allocated)
projectRUSettled = append(projectRUSettled, v.Settled)
}
if len(projectRUIDs) > 0 {
// TODO remove when project_bandwidth_daily_rollups will be used
_, err = tx.Tx.ExecContext(ctx, `
INSERT INTO project_bandwidth_rollups(project_id, interval_month, egress_allocated)
SELECT unnest($1::bytea[]), $2, unnest($3::bigint[])
ON CONFLICT(project_id, interval_month)
DO UPDATE SET egress_allocated = project_bandwidth_rollups.egress_allocated + EXCLUDED.egress_allocated::bigint;
`,
pgutil.ByteaArray(projectRUIDs), projectInterval, pgutil.Int8Array(projectRUAllocated))
INSERT INTO project_bandwidth_rollups(project_id, interval_month, egress_allocated)
SELECT unnest($1::bytea[]), $2, unnest($3::bigint[])
ON CONFLICT(project_id, interval_month)
DO UPDATE SET egress_allocated = project_bandwidth_rollups.egress_allocated + EXCLUDED.egress_allocated::bigint;
`,
pgutil.UUIDArray(projectRUIDs), projectInterval, pgutil.Int8Array(projectRUAllocated))
if err != nil {
db.db.log.Error("Project bandwidth rollup batch flush failed.", zap.Error(err))
}
_, err = tx.Tx.ExecContext(ctx, `
INSERT INTO project_bandwidth_daily_rollups(project_id, interval_day, egress_allocated, egress_settled)
SELECT unnest($1::bytea[]), $2, unnest($3::bigint[]), unnest($4::bigint[])
ON CONFLICT(project_id, interval_day)
DO UPDATE SET
egress_allocated = project_bandwidth_daily_rollups.egress_allocated + EXCLUDED.egress_allocated::bigint,
egress_settled = project_bandwidth_daily_rollups.egress_settled + EXCLUDED.egress_settled::bigint
`, pgutil.UUIDArray(projectRUIDs), dailyInterval, pgutil.Int8Array(projectRUAllocated), pgutil.Int8Array(projectRUSettled))
if err != nil {
db.db.log.Error("Project bandwidth daily rollup batch flush failed.", zap.Error(err))
}
}
return err
})

View File

@ -151,6 +151,21 @@ func (db *ProjectAccounting) GetProjectAllocatedBandwidth(ctx context.Context, p
return *egress, err
}
// GetProjectDailyBandwidth reads the allocated and settled egress stored in
// project_bandwidth_daily_rollups for the given project and calendar day
// (the day is looked up as midnight UTC). When no row exists for that day it
// reports zero for both values and no error.
func (db *ProjectAccounting) GetProjectDailyBandwidth(ctx context.Context, projectID uuid.UUID, year int, month time.Month, day int) (allocated int64, settled int64, err error) {
	defer mon.Task()(&ctx)(&err)

	const query = `SELECT egress_allocated, egress_settled FROM project_bandwidth_daily_rollups WHERE project_id = ? AND interval_day = ?;`

	dayStart := time.Date(year, month, day, 0, 0, 0, 0, time.UTC)
	row := db.db.QueryRow(ctx, db.db.Rebind(query), projectID[:], dayStart)

	err = row.Scan(&allocated, &settled)
	switch {
	case errors.Is(err, sql.ErrNoRows):
		// A missing row simply means nothing was recorded for that day.
		return 0, 0, nil
	default:
		return allocated, settled, err
	}
}
// DeleteProjectAllocatedBandwidthBefore deletes project bandwidth rollups before the given time.
func (db *ProjectAccounting) DeleteProjectAllocatedBandwidthBefore(ctx context.Context, before time.Time) (err error) {
defer mon.Task()(&ctx)(&err)