cmd/uplinkng: cp

Change-Id: I9c251028d9f72572f4d42815de31d44517cd5525
This commit is contained in:
Jeff Wendling 2021-04-06 15:19:11 -04:00
parent 136af8e630
commit 46a3242ed4
15 changed files with 946 additions and 153 deletions

View File

@ -31,7 +31,7 @@ func init() {
setBasicFlags(rmCmd.Flags(), "encrypted")
}
func deleteObject(cmd *cobra.Command, args []string) error {
func deleteObject(cmd *cobra.Command, args []string) (err error) {
ctx, _ := withTelemetry(cmd)
if len(args) == 0 {
@ -64,7 +64,7 @@ func deleteObject(cmd *cobra.Command, args []string) error {
if err != nil {
return convertError(err, dst)
}
} else if list.Err() != nil {
} else if err := list.Err(); err != nil {
return convertError(err, dst)
}
} else if _, err = project.DeleteObject(ctx, dst.Bucket(), dst.Path()); err != nil {

View File

@ -67,6 +67,12 @@ func TestRmPending(t *testing.T) {
require.NoError(t, err)
}
// Ensure all of the objects exist
{
require.True(t, pendingObjectExists(ctx, satellite, project, bucketName, "pending-object"))
require.True(t, pendingObjectExists(ctx, satellite, project, bucketName, "prefixed/pending-object"))
}
// Try to delete a non-existing object.
{
cmd := exec.Command(uplinkExe,

View File

@ -13,7 +13,7 @@ import (
// accessPermissions holds flags and provides a Setup method for commands that
// have to modify permissions on access grants.
type accessPermissions struct {
paths []string // paths is the set of path prefixes that the grant will be limited to
prefixes []string // prefixes is the set of path prefixes that the grant will be limited to
readonly bool // implies disallowWrites and disallowDeletes
writeonly bool // implies disallowReads and disallowLists
@ -28,7 +28,7 @@ type accessPermissions struct {
}
func (ap *accessPermissions) Setup(a clingy.Arguments, f clingy.Flags) {
ap.paths = f.New("path", "Path prefix access will be restricted to", []string{},
ap.prefixes = f.New("prefix", "Key prefix access will be restricted to", []string{},
clingy.Repeated).([]string)
ap.readonly = f.New("readonly", "Implies --disallow-writes and --disallow-deletes", true,

View File

@ -4,17 +4,23 @@
package main
import (
"fmt"
"io"
"strconv"
progressbar "github.com/cheggaaa/pb/v3"
"github.com/zeebo/clingy"
"github.com/zeebo/errs"
)
type cmdCp struct {
projectProvider
recursive bool
source string
dest string
dryrun bool
source Location
dest Location
}
func (c *cmdCp) Setup(a clingy.Arguments, f clingy.Flags) {
@ -24,11 +30,114 @@ func (c *cmdCp) Setup(a clingy.Arguments, f clingy.Flags) {
clingy.Short('r'),
clingy.Transform(strconv.ParseBool),
).(bool)
c.dryrun = f.New("dryrun", "Print what operations would happen but don't execute them", false,
clingy.Transform(strconv.ParseBool),
).(bool)
c.source = a.New("source", "Source to copy").(string)
c.dest = a.New("dest", "Desination to copy").(string)
c.source = a.New("source", "Source to copy", clingy.Transform(parseLocation)).(Location)
c.dest = a.New("dest", "Desination to copy", clingy.Transform(parseLocation)).(Location)
}
func (c *cmdCp) Execute(ctx clingy.Context) error {
fs, err := c.OpenFilesystem(ctx)
if err != nil {
return err
}
defer func() { _ = fs.Close() }()
if c.recursive {
return c.copyRecursive(ctx, fs)
}
return c.copyFile(ctx, fs, c.source, c.dest, true)
}
func (c *cmdCp) copyRecursive(ctx clingy.Context, fs filesystem) error {
if c.source.Std() || c.dest.Std() {
return errs.New("cannot recursively copy to stdin/stdout")
}
var anyFailed bool
iter, err := fs.ListObjects(ctx, c.source, true)
if err != nil {
return err
}
for iter.Next() {
rel, err := c.source.RelativeTo(iter.Item().Loc)
if err != nil {
return err
}
source := iter.Item().Loc
dest := c.dest.AppendKey(rel)
if err := c.copyFile(ctx, fs, source, dest, false); err != nil {
fmt.Fprintln(ctx.Stderr(), copyVerb(source, dest), "failed:", err.Error())
anyFailed = true
}
}
if err := iter.Err(); err != nil {
return errs.Wrap(err)
} else if anyFailed {
return errs.New("some downloads failed")
}
return nil
}
func (c *cmdCp) copyFile(ctx clingy.Context, fs filesystem, source, dest Location, progress bool) error {
if isDir := fs.IsLocalDir(ctx, dest); isDir {
base, ok := source.Base()
if !ok {
return errs.New("destination is a directory and cannot find base name for %q", source)
}
dest = dest.AppendKey(base)
}
if !source.Std() && !dest.Std() {
fmt.Println(copyVerb(source, dest), source, "to", dest)
}
if c.dryrun {
return nil
}
rh, err := fs.Open(ctx, source)
if err != nil {
return err
}
defer func() { _ = rh.Close() }()
wh, err := fs.Create(ctx, dest)
if err != nil {
return err
}
defer func() { _ = wh.Abort() }()
var bar *progressbar.ProgressBar
var writer io.Writer = wh
if length := rh.Info().ContentLength; progress && length >= 0 && !c.dest.Std() {
bar = progressbar.New64(length).SetWriter(ctx.Stdout())
writer = bar.NewProxyWriter(writer)
bar.Start()
defer bar.Finish()
}
if _, err := io.Copy(writer, rh); err != nil {
return errs.Combine(err, wh.Abort())
}
return errs.Wrap(wh.Commit())
}
func copyVerb(source, dest Location) string {
switch {
case dest.remote:
return "upload"
case source.remote:
return "download"
default:
return "copy"
}
}

View File

@ -5,13 +5,9 @@ package main
import (
"strconv"
"strings"
"time"
"github.com/zeebo/clingy"
"github.com/zeebo/errs"
"storj.io/uplink"
)
type cmdLs struct {
@ -21,7 +17,7 @@ type cmdLs struct {
encrypted bool
pending bool
prefix *string
prefix *Location
}
func (c *cmdLs) Setup(a clingy.Arguments, f clingy.Flags) {
@ -31,21 +27,23 @@ func (c *cmdLs) Setup(a clingy.Arguments, f clingy.Flags) {
clingy.Short('r'),
clingy.Transform(strconv.ParseBool),
).(bool)
c.encrypted = f.New("encrypted", "Shows paths as base64-encoded encrypted paths", false,
c.encrypted = f.New("encrypted", "Shows keys base64 encoded without decrypting", false,
clingy.Transform(strconv.ParseBool),
).(bool)
c.pending = f.New("pending", "List pending multipart object uploads instead", false,
c.pending = f.New("pending", "List pending object uploads instead", false,
clingy.Transform(strconv.ParseBool),
).(bool)
c.prefix = a.New("prefix", "Prefix to list (sj://BUCKET[/KEY])", clingy.Optional).(*string)
c.prefix = a.New("prefix", "Prefix to list (sj://BUCKET[/KEY])", clingy.Optional,
clingy.Transform(parseLocation),
).(*Location)
}
func (c *cmdLs) Execute(ctx clingy.Context) error {
if c.prefix == nil {
return c.listBuckets(ctx)
}
return c.listPath(ctx, *c.prefix)
return c.listLocation(ctx, *c.prefix)
}
func (c *cmdLs) listBuckets(ctx clingy.Context) error {
@ -66,62 +64,34 @@ func (c *cmdLs) listBuckets(ctx clingy.Context) error {
return iter.Err()
}
func (c *cmdLs) listPath(ctx clingy.Context, path string) error {
bucket, key, ok, err := parsePath(path)
if err != nil {
return err
} else if !ok {
return errs.New("no bucket specified. use format sj://bucket")
}
project, err := c.OpenProject(ctx, bypassEncryption(c.encrypted))
func (c *cmdLs) listLocation(ctx clingy.Context, prefix Location) error {
fs, err := c.OpenFilesystem(ctx, bypassEncryption(c.encrypted))
if err != nil {
return err
}
defer func() { _ = project.Close() }()
defer func() { _ = fs.Close() }()
tw := newTabbedWriter(ctx.Stdout(), "KIND", "CREATED", "SIZE", "KEY")
defer tw.Done()
// in order to get a correct listing, including non-terminating components, what we
// must do is pop the last component off, ensuring the prefix is either empty or
// ends with a /, list there, then filter the results locally against the popped component.
prefix, filter := "", key
if idx := strings.LastIndexByte(key, '/'); idx >= 0 {
prefix, filter = key[:idx+1], key[idx+1:]
}
// create the object iterator of either existing objects or pending multipart uploads
var iter listObjectIterator
var iter objectIterator
if c.pending {
iter = (*uplinkUploadIterator)(project.ListUploads(ctx, bucket,
&uplink.ListUploadsOptions{
Prefix: prefix,
Recursive: c.recursive,
System: true,
}))
iter, err = fs.ListUploads(ctx, prefix, c.recursive)
} else {
iter = (*uplinkObjectIterator)(project.ListObjects(ctx, bucket,
&uplink.ListObjectsOptions{
Prefix: prefix,
Recursive: c.recursive,
System: true,
}))
iter, err = fs.ListObjects(ctx, prefix, c.recursive)
}
if err != nil {
return err
}
// iterate and print the results
for iter.Next() {
obj := iter.Item()
key := obj.Key[len(prefix):]
if !strings.HasPrefix(key, filter) {
continue
}
if obj.IsPrefix {
tw.WriteLine("PRE", "", "", key)
tw.WriteLine("PRE", "", "", obj.Loc.Key())
} else {
tw.WriteLine("OBJ", formatTime(obj.Created), obj.ContentLength, key)
tw.WriteLine("OBJ", formatTime(obj.Created), obj.ContentLength, obj.Loc.Key())
}
}
return iter.Err()
@ -130,47 +100,3 @@ func (c *cmdLs) listPath(ctx clingy.Context, path string) error {
func formatTime(x time.Time) string {
return x.Local().Format("2006-01-02 15:04:05")
}
// the following code wraps the two list iterator types behind an interface so that
// the list code can be generic against either of them.
type listObjectIterator interface {
Next() bool
Err() error
Item() listObject
}
type listObject struct {
Key string
IsPrefix bool
Created time.Time
ContentLength int64
}
type uplinkObjectIterator uplink.ObjectIterator
func (u *uplinkObjectIterator) Next() bool { return (*uplink.ObjectIterator)(u).Next() }
func (u *uplinkObjectIterator) Err() error { return (*uplink.ObjectIterator)(u).Err() }
func (u *uplinkObjectIterator) Item() listObject {
obj := (*uplink.ObjectIterator)(u).Item()
return listObject{
Key: obj.Key,
IsPrefix: obj.IsPrefix,
Created: obj.System.Created,
ContentLength: obj.System.ContentLength,
}
}
type uplinkUploadIterator uplink.UploadIterator
func (u *uplinkUploadIterator) Next() bool { return (*uplink.UploadIterator)(u).Next() }
func (u *uplinkUploadIterator) Err() error { return (*uplink.UploadIterator)(u).Err() }
func (u *uplinkUploadIterator) Item() listObject {
obj := (*uplink.UploadIterator)(u).Item()
return listObject{
Key: obj.Key,
IsPrefix: obj.IsPrefix,
Created: obj.System.Created,
ContentLength: obj.System.ContentLength,
}
}

View File

@ -10,14 +10,14 @@ import (
type cmdMetaGet struct {
projectProvider
path string
entry *string
location string
entry *string
}
func (c *cmdMetaGet) Setup(a clingy.Arguments, f clingy.Flags) {
c.projectProvider.Setup(a, f)
c.path = a.New("path", "Path to object (sj://BUCKET/KEY)").(string)
c.location = a.New("location", "Location of object (sj://BUCKET/KEY)").(string)
c.entry = a.New("entry", "Metadata entry to get", clingy.Optional).(*string)
}

View File

@ -7,6 +7,7 @@ import (
"strconv"
"github.com/zeebo/clingy"
"github.com/zeebo/errs"
)
type cmdRm struct {
@ -15,23 +16,40 @@ type cmdRm struct {
recursive bool
encrypted bool
path string
location string
}
func (c *cmdRm) Setup(a clingy.Arguments, f clingy.Flags) {
c.projectProvider.Setup(a, f)
c.recursive = f.New("recursive", "List recursively", false,
c.recursive = f.New("recursive", "Remove recursively", false,
clingy.Short('r'),
clingy.Transform(strconv.ParseBool),
).(bool)
c.encrypted = f.New("encrypted", "Shows paths as base64-encoded encrypted paths", false,
c.encrypted = f.New("encrypted", "Interprets keys base64 encoded without decrypting", false,
clingy.Transform(strconv.ParseBool),
).(bool)
c.path = a.New("path", "Path to remove (sj://BUCKET[/KEY])").(string)
c.location = a.New("location", "Location to remove (sj://BUCKET[/KEY])").(string)
}
func (c *cmdRm) Execute(ctx clingy.Context) error {
return nil
project, err := c.OpenProject(ctx, bypassEncryption(c.encrypted))
if err != nil {
return err
}
defer func() { _ = project.Close() }()
// TODO: use the filesystem interface
// TODO: recursive remove
p, err := parseLocation(c.location)
if err != nil {
return err
} else if !p.remote {
return errs.New("can only delete remote objects")
}
_, err = project.DeleteObject(ctx, p.bucket, p.key)
return err
}

265
cmd/uplinkng/filesystem.go Normal file
View File

@ -0,0 +1,265 @@
// Copyright (C) 2021 Storj Labs, Inc.
// See LICENSE for copying information.
package main
import (
"context"
"io"
"os"
"strings"
"time"
"github.com/zeebo/clingy"
"github.com/zeebo/errs"
"storj.io/uplink"
)
// filesystem represents either the local filesystem or the data backed by a project.
type filesystem interface {
Close() error
Open(ctx clingy.Context, loc Location) (readHandle, error)
Create(ctx clingy.Context, loc Location) (writeHandle, error)
ListObjects(ctx context.Context, prefix Location, recursive bool) (objectIterator, error)
ListUploads(ctx context.Context, prefix Location, recursive bool) (objectIterator, error)
IsLocalDir(ctx context.Context, loc Location) bool
}
//
// object info
//
// objectInfo is a simpler *uplink.Object that contains the minimal information the
// uplink command needs that multiple types can be converted to.
type objectInfo struct {
Loc Location
IsPrefix bool
Created time.Time
ContentLength int64
}
// uplinkObjectToObjectInfo returns an objectInfo converted from an *uplink.Object.
func uplinkObjectToObjectInfo(bucket string, obj *uplink.Object) objectInfo {
return objectInfo{
Loc: Location{
bucket: bucket,
key: obj.Key,
remote: true,
},
IsPrefix: obj.IsPrefix,
Created: obj.System.Created,
ContentLength: obj.System.ContentLength,
}
}
// uplinkUploadInfoToObjectInfo returns an objectInfo converted from an *uplink.Object.
func uplinkUploadInfoToObjectInfo(bucket string, upl *uplink.UploadInfo) objectInfo {
return objectInfo{
Loc: Location{
bucket: bucket,
key: upl.Key,
remote: true,
},
IsPrefix: upl.IsPrefix,
Created: upl.System.Created,
ContentLength: upl.System.ContentLength,
}
}
//
// read handles
//
// readHandle is something that can be read from.
type readHandle interface {
io.Reader
io.Closer
Info() objectInfo
}
// uplinkReadHandle implements readHandle for *uplink.Downloads.
type uplinkReadHandle struct {
bucket string
dl *uplink.Download
}
// newUplinkReadHandle constructs an *uplinkReadHandle from an *uplink.Download.
func newUplinkReadHandle(bucket string, dl *uplink.Download) *uplinkReadHandle {
return &uplinkReadHandle{
bucket: bucket,
dl: dl,
}
}
func (u *uplinkReadHandle) Read(p []byte) (int, error) { return u.dl.Read(p) }
func (u *uplinkReadHandle) Close() error { return u.dl.Close() }
func (u *uplinkReadHandle) Info() objectInfo { return uplinkObjectToObjectInfo(u.bucket, u.dl.Info()) }
// osReadHandle implements readHandle for *os.Files.
type osReadHandle struct {
raw *os.File
info objectInfo
}
// newOsReadHandle constructs an *osReadHandle from an *os.File.
func newOSReadHandle(fh *os.File) (*osReadHandle, error) {
fi, err := fh.Stat()
if err != nil {
return nil, errs.Wrap(err)
}
return &osReadHandle{
raw: fh,
info: objectInfo{
Loc: Location{path: fh.Name()},
IsPrefix: false,
Created: fi.ModTime(), // TODO: os specific crtime
ContentLength: fi.Size(),
},
}, nil
}
func (o *osReadHandle) Read(p []byte) (int, error) { return o.raw.Read(p) }
func (o *osReadHandle) Close() error { return o.raw.Close() }
func (o *osReadHandle) Info() objectInfo { return o.info }
// genericReadHandle implements readHandle for an io.Reader.
type genericReadHandle struct{ r io.Reader }
// newGenericReadHandle constructs a *genericReadHandle from any io.Reader.
func newGenericReadHandle(r io.Reader) *genericReadHandle {
return &genericReadHandle{r: r}
}
func (g *genericReadHandle) Read(p []byte) (int, error) { return g.r.Read(p) }
func (g *genericReadHandle) Close() error { return nil }
func (g *genericReadHandle) Info() objectInfo { return objectInfo{ContentLength: -1} }
//
// write handles
//
// writeHandle is anything that can be written to with commit/abort semantics.
type writeHandle interface {
io.Writer
Commit() error
Abort() error
}
// uplinkWriteHandle implements writeHandle for *uplink.Uploads.
type uplinkWriteHandle uplink.Upload
// newUplinkWriteHandle constructs an *uplinkWriteHandle from an *uplink.Upload.
func newUplinkWriteHandle(dl *uplink.Upload) *uplinkWriteHandle {
return (*uplinkWriteHandle)(dl)
}
func (u *uplinkWriteHandle) raw() *uplink.Upload {
return (*uplink.Upload)(u)
}
func (u *uplinkWriteHandle) Write(p []byte) (int, error) { return u.raw().Write(p) }
func (u *uplinkWriteHandle) Commit() error { return u.raw().Commit() }
func (u *uplinkWriteHandle) Abort() error { return u.raw().Abort() }
// osWriteHandle implements writeHandle for *os.Files.
type osWriteHandle struct {
fh *os.File
done bool
}
// newOSWriteHandle constructs an *osWriteHandle from an *os.File.
func newOSWriteHandle(fh *os.File) *osWriteHandle {
return &osWriteHandle{fh: fh}
}
func (o *osWriteHandle) Write(p []byte) (int, error) { return o.fh.Write(p) }
func (o *osWriteHandle) Commit() error {
if o.done {
return nil
}
o.done = true
return o.fh.Close()
}
func (o *osWriteHandle) Abort() error {
if o.done {
return nil
}
o.done = true
return errs.Combine(
o.fh.Close(),
os.Remove(o.fh.Name()),
)
}
// genericWriteHandle implements writeHandle for an io.Writer.
type genericWriteHandle struct{ w io.Writer }
// newGenericWriteHandle constructs a *genericWriteHandle from an io.Writer.
func newGenericWriteHandle(w io.Writer) *genericWriteHandle {
return &genericWriteHandle{w: w}
}
func (g *genericWriteHandle) Write(p []byte) (int, error) { return g.w.Write(p) }
func (g *genericWriteHandle) Commit() error { return nil }
func (g *genericWriteHandle) Abort() error { return nil }
//
// object iteration
//
// objectIterator is an interface type for iterating over objectInfo values.
type objectIterator interface {
Next() bool
Err() error
Item() objectInfo
}
// filteredObjectIterator removes any iteration entries that do not begin with the filter.
// all entries must begin with the trim string which is removed before checking for the
// filter.
type filteredObjectIterator struct {
trim string
filter string
iter objectIterator
}
func (f *filteredObjectIterator) Next() bool {
for {
if !f.iter.Next() {
return false
}
key := f.iter.Item().Loc.Key()
if !strings.HasPrefix(key, f.trim) {
return false
}
if strings.HasPrefix(key, f.filter) {
return true
}
}
}
func (f *filteredObjectIterator) Err() error { return f.iter.Err() }
func (f *filteredObjectIterator) Item() objectInfo {
item := f.iter.Item()
path := item.Loc
if path.remote {
path.key = path.key[len(f.trim):]
} else {
path.path = path.path[len(f.trim):]
}
item.Loc = path
return item
}
// emptyObjectIterator is an objectIterator that has no objects.
type emptyObjectIterator struct{}
func (emptyObjectIterator) Next() bool { return false }
func (emptyObjectIterator) Err() error { return nil }
func (emptyObjectIterator) Item() objectInfo { return objectInfo{} }

View File

@ -0,0 +1,163 @@
// Copyright (C) 2021 Storj Labs, Inc.
// See LICENSE for copying information.
package main
import (
"context"
"io/ioutil"
"os"
"path/filepath"
"strings"
"github.com/zeebo/errs"
)
// filesystemLocal implements something close to a filesystem but backed by the local disk.
type filesystemLocal struct{}
func (l *filesystemLocal) abs(path string) (string, error) {
abs, err := filepath.Abs(path)
if err != nil {
return "", errs.Wrap(err)
}
if strings.HasSuffix(path, string(filepath.Separator)) &&
!strings.HasSuffix(abs, string(filepath.Separator)) {
abs += string(filepath.Separator)
}
return abs, nil
}
func (l *filesystemLocal) Open(ctx context.Context, path string) (readHandle, error) {
path, err := l.abs(path)
if err != nil {
return nil, err
}
fh, err := os.Open(path)
if err != nil {
return nil, errs.Wrap(err)
}
return newOSReadHandle(fh)
}
func (l *filesystemLocal) Create(ctx context.Context, path string) (writeHandle, error) {
path, err := l.abs(path)
if err != nil {
return nil, err
}
fi, err := os.Stat(path)
if err != nil && !os.IsNotExist(err) {
return nil, errs.Wrap(err)
} else if err == nil && fi.IsDir() {
return nil, errs.New("path exists as a directory already")
}
if err := os.MkdirAll(filepath.Dir(path), 0755); err != nil {
return nil, errs.Wrap(err)
}
// TODO: atomic rename
fh, err := os.Create(path)
if err != nil {
return nil, errs.Wrap(err)
}
return newOSWriteHandle(fh), nil
}
func (l *filesystemLocal) ListObjects(ctx context.Context, path string, recursive bool) (objectIterator, error) {
path, err := l.abs(path)
if err != nil {
return nil, err
}
prefix := path
if idx := strings.LastIndexByte(path, filepath.Separator); idx >= 0 {
prefix = path[:idx+1]
}
var files []os.FileInfo
if recursive {
err = filepath.Walk(prefix, func(path string, info os.FileInfo, err error) error {
if err == nil && !info.IsDir() {
files = append(files, &namedFileInfo{
FileInfo: info,
name: path[len(prefix):],
})
}
return nil
})
} else {
files, err = ioutil.ReadDir(prefix)
}
if err != nil {
return nil, err
}
trim := prefix
if recursive {
trim = ""
}
return &filteredObjectIterator{
trim: trim,
filter: path,
iter: &fileinfoObjectIterator{
base: prefix,
files: files,
},
}, nil
}
func (l *filesystemLocal) IsLocalDir(ctx context.Context, path string) bool {
fi, err := os.Stat(path)
if err != nil {
return false
}
return fi.IsDir()
}
type namedFileInfo struct {
os.FileInfo
name string
}
func (n *namedFileInfo) Name() string { return n.name }
type fileinfoObjectIterator struct {
base string
files []os.FileInfo
current os.FileInfo
}
func (fi *fileinfoObjectIterator) Next() bool {
if len(fi.files) == 0 {
return false
}
fi.current, fi.files = fi.files[0], fi.files[1:]
return true
}
func (fi *fileinfoObjectIterator) Err() error { return nil }
func (fi *fileinfoObjectIterator) Item() objectInfo {
name := filepath.Join(fi.base, fi.current.Name())
isDir := fi.current.IsDir()
if isDir {
name += string(filepath.Separator)
}
// TODO(jeff): is this the right thing to do on windows? is there more to do?
// convert the paths to be forward slash based because keys are supposed to always be remote
if filepath.Separator != '/' {
name = strings.ReplaceAll(name, string(filepath.Separator), "/")
}
return objectInfo{
Loc: Location{path: name},
IsPrefix: isDir,
Created: fi.current.ModTime(), // TODO: use real crtime
ContentLength: fi.current.Size(),
}
}

View File

@ -0,0 +1,62 @@
// Copyright (C) 2021 Storj Labs, Inc.
// See LICENSE for copying information.
package main
import (
"context"
"github.com/zeebo/clingy"
)
//
// filesystemMixed dispatches to either the local or remote filesystem depending on the path
//
type filesystemMixed struct {
local *filesystemLocal
remote *filesystemRemote
}
func (m *filesystemMixed) Close() error {
return m.remote.Close()
}
func (m *filesystemMixed) Open(ctx clingy.Context, loc Location) (readHandle, error) {
if loc.Remote() {
return m.remote.Open(ctx, loc.bucket, loc.key)
} else if loc.Std() {
return newGenericReadHandle(ctx.Stdin()), nil
}
return m.local.Open(ctx, loc.path)
}
func (m *filesystemMixed) Create(ctx clingy.Context, loc Location) (writeHandle, error) {
if loc.Remote() {
return m.remote.Create(ctx, loc.bucket, loc.key)
} else if loc.Std() {
return newGenericWriteHandle(ctx.Stdout()), nil
}
return m.local.Create(ctx, loc.path)
}
func (m *filesystemMixed) ListObjects(ctx context.Context, prefix Location, recursive bool) (objectIterator, error) {
if prefix.Remote() {
return m.remote.ListObjects(ctx, prefix.bucket, prefix.key, recursive), nil
}
return m.local.ListObjects(ctx, prefix.path, recursive)
}
func (m *filesystemMixed) ListUploads(ctx context.Context, prefix Location, recursive bool) (objectIterator, error) {
if prefix.Remote() {
return m.remote.ListPendingMultiparts(ctx, prefix.bucket, prefix.key, recursive), nil
}
return emptyObjectIterator{}, nil
}
func (m *filesystemMixed) IsLocalDir(ctx context.Context, loc Location) bool {
if !loc.Local() {
return false
}
return m.local.IsLocalDir(ctx, loc.path)
}

View File

@ -0,0 +1,124 @@
// Copyright (C) 2021 Storj Labs, Inc.
// See LICENSE for copying information.
package main
import (
"context"
"strings"
"github.com/zeebo/errs"
"storj.io/uplink"
)
// filesystemRemote implements something close to a filesystem but backed by an uplink project.
type filesystemRemote struct {
project *uplink.Project
}
func (r *filesystemRemote) Close() error {
return r.project.Close()
}
func (r *filesystemRemote) Open(ctx context.Context, bucket, key string) (readHandle, error) {
fh, err := r.project.DownloadObject(ctx, bucket, key, nil)
if err != nil {
return nil, errs.Wrap(err)
}
return newUplinkReadHandle(bucket, fh), nil
}
func (r *filesystemRemote) Create(ctx context.Context, bucket, key string) (writeHandle, error) {
fh, err := r.project.UploadObject(ctx, bucket, key, nil)
if err != nil {
return nil, err
}
return newUplinkWriteHandle(fh), nil
}
func (r *filesystemRemote) ListObjects(ctx context.Context, bucket, prefix string, recursive bool) objectIterator {
parentPrefix := ""
if idx := strings.LastIndexByte(prefix, '/'); idx >= 0 {
parentPrefix = prefix[:idx+1]
}
trim := parentPrefix
if recursive {
trim = ""
}
return &filteredObjectIterator{
trim: trim,
filter: prefix,
iter: newUplinkObjectIterator(bucket, r.project.ListObjects(ctx, bucket,
&uplink.ListObjectsOptions{
Prefix: parentPrefix,
Recursive: recursive,
System: true,
})),
}
}
func (r *filesystemRemote) ListPendingMultiparts(ctx context.Context, bucket, prefix string, recursive bool) objectIterator {
parentPrefix := ""
if idx := strings.LastIndexByte(prefix, '/'); idx >= 0 {
parentPrefix = prefix[:idx+1]
}
trim := parentPrefix
if recursive {
trim = ""
}
return &filteredObjectIterator{
trim: trim,
filter: prefix,
iter: newUplinkUploadIterator(bucket, r.project.ListUploads(ctx, bucket,
&uplink.ListUploadsOptions{
Prefix: parentPrefix,
Recursive: recursive,
System: true,
})),
}
}
// uplinkObjectIterator implements objectIterator for *uplink.ObjectIterator.
type uplinkObjectIterator struct {
bucket string
iter *uplink.ObjectIterator
}
// newUplinkObjectIterator constructs an *uplinkObjectIterator from an *uplink.ObjectIterator.
func newUplinkObjectIterator(bucket string, iter *uplink.ObjectIterator) *uplinkObjectIterator {
return &uplinkObjectIterator{
bucket: bucket,
iter: iter,
}
}
func (u *uplinkObjectIterator) Next() bool { return u.iter.Next() }
func (u *uplinkObjectIterator) Err() error { return u.iter.Err() }
func (u *uplinkObjectIterator) Item() objectInfo {
return uplinkObjectToObjectInfo(u.bucket, u.iter.Item())
}
// uplinkUploadIterator implements objectIterator for *multipart.UploadIterators.
type uplinkUploadIterator struct {
bucket string
iter *uplink.UploadIterator
}
// newUplinkUploadIterator constructs a *uplinkUploadIterator from a *uplink.UploadIterator.
func newUplinkUploadIterator(bucket string, iter *uplink.UploadIterator) *uplinkUploadIterator {
return &uplinkUploadIterator{
bucket: bucket,
iter: iter,
}
}
func (u *uplinkUploadIterator) Next() bool { return u.iter.Next() }
func (u *uplinkUploadIterator) Err() error { return u.iter.Err() }
func (u *uplinkUploadIterator) Item() objectInfo {
return uplinkUploadInfoToObjectInfo(u.bucket, u.iter.Item())
}

View File

@ -103,7 +103,7 @@ func (g *globalFlags) loadOldConfig(r io.Reader) (string, map[string]string, []i
// because they go into a separate file now. check for keys that match
// one of those and stuff them away outside of entries.
if key == "access" {
access = key
access = value
} else if strings.HasPrefix(key, "accesses.") {
accesses[key[len("accesses."):]] = value
} else if section == "accesses" {

151
cmd/uplinkng/location.go Normal file
View File

@ -0,0 +1,151 @@
// Copyright (C) 2021 Storj Labs, Inc.
// See LICENSE for copying information.
package main
import (
"fmt"
"path/filepath"
"strings"
"github.com/zeebo/errs"
)
// Location represets a local path, a remote object, or stdin/stdout.
type Location struct {
path string
bucket string
key string
remote bool
}
func parseLocation(location string) (p Location, err error) {
if location == "-" {
return Location{path: "-", key: "-"}, nil
}
// Locations, Chapter 2, Verses 9 to 21.
//
// And the Devs spake, saying,
// First shalt thou find the Special Prefix "sj:".
// Then, shalt thou count two slashes, no more, no less.
// Two shall be the number thou shalt count,
// and the number of the counting shall be two.
// Three shalt thou not count, nor either count thou one,
// excepting that thou then proceed to two.
// Four is right out!
// Once the number two, being the second number, be reached,
// then interpret thou thy location as a remote location,
// which being made of a bucket and key, shall split it.
if strings.HasPrefix(location, "sj://") || strings.HasPrefix(location, "s3://") {
trimmed := location[5:] // remove the scheme
idx := strings.IndexByte(trimmed, '/') // find the bucket index
// handles sj:// or sj:///foo
if len(trimmed) == 0 || idx == 0 {
return Location{}, errs.New("invalid path: empty bucket in path: %q", location)
}
var bucket, key string
if idx == -1 { // handles sj://foo
bucket, key = trimmed, ""
} else { // handles sj://foo/bar
bucket, key = trimmed[:idx], trimmed[idx+1:]
}
return Location{bucket: bucket, key: key, remote: true}, nil
}
return Location{path: location, remote: false}, nil
}
// Std returns true if the location refers to stdin/stdout.
func (p Location) Std() bool { return p.path == "-" && p.key == "-" }
// Remote returns true if the location is remote.
func (p Location) Remote() bool { return !p.Std() && p.remote }
// Local returns true if the location is local.
func (p Location) Local() bool { return !p.Std() && !p.remote }
// String returns the string form of the location.
func (p Location) String() string {
if p.Std() {
return "-"
} else if p.remote {
return fmt.Sprintf("sj://%s/%s", p.bucket, p.key)
}
return p.path
}
// Key returns either the path or the object key.
func (p Location) Key() string {
if p.remote {
return p.key
}
return p.path
}
// Base returns the last base component of the key.
func (p Location) Base() (string, bool) {
if p.Std() {
return "", false
} else if p.remote {
key := p.key
if idx := strings.LastIndexByte(key, '/'); idx >= 0 {
key = key[idx:]
}
return key, len(key) > 0
}
base := filepath.Base(p.path)
if base == "." || base == string(filepath.Separator) || base == "" {
return "", false
}
return base, true
}
// RelativeTo returns the string that when appended to the location string
// will return a string equivalent to the passed in target location.
func (p Location) RelativeTo(target Location) (string, error) {
if p.Std() || target.Std() {
return "", errs.New("cannot create relative location for stdin/stdout")
} else if target.remote != p.remote {
return "", errs.New("cannot create remote and local relative location")
} else if !target.remote {
abs, err := filepath.Abs(p.path)
if err != nil {
return "", errs.Wrap(err)
}
rel, err := filepath.Rel(abs, target.path)
if err != nil {
return "", errs.Wrap(err)
}
return rel, nil
} else if target.bucket != p.bucket {
return "", errs.New("cannot change buckets in relative remote location")
} else if !strings.HasPrefix(target.key, p.key) {
return "", errs.New("cannot make relative location because keys are not prefixes")
}
return target.key[len(p.key):], nil
}
// AppendKey adds the key to the end of the existing key, separating with the
// appropriate slash if necessary.
func (p Location) AppendKey(key string) Location {
if p.remote {
p.key += key
return p
}
// convert any / to the local filesystem slash if necessary
if filepath.Separator != '/' {
key = strings.ReplaceAll(key, "/", string(filepath.Separator))
}
// clean up issues with // or /../ or /./ etc.
key = filepath.Clean(string(filepath.Separator) + key)[1:]
p.path = filepath.Join(p.path, key)
return p
}

View File

@ -1,44 +0,0 @@
// Copyright (C) 2021 Storj Labs, Inc.
// See LICENSE for copying information.
package main
import (
"strings"
"github.com/zeebo/errs"
)
func parsePath(path string) (bucket, key string, ok bool, err error) {
// Paths, Chapter 2, Verses 9 to 21.
//
// And the Devs spake, saying,
// First shalt thou find the Special Prefix "sj:".
// Then, shalt thou count two slashes, no more, no less.
// Two shall be the number thou shalt count,
// and the number of the counting shall be two.
// Three shalt thou not count, nor either count thou one,
// excepting that thou then proceed to two.
// Four is right out!
// Once the number two, being the second number, be reached,
// then interpret thou thy Path as a remote path,
// which being made of a bucket and key, shall split it.
if strings.HasPrefix(path, "sj://") || strings.HasPrefix(path, "s3://") {
unschemed := path[5:]
bucketIdx := strings.IndexByte(unschemed, '/')
// handles sj:// or sj:///foo
if len(unschemed) == 0 || bucketIdx == 0 {
return "", "", false, errs.New("invalid path: empty bucket in path: %q", path)
}
// handles sj://foo
if bucketIdx == -1 {
return unschemed, "", true, nil
}
return unschemed[:bucketIdx], unschemed[bucketIdx+1:], true, nil
}
return "", "", false, nil
}

View File

@ -21,6 +21,19 @@ func (pp *projectProvider) Setup(a clingy.Arguments, f clingy.Flags) {
pp.access = f.New("access", "Which access to use", "").(string)
}
func (pp *projectProvider) OpenFilesystem(ctx context.Context, options ...projectOption) (filesystem, error) {
project, err := pp.OpenProject(ctx, options...)
if err != nil {
return nil, err
}
return &filesystemMixed{
local: &filesystemLocal{},
remote: &filesystemRemote{
project: project,
},
}, nil
}
func (pp *projectProvider) OpenProject(ctx context.Context, options ...projectOption) (*uplink.Project, error) {
if pp.openProject != nil {
return pp.openProject(ctx)