satellite/metabase: add IteratePendingObjectsByKeyNew method

New metabase method IteratePendingObjectsByKeyNew to iterate
over entries in pending_objects table with the same object key.
Implementation and tests are mostly copy of code for
IteratePendingObjectsByKey. Main difference is that pending_objects
table have StreamID column part of primary key instead Version.

Method will be used to support new table in
metainfo.ListPendingObjectStreams request.

After full transition to pending_objects table we should remove 'New'
suffix from methods names.

Part of https://github.com/storj/storj/issues/6047

Change-Id: Ifc1ecbc534f8510fbd70c4ec676cf2bf8abb94cb
This commit is contained in:
Michal Niewrzal 2023-08-07 12:04:24 +02:00 committed by Michał Niewrzał
parent 929dc80091
commit ae2cba1d23
4 changed files with 330 additions and 0 deletions

View File

@ -180,3 +180,14 @@ func (db *DB) IteratePendingObjects(ctx context.Context, opts IteratePendingObje
}
return iterateAllPendingObjects(ctx, db, opts, fn)
}
// IteratePendingObjectsByKeyNew iterates through all streams of pending objects with the same ObjectKey.
// TODO should be refactored to IteratePendingObjectsByKey after full transition to pending_objects table.
func (db *DB) IteratePendingObjectsByKeyNew(ctx context.Context, opts IteratePendingObjectsByKey, fn func(context.Context, PendingObjectsIterator) error) (err error) {
defer mon.Task()(&ctx)(&err)
if err := opts.Verify(); err != nil {
return err
}
return iteratePendingObjectsByKeyNew(ctx, db, opts, fn)
}

View File

@ -577,6 +577,28 @@ func (step IteratePendingObjectsByKey) Check(ctx *testcontext.Context, t *testin
require.Zero(t, diff)
}
// IteratePendingObjectsByKeyNew is for testing metabase.IteratePendingObjectsByKeyNew.
type IteratePendingObjectsByKeyNew struct {
Opts metabase.IteratePendingObjectsByKey
Result []metabase.PendingObjectEntry
ErrClass *errs.Class
ErrText string
}
// Check runs the test.
func (step IteratePendingObjectsByKeyNew) Check(ctx *testcontext.Context, t *testing.T, db *metabase.DB) {
var collector PendingObjectsCollector
err := db.IteratePendingObjectsByKeyNew(ctx, step.Opts, collector.Add)
checkError(t, err, step.ErrClass, step.ErrText)
result := []metabase.PendingObjectEntry(collector)
diff := cmp.Diff(step.Result, result, DefaultTimeDiff())
require.Zero(t, diff)
}
// IterateObjectsWithStatus is for testing metabase.IterateObjectsWithStatus.
type IterateObjectsWithStatus struct {
Opts metabase.IterateObjectsWithStatus

View File

@ -323,3 +323,58 @@ func firstPendingObjectIterateCursor(recursive bool, cursor PendingObjectsCursor
Inclusive: true,
}
}
func iteratePendingObjectsByKeyNew(ctx context.Context, db *DB, opts IteratePendingObjectsByKey, fn func(context.Context, PendingObjectsIterator) error) (err error) {
defer mon.Task()(&ctx)(&err)
cursor := opts.Cursor
if cursor.StreamID.IsZero() {
cursor.StreamID = uuid.UUID{}
}
it := &pendingObjectsIterator{
db: db,
projectID: opts.ProjectID,
bucketName: []byte(opts.BucketName),
prefix: "",
prefixLimit: "",
batchSize: opts.BatchSize,
recursive: true,
includeCustomMetadata: true,
includeSystemMetadata: true,
curIndex: 0,
cursor: pendingObjectIterateCursor{
Key: opts.ObjectKey,
StreamID: opts.Cursor.StreamID,
},
doNextQuery: doNextQueryPendingStreamsByKey,
}
return iteratePendingObjects(ctx, it, fn)
}
func doNextQueryPendingStreamsByKey(ctx context.Context, it *pendingObjectsIterator) (_ tagsql.Rows, err error) {
defer mon.Task()(&ctx)(&err)
return it.db.db.QueryContext(ctx, `
SELECT
object_key, stream_id, encryption,
created_at, expires_at,
encrypted_metadata_nonce, encrypted_metadata, encrypted_metadata_encrypted_key
FROM pending_objects
WHERE
project_id = $1 AND bucket_name = $2
AND object_key = $3
AND stream_id > $4::BYTEA
ORDER BY stream_id ASC
LIMIT $5
`, it.projectID, it.bucketName,
[]byte(it.cursor.Key),
it.cursor.StreamID,
it.batchSize,
)
}

View File

@ -1039,3 +1039,245 @@ func pendingPrefixEntry(key metabase.ObjectKey) metabase.PendingObjectEntry {
ObjectKey: key,
}
}
func TestIteratePendingObjectsByKey(t *testing.T) {
metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) {
obj := metabasetest.RandObjectStream()
location := obj.Location()
now := time.Now()
zombieDeadline := now.Add(24 * time.Hour)
for _, test := range metabasetest.InvalidObjectLocations(location) {
test := test
t.Run(test.Name, func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
metabasetest.IteratePendingObjectsByKeyNew{
Opts: metabase.IteratePendingObjectsByKey{
ObjectLocation: test.ObjectLocation,
},
ErrClass: test.ErrClass,
ErrText: test.ErrText,
}.Check(ctx, t, db)
metabasetest.Verify{}.Check(ctx, t, db)
})
}
t.Run("committed object", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
obj := metabasetest.RandObjectStream()
metabasetest.CreateObject(ctx, t, db, obj, 0)
metabasetest.IteratePendingObjectsByKeyNew{
Opts: metabase.IteratePendingObjectsByKey{
ObjectLocation: obj.Location(),
BatchSize: 10,
},
Result: nil,
}.Check(ctx, t, db)
})
t.Run("non existing object", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
pending := metabasetest.RandObjectStream()
metabasetest.CreatePendingObjectNew(ctx, t, db, pending, 0)
object := metabase.RawPendingObject{
PendingObjectStream: metabasetest.ObjectStreamToPending(pending),
CreatedAt: now,
Encryption: metabasetest.DefaultEncryption,
ZombieDeletionDeadline: &zombieDeadline,
}
metabasetest.IteratePendingObjectsByKeyNew{
Opts: metabase.IteratePendingObjectsByKey{
ObjectLocation: metabase.ObjectLocation{
ProjectID: pending.ProjectID,
BucketName: pending.BucketName,
ObjectKey: pending.Location().ObjectKey + "other",
},
BatchSize: 10,
},
Result: nil,
}.Check(ctx, t, db)
metabasetest.Verify{PendingObjects: []metabase.RawPendingObject{object}}.Check(ctx, t, db)
})
t.Run("less and more objects than limit", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
pending := []metabase.ObjectStream{metabasetest.RandObjectStream(), metabasetest.RandObjectStream(), metabasetest.RandObjectStream()}
location := pending[0].Location()
objects := make([]metabase.RawPendingObject, 3)
expected := make([]metabase.PendingObjectEntry, 3)
for i, obj := range pending {
obj.ProjectID = location.ProjectID
obj.BucketName = location.BucketName
obj.ObjectKey = location.ObjectKey
obj.Version = metabase.Version(i + 1)
metabasetest.CreatePendingObjectNew(ctx, t, db, obj, 0)
objects[i] = metabase.RawPendingObject{
PendingObjectStream: metabasetest.ObjectStreamToPending(obj),
CreatedAt: now,
Encryption: metabasetest.DefaultEncryption,
ZombieDeletionDeadline: &zombieDeadline,
}
expected[i] = pendingObjectEntryFromRaw(objects[i])
}
sort.Slice(expected, func(i, j int) bool {
return expected[i].StreamID.Less(expected[j].StreamID)
})
metabasetest.IteratePendingObjectsByKeyNew{
Opts: metabase.IteratePendingObjectsByKey{
ObjectLocation: location,
BatchSize: 10,
},
Result: expected,
}.Check(ctx, t, db)
metabasetest.IteratePendingObjectsByKeyNew{
Opts: metabase.IteratePendingObjectsByKey{
ObjectLocation: location,
BatchSize: 2,
},
Result: expected,
}.Check(ctx, t, db)
metabasetest.Verify{PendingObjects: objects}.Check(ctx, t, db)
})
t.Run("prefixed object key", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
pending := metabasetest.RandObjectStream()
pending.ObjectKey = metabase.ObjectKey("a/prefixed/" + string(location.ObjectKey))
metabasetest.CreatePendingObjectNew(ctx, t, db, pending, 0)
object := metabase.RawPendingObject{
PendingObjectStream: metabasetest.ObjectStreamToPending(pending),
CreatedAt: now,
Encryption: metabasetest.DefaultEncryption,
ZombieDeletionDeadline: &zombieDeadline,
}
metabasetest.IteratePendingObjectsByKeyNew{
Opts: metabase.IteratePendingObjectsByKey{
ObjectLocation: pending.Location(),
},
Result: []metabase.PendingObjectEntry{pendingObjectEntryFromRaw(object)},
}.Check(ctx, t, db)
metabasetest.Verify{PendingObjects: []metabase.RawPendingObject{object}}.Check(ctx, t, db)
})
t.Run("using streamID cursor", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
pending := []metabase.ObjectStream{metabasetest.RandObjectStream(), metabasetest.RandObjectStream(), metabasetest.RandObjectStream()}
location := pending[0].Location()
objects := make([]metabase.RawPendingObject, 3)
expected := make([]metabase.PendingObjectEntry, 3)
for i, obj := range pending {
obj.ProjectID = location.ProjectID
obj.BucketName = location.BucketName
obj.ObjectKey = location.ObjectKey
obj.Version = metabase.Version(i + 1)
metabasetest.CreatePendingObjectNew(ctx, t, db, obj, 0)
objects[i] = metabase.RawPendingObject{
PendingObjectStream: metabasetest.ObjectStreamToPending(obj),
CreatedAt: now,
Encryption: metabasetest.DefaultEncryption,
ZombieDeletionDeadline: &zombieDeadline,
}
expected[i] = pendingObjectEntryFromRaw(objects[i])
}
sort.Slice(expected, func(i, j int) bool {
return expected[i].StreamID.Less(expected[j].StreamID)
})
metabasetest.IteratePendingObjectsByKeyNew{
Opts: metabase.IteratePendingObjectsByKey{
ObjectLocation: location,
BatchSize: 10,
Cursor: metabase.StreamIDCursor{
StreamID: expected[0].StreamID,
},
},
Result: expected[1:],
}.Check(ctx, t, db)
metabasetest.Verify{PendingObjects: objects}.Check(ctx, t, db)
})
t.Run("same key different versions", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
obj1 := metabasetest.RandObjectStream()
obj2 := obj1
obj2.StreamID = testrand.UUID()
obj2.Version = 2
pending := []metabase.ObjectStream{obj1, obj2}
location := pending[0].Location()
objects := make([]metabase.RawPendingObject, 2)
expected := make([]metabase.PendingObjectEntry, 2)
for i, obj := range pending {
obj.ProjectID = location.ProjectID
obj.BucketName = location.BucketName
obj.ObjectKey = location.ObjectKey
obj.Version = metabase.Version(i + 1)
metabasetest.CreatePendingObjectNew(ctx, t, db, obj, 0)
objects[i] = metabase.RawPendingObject{
PendingObjectStream: metabasetest.ObjectStreamToPending(obj),
CreatedAt: now,
Encryption: metabasetest.DefaultEncryption,
ZombieDeletionDeadline: &zombieDeadline,
}
expected[i] = pendingObjectEntryFromRaw(objects[i])
}
sort.Slice(expected, func(i, j int) bool {
return expected[i].StreamID.Less(expected[j].StreamID)
})
metabasetest.IteratePendingObjectsByKeyNew{
Opts: metabase.IteratePendingObjectsByKey{
ObjectLocation: location,
BatchSize: 1,
},
Result: expected,
}.Check(ctx, t, db)
metabasetest.IteratePendingObjectsByKeyNew{
Opts: metabase.IteratePendingObjectsByKey{
ObjectLocation: location,
BatchSize: 3,
},
Result: expected,
}.Check(ctx, t, db)
metabasetest.Verify{PendingObjects: objects}.Check(ctx, t, db)
})
})
}