From a23d9d20aabcb54a7d78bcf3b8e8e8195d42d527 Mon Sep 17 00:00:00 2001 From: Egon Elbre Date: Mon, 16 Oct 2023 14:48:25 +0300 Subject: [PATCH] satellite/metabase: add CommitObject.Versioned This allows to commit versioned objects. Change-Id: I7ae100e508a23899392ba40084198617fe3e4e0c --- satellite/metabase/commit.go | 75 ++++++---- satellite/metabase/commit_test.go | 171 ++++++++++++++++++++++ satellite/metabase/common.go | 7 + satellite/metabase/delete.go | 23 +++ satellite/metabase/metabasetest/create.go | 49 +++++++ 5 files changed, 296 insertions(+), 29 deletions(-) diff --git a/satellite/metabase/commit.go b/satellite/metabase/commit.go index bd27a6797..9bbd3479a 100644 --- a/satellite/metabase/commit.go +++ b/satellite/metabase/commit.go @@ -615,6 +615,9 @@ type CommitObject struct { DisallowDelete bool UsePendingObjectsTable bool + + // Versioned indicates whether an object is allowed to have multiple versions. + Versioned bool } // Verify verifies reqest fields. @@ -685,8 +688,12 @@ func (db *DB) CommitObject(ctx context.Context, opts CommitObject) (object Objec } const versionArgIndex = 3 + + nextStatus := committedWhereVersioned(opts.Versioned) + args := []interface{}{ opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, opts.Version, opts.StreamID, + nextStatus, len(segments), totalPlainSize, totalEncryptedSize, @@ -694,21 +701,31 @@ func (db *DB) CommitObject(ctx context.Context, opts CommitObject) (object Objec encryptionParameters{&opts.Encryption}, } - deleteResult, err := db.deleteObjectUnversionedCommitted(ctx, ObjectLocation{ - ProjectID: opts.ProjectID, - BucketName: opts.BucketName, - ObjectKey: opts.ObjectKey, - }, tx) - if err != nil { - return Error.Wrap(err) - } + var highestVersion Version + if opts.Versioned { + // TODO(ver): fold this into the queries below using a subquery. + v, err := db.queryHighestVersion(ctx, opts.Location(), tx) + if err != nil { + return Error.Wrap(err) + } + highestVersion = v + } else { + // TODO(ver): fold this into the query below using a subquery using `ON CONFLICT` on the unique index. + // Note, we are prematurely deleting the object without permissions + // and then rolling the action back, if we were not allowed to. + deleteResult, err := db.deleteObjectUnversionedCommitted(ctx, opts.Location(), tx) + if err != nil { + return Error.Wrap(err) + } + if deleteResult.DeletedObjectCount > 0 && opts.DisallowDelete { + return ErrPermissionDenied.New("no permissions to delete existing object") + } - if deleteResult.DeletedObjectCount > 0 && opts.DisallowDelete { - return ErrPermissionDenied.New("no permissions to delete existing object") + highestVersion = deleteResult.MaxVersion } if opts.UsePendingObjectsTable { - opts.Version = deleteResult.MaxVersion + 1 + opts.Version = highestVersion + 1 args[versionArgIndex] = opts.Version args = append(args, @@ -736,27 +753,27 @@ func (db *DB) CommitObject(ctx context.Context, opts CommitObject) (object Objec ) SELECT $1 as project_id, $2 as bucket_name, $3 as object_key, $4::INT4 as version, $5 as stream_id, - `+statusCommittedUnversioned+` as status, $6::INT4 as segment_count, $7::INT8 as total_plain_size, $8::INT8 as total_encrypted_size, - $9::INT4 as fixed_segment_size, NULL::timestamp as zombie_deletion_deadline, expires_at, + $6 as status, $7::INT4 as segment_count, $8::INT8 as total_plain_size, $9::INT8 as total_encrypted_size, + $10::INT4 as fixed_segment_size, NULL::timestamp as zombie_deletion_deadline, expires_at, -- TODO should we allow to override existing encryption parameters or return error if don't match with opts? CASE - WHEN encryption = 0 AND $10 <> 0 THEN $10 - WHEN encryption = 0 AND $10 = 0 THEN NULL + WHEN encryption = 0 AND $11 <> 0 THEN $11 + WHEN encryption = 0 AND $11 = 0 THEN NULL ELSE encryption END as encryption, CASE - WHEN $14::BOOL = true THEN $11 + WHEN $15::BOOL = true THEN $12 ELSE encrypted_metadata_nonce END as encrypted_metadata_nonce, CASE - WHEN $14::BOOL = true THEN $12 + WHEN $15::BOOL = true THEN $13 ELSE encrypted_metadata END as encrypted_metadata, CASE - WHEN $14::BOOL = true THEN $13 + WHEN $15::BOOL = true THEN $14 ELSE encrypted_metadata_encrypted_key END as encrypted_metadata_encrypted_key @@ -790,25 +807,25 @@ func (db *DB) CommitObject(ctx context.Context, opts CommitObject) (object Objec opts.EncryptedMetadataEncryptedKey, ) metadataColumns = `, - encrypted_metadata_nonce = $11, - encrypted_metadata = $12, - encrypted_metadata_encrypted_key = $13 + encrypted_metadata_nonce = $12, + encrypted_metadata = $13, + encrypted_metadata_encrypted_key = $14 ` } err = tx.QueryRowContext(ctx, ` UPDATE objects SET - status =`+statusCommittedUnversioned+`, - segment_count = $6, + status = $6, + segment_count = $7, - total_plain_size = $7, - total_encrypted_size = $8, - fixed_segment_size = $9, + total_plain_size = $8, + total_encrypted_size = $9, + fixed_segment_size = $10, zombie_deletion_deadline = NULL, -- TODO should we allow to override existing encryption parameters or return error if don't match with opts? encryption = CASE - WHEN objects.encryption = 0 AND $10 <> 0 THEN $10 - WHEN objects.encryption = 0 AND $10 = 0 THEN NULL + WHEN objects.encryption = 0 AND $11 <> 0 THEN $11 + WHEN objects.encryption = 0 AND $11 = 0 THEN NULL ELSE objects.encryption END `+metadataColumns+` @@ -844,7 +861,7 @@ func (db *DB) CommitObject(ctx context.Context, opts CommitObject) (object Objec object.BucketName = opts.BucketName object.ObjectKey = opts.ObjectKey object.Version = opts.Version - object.Status = CommittedUnversioned + object.Status = nextStatus object.SegmentCount = int32(len(segments)) object.TotalPlainSize = totalPlainSize object.TotalEncryptedSize = totalEncryptedSize diff --git a/satellite/metabase/commit_test.go b/satellite/metabase/commit_test.go index d58ec7d29..dab41a74a 100644 --- a/satellite/metabase/commit_test.go +++ b/satellite/metabase/commit_test.go @@ -4358,6 +4358,177 @@ func TestCommitObject(t *testing.T) { }) } +func TestCommitObjectVersioned(t *testing.T) { + metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) { + obj := metabasetest.RandObjectStream() + obj.Version = metabase.NextVersion + now := time.Now() + zombieExpiration := now.Add(24 * time.Hour) + + t.Run("Commit mixed versioned and unversioned", func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + + v1 := obj + metabasetest.BeginObjectNextVersion{ + Opts: metabase.BeginObjectNextVersion{ + ObjectStream: v1, + Encryption: metabasetest.DefaultEncryption, + ZombieDeletionDeadline: &zombieExpiration, + }, + Version: 1, + }.Check(ctx, t, db) + v1.Version = 1 + + v2 := obj + metabasetest.BeginObjectNextVersion{ + Opts: metabase.BeginObjectNextVersion{ + ObjectStream: v2, + Encryption: metabasetest.DefaultEncryption, + ZombieDeletionDeadline: &zombieExpiration, + }, + Version: 2, + }.Check(ctx, t, db) + v2.Version = 2 + + v3 := obj + metabasetest.BeginObjectNextVersion{ + Opts: metabase.BeginObjectNextVersion{ + ObjectStream: v3, + Encryption: metabasetest.DefaultEncryption, + ZombieDeletionDeadline: &zombieExpiration, + }, + Version: 3, + }.Check(ctx, t, db) + v3.Version = 3 + + v4 := obj + metabasetest.BeginObjectNextVersion{ + Opts: metabase.BeginObjectNextVersion{ + ObjectStream: v4, + Encryption: metabasetest.DefaultEncryption, + ZombieDeletionDeadline: &zombieExpiration, + }, + Version: 4, + }.Check(ctx, t, db) + v4.Version = 4 + + // allow having multiple pending objects + + metabasetest.Verify{ + Objects: []metabase.RawObject{ + { + ObjectStream: v1, + CreatedAt: now, + Status: metabase.Pending, + Encryption: metabasetest.DefaultEncryption, + ZombieDeletionDeadline: &zombieExpiration, + }, + { + ObjectStream: v2, + CreatedAt: now, + Status: metabase.Pending, + Encryption: metabasetest.DefaultEncryption, + ZombieDeletionDeadline: &zombieExpiration, + }, + { + ObjectStream: v3, + CreatedAt: now, + Status: metabase.Pending, + Encryption: metabasetest.DefaultEncryption, + ZombieDeletionDeadline: &zombieExpiration, + }, + { + ObjectStream: v4, + CreatedAt: now, + Status: metabase.Pending, + Encryption: metabasetest.DefaultEncryption, + ZombieDeletionDeadline: &zombieExpiration, + }, + }, + }.Check(ctx, t, db) + + metabasetest.CommitObject{ + Opts: metabase.CommitObject{ + ObjectStream: v3, + }, + }.Check(ctx, t, db) + + metabasetest.CommitObject{ + Opts: metabase.CommitObject{ + ObjectStream: v1, + }, + }.Check(ctx, t, db) + + // The latter commit should overwrite the v3. + // When pending objects table is enabled, then objects + // get the version during commit, hence the latest version + // will be the max. + + metabasetest.Verify{ + Objects: []metabase.RawObject{ + { + ObjectStream: v1, + CreatedAt: now, + Status: metabase.CommittedUnversioned, + Encryption: metabasetest.DefaultEncryption, + }, + { + ObjectStream: v2, + CreatedAt: now, + Status: metabase.Pending, + Encryption: metabasetest.DefaultEncryption, + ZombieDeletionDeadline: &zombieExpiration, + }, + { + ObjectStream: v4, + CreatedAt: now, + Status: metabase.Pending, + Encryption: metabasetest.DefaultEncryption, + ZombieDeletionDeadline: &zombieExpiration, + }, + }, + }.Check(ctx, t, db) + + metabasetest.CommitObject{ + Opts: metabase.CommitObject{ + ObjectStream: v2, + Versioned: true, + }, + }.Check(ctx, t, db) + + metabasetest.CommitObject{ + Opts: metabase.CommitObject{ + ObjectStream: v4, + Versioned: true, + }, + }.Check(ctx, t, db) + + metabasetest.Verify{ + Objects: []metabase.RawObject{ + { + ObjectStream: v1, + CreatedAt: now, + Status: metabase.CommittedUnversioned, + Encryption: metabasetest.DefaultEncryption, + }, + { + ObjectStream: v2, + CreatedAt: now, + Status: metabase.CommittedVersioned, + Encryption: metabasetest.DefaultEncryption, + }, + { + ObjectStream: v4, + CreatedAt: now, + Status: metabase.CommittedVersioned, + Encryption: metabasetest.DefaultEncryption, + }, + }, + }.Check(ctx, t, db) + }) + }) +} + func TestCommitObjectWithIncorrectPartSize(t *testing.T) { metabasetest.RunWithConfig(t, metabase.Config{ ApplicationName: "satellite-test", diff --git a/satellite/metabase/common.go b/satellite/metabase/common.go index a51994337..298a67169 100644 --- a/satellite/metabase/common.go +++ b/satellite/metabase/common.go @@ -393,6 +393,13 @@ const ( statusesUnversioned = "(3,5)" ) +func committedWhereVersioned(versioned bool) ObjectStatus { + if versioned { + return CommittedVersioned + } + return CommittedUnversioned +} + // stub uses so the linter wouldn't complain. var ( _ = CommittedVersioned diff --git a/satellite/metabase/delete.go b/satellite/metabase/delete.go index 2d406dfc7..28cd85133 100644 --- a/satellite/metabase/delete.go +++ b/satellite/metabase/delete.go @@ -355,6 +355,29 @@ func (db *DB) deleteObjectUnversionedCommitted(ctx context.Context, loc ObjectLo return result, nil } +// queryHighestVersion queries the latest version of an object inside an transaction. +// +// TODO(ver): this should have a better and clearer name. +func (db *DB) queryHighestVersion(ctx context.Context, loc ObjectLocation, stmt stmtRow) (highest Version, err error) { + defer mon.Task()(&ctx)(&err) + + if err := loc.Verify(); err != nil { + return 0, Error.Wrap(err) + } + + err = stmt.QueryRowContext(ctx, ` + SELECT MAX(version) as version + FROM objects + WHERE (project_id, bucket_name, object_key) = ($1, $2, $3) + `, loc.ProjectID, []byte(loc.BucketName), loc.ObjectKey).Scan(&highest) + + if err != nil { + return 0, Error.Wrap(err) + } + + return highest, nil +} + // DeleteObjectsAllVersions deletes all versions of multiple objects from the same bucket. func (db *DB) DeleteObjectsAllVersions(ctx context.Context, opts DeleteObjectsAllVersions) (result DeleteObjectResult, err error) { defer mon.Task()(&ctx)(&err) diff --git a/satellite/metabase/metabasetest/create.go b/satellite/metabase/metabasetest/create.go index 27c58c375..9b930e2f1 100644 --- a/satellite/metabase/metabasetest/create.go +++ b/satellite/metabase/metabasetest/create.go @@ -179,6 +179,55 @@ func CreateObject(ctx *testcontext.Context, t require.TestingT, db *metabase.DB, }.Check(ctx, t, db) } +// CreateObjectVersioned creates a new committed object with the specified number of segments. +func CreateObjectVersioned(ctx *testcontext.Context, t require.TestingT, db *metabase.DB, obj metabase.ObjectStream, numberOfSegments byte) metabase.Object { + BeginObjectExactVersion{ + Opts: metabase.BeginObjectExactVersion{ + ObjectStream: obj, + Encryption: DefaultEncryption, + }, + }.Check(ctx, t, db) + + for i := byte(0); i < numberOfSegments; i++ { + BeginSegment{ + Opts: metabase.BeginSegment{ + ObjectStream: obj, + Position: metabase.SegmentPosition{Part: 0, Index: uint32(i)}, + RootPieceID: storj.PieceID{i + 1}, + Pieces: []metabase.Piece{{ + Number: 1, + StorageNode: testrand.NodeID(), + }}, + }, + }.Check(ctx, t, db) + + CommitSegment{ + Opts: metabase.CommitSegment{ + ObjectStream: obj, + Position: metabase.SegmentPosition{Part: 0, Index: uint32(i)}, + RootPieceID: storj.PieceID{1}, + Pieces: metabase.Pieces{{Number: 0, StorageNode: storj.NodeID{2}}}, + + EncryptedKey: []byte{3}, + EncryptedKeyNonce: []byte{4}, + EncryptedETag: []byte{5}, + + EncryptedSize: 1024, + PlainSize: 512, + PlainOffset: 0, + Redundancy: DefaultRedundancy, + }, + }.Check(ctx, t, db) + } + + return CommitObject{ + Opts: metabase.CommitObject{ + ObjectStream: obj, + Versioned: true, + }, + }.Check(ctx, t, db) +} + // CreateExpiredObject creates a new committed expired object with the specified number of segments. func CreateExpiredObject(ctx *testcontext.Context, t *testing.T, db *metabase.DB, obj metabase.ObjectStream, numberOfSegments byte, expiresAt time.Time) metabase.Object { BeginObjectExactVersion{