storj/pkg/miniogw/gateway.go
Jeff Wendling efcdaa43a3
lib/uplink: encryption context (#2349)
* lib/uplink: encryption context

Change-Id: I5c23dca3286a46b713b30c4997e9ae6e630b2280

* lib/uplink: bucket operation examples

Change-Id: Ia0f6e69f365dcff0cf11c731f51b30842bce053b

* lib/uplink: encryption key sharing test cases

Change-Id: I3a172d565f33f4e591402cdcb9460664a7cc7fbe

* fix encrypted path prefix restriction issue

Change-Id: I8f3921f9d52aaf4b84039de608b8cbbc88769554

* implement panics in libuplink encryption code

todo on cipher suite selection as well as an api concern

Change-Id: Ifa39eb3cc4b3443f7d96f9304df9b2ac4ec4085d

* implement GetProjectInfo api call to get salt

Change-Id: Ic5f6b3be9ea35df48c1aa214ab5d355fb328e2cf

* some fixes and accessors for encryption store

Change-Id: I3bb61f6712a037900e2a96e72ad4029ec1d3f718

* general fixes to builds/tests/etc

Change-Id: I9930fa96acb3b221d9a001f8e274af5729cc8a47

* java bindings changes

Change-Id: Ia2bd4c9c69739c8d3154d79616cff1f36fb403b6

* get libuplink examples passing

Change-Id: I828f09a144160e0a5dd932324f78491ae2ec8a07

* fix proto.lock file

Change-Id: I2fbbf4d0976a7d0473c2645e6dcb21aaa3be7651

* fix proto.lock again

Change-Id: I92702cf49e1a340eef6379c2be4f7c4a268112a9

* fix golint issues

Change-Id: I631ff9f43307a58e3b25a58cbb4a4cc2495f5eb6

* more linting fixes

Change-Id: I51f8f30b367b5bca14c94b15417b9a4c9e7aa0ce

* bug fixed by structs bump

Change-Id: Ibb03c691fce7606c35c08721b3ef0781ab48a38a

* retrigger

Change-Id: Ieee0470b6a2d07168a1578552e8e7f271ae93a13

* retrigger

Change-Id: I753d63853171e6a436c104ce176048892eb974c5

* semantic merge conflict

Change-Id: I9419448496de90340569047a6a16a1b858a7978a

* update total to match prod defaults

Change-Id: I693d55c1ebb28b5803ee1d26e9e198decf82308b

* retrigger

Change-Id: I28b74d5d6202f61aa3866fe407d423f6a0a14b9e

* retrigger

Change-Id: I6fd054885c715f602e2cef623fd464c42e88742c

* retrigger

Change-Id: I6a01bae88c72406d4ed5a8f13bf8a2b3c650bd2d
2019-06-27 17:36:51 +00:00

510 lines
15 KiB
Go

// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package miniogw
import (
"context"
"encoding/hex"
"io"
"strings"
minio "github.com/minio/minio/cmd"
"github.com/minio/minio/pkg/auth"
"github.com/minio/minio/pkg/hash"
"github.com/zeebo/errs"
monkit "gopkg.in/spacemonkeygo/monkit.v2"
"storj.io/storj/internal/memory"
"storj.io/storj/lib/uplink"
"storj.io/storj/pkg/storage/streams"
"storj.io/storj/pkg/storj"
"storj.io/storj/pkg/stream"
)
var (
mon = monkit.Package()
// Error is the errs class of standard End User Client errors
Error = errs.Class("Storj Gateway error")
)
// NewStorjGateway creates a *Storj object from an existing ObjectStore
func NewStorjGateway(project *uplink.Project, encCtx *uplink.EncryptionCtx, pathCipher storj.CipherSuite, encryption storj.EncryptionParameters, redundancy storj.RedundancyScheme, segmentSize memory.Size) *Gateway {
return &Gateway{
project: project,
encCtx: encCtx,
pathCipher: pathCipher,
encryption: encryption,
redundancy: redundancy,
segmentSize: segmentSize,
multipart: NewMultipartUploads(),
}
}
// Gateway is the implementation of a minio cmd.Gateway
type Gateway struct {
project *uplink.Project
encCtx *uplink.EncryptionCtx
pathCipher storj.CipherSuite
encryption storj.EncryptionParameters
redundancy storj.RedundancyScheme
segmentSize memory.Size
multipart *MultipartUploads
}
// Name implements cmd.Gateway
func (gateway *Gateway) Name() string {
return "storj"
}
// NewGatewayLayer implements cmd.Gateway
func (gateway *Gateway) NewGatewayLayer(creds auth.Credentials) (minio.ObjectLayer, error) {
return &gatewayLayer{gateway: gateway}, nil
}
// Production implements cmd.Gateway
func (gateway *Gateway) Production() bool {
return false
}
type gatewayLayer struct {
minio.GatewayUnsupported
gateway *Gateway
}
func (layer *gatewayLayer) DeleteBucket(ctx context.Context, bucketName string) (err error) {
defer mon.Task()(&ctx)(&err)
empty, err := layer.bucketEmpty(ctx, bucketName)
if err != nil {
return convertError(err, bucketName, "")
}
if !empty {
return minio.BucketNotEmpty{Bucket: bucketName}
}
err = layer.gateway.project.DeleteBucket(ctx, bucketName)
return convertError(err, bucketName, "")
}
func (layer *gatewayLayer) bucketEmpty(ctx context.Context, bucketName string) (empty bool, err error) {
defer mon.Task()(&ctx)(&err)
bucket, err := layer.gateway.project.OpenBucket(ctx, bucketName, layer.gateway.encCtx)
if err != nil {
return false, convertError(err, bucketName, "")
}
defer func() { err = errs.Combine(err, bucket.Close()) }()
list, err := bucket.ListObjects(ctx, &storj.ListOptions{Direction: storj.After, Recursive: true, Limit: 1})
if err != nil {
return false, convertError(err, bucketName, "")
}
return len(list.Items) == 0, nil
}
func (layer *gatewayLayer) DeleteObject(ctx context.Context, bucketName, objectPath string) (err error) {
defer mon.Task()(&ctx)(&err)
bucket, err := layer.gateway.project.OpenBucket(ctx, bucketName, layer.gateway.encCtx)
if err != nil {
return convertError(err, bucketName, "")
}
defer func() { err = errs.Combine(err, bucket.Close()) }()
err = bucket.DeleteObject(ctx, objectPath)
return convertError(err, bucketName, objectPath)
}
func (layer *gatewayLayer) GetBucketInfo(ctx context.Context, bucketName string) (bucketInfo minio.BucketInfo, err error) {
defer mon.Task()(&ctx)(&err)
bucket, _, err := layer.gateway.project.GetBucketInfo(ctx, bucketName)
if err != nil {
return minio.BucketInfo{}, convertError(err, bucketName, "")
}
return minio.BucketInfo{Name: bucket.Name, Created: bucket.Created}, nil
}
func (layer *gatewayLayer) GetObject(ctx context.Context, bucketName, objectPath string, startOffset int64, length int64, writer io.Writer, etag string) (err error) {
defer mon.Task()(&ctx)(&err)
bucket, err := layer.gateway.project.OpenBucket(ctx, bucketName, layer.gateway.encCtx)
if err != nil {
return convertError(err, bucketName, "")
}
defer func() { err = errs.Combine(err, bucket.Close()) }()
object, err := bucket.OpenObject(ctx, objectPath)
if err != nil {
return convertError(err, bucketName, objectPath)
}
defer func() { err = errs.Combine(err, object.Close()) }()
if startOffset < 0 || length < -1 || startOffset+length > object.Meta.Size {
return minio.InvalidRange{
OffsetBegin: startOffset,
OffsetEnd: startOffset + length,
ResourceSize: object.Meta.Size,
}
}
reader, err := object.DownloadRange(ctx, startOffset, length)
if err != nil {
return convertError(err, bucketName, objectPath)
}
defer func() { err = errs.Combine(err, reader.Close()) }()
_, err = io.Copy(writer, reader)
return err
}
func (layer *gatewayLayer) GetObjectInfo(ctx context.Context, bucketName, objectPath string) (objInfo minio.ObjectInfo, err error) {
defer mon.Task()(&ctx)(&err)
bucket, err := layer.gateway.project.OpenBucket(ctx, bucketName, layer.gateway.encCtx)
if err != nil {
return minio.ObjectInfo{}, convertError(err, bucketName, "")
}
defer func() { err = errs.Combine(err, bucket.Close()) }()
object, err := bucket.OpenObject(ctx, objectPath)
if err != nil {
return minio.ObjectInfo{}, convertError(err, bucketName, objectPath)
}
defer func() { err = errs.Combine(err, object.Close()) }()
return minio.ObjectInfo{
Name: object.Meta.Path,
Bucket: object.Meta.Bucket,
ModTime: object.Meta.Modified,
Size: object.Meta.Size,
ETag: hex.EncodeToString(object.Meta.Checksum),
ContentType: object.Meta.ContentType,
UserDefined: object.Meta.Metadata,
}, err
}
func (layer *gatewayLayer) ListBuckets(ctx context.Context) (bucketItems []minio.BucketInfo, err error) {
defer mon.Task()(&ctx)(&err)
startAfter := ""
for {
list, err := layer.gateway.project.ListBuckets(ctx, &storj.BucketListOptions{Direction: storj.After, Cursor: startAfter})
if err != nil {
return nil, err
}
for _, item := range list.Items {
bucketItems = append(bucketItems, minio.BucketInfo{Name: item.Name, Created: item.Created})
}
if !list.More {
break
}
startAfter = list.Items[len(list.Items)-1].Name
}
return bucketItems, err
}
func (layer *gatewayLayer) ListObjects(ctx context.Context, bucketName, prefix, marker, delimiter string, maxKeys int) (result minio.ListObjectsInfo, err error) {
defer mon.Task()(&ctx)(&err)
if delimiter != "" && delimiter != "/" {
return minio.ListObjectsInfo{}, minio.UnsupportedDelimiter{Delimiter: delimiter}
}
bucket, err := layer.gateway.project.OpenBucket(ctx, bucketName, layer.gateway.encCtx)
if err != nil {
return minio.ListObjectsInfo{}, convertError(err, bucketName, "")
}
defer func() { err = errs.Combine(err, bucket.Close()) }()
startAfter := marker
recursive := delimiter == ""
var objects []minio.ObjectInfo
var prefixes []string
list, err := bucket.ListObjects(ctx, &storj.ListOptions{
Direction: storj.After,
Cursor: startAfter,
Prefix: prefix,
Recursive: recursive,
Limit: maxKeys,
})
if err != nil {
return result, convertError(err, bucketName, "")
}
if len(list.Items) > 0 {
for _, item := range list.Items {
path := item.Path
if recursive && prefix != "" {
path = storj.JoinPaths(strings.TrimSuffix(prefix, "/"), path)
}
if item.IsPrefix {
prefixes = append(prefixes, path)
continue
}
objects = append(objects, minio.ObjectInfo{
Name: path,
Bucket: item.Bucket.Name,
ModTime: item.Modified,
Size: item.Size,
ETag: hex.EncodeToString(item.Checksum),
ContentType: item.ContentType,
UserDefined: item.Metadata,
})
}
startAfter = list.Items[len(list.Items)-1].Path
}
result = minio.ListObjectsInfo{
IsTruncated: list.More,
Objects: objects,
Prefixes: prefixes,
}
if list.More {
result.NextMarker = startAfter
}
return result, err
}
// ListObjectsV2 - Not implemented stub
func (layer *gatewayLayer) ListObjectsV2(ctx context.Context, bucketName, prefix, continuationToken, delimiter string, maxKeys int, fetchOwner bool, startAfter string) (result minio.ListObjectsV2Info, err error) {
defer mon.Task()(&ctx)(&err)
if delimiter != "" && delimiter != "/" {
return minio.ListObjectsV2Info{ContinuationToken: continuationToken}, minio.UnsupportedDelimiter{Delimiter: delimiter}
}
bucket, err := layer.gateway.project.OpenBucket(ctx, bucketName, layer.gateway.encCtx)
if err != nil {
return minio.ListObjectsV2Info{}, convertError(err, bucketName, "")
}
defer func() { err = errs.Combine(err, bucket.Close()) }()
recursive := delimiter == ""
var nextContinuationToken string
var startAfterPath storj.Path
if continuationToken != "" {
startAfterPath = continuationToken
}
if startAfterPath == "" && startAfter != "" {
startAfterPath = startAfter
}
var objects []minio.ObjectInfo
var prefixes []string
list, err := bucket.ListObjects(ctx, &storj.ListOptions{
Direction: storj.After,
Cursor: startAfterPath,
Prefix: prefix,
Recursive: recursive,
Limit: maxKeys,
})
if err != nil {
return minio.ListObjectsV2Info{ContinuationToken: continuationToken}, convertError(err, bucketName, "")
}
if len(list.Items) > 0 {
for _, item := range list.Items {
path := item.Path
if recursive && prefix != "" {
path = storj.JoinPaths(strings.TrimSuffix(prefix, "/"), path)
}
if item.IsPrefix {
prefixes = append(prefixes, path)
continue
}
objects = append(objects, minio.ObjectInfo{
Name: path,
Bucket: item.Bucket.Name,
ModTime: item.Modified,
Size: item.Size,
ETag: hex.EncodeToString(item.Checksum),
ContentType: item.ContentType,
UserDefined: item.Metadata,
})
}
nextContinuationToken = list.Items[len(list.Items)-1].Path + "\x00"
}
result = minio.ListObjectsV2Info{
IsTruncated: list.More,
ContinuationToken: continuationToken,
Objects: objects,
Prefixes: prefixes,
}
if list.More {
result.NextContinuationToken = nextContinuationToken
}
return result, err
}
func (layer *gatewayLayer) MakeBucketWithLocation(ctx context.Context, bucketName string, location string) (err error) {
defer mon.Task()(&ctx)(&err)
// TODO: This current strategy of calling bs.Get
// to check if a bucket exists, then calling bs.Put
// if not, can create a race condition if two people
// call MakeBucketWithLocation at the same time and
// therefore try to Put a bucket at the same time.
// The reason for the Get call to check if the
// bucket already exists is to match S3 CLI behavior.
_, _, err = layer.gateway.project.GetBucketInfo(ctx, bucketName)
if err == nil {
return minio.BucketAlreadyExists{Bucket: bucketName}
}
if !storj.ErrBucketNotFound.Has(err) {
return convertError(err, bucketName, "")
}
cfg := uplink.BucketConfig{
PathCipher: layer.gateway.pathCipher,
EncryptionParameters: layer.gateway.encryption,
}
cfg.Volatile.RedundancyScheme = layer.gateway.redundancy
cfg.Volatile.SegmentsSize = layer.gateway.segmentSize
_, err = layer.gateway.project.CreateBucket(ctx, bucketName, &cfg)
return err
}
func (layer *gatewayLayer) CopyObject(ctx context.Context, srcBucket, srcObject, destBucket, destObject string, srcInfo minio.ObjectInfo) (objInfo minio.ObjectInfo, err error) {
defer mon.Task()(&ctx)(&err)
bucket, err := layer.gateway.project.OpenBucket(ctx, srcBucket, layer.gateway.encCtx)
if err != nil {
return minio.ObjectInfo{}, convertError(err, srcBucket, "")
}
defer func() { err = errs.Combine(err, bucket.Close()) }()
object, err := bucket.OpenObject(ctx, srcObject)
if err != nil {
return minio.ObjectInfo{}, convertError(err, srcBucket, srcObject)
}
defer func() { err = errs.Combine(err, object.Close()) }()
reader, err := object.DownloadRange(ctx, 0, -1)
if err != nil {
return minio.ObjectInfo{}, convertError(err, srcBucket, srcObject)
}
defer func() { err = errs.Combine(err, reader.Close()) }()
opts := uplink.UploadOptions{
ContentType: object.Meta.ContentType,
Metadata: object.Meta.Metadata,
Expires: object.Meta.Expires,
}
opts.Volatile.EncryptionParameters = object.Meta.Volatile.EncryptionParameters
opts.Volatile.RedundancyScheme = object.Meta.Volatile.RedundancyScheme
return layer.putObject(ctx, destBucket, destObject, reader, &opts)
}
func (layer *gatewayLayer) putObject(ctx context.Context, bucketName, objectPath string, reader io.Reader, opts *uplink.UploadOptions) (objInfo minio.ObjectInfo, err error) {
defer mon.Task()(&ctx)(&err)
bucket, err := layer.gateway.project.OpenBucket(ctx, bucketName, layer.gateway.encCtx)
if err != nil {
return minio.ObjectInfo{}, convertError(err, bucketName, "")
}
defer func() { err = errs.Combine(err, bucket.Close()) }()
err = bucket.UploadObject(ctx, objectPath, reader, opts)
if err != nil {
return minio.ObjectInfo{}, convertError(err, bucketName, "")
}
object, err := bucket.OpenObject(ctx, objectPath)
if err != nil {
return minio.ObjectInfo{}, convertError(err, bucketName, objectPath)
}
defer func() { err = errs.Combine(err, object.Close()) }()
return minio.ObjectInfo{
Name: object.Meta.Path,
Bucket: object.Meta.Bucket,
ModTime: object.Meta.Modified,
Size: object.Meta.Size,
ETag: hex.EncodeToString(object.Meta.Checksum),
ContentType: object.Meta.ContentType,
UserDefined: object.Meta.Metadata,
}, nil
}
func upload(ctx context.Context, streams streams.Store, mutableObject storj.MutableObject, reader io.Reader) error {
mutableStream, err := mutableObject.CreateStream(ctx)
if err != nil {
return err
}
upload := stream.NewUpload(ctx, mutableStream, streams)
_, err = io.Copy(upload, reader)
return errs.Combine(err, upload.Close())
}
func (layer *gatewayLayer) PutObject(ctx context.Context, bucketName, objectPath string, data *hash.Reader, metadata map[string]string) (objInfo minio.ObjectInfo, err error) {
defer mon.Task()(&ctx)(&err)
contentType := metadata["content-type"]
delete(metadata, "content-type")
opts := uplink.UploadOptions{
ContentType: contentType,
Metadata: metadata,
}
return layer.putObject(ctx, bucketName, objectPath, data, &opts)
}
func (layer *gatewayLayer) Shutdown(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
return nil
}
func (layer *gatewayLayer) StorageInfo(context.Context) minio.StorageInfo {
return minio.StorageInfo{}
}
func convertError(err error, bucket, object string) error {
if storj.ErrNoBucket.Has(err) {
return minio.BucketNameInvalid{Bucket: bucket}
}
if storj.ErrBucketNotFound.Has(err) {
return minio.BucketNotFound{Bucket: bucket}
}
if storj.ErrNoPath.Has(err) {
return minio.ObjectNameInvalid{Bucket: bucket, Object: object}
}
if storj.ErrObjectNotFound.Has(err) {
return minio.ObjectNotFound{Bucket: bucket, Object: object}
}
return err
}