satellite/metabase: use commit order for objects
This changes metabase behavior such that the latest object will be the committed object which may result in a new version assigned to the objects. Change-Id: I7a731d7db4696caba75fff65107a248569b6111f
This commit is contained in:
parent
eff1719977
commit
a4edbdd005
@ -692,11 +692,11 @@ func (db *DB) CommitObject(ctx context.Context, opts CommitObject) (object Objec
|
|||||||
var highestVersion Version
|
var highestVersion Version
|
||||||
if opts.Versioned {
|
if opts.Versioned {
|
||||||
// TODO(ver): fold this into the queries below using a subquery.
|
// TODO(ver): fold this into the queries below using a subquery.
|
||||||
v, err := db.queryHighestVersion(ctx, opts.Location(), tx)
|
maxVersion, err := db.queryHighestVersion(ctx, opts.Location(), tx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return Error.Wrap(err)
|
return Error.Wrap(err)
|
||||||
}
|
}
|
||||||
highestVersion = v
|
highestVersion = maxVersion
|
||||||
} else {
|
} else {
|
||||||
// TODO(ver): fold this into the query below using a subquery using `ON CONFLICT` on the unique index.
|
// TODO(ver): fold this into the query below using a subquery using `ON CONFLICT` on the unique index.
|
||||||
// Note, we are prematurely deleting the object without permissions
|
// Note, we are prematurely deleting the object without permissions
|
||||||
@ -708,7 +708,6 @@ func (db *DB) CommitObject(ctx context.Context, opts CommitObject) (object Objec
|
|||||||
if deleteResult.DeletedObjectCount > 0 && opts.DisallowDelete {
|
if deleteResult.DeletedObjectCount > 0 && opts.DisallowDelete {
|
||||||
return ErrPermissionDenied.New("no permissions to delete existing object")
|
return ErrPermissionDenied.New("no permissions to delete existing object")
|
||||||
}
|
}
|
||||||
|
|
||||||
highestVersion = deleteResult.MaxVersion
|
highestVersion = deleteResult.MaxVersion
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -784,6 +783,13 @@ func (db *DB) CommitObject(ctx context.Context, opts CommitObject) (object Objec
|
|||||||
return Error.New("failed to update object: %w", err)
|
return Error.New("failed to update object: %w", err)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
nextVersion := opts.Version
|
||||||
|
if nextVersion < highestVersion {
|
||||||
|
nextVersion = highestVersion + 1
|
||||||
|
}
|
||||||
|
args = append(args, nextVersion)
|
||||||
|
opts.Version = nextVersion
|
||||||
|
|
||||||
metadataColumns := ""
|
metadataColumns := ""
|
||||||
if opts.OverrideEncryptedMetadata {
|
if opts.OverrideEncryptedMetadata {
|
||||||
args = append(args,
|
args = append(args,
|
||||||
@ -792,13 +798,14 @@ func (db *DB) CommitObject(ctx context.Context, opts CommitObject) (object Objec
|
|||||||
opts.EncryptedMetadataEncryptedKey,
|
opts.EncryptedMetadataEncryptedKey,
|
||||||
)
|
)
|
||||||
metadataColumns = `,
|
metadataColumns = `,
|
||||||
encrypted_metadata_nonce = $12,
|
encrypted_metadata_nonce = $13,
|
||||||
encrypted_metadata = $13,
|
encrypted_metadata = $14,
|
||||||
encrypted_metadata_encrypted_key = $14
|
encrypted_metadata_encrypted_key = $15
|
||||||
`
|
`
|
||||||
}
|
}
|
||||||
err = tx.QueryRowContext(ctx, `
|
err = tx.QueryRowContext(ctx, `
|
||||||
UPDATE objects SET
|
UPDATE objects SET
|
||||||
|
version = $12,
|
||||||
status = $6,
|
status = $6,
|
||||||
segment_count = $7,
|
segment_count = $7,
|
||||||
|
|
||||||
|
@ -51,23 +51,31 @@ func (db *DB) CommitObjectWithSegments(ctx context.Context, opts CommitObjectWit
|
|||||||
return Object{}, nil, err
|
return Object{}, nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
var previousObject deleteObjectUnversionedCommittedResult
|
var deleted deleteObjectUnversionedCommittedResult
|
||||||
|
|
||||||
err = txutil.WithTx(ctx, db.db, nil, func(ctx context.Context, tx tagsql.Tx) error {
|
err = txutil.WithTx(ctx, db.db, nil, func(ctx context.Context, tx tagsql.Tx) error {
|
||||||
// TODO: should we prevent this from executing when the object has been committed
|
// TODO: should we prevent this from executing when the object has been committed
|
||||||
// currently this requires quite a lot of database communication, so invalid handling can be expensive.
|
// currently this requires quite a lot of database communication, so invalid handling can be expensive.
|
||||||
|
|
||||||
|
var highestVersion Version
|
||||||
if !opts.Versioned {
|
if !opts.Versioned {
|
||||||
var err error
|
var err error
|
||||||
// Note, we are prematurely deleting the object without permissions
|
// Note, we are prematurely deleting the object without permissions
|
||||||
// and then rolling the action back, if we were not allowed to.
|
// and then rolling the action back, if we were not allowed to.
|
||||||
previousObject, err = db.deleteObjectUnversionedCommitted(ctx, opts.Location(), tx)
|
deleted, err = db.deleteObjectUnversionedCommitted(ctx, opts.Location(), tx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if previousObject.DeletedObjectCount > 0 && opts.DisallowDelete {
|
if deleted.DeletedObjectCount > 0 && opts.DisallowDelete {
|
||||||
return ErrPermissionDenied.New("no permissions to delete existing object")
|
return ErrPermissionDenied.New("no permissions to delete existing object")
|
||||||
}
|
}
|
||||||
|
highestVersion = deleted.MaxVersion
|
||||||
|
} else {
|
||||||
|
v, err := db.queryHighestVersion(ctx, opts.Location(), tx)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
highestVersion = v
|
||||||
}
|
}
|
||||||
|
|
||||||
segmentsInDatabase, err := fetchSegmentsForCommit(ctx, tx, opts.StreamID)
|
segmentsInDatabase, err := fetchSegmentsForCommit(ctx, tx, opts.StreamID)
|
||||||
@ -113,9 +121,14 @@ func (db *DB) CommitObjectWithSegments(ctx context.Context, opts CommitObjectWit
|
|||||||
}
|
}
|
||||||
|
|
||||||
nextStatus := committedWhereVersioned(opts.Versioned)
|
nextStatus := committedWhereVersioned(opts.Versioned)
|
||||||
|
nextVersion := opts.Version
|
||||||
|
if nextVersion < highestVersion {
|
||||||
|
nextVersion = highestVersion + 1
|
||||||
|
}
|
||||||
|
|
||||||
err = tx.QueryRowContext(ctx, `
|
err = tx.QueryRowContext(ctx, `
|
||||||
UPDATE objects SET
|
UPDATE objects SET
|
||||||
|
version = $14,
|
||||||
status = $6,
|
status = $6,
|
||||||
segment_count = $7,
|
segment_count = $7,
|
||||||
|
|
||||||
@ -138,6 +151,7 @@ func (db *DB) CommitObjectWithSegments(ctx context.Context, opts CommitObjectWit
|
|||||||
totalPlainSize,
|
totalPlainSize,
|
||||||
totalEncryptedSize,
|
totalEncryptedSize,
|
||||||
fixedSegmentSize,
|
fixedSegmentSize,
|
||||||
|
nextVersion,
|
||||||
).
|
).
|
||||||
Scan(
|
Scan(
|
||||||
&object.CreatedAt, &object.ExpiresAt,
|
&object.CreatedAt, &object.ExpiresAt,
|
||||||
@ -154,7 +168,7 @@ func (db *DB) CommitObjectWithSegments(ctx context.Context, opts CommitObjectWit
|
|||||||
object.ProjectID = opts.ProjectID
|
object.ProjectID = opts.ProjectID
|
||||||
object.BucketName = opts.BucketName
|
object.BucketName = opts.BucketName
|
||||||
object.ObjectKey = opts.ObjectKey
|
object.ObjectKey = opts.ObjectKey
|
||||||
object.Version = opts.Version
|
object.Version = nextVersion
|
||||||
object.Status = nextStatus
|
object.Status = nextStatus
|
||||||
object.SegmentCount = int32(len(finalSegments))
|
object.SegmentCount = int32(len(finalSegments))
|
||||||
object.EncryptedMetadataNonce = opts.EncryptedMetadataNonce
|
object.EncryptedMetadataNonce = opts.EncryptedMetadataNonce
|
||||||
|
@ -332,6 +332,7 @@ func TestCommitObjectWithSegments(t *testing.T) {
|
|||||||
EncryptedMetadata: encryptedMetadata,
|
EncryptedMetadata: encryptedMetadata,
|
||||||
EncryptedMetadataEncryptedKey: encryptedMetadataKey,
|
EncryptedMetadataEncryptedKey: encryptedMetadataKey,
|
||||||
},
|
},
|
||||||
|
ExpectVersion: obj.Version + 1,
|
||||||
}.Check(ctx, t, db)
|
}.Check(ctx, t, db)
|
||||||
|
|
||||||
metabasetest.Verify{
|
metabasetest.Verify{
|
||||||
@ -341,7 +342,7 @@ func TestCommitObjectWithSegments(t *testing.T) {
|
|||||||
ProjectID: obj.ProjectID,
|
ProjectID: obj.ProjectID,
|
||||||
BucketName: obj.BucketName,
|
BucketName: obj.BucketName,
|
||||||
ObjectKey: obj.ObjectKey,
|
ObjectKey: obj.ObjectKey,
|
||||||
Version: 5,
|
Version: obj.Version + 1,
|
||||||
StreamID: obj.StreamID,
|
StreamID: obj.StreamID,
|
||||||
},
|
},
|
||||||
CreatedAt: now,
|
CreatedAt: now,
|
||||||
|
@ -264,6 +264,7 @@ func TestBeginObjectNextVersion(t *testing.T) {
|
|||||||
StreamID: obj.StreamID,
|
StreamID: obj.StreamID,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
ExpectVersion: 3,
|
||||||
}.Check(ctx, t, db)
|
}.Check(ctx, t, db)
|
||||||
|
|
||||||
// currently CommitObject always deletes previous versions so only version 1 left
|
// currently CommitObject always deletes previous versions so only version 1 left
|
||||||
@ -274,7 +275,7 @@ func TestBeginObjectNextVersion(t *testing.T) {
|
|||||||
ProjectID: obj.ProjectID,
|
ProjectID: obj.ProjectID,
|
||||||
BucketName: obj.BucketName,
|
BucketName: obj.BucketName,
|
||||||
ObjectKey: obj.ObjectKey,
|
ObjectKey: obj.ObjectKey,
|
||||||
Version: 1,
|
Version: 3,
|
||||||
StreamID: obj.StreamID,
|
StreamID: obj.StreamID,
|
||||||
},
|
},
|
||||||
CreatedAt: now1,
|
CreatedAt: now1,
|
||||||
@ -799,6 +800,7 @@ func TestBeginObjectExactVersion(t *testing.T) {
|
|||||||
Opts: metabase.CommitObject{
|
Opts: metabase.CommitObject{
|
||||||
ObjectStream: objectStream,
|
ObjectStream: objectStream,
|
||||||
},
|
},
|
||||||
|
ExpectVersion: 301,
|
||||||
}.Check(ctx, t, db)
|
}.Check(ctx, t, db)
|
||||||
|
|
||||||
// currently CommitObject always deletes previous versions so only version 1 left
|
// currently CommitObject always deletes previous versions so only version 1 left
|
||||||
@ -809,7 +811,7 @@ func TestBeginObjectExactVersion(t *testing.T) {
|
|||||||
ProjectID: obj.ProjectID,
|
ProjectID: obj.ProjectID,
|
||||||
BucketName: obj.BucketName,
|
BucketName: obj.BucketName,
|
||||||
ObjectKey: obj.ObjectKey,
|
ObjectKey: obj.ObjectKey,
|
||||||
Version: 100,
|
Version: 301,
|
||||||
StreamID: obj.StreamID,
|
StreamID: obj.StreamID,
|
||||||
},
|
},
|
||||||
CreatedAt: now1,
|
CreatedAt: now1,
|
||||||
@ -4432,21 +4434,27 @@ func TestCommitObjectVersioned(t *testing.T) {
|
|||||||
ObjectStream: v1,
|
ObjectStream: v1,
|
||||||
Versioned: true,
|
Versioned: true,
|
||||||
},
|
},
|
||||||
|
ExpectVersion: v3.Version + 1,
|
||||||
}.Check(ctx, t, db)
|
}.Check(ctx, t, db)
|
||||||
|
v1.Version = v3.Version + 1
|
||||||
|
|
||||||
metabasetest.CommitObject{
|
metabasetest.CommitObject{
|
||||||
Opts: metabase.CommitObject{
|
Opts: metabase.CommitObject{
|
||||||
ObjectStream: v2,
|
ObjectStream: v2,
|
||||||
Versioned: true,
|
Versioned: true,
|
||||||
},
|
},
|
||||||
|
ExpectVersion: v3.Version + 2,
|
||||||
}.Check(ctx, t, db)
|
}.Check(ctx, t, db)
|
||||||
|
v2.Version = v1.Version + 1
|
||||||
|
|
||||||
metabasetest.CommitObject{
|
metabasetest.CommitObject{
|
||||||
Opts: metabase.CommitObject{
|
Opts: metabase.CommitObject{
|
||||||
ObjectStream: v3,
|
ObjectStream: v3,
|
||||||
Versioned: true,
|
Versioned: true,
|
||||||
},
|
},
|
||||||
|
ExpectVersion: v3.Version + 3,
|
||||||
}.Check(ctx, t, db)
|
}.Check(ctx, t, db)
|
||||||
|
v3.Version = v2.Version + 1
|
||||||
|
|
||||||
metabasetest.Verify{
|
metabasetest.Verify{
|
||||||
Objects: []metabase.RawObject{
|
Objects: []metabase.RawObject{
|
||||||
@ -4558,13 +4566,17 @@ func TestCommitObjectVersioned(t *testing.T) {
|
|||||||
Opts: metabase.CommitObject{
|
Opts: metabase.CommitObject{
|
||||||
ObjectStream: v3,
|
ObjectStream: v3,
|
||||||
},
|
},
|
||||||
|
ExpectVersion: 5,
|
||||||
}.Check(ctx, t, db)
|
}.Check(ctx, t, db)
|
||||||
|
v3.Version = 5
|
||||||
|
|
||||||
metabasetest.CommitObject{
|
metabasetest.CommitObject{
|
||||||
Opts: metabase.CommitObject{
|
Opts: metabase.CommitObject{
|
||||||
ObjectStream: v1,
|
ObjectStream: v1,
|
||||||
},
|
},
|
||||||
|
ExpectVersion: 6,
|
||||||
}.Check(ctx, t, db)
|
}.Check(ctx, t, db)
|
||||||
|
v1.Version = 6
|
||||||
|
|
||||||
// The latter commit should overwrite the v3.
|
// The latter commit should overwrite the v3.
|
||||||
// When pending objects table is enabled, then objects
|
// When pending objects table is enabled, then objects
|
||||||
@ -4601,14 +4613,18 @@ func TestCommitObjectVersioned(t *testing.T) {
|
|||||||
ObjectStream: v2,
|
ObjectStream: v2,
|
||||||
Versioned: true,
|
Versioned: true,
|
||||||
},
|
},
|
||||||
|
ExpectVersion: 7,
|
||||||
}.Check(ctx, t, db)
|
}.Check(ctx, t, db)
|
||||||
|
v2.Version = 7
|
||||||
|
|
||||||
metabasetest.CommitObject{
|
metabasetest.CommitObject{
|
||||||
Opts: metabase.CommitObject{
|
Opts: metabase.CommitObject{
|
||||||
ObjectStream: v4,
|
ObjectStream: v4,
|
||||||
Versioned: true,
|
Versioned: true,
|
||||||
},
|
},
|
||||||
|
ExpectVersion: 8,
|
||||||
}.Check(ctx, t, db)
|
}.Check(ctx, t, db)
|
||||||
|
v4.Version = 8
|
||||||
|
|
||||||
metabasetest.Verify{
|
metabasetest.Verify{
|
||||||
Objects: []metabase.RawObject{
|
Objects: []metabase.RawObject{
|
||||||
@ -4719,14 +4735,18 @@ func TestCommitObjectVersioned(t *testing.T) {
|
|||||||
ObjectStream: v1,
|
ObjectStream: v1,
|
||||||
Versioned: true,
|
Versioned: true,
|
||||||
},
|
},
|
||||||
|
ExpectVersion: 5,
|
||||||
}.Check(ctx, t, db)
|
}.Check(ctx, t, db)
|
||||||
|
v1.Version = 5
|
||||||
|
|
||||||
metabasetest.CommitObject{
|
metabasetest.CommitObject{
|
||||||
Opts: metabase.CommitObject{
|
Opts: metabase.CommitObject{
|
||||||
ObjectStream: v3,
|
ObjectStream: v3,
|
||||||
Versioned: true,
|
Versioned: true,
|
||||||
},
|
},
|
||||||
|
ExpectVersion: 6,
|
||||||
}.Check(ctx, t, db)
|
}.Check(ctx, t, db)
|
||||||
|
v3.Version = 6
|
||||||
|
|
||||||
metabasetest.Verify{
|
metabasetest.Verify{
|
||||||
Objects: []metabase.RawObject{
|
Objects: []metabase.RawObject{
|
||||||
@ -4763,13 +4783,17 @@ func TestCommitObjectVersioned(t *testing.T) {
|
|||||||
Opts: metabase.CommitObject{
|
Opts: metabase.CommitObject{
|
||||||
ObjectStream: v2,
|
ObjectStream: v2,
|
||||||
},
|
},
|
||||||
|
ExpectVersion: 7,
|
||||||
}.Check(ctx, t, db)
|
}.Check(ctx, t, db)
|
||||||
|
v2.Version = 7
|
||||||
|
|
||||||
metabasetest.CommitObject{
|
metabasetest.CommitObject{
|
||||||
Opts: metabase.CommitObject{
|
Opts: metabase.CommitObject{
|
||||||
ObjectStream: v4,
|
ObjectStream: v4,
|
||||||
},
|
},
|
||||||
|
ExpectVersion: 8,
|
||||||
}.Check(ctx, t, db)
|
}.Check(ctx, t, db)
|
||||||
|
v4.Version = 8
|
||||||
|
|
||||||
// committing v4 should overwrite the previous unversioned commit (v2),
|
// committing v4 should overwrite the previous unversioned commit (v2),
|
||||||
// so v2 is not in the result check
|
// so v2 is not in the result check
|
||||||
@ -4882,7 +4906,9 @@ func TestCommitObjectVersioned(t *testing.T) {
|
|||||||
ObjectStream: v1,
|
ObjectStream: v1,
|
||||||
Versioned: true,
|
Versioned: true,
|
||||||
},
|
},
|
||||||
|
ExpectVersion: 5,
|
||||||
}.Check(ctx, t, db)
|
}.Check(ctx, t, db)
|
||||||
|
v1.Version = 5
|
||||||
|
|
||||||
metabasetest.Verify{
|
metabasetest.Verify{
|
||||||
Objects: []metabase.RawObject{
|
Objects: []metabase.RawObject{
|
||||||
@ -4920,7 +4946,9 @@ func TestCommitObjectVersioned(t *testing.T) {
|
|||||||
Opts: metabase.CommitObject{
|
Opts: metabase.CommitObject{
|
||||||
ObjectStream: v2,
|
ObjectStream: v2,
|
||||||
},
|
},
|
||||||
|
ExpectVersion: 6,
|
||||||
}.Check(ctx, t, db)
|
}.Check(ctx, t, db)
|
||||||
|
v2.Version = 6
|
||||||
|
|
||||||
metabasetest.Verify{
|
metabasetest.Verify{
|
||||||
Objects: []metabase.RawObject{
|
Objects: []metabase.RawObject{
|
||||||
@ -4958,7 +4986,9 @@ func TestCommitObjectVersioned(t *testing.T) {
|
|||||||
ObjectStream: v3,
|
ObjectStream: v3,
|
||||||
Versioned: true,
|
Versioned: true,
|
||||||
},
|
},
|
||||||
|
ExpectVersion: 7,
|
||||||
}.Check(ctx, t, db)
|
}.Check(ctx, t, db)
|
||||||
|
v3.Version = 7
|
||||||
|
|
||||||
metabasetest.Verify{
|
metabasetest.Verify{
|
||||||
Objects: []metabase.RawObject{
|
Objects: []metabase.RawObject{
|
||||||
@ -4994,7 +5024,9 @@ func TestCommitObjectVersioned(t *testing.T) {
|
|||||||
Opts: metabase.CommitObject{
|
Opts: metabase.CommitObject{
|
||||||
ObjectStream: v4,
|
ObjectStream: v4,
|
||||||
},
|
},
|
||||||
|
ExpectVersion: 8,
|
||||||
}.Check(ctx, t, db)
|
}.Check(ctx, t, db)
|
||||||
|
v4.Version = 8
|
||||||
|
|
||||||
// committing v4 should overwrite the previous unversioned commit (v2),
|
// committing v4 should overwrite the previous unversioned commit (v2),
|
||||||
// so v2 is not in the result check
|
// so v2 is not in the result check
|
||||||
@ -5043,6 +5075,7 @@ func TestCommitObjectVersioned(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
rawObjects := make([]metabase.RawObject, 0, len(objs))
|
rawObjects := make([]metabase.RawObject, 0, len(objs))
|
||||||
|
expectVersion := metabase.Version(numCommits + 1)
|
||||||
for i := 0; i < len(objs); i++ {
|
for i := 0; i < len(objs); i++ {
|
||||||
versioned := i%2 == 0
|
versioned := i%2 == 0
|
||||||
|
|
||||||
@ -5051,7 +5084,10 @@ func TestCommitObjectVersioned(t *testing.T) {
|
|||||||
ObjectStream: *objs[i],
|
ObjectStream: *objs[i],
|
||||||
Versioned: versioned,
|
Versioned: versioned,
|
||||||
},
|
},
|
||||||
|
ExpectVersion: expectVersion,
|
||||||
}.Check(ctx, t, db)
|
}.Check(ctx, t, db)
|
||||||
|
objs[i].Version = expectVersion
|
||||||
|
expectVersion++
|
||||||
|
|
||||||
if versioned {
|
if versioned {
|
||||||
rawObjects = append(rawObjects, metabase.RawObject{
|
rawObjects = append(rawObjects, metabase.RawObject{
|
||||||
|
@ -4,6 +4,7 @@
|
|||||||
package metabase_test
|
package metabase_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"strconv"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -1028,125 +1029,131 @@ func TestFinishCopyObject(t *testing.T) {
|
|||||||
// - expected copy version
|
// - expected copy version
|
||||||
|
|
||||||
testCases := []struct {
|
testCases := []struct {
|
||||||
Bucket string
|
Bucket string
|
||||||
Key metabase.ObjectKey
|
Key metabase.ObjectKey
|
||||||
NewBucket string
|
NewBucket string
|
||||||
NewKey metabase.ObjectKey
|
NewKey metabase.ObjectKey
|
||||||
sourcePendingVersions []metabase.Version
|
sourcePendingVersions []metabase.Version
|
||||||
sourceCommittedVersion metabase.Version
|
sourceCommitVersion metabase.Version
|
||||||
destinationPendingVersions []metabase.Version
|
sourceCommittedVersion metabase.Version
|
||||||
destinationCommittedVersion metabase.Version
|
destinationPendingVersions []metabase.Version
|
||||||
expectedCopyVersion metabase.Version
|
destinationCommitVersion metabase.Version
|
||||||
|
destionationCommittedVersion metabase.Version
|
||||||
|
expectedCopyVersion metabase.Version
|
||||||
}{
|
}{
|
||||||
// the same bucket
|
// the same bucket
|
||||||
{"testbucket", "object", "testbucket", "new-object",
|
0: {"testbucket", "object", "testbucket", "new-object",
|
||||||
[]metabase.Version{}, 2,
|
[]metabase.Version{}, 2, 2,
|
||||||
[]metabase.Version{}, 1,
|
[]metabase.Version{}, 1, 1,
|
||||||
2},
|
2},
|
||||||
{"testbucket", "object", "testbucket", "new-object",
|
1: {"testbucket", "object", "testbucket", "new-object",
|
||||||
[]metabase.Version{}, 1,
|
[]metabase.Version{}, 1, 1,
|
||||||
[]metabase.Version{1}, 2,
|
[]metabase.Version{1}, 2, 2,
|
||||||
3},
|
3},
|
||||||
{"testbucket", "object", "testbucket", "new-object",
|
2: {"testbucket", "object", "testbucket", "new-object",
|
||||||
[]metabase.Version{}, 1,
|
[]metabase.Version{}, 1, 1,
|
||||||
[]metabase.Version{1, 3}, 2,
|
[]metabase.Version{1, 3}, 2, 4,
|
||||||
4},
|
5},
|
||||||
{"testbucket", "object", "testbucket", "new-object",
|
3: {"testbucket", "object", "testbucket", "new-object",
|
||||||
[]metabase.Version{1, 5}, 2,
|
[]metabase.Version{1, 5}, 2, 6,
|
||||||
[]metabase.Version{1, 3}, 2,
|
[]metabase.Version{1, 3}, 2, 4,
|
||||||
4},
|
5},
|
||||||
{"testbucket", "object", "newbucket", "object",
|
4: {"testbucket", "object", "newbucket", "object",
|
||||||
[]metabase.Version{2, 3}, 1,
|
[]metabase.Version{2, 3}, 1, 4,
|
||||||
[]metabase.Version{1, 5}, 2,
|
[]metabase.Version{1, 5}, 2, 6,
|
||||||
6},
|
7},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, tc := range testCases {
|
for i, tc := range testCases {
|
||||||
metabasetest.DeleteAll{}.Check(ctx, t, db)
|
t.Run(strconv.Itoa(i), func(t *testing.T) {
|
||||||
sourceObjStream.BucketName = tc.Bucket
|
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||||
sourceObjStream.ObjectKey = tc.Key
|
sourceObjStream.BucketName = tc.Bucket
|
||||||
destinationObjStream.BucketName = tc.NewBucket
|
sourceObjStream.ObjectKey = tc.Key
|
||||||
destinationObjStream.ObjectKey = tc.NewKey
|
destinationObjStream.BucketName = tc.NewBucket
|
||||||
|
destinationObjStream.ObjectKey = tc.NewKey
|
||||||
|
|
||||||
var rawObjects []metabase.RawObject
|
var rawObjects []metabase.RawObject
|
||||||
for _, version := range tc.sourcePendingVersions {
|
for _, version := range tc.sourcePendingVersions {
|
||||||
sourceObjStream.Version = version
|
sourceObjStream.Version = version
|
||||||
|
sourceObjStream.StreamID = testrand.UUID()
|
||||||
|
metabasetest.CreatePendingObject(ctx, t, db, sourceObjStream, 0)
|
||||||
|
|
||||||
|
rawObjects = append(rawObjects, metabase.RawObject{
|
||||||
|
ObjectStream: sourceObjStream,
|
||||||
|
CreatedAt: now,
|
||||||
|
Status: metabase.Pending,
|
||||||
|
|
||||||
|
Encryption: metabasetest.DefaultEncryption,
|
||||||
|
ZombieDeletionDeadline: &zombieDeadline,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
sourceObjStream.Version = tc.sourceCommitVersion
|
||||||
sourceObjStream.StreamID = testrand.UUID()
|
sourceObjStream.StreamID = testrand.UUID()
|
||||||
metabasetest.CreatePendingObject(ctx, t, db, sourceObjStream, 0)
|
sourceObj, _ := metabasetest.CreateTestObject{
|
||||||
|
|
||||||
rawObjects = append(rawObjects, metabase.RawObject{
|
|
||||||
ObjectStream: sourceObjStream,
|
|
||||||
CreatedAt: now,
|
|
||||||
Status: metabase.Pending,
|
|
||||||
|
|
||||||
Encryption: metabasetest.DefaultEncryption,
|
|
||||||
ZombieDeletionDeadline: &zombieDeadline,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
sourceObjStream.Version = tc.sourceCommittedVersion
|
|
||||||
sourceObjStream.StreamID = testrand.UUID()
|
|
||||||
sourceObj, _ := metabasetest.CreateTestObject{
|
|
||||||
BeginObjectExactVersion: &metabase.BeginObjectExactVersion{
|
|
||||||
ObjectStream: sourceObjStream,
|
|
||||||
Encryption: metabasetest.DefaultEncryption,
|
|
||||||
},
|
|
||||||
CommitObject: &metabase.CommitObject{
|
|
||||||
ObjectStream: sourceObjStream,
|
|
||||||
OverrideEncryptedMetadata: true,
|
|
||||||
EncryptedMetadata: testrand.Bytes(64),
|
|
||||||
EncryptedMetadataNonce: testrand.Nonce().Bytes(),
|
|
||||||
EncryptedMetadataEncryptedKey: testrand.Bytes(265),
|
|
||||||
},
|
|
||||||
}.Run(ctx, t, db, sourceObjStream, 0)
|
|
||||||
|
|
||||||
rawObjects = append(rawObjects, metabase.RawObject(sourceObj))
|
|
||||||
|
|
||||||
for _, version := range tc.destinationPendingVersions {
|
|
||||||
destinationObjStream.Version = version
|
|
||||||
destinationObjStream.StreamID = testrand.UUID()
|
|
||||||
metabasetest.CreatePendingObject(ctx, t, db, destinationObjStream, 0)
|
|
||||||
|
|
||||||
rawObjects = append(rawObjects, metabase.RawObject{
|
|
||||||
ObjectStream: destinationObjStream,
|
|
||||||
CreatedAt: now,
|
|
||||||
Status: metabase.Pending,
|
|
||||||
|
|
||||||
Encryption: metabasetest.DefaultEncryption,
|
|
||||||
ZombieDeletionDeadline: &zombieDeadline,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
if tc.destinationCommittedVersion != 0 {
|
|
||||||
destinationObjStream.StreamID = testrand.UUID()
|
|
||||||
destinationObjStream.Version = tc.destinationCommittedVersion
|
|
||||||
_, _ = metabasetest.CreateTestObject{
|
|
||||||
BeginObjectExactVersion: &metabase.BeginObjectExactVersion{
|
BeginObjectExactVersion: &metabase.BeginObjectExactVersion{
|
||||||
ObjectStream: destinationObjStream,
|
ObjectStream: sourceObjStream,
|
||||||
Encryption: metabasetest.DefaultEncryption,
|
Encryption: metabasetest.DefaultEncryption,
|
||||||
},
|
},
|
||||||
CommitObject: &metabase.CommitObject{
|
CommitObject: &metabase.CommitObject{
|
||||||
ObjectStream: destinationObjStream,
|
ObjectStream: sourceObjStream,
|
||||||
OverrideEncryptedMetadata: true,
|
OverrideEncryptedMetadata: true,
|
||||||
EncryptedMetadata: testrand.Bytes(64),
|
EncryptedMetadata: testrand.Bytes(64),
|
||||||
EncryptedMetadataNonce: testrand.Nonce().Bytes(),
|
EncryptedMetadataNonce: testrand.Nonce().Bytes(),
|
||||||
EncryptedMetadataEncryptedKey: testrand.Bytes(265),
|
EncryptedMetadataEncryptedKey: testrand.Bytes(265),
|
||||||
},
|
},
|
||||||
}.Run(ctx, t, db, destinationObjStream, 0)
|
ExpectVersion: tc.sourceCommittedVersion,
|
||||||
}
|
}.Run(ctx, t, db, sourceObjStream, 0)
|
||||||
|
|
||||||
copyObj, expectedOriginalSegments, _ := metabasetest.CreateObjectCopy{
|
rawObjects = append(rawObjects, metabase.RawObject(sourceObj))
|
||||||
OriginalObject: sourceObj,
|
|
||||||
CopyObjectStream: &destinationObjStream,
|
|
||||||
}.Run(ctx, t, db)
|
|
||||||
|
|
||||||
require.Equal(t, tc.expectedCopyVersion, copyObj.Version)
|
for _, version := range tc.destinationPendingVersions {
|
||||||
|
destinationObjStream.Version = version
|
||||||
|
destinationObjStream.StreamID = testrand.UUID()
|
||||||
|
metabasetest.CreatePendingObject(ctx, t, db, destinationObjStream, 0)
|
||||||
|
|
||||||
rawObjects = append(rawObjects, metabase.RawObject(copyObj))
|
rawObjects = append(rawObjects, metabase.RawObject{
|
||||||
|
ObjectStream: destinationObjStream,
|
||||||
|
CreatedAt: now,
|
||||||
|
Status: metabase.Pending,
|
||||||
|
|
||||||
metabasetest.Verify{
|
Encryption: metabasetest.DefaultEncryption,
|
||||||
Objects: rawObjects,
|
ZombieDeletionDeadline: &zombieDeadline,
|
||||||
Segments: expectedOriginalSegments,
|
})
|
||||||
}.Check(ctx, t, db)
|
}
|
||||||
|
|
||||||
|
if tc.destinationCommitVersion != 0 {
|
||||||
|
destinationObjStream.StreamID = testrand.UUID()
|
||||||
|
destinationObjStream.Version = tc.destinationCommitVersion
|
||||||
|
_, _ = metabasetest.CreateTestObject{
|
||||||
|
BeginObjectExactVersion: &metabase.BeginObjectExactVersion{
|
||||||
|
ObjectStream: destinationObjStream,
|
||||||
|
Encryption: metabasetest.DefaultEncryption,
|
||||||
|
},
|
||||||
|
CommitObject: &metabase.CommitObject{
|
||||||
|
ObjectStream: destinationObjStream,
|
||||||
|
OverrideEncryptedMetadata: true,
|
||||||
|
EncryptedMetadata: testrand.Bytes(64),
|
||||||
|
EncryptedMetadataNonce: testrand.Nonce().Bytes(),
|
||||||
|
EncryptedMetadataEncryptedKey: testrand.Bytes(265),
|
||||||
|
},
|
||||||
|
ExpectVersion: tc.destionationCommittedVersion,
|
||||||
|
}.Run(ctx, t, db, destinationObjStream, 0)
|
||||||
|
}
|
||||||
|
|
||||||
|
copyObj, expectedOriginalSegments, _ := metabasetest.CreateObjectCopy{
|
||||||
|
OriginalObject: sourceObj,
|
||||||
|
CopyObjectStream: &destinationObjStream,
|
||||||
|
}.Run(ctx, t, db)
|
||||||
|
|
||||||
|
require.Equal(t, tc.expectedCopyVersion, copyObj.Version)
|
||||||
|
|
||||||
|
rawObjects = append(rawObjects, metabase.RawObject(copyObj))
|
||||||
|
|
||||||
|
metabasetest.Verify{
|
||||||
|
Objects: rawObjects,
|
||||||
|
Segments: expectedOriginalSegments,
|
||||||
|
}.Check(ctx, t, db)
|
||||||
|
})
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -902,13 +902,13 @@ func TestListObjectsVersioned(t *testing.T) {
|
|||||||
|
|
||||||
a1 := a0
|
a1 := a0
|
||||||
a1.Version = 1001
|
a1.Version = 1001
|
||||||
b1 := a0
|
b1 := b0
|
||||||
b1.Version = 500
|
b1.Version = 500
|
||||||
|
|
||||||
objA0 := metabasetest.CreateObjectVersioned(ctx, t, db, a0, 0)
|
objA0 := metabasetest.CreateObjectVersioned(ctx, t, db, a0, 0)
|
||||||
objA1 := metabasetest.CreateObjectVersioned(ctx, t, db, a1, 0)
|
objA1 := metabasetest.CreateObjectVersioned(ctx, t, db, a1, 0)
|
||||||
objB0 := metabasetest.CreateObjectVersioned(ctx, t, db, b0, 0)
|
objB0 := metabasetest.CreateObjectVersioned(ctx, t, db, b0, 0)
|
||||||
objB1 := metabasetest.CreateObjectVersioned(ctx, t, db, b1, 0)
|
objB1 := metabasetest.CreateObjectVersionedOutOfOrder(ctx, t, db, b1, 0, 1001)
|
||||||
|
|
||||||
metabasetest.ListObjects{
|
metabasetest.ListObjects{
|
||||||
Opts: metabase.ListObjects{
|
Opts: metabase.ListObjects{
|
||||||
@ -922,7 +922,7 @@ func TestListObjectsVersioned(t *testing.T) {
|
|||||||
Result: metabase.ListObjectsResult{
|
Result: metabase.ListObjectsResult{
|
||||||
Objects: []metabase.ObjectEntry{
|
Objects: []metabase.ObjectEntry{
|
||||||
objectEntryFromRaw(metabase.RawObject(objA1)),
|
objectEntryFromRaw(metabase.RawObject(objA1)),
|
||||||
objectEntryFromRaw(metabase.RawObject(objB0)),
|
objectEntryFromRaw(metabase.RawObject(objB1)),
|
||||||
},
|
},
|
||||||
}}.Check(ctx, t, db)
|
}}.Check(ctx, t, db)
|
||||||
|
|
||||||
@ -952,13 +952,13 @@ func TestListObjectsVersioned(t *testing.T) {
|
|||||||
|
|
||||||
a1 := a0
|
a1 := a0
|
||||||
a1.Version = 1001
|
a1.Version = 1001
|
||||||
b1 := a0
|
b1 := b0
|
||||||
b1.Version = 500
|
b1.Version = 500
|
||||||
|
|
||||||
objA0 := metabasetest.CreateObjectVersioned(ctx, t, db, a0, 0)
|
objA0 := metabasetest.CreateObjectVersioned(ctx, t, db, a0, 0)
|
||||||
objA1 := metabasetest.CreateObjectVersioned(ctx, t, db, a1, 0)
|
objA1 := metabasetest.CreateObjectVersioned(ctx, t, db, a1, 0)
|
||||||
objB0 := metabasetest.CreateObjectVersioned(ctx, t, db, b0, 0)
|
objB0 := metabasetest.CreateObjectVersioned(ctx, t, db, b0, 0)
|
||||||
objB1 := metabasetest.CreateObjectVersioned(ctx, t, db, b1, 0)
|
objB1 := metabasetest.CreateObjectVersionedOutOfOrder(ctx, t, db, b1, 0, 1001)
|
||||||
|
|
||||||
deletionResult := metabasetest.DeleteObjectLastCommitted{
|
deletionResult := metabasetest.DeleteObjectLastCommitted{
|
||||||
Opts: metabase.DeleteObjectLastCommitted{
|
Opts: metabase.DeleteObjectLastCommitted{
|
||||||
@ -992,7 +992,7 @@ func TestListObjectsVersioned(t *testing.T) {
|
|||||||
},
|
},
|
||||||
Result: metabase.ListObjectsResult{
|
Result: metabase.ListObjectsResult{
|
||||||
Objects: []metabase.ObjectEntry{
|
Objects: []metabase.ObjectEntry{
|
||||||
objectEntryFromRaw(metabase.RawObject(objB0)),
|
objectEntryFromRaw(metabase.RawObject(objB1)),
|
||||||
},
|
},
|
||||||
}}.Check(ctx, t, db)
|
}}.Check(ctx, t, db)
|
||||||
|
|
||||||
|
@ -124,6 +124,19 @@ func CreateObjectVersioned(ctx *testcontext.Context, t testing.TB, db *metabase.
|
|||||||
}.Check(ctx, t, db)
|
}.Check(ctx, t, db)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// CreateObjectVersionedOutOfOrder creates a new committed object with the specified number of segments.
|
||||||
|
func CreateObjectVersionedOutOfOrder(ctx *testcontext.Context, t testing.TB, db *metabase.DB, obj metabase.ObjectStream, numberOfSegments byte, expectVersion metabase.Version) metabase.Object {
|
||||||
|
CreatePendingObject(ctx, t, db, obj, numberOfSegments)
|
||||||
|
|
||||||
|
return CommitObject{
|
||||||
|
Opts: metabase.CommitObject{
|
||||||
|
ObjectStream: obj,
|
||||||
|
Versioned: true,
|
||||||
|
},
|
||||||
|
ExpectVersion: expectVersion,
|
||||||
|
}.Check(ctx, t, db)
|
||||||
|
}
|
||||||
|
|
||||||
// CreateExpiredObject creates a new committed expired object with the specified number of segments.
|
// CreateExpiredObject creates a new committed expired object with the specified number of segments.
|
||||||
func CreateExpiredObject(ctx *testcontext.Context, t testing.TB, db *metabase.DB, obj metabase.ObjectStream, numberOfSegments byte, expiresAt time.Time) metabase.Object {
|
func CreateExpiredObject(ctx *testcontext.Context, t testing.TB, db *metabase.DB, obj metabase.ObjectStream, numberOfSegments byte, expiresAt time.Time) metabase.Object {
|
||||||
BeginObjectExactVersion{
|
BeginObjectExactVersion{
|
||||||
@ -206,6 +219,7 @@ func CreateSegments(ctx *testcontext.Context, t testing.TB, db *metabase.DB, obj
|
|||||||
type CreateTestObject struct {
|
type CreateTestObject struct {
|
||||||
BeginObjectExactVersion *metabase.BeginObjectExactVersion
|
BeginObjectExactVersion *metabase.BeginObjectExactVersion
|
||||||
CommitObject *metabase.CommitObject
|
CommitObject *metabase.CommitObject
|
||||||
|
ExpectVersion metabase.Version
|
||||||
CreateSegment func(object metabase.Object, index int) metabase.Segment
|
CreateSegment func(object metabase.Object, index int) metabase.Segment
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -302,7 +316,8 @@ func (co CreateTestObject) Run(ctx *testcontext.Context, t testing.TB, db *metab
|
|||||||
}
|
}
|
||||||
|
|
||||||
createdObject := CommitObject{
|
createdObject := CommitObject{
|
||||||
Opts: coOpts,
|
Opts: coOpts,
|
||||||
|
ExpectVersion: co.ExpectVersion,
|
||||||
}.Check(ctx, t, db)
|
}.Check(ctx, t, db)
|
||||||
|
|
||||||
return createdObject, createdSegments
|
return createdObject, createdSegments
|
||||||
|
@ -87,9 +87,10 @@ func (step BeginObjectExactVersion) Check(ctx *testcontext.Context, t require.Te
|
|||||||
|
|
||||||
// CommitObject is for testing metabase.CommitObject.
|
// CommitObject is for testing metabase.CommitObject.
|
||||||
type CommitObject struct {
|
type CommitObject struct {
|
||||||
Opts metabase.CommitObject
|
Opts metabase.CommitObject
|
||||||
ErrClass *errs.Class
|
ExpectVersion metabase.Version
|
||||||
ErrText string
|
ErrClass *errs.Class
|
||||||
|
ErrText string
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check runs the test.
|
// Check runs the test.
|
||||||
@ -97,6 +98,9 @@ func (step CommitObject) Check(ctx *testcontext.Context, t require.TestingT, db
|
|||||||
object, err := db.CommitObject(ctx, step.Opts)
|
object, err := db.CommitObject(ctx, step.Opts)
|
||||||
checkError(t, err, step.ErrClass, step.ErrText)
|
checkError(t, err, step.ErrClass, step.ErrText)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
|
if step.ExpectVersion != 0 {
|
||||||
|
step.Opts.ObjectStream.Version = step.ExpectVersion
|
||||||
|
}
|
||||||
require.Equal(t, step.Opts.ObjectStream, object.ObjectStream)
|
require.Equal(t, step.Opts.ObjectStream, object.ObjectStream)
|
||||||
}
|
}
|
||||||
return object
|
return object
|
||||||
@ -104,8 +108,10 @@ func (step CommitObject) Check(ctx *testcontext.Context, t require.TestingT, db
|
|||||||
|
|
||||||
// CommitObjectWithSegments is for testing metabase.CommitObjectWithSegments.
|
// CommitObjectWithSegments is for testing metabase.CommitObjectWithSegments.
|
||||||
type CommitObjectWithSegments struct {
|
type CommitObjectWithSegments struct {
|
||||||
Opts metabase.CommitObjectWithSegments
|
Opts metabase.CommitObjectWithSegments
|
||||||
Deleted []metabase.DeletedSegmentInfo
|
Deleted []metabase.DeletedSegmentInfo
|
||||||
|
ExpectVersion metabase.Version
|
||||||
|
|
||||||
ErrClass *errs.Class
|
ErrClass *errs.Class
|
||||||
ErrText string
|
ErrText string
|
||||||
}
|
}
|
||||||
@ -115,6 +121,9 @@ func (step CommitObjectWithSegments) Check(ctx *testcontext.Context, t testing.T
|
|||||||
object, deleted, err := db.CommitObjectWithSegments(ctx, step.Opts)
|
object, deleted, err := db.CommitObjectWithSegments(ctx, step.Opts)
|
||||||
checkError(t, err, step.ErrClass, step.ErrText)
|
checkError(t, err, step.ErrClass, step.ErrText)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
|
if step.ExpectVersion != 0 {
|
||||||
|
step.Opts.ObjectStream.Version = step.ExpectVersion
|
||||||
|
}
|
||||||
require.Equal(t, step.Opts.ObjectStream, object.ObjectStream)
|
require.Equal(t, step.Opts.ObjectStream, object.ObjectStream)
|
||||||
}
|
}
|
||||||
require.Equal(t, step.Deleted, deleted)
|
require.Equal(t, step.Deleted, deleted)
|
||||||
|
Loading…
Reference in New Issue
Block a user