cmd/uplink: add parallelism flag for single object download

Adds support for the new uplink method DownloadObjectAt, which
makes it possible to download a single object in parallel.

Change-Id: I8388653429992b0d24c383d17d7e90904203fe77
This commit is contained in:
Michał Niewrzał 2021-08-19 13:24:43 +02:00 committed by Michal Niewrzal
parent 5c91ecd271
commit 211a630982
4 changed files with 83 additions and 31 deletions

View File

@ -19,12 +19,14 @@ import (
"storj.io/common/fpath"
"storj.io/uplink"
"storj.io/uplink/private/object"
)
var (
progress *bool
expires *string
metadata *string
progress *bool
expires *string
metadata *string
parallelism *int
)
func init() {
@ -38,6 +40,7 @@ func init() {
progress = cpCmd.Flags().Bool("progress", true, "if true, show progress")
expires = cpCmd.Flags().String("expires", "", "optional expiration date of an object. Please use format (yyyy-mm-ddThh:mm:ssZhh:mm)")
metadata = cpCmd.Flags().String("metadata", "", "optional metadata for the object. Please use a single level JSON object of string to string only")
parallelism = cpCmd.Flags().Int("parallelism", 1, "controls how many parallel downloads of a single object will be performed")
setBasicFlags(cpCmd.Flags(), "progress", "expires", "metadata")
}
@ -137,6 +140,25 @@ func upload(ctx context.Context, src fpath.FPath, dst fpath.FPath, expiration ti
return nil
}
// WriterAt wraps an object.WriterAt together with a progress bar so that
// data written through it is reflected in the displayed progress.
type WriterAt struct {
// Embedded destination writer that WriteAt and Truncate delegate to.
object.WriterAt
// bar is the progress bar updated by WriteAt (bytes added) and Truncate (total set).
bar *progressbar.ProgressBar
}
// WriteAt forwards p to the wrapped writer at offset off and advances the
// progress bar by the number of bytes the wrapped writer reported as written.
// The byte count and error from the wrapped writer are returned unchanged.
func (w *WriterAt) WriteAt(p []byte, off int64) (n int, err error) {
	written, writeErr := w.WriterAt.WriteAt(p, off)
	// The bar is advanced even when the write reports an error, so a partial
	// write is still counted.
	w.bar.Add(written)
	return written, writeErr
}
// Truncate sets the progress bar total to size and then truncates the
// wrapped writer to size, returning the wrapped writer's error.
func (w *WriterAt) Truncate(size int64) error {
	w.bar.SetTotal(size)
	truncErr := w.WriterAt.Truncate(size)
	return truncErr
}
// download transfers s3 compatible object src to dst on local machine.
func download(ctx context.Context, src fpath.FPath, dst fpath.FPath, showProgress bool) (err error) {
if src.IsLocal() {
@ -147,29 +169,16 @@ func download(ctx context.Context, src fpath.FPath, dst fpath.FPath, showProgres
return fmt.Errorf("destination must be local path: %s", dst)
}
if *parallelism < 1 {
return fmt.Errorf("parallelism must be at least 1")
}
project, err := cfg.getProject(ctx, false)
if err != nil {
return err
}
defer closeProject(project)
download, err := project.DownloadObject(ctx, src.Bucket(), src.Path(), nil)
if err != nil {
return err
}
defer func() { err = errs.Combine(err, download.Close()) }()
var bar *progressbar.ProgressBar
var reader io.ReadCloser
if showProgress {
info := download.Info()
bar = progressbar.New64(info.System.ContentLength)
reader = bar.NewProxyReader(download)
bar.Start()
} else {
reader = download
}
if fileInfo, err := os.Stat(dst.Path()); err == nil && fileInfo.IsDir() {
dst = dst.Join(src.Base())
}
@ -189,7 +198,43 @@ func download(ctx context.Context, src fpath.FPath, dst fpath.FPath, showProgres
}()
}
_, err = io.Copy(file, reader)
var bar *progressbar.ProgressBar
if *parallelism <= 1 {
	// Single-stream download: copy the object sequentially into file.
	//
	// download is declared separately so the call below assigns to the
	// function's named result err with "=". Using ":=" here would create a
	// block-scoped err, and both the io.Copy error and the Close error
	// combined in the deferred func would be lost when the block ends.
	var download *uplink.Download
	download, err = project.DownloadObject(ctx, src.Bucket(), src.Path(), nil)
	if err != nil {
		return err
	}
	defer func() { err = errs.Combine(err, download.Close()) }()

	var reader io.ReadCloser
	if showProgress {
		// The object size is known up front, so the bar starts with its final total.
		info := download.Info()
		bar = progressbar.New64(info.System.ContentLength)
		reader = bar.NewProxyReader(download)
		bar.Start()
	} else {
		reader = download
	}
	_, err = io.Copy(file, reader)
} else {
	// Parallel download: parts are written at offsets into file.
	var writer object.WriterAt
	if showProgress {
		// The total starts at 0 and is set later through WriterAt.Truncate.
		bar = progressbar.New64(0)
		bar.Set(progressbar.Bytes, true)
		writer = &WriterAt{WriterAt: file, bar: bar}
		bar.Start()
	} else {
		writer = file
	}

	// final DownloadObjectAt method signature is under design so we can still have some
	// inconsistency between naming e.g. concurrency - parallelism.
	err = object.DownloadObjectAt(ctx, project, src.Bucket(), src.Path(), writer, &object.DownloadObjectAtOptions{
		Concurrency: *parallelism,
	})
}
if bar != nil {
	bar.Finish()
}

3
go.mod
View File

@ -52,11 +52,12 @@ require (
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1
golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e
google.golang.org/api v0.20.0 // indirect
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 // indirect
gopkg.in/segmentio/analytics-go.v3 v3.1.0
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c
storj.io/common v0.0.0-20210818163656-4667d2cafb27
storj.io/drpc v0.0.24
storj.io/monkit-jaeger v0.0.0-20210426161729-debb1cbcbbd7
storj.io/private v0.0.0-20210625132526-af46b647eda5
storj.io/uplink v1.5.0-rc.1.0.20210512164354-e2e5889614a9
storj.io/uplink v1.5.0-rc.1.0.20210820085250-799c214b35ac
)

16
go.sum
View File

@ -147,6 +147,7 @@ github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfb
github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y=
github.com/golang/mock v1.4.4/go.mod h1:l3mdAwkq5BuhzHwde/uurv3sEJeZMXNpwsxVWU71h+4=
github.com/golang/mock v1.5.0/go.mod h1:CWnOUgYIOo4TcNZ0wHX3YZCqsaM1I1Jvs6v3mP3KVu8=
github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc=
github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
@ -168,7 +169,6 @@ github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5a
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
@ -318,6 +318,7 @@ github.com/lib/pq v1.3.0 h1:/qkRGz8zljWiDcFvgpwUpwIAPu3r07TDvs3Rws+o/pU=
github.com/lib/pq v1.3.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/loov/hrtime v1.0.3 h1:LiWKU3B9skJwRPUf0Urs9+0+OE3TxdMuiRPOTwR0gcU=
github.com/loov/hrtime v1.0.3/go.mod h1:yDY3Pwv2izeY4sq7YcPX/dtLwzg5NU1AxWuWxKwd0p0=
github.com/lucas-clemente/quic-go v0.20.1/go.mod h1:fZq/HUDIM+mW6X6wtzORjC0E/WDBMKe5Hf9bgjISwLk=
github.com/lucas-clemente/quic-go v0.22.0 h1:o8NIiHaavjoHe6z8Bqm6fw7g0YIP6AFKMYer+oNxInA=
github.com/lucas-clemente/quic-go v0.22.0/go.mod h1:vF5M1XqhBAHgbjKcJOXY3JZz3GP0T3FQhz/uyOUS38Q=
github.com/lunixbochs/vtclean v1.0.0/go.mod h1:pHhQNgMf3btfWnGBVipUOjRYhoOsdGqdm/+2c2E2WMI=
@ -329,6 +330,7 @@ github.com/marten-seemann/qpack v0.2.1/go.mod h1:F7Gl5L1jIgN1D11ucXefiuJS9UMVP2o
github.com/marten-seemann/qtls-go1-15 v0.1.4/go.mod h1:GyFwywLKkRt+6mfU99csTEY1joMZz5vmB1WNZH3P81I=
github.com/marten-seemann/qtls-go1-15 v0.1.5 h1:Ci4EIUN6Rlb+D6GmLdej/bCQ4nPYNtVXQB+xjiXE1nk=
github.com/marten-seemann/qtls-go1-15 v0.1.5/go.mod h1:GyFwywLKkRt+6mfU99csTEY1joMZz5vmB1WNZH3P81I=
github.com/marten-seemann/qtls-go1-16 v0.1.3/go.mod h1:gNpI2Ol+lRS3WwSOtIUUtRwZEQMXjYK+dQSBFbethAk=
github.com/marten-seemann/qtls-go1-16 v0.1.4 h1:xbHbOGGhrenVtII6Co8akhLEdrawwB2iHl5yhJRpnco=
github.com/marten-seemann/qtls-go1-16 v0.1.4/go.mod h1:gNpI2Ol+lRS3WwSOtIUUtRwZEQMXjYK+dQSBFbethAk=
github.com/marten-seemann/qtls-go1-17 v0.1.0-rc.1 h1:/rpmWuGvceLwwWuaKPdjpR4JJEUH0tq64/I3hvzaNLM=
@ -476,8 +478,6 @@ github.com/spacemonkeygo/monkit/v3 v3.0.0-20191108235033-eacca33b3037/go.mod h1:
github.com/spacemonkeygo/monkit/v3 v3.0.4/go.mod h1:JcK1pCbReQsOsMKF/POFSZCq7drXFybgGmbc27tuwes=
github.com/spacemonkeygo/monkit/v3 v3.0.5/go.mod h1:JcK1pCbReQsOsMKF/POFSZCq7drXFybgGmbc27tuwes=
github.com/spacemonkeygo/monkit/v3 v3.0.7-0.20200515175308-072401d8c752/go.mod h1:kj1ViJhlyADa7DiA4xVnTuPA46lFKbM7mxQTrXCuJP4=
github.com/spacemonkeygo/monkit/v3 v3.0.7/go.mod h1:kj1ViJhlyADa7DiA4xVnTuPA46lFKbM7mxQTrXCuJP4=
github.com/spacemonkeygo/monkit/v3 v3.0.10/go.mod h1:kj1ViJhlyADa7DiA4xVnTuPA46lFKbM7mxQTrXCuJP4=
github.com/spacemonkeygo/monkit/v3 v3.0.12/go.mod h1:kj1ViJhlyADa7DiA4xVnTuPA46lFKbM7mxQTrXCuJP4=
github.com/spacemonkeygo/monkit/v3 v3.0.14 h1:oHMuvvnHTawibf41bKy4T6iLoIPqSYsHgvKh/xrC+kQ=
github.com/spacemonkeygo/monkit/v3 v3.0.14/go.mod h1:kj1ViJhlyADa7DiA4xVnTuPA46lFKbM7mxQTrXCuJP4=
@ -509,7 +509,6 @@ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXf
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stripe/stripe-go/v72 v72.51.0 h1:scXELorHW1SnAfARThO1QayscOsfEIoIAUy0yxoTqxY=
@ -722,6 +721,7 @@ golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20200610111108-226ff32320da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201231184435-2d18734c6014/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210415045647-66c3f260301c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@ -835,7 +835,6 @@ google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzi
google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
@ -877,12 +876,11 @@ sourcegraph.com/sourcegraph/go-diff v0.5.0/go.mod h1:kuch7UrkMzY0X+p9CRK03kfuPQ2
sourcegraph.com/sqs/pbtypes v0.0.0-20180604144634-d3ebe8f20ae4/go.mod h1:ketZ/q3QxT9HOBeFhu6RdvsftgpsbFHBF5Cas6cDKZ0=
storj.io/common v0.0.0-20200424175742-65ac59022f4f/go.mod h1:pZyXiIE7bGETIRXtfs0nICqMwp7PM8HqnDuyUeldNA0=
storj.io/common v0.0.0-20201026135900-1aaeec90670b/go.mod h1:GqdmNf3fLm2UZX/7Zr0BLFCJ4gFjgm6eHrk/fnmr5jQ=
storj.io/common v0.0.0-20210504141454-bcb03a80052f/go.mod h1:PdP3eTld9RqSV3E4K44JSlw7Z/zNsymj9rnKuHFKhJE=
storj.io/common v0.0.0-20210805073808-8e0feb09e92a/go.mod h1:mhZYWpTojKsACxWE66RfXNz19zbyr/uEDVWHJH8dHog=
storj.io/common v0.0.0-20210818163656-4667d2cafb27 h1:yXSY7oJDWbmIayhh9acESmMn/YFxolHdceeKz0a1Tyw=
storj.io/common v0.0.0-20210818163656-4667d2cafb27/go.mod h1:ubZQYZozoKgjEkvaqmgBXqhLzKMXlLyDlK7EQdkGp/s=
storj.io/drpc v0.0.11/go.mod h1:TiFc2obNjL9/3isMW1Rpxjy8V9uE0B2HMeMFGiiI7Iw=
storj.io/drpc v0.0.14/go.mod h1:82nfl+6YwRwF6UG31cEWWUqv/FaKvP5SGqUvoqTxCMA=
storj.io/drpc v0.0.20/go.mod h1:eAxUDk8HWvGl9iqznpuphtZ+WIjIGPJFqNXuKHgRiMM=
storj.io/drpc v0.0.24 h1:9WgD+q8WWDIz7XCMib+U6xX9SjblMqOz9R6sU7rJnS8=
storj.io/drpc v0.0.24/go.mod h1:ofQUDPQbbIymRDKE0tms48k8bLP5Y+dsI9CbXGv3gko=
storj.io/monkit-jaeger v0.0.0-20210225162224-66fb37637bf6/go.mod h1:gj4vuCeyCRjRmH8LIrgoyU9Dc9uR6H+/GcDUXmTbf80=
@ -890,5 +888,5 @@ storj.io/monkit-jaeger v0.0.0-20210426161729-debb1cbcbbd7 h1:zi0w9zoBfvuqysSAqxJ
storj.io/monkit-jaeger v0.0.0-20210426161729-debb1cbcbbd7/go.mod h1:gj4vuCeyCRjRmH8LIrgoyU9Dc9uR6H+/GcDUXmTbf80=
storj.io/private v0.0.0-20210625132526-af46b647eda5 h1:U07CKRc/FceT4V6FNXIEKYpiurx4hqV5RjzAA2EilbA=
storj.io/private v0.0.0-20210625132526-af46b647eda5/go.mod h1:+p0XLkdbH7/Fl2UhIRVNaj82U9Z8K1MaUu9cpgi80xc=
storj.io/uplink v1.5.0-rc.1.0.20210512164354-e2e5889614a9 h1:F+A+Ki4eo3uzYXxesihRBq7PYBhU8MgfZeebd4O8hio=
storj.io/uplink v1.5.0-rc.1.0.20210512164354-e2e5889614a9/go.mod h1:geRW2dh4rvPhgruFZbN71LSYkMmCJLpwg0y8K/uLr3Y=
storj.io/uplink v1.5.0-rc.1.0.20210820085250-799c214b35ac h1:hSMRYWkjov2FbC+2V59j+oy2PunwPddjFnkt/SjIqeI=
storj.io/uplink v1.5.0-rc.1.0.20210820085250-799c214b35ac/go.mod h1:oIb5Fhc2MoWJOUZkt6Ll0KXzCpJvVMpJHvUBGQ1JdZs=

View File

@ -70,6 +70,10 @@ uplink cp "sj://$BUCKET/diff-size-segments" "$DST_DIR" --progress=fal
uplink cp "sj://$BUCKET/put-file" "$DST_DIR" --progress=false
uplink cat "sj://$BUCKET/put-file" >> "$DST_DIR/put-file-from-cat"
# test parallelism of single object
uplink cp "sj://$BUCKET/multisegment-upload-testfile" "$DST_DIR/multisegment-upload-testfile_p2" --parallelism 2 --progress=false
uplink cp "sj://$BUCKET/diff-size-segments" "$DST_DIR/diff-size-segments_p2" --parallelism 2 --progress=false
uplink ls "sj://$BUCKET/small-upload-testfile" | grep "small-upload-testfile"
uplink rm "sj://$BUCKET/small-upload-testfile"
@ -89,6 +93,10 @@ compare_files "$SRC_DIR/diff-size-segments" "$DST_DIR/diff-size-segmen
compare_files "$SRC_DIR/put-file" "$DST_DIR/put-file"
compare_files "$SRC_DIR/put-file" "$DST_DIR/put-file-from-cat"
# test parallelism of single object
compare_files "$SRC_DIR/multisegment-upload-testfile" "$DST_DIR/multisegment-upload-testfile_p2"
compare_files "$SRC_DIR/diff-size-segments" "$DST_DIR/diff-size-segments_p2"
# test deleting non empty bucket with --force flag
uplink mb "sj://$BUCKET/"