2019-05-10 20:05:42 +01:00
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package satellitedb
import (
"context"
"database/sql"
2020-07-14 14:04:38 +01:00
"errors"
2019-11-28 18:45:31 +00:00
"fmt"
2019-05-10 20:05:42 +01:00
"time"
2019-11-15 14:27:44 +00:00
"github.com/zeebo/errs"
2019-05-10 20:05:42 +01:00
2019-12-27 11:48:47 +00:00
"storj.io/common/memory"
"storj.io/common/pb"
2020-03-30 10:08:50 +01:00
"storj.io/common/uuid"
2021-04-23 10:52:40 +01:00
"storj.io/private/dbutil"
"storj.io/private/dbutil/pgutil"
2019-07-28 06:55:36 +01:00
"storj.io/storj/satellite/accounting"
2021-04-21 13:42:57 +01:00
"storj.io/storj/satellite/metabase"
2020-11-30 19:34:42 +00:00
"storj.io/storj/satellite/orders"
2020-01-15 02:29:51 +00:00
"storj.io/storj/satellite/satellitedb/dbx"
2019-05-10 20:05:42 +01:00
)
// ensure that ProjectAccounting implements accounting.ProjectAccounting.
var _ accounting.ProjectAccounting = (*ProjectAccounting)(nil)

// allocatedExpirationInDays is the age, in days, after which allocated
// bandwidth is no longer trusted: GetProjectBandwidth sums settled egress
// for days older than this and allocated-minus-dead egress for newer days.
var allocatedExpirationInDays = 2
// ProjectAccounting implements the accounting/db ProjectAccounting interface.
type ProjectAccounting struct {
	// db is the satellite database handle used for all queries.
	db *satelliteDB
}
// SaveTallies saves the latest bucket info.
//
// All tallies are written in a single batched INSERT using unnest over
// parallel arrays, one array element per bucket. The legacy inline/remote
// byte columns and remote/inline segment-count columns are written as zero;
// only the total_* columns carry data.
func (db *ProjectAccounting) SaveTallies(ctx context.Context, intervalStart time.Time, bucketTallies map[metabase.BucketLocation]*accounting.BucketTally) (err error) {
	defer mon.Task()(&ctx)(&err)
	if len(bucketTallies) == 0 {
		// Nothing to persist.
		return nil
	}
	var bucketNames, projectIDs [][]byte
	var totalBytes, metadataSizes []int64
	var totalSegments, objectCounts []int64
	// Flatten the map into parallel arrays; element i of every slice
	// describes the same bucket.
	for _, info := range bucketTallies {
		bucketNames = append(bucketNames, []byte(info.BucketName))
		projectIDs = append(projectIDs, info.ProjectID[:])
		totalBytes = append(totalBytes, info.TotalBytes)
		totalSegments = append(totalSegments, info.TotalSegments)
		objectCounts = append(objectCounts, info.ObjectCount)
		metadataSizes = append(metadataSizes, info.MetadataSize)
	}
	_, err = db.db.DB.ExecContext(ctx, db.db.Rebind(`
		INSERT INTO bucket_storage_tallies (
			interval_start,
			bucket_name, project_id,
			total_bytes, inline, remote,
			total_segments_count, remote_segments_count, inline_segments_count,
			object_count, metadata_size)
		SELECT
			$1,
			unnest($2::bytea[]), unnest($3::bytea[]),
			unnest($4::int8[]), $5, $6,
			unnest($7::int8[]), $8, $9,
			unnest($10::int8[]), unnest($11::int8[])`),
		intervalStart,
		pgutil.ByteaArray(bucketNames), pgutil.ByteaArray(projectIDs),
		pgutil.Int8Array(totalBytes), 0, 0,
		pgutil.Int8Array(totalSegments), 0, 0,
		pgutil.Int8Array(objectCounts), pgutil.Int8Array(metadataSizes))

	return Error.Wrap(err)
}
2020-07-06 21:15:55 +01:00
// GetTallies saves the latest bucket info.
2019-09-12 18:31:50 +01:00
func ( db * ProjectAccounting ) GetTallies ( ctx context . Context ) ( tallies [ ] accounting . BucketTally , err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
dbxTallies , err := db . db . All_BucketStorageTally ( ctx )
2019-08-13 23:13:56 +01:00
if err != nil {
return nil , Error . Wrap ( err )
2019-05-10 20:05:42 +01:00
}
2019-09-12 18:31:50 +01:00
for _ , dbxTally := range dbxTallies {
2020-03-31 17:49:16 +01:00
projectID , err := uuid . FromBytes ( dbxTally . ProjectId )
2019-09-13 14:51:41 +01:00
if err != nil {
return nil , Error . Wrap ( err )
}
2021-07-01 12:29:25 +01:00
totalBytes := dbxTally . TotalBytes
if totalBytes == 0 {
totalBytes = dbxTally . Inline + dbxTally . Remote
}
totalSegments := dbxTally . TotalSegmentsCount
if totalSegments == 0 {
totalSegments = dbxTally . InlineSegmentsCount + dbxTally . RemoteSegmentsCount
}
2019-09-12 18:31:50 +01:00
tallies = append ( tallies , accounting . BucketTally {
2020-08-31 11:14:20 +01:00
BucketLocation : metabase . BucketLocation {
ProjectID : projectID ,
BucketName : string ( dbxTally . BucketName ) ,
} ,
2021-07-01 12:29:25 +01:00
ObjectCount : int64 ( dbxTally . ObjectCount ) ,
TotalSegments : int64 ( totalSegments ) ,
TotalBytes : int64 ( totalBytes ) ,
MetadataSize : int64 ( dbxTally . MetadataSize ) ,
2019-09-12 18:31:50 +01:00
} )
}
return tallies , nil
2019-05-10 20:05:42 +01:00
}
// CreateStorageTally creates a record in the bucket_storage_tallies accounting table.
//
// Single-row variant of SaveTallies: the legacy inline/remote byte columns
// and remote/inline segment-count columns are written as zero.
func (db *ProjectAccounting) CreateStorageTally(ctx context.Context, tally accounting.BucketStorageTally) (err error) {
	defer mon.Task()(&ctx)(&err)

	_, err = db.db.DB.ExecContext(ctx, db.db.Rebind(`
		INSERT INTO bucket_storage_tallies (
			interval_start,
			bucket_name, project_id,
			total_bytes, inline, remote,
			total_segments_count, remote_segments_count, inline_segments_count,
			object_count, metadata_size)
		VALUES (
			?,
			?, ?,
			?, ?, ?,
			?, ?, ?,
			?, ?
		)`), tally.IntervalStart,
		[]byte(tally.BucketName), tally.ProjectID,
		tally.TotalBytes, 0, 0, // legacy inline/remote bytes written as zero
		tally.TotalSegmentCount, 0, 0, // legacy remote/inline segment counts written as zero
		tally.ObjectCount, tally.MetadataSize,
	)
	return Error.Wrap(err)
}
2020-07-06 21:15:55 +01:00
// GetAllocatedBandwidthTotal returns the sum of GET bandwidth usage allocated for a projectID for a time frame.
2019-06-25 16:58:42 +01:00
func ( db * ProjectAccounting ) GetAllocatedBandwidthTotal ( ctx context . Context , projectID uuid . UUID , from time . Time ) ( _ int64 , err error ) {
2019-06-04 12:55:38 +01:00
defer mon . Task ( ) ( & ctx ) ( & err )
2019-05-10 20:05:42 +01:00
var sum * int64
2020-06-01 14:43:12 +01:00
query := ` SELECT SUM(allocated) FROM bucket_bandwidth_rollups WHERE project_id = ? AND action = ? AND interval_start >= ?; `
2020-04-01 09:47:29 +01:00
err = db . db . QueryRow ( ctx , db . db . Rebind ( query ) , projectID [ : ] , pb . PieceAction_GET , from . UTC ( ) ) . Scan ( & sum )
2020-07-14 14:04:38 +01:00
if errors . Is ( err , sql . ErrNoRows ) || sum == nil {
2019-05-10 20:05:42 +01:00
return 0 , nil
}
return * sum , err
}
2021-05-25 14:12:01 +01:00
// GetProjectBandwidth returns the used bandwidth (settled or allocated) for the specified year, month and day.
2021-05-25 18:43:47 +01:00
func ( db * ProjectAccounting ) GetProjectBandwidth ( ctx context . Context , projectID uuid . UUID , year int , month time . Month , day int , asOfSystemInterval time . Duration ) ( _ int64 , err error ) {
2020-05-01 14:24:12 +01:00
defer mon . Task ( ) ( & ctx ) ( & err )
var egress * int64
2021-05-25 14:12:01 +01:00
startOfMonth := time . Date ( year , month , 1 , 0 , 0 , 0 , 0 , time . UTC )
2020-05-01 14:24:12 +01:00
2021-05-25 14:12:01 +01:00
var expiredSince time . Time
if day < allocatedExpirationInDays {
expiredSince = startOfMonth
} else {
2021-06-03 10:09:25 +01:00
expiredSince = time . Date ( year , month , day - allocatedExpirationInDays , 0 , 0 , 0 , 0 , time . UTC )
2021-05-25 14:12:01 +01:00
}
2021-06-30 13:54:12 +01:00
periodEnd := time . Date ( year , month + 1 , 1 , 0 , 0 , 0 , 0 , time . UTC )
2021-05-25 14:12:01 +01:00
2021-05-25 18:43:47 +01:00
query := ` WITH egress AS (
2021-05-25 14:12:01 +01:00
SELECT
2021-06-03 10:09:25 +01:00
CASE WHEN interval_day < ?
2021-05-25 14:12:01 +01:00
THEN egress_settled
2021-05-29 23:16:12 +01:00
ELSE egress_allocated - egress_dead
2021-05-25 14:12:01 +01:00
END AS amount
FROM project_bandwidth_daily_rollups
2021-06-01 09:09:37 +01:00
WHERE project_id = ? AND interval_day >= ? AND interval_day < ?
2021-05-25 18:43:47 +01:00
) SELECT sum ( amount ) FROM egress ` + db . db . impl . AsOfSystemInterval ( asOfSystemInterval )
2021-05-25 14:12:01 +01:00
err = db . db . QueryRow ( ctx , db . db . Rebind ( query ) , expiredSince , projectID [ : ] , startOfMonth , periodEnd ) . Scan ( & egress )
2020-07-14 14:04:38 +01:00
if errors . Is ( err , sql . ErrNoRows ) || egress == nil {
2020-05-01 14:24:12 +01:00
return 0 , nil
}
return * egress , err
}
2021-05-17 15:07:59 +01:00
// GetProjectDailyBandwidth returns project bandwidth (allocated and settled) for the specified day.
2021-05-29 23:16:12 +01:00
func ( db * ProjectAccounting ) GetProjectDailyBandwidth ( ctx context . Context , projectID uuid . UUID , year int , month time . Month , day int ) ( allocated int64 , settled , dead int64 , err error ) {
2021-05-17 15:07:59 +01:00
defer mon . Task ( ) ( & ctx ) ( & err )
interval := time . Date ( year , month , day , 0 , 0 , 0 , 0 , time . UTC )
2021-05-29 23:16:12 +01:00
query := ` SELECT egress_allocated, egress_settled, egress_dead FROM project_bandwidth_daily_rollups WHERE project_id = ? AND interval_day = ?; `
err = db . db . QueryRow ( ctx , db . db . Rebind ( query ) , projectID [ : ] , interval ) . Scan ( & allocated , & settled , & dead )
2021-05-17 15:07:59 +01:00
if errors . Is ( err , sql . ErrNoRows ) {
2021-05-29 23:16:12 +01:00
return 0 , 0 , 0 , nil
2021-05-17 15:07:59 +01:00
}
2021-05-29 23:16:12 +01:00
return allocated , settled , dead , err
2021-05-17 15:07:59 +01:00
}
// GetProjectDailyBandwidthByDateRange returns project daily allocated
// bandwidth usage for the given date range (the query reads egress_allocated;
// the previous doc comment incorrectly said "settled").
func (db *ProjectAccounting) GetProjectDailyBandwidthByDateRange(ctx context.Context, projectID uuid.UUID, from, to time.Time) (_ []accounting.ProjectUsageByDay, err error) {
	defer mon.Task()(&ctx)(&err)

	// Pre-allocated so callers get an empty (non-nil) slice when no rows match.
	usage := make([]accounting.ProjectUsageByDay, 0)

	query := db.db.Rebind(`SELECT interval_day, COALESCE(egress_allocated, 0) FROM project_bandwidth_daily_rollups WHERE project_id = ? AND (interval_day BETWEEN ? AND ?)`)

	rows, err := db.db.QueryContext(ctx, query, projectID[:], from, to)
	if err != nil {
		return nil, err
	}
	for rows.Next() {
		var day time.Time
		var amount int64

		err = rows.Scan(&day, &amount)
		if err != nil {
			// Close the rows before returning; report both errors.
			return nil, errs.Combine(err, rows.Close())
		}

		usage = append(usage, accounting.ProjectUsageByDay{
			Date:  day,
			Value: amount,
		})
	}

	err = errs.Combine(rows.Err(), rows.Close())
	return usage, err
}
// GetProjectDailyStorageByDateRange returns project daily storage usage by
// specific date range. Currently a stub: it always returns an empty
// (non-nil) slice and a nil error.
func (db *ProjectAccounting) GetProjectDailyStorageByDateRange(ctx context.Context, _ uuid.UUID, _, _ time.Time) (_ []accounting.ProjectUsageByDay, err error) {
	defer mon.Task()(&ctx)(&err)
	return []accounting.ProjectUsageByDay{}, err
}
2021-05-25 14:12:01 +01:00
// DeleteProjectBandwidthBefore deletes project bandwidth rollups before the given time.
func ( db * ProjectAccounting ) DeleteProjectBandwidthBefore ( ctx context . Context , before time . Time ) ( err error ) {
2020-07-07 15:48:09 +01:00
defer mon . Task ( ) ( & ctx ) ( & err )
2021-05-25 14:12:01 +01:00
_ , err = db . db . DB . ExecContext ( ctx , db . db . Rebind ( "DELETE FROM project_bandwidth_daily_rollups WHERE interval_day < ?" ) , before )
2020-07-07 15:48:09 +01:00
return err
}
2019-12-10 16:12:49 +00:00
// UpdateProjectUsageLimit updates project usage limit.
func ( db * ProjectAccounting ) UpdateProjectUsageLimit ( ctx context . Context , projectID uuid . UUID , limit memory . Size ) ( err error ) {
2019-11-25 14:18:04 +00:00
defer mon . Task ( ) ( & ctx ) ( & err )
2019-12-10 16:12:49 +00:00
_ , err = db . db . Update_Project_By_Id ( ctx ,
dbx . Project_Id ( projectID [ : ] ) ,
dbx . Project_Update_Fields {
UsageLimit : dbx . Project_UsageLimit ( limit . Int64 ( ) ) ,
} ,
)
return err
2019-11-25 14:18:04 +00:00
}
2020-05-12 14:01:15 +01:00
// UpdateProjectBandwidthLimit updates project bandwidth limit.
func ( db * ProjectAccounting ) UpdateProjectBandwidthLimit ( ctx context . Context , projectID uuid . UUID , limit memory . Size ) ( err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
_ , err = db . db . Update_Project_By_Id ( ctx ,
dbx . Project_Id ( projectID [ : ] ) ,
dbx . Project_Update_Fields {
BandwidthLimit : dbx . Project_BandwidthLimit ( limit . Int64 ( ) ) ,
} ,
)
return err
}
2021-12-03 15:06:20 +00:00
// UpdateProjectSegmentLimit updates project segment limit.
func ( db * ProjectAccounting ) UpdateProjectSegmentLimit ( ctx context . Context , projectID uuid . UUID , limit int64 ) ( err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
_ , err = db . db . Update_Project_By_Id ( ctx ,
dbx . Project_Id ( projectID [ : ] ) ,
dbx . Project_Update_Fields {
SegmentLimit : dbx . Project_SegmentLimit ( limit ) ,
} ,
)
return err
}
2019-11-25 14:18:04 +00:00
// GetProjectStorageLimit returns project storage usage limit.
2020-09-06 00:02:12 +01:00
func ( db * ProjectAccounting ) GetProjectStorageLimit ( ctx context . Context , projectID uuid . UUID ) ( _ * int64 , err error ) {
2019-11-25 14:18:04 +00:00
defer mon . Task ( ) ( & ctx ) ( & err )
2020-05-12 14:01:15 +01:00
row , err := db . db . Get_Project_UsageLimit_By_Id ( ctx ,
dbx . Project_Id ( projectID [ : ] ) ,
)
if err != nil {
2020-09-06 00:02:12 +01:00
return nil , err
2020-05-12 14:01:15 +01:00
}
2020-09-06 00:02:12 +01:00
return row . UsageLimit , nil
2019-11-25 14:18:04 +00:00
}
// GetProjectBandwidthLimit returns project bandwidth usage limit.
2020-09-06 00:02:12 +01:00
func ( db * ProjectAccounting ) GetProjectBandwidthLimit ( ctx context . Context , projectID uuid . UUID ) ( _ * int64 , err error ) {
2019-11-25 14:18:04 +00:00
defer mon . Task ( ) ( & ctx ) ( & err )
2020-05-12 14:01:15 +01:00
row , err := db . db . Get_Project_BandwidthLimit_By_Id ( ctx ,
2019-12-10 16:12:49 +00:00
dbx . Project_Id ( projectID [ : ] ) ,
2019-11-25 14:18:04 +00:00
)
2019-05-28 16:36:52 +01:00
if err != nil {
2020-09-06 00:02:12 +01:00
return nil , err
2019-05-28 16:36:52 +01:00
}
2019-11-25 14:18:04 +00:00
2020-09-06 00:02:12 +01:00
return row . BandwidthLimit , nil
2019-05-28 16:36:52 +01:00
}
// GetProjectObjectsSegments retrieves project objects and segments.
//
// The counts come from the most recent bucket_storage_tallies interval for
// the project. If no tallies exist, or the latest tallies are more than
// 3 days old, zero counts are returned (without error).
func (db *ProjectAccounting) GetProjectObjectsSegments(ctx context.Context, projectID uuid.UUID) (objectsSegments *accounting.ProjectObjectsSegments, err error) {
	defer mon.Task()(&ctx)(&err)

	objectsSegments = new(accounting.ProjectObjectsSegments)

	// check if rows exist.
	var count int64
	countRow := db.db.QueryRowContext(ctx, db.db.Rebind(`SELECT COUNT(*) FROM bucket_storage_tallies WHERE project_id = ?`), projectID[:])
	if err = countRow.Scan(&count); err != nil {
		return nil, err
	}
	if count == 0 {
		// No tallies at all: return zero counts.
		return objectsSegments, nil
	}

	// Find the most recent tally interval for this project.
	var latestDate time.Time
	latestDateRow := db.db.QueryRowContext(ctx, db.db.Rebind(`SELECT MAX(interval_start) FROM bucket_storage_tallies WHERE project_id = ?`), projectID[:])
	if err = latestDateRow.Scan(&latestDate); err != nil {
		return nil, err
	}

	// check if latest bucket tallies are more than 3 days old.
	inThreeDays := latestDate.Add(24 * time.Hour * 3)
	if inThreeDays.Before(time.Now()) {
		// Data is stale: return zero counts rather than outdated ones.
		return objectsSegments, nil
	}

	// calculate total segments and objects count across all buckets at the
	// latest interval.
	storageTalliesRows := db.db.QueryRowContext(ctx, db.db.Rebind(`
		SELECT
			SUM(total_segments_count),
			SUM(object_count)
		FROM
			bucket_storage_tallies
		WHERE
			project_id = ? AND
			interval_start = ?
	`), projectID[:], latestDate)
	if err = storageTalliesRows.Scan(&objectsSegments.SegmentCount, &objectsSegments.ObjectCount); err != nil {
		return nil, err
	}

	return objectsSegments, nil
}
2021-12-03 15:06:20 +00:00
// GetProjectSegmentLimit returns project segment limit.
func ( db * ProjectAccounting ) GetProjectSegmentLimit ( ctx context . Context , projectID uuid . UUID ) ( _ * int64 , err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
row , err := db . db . Get_Project_SegmentLimit_By_Id ( ctx ,
dbx . Project_Id ( projectID [ : ] ) ,
)
if err != nil {
return nil , err
}
return row . SegmentLimit , nil
}
// GetProjectTotal retrieves project usage for a given period.
//
// Storage, object and segment totals are hour-weighted: each tally is
// multiplied by the hours until the next (newer) tally for the same bucket.
// Egress is the sum of GET bandwidth rollups over the period.
func (db *ProjectAccounting) GetProjectTotal(ctx context.Context, projectID uuid.UUID, since, before time.Time) (usage *accounting.ProjectUsage, err error) {
	defer mon.Task()(&ctx)(&err)
	since = timeTruncateDown(since)

	bucketNames, err := db.getBucketsSinceAndBefore(ctx, projectID, since, before)
	if err != nil {
		return nil, err
	}

	// Newest-first ordering is relied upon by the hour-weighting loop below.
	storageQuery := db.db.Rebind(`
		SELECT
			bucket_storage_tallies.interval_start,
			bucket_storage_tallies.total_bytes,
			bucket_storage_tallies.inline,
			bucket_storage_tallies.remote,
			bucket_storage_tallies.total_segments_count,
			bucket_storage_tallies.object_count
		FROM
			bucket_storage_tallies
		WHERE
			bucket_storage_tallies.project_id = ? AND
			bucket_storage_tallies.bucket_name = ? AND
			bucket_storage_tallies.interval_start >= ? AND
			bucket_storage_tallies.interval_start <= ?
		ORDER BY bucket_storage_tallies.interval_start DESC
	`)

	bucketsTallies := make(map[string][]*accounting.BucketStorageTally)

	for _, bucket := range bucketNames {
		storageTallies := make([]*accounting.BucketStorageTally, 0)

		storageTalliesRows, err := db.db.QueryContext(ctx, storageQuery, projectID[:], []byte(bucket), since, before)
		if err != nil {
			return nil, err
		}

		// generating tallies for each bucket name.
		for storageTalliesRows.Next() {
			tally := accounting.BucketStorageTally{}

			var inline, remote int64
			err = storageTalliesRows.Scan(&tally.IntervalStart, &tally.TotalBytes, &inline, &remote, &tally.TotalSegmentCount, &tally.ObjectCount)
			if err != nil {
				// Close the rows before returning; report both errors.
				return nil, errs.Combine(err, storageTalliesRows.Close())
			}
			// Fall back to the legacy inline+remote columns when total_bytes is unset.
			if tally.TotalBytes == 0 {
				tally.TotalBytes = inline + remote
			}
			tally.BucketName = bucket
			storageTallies = append(storageTallies, &tally)
		}

		err = errs.Combine(storageTalliesRows.Err(), storageTalliesRows.Close())
		if err != nil {
			return nil, err
		}

		bucketsTallies[bucket] = storageTallies
	}

	totalEgress, err := db.getTotalEgress(ctx, projectID, since, before)
	if err != nil {
		return nil, err
	}

	usage = new(accounting.ProjectUsage)
	usage.Egress = memory.Size(totalEgress).Int64()
	// sum up storage, objects, and segments
	for _, tallies := range bucketsTallies {
		// Tallies are newest-first; weight each tally by the hours until the
		// next newer tally. The newest tally (index 0) is never the "current"
		// one, so the open-ended most-recent interval is not counted.
		for i := len(tallies) - 1; i > 0; i-- {
			current := (tallies)[i]
			hours := (tallies)[i-1].IntervalStart.Sub(current.IntervalStart).Hours()
			usage.Storage += memory.Size(current.Bytes()).Float64() * hours
			usage.SegmentCount += float64(current.TotalSegmentCount) * hours
			usage.ObjectCount += float64(current.ObjectCount) * hours
		}
	}

	usage.Since = since
	usage.Before = before
	return usage, nil
}
2019-11-28 18:45:31 +00:00
// getTotalEgress returns total egress (settled + inline) of each bucket_bandwidth_rollup
// in selected time period, project id.
2020-01-28 14:51:14 +00:00
// only process PieceAction_GET.
2019-11-28 18:45:31 +00:00
func ( db * ProjectAccounting ) getTotalEgress ( ctx context . Context , projectID uuid . UUID , since , before time . Time ) ( totalEgress int64 , err error ) {
2020-01-28 14:51:14 +00:00
totalEgressQuery := db . db . Rebind ( `
2021-03-01 20:04:00 +00:00
SELECT
COALESCE ( SUM ( settled ) + SUM ( inline ) , 0 )
FROM
bucket_bandwidth_rollups
WHERE
project_id = ? AND
interval_start >= ? AND
interval_start <= ? AND
2020-01-28 14:51:14 +00:00
action = ? ;
` )
2019-11-28 18:45:31 +00:00
2020-01-28 14:51:14 +00:00
totalEgressRow := db . db . QueryRowContext ( ctx , totalEgressQuery , projectID [ : ] , since , before , pb . PieceAction_GET )
2019-11-28 18:45:31 +00:00
err = totalEgressRow . Scan ( & totalEgress )
return totalEgress , err
}
// GetBucketUsageRollups retrieves summed usage rollups for every bucket of particular project for a given period.
//
// Egress is grouped by piece action (GET / GET_AUDIT / GET_REPAIR); storage,
// metadata, segment and object totals are hour-weighted across consecutive
// storage tallies, skipping the most recent tally.
func (db *ProjectAccounting) GetBucketUsageRollups(ctx context.Context, projectID uuid.UUID, since, before time.Time) (_ []accounting.BucketUsageRollup, err error) {
	defer mon.Task()(&ctx)(&err)
	since = timeTruncateDown(since.UTC())
	before = before.UTC()

	buckets, err := db.getBucketsSinceAndBefore(ctx, projectID, since, before)
	if err != nil {
		return nil, err
	}

	roullupsQuery := db.db.Rebind(`SELECT SUM(settled), SUM(inline), action
		FROM bucket_bandwidth_rollups
		WHERE project_id = ? AND bucket_name = ? AND interval_start >= ? AND interval_start <= ?
		GROUP BY action`)

	// TODO: should be optimized
	storageQuery := db.db.All_BucketStorageTally_By_ProjectId_And_BucketName_And_IntervalStart_GreaterOrEqual_And_IntervalStart_LessOrEqual_OrderBy_Desc_IntervalStart

	var bucketUsageRollups []accounting.BucketUsageRollup
	for _, bucket := range buckets {
		// Each bucket runs in its own closure so the deferred
		// rollupsRows.Close() fires per iteration, not at function exit.
		err := func() error {
			bucketRollup := accounting.BucketUsageRollup{
				ProjectID:  projectID,
				BucketName: []byte(bucket),
				Since:      since,
				Before:     before,
			}

			// get bucket_bandwidth_rollups
			rollupsRows, err := db.db.QueryContext(ctx, roullupsQuery, projectID[:], []byte(bucket), since, before)
			if err != nil {
				return err
			}
			defer func() { err = errs.Combine(err, rollupsRows.Close()) }()

			// fill egress
			for rollupsRows.Next() {
				var action pb.PieceAction
				var settled, inline int64

				err = rollupsRows.Scan(&settled, &inline, &action)
				if err != nil {
					return err
				}

				switch action {
				case pb.PieceAction_GET:
					bucketRollup.GetEgress += memory.Size(settled + inline).GB()
				case pb.PieceAction_GET_AUDIT:
					bucketRollup.AuditEgress += memory.Size(settled + inline).GB()
				case pb.PieceAction_GET_REPAIR:
					bucketRollup.RepairEgress += memory.Size(settled + inline).GB()
				default:
					// Other actions (uploads etc.) are not egress.
					continue
				}
			}
			if err := rollupsRows.Err(); err != nil {
				return err
			}

			bucketStorageTallies, err := storageQuery(ctx,
				dbx.BucketStorageTally_ProjectId(projectID[:]),
				dbx.BucketStorageTally_BucketName([]byte(bucket)),
				dbx.BucketStorageTally_IntervalStart(since),
				dbx.BucketStorageTally_IntervalStart(before))
			if err != nil {
				return err
			}

			// fill metadata, objects and stored data
			// hours calculated from previous tallies,
			// so we skip the most recent one
			for i := len(bucketStorageTallies) - 1; i > 0; i-- {
				current := bucketStorageTallies[i]

				hours := bucketStorageTallies[i-1].IntervalStart.Sub(current.IntervalStart).Hours()

				// Prefer the total_* columns; fall back to the legacy
				// inline/remote split when the totals are zero.
				if current.TotalBytes > 0 {
					bucketRollup.TotalStoredData += memory.Size(current.TotalBytes).GB() * hours
				} else {
					bucketRollup.TotalStoredData += memory.Size(current.Remote+current.Inline).GB() * hours
				}
				bucketRollup.MetadataSize += memory.Size(current.MetadataSize).GB() * hours
				if current.TotalSegmentsCount > 0 {
					bucketRollup.TotalSegments += float64(current.TotalSegmentsCount) * hours
				} else {
					bucketRollup.TotalSegments += float64(current.RemoteSegmentsCount+current.InlineSegmentsCount) * hours
				}
				bucketRollup.ObjectCount += float64(current.ObjectCount) * hours
			}

			bucketUsageRollups = append(bucketUsageRollups, bucketRollup)
			return nil
		}()
		if err != nil {
			return nil, err
		}
	}

	return bucketUsageRollups, nil
}
// prefixIncrement returns the lexicographically lowest byte string which is
// greater than origPrefix and does not have origPrefix as a prefix. If no such
// byte string exists (origPrefix is empty, or origPrefix contains only 0xff
// bytes), it returns ok == false.
//
// examples: prefixIncrement([]byte("abc")) -> ([]byte("abd", true)
//           prefixIncrement([]byte("ab\xff\xff")) -> ([]byte("ac", true)
//           prefixIncrement([]byte("")) -> (nil, false)
//           prefixIncrement([]byte("\x00")) -> ([]byte("\x01", true)
//           prefixIncrement([]byte("\xff\xff\xff")) -> (nil, false)
func prefixIncrement(origPrefix []byte) (incremented []byte, ok bool) {
	// Operate on a copy so the caller's slice is never mutated.
	incremented = append([]byte(nil), origPrefix...)

	// Scan backwards for the last byte that can be bumped; any trailing
	// 0xff bytes are dropped from the result.
	for i := len(incremented) - 1; i >= 0; i-- {
		if incremented[i] == 0xff {
			continue
		}
		incremented[i]++
		return incremented[:i+1], true
	}

	// origPrefix is empty or all 0xff: there is no byte string which is
	// greater than origPrefix and does not have origPrefix as a prefix.
	return nil, false
}
// prefixMatch creates a SQL expression which
// will evaluate to true if and only if the value of expr starts with the value
// of prefix.
//
// Returns also a slice of arguments that should be passed to the corresponding
// db.Query* or db.Exec* to fill in parameters in the returned SQL expression.
//
// The returned SQL expression needs to be passed through Rebind(), as it uses
// `?` markers instead of `$N`, because we don't know what N we would need to
// use.
func ( db * ProjectAccounting ) prefixMatch ( expr string , prefix [ ] byte ) ( string , [ ] byte , error ) {
incrementedPrefix , ok := prefixIncrement ( prefix )
2021-05-11 09:49:26 +01:00
switch db . db . impl {
2019-12-10 16:32:54 +00:00
case dbutil . Postgres :
if ! ok {
return fmt . Sprintf ( ` (%s >= ?) ` , expr ) , nil , nil
}
return fmt . Sprintf ( ` (%s >= ? AND %s < ?) ` , expr , expr ) , incrementedPrefix , nil
case dbutil . Cockroach :
if ! ok {
return fmt . Sprintf ( ` (%s >= ?:::BYTEA) ` , expr ) , nil , nil
}
return fmt . Sprintf ( ` (%s >= ?:::BYTEA AND %s < ?:::BYTEA) ` , expr , expr ) , incrementedPrefix , nil
default :
2021-05-11 09:49:26 +01:00
return "" , nil , errs . New ( "unhandled database: %v" , db . db . driver )
2019-12-10 16:32:54 +00:00
}
}
// GetBucketTotals retrieves bucket usage totals for period of time.
//
// Buckets are paged by name (ascending) using cursor.Page/cursor.Limit and
// filtered by the cursor.Search prefix. For each bucket on the page, egress
// is summed over the period and storage/object/segment figures come from the
// single latest storage tally in the period.
func (db *ProjectAccounting) GetBucketTotals(ctx context.Context, projectID uuid.UUID, cursor accounting.BucketUsageCursor, since, before time.Time) (_ *accounting.BucketUsagePage, err error) {
	defer mon.Task()(&ctx)(&err)
	since = timeTruncateDown(since)
	bucketPrefix := []byte(cursor.Search)

	// Clamp the page size and validate the 1-based page number.
	if cursor.Limit > 50 {
		cursor.Limit = 50
	}
	if cursor.Page == 0 {
		return nil, errs.New("page can not be 0")
	}

	page := &accounting.BucketUsagePage{
		Search: cursor.Search,
		Limit:  cursor.Limit,
		Offset: uint64((cursor.Page - 1) * cursor.Limit),
	}

	// Build a backend-specific SQL range expression for the name prefix.
	bucketNameRange, incrPrefix, err := db.prefixMatch("name", bucketPrefix)
	if err != nil {
		return nil, err
	}

	countQuery := db.db.Rebind(`SELECT COUNT(name) FROM bucket_metainfos
		WHERE project_id = ? AND ` + bucketNameRange)

	args := []interface{}{
		projectID[:],
		bucketPrefix,
	}
	// prefixMatch returns a nil upper bound when the prefix cannot be
	// incremented; only append it when present.
	if incrPrefix != nil {
		args = append(args, incrPrefix)
	}

	countRow := db.db.QueryRowContext(ctx, countQuery, args...)

	err = countRow.Scan(&page.TotalCount)
	if err != nil {
		return nil, err
	}

	if page.TotalCount == 0 {
		return page, nil
	}
	if page.Offset > page.TotalCount-1 {
		return nil, errs.New("page is out of range")
	}

	var buckets []string

	bucketsQuery := db.db.Rebind(`SELECT name FROM bucket_metainfos
		WHERE project_id = ? AND ` + bucketNameRange + ` ORDER BY name ASC LIMIT ? OFFSET ?`)

	args = []interface{}{
		projectID[:],
		bucketPrefix,
	}
	if incrPrefix != nil {
		args = append(args, incrPrefix)
	}
	args = append(args, page.Limit, page.Offset)

	bucketRows, err := db.db.QueryContext(ctx, bucketsQuery, args...)
	if err != nil {
		return nil, err
	}
	defer func() { err = errs.Combine(err, bucketRows.Close()) }()

	for bucketRows.Next() {
		var bucket string
		err = bucketRows.Scan(&bucket)
		if err != nil {
			return nil, err
		}
		buckets = append(buckets, bucket)
	}
	if err := bucketRows.Err(); err != nil {
		return nil, err
	}

	// Total GET egress (settled + inline) per bucket over the period.
	rollupsQuery := db.db.Rebind(`SELECT COALESCE(SUM(settled) + SUM(inline), 0)
		FROM bucket_bandwidth_rollups
		WHERE project_id = ? AND bucket_name = ? AND interval_start >= ? AND interval_start <= ? AND action = ?`)

	// Latest storage tally per bucket in the period.
	storageQuery := db.db.Rebind(`SELECT total_bytes, inline, remote, object_count, total_segments_count
		FROM bucket_storage_tallies
		WHERE project_id = ? AND bucket_name = ? AND interval_start >= ? AND interval_start <= ?
		ORDER BY interval_start DESC
		LIMIT 1`)

	var bucketUsages []accounting.BucketUsage
	for _, bucket := range buckets {
		bucketUsage := accounting.BucketUsage{
			ProjectID:  projectID,
			BucketName: bucket,
			Since:      since,
			Before:     before,
		}

		// get bucket_bandwidth_rollups
		rollupRow := db.db.QueryRowContext(ctx, rollupsQuery, projectID[:], []byte(bucket), since, before, pb.PieceAction_GET)

		var egress int64
		err = rollupRow.Scan(&egress)
		if err != nil {
			// A missing rollup row simply means zero egress.
			if !errors.Is(err, sql.ErrNoRows) {
				return nil, err
			}
		}

		bucketUsage.Egress = memory.Size(egress).GB()

		storageRow := db.db.QueryRowContext(ctx, storageQuery, projectID[:], []byte(bucket), since, before)

		var tally accounting.BucketStorageTally
		var inline, remote int64
		err = storageRow.Scan(&tally.TotalBytes, &inline, &remote, &tally.ObjectCount, &tally.TotalSegmentCount)
		if err != nil {
			// A missing tally row simply means zero storage.
			if !errors.Is(err, sql.ErrNoRows) {
				return nil, err
			}
		}

		// Fall back to the legacy inline+remote columns when total_bytes is unset.
		if tally.TotalBytes == 0 {
			tally.TotalBytes = inline + remote
		}

		// fill storage and object count
		bucketUsage.Storage = memory.Size(tally.Bytes()).GB()
		bucketUsage.SegmentCount = tally.TotalSegmentCount
		bucketUsage.ObjectCount = tally.ObjectCount

		bucketUsages = append(bucketUsages, bucketUsage)
	}

	page.PageCount = uint(page.TotalCount / uint64(cursor.Limit))
	if page.TotalCount%uint64(cursor.Limit) != 0 {
		page.PageCount++
	}

	page.BucketUsages = bucketUsages
	page.CurrentPage = cursor.Page
	return page, nil
}
2020-11-30 19:34:42 +00:00
// ArchiveRollupsBefore archives rollups older than a given time.
2021-07-02 14:41:49 +01:00
func ( db * ProjectAccounting ) ArchiveRollupsBefore ( ctx context . Context , before time . Time , batchSize int ) ( archivedCount int , err error ) {
2020-11-30 19:34:42 +00:00
defer mon . Task ( ) ( & ctx ) ( & err )
if batchSize <= 0 {
return 0 , nil
}
2021-05-11 09:49:26 +01:00
switch db . db . impl {
2020-11-30 19:34:42 +00:00
case dbutil . Cockroach :
2021-07-02 14:41:49 +01:00
// We operate one action at a time, because we have an index on `(action, interval_start, project_id)`.
for action := range pb . PieceAction_name {
count , err := db . archiveRollupsBeforeByAction ( ctx , action , before , batchSize )
archivedCount += count
if err != nil {
return archivedCount , Error . Wrap ( err )
2020-11-30 19:34:42 +00:00
}
}
2021-07-02 14:41:49 +01:00
return archivedCount , nil
2020-11-30 19:34:42 +00:00
case dbutil . Postgres :
2021-07-02 14:41:49 +01:00
err := db . db . DB . QueryRow ( ctx , `
2020-11-30 19:34:42 +00:00
WITH rollups_to_move AS (
DELETE FROM bucket_bandwidth_rollups
WHERE interval_start <= $ 1
RETURNING *
) , moved_rollups AS (
INSERT INTO bucket_bandwidth_rollup_archives ( bucket_name , project_id , interval_start , interval_seconds , action , inline , allocated , settled )
SELECT bucket_name , project_id , interval_start , interval_seconds , action , inline , allocated , settled FROM rollups_to_move
RETURNING *
2021-03-01 20:04:00 +00:00
)
SELECT count ( * ) FROM moved_rollups
2021-07-02 14:41:49 +01:00
` , before ) . Scan ( & archivedCount )
return archivedCount , Error . Wrap ( err )
default :
return 0 , nil
}
}
func ( db * ProjectAccounting ) archiveRollupsBeforeByAction ( ctx context . Context , action int32 , before time . Time , batchSize int ) ( archivedCount int , err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
for {
2020-11-30 19:34:42 +00:00
var rowCount int
2021-07-02 14:41:49 +01:00
err := db . db . QueryRow ( ctx , `
WITH rollups_to_move AS (
DELETE FROM bucket_bandwidth_rollups
WHERE action = $ 1 AND interval_start <= $ 2
LIMIT $ 3 RETURNING *
) , moved_rollups AS (
INSERT INTO bucket_bandwidth_rollup_archives ( bucket_name , project_id , interval_start , interval_seconds , action , inline , allocated , settled )
SELECT bucket_name , project_id , interval_start , interval_seconds , action , inline , allocated , settled FROM rollups_to_move
RETURNING *
)
SELECT count ( * ) FROM moved_rollups
` , int ( action ) , before , batchSize ) . Scan ( & rowCount )
2020-11-30 19:34:42 +00:00
if err != nil {
2021-07-02 14:41:49 +01:00
return archivedCount , Error . Wrap ( err )
}
archivedCount += rowCount
if rowCount < batchSize {
return archivedCount , nil
2020-11-30 19:34:42 +00:00
}
}
}
2021-01-05 19:39:08 +00:00
// getBucketsSinceAndBefore lists distinct bucket names for a project within a specific timeframe.
func ( db * ProjectAccounting ) getBucketsSinceAndBefore ( ctx context . Context , projectID uuid . UUID , since , before time . Time ) ( _ [ ] string , err error ) {
2019-11-15 14:27:44 +00:00
defer mon . Task ( ) ( & ctx ) ( & err )
bucketsQuery := db . db . Rebind ( ` SELECT DISTINCT bucket_name
2020-07-06 21:15:55 +01:00
FROM bucket_storage_tallies
2021-01-05 19:39:08 +00:00
WHERE project_id = ?
2021-03-01 20:04:00 +00:00
AND interval_start >= ?
2021-01-05 19:39:08 +00:00
AND interval_start <= ? ` )
bucketRows , err := db . db . QueryContext ( ctx , bucketsQuery , projectID [ : ] , since , before )
2019-11-15 14:27:44 +00:00
if err != nil {
return nil , err
}
defer func ( ) { err = errs . Combine ( err , bucketRows . Close ( ) ) } ( )
var buckets [ ] string
for bucketRows . Next ( ) {
var bucket string
err = bucketRows . Scan ( & bucket )
if err != nil {
return nil , err
}
buckets = append ( buckets , bucket )
}
2020-01-16 14:27:24 +00:00
return buckets , bucketRows . Err ( )
2019-11-15 14:27:44 +00:00
}
2020-07-06 21:15:55 +01:00
// timeTruncateDown truncates down to the hour before to be in sync with orders endpoint.
2019-11-15 14:27:44 +00:00
func timeTruncateDown ( t time . Time ) time . Time {
return time . Date ( t . Year ( ) , t . Month ( ) , t . Day ( ) , t . Hour ( ) , 0 , 0 , 0 , t . Location ( ) )
}
2020-09-09 20:20:44 +01:00
// GetProjectLimits returns current project limit for both storage and bandwidth.
func ( db * ProjectAccounting ) GetProjectLimits ( ctx context . Context , projectID uuid . UUID ) ( _ accounting . ProjectLimits , err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
2021-12-03 15:06:20 +00:00
row , err := db . db . Get_Project_BandwidthLimit_Project_UsageLimit_Project_SegmentLimit_By_Id ( ctx ,
2020-09-09 20:20:44 +01:00
dbx . Project_Id ( projectID [ : ] ) ,
)
if err != nil {
return accounting . ProjectLimits { } , err
}
return accounting . ProjectLimits {
2020-10-06 13:50:29 +01:00
Usage : row . UsageLimit ,
Bandwidth : row . BandwidthLimit ,
2021-12-03 15:06:20 +00:00
Segments : row . SegmentLimit ,
2020-09-09 20:20:44 +01:00
} , nil
}
2020-11-30 19:34:42 +00:00
// GetRollupsSince retrieves all archived rollup records since a given time.
func ( db * ProjectAccounting ) GetRollupsSince ( ctx context . Context , since time . Time ) ( bwRollups [ ] orders . BucketBandwidthRollup , err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
pageLimit := db . db . opts . ReadRollupBatchSize
if pageLimit <= 0 {
pageLimit = 10000
}
var cursor * dbx . Paged_BucketBandwidthRollup_By_IntervalStart_GreaterOrEqual_Continuation
for {
dbxRollups , next , err := db . db . Paged_BucketBandwidthRollup_By_IntervalStart_GreaterOrEqual ( ctx ,
dbx . BucketBandwidthRollup_IntervalStart ( since ) ,
pageLimit , cursor )
if err != nil {
return nil , Error . Wrap ( err )
}
cursor = next
for _ , dbxRollup := range dbxRollups {
projectID , err := uuid . FromBytes ( dbxRollup . ProjectId )
if err != nil {
return nil , err
}
bwRollups = append ( bwRollups , orders . BucketBandwidthRollup {
ProjectID : projectID ,
BucketName : string ( dbxRollup . BucketName ) ,
Action : pb . PieceAction ( dbxRollup . Action ) ,
Inline : int64 ( dbxRollup . Inline ) ,
Allocated : int64 ( dbxRollup . Allocated ) ,
Settled : int64 ( dbxRollup . Settled ) ,
} )
}
2021-03-01 20:04:00 +00:00
if cursor == nil {
2020-11-30 19:34:42 +00:00
return bwRollups , nil
}
}
}
// GetArchivedRollupsSince retrieves all archived rollup records since a given time.
func ( db * ProjectAccounting ) GetArchivedRollupsSince ( ctx context . Context , since time . Time ) ( bwRollups [ ] orders . BucketBandwidthRollup , err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
pageLimit := db . db . opts . ReadRollupBatchSize
if pageLimit <= 0 {
pageLimit = 10000
}
var cursor * dbx . Paged_BucketBandwidthRollupArchive_By_IntervalStart_GreaterOrEqual_Continuation
for {
dbxRollups , next , err := db . db . Paged_BucketBandwidthRollupArchive_By_IntervalStart_GreaterOrEqual ( ctx ,
dbx . BucketBandwidthRollupArchive_IntervalStart ( since ) ,
pageLimit , cursor )
if err != nil {
return nil , Error . Wrap ( err )
}
cursor = next
for _ , dbxRollup := range dbxRollups {
projectID , err := uuid . FromBytes ( dbxRollup . ProjectId )
if err != nil {
return nil , err
}
bwRollups = append ( bwRollups , orders . BucketBandwidthRollup {
ProjectID : projectID ,
BucketName : string ( dbxRollup . BucketName ) ,
Action : pb . PieceAction ( dbxRollup . Action ) ,
Inline : int64 ( dbxRollup . Inline ) ,
Allocated : int64 ( dbxRollup . Allocated ) ,
Settled : int64 ( dbxRollup . Settled ) ,
} )
}
2021-03-01 20:04:00 +00:00
if cursor == nil {
2020-11-30 19:34:42 +00:00
return bwRollups , nil
}
}
}