storj/satellite/satellitedb/projectaccounting.go
Wilfred Asomani 74b3617813 Revert "satellite/db: fix long loadtime for charges endpoint"
This reverts commit 676178299f.

Reason for revert:
The new query used by this commit performs a full table scan.
It's been reverted pending a fix for that.

Change-Id: Idc53954459aa6f5a692056232b8674b11d1928ce
2023-06-23 09:32:33 +00:00

1213 lines
37 KiB
Go

// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package satellitedb
import (
"context"
"database/sql"
"errors"
"fmt"
"time"
pgxerrcode "github.com/jackc/pgerrcode"
"github.com/jackc/pgx/v5"
"github.com/zeebo/errs"
"storj.io/common/memory"
"storj.io/common/pb"
"storj.io/common/useragent"
"storj.io/common/uuid"
"storj.io/private/dbutil"
"storj.io/private/dbutil/pgutil"
"storj.io/private/dbutil/pgutil/pgerrcode"
"storj.io/private/dbutil/pgxutil"
"storj.io/private/tagsql"
"storj.io/storj/satellite/accounting"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/orders"
"storj.io/storj/satellite/satellitedb/dbx"
)
// ensure that ProjectAccounting implements accounting.ProjectAccounting.
var _ accounting.ProjectAccounting = (*ProjectAccounting)(nil)
var allocatedExpirationInDays = 2
// ProjectAccounting implements the accounting/db ProjectAccounting interface.
type ProjectAccounting struct {
db *satelliteDB
}
// SaveTallies saves the latest bucket info.
func (db *ProjectAccounting) SaveTallies(ctx context.Context, intervalStart time.Time, bucketTallies map[metabase.BucketLocation]*accounting.BucketTally) (err error) {
defer mon.Task()(&ctx)(&err)
if len(bucketTallies) == 0 {
return nil
}
var bucketNames, projectIDs [][]byte
var totalBytes, metadataSizes []int64
var totalSegments, objectCounts []int64
for _, info := range bucketTallies {
bucketNames = append(bucketNames, []byte(info.BucketName))
projectIDs = append(projectIDs, info.ProjectID[:])
totalBytes = append(totalBytes, info.TotalBytes)
totalSegments = append(totalSegments, info.TotalSegments)
objectCounts = append(objectCounts, info.ObjectCount)
metadataSizes = append(metadataSizes, info.MetadataSize)
}
_, err = db.db.DB.ExecContext(ctx, db.db.Rebind(`
INSERT INTO bucket_storage_tallies (
interval_start,
bucket_name, project_id,
total_bytes, inline, remote,
total_segments_count, remote_segments_count, inline_segments_count,
object_count, metadata_size)
SELECT
$1,
unnest($2::bytea[]), unnest($3::bytea[]),
unnest($4::int8[]), $5, $6,
unnest($7::int8[]), $8, $9,
unnest($10::int8[]), unnest($11::int8[])`),
intervalStart,
pgutil.ByteaArray(bucketNames), pgutil.ByteaArray(projectIDs),
pgutil.Int8Array(totalBytes), 0, 0,
pgutil.Int8Array(totalSegments), 0, 0,
pgutil.Int8Array(objectCounts), pgutil.Int8Array(metadataSizes))
return Error.Wrap(err)
}
// GetTallies retrieves all tallies ordered by interval start (descending).
func (db *ProjectAccounting) GetTallies(ctx context.Context) (tallies []accounting.BucketTally, err error) {
defer mon.Task()(&ctx)(&err)
dbxTallies, err := db.db.All_BucketStorageTally_OrderBy_Desc_IntervalStart(ctx)
if err != nil {
return nil, Error.Wrap(err)
}
for _, dbxTally := range dbxTallies {
projectID, err := uuid.FromBytes(dbxTally.ProjectId)
if err != nil {
return nil, Error.Wrap(err)
}
totalBytes := dbxTally.TotalBytes
if totalBytes == 0 {
totalBytes = dbxTally.Inline + dbxTally.Remote
}
totalSegments := dbxTally.TotalSegmentsCount
if totalSegments == 0 {
totalSegments = dbxTally.InlineSegmentsCount + dbxTally.RemoteSegmentsCount
}
tallies = append(tallies, accounting.BucketTally{
BucketLocation: metabase.BucketLocation{
ProjectID: projectID,
BucketName: string(dbxTally.BucketName),
},
ObjectCount: int64(dbxTally.ObjectCount),
TotalSegments: int64(totalSegments),
TotalBytes: int64(totalBytes),
MetadataSize: int64(dbxTally.MetadataSize),
})
}
return tallies, nil
}
// CreateStorageTally creates a record in the bucket_storage_tallies accounting table.
func (db *ProjectAccounting) CreateStorageTally(ctx context.Context, tally accounting.BucketStorageTally) (err error) {
defer mon.Task()(&ctx)(&err)
_, err = db.db.DB.ExecContext(ctx, db.db.Rebind(`
INSERT INTO bucket_storage_tallies (
interval_start,
bucket_name, project_id,
total_bytes, inline, remote,
total_segments_count, remote_segments_count, inline_segments_count,
object_count, metadata_size)
VALUES (
?,
?, ?,
?, ?, ?,
?, ?, ?,
?, ?
)`), tally.IntervalStart,
[]byte(tally.BucketName), tally.ProjectID,
tally.TotalBytes, 0, 0,
tally.TotalSegmentCount, 0, 0,
tally.ObjectCount, tally.MetadataSize,
)
return Error.Wrap(err)
}
// GetNonEmptyTallyBucketsInRange returns a list of bucket locations within the given range
// whose most recent tally does not represent empty usage.
func (db *ProjectAccounting) GetNonEmptyTallyBucketsInRange(ctx context.Context, from, to metabase.BucketLocation) (result []metabase.BucketLocation, err error) {
defer mon.Task()(&ctx)(&err)
err = withRows(db.db.QueryContext(ctx, `
SELECT project_id, name
FROM bucket_metainfos bm
WHERE (project_id, name) BETWEEN ($1, $2) AND ($3, $4)
AND NOT 0 IN (
SELECT object_count FROM bucket_storage_tallies
WHERE (project_id, bucket_name) = (bm.project_id, bm.name)
ORDER BY interval_start DESC
LIMIT 1
)
`, from.ProjectID, []byte(from.BucketName), to.ProjectID, []byte(to.BucketName)),
)(func(r tagsql.Rows) error {
for r.Next() {
loc := metabase.BucketLocation{}
if err := r.Scan(&loc.ProjectID, &loc.BucketName); err != nil {
return err
}
result = append(result, loc)
}
return nil
})
if err != nil {
return nil, Error.Wrap(err)
}
return result, nil
}
// GetProjectSettledBandwidthTotal returns the sum of GET bandwidth usage settled for a projectID in the past time frame.
func (db *ProjectAccounting) GetProjectSettledBandwidthTotal(ctx context.Context, projectID uuid.UUID, from time.Time) (_ int64, err error) {
defer mon.Task()(&ctx)(&err)
var sum *int64
query := `SELECT SUM(settled) FROM bucket_bandwidth_rollups WHERE project_id = $1 AND action = $2 AND interval_start >= $3;`
err = db.db.QueryRow(ctx, db.db.Rebind(query), projectID[:], pb.PieceAction_GET, from.UTC()).Scan(&sum)
if errors.Is(err, sql.ErrNoRows) || sum == nil {
return 0, nil
}
return *sum, err
}
// GetProjectBandwidth returns the used bandwidth (settled or allocated) for the specified year, month and day.
func (db *ProjectAccounting) GetProjectBandwidth(ctx context.Context, projectID uuid.UUID, year int, month time.Month, day int, asOfSystemInterval time.Duration) (_ int64, err error) {
defer mon.Task()(&ctx)(&err)
var egress *int64
startOfMonth := time.Date(year, month, 1, 0, 0, 0, 0, time.UTC)
var expiredSince time.Time
if day < allocatedExpirationInDays {
expiredSince = startOfMonth
} else {
expiredSince = time.Date(year, month, day-allocatedExpirationInDays, 0, 0, 0, 0, time.UTC)
}
periodEnd := time.Date(year, month+1, 1, 0, 0, 0, 0, time.UTC)
query := `WITH egress AS (
SELECT
CASE WHEN interval_day < ?
THEN egress_settled
ELSE egress_allocated-egress_dead
END AS amount
FROM project_bandwidth_daily_rollups
WHERE project_id = ? AND interval_day >= ? AND interval_day < ?
) SELECT sum(amount) FROM egress` + db.db.impl.AsOfSystemInterval(asOfSystemInterval)
err = db.db.QueryRow(ctx, db.db.Rebind(query), expiredSince, projectID[:], startOfMonth, periodEnd).Scan(&egress)
if errors.Is(err, sql.ErrNoRows) || egress == nil {
return 0, nil
}
return *egress, err
}
// GetProjectDailyBandwidth returns project bandwidth (allocated and settled) for the specified day.
func (db *ProjectAccounting) GetProjectDailyBandwidth(ctx context.Context, projectID uuid.UUID, year int, month time.Month, day int) (allocated int64, settled, dead int64, err error) {
defer mon.Task()(&ctx)(&err)
interval := time.Date(year, month, day, 0, 0, 0, 0, time.UTC)
query := `SELECT egress_allocated, egress_settled, egress_dead FROM project_bandwidth_daily_rollups WHERE project_id = ? AND interval_day = ?;`
err = db.db.QueryRow(ctx, db.db.Rebind(query), projectID[:], interval).Scan(&allocated, &settled, &dead)
if errors.Is(err, sql.ErrNoRows) {
return 0, 0, 0, nil
}
return allocated, settled, dead, err
}
// GetProjectDailyUsageByDateRange returns project daily allocated, settled bandwidth and storage usage by specific date range.
func (db *ProjectAccounting) GetProjectDailyUsageByDateRange(ctx context.Context, projectID uuid.UUID, from, to time.Time, crdbInterval time.Duration) (_ *accounting.ProjectDailyUsage, err error) {
defer mon.Task()(&ctx)(&err)
now := time.Now()
nowBeginningOfDay := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.UTC)
fromBeginningOfDay := time.Date(from.Year(), from.Month(), from.Day(), 0, 0, 0, 0, time.UTC)
toEndOfDay := time.Date(to.Year(), to.Month(), to.Day(), 23, 59, 59, 0, time.UTC)
allocatedBandwidth := make([]accounting.ProjectUsageByDay, 0)
settledBandwidth := make([]accounting.ProjectUsageByDay, 0)
storage := make([]accounting.ProjectUsageByDay, 0)
err = pgxutil.Conn(ctx, db.db, func(conn *pgx.Conn) error {
var batch pgx.Batch
expiredSince := nowBeginningOfDay.Add(time.Duration(-allocatedExpirationInDays) * time.Hour * 24)
storageQuery := db.db.Rebind(`
WITH project_usage AS (
SELECT
interval_start,
DATE_TRUNC('day',interval_start) AS interval_day,
project_id,
bucket_name,
total_bytes
FROM bucket_storage_tallies
WHERE project_id = $1 AND
interval_start >= $2 AND
interval_start <= $3
)
-- Sum all buckets usage in the same project.
SELECT
interval_day,
SUM(total_bytes) AS total_bytes
FROM
(SELECT
DISTINCT ON (project_id, bucket_name, interval_day)
project_id,
bucket_name,
total_bytes,
interval_day,
interval_start
FROM project_usage
ORDER BY project_id, bucket_name, interval_day, interval_start DESC) pu
` + db.db.impl.AsOfSystemInterval(crdbInterval) + `
GROUP BY project_id, bucket_name, interval_day
`)
batch.Queue(storageQuery, projectID, fromBeginningOfDay, toEndOfDay)
batch.Queue(db.db.Rebind(`
SELECT interval_day, egress_settled,
CASE WHEN interval_day < $1
THEN egress_settled
ELSE egress_allocated-egress_dead
END AS allocated
FROM project_bandwidth_daily_rollups
WHERE project_id = $2 AND (interval_day BETWEEN $3 AND $4)
`), expiredSince, projectID, fromBeginningOfDay, toEndOfDay)
results := conn.SendBatch(ctx, &batch)
defer func() { err = errs.Combine(err, results.Close()) }()
storageRows, err := results.Query()
if err != nil {
if pgerrcode.FromError(err) == pgxerrcode.InvalidCatalogName {
// this error may happen if database is created in the last 5 minutes (`as of systemtime` points to a time before Genesis).
// in this case we can ignore the database not found error and return with no usage.
// if the database is really missing --> we have more serious problems than getting 0s from here.
return nil
}
return err
}
var current time.Time
var index int
for storageRows.Next() {
var day time.Time
var amount int64
err = storageRows.Scan(&day, &amount)
if err != nil {
storageRows.Close()
return err
}
if len(storage) == 0 {
current = day
storage = append(storage, accounting.ProjectUsageByDay{
Date: day.UTC(),
Value: amount,
})
continue
}
if current == day {
storage[index].Value += amount
continue
}
current = day
index++
storage = append(storage, accounting.ProjectUsageByDay{
Date: day.UTC(),
Value: amount,
})
}
storageRows.Close()
err = storageRows.Err()
if err != nil {
return err
}
bandwidthRows, err := results.Query()
if err != nil {
return err
}
for bandwidthRows.Next() {
var day time.Time
var settled int64
var allocated int64
err = bandwidthRows.Scan(&day, &settled, &allocated)
if err != nil {
bandwidthRows.Close()
return err
}
settledBandwidth = append(settledBandwidth, accounting.ProjectUsageByDay{
Date: day.UTC(),
Value: settled,
})
allocatedBandwidth = append(allocatedBandwidth, accounting.ProjectUsageByDay{
Date: day.UTC(),
Value: allocated,
})
}
bandwidthRows.Close()
err = bandwidthRows.Err()
if err != nil {
return err
}
return nil
})
if err != nil {
return nil, Error.New("unable to get project daily usage: %w", err)
}
return &accounting.ProjectDailyUsage{
StorageUsage: storage,
AllocatedBandwidthUsage: allocatedBandwidth,
SettledBandwidthUsage: settledBandwidth,
}, nil
}
// DeleteProjectBandwidthBefore deletes project bandwidth rollups before the given time.
func (db *ProjectAccounting) DeleteProjectBandwidthBefore(ctx context.Context, before time.Time) (err error) {
defer mon.Task()(&ctx)(&err)
_, err = db.db.DB.ExecContext(ctx, db.db.Rebind("DELETE FROM project_bandwidth_daily_rollups WHERE interval_day < ?"), before)
return err
}
// UpdateProjectUsageLimit updates project usage limit.
func (db *ProjectAccounting) UpdateProjectUsageLimit(ctx context.Context, projectID uuid.UUID, limit memory.Size) (err error) {
defer mon.Task()(&ctx)(&err)
_, err = db.db.Update_Project_By_Id(ctx,
dbx.Project_Id(projectID[:]),
dbx.Project_Update_Fields{
UsageLimit: dbx.Project_UsageLimit(limit.Int64()),
},
)
return err
}
// UpdateProjectBandwidthLimit updates project bandwidth limit.
func (db *ProjectAccounting) UpdateProjectBandwidthLimit(ctx context.Context, projectID uuid.UUID, limit memory.Size) (err error) {
defer mon.Task()(&ctx)(&err)
_, err = db.db.Update_Project_By_Id(ctx,
dbx.Project_Id(projectID[:]),
dbx.Project_Update_Fields{
BandwidthLimit: dbx.Project_BandwidthLimit(limit.Int64()),
},
)
return err
}
// UpdateProjectSegmentLimit updates project segment limit.
func (db *ProjectAccounting) UpdateProjectSegmentLimit(ctx context.Context, projectID uuid.UUID, limit int64) (err error) {
defer mon.Task()(&ctx)(&err)
_, err = db.db.Update_Project_By_Id(ctx,
dbx.Project_Id(projectID[:]),
dbx.Project_Update_Fields{
SegmentLimit: dbx.Project_SegmentLimit(limit),
},
)
return err
}
// GetProjectStorageLimit returns project storage usage limit.
func (db *ProjectAccounting) GetProjectStorageLimit(ctx context.Context, projectID uuid.UUID) (_ *int64, err error) {
defer mon.Task()(&ctx)(&err)
row, err := db.db.Get_Project_UsageLimit_By_Id(ctx,
dbx.Project_Id(projectID[:]),
)
if err != nil {
return nil, err
}
return row.UsageLimit, nil
}
// GetProjectBandwidthLimit returns project bandwidth usage limit.
func (db *ProjectAccounting) GetProjectBandwidthLimit(ctx context.Context, projectID uuid.UUID) (_ *int64, err error) {
defer mon.Task()(&ctx)(&err)
row, err := db.db.Get_Project_BandwidthLimit_By_Id(ctx,
dbx.Project_Id(projectID[:]),
)
if err != nil {
return nil, err
}
return row.BandwidthLimit, nil
}
// GetProjectObjectsSegments returns project objects and segments number.
func (db *ProjectAccounting) GetProjectObjectsSegments(ctx context.Context, projectID uuid.UUID) (objectsSegments accounting.ProjectObjectsSegments, err error) {
defer mon.Task()(&ctx)(&err)
var latestDate time.Time
latestDateRow := db.db.QueryRowContext(ctx, db.db.Rebind(`
SELECT interval_start FROM bucket_storage_tallies bst
WHERE
project_id = ?
AND EXISTS (SELECT 1 FROM bucket_metainfos bm WHERE bm.project_id = bst.project_id)
ORDER BY interval_start DESC
LIMIT 1
`), projectID[:])
if err = latestDateRow.Scan(&latestDate); err != nil {
if errors.Is(err, sql.ErrNoRows) {
return accounting.ProjectObjectsSegments{}, nil
}
return accounting.ProjectObjectsSegments{}, err
}
// calculate total segments and objects count.
storageTalliesRows := db.db.QueryRowContext(ctx, db.db.Rebind(`
SELECT
SUM(total_segments_count),
SUM(object_count)
FROM
bucket_storage_tallies
WHERE
project_id = ? AND
interval_start = ?
`), projectID[:], latestDate)
if err = storageTalliesRows.Scan(&objectsSegments.SegmentCount, &objectsSegments.ObjectCount); err != nil {
return accounting.ProjectObjectsSegments{}, err
}
return objectsSegments, nil
}
// GetProjectSegmentLimit returns project segment limit.
func (db *ProjectAccounting) GetProjectSegmentLimit(ctx context.Context, projectID uuid.UUID) (_ *int64, err error) {
defer mon.Task()(&ctx)(&err)
row, err := db.db.Get_Project_SegmentLimit_By_Id(ctx,
dbx.Project_Id(projectID[:]),
)
if err != nil {
return nil, err
}
return row.SegmentLimit, nil
}
// GetProjectTotal retrieves project usage for a given period.
func (db *ProjectAccounting) GetProjectTotal(ctx context.Context, projectID uuid.UUID, since, before time.Time) (_ *accounting.ProjectUsage, err error) {
defer mon.Task()(&ctx)(&err)
usages, err := db.GetProjectTotalByPartner(ctx, projectID, nil, since, before)
if err != nil {
return nil, err
}
if usage, ok := usages[""]; ok {
return &usage, nil
}
return &accounting.ProjectUsage{Since: since, Before: before}, nil
}
// GetProjectTotalByPartner retrieves project usage for a given period categorized by partner name.
// Unpartnered usage or usage for a partner not present in partnerNames is mapped to the empty string.
func (db *ProjectAccounting) GetProjectTotalByPartner(ctx context.Context, projectID uuid.UUID, partnerNames []string, since, before time.Time) (usages map[string]accounting.ProjectUsage, err error) {
defer mon.Task()(&ctx)(&err)
since = timeTruncateDown(since)
bucketNames, err := db.getBucketsSinceAndBefore(ctx, projectID, since, before)
if err != nil {
return nil, err
}
storageQuery := db.db.Rebind(`
SELECT
bucket_storage_tallies.interval_start,
bucket_storage_tallies.total_bytes,
bucket_storage_tallies.inline,
bucket_storage_tallies.remote,
bucket_storage_tallies.total_segments_count,
bucket_storage_tallies.object_count
FROM
bucket_storage_tallies
WHERE
bucket_storage_tallies.project_id = ? AND
bucket_storage_tallies.bucket_name = ? AND
bucket_storage_tallies.interval_start >= ? AND
bucket_storage_tallies.interval_start < ?
ORDER BY bucket_storage_tallies.interval_start DESC
`)
totalEgressQuery := db.db.Rebind(`
SELECT
COALESCE(SUM(settled) + SUM(inline), 0)
FROM
bucket_bandwidth_rollups
WHERE
project_id = ? AND
bucket_name = ? AND
interval_start >= ? AND
interval_start < ? AND
action = ?;
`)
usages = make(map[string]accounting.ProjectUsage)
for _, bucket := range bucketNames {
userAgentRow, err := db.db.Get_BucketMetainfo_UserAgent_By_ProjectId_And_Name(ctx,
dbx.BucketMetainfo_ProjectId(projectID[:]),
dbx.BucketMetainfo_Name([]byte(bucket)))
if err != nil && !errors.Is(err, sql.ErrNoRows) {
return nil, err
}
var partner string
if userAgentRow != nil && userAgentRow.UserAgent != nil {
entries, err := useragent.ParseEntries(userAgentRow.UserAgent)
if err != nil {
return nil, err
}
if len(entries) != 0 {
for _, iterPartner := range partnerNames {
if entries[0].Product == iterPartner {
partner = iterPartner
break
}
}
}
}
if _, ok := usages[partner]; !ok {
usages[partner] = accounting.ProjectUsage{Since: since, Before: before}
}
usage := usages[partner]
storageTalliesRows, err := db.db.QueryContext(ctx, storageQuery, projectID[:], []byte(bucket), since, before)
if err != nil {
return nil, err
}
var prevTally *accounting.BucketStorageTally
for storageTalliesRows.Next() {
tally := accounting.BucketStorageTally{}
var inline, remote int64
err = storageTalliesRows.Scan(&tally.IntervalStart, &tally.TotalBytes, &inline, &remote, &tally.TotalSegmentCount, &tally.ObjectCount)
if err != nil {
return nil, errs.Combine(err, storageTalliesRows.Close())
}
if tally.TotalBytes == 0 {
tally.TotalBytes = inline + remote
}
if prevTally == nil {
prevTally = &tally
continue
}
hours := prevTally.IntervalStart.Sub(tally.IntervalStart).Hours()
usage.Storage += memory.Size(tally.TotalBytes).Float64() * hours
usage.SegmentCount += float64(tally.TotalSegmentCount) * hours
usage.ObjectCount += float64(tally.ObjectCount) * hours
prevTally = &tally
}
err = errs.Combine(storageTalliesRows.Err(), storageTalliesRows.Close())
if err != nil {
return nil, err
}
totalEgressRow := db.db.QueryRowContext(ctx, totalEgressQuery, projectID[:], []byte(bucket), since, before, pb.PieceAction_GET)
if err != nil {
return nil, err
}
var egress int64
if err = totalEgressRow.Scan(&egress); err != nil {
return nil, err
}
usage.Egress += egress
usages[partner] = usage
}
return usages, nil
}
// GetBucketUsageRollups retrieves summed usage rollups for every bucket of particular project for a given period.
func (db *ProjectAccounting) GetBucketUsageRollups(ctx context.Context, projectID uuid.UUID, since, before time.Time) (_ []accounting.BucketUsageRollup, err error) {
defer mon.Task()(&ctx)(&err)
since = timeTruncateDown(since.UTC())
before = before.UTC()
buckets, err := db.getBucketsSinceAndBefore(ctx, projectID, since, before)
if err != nil {
return nil, err
}
var bucketUsageRollups []accounting.BucketUsageRollup
for _, bucket := range buckets {
bucketRollup, err := db.getSingleBucketRollup(ctx, projectID, bucket, since, before)
if err != nil {
return nil, err
}
bucketUsageRollups = append(bucketUsageRollups, *bucketRollup)
}
return bucketUsageRollups, nil
}
// GetSingleBucketUsageRollup retrieves usage rollup for a single bucket of particular project for a given period.
func (db *ProjectAccounting) GetSingleBucketUsageRollup(ctx context.Context, projectID uuid.UUID, bucket string, since, before time.Time) (_ *accounting.BucketUsageRollup, err error) {
defer mon.Task()(&ctx)(&err)
since = timeTruncateDown(since.UTC())
before = before.UTC()
bucketRollup, err := db.getSingleBucketRollup(ctx, projectID, bucket, since, before)
if err != nil {
return nil, err
}
return bucketRollup, nil
}
func (db *ProjectAccounting) getSingleBucketRollup(ctx context.Context, projectID uuid.UUID, bucket string, since, before time.Time) (*accounting.BucketUsageRollup, error) {
roullupsQuery := db.db.Rebind(`SELECT SUM(settled), SUM(inline), action
FROM bucket_bandwidth_rollups
WHERE project_id = ? AND bucket_name = ? AND interval_start >= ? AND interval_start <= ?
GROUP BY action`)
// TODO: should be optimized
storageQuery := db.db.All_BucketStorageTally_By_ProjectId_And_BucketName_And_IntervalStart_GreaterOrEqual_And_IntervalStart_LessOrEqual_OrderBy_Desc_IntervalStart
bucketRollup := &accounting.BucketUsageRollup{
ProjectID: projectID,
BucketName: bucket,
Since: since,
Before: before,
}
// get bucket_bandwidth_rollup
rollupRows, err := db.db.QueryContext(ctx, roullupsQuery, projectID[:], []byte(bucket), since, before)
if err != nil {
return nil, err
}
defer func() { err = errs.Combine(err, rollupRows.Close()) }()
// fill egress
for rollupRows.Next() {
var action pb.PieceAction
var settled, inline int64
err = rollupRows.Scan(&settled, &inline, &action)
if err != nil {
return nil, err
}
switch action {
case pb.PieceAction_GET:
bucketRollup.GetEgress += memory.Size(settled + inline).GB()
case pb.PieceAction_GET_AUDIT:
bucketRollup.AuditEgress += memory.Size(settled + inline).GB()
case pb.PieceAction_GET_REPAIR:
bucketRollup.RepairEgress += memory.Size(settled + inline).GB()
default:
continue
}
}
if err := rollupRows.Err(); err != nil {
return nil, err
}
bucketStorageTallies, err := storageQuery(ctx,
dbx.BucketStorageTally_ProjectId(projectID[:]),
dbx.BucketStorageTally_BucketName([]byte(bucket)),
dbx.BucketStorageTally_IntervalStart(since),
dbx.BucketStorageTally_IntervalStart(before))
if err != nil {
return nil, err
}
// fill metadata, objects and stored data
// hours calculated from previous tallies,
// so we skip the most recent one
for i := len(bucketStorageTallies) - 1; i > 0; i-- {
current := bucketStorageTallies[i]
hours := bucketStorageTallies[i-1].IntervalStart.Sub(current.IntervalStart).Hours()
if current.TotalBytes > 0 {
bucketRollup.TotalStoredData += memory.Size(current.TotalBytes).GB() * hours
} else {
bucketRollup.TotalStoredData += memory.Size(current.Remote+current.Inline).GB() * hours
}
bucketRollup.MetadataSize += memory.Size(current.MetadataSize).GB() * hours
if current.TotalSegmentsCount > 0 {
bucketRollup.TotalSegments += float64(current.TotalSegmentsCount) * hours
} else {
bucketRollup.TotalSegments += float64(current.RemoteSegmentsCount+current.InlineSegmentsCount) * hours
}
bucketRollup.ObjectCount += float64(current.ObjectCount) * hours
}
return bucketRollup, nil
}
// prefixIncrement returns the lexicographically lowest byte string which is
// greater than origPrefix and does not have origPrefix as a prefix. If no such
// byte string exists (origPrefix is empty, or origPrefix contains only 0xff
// bytes), returns false for ok.
//
// examples: prefixIncrement([]byte("abc")) -> ([]byte("abd", true)
//
// prefixIncrement([]byte("ab\xff\xff")) -> ([]byte("ac", true)
// prefixIncrement([]byte("")) -> (nil, false)
// prefixIncrement([]byte("\x00")) -> ([]byte("\x01", true)
// prefixIncrement([]byte("\xff\xff\xff")) -> (nil, false)
func prefixIncrement(origPrefix []byte) (incremented []byte, ok bool) {
incremented = make([]byte, len(origPrefix))
copy(incremented, origPrefix)
i := len(incremented) - 1
for i >= 0 {
if incremented[i] != 0xff {
incremented[i]++
return incremented[:i+1], true
}
i--
}
// there is no byte string which is greater than origPrefix and which does
// not have origPrefix as a prefix.
return nil, false
}
// prefixMatch creates a SQL expression which
// will evaluate to true if and only if the value of expr starts with the value
// of prefix.
//
// Returns also a slice of arguments that should be passed to the corresponding
// db.Query* or db.Exec* to fill in parameters in the returned SQL expression.
//
// The returned SQL expression needs to be passed through Rebind(), as it uses
// `?` markers instead of `$N`, because we don't know what N we would need to
// use.
func (db *ProjectAccounting) prefixMatch(expr string, prefix []byte) (string, []byte, error) {
incrementedPrefix, ok := prefixIncrement(prefix)
switch db.db.impl {
case dbutil.Postgres:
if !ok {
return fmt.Sprintf(`(%s >= ?)`, expr), nil, nil
}
return fmt.Sprintf(`(%s >= ? AND %s < ?)`, expr, expr), incrementedPrefix, nil
case dbutil.Cockroach:
if !ok {
return fmt.Sprintf(`(%s >= ?:::BYTEA)`, expr), nil, nil
}
return fmt.Sprintf(`(%s >= ?:::BYTEA AND %s < ?:::BYTEA)`, expr, expr), incrementedPrefix, nil
default:
return "", nil, errs.New("unhandled database: %v", db.db.driver)
}
}
// GetBucketTotals retrieves bucket usage totals for period of time.
func (db *ProjectAccounting) GetBucketTotals(ctx context.Context, projectID uuid.UUID, cursor accounting.BucketUsageCursor, before time.Time) (_ *accounting.BucketUsagePage, err error) {
defer mon.Task()(&ctx)(&err)
bucketPrefix := []byte(cursor.Search)
if cursor.Limit > 50 {
cursor.Limit = 50
}
if cursor.Page == 0 {
return nil, errs.New("page can not be 0")
}
page := &accounting.BucketUsagePage{
Search: cursor.Search,
Limit: cursor.Limit,
Offset: uint64((cursor.Page - 1) * cursor.Limit),
}
bucketNameRange, incrPrefix, err := db.prefixMatch("name", bucketPrefix)
if err != nil {
return nil, err
}
countQuery := db.db.Rebind(`SELECT COUNT(name) FROM bucket_metainfos
WHERE project_id = ? AND ` + bucketNameRange)
args := []interface{}{
projectID[:],
bucketPrefix,
}
if incrPrefix != nil {
args = append(args, incrPrefix)
}
countRow := db.db.QueryRowContext(ctx, countQuery, args...)
err = countRow.Scan(&page.TotalCount)
if err != nil {
return nil, err
}
if page.TotalCount == 0 {
return page, nil
}
if page.Offset > page.TotalCount-1 {
return nil, errs.New("page is out of range")
}
bucketsQuery := db.db.Rebind(`SELECT name, created_at FROM bucket_metainfos
WHERE project_id = ? AND ` + bucketNameRange + `ORDER BY name ASC LIMIT ? OFFSET ?`)
args = []interface{}{
projectID[:],
bucketPrefix,
}
if incrPrefix != nil {
args = append(args, incrPrefix)
}
args = append(args, page.Limit, page.Offset)
bucketRows, err := db.db.QueryContext(ctx, bucketsQuery, args...)
if err != nil {
return nil, err
}
defer func() { err = errs.Combine(err, bucketRows.Close()) }()
type bucketWithCreationDate struct {
name string
createdAt time.Time
}
var buckets []bucketWithCreationDate
for bucketRows.Next() {
var bucket string
var createdAt time.Time
err = bucketRows.Scan(&bucket, &createdAt)
if err != nil {
return nil, err
}
buckets = append(buckets, bucketWithCreationDate{
name: bucket,
createdAt: createdAt,
})
}
if err := bucketRows.Err(); err != nil {
return nil, err
}
rollupsQuery := db.db.Rebind(`SELECT COALESCE(SUM(settled) + SUM(inline), 0)
FROM bucket_bandwidth_rollups
WHERE project_id = ? AND bucket_name = ? AND interval_start >= ? AND interval_start <= ? AND action = ?`)
storageQuery := db.db.Rebind(`SELECT total_bytes, inline, remote, object_count, total_segments_count
FROM bucket_storage_tallies
WHERE project_id = ? AND bucket_name = ? AND interval_start >= ? AND interval_start <= ?
ORDER BY interval_start DESC
LIMIT 1`)
var bucketUsages []accounting.BucketUsage
for _, bucket := range buckets {
bucketUsage := accounting.BucketUsage{
ProjectID: projectID,
BucketName: bucket.name,
Since: bucket.createdAt,
Before: before,
}
// get bucket_bandwidth_rollups
rollupRow := db.db.QueryRowContext(ctx, rollupsQuery, projectID[:], []byte(bucket.name), bucket.createdAt, before, pb.PieceAction_GET)
var egress int64
err = rollupRow.Scan(&egress)
if err != nil {
if !errors.Is(err, sql.ErrNoRows) {
return nil, err
}
}
bucketUsage.Egress = memory.Size(egress).GB()
storageRow := db.db.QueryRowContext(ctx, storageQuery, projectID[:], []byte(bucket.name), bucket.createdAt, before)
var tally accounting.BucketStorageTally
var inline, remote int64
err = storageRow.Scan(&tally.TotalBytes, &inline, &remote, &tally.ObjectCount, &tally.TotalSegmentCount)
if err != nil {
if !errors.Is(err, sql.ErrNoRows) {
return nil, err
}
}
if tally.TotalBytes == 0 {
tally.TotalBytes = inline + remote
}
// fill storage and object count
bucketUsage.Storage = memory.Size(tally.Bytes()).GB()
bucketUsage.SegmentCount = tally.TotalSegmentCount
bucketUsage.ObjectCount = tally.ObjectCount
bucketUsages = append(bucketUsages, bucketUsage)
}
page.PageCount = uint(page.TotalCount / uint64(cursor.Limit))
if page.TotalCount%uint64(cursor.Limit) != 0 {
page.PageCount++
}
page.BucketUsages = bucketUsages
page.CurrentPage = cursor.Page
return page, nil
}
// ArchiveRollupsBefore archives rollups older than a given time.
func (db *ProjectAccounting) ArchiveRollupsBefore(ctx context.Context, before time.Time, batchSize int) (archivedCount int, err error) {
defer mon.Task()(&ctx)(&err)
if batchSize <= 0 {
return 0, nil
}
switch db.db.impl {
case dbutil.Cockroach:
// We operate one action at a time, because we have an index on `(action, interval_start, project_id)`.
for action := range pb.PieceAction_name {
count, err := db.archiveRollupsBeforeByAction(ctx, action, before, batchSize)
archivedCount += count
if err != nil {
return archivedCount, Error.Wrap(err)
}
}
return archivedCount, nil
case dbutil.Postgres:
err := db.db.DB.QueryRow(ctx, `
WITH rollups_to_move AS (
DELETE FROM bucket_bandwidth_rollups
WHERE interval_start <= $1
RETURNING *
), moved_rollups AS (
INSERT INTO bucket_bandwidth_rollup_archives(bucket_name, project_id, interval_start, interval_seconds, action, inline, allocated, settled)
SELECT bucket_name, project_id, interval_start, interval_seconds, action, inline, allocated, settled FROM rollups_to_move
RETURNING *
)
SELECT count(*) FROM moved_rollups
`, before).Scan(&archivedCount)
return archivedCount, Error.Wrap(err)
default:
return 0, nil
}
}
func (db *ProjectAccounting) archiveRollupsBeforeByAction(ctx context.Context, action int32, before time.Time, batchSize int) (archivedCount int, err error) {
defer mon.Task()(&ctx)(&err)
for {
var rowCount int
err := db.db.QueryRow(ctx, `
WITH rollups_to_move AS (
DELETE FROM bucket_bandwidth_rollups
WHERE action = $1 AND interval_start <= $2
LIMIT $3 RETURNING *
), moved_rollups AS (
INSERT INTO bucket_bandwidth_rollup_archives(bucket_name, project_id, interval_start, interval_seconds, action, inline, allocated, settled)
SELECT bucket_name, project_id, interval_start, interval_seconds, action, inline, allocated, settled FROM rollups_to_move
RETURNING *
)
SELECT count(*) FROM moved_rollups
`, int(action), before, batchSize).Scan(&rowCount)
if err != nil {
return archivedCount, Error.Wrap(err)
}
archivedCount += rowCount
if rowCount < batchSize {
return archivedCount, nil
}
}
}
// getBucketsSinceAndBefore lists distinct bucket names for a project within a specific timeframe.
func (db *ProjectAccounting) getBucketsSinceAndBefore(ctx context.Context, projectID uuid.UUID, since, before time.Time) (buckets []string, err error) {
defer mon.Task()(&ctx)(&err)
queryFormat := `SELECT DISTINCT bucket_name
FROM %s
WHERE project_id = ?
AND interval_start >= ?
AND interval_start < ?`
bucketMap := make(map[string]struct{})
for _, tableName := range []string{"bucket_storage_tallies", "bucket_bandwidth_rollups"} {
query := db.db.Rebind(fmt.Sprintf(queryFormat, tableName))
rows, err := db.db.QueryContext(ctx, query, projectID[:], since, before)
if err != nil {
return nil, err
}
for rows.Next() {
var bucket string
err = rows.Scan(&bucket)
if err != nil {
return nil, errs.Combine(err, rows.Close())
}
bucketMap[bucket] = struct{}{}
}
err = errs.Combine(rows.Err(), rows.Close())
if err != nil {
return nil, err
}
}
for bucket := range bucketMap {
buckets = append(buckets, bucket)
}
return buckets, nil
}
// timeTruncateDown truncates down to the hour before to be in sync with orders endpoint.
func timeTruncateDown(t time.Time) time.Time {
return time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), 0, 0, 0, t.Location())
}
// GetProjectLimits returns current project limit for both storage and bandwidth.
func (db *ProjectAccounting) GetProjectLimits(ctx context.Context, projectID uuid.UUID) (_ accounting.ProjectLimits, err error) {
defer mon.Task()(&ctx)(&err)
row, err := db.db.Get_Project_BandwidthLimit_Project_UsageLimit_Project_SegmentLimit_Project_RateLimit_Project_BurstLimit_By_Id(ctx,
dbx.Project_Id(projectID[:]),
)
if err != nil {
return accounting.ProjectLimits{}, err
}
return accounting.ProjectLimits{
Usage: row.UsageLimit,
Bandwidth: row.BandwidthLimit,
Segments: row.SegmentLimit,
RateLimit: row.RateLimit,
BurstLimit: row.BurstLimit,
}, nil
}
// GetRollupsSince retrieves all archived rollup records since a given time.
func (db *ProjectAccounting) GetRollupsSince(ctx context.Context, since time.Time) (bwRollups []orders.BucketBandwidthRollup, err error) {
defer mon.Task()(&ctx)(&err)
pageLimit := db.db.opts.ReadRollupBatchSize
if pageLimit <= 0 {
pageLimit = 10000
}
var cursor *dbx.Paged_BucketBandwidthRollup_By_IntervalStart_GreaterOrEqual_Continuation
for {
dbxRollups, next, err := db.db.Paged_BucketBandwidthRollup_By_IntervalStart_GreaterOrEqual(ctx,
dbx.BucketBandwidthRollup_IntervalStart(since),
pageLimit, cursor)
if err != nil {
return nil, Error.Wrap(err)
}
cursor = next
for _, dbxRollup := range dbxRollups {
projectID, err := uuid.FromBytes(dbxRollup.ProjectId)
if err != nil {
return nil, err
}
bwRollups = append(bwRollups, orders.BucketBandwidthRollup{
ProjectID: projectID,
BucketName: string(dbxRollup.BucketName),
Action: pb.PieceAction(dbxRollup.Action),
Inline: int64(dbxRollup.Inline),
Allocated: int64(dbxRollup.Allocated),
Settled: int64(dbxRollup.Settled),
})
}
if cursor == nil {
return bwRollups, nil
}
}
}
// GetArchivedRollupsSince retrieves all archived rollup records since a given time.
func (db *ProjectAccounting) GetArchivedRollupsSince(ctx context.Context, since time.Time) (bwRollups []orders.BucketBandwidthRollup, err error) {
defer mon.Task()(&ctx)(&err)
pageLimit := db.db.opts.ReadRollupBatchSize
if pageLimit <= 0 {
pageLimit = 10000
}
var cursor *dbx.Paged_BucketBandwidthRollupArchive_By_IntervalStart_GreaterOrEqual_Continuation
for {
dbxRollups, next, err := db.db.Paged_BucketBandwidthRollupArchive_By_IntervalStart_GreaterOrEqual(ctx,
dbx.BucketBandwidthRollupArchive_IntervalStart(since),
pageLimit, cursor)
if err != nil {
return nil, Error.Wrap(err)
}
cursor = next
for _, dbxRollup := range dbxRollups {
projectID, err := uuid.FromBytes(dbxRollup.ProjectId)
if err != nil {
return nil, err
}
bwRollups = append(bwRollups, orders.BucketBandwidthRollup{
ProjectID: projectID,
BucketName: string(dbxRollup.BucketName),
Action: pb.PieceAction(dbxRollup.Action),
Inline: int64(dbxRollup.Inline),
Allocated: int64(dbxRollup.Allocated),
Settled: int64(dbxRollup.Settled),
})
}
if cursor == nil {
return bwRollups, nil
}
}
}