a8b66dce17
With the new phase 3 order submission, orders can be added to the storage and bandwidth rollup tables at timestamps before the most recent rollup was run. This change shifts the start time of each new rollup window to account for any unexpired orders that might have been added since the previous rollup. A satellitedb migration is necessary to allow upserts in the accounting_rollups table when entries with identical node_ids and start_times are inserted. Change-Id: Ib3022081f4d6be60cfec8430b45867ad3c01da63
373 lines
12 KiB
Go
373 lines
12 KiB
Go
// Copyright (C) 2019 Storj Labs, Inc.
|
|
// See LICENSE for copying information.
|
|
|
|
package satellitedb
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"time"
|
|
|
|
"github.com/zeebo/errs"
|
|
|
|
"storj.io/common/storj"
|
|
"storj.io/storj/private/dbutil"
|
|
"storj.io/storj/private/dbutil/pgutil"
|
|
"storj.io/storj/satellite/accounting"
|
|
"storj.io/storj/satellite/compensation"
|
|
"storj.io/storj/satellite/satellitedb/dbx"
|
|
)
|
|
|
|
// StoragenodeAccounting implements the accounting/db StoragenodeAccounting interface.
|
|
type StoragenodeAccounting struct {
|
|
db *satelliteDB
|
|
}
|
|
|
|
// SaveTallies records raw tallies of at rest data to the database.
|
|
func (db *StoragenodeAccounting) SaveTallies(ctx context.Context, latestTally time.Time, nodeData map[storj.NodeID]float64) (err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
if len(nodeData) == 0 {
|
|
return Error.New("In SaveTallies with empty nodeData")
|
|
}
|
|
var nodeIDs []storj.NodeID
|
|
var totals []float64
|
|
var totalSum float64
|
|
for id, total := range nodeData {
|
|
nodeIDs = append(nodeIDs, id)
|
|
totals = append(totals, total)
|
|
totalSum += total
|
|
}
|
|
mon.IntVal("nodetallies.totalsum").Observe(int64(totalSum)) //mon:locked
|
|
|
|
err = db.db.WithTx(ctx, func(ctx context.Context, tx *dbx.Tx) error {
|
|
_, err = tx.Tx.ExecContext(ctx, db.db.Rebind(`
|
|
INSERT INTO storagenode_storage_tallies (
|
|
interval_end_time,
|
|
node_id, data_total)
|
|
SELECT
|
|
$1,
|
|
unnest($2::bytea[]), unnest($3::float8[])`),
|
|
latestTally,
|
|
pgutil.NodeIDArray(nodeIDs), pgutil.Float8Array(totals))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return tx.UpdateNoReturn_AccountingTimestamps_By_Name(ctx,
|
|
dbx.AccountingTimestamps_Name(accounting.LastAtRestTally),
|
|
dbx.AccountingTimestamps_Update_Fields{
|
|
Value: dbx.AccountingTimestamps_Value(latestTally),
|
|
},
|
|
)
|
|
})
|
|
return Error.Wrap(err)
|
|
}
|
|
|
|
// GetTallies retrieves all raw tallies.
|
|
func (db *StoragenodeAccounting) GetTallies(ctx context.Context) (_ []*accounting.StoragenodeStorageTally, err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
raws, err := db.db.All_StoragenodeStorageTally(ctx)
|
|
out := make([]*accounting.StoragenodeStorageTally, len(raws))
|
|
for i, r := range raws {
|
|
nodeID, err := storj.NodeIDFromBytes(r.NodeId)
|
|
if err != nil {
|
|
return nil, Error.Wrap(err)
|
|
}
|
|
out[i] = &accounting.StoragenodeStorageTally{
|
|
NodeID: nodeID,
|
|
IntervalEndTime: r.IntervalEndTime,
|
|
DataTotal: r.DataTotal,
|
|
}
|
|
}
|
|
return out, Error.Wrap(err)
|
|
}
|
|
|
|
// GetTalliesSince retrieves all raw tallies since latestRollup.
|
|
func (db *StoragenodeAccounting) GetTalliesSince(ctx context.Context, latestRollup time.Time) (_ []*accounting.StoragenodeStorageTally, err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
raws, err := db.db.All_StoragenodeStorageTally_By_IntervalEndTime_GreaterOrEqual(ctx, dbx.StoragenodeStorageTally_IntervalEndTime(latestRollup))
|
|
out := make([]*accounting.StoragenodeStorageTally, len(raws))
|
|
for i, r := range raws {
|
|
nodeID, err := storj.NodeIDFromBytes(r.NodeId)
|
|
if err != nil {
|
|
return nil, Error.Wrap(err)
|
|
}
|
|
out[i] = &accounting.StoragenodeStorageTally{
|
|
NodeID: nodeID,
|
|
IntervalEndTime: r.IntervalEndTime,
|
|
DataTotal: r.DataTotal,
|
|
}
|
|
}
|
|
return out, Error.Wrap(err)
|
|
}
|
|
|
|
// GetBandwidthSince retrieves all storagenode_bandwidth_rollup entires since latestRollup.
|
|
func (db *StoragenodeAccounting) GetBandwidthSince(ctx context.Context, latestRollup time.Time) (out []*accounting.StoragenodeBandwidthRollup, err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
// get everything from the current rollups table
|
|
rollups, err := db.db.All_StoragenodeBandwidthRollup_By_IntervalStart_GreaterOrEqual(ctx,
|
|
dbx.StoragenodeBandwidthRollup_IntervalStart(latestRollup))
|
|
if err != nil {
|
|
return nil, Error.Wrap(err)
|
|
}
|
|
|
|
for _, r := range rollups {
|
|
nodeID, err := storj.NodeIDFromBytes(r.StoragenodeId)
|
|
if err != nil {
|
|
return nil, Error.Wrap(err)
|
|
}
|
|
out = append(out, &accounting.StoragenodeBandwidthRollup{
|
|
NodeID: nodeID,
|
|
IntervalStart: r.IntervalStart,
|
|
Action: r.Action,
|
|
Settled: r.Settled,
|
|
})
|
|
}
|
|
|
|
// include everything from the phase2 rollups table as well
|
|
phase2rollups, err := db.db.All_StoragenodeBandwidthRollupPhase2_By_IntervalStart_GreaterOrEqual(ctx,
|
|
dbx.StoragenodeBandwidthRollupPhase2_IntervalStart(latestRollup))
|
|
if err != nil {
|
|
return nil, Error.Wrap(err)
|
|
}
|
|
|
|
for _, r := range phase2rollups {
|
|
nodeID, err := storj.NodeIDFromBytes(r.StoragenodeId)
|
|
if err != nil {
|
|
return nil, Error.Wrap(err)
|
|
}
|
|
out = append(out, &accounting.StoragenodeBandwidthRollup{
|
|
NodeID: nodeID,
|
|
IntervalStart: r.IntervalStart,
|
|
Action: r.Action,
|
|
Settled: r.Settled,
|
|
})
|
|
}
|
|
|
|
return out, nil
|
|
}
|
|
|
|
// SaveRollup records raw tallies of at rest data to the database.
|
|
func (db *StoragenodeAccounting) SaveRollup(ctx context.Context, latestRollup time.Time, stats accounting.RollupStats) (err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
if len(stats) == 0 {
|
|
return Error.New("In SaveRollup with empty nodeData")
|
|
}
|
|
err = db.db.WithTx(ctx, func(ctx context.Context, tx *dbx.Tx) error {
|
|
for _, arsByDate := range stats {
|
|
for _, ar := range arsByDate {
|
|
nID := dbx.AccountingRollup_NodeId(ar.NodeID.Bytes())
|
|
start := dbx.AccountingRollup_StartTime(ar.StartTime)
|
|
put := dbx.AccountingRollup_PutTotal(ar.PutTotal)
|
|
get := dbx.AccountingRollup_GetTotal(ar.GetTotal)
|
|
audit := dbx.AccountingRollup_GetAuditTotal(ar.GetAuditTotal)
|
|
getRepair := dbx.AccountingRollup_GetRepairTotal(ar.GetRepairTotal)
|
|
putRepair := dbx.AccountingRollup_PutRepairTotal(ar.PutRepairTotal)
|
|
atRest := dbx.AccountingRollup_AtRestTotal(ar.AtRestTotal)
|
|
|
|
err := tx.ReplaceNoReturn_AccountingRollup(ctx, nID, start, put, get, audit, getRepair, putRepair, atRest)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
return tx.UpdateNoReturn_AccountingTimestamps_By_Name(ctx,
|
|
dbx.AccountingTimestamps_Name(accounting.LastRollup),
|
|
dbx.AccountingTimestamps_Update_Fields{
|
|
Value: dbx.AccountingTimestamps_Value(latestRollup),
|
|
},
|
|
)
|
|
})
|
|
return Error.Wrap(err)
|
|
}
|
|
|
|
// LastTimestamp records the greatest last tallied time.
|
|
func (db *StoragenodeAccounting) LastTimestamp(ctx context.Context, timestampType string) (_ time.Time, err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
lastTally := time.Time{}
|
|
err = db.db.WithTx(ctx, func(ctx context.Context, tx *dbx.Tx) error {
|
|
lt, err := tx.Find_AccountingTimestamps_Value_By_Name(ctx, dbx.AccountingTimestamps_Name(timestampType))
|
|
if lt == nil {
|
|
return tx.CreateNoReturn_AccountingTimestamps(ctx,
|
|
dbx.AccountingTimestamps_Name(timestampType),
|
|
dbx.AccountingTimestamps_Value(lastTally),
|
|
)
|
|
}
|
|
lastTally = lt.Value
|
|
return err
|
|
})
|
|
return lastTally, err
|
|
}
|
|
|
|
// QueryPaymentInfo queries Overlay, Accounting Rollup on nodeID.
|
|
func (db *StoragenodeAccounting) QueryPaymentInfo(ctx context.Context, start time.Time, end time.Time) (_ []*accounting.CSVRow, err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
sqlStmt := `SELECT n.id, n.created_at, r.at_rest_total, r.get_repair_total,
|
|
r.put_repair_total, r.get_audit_total, r.put_total, r.get_total, n.wallet, n.disqualified
|
|
FROM (
|
|
SELECT node_id, SUM(at_rest_total::decimal) AS at_rest_total, SUM(get_repair_total) AS get_repair_total,
|
|
SUM(put_repair_total) AS put_repair_total, SUM(get_audit_total) AS get_audit_total,
|
|
SUM(put_total) AS put_total, SUM(get_total) AS get_total
|
|
FROM accounting_rollups
|
|
WHERE start_time >= ? AND start_time < ?
|
|
GROUP BY node_id
|
|
) r
|
|
LEFT JOIN nodes n ON n.id = r.node_id
|
|
ORDER BY n.id`
|
|
|
|
rows, err := db.db.DB.QueryContext(ctx, db.db.Rebind(sqlStmt), start.UTC(), end.UTC())
|
|
if err != nil {
|
|
return nil, Error.Wrap(err)
|
|
}
|
|
defer func() { err = errs.Combine(err, rows.Close()) }()
|
|
|
|
csv := []*accounting.CSVRow{}
|
|
for rows.Next() {
|
|
var nodeID []byte
|
|
r := &accounting.CSVRow{}
|
|
var wallet sql.NullString
|
|
var disqualified *time.Time
|
|
err := rows.Scan(&nodeID, &r.NodeCreationDate, &r.AtRestTotal, &r.GetRepairTotal,
|
|
&r.PutRepairTotal, &r.GetAuditTotal, &r.PutTotal, &r.GetTotal, &wallet, &disqualified)
|
|
if err != nil {
|
|
return csv, Error.Wrap(err)
|
|
}
|
|
if wallet.Valid {
|
|
r.Wallet = wallet.String
|
|
}
|
|
id, err := storj.NodeIDFromBytes(nodeID)
|
|
if err != nil {
|
|
return csv, Error.Wrap(err)
|
|
}
|
|
r.NodeID = id
|
|
r.Disqualified = disqualified
|
|
csv = append(csv, r)
|
|
}
|
|
return csv, rows.Err()
|
|
}
|
|
|
|
// QueryStorageNodePeriodUsage returns usage invoices for nodes for a compensation period.
|
|
func (db *StoragenodeAccounting) QueryStorageNodePeriodUsage(ctx context.Context, period compensation.Period) (_ []accounting.StorageNodePeriodUsage, err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
stmt := db.db.Rebind(`
|
|
SELECT
|
|
node_id,
|
|
SUM(at_rest_total::decimal) AS at_rest_total,
|
|
SUM(get_total) AS get_total,
|
|
SUM(put_total) AS put_total,
|
|
SUM(get_repair_total) AS get_repair_total,
|
|
SUM(put_repair_total) AS put_repair_total,
|
|
SUM(get_audit_total) AS get_audit_total
|
|
FROM
|
|
accounting_rollups
|
|
WHERE
|
|
start_time >= ? AND start_time < ?
|
|
GROUP BY
|
|
node_id
|
|
ORDER BY
|
|
node_id ASC
|
|
`)
|
|
|
|
rows, err := db.db.DB.QueryContext(ctx, stmt, period.StartDate(), period.EndDateExclusive())
|
|
if err != nil {
|
|
return nil, Error.Wrap(err)
|
|
}
|
|
defer func() { err = errs.Combine(err, rows.Close()) }()
|
|
|
|
usages := []accounting.StorageNodePeriodUsage{}
|
|
for rows.Next() {
|
|
var nodeID []byte
|
|
usage := accounting.StorageNodePeriodUsage{}
|
|
if err := rows.Scan(
|
|
&nodeID,
|
|
&usage.AtRestTotal,
|
|
&usage.GetTotal,
|
|
&usage.PutTotal,
|
|
&usage.GetRepairTotal,
|
|
&usage.PutRepairTotal,
|
|
&usage.GetAuditTotal,
|
|
); err != nil {
|
|
return nil, Error.Wrap(err)
|
|
}
|
|
|
|
usage.NodeID, err = storj.NodeIDFromBytes(nodeID)
|
|
if err != nil {
|
|
return nil, Error.Wrap(err)
|
|
}
|
|
usages = append(usages, usage)
|
|
}
|
|
return usages, rows.Err()
|
|
}
|
|
|
|
// QueryStorageNodeUsage returns slice of StorageNodeUsage for given period.
|
|
func (db *StoragenodeAccounting) QueryStorageNodeUsage(ctx context.Context, nodeID storj.NodeID, start time.Time, end time.Time) (_ []accounting.StorageNodeUsage, err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
lastRollup, err := db.db.Find_AccountingTimestamps_Value_By_Name(ctx, dbx.AccountingTimestamps_Name(accounting.LastRollup))
|
|
if err != nil {
|
|
return nil, Error.Wrap(err)
|
|
}
|
|
if lastRollup == nil {
|
|
return nil, nil
|
|
}
|
|
|
|
start, end = start.UTC(), end.UTC()
|
|
|
|
query := `
|
|
SELECT SUM(at_rest_total), (start_time at time zone 'UTC')::date as start_time
|
|
FROM accounting_rollups
|
|
WHERE node_id = $1
|
|
AND $2 <= start_time AND start_time <= $3
|
|
GROUP BY (start_time at time zone 'UTC')::date
|
|
UNION
|
|
SELECT SUM(data_total) AS at_rest_total, (interval_end_time at time zone 'UTC')::date AS start_time
|
|
FROM storagenode_storage_tallies
|
|
WHERE node_id = $1
|
|
AND NOT EXISTS (
|
|
SELECT 1 FROM accounting_rollups
|
|
WHERE node_id = $1
|
|
AND $2 <= start_time AND start_time <= $3
|
|
AND (start_time at time zone 'UTC')::date = (interval_end_time at time zone 'UTC')::date
|
|
)
|
|
AND (SELECT value FROM accounting_timestamps WHERE name = $4) < interval_end_time AND interval_end_time <= $3
|
|
GROUP BY (interval_end_time at time zone 'UTC')::date
|
|
ORDER BY start_time;
|
|
`
|
|
|
|
rows, err := db.db.QueryContext(ctx, db.db.Rebind(query),
|
|
nodeID, start, end, accounting.LastRollup,
|
|
)
|
|
if err != nil {
|
|
return nil, Error.Wrap(err)
|
|
}
|
|
defer func() { err = errs.Combine(err, rows.Close()) }()
|
|
|
|
var nodeStorageUsages []accounting.StorageNodeUsage
|
|
for rows.Next() {
|
|
var atRestTotal float64
|
|
var startTime dbutil.NullTime
|
|
|
|
err = rows.Scan(&atRestTotal, &startTime)
|
|
if err != nil {
|
|
return nil, Error.Wrap(err)
|
|
}
|
|
|
|
nodeStorageUsages = append(nodeStorageUsages, accounting.StorageNodeUsage{
|
|
NodeID: nodeID,
|
|
StorageUsed: atRestTotal,
|
|
Timestamp: startTime.Time,
|
|
})
|
|
}
|
|
|
|
return nodeStorageUsages, rows.Err()
|
|
}
|
|
|
|
// DeleteTalliesBefore deletes all raw tallies prior to some time.
|
|
func (db *StoragenodeAccounting) DeleteTalliesBefore(ctx context.Context, latestRollup time.Time) (err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
deleteRawSQL := `DELETE FROM storagenode_storage_tallies WHERE interval_end_time < ?`
|
|
_, err = db.db.DB.ExecContext(ctx, db.db.Rebind(deleteRawSQL), latestRollup)
|
|
return err
|
|
}
|