satellite: compensation package and commands

Change-Id: I7fd6399837e45ff48e5f3d47a95192a01d58e125
This commit is contained in:
Jeff Wendling 2020-03-10 14:42:11 -06:00
parent 23e5a0471f
commit e2ff2ce672
28 changed files with 1749 additions and 3 deletions

View File

@ -0,0 +1,180 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package main
import (
"context"
"io"
"net"
"time"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/storj/satellite/compensation"
"storj.io/storj/satellite/satellitedb"
)
func generateInvoicesCSV(ctx context.Context, period compensation.Period, out io.Writer) (err error) {
periodInfo := compensation.PeriodInfo{
Period: period,
Rates: &compensation.Rates{
AtRestGBHours: generateInvoicesCfg.Compensation.Rates.AtRestGBHours,
GetTB: generateInvoicesCfg.Compensation.Rates.GetTB,
PutTB: generateInvoicesCfg.Compensation.Rates.PutTB,
GetRepairTB: generateInvoicesCfg.Compensation.Rates.GetRepairTB,
PutRepairTB: generateInvoicesCfg.Compensation.Rates.PutRepairTB,
GetAuditTB: generateInvoicesCfg.Compensation.Rates.GetAuditTB,
},
SurgePercent: generateInvoicesCfg.SurgePercent,
DisposePercent: generateInvoicesCfg.Compensation.DisposePercent,
WithheldPercents: generateInvoicesCfg.Compensation.WithheldPercents,
}
db, err := satellitedb.New(zap.L().Named("db"), generateInvoicesCfg.Database, satellitedb.Options{})
if err != nil {
return errs.New("error connecting to master database on satellite: %+v", err)
}
defer func() { err = errs.Combine(err, db.Close()) }()
if err := db.CheckVersion(ctx); err != nil {
zap.S().Fatal("failed satellite database version check: ", err)
return errs.New("Error checking version for satellitedb: %+v", err)
}
periodUsage, err := db.StoragenodeAccounting().QueryStorageNodePeriodUsage(ctx, period)
if err != nil {
return err
}
invoices := make([]compensation.Invoice, 0, len(periodUsage))
for _, usage := range periodUsage {
withheldAmounts, err := db.Compensation().QueryWithheldAmounts(ctx, usage.NodeID)
if err != nil {
return err
}
node, err := db.OverlayCache().Get(ctx, usage.NodeID)
if err != nil {
return err
}
var gracefulExit *time.Time
if node.ExitStatus.ExitSuccess {
gracefulExit = node.ExitStatus.ExitFinishedAt
}
nodeAddress, _, err := net.SplitHostPort(node.Address.Address)
if err != nil {
return errs.New("unable to split node %q address %q", usage.NodeID, node.Address.Address)
}
nodeLastIP, _, err := net.SplitHostPort(node.LastIPPort)
if err != nil {
return errs.New("unable to split node %q last ip:port %q", usage.NodeID, node.LastIPPort)
}
paidYTD, err := db.Compensation().QueryPaidInYear(ctx, usage.NodeID, period.Year)
if err != nil {
return err
}
nodeInfo := compensation.NodeInfo{
ID: usage.NodeID,
CreatedAt: node.CreatedAt,
Disqualified: node.Disqualified,
GracefulExit: gracefulExit,
UsageAtRest: usage.AtRestTotal,
UsageGet: usage.GetTotal,
UsagePut: usage.PutTotal,
UsageGetRepair: usage.GetRepairTotal,
UsagePutRepair: usage.PutRepairTotal,
UsageGetAudit: usage.GetAuditTotal,
TotalHeld: withheldAmounts.TotalHeld,
TotalDisposed: withheldAmounts.TotalDisposed,
}
invoice := compensation.Invoice{
Period: period,
NodeID: compensation.NodeID(usage.NodeID),
NodeWallet: node.Operator.Wallet,
NodeAddress: nodeAddress,
NodeLastIP: nodeLastIP,
PaidYTD: paidYTD,
}
if err := invoice.MergeNodeInfo(nodeInfo); err != nil {
return err
}
invoices = append(invoices, invoice)
periodInfo.Nodes = append(periodInfo.Nodes, nodeInfo)
}
statements, err := compensation.GenerateStatements(periodInfo)
if err != nil {
return err
}
for i := 0; i < len(statements); i++ {
if err := invoices[i].MergeStatement(statements[i]); err != nil {
return err
}
}
if err := compensation.WriteInvoices(out, invoices); err != nil {
return err
}
return nil
}
func recordPeriod(ctx context.Context, paystubsCSV, paymentsCSV string) (int, int, error) {
paystubs, err := compensation.LoadPaystubs(paystubsCSV)
if err != nil {
return 0, 0, err
}
payments, err := compensation.LoadPayments(paymentsCSV)
if err != nil {
return 0, 0, err
}
db, err := satellitedb.New(zap.L().Named("db"), recordPeriodCfg.Database, satellitedb.Options{})
if err != nil {
return 0, 0, errs.New("error connecting to master database on satellite: %+v", err)
}
defer func() { err = errs.Combine(err, db.Close()) }()
if err := db.CheckVersion(ctx); err != nil {
zap.S().Fatal("failed satellite database version check: ", err)
return 0, 0, errs.New("Error checking version for satellitedb: %+v", err)
}
if err := db.Compensation().RecordPeriod(ctx, paystubs, payments); err != nil {
return 0, 0, err
}
return len(paystubs), len(payments), nil
}
func recordOneOffPayments(ctx context.Context, paymentsCSV string) (int, error) {
payments, err := compensation.LoadPayments(paymentsCSV)
if err != nil {
return 0, err
}
db, err := satellitedb.New(zap.L().Named("db"), recordOneOffPaymentsCfg.Database, satellitedb.Options{})
if err != nil {
return 0, errs.New("error connecting to master database on satellite: %+v", err)
}
defer func() { err = errs.Combine(err, db.Close()) }()
if err := db.CheckVersion(ctx); err != nil {
zap.S().Fatal("failed satellite database version check: ", err)
return 0, errs.New("Error checking version for satellitedb: %+v", err)
}
if err := db.Compensation().RecordPayments(ctx, payments); err != nil {
return 0, err
}
return len(payments), nil
}

View File

@ -6,6 +6,7 @@ package main
import (
"context"
"fmt"
"io"
"os"
"path/filepath"
"text/tabwriter"
@ -27,6 +28,7 @@ import (
"storj.io/storj/pkg/revocation"
"storj.io/storj/satellite"
"storj.io/storj/satellite/accounting/live"
"storj.io/storj/satellite/compensation"
"storj.io/storj/satellite/metainfo"
"storj.io/storj/satellite/orders"
"storj.io/storj/satellite/satellitedb"
@ -138,6 +140,31 @@ var (
Long: "Ensures that we have a stripe customer for every satellite user",
RunE: cmdStripeCustomer,
}
compensationCmd = &cobra.Command{
Use: "compensation",
Short: "Storage Node Compensation commands",
}
generateInvoicesCmd = &cobra.Command{
Use: "generate-invoices [period]",
Short: "Generate storage node invoices",
Long: "Generate storage node invoices for a pay period. Period is a UTC date formatted like YYYY-MM.",
Args: cobra.ExactArgs(1),
RunE: cmdGenerateInvoices,
}
recordPeriodCmd = &cobra.Command{
Use: "record-period [paystubs-csv] [payments-csv]",
Short: "Record storage node pay period",
Long: "Record storage node paystubs and payments for a pay period",
Args: cobra.ExactArgs(2),
RunE: cmdRecordPeriod,
}
recordOneOffPaymentsCmd = &cobra.Command{
Use: "record-one-off-payments [payments-csv]",
Short: "Record one-off storage node payments",
Long: "Record one-off storage node payments outside of a pay period",
Args: cobra.ExactArgs(1),
RunE: cmdRecordOneOffPayments,
}
runCfg Satellite
setupCfg Satellite
@ -150,6 +177,18 @@ var (
Database string `help:"satellite database connection string" releaseDefault:"postgres://" devDefault:"postgres://"`
Output string `help:"destination of report output" default:""`
}
generateInvoicesCfg struct {
Database string `help:"satellite database connection string" releaseDefault:"postgres://" devDefault:"postgres://"`
Output string `help:"destination of report output" default:""`
Compensation compensation.Config
SurgePercent int `help:"surge percent for payments" default:"0"`
}
recordPeriodCfg struct {
Database string `help:"satellite database connection string" releaseDefault:"postgres://" devDefault:"postgres://"`
}
recordOneOffPaymentsCfg struct {
Database string `help:"satellite database connection string" releaseDefault:"postgres://" devDefault:"postgres://"`
}
partnerAttribtionCfg struct {
Database string `help:"satellite database connection string" releaseDefault:"postgres://" devDefault:"postgres://"`
Output string `help:"destination of report output" default:""`
@ -180,11 +219,15 @@ func init() {
rootCmd.AddCommand(setupCmd)
rootCmd.AddCommand(qdiagCmd)
rootCmd.AddCommand(reportsCmd)
rootCmd.AddCommand(compensationCmd)
reportsCmd.AddCommand(nodeUsageCmd)
reportsCmd.AddCommand(partnerAttributionCmd)
reportsCmd.AddCommand(gracefulExitCmd)
reportsCmd.AddCommand(verifyGracefulExitReceiptCmd)
reportsCmd.AddCommand(stripeCustomerCmd)
compensationCmd.AddCommand(generateInvoicesCmd)
compensationCmd.AddCommand(recordPeriodCmd)
compensationCmd.AddCommand(recordOneOffPaymentsCmd)
process.Bind(runCmd, &runCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
process.Bind(runMigrationCmd, &runCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
process.Bind(runAPICmd, &runCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
@ -194,6 +237,9 @@ func init() {
process.Bind(setupCmd, &setupCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir), cfgstruct.SetupMode())
process.Bind(qdiagCmd, &qdiagCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
process.Bind(nodeUsageCmd, &nodeUsageCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
process.Bind(generateInvoicesCmd, &generateInvoicesCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
process.Bind(recordPeriodCmd, &recordPeriodCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
process.Bind(recordOneOffPaymentsCmd, &recordOneOffPaymentsCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
process.Bind(gracefulExitCmd, &gracefulExitCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
process.Bind(verifyGracefulExitReceiptCmd, &verifyGracefulExitReceiptCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
process.Bind(stripeCustomerCmd, &runCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
@ -435,6 +481,49 @@ func cmdStripeCustomer(cmd *cobra.Command, args []string) (err error) {
return generateStripeCustomers(ctx)
}
func cmdGenerateInvoices(cmd *cobra.Command, args []string) (err error) {
ctx, _ := process.Ctx(cmd)
period, err := compensation.PeriodFromString(args[0])
if err != nil {
return err
}
if err := runWithOutput(generateInvoicesCfg.Output, func(out io.Writer) error {
return generateInvoicesCSV(ctx, period, out)
}); err != nil {
return err
}
if generateInvoicesCfg.Output != "" {
fmt.Println("Generated invoices")
}
return nil
}
func cmdRecordPeriod(cmd *cobra.Command, args []string) (err error) {
ctx, _ := process.Ctx(cmd)
paystubsCount, paymentsCount, err := recordPeriod(ctx, args[0], args[1])
if err != nil {
return err
}
fmt.Println(paystubsCount, "paystubs recorded")
fmt.Println(paymentsCount, "payments recorded")
return nil
}
func cmdRecordOneOffPayments(cmd *cobra.Command, args []string) (err error) {
ctx, _ := process.Ctx(cmd)
count, err := recordOneOffPayments(ctx, args[0])
if err != nil {
return err
}
fmt.Println(count, "payments recorded")
return nil
}
func cmdValueAttribution(cmd *cobra.Command, args []string) (err error) {
ctx, _ := process.Ctx(cmd)
log := zap.L().Named("satellite-cli")

31
cmd/satellite/output.go Normal file
View File

@ -0,0 +1,31 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package main
import (
"io"
"os"
"github.com/zeebo/errs"
)
func runWithOutput(output string, fn func(io.Writer) error) (err error) {
if output == "" {
return fn(os.Stdout)
}
outputTmp := output + ".tmp"
file, err := os.Create(outputTmp)
if err != nil {
return errs.New("unable to create temporary output file: %v", err)
}
err = errs.Combine(err, fn(file))
err = errs.Combine(err, file.Close())
if err == nil {
err = errs.Combine(err, os.Rename(outputTmp, output))
}
if err != nil {
return errs.Combine(err, os.Remove(outputTmp))
}
return err
}

View File

@ -0,0 +1,11 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package currency
import "github.com/zeebo/errs"
var (
// Error wraps errors coming from this package.
Error = errs.Class("currency")
)

View File

@ -0,0 +1,61 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package currency
import (
"math"
"github.com/shopspring/decimal"
"github.com/zeebo/errs"
)
var (
maxInt64 = decimal.NewFromInt(math.MaxInt64)
// Zero is a MicroUnit representing 0.
Zero MicroUnit
)
// NewMicroUnit returns a MicroUnit with v. Much like a time.Duration, a value
// of 1 means 1e-6 or one millionth of a unit of currency.
func NewMicroUnit(v int64) MicroUnit {
return MicroUnit{v: v}
}
// MicroUnit represents 1e-6 or one millionth of a unit of currency (e.g. one
// millionth of a dollar). It is used instead of a floating point type to
// prevent rounding errors.
type MicroUnit struct{ v int64 }
// Value returns the underlying MicroUnit value.
func (m MicroUnit) Value() int64 { return m.v }
// Decimal returns the a decimal form of the MicroUnit.
func (m MicroUnit) Decimal() decimal.Decimal {
return decimal.New(m.v, -6)
}
// FloatString returns a string fixed to 6 decimal places.
func (m MicroUnit) FloatString() string {
return m.Decimal().StringFixed(6)
}
// MicroUnitFromFloatString parses the string from FloatString into a MicroUnit.
func MicroUnitFromFloatString(s string) (MicroUnit, error) {
d, err := decimal.NewFromString(s)
if err != nil {
return MicroUnit{}, errs.Wrap(err)
}
return MicroUnitFromDecimal(d)
}
// MicroUnitFromDecimal returns a MicroUnit from a decimal value and returns an
// error if there is not enough precision.
func MicroUnitFromDecimal(d decimal.Decimal) (MicroUnit, error) {
m := d.Shift(6).Truncate(0)
if m.GreaterThan(maxInt64) {
return MicroUnit{}, errs.New("%s overflows micro-unit", d)
}
return MicroUnit{v: m.IntPart()}, nil
}

View File

@ -0,0 +1,20 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package currency
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestMicroUnitToFloatString(t *testing.T) {
require.Equal(t, "1.002332", NewMicroUnit(1002332).FloatString())
}
func TestMicroUnitFromFloatString(t *testing.T) {
m, err := MicroUnitFromFloatString("0.012340")
require.NoError(t, err)
require.Equal(t, NewMicroUnit(12340), m)
}

View File

@ -11,6 +11,7 @@ import (
"storj.io/common/memory"
"storj.io/common/storj"
"storj.io/storj/satellite/compensation"
)
// RollupStats is a convenience alias
@ -45,6 +46,17 @@ type Rollup struct {
AtRestTotal float64
}
// StorageNodePeriodUsage represents a statement for a node for a compensation period
type StorageNodePeriodUsage struct {
NodeID storj.NodeID
AtRestTotal float64
GetTotal int64
PutTotal int64
GetRepairTotal int64
PutRepairTotal int64
GetAuditTotal int64
}
// StorageNodeUsage is node at rest space usage over a period of time
type StorageNodeUsage struct {
NodeID storj.NodeID
@ -138,6 +150,8 @@ type StoragenodeAccounting interface {
LastTimestamp(ctx context.Context, timestampType string) (time.Time, error)
// QueryPaymentInfo queries Nodes and Accounting_Rollup on nodeID
QueryPaymentInfo(ctx context.Context, start time.Time, end time.Time) ([]*CSVRow, error)
// QueryStorageNodePeriodUsage returns accounting statements for nodes for a given compensation period
QueryStorageNodePeriodUsage(ctx context.Context, period compensation.Period) ([]StorageNodePeriodUsage, error)
// QueryStorageNodeUsage returns slice of StorageNodeUsage for given period
QueryStorageNodeUsage(ctx context.Context, nodeID storj.NodeID, start time.Time, end time.Time) ([]StorageNodeUsage, error)
// DeleteTalliesBefore deletes all tallies prior to some time

View File

@ -0,0 +1,86 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package compensation
import "strings"
// Code is an enumeration of states some billing entry could be in.
type Code string
const (
// Disqualified is included if the node is disqualified.
Disqualified Code = "D"
// Sanctioned is included if payment is withheld because the node is in
// a sanctioned country.
Sanctioned Code = "S"
// No1099 is included if payment is withheld because the node has not
// filed a 1099 and payment would put it over limits.
No1099 Code = "T"
// InWithholding is included if the node is in the initial held amount
// period.
InWithholding Code = "E"
// GracefulExit is included if the node has gracefully exited.
GracefulExit Code = "X"
)
// CodeFromString parses the string into a Code.
func CodeFromString(s string) (Code, error) {
code := Code(s)
switch code {
case Disqualified, Sanctioned, No1099, InWithholding, GracefulExit:
return code, nil
default:
return "", Error.New("no such code %q", code)
}
}
// Codes represents a collection of Code values.
type Codes []Code
// String serializes the Codes into a colon separated list.
func (codes Codes) String() string {
builder := new(strings.Builder)
for i, code := range codes {
if i > 0 {
builder.WriteByte(':')
}
builder.WriteString(string(code))
}
return builder.String()
}
// UnmarshalCSV does the custom unmarshaling of Codes.
func (codes *Codes) UnmarshalCSV(s string) error {
value, err := CodesFromString(s)
if err != nil {
return err
}
*codes = value
return nil
}
// MarshalCSV does the custom marshaling of Codes.
func (codes Codes) MarshalCSV() (string, error) {
return codes.String(), nil
}
// CodesFromString parses the list of codes into a Codes.
func CodesFromString(s string) (codes Codes, err error) {
for _, segment := range strings.Split(s, ":") {
if len(segment) == 0 {
// ignore empty segments
continue
}
code, err := CodeFromString(segment)
if err != nil {
return nil, err
}
codes = append(codes, code)
}
return codes, nil
}

View File

@ -0,0 +1,11 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package compensation
import "github.com/zeebo/errs"
var (
// Error wraps common errors from this package
Error = errs.Class("compensation")
)

View File

@ -0,0 +1,60 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package compensation
import (
"strconv"
"strings"
)
// Config contains configuration for the calculations this package performs.
type Config struct {
Rates struct {
AtRestGBHours Rate `user:"true" help:"rate for data at rest per GB/hour" default:"0.00000205"`
GetTB Rate `user:"true" help:"rate for egress bandwidth per TB" default:"20.00"`
PutTB Rate `user:"true" help:"rate for ingress bandwidth per TB" default:"0"`
GetRepairTB Rate `user:"true" help:"rate for repair egress bandwidth per TB" default:"10.00"`
PutRepairTB Rate `user:"true" help:"rate for repair ingress bandwidth per TB" default:"0"`
GetAuditTB Rate `user:"true" help:"rate for audit egress bandwidth per TB" default:"10.00"`
}
WithheldPercents Percents `user:"true" help:"comma separated monthly withheld percentage rates" default:"75,75,75,50,50,50,25,25,25,0,0,0,0,0,0"`
DisposePercent int `user:"true" help:"percent of held amount disposed to node after leaving withheld" default:"50"`
}
// Percents is used to hold a list of percentages, typically for the withheld schedule.
type Percents []int
// String formats the percentages.
func (percents Percents) String() string {
s := make([]string, 0, len(percents))
for _, percent := range percents {
s = append(s, strconv.FormatInt(int64(percent), 10))
}
return strings.Join(s, ",")
}
// Set implements pflag.Value by parsing a comma separated list of percents
func (percents *Percents) Set(value string) error {
var entries []string
if value != "" {
entries = strings.Split(value, ",")
}
var toSet []int
for _, entry := range entries {
percent, err := strconv.ParseInt(entry, 10, 0)
if err != nil {
return Error.New("invalid percent %q: %w", entry, err)
}
toSet = append(toSet, int(percent))
}
*percents = toSet
return nil
}
// Type returns the type of the pflag.Value
func (percents Percents) Type() string {
return "percents"
}

View File

@ -0,0 +1,61 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package compensation
import (
"time"
"storj.io/common/storj"
)
// NodeID is a wrapper type around storj.NodeID that implements CSV helpers.
type NodeID storj.NodeID
// Bytes calls the underlying type's Bytes function.
func (nodeID NodeID) Bytes() []byte {
return storj.NodeID(nodeID).Bytes()
}
// String calls the underlying type's String function.
func (nodeID NodeID) String() string {
return storj.NodeID(nodeID).String()
}
// UnmarshalCSV reads the csv entry into a storj.NodeID.
func (nodeID *NodeID) UnmarshalCSV(s string) error {
v, err := storj.NodeIDFromString(s)
if err != nil {
return err
}
*nodeID = NodeID(v)
return nil
}
// MarshalCSV writes the storj.NodeID into a CSV entry.
func (nodeID NodeID) MarshalCSV() (string, error) {
return nodeID.String(), nil
}
// UTCDate is a wrapper type around time.Time that implements CSV helpers.
type UTCDate time.Time
// String formats the date into YYYY-MM-DD.
func (date UTCDate) String() string {
return time.Time(date).Format("2006-01-02")
}
// UnmarshalCSV reads the YYYY-MM-DD date into the date.
func (date *UTCDate) UnmarshalCSV(s string) error {
v, err := time.Parse("2006-01-02", s)
if err != nil {
return err
}
*date = UTCDate(v)
return nil
}
// MarshalCSV writes out a CSV row containing the YYYY-MM-DD of the time.
func (date UTCDate) MarshalCSV() (string, error) {
return date.String(), nil
}

View File

@ -0,0 +1,32 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package compensation
import (
"context"
"storj.io/storj/pkg/storj"
"storj.io/storj/private/currency"
)
// WithheldAmounts holds the amounts held and disposed.
type WithheldAmounts struct {
TotalHeld currency.MicroUnit
TotalDisposed currency.MicroUnit
}
// DB is the interface we need to source the data to calculate compensation.
type DB interface {
// QueryWithheldAmounts queries the WithheldAmounts for the given nodeID.
QueryWithheldAmounts(ctx context.Context, nodeID storj.NodeID) (WithheldAmounts, error)
// QueryPaidInYear returns the total amount paid to the nodeID in the provided year.
QueryPaidInYear(ctx context.Context, nodeID storj.NodeID, year int) (currency.MicroUnit, error)
// RecordPeriod records a set of paystubs and payments for some time period.
RecordPeriod(ctx context.Context, paystubs []Paystub, payments []Payment) error
// RecordPayments records one off individual payments.
RecordPayments(ctx context.Context, payments []Payment) error
}

View File

@ -0,0 +1,31 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package compensation
import (
"time"
"github.com/shopspring/decimal"
)
var (
oneHundred = decimal.NewFromInt(100)
)
// NodeWithheldPercent returns the percentage that should be withheld and if the node is still
// in the withholding period based on its creation date.
func NodeWithheldPercent(withheldPercents []int, nodeCreatedAt, endDate time.Time) (int, bool) {
for i, withheldPercent := range withheldPercents {
if nodeCreatedAt.AddDate(0, i+1, 0).After(endDate) {
return withheldPercent, true
}
}
return 0, false
}
// PercentOf sets v to a percentage of itself. For example if v was 200 and
// percent was 20, v would be set to 40.
func PercentOf(v, percent decimal.Decimal) decimal.Decimal {
return v.Mul(percent).Div(oneHundred)
}

View File

@ -0,0 +1,65 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package compensation_test
import (
"testing"
"time"
"github.com/shopspring/decimal"
"github.com/stretchr/testify/assert"
"storj.io/storj/satellite/compensation"
)
func TestNodeWithheldPercent(t *testing.T) {
date := func(year int, month time.Month, day int) time.Time {
return time.Date(year, month, day, 0, 0, 0, 0, time.UTC)
}
rates := []int{75, 75, 75, 50, 50, 50, 25, 25, 25, 0, 0, 0, 0, 0, 0}
startDate := date(2019, 1, 1)
for _, tt := range []struct {
rate int
inWithholding bool
date time.Time
}{
{rate: 75, inWithholding: true, date: startDate},
{rate: 75, inWithholding: true, date: date(2019, 2, 1)},
{rate: 75, inWithholding: true, date: date(2019, 3, 1)},
{rate: 75, inWithholding: true, date: date(2019, 3, 31)},
{rate: 50, inWithholding: true, date: date(2019, 4, 1)},
{rate: 50, inWithholding: true, date: date(2019, 5, 1)},
{rate: 50, inWithholding: true, date: date(2019, 6, 1)},
{rate: 50, inWithholding: true, date: date(2019, 6, 30)},
{rate: 25, inWithholding: true, date: date(2019, 7, 1)},
{rate: 25, inWithholding: true, date: date(2019, 8, 1)},
{rate: 25, inWithholding: true, date: date(2019, 9, 1)},
{rate: 25, inWithholding: true, date: date(2019, 9, 30)},
{rate: 00, inWithholding: true, date: date(2019, 10, 1)},
{rate: 00, inWithholding: true, date: date(2019, 11, 1)},
{rate: 00, inWithholding: true, date: date(2019, 12, 1)},
{rate: 00, inWithholding: true, date: date(2020, 1, 1)},
{rate: 00, inWithholding: true, date: date(2020, 2, 1)},
{rate: 00, inWithholding: true, date: date(2020, 3, 1)},
{rate: 00, inWithholding: true, date: date(2020, 3, 31)},
{rate: 00, inWithholding: false, date: date(2020, 4, 1)},
} {
t.Logf("rate=%d inWithholding=%t date=%s", tt.rate, tt.inWithholding, tt.date.Format("2006-01"))
rate, inWithholding := compensation.NodeWithheldPercent(rates, startDate, tt.date)
assert.Equal(t, tt.rate, rate)
assert.Equal(t, tt.inWithholding, inWithholding)
}
}
func TestPercentOf(t *testing.T) {
percentOf := func(v, p int64) int64 {
return compensation.PercentOf(decimal.NewFromInt(v), decimal.NewFromInt(p)).IntPart()
}
assert.Equal(t, int64(40), percentOf(200, 20))
assert.Equal(t, int64(0), percentOf(200, 0))
assert.Equal(t, int64(600), percentOf(200, 300))
}

View File

@ -0,0 +1,97 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package compensation
import (
"io"
"storj.io/common/strictcsv"
"storj.io/storj/private/currency"
)
// Invoice holds the calculations for the amount required to pay to a node
// for a given pay period.
type Invoice struct {
Period Period `csv:"period"` // The payment period
NodeID NodeID `csv:"node-id"` // The node ID
NodeCreatedAt UTCDate `csv:"node-created-at"` // When the node was created
NodeDisqualified *UTCDate `csv:"node-disqualified"` // When and if the node was disqualified
NodeGracefulExit *UTCDate `csv:"node-gracefulexit"` // When and if the node finished a graceful exit
NodeWallet string `csv:"node-wallet"` // The node's wallet address
NodeAddress string `csv:"node-address"` // The node's TODO
NodeLastIP string `csv:"node-last-ip"` // The last known ip the node had
Codes Codes `csv:"codes"` // Any codes providing context to the invoice
UsageAtRest float64 `csv:"usage-at-rest"` // Byte-hours provided during the payment period
UsageGet int64 `csv:"usage-get"` // Number of bytes served in GET requests
UsagePut int64 `csv:"usage-put"` // Number of bytes served in PUT requests
UsageGetRepair int64 `csv:"usage-get-repair"` // Number of bytes served in GET_REPAIR requests
UsagePutRepair int64 `csv:"usage-put-repair"` // Number of bytes served in PUT_REPAIR requests
UsageGetAudit int64 `csv:"usage-get-audit"` // Number of bytes served in GET_AUDIT requests
CompAtRest currency.MicroUnit `csv:"comp-at-rest"` // Compensation for usage-at-rest
CompGet currency.MicroUnit `csv:"comp-get"` // Compensation for usage-get
CompPut currency.MicroUnit `csv:"comp-put"` // Compensation for usage-put
CompGetRepair currency.MicroUnit `csv:"comp-get-repair"` // Compensation for usage-get-repair
CompPutRepair currency.MicroUnit `csv:"comp-put-repair"` // Compensation for usage-put-repair
CompGetAudit currency.MicroUnit `csv:"comp-get-audit"` // Compensation for usage-get-audit
SurgePercent int `csv:"surge-percent"` // Surge percent used to calculate compensation, or 0 if no surge
Owed currency.MicroUnit `csv:"owed"` // Amount we intend to pay to the node (sum(comp-*) - held + disposed)
Held currency.MicroUnit `csv:"held"` // Amount held from sum(comp-*) for this period
Disposed currency.MicroUnit `csv:"disposed"` // Amount of owed that is due to graceful-exit or held period ending
TotalHeld currency.MicroUnit `csv:"total-held"` // Total amount ever held from the node
TotalDisposed currency.MicroUnit `csv:"total-disposed"` // Total amount ever disposed to the node
PaidYTD currency.MicroUnit `csv:"paid-ytd"` // Total amount paid so far this year (not including this period)
}
// MergeNodeInfo updates the fields representing the node information into the invoice.
func (invoice *Invoice) MergeNodeInfo(nodeInfo NodeInfo) error {
if invoice.NodeID != NodeID(nodeInfo.ID) {
return Error.New("node ID mismatch (invoice=%q nodeinfo=%q)", invoice.NodeID, nodeInfo.ID)
}
invoice.NodeCreatedAt = UTCDate(nodeInfo.CreatedAt)
invoice.NodeDisqualified = (*UTCDate)(nodeInfo.Disqualified)
invoice.NodeGracefulExit = (*UTCDate)(nodeInfo.GracefulExit)
invoice.UsageAtRest = nodeInfo.UsageAtRest
invoice.UsageGet = nodeInfo.UsageGet
invoice.UsagePut = nodeInfo.UsagePut
invoice.UsageGetRepair = nodeInfo.UsageGetRepair
invoice.UsagePutRepair = nodeInfo.UsagePutRepair
invoice.UsageGetAudit = nodeInfo.UsageGetAudit
invoice.TotalHeld = nodeInfo.TotalHeld
invoice.TotalDisposed = nodeInfo.TotalDisposed
return nil
}
// MergeStatement updates the fields representing the calculation of the payment amounts
// into the invoice.
func (invoice *Invoice) MergeStatement(statement Statement) error {
if invoice.NodeID != NodeID(statement.NodeID) {
return Error.New("node ID mismatch (invoice=%q statement=%q)", invoice.NodeID, statement.NodeID)
}
invoice.Codes = statement.Codes
invoice.CompAtRest = statement.AtRest
invoice.CompGet = statement.Get
invoice.CompPut = statement.Put
invoice.CompGetRepair = statement.GetRepair
invoice.CompPutRepair = statement.PutRepair
invoice.CompGetAudit = statement.GetAudit
invoice.SurgePercent = statement.SurgePercent
invoice.Owed = statement.Owed
invoice.Held = statement.Held
invoice.Disposed = statement.Disposed
return nil
}
// ReadInvoices reads a collection of Invoice values in CSV form.
func ReadInvoices(r io.Reader) ([]Invoice, error) {
var invoices []Invoice
if err := strictcsv.Read(r, &invoices); err != nil {
return nil, err
}
return invoices, nil
}
// WriteInvoices writes a collection of Invoice values in CSV form.
func WriteInvoices(w io.Writer, invoices []Invoice) error {
return strictcsv.Write(w, invoices)
}

View File

@ -0,0 +1,46 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package compensation
import (
"io"
"os"
"storj.io/common/strictcsv"
"storj.io/storj/private/currency"
)
// Payment represents an actual payment that happened.
type Payment struct {
Period Period `csv:"period"`
NodeID NodeID `csv:"node-id"`
Amount currency.MicroUnit `csv:"amount"`
Receipt *string `csv:"receipt"`
Notes *string `csv:"notes"`
}
// LoadPayments loads a collection of Payments from a file on disk containing
// them in CSV form.
func LoadPayments(path string) ([]Payment, error) {
f, err := os.Open(path)
if err != nil {
return nil, Error.Wrap(err)
}
defer func() { _ = f.Close() }()
return ReadPayments(f)
}
// ReadPayments reads a collection of Payments in CSV form.
func ReadPayments(r io.Reader) ([]Payment, error) {
var payments []Payment
if err := strictcsv.Read(r, &payments); err != nil {
return nil, err
}
return payments, nil
}
// WritePayments writes a collection of payments in CSV form.
func WritePayments(w io.Writer, payments []Payment) error {
return strictcsv.Write(w, payments)
}

View File

@ -0,0 +1,55 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package compensation
import (
"io"
"os"
"storj.io/common/strictcsv"
"storj.io/storj/private/currency"
)
// Paystub contains the basic information about a payment that is to be made.
type Paystub struct {
Period Period `csv:"period"`
NodeID NodeID `csv:"node-id"`
Codes Codes `csv:"codes"`
UsageAtRest float64 `csv:"usage-at-rest"`
UsageGet int64 `csv:"usage-get"`
UsagePut int64 `csv:"usage-put"`
UsageGetRepair int64 `csv:"usage-get-repair"`
UsagePutRepair int64 `csv:"usage-put-repair"`
UsageGetAudit int64 `csv:"usage-get-audit"`
CompAtRest currency.MicroUnit `csv:"comp-at-rest"`
CompGet currency.MicroUnit `csv:"comp-get"`
CompPut currency.MicroUnit `csv:"comp-put"`
CompGetRepair currency.MicroUnit `csv:"comp-get-repair"`
CompPutRepair currency.MicroUnit `csv:"comp-put-repair"`
CompGetAudit currency.MicroUnit `csv:"comp-get-audit"`
SurgePercent int64 `csv:"surge-percent"`
Owed currency.MicroUnit `csv:"owed"`
Held currency.MicroUnit `csv:"held"`
Disposed currency.MicroUnit `csv:"disposed"`
Paid currency.MicroUnit `csv:"paid"`
}
// LoadPaystubs loads a collection of Paystubs in CSV form from the provided file.
func LoadPaystubs(path string) ([]Paystub, error) {
f, err := os.Open(path)
if err != nil {
return nil, Error.Wrap(err)
}
defer func() { _ = f.Close() }()
return ReadPaystubs(f)
}
// ReadPaystubs reads a collection of Paystubs in CSV form.
func ReadPaystubs(r io.Reader) ([]Paystub, error) {
var paystubs []Paystub
if err := strictcsv.Read(r, &paystubs); err != nil {
return nil, err
}
return paystubs, nil
}

View File

@ -0,0 +1,63 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package compensation
import (
"fmt"
"time"
)
// Period represents a monthly payment period.
type Period struct {
Year int
Month time.Month
}
// String outputs the YYYY-MM form of the payment period.
func (p Period) String() string {
return fmt.Sprintf("%04d-%02d", p.Year, p.Month)
}
// StartDate returns a time.Time that is less than or equal to any time in the period.
func (p Period) StartDate() time.Time {
return time.Date(p.Year, p.Month, 1, 0, 0, 0, 0, time.UTC)
}
// EndDateExclusive returns a time.Time that is greater than any time in the period.
func (p Period) EndDateExclusive() time.Time {
return time.Date(p.Year, p.Month+1, 1, 0, 0, 0, 0, time.UTC)
}
// UnmarshalCSV reads the Period in CSV form.
func (p *Period) UnmarshalCSV(s string) error {
v, err := PeriodFromString(s)
if err != nil {
return err
}
*p = v
return nil
}
// MarshalCSV returns the CSV form of the Period.
func (p Period) MarshalCSV() (string, error) {
return p.String(), nil
}
// PeriodFromString parses the YYYY-MM string into a Period.
func PeriodFromString(s string) (Period, error) {
t, err := time.Parse("2006-01", s)
if err != nil {
return Period{}, Error.Wrap(err)
}
return PeriodFromTime(t), nil
}
// PeriodFromTime takes a time.Time and returns a Period that contains it.
func PeriodFromTime(t time.Time) Period {
year, month, _ := t.UTC().Date()
return Period{
Year: year,
Month: month,
}
}

View File

@ -0,0 +1,42 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package compensation
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
)
func TestPeriod(t *testing.T) {
for _, tt := range []struct {
year int
month time.Month
startDate string
endDate string
days int
}{
{year: 2019, month: 1, startDate: "2019-01-01", endDate: "2019-02-01", days: 31},
{year: 2019, month: 2, startDate: "2019-02-01", endDate: "2019-03-01", days: 28},
{year: 2019, month: 3, startDate: "2019-03-01", endDate: "2019-04-01", days: 31},
{year: 2019, month: 4, startDate: "2019-04-01", endDate: "2019-05-01", days: 30},
{year: 2019, month: 5, startDate: "2019-05-01", endDate: "2019-06-01", days: 31},
{year: 2019, month: 6, startDate: "2019-06-01", endDate: "2019-07-01", days: 30},
{year: 2019, month: 7, startDate: "2019-07-01", endDate: "2019-08-01", days: 31},
{year: 2019, month: 8, startDate: "2019-08-01", endDate: "2019-09-01", days: 31},
{year: 2019, month: 9, startDate: "2019-09-01", endDate: "2019-10-01", days: 30},
{year: 2019, month: 10, startDate: "2019-10-01", endDate: "2019-11-01", days: 31},
{year: 2019, month: 11, startDate: "2019-11-01", endDate: "2019-12-01", days: 30},
{year: 2019, month: 12, startDate: "2019-12-01", endDate: "2020-01-01", days: 31},
// leap year/month
{year: 2020, month: 2, startDate: "2020-02-01", endDate: "2020-03-01", days: 29},
} {
t.Logf("year:%d month:%d startDate:%s endDate:%s days:%d", tt.year, tt.month, tt.startDate, tt.endDate, tt.days)
period := Period{Year: tt.year, Month: tt.month}
assert.Equal(t, tt.startDate, period.StartDate().Format("2006-01-02"))
assert.Equal(t, tt.endDate, period.EndDateExclusive().Format("2006-01-02"))
}
}

View File

@ -0,0 +1,58 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package compensation
import (
"github.com/shopspring/decimal"
"github.com/spf13/pflag"
)
// Rates configures the payment rates for network operations.
type Rates struct {
AtRestGBHours Rate // For data at rest in dollars per gigabyte-hour.
GetTB Rate // For data the node has sent for reads in dollars per terabyte.
PutTB Rate // For data the node has received for writes in dollars per terabyte.
GetRepairTB Rate // For data the node has sent for repairs in dollars per terabyte.
PutRepairTB Rate // For data the node has received for repairs in dollars per terabyte.
GetAuditTB Rate // For data the node has sent for audits in dollars per terabyte.
}
// Rate is a wrapper type around a decimal.Decimal.
type Rate decimal.Decimal
var _ pflag.Value = (*Rate)(nil)
// RateFromString parses the string form of the rate into a Rate.
func RateFromString(value string) (Rate, error) {
r, err := decimal.NewFromString(value)
if err != nil {
return Rate{}, err
}
return Rate(r), nil
}
// String returns the string form of the Rate.
func (rate Rate) String() string {
return decimal.Decimal(rate).String()
}
// Set updates the Rate to be equal to the parsed string.
func (rate *Rate) Set(s string) error {
r, err := decimal.NewFromString(s)
if err != nil {
return err
}
*rate = Rate(r)
return nil
}
// Type returns a unique string representing the type of the Rate.
func (rate Rate) Type() string {
return "rate"
}
// RequireRateFromString parses the Rate from the string or panics.
func RequireRateFromString(s string) Rate {
return Rate(decimal.RequireFromString(s))
}

View File

@ -0,0 +1,220 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package compensation
import (
"time"
"github.com/shopspring/decimal"
"github.com/zeebo/errs"
"storj.io/common/storj"
"storj.io/storj/private/currency"
)
var (
gb = decimal.NewFromInt(1e9)
tb = decimal.NewFromInt(1e12)
)
var (
// DefaultWithheldPercents contains the standard withholding schedule.
DefaultWithheldPercents = []int{75, 75, 75, 50, 50, 50, 25, 25, 25}
// DefaultRates contains the standard operation rates.
DefaultRates = Rates{
AtRestGBHours: RequireRateFromString("0.00000205"), // $1.50/TB at rest
GetTB: RequireRateFromString("20.00"), // $20.00/TB
PutTB: RequireRateFromString("0.00"),
GetRepairTB: RequireRateFromString("10.00"), // $10.00/TB
PutRepairTB: RequireRateFromString("0.00"),
GetAuditTB: RequireRateFromString("10.0"), // $10.00/TB
}
)
// NodeInfo contains all of the information about a node and the operations
// it performed in some period.
type NodeInfo struct {
ID storj.NodeID
CreatedAt time.Time
Disqualified *time.Time
GracefulExit *time.Time
UsageAtRest float64
UsageGet int64
UsagePut int64
UsageGetRepair int64
UsagePutRepair int64
UsageGetAudit int64
TotalHeld currency.MicroUnit
TotalDisposed currency.MicroUnit
}
// Statement is the computed amounts and codes from a node.
type Statement struct {
NodeID storj.NodeID
Codes Codes
AtRest currency.MicroUnit
Get currency.MicroUnit
Put currency.MicroUnit
GetRepair currency.MicroUnit
PutRepair currency.MicroUnit
GetAudit currency.MicroUnit
SurgePercent int
Owed currency.MicroUnit
Held currency.MicroUnit
Disposed currency.MicroUnit
}
// PeriodInfo contains configuration about the payment info to generate
// the statements.
type PeriodInfo struct {
// Period is the period.
Period Period
// Nodes is usage and other related information for nodes for this period.
Nodes []NodeInfo
// Rates is the compensation rates for different operations. If nil, the
// default rates are used.
Rates *Rates
// WithheldPercents is the percent to withhold from the total, after surge
// adjustments, for each month in the node's lifetime. For example, to
// withhold 75% in the first month, 50% in the second month, 0% in the third
// month and to leave withheld thereafter, set to [75,50,0]. If nil,
// DefaultWithheldPercents is used.
WithheldPercents []int
// DisposePercent is the percent to dispose to the node after it has left
// withholding. The remaining amount is kept until the node performs a graceful
// exit.
DisposePercent int
// SurgePercent is the percent to adjust final amounts owed. For example,
// to pay 150%, set to 150. Zero means no surge.
SurgePercent int
}
// GenerateStatements generates all of the Statements for the given PeriodInfo.
func GenerateStatements(info PeriodInfo) ([]Statement, error) {
endDate := info.Period.EndDateExclusive()
rates := info.Rates
if rates == nil {
rates = &DefaultRates
}
withheldPercents := info.WithheldPercents
if withheldPercents == nil {
withheldPercents = DefaultWithheldPercents
}
surgePercent := decimal.NewFromInt(int64(info.SurgePercent))
disposePercent := decimal.NewFromInt(int64(info.DisposePercent))
// Intermediate calculations (especially at-rest related) can overflow an
// int64 so we need to use arbitrary precision fixed point math. The final
// calculations should fit comfortably into an int64. If not, it means
// we're trying to pay somebody more than 9,223,372,036,854,775,807
// micro-units (e.g. $9,223,372,036,854 dollars).
statements := make([]Statement, 0, len(info.Nodes))
for _, node := range info.Nodes {
var codes []Code
atRest := decimal.NewFromFloat(node.UsageAtRest).
Mul(decimal.Decimal(rates.AtRestGBHours)).
Div(gb)
get := decimal.NewFromInt(node.UsageGet).
Mul(decimal.Decimal(rates.GetTB)).
Div(tb)
put := decimal.NewFromInt(node.UsagePut).
Mul(decimal.Decimal(rates.PutTB)).
Div(tb)
getRepair := decimal.NewFromInt(node.UsageGetRepair).
Mul(decimal.Decimal(rates.GetRepairTB)).
Div(tb)
putRepair := decimal.NewFromInt(node.UsagePutRepair).
Mul(decimal.Decimal(rates.PutRepairTB)).
Div(tb)
getAudit := decimal.NewFromInt(node.UsageGetAudit).
Mul(decimal.Decimal(rates.GetAuditTB)).
Div(tb)
total := decimal.Sum(atRest, get, put, getRepair, putRepair, getAudit)
if info.SurgePercent > 0 {
total = PercentOf(total, surgePercent)
}
gracefullyExited := node.GracefulExit != nil && node.GracefulExit.Before(endDate)
if gracefullyExited {
codes = append(codes, GracefulExit)
}
withheldPercent, inWithholding := NodeWithheldPercent(withheldPercents, node.CreatedAt, endDate)
held := PercentOf(total, decimal.NewFromInt(int64(withheldPercent)))
owed := total.Sub(held)
if inWithholding {
codes = append(codes, InWithholding)
}
var disposed decimal.Decimal
if !inWithholding || gracefullyExited {
// The storage node is out of withholding. Determine how much should be
// disposed from withheld back to the storage node.
disposed = node.TotalHeld.Decimal()
if !gracefullyExited {
disposed = PercentOf(disposed, disposePercent)
} else { // if it's a graceful exit, don't withhold anything
owed = owed.Add(held)
held = decimal.Zero
}
disposed = disposed.Sub(node.TotalDisposed.Decimal())
if disposed.Sign() < 0 {
// We've disposed more than we should have according to the
// percent. Don't dispose any more.
disposed = decimal.Zero
}
owed = owed.Add(disposed)
}
// If the node is disqualified nothing is owed/held/disposed.
if node.Disqualified != nil && node.Disqualified.Before(endDate) && !gracefullyExited {
codes = append(codes, Disqualified)
disposed = decimal.Zero
held = decimal.Zero
owed = decimal.Zero
}
var overflowErrs errs.Group
toMicroUnit := func(v decimal.Decimal) currency.MicroUnit {
m, err := currency.MicroUnitFromDecimal(v)
if err != nil {
overflowErrs.Add(err)
return currency.MicroUnit{}
}
return m
}
statement := Statement{
NodeID: node.ID,
Codes: codes,
AtRest: toMicroUnit(atRest),
Get: toMicroUnit(get),
Put: toMicroUnit(put),
GetRepair: toMicroUnit(getRepair),
PutRepair: toMicroUnit(putRepair),
GetAudit: toMicroUnit(getAudit),
SurgePercent: info.SurgePercent,
Owed: toMicroUnit(owed),
Held: toMicroUnit(held),
Disposed: toMicroUnit(disposed),
}
if err := overflowErrs.Err(); err != nil {
return nil, Error.New("currency overflows encountered while calculating payment for node %s", statement.NodeID.String())
}
statements = append(statements, statement)
}
return statements, nil
}

View File

@ -0,0 +1,182 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package compensation_test
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"storj.io/common/testrand"
"storj.io/storj/private/currency"
"storj.io/storj/satellite/compensation"
)
// D returns a MicroUnit representing the amount in dollars. It is in general not
// useful because it accepts a float, but makes it easier to express natual units
// in tests.
func D(v float64) currency.MicroUnit { return currency.NewMicroUnit(int64(v * 1e6)) }
func TestGenerateStatements(t *testing.T) {
const (
GB = 1_000_000_000
TB = 1_000_000_000_000
)
rates := compensation.Rates{
AtRestGBHours: compensation.RequireRateFromString("2"),
GetTB: compensation.RequireRateFromString("3"),
PutTB: compensation.RequireRateFromString("5"),
GetRepairTB: compensation.RequireRateFromString("7"),
PutRepairTB: compensation.RequireRateFromString("11"),
GetAuditTB: compensation.RequireRateFromString("13"),
}
// 50 percent withheld the first month
withheldPercents := []int{50}
// 60 percent disposed after leaving withheld and before graceful exit
disposePercent := 60
nodeID := testrand.NodeID()
for _, tt := range []struct {
name string
surgePercent int
node compensation.NodeInfo
statement compensation.Statement
}{
{
name: "within withholding",
surgePercent: 0,
node: compensation.NodeInfo{
ID: nodeID,
CreatedAt: time.Date(2019, 11, 2, 0, 0, 0, 0, time.UTC),
UsageAtRest: 1 * GB,
UsageGet: 2 * TB,
UsagePut: 3 * TB,
UsageGetRepair: 4 * TB,
UsagePutRepair: 5 * TB,
UsageGetAudit: 6 * TB,
},
statement: compensation.Statement{
NodeID: nodeID,
Codes: compensation.Codes{compensation.InWithholding},
AtRest: D(2),
Get: D(6),
Put: D(15),
GetRepair: D(28),
PutRepair: D(55),
GetAudit: D(78),
Owed: D(92),
Held: D(92),
Disposed: D(0),
},
},
{
name: "just out of withheld",
surgePercent: 0,
node: compensation.NodeInfo{
ID: nodeID,
CreatedAt: time.Date(2019, 11, 1, 0, 0, 0, 0, time.UTC),
UsageAtRest: 1 * GB,
UsageGet: 2 * TB,
UsagePut: 3 * TB,
UsageGetRepair: 4 * TB,
UsagePutRepair: 5 * TB,
UsageGetAudit: 6 * TB,
TotalHeld: D(40),
},
statement: compensation.Statement{
NodeID: nodeID,
AtRest: D(2),
Get: D(6),
Put: D(15),
GetRepair: D(28),
PutRepair: D(55),
GetAudit: D(78),
Owed: D(184 + 24), // 184 for usage, 24 disposed from withheld
Held: D(0),
Disposed: D(24),
},
},
{
name: "out of withheld and already disposed",
surgePercent: 0,
node: compensation.NodeInfo{
ID: nodeID,
CreatedAt: time.Date(2019, 6, 12, 0, 0, 0, 0, time.UTC),
UsageAtRest: 1 * GB,
UsageGet: 2 * TB,
UsagePut: 3 * TB,
UsageGetRepair: 4 * TB,
UsagePutRepair: 5 * TB,
UsageGetAudit: 6 * TB,
TotalHeld: D(40),
TotalDisposed: D(24),
},
statement: compensation.Statement{
NodeID: nodeID,
AtRest: D(2),
Get: D(6),
Put: D(15),
GetRepair: D(28),
PutRepair: D(55),
GetAudit: D(78),
Owed: D(184),
Held: D(0),
Disposed: D(0),
},
},
{
name: "graceful exit within period",
surgePercent: 0,
node: compensation.NodeInfo{
ID: nodeID,
CreatedAt: time.Date(2018, 6, 12, 0, 0, 0, 0, time.UTC),
GracefulExit: timePtr(time.Date(2019, 11, 30, 23, 59, 59, 0, time.UTC)),
UsageAtRest: 1 * GB,
UsageGet: 2 * TB,
UsagePut: 3 * TB,
UsageGetRepair: 4 * TB,
UsagePutRepair: 5 * TB,
UsageGetAudit: 6 * TB,
TotalHeld: D(40),
TotalDisposed: D(24),
},
statement: compensation.Statement{
NodeID: nodeID,
Codes: compensation.Codes{compensation.GracefulExit},
AtRest: D(2),
Get: D(6),
Put: D(15),
GetRepair: D(28),
PutRepair: D(55),
GetAudit: D(78),
Owed: D(184 + 16),
Held: D(0),
Disposed: D(16),
},
},
} {
tt := tt
t.Run(tt.name, func(t *testing.T) {
statements, err := compensation.GenerateStatements(compensation.PeriodInfo{
Period: compensation.Period{Year: 2019, Month: 11},
Nodes: []compensation.NodeInfo{tt.node},
SurgePercent: tt.surgePercent,
Rates: &rates,
WithheldPercents: withheldPercents,
DisposePercent: disposePercent,
})
require.NoError(t, err)
assert.Equal(t, []compensation.Statement{tt.statement}, statements)
})
}
}
func timePtr(t time.Time) *time.Time {
return &t
}

View File

@ -20,6 +20,7 @@ import (
"storj.io/storj/satellite/admin"
"storj.io/storj/satellite/attribution"
"storj.io/storj/satellite/audit"
"storj.io/storj/satellite/compensation"
"storj.io/storj/satellite/console"
"storj.io/storj/satellite/console/consoleweb"
"storj.io/storj/satellite/contact"
@ -92,6 +93,8 @@ type DB interface {
DowntimeTracking() downtime.DB
// Heldamount returns database for heldamount.
HeldAmount() heldamount.DB
// Compoensation tracks storage node compensation
Compensation() compensation.DB
}
// Config is the global config satellite
@ -138,4 +141,6 @@ type Config struct {
Metrics metrics.Config
Downtime downtime.Config
Compensation compensation.Config
}

View File

@ -0,0 +1,140 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package satellitedb
import (
"context"
"fmt"
"storj.io/common/storj"
"storj.io/storj/private/currency"
"storj.io/storj/satellite/compensation"
"storj.io/storj/satellite/satellitedb/dbx"
)
type compensationDB struct {
db *satelliteDB
}
func (comp *compensationDB) QueryPaidInYear(ctx context.Context, nodeID storj.NodeID, year int) (totalPaid currency.MicroUnit, err error) {
defer mon.Task()(&ctx)(&err)
start := fmt.Sprintf("%04d-01", year)
endExclusive := fmt.Sprintf("%04d-01", year+1)
stmt := comp.db.Rebind(`
SELECT
coalesce(SUM(amount), 0) AS sum_paid
FROM
storagenode_payments
WHERE
node_id = ?
AND
period >= ? AND period < ?
`)
var sumPaid int64
if err := comp.db.DB.QueryRow(ctx, stmt, nodeID, start, endExclusive).Scan(&sumPaid); err != nil {
return currency.Zero, Error.Wrap(err)
}
return currency.NewMicroUnit(sumPaid), nil
}
// QueryWithheldAmounts returns withheld data for the given node
func (comp *compensationDB) QueryWithheldAmounts(ctx context.Context, nodeID storj.NodeID) (_ compensation.WithheldAmounts, err error) {
defer mon.Task()(&ctx)(&err)
stmt := comp.db.Rebind(`
SELECT
coalesce(SUM(held), 0) AS total_held,
coalesce(SUM(disposed), 0) AS total_disposed
FROM
storagenode_paystubs
WHERE
node_id = ?
`)
var totalHeld, totalDisposed int64
if err := comp.db.DB.QueryRow(ctx, stmt, nodeID).Scan(&totalHeld, &totalDisposed); err != nil {
return compensation.WithheldAmounts{}, Error.Wrap(err)
}
return compensation.WithheldAmounts{
TotalHeld: currency.NewMicroUnit(totalHeld),
TotalDisposed: currency.NewMicroUnit(totalDisposed),
}, nil
}
func (comp *compensationDB) RecordPeriod(ctx context.Context, paystubs []compensation.Paystub, payments []compensation.Payment) (err error) {
defer mon.Task()(&ctx)(&err)
return Error.Wrap(comp.db.WithTx(ctx, func(ctx context.Context, tx *dbx.Tx) error {
if err := recordPaystubs(ctx, tx, paystubs); err != nil {
return err
}
if err := recordPayments(ctx, tx, payments); err != nil {
return err
}
return nil
}))
}
func (comp *compensationDB) RecordPayments(ctx context.Context, payments []compensation.Payment) (err error) {
defer mon.Task()(&ctx)(&err)
return Error.Wrap(comp.db.WithTx(ctx, func(ctx context.Context, tx *dbx.Tx) error {
return recordPayments(ctx, tx, payments)
}))
}
func recordPaystubs(ctx context.Context, tx *dbx.Tx, paystubs []compensation.Paystub) error {
for _, paystub := range paystubs {
err := tx.CreateNoReturn_StoragenodePaystub(ctx,
dbx.StoragenodePaystub_Period(paystub.Period.String()),
dbx.StoragenodePaystub_NodeId(paystub.NodeID.Bytes()),
dbx.StoragenodePaystub_Codes(paystub.Codes.String()),
dbx.StoragenodePaystub_UsageAtRest(paystub.UsageAtRest),
dbx.StoragenodePaystub_UsageGet(paystub.UsageGet),
dbx.StoragenodePaystub_UsagePut(paystub.UsagePut),
dbx.StoragenodePaystub_UsageGetRepair(paystub.UsageGetRepair),
dbx.StoragenodePaystub_UsagePutRepair(paystub.UsagePutRepair),
dbx.StoragenodePaystub_UsageGetAudit(paystub.UsageGetAudit),
dbx.StoragenodePaystub_CompAtRest(paystub.CompAtRest.Value()),
dbx.StoragenodePaystub_CompGet(paystub.CompGet.Value()),
dbx.StoragenodePaystub_CompPut(paystub.CompPut.Value()),
dbx.StoragenodePaystub_CompGetRepair(paystub.CompGetRepair.Value()),
dbx.StoragenodePaystub_CompPutRepair(paystub.CompPutRepair.Value()),
dbx.StoragenodePaystub_CompGetAudit(paystub.CompGetAudit.Value()),
dbx.StoragenodePaystub_SurgePercent(paystub.SurgePercent),
dbx.StoragenodePaystub_Held(paystub.Held.Value()),
dbx.StoragenodePaystub_Owed(paystub.Owed.Value()),
dbx.StoragenodePaystub_Disposed(paystub.Disposed.Value()),
dbx.StoragenodePaystub_Paid(paystub.Paid.Value()),
)
if err != nil {
return err
}
}
return nil
}
func recordPayments(ctx context.Context, tx *dbx.Tx, payments []compensation.Payment) error {
for _, payment := range payments {
opts := dbx.StoragenodePayment_Create_Fields{}
if payment.Receipt != nil {
opts.Receipt = dbx.StoragenodePayment_Receipt(*payment.Receipt)
}
if payment.Notes != nil {
opts.Notes = dbx.StoragenodePayment_Notes(*payment.Notes)
}
err := tx.CreateNoReturn_StoragenodePayment(ctx,
dbx.StoragenodePayment_NodeId(payment.NodeID.Bytes()),
dbx.StoragenodePayment_Amount(payment.Amount.Value()),
opts,
)
if err != nil {
return err
}
}
return nil
}

View File

@ -16,6 +16,7 @@ import (
"storj.io/storj/satellite/accounting"
"storj.io/storj/satellite/attribution"
"storj.io/storj/satellite/audit"
"storj.io/storj/satellite/compensation"
"storj.io/storj/satellite/console"
"storj.io/storj/satellite/downtime"
"storj.io/storj/satellite/gracefulexit"
@ -182,3 +183,8 @@ func (db *satelliteDB) DowntimeTracking() downtime.DB {
func (db *satelliteDB) HeldAmount() heldamount.DB {
return &paymentStubs{db: db}
}
// Compenstation returns database for storage node compensation
func (db *satelliteDB) Compensation() compensation.DB {
return &compensationDB{db: db}
}

View File

@ -1086,6 +1086,7 @@ func convertDBNode(ctx context.Context, info *dbx.Node) (_ *overlay.NodeDossier,
exitStatus.ExitInitiatedAt = info.ExitInitiatedAt
exitStatus.ExitLoopCompletedAt = info.ExitLoopCompletedAt
exitStatus.ExitFinishedAt = info.ExitFinishedAt
exitStatus.ExitSuccess = info.ExitSuccess
node := &overlay.NodeDossier{
Node: pb.Node{

View File

@ -14,6 +14,7 @@ import (
"storj.io/common/storj"
"storj.io/storj/private/dbutil"
"storj.io/storj/satellite/accounting"
"storj.io/storj/satellite/compensation"
"storj.io/storj/satellite/satellitedb/dbx"
)
@ -173,8 +174,8 @@ func (db *StoragenodeAccounting) LastTimestamp(ctx context.Context, timestampTyp
func (db *StoragenodeAccounting) QueryPaymentInfo(ctx context.Context, start time.Time, end time.Time) (_ []*accounting.CSVRow, err error) {
defer mon.Task()(&ctx)(&err)
var sqlStmt = `SELECT n.id, n.created_at, r.at_rest_total, r.get_repair_total,
r.put_repair_total, r.get_audit_total, r.put_total, r.get_total, n.wallet, n.disqualified
FROM (
r.put_repair_total, r.get_audit_total, r.put_total, r.get_total, n.wallet, n.disqualified
FROM (
SELECT node_id, SUM(at_rest_total) AS at_rest_total, SUM(get_repair_total) AS get_repair_total,
SUM(put_repair_total) AS put_repair_total, SUM(get_audit_total) AS get_audit_total,
SUM(put_total) AS put_total, SUM(get_total) AS get_total
@ -183,7 +184,7 @@ func (db *StoragenodeAccounting) QueryPaymentInfo(ctx context.Context, start tim
GROUP BY node_id
) r
LEFT JOIN nodes n ON n.id = r.node_id
ORDER BY n.id`
ORDER BY n.id`
rows, err := db.db.DB.QueryContext(ctx, db.db.Rebind(sqlStmt), start.UTC(), end.UTC())
if err != nil {
@ -216,6 +217,60 @@ func (db *StoragenodeAccounting) QueryPaymentInfo(ctx context.Context, start tim
return csv, rows.Err()
}
// QueryStorageNodePeriodUsage returns usage invoices for nodes for a compensation period
func (db *StoragenodeAccounting) QueryStorageNodePeriodUsage(ctx context.Context, period compensation.Period) (_ []accounting.StorageNodePeriodUsage, err error) {
defer mon.Task()(&ctx)(&err)
stmt := db.db.Rebind(`
SELECT
node_id,
SUM(at_rest_total) AS at_rest_total,
SUM(get_total) AS get_total,
SUM(put_total) AS put_total,
SUM(get_repair_total) AS get_repair_total,
SUM(put_repair_total) AS put_repair_total,
SUM(get_audit_total) AS get_audit_total
FROM
accounting_rollups
WHERE
start_time >= ? AND start_time < ?
GROUP BY
node_id
ORDER BY
node_id ASC
`)
rows, err := db.db.DB.QueryContext(ctx, stmt, period.StartDate(), period.EndDateExclusive())
if err != nil {
return nil, Error.Wrap(err)
}
defer func() { err = errs.Combine(err, rows.Close()) }()
usages := []accounting.StorageNodePeriodUsage{}
for rows.Next() {
var nodeID []byte
usage := accounting.StorageNodePeriodUsage{}
if err := rows.Scan(
&nodeID,
&usage.AtRestTotal,
&usage.GetTotal,
&usage.PutTotal,
&usage.GetRepairTotal,
&usage.PutRepairTotal,
&usage.GetAuditTotal,
); err != nil {
return nil, Error.Wrap(err)
}
usage.NodeID, err = storj.NodeIDFromBytes(nodeID)
if err != nil {
return nil, Error.Wrap(err)
}
usages = append(usages, usage)
}
return usages, rows.Err()
}
// QueryStorageNodeUsage returns slice of StorageNodeUsage for given period
func (db *StoragenodeAccounting) QueryStorageNodeUsage(ctx context.Context, nodeID storj.NodeID, start time.Time, end time.Time) (_ []accounting.StorageNodeUsage, err error) {
defer mon.Task()(&ctx)(&err)

View File

@ -37,6 +37,30 @@
# override value for repair threshold
# checker.repair-override: 0
# percent of held amount disposed to node after leaving withheld
compensation.dispose-percent: 50
# rate for data at rest per GB/hour
compensation.rates.at-rest-gb-hours: "0.00000205"
# rate for audit egress bandwidth per TB
compensation.rates.get-audit-tb: "10"
# rate for repair egress bandwidth per TB
compensation.rates.get-repair-tb: "10"
# rate for egress bandwidth per TB
compensation.rates.get-tb: "20"
# rate for repair ingress bandwidth per TB
compensation.rates.put-repair-tb: "0"
# rate for ingress bandwidth per TB
compensation.rates.put-tb: "0"
# comma separated monthly withheld percentage rates
compensation.withheld-percents: 75,75,75,50,50,50,25,25,25,0,0,0,0,0,0
# server address of the graphql api gateway and frontend app
# console.address: :10100