diff --git a/cmd/uplinkng/cmd_cp.go b/cmd/uplinkng/cmd_cp.go index 5819f7958..a21a66354 100644 --- a/cmd/uplinkng/cmd_cp.go +++ b/cmd/uplinkng/cmd_cp.go @@ -13,6 +13,7 @@ import ( "github.com/zeebo/clingy" "github.com/zeebo/errs" + "storj.io/common/ranger/httpranger" "storj.io/common/sync2" "storj.io/storj/cmd/uplinkng/ulext" "storj.io/storj/cmd/uplinkng/ulfs" @@ -27,6 +28,7 @@ type cmdCp struct { parallelism int dryrun bool progress bool + byteRange string source ulloc.Location dest ulloc.Location @@ -58,12 +60,16 @@ func (c *cmdCp) Setup(params clingy.Parameters) { c.progress = params.Flag("progress", "Show a progress bar when possible", true, clingy.Transform(strconv.ParseBool), ).(bool) + c.byteRange = params.Flag("range", "Downloads the specified range bytes of an object. For more information about the HTTP Range header, see https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35", "").(string) c.source = params.Arg("source", "Source to copy", clingy.Transform(ulloc.Parse)).(ulloc.Location) c.dest = params.Arg("dest", "Desination to copy", clingy.Transform(ulloc.Parse)).(ulloc.Location) } func (c *cmdCp) Execute(ctx clingy.Context) error { + if c.parallelism > 1 && c.byteRange != "" { + return errs.New("parallelism and range flags are mutually exclusive") + } fs, err := c.ex.OpenFilesystem(ctx, c.access) if err != nil { return err @@ -173,12 +179,44 @@ func (c *cmdCp) copyFile(ctx clingy.Context, fs ulfs.Filesystem, source, dest ul return nil } - rh, err := fs.Open(ctx, source) + var length int64 + var openOpts *ulfs.OpenOptions + + if c.byteRange != "" { + // TODO: we might want to avoid this call if ranged download will be used frequently + stat, err := fs.Stat(ctx, source) + if err != nil { + return err + } + byteRange, err := httpranger.ParseRange(c.byteRange, stat.ContentLength) + if err != nil && byteRange == nil { + return errs.New("error parsing byte range %q: %w", c.byteRange, err) + } + if len(byteRange) == 0 { + return errs.New("invalid range") + } + if len(byteRange) > 1 { + return errs.New("retrieval of multiple byte ranges of data not supported: %d provided", len(byteRange)) + } + + length = byteRange[0].Length + + openOpts = &ulfs.OpenOptions{ + Offset: byteRange[0].Start, + Length: byteRange[0].Length, + } + } + + rh, err := fs.Open(ctx, source, openOpts) if err != nil { return err } defer func() { _ = rh.Close() }() + if length == 0 { + length = rh.Info().ContentLength + } + wh, err := fs.Create(ctx, dest) if err != nil { return err @@ -188,7 +226,7 @@ func (c *cmdCp) copyFile(ctx clingy.Context, fs ulfs.Filesystem, source, dest ul var bar *progressbar.ProgressBar var writer io.Writer = wh - if length := rh.Info().ContentLength; progress && length >= 0 && !c.dest.Std() { + if progress && length >= 0 && !c.dest.Std() { bar = progressbar.New64(length).SetWriter(ctx.Stdout()) writer = bar.NewProxyWriter(writer) bar.Start() diff --git a/cmd/uplinkng/cmd_cp_test.go b/cmd/uplinkng/cmd_cp_test.go index 718ee66a0..d7a3334d2 100644 --- a/cmd/uplinkng/cmd_cp_test.go +++ b/cmd/uplinkng/cmd_cp_test.go @@ -71,6 +71,26 @@ func TestCpDownload(t *testing.T) { state.Succeed(t, "cp", "sj://user/fo", "/home/user/dest", "--recursive").RequireLocalFiles(t) }) + + t.Run("Range", func(t *testing.T) { + state := ultest.Setup(commands, + ultest.WithFile("sj://user/file-for-byte-range", "abcdefghijklmnopqrstuvwxyz"), + ) + + // multiple byte ranges not supported + state.Fail(t, "cp", "sj://user/file-for-byte-range", "/home/user/dest/file-for-byte-range", "--range", "bytes=0-1,2-3").RequireFailure(t).RequireLocalFiles(t) + + state.Succeed(t, "cp", "sj://user/file-for-byte-range", "/home/user/dest/file-for-byte-range", "--range", "bytes=0-2").RequireLocalFiles(t, + ultest.File{Loc: "/home/user/dest/file-for-byte-range", Contents: "abc"}, + ) + + state.Succeed(t, "cp", "sj://user/file-for-byte-range", "/home/user/dest/file-for-byte-range", "--range", "bytes=-1").RequireLocalFiles(t, + ultest.File{Loc: "/home/user/dest/file-for-byte-range", Contents: "z"}, + ) + + // invalid range + state.Fail(t, "cp", "sj://user/file-for-byte-range", "/home/user/dest/file-for-byte-range", "--range", "bytes=0,-1").RequireFailure(t).RequireLocalFiles(t) + }) } func TestCpUpload(t *testing.T) { diff --git a/cmd/uplinkng/ulfs/filesystem.go b/cmd/uplinkng/ulfs/filesystem.go index e3bcc6313..f637524ae 100644 --- a/cmd/uplinkng/ulfs/filesystem.go +++ b/cmd/uplinkng/ulfs/filesystem.go @@ -33,14 +33,21 @@ type RemoveOptions struct { func (ro *RemoveOptions) isPending() bool { return ro != nil && ro.Pending } +// OpenOptions describes options for Filesystem.Open. +type OpenOptions struct { + Offset int64 + Length int64 +} + // Filesystem represents either the local Filesystem or the data backed by a project. type Filesystem interface { Close() error - Open(ctx clingy.Context, loc ulloc.Location) (ReadHandle, error) + Open(ctx clingy.Context, loc ulloc.Location, opts *OpenOptions) (ReadHandle, error) Create(ctx clingy.Context, loc ulloc.Location) (WriteHandle, error) Remove(ctx context.Context, loc ulloc.Location, opts *RemoveOptions) error List(ctx context.Context, prefix ulloc.Location, opts *ListOptions) (ObjectIterator, error) IsLocalDir(ctx context.Context, loc ulloc.Location) bool + Stat(ctx context.Context, loc ulloc.Location) (*ObjectInfo, error) } // diff --git a/cmd/uplinkng/ulfs/local.go b/cmd/uplinkng/ulfs/local.go index c4419e3d0..fe789646a 100644 --- a/cmd/uplinkng/ulfs/local.go +++ b/cmd/uplinkng/ulfs/local.go @@ -5,6 +5,7 @@ package ulfs import ( "context" + "io" "io/ioutil" "os" "path/filepath" @@ -25,11 +26,17 @@ func NewLocal() *Local { } // Open returns a read ReadHandle for the given local path. -func (l *Local) Open(ctx context.Context, path string) (ReadHandle, error) { +func (l *Local) Open(ctx context.Context, path string, opts *OpenOptions) (ReadHandle, error) { fh, err := os.Open(path) if err != nil { return nil, errs.Wrap(err) } + + if opts != nil { + fr := io.NewSectionReader(fh, opts.Offset, opts.Length) + return newGenericReadHandle(fr), nil + } + return newOSReadHandle(fh) } @@ -143,6 +150,20 @@ func (l *Local) IsLocalDir(ctx context.Context, path string) bool { return fi.IsDir() } +// Stat returns an ObjectInfo describing the provided path. +func (l *Local) Stat(ctx context.Context, path string) (*ObjectInfo, error) { + fi, err := os.Stat(path) + if err != nil { + return nil, err + } + + return &ObjectInfo{ + Loc: ulloc.NewLocal(path), + Created: fi.ModTime(), + ContentLength: fi.Size(), + }, nil +} + type namedFileInfo struct { os.FileInfo name string diff --git a/cmd/uplinkng/ulfs/mixed.go b/cmd/uplinkng/ulfs/mixed.go index 5357e4de5..ba371d2bc 100644 --- a/cmd/uplinkng/ulfs/mixed.go +++ b/cmd/uplinkng/ulfs/mixed.go @@ -32,11 +32,11 @@ func (m *Mixed) Close() error { } // Open returns a ReadHandle to either a local file, remote object, or stdin. -func (m *Mixed) Open(ctx clingy.Context, loc ulloc.Location) (ReadHandle, error) { +func (m *Mixed) Open(ctx clingy.Context, loc ulloc.Location, opts *OpenOptions) (ReadHandle, error) { if bucket, key, ok := loc.RemoteParts(); ok { - return m.remote.Open(ctx, bucket, key) + return m.remote.Open(ctx, bucket, key, opts) } else if path, ok := loc.LocalParts(); ok { - return m.local.Open(ctx, path) + return m.local.Open(ctx, path, opts) } return newGenericReadHandle(ctx.Stdin()), nil } @@ -79,3 +79,13 @@ func (m *Mixed) IsLocalDir(ctx context.Context, loc ulloc.Location) bool { } return false } + +// Stat returns information about an object at the specified Location. +func (m *Mixed) Stat(ctx context.Context, loc ulloc.Location) (*ObjectInfo, error) { + if bucket, key, ok := loc.RemoteParts(); ok { + return m.remote.Stat(ctx, bucket, key) + } else if path, ok := loc.LocalParts(); ok { + return m.local.Stat(ctx, path) + } + return nil, errs.New("unable to stat loc %q", loc.Loc()) +} diff --git a/cmd/uplinkng/ulfs/remote.go b/cmd/uplinkng/ulfs/remote.go index 6afdcdaa5..01336283f 100644 --- a/cmd/uplinkng/ulfs/remote.go +++ b/cmd/uplinkng/ulfs/remote.go @@ -31,14 +31,31 @@ func (r *Remote) Close() error { } // Open returns a ReadHandle for the object identified by a given bucket and key. -func (r *Remote) Open(ctx context.Context, bucket, key string) (ReadHandle, error) { - fh, err := r.project.DownloadObject(ctx, bucket, key, nil) +func (r *Remote) Open(ctx context.Context, bucket, key string, opts *OpenOptions) (ReadHandle, error) { + var downloadOpts *uplink.DownloadOptions + if opts != nil { + downloadOpts = &uplink.DownloadOptions{ + Offset: opts.Offset, + Length: opts.Length, + } + } + fh, err := r.project.DownloadObject(ctx, bucket, key, downloadOpts) if err != nil { return nil, errs.Wrap(err) } return newUplinkReadHandle(bucket, fh), nil } +// Stat returns information about an object at the specified key. +func (r *Remote) Stat(ctx context.Context, bucket, key string) (*ObjectInfo, error) { + fstat, err := r.project.StatObject(ctx, bucket, key) + if err != nil { + return nil, errs.Wrap(err) + } + stat := uplinkObjectToObjectInfo(bucket, fstat) + return &stat, nil +} + // Create returns a WriteHandle for the object identified by a given bucket and key. func (r *Remote) Create(ctx context.Context, bucket, key string) (WriteHandle, error) { fh, err := r.project.UploadObject(ctx, bucket, key, nil) diff --git a/cmd/uplinkng/ultest/filesystem.go b/cmd/uplinkng/ultest/filesystem.go index 8a18c23f0..d13e581a6 100644 --- a/cmd/uplinkng/ultest/filesystem.go +++ b/cmd/uplinkng/ultest/filesystem.go @@ -77,7 +77,7 @@ func (tfs *testFilesystem) Close() error { return nil } -func (tfs *testFilesystem) Open(ctx clingy.Context, loc ulloc.Location) (_ ulfs.ReadHandle, err error) { +func (tfs *testFilesystem) Open(ctx clingy.Context, loc ulloc.Location, opts *ulfs.OpenOptions) (_ ulfs.ReadHandle, err error) { if loc.Std() { return &byteReadHandle{Buffer: bytes.NewBufferString("-")}, nil } @@ -86,6 +86,11 @@ func (tfs *testFilesystem) Open(ctx clingy.Context, loc ulloc.Location) (_ ulfs. if !ok { return nil, errs.New("file does not exist") } + + if opts != nil { + return &byteReadHandle{Buffer: bytes.NewBufferString(mf.contents[opts.Offset:(opts.Offset + opts.Length)])}, nil + } + return &byteReadHandle{Buffer: bytes.NewBufferString(mf.contents)}, nil } @@ -194,6 +199,23 @@ func (tfs *testFilesystem) IsLocalDir(ctx context.Context, loc ulloc.Location) ( return ok && (ulloc.CleanPath(path) == "." || tfs.locals[path]) } +func (tfs *testFilesystem) Stat(ctx context.Context, loc ulloc.Location) (*ulfs.ObjectInfo, error) { + if loc.Std() { + return nil, errs.New("unable to stat loc %q", loc.Loc()) + } + + mf, ok := tfs.files[loc] + if !ok { + return nil, errs.New("file does not exist: %q", loc.Loc()) + } + + return &ulfs.ObjectInfo{ + Loc: loc, + Created: time.Unix(mf.created, 0), + ContentLength: int64(len(mf.contents)), + }, nil +} + func (tfs *testFilesystem) mkdirAll(ctx context.Context, dir string) error { i := 0 for i < len(dir) { diff --git a/scripts/test-uplinkng.sh b/scripts/test-uplinkng.sh index 02f911321..c8220c7d4 100755 --- a/scripts/test-uplinkng.sh +++ b/scripts/test-uplinkng.sh @@ -72,6 +72,27 @@ uplinkng cp "sj://$BUCKET/diff-size-segments" "$DST_DIR" --progress=f uplinkng ls "sj://$BUCKET/small-upload-testfile" --access $STORJ_ACCESS | grep "small-upload-testfile" +# test ranged download of object +uplinkng cp "sj://$BUCKET/small-upload-testfile" "$DST_DIR/file-from-cp-range" --progress=false --range bytes=0-5 +EXPECTED_FILE_SIZE="6" +ACTUAL_FILE_SIZE=$(get_file_size "$DST_DIR/file-from-cp-range") +if [ "$EXPECTED_FILE_SIZE" != "$ACTUAL_FILE_SIZE" ] +then + echo "expected downloaded file size to be equal to $EXPECTED_FILE_SIZE, got $ACTUAL_FILE_SIZE" + exit 1 +fi + +# test ranged download with multiple byte range +set +e +EXPECTED_ERROR="retrieval of multiple byte ranges of data not supported: 2 provided" +ERROR=$(uplinkng cp "sj://$BUCKET/small-upload-testfile" "$DST_DIR/file-from-cp-range" --range bytes=0-5,6-10) +if [ $ERROR != $EXPECTED_ERROR ] +then + echo EXPECTED_ERROR + exit 1 +fi +set -e + uplinkng rm "sj://$BUCKET/small-upload-testfile" --access $STORJ_ACCESS uplinkng rm "sj://$BUCKET/big-upload-testfile" --access $STORJ_ACCESS uplinkng rm "sj://$BUCKET/multisegment-upload-testfile" --access $STORJ_ACCESS