f77f61532a
Change-Id: I8240d99d0cdaad4c5e059565e88ee9619b62526e
394 lines
16 KiB
Go
394 lines
16 KiB
Go
// Copyright (C) 2019 Storj Labs, Inc.
|
|
// See LICENSE for copying information.
|
|
|
|
package satellitedb
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"errors"
|
|
"reflect"
|
|
"time"
|
|
|
|
"github.com/jackc/pgx/v4"
|
|
"github.com/zeebo/errs"
|
|
"go.uber.org/zap"
|
|
|
|
"storj.io/common/pb"
|
|
"storj.io/common/storj"
|
|
"storj.io/common/uuid"
|
|
"storj.io/private/dbutil/pgutil"
|
|
"storj.io/private/dbutil/pgxutil"
|
|
"storj.io/storj/satellite/orders"
|
|
"storj.io/storj/satellite/satellitedb/dbx"
|
|
)
|
|
|
|
const defaultIntervalSeconds = int(time.Hour / time.Second)
|
|
|
|
var (
|
|
// ErrDifferentStorageNodes is returned when ProcessOrders gets orders from different storage nodes.
|
|
ErrDifferentStorageNodes = errs.Class("different storage nodes")
|
|
// ErrBucketFromSerial is returned when there is an error trying to get the bucket name from the serial number.
|
|
ErrBucketFromSerial = errs.Class("bucket from serial number")
|
|
// ErrUpdateBucketBandwidthSettle is returned when there is an error updating bucket bandwidth.
|
|
ErrUpdateBucketBandwidthSettle = errs.Class("update bucket bandwidth settle")
|
|
// ErrProcessOrderWithWindowTx is returned when there is an error with the ProcessOrders transaction.
|
|
ErrProcessOrderWithWindowTx = errs.Class("process order with window transaction")
|
|
// ErrGetStoragenodeBandwidthInWindow is returned when there is an error getting all storage node bandwidth for a window.
|
|
ErrGetStoragenodeBandwidthInWindow = errs.Class("get storagenode bandwidth in window")
|
|
// ErrCreateStoragenodeBandwidth is returned when there is an error updating storage node bandwidth.
|
|
ErrCreateStoragenodeBandwidth = errs.Class("create storagenode bandwidth")
|
|
)
|
|
|
|
type ordersDB struct {
|
|
db *satelliteDB
|
|
}
|
|
|
|
// UpdateBucketBandwidthAllocation updates 'allocated' bandwidth for given bucket.
|
|
func (db *ordersDB) UpdateBucketBandwidthAllocation(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, amount int64, intervalStart time.Time) (err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
return pgxutil.Conn(ctx, db.db, func(conn *pgx.Conn) error {
|
|
var batch pgx.Batch
|
|
|
|
// TODO decide if we need to have transaction here
|
|
batch.Queue(`START TRANSACTION`)
|
|
|
|
statement := db.db.Rebind(
|
|
`INSERT INTO bucket_bandwidth_rollups (bucket_name, project_id, interval_start, interval_seconds, action, inline, allocated, settled)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
|
|
ON CONFLICT(bucket_name, project_id, interval_start, action)
|
|
DO UPDATE SET allocated = bucket_bandwidth_rollups.allocated + ?`,
|
|
)
|
|
batch.Queue(statement, bucketName, projectID[:], intervalStart.UTC(), defaultIntervalSeconds, action, 0, uint64(amount), 0, uint64(amount))
|
|
|
|
if action == pb.PieceAction_GET {
|
|
dailyInterval := time.Date(intervalStart.Year(), intervalStart.Month(), intervalStart.Day(), 0, 0, 0, 0, time.UTC)
|
|
statement = db.db.Rebind(
|
|
`INSERT INTO project_bandwidth_daily_rollups (project_id, interval_day, egress_allocated, egress_settled, egress_dead)
|
|
VALUES (?, ?, ?, ?, ?)
|
|
ON CONFLICT(project_id, interval_day)
|
|
DO UPDATE SET egress_allocated = project_bandwidth_daily_rollups.egress_allocated + EXCLUDED.egress_allocated::BIGINT`,
|
|
)
|
|
batch.Queue(statement, projectID[:], dailyInterval, uint64(amount), 0, 0)
|
|
}
|
|
|
|
batch.Queue(`COMMIT TRANSACTION`)
|
|
|
|
results := conn.SendBatch(ctx, &batch)
|
|
defer func() { err = errs.Combine(err, results.Close()) }()
|
|
|
|
var errlist errs.Group
|
|
for i := 0; i < batch.Len(); i++ {
|
|
_, err := results.Exec()
|
|
errlist.Add(err)
|
|
}
|
|
|
|
return errlist.Err()
|
|
})
|
|
}
|
|
|
|
// UpdateBucketBandwidthSettle updates 'settled' bandwidth for given bucket.
|
|
func (db *ordersDB) UpdateBucketBandwidthSettle(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, settledAmount, deadAmount int64, intervalStart time.Time) (err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
return db.db.WithTx(ctx, func(ctx context.Context, tx *dbx.Tx) error {
|
|
statement := db.db.Rebind(
|
|
`INSERT INTO bucket_bandwidth_rollups (bucket_name, project_id, interval_start, interval_seconds, action, inline, allocated, settled)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
|
|
ON CONFLICT(bucket_name, project_id, interval_start, action)
|
|
DO UPDATE SET settled = bucket_bandwidth_rollups.settled + ?`,
|
|
)
|
|
_, err = db.db.ExecContext(ctx, statement,
|
|
bucketName, projectID[:], intervalStart.UTC(), defaultIntervalSeconds, action, 0, 0, uint64(settledAmount), uint64(settledAmount),
|
|
)
|
|
if err != nil {
|
|
return ErrUpdateBucketBandwidthSettle.Wrap(err)
|
|
}
|
|
|
|
if action == pb.PieceAction_GET {
|
|
dailyInterval := time.Date(intervalStart.Year(), intervalStart.Month(), intervalStart.Day(), 0, 0, 0, 0, time.UTC)
|
|
statement = tx.Rebind(
|
|
`INSERT INTO project_bandwidth_daily_rollups (project_id, interval_day, egress_allocated, egress_settled, egress_dead)
|
|
VALUES (?, ?, ?, ?, ?)
|
|
ON CONFLICT(project_id, interval_day)
|
|
DO UPDATE SET
|
|
egress_settled = project_bandwidth_daily_rollups.egress_settled + EXCLUDED.egress_settled::BIGINT,
|
|
egress_dead = project_bandwidth_daily_rollups.egress_dead + EXCLUDED.egress_dead::BIGINT`,
|
|
)
|
|
_, err = tx.Tx.ExecContext(ctx, statement, projectID[:], dailyInterval, 0, uint64(settledAmount), uint64(deadAmount))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// UpdateBucketBandwidthInline updates 'inline' bandwidth for given bucket.
|
|
func (db *ordersDB) UpdateBucketBandwidthInline(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, amount int64, intervalStart time.Time) (err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
statement := db.db.Rebind(
|
|
`INSERT INTO bucket_bandwidth_rollups (bucket_name, project_id, interval_start, interval_seconds, action, inline, allocated, settled)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
|
|
ON CONFLICT(bucket_name, project_id, interval_start, action)
|
|
DO UPDATE SET inline = bucket_bandwidth_rollups.inline + ?`,
|
|
)
|
|
_, err = db.db.ExecContext(ctx, statement,
|
|
bucketName, projectID[:], intervalStart.UTC(), defaultIntervalSeconds, action, uint64(amount), 0, 0, uint64(amount),
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// UpdateStoragenodeBandwidthSettle updates 'settled' bandwidth for given storage node for the given intervalStart time.
|
|
func (db *ordersDB) UpdateStoragenodeBandwidthSettle(ctx context.Context, storageNode storj.NodeID, action pb.PieceAction, amount int64, intervalStart time.Time) (err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
statement := db.db.Rebind(
|
|
`INSERT INTO storagenode_bandwidth_rollups (storagenode_id, interval_start, interval_seconds, action, settled)
|
|
VALUES (?, ?, ?, ?, ?)
|
|
ON CONFLICT(storagenode_id, interval_start, action)
|
|
DO UPDATE SET settled = storagenode_bandwidth_rollups.settled + ?`,
|
|
)
|
|
_, err = db.db.ExecContext(ctx, statement,
|
|
storageNode.Bytes(), intervalStart.UTC(), defaultIntervalSeconds, action, uint64(amount), uint64(amount),
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// GetBucketBandwidth gets total bucket bandwidth from period of time.
|
|
func (db *ordersDB) GetBucketBandwidth(ctx context.Context, projectID uuid.UUID, bucketName []byte, from, to time.Time) (_ int64, err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
var sum *int64
|
|
query := `SELECT SUM(settled) FROM bucket_bandwidth_rollups WHERE bucket_name = ? AND project_id = ? AND interval_start > ? AND interval_start <= ?`
|
|
err = db.db.QueryRow(ctx, db.db.Rebind(query), bucketName, projectID[:], from.UTC(), to.UTC()).Scan(&sum)
|
|
if errors.Is(err, sql.ErrNoRows) || sum == nil {
|
|
return 0, nil
|
|
}
|
|
return *sum, Error.Wrap(err)
|
|
}
|
|
|
|
// GetStorageNodeBandwidth gets total storage node bandwidth from period of time.
|
|
func (db *ordersDB) GetStorageNodeBandwidth(ctx context.Context, nodeID storj.NodeID, from, to time.Time) (_ int64, err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
var sum1, sum2 int64
|
|
|
|
err1 := db.db.QueryRow(ctx, db.db.Rebind(`
|
|
SELECT COALESCE(SUM(settled), 0)
|
|
FROM storagenode_bandwidth_rollups
|
|
WHERE storagenode_id = ?
|
|
AND interval_start > ?
|
|
AND interval_start <= ?
|
|
`), nodeID.Bytes(), from.UTC(), to.UTC()).Scan(&sum1)
|
|
|
|
err2 := db.db.QueryRow(ctx, db.db.Rebind(`
|
|
SELECT COALESCE(SUM(settled), 0)
|
|
FROM storagenode_bandwidth_rollups_phase2
|
|
WHERE storagenode_id = ?
|
|
AND interval_start > ?
|
|
AND interval_start <= ?
|
|
`), nodeID.Bytes(), from.UTC(), to.UTC()).Scan(&sum2)
|
|
|
|
if err1 != nil && !errors.Is(err1, sql.ErrNoRows) {
|
|
return 0, err1
|
|
} else if err2 != nil && !errors.Is(err2, sql.ErrNoRows) {
|
|
return 0, err2
|
|
}
|
|
|
|
return sum1 + sum2, nil
|
|
}
|
|
|
|
func (db *ordersDB) UpdateBucketBandwidthBatch(ctx context.Context, intervalStart time.Time, rollups []orders.BucketBandwidthRollup) (err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
return db.db.WithTx(ctx, func(ctx context.Context, tx *dbx.Tx) error {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
if len(rollups) == 0 {
|
|
return nil
|
|
}
|
|
|
|
orders.SortBucketBandwidthRollups(rollups)
|
|
|
|
intervalStart = intervalStart.UTC()
|
|
intervalStart = time.Date(intervalStart.Year(), intervalStart.Month(), intervalStart.Day(), intervalStart.Hour(), 0, 0, 0, time.UTC)
|
|
|
|
var bucketNames [][]byte
|
|
var projectIDs [][]byte
|
|
var actionSlice []int32
|
|
var inlineSlice []int64
|
|
var allocatedSlice []int64
|
|
var settledSlice []int64
|
|
|
|
type bandwidth struct {
|
|
Allocated int64
|
|
Settled int64
|
|
Dead int64
|
|
}
|
|
projectRUMap := make(map[uuid.UUID]bandwidth)
|
|
|
|
for _, rollup := range rollups {
|
|
rollup := rollup
|
|
bucketNames = append(bucketNames, []byte(rollup.BucketName))
|
|
projectIDs = append(projectIDs, rollup.ProjectID[:])
|
|
actionSlice = append(actionSlice, int32(rollup.Action))
|
|
inlineSlice = append(inlineSlice, rollup.Inline)
|
|
allocatedSlice = append(allocatedSlice, rollup.Allocated)
|
|
settledSlice = append(settledSlice, rollup.Settled)
|
|
|
|
if rollup.Action == pb.PieceAction_GET {
|
|
b := projectRUMap[rollup.ProjectID]
|
|
b.Allocated += rollup.Allocated
|
|
b.Settled += rollup.Settled
|
|
b.Dead += rollup.Dead
|
|
projectRUMap[rollup.ProjectID] = b
|
|
}
|
|
}
|
|
|
|
_, err = tx.Tx.ExecContext(ctx, `
|
|
INSERT INTO bucket_bandwidth_rollups (
|
|
bucket_name, project_id,
|
|
interval_start, interval_seconds,
|
|
action, inline, allocated, settled)
|
|
SELECT
|
|
unnest($1::bytea[]), unnest($2::bytea[]),
|
|
$3, $4,
|
|
unnest($5::int4[]), unnest($6::bigint[]), unnest($7::bigint[]), unnest($8::bigint[])
|
|
ON CONFLICT(bucket_name, project_id, interval_start, action)
|
|
DO UPDATE SET
|
|
allocated = bucket_bandwidth_rollups.allocated + EXCLUDED.allocated,
|
|
inline = bucket_bandwidth_rollups.inline + EXCLUDED.inline,
|
|
settled = bucket_bandwidth_rollups.settled + EXCLUDED.settled`,
|
|
pgutil.ByteaArray(bucketNames), pgutil.ByteaArray(projectIDs),
|
|
intervalStart, defaultIntervalSeconds,
|
|
pgutil.Int4Array(actionSlice), pgutil.Int8Array(inlineSlice), pgutil.Int8Array(allocatedSlice), pgutil.Int8Array(settledSlice))
|
|
if err != nil {
|
|
db.db.log.Error("Bucket bandwidth rollup batch flush failed.", zap.Error(err))
|
|
}
|
|
|
|
projectRUIDs := make([]uuid.UUID, 0, len(projectRUMap))
|
|
var projectRUAllocated []int64
|
|
var projectRUSettled []int64
|
|
var projectRUDead []int64
|
|
dailyInterval := time.Date(intervalStart.Year(), intervalStart.Month(), intervalStart.Day(), 0, 0, 0, 0, time.UTC)
|
|
|
|
for projectID, v := range projectRUMap {
|
|
projectRUIDs = append(projectRUIDs, projectID)
|
|
projectRUAllocated = append(projectRUAllocated, v.Allocated)
|
|
projectRUSettled = append(projectRUSettled, v.Settled)
|
|
projectRUDead = append(projectRUDead, v.Dead)
|
|
}
|
|
|
|
if len(projectRUIDs) > 0 {
|
|
_, err = tx.Tx.ExecContext(ctx, `
|
|
INSERT INTO project_bandwidth_daily_rollups(project_id, interval_day, egress_allocated, egress_settled, egress_dead)
|
|
SELECT unnest($1::bytea[]), $2, unnest($3::bigint[]), unnest($4::bigint[]), unnest($5::bigint[])
|
|
ON CONFLICT(project_id, interval_day)
|
|
DO UPDATE SET
|
|
egress_allocated = project_bandwidth_daily_rollups.egress_allocated + EXCLUDED.egress_allocated::bigint,
|
|
egress_settled = project_bandwidth_daily_rollups.egress_settled + EXCLUDED.egress_settled::bigint,
|
|
egress_dead = project_bandwidth_daily_rollups.egress_dead + EXCLUDED.egress_dead::bigint
|
|
`, pgutil.UUIDArray(projectRUIDs), dailyInterval, pgutil.Int8Array(projectRUAllocated), pgutil.Int8Array(projectRUSettled), pgutil.Int8Array(projectRUDead))
|
|
if err != nil {
|
|
db.db.log.Error("Project bandwidth daily rollup batch flush failed.", zap.Error(err))
|
|
}
|
|
}
|
|
return err
|
|
})
|
|
}
|
|
|
|
//
|
|
// transaction/batch methods
|
|
//
|
|
|
|
// UpdateStoragenodeBandwidthSettleWithWindow adds a record to for each action and settled amount.
|
|
// If any of these orders already exist in the database, then all of these orders have already been processed.
|
|
// Orders within a single window may only be processed once to prevent double spending.
|
|
func (db *ordersDB) UpdateStoragenodeBandwidthSettleWithWindow(ctx context.Context, storageNodeID storj.NodeID, actionAmounts map[int32]int64, window time.Time) (status pb.SettlementWithWindowResponse_Status, alreadyProcessed bool, err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
var batchStatus pb.SettlementWithWindowResponse_Status
|
|
var retryCount int
|
|
for {
|
|
err = db.db.WithTx(ctx, func(ctx context.Context, tx *dbx.Tx) error {
|
|
// try to get all rows from the storage node bandwidth table for the 1 hr window
|
|
// if there are already existing rows for the 1 hr window that means these orders have
|
|
// already been processed
|
|
rows, err := tx.All_StoragenodeBandwidthRollup_By_StoragenodeId_And_IntervalStart(ctx,
|
|
dbx.StoragenodeBandwidthRollup_StoragenodeId(storageNodeID[:]),
|
|
dbx.StoragenodeBandwidthRollup_IntervalStart(window),
|
|
)
|
|
if err != nil {
|
|
return ErrGetStoragenodeBandwidthInWindow.Wrap(err)
|
|
}
|
|
|
|
if len(rows) != 0 {
|
|
// if there are already rows in the storagenode bandwidth table for this 1 hr window
|
|
// that means these orders have already been processed
|
|
// if these orders that the storagenode is trying to process again match what in the
|
|
// storagenode bandwidth table, then send a successful response to the storagenode
|
|
// so they don't keep trying to settle these orders again
|
|
// if these orders do not match what we have in the storage node bandwidth table then send
|
|
// back an invalid response
|
|
if SettledAmountsMatch(rows, actionAmounts) {
|
|
batchStatus = pb.SettlementWithWindowResponse_ACCEPTED
|
|
alreadyProcessed = true
|
|
return nil
|
|
}
|
|
batchStatus = pb.SettlementWithWindowResponse_REJECTED
|
|
return nil
|
|
}
|
|
// if there aren't any rows in the storagenode bandwidth table for this 1 hr window
|
|
// that means these orders have not been processed before so we can continue to process them
|
|
for action, amount := range actionAmounts {
|
|
_, err := tx.Create_StoragenodeBandwidthRollup(ctx,
|
|
dbx.StoragenodeBandwidthRollup_StoragenodeId(storageNodeID[:]),
|
|
dbx.StoragenodeBandwidthRollup_IntervalStart(window),
|
|
dbx.StoragenodeBandwidthRollup_IntervalSeconds(uint(defaultIntervalSeconds)),
|
|
dbx.StoragenodeBandwidthRollup_Action(uint(action)),
|
|
dbx.StoragenodeBandwidthRollup_Settled(uint64(amount)),
|
|
dbx.StoragenodeBandwidthRollup_Create_Fields{},
|
|
)
|
|
if err != nil {
|
|
return ErrCreateStoragenodeBandwidth.Wrap(err)
|
|
}
|
|
}
|
|
|
|
batchStatus = pb.SettlementWithWindowResponse_ACCEPTED
|
|
return nil
|
|
})
|
|
if dbx.IsConstraintError(err) {
|
|
retryCount++
|
|
if retryCount > 5 {
|
|
return 0, alreadyProcessed, errs.New("process order with window retry count too high")
|
|
}
|
|
continue
|
|
} else if err != nil {
|
|
return 0, alreadyProcessed, ErrProcessOrderWithWindowTx.Wrap(err)
|
|
}
|
|
break
|
|
}
|
|
|
|
return batchStatus, alreadyProcessed, nil
|
|
}
|
|
|
|
// SettledAmountsMatch checks if database rows match the orders. If the settled amount for
|
|
// each action are not the same then false is returned.
|
|
func SettledAmountsMatch(rows []*dbx.StoragenodeBandwidthRollup, orderActionAmounts map[int32]int64) bool {
|
|
rowsSumByAction := map[int32]int64{}
|
|
for _, row := range rows {
|
|
rowsSumByAction[int32(row.Action)] += int64(row.Settled)
|
|
}
|
|
|
|
return reflect.DeepEqual(rowsSumByAction, orderActionAmounts)
|
|
}
|