satellite/satellitedb: batch delete storage node tallies

Currently we have a significant number of tallies that need to be
deleted together. Add a limit (by default 10k) on how many will
be deleted at the same time.

Change-Id: If530383f19b4d3bb83ed5fe956610a2e52f130a1
Egon Elbre 2022-07-20 11:10:29 +03:00
parent a9de5ce6c3
commit 82fede2132
7 changed files with 74 additions and 27 deletions
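
The change replaces a single unbounded DELETE with a loop that removes at most batchSize rows per statement and stops once a statement affects zero rows; on Postgres the batch is selected via ctid because Postgres has no DELETE ... LIMIT. A minimal sketch of that pattern follows, assuming a plain *sql.DB with Postgres-style placeholders rather than the satellite's own DB wrapper (the deleteTalliesBefore helper below is illustrative, not the satellite code):

package sketch

import (
	"context"
	"database/sql"
	"time"
)

// deleteTalliesBefore deletes tallies older than cutoff in batches of
// batchSize rows, looping until a statement no longer affects any rows.
func deleteTalliesBefore(ctx context.Context, db *sql.DB, cutoff time.Time, batchSize int) error {
	if batchSize <= 0 {
		batchSize = 10000 // same default as the new satellite config value
	}
	// Postgres cannot LIMIT a DELETE directly, so the batch is picked by ctid.
	query := `
		DELETE FROM storagenode_storage_tallies
		WHERE ctid IN (
			SELECT ctid
			FROM storagenode_storage_tallies
			WHERE interval_end_time < $1
			ORDER BY interval_end_time
			LIMIT $2
		)`
	for {
		res, err := db.ExecContext(ctx, query, cutoff, batchSize)
		if err != nil {
			return err
		}
		affected, err := res.RowsAffected()
		if err != nil {
			return err
		}
		if affected == 0 {
			return nil // nothing older than the cutoff remains
		}
	}
}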

View File

@@ -188,7 +188,7 @@ type StoragenodeAccounting interface {
 	// QueryStorageNodeUsage returns slice of StorageNodeUsage for given period
 	QueryStorageNodeUsage(ctx context.Context, nodeID storj.NodeID, start time.Time, end time.Time) ([]StorageNodeUsage, error)
 	// DeleteTalliesBefore deletes all tallies prior to some time
-	DeleteTalliesBefore(ctx context.Context, latestRollup time.Time) error
+	DeleteTalliesBefore(ctx context.Context, latestRollup time.Time, batchSize int) error
 	// ArchiveRollupsBefore archives rollups older than a given time and returns num storagenode and bucket bandwidth rollups archived.
 	ArchiveRollupsBefore(ctx context.Context, before time.Time, batchSize int) (numArchivedNodeBW int, err error)
 	// GetRollupsSince retrieves all archived bandwidth rollup records since a given time. A hard limit batch size is used for results.

View File

@@ -17,29 +17,32 @@ import (
 // Config contains configurable values for rollup.
 type Config struct {
-	Interval      time.Duration `help:"how frequently rollup should run" releaseDefault:"24h" devDefault:"120s" testDefault:"$TESTINTERVAL"`
-	DeleteTallies bool          `help:"option for deleting tallies after they are rolled up" default:"true"`
+	Interval               time.Duration `help:"how frequently rollup should run" releaseDefault:"24h" devDefault:"120s" testDefault:"$TESTINTERVAL"`
+	DeleteTallies          bool          `help:"option for deleting tallies after they are rolled up" default:"true"`
+	DeleteTalliesBatchSize int           `help:"how many tallies to delete in a batch" default:"10000"`
 }
 
 // Service is the rollup service for totalling data on storage nodes on daily intervals.
 //
 // architecture: Chore
 type Service struct {
-	logger          *zap.Logger
-	Loop            *sync2.Cycle
-	sdb             accounting.StoragenodeAccounting
-	deleteTallies   bool
-	OrderExpiration time.Duration
+	logger                 *zap.Logger
+	Loop                   *sync2.Cycle
+	sdb                    accounting.StoragenodeAccounting
+	deleteTallies          bool
+	deleteTalliesBatchSize int
+	OrderExpiration        time.Duration
 }
 
 // New creates a new rollup service.
-func New(logger *zap.Logger, sdb accounting.StoragenodeAccounting, interval time.Duration, deleteTallies bool, orderExpiration time.Duration) *Service {
+func New(logger *zap.Logger, sdb accounting.StoragenodeAccounting, config Config, orderExpiration time.Duration) *Service {
 	return &Service{
-		logger:          logger,
-		Loop:            sync2.NewCycle(interval),
-		sdb:             sdb,
-		deleteTallies:   deleteTallies,
-		OrderExpiration: orderExpiration,
+		logger:                 logger,
+		Loop:                   sync2.NewCycle(config.Interval),
+		sdb:                    sdb,
+		deleteTallies:          config.DeleteTallies,
+		deleteTalliesBatchSize: config.DeleteTalliesBatchSize,
+		OrderExpiration:        orderExpiration,
 	}
 }
@@ -101,7 +104,7 @@ func (r *Service) Rollup(ctx context.Context) (err error) {
 	if r.deleteTallies {
 		// Delete already rolled up tallies
 		latestTally = latestTally.Add(-r.OrderExpiration)
-		err = r.sdb.DeleteTalliesBefore(ctx, latestTally)
+		err = r.sdb.DeleteTalliesBefore(ctx, latestTally, r.deleteTalliesBatchSize)
 		if err != nil {
 			return Error.Wrap(err)
 		}

View File

@@ -46,7 +46,7 @@ func TestRollupNoDeletes(t *testing.T) {
 			storageNodes = createNodes(ctx, t, db)
 		)
 
-		rollupService := rollup.New(testplanet.NewLogger(t), snAccountingDB, 120*time.Second, false, time.Hour)
+		rollupService := rollup.New(testplanet.NewLogger(t), snAccountingDB, rollup.Config{Interval: 120 * time.Second}, time.Hour)
 
 		// disqualifying nodes is unrelated to this test, but it is added here
 		// to confirm the disqualification shows up in the accounting CSVRow
@@ -147,7 +147,7 @@ func TestRollupDeletes(t *testing.T) {
 			storageNodes = createNodes(ctx, t, db)
 		)
 
-		rollupService := rollup.New(testplanet.NewLogger(t), snAccountingDB, 120*time.Second, true, time.Hour)
+		rollupService := rollup.New(testplanet.NewLogger(t), snAccountingDB, rollup.Config{Interval: 120 * time.Second, DeleteTallies: true}, time.Hour)
 
 		// disqualifying nodes is unrelated to this test, but it is added here
 		// to confirm the disqualification shows up in the accounting CSVRow

View File

@@ -17,7 +17,6 @@ import (
 	"storj.io/common/testrand"
 	"storj.io/common/uuid"
 	"storj.io/storj/private/testplanet"
-	"storj.io/storj/private/teststorj"
 	"storj.io/storj/satellite/accounting"
 	"storj.io/storj/satellite/accounting/tally"
 	"storj.io/storj/satellite/metabase"
@@ -30,7 +29,7 @@ func TestDeleteTalliesBefore(t *testing.T) {
 	}{
 		{
 			eraseBefore:  time.Now(),
-			expectedRaws: 1,
+			expectedRaws: 3,
 		},
 		{
 			eraseBefore:  time.Now().Add(24 * time.Hour),
@@ -43,14 +42,15 @@ func TestDeleteTalliesBefore(t *testing.T) {
 		testplanet.Run(t, testplanet.Config{
 			SatelliteCount: 1, StorageNodeCount: 0, UplinkCount: 0,
 		}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
-			id := teststorj.NodeIDFromBytes([]byte{})
 			nodeData := make(map[storj.NodeID]float64)
-			nodeData[id] = float64(1000)
+			nodeData[storj.NodeID{1}] = float64(1000)
+			nodeData[storj.NodeID{2}] = float64(1000)
+			nodeData[storj.NodeID{3}] = float64(1000)
 
 			err := planet.Satellites[0].DB.StoragenodeAccounting().SaveTallies(ctx, time.Now(), nodeData)
 			require.NoError(t, err)
 
-			err = planet.Satellites[0].DB.StoragenodeAccounting().DeleteTalliesBefore(ctx, test.eraseBefore)
+			err = planet.Satellites[0].DB.StoragenodeAccounting().DeleteTalliesBefore(ctx, test.eraseBefore, 1)
 			require.NoError(t, err)
 
 			raws, err := planet.Satellites[0].DB.StoragenodeAccounting().GetTallies(ctx)

View File

@@ -472,7 +472,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB,
 		// Lets add 1 more day so we catch any off by one errors when deleting tallies
 		orderExpirationPlusDay := config.Orders.Expiration + config.Rollup.Interval
-		peer.Accounting.Rollup = rollup.New(peer.Log.Named("accounting:rollup"), peer.DB.StoragenodeAccounting(), config.Rollup.Interval, config.Rollup.DeleteTallies, orderExpirationPlusDay)
+		peer.Accounting.Rollup = rollup.New(peer.Log.Named("accounting:rollup"), peer.DB.StoragenodeAccounting(), config.Rollup, orderExpirationPlusDay)
 		peer.Services.Add(lifecycle.Item{
 			Name: "accounting:rollup",
 			Run:  peer.Accounting.Rollup.Run,

View File

@@ -519,11 +519,52 @@ func (db *StoragenodeAccounting) QueryStorageNodeUsage(ctx context.Context, node
 }
 
 // DeleteTalliesBefore deletes all raw tallies prior to some time.
-func (db *StoragenodeAccounting) DeleteTalliesBefore(ctx context.Context, latestRollup time.Time) (err error) {
+func (db *StoragenodeAccounting) DeleteTalliesBefore(ctx context.Context, latestRollup time.Time, batchSize int) (err error) {
 	defer mon.Task()(&ctx)(&err)
-	deleteRawSQL := `DELETE FROM storagenode_storage_tallies WHERE interval_end_time < ?`
-	_, err = db.db.DB.ExecContext(ctx, db.db.Rebind(deleteRawSQL), latestRollup)
-	return err
+
+	if batchSize <= 0 {
+		batchSize = 10000
+	}
+
+	var query string
+	switch db.db.impl {
+	case dbutil.Cockroach:
+		query = `
+			DELETE FROM storagenode_storage_tallies
+			WHERE interval_end_time < ?
+			LIMIT ?`
+	case dbutil.Postgres:
+		query = `
+			DELETE FROM storagenode_storage_tallies
+			WHERE ctid IN (
+				SELECT ctid
+				FROM storagenode_storage_tallies
+				WHERE interval_end_time < ?
+				ORDER BY interval_end_time
+				LIMIT ?
+			)`
+	default:
+		return Error.New("unsupported database: %v", db.db.impl)
+	}
+	query = db.db.Rebind(query)
+
+	for {
+		res, err := db.db.DB.ExecContext(ctx, query, latestRollup, batchSize)
+		if err != nil {
+			if errs.Is(err, sql.ErrNoRows) {
+				return nil
+			}
+			return Error.Wrap(err)
+		}
+
+		affected, err := res.RowsAffected()
+		if err != nil {
+			return Error.Wrap(err)
+		}
+		if affected == 0 {
+			return nil
+		}
+	}
+}
 
 // ArchiveRollupsBefore archives rollups older than a given time.

View File

@@ -817,6 +817,9 @@ identity.key-path: /root/.local/share/storj/identity/satellite/identity.key
 # option for deleting tallies after they are rolled up
 # rollup.delete-tallies: true
 
+# how many tallies to delete in a batch
+# rollup.delete-tallies-batch-size: 10000
+
 # how frequently rollup should run
 # rollup.interval: 24h0m0s
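
The new option should be tunable by uncommenting the corresponding line in the satellite's config file in the same form as above; the value below is only an illustrative override, 10000 remains the default:

# how many tallies to delete in a batch
rollup.delete-tallies-batch-size: 50000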