storj/satellite/satellitedb/orders.go

193 lines
7.6 KiB
Go
Raw Normal View History

// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package satellitedb
import (
"bytes"
"context"
"database/sql"
2019-03-28 20:09:23 +00:00
"time"
"storj.io/storj/pkg/pb"
2019-03-28 20:09:23 +00:00
"storj.io/storj/pkg/storj"
dbx "storj.io/storj/satellite/satellitedb/dbx"
)
const defaultIntervalSeconds = int(time.Hour / time.Second)
type ordersDB struct {
db *dbx.DB
}
// CreateSerialInfo creates serial number entry in database
2019-03-28 20:09:23 +00:00
func (db *ordersDB) CreateSerialInfo(ctx context.Context, serialNumber storj.SerialNumber, bucketID []byte, limitExpiration time.Time) error {
_, err := db.db.Create_SerialNumber(
ctx,
dbx.SerialNumber_SerialNumber(serialNumber.Bytes()),
dbx.SerialNumber_BucketId(bucketID),
dbx.SerialNumber_ExpiresAt(limitExpiration),
)
return err
}
// UseSerialNumber creates serial number entry in database
func (db *ordersDB) UseSerialNumber(ctx context.Context, serialNumber storj.SerialNumber, storageNodeID storj.NodeID) ([]byte, error) {
statement := db.db.Rebind(
`INSERT INTO used_serials (serial_number_id, storage_node_id)
SELECT id, ? FROM serial_numbers WHERE serial_number = ?`,
)
_, err := db.db.ExecContext(ctx, statement, storageNodeID.Bytes(), serialNumber.Bytes())
if err != nil {
return nil, err
}
dbxSerialNumber, err := db.db.Find_SerialNumber_By_SerialNumber(
ctx,
dbx.SerialNumber_SerialNumber(serialNumber.Bytes()),
)
if err != nil {
return nil, err
}
return dbxSerialNumber.BucketId, nil
}
// UpdateBucketBandwidthAllocation updates 'allocated' bandwidth for given bucket
func (db *ordersDB) UpdateBucketBandwidthAllocation(ctx context.Context, bucketID []byte, action pb.PieceAction, amount int64) error {
now := time.Now()
intervalStart := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, now.Location())
pathElements := bytes.Split(bucketID, []byte("/"))
bucketName, projectID := pathElements[1], pathElements[0]
statement := db.db.Rebind(
`INSERT INTO bucket_bandwidth_rollups (bucket_name, project_id, interval_start, interval_seconds, action, inline, allocated, settled)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(bucket_name, project_id, interval_start, action)
DO UPDATE SET allocated = bucket_bandwidth_rollups.allocated + ?`,
)
_, err := db.db.ExecContext(ctx, statement,
bucketName, projectID, intervalStart, defaultIntervalSeconds, action, 0, uint64(amount), 0, uint64(amount),
)
if err != nil {
return err
}
return nil
}
// UpdateBucketBandwidthSettle updates 'settled' bandwidth for given bucket
func (db *ordersDB) UpdateBucketBandwidthSettle(ctx context.Context, bucketID []byte, action pb.PieceAction, amount int64) error {
now := time.Now()
intervalStart := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, now.Location())
pathElements := bytes.Split(bucketID, []byte("/"))
bucketName, projectID := pathElements[1], pathElements[0]
statement := db.db.Rebind(
`INSERT INTO bucket_bandwidth_rollups (bucket_name, project_id, interval_start, interval_seconds, action, inline, allocated, settled)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(bucket_name, project_id, interval_start, action)
DO UPDATE SET settled = bucket_bandwidth_rollups.settled + ?`,
)
_, err := db.db.ExecContext(ctx, statement,
bucketName, projectID, intervalStart, defaultIntervalSeconds, action, 0, 0, uint64(amount), uint64(amount),
)
if err != nil {
return err
}
return nil
}
// UpdateBucketBandwidthInline updates 'inline' bandwidth for given bucket
func (db *ordersDB) UpdateBucketBandwidthInline(ctx context.Context, bucketID []byte, action pb.PieceAction, amount int64) error {
now := time.Now()
intervalStart := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, now.Location())
pathElements := bytes.Split(bucketID, []byte("/"))
bucketName, projectID := pathElements[1], pathElements[0]
statement := db.db.Rebind(
`INSERT INTO bucket_bandwidth_rollups (bucket_name, project_id, interval_start, interval_seconds, action, inline, allocated, settled)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(bucket_name, project_id, interval_start, action)
DO UPDATE SET inline = bucket_bandwidth_rollups.inline + ?`,
)
_, err := db.db.ExecContext(ctx, statement,
bucketName, projectID, intervalStart, defaultIntervalSeconds, action, uint64(amount), 0, 0, uint64(amount),
)
if err != nil {
return err
}
return nil
}
// UpdateStoragenodeBandwidthAllocation updates 'allocated' bandwidth for given storage node
func (db *ordersDB) UpdateStoragenodeBandwidthAllocation(ctx context.Context, storageNode storj.NodeID, action pb.PieceAction, amount int64) error {
now := time.Now()
intervalStart := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, now.Location())
statement := db.db.Rebind(
`INSERT INTO storagenode_bandwidth_rollups (storagenode_id, interval_start, interval_seconds, action, allocated, settled)
VALUES (?, ?, ?, ?, ?, ?)
ON CONFLICT(storagenode_id, interval_start, action)
DO UPDATE SET allocated = storagenode_bandwidth_rollups.allocated + ?`,
)
_, err := db.db.ExecContext(ctx, statement,
storageNode.Bytes(), intervalStart, defaultIntervalSeconds, action, uint64(amount), 0, uint64(amount),
)
if err != nil {
return err
}
return nil
}
// UpdateStoragenodeBandwidthSettle updates 'settled' bandwidth for given storage node
func (db *ordersDB) UpdateStoragenodeBandwidthSettle(ctx context.Context, storageNode storj.NodeID, action pb.PieceAction, amount int64) error {
now := time.Now()
intervalStart := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, now.Location())
statement := db.db.Rebind(
`INSERT INTO storagenode_bandwidth_rollups (storagenode_id, interval_start, interval_seconds, action, allocated, settled)
VALUES (?, ?, ?, ?, ?, ?)
ON CONFLICT(storagenode_id, interval_start, action)
DO UPDATE SET settled = storagenode_bandwidth_rollups.settled + ?`,
)
_, err := db.db.ExecContext(ctx, statement,
storageNode.Bytes(), intervalStart, defaultIntervalSeconds, action, 0, uint64(amount), uint64(amount),
)
if err != nil {
return err
}
return nil
}
// GetBucketBandwidth gets total bucket bandwidth from period of time
func (db *ordersDB) GetBucketBandwidth(ctx context.Context, bucketID []byte, from, to time.Time) (int64, error) {
pathElements := bytes.Split(bucketID, []byte("/"))
bucketName, projectID := pathElements[1], pathElements[0]
var sum *int64
query := `SELECT SUM(settled) FROM bucket_bandwidth_rollups WHERE bucket_name = ? AND project_id = ? AND interval_start > ? AND interval_start <= ?`
err := db.db.QueryRow(db.db.Rebind(query), bucketName, projectID, from, to).Scan(&sum)
if err == sql.ErrNoRows || sum == nil {
return 0, nil
}
return *sum, err
}
// GetStorageNodeBandwidth gets total storage node bandwidth from period of time
func (db *ordersDB) GetStorageNodeBandwidth(ctx context.Context, nodeID storj.NodeID, from, to time.Time) (int64, error) {
var sum *int64
query := `SELECT SUM(settled) FROM storagenode_bandwidth_rollups WHERE storagenode_id = ? AND interval_start > ? AND interval_start <= ?`
err := db.db.QueryRow(db.db.Rebind(query), nodeID.Bytes(), from, to).Scan(&sum)
if err == sql.ErrNoRows || sum == nil {
return 0, nil
}
return *sum, err
}
// UnuseSerialNumber removes pair serial number -> storage node id from database
func (db *ordersDB) UnuseSerialNumber(ctx context.Context, serialNumber storj.SerialNumber, storageNodeID storj.NodeID) error {
statement := `DELETE FROM used_serials WHERE storage_node_id = ? AND
serial_number_id IN (SELECT id FROM serial_numbers WHERE serial_number = ?)`
_, err := db.db.ExecContext(ctx, db.db.Rebind(statement), storageNodeID.Bytes(), serialNumber.Bytes())
return err
}