satellite: implement metainfo get/set versioning endpoint

Change-Id: Ic43ea8419e7e7092dd191c8ed9f6e4eb4bdff20b
This commit is contained in:
dlamarmorgan 2023-11-09 15:24:29 -08:00 committed by Storj Robot
parent 13d02d9d11
commit cbc82690d7
4 changed files with 152 additions and 70 deletions

View File

@ -60,6 +60,28 @@ func (endpoint *Endpoint) Batch(ctx context.Context, req *pb.BatchRequest) (resp
BucketGetLocation: response,
},
})
case *pb.BatchRequestItem_BucketGetVersioning:
singleRequest.BucketGetVersioning.Header = req.Header
response, err := endpoint.GetBucketVersioning(ctx, singleRequest.BucketGetVersioning)
if err != nil {
return resp, err
}
resp.Responses = append(resp.Responses, &pb.BatchResponseItem{
Response: &pb.BatchResponseItem_BucketGetVersioning{
BucketGetVersioning: response,
},
})
case *pb.BatchRequestItem_BucketSetVersioning:
singleRequest.BucketSetVersioning.Header = req.Header
response, err := endpoint.SetBucketVersioning(ctx, singleRequest.BucketSetVersioning)
if err != nil {
return resp, err
}
resp.Responses = append(resp.Responses, &pb.BatchResponseItem{
Response: &pb.BatchResponseItem_BucketSetVersioning{
BucketSetVersioning: response,
},
})
case *pb.BatchRequestItem_BucketDelete:
singleRequest.BucketDelete.Header = req.Header
response, err := endpoint.DeleteBucket(ctx, singleRequest.BucketDelete)

View File

@ -86,6 +86,65 @@ func (endpoint *Endpoint) GetBucketLocation(ctx context.Context, req *pb.GetBuck
}, nil
}
// GetBucketVersioning responds with the versioning state of the bucket and any error encountered.
func (endpoint *Endpoint) GetBucketVersioning(ctx context.Context, req *pb.GetBucketVersioningRequest) (resp *pb.GetBucketVersioningResponse, err error) {
defer mon.Task()(&ctx)(&err)
endpoint.versionCollector.collect(req.Header.UserAgent, mon.Func().ShortName())
keyInfo, err := endpoint.validateAuth(ctx, req.Header, macaroon.Action{
Op: macaroon.ActionRead,
Bucket: req.Name,
Time: time.Now(),
})
if err != nil {
return nil, err
}
endpoint.usageTracking(keyInfo, req.Header, fmt.Sprintf("%T", req))
versioning, err := endpoint.buckets.GetBucketVersioningState(ctx, req.GetName(), keyInfo.ProjectID)
if err != nil {
endpoint.log.Error("internal", zap.Error(err))
return nil, rpcstatus.Error(rpcstatus.Internal, "unable to get versioning state for the bucket")
}
return &pb.GetBucketVersioningResponse{
Versioning: int32(versioning),
}, nil
}
// SetBucketVersioning attempts to enable or disable versioning for a bucket and responds with any error encountered.
func (endpoint *Endpoint) SetBucketVersioning(ctx context.Context, req *pb.SetBucketVersioningRequest) (resp *pb.SetBucketVersioningResponse, err error) {
defer mon.Task()(&ctx)(&err)
endpoint.versionCollector.collect(req.Header.UserAgent, mon.Func().ShortName())
keyInfo, err := endpoint.validateAuth(ctx, req.Header, macaroon.Action{
Op: macaroon.ActionWrite,
Bucket: req.Name,
Time: time.Now(),
})
if err != nil {
return nil, err
}
endpoint.usageTracking(keyInfo, req.Header, fmt.Sprintf("%T", req))
if !endpoint.config.UseBucketLevelObjectVersioningByProject(keyInfo.ProjectID) {
return nil, rpcstatus.Error(rpcstatus.PermissionDenied, "versioning not allowed for this project")
}
if req.Versioning {
err = endpoint.buckets.EnableBucketVersioning(ctx, req.GetName(), keyInfo.ProjectID)
} else {
err = endpoint.buckets.SuspendBucketVersioning(ctx, req.GetName(), keyInfo.ProjectID)
}
if err != nil {
endpoint.log.Error("internal", zap.Error(err))
return nil, rpcstatus.Error(rpcstatus.Internal, "unable to enable versioning for the bucket")
}
return &pb.SetBucketVersioningResponse{}, nil
}
// CreateBucket creates a new bucket.
func (endpoint *Endpoint) CreateBucket(ctx context.Context, req *pb.BucketCreateRequest) (resp *pb.BucketCreateResponse, err error) {
defer mon.Task()(&ctx)(&err)

View File

@ -362,85 +362,72 @@ func TestGetBucketLocation(t *testing.T) {
func TestEnableSuspendBucketVersioning(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
config.Metainfo.UseBucketLevelObjectVersioning = true
},
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
bucketName := "testbucket"
projectID := planet.Uplinks[0].Projects[0].ID
satellite := planet.Satellites[0]
enable := true
suspend := false
deleteBucket := func() error {
err := planet.Satellites[0].API.DB.Buckets().DeleteBucket(ctx, []byte(bucketName), projectID)
err := satellite.API.DB.Buckets().DeleteBucket(ctx, []byte(bucketName), projectID)
return err
}
createBucketVersioning := func(versioning buckets.Versioning) (buckets.Bucket, error) {
return planet.Satellites[0].API.DB.Buckets().CreateBucket(ctx, buckets.Bucket{
for _, tt := range []struct {
name string
initialVersioningState buckets.Versioning
versioning bool
resultantVersioningState buckets.Versioning
}{
{"Enable unsupported bucket fails", buckets.VersioningUnsupported, enable, buckets.VersioningUnsupported},
{"Suspend unsupported bucket fails", buckets.VersioningUnsupported, suspend, buckets.VersioningUnsupported},
{"Enable unversioned bucket succeeds", buckets.Unversioned, enable, buckets.VersioningEnabled},
{"Suspend unversioned bucket fails", buckets.Unversioned, suspend, buckets.Unversioned},
{"Enable enabled bucket succeeds", buckets.VersioningEnabled, enable, buckets.VersioningEnabled},
{"Suspend enabled bucket succeeds", buckets.VersioningEnabled, suspend, buckets.VersioningSuspended},
{"Enable suspended bucket succeeds", buckets.VersioningSuspended, enable, buckets.VersioningEnabled},
{"Suspend suspended bucket succeeds", buckets.VersioningSuspended, suspend, buckets.VersioningSuspended},
} {
t.Run(tt.name, func(t *testing.T) {
defer ctx.Check(deleteBucket)
bucket, err := satellite.API.DB.Buckets().CreateBucket(ctx, buckets.Bucket{
ProjectID: projectID,
Name: bucketName,
Versioning: versioning,
Versioning: tt.initialVersioningState,
})
require.NoError(t, err)
require.NotNil(t, bucket)
setResponse, err := satellite.API.Metainfo.Endpoint.SetBucketVersioning(ctx, &pb.SetBucketVersioningRequest{
Header: &pb.RequestHeader{
ApiKey: planet.Uplinks[0].APIKey[satellite.ID()].SerializeRaw(),
},
Name: []byte(bucketName),
Versioning: tt.versioning,
})
// only 3 error state transitions
if tt.initialVersioningState == buckets.VersioningUnsupported ||
(tt.initialVersioningState == buckets.Unversioned && tt.versioning == suspend) {
require.Error(t, err)
} else {
require.NoError(t, err)
}
require.Empty(t, setResponse)
getResponse, err := satellite.API.Metainfo.Endpoint.GetBucketVersioning(ctx, &pb.GetBucketVersioningRequest{
Header: &pb.RequestHeader{
ApiKey: planet.Uplinks[0].APIKey[satellite.ID()].SerializeRaw(),
},
Name: []byte(bucketName),
})
require.NoError(t, err)
require.Equal(t, tt.resultantVersioningState, buckets.Versioning(getResponse.Versioning))
})
}
t.Run("Enable versioning unsupported bucket fails", func(t *testing.T) {
defer ctx.Check(deleteBucket)
bucket, err := createBucketVersioning(buckets.VersioningUnsupported)
require.NoError(t, err)
err = planet.Satellites[0].API.DB.Buckets().EnableBucketVersioning(ctx, []byte(bucket.Name), bucket.ProjectID)
require.Error(t, err)
})
t.Run("Suspend versioning unsupported bucket fails", func(t *testing.T) {
defer ctx.Check(deleteBucket)
bucket, err := createBucketVersioning(buckets.VersioningUnsupported)
require.NoError(t, err)
err = planet.Satellites[0].API.DB.Buckets().SuspendBucketVersioning(ctx, []byte(bucket.Name), bucket.ProjectID)
require.Error(t, err)
})
t.Run("Enable unversioned bucket succeeds", func(t *testing.T) {
defer ctx.Check(deleteBucket)
bucket, err := createBucketVersioning(buckets.Unversioned)
require.NoError(t, err)
err = planet.Satellites[0].API.DB.Buckets().EnableBucketVersioning(ctx, []byte(bucket.Name), bucket.ProjectID)
require.NoError(t, err)
})
t.Run("Suspend unversioned bucket fails", func(t *testing.T) {
defer ctx.Check(deleteBucket)
bucket, err := createBucketVersioning(buckets.Unversioned)
require.NoError(t, err)
err = planet.Satellites[0].API.DB.Buckets().SuspendBucketVersioning(ctx, []byte(bucket.Name), bucket.ProjectID)
require.Error(t, err)
})
t.Run("Enable versioning enabled bucket succeeds", func(t *testing.T) {
defer ctx.Check(deleteBucket)
bucket, err := createBucketVersioning(buckets.VersioningEnabled)
require.NoError(t, err)
err = planet.Satellites[0].API.DB.Buckets().EnableBucketVersioning(ctx, []byte(bucket.Name), bucket.ProjectID)
require.NoError(t, err)
})
t.Run("Suspend versioning enabled bucket succeeds", func(t *testing.T) {
defer ctx.Check(deleteBucket)
bucket, err := createBucketVersioning(buckets.VersioningEnabled)
require.NoError(t, err)
err = planet.Satellites[0].API.DB.Buckets().SuspendBucketVersioning(ctx, []byte(bucket.Name), bucket.ProjectID)
require.NoError(t, err)
})
t.Run("Enable versioning suspended bucket succeeds", func(t *testing.T) {
defer ctx.Check(deleteBucket)
bucket, err := createBucketVersioning(buckets.VersioningSuspended)
require.NoError(t, err)
err = planet.Satellites[0].API.DB.Buckets().EnableBucketVersioning(ctx, []byte(bucket.Name), bucket.ProjectID)
require.NoError(t, err)
})
t.Run("Suspend versioning suspended bucket succeeds", func(t *testing.T) {
defer ctx.Check(deleteBucket)
bucket, err := createBucketVersioning(buckets.VersioningSuspended)
require.NoError(t, err)
err = planet.Satellites[0].API.DB.Buckets().SuspendBucketVersioning(ctx, []byte(bucket.Name), bucket.ProjectID)
require.NoError(t, err)
})
})
}

View File

@ -224,6 +224,20 @@ func TestEndpoint_NoStorageNodes(t *testing.T) {
})
assertInvalidArgument(t, err, false)
_, err = planet.Satellites[0].Metainfo.Endpoint.GetBucketVersioning(ctx, &pb.GetBucketVersioningRequest{
Header: &pb.RequestHeader{
ApiKey: []byte(invalidAPIKey),
},
})
assertInvalidArgument(t, err, false)
_, err = planet.Satellites[0].Metainfo.Endpoint.SetBucketVersioning(ctx, &pb.SetBucketVersioningRequest{
Header: &pb.RequestHeader{
ApiKey: []byte(invalidAPIKey),
},
})
assertInvalidArgument(t, err, false)
_, err = client.GetObject(ctx, metaclient.GetObjectParams{})
assertInvalidArgument(t, err, false)