satellite/metabase: adjust CommitObject to use pending_objects table

Change is adjusting CommitObject to use `pending_objects` table to
commit object.

Satellite stream id is used to determine if we need to use
`pending_objects` or `objects` table during commit.

General goal is to support both tables until `objects` table will be
free from pending objects.

Part of https://github.com/storj/storj/issues/6046

Change-Id: I2ebe0cd6b446727c98c8e210d4d00504dd0dacb6
This commit is contained in:
Michal Niewrzal 2023-07-28 20:39:32 +02:00 committed by Michał Niewrzał
parent 2ed08922d9
commit 03c52f184e
3 changed files with 687 additions and 44 deletions

View File

@ -12,6 +12,7 @@ import (
pgxerrcode "github.com/jackc/pgerrcode"
"github.com/zeebo/errs"
"go.uber.org/zap"
"golang.org/x/exp/slices"
"storj.io/common/memory"
"storj.io/common/storj"
@ -616,6 +617,8 @@ type CommitObject struct {
// Wil be only executed after succesfull commit + delete DB operation.
// Error on this function won't revert back committed object.
OnDelete func(segments []DeletedSegmentInfo)
UsePendingObjectsTable bool
}
// Verify verifies reqest fields.
@ -696,20 +699,6 @@ func (db *DB) CommitObject(ctx context.Context, opts CommitObject) (object Objec
encryptionParameters{&opts.Encryption},
}
metadataColumns := ""
if opts.OverrideEncryptedMetadata {
args = append(args,
opts.EncryptedMetadataNonce,
opts.EncryptedMetadata,
opts.EncryptedMetadataEncryptedKey,
)
metadataColumns = `,
encrypted_metadata_nonce = $11,
encrypted_metadata = $12,
encrypted_metadata_encrypted_key = $13
`
}
versionsToDelete := []Version{}
if err := withRows(tx.QueryContext(ctx, `
SELECT version
@ -745,39 +734,138 @@ func (db *DB) CommitObject(ctx context.Context, opts CommitObject) (object Objec
return ErrPermissionDenied.New("no permissions to delete existing object")
}
err = tx.QueryRowContext(ctx, `
UPDATE objects SET
status =`+committedStatus+`,
segment_count = $6,
if opts.UsePendingObjectsTable {
opts.Version = 1
total_plain_size = $7,
total_encrypted_size = $8,
fixed_segment_size = $9,
zombie_deletion_deadline = NULL,
// we remove from deletion list object with version 1 as we will
// override/update instead deleting and inserting new one.
index := slices.Index(versionsToDelete, opts.Version)
if index != -1 {
versionsToDelete = slices.Delete(versionsToDelete, index, index+1)
}
-- TODO should we allow to override existing encryption parameters or return error if don't match with opts?
encryption = CASE
WHEN objects.encryption = 0 AND $10 <> 0 THEN $10
WHEN objects.encryption = 0 AND $10 = 0 THEN NULL
ELSE objects.encryption
END
`+metadataColumns+`
WHERE
project_id = $1 AND
bucket_name = $2 AND
object_key = $3 AND
version = $4 AND
stream_id = $5 AND
status = `+pendingStatus+`
RETURNING
created_at, expires_at,
encrypted_metadata, encrypted_metadata_encrypted_key, encrypted_metadata_nonce,
encryption
args = append(args,
opts.EncryptedMetadataNonce,
opts.EncryptedMetadata,
opts.EncryptedMetadataEncryptedKey,
opts.OverrideEncryptedMetadata,
)
err = tx.QueryRowContext(ctx, `
WITH delete_pending_object AS (
DELETE FROM pending_objects WHERE
project_id = $1 AND
bucket_name = $2 AND
object_key = $3 AND
stream_id = $5
RETURNING
project_id, bucket_name, object_key, $4::INT as version, stream_id,
`+committedStatus+` as status, $6::INT as segment_count, $7::INT as total_plain_size, $8::INT as total_encrypted_size, $9::INT as fixed_segment_size, NULL as zombie_deletion_deadline,
CASE
WHEN encryption = 0 AND $10 <> 0 THEN $10
WHEN encryption = 0 AND $10 = 0 THEN NULL
ELSE encryption
END as
encryption,
encrypted_metadata_nonce, encrypted_metadata, encrypted_metadata_encrypted_key
), object_to_commit AS (
SELECT
project_id, bucket_name, object_key, version, stream_id,
status, segment_count, total_plain_size, total_encrypted_size, fixed_segment_size, zombie_deletion_deadline,
encryption,
CASE
WHEN $14::BOOL = true THEN $11
ELSE delete_pending_object.encrypted_metadata_nonce
END as
encrypted_metadata_nonce,
CASE
WHEN $14::BOOL = true THEN $12
ELSE delete_pending_object.encrypted_metadata
END as
encrypted_metadata,
CASE
WHEN $14::BOOL = true THEN $13
ELSE delete_pending_object.encrypted_metadata_encrypted_key
END as
encrypted_metadata_encrypted_key
FROM delete_pending_object
)
INSERT INTO objects (
project_id, bucket_name, object_key, version, stream_id,
status, segment_count, total_plain_size, total_encrypted_size, fixed_segment_size, zombie_deletion_deadline,
encryption,
encrypted_metadata_nonce, encrypted_metadata, encrypted_metadata_encrypted_key
)
SELECT * FROM object_to_commit
ON CONFLICT (project_id, bucket_name, object_key, version)
DO UPDATE SET (
stream_id,
status, segment_count, total_plain_size, total_encrypted_size, fixed_segment_size, zombie_deletion_deadline,
encryption,
encrypted_metadata_nonce, encrypted_metadata, encrypted_metadata_encrypted_key
) = (SELECT
stream_id,
status, segment_count, total_plain_size, total_encrypted_size, fixed_segment_size, zombie_deletion_deadline,
encryption,
encrypted_metadata_nonce, encrypted_metadata, encrypted_metadata_encrypted_key
FROM object_to_commit)
RETURNING
created_at, expires_at,
encrypted_metadata, encrypted_metadata_encrypted_key, encrypted_metadata_nonce,
encryption
`, args...).Scan(
&object.CreatedAt, &object.ExpiresAt,
&object.EncryptedMetadata, &object.EncryptedMetadataEncryptedKey, &object.EncryptedMetadataNonce,
encryptionParameters{&object.Encryption},
)
&object.CreatedAt, &object.ExpiresAt,
&object.EncryptedMetadata, &object.EncryptedMetadataEncryptedKey, &object.EncryptedMetadataNonce,
encryptionParameters{&object.Encryption},
)
} else {
metadataColumns := ""
if opts.OverrideEncryptedMetadata {
args = append(args,
opts.EncryptedMetadataNonce,
opts.EncryptedMetadata,
opts.EncryptedMetadataEncryptedKey,
)
metadataColumns = `,
encrypted_metadata_nonce = $11,
encrypted_metadata = $12,
encrypted_metadata_encrypted_key = $13
`
}
err = tx.QueryRowContext(ctx, `
UPDATE objects SET
status =`+committedStatus+`,
segment_count = $6,
total_plain_size = $7,
total_encrypted_size = $8,
fixed_segment_size = $9,
zombie_deletion_deadline = NULL,
-- TODO should we allow to override existing encryption parameters or return error if don't match with opts?
encryption = CASE
WHEN objects.encryption = 0 AND $10 <> 0 THEN $10
WHEN objects.encryption = 0 AND $10 = 0 THEN NULL
ELSE objects.encryption
END
`+metadataColumns+`
WHERE
project_id = $1 AND
bucket_name = $2 AND
object_key = $3 AND
version = $4 AND
stream_id = $5 AND
status = `+pendingStatus+`
RETURNING
created_at, expires_at,
encrypted_metadata, encrypted_metadata_encrypted_key, encrypted_metadata_nonce,
encryption
`, args...).Scan(
&object.CreatedAt, &object.ExpiresAt,
&object.EncryptedMetadata, &object.EncryptedMetadataEncryptedKey, &object.EncryptedMetadataNonce,
encryptionParameters{&object.Encryption},
)
}
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return ErrObjectNotFound.Wrap(Error.New("object with specified version and pending status is missing"))

View File

@ -3801,6 +3801,559 @@ func TestCommitObject(t *testing.T) {
require.Equal(t, expectedDeletedSegments, deletedSegments)
})
t.Run("use pending objects table", func(t *testing.T) {
obj.Version = metabase.NextVersion
t.Run("version", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
metabasetest.BeginObjectNextVersion{
Opts: metabase.BeginObjectNextVersion{
ObjectStream: metabase.ObjectStream{
ProjectID: obj.ProjectID,
BucketName: obj.BucketName,
ObjectKey: obj.ObjectKey,
StreamID: obj.StreamID,
},
Encryption: metabasetest.DefaultEncryption,
UsePendingObjectsTable: true,
},
Version: 1,
}.Check(ctx, t, db)
now := time.Now()
encryptedMetadata := testrand.Bytes(1024)
encryptedMetadataNonce := testrand.Nonce()
encryptedMetadataKey := testrand.Bytes(265)
metabasetest.CommitObject{
Opts: metabase.CommitObject{
ObjectStream: metabase.ObjectStream{
ProjectID: obj.ProjectID,
BucketName: obj.BucketName,
ObjectKey: obj.ObjectKey,
Version: 1,
StreamID: obj.StreamID,
},
OverrideEncryptedMetadata: true,
EncryptedMetadataNonce: encryptedMetadataNonce[:],
EncryptedMetadata: encryptedMetadata,
EncryptedMetadataEncryptedKey: encryptedMetadataKey,
UsePendingObjectsTable: true,
},
}.Check(ctx, t, db)
// disallow for double commit
metabasetest.CommitObject{
Opts: metabase.CommitObject{
ObjectStream: metabase.ObjectStream{
ProjectID: obj.ProjectID,
BucketName: obj.BucketName,
ObjectKey: obj.ObjectKey,
Version: 1,
StreamID: obj.StreamID,
},
UsePendingObjectsTable: true,
},
ErrClass: &metabase.ErrObjectNotFound,
ErrText: "metabase: object with specified version and pending status is missing", // TODO: this error message could be better
}.Check(ctx, t, db)
metabasetest.Verify{
Objects: []metabase.RawObject{
{
ObjectStream: metabase.ObjectStream{
ProjectID: obj.ProjectID,
BucketName: obj.BucketName,
ObjectKey: obj.ObjectKey,
Version: 1,
StreamID: obj.StreamID,
},
CreatedAt: now,
Status: metabase.Committed,
EncryptedMetadataNonce: encryptedMetadataNonce[:],
EncryptedMetadata: encryptedMetadata,
EncryptedMetadataEncryptedKey: encryptedMetadataKey,
Encryption: metabasetest.DefaultEncryption,
},
},
}.Check(ctx, t, db)
})
t.Run("assign plain_offset", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
metabasetest.BeginObjectNextVersion{
Opts: metabase.BeginObjectNextVersion{
ObjectStream: obj,
Encryption: metabasetest.DefaultEncryption,
UsePendingObjectsTable: true,
},
Version: 1,
}.Check(ctx, t, db)
obj.Version++
now := time.Now()
rootPieceID := testrand.PieceID()
pieces := metabase.Pieces{{Number: 0, StorageNode: testrand.NodeID()}}
encryptedKey := testrand.Bytes(32)
encryptedKeyNonce := testrand.Bytes(32)
metabasetest.CommitSegment{
Opts: metabase.CommitSegment{
ObjectStream: obj,
Position: metabase.SegmentPosition{Index: 0},
RootPieceID: rootPieceID,
Pieces: pieces,
EncryptedKey: encryptedKey,
EncryptedKeyNonce: encryptedKeyNonce,
EncryptedSize: 1024,
PlainSize: 512,
PlainOffset: 999999,
Redundancy: metabasetest.DefaultRedundancy,
UsePendingObjectsTable: true,
},
}.Check(ctx, t, db)
metabasetest.CommitSegment{
Opts: metabase.CommitSegment{
ObjectStream: obj,
Position: metabase.SegmentPosition{Index: 1},
RootPieceID: rootPieceID,
Pieces: pieces,
EncryptedKey: encryptedKey,
EncryptedKeyNonce: encryptedKeyNonce,
EncryptedSize: 1024,
PlainSize: 512,
PlainOffset: 999999,
Redundancy: metabasetest.DefaultRedundancy,
UsePendingObjectsTable: true,
},
}.Check(ctx, t, db)
metabasetest.CommitObject{
Opts: metabase.CommitObject{
ObjectStream: obj,
UsePendingObjectsTable: true,
},
}.Check(ctx, t, db)
metabasetest.Verify{
Segments: []metabase.RawSegment{
{
StreamID: obj.StreamID,
Position: metabase.SegmentPosition{Index: 0},
CreatedAt: now,
RootPieceID: rootPieceID,
EncryptedKey: encryptedKey,
EncryptedKeyNonce: encryptedKeyNonce,
EncryptedSize: 1024,
PlainSize: 512,
PlainOffset: 0,
Redundancy: metabasetest.DefaultRedundancy,
Pieces: pieces,
},
{
StreamID: obj.StreamID,
Position: metabase.SegmentPosition{Index: 1},
CreatedAt: now,
RootPieceID: rootPieceID,
EncryptedKey: encryptedKey,
EncryptedKeyNonce: encryptedKeyNonce,
EncryptedSize: 1024,
PlainSize: 512,
PlainOffset: 512,
Redundancy: metabasetest.DefaultRedundancy,
Pieces: pieces,
},
},
Objects: []metabase.RawObject{
{
ObjectStream: obj,
CreatedAt: now,
Status: metabase.Committed,
SegmentCount: 2,
FixedSegmentSize: 512,
TotalPlainSize: 2 * 512,
TotalEncryptedSize: 2 * 1024,
Encryption: metabasetest.DefaultEncryption,
},
},
}.Check(ctx, t, db)
})
t.Run("large object over 2 GB", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
obj.Version = metabase.NextVersion
metabasetest.BeginObjectNextVersion{
Opts: metabase.BeginObjectNextVersion{
ObjectStream: obj,
Encryption: metabasetest.DefaultEncryption,
UsePendingObjectsTable: true,
},
Version: 1,
}.Check(ctx, t, db)
obj.Version++
now := time.Now()
rootPieceID := testrand.PieceID()
pieces := metabase.Pieces{{Number: 0, StorageNode: testrand.NodeID()}}
encryptedKey := testrand.Bytes(32)
encryptedKeyNonce := testrand.Bytes(32)
metabasetest.CommitSegment{
Opts: metabase.CommitSegment{
ObjectStream: obj,
Position: metabase.SegmentPosition{Index: 0},
RootPieceID: rootPieceID,
Pieces: pieces,
EncryptedKey: encryptedKey,
EncryptedKeyNonce: encryptedKeyNonce,
EncryptedSize: math.MaxInt32,
PlainSize: math.MaxInt32,
Redundancy: metabasetest.DefaultRedundancy,
UsePendingObjectsTable: true,
},
}.Check(ctx, t, db)
metabasetest.CommitSegment{
Opts: metabase.CommitSegment{
ObjectStream: obj,
Position: metabase.SegmentPosition{Index: 1},
RootPieceID: rootPieceID,
Pieces: pieces,
EncryptedKey: encryptedKey,
EncryptedKeyNonce: encryptedKeyNonce,
EncryptedSize: math.MaxInt32,
PlainSize: math.MaxInt32,
Redundancy: metabasetest.DefaultRedundancy,
UsePendingObjectsTable: true,
},
}.Check(ctx, t, db)
metabasetest.CommitObject{
Opts: metabase.CommitObject{
ObjectStream: obj,
UsePendingObjectsTable: true,
},
}.Check(ctx, t, db)
metabasetest.Verify{
Segments: []metabase.RawSegment{
{
StreamID: obj.StreamID,
Position: metabase.SegmentPosition{Index: 0},
CreatedAt: now,
RootPieceID: rootPieceID,
EncryptedKey: encryptedKey,
EncryptedKeyNonce: encryptedKeyNonce,
EncryptedSize: math.MaxInt32,
PlainSize: math.MaxInt32,
Redundancy: metabasetest.DefaultRedundancy,
Pieces: pieces,
},
{
StreamID: obj.StreamID,
Position: metabase.SegmentPosition{Index: 1},
CreatedAt: now,
RootPieceID: rootPieceID,
EncryptedKey: encryptedKey,
EncryptedKeyNonce: encryptedKeyNonce,
EncryptedSize: math.MaxInt32,
PlainSize: math.MaxInt32,
PlainOffset: math.MaxInt32,
Redundancy: metabasetest.DefaultRedundancy,
Pieces: pieces,
},
},
Objects: []metabase.RawObject{
{
ObjectStream: obj,
CreatedAt: now,
Status: metabase.Committed,
SegmentCount: 2,
FixedSegmentSize: math.MaxInt32,
TotalPlainSize: 2 * math.MaxInt32,
TotalEncryptedSize: 2 * math.MaxInt32,
Encryption: metabasetest.DefaultEncryption,
},
},
}.Check(ctx, t, db)
})
t.Run("commit with encryption (no override)", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
obj.Version = metabase.NextVersion
metabasetest.BeginObjectNextVersion{
Opts: metabase.BeginObjectNextVersion{
ObjectStream: obj,
Encryption: metabasetest.DefaultEncryption,
UsePendingObjectsTable: true,
},
Version: 1,
}.Check(ctx, t, db)
obj.Version++
now := time.Now()
metabasetest.CommitObject{
Opts: metabase.CommitObject{
ObjectStream: obj,
// set different encryption than with BeginObjectExactVersion
Encryption: storj.EncryptionParameters{
CipherSuite: storj.EncNull,
BlockSize: 512,
},
UsePendingObjectsTable: true,
},
}.Check(ctx, t, db)
metabasetest.Verify{
Objects: []metabase.RawObject{
{
ObjectStream: obj,
CreatedAt: now,
Status: metabase.Committed,
SegmentCount: 0,
Encryption: metabasetest.DefaultEncryption,
},
},
}.Check(ctx, t, db)
})
t.Run("commit with metadata (no overwrite)", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
now := time.Now()
expectedMetadata := testrand.Bytes(memory.KiB)
expectedMetadataKey := testrand.Bytes(32)
expectedMetadataNonce := testrand.Nonce().Bytes()
obj.Version = metabase.NextVersion
metabasetest.BeginObjectNextVersion{
Opts: metabase.BeginObjectNextVersion{
ObjectStream: obj,
Encryption: metabasetest.DefaultEncryption,
EncryptedMetadata: expectedMetadata,
EncryptedMetadataEncryptedKey: expectedMetadataKey,
EncryptedMetadataNonce: expectedMetadataNonce,
UsePendingObjectsTable: true,
},
Version: metabase.DefaultVersion,
}.Check(ctx, t, db)
obj.Version++
metabasetest.CommitObject{
Opts: metabase.CommitObject{
ObjectStream: obj,
Encryption: metabasetest.DefaultEncryption,
UsePendingObjectsTable: true,
},
}.Check(ctx, t, db)
metabasetest.Verify{
Objects: []metabase.RawObject{
{
ObjectStream: obj,
CreatedAt: now,
Status: metabase.Committed,
Encryption: metabasetest.DefaultEncryption,
EncryptedMetadata: expectedMetadata,
EncryptedMetadataEncryptedKey: expectedMetadataKey,
EncryptedMetadataNonce: expectedMetadataNonce,
},
},
}.Check(ctx, t, db)
})
t.Run("commit with metadata (overwrite)", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
now := time.Now()
expectedMetadata := testrand.Bytes(memory.KiB)
expecedMetadataKey := testrand.Bytes(32)
expecedMetadataNonce := testrand.Nonce().Bytes()
obj.Version = metabase.NextVersion
metabasetest.BeginObjectNextVersion{
Opts: metabase.BeginObjectNextVersion{
ObjectStream: obj,
Encryption: metabasetest.DefaultEncryption,
EncryptedMetadata: testrand.Bytes(memory.KiB),
EncryptedMetadataEncryptedKey: testrand.Bytes(32),
EncryptedMetadataNonce: testrand.Nonce().Bytes(),
UsePendingObjectsTable: true,
},
Version: 1,
}.Check(ctx, t, db)
obj.Version++
metabasetest.CommitObject{
Opts: metabase.CommitObject{
ObjectStream: obj,
Encryption: metabasetest.DefaultEncryption,
OverrideEncryptedMetadata: true,
EncryptedMetadata: expectedMetadata,
EncryptedMetadataEncryptedKey: expecedMetadataKey,
EncryptedMetadataNonce: expecedMetadataNonce,
UsePendingObjectsTable: true,
},
}.Check(ctx, t, db)
metabasetest.Verify{
Objects: []metabase.RawObject{
{
ObjectStream: obj,
CreatedAt: now,
Status: metabase.Committed,
Encryption: metabasetest.DefaultEncryption,
EncryptedMetadata: expectedMetadata,
EncryptedMetadataEncryptedKey: expecedMetadataKey,
EncryptedMetadataNonce: expecedMetadataNonce,
},
},
}.Check(ctx, t, db)
})
t.Run("commit with empty metadata (overwrite)", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
now := time.Now()
obj.Version = metabase.NextVersion
metabasetest.BeginObjectNextVersion{
Opts: metabase.BeginObjectNextVersion{
ObjectStream: obj,
Encryption: metabasetest.DefaultEncryption,
EncryptedMetadata: testrand.Bytes(memory.KiB),
EncryptedMetadataEncryptedKey: testrand.Bytes(32),
EncryptedMetadataNonce: testrand.Nonce().Bytes(),
UsePendingObjectsTable: true,
},
Version: 1,
}.Check(ctx, t, db)
obj.Version++
metabasetest.CommitObject{
Opts: metabase.CommitObject{
ObjectStream: obj,
Encryption: metabasetest.DefaultEncryption,
OverrideEncryptedMetadata: true,
EncryptedMetadata: nil,
EncryptedMetadataEncryptedKey: nil,
EncryptedMetadataNonce: nil,
UsePendingObjectsTable: true,
},
}.Check(ctx, t, db)
metabasetest.Verify{
Objects: []metabase.RawObject{
{
ObjectStream: obj,
CreatedAt: now,
Status: metabase.Committed,
Encryption: metabasetest.DefaultEncryption,
},
},
}.Check(ctx, t, db)
})
t.Run("overwrite object", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
now := time.Now()
obj := metabasetest.RandObjectStream()
for i := 0; i < 10; i++ {
obj.Version = metabase.NextVersion
obj.StreamID = testrand.UUID()
metabasetest.BeginObjectNextVersion{
Opts: metabase.BeginObjectNextVersion{
ObjectStream: obj,
Encryption: metabasetest.DefaultEncryption,
EncryptedMetadata: testrand.Bytes(memory.KiB),
EncryptedMetadataEncryptedKey: testrand.Bytes(32),
EncryptedMetadataNonce: testrand.Nonce().Bytes(),
UsePendingObjectsTable: true,
},
Version: 1,
}.Check(ctx, t, db)
obj.Version++
metabasetest.CommitObject{
Opts: metabase.CommitObject{
ObjectStream: obj,
Encryption: metabasetest.DefaultEncryption,
OverrideEncryptedMetadata: true,
EncryptedMetadata: nil,
EncryptedMetadataEncryptedKey: nil,
EncryptedMetadataNonce: nil,
UsePendingObjectsTable: true,
},
}.Check(ctx, t, db)
metabasetest.Verify{
Objects: []metabase.RawObject{
{
ObjectStream: obj,
CreatedAt: now,
Status: metabase.Committed,
Encryption: metabasetest.DefaultEncryption,
},
},
}.Check(ctx, t, db)
}
})
})
})
}

View File

@ -255,6 +255,8 @@ func (endpoint *Endpoint) CommitObject(ctx context.Context, req *pb.ObjectCommit
Encryption: encryption,
DisallowDelete: !allowDelete,
UsePendingObjectsTable: streamID.UsePendingObjectsTable,
}
// uplink can send empty metadata with not empty key/nonce
// we need to fix it on uplink side but that part will be