paul cannon ae5947327b satellite/accounting: Use metabase.AliasPiece with tally observer
We want to eliminate usages of LoopSegmentEntry.Pieces, because it is
costing a lot of cpu time to look up node IDs with every piece of every
segment we read.

In this change, we are eliminating use of LoopSegmentEntry.Pieces in the
node tally observer (both the ranged loop and segments loop variants).
It is not necessary to have a fully resolved nodeID until it is time to
store totals in the database. We can use NodeAliases as the map key
instead, and resolve NodeIDs just before storing totals.

Change-Id: Iec12aa393072436d7c22cc5a4ae1b63966cbcc18
2023-03-29 12:24:05 +00:00

698 lines
21 KiB

// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package satellitedb
import (
// StoragenodeAccounting implements the accounting/db StoragenodeAccounting interface.
type StoragenodeAccounting struct {
db *satelliteDB
// SaveTallies records raw tallies of at rest data to the database.
func (db *StoragenodeAccounting) SaveTallies(ctx context.Context, latestTally time.Time, nodeIDs []storj.NodeID, totals []float64) (err error) {
defer mon.Task()(&ctx)(&err)
err = db.db.WithTx(ctx, func(ctx context.Context, tx *dbx.Tx) error {
_, err = tx.Tx.ExecContext(ctx, db.db.Rebind(`
INSERT INTO storagenode_storage_tallies (
node_id, data_total)
unnest($2::bytea[]), unnest($3::float8[])`),
pgutil.NodeIDArray(nodeIDs), pgutil.Float8Array(totals))
if err != nil {
return err
return tx.ReplaceNoReturn_AccountingTimestamps(ctx,
return Error.Wrap(err)
// GetTallies retrieves all raw tallies.
func (db *StoragenodeAccounting) GetTallies(ctx context.Context) (_ []*accounting.StoragenodeStorageTally, err error) {
defer mon.Task()(&ctx)(&err)
raws, err := db.db.All_StoragenodeStorageTally(ctx)
out := make([]*accounting.StoragenodeStorageTally, len(raws))
for i, r := range raws {
nodeID, err := storj.NodeIDFromBytes(r.NodeId)
if err != nil {
return nil, Error.Wrap(err)
out[i] = &accounting.StoragenodeStorageTally{
NodeID: nodeID,
IntervalEndTime: r.IntervalEndTime,
DataTotal: r.DataTotal,
return out, Error.Wrap(err)
// GetTalliesSince retrieves all raw tallies since latestRollup.
func (db *StoragenodeAccounting) GetTalliesSince(ctx context.Context, latestRollup time.Time) (_ []*accounting.StoragenodeStorageTally, err error) {
defer mon.Task()(&ctx)(&err)
raws, err := db.db.All_StoragenodeStorageTally_By_IntervalEndTime_GreaterOrEqual(ctx, dbx.StoragenodeStorageTally_IntervalEndTime(latestRollup))
out := make([]*accounting.StoragenodeStorageTally, len(raws))
for i, r := range raws {
nodeID, err := storj.NodeIDFromBytes(r.NodeId)
if err != nil {
return nil, Error.Wrap(err)
out[i] = &accounting.StoragenodeStorageTally{
NodeID: nodeID,
IntervalEndTime: r.IntervalEndTime,
DataTotal: r.DataTotal,
return out, Error.Wrap(err)
func (db *StoragenodeAccounting) getNodeIdsSince(ctx context.Context, since time.Time) (nodeids [][]byte, err error) {
defer mon.Task()(&ctx)(&err)
rows, err := db.db.QueryContext(ctx, db.db.Rebind(`SELECT DISTINCT storagenode_id FROM storagenode_bandwidth_rollups WHERE interval_start >= $1`), since)
if err != nil {
return nil, Error.Wrap(err)
defer func() {
err = errs.Combine(err, Error.Wrap(rows.Close()))
for rows.Next() {
var nodeid []byte
err := rows.Scan(&nodeid)
if err != nil {
return nil, Error.Wrap(err)
nodeids = append(nodeids, nodeid)
err = rows.Err()
if err != nil {
return nil, Error.Wrap(rows.Err())
return nodeids, nil
func (db *StoragenodeAccounting) getBandwidthByNodeSince(ctx context.Context, latestRollup time.Time, nodeid []byte,
cb func(context.Context, *accounting.StoragenodeBandwidthRollup) error) (err error) {
defer mon.Task()(&ctx)(&err)
pageLimit := db.db.opts.ReadRollupBatchSize
if pageLimit <= 0 {
pageLimit = 10000
var cursor *dbx.Paged_StoragenodeBandwidthRollup_By_StoragenodeId_And_IntervalStart_GreaterOrEqual_Continuation
for {
rollups, next, err := db.db.Paged_StoragenodeBandwidthRollup_By_StoragenodeId_And_IntervalStart_GreaterOrEqual(ctx,
dbx.StoragenodeBandwidthRollup_StoragenodeId(nodeid), dbx.StoragenodeBandwidthRollup_IntervalStart(latestRollup),
pageLimit, cursor)
if err != nil {
return Error.Wrap(err)
cursor = next
for _, r := range rollups {
nodeID, err := storj.NodeIDFromBytes(r.StoragenodeId)
if err != nil {
return Error.Wrap(err)
err = cb(ctx, &accounting.StoragenodeBandwidthRollup{
NodeID: nodeID,
IntervalStart: r.IntervalStart,
Action: r.Action,
Settled: r.Settled,
if err != nil {
return err
if cursor == nil {
return nil
func (db *StoragenodeAccounting) getBandwidthPhase2ByNodeSince(ctx context.Context, latestRollup time.Time, nodeid []byte,
cb func(context.Context, *accounting.StoragenodeBandwidthRollup) error) (err error) {
defer mon.Task()(&ctx)(&err)
pageLimit := db.db.opts.ReadRollupBatchSize
if pageLimit <= 0 {
pageLimit = 10000
var cursor *dbx.Paged_StoragenodeBandwidthRollupPhase2_By_StoragenodeId_And_IntervalStart_GreaterOrEqual_Continuation
for {
rollups, next, err := db.db.Paged_StoragenodeBandwidthRollupPhase2_By_StoragenodeId_And_IntervalStart_GreaterOrEqual(ctx,
dbx.StoragenodeBandwidthRollupPhase2_StoragenodeId(nodeid), dbx.StoragenodeBandwidthRollupPhase2_IntervalStart(latestRollup),
pageLimit, cursor)
if err != nil {
return Error.Wrap(err)
cursor = next
for _, r := range rollups {
nodeID, err := storj.NodeIDFromBytes(r.StoragenodeId)
if err != nil {
return Error.Wrap(err)
err = cb(ctx, &accounting.StoragenodeBandwidthRollup{
NodeID: nodeID,
IntervalStart: r.IntervalStart,
Action: r.Action,
Settled: r.Settled,
if err != nil {
return err
if cursor == nil {
return nil
// GetBandwidthSince retrieves all storagenode_bandwidth_rollup entires since latestRollup.
func (db *StoragenodeAccounting) GetBandwidthSince(ctx context.Context, latestRollup time.Time,
cb func(context.Context, *accounting.StoragenodeBandwidthRollup) error) (err error) {
defer mon.Task()(&ctx)(&err)
// This table's key structure is storagenode_id, interval_start, so we're going to try and make
// things easier on the database by making individual requests node by node. This is also
// going to allow us to avoid 16 minute queries.
var nodeids [][]byte
for {
nodeids, err = db.getNodeIdsSince(ctx, latestRollup)
if err != nil {
if cockroachutil.NeedsRetry(err) {
return err
for _, nodeid := range nodeids {
err = db.getBandwidthByNodeSince(ctx, latestRollup, nodeid, cb)
if err != nil {
return err
err = db.getBandwidthPhase2ByNodeSince(ctx, latestRollup, nodeid, cb)
if err != nil {
return err
return nil
// SaveRollup records raw tallies of at rest data to the database.
func (db *StoragenodeAccounting) SaveRollup(ctx context.Context, latestRollup time.Time, stats accounting.RollupStats) (err error) {
defer mon.Task()(&ctx)(&err)
if len(stats) == 0 {
return Error.New("In SaveRollup with empty nodeData")
batchSize := db.db.opts.SaveRollupBatchSize
if batchSize <= 0 {
batchSize = 1000
var rollups []*accounting.Rollup
for _, arsByDate := range stats {
for _, ar := range arsByDate {
rollups = append(rollups, ar)
insertBatch := func(ctx context.Context, db *dbx.DB, batch []*accounting.Rollup) (err error) {
defer mon.Task()(&ctx)(&err)
n := len(batch)
nodeID := make([]storj.NodeID, n)
startTime := make([]time.Time, n)
putTotal := make([]int64, n)
getTotal := make([]int64, n)
getAuditTotal := make([]int64, n)
getRepairTotal := make([]int64, n)
putRepairTotal := make([]int64, n)
atRestTotal := make([]float64, n)
intervalEndTime := make([]time.Time, n)
for i, ar := range batch {
nodeID[i] = ar.NodeID
startTime[i] = ar.StartTime
putTotal[i] = ar.PutTotal
getTotal[i] = ar.GetTotal
getAuditTotal[i] = ar.GetAuditTotal
getRepairTotal[i] = ar.GetRepairTotal
putRepairTotal[i] = ar.PutRepairTotal
atRestTotal[i] = ar.AtRestTotal
intervalEndTime[i] = ar.IntervalEndTime
_, err = db.ExecContext(ctx, `
INSERT INTO accounting_rollups (
node_id, start_time,
put_total, get_total,
get_audit_total, get_repair_total, put_repair_total,
SELECT * FROM unnest(
$1::bytea[], $2::timestamptz[],
$3::int8[], $4::int8[],
$5::int8[], $6::int8[], $7::int8[],
ON CONFLICT ( node_id, start_time )
put_total = EXCLUDED.put_total,
get_total = EXCLUDED.get_total,
get_audit_total = EXCLUDED.get_audit_total,
get_repair_total = EXCLUDED.get_repair_total,
put_repair_total = EXCLUDED.put_repair_total,
at_rest_total = EXCLUDED.at_rest_total,
interval_end_time = EXCLUDED.interval_end_time
`, pgutil.NodeIDArray(nodeID), pgutil.TimestampTZArray(startTime),
pgutil.Int8Array(putTotal), pgutil.Int8Array(getTotal),
pgutil.Int8Array(getAuditTotal), pgutil.Int8Array(getRepairTotal), pgutil.Int8Array(putRepairTotal),
return Error.Wrap(err)
// Note: we do not need here a transaction because we will "update" the
// columns when we do not update accounting.LastRollup. We will end up
// with partial data in the database, however in the next runs, we will
// try to fix them.
for len(rollups) > 0 {
batch := rollups
if len(batch) > batchSize {
batch = batch[:batchSize]
rollups = rollups[len(batch):]
if err := insertBatch(ctx, db.db.DB, batch); err != nil {
return Error.Wrap(err)
err = db.db.UpdateNoReturn_AccountingTimestamps_By_Name(ctx,
Value: dbx.AccountingTimestamps_Value(latestRollup),
return Error.Wrap(err)
// LastTimestamp records the greatest last tallied time.
func (db *StoragenodeAccounting) LastTimestamp(ctx context.Context, timestampType string) (_ time.Time, err error) {
defer mon.Task()(&ctx)(&err)
lastTally := time.Time{}
err = db.db.WithTx(ctx, func(ctx context.Context, tx *dbx.Tx) error {
lt, err := tx.Find_AccountingTimestamps_Value_By_Name(ctx, dbx.AccountingTimestamps_Name(timestampType))
if lt == nil {
return tx.ReplaceNoReturn_AccountingTimestamps(ctx,
lastTally = lt.Value
return err
return lastTally, err
// QueryPaymentInfo queries Overlay, Accounting Rollup on nodeID.
func (db *StoragenodeAccounting) QueryPaymentInfo(ctx context.Context, start time.Time, end time.Time) (_ []*accounting.CSVRow, err error) {
defer mon.Task()(&ctx)(&err)
sqlStmt := `SELECT, n.created_at, r.at_rest_total, r.get_repair_total,
r.put_repair_total, r.get_audit_total, r.put_total, r.get_total, n.wallet, n.disqualified
SELECT node_id, SUM(at_rest_total::decimal) AS at_rest_total, SUM(get_repair_total) AS get_repair_total,
SUM(put_repair_total) AS put_repair_total, SUM(get_audit_total) AS get_audit_total,
SUM(put_total) AS put_total, SUM(get_total) AS get_total
FROM accounting_rollups
WHERE start_time >= ? AND start_time < ?
GROUP BY node_id
) r
LEFT JOIN nodes n ON = r.node_id
rows, err := db.db.DB.QueryContext(ctx, db.db.Rebind(sqlStmt), start.UTC(), end.UTC())
if err != nil {
return nil, Error.Wrap(err)
defer func() { err = errs.Combine(err, rows.Close()) }()
csv := []*accounting.CSVRow{}
for rows.Next() {
var nodeID []byte
r := &accounting.CSVRow{}
var wallet sql.NullString
var disqualified *time.Time
err := rows.Scan(&nodeID, &r.NodeCreationDate, &r.AtRestTotal, &r.GetRepairTotal,
&r.PutRepairTotal, &r.GetAuditTotal, &r.PutTotal, &r.GetTotal, &wallet, &disqualified)
if err != nil {
return csv, Error.Wrap(err)
if wallet.Valid {
r.Wallet = wallet.String
id, err := storj.NodeIDFromBytes(nodeID)
if err != nil {
return csv, Error.Wrap(err)
r.NodeID = id
r.Disqualified = disqualified
csv = append(csv, r)
return csv, rows.Err()
// QueryStorageNodePeriodUsage returns usage invoices for nodes for a compensation period.
func (db *StoragenodeAccounting) QueryStorageNodePeriodUsage(ctx context.Context, period compensation.Period) (_ []accounting.StorageNodePeriodUsage, err error) {
defer mon.Task()(&ctx)(&err)
stmt := db.db.Rebind(`
SUM(at_rest_total::decimal) AS at_rest_total,
SUM(get_total) AS get_total,
SUM(put_total) AS put_total,
SUM(get_repair_total) AS get_repair_total,
SUM(put_repair_total) AS put_repair_total,
SUM(get_audit_total) AS get_audit_total
start_time >= ? AND start_time < ?
node_id ASC
rows, err := db.db.DB.QueryContext(ctx, stmt, period.StartDate(), period.EndDateExclusive())
if err != nil {
return nil, Error.Wrap(err)
defer func() { err = errs.Combine(err, rows.Close()) }()
usages := []accounting.StorageNodePeriodUsage{}
for rows.Next() {
var nodeID []byte
usage := accounting.StorageNodePeriodUsage{}
if err := rows.Scan(
); err != nil {
return nil, Error.Wrap(err)
usage.NodeID, err = storj.NodeIDFromBytes(nodeID)
if err != nil {
return nil, Error.Wrap(err)
usages = append(usages, usage)
return usages, rows.Err()
// QueryStorageNodeUsage returns slice of StorageNodeUsage for given period.
func (db *StoragenodeAccounting) QueryStorageNodeUsage(ctx context.Context, nodeID storj.NodeID, start time.Time, end time.Time) (_ []accounting.StorageNodeUsage, err error) {
defer mon.Task()(&ctx)(&err)
lastRollup, err := db.db.Find_AccountingTimestamps_Value_By_Name(ctx, dbx.AccountingTimestamps_Name(accounting.LastRollup))
if err != nil {
return nil, Error.Wrap(err)
if lastRollup == nil {
return nil, nil
start, end = start.UTC(), end.UTC()
// TODO: remove COALESCE when we're sure the interval_end_time in the
// accounting_rollups table are fully populated or back-filled with
// the start_time, and the interval_end_time is non-nullable
query := `
SELECT SUM(r1.at_rest_total) as at_rest_total,
(r1.start_time at time zone 'UTC')::date as start_time,
COALESCE(MAX(r1.interval_end_time), MAX(r1.start_time)) AS interval_end_time
FROM accounting_rollups r1
WHERE r1.node_id = $1
AND $2 <= r1.start_time AND r1.start_time <= $3
GROUP BY (r1.start_time at time zone 'UTC')::date
SELECT SUM(t.data_total) AS at_rest_total, (t.interval_end_time at time zone 'UTC')::date AS start_time,
MAX(t.interval_end_time) AS interval_end_time
FROM storagenode_storage_tallies t
WHERE t.node_id = $1
SELECT 1 FROM accounting_rollups r2
WHERE r2.node_id = $1
AND $2 <= r2.start_time AND r2.start_time <= $3
AND (r2.start_time at time zone 'UTC')::date = (t.interval_end_time at time zone 'UTC')::date
AND (SELECT value FROM accounting_timestamps WHERE name = $4) < t.interval_end_time AND t.interval_end_time <= $3
GROUP BY (t.interval_end_time at time zone 'UTC')::date
ORDER BY start_time;
rows, err := db.db.QueryContext(ctx, db.db.Rebind(query),
nodeID, start, end, accounting.LastRollup,
if err != nil {
return nil, Error.Wrap(err)
defer func() { err = errs.Combine(err, rows.Close()) }()
var nodeStorageUsages []accounting.StorageNodeUsage
for rows.Next() {
var atRestTotal float64
var startTime, intervalEndTime dbutil.NullTime
err = rows.Scan(&atRestTotal, &startTime, &intervalEndTime)
if err != nil {
return nil, Error.Wrap(err)
nodeStorageUsages = append(nodeStorageUsages, accounting.StorageNodeUsage{
NodeID: nodeID,
StorageUsed: atRestTotal,
Timestamp: startTime.Time,
IntervalEndTime: intervalEndTime.Time,
return nodeStorageUsages, rows.Err()
// DeleteTalliesBefore deletes all raw tallies prior to some time.
func (db *StoragenodeAccounting) DeleteTalliesBefore(ctx context.Context, latestRollup time.Time, batchSize int) (err error) {
defer mon.Task()(&ctx)(&err)
if batchSize <= 0 {
batchSize = 10000
var query string
switch db.db.impl {
case dbutil.Cockroach:
query = `
DELETE FROM storagenode_storage_tallies
WHERE interval_end_time < ?
case dbutil.Postgres:
query = `
DELETE FROM storagenode_storage_tallies
WHERE ctid IN (
FROM storagenode_storage_tallies
WHERE interval_end_time < ?
ORDER BY interval_end_time
return Error.New("unsupported database: %v", db.db.impl)
query = db.db.Rebind(query)
for {
res, err := db.db.DB.ExecContext(ctx, query, latestRollup, batchSize)
if err != nil {
if errs.Is(err, sql.ErrNoRows) {
return nil
return Error.Wrap(err)
affected, err := res.RowsAffected()
if err != nil {
return Error.Wrap(err)
if affected == 0 {
return nil
// ArchiveRollupsBefore archives rollups older than a given time.
func (db *StoragenodeAccounting) ArchiveRollupsBefore(ctx context.Context, before time.Time, batchSize int) (nodeRollupsDeleted int, err error) {
defer mon.Task()(&ctx)(&err)
if batchSize <= 0 {
return 0, nil
switch db.db.impl {
case dbutil.Cockroach:
for {
row := db.db.QueryRow(ctx, `
WITH rollups_to_move AS (
DELETE FROM storagenode_bandwidth_rollups
WHERE interval_start <= $1
), moved_rollups AS (
INSERT INTO storagenode_bandwidth_rollup_archives SELECT * FROM rollups_to_move RETURNING *
SELECT count(*) FROM moved_rollups
`, before, batchSize)
var rowCount int
err = row.Scan(&rowCount)
if err != nil {
return nodeRollupsDeleted, err
nodeRollupsDeleted += rowCount
if rowCount < batchSize {
return nodeRollupsDeleted, nil
case dbutil.Postgres:
storagenodeStatement := `
WITH rollups_to_move AS (
DELETE FROM storagenode_bandwidth_rollups
WHERE interval_start <= $1
), moved_rollups AS (
INSERT INTO storagenode_bandwidth_rollup_archives SELECT * FROM rollups_to_move RETURNING *
SELECT count(*) FROM moved_rollups
row := db.db.DB.QueryRow(ctx, storagenodeStatement, before)
err = row.Scan(&nodeRollupsDeleted)
return nodeRollupsDeleted, err
return 0, Error.New("unsupported database: %v", db.db.impl)
// GetRollupsSince retrieves all archived bandwidth rollup records since a given time.
func (db *StoragenodeAccounting) GetRollupsSince(ctx context.Context, since time.Time) (bwRollups []accounting.StoragenodeBandwidthRollup, err error) {
defer mon.Task()(&ctx)(&err)
pageLimit := db.db.opts.ReadRollupBatchSize
if pageLimit <= 0 {
pageLimit = 10000
var cursor *dbx.Paged_StoragenodeBandwidthRollup_By_IntervalStart_GreaterOrEqual_Continuation
for {
dbxRollups, next, err := db.db.Paged_StoragenodeBandwidthRollup_By_IntervalStart_GreaterOrEqual(ctx,
pageLimit, cursor)
if err != nil {
return nil, Error.Wrap(err)
cursor = next
for _, dbxRollup := range dbxRollups {
id, err := storj.NodeIDFromBytes(dbxRollup.StoragenodeId)
if err != nil {
return nil, Error.Wrap(err)
bwRollups = append(bwRollups, accounting.StoragenodeBandwidthRollup{
NodeID: id,
IntervalStart: dbxRollup.IntervalStart,
Action: dbxRollup.Action,
Settled: dbxRollup.Settled,
if cursor == nil {
return bwRollups, nil
// GetArchivedRollupsSince retrieves all archived bandwidth rollup records since a given time.
func (db *StoragenodeAccounting) GetArchivedRollupsSince(ctx context.Context, since time.Time) (bwRollups []accounting.StoragenodeBandwidthRollup, err error) {
defer mon.Task()(&ctx)(&err)
pageLimit := db.db.opts.ReadRollupBatchSize
if pageLimit <= 0 {
pageLimit = 10000
var cursor *dbx.Paged_StoragenodeBandwidthRollupArchive_By_IntervalStart_GreaterOrEqual_Continuation
for {
dbxRollups, next, err := db.db.Paged_StoragenodeBandwidthRollupArchive_By_IntervalStart_GreaterOrEqual(ctx,
pageLimit, cursor)
if err != nil {
return nil, Error.Wrap(err)
cursor = next
for _, dbxRollup := range dbxRollups {
id, err := storj.NodeIDFromBytes(dbxRollup.StoragenodeId)
if err != nil {
return nil, Error.Wrap(err)
bwRollups = append(bwRollups, accounting.StoragenodeBandwidthRollup{
NodeID: id,
IntervalStart: dbxRollup.IntervalStart,
Action: dbxRollup.Action,
Settled: dbxRollup.Settled,
if cursor == nil {
return bwRollups, nil