From 211a63098286654859480275728b637818b19ac3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Niewrza=C5=82?= Date: Thu, 19 Aug 2021 13:24:43 +0200 Subject: [PATCH] cmd/uplink: add parallelism flag for single object download Adds support for the new uplink method DownloadObjectAt, which makes it possible to download a single object in parallel. Change-Id: I8388653429992b0d24c383d17d7e90904203fe77 --- cmd/uplink/cmd/cp.go | 87 ++++++++++++++++++++++++++++++++---------- go.mod | 3 +- go.sum | 16 ++++---- scripts/test-uplink.sh | 8 ++++ 4 files changed, 83 insertions(+), 31 deletions(-) diff --git a/cmd/uplink/cmd/cp.go b/cmd/uplink/cmd/cp.go index 5aa23abbe..ddbb1a71e 100644 --- a/cmd/uplink/cmd/cp.go +++ b/cmd/uplink/cmd/cp.go @@ -19,12 +19,14 @@ import ( "storj.io/common/fpath" "storj.io/uplink" + "storj.io/uplink/private/object" ) var ( - progress *bool - expires *string - metadata *string + progress *bool + expires *string + metadata *string + parallelism *int ) func init() { @@ -38,6 +40,7 @@ func init() { progress = cpCmd.Flags().Bool("progress", true, "if true, show progress") expires = cpCmd.Flags().String("expires", "", "optional expiration date of an object. Please use format (yyyy-mm-ddThh:mm:ssZhh:mm)") metadata = cpCmd.Flags().String("metadata", "", "optional metadata for the object. Please use a single level JSON object of string to string only") + parallelism = cpCmd.Flags().Int("parallelism", 1, "controls how many parallel downloads of a single object will be performed") setBasicFlags(cpCmd.Flags(), "progress", "expires", "metadata") } @@ -137,6 +140,25 @@ func upload(ctx context.Context, src fpath.FPath, dst fpath.FPath, expiration ti return nil } +// WriterAt wraps a writer and a progress bar so that download progress is displayed correctly. +type WriterAt struct { + object.WriterAt + bar *progressbar.ProgressBar +} + +// WriteAt writes bytes to the wrapped writer and adds the number of bytes written to the progress bar.
+func (w *WriterAt) WriteAt(p []byte, off int64) (n int, err error) { + n, err = w.WriterAt.WriteAt(p, off) + w.bar.Add(n) + return +} + +// Truncate sets the progress bar total to size and truncates the wrapped writer to that size. +func (w *WriterAt) Truncate(size int64) error { + w.bar.SetTotal(size) + return w.WriterAt.Truncate(size) +} + // download transfers s3 compatible object src to dst on local machine. func download(ctx context.Context, src fpath.FPath, dst fpath.FPath, showProgress bool) (err error) { if src.IsLocal() { @@ -147,29 +169,16 @@ func download(ctx context.Context, src fpath.FPath, dst fpath.FPath, showProgres return fmt.Errorf("destination must be local path: %s", dst) } + if *parallelism < 1 { + return fmt.Errorf("parallelism must be at least 1") + } + project, err := cfg.getProject(ctx, false) if err != nil { return err } defer closeProject(project) - download, err := project.DownloadObject(ctx, src.Bucket(), src.Path(), nil) - if err != nil { - return err - } - defer func() { err = errs.Combine(err, download.Close()) }() - - var bar *progressbar.ProgressBar - var reader io.ReadCloser - if showProgress { - info := download.Info() - bar = progressbar.New64(info.System.ContentLength) - reader = bar.NewProxyReader(download) - bar.Start() - } else { - reader = download - } - if fileInfo, err := os.Stat(dst.Path()); err == nil && fileInfo.IsDir() { dst = dst.Join(src.Base()) } @@ -189,7 +198,43 @@ func download(ctx context.Context, src fpath.FPath, dst fpath.FPath, showProgres }() } - _, err = io.Copy(file, reader) + var bar *progressbar.ProgressBar + if *parallelism <= 1 { + download, openErr := project.DownloadObject(ctx, src.Bucket(), src.Path(), nil) // openErr, not err: ":=" in this block would shadow the named result + if openErr != nil { + return openErr + } + defer func() { err = errs.Combine(err, download.Close()) }() + + var reader io.ReadCloser + if showProgress { + info := download.Info() + bar = progressbar.New64(info.System.ContentLength) + reader = bar.NewProxyReader(download) + bar.Start() + } else { + reader = download + } + + _, err = io.Copy(file,
reader) + } else { + var writer object.WriterAt + if showProgress { + bar = progressbar.New64(0) + bar.Set(progressbar.Bytes, true) + writer = &WriterAt{file, bar} + bar.Start() + } else { + writer = file + } + + // final DownloadObjectAt method signature is under design so we can still have some + // inconsistency between naming e.g. concurrency - parallelism. + err = object.DownloadObjectAt(ctx, project, src.Bucket(), src.Path(), writer, &object.DownloadObjectAtOptions{ + Concurrency: *parallelism, + }) + } + if bar != nil { bar.Finish() } diff --git a/go.mod b/go.mod index f75e06e2d..196bc3506 100644 --- a/go.mod +++ b/go.mod @@ -52,11 +52,12 @@ require ( golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1 golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e google.golang.org/api v0.20.0 // indirect + google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 // indirect gopkg.in/segmentio/analytics-go.v3 v3.1.0 gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c storj.io/common v0.0.0-20210818163656-4667d2cafb27 storj.io/drpc v0.0.24 storj.io/monkit-jaeger v0.0.0-20210426161729-debb1cbcbbd7 storj.io/private v0.0.0-20210625132526-af46b647eda5 - storj.io/uplink v1.5.0-rc.1.0.20210512164354-e2e5889614a9 + storj.io/uplink v1.5.0-rc.1.0.20210820085250-799c214b35ac ) diff --git a/go.sum b/go.sum index 5a7d9fada..a468780bd 100644 --- a/go.sum +++ b/go.sum @@ -147,6 +147,7 @@ github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfb github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y= github.com/golang/mock v1.4.4/go.mod h1:l3mdAwkq5BuhzHwde/uurv3sEJeZMXNpwsxVWU71h+4= +github.com/golang/mock v1.5.0/go.mod h1:CWnOUgYIOo4TcNZ0wHX3YZCqsaM1I1Jvs6v3mP3KVu8= github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc= github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs= 
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= @@ -168,7 +169,6 @@ github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5a github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= @@ -318,6 +318,7 @@ github.com/lib/pq v1.3.0 h1:/qkRGz8zljWiDcFvgpwUpwIAPu3r07TDvs3Rws+o/pU= github.com/lib/pq v1.3.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/loov/hrtime v1.0.3 h1:LiWKU3B9skJwRPUf0Urs9+0+OE3TxdMuiRPOTwR0gcU= github.com/loov/hrtime v1.0.3/go.mod h1:yDY3Pwv2izeY4sq7YcPX/dtLwzg5NU1AxWuWxKwd0p0= +github.com/lucas-clemente/quic-go v0.20.1/go.mod h1:fZq/HUDIM+mW6X6wtzORjC0E/WDBMKe5Hf9bgjISwLk= github.com/lucas-clemente/quic-go v0.22.0 h1:o8NIiHaavjoHe6z8Bqm6fw7g0YIP6AFKMYer+oNxInA= github.com/lucas-clemente/quic-go v0.22.0/go.mod h1:vF5M1XqhBAHgbjKcJOXY3JZz3GP0T3FQhz/uyOUS38Q= github.com/lunixbochs/vtclean v1.0.0/go.mod h1:pHhQNgMf3btfWnGBVipUOjRYhoOsdGqdm/+2c2E2WMI= @@ -329,6 +330,7 @@ github.com/marten-seemann/qpack v0.2.1/go.mod h1:F7Gl5L1jIgN1D11ucXefiuJS9UMVP2o github.com/marten-seemann/qtls-go1-15 v0.1.4/go.mod h1:GyFwywLKkRt+6mfU99csTEY1joMZz5vmB1WNZH3P81I= github.com/marten-seemann/qtls-go1-15 v0.1.5 h1:Ci4EIUN6Rlb+D6GmLdej/bCQ4nPYNtVXQB+xjiXE1nk= github.com/marten-seemann/qtls-go1-15 v0.1.5/go.mod h1:GyFwywLKkRt+6mfU99csTEY1joMZz5vmB1WNZH3P81I= +github.com/marten-seemann/qtls-go1-16 v0.1.3/go.mod 
h1:gNpI2Ol+lRS3WwSOtIUUtRwZEQMXjYK+dQSBFbethAk= github.com/marten-seemann/qtls-go1-16 v0.1.4 h1:xbHbOGGhrenVtII6Co8akhLEdrawwB2iHl5yhJRpnco= github.com/marten-seemann/qtls-go1-16 v0.1.4/go.mod h1:gNpI2Ol+lRS3WwSOtIUUtRwZEQMXjYK+dQSBFbethAk= github.com/marten-seemann/qtls-go1-17 v0.1.0-rc.1 h1:/rpmWuGvceLwwWuaKPdjpR4JJEUH0tq64/I3hvzaNLM= @@ -476,8 +478,6 @@ github.com/spacemonkeygo/monkit/v3 v3.0.0-20191108235033-eacca33b3037/go.mod h1: github.com/spacemonkeygo/monkit/v3 v3.0.4/go.mod h1:JcK1pCbReQsOsMKF/POFSZCq7drXFybgGmbc27tuwes= github.com/spacemonkeygo/monkit/v3 v3.0.5/go.mod h1:JcK1pCbReQsOsMKF/POFSZCq7drXFybgGmbc27tuwes= github.com/spacemonkeygo/monkit/v3 v3.0.7-0.20200515175308-072401d8c752/go.mod h1:kj1ViJhlyADa7DiA4xVnTuPA46lFKbM7mxQTrXCuJP4= -github.com/spacemonkeygo/monkit/v3 v3.0.7/go.mod h1:kj1ViJhlyADa7DiA4xVnTuPA46lFKbM7mxQTrXCuJP4= -github.com/spacemonkeygo/monkit/v3 v3.0.10/go.mod h1:kj1ViJhlyADa7DiA4xVnTuPA46lFKbM7mxQTrXCuJP4= github.com/spacemonkeygo/monkit/v3 v3.0.12/go.mod h1:kj1ViJhlyADa7DiA4xVnTuPA46lFKbM7mxQTrXCuJP4= github.com/spacemonkeygo/monkit/v3 v3.0.14 h1:oHMuvvnHTawibf41bKy4T6iLoIPqSYsHgvKh/xrC+kQ= github.com/spacemonkeygo/monkit/v3 v3.0.14/go.mod h1:kj1ViJhlyADa7DiA4xVnTuPA46lFKbM7mxQTrXCuJP4= @@ -509,7 +509,6 @@ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXf github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= -github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stripe/stripe-go/v72 v72.51.0 h1:scXELorHW1SnAfARThO1QayscOsfEIoIAUy0yxoTqxY= @@ -722,6 +721,7 @@ 
golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200610111108-226ff32320da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201231184435-2d18734c6014/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210415045647-66c3f260301c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -835,7 +835,6 @@ google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzi google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= -google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= @@ -877,12 +876,11 @@ sourcegraph.com/sourcegraph/go-diff v0.5.0/go.mod h1:kuch7UrkMzY0X+p9CRK03kfuPQ2 sourcegraph.com/sqs/pbtypes v0.0.0-20180604144634-d3ebe8f20ae4/go.mod h1:ketZ/q3QxT9HOBeFhu6RdvsftgpsbFHBF5Cas6cDKZ0= storj.io/common v0.0.0-20200424175742-65ac59022f4f/go.mod h1:pZyXiIE7bGETIRXtfs0nICqMwp7PM8HqnDuyUeldNA0= storj.io/common v0.0.0-20201026135900-1aaeec90670b/go.mod 
h1:GqdmNf3fLm2UZX/7Zr0BLFCJ4gFjgm6eHrk/fnmr5jQ= -storj.io/common v0.0.0-20210504141454-bcb03a80052f/go.mod h1:PdP3eTld9RqSV3E4K44JSlw7Z/zNsymj9rnKuHFKhJE= +storj.io/common v0.0.0-20210805073808-8e0feb09e92a/go.mod h1:mhZYWpTojKsACxWE66RfXNz19zbyr/uEDVWHJH8dHog= storj.io/common v0.0.0-20210818163656-4667d2cafb27 h1:yXSY7oJDWbmIayhh9acESmMn/YFxolHdceeKz0a1Tyw= storj.io/common v0.0.0-20210818163656-4667d2cafb27/go.mod h1:ubZQYZozoKgjEkvaqmgBXqhLzKMXlLyDlK7EQdkGp/s= storj.io/drpc v0.0.11/go.mod h1:TiFc2obNjL9/3isMW1Rpxjy8V9uE0B2HMeMFGiiI7Iw= storj.io/drpc v0.0.14/go.mod h1:82nfl+6YwRwF6UG31cEWWUqv/FaKvP5SGqUvoqTxCMA= -storj.io/drpc v0.0.20/go.mod h1:eAxUDk8HWvGl9iqznpuphtZ+WIjIGPJFqNXuKHgRiMM= storj.io/drpc v0.0.24 h1:9WgD+q8WWDIz7XCMib+U6xX9SjblMqOz9R6sU7rJnS8= storj.io/drpc v0.0.24/go.mod h1:ofQUDPQbbIymRDKE0tms48k8bLP5Y+dsI9CbXGv3gko= storj.io/monkit-jaeger v0.0.0-20210225162224-66fb37637bf6/go.mod h1:gj4vuCeyCRjRmH8LIrgoyU9Dc9uR6H+/GcDUXmTbf80= @@ -890,5 +888,5 @@ storj.io/monkit-jaeger v0.0.0-20210426161729-debb1cbcbbd7 h1:zi0w9zoBfvuqysSAqxJ storj.io/monkit-jaeger v0.0.0-20210426161729-debb1cbcbbd7/go.mod h1:gj4vuCeyCRjRmH8LIrgoyU9Dc9uR6H+/GcDUXmTbf80= storj.io/private v0.0.0-20210625132526-af46b647eda5 h1:U07CKRc/FceT4V6FNXIEKYpiurx4hqV5RjzAA2EilbA= storj.io/private v0.0.0-20210625132526-af46b647eda5/go.mod h1:+p0XLkdbH7/Fl2UhIRVNaj82U9Z8K1MaUu9cpgi80xc= -storj.io/uplink v1.5.0-rc.1.0.20210512164354-e2e5889614a9 h1:F+A+Ki4eo3uzYXxesihRBq7PYBhU8MgfZeebd4O8hio= -storj.io/uplink v1.5.0-rc.1.0.20210512164354-e2e5889614a9/go.mod h1:geRW2dh4rvPhgruFZbN71LSYkMmCJLpwg0y8K/uLr3Y= +storj.io/uplink v1.5.0-rc.1.0.20210820085250-799c214b35ac h1:hSMRYWkjov2FbC+2V59j+oy2PunwPddjFnkt/SjIqeI= +storj.io/uplink v1.5.0-rc.1.0.20210820085250-799c214b35ac/go.mod h1:oIb5Fhc2MoWJOUZkt6Ll0KXzCpJvVMpJHvUBGQ1JdZs= diff --git a/scripts/test-uplink.sh b/scripts/test-uplink.sh index fc240764f..d4e4789dc 100755 --- a/scripts/test-uplink.sh +++ b/scripts/test-uplink.sh @@ -70,6 +70,10 @@ 
uplink cp "sj://$BUCKET/diff-size-segments" "$DST_DIR" --progress=fal uplink cp "sj://$BUCKET/put-file" "$DST_DIR" --progress=false uplink cat "sj://$BUCKET/put-file" >> "$DST_DIR/put-file-from-cat" +# test parallelism of single object +uplink cp "sj://$BUCKET/multisegment-upload-testfile" "$DST_DIR/multisegment-upload-testfile_p2" --parallelism 2 --progress=false +uplink cp "sj://$BUCKET/diff-size-segments" "$DST_DIR/diff-size-segments_p2" --parallelism 2 --progress=false + uplink ls "sj://$BUCKET/small-upload-testfile" | grep "small-upload-testfile" uplink rm "sj://$BUCKET/small-upload-testfile" @@ -89,6 +93,10 @@ compare_files "$SRC_DIR/diff-size-segments" "$DST_DIR/diff-size-segmen compare_files "$SRC_DIR/put-file" "$DST_DIR/put-file" compare_files "$SRC_DIR/put-file" "$DST_DIR/put-file-from-cat" +# test parallelism of single object +compare_files "$SRC_DIR/multisegment-upload-testfile" "$DST_DIR/multisegment-upload-testfile_p2" +compare_files "$SRC_DIR/diff-size-segments" "$DST_DIR/diff-size-segments_p2" + # test deleting non empty bucket with --force flag uplink mb "sj://$BUCKET/"