From cbc82690d7619472581a68be429e5db1842f7e7e Mon Sep 17 00:00:00 2001 From: dlamarmorgan Date: Thu, 9 Nov 2023 15:24:29 -0800 Subject: [PATCH] satellite: implement metainfo get/set versioning endpoint Change-Id: Ic43ea8419e7e7092dd191c8ed9f6e4eb4bdff20b --- satellite/metainfo/batch.go | 22 ++++ satellite/metainfo/endpoint_bucket.go | 59 ++++++++++ satellite/metainfo/endpoint_bucket_test.go | 127 +++++++++------------ satellite/metainfo/endpoint_test.go | 14 +++ 4 files changed, 152 insertions(+), 70 deletions(-) diff --git a/satellite/metainfo/batch.go b/satellite/metainfo/batch.go index b54a9af2a..588c44d8f 100644 --- a/satellite/metainfo/batch.go +++ b/satellite/metainfo/batch.go @@ -60,6 +60,28 @@ func (endpoint *Endpoint) Batch(ctx context.Context, req *pb.BatchRequest) (resp BucketGetLocation: response, }, }) + case *pb.BatchRequestItem_BucketGetVersioning: + singleRequest.BucketGetVersioning.Header = req.Header + response, err := endpoint.GetBucketVersioning(ctx, singleRequest.BucketGetVersioning) + if err != nil { + return resp, err + } + resp.Responses = append(resp.Responses, &pb.BatchResponseItem{ + Response: &pb.BatchResponseItem_BucketGetVersioning{ + BucketGetVersioning: response, + }, + }) + case *pb.BatchRequestItem_BucketSetVersioning: + singleRequest.BucketSetVersioning.Header = req.Header + response, err := endpoint.SetBucketVersioning(ctx, singleRequest.BucketSetVersioning) + if err != nil { + return resp, err + } + resp.Responses = append(resp.Responses, &pb.BatchResponseItem{ + Response: &pb.BatchResponseItem_BucketSetVersioning{ + BucketSetVersioning: response, + }, + }) case *pb.BatchRequestItem_BucketDelete: singleRequest.BucketDelete.Header = req.Header response, err := endpoint.DeleteBucket(ctx, singleRequest.BucketDelete) diff --git a/satellite/metainfo/endpoint_bucket.go b/satellite/metainfo/endpoint_bucket.go index ccdb7af92..a118e08ec 100644 --- a/satellite/metainfo/endpoint_bucket.go +++ b/satellite/metainfo/endpoint_bucket.go @@ -86,6 +86,65 @@ func (endpoint *Endpoint) GetBucketLocation(ctx context.Context, req *pb.GetBuck }, nil } +// GetBucketVersioning responds with the versioning state of the bucket and any error encountered. +func (endpoint *Endpoint) GetBucketVersioning(ctx context.Context, req *pb.GetBucketVersioningRequest) (resp *pb.GetBucketVersioningResponse, err error) { + defer mon.Task()(&ctx)(&err) + + endpoint.versionCollector.collect(req.Header.UserAgent, mon.Func().ShortName()) + + keyInfo, err := endpoint.validateAuth(ctx, req.Header, macaroon.Action{ + Op: macaroon.ActionRead, + Bucket: req.Name, + Time: time.Now(), + }) + if err != nil { + return nil, err + } + endpoint.usageTracking(keyInfo, req.Header, fmt.Sprintf("%T", req)) + + versioning, err := endpoint.buckets.GetBucketVersioningState(ctx, req.GetName(), keyInfo.ProjectID) + if err != nil { + endpoint.log.Error("internal", zap.Error(err)) + return nil, rpcstatus.Error(rpcstatus.Internal, "unable to get versioning state for the bucket") + } + + return &pb.GetBucketVersioningResponse{ + Versioning: int32(versioning), + }, nil +} + +// SetBucketVersioning attempts to enable or disable versioning for a bucket and responds with any error encountered. +func (endpoint *Endpoint) SetBucketVersioning(ctx context.Context, req *pb.SetBucketVersioningRequest) (resp *pb.SetBucketVersioningResponse, err error) { + defer mon.Task()(&ctx)(&err) + + endpoint.versionCollector.collect(req.Header.UserAgent, mon.Func().ShortName()) + + keyInfo, err := endpoint.validateAuth(ctx, req.Header, macaroon.Action{ + Op: macaroon.ActionWrite, + Bucket: req.Name, + Time: time.Now(), + }) + if err != nil { + return nil, err + } + endpoint.usageTracking(keyInfo, req.Header, fmt.Sprintf("%T", req)) + + if !endpoint.config.UseBucketLevelObjectVersioningByProject(keyInfo.ProjectID) { + return nil, rpcstatus.Error(rpcstatus.PermissionDenied, "versioning not allowed for this project") + } + if req.Versioning { + err = endpoint.buckets.EnableBucketVersioning(ctx, req.GetName(), keyInfo.ProjectID) + } else { + err = endpoint.buckets.SuspendBucketVersioning(ctx, req.GetName(), keyInfo.ProjectID) + } + if err != nil { + endpoint.log.Error("internal", zap.Error(err)) + return nil, rpcstatus.Error(rpcstatus.Internal, "unable to enable versioning for the bucket") + } + + return &pb.SetBucketVersioningResponse{}, nil +} + // CreateBucket creates a new bucket. func (endpoint *Endpoint) CreateBucket(ctx context.Context, req *pb.BucketCreateRequest) (resp *pb.BucketCreateResponse, err error) { defer mon.Task()(&ctx)(&err) diff --git a/satellite/metainfo/endpoint_bucket_test.go b/satellite/metainfo/endpoint_bucket_test.go index fc4815a9e..5e784c07f 100644 --- a/satellite/metainfo/endpoint_bucket_test.go +++ b/satellite/metainfo/endpoint_bucket_test.go @@ -362,85 +362,72 @@ func TestGetBucketLocation(t *testing.T) { func TestEnableSuspendBucketVersioning(t *testing.T) { testplanet.Run(t, testplanet.Config{ SatelliteCount: 1, UplinkCount: 1, + Reconfigure: testplanet.Reconfigure{ + Satellite: func(log *zap.Logger, index int, config *satellite.Config) { + config.Metainfo.UseBucketLevelObjectVersioning = true + }, + }, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { bucketName := "testbucket" projectID := planet.Uplinks[0].Projects[0].ID + satellite := planet.Satellites[0] + enable := true + suspend := false deleteBucket := func() error { - err := planet.Satellites[0].API.DB.Buckets().DeleteBucket(ctx, []byte(bucketName), projectID) + err := satellite.API.DB.Buckets().DeleteBucket(ctx, []byte(bucketName), projectID) return err } - createBucketVersioning := func(versioning buckets.Versioning) (buckets.Bucket, error) { - return planet.Satellites[0].API.DB.Buckets().CreateBucket(ctx, buckets.Bucket{ - ProjectID: projectID, - Name: bucketName, - Versioning: versioning, + + for _, tt := range []struct { + name string + initialVersioningState buckets.Versioning + versioning bool + resultantVersioningState buckets.Versioning + }{ + {"Enable unsupported bucket fails", buckets.VersioningUnsupported, enable, buckets.VersioningUnsupported}, + {"Suspend unsupported bucket fails", buckets.VersioningUnsupported, suspend, buckets.VersioningUnsupported}, + {"Enable unversioned bucket succeeds", buckets.Unversioned, enable, buckets.VersioningEnabled}, + {"Suspend unversioned bucket fails", buckets.Unversioned, suspend, buckets.Unversioned}, + {"Enable enabled bucket succeeds", buckets.VersioningEnabled, enable, buckets.VersioningEnabled}, + {"Suspend enabled bucket succeeds", buckets.VersioningEnabled, suspend, buckets.VersioningSuspended}, + {"Enable suspended bucket succeeds", buckets.VersioningSuspended, enable, buckets.VersioningEnabled}, + {"Suspend suspended bucket succeeds", buckets.VersioningSuspended, suspend, buckets.VersioningSuspended}, + } { + t.Run(tt.name, func(t *testing.T) { + defer ctx.Check(deleteBucket) + bucket, err := satellite.API.DB.Buckets().CreateBucket(ctx, buckets.Bucket{ + ProjectID: projectID, + Name: bucketName, + Versioning: tt.initialVersioningState, + }) + require.NoError(t, err) + require.NotNil(t, bucket) + setResponse, err := satellite.API.Metainfo.Endpoint.SetBucketVersioning(ctx, &pb.SetBucketVersioningRequest{ + Header: &pb.RequestHeader{ + ApiKey: planet.Uplinks[0].APIKey[satellite.ID()].SerializeRaw(), + }, + Name: []byte(bucketName), + Versioning: tt.versioning, + }) + // only 3 error state transitions + if tt.initialVersioningState == buckets.VersioningUnsupported || + (tt.initialVersioningState == buckets.Unversioned && tt.versioning == suspend) { + require.Error(t, err) + } else { + require.NoError(t, err) + } + require.Empty(t, setResponse) + getResponse, err := satellite.API.Metainfo.Endpoint.GetBucketVersioning(ctx, &pb.GetBucketVersioningRequest{ + Header: &pb.RequestHeader{ + ApiKey: planet.Uplinks[0].APIKey[satellite.ID()].SerializeRaw(), + }, + Name: []byte(bucketName), + }) + require.NoError(t, err) + require.Equal(t, tt.resultantVersioningState, buckets.Versioning(getResponse.Versioning)) }) } - - t.Run("Enable versioning unsupported bucket fails", func(t *testing.T) { - defer ctx.Check(deleteBucket) - bucket, err := createBucketVersioning(buckets.VersioningUnsupported) - require.NoError(t, err) - err = planet.Satellites[0].API.DB.Buckets().EnableBucketVersioning(ctx, []byte(bucket.Name), bucket.ProjectID) - require.Error(t, err) - }) - - t.Run("Suspend versioning unsupported bucket fails", func(t *testing.T) { - defer ctx.Check(deleteBucket) - bucket, err := createBucketVersioning(buckets.VersioningUnsupported) - require.NoError(t, err) - err = planet.Satellites[0].API.DB.Buckets().SuspendBucketVersioning(ctx, []byte(bucket.Name), bucket.ProjectID) - require.Error(t, err) - }) - - t.Run("Enable unversioned bucket succeeds", func(t *testing.T) { - defer ctx.Check(deleteBucket) - bucket, err := createBucketVersioning(buckets.Unversioned) - require.NoError(t, err) - err = planet.Satellites[0].API.DB.Buckets().EnableBucketVersioning(ctx, []byte(bucket.Name), bucket.ProjectID) - require.NoError(t, err) - }) - - t.Run("Suspend unversioned bucket fails", func(t *testing.T) { - defer ctx.Check(deleteBucket) - bucket, err := createBucketVersioning(buckets.Unversioned) - require.NoError(t, err) - err = planet.Satellites[0].API.DB.Buckets().SuspendBucketVersioning(ctx, []byte(bucket.Name), bucket.ProjectID) - require.Error(t, err) - }) - - t.Run("Enable versioning enabled bucket succeeds", func(t *testing.T) { - defer ctx.Check(deleteBucket) - bucket, err := createBucketVersioning(buckets.VersioningEnabled) - require.NoError(t, err) - err = planet.Satellites[0].API.DB.Buckets().EnableBucketVersioning(ctx, []byte(bucket.Name), bucket.ProjectID) - require.NoError(t, err) - }) - - t.Run("Suspend versioning enabled bucket succeeds", func(t *testing.T) { - defer ctx.Check(deleteBucket) - bucket, err := createBucketVersioning(buckets.VersioningEnabled) - require.NoError(t, err) - err = planet.Satellites[0].API.DB.Buckets().SuspendBucketVersioning(ctx, []byte(bucket.Name), bucket.ProjectID) - require.NoError(t, err) - }) - - t.Run("Enable versioning suspended bucket succeeds", func(t *testing.T) { - defer ctx.Check(deleteBucket) - bucket, err := createBucketVersioning(buckets.VersioningSuspended) - require.NoError(t, err) - err = planet.Satellites[0].API.DB.Buckets().EnableBucketVersioning(ctx, []byte(bucket.Name), bucket.ProjectID) - require.NoError(t, err) - }) - - t.Run("Suspend versioning suspended bucket succeeds", func(t *testing.T) { - defer ctx.Check(deleteBucket) - bucket, err := createBucketVersioning(buckets.VersioningSuspended) - require.NoError(t, err) - err = planet.Satellites[0].API.DB.Buckets().SuspendBucketVersioning(ctx, []byte(bucket.Name), bucket.ProjectID) - require.NoError(t, err) - }) }) } diff --git a/satellite/metainfo/endpoint_test.go b/satellite/metainfo/endpoint_test.go index 94a646eeb..7d287c7e4 100644 --- a/satellite/metainfo/endpoint_test.go +++ b/satellite/metainfo/endpoint_test.go @@ -224,6 +224,20 @@ func TestEndpoint_NoStorageNodes(t *testing.T) { }) assertInvalidArgument(t, err, false) + _, err = planet.Satellites[0].Metainfo.Endpoint.GetBucketVersioning(ctx, &pb.GetBucketVersioningRequest{ + Header: &pb.RequestHeader{ + ApiKey: []byte(invalidAPIKey), + }, + }) + assertInvalidArgument(t, err, false) + + _, err = planet.Satellites[0].Metainfo.Endpoint.SetBucketVersioning(ctx, &pb.SetBucketVersioningRequest{ + Header: &pb.RequestHeader{ + ApiKey: []byte(invalidAPIKey), + }, + }) + assertInvalidArgument(t, err, false) + _, err = client.GetObject(ctx, metaclient.GetObjectParams{}) assertInvalidArgument(t, err, false)