From c010e373746a7aeb260f489298bd8d623727dd69 Mon Sep 17 00:00:00 2001 From: Michal Niewrzal Date: Mon, 7 Aug 2023 12:52:13 +0200 Subject: [PATCH] satellite/metainfo: adjust ListObjects to use pending_objects table Adjust metainfo.ListObjects method to use IteratePendingObjects to support new pending_objects table. New method will be used only when we are listing pending objects. Because until objects table will be free from pending objects we can have results in both tables we are merging listing results. This also means that in some (rare?) cases we may return more results than specified listing limit. This situation is temporary. Part of https://github.com/storj/storj/issues/6047 Change-Id: I06389145e5d916c532dfdbd3dcc9ef68ef70e515 --- satellite/metainfo/endpoint_object.go | 153 ++++++++++++++++++--- satellite/metainfo/endpoint_object_test.go | 135 ++++++++++++++++++ 2 files changed, 272 insertions(+), 16 deletions(-) diff --git a/satellite/metainfo/endpoint_object.go b/satellite/metainfo/endpoint_object.go index ca7a1ee94..6fe4a73e8 100644 --- a/satellite/metainfo/endpoint_object.go +++ b/satellite/metainfo/endpoint_object.go @@ -844,12 +844,11 @@ func (endpoint *Endpoint) ListObjects(ctx context.Context, req *pb.ObjectListReq status = metabase.ObjectStatus(req.Status) } - cursor := metabase.IterateCursor{ - Key: metabase.ObjectKey(req.EncryptedCursor), - // TODO: set to a the version from the protobuf request when it supports this - } - if len(cursor.Key) != 0 { - cursor.Key = prefix + cursor.Key + cursorKey := metabase.ObjectKey(req.EncryptedCursor) + cursorVersion := metabase.Version(0) + cursorStreamID := uuid.UUID{} + if len(cursorKey) != 0 { + cursorKey = prefix + cursorKey // TODO this is a workaround to avoid duplicates while listing objects by libuplink. // because version is not part of cursor yet and we can have object with version higher @@ -858,7 +857,9 @@ func (endpoint *Endpoint) ListObjects(ctx context.Context, req *pb.ObjectListReq // fix as we still want to avoid this problem for older libuplink versions. // // it should be set in case of pending and committed objects - cursor.Version = metabase.MaxVersion + cursorVersion = metabase.MaxVersion + // for the same reasons as above we need to set maximum UUID as a cursor stream id + cursorStreamID = uuid.Max() } includeCustomMetadata := true @@ -875,10 +876,13 @@ func (endpoint *Endpoint) ListObjects(ctx context.Context, req *pb.ObjectListReq if endpoint.config.TestListingQuery { result, err := endpoint.metabase.ListObjects(ctx, metabase.ListObjects{ - ProjectID: keyInfo.ProjectID, - BucketName: string(req.Bucket), - Prefix: prefix, - Cursor: metabase.ListObjectsCursor(cursor), + ProjectID: keyInfo.ProjectID, + BucketName: string(req.Bucket), + Prefix: prefix, + Cursor: metabase.ListObjectsCursor{ + Key: cursorKey, + Version: cursorVersion, + }, Recursive: req.Recursive, Limit: limit, Status: status, @@ -898,12 +902,46 @@ func (endpoint *Endpoint) ListObjects(ctx context.Context, req *pb.ObjectListReq } resp.More = result.More } else { + if status == metabase.Pending && endpoint.config.UsePendingObjectsTable { + err = endpoint.metabase.IteratePendingObjects(ctx, metabase.IteratePendingObjects{ + ProjectID: keyInfo.ProjectID, + BucketName: string(req.Bucket), + Prefix: prefix, + Cursor: metabase.PendingObjectsCursor{ + Key: cursorKey, + StreamID: cursorStreamID, + }, + Recursive: req.Recursive, + BatchSize: limit + 1, + IncludeCustomMetadata: includeCustomMetadata, + IncludeSystemMetadata: includeSystemMetadata, + }, func(ctx context.Context, it metabase.PendingObjectsIterator) error { + entry := metabase.PendingObjectEntry{} + for len(resp.Items) < limit && it.Next(ctx, &entry) { + item, err := endpoint.pendingObjectEntryToProtoListItem(ctx, req.Bucket, entry, prefix, includeSystemMetadata, includeCustomMetadata, placement) + if err != nil { + return err + } + resp.Items = append(resp.Items, item) + } + resp.More = it.Next(ctx, &entry) + return nil + }) + if err != nil { + return nil, endpoint.convertMetabaseErr(err) + } + } + + // we always need results from both tables for now err = endpoint.metabase.IterateObjectsAllVersionsWithStatus(ctx, metabase.IterateObjectsWithStatus{ - ProjectID: keyInfo.ProjectID, - BucketName: string(req.Bucket), - Prefix: prefix, - Cursor: cursor, + ProjectID: keyInfo.ProjectID, + BucketName: string(req.Bucket), + Prefix: prefix, + Cursor: metabase.IterateCursor{ + Key: cursorKey, + Version: cursorVersion, + }, Recursive: req.Recursive, BatchSize: limit + 1, Status: status, @@ -918,7 +956,9 @@ func (endpoint *Endpoint) ListObjects(ctx context.Context, req *pb.ObjectListReq } resp.Items = append(resp.Items, item) } - resp.More = it.Next(ctx, &entry) + + // we need to take into account also potential results from IteratePendingObjects + resp.More = resp.More || it.Next(ctx, &entry) return nil }, ) @@ -1461,6 +1501,87 @@ func (endpoint *Endpoint) objectEntryToProtoListItem(ctx context.Context, bucket return item, nil } +func (endpoint *Endpoint) pendingObjectEntryToProtoListItem(ctx context.Context, bucket []byte, + entry metabase.PendingObjectEntry, prefixToPrependInSatStreamID metabase.ObjectKey, + includeSystem, includeMetadata bool, placement storj.PlacementConstraint) (item *pb.ObjectListItem, err error) { + + item = &pb.ObjectListItem{ + EncryptedObjectKey: []byte(entry.ObjectKey), + Status: pb.Object_UPLOADING, + } + + expiresAt := time.Time{} + if entry.ExpiresAt != nil { + expiresAt = *entry.ExpiresAt + } + + if includeSystem { + item.ExpiresAt = expiresAt + item.CreatedAt = entry.CreatedAt + } + + if includeMetadata { + var nonce storj.Nonce + if len(entry.EncryptedMetadataNonce) > 0 { + nonce, err = storj.NonceFromBytes(entry.EncryptedMetadataNonce) + if err != nil { + return nil, err + } + } + + streamMeta := &pb.StreamMeta{} + err = pb.Unmarshal(entry.EncryptedMetadata, streamMeta) + if err != nil { + return nil, err + } + + if entry.Encryption != (storj.EncryptionParameters{}) { + streamMeta.EncryptionType = int32(entry.Encryption.CipherSuite) + streamMeta.EncryptionBlockSize = entry.Encryption.BlockSize + } + + if entry.EncryptedMetadataEncryptedKey != nil { + streamMeta.LastSegmentMeta = &pb.SegmentMeta{ + EncryptedKey: entry.EncryptedMetadataEncryptedKey, + KeyNonce: entry.EncryptedMetadataNonce, + } + } + + metadataBytes, err := pb.Marshal(streamMeta) + if err != nil { + return nil, err + } + + item.EncryptedMetadata = metadataBytes + item.EncryptedMetadataNonce = nonce + item.EncryptedMetadataEncryptedKey = entry.EncryptedMetadataEncryptedKey + } + + // Add Stream ID to list items if listing is for pending objects. + // The client requires the Stream ID to use in the MultipartInfo. + satStreamID, err := endpoint.packStreamID(ctx, &internalpb.StreamID{ + Bucket: bucket, + EncryptedObjectKey: append([]byte(prefixToPrependInSatStreamID), []byte(entry.ObjectKey)...), + Version: 1, + CreationDate: entry.CreatedAt, + ExpirationDate: expiresAt, + StreamId: entry.StreamID[:], + MultipartObject: true, + EncryptionParameters: &pb.EncryptionParameters{ + CipherSuite: pb.CipherSuite(entry.Encryption.CipherSuite), + BlockSize: int64(entry.Encryption.BlockSize), + }, + Placement: int32(placement), + UsePendingObjectsTable: true, + }) + if err != nil { + return nil, err + } + item.StreamId = &satStreamID + + return item, nil +} + // DeleteCommittedObject deletes all the pieces of the storage nodes that belongs // to the specified object. // diff --git a/satellite/metainfo/endpoint_object_test.go b/satellite/metainfo/endpoint_object_test.go index 022a6a169..137a7f667 100644 --- a/satellite/metainfo/endpoint_object_test.go +++ b/satellite/metainfo/endpoint_object_test.go @@ -38,6 +38,7 @@ import ( "storj.io/storj/satellite/buckets" "storj.io/storj/satellite/internalpb" "storj.io/storj/satellite/metabase" + "storj.io/storj/satellite/metabase/metabasetest" "storj.io/storj/satellite/metainfo" "storj.io/storj/satellite/overlay" "storj.io/storj/storagenode" @@ -660,6 +661,7 @@ func TestEndpoint_Object_No_StorageNodes(t *testing.T) { iterator := project.ListUploads(ctx, bucketName, &tt.options) require.True(t, iterator.Next()) require.Equal(t, uploadInfo.UploadID, iterator.Item().UploadID) + require.NoError(t, iterator.Err()) err = project.AbortUpload(ctx, bucketName, "multipart-object", iterator.Item().UploadID) require.NoError(t, err) @@ -670,6 +672,139 @@ func TestEndpoint_Object_No_StorageNodes(t *testing.T) { }) } +func TestEndpoint_Object_No_StorageNodes_UsePendingObjectsTable(t *testing.T) { + testplanet.Run(t, testplanet.Config{ + SatelliteCount: 1, UplinkCount: 1, + Reconfigure: testplanet.Reconfigure{ + Satellite: testplanet.Combine( + func(log *zap.Logger, index int, config *satellite.Config) { + config.Metainfo.UsePendingObjectsTable = true + }, + ), + Uplink: func(log *zap.Logger, index int, config *testplanet.UplinkConfig) { + // we need to not encrypt paths because one of tests is creating object + // manually in DB directly. With path encryption listing would skip such object. + config.DefaultPathCipher = storj.EncNull + }, + }, + }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { + projectID := planet.Uplinks[0].Projects[0].ID + + project, err := planet.Uplinks[0].OpenProject(ctx, planet.Satellites[0]) + require.NoError(t, err) + defer ctx.Check(project.Close) + + bucketName := "testbucket" + deleteBucket := func() error { + _, err := project.DeleteBucketWithObjects(ctx, bucketName) + return err + } + + t.Run("UploadID check", func(t *testing.T) { + defer ctx.Check(deleteBucket) + + _, err = project.CreateBucket(ctx, bucketName) + require.NoError(t, err) + + for _, tt := range []struct { + expires time.Time + options uplink.ListUploadsOptions + }{ + { + options: uplink.ListUploadsOptions{System: false, Custom: false}, + }, + { + options: uplink.ListUploadsOptions{System: true, Custom: false}, + }, + { + options: uplink.ListUploadsOptions{System: true, Custom: true}, + }, + { + options: uplink.ListUploadsOptions{System: false, Custom: true}, + }, + { + expires: time.Now().Add(24 * time.Hour), + options: uplink.ListUploadsOptions{System: false, Custom: false}, + }, + { + expires: time.Now().Add(24 * time.Hour), + options: uplink.ListUploadsOptions{System: true, Custom: false}, + }, + { + expires: time.Now().Add(24 * time.Hour), + options: uplink.ListUploadsOptions{System: true, Custom: true}, + }, + { + expires: time.Now().Add(24 * time.Hour), + options: uplink.ListUploadsOptions{System: false, Custom: true}, + }, + } { + t.Run(fmt.Sprintf("expires:%v;system:%v;custom:%v", !tt.expires.IsZero(), tt.options.System, tt.options.Custom), func(t *testing.T) { + objectKey := "multipart-object" + uploadInfo, err := project.BeginUpload(ctx, bucketName, objectKey, &uplink.UploadOptions{ + Expires: tt.expires, + }) + require.NoError(t, err) + + iterator := project.ListUploads(ctx, bucketName, &tt.options) + require.True(t, iterator.Next()) + require.Equal(t, uploadInfo.UploadID, iterator.Item().UploadID) + require.NoError(t, iterator.Err()) + + err = project.AbortUpload(ctx, bucketName, objectKey, iterator.Item().UploadID) + require.NoError(t, err) + }) + } + }) + + t.Run("object in pending_object and object tables", func(t *testing.T) { + defer ctx.Check(deleteBucket) + + _, err = project.CreateBucket(ctx, bucketName) + require.NoError(t, err) + + // pending object in objects table + _, err := planet.Satellites[0].Metabase.DB.BeginObjectNextVersion(ctx, metabase.BeginObjectNextVersion{ + ObjectStream: metabase.ObjectStream{ + ProjectID: projectID, + BucketName: bucketName, + ObjectKey: metabase.ObjectKey("objects_table"), + StreamID: testrand.UUID(), + Version: metabase.NextVersion, + }, + Encryption: metabasetest.DefaultEncryption, + UsePendingObjectsTable: false, + }) + require.NoError(t, err) + + // pending object in pending_objects table + _, err = project.BeginUpload(ctx, bucketName, "pending_object_table", nil) + require.NoError(t, err) + + keys := []string{} + iterator := project.ListUploads(ctx, bucketName, nil) + for iterator.Next() { + keys = append(keys, iterator.Item().Key) + } + require.NoError(t, iterator.Err()) + require.ElementsMatch(t, []string{ + "objects_table", + "pending_object_table", + }, keys) + + iterator = project.ListUploads(ctx, bucketName, nil) + for iterator.Next() { + require.NoError(t, project.AbortUpload(ctx, bucketName, iterator.Item().Key, iterator.Item().UploadID)) + } + require.NoError(t, iterator.Err()) + + iterator = project.ListUploads(ctx, bucketName, nil) + require.False(t, iterator.Next()) + require.NoError(t, iterator.Err()) + }) + }) +} + func TestEndpoint_Object_UploadLimit(t *testing.T) { testplanet.Run(t, testplanet.Config{ SatelliteCount: 1, UplinkCount: 1,