satellite/metainfo/metabase: fix FixedSegmentSize calculation

Commiting multipart object with same sized segments should always
end with FixedSegmentSize = -1.

Change-Id: Iedeafb763b62b2513de0a753dce20ddad5b32d90
This commit is contained in:
Egon Elbre 2021-01-11 14:34:36 +02:00
parent ec88d21a3c
commit 0df0e980a0
3 changed files with 131 additions and 5 deletions

View File

@ -458,14 +458,14 @@ func (db *DB) CommitObject(ctx context.Context, opts CommitObject) (object Objec
// TODO: would we even need this when we make main index plain_offset?
fixedSegmentSize := int32(0)
if len(segments) > 0 && segments[0].Position.Part == 0 {
if len(segments) > 0 {
fixedSegmentSize = segments[0].EncryptedSize
for _, seg := range segments[:len(segments)-1] {
for i, seg := range segments {
if seg.Position.Part != 0 {
fixedSegmentSize = -1
break
}
if seg.EncryptedSize != fixedSegmentSize {
if i < len(segments)-1 && seg.EncryptedSize != fixedSegmentSize {
fixedSegmentSize = -1
break
}

View File

@ -69,8 +69,12 @@ func (db *DB) CommitObjectWithSegments(ctx context.Context, opts CommitObjectWit
fixedSegmentSize := int32(0)
if len(finalSegments) > 0 {
fixedSegmentSize = finalSegments[0].EncryptedSize
for _, seg := range finalSegments[:len(finalSegments)-1] {
if seg.EncryptedSize != fixedSegmentSize {
for i, seg := range finalSegments {
if seg.Position.Part != 0 {
fixedSegmentSize = -1
break
}
if i < len(finalSegments)-1 && seg.EncryptedSize != fixedSegmentSize {
fixedSegmentSize = -1
break
}

View File

@ -534,6 +534,128 @@ func TestCommitObjectWithSegments(t *testing.T) {
}.Check(ctx, t, db)
})
t.Run("fixed segment size", func(t *testing.T) {
defer DeleteAll{}.Check(ctx, t, db)
BeginObjectExactVersion{
Opts: metabase.BeginObjectExactVersion{
ObjectStream: obj,
Encryption: defaultTestEncryption,
},
Version: 1,
}.Check(ctx, t, db)
now := time.Now()
pos00 := metabase.SegmentPosition{Part: 0, Index: 0}
rootPieceID00 := testrand.PieceID()
pieces00 := metabase.Pieces{{Number: 0, StorageNode: testrand.NodeID()}}
encryptedKey00 := testrand.Bytes(32)
encryptedKeyNonce00 := testrand.Bytes(32)
pos10 := metabase.SegmentPosition{Part: 1, Index: 0}
rootPieceID10 := testrand.PieceID()
pieces10 := metabase.Pieces{{Number: 0, StorageNode: testrand.NodeID()}}
encryptedKey10 := testrand.Bytes(32)
encryptedKeyNonce10 := testrand.Bytes(32)
CommitSegment{
Opts: metabase.CommitSegment{
ObjectStream: obj,
Position: pos00,
RootPieceID: rootPieceID00,
Pieces: pieces00,
EncryptedKey: encryptedKey00,
EncryptedKeyNonce: encryptedKeyNonce00,
EncryptedSize: 1024,
PlainSize: 512,
PlainOffset: 0,
Redundancy: defaultTestRedundancy,
},
}.Check(ctx, t, db)
CommitSegment{
Opts: metabase.CommitSegment{
ObjectStream: obj,
Position: pos10,
RootPieceID: rootPieceID10,
Pieces: pieces10,
EncryptedKey: encryptedKey10,
EncryptedKeyNonce: encryptedKeyNonce10,
EncryptedSize: 1024,
PlainSize: 512,
PlainOffset: 0,
Redundancy: defaultTestRedundancy,
},
}.Check(ctx, t, db)
CommitObjectWithSegments{
Opts: metabase.CommitObjectWithSegments{
ObjectStream: obj,
Segments: []metabase.SegmentPosition{
pos00,
pos10,
},
},
}.Check(ctx, t, db)
Verify{
Objects: []metabase.RawObject{
{
ObjectStream: obj,
CreatedAt: now,
Status: metabase.Committed,
SegmentCount: 2,
TotalPlainSize: 1024,
TotalEncryptedSize: 2048,
FixedSegmentSize: -1,
Encryption: defaultTestEncryption,
},
},
Segments: []metabase.RawSegment{
{
StreamID: obj.StreamID,
Position: pos00,
RootPieceID: rootPieceID00,
EncryptedKey: encryptedKey00,
EncryptedKeyNonce: encryptedKeyNonce00,
EncryptedSize: 1024,
PlainOffset: 0,
PlainSize: 512,
Redundancy: defaultTestRedundancy,
Pieces: pieces00,
},
{
StreamID: obj.StreamID,
Position: pos10,
RootPieceID: rootPieceID10,
EncryptedKey: encryptedKey10,
EncryptedKeyNonce: encryptedKeyNonce10,
EncryptedSize: 1024,
PlainOffset: 512,
PlainSize: 512,
Redundancy: defaultTestRedundancy,
Pieces: pieces10,
},
},
}.Check(ctx, t, db)
})
t.Run("no segments", func(t *testing.T) {
defer DeleteAll{}.Check(ctx, t, db)