From 54ef1c8ca23cac8c4ab803f6123ab2d06fb84d4c Mon Sep 17 00:00:00 2001 From: Jeff Wendling Date: Thu, 6 Apr 2023 13:03:02 -0400 Subject: [PATCH] cmd/uplink: use new upload code path the parallelism and parallelism-chunk-size flags which used to control how many parts to split a segment into and many to perform in parallel are now deprecated and replaced by maximum-concurrent-pieces and long-tail-margin. now, for an individual transfer, the total number of piece uploads that transfer will perform is controlled by maximum-concurrent-pieces, and segments within that transfer will automatically be performed in parallel. so if you used to set your parallelism to n, a good value for the pieces might be something approximately like 130*n, and the parallelism-chunk-size is unnecessary. Change-Id: Ibe724ca70b07eba89dad551eb612a1db988b18b9 --- cmd/uplink/cmd_cp.go | 201 ++++++++------------------- cmd/uplink/cmd_cp_test.go | 48 ------- cmd/uplink/external_project.go | 5 + cmd/uplink/ulext/external.go | 11 +- cmd/uplink/ulfs/filesystem.go | 18 +-- cmd/uplink/ulfs/handle_file.go | 29 +++- cmd/uplink/ulfs/handle_generic.go | 158 --------------------- cmd/uplink/ulfs/handle_std.go | 148 ++------------------ cmd/uplink/ulfs/handle_std_test.go | 101 -------------- cmd/uplink/ulfs/handle_uplink.go | 125 +---------------- cmd/uplink/ulfs/local.go | 6 +- cmd/uplink/ulfs/local_backend_mem.go | 10 +- cmd/uplink/ulfs/mixed.go | 4 +- cmd/uplink/ulfs/remote.go | 23 ++- cmd/uplink/ultest/filesystem.go | 13 +- cmd/uplink/ultest/setup.go | 7 +- go.mod | 2 +- go.sum | 4 +- 18 files changed, 142 insertions(+), 771 deletions(-) delete mode 100644 cmd/uplink/ulfs/handle_std_test.go diff --git a/cmd/uplink/cmd_cp.go b/cmd/uplink/cmd_cp.go index 7650e520d..0160b0082 100644 --- a/cmd/uplink/cmd_cp.go +++ b/cmd/uplink/cmd_cp.go @@ -5,7 +5,6 @@ package main import ( "context" - "errors" "fmt" "io" "strconv" @@ -19,7 +18,6 @@ import ( "github.com/zeebo/clingy" "github.com/zeebo/errs" - "storj.io/common/context2" "storj.io/common/fpath" "storj.io/common/memory" "storj.io/common/rpc/rpcpool" @@ -27,6 +25,8 @@ import ( "storj.io/storj/cmd/uplink/ulext" "storj.io/storj/cmd/uplink/ulfs" "storj.io/storj/cmd/uplink/ulloc" + "storj.io/uplink/private/eestream/scheduler" + "storj.io/uplink/private/testuplink" ) type cmdCp struct { @@ -44,13 +44,14 @@ type cmdCp struct { parallelism int parallelismChunkSize memory.Size + maximumConcurrentPieces int + longTailMargin int + inmemoryEC bool locs []ulloc.Location } -const maxPartCount int64 = 10000 - func newCmdCp(ex ulext.External) *cmdCp { return &cmdCp{ex: ex} } @@ -79,7 +80,8 @@ func (c *cmdCp) Setup(params clingy.Parameters) { ).(bool) c.byteRange = params.Flag("range", "Downloads the specified range bytes of an object. For more information about the HTTP Range header, see https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35", "").(string) - c.parallelism = params.Flag("parallelism", "Controls how many parallel chunks to upload/download from a file", 1, + c.parallelism = params.Flag("parallelism", "Deprecated", 1, + clingy.Hidden, clingy.Short('p'), clingy.Transform(strconv.Atoi), clingy.Transform(func(n int) (int, error) { @@ -89,7 +91,8 @@ func (c *cmdCp) Setup(params clingy.Parameters) { return n, nil }), ).(int) - c.parallelismChunkSize = params.Flag("parallelism-chunk-size", "Set the size of the chunks for parallelism, 0 means automatic adjustment", memory.Size(0), + c.parallelismChunkSize = params.Flag("parallelism-chunk-size", "Deprecated", memory.Size(0), + clingy.Hidden, clingy.Transform(memory.ParseString), clingy.Transform(func(n int64) (memory.Size, error) { if n < 0 { @@ -99,6 +102,16 @@ func (c *cmdCp) Setup(params clingy.Parameters) { }), ).(memory.Size) + def := testuplink.DefaultConcurrentSegmentUploadsConfig() + c.maximumConcurrentPieces = params.Flag("maximum-concurrent-pieces", "Maximum concurrent pieces to upload at once per transfer", def.SchedulerOptions.MaximumConcurrent, + clingy.Transform(strconv.Atoi), + clingy.Advanced, + ).(int) + c.longTailMargin = params.Flag("long-tail-margin", "How many extra pieces to upload and cancel per segment", def.LongTailMargin, + clingy.Transform(strconv.Atoi), + clingy.Advanced, + ).(int) + c.inmemoryEC = params.Flag("inmemory-erasure-coding", "Keep erasure-coded pieces in-memory instead of writing them on the disk during upload", false, clingy.Transform(strconv.ParseBool), clingy.Boolean, @@ -124,11 +137,18 @@ func (c *cmdCp) Execute(ctx context.Context) error { return errs.New("must have at least one source and destination path") } - fs, err := c.ex.OpenFilesystem(ctx, c.access, ulext.ConnectionPoolOptions(rpcpool.Options{ - Capacity: 100 * c.parallelism, - KeyCapacity: 5, - IdleExpiration: 2 * time.Minute, - })) + fs, err := c.ex.OpenFilesystem(ctx, c.access, + ulext.ConcurrentSegmentUploadsConfig(testuplink.ConcurrentSegmentUploadsConfig{ + SchedulerOptions: scheduler.Options{ + MaximumConcurrent: c.maximumConcurrentPieces, + }, + LongTailMargin: c.longTailMargin, + }), + ulext.ConnectionPoolOptions(rpcpool.Options{ + Capacity: c.maximumConcurrentPieces, + KeyCapacity: 5, + IdleExpiration: 2 * time.Minute, + })) if err != nil { return err } @@ -340,47 +360,24 @@ func (c *cmdCp) copyFile(ctx context.Context, fs ulfs.Filesystem, source, dest u } defer func() { _ = mrh.Close() }() - mwh, err := fs.Create(ctx, dest, &ulfs.CreateOptions{ + wh, err := fs.Create(ctx, dest, &ulfs.CreateOptions{ Expires: c.expires, Metadata: c.metadata, }) if err != nil { return err } - defer func() { _ = mwh.Abort(ctx) }() + defer func() { _ = wh.Abort() }() - partSize, err := c.calculatePartSize(mrh.Length(), c.parallelismChunkSize.Int64()) - if err != nil { - return err - } - - return errs.Wrap(c.parallelCopy( + return errs.Wrap(c.copy( ctx, source, dest, - mwh, mrh, - c.parallelism, partSize, + wh, mrh, offset, length, bar, )) } -// calculatePartSize returns the needed part size in order to upload the file with size of 'length'. -// It hereby respects if the client requests/prefers a certain size and only increases if needed. -// If length is -1 (ie. stdin input), then this will limit to 64MiB and the total file length to 640GB. -func (c *cmdCp) calculatePartSize(length, preferredSize int64) (requiredSize int64, err error) { - segC := (length / maxPartCount / (memory.MiB * 64).Int64()) + 1 - requiredSize = segC * (memory.MiB * 64).Int64() - switch { - case preferredSize == 0: - return requiredSize, nil - case requiredSize <= preferredSize: - return preferredSize, nil - default: - return 0, errs.New(fmt.Sprintf("the specified chunk size %s is too small, requires %s or larger", - memory.FormatBytes(preferredSize), memory.FormatBytes(requiredSize))) - } -} - func copyVerbing(source, dest ulloc.Location) (verb string) { return copyVerb(source, dest) + "ing" } @@ -408,12 +405,11 @@ func joinDestWith(dest ulloc.Location, suffix string) ulloc.Location { return dest } -func (c *cmdCp) parallelCopy( +func (c *cmdCp) copy( ctx context.Context, source, dest ulloc.Location, - dst ulfs.MultiWriteHandle, + dst ulfs.WriteHandle, src ulfs.MultiReadHandle, - p int, chunkSize int64, offset, length int64, bar *mpb.Bar) error { @@ -423,120 +419,33 @@ func (c *cmdCp) parallelCopy( } } - var ( - limiter = sync2.NewLimiter(p) - es errs.Group - mu sync.Mutex - ) - ctx, cancel := context.WithCancel(ctx) - - defer func() { _ = src.Close() }() - defer func() { - nocancel := context2.WithoutCancellation(ctx) - timedctx, cancel := context.WithTimeout(nocancel, 5*time.Second) - defer cancel() - _ = dst.Abort(timedctx) - }() defer cancel() - addError := func(err error) { - if err == nil { - return - } + defer func() { _ = src.Close() }() + defer func() { _ = dst.Abort() }() - mu.Lock() - defer mu.Unlock() + rh, err := src.NextPart(ctx, length) + if err != nil { + return errs.Wrap(err) + } + defer func() { _ = rh.Close() }() - es.Add(err) - - // abort all other concurrenty copies - cancel() + var w io.Writer = dst + if bar != nil { + bar.SetTotal(rh.Info().ContentLength, false) + bar.EnableTriggerComplete() + pw := bar.ProxyWriter(w) + defer func() { _ = pw.Close() }() + w = pw } - var readBufs *ulfs.BytesPool - if p > 1 && (source.Std() || dest.Std()) { - // Create the read buffer pool only for uploads from stdin and downloads to stdout with parallelism > 1. - readBufs = ulfs.NewBytesPool(int(chunkSize)) + _, err = sync2.Copy(ctx, w, rh) + if err == nil { + err = dst.Commit() } - for i := 0; length != 0; i++ { - i := i - - chunk := chunkSize - if length > 0 && chunkSize > length { - chunk = length - } - length -= chunk - - rh, err := src.NextPart(ctx, chunk) - if err != nil { - if !errors.Is(err, io.EOF) { - addError(errs.New("error getting reader for part %d: %v", i, err)) - } - break - } - - wh, err := dst.NextPart(ctx, chunk) - if err != nil { - _ = rh.Close() - - addError(errs.New("error getting writer for part %d: %v", i, err)) - break - } - - ok := limiter.Go(ctx, func() { - defer func() { _ = rh.Close() }() - defer func() { _ = wh.Abort() }() - - if readBufs != nil { - buf := readBufs.Get() - defer readBufs.Put(buf) - - rh = ulfs.NewBufferedReadHandle(ctx, rh, buf) - } - - var w io.Writer = wh - if bar != nil { - bar.SetTotal(rh.Info().ContentLength, false) - bar.EnableTriggerComplete() - pw := bar.ProxyWriter(w) - defer func() { - _ = pw.Close() - }() - w = pw - } - - _, err := sync2.Copy(ctx, w, rh) - if err == nil { - err = wh.Commit() - } - - if err != nil { - // TODO: it would be also nice to use wh.Abort and rh.Close directly - // to avoid some of the waiting that's caused by sync2.Copy. - // - // However, some of the source / destination implementations don't seem - // to have concurrent safe API with that regards. - // - // Also, we may want to check that it actually helps, before implementing it. - - addError(errs.New("failed to %s part %d: %v", copyVerb(source, dest), i, err)) - } - }) - if !ok { - break - } - } - - limiter.Wait() - - // don't try to commit if any error occur - if len(es) == 0 { - es.Add(dst.Commit(ctx)) - } - - return errs.Wrap(combineErrs(es)) + return errs.Wrap(err) } func newProgressBar(progress *mpb.Progress, name string, which, total int) *mpb.Bar { diff --git a/cmd/uplink/cmd_cp_test.go b/cmd/uplink/cmd_cp_test.go index 8dc4a446b..881e428a6 100644 --- a/cmd/uplink/cmd_cp_test.go +++ b/cmd/uplink/cmd_cp_test.go @@ -6,9 +6,6 @@ package main import ( "testing" - "github.com/stretchr/testify/require" - - "storj.io/common/memory" "storj.io/storj/cmd/uplink/ultest" ) @@ -96,51 +93,6 @@ func TestCpDownload(t *testing.T) { }) } -func TestCpPartSize(t *testing.T) { - c := newCmdCp(nil) - - // 1GiB file, should return 64MiB - partSize, err := c.calculatePartSize(memory.GiB.Int64(), c.parallelismChunkSize.Int64()) - require.NoError(t, err) - require.EqualValues(t, memory.MiB*64, partSize) - - // 640 GB file, should return 64MiB. - partSize, err = c.calculatePartSize(memory.GB.Int64()*640, c.parallelismChunkSize.Int64()) - require.NoError(t, err) - require.EqualValues(t, memory.MiB*64, partSize) - - // 640GiB file, should return 128MiB. - partSize, err = c.calculatePartSize(memory.GiB.Int64()*640, c.parallelismChunkSize.Int64()) - require.NoError(t, err) - require.EqualValues(t, memory.MiB*128, partSize) - - // 1TiB file, should return 128MiB. - partSize, err = c.calculatePartSize(memory.TiB.Int64(), c.parallelismChunkSize.Int64()) - require.NoError(t, err) - require.EqualValues(t, memory.MiB*128, partSize) - - // 1.3TiB file, should return 192MiB. - partSize, err = c.calculatePartSize(memory.GiB.Int64()*1300, c.parallelismChunkSize.Int64()) - require.NoError(t, err) - require.EqualValues(t, memory.MiB*192, partSize) - - // should return 1GiB as requested. - partSize, err = c.calculatePartSize(memory.GiB.Int64()*1300, memory.GiB.Int64()) - require.NoError(t, err) - require.EqualValues(t, memory.GiB, partSize) - - // should return 192 MiB and error, since preferred is too low. - partSize, err = c.calculatePartSize(memory.GiB.Int64()*1300, memory.MiB.Int64()) - require.Error(t, err) - require.Equal(t, "the specified chunk size 1.0 MiB is too small, requires 192.0 MiB or larger", err.Error()) - require.Zero(t, partSize) - - // negative length should return 64MiB part size - partSize, err = c.calculatePartSize(-1, c.parallelismChunkSize.Int64()) - require.NoError(t, err) - require.EqualValues(t, memory.MiB*64, partSize) -} - func TestCpUpload(t *testing.T) { state := ultest.Setup(commands, ultest.WithFile("/home/user/file1.txt", "local"), diff --git a/cmd/uplink/external_project.go b/cmd/uplink/external_project.go index f75b3cdf4..bbf666192 100644 --- a/cmd/uplink/external_project.go +++ b/cmd/uplink/external_project.go @@ -11,6 +11,7 @@ import ( "storj.io/storj/cmd/uplink/ulfs" "storj.io/uplink" privateAccess "storj.io/uplink/private/access" + "storj.io/uplink/private/testuplink" "storj.io/uplink/private/transport" ) @@ -48,5 +49,9 @@ func (ex *external) OpenProject(ctx context.Context, accessName string, options } } + if opts.ConcurrentSegmentUploadsConfig != (testuplink.ConcurrentSegmentUploadsConfig{}) { + ctx = testuplink.WithConcurrentSegmentUploadsConfig(ctx, opts.ConcurrentSegmentUploadsConfig) + } + return config.OpenProject(ctx, access) } diff --git a/cmd/uplink/ulext/external.go b/cmd/uplink/ulext/external.go index 39c1ddb80..cbbb815be 100644 --- a/cmd/uplink/ulext/external.go +++ b/cmd/uplink/ulext/external.go @@ -18,6 +18,7 @@ import ( "storj.io/common/rpc/rpcpool" "storj.io/storj/cmd/uplink/ulfs" "storj.io/uplink" + "storj.io/uplink/private/testuplink" ) // External is the interface for all of the ways that the uplink command may interact with @@ -42,8 +43,9 @@ type External interface { // Options contains all of the possible options for opening a filesystem or project. type Options struct { - EncryptionBypass bool - ConnectionPoolOptions rpcpool.Options + EncryptionBypass bool + ConnectionPoolOptions rpcpool.Options + ConcurrentSegmentUploadsConfig testuplink.ConcurrentSegmentUploadsConfig } // LoadOptions takes a slice of Option values and returns a filled out Options struct. @@ -69,6 +71,11 @@ func ConnectionPoolOptions(options rpcpool.Options) Option { return Option{apply: func(opt *Options) { opt.ConnectionPoolOptions = options }} } +// ConcurrentSegmentUploadsConfig will initialize the concurrent segment uploads config with config. +func ConcurrentSegmentUploadsConfig(config testuplink.ConcurrentSegmentUploadsConfig) Option { + return Option{apply: func(opt *Options) { opt.ConcurrentSegmentUploadsConfig = config }} +} + // RegisterAccess registers an access grant with a Gateway Authorization Service. func RegisterAccess(ctx context.Context, access *uplink.Access, authService string, public bool, timeout time.Duration) (accessKey, secretKey, endpoint string, err error) { if authService == "" { diff --git a/cmd/uplink/ulfs/filesystem.go b/cmd/uplink/ulfs/filesystem.go index 88796c3d0..e798f5e44 100644 --- a/cmd/uplink/ulfs/filesystem.go +++ b/cmd/uplink/ulfs/filesystem.go @@ -39,7 +39,7 @@ func (ro *RemoveOptions) isPending() bool { return ro != nil && ro.Pending } type Filesystem interface { Close() error Open(ctx context.Context, loc ulloc.Location) (MultiReadHandle, error) - Create(ctx context.Context, loc ulloc.Location, opts *CreateOptions) (MultiWriteHandle, error) + Create(ctx context.Context, loc ulloc.Location, opts *CreateOptions) (WriteHandle, error) Move(ctx context.Context, source, dest ulloc.Location) error Copy(ctx context.Context, source, dest ulloc.Location) error Remove(ctx context.Context, loc ulloc.Location, opts *RemoveOptions) error @@ -52,7 +52,7 @@ type Filesystem interface { type FilesystemLocal interface { IsLocalDir(ctx context.Context, path string) bool Open(ctx context.Context, path string) (MultiReadHandle, error) - Create(ctx context.Context, path string) (MultiWriteHandle, error) + Create(ctx context.Context, path string) (WriteHandle, error) Move(ctx context.Context, oldpath string, newpath string) error Copy(ctx context.Context, oldpath string, newpath string) error Remove(ctx context.Context, path string, opts *RemoveOptions) error @@ -64,7 +64,7 @@ type FilesystemLocal interface { type FilesystemRemote interface { Close() error Open(ctx context.Context, bucket, key string) (MultiReadHandle, error) - Create(ctx context.Context, bucket, key string, opts *CreateOptions) (MultiWriteHandle, error) + Create(ctx context.Context, bucket, key string, opts *CreateOptions) (WriteHandle, error) Move(ctx context.Context, oldbucket, oldkey string, newbucket, newkey string) error Copy(ctx context.Context, oldbucket, oldkey string, newbucket, newkey string) error Remove(ctx context.Context, bucket, key string, opts *RemoveOptions) error @@ -142,18 +142,6 @@ type ReadHandle interface { // write handles // -// MultiWriteHandle lets one create multiple sequential WriteHandles for -// different sections of something. -// -// The returned WriteHandle will error if data is attempted to be written -// past the provided length. A negative length implies an unknown amount -// of data, and future calls to NextPart will error. -type MultiWriteHandle interface { - NextPart(ctx context.Context, length int64) (WriteHandle, error) - Commit(ctx context.Context) error - Abort(ctx context.Context) error -} - // WriteHandle is anything that can be written to with commit/abort semantics. type WriteHandle interface { io.Writer diff --git a/cmd/uplink/ulfs/handle_file.go b/cmd/uplink/ulfs/handle_file.go index 555a5b6c1..d10d6e941 100644 --- a/cmd/uplink/ulfs/handle_file.go +++ b/cmd/uplink/ulfs/handle_file.go @@ -32,22 +32,37 @@ func newOSMultiReadHandle(fh LocalBackendFile) (MultiReadHandle, error) { // type fileGenericWriter struct { - fs LocalBackend - raw LocalBackendFile + fs LocalBackend + raw LocalBackendFile + done bool +} + +func (f *fileGenericWriter) Write(b []byte) (int, error) { return f.raw.Write(b) } + +func (f *fileGenericWriter) Commit() error { + if f.done { + return errs.New("already commit/aborted") + } + f.done = true + + return f.raw.Close() } -func (f *fileGenericWriter) WriteAt(b []byte, off int64) (int, error) { return f.raw.WriteAt(b, off) } -func (f *fileGenericWriter) Commit() error { return f.raw.Close() } func (f *fileGenericWriter) Abort() error { + if f.done { + return errs.New("already commit/aborted") + } + f.done = true + return errs.Combine( f.raw.Close(), f.fs.Remove(f.raw.Name()), ) } -func newOSMultiWriteHandle(fs LocalBackend, fh LocalBackendFile) MultiWriteHandle { - return NewGenericMultiWriteHandle(&fileGenericWriter{ +func newOSWriteHandle(fs LocalBackend, fh LocalBackendFile) WriteHandle { + return &fileGenericWriter{ fs: fs, raw: fh, - }) + } } diff --git a/cmd/uplink/ulfs/handle_generic.go b/cmd/uplink/ulfs/handle_generic.go index a85deb49c..5286da110 100644 --- a/cmd/uplink/ulfs/handle_generic.go +++ b/cmd/uplink/ulfs/handle_generic.go @@ -133,161 +133,3 @@ func (o *genericReadHandle) Read(p []byte) (int, error) { o.len -= int64(n) return n, err } - -// -// write handles -// - -// GenericWriter is an interface that can be turned into a GenericMultiWriteHandle. -type GenericWriter interface { - io.WriterAt - Commit() error - Abort() error -} - -// GenericMultiWriteHandle implements MultiWriteHandle for *os.Files. -type GenericMultiWriteHandle struct { - w GenericWriter - - mu sync.Mutex - off int64 - tail bool - done bool - abort bool -} - -// NewGenericMultiWriteHandle constructs an *GenericMultiWriteHandle from a GenericWriter. -func NewGenericMultiWriteHandle(w GenericWriter) *GenericMultiWriteHandle { - return &GenericMultiWriteHandle{ - w: w, - } -} - -func (o *GenericMultiWriteHandle) childAbort() { - o.mu.Lock() - defer o.mu.Unlock() - - if !o.done { - o.abort = true - } -} - -func (o *GenericMultiWriteHandle) status() (done, abort bool) { - o.mu.Lock() - defer o.mu.Unlock() - - return o.done, o.abort -} - -// NextPart returns a WriteHandle expecting length bytes to be written to it. -func (o *GenericMultiWriteHandle) NextPart(ctx context.Context, length int64) (WriteHandle, error) { - o.mu.Lock() - defer o.mu.Unlock() - - if o.done { - return nil, errs.New("already closed") - } else if o.tail { - return nil, errs.New("unable to make part after tail part") - } - - w := &genericWriteHandle{ - parent: o, - w: o.w, - off: o.off, - tail: length < 0, - len: length, - } - - if w.tail { - o.tail = true - } else { - o.off += length - } - - return w, nil -} - -// Commit commits the overall GenericMultiWriteHandle. It errors if -// any parts were aborted. -func (o *GenericMultiWriteHandle) Commit(ctx context.Context) error { - o.mu.Lock() - defer o.mu.Unlock() - - if o.done { - return nil - } - o.done = true - - if o.abort { - return errs.Combine( - errs.New("commit failed: not every child was committed"), - o.w.Abort(), - ) - } - - return o.w.Commit() -} - -// Abort aborts the overall GenericMultiWriteHandle. -func (o *GenericMultiWriteHandle) Abort(ctx context.Context) error { - o.mu.Lock() - defer o.mu.Unlock() - - if o.done { - return nil - } - o.done = true - o.abort = true - - return o.w.Abort() -} - -type genericWriteHandle struct { - parent *GenericMultiWriteHandle - w GenericWriter - done bool - off int64 - tail bool - len int64 -} - -func (o *genericWriteHandle) Write(p []byte) (int, error) { - if !o.tail { - if o.len <= 0 { - return 0, errs.New("write past maximum length") - } else if o.len < int64(len(p)) { - p = p[:o.len] - } - } - n, err := o.w.WriteAt(p, o.off) - o.off += int64(n) - if !o.tail { - o.len -= int64(n) - } - return n, err -} - -func (o *genericWriteHandle) Commit() error { - if o.done { - return nil - } - o.done = true - - done, abort := o.parent.status() - if abort { - return errs.New("commit failed: parent write handle aborted") - } else if done { - return errs.New("commit failed: parent write handle done") - } - return nil -} - -func (o *genericWriteHandle) Abort() error { - if o.done { - return nil - } - o.done = true - - o.parent.childAbort() - return nil -} diff --git a/cmd/uplink/ulfs/handle_std.go b/cmd/uplink/ulfs/handle_std.go index 2ddc9b522..26b260d0f 100644 --- a/cmd/uplink/ulfs/handle_std.go +++ b/cmd/uplink/ulfs/handle_std.go @@ -114,19 +114,21 @@ func (o *stdReadHandle) Read(p []byte) (int, error) { return 0, io.EOF } - if o.len < int64(len(p)) { + if o.len >= 0 && o.len < int64(len(p)) { p = p[:o.len] } n, err := o.stdin.Read(p) - o.len -= int64(n) + if o.len > 0 { + o.len -= int64(n) + } if err != nil && o.err == nil { o.err = err o.done.Release() } - if o.len <= 0 { + if o.len == 0 { o.closed = true o.done.Release() } @@ -138,143 +140,17 @@ func (o *stdReadHandle) Read(p []byte) (int, error) { // write handles // -// stdMultiWriteHandle implements MultiWriteHandle for stdouts. -type stdMultiWriteHandle struct { - stdout closableWriter - - mu sync.Mutex - next *sync.Mutex - tail bool - done bool -} - -func newStdMultiWriteHandle(stdout io.Writer) *stdMultiWriteHandle { - return &stdMultiWriteHandle{ - stdout: closableWriter{Writer: stdout}, - next: new(sync.Mutex), - } -} - -func (s *stdMultiWriteHandle) NextPart(ctx context.Context, length int64) (WriteHandle, error) { - s.mu.Lock() - defer s.mu.Unlock() - - if s.done { - return nil, errs.New("already closed") - } else if s.tail { - return nil, errs.New("unable to make part after tail part") - } - - next := new(sync.Mutex) - next.Lock() - - w := &stdWriteHandle{ - stdout: &s.stdout, - mu: s.next, - next: next, - tail: length < 0, - len: length, - } - - s.tail = w.tail - s.next = next - - return w, nil -} - -func (s *stdMultiWriteHandle) Commit(ctx context.Context) error { - s.mu.Lock() - defer s.mu.Unlock() - - s.done = true - return nil -} - -func (s *stdMultiWriteHandle) Abort(ctx context.Context) error { - s.mu.Lock() - defer s.mu.Unlock() - - s.done = true - return nil -} - // stdWriteHandle implements WriteHandle for stdouts. type stdWriteHandle struct { - stdout *closableWriter - mu *sync.Mutex - next *sync.Mutex - tail bool - len int64 + stdout io.Writer } -func (s *stdWriteHandle) unlockNext(err error) { - if s.next != nil { - if err != nil { - s.stdout.close(err) - } - s.next.Unlock() - s.next = nil +func newStdWriteHandle(stdout io.Writer) *stdWriteHandle { + return &stdWriteHandle{ + stdout: stdout, } } -func (s *stdWriteHandle) Write(p []byte) (int, error) { - s.mu.Lock() - defer s.mu.Unlock() - - if !s.tail { - if s.len <= 0 { - return 0, errs.New("write past maximum length") - } else if s.len < int64(len(p)) { - p = p[:s.len] - } - } - - n, err := s.stdout.Write(p) - - if !s.tail { - s.len -= int64(n) - if s.len == 0 { - s.unlockNext(err) - } - } - - return n, err -} - -func (s *stdWriteHandle) Commit() error { - s.mu.Lock() - defer s.mu.Unlock() - - s.len = 0 - s.unlockNext(nil) - - return nil -} - -func (s *stdWriteHandle) Abort() error { - s.mu.Lock() - defer s.mu.Unlock() - - s.len = 0 - s.unlockNext(context.Canceled) - - return nil -} - -type closableWriter struct { - io.Writer - err error -} - -func (out *closableWriter) Write(p []byte) (int, error) { - if out.err != nil { - return 0, out.err - } - n, err := out.Writer.Write(p) - out.err = err - return n, err -} - -func (out *closableWriter) close(err error) { - out.err = err -} +func (s *stdWriteHandle) Write(b []byte) (int, error) { return s.stdout.Write(b) } +func (s *stdWriteHandle) Commit() error { return nil } +func (s *stdWriteHandle) Abort() error { return nil } diff --git a/cmd/uplink/ulfs/handle_std_test.go b/cmd/uplink/ulfs/handle_std_test.go deleted file mode 100644 index 3fbfc5b8c..000000000 --- a/cmd/uplink/ulfs/handle_std_test.go +++ /dev/null @@ -1,101 +0,0 @@ -// Copyright (C) 2022 Storj Labs, Inc. -// See LICENSE for copying information. - -package ulfs - -import ( - "bytes" - "errors" - "sync/atomic" - "testing" - - "github.com/stretchr/testify/require" - - "storj.io/common/testcontext" - "storj.io/common/testrand" -) - -type writeThrottle struct { - entered chan struct{} - release chan error -} - -type throttledWriter struct { - writex int64 - write []writeThrottle - data bytes.Buffer -} - -func newThrottledWriter(maxWrites int) *throttledWriter { - tw := &throttledWriter{ - writex: 0, - write: make([]writeThrottle, maxWrites), - } - for i := range tw.write { - tw.write[i] = writeThrottle{ - entered: make(chan struct{}), - release: make(chan error, 1), - } - } - return tw -} - -func (tw *throttledWriter) Write(data []byte) (n int, _ error) { - index := atomic.AddInt64(&tw.writex, 1) - 1 - - close(tw.write[index].entered) - forceErr := <-tw.write[index].release - - n, writeErr := tw.data.Write(data) - if writeErr != nil { - return n, writeErr - } - - return n, forceErr -} - -func TestStdMultiWriteAbort(t *testing.T) { - ctx := testcontext.New(t) - - stdout := newThrottledWriter(2) - multi := newStdMultiWriteHandle(stdout) - - head := testrand.Bytes(256) - tail := testrand.Bytes(256) - - part1, err := multi.NextPart(ctx, 256) - require.NoError(t, err) - ctx.Go(func() error { - defer func() { _ = part1.Abort() }() - - _, err := part1.Write(head) - if err == nil { - return errors.New("expected an error") - } - return nil - }) - - part2, err := multi.NextPart(ctx, 256) - require.NoError(t, err) - ctx.Go(func() error { - defer func() { _ = part2.Commit() }() - - // wait for the above part to enter write first - <-stdout.write[0].entered - _, err := part2.Write(tail) - if err == nil { - return errors.New("expected an error") - } - return nil - }) - - // wait until we enter both writes - <-stdout.write[0].entered - - stdout.write[0].release <- errors.New("fail 0") - stdout.write[1].release <- nil - - ctx.Wait() - - require.Equal(t, head, stdout.data.Bytes()) -} diff --git a/cmd/uplink/ulfs/handle_uplink.go b/cmd/uplink/ulfs/handle_uplink.go index 328c4e64d..a68314b72 100644 --- a/cmd/uplink/ulfs/handle_uplink.go +++ b/cmd/uplink/ulfs/handle_uplink.go @@ -166,127 +166,16 @@ func (u *uplinkReadHandle) Info() ObjectInfo { return *u.info } // write handles // -type uplinkMultiWriteHandle struct { - project *uplink.Project - bucket string - info uplink.UploadInfo - metadata uplink.CustomMetadata - - mu sync.Mutex - tail bool - part uint32 - commitErr *error - abortErr *error -} - -func newUplinkMultiWriteHandle(project *uplink.Project, bucket string, info uplink.UploadInfo, metadata uplink.CustomMetadata) *uplinkMultiWriteHandle { - return &uplinkMultiWriteHandle{ - project: project, - bucket: bucket, - info: info, - metadata: metadata, - } -} - -func (u *uplinkMultiWriteHandle) NextPart(ctx context.Context, length int64) (WriteHandle, error) { - part, err := func() (uint32, error) { - u.mu.Lock() - defer u.mu.Unlock() - - switch { - case u.abortErr != nil: - return 0, errs.New("cannot make part after multipart write has been aborted") - case u.commitErr != nil: - return 0, errs.New("cannot make part after multipart write has been committed") - } - - if u.tail { - return 0, errs.New("unable to make part after tail part") - } - u.tail = length < 0 - - u.part++ - return u.part, nil - }() - if err != nil { - return nil, err - } - - ul, err := u.project.UploadPart(ctx, u.bucket, u.info.Key, u.info.UploadID, part) - if err != nil { - return nil, err - } - - return &uplinkWriteHandle{ - ul: ul, - tail: length < 0, - len: length, - }, nil -} - -func (u *uplinkMultiWriteHandle) Commit(ctx context.Context) error { - u.mu.Lock() - defer u.mu.Unlock() - - switch { - case u.abortErr != nil: - return errs.New("cannot commit an aborted multipart write") - case u.commitErr != nil: - return *u.commitErr - } - - _, err := u.project.CommitUpload(ctx, u.bucket, u.info.Key, u.info.UploadID, &uplink.CommitUploadOptions{ - CustomMetadata: u.metadata, - }) - u.commitErr = &err - return err -} - -func (u *uplinkMultiWriteHandle) Abort(ctx context.Context) error { - u.mu.Lock() - defer u.mu.Unlock() - - switch { - case u.abortErr != nil: - return *u.abortErr - case u.commitErr != nil: - return errs.New("cannot abort a committed multipart write") - } - - err := u.project.AbortUpload(ctx, u.bucket, u.info.Key, u.info.UploadID) - u.abortErr = &err - return err -} - -// uplinkWriteHandle implements writeHandle for *uplink.Uploads. type uplinkWriteHandle struct { - ul *uplink.PartUpload - tail bool - len int64 + upload *uplink.Upload } -func (u *uplinkWriteHandle) Write(p []byte) (int, error) { - if !u.tail { - if u.len <= 0 { - return 0, errs.New("write past maximum length") - } else if u.len < int64(len(p)) { - p = p[:u.len] - } +func newUplinkWriteHandle(upload *uplink.Upload) *uplinkWriteHandle { + return &uplinkWriteHandle{ + upload: upload, } - - n, err := u.ul.Write(p) - - if !u.tail { - u.len -= int64(n) - } - - return n, err } -func (u *uplinkWriteHandle) Commit() error { - return u.ul.Commit() -} - -func (u *uplinkWriteHandle) Abort() error { - return u.ul.Abort() -} +func (u *uplinkWriteHandle) Write(b []byte) (int, error) { return u.upload.Write(b) } +func (u *uplinkWriteHandle) Commit() error { return u.upload.Commit() } +func (u *uplinkWriteHandle) Abort() error { return u.upload.Abort() } diff --git a/cmd/uplink/ulfs/local.go b/cmd/uplink/ulfs/local.go index d8c2fcee4..9720b9608 100644 --- a/cmd/uplink/ulfs/local.go +++ b/cmd/uplink/ulfs/local.go @@ -19,8 +19,8 @@ import ( // LocalBackendFile represents a file in the filesystem. type LocalBackendFile interface { io.Closer + io.Writer io.ReaderAt - io.WriterAt Name() string Stat() (os.FileInfo, error) Readdir(int) ([]os.FileInfo, error) @@ -58,7 +58,7 @@ func (l *Local) Open(ctx context.Context, path string) (MultiReadHandle, error) } // Create makes any directories necessary to create a file at path and returns a WriteHandle. -func (l *Local) Create(ctx context.Context, path string) (MultiWriteHandle, error) { +func (l *Local) Create(ctx context.Context, path string) (WriteHandle, error) { fi, err := l.fs.Stat(path) if err != nil && !os.IsNotExist(err) { return nil, errs.Wrap(err) @@ -75,7 +75,7 @@ func (l *Local) Create(ctx context.Context, path string) (MultiWriteHandle, erro if err != nil { return nil, errs.Wrap(err) } - return newOSMultiWriteHandle(l.fs, fh), nil + return newOSWriteHandle(l.fs, fh), nil } // Move moves file to provided path. diff --git a/cmd/uplink/ulfs/local_backend_mem.go b/cmd/uplink/ulfs/local_backend_mem.go index 9f7a24911..a020619ae 100644 --- a/cmd/uplink/ulfs/local_backend_mem.go +++ b/cmd/uplink/ulfs/local_backend_mem.go @@ -200,11 +200,9 @@ func (mf *memFile) ReadAt(p []byte, off int64) (int, error) { return copy(p, mf.buf[off:]), nil } -func (mf *memFile) WriteAt(p []byte, off int64) (int, error) { - if delta := (off + int64(len(p))) - int64(len(mf.buf)); delta > 0 { - mf.buf = append(mf.buf, make([]byte, delta)...) - } - return copy(mf.buf[off:], p), nil +func (mf *memFile) Write(p []byte) (int, error) { + mf.buf = append(mf.buf, p...) + return len(p), nil } func (mf *memFile) Stat() (os.FileInfo, error) { @@ -256,7 +254,7 @@ func (md *memDir) ReadAt(p []byte, off int64) (int, error) { return 0, errs.New("readat on directory") } -func (md *memDir) WriteAt(p []byte, off int64) (int, error) { +func (md *memDir) Write(p []byte) (int, error) { return 0, errs.New("writeat on directory") } diff --git a/cmd/uplink/ulfs/mixed.go b/cmd/uplink/ulfs/mixed.go index e67c7ffa7..9b77e9b17 100644 --- a/cmd/uplink/ulfs/mixed.go +++ b/cmd/uplink/ulfs/mixed.go @@ -42,13 +42,13 @@ func (m *Mixed) Open(ctx context.Context, loc ulloc.Location) (MultiReadHandle, } // Create returns a WriteHandle to either a local file, remote object, or stdout. -func (m *Mixed) Create(ctx context.Context, loc ulloc.Location, opts *CreateOptions) (MultiWriteHandle, error) { +func (m *Mixed) Create(ctx context.Context, loc ulloc.Location, opts *CreateOptions) (WriteHandle, error) { if bucket, key, ok := loc.RemoteParts(); ok { return m.remote.Create(ctx, bucket, key, opts) } else if path, ok := loc.LocalParts(); ok { return m.local.Create(ctx, path) } - return newStdMultiWriteHandle(clingy.Stdout(ctx)), nil + return newStdWriteHandle(clingy.Stdout(ctx)), nil } // Move moves either a local file or remote object. diff --git a/cmd/uplink/ulfs/remote.go b/cmd/uplink/ulfs/remote.go index 5a32b37f3..1405f6d1d 100644 --- a/cmd/uplink/ulfs/remote.go +++ b/cmd/uplink/ulfs/remote.go @@ -46,23 +46,22 @@ func (r *Remote) Stat(ctx context.Context, bucket, key string) (*ObjectInfo, err } // Create returns a MultiWriteHandle for the object identified by a given bucket and key. -func (r *Remote) Create(ctx context.Context, bucket, key string, opts *CreateOptions) (MultiWriteHandle, error) { - var customMetadata uplink.CustomMetadata - if opts.Metadata != nil { - customMetadata = uplink.CustomMetadata(opts.Metadata) - - if err := customMetadata.Verify(); err != nil { - return nil, err - } - } - - info, err := r.project.BeginUpload(ctx, bucket, key, &uplink.UploadOptions{ +func (r *Remote) Create(ctx context.Context, bucket, key string, opts *CreateOptions) (WriteHandle, error) { + upload, err := r.project.UploadObject(ctx, bucket, key, &uplink.UploadOptions{ Expires: opts.Expires, }) if err != nil { return nil, err } - return newUplinkMultiWriteHandle(r.project, bucket, info, customMetadata), nil + + if opts.Metadata != nil { + if err := upload.SetCustomMetadata(ctx, uplink.CustomMetadata(opts.Metadata)); err != nil { + _ = upload.Abort() + return nil, err + } + } + + return newUplinkWriteHandle(upload), nil } // Move moves object to provided key and bucket. diff --git a/cmd/uplink/ultest/filesystem.go b/cmd/uplink/ultest/filesystem.go index 03acdb6ee..360b6b44f 100644 --- a/cmd/uplink/ultest/filesystem.go +++ b/cmd/uplink/ultest/filesystem.go @@ -112,7 +112,7 @@ func (rfs *remoteFilesystem) Open(ctx context.Context, bucket, key string) (ulfs return newMultiReadHandle(mf.contents), nil } -func (rfs *remoteFilesystem) Create(ctx context.Context, bucket, key string, opts *ulfs.CreateOptions) (_ ulfs.MultiWriteHandle, err error) { +func (rfs *remoteFilesystem) Create(ctx context.Context, bucket, key string, opts *ulfs.CreateOptions) (_ ulfs.WriteHandle, err error) { rfs.mu.Lock() defer rfs.mu.Unlock() @@ -140,7 +140,7 @@ func (rfs *remoteFilesystem) Create(ctx context.Context, bucket, key string, opt rfs.pending[loc] = append(rfs.pending[loc], wh) - return ulfs.NewGenericMultiWriteHandle(wh), nil + return wh, nil } func (rfs *remoteFilesystem) Move(ctx context.Context, oldbucket, oldkey string, newbucket, newkey string) error { @@ -282,15 +282,12 @@ type memWriteHandle struct { done bool } -func (b *memWriteHandle) WriteAt(p []byte, off int64) (int, error) { +func (b *memWriteHandle) Write(p []byte) (int, error) { if b.done { return 0, errs.New("write to closed handle") } - end := int64(len(p)) + off - if grow := end - int64(len(b.buf)); grow > 0 { - b.buf = append(b.buf, make([]byte, grow)...) - } - return copy(b.buf[off:], p), nil + b.buf = append(b.buf, p...) + return len(p), nil } func (b *memWriteHandle) Commit() error { diff --git a/cmd/uplink/ultest/setup.go b/cmd/uplink/ultest/setup.go index 6ba68caf2..376fa5788 100644 --- a/cmd/uplink/ultest/setup.go +++ b/cmd/uplink/ultest/setup.go @@ -199,11 +199,7 @@ func WithFile(location string, contents ...string) ExecuteOption { cs.rfs.ensureBucket(bucket) } - mwh, err := cs.fs.Create(ctx, loc, nil) - require.NoError(t, err) - defer func() { _ = mwh.Abort(ctx) }() - - wh, err := mwh.NextPart(ctx, -1) + wh, err := cs.fs.Create(ctx, loc, nil) require.NoError(t, err) defer func() { _ = wh.Abort() }() @@ -217,7 +213,6 @@ func WithFile(location string, contents ...string) ExecuteOption { } require.NoError(t, wh.Commit()) - require.NoError(t, mwh.Commit(ctx)) }} } diff --git a/go.mod b/go.mod index a43db6a4c..78bb8d441 100644 --- a/go.mod +++ b/go.mod @@ -43,7 +43,7 @@ require ( github.com/vivint/infectious v0.0.0-20200605153912-25a574ae18a3 github.com/zeebo/assert v1.3.1 github.com/zeebo/blake3 v0.2.3 - github.com/zeebo/clingy v0.0.0-20220926155919-717640cb8ccd + github.com/zeebo/clingy v0.0.0-20230301225531-f2d4117c8e8c github.com/zeebo/errs v1.3.0 github.com/zeebo/ini v0.0.0-20210514163846-cc8fbd8d9599 github.com/zyedidia/generic v1.2.1 diff --git a/go.sum b/go.sum index 335731c23..68be3eb9c 100644 --- a/go.sum +++ b/go.sum @@ -605,8 +605,8 @@ github.com/zeebo/assert v1.3.1 h1:vukIABvugfNMZMQO1ABsyQDJDTVQbn+LWSMy1ol1h6A= github.com/zeebo/assert v1.3.1/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0= github.com/zeebo/blake3 v0.2.3 h1:TFoLXsjeXqRNFxSbk35Dk4YtszE/MQQGK10BH4ptoTg= github.com/zeebo/blake3 v0.2.3/go.mod h1:mjJjZpnsyIVtVgTOSpJ9vmRE4wgDeyt2HU3qXvvKCaQ= -github.com/zeebo/clingy v0.0.0-20220926155919-717640cb8ccd h1:gDDoyNwLC/yJbK2AXiJplnGb1HUvOYJm9EWt8FyZJY0= -github.com/zeebo/clingy v0.0.0-20220926155919-717640cb8ccd/go.mod h1:MHEhXvEfewflU7SSVKHI7nkdU+fpyxZ5XPPzj+5gYNw= +github.com/zeebo/clingy v0.0.0-20230301225531-f2d4117c8e8c h1:bE9vXPFKa9wkCCq1HJi2Ms4pWuBoIKQEMe6CZzu/TKE= +github.com/zeebo/clingy v0.0.0-20230301225531-f2d4117c8e8c/go.mod h1:MHEhXvEfewflU7SSVKHI7nkdU+fpyxZ5XPPzj+5gYNw= github.com/zeebo/errs v1.1.1/go.mod h1:Yj8dHrUQwls1bF3dr/vcSIu+qf4mI7idnTcHfoACc6I= github.com/zeebo/errs v1.2.2/go.mod h1:sgbWHsvVuTPHcqJJGQ1WhI5KbWlHYz+2+2C/LSEtCw4= github.com/zeebo/errs v1.3.0 h1:hmiaKqgYZzcVgRL1Vkc1Mn2914BbzB0IBxs+ebeutGs=