satellite/metabase: adjust segment commit to use pending_objects table

Change is adjusting CommitSegment to check pending object existence in
`pending_objects` or `objects` table.

Satellite stream id is used to determine if we need to use
`pending_objects` or `objects` table.

General goal is to support both tables until `objects` table will be
free from pending objects. Whenever it will be needed code will be
supporting both tables at once.

Part of https://github.com/storj/storj/issues/6046

Change-Id: I954444a53b4733ae6fc909420573242b02746787
This commit is contained in:
Michal Niewrzal 2023-07-28 11:30:44 +02:00 committed by Storj Robot
parent fac522d8dd
commit f40805763e
3 changed files with 876 additions and 94 deletions

View File

@ -338,6 +338,8 @@ type CommitSegment struct {
Pieces Pieces
Placement storj.PlacementConstraint
UsePendingObjectsTable bool
}
// CommitSegment commits segment to the database.
@ -378,47 +380,89 @@ func (db *DB) CommitSegment(ctx context.Context, opts CommitSegment) (err error)
return Error.New("unable to convert pieces to aliases: %w", err)
}
// second part will be removed when there will be no pending_objects in objects table.
// Verify that object exists and is partial.
_, err = db.db.ExecContext(ctx, `
INSERT INTO segments (
stream_id, position, expires_at,
root_piece_id, encrypted_key_nonce, encrypted_key,
encrypted_size, plain_offset, plain_size, encrypted_etag,
redundancy,
remote_alias_pieces,
placement
) VALUES (
(SELECT stream_id
FROM objects WHERE
project_id = $12 AND
bucket_name = $13 AND
object_key = $14 AND
version = $15 AND
stream_id = $16 AND
status = `+pendingStatus+
` ), $1, $2,
$3, $4, $5,
$6, $7, $8, $9,
$10,
$11,
$17
)
ON CONFLICT(stream_id, position)
DO UPDATE SET
expires_at = $2,
root_piece_id = $3, encrypted_key_nonce = $4, encrypted_key = $5,
encrypted_size = $6, plain_offset = $7, plain_size = $8, encrypted_etag = $9,
redundancy = $10,
remote_alias_pieces = $11,
placement = $17
if opts.UsePendingObjectsTable {
_, err = db.db.ExecContext(ctx, `
INSERT INTO segments (
stream_id, position, expires_at,
root_piece_id, encrypted_key_nonce, encrypted_key,
encrypted_size, plain_offset, plain_size, encrypted_etag,
redundancy,
remote_alias_pieces,
placement
) VALUES (
(SELECT stream_id
FROM pending_objects WHERE
project_id = $12 AND
bucket_name = $13 AND
object_key = $14 AND
stream_id = $15
), $1, $2,
$3, $4, $5,
$6, $7, $8, $9,
$10,
$11,
$16
)
ON CONFLICT(stream_id, position)
DO UPDATE SET
expires_at = $2,
root_piece_id = $3, encrypted_key_nonce = $4, encrypted_key = $5,
encrypted_size = $6, plain_offset = $7, plain_size = $8, encrypted_etag = $9,
redundancy = $10,
remote_alias_pieces = $11,
placement = $16
`, opts.Position, opts.ExpiresAt,
opts.RootPieceID, opts.EncryptedKeyNonce, opts.EncryptedKey,
opts.EncryptedSize, opts.PlainOffset, opts.PlainSize, opts.EncryptedETag,
redundancyScheme{&opts.Redundancy},
aliasPieces,
opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, opts.Version, opts.StreamID,
opts.Placement,
)
opts.RootPieceID, opts.EncryptedKeyNonce, opts.EncryptedKey,
opts.EncryptedSize, opts.PlainOffset, opts.PlainSize, opts.EncryptedETag,
redundancyScheme{&opts.Redundancy},
aliasPieces,
opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, opts.StreamID,
opts.Placement,
)
} else {
_, err = db.db.ExecContext(ctx, `
INSERT INTO segments (
stream_id, position, expires_at,
root_piece_id, encrypted_key_nonce, encrypted_key,
encrypted_size, plain_offset, plain_size, encrypted_etag,
redundancy,
remote_alias_pieces,
placement
) VALUES (
(SELECT stream_id
FROM objects WHERE
project_id = $12 AND
bucket_name = $13 AND
object_key = $14 AND
version = $15 AND
stream_id = $16 AND
status = `+pendingStatus+
` ), $1, $2,
$3, $4, $5,
$6, $7, $8, $9,
$10,
$11,
$17
)
ON CONFLICT(stream_id, position)
DO UPDATE SET
expires_at = $2,
root_piece_id = $3, encrypted_key_nonce = $4, encrypted_key = $5,
encrypted_size = $6, plain_offset = $7, plain_size = $8, encrypted_etag = $9,
redundancy = $10,
remote_alias_pieces = $11,
placement = $17
`, opts.Position, opts.ExpiresAt,
opts.RootPieceID, opts.EncryptedKeyNonce, opts.EncryptedKey,
opts.EncryptedSize, opts.PlainOffset, opts.PlainSize, opts.EncryptedETag,
redundancyScheme{&opts.Redundancy},
aliasPieces,
opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, opts.Version, opts.StreamID,
opts.Placement,
)
}
if err != nil {
if code := pgerrcode.FromError(err); code == pgxerrcode.NotNullViolation {
return ErrPendingObjectMissing.New("")
@ -448,6 +492,8 @@ type CommitInlineSegment struct {
EncryptedETag []byte
InlineData []byte
UsePendingObjectsTable bool
}
// CommitInlineSegment commits inline segment to the database.
@ -472,39 +518,71 @@ func (db *DB) CommitInlineSegment(ctx context.Context, opts CommitInlineSegment)
return ErrInvalidRequest.New("PlainOffset negative")
}
// Verify that object exists and is partial.
_, err = db.db.ExecContext(ctx, `
INSERT INTO segments (
stream_id, position, expires_at,
root_piece_id, encrypted_key_nonce, encrypted_key,
encrypted_size, plain_offset, plain_size, encrypted_etag,
inline_data
) VALUES (
(SELECT stream_id
FROM objects WHERE
project_id = $11 AND
bucket_name = $12 AND
object_key = $13 AND
version = $14 AND
stream_id = $15 AND
status = `+pendingStatus+
` ), $1, $2,
$3, $4, $5,
$6, $7, $8, $9,
$10
)
ON CONFLICT(stream_id, position)
DO UPDATE SET
expires_at = $2,
root_piece_id = $3, encrypted_key_nonce = $4, encrypted_key = $5,
encrypted_size = $6, plain_offset = $7, plain_size = $8, encrypted_etag = $9,
inline_data = $10
if opts.UsePendingObjectsTable {
_, err = db.db.ExecContext(ctx, `
INSERT INTO segments (
stream_id, position, expires_at,
root_piece_id, encrypted_key_nonce, encrypted_key,
encrypted_size, plain_offset, plain_size, encrypted_etag,
inline_data
) VALUES (
(SELECT stream_id
FROM pending_objects WHERE
project_id = $11 AND
bucket_name = $12 AND
object_key = $13 AND
stream_id = $14
), $1, $2,
$3, $4, $5,
$6, $7, $8, $9,
$10
)
ON CONFLICT(stream_id, position)
DO UPDATE SET
expires_at = $2,
root_piece_id = $3, encrypted_key_nonce = $4, encrypted_key = $5,
encrypted_size = $6, plain_offset = $7, plain_size = $8, encrypted_etag = $9,
inline_data = $10
`, opts.Position, opts.ExpiresAt,
storj.PieceID{}, opts.EncryptedKeyNonce, opts.EncryptedKey,
len(opts.InlineData), opts.PlainOffset, opts.PlainSize, opts.EncryptedETag,
opts.InlineData,
opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, opts.Version, opts.StreamID,
)
storj.PieceID{}, opts.EncryptedKeyNonce, opts.EncryptedKey,
len(opts.InlineData), opts.PlainOffset, opts.PlainSize, opts.EncryptedETag,
opts.InlineData,
opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, opts.StreamID,
)
} else {
_, err = db.db.ExecContext(ctx, `
INSERT INTO segments (
stream_id, position, expires_at,
root_piece_id, encrypted_key_nonce, encrypted_key,
encrypted_size, plain_offset, plain_size, encrypted_etag,
inline_data
) VALUES (
(SELECT stream_id
FROM objects WHERE
project_id = $11 AND
bucket_name = $12 AND
object_key = $13 AND
version = $14 AND
stream_id = $15 AND
status = `+pendingStatus+
` ), $1, $2,
$3, $4, $5,
$6, $7, $8, $9,
$10
)
ON CONFLICT(stream_id, position)
DO UPDATE SET
expires_at = $2,
root_piece_id = $3, encrypted_key_nonce = $4, encrypted_key = $5,
encrypted_size = $6, plain_offset = $7, plain_size = $8, encrypted_etag = $9,
inline_data = $10
`, opts.Position, opts.ExpiresAt,
storj.PieceID{}, opts.EncryptedKeyNonce, opts.EncryptedKey,
len(opts.InlineData), opts.PlainOffset, opts.PlainSize, opts.EncryptedETag,
opts.InlineData,
opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, opts.Version, opts.StreamID,
)
}
if err != nil {
if code := pgerrcode.FromError(err); code == pgxerrcode.NotNullViolation {
return ErrPendingObjectMissing.New("")

View File

@ -1137,6 +1137,8 @@ func TestBeginSegment(t *testing.T) {
}.Check(ctx, t, db)
})
// TODO those test are copies of tests above with some adjustments to test pending_objects table.
// we will be able to delete those tests when we will start supporting only pending_objects table.
t.Run("use pending objects table", func(t *testing.T) {
obj.Version = metabase.NextVersion
t.Run("pending object missing", func(t *testing.T) {
@ -1952,6 +1954,349 @@ func TestCommitSegment(t *testing.T) {
},
}.Check(ctx, t, db)
})
// TODO those test are copies of tests above with some adjustments to test pending_objects table.
// we will be able to delete those tests when we will start supporting only pending_objects table.
t.Run("use pending objects table", func(t *testing.T) {
obj.Version = metabase.NextVersion
t.Run("duplicate", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
now1 := time.Now()
zombieDeadline := now1.Add(24 * time.Hour)
metabasetest.BeginObjectNextVersion{
Opts: metabase.BeginObjectNextVersion{
ObjectStream: obj,
Encryption: metabasetest.DefaultEncryption,
UsePendingObjectsTable: true,
},
Version: 1,
}.Check(ctx, t, db)
rootPieceID := testrand.PieceID()
pieces := metabase.Pieces{{Number: 0, StorageNode: testrand.NodeID()}}
encryptedKey := testrand.Bytes(32)
encryptedKeyNonce := testrand.Bytes(32)
metabasetest.BeginSegment{
Opts: metabase.BeginSegment{
ObjectStream: obj,
Position: metabase.SegmentPosition{Part: 0, Index: 0},
RootPieceID: rootPieceID,
Pieces: pieces,
UsePendingObjectsTable: true,
},
}.Check(ctx, t, db)
metabasetest.CommitSegment{
Opts: metabase.CommitSegment{
ObjectStream: obj,
Position: metabase.SegmentPosition{Part: 0, Index: 0},
RootPieceID: rootPieceID,
Pieces: pieces,
EncryptedKey: encryptedKey,
EncryptedKeyNonce: encryptedKeyNonce,
EncryptedSize: 1024,
PlainSize: 512,
PlainOffset: 0,
Redundancy: metabasetest.DefaultRedundancy,
UsePendingObjectsTable: true,
},
}.Check(ctx, t, db)
metabasetest.CommitSegment{
Opts: metabase.CommitSegment{
ObjectStream: obj,
Position: metabase.SegmentPosition{Part: 0, Index: 0},
RootPieceID: rootPieceID,
Pieces: pieces,
EncryptedKey: encryptedKey,
EncryptedKeyNonce: encryptedKeyNonce,
EncryptedSize: 1024,
PlainSize: 512,
PlainOffset: 0,
Redundancy: metabasetest.DefaultRedundancy,
UsePendingObjectsTable: true,
},
}.Check(ctx, t, db)
metabasetest.Verify{
PendingObjects: []metabase.RawPendingObject{
{
PendingObjectStream: metabasetest.ObjectStreamToPending(obj),
CreatedAt: now1,
Encryption: metabasetest.DefaultEncryption,
ZombieDeletionDeadline: &zombieDeadline,
},
},
Segments: []metabase.RawSegment{
{
StreamID: obj.StreamID,
Position: metabase.SegmentPosition{Part: 0, Index: 0},
CreatedAt: now,
RootPieceID: rootPieceID,
EncryptedKey: encryptedKey,
EncryptedKeyNonce: encryptedKeyNonce,
EncryptedSize: 1024,
PlainOffset: 0,
PlainSize: 512,
Redundancy: metabasetest.DefaultRedundancy,
Pieces: pieces,
},
},
}.Check(ctx, t, db)
})
t.Run("overwrite", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
now1 := time.Now()
zombieDeadline := now1.Add(24 * time.Hour)
metabasetest.BeginObjectNextVersion{
Opts: metabase.BeginObjectNextVersion{
ObjectStream: obj,
Encryption: metabasetest.DefaultEncryption,
UsePendingObjectsTable: true,
},
Version: 1,
}.Check(ctx, t, db)
rootPieceID1 := testrand.PieceID()
rootPieceID2 := testrand.PieceID()
pieces1 := metabase.Pieces{{Number: 0, StorageNode: testrand.NodeID()}}
pieces2 := metabase.Pieces{{Number: 0, StorageNode: testrand.NodeID()}}
encryptedKey := testrand.Bytes(32)
encryptedKeyNonce := testrand.Bytes(32)
metabasetest.BeginSegment{
Opts: metabase.BeginSegment{
ObjectStream: obj,
Position: metabase.SegmentPosition{Part: 0, Index: 0},
RootPieceID: rootPieceID1,
Pieces: pieces1,
UsePendingObjectsTable: true,
},
}.Check(ctx, t, db)
metabasetest.CommitSegment{
Opts: metabase.CommitSegment{
ObjectStream: obj,
Position: metabase.SegmentPosition{Part: 0, Index: 0},
RootPieceID: rootPieceID1,
Pieces: pieces1,
EncryptedKey: encryptedKey,
EncryptedKeyNonce: encryptedKeyNonce,
EncryptedSize: 1024,
PlainSize: 512,
PlainOffset: 0,
Redundancy: metabasetest.DefaultRedundancy,
UsePendingObjectsTable: true,
},
}.Check(ctx, t, db)
metabasetest.CommitSegment{
Opts: metabase.CommitSegment{
ObjectStream: obj,
Position: metabase.SegmentPosition{Part: 0, Index: 0},
RootPieceID: rootPieceID2,
Pieces: pieces2,
EncryptedKey: encryptedKey,
EncryptedKeyNonce: encryptedKeyNonce,
EncryptedSize: 1024,
PlainSize: 512,
PlainOffset: 0,
Redundancy: metabasetest.DefaultRedundancy,
UsePendingObjectsTable: true,
},
}.Check(ctx, t, db)
metabasetest.Verify{
PendingObjects: []metabase.RawPendingObject{
{
PendingObjectStream: metabasetest.ObjectStreamToPending(obj),
CreatedAt: now1,
Encryption: metabasetest.DefaultEncryption,
ZombieDeletionDeadline: &zombieDeadline,
},
},
Segments: []metabase.RawSegment{
{
StreamID: obj.StreamID,
Position: metabase.SegmentPosition{Part: 0, Index: 0},
CreatedAt: now,
RootPieceID: rootPieceID2,
EncryptedKey: encryptedKey,
EncryptedKeyNonce: encryptedKeyNonce,
EncryptedSize: 1024,
PlainOffset: 0,
PlainSize: 512,
Redundancy: metabasetest.DefaultRedundancy,
Pieces: pieces2,
},
},
}.Check(ctx, t, db)
})
t.Run("commit segment of object with expires at", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
rootPieceID := testrand.PieceID()
pieces := metabase.Pieces{{Number: 0, StorageNode: testrand.NodeID()}}
encryptedKey := testrand.Bytes(32)
encryptedKeyNonce := testrand.Bytes(32)
now := time.Now()
expectedExpiresAt := now.Add(33 * time.Hour)
zombieDeadline := now.Add(24 * time.Hour)
metabasetest.BeginObjectNextVersion{
Opts: metabase.BeginObjectNextVersion{
ObjectStream: obj,
Encryption: metabasetest.DefaultEncryption,
ExpiresAt: &expectedExpiresAt,
UsePendingObjectsTable: true,
},
Version: 1,
}.Check(ctx, t, db)
metabasetest.CommitSegment{
Opts: metabase.CommitSegment{
ObjectStream: obj,
ExpiresAt: &expectedExpiresAt,
RootPieceID: rootPieceID,
Pieces: pieces,
EncryptedKey: encryptedKey,
EncryptedKeyNonce: encryptedKeyNonce,
EncryptedSize: 1024,
PlainSize: 512,
PlainOffset: 0,
Redundancy: metabasetest.DefaultRedundancy,
UsePendingObjectsTable: true,
},
}.Check(ctx, t, db)
metabasetest.Verify{
PendingObjects: []metabase.RawPendingObject{
{
PendingObjectStream: metabasetest.ObjectStreamToPending(obj),
CreatedAt: now,
ExpiresAt: &expectedExpiresAt,
Encryption: metabasetest.DefaultEncryption,
ZombieDeletionDeadline: &zombieDeadline,
},
},
Segments: []metabase.RawSegment{
{
StreamID: obj.StreamID,
CreatedAt: now,
ExpiresAt: &expectedExpiresAt,
RootPieceID: rootPieceID,
EncryptedKey: encryptedKey,
EncryptedKeyNonce: encryptedKeyNonce,
EncryptedSize: 1024,
PlainOffset: 0,
PlainSize: 512,
Redundancy: metabasetest.DefaultRedundancy,
Pieces: pieces,
},
},
}.Check(ctx, t, db)
})
t.Run("commit segment of pending object", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
rootPieceID := testrand.PieceID()
pieces := metabase.Pieces{{Number: 0, StorageNode: testrand.NodeID()}}
encryptedKey := testrand.Bytes(32)
encryptedKeyNonce := testrand.Bytes(32)
encryptedETag := testrand.Bytes(32)
now := time.Now()
zombieDeadline := now.Add(24 * time.Hour)
metabasetest.BeginObjectNextVersion{
Opts: metabase.BeginObjectNextVersion{
ObjectStream: obj,
Encryption: metabasetest.DefaultEncryption,
UsePendingObjectsTable: true,
},
Version: 1,
}.Check(ctx, t, db)
metabasetest.CommitSegment{
Opts: metabase.CommitSegment{
ObjectStream: obj,
RootPieceID: rootPieceID,
Pieces: pieces,
EncryptedKey: encryptedKey,
EncryptedKeyNonce: encryptedKeyNonce,
EncryptedSize: 1024,
PlainSize: 512,
PlainOffset: 0,
Redundancy: metabasetest.DefaultRedundancy,
EncryptedETag: encryptedETag,
UsePendingObjectsTable: true,
},
}.Check(ctx, t, db)
metabasetest.Verify{
PendingObjects: []metabase.RawPendingObject{
{
PendingObjectStream: metabasetest.ObjectStreamToPending(obj),
CreatedAt: now,
Encryption: metabasetest.DefaultEncryption,
ZombieDeletionDeadline: &zombieDeadline,
},
},
Segments: []metabase.RawSegment{
{
StreamID: obj.StreamID,
CreatedAt: now,
RootPieceID: rootPieceID,
EncryptedKey: encryptedKey,
EncryptedKeyNonce: encryptedKeyNonce,
EncryptedSize: 1024,
PlainOffset: 0,
PlainSize: 512,
EncryptedETag: encryptedETag,
Redundancy: metabasetest.DefaultRedundancy,
Pieces: pieces,
},
},
}.Check(ctx, t, db)
})
})
})
}
@ -2047,6 +2392,29 @@ func TestCommitInlineSegment(t *testing.T) {
}.Check(ctx, t, db)
})
t.Run("commit inline segment of missing object", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
encryptedKey := testrand.Bytes(32)
encryptedKeyNonce := testrand.Bytes(32)
metabasetest.CommitInlineSegment{
Opts: metabase.CommitInlineSegment{
ObjectStream: obj,
InlineData: []byte{1, 2, 3},
EncryptedKey: encryptedKey,
EncryptedKeyNonce: encryptedKeyNonce,
PlainSize: 512,
PlainOffset: 0,
},
ErrClass: &metabase.ErrPendingObjectMissing,
}.Check(ctx, t, db)
metabasetest.Verify{}.Check(ctx, t, db)
})
t.Run("duplicate", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
@ -2197,29 +2565,6 @@ func TestCommitInlineSegment(t *testing.T) {
}.Check(ctx, t, db)
})
t.Run("commit inline segment of missing object", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
encryptedKey := testrand.Bytes(32)
encryptedKeyNonce := testrand.Bytes(32)
metabasetest.CommitInlineSegment{
Opts: metabase.CommitInlineSegment{
ObjectStream: obj,
InlineData: []byte{1, 2, 3},
EncryptedKey: encryptedKey,
EncryptedKeyNonce: encryptedKeyNonce,
PlainSize: 512,
PlainOffset: 0,
},
ErrClass: &metabase.ErrPendingObjectMissing,
}.Check(ctx, t, db)
metabasetest.Verify{}.Check(ctx, t, db)
})
t.Run("commit segment of committed object", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
@ -2439,6 +2784,361 @@ func TestCommitInlineSegment(t *testing.T) {
},
}.Check(ctx, t, db)
})
// TODO those test are copies of tests above with some adjustments to test pending_objects table.
// we will be able to delete those tests when we will start supporting only pending_objects table.
t.Run("use pending objects table", func(t *testing.T) {
obj.Version = metabase.NextVersion
t.Run("duplicate", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
now := time.Now()
zombieDeadline := now.Add(24 * time.Hour)
metabasetest.BeginObjectNextVersion{
Opts: metabase.BeginObjectNextVersion{
ObjectStream: obj,
Encryption: metabasetest.DefaultEncryption,
UsePendingObjectsTable: true,
},
Version: 1,
}.Check(ctx, t, db)
encryptedKey := testrand.Bytes(32)
encryptedKeyNonce := testrand.Bytes(32)
metabasetest.CommitInlineSegment{
Opts: metabase.CommitInlineSegment{
ObjectStream: obj,
Position: metabase.SegmentPosition{Part: 0, Index: 0},
InlineData: []byte{1, 2, 3},
EncryptedKey: encryptedKey,
EncryptedKeyNonce: encryptedKeyNonce,
PlainSize: 512,
PlainOffset: 0,
UsePendingObjectsTable: true,
},
}.Check(ctx, t, db)
metabasetest.CommitInlineSegment{
Opts: metabase.CommitInlineSegment{
ObjectStream: obj,
Position: metabase.SegmentPosition{Part: 0, Index: 0},
InlineData: []byte{1, 2, 3},
EncryptedKey: encryptedKey,
EncryptedKeyNonce: encryptedKeyNonce,
PlainSize: 512,
PlainOffset: 0,
UsePendingObjectsTable: true,
},
}.Check(ctx, t, db)
metabasetest.Verify{
PendingObjects: []metabase.RawPendingObject{
{
PendingObjectStream: metabasetest.ObjectStreamToPending(obj),
CreatedAt: now,
Encryption: metabasetest.DefaultEncryption,
ZombieDeletionDeadline: &zombieDeadline,
},
},
Segments: []metabase.RawSegment{
{
StreamID: obj.StreamID,
Position: metabase.SegmentPosition{Part: 0, Index: 0},
CreatedAt: now,
EncryptedKey: encryptedKey,
EncryptedKeyNonce: encryptedKeyNonce,
PlainOffset: 0,
PlainSize: 512,
InlineData: []byte{1, 2, 3},
EncryptedSize: 3,
},
},
}.Check(ctx, t, db)
})
t.Run("overwrite", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
now := time.Now()
zombieDeadline := now.Add(24 * time.Hour)
metabasetest.BeginObjectNextVersion{
Opts: metabase.BeginObjectNextVersion{
ObjectStream: obj,
Encryption: metabasetest.DefaultEncryption,
UsePendingObjectsTable: true,
},
Version: 1,
}.Check(ctx, t, db)
encryptedKey := testrand.Bytes(32)
encryptedKeyNonce := testrand.Bytes(32)
metabasetest.CommitInlineSegment{
Opts: metabase.CommitInlineSegment{
ObjectStream: obj,
Position: metabase.SegmentPosition{Part: 0, Index: 0},
InlineData: []byte{1, 2, 3},
EncryptedKey: encryptedKey,
EncryptedKeyNonce: encryptedKeyNonce,
PlainSize: 512,
PlainOffset: 0,
UsePendingObjectsTable: true,
},
}.Check(ctx, t, db)
metabasetest.CommitInlineSegment{
Opts: metabase.CommitInlineSegment{
ObjectStream: obj,
Position: metabase.SegmentPosition{Part: 0, Index: 0},
InlineData: []byte{4, 5, 6},
EncryptedKey: encryptedKey,
EncryptedKeyNonce: encryptedKeyNonce,
PlainSize: 512,
PlainOffset: 0,
UsePendingObjectsTable: true,
},
}.Check(ctx, t, db)
metabasetest.Verify{
PendingObjects: []metabase.RawPendingObject{
{
PendingObjectStream: metabasetest.ObjectStreamToPending(obj),
CreatedAt: now,
Encryption: metabasetest.DefaultEncryption,
ZombieDeletionDeadline: &zombieDeadline,
},
},
Segments: []metabase.RawSegment{
{
StreamID: obj.StreamID,
Position: metabase.SegmentPosition{Part: 0, Index: 0},
CreatedAt: now,
EncryptedKey: encryptedKey,
EncryptedKeyNonce: encryptedKeyNonce,
PlainOffset: 0,
PlainSize: 512,
InlineData: []byte{4, 5, 6},
EncryptedSize: 3,
},
},
}.Check(ctx, t, db)
})
t.Run("commit empty segment of pending object", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
encryptedKey := testrand.Bytes(32)
encryptedKeyNonce := testrand.Bytes(32)
encryptedETag := testrand.Bytes(32)
now := time.Now()
zombieDeadline := now.Add(24 * time.Hour)
metabasetest.BeginObjectNextVersion{
Opts: metabase.BeginObjectNextVersion{
ObjectStream: obj,
Encryption: metabasetest.DefaultEncryption,
UsePendingObjectsTable: true,
},
Version: 1,
}.Check(ctx, t, db)
metabasetest.CommitInlineSegment{
Opts: metabase.CommitInlineSegment{
ObjectStream: obj,
EncryptedKey: encryptedKey,
EncryptedKeyNonce: encryptedKeyNonce,
PlainSize: 0,
PlainOffset: 0,
EncryptedETag: encryptedETag,
UsePendingObjectsTable: true,
},
}.Check(ctx, t, db)
metabasetest.Verify{
PendingObjects: []metabase.RawPendingObject{
{
PendingObjectStream: metabasetest.ObjectStreamToPending(obj),
CreatedAt: now,
Encryption: metabasetest.DefaultEncryption,
ZombieDeletionDeadline: &zombieDeadline,
},
},
Segments: []metabase.RawSegment{
{
StreamID: obj.StreamID,
CreatedAt: now,
EncryptedKey: encryptedKey,
EncryptedKeyNonce: encryptedKeyNonce,
PlainOffset: 0,
PlainSize: 0,
EncryptedSize: 0,
EncryptedETag: encryptedETag,
},
},
}.Check(ctx, t, db)
})
t.Run("commit segment of pending object", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
encryptedKey := testrand.Bytes(32)
encryptedKeyNonce := testrand.Bytes(32)
encryptedETag := testrand.Bytes(32)
now := time.Now()
zombieDeadline := now.Add(24 * time.Hour)
metabasetest.BeginObjectNextVersion{
Opts: metabase.BeginObjectNextVersion{
ObjectStream: obj,
Encryption: metabasetest.DefaultEncryption,
UsePendingObjectsTable: true,
},
Version: 1,
}.Check(ctx, t, db)
metabasetest.CommitInlineSegment{
Opts: metabase.CommitInlineSegment{
ObjectStream: obj,
InlineData: []byte{1, 2, 3},
EncryptedKey: encryptedKey,
EncryptedKeyNonce: encryptedKeyNonce,
PlainSize: 512,
PlainOffset: 0,
EncryptedETag: encryptedETag,
UsePendingObjectsTable: true,
},
}.Check(ctx, t, db)
metabasetest.Verify{
PendingObjects: []metabase.RawPendingObject{
{
PendingObjectStream: metabasetest.ObjectStreamToPending(obj),
CreatedAt: now,
Encryption: metabasetest.DefaultEncryption,
ZombieDeletionDeadline: &zombieDeadline,
},
},
Segments: []metabase.RawSegment{
{
StreamID: obj.StreamID,
CreatedAt: now,
EncryptedKey: encryptedKey,
EncryptedKeyNonce: encryptedKeyNonce,
PlainOffset: 0,
PlainSize: 512,
InlineData: []byte{1, 2, 3},
EncryptedSize: 3,
EncryptedETag: encryptedETag,
},
},
}.Check(ctx, t, db)
})
t.Run("commit segment of object with expires at", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
encryptedKey := testrand.Bytes(32)
encryptedKeyNonce := testrand.Bytes(32)
encryptedETag := testrand.Bytes(32)
now := time.Now()
zombieDeadline := now.Add(24 * time.Hour)
expectedExpiresAt := now.Add(33 * time.Hour)
metabasetest.BeginObjectNextVersion{
Opts: metabase.BeginObjectNextVersion{
ObjectStream: obj,
Encryption: metabasetest.DefaultEncryption,
ExpiresAt: &expectedExpiresAt,
UsePendingObjectsTable: true,
},
Version: 1,
}.Check(ctx, t, db)
metabasetest.CommitInlineSegment{
Opts: metabase.CommitInlineSegment{
ObjectStream: obj,
ExpiresAt: &expectedExpiresAt,
InlineData: []byte{1, 2, 3},
EncryptedKey: encryptedKey,
EncryptedKeyNonce: encryptedKeyNonce,
PlainSize: 512,
PlainOffset: 0,
EncryptedETag: encryptedETag,
UsePendingObjectsTable: true,
},
}.Check(ctx, t, db)
metabasetest.Verify{
PendingObjects: []metabase.RawPendingObject{
{
PendingObjectStream: metabasetest.ObjectStreamToPending(obj),
CreatedAt: now,
ExpiresAt: &expectedExpiresAt,
Encryption: metabasetest.DefaultEncryption,
ZombieDeletionDeadline: &zombieDeadline,
},
},
Segments: []metabase.RawSegment{
{
StreamID: obj.StreamID,
CreatedAt: now,
ExpiresAt: &expectedExpiresAt,
EncryptedKey: encryptedKey,
EncryptedKeyNonce: encryptedKeyNonce,
PlainOffset: 0,
PlainSize: 512,
InlineData: []byte{1, 2, 3},
EncryptedSize: 3,
EncryptedETag: encryptedETag,
},
},
}.Check(ctx, t, db)
})
})
})
}

View File

@ -373,6 +373,8 @@ func (endpoint *Endpoint) CommitSegment(ctx context.Context, req *pb.SegmentComm
Redundancy: rs,
Pieces: pieces,
Placement: storj.PlacementConstraint(streamID.Placement),
UsePendingObjectsTable: streamID.UsePendingObjectsTable,
}
err = endpoint.validateRemoteSegment(ctx, mbCommitSegment, originalLimits)
@ -488,6 +490,8 @@ func (endpoint *Endpoint) MakeInlineSegment(ctx context.Context, req *pb.Segment
EncryptedETag: req.EncryptedETag,
InlineData: req.EncryptedInlineData,
UsePendingObjectsTable: streamID.UsePendingObjectsTable,
})
if err != nil {
return nil, endpoint.convertMetabaseErr(err)