storj/satellite/payments/billing/chore.go
dlamarmorgan afe58323f9 satellitedb: remove use of batch payment insert
Removed batch insert of payments since they do not guarantee order. Order of payments sent to the payments DB is important, because the billing chore will request new payments based on the last received payment. If the last payment inserted is not the last payment received, duplicate payments will be inserted into the billing table.

Change-Id: Ic3335c89fa8031f7bc16f417ca23ed83301ef8f6
2022-08-30 14:45:55 -07:00

82 lines
2.3 KiB
Go

// Copyright (C) 2022 Storj Labs, Inc.
// See LICENSE for copying information.
package billing
import (
"context"
"time"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/common/sync2"
)
// ChoreErr is billing chore err class.
var ChoreErr = errs.Class("billing chore")
// Chore periodically queries for new billing transactions from payment type.
//
// architecture: Chore
type Chore struct {
log *zap.Logger
paymentTypes []PaymentType
transactionsDB TransactionsDB
TransactionCycle *sync2.Cycle
disableLoop bool
}
// NewChore creates new chore.
func NewChore(log *zap.Logger, paymentTypes []PaymentType, transactionsDB TransactionsDB, interval time.Duration, disableLoop bool) *Chore {
return &Chore{
log: log,
paymentTypes: paymentTypes,
transactionsDB: transactionsDB,
TransactionCycle: sync2.NewCycle(interval),
disableLoop: disableLoop,
}
}
// Run runs billing transaction loop.
func (chore *Chore) Run(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
return chore.TransactionCycle.Run(ctx, func(ctx context.Context) error {
if chore.disableLoop {
chore.log.Debug("Skipping chore iteration as loop is disabled", zap.Bool("disableLoop", chore.disableLoop))
return nil
}
for _, paymentType := range chore.paymentTypes {
lastTransactionTime, lastTransactionMetadata, err := chore.transactionsDB.LastTransaction(ctx, paymentType.Source(), paymentType.Type())
if err != nil {
chore.log.Error("unable to determine timestamp of last transaction", zap.Error(ChoreErr.Wrap(err)))
continue
}
transactions, err := paymentType.GetNewTransactions(ctx, lastTransactionTime, lastTransactionMetadata)
if err != nil {
chore.log.Error("unable to get new billing transactions", zap.Error(ChoreErr.Wrap(err)))
continue
}
for _, transaction := range transactions {
_, err = chore.transactionsDB.Insert(ctx, transaction)
if err != nil {
chore.log.Error("error storing transaction to db", zap.Error(ChoreErr.Wrap(err)))
// we need to halt storing transactions if one fails, so that it can be tried again on the next loop.
break
}
}
}
return nil
})
}
// Close closes all underlying resources.
func (chore *Chore) Close() (err error) {
defer mon.Task()(nil)(&err)
chore.TransactionCycle.Close()
return nil
}