satellite/metabase: GetLatestObjectLastSegment for copied segments

Getting a copied segment by GetLatestObjectLastSegment needs to retrieve inline_data or remote_alias_pieces and other information from the original segment.

Resolves https://github.com/storj/storj/issues/4478

Change-Id: I8c7822c343b1ec3e04683f31a20f71e3097b4b4a
This commit is contained in:
Qweder93 2022-03-02 11:23:47 +02:00 committed by Fadila
parent 15a1428828
commit 9eaeebe115
2 changed files with 548 additions and 50 deletions

View File

@ -178,40 +178,16 @@ func (db *DB) GetSegmentByPosition(ctx context.Context, opts GetSegmentByPositio
}
}
if db.config.ServerSideCopy {
if segment.PiecesInAncestorSegment() {
// TODO check performance
err = db.db.QueryRowContext(ctx, `
SELECT
root_piece_id,
repaired_at,
remote_alias_pieces
FROM segments
WHERE
stream_id IN (SELECT ancestor_stream_id FROM segment_copies WHERE stream_id = $1)
AND position = $2
`, opts.StreamID, opts.Position.Encode()).Scan(
&segment.RootPieceID,
&segment.RepairedAt,
&aliasPieces,
)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return Segment{}, ErrSegmentNotFound.New("segment missing")
}
return Segment{}, Error.New("unable to query segment: %w", err)
}
segment.Pieces, err = db.aliasCache.ConvertAliasesToPieces(ctx, aliasPieces)
if err != nil {
return Segment{}, Error.New("unable to convert aliases to pieces: %w", err)
}
}
}
segment.StreamID = opts.StreamID
segment.Position = opts.Position
if db.config.ServerSideCopy {
err = db.updateWithAncestorSegment(ctx, &segment)
if err != nil {
return Segment{}, err
}
}
return segment, nil
}
@ -269,14 +245,59 @@ func (db *DB) GetLatestObjectLastSegment(ctx context.Context, opts GetLatestObje
return Segment{}, Error.New("unable to query segment: %w", err)
}
if len(aliasPieces) > 0 {
segment.Pieces, err = db.aliasCache.ConvertAliasesToPieces(ctx, aliasPieces)
if err != nil {
return Segment{}, Error.New("unable to convert aliases to pieces: %w", err)
}
}
if db.config.ServerSideCopy {
err = db.updateWithAncestorSegment(ctx, &segment)
if err != nil {
return Segment{}, err
}
}
return segment, nil
}
func (db *DB) updateWithAncestorSegment(ctx context.Context, segment *Segment) (err error) {
if !segment.PiecesInAncestorSegment() {
return nil
}
var aliasPieces AliasPieces
err = db.db.QueryRowContext(ctx, `
SELECT
root_piece_id,
repaired_at,
remote_alias_pieces
FROM segments
WHERE
stream_id IN (SELECT ancestor_stream_id FROM segment_copies WHERE stream_id = $1)
AND position = $2
`, segment.StreamID, segment.Position.Encode()).Scan(
&segment.RootPieceID,
&segment.RepairedAt,
&aliasPieces,
)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return ErrSegmentNotFound.New("segment missing")
}
return Error.New("unable to query segment: %w", err)
}
segment.Pieces, err = db.aliasCache.ConvertAliasesToPieces(ctx, aliasPieces)
if err != nil {
return Error.New("unable to convert aliases to pieces: %w", err)
}
return nil
}
// GetSegmentByOffset contains arguments necessary for fetching a segment information.
type GetSegmentByOffset struct {
ObjectLocation

View File

@ -289,11 +289,6 @@ func TestGetSegmentByPosition(t *testing.T) {
objStream := metabasetest.RandObjectStream()
copyObjStream := metabasetest.RandObjectStream()
// make sure segments are ordered as expected when checking database
if copyObjStream.StreamID.Less(objStream.StreamID) {
objStream.StreamID, copyObjStream.StreamID = copyObjStream.StreamID, objStream.StreamID
}
copyObjStream.ProjectID = objStream.ProjectID
obj := metabasetest.CreateObject(ctx, t, db, objStream, 1)
@ -453,11 +448,6 @@ func TestGetSegmentByPosition(t *testing.T) {
objStream := metabasetest.RandObjectStream()
copyObjStream := metabasetest.RandObjectStream()
// make sure segments are ordered as expected when checking database
if copyObjStream.StreamID.Less(objStream.StreamID) {
objStream.StreamID, copyObjStream.StreamID = copyObjStream.StreamID, objStream.StreamID
}
copyObjStream.ProjectID = objStream.ProjectID
metabasetest.BeginObjectExactVersion{
@ -624,13 +614,7 @@ func TestGetSegmentByPosition(t *testing.T) {
objStream := metabasetest.RandObjectStream()
copyObjStream := metabasetest.RandObjectStream()
data := testrand.Bytes(1024)
// make sure segments are ordered as expected when checking database
if copyObjStream.StreamID.Less(objStream.StreamID) {
objStream.StreamID, copyObjStream.StreamID = copyObjStream.StreamID, objStream.StreamID
}
copyObjStream.ProjectID = objStream.ProjectID
metabasetest.BeginObjectExactVersion{
@ -897,6 +881,499 @@ func TestGetLatestObjectLastSegment(t *testing.T) {
},
}.Check(ctx, t, db)
})
t.Run("Get segment copy", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
objStream := metabasetest.RandObjectStream()
copyObjStream := metabasetest.RandObjectStream()
copyObjStream.ProjectID = objStream.ProjectID
objLocation := objStream.Location()
copyLocation := copyObjStream.Location()
obj := metabasetest.CreateObject(ctx, t, db, objStream, 1)
encryptedKeyNonces := []metabase.EncryptedKeyAndNonce{{
EncryptedKeyNonce: []byte{4},
EncryptedKey: []byte{3},
Position: metabase.SegmentPosition{
Index: 0,
},
}}
newEncryptedKeyNonces := []metabase.EncryptedKeyAndNonce{{
EncryptedKeyNonce: []byte{3},
EncryptedKey: []byte{4},
Position: metabase.SegmentPosition{
Index: 0,
},
}}
metabasetest.BeginCopyObject{
Opts: metabase.BeginCopyObject{
Version: obj.Version,
ObjectLocation: obj.Location(),
},
Result: metabase.BeginCopyObjectResult{
StreamID: obj.StreamID,
EncryptedMetadata: obj.EncryptedMetadata,
EncryptedMetadataKey: obj.EncryptedMetadataEncryptedKey,
EncryptedMetadataKeyNonce: obj.EncryptedMetadataNonce,
EncryptedKeysNonces: encryptedKeyNonces,
EncryptionParameters: obj.Encryption,
},
}.Check(ctx, t, db)
newEncryptedMetadataKeyNonce := testrand.Nonce()
newEncryptedMetadataKey := testrand.Bytes(32)
_, err := db.FinishCopyObject(ctx, metabase.FinishCopyObject{
NewStreamID: copyObjStream.StreamID,
NewBucket: copyObjStream.BucketName,
ObjectStream: obj.ObjectStream,
NewSegmentKeys: newEncryptedKeyNonces,
NewEncryptedObjectKey: []byte(copyObjStream.ObjectKey),
NewEncryptedMetadataKeyNonce: newEncryptedMetadataKeyNonce.Bytes(),
NewEncryptedMetadataKey: newEncryptedMetadataKey,
})
require.NoError(t, err)
expectedSegment := metabase.Segment{
StreamID: obj.StreamID,
Position: metabase.SegmentPosition{
Index: 0,
},
CreatedAt: obj.CreatedAt,
ExpiresAt: obj.ExpiresAt,
RootPieceID: storj.PieceID{1},
EncryptedKey: []byte{3},
EncryptedKeyNonce: []byte{4},
EncryptedETag: []byte{5},
EncryptedSize: 1024,
PlainSize: 512,
Pieces: metabase.Pieces{{Number: 0, StorageNode: storj.NodeID{2}}},
Redundancy: metabasetest.DefaultRedundancy,
}
expectedCopiedSegmentRaw := metabase.Segment{
StreamID: copyObjStream.StreamID,
Position: metabase.SegmentPosition{
Index: 0,
},
CreatedAt: obj.CreatedAt,
ExpiresAt: obj.ExpiresAt,
RootPieceID: storj.PieceID{1},
Pieces: metabase.Pieces{},
EncryptedKey: newEncryptedKeyNonces[0].EncryptedKey,
EncryptedKeyNonce: newEncryptedKeyNonces[0].EncryptedKeyNonce,
EncryptedSize: 1024,
PlainSize: 512,
Redundancy: metabasetest.DefaultRedundancy,
}
expectedCopiedSegmentGet := expectedSegment
expectedCopiedSegmentGet.EncryptedETag = nil
expectedCopiedSegmentGet.StreamID = copyObjStream.StreamID
expectedCopiedSegmentGet.EncryptedKey = newEncryptedKeyNonces[0].EncryptedKey
expectedCopiedSegmentGet.EncryptedKeyNonce = newEncryptedKeyNonces[0].EncryptedKeyNonce
metabasetest.GetLatestObjectLastSegment{
Opts: metabase.GetLatestObjectLastSegment{
ObjectLocation: objLocation,
},
Result: expectedSegment,
}.Check(ctx, t, db)
metabasetest.GetLatestObjectLastSegment{
Opts: metabase.GetLatestObjectLastSegment{
ObjectLocation: copyLocation,
},
Result: expectedCopiedSegmentGet,
}.Check(ctx, t, db)
metabasetest.Verify{
Objects: []metabase.RawObject{
{
ObjectStream: obj.ObjectStream,
CreatedAt: now,
Status: metabase.Committed,
SegmentCount: 1,
TotalPlainSize: 512,
TotalEncryptedSize: 1024,
FixedSegmentSize: 512,
Encryption: metabasetest.DefaultEncryption,
},
{
ObjectStream: copyObjStream,
CreatedAt: now,
ExpiresAt: obj.ExpiresAt,
Status: metabase.Committed,
SegmentCount: 1,
TotalPlainSize: 512,
TotalEncryptedSize: 1024,
FixedSegmentSize: 512,
EncryptedMetadataNonce: newEncryptedMetadataKeyNonce[:],
EncryptedMetadataEncryptedKey: newEncryptedMetadataKey,
Encryption: metabasetest.DefaultEncryption,
},
},
Segments: []metabase.RawSegment{
metabase.RawSegment(expectedSegment),
metabase.RawSegment(expectedCopiedSegmentRaw),
},
Copies: []metabase.RawCopy{
{
StreamID: copyObjStream.StreamID,
AncestorStreamID: objStream.StreamID,
}},
}.Check(ctx, t, db)
})
t.Run("Get empty inline segment copy", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
objStream := metabasetest.RandObjectStream()
copyObjStream := metabasetest.RandObjectStream()
copyObjStream.ProjectID = objStream.ProjectID
objLocation := objStream.Location()
copyLocation := copyObjStream.Location()
metabasetest.BeginObjectExactVersion{
Opts: metabase.BeginObjectExactVersion{
ObjectStream: objStream,
Encryption: metabasetest.DefaultEncryption,
},
Version: obj.Version,
}.Check(ctx, t, db)
metabasetest.CommitInlineSegment{
Opts: metabase.CommitInlineSegment{
ObjectStream: objStream,
Position: metabase.SegmentPosition{Part: 0, Index: uint32(0)},
EncryptedKey: []byte{3},
EncryptedKeyNonce: []byte{4},
PlainSize: 0,
PlainOffset: 0,
},
}.Check(ctx, t, db)
obj := metabasetest.CommitObject{
Opts: metabase.CommitObject{
ObjectStream: objStream,
},
}.Check(ctx, t, db)
encryptedKeyNonces := []metabase.EncryptedKeyAndNonce{{
EncryptedKeyNonce: []byte{4},
EncryptedKey: []byte{3},
Position: metabase.SegmentPosition{
Index: 0,
},
}}
newEncryptedKeyNonces := []metabase.EncryptedKeyAndNonce{{
EncryptedKeyNonce: []byte{3},
EncryptedKey: []byte{4},
Position: metabase.SegmentPosition{
Index: 0,
},
}}
metabasetest.BeginCopyObject{
Opts: metabase.BeginCopyObject{
Version: obj.Version,
ObjectLocation: obj.Location(),
},
Result: metabase.BeginCopyObjectResult{
StreamID: obj.StreamID,
EncryptedMetadata: obj.EncryptedMetadata,
EncryptedMetadataKey: obj.EncryptedMetadataEncryptedKey,
EncryptedMetadataKeyNonce: obj.EncryptedMetadataNonce,
EncryptedKeysNonces: encryptedKeyNonces,
EncryptionParameters: obj.Encryption,
},
}.Check(ctx, t, db)
newEncryptedMetadataKeyNonce := testrand.Nonce()
newEncryptedMetadataKey := testrand.Bytes(32)
_, err := db.FinishCopyObject(ctx, metabase.FinishCopyObject{
ObjectStream: obj.ObjectStream,
NewStreamID: copyObjStream.StreamID,
NewBucket: copyObjStream.BucketName,
NewSegmentKeys: newEncryptedKeyNonces,
NewEncryptedObjectKey: []byte(copyObjStream.ObjectKey),
NewEncryptedMetadataKeyNonce: newEncryptedMetadataKeyNonce.Bytes(),
NewEncryptedMetadataKey: newEncryptedMetadataKey,
})
require.NoError(t, err)
expectedSegment := metabase.Segment{
StreamID: obj.StreamID,
Position: metabase.SegmentPosition{
Index: 0,
},
CreatedAt: obj.CreatedAt,
ExpiresAt: obj.ExpiresAt,
EncryptedKey: []byte{3},
EncryptedKeyNonce: []byte{4},
EncryptedSize: 0,
PlainSize: 0,
}
expectedCopiedSegmentRaw := metabase.Segment{
StreamID: copyObjStream.StreamID,
Position: metabase.SegmentPosition{
Index: 0,
},
CreatedAt: obj.CreatedAt,
ExpiresAt: obj.ExpiresAt,
Pieces: metabase.Pieces{},
EncryptedKey: newEncryptedKeyNonces[0].EncryptedKey,
EncryptedKeyNonce: newEncryptedKeyNonces[0].EncryptedKeyNonce,
}
expectedCopiedSegmentGet := expectedSegment
expectedCopiedSegmentGet.StreamID = copyObjStream.StreamID
expectedCopiedSegmentGet.EncryptedKey = newEncryptedKeyNonces[0].EncryptedKey
expectedCopiedSegmentGet.EncryptedKeyNonce = newEncryptedKeyNonces[0].EncryptedKeyNonce
metabasetest.GetLatestObjectLastSegment{
Opts: metabase.GetLatestObjectLastSegment{
ObjectLocation: objLocation,
},
Result: expectedSegment,
}.Check(ctx, t, db)
metabasetest.GetLatestObjectLastSegment{
Opts: metabase.GetLatestObjectLastSegment{
ObjectLocation: copyLocation,
},
Result: expectedCopiedSegmentGet,
}.Check(ctx, t, db)
metabasetest.Verify{
Objects: []metabase.RawObject{
{
ObjectStream: obj.ObjectStream,
CreatedAt: now,
Status: metabase.Committed,
SegmentCount: 1,
Encryption: metabasetest.DefaultEncryption,
},
{
ObjectStream: copyObjStream,
CreatedAt: now,
ExpiresAt: obj.ExpiresAt,
Status: metabase.Committed,
SegmentCount: 1,
EncryptedMetadataNonce: newEncryptedMetadataKeyNonce[:],
EncryptedMetadataEncryptedKey: newEncryptedMetadataKey,
Encryption: metabasetest.DefaultEncryption,
},
},
Segments: []metabase.RawSegment{
metabase.RawSegment(expectedSegment),
metabase.RawSegment(expectedCopiedSegmentRaw),
},
Copies: []metabase.RawCopy{
{
StreamID: copyObjStream.StreamID,
AncestorStreamID: objStream.StreamID,
}},
}.Check(ctx, t, db)
})
t.Run("Get inline segment copy", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
objStream := metabasetest.RandObjectStream()
copyObjStream := metabasetest.RandObjectStream()
data := testrand.Bytes(1024)
copyObjStream.ProjectID = objStream.ProjectID
objLocation := objStream.Location()
copyLocation := copyObjStream.Location()
metabasetest.BeginObjectExactVersion{
Opts: metabase.BeginObjectExactVersion{
ObjectStream: objStream,
Encryption: metabasetest.DefaultEncryption,
},
Version: obj.Version,
}.Check(ctx, t, db)
metabasetest.CommitInlineSegment{
Opts: metabase.CommitInlineSegment{
ObjectStream: objStream,
Position: metabase.SegmentPosition{Part: 0, Index: uint32(0)},
EncryptedKey: []byte{3},
EncryptedKeyNonce: []byte{4},
PlainSize: 0,
PlainOffset: 0,
InlineData: data,
},
}.Check(ctx, t, db)
obj := metabasetest.CommitObject{
Opts: metabase.CommitObject{
ObjectStream: objStream,
},
}.Check(ctx, t, db)
encryptedKeyNonces := []metabase.EncryptedKeyAndNonce{{
EncryptedKeyNonce: []byte{4},
EncryptedKey: []byte{3},
Position: metabase.SegmentPosition{
Index: 0,
},
}}
newEncryptedKeyNonces := []metabase.EncryptedKeyAndNonce{{
EncryptedKeyNonce: []byte{3},
EncryptedKey: []byte{4},
Position: metabase.SegmentPosition{
Index: 0,
},
}}
metabasetest.BeginCopyObject{
Opts: metabase.BeginCopyObject{
Version: obj.Version,
ObjectLocation: obj.Location(),
},
Result: metabase.BeginCopyObjectResult{
StreamID: obj.StreamID,
EncryptedMetadata: obj.EncryptedMetadata,
EncryptedMetadataKey: obj.EncryptedMetadataEncryptedKey,
EncryptedMetadataKeyNonce: obj.EncryptedMetadataNonce,
EncryptedKeysNonces: encryptedKeyNonces,
EncryptionParameters: obj.Encryption,
},
}.Check(ctx, t, db)
newEncryptedMetadataKeyNonce := testrand.Nonce()
newEncryptedMetadataKey := testrand.Bytes(32)
_, err := db.FinishCopyObject(ctx, metabase.FinishCopyObject{
ObjectStream: obj.ObjectStream,
NewStreamID: copyObjStream.StreamID,
NewBucket: copyObjStream.BucketName,
NewSegmentKeys: newEncryptedKeyNonces,
NewEncryptedObjectKey: []byte(copyObjStream.ObjectKey),
NewEncryptedMetadataKeyNonce: newEncryptedMetadataKeyNonce.Bytes(),
NewEncryptedMetadataKey: newEncryptedMetadataKey,
})
require.NoError(t, err)
expectedSegment := metabase.Segment{
StreamID: obj.StreamID,
Position: metabase.SegmentPosition{
Index: 0,
},
CreatedAt: obj.CreatedAt,
ExpiresAt: obj.ExpiresAt,
EncryptedKey: []byte{3},
EncryptedKeyNonce: []byte{4},
EncryptedSize: 1024,
PlainSize: 0,
InlineData: data,
}
expectedCopiedSegmentRaw := metabase.Segment{
StreamID: copyObjStream.StreamID,
Position: metabase.SegmentPosition{
Index: 0,
},
CreatedAt: obj.CreatedAt,
ExpiresAt: obj.ExpiresAt,
Pieces: metabase.Pieces{},
EncryptedKey: newEncryptedKeyNonces[0].EncryptedKey,
EncryptedKeyNonce: newEncryptedKeyNonces[0].EncryptedKeyNonce,
EncryptedSize: 1024,
InlineData: data,
}
expectedCopiedSegmentGet := expectedSegment
expectedCopiedSegmentGet.StreamID = copyObjStream.StreamID
expectedCopiedSegmentGet.EncryptedKey = newEncryptedKeyNonces[0].EncryptedKey
expectedCopiedSegmentGet.EncryptedKeyNonce = newEncryptedKeyNonces[0].EncryptedKeyNonce
metabasetest.GetLatestObjectLastSegment{
Opts: metabase.GetLatestObjectLastSegment{
ObjectLocation: objLocation,
},
Result: expectedSegment,
}.Check(ctx, t, db)
metabasetest.GetLatestObjectLastSegment{
Opts: metabase.GetLatestObjectLastSegment{
ObjectLocation: copyLocation,
},
Result: expectedCopiedSegmentGet,
}.Check(ctx, t, db)
metabasetest.Verify{
Objects: []metabase.RawObject{
{
ObjectStream: obj.ObjectStream,
CreatedAt: now,
Status: metabase.Committed,
SegmentCount: 1,
Encryption: metabasetest.DefaultEncryption,
TotalEncryptedSize: 1024,
},
{
ObjectStream: copyObjStream,
CreatedAt: now,
ExpiresAt: obj.ExpiresAt,
Status: metabase.Committed,
SegmentCount: 1,
EncryptedMetadataNonce: newEncryptedMetadataKeyNonce[:],
EncryptedMetadataEncryptedKey: newEncryptedMetadataKey,
Encryption: metabasetest.DefaultEncryption,
TotalEncryptedSize: 1024,
},
},
Segments: []metabase.RawSegment{
metabase.RawSegment(expectedSegment),
metabase.RawSegment(expectedCopiedSegmentRaw),
},
Copies: []metabase.RawCopy{
{
StreamID: copyObjStream.StreamID,
AncestorStreamID: objStream.StreamID,
}},
}.Check(ctx, t, db)
})
})
}