satellite/metainfo: drop MultipleVersions config flag
This flag was in general one time switch to enable versions internally. New we can remove it as it makes code more complex. Change-Id: I740b6e8fae80d5fac51d9425793b02678357490e
This commit is contained in:
parent
ae5947327b
commit
54b6e1614a
@ -491,7 +491,6 @@ func (planet *Planet) newSatellite(ctx context.Context, prefix string, index int
|
|||||||
MinPartSize: config.Metainfo.MinPartSize,
|
MinPartSize: config.Metainfo.MinPartSize,
|
||||||
MaxNumberOfParts: config.Metainfo.MaxNumberOfParts,
|
MaxNumberOfParts: config.Metainfo.MaxNumberOfParts,
|
||||||
ServerSideCopy: config.Metainfo.ServerSideCopy,
|
ServerSideCopy: config.Metainfo.ServerSideCopy,
|
||||||
MultipleVersions: config.Metainfo.MultipleVersions,
|
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errs.Wrap(err)
|
return nil, errs.Wrap(err)
|
||||||
|
@ -2733,16 +2733,7 @@ func TestCommitObject(t *testing.T) {
|
|||||||
},
|
},
|
||||||
}.Check(ctx, t, db)
|
}.Check(ctx, t, db)
|
||||||
})
|
})
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestCommitObject_MultipleVersions(t *testing.T) {
|
|
||||||
metabasetest.RunWithConfig(t, metabase.Config{
|
|
||||||
ApplicationName: "satellite-test",
|
|
||||||
MinPartSize: 5 * memory.MiB,
|
|
||||||
MaxNumberOfParts: 1000,
|
|
||||||
MultipleVersions: true,
|
|
||||||
}, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) {
|
|
||||||
t.Run("OnDelete", func(t *testing.T) {
|
t.Run("OnDelete", func(t *testing.T) {
|
||||||
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||||
|
|
||||||
@ -3205,46 +3196,3 @@ func TestCommitObjectWithIncorrectAmountOfParts(t *testing.T) {
|
|||||||
})
|
})
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestMultipleVersionsBug(t *testing.T) {
|
|
||||||
// test simulates case when we have different configurations for different
|
|
||||||
// API instances in the system (multiple versions flag)
|
|
||||||
metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) {
|
|
||||||
obj := metabasetest.RandObjectStream()
|
|
||||||
|
|
||||||
// simulates code WITHOUT multiple versions flag enabled
|
|
||||||
obj.Version = metabase.DefaultVersion
|
|
||||||
_, err := db.BeginObjectExactVersion(ctx, metabase.BeginObjectExactVersion{
|
|
||||||
ObjectStream: obj,
|
|
||||||
Encryption: metabasetest.DefaultEncryption,
|
|
||||||
})
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
// this commit will be run WITH multiple versions flag enabled
|
|
||||||
_, err = db.CommitObject(ctx, metabase.CommitObject{
|
|
||||||
ObjectStream: obj,
|
|
||||||
})
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
// start overriding object
|
|
||||||
|
|
||||||
// simulates code WITH multiple versions flag enabled
|
|
||||||
obj.Version = metabase.NextVersion
|
|
||||||
pendingObject, err := db.BeginObjectNextVersion(ctx, metabase.BeginObjectNextVersion{
|
|
||||||
ObjectStream: obj,
|
|
||||||
Encryption: metabasetest.DefaultEncryption,
|
|
||||||
})
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
obj.Version = pendingObject.Version
|
|
||||||
db.TestingEnableMultipleVersions(false)
|
|
||||||
_, err = db.CommitObject(ctx, metabase.CommitObject{
|
|
||||||
ObjectStream: obj,
|
|
||||||
})
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
objects, err := db.TestingAllObjects(ctx)
|
|
||||||
require.NoError(t, err)
|
|
||||||
require.Equal(t, 1, len(objects))
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
@ -111,9 +111,6 @@ func (db *DB) FinishCopyObject(ctx context.Context, opts FinishCopyObject) (obje
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if !db.config.MultipleVersions {
|
|
||||||
nextAvailableVersion = opts.Version
|
|
||||||
}
|
|
||||||
if objectAtDestination != nil && objectAtDestination.StreamID == sourceObject.StreamID {
|
if objectAtDestination != nil && objectAtDestination.StreamID == sourceObject.StreamID {
|
||||||
newObject = sourceObject
|
newObject = sourceObject
|
||||||
return nil
|
return nil
|
||||||
@ -212,10 +209,7 @@ func (db *DB) FinishCopyObject(ctx context.Context, opts FinishCopyObject) (obje
|
|||||||
}
|
}
|
||||||
|
|
||||||
if objectAtDestination != nil {
|
if objectAtDestination != nil {
|
||||||
version := opts.Version
|
version := objectAtDestination.Version
|
||||||
if db.config.MultipleVersions {
|
|
||||||
version = objectAtDestination.Version
|
|
||||||
}
|
|
||||||
deletedObjects, err := db.deleteObjectExactVersionServerSideCopy(
|
deletedObjects, err := db.deleteObjectExactVersionServerSideCopy(
|
||||||
ctx, DeleteObjectExactVersion{
|
ctx, DeleteObjectExactVersion{
|
||||||
Version: version,
|
Version: version,
|
||||||
|
@ -1066,7 +1066,6 @@ func TestFinishCopyObject(t *testing.T) {
|
|||||||
|
|
||||||
for _, tc := range testCases {
|
for _, tc := range testCases {
|
||||||
metabasetest.DeleteAll{}.Check(ctx, t, db)
|
metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||||
db.TestingEnableMultipleVersions(false)
|
|
||||||
sourceObjStream.BucketName = tc.Bucket
|
sourceObjStream.BucketName = tc.Bucket
|
||||||
sourceObjStream.ObjectKey = tc.Key
|
sourceObjStream.ObjectKey = tc.Key
|
||||||
destinationObjStream.BucketName = tc.NewBucket
|
destinationObjStream.BucketName = tc.NewBucket
|
||||||
@ -1138,7 +1137,6 @@ func TestFinishCopyObject(t *testing.T) {
|
|||||||
}.Run(ctx, t, db, destinationObjStream, 0)
|
}.Run(ctx, t, db, destinationObjStream, 0)
|
||||||
}
|
}
|
||||||
|
|
||||||
db.TestingEnableMultipleVersions(true)
|
|
||||||
copyObj, expectedOriginalSegments, _ := metabasetest.CreateObjectCopy{
|
copyObj, expectedOriginalSegments, _ := metabasetest.CreateObjectCopy{
|
||||||
OriginalObject: sourceObj,
|
OriginalObject: sourceObj,
|
||||||
CopyObjectStream: &destinationObjStream,
|
CopyObjectStream: &destinationObjStream,
|
||||||
|
@ -36,7 +36,6 @@ type Config struct {
|
|||||||
// TODO remove this flag when server-side copy implementation will be finished
|
// TODO remove this flag when server-side copy implementation will be finished
|
||||||
ServerSideCopy bool
|
ServerSideCopy bool
|
||||||
ServerSideCopyDisabled bool
|
ServerSideCopyDisabled bool
|
||||||
MultipleVersions bool
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// DB implements a database for storing objects and segments.
|
// DB implements a database for storing objects and segments.
|
||||||
@ -684,9 +683,3 @@ func limitedAsOfSystemTime(impl dbutil.Implementation, now, baseline time.Time,
|
|||||||
}
|
}
|
||||||
return impl.AsOfSystemTime(baseline)
|
return impl.AsOfSystemTime(baseline)
|
||||||
}
|
}
|
||||||
|
|
||||||
// TestingEnableMultipleVersions enables or disables the use of multiple versions (for tests).
|
|
||||||
// Will be removed when multiple versions is enabled in production.
|
|
||||||
func (db *DB) TestingEnableMultipleVersions(enabled bool) {
|
|
||||||
db.config.MultipleVersions = enabled
|
|
||||||
}
|
|
||||||
|
@ -89,7 +89,6 @@ func Run(t *testing.T, fn func(ctx *testcontext.Context, t *testing.T, db *metab
|
|||||||
MaxNumberOfParts: config.MaxNumberOfParts,
|
MaxNumberOfParts: config.MaxNumberOfParts,
|
||||||
ServerSideCopy: config.ServerSideCopy,
|
ServerSideCopy: config.ServerSideCopy,
|
||||||
ServerSideCopyDisabled: config.ServerSideCopyDisabled,
|
ServerSideCopyDisabled: config.ServerSideCopyDisabled,
|
||||||
MultipleVersions: config.MultipleVersions,
|
|
||||||
}, fn)
|
}, fn)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -156,10 +156,9 @@ func (db *DB) FinishMoveObject(ctx context.Context, opts FinishMoveObject) (err
|
|||||||
err = txutil.WithTx(ctx, db.db, nil, func(ctx context.Context, tx tagsql.Tx) (err error) {
|
err = txutil.WithTx(ctx, db.db, nil, func(ctx context.Context, tx tagsql.Tx) (err error) {
|
||||||
targetVersion := opts.Version
|
targetVersion := opts.Version
|
||||||
|
|
||||||
if db.config.MultipleVersions {
|
useNewVersion := false
|
||||||
useNewVersion := false
|
highestVersion := Version(0)
|
||||||
highestVersion := Version(0)
|
err = withRows(tx.QueryContext(ctx, `
|
||||||
err = withRows(tx.QueryContext(ctx, `
|
|
||||||
SELECT version, status
|
SELECT version, status
|
||||||
FROM objects
|
FROM objects
|
||||||
WHERE
|
WHERE
|
||||||
@ -168,32 +167,31 @@ func (db *DB) FinishMoveObject(ctx context.Context, opts FinishMoveObject) (err
|
|||||||
object_key = $3
|
object_key = $3
|
||||||
ORDER BY version ASC
|
ORDER BY version ASC
|
||||||
`, opts.ProjectID, []byte(opts.NewBucket), opts.NewEncryptedObjectKey))(func(rows tagsql.Rows) error {
|
`, opts.ProjectID, []byte(opts.NewBucket), opts.NewEncryptedObjectKey))(func(rows tagsql.Rows) error {
|
||||||
for rows.Next() {
|
for rows.Next() {
|
||||||
var status ObjectStatus
|
var status ObjectStatus
|
||||||
var version Version
|
var version Version
|
||||||
|
|
||||||
err = rows.Scan(&version, &status)
|
err = rows.Scan(&version, &status)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return Error.New("failed to scan objects: %w", err)
|
return Error.New("failed to scan objects: %w", err)
|
||||||
}
|
|
||||||
|
|
||||||
if status == Committed {
|
|
||||||
return ErrObjectAlreadyExists.New("")
|
|
||||||
} else if status == Pending && version == opts.Version {
|
|
||||||
useNewVersion = true
|
|
||||||
}
|
|
||||||
highestVersion = version
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
if status == Committed {
|
||||||
})
|
return ErrObjectAlreadyExists.New("")
|
||||||
if err != nil {
|
} else if status == Pending && version == opts.Version {
|
||||||
return Error.Wrap(err)
|
useNewVersion = true
|
||||||
|
}
|
||||||
|
highestVersion = version
|
||||||
}
|
}
|
||||||
|
|
||||||
if useNewVersion {
|
return nil
|
||||||
targetVersion = highestVersion + 1
|
})
|
||||||
}
|
if err != nil {
|
||||||
|
return Error.Wrap(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if useNewVersion {
|
||||||
|
targetVersion = highestVersion + 1
|
||||||
}
|
}
|
||||||
|
|
||||||
updateObjectsQuery := `
|
updateObjectsQuery := `
|
||||||
|
@ -527,13 +527,6 @@ func TestFinishMoveObject(t *testing.T) {
|
|||||||
}.Check(ctx, t, db)
|
}.Check(ctx, t, db)
|
||||||
})
|
})
|
||||||
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestFinishMoveObject_MultipleVersions(t *testing.T) {
|
|
||||||
metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) {
|
|
||||||
db.TestingEnableMultipleVersions(true)
|
|
||||||
|
|
||||||
t.Run("finish move object - different versions reject", func(t *testing.T) {
|
t.Run("finish move object - different versions reject", func(t *testing.T) {
|
||||||
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||||
|
|
||||||
|
@ -139,7 +139,6 @@ type Config struct {
|
|||||||
// TODO remove this flag when server-side copy implementation will be finished
|
// TODO remove this flag when server-side copy implementation will be finished
|
||||||
ServerSideCopy bool `help:"enable code for server-side copy, deprecated. please leave this to true." default:"true"`
|
ServerSideCopy bool `help:"enable code for server-side copy, deprecated. please leave this to true." default:"true"`
|
||||||
ServerSideCopyDisabled bool `help:"disable already enabled server-side copy. this is because once server side copy is enabled, delete code should stay changed, even if you want to disable server side copy" default:"false"`
|
ServerSideCopyDisabled bool `help:"disable already enabled server-side copy. this is because once server side copy is enabled, delete code should stay changed, even if you want to disable server side copy" default:"false"`
|
||||||
MultipleVersions bool `help:"feature flag to enable using multple objects versions in the system internally" default:"true"`
|
|
||||||
// TODO remove when we benchmarking are done and decision is made.
|
// TODO remove when we benchmarking are done and decision is made.
|
||||||
TestListingQuery bool `default:"false" help:"test the new query for non-recursive listing"`
|
TestListingQuery bool `default:"false" help:"test the new query for non-recursive listing"`
|
||||||
}
|
}
|
||||||
@ -151,6 +150,5 @@ func (c Config) Metabase(applicationName string) metabase.Config {
|
|||||||
MinPartSize: c.MinPartSize,
|
MinPartSize: c.MinPartSize,
|
||||||
MaxNumberOfParts: c.MaxNumberOfParts,
|
MaxNumberOfParts: c.MaxNumberOfParts,
|
||||||
ServerSideCopy: c.ServerSideCopy,
|
ServerSideCopy: c.ServerSideCopy,
|
||||||
MultipleVersions: c.MultipleVersions,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -88,30 +88,6 @@ func (endpoint *Endpoint) BeginObject(ctx context.Context, req *pb.ObjectBeginRe
|
|||||||
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
|
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
if !endpoint.config.MultipleVersions {
|
|
||||||
if canDelete {
|
|
||||||
_, err = endpoint.DeleteObjectAnyStatus(ctx, metabase.ObjectLocation{
|
|
||||||
ProjectID: keyInfo.ProjectID,
|
|
||||||
BucketName: string(req.Bucket),
|
|
||||||
ObjectKey: metabase.ObjectKey(req.EncryptedObjectKey),
|
|
||||||
})
|
|
||||||
if err != nil && !storj.ErrObjectNotFound.Has(err) {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
_, err = endpoint.metabase.GetObjectLastCommitted(ctx, metabase.GetObjectLastCommitted{
|
|
||||||
ObjectLocation: metabase.ObjectLocation{
|
|
||||||
ProjectID: keyInfo.ProjectID,
|
|
||||||
BucketName: string(req.Bucket),
|
|
||||||
ObjectKey: metabase.ObjectKey(req.EncryptedObjectKey),
|
|
||||||
},
|
|
||||||
})
|
|
||||||
if err == nil {
|
|
||||||
return nil, rpcstatus.Error(rpcstatus.PermissionDenied, "Unauthorized API credentials")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := endpoint.ensureAttribution(ctx, req.Header, keyInfo, req.Bucket, nil); err != nil {
|
if err := endpoint.ensureAttribution(ctx, req.Header, keyInfo, req.Bucket, nil); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -141,40 +117,21 @@ func (endpoint *Endpoint) BeginObject(ctx context.Context, req *pb.ObjectBeginRe
|
|||||||
nonce = req.EncryptedMetadataNonce[:]
|
nonce = req.EncryptedMetadataNonce[:]
|
||||||
}
|
}
|
||||||
|
|
||||||
var object metabase.Object
|
object, err := endpoint.metabase.BeginObjectNextVersion(ctx, metabase.BeginObjectNextVersion{
|
||||||
if endpoint.config.MultipleVersions {
|
ObjectStream: metabase.ObjectStream{
|
||||||
object, err = endpoint.metabase.BeginObjectNextVersion(ctx, metabase.BeginObjectNextVersion{
|
ProjectID: keyInfo.ProjectID,
|
||||||
ObjectStream: metabase.ObjectStream{
|
BucketName: string(req.Bucket),
|
||||||
ProjectID: keyInfo.ProjectID,
|
ObjectKey: metabase.ObjectKey(req.EncryptedObjectKey),
|
||||||
BucketName: string(req.Bucket),
|
StreamID: streamID,
|
||||||
ObjectKey: metabase.ObjectKey(req.EncryptedObjectKey),
|
Version: metabase.NextVersion,
|
||||||
StreamID: streamID,
|
},
|
||||||
Version: metabase.NextVersion,
|
ExpiresAt: expiresAt,
|
||||||
},
|
Encryption: encryptionParameters,
|
||||||
ExpiresAt: expiresAt,
|
|
||||||
Encryption: encryptionParameters,
|
|
||||||
|
|
||||||
EncryptedMetadata: req.EncryptedMetadata,
|
EncryptedMetadata: req.EncryptedMetadata,
|
||||||
EncryptedMetadataEncryptedKey: req.EncryptedMetadataEncryptedKey,
|
EncryptedMetadataEncryptedKey: req.EncryptedMetadataEncryptedKey,
|
||||||
EncryptedMetadataNonce: nonce,
|
EncryptedMetadataNonce: nonce,
|
||||||
})
|
})
|
||||||
} else {
|
|
||||||
object, err = endpoint.metabase.BeginObjectExactVersion(ctx, metabase.BeginObjectExactVersion{
|
|
||||||
ObjectStream: metabase.ObjectStream{
|
|
||||||
ProjectID: keyInfo.ProjectID,
|
|
||||||
BucketName: string(req.Bucket),
|
|
||||||
ObjectKey: metabase.ObjectKey(req.EncryptedObjectKey),
|
|
||||||
StreamID: streamID,
|
|
||||||
Version: metabase.DefaultVersion,
|
|
||||||
},
|
|
||||||
ExpiresAt: expiresAt,
|
|
||||||
Encryption: encryptionParameters,
|
|
||||||
|
|
||||||
EncryptedMetadata: req.EncryptedMetadata,
|
|
||||||
EncryptedMetadataEncryptedKey: req.EncryptedMetadataEncryptedKey,
|
|
||||||
EncryptedMetadataNonce: nonce,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, endpoint.convertMetabaseErr(err)
|
return nil, endpoint.convertMetabaseErr(err)
|
||||||
}
|
}
|
||||||
|
@ -772,6 +772,7 @@ func TestEndpoint_Object_No_StorageNodes_TestListingQuery(t *testing.T) {
|
|||||||
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestEndpoint_Object_With_StorageNodes(t *testing.T) {
|
func TestEndpoint_Object_With_StorageNodes(t *testing.T) {
|
||||||
testplanet.Run(t, testplanet.Config{
|
testplanet.Run(t, testplanet.Config{
|
||||||
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
||||||
@ -786,6 +787,10 @@ func TestEndpoint_Object_With_StorageNodes(t *testing.T) {
|
|||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
defer ctx.Check(metainfoClient.Close)
|
defer ctx.Check(metainfoClient.Close)
|
||||||
|
|
||||||
|
project, err := planet.Uplinks[0].OpenProject(ctx, planet.Satellites[0])
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer ctx.Check(project.Close)
|
||||||
|
|
||||||
bucketName := "testbucket"
|
bucketName := "testbucket"
|
||||||
deleteBucket := func(bucketName string) func() error {
|
deleteBucket := func(bucketName string) func() error {
|
||||||
return func() error {
|
return func() error {
|
||||||
@ -1228,6 +1233,177 @@ func TestEndpoint_Object_With_StorageNodes(t *testing.T) {
|
|||||||
require.Equal(t, 1, len(segments))
|
require.Equal(t, 1, len(segments))
|
||||||
require.Equal(t, storj.EU, segments[0].Placement)
|
require.Equal(t, storj.EU, segments[0].Placement)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
t.Run("multiple versions", func(t *testing.T) {
|
||||||
|
defer ctx.Check(deleteBucket("multipleversions"))
|
||||||
|
|
||||||
|
err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "multipleversions", "object", testrand.Bytes(10*memory.MiB))
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// override object to have it with version 2
|
||||||
|
expectedData := testrand.Bytes(11 * memory.KiB)
|
||||||
|
err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "multipleversions", "object", expectedData)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
objects, err := planet.Satellites[0].Metabase.DB.TestingAllObjects(ctx)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Len(t, objects, 1)
|
||||||
|
require.EqualValues(t, 2, objects[0].Version)
|
||||||
|
|
||||||
|
// add some pending uploads, each will have version higher then 2
|
||||||
|
uploadIDs := []string{}
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
info, err := project.BeginUpload(ctx, "multipleversions", "object", nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
uploadIDs = append(uploadIDs, info.UploadID)
|
||||||
|
}
|
||||||
|
|
||||||
|
checkDownload := func(objectKey string, expectedData []byte) {
|
||||||
|
data, err := planet.Uplinks[0].Download(ctx, planet.Satellites[0], "multipleversions", objectKey)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, expectedData, data)
|
||||||
|
}
|
||||||
|
|
||||||
|
checkDownload("object", expectedData)
|
||||||
|
|
||||||
|
err = project.MoveObject(ctx, "multipleversions", "object", "multipleversions", "object_moved", nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
checkDownload("object_moved", expectedData)
|
||||||
|
|
||||||
|
err = project.MoveObject(ctx, "multipleversions", "object_moved", "multipleversions", "object", nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
checkDownload("object", expectedData)
|
||||||
|
|
||||||
|
iterator := project.ListObjects(ctx, "multipleversions", nil)
|
||||||
|
require.True(t, iterator.Next())
|
||||||
|
require.Equal(t, "object", iterator.Item().Key)
|
||||||
|
require.NoError(t, iterator.Err())
|
||||||
|
|
||||||
|
// upload multipleversions/object once again as we just moved it
|
||||||
|
err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "multipleversions", "object", expectedData)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
checkDownload("object", expectedData)
|
||||||
|
|
||||||
|
{ // server side copy
|
||||||
|
_, err = project.CopyObject(ctx, "multipleversions", "object", "multipleversions", "object_copy", nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
checkDownload("object_copy", expectedData)
|
||||||
|
|
||||||
|
_, err = project.DeleteObject(ctx, "multipleversions", "object")
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
_, err = project.CopyObject(ctx, "multipleversions", "object_copy", "multipleversions", "object", nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
checkDownload("object", expectedData)
|
||||||
|
|
||||||
|
_, err = project.DeleteObject(ctx, "multipleversions", "object_copy")
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
checkDownload("object", expectedData)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = project.AbortUpload(ctx, "multipleversions", "object", uploadIDs[0])
|
||||||
|
require.NoError(t, err)
|
||||||
|
checkDownload("object", expectedData)
|
||||||
|
|
||||||
|
expectedData = testrand.Bytes(12 * memory.KiB)
|
||||||
|
upload, err := project.UploadPart(ctx, "multipleversions", "object", uploadIDs[1], 1)
|
||||||
|
require.NoError(t, err)
|
||||||
|
_, err = upload.Write(expectedData)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NoError(t, upload.Commit())
|
||||||
|
_, err = project.CommitUpload(ctx, "multipleversions", "object", uploadIDs[1], nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
checkDownload("object", expectedData)
|
||||||
|
|
||||||
|
_, err = project.DeleteObject(ctx, "multipleversions", "object")
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
_, err = project.DeleteObject(ctx, "multipleversions", "object_moved")
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
iterator = project.ListObjects(ctx, "multipleversions", nil)
|
||||||
|
require.False(t, iterator.Next())
|
||||||
|
require.NoError(t, iterator.Err())
|
||||||
|
|
||||||
|
// use next available pending upload
|
||||||
|
upload, err = project.UploadPart(ctx, "multipleversions", "object", uploadIDs[2], 1)
|
||||||
|
require.NoError(t, err)
|
||||||
|
_, err = upload.Write(expectedData)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NoError(t, upload.Commit())
|
||||||
|
_, err = project.CommitUpload(ctx, "multipleversions", "object", uploadIDs[2], nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
checkDownload("object", expectedData)
|
||||||
|
|
||||||
|
uploads := project.ListUploads(ctx, "multipleversions", nil)
|
||||||
|
count := 0
|
||||||
|
for uploads.Next() {
|
||||||
|
require.Equal(t, "object", uploads.Item().Key)
|
||||||
|
count++
|
||||||
|
}
|
||||||
|
// we started with 10 pending object and during test we abort/commit 3 objects
|
||||||
|
pendingUploadsLeft := 7
|
||||||
|
require.Equal(t, pendingUploadsLeft, count)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("override object", func(t *testing.T) {
|
||||||
|
defer ctx.Check(deleteBucket("bucket"))
|
||||||
|
|
||||||
|
bucketName := "bucket"
|
||||||
|
objectName := "file1"
|
||||||
|
|
||||||
|
err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], bucketName, objectName, testrand.Bytes(5*memory.KiB))
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
segments, err := planet.Satellites[0].Metabase.DB.TestingAllSegments(ctx)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Len(t, segments, 1)
|
||||||
|
require.NotZero(t, len(segments[0].Pieces))
|
||||||
|
|
||||||
|
for _, piece := range segments[0].Pieces {
|
||||||
|
node := planet.FindNode(piece.StorageNode)
|
||||||
|
pieceID := segments[0].RootPieceID.Derive(piece.StorageNode, int32(piece.Number))
|
||||||
|
|
||||||
|
piece, err := node.DB.Pieces().Stat(ctx, storage.BlobRef{
|
||||||
|
Namespace: planet.Satellites[0].ID().Bytes(),
|
||||||
|
Key: pieceID.Bytes(),
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NotNil(t, piece)
|
||||||
|
}
|
||||||
|
|
||||||
|
oldPieces := segments[0].Pieces
|
||||||
|
expectedData := testrand.Bytes(5 * memory.KiB)
|
||||||
|
err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], bucketName, objectName, expectedData)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
planet.WaitForStorageNodeDeleters(ctx)
|
||||||
|
|
||||||
|
// verify that old object pieces are not stored on storage nodes anymore
|
||||||
|
for _, piece := range oldPieces {
|
||||||
|
node := planet.FindNode(piece.StorageNode)
|
||||||
|
pieceID := segments[0].RootPieceID.Derive(piece.StorageNode, int32(piece.Number))
|
||||||
|
|
||||||
|
piece, err := node.DB.Pieces().Stat(ctx, storage.BlobRef{
|
||||||
|
Namespace: planet.Satellites[0].ID().Bytes(),
|
||||||
|
Key: pieceID.Bytes(),
|
||||||
|
})
|
||||||
|
require.Error(t, err)
|
||||||
|
require.Nil(t, piece)
|
||||||
|
}
|
||||||
|
|
||||||
|
data, err := planet.Uplinks[0].Download(ctx, planet.Satellites[0], bucketName, objectName)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, expectedData, data)
|
||||||
|
})
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -2026,213 +2202,9 @@ func TestEndpoint_UpdateObjectMetadata(t *testing.T) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestEndpoint_Object_MultipleVersions(t *testing.T) {
|
func TestEndpoint_Object_CopyObject(t *testing.T) {
|
||||||
testplanet.Run(t, testplanet.Config{
|
testplanet.Run(t, testplanet.Config{
|
||||||
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
||||||
Reconfigure: testplanet.Reconfigure{
|
|
||||||
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
|
|
||||||
config.Metainfo.MultipleVersions = true
|
|
||||||
config.Metainfo.PieceDeletion.DeleteSuccessThreshold = 1
|
|
||||||
|
|
||||||
testplanet.ReconfigureRS(2, 3, 4, 4)(log, index, config)
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
|
||||||
project, err := planet.Uplinks[0].OpenProject(ctx, planet.Satellites[0])
|
|
||||||
require.NoError(t, err)
|
|
||||||
defer ctx.Check(project.Close)
|
|
||||||
|
|
||||||
deleteBucket := func(bucketName string) func() error {
|
|
||||||
return func() error {
|
|
||||||
_, err := project.DeleteBucketWithObjects(ctx, bucketName)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
t.Run("multiple versions", func(t *testing.T) {
|
|
||||||
defer ctx.Check(deleteBucket("multipleversions"))
|
|
||||||
|
|
||||||
err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "multipleversions", "object", testrand.Bytes(10*memory.MiB))
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
// override object to have it with version 2
|
|
||||||
expectedData := testrand.Bytes(11 * memory.KiB)
|
|
||||||
err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "multipleversions", "object", expectedData)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
objects, err := planet.Satellites[0].Metabase.DB.TestingAllObjects(ctx)
|
|
||||||
require.NoError(t, err)
|
|
||||||
require.Len(t, objects, 1)
|
|
||||||
require.EqualValues(t, 2, objects[0].Version)
|
|
||||||
|
|
||||||
// add some pending uploads, each will have version higher then 2
|
|
||||||
uploadIDs := []string{}
|
|
||||||
for i := 0; i < 10; i++ {
|
|
||||||
info, err := project.BeginUpload(ctx, "multipleversions", "object", nil)
|
|
||||||
require.NoError(t, err)
|
|
||||||
uploadIDs = append(uploadIDs, info.UploadID)
|
|
||||||
}
|
|
||||||
|
|
||||||
checkDownload := func(objectKey string, expectedData []byte) {
|
|
||||||
data, err := planet.Uplinks[0].Download(ctx, planet.Satellites[0], "multipleversions", objectKey)
|
|
||||||
require.NoError(t, err)
|
|
||||||
require.Equal(t, expectedData, data)
|
|
||||||
}
|
|
||||||
|
|
||||||
checkDownload("object", expectedData)
|
|
||||||
|
|
||||||
err = project.MoveObject(ctx, "multipleversions", "object", "multipleversions", "object_moved", nil)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
checkDownload("object_moved", expectedData)
|
|
||||||
|
|
||||||
err = project.MoveObject(ctx, "multipleversions", "object_moved", "multipleversions", "object", nil)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
checkDownload("object", expectedData)
|
|
||||||
|
|
||||||
iterator := project.ListObjects(ctx, "multipleversions", nil)
|
|
||||||
require.True(t, iterator.Next())
|
|
||||||
require.Equal(t, "object", iterator.Item().Key)
|
|
||||||
require.NoError(t, iterator.Err())
|
|
||||||
|
|
||||||
// upload multipleversions/object once again as we just moved it
|
|
||||||
err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "multipleversions", "object", expectedData)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
checkDownload("object", expectedData)
|
|
||||||
|
|
||||||
{ // server side copy
|
|
||||||
_, err = project.CopyObject(ctx, "multipleversions", "object", "multipleversions", "object_copy", nil)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
checkDownload("object_copy", expectedData)
|
|
||||||
|
|
||||||
_, err = project.DeleteObject(ctx, "multipleversions", "object")
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
_, err = project.CopyObject(ctx, "multipleversions", "object_copy", "multipleversions", "object", nil)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
checkDownload("object", expectedData)
|
|
||||||
|
|
||||||
_, err = project.DeleteObject(ctx, "multipleversions", "object_copy")
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
checkDownload("object", expectedData)
|
|
||||||
}
|
|
||||||
|
|
||||||
err = project.AbortUpload(ctx, "multipleversions", "object", uploadIDs[0])
|
|
||||||
require.NoError(t, err)
|
|
||||||
checkDownload("object", expectedData)
|
|
||||||
|
|
||||||
expectedData = testrand.Bytes(12 * memory.KiB)
|
|
||||||
upload, err := project.UploadPart(ctx, "multipleversions", "object", uploadIDs[1], 1)
|
|
||||||
require.NoError(t, err)
|
|
||||||
_, err = upload.Write(expectedData)
|
|
||||||
require.NoError(t, err)
|
|
||||||
require.NoError(t, upload.Commit())
|
|
||||||
_, err = project.CommitUpload(ctx, "multipleversions", "object", uploadIDs[1], nil)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
checkDownload("object", expectedData)
|
|
||||||
|
|
||||||
_, err = project.DeleteObject(ctx, "multipleversions", "object")
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
_, err = project.DeleteObject(ctx, "multipleversions", "object_moved")
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
iterator = project.ListObjects(ctx, "multipleversions", nil)
|
|
||||||
require.False(t, iterator.Next())
|
|
||||||
require.NoError(t, iterator.Err())
|
|
||||||
|
|
||||||
// use next available pending upload
|
|
||||||
upload, err = project.UploadPart(ctx, "multipleversions", "object", uploadIDs[2], 1)
|
|
||||||
require.NoError(t, err)
|
|
||||||
_, err = upload.Write(expectedData)
|
|
||||||
require.NoError(t, err)
|
|
||||||
require.NoError(t, upload.Commit())
|
|
||||||
_, err = project.CommitUpload(ctx, "multipleversions", "object", uploadIDs[2], nil)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
checkDownload("object", expectedData)
|
|
||||||
|
|
||||||
uploads := project.ListUploads(ctx, "multipleversions", nil)
|
|
||||||
count := 0
|
|
||||||
for uploads.Next() {
|
|
||||||
require.Equal(t, "object", uploads.Item().Key)
|
|
||||||
count++
|
|
||||||
}
|
|
||||||
// we started with 10 pending object and during test we abort/commit 3 objects
|
|
||||||
pendingUploadsLeft := 7
|
|
||||||
require.Equal(t, pendingUploadsLeft, count)
|
|
||||||
})
|
|
||||||
|
|
||||||
t.Run("override object", func(t *testing.T) {
|
|
||||||
defer ctx.Check(deleteBucket("bucket"))
|
|
||||||
|
|
||||||
bucketName := "bucket"
|
|
||||||
objectName := "file1"
|
|
||||||
|
|
||||||
err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], bucketName, objectName, testrand.Bytes(5*memory.KiB))
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
segments, err := planet.Satellites[0].Metabase.DB.TestingAllSegments(ctx)
|
|
||||||
require.NoError(t, err)
|
|
||||||
require.Len(t, segments, 1)
|
|
||||||
|
|
||||||
pieceIDs := map[storj.NodeID]storj.PieceID{}
|
|
||||||
for _, piece := range segments[0].Pieces {
|
|
||||||
pieceIDs[piece.StorageNode] = segments[0].RootPieceID.Derive(piece.StorageNode, int32(piece.Number))
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, node := range planet.StorageNodes {
|
|
||||||
pieceID, ok := pieceIDs[node.ID()]
|
|
||||||
require.True(t, ok)
|
|
||||||
piece, err := node.DB.Pieces().Stat(ctx, storage.BlobRef{
|
|
||||||
Namespace: planet.Satellites[0].ID().Bytes(),
|
|
||||||
Key: pieceID.Bytes(),
|
|
||||||
})
|
|
||||||
require.NoError(t, err)
|
|
||||||
require.NotNil(t, piece)
|
|
||||||
}
|
|
||||||
|
|
||||||
expectedData := testrand.Bytes(5 * memory.KiB)
|
|
||||||
err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], bucketName, objectName, expectedData)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
planet.WaitForStorageNodeDeleters(ctx)
|
|
||||||
|
|
||||||
// verify that old object pieces are not stored on storage nodes anymore
|
|
||||||
for _, node := range planet.StorageNodes {
|
|
||||||
pieceID, ok := pieceIDs[node.ID()]
|
|
||||||
require.True(t, ok)
|
|
||||||
|
|
||||||
piece, err := node.DB.Pieces().Stat(ctx, storage.BlobRef{
|
|
||||||
Namespace: planet.Satellites[0].ID().Bytes(),
|
|
||||||
Key: pieceID.Bytes(),
|
|
||||||
})
|
|
||||||
require.Error(t, err)
|
|
||||||
require.Nil(t, piece)
|
|
||||||
}
|
|
||||||
|
|
||||||
data, err := planet.Uplinks[0].Download(ctx, planet.Satellites[0], bucketName, objectName)
|
|
||||||
require.NoError(t, err)
|
|
||||||
require.Equal(t, expectedData, data)
|
|
||||||
})
|
|
||||||
})
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestEndpoint_Object_CopyObject_MultipleVersions(t *testing.T) {
|
|
||||||
testplanet.Run(t, testplanet.Config{
|
|
||||||
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
|
||||||
Reconfigure: testplanet.Reconfigure{
|
|
||||||
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
|
|
||||||
config.Metainfo.MultipleVersions = true
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||||
checkDownload := func(objectKey string, expectedData []byte) {
|
checkDownload := func(objectKey string, expectedData []byte) {
|
||||||
data, err := planet.Uplinks[0].Download(ctx, planet.Satellites[0], "multipleversions", objectKey)
|
data, err := planet.Uplinks[0].Download(ctx, planet.Satellites[0], "multipleversions", objectKey)
|
||||||
@ -2310,14 +2282,9 @@ func TestEndpoint_Object_CopyObject_MultipleVersions(t *testing.T) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestEndpoint_Object_MoveObject_MultipleVersions(t *testing.T) {
|
func TestEndpoint_Object_MoveObject(t *testing.T) {
|
||||||
testplanet.Run(t, testplanet.Config{
|
testplanet.Run(t, testplanet.Config{
|
||||||
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
||||||
Reconfigure: testplanet.Reconfigure{
|
|
||||||
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
|
|
||||||
config.Metainfo.MultipleVersions = true
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||||
expectedDataA := testrand.Bytes(7 * memory.KiB)
|
expectedDataA := testrand.Bytes(7 * memory.KiB)
|
||||||
|
|
||||||
|
3
scripts/testdata/satellite-config.yaml.lock
vendored
3
scripts/testdata/satellite-config.yaml.lock
vendored
@ -610,9 +610,6 @@ identity.key-path: /root/.local/share/storj/identity/satellite/identity.key
|
|||||||
# minimum remote segment size
|
# minimum remote segment size
|
||||||
# metainfo.min-remote-segment-size: 1.2 KiB
|
# metainfo.min-remote-segment-size: 1.2 KiB
|
||||||
|
|
||||||
# feature flag to enable using multple objects versions in the system internally
|
|
||||||
# metainfo.multiple-versions: true
|
|
||||||
|
|
||||||
# toggle flag if overlay is enabled
|
# toggle flag if overlay is enabled
|
||||||
# metainfo.overlay: true
|
# metainfo.overlay: true
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user