storj/satellite/metabase/list_verify_segments.go

222 lines
5.9 KiB
Go
Raw Normal View History

// Copyright (C) 2022 Storj Labs, Inc.
// See LICENSE for copying information.
package metabase
import (
"context"
"time"
"storj.io/common/storj"
"storj.io/common/uuid"
"storj.io/private/dbutil/pgutil"
"storj.io/private/tagsql"
)
// ListVerifyLimit is the maximum number of items the client can request for listing.
const ListVerifyLimit = intLimitRange(100000)
// ListVerifySegments contains arguments necessary for listing stream segments.
type ListVerifySegments struct {
CursorStreamID uuid.UUID
CursorPosition SegmentPosition
StreamIDs []uuid.UUID
Limit int
AsOfSystemTime time.Time
AsOfSystemInterval time.Duration
}
// ListVerifySegmentsResult is the result of ListVerifySegments.
type ListVerifySegmentsResult struct {
Segments []VerifySegment
}
// VerifySegment result of listing segments for verifying remote segments.
type VerifySegment struct {
StreamID uuid.UUID
Position SegmentPosition
CreatedAt time.Time
RepairedAt *time.Time
RootPieceID storj.PieceID
Redundancy storj.RedundancyScheme
AliasPieces AliasPieces
}
func (opts *ListVerifySegments) getQueryAndParameters(asof string) (string, []interface{}) {
if len(opts.StreamIDs) == 0 {
return `
SELECT
stream_id, position,
created_at, repaired_at,
root_piece_id, redundancy,
remote_alias_pieces
FROM segments
` + asof + `
WHERE
(stream_id, position) > ($1, $2) AND
inline_data IS NULL AND
remote_alias_pieces IS NOT NULL
ORDER BY stream_id ASC, position ASC
LIMIT $3
`, []interface{}{
opts.CursorStreamID,
opts.CursorPosition,
opts.Limit,
}
}
return `
SELECT
segments.stream_id, segments.position,
segments.created_at, segments.repaired_at,
segments.root_piece_id, segments.redundancy,
segments.remote_alias_pieces
FROM segments
` + asof + `
WHERE
stream_id = ANY($1) AND
(segments.stream_id, segments.position) > ($2, $3) AND
segments.inline_data IS NULL AND
segments.remote_alias_pieces IS NOT NULL
ORDER BY segments.stream_id ASC, segments.position ASC
LIMIT $4
`, []interface{}{
pgutil.UUIDArray(opts.StreamIDs),
opts.CursorStreamID,
opts.CursorPosition,
opts.Limit,
}
}
// ListVerifySegments lists specified stream segments.
func (db *DB) ListVerifySegments(ctx context.Context, opts ListVerifySegments) (result ListVerifySegmentsResult, err error) {
defer mon.Task()(&ctx)(&err)
if opts.Limit <= 0 {
return ListVerifySegmentsResult{}, ErrInvalidRequest.New("invalid limit: %d", opts.Limit)
}
ListVerifyLimit.Ensure(&opts.Limit)
result.Segments = make([]VerifySegment, 0, opts.Limit)
asOfString := db.asOfTime(opts.AsOfSystemTime, opts.AsOfSystemInterval)
query, parameters := opts.getQueryAndParameters(asOfString)
err = withRows(db.db.QueryContext(ctx, query, parameters...))(func(rows tagsql.Rows) error {
for rows.Next() {
var seg VerifySegment
err := rows.Scan(
&seg.StreamID,
&seg.Position,
&seg.CreatedAt,
&seg.RepairedAt,
&seg.RootPieceID,
redundancyScheme{&seg.Redundancy},
&seg.AliasPieces,
)
if err != nil {
return Error.Wrap(err)
}
result.Segments = append(result.Segments, seg)
}
return nil
})
return result, Error.Wrap(err)
}
// ListVerifyBucketList represents a list of buckets.
type ListVerifyBucketList struct {
Buckets []BucketLocation
}
// Add adds a (projectID, bucketName) to the list of buckets to be checked.
func (list *ListVerifyBucketList) Add(projectID uuid.UUID, bucketName string) {
list.Buckets = append(list.Buckets, BucketLocation{
ProjectID: projectID,
BucketName: bucketName,
})
}
// ListBucketsStreamIDsResult is the result of listing segments of a list of buckets.
type ListBucketsStreamIDsResult struct {
StreamIDs []uuid.UUID
Counts []int
LastBucket BucketLocation
}
func (list *ListBucketsStreamIDsResult) addStreamID(streamID uuid.UUID, count int) {
list.StreamIDs = append(list.StreamIDs, streamID)
list.Counts = append(list.Counts, count)
}
// ListBucketsStreamIDs contains arguments necessary for listing stream segments from buckets.
type ListBucketsStreamIDs struct {
BucketList ListVerifyBucketList
CursorBucket BucketLocation
CursorStreamID uuid.UUID
Limit int
AsOfSystemTime time.Time
AsOfSystemInterval time.Duration
}
// ListBucketsStreamIDs lists the streamIDs of a list of buckets.
func (db *DB) ListBucketsStreamIDs(ctx context.Context, opts ListBucketsStreamIDs) (ListBucketsStreamIDsResult, error) {
if opts.Limit <= 0 {
return ListBucketsStreamIDsResult{}, ErrInvalidRequest.New("invalid limit: %d", opts.Limit)
}
ListVerifyLimit.Ensure(&opts.Limit)
result := ListBucketsStreamIDsResult{}
bucketNamesBytes := [][]byte{}
projectIDs := []uuid.UUID{}
for _, bucket := range opts.BucketList.Buckets {
bucketNamesBytes = append(bucketNamesBytes, []byte(bucket.BucketName))
projectIDs = append(projectIDs, bucket.ProjectID)
}
// get the list of stream_ids and segment counts from the objects table
err := withRows(db.db.QueryContext(ctx, `
SELECT DISTINCT project_id, bucket_name, stream_id, segment_count
FROM objects
`+db.asOfTime(opts.AsOfSystemTime, opts.AsOfSystemInterval)+`
WHERE
(project_id, bucket_name, stream_id) > ($4::BYTEA, $5::BYTEA, $6::BYTEA) AND
(project_id, bucket_name) IN (SELECT UNNEST($1::BYTEA[]),UNNEST($2::BYTEA[]))
ORDER BY project_id, bucket_name, stream_id ASC
LIMIT $3
`, pgutil.UUIDArray(projectIDs), pgutil.ByteaArray(bucketNamesBytes),
opts.Limit,
opts.CursorBucket.ProjectID, opts.CursorBucket.BucketName, opts.CursorStreamID,
))(func(rows tagsql.Rows) error {
for rows.Next() {
var streamID uuid.UUID
var count int
err := rows.Scan(
&result.LastBucket.ProjectID,
&result.LastBucket.BucketName,
&streamID,
&count,
)
if err != nil {
return Error.Wrap(err)
}
result.addStreamID(streamID, count)
}
return nil
})
if err != nil {
return ListBucketsStreamIDsResult{}, err
}
return result, nil
}