satellite/{console,payments}: freeze/warn storjscan users

This change enables the freezing/warning of users who use storjscan.

Issue: https://github.com/storj/storj/issues/6164

Change-Id: I7b00ee09d6527b3818b72326e9065c82ef5a2ac8
This commit is contained in:
Wilfred Asomani 2023-08-15 16:58:50 +00:00 committed by Storj Robot
parent ca0ea50cba
commit dcc4bd0d10
11 changed files with 352 additions and 88 deletions

View File

@ -77,6 +77,18 @@ const (
Warning AccountFreezeEventType = 1
)
// String returns a string representation of this event.
func (et AccountFreezeEventType) String() string {
switch et {
case Freeze:
return "Freeze"
case Warning:
return "Warning"
default:
return ""
}
}
// AccountFreezeService encapsulates operations concerning account freezes.
type AccountFreezeService struct {
freezeEventsDB AccountFreezeEvents

View File

@ -0,0 +1,48 @@
// Copyright (C) 2023 Storj Labs, Inc.
// See LICENSE for copying information.
package console
import (
"context"
"storj.io/storj/satellite/payments"
"storj.io/storj/satellite/payments/billing"
)
var _ billing.Observer = (*InvoiceTokenPaymentObserver)(nil)
// InvoiceTokenPaymentObserver used to pay pending payments with STORJ tokens.
type InvoiceTokenPaymentObserver struct {
consoleDB DB
payments payments.Accounts
}
// NewInvoiceTokenPaymentObserver creates new observer instance.
func NewInvoiceTokenPaymentObserver(consoleDB DB, payments payments.Accounts) *InvoiceTokenPaymentObserver {
return &InvoiceTokenPaymentObserver{
consoleDB: consoleDB,
payments: payments,
}
}
// Process attempts to pay user's pending payments with tokens.
func (o *InvoiceTokenPaymentObserver) Process(ctx context.Context, transaction billing.Transaction) (err error) {
defer mon.Task()(&ctx)(&err)
user, err := o.consoleDB.Users().Get(ctx, transaction.UserID)
if err != nil {
return err
}
if !user.PaidTier {
return nil
}
err = o.payments.Invoices().AttemptPayOverdueInvoicesWithTokens(ctx, user.ID)
if err != nil {
return err
}
return nil
}

View File

@ -521,6 +521,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB,
choreObservers := billing.ChoreObservers{
UpgradeUser: console.NewUpgradeUserObserver(peer.DB.Console(), peer.DB.Billing(), config.Console.UsageLimits, config.Console.UserBalanceForUpgrade),
PayInvoices: console.NewInvoiceTokenPaymentObserver(peer.DB.Console(), peer.Payments.Accounts),
}
peer.Payments.BillingChore = billing.NewChore(

View File

@ -33,7 +33,7 @@ type Config struct {
Interval time.Duration `help:"How often to run this chore, which is how often unpaid invoices are checked." default:"24h"`
GracePeriod time.Duration `help:"How long to wait between a warning event and freezing an account." default:"360h"`
PriceThreshold int64 `help:"The failed invoice amount (in cents) beyond which an account will not be frozen" default:"10000"`
ExcludeStorjscan bool `help:"whether to exclude storjscan-paying users from automatic warn/freeze" default:"true"`
ExcludeStorjscan bool `help:"whether to exclude storjscan-paying users from automatic warn/freeze" default:"false"`
}
// Chore is a chore that checks for unpaid invoices and potentially freezes corresponding accounts.
@ -116,6 +116,7 @@ func (chore *Chore) attemptFreezeWarn(ctx context.Context) {
debugLog := func(message string) {
chore.log.Debug(message,
zap.String("process", "freeze/warn"),
zap.String("invoiceID", invoice.ID),
zap.String("customerID", invoice.CustomerID),
zap.Any("userID", userID),
@ -124,6 +125,7 @@ func (chore *Chore) attemptFreezeWarn(ctx context.Context) {
errorLog := func(message string, err error) {
chore.log.Error(message,
zap.String("process", "freeze/warn"),
zap.String("invoiceID", invoice.ID),
zap.String("customerID", invoice.CustomerID),
zap.Any("userID", userID),
@ -284,26 +286,40 @@ func (chore *Chore) attemptUnfreezeUnwarn(ctx context.Context) {
}
for _, event := range events.Events {
errorLog := func(message string, err error) {
chore.log.Error(message,
zap.String("process", "unfreeze/unwarn"),
zap.Any("userID", event.UserID),
zap.String("eventType", event.Type.String()),
zap.Error(Error.Wrap(err)),
)
}
usersCount++
invoices, err := chore.payments.Invoices().ListFailed(ctx, &event.UserID)
if err != nil {
chore.log.Error("Could not get failed invoices for user", zap.Error(Error.Wrap(err)))
errorLog("Could not get failed invoices for user", err)
continue
}
if len(invoices) > 0 {
// try to pay the invoices.
err = chore.payments.Invoices().AttemptPayOverdueInvoices(ctx, event.UserID)
if err != nil {
errorLog("Could not attempt payment", err)
}
continue
}
if event.Type == console.Freeze {
err = chore.freezeService.UnfreezeUser(ctx, event.UserID)
if err != nil {
chore.log.Error("Could not unfreeze user", zap.Error(Error.Wrap(err)))
errorLog("Could not unfreeze user", err)
}
unfrozenCount++
} else {
err = chore.freezeService.UnWarnUser(ctx, event.UserID)
if err != nil {
chore.log.Error("Could not unwarn user", zap.Error(Error.Wrap(err)))
errorLog("Could not unwarn user", err)
}
unwarnedCount++
}

View File

@ -323,90 +323,112 @@ func TestAutoFreezeChore(t *testing.T) {
err = service.UnfreezeUser(ctx, user2.ID)
require.NoError(t, err)
})
})
}
t.Run("Storjscan exceptions", func(t *testing.T) {
// AnalyticsMock tests that events are sent once.
service.TestChangeFreezeTracker(newFreezeTrackerMock(t))
// reset chore clock
chore.TestSetNow(time.Now)
func TestAutoFreezeChore_StorjscanExclusion(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 0, UplinkCount: 0,
Reconfigure: testplanet.Reconfigure{
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
config.AccountFreeze.Enabled = true
config.AccountFreeze.ExcludeStorjscan = true
},
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
sat := planet.Satellites[0]
stripeClient := sat.API.Payments.StripeClient
invoicesDB := sat.Core.Payments.Accounts.Invoices()
customerDB := sat.Core.DB.StripeCoinPayments().Customers()
usersDB := sat.DB.Console().Users()
projectsDB := sat.DB.Console().Projects()
service := console.NewAccountFreezeService(sat.DB.Console().AccountFreezeEvents(), usersDB, projectsDB, newFreezeTrackerMock(t))
chore := sat.Core.Payments.AccountFreeze
chore.TestSetFreezeService(service)
storjscanUser, err := sat.AddUser(ctx, console.CreateUser{
FullName: "Test User",
Email: "storjscanuser@mail.test",
}, 1)
require.NoError(t, err)
amount := int64(100)
curr := string(stripe.CurrencyUSD)
// create a wallet and transaction for the new user in storjscan
address, err := blockchain.BytesToAddress(testrand.Bytes(20))
require.NoError(t, err)
require.NoError(t, sat.DB.Wallets().Add(ctx, storjscanUser.ID, address))
cachedPayments := []storjscan.CachedPayment{
{
From: blockchaintest.NewAddress(),
To: address,
TokenValue: currency.AmountFromBaseUnits(1000, currency.StorjToken),
USDValue: currency.AmountFromBaseUnits(testrand.Int63n(1000), currency.USDollarsMicro),
BlockHash: blockchaintest.NewHash(),
Transaction: blockchaintest.NewHash(),
Status: payments.PaymentStatusConfirmed,
Timestamp: time.Now(),
},
}
require.NoError(t, sat.DB.StorjscanPayments().InsertBatch(ctx, cachedPayments))
// AnalyticsMock tests that events are sent once.
service.TestChangeFreezeTracker(newFreezeTrackerMock(t))
// reset chore clock
chore.TestSetNow(time.Now)
storjscanCus, err := customerDB.GetCustomerID(ctx, storjscanUser.ID)
require.NoError(t, err)
storjscanUser, err := sat.AddUser(ctx, console.CreateUser{
FullName: "Test User",
Email: "storjscanuser@mail.test",
}, 1)
require.NoError(t, err)
item, err := stripeClient.InvoiceItems().New(&stripe.InvoiceItemParams{
Params: stripe.Params{Context: ctx},
Amount: &amount,
Currency: &curr,
Customer: &storjscanCus,
})
require.NoError(t, err)
// create a wallet and transaction for the new user in storjscan
address, err := blockchain.BytesToAddress(testrand.Bytes(20))
require.NoError(t, err)
require.NoError(t, sat.DB.Wallets().Add(ctx, storjscanUser.ID, address))
cachedPayments := []storjscan.CachedPayment{
{
From: blockchaintest.NewAddress(),
To: address,
TokenValue: currency.AmountFromBaseUnits(1000, currency.StorjToken),
USDValue: currency.AmountFromBaseUnits(testrand.Int63n(1000), currency.USDollarsMicro),
BlockHash: blockchaintest.NewHash(),
Transaction: blockchaintest.NewHash(),
Status: payments.PaymentStatusConfirmed,
Timestamp: time.Now(),
},
}
require.NoError(t, sat.DB.StorjscanPayments().InsertBatch(ctx, cachedPayments))
items := make([]*stripe.InvoiceUpcomingInvoiceItemParams, 0, 1)
items = append(items, &stripe.InvoiceUpcomingInvoiceItemParams{
InvoiceItem: &item.ID,
Amount: &amount,
Currency: &curr,
})
inv, err := stripeClient.Invoices().New(&stripe.InvoiceParams{
Params: stripe.Params{Context: ctx},
Customer: &storjscanCus,
InvoiceItems: items,
})
require.NoError(t, err)
storjscanCus, err := customerDB.GetCustomerID(ctx, storjscanUser.ID)
require.NoError(t, err)
paymentMethod := stripe1.MockInvoicesPayFailure
inv, err = stripeClient.Invoices().Pay(inv.ID, &stripe.InvoicePayParams{
Params: stripe.Params{Context: ctx},
PaymentMethod: &paymentMethod,
})
require.Error(t, err)
require.Equal(t, stripe.InvoiceStatusOpen, inv.Status)
failed, err := invoicesDB.ListFailed(ctx, nil)
require.NoError(t, err)
require.Equal(t, 1, len(failed))
invFound := false
for _, failedInv := range failed {
if failedInv.ID == inv.ID {
invFound = true
break
}
}
require.True(t, invFound)
chore.Loop.TriggerWait()
// user should not be warned or frozen due to storjscan payments
freeze, warning, err := service.GetAll(ctx, storjscanUser.ID)
require.NoError(t, err)
require.Nil(t, warning)
require.Nil(t, freeze)
item, err := stripeClient.InvoiceItems().New(&stripe.InvoiceItemParams{
Params: stripe.Params{Context: ctx},
Amount: &amount,
Currency: &curr,
Customer: &storjscanCus,
})
require.NoError(t, err)
items := make([]*stripe.InvoiceUpcomingInvoiceItemParams, 0, 1)
items = append(items, &stripe.InvoiceUpcomingInvoiceItemParams{
InvoiceItem: &item.ID,
Amount: &amount,
Currency: &curr,
})
inv, err := stripeClient.Invoices().New(&stripe.InvoiceParams{
Params: stripe.Params{Context: ctx},
Customer: &storjscanCus,
InvoiceItems: items,
})
require.NoError(t, err)
paymentMethod := stripe1.MockInvoicesPayFailure
inv, err = stripeClient.Invoices().Pay(inv.ID, &stripe.InvoicePayParams{
Params: stripe.Params{Context: ctx},
PaymentMethod: &paymentMethod,
})
require.Error(t, err)
require.Equal(t, stripe.InvoiceStatusOpen, inv.Status)
failed, err := invoicesDB.ListFailed(ctx, nil)
require.NoError(t, err)
require.Equal(t, 1, len(failed))
invFound := false
for _, failedInv := range failed {
if failedInv.ID == inv.ID {
invFound = true
break
}
}
require.True(t, invFound)
chore.Loop.TriggerWait()
// user should not be warned or frozen due to storjscan payments
freeze, warning, err := service.GetAll(ctx, storjscanUser.ID)
require.NoError(t, err)
require.Nil(t, warning)
require.Nil(t, freeze)
})
}

View File

@ -22,6 +22,7 @@ type Observer interface {
// ChoreObservers holds functionality to process confirmed transactions using different types of observers.
type ChoreObservers struct {
UpgradeUser Observer
PayInvoices Observer
}
// ChoreErr is billing chore err class.
@ -87,16 +88,21 @@ func (chore *Chore) Run(ctx context.Context) (err error) {
break
}
if chore.observers.UpgradeUser == nil {
continue
if chore.observers.UpgradeUser != nil {
err = chore.observers.UpgradeUser.Process(ctx, transaction)
if err != nil {
// we don't want to halt storing transactions if upgrade user observer fails
// because this chore is designed to store new transactions.
// So auto upgrading user is a side effect which shouldn't interrupt the main process.
chore.log.Error("error upgrading user", zap.Error(ChoreErr.Wrap(err)))
}
}
err = chore.observers.UpgradeUser.Process(ctx, transaction)
if err != nil {
// we don't want to halt storing transactions if upgrade user observer fails
// because this chore is designed to store new transactions.
// So auto upgrading user is a side effect which shouldn't interrupt the main process.
chore.log.Error("error upgrading user", zap.Error(ChoreErr.Wrap(err)))
if chore.observers.PayInvoices != nil {
err = chore.observers.PayInvoices.Process(ctx, transaction)
if err != nil {
chore.log.Error("error paying invoices", zap.Error(ChoreErr.Wrap(err)))
}
}
}
}
@ -110,3 +116,9 @@ func (chore *Chore) Close() (err error) {
chore.TransactionCycle.Close()
return nil
}
// TestSetPaymentTypes is used in tests to change the payment
// types this chore tracks.
func (chore *Chore) TestSetPaymentTypes(types []PaymentType) {
chore.paymentTypes = types
}

View File

@ -12,6 +12,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stripe/stripe-go/v72"
"github.com/zeebo/errs"
"go.uber.org/zap/zaptest"
@ -19,6 +20,7 @@ import (
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/common/uuid"
"storj.io/storj/private/blockchain"
"storj.io/storj/private/testplanet"
"storj.io/storj/satellite/console"
"storj.io/storj/satellite/payments/billing"
@ -242,6 +244,115 @@ func TestChore_UpgradeUserObserver(t *testing.T) {
})
}
func TestChore_PayInvoiceObserver(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 0, UplinkCount: 0,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
sat := planet.Satellites[0]
db := sat.DB
invoicesDB := sat.Core.Payments.Accounts.Invoices()
stripeClient := sat.API.Payments.StripeClient
customerDB := sat.Core.DB.StripeCoinPayments().Customers()
ts := makeTimestamp()
user, err := sat.AddUser(ctx, console.CreateUser{
FullName: "Test User",
Email: "choreobserver@mail.test",
}, 1)
require.NoError(t, err)
cus, err := customerDB.GetCustomerID(ctx, user.ID)
require.NoError(t, err)
// setup storjscan wallet
address, err := blockchain.BytesToAddress(testrand.Bytes(20))
require.NoError(t, err)
userID := user.ID
err = sat.DB.Wallets().Add(ctx, userID, address)
require.NoError(t, err)
choreObservers := billing.ChoreObservers{
UpgradeUser: console.NewUpgradeUserObserver(db.Console(), db.Billing(), sat.Config.Console.UsageLimits, sat.Config.Console.UserBalanceForUpgrade),
PayInvoices: console.NewInvoiceTokenPaymentObserver(db.Console(), sat.Core.Payments.Accounts),
}
amount := int64(2000) // $20
amount2 := int64(1000) // $10
transaction := makeFakeTransaction(user.ID, billing.StorjScanSource, billing.TransactionTypeCredit, amount, ts, `{"fake": "transaction"}`)
transaction2 := makeFakeTransaction(user.ID, billing.StorjScanSource, billing.TransactionTypeCredit, amount2, ts.Add(time.Second*2), `{"fake": "transaction2"}`)
paymentTypes := []billing.PaymentType{
newFakePaymentType(billing.StorjScanSource,
[]billing.Transaction{transaction},
[]billing.Transaction{},
[]billing.Transaction{transaction2},
[]billing.Transaction{},
),
}
chore := billing.NewChore(zaptest.NewLogger(t), paymentTypes, db.Billing(), time.Hour, false, 0, choreObservers)
ctx.Go(func() error {
return chore.Run(ctx)
})
defer ctx.Check(chore.Close)
// create invoice
item, err := stripeClient.InvoiceItems().New(&stripe.InvoiceItemParams{
Params: stripe.Params{Context: ctx},
Amount: &amount,
Currency: stripe.String(string(stripe.CurrencyUSD)),
Customer: &cus,
})
require.NoError(t, err)
fullAmount := amount + amount2
items := make([]*stripe.InvoiceUpcomingInvoiceItemParams, 0, 1)
items = append(items, &stripe.InvoiceUpcomingInvoiceItemParams{
InvoiceItem: &item.ID,
Amount: &fullAmount,
Currency: stripe.String(string(stripe.CurrencyUSD)),
})
inv, err := stripeClient.Invoices().New(&stripe.InvoiceParams{
Params: stripe.Params{Context: ctx},
Customer: &cus,
InvoiceItems: items,
})
require.NoError(t, err)
inv, err = stripeClient.Invoices().FinalizeInvoice(inv.ID, nil)
require.NoError(t, err)
require.Equal(t, stripe.InvoiceStatusOpen, inv.Status)
invoices, err := invoicesDB.List(ctx, user.ID)
require.NoError(t, err)
require.NotEmpty(t, invoices)
require.Equal(t, inv.ID, invoices[0].ID)
require.Equal(t, inv.ID, invoices[0].ID)
require.Equal(t, string(inv.Status), invoices[0].Status)
chore.TransactionCycle.TriggerWait()
// user balance would've been the value of amount ($20) but
// PayInvoiceObserver will use this to pay part of this user's invoice.
balance, err := db.Billing().GetBalance(ctx, user.ID)
require.NoError(t, err)
require.Zero(t, balance.BaseUnits())
invoices, err = invoicesDB.List(ctx, user.ID)
require.NoError(t, err)
require.NotEmpty(t, invoices)
// invoice remains unpaid since only $20 was paid.
require.Equal(t, string(stripe.InvoiceStatusOpen), invoices[0].Status)
chore.TransactionCycle.TriggerWait()
// the second transaction of $10 reflects at this point and
// is used to pay for the remaining invoice balance.
invoices, err = invoicesDB.List(ctx, user.ID)
require.NoError(t, err)
require.NotEmpty(t, invoices)
require.Equal(t, string(stripe.InvoiceStatusPaid), invoices[0].Status)
})
}
func makeFakeTransaction(userID uuid.UUID, source string, typ billing.TransactionType, amountUSD int64, timestamp time.Time, metadata string) billing.Transaction {
return billing.Transaction{
UserID: userID,

View File

@ -45,6 +45,8 @@ type Invoices interface {
CheckPendingItems(ctx context.Context, userID uuid.UUID) (existingItems bool, err error)
// AttemptPayOverdueInvoices attempts to pay a user's open, overdue invoices.
AttemptPayOverdueInvoices(ctx context.Context, userID uuid.UUID) (err error)
// AttemptPayOverdueInvoicesWithTokens attempts to pay a user's open, overdue invoices with token only.
AttemptPayOverdueInvoicesWithTokens(ctx context.Context, userID uuid.UUID) (err error)
// Delete a draft invoice.
Delete(ctx context.Context, id string) (inv *Invoice, err error)
}

View File

@ -151,6 +151,41 @@ func (invoices *invoices) AttemptPayOverdueInvoices(ctx context.Context, userID
return nil
}
// AttemptPayOverdueInvoicesWithTokens attempts to pay a user's open, overdue invoices with tokens only.
func (invoices *invoices) AttemptPayOverdueInvoicesWithTokens(ctx context.Context, userID uuid.UUID) (err error) {
defer mon.Task()(&ctx, userID)(&err)
customerID, err := invoices.service.db.Customers().GetCustomerID(ctx, userID)
if err != nil {
return Error.Wrap(err)
}
stripeInvoices, err := invoices.service.getInvoices(ctx, customerID, time.Unix(0, 0))
if err != nil {
return Error.Wrap(err)
}
if len(stripeInvoices) == 0 {
return nil
}
// first check users token balance
monetaryTokenBalance, err := invoices.service.billingDB.GetBalance(ctx, userID)
if err != nil {
invoices.service.log.Error("error getting token balance", zap.Error(err))
return Error.Wrap(err)
}
if monetaryTokenBalance.BaseUnits() == 0 {
return Error.New("User has no tokens")
}
err = invoices.service.PayInvoicesWithTokenBalance(ctx, userID, customerID, stripeInvoices)
if err != nil {
invoices.service.log.Error("error paying invoice(s) with token balance", zap.Error(err))
return Error.Wrap(err)
}
return nil
}
// AttemptPayOverdueInvoices attempts to pay a user's open, overdue invoices.
func (invoices *invoices) attemptPayOverdueInvoicesWithCC(ctx context.Context, stripeInvoices []stripe.Invoice) (err error) {
var errGrp errs.Group

View File

@ -954,6 +954,11 @@ func (m mockCreditNotes) New(params *stripe.CreditNoteParams) (*stripe.CreditNot
for _, invoice := range invoices {
if invoice.ID == *params.Invoice {
invoice.AmountRemaining -= *params.Lines[0].UnitAmount
invoice.AmountDue -= *params.Lines[0].UnitAmount
invoice.Lines.Data[0].Amount -= *params.Lines[0].UnitAmount
if invoice.AmountRemaining <= 0 {
invoice.Status = stripe.InvoiceStatusPaid
}
}
}
}

View File

@ -2,7 +2,7 @@
# account-freeze.enabled: false
# whether to exclude storjscan-paying users from automatic warn/freeze
# account-freeze.exclude-storjscan: true
# account-freeze.exclude-storjscan: false
# How long to wait between a warning event and freezing an account.
# account-freeze.grace-period: 360h0m0s