f671eb2beb
This change adds two new tables to process orders as fast as we used to but in an asynchronous manner and with hopefully less storage usage. This should help scale on cockroach, but limits us to one worker. It lays the groundwork for the order processing pipeline to be queue rather than database driven. For more details, see the added fast billing changes blueprint. It also fixes the orders db so that all the timestamps that are passed to columns that do not contain a time zone are converted to UTC at the last possible opportunity, making it less likely to use the APIs incorrectly. We really should migrate to include timezones on all of our timestamp columns. Change-Id: Ibfda8e7a3d5972b7798fb61b31ff56419c64ea35
84 lines
1.9 KiB
Go
84 lines
1.9 KiB
Go
// Copyright (C) 2019 Storj Labs, Inc.
|
|
// See LICENSE for copying information.
|
|
|
|
package dbcleanup
|
|
|
|
import (
|
|
"context"
|
|
"time"
|
|
|
|
"github.com/spacemonkeygo/monkit/v3"
|
|
"github.com/zeebo/errs"
|
|
"go.uber.org/zap"
|
|
|
|
"storj.io/common/sync2"
|
|
"storj.io/storj/satellite/orders"
|
|
)
|
|
|
|
var (
|
|
// Error the default dbcleanup errs class.
|
|
Error = errs.Class("dbcleanup error")
|
|
|
|
mon = monkit.Package()
|
|
)
|
|
|
|
// Config defines configuration struct for dbcleanup chore.
|
|
type Config struct {
|
|
SerialsInterval time.Duration `help:"how often to delete expired serial numbers" default:"24h"`
|
|
}
|
|
|
|
// Chore for deleting DB entries that are no longer needed.
|
|
//
|
|
// architecture: Chore
|
|
type Chore struct {
|
|
log *zap.Logger
|
|
orders orders.DB
|
|
|
|
Serials *sync2.Cycle
|
|
}
|
|
|
|
// NewChore creates new chore for deleting DB entries.
|
|
func NewChore(log *zap.Logger, orders orders.DB, config Config) *Chore {
|
|
return &Chore{
|
|
log: log,
|
|
orders: orders,
|
|
|
|
Serials: sync2.NewCycle(config.SerialsInterval),
|
|
}
|
|
}
|
|
|
|
// Run starts the db cleanup chore.
|
|
func (chore *Chore) Run(ctx context.Context) (err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
return chore.Serials.Run(ctx, chore.deleteExpiredSerials)
|
|
}
|
|
|
|
func (chore *Chore) deleteExpiredSerials(ctx context.Context) (err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
chore.log.Debug("deleting expired serial numbers")
|
|
|
|
now := time.Now()
|
|
|
|
deleted, err := chore.orders.DeleteExpiredSerials(ctx, now)
|
|
if err != nil {
|
|
chore.log.Error("deleting expired serial numbers", zap.Error(err))
|
|
} else {
|
|
chore.log.Debug("expired serials deleted", zap.Int("items deleted", deleted))
|
|
}
|
|
|
|
deleted, err = chore.orders.DeleteExpiredConsumedSerials(ctx, now)
|
|
if err != nil {
|
|
chore.log.Error("deleting expired serial numbers", zap.Error(err))
|
|
} else {
|
|
chore.log.Debug("expired serials deleted", zap.Int("items deleted", deleted))
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Close stops the dbcleanup chore.
|
|
func (chore *Chore) Close() error {
|
|
chore.Serials.Close()
|
|
return nil
|
|
}
|