satellite/orders: cleanup after altering primary key

We changed primary key for bucket_bandwidth_rollups table. Now we
need to do some cleanup in places like structs, sorting methods or SQL
queries.

Change-Id: Ida4f874f161356df193379a53507602e04db1668
This commit is contained in:
Michal Niewrzal 2023-03-02 11:59:10 +01:00
parent 45a47f654e
commit bc8f8f62b5
4 changed files with 42 additions and 44 deletions

View File

@ -196,8 +196,8 @@ func NewEndpoint(log *zap.Logger, satelliteSignee signing.Signee, db DB, nodeAPI
} }
type bucketIDAction struct { type bucketIDAction struct {
bucketname string
projectID uuid.UUID projectID uuid.UUID
bucketname string
action pb.PieceAction action pb.PieceAction
} }
@ -353,8 +353,8 @@ func (endpoint *Endpoint) SettlementWithWindowFinal(stream pb.DRPCOrders_Settlem
} }
currentBucketIDAction := bucketIDAction{ currentBucketIDAction := bucketIDAction{
bucketname: bucketInfo.BucketName,
projectID: bucketInfo.ProjectID, projectID: bucketInfo.ProjectID,
bucketname: bucketInfo.BucketName,
action: orderLimit.Action, action: orderLimit.Action,
} }
bucketSettled[currentBucketIDAction] = bandwidthAmount{ bucketSettled[currentBucketIDAction] = bandwidthAmount{

View File

@ -4,7 +4,6 @@
package satellitedb package satellitedb
import ( import (
"bytes"
"context" "context"
"database/sql" "database/sql"
"errors" "errors"
@ -14,7 +13,6 @@ import (
"github.com/jackc/pgx/v4" "github.com/jackc/pgx/v4"
"github.com/zeebo/errs" "github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/common/pb" "storj.io/common/pb"
"storj.io/common/storj" "storj.io/common/storj"
@ -74,12 +72,12 @@ func (db *ordersDB) UpdateBucketBandwidthAllocation(ctx context.Context, project
batch.Queue(`START TRANSACTION`) batch.Queue(`START TRANSACTION`)
statement := db.db.Rebind( statement := db.db.Rebind(
`INSERT INTO bucket_bandwidth_rollups (bucket_name, project_id, interval_start, interval_seconds, action, inline, allocated, settled) `INSERT INTO bucket_bandwidth_rollups (project_id, bucket_name, interval_start, interval_seconds, action, inline, allocated, settled)
VALUES (?, ?, ?, ?, ?, ?, ?, ?) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(bucket_name, project_id, interval_start, action) ON CONFLICT(project_id, bucket_name, interval_start, action)
DO UPDATE SET allocated = bucket_bandwidth_rollups.allocated + ?`, DO UPDATE SET allocated = bucket_bandwidth_rollups.allocated + ?`,
) )
batch.Queue(statement, bucketName, projectID[:], intervalStart.UTC(), defaultIntervalSeconds, action, 0, uint64(amount), 0, uint64(amount)) batch.Queue(statement, projectID[:], bucketName, intervalStart.UTC(), defaultIntervalSeconds, action, 0, uint64(amount), 0, uint64(amount))
if action == pb.PieceAction_GET { if action == pb.PieceAction_GET {
dailyInterval := time.Date(intervalStart.Year(), intervalStart.Month(), intervalStart.Day(), 0, 0, 0, 0, time.UTC) dailyInterval := time.Date(intervalStart.Year(), intervalStart.Month(), intervalStart.Day(), 0, 0, 0, 0, time.UTC)
@ -113,13 +111,13 @@ func (db *ordersDB) UpdateBucketBandwidthSettle(ctx context.Context, projectID u
return db.db.WithTx(ctx, func(ctx context.Context, tx *dbx.Tx) error { return db.db.WithTx(ctx, func(ctx context.Context, tx *dbx.Tx) error {
statement := tx.Rebind( statement := tx.Rebind(
`INSERT INTO bucket_bandwidth_rollups (bucket_name, project_id, interval_start, interval_seconds, action, inline, allocated, settled) `INSERT INTO bucket_bandwidth_rollups (project_id, bucket_name, interval_start, interval_seconds, action, inline, allocated, settled)
VALUES (?, ?, ?, ?, ?, ?, ?, ?) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(bucket_name, project_id, interval_start, action) ON CONFLICT(project_id, bucket_name, interval_start, action)
DO UPDATE SET settled = bucket_bandwidth_rollups.settled + ?`, DO UPDATE SET settled = bucket_bandwidth_rollups.settled + ?`,
) )
_, err = tx.Tx.ExecContext(ctx, statement, _, err = tx.Tx.ExecContext(ctx, statement,
bucketName, projectID[:], intervalStart.UTC(), defaultIntervalSeconds, action, 0, 0, uint64(settledAmount), uint64(settledAmount), projectID[:], bucketName, intervalStart.UTC(), defaultIntervalSeconds, action, 0, 0, uint64(settledAmount), uint64(settledAmount),
) )
if err != nil { if err != nil {
return ErrUpdateBucketBandwidthSettle.Wrap(err) return ErrUpdateBucketBandwidthSettle.Wrap(err)
@ -149,13 +147,13 @@ func (db *ordersDB) UpdateBucketBandwidthInline(ctx context.Context, projectID u
defer mon.Task()(&ctx)(&err) defer mon.Task()(&ctx)(&err)
statement := db.db.Rebind( statement := db.db.Rebind(
`INSERT INTO bucket_bandwidth_rollups (bucket_name, project_id, interval_start, interval_seconds, action, inline, allocated, settled) `INSERT INTO bucket_bandwidth_rollups (project_id, bucket_name, interval_start, interval_seconds, action, inline, allocated, settled)
VALUES (?, ?, ?, ?, ?, ?, ?, ?) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(bucket_name, project_id, interval_start, action) ON CONFLICT(project_id, bucket_name, interval_start, action)
DO UPDATE SET inline = bucket_bandwidth_rollups.inline + ?`, DO UPDATE SET inline = bucket_bandwidth_rollups.inline + ?`,
) )
_, err = db.db.ExecContext(ctx, statement, _, err = db.db.ExecContext(ctx, statement,
bucketName, projectID[:], intervalStart.UTC(), defaultIntervalSeconds, action, uint64(amount), 0, 0, uint64(amount), projectID[:], bucketName, intervalStart.UTC(), defaultIntervalSeconds, action, uint64(amount), 0, 0, uint64(amount),
) )
if err != nil { if err != nil {
return err return err
@ -187,8 +185,8 @@ func (db *ordersDB) GetBucketBandwidth(ctx context.Context, projectID uuid.UUID,
defer mon.Task()(&ctx)(&err) defer mon.Task()(&ctx)(&err)
var sum *int64 var sum *int64
query := `SELECT SUM(settled) FROM bucket_bandwidth_rollups WHERE bucket_name = ? AND project_id = ? AND interval_start > ? AND interval_start <= ?` query := `SELECT SUM(settled) FROM bucket_bandwidth_rollups WHERE project_id = ? AND bucket_name = ? AND interval_start > ? AND interval_start <= ?`
err = db.db.QueryRow(ctx, db.db.Rebind(query), bucketName, projectID[:], from.UTC(), to.UTC()).Scan(&sum) err = db.db.QueryRow(ctx, db.db.Rebind(query), projectID[:], bucketName, from.UTC(), to.UTC()).Scan(&sum)
if errors.Is(err, sql.ErrNoRows) || sum == nil { if errors.Is(err, sql.ErrNoRows) || sum == nil {
return 0, nil return 0, nil
} }
@ -230,24 +228,24 @@ func (db *ordersDB) GetStorageNodeBandwidth(ctx context.Context, nodeID storj.No
func (db *ordersDB) UpdateBandwidthBatch(ctx context.Context, rollups []orders.BucketBandwidthRollup) (err error) { func (db *ordersDB) UpdateBandwidthBatch(ctx context.Context, rollups []orders.BucketBandwidthRollup) (err error) {
defer mon.Task()(&ctx)(&err) defer mon.Task()(&ctx)(&err)
if len(rollups) == 0 {
return nil
}
return db.db.WithTx(ctx, func(ctx context.Context, tx *dbx.Tx) error { return db.db.WithTx(ctx, func(ctx context.Context, tx *dbx.Tx) error {
defer mon.Task()(&ctx)(&err) defer mon.Task()(&ctx)(&err)
if len(rollups) == 0 {
return nil
}
// TODO reorg code to make clear what we are inserting/updating to // TODO reorg code to make clear what we are inserting/updating to
// bucket_bandwidth_rollups and project_bandwidth_daily_rollups // bucket_bandwidth_rollups and project_bandwidth_daily_rollups
bucketRUMap := rollupBandwidth(rollups, toHourlyInterval, getBucketRollupKey) bucketRUMap := rollupBandwidth(rollups, toHourlyInterval, getBucketRollupKey)
inlineSlice := make([]int64, 0, len(bucketRUMap))
settledSlice := make([]int64, 0, len(bucketRUMap))
bucketNames := make([][]byte, 0, len(bucketRUMap))
projectIDs := make([]uuid.UUID, 0, len(bucketRUMap)) projectIDs := make([]uuid.UUID, 0, len(bucketRUMap))
bucketNames := make([][]byte, 0, len(bucketRUMap))
intervalStartSlice := make([]time.Time, 0, len(bucketRUMap)) intervalStartSlice := make([]time.Time, 0, len(bucketRUMap))
actionSlice := make([]int32, 0, len(bucketRUMap)) actionSlice := make([]int32, 0, len(bucketRUMap))
inlineSlice := make([]int64, 0, len(bucketRUMap))
settledSlice := make([]int64, 0, len(bucketRUMap))
bucketRUMapKeys := make([]bandwidthRollupKey, 0, len(bucketRUMap)) bucketRUMapKeys := make([]bandwidthRollupKey, 0, len(bucketRUMap))
for key := range bucketRUMap { for key := range bucketRUMap {
@ -259,12 +257,12 @@ func (db *ordersDB) UpdateBandwidthBatch(ctx context.Context, rollups []orders.B
for _, rollupInfo := range bucketRUMapKeys { for _, rollupInfo := range bucketRUMapKeys {
usage := bucketRUMap[rollupInfo] usage := bucketRUMap[rollupInfo]
if usage.Inline != 0 || usage.Settled != 0 { if usage.Inline != 0 || usage.Settled != 0 {
inlineSlice = append(inlineSlice, usage.Inline)
settledSlice = append(settledSlice, usage.Settled)
bucketNames = append(bucketNames, []byte(rollupInfo.BucketName))
projectIDs = append(projectIDs, rollupInfo.ProjectID) projectIDs = append(projectIDs, rollupInfo.ProjectID)
bucketNames = append(bucketNames, []byte(rollupInfo.BucketName))
intervalStartSlice = append(intervalStartSlice, time.Unix(rollupInfo.IntervalStart, 0)) intervalStartSlice = append(intervalStartSlice, time.Unix(rollupInfo.IntervalStart, 0))
actionSlice = append(actionSlice, int32(rollupInfo.Action)) actionSlice = append(actionSlice, int32(rollupInfo.Action))
inlineSlice = append(inlineSlice, usage.Inline)
settledSlice = append(settledSlice, usage.Settled)
} }
} }
@ -274,22 +272,22 @@ func (db *ordersDB) UpdateBandwidthBatch(ctx context.Context, rollups []orders.B
if len(projectIDs) > 0 { if len(projectIDs) > 0 {
_, err = tx.Tx.ExecContext(ctx, ` _, err = tx.Tx.ExecContext(ctx, `
INSERT INTO bucket_bandwidth_rollups ( INSERT INTO bucket_bandwidth_rollups (
bucket_name, project_id, project_id, bucket_name,
interval_start, interval_seconds, interval_start, interval_seconds,
action, inline, allocated, settled) action, inline, allocated, settled)
SELECT SELECT
unnest($1::bytea[]), unnest($2::bytea[]), unnest($3::timestamptz[]), unnest($1::bytea[]), unnest($2::bytea[]), unnest($3::timestamptz[]),
$4, $4,
unnest($5::int4[]), unnest($6::bigint[]), unnest($7::bigint[]), unnest($8::bigint[]) unnest($5::int4[]), unnest($6::bigint[]), unnest($7::bigint[]), unnest($8::bigint[])
ON CONFLICT(bucket_name, project_id, interval_start, action) ON CONFLICT(project_id, bucket_name, interval_start, action)
DO UPDATE SET DO UPDATE SET
inline = bucket_bandwidth_rollups.inline + EXCLUDED.inline, inline = bucket_bandwidth_rollups.inline + EXCLUDED.inline,
settled = bucket_bandwidth_rollups.settled + EXCLUDED.settled settled = bucket_bandwidth_rollups.settled + EXCLUDED.settled
`, pgutil.ByteaArray(bucketNames), pgutil.UUIDArray(projectIDs), pgutil.TimestampTZArray(intervalStartSlice), `, pgutil.UUIDArray(projectIDs), pgutil.ByteaArray(bucketNames), pgutil.TimestampTZArray(intervalStartSlice),
defaultIntervalSeconds, defaultIntervalSeconds,
pgutil.Int4Array(actionSlice), pgutil.Int8Array(inlineSlice), pgutil.Int8Array(emptyAllocatedSlice), pgutil.Int8Array(settledSlice)) pgutil.Int4Array(actionSlice), pgutil.Int8Array(inlineSlice), pgutil.Int8Array(emptyAllocatedSlice), pgutil.Int8Array(settledSlice))
if err != nil { if err != nil {
db.db.log.Error("Bucket bandwidth rollup batch flush failed.", zap.Error(err)) return errs.New("bucket bandwidth rollup batch flush failed: %+v", err)
} }
} }
@ -332,10 +330,10 @@ func (db *ordersDB) UpdateBandwidthBatch(ctx context.Context, rollups []orders.B
egress_dead = project_bandwidth_daily_rollups.egress_dead + EXCLUDED.egress_dead::bigint egress_dead = project_bandwidth_daily_rollups.egress_dead + EXCLUDED.egress_dead::bigint
`, pgutil.UUIDArray(projectIDs), pgutil.DateArray(intervalStartSlice), pgutil.Int8Array(allocatedSlice), pgutil.Int8Array(settledSlice), pgutil.Int8Array(deadSlice)) `, pgutil.UUIDArray(projectIDs), pgutil.DateArray(intervalStartSlice), pgutil.Int8Array(allocatedSlice), pgutil.Int8Array(settledSlice), pgutil.Int8Array(deadSlice))
if err != nil { if err != nil {
db.db.log.Error("Project bandwidth daily rollup batch flush failed.", zap.Error(err)) return errs.New("project bandwidth daily rollup batch flush failed: %+v", err)
} }
} }
return err return nil
}) })
} }
@ -484,16 +482,16 @@ func getProjectRollupKey(rollup orders.BucketBandwidthRollup, toInterval func(ti
func sortBandwidthRollupKeys(bandwidthRollupKeys []bandwidthRollupKey) { func sortBandwidthRollupKeys(bandwidthRollupKeys []bandwidthRollupKey) {
sort.SliceStable(bandwidthRollupKeys, func(i, j int) bool { sort.SliceStable(bandwidthRollupKeys, func(i, j int) bool {
uuidCompare := bytes.Compare(bandwidthRollupKeys[i].ProjectID[:], bandwidthRollupKeys[j].ProjectID[:]) uuidCompare := bandwidthRollupKeys[i].ProjectID.Compare(bandwidthRollupKeys[j].ProjectID)
switch { switch {
case bandwidthRollupKeys[i].BucketName < bandwidthRollupKeys[j].BucketName:
return true
case bandwidthRollupKeys[i].BucketName > bandwidthRollupKeys[j].BucketName:
return false
case uuidCompare == -1: case uuidCompare == -1:
return true return true
case uuidCompare == 1: case uuidCompare == 1:
return false return false
case bandwidthRollupKeys[i].BucketName < bandwidthRollupKeys[j].BucketName:
return true
case bandwidthRollupKeys[i].BucketName > bandwidthRollupKeys[j].BucketName:
return false
case bandwidthRollupKeys[i].IntervalStart < bandwidthRollupKeys[j].IntervalStart: case bandwidthRollupKeys[i].IntervalStart < bandwidthRollupKeys[j].IntervalStart:
return true return true
case bandwidthRollupKeys[i].IntervalStart > bandwidthRollupKeys[j].IntervalStart: case bandwidthRollupKeys[i].IntervalStart > bandwidthRollupKeys[j].IntervalStart:

View File

@ -67,18 +67,18 @@ func TestSortRollupKeys(t *testing.T) {
IntervalStart: 5, IntervalStart: 5,
Action: pb.PieceAction_GET, Action: pb.PieceAction_GET,
}, },
{
ProjectID: uuid.UUID{2},
BucketName: "a",
IntervalStart: 2,
Action: pb.PieceAction_GET,
},
{ {
ProjectID: uuid.UUID{1}, ProjectID: uuid.UUID{1},
BucketName: "b", BucketName: "b",
IntervalStart: 3, IntervalStart: 3,
Action: pb.PieceAction_GET, Action: pb.PieceAction_GET,
}, },
{
ProjectID: uuid.UUID{2},
BucketName: "a",
IntervalStart: 2,
Action: pb.PieceAction_GET,
},
} }
assert.NotEqual(t, expRollups, rollups) assert.NotEqual(t, expRollups, rollups)

View File

@ -552,8 +552,8 @@ func (db *ProjectAccounting) GetProjectTotalByPartner(ctx context.Context, proje
FROM FROM
bucket_bandwidth_rollups bucket_bandwidth_rollups
WHERE WHERE
bucket_name = ? AND
project_id = ? AND project_id = ? AND
bucket_name = ? AND
interval_start >= ? AND interval_start >= ? AND
interval_start < ? AND interval_start < ? AND
action = ?; action = ?;
@ -624,7 +624,7 @@ func (db *ProjectAccounting) GetProjectTotalByPartner(ctx context.Context, proje
return nil, err return nil, err
} }
totalEgressRow := db.db.QueryRowContext(ctx, totalEgressQuery, []byte(bucket), projectID[:], since, before, pb.PieceAction_GET) totalEgressRow := db.db.QueryRowContext(ctx, totalEgressQuery, projectID[:], []byte(bucket), since, before, pb.PieceAction_GET)
if err != nil { if err != nil {
return nil, err return nil, err
} }