Metainfo handle inline segments in ReadOnlyStream (#674)

This commit is contained in:
Kaloyan Raev 2018-11-19 13:08:28 +02:00 committed by GitHub
parent 2e0fa0c7e3
commit bb1cf151e3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 32 additions and 8 deletions

View File

@ -93,7 +93,7 @@ func TestGetObjectStream(t *testing.T) {
stream, err := db.GetObjectStream(ctx, bucket.Name, "empty-file")
if assert.NoError(t, err) {
assertStream(ctx, t, stream, "empty-file", []byte(nil))
assertStream(ctx, t, stream, "empty-file", []byte{})
}
stream, err = db.GetObjectStream(ctx, bucket.Name, "test-file")
@ -119,8 +119,7 @@ func assertStream(ctx context.Context, t *testing.T, stream storj.ReadOnlyStream
assert.EqualValues(t, 0, segments[0].Index)
assert.EqualValues(t, len(content), segments[0].Size)
// TODO: Currently Inline is always empty
// assert.Equal(t, content, segments[0].Inline)
assert.Equal(t, content, segments[0].Inline)
return true
}

View File

@ -10,6 +10,7 @@ import (
"github.com/gogo/protobuf/proto"
"storj.io/storj/pkg/encryption"
"storj.io/storj/pkg/node"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/storj"
)
@ -37,16 +38,17 @@ func (stream *readonlyStream) SegmentsAt(ctx context.Context, byteOffset int64,
return stream.Segments(ctx, index, limit)
}
func (stream *readonlyStream) segment(ctx context.Context, index int64) (info storj.Segment, err error) {
func (stream *readonlyStream) segment(ctx context.Context, index int64) (segment storj.Segment, err error) {
defer mon.Task()(&ctx)(&err)
segment := storj.Segment{
segment = storj.Segment{
Index: index,
}
var segmentPath storj.Path
isLastSegment := segment.Index+1 == stream.info.SegmentCount
if !isLastSegment {
segmentPath := getSegmentPath(stream.encryptedPath, index+1)
segmentPath = getSegmentPath(stream.encryptedPath, index)
_, meta, err := stream.db.segments.Get(ctx, segmentPath)
if err != nil {
return segment, err
@ -62,17 +64,40 @@ func (stream *readonlyStream) segment(ctx context.Context, index int64) (info st
copy(segment.EncryptedKeyNonce[:], segmentMeta.KeyNonce)
segment.EncryptedKey = segmentMeta.EncryptedKey
} else {
segmentPath = storj.JoinPaths("l", stream.encryptedPath)
segment.Size = stream.info.LastSegment.Size
segment.EncryptedKeyNonce = stream.info.LastSegment.EncryptedKeyNonce
segment.EncryptedKey = stream.info.LastSegment.EncryptedKey
}
var nonce storj.Nonce
_, err = encryption.Increment(&nonce, index+1)
contentKey, err := encryption.DecryptKey(segment.EncryptedKey, stream.Info().EncryptionScheme.Cipher, stream.streamKey, &segment.EncryptedKeyNonce)
if err != nil {
return segment, err
}
nonce := new(storj.Nonce)
_, err = encryption.Increment(nonce, index+1)
if err != nil {
return segment, err
}
pointer, _, err := stream.db.pointers.Get(ctx, segmentPath)
if err != nil {
return segment, err
}
if pointer.GetType() == pb.Pointer_INLINE {
segment.Inline, err = encryption.Decrypt(pointer.InlineSegment, stream.info.EncryptionScheme.Cipher, contentKey, nonce)
} else {
segment.PieceID = storj.PieceID(pointer.Remote.PieceId)
segment.Pieces = make([]storj.Piece, 0, len(pointer.Remote.RemotePieces))
for _, piece := range pointer.Remote.RemotePieces {
var nodeID storj.NodeID
copy(nodeID[:], node.IDFromString(piece.NodeId).Bytes())
segment.Pieces = append(segment.Pieces, storj.Piece{Number: byte(piece.PieceNum), Location: nodeID})
}
}
return segment, nil
}