Revert "satellite/db: optimize project usage query"

This reverts commit 31ec421299.

This change made the usages endpoint slower for accounts with large
number of projects.

Change-Id: I95870e95c2bf3bc3050087532fd0d20cbb50748b
This commit is contained in:
Wilfred Asomani 2023-09-07 16:39:36 +00:00 committed by Storj Robot
parent df037564d7
commit 754bf5f8af
2 changed files with 85 additions and 138 deletions

View File

@ -568,70 +568,13 @@ func (db *ProjectAccounting) GetProjectTotal(ctx context.Context, projectID uuid
func (db *ProjectAccounting) GetProjectTotalByPartner(ctx context.Context, projectID uuid.UUID, partnerNames []string, since, before time.Time) (usages map[string]accounting.ProjectUsage, err error) {
defer mon.Task()(&ctx)(&err)
since = timeTruncateDown(since)
var buckets []struct{ Name, UserAgent []byte }
q := db.db.Rebind(`
SELECT buckets.bucket_name,
m.user_agent
FROM (SELECT bucket_name
FROM bucket_bandwidth_rollups
WHERE project_id = $1
UNION
SELECT bucket_name
FROM bucket_storage_tallies
WHERE project_id = $1) AS buckets
LEFT JOIN bucket_metainfos AS m ON
m.project_id = $1 AND
m.name = buckets.bucket_name
`)
rows, err := db.db.QueryContext(ctx, q, projectID[:])
bucketNames, err := db.getBucketsSinceAndBefore(ctx, projectID, since, before)
if err != nil {
return nil, err
}
var bucketNames [][]byte
for rows.Next() {
var bucket struct{ Name, UserAgent []byte }
err = rows.Scan(&bucket.Name, &bucket.UserAgent)
if err != nil {
return nil, errs.Combine(err, rows.Close())
}
buckets = append(buckets, bucket)
bucketNames = append(bucketNames, bucket.Name)
}
err = errs.Combine(rows.Err(), rows.Close())
if err != nil {
return nil, err
}
bucketNameToPartner := make(map[string]string, len(buckets))
for _, bucket := range buckets {
if len(bucket.UserAgent) == 0 {
bucketNameToPartner[string(bucket.Name)] = ""
continue
}
entries, err := useragent.ParseEntries(bucket.UserAgent)
if err != nil {
return nil, err
}
partner := ""
if len(entries) != 0 {
for _, partnerName := range partnerNames {
if entries[0].Product == partnerName {
partner = partnerName
break
}
}
}
bucketNameToPartner[string(bucket.Name)] = partner
}
storageQuery := db.db.Rebind(`
SELECT
bucket_storage_tallies.bucket_name,
bucket_storage_tallies.interval_start,
bucket_storage_tallies.total_bytes,
bucket_storage_tallies.inline,
@ -641,103 +584,107 @@ func (db *ProjectAccounting) GetProjectTotalByPartner(ctx context.Context, proje
FROM
bucket_storage_tallies
WHERE
bucket_storage_tallies.bucket_name = ANY(?) AND
bucket_storage_tallies.project_id = ? AND
bucket_storage_tallies.bucket_name = ? AND
bucket_storage_tallies.interval_start >= ? AND
bucket_storage_tallies.interval_start < ?
ORDER BY bucket_storage_tallies.interval_start DESC
`)
storageTalliesRows, err := db.db.QueryContext(ctx, storageQuery, pgutil.ByteaArray(bucketNames), projectID[:], since, before)
if err != nil {
return nil, err
}
usages = make(map[string]accounting.ProjectUsage)
previousBucketTally := make(map[string]*accounting.BucketStorageTally)
for storageTalliesRows.Next() {
tally := accounting.BucketStorageTally{}
var inline, remote int64
var bucket string
err = storageTalliesRows.Scan(&bucket, &tally.IntervalStart, &tally.TotalBytes, &inline, &remote, &tally.TotalSegmentCount, &tally.ObjectCount)
if err != nil {
return nil, errs.Combine(err, storageTalliesRows.Close())
}
if tally.TotalBytes == 0 {
tally.TotalBytes = inline + remote
}
if previousBucketTally[bucket] == nil {
previousBucketTally[bucket] = &tally
continue
}
prevTally := previousBucketTally[bucket]
partner := bucketNameToPartner[bucket]
if _, ok := usages[partner]; !ok {
usages[partner] = accounting.ProjectUsage{Since: since, Before: before}
}
usage := usages[partner]
hours := prevTally.IntervalStart.Sub(tally.IntervalStart).Hours()
usage.Storage += memory.Size(tally.TotalBytes).Float64() * hours
usage.SegmentCount += float64(tally.TotalSegmentCount) * hours
usage.ObjectCount += float64(tally.ObjectCount) * hours
usages[partner] = usage
previousBucketTally[bucket] = &tally
}
err = errs.Combine(storageTalliesRows.Err(), storageTalliesRows.Close())
if err != nil {
return nil, err
}
totalEgressQuery := db.db.Rebind(`
SELECT
bucket_name,
COALESCE(SUM(settled) + SUM(inline), 0)
FROM
bucket_bandwidth_rollups
WHERE
project_id = ? AND
bucket_name = ? AND
interval_start >= ? AND
interval_start < ? AND
action = ?
Group BY bucket_name;
action = ?;
`)
totalEgressRow, err := db.db.QueryContext(ctx, totalEgressQuery, projectID[:], since, before, pb.PieceAction_GET)
if err != nil {
return nil, err
}
usages = make(map[string]accounting.ProjectUsage)
for totalEgressRow.Next() {
var egress int64
var bucket string
if err = totalEgressRow.Scan(&bucket, &egress); err != nil {
for _, bucket := range bucketNames {
userAgentRow, err := db.db.Get_BucketMetainfo_UserAgent_By_ProjectId_And_Name(ctx,
dbx.BucketMetainfo_ProjectId(projectID[:]),
dbx.BucketMetainfo_Name([]byte(bucket)))
if err != nil && !errors.Is(err, sql.ErrNoRows) {
return nil, err
}
partner := bucketNameToPartner[bucket]
var partner string
if userAgentRow != nil && userAgentRow.UserAgent != nil {
entries, err := useragent.ParseEntries(userAgentRow.UserAgent)
if err != nil {
return nil, err
}
if len(entries) != 0 {
for _, iterPartner := range partnerNames {
if entries[0].Product == iterPartner {
partner = iterPartner
break
}
}
}
}
if _, ok := usages[partner]; !ok {
usages[partner] = accounting.ProjectUsage{Since: since, Before: before}
}
usage := usages[partner]
storageTalliesRows, err := db.db.QueryContext(ctx, storageQuery, projectID[:], []byte(bucket), since, before)
if err != nil {
return nil, err
}
var prevTally *accounting.BucketStorageTally
for storageTalliesRows.Next() {
tally := accounting.BucketStorageTally{}
var inline, remote int64
err = storageTalliesRows.Scan(&tally.IntervalStart, &tally.TotalBytes, &inline, &remote, &tally.TotalSegmentCount, &tally.ObjectCount)
if err != nil {
return nil, errs.Combine(err, storageTalliesRows.Close())
}
if tally.TotalBytes == 0 {
tally.TotalBytes = inline + remote
}
if prevTally == nil {
prevTally = &tally
continue
}
hours := prevTally.IntervalStart.Sub(tally.IntervalStart).Hours()
usage.Storage += memory.Size(tally.TotalBytes).Float64() * hours
usage.SegmentCount += float64(tally.TotalSegmentCount) * hours
usage.ObjectCount += float64(tally.ObjectCount) * hours
prevTally = &tally
}
err = errs.Combine(storageTalliesRows.Err(), storageTalliesRows.Close())
if err != nil {
return nil, err
}
totalEgressRow := db.db.QueryRowContext(ctx, totalEgressQuery, projectID[:], []byte(bucket), since, before, pb.PieceAction_GET)
if err != nil {
return nil, err
}
var egress int64
if err = totalEgressRow.Scan(&egress); err != nil {
return nil, err
}
usage.Egress += egress
usages[partner] = usage
}
err = errs.Combine(totalEgressRow.Err(), totalEgressRow.Close())
if err != nil {
return nil, err
}
return usages, nil
}

View File

@ -207,31 +207,31 @@ func Test_GetProjectTotal(t *testing.T) {
require.NoError(t, err)
const epsilon = 1e-8
require.InDelta(t, float64(tallies[0].Bytes()+tallies[1].Bytes()), usage.Storage, epsilon)
require.InDelta(t, float64(tallies[0].TotalSegmentCount+tallies[1].TotalSegmentCount), usage.SegmentCount, epsilon)
require.InDelta(t, float64(tallies[0].ObjectCount+tallies[1].ObjectCount), usage.ObjectCount, epsilon)
require.Equal(t, expectedEgress, usage.Egress)
require.Equal(t, tallies[0].IntervalStart, usage.Since)
require.Equal(t, tallies[2].IntervalStart.Add(time.Minute), usage.Before)
require.InDelta(t, usage.Storage, float64(tallies[0].Bytes()+tallies[1].Bytes()), epsilon)
require.InDelta(t, usage.SegmentCount, float64(tallies[0].TotalSegmentCount+tallies[1].TotalSegmentCount), epsilon)
require.InDelta(t, usage.ObjectCount, float64(tallies[0].ObjectCount+tallies[1].ObjectCount), epsilon)
require.Equal(t, usage.Egress, expectedEgress)
require.Equal(t, usage.Since, tallies[0].IntervalStart)
require.Equal(t, usage.Before, tallies[2].IntervalStart.Add(time.Minute))
// Ensure that GetProjectTotal treats the 'before' arg as exclusive
usage, err = db.ProjectAccounting().GetProjectTotal(ctx, projectID, tallies[0].IntervalStart, tallies[2].IntervalStart)
require.NoError(t, err)
require.InDelta(t, float64(tallies[0].Bytes()), usage.Storage, epsilon)
require.InDelta(t, float64(tallies[0].TotalSegmentCount), usage.SegmentCount, epsilon)
require.InDelta(t, float64(tallies[0].ObjectCount), usage.ObjectCount, epsilon)
require.Equal(t, expectedEgress, usage.Egress)
require.Equal(t, tallies[0].IntervalStart, usage.Since)
require.Equal(t, tallies[2].IntervalStart, usage.Before)
require.InDelta(t, usage.Storage, float64(tallies[0].Bytes()), epsilon)
require.InDelta(t, usage.SegmentCount, float64(tallies[0].TotalSegmentCount), epsilon)
require.InDelta(t, usage.ObjectCount, float64(tallies[0].ObjectCount), epsilon)
require.Equal(t, usage.Egress, expectedEgress)
require.Equal(t, usage.Since, tallies[0].IntervalStart)
require.Equal(t, usage.Before, tallies[2].IntervalStart)
usage, err = db.ProjectAccounting().GetProjectTotal(ctx, projectID, rollups[0].IntervalStart, rollups[1].IntervalStart)
require.NoError(t, err)
require.Zero(t, usage.Storage)
require.Zero(t, usage.SegmentCount)
require.Zero(t, usage.ObjectCount)
require.Equal(t, rollups[0].Inline+rollups[0].Settled, usage.Egress)
require.Equal(t, rollups[0].IntervalStart, usage.Since)
require.Equal(t, rollups[1].IntervalStart, usage.Before)
require.Equal(t, usage.Egress, rollups[0].Inline+rollups[0].Settled)
require.Equal(t, usage.Since, rollups[0].IntervalStart)
require.Equal(t, usage.Before, rollups[1].IntervalStart)
},
)
}