08d860570b
this was just supposed to add parallel uploads/downloads, and it does do that, but i then found a bunch of bugs with respect to path handling that i thought i had under control. oops. so this adds a ton of tests and tries to make the logic in ulloc more consistent. almost all of the actual file handling bits and knowledge now live in cmd_cp, where they belong. additionally, the s3 command has the behavior that if your bucket has the file s3://bucket/file, then executing s3 ls s3://bucket/fi returns nothing. this change makes uplinkng match that behavior even if i don't personally like it. a big portion of the weirdness is the concept i've introduced and named "directoryish", which intends to capture the behavior that if a user copies a file to that location, then the base name of the source should be appended rather than doing a direct copy. this concept is based entirely on the string value and not on the actual filesystem state. hence, the cp command is responsible for checking if local paths are actually directories, and adding a trailing slash if necessary to make them "directoryish". additionally, the empty key for a bucket and the empty string for local paths are considered "directoryish". Change-Id: I9120d18616fd813b29ff81beed4f5993caa99fb6
224 lines
5.3 KiB
Go
224 lines
5.3 KiB
Go
// Copyright (C) 2021 Storj Labs, Inc.
|
|
// See LICENSE for copying information.
|
|
|
|
package main
|
|
|
|
import (
|
|
"fmt"
|
|
"io"
|
|
"strconv"
|
|
"sync"
|
|
|
|
progressbar "github.com/cheggaaa/pb/v3"
|
|
"github.com/zeebo/clingy"
|
|
"github.com/zeebo/errs"
|
|
|
|
"storj.io/common/sync2"
|
|
"storj.io/storj/cmd/uplinkng/ulext"
|
|
"storj.io/storj/cmd/uplinkng/ulfs"
|
|
"storj.io/storj/cmd/uplinkng/ulloc"
|
|
)
|
|
|
|
type cmdCp struct {
|
|
ex ulext.External
|
|
|
|
access string
|
|
recursive bool
|
|
parallelism int
|
|
dryrun bool
|
|
progress bool
|
|
|
|
source ulloc.Location
|
|
dest ulloc.Location
|
|
}
|
|
|
|
func newCmdCp(ex ulext.External) *cmdCp {
|
|
return &cmdCp{ex: ex}
|
|
}
|
|
|
|
func (c *cmdCp) Setup(params clingy.Parameters) {
|
|
c.access = params.Flag("access", "Which access to use", "").(string)
|
|
c.recursive = params.Flag("recursive", "Peform a recursive copy", false,
|
|
clingy.Short('r'),
|
|
clingy.Transform(strconv.ParseBool),
|
|
).(bool)
|
|
c.parallelism = params.Flag("parallelism", "Controls how many uploads/downloads to perform in parallel", 1,
|
|
clingy.Short('p'),
|
|
clingy.Transform(strconv.Atoi),
|
|
clingy.Transform(func(n int) (int, error) {
|
|
if n <= 0 {
|
|
return 0, errs.New("parallelism must be at least 1")
|
|
}
|
|
return n, nil
|
|
}),
|
|
).(int)
|
|
c.dryrun = params.Flag("dryrun", "Print what operations would happen but don't execute them", false,
|
|
clingy.Transform(strconv.ParseBool),
|
|
).(bool)
|
|
c.progress = params.Flag("progress", "Show a progress bar when possible", true,
|
|
clingy.Transform(strconv.ParseBool),
|
|
).(bool)
|
|
|
|
c.source = params.Arg("source", "Source to copy", clingy.Transform(ulloc.Parse)).(ulloc.Location)
|
|
c.dest = params.Arg("dest", "Desination to copy", clingy.Transform(ulloc.Parse)).(ulloc.Location)
|
|
}
|
|
|
|
func (c *cmdCp) Execute(ctx clingy.Context) error {
|
|
fs, err := c.ex.OpenFilesystem(ctx, c.access)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer func() { _ = fs.Close() }()
|
|
|
|
// we ensure the source and destination are lexically directoryish
|
|
// if they map to directories. the destination is always converted to be
|
|
// directoryish if the copy is recursive.
|
|
if fs.IsLocalDir(ctx, c.source) {
|
|
c.source = c.source.AsDirectoryish()
|
|
}
|
|
if c.recursive || fs.IsLocalDir(ctx, c.dest) {
|
|
c.dest = c.dest.AsDirectoryish()
|
|
}
|
|
|
|
if c.recursive {
|
|
return c.copyRecursive(ctx, fs)
|
|
}
|
|
|
|
// if the destination is directoryish, we add the basename of the source
|
|
// to the end of the destination to pick a filename.
|
|
var base string
|
|
if c.dest.Directoryish() && !c.source.Std() {
|
|
// we undirectoryish the source so that we ignore any trailing slashes
|
|
// when finding the base name.
|
|
var ok bool
|
|
base, ok = c.source.Undirectoryish().Base()
|
|
if !ok {
|
|
return errs.New("destination is a directory and cannot find base name for source %q", c.source)
|
|
}
|
|
}
|
|
c.dest = joinDestWith(c.dest, base)
|
|
|
|
if !c.source.Std() && !c.dest.Std() {
|
|
fmt.Fprintln(ctx.Stdout(), copyVerb(c.source, c.dest), c.source, "to", c.dest)
|
|
}
|
|
|
|
return c.copyFile(ctx, fs, c.source, c.dest, c.progress)
|
|
}
|
|
|
|
func (c *cmdCp) copyRecursive(ctx clingy.Context, fs ulfs.Filesystem) error {
|
|
if c.source.Std() || c.dest.Std() {
|
|
return errs.New("cannot recursively copy to stdin/stdout")
|
|
}
|
|
|
|
iter, err := fs.ListObjects(ctx, c.source, true)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
var (
|
|
limiter = sync2.NewLimiter(c.parallelism)
|
|
es errs.Group
|
|
mu sync.Mutex
|
|
)
|
|
|
|
fprintln := func(w io.Writer, args ...interface{}) {
|
|
mu.Lock()
|
|
defer mu.Unlock()
|
|
|
|
fmt.Fprintln(w, args...)
|
|
}
|
|
|
|
addError := func(err error) {
|
|
mu.Lock()
|
|
defer mu.Unlock()
|
|
|
|
es.Add(err)
|
|
}
|
|
|
|
for iter.Next() {
|
|
source := iter.Item().Loc
|
|
rel, err := c.source.RelativeTo(source)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
dest := joinDestWith(c.dest, rel)
|
|
|
|
ok := limiter.Go(ctx, func() {
|
|
fprintln(ctx.Stdout(), copyVerb(source, dest), source, "to", dest)
|
|
|
|
if err := c.copyFile(ctx, fs, source, dest, false); err != nil {
|
|
fprintln(ctx.Stderr(), copyVerb(source, dest), "failed:", err.Error())
|
|
addError(err)
|
|
}
|
|
})
|
|
if !ok {
|
|
break
|
|
}
|
|
}
|
|
|
|
limiter.Wait()
|
|
|
|
if err := iter.Err(); err != nil {
|
|
return errs.Wrap(err)
|
|
} else if len(es) > 0 {
|
|
return es.Err()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (c *cmdCp) copyFile(ctx clingy.Context, fs ulfs.Filesystem, source, dest ulloc.Location, progress bool) error {
|
|
if c.dryrun {
|
|
return nil
|
|
}
|
|
|
|
rh, err := fs.Open(ctx, source)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer func() { _ = rh.Close() }()
|
|
|
|
wh, err := fs.Create(ctx, dest)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer func() { _ = wh.Abort() }()
|
|
|
|
var bar *progressbar.ProgressBar
|
|
var writer io.Writer = wh
|
|
|
|
if length := rh.Info().ContentLength; progress && length >= 0 && !c.dest.Std() {
|
|
bar = progressbar.New64(length).SetWriter(ctx.Stdout())
|
|
writer = bar.NewProxyWriter(writer)
|
|
bar.Start()
|
|
defer bar.Finish()
|
|
}
|
|
|
|
if _, err := io.Copy(writer, rh); err != nil {
|
|
return errs.Combine(err, wh.Abort())
|
|
}
|
|
return errs.Wrap(wh.Commit())
|
|
}
|
|
|
|
func copyVerb(source, dest ulloc.Location) string {
|
|
switch {
|
|
case dest.Remote():
|
|
return "upload"
|
|
case source.Remote():
|
|
return "download"
|
|
default:
|
|
return "copy"
|
|
}
|
|
}
|
|
|
|
func joinDestWith(dest ulloc.Location, suffix string) ulloc.Location {
|
|
dest = dest.AppendKey(suffix)
|
|
// if the destination is local and directoryish, remove any
|
|
// trailing slashes that it has. this makes it so that if
|
|
// a remote file is name "foo/", then we copy it down as
|
|
// just "foo".
|
|
if dest.Local() && dest.Directoryish() {
|
|
dest = dest.Undirectoryish()
|
|
}
|
|
return dest
|
|
}
|