Convert Payments to use SQL, for SUM() and Wallet (#1266)

* payments query no longer DBX, using SQL

* sum in SQL

* removed old function

* fixed rollup test

* wrap errors

* removed DBX code
This commit is contained in:
Bill Thorp 2019-02-07 15:26:55 -05:00 committed by GitHub
parent 8e448e2f6e
commit 0b35762105
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 40 additions and 136 deletions

View File

@ -30,7 +30,7 @@ func TestQuery(t *testing.T) {
timestamp := time.Now().UTC().AddDate(0, 0, -1*days)
start := timestamp
for i := 1; i <= days; i++ {
for i := 0; i <= days; i++ {
err := planet.Satellites[0].DB.Accounting().SaveAtRestRaw(ctx, timestamp, timestamp, nodeData)
assert.NoError(t, err)
@ -50,21 +50,23 @@ func TestQuery(t *testing.T) {
rows, err := planet.Satellites[0].DB.Accounting().QueryPaymentInfo(ctx, start, end)
assert.NoError(t, err)
if i == 1 {
if i == 0 { // we need at least two days for rollup to work
assert.Equal(t, 0, len(rows))
return
continue
}
// TODO: once we sum data totals by node ID across rollups, number of rows should be number of nodes
assert.Equal(t, (i-1)*len(planet.StorageNodes), len(rows))
// the number of rows should be number of nodes
assert.Equal(t, len(planet.StorageNodes), len(rows))
// verify data is correct
for _, r := range rows {
assert.Equal(t, bw[0], r.PutTotal)
assert.Equal(t, bw[1], r.GetTotal)
assert.Equal(t, bw[2], r.GetAuditTotal)
assert.Equal(t, bw[3], r.GetRepairTotal)
assert.Equal(t, atRest, r.AtRestTotal)
i := int64(i)
assert.Equal(t, i*bw[0], r.PutTotal)
assert.Equal(t, i*bw[1], r.GetTotal)
assert.Equal(t, i*bw[2], r.GetAuditTotal)
assert.Equal(t, i*bw[3], r.GetRepairTotal)
assert.Equal(t, float64(i)*atRest, r.AtRestTotal)
assert.NotNil(t, nodeData[r.NodeID])
assert.NotEmpty(t, r.Wallet)
}
}
})

View File

@ -195,31 +195,39 @@ func (db *accountingDB) SaveRollup(ctx context.Context, latestRollup time.Time,
// QueryPaymentInfo queries StatDB, Accounting Rollup on nodeID
func (db *accountingDB) QueryPaymentInfo(ctx context.Context, start time.Time, end time.Time) ([]*accounting.CSVRow, error) {
s := dbx.AccountingRollup_StartTime(start)
e := dbx.AccountingRollup_StartTime(end)
data, err := db.db.All_Node_Id_Node_CreatedAt_Node_AuditSuccessRatio_AccountingRollup_StartTime_AccountingRollup_PutTotal_AccountingRollup_GetTotal_AccountingRollup_GetAuditTotal_AccountingRollup_GetRepairTotal_AccountingRollup_PutRepairTotal_AccountingRollup_AtRestTotal_By_AccountingRollup_StartTime_GreaterOrEqual_And_AccountingRollup_StartTime_Less_OrderBy_Asc_Node_Id(ctx, s, e)
var sql = `SELECT n.id, n.created_at, n.audit_success_ratio, r.at_rest_total, r.get_repair_total,
r.put_repair_total, r.get_audit_total, r.put_total, r.get_total, o.operator_wallet
FROM (
SELECT node_id, SUM(at_rest_total) AS at_rest_total, SUM(get_repair_total) AS get_repair_total,
SUM(put_repair_total) AS put_repair_total, SUM(get_audit_total) AS get_audit_total,
SUM(put_total) AS put_total, SUM(get_total) AS get_total
FROM accounting_rollups
WHERE start_time >= ? AND start_time < ?
GROUP BY node_id
) r
LEFT JOIN nodes n ON n.id = r.node_id
LEFT JOIN overlay_cache_nodes o ON n.id = o.node_id
ORDER BY n.id`
rows, err := db.db.DB.Query(db.db.Rebind(sql), start.UTC(), end.UTC())
if err != nil {
return nil, Error.Wrap(err)
}
var rows []*accounting.CSVRow
for _, record := range data {
nodeID, err := storj.NodeIDFromBytes(record.Node_Id)
defer func() { err = errs.Combine(err, rows.Close()) }()
csv := make([]*accounting.CSVRow, 0, 0)
for rows.Next() {
var nodeID []byte
r := &accounting.CSVRow{}
err := rows.Scan(&nodeID, &r.NodeCreationDate, &r.AuditSuccessRatio, &r.AtRestTotal, &r.GetRepairTotal,
&r.PutRepairTotal, &r.GetAuditTotal, &r.PutTotal, &r.GetTotal, &r.Wallet)
if err != nil {
return rows, err
return csv, Error.Wrap(err)
}
row := &accounting.CSVRow{
NodeID: nodeID,
NodeCreationDate: record.Node_CreatedAt,
AuditSuccessRatio: record.Node_AuditSuccessRatio,
AtRestTotal: record.AccountingRollup_AtRestTotal,
GetRepairTotal: record.AccountingRollup_GetRepairTotal,
PutRepairTotal: record.AccountingRollup_PutRepairTotal,
GetAuditTotal: record.AccountingRollup_GetAuditTotal,
PutTotal: record.AccountingRollup_PutTotal,
GetTotal: record.AccountingRollup_GetTotal,
Date: record.AccountingRollup_StartTime,
id, err := storj.NodeIDFromBytes(nodeID)
if err != nil {
return csv, Error.Wrap(err)
}
rows = append(rows, row)
r.NodeID = id
csv = append(csv, r)
}
return rows, nil
return csv, nil
}

View File

@ -147,15 +147,6 @@ read all (
select node.id
)
// payment csv generation query
read all (
select node.id node.created_at node.audit_success_ratio accounting_rollup.start_time accounting_rollup.put_total accounting_rollup.get_total accounting_rollup.get_audit_total accounting_rollup.get_repair_total accounting_rollup.put_repair_total accounting_rollup.at_rest_total
where accounting_rollup.start_time >= ?
where accounting_rollup.start_time < ?
join node.id = accounting_rollup.node_id
orderby asc node.id
)
//--- overlaycache ---//
model overlay_cache_node (

View File

@ -2479,19 +2479,6 @@ type Id_Row struct {
Id []byte
}
type Node_Id_Node_CreatedAt_Node_AuditSuccessRatio_AccountingRollup_StartTime_AccountingRollup_PutTotal_AccountingRollup_GetTotal_AccountingRollup_GetAuditTotal_AccountingRollup_GetRepairTotal_AccountingRollup_PutRepairTotal_AccountingRollup_AtRestTotal_Row struct {
Node_Id []byte
Node_CreatedAt time.Time
Node_AuditSuccessRatio float64
AccountingRollup_StartTime time.Time
AccountingRollup_PutTotal int64
AccountingRollup_GetTotal int64
AccountingRollup_GetAuditTotal int64
AccountingRollup_GetRepairTotal int64
AccountingRollup_PutRepairTotal int64
AccountingRollup_AtRestTotal float64
}
type OperatorWallet_Row struct {
OperatorWallet string
}
@ -3211,40 +3198,6 @@ func (obj *postgresImpl) All_Node_Id(ctx context.Context) (
}
func (obj *postgresImpl) All_Node_Id_Node_CreatedAt_Node_AuditSuccessRatio_AccountingRollup_StartTime_AccountingRollup_PutTotal_AccountingRollup_GetTotal_AccountingRollup_GetAuditTotal_AccountingRollup_GetRepairTotal_AccountingRollup_PutRepairTotal_AccountingRollup_AtRestTotal_By_AccountingRollup_StartTime_GreaterOrEqual_And_AccountingRollup_StartTime_Less_OrderBy_Asc_Node_Id(ctx context.Context,
accounting_rollup_start_time_greater_or_equal AccountingRollup_StartTime_Field,
accounting_rollup_start_time_less AccountingRollup_StartTime_Field) (
rows []*Node_Id_Node_CreatedAt_Node_AuditSuccessRatio_AccountingRollup_StartTime_AccountingRollup_PutTotal_AccountingRollup_GetTotal_AccountingRollup_GetAuditTotal_AccountingRollup_GetRepairTotal_AccountingRollup_PutRepairTotal_AccountingRollup_AtRestTotal_Row, err error) {
var __embed_stmt = __sqlbundle_Literal("SELECT nodes.id, nodes.created_at, nodes.audit_success_ratio, accounting_rollups.start_time, accounting_rollups.put_total, accounting_rollups.get_total, accounting_rollups.get_audit_total, accounting_rollups.get_repair_total, accounting_rollups.put_repair_total, accounting_rollups.at_rest_total FROM nodes JOIN accounting_rollups ON nodes.id = accounting_rollups.node_id WHERE accounting_rollups.start_time >= ? AND accounting_rollups.start_time < ? ORDER BY nodes.id")
var __values []interface{}
__values = append(__values, accounting_rollup_start_time_greater_or_equal.value(), accounting_rollup_start_time_less.value())
var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt)
obj.logStmt(__stmt, __values...)
__rows, err := obj.driver.Query(__stmt, __values...)
if err != nil {
return nil, obj.makeErr(err)
}
defer __rows.Close()
for __rows.Next() {
row := &Node_Id_Node_CreatedAt_Node_AuditSuccessRatio_AccountingRollup_StartTime_AccountingRollup_PutTotal_AccountingRollup_GetTotal_AccountingRollup_GetAuditTotal_AccountingRollup_GetRepairTotal_AccountingRollup_PutRepairTotal_AccountingRollup_AtRestTotal_Row{}
err = __rows.Scan(&row.Node_Id, &row.Node_CreatedAt, &row.Node_AuditSuccessRatio, &row.AccountingRollup_StartTime, &row.AccountingRollup_PutTotal, &row.AccountingRollup_GetTotal, &row.AccountingRollup_GetAuditTotal, &row.AccountingRollup_GetRepairTotal, &row.AccountingRollup_PutRepairTotal, &row.AccountingRollup_AtRestTotal)
if err != nil {
return nil, obj.makeErr(err)
}
rows = append(rows, row)
}
if err := __rows.Err(); err != nil {
return nil, obj.makeErr(err)
}
return rows, nil
}
func (obj *postgresImpl) Get_OverlayCacheNode_By_NodeId(ctx context.Context,
overlay_cache_node_node_id OverlayCacheNode_NodeId_Field) (
overlay_cache_node *OverlayCacheNode, err error) {
@ -5309,40 +5262,6 @@ func (obj *sqlite3Impl) All_Node_Id(ctx context.Context) (
}
func (obj *sqlite3Impl) All_Node_Id_Node_CreatedAt_Node_AuditSuccessRatio_AccountingRollup_StartTime_AccountingRollup_PutTotal_AccountingRollup_GetTotal_AccountingRollup_GetAuditTotal_AccountingRollup_GetRepairTotal_AccountingRollup_PutRepairTotal_AccountingRollup_AtRestTotal_By_AccountingRollup_StartTime_GreaterOrEqual_And_AccountingRollup_StartTime_Less_OrderBy_Asc_Node_Id(ctx context.Context,
accounting_rollup_start_time_greater_or_equal AccountingRollup_StartTime_Field,
accounting_rollup_start_time_less AccountingRollup_StartTime_Field) (
rows []*Node_Id_Node_CreatedAt_Node_AuditSuccessRatio_AccountingRollup_StartTime_AccountingRollup_PutTotal_AccountingRollup_GetTotal_AccountingRollup_GetAuditTotal_AccountingRollup_GetRepairTotal_AccountingRollup_PutRepairTotal_AccountingRollup_AtRestTotal_Row, err error) {
var __embed_stmt = __sqlbundle_Literal("SELECT nodes.id, nodes.created_at, nodes.audit_success_ratio, accounting_rollups.start_time, accounting_rollups.put_total, accounting_rollups.get_total, accounting_rollups.get_audit_total, accounting_rollups.get_repair_total, accounting_rollups.put_repair_total, accounting_rollups.at_rest_total FROM nodes JOIN accounting_rollups ON nodes.id = accounting_rollups.node_id WHERE accounting_rollups.start_time >= ? AND accounting_rollups.start_time < ? ORDER BY nodes.id")
var __values []interface{}
__values = append(__values, accounting_rollup_start_time_greater_or_equal.value(), accounting_rollup_start_time_less.value())
var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt)
obj.logStmt(__stmt, __values...)
__rows, err := obj.driver.Query(__stmt, __values...)
if err != nil {
return nil, obj.makeErr(err)
}
defer __rows.Close()
for __rows.Next() {
row := &Node_Id_Node_CreatedAt_Node_AuditSuccessRatio_AccountingRollup_StartTime_AccountingRollup_PutTotal_AccountingRollup_GetTotal_AccountingRollup_GetAuditTotal_AccountingRollup_GetRepairTotal_AccountingRollup_PutRepairTotal_AccountingRollup_AtRestTotal_Row{}
err = __rows.Scan(&row.Node_Id, &row.Node_CreatedAt, &row.Node_AuditSuccessRatio, &row.AccountingRollup_StartTime, &row.AccountingRollup_PutTotal, &row.AccountingRollup_GetTotal, &row.AccountingRollup_GetAuditTotal, &row.AccountingRollup_GetRepairTotal, &row.AccountingRollup_PutRepairTotal, &row.AccountingRollup_AtRestTotal)
if err != nil {
return nil, obj.makeErr(err)
}
rows = append(rows, row)
}
if err := __rows.Err(); err != nil {
return nil, obj.makeErr(err)
}
return rows, nil
}
func (obj *sqlite3Impl) Get_OverlayCacheNode_By_NodeId(ctx context.Context,
overlay_cache_node_node_id OverlayCacheNode_NodeId_Field) (
overlay_cache_node *OverlayCacheNode, err error) {
@ -7085,17 +7004,6 @@ func (rx *Rx) All_Node_Id(ctx context.Context) (
return tx.All_Node_Id(ctx)
}
func (rx *Rx) All_Node_Id_Node_CreatedAt_Node_AuditSuccessRatio_AccountingRollup_StartTime_AccountingRollup_PutTotal_AccountingRollup_GetTotal_AccountingRollup_GetAuditTotal_AccountingRollup_GetRepairTotal_AccountingRollup_PutRepairTotal_AccountingRollup_AtRestTotal_By_AccountingRollup_StartTime_GreaterOrEqual_And_AccountingRollup_StartTime_Less_OrderBy_Asc_Node_Id(ctx context.Context,
accounting_rollup_start_time_greater_or_equal AccountingRollup_StartTime_Field,
accounting_rollup_start_time_less AccountingRollup_StartTime_Field) (
rows []*Node_Id_Node_CreatedAt_Node_AuditSuccessRatio_AccountingRollup_StartTime_AccountingRollup_PutTotal_AccountingRollup_GetTotal_AccountingRollup_GetAuditTotal_AccountingRollup_GetRepairTotal_AccountingRollup_PutRepairTotal_AccountingRollup_AtRestTotal_Row, err error) {
var tx *Tx
if tx, err = rx.getTx(ctx); err != nil {
return
}
return tx.All_Node_Id_Node_CreatedAt_Node_AuditSuccessRatio_AccountingRollup_StartTime_AccountingRollup_PutTotal_AccountingRollup_GetTotal_AccountingRollup_GetAuditTotal_AccountingRollup_GetRepairTotal_AccountingRollup_PutRepairTotal_AccountingRollup_AtRestTotal_By_AccountingRollup_StartTime_GreaterOrEqual_And_AccountingRollup_StartTime_Less_OrderBy_Asc_Node_Id(ctx, accounting_rollup_start_time_greater_or_equal, accounting_rollup_start_time_less)
}
func (rx *Rx) All_Project(ctx context.Context) (
rows []*Project, err error) {
var tx *Tx
@ -7726,11 +7634,6 @@ type Methods interface {
All_Node_Id(ctx context.Context) (
rows []*Id_Row, err error)
All_Node_Id_Node_CreatedAt_Node_AuditSuccessRatio_AccountingRollup_StartTime_AccountingRollup_PutTotal_AccountingRollup_GetTotal_AccountingRollup_GetAuditTotal_AccountingRollup_GetRepairTotal_AccountingRollup_PutRepairTotal_AccountingRollup_AtRestTotal_By_AccountingRollup_StartTime_GreaterOrEqual_And_AccountingRollup_StartTime_Less_OrderBy_Asc_Node_Id(ctx context.Context,
accounting_rollup_start_time_greater_or_equal AccountingRollup_StartTime_Field,
accounting_rollup_start_time_less AccountingRollup_StartTime_Field) (
rows []*Node_Id_Node_CreatedAt_Node_AuditSuccessRatio_AccountingRollup_StartTime_AccountingRollup_PutTotal_AccountingRollup_GetTotal_AccountingRollup_GetAuditTotal_AccountingRollup_GetRepairTotal_AccountingRollup_PutRepairTotal_AccountingRollup_AtRestTotal_Row, err error)
All_Project(ctx context.Context) (
rows []*Project, err error)