From 16a334020fc27a6685862321b2f72d3fceae4ef9 Mon Sep 17 00:00:00 2001 From: Clement Sam Date: Tue, 26 Oct 2021 10:49:03 +0000 Subject: [PATCH] cmd/uplinkng: add ranged download This change adds the ability to download byte ranges to uplinkng. Extended the uplinkng Filesystem interface with Stat method and an OpenOptions struct as parameter for the Open method. Also added a few tests for the ranged download Change-Id: I89a7276a75c51a4b22d7a450f15b3eb18ba838d4 --- cmd/uplinkng/cmd_cp.go | 42 +++++++++++++++++++++++++++++-- cmd/uplinkng/cmd_cp_test.go | 20 +++++++++++++++ cmd/uplinkng/ulfs/filesystem.go | 9 ++++++- cmd/uplinkng/ulfs/local.go | 23 ++++++++++++++++- cmd/uplinkng/ulfs/mixed.go | 16 +++++++++--- cmd/uplinkng/ulfs/remote.go | 21 ++++++++++++++-- cmd/uplinkng/ultest/filesystem.go | 24 +++++++++++++++++- scripts/test-uplinkng.sh | 21 ++++++++++++++++ 8 files changed, 166 insertions(+), 10 deletions(-) diff --git a/cmd/uplinkng/cmd_cp.go b/cmd/uplinkng/cmd_cp.go index 5819f7958..a21a66354 100644 --- a/cmd/uplinkng/cmd_cp.go +++ b/cmd/uplinkng/cmd_cp.go @@ -13,6 +13,7 @@ import ( "github.com/zeebo/clingy" "github.com/zeebo/errs" + "storj.io/common/ranger/httpranger" "storj.io/common/sync2" "storj.io/storj/cmd/uplinkng/ulext" "storj.io/storj/cmd/uplinkng/ulfs" @@ -27,6 +28,7 @@ type cmdCp struct { parallelism int dryrun bool progress bool + byteRange string source ulloc.Location dest ulloc.Location @@ -58,12 +60,16 @@ func (c *cmdCp) Setup(params clingy.Parameters) { c.progress = params.Flag("progress", "Show a progress bar when possible", true, clingy.Transform(strconv.ParseBool), ).(bool) + c.byteRange = params.Flag("range", "Downloads the specified range bytes of an object. For more information about the HTTP Range header, see https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35", "").(string) c.source = params.Arg("source", "Source to copy", clingy.Transform(ulloc.Parse)).(ulloc.Location) c.dest = params.Arg("dest", "Desination to copy", clingy.Transform(ulloc.Parse)).(ulloc.Location) } func (c *cmdCp) Execute(ctx clingy.Context) error { + if c.parallelism > 1 && c.byteRange != "" { + return errs.New("parallelism and range flags are mutually exclusive") + } fs, err := c.ex.OpenFilesystem(ctx, c.access) if err != nil { return err @@ -173,12 +179,44 @@ func (c *cmdCp) copyFile(ctx clingy.Context, fs ulfs.Filesystem, source, dest ul return nil } - rh, err := fs.Open(ctx, source) + var length int64 + var openOpts *ulfs.OpenOptions + + if c.byteRange != "" { + // TODO: we might want to avoid this call if ranged download will be used frequently + stat, err := fs.Stat(ctx, source) + if err != nil { + return err + } + byteRange, err := httpranger.ParseRange(c.byteRange, stat.ContentLength) + if err != nil && byteRange == nil { + return errs.New("error parsing byte range %q: %w", c.byteRange, err) + } + if len(byteRange) == 0 { + return errs.New("invalid range") + } + if len(byteRange) > 1 { + return errs.New("retrieval of multiple byte ranges of data not supported: %d provided", len(byteRange)) + } + + length = byteRange[0].Length + + openOpts = &ulfs.OpenOptions{ + Offset: byteRange[0].Start, + Length: byteRange[0].Length, + } + } + + rh, err := fs.Open(ctx, source, openOpts) if err != nil { return err } defer func() { _ = rh.Close() }() + if length == 0 { + length = rh.Info().ContentLength + } + wh, err := fs.Create(ctx, dest) if err != nil { return err @@ -188,7 +226,7 @@ func (c *cmdCp) copyFile(ctx clingy.Context, fs ulfs.Filesystem, source, dest ul var bar *progressbar.ProgressBar var writer io.Writer = wh - if length := rh.Info().ContentLength; progress && length >= 0 && !c.dest.Std() { + if progress && length >= 0 && !c.dest.Std() { bar = progressbar.New64(length).SetWriter(ctx.Stdout()) writer = bar.NewProxyWriter(writer) bar.Start() diff --git a/cmd/uplinkng/cmd_cp_test.go b/cmd/uplinkng/cmd_cp_test.go index 718ee66a0..d7a3334d2 100644 --- a/cmd/uplinkng/cmd_cp_test.go +++ b/cmd/uplinkng/cmd_cp_test.go @@ -71,6 +71,26 @@ func TestCpDownload(t *testing.T) { state.Succeed(t, "cp", "sj://user/fo", "/home/user/dest", "--recursive").RequireLocalFiles(t) }) + + t.Run("Range", func(t *testing.T) { + state := ultest.Setup(commands, + ultest.WithFile("sj://user/file-for-byte-range", "abcdefghijklmnopqrstuvwxyz"), + ) + + // multiple byte ranges not supported + state.Fail(t, "cp", "sj://user/file-for-byte-range", "/home/user/dest/file-for-byte-range", "--range", "bytes=0-1,2-3").RequireFailure(t).RequireLocalFiles(t) + + state.Succeed(t, "cp", "sj://user/file-for-byte-range", "/home/user/dest/file-for-byte-range", "--range", "bytes=0-2").RequireLocalFiles(t, + ultest.File{Loc: "/home/user/dest/file-for-byte-range", Contents: "abc"}, + ) + + state.Succeed(t, "cp", "sj://user/file-for-byte-range", "/home/user/dest/file-for-byte-range", "--range", "bytes=-1").RequireLocalFiles(t, + ultest.File{Loc: "/home/user/dest/file-for-byte-range", Contents: "z"}, + ) + + // invalid range + state.Fail(t, "cp", "sj://user/file-for-byte-range", "/home/user/dest/file-for-byte-range", "--range", "bytes=0,-1").RequireFailure(t).RequireLocalFiles(t) + }) } func TestCpUpload(t *testing.T) { diff --git a/cmd/uplinkng/ulfs/filesystem.go b/cmd/uplinkng/ulfs/filesystem.go index e3bcc6313..f637524ae 100644 --- a/cmd/uplinkng/ulfs/filesystem.go +++ b/cmd/uplinkng/ulfs/filesystem.go @@ -33,14 +33,21 @@ type RemoveOptions struct { func (ro *RemoveOptions) isPending() bool { return ro != nil && ro.Pending } +// OpenOptions describes options for Filesystem.Open. +type OpenOptions struct { + Offset int64 + Length int64 +} + // Filesystem represents either the local Filesystem or the data backed by a project. type Filesystem interface { Close() error - Open(ctx clingy.Context, loc ulloc.Location) (ReadHandle, error) + Open(ctx clingy.Context, loc ulloc.Location, opts *OpenOptions) (ReadHandle, error) Create(ctx clingy.Context, loc ulloc.Location) (WriteHandle, error) Remove(ctx context.Context, loc ulloc.Location, opts *RemoveOptions) error List(ctx context.Context, prefix ulloc.Location, opts *ListOptions) (ObjectIterator, error) IsLocalDir(ctx context.Context, loc ulloc.Location) bool + Stat(ctx context.Context, loc ulloc.Location) (*ObjectInfo, error) } // diff --git a/cmd/uplinkng/ulfs/local.go b/cmd/uplinkng/ulfs/local.go index c4419e3d0..fe789646a 100644 --- a/cmd/uplinkng/ulfs/local.go +++ b/cmd/uplinkng/ulfs/local.go @@ -5,6 +5,7 @@ package ulfs import ( "context" + "io" "io/ioutil" "os" "path/filepath" @@ -25,11 +26,17 @@ func NewLocal() *Local { } // Open returns a read ReadHandle for the given local path. -func (l *Local) Open(ctx context.Context, path string) (ReadHandle, error) { +func (l *Local) Open(ctx context.Context, path string, opts *OpenOptions) (ReadHandle, error) { fh, err := os.Open(path) if err != nil { return nil, errs.Wrap(err) } + + if opts != nil { + fr := io.NewSectionReader(fh, opts.Offset, opts.Length) + return newGenericReadHandle(fr), nil + } + return newOSReadHandle(fh) } @@ -143,6 +150,20 @@ func (l *Local) IsLocalDir(ctx context.Context, path string) bool { return fi.IsDir() } +// Stat returns an ObjectInfo describing the provided path. +func (l *Local) Stat(ctx context.Context, path string) (*ObjectInfo, error) { + fi, err := os.Stat(path) + if err != nil { + return nil, err + } + + return &ObjectInfo{ + Loc: ulloc.NewLocal(path), + Created: fi.ModTime(), + ContentLength: fi.Size(), + }, nil +} + type namedFileInfo struct { os.FileInfo name string diff --git a/cmd/uplinkng/ulfs/mixed.go b/cmd/uplinkng/ulfs/mixed.go index 5357e4de5..ba371d2bc 100644 --- a/cmd/uplinkng/ulfs/mixed.go +++ b/cmd/uplinkng/ulfs/mixed.go @@ -32,11 +32,11 @@ func (m *Mixed) Close() error { } // Open returns a ReadHandle to either a local file, remote object, or stdin. -func (m *Mixed) Open(ctx clingy.Context, loc ulloc.Location) (ReadHandle, error) { +func (m *Mixed) Open(ctx clingy.Context, loc ulloc.Location, opts *OpenOptions) (ReadHandle, error) { if bucket, key, ok := loc.RemoteParts(); ok { - return m.remote.Open(ctx, bucket, key) + return m.remote.Open(ctx, bucket, key, opts) } else if path, ok := loc.LocalParts(); ok { - return m.local.Open(ctx, path) + return m.local.Open(ctx, path, opts) } return newGenericReadHandle(ctx.Stdin()), nil } @@ -79,3 +79,13 @@ func (m *Mixed) IsLocalDir(ctx context.Context, loc ulloc.Location) bool { } return false } + +// Stat returns information about an object at the specified Location. +func (m *Mixed) Stat(ctx context.Context, loc ulloc.Location) (*ObjectInfo, error) { + if bucket, key, ok := loc.RemoteParts(); ok { + return m.remote.Stat(ctx, bucket, key) + } else if path, ok := loc.LocalParts(); ok { + return m.local.Stat(ctx, path) + } + return nil, errs.New("unable to stat loc %q", loc.Loc()) +} diff --git a/cmd/uplinkng/ulfs/remote.go b/cmd/uplinkng/ulfs/remote.go index 6afdcdaa5..01336283f 100644 --- a/cmd/uplinkng/ulfs/remote.go +++ b/cmd/uplinkng/ulfs/remote.go @@ -31,14 +31,31 @@ func (r *Remote) Close() error { } // Open returns a ReadHandle for the object identified by a given bucket and key. -func (r *Remote) Open(ctx context.Context, bucket, key string) (ReadHandle, error) { - fh, err := r.project.DownloadObject(ctx, bucket, key, nil) +func (r *Remote) Open(ctx context.Context, bucket, key string, opts *OpenOptions) (ReadHandle, error) { + var downloadOpts *uplink.DownloadOptions + if opts != nil { + downloadOpts = &uplink.DownloadOptions{ + Offset: opts.Offset, + Length: opts.Length, + } + } + fh, err := r.project.DownloadObject(ctx, bucket, key, downloadOpts) if err != nil { return nil, errs.Wrap(err) } return newUplinkReadHandle(bucket, fh), nil } +// Stat returns information about an object at the specified key. +func (r *Remote) Stat(ctx context.Context, bucket, key string) (*ObjectInfo, error) { + fstat, err := r.project.StatObject(ctx, bucket, key) + if err != nil { + return nil, errs.Wrap(err) + } + stat := uplinkObjectToObjectInfo(bucket, fstat) + return &stat, nil +} + // Create returns a WriteHandle for the object identified by a given bucket and key. func (r *Remote) Create(ctx context.Context, bucket, key string) (WriteHandle, error) { fh, err := r.project.UploadObject(ctx, bucket, key, nil) diff --git a/cmd/uplinkng/ultest/filesystem.go b/cmd/uplinkng/ultest/filesystem.go index 8a18c23f0..d13e581a6 100644 --- a/cmd/uplinkng/ultest/filesystem.go +++ b/cmd/uplinkng/ultest/filesystem.go @@ -77,7 +77,7 @@ func (tfs *testFilesystem) Close() error { return nil } -func (tfs *testFilesystem) Open(ctx clingy.Context, loc ulloc.Location) (_ ulfs.ReadHandle, err error) { +func (tfs *testFilesystem) Open(ctx clingy.Context, loc ulloc.Location, opts *ulfs.OpenOptions) (_ ulfs.ReadHandle, err error) { if loc.Std() { return &byteReadHandle{Buffer: bytes.NewBufferString("-")}, nil } @@ -86,6 +86,11 @@ func (tfs *testFilesystem) Open(ctx clingy.Context, loc ulloc.Location) (_ ulfs. if !ok { return nil, errs.New("file does not exist") } + + if opts != nil { + return &byteReadHandle{Buffer: bytes.NewBufferString(mf.contents[opts.Offset:(opts.Offset + opts.Length)])}, nil + } + return &byteReadHandle{Buffer: bytes.NewBufferString(mf.contents)}, nil } @@ -194,6 +199,23 @@ func (tfs *testFilesystem) IsLocalDir(ctx context.Context, loc ulloc.Location) ( return ok && (ulloc.CleanPath(path) == "." || tfs.locals[path]) } +func (tfs *testFilesystem) Stat(ctx context.Context, loc ulloc.Location) (*ulfs.ObjectInfo, error) { + if loc.Std() { + return nil, errs.New("unable to stat loc %q", loc.Loc()) + } + + mf, ok := tfs.files[loc] + if !ok { + return nil, errs.New("file does not exist: %q", loc.Loc()) + } + + return &ulfs.ObjectInfo{ + Loc: loc, + Created: time.Unix(mf.created, 0), + ContentLength: int64(len(mf.contents)), + }, nil +} + func (tfs *testFilesystem) mkdirAll(ctx context.Context, dir string) error { i := 0 for i < len(dir) { diff --git a/scripts/test-uplinkng.sh b/scripts/test-uplinkng.sh index 02f911321..c8220c7d4 100755 --- a/scripts/test-uplinkng.sh +++ b/scripts/test-uplinkng.sh @@ -72,6 +72,27 @@ uplinkng cp "sj://$BUCKET/diff-size-segments" "$DST_DIR" --progress=f uplinkng ls "sj://$BUCKET/small-upload-testfile" --access $STORJ_ACCESS | grep "small-upload-testfile" +# test ranged download of object +uplinkng cp "sj://$BUCKET/small-upload-testfile" "$DST_DIR/file-from-cp-range" --progress=false --range bytes=0-5 +EXPECTED_FILE_SIZE="6" +ACTUAL_FILE_SIZE=$(get_file_size "$DST_DIR/file-from-cp-range") +if [ "$EXPECTED_FILE_SIZE" != "$ACTUAL_FILE_SIZE" ] +then + echo "expected downloaded file size to be equal to $EXPECTED_FILE_SIZE, got $ACTUAL_FILE_SIZE" + exit 1 +fi + +# test ranged download with multiple byte range +set +e +EXPECTED_ERROR="retrieval of multiple byte ranges of data not supported: 2 provided" +ERROR=$(uplinkng cp "sj://$BUCKET/small-upload-testfile" "$DST_DIR/file-from-cp-range" --range bytes=0-5,6-10) +if [ $ERROR != $EXPECTED_ERROR ] +then + echo EXPECTED_ERROR + exit 1 +fi +set -e + uplinkng rm "sj://$BUCKET/small-upload-testfile" --access $STORJ_ACCESS uplinkng rm "sj://$BUCKET/big-upload-testfile" --access $STORJ_ACCESS uplinkng rm "sj://$BUCKET/multisegment-upload-testfile" --access $STORJ_ACCESS