Migrate Minio Gateway to the new Metainfo API (#775)

This commit is contained in:
Kaloyan Raev 2018-12-07 20:31:29 +02:00 committed by GitHub
parent 147f86bba4
commit 72a20af569
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 939 additions and 735 deletions

View File

@ -75,11 +75,11 @@ func upload(ctx context.Context, src fpath.FPath, dst fpath.FPath, showProgress
return err
}
create := storj.CreateObject{
createInfo := storj.CreateObject{
RedundancyScheme: cfg.GetRedundancyScheme(),
EncryptionScheme: cfg.GetEncryptionScheme(),
}
obj, err := metainfo.CreateObject(ctx, dst.Bucket(), dst.Path(), &create)
obj, err := metainfo.CreateObject(ctx, dst.Bucket(), dst.Path(), &createInfo)
if err != nil {
return convertError(err, dst)
}
@ -220,11 +220,11 @@ func copy(ctx context.Context, src fpath.FPath, dst fpath.FPath) error {
dst = dst.Join(src.Base())
}
create := storj.CreateObject{
createInfo := storj.CreateObject{
RedundancyScheme: cfg.GetRedundancyScheme(),
EncryptionScheme: cfg.GetEncryptionScheme(),
}
obj, err := metainfo.CreateObject(ctx, dst.Bucket(), dst.Path(), &create)
obj, err := metainfo.CreateObject(ctx, dst.Bucket(), dst.Path(), &createInfo)
if err != nil {
return convertError(err, dst)
}

View File

@ -190,12 +190,12 @@ func (sf *storjFS) OpenDir(name string, context *fuse.Context) (c []fuse.DirEntr
func (sf *storjFS) Mkdir(name string, mode uint32, context *fuse.Context) fuse.Status {
zap.S().Debug("Mkdir: ", name)
create := storj.CreateObject{
createInfo := storj.CreateObject{
ContentType: "application/directory",
RedundancyScheme: cfg.GetRedundancyScheme(),
EncryptionScheme: cfg.GetEncryptionScheme(),
}
object, err := sf.metainfo.CreateObject(sf.ctx, sf.bucket.Name, name+"/", &create)
object, err := sf.metainfo.CreateObject(sf.ctx, sf.bucket.Name, name+"/", &createInfo)
if err != nil {
return fuse.EIO
}
@ -414,12 +414,12 @@ func (f *storjFile) getWriter(off int64) (io.Writer, error) {
f.size = 0
f.closeWriter()
create := storj.CreateObject{
createInfo := storj.CreateObject{
RedundancyScheme: cfg.GetRedundancyScheme(),
EncryptionScheme: cfg.GetEncryptionScheme(),
}
var err error
f.mutableObject, err = f.metainfo.CreateObject(f.ctx, f.bucket.Name, f.name, &create)
f.mutableObject, err = f.metainfo.CreateObject(f.ctx, f.bucket.Name, f.name, &createInfo)
if err != nil {
return nil, err
}

View File

@ -318,7 +318,7 @@ func runTest(t *testing.T, test func(context.Context, *DB)) {
defer ctx.Check(planet.Shutdown)
planet.Start(context.Background())
planet.Start(ctx)
db, err := newDB(planet)
if !assert.NoError(t, err) {

View File

@ -280,7 +280,10 @@ func (db *DB) getInfo(ctx context.Context, prefix string, bucket string, path st
return object{}, storj.Object{}, err
}
info = objectStreamFromMeta(bucketInfo, path, lastSegmentMeta, streamInfo, streamMeta, redundancyScheme)
info, err = objectStreamFromMeta(bucketInfo, path, lastSegmentMeta, streamInfo, streamMeta, redundancyScheme)
if err != nil {
return object{}, storj.Object{}, err
}
return object{
fullpath: fullpath,
@ -298,7 +301,7 @@ func objectFromMeta(bucket storj.Bucket, path storj.Path, isPrefix bool, meta ob
Path: path,
IsPrefix: isPrefix,
Metadata: nil,
Metadata: meta.UserDefined,
ContentType: meta.ContentType,
Created: meta.Modified, // TODO: use correct field
@ -312,21 +315,28 @@ func objectFromMeta(bucket storj.Bucket, path storj.Path, isPrefix bool, meta ob
}
}
func objectStreamFromMeta(bucket storj.Bucket, path storj.Path, lastSegment segments.Meta, stream pb.StreamInfo, streamMeta pb.StreamMeta, redundancyScheme *pb.RedundancyScheme) storj.Object {
func objectStreamFromMeta(bucket storj.Bucket, path storj.Path, lastSegment segments.Meta, stream pb.StreamInfo, streamMeta pb.StreamMeta, redundancyScheme *pb.RedundancyScheme) (storj.Object, error) {
var nonce storj.Nonce
copy(nonce[:], streamMeta.LastSegmentMeta.KeyNonce)
serMetaInfo := pb.SerializableMeta{}
err := proto.Unmarshal(stream.Metadata, &serMetaInfo)
if err != nil {
return storj.Object{}, err
}
return storj.Object{
Version: 0, // TODO:
Bucket: bucket,
Path: path,
IsPrefix: false,
Metadata: nil, // TODO:
Metadata: serMetaInfo.UserDefined,
// ContentType: object.ContentType,
Created: lastSegment.Modified, // TODO: use correct field
Modified: lastSegment.Modified, // TODO: use correct field
Expires: lastSegment.Expiration, // TODO: use correct field
ContentType: serMetaInfo.ContentType,
Created: lastSegment.Modified, // TODO: use correct field
Modified: lastSegment.Modified, // TODO: use correct field
Expires: lastSegment.Expiration, // TODO: use correct field
Stream: storj.Stream{
Size: stream.SegmentsSize*(stream.NumberOfSegments-1) + stream.LastSegmentSize,
@ -353,7 +363,7 @@ func objectStreamFromMeta(bucket storj.Bucket, path storj.Path, lastSegment segm
EncryptedKey: streamMeta.LastSegmentMeta.EncryptedKey,
},
},
}
}, nil
}
// convertTime converts gRPC timestamp to Go time
@ -391,6 +401,7 @@ func (object *mutableObject) DeleteStream(ctx context.Context) error {
}
func (object *mutableObject) Commit(ctx context.Context) error {
// Do nothing for now - the object will be committed to PointerDB with Upload.Close
return nil
_, info, err := object.db.getInfo(ctx, committedPrefix, object.info.Bucket.Name, object.info.Path)
object.info = info
return err
}

View File

@ -373,7 +373,7 @@ func TestListObjects(t *testing.T) {
more: true,
result: []string{"a", "a/"},
}, {
options: options("", "1", storj.After, 2),
options: options("", "`", storj.After, 2),
more: true,
result: []string{"a", "a/"},
}, {
@ -487,7 +487,7 @@ func TestListObjects(t *testing.T) {
options: options("", "", storj.Backward, 0),
result: []string{"a", "a/", "aa", "b", "b/", "bb", "c"},
}, {
options: options("", "1", storj.Backward, 0),
options: options("", "`", storj.Backward, 0),
result: []string{},
}, {
options: options("", "b", storj.Backward, 0),
@ -503,7 +503,7 @@ func TestListObjects(t *testing.T) {
more: true,
result: []string{"c"},
}, {
options: options("", "1", storj.Backward, 1),
options: options("", "`", storj.Backward, 1),
result: []string{},
}, {
options: options("", "aa", storj.Backward, 1),

View File

@ -122,54 +122,35 @@ func (c Config) action(ctx context.Context, cliCtx *cli.Context, identity *provi
return Error.New("unexpected minio exit")
}
// GetBucketStore returns an implementation of buckets.Store
func (c Config) GetBucketStore(ctx context.Context, identity *provider.FullIdentity) (bs buckets.Store, err error) {
defer mon.Task()(&ctx)(&err)
buckets, _, _, _, _, err := c.init(ctx, identity)
return buckets, err
}
// GetMetainfo returns an implementation of storj.Metainfo
func (c Config) GetMetainfo(ctx context.Context, identity *provider.FullIdentity) (db storj.Metainfo, ss streams.Store, err error) {
defer mon.Task()(&ctx)(&err)
buckets, streams, segments, pdb, key, err := c.init(ctx, identity)
oc, err := overlay.NewOverlayClient(identity, c.Client.OverlayAddr)
if err != nil {
return nil, nil, err
}
return kvmetainfo.New(buckets, streams, segments, pdb, key), streams, nil
}
func (c Config) init(ctx context.Context, identity *provider.FullIdentity) (buckets.Store, streams.Store, segments.Store, pdbclient.Client, *storj.Key, error) {
var oc overlay.Client
oc, err := overlay.NewOverlayClient(identity, c.Client.OverlayAddr)
if err != nil {
return nil, nil, nil, nil, nil, err
}
pdb, err := pdbclient.NewClient(identity, c.Client.PointerDBAddr, c.Client.APIKey)
if err != nil {
return nil, nil, nil, nil, nil, err
return nil, nil, err
}
ec := ecclient.NewClient(identity, c.RS.MaxBufferMem)
fc, err := infectious.NewFEC(c.RS.MinThreshold, c.RS.MaxThreshold)
if err != nil {
return nil, nil, nil, nil, nil, err
return nil, nil, err
}
rs, err := eestream.NewRedundancyStrategy(eestream.NewRSScheme(fc, c.RS.ErasureShareSize), c.RS.RepairThreshold, c.RS.SuccessThreshold)
if err != nil {
return nil, nil, nil, nil, nil, err
return nil, nil, err
}
segments := segments.NewSegmentStore(oc, ec, pdb, rs, c.Client.MaxInlineSize)
if c.RS.ErasureShareSize*c.RS.MinThreshold%c.Enc.BlockSize != 0 {
err = Error.New("EncryptionBlockSize must be a multiple of ErasureShareSize * RS MinThreshold")
return nil, nil, nil, nil, nil, err
return nil, nil, err
}
key := new(storj.Key)
@ -177,12 +158,12 @@ func (c Config) init(ctx context.Context, identity *provider.FullIdentity) (buck
streams, err := streams.NewStreamStore(segments, c.Client.SegmentSize, key, c.Enc.BlockSize, storj.Cipher(c.Enc.DataType))
if err != nil {
return nil, nil, nil, nil, nil, err
return nil, nil, err
}
buckets := buckets.NewStore(streams)
return buckets, streams, segments, pdb, key, nil
return kvmetainfo.New(buckets, streams, segments, pdb, key), streams, nil
}
// GetRedundancyScheme returns the configured redundancy scheme for new uploads
@ -208,10 +189,10 @@ func (c Config) GetEncryptionScheme() storj.EncryptionScheme {
func (c Config) NewGateway(ctx context.Context, identity *provider.FullIdentity) (gw minio.Gateway, err error) {
defer mon.Task()(&ctx)(&err)
bs, err := c.GetBucketStore(ctx, identity)
metainfo, streams, err := c.GetMetainfo(ctx, identity)
if err != nil {
return nil, err
}
return NewStorjGateway(bs, storj.Cipher(c.Enc.PathType)), nil
return NewStorjGateway(metainfo, streams, storj.Cipher(c.Enc.PathType), c.GetEncryptionScheme(), c.GetRedundancyScheme()), nil
}

View File

@ -5,9 +5,9 @@ package miniogw
import (
"context"
"encoding/hex"
"io"
"strings"
"time"
minio "github.com/minio/minio/cmd"
"github.com/minio/minio/pkg/auth"
@ -15,192 +15,180 @@ import (
"github.com/zeebo/errs"
monkit "gopkg.in/spacemonkeygo/monkit.v2"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/ranger"
"storj.io/storj/pkg/storage/buckets"
"storj.io/storj/pkg/storage/meta"
"storj.io/storj/pkg/storage/streams"
"storj.io/storj/pkg/storj"
"storj.io/storj/pkg/stream"
"storj.io/storj/pkg/utils"
)
var (
mon = monkit.Package()
//Error is the errs class of standard End User Client errors
// Error is the errs class of standard End User Client errors
Error = errs.Class("Storj Gateway error")
)
// NewStorjGateway creates a *Storj object from an existing ObjectStore
func NewStorjGateway(bs buckets.Store, pathCipher storj.Cipher) *Storj {
return &Storj{bs: bs, pathCipher: pathCipher, multipart: NewMultipartUploads()}
func NewStorjGateway(metainfo storj.Metainfo, streams streams.Store, pathCipher storj.Cipher, encryption storj.EncryptionScheme, redundancy storj.RedundancyScheme) *Gateway {
return &Gateway{
metainfo: metainfo,
streams: streams,
pathCipher: pathCipher,
encryption: encryption,
redundancy: redundancy,
multipart: NewMultipartUploads(),
}
}
//Storj is the implementation of a minio cmd.Gateway
type Storj struct {
bs buckets.Store
// Gateway is the implementation of a minio cmd.Gateway
type Gateway struct {
metainfo storj.Metainfo
streams streams.Store
pathCipher storj.Cipher
encryption storj.EncryptionScheme
redundancy storj.RedundancyScheme
multipart *MultipartUploads
}
// Name implements cmd.Gateway
func (s *Storj) Name() string {
func (gateway *Gateway) Name() string {
return "storj"
}
// NewGatewayLayer implements cmd.Gateway
func (s *Storj) NewGatewayLayer(creds auth.Credentials) (
minio.ObjectLayer, error) {
return &storjObjects{storj: s}, nil
func (gateway *Gateway) NewGatewayLayer(creds auth.Credentials) (minio.ObjectLayer, error) {
return &gatewayLayer{gateway: gateway}, nil
}
// Production implements cmd.Gateway
func (s *Storj) Production() bool {
func (gateway *Gateway) Production() bool {
return false
}
type storjObjects struct {
type gatewayLayer struct {
minio.GatewayUnsupported
storj *Storj
gateway *Gateway
}
func (s *storjObjects) DeleteBucket(ctx context.Context, bucket string) (err error) {
func (layer *gatewayLayer) DeleteBucket(ctx context.Context, bucket string) (err error) {
defer mon.Task()(&ctx)(&err)
o, err := s.storj.bs.GetObjectStore(ctx, bucket)
list, err := layer.gateway.metainfo.ListObjects(ctx, bucket, storj.ListOptions{Direction: storj.After, Recursive: true, Limit: 1})
if err != nil {
return convertBucketNotFoundError(err, bucket)
return convertError(err, bucket, "")
}
items, _, err := o.List(ctx, "", "", "", true, 1, meta.None)
if err != nil {
return err
}
if len(items) > 0 {
if len(list.Items) > 0 {
return minio.BucketNotEmpty{Bucket: bucket}
}
err = s.storj.bs.Delete(ctx, bucket)
err = layer.gateway.metainfo.DeleteBucket(ctx, bucket)
return convertBucketNotFoundError(err, bucket)
return convertError(err, bucket, "")
}
func (s *storjObjects) DeleteObject(ctx context.Context, bucket, object string) (err error) {
func (layer *gatewayLayer) DeleteObject(ctx context.Context, bucket, object string) (err error) {
defer mon.Task()(&ctx)(&err)
o, err := s.storj.bs.GetObjectStore(ctx, bucket)
if err != nil {
return convertBucketNotFoundError(err, bucket)
}
err = layer.gateway.metainfo.DeleteObject(ctx, bucket, object)
err = o.Delete(ctx, object)
return convertObjectNotFoundError(err, bucket, object)
return convertError(err, bucket, object)
}
func (s *storjObjects) GetBucketInfo(ctx context.Context, bucket string) (bucketInfo minio.BucketInfo, err error) {
func (layer *gatewayLayer) GetBucketInfo(ctx context.Context, bucket string) (bucketInfo minio.BucketInfo, err error) {
defer mon.Task()(&ctx)(&err)
meta, err := s.storj.bs.Get(ctx, bucket)
info, err := layer.gateway.metainfo.GetBucket(ctx, bucket)
if err != nil {
return minio.BucketInfo{}, convertBucketNotFoundError(err, bucket)
return minio.BucketInfo{}, convertError(err, bucket, "")
}
return minio.BucketInfo{Name: bucket, Created: meta.Created}, nil
return minio.BucketInfo{Name: info.Name, Created: info.Created}, nil
}
func (s *storjObjects) getObject(ctx context.Context, bucket, object string) (rr ranger.Ranger, err error) {
func (layer *gatewayLayer) GetObject(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string) (err error) {
defer mon.Task()(&ctx)(&err)
o, err := s.storj.bs.GetObjectStore(ctx, bucket)
readOnlyStream, err := layer.gateway.metainfo.GetObjectStream(ctx, bucket, object)
if err != nil {
return nil, convertBucketNotFoundError(err, bucket)
return convertError(err, bucket, object)
}
rr, _, err = o.Get(ctx, object)
if startOffset < 0 || length < -1 || startOffset+length > readOnlyStream.Info().Size {
return minio.InvalidRange{
OffsetBegin: startOffset,
OffsetEnd: startOffset + length,
ResourceSize: readOnlyStream.Info().Size,
}
}
return rr, convertObjectNotFoundError(err, bucket, object)
}
download := stream.NewDownload(ctx, readOnlyStream, layer.gateway.streams)
defer utils.LogClose(download)
func (s *storjObjects) GetObject(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string) (err error) {
defer mon.Task()(&ctx)(&err)
rr, err := s.getObject(ctx, bucket, object)
_, err = download.Seek(startOffset, io.SeekStart)
if err != nil {
return err
}
if length == -1 {
length = rr.Size() - startOffset
_, err = io.Copy(writer, download)
} else {
_, err = io.CopyN(writer, download, length)
}
r, err := rr.Range(ctx, startOffset, length)
if err != nil {
return err
}
defer utils.LogClose(r)
_, err = io.Copy(writer, r)
return err
}
func (s *storjObjects) GetObjectInfo(ctx context.Context, bucket, object string) (objInfo minio.ObjectInfo, err error) {
func (layer *gatewayLayer) GetObjectInfo(ctx context.Context, bucket, object string) (objInfo minio.ObjectInfo, err error) {
defer mon.Task()(&ctx)(&err)
o, err := s.storj.bs.GetObjectStore(ctx, bucket)
obj, err := layer.gateway.metainfo.GetObject(ctx, bucket, object)
if err != nil {
return minio.ObjectInfo{}, convertBucketNotFoundError(err, bucket)
}
m, err := o.Meta(ctx, object)
if err != nil {
return objInfo, convertObjectNotFoundError(err, bucket, object)
return minio.ObjectInfo{}, convertError(err, bucket, object)
}
return minio.ObjectInfo{
Name: object,
Bucket: bucket,
ModTime: m.Modified,
Size: m.Size,
ETag: m.Checksum,
ContentType: m.ContentType,
UserDefined: m.UserDefined,
ModTime: obj.Modified,
Size: obj.Size,
ETag: hex.EncodeToString(obj.Checksum),
ContentType: obj.ContentType,
UserDefined: obj.Metadata,
}, err
}
func (s *storjObjects) ListBuckets(ctx context.Context) (bucketItems []minio.BucketInfo, err error) {
func (layer *gatewayLayer) ListBuckets(ctx context.Context) (bucketItems []minio.BucketInfo, err error) {
defer mon.Task()(&ctx)(&err)
startAfter := ""
var items []buckets.ListItem
for {
moreItems, more, err := s.storj.bs.List(ctx, startAfter, "", 0)
list, err := layer.gateway.metainfo.ListBuckets(ctx, storj.BucketListOptions{Direction: storj.After, Cursor: startAfter})
if err != nil {
return nil, err
}
items = append(items, moreItems...)
if !more {
for _, item := range list.Items {
bucketItems = append(bucketItems, minio.BucketInfo{Name: item.Name, Created: item.Created})
}
if !list.More {
break
}
startAfter = moreItems[len(moreItems)-1].Bucket
}
bucketItems = make([]minio.BucketInfo, len(items))
for i, item := range items {
bucketItems[i].Name = item.Bucket
bucketItems[i].Created = item.Meta.Created
startAfter = list.Items[len(list.Items)-1].Name
}
return bucketItems, err
}
func (s *storjObjects) ListObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (result minio.ListObjectsInfo, err error) {
func (layer *gatewayLayer) ListObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (result minio.ListObjectsInfo, err error) {
defer mon.Task()(&ctx)(&err)
if delimiter != "" && delimiter != "/" {
return minio.ListObjectsInfo{}, Error.New("delimiter %s not supported", delimiter)
return minio.ListObjectsInfo{}, minio.UnsupportedDelimiter{Delimiter: delimiter}
}
startAfter := marker
@ -208,16 +196,20 @@ func (s *storjObjects) ListObjects(ctx context.Context, bucket, prefix, marker,
var objects []minio.ObjectInfo
var prefixes []string
o, err := s.storj.bs.GetObjectStore(ctx, bucket)
list, err := layer.gateway.metainfo.ListObjects(ctx, bucket, storj.ListOptions{
Direction: storj.After,
Cursor: startAfter,
Prefix: prefix,
Recursive: recursive,
Limit: maxKeys,
})
if err != nil {
return minio.ListObjectsInfo{}, convertBucketNotFoundError(err, bucket)
return result, convertError(err, bucket, "")
}
items, more, err := o.List(ctx, prefix, startAfter, "", recursive, maxKeys, meta.All)
if err != nil {
return result, err
}
if len(items) > 0 {
for _, item := range items {
if len(list.Items) > 0 {
for _, item := range list.Items {
path := item.Path
if recursive && prefix != "" {
path = storj.JoinPaths(strings.TrimSuffix(prefix, "/"), path)
@ -230,22 +222,22 @@ func (s *storjObjects) ListObjects(ctx context.Context, bucket, prefix, marker,
Bucket: bucket,
IsDir: false,
Name: path,
ModTime: item.Meta.Modified,
Size: item.Meta.Size,
ContentType: item.Meta.ContentType,
UserDefined: item.Meta.UserDefined,
ETag: item.Meta.Checksum,
ModTime: item.Modified,
Size: item.Size,
ETag: hex.EncodeToString(item.Checksum),
ContentType: item.ContentType,
UserDefined: item.Metadata,
})
}
startAfter = items[len(items)-1].Path
startAfter = list.Items[len(list.Items)-1].Path
}
result = minio.ListObjectsInfo{
IsTruncated: more,
IsTruncated: list.More,
Objects: objects,
Prefixes: prefixes,
}
if more {
if list.More {
result.NextMarker = startAfter
}
@ -253,11 +245,11 @@ func (s *storjObjects) ListObjects(ctx context.Context, bucket, prefix, marker,
}
// ListObjectsV2 - Not implemented stub
func (s *storjObjects) ListObjectsV2(ctx context.Context, bucket, prefix, continuationToken, delimiter string, maxKeys int, fetchOwner bool, startAfter string) (result minio.ListObjectsV2Info, err error) {
func (layer *gatewayLayer) ListObjectsV2(ctx context.Context, bucket, prefix, continuationToken, delimiter string, maxKeys int, fetchOwner bool, startAfter string) (result minio.ListObjectsV2Info, err error) {
defer mon.Task()(&ctx)(&err)
if delimiter != "" && delimiter != "/" {
return minio.ListObjectsV2Info{ContinuationToken: continuationToken}, Error.New("delimiter %s not supported", delimiter)
return minio.ListObjectsV2Info{ContinuationToken: continuationToken}, minio.UnsupportedDelimiter{Delimiter: delimiter}
}
recursive := delimiter == ""
@ -273,17 +265,20 @@ func (s *storjObjects) ListObjectsV2(ctx context.Context, bucket, prefix, contin
var objects []minio.ObjectInfo
var prefixes []string
o, err := s.storj.bs.GetObjectStore(ctx, bucket)
list, err := layer.gateway.metainfo.ListObjects(ctx, bucket, storj.ListOptions{
Direction: storj.After,
Cursor: startAfterPath,
Prefix: prefix,
Recursive: recursive,
Limit: maxKeys,
})
if err != nil {
return minio.ListObjectsV2Info{ContinuationToken: continuationToken}, convertBucketNotFoundError(err, bucket)
}
items, more, err := o.List(ctx, prefix, startAfterPath, "", recursive, maxKeys, meta.All)
if err != nil {
return result, err
return minio.ListObjectsV2Info{ContinuationToken: continuationToken}, convertError(err, bucket, "")
}
if len(items) > 0 {
for _, item := range items {
if len(list.Items) > 0 {
for _, item := range list.Items {
path := item.Path
if recursive && prefix != "" {
path = storj.JoinPaths(strings.TrimSuffix(prefix, "/"), path)
@ -296,31 +291,31 @@ func (s *storjObjects) ListObjectsV2(ctx context.Context, bucket, prefix, contin
Bucket: bucket,
IsDir: false,
Name: path,
ModTime: item.Meta.Modified,
Size: item.Meta.Size,
ContentType: item.Meta.ContentType,
UserDefined: item.Meta.UserDefined,
ETag: item.Meta.Checksum,
ModTime: item.Modified,
Size: item.Size,
ETag: hex.EncodeToString(item.Checksum),
ContentType: item.ContentType,
UserDefined: item.Metadata,
})
}
nextContinuationToken = items[len(items)-1].Path + "\x00"
nextContinuationToken = list.Items[len(list.Items)-1].Path + "\x00"
}
result = minio.ListObjectsV2Info{
IsTruncated: more,
IsTruncated: list.More,
ContinuationToken: continuationToken,
Objects: objects,
Prefixes: prefixes,
}
if more {
if list.More {
result.NextContinuationToken = nextContinuationToken
}
return result, err
}
func (s *storjObjects) MakeBucketWithLocation(ctx context.Context, bucket string, location string) (err error) {
func (layer *gatewayLayer) MakeBucketWithLocation(ctx context.Context, bucket string, location string) (err error) {
defer mon.Task()(&ctx)(&err)
// TODO: This current strategy of calling bs.Get
// to check if a bucket exists, then calling bs.Put
@ -329,94 +324,132 @@ func (s *storjObjects) MakeBucketWithLocation(ctx context.Context, bucket string
// therefore try to Put a bucket at the same time.
// The reason for the Get call to check if the
// bucket already exists is to match S3 CLI behavior.
_, err = s.storj.bs.Get(ctx, bucket)
_, err = layer.gateway.metainfo.GetBucket(ctx, bucket)
if err == nil {
return minio.BucketAlreadyExists{Bucket: bucket}
}
if !storj.ErrBucketNotFound.Has(err) {
return err
return convertError(err, bucket, "")
}
_, err = s.storj.bs.Put(ctx, bucket, s.storj.pathCipher)
_, err = layer.gateway.metainfo.CreateBucket(ctx, bucket, &storj.Bucket{PathCipher: layer.gateway.pathCipher})
return err
}
func (s *storjObjects) CopyObject(ctx context.Context, srcBucket, srcObject, destBucket, destObject string, srcInfo minio.ObjectInfo) (objInfo minio.ObjectInfo, err error) {
func (layer *gatewayLayer) CopyObject(ctx context.Context, srcBucket, srcObject, destBucket, destObject string, srcInfo minio.ObjectInfo) (objInfo minio.ObjectInfo, err error) {
defer mon.Task()(&ctx)(&err)
rr, err := s.getObject(ctx, srcBucket, srcObject)
readOnlyStream, err := layer.gateway.metainfo.GetObjectStream(ctx, srcBucket, srcObject)
if err != nil {
return objInfo, err
return minio.ObjectInfo{}, convertError(err, srcBucket, srcObject)
}
r, err := rr.Range(ctx, 0, rr.Size())
if err != nil {
return objInfo, err
download := stream.NewDownload(ctx, readOnlyStream, layer.gateway.streams)
defer utils.LogClose(download)
info := readOnlyStream.Info()
createInfo := storj.CreateObject{
ContentType: info.ContentType,
Expires: info.Expires,
Metadata: info.Metadata,
RedundancyScheme: info.RedundancyScheme,
EncryptionScheme: info.EncryptionScheme,
}
defer utils.LogClose(r)
serMetaInfo := pb.SerializableMeta{
ContentType: srcInfo.ContentType,
UserDefined: srcInfo.UserDefined,
}
return s.putObject(ctx, destBucket, destObject, r, serMetaInfo)
return layer.putObject(ctx, destBucket, destObject, download, &createInfo)
}
func (s *storjObjects) putObject(ctx context.Context, bucket, object string, r io.Reader, meta pb.SerializableMeta) (objInfo minio.ObjectInfo, err error) {
func (layer *gatewayLayer) putObject(ctx context.Context, bucket, object string, reader io.Reader, createInfo *storj.CreateObject) (objInfo minio.ObjectInfo, err error) {
defer mon.Task()(&ctx)(&err)
// setting zero value means the object never expires
expTime := time.Time{}
o, err := s.storj.bs.GetObjectStore(ctx, bucket)
mutableObject, err := layer.gateway.metainfo.CreateObject(ctx, bucket, object, createInfo)
if err != nil {
return minio.ObjectInfo{}, convertBucketNotFoundError(err, bucket)
return minio.ObjectInfo{}, convertError(err, bucket, object)
}
m, err := o.Put(ctx, object, r, meta, expTime)
err = upload(ctx, layer.gateway.streams, mutableObject, reader)
if err != nil {
return minio.ObjectInfo{}, err
}
err = mutableObject.Commit(ctx)
if err != nil {
return minio.ObjectInfo{}, err
}
info := mutableObject.Info()
return minio.ObjectInfo{
Name: object,
Bucket: bucket,
ModTime: m.Modified,
Size: m.Size,
ETag: m.Checksum,
ContentType: m.ContentType,
UserDefined: m.UserDefined,
}, err
ModTime: info.Modified,
Size: info.Size,
ETag: hex.EncodeToString(info.Checksum),
ContentType: info.ContentType,
UserDefined: info.Metadata,
}, nil
}
func (s *storjObjects) PutObject(ctx context.Context, bucket, object string, data *hash.Reader, metadata map[string]string) (objInfo minio.ObjectInfo, err error) {
defer mon.Task()(&ctx)(&err)
tempContType := metadata["content-type"]
delete(metadata, "content-type")
//metadata serialized
serMetaInfo := pb.SerializableMeta{
ContentType: tempContType,
UserDefined: metadata,
func upload(ctx context.Context, streams streams.Store, mutableObject storj.MutableObject, reader io.Reader) error {
mutableStream, err := mutableObject.CreateStream(ctx)
if err != nil {
return err
}
return s.putObject(ctx, bucket, object, data, serMetaInfo)
upload := stream.NewUpload(ctx, mutableStream, streams)
defer utils.LogClose(upload)
_, err = io.Copy(upload, reader)
if err != nil {
return err
}
return err
}
func (s *storjObjects) Shutdown(ctx context.Context) (err error) {
func (layer *gatewayLayer) PutObject(ctx context.Context, bucket, object string, data *hash.Reader, metadata map[string]string) (objInfo minio.ObjectInfo, err error) {
defer mon.Task()(&ctx)(&err)
contentType := metadata["content-type"]
delete(metadata, "content-type")
createInfo := storj.CreateObject{
ContentType: contentType,
Metadata: metadata,
RedundancyScheme: layer.gateway.redundancy,
EncryptionScheme: layer.gateway.encryption,
}
return layer.putObject(ctx, bucket, object, data, &createInfo)
}
func (layer *gatewayLayer) Shutdown(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
return nil
}
func (s *storjObjects) StorageInfo(context.Context) minio.StorageInfo {
func (layer *gatewayLayer) StorageInfo(context.Context) minio.StorageInfo {
return minio.StorageInfo{}
}
func convertBucketNotFoundError(err error, bucket string) error {
func convertError(err error, bucket, object string) error {
if storj.ErrNoBucket.Has(err) {
return minio.BucketNameInvalid{Bucket: bucket}
}
if storj.ErrBucketNotFound.Has(err) {
return minio.BucketNotFound{Bucket: bucket}
}
return err
}
func convertObjectNotFoundError(err error, bucket, object string) error {
if storj.ErrNoPath.Has(err) {
return minio.ObjectNameInvalid{Bucket: bucket, Object: object}
}
if storj.ErrObjectNotFound.Has(err) {
return minio.ObjectNotFound{Bucket: bucket, Object: object}
}
return err
}

File diff suppressed because it is too large Load Diff

View File

@ -15,64 +15,47 @@ import (
minio "github.com/minio/minio/cmd"
"github.com/minio/minio/pkg/hash"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/storj"
)
func (s *storjObjects) NewMultipartUpload(ctx context.Context, bucket, object string, metadata map[string]string) (uploadID string, err error) {
func (layer *gatewayLayer) NewMultipartUpload(ctx context.Context, bucket, object string, metadata map[string]string) (uploadID string, err error) {
defer mon.Task()(&ctx)(&err)
uploads := s.storj.multipart
uploads := layer.gateway.multipart
upload, err := uploads.Create(bucket, object, metadata)
if err != nil {
return "", err
}
objectStore, err := s.storj.bs.GetObjectStore(ctx, bucket)
if err != nil {
uploads.RemoveByID(upload.ID)
upload.fail(err)
return "", err
}
go func() {
// setting zero value means the object never expires
expTime := time.Time{}
tempContType := metadata["content-type"]
contentType := metadata["content-type"]
delete(metadata, "content-type")
// metadata serialized
serMetaInfo := pb.SerializableMeta{
ContentType: tempContType,
UserDefined: metadata,
createInfo := storj.CreateObject{
ContentType: contentType,
Metadata: metadata,
RedundancyScheme: layer.gateway.redundancy,
EncryptionScheme: layer.gateway.encryption,
}
objInfo, err := layer.putObject(ctx, bucket, object, upload.Stream, &createInfo)
result, err := objectStore.Put(ctx, object, upload.Stream, serMetaInfo, expTime)
uploads.RemoveByID(upload.ID)
if err != nil {
upload.fail(err)
} else {
upload.complete(minio.ObjectInfo{
Name: object,
Bucket: bucket,
ModTime: result.Modified,
Size: result.Size,
ETag: result.Checksum,
ContentType: result.ContentType,
UserDefined: result.UserDefined,
})
upload.complete(objInfo)
}
}()
return upload.ID, nil
}
func (s *storjObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID string, partID int, data *hash.Reader) (info minio.PartInfo, err error) {
func (layer *gatewayLayer) PutObjectPart(ctx context.Context, bucket, object, uploadID string, partID int, data *hash.Reader) (info minio.PartInfo, err error) {
defer mon.Task()(&ctx)(&err)
uploads := s.storj.multipart
uploads := layer.gateway.multipart
upload, err := uploads.Get(bucket, object, uploadID)
if err != nil {
@ -101,10 +84,10 @@ func (s *storjObjects) PutObjectPart(ctx context.Context, bucket, object, upload
return partInfo, nil
}
func (s *storjObjects) AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string) (err error) {
func (layer *gatewayLayer) AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string) (err error) {
defer mon.Task()(&ctx)(&err)
uploads := s.storj.multipart
uploads := layer.gateway.multipart
upload, err := uploads.Remove(bucket, object, uploadID)
if err != nil {
@ -120,10 +103,10 @@ func (s *storjObjects) AbortMultipartUpload(ctx context.Context, bucket, object,
return nil
}
func (s *storjObjects) CompleteMultipartUpload(ctx context.Context, bucket, object, uploadID string, uploadedParts []minio.CompletePart) (objInfo minio.ObjectInfo, err error) {
func (layer *gatewayLayer) CompleteMultipartUpload(ctx context.Context, bucket, object, uploadID string, uploadedParts []minio.CompletePart) (objInfo minio.ObjectInfo, err error) {
defer mon.Task()(&ctx)(&err)
uploads := s.storj.multipart
uploads := layer.gateway.multipart
upload, err := uploads.Remove(bucket, object, uploadID)
if err != nil {
return minio.ObjectInfo{}, err
@ -137,10 +120,10 @@ func (s *storjObjects) CompleteMultipartUpload(ctx context.Context, bucket, obje
return result.Info, result.Error
}
func (s *storjObjects) ListObjectParts(ctx context.Context, bucket, object, uploadID string, partNumberMarker int, maxParts int) (result minio.ListPartsInfo, err error) {
func (layer *gatewayLayer) ListObjectParts(ctx context.Context, bucket, object, uploadID string, partNumberMarker int, maxParts int) (result minio.ListPartsInfo, err error) {
defer mon.Task()(&ctx)(&err)
uploads := s.storj.multipart
uploads := layer.gateway.multipart
upload, err := uploads.Get(bucket, object, uploadID)
if err != nil {
return minio.ListPartsInfo{}, err
@ -179,8 +162,8 @@ func (s *storjObjects) ListObjectParts(ctx context.Context, bucket, object, uplo
}
// TODO: implement
// func (s *storjObjects) ListMultipartUploads(ctx context.Context, bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (result minio.ListMultipartsInfo, err error) {
// func (s *storjObjects) CopyObjectPart(ctx context.Context, srcBucket, srcObject, destBucket, destObject string, uploadID string, partID int, startOffset int64, length int64, srcInfo minio.ObjectInfo) (info minio.PartInfo, err error) {
// func (layer *gatewayLayer) ListMultipartUploads(ctx context.Context, bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (result minio.ListMultipartsInfo, err error) {
// func (layer *gatewayLayer) CopyObjectPart(ctx context.Context, srcBucket, srcObject, destBucket, destObject string, uploadID string, partID int, startOffset int64, length int64, srcInfo minio.ObjectInfo) (info minio.PartInfo, err error) {
// MultipartUploads manages pending multipart uploads
type MultipartUploads struct {

View File

@ -45,7 +45,7 @@ type Metainfo interface {
// CreateObject has optional parameters that can be set
type CreateObject struct {
Metadata []byte
Metadata map[string]string
ContentType string
Expires time.Time

View File

@ -37,7 +37,7 @@ type Object struct {
Path Path
IsPrefix bool
Metadata []byte
Metadata map[string]string
ContentType string
Created time.Time

View File

@ -7,8 +7,10 @@ import (
"context"
"io"
"github.com/gogo/protobuf/proto"
"golang.org/x/sync/errgroup"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/storage/streams"
"storj.io/storj/pkg/storj"
"storj.io/storj/pkg/utils"
@ -37,11 +39,22 @@ func NewUpload(ctx context.Context, stream storj.MutableStream, streams streams.
upload.errgroup.Go(func() error {
obj := stream.Info()
_, err := streams.Put(ctx, storj.JoinPaths(obj.Bucket.Name, obj.Path), obj.Bucket.PathCipher, reader, obj.Metadata, obj.Expires)
if err != nil {
err = utils.CombineErrors(err, reader.CloseWithError(err))
serMetaInfo := pb.SerializableMeta{
ContentType: obj.ContentType,
UserDefined: obj.Metadata,
}
return err
metadata, err := proto.Marshal(&serMetaInfo)
if err != nil {
return utils.CombineErrors(err, reader.CloseWithError(err))
}
_, err = streams.Put(ctx, storj.JoinPaths(obj.Bucket.Name, obj.Path), obj.Bucket.PathCipher, reader, metadata, obj.Expires)
if err != nil {
return utils.CombineErrors(err, reader.CloseWithError(err))
}
return nil
})
return &upload