// Copyright (C) 2019 Storj Labs, Inc. // See LICENSE for copying information. package satellitedb import ( "context" "database/sql" "errors" "fmt" "time" "github.com/zeebo/errs" "storj.io/common/memory" "storj.io/common/pb" "storj.io/common/uuid" "storj.io/storj/private/dbutil" "storj.io/storj/private/dbutil/pgutil" "storj.io/storj/satellite/accounting" "storj.io/storj/satellite/metainfo/metabase" "storj.io/storj/satellite/satellitedb/dbx" ) // ensure that ProjectAccounting implements accounting.ProjectAccounting. var _ accounting.ProjectAccounting = (*ProjectAccounting)(nil) // ProjectAccounting implements the accounting/db ProjectAccounting interface. type ProjectAccounting struct { db *satelliteDB } // SaveTallies saves the latest bucket info. func (db *ProjectAccounting) SaveTallies(ctx context.Context, intervalStart time.Time, bucketTallies map[metabase.BucketLocation]*accounting.BucketTally) (err error) { defer mon.Task()(&ctx)(&err) if len(bucketTallies) == 0 { return nil } var bucketNames, projectIDs [][]byte var inlineBytes, remoteBytes, metadataSizes []int64 var remoteSegments, inlineSegments, objectCounts []int64 for _, info := range bucketTallies { bucketNames = append(bucketNames, []byte(info.BucketName)) projectIDs = append(projectIDs, info.ProjectID[:]) inlineBytes = append(inlineBytes, info.InlineBytes) remoteBytes = append(remoteBytes, info.RemoteBytes) remoteSegments = append(remoteSegments, info.RemoteSegments) inlineSegments = append(inlineSegments, info.InlineSegments) objectCounts = append(objectCounts, info.ObjectCount) metadataSizes = append(metadataSizes, info.MetadataSize) } _, err = db.db.DB.ExecContext(ctx, db.db.Rebind(` INSERT INTO bucket_storage_tallies ( interval_start, bucket_name, project_id, inline, remote, remote_segments_count, inline_segments_count, object_count, metadata_size) SELECT $1, unnest($2::bytea[]), unnest($3::bytea[]), unnest($4::int8[]), unnest($5::int8[]), unnest($6::int8[]), unnest($7::int8[]), unnest($8::int8[]), unnest($9::int8[])`), intervalStart, pgutil.ByteaArray(bucketNames), pgutil.ByteaArray(projectIDs), pgutil.Int8Array(inlineBytes), pgutil.Int8Array(remoteBytes), pgutil.Int8Array(remoteSegments), pgutil.Int8Array(inlineSegments), pgutil.Int8Array(objectCounts), pgutil.Int8Array(metadataSizes)) return Error.Wrap(err) } // GetTallies saves the latest bucket info. func (db *ProjectAccounting) GetTallies(ctx context.Context) (tallies []accounting.BucketTally, err error) { defer mon.Task()(&ctx)(&err) dbxTallies, err := db.db.All_BucketStorageTally(ctx) if err != nil { return nil, Error.Wrap(err) } for _, dbxTally := range dbxTallies { projectID, err := uuid.FromBytes(dbxTally.ProjectId) if err != nil { return nil, Error.Wrap(err) } tallies = append(tallies, accounting.BucketTally{ BucketLocation: metabase.BucketLocation{ ProjectID: projectID, BucketName: string(dbxTally.BucketName), }, ObjectCount: int64(dbxTally.ObjectCount), InlineSegments: int64(dbxTally.InlineSegmentsCount), RemoteSegments: int64(dbxTally.RemoteSegmentsCount), InlineBytes: int64(dbxTally.Inline), RemoteBytes: int64(dbxTally.Remote), MetadataSize: int64(dbxTally.MetadataSize), }) } return tallies, nil } // CreateStorageTally creates a record in the bucket_storage_tallies accounting table. func (db *ProjectAccounting) CreateStorageTally(ctx context.Context, tally accounting.BucketStorageTally) (err error) { defer mon.Task()(&ctx)(&err) return Error.Wrap(db.db.CreateNoReturn_BucketStorageTally( ctx, dbx.BucketStorageTally_BucketName([]byte(tally.BucketName)), dbx.BucketStorageTally_ProjectId(tally.ProjectID[:]), dbx.BucketStorageTally_IntervalStart(tally.IntervalStart), dbx.BucketStorageTally_Inline(uint64(tally.InlineBytes)), dbx.BucketStorageTally_Remote(uint64(tally.RemoteBytes)), dbx.BucketStorageTally_RemoteSegmentsCount(uint(tally.RemoteSegmentCount)), dbx.BucketStorageTally_InlineSegmentsCount(uint(tally.InlineSegmentCount)), dbx.BucketStorageTally_ObjectCount(uint(tally.ObjectCount)), dbx.BucketStorageTally_MetadataSize(uint64(tally.MetadataSize)), )) } // GetAllocatedBandwidthTotal returns the sum of GET bandwidth usage allocated for a projectID for a time frame. func (db *ProjectAccounting) GetAllocatedBandwidthTotal(ctx context.Context, projectID uuid.UUID, from time.Time) (_ int64, err error) { defer mon.Task()(&ctx)(&err) var sum *int64 query := `SELECT SUM(allocated) FROM bucket_bandwidth_rollups WHERE project_id = ? AND action = ? AND interval_start >= ?;` err = db.db.QueryRow(ctx, db.db.Rebind(query), projectID[:], pb.PieceAction_GET, from.UTC()).Scan(&sum) if errors.Is(err, sql.ErrNoRows) || sum == nil { return 0, nil } return *sum, err } // GetProjectAllocatedBandwidth returns allocated bandwidth for the specified year and month. func (db *ProjectAccounting) GetProjectAllocatedBandwidth(ctx context.Context, projectID uuid.UUID, year int, month time.Month) (_ int64, err error) { defer mon.Task()(&ctx)(&err) var egress *int64 interval := time.Date(year, month, 1, 0, 0, 0, 0, time.UTC) query := `SELECT egress_allocated FROM project_bandwidth_rollups WHERE project_id = ? AND interval_month = ?;` err = db.db.QueryRow(ctx, db.db.Rebind(query), projectID[:], interval).Scan(&egress) if errors.Is(err, sql.ErrNoRows) || egress == nil { return 0, nil } return *egress, err } // DeleteProjectAllocatedBandwidthBefore deletes project bandwidth rollups before the given time. func (db *ProjectAccounting) DeleteProjectAllocatedBandwidthBefore(ctx context.Context, before time.Time) (err error) { defer mon.Task()(&ctx)(&err) _, err = db.db.DB.ExecContext(ctx, db.db.Rebind("DELETE FROM project_bandwidth_rollups WHERE interval_month < ?"), before) return err } // GetStorageTotals returns the current inline and remote storage usage for a projectID. func (db *ProjectAccounting) GetStorageTotals(ctx context.Context, projectID uuid.UUID) (inline int64, remote int64, err error) { defer mon.Task()(&ctx)(&err) var inlineSum, remoteSum sql.NullInt64 var intervalStart time.Time // Sum all the inline and remote values for a project that all share the same interval_start. // All records for a project that have the same interval start are part of the same tally run. // This should represent the most recent calculation of a project's total at rest storage. query := `SELECT interval_start, SUM(inline), SUM(remote) FROM bucket_storage_tallies WHERE project_id = ? GROUP BY interval_start ORDER BY interval_start DESC LIMIT 1;` err = db.db.QueryRow(ctx, db.db.Rebind(query), projectID[:]).Scan(&intervalStart, &inlineSum, &remoteSum) if err != nil || !inlineSum.Valid || !remoteSum.Valid { return 0, 0, nil } return inlineSum.Int64, remoteSum.Int64, err } // UpdateProjectUsageLimit updates project usage limit. func (db *ProjectAccounting) UpdateProjectUsageLimit(ctx context.Context, projectID uuid.UUID, limit memory.Size) (err error) { defer mon.Task()(&ctx)(&err) _, err = db.db.Update_Project_By_Id(ctx, dbx.Project_Id(projectID[:]), dbx.Project_Update_Fields{ UsageLimit: dbx.Project_UsageLimit(limit.Int64()), }, ) return err } // UpdateProjectBandwidthLimit updates project bandwidth limit. func (db *ProjectAccounting) UpdateProjectBandwidthLimit(ctx context.Context, projectID uuid.UUID, limit memory.Size) (err error) { defer mon.Task()(&ctx)(&err) _, err = db.db.Update_Project_By_Id(ctx, dbx.Project_Id(projectID[:]), dbx.Project_Update_Fields{ BandwidthLimit: dbx.Project_BandwidthLimit(limit.Int64()), }, ) return err } // GetProjectStorageLimit returns project storage usage limit. func (db *ProjectAccounting) GetProjectStorageLimit(ctx context.Context, projectID uuid.UUID) (_ *int64, err error) { defer mon.Task()(&ctx)(&err) row, err := db.db.Get_Project_UsageLimit_By_Id(ctx, dbx.Project_Id(projectID[:]), ) if err != nil { return nil, err } return row.UsageLimit, nil } // GetProjectBandwidthLimit returns project bandwidth usage limit. func (db *ProjectAccounting) GetProjectBandwidthLimit(ctx context.Context, projectID uuid.UUID) (_ *int64, err error) { defer mon.Task()(&ctx)(&err) row, err := db.db.Get_Project_BandwidthLimit_By_Id(ctx, dbx.Project_Id(projectID[:]), ) if err != nil { return nil, err } return row.BandwidthLimit, nil } // GetProjectTotal retrieves project usage for a given period. func (db *ProjectAccounting) GetProjectTotal(ctx context.Context, projectID uuid.UUID, since, before time.Time) (usage *accounting.ProjectUsage, err error) { defer mon.Task()(&ctx)(&err) since = timeTruncateDown(since) bucketNames, err := db.getBuckets(ctx, projectID) if err != nil { return nil, err } storageQuery := db.db.Rebind(` SELECT bucket_storage_tallies.interval_start, bucket_storage_tallies.inline, bucket_storage_tallies.remote, bucket_storage_tallies.object_count FROM bucket_storage_tallies WHERE bucket_storage_tallies.project_id = ? AND bucket_storage_tallies.bucket_name = ? AND bucket_storage_tallies.interval_start >= ? AND bucket_storage_tallies.interval_start <= ? ORDER BY bucket_storage_tallies.interval_start DESC `) bucketsTallies := make(map[string][]*accounting.BucketStorageTally) for _, bucket := range bucketNames { storageTallies := make([]*accounting.BucketStorageTally, 0) storageTalliesRows, err := db.db.QueryContext(ctx, storageQuery, projectID[:], []byte(bucket), since, before) if err != nil { return nil, err } // generating tallies for each bucket name. for storageTalliesRows.Next() { tally := accounting.BucketStorageTally{} err = storageTalliesRows.Scan(&tally.IntervalStart, &tally.InlineBytes, &tally.RemoteBytes, &tally.ObjectCount) if err != nil { return nil, errs.Combine(err, storageTalliesRows.Close()) } tally.BucketName = bucket storageTallies = append(storageTallies, &tally) } err = errs.Combine(storageTalliesRows.Err(), storageTalliesRows.Close()) if err != nil { return nil, err } bucketsTallies[bucket] = storageTallies } totalEgress, err := db.getTotalEgress(ctx, projectID, since, before) if err != nil { return nil, err } usage = new(accounting.ProjectUsage) usage.Egress = memory.Size(totalEgress).Int64() // sum up storage and objects for _, tallies := range bucketsTallies { for i := len(tallies) - 1; i > 0; i-- { current := (tallies)[i] hours := (tallies)[i-1].IntervalStart.Sub(current.IntervalStart).Hours() usage.Storage += memory.Size(current.InlineBytes).Float64() * hours usage.Storage += memory.Size(current.RemoteBytes).Float64() * hours usage.ObjectCount += float64(current.ObjectCount) * hours } } usage.Since = since usage.Before = before return usage, nil } // getTotalEgress returns total egress (settled + inline) of each bucket_bandwidth_rollup // in selected time period, project id. // only process PieceAction_GET. func (db *ProjectAccounting) getTotalEgress(ctx context.Context, projectID uuid.UUID, since, before time.Time) (totalEgress int64, err error) { totalEgressQuery := db.db.Rebind(` SELECT COALESCE(SUM(settled) + SUM(inline), 0) FROM bucket_bandwidth_rollups WHERE project_id = ? AND interval_start >= ? AND interval_start <= ? AND action = ?; `) totalEgressRow := db.db.QueryRowContext(ctx, totalEgressQuery, projectID[:], since, before, pb.PieceAction_GET) err = totalEgressRow.Scan(&totalEgress) return totalEgress, err } // GetBucketUsageRollups retrieves summed usage rollups for every bucket of particular project for a given period. func (db *ProjectAccounting) GetBucketUsageRollups(ctx context.Context, projectID uuid.UUID, since, before time.Time) (_ []accounting.BucketUsageRollup, err error) { defer mon.Task()(&ctx)(&err) since = timeTruncateDown(since.UTC()) before = before.UTC() buckets, err := db.getBuckets(ctx, projectID) if err != nil { return nil, err } roullupsQuery := db.db.Rebind(`SELECT SUM(settled), SUM(inline), action FROM bucket_bandwidth_rollups WHERE project_id = ? AND bucket_name = ? AND interval_start >= ? AND interval_start <= ? GROUP BY action`) // TODO: should be optimized storageQuery := db.db.All_BucketStorageTally_By_ProjectId_And_BucketName_And_IntervalStart_GreaterOrEqual_And_IntervalStart_LessOrEqual_OrderBy_Desc_IntervalStart var bucketUsageRollups []accounting.BucketUsageRollup for _, bucket := range buckets { err := func() error { bucketRollup := accounting.BucketUsageRollup{ ProjectID: projectID, BucketName: []byte(bucket), Since: since, Before: before, } // get bucket_bandwidth_rollups rollupsRows, err := db.db.QueryContext(ctx, roullupsQuery, projectID[:], []byte(bucket), since, before) if err != nil { return err } defer func() { err = errs.Combine(err, rollupsRows.Close()) }() // fill egress for rollupsRows.Next() { var action pb.PieceAction var settled, inline int64 err = rollupsRows.Scan(&settled, &inline, &action) if err != nil { return err } switch action { case pb.PieceAction_GET: bucketRollup.GetEgress += memory.Size(settled + inline).GB() case pb.PieceAction_GET_AUDIT: bucketRollup.AuditEgress += memory.Size(settled + inline).GB() case pb.PieceAction_GET_REPAIR: bucketRollup.RepairEgress += memory.Size(settled + inline).GB() default: continue } } if err := rollupsRows.Err(); err != nil { return err } bucketStorageTallies, err := storageQuery(ctx, dbx.BucketStorageTally_ProjectId(projectID[:]), dbx.BucketStorageTally_BucketName([]byte(bucket)), dbx.BucketStorageTally_IntervalStart(since), dbx.BucketStorageTally_IntervalStart(before)) if err != nil { return err } // fill metadata, objects and stored data // hours calculated from previous tallies, // so we skip the most recent one for i := len(bucketStorageTallies) - 1; i > 0; i-- { current := bucketStorageTallies[i] hours := bucketStorageTallies[i-1].IntervalStart.Sub(current.IntervalStart).Hours() bucketRollup.RemoteStoredData += memory.Size(current.Remote).GB() * hours bucketRollup.InlineStoredData += memory.Size(current.Inline).GB() * hours bucketRollup.MetadataSize += memory.Size(current.MetadataSize).GB() * hours bucketRollup.RemoteSegments += float64(current.RemoteSegmentsCount) * hours bucketRollup.InlineSegments += float64(current.InlineSegmentsCount) * hours bucketRollup.ObjectCount += float64(current.ObjectCount) * hours } bucketUsageRollups = append(bucketUsageRollups, bucketRollup) return nil }() if err != nil { return nil, err } } return bucketUsageRollups, nil } // prefixIncrement returns the lexicographically lowest byte string which is // greater than origPrefix and does not have origPrefix as a prefix. If no such // byte string exists (origPrefix is empty, or origPrefix contains only 0xff // bytes), returns false for ok. // // examples: prefixIncrement([]byte("abc")) -> ([]byte("abd", true) // prefixIncrement([]byte("ab\xff\xff")) -> ([]byte("ac", true) // prefixIncrement([]byte("")) -> (nil, false) // prefixIncrement([]byte("\x00")) -> ([]byte("\x01", true) // prefixIncrement([]byte("\xff\xff\xff")) -> (nil, false) // func prefixIncrement(origPrefix []byte) (incremented []byte, ok bool) { incremented = make([]byte, len(origPrefix)) copy(incremented, origPrefix) i := len(incremented) - 1 for i >= 0 { if incremented[i] != 0xff { incremented[i]++ return incremented[:i+1], true } i-- } // there is no byte string which is greater than origPrefix and which does // not have origPrefix as a prefix. return nil, false } // prefixMatch creates a SQL expression which // will evaluate to true if and only if the value of expr starts with the value // of prefix. // // Returns also a slice of arguments that should be passed to the corresponding // db.Query* or db.Exec* to fill in parameters in the returned SQL expression. // // The returned SQL expression needs to be passed through Rebind(), as it uses // `?` markers instead of `$N`, because we don't know what N we would need to // use. func (db *ProjectAccounting) prefixMatch(expr string, prefix []byte) (string, []byte, error) { incrementedPrefix, ok := prefixIncrement(prefix) switch db.db.implementation { case dbutil.Postgres: if !ok { return fmt.Sprintf(`(%s >= ?)`, expr), nil, nil } return fmt.Sprintf(`(%s >= ? AND %s < ?)`, expr, expr), incrementedPrefix, nil case dbutil.Cockroach: if !ok { return fmt.Sprintf(`(%s >= ?:::BYTEA)`, expr), nil, nil } return fmt.Sprintf(`(%s >= ?:::BYTEA AND %s < ?:::BYTEA)`, expr, expr), incrementedPrefix, nil default: return "", nil, errs.New("invalid dbType: %v", db.db.driver) } } // GetBucketTotals retrieves bucket usage totals for period of time. func (db *ProjectAccounting) GetBucketTotals(ctx context.Context, projectID uuid.UUID, cursor accounting.BucketUsageCursor, since, before time.Time) (_ *accounting.BucketUsagePage, err error) { defer mon.Task()(&ctx)(&err) since = timeTruncateDown(since) bucketPrefix := []byte(cursor.Search) if cursor.Limit > 50 { cursor.Limit = 50 } if cursor.Page == 0 { return nil, errs.New("page can not be 0") } page := &accounting.BucketUsagePage{ Search: cursor.Search, Limit: cursor.Limit, Offset: uint64((cursor.Page - 1) * cursor.Limit), } bucketNameRange, incrPrefix, err := db.prefixMatch("name", bucketPrefix) if err != nil { return nil, err } countQuery := db.db.Rebind(`SELECT COUNT(name) FROM bucket_metainfos WHERE project_id = ? AND ` + bucketNameRange) args := []interface{}{ projectID[:], bucketPrefix, } if incrPrefix != nil { args = append(args, incrPrefix) } countRow := db.db.QueryRowContext(ctx, countQuery, args...) err = countRow.Scan(&page.TotalCount) if err != nil { return nil, err } if page.TotalCount == 0 { return page, nil } if page.Offset > page.TotalCount-1 { return nil, errs.New("page is out of range") } var buckets []string bucketsQuery := db.db.Rebind(`SELECT name FROM bucket_metainfos WHERE project_id = ? AND ` + bucketNameRange + `ORDER BY name ASC LIMIT ? OFFSET ?`) args = []interface{}{ projectID[:], bucketPrefix, } if incrPrefix != nil { args = append(args, incrPrefix) } args = append(args, page.Limit, page.Offset) bucketRows, err := db.db.QueryContext(ctx, bucketsQuery, args...) if err != nil { return nil, err } defer func() { err = errs.Combine(err, bucketRows.Close()) }() for bucketRows.Next() { var bucket string err = bucketRows.Scan(&bucket) if err != nil { return nil, err } buckets = append(buckets, bucket) } if err := bucketRows.Err(); err != nil { return nil, err } rollupsQuery := db.db.Rebind(`SELECT COALESCE(SUM(settled) + SUM(inline), 0) FROM bucket_bandwidth_rollups WHERE project_id = ? AND bucket_name = ? AND interval_start >= ? AND interval_start <= ? AND action = ?`) storageQuery := db.db.Rebind(`SELECT inline, remote, object_count FROM bucket_storage_tallies WHERE project_id = ? AND bucket_name = ? AND interval_start >= ? AND interval_start <= ? ORDER BY interval_start DESC LIMIT 1`) var bucketUsages []accounting.BucketUsage for _, bucket := range buckets { bucketUsage := accounting.BucketUsage{ ProjectID: projectID, BucketName: bucket, Since: since, Before: before, } // get bucket_bandwidth_rollups rollupRow := db.db.QueryRowContext(ctx, rollupsQuery, projectID[:], []byte(bucket), since, before, pb.PieceAction_GET) var egress int64 err = rollupRow.Scan(&egress) if err != nil { if !errors.Is(err, sql.ErrNoRows) { return nil, err } } bucketUsage.Egress = memory.Size(egress).GB() storageRow := db.db.QueryRowContext(ctx, storageQuery, projectID[:], []byte(bucket), since, before) var inline, remote, objectCount int64 err = storageRow.Scan(&inline, &remote, &objectCount) if err != nil { if !errors.Is(err, sql.ErrNoRows) { return nil, err } } // fill storage and object count bucketUsage.Storage = memory.Size(inline + remote).GB() bucketUsage.ObjectCount = objectCount bucketUsages = append(bucketUsages, bucketUsage) } page.PageCount = uint(page.TotalCount / uint64(cursor.Limit)) if page.TotalCount%uint64(cursor.Limit) != 0 { page.PageCount++ } page.BucketUsages = bucketUsages page.CurrentPage = cursor.Page return page, nil } // getBuckets list all bucket of certain project. func (db *ProjectAccounting) getBuckets(ctx context.Context, projectID uuid.UUID) (_ []string, err error) { defer mon.Task()(&ctx)(&err) bucketsQuery := db.db.Rebind(`SELECT DISTINCT bucket_name FROM bucket_storage_tallies WHERE project_id = ?`) bucketRows, err := db.db.QueryContext(ctx, bucketsQuery, projectID[:]) if err != nil { return nil, err } defer func() { err = errs.Combine(err, bucketRows.Close()) }() var buckets []string for bucketRows.Next() { var bucket string err = bucketRows.Scan(&bucket) if err != nil { return nil, err } buckets = append(buckets, bucket) } return buckets, bucketRows.Err() } // timeTruncateDown truncates down to the hour before to be in sync with orders endpoint. func timeTruncateDown(t time.Time) time.Time { return time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), 0, 0, 0, t.Location()) } // GetProjectLimits returns current project limit for both storage and bandwidth. func (db *ProjectAccounting) GetProjectLimits(ctx context.Context, projectID uuid.UUID) (_ accounting.ProjectLimits, err error) { defer mon.Task()(&ctx)(&err) row, err := db.db.Get_Project_BandwidthLimit_Project_UsageLimit_By_Id(ctx, dbx.Project_Id(projectID[:]), ) if err != nil { return accounting.ProjectLimits{}, err } return accounting.ProjectLimits{ Usage: row.UsageLimit, Bandwidth: row.BandwidthLimit, }, nil }