cmd/uplinkng: add mv command

Add ability to move files and objects.

Change-Id: I4929da730984c06aa578678b1d8c8e9b4aceade8
This commit is contained in:
Michał Niewrzał 2021-10-05 18:48:13 +02:00 committed by Michal Niewrzal
parent 07fad75912
commit 24cf7e8ea6
10 changed files with 395 additions and 3 deletions

View File

@ -63,7 +63,7 @@ func (c *cmdCp) Setup(params clingy.Parameters) {
c.byteRange = params.Flag("range", "Downloads the specified range bytes of an object. For more information about the HTTP Range header, see https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35", "").(string)
c.source = params.Arg("source", "Source to copy", clingy.Transform(ulloc.Parse)).(ulloc.Location)
c.dest = params.Arg("dest", "Desination to copy", clingy.Transform(ulloc.Parse)).(ulloc.Location)
c.dest = params.Arg("dest", "Destination to copy", clingy.Transform(ulloc.Parse)).(ulloc.Location)
}
func (c *cmdCp) Execute(ctx clingy.Context) error {

196
cmd/uplinkng/cmd_mv.go Normal file
View File

@ -0,0 +1,196 @@
// Copyright (C) 2021 Storj Labs, Inc.
// See LICENSE for copying information.
package main
import (
"fmt"
"io"
"strconv"
"sync"
"github.com/zeebo/clingy"
"github.com/zeebo/errs"
"storj.io/common/sync2"
"storj.io/storj/cmd/uplinkng/ulext"
"storj.io/storj/cmd/uplinkng/ulfs"
"storj.io/storj/cmd/uplinkng/ulloc"
)
type cmdMv struct {
ex ulext.External
access string
recursive bool
parallelism int
dryrun bool
progress bool
source ulloc.Location
dest ulloc.Location
}
func newCmdMv(ex ulext.External) *cmdMv {
return &cmdMv{ex: ex}
}
func (c *cmdMv) Setup(params clingy.Parameters) {
c.access = params.Flag("access", "Access name or value to use", "").(string)
c.recursive = params.Flag("recursive", "Move all objects or files under the specified prefix or directory", false,
clingy.Short('r'),
clingy.Transform(strconv.ParseBool),
).(bool)
c.parallelism = params.Flag("parallelism", "Controls how many objects will be moved in parallel", 1,
clingy.Short('p'),
clingy.Transform(strconv.Atoi),
clingy.Transform(func(n int) (int, error) {
if n <= 0 {
return 0, errs.New("parallelism must be at least 1")
}
return n, nil
}),
).(int)
c.dryrun = params.Flag("dryrun", "Print what operations would happen but don't execute them", false,
clingy.Transform(strconv.ParseBool),
).(bool)
c.progress = params.Flag("progress", "Show a progress bar when possible", true,
clingy.Transform(strconv.ParseBool),
).(bool)
c.source = params.Arg("source", "Source to move", clingy.Transform(ulloc.Parse)).(ulloc.Location)
c.dest = params.Arg("dest", "Destination to move", clingy.Transform(ulloc.Parse)).(ulloc.Location)
}
func (c *cmdMv) Execute(ctx clingy.Context) error {
fs, err := c.ex.OpenFilesystem(ctx, c.access)
if err != nil {
return err
}
defer func() { _ = fs.Close() }()
switch {
case c.source.Std() || c.dest.Std():
return errs.New("cannot move to stdin/stdout")
case c.source.String() == "" || c.dest.String() == "": // TODO maybe add Empty() method
return errs.New("both source and dest cannot be empty")
case (c.source.Local() && c.dest.Remote()) || (c.source.Remote() && c.dest.Local()):
return errs.New("source and dest must be both local or both remote")
case c.source.String() == c.dest.String():
return errs.New("source and dest cannot be equal")
case c.recursive && (!c.source.Directoryish() || !c.dest.Directoryish()):
return errs.New("with --recursive flag source and destination must end with '/'")
}
// we ensure the source and destination are lexically directoryish
// if they map to directories. the destination is always converted to be
// directoryish if the copy is recursive.
if fs.IsLocalDir(ctx, c.source) {
c.source = c.source.AsDirectoryish()
}
if c.recursive || fs.IsLocalDir(ctx, c.dest) {
c.dest = c.dest.AsDirectoryish()
}
if c.recursive {
return c.moveRecursive(ctx, fs)
}
// if the destination is directoryish, we add the basename of the source
// to the end of the destination to pick a filename.
var base string
if c.dest.Directoryish() && !c.source.Std() {
// we undirectoryish the source so that we ignore any trailing slashes
// when finding the base name.
var ok bool
base, ok = c.source.Undirectoryish().Base()
if !ok {
return errs.New("destination is a directory and cannot find base name for source %q", c.source)
}
}
c.dest = joinDestWith(c.dest, base)
return c.moveFile(ctx, fs, c.source, c.dest)
}
func (c *cmdMv) moveRecursive(ctx clingy.Context, fs ulfs.Filesystem) error {
iter, err := fs.List(ctx, c.source, &ulfs.ListOptions{
Recursive: true,
})
if err != nil {
return errs.Wrap(err)
}
var (
limiter = sync2.NewLimiter(c.parallelism)
es errs.Group
mu sync.Mutex
)
fprintln := func(w io.Writer, args ...interface{}) {
mu.Lock()
defer mu.Unlock()
fmt.Fprintln(w, args...)
}
addError := func(err error) {
mu.Lock()
defer mu.Unlock()
es.Add(err)
}
items := make([]ulfs.ObjectInfo, 0, 10)
for iter.Next() {
item := iter.Item()
if item.IsPrefix {
continue
}
items = append(items, item)
}
if err := iter.Err(); err != nil {
return errs.Wrap(err)
}
for _, item := range items {
source := item.Loc
rel, err := c.source.RelativeTo(source)
if err != nil {
return err
}
dest := joinDestWith(c.dest, rel)
ok := limiter.Go(ctx, func() {
if c.progress {
fprintln(ctx.Stdout(), "Move", source, "to", dest)
}
if err := c.moveFile(ctx, fs, source, dest); err != nil {
fprintln(ctx.Stderr(), "Move", "failed:", err.Error())
addError(err)
}
})
if !ok {
break
}
}
limiter.Wait()
if len(es) > 0 {
return errs.Wrap(es.Err())
}
return nil
}
func (c *cmdMv) moveFile(ctx clingy.Context, fs ulfs.Filesystem, source, dest ulloc.Location) error {
if c.dryrun {
return nil
}
return errs.Wrap(fs.Move(ctx, source, dest))
}

111
cmd/uplinkng/cmd_mv_test.go Normal file
View File

@ -0,0 +1,111 @@
// Copyright (C) 2021 Storj Labs, Inc.
// See LICENSE for copying information.
package main
import (
"testing"
"storj.io/storj/cmd/uplinkng/ultest"
)
func TestMv(t *testing.T) {
state := ultest.Setup(commands,
ultest.WithFile("sj://b1/file1.txt", "remote"),
ultest.WithFile("/home/user/file1.txt", "local"),
ultest.WithBucket("b2"),
)
t.Run("Basic", func(t *testing.T) {
state.Succeed(t, "mv", "sj://b1/file1.txt", "sj://b1/moved-file1.txt").RequireRemoteFiles(t,
ultest.File{Loc: "sj://b1/moved-file1.txt", Contents: "remote"},
)
state.Succeed(t, "mv", "sj://b1/file1.txt", "sj://b1/prefix/").RequireRemoteFiles(t,
ultest.File{Loc: "sj://b1/prefix/file1.txt", Contents: "remote"},
)
state.Succeed(t, "mv", "/home/user/file1.txt", "/home/user/moved-file1.txt").RequireLocalFiles(t,
ultest.File{Loc: "/home/user/moved-file1.txt", Contents: "local"},
)
state.Fail(t, "mv", "sj://user/not-existing", "sj://user/moved-file1.txt")
state.Fail(t, "mv", "/home/user/not-existing", "/home/user/moved-file1.txt")
state.Fail(t, "mv", "/home/user/file1.txt", "/home/user/file1.txt")
})
t.Run("BucketToBucket", func(t *testing.T) {
state.Succeed(t, "mv", "sj://b1/file1.txt", "sj://b2/file1.txt").RequireRemoteFiles(t,
ultest.File{Loc: "sj://b2/file1.txt", Contents: "remote"},
)
})
t.Run("Relative", func(t *testing.T) {
state.Fail(t, "mv", "sj://b1/file1.txt", "")
state.Fail(t, "mv", "", "sj://b1/moved-file1.txt")
state.Fail(t, "mv", "/home/user/file1.txt", "")
state.Fail(t, "mv", "", "/home/user/moved-file1.txt")
})
t.Run("Mixed", func(t *testing.T) {
state.Fail(t, "mv", "sj://user/file1.txt", "/home/user/file1.txt")
state.Fail(t, "mv", "/home/user/file1.txt", "sj://user/file1.txt")
})
}
func TestMvRecursive(t *testing.T) {
state := ultest.Setup(commands,
ultest.WithFile("sj://b1/file1.txt", "remote"),
ultest.WithFile("sj://b1/foo/file2.txt", "remote"),
ultest.WithFile("sj://b1/foo/file3.txt", "remote"),
ultest.WithFile("/home/user/file1.txt", "local"),
ultest.WithBucket("b2"),
)
t.Run("Basic", func(t *testing.T) {
state.Succeed(t, "mv", "sj://b1/", "sj://b1/prefix/", "--recursive").RequireRemoteFiles(t,
ultest.File{Loc: "sj://b1/prefix/file1.txt", Contents: "remote"},
ultest.File{Loc: "sj://b1/prefix/foo/file2.txt", Contents: "remote"},
ultest.File{Loc: "sj://b1/prefix/foo/file3.txt", Contents: "remote"},
)
state.Succeed(t, "mv", "sj://b1/prefix/", "sj://b1/", "--recursive").RequireRemoteFiles(t,
ultest.File{Loc: "sj://b1/file1.txt", Contents: "remote"},
ultest.File{Loc: "sj://b1/foo/file2.txt", Contents: "remote"},
ultest.File{Loc: "sj://b1/foo/file3.txt", Contents: "remote"},
)
state.Succeed(t, "mv", "sj://b1/foo/", "sj://b1/foo2/", "--recursive").RequireRemoteFiles(t,
ultest.File{Loc: "sj://b1/file1.txt", Contents: "remote"},
ultest.File{Loc: "sj://b1/foo2/file2.txt", Contents: "remote"},
ultest.File{Loc: "sj://b1/foo2/file3.txt", Contents: "remote"},
)
state.Fail(t, "mv", "sj://b1/foo", "sj://b1/foo2", "--recursive")
state.Fail(t, "mv", "sj://b1/foo", "sj://b1/foo2/", "--recursive")
state.Fail(t, "mv", "sj://b1/foo/", "sj://b1/foo2", "--recursive")
state.Fail(t, "mv", "sj://b1/", "/home/user/", "--recursive")
state.Fail(t, "mv", "/home/user/", "sj://user/", "--recursive")
})
t.Run("BucketToBucket", func(t *testing.T) {
state.Succeed(t, "mv", "sj://b1/", "sj://b2/", "--recursive").RequireRemoteFiles(t,
ultest.File{Loc: "sj://b2/file1.txt", Contents: "remote"},
ultest.File{Loc: "sj://b2/foo/file2.txt", Contents: "remote"},
ultest.File{Loc: "sj://b2/foo/file3.txt", Contents: "remote"},
)
})
t.Run("Parallelism", func(t *testing.T) {
state.Succeed(t, "mv", "sj://b1/", "sj://b1/prefix/", "--recursive", "--parallelism", "2").RequireRemoteFiles(t,
ultest.File{Loc: "sj://b1/prefix/file1.txt", Contents: "remote"},
ultest.File{Loc: "sj://b1/prefix/foo/file2.txt", Contents: "remote"},
ultest.File{Loc: "sj://b1/prefix/foo/file3.txt", Contents: "remote"},
)
state.Fail(t, "mv", "sj://b1/", "sj://b1/prefix/", "--recursive", "--parallelism", "0")
})
}

View File

@ -49,6 +49,7 @@ func commands(cmds clingy.Commands, ex ulext.External) {
cmds.New("mb", "Create a new bucket", newCmdMb(ex))
cmds.New("rb", "Remove a bucket bucket", newCmdRb(ex))
cmds.New("cp", "Copies files or objects into or out of storj", newCmdCp(ex))
cmds.New("mv", "Moves files or objects", newCmdMv(ex))
cmds.New("ls", "Lists buckets, prefixes, or objects", newCmdLs(ex))
cmds.New("rm", "Remove an object", newCmdRm(ex))
cmds.Group("meta", "Object metadata related commands", func() {

View File

@ -44,6 +44,7 @@ type Filesystem interface {
Close() error
Open(ctx clingy.Context, loc ulloc.Location, opts *OpenOptions) (ReadHandle, error)
Create(ctx clingy.Context, loc ulloc.Location) (WriteHandle, error)
Move(ctx clingy.Context, source, dest ulloc.Location) error
Remove(ctx context.Context, loc ulloc.Location, opts *RemoveOptions) error
List(ctx context.Context, prefix ulloc.Location, opts *ListOptions) (ObjectIterator, error)
IsLocalDir(ctx context.Context, loc ulloc.Location) bool

View File

@ -61,6 +61,11 @@ func (l *Local) Create(ctx context.Context, path string) (WriteHandle, error) {
return newOSWriteHandle(fh), nil
}
// Move moves file to provided path.
func (l *Local) Move(ctx context.Context, oldpath, newpath string) error {
return os.Rename(oldpath, newpath)
}
// Remove unlinks the file at the path. It is not an error if the file does not exist.
func (l *Local) Remove(ctx context.Context, path string, opts *RemoveOptions) error {
if opts.isPending() {

View File

@ -51,6 +51,20 @@ func (m *Mixed) Create(ctx clingy.Context, loc ulloc.Location) (WriteHandle, err
return newGenericWriteHandle(ctx.Stdout()), nil
}
// Move moves either a local file or remote object.
func (m *Mixed) Move(ctx clingy.Context, source, dest ulloc.Location) error {
if oldbucket, oldkey, ok := source.RemoteParts(); ok {
if newbucket, newkey, ok := dest.RemoteParts(); ok {
return m.remote.Move(ctx, oldbucket, oldkey, newbucket, newkey)
}
} else if oldpath, ok := source.LocalParts(); ok {
if newpath, ok := dest.LocalParts(); ok {
return m.local.Move(ctx, oldpath, newpath)
}
}
return errs.New("moving objects between local and remote is not supported")
}
// Remove deletes either a local file or remote object.
func (m *Mixed) Remove(ctx context.Context, loc ulloc.Location, opts *RemoveOptions) error {
if bucket, key, ok := loc.RemoteParts(); ok {

View File

@ -65,6 +65,11 @@ func (r *Remote) Create(ctx context.Context, bucket, key string) (WriteHandle, e
return newUplinkWriteHandle(fh), nil
}
// Move moves object to provided key and bucket.
func (r *Remote) Move(ctx context.Context, oldbucket, oldkey, newbucket, newkey string) error {
return errs.Wrap(r.project.MoveObject(ctx, oldbucket, oldkey, newbucket, newkey, nil))
}
// Remove deletes the object at the provided key and bucket.
func (r *Remote) Remove(ctx context.Context, bucket, key string, opts *RemoveOptions) error {
if !opts.isPending() {

View File

@ -9,6 +9,7 @@ import (
"path/filepath"
"sort"
"strings"
"sync"
"time"
"github.com/zeebo/clingy"
@ -29,6 +30,8 @@ type testFilesystem struct {
pending map[ulloc.Location][]*memWriteHandle
locals map[string]bool // true means path is a directory
buckets map[string]struct{}
mu sync.Mutex
}
func newTestFilesystem() *testFilesystem {
@ -78,13 +81,16 @@ func (tfs *testFilesystem) Close() error {
}
func (tfs *testFilesystem) Open(ctx clingy.Context, loc ulloc.Location, opts *ulfs.OpenOptions) (_ ulfs.ReadHandle, err error) {
tfs.mu.Lock()
defer tfs.mu.Unlock()
if loc.Std() {
return &byteReadHandle{Buffer: bytes.NewBufferString("-")}, nil
}
mf, ok := tfs.files[loc]
if !ok {
return nil, errs.New("file does not exist")
return nil, errs.New("file does not exist %q", loc)
}
if opts != nil {
@ -95,6 +101,9 @@ func (tfs *testFilesystem) Open(ctx clingy.Context, loc ulloc.Location, opts *ul
}
func (tfs *testFilesystem) Create(ctx clingy.Context, loc ulloc.Location) (_ ulfs.WriteHandle, err error) {
tfs.mu.Lock()
defer tfs.mu.Unlock()
if loc.Std() {
return new(discardWriteHandle), nil
}
@ -106,7 +115,7 @@ func (tfs *testFilesystem) Create(ctx clingy.Context, loc ulloc.Location) (_ ulf
}
if path, ok := loc.LocalParts(); ok {
if loc.Directoryish() || tfs.IsLocalDir(ctx, loc) {
if loc.Directoryish() || tfs.isLocalDir(ctx, loc) {
return nil, errs.New("unable to open file for writing: %q", loc)
}
dir := ulloc.CleanPath(filepath.Dir(path))
@ -130,7 +139,23 @@ func (tfs *testFilesystem) Create(ctx clingy.Context, loc ulloc.Location) (_ ulf
return wh, nil
}
func (tfs *testFilesystem) Move(ctx clingy.Context, source, dest ulloc.Location) error {
tfs.mu.Lock()
defer tfs.mu.Unlock()
mf, ok := tfs.files[source]
if !ok {
return errs.New("file does not exist %q", source)
}
delete(tfs.files, source)
tfs.files[dest] = mf
return nil
}
func (tfs *testFilesystem) Remove(ctx context.Context, loc ulloc.Location, opts *ulfs.RemoveOptions) error {
tfs.mu.Lock()
defer tfs.mu.Unlock()
if opts == nil || !opts.Pending {
delete(tfs.files, loc)
} else {
@ -141,6 +166,9 @@ func (tfs *testFilesystem) Remove(ctx context.Context, loc ulloc.Location, opts
}
func (tfs *testFilesystem) List(ctx context.Context, prefix ulloc.Location, opts *ulfs.ListOptions) (ulfs.ObjectIterator, error) {
tfs.mu.Lock()
defer tfs.mu.Unlock()
if opts != nil && opts.Pending {
return tfs.listPending(ctx, prefix, opts)
}
@ -195,6 +223,13 @@ func (tfs *testFilesystem) listPending(ctx context.Context, prefix ulloc.Locatio
}
func (tfs *testFilesystem) IsLocalDir(ctx context.Context, loc ulloc.Location) (local bool) {
tfs.mu.Lock()
defer tfs.mu.Unlock()
return tfs.isLocalDir(ctx, loc)
}
func (tfs *testFilesystem) isLocalDir(ctx context.Context, loc ulloc.Location) (local bool) {
path, ok := loc.LocalParts()
return ok && (ulloc.CleanPath(path) == "." || tfs.locals[path])
}
@ -270,6 +305,9 @@ func (b *memWriteHandle) Write(p []byte) (int, error) {
}
func (b *memWriteHandle) Commit() error {
b.tfs.mu.Lock()
defer b.tfs.mu.Unlock()
if err := b.close(); err != nil {
return err
}
@ -286,6 +324,9 @@ func (b *memWriteHandle) Commit() error {
}
func (b *memWriteHandle) Abort() error {
b.tfs.mu.Lock()
defer b.tfs.mu.Unlock()
if err := b.close(); err != nil {
return err
}

View File

@ -93,6 +93,24 @@ then
fi
set -e
# test server-side move operation
uplinkng mv "sj://$BUCKET/big-upload-testfile" "sj://$BUCKET/moved-big-upload-testfile" --access $STORJ_ACCESS
uplinkng ls "sj://$BUCKET/moved-big-upload-testfile" --access $STORJ_ACCESS | grep "moved-big-upload-testfile"
uplinkng mv "sj://$BUCKET/moved-big-upload-testfile" "sj://$BUCKET/big-upload-testfile" --access $STORJ_ACCESS
# move prefix
uplinkng mv "sj://$BUCKET/" "sj://$BUCKET/my-prefix/" --recursive --access $STORJ_ACCESS
FILES=$(uplinkng ls "sj://$BUCKET/my-prefix/" --access $STORJ_ACCESS | tee $TMPDIR/list | wc -l)
EXPECTED_FILES="5" # 4 objects + one line more for headers
if [ "$FILES" == $EXPECTED_FILES ]
then
echo "listing after move returns $FILES files"
else
echo "listing after move returns $FILES files but want $EXPECTED_FILES"
cat $TMPDIR/list
exit 1
fi
uplinkng mv "sj://$BUCKET/my-prefix/" "sj://$BUCKET/" --recursive --access $STORJ_ACCESS
uplinkng rm "sj://$BUCKET/small-upload-testfile" --access $STORJ_ACCESS
uplinkng rm "sj://$BUCKET/big-upload-testfile" --access $STORJ_ACCESS
uplinkng rm "sj://$BUCKET/multisegment-upload-testfile" --access $STORJ_ACCESS