Migrate Minio Gateway to libuplink (#1652)
This commit is contained in:
parent
1b8976ab9a
commit
d6850f8691
@ -19,6 +19,7 @@ import (
|
||||
"go.uber.org/zap"
|
||||
|
||||
"storj.io/storj/internal/fpath"
|
||||
libuplink "storj.io/storj/lib/uplink"
|
||||
"storj.io/storj/pkg/cfgstruct"
|
||||
"storj.io/storj/pkg/identity"
|
||||
"storj.io/storj/pkg/miniogw"
|
||||
@ -216,18 +217,44 @@ func (flags GatewayFlags) action(ctx context.Context, cliCtx *cli.Context, ident
|
||||
}
|
||||
|
||||
// NewGateway creates a new minio Gateway
|
||||
func (flags GatewayFlags) NewGateway(ctx context.Context, identity *identity.FullIdentity) (gw minio.Gateway, err error) {
|
||||
metainfo, streams, err := flags.GetMetainfo(ctx, identity)
|
||||
func (flags GatewayFlags) NewGateway(ctx context.Context, ident *identity.FullIdentity) (gw minio.Gateway, err error) {
|
||||
cfg := libuplink.Config{}
|
||||
cfg.Volatile.TLS = struct {
|
||||
SkipPeerCAWhitelist bool
|
||||
PeerCAWhitelistPath string
|
||||
}{
|
||||
SkipPeerCAWhitelist: !flags.TLS.UsePeerCAWhitelist,
|
||||
PeerCAWhitelistPath: flags.TLS.PeerCAWhitelistPath,
|
||||
}
|
||||
cfg.Volatile.UseIdentity = ident
|
||||
cfg.Volatile.MaxInlineSize = flags.Client.MaxInlineSize
|
||||
cfg.Volatile.MaxMemory = flags.RS.MaxBufferMem
|
||||
|
||||
uplink, err := libuplink.NewUplink(ctx, &cfg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
apiKey, err := libuplink.ParseAPIKey(flags.Client.APIKey)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
encKey := new(storj.Key)
|
||||
copy(encKey[:], flags.Enc.Key)
|
||||
|
||||
project, err := uplink.OpenProject(ctx, flags.Client.SatelliteAddr, encKey, apiKey)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return miniogw.NewStorjGateway(
|
||||
metainfo,
|
||||
streams,
|
||||
storj.Cipher(flags.Enc.PathType),
|
||||
flags.GetEncryptionScheme(),
|
||||
project,
|
||||
encKey,
|
||||
storj.Cipher(flags.Enc.PathType).ToCipherSuite(),
|
||||
flags.GetEncryptionScheme().ToEncryptionParameters(),
|
||||
flags.GetRedundancyScheme(),
|
||||
flags.Client.SegmentSize,
|
||||
), nil
|
||||
}
|
||||
|
||||
|
@ -288,6 +288,8 @@ func newNetwork(flags *Flags) (*Processes, error) {
|
||||
|
||||
"--satellite-addr", satellite.Address,
|
||||
|
||||
"--enc.key=TestEncryptionKey",
|
||||
|
||||
"--rs.min-threshold", strconv.Itoa(1 * flags.StorageNodeCount / 5),
|
||||
"--rs.repair-threshold", strconv.Itoa(2 * flags.StorageNodeCount / 5),
|
||||
"--rs.success-threshold", strconv.Itoa(3 * flags.StorageNodeCount / 5),
|
||||
|
@ -15,6 +15,8 @@ import (
|
||||
"github.com/zeebo/errs"
|
||||
monkit "gopkg.in/spacemonkeygo/monkit.v2"
|
||||
|
||||
"storj.io/storj/internal/memory"
|
||||
"storj.io/storj/lib/uplink"
|
||||
"storj.io/storj/pkg/storage/streams"
|
||||
"storj.io/storj/pkg/storj"
|
||||
"storj.io/storj/pkg/stream"
|
||||
@ -28,25 +30,27 @@ var (
|
||||
)
|
||||
|
||||
// NewStorjGateway creates a *Storj object from an existing ObjectStore
|
||||
func NewStorjGateway(metainfo storj.Metainfo, streams streams.Store, pathCipher storj.Cipher, encryption storj.EncryptionScheme, redundancy storj.RedundancyScheme) *Gateway {
|
||||
func NewStorjGateway(project *uplink.Project, rootEncKey *storj.Key, pathCipher storj.CipherSuite, encryption storj.EncryptionParameters, redundancy storj.RedundancyScheme, segmentSize memory.Size) *Gateway {
|
||||
return &Gateway{
|
||||
metainfo: metainfo,
|
||||
streams: streams,
|
||||
pathCipher: pathCipher,
|
||||
encryption: encryption,
|
||||
redundancy: redundancy,
|
||||
multipart: NewMultipartUploads(),
|
||||
project: project,
|
||||
rootEncKey: rootEncKey,
|
||||
pathCipher: pathCipher,
|
||||
encryption: encryption,
|
||||
redundancy: redundancy,
|
||||
segmentSize: segmentSize,
|
||||
multipart: NewMultipartUploads(),
|
||||
}
|
||||
}
|
||||
|
||||
// Gateway is the implementation of a minio cmd.Gateway
|
||||
type Gateway struct {
|
||||
metainfo storj.Metainfo
|
||||
streams streams.Store
|
||||
pathCipher storj.Cipher
|
||||
encryption storj.EncryptionScheme
|
||||
redundancy storj.RedundancyScheme
|
||||
multipart *MultipartUploads
|
||||
project *uplink.Project
|
||||
rootEncKey *storj.Key
|
||||
pathCipher storj.CipherSuite
|
||||
encryption storj.EncryptionParameters
|
||||
redundancy storj.RedundancyScheme
|
||||
segmentSize memory.Size
|
||||
multipart *MultipartUploads
|
||||
}
|
||||
|
||||
// Name implements cmd.Gateway
|
||||
@ -69,92 +73,121 @@ type gatewayLayer struct {
|
||||
gateway *Gateway
|
||||
}
|
||||
|
||||
func (layer *gatewayLayer) DeleteBucket(ctx context.Context, bucket string) (err error) {
|
||||
func (layer *gatewayLayer) DeleteBucket(ctx context.Context, bucketName string) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
list, err := layer.gateway.metainfo.ListObjects(ctx, bucket, storj.ListOptions{Direction: storj.After, Recursive: true, Limit: 1})
|
||||
empty, err := layer.bucketEmpty(ctx, bucketName)
|
||||
if err != nil {
|
||||
return convertError(err, bucket, "")
|
||||
return convertError(err, bucketName, "")
|
||||
}
|
||||
|
||||
if len(list.Items) > 0 {
|
||||
return minio.BucketNotEmpty{Bucket: bucket}
|
||||
if !empty {
|
||||
return minio.BucketNotEmpty{Bucket: bucketName}
|
||||
}
|
||||
|
||||
err = layer.gateway.metainfo.DeleteBucket(ctx, bucket)
|
||||
err = layer.gateway.project.DeleteBucket(ctx, bucketName)
|
||||
|
||||
return convertError(err, bucket, "")
|
||||
return convertError(err, bucketName, "")
|
||||
}
|
||||
|
||||
func (layer *gatewayLayer) DeleteObject(ctx context.Context, bucket, object string) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
err = layer.gateway.metainfo.DeleteObject(ctx, bucket, object)
|
||||
|
||||
return convertError(err, bucket, object)
|
||||
}
|
||||
|
||||
func (layer *gatewayLayer) GetBucketInfo(ctx context.Context, bucket string) (bucketInfo minio.BucketInfo, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
info, err := layer.gateway.metainfo.GetBucket(ctx, bucket)
|
||||
|
||||
func (layer *gatewayLayer) bucketEmpty(ctx context.Context, bucketName string) (empty bool, err error) {
|
||||
bucket, err := layer.gateway.project.OpenBucket(ctx, bucketName, &uplink.EncryptionAccess{Key: *layer.gateway.rootEncKey})
|
||||
if err != nil {
|
||||
return minio.BucketInfo{}, convertError(err, bucket, "")
|
||||
return false, convertError(err, bucketName, "")
|
||||
}
|
||||
defer func() { err = errs.Combine(err, bucket.Close()) }()
|
||||
|
||||
list, err := bucket.ListObjects(ctx, &storj.ListOptions{Direction: storj.After, Recursive: true, Limit: 1})
|
||||
if err != nil {
|
||||
return false, convertError(err, bucketName, "")
|
||||
}
|
||||
|
||||
return minio.BucketInfo{Name: info.Name, Created: info.Created}, nil
|
||||
return len(list.Items) == 0, nil
|
||||
}
|
||||
|
||||
func (layer *gatewayLayer) GetObject(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string) (err error) {
|
||||
func (layer *gatewayLayer) DeleteObject(ctx context.Context, bucketName, objectPath string) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
readOnlyStream, err := layer.gateway.metainfo.GetObjectStream(ctx, bucket, object)
|
||||
bucket, err := layer.gateway.project.OpenBucket(ctx, bucketName, &uplink.EncryptionAccess{Key: *layer.gateway.rootEncKey})
|
||||
if err != nil {
|
||||
return convertError(err, bucket, object)
|
||||
return convertError(err, bucketName, "")
|
||||
}
|
||||
defer func() { err = errs.Combine(err, bucket.Close()) }()
|
||||
|
||||
err = bucket.DeleteObject(ctx, objectPath)
|
||||
|
||||
return convertError(err, bucketName, objectPath)
|
||||
}
|
||||
|
||||
func (layer *gatewayLayer) GetBucketInfo(ctx context.Context, bucketName string) (bucketInfo minio.BucketInfo, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
bucket, _, err := layer.gateway.project.GetBucketInfo(ctx, bucketName)
|
||||
|
||||
if err != nil {
|
||||
return minio.BucketInfo{}, convertError(err, bucketName, "")
|
||||
}
|
||||
|
||||
if startOffset < 0 || length < -1 || startOffset+length > readOnlyStream.Info().Size {
|
||||
return minio.BucketInfo{Name: bucket.Name, Created: bucket.Created}, nil
|
||||
}
|
||||
|
||||
func (layer *gatewayLayer) GetObject(ctx context.Context, bucketName, objectPath string, startOffset int64, length int64, writer io.Writer, etag string) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
bucket, err := layer.gateway.project.OpenBucket(ctx, bucketName, &uplink.EncryptionAccess{Key: *layer.gateway.rootEncKey})
|
||||
if err != nil {
|
||||
return convertError(err, bucketName, "")
|
||||
}
|
||||
defer func() { err = errs.Combine(err, bucket.Close()) }()
|
||||
|
||||
object, err := bucket.OpenObject(ctx, objectPath)
|
||||
if err != nil {
|
||||
return convertError(err, bucketName, objectPath)
|
||||
}
|
||||
defer func() { err = errs.Combine(err, object.Close()) }()
|
||||
|
||||
if startOffset < 0 || length < -1 || startOffset+length > object.Meta.Size {
|
||||
return minio.InvalidRange{
|
||||
OffsetBegin: startOffset,
|
||||
OffsetEnd: startOffset + length,
|
||||
ResourceSize: readOnlyStream.Info().Size,
|
||||
ResourceSize: object.Meta.Size,
|
||||
}
|
||||
}
|
||||
|
||||
download := stream.NewDownload(ctx, readOnlyStream, layer.gateway.streams)
|
||||
defer func() { err = errs.Combine(err, download.Close()) }()
|
||||
|
||||
_, err = download.Seek(startOffset, io.SeekStart)
|
||||
reader, err := object.DownloadRange(ctx, startOffset, length)
|
||||
if err != nil {
|
||||
return err
|
||||
return convertError(err, bucketName, objectPath)
|
||||
}
|
||||
defer func() { err = errs.Combine(err, reader.Close()) }()
|
||||
|
||||
if length == -1 {
|
||||
_, err = io.Copy(writer, download)
|
||||
} else {
|
||||
_, err = io.CopyN(writer, download, length)
|
||||
}
|
||||
_, err = io.Copy(writer, reader)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (layer *gatewayLayer) GetObjectInfo(ctx context.Context, bucket, object string) (objInfo minio.ObjectInfo, err error) {
|
||||
func (layer *gatewayLayer) GetObjectInfo(ctx context.Context, bucketName, objectPath string) (objInfo minio.ObjectInfo, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
obj, err := layer.gateway.metainfo.GetObject(ctx, bucket, object)
|
||||
bucket, err := layer.gateway.project.OpenBucket(ctx, bucketName, &uplink.EncryptionAccess{Key: *layer.gateway.rootEncKey})
|
||||
if err != nil {
|
||||
return minio.ObjectInfo{}, convertError(err, bucket, object)
|
||||
return minio.ObjectInfo{}, convertError(err, bucketName, "")
|
||||
}
|
||||
defer func() { err = errs.Combine(err, bucket.Close()) }()
|
||||
|
||||
object, err := bucket.OpenObject(ctx, objectPath)
|
||||
if err != nil {
|
||||
return minio.ObjectInfo{}, convertError(err, bucketName, objectPath)
|
||||
}
|
||||
defer func() { err = errs.Combine(err, object.Close()) }()
|
||||
|
||||
return minio.ObjectInfo{
|
||||
Name: object,
|
||||
Bucket: bucket,
|
||||
ModTime: obj.Modified,
|
||||
Size: obj.Size,
|
||||
ETag: hex.EncodeToString(obj.Checksum),
|
||||
ContentType: obj.ContentType,
|
||||
UserDefined: obj.Metadata,
|
||||
Name: object.Meta.Path,
|
||||
Bucket: object.Meta.Bucket,
|
||||
ModTime: object.Meta.Modified,
|
||||
Size: object.Meta.Size,
|
||||
ETag: hex.EncodeToString(object.Meta.Checksum),
|
||||
ContentType: object.Meta.ContentType,
|
||||
UserDefined: object.Meta.Metadata,
|
||||
}, err
|
||||
}
|
||||
|
||||
@ -164,7 +197,7 @@ func (layer *gatewayLayer) ListBuckets(ctx context.Context) (bucketItems []minio
|
||||
startAfter := ""
|
||||
|
||||
for {
|
||||
list, err := layer.gateway.metainfo.ListBuckets(ctx, storj.BucketListOptions{Direction: storj.After, Cursor: startAfter})
|
||||
list, err := layer.gateway.project.ListBuckets(ctx, &storj.BucketListOptions{Direction: storj.After, Cursor: startAfter})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -183,20 +216,26 @@ func (layer *gatewayLayer) ListBuckets(ctx context.Context) (bucketItems []minio
|
||||
return bucketItems, err
|
||||
}
|
||||
|
||||
func (layer *gatewayLayer) ListObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (result minio.ListObjectsInfo, err error) {
|
||||
func (layer *gatewayLayer) ListObjects(ctx context.Context, bucketName, prefix, marker, delimiter string, maxKeys int) (result minio.ListObjectsInfo, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
if delimiter != "" && delimiter != "/" {
|
||||
return minio.ListObjectsInfo{}, minio.UnsupportedDelimiter{Delimiter: delimiter}
|
||||
}
|
||||
|
||||
bucket, err := layer.gateway.project.OpenBucket(ctx, bucketName, &uplink.EncryptionAccess{Key: *layer.gateway.rootEncKey})
|
||||
if err != nil {
|
||||
return minio.ListObjectsInfo{}, convertError(err, bucketName, "")
|
||||
}
|
||||
defer func() { err = errs.Combine(err, bucket.Close()) }()
|
||||
|
||||
startAfter := marker
|
||||
recursive := delimiter == ""
|
||||
|
||||
var objects []minio.ObjectInfo
|
||||
var prefixes []string
|
||||
|
||||
list, err := layer.gateway.metainfo.ListObjects(ctx, bucket, storj.ListOptions{
|
||||
list, err := bucket.ListObjects(ctx, &storj.ListOptions{
|
||||
Direction: storj.After,
|
||||
Cursor: startAfter,
|
||||
Prefix: prefix,
|
||||
@ -204,7 +243,7 @@ func (layer *gatewayLayer) ListObjects(ctx context.Context, bucket, prefix, mark
|
||||
Limit: maxKeys,
|
||||
})
|
||||
if err != nil {
|
||||
return result, convertError(err, bucket, "")
|
||||
return result, convertError(err, bucketName, "")
|
||||
}
|
||||
|
||||
if len(list.Items) > 0 {
|
||||
@ -218,9 +257,8 @@ func (layer *gatewayLayer) ListObjects(ctx context.Context, bucket, prefix, mark
|
||||
continue
|
||||
}
|
||||
objects = append(objects, minio.ObjectInfo{
|
||||
Bucket: bucket,
|
||||
IsDir: false,
|
||||
Name: path,
|
||||
Bucket: item.Bucket.Name,
|
||||
ModTime: item.Modified,
|
||||
Size: item.Size,
|
||||
ETag: hex.EncodeToString(item.Checksum),
|
||||
@ -244,13 +282,19 @@ func (layer *gatewayLayer) ListObjects(ctx context.Context, bucket, prefix, mark
|
||||
}
|
||||
|
||||
// ListObjectsV2 - Not implemented stub
|
||||
func (layer *gatewayLayer) ListObjectsV2(ctx context.Context, bucket, prefix, continuationToken, delimiter string, maxKeys int, fetchOwner bool, startAfter string) (result minio.ListObjectsV2Info, err error) {
|
||||
func (layer *gatewayLayer) ListObjectsV2(ctx context.Context, bucketName, prefix, continuationToken, delimiter string, maxKeys int, fetchOwner bool, startAfter string) (result minio.ListObjectsV2Info, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
if delimiter != "" && delimiter != "/" {
|
||||
return minio.ListObjectsV2Info{ContinuationToken: continuationToken}, minio.UnsupportedDelimiter{Delimiter: delimiter}
|
||||
}
|
||||
|
||||
bucket, err := layer.gateway.project.OpenBucket(ctx, bucketName, &uplink.EncryptionAccess{Key: *layer.gateway.rootEncKey})
|
||||
if err != nil {
|
||||
return minio.ListObjectsV2Info{}, convertError(err, bucketName, "")
|
||||
}
|
||||
defer func() { err = errs.Combine(err, bucket.Close()) }()
|
||||
|
||||
recursive := delimiter == ""
|
||||
var nextContinuationToken string
|
||||
|
||||
@ -265,7 +309,7 @@ func (layer *gatewayLayer) ListObjectsV2(ctx context.Context, bucket, prefix, co
|
||||
var objects []minio.ObjectInfo
|
||||
var prefixes []string
|
||||
|
||||
list, err := layer.gateway.metainfo.ListObjects(ctx, bucket, storj.ListOptions{
|
||||
list, err := bucket.ListObjects(ctx, &storj.ListOptions{
|
||||
Direction: storj.After,
|
||||
Cursor: startAfterPath,
|
||||
Prefix: prefix,
|
||||
@ -273,7 +317,7 @@ func (layer *gatewayLayer) ListObjectsV2(ctx context.Context, bucket, prefix, co
|
||||
Limit: maxKeys,
|
||||
})
|
||||
if err != nil {
|
||||
return minio.ListObjectsV2Info{ContinuationToken: continuationToken}, convertError(err, bucket, "")
|
||||
return minio.ListObjectsV2Info{ContinuationToken: continuationToken}, convertError(err, bucketName, "")
|
||||
}
|
||||
|
||||
if len(list.Items) > 0 {
|
||||
@ -287,9 +331,8 @@ func (layer *gatewayLayer) ListObjectsV2(ctx context.Context, bucket, prefix, co
|
||||
continue
|
||||
}
|
||||
objects = append(objects, minio.ObjectInfo{
|
||||
Bucket: bucket,
|
||||
IsDir: false,
|
||||
Name: path,
|
||||
Bucket: item.Bucket.Name,
|
||||
ModTime: item.Modified,
|
||||
Size: item.Size,
|
||||
ETag: hex.EncodeToString(item.Checksum),
|
||||
@ -314,7 +357,7 @@ func (layer *gatewayLayer) ListObjectsV2(ctx context.Context, bucket, prefix, co
|
||||
return result, err
|
||||
}
|
||||
|
||||
func (layer *gatewayLayer) MakeBucketWithLocation(ctx context.Context, bucket string, location string) (err error) {
|
||||
func (layer *gatewayLayer) MakeBucketWithLocation(ctx context.Context, bucketName string, location string) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
// TODO: This current strategy of calling bs.Get
|
||||
// to check if a bucket exists, then calling bs.Put
|
||||
@ -323,16 +366,23 @@ func (layer *gatewayLayer) MakeBucketWithLocation(ctx context.Context, bucket st
|
||||
// therefore try to Put a bucket at the same time.
|
||||
// The reason for the Get call to check if the
|
||||
// bucket already exists is to match S3 CLI behavior.
|
||||
_, err = layer.gateway.metainfo.GetBucket(ctx, bucket)
|
||||
_, _, err = layer.gateway.project.GetBucketInfo(ctx, bucketName)
|
||||
if err == nil {
|
||||
return minio.BucketAlreadyExists{Bucket: bucket}
|
||||
return minio.BucketAlreadyExists{Bucket: bucketName}
|
||||
}
|
||||
|
||||
if !storj.ErrBucketNotFound.Has(err) {
|
||||
return convertError(err, bucket, "")
|
||||
return convertError(err, bucketName, "")
|
||||
}
|
||||
|
||||
_, err = layer.gateway.metainfo.CreateBucket(ctx, bucket, &storj.Bucket{PathCipher: layer.gateway.pathCipher})
|
||||
cfg := uplink.BucketConfig{
|
||||
PathCipher: layer.gateway.pathCipher,
|
||||
EncryptionParameters: layer.gateway.encryption,
|
||||
}
|
||||
cfg.Volatile.RedundancyScheme = layer.gateway.redundancy
|
||||
cfg.Volatile.SegmentsSize = layer.gateway.segmentSize
|
||||
|
||||
_, err = layer.gateway.project.CreateBucket(ctx, bucketName, &cfg)
|
||||
|
||||
return err
|
||||
}
|
||||
@ -340,54 +390,63 @@ func (layer *gatewayLayer) MakeBucketWithLocation(ctx context.Context, bucket st
|
||||
func (layer *gatewayLayer) CopyObject(ctx context.Context, srcBucket, srcObject, destBucket, destObject string, srcInfo minio.ObjectInfo) (objInfo minio.ObjectInfo, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
readOnlyStream, err := layer.gateway.metainfo.GetObjectStream(ctx, srcBucket, srcObject)
|
||||
bucket, err := layer.gateway.project.OpenBucket(ctx, srcBucket, &uplink.EncryptionAccess{Key: *layer.gateway.rootEncKey})
|
||||
if err != nil {
|
||||
return minio.ObjectInfo{}, convertError(err, srcBucket, "")
|
||||
}
|
||||
defer func() { err = errs.Combine(err, bucket.Close()) }()
|
||||
|
||||
object, err := bucket.OpenObject(ctx, srcObject)
|
||||
if err != nil {
|
||||
return minio.ObjectInfo{}, convertError(err, srcBucket, srcObject)
|
||||
}
|
||||
defer func() { err = errs.Combine(err, object.Close()) }()
|
||||
|
||||
download := stream.NewDownload(ctx, readOnlyStream, layer.gateway.streams)
|
||||
defer func() { err = errs.Combine(err, download.Close()) }()
|
||||
|
||||
info := readOnlyStream.Info()
|
||||
createInfo := storj.CreateObject{
|
||||
ContentType: info.ContentType,
|
||||
Expires: info.Expires,
|
||||
Metadata: info.Metadata,
|
||||
RedundancyScheme: info.RedundancyScheme,
|
||||
EncryptionScheme: info.EncryptionScheme,
|
||||
reader, err := object.DownloadRange(ctx, 0, -1)
|
||||
if err != nil {
|
||||
return minio.ObjectInfo{}, convertError(err, srcBucket, srcObject)
|
||||
}
|
||||
defer func() { err = errs.Combine(err, reader.Close()) }()
|
||||
|
||||
return layer.putObject(ctx, destBucket, destObject, download, &createInfo)
|
||||
opts := uplink.UploadOptions{
|
||||
ContentType: object.Meta.ContentType,
|
||||
Metadata: object.Meta.Metadata,
|
||||
Expires: object.Meta.Expires,
|
||||
}
|
||||
opts.Volatile.EncryptionParameters = object.Meta.Volatile.EncryptionParameters
|
||||
opts.Volatile.RedundancyScheme = object.Meta.Volatile.RedundancyScheme
|
||||
|
||||
return layer.putObject(ctx, destBucket, destObject, reader, &opts)
|
||||
}
|
||||
|
||||
func (layer *gatewayLayer) putObject(ctx context.Context, bucket, object string, reader io.Reader, createInfo *storj.CreateObject) (objInfo minio.ObjectInfo, err error) {
|
||||
func (layer *gatewayLayer) putObject(ctx context.Context, bucketName, objectPath string, reader io.Reader, opts *uplink.UploadOptions) (objInfo minio.ObjectInfo, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
mutableObject, err := layer.gateway.metainfo.CreateObject(ctx, bucket, object, createInfo)
|
||||
bucket, err := layer.gateway.project.OpenBucket(ctx, bucketName, &uplink.EncryptionAccess{Key: *layer.gateway.rootEncKey})
|
||||
if err != nil {
|
||||
return minio.ObjectInfo{}, convertError(err, bucket, object)
|
||||
return minio.ObjectInfo{}, convertError(err, bucketName, "")
|
||||
}
|
||||
defer func() { err = errs.Combine(err, bucket.Close()) }()
|
||||
|
||||
err = bucket.UploadObject(ctx, objectPath, reader, opts)
|
||||
if err != nil {
|
||||
return minio.ObjectInfo{}, convertError(err, bucketName, "")
|
||||
}
|
||||
|
||||
err = upload(ctx, layer.gateway.streams, mutableObject, reader)
|
||||
object, err := bucket.OpenObject(ctx, objectPath)
|
||||
if err != nil {
|
||||
return minio.ObjectInfo{}, err
|
||||
return minio.ObjectInfo{}, convertError(err, bucketName, objectPath)
|
||||
}
|
||||
|
||||
err = mutableObject.Commit(ctx)
|
||||
if err != nil {
|
||||
return minio.ObjectInfo{}, err
|
||||
}
|
||||
|
||||
info := mutableObject.Info()
|
||||
defer func() { err = errs.Combine(err, object.Close()) }()
|
||||
|
||||
return minio.ObjectInfo{
|
||||
Name: object,
|
||||
Bucket: bucket,
|
||||
ModTime: info.Modified,
|
||||
Size: info.Size,
|
||||
ETag: hex.EncodeToString(info.Checksum),
|
||||
ContentType: info.ContentType,
|
||||
UserDefined: info.Metadata,
|
||||
Name: object.Meta.Path,
|
||||
Bucket: object.Meta.Bucket,
|
||||
ModTime: object.Meta.Modified,
|
||||
Size: object.Meta.Size,
|
||||
ETag: hex.EncodeToString(object.Meta.Checksum),
|
||||
ContentType: object.Meta.ContentType,
|
||||
UserDefined: object.Meta.Metadata,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@ -404,20 +463,18 @@ func upload(ctx context.Context, streams streams.Store, mutableObject storj.Muta
|
||||
return errs.Combine(err, upload.Close())
|
||||
}
|
||||
|
||||
func (layer *gatewayLayer) PutObject(ctx context.Context, bucket, object string, data *hash.Reader, metadata map[string]string) (objInfo minio.ObjectInfo, err error) {
|
||||
func (layer *gatewayLayer) PutObject(ctx context.Context, bucketName, objectPath string, data *hash.Reader, metadata map[string]string) (objInfo minio.ObjectInfo, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
contentType := metadata["content-type"]
|
||||
delete(metadata, "content-type")
|
||||
|
||||
createInfo := storj.CreateObject{
|
||||
ContentType: contentType,
|
||||
Metadata: metadata,
|
||||
RedundancyScheme: layer.gateway.redundancy,
|
||||
EncryptionScheme: layer.gateway.encryption,
|
||||
opts := uplink.UploadOptions{
|
||||
ContentType: contentType,
|
||||
Metadata: metadata,
|
||||
}
|
||||
|
||||
return layer.putObject(ctx, bucket, object, data, &createInfo)
|
||||
return layer.putObject(ctx, bucketName, objectPath, data, &opts)
|
||||
}
|
||||
|
||||
func (layer *gatewayLayer) Shutdown(ctx context.Context) (err error) {
|
||||
|
@ -21,6 +21,7 @@ import (
|
||||
"storj.io/storj/internal/memory"
|
||||
"storj.io/storj/internal/testcontext"
|
||||
"storj.io/storj/internal/testplanet"
|
||||
libuplink "storj.io/storj/lib/uplink"
|
||||
"storj.io/storj/pkg/eestream"
|
||||
"storj.io/storj/pkg/metainfo/kvmetainfo"
|
||||
"storj.io/storj/pkg/pb"
|
||||
@ -56,7 +57,7 @@ func TestMakeBucketWithLocation(t *testing.T) {
|
||||
bucket, err := metainfo.GetBucket(ctx, TestBucket)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, TestBucket, bucket.Name)
|
||||
assert.True(t, time.Since(bucket.Created) < 1*time.Second)
|
||||
assert.True(t, time.Since(bucket.Created) < 1*time.Minute)
|
||||
assert.Equal(t, storj.AESGCM, bucket.PathCipher)
|
||||
|
||||
// Check the error when trying to create an existing bucket
|
||||
@ -197,7 +198,7 @@ func TestPutObject(t *testing.T) {
|
||||
assert.Equal(t, TestFile, info.Name)
|
||||
assert.Equal(t, TestBucket, info.Bucket)
|
||||
assert.False(t, info.IsDir)
|
||||
assert.True(t, time.Since(info.ModTime) < 1*time.Second)
|
||||
assert.True(t, time.Since(info.ModTime) < 1*time.Minute)
|
||||
assert.Equal(t, data.Size(), info.Size)
|
||||
// assert.Equal(t, data.SHA256HexString(), info.ETag) TODO: when we start calculating checksums
|
||||
assert.Equal(t, serMetaInfo.ContentType, info.ContentType)
|
||||
@ -375,7 +376,7 @@ func TestCopyObject(t *testing.T) {
|
||||
assert.Equal(t, DestFile, info.Name)
|
||||
assert.Equal(t, DestBucket, info.Bucket)
|
||||
assert.False(t, info.IsDir)
|
||||
assert.True(t, info.ModTime.Sub(obj.Modified) < 1*time.Second)
|
||||
assert.True(t, info.ModTime.Sub(obj.Modified) < 1*time.Minute)
|
||||
assert.Equal(t, obj.Size, info.Size)
|
||||
assert.Equal(t, hex.EncodeToString(obj.Checksum), info.ETag)
|
||||
assert.Equal(t, createInfo.ContentType, info.ContentType)
|
||||
@ -647,7 +648,7 @@ func runTest(t *testing.T, test func(context.Context, minio.ObjectLayer, storj.M
|
||||
|
||||
planet.Start(ctx)
|
||||
|
||||
layer, metainfo, streams, err := initEnv(planet)
|
||||
layer, metainfo, streams, err := initEnv(ctx, planet)
|
||||
if !assert.NoError(t, err) {
|
||||
return
|
||||
}
|
||||
@ -655,10 +656,10 @@ func runTest(t *testing.T, test func(context.Context, minio.ObjectLayer, storj.M
|
||||
test(ctx, layer, metainfo, streams)
|
||||
}
|
||||
|
||||
func initEnv(planet *testplanet.Planet) (minio.ObjectLayer, storj.Metainfo, streams.Store, error) {
|
||||
func initEnv(ctx context.Context, planet *testplanet.Planet) (minio.ObjectLayer, storj.Metainfo, streams.Store, error) {
|
||||
// TODO(kaloyan): We should have a better way for configuring the Satellite's API Key
|
||||
// add project to satisfy constraint
|
||||
project, err := planet.Satellites[0].DB.Console().Projects().Insert(context.Background(), &console.Project{
|
||||
project, err := planet.Satellites[0].DB.Console().Projects().Insert(ctx, &console.Project{
|
||||
Name: "testProject",
|
||||
})
|
||||
|
||||
@ -673,12 +674,12 @@ func initEnv(planet *testplanet.Planet) (minio.ObjectLayer, storj.Metainfo, stre
|
||||
}
|
||||
|
||||
// add api key to db
|
||||
_, err = planet.Satellites[0].DB.Console().APIKeys().Create(context.Background(), apiKey, apiKeyInfo)
|
||||
_, err = planet.Satellites[0].DB.Console().APIKeys().Create(ctx, apiKey, apiKeyInfo)
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
|
||||
metainfo, err := planet.Uplinks[0].DialMetainfo(context.Background(), planet.Satellites[0], apiKey.String())
|
||||
metainfo, err := planet.Uplinks[0].DialMetainfo(ctx, planet.Satellites[0], apiKey.String())
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
@ -696,25 +697,49 @@ func initEnv(planet *testplanet.Planet) (minio.ObjectLayer, storj.Metainfo, stre
|
||||
|
||||
segments := segments.NewSegmentStore(metainfo, ec, rs, 4*memory.KiB.Int(), 8*memory.MiB.Int64())
|
||||
|
||||
key := new(storj.Key)
|
||||
copy(key[:], TestEncKey)
|
||||
encKey := new(storj.Key)
|
||||
copy(encKey[:], TestEncKey)
|
||||
|
||||
streams, err := streams.NewStreamStore(segments, 64*memory.MiB.Int64(), key, 1*memory.KiB.Int(), storj.AESGCM)
|
||||
streams, err := streams.NewStreamStore(segments, 64*memory.MiB.Int64(), encKey, 1*memory.KiB.Int(), storj.AESGCM)
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
|
||||
buckets := buckets.NewStore(streams)
|
||||
|
||||
kvmetainfo := kvmetainfo.New(metainfo, buckets, streams, segments, key, 1*memory.KiB.Int32(), rs, 64*memory.MiB.Int64())
|
||||
kvmetainfo := kvmetainfo.New(metainfo, buckets, streams, segments, encKey, 1*memory.KiB.Int32(), rs, 64*memory.MiB.Int64())
|
||||
|
||||
cfg := libuplink.Config{}
|
||||
cfg.Volatile.TLS = struct {
|
||||
SkipPeerCAWhitelist bool
|
||||
PeerCAWhitelistPath string
|
||||
}{
|
||||
SkipPeerCAWhitelist: true,
|
||||
}
|
||||
cfg.Volatile.UseIdentity = planet.Uplinks[0].Identity
|
||||
|
||||
uplink, err := libuplink.NewUplink(ctx, &cfg)
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
|
||||
parsedAPIKey, err := libuplink.ParseAPIKey(apiKey.String())
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
|
||||
proj, err := uplink.OpenProject(ctx, planet.Satellites[0].Addr(), encKey, parsedAPIKey)
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
|
||||
gateway := NewStorjGateway(
|
||||
kvmetainfo,
|
||||
streams,
|
||||
storj.AESGCM,
|
||||
storj.EncryptionScheme{
|
||||
Cipher: storj.AESGCM,
|
||||
BlockSize: 1 * memory.KiB.Int32(),
|
||||
proj,
|
||||
encKey,
|
||||
storj.EncAESGCM,
|
||||
storj.EncryptionParameters{
|
||||
CipherSuite: storj.EncAESGCM,
|
||||
BlockSize: 1 * memory.KiB.Int32(),
|
||||
},
|
||||
storj.RedundancyScheme{
|
||||
Algorithm: storj.ReedSolomon,
|
||||
@ -724,6 +749,7 @@ func initEnv(planet *testplanet.Planet) (minio.ObjectLayer, storj.Metainfo, stre
|
||||
TotalShares: int16(rs.TotalCount()),
|
||||
ShareSize: int32(rs.ErasureShareSize()),
|
||||
},
|
||||
8*memory.MiB,
|
||||
)
|
||||
|
||||
layer, err := gateway.NewGatewayLayer(auth.Credentials{})
|
||||
|
@ -22,6 +22,7 @@ import (
|
||||
"storj.io/storj/internal/testcontext"
|
||||
"storj.io/storj/internal/testidentity"
|
||||
"storj.io/storj/internal/testplanet"
|
||||
libuplink "storj.io/storj/lib/uplink"
|
||||
"storj.io/storj/pkg/cfgstruct"
|
||||
"storj.io/storj/pkg/identity"
|
||||
"storj.io/storj/pkg/miniogw"
|
||||
@ -141,7 +142,7 @@ func TestUploadDownload(t *testing.T) {
|
||||
}
|
||||
|
||||
// runGateway creates and starts a gateway
|
||||
func runGateway(ctx context.Context, gwCfg config, uplinkCfg uplink.Config, log *zap.Logger, identity *identity.FullIdentity) (err error) {
|
||||
func runGateway(ctx context.Context, gwCfg config, uplinkCfg uplink.Config, log *zap.Logger, ident *identity.FullIdentity) (err error) {
|
||||
|
||||
// set gateway flags
|
||||
flags := flag.NewFlagSet("gateway", flag.ExitOnError)
|
||||
@ -168,17 +169,43 @@ func runGateway(ctx context.Context, gwCfg config, uplinkCfg uplink.Config, log
|
||||
return err
|
||||
}
|
||||
|
||||
metainfo, streams, err := uplinkCfg.GetMetainfo(ctx, identity)
|
||||
cfg := libuplink.Config{}
|
||||
cfg.Volatile.TLS = struct {
|
||||
SkipPeerCAWhitelist bool
|
||||
PeerCAWhitelistPath string
|
||||
}{
|
||||
SkipPeerCAWhitelist: !uplinkCfg.TLS.UsePeerCAWhitelist,
|
||||
PeerCAWhitelistPath: uplinkCfg.TLS.PeerCAWhitelistPath,
|
||||
}
|
||||
cfg.Volatile.UseIdentity = ident
|
||||
cfg.Volatile.MaxInlineSize = uplinkCfg.Client.MaxInlineSize
|
||||
cfg.Volatile.MaxMemory = uplinkCfg.RS.MaxBufferMem
|
||||
|
||||
uplink, err := libuplink.NewUplink(ctx, &cfg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
apiKey, err := libuplink.ParseAPIKey(uplinkCfg.Client.APIKey)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
encKey := new(storj.Key)
|
||||
copy(encKey[:], uplinkCfg.Enc.Key)
|
||||
|
||||
project, err := uplink.OpenProject(ctx, uplinkCfg.Client.SatelliteAddr, encKey, apiKey)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
gw := miniogw.NewStorjGateway(
|
||||
metainfo,
|
||||
streams,
|
||||
storj.Cipher(uplinkCfg.Enc.PathType),
|
||||
uplinkCfg.GetEncryptionScheme(),
|
||||
project,
|
||||
encKey,
|
||||
storj.Cipher(uplinkCfg.Enc.PathType).ToCipherSuite(),
|
||||
uplinkCfg.GetEncryptionScheme().ToEncryptionParameters(),
|
||||
uplinkCfg.GetRedundancyScheme(),
|
||||
uplinkCfg.Client.SegmentSize,
|
||||
)
|
||||
|
||||
minio.StartGateway(cliCtx, miniogw.Logging(gw, log))
|
||||
|
@ -15,14 +15,14 @@ import (
|
||||
minio "github.com/minio/minio/cmd"
|
||||
"github.com/minio/minio/pkg/hash"
|
||||
|
||||
"storj.io/storj/pkg/storj"
|
||||
"storj.io/storj/lib/uplink"
|
||||
)
|
||||
|
||||
func (layer *gatewayLayer) NewMultipartUpload(ctx context.Context, bucket, object string, metadata map[string]string) (uploadID string, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
// Check that the bucket exists
|
||||
_, err = layer.gateway.metainfo.GetBucket(ctx, bucket)
|
||||
_, _, err = layer.gateway.project.GetBucketInfo(ctx, bucket)
|
||||
if err != nil {
|
||||
return "", convertError(err, bucket, "")
|
||||
}
|
||||
@ -38,13 +38,11 @@ func (layer *gatewayLayer) NewMultipartUpload(ctx context.Context, bucket, objec
|
||||
contentType := metadata["content-type"]
|
||||
delete(metadata, "content-type")
|
||||
|
||||
createInfo := storj.CreateObject{
|
||||
ContentType: contentType,
|
||||
Metadata: metadata,
|
||||
RedundancyScheme: layer.gateway.redundancy,
|
||||
EncryptionScheme: layer.gateway.encryption,
|
||||
opts := uplink.UploadOptions{
|
||||
ContentType: contentType,
|
||||
Metadata: metadata,
|
||||
}
|
||||
objInfo, err := layer.putObject(ctx, bucket, object, upload.Stream, &createInfo)
|
||||
objInfo, err := layer.putObject(ctx, bucket, object, upload.Stream, &opts)
|
||||
|
||||
uploads.RemoveByID(upload.ID)
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user