From 91ce70e4ed1ac4e334e120d4daa215a9a069b1e3 Mon Sep 17 00:00:00 2001 From: Jeff Wendling Date: Mon, 27 Sep 2021 13:38:15 -0400 Subject: [PATCH] satellite/metabase: allow overwriting segments in pending objects this is so that multipart upload allows overwriting parts. our consistency model is more relaxed because part overwrites won't be atomic if they consist of multiple segments, but at least this allows overwriting at all. Change-Id: I21dac4c24046e05efe74e6c6fd189a02c95eb6c8 --- satellite/metabase/commit.go | 62 +++++------ satellite/metabase/commit_test.go | 174 +++++++++++++++++++++++++++++- 2 files changed, 195 insertions(+), 41 deletions(-) diff --git a/satellite/metabase/commit.go b/satellite/metabase/commit.go index d29f370bd..71fbc1dc4 100644 --- a/satellite/metabase/commit.go +++ b/satellite/metabase/commit.go @@ -185,12 +185,9 @@ func (db *DB) BeginSegment(ctx context.Context, opts BeginSegment) (err error) { // NOTE: this isn't strictly necessary, since we can also fail this in CommitSegment. // however, we should prevent creating segements for non-partial objects. - // NOTE: these queries could be combined into one. - - err = txutil.WithTx(ctx, db.db, nil, func(ctx context.Context, tx tagsql.Tx) (err error) { - // Verify that object exists and is partial. - var value int - err = tx.QueryRowContext(ctx, ` + // Verify that object exists and is partial. + var value int + err = db.db.QueryRowContext(ctx, ` SELECT 1 FROM objects WHERE project_id = $1 AND @@ -199,30 +196,12 @@ func (db *DB) BeginSegment(ctx context.Context, opts BeginSegment) (err error) { version = $4 AND stream_id = $5 AND status = `+pendingStatus, - opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, opts.Version, opts.StreamID).Scan(&value) - if err != nil { - if errors.Is(err, sql.ErrNoRows) { - return Error.New("pending object missing") - } - return Error.New("unable to query object status: %w", err) - } - - // Verify that the segment does not exist. - err = tx.QueryRowContext(ctx, ` - SELECT 1 - FROM segments WHERE - stream_id = $1 AND - position = $2 - `, opts.StreamID, opts.Position).Scan(&value) - if err != nil && !errors.Is(err, sql.ErrNoRows) { - return Error.New("unable to query segments: %w", err) - } - err = nil //nolint: wastedassign, ineffassign // ignore any other err result (explicitly) - - return nil - }) + opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, opts.Version, opts.StreamID).Scan(&value) if err != nil { - return err + if errors.Is(err, sql.ErrNoRows) { + return Error.New("pending object missing") + } + return Error.New("unable to query object status: %w", err) } mon.Meter("segment_begin").Mark(1) @@ -313,7 +292,15 @@ func (db *DB) CommitSegment(ctx context.Context, opts CommitSegment) (err error) $6, $7, $8, $9, $10, $11 - )`, opts.Position, opts.ExpiresAt, + ) + ON CONFLICT(stream_id, position) + DO UPDATE SET + expires_at = $2, + root_piece_id = $3, encrypted_key_nonce = $4, encrypted_key = $5, + encrypted_size = $6, plain_offset = $7, plain_size = $8, encrypted_etag = $9, + redundancy = $10, + remote_alias_pieces = $11 + `, opts.Position, opts.ExpiresAt, opts.RootPieceID, opts.EncryptedKeyNonce, opts.EncryptedKey, opts.EncryptedSize, opts.PlainOffset, opts.PlainSize, opts.EncryptedETag, redundancyScheme{&opts.Redundancy}, @@ -324,9 +311,6 @@ func (db *DB) CommitSegment(ctx context.Context, opts CommitSegment) (err error) if code := pgerrcode.FromError(err); code == pgxerrcode.NotNullViolation { return Error.New("pending object missing") } - if code := pgerrcode.FromError(err); code == pgxerrcode.UniqueViolation { - return ErrConflict.New("segment already exists") - } return Error.New("unable to insert segment: %w", err) } @@ -396,7 +380,14 @@ func (db *DB) CommitInlineSegment(ctx context.Context, opts CommitInlineSegment) $3, $4, $5, $6, $7, $8, $9, $10 - )`, opts.Position, opts.ExpiresAt, + ) + ON CONFLICT(stream_id, position) + DO UPDATE SET + expires_at = $2, + root_piece_id = $3, encrypted_key_nonce = $4, encrypted_key = $5, + encrypted_size = $6, plain_offset = $7, plain_size = $8, encrypted_etag = $9, + inline_data = $10 + `, opts.Position, opts.ExpiresAt, storj.PieceID{}, opts.EncryptedKeyNonce, opts.EncryptedKey, len(opts.InlineData), opts.PlainOffset, opts.PlainSize, opts.EncryptedETag, opts.InlineData, @@ -406,9 +397,6 @@ func (db *DB) CommitInlineSegment(ctx context.Context, opts CommitInlineSegment) if code := pgerrcode.FromError(err); code == pgxerrcode.NotNullViolation { return Error.New("pending object missing") } - if code := pgerrcode.FromError(err); code == pgxerrcode.UniqueViolation { - return ErrConflict.New("segment already exists") - } return Error.New("unable to insert segment: %w", err) } diff --git a/satellite/metabase/commit_test.go b/satellite/metabase/commit_test.go index 90fbf14e5..fa109289a 100644 --- a/satellite/metabase/commit_test.go +++ b/satellite/metabase/commit_test.go @@ -1170,8 +1170,6 @@ func TestCommitSegment(t *testing.T) { PlainOffset: 0, Redundancy: metabasetest.DefaultRedundancy, }, - ErrClass: &metabase.ErrConflict, - ErrText: "segment already exists", }.Check(ctx, t, db) metabasetest.Verify{ @@ -1207,6 +1205,102 @@ func TestCommitSegment(t *testing.T) { }.Check(ctx, t, db) }) + t.Run("overwrite", func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + + now1 := time.Now() + zombieDeadline := now1.Add(24 * time.Hour) + metabasetest.BeginObjectExactVersion{ + Opts: metabase.BeginObjectExactVersion{ + ObjectStream: obj, + Encryption: metabasetest.DefaultEncryption, + }, + Version: 1, + }.Check(ctx, t, db) + + rootPieceID1 := testrand.PieceID() + rootPieceID2 := testrand.PieceID() + pieces1 := metabase.Pieces{{Number: 0, StorageNode: testrand.NodeID()}} + pieces2 := metabase.Pieces{{Number: 0, StorageNode: testrand.NodeID()}} + encryptedKey := testrand.Bytes(32) + encryptedKeyNonce := testrand.Bytes(32) + + metabasetest.BeginSegment{ + Opts: metabase.BeginSegment{ + ObjectStream: obj, + Position: metabase.SegmentPosition{Part: 0, Index: 0}, + RootPieceID: rootPieceID1, + Pieces: pieces1, + }, + }.Check(ctx, t, db) + + metabasetest.CommitSegment{ + Opts: metabase.CommitSegment{ + ObjectStream: obj, + Position: metabase.SegmentPosition{Part: 0, Index: 0}, + RootPieceID: rootPieceID1, + Pieces: pieces1, + + EncryptedKey: encryptedKey, + EncryptedKeyNonce: encryptedKeyNonce, + + EncryptedSize: 1024, + PlainSize: 512, + PlainOffset: 0, + Redundancy: metabasetest.DefaultRedundancy, + }, + }.Check(ctx, t, db) + + metabasetest.CommitSegment{ + Opts: metabase.CommitSegment{ + ObjectStream: obj, + Position: metabase.SegmentPosition{Part: 0, Index: 0}, + RootPieceID: rootPieceID2, + Pieces: pieces2, + + EncryptedKey: encryptedKey, + EncryptedKeyNonce: encryptedKeyNonce, + + EncryptedSize: 1024, + PlainSize: 512, + PlainOffset: 0, + Redundancy: metabasetest.DefaultRedundancy, + }, + }.Check(ctx, t, db) + + metabasetest.Verify{ + Objects: []metabase.RawObject{ + { + ObjectStream: obj, + CreatedAt: now1, + Status: metabase.Pending, + + Encryption: metabasetest.DefaultEncryption, + ZombieDeletionDeadline: &zombieDeadline, + }, + }, + Segments: []metabase.RawSegment{ + { + StreamID: obj.StreamID, + Position: metabase.SegmentPosition{Part: 0, Index: 0}, + CreatedAt: now, + + RootPieceID: rootPieceID2, + EncryptedKey: encryptedKey, + EncryptedKeyNonce: encryptedKeyNonce, + + EncryptedSize: 1024, + PlainOffset: 0, + PlainSize: 512, + + Redundancy: metabasetest.DefaultRedundancy, + + Pieces: pieces2, + }, + }, + }.Check(ctx, t, db) + }) + t.Run("commit segment of missing object", func(t *testing.T) { defer metabasetest.DeleteAll{}.Check(ctx, t, db) @@ -1567,8 +1661,6 @@ func TestCommitInlineSegment(t *testing.T) { PlainSize: 512, PlainOffset: 0, }, - ErrClass: &metabase.ErrConflict, - ErrText: "segment already exists", }.Check(ctx, t, db) metabasetest.Verify{ @@ -1601,6 +1693,80 @@ func TestCommitInlineSegment(t *testing.T) { }.Check(ctx, t, db) }) + t.Run("overwrite", func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + + now1 := time.Now() + + metabasetest.BeginObjectExactVersion{ + Opts: metabase.BeginObjectExactVersion{ + ObjectStream: obj, + Encryption: metabasetest.DefaultEncryption, + }, + Version: 1, + }.Check(ctx, t, db) + + encryptedKey := testrand.Bytes(32) + encryptedKeyNonce := testrand.Bytes(32) + + metabasetest.CommitInlineSegment{ + Opts: metabase.CommitInlineSegment{ + ObjectStream: obj, + Position: metabase.SegmentPosition{Part: 0, Index: 0}, + InlineData: []byte{1, 2, 3}, + + EncryptedKey: encryptedKey, + EncryptedKeyNonce: encryptedKeyNonce, + + PlainSize: 512, + PlainOffset: 0, + }, + }.Check(ctx, t, db) + + metabasetest.CommitInlineSegment{ + Opts: metabase.CommitInlineSegment{ + ObjectStream: obj, + Position: metabase.SegmentPosition{Part: 0, Index: 0}, + InlineData: []byte{4, 5, 6}, + + EncryptedKey: encryptedKey, + EncryptedKeyNonce: encryptedKeyNonce, + + PlainSize: 512, + PlainOffset: 0, + }, + }.Check(ctx, t, db) + + metabasetest.Verify{ + Objects: []metabase.RawObject{ + { + ObjectStream: obj, + CreatedAt: now1, + Status: metabase.Pending, + + Encryption: metabasetest.DefaultEncryption, + ZombieDeletionDeadline: &zombieDeadline, + }, + }, + Segments: []metabase.RawSegment{ + { + StreamID: obj.StreamID, + Position: metabase.SegmentPosition{Part: 0, Index: 0}, + CreatedAt: now, + + EncryptedKey: encryptedKey, + EncryptedKeyNonce: encryptedKeyNonce, + + PlainOffset: 0, + PlainSize: 512, + + InlineData: []byte{4, 5, 6}, + EncryptedSize: 3, + }, + }, + }.Check(ctx, t, db) + }) + t.Run("commit inline segment of missing object", func(t *testing.T) { defer metabasetest.DeleteAll{}.Check(ctx, t, db)