diff --git a/cmd/uplink/cmd_cp.go b/cmd/uplink/cmd_cp.go index f13ebbbb1..38d9a57fb 100644 --- a/cmd/uplink/cmd_cp.go +++ b/cmd/uplink/cmd_cp.go @@ -6,6 +6,7 @@ package main import ( "bufio" "context" + "errors" "fmt" "io" "os" @@ -20,6 +21,7 @@ import ( "github.com/zeebo/clingy" "github.com/zeebo/errs" + "storj.io/common/context2" "storj.io/common/fpath" "storj.io/common/memory" "storj.io/common/rpc/rpcpool" @@ -53,6 +55,8 @@ type cmdCp struct { locs []ulloc.Location } +const maxPartCount int64 = 10000 + func newCmdCp(ex ulext.External) *cmdCp { return &cmdCp{ex: ex} } @@ -81,8 +85,8 @@ func (c *cmdCp) Setup(params clingy.Parameters) { ).(bool) c.byteRange = params.Flag("range", "Downloads the specified range bytes of an object. For more information about the HTTP Range header, see https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35", "").(string) - c.parallelism = params.Flag("parallelism", "Deprecated", 1, - clingy.Hidden, + parallelism := params.Flag("parallelism", "Controls how many parallel chunks to upload/download from a file", nil, + clingy.Optional, clingy.Short('p'), clingy.Transform(strconv.Atoi), clingy.Transform(func(n int) (int, error) { @@ -91,9 +95,8 @@ func (c *cmdCp) Setup(params clingy.Parameters) { } return n, nil }), - ).(int) - c.parallelismChunkSize = params.Flag("parallelism-chunk-size", "Deprecated", memory.Size(0), - clingy.Hidden, + ).(*int) + c.parallelismChunkSize = params.Flag("parallelism-chunk-size", "Set the size of the chunks for parallelism, 0 means automatic adjustment", memory.Size(0), clingy.Transform(memory.ParseString), clingy.Transform(func(n int64) (memory.Size, error) { if n < 0 { @@ -104,13 +107,14 @@ func (c *cmdCp) Setup(params clingy.Parameters) { ).(memory.Size) c.uploadConfig = testuplink.DefaultConcurrentSegmentUploadsConfig() - c.uploadConfig.SchedulerOptions.MaximumConcurrent = params.Flag( + maxConcurrent := params.Flag( "maximum-concurrent-pieces", "Maximum concurrent pieces to upload at once per transfer", - c.uploadConfig.SchedulerOptions.MaximumConcurrent, + nil, + clingy.Optional, clingy.Transform(strconv.Atoi), clingy.Advanced, - ).(int) + ).(*int) c.uploadConfig.SchedulerOptions.MaximumConcurrentHandles = params.Flag( "maximum-concurrent-segments", "Maximum concurrent segments to upload at once per transfer", @@ -129,6 +133,28 @@ func (c *cmdCp) Setup(params clingy.Parameters) { clingy.Advanced, ).(string) + { // handle backwards compatibility around parallelism and maximum concurrent pieces + addr := func(x int) *int { return &x } + + switch { + // if neither are actively set, use defaults + case parallelism == nil && maxConcurrent == nil: + parallelism = addr(1) + maxConcurrent = addr(c.uploadConfig.SchedulerOptions.MaximumConcurrent) + + // if parallelism is not set, use a value based on maxConcurrent + case parallelism == nil: + parallelism = addr((*maxConcurrent + 99) / 100) + + // if maxConcurrent is not set, use a value based on parallelism + case maxConcurrent == nil: + maxConcurrent = addr(100 * *parallelism) + } + + c.uploadConfig.SchedulerOptions.MaximumConcurrent = *maxConcurrent + c.parallelism = *parallelism + } + c.inmemoryEC = params.Flag("inmemory-erasure-coding", "Keep erasure-coded pieces in-memory instead of writing them on the disk during upload", false, clingy.Transform(strconv.ParseBool), clingy.Boolean, @@ -384,24 +410,57 @@ func (c *cmdCp) copyFile(ctx context.Context, fs ulfs.Filesystem, source, dest u } defer func() { _ = mrh.Close() }() - wh, err := fs.Create(ctx, dest, &ulfs.CreateOptions{ + mwh, err := fs.Create(ctx, dest, &ulfs.CreateOptions{ Expires: c.expires, Metadata: c.metadata, }) if err != nil { return err } - defer func() { _ = wh.Abort() }() + defer func() { _ = mwh.Abort(ctx) }() - return errs.Wrap(c.copy( + // if we're uploading, do a single part of maximum size + if dest.Remote() { + return errs.Wrap(c.singleCopy( + ctx, + source, dest, + mrh, mwh, + offset, length, + bar, + )) + } + + partSize, err := c.calculatePartSize(mrh.Length(), c.parallelismChunkSize.Int64()) + if err != nil { + return err + } + + return errs.Wrap(c.parallelCopy( ctx, source, dest, - wh, mrh, + mrh, mwh, + c.parallelism, partSize, offset, length, bar, )) } +// calculatePartSize returns the needed part size in order to upload the file with size of 'length'. +// It hereby respects if the client requests/prefers a certain size and only increases if needed. +func (c *cmdCp) calculatePartSize(length, preferredSize int64) (requiredSize int64, err error) { + segC := (length / maxPartCount / (memory.MiB * 64).Int64()) + 1 + requiredSize = segC * (memory.MiB * 64).Int64() + switch { + case preferredSize == 0: + return requiredSize, nil + case requiredSize <= preferredSize: + return preferredSize, nil + default: + return 0, errs.New(fmt.Sprintf("the specified chunk size %s is too small, requires %s or larger", + memory.FormatBytes(preferredSize), memory.FormatBytes(requiredSize))) + } +} + func copyVerbing(source, dest ulloc.Location) (verb string) { return copyVerb(source, dest) + "ing" } @@ -429,11 +488,142 @@ func joinDestWith(dest ulloc.Location, suffix string) ulloc.Location { return dest } -func (c *cmdCp) copy( +func (c *cmdCp) parallelCopy( ctx context.Context, source, dest ulloc.Location, - dst ulfs.WriteHandle, src ulfs.MultiReadHandle, + dst ulfs.MultiWriteHandle, + p int, chunkSize int64, + offset, length int64, + bar *mpb.Bar) error { + + if offset != 0 { + if err := src.SetOffset(offset); err != nil { + return err + } + } + + var ( + limiter = sync2.NewLimiter(p) + es errs.Group + mu sync.Mutex + ) + + ctx, cancel := context.WithCancel(ctx) + + defer func() { _ = src.Close() }() + defer func() { + nocancel := context2.WithoutCancellation(ctx) + timedctx, cancel := context.WithTimeout(nocancel, 5*time.Second) + defer cancel() + _ = dst.Abort(timedctx) + }() + defer cancel() + + addError := func(err error) { + if err == nil { + return + } + + mu.Lock() + defer mu.Unlock() + + es.Add(err) + + // abort all other concurrenty copies + cancel() + } + + var readBufs *ulfs.BytesPool + if p > 1 && chunkSize > 0 && (source.Std() || dest.Std()) { + // Create the read buffer pool only for uploads from stdin and downloads to stdout with parallelism > 1. + readBufs = ulfs.NewBytesPool(int(chunkSize)) + } + + for i := 0; length != 0; i++ { + i := i + + chunk := chunkSize + if length > 0 && chunkSize > length { + chunk = length + } + length -= chunk + + rh, err := src.NextPart(ctx, chunk) + if err != nil { + if !errors.Is(err, io.EOF) { + addError(errs.New("error getting reader for part %d: %v", i, err)) + } + break + } + + wh, err := dst.NextPart(ctx, chunk) + if err != nil { + _ = rh.Close() + + addError(errs.New("error getting writer for part %d: %v", i, err)) + break + } + + ok := limiter.Go(ctx, func() { + defer func() { _ = rh.Close() }() + defer func() { _ = wh.Abort() }() + + if readBufs != nil { + buf := readBufs.Get() + defer readBufs.Put(buf) + + rh = ulfs.NewBufferedReadHandle(ctx, rh, buf) + } + + var w io.Writer = wh + if bar != nil { + bar.SetTotal(rh.Info().ContentLength, false) + bar.EnableTriggerComplete() + pw := bar.ProxyWriter(w) + defer func() { + _ = pw.Close() + }() + w = pw + } + + _, err := sync2.Copy(ctx, w, rh) + if err == nil { + err = wh.Commit() + } + + if err != nil { + // TODO: it would be also nice to use wh.Abort and rh.Close directly + // to avoid some of the waiting that's caused by sync2.Copy. + // + // However, some of the source / destination implementations don't seem + // to have concurrent safe API with that regards. + // + // Also, we may want to check that it actually helps, before implementing it. + + addError(errs.New("failed to %s part %d: %v", copyVerb(source, dest), i, err)) + } + }) + if !ok { + break + } + } + + limiter.Wait() + + // don't try to commit if any error occur + if len(es) == 0 { + es.Add(dst.Commit(ctx)) + } + + return errs.Wrap(combineErrs(es)) +} + +func (c *cmdCp) singleCopy( + ctx context.Context, + source, dest ulloc.Location, + src ulfs.MultiReadHandle, + dst ulfs.MultiWriteHandle, offset, length int64, bar *mpb.Bar) error { @@ -446,16 +636,19 @@ func (c *cmdCp) copy( ctx, cancel := context.WithCancel(ctx) defer cancel() - defer func() { _ = src.Close() }() - defer func() { _ = dst.Abort() }() - rh, err := src.NextPart(ctx, length) if err != nil { return errs.Wrap(err) } defer func() { _ = rh.Close() }() - var w io.Writer = dst + wh, err := dst.NextPart(ctx, length) + if err != nil { + return errs.Wrap(err) + } + defer func() { _ = wh.Abort() }() + + var w io.Writer = wh if bar != nil { bar.SetTotal(rh.Info().ContentLength, false) bar.EnableTriggerComplete() @@ -464,12 +657,19 @@ func (c *cmdCp) copy( w = pw } - _, err = sync2.Copy(ctx, w, rh) - if err == nil { - err = dst.Commit() + if _, err := sync2.Copy(ctx, w, rh); err != nil { + return errs.Wrap(err) } - return errs.Wrap(err) + if err := wh.Commit(); err != nil { + return errs.Wrap(err) + } + + if err := dst.Commit(ctx); err != nil { + return errs.Wrap(err) + } + + return nil } func newProgressBar(progress *mpb.Progress, name string, which, total int) *mpb.Bar { diff --git a/cmd/uplink/cmd_cp_test.go b/cmd/uplink/cmd_cp_test.go index 881e428a6..8dc4a446b 100644 --- a/cmd/uplink/cmd_cp_test.go +++ b/cmd/uplink/cmd_cp_test.go @@ -6,6 +6,9 @@ package main import ( "testing" + "github.com/stretchr/testify/require" + + "storj.io/common/memory" "storj.io/storj/cmd/uplink/ultest" ) @@ -93,6 +96,51 @@ func TestCpDownload(t *testing.T) { }) } +func TestCpPartSize(t *testing.T) { + c := newCmdCp(nil) + + // 1GiB file, should return 64MiB + partSize, err := c.calculatePartSize(memory.GiB.Int64(), c.parallelismChunkSize.Int64()) + require.NoError(t, err) + require.EqualValues(t, memory.MiB*64, partSize) + + // 640 GB file, should return 64MiB. + partSize, err = c.calculatePartSize(memory.GB.Int64()*640, c.parallelismChunkSize.Int64()) + require.NoError(t, err) + require.EqualValues(t, memory.MiB*64, partSize) + + // 640GiB file, should return 128MiB. + partSize, err = c.calculatePartSize(memory.GiB.Int64()*640, c.parallelismChunkSize.Int64()) + require.NoError(t, err) + require.EqualValues(t, memory.MiB*128, partSize) + + // 1TiB file, should return 128MiB. + partSize, err = c.calculatePartSize(memory.TiB.Int64(), c.parallelismChunkSize.Int64()) + require.NoError(t, err) + require.EqualValues(t, memory.MiB*128, partSize) + + // 1.3TiB file, should return 192MiB. + partSize, err = c.calculatePartSize(memory.GiB.Int64()*1300, c.parallelismChunkSize.Int64()) + require.NoError(t, err) + require.EqualValues(t, memory.MiB*192, partSize) + + // should return 1GiB as requested. + partSize, err = c.calculatePartSize(memory.GiB.Int64()*1300, memory.GiB.Int64()) + require.NoError(t, err) + require.EqualValues(t, memory.GiB, partSize) + + // should return 192 MiB and error, since preferred is too low. + partSize, err = c.calculatePartSize(memory.GiB.Int64()*1300, memory.MiB.Int64()) + require.Error(t, err) + require.Equal(t, "the specified chunk size 1.0 MiB is too small, requires 192.0 MiB or larger", err.Error()) + require.Zero(t, partSize) + + // negative length should return 64MiB part size + partSize, err = c.calculatePartSize(-1, c.parallelismChunkSize.Int64()) + require.NoError(t, err) + require.EqualValues(t, memory.MiB*64, partSize) +} + func TestCpUpload(t *testing.T) { state := ultest.Setup(commands, ultest.WithFile("/home/user/file1.txt", "local"), diff --git a/cmd/uplink/ulfs/filesystem.go b/cmd/uplink/ulfs/filesystem.go index e798f5e44..88796c3d0 100644 --- a/cmd/uplink/ulfs/filesystem.go +++ b/cmd/uplink/ulfs/filesystem.go @@ -39,7 +39,7 @@ func (ro *RemoveOptions) isPending() bool { return ro != nil && ro.Pending } type Filesystem interface { Close() error Open(ctx context.Context, loc ulloc.Location) (MultiReadHandle, error) - Create(ctx context.Context, loc ulloc.Location, opts *CreateOptions) (WriteHandle, error) + Create(ctx context.Context, loc ulloc.Location, opts *CreateOptions) (MultiWriteHandle, error) Move(ctx context.Context, source, dest ulloc.Location) error Copy(ctx context.Context, source, dest ulloc.Location) error Remove(ctx context.Context, loc ulloc.Location, opts *RemoveOptions) error @@ -52,7 +52,7 @@ type Filesystem interface { type FilesystemLocal interface { IsLocalDir(ctx context.Context, path string) bool Open(ctx context.Context, path string) (MultiReadHandle, error) - Create(ctx context.Context, path string) (WriteHandle, error) + Create(ctx context.Context, path string) (MultiWriteHandle, error) Move(ctx context.Context, oldpath string, newpath string) error Copy(ctx context.Context, oldpath string, newpath string) error Remove(ctx context.Context, path string, opts *RemoveOptions) error @@ -64,7 +64,7 @@ type FilesystemLocal interface { type FilesystemRemote interface { Close() error Open(ctx context.Context, bucket, key string) (MultiReadHandle, error) - Create(ctx context.Context, bucket, key string, opts *CreateOptions) (WriteHandle, error) + Create(ctx context.Context, bucket, key string, opts *CreateOptions) (MultiWriteHandle, error) Move(ctx context.Context, oldbucket, oldkey string, newbucket, newkey string) error Copy(ctx context.Context, oldbucket, oldkey string, newbucket, newkey string) error Remove(ctx context.Context, bucket, key string, opts *RemoveOptions) error @@ -142,6 +142,18 @@ type ReadHandle interface { // write handles // +// MultiWriteHandle lets one create multiple sequential WriteHandles for +// different sections of something. +// +// The returned WriteHandle will error if data is attempted to be written +// past the provided length. A negative length implies an unknown amount +// of data, and future calls to NextPart will error. +type MultiWriteHandle interface { + NextPart(ctx context.Context, length int64) (WriteHandle, error) + Commit(ctx context.Context) error + Abort(ctx context.Context) error +} + // WriteHandle is anything that can be written to with commit/abort semantics. type WriteHandle interface { io.Writer diff --git a/cmd/uplink/ulfs/handle_file.go b/cmd/uplink/ulfs/handle_file.go index d10d6e941..555a5b6c1 100644 --- a/cmd/uplink/ulfs/handle_file.go +++ b/cmd/uplink/ulfs/handle_file.go @@ -32,37 +32,22 @@ func newOSMultiReadHandle(fh LocalBackendFile) (MultiReadHandle, error) { // type fileGenericWriter struct { - fs LocalBackend - raw LocalBackendFile - done bool -} - -func (f *fileGenericWriter) Write(b []byte) (int, error) { return f.raw.Write(b) } - -func (f *fileGenericWriter) Commit() error { - if f.done { - return errs.New("already commit/aborted") - } - f.done = true - - return f.raw.Close() + fs LocalBackend + raw LocalBackendFile } +func (f *fileGenericWriter) WriteAt(b []byte, off int64) (int, error) { return f.raw.WriteAt(b, off) } +func (f *fileGenericWriter) Commit() error { return f.raw.Close() } func (f *fileGenericWriter) Abort() error { - if f.done { - return errs.New("already commit/aborted") - } - f.done = true - return errs.Combine( f.raw.Close(), f.fs.Remove(f.raw.Name()), ) } -func newOSWriteHandle(fs LocalBackend, fh LocalBackendFile) WriteHandle { - return &fileGenericWriter{ +func newOSMultiWriteHandle(fs LocalBackend, fh LocalBackendFile) MultiWriteHandle { + return NewGenericMultiWriteHandle(&fileGenericWriter{ fs: fs, raw: fh, - } + }) } diff --git a/cmd/uplink/ulfs/handle_generic.go b/cmd/uplink/ulfs/handle_generic.go index 5286da110..a85deb49c 100644 --- a/cmd/uplink/ulfs/handle_generic.go +++ b/cmd/uplink/ulfs/handle_generic.go @@ -133,3 +133,161 @@ func (o *genericReadHandle) Read(p []byte) (int, error) { o.len -= int64(n) return n, err } + +// +// write handles +// + +// GenericWriter is an interface that can be turned into a GenericMultiWriteHandle. +type GenericWriter interface { + io.WriterAt + Commit() error + Abort() error +} + +// GenericMultiWriteHandle implements MultiWriteHandle for *os.Files. +type GenericMultiWriteHandle struct { + w GenericWriter + + mu sync.Mutex + off int64 + tail bool + done bool + abort bool +} + +// NewGenericMultiWriteHandle constructs an *GenericMultiWriteHandle from a GenericWriter. +func NewGenericMultiWriteHandle(w GenericWriter) *GenericMultiWriteHandle { + return &GenericMultiWriteHandle{ + w: w, + } +} + +func (o *GenericMultiWriteHandle) childAbort() { + o.mu.Lock() + defer o.mu.Unlock() + + if !o.done { + o.abort = true + } +} + +func (o *GenericMultiWriteHandle) status() (done, abort bool) { + o.mu.Lock() + defer o.mu.Unlock() + + return o.done, o.abort +} + +// NextPart returns a WriteHandle expecting length bytes to be written to it. +func (o *GenericMultiWriteHandle) NextPart(ctx context.Context, length int64) (WriteHandle, error) { + o.mu.Lock() + defer o.mu.Unlock() + + if o.done { + return nil, errs.New("already closed") + } else if o.tail { + return nil, errs.New("unable to make part after tail part") + } + + w := &genericWriteHandle{ + parent: o, + w: o.w, + off: o.off, + tail: length < 0, + len: length, + } + + if w.tail { + o.tail = true + } else { + o.off += length + } + + return w, nil +} + +// Commit commits the overall GenericMultiWriteHandle. It errors if +// any parts were aborted. +func (o *GenericMultiWriteHandle) Commit(ctx context.Context) error { + o.mu.Lock() + defer o.mu.Unlock() + + if o.done { + return nil + } + o.done = true + + if o.abort { + return errs.Combine( + errs.New("commit failed: not every child was committed"), + o.w.Abort(), + ) + } + + return o.w.Commit() +} + +// Abort aborts the overall GenericMultiWriteHandle. +func (o *GenericMultiWriteHandle) Abort(ctx context.Context) error { + o.mu.Lock() + defer o.mu.Unlock() + + if o.done { + return nil + } + o.done = true + o.abort = true + + return o.w.Abort() +} + +type genericWriteHandle struct { + parent *GenericMultiWriteHandle + w GenericWriter + done bool + off int64 + tail bool + len int64 +} + +func (o *genericWriteHandle) Write(p []byte) (int, error) { + if !o.tail { + if o.len <= 0 { + return 0, errs.New("write past maximum length") + } else if o.len < int64(len(p)) { + p = p[:o.len] + } + } + n, err := o.w.WriteAt(p, o.off) + o.off += int64(n) + if !o.tail { + o.len -= int64(n) + } + return n, err +} + +func (o *genericWriteHandle) Commit() error { + if o.done { + return nil + } + o.done = true + + done, abort := o.parent.status() + if abort { + return errs.New("commit failed: parent write handle aborted") + } else if done { + return errs.New("commit failed: parent write handle done") + } + return nil +} + +func (o *genericWriteHandle) Abort() error { + if o.done { + return nil + } + o.done = true + + o.parent.childAbort() + return nil +} diff --git a/cmd/uplink/ulfs/handle_std.go b/cmd/uplink/ulfs/handle_std.go index 26b260d0f..759059115 100644 --- a/cmd/uplink/ulfs/handle_std.go +++ b/cmd/uplink/ulfs/handle_std.go @@ -140,17 +140,143 @@ func (o *stdReadHandle) Read(p []byte) (int, error) { // write handles // -// stdWriteHandle implements WriteHandle for stdouts. -type stdWriteHandle struct { - stdout io.Writer +// stdMultiWriteHandle implements MultiWriteHandle for stdouts. +type stdMultiWriteHandle struct { + stdout closableWriter + + mu sync.Mutex + next *sync.Mutex + tail bool + done bool } -func newStdWriteHandle(stdout io.Writer) *stdWriteHandle { - return &stdWriteHandle{ - stdout: stdout, +func newStdMultiWriteHandle(stdout io.Writer) *stdMultiWriteHandle { + return &stdMultiWriteHandle{ + stdout: closableWriter{Writer: stdout}, + next: new(sync.Mutex), } } -func (s *stdWriteHandle) Write(b []byte) (int, error) { return s.stdout.Write(b) } -func (s *stdWriteHandle) Commit() error { return nil } -func (s *stdWriteHandle) Abort() error { return nil } +func (s *stdMultiWriteHandle) NextPart(ctx context.Context, length int64) (WriteHandle, error) { + s.mu.Lock() + defer s.mu.Unlock() + + if s.done { + return nil, errs.New("already closed") + } else if s.tail { + return nil, errs.New("unable to make part after tail part") + } + + next := new(sync.Mutex) + next.Lock() + + w := &stdWriteHandle{ + stdout: &s.stdout, + mu: s.next, + next: next, + tail: length < 0, + len: length, + } + + s.tail = w.tail + s.next = next + + return w, nil +} + +func (s *stdMultiWriteHandle) Commit(ctx context.Context) error { + s.mu.Lock() + defer s.mu.Unlock() + + s.done = true + return nil +} + +func (s *stdMultiWriteHandle) Abort(ctx context.Context) error { + s.mu.Lock() + defer s.mu.Unlock() + + s.done = true + return nil +} + +// stdWriteHandle implements WriteHandle for stdouts. +type stdWriteHandle struct { + stdout *closableWriter + mu *sync.Mutex + next *sync.Mutex + tail bool + len int64 +} + +func (s *stdWriteHandle) unlockNext(err error) { + if s.next != nil { + if err != nil { + s.stdout.close(err) + } + s.next.Unlock() + s.next = nil + } +} + +func (s *stdWriteHandle) Write(p []byte) (int, error) { + s.mu.Lock() + defer s.mu.Unlock() + + if !s.tail { + if s.len <= 0 { + return 0, errs.New("write past maximum length") + } else if s.len < int64(len(p)) { + p = p[:s.len] + } + } + + n, err := s.stdout.Write(p) + + if !s.tail { + s.len -= int64(n) + if s.len == 0 { + s.unlockNext(err) + } + } + + return n, err +} + +func (s *stdWriteHandle) Commit() error { + s.mu.Lock() + defer s.mu.Unlock() + + s.len = 0 + s.unlockNext(nil) + + return nil +} + +func (s *stdWriteHandle) Abort() error { + s.mu.Lock() + defer s.mu.Unlock() + + s.len = 0 + s.unlockNext(context.Canceled) + + return nil +} + +type closableWriter struct { + io.Writer + err error +} + +func (out *closableWriter) Write(p []byte) (int, error) { + if out.err != nil { + return 0, out.err + } + n, err := out.Writer.Write(p) + out.err = err + return n, err +} + +func (out *closableWriter) close(err error) { + out.err = err +} diff --git a/cmd/uplink/ulfs/handle_std_test.go b/cmd/uplink/ulfs/handle_std_test.go new file mode 100644 index 000000000..3fbfc5b8c --- /dev/null +++ b/cmd/uplink/ulfs/handle_std_test.go @@ -0,0 +1,101 @@ +// Copyright (C) 2022 Storj Labs, Inc. +// See LICENSE for copying information. + +package ulfs + +import ( + "bytes" + "errors" + "sync/atomic" + "testing" + + "github.com/stretchr/testify/require" + + "storj.io/common/testcontext" + "storj.io/common/testrand" +) + +type writeThrottle struct { + entered chan struct{} + release chan error +} + +type throttledWriter struct { + writex int64 + write []writeThrottle + data bytes.Buffer +} + +func newThrottledWriter(maxWrites int) *throttledWriter { + tw := &throttledWriter{ + writex: 0, + write: make([]writeThrottle, maxWrites), + } + for i := range tw.write { + tw.write[i] = writeThrottle{ + entered: make(chan struct{}), + release: make(chan error, 1), + } + } + return tw +} + +func (tw *throttledWriter) Write(data []byte) (n int, _ error) { + index := atomic.AddInt64(&tw.writex, 1) - 1 + + close(tw.write[index].entered) + forceErr := <-tw.write[index].release + + n, writeErr := tw.data.Write(data) + if writeErr != nil { + return n, writeErr + } + + return n, forceErr +} + +func TestStdMultiWriteAbort(t *testing.T) { + ctx := testcontext.New(t) + + stdout := newThrottledWriter(2) + multi := newStdMultiWriteHandle(stdout) + + head := testrand.Bytes(256) + tail := testrand.Bytes(256) + + part1, err := multi.NextPart(ctx, 256) + require.NoError(t, err) + ctx.Go(func() error { + defer func() { _ = part1.Abort() }() + + _, err := part1.Write(head) + if err == nil { + return errors.New("expected an error") + } + return nil + }) + + part2, err := multi.NextPart(ctx, 256) + require.NoError(t, err) + ctx.Go(func() error { + defer func() { _ = part2.Commit() }() + + // wait for the above part to enter write first + <-stdout.write[0].entered + _, err := part2.Write(tail) + if err == nil { + return errors.New("expected an error") + } + return nil + }) + + // wait until we enter both writes + <-stdout.write[0].entered + + stdout.write[0].release <- errors.New("fail 0") + stdout.write[1].release <- nil + + ctx.Wait() + + require.Equal(t, head, stdout.data.Bytes()) +} diff --git a/cmd/uplink/ulfs/handle_uplink.go b/cmd/uplink/ulfs/handle_uplink.go index a68314b72..328c4e64d 100644 --- a/cmd/uplink/ulfs/handle_uplink.go +++ b/cmd/uplink/ulfs/handle_uplink.go @@ -166,16 +166,127 @@ func (u *uplinkReadHandle) Info() ObjectInfo { return *u.info } // write handles // -type uplinkWriteHandle struct { - upload *uplink.Upload +type uplinkMultiWriteHandle struct { + project *uplink.Project + bucket string + info uplink.UploadInfo + metadata uplink.CustomMetadata + + mu sync.Mutex + tail bool + part uint32 + commitErr *error + abortErr *error } -func newUplinkWriteHandle(upload *uplink.Upload) *uplinkWriteHandle { - return &uplinkWriteHandle{ - upload: upload, +func newUplinkMultiWriteHandle(project *uplink.Project, bucket string, info uplink.UploadInfo, metadata uplink.CustomMetadata) *uplinkMultiWriteHandle { + return &uplinkMultiWriteHandle{ + project: project, + bucket: bucket, + info: info, + metadata: metadata, } } -func (u *uplinkWriteHandle) Write(b []byte) (int, error) { return u.upload.Write(b) } -func (u *uplinkWriteHandle) Commit() error { return u.upload.Commit() } -func (u *uplinkWriteHandle) Abort() error { return u.upload.Abort() } +func (u *uplinkMultiWriteHandle) NextPart(ctx context.Context, length int64) (WriteHandle, error) { + part, err := func() (uint32, error) { + u.mu.Lock() + defer u.mu.Unlock() + + switch { + case u.abortErr != nil: + return 0, errs.New("cannot make part after multipart write has been aborted") + case u.commitErr != nil: + return 0, errs.New("cannot make part after multipart write has been committed") + } + + if u.tail { + return 0, errs.New("unable to make part after tail part") + } + u.tail = length < 0 + + u.part++ + return u.part, nil + }() + if err != nil { + return nil, err + } + + ul, err := u.project.UploadPart(ctx, u.bucket, u.info.Key, u.info.UploadID, part) + if err != nil { + return nil, err + } + + return &uplinkWriteHandle{ + ul: ul, + tail: length < 0, + len: length, + }, nil +} + +func (u *uplinkMultiWriteHandle) Commit(ctx context.Context) error { + u.mu.Lock() + defer u.mu.Unlock() + + switch { + case u.abortErr != nil: + return errs.New("cannot commit an aborted multipart write") + case u.commitErr != nil: + return *u.commitErr + } + + _, err := u.project.CommitUpload(ctx, u.bucket, u.info.Key, u.info.UploadID, &uplink.CommitUploadOptions{ + CustomMetadata: u.metadata, + }) + u.commitErr = &err + return err +} + +func (u *uplinkMultiWriteHandle) Abort(ctx context.Context) error { + u.mu.Lock() + defer u.mu.Unlock() + + switch { + case u.abortErr != nil: + return *u.abortErr + case u.commitErr != nil: + return errs.New("cannot abort a committed multipart write") + } + + err := u.project.AbortUpload(ctx, u.bucket, u.info.Key, u.info.UploadID) + u.abortErr = &err + return err +} + +// uplinkWriteHandle implements writeHandle for *uplink.Uploads. +type uplinkWriteHandle struct { + ul *uplink.PartUpload + tail bool + len int64 +} + +func (u *uplinkWriteHandle) Write(p []byte) (int, error) { + if !u.tail { + if u.len <= 0 { + return 0, errs.New("write past maximum length") + } else if u.len < int64(len(p)) { + p = p[:u.len] + } + } + + n, err := u.ul.Write(p) + + if !u.tail { + u.len -= int64(n) + } + + return n, err +} + +func (u *uplinkWriteHandle) Commit() error { + return u.ul.Commit() +} + +func (u *uplinkWriteHandle) Abort() error { + return u.ul.Abort() +} diff --git a/cmd/uplink/ulfs/local.go b/cmd/uplink/ulfs/local.go index 9720b9608..d8c2fcee4 100644 --- a/cmd/uplink/ulfs/local.go +++ b/cmd/uplink/ulfs/local.go @@ -19,8 +19,8 @@ import ( // LocalBackendFile represents a file in the filesystem. type LocalBackendFile interface { io.Closer - io.Writer io.ReaderAt + io.WriterAt Name() string Stat() (os.FileInfo, error) Readdir(int) ([]os.FileInfo, error) @@ -58,7 +58,7 @@ func (l *Local) Open(ctx context.Context, path string) (MultiReadHandle, error) } // Create makes any directories necessary to create a file at path and returns a WriteHandle. -func (l *Local) Create(ctx context.Context, path string) (WriteHandle, error) { +func (l *Local) Create(ctx context.Context, path string) (MultiWriteHandle, error) { fi, err := l.fs.Stat(path) if err != nil && !os.IsNotExist(err) { return nil, errs.Wrap(err) @@ -75,7 +75,7 @@ func (l *Local) Create(ctx context.Context, path string) (WriteHandle, error) { if err != nil { return nil, errs.Wrap(err) } - return newOSWriteHandle(l.fs, fh), nil + return newOSMultiWriteHandle(l.fs, fh), nil } // Move moves file to provided path. diff --git a/cmd/uplink/ulfs/local_backend_mem.go b/cmd/uplink/ulfs/local_backend_mem.go index a020619ae..9f7a24911 100644 --- a/cmd/uplink/ulfs/local_backend_mem.go +++ b/cmd/uplink/ulfs/local_backend_mem.go @@ -200,9 +200,11 @@ func (mf *memFile) ReadAt(p []byte, off int64) (int, error) { return copy(p, mf.buf[off:]), nil } -func (mf *memFile) Write(p []byte) (int, error) { - mf.buf = append(mf.buf, p...) - return len(p), nil +func (mf *memFile) WriteAt(p []byte, off int64) (int, error) { + if delta := (off + int64(len(p))) - int64(len(mf.buf)); delta > 0 { + mf.buf = append(mf.buf, make([]byte, delta)...) + } + return copy(mf.buf[off:], p), nil } func (mf *memFile) Stat() (os.FileInfo, error) { @@ -254,7 +256,7 @@ func (md *memDir) ReadAt(p []byte, off int64) (int, error) { return 0, errs.New("readat on directory") } -func (md *memDir) Write(p []byte) (int, error) { +func (md *memDir) WriteAt(p []byte, off int64) (int, error) { return 0, errs.New("writeat on directory") } diff --git a/cmd/uplink/ulfs/mixed.go b/cmd/uplink/ulfs/mixed.go index 9b77e9b17..e67c7ffa7 100644 --- a/cmd/uplink/ulfs/mixed.go +++ b/cmd/uplink/ulfs/mixed.go @@ -42,13 +42,13 @@ func (m *Mixed) Open(ctx context.Context, loc ulloc.Location) (MultiReadHandle, } // Create returns a WriteHandle to either a local file, remote object, or stdout. -func (m *Mixed) Create(ctx context.Context, loc ulloc.Location, opts *CreateOptions) (WriteHandle, error) { +func (m *Mixed) Create(ctx context.Context, loc ulloc.Location, opts *CreateOptions) (MultiWriteHandle, error) { if bucket, key, ok := loc.RemoteParts(); ok { return m.remote.Create(ctx, bucket, key, opts) } else if path, ok := loc.LocalParts(); ok { return m.local.Create(ctx, path) } - return newStdWriteHandle(clingy.Stdout(ctx)), nil + return newStdMultiWriteHandle(clingy.Stdout(ctx)), nil } // Move moves either a local file or remote object. diff --git a/cmd/uplink/ulfs/remote.go b/cmd/uplink/ulfs/remote.go index 1405f6d1d..5a32b37f3 100644 --- a/cmd/uplink/ulfs/remote.go +++ b/cmd/uplink/ulfs/remote.go @@ -46,22 +46,23 @@ func (r *Remote) Stat(ctx context.Context, bucket, key string) (*ObjectInfo, err } // Create returns a MultiWriteHandle for the object identified by a given bucket and key. -func (r *Remote) Create(ctx context.Context, bucket, key string, opts *CreateOptions) (WriteHandle, error) { - upload, err := r.project.UploadObject(ctx, bucket, key, &uplink.UploadOptions{ +func (r *Remote) Create(ctx context.Context, bucket, key string, opts *CreateOptions) (MultiWriteHandle, error) { + var customMetadata uplink.CustomMetadata + if opts.Metadata != nil { + customMetadata = uplink.CustomMetadata(opts.Metadata) + + if err := customMetadata.Verify(); err != nil { + return nil, err + } + } + + info, err := r.project.BeginUpload(ctx, bucket, key, &uplink.UploadOptions{ Expires: opts.Expires, }) if err != nil { return nil, err } - - if opts.Metadata != nil { - if err := upload.SetCustomMetadata(ctx, uplink.CustomMetadata(opts.Metadata)); err != nil { - _ = upload.Abort() - return nil, err - } - } - - return newUplinkWriteHandle(upload), nil + return newUplinkMultiWriteHandle(r.project, bucket, info, customMetadata), nil } // Move moves object to provided key and bucket. diff --git a/cmd/uplink/ultest/filesystem.go b/cmd/uplink/ultest/filesystem.go index 360b6b44f..03acdb6ee 100644 --- a/cmd/uplink/ultest/filesystem.go +++ b/cmd/uplink/ultest/filesystem.go @@ -112,7 +112,7 @@ func (rfs *remoteFilesystem) Open(ctx context.Context, bucket, key string) (ulfs return newMultiReadHandle(mf.contents), nil } -func (rfs *remoteFilesystem) Create(ctx context.Context, bucket, key string, opts *ulfs.CreateOptions) (_ ulfs.WriteHandle, err error) { +func (rfs *remoteFilesystem) Create(ctx context.Context, bucket, key string, opts *ulfs.CreateOptions) (_ ulfs.MultiWriteHandle, err error) { rfs.mu.Lock() defer rfs.mu.Unlock() @@ -140,7 +140,7 @@ func (rfs *remoteFilesystem) Create(ctx context.Context, bucket, key string, opt rfs.pending[loc] = append(rfs.pending[loc], wh) - return wh, nil + return ulfs.NewGenericMultiWriteHandle(wh), nil } func (rfs *remoteFilesystem) Move(ctx context.Context, oldbucket, oldkey string, newbucket, newkey string) error { @@ -282,12 +282,15 @@ type memWriteHandle struct { done bool } -func (b *memWriteHandle) Write(p []byte) (int, error) { +func (b *memWriteHandle) WriteAt(p []byte, off int64) (int, error) { if b.done { return 0, errs.New("write to closed handle") } - b.buf = append(b.buf, p...) - return len(p), nil + end := int64(len(p)) + off + if grow := end - int64(len(b.buf)); grow > 0 { + b.buf = append(b.buf, make([]byte, grow)...) + } + return copy(b.buf[off:], p), nil } func (b *memWriteHandle) Commit() error { diff --git a/cmd/uplink/ultest/setup.go b/cmd/uplink/ultest/setup.go index 376fa5788..6ba68caf2 100644 --- a/cmd/uplink/ultest/setup.go +++ b/cmd/uplink/ultest/setup.go @@ -199,7 +199,11 @@ func WithFile(location string, contents ...string) ExecuteOption { cs.rfs.ensureBucket(bucket) } - wh, err := cs.fs.Create(ctx, loc, nil) + mwh, err := cs.fs.Create(ctx, loc, nil) + require.NoError(t, err) + defer func() { _ = mwh.Abort(ctx) }() + + wh, err := mwh.NextPart(ctx, -1) require.NoError(t, err) defer func() { _ = wh.Abort() }() @@ -213,6 +217,7 @@ func WithFile(location string, contents ...string) ExecuteOption { } require.NoError(t, wh.Commit()) + require.NoError(t, mwh.Commit(ctx)) }} } diff --git a/go.mod b/go.mod index 08b8db2a2..3be8e16ba 100644 --- a/go.mod +++ b/go.mod @@ -43,7 +43,7 @@ require ( github.com/vivint/infectious v0.0.0-20200605153912-25a574ae18a3 github.com/zeebo/assert v1.3.1 github.com/zeebo/blake3 v0.2.3 - github.com/zeebo/clingy v0.0.0-20230301225531-f2d4117c8e8c + github.com/zeebo/clingy v0.0.0-20230602044025-906be850f10d github.com/zeebo/errs v1.3.0 github.com/zeebo/ini v0.0.0-20210514163846-cc8fbd8d9599 github.com/zyedidia/generic v1.2.1 diff --git a/go.sum b/go.sum index 0a8431280..e20f35d96 100644 --- a/go.sum +++ b/go.sum @@ -605,8 +605,8 @@ github.com/zeebo/assert v1.3.1 h1:vukIABvugfNMZMQO1ABsyQDJDTVQbn+LWSMy1ol1h6A= github.com/zeebo/assert v1.3.1/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0= github.com/zeebo/blake3 v0.2.3 h1:TFoLXsjeXqRNFxSbk35Dk4YtszE/MQQGK10BH4ptoTg= github.com/zeebo/blake3 v0.2.3/go.mod h1:mjJjZpnsyIVtVgTOSpJ9vmRE4wgDeyt2HU3qXvvKCaQ= -github.com/zeebo/clingy v0.0.0-20230301225531-f2d4117c8e8c h1:bE9vXPFKa9wkCCq1HJi2Ms4pWuBoIKQEMe6CZzu/TKE= -github.com/zeebo/clingy v0.0.0-20230301225531-f2d4117c8e8c/go.mod h1:MHEhXvEfewflU7SSVKHI7nkdU+fpyxZ5XPPzj+5gYNw= +github.com/zeebo/clingy v0.0.0-20230602044025-906be850f10d h1:INuKdI3R6zDA8UEgbBxDFkb7qwO/nSvnJRdrBCMW+To= +github.com/zeebo/clingy v0.0.0-20230602044025-906be850f10d/go.mod h1:MHEhXvEfewflU7SSVKHI7nkdU+fpyxZ5XPPzj+5gYNw= github.com/zeebo/errs v1.2.2/go.mod h1:sgbWHsvVuTPHcqJJGQ1WhI5KbWlHYz+2+2C/LSEtCw4= github.com/zeebo/errs v1.3.0 h1:hmiaKqgYZzcVgRL1Vkc1Mn2914BbzB0IBxs+ebeutGs= github.com/zeebo/errs v1.3.0/go.mod h1:sgbWHsvVuTPHcqJJGQ1WhI5KbWlHYz+2+2C/LSEtCw4=