2019-03-27 10:24:35 +00:00
|
|
|
// Copyright (C) 2019 Storj Labs, Inc.
|
|
|
|
// See LICENSE for copying information.
|
|
|
|
|
|
|
|
package satellitedb
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
2019-04-01 21:14:58 +01:00
|
|
|
"database/sql"
|
2020-01-10 18:53:42 +00:00
|
|
|
"fmt"
|
2019-06-10 15:58:28 +01:00
|
|
|
"sort"
|
2020-01-10 18:53:42 +00:00
|
|
|
"strings"
|
2019-03-28 20:09:23 +00:00
|
|
|
"time"
|
2019-03-27 10:24:35 +00:00
|
|
|
|
2019-06-25 16:58:42 +01:00
|
|
|
"github.com/skyrings/skyring-common/tools/uuid"
|
2019-08-15 20:05:43 +01:00
|
|
|
"github.com/zeebo/errs"
|
2020-01-10 18:53:42 +00:00
|
|
|
"go.uber.org/zap"
|
2019-06-10 15:58:28 +01:00
|
|
|
|
2019-12-27 11:48:47 +00:00
|
|
|
"storj.io/common/pb"
|
|
|
|
"storj.io/common/storj"
|
2020-01-16 22:58:42 +00:00
|
|
|
"storj.io/storj/private/dbutil"
|
2019-11-14 19:46:15 +00:00
|
|
|
"storj.io/storj/private/dbutil/pgutil"
|
2019-04-04 15:42:01 +01:00
|
|
|
"storj.io/storj/satellite/orders"
|
2020-01-15 02:29:51 +00:00
|
|
|
"storj.io/storj/satellite/satellitedb/dbx"
|
2019-03-27 10:24:35 +00:00
|
|
|
)
|
|
|
|
|
2019-04-01 21:14:58 +01:00
|
|
|
// defaultIntervalSeconds is the width of a bandwidth rollup interval (one
// hour) written into the interval_seconds column of the rollup tables.
const defaultIntervalSeconds = int(time.Hour / time.Second)
|
|
|
|
|
2019-09-06 15:49:30 +01:00
|
|
|
var (
	// ErrDifferentStorageNodes is returned when ProcessOrders gets orders from different storage nodes.
	ErrDifferentStorageNodes = errs.Class("different storage nodes")
)
|
|
|
|
|
2019-03-27 10:24:35 +00:00
|
|
|
// ordersDB provides access to the serial-number and bandwidth-rollup tables
// of the satellite database.
type ordersDB struct {
	db *satelliteDB

	// reportedRollupsReadBatchSize is the page size used when paging through
	// reported serials in GetBillableBandwidth; values <= 0 fall back to 1000.
	reportedRollupsReadBatchSize int
}
|
|
|
|
|
2019-04-01 21:14:58 +01:00
|
|
|
// CreateSerialInfo creates serial number entry in database.
func (db *ordersDB) CreateSerialInfo(ctx context.Context, serialNumber storj.SerialNumber, bucketID []byte, limitExpiration time.Time) (err error) {
	defer mon.Task()(&ctx)(&err)
	// CreateNoReturn avoids reading the inserted row back from the database.
	return db.db.CreateNoReturn_SerialNumber(
		ctx,
		dbx.SerialNumber_SerialNumber(serialNumber.Bytes()),
		dbx.SerialNumber_BucketId(bucketID),
		dbx.SerialNumber_ExpiresAt(limitExpiration),
	)
}
|
|
|
|
|
2019-08-27 18:12:38 +01:00
|
|
|
// DeleteExpiredSerials deletes all expired serials in serial_number and used_serials table.
// It returns the number of serial_numbers rows deleted.
//
// NOTE(review): only serial_numbers is deleted here directly; used_serials
// rows are presumably removed via a foreign-key cascade — confirm the schema.
func (db *ordersDB) DeleteExpiredSerials(ctx context.Context, now time.Time) (_ int, err error) {
	defer mon.Task()(&ctx)(&err)
	count, err := db.db.Delete_SerialNumber_By_ExpiresAt_LessOrEqual(ctx, dbx.SerialNumber_ExpiresAt(now))
	if err != nil {
		return 0, err
	}
	return int(count), nil
}
|
|
|
|
|
2019-04-01 21:14:58 +01:00
|
|
|
// UseSerialNumber marks the serial number as used by the given storage node
// and returns the bucket id associated with the serial number.
//
// It returns orders.ErrUsingSerialNumber when the serial number was already
// used by this node or does not exist.
func (db *ordersDB) UseSerialNumber(ctx context.Context, serialNumber storj.SerialNumber, storageNodeID storj.NodeID) (_ []byte, err error) {
	defer mon.Task()(&ctx)(&err)
	statement := db.db.Rebind(
		`INSERT INTO used_serials (serial_number_id, storage_node_id)
		SELECT id, ? FROM serial_numbers WHERE serial_number = ?`,
	)
	_, err = db.db.ExecContext(ctx, statement, storageNodeID.Bytes(), serialNumber.Bytes())
	if err != nil {
		// A constraint violation means the (serial, node) pair already
		// exists, i.e. the serial number was already used by this node.
		if pgutil.IsConstraintError(err) {
			return nil, orders.ErrUsingSerialNumber.New("serial number already used")
		}
		return nil, err
	}

	// INSERT ... SELECT succeeds (inserting zero rows) even when the serial
	// is unknown, so look the serial up to distinguish "used" from "missing".
	dbxSerialNumber, err := db.db.Find_SerialNumber_By_SerialNumber(
		ctx,
		dbx.SerialNumber_SerialNumber(serialNumber.Bytes()),
	)
	if err != nil {
		return nil, err
	}
	if dbxSerialNumber == nil {
		return nil, orders.ErrUsingSerialNumber.New("serial number not found")
	}
	return dbxSerialNumber.BucketId, nil
}
|
|
|
|
|
|
|
|
// UpdateBucketBandwidthAllocation updates 'allocated' bandwidth for given bucket.
// It upserts a row for the bucket's (interval_start, action) rollup, adding
// amount to the existing allocated total on conflict.
func (db *ordersDB) UpdateBucketBandwidthAllocation(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, amount int64, intervalStart time.Time) (err error) {
	defer mon.Task()(&ctx)(&err)
	statement := db.db.Rebind(
		`INSERT INTO bucket_bandwidth_rollups (bucket_name, project_id, interval_start, interval_seconds, action, inline, allocated, settled)
		VALUES (?, ?, ?, ?, ?, ?, ?, ?)
		ON CONFLICT(bucket_name, project_id, interval_start, action)
		DO UPDATE SET allocated = bucket_bandwidth_rollups.allocated + ?`,
	)
	// The trailing uint64(amount) binds the "+ ?" in the ON CONFLICT clause.
	_, err = db.db.ExecContext(ctx, statement,
		bucketName, projectID[:], intervalStart, defaultIntervalSeconds, action, 0, uint64(amount), 0, uint64(amount),
	)
	if err != nil {
		return err
	}

	return nil
}
|
|
|
|
|
|
|
|
// UpdateBucketBandwidthSettle updates 'settled' bandwidth for given bucket.
// It upserts a row for the bucket's (interval_start, action) rollup, adding
// amount to the existing settled total on conflict.
func (db *ordersDB) UpdateBucketBandwidthSettle(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, amount int64, intervalStart time.Time) (err error) {
	defer mon.Task()(&ctx)(&err)
	statement := db.db.Rebind(
		`INSERT INTO bucket_bandwidth_rollups (bucket_name, project_id, interval_start, interval_seconds, action, inline, allocated, settled)
		VALUES (?, ?, ?, ?, ?, ?, ?, ?)
		ON CONFLICT(bucket_name, project_id, interval_start, action)
		DO UPDATE SET settled = bucket_bandwidth_rollups.settled + ?`,
	)
	// The trailing uint64(amount) binds the "+ ?" in the ON CONFLICT clause.
	_, err = db.db.ExecContext(ctx, statement,
		bucketName, projectID[:], intervalStart, defaultIntervalSeconds, action, 0, 0, uint64(amount), uint64(amount),
	)
	if err != nil {
		return err
	}
	return nil
}
|
|
|
|
|
2019-04-01 21:14:58 +01:00
|
|
|
// UpdateBucketBandwidthInline updates 'inline' bandwidth for given bucket.
// It upserts a row for the bucket's (interval_start, action) rollup, adding
// amount to the existing inline total on conflict.
func (db *ordersDB) UpdateBucketBandwidthInline(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, amount int64, intervalStart time.Time) (err error) {
	defer mon.Task()(&ctx)(&err)
	statement := db.db.Rebind(
		`INSERT INTO bucket_bandwidth_rollups (bucket_name, project_id, interval_start, interval_seconds, action, inline, allocated, settled)
		VALUES (?, ?, ?, ?, ?, ?, ?, ?)
		ON CONFLICT(bucket_name, project_id, interval_start, action)
		DO UPDATE SET inline = bucket_bandwidth_rollups.inline + ?`,
	)
	// The trailing uint64(amount) binds the "+ ?" in the ON CONFLICT clause.
	_, err = db.db.ExecContext(ctx, statement,
		bucketName, projectID[:], intervalStart, defaultIntervalSeconds, action, uint64(amount), 0, 0, uint64(amount),
	)
	if err != nil {
		return err
	}
	return nil
}
|
2019-03-27 10:24:35 +00:00
|
|
|
|
2019-04-04 16:20:59 +01:00
|
|
|
// UpdateStoragenodeBandwidthSettle updates 'settled' bandwidth for given storage node for the given intervalStart time.
// It upserts a row for the node's (interval_start, action) rollup, adding
// amount to the existing settled total on conflict.
func (db *ordersDB) UpdateStoragenodeBandwidthSettle(ctx context.Context, storageNode storj.NodeID, action pb.PieceAction, amount int64, intervalStart time.Time) (err error) {
	defer mon.Task()(&ctx)(&err)
	statement := db.db.Rebind(
		`INSERT INTO storagenode_bandwidth_rollups (storagenode_id, interval_start, interval_seconds, action, settled)
		VALUES (?, ?, ?, ?, ?)
		ON CONFLICT(storagenode_id, interval_start, action)
		DO UPDATE SET settled = storagenode_bandwidth_rollups.settled + ?`,
	)
	// The trailing uint64(amount) binds the "+ ?" in the ON CONFLICT clause.
	_, err = db.db.ExecContext(ctx, statement,
		storageNode.Bytes(), intervalStart, defaultIntervalSeconds, action, uint64(amount), uint64(amount),
	)
	if err != nil {
		return err
	}
	return nil
}
|
|
|
|
|
|
|
|
// GetBucketBandwidth gets total bucket bandwidth from period of time
|
2019-06-25 16:58:42 +01:00
|
|
|
func (db *ordersDB) GetBucketBandwidth(ctx context.Context, projectID uuid.UUID, bucketName []byte, from, to time.Time) (_ int64, err error) {
|
2019-06-04 12:55:38 +01:00
|
|
|
defer mon.Task()(&ctx)(&err)
|
2019-04-01 21:14:58 +01:00
|
|
|
var sum *int64
|
2019-04-02 19:21:18 +01:00
|
|
|
query := `SELECT SUM(settled) FROM bucket_bandwidth_rollups WHERE bucket_name = ? AND project_id = ? AND interval_start > ? AND interval_start <= ?`
|
2020-01-17 20:07:00 +00:00
|
|
|
err = db.db.QueryRow(ctx, db.db.Rebind(query), bucketName, projectID[:], from, to).Scan(&sum)
|
2019-04-01 21:14:58 +01:00
|
|
|
if err == sql.ErrNoRows || sum == nil {
|
|
|
|
return 0, nil
|
|
|
|
}
|
2020-01-15 21:45:17 +00:00
|
|
|
return *sum, Error.Wrap(err)
|
2019-04-01 21:14:58 +01:00
|
|
|
}
|
2019-03-27 10:24:35 +00:00
|
|
|
|
2019-04-01 21:14:58 +01:00
|
|
|
// GetStorageNodeBandwidth gets total storage node bandwidth from period of time
|
2019-06-04 12:55:38 +01:00
|
|
|
func (db *ordersDB) GetStorageNodeBandwidth(ctx context.Context, nodeID storj.NodeID, from, to time.Time) (_ int64, err error) {
|
|
|
|
defer mon.Task()(&ctx)(&err)
|
2019-04-01 21:14:58 +01:00
|
|
|
var sum *int64
|
|
|
|
query := `SELECT SUM(settled) FROM storagenode_bandwidth_rollups WHERE storagenode_id = ? AND interval_start > ? AND interval_start <= ?`
|
2020-01-17 20:07:00 +00:00
|
|
|
err = db.db.QueryRow(ctx, db.db.Rebind(query), nodeID.Bytes(), from, to).Scan(&sum)
|
2019-04-01 21:14:58 +01:00
|
|
|
if err == sql.ErrNoRows || sum == nil {
|
|
|
|
return 0, nil
|
|
|
|
}
|
|
|
|
return *sum, err
|
|
|
|
}
|
2019-03-27 10:24:35 +00:00
|
|
|
|
2019-04-01 21:14:58 +01:00
|
|
|
// UnuseSerialNumber removes pair serial number -> storage node id from database.
// Unknown serial numbers are a no-op (the DELETE simply matches zero rows).
func (db *ordersDB) UnuseSerialNumber(ctx context.Context, serialNumber storj.SerialNumber, storageNodeID storj.NodeID) (err error) {
	defer mon.Task()(&ctx)(&err)
	statement := `DELETE FROM used_serials WHERE storage_node_id = ? AND
		serial_number_id IN (SELECT id FROM serial_numbers WHERE serial_number = ?)`
	_, err = db.db.ExecContext(ctx, db.db.Rebind(statement), storageNodeID.Bytes(), serialNumber.Bytes())
	return err
}
|
2019-08-15 20:05:43 +01:00
|
|
|
|
2019-09-06 15:49:30 +01:00
|
|
|
// ProcessOrders take a list of order requests and "settles" them in one transaction.
|
|
|
|
//
|
|
|
|
// ProcessOrders requires that all orders come from the same storage node.
|
2019-08-19 14:36:11 +01:00
|
|
|
func (db *ordersDB) ProcessOrders(ctx context.Context, requests []*orders.ProcessOrderRequest) (responses []*orders.ProcessOrderResponse, err error) {
|
2019-08-15 20:05:43 +01:00
|
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
|
|
|
|
if len(requests) == 0 {
|
2020-01-15 21:45:17 +00:00
|
|
|
return nil, nil
|
2019-08-15 20:05:43 +01:00
|
|
|
}
|
|
|
|
|
2019-09-06 15:49:30 +01:00
|
|
|
// check that all requests are from the same storage node
|
|
|
|
storageNodeID := requests[0].OrderLimit.StorageNodeId
|
|
|
|
for _, req := range requests[1:] {
|
|
|
|
if req.OrderLimit.StorageNodeId != storageNodeID {
|
|
|
|
return nil, ErrDifferentStorageNodes.New("requests from different storage nodes %v and %v", storageNodeID, req.OrderLimit.StorageNodeId)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// sort requests by serial number, all of them should be from the same storage node
|
|
|
|
sort.Slice(requests, func(i, k int) bool {
|
|
|
|
return requests[i].OrderLimit.SerialNumber.Less(requests[k].OrderLimit.SerialNumber)
|
|
|
|
})
|
|
|
|
|
2020-01-15 21:45:17 +00:00
|
|
|
// do a read only transaction to get all the project id/bucket ids
|
|
|
|
var bucketIDs [][]byte
|
2019-12-19 09:56:26 +00:00
|
|
|
err = db.db.WithTx(ctx, func(ctx context.Context, tx *dbx.Tx) error {
|
2020-01-15 21:45:17 +00:00
|
|
|
for _, request := range requests {
|
|
|
|
row, err := tx.Find_SerialNumber_By_SerialNumber(ctx,
|
|
|
|
dbx.SerialNumber_SerialNumber(request.Order.SerialNumber.Bytes()))
|
|
|
|
if err != nil {
|
|
|
|
return Error.Wrap(err)
|
|
|
|
}
|
|
|
|
if row != nil {
|
|
|
|
bucketIDs = append(bucketIDs, row.BucketId)
|
|
|
|
} else {
|
|
|
|
bucketIDs = append(bucketIDs, nil)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return nil
|
2019-12-19 09:56:26 +00:00
|
|
|
})
|
2020-01-15 21:45:17 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, Error.Wrap(err)
|
|
|
|
}
|
|
|
|
|
2020-01-16 22:58:42 +00:00
|
|
|
// perform all of the upserts into reported serials table
|
2020-01-15 21:45:17 +00:00
|
|
|
err = db.db.WithTx(ctx, func(ctx context.Context, tx *dbx.Tx) error {
|
2020-01-16 22:58:42 +00:00
|
|
|
var stmt strings.Builder
|
|
|
|
var stmtBegin, stmtEnd string
|
|
|
|
switch db.db.implementation {
|
|
|
|
case dbutil.Postgres:
|
|
|
|
stmtBegin = `INSERT INTO reported_serials ( expires_at, storage_node_id, bucket_id, action, serial_number, settled, observed_at ) VALUES `
|
|
|
|
stmtEnd = ` ON CONFLICT ( expires_at, storage_node_id, bucket_id, action, serial_number )
|
|
|
|
DO UPDATE SET
|
|
|
|
expires_at = EXCLUDED.expires_at,
|
|
|
|
storage_node_id = EXCLUDED.storage_node_id,
|
|
|
|
bucket_id = EXCLUDED.bucket_id,
|
|
|
|
action = EXCLUDED.action,
|
|
|
|
serial_number = EXCLUDED.serial_number,
|
|
|
|
settled = EXCLUDED.settled,
|
|
|
|
observed_at = EXCLUDED.observed_at`
|
|
|
|
case dbutil.Cockroach:
|
|
|
|
stmtBegin = `UPSERT INTO reported_serials ( expires_at, storage_node_id, bucket_id, action, serial_number, settled, observed_at ) VALUES `
|
|
|
|
default:
|
|
|
|
return errs.New("invalid dbType: %v", db.db.driver)
|
|
|
|
}
|
|
|
|
|
|
|
|
stmt.WriteString(stmtBegin)
|
|
|
|
var expiresAt time.Time
|
|
|
|
var bucketID []byte
|
|
|
|
var serialNum storj.SerialNumber
|
|
|
|
var action pb.PieceAction
|
|
|
|
var expiresArgNum, bucketArgNum, serialArgNum, actionArgNum int
|
|
|
|
var args []interface{}
|
2020-01-24 15:44:48 +00:00
|
|
|
writeComma := false
|
2020-01-16 22:58:42 +00:00
|
|
|
args = append(args, storageNodeID.Bytes(), time.Now().UTC())
|
2020-01-15 21:45:17 +00:00
|
|
|
|
|
|
|
for i, request := range requests {
|
|
|
|
if bucketIDs[i] == nil {
|
|
|
|
responses = append(responses, &orders.ProcessOrderResponse{
|
|
|
|
SerialNumber: request.Order.SerialNumber,
|
|
|
|
Status: pb.SettlementResponse_REJECTED,
|
|
|
|
})
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
2020-01-24 15:44:48 +00:00
|
|
|
if writeComma {
|
2020-01-16 22:58:42 +00:00
|
|
|
stmt.WriteString(",")
|
|
|
|
}
|
|
|
|
if expiresAt != roundToNextDay(request.OrderLimit.OrderExpiration) {
|
|
|
|
expiresAt = roundToNextDay(request.OrderLimit.OrderExpiration)
|
|
|
|
args = append(args, expiresAt)
|
|
|
|
expiresArgNum = len(args)
|
|
|
|
}
|
|
|
|
if string(bucketID) != string(bucketIDs[i]) {
|
|
|
|
bucketID = bucketIDs[i]
|
|
|
|
args = append(args, bucketID)
|
|
|
|
bucketArgNum = len(args)
|
|
|
|
}
|
|
|
|
if action != request.OrderLimit.Action {
|
|
|
|
action = request.OrderLimit.Action
|
|
|
|
args = append(args, action)
|
|
|
|
actionArgNum = len(args)
|
|
|
|
}
|
|
|
|
if serialNum != request.Order.SerialNumber {
|
|
|
|
serialNum = request.Order.SerialNumber
|
|
|
|
args = append(args, serialNum.Bytes())
|
|
|
|
serialArgNum = len(args)
|
2020-01-15 21:45:17 +00:00
|
|
|
}
|
2019-08-15 20:05:43 +01:00
|
|
|
|
2020-01-16 22:58:42 +00:00
|
|
|
args = append(args, request.Order.Amount)
|
|
|
|
stmt.WriteString(fmt.Sprintf(
|
|
|
|
"($%d,$1,$%d,$%d,$%d,$%d,$2)",
|
|
|
|
expiresArgNum,
|
|
|
|
bucketArgNum,
|
|
|
|
actionArgNum,
|
|
|
|
serialArgNum,
|
|
|
|
len(args),
|
|
|
|
))
|
2020-01-24 15:44:48 +00:00
|
|
|
writeComma = true
|
2020-01-16 22:58:42 +00:00
|
|
|
|
2020-01-15 21:45:17 +00:00
|
|
|
responses = append(responses, &orders.ProcessOrderResponse{
|
|
|
|
SerialNumber: request.Order.SerialNumber,
|
|
|
|
Status: pb.SettlementResponse_ACCEPTED,
|
|
|
|
})
|
2019-08-15 20:05:43 +01:00
|
|
|
}
|
2020-01-16 22:58:42 +00:00
|
|
|
stmt.WriteString(stmtEnd)
|
|
|
|
_, err = tx.Tx.ExecContext(ctx, stmt.String(), args...)
|
|
|
|
if err != nil {
|
|
|
|
return Error.Wrap(err)
|
|
|
|
}
|
2019-08-29 16:14:10 +01:00
|
|
|
|
2020-01-15 21:45:17 +00:00
|
|
|
return nil
|
|
|
|
})
|
|
|
|
if err != nil {
|
|
|
|
return nil, Error.Wrap(err)
|
|
|
|
}
|
|
|
|
return responses, nil
|
|
|
|
}
|
2019-08-29 16:14:10 +01:00
|
|
|
|
2020-01-15 21:45:17 +00:00
|
|
|
func roundToNextDay(t time.Time) time.Time {
|
2020-01-17 00:33:34 +00:00
|
|
|
t = t.UTC()
|
|
|
|
return time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, t.Location()).AddDate(0, 0, 1)
|
2020-01-15 21:45:17 +00:00
|
|
|
}
|
2019-09-06 15:49:30 +01:00
|
|
|
|
2020-01-15 21:45:17 +00:00
|
|
|
// GetBillableBandwidth gets total billable (expired consumed serial) bandwidth for nodes and buckets for all actions.
// It pages through reported_serials rows whose expires_at <= now, aggregates
// settled amounts per (node, action) and per (project, bucket, action), and
// returns the aggregates as rollup slices.
func (db *ordersDB) GetBillableBandwidth(ctx context.Context, now time.Time) (
	bucketRollups []orders.BucketBandwidthRollup, storagenodeRollups []orders.StoragenodeBandwidthRollup, err error) {
	defer mon.Task()(&ctx)(&err)

	batchSize := db.reportedRollupsReadBatchSize
	if batchSize <= 0 {
		batchSize = 1000
	}

	type storagenodeKey struct {
		nodeID storj.NodeID
		action pb.PieceAction
	}
	byStoragenode := make(map[storagenodeKey]uint64)

	type bucketKey struct {
		projectID  uuid.UUID
		bucketName string
		action     pb.PieceAction
	}
	byBucket := make(map[bucketKey]uint64)

	var token *dbx.Paged_ReportedSerial_By_ExpiresAt_LessOrEqual_Continuation
	var rows []*dbx.ReportedSerial

	for {
		// We explicitly use a new transaction each time because we don't need the guarantees and
		// because we don't want a transaction reading for 1000 years.
		rows, token, err = db.db.Paged_ReportedSerial_By_ExpiresAt_LessOrEqual(ctx,
			dbx.ReportedSerial_ExpiresAt(now), batchSize, token)
		if err != nil {
			return nil, nil, Error.Wrap(err)
		}

		for _, row := range rows {
			// Malformed rows are logged and skipped rather than failing the
			// whole aggregation.
			nodeID, err := storj.NodeIDFromBytes(row.StorageNodeId)
			if err != nil {
				db.db.log.Error("bad row inserted into reported serials",
					zap.Binary("storagenode_id", row.StorageNodeId))
				continue
			}
			projectID, bucketName, err := orders.SplitBucketID(row.BucketId)
			if err != nil {
				db.db.log.Error("bad row inserted into reported serials",
					zap.Binary("bucket_id", row.BucketId))
				continue
			}
			action := pb.PieceAction(row.Action)
			settled := row.Settled

			byStoragenode[storagenodeKey{
				nodeID: nodeID,
				action: action,
			}] += settled

			byBucket[bucketKey{
				projectID:  *projectID,
				bucketName: string(bucketName),
				action:     action,
			}] += settled
		}

		// A nil continuation token means the last page has been read.
		if token == nil {
			break
		}
	}

	for key, settled := range byBucket {
		bucketRollups = append(bucketRollups, orders.BucketBandwidthRollup{
			ProjectID:  key.projectID,
			BucketName: key.bucketName,
			Action:     key.action,
			Settled:    int64(settled),
		})
	}

	for key, settled := range byStoragenode {
		storagenodeRollups = append(storagenodeRollups, orders.StoragenodeBandwidthRollup{
			NodeID:  key.nodeID,
			Action:  key.action,
			Settled: int64(settled),
		})
	}

	return bucketRollups, storagenodeRollups, nil
}
|
2019-08-15 20:05:43 +01:00
|
|
|
|
2020-01-15 21:45:17 +00:00
|
|
|
//
|
|
|
|
// transaction/batch methods
|
|
|
|
//
|
2019-08-15 20:05:43 +01:00
|
|
|
|
2020-01-15 21:45:17 +00:00
|
|
|
// ordersDBTx wraps a dbx transaction so the batch rollup methods can run
// inside a caller-managed transaction (see WithTransaction).
type ordersDBTx struct {
	tx  *dbx.Tx
	log *zap.Logger
}
|
2019-08-15 20:05:43 +01:00
|
|
|
|
2020-01-16 18:02:15 +00:00
|
|
|
// WithTransaction runs the callback with an orders.Transaction that is backed
// by a single database transaction (commit/rollback handled by WithTx based
// on the callback's returned error).
func (db *ordersDB) WithTransaction(ctx context.Context, cb func(ctx context.Context, tx orders.Transaction) error) (err error) {
	defer mon.Task()(&ctx)(&err)

	return db.db.WithTx(ctx, func(ctx context.Context, tx *dbx.Tx) error {
		return cb(ctx, &ordersDBTx{tx: tx, log: db.db.log})
	})
}
|
2020-01-10 18:53:42 +00:00
|
|
|
|
2020-01-15 21:45:17 +00:00
|
|
|
// UpdateBucketBandwidthBatch upserts a batch of bucket bandwidth rollups for
// a single hour interval, accumulating allocated/inline/settled on conflict.
// Rollups are sorted so that consecutive equal project ids / bucket names can
// share a single bound argument.
func (tx *ordersDBTx) UpdateBucketBandwidthBatch(ctx context.Context, intervalStart time.Time, rollups []orders.BucketBandwidthRollup) (err error) {
	defer mon.Task()(&ctx)(&err)

	if len(rollups) == 0 {
		return nil
	}

	orders.SortBucketBandwidthRollups(rollups)

	const stmtBegin = `
		INSERT INTO bucket_bandwidth_rollups (bucket_name, project_id, interval_start, interval_seconds, action, inline, allocated, settled)
		VALUES
	`
	const stmtEnd = `
		ON CONFLICT(bucket_name, project_id, interval_start, action)
		DO UPDATE SET
			allocated = bucket_bandwidth_rollups.allocated + EXCLUDED.allocated,
			inline = bucket_bandwidth_rollups.inline + EXCLUDED.inline,
			settled = bucket_bandwidth_rollups.settled + EXCLUDED.settled
	`

	// Truncate the interval to the containing hour, in UTC.
	intervalStart = intervalStart.UTC()
	intervalStart = time.Date(intervalStart.Year(), intervalStart.Month(), intervalStart.Day(), intervalStart.Hour(), 0, 0, 0, time.UTC)

	var lastProjectID uuid.UUID
	var lastBucketName string
	var projectIDArgNum int
	var bucketNameArgNum int
	var args []interface{}

	var stmt strings.Builder
	stmt.WriteString(stmtBegin)

	// $1 is the shared interval_start for every tuple.
	args = append(args, intervalStart)
	for i, rollup := range rollups {
		if i > 0 {
			stmt.WriteString(",")
		}
		if lastProjectID != rollup.ProjectID {
			lastProjectID = rollup.ProjectID
			// Take the slice over a copy of the value so that we don't mutate
			// the underlying value for different range iterations. :grrcox:
			project := rollup.ProjectID
			args = append(args, project[:])
			projectIDArgNum = len(args)
		}
		if lastBucketName != rollup.BucketName {
			lastBucketName = rollup.BucketName
			args = append(args, lastBucketName)
			bucketNameArgNum = len(args)
		}
		args = append(args, rollup.Action, rollup.Inline, rollup.Allocated, rollup.Settled)

		// interval_seconds is a literal; the remaining placeholders refer to
		// the four values just appended (action, inline, allocated, settled).
		stmt.WriteString(fmt.Sprintf(
			"($%d,$%d,$1,%d,$%d,$%d,$%d,$%d)",
			bucketNameArgNum,
			projectIDArgNum,
			defaultIntervalSeconds,
			len(args)-3,
			len(args)-2,
			len(args)-1,
			len(args),
		))
	}
	stmt.WriteString(stmtEnd)

	_, err = tx.tx.Tx.ExecContext(ctx, stmt.String(), args...)
	if err != nil {
		tx.log.Error("Bucket bandwidth rollup batch flush failed.", zap.Error(err))
	}
	return err
}
|
2020-01-15 21:45:17 +00:00
|
|
|
|
|
|
|
// UpdateStoragenodeBandwidthBatch upserts a batch of storage node bandwidth
// rollups for a single hour interval, accumulating allocated/settled on
// conflict. Rollups are sorted so consecutive equal node ids can share a
// single bound argument.
func (tx *ordersDBTx) UpdateStoragenodeBandwidthBatch(ctx context.Context, intervalStart time.Time, rollups []orders.StoragenodeBandwidthRollup) (err error) {
	defer mon.Task()(&ctx)(&err)

	if len(rollups) == 0 {
		return nil
	}

	orders.SortStoragenodeBandwidthRollups(rollups)

	const stmtBegin = `
		INSERT INTO storagenode_bandwidth_rollups (storagenode_id, interval_start, interval_seconds, action, allocated, settled)
		VALUES
	`
	const stmtEnd = `
		ON CONFLICT(storagenode_id, interval_start, action)
		DO UPDATE SET
			allocated = storagenode_bandwidth_rollups.allocated + EXCLUDED.allocated,
			settled = storagenode_bandwidth_rollups.settled + EXCLUDED.settled
	`

	// Truncate the interval to the containing hour, in UTC.
	intervalStart = intervalStart.UTC()
	intervalStart = time.Date(intervalStart.Year(), intervalStart.Month(), intervalStart.Day(), intervalStart.Hour(), 0, 0, 0, time.UTC)

	var lastNodeID storj.NodeID
	var nodeIDArgNum int
	var args []interface{}

	var stmt strings.Builder
	stmt.WriteString(stmtBegin)

	// $1 is the shared interval_start for every tuple.
	args = append(args, intervalStart)
	for i, rollup := range rollups {
		if i > 0 {
			stmt.WriteString(",")
		}
		if lastNodeID != rollup.NodeID {
			lastNodeID = rollup.NodeID
			// NodeID.Bytes() returns a fresh copy, so the appended argument
			// stays stable up to the ExecContext call even as the loop
			// variable is overwritten.
			args = append(args, rollup.NodeID.Bytes())
			nodeIDArgNum = len(args)
		}
		args = append(args, rollup.Action, rollup.Allocated, rollup.Settled)

		// interval_seconds is a literal; the remaining placeholders refer to
		// the three values just appended (action, allocated, settled).
		stmt.WriteString(fmt.Sprintf(
			"($%d,$1,%d,$%d,$%d,$%d)",
			nodeIDArgNum,
			defaultIntervalSeconds,
			len(args)-2,
			len(args)-1,
			len(args),
		))
	}
	stmt.WriteString(stmtEnd)

	_, err = tx.tx.Tx.ExecContext(ctx, stmt.String(), args...)
	if err != nil {
		tx.log.Error("Storagenode bandwidth rollup batch flush failed.", zap.Error(err))
	}
	return err
}
|
|
|
|
|
|
|
|
// DeleteExpiredReportedSerials deletes any expired reported serials as of expiredThreshold.
// The number of deleted rows is discarded; only the (wrapped) error is returned.
func (tx *ordersDBTx) DeleteExpiredReportedSerials(ctx context.Context, expiredThreshold time.Time) (err error) {
	defer mon.Task()(&ctx)(&err)

	_, err = tx.tx.Delete_ReportedSerial_By_ExpiresAt_LessOrEqual(ctx,
		dbx.ReportedSerial_ExpiresAt(expiredThreshold))
	return Error.Wrap(err)
}
|