From 26a04a5929f126765a92a11b715cb3211754c9d2 Mon Sep 17 00:00:00 2001 From: Michal Niewrzal Date: Thu, 16 Nov 2023 14:42:14 +0100 Subject: [PATCH] satellite/metainfo: use bucket versioning state for listing Currently we need to use different methods for listing objects depends on versioning state. Change-Id: I5747a699ba2dafcfc384216e4be0b662c8ae95fa --- satellite/metainfo/endpoint_object.go | 17 ++++++--- satellite/metainfo/endpoint_object_test.go | 41 +++++++++++++++++++++- 2 files changed, 52 insertions(+), 6 deletions(-) diff --git a/satellite/metainfo/endpoint_object.go b/satellite/metainfo/endpoint_object.go index 9d56e9609..e08f1a936 100644 --- a/satellite/metainfo/endpoint_object.go +++ b/satellite/metainfo/endpoint_object.go @@ -899,7 +899,7 @@ func (endpoint *Endpoint) ListObjects(ctx context.Context, req *pb.ObjectListReq } // TODO this needs to be optimized to avoid DB call on each request - placement, err := endpoint.buckets.GetBucketPlacement(ctx, req.Bucket, keyInfo.ProjectID) + bucket, err := endpoint.buckets.GetBucket(ctx, req.Bucket, keyInfo.ProjectID) if err != nil { if buckets.ErrBucketNotFound.Has(err) { return nil, rpcstatus.Errorf(rpcstatus.NotFound, "bucket not found: %s", req.Bucket) @@ -914,9 +914,16 @@ func (endpoint *Endpoint) ListObjects(ctx context.Context, req *pb.ObjectListReq } metabase.ListLimit.Ensure(&limit) - // TODO(ver): this is only temporary logic for testing, will be cleanup later + // TODO(ver): this is only temporary logic for testing, may require cleanup later + // Current logic is: + // * VersioningUnsupported || Unversioned - use metabase.IterateObjectsAllVersionsWithStatus + // * VersioningEnabled || VersioningSuspended + // * IncludeAllVersions == true - use metabase.IterateObjectsAllVersionsWithStatus + // * IncludeAllVersions == false - use metabase.ListObjects + // For now we want to use metabase.ListObjects only for versioned and suspended buckets because + // we need to verify performance of this method before we will use it globally useListObjects := false - if endpoint.config.UseBucketLevelObjectVersioningByProject(keyInfo.ProjectID) && endpoint.config.TestEnableBucketVersioning { + if bucket.Versioning == buckets.VersioningEnabled || bucket.Versioning == buckets.VersioningSuspended { useListObjects = !req.IncludeAllVersions } @@ -981,7 +988,7 @@ func (endpoint *Endpoint) ListObjects(ctx context.Context, req *pb.ObjectListReq } for _, entry := range result.Objects { - item, err := endpoint.objectEntryToProtoListItem(ctx, req.Bucket, entry, prefix, includeSystemMetadata, includeCustomMetadata, placement) + item, err := endpoint.objectEntryToProtoListItem(ctx, req.Bucket, entry, prefix, includeSystemMetadata, includeCustomMetadata, bucket.Placement) if err != nil { return nil, endpoint.convertMetabaseErr(err) } @@ -1006,7 +1013,7 @@ func (endpoint *Endpoint) ListObjects(ctx context.Context, req *pb.ObjectListReq }, func(ctx context.Context, it metabase.ObjectsIterator) error { entry := metabase.ObjectEntry{} for len(resp.Items) < limit && it.Next(ctx, &entry) { - item, err := endpoint.objectEntryToProtoListItem(ctx, req.Bucket, entry, prefix, includeSystemMetadata, includeCustomMetadata, placement) + item, err := endpoint.objectEntryToProtoListItem(ctx, req.Bucket, entry, prefix, includeSystemMetadata, includeCustomMetadata, bucket.Placement) if err != nil { return err } diff --git a/satellite/metainfo/endpoint_object_test.go b/satellite/metainfo/endpoint_object_test.go index 606a7b1ba..f5f84fd76 100644 --- a/satellite/metainfo/endpoint_object_test.go +++ b/satellite/metainfo/endpoint_object_test.go @@ -2770,7 +2770,6 @@ func TestEndpoint_Object_No_StorageNodes_Versioning(t *testing.T) { Reconfigure: testplanet.Reconfigure{ Satellite: func(log *zap.Logger, index int, config *satellite.Config) { config.Metainfo.UseBucketLevelObjectVersioning = true - config.Metainfo.TestEnableBucketVersioning = true }, }, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { @@ -2809,6 +2808,8 @@ func TestEndpoint_Object_No_StorageNodes_Versioning(t *testing.T) { require.NoError(t, createBucket(bucketName)) + require.NoError(t, planet.Satellites[0].API.Buckets.Service.EnableBucketVersioning(ctx, []byte(bucketName), projectID)) + state, err := planet.Satellites[0].API.Buckets.Service.GetBucketVersioningState(ctx, []byte(bucketName), projectID) require.NoError(t, err) require.Equal(t, buckets.VersioningEnabled, state) @@ -2857,5 +2858,43 @@ func TestEndpoint_Object_No_StorageNodes_Versioning(t *testing.T) { require.Nil(t, getResponse.Object.RedundancyScheme) require.Equal(t, pb.Object_DELETE_MARKER_VERSIONED, getResponse.Object.Status) }) + + t.Run("listing objects, different versioning state", func(t *testing.T) { + defer ctx.Check(deleteBucket(bucketName)) + + require.NoError(t, createBucket(bucketName)) + + err = planet.Uplinks[0].Upload(ctx, satelliteSys, bucketName, "objectA", testrand.Bytes(100)) + require.NoError(t, err) + + err = planet.Uplinks[0].Upload(ctx, satelliteSys, bucketName, "objectB", testrand.Bytes(100)) + require.NoError(t, err) + + checkListing := func(expectedItems int, includeAllVersions bool) { + response, err := satelliteSys.API.Metainfo.Endpoint.ListObjects(ctx, &pb.ListObjectsRequest{ + Header: &pb.RequestHeader{ApiKey: apiKey}, + Bucket: []byte(bucketName), + IncludeAllVersions: includeAllVersions, + }) + require.NoError(t, err) + require.Len(t, response.Items, expectedItems) + } + + checkListing(2, false) + + require.NoError(t, planet.Satellites[0].API.Buckets.Service.EnableBucketVersioning(ctx, []byte(bucketName), projectID)) + + // upload second version of objectA + err = planet.Uplinks[0].Upload(ctx, satelliteSys, bucketName, "objectA", testrand.Bytes(100)) + require.NoError(t, err) + + checkListing(2, false) + checkListing(3, true) + + require.NoError(t, planet.Satellites[0].API.Buckets.Service.SuspendBucketVersioning(ctx, []byte(bucketName), projectID)) + + checkListing(2, false) + checkListing(3, true) + }) }) }