uplink/metainfo: remove additional GetObject from download
This change removes one additional metainfo.GetObject call during download. Change-Id: I86b4e5165f299069e3e18944fe79c697c6a514d3
This commit is contained in:
parent
27462f68e9
commit
ffd570eb04
@ -40,8 +40,6 @@ func (b *Bucket) OpenObject(ctx context.Context, path storj.Path) (o *Object, er
|
|||||||
|
|
||||||
return &Object{
|
return &Object{
|
||||||
Meta: ObjectMeta{
|
Meta: ObjectMeta{
|
||||||
bucketInfo: b.bucket,
|
|
||||||
|
|
||||||
Bucket: info.Bucket.Name,
|
Bucket: info.Bucket.Name,
|
||||||
Path: info.Path,
|
Path: info.Path,
|
||||||
IsPrefix: info.IsPrefix,
|
IsPrefix: info.IsPrefix,
|
||||||
@ -64,6 +62,8 @@ func (b *Bucket) OpenObject(ctx context.Context, path storj.Path) (o *Object, er
|
|||||||
},
|
},
|
||||||
metainfoDB: b.metainfo,
|
metainfoDB: b.metainfo,
|
||||||
streams: b.streams,
|
streams: b.streams,
|
||||||
|
object: info,
|
||||||
|
bucket: b.bucket,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -193,7 +193,12 @@ func (b *Bucket) NewReader(ctx context.Context, path storj.Path) (_ io.ReadClose
|
|||||||
func (b *Bucket) Download(ctx context.Context, path storj.Path) (_ io.ReadCloser, err error) {
|
func (b *Bucket) Download(ctx context.Context, path storj.Path) (_ io.ReadCloser, err error) {
|
||||||
defer mon.Task()(&ctx)(&err)
|
defer mon.Task()(&ctx)(&err)
|
||||||
|
|
||||||
segmentStream, err := b.metainfo.GetObjectStream(ctx, b.bucket, path)
|
object, err := b.metainfo.GetObject(ctx, b.bucket, path)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
segmentStream, err := b.metainfo.GetObjectStream(ctx, b.bucket, object)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -205,7 +210,12 @@ func (b *Bucket) Download(ctx context.Context, path storj.Path) (_ io.ReadCloser
|
|||||||
func (b *Bucket) DownloadRange(ctx context.Context, path storj.Path, start, limit int64) (_ io.ReadCloser, err error) {
|
func (b *Bucket) DownloadRange(ctx context.Context, path storj.Path, start, limit int64) (_ io.ReadCloser, err error) {
|
||||||
defer mon.Task()(&ctx)(&err)
|
defer mon.Task()(&ctx)(&err)
|
||||||
|
|
||||||
segmentStream, err := b.metainfo.GetObjectStream(ctx, b.bucket, path)
|
object, err := b.metainfo.GetObject(ctx, b.bucket, path)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
segmentStream, err := b.metainfo.GetObjectStream(ctx, b.bucket, object)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -63,9 +63,6 @@ type ObjectMeta struct {
|
|||||||
// Object's data storage.
|
// Object's data storage.
|
||||||
SegmentsSize int64
|
SegmentsSize int64
|
||||||
}
|
}
|
||||||
|
|
||||||
// full storj.Bucket object for internal use
|
|
||||||
bucketInfo storj.Bucket
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// An Object is a sequence of bytes with associated metadata, stored in the
|
// An Object is a sequence of bytes with associated metadata, stored in the
|
||||||
@ -78,13 +75,15 @@ type Object struct {
|
|||||||
|
|
||||||
metainfoDB *kvmetainfo.DB
|
metainfoDB *kvmetainfo.DB
|
||||||
streams streams.Store
|
streams streams.Store
|
||||||
|
bucket storj.Bucket
|
||||||
|
object storj.Object
|
||||||
}
|
}
|
||||||
|
|
||||||
// DownloadRange returns an Object's data. A length of -1 will mean (Object.Size - offset).
|
// DownloadRange returns an Object's data. A length of -1 will mean (Object.Size - offset).
|
||||||
func (o *Object) DownloadRange(ctx context.Context, offset, length int64) (_ io.ReadCloser, err error) {
|
func (o *Object) DownloadRange(ctx context.Context, offset, length int64) (_ io.ReadCloser, err error) {
|
||||||
defer mon.Task()(&ctx)(&err)
|
defer mon.Task()(&ctx)(&err)
|
||||||
|
|
||||||
segmentStream, err := o.metainfoDB.GetObjectStream(ctx, o.Meta.bucketInfo, o.Meta.Path)
|
segmentStream, err := o.metainfoDB.GetObjectStream(ctx, o.bucket, o.object)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -26,7 +26,7 @@ type Metainfo interface {
|
|||||||
// GetObject returns information about an object
|
// GetObject returns information about an object
|
||||||
GetObject(ctx context.Context, bucket Bucket, path Path) (Object, error)
|
GetObject(ctx context.Context, bucket Bucket, path Path) (Object, error)
|
||||||
// GetObjectStream returns interface for reading the object stream
|
// GetObjectStream returns interface for reading the object stream
|
||||||
GetObjectStream(ctx context.Context, bucket Bucket, path Path) (ReadOnlyStream, error)
|
GetObjectStream(ctx context.Context, bucket Bucket, object Object) (ReadOnlyStream, error)
|
||||||
|
|
||||||
// CreateObject creates a mutable object for uploading stream info
|
// CreateObject creates a mutable object for uploading stream info
|
||||||
CreateObject(ctx context.Context, bucket Bucket, path Path, info *CreateObject) (MutableObject, error)
|
CreateObject(ctx context.Context, bucket Bucket, path Path, info *CreateObject) (MutableObject, error)
|
||||||
|
@ -48,25 +48,20 @@ func (db *DB) GetObject(ctx context.Context, bucket storj.Bucket, path storj.Pat
|
|||||||
}
|
}
|
||||||
|
|
||||||
// GetObjectStream returns interface for reading the object stream
|
// GetObjectStream returns interface for reading the object stream
|
||||||
func (db *DB) GetObjectStream(ctx context.Context, bucket storj.Bucket, path storj.Path) (stream storj.ReadOnlyStream, err error) {
|
func (db *DB) GetObjectStream(ctx context.Context, bucket storj.Bucket, object storj.Object) (stream storj.ReadOnlyStream, err error) {
|
||||||
defer mon.Task()(&ctx)(&err)
|
defer mon.Task()(&ctx)(&err)
|
||||||
|
|
||||||
meta, info, err := db.getInfo(ctx, bucket, path)
|
if bucket.Name == "" {
|
||||||
if err != nil {
|
return nil, storj.ErrNoBucket.New("")
|
||||||
return nil, err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
streamKey, err := encryption.DeriveContentKey(bucket.Name, meta.fullpath.UnencryptedPath(), db.encStore)
|
if object.Path == "" {
|
||||||
if err != nil {
|
return nil, storj.ErrNoPath.New("")
|
||||||
return nil, err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return &readonlyStream{
|
return &readonlyStream{
|
||||||
db: db,
|
db: db,
|
||||||
info: info,
|
info: object,
|
||||||
bucket: meta.bucket,
|
|
||||||
encPath: meta.encPath.Raw(),
|
|
||||||
streamKey: streamKey,
|
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -254,7 +249,7 @@ func (db *DB) getInfo(ctx context.Context, bucket storj.Bucket, path storj.Path)
|
|||||||
return object{}, storj.Object{}, err
|
return object{}, storj.Object{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
info, err = objectStreamFromMeta(bucket, objectInfo.StreamID, path, lastSegmentMeta, streamInfo, streamMeta, redundancyScheme)
|
info, err = objectStreamFromMeta(bucket, path, objectInfo.StreamID, lastSegmentMeta, streamInfo, streamMeta, redundancyScheme)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return object{}, storj.Object{}, err
|
return object{}, storj.Object{}, err
|
||||||
}
|
}
|
||||||
@ -290,7 +285,7 @@ func objectFromMeta(bucket storj.Bucket, path storj.Path, isPrefix bool, meta ob
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func objectStreamFromMeta(bucket storj.Bucket, streamID storj.StreamID, path storj.Path, lastSegment segments.Meta, stream pb.StreamInfo, streamMeta pb.StreamMeta, redundancyScheme storj.RedundancyScheme) (storj.Object, error) {
|
func objectStreamFromMeta(bucket storj.Bucket, path storj.Path, streamID storj.StreamID, lastSegment segments.Meta, stream pb.StreamInfo, streamMeta pb.StreamMeta, redundancyScheme storj.RedundancyScheme) (storj.Object, error) {
|
||||||
var nonce storj.Nonce
|
var nonce storj.Nonce
|
||||||
var encryptedKey storj.EncryptedPrivateKey
|
var encryptedKey storj.EncryptedPrivateKey
|
||||||
if streamMeta.LastSegmentMeta != nil {
|
if streamMeta.LastSegmentMeta != nil {
|
||||||
|
@ -124,32 +124,37 @@ func TestGetObjectStream(t *testing.T) {
|
|||||||
bucket, err := db.CreateBucket(ctx, TestBucket, nil)
|
bucket, err := db.CreateBucket(ctx, TestBucket, nil)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
upload(ctx, t, db, streams, bucket, "empty-file", nil)
|
emptyFile := upload(ctx, t, db, streams, bucket, "empty-file", nil)
|
||||||
upload(ctx, t, db, streams, bucket, "small-file", []byte("test"))
|
smallFile := upload(ctx, t, db, streams, bucket, "small-file", []byte("test"))
|
||||||
upload(ctx, t, db, streams, bucket, "large-file", data)
|
largeFile := upload(ctx, t, db, streams, bucket, "large-file", data)
|
||||||
|
|
||||||
emptyBucket := storj.Bucket{
|
emptyBucket := storj.Bucket{
|
||||||
PathCipher: storj.EncNull,
|
PathCipher: storj.EncNull,
|
||||||
}
|
}
|
||||||
_, err = db.GetObjectStream(ctx, emptyBucket, "")
|
_, err = db.GetObjectStream(ctx, emptyBucket, storj.Object{})
|
||||||
assert.True(t, storj.ErrNoBucket.Has(err))
|
assert.True(t, storj.ErrNoBucket.Has(err))
|
||||||
|
|
||||||
_, err = db.GetObjectStream(ctx, bucket, "")
|
_, err = db.GetObjectStream(ctx, bucket, storj.Object{})
|
||||||
assert.True(t, storj.ErrNoPath.Has(err))
|
assert.True(t, storj.ErrNoPath.Has(err))
|
||||||
|
|
||||||
nonExistingBucket := storj.Bucket{
|
nonExistingBucket := storj.Bucket{
|
||||||
Name: "non-existing-bucket",
|
Name: "non-existing-bucket",
|
||||||
PathCipher: storj.EncNull,
|
PathCipher: storj.EncNull,
|
||||||
}
|
}
|
||||||
_, err = db.GetObjectStream(ctx, nonExistingBucket, "small-file")
|
|
||||||
assert.True(t, storj.ErrObjectNotFound.Has(err))
|
|
||||||
|
|
||||||
_, err = db.GetObjectStream(ctx, bucket, "non-existing-file")
|
// no error because we are not doing satellite connection with this method
|
||||||
assert.True(t, storj.ErrObjectNotFound.Has(err))
|
_, err = db.GetObjectStream(ctx, nonExistingBucket, smallFile)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
assertStream(ctx, t, db, streams, bucket, "empty-file", []byte{})
|
// no error because we are not doing satellite connection with this method
|
||||||
assertStream(ctx, t, db, streams, bucket, "small-file", []byte("test"))
|
_, err = db.GetObjectStream(ctx, bucket, storj.Object{
|
||||||
assertStream(ctx, t, db, streams, bucket, "large-file", data)
|
Path: "non-existing-file",
|
||||||
|
})
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
assertStream(ctx, t, db, streams, bucket, emptyFile, []byte{})
|
||||||
|
assertStream(ctx, t, db, streams, bucket, smallFile, []byte("test"))
|
||||||
|
assertStream(ctx, t, db, streams, bucket, largeFile, data)
|
||||||
|
|
||||||
/* TODO: Disable stopping due to flakiness.
|
/* TODO: Disable stopping due to flakiness.
|
||||||
// Stop randomly half of the storage nodes and remove them from satellite's overlay
|
// Stop randomly half of the storage nodes and remove them from satellite's overlay
|
||||||
@ -166,7 +171,7 @@ func TestGetObjectStream(t *testing.T) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func upload(ctx context.Context, t *testing.T, db *kvmetainfo.DB, streams streams.Store, bucket storj.Bucket, path storj.Path, data []byte) {
|
func upload(ctx context.Context, t *testing.T, db *kvmetainfo.DB, streams streams.Store, bucket storj.Bucket, path storj.Path, data []byte) storj.Object {
|
||||||
obj, err := db.CreateObject(ctx, bucket, path, nil)
|
obj, err := db.CreateObject(ctx, bucket, path, nil)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
@ -183,13 +188,15 @@ func upload(ctx context.Context, t *testing.T, db *kvmetainfo.DB, streams stream
|
|||||||
|
|
||||||
err = obj.Commit(ctx)
|
err = obj.Commit(ctx)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
return obj.Info()
|
||||||
}
|
}
|
||||||
|
|
||||||
func assertStream(ctx context.Context, t *testing.T, db *kvmetainfo.DB, streams streams.Store, bucket storj.Bucket, path storj.Path, content []byte) {
|
func assertStream(ctx context.Context, t *testing.T, db *kvmetainfo.DB, streams streams.Store, bucket storj.Bucket, object storj.Object, content []byte) {
|
||||||
readOnly, err := db.GetObjectStream(ctx, bucket, path)
|
readOnly, err := db.GetObjectStream(ctx, bucket, object)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
assert.Equal(t, path, readOnly.Info().Path)
|
assert.Equal(t, object.Path, readOnly.Info().Path)
|
||||||
assert.Equal(t, TestBucket, readOnly.Info().Bucket.Name)
|
assert.Equal(t, TestBucket, readOnly.Info().Bucket.Name)
|
||||||
assert.Equal(t, storj.EncAESGCM, readOnly.Info().Bucket.PathCipher)
|
assert.Equal(t, storj.EncAESGCM, readOnly.Info().Bucket.PathCipher)
|
||||||
|
|
||||||
@ -232,7 +239,6 @@ func assertInlineSegment(t *testing.T, segment storj.Segment, content []byte) {
|
|||||||
func assertRemoteSegment(t *testing.T, segment storj.Segment) {
|
func assertRemoteSegment(t *testing.T, segment storj.Segment) {
|
||||||
assert.Nil(t, segment.Inline)
|
assert.Nil(t, segment.Inline)
|
||||||
assert.NotNil(t, segment.PieceID)
|
assert.NotNil(t, segment.PieceID)
|
||||||
assert.NotEqual(t, 0, len(segment.Pieces))
|
|
||||||
|
|
||||||
// check that piece numbers and nodes are unique
|
// check that piece numbers and nodes are unique
|
||||||
nums := make(map[byte]struct{})
|
nums := make(map[byte]struct{})
|
||||||
|
@ -8,7 +8,7 @@ import (
|
|||||||
"errors"
|
"errors"
|
||||||
|
|
||||||
"storj.io/storj/pkg/encryption"
|
"storj.io/storj/pkg/encryption"
|
||||||
"storj.io/storj/pkg/pb"
|
"storj.io/storj/pkg/paths"
|
||||||
"storj.io/storj/pkg/storj"
|
"storj.io/storj/pkg/storj"
|
||||||
"storj.io/storj/uplink/metainfo"
|
"storj.io/storj/uplink/metainfo"
|
||||||
)
|
)
|
||||||
@ -18,11 +18,7 @@ var _ storj.ReadOnlyStream = (*readonlyStream)(nil)
|
|||||||
type readonlyStream struct {
|
type readonlyStream struct {
|
||||||
db *DB
|
db *DB
|
||||||
|
|
||||||
id storj.StreamID
|
|
||||||
info storj.Object
|
info storj.Object
|
||||||
bucket string
|
|
||||||
encPath storj.Path
|
|
||||||
streamKey *storj.Key // lazySegmentReader derivedKey
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (stream *readonlyStream) Info() storj.Object { return stream.info }
|
func (stream *readonlyStream) Info() storj.Object { return stream.info }
|
||||||
@ -46,9 +42,11 @@ func (stream *readonlyStream) segment(ctx context.Context, index int64) (segment
|
|||||||
}
|
}
|
||||||
|
|
||||||
isLastSegment := segment.Index+1 == stream.info.SegmentCount
|
isLastSegment := segment.Index+1 == stream.info.SegmentCount
|
||||||
if !isLastSegment {
|
if isLastSegment {
|
||||||
info, _, err := stream.db.metainfo.DownloadSegment(ctx, metainfo.DownloadSegmentParams{
|
index = -1
|
||||||
StreamID: stream.id,
|
}
|
||||||
|
info, limits, err := stream.db.metainfo.DownloadSegment(ctx, metainfo.DownloadSegmentParams{
|
||||||
|
StreamID: stream.Info().ID,
|
||||||
Position: storj.SegmentPosition{
|
Position: storj.SegmentPosition{
|
||||||
Index: int32(index),
|
Index: int32(index),
|
||||||
},
|
},
|
||||||
@ -57,45 +55,32 @@ func (stream *readonlyStream) segment(ctx context.Context, index int64) (segment
|
|||||||
return segment, err
|
return segment, err
|
||||||
}
|
}
|
||||||
|
|
||||||
segment.Size = stream.info.FixedSegmentSize
|
segment.Size = stream.info.Size
|
||||||
segment.EncryptedKeyNonce = info.SegmentEncryption.EncryptedKeyNonce
|
segment.EncryptedKeyNonce = info.SegmentEncryption.EncryptedKeyNonce
|
||||||
segment.EncryptedKey = info.SegmentEncryption.EncryptedKey
|
segment.EncryptedKey = info.SegmentEncryption.EncryptedKey
|
||||||
} else {
|
|
||||||
segment.Size = stream.info.LastSegment.Size
|
streamKey, err := encryption.DeriveContentKey(stream.info.Bucket.Name, paths.NewUnencrypted(stream.info.Path), stream.db.encStore)
|
||||||
segment.EncryptedKeyNonce = stream.info.LastSegment.EncryptedKeyNonce
|
if err != nil {
|
||||||
segment.EncryptedKey = stream.info.LastSegment.EncryptedKey
|
return segment, err
|
||||||
}
|
}
|
||||||
|
|
||||||
contentKey, err := encryption.DecryptKey(segment.EncryptedKey, stream.Info().EncryptionParameters.CipherSuite, stream.streamKey, &segment.EncryptedKeyNonce)
|
contentKey, err := encryption.DecryptKey(segment.EncryptedKey, stream.info.EncryptionParameters.CipherSuite, streamKey, &segment.EncryptedKeyNonce)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return segment, err
|
return segment, err
|
||||||
}
|
}
|
||||||
|
|
||||||
nonce := new(storj.Nonce)
|
nonce := new(storj.Nonce)
|
||||||
_, err = encryption.Increment(nonce, index+1)
|
_, err = encryption.Increment(nonce, segment.Index+1)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return segment, err
|
return segment, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if isLastSegment {
|
if len(info.EncryptedInlineData) != 0 || len(limits) == 0 {
|
||||||
index = -1
|
inline, err := encryption.Decrypt(info.EncryptedInlineData, stream.info.EncryptionParameters.CipherSuite, contentKey, nonce)
|
||||||
}
|
|
||||||
|
|
||||||
pointer, err := stream.db.metainfo.SegmentInfoOld(ctx, stream.bucket, stream.encPath, index)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return segment, err
|
return segment, err
|
||||||
}
|
}
|
||||||
|
segment.Inline = inline
|
||||||
if pointer.GetType() == pb.Pointer_INLINE {
|
|
||||||
segment.Inline, err = encryption.Decrypt(pointer.InlineSegment, stream.info.EncryptionParameters.CipherSuite, contentKey, nonce)
|
|
||||||
} else {
|
|
||||||
segment.PieceID = pointer.Remote.RootPieceId
|
|
||||||
segment.Pieces = make([]storj.Piece, 0, len(pointer.Remote.RemotePieces))
|
|
||||||
for _, piece := range pointer.Remote.RemotePieces {
|
|
||||||
var nodeID storj.NodeID
|
|
||||||
copy(nodeID[:], piece.NodeId.Bytes())
|
|
||||||
segment.Pieces = append(segment.Pieces, storj.Piece{Number: byte(piece.PieceNum), Location: nodeID})
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return segment, nil
|
return segment, nil
|
||||||
|
Loading…
Reference in New Issue
Block a user