cmd/uplinkng: introduce MultiWriteHandle

Change-Id: I6acf93141ddfa62728164818a322120ed6956b00
This commit is contained in:
Jeff Wendling 2021-12-09 14:03:42 -05:00
parent 34890c9195
commit baaa96c208
11 changed files with 456 additions and 89 deletions

View File

@ -220,7 +220,13 @@ func (c *cmdCp) copyFile(ctx clingy.Context, fs ulfs.Filesystem, source, dest ul
length = rh.Info().ContentLength
}
wh, err := fs.Create(ctx, dest)
mwh, err := fs.Create(ctx, dest)
if err != nil {
return err
}
defer func() { _ = mwh.Abort(ctx) }()
wh, err := mwh.NextPart(ctx, -1)
if err != nil {
return err
}
@ -239,7 +245,12 @@ func (c *cmdCp) copyFile(ctx clingy.Context, fs ulfs.Filesystem, source, dest ul
if _, err := io.Copy(writer, rh); err != nil {
return errs.Combine(err, wh.Abort())
}
return errs.Wrap(wh.Commit())
if err := wh.Commit(); err != nil {
return errs.Combine(err, wh.Abort())
}
return errs.Wrap(mwh.Commit(ctx))
}
func copyVerb(source, dest ulloc.Location) string {

View File

@ -6,11 +6,9 @@ package ulfs
import (
"context"
"io"
"os"
"time"
"github.com/zeebo/clingy"
"github.com/zeebo/errs"
"storj.io/storj/cmd/uplinkng/ulloc"
"storj.io/uplink"
@ -37,7 +35,7 @@ func (ro *RemoveOptions) isPending() bool { return ro != nil && ro.Pending }
type Filesystem interface {
Close() error
Open(ctx clingy.Context, loc ulloc.Location) (MultiReadHandle, error)
Create(ctx clingy.Context, loc ulloc.Location) (WriteHandle, error)
Create(ctx clingy.Context, loc ulloc.Location) (MultiWriteHandle, error)
Move(ctx clingy.Context, source, dest ulloc.Location) error
Remove(ctx context.Context, loc ulloc.Location, opts *RemoveOptions) error
List(ctx context.Context, prefix ulloc.Location, opts *ListOptions) (ObjectIterator, error)
@ -114,6 +112,18 @@ type ReadHandle interface {
// write handles
//
// MultiWriteHandle lets one create multiple sequential WriteHandles for
// different sections of something.
//
// The returned WriteHandle will error if data is attempted to be written
// past the provided length. A negative length implies an unknown amount
// of data, and future calls to NextPart will error.
type MultiWriteHandle interface {
NextPart(ctx context.Context, length int64) (WriteHandle, error)
Commit(ctx context.Context) error
Abort(ctx context.Context) error
}
// WriteHandle is anything that can be written to with commit/abort semantics.
type WriteHandle interface {
io.Writer
@ -121,68 +131,6 @@ type WriteHandle interface {
Abort() error
}
// uplinkWriteHandle implements writeHandle for *uplink.Uploads.
type uplinkWriteHandle uplink.Upload
// newUplinkWriteHandle constructs an *uplinkWriteHandle from an *uplink.Upload.
func newUplinkWriteHandle(dl *uplink.Upload) *uplinkWriteHandle {
return (*uplinkWriteHandle)(dl)
}
func (u *uplinkWriteHandle) raw() *uplink.Upload {
return (*uplink.Upload)(u)
}
func (u *uplinkWriteHandle) Write(p []byte) (int, error) { return u.raw().Write(p) }
func (u *uplinkWriteHandle) Commit() error { return u.raw().Commit() }
func (u *uplinkWriteHandle) Abort() error { return u.raw().Abort() }
// osWriteHandle implements writeHandle for *os.Files.
type osWriteHandle struct {
fh *os.File
done bool
}
// newOSWriteHandle constructs an *osWriteHandle from an *os.File.
func newOSWriteHandle(fh *os.File) *osWriteHandle {
return &osWriteHandle{fh: fh}
}
func (o *osWriteHandle) Write(p []byte) (int, error) { return o.fh.Write(p) }
func (o *osWriteHandle) Commit() error {
if o.done {
return nil
}
o.done = true
return o.fh.Close()
}
func (o *osWriteHandle) Abort() error {
if o.done {
return nil
}
o.done = true
return errs.Combine(
o.fh.Close(),
os.Remove(o.fh.Name()),
)
}
// genericWriteHandle implements writeHandle for an io.Writer.
type genericWriteHandle struct{ w io.Writer }
// newGenericWriteHandle constructs a *genericWriteHandle from an io.Writer.
func newGenericWriteHandle(w io.Writer) *genericWriteHandle {
return &genericWriteHandle{w: w}
}
func (g *genericWriteHandle) Write(p []byte) (int, error) { return g.w.Write(p) }
func (g *genericWriteHandle) Commit() error { return nil }
func (g *genericWriteHandle) Abort() error { return nil }
//
// object iteration
//

View File

@ -28,3 +28,24 @@ func newOSMultiReadHandle(fh *os.File) (MultiReadHandle, error) {
ContentLength: fi.Size(),
}), nil
}
//
// write handles
//
type fileGenericWriter os.File
func (f *fileGenericWriter) raw() *os.File { return (*os.File)(f) }
func (f *fileGenericWriter) WriteAt(b []byte, off int64) (int, error) { return f.raw().WriteAt(b, off) }
func (f *fileGenericWriter) Commit() error { return f.raw().Close() }
func (f *fileGenericWriter) Abort() error {
return errs.Combine(
f.raw().Close(),
os.Remove(f.raw().Name()),
)
}
func newOSMultiWriteHandle(fh *os.File) MultiWriteHandle {
return NewGenericMultiWriteHandle((*fileGenericWriter)(fh))
}

View File

@ -128,3 +128,161 @@ func (o *genericReadHandle) Read(p []byte) (int, error) {
o.len -= int64(n)
return n, err
}
//
// write handles
//
// GenericWriter is an interface that can be turned into a GenericMultiWriteHandle.
type GenericWriter interface {
io.WriterAt
Commit() error
Abort() error
}
// GenericMultiWriteHandle implements MultiWriteHandle for *os.Files.
type GenericMultiWriteHandle struct {
w GenericWriter
mu sync.Mutex
off int64
tail bool
done bool
abort bool
}
// NewGenericMultiWriteHandle constructs an *GenericMultiWriteHandle from a GenericWriter.
func NewGenericMultiWriteHandle(w GenericWriter) *GenericMultiWriteHandle {
return &GenericMultiWriteHandle{
w: w,
}
}
func (o *GenericMultiWriteHandle) childAbort() {
o.mu.Lock()
defer o.mu.Unlock()
if !o.done {
o.abort = true
}
}
func (o *GenericMultiWriteHandle) status() (done, abort bool) {
o.mu.Lock()
defer o.mu.Unlock()
return o.done, o.abort
}
// NextPart returns a WriteHandle expecting length bytes to be written to it.
func (o *GenericMultiWriteHandle) NextPart(ctx context.Context, length int64) (WriteHandle, error) {
o.mu.Lock()
defer o.mu.Unlock()
if o.done {
return nil, errs.New("already closed")
} else if o.tail {
return nil, errs.New("unable to make part after tail part")
}
w := &genericWriteHandle{
parent: o,
w: o.w,
off: o.off,
tail: length < 0,
len: length,
}
if w.tail {
o.tail = true
} else {
o.off += length
}
return w, nil
}
// Commit commits the overall GenericMultiWriteHandle. It errors if
// any parts were aborted.
func (o *GenericMultiWriteHandle) Commit(ctx context.Context) error {
o.mu.Lock()
defer o.mu.Unlock()
if o.done {
return nil
}
o.done = true
if o.abort {
return errs.Combine(
errs.New("commit failed: not every child was committed"),
o.w.Abort(),
)
}
return o.w.Commit()
}
// Abort aborts the overall GenericMultiWriteHandle.
func (o *GenericMultiWriteHandle) Abort(ctx context.Context) error {
o.mu.Lock()
defer o.mu.Unlock()
if o.done {
return nil
}
o.done = true
o.abort = true
return o.w.Abort()
}
type genericWriteHandle struct {
parent *GenericMultiWriteHandle
w GenericWriter
done bool
off int64
tail bool
len int64
}
func (o *genericWriteHandle) Write(p []byte) (int, error) {
if !o.tail {
if o.len <= 0 {
return 0, errs.New("write past maximum length")
} else if o.len < int64(len(p)) {
p = p[:o.len]
}
}
n, err := o.w.WriteAt(p, o.off)
o.off += int64(n)
if !o.tail {
o.len -= int64(n)
}
return n, err
}
func (o *genericWriteHandle) Commit() error {
if o.done {
return nil
}
o.done = true
done, abort := o.parent.status()
if abort {
return errs.New("commit failed: parent write handle aborted")
} else if done {
return errs.New("commit failed: parent write handle done")
}
return nil
}
func (o *genericWriteHandle) Abort() error {
if o.done {
return nil
}
o.done = true
o.parent.childAbort()
return nil
}

View File

@ -128,3 +128,127 @@ func (o *stdReadHandle) Read(p []byte) (int, error) {
return n, err
}
//
// write handles
//
// stdMultiWriteHandle implements MultiWriteHandle for stdouts.
type stdMultiWriteHandle struct {
stdout io.Writer
mu sync.Mutex
next *sync.Mutex
tail bool
done bool
}
func newStdMultiWriteHandle(stdout io.Writer) *stdMultiWriteHandle {
return &stdMultiWriteHandle{
stdout: stdout,
next: new(sync.Mutex),
}
}
func (s *stdMultiWriteHandle) NextPart(ctx context.Context, length int64) (WriteHandle, error) {
s.mu.Lock()
defer s.mu.Unlock()
if s.done {
return nil, errs.New("already closed")
} else if s.tail {
return nil, errs.New("unable to make part after tail part")
}
next := new(sync.Mutex)
next.Lock()
w := &stdWriteHandle{
stdout: s.stdout,
mu: s.next,
next: next,
tail: length < 0,
len: length,
}
s.tail = w.tail
s.next = next
return w, nil
}
func (s *stdMultiWriteHandle) Commit(ctx context.Context) error {
s.mu.Lock()
defer s.mu.Unlock()
s.done = true
return nil
}
func (s *stdMultiWriteHandle) Abort(ctx context.Context) error {
s.mu.Lock()
defer s.mu.Unlock()
s.done = true
return nil
}
// stdWriteHandle implements WriteHandle for stdouts.
type stdWriteHandle struct {
stdout io.Writer
mu *sync.Mutex
next *sync.Mutex
tail bool
len int64
}
func (s *stdWriteHandle) unlockNext() {
if s.next != nil {
s.next.Unlock()
s.next = nil
}
}
func (s *stdWriteHandle) Write(p []byte) (int, error) {
s.mu.Lock()
defer s.mu.Unlock()
if !s.tail {
if s.len <= 0 {
return 0, errs.New("write past maximum length")
} else if s.len < int64(len(p)) {
p = p[:s.len]
}
}
n, err := s.stdout.Write(p)
if !s.tail {
s.len -= int64(n)
if s.len == 0 {
s.unlockNext()
}
}
return n, err
}
func (s *stdWriteHandle) Commit() error {
s.mu.Lock()
defer s.mu.Unlock()
s.len = 0
s.unlockNext()
return nil
}
func (s *stdWriteHandle) Abort() error {
s.mu.Lock()
defer s.mu.Unlock()
s.len = 0
s.unlockNext()
return nil
}

View File

@ -147,3 +147,96 @@ type uplinkReadHandle struct {
func (u *uplinkReadHandle) Read(p []byte) (int, error) { return u.dl.Read(p) }
func (u *uplinkReadHandle) Close() error { return u.dl.Close() }
func (u *uplinkReadHandle) Info() ObjectInfo { return *u.info }
//
// write handles
//
type uplinkMultiWriteHandle struct {
project *uplink.Project
bucket string
info uplink.UploadInfo
mu sync.Mutex
tail bool
part uint32
}
func newUplinkMultiWriteHandle(project *uplink.Project, bucket string, info uplink.UploadInfo) *uplinkMultiWriteHandle {
return &uplinkMultiWriteHandle{
project: project,
bucket: bucket,
info: info,
}
}
func (u *uplinkMultiWriteHandle) NextPart(ctx context.Context, length int64) (WriteHandle, error) {
part, err := func() (uint32, error) {
u.mu.Lock()
defer u.mu.Unlock()
if u.tail {
return 0, errs.New("unable to make part after tail part")
}
u.tail = length < 0
u.part++
return u.part, nil
}()
if err != nil {
return nil, err
}
ul, err := u.project.UploadPart(ctx, u.bucket, u.info.Key, u.info.UploadID, part)
if err != nil {
return nil, err
}
return &uplinkWriteHandle{
ul: ul,
tail: length < 0,
len: length,
}, nil
}
func (u *uplinkMultiWriteHandle) Commit(ctx context.Context) error {
_, err := u.project.CommitUpload(ctx, u.bucket, u.info.Key, u.info.UploadID, nil)
return err
}
func (u *uplinkMultiWriteHandle) Abort(ctx context.Context) error {
return u.project.AbortUpload(ctx, u.bucket, u.info.Key, u.info.UploadID)
}
// uplinkWriteHandle implements writeHandle for *uplink.Uploads.
type uplinkWriteHandle struct {
ul *uplink.PartUpload
tail bool
len int64
}
func (u *uplinkWriteHandle) Write(p []byte) (int, error) {
if !u.tail {
if u.len <= 0 {
return 0, errs.New("write past maximum length")
} else if u.len < int64(len(p)) {
p = p[:u.len]
}
}
n, err := u.ul.Write(p)
if !u.tail {
u.len -= int64(n)
}
return n, err
}
func (u *uplinkWriteHandle) Commit() error {
return u.ul.Commit()
}
func (u *uplinkWriteHandle) Abort() error {
return u.ul.Abort()
}

View File

@ -34,7 +34,7 @@ func (l *Local) Open(ctx context.Context, path string) (MultiReadHandle, error)
}
// Create makes any directories necessary to create a file at path and returns a WriteHandle.
func (l *Local) Create(ctx context.Context, path string) (WriteHandle, error) {
func (l *Local) Create(ctx context.Context, path string) (MultiWriteHandle, error) {
fi, err := os.Stat(path)
if err != nil && !os.IsNotExist(err) {
return nil, errs.Wrap(err)
@ -51,7 +51,7 @@ func (l *Local) Create(ctx context.Context, path string) (WriteHandle, error) {
if err != nil {
return nil, errs.Wrap(err)
}
return newOSWriteHandle(fh), nil
return newOSMultiWriteHandle(fh), nil
}
// Move moves file to provided path.

View File

@ -42,13 +42,13 @@ func (m *Mixed) Open(ctx clingy.Context, loc ulloc.Location) (MultiReadHandle, e
}
// Create returns a WriteHandle to either a local file, remote object, or stdout.
func (m *Mixed) Create(ctx clingy.Context, loc ulloc.Location) (WriteHandle, error) {
func (m *Mixed) Create(ctx clingy.Context, loc ulloc.Location) (MultiWriteHandle, error) {
if bucket, key, ok := loc.RemoteParts(); ok {
return m.remote.Create(ctx, bucket, key)
} else if path, ok := loc.LocalParts(); ok {
return m.local.Create(ctx, path)
}
return newGenericWriteHandle(ctx.Stdout()), nil
return newStdMultiWriteHandle(ctx.Stdout()), nil
}
// Move moves either a local file or remote object.

View File

@ -45,13 +45,13 @@ func (r *Remote) Stat(ctx context.Context, bucket, key string) (*ObjectInfo, err
return &stat, nil
}
// Create returns a WriteHandle for the object identified by a given bucket and key.
func (r *Remote) Create(ctx context.Context, bucket, key string) (WriteHandle, error) {
fh, err := r.project.UploadObject(ctx, bucket, key, nil)
// Create returns a MultiWriteHandle for the object identified by a given bucket and key.
func (r *Remote) Create(ctx context.Context, bucket, key string) (MultiWriteHandle, error) {
info, err := r.project.BeginUpload(ctx, bucket, key, nil)
if err != nil {
return nil, errs.Wrap(err)
return nil, err
}
return newUplinkWriteHandle(fh), nil
return newUplinkMultiWriteHandle(r.project, bucket, info), nil
}
// Move moves object to provided key and bucket.

View File

@ -69,7 +69,7 @@ func (tfs *testFilesystem) Pending() (files []File) {
for _, h := range mh {
files = append(files, File{
Loc: loc.String(),
Contents: h.buf.String(),
Contents: string(h.buf),
})
}
}
@ -109,12 +109,12 @@ func (tfs *testFilesystem) Open(ctx clingy.Context, loc ulloc.Location) (ulfs.Mu
return newMultiReadHandle(mf.contents), nil
}
func (tfs *testFilesystem) Create(ctx clingy.Context, loc ulloc.Location) (_ ulfs.WriteHandle, err error) {
func (tfs *testFilesystem) Create(ctx clingy.Context, loc ulloc.Location) (_ ulfs.MultiWriteHandle, err error) {
tfs.mu.Lock()
defer tfs.mu.Unlock()
if loc.Std() {
return new(discardWriteHandle), nil
return ulfs.NewGenericMultiWriteHandle(new(discardWriteHandle)), nil
}
if bucket, _, ok := loc.RemoteParts(); ok {
@ -135,7 +135,6 @@ func (tfs *testFilesystem) Create(ctx clingy.Context, loc ulloc.Location) (_ ulf
tfs.created++
wh := &memWriteHandle{
buf: bytes.NewBuffer(nil),
loc: loc,
tfs: tfs,
cre: tfs.created,
@ -145,7 +144,7 @@ func (tfs *testFilesystem) Create(ctx clingy.Context, loc ulloc.Location) (_ ulf
tfs.pending[loc] = append(tfs.pending[loc], wh)
}
return wh, nil
return ulfs.NewGenericMultiWriteHandle(wh), nil
}
func (tfs *testFilesystem) Move(ctx clingy.Context, source, dest ulloc.Location) error {
@ -291,15 +290,22 @@ func (tfs *testFilesystem) mkdir(ctx context.Context, dir string) error {
//
type memWriteHandle struct {
buf *bytes.Buffer
buf []byte
loc ulloc.Location
tfs *testFilesystem
cre int64
done bool
}
func (b *memWriteHandle) Write(p []byte) (int, error) {
return b.buf.Write(p)
func (b *memWriteHandle) WriteAt(p []byte, off int64) (int, error) {
if b.done {
return 0, errs.New("write to closed handle")
}
end := int64(len(p)) + off
if grow := end - int64(len(b.buf)); grow > 0 {
b.buf = append(b.buf, make([]byte, grow)...)
}
return copy(b.buf[off:], p), nil
}
func (b *memWriteHandle) Commit() error {
@ -315,9 +321,10 @@ func (b *memWriteHandle) Commit() error {
}
b.tfs.files[b.loc] = memFileData{
contents: b.buf.String(),
contents: string(b.buf),
created: b.cre,
}
return nil
}
@ -357,9 +364,9 @@ func (b *memWriteHandle) close() error {
type discardWriteHandle struct{}
func (discardWriteHandle) Write(p []byte) (int, error) { return len(p), nil }
func (discardWriteHandle) Commit() error { return nil }
func (discardWriteHandle) Abort() error { return nil }
func (discardWriteHandle) WriteAt(p []byte, off int64) (int, error) { return len(p), nil }
func (discardWriteHandle) Commit() error { return nil }
func (discardWriteHandle) Abort() error { return nil }
//
// ulfs.ObjectIterator

View File

@ -139,7 +139,11 @@ func WithFile(location string, contents ...string) ExecuteOption {
tfs.ensureBucket(bucket)
}
wh, err := tfs.Create(ctx, loc)
mwh, err := tfs.Create(ctx, loc)
require.NoError(t, err)
defer func() { _ = mwh.Abort(ctx) }()
wh, err := mwh.NextPart(ctx, -1)
require.NoError(t, err)
defer func() { _ = wh.Abort() }()
@ -153,6 +157,7 @@ func WithFile(location string, contents ...string) ExecuteOption {
}
require.NoError(t, wh.Commit())
require.NoError(t, mwh.Commit(ctx))
}}
}