diff --git a/satellite/orders/endpoint.go b/satellite/orders/endpoint.go index d18c7c228..b276a6824 100644 --- a/satellite/orders/endpoint.go +++ b/satellite/orders/endpoint.go @@ -196,8 +196,8 @@ func NewEndpoint(log *zap.Logger, satelliteSignee signing.Signee, db DB, nodeAPI } type bucketIDAction struct { - bucketname string projectID uuid.UUID + bucketname string action pb.PieceAction } @@ -353,8 +353,8 @@ func (endpoint *Endpoint) SettlementWithWindowFinal(stream pb.DRPCOrders_Settlem } currentBucketIDAction := bucketIDAction{ - bucketname: bucketInfo.BucketName, projectID: bucketInfo.ProjectID, + bucketname: bucketInfo.BucketName, action: orderLimit.Action, } bucketSettled[currentBucketIDAction] = bandwidthAmount{ diff --git a/satellite/satellitedb/orders.go b/satellite/satellitedb/orders.go index 79a1f071c..9bfa631ed 100644 --- a/satellite/satellitedb/orders.go +++ b/satellite/satellitedb/orders.go @@ -4,7 +4,6 @@ package satellitedb import ( - "bytes" "context" "database/sql" "errors" @@ -14,7 +13,6 @@ import ( "github.com/jackc/pgx/v4" "github.com/zeebo/errs" - "go.uber.org/zap" "storj.io/common/pb" "storj.io/common/storj" @@ -74,12 +72,12 @@ func (db *ordersDB) UpdateBucketBandwidthAllocation(ctx context.Context, project batch.Queue(`START TRANSACTION`) statement := db.db.Rebind( - `INSERT INTO bucket_bandwidth_rollups (bucket_name, project_id, interval_start, interval_seconds, action, inline, allocated, settled) + `INSERT INTO bucket_bandwidth_rollups (project_id, bucket_name, interval_start, interval_seconds, action, inline, allocated, settled) VALUES (?, ?, ?, ?, ?, ?, ?, ?) - ON CONFLICT(bucket_name, project_id, interval_start, action) + ON CONFLICT(project_id, bucket_name, interval_start, action) DO UPDATE SET allocated = bucket_bandwidth_rollups.allocated + ?`, ) - batch.Queue(statement, bucketName, projectID[:], intervalStart.UTC(), defaultIntervalSeconds, action, 0, uint64(amount), 0, uint64(amount)) + batch.Queue(statement, projectID[:], bucketName, intervalStart.UTC(), defaultIntervalSeconds, action, 0, uint64(amount), 0, uint64(amount)) if action == pb.PieceAction_GET { dailyInterval := time.Date(intervalStart.Year(), intervalStart.Month(), intervalStart.Day(), 0, 0, 0, 0, time.UTC) @@ -113,13 +111,13 @@ func (db *ordersDB) UpdateBucketBandwidthSettle(ctx context.Context, projectID u return db.db.WithTx(ctx, func(ctx context.Context, tx *dbx.Tx) error { statement := tx.Rebind( - `INSERT INTO bucket_bandwidth_rollups (bucket_name, project_id, interval_start, interval_seconds, action, inline, allocated, settled) + `INSERT INTO bucket_bandwidth_rollups (project_id, bucket_name, interval_start, interval_seconds, action, inline, allocated, settled) VALUES (?, ?, ?, ?, ?, ?, ?, ?) - ON CONFLICT(bucket_name, project_id, interval_start, action) + ON CONFLICT(project_id, bucket_name, interval_start, action) DO UPDATE SET settled = bucket_bandwidth_rollups.settled + ?`, ) _, err = tx.Tx.ExecContext(ctx, statement, - bucketName, projectID[:], intervalStart.UTC(), defaultIntervalSeconds, action, 0, 0, uint64(settledAmount), uint64(settledAmount), + projectID[:], bucketName, intervalStart.UTC(), defaultIntervalSeconds, action, 0, 0, uint64(settledAmount), uint64(settledAmount), ) if err != nil { return ErrUpdateBucketBandwidthSettle.Wrap(err) @@ -149,13 +147,13 @@ func (db *ordersDB) UpdateBucketBandwidthInline(ctx context.Context, projectID u defer mon.Task()(&ctx)(&err) statement := db.db.Rebind( - `INSERT INTO bucket_bandwidth_rollups (bucket_name, project_id, interval_start, interval_seconds, action, inline, allocated, settled) + `INSERT INTO bucket_bandwidth_rollups (project_id, bucket_name, interval_start, interval_seconds, action, inline, allocated, settled) VALUES (?, ?, ?, ?, ?, ?, ?, ?) - ON CONFLICT(bucket_name, project_id, interval_start, action) + ON CONFLICT(project_id, bucket_name, interval_start, action) DO UPDATE SET inline = bucket_bandwidth_rollups.inline + ?`, ) _, err = db.db.ExecContext(ctx, statement, - bucketName, projectID[:], intervalStart.UTC(), defaultIntervalSeconds, action, uint64(amount), 0, 0, uint64(amount), + projectID[:], bucketName, intervalStart.UTC(), defaultIntervalSeconds, action, uint64(amount), 0, 0, uint64(amount), ) if err != nil { return err @@ -187,8 +185,8 @@ func (db *ordersDB) GetBucketBandwidth(ctx context.Context, projectID uuid.UUID, defer mon.Task()(&ctx)(&err) var sum *int64 - query := `SELECT SUM(settled) FROM bucket_bandwidth_rollups WHERE bucket_name = ? AND project_id = ? AND interval_start > ? AND interval_start <= ?` - err = db.db.QueryRow(ctx, db.db.Rebind(query), bucketName, projectID[:], from.UTC(), to.UTC()).Scan(&sum) + query := `SELECT SUM(settled) FROM bucket_bandwidth_rollups WHERE project_id = ? AND bucket_name = ? AND interval_start > ? AND interval_start <= ?` + err = db.db.QueryRow(ctx, db.db.Rebind(query), projectID[:], bucketName, from.UTC(), to.UTC()).Scan(&sum) if errors.Is(err, sql.ErrNoRows) || sum == nil { return 0, nil } @@ -230,24 +228,24 @@ func (db *ordersDB) GetStorageNodeBandwidth(ctx context.Context, nodeID storj.No func (db *ordersDB) UpdateBandwidthBatch(ctx context.Context, rollups []orders.BucketBandwidthRollup) (err error) { defer mon.Task()(&ctx)(&err) + if len(rollups) == 0 { + return nil + } + return db.db.WithTx(ctx, func(ctx context.Context, tx *dbx.Tx) error { defer mon.Task()(&ctx)(&err) - if len(rollups) == 0 { - return nil - } - // TODO reorg code to make clear what we are inserting/updating to // bucket_bandwidth_rollups and project_bandwidth_daily_rollups bucketRUMap := rollupBandwidth(rollups, toHourlyInterval, getBucketRollupKey) - inlineSlice := make([]int64, 0, len(bucketRUMap)) - settledSlice := make([]int64, 0, len(bucketRUMap)) - bucketNames := make([][]byte, 0, len(bucketRUMap)) projectIDs := make([]uuid.UUID, 0, len(bucketRUMap)) + bucketNames := make([][]byte, 0, len(bucketRUMap)) intervalStartSlice := make([]time.Time, 0, len(bucketRUMap)) actionSlice := make([]int32, 0, len(bucketRUMap)) + inlineSlice := make([]int64, 0, len(bucketRUMap)) + settledSlice := make([]int64, 0, len(bucketRUMap)) bucketRUMapKeys := make([]bandwidthRollupKey, 0, len(bucketRUMap)) for key := range bucketRUMap { @@ -259,12 +257,12 @@ func (db *ordersDB) UpdateBandwidthBatch(ctx context.Context, rollups []orders.B for _, rollupInfo := range bucketRUMapKeys { usage := bucketRUMap[rollupInfo] if usage.Inline != 0 || usage.Settled != 0 { - inlineSlice = append(inlineSlice, usage.Inline) - settledSlice = append(settledSlice, usage.Settled) - bucketNames = append(bucketNames, []byte(rollupInfo.BucketName)) projectIDs = append(projectIDs, rollupInfo.ProjectID) + bucketNames = append(bucketNames, []byte(rollupInfo.BucketName)) intervalStartSlice = append(intervalStartSlice, time.Unix(rollupInfo.IntervalStart, 0)) actionSlice = append(actionSlice, int32(rollupInfo.Action)) + inlineSlice = append(inlineSlice, usage.Inline) + settledSlice = append(settledSlice, usage.Settled) } } @@ -274,22 +272,22 @@ func (db *ordersDB) UpdateBandwidthBatch(ctx context.Context, rollups []orders.B if len(projectIDs) > 0 { _, err = tx.Tx.ExecContext(ctx, ` INSERT INTO bucket_bandwidth_rollups ( - bucket_name, project_id, + project_id, bucket_name, interval_start, interval_seconds, action, inline, allocated, settled) SELECT unnest($1::bytea[]), unnest($2::bytea[]), unnest($3::timestamptz[]), $4, unnest($5::int4[]), unnest($6::bigint[]), unnest($7::bigint[]), unnest($8::bigint[]) - ON CONFLICT(bucket_name, project_id, interval_start, action) + ON CONFLICT(project_id, bucket_name, interval_start, action) DO UPDATE SET inline = bucket_bandwidth_rollups.inline + EXCLUDED.inline, settled = bucket_bandwidth_rollups.settled + EXCLUDED.settled - `, pgutil.ByteaArray(bucketNames), pgutil.UUIDArray(projectIDs), pgutil.TimestampTZArray(intervalStartSlice), + `, pgutil.UUIDArray(projectIDs), pgutil.ByteaArray(bucketNames), pgutil.TimestampTZArray(intervalStartSlice), defaultIntervalSeconds, pgutil.Int4Array(actionSlice), pgutil.Int8Array(inlineSlice), pgutil.Int8Array(emptyAllocatedSlice), pgutil.Int8Array(settledSlice)) if err != nil { - db.db.log.Error("Bucket bandwidth rollup batch flush failed.", zap.Error(err)) + return errs.New("bucket bandwidth rollup batch flush failed: %+v", err) } } @@ -332,10 +330,10 @@ func (db *ordersDB) UpdateBandwidthBatch(ctx context.Context, rollups []orders.B egress_dead = project_bandwidth_daily_rollups.egress_dead + EXCLUDED.egress_dead::bigint `, pgutil.UUIDArray(projectIDs), pgutil.DateArray(intervalStartSlice), pgutil.Int8Array(allocatedSlice), pgutil.Int8Array(settledSlice), pgutil.Int8Array(deadSlice)) if err != nil { - db.db.log.Error("Project bandwidth daily rollup batch flush failed.", zap.Error(err)) + return errs.New("project bandwidth daily rollup batch flush failed: %+v", err) } } - return err + return nil }) } @@ -484,16 +482,16 @@ func getProjectRollupKey(rollup orders.BucketBandwidthRollup, toInterval func(ti func sortBandwidthRollupKeys(bandwidthRollupKeys []bandwidthRollupKey) { sort.SliceStable(bandwidthRollupKeys, func(i, j int) bool { - uuidCompare := bytes.Compare(bandwidthRollupKeys[i].ProjectID[:], bandwidthRollupKeys[j].ProjectID[:]) + uuidCompare := bandwidthRollupKeys[i].ProjectID.Compare(bandwidthRollupKeys[j].ProjectID) switch { - case bandwidthRollupKeys[i].BucketName < bandwidthRollupKeys[j].BucketName: - return true - case bandwidthRollupKeys[i].BucketName > bandwidthRollupKeys[j].BucketName: - return false case uuidCompare == -1: return true case uuidCompare == 1: return false + case bandwidthRollupKeys[i].BucketName < bandwidthRollupKeys[j].BucketName: + return true + case bandwidthRollupKeys[i].BucketName > bandwidthRollupKeys[j].BucketName: + return false case bandwidthRollupKeys[i].IntervalStart < bandwidthRollupKeys[j].IntervalStart: return true case bandwidthRollupKeys[i].IntervalStart > bandwidthRollupKeys[j].IntervalStart: diff --git a/satellite/satellitedb/orders_test.go b/satellite/satellitedb/orders_test.go index 21d38ea35..43c87bae9 100644 --- a/satellite/satellitedb/orders_test.go +++ b/satellite/satellitedb/orders_test.go @@ -67,18 +67,18 @@ func TestSortRollupKeys(t *testing.T) { IntervalStart: 5, Action: pb.PieceAction_GET, }, - { - ProjectID: uuid.UUID{2}, - BucketName: "a", - IntervalStart: 2, - Action: pb.PieceAction_GET, - }, { ProjectID: uuid.UUID{1}, BucketName: "b", IntervalStart: 3, Action: pb.PieceAction_GET, }, + { + ProjectID: uuid.UUID{2}, + BucketName: "a", + IntervalStart: 2, + Action: pb.PieceAction_GET, + }, } assert.NotEqual(t, expRollups, rollups) diff --git a/satellite/satellitedb/projectaccounting.go b/satellite/satellitedb/projectaccounting.go index d7e23c4d9..4e5203c50 100644 --- a/satellite/satellitedb/projectaccounting.go +++ b/satellite/satellitedb/projectaccounting.go @@ -552,8 +552,8 @@ func (db *ProjectAccounting) GetProjectTotalByPartner(ctx context.Context, proje FROM bucket_bandwidth_rollups WHERE - bucket_name = ? AND project_id = ? AND + bucket_name = ? AND interval_start >= ? AND interval_start < ? AND action = ?; @@ -624,7 +624,7 @@ func (db *ProjectAccounting) GetProjectTotalByPartner(ctx context.Context, proje return nil, err } - totalEgressRow := db.db.QueryRowContext(ctx, totalEgressQuery, []byte(bucket), projectID[:], since, before, pb.PieceAction_GET) + totalEgressRow := db.db.QueryRowContext(ctx, totalEgressQuery, projectID[:], []byte(bucket), since, before, pb.PieceAction_GET) if err != nil { return nil, err }