From f3c58174c4d3c592b8c683d0d05188f39b53ff4a Mon Sep 17 00:00:00 2001 From: Jeff Wendling Date: Thu, 8 Jun 2023 11:42:03 -0400 Subject: [PATCH] cmd/uplink: only use new code path for uploads downloads still need the old copy code because they aren't parallel in the same way uploads are. revert all the code that removed the parallel copy, only use the non-parallel copy for uploads, and add back the parallelism and chunk size flags and have them set the maximum concurrent pieces flags to values based on each other when only one is set for backwards compatibility. mostly reverts 54ef1c8ca23cac8c4ab803f6123ab2d06fb84d4c Change-Id: I8b5f62bf18a6548fa60865c6c61b5f34fbcec14c --- cmd/uplink/cmd_cp.go | 244 ++++++++++++++++++++++++--- cmd/uplink/cmd_cp_test.go | 48 ++++++ cmd/uplink/ulfs/filesystem.go | 18 +- cmd/uplink/ulfs/handle_file.go | 29 +--- cmd/uplink/ulfs/handle_generic.go | 158 +++++++++++++++++ cmd/uplink/ulfs/handle_std.go | 144 +++++++++++++++- cmd/uplink/ulfs/handle_std_test.go | 101 +++++++++++ cmd/uplink/ulfs/handle_uplink.go | 127 +++++++++++++- cmd/uplink/ulfs/local.go | 6 +- cmd/uplink/ulfs/local_backend_mem.go | 10 +- cmd/uplink/ulfs/mixed.go | 4 +- cmd/uplink/ulfs/remote.go | 23 +-- cmd/uplink/ultest/filesystem.go | 13 +- cmd/uplink/ultest/setup.go | 7 +- go.mod | 2 +- go.sum | 4 +- 16 files changed, 845 insertions(+), 93 deletions(-) create mode 100644 cmd/uplink/ulfs/handle_std_test.go diff --git a/cmd/uplink/cmd_cp.go b/cmd/uplink/cmd_cp.go index f13ebbbb1..38d9a57fb 100644 --- a/cmd/uplink/cmd_cp.go +++ b/cmd/uplink/cmd_cp.go @@ -6,6 +6,7 @@ package main import ( "bufio" "context" + "errors" "fmt" "io" "os" @@ -20,6 +21,7 @@ import ( "github.com/zeebo/clingy" "github.com/zeebo/errs" + "storj.io/common/context2" "storj.io/common/fpath" "storj.io/common/memory" "storj.io/common/rpc/rpcpool" @@ -53,6 +55,8 @@ type cmdCp struct { locs []ulloc.Location } +const maxPartCount int64 = 10000 + func newCmdCp(ex ulext.External) *cmdCp { return &cmdCp{ex: ex} } @@ -81,8 +85,8 @@ func (c *cmdCp) Setup(params clingy.Parameters) { ).(bool) c.byteRange = params.Flag("range", "Downloads the specified range bytes of an object. For more information about the HTTP Range header, see https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35", "").(string) - c.parallelism = params.Flag("parallelism", "Deprecated", 1, - clingy.Hidden, + parallelism := params.Flag("parallelism", "Controls how many parallel chunks to upload/download from a file", nil, + clingy.Optional, clingy.Short('p'), clingy.Transform(strconv.Atoi), clingy.Transform(func(n int) (int, error) { @@ -91,9 +95,8 @@ func (c *cmdCp) Setup(params clingy.Parameters) { } return n, nil }), - ).(int) - c.parallelismChunkSize = params.Flag("parallelism-chunk-size", "Deprecated", memory.Size(0), - clingy.Hidden, + ).(*int) + c.parallelismChunkSize = params.Flag("parallelism-chunk-size", "Set the size of the chunks for parallelism, 0 means automatic adjustment", memory.Size(0), clingy.Transform(memory.ParseString), clingy.Transform(func(n int64) (memory.Size, error) { if n < 0 { @@ -104,13 +107,14 @@ func (c *cmdCp) Setup(params clingy.Parameters) { ).(memory.Size) c.uploadConfig = testuplink.DefaultConcurrentSegmentUploadsConfig() - c.uploadConfig.SchedulerOptions.MaximumConcurrent = params.Flag( + maxConcurrent := params.Flag( "maximum-concurrent-pieces", "Maximum concurrent pieces to upload at once per transfer", - c.uploadConfig.SchedulerOptions.MaximumConcurrent, + nil, + clingy.Optional, clingy.Transform(strconv.Atoi), clingy.Advanced, - ).(int) + ).(*int) c.uploadConfig.SchedulerOptions.MaximumConcurrentHandles = params.Flag( "maximum-concurrent-segments", "Maximum concurrent segments to upload at once per transfer", @@ -129,6 +133,28 @@ func (c *cmdCp) Setup(params clingy.Parameters) { clingy.Advanced, ).(string) + { // handle backwards compatibility around parallelism and maximum concurrent pieces + addr := func(x int) *int { return &x } + + switch { + // if neither are actively set, use defaults + case parallelism == nil && maxConcurrent == nil: + parallelism = addr(1) + maxConcurrent = addr(c.uploadConfig.SchedulerOptions.MaximumConcurrent) + + // if parallelism is not set, use a value based on maxConcurrent + case parallelism == nil: + parallelism = addr((*maxConcurrent + 99) / 100) + + // if maxConcurrent is not set, use a value based on parallelism + case maxConcurrent == nil: + maxConcurrent = addr(100 * *parallelism) + } + + c.uploadConfig.SchedulerOptions.MaximumConcurrent = *maxConcurrent + c.parallelism = *parallelism + } + c.inmemoryEC = params.Flag("inmemory-erasure-coding", "Keep erasure-coded pieces in-memory instead of writing them on the disk during upload", false, clingy.Transform(strconv.ParseBool), clingy.Boolean, @@ -384,24 +410,57 @@ func (c *cmdCp) copyFile(ctx context.Context, fs ulfs.Filesystem, source, dest u } defer func() { _ = mrh.Close() }() - wh, err := fs.Create(ctx, dest, &ulfs.CreateOptions{ + mwh, err := fs.Create(ctx, dest, &ulfs.CreateOptions{ Expires: c.expires, Metadata: c.metadata, }) if err != nil { return err } - defer func() { _ = wh.Abort() }() + defer func() { _ = mwh.Abort(ctx) }() - return errs.Wrap(c.copy( + // if we're uploading, do a single part of maximum size + if dest.Remote() { + return errs.Wrap(c.singleCopy( + ctx, + source, dest, + mrh, mwh, + offset, length, + bar, + )) + } + + partSize, err := c.calculatePartSize(mrh.Length(), c.parallelismChunkSize.Int64()) + if err != nil { + return err + } + + return errs.Wrap(c.parallelCopy( ctx, source, dest, - wh, mrh, + mrh, mwh, + c.parallelism, partSize, offset, length, bar, )) } +// calculatePartSize returns the needed part size in order to upload the file with size of 'length'. +// It hereby respects if the client requests/prefers a certain size and only increases if needed. +func (c *cmdCp) calculatePartSize(length, preferredSize int64) (requiredSize int64, err error) { + segC := (length / maxPartCount / (memory.MiB * 64).Int64()) + 1 + requiredSize = segC * (memory.MiB * 64).Int64() + switch { + case preferredSize == 0: + return requiredSize, nil + case requiredSize <= preferredSize: + return preferredSize, nil + default: + return 0, errs.New(fmt.Sprintf("the specified chunk size %s is too small, requires %s or larger", + memory.FormatBytes(preferredSize), memory.FormatBytes(requiredSize))) + } +} + func copyVerbing(source, dest ulloc.Location) (verb string) { return copyVerb(source, dest) + "ing" } @@ -429,11 +488,142 @@ func joinDestWith(dest ulloc.Location, suffix string) ulloc.Location { return dest } -func (c *cmdCp) copy( +func (c *cmdCp) parallelCopy( ctx context.Context, source, dest ulloc.Location, - dst ulfs.WriteHandle, src ulfs.MultiReadHandle, + dst ulfs.MultiWriteHandle, + p int, chunkSize int64, + offset, length int64, + bar *mpb.Bar) error { + + if offset != 0 { + if err := src.SetOffset(offset); err != nil { + return err + } + } + + var ( + limiter = sync2.NewLimiter(p) + es errs.Group + mu sync.Mutex + ) + + ctx, cancel := context.WithCancel(ctx) + + defer func() { _ = src.Close() }() + defer func() { + nocancel := context2.WithoutCancellation(ctx) + timedctx, cancel := context.WithTimeout(nocancel, 5*time.Second) + defer cancel() + _ = dst.Abort(timedctx) + }() + defer cancel() + + addError := func(err error) { + if err == nil { + return + } + + mu.Lock() + defer mu.Unlock() + + es.Add(err) + + // abort all other concurrenty copies + cancel() + } + + var readBufs *ulfs.BytesPool + if p > 1 && chunkSize > 0 && (source.Std() || dest.Std()) { + // Create the read buffer pool only for uploads from stdin and downloads to stdout with parallelism > 1. + readBufs = ulfs.NewBytesPool(int(chunkSize)) + } + + for i := 0; length != 0; i++ { + i := i + + chunk := chunkSize + if length > 0 && chunkSize > length { + chunk = length + } + length -= chunk + + rh, err := src.NextPart(ctx, chunk) + if err != nil { + if !errors.Is(err, io.EOF) { + addError(errs.New("error getting reader for part %d: %v", i, err)) + } + break + } + + wh, err := dst.NextPart(ctx, chunk) + if err != nil { + _ = rh.Close() + + addError(errs.New("error getting writer for part %d: %v", i, err)) + break + } + + ok := limiter.Go(ctx, func() { + defer func() { _ = rh.Close() }() + defer func() { _ = wh.Abort() }() + + if readBufs != nil { + buf := readBufs.Get() + defer readBufs.Put(buf) + + rh = ulfs.NewBufferedReadHandle(ctx, rh, buf) + } + + var w io.Writer = wh + if bar != nil { + bar.SetTotal(rh.Info().ContentLength, false) + bar.EnableTriggerComplete() + pw := bar.ProxyWriter(w) + defer func() { + _ = pw.Close() + }() + w = pw + } + + _, err := sync2.Copy(ctx, w, rh) + if err == nil { + err = wh.Commit() + } + + if err != nil { + // TODO: it would be also nice to use wh.Abort and rh.Close directly + // to avoid some of the waiting that's caused by sync2.Copy. + // + // However, some of the source / destination implementations don't seem + // to have concurrent safe API with that regards. + // + // Also, we may want to check that it actually helps, before implementing it. + + addError(errs.New("failed to %s part %d: %v", copyVerb(source, dest), i, err)) + } + }) + if !ok { + break + } + } + + limiter.Wait() + + // don't try to commit if any error occur + if len(es) == 0 { + es.Add(dst.Commit(ctx)) + } + + return errs.Wrap(combineErrs(es)) +} + +func (c *cmdCp) singleCopy( + ctx context.Context, + source, dest ulloc.Location, + src ulfs.MultiReadHandle, + dst ulfs.MultiWriteHandle, offset, length int64, bar *mpb.Bar) error { @@ -446,16 +636,19 @@ func (c *cmdCp) copy( ctx, cancel := context.WithCancel(ctx) defer cancel() - defer func() { _ = src.Close() }() - defer func() { _ = dst.Abort() }() - rh, err := src.NextPart(ctx, length) if err != nil { return errs.Wrap(err) } defer func() { _ = rh.Close() }() - var w io.Writer = dst + wh, err := dst.NextPart(ctx, length) + if err != nil { + return errs.Wrap(err) + } + defer func() { _ = wh.Abort() }() + + var w io.Writer = wh if bar != nil { bar.SetTotal(rh.Info().ContentLength, false) bar.EnableTriggerComplete() @@ -464,12 +657,19 @@ func (c *cmdCp) copy( w = pw } - _, err = sync2.Copy(ctx, w, rh) - if err == nil { - err = dst.Commit() + if _, err := sync2.Copy(ctx, w, rh); err != nil { + return errs.Wrap(err) } - return errs.Wrap(err) + if err := wh.Commit(); err != nil { + return errs.Wrap(err) + } + + if err := dst.Commit(ctx); err != nil { + return errs.Wrap(err) + } + + return nil } func newProgressBar(progress *mpb.Progress, name string, which, total int) *mpb.Bar { diff --git a/cmd/uplink/cmd_cp_test.go b/cmd/uplink/cmd_cp_test.go index 881e428a6..8dc4a446b 100644 --- a/cmd/uplink/cmd_cp_test.go +++ b/cmd/uplink/cmd_cp_test.go @@ -6,6 +6,9 @@ package main import ( "testing" + "github.com/stretchr/testify/require" + + "storj.io/common/memory" "storj.io/storj/cmd/uplink/ultest" ) @@ -93,6 +96,51 @@ func TestCpDownload(t *testing.T) { }) } +func TestCpPartSize(t *testing.T) { + c := newCmdCp(nil) + + // 1GiB file, should return 64MiB + partSize, err := c.calculatePartSize(memory.GiB.Int64(), c.parallelismChunkSize.Int64()) + require.NoError(t, err) + require.EqualValues(t, memory.MiB*64, partSize) + + // 640 GB file, should return 64MiB. + partSize, err = c.calculatePartSize(memory.GB.Int64()*640, c.parallelismChunkSize.Int64()) + require.NoError(t, err) + require.EqualValues(t, memory.MiB*64, partSize) + + // 640GiB file, should return 128MiB. + partSize, err = c.calculatePartSize(memory.GiB.Int64()*640, c.parallelismChunkSize.Int64()) + require.NoError(t, err) + require.EqualValues(t, memory.MiB*128, partSize) + + // 1TiB file, should return 128MiB. + partSize, err = c.calculatePartSize(memory.TiB.Int64(), c.parallelismChunkSize.Int64()) + require.NoError(t, err) + require.EqualValues(t, memory.MiB*128, partSize) + + // 1.3TiB file, should return 192MiB. + partSize, err = c.calculatePartSize(memory.GiB.Int64()*1300, c.parallelismChunkSize.Int64()) + require.NoError(t, err) + require.EqualValues(t, memory.MiB*192, partSize) + + // should return 1GiB as requested. + partSize, err = c.calculatePartSize(memory.GiB.Int64()*1300, memory.GiB.Int64()) + require.NoError(t, err) + require.EqualValues(t, memory.GiB, partSize) + + // should return 192 MiB and error, since preferred is too low. + partSize, err = c.calculatePartSize(memory.GiB.Int64()*1300, memory.MiB.Int64()) + require.Error(t, err) + require.Equal(t, "the specified chunk size 1.0 MiB is too small, requires 192.0 MiB or larger", err.Error()) + require.Zero(t, partSize) + + // negative length should return 64MiB part size + partSize, err = c.calculatePartSize(-1, c.parallelismChunkSize.Int64()) + require.NoError(t, err) + require.EqualValues(t, memory.MiB*64, partSize) +} + func TestCpUpload(t *testing.T) { state := ultest.Setup(commands, ultest.WithFile("/home/user/file1.txt", "local"), diff --git a/cmd/uplink/ulfs/filesystem.go b/cmd/uplink/ulfs/filesystem.go index e798f5e44..88796c3d0 100644 --- a/cmd/uplink/ulfs/filesystem.go +++ b/cmd/uplink/ulfs/filesystem.go @@ -39,7 +39,7 @@ func (ro *RemoveOptions) isPending() bool { return ro != nil && ro.Pending } type Filesystem interface { Close() error Open(ctx context.Context, loc ulloc.Location) (MultiReadHandle, error) - Create(ctx context.Context, loc ulloc.Location, opts *CreateOptions) (WriteHandle, error) + Create(ctx context.Context, loc ulloc.Location, opts *CreateOptions) (MultiWriteHandle, error) Move(ctx context.Context, source, dest ulloc.Location) error Copy(ctx context.Context, source, dest ulloc.Location) error Remove(ctx context.Context, loc ulloc.Location, opts *RemoveOptions) error @@ -52,7 +52,7 @@ type Filesystem interface { type FilesystemLocal interface { IsLocalDir(ctx context.Context, path string) bool Open(ctx context.Context, path string) (MultiReadHandle, error) - Create(ctx context.Context, path string) (WriteHandle, error) + Create(ctx context.Context, path string) (MultiWriteHandle, error) Move(ctx context.Context, oldpath string, newpath string) error Copy(ctx context.Context, oldpath string, newpath string) error Remove(ctx context.Context, path string, opts *RemoveOptions) error @@ -64,7 +64,7 @@ type FilesystemLocal interface { type FilesystemRemote interface { Close() error Open(ctx context.Context, bucket, key string) (MultiReadHandle, error) - Create(ctx context.Context, bucket, key string, opts *CreateOptions) (WriteHandle, error) + Create(ctx context.Context, bucket, key string, opts *CreateOptions) (MultiWriteHandle, error) Move(ctx context.Context, oldbucket, oldkey string, newbucket, newkey string) error Copy(ctx context.Context, oldbucket, oldkey string, newbucket, newkey string) error Remove(ctx context.Context, bucket, key string, opts *RemoveOptions) error @@ -142,6 +142,18 @@ type ReadHandle interface { // write handles // +// MultiWriteHandle lets one create multiple sequential WriteHandles for +// different sections of something. +// +// The returned WriteHandle will error if data is attempted to be written +// past the provided length. A negative length implies an unknown amount +// of data, and future calls to NextPart will error. +type MultiWriteHandle interface { + NextPart(ctx context.Context, length int64) (WriteHandle, error) + Commit(ctx context.Context) error + Abort(ctx context.Context) error +} + // WriteHandle is anything that can be written to with commit/abort semantics. type WriteHandle interface { io.Writer diff --git a/cmd/uplink/ulfs/handle_file.go b/cmd/uplink/ulfs/handle_file.go index d10d6e941..555a5b6c1 100644 --- a/cmd/uplink/ulfs/handle_file.go +++ b/cmd/uplink/ulfs/handle_file.go @@ -32,37 +32,22 @@ func newOSMultiReadHandle(fh LocalBackendFile) (MultiReadHandle, error) { // type fileGenericWriter struct { - fs LocalBackend - raw LocalBackendFile - done bool -} - -func (f *fileGenericWriter) Write(b []byte) (int, error) { return f.raw.Write(b) } - -func (f *fileGenericWriter) Commit() error { - if f.done { - return errs.New("already commit/aborted") - } - f.done = true - - return f.raw.Close() + fs LocalBackend + raw LocalBackendFile } +func (f *fileGenericWriter) WriteAt(b []byte, off int64) (int, error) { return f.raw.WriteAt(b, off) } +func (f *fileGenericWriter) Commit() error { return f.raw.Close() } func (f *fileGenericWriter) Abort() error { - if f.done { - return errs.New("already commit/aborted") - } - f.done = true - return errs.Combine( f.raw.Close(), f.fs.Remove(f.raw.Name()), ) } -func newOSWriteHandle(fs LocalBackend, fh LocalBackendFile) WriteHandle { - return &fileGenericWriter{ +func newOSMultiWriteHandle(fs LocalBackend, fh LocalBackendFile) MultiWriteHandle { + return NewGenericMultiWriteHandle(&fileGenericWriter{ fs: fs, raw: fh, - } + }) } diff --git a/cmd/uplink/ulfs/handle_generic.go b/cmd/uplink/ulfs/handle_generic.go index 5286da110..a85deb49c 100644 --- a/cmd/uplink/ulfs/handle_generic.go +++ b/cmd/uplink/ulfs/handle_generic.go @@ -133,3 +133,161 @@ func (o *genericReadHandle) Read(p []byte) (int, error) { o.len -= int64(n) return n, err } + +// +// write handles +// + +// GenericWriter is an interface that can be turned into a GenericMultiWriteHandle. +type GenericWriter interface { + io.WriterAt + Commit() error + Abort() error +} + +// GenericMultiWriteHandle implements MultiWriteHandle for *os.Files. +type GenericMultiWriteHandle struct { + w GenericWriter + + mu sync.Mutex + off int64 + tail bool + done bool + abort bool +} + +// NewGenericMultiWriteHandle constructs an *GenericMultiWriteHandle from a GenericWriter. +func NewGenericMultiWriteHandle(w GenericWriter) *GenericMultiWriteHandle { + return &GenericMultiWriteHandle{ + w: w, + } +} + +func (o *GenericMultiWriteHandle) childAbort() { + o.mu.Lock() + defer o.mu.Unlock() + + if !o.done { + o.abort = true + } +} + +func (o *GenericMultiWriteHandle) status() (done, abort bool) { + o.mu.Lock() + defer o.mu.Unlock() + + return o.done, o.abort +} + +// NextPart returns a WriteHandle expecting length bytes to be written to it. +func (o *GenericMultiWriteHandle) NextPart(ctx context.Context, length int64) (WriteHandle, error) { + o.mu.Lock() + defer o.mu.Unlock() + + if o.done { + return nil, errs.New("already closed") + } else if o.tail { + return nil, errs.New("unable to make part after tail part") + } + + w := &genericWriteHandle{ + parent: o, + w: o.w, + off: o.off, + tail: length < 0, + len: length, + } + + if w.tail { + o.tail = true + } else { + o.off += length + } + + return w, nil +} + +// Commit commits the overall GenericMultiWriteHandle. It errors if +// any parts were aborted. +func (o *GenericMultiWriteHandle) Commit(ctx context.Context) error { + o.mu.Lock() + defer o.mu.Unlock() + + if o.done { + return nil + } + o.done = true + + if o.abort { + return errs.Combine( + errs.New("commit failed: not every child was committed"), + o.w.Abort(), + ) + } + + return o.w.Commit() +} + +// Abort aborts the overall GenericMultiWriteHandle. +func (o *GenericMultiWriteHandle) Abort(ctx context.Context) error { + o.mu.Lock() + defer o.mu.Unlock() + + if o.done { + return nil + } + o.done = true + o.abort = true + + return o.w.Abort() +} + +type genericWriteHandle struct { + parent *GenericMultiWriteHandle + w GenericWriter + done bool + off int64 + tail bool + len int64 +} + +func (o *genericWriteHandle) Write(p []byte) (int, error) { + if !o.tail { + if o.len <= 0 { + return 0, errs.New("write past maximum length") + } else if o.len < int64(len(p)) { + p = p[:o.len] + } + } + n, err := o.w.WriteAt(p, o.off) + o.off += int64(n) + if !o.tail { + o.len -= int64(n) + } + return n, err +} + +func (o *genericWriteHandle) Commit() error { + if o.done { + return nil + } + o.done = true + + done, abort := o.parent.status() + if abort { + return errs.New("commit failed: parent write handle aborted") + } else if done { + return errs.New("commit failed: parent write handle done") + } + return nil +} + +func (o *genericWriteHandle) Abort() error { + if o.done { + return nil + } + o.done = true + + o.parent.childAbort() + return nil +} diff --git a/cmd/uplink/ulfs/handle_std.go b/cmd/uplink/ulfs/handle_std.go index 26b260d0f..759059115 100644 --- a/cmd/uplink/ulfs/handle_std.go +++ b/cmd/uplink/ulfs/handle_std.go @@ -140,17 +140,143 @@ func (o *stdReadHandle) Read(p []byte) (int, error) { // write handles // -// stdWriteHandle implements WriteHandle for stdouts. -type stdWriteHandle struct { - stdout io.Writer +// stdMultiWriteHandle implements MultiWriteHandle for stdouts. +type stdMultiWriteHandle struct { + stdout closableWriter + + mu sync.Mutex + next *sync.Mutex + tail bool + done bool } -func newStdWriteHandle(stdout io.Writer) *stdWriteHandle { - return &stdWriteHandle{ - stdout: stdout, +func newStdMultiWriteHandle(stdout io.Writer) *stdMultiWriteHandle { + return &stdMultiWriteHandle{ + stdout: closableWriter{Writer: stdout}, + next: new(sync.Mutex), } } -func (s *stdWriteHandle) Write(b []byte) (int, error) { return s.stdout.Write(b) } -func (s *stdWriteHandle) Commit() error { return nil } -func (s *stdWriteHandle) Abort() error { return nil } +func (s *stdMultiWriteHandle) NextPart(ctx context.Context, length int64) (WriteHandle, error) { + s.mu.Lock() + defer s.mu.Unlock() + + if s.done { + return nil, errs.New("already closed") + } else if s.tail { + return nil, errs.New("unable to make part after tail part") + } + + next := new(sync.Mutex) + next.Lock() + + w := &stdWriteHandle{ + stdout: &s.stdout, + mu: s.next, + next: next, + tail: length < 0, + len: length, + } + + s.tail = w.tail + s.next = next + + return w, nil +} + +func (s *stdMultiWriteHandle) Commit(ctx context.Context) error { + s.mu.Lock() + defer s.mu.Unlock() + + s.done = true + return nil +} + +func (s *stdMultiWriteHandle) Abort(ctx context.Context) error { + s.mu.Lock() + defer s.mu.Unlock() + + s.done = true + return nil +} + +// stdWriteHandle implements WriteHandle for stdouts. +type stdWriteHandle struct { + stdout *closableWriter + mu *sync.Mutex + next *sync.Mutex + tail bool + len int64 +} + +func (s *stdWriteHandle) unlockNext(err error) { + if s.next != nil { + if err != nil { + s.stdout.close(err) + } + s.next.Unlock() + s.next = nil + } +} + +func (s *stdWriteHandle) Write(p []byte) (int, error) { + s.mu.Lock() + defer s.mu.Unlock() + + if !s.tail { + if s.len <= 0 { + return 0, errs.New("write past maximum length") + } else if s.len < int64(len(p)) { + p = p[:s.len] + } + } + + n, err := s.stdout.Write(p) + + if !s.tail { + s.len -= int64(n) + if s.len == 0 { + s.unlockNext(err) + } + } + + return n, err +} + +func (s *stdWriteHandle) Commit() error { + s.mu.Lock() + defer s.mu.Unlock() + + s.len = 0 + s.unlockNext(nil) + + return nil +} + +func (s *stdWriteHandle) Abort() error { + s.mu.Lock() + defer s.mu.Unlock() + + s.len = 0 + s.unlockNext(context.Canceled) + + return nil +} + +type closableWriter struct { + io.Writer + err error +} + +func (out *closableWriter) Write(p []byte) (int, error) { + if out.err != nil { + return 0, out.err + } + n, err := out.Writer.Write(p) + out.err = err + return n, err +} + +func (out *closableWriter) close(err error) { + out.err = err +} diff --git a/cmd/uplink/ulfs/handle_std_test.go b/cmd/uplink/ulfs/handle_std_test.go new file mode 100644 index 000000000..3fbfc5b8c --- /dev/null +++ b/cmd/uplink/ulfs/handle_std_test.go @@ -0,0 +1,101 @@ +// Copyright (C) 2022 Storj Labs, Inc. +// See LICENSE for copying information. + +package ulfs + +import ( + "bytes" + "errors" + "sync/atomic" + "testing" + + "github.com/stretchr/testify/require" + + "storj.io/common/testcontext" + "storj.io/common/testrand" +) + +type writeThrottle struct { + entered chan struct{} + release chan error +} + +type throttledWriter struct { + writex int64 + write []writeThrottle + data bytes.Buffer +} + +func newThrottledWriter(maxWrites int) *throttledWriter { + tw := &throttledWriter{ + writex: 0, + write: make([]writeThrottle, maxWrites), + } + for i := range tw.write { + tw.write[i] = writeThrottle{ + entered: make(chan struct{}), + release: make(chan error, 1), + } + } + return tw +} + +func (tw *throttledWriter) Write(data []byte) (n int, _ error) { + index := atomic.AddInt64(&tw.writex, 1) - 1 + + close(tw.write[index].entered) + forceErr := <-tw.write[index].release + + n, writeErr := tw.data.Write(data) + if writeErr != nil { + return n, writeErr + } + + return n, forceErr +} + +func TestStdMultiWriteAbort(t *testing.T) { + ctx := testcontext.New(t) + + stdout := newThrottledWriter(2) + multi := newStdMultiWriteHandle(stdout) + + head := testrand.Bytes(256) + tail := testrand.Bytes(256) + + part1, err := multi.NextPart(ctx, 256) + require.NoError(t, err) + ctx.Go(func() error { + defer func() { _ = part1.Abort() }() + + _, err := part1.Write(head) + if err == nil { + return errors.New("expected an error") + } + return nil + }) + + part2, err := multi.NextPart(ctx, 256) + require.NoError(t, err) + ctx.Go(func() error { + defer func() { _ = part2.Commit() }() + + // wait for the above part to enter write first + <-stdout.write[0].entered + _, err := part2.Write(tail) + if err == nil { + return errors.New("expected an error") + } + return nil + }) + + // wait until we enter both writes + <-stdout.write[0].entered + + stdout.write[0].release <- errors.New("fail 0") + stdout.write[1].release <- nil + + ctx.Wait() + + require.Equal(t, head, stdout.data.Bytes()) +} diff --git a/cmd/uplink/ulfs/handle_uplink.go b/cmd/uplink/ulfs/handle_uplink.go index a68314b72..328c4e64d 100644 --- a/cmd/uplink/ulfs/handle_uplink.go +++ b/cmd/uplink/ulfs/handle_uplink.go @@ -166,16 +166,127 @@ func (u *uplinkReadHandle) Info() ObjectInfo { return *u.info } // write handles // -type uplinkWriteHandle struct { - upload *uplink.Upload +type uplinkMultiWriteHandle struct { + project *uplink.Project + bucket string + info uplink.UploadInfo + metadata uplink.CustomMetadata + + mu sync.Mutex + tail bool + part uint32 + commitErr *error + abortErr *error } -func newUplinkWriteHandle(upload *uplink.Upload) *uplinkWriteHandle { - return &uplinkWriteHandle{ - upload: upload, +func newUplinkMultiWriteHandle(project *uplink.Project, bucket string, info uplink.UploadInfo, metadata uplink.CustomMetadata) *uplinkMultiWriteHandle { + return &uplinkMultiWriteHandle{ + project: project, + bucket: bucket, + info: info, + metadata: metadata, } } -func (u *uplinkWriteHandle) Write(b []byte) (int, error) { return u.upload.Write(b) } -func (u *uplinkWriteHandle) Commit() error { return u.upload.Commit() } -func (u *uplinkWriteHandle) Abort() error { return u.upload.Abort() } +func (u *uplinkMultiWriteHandle) NextPart(ctx context.Context, length int64) (WriteHandle, error) { + part, err := func() (uint32, error) { + u.mu.Lock() + defer u.mu.Unlock() + + switch { + case u.abortErr != nil: + return 0, errs.New("cannot make part after multipart write has been aborted") + case u.commitErr != nil: + return 0, errs.New("cannot make part after multipart write has been committed") + } + + if u.tail { + return 0, errs.New("unable to make part after tail part") + } + u.tail = length < 0 + + u.part++ + return u.part, nil + }() + if err != nil { + return nil, err + } + + ul, err := u.project.UploadPart(ctx, u.bucket, u.info.Key, u.info.UploadID, part) + if err != nil { + return nil, err + } + + return &uplinkWriteHandle{ + ul: ul, + tail: length < 0, + len: length, + }, nil +} + +func (u *uplinkMultiWriteHandle) Commit(ctx context.Context) error { + u.mu.Lock() + defer u.mu.Unlock() + + switch { + case u.abortErr != nil: + return errs.New("cannot commit an aborted multipart write") + case u.commitErr != nil: + return *u.commitErr + } + + _, err := u.project.CommitUpload(ctx, u.bucket, u.info.Key, u.info.UploadID, &uplink.CommitUploadOptions{ + CustomMetadata: u.metadata, + }) + u.commitErr = &err + return err +} + +func (u *uplinkMultiWriteHandle) Abort(ctx context.Context) error { + u.mu.Lock() + defer u.mu.Unlock() + + switch { + case u.abortErr != nil: + return *u.abortErr + case u.commitErr != nil: + return errs.New("cannot abort a committed multipart write") + } + + err := u.project.AbortUpload(ctx, u.bucket, u.info.Key, u.info.UploadID) + u.abortErr = &err + return err +} + +// uplinkWriteHandle implements writeHandle for *uplink.Uploads. +type uplinkWriteHandle struct { + ul *uplink.PartUpload + tail bool + len int64 +} + +func (u *uplinkWriteHandle) Write(p []byte) (int, error) { + if !u.tail { + if u.len <= 0 { + return 0, errs.New("write past maximum length") + } else if u.len < int64(len(p)) { + p = p[:u.len] + } + } + + n, err := u.ul.Write(p) + + if !u.tail { + u.len -= int64(n) + } + + return n, err +} + +func (u *uplinkWriteHandle) Commit() error { + return u.ul.Commit() +} + +func (u *uplinkWriteHandle) Abort() error { + return u.ul.Abort() +} diff --git a/cmd/uplink/ulfs/local.go b/cmd/uplink/ulfs/local.go index 9720b9608..d8c2fcee4 100644 --- a/cmd/uplink/ulfs/local.go +++ b/cmd/uplink/ulfs/local.go @@ -19,8 +19,8 @@ import ( // LocalBackendFile represents a file in the filesystem. type LocalBackendFile interface { io.Closer - io.Writer io.ReaderAt + io.WriterAt Name() string Stat() (os.FileInfo, error) Readdir(int) ([]os.FileInfo, error) @@ -58,7 +58,7 @@ func (l *Local) Open(ctx context.Context, path string) (MultiReadHandle, error) } // Create makes any directories necessary to create a file at path and returns a WriteHandle. -func (l *Local) Create(ctx context.Context, path string) (WriteHandle, error) { +func (l *Local) Create(ctx context.Context, path string) (MultiWriteHandle, error) { fi, err := l.fs.Stat(path) if err != nil && !os.IsNotExist(err) { return nil, errs.Wrap(err) @@ -75,7 +75,7 @@ func (l *Local) Create(ctx context.Context, path string) (WriteHandle, error) { if err != nil { return nil, errs.Wrap(err) } - return newOSWriteHandle(l.fs, fh), nil + return newOSMultiWriteHandle(l.fs, fh), nil } // Move moves file to provided path. diff --git a/cmd/uplink/ulfs/local_backend_mem.go b/cmd/uplink/ulfs/local_backend_mem.go index a020619ae..9f7a24911 100644 --- a/cmd/uplink/ulfs/local_backend_mem.go +++ b/cmd/uplink/ulfs/local_backend_mem.go @@ -200,9 +200,11 @@ func (mf *memFile) ReadAt(p []byte, off int64) (int, error) { return copy(p, mf.buf[off:]), nil } -func (mf *memFile) Write(p []byte) (int, error) { - mf.buf = append(mf.buf, p...) - return len(p), nil +func (mf *memFile) WriteAt(p []byte, off int64) (int, error) { + if delta := (off + int64(len(p))) - int64(len(mf.buf)); delta > 0 { + mf.buf = append(mf.buf, make([]byte, delta)...) + } + return copy(mf.buf[off:], p), nil } func (mf *memFile) Stat() (os.FileInfo, error) { @@ -254,7 +256,7 @@ func (md *memDir) ReadAt(p []byte, off int64) (int, error) { return 0, errs.New("readat on directory") } -func (md *memDir) Write(p []byte) (int, error) { +func (md *memDir) WriteAt(p []byte, off int64) (int, error) { return 0, errs.New("writeat on directory") } diff --git a/cmd/uplink/ulfs/mixed.go b/cmd/uplink/ulfs/mixed.go index 9b77e9b17..e67c7ffa7 100644 --- a/cmd/uplink/ulfs/mixed.go +++ b/cmd/uplink/ulfs/mixed.go @@ -42,13 +42,13 @@ func (m *Mixed) Open(ctx context.Context, loc ulloc.Location) (MultiReadHandle, } // Create returns a WriteHandle to either a local file, remote object, or stdout. -func (m *Mixed) Create(ctx context.Context, loc ulloc.Location, opts *CreateOptions) (WriteHandle, error) { +func (m *Mixed) Create(ctx context.Context, loc ulloc.Location, opts *CreateOptions) (MultiWriteHandle, error) { if bucket, key, ok := loc.RemoteParts(); ok { return m.remote.Create(ctx, bucket, key, opts) } else if path, ok := loc.LocalParts(); ok { return m.local.Create(ctx, path) } - return newStdWriteHandle(clingy.Stdout(ctx)), nil + return newStdMultiWriteHandle(clingy.Stdout(ctx)), nil } // Move moves either a local file or remote object. diff --git a/cmd/uplink/ulfs/remote.go b/cmd/uplink/ulfs/remote.go index 1405f6d1d..5a32b37f3 100644 --- a/cmd/uplink/ulfs/remote.go +++ b/cmd/uplink/ulfs/remote.go @@ -46,22 +46,23 @@ func (r *Remote) Stat(ctx context.Context, bucket, key string) (*ObjectInfo, err } // Create returns a MultiWriteHandle for the object identified by a given bucket and key. -func (r *Remote) Create(ctx context.Context, bucket, key string, opts *CreateOptions) (WriteHandle, error) { - upload, err := r.project.UploadObject(ctx, bucket, key, &uplink.UploadOptions{ +func (r *Remote) Create(ctx context.Context, bucket, key string, opts *CreateOptions) (MultiWriteHandle, error) { + var customMetadata uplink.CustomMetadata + if opts.Metadata != nil { + customMetadata = uplink.CustomMetadata(opts.Metadata) + + if err := customMetadata.Verify(); err != nil { + return nil, err + } + } + + info, err := r.project.BeginUpload(ctx, bucket, key, &uplink.UploadOptions{ Expires: opts.Expires, }) if err != nil { return nil, err } - - if opts.Metadata != nil { - if err := upload.SetCustomMetadata(ctx, uplink.CustomMetadata(opts.Metadata)); err != nil { - _ = upload.Abort() - return nil, err - } - } - - return newUplinkWriteHandle(upload), nil + return newUplinkMultiWriteHandle(r.project, bucket, info, customMetadata), nil } // Move moves object to provided key and bucket. diff --git a/cmd/uplink/ultest/filesystem.go b/cmd/uplink/ultest/filesystem.go index 360b6b44f..03acdb6ee 100644 --- a/cmd/uplink/ultest/filesystem.go +++ b/cmd/uplink/ultest/filesystem.go @@ -112,7 +112,7 @@ func (rfs *remoteFilesystem) Open(ctx context.Context, bucket, key string) (ulfs return newMultiReadHandle(mf.contents), nil } -func (rfs *remoteFilesystem) Create(ctx context.Context, bucket, key string, opts *ulfs.CreateOptions) (_ ulfs.WriteHandle, err error) { +func (rfs *remoteFilesystem) Create(ctx context.Context, bucket, key string, opts *ulfs.CreateOptions) (_ ulfs.MultiWriteHandle, err error) { rfs.mu.Lock() defer rfs.mu.Unlock() @@ -140,7 +140,7 @@ func (rfs *remoteFilesystem) Create(ctx context.Context, bucket, key string, opt rfs.pending[loc] = append(rfs.pending[loc], wh) - return wh, nil + return ulfs.NewGenericMultiWriteHandle(wh), nil } func (rfs *remoteFilesystem) Move(ctx context.Context, oldbucket, oldkey string, newbucket, newkey string) error { @@ -282,12 +282,15 @@ type memWriteHandle struct { done bool } -func (b *memWriteHandle) Write(p []byte) (int, error) { +func (b *memWriteHandle) WriteAt(p []byte, off int64) (int, error) { if b.done { return 0, errs.New("write to closed handle") } - b.buf = append(b.buf, p...) - return len(p), nil + end := int64(len(p)) + off + if grow := end - int64(len(b.buf)); grow > 0 { + b.buf = append(b.buf, make([]byte, grow)...) + } + return copy(b.buf[off:], p), nil } func (b *memWriteHandle) Commit() error { diff --git a/cmd/uplink/ultest/setup.go b/cmd/uplink/ultest/setup.go index 376fa5788..6ba68caf2 100644 --- a/cmd/uplink/ultest/setup.go +++ b/cmd/uplink/ultest/setup.go @@ -199,7 +199,11 @@ func WithFile(location string, contents ...string) ExecuteOption { cs.rfs.ensureBucket(bucket) } - wh, err := cs.fs.Create(ctx, loc, nil) + mwh, err := cs.fs.Create(ctx, loc, nil) + require.NoError(t, err) + defer func() { _ = mwh.Abort(ctx) }() + + wh, err := mwh.NextPart(ctx, -1) require.NoError(t, err) defer func() { _ = wh.Abort() }() @@ -213,6 +217,7 @@ func WithFile(location string, contents ...string) ExecuteOption { } require.NoError(t, wh.Commit()) + require.NoError(t, mwh.Commit(ctx)) }} } diff --git a/go.mod b/go.mod index 08b8db2a2..3be8e16ba 100644 --- a/go.mod +++ b/go.mod @@ -43,7 +43,7 @@ require ( github.com/vivint/infectious v0.0.0-20200605153912-25a574ae18a3 github.com/zeebo/assert v1.3.1 github.com/zeebo/blake3 v0.2.3 - github.com/zeebo/clingy v0.0.0-20230301225531-f2d4117c8e8c + github.com/zeebo/clingy v0.0.0-20230602044025-906be850f10d github.com/zeebo/errs v1.3.0 github.com/zeebo/ini v0.0.0-20210514163846-cc8fbd8d9599 github.com/zyedidia/generic v1.2.1 diff --git a/go.sum b/go.sum index 0a8431280..e20f35d96 100644 --- a/go.sum +++ b/go.sum @@ -605,8 +605,8 @@ github.com/zeebo/assert v1.3.1 h1:vukIABvugfNMZMQO1ABsyQDJDTVQbn+LWSMy1ol1h6A= github.com/zeebo/assert v1.3.1/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0= github.com/zeebo/blake3 v0.2.3 h1:TFoLXsjeXqRNFxSbk35Dk4YtszE/MQQGK10BH4ptoTg= github.com/zeebo/blake3 v0.2.3/go.mod h1:mjJjZpnsyIVtVgTOSpJ9vmRE4wgDeyt2HU3qXvvKCaQ= -github.com/zeebo/clingy v0.0.0-20230301225531-f2d4117c8e8c h1:bE9vXPFKa9wkCCq1HJi2Ms4pWuBoIKQEMe6CZzu/TKE= -github.com/zeebo/clingy v0.0.0-20230301225531-f2d4117c8e8c/go.mod h1:MHEhXvEfewflU7SSVKHI7nkdU+fpyxZ5XPPzj+5gYNw= +github.com/zeebo/clingy v0.0.0-20230602044025-906be850f10d h1:INuKdI3R6zDA8UEgbBxDFkb7qwO/nSvnJRdrBCMW+To= +github.com/zeebo/clingy v0.0.0-20230602044025-906be850f10d/go.mod h1:MHEhXvEfewflU7SSVKHI7nkdU+fpyxZ5XPPzj+5gYNw= github.com/zeebo/errs v1.2.2/go.mod h1:sgbWHsvVuTPHcqJJGQ1WhI5KbWlHYz+2+2C/LSEtCw4= github.com/zeebo/errs v1.3.0 h1:hmiaKqgYZzcVgRL1Vkc1Mn2914BbzB0IBxs+ebeutGs= github.com/zeebo/errs v1.3.0/go.mod h1:sgbWHsvVuTPHcqJJGQ1WhI5KbWlHYz+2+2C/LSEtCw4=