Revert "satellite/db: fix long loadtime for charges endpoint"
This reverts commit 676178299f.
Reason for revert:
The query introduced by the reverted commit performs a full table scan.
It is being reverted pending a fix.
Change-Id: Idc53954459aa6f5a692056232b8674b11d1928ce
parent e06b94dcbc
commit 0ad544731d
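For context on the revert reason: the commit being reverted had replaced the per-bucket tally queries (restored in the diff below) with a single FULL OUTER JOIN query. The join plus its post-join filter (t.project_id IS NULL OR t.project_id = $1) gives the planner little opportunity to use a project-scoped index on bucket_storage_tallies, which is one plausible reason the plan degenerates into a full scan. A minimal, hypothetical sketch of how such a plan could be inspected with EXPLAIN follows; the connection string, the explain helper, and the simplified query text are illustrative assumptions and are not part of this commit.

package main

import (
    "context"
    "database/sql"
    "fmt"
    "log"

    _ "github.com/lib/pq" // any database/sql driver works; lib/pq is only an example
)

// explain prints the query plan rows for the given statement.
func explain(ctx context.Context, db *sql.DB, query string, args ...interface{}) error {
    rows, err := db.QueryContext(ctx, "EXPLAIN "+query, args...)
    if err != nil {
        return err
    }
    defer rows.Close()
    for rows.Next() {
        var line string
        if err := rows.Scan(&line); err != nil {
            return err
        }
        // On PostgreSQL a full scan shows up as "Seq Scan on bucket_storage_tallies".
        fmt.Println(line)
    }
    return rows.Err()
}

func main() {
    // Placeholder DSN for a test database; not a real satellite configuration.
    db, err := sql.Open("postgres", "postgres://localhost/testdb?sslmode=disable")
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    // Simplified stand-in for the filter the reverted query applies after its
    // FULL OUTER JOIN: the IS NULL arm keeps an index on project_id from
    // covering the predicate on its own.
    err = explain(context.Background(), db,
        `SELECT * FROM bucket_storage_tallies AS t
         WHERE t.project_id IS NULL OR t.project_id = $1`,
        []byte{0x01})
    if err != nil {
        log.Fatal(err)
    }
}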
@@ -549,67 +549,55 @@ func (db *ProjectAccounting) GetProjectTotal(ctx context.Context, projectID uuid
 func (db *ProjectAccounting) GetProjectTotalByPartner(ctx context.Context, projectID uuid.UUID, partnerNames []string, since, before time.Time) (usages map[string]accounting.ProjectUsage, err error) {
     defer mon.Task()(&ctx)(&err)
     since = timeTruncateDown(since)
-    storageQuery := db.db.Rebind(`
-    SELECT * FROM (
-        SELECT
-            COALESCE(t.bucket_name, rollups.bucket_name) AS bucket_name,
-            COALESCE(t.interval_start, rollups.interval_start) AS interval_start,
-            COALESCE(t.total_bytes, 0) AS total_bytes,
-            COALESCE(t.inline, 0) AS inline,
-            COALESCE(t.remote, 0) AS remote,
-            COALESCE(t.total_segments_count, 0) AS total_segments_count,
-            COALESCE(t.object_count, 0) AS object_count,
-            m.user_agent,
-            COALESCE(rollups.egress, 0) AS egress
-        FROM
-            bucket_storage_tallies AS t
-        FULL OUTER JOIN (
-            SELECT
-                bucket_name,
-                SUM(settled + inline) AS egress,
-                MIN(interval_start) AS interval_start
-            FROM
-                bucket_bandwidth_rollups
-            WHERE
-                project_id = $1 AND
-                interval_start >= $2 AND
-                interval_start < $3 AND
-                action = $4
-            GROUP BY
-                bucket_name
-        ) AS rollups ON
-            t.bucket_name = rollups.bucket_name
-        LEFT JOIN bucket_metainfos AS m ON
-            m.project_id = $1 AND
-            m.name = COALESCE(t.bucket_name, rollups.bucket_name)
-        WHERE
-            (t.project_id IS NULL OR t.project_id = $1) AND
-            COALESCE(t.interval_start, rollups.interval_start) >= $2 AND
-            COALESCE(t.interval_start, rollups.interval_start) < $3
-    ) AS q` + db.db.impl.AsOfSystemInterval(-10) + ` ORDER BY bucket_name, interval_start DESC`)
-
-    usages = make(map[string]accounting.ProjectUsage)
-
-    storageTalliesRows, err := db.db.QueryContext(ctx, storageQuery, projectID[:], since, before, pb.PieceAction_GET)
+    bucketNames, err := db.getBucketsSinceAndBefore(ctx, projectID, since, before)
     if err != nil {
         return nil, err
     }
-    var prevTallyForBucket = make(map[string]*accounting.BucketStorageTally)
-    var recentBucket string
-
-    for storageTalliesRows.Next() {
-        tally := accounting.BucketStorageTally{}
-        var userAgent []byte
-        var inline, remote, egress int64
-        err = storageTalliesRows.Scan(&tally.BucketName, &tally.IntervalStart, &tally.TotalBytes, &inline, &remote, &tally.TotalSegmentCount, &tally.ObjectCount, &userAgent, &egress)
-        if err != nil {
-            return nil, errs.Combine(err, storageTalliesRows.Close())
+    storageQuery := db.db.Rebind(`
+        SELECT
+            bucket_storage_tallies.interval_start,
+            bucket_storage_tallies.total_bytes,
+            bucket_storage_tallies.inline,
+            bucket_storage_tallies.remote,
+            bucket_storage_tallies.total_segments_count,
+            bucket_storage_tallies.object_count
+        FROM
+            bucket_storage_tallies
+        WHERE
+            bucket_storage_tallies.project_id = ? AND
+            bucket_storage_tallies.bucket_name = ? AND
+            bucket_storage_tallies.interval_start >= ? AND
+            bucket_storage_tallies.interval_start < ?
+        ORDER BY bucket_storage_tallies.interval_start DESC
+    `)
+
+    totalEgressQuery := db.db.Rebind(`
+        SELECT
+            COALESCE(SUM(settled) + SUM(inline), 0)
+        FROM
+            bucket_bandwidth_rollups
+        WHERE
+            project_id = ? AND
+            bucket_name = ? AND
+            interval_start >= ? AND
+            interval_start < ? AND
+            action = ?;
+    `)
+
+    usages = make(map[string]accounting.ProjectUsage)
+
+    for _, bucket := range bucketNames {
+        userAgentRow, err := db.db.Get_BucketMetainfo_UserAgent_By_ProjectId_And_Name(ctx,
+            dbx.BucketMetainfo_ProjectId(projectID[:]),
+            dbx.BucketMetainfo_Name([]byte(bucket)))
+        if err != nil && !errors.Is(err, sql.ErrNoRows) {
+            return nil, err
         }
 
         var partner string
-        if userAgent != nil {
-            entries, err := useragent.ParseEntries(userAgent)
+        if userAgentRow != nil && userAgentRow.UserAgent != nil {
+            entries, err := useragent.ParseEntries(userAgentRow.UserAgent)
             if err != nil {
                 return nil, err
             }
@@ -623,40 +611,59 @@ func (db *ProjectAccounting) GetProjectTotalByPartner(ctx context.Context, proje
                 }
             }
         }
 
         if _, ok := usages[partner]; !ok {
             usages[partner] = accounting.ProjectUsage{Since: since, Before: before}
         }
         usage := usages[partner]
 
-        if tally.TotalBytes == 0 {
-            tally.TotalBytes = inline + remote
+        storageTalliesRows, err := db.db.QueryContext(ctx, storageQuery, projectID[:], []byte(bucket), since, before)
+        if err != nil {
+            return nil, err
         }
 
-        if tally.BucketName != recentBucket {
-            usage.Egress += egress
-            recentBucket = tally.BucketName
+        var prevTally *accounting.BucketStorageTally
+        for storageTalliesRows.Next() {
+            tally := accounting.BucketStorageTally{}
+            var inline, remote int64
+            err = storageTalliesRows.Scan(&tally.IntervalStart, &tally.TotalBytes, &inline, &remote, &tally.TotalSegmentCount, &tally.ObjectCount)
+            if err != nil {
+                return nil, errs.Combine(err, storageTalliesRows.Close())
+            }
+            if tally.TotalBytes == 0 {
+                tally.TotalBytes = inline + remote
+            }
+
+            if prevTally == nil {
+                prevTally = &tally
+                continue
+            }
+
+            hours := prevTally.IntervalStart.Sub(tally.IntervalStart).Hours()
+            usage.Storage += memory.Size(tally.TotalBytes).Float64() * hours
+            usage.SegmentCount += float64(tally.TotalSegmentCount) * hours
+            usage.ObjectCount += float64(tally.ObjectCount) * hours
+
+            prevTally = &tally
         }
 
-        if _, ok := prevTallyForBucket[tally.BucketName]; !ok {
-            prevTallyForBucket[tally.BucketName] = &tally
-            usages[partner] = usage
-            continue
+        err = errs.Combine(storageTalliesRows.Err(), storageTalliesRows.Close())
+        if err != nil {
+            return nil, err
         }
 
-        hours := prevTallyForBucket[tally.BucketName].IntervalStart.Sub(tally.IntervalStart).Hours()
-        usage.Storage += memory.Size(tally.TotalBytes).Float64() * hours
-        usage.SegmentCount += float64(tally.TotalSegmentCount) * hours
-        usage.ObjectCount += float64(tally.ObjectCount) * hours
+        totalEgressRow := db.db.QueryRowContext(ctx, totalEgressQuery, projectID[:], []byte(bucket), since, before, pb.PieceAction_GET)
+        if err != nil {
+            return nil, err
+        }
+
+        var egress int64
+        if err = totalEgressRow.Scan(&egress); err != nil {
+            return nil, err
+        }
+        usage.Egress += egress
 
         usages[partner] = usage
-
-        prevTallyForBucket[tally.BucketName] = &tally
-    }
-
-    err = errs.Combine(storageTalliesRows.Err(), storageTalliesRows.Close())
-    if err != nil {
-        return nil, err
     }
 
     return usages, nil
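Both the removed loop and the restored loop accumulate storage the same way: tallies are read newest first (interval_start DESC), the first row only seeds the previous-tally pointer, and each older tally's byte count is then weighted by the hours separating it from the previously seen, newer tally. A standalone sketch of that byte-hour accumulation, using made-up values (the tally struct and the numbers below are illustrative, not taken from the commit):

package main

import (
    "fmt"
    "time"
)

type tally struct {
    intervalStart time.Time
    totalBytes    int64
}

func main() {
    now := time.Now()
    // Rows arrive ordered by interval_start DESC, matching the SQL in the diff.
    rows := []tally{
        {intervalStart: now, totalBytes: 3000},
        {intervalStart: now.Add(-2 * time.Hour), totalBytes: 1000},
    }

    var storage float64 // accumulated byte-hours
    var prev *tally
    for i := range rows {
        t := rows[i]
        if prev == nil {
            // The newest row only seeds the previous-tally pointer.
            prev = &t
            continue
        }
        // Weight the older tally's bytes by the gap to the newer one.
        hours := prev.intervalStart.Sub(t.intervalStart).Hours()
        storage += float64(t.totalBytes) * hours
        prev = &t
    }
    fmt.Printf("storage = %.0f byte-hours\n", storage) // 1000 bytes * 2 hours = 2000
}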
@@ -207,31 +207,31 @@ func Test_GetProjectTotal(t *testing.T) {
         require.NoError(t, err)
 
         const epsilon = 1e-8
-        require.InDelta(t, float64(tallies[0].Bytes()+tallies[1].Bytes()), usage.Storage, epsilon)
-        require.InDelta(t, float64(tallies[0].TotalSegmentCount+tallies[1].TotalSegmentCount), usage.SegmentCount, epsilon)
-        require.InDelta(t, float64(tallies[0].ObjectCount+tallies[1].ObjectCount), usage.ObjectCount, epsilon)
-        require.Equal(t, expectedEgress, usage.Egress)
-        require.Equal(t, tallies[0].IntervalStart, usage.Since)
-        require.Equal(t, tallies[2].IntervalStart.Add(time.Minute), usage.Before)
+        require.InDelta(t, usage.Storage, float64(tallies[0].Bytes()+tallies[1].Bytes()), epsilon)
+        require.InDelta(t, usage.SegmentCount, float64(tallies[0].TotalSegmentCount+tallies[1].TotalSegmentCount), epsilon)
+        require.InDelta(t, usage.ObjectCount, float64(tallies[0].ObjectCount+tallies[1].ObjectCount), epsilon)
+        require.Equal(t, usage.Egress, expectedEgress)
+        require.Equal(t, usage.Since, tallies[0].IntervalStart)
+        require.Equal(t, usage.Before, tallies[2].IntervalStart.Add(time.Minute))
 
         // Ensure that GetProjectTotal treats the 'before' arg as exclusive
         usage, err = db.ProjectAccounting().GetProjectTotal(ctx, projectID, tallies[0].IntervalStart, tallies[2].IntervalStart)
         require.NoError(t, err)
-        require.InDelta(t, float64(tallies[0].Bytes()), usage.Storage, epsilon)
-        require.InDelta(t, float64(tallies[0].TotalSegmentCount), usage.SegmentCount, epsilon)
-        require.InDelta(t, float64(tallies[0].ObjectCount), usage.ObjectCount, epsilon)
-        require.Equal(t, expectedEgress, usage.Egress)
-        require.Equal(t, tallies[0].IntervalStart, usage.Since)
-        require.Equal(t, tallies[2].IntervalStart, usage.Before)
+        require.InDelta(t, usage.Storage, float64(tallies[0].Bytes()), epsilon)
+        require.InDelta(t, usage.SegmentCount, float64(tallies[0].TotalSegmentCount), epsilon)
+        require.InDelta(t, usage.ObjectCount, float64(tallies[0].ObjectCount), epsilon)
+        require.Equal(t, usage.Egress, expectedEgress)
+        require.Equal(t, usage.Since, tallies[0].IntervalStart)
+        require.Equal(t, usage.Before, tallies[2].IntervalStart)
 
         usage, err = db.ProjectAccounting().GetProjectTotal(ctx, projectID, rollups[0].IntervalStart, rollups[1].IntervalStart)
         require.NoError(t, err)
         require.Zero(t, usage.Storage)
         require.Zero(t, usage.SegmentCount)
         require.Zero(t, usage.ObjectCount)
-        require.Equal(t, rollups[0].Inline+rollups[0].Settled, usage.Egress)
-        require.Equal(t, rollups[0].IntervalStart, usage.Since)
-        require.Equal(t, rollups[1].IntervalStart, usage.Before)
+        require.Equal(t, usage.Egress, rollups[0].Inline+rollups[0].Settled)
+        require.Equal(t, usage.Since, rollups[0].IntervalStart)
+        require.Equal(t, usage.Before, rollups[1].IntervalStart)
     },
     )
 }
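The assertion changes in this test hunk only swap argument order; the comparisons themselves are unchanged. testify's require.Equal and require.InDelta take the expected value before the actual one, so the ordering only affects which side a failure message labels as expected. A small illustration with made-up values (a sketch, not a test from the repository):

package example

import (
    "testing"

    "github.com/stretchr/testify/require"
)

// TestAssertionArgumentOrder shows that swapping the arguments does not change
// whether the assertion passes, only how a failure would be reported.
func TestAssertionArgumentOrder(t *testing.T) {
    expectedEgress := int64(42)
    actualEgress := int64(42)

    // Ordering used by the commit being reverted: expected value first,
    // matching testify's documented parameter order.
    require.Equal(t, expectedEgress, actualEgress)

    // Ordering restored by this revert: actual value first. The assertion is
    // equivalent; only the labels in a failure message would be swapped.
    require.Equal(t, actualEgress, expectedEgress)

    // The same applies to approximate comparisons.
    require.InDelta(t, float64(expectedEgress), float64(actualEgress), 1e-8)
}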