uplink/metainfo: remove GetObject from download Batch (#3596)
This commit is contained in:
parent
24318d74b3
commit
5964502ce0
@ -55,6 +55,8 @@ type ObjectInfo struct {
|
||||
|
||||
// Stream is information about an object stream
|
||||
type Stream struct {
|
||||
ID StreamID
|
||||
|
||||
// Size is the total size of the stream in bytes
|
||||
Size int64
|
||||
// Checksum is the checksum of the segment checksums
|
||||
|
@ -1010,6 +1010,9 @@ func (client *Client) DownloadSegment(ctx context.Context, params DownloadSegmen
|
||||
|
||||
response, err := client.client.DownloadSegment(ctx, params.toRequest(client.header()))
|
||||
if err != nil {
|
||||
if errs2.IsRPC(err, rpcstatus.NotFound) {
|
||||
return storj.SegmentDownloadInfo{}, nil, storj.ErrObjectNotFound.Wrap(err)
|
||||
}
|
||||
return storj.SegmentDownloadInfo{}, nil, Error.Wrap(err)
|
||||
}
|
||||
|
||||
|
@ -254,7 +254,7 @@ func (db *DB) getInfo(ctx context.Context, bucket storj.Bucket, path storj.Path)
|
||||
return object{}, storj.Object{}, err
|
||||
}
|
||||
|
||||
info, err = objectStreamFromMeta(bucket, path, lastSegmentMeta, streamInfo, streamMeta, redundancyScheme)
|
||||
info, err = objectStreamFromMeta(bucket, objectInfo.StreamID, path, lastSegmentMeta, streamInfo, streamMeta, redundancyScheme)
|
||||
if err != nil {
|
||||
return object{}, storj.Object{}, err
|
||||
}
|
||||
@ -290,7 +290,7 @@ func objectFromMeta(bucket storj.Bucket, path storj.Path, isPrefix bool, meta ob
|
||||
}
|
||||
}
|
||||
|
||||
func objectStreamFromMeta(bucket storj.Bucket, path storj.Path, lastSegment segments.Meta, stream pb.StreamInfo, streamMeta pb.StreamMeta, redundancyScheme storj.RedundancyScheme) (storj.Object, error) {
|
||||
func objectStreamFromMeta(bucket storj.Bucket, streamID storj.StreamID, path storj.Path, lastSegment segments.Meta, stream pb.StreamInfo, streamMeta pb.StreamMeta, redundancyScheme storj.RedundancyScheme) (storj.Object, error) {
|
||||
var nonce storj.Nonce
|
||||
var encryptedKey storj.EncryptedPrivateKey
|
||||
if streamMeta.LastSegmentMeta != nil {
|
||||
@ -323,6 +323,7 @@ func objectStreamFromMeta(bucket storj.Bucket, path storj.Path, lastSegment segm
|
||||
Expires: lastSegment.Expiration, // TODO: use correct field
|
||||
|
||||
Stream: storj.Stream{
|
||||
ID: streamID,
|
||||
Size: stream.SegmentsSize*(numberOfSegments-1) + stream.LastSegmentSize,
|
||||
// Checksum: []byte(object.Checksum),
|
||||
|
||||
|
@ -29,14 +29,14 @@ func (o *prefixedObjStore) Meta(ctx context.Context, path storj.Path) (meta obje
|
||||
return o.store.Meta(ctx, storj.JoinPaths(o.prefix, path))
|
||||
}
|
||||
|
||||
func (o *prefixedObjStore) Get(ctx context.Context, path storj.Path) (rr ranger.Ranger, meta objects.Meta, err error) {
|
||||
func (o *prefixedObjStore) Get(ctx context.Context, path storj.Path, object storj.Object) (rr ranger.Ranger, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
if len(path) == 0 {
|
||||
return nil, objects.Meta{}, storj.ErrNoPath.New("")
|
||||
return nil, storj.ErrNoPath.New("")
|
||||
}
|
||||
|
||||
return o.store.Get(ctx, storj.JoinPaths(o.prefix, path))
|
||||
return o.store.Get(ctx, storj.JoinPaths(o.prefix, path), object)
|
||||
}
|
||||
|
||||
func (o *prefixedObjStore) Put(ctx context.Context, path storj.Path, data io.Reader, metadata pb.SerializableMeta, expiration time.Time) (meta objects.Meta, err error) {
|
||||
|
@ -40,7 +40,7 @@ type ListItem struct {
|
||||
// Store for objects
|
||||
type Store interface {
|
||||
Meta(ctx context.Context, path storj.Path) (meta Meta, err error)
|
||||
Get(ctx context.Context, path storj.Path) (rr ranger.Ranger, meta Meta, err error)
|
||||
Get(ctx context.Context, path storj.Path, object storj.Object) (rr ranger.Ranger, err error)
|
||||
Put(ctx context.Context, path storj.Path, data io.Reader, metadata pb.SerializableMeta, expiration time.Time) (meta Meta, err error)
|
||||
Delete(ctx context.Context, path storj.Path) (err error)
|
||||
List(ctx context.Context, prefix, startAfter storj.Path, recursive bool, limit int, metaFlags uint32) (items []ListItem, more bool, err error)
|
||||
@ -72,21 +72,20 @@ func (o *objStore) Meta(ctx context.Context, path storj.Path) (meta Meta, err er
|
||||
return convertMeta(m), err
|
||||
}
|
||||
|
||||
func (o *objStore) Get(ctx context.Context, path storj.Path) (
|
||||
rr ranger.Ranger, meta Meta, err error) {
|
||||
func (o *objStore) Get(ctx context.Context, path storj.Path, object storj.Object) (
|
||||
rr ranger.Ranger, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
if len(path) == 0 {
|
||||
return nil, Meta{}, storj.ErrNoPath.New("")
|
||||
return nil, storj.ErrNoPath.New("")
|
||||
}
|
||||
|
||||
rr, m, err := o.store.Get(ctx, path, o.pathCipher)
|
||||
|
||||
rr, err = o.store.Get(ctx, path, object, o.pathCipher)
|
||||
if storage.ErrKeyNotFound.Has(err) {
|
||||
err = storj.ErrObjectNotFound.Wrap(err)
|
||||
}
|
||||
|
||||
return rr, convertMeta(m), err
|
||||
return rr, err
|
||||
}
|
||||
|
||||
func (o *objStore) Put(ctx context.Context, path storj.Path, data io.Reader, metadata pb.SerializableMeta, expiration time.Time) (meta Meta, err error) {
|
||||
|
@ -18,7 +18,7 @@ import (
|
||||
// Store interface methods for streams to satisfy to be a store
|
||||
type Store interface {
|
||||
Meta(ctx context.Context, path storj.Path, pathCipher storj.CipherSuite) (Meta, error)
|
||||
Get(ctx context.Context, path storj.Path, pathCipher storj.CipherSuite) (ranger.Ranger, Meta, error)
|
||||
Get(ctx context.Context, path storj.Path, object storj.Object, pathCipher storj.CipherSuite) (ranger.Ranger, error)
|
||||
Put(ctx context.Context, path storj.Path, pathCipher storj.CipherSuite, data io.Reader, metadata []byte, expiration time.Time) (Meta, error)
|
||||
Delete(ctx context.Context, path storj.Path, pathCipher storj.CipherSuite) error
|
||||
List(ctx context.Context, prefix, startAfter storj.Path, pathCipher storj.CipherSuite, recursive bool, limit int, metaFlags uint32) (items []ListItem, more bool, err error)
|
||||
@ -45,10 +45,10 @@ func (s *shimStore) Meta(ctx context.Context, path storj.Path, pathCipher storj.
|
||||
}
|
||||
|
||||
// Get parses the passed in path and dispatches to the typed store.
|
||||
func (s *shimStore) Get(ctx context.Context, path storj.Path, pathCipher storj.CipherSuite) (_ ranger.Ranger, _ Meta, err error) {
|
||||
func (s *shimStore) Get(ctx context.Context, path storj.Path, object storj.Object, pathCipher storj.CipherSuite) (_ ranger.Ranger, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
return s.store.Get(ctx, ParsePath(path), pathCipher)
|
||||
return s.store.Get(ctx, ParsePath(path), object, pathCipher)
|
||||
}
|
||||
|
||||
// Put parses the passed in path and dispatches to the typed store.
|
||||
|
@ -56,7 +56,7 @@ func convertMeta(modified, expiration time.Time, stream pb.StreamInfo, streamMet
|
||||
// Store interface methods for streams to satisfy to be a store
|
||||
type typedStore interface {
|
||||
Meta(ctx context.Context, path Path, pathCipher storj.CipherSuite) (Meta, error)
|
||||
Get(ctx context.Context, path Path, pathCipher storj.CipherSuite) (ranger.Ranger, Meta, error)
|
||||
Get(ctx context.Context, path Path, object storj.Object, pathCipher storj.CipherSuite) (ranger.Ranger, error)
|
||||
Put(ctx context.Context, path Path, pathCipher storj.CipherSuite, data io.Reader, metadata []byte, expiration time.Time) (Meta, error)
|
||||
Delete(ctx context.Context, path Path, pathCipher storj.CipherSuite) error
|
||||
List(ctx context.Context, prefix Path, startAfter string, pathCipher storj.CipherSuite, recursive bool, limit int, metaFlags uint32) (items []ListItem, more bool, err error)
|
||||
@ -380,128 +380,74 @@ func (s *streamStore) upload(ctx context.Context, path Path, pathCipher storj.Ci
|
||||
// Get returns a ranger that knows what the overall size is (from l/<path>)
|
||||
// and then returns the appropriate data from segments s0/<path>, s1/<path>,
|
||||
// ..., l/<path>.
|
||||
func (s *streamStore) Get(ctx context.Context, path Path, pathCipher storj.CipherSuite) (rr ranger.Ranger, meta Meta, err error) {
|
||||
func (s *streamStore) Get(ctx context.Context, path Path, object storj.Object, pathCipher storj.CipherSuite) (rr ranger.Ranger, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
encPath, err := encryption.EncryptPath(path.Bucket(), path.UnencryptedPath(), pathCipher, s.encStore)
|
||||
if err != nil {
|
||||
return nil, Meta{}, err
|
||||
}
|
||||
|
||||
resps, err := s.metainfo.Batch(ctx,
|
||||
&metainfo.GetObjectParams{
|
||||
Bucket: []byte(path.Bucket()),
|
||||
EncryptedPath: []byte(encPath.Raw()),
|
||||
info, limits, err := s.metainfo.DownloadSegment(ctx, metainfo.DownloadSegmentParams{
|
||||
StreamID: object.ID,
|
||||
Position: storj.SegmentPosition{
|
||||
Index: -1, // Request the last segment
|
||||
},
|
||||
&metainfo.DownloadSegmentParams{
|
||||
Position: storj.SegmentPosition{
|
||||
Index: -1, // Request the last segment
|
||||
},
|
||||
},
|
||||
)
|
||||
})
|
||||
if err != nil {
|
||||
return nil, Meta{}, err
|
||||
}
|
||||
|
||||
if len(resps) != 2 {
|
||||
return nil, Meta{}, errs.New(
|
||||
"metainfo.Batch request returned an unexpected number of responses. Want: 2, got: %d", len(resps),
|
||||
)
|
||||
}
|
||||
|
||||
var object storj.ObjectInfo
|
||||
{
|
||||
resp, err := resps[0].GetObject()
|
||||
if err != nil {
|
||||
return nil, Meta{}, err
|
||||
}
|
||||
|
||||
object = resp.Info
|
||||
}
|
||||
|
||||
var (
|
||||
info storj.SegmentDownloadInfo
|
||||
limits []*pb.AddressedOrderLimit
|
||||
)
|
||||
{
|
||||
resp, err := resps[1].DownloadSegment()
|
||||
if err != nil {
|
||||
return nil, Meta{}, err
|
||||
}
|
||||
|
||||
info = resp.Info
|
||||
limits = resp.Limits
|
||||
return nil, err
|
||||
}
|
||||
|
||||
lastSegmentRanger, err := s.segments.Ranger(ctx, info, limits, object.RedundancyScheme)
|
||||
if err != nil {
|
||||
return nil, Meta{}, err
|
||||
}
|
||||
|
||||
streamInfo, streamMeta, err := TypedDecryptStreamInfo(ctx, object.Metadata, path, s.encStore)
|
||||
if err != nil {
|
||||
return nil, Meta{}, err
|
||||
}
|
||||
|
||||
stream := pb.StreamInfo{}
|
||||
err = proto.Unmarshal(streamInfo, &stream)
|
||||
if err != nil {
|
||||
return nil, Meta{}, err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
derivedKey, err := encryption.DeriveContentKey(path.Bucket(), path.UnencryptedPath(), s.encStore)
|
||||
if err != nil {
|
||||
return nil, Meta{}, err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var rangers []ranger.Ranger
|
||||
for i := int64(0); i < numberOfSegments(&stream, &streamMeta)-1; i++ {
|
||||
for i := int64(0); i < object.SegmentCount-1; i++ {
|
||||
var contentNonce storj.Nonce
|
||||
_, err = encryption.Increment(&contentNonce, i+1)
|
||||
if err != nil {
|
||||
return nil, Meta{}, err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
rangers = append(rangers, &lazySegmentRanger{
|
||||
metainfo: s.metainfo,
|
||||
segments: s.segments,
|
||||
streamID: object.StreamID,
|
||||
streamID: object.ID,
|
||||
segmentIndex: int32(i),
|
||||
rs: object.RedundancyScheme,
|
||||
size: stream.SegmentsSize,
|
||||
size: object.FixedSegmentSize,
|
||||
derivedKey: derivedKey,
|
||||
startingNonce: &contentNonce,
|
||||
encBlockSize: int(streamMeta.EncryptionBlockSize),
|
||||
cipher: storj.CipherSuite(streamMeta.EncryptionType),
|
||||
encBlockSize: int(object.EncryptionParameters.BlockSize),
|
||||
cipher: object.CipherSuite,
|
||||
})
|
||||
}
|
||||
|
||||
var contentNonce storj.Nonce
|
||||
_, err = encryption.Increment(&contentNonce, numberOfSegments(&stream, &streamMeta))
|
||||
_, err = encryption.Increment(&contentNonce, object.SegmentCount)
|
||||
if err != nil {
|
||||
return nil, Meta{}, err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
encryptedKey, keyNonce := getEncryptedKeyAndNonce(streamMeta.LastSegmentMeta)
|
||||
decryptedLastSegmentRanger, err := decryptRanger(
|
||||
ctx,
|
||||
lastSegmentRanger,
|
||||
stream.LastSegmentSize,
|
||||
storj.CipherSuite(streamMeta.EncryptionType),
|
||||
object.LastSegment.Size,
|
||||
object.CipherSuite,
|
||||
derivedKey,
|
||||
encryptedKey,
|
||||
keyNonce,
|
||||
info.SegmentEncryption.EncryptedKey,
|
||||
&info.SegmentEncryption.EncryptedKeyNonce,
|
||||
&contentNonce,
|
||||
int(streamMeta.EncryptionBlockSize),
|
||||
int(object.EncryptionParameters.BlockSize),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, Meta{}, err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
rangers = append(rangers, decryptedLastSegmentRanger)
|
||||
catRangers := ranger.Concat(rangers...)
|
||||
meta = convertMeta(object.Modified, object.Expires, stream, streamMeta)
|
||||
return catRangers, meta, nil
|
||||
return ranger.Concat(rangers...), nil
|
||||
}
|
||||
|
||||
// Meta implements Store.Meta
|
||||
|
@ -6,7 +6,6 @@ package streams_test
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"io/ioutil"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@ -33,81 +32,6 @@ const (
|
||||
TestEncKey = "test-encryption-key"
|
||||
)
|
||||
|
||||
func TestStreamsStorePutGet(t *testing.T) {
|
||||
runTest(t, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, streamStore streams.Store) {
|
||||
bucketName := "bucket-name"
|
||||
err := planet.Uplinks[0].CreateBucket(ctx, planet.Satellites[0], bucketName)
|
||||
require.NoError(t, err)
|
||||
|
||||
for _, tt := range []struct {
|
||||
name string
|
||||
path string
|
||||
metadata []byte
|
||||
expiration time.Time
|
||||
content []byte
|
||||
}{
|
||||
{"test inline put/get", "path/1", []byte("inline-metadata"), time.Time{}, testrand.Bytes(2 * memory.KiB)},
|
||||
{"test remote put/get", "mypath/1", []byte("remote-metadata"), time.Time{}, testrand.Bytes(100 * memory.KiB)},
|
||||
} {
|
||||
test := tt
|
||||
|
||||
path := storj.JoinPaths(bucketName, test.path)
|
||||
_, err = streamStore.Put(ctx, path, storj.EncNull, bytes.NewReader(test.content), test.metadata, test.expiration)
|
||||
require.NoError(t, err, test.name)
|
||||
|
||||
rr, metadata, err := streamStore.Get(ctx, path, storj.EncNull)
|
||||
require.NoError(t, err, test.name)
|
||||
require.Equal(t, test.metadata, metadata.Data)
|
||||
|
||||
reader, err := rr.Range(ctx, 0, rr.Size())
|
||||
require.NoError(t, err, test.name)
|
||||
content, err := ioutil.ReadAll(reader)
|
||||
require.NoError(t, err, test.name)
|
||||
require.Equal(t, test.content, content)
|
||||
|
||||
require.NoError(t, reader.Close(), test.name)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestStreamsStoreDelete(t *testing.T) {
|
||||
runTest(t, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, streamStore streams.Store) {
|
||||
bucketName := "bucket-name"
|
||||
err := planet.Uplinks[0].CreateBucket(ctx, planet.Satellites[0], bucketName)
|
||||
require.NoError(t, err)
|
||||
|
||||
for _, tt := range []struct {
|
||||
name string
|
||||
path string
|
||||
metadata []byte
|
||||
expiration time.Time
|
||||
content []byte
|
||||
}{
|
||||
{"test inline delete", "path/1", []byte("inline-metadata"), time.Time{}, testrand.Bytes(2 * memory.KiB)},
|
||||
{"test remote delete", "mypath/1", []byte("remote-metadata"), time.Time{}, testrand.Bytes(100 * memory.KiB)},
|
||||
} {
|
||||
test := tt
|
||||
|
||||
path := storj.JoinPaths(bucketName, test.path)
|
||||
_, err = streamStore.Put(ctx, path, storj.EncNull, bytes.NewReader(test.content), test.metadata, test.expiration)
|
||||
require.NoError(t, err, test.name)
|
||||
|
||||
// delete existing
|
||||
err = streamStore.Delete(ctx, path, storj.EncNull)
|
||||
require.NoError(t, err, test.name)
|
||||
|
||||
_, _, err = streamStore.Get(ctx, path, storj.EncNull)
|
||||
require.Error(t, err, test.name)
|
||||
require.True(t, storj.ErrObjectNotFound.Has(err))
|
||||
|
||||
// delete non existing
|
||||
err = streamStore.Delete(ctx, path, storj.EncNull)
|
||||
require.Error(t, err, test.name)
|
||||
require.True(t, storj.ErrObjectNotFound.Has(err))
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// TestStreamsInterruptedDelete tests a special case where the delete command is
|
||||
// interrupted before all segments are deleted. On subsequent calls to
|
||||
// streamStore.Delete we want to ensure it completes the delete without error,
|
||||
|
@ -101,7 +101,7 @@ func (download *Download) resetReader(offset int64) error {
|
||||
|
||||
obj := download.stream.Info()
|
||||
|
||||
rr, _, err := download.streams.Get(download.ctx, storj.JoinPaths(obj.Bucket.Name, obj.Path), obj.Bucket.PathCipher)
|
||||
rr, err := download.streams.Get(download.ctx, storj.JoinPaths(obj.Bucket.Name, obj.Path), obj, obj.Bucket.PathCipher)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user