2021-02-18 12:54:09 +00:00
|
|
|
// Copyright (C) 2021 Storj Labs, Inc.
|
|
|
|
// See LICENSE for copying information.
|
|
|
|
|
|
|
|
package metabase
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
2022-10-06 12:27:08 +01:00
|
|
|
"math"
|
2021-02-18 12:54:09 +00:00
|
|
|
"time"
|
|
|
|
|
|
|
|
"github.com/zeebo/errs"
|
|
|
|
|
2021-03-02 12:20:02 +00:00
|
|
|
"storj.io/common/storj"
|
2021-02-18 12:54:09 +00:00
|
|
|
"storj.io/common/uuid"
|
2021-04-23 10:52:40 +01:00
|
|
|
"storj.io/private/tagsql"
|
2021-02-18 12:54:09 +00:00
|
|
|
)
|
|
|
|
|
2021-09-09 14:15:19 +01:00
|
|
|
// loopIteratorBatchSizeLimit caps the number of rows fetched per query by the
// loop iterators; user-supplied batch sizes are clamped via Ensure.
const loopIteratorBatchSizeLimit = intLimitRange(5000)
|
2021-02-19 13:02:52 +00:00
|
|
|
|
2021-03-01 14:29:03 +00:00
|
|
|
// IterateLoopObjects contains arguments necessary for listing objects in metabase.
type IterateLoopObjects struct {
	// BatchSize is the maximum number of objects fetched per query.
	// Zero or out-of-range values are clamped by loopIteratorBatchSizeLimit.
	BatchSize int

	// AsOfSystemTime is passed to db.asOfTime when building the iteration
	// query, allowing reads at a past timestamp.
	AsOfSystemTime time.Time

	// AsOfSystemInterval is the relative-interval variant of AsOfSystemTime,
	// also passed to db.asOfTime.
	AsOfSystemInterval time.Duration
}
|
|
|
|
|
|
|
|
// Verify verifies get object request fields.
|
2021-03-01 14:29:03 +00:00
|
|
|
func (opts *IterateLoopObjects) Verify() error {
|
2021-02-18 12:54:09 +00:00
|
|
|
if opts.BatchSize < 0 {
|
|
|
|
return ErrInvalidRequest.New("BatchSize is negative")
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2021-03-03 11:20:41 +00:00
|
|
|
// LoopObjectsIterator iterates over a sequence of LoopObjectEntry items.
type LoopObjectsIterator interface {
	// Next copies the next entry into item and returns true, or returns
	// false when iteration is finished or a failure occurred.
	Next(ctx context.Context, item *LoopObjectEntry) bool
}
|
|
|
|
|
|
|
|
// LoopObjectEntry contains information about object needed by metainfo loop.
//
// The trailing comment on each field names the loop observer(s) that consume it.
type LoopObjectEntry struct {
	ObjectStream                       // metrics, repair, tally
	Status                ObjectStatus // verify
	CreatedAt             time.Time    // temp used by metabase-createdat-migration
	ExpiresAt             *time.Time   // tally
	SegmentCount          int32        // metrics
	TotalEncryptedSize    int64        // tally
	EncryptedMetadataSize int          // tally
}
|
|
|
|
|
2021-06-30 10:04:46 +01:00
|
|
|
// Expired checks if object is expired relative to now.
|
|
|
|
func (o LoopObjectEntry) Expired(now time.Time) bool {
|
|
|
|
return o.ExpiresAt != nil && o.ExpiresAt.Before(now)
|
|
|
|
}
|
|
|
|
|
2021-03-01 14:29:03 +00:00
|
|
|
// IterateLoopObjects iterates through all objects in metabase.
//
// It fetches objects in batches, calling fn with an iterator that pages
// through the whole objects table ordered by (project_id, bucket_name,
// object_key, version). Errors accumulated during iteration (scan/query
// failures, row errors, close errors) are combined into the returned err.
func (db *DB) IterateLoopObjects(ctx context.Context, opts IterateLoopObjects, fn func(context.Context, LoopObjectsIterator) error) (err error) {
	defer mon.Task()(&ctx)(&err)

	if err := opts.Verify(); err != nil {
		return err
	}

	// Start from the zero cursor, i.e. the beginning of the table.
	it := &loopIterator{
		db: db,

		batchSize: opts.BatchSize,

		curIndex:           0,
		cursor:             loopIterateCursor{},
		asOfSystemTime:     opts.AsOfSystemTime,
		asOfSystemInterval: opts.AsOfSystemInterval,
	}

	// Clamp batch size into the allowed range.
	loopIteratorBatchSizeLimit.Ensure(&it.batchSize)

	// Run the first query eagerly so query errors surface before fn is called.
	it.curRows, err = it.doNextQuery(ctx)
	if err != nil {
		return err
	}
	defer func() {
		// Surface any deferred row error, any failure recorded by the
		// iterator during Next, and the final Close error.
		if rowsErr := it.curRows.Err(); rowsErr != nil {
			err = errs.Combine(err, rowsErr)
		}
		err = errs.Combine(err, it.failErr, it.curRows.Close())
	}()

	return fn(ctx, it)
}
|
|
|
|
|
2021-03-01 14:29:03 +00:00
|
|
|
// loopIterator enables iteration of all objects in metabase.
type loopIterator struct {
	db *DB

	// batchSize is the per-query row limit (already clamped).
	batchSize int
	// asOfSystemTime/asOfSystemInterval are forwarded to db.asOfTime
	// when building each batch query.
	asOfSystemTime     time.Time
	asOfSystemInterval time.Duration

	// curIndex counts rows consumed from the current batch; a batch that
	// produced fewer than batchSize rows signals the end of the table.
	curIndex int
	curRows  tagsql.Rows
	// cursor is the keyset-pagination position: the last returned
	// (project, bucket, key, version).
	cursor loopIterateCursor

	// failErr is set when either scan or next query fails during iteration.
	failErr error
}
|
|
|
|
|
2021-03-01 14:29:03 +00:00
|
|
|
// loopIterateCursor is the keyset-pagination cursor for loopIterator;
// the next batch starts strictly after this position.
type loopIterateCursor struct {
	ProjectID  uuid.UUID
	BucketName string
	ObjectKey  ObjectKey
	Version    Version
}
|
|
|
|
|
|
|
|
// Next returns true if there was another item and copy it in item.
func (it *loopIterator) Next(ctx context.Context, item *LoopObjectEntry) bool {
	next := it.curRows.Next()
	if !next {
		// A short batch means the previous query returned everything;
		// there is nothing more to fetch.
		if it.curIndex < it.batchSize {
			return false
		}

		// Don't issue another query after a row error; the deferred
		// handler in IterateLoopObjects reports curRows.Err().
		if it.curRows.Err() != nil {
			return false
		}

		rows, err := it.doNextQuery(ctx)
		if err != nil {
			it.failErr = errs.Combine(it.failErr, err)
			return false
		}

		// Close the exhausted batch; on failure also release the new one.
		if closeErr := it.curRows.Close(); closeErr != nil {
			it.failErr = errs.Combine(it.failErr, closeErr, rows.Close())
			return false
		}

		it.curRows = rows
		it.curIndex = 0
		if !it.curRows.Next() {
			return false
		}
	}

	err := it.scanItem(item)
	if err != nil {
		it.failErr = errs.Combine(it.failErr, err)
		return false
	}

	it.curIndex++
	// Advance the keyset cursor so the next batch starts after this item.
	it.cursor.ProjectID = item.ProjectID
	it.cursor.BucketName = item.BucketName
	it.cursor.ObjectKey = item.ObjectKey
	it.cursor.Version = item.Version

	return true
}
|
|
|
|
|
2021-03-01 14:29:03 +00:00
|
|
|
// doNextQuery fetches the next batch of objects strictly after the current
// cursor position, ordered by the (project_id, bucket_name, object_key,
// version) key and limited to batchSize rows.
func (it *loopIterator) doNextQuery(ctx context.Context) (_ tagsql.Rows, err error) {
	defer mon.Task()(&ctx)(&err)

	return it.db.db.QueryContext(ctx, `
		SELECT
			project_id, bucket_name,
			object_key, stream_id, version,
			status,
			created_at, expires_at,
			segment_count, total_encrypted_size,
			LENGTH(COALESCE(encrypted_metadata,''))
		FROM objects
		`+it.db.asOfTime(it.asOfSystemTime, it.asOfSystemInterval)+`
		WHERE (project_id, bucket_name, object_key, version) > ($1, $2, $3, $4)
		ORDER BY project_id ASC, bucket_name ASC, object_key ASC, version ASC
		LIMIT $5
		`, it.cursor.ProjectID, []byte(it.cursor.BucketName),
		[]byte(it.cursor.ObjectKey), int(it.cursor.Version),
		it.batchSize,
	)
}
|
|
|
|
|
2021-03-01 14:29:03 +00:00
|
|
|
// scanItem scans doNextQuery results into LoopObjectEntry.
// The destination order must match the SELECT column order in doNextQuery.
func (it *loopIterator) scanItem(item *LoopObjectEntry) error {
	return it.curRows.Scan(
		&item.ProjectID, &item.BucketName,
		&item.ObjectKey, &item.StreamID, &item.Version,
		&item.Status,
		&item.CreatedAt, &item.ExpiresAt,
		&item.SegmentCount, &item.TotalEncryptedSize,
		&item.EncryptedMetadataSize,
	)
}
|
2021-03-02 12:20:02 +00:00
|
|
|
|
2021-03-01 15:27:04 +00:00
|
|
|
// SegmentIterator returns the next segment.
// It copies the segment into segment and reports whether one was available.
type SegmentIterator func(ctx context.Context, segment *LoopSegmentEntry) bool
|
2021-03-01 15:27:04 +00:00
|
|
|
|
2021-03-02 12:20:02 +00:00
|
|
|
// LoopSegmentEntry contains information about segment metadata needed by metainfo loop.
//
// The trailing comment on a field names the loop observer that consumes it.
type LoopSegmentEntry struct {
	StreamID      uuid.UUID
	Position      SegmentPosition
	CreatedAt     time.Time // non-nillable
	ExpiresAt     *time.Time
	RepairedAt    *time.Time // repair
	RootPieceID   storj.PieceID
	EncryptedSize int32 // size of the whole segment (not a piece)
	PlainOffset   int64 // verify
	PlainSize     int32 // verify
	// AliasPieces is the compact DB representation; Pieces is resolved
	// from it via the alias cache in scanItem.
	AliasPieces AliasPieces
	Redundancy  storj.RedundancyScheme
	Pieces      Pieces
	Placement   storj.PlacementConstraint
}
|
|
|
|
|
|
|
|
// Inline returns true if segment is inline.
|
|
|
|
func (s LoopSegmentEntry) Inline() bool {
|
|
|
|
return s.Redundancy.IsZero() && len(s.Pieces) == 0
|
|
|
|
}
|
|
|
|
|
2021-05-14 09:57:14 +01:00
|
|
|
// LoopSegmentsIterator iterates over a sequence of LoopSegmentEntry items.
type LoopSegmentsIterator interface {
	// Next copies the next entry into item and returns true, or returns
	// false when iteration is finished or a failure occurred.
	Next(ctx context.Context, item *LoopSegmentEntry) bool
}
|
|
|
|
|
|
|
|
// IterateLoopSegments contains arguments necessary for listing segments in metabase.
type IterateLoopSegments struct {
	// BatchSize is the maximum number of segments fetched per query.
	// Zero or out-of-range values are clamped by loopIteratorBatchSizeLimit.
	BatchSize int
	// AsOfSystemTime and AsOfSystemInterval are forwarded to db.asOfTime
	// when building each batch query.
	AsOfSystemTime     time.Time
	AsOfSystemInterval time.Duration
	// StartStreamID and EndStreamID bound the iterated stream-ID range.
	// StartStreamID is exclusive; EndStreamID is inclusive, and a zero
	// EndStreamID means iterate to the end (see IterateLoopSegments).
	StartStreamID uuid.UUID
	EndStreamID   uuid.UUID
}
|
|
|
|
|
|
|
|
// Verify verifies segments request fields.
|
|
|
|
func (opts *IterateLoopSegments) Verify() error {
|
|
|
|
if opts.BatchSize < 0 {
|
|
|
|
return ErrInvalidRequest.New("BatchSize is negative")
|
|
|
|
}
|
2022-10-06 12:27:08 +01:00
|
|
|
if !opts.EndStreamID.IsZero() {
|
|
|
|
if opts.EndStreamID.Less(opts.StartStreamID) {
|
|
|
|
return ErrInvalidRequest.New("EndStreamID is smaller than StartStreamID")
|
|
|
|
}
|
|
|
|
if opts.StartStreamID == opts.EndStreamID {
|
|
|
|
return ErrInvalidRequest.New("StartStreamID and EndStreamID must be different")
|
|
|
|
}
|
|
|
|
}
|
2021-05-14 09:57:14 +01:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// IterateLoopSegments iterates through all segments in metabase.
//
// It fetches segments in batches ordered by (stream_id, position), bounded
// by the optional [StartStreamID, EndStreamID] range in opts. Errors
// accumulated during iteration are combined into the returned err.
func (db *DB) IterateLoopSegments(ctx context.Context, opts IterateLoopSegments, fn func(context.Context, LoopSegmentsIterator) error) (err error) {
	defer mon.Task()(&ctx)(&err)

	if err := opts.Verify(); err != nil {
		return err
	}

	it := &loopSegmentIterator{
		db: db,

		asOfSystemTime:     opts.AsOfSystemTime,
		asOfSystemInterval: opts.AsOfSystemInterval,
		batchSize:          opts.BatchSize,

		curIndex: 0,
		cursor: loopSegmentIteratorCursor{
			StartStreamID: opts.StartStreamID,
			EndStreamID:   opts.EndStreamID,
		},
	}

	if !opts.StartStreamID.IsZero() {
		// Set the position past any real value so iteration starts strictly
		// after StartStreamID (the cursor comparison is exclusive).
		// uses MaxInt32 instead of MaxUint32 because position is an int8 in db.
		it.cursor.StartPosition = SegmentPosition{math.MaxInt32, math.MaxInt32}
	}
	if it.cursor.EndStreamID.IsZero() {
		// Zero EndStreamID means unbounded: iterate to the maximum UUID.
		it.cursor.EndStreamID = uuid.Max()
	}

	// Clamp batch size into the allowed range.
	loopIteratorBatchSizeLimit.Ensure(&it.batchSize)

	// Run the first query eagerly so query errors surface before fn is called.
	it.curRows, err = it.doNextQuery(ctx)
	if err != nil {
		return err
	}
	defer func() {
		// Surface any deferred row error, any failure recorded by the
		// iterator during Next, and the final Close error.
		if rowsErr := it.curRows.Err(); rowsErr != nil {
			err = errs.Combine(err, rowsErr)
		}
		err = errs.Combine(err, it.failErr, it.curRows.Close())
	}()

	return fn(ctx, it)
}
|
|
|
|
|
|
|
|
// loopSegmentIterator enables iteration of all segments in metabase.
type loopSegmentIterator struct {
	db *DB

	// batchSize is the per-query row limit (already clamped).
	batchSize int
	// asOfSystemTime/asOfSystemInterval are forwarded to db.asOfTime
	// when building each batch query.
	asOfSystemTime     time.Time
	asOfSystemInterval time.Duration

	// curIndex counts rows consumed from the current batch; a batch that
	// produced fewer than batchSize rows signals the end of the range.
	curIndex int
	curRows  tagsql.Rows
	// cursor is the keyset-pagination position plus the inclusive upper bound.
	cursor loopSegmentIteratorCursor

	// failErr is set when either scan or next query fails during iteration.
	failErr error
}
|
|
|
|
|
|
|
|
// loopSegmentIteratorCursor is the keyset-pagination cursor for
// loopSegmentIterator. The next batch starts strictly after
// (StartStreamID, StartPosition) and never exceeds EndStreamID (inclusive).
type loopSegmentIteratorCursor struct {
	StartStreamID uuid.UUID
	StartPosition SegmentPosition
	EndStreamID   uuid.UUID
}
|
|
|
|
|
|
|
|
// Next returns true if there was another item and copy it in item.
|
|
|
|
func (it *loopSegmentIterator) Next(ctx context.Context, item *LoopSegmentEntry) bool {
|
|
|
|
next := it.curRows.Next()
|
|
|
|
if !next {
|
|
|
|
if it.curIndex < it.batchSize {
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
|
|
|
|
if it.curRows.Err() != nil {
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
|
|
|
|
rows, err := it.doNextQuery(ctx)
|
|
|
|
if err != nil {
|
2021-06-04 18:32:25 +01:00
|
|
|
it.failErr = errs.Combine(it.failErr, err)
|
2021-05-14 09:57:14 +01:00
|
|
|
return false
|
|
|
|
}
|
|
|
|
|
2021-06-04 18:32:25 +01:00
|
|
|
if failErr := it.curRows.Close(); failErr != nil {
|
|
|
|
it.failErr = errs.Combine(it.failErr, failErr, rows.Close())
|
2021-05-14 09:57:14 +01:00
|
|
|
return false
|
|
|
|
}
|
|
|
|
|
|
|
|
it.curRows = rows
|
|
|
|
it.curIndex = 0
|
|
|
|
if !it.curRows.Next() {
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
err := it.scanItem(ctx, item)
|
|
|
|
if err != nil {
|
2021-06-04 18:32:25 +01:00
|
|
|
it.failErr = errs.Combine(it.failErr, err)
|
2021-05-14 09:57:14 +01:00
|
|
|
return false
|
|
|
|
}
|
|
|
|
|
|
|
|
it.curIndex++
|
2022-10-06 12:27:08 +01:00
|
|
|
it.cursor.StartStreamID = item.StreamID
|
|
|
|
it.cursor.StartPosition = item.Position
|
2021-05-14 09:57:14 +01:00
|
|
|
|
|
|
|
return true
|
|
|
|
}
|
|
|
|
|
|
|
|
// doNextQuery fetches the next batch of segments strictly after the cursor's
// (StartStreamID, StartPosition), bounded above by EndStreamID (inclusive)
// and limited to batchSize rows.
func (it *loopSegmentIterator) doNextQuery(ctx context.Context) (_ tagsql.Rows, err error) {
	defer mon.Task()(&ctx)(&err)

	return it.db.db.QueryContext(ctx, `
		SELECT
			stream_id, position,
			created_at, expires_at, repaired_at,
			root_piece_id,
			encrypted_size,
			plain_offset, plain_size,
			redundancy,
			remote_alias_pieces,
			placement
		FROM segments
		`+it.db.asOfTime(it.asOfSystemTime, it.asOfSystemInterval)+`
		WHERE
			(stream_id, position) > ($1, $2) AND stream_id <= $4
		ORDER BY (stream_id, position) ASC
		LIMIT $3
		`, it.cursor.StartStreamID, it.cursor.StartPosition.Encode(),
		it.batchSize, it.cursor.EndStreamID,
	)
}
|
|
|
|
|
|
|
|
// scanItem scans doNextQuery results into LoopSegmentEntry.
// The destination order must match the SELECT column order in doNextQuery;
// after scanning, the stored alias pieces are resolved into concrete Pieces
// through the node-alias cache.
func (it *loopSegmentIterator) scanItem(ctx context.Context, item *LoopSegmentEntry) error {
	err := it.curRows.Scan(
		&item.StreamID, &item.Position,
		&item.CreatedAt, &item.ExpiresAt, &item.RepairedAt,
		&item.RootPieceID,
		&item.EncryptedSize,
		&item.PlainOffset, &item.PlainSize,
		redundancyScheme{&item.Redundancy},
		&item.AliasPieces,
		&item.Placement,
	)
	if err != nil {
		return Error.New("failed to scan segments: %w", err)
	}

	item.Pieces, err = it.db.aliasCache.ConvertAliasesToPieces(ctx, item.AliasPieces)
	if err != nil {
		return Error.New("failed to convert aliases to pieces: %w", err)
	}

	return nil
}
|
2022-10-06 12:27:08 +01:00
|
|
|
|
2022-10-05 11:22:23 +01:00
|
|
|
// BucketTally contains information about aggregate data stored in a bucket.
type BucketTally struct {
	BucketLocation

	// ObjectCount counts all objects in the bucket; PendingObjectCount
	// counts the subset matching status = 1 in CollectBucketTallies
	// (presumably the pending status — confirm against status constants).
	ObjectCount        int64
	PendingObjectCount int64

	// TotalSegments and TotalBytes are sums of segment_count and
	// total_encrypted_size over the bucket's objects.
	TotalSegments int64
	TotalBytes    int64

	// MetadataSize is the summed length of encrypted_metadata.
	MetadataSize int64
}
|
|
|
|
|
|
|
|
// CollectBucketTallies contains arguments necessary for looping through objects in metabase.
type CollectBucketTallies struct {
	// From and To bound the (project_id, bucket_name) range, inclusive.
	From BucketLocation
	To   BucketLocation
	// AsOfSystemTime and AsOfSystemInterval are forwarded to db.asOfTime
	// when building the tally query.
	AsOfSystemTime     time.Time
	AsOfSystemInterval time.Duration
	// Now is the reference time for filtering out expired objects;
	// zero means time.Now().
	Now time.Time
}
|
|
|
|
|
|
|
|
// Verify verifies CollectBucketTallies request fields.
|
|
|
|
func (opts *CollectBucketTallies) Verify() error {
|
|
|
|
if opts.To.ProjectID.Less(opts.From.ProjectID) {
|
|
|
|
return ErrInvalidRequest.New("project ID To is before project ID From")
|
|
|
|
}
|
|
|
|
if opts.To.ProjectID == opts.From.ProjectID && opts.To.BucketName < opts.From.BucketName {
|
|
|
|
return ErrInvalidRequest.New("bucket name To is before bucket name From")
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// CollectBucketTallies collect limited bucket tallies from given bucket locations.
//
// It aggregates per-bucket object counts, segment counts, encrypted sizes and
// metadata sizes for all non-expired objects in the inclusive range
// [opts.From, opts.To], ordered by (project_id, bucket_name).
func (db *DB) CollectBucketTallies(ctx context.Context, opts CollectBucketTallies) (result []BucketTally, err error) {
	defer mon.Task()(&ctx)(&err)

	if err := opts.Verify(); err != nil {
		return []BucketTally{}, err
	}

	// Default the expiration reference time to the current time.
	if opts.Now.IsZero() {
		opts.Now = time.Now()
	}

	err = withRows(db.db.QueryContext(ctx, `
		SELECT
			project_id, bucket_name,
			SUM(total_encrypted_size), SUM(segment_count), COALESCE(SUM(length(encrypted_metadata)), 0),
			count(*), count(*) FILTER (WHERE status = 1)
		FROM objects
		`+db.asOfTime(opts.AsOfSystemTime, opts.AsOfSystemInterval)+`
		WHERE (project_id, bucket_name) BETWEEN ($1, $2) AND ($3, $4) AND
		(expires_at IS NULL OR expires_at > $5)
		GROUP BY (project_id, bucket_name)
		ORDER BY (project_id, bucket_name) ASC
		`, opts.From.ProjectID, []byte(opts.From.BucketName), opts.To.ProjectID, []byte(opts.To.BucketName), opts.Now))(func(rows tagsql.Rows) error {
		for rows.Next() {
			var bucketTally BucketTally

			// Scan order matches the SELECT column order above;
			// the FILTERed count feeds PendingObjectCount.
			if err = rows.Scan(
				&bucketTally.ProjectID, &bucketTally.BucketName,
				&bucketTally.TotalBytes, &bucketTally.TotalSegments,
				&bucketTally.MetadataSize, &bucketTally.ObjectCount,
				&bucketTally.PendingObjectCount,
			); err != nil {
				return Error.New("unable to query bucket tally: %w", err)
			}

			result = append(result, bucketTally)
		}

		return nil
	})
	if err != nil {
		return []BucketTally{}, err
	}

	return result, nil
}
|