From 780c0e0b35270ec653b7fd9479304b590447090b Mon Sep 17 00:00:00 2001 From: Michal Niewrzal Date: Mon, 7 Aug 2023 14:10:17 +0200 Subject: [PATCH] satellite/metainfo: adjust ListPendingObjectStreams to pending_objects table The new method IteratePendingObjectsByKeyNew is used to provide results for metainfo.ListPendingObjectStreams. This endpoint is used to list pending objects with the same object key. To support both tables (objects, pending_objects), we need to do one query per table and merge the results. Because the existing metainfo protobuf API is missing some fields needed for a proper listing cursor, we are not able to make ListPendingObjectStreams return correct results for more than a single page. We need to fix that separately. This change also revealed that the approach used to merge listing results in the ListObjects method was wrong, and it fixes that problem as well. Handling of both tables will be removed at some point, and only pending_objects will be used to look for results. 
Part of https://github.com/storj/storj/issues/6047 Change-Id: I8a88a6f885ad529704e6c032f1d97926123c2909 --- satellite/metainfo/endpoint_object.go | 197 ++++++++++++++++----- satellite/metainfo/endpoint_object_test.go | 122 +++++++++++++ 2 files changed, 273 insertions(+), 46 deletions(-) diff --git a/satellite/metainfo/endpoint_object.go b/satellite/metainfo/endpoint_object.go index 6fe4a73e8..d6be94dcc 100644 --- a/satellite/metainfo/endpoint_object.go +++ b/satellite/metainfo/endpoint_object.go @@ -7,6 +7,7 @@ import ( "bytes" "context" "fmt" + "sort" "time" "github.com/jtolio/eventkit" @@ -903,6 +904,13 @@ func (endpoint *Endpoint) ListObjects(ctx context.Context, req *pb.ObjectListReq resp.More = result.More } else { if status == metabase.Pending && endpoint.config.UsePendingObjectsTable { + type ObjectListItem struct { + Item *pb.ObjectListItem + StreamID uuid.UUID + } + + pendingObjectsEntries := make([]ObjectListItem, 0, limit) + // TODO when objects table will be free from pending objects only this listing method will remain err = endpoint.metabase.IteratePendingObjects(ctx, metabase.IteratePendingObjects{ ProjectID: keyInfo.ProjectID, BucketName: string(req.Bucket), @@ -917,53 +925,118 @@ func (endpoint *Endpoint) ListObjects(ctx context.Context, req *pb.ObjectListReq IncludeSystemMetadata: includeSystemMetadata, }, func(ctx context.Context, it metabase.PendingObjectsIterator) error { entry := metabase.PendingObjectEntry{} - for len(resp.Items) < limit && it.Next(ctx, &entry) { + for it.Next(ctx, &entry) { item, err := endpoint.pendingObjectEntryToProtoListItem(ctx, req.Bucket, entry, prefix, includeSystemMetadata, includeCustomMetadata, placement) if err != nil { return err } - resp.Items = append(resp.Items, item) + pendingObjectsEntries = append(pendingObjectsEntries, ObjectListItem{ + Item: item, + StreamID: entry.StreamID, + }) } - resp.More = it.Next(ctx, &entry) return nil }) if err != nil { return nil, endpoint.convertMetabaseErr(err) } - } - // we 
always need results from both tables for now - err = endpoint.metabase.IterateObjectsAllVersionsWithStatus(ctx, - metabase.IterateObjectsWithStatus{ - ProjectID: keyInfo.ProjectID, - BucketName: string(req.Bucket), - Prefix: prefix, - Cursor: metabase.IterateCursor{ - Key: cursorKey, - Version: cursorVersion, - }, - Recursive: req.Recursive, - BatchSize: limit + 1, - Status: status, - IncludeCustomMetadata: includeCustomMetadata, - IncludeSystemMetadata: includeSystemMetadata, - }, func(ctx context.Context, it metabase.ObjectsIterator) error { - entry := metabase.ObjectEntry{} - for len(resp.Items) < limit && it.Next(ctx, &entry) { - item, err := endpoint.objectEntryToProtoListItem(ctx, req.Bucket, entry, prefix, includeSystemMetadata, includeCustomMetadata, placement) - if err != nil { - return err + // we always need results from both tables for now + objectsEntries := make([]ObjectListItem, 0, limit) + err = endpoint.metabase.IterateObjectsAllVersionsWithStatus(ctx, + metabase.IterateObjectsWithStatus{ + ProjectID: keyInfo.ProjectID, + BucketName: string(req.Bucket), + Prefix: prefix, + Cursor: metabase.IterateCursor{ + Key: cursorKey, + Version: cursorVersion, + }, + Recursive: req.Recursive, + BatchSize: limit + 1, + Status: metabase.Pending, + IncludeCustomMetadata: includeCustomMetadata, + IncludeSystemMetadata: includeSystemMetadata, + }, func(ctx context.Context, it metabase.ObjectsIterator) error { + entry := metabase.ObjectEntry{} + for it.Next(ctx, &entry) { + item, err := endpoint.objectEntryToProtoListItem(ctx, req.Bucket, entry, prefix, includeSystemMetadata, includeCustomMetadata, placement) + if err != nil { + return err + } + objectsEntries = append(objectsEntries, ObjectListItem{ + Item: item, + StreamID: entry.StreamID, + }) } - resp.Items = append(resp.Items, item) - } + return nil + }, + ) + if err != nil { + return nil, endpoint.convertMetabaseErr(err) + } - // we need to take into account also potential results from IteratePendingObjects - 
resp.More = resp.More || it.Next(ctx, &entry) - return nil - }, - ) - if err != nil { - return nil, endpoint.convertMetabaseErr(err) + // combine results from both tables and sort them by object key to be able to cut results to the limit + allResults := make([]ObjectListItem, 0, len(pendingObjectsEntries)+len(objectsEntries)) + allResults = append(allResults, pendingObjectsEntries...) + allResults = append(allResults, objectsEntries...) + sort.Slice(allResults, func(i, j int) bool { + keyCompare := bytes.Compare(allResults[i].Item.EncryptedObjectKey, allResults[j].Item.EncryptedObjectKey) + switch { + case keyCompare == -1: + return true + case keyCompare == 1: + return false + case allResults[i].Item.Version < allResults[j].Item.Version: + return true + case allResults[i].Item.Version > allResults[j].Item.Version: + return false + default: + return allResults[i].StreamID.Less(allResults[j].StreamID) + } + }) + if len(allResults) >= limit { + resp.More = len(allResults) > limit + allResults = allResults[:limit] + } + resp.Items = make([]*pb.ObjectListItem, len(allResults)) + for i, objectListItem := range allResults { + resp.Items[i] = objectListItem.Item + } + } else { + // we always need results from both tables for now + err = endpoint.metabase.IterateObjectsAllVersionsWithStatus(ctx, + metabase.IterateObjectsWithStatus{ + ProjectID: keyInfo.ProjectID, + BucketName: string(req.Bucket), + Prefix: prefix, + Cursor: metabase.IterateCursor{ + Key: cursorKey, + Version: cursorVersion, + }, + Recursive: req.Recursive, + BatchSize: limit + 1, + Status: status, + IncludeCustomMetadata: includeCustomMetadata, + IncludeSystemMetadata: includeSystemMetadata, + }, func(ctx context.Context, it metabase.ObjectsIterator) error { + entry := metabase.ObjectEntry{} + for len(resp.Items) < limit && it.Next(ctx, &entry) { + item, err := endpoint.objectEntryToProtoListItem(ctx, req.Bucket, entry, prefix, includeSystemMetadata, includeCustomMetadata, placement) + if err != nil { + 
return err + } + resp.Items = append(resp.Items, item) + } + + // we need to take into account also potential results from IteratePendingObjects + resp.More = resp.More || it.Next(ctx, &entry) + return nil + }, + ) + if err != nil { + return nil, endpoint.convertMetabaseErr(err) + } } } endpoint.log.Info("Object List", zap.Stringer("Project ID", keyInfo.ProjectID), zap.String("operation", "list"), zap.String("type", "object")) @@ -1024,25 +1097,47 @@ func (endpoint *Endpoint) ListPendingObjectStreams(ctx context.Context, req *pb. resp = &pb.ObjectListPendingStreamsResponse{} resp.Items = []*pb.ObjectListItem{} - err = endpoint.metabase.IteratePendingObjectsByKey(ctx, - metabase.IteratePendingObjectsByKey{ - ObjectLocation: metabase.ObjectLocation{ - ProjectID: keyInfo.ProjectID, - BucketName: string(req.Bucket), - ObjectKey: metabase.ObjectKey(req.EncryptedObjectKey), + + options := metabase.IteratePendingObjectsByKey{ + ObjectLocation: metabase.ObjectLocation{ + ProjectID: keyInfo.ProjectID, + BucketName: string(req.Bucket), + ObjectKey: metabase.ObjectKey(req.EncryptedObjectKey), + }, + BatchSize: limit + 1, + Cursor: cursor, + } + + if endpoint.config.UsePendingObjectsTable { + err = endpoint.metabase.IteratePendingObjectsByKeyNew(ctx, + options, func(ctx context.Context, it metabase.PendingObjectsIterator) error { + entry := metabase.PendingObjectEntry{} + for it.Next(ctx, &entry) { + item, err := endpoint.pendingObjectEntryToProtoListItem(ctx, req.Bucket, entry, "", true, true, placement) + if err != nil { + return err + } + resp.Items = append(resp.Items, item) + } + return nil }, - BatchSize: limit + 1, - Cursor: cursor, - }, func(ctx context.Context, it metabase.ObjectsIterator) error { + ) + if err != nil { + return nil, endpoint.convertMetabaseErr(err) + } + } + + objectsEntries := make([]*pb.ObjectListItem, 0, limit) + err = endpoint.metabase.IteratePendingObjectsByKey(ctx, + options, func(ctx context.Context, it metabase.ObjectsIterator) error { entry 
:= metabase.ObjectEntry{} - for len(resp.Items) < limit && it.Next(ctx, &entry) { + for it.Next(ctx, &entry) { item, err := endpoint.objectEntryToProtoListItem(ctx, req.Bucket, entry, "", true, true, placement) if err != nil { return err } - resp.Items = append(resp.Items, item) + objectsEntries = append(objectsEntries, item) } - resp.More = it.Next(ctx, &entry) return nil }, ) @@ -1050,6 +1145,16 @@ func (endpoint *Endpoint) ListPendingObjectStreams(ctx context.Context, req *pb. return nil, endpoint.convertMetabaseErr(err) } + // TODO currently this request have a bug if we would like to list all pending objects + // with the same name if we have more than single page of them (1000) because protobuf + // cursor doesn't include additional things like StreamID so it's a bit useless to do + // anything else than just combine results + resp.Items = append(resp.Items, objectsEntries...) + if len(resp.Items) >= limit { + resp.More = len(resp.Items) > limit + resp.Items = resp.Items[:limit] + } + endpoint.log.Info("List pending object streams", zap.Stringer("Project ID", keyInfo.ProjectID), zap.String("operation", "list"), zap.String("type", "object")) mon.Meter("req_list_pending_object_streams").Mark(1) diff --git a/satellite/metainfo/endpoint_object_test.go b/satellite/metainfo/endpoint_object_test.go index 137a7f667..c225a4bfb 100644 --- a/satellite/metainfo/endpoint_object_test.go +++ b/satellite/metainfo/endpoint_object_test.go @@ -19,6 +19,7 @@ import ( "github.com/zeebo/errs" "go.uber.org/zap" "golang.org/x/exp/maps" + "golang.org/x/exp/slices" "storj.io/common/errs2" "storj.io/common/identity" @@ -802,6 +803,127 @@ func TestEndpoint_Object_No_StorageNodes_UsePendingObjectsTable(t *testing.T) { require.False(t, iterator.Next()) require.NoError(t, iterator.Err()) }) + + t.Run("ListPendingObjectStreams", func(t *testing.T) { + defer ctx.Check(deleteBucket) + + _, err = project.CreateBucket(ctx, bucketName) + require.NoError(t, err) + + _, err = 
project.BeginUpload(ctx, bucketName, "pending_object", nil) + require.NoError(t, err) + + iterator := project.ListUploads(ctx, bucketName, &uplink.ListUploadsOptions{ + Prefix: "pending_object", + }) + require.True(t, iterator.Next()) + require.Equal(t, "pending_object", iterator.Item().Key) + require.NoError(t, iterator.Err()) + + // pending object in objects table + _, err := planet.Satellites[0].Metabase.DB.BeginObjectNextVersion(ctx, metabase.BeginObjectNextVersion{ + ObjectStream: metabase.ObjectStream{ + ProjectID: projectID, + BucketName: bucketName, + ObjectKey: metabase.ObjectKey("pending_object"), + StreamID: testrand.UUID(), + Version: metabase.NextVersion, + }, + Encryption: metabasetest.DefaultEncryption, + UsePendingObjectsTable: false, + }) + require.NoError(t, err) + + keys := []string{} + iterator = project.ListUploads(ctx, bucketName, &uplink.ListUploadsOptions{ + Prefix: "pending_object", + }) + for iterator.Next() { + keys = append(keys, iterator.Item().Key) + } + require.NoError(t, iterator.Err()) + + // we should have two objects with the same name, one from pending_objects + // table and second from objects table + require.ElementsMatch(t, []string{ + "pending_object", + "pending_object", + }, keys) + }) + + t.Run("mixed objects from both tables", func(t *testing.T) { + type TestCases struct { + PendingObjectsTable []string + ObjectsTable []string + } + + for _, tc := range []TestCases{ + { + PendingObjectsTable: []string{"A", "B", "C"}, + ObjectsTable: []string{"X", "Y", "Z"}, + }, + { + PendingObjectsTable: []string{"A", "Y", "C"}, + ObjectsTable: []string{"X", "B", "Z"}, + }, + { + + PendingObjectsTable: []string{"X", "B", "Z"}, + ObjectsTable: []string{"A", "Y", "C"}, + }, + { + PendingObjectsTable: []string{"A", "B", "C", "X", "Y", "Z"}, + }, + { + ObjectsTable: []string{"A", "B", "C", "X", "Y", "Z"}, + }, + } { + t.Run("", func(t *testing.T) { + defer ctx.Check(deleteBucket) + + _, err = project.CreateBucket(ctx, bucketName) + 
require.NoError(t, err) + + allKeys := []string{} + // create objects in pending_objects table + for _, key := range tc.PendingObjectsTable { + _, err = project.BeginUpload(ctx, bucketName, key, nil) + require.NoError(t, err) + allKeys = append(allKeys, key) + } + + // create objects in objects table + for _, key := range tc.ObjectsTable { + _, err := planet.Satellites[0].Metabase.DB.BeginObjectNextVersion(ctx, metabase.BeginObjectNextVersion{ + ObjectStream: metabase.ObjectStream{ + ProjectID: projectID, + BucketName: bucketName, + ObjectKey: metabase.ObjectKey(key), + StreamID: testrand.UUID(), + Version: metabase.NextVersion, + }, + Encryption: metabasetest.DefaultEncryption, + UsePendingObjectsTable: false, + }) + require.NoError(t, err) + allKeys = append(allKeys, key) + } + + slices.Sort(allKeys) + + for _, limit := range []int{1, 2, 3, 10, 1000} { + ctx := testuplink.WithListLimit(ctx, limit) + resultKeys := []string{} + iterator := project.ListUploads(ctx, bucketName, nil) + for iterator.Next() { + resultKeys = append(resultKeys, iterator.Item().Key) + } + require.NoError(t, iterator.Err()) + require.Equal(t, allKeys, resultKeys) + } + }) + } + }) }) }