satellite/payments: slightly refactor user upgrade observer of billing chore

Resolves post-merge comments from here
https://review.dev.storj.io/c/storj/storj/+/10780

Reworked some definitions. Added checks and comments.

Change-Id: I19c63804ad1f30a1ffd8cb87e96f43deed20a685
This commit is contained in:
Vitalii 2023-07-20 18:37:53 +03:00 committed by Storj Robot
parent 2c934d1cfd
commit 6e4044b245
4 changed files with 25 additions and 27 deletions

View File

@ -519,8 +519,8 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB,
debug.Cycle("Payments Storjscan", peer.Payments.StorjscanChore.TransactionCycle), debug.Cycle("Payments Storjscan", peer.Payments.StorjscanChore.TransactionCycle),
) )
choreObservers := map[billing.ObserverBilling]billing.Observer{ choreObservers := billing.ChoreObservers{
billing.ObserverUpgradeUser: console.NewUpgradeUserObserver(peer.DB.Console(), peer.DB.Billing(), config.Console.UsageLimits, config.Console.UserBalanceForUpgrade), UpgradeUser: console.NewUpgradeUserObserver(peer.DB.Console(), peer.DB.Billing(), config.Console.UsageLimits, config.Console.UserBalanceForUpgrade),
} }
peer.Payments.BillingChore = billing.NewChore( peer.Payments.BillingChore = billing.NewChore(

View File

@ -13,13 +13,16 @@ import (
"storj.io/common/sync2" "storj.io/common/sync2"
) )
// ObserverBilling used to create enumerable of chore observers. // Observer processes a billing transaction.
type ObserverBilling int64 type Observer interface {
// Process is called repeatedly for each transaction.
Process(context.Context, Transaction) error
}
const ( // ChoreObservers holds functionality to process confirmed transactions using different types of observers.
// ObserverUpgradeUser stands for upgrade user observer type. type ChoreObservers struct {
ObserverUpgradeUser ObserverBilling = 0 UpgradeUser Observer
) }
// ChoreErr is billing chore err class. // ChoreErr is billing chore err class.
var ChoreErr = errs.Class("billing chore") var ChoreErr = errs.Class("billing chore")
@ -35,11 +38,11 @@ type Chore struct {
disableLoop bool disableLoop bool
bonusRate int64 bonusRate int64
observers map[ObserverBilling]Observer observers ChoreObservers
} }
// NewChore creates new chore. // NewChore creates new chore.
func NewChore(log *zap.Logger, paymentTypes []PaymentType, transactionsDB TransactionsDB, interval time.Duration, disableLoop bool, bonusRate int64, observers map[ObserverBilling]Observer) *Chore { func NewChore(log *zap.Logger, paymentTypes []PaymentType, transactionsDB TransactionsDB, interval time.Duration, disableLoop bool, bonusRate int64, observers ChoreObservers) *Chore {
return &Chore{ return &Chore{
log: log, log: log,
paymentTypes: paymentTypes, paymentTypes: paymentTypes,
@ -84,8 +87,15 @@ func (chore *Chore) Run(ctx context.Context) (err error) {
break break
} }
err = chore.observers[ObserverUpgradeUser].Process(ctx, transaction) if chore.observers.UpgradeUser == nil {
continue
}
err = chore.observers.UpgradeUser.Process(ctx, transaction)
if err != nil { if err != nil {
// we don't want to halt storing transactions if upgrade user observer fails
// because this chore is designed to store new transactions.
// So auto upgrading user is a side effect which shouldn't interrupt the main process.
chore.log.Error("error upgrading user", zap.Error(ChoreErr.Wrap(err))) chore.log.Error("error upgrading user", zap.Error(ChoreErr.Wrap(err)))
} }
} }

View File

@ -85,8 +85,8 @@ func TestChore(t *testing.T) {
), ),
} }
choreObservers := map[billing.ObserverBilling]billing.Observer{ choreObservers := billing.ChoreObservers{
billing.ObserverUpgradeUser: console.NewUpgradeUserObserver(consoleDB, db, usageLimitsConfig, userBalanceForUpgrade), UpgradeUser: console.NewUpgradeUserObserver(consoleDB, db, usageLimitsConfig, userBalanceForUpgrade),
} }
chore := billing.NewChore(zaptest.NewLogger(t), paymentTypes, db, time.Hour, false, bonusRate, choreObservers) chore := billing.NewChore(zaptest.NewLogger(t), paymentTypes, db, time.Hour, false, bonusRate, choreObservers)
@ -168,8 +168,8 @@ func TestChore_UpgradeUserObserver(t *testing.T) {
_, err = sat.AddProject(ctx, user.ID, "Test Project") _, err = sat.AddProject(ctx, user.ID, "Test Project")
require.NoError(t, err) require.NoError(t, err)
choreObservers := map[billing.ObserverBilling]billing.Observer{ choreObservers := billing.ChoreObservers{
billing.ObserverUpgradeUser: console.NewUpgradeUserObserver(db.Console(), db.Billing(), sat.Config.Console.UsageLimits, sat.Config.Console.UserBalanceForUpgrade), UpgradeUser: console.NewUpgradeUserObserver(db.Console(), db.Billing(), sat.Config.Console.UsageLimits, sat.Config.Console.UserBalanceForUpgrade),
} }
amount1 := int64(200) // $2 amount1 := int64(200) // $2

View File

@ -1,12 +0,0 @@
// Copyright (C) 2023 Storj Labs, Inc.
// See LICENSE for copying information.
package billing
import "context"
// Observer processes a billing transaction.
type Observer interface {
// Process is called repeatedly for each transaction.
Process(context.Context, Transaction) error
}