storj/satellite/accounting/rolluparchive/rolluparchive.go
JT Olio da9ca0c650 testplanet/satellite: reduce the number of places default values need to be configured
Satellites set their configuration values to default values using
cfgstruct, however, it turns out our tests don't test these values
at all! Instead, they have a completely separate definition system
that is easy to forget about.

As is to be expected, these values have drifted, and it appears
in a few cases test planet is testing unreasonable values that we
won't see in production, or perhaps worse, features enabled in
production were missed and weren't enabled in testplanet.

This change makes it so all values are configured the same,
systematic way, so it's easy to see when test values are different
than dev values or release values, and it's less hard to forget
to enable features in testplanet.

In terms of reviewing, this change should be actually fairly
easy to review, considering private/testplanet/satellite.go keeps
the current config system and the new one and confirms that they
result in identical configurations, so you can be certain that
nothing was missed and the config is all correct.
You can also check the config lock to see what actual config
values changed.

Change-Id: I6715d0794887f577e21742afcf56fd2b9d12170e
2021-06-01 22:14:17 +00:00

93 lines
3.0 KiB
Go

// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package rolluparchive
import (
"context"
"time"
"github.com/spacemonkeygo/monkit/v3"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/common/sync2"
"storj.io/storj/satellite/accounting"
)
// Error is a standard error class for this package.
var (
Error = errs.Class("rolluparchive")
mon = monkit.Package()
)
// Config contains configurable values for rollup archiver.
type Config struct {
Interval time.Duration `help:"how frequently rollup archiver should run" releaseDefault:"24h" devDefault:"120s" testDefault:"$TESTINTERVAL"`
ArchiveAge time.Duration `help:"age at which a rollup is archived" default:"2160h" testDefault:"24h"`
BatchSize int `help:"number of records to delete per delete execution. Used only for crdb which is slow without limit." default:"500" testDefault:"1000"`
Enabled bool `help:"whether or not the rollup archive is enabled." default:"true"`
}
// Chore archives bucket and storagenode rollups at a given interval.
//
// architecture: Chore
type Chore struct {
log *zap.Logger
Loop *sync2.Cycle
archiveAge time.Duration
batchSize int
nodeAccounting accounting.StoragenodeAccounting
projectAccounting accounting.ProjectAccounting
}
// New creates a new rollup archiver chore.
func New(log *zap.Logger, sdb accounting.StoragenodeAccounting, pdb accounting.ProjectAccounting, config Config) *Chore {
return &Chore{
log: log,
Loop: sync2.NewCycle(config.Interval),
archiveAge: config.ArchiveAge,
batchSize: config.BatchSize,
nodeAccounting: sdb,
projectAccounting: pdb,
}
}
// Run starts the archiver chore.
func (chore *Chore) Run(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
if chore.archiveAge < 0 {
return Error.New("archive age can't be less than 0")
}
return chore.Loop.Run(ctx, func(ctx context.Context) error {
cutoff := time.Now().UTC().Add(-chore.archiveAge)
err := chore.ArchiveRollups(ctx, cutoff, chore.batchSize)
if err != nil {
chore.log.Error("error archiving SN and bucket bandwidth rollups", zap.Error(err))
}
return nil
})
}
// Close stops the service and releases any resources.
func (chore *Chore) Close() error {
chore.Loop.Close()
return nil
}
// ArchiveRollups will remove old rollups from active rollup tables.
func (chore *Chore) ArchiveRollups(ctx context.Context, cutoff time.Time, batchSize int) (err error) {
defer mon.Task()(&ctx)(&err)
nodeRollupsArchived, err := chore.nodeAccounting.ArchiveRollupsBefore(ctx, cutoff, batchSize)
if err != nil {
chore.log.Error("archiving bandwidth rollups", zap.Int("node rollups archived", nodeRollupsArchived), zap.Error(err))
return Error.Wrap(err)
}
bucketRollupsArchived, err := chore.projectAccounting.ArchiveRollupsBefore(ctx, cutoff, batchSize)
if err != nil {
chore.log.Error("archiving bandwidth rollups", zap.Int("bucket rollups archived", bucketRollupsArchived), zap.Error(err))
return Error.Wrap(err)
}
return nil
}