satellite/payments: extend billing chore functionality to upgrade user

Added new observer for billing chore to check user's balance and upgrade their account if balance is more than or equal to needed amount for upgrade.
Added new config value which stands for needed amount of base units of US micro dollars needed to upgrade user.

Issue:
https://github.com/storj/storj/issues/5978

Change-Id: Ic3992cd3114397bfdd9e231ca090ff21ca66648b
This commit is contained in:
Vitalii 2023-07-12 18:16:51 +03:00
parent 0303920da7
commit 2ee0195eba
7 changed files with 248 additions and 10 deletions

View File

@ -0,0 +1,89 @@
// Copyright (C) 2023 Storj Labs, Inc.
// See LICENSE for copying information.
package console
import (
"context"
"storj.io/common/memory"
"storj.io/storj/satellite/payments/billing"
)
var _ billing.Observer = (*UpgradeUserObserver)(nil)
// UpgradeUserObserver used to upgrade user if their balance is more than $10 after confirmed token transaction.
type UpgradeUserObserver struct {
consoleDB DB
transactionsDB billing.TransactionsDB
usageLimitsConfig UsageLimitsConfig
userBalanceForUpgrade int64
}
// NewUpgradeUserObserver creates new observer instance.
func NewUpgradeUserObserver(consoleDB DB, transactionsDB billing.TransactionsDB, usageLimitsConfig UsageLimitsConfig, userBalanceForUpgrade int64) *UpgradeUserObserver {
return &UpgradeUserObserver{
consoleDB: consoleDB,
transactionsDB: transactionsDB,
usageLimitsConfig: usageLimitsConfig,
userBalanceForUpgrade: userBalanceForUpgrade,
}
}
// Process puts user into the paid tier and converts projects to upgraded limits.
func (o *UpgradeUserObserver) Process(ctx context.Context, transaction billing.Transaction) (err error) {
defer mon.Task()(&ctx)(&err)
user, err := o.consoleDB.Users().Get(ctx, transaction.UserID)
if err != nil {
return err
}
if user.PaidTier {
return nil
}
balance, err := o.transactionsDB.GetBalance(ctx, user.ID)
if err != nil {
return err
}
// check if user's balance is less than needed amount for upgrade.
if balance.BaseUnits() < o.userBalanceForUpgrade {
return nil
}
err = o.consoleDB.Users().UpdatePaidTier(ctx, user.ID, true,
o.usageLimitsConfig.Bandwidth.Paid,
o.usageLimitsConfig.Storage.Paid,
o.usageLimitsConfig.Segment.Paid,
o.usageLimitsConfig.Project.Paid,
)
if err != nil {
return err
}
projects, err := o.consoleDB.Projects().GetOwn(ctx, user.ID)
if err != nil {
return err
}
for _, project := range projects {
if project.StorageLimit == nil || *project.StorageLimit < o.usageLimitsConfig.Storage.Paid {
project.StorageLimit = new(memory.Size)
*project.StorageLimit = o.usageLimitsConfig.Storage.Paid
}
if project.BandwidthLimit == nil || *project.BandwidthLimit < o.usageLimitsConfig.Bandwidth.Paid {
project.BandwidthLimit = new(memory.Size)
*project.BandwidthLimit = o.usageLimitsConfig.Bandwidth.Paid
}
if project.SegmentLimit == nil || *project.SegmentLimit < o.usageLimitsConfig.Segment.Paid {
*project.SegmentLimit = o.usageLimitsConfig.Segment.Paid
}
err = o.consoleDB.Projects().Update(ctx, &project)
if err != nil {
return err
}
}
return nil
}

View File

@ -194,6 +194,7 @@ type Config struct {
LoginAttemptsWithoutPenalty int `help:"number of times user can try to login without penalty" default:"3"`
FailedLoginPenalty float64 `help:"incremental duration of penalty for failed login attempts in minutes" default:"2.0"`
ProjectInvitationExpiration time.Duration `help:"duration that project member invitations are valid for" default:"168h"`
UserBalanceForUpgrade int64 `help:"amount of base units of US micro dollars needed to upgrade user's tier status'" default:"100000"`
UsageLimits UsageLimitsConfig
Captcha CaptchaConfig
Session SessionConfig

View File

@ -517,6 +517,10 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB,
debug.Cycle("Payments Storjscan", peer.Payments.StorjscanChore.TransactionCycle),
)
choreObservers := map[billing.ObserverBilling]billing.Observer{
billing.ObserverUpgradeUser: console.NewUpgradeUserObserver(peer.DB.Console(), peer.DB.Billing(), config.Console.UsageLimits, config.Console.UserBalanceForUpgrade),
}
peer.Payments.BillingChore = billing.NewChore(
peer.Log.Named("payments.billing:chore"),
[]billing.PaymentType{peer.Payments.StorjscanService},
@ -524,6 +528,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB,
config.Payments.BillingConfig.Interval,
config.Payments.BillingConfig.DisableLoop,
config.Payments.BonusRate,
choreObservers,
)
peer.Services.Add(lifecycle.Item{
Name: "billing:chore",

View File

@ -13,6 +13,14 @@ import (
"storj.io/common/sync2"
)
// ObserverBilling used to create enumerable of chore observers.
type ObserverBilling int64
const (
// ObserverUpgradeUser stands for upgrade user observer type.
ObserverUpgradeUser ObserverBilling = 0
)
// ChoreErr is billing chore err class.
var ChoreErr = errs.Class("billing chore")
@ -27,10 +35,11 @@ type Chore struct {
disableLoop bool
bonusRate int64
observers map[ObserverBilling]Observer
}
// NewChore creates new chore.
func NewChore(log *zap.Logger, paymentTypes []PaymentType, transactionsDB TransactionsDB, interval time.Duration, disableLoop bool, bonusRate int64) *Chore {
func NewChore(log *zap.Logger, paymentTypes []PaymentType, transactionsDB TransactionsDB, interval time.Duration, disableLoop bool, bonusRate int64, observers map[ObserverBilling]Observer) *Chore {
return &Chore{
log: log,
paymentTypes: paymentTypes,
@ -38,6 +47,7 @@ func NewChore(log *zap.Logger, paymentTypes []PaymentType, transactionsDB Transa
TransactionCycle: sync2.NewCycle(interval),
disableLoop: disableLoop,
bonusRate: bonusRate,
observers: observers,
}
}
@ -73,6 +83,11 @@ func (chore *Chore) Run(ctx context.Context) (err error) {
// we need to halt storing transactions if one fails, so that it can be tried again on the next loop.
break
}
err = chore.observers[ObserverUpgradeUser].Process(ctx, transaction)
if err != nil {
chore.log.Error("error upgrading user", zap.Error(ChoreErr.Wrap(err)))
}
}
}
return nil

View File

@ -19,9 +19,9 @@ import (
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/common/uuid"
"storj.io/storj/satellite"
"storj.io/storj/private/testplanet"
"storj.io/storj/satellite/console"
"storj.io/storj/satellite/payments/billing"
"storj.io/storj/satellite/satellitedb/satellitedbtest"
)
func TestChore(t *testing.T) {
@ -74,7 +74,7 @@ func TestChore(t *testing.T) {
assert.Equal(t, expected, actual, "unexpected balance for user %s (%q)", userID, names[userID])
}
runTest := func(ctx *testcontext.Context, t *testing.T, db billing.TransactionsDB, bonusRate int64, mikeTXs, joeTXs, robertTXs []billing.Transaction, mikeBalance, joeBalance, robertBalance currency.Amount) {
runTest := func(ctx *testcontext.Context, t *testing.T, consoleDB console.DB, db billing.TransactionsDB, bonusRate int64, mikeTXs, joeTXs, robertTXs []billing.Transaction, mikeBalance, joeBalance, robertBalance currency.Amount, usageLimitsConfig console.UsageLimitsConfig, userBalanceForUpgrade int64) {
paymentTypes := []billing.PaymentType{
newFakePaymentType(billing.StorjScanSource,
[]billing.Transaction{mike1, joe1, joe2},
@ -85,7 +85,11 @@ func TestChore(t *testing.T) {
),
}
chore := billing.NewChore(zaptest.NewLogger(t), paymentTypes, db, time.Hour, false, bonusRate)
choreObservers := map[billing.ObserverBilling]billing.Observer{
billing.ObserverUpgradeUser: console.NewUpgradeUserObserver(consoleDB, db, usageLimitsConfig, userBalanceForUpgrade),
}
chore := billing.NewChore(zaptest.NewLogger(t), paymentTypes, db, time.Hour, false, bonusRate, choreObservers)
ctx.Go(func() error {
return chore.Run(ctx)
})
@ -106,32 +110,141 @@ func TestChore(t *testing.T) {
}
t.Run("without StorjScan bonus", func(t *testing.T) {
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
runTest(ctx, t, db.Billing(), 0,
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 0, UplinkCount: 0,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
sat := planet.Satellites[0]
db := sat.DB
runTest(ctx, t, db.Console(), db.Billing(), 0,
[]billing.Transaction{mike2, mike1},
[]billing.Transaction{joe1, joe2},
[]billing.Transaction{robert1},
currency.AmountFromBaseUnits(30000000, currency.USDollarsMicro),
currency.AmountFromBaseUnits(4000000, currency.USDollarsMicro),
currency.AmountFromBaseUnits(30000000, currency.USDollarsMicro),
sat.Config.Console.UsageLimits,
sat.Config.Console.UserBalanceForUpgrade,
)
})
})
t.Run("with StorjScan bonus", func(t *testing.T) {
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
runTest(ctx, t, db.Billing(), 10,
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 0, UplinkCount: 0,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
sat := planet.Satellites[0]
db := sat.DB
runTest(ctx, t, db.Console(), db.Billing(), 10,
[]billing.Transaction{mike2, mike2Bonus, mike1, mike1Bonus},
[]billing.Transaction{joe1, joe1Bonus, joe2},
[]billing.Transaction{robert1},
currency.AmountFromBaseUnits(33000000, currency.USDollarsMicro),
currency.AmountFromBaseUnits(4500000, currency.USDollarsMicro),
currency.AmountFromBaseUnits(30000000, currency.USDollarsMicro),
sat.Config.Console.UsageLimits,
sat.Config.Console.UserBalanceForUpgrade,
)
})
})
}
func TestChore_UpgradeUserObserver(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 0, UplinkCount: 0,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
sat := planet.Satellites[0]
db := sat.DB
usageLimitsConfig := sat.Config.Console.UsageLimits
ts := makeTimestamp()
user, err := sat.AddUser(ctx, console.CreateUser{
FullName: "Test User",
Email: "choreobserver@mail.test",
}, 1)
require.NoError(t, err)
_, err = sat.AddProject(ctx, user.ID, "Test Project")
require.NoError(t, err)
choreObservers := map[billing.ObserverBilling]billing.Observer{
billing.ObserverUpgradeUser: console.NewUpgradeUserObserver(db.Console(), db.Billing(), sat.Config.Console.UsageLimits, sat.Config.Console.UserBalanceForUpgrade),
}
amount1 := int64(2) // $2
amount2 := int64(8) // $8
transaction1 := makeFakeTransaction(user.ID, billing.StorjScanSource, billing.TransactionTypeCredit, amount1, ts, `{"fake": "transaction1"}`)
transaction2 := makeFakeTransaction(user.ID, billing.StorjScanSource, billing.TransactionTypeCredit, amount2, ts.Add(time.Second*2), `{"fake": "transaction2"}`)
paymentTypes := []billing.PaymentType{
newFakePaymentType(billing.StorjScanSource,
[]billing.Transaction{transaction1},
[]billing.Transaction{},
[]billing.Transaction{transaction2},
[]billing.Transaction{},
),
}
chore := billing.NewChore(zaptest.NewLogger(t), paymentTypes, db.Billing(), time.Hour, false, 0, choreObservers)
ctx.Go(func() error {
return chore.Run(ctx)
})
defer ctx.Check(chore.Close)
t.Run("user not upgraded", func(t *testing.T) {
chore.TransactionCycle.Pause()
chore.TransactionCycle.TriggerWait()
chore.TransactionCycle.Pause()
balance, err := db.Billing().GetBalance(ctx, user.ID)
require.NoError(t, err)
expected := currency.AmountFromBaseUnits(amount1*int64(10000), currency.USDollarsMicro)
require.True(t, expected.Equal(balance))
user, err = db.Console().Users().Get(ctx, user.ID)
require.NoError(t, err)
require.False(t, user.PaidTier)
projects, err := db.Console().Projects().GetOwn(ctx, user.ID)
require.NoError(t, err)
for _, p := range projects {
require.Equal(t, usageLimitsConfig.Storage.Free, *p.StorageLimit)
require.Equal(t, usageLimitsConfig.Bandwidth.Free, *p.BandwidthLimit)
require.Equal(t, usageLimitsConfig.Segment.Free, *p.SegmentLimit)
}
})
t.Run("user upgraded", func(t *testing.T) {
chore.TransactionCycle.Pause()
chore.TransactionCycle.TriggerWait()
chore.TransactionCycle.Pause()
balance, err := db.Billing().GetBalance(ctx, user.ID)
require.NoError(t, err)
expected := currency.AmountFromBaseUnits((amount1+amount2)*int64(10000), currency.USDollarsMicro)
require.True(t, expected.Equal(balance))
user, err = db.Console().Users().Get(ctx, user.ID)
require.NoError(t, err)
require.True(t, user.PaidTier)
require.Equal(t, usageLimitsConfig.Storage.Paid.Int64(), user.ProjectStorageLimit)
require.Equal(t, usageLimitsConfig.Bandwidth.Paid.Int64(), user.ProjectBandwidthLimit)
require.Equal(t, usageLimitsConfig.Segment.Paid, user.ProjectSegmentLimit)
require.Equal(t, usageLimitsConfig.Project.Paid, user.ProjectLimit)
projects, err := db.Console().Projects().GetOwn(ctx, user.ID)
require.NoError(t, err)
for _, p := range projects {
require.Equal(t, usageLimitsConfig.Storage.Paid, *p.StorageLimit)
require.Equal(t, usageLimitsConfig.Bandwidth.Paid, *p.BandwidthLimit)
require.Equal(t, usageLimitsConfig.Segment.Paid, *p.SegmentLimit)
}
})
})
}
func makeFakeTransaction(userID uuid.UUID, source string, typ billing.TransactionType, amountUSD int64, timestamp time.Time, metadata string) billing.Transaction {
return billing.Transaction{
UserID: userID,
@ -176,7 +289,7 @@ func newFakePaymentType(source string, txBatches ...[]billing.Transaction) *fake
func (pt *fakePaymentType) Source() string { return pt.source }
func (pt *fakePaymentType) Type() billing.TransactionType { return pt.txType }
func (pt *fakePaymentType) GetNewTransactions(ctx context.Context, lastTransactionTime time.Time, metadata []byte) ([]billing.Transaction, error) {
func (pt *fakePaymentType) GetNewTransactions(_ context.Context, lastTransactionTime time.Time, metadata []byte) ([]billing.Transaction, error) {
// Ensure that the chore is passing up the expected fields
switch {
case !pt.lastTransactionTime.Equal(lastTransactionTime):

View File

@ -0,0 +1,12 @@
// Copyright (C) 2023 Storj Labs, Inc.
// See LICENSE for copying information.
package billing
import "context"
// Observer processes a billing transaction.
type Observer interface {
// Process is called repeatedly for each transaction.
Process(context.Context, Transaction) error
}

View File

@ -406,6 +406,9 @@ compensation.withheld-percents: 75,75,75,50,50,50,25,25,25,0,0,0,0,0,0
# whether to use vuetify POC project
# console.use-vuetify-project: false
# amount of base units of US micro dollars needed to upgrade user's tier status'
# console.user-balance-for-upgrade: 100000
# whether to load templates on each request
# console.watch: false