diff --git a/lib/uplink/bucket.go b/lib/uplink/bucket.go index 957a6b914..381cbfbb0 100644 --- a/lib/uplink/bucket.go +++ b/lib/uplink/bucket.go @@ -40,8 +40,6 @@ func (b *Bucket) OpenObject(ctx context.Context, path storj.Path) (o *Object, er return &Object{ Meta: ObjectMeta{ - bucketInfo: b.bucket, - Bucket: info.Bucket.Name, Path: info.Path, IsPrefix: info.IsPrefix, @@ -64,6 +62,8 @@ func (b *Bucket) OpenObject(ctx context.Context, path storj.Path) (o *Object, er }, metainfoDB: b.metainfo, streams: b.streams, + object: info, + bucket: b.bucket, }, nil } @@ -193,7 +193,12 @@ func (b *Bucket) NewReader(ctx context.Context, path storj.Path) (_ io.ReadClose func (b *Bucket) Download(ctx context.Context, path storj.Path) (_ io.ReadCloser, err error) { defer mon.Task()(&ctx)(&err) - segmentStream, err := b.metainfo.GetObjectStream(ctx, b.bucket, path) + object, err := b.metainfo.GetObject(ctx, b.bucket, path) + if err != nil { + return nil, err + } + + segmentStream, err := b.metainfo.GetObjectStream(ctx, b.bucket, object) if err != nil { return nil, err } @@ -205,7 +210,12 @@ func (b *Bucket) Download(ctx context.Context, path storj.Path) (_ io.ReadCloser func (b *Bucket) DownloadRange(ctx context.Context, path storj.Path, start, limit int64) (_ io.ReadCloser, err error) { defer mon.Task()(&ctx)(&err) - segmentStream, err := b.metainfo.GetObjectStream(ctx, b.bucket, path) + object, err := b.metainfo.GetObject(ctx, b.bucket, path) + if err != nil { + return nil, err + } + + segmentStream, err := b.metainfo.GetObjectStream(ctx, b.bucket, object) if err != nil { return nil, err } diff --git a/lib/uplink/object.go b/lib/uplink/object.go index 7e7b6e7e9..8d4c7a336 100644 --- a/lib/uplink/object.go +++ b/lib/uplink/object.go @@ -63,9 +63,6 @@ type ObjectMeta struct { // Object's data storage. SegmentsSize int64 } - - // full storj.Bucket object for internal use - bucketInfo storj.Bucket } // An Object is a sequence of bytes with associated metadata, stored in the @@ -78,13 +75,15 @@ type Object struct { metainfoDB *kvmetainfo.DB streams streams.Store + bucket storj.Bucket + object storj.Object } // DownloadRange returns an Object's data. A length of -1 will mean (Object.Size - offset). func (o *Object) DownloadRange(ctx context.Context, offset, length int64) (_ io.ReadCloser, err error) { defer mon.Task()(&ctx)(&err) - segmentStream, err := o.metainfoDB.GetObjectStream(ctx, o.Meta.bucketInfo, o.Meta.Path) + segmentStream, err := o.metainfoDB.GetObjectStream(ctx, o.bucket, o.object) if err != nil { return nil, err } diff --git a/pkg/storj/metainfo.go b/pkg/storj/metainfo.go index 6bf800455..56e9d39d3 100644 --- a/pkg/storj/metainfo.go +++ b/pkg/storj/metainfo.go @@ -26,7 +26,7 @@ type Metainfo interface { // GetObject returns information about an object GetObject(ctx context.Context, bucket Bucket, path Path) (Object, error) // GetObjectStream returns interface for reading the object stream - GetObjectStream(ctx context.Context, bucket Bucket, path Path) (ReadOnlyStream, error) + GetObjectStream(ctx context.Context, bucket Bucket, object Object) (ReadOnlyStream, error) // CreateObject creates a mutable object for uploading stream info CreateObject(ctx context.Context, bucket Bucket, path Path, info *CreateObject) (MutableObject, error) diff --git a/uplink/metainfo/kvmetainfo/objects.go b/uplink/metainfo/kvmetainfo/objects.go index e648a7490..fefe63874 100644 --- a/uplink/metainfo/kvmetainfo/objects.go +++ b/uplink/metainfo/kvmetainfo/objects.go @@ -48,25 +48,20 @@ func (db *DB) GetObject(ctx context.Context, bucket storj.Bucket, path storj.Pat } // GetObjectStream returns interface for reading the object stream -func (db *DB) GetObjectStream(ctx context.Context, bucket storj.Bucket, path storj.Path) (stream storj.ReadOnlyStream, err error) { +func (db *DB) GetObjectStream(ctx context.Context, bucket storj.Bucket, object storj.Object) (stream storj.ReadOnlyStream, err error) { defer mon.Task()(&ctx)(&err) - meta, info, err := db.getInfo(ctx, bucket, path) - if err != nil { - return nil, err + if bucket.Name == "" { + return nil, storj.ErrNoBucket.New("") } - streamKey, err := encryption.DeriveContentKey(bucket.Name, meta.fullpath.UnencryptedPath(), db.encStore) - if err != nil { - return nil, err + if object.Path == "" { + return nil, storj.ErrNoPath.New("") } return &readonlyStream{ - db: db, - info: info, - bucket: meta.bucket, - encPath: meta.encPath.Raw(), - streamKey: streamKey, + db: db, + info: object, }, nil } @@ -254,7 +249,7 @@ func (db *DB) getInfo(ctx context.Context, bucket storj.Bucket, path storj.Path) return object{}, storj.Object{}, err } - info, err = objectStreamFromMeta(bucket, objectInfo.StreamID, path, lastSegmentMeta, streamInfo, streamMeta, redundancyScheme) + info, err = objectStreamFromMeta(bucket, path, objectInfo.StreamID, lastSegmentMeta, streamInfo, streamMeta, redundancyScheme) if err != nil { return object{}, storj.Object{}, err } @@ -290,7 +285,7 @@ func objectFromMeta(bucket storj.Bucket, path storj.Path, isPrefix bool, meta ob } } -func objectStreamFromMeta(bucket storj.Bucket, streamID storj.StreamID, path storj.Path, lastSegment segments.Meta, stream pb.StreamInfo, streamMeta pb.StreamMeta, redundancyScheme storj.RedundancyScheme) (storj.Object, error) { +func objectStreamFromMeta(bucket storj.Bucket, path storj.Path, streamID storj.StreamID, lastSegment segments.Meta, stream pb.StreamInfo, streamMeta pb.StreamMeta, redundancyScheme storj.RedundancyScheme) (storj.Object, error) { var nonce storj.Nonce var encryptedKey storj.EncryptedPrivateKey if streamMeta.LastSegmentMeta != nil { diff --git a/uplink/metainfo/kvmetainfo/objects_test.go b/uplink/metainfo/kvmetainfo/objects_test.go index 9f5cad101..a3723adea 100644 --- a/uplink/metainfo/kvmetainfo/objects_test.go +++ b/uplink/metainfo/kvmetainfo/objects_test.go @@ -124,32 +124,37 @@ func TestGetObjectStream(t *testing.T) { bucket, err := db.CreateBucket(ctx, TestBucket, nil) require.NoError(t, err) - upload(ctx, t, db, streams, bucket, "empty-file", nil) - upload(ctx, t, db, streams, bucket, "small-file", []byte("test")) - upload(ctx, t, db, streams, bucket, "large-file", data) + emptyFile := upload(ctx, t, db, streams, bucket, "empty-file", nil) + smallFile := upload(ctx, t, db, streams, bucket, "small-file", []byte("test")) + largeFile := upload(ctx, t, db, streams, bucket, "large-file", data) emptyBucket := storj.Bucket{ PathCipher: storj.EncNull, } - _, err = db.GetObjectStream(ctx, emptyBucket, "") + _, err = db.GetObjectStream(ctx, emptyBucket, storj.Object{}) assert.True(t, storj.ErrNoBucket.Has(err)) - _, err = db.GetObjectStream(ctx, bucket, "") + _, err = db.GetObjectStream(ctx, bucket, storj.Object{}) assert.True(t, storj.ErrNoPath.Has(err)) nonExistingBucket := storj.Bucket{ Name: "non-existing-bucket", PathCipher: storj.EncNull, } - _, err = db.GetObjectStream(ctx, nonExistingBucket, "small-file") - assert.True(t, storj.ErrObjectNotFound.Has(err)) - _, err = db.GetObjectStream(ctx, bucket, "non-existing-file") - assert.True(t, storj.ErrObjectNotFound.Has(err)) + // no error because we are not doing satellite connection with this method + _, err = db.GetObjectStream(ctx, nonExistingBucket, smallFile) + assert.NoError(t, err) - assertStream(ctx, t, db, streams, bucket, "empty-file", []byte{}) - assertStream(ctx, t, db, streams, bucket, "small-file", []byte("test")) - assertStream(ctx, t, db, streams, bucket, "large-file", data) + // no error because we are not doing satellite connection with this method + _, err = db.GetObjectStream(ctx, bucket, storj.Object{ + Path: "non-existing-file", + }) + assert.NoError(t, err) + + assertStream(ctx, t, db, streams, bucket, emptyFile, []byte{}) + assertStream(ctx, t, db, streams, bucket, smallFile, []byte("test")) + assertStream(ctx, t, db, streams, bucket, largeFile, data) /* TODO: Disable stopping due to flakiness. // Stop randomly half of the storage nodes and remove them from satellite's overlay @@ -166,7 +171,7 @@ func TestGetObjectStream(t *testing.T) { }) } -func upload(ctx context.Context, t *testing.T, db *kvmetainfo.DB, streams streams.Store, bucket storj.Bucket, path storj.Path, data []byte) { +func upload(ctx context.Context, t *testing.T, db *kvmetainfo.DB, streams streams.Store, bucket storj.Bucket, path storj.Path, data []byte) storj.Object { obj, err := db.CreateObject(ctx, bucket, path, nil) require.NoError(t, err) @@ -183,13 +188,15 @@ func upload(ctx context.Context, t *testing.T, db *kvmetainfo.DB, streams stream err = obj.Commit(ctx) require.NoError(t, err) + + return obj.Info() } -func assertStream(ctx context.Context, t *testing.T, db *kvmetainfo.DB, streams streams.Store, bucket storj.Bucket, path storj.Path, content []byte) { - readOnly, err := db.GetObjectStream(ctx, bucket, path) +func assertStream(ctx context.Context, t *testing.T, db *kvmetainfo.DB, streams streams.Store, bucket storj.Bucket, object storj.Object, content []byte) { + readOnly, err := db.GetObjectStream(ctx, bucket, object) require.NoError(t, err) - assert.Equal(t, path, readOnly.Info().Path) + assert.Equal(t, object.Path, readOnly.Info().Path) assert.Equal(t, TestBucket, readOnly.Info().Bucket.Name) assert.Equal(t, storj.EncAESGCM, readOnly.Info().Bucket.PathCipher) @@ -232,7 +239,6 @@ func assertInlineSegment(t *testing.T, segment storj.Segment, content []byte) { func assertRemoteSegment(t *testing.T, segment storj.Segment) { assert.Nil(t, segment.Inline) assert.NotNil(t, segment.PieceID) - assert.NotEqual(t, 0, len(segment.Pieces)) // check that piece numbers and nodes are unique nums := make(map[byte]struct{}) diff --git a/uplink/metainfo/kvmetainfo/stream.go b/uplink/metainfo/kvmetainfo/stream.go index be5e5acbb..fdca0e7e0 100644 --- a/uplink/metainfo/kvmetainfo/stream.go +++ b/uplink/metainfo/kvmetainfo/stream.go @@ -8,7 +8,7 @@ import ( "errors" "storj.io/storj/pkg/encryption" - "storj.io/storj/pkg/pb" + "storj.io/storj/pkg/paths" "storj.io/storj/pkg/storj" "storj.io/storj/uplink/metainfo" ) @@ -18,11 +18,7 @@ var _ storj.ReadOnlyStream = (*readonlyStream)(nil) type readonlyStream struct { db *DB - id storj.StreamID - info storj.Object - bucket string - encPath storj.Path - streamKey *storj.Key // lazySegmentReader derivedKey + info storj.Object } func (stream *readonlyStream) Info() storj.Object { return stream.info } @@ -46,56 +42,45 @@ func (stream *readonlyStream) segment(ctx context.Context, index int64) (segment } isLastSegment := segment.Index+1 == stream.info.SegmentCount - if !isLastSegment { - info, _, err := stream.db.metainfo.DownloadSegment(ctx, metainfo.DownloadSegmentParams{ - StreamID: stream.id, - Position: storj.SegmentPosition{ - Index: int32(index), - }, - }) - if err != nil { - return segment, err - } - - segment.Size = stream.info.FixedSegmentSize - segment.EncryptedKeyNonce = info.SegmentEncryption.EncryptedKeyNonce - segment.EncryptedKey = info.SegmentEncryption.EncryptedKey - } else { - segment.Size = stream.info.LastSegment.Size - segment.EncryptedKeyNonce = stream.info.LastSegment.EncryptedKeyNonce - segment.EncryptedKey = stream.info.LastSegment.EncryptedKey + if isLastSegment { + index = -1 + } + info, limits, err := stream.db.metainfo.DownloadSegment(ctx, metainfo.DownloadSegmentParams{ + StreamID: stream.Info().ID, + Position: storj.SegmentPosition{ + Index: int32(index), + }, + }) + if err != nil { + return segment, err } - contentKey, err := encryption.DecryptKey(segment.EncryptedKey, stream.Info().EncryptionParameters.CipherSuite, stream.streamKey, &segment.EncryptedKeyNonce) + segment.Size = stream.info.Size + segment.EncryptedKeyNonce = info.SegmentEncryption.EncryptedKeyNonce + segment.EncryptedKey = info.SegmentEncryption.EncryptedKey + + streamKey, err := encryption.DeriveContentKey(stream.info.Bucket.Name, paths.NewUnencrypted(stream.info.Path), stream.db.encStore) + if err != nil { + return segment, err + } + + contentKey, err := encryption.DecryptKey(segment.EncryptedKey, stream.info.EncryptionParameters.CipherSuite, streamKey, &segment.EncryptedKeyNonce) if err != nil { return segment, err } nonce := new(storj.Nonce) - _, err = encryption.Increment(nonce, index+1) + _, err = encryption.Increment(nonce, segment.Index+1) if err != nil { return segment, err } - if isLastSegment { - index = -1 - } - - pointer, err := stream.db.metainfo.SegmentInfoOld(ctx, stream.bucket, stream.encPath, index) - if err != nil { - return segment, err - } - - if pointer.GetType() == pb.Pointer_INLINE { - segment.Inline, err = encryption.Decrypt(pointer.InlineSegment, stream.info.EncryptionParameters.CipherSuite, contentKey, nonce) - } else { - segment.PieceID = pointer.Remote.RootPieceId - segment.Pieces = make([]storj.Piece, 0, len(pointer.Remote.RemotePieces)) - for _, piece := range pointer.Remote.RemotePieces { - var nodeID storj.NodeID - copy(nodeID[:], piece.NodeId.Bytes()) - segment.Pieces = append(segment.Pieces, storj.Piece{Number: byte(piece.PieceNum), Location: nodeID}) + if len(info.EncryptedInlineData) != 0 || len(limits) == 0 { + inline, err := encryption.Decrypt(info.EncryptedInlineData, stream.info.EncryptionParameters.CipherSuite, contentKey, nonce) + if err != nil { + return segment, err } + segment.Inline = inline } return segment, nil