From ae2cba1d237169f1926e64fcea1e3e085f4c34ec Mon Sep 17 00:00:00 2001 From: Michal Niewrzal Date: Mon, 7 Aug 2023 12:04:24 +0200 Subject: [PATCH] satellite/metabase: add IteratePendingObjectsByKeyNew method New metabase method IteratePendingObjectsByKeyNew iterates over entries in the pending_objects table with the same object key. Implementation and tests are mostly a copy of the code for IteratePendingObjectsByKey. The main difference is that the pending_objects table has the StreamID column as part of its primary key instead of Version. The method will be used to support the new table in the metainfo.ListPendingObjectStreams request. After the full transition to the pending_objects table we should remove the 'New' suffix from the method names. Part of https://github.com/storj/storj/issues/6047 Change-Id: Ifc1ecbc534f8510fbd70c4ec676cf2bf8abb94cb --- satellite/metabase/list.go | 11 + satellite/metabase/metabasetest/test.go | 22 ++ .../metabase/pending_objects_iterator.go | 55 ++++ .../metabase/pending_objects_iterator_test.go | 242 ++++++++++++++++++ 4 files changed, 330 insertions(+) diff --git a/satellite/metabase/list.go b/satellite/metabase/list.go index ce0b6491a..5c1d294e7 100644 --- a/satellite/metabase/list.go +++ b/satellite/metabase/list.go @@ -180,3 +180,14 @@ func (db *DB) IteratePendingObjects(ctx context.Context, opts IteratePendingObje } return iterateAllPendingObjects(ctx, db, opts, fn) } + +// IteratePendingObjectsByKeyNew iterates through all streams of pending objects with the same ObjectKey. +// TODO should be refactored to IteratePendingObjectsByKey after full transition to pending_objects table. 
+func (db *DB) IteratePendingObjectsByKeyNew(ctx context.Context, opts IteratePendingObjectsByKey, fn func(context.Context, PendingObjectsIterator) error) (err error) {
+	defer mon.Task()(&ctx)(&err)
+
+	if err := opts.Verify(); err != nil {
+		return err
+	}
+	return iteratePendingObjectsByKeyNew(ctx, db, opts, fn)
+}
diff --git a/satellite/metabase/metabasetest/test.go b/satellite/metabase/metabasetest/test.go
index 723a684c1..0d77cf5ed 100644
--- a/satellite/metabase/metabasetest/test.go
+++ b/satellite/metabase/metabasetest/test.go
@@ -577,6 +577,28 @@ func (step IteratePendingObjectsByKey) Check(ctx *testcontext.Context, t *testin
 	require.Zero(t, diff)
 }
 
+// IteratePendingObjectsByKeyNew is for testing metabase.IteratePendingObjectsByKeyNew.
+type IteratePendingObjectsByKeyNew struct {
+	Opts metabase.IteratePendingObjectsByKey
+
+	Result   []metabase.PendingObjectEntry
+	ErrClass *errs.Class
+	ErrText  string
+}
+
+// Check runs the test.
+func (step IteratePendingObjectsByKeyNew) Check(ctx *testcontext.Context, t *testing.T, db *metabase.DB) {
+	var collector PendingObjectsCollector
+
+	err := db.IteratePendingObjectsByKeyNew(ctx, step.Opts, collector.Add)
+	checkError(t, err, step.ErrClass, step.ErrText)
+
+	result := []metabase.PendingObjectEntry(collector)
+
+	diff := cmp.Diff(step.Result, result, DefaultTimeDiff())
+	require.Zero(t, diff)
+}
+
 // IterateObjectsWithStatus is for testing metabase.IterateObjectsWithStatus.
 type IterateObjectsWithStatus struct {
 	Opts metabase.IterateObjectsWithStatus
diff --git a/satellite/metabase/pending_objects_iterator.go b/satellite/metabase/pending_objects_iterator.go
index 52b0b6ceb..c464d5a32 100644
--- a/satellite/metabase/pending_objects_iterator.go
+++ b/satellite/metabase/pending_objects_iterator.go
@@ -323,3 +323,58 @@ func firstPendingObjectIterateCursor(recursive bool, cursor PendingObjectsCursor
 		Inclusive: true,
 	}
 }
+
+// iteratePendingObjectsByKeyNew builds an iterator over pending_objects rows that
+// share opts.ObjectKey in the given project and bucket, starting after
+// opts.Cursor.StreamID, and passes it together with fn to iteratePendingObjects.
+func iteratePendingObjectsByKeyNew(ctx context.Context, db *DB, opts IteratePendingObjectsByKey, fn func(context.Context, PendingObjectsIterator) error) (err error) {
+	defer mon.Task()(&ctx)(&err)
+
+	it := &pendingObjectsIterator{
+		db: db,
+
+		projectID:             opts.ProjectID,
+		bucketName:            []byte(opts.BucketName),
+		prefix:                "",
+		prefixLimit:           "",
+		batchSize:             opts.BatchSize,
+		recursive:             true,
+		includeCustomMetadata: true,
+		includeSystemMetadata: true,
+
+		curIndex: 0,
+		// The cursor StreamID is used as-is: a zero UUID sorts before every
+		// non-zero stream_id, so an unset cursor lists from the first stream.
+		cursor: pendingObjectIterateCursor{
+			Key:      opts.ObjectKey,
+			StreamID: opts.Cursor.StreamID,
+		},
+		doNextQuery: doNextQueryPendingStreamsByKey,
+	}
+
+	return iteratePendingObjects(ctx, it, fn)
+}
+
+// doNextQueryPendingStreamsByKey queries up to it.batchSize pending_objects rows for
+// it.cursor.Key whose stream_id is greater than it.cursor.StreamID, ordered by stream_id.
+func doNextQueryPendingStreamsByKey(ctx context.Context, it *pendingObjectsIterator) (_ tagsql.Rows, err error) {
+	defer mon.Task()(&ctx)(&err)
+
+	return it.db.db.QueryContext(ctx, `
+		SELECT
+			object_key, stream_id, encryption,
+			created_at, expires_at,
+			encrypted_metadata_nonce, encrypted_metadata, encrypted_metadata_encrypted_key
+		FROM pending_objects
+		WHERE
+			project_id = $1 AND bucket_name = $2
+			AND object_key = $3
+			AND stream_id > $4::BYTEA
+		ORDER BY stream_id ASC
+		LIMIT $5
+	`, it.projectID, it.bucketName,
+		[]byte(it.cursor.Key),
+		it.cursor.StreamID,
+		it.batchSize,
+	)
+}
diff --git a/satellite/metabase/pending_objects_iterator_test.go b/satellite/metabase/pending_objects_iterator_test.go
index bf3cc476f..4c740c469 100644
--- 
a/satellite/metabase/pending_objects_iterator_test.go
+++ b/satellite/metabase/pending_objects_iterator_test.go
@@ -1039,3 +1039,207 @@ func pendingPrefixEntry(key metabase.ObjectKey) metabase.PendingObjectEntry {
 		ObjectKey: key,
 	}
 }
+
+// TestIteratePendingObjectsByKey exercises DB.IteratePendingObjectsByKeyNew through the
+// metabasetest.IteratePendingObjectsByKeyNew step.
+func TestIteratePendingObjectsByKey(t *testing.T) {
+	metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) {
+		obj := metabasetest.RandObjectStream()
+
+		location := obj.Location()
+
+		now := time.Now()
+		zombieDeadline := now.Add(24 * time.Hour)
+
+		// createWithSameKey stores every given stream as a pending object under the
+		// location of the first stream, assigning versions 1..n. It returns that location,
+		// the raw objects in creation order and the expected entries sorted by StreamID.
+		createWithSameKey := func(t *testing.T, streams []metabase.ObjectStream) (metabase.ObjectLocation, []metabase.RawPendingObject, []metabase.PendingObjectEntry) {
+			location := streams[0].Location()
+			objects := make([]metabase.RawPendingObject, len(streams))
+			expected := make([]metabase.PendingObjectEntry, len(streams))
+
+			for i, obj := range streams {
+				obj.ProjectID = location.ProjectID
+				obj.BucketName = location.BucketName
+				obj.ObjectKey = location.ObjectKey
+				obj.Version = metabase.Version(i + 1)
+
+				metabasetest.CreatePendingObjectNew(ctx, t, db, obj, 0)
+
+				objects[i] = metabase.RawPendingObject{
+					PendingObjectStream: metabasetest.ObjectStreamToPending(obj),
+					CreatedAt:           now,
+
+					Encryption:             metabasetest.DefaultEncryption,
+					ZombieDeletionDeadline: &zombieDeadline,
+				}
+				expected[i] = pendingObjectEntryFromRaw(objects[i])
+			}
+
+			sort.Slice(expected, func(i, j int) bool {
+				return expected[i].StreamID.Less(expected[j].StreamID)
+			})
+
+			return location, objects, expected
+		}
+
+		for _, test := range metabasetest.InvalidObjectLocations(location) {
+			test := test
+			t.Run(test.Name, func(t *testing.T) {
+				defer metabasetest.DeleteAll{}.Check(ctx, t, db)
+				metabasetest.IteratePendingObjectsByKeyNew{
+					Opts: metabase.IteratePendingObjectsByKey{
+						ObjectLocation: test.ObjectLocation,
+					},
+					ErrClass: test.ErrClass,
+					ErrText:  test.ErrText,
+				}.Check(ctx, t, db)
+				metabasetest.Verify{}.Check(ctx, t, db)
+			})
+		}
+
+		t.Run("committed object", func(t *testing.T) {
+			defer metabasetest.DeleteAll{}.Check(ctx, t, db)
+
+			obj := metabasetest.RandObjectStream()
+
+			metabasetest.CreateObject(ctx, t, db, obj, 0)
+			metabasetest.IteratePendingObjectsByKeyNew{
+				Opts: metabase.IteratePendingObjectsByKey{
+					ObjectLocation: obj.Location(),
+					BatchSize:      10,
+				},
+				Result: nil,
+			}.Check(ctx, t, db)
+		})
+		t.Run("non existing object", func(t *testing.T) {
+			defer metabasetest.DeleteAll{}.Check(ctx, t, db)
+			pending := metabasetest.RandObjectStream()
+			metabasetest.CreatePendingObjectNew(ctx, t, db, pending, 0)
+
+			object := metabase.RawPendingObject{
+				PendingObjectStream: metabasetest.ObjectStreamToPending(pending),
+				CreatedAt:           now,
+
+				Encryption:             metabasetest.DefaultEncryption,
+				ZombieDeletionDeadline: &zombieDeadline,
+			}
+
+			metabasetest.IteratePendingObjectsByKeyNew{
+				Opts: metabase.IteratePendingObjectsByKey{
+					ObjectLocation: metabase.ObjectLocation{
+						ProjectID:  pending.ProjectID,
+						BucketName: pending.BucketName,
+						ObjectKey:  pending.Location().ObjectKey + "other",
+					},
+					BatchSize: 10,
+				},
+				Result: nil,
+			}.Check(ctx, t, db)
+
+			metabasetest.Verify{PendingObjects: []metabase.RawPendingObject{object}}.Check(ctx, t, db)
+		})
+
+		t.Run("less and more objects than limit", func(t *testing.T) {
+			defer metabasetest.DeleteAll{}.Check(ctx, t, db)
+
+			location, objects, expected := createWithSameKey(t, []metabase.ObjectStream{
+				metabasetest.RandObjectStream(), metabasetest.RandObjectStream(), metabasetest.RandObjectStream(),
+			})
+
+			metabasetest.IteratePendingObjectsByKeyNew{
+				Opts: metabase.IteratePendingObjectsByKey{
+					ObjectLocation: location,
+					BatchSize:      10,
+				},
+				Result: expected,
+			}.Check(ctx, t, db)
+
+			metabasetest.IteratePendingObjectsByKeyNew{
+				Opts: metabase.IteratePendingObjectsByKey{
+					ObjectLocation: location,
+					BatchSize:      2,
+				},
+				Result: expected,
+			}.Check(ctx, t, db)
+
+			metabasetest.Verify{PendingObjects: objects}.Check(ctx, t, db)
+		})
+
+		t.Run("prefixed object key", func(t *testing.T) {
+			defer metabasetest.DeleteAll{}.Check(ctx, t, db)
+
+			pending := metabasetest.RandObjectStream()
+			pending.ObjectKey = metabase.ObjectKey("a/prefixed/" + string(location.ObjectKey))
+			metabasetest.CreatePendingObjectNew(ctx, t, db, pending, 0)
+
+			object := metabase.RawPendingObject{
+				PendingObjectStream: metabasetest.ObjectStreamToPending(pending),
+				CreatedAt:           now,
+
+				Encryption:             metabasetest.DefaultEncryption,
+				ZombieDeletionDeadline: &zombieDeadline,
+			}
+
+			metabasetest.IteratePendingObjectsByKeyNew{
+				Opts: metabase.IteratePendingObjectsByKey{
+					ObjectLocation: pending.Location(),
+				},
+				Result: []metabase.PendingObjectEntry{pendingObjectEntryFromRaw(object)},
+			}.Check(ctx, t, db)
+
+			metabasetest.Verify{PendingObjects: []metabase.RawPendingObject{object}}.Check(ctx, t, db)
+		})
+
+		t.Run("using streamID cursor", func(t *testing.T) {
+			defer metabasetest.DeleteAll{}.Check(ctx, t, db)
+
+			location, objects, expected := createWithSameKey(t, []metabase.ObjectStream{
+				metabasetest.RandObjectStream(), metabasetest.RandObjectStream(), metabasetest.RandObjectStream(),
+			})
+
+			// The cursor stream itself is not returned; listing continues after it.
+			metabasetest.IteratePendingObjectsByKeyNew{
+				Opts: metabase.IteratePendingObjectsByKey{
+					ObjectLocation: location,
+					BatchSize:      10,
+					Cursor: metabase.StreamIDCursor{
+						StreamID: expected[0].StreamID,
+					},
+				},
+				Result: expected[1:],
+			}.Check(ctx, t, db)
+
+			metabasetest.Verify{PendingObjects: objects}.Check(ctx, t, db)
+		})
+
+		t.Run("same key different versions", func(t *testing.T) {
+			defer metabasetest.DeleteAll{}.Check(ctx, t, db)
+
+			obj1 := metabasetest.RandObjectStream()
+			obj2 := obj1
+			obj2.StreamID = testrand.UUID()
+
+			location, objects, expected := createWithSameKey(t, []metabase.ObjectStream{obj1, obj2})
+
+			metabasetest.IteratePendingObjectsByKeyNew{
+				Opts: metabase.IteratePendingObjectsByKey{
+					ObjectLocation: location,
+					BatchSize:      1,
+				},
+				Result: expected,
+			}.Check(ctx, t, db)
+
+			metabasetest.IteratePendingObjectsByKeyNew{
+				Opts: metabase.IteratePendingObjectsByKey{
+					ObjectLocation: location,
+					BatchSize:      3,
+				},
+				Result: expected,
+			}.Check(ctx, t, db)
+
+			metabasetest.Verify{PendingObjects: objects}.Check(ctx, t, db)
+		})
+	})
+}