storj/satellite/payments/stripecoinpayments/clearing.go
paul cannon d8733ddd40 satellite/satellitedb: stop using _gob columns
This sets the corresponding _numeric columns to be NOT NULL (it has been
verified manually that there are no more NULL _numeric values on any
known satellites, and it should be impossible with current code to get
new NULL values in the _numeric columns).

We can't drop the _gob columns immediately, as there will still be code
running that expects them, but once this version is deployed we can
finally drop them and be totally done with this crazy 5-step migration.

Change-Id: I518302528d972090d56b3eedc815656610ac8e73
2022-03-30 04:13:13 +00:00

82 lines
2.0 KiB
Go

// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package stripecoinpayments
import (
"context"
"time"
"github.com/zeebo/errs"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"storj.io/common/sync2"
)
// ErrChore is the error class of the stripecoinpayments clearing loop chore.
// Errors surfaced by the chore's cycles are wrapped with it before being
// logged or returned.
var ErrChore = errs.Class("stripecoinpayments chore")
// Chore runs clearing process of reconciling transactions deposits,
// customer balance, invoices and usages.
//
// It owns two independent cycles: one that drives the transactions update
// loop and one that drives the account balance update loop of the service.
//
// architecture: Chore
type Chore struct {
// log receives the per-iteration start messages and any loop failures.
log *zap.Logger
// service provides the update loops invoked on every cycle iteration.
service *Service
// TransactionCycle schedules the transactions update loop.
TransactionCycle *sync2.Cycle
// AccountBalanceCycle schedules the account balance update loop.
AccountBalanceCycle *sync2.Cycle
}
// NewChore builds a clearing loop chore around the given service.
//
// txInterval is the interval handed to the transaction update cycle and
// accBalanceInterval is the interval handed to the account balance update
// cycle. The cycles are only created here; they are started by Run.
// TODO: uncomment new interval when coupons are finished.
func NewChore(log *zap.Logger, service *Service, txInterval, accBalanceInterval time.Duration) *Chore {
	chore := new(Chore)
	chore.log = log
	chore.service = service
	chore.TransactionCycle = sync2.NewCycle(txInterval)
	chore.AccountBalanceCycle = sync2.NewCycle(accBalanceInterval)
	return chore
}
// Run starts the transaction and account balance cycles and blocks until
// the group they were started in has finished.
//
// A failing update loop is only logged (wrapped in ErrChore); each cycle
// callback always returns nil, so a single failed iteration is never
// reported to the cycle as an error. The result of waiting on the group is
// returned wrapped in ErrChore.
func (chore *Chore) Run(ctx context.Context) (err error) {
	defer mon.Task()(&ctx)(&err)

	// Callback for one iteration of the transactions update cycle.
	updateTransactions := func(ctx context.Context) error {
		chore.log.Info("running transactions update cycle")

		loopErr := chore.service.updateTransactionsLoop(ctx)
		if loopErr != nil {
			chore.log.Error("transaction update cycle failed", zap.Error(ErrChore.Wrap(loopErr)))
		}
		return nil
	}

	// Callback for one iteration of the account balance update cycle.
	updateAccountBalance := func(ctx context.Context) error {
		chore.log.Info("running account balance update cycle")

		loopErr := chore.service.updateAccountBalanceLoop(ctx)
		if loopErr != nil {
			chore.log.Error("account balance update cycle failed", zap.Error(ErrChore.Wrap(loopErr)))
		}
		return nil
	}

	var cycles errgroup.Group
	chore.TransactionCycle.Start(ctx, &cycles, updateTransactions)
	chore.AccountBalanceCycle.Start(ctx, &cycles, updateAccountBalance)

	return ErrChore.Wrap(cycles.Wait())
}
// Close shuts down both cycles owned by the chore, the transaction cycle
// first and the account balance cycle second. It always returns nil.
func (chore *Chore) Close() (err error) {
	defer mon.Task()(nil)(&err)

	txCycle, balanceCycle := chore.TransactionCycle, chore.AccountBalanceCycle
	txCycle.Close()
	balanceCycle.Close()

	return nil
}