cmd/uplinkng: introduce MultiReadHandle

Change-Id: I57b98b5e1406e7b38edf3bc65907d9796a1a663b
This commit is contained in:
Jeff Wendling 2021-12-09 13:36:11 -05:00 committed by Michal Niewrzal
parent c33d2f58cf
commit 34890c9195
10 changed files with 498 additions and 124 deletions

View File

@ -179,8 +179,7 @@ func (c *cmdCp) copyFile(ctx clingy.Context, fs ulfs.Filesystem, source, dest ul
return nil
}
var length int64
var openOpts *ulfs.OpenOptions
var offset, length int64 = 0, -1
if c.byteRange != "" {
// TODO: we might want to avoid this call if ranged download will be used frequently
@ -199,21 +198,25 @@ func (c *cmdCp) copyFile(ctx clingy.Context, fs ulfs.Filesystem, source, dest ul
return errs.New("retrieval of multiple byte ranges of data not supported: %d provided", len(byteRange))
}
length = byteRange[0].Length
openOpts = &ulfs.OpenOptions{
Offset: byteRange[0].Start,
Length: byteRange[0].Length,
}
offset, length = byteRange[0].Start, byteRange[0].Length
}
rh, err := fs.Open(ctx, source, openOpts)
mrh, err := fs.Open(ctx, source)
if err != nil {
return err
}
defer func() { _ = rh.Close() }()
defer func() { _ = mrh.Close() }()
if length == 0 {
if err := mrh.SetOffset(offset); err != nil {
return err
}
rh, err := mrh.NextPart(ctx, length)
if err != nil {
return err
}
if length == -1 {
length = rh.Info().ContentLength
}

View File

@ -33,16 +33,10 @@ type RemoveOptions struct {
func (ro *RemoveOptions) isPending() bool { return ro != nil && ro.Pending }
// OpenOptions describes options for Filesystem.Open.
type OpenOptions struct {
Offset int64
Length int64
}
// Filesystem represents either the local Filesystem or the data backed by a project.
type Filesystem interface {
Close() error
Open(ctx clingy.Context, loc ulloc.Location, opts *OpenOptions) (ReadHandle, error)
Open(ctx clingy.Context, loc ulloc.Location) (MultiReadHandle, error)
Create(ctx clingy.Context, loc ulloc.Location) (WriteHandle, error)
Move(ctx clingy.Context, source, dest ulloc.Location) error
Remove(ctx context.Context, loc ulloc.Location, opts *RemoveOptions) error
@ -94,70 +88,28 @@ func uplinkUploadInfoToObjectInfo(bucket string, upl *uplink.UploadInfo) ObjectI
// read handles
//
// ReadHandle is something that can be read from.
type ReadHandle interface {
io.Reader
// MultiReadHandle allows one to read different sections of something.
// The offset parameter can be negative to signal that the offset should
// start that many bytes back from the end. Any negative value for length
// indicates to read up to the end.
//
// TODO: A negative offset requires a negative length, but there is no
// reason why that must be so.
type MultiReadHandle interface {
io.Closer
// SetOffset sets the offset at which the next call to NextPart starts.
SetOffset(offset int64) error
// NextPart returns a ReadHandle for up to length bytes starting at the
// current offset.
NextPart(ctx context.Context, length int64) (ReadHandle, error)
// Info returns the ObjectInfo describing what is being read.
Info(ctx context.Context) (*ObjectInfo, error)
}
// ReadHandle is something that can be read from distinct parts possibly
// in parallel.
type ReadHandle interface {
io.Closer
io.Reader
// Info returns the ObjectInfo associated with this handle.
Info() ObjectInfo
}
// uplinkReadHandle implements ReadHandle for *uplink.Downloads.
type uplinkReadHandle struct {
bucket string
dl *uplink.Download
}
// newUplinkReadHandle constructs an *uplinkReadHandle from an *uplink.Download.
func newUplinkReadHandle(bucket string, dl *uplink.Download) *uplinkReadHandle {
return &uplinkReadHandle{
bucket: bucket,
dl: dl,
}
}
// Read reads from the wrapped download.
func (u *uplinkReadHandle) Read(p []byte) (int, error) { return u.dl.Read(p) }
// Close closes the wrapped download.
func (u *uplinkReadHandle) Close() error { return u.dl.Close() }
// Info converts the download's object info into an ObjectInfo for the bucket.
func (u *uplinkReadHandle) Info() ObjectInfo { return uplinkObjectToObjectInfo(u.bucket, u.dl.Info()) }
// osReadHandle implements ReadHandle for *os.Files.
type osReadHandle struct {
raw *os.File
info ObjectInfo
}
// newOSReadHandle constructs an *osReadHandle from an *os.File. The object
// info is filled in from the file's Stat result; a Stat failure is returned
// wrapped.
func newOSReadHandle(fh *os.File) (*osReadHandle, error) {
fi, err := fh.Stat()
if err != nil {
return nil, errs.Wrap(err)
}
return &osReadHandle{
raw: fh,
info: ObjectInfo{
Loc: ulloc.NewLocal(fh.Name()),
IsPrefix: false,
Created: fi.ModTime(), // TODO: os specific crtime
ContentLength: fi.Size(),
},
}, nil
}
// Read reads from the wrapped file.
func (o *osReadHandle) Read(p []byte) (int, error) { return o.raw.Read(p) }
// Close closes the wrapped file.
func (o *osReadHandle) Close() error { return o.raw.Close() }
// Info returns the object info captured at construction time.
func (o *osReadHandle) Info() ObjectInfo { return o.info }
// genericReadHandle implements ReadHandle for an io.Reader.
type genericReadHandle struct{ r io.Reader }
// newGenericReadHandle constructs a *genericReadHandle from any io.Reader.
func newGenericReadHandle(r io.Reader) *genericReadHandle {
return &genericReadHandle{r: r}
}
// Read reads from the wrapped reader.
func (g *genericReadHandle) Read(p []byte) (int, error) { return g.r.Read(p) }
// Close is a no-op; the wrapped reader is not closed.
func (g *genericReadHandle) Close() error { return nil }
// Info reports a ContentLength of -1; no other fields are set.
func (g *genericReadHandle) Info() ObjectInfo { return ObjectInfo{ContentLength: -1} }
//
// write handles
//

View File

@ -0,0 +1,30 @@
// Copyright (C) 2021 Storj Labs, Inc.
// See LICENSE for copying information.
package ulfs
import (
"os"
"github.com/zeebo/errs"
"storj.io/storj/cmd/uplinkng/ulloc"
)
//
// read handles
//
// newOSMultiReadHandle constructs a MultiReadHandle backed by an *os.File.
// The object info (location, modification time, size) is taken from the
// file's Stat result.
//
// On success the returned handle owns fh and closes it from its Close method.
// If Stat fails, fh is closed here before the wrapped error is returned,
// because no handle exists that could close it later.
func newOSMultiReadHandle(fh *os.File) (MultiReadHandle, error) {
	fi, err := fh.Stat()
	if err != nil {
		// Best effort: the Stat error is the one worth reporting.
		_ = fh.Close()
		return nil, errs.Wrap(err)
	}
	return NewGenericMultiReadHandle(fh, ObjectInfo{
		Loc:           ulloc.NewLocal(fh.Name()),
		IsPrefix:      false,
		Created:       fi.ModTime(), // TODO: os specific crtime
		ContentLength: fi.Size(),
	}), nil
}

View File

@ -0,0 +1,130 @@
// Copyright (C) 2021 Storj Labs, Inc.
// See LICENSE for copying information.
package ulfs
import (
"context"
"io"
"sync"
"github.com/zeebo/errs"
)
//
// read handles
//
// GenericReader is an interface that can be turned into a GenericMultiReadHandle.
// It needs random access reads (io.ReaderAt) and a Close method, which
// GenericMultiReadHandle.Close forwards to.
type GenericReader interface {
io.Closer
io.ReaderAt
}
// NewGenericMultiReadHandle wraps any GenericReader, together with the
// ObjectInfo describing it, in a *GenericMultiReadHandle.
func NewGenericMultiReadHandle(r GenericReader, info ObjectInfo) *GenericMultiReadHandle {
	h := new(GenericMultiReadHandle)
	h.r, h.info = r, info
	return h
}
// GenericMultiReadHandle can turn any GenericReader into a MultiReadHandle.
type GenericMultiReadHandle struct {
r GenericReader // underlying reader; closed by Close
info ObjectInfo // object info supplied at construction
mu sync.Mutex // held by Close, SetOffset and NextPart while they touch off and done
off int64 // offset at which the next part starts
done bool // set once Close has run
}
// Close closes the underlying reader the first time it is called; later
// calls do nothing and return nil.
func (o *GenericMultiReadHandle) Close() error {
	o.mu.Lock()
	defer o.mu.Unlock()

	if !o.done {
		o.done = true
		return o.r.Close()
	}
	return nil
}
// SetOffset records the offset the next NextPart call starts from. It fails
// once the handle has been closed.
func (o *GenericMultiReadHandle) SetOffset(offset int64) error {
	o.mu.Lock()
	defer o.mu.Unlock()

	if !o.done {
		o.off = offset
		return nil
	}
	return errs.New("already closed")
}
// NextPart returns a ReadHandle of length bytes at the current offset and
// advances the offset past the returned part.
//
// A negative offset is interpreted relative to the end of the object. A
// negative length, or one that would run past the end, is clipped to the
// bytes remaining. It returns io.EOF when the offset is exactly at the end,
// and an error when the handle is closed or the offset lies outside the
// object. The stored offset is only updated when a part is returned, so a
// failed call does not change what the next call sees.
func (o *GenericMultiReadHandle) NextPart(ctx context.Context, length int64) (ReadHandle, error) {
	o.mu.Lock()
	defer o.mu.Unlock()

	if o.done {
		return nil, errs.New("already closed")
	}

	size := o.info.ContentLength

	// Normalize into a local so an invalid offset is not half-applied.
	off := o.off
	if off < 0 {
		off += size
	}
	if off < 0 || off > size {
		return nil, errs.New("invalid offset: %d for length %d", off, size)
	}
	if off == size {
		return nil, io.EOF
	}

	// Compare against the remainder rather than computing off+length, which
	// could overflow for very large lengths.
	if remaining := size - off; length < 0 || length > remaining {
		length = remaining
	}

	o.off = off + length
	return &genericReadHandle{
		r:    o.r,
		info: o.info,
		off:  off,
		len:  length,
	}, nil
}
// Info returns a pointer to a copy of the object info given at construction.
// It never returns an error.
func (o *GenericMultiReadHandle) Info(ctx context.Context) (*ObjectInfo, error) {
	cp := new(ObjectInfo)
	*cp = o.info
	return cp, nil
}
// genericReadHandle is the ReadHandle returned by
// GenericMultiReadHandle.NextPart. It reads a bounded window of the shared
// reader through ReadAt.
type genericReadHandle struct {
r GenericReader // reader shared with the parent handle
info ObjectInfo // object info copied from the parent handle
off int64 // offset of the next byte to read
len int64 // bytes remaining in this part
}
// Close is a no-op; the shared reader is closed by the parent handle.
func (o *genericReadHandle) Close() error { return nil }
// Info returns the object info copied from the parent handle.
func (o *genericReadHandle) Info() ObjectInfo { return o.info }
// Read fills p with at most the bytes remaining in this part, using ReadAt
// on the shared reader, and advances the part's position by the number of
// bytes read. It returns io.EOF once the part is used up.
func (o *genericReadHandle) Read(p []byte) (int, error) {
	if o.len <= 0 {
		return 0, io.EOF
	}
	if int64(len(p)) > o.len {
		p = p[:o.len]
	}

	n, err := o.r.ReadAt(p, o.off)
	consumed := int64(n)
	o.off, o.len = o.off+consumed, o.len-consumed
	return n, err
}

View File

@ -0,0 +1,130 @@
// Copyright (C) 2021 Storj Labs, Inc.
// See LICENSE for copying information.
package ulfs
import (
"context"
"io"
"sync"
"github.com/zeebo/errs"
"storj.io/common/sync2"
)
//
// read handles
//
// stdMultiReadHandle implements MultiReadHandle for stdin.
type stdMultiReadHandle struct {
stdin io.Reader // source every part reads from
mu sync.Mutex // held by Close and NextPart while they touch curr and done
curr *stdReadHandle // most recently returned part; nil before the first NextPart
done bool // set by Close
}
// newStdMultiReadHandle returns a stdMultiReadHandle whose parts read from
// the given reader.
func newStdMultiReadHandle(stdin io.Reader) *stdMultiReadHandle {
	h := new(stdMultiReadHandle)
	h.stdin = stdin
	return h
}
// Close marks the handle as closed so later NextPart calls fail. The
// underlying reader is left untouched.
func (o *stdMultiReadHandle) Close() error {
	o.mu.Lock()
	o.done = true
	o.mu.Unlock()
	return nil
}
// SetOffset always fails: this handle does not support changing the offset,
// whatever value is requested.
func (o *stdMultiReadHandle) SetOffset(offset int64) error {
return errs.New("cannot set offset on stdin read handle")
}
// NextPart returns a handle that reads up to length bytes from stdin.
//
// If an earlier part was handed out, NextPart first waits on that part's
// done fence, so that two parts are not reading stdin at once. If the earlier
// part recorded a read error, that error is returned instead of a new part.
// It also fails once the handle has been closed.
func (o *stdMultiReadHandle) NextPart(ctx context.Context, length int64) (ReadHandle, error) {
o.mu.Lock()
defer o.mu.Unlock()
if o.done {
return nil, errs.New("already closed")
}
if o.curr != nil {
// NOTE(review): presumably Wait reports false when ctx ends before the
// fence is released; confirm against sync2.Fence.
if !o.curr.done.Wait(ctx) {
return nil, ctx.Err()
}
// The deferred unlock is bound to the previous part here, before
// o.curr is replaced below.
o.curr.mu.Lock()
defer o.curr.mu.Unlock()
if o.curr.err != nil {
return nil, o.curr.err
}
}
o.curr = &stdReadHandle{
stdin: o.stdin,
len: length,
}
return o.curr, nil
}
// Info returns an ObjectInfo whose ContentLength is -1; no other fields are
// set. It never returns an error.
func (o *stdMultiReadHandle) Info(ctx context.Context) (*ObjectInfo, error) {
return &ObjectInfo{ContentLength: -1}, nil
}
// stdReadHandle implements ReadHandle for stdin.
type stdReadHandle struct {
stdin io.Reader // source shared with the parent stdMultiReadHandle
mu sync.Mutex // held by Read and Close; also taken by the parent's NextPart to inspect err
done sync2.Fence // released when the part is closed, used up, or has recorded an error
err error // first error returned by stdin.Read
len int64 // bytes this part may still read
closed bool // set by Close, or by Read once len is used up
}
// Info returns an ObjectInfo whose ContentLength is -1; no other fields are set.
func (o *stdReadHandle) Info() ObjectInfo { return ObjectInfo{ContentLength: -1} }
// Close marks the part as closed and releases the done fence that the
// parent's NextPart waits on.
func (o *stdReadHandle) Close() error {
	o.mu.Lock()
	defer o.mu.Unlock()

	o.closed = true
	o.done.Release()

	return nil
}
// Read reads from stdin into p, limited to the bytes remaining in this part.
//
// A negative len means the part has no limit and reads until stdin returns
// an error (normally io.EOF). Previously a negative len was used as a slice
// bound and caused a panic.
//
// The first error from stdin is remembered and returned by every later
// call. Once a limited part has consumed all its bytes it is marked closed
// and later calls return io.EOF. In both cases the done fence is released so
// the parent handle can hand out the next part.
func (o *stdReadHandle) Read(p []byte) (int, error) {
	o.mu.Lock()
	defer o.mu.Unlock()

	if o.err != nil {
		return 0, o.err
	} else if o.closed {
		return 0, io.EOF
	}

	// Only a limited part (len >= 0) truncates the buffer.
	if o.len >= 0 && o.len < int64(len(p)) {
		p = p[:o.len]
	}

	n, err := o.stdin.Read(p)
	if o.len >= 0 {
		// n <= len(p) <= o.len here, so len cannot go negative and be
		// mistaken for the unlimited sentinel.
		o.len -= int64(n)
	}

	if err != nil && o.err == nil {
		o.err = err
		o.done.Release()
	}

	if o.len == 0 {
		o.closed = true
		o.done.Release()
	}

	return n, err
}

View File

@ -0,0 +1,149 @@
// Copyright (C) 2021 Storj Labs, Inc.
// See LICENSE for copying information.
package ulfs
import (
"context"
"io"
"sync"
"github.com/zeebo/errs"
"storj.io/uplink"
)
//
// read handles
//
// uplinkMultiReadHandle implements MultiReadHandle for an object identified
// by bucket and key in an uplink project. Each part is a separate download.
type uplinkMultiReadHandle struct {
project *uplink.Project
bucket string
key string
mu sync.Mutex // held while the fields below are read or written
done bool // set by Close
eof bool // when set, NextPart returns io.EOF; cleared by SetOffset
off int64 // offset of the next part; may be negative until normalized after a download
info *ObjectInfo // cached object info; nil until learned from a download or StatObject
}
// newUplinkMultiReadHandle returns a handle for the object at bucket/key in
// the given project. No request is made until NextPart or Info is called.
func newUplinkMultiReadHandle(project *uplink.Project, bucket, key string) *uplinkMultiReadHandle {
	h := new(uplinkMultiReadHandle)
	h.project, h.bucket, h.key = project, bucket, key
	return h
}
// Close marks the handle as closed so later SetOffset and NextPart calls
// fail. Parts already returned are not affected.
func (u *uplinkMultiReadHandle) Close() error {
	u.mu.Lock()
	u.done = true
	u.mu.Unlock()
	return nil
}
// SetOffset records the offset for the next NextPart call and clears any
// end-of-object state. It fails once the handle has been closed.
func (u *uplinkMultiReadHandle) SetOffset(offset int64) error {
	u.mu.Lock()
	defer u.mu.Unlock()

	if !u.done {
		u.off, u.eof = offset, false
		return nil
	}
	return errs.New("already closed")
}
// NextPart starts a download of the next part at the current offset and
// returns a ReadHandle for it.
//
// A negative length reads to the end of the object; afterwards the handle
// is at end of object and further calls return io.EOF until SetOffset is
// called. A negative offset always downloads with a length of -1 (see the
// TODO on MultiReadHandle); if offset+length reaches or passes the end, the
// handle is likewise marked as being at end of object.
//
// The mutex is held only for the bookkeeping before and after the download,
// not across the DownloadObject call itself. The object info is cached from
// the first successful download.
func (u *uplinkMultiReadHandle) NextPart(ctx context.Context, length int64) (ReadHandle, error) {
	opts, err := func() (opts *uplink.DownloadOptions, err error) {
		u.mu.Lock()
		defer u.mu.Unlock()

		if u.done {
			return nil, errs.New("already closed")
		} else if u.eof {
			return nil, io.EOF
		} else if u.info != nil && u.off >= u.info.ContentLength {
			return nil, io.EOF
		}

		opts = &uplink.DownloadOptions{Offset: u.off, Length: length}
		if u.off < 0 {
			opts.Length = -1
			// off+length == 0 is exactly the end of the object. Without the
			// equality the offset would become 0 and the next part would
			// restart from the beginning.
			u.eof = u.off+length >= 0
		}
		if length < 0 {
			// Reading to the end leaves nothing for a following part. Adding
			// the negative length to the offset would move it backwards.
			u.eof = true
		} else {
			u.off += length
		}

		return opts, nil
	}()
	if err != nil {
		return nil, err
	}

	dl, err := u.project.DownloadObject(ctx, u.bucket, u.key, opts)
	if err != nil {
		return nil, err
	}

	u.mu.Lock()
	defer u.mu.Unlock()

	if u.info == nil {
		info := uplinkObjectToObjectInfo(u.bucket, dl.Info())
		u.info = &info
	}

	// Now that the size is known, turn an end-relative offset into an
	// absolute one so later parts can be checked against ContentLength.
	if u.off < 0 {
		if norm := u.off + u.info.ContentLength; norm > 0 {
			u.off = norm
		}
	}

	return &uplinkReadHandle{
		info: u.info,
		dl:   dl,
	}, nil
}
// Info returns the object info, using the cached value when one exists and
// otherwise asking the project with StatObject.
//
// The result is always a copy. The cached value is shared with the
// ReadHandles returned by NextPart, which read it without locking, so
// callers must not be able to modify it.
func (u *uplinkMultiReadHandle) Info(ctx context.Context) (*ObjectInfo, error) {
	u.mu.Lock()
	if u.info != nil {
		info := *u.info
		u.mu.Unlock()
		return &info, nil
	}
	u.mu.Unlock()

	// TODO(jeff): maybe we want to dedupe concurrent requests?

	obj, err := u.project.StatObject(ctx, u.bucket, u.key)
	if err != nil {
		return nil, err
	}

	u.mu.Lock()
	defer u.mu.Unlock()

	// Another caller may have filled the cache while the lock was released;
	// keep the first value.
	if u.info == nil {
		info := uplinkObjectToObjectInfo(u.bucket, obj)
		u.info = &info
	}

	info := *u.info
	return &info, nil
}
// uplinkReadHandle implements ReadHandle for *uplink.Downloads.
type uplinkReadHandle struct {
info *ObjectInfo // object info shared with the uplinkMultiReadHandle that created this part
dl *uplink.Download // download backing this part
}
// Read reads from the download.
func (u *uplinkReadHandle) Read(p []byte) (int, error) { return u.dl.Read(p) }
// Close closes the download.
func (u *uplinkReadHandle) Close() error { return u.dl.Close() }
// Info returns a copy of the shared object info.
func (u *uplinkReadHandle) Info() ObjectInfo { return *u.info }

View File

@ -5,7 +5,6 @@ package ulfs
import (
"context"
"io"
"io/ioutil"
"os"
"path/filepath"
@ -26,18 +25,12 @@ func NewLocal() *Local {
}
// Open returns a read ReadHandle for the given local path.
func (l *Local) Open(ctx context.Context, path string, opts *OpenOptions) (ReadHandle, error) {
func (l *Local) Open(ctx context.Context, path string) (MultiReadHandle, error) {
fh, err := os.Open(path)
if err != nil {
return nil, errs.Wrap(err)
}
if opts != nil {
fr := io.NewSectionReader(fh, opts.Offset, opts.Length)
return newGenericReadHandle(fr), nil
}
return newOSReadHandle(fh)
return newOSMultiReadHandle(fh)
}
// Create makes any directories necessary to create a file at path and returns a WriteHandle.

View File

@ -31,14 +31,14 @@ func (m *Mixed) Close() error {
return m.remote.Close()
}
// Open returns a ReadHandle to either a local file, remote object, or stdin.
func (m *Mixed) Open(ctx clingy.Context, loc ulloc.Location, opts *OpenOptions) (ReadHandle, error) {
// Open returns a MultiReadHandle to either a local file, remote object, or stdin.
func (m *Mixed) Open(ctx clingy.Context, loc ulloc.Location) (MultiReadHandle, error) {
if bucket, key, ok := loc.RemoteParts(); ok {
return m.remote.Open(ctx, bucket, key, opts)
return m.remote.Open(ctx, bucket, key)
} else if path, ok := loc.LocalParts(); ok {
return m.local.Open(ctx, path, opts)
return m.local.Open(ctx, path)
}
return newGenericReadHandle(ctx.Stdin()), nil
return newStdMultiReadHandle(ctx.Stdin()), nil
}
// Create returns a WriteHandle to either a local file, remote object, or stdout.

View File

@ -30,20 +30,9 @@ func (r *Remote) Close() error {
return r.project.Close()
}
// Open returns a ReadHandle for the object identified by a given bucket and key.
func (r *Remote) Open(ctx context.Context, bucket, key string, opts *OpenOptions) (ReadHandle, error) {
var downloadOpts *uplink.DownloadOptions
if opts != nil {
downloadOpts = &uplink.DownloadOptions{
Offset: opts.Offset,
Length: opts.Length,
}
}
fh, err := r.project.DownloadObject(ctx, bucket, key, downloadOpts)
if err != nil {
return nil, errs.Wrap(err)
}
return newUplinkReadHandle(bucket, fh), nil
// Open returns a MultiReadHandle for the object identified by a given bucket and key.
func (r *Remote) Open(ctx context.Context, bucket, key string) (MultiReadHandle, error) {
return newUplinkMultiReadHandle(r.project, bucket, key), nil
}
// Stat returns information about an object at the specified key.

View File

@ -6,6 +6,7 @@ package ultest
import (
"bytes"
"context"
"io"
"path/filepath"
"sort"
"strings"
@ -80,12 +81,24 @@ func (tfs *testFilesystem) Close() error {
return nil
}
func (tfs *testFilesystem) Open(ctx clingy.Context, loc ulloc.Location, opts *ulfs.OpenOptions) (_ ulfs.ReadHandle, err error) {
// nopClosingGenericReader adds a no-op Close to an io.ReaderAt so that it can
// be passed to ulfs.NewGenericMultiReadHandle.
type nopClosingGenericReader struct{ io.ReaderAt }
// Close does nothing and returns nil.
func (n nopClosingGenericReader) Close() error { return nil }
// newMultiReadHandle returns a ulfs.MultiReadHandle that serves the given
// contents from memory. Only ContentLength is set in the object info.
func newMultiReadHandle(contents string) ulfs.MultiReadHandle {
	reader := nopClosingGenericReader{ReaderAt: bytes.NewReader([]byte(contents))}
	info := ulfs.ObjectInfo{ContentLength: int64(len(contents))}
	return ulfs.NewGenericMultiReadHandle(reader, info)
}
func (tfs *testFilesystem) Open(ctx clingy.Context, loc ulloc.Location) (ulfs.MultiReadHandle, error) {
tfs.mu.Lock()
defer tfs.mu.Unlock()
if loc.Std() {
return &byteReadHandle{Buffer: bytes.NewBufferString("-")}, nil
return newMultiReadHandle("-"), nil
}
mf, ok := tfs.files[loc]
@ -93,11 +106,7 @@ func (tfs *testFilesystem) Open(ctx clingy.Context, loc ulloc.Location, opts *ul
return nil, errs.New("file does not exist %q", loc)
}
if opts != nil {
return &byteReadHandle{Buffer: bytes.NewBufferString(mf.contents[opts.Offset:(opts.Offset + opts.Length)])}, nil
}
return &byteReadHandle{Buffer: bytes.NewBufferString(mf.contents)}, nil
return newMultiReadHandle(mf.contents), nil
}
func (tfs *testFilesystem) Create(ctx clingy.Context, loc ulloc.Location) (_ ulfs.WriteHandle, err error) {
@ -277,17 +286,6 @@ func (tfs *testFilesystem) mkdir(ctx context.Context, dir string) error {
return nil
}
//
// ulfs.ReadHandle
//
// byteReadHandle is a read handle backed by an in-memory bytes.Buffer.
type byteReadHandle struct {
*bytes.Buffer
}
// Close does nothing and returns nil.
func (b *byteReadHandle) Close() error { return nil }
// Info returns an empty ObjectInfo.
func (b *byteReadHandle) Info() ulfs.ObjectInfo { return ulfs.ObjectInfo{} }
//
// ulfs.WriteHandle
//