satellite/metainfo/metabase: add support for encrypted ETag

Change-Id: I142d5f759a452b12bf2ce67e6c939846a5e86ce7
This commit is contained in:
Kaloyan Raev 2021-03-25 09:53:10 +02:00
parent cc42dfcc78
commit ec929ca1da
13 changed files with 93 additions and 36 deletions

View File

@ -221,6 +221,8 @@ type CommitSegment struct {
PlainSize int32 // size before encryption
EncryptedSize int32 // segment size after encryption
EncryptedETag []byte
Redundancy storj.RedundancyScheme
Pieces Pieces
@ -269,26 +271,26 @@ func (db *DB) CommitSegment(ctx context.Context, opts CommitSegment) (err error)
INSERT INTO segments (
stream_id, position,
root_piece_id, encrypted_key_nonce, encrypted_key,
encrypted_size, plain_offset, plain_size,
encrypted_size, plain_offset, plain_size, encrypted_etag,
redundancy,
remote_alias_pieces
) VALUES (
(SELECT stream_id
FROM objects WHERE
project_id = $10 AND
bucket_name = $11 AND
object_key = $12 AND
version = $13 AND
stream_id = $14 AND
project_id = $11 AND
bucket_name = $12 AND
object_key = $13 AND
version = $14 AND
stream_id = $15 AND
status = `+pendingStatus+
` ), $1,
$2, $3, $4,
$5, $6, $7,
$8,
$9
$5, $6, $7, $8,
$9,
$10
)`, opts.Position,
opts.RootPieceID, opts.EncryptedKeyNonce, opts.EncryptedKey,
opts.EncryptedSize, opts.PlainOffset, opts.PlainSize,
opts.EncryptedSize, opts.PlainOffset, opts.PlainSize, opts.EncryptedETag,
redundancyScheme{&opts.Redundancy},
aliasPieces,
opts.ProjectID, []byte(opts.BucketName), []byte(opts.ObjectKey), opts.Version, opts.StreamID,
@ -314,8 +316,9 @@ type CommitInlineSegment struct {
EncryptedKeyNonce []byte
EncryptedKey []byte
PlainOffset int64 // offset in the original data stream
PlainSize int32 // size before encryption
PlainOffset int64 // offset in the original data stream
PlainSize int32 // size before encryption
EncryptedETag []byte
InlineData []byte
}
@ -347,24 +350,24 @@ func (db *DB) CommitInlineSegment(ctx context.Context, opts CommitInlineSegment)
INSERT INTO segments (
stream_id, position,
root_piece_id, encrypted_key_nonce, encrypted_key,
encrypted_size, plain_offset, plain_size,
encrypted_size, plain_offset, plain_size, encrypted_etag,
inline_data
) VALUES (
(SELECT stream_id
FROM objects WHERE
project_id = $9 AND
bucket_name = $10 AND
object_key = $11 AND
version = $12 AND
stream_id = $13 AND
project_id = $10 AND
bucket_name = $11 AND
object_key = $12 AND
version = $13 AND
stream_id = $14 AND
status = `+pendingStatus+
` ), $1,
$2, $3, $4,
$5, $6, $7,
$8
$5, $6, $7, $8,
$9
)`, opts.Position,
storj.PieceID{}, opts.EncryptedKeyNonce, opts.EncryptedKey,
len(opts.InlineData), opts.PlainOffset, opts.PlainSize,
len(opts.InlineData), opts.PlainOffset, opts.PlainSize, opts.EncryptedETag,
opts.InlineData,
opts.ProjectID, []byte(opts.BucketName), []byte(opts.ObjectKey), opts.Version, opts.StreamID,
)

View File

@ -1369,6 +1369,7 @@ func TestCommitSegment(t *testing.T) {
pieces := metabase.Pieces{{Number: 0, StorageNode: testrand.NodeID()}}
encryptedKey := testrand.Bytes(32)
encryptedKeyNonce := testrand.Bytes(32)
encryptedETag := testrand.Bytes(32)
now := time.Now()
BeginObjectExactVersion{
@ -1392,6 +1393,7 @@ func TestCommitSegment(t *testing.T) {
PlainSize: 512,
PlainOffset: 0,
Redundancy: defaultTestRedundancy,
EncryptedETag: encryptedETag,
},
}.Check(ctx, t, db)
@ -1417,6 +1419,7 @@ func TestCommitSegment(t *testing.T) {
EncryptedSize: 1024,
PlainOffset: 0,
PlainSize: 512,
EncryptedETag: encryptedETag,
Redundancy: defaultTestRedundancy,
@ -1660,6 +1663,7 @@ func TestCommitInlineSegment(t *testing.T) {
encryptedKey := testrand.Bytes(32)
encryptedKeyNonce := testrand.Bytes(32)
encryptedETag := testrand.Bytes(32)
now := time.Now()
BeginObjectExactVersion{
@ -1677,8 +1681,9 @@ func TestCommitInlineSegment(t *testing.T) {
EncryptedKey: encryptedKey,
EncryptedKeyNonce: encryptedKeyNonce,
PlainSize: 0,
PlainOffset: 0,
PlainSize: 0,
PlainOffset: 0,
EncryptedETag: encryptedETag,
},
}.Check(ctx, t, db)
@ -1703,6 +1708,7 @@ func TestCommitInlineSegment(t *testing.T) {
PlainSize: 0,
EncryptedSize: 0,
EncryptedETag: encryptedETag,
},
},
}.Check(ctx, t, db)
@ -1713,6 +1719,7 @@ func TestCommitInlineSegment(t *testing.T) {
encryptedKey := testrand.Bytes(32)
encryptedKeyNonce := testrand.Bytes(32)
encryptedETag := testrand.Bytes(32)
now := time.Now()
BeginObjectExactVersion{
@ -1731,8 +1738,9 @@ func TestCommitInlineSegment(t *testing.T) {
EncryptedKey: encryptedKey,
EncryptedKeyNonce: encryptedKeyNonce,
PlainSize: 512,
PlainOffset: 0,
PlainSize: 512,
PlainOffset: 0,
EncryptedETag: encryptedETag,
},
}.Check(ctx, t, db)
@ -1758,6 +1766,7 @@ func TestCommitInlineSegment(t *testing.T) {
InlineData: []byte{1, 2, 3},
EncryptedSize: 3,
EncryptedETag: encryptedETag,
},
},
}.Check(ctx, t, db)

View File

@ -197,6 +197,7 @@ func TestDeleteBucketObjects(t *testing.T) {
Pieces: metabase.Pieces{{Number: 0, StorageNode: storj.NodeID{2}}},
EncryptedKey: []byte{3},
EncryptedKeyNonce: []byte{4},
EncryptedETag: []byte{5},
EncryptedSize: 1024,
PlainSize: 512,
@ -213,6 +214,7 @@ func TestDeleteBucketObjects(t *testing.T) {
Pieces: metabase.Pieces{{Number: 0, StorageNode: storj.NodeID{2}}},
EncryptedKey: []byte{3},
EncryptedKeyNonce: []byte{4},
EncryptedETag: []byte{5},
EncryptedSize: 1024,
PlainSize: 512,

View File

@ -109,6 +109,7 @@ func TestDeleteExpiredObjects(t *testing.T) {
CreatedAt: &now,
EncryptedKey: []byte{3},
EncryptedKeyNonce: []byte{4},
EncryptedETag: []byte{5},
EncryptedSize: 1060,
PlainSize: 512,
Pieces: metabase.Pieces{{Number: 0, StorageNode: storj.NodeID{2}}},

View File

@ -720,6 +720,7 @@ func TestDeleteObjectLatestVersion(t *testing.T) {
Pieces: metabase.Pieces{{Number: 0, StorageNode: storj.NodeID{2}}},
EncryptedKey: []byte{3},
EncryptedKeyNonce: []byte{4},
EncryptedETag: []byte{5},
EncryptedSize: 1024,
PlainSize: 512,
@ -1250,6 +1251,7 @@ func createPendingObject(ctx *testcontext.Context, t *testing.T, db *metabase.DB
EncryptedKey: []byte{3},
EncryptedKeyNonce: []byte{4},
EncryptedETag: []byte{5},
EncryptedSize: 1024,
PlainSize: 512,
@ -1291,6 +1293,7 @@ func createObject(ctx *testcontext.Context, t *testing.T, db *metabase.DB, obj m
EncryptedKey: []byte{3},
EncryptedKeyNonce: []byte{4},
EncryptedETag: []byte{5},
EncryptedSize: 1024,
PlainSize: 512,

View File

@ -174,6 +174,7 @@ func (db *DB) GetSegmentByLocation(ctx context.Context, opts GetSegmentByLocatio
created_at,
root_piece_id, encrypted_key_nonce, encrypted_key,
encrypted_size, plain_offset, plain_size,
encrypted_etag,
redundancy,
inline_data, remote_alias_pieces
FROM segments
@ -192,6 +193,7 @@ func (db *DB) GetSegmentByLocation(ctx context.Context, opts GetSegmentByLocatio
&segment.CreatedAt,
&segment.RootPieceID, &segment.EncryptedKeyNonce, &segment.EncryptedKey,
&segment.EncryptedSize, &segment.PlainOffset, &segment.PlainSize,
&segment.EncryptedETag,
redundancyScheme{&segment.Redundancy},
&segment.InlineData, &aliasPieces,
)
@ -239,6 +241,7 @@ func (db *DB) GetSegmentByPosition(ctx context.Context, opts GetSegmentByPositio
created_at,
root_piece_id, encrypted_key_nonce, encrypted_key,
encrypted_size, plain_offset, plain_size,
encrypted_etag,
redundancy,
inline_data, remote_alias_pieces
FROM segments
@ -250,6 +253,7 @@ func (db *DB) GetSegmentByPosition(ctx context.Context, opts GetSegmentByPositio
&segment.CreatedAt,
&segment.RootPieceID, &segment.EncryptedKeyNonce, &segment.EncryptedKey,
&segment.EncryptedSize, &segment.PlainOffset, &segment.PlainSize,
&segment.EncryptedETag,
redundancyScheme{&segment.Redundancy},
&segment.InlineData, &aliasPieces,
)
@ -291,6 +295,7 @@ func (db *DB) GetLatestObjectLastSegment(ctx context.Context, opts GetLatestObje
created_at,
root_piece_id, encrypted_key_nonce, encrypted_key,
encrypted_size, plain_offset, plain_size,
encrypted_etag,
redundancy,
inline_data, remote_alias_pieces
FROM segments
@ -311,6 +316,7 @@ func (db *DB) GetLatestObjectLastSegment(ctx context.Context, opts GetLatestObje
&segment.CreatedAt,
&segment.RootPieceID, &segment.EncryptedKeyNonce, &segment.EncryptedKey,
&segment.EncryptedSize, &segment.PlainOffset, &segment.PlainSize,
&segment.EncryptedETag,
redundancyScheme{&segment.Redundancy},
&segment.InlineData, &aliasPieces,
)
@ -354,6 +360,7 @@ func (db *DB) GetSegmentByOffset(ctx context.Context, opts GetSegmentByOffset) (
created_at,
root_piece_id, encrypted_key_nonce, encrypted_key,
encrypted_size, plain_offset, plain_size,
encrypted_etag,
redundancy,
inline_data, remote_alias_pieces
FROM segments
@ -376,6 +383,7 @@ func (db *DB) GetSegmentByOffset(ctx context.Context, opts GetSegmentByOffset) (
&segment.CreatedAt,
&segment.RootPieceID, &segment.EncryptedKeyNonce, &segment.EncryptedKey,
&segment.EncryptedSize, &segment.PlainOffset, &segment.PlainSize,
&segment.EncryptedETag,
redundancyScheme{&segment.Redundancy},
&segment.InlineData, &aliasPieces,
)

View File

@ -382,6 +382,7 @@ func TestGetSegmentByLocation(t *testing.T) {
RootPieceID: storj.PieceID{1},
EncryptedKey: []byte{3},
EncryptedKeyNonce: []byte{4},
EncryptedETag: []byte{5},
EncryptedSize: 1024,
PlainSize: 512,
Pieces: metabase.Pieces{{Number: 0, StorageNode: storj.NodeID{2}}},
@ -480,6 +481,7 @@ func TestGetSegmentByPosition(t *testing.T) {
RootPieceID: storj.PieceID{1},
EncryptedKey: []byte{3},
EncryptedKeyNonce: []byte{4},
EncryptedETag: []byte{5},
EncryptedSize: 1024,
PlainSize: 512,
Pieces: metabase.Pieces{{Number: 0, StorageNode: storj.NodeID{2}}},
@ -581,6 +583,7 @@ func TestGetLatestObjectLastSegment(t *testing.T) {
RootPieceID: storj.PieceID{1},
EncryptedKey: []byte{3},
EncryptedKeyNonce: []byte{4},
EncryptedETag: []byte{5},
EncryptedSize: 1024,
PlainSize: 512,
Pieces: metabase.Pieces{{Number: 0, StorageNode: storj.NodeID{2}}},
@ -688,6 +691,7 @@ func TestGetSegmentByOffset(t *testing.T) {
RootPieceID: storj.PieceID{1},
EncryptedKey: []byte{3},
EncryptedKeyNonce: []byte{4},
EncryptedETag: []byte{5},
EncryptedSize: 1060,
PlainSize: 512,
PlainOffset: int64(i * 512),

View File

@ -48,6 +48,7 @@ func (db *DB) ListSegments(ctx context.Context, opts ListSegments) (result ListS
created_at,
root_piece_id, encrypted_key_nonce, encrypted_key,
encrypted_size, plain_offset, plain_size,
encrypted_etag,
redundancy,
inline_data, remote_alias_pieces
FROM segments
@ -65,6 +66,7 @@ func (db *DB) ListSegments(ctx context.Context, opts ListSegments) (result ListS
&segment.CreatedAt,
&segment.RootPieceID, &segment.EncryptedKeyNonce, &segment.EncryptedKey,
&segment.EncryptedSize, &segment.PlainOffset, &segment.PlainSize,
&segment.EncryptedETag,
redundancyScheme{&segment.Redundancy},
&segment.InlineData, &aliasPieces,
)
@ -112,9 +114,12 @@ type ListStreamPositionsResult struct {
// SegmentPositionInfo contains information for segment position.
type SegmentPositionInfo struct {
Position SegmentPosition
PlainSize int32
CreatedAt *time.Time // TODO: make it non-nilable after we migrate all existing segments to have creation time
Position SegmentPosition
PlainSize int32
CreatedAt *time.Time // TODO: make it non-nilable after we migrate all existing segments to have creation time
EncryptedETag []byte
EncryptedKeyNonce []byte
EncryptedKey []byte
}
// ListStreamPositions lists specified stream segment positions.
@ -135,7 +140,8 @@ func (db *DB) ListStreamPositions(ctx context.Context, opts ListStreamPositions)
err = withRows(db.db.Query(ctx, `
SELECT
position, plain_size, created_at
position, plain_size, created_at,
encrypted_etag, encrypted_key_nonce, encrypted_key
FROM segments
WHERE
stream_id = $1 AND
@ -145,7 +151,10 @@ func (db *DB) ListStreamPositions(ctx context.Context, opts ListStreamPositions)
`, opts.StreamID, opts.Cursor, opts.Limit+1))(func(rows tagsql.Rows) error {
for rows.Next() {
var segment SegmentPositionInfo
err = rows.Scan(&segment.Position, &segment.PlainSize, &segment.CreatedAt)
err = rows.Scan(
&segment.Position, &segment.PlainSize, &segment.CreatedAt,
&segment.EncryptedETag, &segment.EncryptedKeyNonce, &segment.EncryptedKey,
)
if err != nil {
return Error.New("failed to scan segments: %w", err)
}

View File

@ -73,6 +73,7 @@ func TestListSegments(t *testing.T) {
RootPieceID: storj.PieceID{1},
EncryptedKey: []byte{3},
EncryptedKeyNonce: []byte{4},
EncryptedETag: []byte{5},
EncryptedSize: 1024,
PlainSize: 512,
Pieces: metabase.Pieces{{Number: 0, StorageNode: storj.NodeID{2}}},
@ -189,6 +190,7 @@ func TestListSegments(t *testing.T) {
RootPieceID: storj.PieceID{1},
EncryptedKey: []byte{3},
EncryptedKeyNonce: []byte{4},
EncryptedETag: []byte{5},
EncryptedSize: 1024,
PlainSize: 512,
Pieces: metabase.Pieces{{Number: 0, StorageNode: storj.NodeID{2}}},
@ -228,6 +230,7 @@ func TestListSegments(t *testing.T) {
EncryptedKey: []byte{3},
EncryptedKeyNonce: []byte{4},
EncryptedETag: []byte{5},
EncryptedSize: 1024,
PlainSize: 512,
@ -324,6 +327,7 @@ func TestListStreamPositions(t *testing.T) {
RootPieceID: storj.PieceID{1},
EncryptedKey: []byte{3},
EncryptedKeyNonce: []byte{4},
EncryptedETag: []byte{5},
EncryptedSize: 1024,
PlainSize: 512,
Pieces: metabase.Pieces{{Number: 0, StorageNode: storj.NodeID{2}}},
@ -336,9 +340,12 @@ func TestListStreamPositions(t *testing.T) {
expectedSegment.Position.Index = uint32(i)
expectedRawSegments[i] = metabase.RawSegment(expectedSegment)
expectedSegments[i] = metabase.SegmentPositionInfo{
Position: expectedSegment.Position,
PlainSize: expectedSegment.PlainSize,
CreatedAt: &now,
Position: expectedSegment.Position,
PlainSize: expectedSegment.PlainSize,
CreatedAt: &now,
EncryptedKey: expectedSegment.EncryptedKey,
EncryptedKeyNonce: expectedSegment.EncryptedKeyNonce,
EncryptedETag: expectedSegment.EncryptedETag,
}
}
@ -443,6 +450,7 @@ func TestListStreamPositions(t *testing.T) {
RootPieceID: storj.PieceID{1},
EncryptedKey: []byte{3},
EncryptedKeyNonce: []byte{4},
EncryptedETag: []byte{5},
EncryptedSize: 1024,
PlainSize: 512,
Pieces: metabase.Pieces{{Number: 0, StorageNode: storj.NodeID{2}}},
@ -482,6 +490,7 @@ func TestListStreamPositions(t *testing.T) {
EncryptedKey: []byte{3},
EncryptedKeyNonce: []byte{4},
EncryptedETag: []byte{5},
EncryptedSize: 1024,
PlainSize: 512,
@ -502,9 +511,12 @@ func TestListStreamPositions(t *testing.T) {
pos := expectedSegment.Position
pos.Part = uint32(i)
expectedSegments[i] = metabase.SegmentPositionInfo{
Position: pos,
PlainSize: expectedSegment.PlainSize,
CreatedAt: &now,
Position: pos,
PlainSize: expectedSegment.PlainSize,
CreatedAt: &now,
EncryptedKey: expectedSegment.EncryptedKey,
EncryptedKeyNonce: expectedSegment.EncryptedKeyNonce,
EncryptedETag: expectedSegment.EncryptedETag,
}
}

View File

@ -382,6 +382,7 @@ func TestIterateLoopStreams(t *testing.T) {
PlainSize: 512,
EncryptedKey: []byte{3},
EncryptedKeyNonce: []byte{4},
EncryptedETag: []byte{5},
})
}
expectedMap[object.StreamID] = expectedSegments

View File

@ -54,6 +54,7 @@ type RawSegment struct {
EncryptedSize int32 // size of the whole segment (not a piece)
PlainSize int32
PlainOffset int64
EncryptedETag []byte
Redundancy storj.RedundancyScheme
@ -168,6 +169,7 @@ func (db *DB) testingGetAllSegments(ctx context.Context) (_ []RawSegment, err er
root_piece_id, encrypted_key_nonce, encrypted_key,
encrypted_size,
plain_offset, plain_size,
encrypted_etag,
redundancy,
inline_data, remote_alias_pieces
FROM segments
@ -194,6 +196,7 @@ func (db *DB) testingGetAllSegments(ctx context.Context) (_ []RawSegment, err er
&seg.EncryptedSize,
&seg.PlainOffset,
&seg.PlainSize,
&seg.EncryptedETag,
redundancyScheme{&seg.Redundancy},

View File

@ -652,6 +652,7 @@ func (co CreateTestObject) Run(ctx *testcontext.Context, t testing.TB, db *metab
EncryptedKey: []byte{3},
EncryptedKeyNonce: []byte{4},
EncryptedETag: []byte{5},
EncryptedSize: 1060,
PlainSize: 512,

View File

@ -271,6 +271,7 @@ func TestUpdateSegmentPieces(t *testing.T) {
CreatedAt: &now,
EncryptedKey: []byte{3},
EncryptedKeyNonce: []byte{4},
EncryptedETag: []byte{5},
EncryptedSize: 1024,
PlainOffset: 0,
PlainSize: 512,