satellite/db: fix long loadtime for charges endpoint

This change addresses an issue where the /charges endpoint will take a
while to respond due to a project having a large number of buckets.
The queries responsible for this have been merged into a single query to
get all data needed at a go and potentially improve performance.
Benchmarks indicate that this new way is more performant than the
previous.

name                                 old ms/op  new ms/op
Postgres/sum_all_partner_usages          7.534      0.622
Postgres/individual_partner_usages       6.104      0.588
Cockroach/sum_all_partner_usages        14.813      3.057
Cockroach/individual_partner_usages     16.046      2.852

Issue: https://github.com/storj/storj-private/issues/277

Change-Id: Ibb7f867ab6610b3cb1ba203961f7d6aef6bfda4a
This commit is contained in:
Wilfred Asomani 2023-05-31 10:44:24 +00:00 committed by Storj Robot
parent 2a4c1fa41e
commit 676178299f
2 changed files with 89 additions and 96 deletions

View File

@ -549,55 +549,67 @@ func (db *ProjectAccounting) GetProjectTotal(ctx context.Context, projectID uuid
func (db *ProjectAccounting) GetProjectTotalByPartner(ctx context.Context, projectID uuid.UUID, partnerNames []string, since, before time.Time) (usages map[string]accounting.ProjectUsage, err error) { func (db *ProjectAccounting) GetProjectTotalByPartner(ctx context.Context, projectID uuid.UUID, partnerNames []string, since, before time.Time) (usages map[string]accounting.ProjectUsage, err error) {
defer mon.Task()(&ctx)(&err) defer mon.Task()(&ctx)(&err)
since = timeTruncateDown(since) since = timeTruncateDown(since)
bucketNames, err := db.getBucketsSinceAndBefore(ctx, projectID, since, before)
if err != nil {
return nil, err
}
storageQuery := db.db.Rebind(` storageQuery := db.db.Rebind(`
SELECT SELECT * FROM (
bucket_storage_tallies.interval_start, SELECT
bucket_storage_tallies.total_bytes, COALESCE(t.bucket_name, rollups.bucket_name) AS bucket_name,
bucket_storage_tallies.inline, COALESCE(t.interval_start, rollups.interval_start) AS interval_start,
bucket_storage_tallies.remote, COALESCE(t.total_bytes, 0) AS total_bytes,
bucket_storage_tallies.total_segments_count, COALESCE(t.inline, 0) AS inline,
bucket_storage_tallies.object_count COALESCE(t.remote, 0) AS remote,
FROM COALESCE(t.total_segments_count, 0) AS total_segments_count,
bucket_storage_tallies COALESCE(t.object_count, 0) AS object_count,
WHERE m.user_agent,
bucket_storage_tallies.project_id = ? AND COALESCE(rollups.egress, 0) AS egress
bucket_storage_tallies.bucket_name = ? AND FROM
bucket_storage_tallies.interval_start >= ? AND bucket_storage_tallies AS t
bucket_storage_tallies.interval_start < ? FULL OUTER JOIN (
ORDER BY bucket_storage_tallies.interval_start DESC SELECT
`) bucket_name,
SUM(settled + inline) AS egress,
totalEgressQuery := db.db.Rebind(` MIN(interval_start) AS interval_start
SELECT FROM
COALESCE(SUM(settled) + SUM(inline), 0) bucket_bandwidth_rollups
FROM WHERE
bucket_bandwidth_rollups project_id = $1 AND
WHERE interval_start >= $2 AND
project_id = ? AND interval_start < $3 AND
bucket_name = ? AND action = $4
interval_start >= ? AND GROUP BY
interval_start < ? AND bucket_name
action = ?; ) AS rollups ON
`) t.bucket_name = rollups.bucket_name
LEFT JOIN bucket_metainfos AS m ON
m.project_id = $1 AND
m.name = COALESCE(t.bucket_name, rollups.bucket_name)
WHERE
(t.project_id IS NULL OR t.project_id = $1) AND
COALESCE(t.interval_start, rollups.interval_start) >= $2 AND
COALESCE(t.interval_start, rollups.interval_start) < $3
) AS q` + db.db.impl.AsOfSystemInterval(-10) + ` ORDER BY bucket_name, interval_start DESC`)
usages = make(map[string]accounting.ProjectUsage) usages = make(map[string]accounting.ProjectUsage)
for _, bucket := range bucketNames { storageTalliesRows, err := db.db.QueryContext(ctx, storageQuery, projectID[:], since, before, pb.PieceAction_GET)
userAgentRow, err := db.db.Get_BucketMetainfo_UserAgent_By_ProjectId_And_Name(ctx, if err != nil {
dbx.BucketMetainfo_ProjectId(projectID[:]), return nil, err
dbx.BucketMetainfo_Name([]byte(bucket))) }
if err != nil && !errors.Is(err, sql.ErrNoRows) { var prevTallyForBucket = make(map[string]*accounting.BucketStorageTally)
return nil, err var recentBucket string
for storageTalliesRows.Next() {
tally := accounting.BucketStorageTally{}
var userAgent []byte
var inline, remote, egress int64
err = storageTalliesRows.Scan(&tally.BucketName, &tally.IntervalStart, &tally.TotalBytes, &inline, &remote, &tally.TotalSegmentCount, &tally.ObjectCount, &userAgent, &egress)
if err != nil {
return nil, errs.Combine(err, storageTalliesRows.Close())
} }
var partner string var partner string
if userAgentRow != nil && userAgentRow.UserAgent != nil { if userAgent != nil {
entries, err := useragent.ParseEntries(userAgentRow.UserAgent) entries, err := useragent.ParseEntries(userAgent)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -611,59 +623,40 @@ func (db *ProjectAccounting) GetProjectTotalByPartner(ctx context.Context, proje
} }
} }
} }
if _, ok := usages[partner]; !ok { if _, ok := usages[partner]; !ok {
usages[partner] = accounting.ProjectUsage{Since: since, Before: before} usages[partner] = accounting.ProjectUsage{Since: since, Before: before}
} }
usage := usages[partner] usage := usages[partner]
storageTalliesRows, err := db.db.QueryContext(ctx, storageQuery, projectID[:], []byte(bucket), since, before) if tally.TotalBytes == 0 {
if err != nil { tally.TotalBytes = inline + remote
return nil, err
} }
var prevTally *accounting.BucketStorageTally if tally.BucketName != recentBucket {
for storageTalliesRows.Next() { usage.Egress += egress
tally := accounting.BucketStorageTally{} recentBucket = tally.BucketName
var inline, remote int64
err = storageTalliesRows.Scan(&tally.IntervalStart, &tally.TotalBytes, &inline, &remote, &tally.TotalSegmentCount, &tally.ObjectCount)
if err != nil {
return nil, errs.Combine(err, storageTalliesRows.Close())
}
if tally.TotalBytes == 0 {
tally.TotalBytes = inline + remote
}
if prevTally == nil {
prevTally = &tally
continue
}
hours := prevTally.IntervalStart.Sub(tally.IntervalStart).Hours()
usage.Storage += memory.Size(tally.TotalBytes).Float64() * hours
usage.SegmentCount += float64(tally.TotalSegmentCount) * hours
usage.ObjectCount += float64(tally.ObjectCount) * hours
prevTally = &tally
} }
err = errs.Combine(storageTalliesRows.Err(), storageTalliesRows.Close()) if _, ok := prevTallyForBucket[tally.BucketName]; !ok {
if err != nil { prevTallyForBucket[tally.BucketName] = &tally
return nil, err usages[partner] = usage
continue
} }
totalEgressRow := db.db.QueryRowContext(ctx, totalEgressQuery, projectID[:], []byte(bucket), since, before, pb.PieceAction_GET) hours := prevTallyForBucket[tally.BucketName].IntervalStart.Sub(tally.IntervalStart).Hours()
if err != nil { usage.Storage += memory.Size(tally.TotalBytes).Float64() * hours
return nil, err usage.SegmentCount += float64(tally.TotalSegmentCount) * hours
} usage.ObjectCount += float64(tally.ObjectCount) * hours
var egress int64
if err = totalEgressRow.Scan(&egress); err != nil {
return nil, err
}
usage.Egress += egress
usages[partner] = usage usages[partner] = usage
prevTallyForBucket[tally.BucketName] = &tally
}
err = errs.Combine(storageTalliesRows.Err(), storageTalliesRows.Close())
if err != nil {
return nil, err
} }
return usages, nil return usages, nil

View File

@ -205,31 +205,31 @@ func Test_GetProjectTotal(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
const epsilon = 1e-8 const epsilon = 1e-8
require.InDelta(t, usage.Storage, float64(tallies[0].Bytes()+tallies[1].Bytes()), epsilon) require.InDelta(t, float64(tallies[0].Bytes()+tallies[1].Bytes()), usage.Storage, epsilon)
require.InDelta(t, usage.SegmentCount, float64(tallies[0].TotalSegmentCount+tallies[1].TotalSegmentCount), epsilon) require.InDelta(t, float64(tallies[0].TotalSegmentCount+tallies[1].TotalSegmentCount), usage.SegmentCount, epsilon)
require.InDelta(t, usage.ObjectCount, float64(tallies[0].ObjectCount+tallies[1].ObjectCount), epsilon) require.InDelta(t, float64(tallies[0].ObjectCount+tallies[1].ObjectCount), usage.ObjectCount, epsilon)
require.Equal(t, usage.Egress, expectedEgress) require.Equal(t, expectedEgress, usage.Egress)
require.Equal(t, usage.Since, tallies[0].IntervalStart) require.Equal(t, tallies[0].IntervalStart, usage.Since)
require.Equal(t, usage.Before, tallies[2].IntervalStart.Add(time.Minute)) require.Equal(t, tallies[2].IntervalStart.Add(time.Minute), usage.Before)
// Ensure that GetProjectTotal treats the 'before' arg as exclusive // Ensure that GetProjectTotal treats the 'before' arg as exclusive
usage, err = db.ProjectAccounting().GetProjectTotal(ctx, projectID, tallies[0].IntervalStart, tallies[2].IntervalStart) usage, err = db.ProjectAccounting().GetProjectTotal(ctx, projectID, tallies[0].IntervalStart, tallies[2].IntervalStart)
require.NoError(t, err) require.NoError(t, err)
require.InDelta(t, usage.Storage, float64(tallies[0].Bytes()), epsilon) require.InDelta(t, float64(tallies[0].Bytes()), usage.Storage, epsilon)
require.InDelta(t, usage.SegmentCount, float64(tallies[0].TotalSegmentCount), epsilon) require.InDelta(t, float64(tallies[0].TotalSegmentCount), usage.SegmentCount, epsilon)
require.InDelta(t, usage.ObjectCount, float64(tallies[0].ObjectCount), epsilon) require.InDelta(t, float64(tallies[0].ObjectCount), usage.ObjectCount, epsilon)
require.Equal(t, usage.Egress, expectedEgress) require.Equal(t, expectedEgress, usage.Egress)
require.Equal(t, usage.Since, tallies[0].IntervalStart) require.Equal(t, tallies[0].IntervalStart, usage.Since)
require.Equal(t, usage.Before, tallies[2].IntervalStart) require.Equal(t, tallies[2].IntervalStart, usage.Before)
usage, err = db.ProjectAccounting().GetProjectTotal(ctx, projectID, rollups[0].IntervalStart, rollups[1].IntervalStart) usage, err = db.ProjectAccounting().GetProjectTotal(ctx, projectID, rollups[0].IntervalStart, rollups[1].IntervalStart)
require.NoError(t, err) require.NoError(t, err)
require.Zero(t, usage.Storage) require.Zero(t, usage.Storage)
require.Zero(t, usage.SegmentCount) require.Zero(t, usage.SegmentCount)
require.Zero(t, usage.ObjectCount) require.Zero(t, usage.ObjectCount)
require.Equal(t, usage.Egress, rollups[0].Inline+rollups[0].Settled) require.Equal(t, rollups[0].Inline+rollups[0].Settled, usage.Egress)
require.Equal(t, usage.Since, rollups[0].IntervalStart) require.Equal(t, rollups[0].IntervalStart, usage.Since)
require.Equal(t, usage.Before, rollups[1].IntervalStart) require.Equal(t, rollups[1].IntervalStart, usage.Before)
}, },
) )
} }