cmd: migrate uplink CLI to new API

Change-Id: I8f8fcc8dd9a68aac18fd79c4071696fb54853a60
Michal Niewrzal 2020-03-05 11:06:47 +01:00
parent 4e9cd77d54
commit c20cf25f35
15 changed files with 285 additions and 453 deletions


@@ -15,6 +15,7 @@ import (
"storj.io/common/peertls/tlsopts"
"storj.io/common/storj"
libuplink "storj.io/storj/lib/uplink"
"storj.io/uplink"
)
var mon = monkit.Package()
@@ -164,6 +165,27 @@ func (a AccessConfig) GetAccess() (_ *libuplink.Scope, err error) {
}, nil
}
// GetNewAccess returns the appropriate access for the config.
func (a AccessConfig) GetNewAccess() (_ *uplink.Access, err error) {
defer mon.Task()(nil)(&err)
oldAccess, err := a.GetAccess()
if err != nil {
return nil, err
}
serializedOldAccess, err := oldAccess.Serialize()
if err != nil {
return nil, err
}
access, err := uplink.ParseAccess(serializedOldAccess)
if err != nil {
return nil, err
}
return access, nil
}
// GetNamedAccess returns the named access if it exists.
func (a AccessConfig) GetNamedAccess(name string) (_ *libuplink.Scope, err error) {
// if an access exists for that name, try to load it.

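The conversion works because the old *libuplink.Scope and the new *uplink.Access share the same serialized wire format, so a serialize/parse round trip is enough to cross API generations. A minimal sketch of the same bridge as a standalone helper (helper name is hypothetical):

func scopeToAccess(scope *libuplink.Scope) (*uplink.Access, error) {
	// Serialize with the old library...
	serialized, err := scope.Serialize()
	if err != nil {
		return nil, err
	}
	// ...and parse with the new one; both use the same encoding.
	return uplink.ParseAccess(serialized)
}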

@@ -18,8 +18,8 @@ import (
"github.com/zeebo/errs"
"storj.io/common/fpath"
libuplink "storj.io/storj/lib/uplink"
"storj.io/storj/pkg/process"
"storj.io/uplink"
)
var (
@@ -89,11 +89,11 @@ func upload(ctx context.Context, src fpath.FPath, dst fpath.FPath, showProgress
return fmt.Errorf("source cannot be a directory: %s", src)
}
project, bucket, err := cfg.GetProjectAndBucket(ctx, dst.Bucket())
project, err := cfg.getProject(ctx, false)
if err != nil {
return err
}
defer closeProjectAndBucket(project, bucket)
defer closeProject(project)
reader := io.Reader(file)
var bar *progressbar.ProgressBar
@@ -103,27 +103,43 @@ func upload(ctx context.Context, src fpath.FPath, dst fpath.FPath, showProgress
bar.Start()
}
opts := &libuplink.UploadOptions{}
if *expires != "" {
opts.Expires = expiration.UTC()
}
var customMetadata uplink.CustomMetadata
if *metadata != "" {
var md map[string]string
err := json.Unmarshal([]byte(*metadata), &md)
err := json.Unmarshal([]byte(*metadata), &customMetadata)
if err != nil {
return err
}
opts.Metadata = md
if err := customMetadata.Verify(); err != nil {
return err
}
}
opts.Volatile.RedundancyScheme = cfg.GetRedundancyScheme()
opts.Volatile.EncryptionParameters = cfg.GetEncryptionParameters()
upload, err := project.UploadObject(ctx, dst.Bucket(), dst.Path(), &uplink.UploadOptions{
Expires: expiration,
})
if err != nil {
return err
}
err = upload.SetCustomMetadata(ctx, customMetadata)
if err != nil {
abortErr := upload.Abort()
err = errs.Combine(err, abortErr)
return err
}
_, err = io.Copy(upload, reader)
if err != nil {
abortErr := upload.Abort()
err = errs.Combine(err, abortErr)
return err
}
if err := upload.Commit(); err != nil {
return err
}
err = bucket.UploadObject(ctx, dst.Path(), reader, opts)
if bar != nil {
bar.Finish()
}
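
uplink.CustomMetadata is a plain map[string]string, so the --metadata JSON can be unmarshaled into it directly; Verify then rejects entries the satellite cannot store (for example, keys or values containing zero bytes, the same case the test workaround later in this diff strips out). A sketch of the decode-and-validate step (helper name is hypothetical):

func parseCustomMetadata(raw string) (uplink.CustomMetadata, error) {
	var md uplink.CustomMetadata
	if err := json.Unmarshal([]byte(raw), &md); err != nil {
		return nil, err
	}
	// Catch invalid entries before the upload starts.
	return md, md.Verify()
}
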
@@ -146,31 +162,27 @@ func download(ctx context.Context, src fpath.FPath, dst fpath.FPath, showProgres
return fmt.Errorf("destination must be local path: %s", dst)
}
project, bucket, err := cfg.GetProjectAndBucket(ctx, src.Bucket())
project, err := cfg.getProject(ctx, false)
if err != nil {
return err
}
defer closeProjectAndBucket(project, bucket)
defer closeProject(project)
object, err := bucket.OpenObject(ctx, src.Path())
if err != nil {
return convertError(err, src)
}
rc, err := object.DownloadRange(ctx, 0, object.Meta.Size)
download, err := project.DownloadObject(ctx, src.Bucket(), src.Path(), nil)
if err != nil {
return err
}
defer func() { err = errs.Combine(err, rc.Close()) }()
defer func() { err = errs.Combine(err, download.Close()) }()
var bar *progressbar.ProgressBar
var reader io.ReadCloser
if showProgress {
bar = progressbar.New64(object.Meta.Size)
reader = bar.NewProxyReader(rc)
info := download.Info()
bar = progressbar.New64(info.System.ContentLength)
reader = bar.NewProxyReader(download)
bar.Start()
} else {
reader = rc
reader = download
}
if fileInfo, err := os.Stat(dst.Path()); err == nil && fileInfo.IsDir() {
@@ -217,31 +229,28 @@ func copyObject(ctx context.Context, src fpath.FPath, dst fpath.FPath) (err erro
return fmt.Errorf("destination must be Storj URL: %s", dst)
}
project, bucket, err := cfg.GetProjectAndBucket(ctx, dst.Bucket())
project, err := cfg.getProject(ctx, false)
if err != nil {
return err
}
defer closeProjectAndBucket(project, bucket)
defer closeProject(project)
object, err := bucket.OpenObject(ctx, src.Path())
if err != nil {
return convertError(err, src)
}
rc, err := object.DownloadRange(ctx, 0, object.Meta.Size)
download, err := project.DownloadObject(ctx, src.Bucket(), src.Path(), nil)
if err != nil {
return err
}
defer func() { err = errs.Combine(err, rc.Close()) }()
defer func() { err = errs.Combine(err, download.Close()) }()
downloadInfo := download.Info()
var bar *progressbar.ProgressBar
var reader io.Reader
if *progress {
bar = progressbar.New64(object.Meta.Size)
reader = bar.NewProxyReader(reader)
bar = progressbar.New64(downloadInfo.System.ContentLength)
reader = bar.NewProxyReader(download)
bar.Start()
} else {
reader = rc
reader = download
}
// if destination object name not specified, default to source object name
@@ -249,15 +258,27 @@ func copyObject(ctx context.Context, src fpath.FPath, dst fpath.FPath) (err erro
dst = dst.Join(src.Base())
}
opts := &libuplink.UploadOptions{
Expires: object.Meta.Expires,
ContentType: object.Meta.ContentType,
Metadata: object.Meta.Metadata,
}
opts.Volatile.RedundancyScheme = cfg.GetRedundancyScheme()
opts.Volatile.EncryptionParameters = cfg.GetEncryptionParameters()
upload, err := project.UploadObject(ctx, dst.Bucket(), dst.Path(), &uplink.UploadOptions{
Expires: downloadInfo.System.Expires,
})
if err != nil {
return err
}
_, err = io.Copy(upload, reader)
if err != nil {
abortErr := upload.Abort()
return errs.Combine(err, abortErr)
}
err = upload.SetCustomMetadata(ctx, downloadInfo.Custom)
if err != nil {
abortErr := upload.Abort()
return errs.Combine(err, abortErr)
}
err = upload.Commit()
if err != nil {
return err
}
err = bucket.UploadObject(ctx, dst.Path(), reader, opts)
if bar != nil {
bar.Finish()
}
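
In the new API an upload is an explicit handle rather than a single blocking call: write into it, optionally attach metadata, then Commit; on any failure the pending object is discarded with Abort. A condensed sketch of the lifecycle shared by upload and copyObject above (helper name is hypothetical):

func uploadFrom(ctx context.Context, project *uplink.Project, bucket, key string, src io.Reader, expires time.Time) error {
	upload, err := project.UploadObject(ctx, bucket, key, &uplink.UploadOptions{Expires: expires})
	if err != nil {
		return err
	}
	if _, err := io.Copy(upload, src); err != nil {
		// Abort discards the partially written object.
		return errs.Combine(err, upload.Abort())
	}
	// Commit makes the object visible; until then it stays pending.
	return upload.Commit()
}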


@@ -6,14 +6,14 @@ package cmd
import (
"context"
"fmt"
"strings"
"time"
"github.com/spf13/cobra"
"storj.io/common/fpath"
"storj.io/common/storj"
libuplink "storj.io/storj/lib/uplink"
"storj.io/storj/pkg/process"
"storj.io/uplink"
)
var (
@@ -37,26 +37,11 @@ func init() {
func list(cmd *cobra.Command, args []string) error {
ctx, _ := process.Ctx(cmd)
project, err := cfg.GetProject(ctx)
project, err := cfg.getProject(ctx, *lsEncryptedFlag)
if err != nil {
return err
}
defer func() {
if err := project.Close(); err != nil {
fmt.Printf("error closing project: %+v\n", err)
}
}()
access, err := cfg.GetAccess()
if err != nil {
return err
}
encAccess := access.EncryptionAccess
if *lsEncryptedFlag {
encAccess = libuplink.NewEncryptionAccessWithDefaultKey(storj.Key{})
encAccess.Store().EncryptionBypass = true
}
defer closeProject(project)
// list objects
if len(args) > 0 {
@@ -69,48 +54,26 @@ func list(cmd *cobra.Command, args []string) error {
return fmt.Errorf("no bucket specified, use format sj://bucket/")
}
bucket, err := project.OpenBucket(ctx, src.Bucket(), encAccess)
if err != nil {
return err
}
defer func() {
if err := bucket.Close(); err != nil {
fmt.Printf("error closing bucket: %+v\n", err)
}
}()
err = listFiles(ctx, bucket, src, false)
err = listFiles(ctx, project, src.Bucket(), src.Path(), false)
return convertError(err, src)
}
noBuckets := true
// list buckets
listOpts := storj.BucketListOptions{
Direction: storj.Forward,
Cursor: "",
}
for {
list, err := project.ListBuckets(ctx, &listOpts)
if err != nil {
return err
}
if len(list.Items) > 0 {
noBuckets = false
for _, bucket := range list.Items {
buckets := project.ListBuckets(ctx, nil)
for buckets.Next() {
bucket := buckets.Item()
fmt.Println("BKT", formatTime(bucket.Created), bucket.Name)
if *lsRecursiveFlag {
if err := listFilesFromBucket(ctx, project, bucket.Name, encAccess); err != nil {
if err := listFilesFromBucket(ctx, project, bucket.Name); err != nil {
return err
}
}
noBuckets = false
}
}
if !list.More {
break
}
listOpts = listOpts.NextPage(list)
if buckets.Err() != nil {
return buckets.Err()
}
if noBuckets {
@@ -120,61 +83,37 @@ func list(cmd *cobra.Command, args []string) error {
return nil
}
func listFilesFromBucket(ctx context.Context, project *libuplink.Project, bucketName string, access *libuplink.EncryptionAccess) error {
prefix, err := fpath.New(fmt.Sprintf("sj://%s/", bucketName))
if err != nil {
return err
func listFilesFromBucket(ctx context.Context, project *uplink.Project, bucket string) error {
return listFiles(ctx, project, bucket, "", true)
}
bucket, err := project.OpenBucket(ctx, bucketName, access)
if err != nil {
return err
}
defer func() {
if err := bucket.Close(); err != nil {
fmt.Printf("error closing bucket: %+v\n", err)
}
}()
err = listFiles(ctx, bucket, prefix, true)
if err != nil {
return err
func listFiles(ctx context.Context, project *uplink.Project, bucket, prefix string, prependBucket bool) error {
// TODO: force a trailing slash because fpath strips it;
// this most likely should be fixed in storj/common
if prefix != "" && !strings.HasSuffix(prefix, "/") {
prefix += "/"
}
return nil
}
func listFiles(ctx context.Context, bucket *libuplink.Bucket, prefix fpath.FPath, prependBucket bool) error {
startAfter := ""
for {
list, err := bucket.ListObjects(ctx, &storj.ListOptions{
Direction: storj.After,
Cursor: startAfter,
Prefix: prefix.Path(),
objects := project.ListObjects(ctx, bucket, &uplink.ListObjectsOptions{
Prefix: prefix,
Recursive: *lsRecursiveFlag,
System: true,
})
if err != nil {
return err
}
for _, object := range list.Items {
path := object.Path
for objects.Next() {
object := objects.Item()
path := object.Key
if prependBucket {
path = fmt.Sprintf("%s/%s", prefix.Bucket(), path)
path = fmt.Sprintf("%s/%s", bucket, path)
}
if object.IsPrefix {
fmt.Println("PRE", path)
} else {
fmt.Printf("%v %v %12v %v\n", "OBJ", formatTime(object.Modified), object.Size, path)
fmt.Printf("%v %v %12v %v\n", "OBJ", formatTime(object.System.Created), object.System.ContentLength, path)
}
}
if !list.More {
break
}
startAfter = list.Items[len(list.Items)-1].Path
if objects.Err() != nil {
return objects.Err()
}
return nil
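Bucket and object listing now return iterators, so the manual cursor/More paging loop disappears: Next advances, Item yields the current entry, and a single Err check after the loop replaces per-page error handling. A sketch (helper name and arguments are illustrative):

func printObjectKeys(ctx context.Context, project *uplink.Project, bucket string) error {
	objects := project.ListObjects(ctx, bucket, &uplink.ListObjectsOptions{
		Recursive: true,
		System:    true, // populate object.System (Created, ContentLength)
	})
	for objects.Next() {
		fmt.Println(objects.Item().Key)
	}
	return objects.Err()
}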


@@ -7,12 +7,8 @@ import (
"fmt"
"github.com/spf13/cobra"
"github.com/zeebo/errs"
"storj.io/common/fpath"
"storj.io/common/memory"
"storj.io/common/storj"
"storj.io/storj/lib/uplink"
"storj.io/storj/pkg/process"
)
@@ -45,29 +41,13 @@ func makeBucket(cmd *cobra.Command, args []string) error {
return fmt.Errorf("nested buckets not supported, use format sj://bucket/")
}
project, err := cfg.GetProject(ctx)
project, err := cfg.getProject(ctx, false)
if err != nil {
return errs.New("error setting up project: %+v", err)
return err
}
defer func() {
if err := project.Close(); err != nil {
fmt.Printf("error closing project: %+v\n", err)
}
}()
defer closeProject(project)
bucketCfg := &uplink.BucketConfig{}
bucketCfg.PathCipher = cfg.GetPathCipherSuite()
bucketCfg.EncryptionParameters = cfg.GetEncryptionParameters()
bucketCfg.Volatile = struct {
RedundancyScheme storj.RedundancyScheme
SegmentsSize memory.Size
}{
RedundancyScheme: cfg.GetRedundancyScheme(),
SegmentsSize: cfg.GetSegmentSize(),
}
_, err = project.CreateBucket(ctx, dst.Bucket(), bucketCfg)
if err != nil {
if _, err := project.CreateBucket(ctx, dst.Bucket()); err != nil {
return err
}
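Bucket creation shrinks to a single call: path cipher, encryption parameters, redundancy scheme, and segment size are no longer per-bucket settings, so BucketConfig disappears entirely. A sketch (helper name is hypothetical; EnsureBucket is the new library's idempotent variant when create-if-missing semantics are wanted):

func createBucket(ctx context.Context, project *uplink.Project, name string) error {
	_, err := project.CreateBucket(ctx, name)
	return err
}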


@@ -8,7 +8,6 @@ import (
"fmt"
"github.com/spf13/cobra"
"github.com/zeebo/errs"
"storj.io/common/fpath"
"storj.io/storj/pkg/process"
@@ -50,17 +49,16 @@ func metaGetMain(cmd *cobra.Command, args []string) (err error) {
return fmt.Errorf("the source destination must be a Storj URL")
}
project, bucket, err := cfg.GetProjectAndBucket(ctx, src.Bucket())
project, err := cfg.getProject(ctx, false)
if err != nil {
return err
}
defer closeProjectAndBucket(project, bucket)
defer closeProject(project)
object, err := bucket.OpenObject(ctx, src.Path())
object, err := project.StatObject(ctx, src.Bucket(), src.Path())
if err != nil {
return err
}
defer func() { err = errs.Combine(err, object.Close()) }()
if key != nil {
var keyNorm string
@@ -69,7 +67,7 @@ func metaGetMain(cmd *cobra.Command, args []string) (err error) {
return err
}
value, ok := object.Meta.Metadata[keyNorm]
value, ok := object.Custom[keyNorm]
if !ok {
return fmt.Errorf("key does not exist")
}
@@ -84,8 +82,8 @@ func metaGetMain(cmd *cobra.Command, args []string) (err error) {
return nil
}
if object.Meta.Metadata != nil {
str, err := json.MarshalIndent(object.Meta.Metadata, "", " ")
if object.Custom != nil {
str, err := json.MarshalIndent(object.Custom, "", " ")
if err != nil {
return err
}
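Metadata inspection no longer needs an open object handle: StatObject returns a plain *uplink.Object whose Custom field carries the user metadata, with nothing to Close afterwards. A sketch of reading a single key (helper name is hypothetical):

func metaValue(ctx context.Context, project *uplink.Project, bucket, key, metaKey string) (string, error) {
	object, err := project.StatObject(ctx, bucket, key)
	if err != nil {
		return "", err
	}
	value, ok := object.Custom[metaKey]
	if !ok {
		return "", fmt.Errorf("key does not exist")
	}
	return value, nil
}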


@@ -7,6 +7,7 @@ import (
"encoding/json"
"fmt"
"os/exec"
"strings"
"testing"
"github.com/stretchr/testify/assert"
@@ -60,6 +61,13 @@ func TestSetGetMeta(t *testing.T) {
// Upload file with metadata.
metadata := testrand.Metadata()
// TODO fix this in storj/common
for k, v := range metadata {
if strings.IndexByte(k, 0) >= 0 || strings.IndexByte(v, 0) >= 0 {
delete(metadata, k)
}
}
metadataBs, err := json.Marshal(metadata)
require.NoError(t, err)


@@ -9,7 +9,6 @@ import (
"github.com/spf13/cobra"
"storj.io/common/fpath"
"storj.io/common/storj"
"storj.io/storj/pkg/process"
)
@@ -42,23 +41,13 @@ func deleteBucket(cmd *cobra.Command, args []string) error {
return fmt.Errorf("nested buckets not supported, use format sj://bucket/")
}
project, bucket, err := cfg.GetProjectAndBucket(ctx, dst.Bucket())
project, err := cfg.getProject(ctx, false)
if err != nil {
return convertError(err, dst)
}
defer closeProjectAndBucket(project, bucket)
defer closeProject(project)
list, err := bucket.ListObjects(ctx, &storj.ListOptions{Direction: storj.After, Recursive: true, Limit: 1})
if err != nil {
return convertError(err, dst)
}
if len(list.Items) > 0 {
return fmt.Errorf("bucket not empty: %s", dst.Bucket())
}
err = project.DeleteBucket(ctx, dst.Bucket())
if err != nil {
if _, err := project.DeleteBucket(ctx, dst.Bucket()); err != nil {
return convertError(err, dst)
}
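The preflight one-object listing that guarded against deleting a non-empty bucket is gone: in the new API DeleteBucket itself fails when objects remain, and it returns the deleted bucket's info. A sketch (helper name is hypothetical):

func removeBucket(ctx context.Context, project *uplink.Project, name string) error {
	// DeleteBucket refuses to remove a non-empty bucket, so no
	// separate ListObjects check is needed first.
	_, err := project.DeleteBucket(ctx, name)
	return err
}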


@@ -9,8 +9,6 @@ import (
"github.com/spf13/cobra"
"storj.io/common/fpath"
"storj.io/common/storj"
libuplink "storj.io/storj/lib/uplink"
"storj.io/storj/pkg/process"
)
@@ -45,45 +43,16 @@ func deleteObject(cmd *cobra.Command, args []string) error {
return fmt.Errorf("no bucket specified, use format sj://bucket/")
}
project, err := cfg.GetProject(ctx)
project, err := cfg.getProject(ctx, *rmEncryptedFlag)
if err != nil {
return err
}
defer func() {
if err := project.Close(); err != nil {
fmt.Printf("error closing project: %+v\n", err)
}
}()
defer closeProject(project)
access, err := cfg.GetAccess()
if err != nil {
return err
}
encAccess := access.EncryptionAccess
if *rmEncryptedFlag {
encAccess = libuplink.NewEncryptionAccessWithDefaultKey(storj.Key{})
encAccess.Store().EncryptionBypass = true
}
bucket, err := project.OpenBucket(ctx, dst.Bucket(), encAccess)
if err != nil {
return err
}
defer func() {
if err := bucket.Close(); err != nil {
fmt.Printf("error closing bucket: %+v\n", err)
}
}()
if err = bucket.DeleteObject(ctx, dst.Path()); err != nil {
if _, err = project.DeleteObject(ctx, dst.Bucket(), dst.Path()); err != nil {
return convertError(err, dst)
}
if err := project.Close(); err != nil {
return err
}
fmt.Printf("Deleted %s\n", dst)
return nil


@@ -23,11 +23,11 @@ import (
"storj.io/common/fpath"
"storj.io/common/storj"
libuplink "storj.io/storj/lib/uplink"
"storj.io/storj/pkg/cfgstruct"
"storj.io/storj/pkg/process"
"storj.io/storj/private/version"
"storj.io/storj/private/version/checker"
"storj.io/uplink"
privateAccess "storj.io/uplink/private/access"
)
const advancedFlagName = "advanced"
@@ -84,90 +84,30 @@ func addCmd(cmd *cobra.Command, root *cobra.Command) *cobra.Command {
return cmd
}
// NewUplink returns a pointer to a new Client with a Config and Uplink pointer on it and an error.
func (cliCfg *UplinkFlags) NewUplink(ctx context.Context) (*libuplink.Uplink, error) {
// Transform the uplink cli config flags to the libuplink config object
libuplinkCfg := &libuplink.Config{}
libuplinkCfg.Volatile.Log = zap.L()
libuplinkCfg.Volatile.MaxInlineSize = cliCfg.Client.MaxInlineSize
libuplinkCfg.Volatile.MaxMemory = cliCfg.RS.MaxBufferMem
libuplinkCfg.Volatile.PeerIDVersion = cliCfg.TLS.PeerIDVersions
libuplinkCfg.Volatile.TLS.SkipPeerCAWhitelist = !cliCfg.TLS.UsePeerCAWhitelist
libuplinkCfg.Volatile.TLS.PeerCAWhitelistPath = cliCfg.TLS.PeerCAWhitelistPath
libuplinkCfg.Volatile.DialTimeout = cliCfg.Client.DialTimeout
libuplinkCfg.Volatile.PBKDFConcurrency = cliCfg.PBKDFConcurrency
return libuplink.NewUplink(ctx, libuplinkCfg)
}
// GetProject returns a *libuplink.Project for interacting with a specific project
func (cliCfg *UplinkFlags) GetProject(ctx context.Context) (_ *libuplink.Project, err error) {
err = checker.CheckProcessVersion(ctx, zap.L(), cliCfg.Version, version.Build, "Uplink")
func (cliCfg *UplinkFlags) getProject(ctx context.Context, encryptionBypass bool) (_ *uplink.Project, err error) {
access, err := cfg.GetNewAccess()
if err != nil {
return nil, err
}
access, err := cliCfg.GetAccess()
uplinkCfg := uplink.Config{}
uplinkCfg.DialTimeout = cliCfg.Client.DialTimeout
if encryptionBypass {
err = privateAccess.EnablePathEncryptionBypass(access)
if err != nil {
return nil, err
return nil, Error.Wrap(err)
}
}
project, err := uplinkCfg.OpenProject(ctx, access)
if err != nil {
return nil, Error.Wrap(err)
}
uplk, err := cliCfg.NewUplink(ctx)
if err != nil {
return nil, err
}
defer func() {
if err != nil {
if err := uplk.Close(); err != nil {
fmt.Printf("error closing uplink: %+v\n", err)
}
}
}()
return uplk.OpenProject(ctx, access.SatelliteAddr, access.APIKey)
}
// GetProjectAndBucket returns a *libuplink.Bucket for interacting with a specific project's bucket
func (cliCfg *UplinkFlags) GetProjectAndBucket(ctx context.Context, bucketName string) (project *libuplink.Project, bucket *libuplink.Bucket, err error) {
access, err := cliCfg.GetAccess()
if err != nil {
return nil, nil, err
}
uplk, err := cliCfg.NewUplink(ctx)
if err != nil {
return nil, nil, err
}
defer func() {
if err != nil {
if err := uplk.Close(); err != nil {
fmt.Printf("error closing uplink: %+v\n", err)
}
}
}()
project, err = uplk.OpenProject(ctx, access.SatelliteAddr, access.APIKey)
if err != nil {
return nil, nil, err
}
defer func() {
if err != nil {
if err := project.Close(); err != nil {
fmt.Printf("error closing project: %+v\n", err)
}
}
}()
bucket, err = project.OpenBucket(ctx, bucketName, access.EncryptionAccess)
return project, bucket, err
}
func closeProjectAndBucket(project *libuplink.Project, bucket *libuplink.Bucket) {
if err := bucket.Close(); err != nil {
fmt.Printf("error closing bucket: %+v\n", err)
return project, nil
}
func closeProject(project *uplink.Project) {
if err := project.Close(); err != nil {
fmt.Printf("error closing project: %+v\n", err)
}
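getProject collapses what used to be NewUplink + OpenProject + OpenBucket into one path: obtain the access grant, optionally enable the path-encryption bypass for --encrypted operations, then open the project. The minimal bootstrap it wraps looks roughly like this (helper name is hypothetical):

func openProject(ctx context.Context, serializedAccess string, dialTimeout time.Duration) (*uplink.Project, error) {
	access, err := uplink.ParseAccess(serializedAccess)
	if err != nil {
		return nil, err
	}
	cfg := uplink.Config{DialTimeout: dialTimeout}
	return cfg.OpenProject(ctx, access)
}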


@@ -11,13 +11,12 @@ import (
"strings"
"github.com/spf13/cobra"
"github.com/zeebo/errs"
"storj.io/common/storj"
"storj.io/storj/cmd/internal/wizard"
libuplink "storj.io/storj/lib/uplink"
"storj.io/storj/pkg/cfgstruct"
"storj.io/storj/pkg/process"
"storj.io/uplink"
"storj.io/uplink/backcomp"
)
var (
@@ -93,41 +92,25 @@ func cmdSetup(cmd *cobra.Command, args []string) (err error) {
return Error.Wrap(err)
}
apiKey, err := libuplink.ParseAPIKey(apiKeyString)
if err != nil {
return Error.Wrap(err)
}
passphrase, err := wizard.PromptForEncryptionPassphrase()
if err != nil {
return Error.Wrap(err)
}
uplink, err := setupCfg.NewUplink(ctx)
uplinkConfig := uplink.Config{
DialTimeout: setupCfg.Client.DialTimeout,
}
var access *uplink.Access
if setupCfg.PBKDFConcurrency == 0 {
access, err = uplinkConfig.RequestAccessWithPassphrase(ctx, satelliteAddress, apiKeyString, passphrase)
} else {
access, err = backcomp.RequestAccessWithPassphraseAndConcurrency(ctx, uplinkConfig, satelliteAddress, apiKeyString, passphrase, uint8(setupCfg.PBKDFConcurrency))
}
if err != nil {
return Error.Wrap(err)
}
defer func() { err = errs.Combine(err, uplink.Close()) }()
project, err := uplink.OpenProject(ctx, satelliteAddress, apiKey)
if err != nil {
return Error.Wrap(err)
}
defer func() { err = errs.Combine(err, project.Close()) }()
key, err := project.SaltedKeyFromPassphrase(ctx, passphrase)
if err != nil {
return Error.Wrap(err)
}
encAccess := libuplink.NewEncryptionAccessWithDefaultKey(*key)
encAccess.SetDefaultPathCipher(storj.EncAESGCM)
accessData, err := (&libuplink.Scope{
SatelliteAddr: satelliteAddress,
APIKey: apiKey,
EncryptionAccess: encAccess,
}).Serialize()
accessData, err := access.Serialize()
if err != nil {
return Error.Wrap(err)
}
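Setup previously parsed the API key, opened a project, derived a salted key, and assembled a Scope by hand; the new API folds all of that into a single request, with the backcomp variant used only when a non-default PBKDF concurrency is configured. A sketch of the happy path (helper name is hypothetical):

func newSerializedAccess(ctx context.Context, satellite, apiKey, passphrase string) (string, error) {
	access, err := uplink.RequestAccessWithPassphrase(ctx, satellite, apiKey, passphrase)
	if err != nil {
		return "", err
	}
	return access.Serialize()
}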


@@ -14,10 +14,9 @@ import (
"github.com/zeebo/errs"
"storj.io/common/fpath"
"storj.io/common/macaroon"
libuplink "storj.io/storj/lib/uplink"
"storj.io/storj/pkg/cfgstruct"
"storj.io/storj/pkg/process"
"storj.io/uplink"
)
var shareCfg struct {
@@ -50,30 +49,29 @@ func init() {
process.Bind(shareCmd, &shareCfg, defaults, cfgstruct.ConfDir(getConfDir()))
}
func parseHumanDate(date string, now time.Time) (*time.Time, error) {
func parseHumanDate(date string, now time.Time) (time.Time, error) {
switch {
case date == "":
return nil, nil
return time.Time{}, nil
case date == "now":
return &now, nil
return now, nil
case date[0] == '+':
d, err := time.ParseDuration(date[1:])
t := now.Add(d)
return &t, errs.Wrap(err)
return t, errs.Wrap(err)
case date[0] == '-':
d, err := time.ParseDuration(date[1:])
t := now.Add(-d)
return &t, errs.Wrap(err)
return t, errs.Wrap(err)
default:
t, err := time.Parse(time.RFC3339, date)
return &t, errs.Wrap(err)
return t, errs.Wrap(err)
}
}
// shareMain is the function executed when shareCmd is called
func shareMain(cmd *cobra.Command, args []string) (err error) {
now := time.Now()
notBefore, err := parseHumanDate(shareCfg.NotBefore, now)
if err != nil {
return err
@@ -91,7 +89,7 @@ func shareMain(cmd *cobra.Command, args []string) (err error) {
}
}
var restrictions []libuplink.EncryptionRestriction
var sharePrefixes []uplink.SharePrefix
for _, path := range shareCfg.AllowedPathPrefix {
p, err := fpath.New(path)
if err != nil {
@@ -101,62 +99,46 @@ func shareMain(cmd *cobra.Command, args []string) (err error) {
return errs.New("required path must be remote: %q", path)
}
restrictions = append(restrictions, libuplink.EncryptionRestriction{
sharePrefixes = append(sharePrefixes, uplink.SharePrefix{
Bucket: p.Bucket(),
PathPrefix: p.Path(),
Prefix: p.Path(),
})
}
access, err := shareCfg.GetAccess()
if err != nil {
return err
}
key, encAccess := access.APIKey, access.EncryptionAccess
if len(restrictions) > 0 {
key, encAccess, err = encAccess.Restrict(key, restrictions...)
if err != nil {
return err
}
}
caveat, err := macaroon.NewCaveat()
access, err := shareCfg.GetNewAccess()
if err != nil {
return err
}
caveat.DisallowDeletes = shareCfg.DisallowDeletes || shareCfg.Readonly
caveat.DisallowLists = shareCfg.DisallowLists || shareCfg.Writeonly
caveat.DisallowReads = shareCfg.DisallowReads || shareCfg.Writeonly
caveat.DisallowWrites = shareCfg.DisallowWrites || shareCfg.Readonly
caveat.NotBefore = notBefore
caveat.NotAfter = notAfter
permission := uplink.Permission{}
permission.AllowDelete = !shareCfg.DisallowDeletes && !shareCfg.Readonly
permission.AllowList = !shareCfg.DisallowLists && !shareCfg.Writeonly
permission.AllowDownload = !shareCfg.DisallowReads && !shareCfg.Writeonly
permission.AllowUpload = !shareCfg.DisallowWrites && !shareCfg.Readonly
permission.NotBefore = notBefore
permission.NotAfter = notAfter
key, err = key.Restrict(caveat)
newAccess, err := access.Share(permission, sharePrefixes...)
if err != nil {
return err
}
newAccess := &libuplink.Scope{
SatelliteAddr: access.SatelliteAddr,
APIKey: key,
EncryptionAccess: encAccess,
}
newAccessData, err := newAccess.Serialize()
if err != nil {
return err
}
fmt.Println("Sharing access to satellite", access.SatelliteAddr)
// TODO extend libuplink to give this value
// fmt.Println("Sharing access to satellite", access.SatelliteAddr)
fmt.Println("=========== ACCESS RESTRICTIONS ==========================================================")
fmt.Println("Reads :", formatPermission(!caveat.GetDisallowReads()))
fmt.Println("Writes :", formatPermission(!caveat.GetDisallowWrites()))
fmt.Println("Lists :", formatPermission(!caveat.GetDisallowLists()))
fmt.Println("Deletes :", formatPermission(!caveat.GetDisallowDeletes()))
fmt.Println("Not Before:", formatTimeRestriction(caveat.NotBefore))
fmt.Println("Not After :", formatTimeRestriction(caveat.NotAfter))
fmt.Println("Paths :", formatPaths(restrictions))
fmt.Println("Download :", formatPermission(permission.AllowDownload))
fmt.Println("Upload :", formatPermission(permission.AllowUpload))
fmt.Println("Lists :", formatPermission(permission.AllowList))
fmt.Println("Deletes :", formatPermission(permission.AllowDelete))
fmt.Println("NotBefore :", formatTimeRestriction(permission.NotBefore))
fmt.Println("NotAfter :", formatTimeRestriction(permission.NotAfter))
fmt.Println("Paths :", formatPaths(sharePrefixes))
fmt.Println("=========== SERIALIZED ACCESS WITH THE ABOVE RESTRICTIONS TO SHARE WITH OTHERS ===========")
fmt.Println("Access :", newAccessData)
@@ -181,25 +163,25 @@ func formatPermission(allowed bool) string {
return "Disallowed"
}
func formatTimeRestriction(t *time.Time) string {
if t == nil {
func formatTimeRestriction(t time.Time) string {
if t.IsZero() {
return "No restriction"
}
return formatTime(*t)
return formatTime(t)
}
func formatPaths(restrictions []libuplink.EncryptionRestriction) string {
if len(restrictions) == 0 {
func formatPaths(sharePrefixes []uplink.SharePrefix) string {
if len(sharePrefixes) == 0 {
return "WARNING! The entire project is shared!"
}
var paths []string
for _, restriction := range restrictions {
path := "sj://" + restriction.Bucket
if len(restriction.PathPrefix) == 0 {
for _, prefix := range sharePrefixes {
path := "sj://" + prefix.Bucket
if len(prefix.Prefix) == 0 {
path += " (entire bucket)"
} else {
path += "/" + restriction.PathPrefix
path += "/" + prefix.Prefix
}
paths = append(paths, path)
}
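Restriction moves from macaroon caveats (deny flags) to an allow-list uplink.Permission plus SharePrefix scopes, and Access.Share performs both the key restriction and the re-serialization the old code did by hand. A sketch deriving a read-only grant for one prefix (helper name is hypothetical; ReadOnlyPermission is the library's download-and-list preset):

func readOnlyShare(access *uplink.Access, bucket, prefix string) (string, error) {
	shared, err := access.Share(
		uplink.ReadOnlyPermission(),
		uplink.SharePrefix{Bucket: bucket, Prefix: prefix},
	)
	if err != nil {
		return "", err
	}
	return shared.Serialize()
}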

go.mod

@@ -55,5 +55,5 @@ require (
gopkg.in/yaml.v2 v2.2.4
storj.io/common v0.0.0-20200303092706-429875361e5d
storj.io/drpc v0.0.8
storj.io/uplink v1.0.0-rc.2
storj.io/uplink v1.0.0-rc.3
)

go.sum

@@ -615,10 +615,18 @@ honnef.co/go/tools v0.0.1-2019.2.3 h1:3JgtbtFHMiCmsznwGVTUWbgGov+pVqnlf1dEJTNAXe
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
storj.io/common v0.0.0-20200226144507-3fe9f7839df5/go.mod h1:lCc2baFO7GQlKsPTri8xwCsCPO2LsEUUiAGeRQuXY48=
storj.io/common v0.0.0-20200227094229-a07042157dcb/go.mod h1:c9228xUKEg/sqWSOiVLoKQ3DiUqm1WrUAd9autjYfQc=
storj.io/common v0.0.0-20200303092706-429875361e5d h1:TNUV5+Nc77VV0nRpDWXsGEnxopsyOBknO6vMtrUiRbU=
storj.io/common v0.0.0-20200303092706-429875361e5d/go.mod h1:c9228xUKEg/sqWSOiVLoKQ3DiUqm1WrUAd9autjYfQc=
storj.io/common v0.0.0-20200305112941-134b7e1ea4a4 h1:nXJIJl7dS8jDPJJ19Olcc9SDUA/NyK/4QcVxb1mEUgQ=
storj.io/drpc v0.0.7-0.20191115031725-2171c57838d2/go.mod h1:/ascUDbzNAv0A3Jj7wUIKFBH2JdJ2uJIBO/b9+2yHgQ=
storj.io/drpc v0.0.8 h1:wu68cMmtoT0vSWIAZz29RpJkWdi4o0S8BIrLslpH5FQ=
storj.io/drpc v0.0.8/go.mod h1:v39uWro/EbXXk+gNnrM9FQuVVS2zUBWBfeduydgeXUA=
storj.io/uplink v1.0.0-rc.2 h1:IykgqbqwXDRJAaDXO0+t1CCfntetRvdWK9mUQsWtz/8=
storj.io/uplink v1.0.0-rc.2/go.mod h1:WUQYQZFI7iUxqarO1zhUwlVgiHxv4PGZlsQFh8XE8yg=
storj.io/uplink v1.0.0-rc.2.0.20200227164344-590e15dc6dc8 h1:iiwnR134qXXuihTgUZaynVna3jt6GQ13Y+OS31+TaTo=
storj.io/uplink v1.0.0-rc.2.0.20200227164344-590e15dc6dc8/go.mod h1:vLSVPdd45bPN3ewdHQrfdWhCniE1YSQw++LnP9cZR2g=
storj.io/uplink v1.0.0-rc.2.0.20200303125047-a419cb9a6a56 h1:8kKEdwZKOuTHjRlwYYIO5dVV2UNIx4GscUoXfT746mo=
storj.io/uplink v1.0.0-rc.2.0.20200303125047-a419cb9a6a56/go.mod h1:vLSVPdd45bPN3ewdHQrfdWhCniE1YSQw++LnP9cZR2g=
storj.io/uplink v1.0.0-rc.3 h1:AqqF22oMTi1UgFfA9i8t/UWDdn8KLsyuPJBGMRLIWzA=
storj.io/uplink v1.0.0-rc.3/go.mod h1:vLSVPdd45bPN3ewdHQrfdWhCniE1YSQw++LnP9cZR2g=


@@ -337,7 +337,7 @@ func (client *Uplink) GetConfig(satellite *SatelliteSystem) cmd.Config {
encAccess.SetDefaultPathCipher(storj.EncAESGCM)
accessData, err := (&libuplink.Scope{
SatelliteAddr: satellite.Addr(),
SatelliteAddr: satellite.URL().String(),
APIKey: apiKey,
EncryptionAccess: encAccess,
}).Serialize()


@@ -23,23 +23,40 @@ random_bytes_file () {
head -c $size </dev/urandom > $output
}
compare_files () {
name=$(basename "$2")
if cmp "$1" "$2"
then
echo "$name matches uploaded file"
else
echo "$name does not match uploaded file"
exit 1
fi
}
random_bytes_file "2048" "$SRC_DIR/small-upload-testfile" # create 2kb file of random bytes (inline)
random_bytes_file "5242880" "$SRC_DIR/big-upload-testfile" # create 5mb file of random bytes (remote)
random_bytes_file "12582912" "$SRC_DIR/multisegment-upload-testfile" # create 2 x 6mb file of random bytes (remote)
random_bytes_file "9437184" "$SRC_DIR/diff-size-segments" # create 9mb file of random bytes (remote)
random_bytes_file "100KiB" "$SRC_DIR/put-file" # create 100KiB file of random bytes (remote)
UPLINK_DEBUG_ADDR=""
uplink --access "$GATEWAY_0_ACCESS" --debug.addr "$UPLINK_DEBUG_ADDR" mb "sj://$BUCKET/"
export STORJ_ACCESS=$GATEWAY_0_ACCESS
export STORJ_DEBUG_ADDR=$UPLINK_DEBUG_ADDR
uplink --access "$GATEWAY_0_ACCESS" --progress=false --debug.addr "$UPLINK_DEBUG_ADDR" cp "$SRC_DIR/small-upload-testfile" "sj://$BUCKET/"
uplink --access "$GATEWAY_0_ACCESS" --progress=false --debug.addr "$UPLINK_DEBUG_ADDR" cp "$SRC_DIR/big-upload-testfile" "sj://$BUCKET/"
uplink --access "$GATEWAY_0_ACCESS" --progress=false --debug.addr "$UPLINK_DEBUG_ADDR" cp "$SRC_DIR/multisegment-upload-testfile" "sj://$BUCKET/"
uplink --access "$GATEWAY_0_ACCESS" --progress=false --debug.addr "$UPLINK_DEBUG_ADDR" cp "$SRC_DIR/diff-size-segments" "sj://$BUCKET/"
uplink mb "sj://$BUCKET/"
uplink cp "$SRC_DIR/small-upload-testfile" "sj://$BUCKET/" --progress=false
uplink cp "$SRC_DIR/big-upload-testfile" "sj://$BUCKET/" --progress=false
uplink cp "$SRC_DIR/multisegment-upload-testfile" "sj://$BUCKET/" --progress=false
uplink cp "$SRC_DIR/diff-size-segments" "sj://$BUCKET/" --progress=false
cat "$SRC_DIR/put-file" | uplink put "sj://$BUCKET/put-file"
uplink --config-dir "$UPLINK_DIR" import named-access "$GATEWAY_0_ACCESS"
FILES=$(uplink --config-dir "$UPLINK_DIR" --access named-access ls "sj://$BUCKET" | wc -l)
EXPECTED_FILES="4"
FILES=$(STORJ_ACCESS= uplink --config-dir "$UPLINK_DIR" --access named-access ls "sj://$BUCKET" | tee "$TMPDIR/list" | wc -l)
EXPECTED_FILES="5"
if [ "$FILES" == $EXPECTED_FILES ]
then
echo "listing returns $FILES files"
@@ -48,60 +65,36 @@ else
exit 1
fi
uplink --access "$GATEWAY_0_ACCESS" --progress=false --debug.addr "$UPLINK_DEBUG_ADDR" cp "sj://$BUCKET/small-upload-testfile" "$DST_DIR"
uplink --access "$GATEWAY_0_ACCESS" --progress=false --debug.addr "$UPLINK_DEBUG_ADDR" cp "sj://$BUCKET/big-upload-testfile" "$DST_DIR"
uplink --access "$GATEWAY_0_ACCESS" --progress=false --debug.addr "$UPLINK_DEBUG_ADDR" cp "sj://$BUCKET/multisegment-upload-testfile" "$DST_DIR"
uplink --access "$GATEWAY_0_ACCESS" --progress=false --debug.addr "$UPLINK_DEBUG_ADDR" cp "sj://$BUCKET/diff-size-segments" "$DST_DIR"
uplink --access "$GATEWAY_0_ACCESS" --debug.addr "$UPLINK_DEBUG_ADDR" rm "sj://$BUCKET/small-upload-testfile"
uplink --access "$GATEWAY_0_ACCESS" --debug.addr "$UPLINK_DEBUG_ADDR" rm "sj://$BUCKET/big-upload-testfile"
uplink --access "$GATEWAY_0_ACCESS" --debug.addr "$UPLINK_DEBUG_ADDR" rm "sj://$BUCKET/multisegment-upload-testfile"
uplink --access "$GATEWAY_0_ACCESS" --debug.addr "$UPLINK_DEBUG_ADDR" rm "sj://$BUCKET/diff-size-segments"
uplink --access "$GATEWAY_0_ACCESS" --debug.addr "$UPLINK_DEBUG_ADDR" ls "sj://$BUCKET"
uplink --access "$GATEWAY_0_ACCESS" --debug.addr "$UPLINK_DEBUG_ADDR" rb "sj://$BUCKET"
if cmp "$SRC_DIR/small-upload-testfile" "$DST_DIR/small-upload-testfile"
SIZE_CHECK=$(cat "$TMPDIR/list" | awk '{if($4 == "0") print "invalid size";}')
if [ "$SIZE_CHECK" != "" ]
then
echo "small upload testfile matches uploaded file"
else
echo "small upload testfile does not match uploaded file"
echo "listing returns invalid size for one of the objects:"
cat "$TMPDIR/list"
exit 1
fi
if cmp "$SRC_DIR/big-upload-testfile" "$DST_DIR/big-upload-testfile"
then
echo "big upload testfile matches uploaded file"
else
echo "big upload testfile does not match uploaded file"
exit 1
fi
uplink ls "sj://$BUCKET/non-existing-prefix"
if cmp "$SRC_DIR/multisegment-upload-testfile" "$DST_DIR/multisegment-upload-testfile"
then
echo "multisegment upload testfile matches uploaded file"
else
echo "multisegment upload testfile does not match uploaded file"
exit 1
fi
uplink cp "sj://$BUCKET/small-upload-testfile" "$DST_DIR" --progress=false
uplink cp "sj://$BUCKET/big-upload-testfile" "$DST_DIR" --progress=false
uplink cp "sj://$BUCKET/multisegment-upload-testfile" "$DST_DIR" --progress=false
uplink cp "sj://$BUCKET/diff-size-segments" "$DST_DIR" --progress=false
uplink cp "sj://$BUCKET/put-file" "$DST_DIR" --progress=false
uplink cat "sj://$BUCKET/put-file" >> "$DST_DIR/put-file-from-cat"
if cmp "$SRC_DIR/diff-size-segments" "$DST_DIR/diff-size-segments"
then
echo "diff-size-segments testfile matches uploaded file"
else
echo "diff-size-segments testfile does not match uploaded file"
exit 1
fi
uplink rm "sj://$BUCKET/small-upload-testfile"
uplink rm "sj://$BUCKET/big-upload-testfile"
uplink rm "sj://$BUCKET/multisegment-upload-testfile"
uplink rm "sj://$BUCKET/diff-size-segments"
uplink rm "sj://$BUCKET/put-file"
uplink ls "sj://$BUCKET"
# check if all data files were removed
# FILES=$(find "$STORAGENODE_0_DIR/../" -type f -path "*/blob/*" ! -name "info.*")
# if [ -z "$FILES" ];
# then
# echo "all data files removed from storage nodes"
# else
# echo "not all data files removed from storage nodes:"
# echo $FILES
# exit 1
# fi
uplink rb "sj://$BUCKET"
compare_files "$SRC_DIR/small-upload-testfile" "$DST_DIR/small-upload-testfile"
compare_files "$SRC_DIR/big-upload-testfile" "$DST_DIR/big-upload-testfile"
compare_files "$SRC_DIR/multisegment-upload-testfile" "$DST_DIR/multisegment-upload-testfile"
compare_files "$SRC_DIR/diff-size-segments" "$DST_DIR/diff-size-segments"
compare_files "$SRC_DIR/put-file" "$DST_DIR/put-file"
compare_files "$SRC_DIR/put-file" "$DST_DIR/put-file-from-cat"