storj/satellite/satellitedb/storagenodeaccounting.go
Jessica Grebenschikov aeab599d21 satellitedb: removed unused id on storagenode_storage_tallies table, add index on node_id
The goal of this change is to improve the storagenode_storage_tallies table by removing the unneeded id column that is not being used but only taking up space, and also to add an index on a different column that needs it. Removing and adding a column seems simple, but ended up being more complicated because of some cockroachdb limitations.

The cockroachdb limitation when trying to remove a column from a table and create a new primary key are:
1. only allows primary key creation at table creation time (docs: https://www.cockroachlabs.com/docs/stable/primary-key.html)
2. table drop or rename is performed async and cannot be done in a transaction (issue: https://github.com/cockroachdb/cockroach/issues/12123, https://github.com/cockroachdb/cockroach/issues/22868)

To address these differences between cockroachdb  and Postgres, this PR performs different migrations for the two database. The Postgres migration is straight forward and what you would expect, but the cockroach migration has two main changes:
1. To change a primary key, use the recommended process from the cockroachdb docs to create a new table with the new primary key you want and then migrate the data.
2. In order to do 1, we needed to do the new table renaming in a separate transaction from the data migration.

Ref: SM-65

Change-Id: Idc9aee3ab57aa4d5570e3d2980afea853cd966bf
2020-03-20 14:39:44 -07:00

288 lines
9.5 KiB
Go

// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package satellitedb
import (
"context"
"database/sql"
"time"
"github.com/lib/pq"
"github.com/zeebo/errs"
"storj.io/common/storj"
"storj.io/storj/private/dbutil"
"storj.io/storj/satellite/accounting"
"storj.io/storj/satellite/satellitedb/dbx"
)
// StoragenodeAccounting implements the accounting/db StoragenodeAccounting interface
type StoragenodeAccounting struct {
db *satelliteDB
}
// SaveTallies records raw tallies of at rest data to the database
func (db *StoragenodeAccounting) SaveTallies(ctx context.Context, latestTally time.Time, nodeData map[storj.NodeID]float64) (err error) {
defer mon.Task()(&ctx)(&err)
if len(nodeData) == 0 {
return Error.New("In SaveTallies with empty nodeData")
}
var nodeIDs []storj.NodeID
var totals []float64
for id, total := range nodeData {
nodeIDs = append(nodeIDs, id)
totals = append(totals, total)
}
err = db.db.WithTx(ctx, func(ctx context.Context, tx *dbx.Tx) error {
_, err = tx.Tx.ExecContext(ctx, db.db.Rebind(`
INSERT INTO storagenode_storage_tallies (
interval_end_time,
node_id, data_total)
SELECT
$1,
unnest($2::bytea[]), unnest($3::float8[])`),
latestTally,
postgresNodeIDList(nodeIDs), pq.Array(totals))
if err != nil {
return err
}
return tx.UpdateNoReturn_AccountingTimestamps_By_Name(ctx,
dbx.AccountingTimestamps_Name(accounting.LastAtRestTally),
dbx.AccountingTimestamps_Update_Fields{
Value: dbx.AccountingTimestamps_Value(latestTally),
},
)
})
return Error.Wrap(err)
}
// GetTallies retrieves all raw tallies
func (db *StoragenodeAccounting) GetTallies(ctx context.Context) (_ []*accounting.StoragenodeStorageTally, err error) {
defer mon.Task()(&ctx)(&err)
raws, err := db.db.All_StoragenodeStorageTally(ctx)
out := make([]*accounting.StoragenodeStorageTally, len(raws))
for i, r := range raws {
nodeID, err := storj.NodeIDFromBytes(r.NodeId)
if err != nil {
return nil, Error.Wrap(err)
}
out[i] = &accounting.StoragenodeStorageTally{
NodeID: nodeID,
IntervalEndTime: r.IntervalEndTime,
DataTotal: r.DataTotal,
}
}
return out, Error.Wrap(err)
}
// GetTalliesSince retrieves all raw tallies since latestRollup
func (db *StoragenodeAccounting) GetTalliesSince(ctx context.Context, latestRollup time.Time) (_ []*accounting.StoragenodeStorageTally, err error) {
defer mon.Task()(&ctx)(&err)
raws, err := db.db.All_StoragenodeStorageTally_By_IntervalEndTime_GreaterOrEqual(ctx, dbx.StoragenodeStorageTally_IntervalEndTime(latestRollup))
out := make([]*accounting.StoragenodeStorageTally, len(raws))
for i, r := range raws {
nodeID, err := storj.NodeIDFromBytes(r.NodeId)
if err != nil {
return nil, Error.Wrap(err)
}
out[i] = &accounting.StoragenodeStorageTally{
NodeID: nodeID,
IntervalEndTime: r.IntervalEndTime,
DataTotal: r.DataTotal,
}
}
return out, Error.Wrap(err)
}
// GetBandwidthSince retrieves all storagenode_bandwidth_rollup entires since latestRollup
func (db *StoragenodeAccounting) GetBandwidthSince(ctx context.Context, latestRollup time.Time) (_ []*accounting.StoragenodeBandwidthRollup, err error) {
defer mon.Task()(&ctx)(&err)
rollups, err := db.db.All_StoragenodeBandwidthRollup_By_IntervalStart_GreaterOrEqual(ctx, dbx.StoragenodeBandwidthRollup_IntervalStart(latestRollup))
out := make([]*accounting.StoragenodeBandwidthRollup, len(rollups))
for i, r := range rollups {
nodeID, err := storj.NodeIDFromBytes(r.StoragenodeId)
if err != nil {
return nil, Error.Wrap(err)
}
out[i] = &accounting.StoragenodeBandwidthRollup{
NodeID: nodeID,
IntervalStart: r.IntervalStart,
Action: r.Action,
Settled: r.Settled,
}
}
return out, Error.Wrap(err)
}
// SaveRollup records raw tallies of at rest data to the database
func (db *StoragenodeAccounting) SaveRollup(ctx context.Context, latestRollup time.Time, stats accounting.RollupStats) (err error) {
defer mon.Task()(&ctx)(&err)
if len(stats) == 0 {
return Error.New("In SaveRollup with empty nodeData")
}
err = db.db.WithTx(ctx, func(ctx context.Context, tx *dbx.Tx) error {
for _, arsByDate := range stats {
for _, ar := range arsByDate {
nID := dbx.AccountingRollup_NodeId(ar.NodeID.Bytes())
start := dbx.AccountingRollup_StartTime(ar.StartTime)
put := dbx.AccountingRollup_PutTotal(ar.PutTotal)
get := dbx.AccountingRollup_GetTotal(ar.GetTotal)
audit := dbx.AccountingRollup_GetAuditTotal(ar.GetAuditTotal)
getRepair := dbx.AccountingRollup_GetRepairTotal(ar.GetRepairTotal)
putRepair := dbx.AccountingRollup_PutRepairTotal(ar.PutRepairTotal)
atRest := dbx.AccountingRollup_AtRestTotal(ar.AtRestTotal)
err := tx.CreateNoReturn_AccountingRollup(ctx, nID, start, put, get, audit, getRepair, putRepair, atRest)
if err != nil {
return err
}
}
}
return tx.UpdateNoReturn_AccountingTimestamps_By_Name(ctx,
dbx.AccountingTimestamps_Name(accounting.LastRollup),
dbx.AccountingTimestamps_Update_Fields{
Value: dbx.AccountingTimestamps_Value(latestRollup),
},
)
})
return Error.Wrap(err)
}
// LastTimestamp records the greatest last tallied time
func (db *StoragenodeAccounting) LastTimestamp(ctx context.Context, timestampType string) (_ time.Time, err error) {
defer mon.Task()(&ctx)(&err)
lastTally := time.Time{}
err = db.db.WithTx(ctx, func(ctx context.Context, tx *dbx.Tx) error {
lt, err := tx.Find_AccountingTimestamps_Value_By_Name(ctx, dbx.AccountingTimestamps_Name(timestampType))
if lt == nil {
return tx.CreateNoReturn_AccountingTimestamps(ctx,
dbx.AccountingTimestamps_Name(timestampType),
dbx.AccountingTimestamps_Value(lastTally),
)
}
lastTally = lt.Value
return err
})
return lastTally, err
}
// QueryPaymentInfo queries Overlay, Accounting Rollup on nodeID
func (db *StoragenodeAccounting) QueryPaymentInfo(ctx context.Context, start time.Time, end time.Time) (_ []*accounting.CSVRow, err error) {
defer mon.Task()(&ctx)(&err)
var sqlStmt = `SELECT n.id, n.created_at, r.at_rest_total, r.get_repair_total,
r.put_repair_total, r.get_audit_total, r.put_total, r.get_total, n.wallet, n.disqualified
FROM (
SELECT node_id, SUM(at_rest_total) AS at_rest_total, SUM(get_repair_total) AS get_repair_total,
SUM(put_repair_total) AS put_repair_total, SUM(get_audit_total) AS get_audit_total,
SUM(put_total) AS put_total, SUM(get_total) AS get_total
FROM accounting_rollups
WHERE start_time >= ? AND start_time < ?
GROUP BY node_id
) r
LEFT JOIN nodes n ON n.id = r.node_id
ORDER BY n.id`
rows, err := db.db.DB.QueryContext(ctx, db.db.Rebind(sqlStmt), start.UTC(), end.UTC())
if err != nil {
return nil, Error.Wrap(err)
}
defer func() { err = errs.Combine(err, rows.Close()) }()
csv := []*accounting.CSVRow{}
for rows.Next() {
var nodeID []byte
r := &accounting.CSVRow{}
var wallet sql.NullString
var disqualified *time.Time
err := rows.Scan(&nodeID, &r.NodeCreationDate, &r.AtRestTotal, &r.GetRepairTotal,
&r.PutRepairTotal, &r.GetAuditTotal, &r.PutTotal, &r.GetTotal, &wallet, &disqualified)
if err != nil {
return csv, Error.Wrap(err)
}
if wallet.Valid {
r.Wallet = wallet.String
}
id, err := storj.NodeIDFromBytes(nodeID)
if err != nil {
return csv, Error.Wrap(err)
}
r.NodeID = id
r.Disqualified = disqualified
csv = append(csv, r)
}
return csv, rows.Err()
}
// QueryStorageNodeUsage returns slice of StorageNodeUsage for given period
func (db *StoragenodeAccounting) QueryStorageNodeUsage(ctx context.Context, nodeID storj.NodeID, start time.Time, end time.Time) (_ []accounting.StorageNodeUsage, err error) {
defer mon.Task()(&ctx)(&err)
lastRollup, err := db.db.Find_AccountingTimestamps_Value_By_Name(ctx, dbx.AccountingTimestamps_Name(accounting.LastRollup))
if err != nil {
return nil, Error.Wrap(err)
}
if lastRollup == nil {
return nil, nil
}
start, end = start.UTC(), end.UTC()
query := `
SELECT at_rest_total, start_time::date
FROM accounting_rollups
WHERE node_id = $1
AND $2 <= start_time AND start_time <= $3
UNION
SELECT SUM(data_total) AS at_rest_total, interval_end_time::date AS start_time
FROM storagenode_storage_tallies
WHERE node_id = $1
AND NOT EXISTS (
SELECT 1 FROM accounting_rollups
WHERE node_id = $1
AND $2 <= start_time AND start_time <= $3
AND start_time::date = interval_end_time::date
)
AND (SELECT value FROM accounting_timestamps WHERE name = $4) < interval_end_time AND interval_end_time <= $3
GROUP BY interval_end_time::date
ORDER BY start_time;
`
rows, err := db.db.QueryContext(ctx, db.db.Rebind(query),
nodeID, start, end, accounting.LastRollup,
)
if err != nil {
return nil, Error.Wrap(err)
}
defer func() { err = errs.Combine(err, rows.Close()) }()
var nodeStorageUsages []accounting.StorageNodeUsage
for rows.Next() {
var atRestTotal float64
var startTime dbutil.NullTime
err = rows.Scan(&atRestTotal, &startTime)
if err != nil {
return nil, Error.Wrap(err)
}
nodeStorageUsages = append(nodeStorageUsages, accounting.StorageNodeUsage{
NodeID: nodeID,
StorageUsed: atRestTotal,
Timestamp: startTime.Time,
})
}
return nodeStorageUsages, rows.Err()
}
// DeleteTalliesBefore deletes all raw tallies prior to some time
func (db *StoragenodeAccounting) DeleteTalliesBefore(ctx context.Context, latestRollup time.Time) (err error) {
defer mon.Task()(&ctx)(&err)
var deleteRawSQL = `DELETE FROM storagenode_storage_tallies WHERE interval_end_time < ?`
_, err = db.db.DB.ExecContext(ctx, db.db.Rebind(deleteRawSQL), latestRollup)
return err
}