diff --git a/satellite/console/observerupgradeuser.go b/satellite/console/observerupgradeuser.go new file mode 100644 index 000000000..4615b5980 --- /dev/null +++ b/satellite/console/observerupgradeuser.go @@ -0,0 +1,89 @@ +// Copyright (C) 2023 Storj Labs, Inc. +// See LICENSE for copying information. + +package console + +import ( + "context" + + "storj.io/common/memory" + "storj.io/storj/satellite/payments/billing" +) + +var _ billing.Observer = (*UpgradeUserObserver)(nil) + +// UpgradeUserObserver used to upgrade user if their balance is more than $10 after confirmed token transaction. +type UpgradeUserObserver struct { + consoleDB DB + transactionsDB billing.TransactionsDB + usageLimitsConfig UsageLimitsConfig + userBalanceForUpgrade int64 +} + +// NewUpgradeUserObserver creates new observer instance. +func NewUpgradeUserObserver(consoleDB DB, transactionsDB billing.TransactionsDB, usageLimitsConfig UsageLimitsConfig, userBalanceForUpgrade int64) *UpgradeUserObserver { + return &UpgradeUserObserver{ + consoleDB: consoleDB, + transactionsDB: transactionsDB, + usageLimitsConfig: usageLimitsConfig, + userBalanceForUpgrade: userBalanceForUpgrade, + } +} + +// Process puts user into the paid tier and converts projects to upgraded limits. +func (o *UpgradeUserObserver) Process(ctx context.Context, transaction billing.Transaction) (err error) { + defer mon.Task()(&ctx)(&err) + + user, err := o.consoleDB.Users().Get(ctx, transaction.UserID) + if err != nil { + return err + } + + if user.PaidTier { + return nil + } + + balance, err := o.transactionsDB.GetBalance(ctx, user.ID) + if err != nil { + return err + } + + // check if user's balance is less than needed amount for upgrade. + if balance.BaseUnits() < o.userBalanceForUpgrade { + return nil + } + + err = o.consoleDB.Users().UpdatePaidTier(ctx, user.ID, true, + o.usageLimitsConfig.Bandwidth.Paid, + o.usageLimitsConfig.Storage.Paid, + o.usageLimitsConfig.Segment.Paid, + o.usageLimitsConfig.Project.Paid, + ) + if err != nil { + return err + } + + projects, err := o.consoleDB.Projects().GetOwn(ctx, user.ID) + if err != nil { + return err + } + for _, project := range projects { + if project.StorageLimit == nil || *project.StorageLimit < o.usageLimitsConfig.Storage.Paid { + project.StorageLimit = new(memory.Size) + *project.StorageLimit = o.usageLimitsConfig.Storage.Paid + } + if project.BandwidthLimit == nil || *project.BandwidthLimit < o.usageLimitsConfig.Bandwidth.Paid { + project.BandwidthLimit = new(memory.Size) + *project.BandwidthLimit = o.usageLimitsConfig.Bandwidth.Paid + } + if project.SegmentLimit == nil || *project.SegmentLimit < o.usageLimitsConfig.Segment.Paid { + *project.SegmentLimit = o.usageLimitsConfig.Segment.Paid + } + err = o.consoleDB.Projects().Update(ctx, &project) + if err != nil { + return err + } + } + + return nil +} diff --git a/satellite/console/service.go b/satellite/console/service.go index 3c9420d69..8bdbb0597 100644 --- a/satellite/console/service.go +++ b/satellite/console/service.go @@ -194,6 +194,7 @@ type Config struct { LoginAttemptsWithoutPenalty int `help:"number of times user can try to login without penalty" default:"3"` FailedLoginPenalty float64 `help:"incremental duration of penalty for failed login attempts in minutes" default:"2.0"` ProjectInvitationExpiration time.Duration `help:"duration that project member invitations are valid for" default:"168h"` + UserBalanceForUpgrade int64 `help:"amount of base units of US micro dollars needed to upgrade user's tier status'" default:"100000"` UsageLimits UsageLimitsConfig Captcha CaptchaConfig Session SessionConfig diff --git a/satellite/core.go b/satellite/core.go index 55a17e018..b37a9f751 100644 --- a/satellite/core.go +++ b/satellite/core.go @@ -517,6 +517,10 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, debug.Cycle("Payments Storjscan", peer.Payments.StorjscanChore.TransactionCycle), ) + choreObservers := map[billing.ObserverBilling]billing.Observer{ + billing.ObserverUpgradeUser: console.NewUpgradeUserObserver(peer.DB.Console(), peer.DB.Billing(), config.Console.UsageLimits, config.Console.UserBalanceForUpgrade), + } + peer.Payments.BillingChore = billing.NewChore( peer.Log.Named("payments.billing:chore"), []billing.PaymentType{peer.Payments.StorjscanService}, @@ -524,6 +528,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, config.Payments.BillingConfig.Interval, config.Payments.BillingConfig.DisableLoop, config.Payments.BonusRate, + choreObservers, ) peer.Services.Add(lifecycle.Item{ Name: "billing:chore", diff --git a/satellite/payments/billing/chore.go b/satellite/payments/billing/chore.go index 5e6288df9..0087f0e95 100644 --- a/satellite/payments/billing/chore.go +++ b/satellite/payments/billing/chore.go @@ -13,6 +13,14 @@ import ( "storj.io/common/sync2" ) +// ObserverBilling used to create enumerable of chore observers. +type ObserverBilling int64 + +const ( + // ObserverUpgradeUser stands for upgrade user observer type. + ObserverUpgradeUser ObserverBilling = 0 +) + // ChoreErr is billing chore err class. var ChoreErr = errs.Class("billing chore") @@ -27,10 +35,11 @@ type Chore struct { disableLoop bool bonusRate int64 + observers map[ObserverBilling]Observer } // NewChore creates new chore. -func NewChore(log *zap.Logger, paymentTypes []PaymentType, transactionsDB TransactionsDB, interval time.Duration, disableLoop bool, bonusRate int64) *Chore { +func NewChore(log *zap.Logger, paymentTypes []PaymentType, transactionsDB TransactionsDB, interval time.Duration, disableLoop bool, bonusRate int64, observers map[ObserverBilling]Observer) *Chore { return &Chore{ log: log, paymentTypes: paymentTypes, @@ -38,6 +47,7 @@ func NewChore(log *zap.Logger, paymentTypes []PaymentType, transactionsDB Transa TransactionCycle: sync2.NewCycle(interval), disableLoop: disableLoop, bonusRate: bonusRate, + observers: observers, } } @@ -73,6 +83,11 @@ func (chore *Chore) Run(ctx context.Context) (err error) { // we need to halt storing transactions if one fails, so that it can be tried again on the next loop. break } + + err = chore.observers[ObserverUpgradeUser].Process(ctx, transaction) + if err != nil { + chore.log.Error("error upgrading user", zap.Error(ChoreErr.Wrap(err))) + } } } return nil diff --git a/satellite/payments/billing/chore_test.go b/satellite/payments/billing/chore_test.go index ad6a9c061..22914c0a0 100644 --- a/satellite/payments/billing/chore_test.go +++ b/satellite/payments/billing/chore_test.go @@ -19,9 +19,9 @@ import ( "storj.io/common/testcontext" "storj.io/common/testrand" "storj.io/common/uuid" - "storj.io/storj/satellite" + "storj.io/storj/private/testplanet" + "storj.io/storj/satellite/console" "storj.io/storj/satellite/payments/billing" - "storj.io/storj/satellite/satellitedb/satellitedbtest" ) func TestChore(t *testing.T) { @@ -74,7 +74,7 @@ func TestChore(t *testing.T) { assert.Equal(t, expected, actual, "unexpected balance for user %s (%q)", userID, names[userID]) } - runTest := func(ctx *testcontext.Context, t *testing.T, db billing.TransactionsDB, bonusRate int64, mikeTXs, joeTXs, robertTXs []billing.Transaction, mikeBalance, joeBalance, robertBalance currency.Amount) { + runTest := func(ctx *testcontext.Context, t *testing.T, consoleDB console.DB, db billing.TransactionsDB, bonusRate int64, mikeTXs, joeTXs, robertTXs []billing.Transaction, mikeBalance, joeBalance, robertBalance currency.Amount, usageLimitsConfig console.UsageLimitsConfig, userBalanceForUpgrade int64) { paymentTypes := []billing.PaymentType{ newFakePaymentType(billing.StorjScanSource, []billing.Transaction{mike1, joe1, joe2}, @@ -85,7 +85,11 @@ func TestChore(t *testing.T) { ), } - chore := billing.NewChore(zaptest.NewLogger(t), paymentTypes, db, time.Hour, false, bonusRate) + choreObservers := map[billing.ObserverBilling]billing.Observer{ + billing.ObserverUpgradeUser: console.NewUpgradeUserObserver(consoleDB, db, usageLimitsConfig, userBalanceForUpgrade), + } + + chore := billing.NewChore(zaptest.NewLogger(t), paymentTypes, db, time.Hour, false, bonusRate, choreObservers) ctx.Go(func() error { return chore.Run(ctx) }) @@ -106,32 +110,141 @@ func TestChore(t *testing.T) { } t.Run("without StorjScan bonus", func(t *testing.T) { - satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) { - runTest(ctx, t, db.Billing(), 0, + testplanet.Run(t, testplanet.Config{ + SatelliteCount: 1, StorageNodeCount: 0, UplinkCount: 0, + }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { + sat := planet.Satellites[0] + db := sat.DB + + runTest(ctx, t, db.Console(), db.Billing(), 0, []billing.Transaction{mike2, mike1}, []billing.Transaction{joe1, joe2}, []billing.Transaction{robert1}, currency.AmountFromBaseUnits(30000000, currency.USDollarsMicro), currency.AmountFromBaseUnits(4000000, currency.USDollarsMicro), currency.AmountFromBaseUnits(30000000, currency.USDollarsMicro), + sat.Config.Console.UsageLimits, + sat.Config.Console.UserBalanceForUpgrade, ) }) }) t.Run("with StorjScan bonus", func(t *testing.T) { - satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) { - runTest(ctx, t, db.Billing(), 10, + testplanet.Run(t, testplanet.Config{ + SatelliteCount: 1, StorageNodeCount: 0, UplinkCount: 0, + }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { + sat := planet.Satellites[0] + db := sat.DB + + runTest(ctx, t, db.Console(), db.Billing(), 10, []billing.Transaction{mike2, mike2Bonus, mike1, mike1Bonus}, []billing.Transaction{joe1, joe1Bonus, joe2}, []billing.Transaction{robert1}, currency.AmountFromBaseUnits(33000000, currency.USDollarsMicro), currency.AmountFromBaseUnits(4500000, currency.USDollarsMicro), currency.AmountFromBaseUnits(30000000, currency.USDollarsMicro), + sat.Config.Console.UsageLimits, + sat.Config.Console.UserBalanceForUpgrade, ) }) }) } +func TestChore_UpgradeUserObserver(t *testing.T) { + testplanet.Run(t, testplanet.Config{ + SatelliteCount: 1, StorageNodeCount: 0, UplinkCount: 0, + }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { + sat := planet.Satellites[0] + db := sat.DB + usageLimitsConfig := sat.Config.Console.UsageLimits + ts := makeTimestamp() + + user, err := sat.AddUser(ctx, console.CreateUser{ + FullName: "Test User", + Email: "choreobserver@mail.test", + }, 1) + require.NoError(t, err) + + _, err = sat.AddProject(ctx, user.ID, "Test Project") + require.NoError(t, err) + + choreObservers := map[billing.ObserverBilling]billing.Observer{ + billing.ObserverUpgradeUser: console.NewUpgradeUserObserver(db.Console(), db.Billing(), sat.Config.Console.UsageLimits, sat.Config.Console.UserBalanceForUpgrade), + } + + amount1 := int64(2) // $2 + amount2 := int64(8) // $8 + transaction1 := makeFakeTransaction(user.ID, billing.StorjScanSource, billing.TransactionTypeCredit, amount1, ts, `{"fake": "transaction1"}`) + transaction2 := makeFakeTransaction(user.ID, billing.StorjScanSource, billing.TransactionTypeCredit, amount2, ts.Add(time.Second*2), `{"fake": "transaction2"}`) + paymentTypes := []billing.PaymentType{ + newFakePaymentType(billing.StorjScanSource, + []billing.Transaction{transaction1}, + []billing.Transaction{}, + []billing.Transaction{transaction2}, + []billing.Transaction{}, + ), + } + + chore := billing.NewChore(zaptest.NewLogger(t), paymentTypes, db.Billing(), time.Hour, false, 0, choreObservers) + ctx.Go(func() error { + return chore.Run(ctx) + }) + defer ctx.Check(chore.Close) + + t.Run("user not upgraded", func(t *testing.T) { + chore.TransactionCycle.Pause() + chore.TransactionCycle.TriggerWait() + chore.TransactionCycle.Pause() + + balance, err := db.Billing().GetBalance(ctx, user.ID) + require.NoError(t, err) + expected := currency.AmountFromBaseUnits(amount1*int64(10000), currency.USDollarsMicro) + require.True(t, expected.Equal(balance)) + + user, err = db.Console().Users().Get(ctx, user.ID) + require.NoError(t, err) + require.False(t, user.PaidTier) + + projects, err := db.Console().Projects().GetOwn(ctx, user.ID) + require.NoError(t, err) + + for _, p := range projects { + require.Equal(t, usageLimitsConfig.Storage.Free, *p.StorageLimit) + require.Equal(t, usageLimitsConfig.Bandwidth.Free, *p.BandwidthLimit) + require.Equal(t, usageLimitsConfig.Segment.Free, *p.SegmentLimit) + } + }) + + t.Run("user upgraded", func(t *testing.T) { + chore.TransactionCycle.Pause() + chore.TransactionCycle.TriggerWait() + chore.TransactionCycle.Pause() + + balance, err := db.Billing().GetBalance(ctx, user.ID) + require.NoError(t, err) + expected := currency.AmountFromBaseUnits((amount1+amount2)*int64(10000), currency.USDollarsMicro) + require.True(t, expected.Equal(balance)) + + user, err = db.Console().Users().Get(ctx, user.ID) + require.NoError(t, err) + require.True(t, user.PaidTier) + require.Equal(t, usageLimitsConfig.Storage.Paid.Int64(), user.ProjectStorageLimit) + require.Equal(t, usageLimitsConfig.Bandwidth.Paid.Int64(), user.ProjectBandwidthLimit) + require.Equal(t, usageLimitsConfig.Segment.Paid, user.ProjectSegmentLimit) + require.Equal(t, usageLimitsConfig.Project.Paid, user.ProjectLimit) + + projects, err := db.Console().Projects().GetOwn(ctx, user.ID) + require.NoError(t, err) + + for _, p := range projects { + require.Equal(t, usageLimitsConfig.Storage.Paid, *p.StorageLimit) + require.Equal(t, usageLimitsConfig.Bandwidth.Paid, *p.BandwidthLimit) + require.Equal(t, usageLimitsConfig.Segment.Paid, *p.SegmentLimit) + } + }) + }) +} + func makeFakeTransaction(userID uuid.UUID, source string, typ billing.TransactionType, amountUSD int64, timestamp time.Time, metadata string) billing.Transaction { return billing.Transaction{ UserID: userID, @@ -176,7 +289,7 @@ func newFakePaymentType(source string, txBatches ...[]billing.Transaction) *fake func (pt *fakePaymentType) Source() string { return pt.source } func (pt *fakePaymentType) Type() billing.TransactionType { return pt.txType } -func (pt *fakePaymentType) GetNewTransactions(ctx context.Context, lastTransactionTime time.Time, metadata []byte) ([]billing.Transaction, error) { +func (pt *fakePaymentType) GetNewTransactions(_ context.Context, lastTransactionTime time.Time, metadata []byte) ([]billing.Transaction, error) { // Ensure that the chore is passing up the expected fields switch { case !pt.lastTransactionTime.Equal(lastTransactionTime): diff --git a/satellite/payments/billing/observer.go b/satellite/payments/billing/observer.go new file mode 100644 index 000000000..224fd049b --- /dev/null +++ b/satellite/payments/billing/observer.go @@ -0,0 +1,12 @@ +// Copyright (C) 2023 Storj Labs, Inc. +// See LICENSE for copying information. + +package billing + +import "context" + +// Observer processes a billing transaction. +type Observer interface { + // Process is called repeatedly for each transaction. + Process(context.Context, Transaction) error +} diff --git a/scripts/testdata/satellite-config.yaml.lock b/scripts/testdata/satellite-config.yaml.lock index 095d6001a..e7c05f32c 100755 --- a/scripts/testdata/satellite-config.yaml.lock +++ b/scripts/testdata/satellite-config.yaml.lock @@ -406,6 +406,9 @@ compensation.withheld-percents: 75,75,75,50,50,50,25,25,25,0,0,0,0,0,0 # whether to use vuetify POC project # console.use-vuetify-project: false +# amount of base units of US micro dollars needed to upgrade user's tier status' +# console.user-balance-for-upgrade: 100000 + # whether to load templates on each request # console.watch: false