From d3429fafd04bb9aa86ea5506110c2afc34657071 Mon Sep 17 00:00:00 2001 From: Egon Elbre Date: Mon, 16 Oct 2023 15:04:07 +0300 Subject: [PATCH] satellite/metabase: add CommitObjectWithSegments.Versioned Change-Id: I4e1fc82b45cca94ff0b48d5b8d9deb6e13d0957b --- satellite/metabase/commit_object.go | 48 ++++++-- satellite/metabase/commit_object_test.go | 133 +++++++++++++++++++++++ 2 files changed, 171 insertions(+), 10 deletions(-) diff --git a/satellite/metabase/commit_object.go b/satellite/metabase/commit_object.go index 18a18967d..ef3116618 100644 --- a/satellite/metabase/commit_object.go +++ b/satellite/metabase/commit_object.go @@ -18,6 +18,8 @@ import ( ) // CommitObjectWithSegments contains arguments necessary for committing an object. +// +// TODO: not ready for production. type CommitObjectWithSegments struct { ObjectStream @@ -27,9 +29,18 @@ type CommitObjectWithSegments struct { // TODO: this probably should use segment ranges rather than individual items Segments []SegmentPosition + + // DisallowDelete indicates whether the user is allowed to overwrite + // the previous unversioned object. + DisallowDelete bool + + // Versioned indicates whether an object is allowed to have multiple versions. + Versioned bool } // CommitObjectWithSegments commits pending object to the database. +// +// TODO: not ready for production. func (db *DB) CommitObjectWithSegments(ctx context.Context, opts CommitObjectWithSegments) (object Object, deletedSegments []DeletedSegmentInfo, err error) { defer mon.Task()(&ctx)(&err) @@ -40,10 +51,25 @@ func (db *DB) CommitObjectWithSegments(ctx context.Context, opts CommitObjectWit return Object{}, nil, err } + var previousObject deleteObjectUnversionedCommittedResult + err = txutil.WithTx(ctx, db.db, nil, func(ctx context.Context, tx tagsql.Tx) error { // TODO: should we prevent this from executing when the object has been committed // currently this requires quite a lot of database communication, so invalid handling can be expensive. + if !opts.Versioned { + var err error + // Note, we are prematurely deleting the object without permissions + // and then rolling the action back, if we were not allowed to. + previousObject, err = db.deleteObjectUnversionedCommitted(ctx, opts.Location(), tx) + if err != nil { + return err + } + if previousObject.DeletedObjectCount > 0 && opts.DisallowDelete { + return ErrPermissionDenied.New("no permissions to delete existing object") + } + } + segmentsInDatabase, err := fetchSegmentsForCommit(ctx, tx, opts.StreamID) if err != nil { return err @@ -86,18 +112,20 @@ func (db *DB) CommitObjectWithSegments(ctx context.Context, opts CommitObjectWit totalEncryptedSize += int64(seg.EncryptedSize) } + nextStatus := committedWhereVersioned(opts.Versioned) + err = tx.QueryRowContext(ctx, ` UPDATE objects SET - status =`+statusCommittedUnversioned+`, - segment_count = $6, + status = $6, + segment_count = $7, - encrypted_metadata_nonce = $7, - encrypted_metadata = $8, - encrypted_metadata_encrypted_key = $9, + encrypted_metadata_nonce = $8, + encrypted_metadata = $9, + encrypted_metadata_encrypted_key = $10, - total_plain_size = $10, - total_encrypted_size = $11, - fixed_segment_size = $12, + total_plain_size = $11, + total_encrypted_size = $12, + fixed_segment_size = $13, zombie_deletion_deadline = NULL WHERE project_id = $1 AND @@ -109,7 +137,7 @@ func (db *DB) CommitObjectWithSegments(ctx context.Context, opts CommitObjectWit RETURNING created_at, expires_at, encryption; - `, opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, opts.Version, opts.StreamID, + `, opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, opts.Version, opts.StreamID, nextStatus, len(finalSegments), opts.EncryptedMetadataNonce, opts.EncryptedMetadata, opts.EncryptedMetadataEncryptedKey, totalPlainSize, @@ -132,7 +160,7 @@ func (db *DB) CommitObjectWithSegments(ctx context.Context, opts CommitObjectWit object.BucketName = opts.BucketName object.ObjectKey = opts.ObjectKey object.Version = opts.Version - object.Status = CommittedUnversioned + object.Status = nextStatus object.SegmentCount = int32(len(finalSegments)) object.EncryptedMetadataNonce = opts.EncryptedMetadataNonce object.EncryptedMetadata = opts.EncryptedMetadata diff --git a/satellite/metabase/commit_object_test.go b/satellite/metabase/commit_object_test.go index 12c15c6af..57ed306df 100644 --- a/satellite/metabase/commit_object_test.go +++ b/satellite/metabase/commit_object_test.go @@ -171,6 +171,139 @@ func TestCommitObjectWithSegments(t *testing.T) { }.Check(ctx, t, db) }) + // TODO(ver): add tests for DisallowDelete + + t.Run("delete previous unversioned object", func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + + metabasetest.CreateObject(ctx, t, db, obj, 1) + + metabasetest.BeginObjectExactVersion{ + Opts: metabase.BeginObjectExactVersion{ + ObjectStream: metabase.ObjectStream{ + ProjectID: obj.ProjectID, + BucketName: obj.BucketName, + ObjectKey: obj.ObjectKey, + Version: 5, + StreamID: obj.StreamID, + }, + Encryption: metabasetest.DefaultEncryption, + }, + }.Check(ctx, t, db) + now := time.Now() + + encryptedMetadata := testrand.Bytes(1024) + encryptedMetadataNonce := testrand.Nonce() + encryptedMetadataKey := testrand.Bytes(265) + + metabasetest.CommitObjectWithSegments{ + Opts: metabase.CommitObjectWithSegments{ + ObjectStream: metabase.ObjectStream{ + ProjectID: obj.ProjectID, + BucketName: obj.BucketName, + ObjectKey: obj.ObjectKey, + Version: 5, + StreamID: obj.StreamID, + }, + EncryptedMetadataNonce: encryptedMetadataNonce[:], + EncryptedMetadata: encryptedMetadata, + EncryptedMetadataEncryptedKey: encryptedMetadataKey, + }, + }.Check(ctx, t, db) + + metabasetest.Verify{ + Objects: []metabase.RawObject{ + { + ObjectStream: metabase.ObjectStream{ + ProjectID: obj.ProjectID, + BucketName: obj.BucketName, + ObjectKey: obj.ObjectKey, + Version: 5, + StreamID: obj.StreamID, + }, + CreatedAt: now, + Status: metabase.CommittedUnversioned, + + EncryptedMetadataNonce: encryptedMetadataNonce[:], + EncryptedMetadata: encryptedMetadata, + EncryptedMetadataEncryptedKey: encryptedMetadataKey, + + Encryption: metabasetest.DefaultEncryption, + }, + }, + }.Check(ctx, t, db) + }) + + t.Run("keep previous objects when committing versioned", func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + + unversionedStream := obj + unversionedStream.Version = 3 + unversionedObject := metabasetest.CreateObject(ctx, t, db, unversionedStream, 0) + versionedStream := obj + versionedStream.Version = 4 + versionedObject := metabasetest.CreateObjectVersioned(ctx, t, db, versionedStream, 0) + + metabasetest.BeginObjectExactVersion{ + Opts: metabase.BeginObjectExactVersion{ + ObjectStream: metabase.ObjectStream{ + ProjectID: obj.ProjectID, + BucketName: obj.BucketName, + ObjectKey: obj.ObjectKey, + Version: 5, + StreamID: obj.StreamID, + }, + Encryption: metabasetest.DefaultEncryption, + }, + }.Check(ctx, t, db) + now := time.Now() + + encryptedMetadata := testrand.Bytes(1024) + encryptedMetadataNonce := testrand.Nonce() + encryptedMetadataKey := testrand.Bytes(265) + + metabasetest.CommitObjectWithSegments{ + Opts: metabase.CommitObjectWithSegments{ + ObjectStream: metabase.ObjectStream{ + ProjectID: obj.ProjectID, + BucketName: obj.BucketName, + ObjectKey: obj.ObjectKey, + Version: 5, + StreamID: obj.StreamID, + }, + EncryptedMetadataNonce: encryptedMetadataNonce[:], + EncryptedMetadata: encryptedMetadata, + EncryptedMetadataEncryptedKey: encryptedMetadataKey, + + Versioned: true, + }, + }.Check(ctx, t, db) + + metabasetest.Verify{ + Objects: []metabase.RawObject{ + metabase.RawObject(unversionedObject), + metabase.RawObject(versionedObject), + { + ObjectStream: metabase.ObjectStream{ + ProjectID: obj.ProjectID, + BucketName: obj.BucketName, + ObjectKey: obj.ObjectKey, + Version: 5, + StreamID: obj.StreamID, + }, + CreatedAt: now, + Status: metabase.CommittedVersioned, + + EncryptedMetadataNonce: encryptedMetadataNonce[:], + EncryptedMetadata: encryptedMetadata, + EncryptedMetadataEncryptedKey: encryptedMetadataKey, + + Encryption: metabasetest.DefaultEncryption, + }, + }, + }.Check(ctx, t, db) + }) + t.Run("segments missing in database", func(t *testing.T) { defer metabasetest.DeleteAll{}.Check(ctx, t, db)