// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package satellitedb
import (
// defaultIntervalSeconds is the length of one hour expressed in seconds. It is
// the value the methods in this file write to the interval_seconds column of
// the bandwidth rollup tables.
const defaultIntervalSeconds = int(time.Hour / time.Second)
// Error classes for failures of the order and bandwidth bookkeeping code.
var (
// ErrDifferentStorageNodes is returned when ProcessOrders gets orders from different storage nodes.
ErrDifferentStorageNodes = errs.Class("different storage nodes")
// ErrBucketFromSerial is returned when there is an error trying to get the bucket name from the serial number.
ErrBucketFromSerial = errs.Class("bucket from serial number")
// ErrUpdateBucketBandwidthSettle is returned when there is an error updating bucket bandwidth.
ErrUpdateBucketBandwidthSettle = errs.Class("update bucket bandwidth settle")
// ErrProcessOrderWithWindowTx is returned when there is an error with the ProcessOrders transaction.
ErrProcessOrderWithWindowTx = errs.Class("process order with window transaction")
// ErrGetStoragenodeBandwidthInWindow is returned when there is an error getting all storage node bandwidth for a window.
ErrGetStoragenodeBandwidthInWindow = errs.Class("get storagenode bandwidth in window")
// ErrCreateStoragenodeBandwidth is returned when there is an error creating a storage node bandwidth record.
ErrCreateStoragenodeBandwidth = errs.Class("create storagenode bandwidth")
// ordersDB is the receiver for the bandwidth bookkeeping methods in this file.
// All of its queries are issued through the wrapped satelliteDB handle.
type ordersDB struct {
db *satelliteDB
// bandwidth holds the amounts accumulated for a single bandwidthRollupKey while
// a batch of rollups is being aggregated.
type bandwidth struct {
	// Allocated is the summed allocated amount.
	Allocated int64
	// Settled is the summed settled amount.
	Settled int64
	// Inline is the summed inline amount.
	Inline int64
	// Dead is the summed dead amount.
	Dead int64
}
// bandwidthRollupKey identifies one aggregated rollup entry and is used as the
// map key when rollups are summed by rollupBandwidth.
type bandwidthRollupKey struct {
// BucketName is the bucket the usage belongs to; it is left empty in keys
// built by getProjectRollupKey.
BucketName string
// ProjectID is the project the usage belongs to.
ProjectID uuid.UUID
// IntervalStart is the start of the interval as produced by the toInterval
// function passed to the key builder (unix seconds for the helpers in this file).
IntervalStart int64
// Action is the piece action the usage was recorded for.
Action pb.PieceAction
// UpdateBucketBandwidthAllocation updates 'allocated' bandwidth for given bucket.
func (db *ordersDB) UpdateBucketBandwidthAllocation(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, amount int64, intervalStart time.Time) (err error) {
defer mon.Task()(&ctx)(&err)
// TODO I wanted to remove this implementation but it looks it's heavily used in tests
// we should do cleanup as a separate change (Michal)
return pgxutil.Conn(ctx, db.db, func(conn *pgx.Conn) error {
var batch pgx.Batch
// TODO decide if we need to have transaction here
statement := db.db.Rebind(
`INSERT INTO bucket_bandwidth_rollups (project_id, bucket_name, interval_start, interval_seconds, action, inline, allocated, settled)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(project_id, bucket_name, interval_start, action)
DO UPDATE SET allocated = bucket_bandwidth_rollups.allocated + ?`,
batch.Queue(statement, projectID[:], bucketName, intervalStart.UTC(), defaultIntervalSeconds, action, 0, uint64(amount), 0, uint64(amount))
if action == pb.PieceAction_GET {
dailyInterval := time.Date(intervalStart.Year(), intervalStart.Month(), intervalStart.Day(), 0, 0, 0, 0, time.UTC)
statement = db.db.Rebind(
`INSERT INTO project_bandwidth_daily_rollups (project_id, interval_day, egress_allocated, egress_settled, egress_dead)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT(project_id, interval_day)
DO UPDATE SET egress_allocated = project_bandwidth_daily_rollups.egress_allocated + EXCLUDED.egress_allocated::BIGINT`,
batch.Queue(statement, projectID[:], dailyInterval, uint64(amount), 0, 0)
results := conn.SendBatch(ctx, &batch)
defer func() { err = errs.Combine(err, results.Close()) }()
var errlist errs.Group
for i := 0; i < batch.Len(); i++ {
_, err := results.Exec()
return errlist.Err()
// UpdateBucketBandwidthSettle updates 'settled' bandwidth for given bucket.
func (db *ordersDB) UpdateBucketBandwidthSettle(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, settledAmount, deadAmount int64, intervalStart time.Time) (err error) {
defer mon.Task()(&ctx)(&err)
return db.db.WithTx(ctx, func(ctx context.Context, tx *dbx.Tx) error {
statement := tx.Rebind(
`INSERT INTO bucket_bandwidth_rollups (project_id, bucket_name, interval_start, interval_seconds, action, inline, allocated, settled)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(project_id, bucket_name, interval_start, action)
DO UPDATE SET settled = bucket_bandwidth_rollups.settled + ?`,
_, err = tx.Tx.ExecContext(ctx, statement,
projectID[:], bucketName, intervalStart.UTC(), defaultIntervalSeconds, action, 0, 0, uint64(settledAmount), uint64(settledAmount),
if err != nil {
return ErrUpdateBucketBandwidthSettle.Wrap(err)
if action == pb.PieceAction_GET {
dailyInterval := time.Date(intervalStart.Year(), intervalStart.Month(), intervalStart.Day(), 0, 0, 0, 0, time.UTC)
statement = tx.Rebind(
`INSERT INTO project_bandwidth_daily_rollups (project_id, interval_day, egress_allocated, egress_settled, egress_dead)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT(project_id, interval_day)
egress_settled = project_bandwidth_daily_rollups.egress_settled + EXCLUDED.egress_settled::BIGINT,
egress_dead = project_bandwidth_daily_rollups.egress_dead + EXCLUDED.egress_dead::BIGINT`,
_, err = tx.Tx.ExecContext(ctx, statement, projectID[:], dailyInterval, 0, uint64(settledAmount), uint64(deadAmount))
if err != nil {
return err
return nil
// UpdateBucketBandwidthInline updates 'inline' bandwidth for given bucket.
func (db *ordersDB) UpdateBucketBandwidthInline(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, amount int64, intervalStart time.Time) (err error) {
defer mon.Task()(&ctx)(&err)
statement := db.db.Rebind(
`INSERT INTO bucket_bandwidth_rollups (project_id, bucket_name, interval_start, interval_seconds, action, inline, allocated, settled)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(project_id, bucket_name, interval_start, action)
DO UPDATE SET inline = bucket_bandwidth_rollups.inline + ?`,
_, err = db.db.ExecContext(ctx, statement,
projectID[:], bucketName, intervalStart.UTC(), defaultIntervalSeconds, action, uint64(amount), 0, 0, uint64(amount),
if err != nil {
return err
return nil
// UpdateStoragenodeBandwidthSettle updates 'settled' bandwidth for given storage node for the given intervalStart time.
func (db *ordersDB) UpdateStoragenodeBandwidthSettle(ctx context.Context, storageNode storj.NodeID, action pb.PieceAction, amount int64, intervalStart time.Time) (err error) {
defer mon.Task()(&ctx)(&err)
statement := db.db.Rebind(
`INSERT INTO storagenode_bandwidth_rollups (storagenode_id, interval_start, interval_seconds, action, settled)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT(storagenode_id, interval_start, action)
DO UPDATE SET settled = storagenode_bandwidth_rollups.settled + ?`,
_, err = db.db.ExecContext(ctx, statement,
storageNode.Bytes(), intervalStart.UTC(), defaultIntervalSeconds, action, uint64(amount), uint64(amount),
if err != nil {
return err
return nil
// GetBucketBandwidth gets total bucket bandwidth from period of time.
func (db *ordersDB) GetBucketBandwidth(ctx context.Context, projectID uuid.UUID, bucketName []byte, from, to time.Time) (_ int64, err error) {
defer mon.Task()(&ctx)(&err)
var sum *int64
query := `SELECT SUM(settled) FROM bucket_bandwidth_rollups WHERE project_id = ? AND bucket_name = ? AND interval_start > ? AND interval_start <= ?`
err = db.db.QueryRow(ctx, db.db.Rebind(query), projectID[:], bucketName, from.UTC(), to.UTC()).Scan(&sum)
if errors.Is(err, sql.ErrNoRows) || sum == nil {
return 0, nil
return *sum, Error.Wrap(err)
// GetStorageNodeBandwidth gets total storage node bandwidth from period of time.
func (db *ordersDB) GetStorageNodeBandwidth(ctx context.Context, nodeID storj.NodeID, from, to time.Time) (_ int64, err error) {
defer mon.Task()(&ctx)(&err)
var sum1, sum2 int64
err1 := db.db.QueryRow(ctx, db.db.Rebind(`
FROM storagenode_bandwidth_rollups
WHERE storagenode_id = ?
AND interval_start > ?
AND interval_start <= ?
`), nodeID.Bytes(), from.UTC(), to.UTC()).Scan(&sum1)
err2 := db.db.QueryRow(ctx, db.db.Rebind(`
FROM storagenode_bandwidth_rollups_phase2
WHERE storagenode_id = ?
AND interval_start > ?
AND interval_start <= ?
`), nodeID.Bytes(), from.UTC(), to.UTC()).Scan(&sum2)
if err1 != nil && !errors.Is(err1, sql.ErrNoRows) {
return 0, err1
} else if err2 != nil && !errors.Is(err2, sql.ErrNoRows) {
return 0, err2
return sum1 + sum2, nil
// UpdateBandwidthBatch updates bucket and project bandwidth rollups in the database.
func (db *ordersDB) UpdateBandwidthBatch(ctx context.Context, rollups []orders.BucketBandwidthRollup) (err error) {
defer mon.Task()(&ctx)(&err)
if len(rollups) == 0 {
return nil
return db.db.WithTx(ctx, func(ctx context.Context, tx *dbx.Tx) error {
defer mon.Task()(&ctx)(&err)
// TODO reorg code to make clear what we are inserting/updating to
// bucket_bandwidth_rollups and project_bandwidth_daily_rollups
bucketRUMap := rollupBandwidth(rollups, toHourlyInterval, getBucketRollupKey)
projectIDs := make([]uuid.UUID, 0, len(bucketRUMap))
bucketNames := make([][]byte, 0, len(bucketRUMap))
intervalStartSlice := make([]time.Time, 0, len(bucketRUMap))
actionSlice := make([]int32, 0, len(bucketRUMap))
inlineSlice := make([]int64, 0, len(bucketRUMap))
settledSlice := make([]int64, 0, len(bucketRUMap))
bucketRUMapKeys := make([]bandwidthRollupKey, 0, len(bucketRUMap))
for key := range bucketRUMap {
bucketRUMapKeys = append(bucketRUMapKeys, key)
for _, rollupInfo := range bucketRUMapKeys {
usage := bucketRUMap[rollupInfo]
if usage.Inline != 0 || usage.Settled != 0 {
projectIDs = append(projectIDs, rollupInfo.ProjectID)
bucketNames = append(bucketNames, []byte(rollupInfo.BucketName))
intervalStartSlice = append(intervalStartSlice, time.Unix(rollupInfo.IntervalStart, 0))
actionSlice = append(actionSlice, int32(rollupInfo.Action))
inlineSlice = append(inlineSlice, usage.Inline)
settledSlice = append(settledSlice, usage.Settled)
// allocated must be not-null so lets keep slice until we will change DB schema
emptyAllocatedSlice := make([]int64, len(projectIDs))
if len(projectIDs) > 0 {
_, err = tx.Tx.ExecContext(ctx, `
INSERT INTO bucket_bandwidth_rollups (
project_id, bucket_name,
interval_start, interval_seconds,
action, inline, allocated, settled)
unnest($1::bytea[]), unnest($2::bytea[]), unnest($3::timestamptz[]),
unnest($5::int4[]), unnest($6::bigint[]), unnest($7::bigint[]), unnest($8::bigint[])
ON CONFLICT(project_id, bucket_name, interval_start, action)
inline = bucket_bandwidth_rollups.inline + EXCLUDED.inline,
settled = bucket_bandwidth_rollups.settled + EXCLUDED.settled
`, pgutil.UUIDArray(projectIDs), pgutil.ByteaArray(bucketNames), pgutil.TimestampTZArray(intervalStartSlice),
pgutil.Int4Array(actionSlice), pgutil.Int8Array(inlineSlice), pgutil.Int8Array(emptyAllocatedSlice), pgutil.Int8Array(settledSlice))
if err != nil {
return errs.New("bucket bandwidth rollup batch flush failed: %+v", err)
projectRUMap := rollupBandwidth(rollups, toDailyInterval, getProjectRollupKey)
projectIDs = make([]uuid.UUID, 0, len(projectRUMap))
intervalStartSlice = make([]time.Time, 0, len(projectRUMap))
allocatedSlice := make([]int64, 0, len(projectRUMap))
settledSlice = make([]int64, 0, len(projectRUMap))
deadSlice := make([]int64, 0, len(projectRUMap))
projectRUMapKeys := make([]bandwidthRollupKey, 0, len(projectRUMap))
for key := range projectRUMap {
if key.Action == pb.PieceAction_GET {
projectRUMapKeys = append(projectRUMapKeys, key)
for _, rollupInfo := range projectRUMapKeys {
usage := projectRUMap[rollupInfo]
projectIDs = append(projectIDs, rollupInfo.ProjectID)
intervalStartSlice = append(intervalStartSlice, time.Unix(rollupInfo.IntervalStart, 0))
allocatedSlice = append(allocatedSlice, usage.Allocated)
settledSlice = append(settledSlice, usage.Settled)
deadSlice = append(deadSlice, usage.Dead)
if len(projectIDs) > 0 {
// TODO: explore updating project_bandwidth_daily_rollups table to use "timestamp with time zone" for interval_day
_, err = tx.Tx.ExecContext(ctx, `
INSERT INTO project_bandwidth_daily_rollups(project_id, interval_day, egress_allocated, egress_settled, egress_dead)
SELECT unnest($1::bytea[]), unnest($2::date[]), unnest($3::bigint[]), unnest($4::bigint[]), unnest($5::bigint[])
ON CONFLICT(project_id, interval_day)
egress_allocated = project_bandwidth_daily_rollups.egress_allocated + EXCLUDED.egress_allocated::bigint,
egress_settled = project_bandwidth_daily_rollups.egress_settled + EXCLUDED.egress_settled::bigint,
egress_dead = project_bandwidth_daily_rollups.egress_dead + EXCLUDED.egress_dead::bigint
`, pgutil.UUIDArray(projectIDs), pgutil.DateArray(intervalStartSlice), pgutil.Int8Array(allocatedSlice), pgutil.Int8Array(settledSlice), pgutil.Int8Array(deadSlice))
if err != nil {
return errs.New("project bandwidth daily rollup batch flush failed: %+v", err)
return nil
// transaction/batch methods
// UpdateStoragenodeBandwidthSettleWithWindow adds a record for each action and settled amount.
// If any of these orders already exist in the database, then all of these orders have already been processed.
// Orders within a single window may only be processed once to prevent double spending.
//
// It returns the settlement status of the batch, whether the window had already
// been processed with matching amounts, and any error from the transaction.
//
// NOTE(review): the argument lists of the two dbx calls and the retry
// bookkeeping (incrementing retryCount, restarting the loop) are not visible in
// this block — confirm against the complete source before changing it.
func (db *ordersDB) UpdateStoragenodeBandwidthSettleWithWindow(ctx context.Context, storageNodeID storj.NodeID, actionAmounts map[int32]int64, window time.Time) (status pb.SettlementWithWindowResponse_Status, alreadyProcessed bool, err error) {
defer mon.Task()(&ctx)(&err)
// batchStatus is set inside the transaction callback and returned on success.
var batchStatus pb.SettlementWithWindowResponse_Status
// retryCount bounds how often a constraint error may be retried (see the check against 5 below).
var retryCount int
for {
err = db.db.WithTx(ctx, func(ctx context.Context, tx *dbx.Tx) error {
// try to get all rows from the storage node bandwidth table for the 1 hr window
// if there are already existing rows for the 1 hr window that means these orders have
// already been processed
rows, err := tx.All_StoragenodeBandwidthRollup_By_StoragenodeId_And_IntervalStart(ctx,
if err != nil {
return ErrGetStoragenodeBandwidthInWindow.Wrap(err)
if len(rows) != 0 {
// if there are already rows in the storagenode bandwidth table for this 1 hr window
// that means these orders have already been processed
// if these orders that the storagenode is trying to process again match what is in the
// storagenode bandwidth table, then send a successful response to the storagenode
// so they don't keep trying to settle these orders again
// if these orders do not match what we have in the storage node bandwidth table then send
// back an invalid response
if SettledAmountsMatch(rows, actionAmounts) {
batchStatus = pb.SettlementWithWindowResponse_ACCEPTED
alreadyProcessed = true
return nil
batchStatus = pb.SettlementWithWindowResponse_REJECTED
return nil
// if there aren't any rows in the storagenode bandwidth table for this 1 hr window
// that means these orders have not been processed before so we can continue to process them
for action, amount := range actionAmounts {
_, err := tx.Create_StoragenodeBandwidthRollup(ctx,
if err != nil {
return ErrCreateStoragenodeBandwidth.Wrap(err)
batchStatus = pb.SettlementWithWindowResponse_ACCEPTED
return nil
// a constraint error gets special handling bounded by retryCount;
// any other error aborts with ErrProcessOrderWithWindowTx
if dbx.IsConstraintError(err) {
if retryCount > 5 {
return 0, alreadyProcessed, errs.New("process order with window retry count too high")
} else if err != nil {
return 0, alreadyProcessed, ErrProcessOrderWithWindowTx.Wrap(err)
return batchStatus, alreadyProcessed, nil
// SettledAmountsMatch checks if database rows match the orders. If the settled amount for
// each action are not the same then false is returned.
func SettledAmountsMatch(rows []*dbx.StoragenodeBandwidthRollup, orderActionAmounts map[int32]int64) bool {
rowsSumByAction := map[int32]int64{}
for _, row := range rows {
rowsSumByAction[int32(row.Action)] += int64(row.Settled)
return reflect.DeepEqual(rowsSumByAction, orderActionAmounts)
// toDailyInterval rounds the time stamp down to the start of the day and converts it to unix time.
//
// The start of the day is midnight in timeInterval's own location, so the same
// instant expressed in different time zones can map to different values.
func toDailyInterval(timeInterval time.Time) int64 {
	year, month, day := timeInterval.Date()
	return time.Date(year, month, day, 0, 0, 0, 0, timeInterval.Location()).Unix()
}
// toHourlyInterval rounds the time stamp down to the start of the hour and converts it to unix time.
//
// The hour is taken from the wall clock in timeInterval's own location.
func toHourlyInterval(timeInterval time.Time) int64 {
	year, month, day := timeInterval.Date()
	return time.Date(year, month, day, timeInterval.Hour(), 0, 0, 0, timeInterval.Location()).Unix()
}
// rollupBandwidth rollup the bandwidth statistics into a map based on the provided key, interval.
func rollupBandwidth(rollups []orders.BucketBandwidthRollup,
toInterval func(time.Time) int64,
getKey func(orders.BucketBandwidthRollup, func(time.Time) int64) bandwidthRollupKey) map[bandwidthRollupKey]bandwidth {
projectRUMap := make(map[bandwidthRollupKey]bandwidth)
for _, rollup := range rollups {
rollup := rollup
projectKey := getKey(rollup, toInterval)
if b, ok := projectRUMap[projectKey]; ok {
b.Allocated += rollup.Allocated
b.Settled += rollup.Settled
b.Inline += rollup.Inline
b.Dead += rollup.Dead
projectRUMap[projectKey] = b
} else {
projectRUMap[projectKey] = bandwidth{
Allocated: rollup.Allocated,
Settled: rollup.Settled,
Inline: rollup.Inline,
Dead: rollup.Dead,
return projectRUMap
// getBucketRollupKey return a key for use in bucket bandwidth rollup statistics.
func getBucketRollupKey(rollup orders.BucketBandwidthRollup, toInterval func(time.Time) int64) bandwidthRollupKey {
return bandwidthRollupKey{
BucketName: rollup.BucketName,
ProjectID: rollup.ProjectID,
IntervalStart: toInterval(rollup.IntervalStart),
Action: rollup.Action,
// getProjectRollupKey return a key for use in project bandwidth rollup statistics.
func getProjectRollupKey(rollup orders.BucketBandwidthRollup, toInterval func(time.Time) int64) bandwidthRollupKey {
return bandwidthRollupKey{
ProjectID: rollup.ProjectID,
IntervalStart: toInterval(rollup.IntervalStart),
Action: rollup.Action,
func sortBandwidthRollupKeys(bandwidthRollupKeys []bandwidthRollupKey) {
sort.SliceStable(bandwidthRollupKeys, func(i, j int) bool {
uuidCompare := bandwidthRollupKeys[i].ProjectID.Compare(bandwidthRollupKeys[j].ProjectID)
switch {
case uuidCompare == -1:
return true
case uuidCompare == 1:
return false
case bandwidthRollupKeys[i].BucketName < bandwidthRollupKeys[j].BucketName:
return true
case bandwidthRollupKeys[i].BucketName > bandwidthRollupKeys[j].BucketName:
return false
case bandwidthRollupKeys[i].IntervalStart < bandwidthRollupKeys[j].IntervalStart:
return true
case bandwidthRollupKeys[i].IntervalStart > bandwidthRollupKeys[j].IntervalStart:
return false
case bandwidthRollupKeys[i].Action < bandwidthRollupKeys[j].Action:
return true
case bandwidthRollupKeys[i].Action > bandwidthRollupKeys[j].Action:
return false
return false