cmd/uplinkng: file level parallel copy
Change-Id: I074abca67ea6f4a2fd0983fe452c74f76ee62c6a
This commit is contained in:
parent
b6f1efbbdc
commit
c8c4e8835e
@ -4,16 +4,19 @@
|
|||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
progressbar "github.com/cheggaaa/pb/v3"
|
progressbar "github.com/cheggaaa/pb/v3"
|
||||||
"github.com/zeebo/clingy"
|
"github.com/zeebo/clingy"
|
||||||
"github.com/zeebo/errs"
|
"github.com/zeebo/errs"
|
||||||
|
|
||||||
"storj.io/common/ranger/httpranger"
|
"storj.io/common/memory"
|
||||||
"storj.io/common/sync2"
|
"storj.io/common/sync2"
|
||||||
"storj.io/storj/cmd/uplinkng/ulext"
|
"storj.io/storj/cmd/uplinkng/ulext"
|
||||||
"storj.io/storj/cmd/uplinkng/ulfs"
|
"storj.io/storj/cmd/uplinkng/ulfs"
|
||||||
@ -30,6 +33,9 @@ type cmdCp struct {
|
|||||||
progress bool
|
progress bool
|
||||||
byteRange string
|
byteRange string
|
||||||
|
|
||||||
|
fileParallelism int
|
||||||
|
fileChunkSize memory.Size
|
||||||
|
|
||||||
source ulloc.Location
|
source ulloc.Location
|
||||||
dest ulloc.Location
|
dest ulloc.Location
|
||||||
}
|
}
|
||||||
@ -62,6 +68,25 @@ func (c *cmdCp) Setup(params clingy.Parameters) {
|
|||||||
).(bool)
|
).(bool)
|
||||||
c.byteRange = params.Flag("range", "Downloads the specified range bytes of an object. For more information about the HTTP Range header, see https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35", "").(string)
|
c.byteRange = params.Flag("range", "Downloads the specified range bytes of an object. For more information about the HTTP Range header, see https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35", "").(string)
|
||||||
|
|
||||||
|
c.fileParallelism = params.Flag("file-parallelism", "Controls how many parallel chunks to download from a file", 8,
|
||||||
|
clingy.Transform(strconv.Atoi),
|
||||||
|
clingy.Transform(func(n int) (int, error) {
|
||||||
|
if n <= 0 {
|
||||||
|
return 0, errs.New("file parallelism must be at least 1")
|
||||||
|
}
|
||||||
|
return n, nil
|
||||||
|
}),
|
||||||
|
).(int)
|
||||||
|
c.fileChunkSize = params.Flag("file-chunk-size", "Controls the size of the chunks for file parallelism", 64*memory.MB,
|
||||||
|
clingy.Transform(memory.ParseString),
|
||||||
|
clingy.Transform(func(n int64) (memory.Size, error) {
|
||||||
|
if memory.Size(n) < 1*memory.MB {
|
||||||
|
return 0, errs.New("file chunk size must be at least 1 MB")
|
||||||
|
}
|
||||||
|
return memory.Size(n), nil
|
||||||
|
}),
|
||||||
|
).(memory.Size)
|
||||||
|
|
||||||
c.source = params.Arg("source", "Source to copy", clingy.Transform(ulloc.Parse)).(ulloc.Location)
|
c.source = params.Arg("source", "Source to copy", clingy.Transform(ulloc.Parse)).(ulloc.Location)
|
||||||
c.dest = params.Arg("dest", "Destination to copy", clingy.Transform(ulloc.Parse)).(ulloc.Location)
|
c.dest = params.Arg("dest", "Destination to copy", clingy.Transform(ulloc.Parse)).(ulloc.Location)
|
||||||
}
|
}
|
||||||
@ -87,6 +112,9 @@ func (c *cmdCp) Execute(ctx clingy.Context) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if c.recursive {
|
if c.recursive {
|
||||||
|
if c.byteRange != "" {
|
||||||
|
return errs.New("unable to do recursive copy with byte range")
|
||||||
|
}
|
||||||
return c.copyRecursive(ctx, fs)
|
return c.copyRecursive(ctx, fs)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -179,26 +207,9 @@ func (c *cmdCp) copyFile(ctx clingy.Context, fs ulfs.Filesystem, source, dest ul
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var offset, length int64 = 0, -1
|
offset, length, err := parseRange(c.byteRange)
|
||||||
|
|
||||||
if c.byteRange != "" {
|
|
||||||
// TODO: we might want to avoid this call if ranged download will be used frequently
|
|
||||||
stat, err := fs.Stat(ctx, source)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return errs.Wrap(err)
|
||||||
}
|
|
||||||
byteRange, err := httpranger.ParseRange(c.byteRange, stat.ContentLength)
|
|
||||||
if err != nil && byteRange == nil {
|
|
||||||
return errs.New("error parsing byte range %q: %w", c.byteRange, err)
|
|
||||||
}
|
|
||||||
if len(byteRange) == 0 {
|
|
||||||
return errs.New("invalid range")
|
|
||||||
}
|
|
||||||
if len(byteRange) > 1 {
|
|
||||||
return errs.New("retrieval of multiple byte ranges of data not supported: %d provided", len(byteRange))
|
|
||||||
}
|
|
||||||
|
|
||||||
offset, length = byteRange[0].Start, byteRange[0].Length
|
|
||||||
}
|
}
|
||||||
|
|
||||||
mrh, err := fs.Open(ctx, source)
|
mrh, err := fs.Open(ctx, source)
|
||||||
@ -207,50 +218,25 @@ func (c *cmdCp) copyFile(ctx clingy.Context, fs ulfs.Filesystem, source, dest ul
|
|||||||
}
|
}
|
||||||
defer func() { _ = mrh.Close() }()
|
defer func() { _ = mrh.Close() }()
|
||||||
|
|
||||||
if err := mrh.SetOffset(offset); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
rh, err := mrh.NextPart(ctx, length)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if length == -1 {
|
|
||||||
length = rh.Info().ContentLength
|
|
||||||
}
|
|
||||||
|
|
||||||
mwh, err := fs.Create(ctx, dest)
|
mwh, err := fs.Create(ctx, dest)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
defer func() { _ = mwh.Abort(ctx) }()
|
defer func() { _ = mwh.Abort(ctx) }()
|
||||||
|
|
||||||
wh, err := mwh.NextPart(ctx, -1)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
defer func() { _ = wh.Abort() }()
|
|
||||||
|
|
||||||
var bar *progressbar.ProgressBar
|
var bar *progressbar.ProgressBar
|
||||||
var writer io.Writer = wh
|
if progress && !c.dest.Std() {
|
||||||
|
bar = progressbar.New64(0).SetWriter(ctx.Stdout())
|
||||||
if progress && length >= 0 && !c.dest.Std() {
|
|
||||||
bar = progressbar.New64(length).SetWriter(ctx.Stdout())
|
|
||||||
writer = bar.NewProxyWriter(writer)
|
|
||||||
bar.Start()
|
|
||||||
defer bar.Finish()
|
defer bar.Finish()
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, err := io.Copy(writer, rh); err != nil {
|
return errs.Wrap(parallelCopy(
|
||||||
return errs.Combine(err, wh.Abort())
|
ctx,
|
||||||
}
|
mwh, mrh,
|
||||||
|
c.fileParallelism, c.fileChunkSize.Int64(),
|
||||||
if err := wh.Commit(); err != nil {
|
offset, length,
|
||||||
return errs.Combine(err, wh.Abort())
|
bar,
|
||||||
}
|
))
|
||||||
|
|
||||||
return errs.Wrap(mwh.Commit(ctx))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func copyVerb(source, dest ulloc.Location) string {
|
func copyVerb(source, dest ulloc.Location) string {
|
||||||
@ -275,3 +261,142 @@ func joinDestWith(dest ulloc.Location, suffix string) ulloc.Location {
|
|||||||
}
|
}
|
||||||
return dest
|
return dest
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func parallelCopy(
|
||||||
|
clctx clingy.Context,
|
||||||
|
dst ulfs.MultiWriteHandle,
|
||||||
|
src ulfs.MultiReadHandle,
|
||||||
|
p int, chunkSize int64,
|
||||||
|
offset, length int64,
|
||||||
|
bar *progressbar.ProgressBar) error {
|
||||||
|
|
||||||
|
if offset != 0 {
|
||||||
|
if err := src.SetOffset(offset); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
limiter = sync2.NewLimiter(p)
|
||||||
|
es errs.Group
|
||||||
|
mu sync.Mutex
|
||||||
|
)
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(clctx)
|
||||||
|
|
||||||
|
defer limiter.Wait()
|
||||||
|
defer func() { _ = src.Close() }()
|
||||||
|
defer func() { _ = dst.Abort(ctx) }()
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
for i := 0; length != 0; i++ {
|
||||||
|
i := i
|
||||||
|
|
||||||
|
chunk := chunkSize
|
||||||
|
if length > 0 && chunkSize > length {
|
||||||
|
chunk = length
|
||||||
|
}
|
||||||
|
length -= chunk
|
||||||
|
|
||||||
|
rh, err := src.NextPart(ctx, chunk)
|
||||||
|
if errors.Is(err, io.EOF) {
|
||||||
|
break
|
||||||
|
} else if err != nil {
|
||||||
|
mu.Lock()
|
||||||
|
fmt.Fprintln(clctx.Stderr(), "Error getting reader for part", i)
|
||||||
|
mu.Unlock()
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
wh, err := dst.NextPart(ctx, chunk)
|
||||||
|
if err != nil {
|
||||||
|
_ = rh.Close()
|
||||||
|
|
||||||
|
mu.Lock()
|
||||||
|
fmt.Fprintln(clctx.Stderr(), "Error getting writer for part", i)
|
||||||
|
mu.Unlock()
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
ok := limiter.Go(ctx, func() {
|
||||||
|
defer func() { _ = rh.Close() }()
|
||||||
|
defer func() { _ = wh.Abort() }()
|
||||||
|
|
||||||
|
var w io.Writer = wh
|
||||||
|
if bar != nil {
|
||||||
|
bar.SetTotal(rh.Info().ContentLength).Start()
|
||||||
|
w = bar.NewProxyWriter(w)
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err := io.Copy(w, rh)
|
||||||
|
if err == nil {
|
||||||
|
err = wh.Commit()
|
||||||
|
}
|
||||||
|
|
||||||
|
mu.Lock()
|
||||||
|
defer mu.Unlock()
|
||||||
|
|
||||||
|
es.Add(err)
|
||||||
|
})
|
||||||
|
if !ok {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
limiter.Wait()
|
||||||
|
|
||||||
|
es.Add(dst.Commit(ctx))
|
||||||
|
|
||||||
|
return es.Err()
|
||||||
|
}
|
||||||
|
|
||||||
|
func parseRange(r string) (offset, length int64, err error) {
|
||||||
|
r = strings.TrimPrefix(strings.TrimSpace(r), "bytes=")
|
||||||
|
if r == "" {
|
||||||
|
return 0, -1, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if strings.Contains(r, ",") {
|
||||||
|
return 0, 0, errs.New("invalid range: must be single range")
|
||||||
|
}
|
||||||
|
|
||||||
|
idx := strings.Index(r, "-")
|
||||||
|
if idx < 0 {
|
||||||
|
return 0, 0, errs.New(`invalid range: no "-"`)
|
||||||
|
}
|
||||||
|
|
||||||
|
start, end := strings.TrimSpace(r[:idx]), strings.TrimSpace(r[idx+1:])
|
||||||
|
|
||||||
|
var starti, endi int64
|
||||||
|
|
||||||
|
if start != "" {
|
||||||
|
starti, err = strconv.ParseInt(start, 10, 64)
|
||||||
|
if err != nil {
|
||||||
|
return 0, 0, errs.New("invalid range: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if end != "" {
|
||||||
|
endi, err = strconv.ParseInt(end, 10, 64)
|
||||||
|
if err != nil {
|
||||||
|
return 0, 0, errs.New("invalid range: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
switch {
|
||||||
|
case start == "" && end == "":
|
||||||
|
return 0, 0, errs.New("invalid range")
|
||||||
|
case start == "":
|
||||||
|
return -endi, -1, nil
|
||||||
|
case end == "":
|
||||||
|
return starti, -1, nil
|
||||||
|
case starti < 0:
|
||||||
|
return 0, 0, errs.New("invalid range: negative start: %q", start)
|
||||||
|
case starti > endi:
|
||||||
|
return 0, 0, errs.New("invalid range: %v > %v", starti, endi)
|
||||||
|
default:
|
||||||
|
return starti, endi - starti + 1, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -86,6 +86,9 @@ func (u *uplinkMultiReadHandle) NextPart(ctx context.Context, length int64) (Rea
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO: this can cause tearing if the object is modified during
|
||||||
|
// the download. this should be fixed when we extend the api to
|
||||||
|
// allow requesting a specific version of the object.
|
||||||
dl, err := u.project.DownloadObject(ctx, u.bucket, u.key, opts)
|
dl, err := u.project.DownloadObject(ctx, u.bucket, u.key, opts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
Loading…
Reference in New Issue
Block a user