satellite/metabase: allow overwriting segments in pending objects

this is so that multipart upload allows overwriting parts. our
consistency model is more relaxed because part overwrites
won't be atomic if they consist of multiple segments, but at
least this allows overwriting at all.

Change-Id: I21dac4c24046e05efe74e6c6fd189a02c95eb6c8
This commit is contained in:
Jeff Wendling 2021-09-27 13:38:15 -04:00 committed by Michał Niewrzał
parent 5b729779a2
commit 91ce70e4ed
2 changed files with 195 additions and 41 deletions

View File

@ -185,12 +185,9 @@ func (db *DB) BeginSegment(ctx context.Context, opts BeginSegment) (err error) {
// NOTE: this isn't strictly necessary, since we can also fail this in CommitSegment.
// however, we should prevent creating segements for non-partial objects.
// NOTE: these queries could be combined into one.
err = txutil.WithTx(ctx, db.db, nil, func(ctx context.Context, tx tagsql.Tx) (err error) {
// Verify that object exists and is partial.
var value int
err = tx.QueryRowContext(ctx, `
err = db.db.QueryRowContext(ctx, `
SELECT 1
FROM objects WHERE
project_id = $1 AND
@ -207,24 +204,6 @@ func (db *DB) BeginSegment(ctx context.Context, opts BeginSegment) (err error) {
return Error.New("unable to query object status: %w", err)
}
// Verify that the segment does not exist.
err = tx.QueryRowContext(ctx, `
SELECT 1
FROM segments WHERE
stream_id = $1 AND
position = $2
`, opts.StreamID, opts.Position).Scan(&value)
if err != nil && !errors.Is(err, sql.ErrNoRows) {
return Error.New("unable to query segments: %w", err)
}
err = nil //nolint: wastedassign, ineffassign // ignore any other err result (explicitly)
return nil
})
if err != nil {
return err
}
mon.Meter("segment_begin").Mark(1)
return nil
@ -313,7 +292,15 @@ func (db *DB) CommitSegment(ctx context.Context, opts CommitSegment) (err error)
$6, $7, $8, $9,
$10,
$11
)`, opts.Position, opts.ExpiresAt,
)
ON CONFLICT(stream_id, position)
DO UPDATE SET
expires_at = $2,
root_piece_id = $3, encrypted_key_nonce = $4, encrypted_key = $5,
encrypted_size = $6, plain_offset = $7, plain_size = $8, encrypted_etag = $9,
redundancy = $10,
remote_alias_pieces = $11
`, opts.Position, opts.ExpiresAt,
opts.RootPieceID, opts.EncryptedKeyNonce, opts.EncryptedKey,
opts.EncryptedSize, opts.PlainOffset, opts.PlainSize, opts.EncryptedETag,
redundancyScheme{&opts.Redundancy},
@ -324,9 +311,6 @@ func (db *DB) CommitSegment(ctx context.Context, opts CommitSegment) (err error)
if code := pgerrcode.FromError(err); code == pgxerrcode.NotNullViolation {
return Error.New("pending object missing")
}
if code := pgerrcode.FromError(err); code == pgxerrcode.UniqueViolation {
return ErrConflict.New("segment already exists")
}
return Error.New("unable to insert segment: %w", err)
}
@ -396,7 +380,14 @@ func (db *DB) CommitInlineSegment(ctx context.Context, opts CommitInlineSegment)
$3, $4, $5,
$6, $7, $8, $9,
$10
)`, opts.Position, opts.ExpiresAt,
)
ON CONFLICT(stream_id, position)
DO UPDATE SET
expires_at = $2,
root_piece_id = $3, encrypted_key_nonce = $4, encrypted_key = $5,
encrypted_size = $6, plain_offset = $7, plain_size = $8, encrypted_etag = $9,
inline_data = $10
`, opts.Position, opts.ExpiresAt,
storj.PieceID{}, opts.EncryptedKeyNonce, opts.EncryptedKey,
len(opts.InlineData), opts.PlainOffset, opts.PlainSize, opts.EncryptedETag,
opts.InlineData,
@ -406,9 +397,6 @@ func (db *DB) CommitInlineSegment(ctx context.Context, opts CommitInlineSegment)
if code := pgerrcode.FromError(err); code == pgxerrcode.NotNullViolation {
return Error.New("pending object missing")
}
if code := pgerrcode.FromError(err); code == pgxerrcode.UniqueViolation {
return ErrConflict.New("segment already exists")
}
return Error.New("unable to insert segment: %w", err)
}

View File

@ -1170,8 +1170,6 @@ func TestCommitSegment(t *testing.T) {
PlainOffset: 0,
Redundancy: metabasetest.DefaultRedundancy,
},
ErrClass: &metabase.ErrConflict,
ErrText: "segment already exists",
}.Check(ctx, t, db)
metabasetest.Verify{
@ -1207,6 +1205,102 @@ func TestCommitSegment(t *testing.T) {
}.Check(ctx, t, db)
})
t.Run("overwrite", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
now1 := time.Now()
zombieDeadline := now1.Add(24 * time.Hour)
metabasetest.BeginObjectExactVersion{
Opts: metabase.BeginObjectExactVersion{
ObjectStream: obj,
Encryption: metabasetest.DefaultEncryption,
},
Version: 1,
}.Check(ctx, t, db)
rootPieceID1 := testrand.PieceID()
rootPieceID2 := testrand.PieceID()
pieces1 := metabase.Pieces{{Number: 0, StorageNode: testrand.NodeID()}}
pieces2 := metabase.Pieces{{Number: 0, StorageNode: testrand.NodeID()}}
encryptedKey := testrand.Bytes(32)
encryptedKeyNonce := testrand.Bytes(32)
metabasetest.BeginSegment{
Opts: metabase.BeginSegment{
ObjectStream: obj,
Position: metabase.SegmentPosition{Part: 0, Index: 0},
RootPieceID: rootPieceID1,
Pieces: pieces1,
},
}.Check(ctx, t, db)
metabasetest.CommitSegment{
Opts: metabase.CommitSegment{
ObjectStream: obj,
Position: metabase.SegmentPosition{Part: 0, Index: 0},
RootPieceID: rootPieceID1,
Pieces: pieces1,
EncryptedKey: encryptedKey,
EncryptedKeyNonce: encryptedKeyNonce,
EncryptedSize: 1024,
PlainSize: 512,
PlainOffset: 0,
Redundancy: metabasetest.DefaultRedundancy,
},
}.Check(ctx, t, db)
metabasetest.CommitSegment{
Opts: metabase.CommitSegment{
ObjectStream: obj,
Position: metabase.SegmentPosition{Part: 0, Index: 0},
RootPieceID: rootPieceID2,
Pieces: pieces2,
EncryptedKey: encryptedKey,
EncryptedKeyNonce: encryptedKeyNonce,
EncryptedSize: 1024,
PlainSize: 512,
PlainOffset: 0,
Redundancy: metabasetest.DefaultRedundancy,
},
}.Check(ctx, t, db)
metabasetest.Verify{
Objects: []metabase.RawObject{
{
ObjectStream: obj,
CreatedAt: now1,
Status: metabase.Pending,
Encryption: metabasetest.DefaultEncryption,
ZombieDeletionDeadline: &zombieDeadline,
},
},
Segments: []metabase.RawSegment{
{
StreamID: obj.StreamID,
Position: metabase.SegmentPosition{Part: 0, Index: 0},
CreatedAt: now,
RootPieceID: rootPieceID2,
EncryptedKey: encryptedKey,
EncryptedKeyNonce: encryptedKeyNonce,
EncryptedSize: 1024,
PlainOffset: 0,
PlainSize: 512,
Redundancy: metabasetest.DefaultRedundancy,
Pieces: pieces2,
},
},
}.Check(ctx, t, db)
})
t.Run("commit segment of missing object", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
@ -1567,8 +1661,6 @@ func TestCommitInlineSegment(t *testing.T) {
PlainSize: 512,
PlainOffset: 0,
},
ErrClass: &metabase.ErrConflict,
ErrText: "segment already exists",
}.Check(ctx, t, db)
metabasetest.Verify{
@ -1601,6 +1693,80 @@ func TestCommitInlineSegment(t *testing.T) {
}.Check(ctx, t, db)
})
t.Run("overwrite", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
now1 := time.Now()
metabasetest.BeginObjectExactVersion{
Opts: metabase.BeginObjectExactVersion{
ObjectStream: obj,
Encryption: metabasetest.DefaultEncryption,
},
Version: 1,
}.Check(ctx, t, db)
encryptedKey := testrand.Bytes(32)
encryptedKeyNonce := testrand.Bytes(32)
metabasetest.CommitInlineSegment{
Opts: metabase.CommitInlineSegment{
ObjectStream: obj,
Position: metabase.SegmentPosition{Part: 0, Index: 0},
InlineData: []byte{1, 2, 3},
EncryptedKey: encryptedKey,
EncryptedKeyNonce: encryptedKeyNonce,
PlainSize: 512,
PlainOffset: 0,
},
}.Check(ctx, t, db)
metabasetest.CommitInlineSegment{
Opts: metabase.CommitInlineSegment{
ObjectStream: obj,
Position: metabase.SegmentPosition{Part: 0, Index: 0},
InlineData: []byte{4, 5, 6},
EncryptedKey: encryptedKey,
EncryptedKeyNonce: encryptedKeyNonce,
PlainSize: 512,
PlainOffset: 0,
},
}.Check(ctx, t, db)
metabasetest.Verify{
Objects: []metabase.RawObject{
{
ObjectStream: obj,
CreatedAt: now1,
Status: metabase.Pending,
Encryption: metabasetest.DefaultEncryption,
ZombieDeletionDeadline: &zombieDeadline,
},
},
Segments: []metabase.RawSegment{
{
StreamID: obj.StreamID,
Position: metabase.SegmentPosition{Part: 0, Index: 0},
CreatedAt: now,
EncryptedKey: encryptedKey,
EncryptedKeyNonce: encryptedKeyNonce,
PlainOffset: 0,
PlainSize: 512,
InlineData: []byte{4, 5, 6},
EncryptedSize: 3,
},
},
}.Check(ctx, t, db)
})
t.Run("commit inline segment of missing object", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)