satellite/satellitedb: optimize ProjectAccounting.ArchiveRollupsBefore
The previous query was making a full table scan. This modifies the code to run the queries separately for each action. It will probably be slower on a small table; however, there should be a boost of several orders of magnitude on large tables. Change-Id: Ib8885024d8a5a0102bbab4ce09bd6af9047930c9
This commit is contained in:
parent
c248651f3f
commit
bf5194d134
@ -663,7 +663,7 @@ func (db *ProjectAccounting) GetBucketTotals(ctx context.Context, projectID uuid
|
|||||||
}
|
}
|
||||||
|
|
||||||
// ArchiveRollupsBefore archives rollups older than a given time.
|
// ArchiveRollupsBefore archives rollups older than a given time.
|
||||||
func (db *ProjectAccounting) ArchiveRollupsBefore(ctx context.Context, before time.Time, batchSize int) (bucketRollupsDeleted int, err error) {
|
func (db *ProjectAccounting) ArchiveRollupsBefore(ctx context.Context, before time.Time, batchSize int) (archivedCount int, err error) {
|
||||||
defer mon.Task()(&ctx)(&err)
|
defer mon.Task()(&ctx)(&err)
|
||||||
|
|
||||||
if batchSize <= 0 {
|
if batchSize <= 0 {
|
||||||
@ -672,33 +672,18 @@ func (db *ProjectAccounting) ArchiveRollupsBefore(ctx context.Context, before ti
|
|||||||
|
|
||||||
switch db.db.impl {
|
switch db.db.impl {
|
||||||
case dbutil.Cockroach:
|
case dbutil.Cockroach:
|
||||||
for {
|
|
||||||
row := db.db.QueryRow(ctx, `
|
|
||||||
WITH rollups_to_move AS (
|
|
||||||
DELETE FROM bucket_bandwidth_rollups
|
|
||||||
WHERE interval_start <= $1
|
|
||||||
LIMIT $2 RETURNING *
|
|
||||||
), moved_rollups AS (
|
|
||||||
INSERT INTO bucket_bandwidth_rollup_archives(bucket_name, project_id, interval_start, interval_seconds, action, inline, allocated, settled)
|
|
||||||
SELECT bucket_name, project_id, interval_start, interval_seconds, action, inline, allocated, settled FROM rollups_to_move
|
|
||||||
RETURNING *
|
|
||||||
)
|
|
||||||
SELECT count(*) FROM moved_rollups
|
|
||||||
`, before, batchSize)
|
|
||||||
|
|
||||||
var rowCount int
|
// We operate one action at a time, because we have an index on `(action, interval_start, project_id)`.
|
||||||
err = row.Scan(&rowCount)
|
for action := range pb.PieceAction_name {
|
||||||
|
count, err := db.archiveRollupsBeforeByAction(ctx, action, before, batchSize)
|
||||||
|
archivedCount += count
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return bucketRollupsDeleted, err
|
return archivedCount, Error.Wrap(err)
|
||||||
}
|
|
||||||
bucketRollupsDeleted += rowCount
|
|
||||||
|
|
||||||
if rowCount < batchSize {
|
|
||||||
break
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return archivedCount, nil
|
||||||
case dbutil.Postgres:
|
case dbutil.Postgres:
|
||||||
bwStatement := `
|
err := db.db.DB.QueryRow(ctx, `
|
||||||
WITH rollups_to_move AS (
|
WITH rollups_to_move AS (
|
||||||
DELETE FROM bucket_bandwidth_rollups
|
DELETE FROM bucket_bandwidth_rollups
|
||||||
WHERE interval_start <= $1
|
WHERE interval_start <= $1
|
||||||
@ -709,16 +694,39 @@ func (db *ProjectAccounting) ArchiveRollupsBefore(ctx context.Context, before ti
|
|||||||
RETURNING *
|
RETURNING *
|
||||||
)
|
)
|
||||||
SELECT count(*) FROM moved_rollups
|
SELECT count(*) FROM moved_rollups
|
||||||
`
|
`, before).Scan(&archivedCount)
|
||||||
row := db.db.DB.QueryRow(ctx, bwStatement, before)
|
return archivedCount, Error.Wrap(err)
|
||||||
var rowCount int
|
default:
|
||||||
err = row.Scan(&rowCount)
|
return 0, nil
|
||||||
if err != nil {
|
}
|
||||||
return bucketRollupsDeleted, err
|
}
|
||||||
}
|
|
||||||
bucketRollupsDeleted = rowCount
|
func (db *ProjectAccounting) archiveRollupsBeforeByAction(ctx context.Context, action int32, before time.Time, batchSize int) (archivedCount int, err error) {
|
||||||
|
defer mon.Task()(&ctx)(&err)
|
||||||
|
|
||||||
|
for {
|
||||||
|
var rowCount int
|
||||||
|
err := db.db.QueryRow(ctx, `
|
||||||
|
WITH rollups_to_move AS (
|
||||||
|
DELETE FROM bucket_bandwidth_rollups
|
||||||
|
WHERE action = $1 AND interval_start <= $2
|
||||||
|
LIMIT $3 RETURNING *
|
||||||
|
), moved_rollups AS (
|
||||||
|
INSERT INTO bucket_bandwidth_rollup_archives(bucket_name, project_id, interval_start, interval_seconds, action, inline, allocated, settled)
|
||||||
|
SELECT bucket_name, project_id, interval_start, interval_seconds, action, inline, allocated, settled FROM rollups_to_move
|
||||||
|
RETURNING *
|
||||||
|
)
|
||||||
|
SELECT count(*) FROM moved_rollups
|
||||||
|
`, int(action), before, batchSize).Scan(&rowCount)
|
||||||
|
if err != nil {
|
||||||
|
return archivedCount, Error.Wrap(err)
|
||||||
|
}
|
||||||
|
archivedCount += rowCount
|
||||||
|
|
||||||
|
if rowCount < batchSize {
|
||||||
|
return archivedCount, nil
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return bucketRollupsDeleted, err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// getBucketsSinceAndBefore lists distinct bucket names for a project within a specific timeframe.
|
// getBucketsSinceAndBefore lists distinct bucket names for a project within a specific timeframe.
|
||||||
|
Loading…
Reference in New Issue
Block a user