Object.Bucket as storj.Bucket instead of string (#747)
This commit is contained in:
parent
120d875e06
commit
0018ebf63e
@ -78,7 +78,7 @@ func (db *DB) GetObjectStream(ctx context.Context, bucket string, path storj.Pat
|
||||
func (db *DB) CreateObject(ctx context.Context, bucket string, path storj.Path, createInfo *storj.CreateObject) (object storj.MutableObject, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
_, err = db.GetBucket(ctx, bucket)
|
||||
bucketInfo, err := db.GetBucket(ctx, bucket)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -88,7 +88,7 @@ func (db *DB) CreateObject(ctx context.Context, bucket string, path storj.Path,
|
||||
}
|
||||
|
||||
info := storj.Object{
|
||||
Bucket: bucket,
|
||||
Bucket: bucketInfo,
|
||||
Path: path,
|
||||
}
|
||||
|
||||
@ -154,6 +154,11 @@ func (db *DB) ListPendingObjects(ctx context.Context, bucket string, options sto
|
||||
func (db *DB) ListObjects(ctx context.Context, bucket string, options storj.ListOptions) (list storj.ObjectList, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
bucketInfo, err := db.GetBucket(ctx, bucket)
|
||||
if err != nil {
|
||||
return storj.ObjectList{}, err
|
||||
}
|
||||
|
||||
objects, err := db.buckets.GetObjectStore(ctx, bucket)
|
||||
if err != nil {
|
||||
return storj.ObjectList{}, err
|
||||
@ -195,7 +200,7 @@ func (db *DB) ListObjects(ctx context.Context, bucket string, options storj.List
|
||||
}
|
||||
|
||||
for _, item := range items {
|
||||
list.Items = append(list.Items, objectFromMeta("", item.Path, item.IsPrefix, item.Meta))
|
||||
list.Items = append(list.Items, objectFromMeta(bucketInfo, item.Path, item.IsPrefix, item.Meta))
|
||||
}
|
||||
|
||||
return list, nil
|
||||
@ -275,7 +280,7 @@ func (db *DB) getInfo(ctx context.Context, prefix string, bucket string, path st
|
||||
return object{}, storj.Object{}, err
|
||||
}
|
||||
|
||||
info = objectStreamFromMeta(bucket, path, lastSegmentMeta, streamInfo, streamMeta, redundancyScheme)
|
||||
info = objectStreamFromMeta(bucketInfo, path, lastSegmentMeta, streamInfo, streamMeta, redundancyScheme)
|
||||
|
||||
return object{
|
||||
fullpath: fullpath,
|
||||
@ -286,7 +291,7 @@ func (db *DB) getInfo(ctx context.Context, prefix string, bucket string, path st
|
||||
}, info, nil
|
||||
}
|
||||
|
||||
func objectFromMeta(bucket string, path storj.Path, isPrefix bool, meta objects.Meta) storj.Object {
|
||||
func objectFromMeta(bucket storj.Bucket, path storj.Path, isPrefix bool, meta objects.Meta) storj.Object {
|
||||
return storj.Object{
|
||||
Version: 0, // TODO:
|
||||
Bucket: bucket,
|
||||
@ -307,7 +312,7 @@ func objectFromMeta(bucket string, path storj.Path, isPrefix bool, meta objects.
|
||||
}
|
||||
}
|
||||
|
||||
func objectStreamFromMeta(bucket string, path storj.Path, lastSegment segments.Meta, stream pb.StreamInfo, streamMeta pb.StreamMeta, redundancyScheme *pb.RedundancyScheme) storj.Object {
|
||||
func objectStreamFromMeta(bucket storj.Bucket, path storj.Path, lastSegment segments.Meta, stream pb.StreamInfo, streamMeta pb.StreamMeta, redundancyScheme *pb.RedundancyScheme) storj.Object {
|
||||
var nonce storj.Nonce
|
||||
copy(nonce[:], streamMeta.LastSegmentMeta.KeyNonce)
|
||||
return storj.Object{
|
||||
|
@ -74,7 +74,8 @@ func TestCreateObject(t *testing.T) {
|
||||
|
||||
info := obj.Info()
|
||||
|
||||
assert.Equal(t, TestBucket, info.Bucket, errTag)
|
||||
assert.Equal(t, TestBucket, info.Bucket.Name, errTag)
|
||||
assert.Equal(t, storj.AESGCM, info.Bucket.PathCipher, errTag)
|
||||
assert.Equal(t, TestFile, info.Path, errTag)
|
||||
assert.EqualValues(t, 0, info.Size, errTag)
|
||||
assert.Equal(t, tt.expectedRS, info.RedundancyScheme, errTag)
|
||||
@ -107,6 +108,8 @@ func TestGetObject(t *testing.T) {
|
||||
object, err := db.GetObject(ctx, bucket.Name, TestFile)
|
||||
if assert.NoError(t, err) {
|
||||
assert.Equal(t, TestFile, object.Path)
|
||||
assert.Equal(t, TestBucket, object.Bucket.Name)
|
||||
assert.Equal(t, storj.AESGCM, object.Bucket.PathCipher)
|
||||
}
|
||||
})
|
||||
}
|
||||
@ -140,7 +143,7 @@ func TestGetObjectStream(t *testing.T) {
|
||||
_, err = db.GetObjectStream(ctx, "non-existing-bucket", "small-file")
|
||||
assert.True(t, storj.ErrBucketNotFound.Has(err))
|
||||
|
||||
_, err = db.GetObject(ctx, bucket.Name, "non-existing-file")
|
||||
_, err = db.GetObjectStream(ctx, bucket.Name, "non-existing-file")
|
||||
assert.True(t, storj.ErrObjectNotFound.Has(err))
|
||||
|
||||
stream, err := db.GetObjectStream(ctx, bucket.Name, "empty-file")
|
||||
@ -171,7 +174,7 @@ func upload(ctx context.Context, t *testing.T, db *DB, bucket storj.Bucket, path
|
||||
return
|
||||
}
|
||||
|
||||
upload := stream.NewUpload(ctx, str, db.streams, bucket.PathCipher)
|
||||
upload := stream.NewUpload(ctx, str, db.streams)
|
||||
|
||||
_, err = upload.Write(data)
|
||||
if !assert.NoError(t, err) {
|
||||
@ -191,6 +194,8 @@ func upload(ctx context.Context, t *testing.T, db *DB, bucket storj.Bucket, path
|
||||
|
||||
func assertStream(ctx context.Context, t *testing.T, readOnly storj.ReadOnlyStream, streams streams.Store, path storj.Path, size int64, content []byte) {
|
||||
assert.Equal(t, path, readOnly.Info().Path)
|
||||
assert.Equal(t, TestBucket, readOnly.Info().Bucket.Name)
|
||||
assert.Equal(t, storj.AESGCM, readOnly.Info().Bucket.PathCipher)
|
||||
|
||||
segments, more, err := readOnly.Segments(ctx, 0, 0)
|
||||
if !assert.NoError(t, err) {
|
||||
@ -647,7 +652,11 @@ func TestListObjects(t *testing.T) {
|
||||
|
||||
if assert.NoError(t, err, errTag) {
|
||||
assert.Equal(t, tt.more, list.More, errTag)
|
||||
assert.Equal(t, tt.result, getObjectPaths(list), errTag)
|
||||
for i, item := range list.Items {
|
||||
assert.Equal(t, tt.result[i], item.Path, errTag)
|
||||
assert.Equal(t, TestBucket, item.Bucket.Name, errTag)
|
||||
assert.Equal(t, storj.Unencrypted, item.Bucket.PathCipher, errTag)
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
@ -671,13 +680,3 @@ func optionsRecursive(prefix, cursor string, direction storj.ListDirection, limi
|
||||
Recursive: true,
|
||||
}
|
||||
}
|
||||
|
||||
func getObjectPaths(list storj.ObjectList) []string {
|
||||
names := make([]string, len(list.Items))
|
||||
|
||||
for i, item := range list.Items {
|
||||
names[i] = item.Path
|
||||
}
|
||||
|
||||
return names
|
||||
}
|
||||
|
@ -54,7 +54,7 @@ type CreateObject struct {
|
||||
}
|
||||
|
||||
// Object converts the CreateObject to an object with unitialized values
|
||||
func (create CreateObject) Object(bucket string, path Path) Object {
|
||||
func (create CreateObject) Object(bucket Bucket, path Path) Object {
|
||||
return Object{
|
||||
Bucket: bucket,
|
||||
Path: path,
|
||||
|
@ -33,7 +33,7 @@ type Bucket struct {
|
||||
// Object contains information about a specific object
|
||||
type Object struct {
|
||||
Version uint32
|
||||
Bucket string
|
||||
Bucket Bucket
|
||||
Path Path
|
||||
IsPrefix bool
|
||||
|
||||
|
@ -106,7 +106,7 @@ func (download *Download) resetReader(offset int64) error {
|
||||
|
||||
obj := download.stream.Info()
|
||||
|
||||
rr, _, err := download.streams.Get(download.ctx, storj.JoinPaths(obj.Bucket, obj.Path), obj.Cipher)
|
||||
rr, _, err := download.streams.Get(download.ctx, storj.JoinPaths(obj.Bucket.Name, obj.Path), obj.Bucket.PathCipher)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -16,30 +16,28 @@ import (
|
||||
|
||||
// Upload implements Writer and Closer for writing to stream.
|
||||
type Upload struct {
|
||||
ctx context.Context
|
||||
stream storj.MutableStream
|
||||
streams streams.Store
|
||||
pathCipher storj.Cipher
|
||||
writer io.WriteCloser
|
||||
closed bool
|
||||
errgroup errgroup.Group
|
||||
ctx context.Context
|
||||
stream storj.MutableStream
|
||||
streams streams.Store
|
||||
writer io.WriteCloser
|
||||
closed bool
|
||||
errgroup errgroup.Group
|
||||
}
|
||||
|
||||
// NewUpload creates new stream upload.
|
||||
func NewUpload(ctx context.Context, stream storj.MutableStream, streams streams.Store, pathCipher storj.Cipher) *Upload {
|
||||
func NewUpload(ctx context.Context, stream storj.MutableStream, streams streams.Store) *Upload {
|
||||
reader, writer := io.Pipe()
|
||||
|
||||
upload := Upload{
|
||||
ctx: ctx,
|
||||
stream: stream,
|
||||
streams: streams,
|
||||
pathCipher: pathCipher,
|
||||
writer: writer,
|
||||
ctx: ctx,
|
||||
stream: stream,
|
||||
streams: streams,
|
||||
writer: writer,
|
||||
}
|
||||
|
||||
upload.errgroup.Go(func() error {
|
||||
obj := stream.Info()
|
||||
_, err := streams.Put(ctx, storj.JoinPaths(obj.Bucket, obj.Path), pathCipher, reader, obj.Metadata, obj.Expires)
|
||||
_, err := streams.Put(ctx, storj.JoinPaths(obj.Bucket.Name, obj.Path), obj.Bucket.PathCipher, reader, obj.Metadata, obj.Expires)
|
||||
if err != nil {
|
||||
err = utils.CombineErrors(err, reader.CloseWithError(err))
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user