satellite/metabase: add CommitObject.Versioned

This allows to commit versioned objects.

Change-Id: I7ae100e508a23899392ba40084198617fe3e4e0c
This commit is contained in:
Egon Elbre 2023-10-16 14:48:25 +03:00 committed by Storj Robot
parent be5302d9cc
commit a23d9d20aa
5 changed files with 296 additions and 29 deletions

View File

@ -615,6 +615,9 @@ type CommitObject struct {
DisallowDelete bool
UsePendingObjectsTable bool
// Versioned indicates whether an object is allowed to have multiple versions.
Versioned bool
}
// Verify verifies reqest fields.
@ -685,8 +688,12 @@ func (db *DB) CommitObject(ctx context.Context, opts CommitObject) (object Objec
}
const versionArgIndex = 3
nextStatus := committedWhereVersioned(opts.Versioned)
args := []interface{}{
opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, opts.Version, opts.StreamID,
nextStatus,
len(segments),
totalPlainSize,
totalEncryptedSize,
@ -694,21 +701,31 @@ func (db *DB) CommitObject(ctx context.Context, opts CommitObject) (object Objec
encryptionParameters{&opts.Encryption},
}
deleteResult, err := db.deleteObjectUnversionedCommitted(ctx, ObjectLocation{
ProjectID: opts.ProjectID,
BucketName: opts.BucketName,
ObjectKey: opts.ObjectKey,
}, tx)
if err != nil {
return Error.Wrap(err)
}
var highestVersion Version
if opts.Versioned {
// TODO(ver): fold this into the queries below using a subquery.
v, err := db.queryHighestVersion(ctx, opts.Location(), tx)
if err != nil {
return Error.Wrap(err)
}
highestVersion = v
} else {
// TODO(ver): fold this into the query below using a subquery using `ON CONFLICT` on the unique index.
// Note, we are prematurely deleting the object without permissions
// and then rolling the action back, if we were not allowed to.
deleteResult, err := db.deleteObjectUnversionedCommitted(ctx, opts.Location(), tx)
if err != nil {
return Error.Wrap(err)
}
if deleteResult.DeletedObjectCount > 0 && opts.DisallowDelete {
return ErrPermissionDenied.New("no permissions to delete existing object")
}
if deleteResult.DeletedObjectCount > 0 && opts.DisallowDelete {
return ErrPermissionDenied.New("no permissions to delete existing object")
highestVersion = deleteResult.MaxVersion
}
if opts.UsePendingObjectsTable {
opts.Version = deleteResult.MaxVersion + 1
opts.Version = highestVersion + 1
args[versionArgIndex] = opts.Version
args = append(args,
@ -736,27 +753,27 @@ func (db *DB) CommitObject(ctx context.Context, opts CommitObject) (object Objec
)
SELECT
$1 as project_id, $2 as bucket_name, $3 as object_key, $4::INT4 as version, $5 as stream_id,
`+statusCommittedUnversioned+` as status, $6::INT4 as segment_count, $7::INT8 as total_plain_size, $8::INT8 as total_encrypted_size,
$9::INT4 as fixed_segment_size, NULL::timestamp as zombie_deletion_deadline, expires_at,
$6 as status, $7::INT4 as segment_count, $8::INT8 as total_plain_size, $9::INT8 as total_encrypted_size,
$10::INT4 as fixed_segment_size, NULL::timestamp as zombie_deletion_deadline, expires_at,
-- TODO should we allow to override existing encryption parameters or return error if don't match with opts?
CASE
WHEN encryption = 0 AND $10 <> 0 THEN $10
WHEN encryption = 0 AND $10 = 0 THEN NULL
WHEN encryption = 0 AND $11 <> 0 THEN $11
WHEN encryption = 0 AND $11 = 0 THEN NULL
ELSE encryption
END as
encryption,
CASE
WHEN $14::BOOL = true THEN $11
WHEN $15::BOOL = true THEN $12
ELSE encrypted_metadata_nonce
END as
encrypted_metadata_nonce,
CASE
WHEN $14::BOOL = true THEN $12
WHEN $15::BOOL = true THEN $13
ELSE encrypted_metadata
END as
encrypted_metadata,
CASE
WHEN $14::BOOL = true THEN $13
WHEN $15::BOOL = true THEN $14
ELSE encrypted_metadata_encrypted_key
END as
encrypted_metadata_encrypted_key
@ -790,25 +807,25 @@ func (db *DB) CommitObject(ctx context.Context, opts CommitObject) (object Objec
opts.EncryptedMetadataEncryptedKey,
)
metadataColumns = `,
encrypted_metadata_nonce = $11,
encrypted_metadata = $12,
encrypted_metadata_encrypted_key = $13
encrypted_metadata_nonce = $12,
encrypted_metadata = $13,
encrypted_metadata_encrypted_key = $14
`
}
err = tx.QueryRowContext(ctx, `
UPDATE objects SET
status =`+statusCommittedUnversioned+`,
segment_count = $6,
status = $6,
segment_count = $7,
total_plain_size = $7,
total_encrypted_size = $8,
fixed_segment_size = $9,
total_plain_size = $8,
total_encrypted_size = $9,
fixed_segment_size = $10,
zombie_deletion_deadline = NULL,
-- TODO should we allow to override existing encryption parameters or return error if don't match with opts?
encryption = CASE
WHEN objects.encryption = 0 AND $10 <> 0 THEN $10
WHEN objects.encryption = 0 AND $10 = 0 THEN NULL
WHEN objects.encryption = 0 AND $11 <> 0 THEN $11
WHEN objects.encryption = 0 AND $11 = 0 THEN NULL
ELSE objects.encryption
END
`+metadataColumns+`
@ -844,7 +861,7 @@ func (db *DB) CommitObject(ctx context.Context, opts CommitObject) (object Objec
object.BucketName = opts.BucketName
object.ObjectKey = opts.ObjectKey
object.Version = opts.Version
object.Status = CommittedUnversioned
object.Status = nextStatus
object.SegmentCount = int32(len(segments))
object.TotalPlainSize = totalPlainSize
object.TotalEncryptedSize = totalEncryptedSize

View File

@ -4358,6 +4358,177 @@ func TestCommitObject(t *testing.T) {
})
}
func TestCommitObjectVersioned(t *testing.T) {
metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) {
obj := metabasetest.RandObjectStream()
obj.Version = metabase.NextVersion
now := time.Now()
zombieExpiration := now.Add(24 * time.Hour)
t.Run("Commit mixed versioned and unversioned", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
v1 := obj
metabasetest.BeginObjectNextVersion{
Opts: metabase.BeginObjectNextVersion{
ObjectStream: v1,
Encryption: metabasetest.DefaultEncryption,
ZombieDeletionDeadline: &zombieExpiration,
},
Version: 1,
}.Check(ctx, t, db)
v1.Version = 1
v2 := obj
metabasetest.BeginObjectNextVersion{
Opts: metabase.BeginObjectNextVersion{
ObjectStream: v2,
Encryption: metabasetest.DefaultEncryption,
ZombieDeletionDeadline: &zombieExpiration,
},
Version: 2,
}.Check(ctx, t, db)
v2.Version = 2
v3 := obj
metabasetest.BeginObjectNextVersion{
Opts: metabase.BeginObjectNextVersion{
ObjectStream: v3,
Encryption: metabasetest.DefaultEncryption,
ZombieDeletionDeadline: &zombieExpiration,
},
Version: 3,
}.Check(ctx, t, db)
v3.Version = 3
v4 := obj
metabasetest.BeginObjectNextVersion{
Opts: metabase.BeginObjectNextVersion{
ObjectStream: v4,
Encryption: metabasetest.DefaultEncryption,
ZombieDeletionDeadline: &zombieExpiration,
},
Version: 4,
}.Check(ctx, t, db)
v4.Version = 4
// allow having multiple pending objects
metabasetest.Verify{
Objects: []metabase.RawObject{
{
ObjectStream: v1,
CreatedAt: now,
Status: metabase.Pending,
Encryption: metabasetest.DefaultEncryption,
ZombieDeletionDeadline: &zombieExpiration,
},
{
ObjectStream: v2,
CreatedAt: now,
Status: metabase.Pending,
Encryption: metabasetest.DefaultEncryption,
ZombieDeletionDeadline: &zombieExpiration,
},
{
ObjectStream: v3,
CreatedAt: now,
Status: metabase.Pending,
Encryption: metabasetest.DefaultEncryption,
ZombieDeletionDeadline: &zombieExpiration,
},
{
ObjectStream: v4,
CreatedAt: now,
Status: metabase.Pending,
Encryption: metabasetest.DefaultEncryption,
ZombieDeletionDeadline: &zombieExpiration,
},
},
}.Check(ctx, t, db)
metabasetest.CommitObject{
Opts: metabase.CommitObject{
ObjectStream: v3,
},
}.Check(ctx, t, db)
metabasetest.CommitObject{
Opts: metabase.CommitObject{
ObjectStream: v1,
},
}.Check(ctx, t, db)
// The latter commit should overwrite the v3.
// When pending objects table is enabled, then objects
// get the version during commit, hence the latest version
// will be the max.
metabasetest.Verify{
Objects: []metabase.RawObject{
{
ObjectStream: v1,
CreatedAt: now,
Status: metabase.CommittedUnversioned,
Encryption: metabasetest.DefaultEncryption,
},
{
ObjectStream: v2,
CreatedAt: now,
Status: metabase.Pending,
Encryption: metabasetest.DefaultEncryption,
ZombieDeletionDeadline: &zombieExpiration,
},
{
ObjectStream: v4,
CreatedAt: now,
Status: metabase.Pending,
Encryption: metabasetest.DefaultEncryption,
ZombieDeletionDeadline: &zombieExpiration,
},
},
}.Check(ctx, t, db)
metabasetest.CommitObject{
Opts: metabase.CommitObject{
ObjectStream: v2,
Versioned: true,
},
}.Check(ctx, t, db)
metabasetest.CommitObject{
Opts: metabase.CommitObject{
ObjectStream: v4,
Versioned: true,
},
}.Check(ctx, t, db)
metabasetest.Verify{
Objects: []metabase.RawObject{
{
ObjectStream: v1,
CreatedAt: now,
Status: metabase.CommittedUnversioned,
Encryption: metabasetest.DefaultEncryption,
},
{
ObjectStream: v2,
CreatedAt: now,
Status: metabase.CommittedVersioned,
Encryption: metabasetest.DefaultEncryption,
},
{
ObjectStream: v4,
CreatedAt: now,
Status: metabase.CommittedVersioned,
Encryption: metabasetest.DefaultEncryption,
},
},
}.Check(ctx, t, db)
})
})
}
func TestCommitObjectWithIncorrectPartSize(t *testing.T) {
metabasetest.RunWithConfig(t, metabase.Config{
ApplicationName: "satellite-test",

View File

@ -393,6 +393,13 @@ const (
statusesUnversioned = "(3,5)"
)
func committedWhereVersioned(versioned bool) ObjectStatus {
if versioned {
return CommittedVersioned
}
return CommittedUnversioned
}
// stub uses so the linter wouldn't complain.
var (
_ = CommittedVersioned

View File

@ -355,6 +355,29 @@ func (db *DB) deleteObjectUnversionedCommitted(ctx context.Context, loc ObjectLo
return result, nil
}
// queryHighestVersion queries the latest version of an object inside an transaction.
//
// TODO(ver): this should have a better and clearer name.
func (db *DB) queryHighestVersion(ctx context.Context, loc ObjectLocation, stmt stmtRow) (highest Version, err error) {
defer mon.Task()(&ctx)(&err)
if err := loc.Verify(); err != nil {
return 0, Error.Wrap(err)
}
err = stmt.QueryRowContext(ctx, `
SELECT MAX(version) as version
FROM objects
WHERE (project_id, bucket_name, object_key) = ($1, $2, $3)
`, loc.ProjectID, []byte(loc.BucketName), loc.ObjectKey).Scan(&highest)
if err != nil {
return 0, Error.Wrap(err)
}
return highest, nil
}
// DeleteObjectsAllVersions deletes all versions of multiple objects from the same bucket.
func (db *DB) DeleteObjectsAllVersions(ctx context.Context, opts DeleteObjectsAllVersions) (result DeleteObjectResult, err error) {
defer mon.Task()(&ctx)(&err)

View File

@ -179,6 +179,55 @@ func CreateObject(ctx *testcontext.Context, t require.TestingT, db *metabase.DB,
}.Check(ctx, t, db)
}
// CreateObjectVersioned creates a new committed object with the specified number of segments.
func CreateObjectVersioned(ctx *testcontext.Context, t require.TestingT, db *metabase.DB, obj metabase.ObjectStream, numberOfSegments byte) metabase.Object {
BeginObjectExactVersion{
Opts: metabase.BeginObjectExactVersion{
ObjectStream: obj,
Encryption: DefaultEncryption,
},
}.Check(ctx, t, db)
for i := byte(0); i < numberOfSegments; i++ {
BeginSegment{
Opts: metabase.BeginSegment{
ObjectStream: obj,
Position: metabase.SegmentPosition{Part: 0, Index: uint32(i)},
RootPieceID: storj.PieceID{i + 1},
Pieces: []metabase.Piece{{
Number: 1,
StorageNode: testrand.NodeID(),
}},
},
}.Check(ctx, t, db)
CommitSegment{
Opts: metabase.CommitSegment{
ObjectStream: obj,
Position: metabase.SegmentPosition{Part: 0, Index: uint32(i)},
RootPieceID: storj.PieceID{1},
Pieces: metabase.Pieces{{Number: 0, StorageNode: storj.NodeID{2}}},
EncryptedKey: []byte{3},
EncryptedKeyNonce: []byte{4},
EncryptedETag: []byte{5},
EncryptedSize: 1024,
PlainSize: 512,
PlainOffset: 0,
Redundancy: DefaultRedundancy,
},
}.Check(ctx, t, db)
}
return CommitObject{
Opts: metabase.CommitObject{
ObjectStream: obj,
Versioned: true,
},
}.Check(ctx, t, db)
}
// CreateExpiredObject creates a new committed expired object with the specified number of segments.
func CreateExpiredObject(ctx *testcontext.Context, t *testing.T, db *metabase.DB, obj metabase.ObjectStream, numberOfSegments byte, expiresAt time.Time) metabase.Object {
BeginObjectExactVersion{