satellite/metabase: add logic for verifying segments in given buckets

To make it possible to verify the segments of a given list of buckets, this change:
- adds the method ListBucketsStreamIDs, which lists all stream IDs belonging to a list of buckets. The buckets are passed as a ListVerifyBucketList, which is filled using Add(projectID, bucketName).
- allows specifying a list of StreamIDs to check in ListVerifySegments.

Fixes https://github.com/storj/storj-private/issues/101

Change-Id: I72a48a0873a3056ac54ad56c0e9242364b2ae918
This commit is contained in:
Fadila Khadar 2022-12-02 15:28:12 +01:00 committed by Storj Robot
parent d6bcde4672
commit 7fd23d6864
2 changed files with 222 additions and 14 deletions

View File

@ -9,6 +9,7 @@ import (
"storj.io/common/storj"
"storj.io/common/uuid"
"storj.io/private/dbutil/pgutil"
"storj.io/private/tagsql"
)
@ -19,6 +20,8 @@ const ListVerifyLimit = intLimitRange(100000)
type ListVerifySegments struct {
CursorStreamID uuid.UUID
CursorPosition SegmentPosition
StreamIDs []uuid.UUID
Limit int
AsOfSystemTime time.Time
@ -44,31 +47,66 @@ type VerifySegment struct {
AliasPieces AliasPieces
}
// ListVerifySegments lists specified stream segments.
func (db *DB) ListVerifySegments(ctx context.Context, opts ListVerifySegments) (result ListVerifySegmentsResult, err error) {
defer mon.Task()(&ctx)(&err)
// getQueryAndParameters returns the SQL text used by ListVerifySegments
// together with the positional arguments that go with it. asof is
// concatenated into the query right after the FROM clause.
//
// When opts.StreamIDs is empty, the query selects every segment located after
// the (CursorStreamID, CursorPosition) cursor that has no inline data and has
// remote alias pieces; the arguments are cursor stream ID, cursor position
// and limit. Otherwise the same selection is additionally restricted to
// stream_id = ANY(opts.StreamIDs), and the stream ID array becomes the first
// argument. In both cases rows are ordered by (stream_id, position) and
// limited to opts.Limit.
func (opts *ListVerifySegments) getQueryAndParameters(asof string) (string, []interface{}) {
if opts.Limit <= 0 {
return ListVerifySegmentsResult{}, ErrInvalidRequest.New("Invalid limit: %d", opts.Limit)
}
ListVerifyLimit.Ensure(&opts.Limit)
result.Segments = make([]VerifySegment, 0, opts.Limit)
err = withRows(db.db.QueryContext(ctx, `
if len(opts.StreamIDs) == 0 {
return `
SELECT
stream_id, position,
created_at, repaired_at,
root_piece_id, redundancy,
remote_alias_pieces
FROM segments
`+db.asOfTime(opts.AsOfSystemTime, opts.AsOfSystemInterval)+`
` + asof + `
WHERE
(stream_id, position) > ($1, $2) AND
inline_data IS NULL AND
remote_alias_pieces IS NOT NULL
ORDER BY stream_id ASC, position ASC
LIMIT $3
`, opts.CursorStreamID, opts.CursorPosition, opts.Limit))(func(rows tagsql.Rows) error {
`, []interface{}{
opts.CursorStreamID,
opts.CursorPosition,
opts.Limit,
}
}
return `
SELECT
segments.stream_id, segments.position,
segments.created_at, segments.repaired_at,
segments.root_piece_id, segments.redundancy,
segments.remote_alias_pieces
FROM segments
` + asof + `
WHERE
stream_id = ANY($1) AND
(segments.stream_id, segments.position) > ($2, $3) AND
segments.inline_data IS NULL AND
segments.remote_alias_pieces IS NOT NULL
ORDER BY segments.stream_id ASC, segments.position ASC
LIMIT $4
`, []interface{}{
pgutil.UUIDArray(opts.StreamIDs),
opts.CursorStreamID,
opts.CursorPosition,
opts.Limit,
}
}
// ListVerifySegments lists specified stream segments.
func (db *DB) ListVerifySegments(ctx context.Context, opts ListVerifySegments) (result ListVerifySegmentsResult, err error) {
defer mon.Task()(&ctx)(&err)
if opts.Limit <= 0 {
return ListVerifySegmentsResult{}, ErrInvalidRequest.New("invalid limit: %d", opts.Limit)
}
ListVerifyLimit.Ensure(&opts.Limit)
result.Segments = make([]VerifySegment, 0, opts.Limit)
asOfString := db.asOfTime(opts.AsOfSystemTime, opts.AsOfSystemInterval)
query, parameters := opts.getQueryAndParameters(asOfString)
err = withRows(db.db.QueryContext(ctx, query, parameters...))(func(rows tagsql.Rows) error {
for rows.Next() {
var seg VerifySegment
err := rows.Scan(
@ -93,3 +131,91 @@ func (db *DB) ListVerifySegments(ctx context.Context, opts ListVerifySegments) (
return result, Error.Wrap(err)
}
// ListVerifyBucketList represents a list of buckets.
// Entries are appended with Add; ListBucketsStreamIDs reads Buckets to build
// the (project_id, bucket_name) pairs its query is restricted to.
type ListVerifyBucketList struct {
// Buckets holds one BucketLocation (project ID and bucket name) per bucket.
Buckets []BucketLocation
}
// Add appends the bucket identified by projectID and bucketName to the end of
// the list of buckets to be checked.
func (list *ListVerifyBucketList) Add(projectID uuid.UUID, bucketName string) {
	location := BucketLocation{ProjectID: projectID, BucketName: bucketName}
	list.Buckets = append(list.Buckets, location)
}
// ListBucketsStreamIDsResult is the result of listing the stream IDs of a list of buckets.
type ListBucketsStreamIDsResult struct {
// StreamIDs holds the stream IDs in the order the rows were returned.
StreamIDs []uuid.UUID
// Counts is parallel to StreamIDs: Counts[i] is the segment_count read from
// the objects table for StreamIDs[i].
Counts []int
// LastBucket is the project ID and bucket name of the last row that was
// scanned; it stays at its zero value when no row was returned.
LastBucket BucketLocation
}
// addStreamID records one stream ID together with its segment count, keeping
// the StreamIDs and Counts slices aligned index by index.
func (res *ListBucketsStreamIDsResult) addStreamID(streamID uuid.UUID, count int) {
	res.StreamIDs, res.Counts = append(res.StreamIDs, streamID), append(res.Counts, count)
}
// ListBucketsStreamIDs contains arguments necessary for listing stream segments from buckets.
type ListBucketsStreamIDs struct {
// BucketList is the set of buckets whose objects are listed.
BucketList ListVerifyBucketList
// CursorBucket and CursorStreamID form the listing cursor: only rows whose
// (project_id, bucket_name, stream_id) is strictly greater are returned.
CursorBucket BucketLocation
CursorStreamID uuid.UUID
// Limit is the maximum number of rows to return. Values <= 0 are rejected
// with ErrInvalidRequest; the value is also passed through
// ListVerifyLimit.Ensure (presumably capping it — confirm against intLimitRange).
Limit int

// AsOfSystemTime and AsOfSystemInterval are handed to db.asOfTime to build
// the clause inserted after FROM in the query.
AsOfSystemTime time.Time
AsOfSystemInterval time.Duration
}
// ListBucketsStreamIDs lists the stream IDs, together with their segment
// counts, of the objects stored in the buckets of opts.BucketList.
//
// Rows are read from the objects table in (project_id, bucket_name, stream_id)
// order, starting strictly after the cursor given by opts.CursorBucket and
// opts.CursorStreamID, and at most opts.Limit rows are returned. The returned
// LastBucket is the bucket of the last row read, so it can be used together
// with the last stream ID as the cursor of a follow-up call.
//
// It returns ErrInvalidRequest when opts.Limit is not positive. Any query or
// scan failure is returned wrapped in the package Error class, with an empty
// result.
func (db *DB) ListBucketsStreamIDs(ctx context.Context, opts ListBucketsStreamIDs) (result ListBucketsStreamIDsResult, err error) {
	// Instrument the call the same way ListVerifySegments does.
	defer mon.Task()(&ctx)(&err)

	if opts.Limit <= 0 {
		return ListBucketsStreamIDsResult{}, ErrInvalidRequest.New("invalid limit: %d", opts.Limit)
	}
	ListVerifyLimit.Ensure(&opts.Limit)

	// The buckets are sent as two parallel arrays that the query zips back
	// into (project_id, bucket_name) pairs with UNNEST.
	projectIDs := make([]uuid.UUID, 0, len(opts.BucketList.Buckets))
	bucketNamesBytes := make([][]byte, 0, len(opts.BucketList.Buckets))
	for _, bucket := range opts.BucketList.Buckets {
		projectIDs = append(projectIDs, bucket.ProjectID)
		bucketNamesBytes = append(bucketNamesBytes, []byte(bucket.BucketName))
	}

	// get the list of stream_ids and segment counts from the objects table
	err = withRows(db.db.QueryContext(ctx, `
		SELECT DISTINCT project_id, bucket_name, stream_id, segment_count
		FROM objects
		`+db.asOfTime(opts.AsOfSystemTime, opts.AsOfSystemInterval)+`
		WHERE
			(project_id, bucket_name, stream_id) > ($4::BYTEA, $5::BYTEA, $6::BYTEA) AND
			(project_id, bucket_name) IN (SELECT UNNEST($1::BYTEA[]),UNNEST($2::BYTEA[]))
		ORDER BY project_id, bucket_name, stream_id ASC
		LIMIT $3
		`, pgutil.UUIDArray(projectIDs), pgutil.ByteaArray(bucketNamesBytes),
		opts.Limit,
		// The cursor bucket name is sent as bytes, like the names in the IN
		// list, so the BYTEA parameter does not depend on how the driver
		// converts a Go string.
		opts.CursorBucket.ProjectID, []byte(opts.CursorBucket.BucketName), opts.CursorStreamID,
	))(func(rows tagsql.Rows) error {
		for rows.Next() {
			var streamID uuid.UUID
			var count int
			// LastBucket is overwritten on every row, so after the loop it
			// holds the bucket of the last row returned.
			err := rows.Scan(
				&result.LastBucket.ProjectID,
				&result.LastBucket.BucketName,
				&streamID,
				&count,
			)
			if err != nil {
				return Error.Wrap(err)
			}
			result.addStreamID(streamID, count)
		}
		return nil
	})
	if err != nil {
		// Never hand back a partially filled result alongside an error.
		return ListBucketsStreamIDsResult{}, Error.Wrap(err)
	}
	return result, nil
}

View File

@ -29,7 +29,7 @@ func TestListVerifySegments(t *testing.T) {
Limit: -1,
},
ErrClass: &metabase.ErrInvalidRequest,
ErrText: "Invalid limit: -1",
ErrText: "invalid limit: -1",
}.Check(ctx, t, db)
metabasetest.Verify{}.Check(ctx, t, db)
@ -250,6 +250,88 @@ func TestListVerifySegments(t *testing.T) {
Result: metabase.ListVerifySegmentsResult{},
}.Check(ctx, t, db)
})
// "streamID list" checks that ListVerifySegments only returns the segments of
// the streams passed in Opts.StreamIDs, using ListBucketsStreamIDs to obtain
// those stream IDs from a list of buckets.
t.Run("streamID list", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
expectedVerifySegments := []metabase.VerifySegment{}
nbBuckets := 3
bucketList := metabase.ListVerifyBucketList{}
// create one single-segment object per bucket and register the bucket in bucketList
for i := 0; i < nbBuckets; i++ {
projectID := testrand.UUID()
bucketName := testrand.BucketName()
bucketList.Add(projectID, bucketName)
obj := metabasetest.RandObjectStream()
obj.ProjectID = projectID
obj.BucketName = bucketName
obj.StreamID[0] = byte(i) // make StreamIDs ordered
_ = metabasetest.CreateObject(ctx, t, db, obj, 1)
expectedVerifySegments = append(expectedVerifySegments, defaultVerifySegment(obj.StreamID, 0))
// create an unrelated object in a bucket that is not part of bucketList
_ = metabasetest.CreateObject(ctx, t, db, metabasetest.RandObjectStream(), 2)
}
opts := metabase.ListBucketsStreamIDs{
BucketList: bucketList,
Limit: 10,
}
streamIDList, err := db.ListBucketsStreamIDs(ctx, opts)
require.NoError(t, err)
// only the objects of the listed buckets are expected, not the unrelated ones
require.Len(t, streamIDList.StreamIDs, nbBuckets)
metabasetest.ListVerifySegments{
Opts: metabase.ListVerifySegments{
StreamIDs: streamIDList.StreamIDs,
Limit: 5,
},
Result: metabase.ListVerifySegmentsResult{
Segments: expectedVerifySegments,
},
}.Check(ctx, t, db)
})
})
}
// TestListBucketsStreamIDs checks that ListBucketsStreamIDs returns exactly
// the stream IDs of the objects stored in the listed buckets and reports the
// bucket of the last returned row in LastBucket.
func TestListBucketsStreamIDs(t *testing.T) {
metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) {
t.Run("many objects segments", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
nbBuckets := 3
bucketList := metabase.ListVerifyBucketList{}
// obj is declared outside the loop and reused, so after the loop it
// still describes the last object created; the assertions below rely
// on that.
obj := metabasetest.RandObjectStream()
for i := 0; i < nbBuckets; i++ {
projectID := testrand.UUID()
projectID[0] = byte(i) // make projectID ordered
bucketName := testrand.BucketName()
bucketList.Add(projectID, bucketName)
obj.ProjectID = projectID
obj.BucketName = bucketName
obj.StreamID[0] = byte(i) // make StreamIDs ordered
_ = metabasetest.CreateObject(ctx, t, db, obj, 3)
// create an unrelated object in a bucket that is not part of bucketList
_ = metabasetest.CreateObject(ctx, t, db, metabasetest.RandObjectStream(), 2)
}
opts := metabase.ListBucketsStreamIDs{
BucketList: bucketList,
Limit: 10,
}
listStreamIDsResult, err := db.ListBucketsStreamIDs(ctx, opts)
require.NoError(t, err)
require.Len(t, listStreamIDsResult.StreamIDs, nbBuckets)
// project IDs were created in increasing order, so the last created
// object is expected to be the last one listed
require.Equal(t, obj.ProjectID, listStreamIDsResult.LastBucket.ProjectID)
require.Equal(t, obj.BucketName, listStreamIDsResult.LastBucket.BucketName)
require.Equal(t, obj.StreamID,
listStreamIDsResult.StreamIDs[len(listStreamIDsResult.StreamIDs)-1])
// TODO: add more test cases
})
})
}