satellite/metabase: add IteratePendingObjects method
New metabase method IteratePendingObjects to iterate over entries in pending_objects table. Implementation and tests are mostly copy of code that we are using to iterate over objects table. Main difference is that pending_objects table have StreamID column part of primary key instead Version. Also structure of pending object is smaller than the one from object table but it's a detail. Method will be used to support new table in metainfo.ListObjects request. Next step will be to port rest of iterator implementation to support pending_objects table in metainfo.ListPendingObjectStreams. Part of https://github.com/storj/storj/issues/6047 Change-Id: Ia578182f88840539f3668d4a242953e061eace02
This commit is contained in:
parent
5a8ef89824
commit
929dc80091
@ -118,3 +118,65 @@ func (opts *IteratePendingObjectsByKey) Verify() error {
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// PendingObjectEntry contains information about an pending object item in a bucket.
|
||||
type PendingObjectEntry struct {
|
||||
IsPrefix bool
|
||||
|
||||
ObjectKey ObjectKey
|
||||
StreamID uuid.UUID
|
||||
|
||||
CreatedAt time.Time
|
||||
ExpiresAt *time.Time
|
||||
|
||||
EncryptedMetadataNonce []byte
|
||||
EncryptedMetadata []byte
|
||||
EncryptedMetadataEncryptedKey []byte
|
||||
|
||||
Encryption storj.EncryptionParameters
|
||||
}
|
||||
|
||||
// PendingObjectsIterator iterates over a sequence of PendingObjectEntry items.
|
||||
type PendingObjectsIterator interface {
|
||||
Next(ctx context.Context, item *PendingObjectEntry) bool
|
||||
}
|
||||
|
||||
// PendingObjectsCursor cursor for iterating over pending objects.
|
||||
type PendingObjectsCursor struct {
|
||||
Key ObjectKey
|
||||
StreamID uuid.UUID
|
||||
}
|
||||
|
||||
// IteratePendingObjects contains arguments necessary for listing pending objects in a bucket.
|
||||
type IteratePendingObjects struct {
|
||||
ProjectID uuid.UUID
|
||||
BucketName string
|
||||
Recursive bool
|
||||
BatchSize int
|
||||
Prefix ObjectKey
|
||||
Cursor PendingObjectsCursor
|
||||
IncludeCustomMetadata bool
|
||||
IncludeSystemMetadata bool
|
||||
}
|
||||
|
||||
// Verify verifies request fields.
|
||||
func (opts *IteratePendingObjects) Verify() error {
|
||||
switch {
|
||||
case opts.ProjectID.IsZero():
|
||||
return ErrInvalidRequest.New("ProjectID missing")
|
||||
case opts.BucketName == "":
|
||||
return ErrInvalidRequest.New("BucketName missing")
|
||||
case opts.BatchSize < 0:
|
||||
return ErrInvalidRequest.New("BatchSize is negative")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// IteratePendingObjects iterates through all pending objects.
|
||||
func (db *DB) IteratePendingObjects(ctx context.Context, opts IteratePendingObjects, fn func(context.Context, PendingObjectsIterator) error) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
if err = opts.Verify(); err != nil {
|
||||
return err
|
||||
}
|
||||
return iterateAllPendingObjects(ctx, db, opts, fn)
|
||||
}
|
||||
|
@ -84,6 +84,55 @@ func CreatePendingObject(ctx *testcontext.Context, t *testing.T, db *metabase.DB
|
||||
}
|
||||
}
|
||||
|
||||
// CreatePendingObjectNew creates a new pending object with the specified number of segments.
|
||||
// TODO CreatePendingObject will be removed when transition to pending_objects table will be complete.
|
||||
func CreatePendingObjectNew(ctx *testcontext.Context, t *testing.T, db *metabase.DB, obj metabase.ObjectStream, numberOfSegments byte) {
|
||||
obj.Version = metabase.NextVersion
|
||||
BeginObjectNextVersion{
|
||||
Opts: metabase.BeginObjectNextVersion{
|
||||
ObjectStream: obj,
|
||||
Encryption: DefaultEncryption,
|
||||
UsePendingObjectsTable: true,
|
||||
},
|
||||
Version: 1,
|
||||
}.Check(ctx, t, db)
|
||||
obj.Version = 1
|
||||
|
||||
for i := byte(0); i < numberOfSegments; i++ {
|
||||
BeginSegment{
|
||||
Opts: metabase.BeginSegment{
|
||||
ObjectStream: obj,
|
||||
Position: metabase.SegmentPosition{Part: 0, Index: uint32(i)},
|
||||
RootPieceID: storj.PieceID{i + 1},
|
||||
Pieces: []metabase.Piece{{
|
||||
Number: 1,
|
||||
StorageNode: testrand.NodeID(),
|
||||
}},
|
||||
UsePendingObjectsTable: true,
|
||||
},
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
CommitSegment{
|
||||
Opts: metabase.CommitSegment{
|
||||
ObjectStream: obj,
|
||||
Position: metabase.SegmentPosition{Part: 0, Index: uint32(i)},
|
||||
RootPieceID: storj.PieceID{1},
|
||||
Pieces: metabase.Pieces{{Number: 0, StorageNode: storj.NodeID{2}}},
|
||||
|
||||
EncryptedKey: []byte{3},
|
||||
EncryptedKeyNonce: []byte{4},
|
||||
EncryptedETag: []byte{5},
|
||||
|
||||
EncryptedSize: 1024,
|
||||
PlainSize: 512,
|
||||
PlainOffset: 0,
|
||||
Redundancy: DefaultRedundancy,
|
||||
UsePendingObjectsTable: true,
|
||||
},
|
||||
}.Check(ctx, t, db)
|
||||
}
|
||||
}
|
||||
|
||||
// CreateObject creates a new committed object with the specified number of segments.
|
||||
func CreateObject(ctx *testcontext.Context, t require.TestingT, db *metabase.DB, obj metabase.ObjectStream, numberOfSegments byte) metabase.Object {
|
||||
BeginObjectExactVersion{
|
||||
|
@ -542,6 +542,19 @@ func (coll *LoopIterateCollector) Add(ctx context.Context, it metabase.LoopObjec
|
||||
return nil
|
||||
}
|
||||
|
||||
// PendingObjectsCollector is for testing metabase.PendingObjectsCollector.
|
||||
type PendingObjectsCollector []metabase.PendingObjectEntry
|
||||
|
||||
// Add adds object entries from iterator to the collection.
|
||||
func (coll *PendingObjectsCollector) Add(ctx context.Context, it metabase.PendingObjectsIterator) error {
|
||||
var item metabase.PendingObjectEntry
|
||||
|
||||
for it.Next(ctx, &item) {
|
||||
*coll = append(*coll, item)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// IteratePendingObjectsByKey is for testing metabase.IteratePendingObjectsByKey.
|
||||
type IteratePendingObjectsByKey struct {
|
||||
Opts metabase.IteratePendingObjectsByKey
|
||||
@ -584,6 +597,26 @@ func (step IterateObjectsWithStatus) Check(ctx *testcontext.Context, t testing.T
|
||||
require.Zero(t, diff)
|
||||
}
|
||||
|
||||
// IteratePendingObjects is for testing metabase.IteratePendingObjects.
|
||||
type IteratePendingObjects struct {
|
||||
Opts metabase.IteratePendingObjects
|
||||
|
||||
Result []metabase.PendingObjectEntry
|
||||
ErrClass *errs.Class
|
||||
ErrText string
|
||||
}
|
||||
|
||||
// Check runs the test.
|
||||
func (step IteratePendingObjects) Check(ctx *testcontext.Context, t testing.TB, db *metabase.DB) {
|
||||
var result PendingObjectsCollector
|
||||
|
||||
err := db.IteratePendingObjects(ctx, step.Opts, result.Add)
|
||||
checkError(t, err, step.ErrClass, step.ErrText)
|
||||
|
||||
diff := cmp.Diff(step.Result, []metabase.PendingObjectEntry(result), DefaultTimeDiff())
|
||||
require.Zero(t, diff)
|
||||
}
|
||||
|
||||
// IterateLoopObjects is for testing metabase.IterateLoopObjects.
|
||||
type IterateLoopObjects struct {
|
||||
Opts metabase.IterateLoopObjects
|
||||
|
325
satellite/metabase/pending_objects_iterator.go
Normal file
325
satellite/metabase/pending_objects_iterator.go
Normal file
@ -0,0 +1,325 @@
|
||||
// Copyright (C) 2023 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package metabase
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"strings"
|
||||
|
||||
"github.com/zeebo/errs"
|
||||
|
||||
"storj.io/common/uuid"
|
||||
"storj.io/private/tagsql"
|
||||
)
|
||||
|
||||
// pendingObjectsIterator enables iteration on pending objects in a bucket.
|
||||
type pendingObjectsIterator struct {
|
||||
db *DB
|
||||
|
||||
projectID uuid.UUID
|
||||
bucketName []byte
|
||||
prefix ObjectKey
|
||||
prefixLimit ObjectKey
|
||||
batchSize int
|
||||
recursive bool
|
||||
includeCustomMetadata bool
|
||||
includeSystemMetadata bool
|
||||
|
||||
curIndex int
|
||||
curRows tagsql.Rows
|
||||
cursor pendingObjectIterateCursor // not relative to prefix
|
||||
|
||||
skipPrefix ObjectKey // relative to prefix
|
||||
doNextQuery func(context.Context, *pendingObjectsIterator) (_ tagsql.Rows, err error)
|
||||
|
||||
// failErr is set when either scan or next query fails during iteration.
|
||||
failErr error
|
||||
}
|
||||
|
||||
type pendingObjectIterateCursor struct {
|
||||
Key ObjectKey
|
||||
StreamID uuid.UUID
|
||||
Inclusive bool
|
||||
}
|
||||
|
||||
func iterateAllPendingObjects(ctx context.Context, db *DB, opts IteratePendingObjects, fn func(context.Context, PendingObjectsIterator) error) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
it := &pendingObjectsIterator{
|
||||
db: db,
|
||||
|
||||
projectID: opts.ProjectID,
|
||||
bucketName: []byte(opts.BucketName),
|
||||
prefix: opts.Prefix,
|
||||
prefixLimit: prefixLimit(opts.Prefix),
|
||||
batchSize: opts.BatchSize,
|
||||
recursive: opts.Recursive,
|
||||
includeCustomMetadata: opts.IncludeCustomMetadata,
|
||||
includeSystemMetadata: opts.IncludeSystemMetadata,
|
||||
|
||||
curIndex: 0,
|
||||
cursor: firstPendingObjectIterateCursor(opts.Recursive, opts.Cursor, opts.Prefix),
|
||||
|
||||
doNextQuery: doNextQueryAllPendingObjects,
|
||||
}
|
||||
|
||||
// start from either the cursor or prefix, depending on which is larger
|
||||
if lessKey(it.cursor.Key, opts.Prefix) {
|
||||
it.cursor.Key = opts.Prefix
|
||||
it.cursor.Inclusive = true
|
||||
}
|
||||
|
||||
return iteratePendingObjects(ctx, it, fn)
|
||||
}
|
||||
|
||||
func iteratePendingObjects(ctx context.Context, it *pendingObjectsIterator, fn func(context.Context, PendingObjectsIterator) error) (err error) {
|
||||
batchsizeLimit.Ensure(&it.batchSize)
|
||||
|
||||
it.curRows, err = it.doNextQuery(ctx, it)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
it.cursor.Inclusive = false
|
||||
|
||||
defer func() {
|
||||
if rowsErr := it.curRows.Err(); rowsErr != nil {
|
||||
err = errs.Combine(err, rowsErr)
|
||||
}
|
||||
err = errs.Combine(err, it.failErr, it.curRows.Close())
|
||||
}()
|
||||
|
||||
return fn(ctx, it)
|
||||
}
|
||||
|
||||
// Next returns true if there was another item and copy it in item.
|
||||
func (it *pendingObjectsIterator) Next(ctx context.Context, item *PendingObjectEntry) bool {
|
||||
if it.recursive {
|
||||
return it.next(ctx, item)
|
||||
}
|
||||
|
||||
// TODO: implement this on the database side
|
||||
|
||||
// skip until we are past the prefix we returned before.
|
||||
if it.skipPrefix != "" {
|
||||
for strings.HasPrefix(string(item.ObjectKey), string(it.skipPrefix)) {
|
||||
if !it.next(ctx, item) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
it.skipPrefix = ""
|
||||
} else {
|
||||
ok := it.next(ctx, item)
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
// should this be treated as a prefix?
|
||||
p := strings.IndexByte(string(item.ObjectKey), Delimiter)
|
||||
if p >= 0 {
|
||||
it.skipPrefix = item.ObjectKey[:p+1]
|
||||
*item = PendingObjectEntry{
|
||||
IsPrefix: true,
|
||||
ObjectKey: item.ObjectKey[:p+1],
|
||||
}
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
// next returns true if there was another item and copy it in item.
|
||||
func (it *pendingObjectsIterator) next(ctx context.Context, item *PendingObjectEntry) bool {
|
||||
next := it.curRows.Next()
|
||||
if !next {
|
||||
if it.curIndex < it.batchSize {
|
||||
return false
|
||||
}
|
||||
|
||||
if it.curRows.Err() != nil {
|
||||
return false
|
||||
}
|
||||
|
||||
if !it.recursive {
|
||||
afterPrefix := it.cursor.Key[len(it.prefix):]
|
||||
p := bytes.IndexByte([]byte(afterPrefix), Delimiter)
|
||||
if p >= 0 {
|
||||
it.cursor.Key = it.prefix + prefixLimit(afterPrefix[:p+1])
|
||||
it.cursor.StreamID = uuid.UUID{}
|
||||
}
|
||||
}
|
||||
|
||||
rows, err := it.doNextQuery(ctx, it)
|
||||
if err != nil {
|
||||
it.failErr = errs.Combine(it.failErr, err)
|
||||
return false
|
||||
}
|
||||
|
||||
if closeErr := it.curRows.Close(); closeErr != nil {
|
||||
it.failErr = errs.Combine(it.failErr, closeErr, rows.Close())
|
||||
return false
|
||||
}
|
||||
|
||||
it.curRows = rows
|
||||
it.curIndex = 0
|
||||
if !it.curRows.Next() {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
err := it.scanItem(item)
|
||||
if err != nil {
|
||||
it.failErr = errs.Combine(it.failErr, err)
|
||||
return false
|
||||
}
|
||||
|
||||
it.curIndex++
|
||||
it.cursor.Key = it.prefix + item.ObjectKey
|
||||
it.cursor.StreamID = item.StreamID
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func doNextQueryAllPendingObjects(ctx context.Context, it *pendingObjectsIterator) (_ tagsql.Rows, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
cursorCompare := ">"
|
||||
if it.cursor.Inclusive {
|
||||
cursorCompare = ">="
|
||||
}
|
||||
|
||||
if it.prefixLimit == "" {
|
||||
querySelectFields := pendingObjectsQuerySelectorFields("object_key", it)
|
||||
return it.db.db.QueryContext(ctx, `
|
||||
SELECT
|
||||
`+querySelectFields+`
|
||||
FROM pending_objects
|
||||
WHERE
|
||||
(project_id, bucket_name, object_key, stream_id) `+cursorCompare+` ($1, $2, $3, $4)
|
||||
AND (project_id, bucket_name) < ($1, $6)
|
||||
AND (expires_at IS NULL OR expires_at > now())
|
||||
ORDER BY (project_id, bucket_name, object_key, stream_id) ASC
|
||||
LIMIT $5
|
||||
`, it.projectID, it.bucketName,
|
||||
[]byte(it.cursor.Key), it.cursor.StreamID,
|
||||
it.batchSize,
|
||||
nextBucket(it.bucketName),
|
||||
)
|
||||
}
|
||||
|
||||
fromSubstring := 1
|
||||
if it.prefix != "" {
|
||||
fromSubstring = len(it.prefix) + 1
|
||||
}
|
||||
|
||||
querySelectFields := pendingObjectsQuerySelectorFields("SUBSTRING(object_key FROM $7)", it)
|
||||
return it.db.db.QueryContext(ctx, `
|
||||
SELECT
|
||||
`+querySelectFields+`
|
||||
FROM pending_objects
|
||||
WHERE
|
||||
(project_id, bucket_name, object_key, stream_id) `+cursorCompare+` ($1, $2, $3, $4)
|
||||
AND (project_id, bucket_name, object_key) < ($1, $2, $5)
|
||||
AND (expires_at IS NULL OR expires_at > now())
|
||||
ORDER BY (project_id, bucket_name, object_key, stream_id) ASC
|
||||
LIMIT $6
|
||||
`, it.projectID, it.bucketName,
|
||||
[]byte(it.cursor.Key), it.cursor.StreamID,
|
||||
[]byte(it.prefixLimit),
|
||||
it.batchSize,
|
||||
fromSubstring,
|
||||
)
|
||||
}
|
||||
|
||||
func pendingObjectsQuerySelectorFields(objectKeyColumn string, it *pendingObjectsIterator) string {
|
||||
querySelectFields := objectKeyColumn + `
|
||||
,stream_id
|
||||
,encryption`
|
||||
|
||||
if it.includeSystemMetadata {
|
||||
querySelectFields += `
|
||||
,created_at
|
||||
,expires_at`
|
||||
}
|
||||
|
||||
if it.includeCustomMetadata {
|
||||
querySelectFields += `
|
||||
,encrypted_metadata_nonce
|
||||
,encrypted_metadata
|
||||
,encrypted_metadata_encrypted_key`
|
||||
}
|
||||
|
||||
return querySelectFields
|
||||
}
|
||||
|
||||
// scanItem scans doNextQuery results into PendingObjectEntry.
|
||||
func (it *pendingObjectsIterator) scanItem(item *PendingObjectEntry) (err error) {
|
||||
item.IsPrefix = false
|
||||
|
||||
fields := []interface{}{
|
||||
&item.ObjectKey,
|
||||
&item.StreamID,
|
||||
encryptionParameters{&item.Encryption},
|
||||
}
|
||||
|
||||
if it.includeSystemMetadata {
|
||||
fields = append(fields,
|
||||
&item.CreatedAt,
|
||||
&item.ExpiresAt,
|
||||
)
|
||||
}
|
||||
|
||||
if it.includeCustomMetadata {
|
||||
fields = append(fields,
|
||||
&item.EncryptedMetadataNonce,
|
||||
&item.EncryptedMetadata,
|
||||
&item.EncryptedMetadataEncryptedKey,
|
||||
)
|
||||
}
|
||||
|
||||
return it.curRows.Scan(fields...)
|
||||
}
|
||||
|
||||
// firstPendingObjectIterateCursor adjust the cursor for a non-recursive iteration.
|
||||
// The cursor is non-inclusive and we need to adjust to handle prefix as cursor properly.
|
||||
// We return the next possible key from the prefix.
|
||||
func firstPendingObjectIterateCursor(recursive bool, cursor PendingObjectsCursor, prefix ObjectKey) pendingObjectIterateCursor {
|
||||
if recursive {
|
||||
return pendingObjectIterateCursor{
|
||||
Key: cursor.Key,
|
||||
StreamID: cursor.StreamID,
|
||||
}
|
||||
}
|
||||
|
||||
// when the cursor does not match the prefix, we'll return the original cursor.
|
||||
if !strings.HasPrefix(string(cursor.Key), string(prefix)) {
|
||||
return pendingObjectIterateCursor{
|
||||
Key: cursor.Key,
|
||||
StreamID: cursor.StreamID,
|
||||
}
|
||||
}
|
||||
|
||||
// handle case where:
|
||||
// prefix: x/y/
|
||||
// cursor: x/y/z/w
|
||||
// In this case, we want the skip prefix to be `x/y/z` + string('/' + 1).
|
||||
|
||||
cursorWithoutPrefix := cursor.Key[len(prefix):]
|
||||
p := strings.IndexByte(string(cursorWithoutPrefix), Delimiter)
|
||||
if p < 0 {
|
||||
// The cursor is not a prefix, but instead a path inside the prefix,
|
||||
// so we can use it directly.
|
||||
return pendingObjectIterateCursor{
|
||||
Key: cursor.Key,
|
||||
StreamID: cursor.StreamID,
|
||||
}
|
||||
}
|
||||
|
||||
// return the next prefix given a scoped path
|
||||
return pendingObjectIterateCursor{
|
||||
Key: cursor.Key[:len(prefix)+p] + ObjectKey(Delimiter+1),
|
||||
StreamID: cursor.StreamID,
|
||||
Inclusive: true,
|
||||
}
|
||||
}
|
1041
satellite/metabase/pending_objects_iterator_test.go
Normal file
1041
satellite/metabase/pending_objects_iterator_test.go
Normal file
File diff suppressed because it is too large
Load Diff
Loading…
Reference in New Issue
Block a user