From baaa96c208e9c04a917ef4676f5176596df323a3 Mon Sep 17 00:00:00 2001 From: Jeff Wendling Date: Thu, 9 Dec 2021 14:03:42 -0500 Subject: [PATCH] cmd/uplinkng: introduce MultiWriteHandle Change-Id: I6acf93141ddfa62728164818a322120ed6956b00 --- cmd/uplinkng/cmd_cp.go | 15 ++- cmd/uplinkng/ulfs/filesystem.go | 78 +++----------- cmd/uplinkng/ulfs/handle_file.go | 21 ++++ cmd/uplinkng/ulfs/handle_generic.go | 158 ++++++++++++++++++++++++++++ cmd/uplinkng/ulfs/handle_std.go | 124 ++++++++++++++++++++++ cmd/uplinkng/ulfs/handle_uplink.go | 93 ++++++++++++++++ cmd/uplinkng/ulfs/local.go | 4 +- cmd/uplinkng/ulfs/mixed.go | 4 +- cmd/uplinkng/ulfs/remote.go | 10 +- cmd/uplinkng/ultest/filesystem.go | 31 +++--- cmd/uplinkng/ultest/setup.go | 7 +- 11 files changed, 456 insertions(+), 89 deletions(-) diff --git a/cmd/uplinkng/cmd_cp.go b/cmd/uplinkng/cmd_cp.go index d5f8511b2..2de28256b 100644 --- a/cmd/uplinkng/cmd_cp.go +++ b/cmd/uplinkng/cmd_cp.go @@ -220,7 +220,13 @@ func (c *cmdCp) copyFile(ctx clingy.Context, fs ulfs.Filesystem, source, dest ul length = rh.Info().ContentLength } - wh, err := fs.Create(ctx, dest) + mwh, err := fs.Create(ctx, dest) + if err != nil { + return err + } + defer func() { _ = mwh.Abort(ctx) }() + + wh, err := mwh.NextPart(ctx, -1) if err != nil { return err } @@ -239,7 +245,12 @@ func (c *cmdCp) copyFile(ctx clingy.Context, fs ulfs.Filesystem, source, dest ul if _, err := io.Copy(writer, rh); err != nil { return errs.Combine(err, wh.Abort()) } - return errs.Wrap(wh.Commit()) + + if err := wh.Commit(); err != nil { + return errs.Combine(err, wh.Abort()) + } + + return errs.Wrap(mwh.Commit(ctx)) } func copyVerb(source, dest ulloc.Location) string { diff --git a/cmd/uplinkng/ulfs/filesystem.go b/cmd/uplinkng/ulfs/filesystem.go index d73e6de65..e9772af45 100644 --- a/cmd/uplinkng/ulfs/filesystem.go +++ b/cmd/uplinkng/ulfs/filesystem.go @@ -6,11 +6,9 @@ package ulfs import ( "context" "io" - "os" "time" "github.com/zeebo/clingy" - 
"github.com/zeebo/errs" "storj.io/storj/cmd/uplinkng/ulloc" "storj.io/uplink" @@ -37,7 +35,7 @@ func (ro *RemoveOptions) isPending() bool { return ro != nil && ro.Pending } type Filesystem interface { Close() error Open(ctx clingy.Context, loc ulloc.Location) (MultiReadHandle, error) - Create(ctx clingy.Context, loc ulloc.Location) (WriteHandle, error) + Create(ctx clingy.Context, loc ulloc.Location) (MultiWriteHandle, error) Move(ctx clingy.Context, source, dest ulloc.Location) error Remove(ctx context.Context, loc ulloc.Location, opts *RemoveOptions) error List(ctx context.Context, prefix ulloc.Location, opts *ListOptions) (ObjectIterator, error) @@ -114,6 +112,18 @@ type ReadHandle interface { // write handles // +// MultiWriteHandle lets one create multiple sequential WriteHandles for +// different sections of something. +// +// The returned WriteHandle will error if data is attempted to be written +// past the provided length. A negative length implies an unknown amount +// of data, and future calls to NextPart will error. +type MultiWriteHandle interface { + NextPart(ctx context.Context, length int64) (WriteHandle, error) + Commit(ctx context.Context) error + Abort(ctx context.Context) error +} + // WriteHandle is anything that can be written to with commit/abort semantics. type WriteHandle interface { io.Writer @@ -121,68 +131,6 @@ type WriteHandle interface { Abort() error } -// uplinkWriteHandle implements writeHandle for *uplink.Uploads. -type uplinkWriteHandle uplink.Upload - -// newUplinkWriteHandle constructs an *uplinkWriteHandle from an *uplink.Upload. 
-func newUplinkWriteHandle(dl *uplink.Upload) *uplinkWriteHandle { - return (*uplinkWriteHandle)(dl) -} - -func (u *uplinkWriteHandle) raw() *uplink.Upload { - return (*uplink.Upload)(u) -} - -func (u *uplinkWriteHandle) Write(p []byte) (int, error) { return u.raw().Write(p) } -func (u *uplinkWriteHandle) Commit() error { return u.raw().Commit() } -func (u *uplinkWriteHandle) Abort() error { return u.raw().Abort() } - -// osWriteHandle implements writeHandle for *os.Files. -type osWriteHandle struct { - fh *os.File - done bool -} - -// newOSWriteHandle constructs an *osWriteHandle from an *os.File. -func newOSWriteHandle(fh *os.File) *osWriteHandle { - return &osWriteHandle{fh: fh} -} - -func (o *osWriteHandle) Write(p []byte) (int, error) { return o.fh.Write(p) } - -func (o *osWriteHandle) Commit() error { - if o.done { - return nil - } - o.done = true - - return o.fh.Close() -} - -func (o *osWriteHandle) Abort() error { - if o.done { - return nil - } - o.done = true - - return errs.Combine( - o.fh.Close(), - os.Remove(o.fh.Name()), - ) -} - -// genericWriteHandle implements writeHandle for an io.Writer. -type genericWriteHandle struct{ w io.Writer } - -// newGenericWriteHandle constructs a *genericWriteHandle from an io.Writer. 
-func newGenericWriteHandle(w io.Writer) *genericWriteHandle { - return &genericWriteHandle{w: w} -} - -func (g *genericWriteHandle) Write(p []byte) (int, error) { return g.w.Write(p) } -func (g *genericWriteHandle) Commit() error { return nil } -func (g *genericWriteHandle) Abort() error { return nil } - // // object iteration // diff --git a/cmd/uplinkng/ulfs/handle_file.go b/cmd/uplinkng/ulfs/handle_file.go index db48367be..5bc9b387a 100644 --- a/cmd/uplinkng/ulfs/handle_file.go +++ b/cmd/uplinkng/ulfs/handle_file.go @@ -28,3 +28,24 @@ func newOSMultiReadHandle(fh *os.File) (MultiReadHandle, error) { ContentLength: fi.Size(), }), nil } + +// +// write handles +// + +type fileGenericWriter os.File + +func (f *fileGenericWriter) raw() *os.File { return (*os.File)(f) } + +func (f *fileGenericWriter) WriteAt(b []byte, off int64) (int, error) { return f.raw().WriteAt(b, off) } +func (f *fileGenericWriter) Commit() error { return f.raw().Close() } +func (f *fileGenericWriter) Abort() error { + return errs.Combine( + f.raw().Close(), + os.Remove(f.raw().Name()), + ) +} + +func newOSMultiWriteHandle(fh *os.File) MultiWriteHandle { + return NewGenericMultiWriteHandle((*fileGenericWriter)(fh)) +} diff --git a/cmd/uplinkng/ulfs/handle_generic.go b/cmd/uplinkng/ulfs/handle_generic.go index 25450e22b..2f5e0bc38 100644 --- a/cmd/uplinkng/ulfs/handle_generic.go +++ b/cmd/uplinkng/ulfs/handle_generic.go @@ -128,3 +128,161 @@ func (o *genericReadHandle) Read(p []byte) (int, error) { o.len -= int64(n) return n, err } + +// +// write handles +// + +// GenericWriter is an interface that can be turned into a GenericMultiWriteHandle. +type GenericWriter interface { + io.WriterAt + Commit() error + Abort() error +} + +// GenericMultiWriteHandle implements MultiWriteHandle for any GenericWriter. 
+type GenericMultiWriteHandle struct { + w GenericWriter + + mu sync.Mutex + off int64 + tail bool + done bool + abort bool +} + +// NewGenericMultiWriteHandle constructs a *GenericMultiWriteHandle from a GenericWriter. +func NewGenericMultiWriteHandle(w GenericWriter) *GenericMultiWriteHandle { + return &GenericMultiWriteHandle{ + w: w, + } +} + +func (o *GenericMultiWriteHandle) childAbort() { + o.mu.Lock() + defer o.mu.Unlock() + + if !o.done { + o.abort = true + } +} + +func (o *GenericMultiWriteHandle) status() (done, abort bool) { + o.mu.Lock() + defer o.mu.Unlock() + + return o.done, o.abort +} + +// NextPart returns a WriteHandle accepting at most length bytes; a negative length makes it the final, unbounded part. +func (o *GenericMultiWriteHandle) NextPart(ctx context.Context, length int64) (WriteHandle, error) { + o.mu.Lock() + defer o.mu.Unlock() + + if o.done { + return nil, errs.New("already closed") + } else if o.tail { + return nil, errs.New("unable to make part after tail part") + } + + w := &genericWriteHandle{ + parent: o, + w: o.w, + off: o.off, + tail: length < 0, + len: length, + } + + if w.tail { + o.tail = true + } else { + o.off += length + } + + return w, nil +} + +// Commit commits the overall GenericMultiWriteHandle. It errors if +// any parts were aborted. +func (o *GenericMultiWriteHandle) Commit(ctx context.Context) error { + o.mu.Lock() + defer o.mu.Unlock() + + if o.done { + return nil + } + o.done = true + + if o.abort { + return errs.Combine( + errs.New("commit failed: not every child was committed"), + o.w.Abort(), + ) + } + + return o.w.Commit() +} + +// Abort aborts the overall GenericMultiWriteHandle. 
+func (o *GenericMultiWriteHandle) Abort(ctx context.Context) error { + o.mu.Lock() + defer o.mu.Unlock() + + if o.done { + return nil + } + o.done = true + o.abort = true + + return o.w.Abort() +} + +type genericWriteHandle struct { + parent *GenericMultiWriteHandle + w GenericWriter + done bool + off int64 + tail bool + len int64 +} + +func (o *genericWriteHandle) Write(p []byte) (int, error) { + if !o.tail { + if o.len <= 0 { + return 0, errs.New("write past maximum length") + } else if o.len < int64(len(p)) { + p = p[:o.len] + } + } + n, err := o.w.WriteAt(p, o.off) + o.off += int64(n) + if !o.tail { + o.len -= int64(n) + } + return n, err +} + +func (o *genericWriteHandle) Commit() error { + if o.done { + return nil + } + o.done = true + + done, abort := o.parent.status() + if abort { + return errs.New("commit failed: parent write handle aborted") + } else if done { + return errs.New("commit failed: parent write handle done") + } + return nil +} + +func (o *genericWriteHandle) Abort() error { + if o.done { + return nil + } + o.done = true + + o.parent.childAbort() + return nil +} diff --git a/cmd/uplinkng/ulfs/handle_std.go b/cmd/uplinkng/ulfs/handle_std.go index a7fee655a..9aaa00d80 100644 --- a/cmd/uplinkng/ulfs/handle_std.go +++ b/cmd/uplinkng/ulfs/handle_std.go @@ -128,3 +128,127 @@ func (o *stdReadHandle) Read(p []byte) (int, error) { return n, err } + +// +// write handles +// + +// stdMultiWriteHandle implements MultiWriteHandle for stdouts. 
+type stdMultiWriteHandle struct { + stdout io.Writer + + mu sync.Mutex + next *sync.Mutex + tail bool + done bool +} + +func newStdMultiWriteHandle(stdout io.Writer) *stdMultiWriteHandle { + return &stdMultiWriteHandle{ + stdout: stdout, + next: new(sync.Mutex), + } +} + +func (s *stdMultiWriteHandle) NextPart(ctx context.Context, length int64) (WriteHandle, error) { + s.mu.Lock() + defer s.mu.Unlock() + + if s.done { + return nil, errs.New("already closed") + } else if s.tail { + return nil, errs.New("unable to make part after tail part") + } + + next := new(sync.Mutex) + next.Lock() + + w := &stdWriteHandle{ + stdout: s.stdout, + mu: s.next, + next: next, + tail: length < 0, + len: length, + } + + s.tail = w.tail + s.next = next + + return w, nil +} + +func (s *stdMultiWriteHandle) Commit(ctx context.Context) error { + s.mu.Lock() + defer s.mu.Unlock() + + s.done = true + return nil +} + +func (s *stdMultiWriteHandle) Abort(ctx context.Context) error { + s.mu.Lock() + defer s.mu.Unlock() + + s.done = true + return nil +} + +// stdWriteHandle implements WriteHandle for stdouts. 
+type stdWriteHandle struct { + stdout io.Writer + mu *sync.Mutex + next *sync.Mutex + tail bool + len int64 +} + +func (s *stdWriteHandle) unlockNext() { + if s.next != nil { + s.next.Unlock() + s.next = nil + } +} + +func (s *stdWriteHandle) Write(p []byte) (int, error) { + s.mu.Lock() + defer s.mu.Unlock() + + if !s.tail { + if s.len <= 0 { + return 0, errs.New("write past maximum length") + } else if s.len < int64(len(p)) { + p = p[:s.len] + } + } + + n, err := s.stdout.Write(p) + + if !s.tail { + s.len -= int64(n) + if s.len == 0 { + s.unlockNext() + } + } + + return n, err +} + +func (s *stdWriteHandle) Commit() error { + s.mu.Lock() + defer s.mu.Unlock() + + s.len = 0 + s.unlockNext() + + return nil +} + +func (s *stdWriteHandle) Abort() error { + s.mu.Lock() + defer s.mu.Unlock() + + s.len = 0 + s.unlockNext() + + return nil +} diff --git a/cmd/uplinkng/ulfs/handle_uplink.go b/cmd/uplinkng/ulfs/handle_uplink.go index db0e3f6dc..5af59fa59 100644 --- a/cmd/uplinkng/ulfs/handle_uplink.go +++ b/cmd/uplinkng/ulfs/handle_uplink.go @@ -147,3 +147,96 @@ type uplinkReadHandle struct { func (u *uplinkReadHandle) Read(p []byte) (int, error) { return u.dl.Read(p) } func (u *uplinkReadHandle) Close() error { return u.dl.Close() } func (u *uplinkReadHandle) Info() ObjectInfo { return *u.info } + +// +// write handles +// + +type uplinkMultiWriteHandle struct { + project *uplink.Project + bucket string + info uplink.UploadInfo + + mu sync.Mutex + tail bool + part uint32 +} + +func newUplinkMultiWriteHandle(project *uplink.Project, bucket string, info uplink.UploadInfo) *uplinkMultiWriteHandle { + return &uplinkMultiWriteHandle{ + project: project, + bucket: bucket, + info: info, + } +} + +func (u *uplinkMultiWriteHandle) NextPart(ctx context.Context, length int64) (WriteHandle, error) { + part, err := func() (uint32, error) { + u.mu.Lock() + defer u.mu.Unlock() + + if u.tail { + return 0, errs.New("unable to make part after tail part") + } + u.tail = length < 0 + + 
u.part++ + return u.part, nil + }() + if err != nil { + return nil, err + } + + ul, err := u.project.UploadPart(ctx, u.bucket, u.info.Key, u.info.UploadID, part) + if err != nil { + return nil, err + } + + return &uplinkWriteHandle{ + ul: ul, + tail: length < 0, + len: length, + }, nil +} + +func (u *uplinkMultiWriteHandle) Commit(ctx context.Context) error { + _, err := u.project.CommitUpload(ctx, u.bucket, u.info.Key, u.info.UploadID, nil) + return err +} + +func (u *uplinkMultiWriteHandle) Abort(ctx context.Context) error { + return u.project.AbortUpload(ctx, u.bucket, u.info.Key, u.info.UploadID) +} + +// uplinkWriteHandle implements WriteHandle for *uplink.PartUploads. +type uplinkWriteHandle struct { + ul *uplink.PartUpload + tail bool + len int64 +} + +func (u *uplinkWriteHandle) Write(p []byte) (int, error) { + if !u.tail { + if u.len <= 0 { + return 0, errs.New("write past maximum length") + } else if u.len < int64(len(p)) { + p = p[:u.len] + } + } + + n, err := u.ul.Write(p) + + if !u.tail { + u.len -= int64(n) + } + + return n, err +} + +func (u *uplinkWriteHandle) Commit() error { + return u.ul.Commit() +} + +func (u *uplinkWriteHandle) Abort() error { + return u.ul.Abort() +} diff --git a/cmd/uplinkng/ulfs/local.go b/cmd/uplinkng/ulfs/local.go index bbb039263..9dacdf3b8 100644 --- a/cmd/uplinkng/ulfs/local.go +++ b/cmd/uplinkng/ulfs/local.go @@ -34,7 +34,7 @@ func (l *Local) Open(ctx context.Context, path string) (MultiReadHandle, error) } // Create makes any directories necessary to create a file at path and returns a WriteHandle. 
-func (l *Local) Create(ctx context.Context, path string) (WriteHandle, error) { +func (l *Local) Create(ctx context.Context, path string) (MultiWriteHandle, error) { fi, err := os.Stat(path) if err != nil && !os.IsNotExist(err) { return nil, errs.Wrap(err) @@ -51,7 +51,7 @@ func (l *Local) Create(ctx context.Context, path string) (WriteHandle, error) { if err != nil { return nil, errs.Wrap(err) } - return newOSWriteHandle(fh), nil + return newOSMultiWriteHandle(fh), nil } // Move moves file to provided path. diff --git a/cmd/uplinkng/ulfs/mixed.go b/cmd/uplinkng/ulfs/mixed.go index ec0a3ca07..65bef0591 100644 --- a/cmd/uplinkng/ulfs/mixed.go +++ b/cmd/uplinkng/ulfs/mixed.go @@ -42,13 +42,13 @@ func (m *Mixed) Open(ctx clingy.Context, loc ulloc.Location) (MultiReadHandle, e } // Create returns a WriteHandle to either a local file, remote object, or stdout. -func (m *Mixed) Create(ctx clingy.Context, loc ulloc.Location) (WriteHandle, error) { +func (m *Mixed) Create(ctx clingy.Context, loc ulloc.Location) (MultiWriteHandle, error) { if bucket, key, ok := loc.RemoteParts(); ok { return m.remote.Create(ctx, bucket, key) } else if path, ok := loc.LocalParts(); ok { return m.local.Create(ctx, path) } - return newGenericWriteHandle(ctx.Stdout()), nil + return newStdMultiWriteHandle(ctx.Stdout()), nil } // Move moves either a local file or remote object. diff --git a/cmd/uplinkng/ulfs/remote.go b/cmd/uplinkng/ulfs/remote.go index 0fd40c4e2..3acc53f0a 100644 --- a/cmd/uplinkng/ulfs/remote.go +++ b/cmd/uplinkng/ulfs/remote.go @@ -45,13 +45,13 @@ func (r *Remote) Stat(ctx context.Context, bucket, key string) (*ObjectInfo, err return &stat, nil } -// Create returns a WriteHandle for the object identified by a given bucket and key. -func (r *Remote) Create(ctx context.Context, bucket, key string) (WriteHandle, error) { - fh, err := r.project.UploadObject(ctx, bucket, key, nil) +// Create returns a MultiWriteHandle for the object identified by a given bucket and key. 
+func (r *Remote) Create(ctx context.Context, bucket, key string) (MultiWriteHandle, error) { + info, err := r.project.BeginUpload(ctx, bucket, key, nil) if err != nil { - return nil, errs.Wrap(err) + return nil, err } - return newUplinkWriteHandle(fh), nil + return newUplinkMultiWriteHandle(r.project, bucket, info), nil } // Move moves object to provided key and bucket. diff --git a/cmd/uplinkng/ultest/filesystem.go b/cmd/uplinkng/ultest/filesystem.go index 9e83d8020..c32737992 100644 --- a/cmd/uplinkng/ultest/filesystem.go +++ b/cmd/uplinkng/ultest/filesystem.go @@ -69,7 +69,7 @@ func (tfs *testFilesystem) Pending() (files []File) { for _, h := range mh { files = append(files, File{ Loc: loc.String(), - Contents: h.buf.String(), + Contents: string(h.buf), }) } } @@ -109,12 +109,12 @@ func (tfs *testFilesystem) Open(ctx clingy.Context, loc ulloc.Location) (ulfs.Mu return newMultiReadHandle(mf.contents), nil } -func (tfs *testFilesystem) Create(ctx clingy.Context, loc ulloc.Location) (_ ulfs.WriteHandle, err error) { +func (tfs *testFilesystem) Create(ctx clingy.Context, loc ulloc.Location) (_ ulfs.MultiWriteHandle, err error) { tfs.mu.Lock() defer tfs.mu.Unlock() if loc.Std() { - return new(discardWriteHandle), nil + return ulfs.NewGenericMultiWriteHandle(new(discardWriteHandle)), nil } if bucket, _, ok := loc.RemoteParts(); ok { @@ -135,7 +135,6 @@ func (tfs *testFilesystem) Create(ctx clingy.Context, loc ulloc.Location) (_ ulf tfs.created++ wh := &memWriteHandle{ - buf: bytes.NewBuffer(nil), loc: loc, tfs: tfs, cre: tfs.created, @@ -145,7 +144,7 @@ func (tfs *testFilesystem) Create(ctx clingy.Context, loc ulloc.Location) (_ ulf tfs.pending[loc] = append(tfs.pending[loc], wh) } - return wh, nil + return ulfs.NewGenericMultiWriteHandle(wh), nil } func (tfs *testFilesystem) Move(ctx clingy.Context, source, dest ulloc.Location) error { @@ -291,15 +290,22 @@ func (tfs *testFilesystem) mkdir(ctx context.Context, dir string) error { // type memWriteHandle struct { - 
buf *bytes.Buffer + buf []byte loc ulloc.Location tfs *testFilesystem cre int64 done bool } -func (b *memWriteHandle) Write(p []byte) (int, error) { - return b.buf.Write(p) +func (b *memWriteHandle) WriteAt(p []byte, off int64) (int, error) { + if b.done { + return 0, errs.New("write to closed handle") + } + end := int64(len(p)) + off + if grow := end - int64(len(b.buf)); grow > 0 { + b.buf = append(b.buf, make([]byte, grow)...) + } + return copy(b.buf[off:], p), nil } func (b *memWriteHandle) Commit() error { @@ -315,9 +321,10 @@ func (b *memWriteHandle) Commit() error { } b.tfs.files[b.loc] = memFileData{ - contents: b.buf.String(), + contents: string(b.buf), created: b.cre, } + return nil } @@ -357,9 +364,9 @@ func (b *memWriteHandle) close() error { type discardWriteHandle struct{} -func (discardWriteHandle) Write(p []byte) (int, error) { return len(p), nil } -func (discardWriteHandle) Commit() error { return nil } -func (discardWriteHandle) Abort() error { return nil } +func (discardWriteHandle) WriteAt(p []byte, off int64) (int, error) { return len(p), nil } +func (discardWriteHandle) Commit() error { return nil } +func (discardWriteHandle) Abort() error { return nil } // // ulfs.ObjectIterator diff --git a/cmd/uplinkng/ultest/setup.go b/cmd/uplinkng/ultest/setup.go index fb716e5e3..a8aaafc5c 100644 --- a/cmd/uplinkng/ultest/setup.go +++ b/cmd/uplinkng/ultest/setup.go @@ -139,7 +139,11 @@ func WithFile(location string, contents ...string) ExecuteOption { tfs.ensureBucket(bucket) } - wh, err := tfs.Create(ctx, loc) + mwh, err := tfs.Create(ctx, loc) + require.NoError(t, err) + defer func() { _ = mwh.Abort(ctx) }() + + wh, err := mwh.NextPart(ctx, -1) require.NoError(t, err) defer func() { _ = wh.Abort() }() @@ -153,6 +157,7 @@ func WithFile(location string, contents ...string) ExecuteOption { } require.NoError(t, wh.Commit()) + require.NoError(t, mwh.Commit(ctx)) }} }