2022-06-17 00:29:31 +01:00
|
|
|
// Copyright (C) 2022 Storj Labs, Inc.
|
|
|
|
// See LICENSE for copying information.
|
|
|
|
|
|
|
|
package billing
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"time"
|
|
|
|
|
|
|
|
"github.com/zeebo/errs"
|
|
|
|
"go.uber.org/zap"
|
|
|
|
|
|
|
|
"storj.io/common/sync2"
|
|
|
|
)
|
|
|
|
|
2023-07-20 16:37:53 +01:00
|
|
|
// Observer processes a billing transaction.
|
|
|
|
type Observer interface {
|
|
|
|
// Process is called repeatedly for each transaction.
|
|
|
|
Process(context.Context, Transaction) error
|
|
|
|
}
|
2023-07-12 16:16:51 +01:00
|
|
|
|
2023-07-20 16:37:53 +01:00
|
|
|
// ChoreObservers holds functionality to process confirmed transactions using different types of observers.
|
|
|
|
type ChoreObservers struct {
|
|
|
|
UpgradeUser Observer
|
2023-08-15 17:58:50 +01:00
|
|
|
PayInvoices Observer
|
2023-07-20 16:37:53 +01:00
|
|
|
}
|
2023-07-12 16:16:51 +01:00
|
|
|
|
2022-06-17 00:29:31 +01:00
|
|
|
// ChoreErr is billing chore err class.
|
|
|
|
var ChoreErr = errs.Class("billing chore")
|
|
|
|
|
|
|
|
// Chore periodically queries for new billing transactions from payment type.
|
|
|
|
//
|
|
|
|
// architecture: Chore
|
|
|
|
type Chore struct {
|
|
|
|
log *zap.Logger
|
|
|
|
paymentTypes []PaymentType
|
|
|
|
transactionsDB TransactionsDB
|
|
|
|
TransactionCycle *sync2.Cycle
|
|
|
|
|
|
|
|
disableLoop bool
|
2023-03-28 02:42:26 +01:00
|
|
|
bonusRate int64
|
2023-07-20 16:37:53 +01:00
|
|
|
observers ChoreObservers
|
2022-06-17 00:29:31 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
// NewChore creates new chore.
|
2023-07-20 16:37:53 +01:00
|
|
|
func NewChore(log *zap.Logger, paymentTypes []PaymentType, transactionsDB TransactionsDB, interval time.Duration, disableLoop bool, bonusRate int64, observers ChoreObservers) *Chore {
|
2022-06-17 00:29:31 +01:00
|
|
|
return &Chore{
|
|
|
|
log: log,
|
|
|
|
paymentTypes: paymentTypes,
|
|
|
|
transactionsDB: transactionsDB,
|
|
|
|
TransactionCycle: sync2.NewCycle(interval),
|
|
|
|
disableLoop: disableLoop,
|
2023-03-28 02:42:26 +01:00
|
|
|
bonusRate: bonusRate,
|
2023-07-12 16:16:51 +01:00
|
|
|
observers: observers,
|
2022-06-17 00:29:31 +01:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Run runs billing transaction loop.
|
|
|
|
func (chore *Chore) Run(ctx context.Context) (err error) {
|
|
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
|
|
|
|
return chore.TransactionCycle.Run(ctx, func(ctx context.Context) error {
|
|
|
|
if chore.disableLoop {
|
|
|
|
chore.log.Debug("Skipping chore iteration as loop is disabled", zap.Bool("disableLoop", chore.disableLoop))
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
for _, paymentType := range chore.paymentTypes {
|
|
|
|
lastTransactionTime, lastTransactionMetadata, err := chore.transactionsDB.LastTransaction(ctx, paymentType.Source(), paymentType.Type())
|
2022-08-15 15:41:19 +01:00
|
|
|
if err != nil && !errs.Is(err, ErrNoTransactions) {
|
2022-06-17 00:29:31 +01:00
|
|
|
chore.log.Error("unable to determine timestamp of last transaction", zap.Error(ChoreErr.Wrap(err)))
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
transactions, err := paymentType.GetNewTransactions(ctx, lastTransactionTime, lastTransactionMetadata)
|
|
|
|
if err != nil {
|
|
|
|
chore.log.Error("unable to get new billing transactions", zap.Error(ChoreErr.Wrap(err)))
|
|
|
|
continue
|
|
|
|
}
|
2022-08-23 22:59:15 +01:00
|
|
|
for _, transaction := range transactions {
|
2023-03-28 02:42:26 +01:00
|
|
|
if bonus, ok := prepareBonusTransaction(chore.bonusRate, paymentType.Source(), transaction); ok {
|
|
|
|
_, err = chore.transactionsDB.Insert(ctx, transaction, bonus)
|
|
|
|
} else {
|
|
|
|
_, err = chore.transactionsDB.Insert(ctx, transaction)
|
|
|
|
}
|
2022-08-23 22:59:15 +01:00
|
|
|
if err != nil {
|
|
|
|
chore.log.Error("error storing transaction to db", zap.Error(ChoreErr.Wrap(err)))
|
|
|
|
// we need to halt storing transactions if one fails, so that it can be tried again on the next loop.
|
|
|
|
break
|
|
|
|
}
|
2023-07-12 16:16:51 +01:00
|
|
|
|
2023-08-15 17:58:50 +01:00
|
|
|
if chore.observers.UpgradeUser != nil {
|
|
|
|
err = chore.observers.UpgradeUser.Process(ctx, transaction)
|
|
|
|
if err != nil {
|
|
|
|
// we don't want to halt storing transactions if upgrade user observer fails
|
|
|
|
// because this chore is designed to store new transactions.
|
|
|
|
// So auto upgrading user is a side effect which shouldn't interrupt the main process.
|
|
|
|
chore.log.Error("error upgrading user", zap.Error(ChoreErr.Wrap(err)))
|
|
|
|
}
|
2023-07-20 16:37:53 +01:00
|
|
|
}
|
|
|
|
|
2023-08-15 17:58:50 +01:00
|
|
|
if chore.observers.PayInvoices != nil {
|
|
|
|
err = chore.observers.PayInvoices.Process(ctx, transaction)
|
|
|
|
if err != nil {
|
|
|
|
chore.log.Error("error paying invoices", zap.Error(ChoreErr.Wrap(err)))
|
|
|
|
}
|
2023-07-12 16:16:51 +01:00
|
|
|
}
|
2022-06-17 00:29:31 +01:00
|
|
|
}
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
// Close closes all underlying resources.
|
|
|
|
func (chore *Chore) Close() (err error) {
|
|
|
|
defer mon.Task()(nil)(&err)
|
|
|
|
chore.TransactionCycle.Close()
|
|
|
|
return nil
|
|
|
|
}
|
2023-08-15 17:58:50 +01:00
|
|
|
|
|
|
|
// TestSetPaymentTypes is used in tests to change the payment
|
|
|
|
// types this chore tracks.
|
|
|
|
func (chore *Chore) TestSetPaymentTypes(types []PaymentType) {
|
|
|
|
chore.paymentTypes = types
|
|
|
|
}
|