satellite/metainfo: add UseBucketLevelObjectVersioning by project
This small feature flag will give us the ability to test object versioning for specific projects without enabling it globally. Change-Id: I78301f071b7b8079dd1bd4a561fce0800ce9f074
This commit is contained in:
parent
dc5ae6f4f6
commit
539b32d01e
@ -151,6 +151,10 @@ type Config struct {
|
|||||||
UsePendingObjectsTableProjects []string `help:"list of projects which will have UsePendingObjectsTable feature flag enabled" default:"" hidden:"true"`
|
UsePendingObjectsTableProjects []string `help:"list of projects which will have UsePendingObjectsTable feature flag enabled" default:"" hidden:"true"`
|
||||||
UsePendingObjectsTableRollout int `help:"percentage (0-100) of projects which should have this feature enabled" default:"0" hidden:"true"`
|
UsePendingObjectsTableRollout int `help:"percentage (0-100) of projects which should have this feature enabled" default:"0" hidden:"true"`
|
||||||
|
|
||||||
|
UseBucketLevelObjectVersioning bool `help:"enable the use of bucket level object versioning" default:"false"`
|
||||||
|
// flag to simplify testing by enabling bucket level versioning feature only for specific projects
|
||||||
|
UseBucketLevelObjectVersioningProjects []string `help:"list of projects which will have UseBucketLevelObjectVersioning feature flag enabled" default:"" hidden:"true"`
|
||||||
|
|
||||||
// TODO remove when we benchmarking are done and decision is made.
|
// TODO remove when we benchmarking are done and decision is made.
|
||||||
TestListingQuery bool `default:"false" help:"test the new query for non-recursive listing"`
|
TestListingQuery bool `default:"false" help:"test the new query for non-recursive listing"`
|
||||||
}
|
}
|
||||||
@ -172,6 +176,8 @@ type ExtendedConfig struct {
|
|||||||
|
|
||||||
usePendingObjectsTableProjects []uuid.UUID
|
usePendingObjectsTableProjects []uuid.UUID
|
||||||
usePendingObjectsTableRolloutCursor uuid.UUID
|
usePendingObjectsTableRolloutCursor uuid.UUID
|
||||||
|
|
||||||
|
useBucketLevelObjectVersioningProjects []uuid.UUID
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewExtendedConfig creates new instance of extended config.
|
// NewExtendedConfig creates new instance of extended config.
|
||||||
@ -190,6 +196,14 @@ func NewExtendedConfig(config Config) (_ ExtendedConfig, err error) {
|
|||||||
return ExtendedConfig{}, err
|
return ExtendedConfig{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for _, projectIDString := range config.UseBucketLevelObjectVersioningProjects {
|
||||||
|
projectID, err := uuid.FromString(projectIDString)
|
||||||
|
if err != nil {
|
||||||
|
return ExtendedConfig{}, err
|
||||||
|
}
|
||||||
|
extendedConfig.useBucketLevelObjectVersioningProjects = append(extendedConfig.useBucketLevelObjectVersioningProjects, projectID)
|
||||||
|
}
|
||||||
|
|
||||||
return extendedConfig, nil
|
return extendedConfig, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -214,6 +228,21 @@ func (ec ExtendedConfig) UsePendingObjectsTableByProject(projectID uuid.UUID) bo
|
|||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// UseBucketLevelObjectVersioningByProject checks if UseBucketLevelObjectVersioning should be enabled for specific project.
|
||||||
|
func (ec ExtendedConfig) UseBucketLevelObjectVersioningByProject(projectID uuid.UUID) bool {
|
||||||
|
// if its globally enabled don't look at projects
|
||||||
|
if ec.UseBucketLevelObjectVersioning {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
for _, p := range ec.useBucketLevelObjectVersioningProjects {
|
||||||
|
if p == projectID {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
func createRolloutCursor(percentage int) (uuid.UUID, error) {
|
func createRolloutCursor(percentage int) (uuid.UUID, error) {
|
||||||
if percentage <= 0 {
|
if percentage <= 0 {
|
||||||
return uuid.UUID{}, nil
|
return uuid.UUID{}, nil
|
||||||
|
@ -180,6 +180,32 @@ func TestExtendedConfig_UsePendingObjectsTableRollout(t *testing.T) {
|
|||||||
require.True(t, config.UsePendingObjectsTableByProject(makeUUID("FFFFFFFF-0000-0000-0000-000000000000")))
|
require.True(t, config.UsePendingObjectsTableByProject(makeUUID("FFFFFFFF-0000-0000-0000-000000000000")))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestExtendedConfig_UseBucketLevelObjectVersioning(t *testing.T) {
|
||||||
|
projectA := testrand.UUID()
|
||||||
|
projectB := testrand.UUID()
|
||||||
|
projectC := testrand.UUID()
|
||||||
|
config, err := metainfo.NewExtendedConfig(metainfo.Config{
|
||||||
|
UseBucketLevelObjectVersioningProjects: []string{
|
||||||
|
projectA.String(),
|
||||||
|
projectB.String(),
|
||||||
|
},
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
require.True(t, config.UseBucketLevelObjectVersioningByProject(projectA))
|
||||||
|
require.True(t, config.UseBucketLevelObjectVersioningByProject(projectB))
|
||||||
|
require.False(t, config.UseBucketLevelObjectVersioningByProject(projectC))
|
||||||
|
|
||||||
|
config, err = metainfo.NewExtendedConfig(metainfo.Config{
|
||||||
|
UsePendingObjectsTable: false,
|
||||||
|
UseBucketLevelObjectVersioningProjects: []string{
|
||||||
|
"01000000-0000-0000-0000-000000000000",
|
||||||
|
},
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.True(t, config.UseBucketLevelObjectVersioningByProject(uuid.UUID{1}))
|
||||||
|
}
|
||||||
|
|
||||||
func makeUUID(uuidString string) uuid.UUID {
|
func makeUUID(uuidString string) uuid.UUID {
|
||||||
value, _ := uuid.FromString(uuidString)
|
value, _ := uuid.FromString(uuidString)
|
||||||
return value
|
return value
|
||||||
|
@ -143,6 +143,9 @@ func (endpoint *Endpoint) CreateBucket(ctx context.Context, req *pb.BucketCreate
|
|||||||
}
|
}
|
||||||
bucketReq.Placement = project.DefaultPlacement
|
bucketReq.Placement = project.DefaultPlacement
|
||||||
|
|
||||||
|
if endpoint.config.UseBucketLevelObjectVersioningByProject(keyInfo.ProjectID) {
|
||||||
|
bucketReq.Versioning = buckets.Unversioned
|
||||||
|
}
|
||||||
bucket, err := endpoint.buckets.CreateBucket(ctx, bucketReq)
|
bucket, err := endpoint.buckets.CreateBucket(ctx, bucketReq)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if buckets.ErrBucketAlreadyExists.Has(err) {
|
if buckets.ErrBucketAlreadyExists.Has(err) {
|
||||||
|
@ -363,66 +363,122 @@ func TestEnableSuspendBucketVersioning(t *testing.T) {
|
|||||||
testplanet.Run(t, testplanet.Config{
|
testplanet.Run(t, testplanet.Config{
|
||||||
SatelliteCount: 1, UplinkCount: 1,
|
SatelliteCount: 1, UplinkCount: 1,
|
||||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||||
|
bucketName := "testbucket"
|
||||||
projectID := planet.Uplinks[0].Projects[0].ID
|
projectID := planet.Uplinks[0].Projects[0].ID
|
||||||
// create new buckets with each possible versioning state
|
|
||||||
_, err := planet.Satellites[0].API.DB.Buckets().CreateBucket(ctx, buckets.Bucket{
|
|
||||||
ProjectID: projectID,
|
|
||||||
Name: "unsupported",
|
|
||||||
Placement: storj.EU,
|
|
||||||
Versioning: buckets.VersioningUnsupported,
|
|
||||||
})
|
|
||||||
require.NoError(t, err)
|
|
||||||
_, err = planet.Satellites[0].API.DB.Buckets().CreateBucket(ctx, buckets.Bucket{
|
|
||||||
ProjectID: projectID,
|
|
||||||
Name: "unversioned",
|
|
||||||
Placement: storj.EU,
|
|
||||||
Versioning: buckets.Unversioned,
|
|
||||||
})
|
|
||||||
require.NoError(t, err)
|
|
||||||
_, err = planet.Satellites[0].API.DB.Buckets().CreateBucket(ctx, buckets.Bucket{
|
|
||||||
ProjectID: projectID,
|
|
||||||
Name: "versioning_enabled",
|
|
||||||
Placement: storj.EU,
|
|
||||||
Versioning: buckets.VersioningEnabled,
|
|
||||||
})
|
|
||||||
require.NoError(t, err)
|
|
||||||
_, err = planet.Satellites[0].API.DB.Buckets().CreateBucket(ctx, buckets.Bucket{
|
|
||||||
ProjectID: projectID,
|
|
||||||
Name: "versioning_suspended",
|
|
||||||
Placement: storj.EU,
|
|
||||||
Versioning: buckets.VersioningSuspended,
|
|
||||||
})
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
t.Run("EnableBucketSucceeds", func(t *testing.T) {
|
deleteBucket := func() error {
|
||||||
err = planet.Satellites[0].API.DB.Buckets().EnableBucketVersioning(ctx, []byte("unversioned"), projectID)
|
err := planet.Satellites[0].API.DB.Buckets().DeleteBucket(ctx, []byte(bucketName), projectID)
|
||||||
require.NoError(t, err)
|
return err
|
||||||
err = planet.Satellites[0].API.DB.Buckets().EnableBucketVersioning(ctx, []byte("versioning_enabled"), projectID)
|
}
|
||||||
require.NoError(t, err)
|
createBucketVersioning := func(versioning buckets.Versioning) (buckets.Bucket, error) {
|
||||||
err = planet.Satellites[0].API.DB.Buckets().EnableBucketVersioning(ctx, []byte("versioning_suspended"), projectID)
|
return planet.Satellites[0].API.DB.Buckets().CreateBucket(ctx, buckets.Bucket{
|
||||||
require.NoError(t, err)
|
ProjectID: projectID,
|
||||||
})
|
Name: bucketName,
|
||||||
|
Versioning: versioning,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
t.Run("SuspendBucketSucceeds", func(t *testing.T) {
|
t.Run("Enable versioning unsupported bucket fails", func(t *testing.T) {
|
||||||
err = planet.Satellites[0].API.DB.Buckets().EnableBucketVersioning(ctx, []byte("versioning_enabled"), projectID)
|
defer ctx.Check(deleteBucket)
|
||||||
|
bucket, err := createBucketVersioning(buckets.VersioningUnsupported)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
err = planet.Satellites[0].API.DB.Buckets().EnableBucketVersioning(ctx, []byte("versioning_suspended"), projectID)
|
err = planet.Satellites[0].API.DB.Buckets().EnableBucketVersioning(ctx, []byte(bucket.Name), bucket.ProjectID)
|
||||||
require.NoError(t, err)
|
|
||||||
})
|
|
||||||
|
|
||||||
t.Run("EnableBucketVersioning fails due to invalid transition", func(t *testing.T) {
|
|
||||||
// attempt enable bucket versioning on unsupported bucket
|
|
||||||
err = planet.Satellites[0].API.DB.Buckets().EnableBucketVersioning(ctx, []byte("unsupported"), projectID)
|
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("SuspendBucketVersioning fails due to invalid transition", func(t *testing.T) {
|
t.Run("Suspend versioning unsupported bucket fails", func(t *testing.T) {
|
||||||
// attempt suspend bucket versioning on unsupported
|
defer ctx.Check(deleteBucket)
|
||||||
err = planet.Satellites[0].API.DB.Buckets().SuspendBucketVersioning(ctx, []byte("unsupported"), projectID)
|
bucket, err := createBucketVersioning(buckets.VersioningUnsupported)
|
||||||
|
require.NoError(t, err)
|
||||||
|
err = planet.Satellites[0].API.DB.Buckets().SuspendBucketVersioning(ctx, []byte(bucket.Name), bucket.ProjectID)
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
// attempt suspend bucket versioning on unversioned
|
})
|
||||||
err = planet.Satellites[0].API.DB.Buckets().SuspendBucketVersioning(ctx, []byte("unversioned"), projectID)
|
|
||||||
|
t.Run("Enable unversioned bucket succeeds", func(t *testing.T) {
|
||||||
|
defer ctx.Check(deleteBucket)
|
||||||
|
bucket, err := createBucketVersioning(buckets.Unversioned)
|
||||||
|
require.NoError(t, err)
|
||||||
|
err = planet.Satellites[0].API.DB.Buckets().EnableBucketVersioning(ctx, []byte(bucket.Name), bucket.ProjectID)
|
||||||
|
require.NoError(t, err)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("Suspend unversioned bucket fails", func(t *testing.T) {
|
||||||
|
defer ctx.Check(deleteBucket)
|
||||||
|
bucket, err := createBucketVersioning(buckets.Unversioned)
|
||||||
|
require.NoError(t, err)
|
||||||
|
err = planet.Satellites[0].API.DB.Buckets().SuspendBucketVersioning(ctx, []byte(bucket.Name), bucket.ProjectID)
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
t.Run("Enable versioning enabled bucket succeeds", func(t *testing.T) {
|
||||||
|
defer ctx.Check(deleteBucket)
|
||||||
|
bucket, err := createBucketVersioning(buckets.VersioningEnabled)
|
||||||
|
require.NoError(t, err)
|
||||||
|
err = planet.Satellites[0].API.DB.Buckets().EnableBucketVersioning(ctx, []byte(bucket.Name), bucket.ProjectID)
|
||||||
|
require.NoError(t, err)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("Suspend versioning enabled bucket succeeds", func(t *testing.T) {
|
||||||
|
defer ctx.Check(deleteBucket)
|
||||||
|
bucket, err := createBucketVersioning(buckets.VersioningEnabled)
|
||||||
|
require.NoError(t, err)
|
||||||
|
err = planet.Satellites[0].API.DB.Buckets().SuspendBucketVersioning(ctx, []byte(bucket.Name), bucket.ProjectID)
|
||||||
|
require.NoError(t, err)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("Enable versioning suspended bucket succeeds", func(t *testing.T) {
|
||||||
|
defer ctx.Check(deleteBucket)
|
||||||
|
bucket, err := createBucketVersioning(buckets.VersioningSuspended)
|
||||||
|
require.NoError(t, err)
|
||||||
|
err = planet.Satellites[0].API.DB.Buckets().EnableBucketVersioning(ctx, []byte(bucket.Name), bucket.ProjectID)
|
||||||
|
require.NoError(t, err)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("Suspend versioning suspended bucket succeeds", func(t *testing.T) {
|
||||||
|
defer ctx.Check(deleteBucket)
|
||||||
|
bucket, err := createBucketVersioning(buckets.VersioningSuspended)
|
||||||
|
require.NoError(t, err)
|
||||||
|
err = planet.Satellites[0].API.DB.Buckets().SuspendBucketVersioning(ctx, []byte(bucket.Name), bucket.ProjectID)
|
||||||
|
require.NoError(t, err)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestEnableSuspendBucketVersioningFeature(t *testing.T) {
|
||||||
|
testplanet.Run(t, testplanet.Config{
|
||||||
|
SatelliteCount: 1, UplinkCount: 1,
|
||||||
|
Reconfigure: testplanet.Reconfigure{
|
||||||
|
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
|
||||||
|
config.Metainfo.UseBucketLevelObjectVersioning = true
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||||
|
satelliteSys := planet.Satellites[0]
|
||||||
|
apiKey := planet.Uplinks[0].APIKey[satelliteSys.ID()]
|
||||||
|
projectID := planet.Uplinks[0].Projects[0].ID
|
||||||
|
|
||||||
|
_, err := satelliteSys.Metainfo.Endpoint.CreateBucket(ctx, &pb.BucketCreateRequest{
|
||||||
|
Header: &pb.RequestHeader{
|
||||||
|
ApiKey: apiKey.SerializeRaw(),
|
||||||
|
},
|
||||||
|
Name: []byte("bucket1"),
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// verify suspend unversioned bucket fails
|
||||||
|
err = planet.Satellites[0].API.DB.Buckets().SuspendBucketVersioning(ctx, []byte("bucket1"), projectID)
|
||||||
|
require.Error(t, err)
|
||||||
|
|
||||||
|
// verify enable unversioned bucket succeeds
|
||||||
|
err = planet.Satellites[0].API.DB.Buckets().EnableBucketVersioning(ctx, []byte("bucket1"), projectID)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// verify suspend enabled bucket succeeds
|
||||||
|
err = planet.Satellites[0].API.DB.Buckets().SuspendBucketVersioning(ctx, []byte("bucket1"), projectID)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// verify re-enable suspended bucket succeeds
|
||||||
|
err = planet.Satellites[0].API.DB.Buckets().EnableBucketVersioning(ctx, []byte("bucket1"), projectID)
|
||||||
|
require.NoError(t, err)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -148,9 +148,9 @@ func (db *bucketsDB) SuspendBucketVersioning(ctx context.Context, bucketName []b
|
|||||||
dbx.BucketMetainfo_ProjectId(projectID[:]),
|
dbx.BucketMetainfo_ProjectId(projectID[:]),
|
||||||
dbx.BucketMetainfo_Name(bucketName),
|
dbx.BucketMetainfo_Name(bucketName),
|
||||||
// only suspend versioning if current versioning state is enabled, or suspended.
|
// only suspend versioning if current versioning state is enabled, or suspended.
|
||||||
dbx.BucketMetainfo_Versioning(int(buckets.Unversioned)),
|
dbx.BucketMetainfo_Versioning(int(buckets.VersioningEnabled)),
|
||||||
dbx.BucketMetainfo_Update_Fields{
|
dbx.BucketMetainfo_Update_Fields{
|
||||||
Versioning: dbx.BucketMetainfo_Versioning(int(buckets.VersioningEnabled)),
|
Versioning: dbx.BucketMetainfo_Versioning(int(buckets.VersioningSuspended)),
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return buckets.ErrBucket.Wrap(err)
|
return buckets.ErrBucket.Wrap(err)
|
||||||
|
3
scripts/testdata/satellite-config.yaml.lock
vendored
3
scripts/testdata/satellite-config.yaml.lock
vendored
@ -748,6 +748,9 @@ identity.key-path: /root/.local/share/storj/identity/satellite/identity.key
|
|||||||
# how often we can upload to the single object (the same location) per API instance
|
# how often we can upload to the single object (the same location) per API instance
|
||||||
# metainfo.upload-limiter.single-object-limit: 1s
|
# metainfo.upload-limiter.single-object-limit: 1s
|
||||||
|
|
||||||
|
# enable the use of bucket level object versioning
|
||||||
|
# metainfo.use-bucket-level-object-versioning: false
|
||||||
|
|
||||||
# enable new flow for upload which is using pending_objects table
|
# enable new flow for upload which is using pending_objects table
|
||||||
# metainfo.use-pending-objects-table: false
|
# metainfo.use-pending-objects-table: false
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user