satellite/metabase: always try to remove old version on commit
We have a bug in our behavior while doing API pods deployment. At this time its possible to have pods with multiple versions flag set true only partially for some of pods. Because of that it's possible to start new object without removing existing/older version on BeginObject (new behavior) and also don't remove that existing/older object on CommitObject. That can cause to have two committed objects with different versions and that's a state we want to avoid. To fix it we are removing multiple versions flag from CommitObject to always try delete existing objects. This way even if we don't remove existing object on BeginObject it will be always removed while committing. Fixes https://github.com/storj/storj/issues/5373 Change-Id: Idc334bf5cc785d2f559af96e92c3de6d82ca58ba
This commit is contained in:
parent
7461ffe148
commit
5c2131ed0d
@ -584,40 +584,38 @@ func (db *DB) CommitObject(ctx context.Context, opts CommitObject) (object Objec
|
||||
}
|
||||
|
||||
versionsToDelete := []Version{}
|
||||
if db.config.MultipleVersions {
|
||||
if err := withRows(tx.QueryContext(ctx, `
|
||||
SELECT version
|
||||
FROM objects
|
||||
WHERE
|
||||
project_id = $1 AND
|
||||
bucket_name = $2 AND
|
||||
object_key = $3 AND
|
||||
status = `+committedStatus,
|
||||
opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey))(func(rows tagsql.Rows) error {
|
||||
for rows.Next() {
|
||||
var version Version
|
||||
if err := rows.Scan(&version); err != nil {
|
||||
return Error.New("failed to scan previous object: %w", err)
|
||||
}
|
||||
|
||||
versionsToDelete = append(versionsToDelete, version)
|
||||
if err := withRows(tx.QueryContext(ctx, `
|
||||
SELECT version
|
||||
FROM objects
|
||||
WHERE
|
||||
project_id = $1 AND
|
||||
bucket_name = $2 AND
|
||||
object_key = $3 AND
|
||||
status = `+committedStatus,
|
||||
opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey))(func(rows tagsql.Rows) error {
|
||||
for rows.Next() {
|
||||
var version Version
|
||||
if err := rows.Scan(&version); err != nil {
|
||||
return Error.New("failed to scan previous object: %w", err)
|
||||
}
|
||||
return nil
|
||||
}); err != nil {
|
||||
return Error.New("failed to find previous objects: %w", err)
|
||||
}
|
||||
|
||||
if len(versionsToDelete) > 1 {
|
||||
db.log.Warn("object with multiple committed versions were found!",
|
||||
zap.Stringer("Project ID", opts.ProjectID), zap.String("Bucket Name", opts.BucketName),
|
||||
zap.String("Object Key", string(opts.ObjectKey)))
|
||||
|
||||
mon.Meter("multiple_committed_versions").Mark(1)
|
||||
versionsToDelete = append(versionsToDelete, version)
|
||||
}
|
||||
return nil
|
||||
}); err != nil {
|
||||
return Error.New("failed to find previous objects: %w", err)
|
||||
}
|
||||
|
||||
if len(versionsToDelete) != 0 && opts.DisallowDelete {
|
||||
return ErrPermissionDenied.New("no permissions to delete existing object")
|
||||
}
|
||||
if len(versionsToDelete) > 1 {
|
||||
db.log.Warn("object with multiple committed versions were found!",
|
||||
zap.Stringer("Project ID", opts.ProjectID), zap.String("Bucket Name", opts.BucketName),
|
||||
zap.ByteString("Object Key", []byte(opts.ObjectKey)), zap.Int("deleted", len(versionsToDelete)))
|
||||
|
||||
mon.Meter("multiple_committed_versions").Mark(1)
|
||||
}
|
||||
|
||||
if len(versionsToDelete) != 0 && opts.DisallowDelete {
|
||||
return ErrPermissionDenied.New("no permissions to delete existing object")
|
||||
}
|
||||
|
||||
err = tx.QueryRowContext(ctx, `
|
||||
|
@ -158,7 +158,6 @@ func TestBeginObjectNextVersion(t *testing.T) {
|
||||
t.Run("older committed version exists", func(t *testing.T) {
|
||||
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||
|
||||
now1 := time.Now()
|
||||
objectStream.Version = metabase.NextVersion
|
||||
|
||||
metabasetest.BeginObjectNextVersion{
|
||||
@ -202,21 +201,9 @@ func TestBeginObjectNextVersion(t *testing.T) {
|
||||
},
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
// currently CommitObject always deletes previous versions so only version 2 left
|
||||
metabasetest.Verify{
|
||||
Objects: []metabase.RawObject{
|
||||
{
|
||||
ObjectStream: metabase.ObjectStream{
|
||||
ProjectID: obj.ProjectID,
|
||||
BucketName: obj.BucketName,
|
||||
ObjectKey: obj.ObjectKey,
|
||||
Version: 1,
|
||||
StreamID: obj.StreamID,
|
||||
},
|
||||
CreatedAt: now1,
|
||||
Status: metabase.Committed,
|
||||
|
||||
Encryption: metabasetest.DefaultEncryption,
|
||||
},
|
||||
{
|
||||
ObjectStream: metabase.ObjectStream{
|
||||
ProjectID: obj.ProjectID,
|
||||
@ -249,7 +236,6 @@ func TestBeginObjectNextVersion(t *testing.T) {
|
||||
Version: 1,
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
now2 := time.Now()
|
||||
metabasetest.BeginObjectNextVersion{
|
||||
Opts: metabase.BeginObjectNextVersion{
|
||||
ObjectStream: objectStream,
|
||||
@ -282,6 +268,7 @@ func TestBeginObjectNextVersion(t *testing.T) {
|
||||
},
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
// currently CommitObject always deletes previous versions so only version 1 left
|
||||
metabasetest.Verify{
|
||||
Objects: []metabase.RawObject{
|
||||
{
|
||||
@ -295,19 +282,6 @@ func TestBeginObjectNextVersion(t *testing.T) {
|
||||
CreatedAt: now1,
|
||||
Status: metabase.Committed,
|
||||
|
||||
Encryption: metabasetest.DefaultEncryption,
|
||||
},
|
||||
{
|
||||
ObjectStream: metabase.ObjectStream{
|
||||
ProjectID: obj.ProjectID,
|
||||
BucketName: obj.BucketName,
|
||||
ObjectKey: obj.ObjectKey,
|
||||
Version: 2,
|
||||
StreamID: obj.StreamID,
|
||||
},
|
||||
CreatedAt: now2,
|
||||
Status: metabase.Committed,
|
||||
|
||||
Encryption: metabasetest.DefaultEncryption,
|
||||
},
|
||||
},
|
||||
@ -364,157 +338,6 @@ func TestBeginObjectNextVersion(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func TestBeginObjectNextVersionMultipleVersions(t *testing.T) {
|
||||
// TODO partially duplicated TestBeginObjectNextVersion tests to cover MultipleVersions enabled
|
||||
// to be removed when flag will be removed
|
||||
metabasetest.RunWithConfig(t, metabase.Config{
|
||||
ApplicationName: "satellite-test",
|
||||
MultipleVersions: true,
|
||||
}, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) {
|
||||
obj := metabasetest.RandObjectStream()
|
||||
|
||||
objectStream := metabase.ObjectStream{
|
||||
ProjectID: obj.ProjectID,
|
||||
BucketName: obj.BucketName,
|
||||
ObjectKey: obj.ObjectKey,
|
||||
StreamID: obj.StreamID,
|
||||
}
|
||||
|
||||
t.Run("older committed version exists", func(t *testing.T) {
|
||||
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||
|
||||
objectStream.Version = metabase.NextVersion
|
||||
|
||||
metabasetest.BeginObjectNextVersion{
|
||||
Opts: metabase.BeginObjectNextVersion{
|
||||
ObjectStream: objectStream,
|
||||
Encryption: metabasetest.DefaultEncryption,
|
||||
},
|
||||
Version: 1,
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
metabasetest.CommitObject{
|
||||
Opts: metabase.CommitObject{
|
||||
ObjectStream: metabase.ObjectStream{
|
||||
ProjectID: obj.ProjectID,
|
||||
BucketName: obj.BucketName,
|
||||
ObjectKey: obj.ObjectKey,
|
||||
Version: 1,
|
||||
StreamID: obj.StreamID,
|
||||
},
|
||||
},
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
now2 := time.Now()
|
||||
metabasetest.BeginObjectNextVersion{
|
||||
Opts: metabase.BeginObjectNextVersion{
|
||||
ObjectStream: objectStream,
|
||||
Encryption: metabasetest.DefaultEncryption,
|
||||
},
|
||||
Version: 2,
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
// CommitObject will also delete old object with version 1
|
||||
metabasetest.CommitObject{
|
||||
Opts: metabase.CommitObject{
|
||||
ObjectStream: metabase.ObjectStream{
|
||||
ProjectID: obj.ProjectID,
|
||||
BucketName: obj.BucketName,
|
||||
ObjectKey: obj.ObjectKey,
|
||||
Version: 2,
|
||||
StreamID: obj.StreamID,
|
||||
},
|
||||
},
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
metabasetest.Verify{
|
||||
Objects: []metabase.RawObject{
|
||||
{
|
||||
ObjectStream: metabase.ObjectStream{
|
||||
ProjectID: obj.ProjectID,
|
||||
BucketName: obj.BucketName,
|
||||
ObjectKey: obj.ObjectKey,
|
||||
Version: 2,
|
||||
StreamID: obj.StreamID,
|
||||
},
|
||||
CreatedAt: now2,
|
||||
Status: metabase.Committed,
|
||||
|
||||
Encryption: metabasetest.DefaultEncryption,
|
||||
},
|
||||
},
|
||||
}.Check(ctx, t, db)
|
||||
})
|
||||
|
||||
t.Run("newer committed version exists", func(t *testing.T) {
|
||||
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||
|
||||
now1 := time.Now()
|
||||
|
||||
objectStream.Version = metabase.NextVersion
|
||||
|
||||
metabasetest.BeginObjectNextVersion{
|
||||
Opts: metabase.BeginObjectNextVersion{
|
||||
ObjectStream: objectStream,
|
||||
Encryption: metabasetest.DefaultEncryption,
|
||||
},
|
||||
Version: 1,
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
metabasetest.BeginObjectNextVersion{
|
||||
Opts: metabase.BeginObjectNextVersion{
|
||||
ObjectStream: objectStream,
|
||||
Encryption: metabasetest.DefaultEncryption,
|
||||
},
|
||||
Version: 2,
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
metabasetest.CommitObject{
|
||||
Opts: metabase.CommitObject{
|
||||
ObjectStream: metabase.ObjectStream{
|
||||
ProjectID: obj.ProjectID,
|
||||
BucketName: obj.BucketName,
|
||||
ObjectKey: obj.ObjectKey,
|
||||
Version: 2,
|
||||
StreamID: obj.StreamID,
|
||||
},
|
||||
},
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
// CommitObject will also delete old object with version 2
|
||||
metabasetest.CommitObject{
|
||||
Opts: metabase.CommitObject{
|
||||
ObjectStream: metabase.ObjectStream{
|
||||
ProjectID: obj.ProjectID,
|
||||
BucketName: obj.BucketName,
|
||||
ObjectKey: obj.ObjectKey,
|
||||
Version: 1,
|
||||
StreamID: obj.StreamID,
|
||||
},
|
||||
},
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
metabasetest.Verify{
|
||||
Objects: []metabase.RawObject{
|
||||
{
|
||||
ObjectStream: metabase.ObjectStream{
|
||||
ProjectID: obj.ProjectID,
|
||||
BucketName: obj.BucketName,
|
||||
ObjectKey: obj.ObjectKey,
|
||||
Version: 1,
|
||||
StreamID: obj.StreamID,
|
||||
},
|
||||
CreatedAt: now1,
|
||||
Status: metabase.Committed,
|
||||
|
||||
Encryption: metabasetest.DefaultEncryption,
|
||||
},
|
||||
},
|
||||
}.Check(ctx, t, db)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func TestBeginObjectExactVersion(t *testing.T) {
|
||||
metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) {
|
||||
obj := metabasetest.RandObjectStream()
|
||||
@ -758,21 +581,9 @@ func TestBeginObjectExactVersion(t *testing.T) {
|
||||
},
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
// currently CommitObject always deletes previous versions so only version 3 left
|
||||
metabasetest.Verify{
|
||||
Objects: []metabase.RawObject{
|
||||
{
|
||||
ObjectStream: metabase.ObjectStream{
|
||||
ProjectID: obj.ProjectID,
|
||||
BucketName: obj.BucketName,
|
||||
ObjectKey: obj.ObjectKey,
|
||||
Version: 1,
|
||||
StreamID: obj.StreamID,
|
||||
},
|
||||
CreatedAt: now1,
|
||||
Status: metabase.Committed,
|
||||
|
||||
Encryption: metabasetest.DefaultEncryption,
|
||||
},
|
||||
{
|
||||
ObjectStream: metabase.ObjectStream{
|
||||
ProjectID: obj.ProjectID,
|
||||
@ -827,6 +638,7 @@ func TestBeginObjectExactVersion(t *testing.T) {
|
||||
},
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
// currently CommitObject always deletes previous versions so only version 1 left
|
||||
metabasetest.Verify{
|
||||
Objects: []metabase.RawObject{
|
||||
{
|
||||
@ -840,19 +652,6 @@ func TestBeginObjectExactVersion(t *testing.T) {
|
||||
CreatedAt: now1,
|
||||
Status: metabase.Committed,
|
||||
|
||||
Encryption: metabasetest.DefaultEncryption,
|
||||
},
|
||||
{
|
||||
ObjectStream: metabase.ObjectStream{
|
||||
ProjectID: obj.ProjectID,
|
||||
BucketName: obj.BucketName,
|
||||
ObjectKey: obj.ObjectKey,
|
||||
Version: 3,
|
||||
StreamID: obj.StreamID,
|
||||
},
|
||||
CreatedAt: now1,
|
||||
Status: metabase.Committed,
|
||||
|
||||
Encryption: metabasetest.DefaultEncryption,
|
||||
},
|
||||
},
|
||||
@ -3406,3 +3205,46 @@ func TestCommitObjectWithIncorrectAmountOfParts(t *testing.T) {
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func TestMultipleVersionsBug(t *testing.T) {
|
||||
// test simulates case when we have different configurations for different
|
||||
// API instances in the system (multiple versions flag)
|
||||
metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) {
|
||||
obj := metabasetest.RandObjectStream()
|
||||
|
||||
// simulates code WITHOUT multiple versions flag enabled
|
||||
obj.Version = metabase.DefaultVersion
|
||||
_, err := db.BeginObjectExactVersion(ctx, metabase.BeginObjectExactVersion{
|
||||
ObjectStream: obj,
|
||||
Encryption: metabasetest.DefaultEncryption,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
// this commit will be run WITH multiple versions flag enabled
|
||||
_, err = db.CommitObject(ctx, metabase.CommitObject{
|
||||
ObjectStream: obj,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
// start overriding object
|
||||
|
||||
// simulates code WITH multiple versions flag enabled
|
||||
obj.Version = metabase.NextVersion
|
||||
pendingObject, err := db.BeginObjectNextVersion(ctx, metabase.BeginObjectNextVersion{
|
||||
ObjectStream: obj,
|
||||
Encryption: metabasetest.DefaultEncryption,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
obj.Version = pendingObject.Version
|
||||
db.TestingEnableMultipleVersions(false)
|
||||
_, err = db.CommitObject(ctx, metabase.CommitObject{
|
||||
ObjectStream: obj,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
objects, err := db.TestingAllObjects(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 1, len(objects))
|
||||
})
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user