diff --git a/satellite/satellitedb/projectaccounting.go b/satellite/satellitedb/projectaccounting.go index c19fc5c8e..ecf238c0d 100644 --- a/satellite/satellitedb/projectaccounting.go +++ b/satellite/satellitedb/projectaccounting.go @@ -663,7 +663,7 @@ func (db *ProjectAccounting) GetBucketTotals(ctx context.Context, projectID uuid } // ArchiveRollupsBefore archives rollups older than a given time. -func (db *ProjectAccounting) ArchiveRollupsBefore(ctx context.Context, before time.Time, batchSize int) (bucketRollupsDeleted int, err error) { +func (db *ProjectAccounting) ArchiveRollupsBefore(ctx context.Context, before time.Time, batchSize int) (archivedCount int, err error) { defer mon.Task()(&ctx)(&err) if batchSize <= 0 { @@ -672,33 +672,18 @@ func (db *ProjectAccounting) ArchiveRollupsBefore(ctx context.Context, before ti switch db.db.impl { case dbutil.Cockroach: - for { - row := db.db.QueryRow(ctx, ` - WITH rollups_to_move AS ( - DELETE FROM bucket_bandwidth_rollups - WHERE interval_start <= $1 - LIMIT $2 RETURNING * - ), moved_rollups AS ( - INSERT INTO bucket_bandwidth_rollup_archives(bucket_name, project_id, interval_start, interval_seconds, action, inline, allocated, settled) - SELECT bucket_name, project_id, interval_start, interval_seconds, action, inline, allocated, settled FROM rollups_to_move - RETURNING * - ) - SELECT count(*) FROM moved_rollups - `, before, batchSize) - var rowCount int - err = row.Scan(&rowCount) + // We operate one action at a time, because we have an index on `(action, interval_start, project_id)`. + for action := range pb.PieceAction_name { + count, err := db.archiveRollupsBeforeByAction(ctx, action, before, batchSize) + archivedCount += count if err != nil { - return bucketRollupsDeleted, err - } - bucketRollupsDeleted += rowCount - - if rowCount < batchSize { - break + return archivedCount, Error.Wrap(err) } } + return archivedCount, nil case dbutil.Postgres: - bwStatement := ` + err := db.db.DB.QueryRow(ctx, ` WITH rollups_to_move AS ( DELETE FROM bucket_bandwidth_rollups WHERE interval_start <= $1 @@ -709,16 +694,39 @@ func (db *ProjectAccounting) ArchiveRollupsBefore(ctx context.Context, before ti RETURNING * ) SELECT count(*) FROM moved_rollups - ` - row := db.db.DB.QueryRow(ctx, bwStatement, before) - var rowCount int - err = row.Scan(&rowCount) - if err != nil { - return bucketRollupsDeleted, err - } - bucketRollupsDeleted = rowCount + `, before).Scan(&archivedCount) + return archivedCount, Error.Wrap(err) + default: + return 0, nil + } +} + +func (db *ProjectAccounting) archiveRollupsBeforeByAction(ctx context.Context, action int32, before time.Time, batchSize int) (archivedCount int, err error) { + defer mon.Task()(&ctx)(&err) + + for { + var rowCount int + err := db.db.QueryRow(ctx, ` + WITH rollups_to_move AS ( + DELETE FROM bucket_bandwidth_rollups + WHERE action = $1 AND interval_start <= $2 + LIMIT $3 RETURNING * + ), moved_rollups AS ( + INSERT INTO bucket_bandwidth_rollup_archives(bucket_name, project_id, interval_start, interval_seconds, action, inline, allocated, settled) + SELECT bucket_name, project_id, interval_start, interval_seconds, action, inline, allocated, settled FROM rollups_to_move + RETURNING * + ) + SELECT count(*) FROM moved_rollups + `, int(action), before, batchSize).Scan(&rowCount) + if err != nil { + return archivedCount, Error.Wrap(err) + } + archivedCount += rowCount + + if rowCount < batchSize { + return archivedCount, nil + } } - return bucketRollupsDeleted, err } // getBucketsSinceAndBefore lists distinct bucket names for a project within a specific timeframe.