satellite/compensation: add total-paid and total-distributed to invoices

Change-Id: Id4414867917cbf8aad77795f764d6381e88d9a34
This commit is contained in:
Jeff Wendling 2021-01-27 13:53:16 -05:00 committed by JT Olio
parent 339d1212cd
commit 759bdd6794
5 changed files with 36 additions and 20 deletions

View File

@ -12,7 +12,6 @@ import (
"github.com/zeebo/errs" "github.com/zeebo/errs"
"go.uber.org/zap" "go.uber.org/zap"
"storj.io/storj/private/currency"
"storj.io/storj/satellite/compensation" "storj.io/storj/satellite/compensation"
"storj.io/storj/satellite/satellitedb" "storj.io/storj/satellite/satellitedb"
) )
@ -51,7 +50,7 @@ func generateInvoicesCSV(ctx context.Context, period compensation.Period, out io
invoices := make([]compensation.Invoice, 0, len(periodUsage)) invoices := make([]compensation.Invoice, 0, len(periodUsage))
for _, usage := range periodUsage { for _, usage := range periodUsage {
withheldAmounts, err := db.Compensation().QueryWithheldAmounts(ctx, usage.NodeID) totalAmounts, err := db.Compensation().QueryTotalAmounts(ctx, usage.NodeID)
if err != nil { if err != nil {
return err return err
} }
@ -89,8 +88,10 @@ func generateInvoicesCSV(ctx context.Context, period compensation.Period, out io
UsageGetRepair: usage.GetRepairTotal, UsageGetRepair: usage.GetRepairTotal,
UsagePutRepair: usage.PutRepairTotal, UsagePutRepair: usage.PutRepairTotal,
UsageGetAudit: usage.GetAuditTotal, UsageGetAudit: usage.GetAuditTotal,
TotalHeld: withheldAmounts.TotalHeld, TotalHeld: totalAmounts.TotalHeld,
TotalDisposed: withheldAmounts.TotalDisposed, TotalDisposed: totalAmounts.TotalDisposed,
TotalPaid: totalAmounts.TotalPaid,
TotalDistributed: totalAmounts.TotalDistributed,
} }
invoice := compensation.Invoice{ invoice := compensation.Invoice{
@ -99,7 +100,6 @@ func generateInvoicesCSV(ctx context.Context, period compensation.Period, out io
NodeWallet: node.Operator.Wallet, NodeWallet: node.Operator.Wallet,
NodeAddress: nodeAddress, NodeAddress: nodeAddress,
NodeLastIP: nodeLastIP, NodeLastIP: nodeLastIP,
PaidYTD: currency.Zero, // deprecated
} }
if err := invoice.MergeNodeInfo(nodeInfo); err != nil { if err := invoice.MergeNodeInfo(nodeInfo); err != nil {

View File

@ -10,16 +10,23 @@ import (
"storj.io/storj/private/currency" "storj.io/storj/private/currency"
) )
// WithheldAmounts holds the amounts held and disposed. // TotalAmounts holds the amounts held and disposed.
type WithheldAmounts struct { //
TotalHeld currency.MicroUnit // Invariants:
TotalDisposed currency.MicroUnit // TotalHeld >= TotalDisposed
// TotalPaid >= TotalDisposed
// TotalPaid >= TotalDistributed (we may distribute less due to minimum payout threshold)
type TotalAmounts struct {
TotalHeld currency.MicroUnit // portion from owed that was held back
TotalDisposed currency.MicroUnit // portion from held back that went into paid
TotalPaid currency.MicroUnit // earned amount that is available to be distributed
TotalDistributed currency.MicroUnit // amount actually transferred to the operator
} }
// DB is the interface we need to source the data to calculate compensation. // DB is the interface we need to source the data to calculate compensation.
type DB interface { type DB interface {
// QueryWithheldAmounts queries the WithheldAmounts for the given nodeID. // QueryTotalAmounts queries the WithheldAmounts for the given nodeID.
QueryWithheldAmounts(ctx context.Context, nodeID storj.NodeID) (WithheldAmounts, error) QueryTotalAmounts(ctx context.Context, nodeID storj.NodeID) (TotalAmounts, error)
// RecordPeriod records a set of paystubs and payments for some time period. // RecordPeriod records a set of paystubs and payments for some time period.
RecordPeriod(ctx context.Context, paystubs []Paystub, payments []Payment) error RecordPeriod(ctx context.Context, paystubs []Paystub, payments []Payment) error

View File

@ -40,7 +40,8 @@ type Invoice struct {
Disposed currency.MicroUnit `csv:"disposed"` // Amount of owed that is due to graceful-exit or held period ending Disposed currency.MicroUnit `csv:"disposed"` // Amount of owed that is due to graceful-exit or held period ending
TotalHeld currency.MicroUnit `csv:"total-held"` // Total amount ever held from the node TotalHeld currency.MicroUnit `csv:"total-held"` // Total amount ever held from the node
TotalDisposed currency.MicroUnit `csv:"total-disposed"` // Total amount ever disposed to the node TotalDisposed currency.MicroUnit `csv:"total-disposed"` // Total amount ever disposed to the node
PaidYTD currency.MicroUnit `csv:"paid-ytd"` // Deprecated TotalPaid currency.MicroUnit `csv:"total-paid"` // Total amount ever paid to the node (but not necessarily dispensed)
TotalDistributed currency.MicroUnit `csv:"total-distributed"` // Total amount ever distributed to the node (always less than or equal to paid)
} }
// MergeNodeInfo updates the fields representing the node information into the invoice. // MergeNodeInfo updates the fields representing the node information into the invoice.
@ -59,6 +60,8 @@ func (invoice *Invoice) MergeNodeInfo(nodeInfo NodeInfo) error {
invoice.UsageGetAudit = nodeInfo.UsageGetAudit invoice.UsageGetAudit = nodeInfo.UsageGetAudit
invoice.TotalHeld = nodeInfo.TotalHeld invoice.TotalHeld = nodeInfo.TotalHeld
invoice.TotalDisposed = nodeInfo.TotalDisposed invoice.TotalDisposed = nodeInfo.TotalDisposed
invoice.TotalPaid = nodeInfo.TotalPaid
invoice.TotalDistributed = nodeInfo.TotalDistributed
return nil return nil
} }

View File

@ -49,6 +49,8 @@ type NodeInfo struct {
UsageGetAudit int64 UsageGetAudit int64
TotalHeld currency.MicroUnit TotalHeld currency.MicroUnit
TotalDisposed currency.MicroUnit TotalDisposed currency.MicroUnit
TotalPaid currency.MicroUnit
TotalDistributed currency.MicroUnit
} }
// Statement is the computed amounts and codes from a node. // Statement is the computed amounts and codes from a node.

View File

@ -16,28 +16,32 @@ type compensationDB struct {
db *satelliteDB db *satelliteDB
} }
// QueryWithheldAmounts returns withheld data for the given node. // QueryTotalAmounts returns withheld data for the given node.
func (comp *compensationDB) QueryWithheldAmounts(ctx context.Context, nodeID storj.NodeID) (_ compensation.WithheldAmounts, err error) { func (comp *compensationDB) QueryTotalAmounts(ctx context.Context, nodeID storj.NodeID) (_ compensation.TotalAmounts, err error) {
defer mon.Task()(&ctx)(&err) defer mon.Task()(&ctx)(&err)
stmt := comp.db.Rebind(` stmt := comp.db.Rebind(`
SELECT SELECT
coalesce(SUM(held), 0) AS total_held, coalesce(SUM(held), 0) AS total_held,
coalesce(SUM(disposed), 0) AS total_disposed coalesce(SUM(disposed), 0) AS total_disposed
coalesce(SUM(paid), 0) AS total_paid,
coalesce(SUM(distributed), 0) AS total_distributed
FROM FROM
storagenode_paystubs storagenode_paystubs
WHERE WHERE
node_id = ? node_id = ?
`) `)
var totalHeld, totalDisposed int64 var totalHeld, totalDisposed, totalPaid, totalDistributed int64
if err := comp.db.DB.QueryRow(ctx, stmt, nodeID).Scan(&totalHeld, &totalDisposed); err != nil { if err := comp.db.DB.QueryRow(ctx, stmt, nodeID).Scan(&totalHeld, &totalDisposed, &totalPaid, &totalDistributed); err != nil {
return compensation.WithheldAmounts{}, Error.Wrap(err) return compensation.TotalAmounts{}, Error.Wrap(err)
} }
return compensation.WithheldAmounts{ return compensation.TotalAmounts{
TotalHeld: currency.NewMicroUnit(totalHeld), TotalHeld: currency.NewMicroUnit(totalHeld),
TotalDisposed: currency.NewMicroUnit(totalDisposed), TotalDisposed: currency.NewMicroUnit(totalDisposed),
TotalPaid: currency.NewMicroUnit(totalPaid),
TotalDistributed: currency.NewMicroUnit(totalDistributed),
}, nil }, nil
} }