cmd/uplink: only use new code path for uploads

downloads still need the old copy code because they aren't
parallel in the same way uploads are. revert all the code
that removed the parallel copy, only use the non-parallel
copy for uploads, and add back the parallelism and chunk
size flags and have them set the maximum concurrent pieces
flags to values based on each other when only one is set
for backwards compatibility.

mostly reverts 54ef1c8ca2

Change-Id: I8b5f62bf18a6548fa60865c6c61b5f34fbcec14c
This commit is contained in:
Jeff Wendling 2023-06-08 11:42:03 -04:00 committed by Storj Robot
parent f9c076abcf
commit f3c58174c4
16 changed files with 845 additions and 93 deletions

View File

@ -6,6 +6,7 @@ package main
import ( import (
"bufio" "bufio"
"context" "context"
"errors"
"fmt" "fmt"
"io" "io"
"os" "os"
@ -20,6 +21,7 @@ import (
"github.com/zeebo/clingy" "github.com/zeebo/clingy"
"github.com/zeebo/errs" "github.com/zeebo/errs"
"storj.io/common/context2"
"storj.io/common/fpath" "storj.io/common/fpath"
"storj.io/common/memory" "storj.io/common/memory"
"storj.io/common/rpc/rpcpool" "storj.io/common/rpc/rpcpool"
@ -53,6 +55,8 @@ type cmdCp struct {
locs []ulloc.Location locs []ulloc.Location
} }
const maxPartCount int64 = 10000
func newCmdCp(ex ulext.External) *cmdCp { func newCmdCp(ex ulext.External) *cmdCp {
return &cmdCp{ex: ex} return &cmdCp{ex: ex}
} }
@ -81,8 +85,8 @@ func (c *cmdCp) Setup(params clingy.Parameters) {
).(bool) ).(bool)
c.byteRange = params.Flag("range", "Downloads the specified range bytes of an object. For more information about the HTTP Range header, see https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35", "").(string) c.byteRange = params.Flag("range", "Downloads the specified range bytes of an object. For more information about the HTTP Range header, see https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35", "").(string)
c.parallelism = params.Flag("parallelism", "Deprecated", 1, parallelism := params.Flag("parallelism", "Controls how many parallel chunks to upload/download from a file", nil,
clingy.Hidden, clingy.Optional,
clingy.Short('p'), clingy.Short('p'),
clingy.Transform(strconv.Atoi), clingy.Transform(strconv.Atoi),
clingy.Transform(func(n int) (int, error) { clingy.Transform(func(n int) (int, error) {
@ -91,9 +95,8 @@ func (c *cmdCp) Setup(params clingy.Parameters) {
} }
return n, nil return n, nil
}), }),
).(int) ).(*int)
c.parallelismChunkSize = params.Flag("parallelism-chunk-size", "Deprecated", memory.Size(0), c.parallelismChunkSize = params.Flag("parallelism-chunk-size", "Set the size of the chunks for parallelism, 0 means automatic adjustment", memory.Size(0),
clingy.Hidden,
clingy.Transform(memory.ParseString), clingy.Transform(memory.ParseString),
clingy.Transform(func(n int64) (memory.Size, error) { clingy.Transform(func(n int64) (memory.Size, error) {
if n < 0 { if n < 0 {
@ -104,13 +107,14 @@ func (c *cmdCp) Setup(params clingy.Parameters) {
).(memory.Size) ).(memory.Size)
c.uploadConfig = testuplink.DefaultConcurrentSegmentUploadsConfig() c.uploadConfig = testuplink.DefaultConcurrentSegmentUploadsConfig()
c.uploadConfig.SchedulerOptions.MaximumConcurrent = params.Flag( maxConcurrent := params.Flag(
"maximum-concurrent-pieces", "maximum-concurrent-pieces",
"Maximum concurrent pieces to upload at once per transfer", "Maximum concurrent pieces to upload at once per transfer",
c.uploadConfig.SchedulerOptions.MaximumConcurrent, nil,
clingy.Optional,
clingy.Transform(strconv.Atoi), clingy.Transform(strconv.Atoi),
clingy.Advanced, clingy.Advanced,
).(int) ).(*int)
c.uploadConfig.SchedulerOptions.MaximumConcurrentHandles = params.Flag( c.uploadConfig.SchedulerOptions.MaximumConcurrentHandles = params.Flag(
"maximum-concurrent-segments", "maximum-concurrent-segments",
"Maximum concurrent segments to upload at once per transfer", "Maximum concurrent segments to upload at once per transfer",
@ -129,6 +133,28 @@ func (c *cmdCp) Setup(params clingy.Parameters) {
clingy.Advanced, clingy.Advanced,
).(string) ).(string)
{ // handle backwards compatibility around parallelism and maximum concurrent pieces
addr := func(x int) *int { return &x }
switch {
// if neither are actively set, use defaults
case parallelism == nil && maxConcurrent == nil:
parallelism = addr(1)
maxConcurrent = addr(c.uploadConfig.SchedulerOptions.MaximumConcurrent)
// if parallelism is not set, use a value based on maxConcurrent
case parallelism == nil:
parallelism = addr((*maxConcurrent + 99) / 100)
// if maxConcurrent is not set, use a value based on parallelism
case maxConcurrent == nil:
maxConcurrent = addr(100 * *parallelism)
}
c.uploadConfig.SchedulerOptions.MaximumConcurrent = *maxConcurrent
c.parallelism = *parallelism
}
c.inmemoryEC = params.Flag("inmemory-erasure-coding", "Keep erasure-coded pieces in-memory instead of writing them on the disk during upload", false, c.inmemoryEC = params.Flag("inmemory-erasure-coding", "Keep erasure-coded pieces in-memory instead of writing them on the disk during upload", false,
clingy.Transform(strconv.ParseBool), clingy.Transform(strconv.ParseBool),
clingy.Boolean, clingy.Boolean,
@ -384,24 +410,57 @@ func (c *cmdCp) copyFile(ctx context.Context, fs ulfs.Filesystem, source, dest u
} }
defer func() { _ = mrh.Close() }() defer func() { _ = mrh.Close() }()
wh, err := fs.Create(ctx, dest, &ulfs.CreateOptions{ mwh, err := fs.Create(ctx, dest, &ulfs.CreateOptions{
Expires: c.expires, Expires: c.expires,
Metadata: c.metadata, Metadata: c.metadata,
}) })
if err != nil { if err != nil {
return err return err
} }
defer func() { _ = wh.Abort() }() defer func() { _ = mwh.Abort(ctx) }()
return errs.Wrap(c.copy( // if we're uploading, do a single part of maximum size
if dest.Remote() {
return errs.Wrap(c.singleCopy(
ctx,
source, dest,
mrh, mwh,
offset, length,
bar,
))
}
partSize, err := c.calculatePartSize(mrh.Length(), c.parallelismChunkSize.Int64())
if err != nil {
return err
}
return errs.Wrap(c.parallelCopy(
ctx, ctx,
source, dest, source, dest,
wh, mrh, mrh, mwh,
c.parallelism, partSize,
offset, length, offset, length,
bar, bar,
)) ))
} }
// calculatePartSize returns the needed part size in order to upload the file with size of 'length'.
// It hereby respects if the client requests/prefers a certain size and only increases if needed.
func (c *cmdCp) calculatePartSize(length, preferredSize int64) (requiredSize int64, err error) {
segC := (length / maxPartCount / (memory.MiB * 64).Int64()) + 1
requiredSize = segC * (memory.MiB * 64).Int64()
switch {
case preferredSize == 0:
return requiredSize, nil
case requiredSize <= preferredSize:
return preferredSize, nil
default:
return 0, errs.New(fmt.Sprintf("the specified chunk size %s is too small, requires %s or larger",
memory.FormatBytes(preferredSize), memory.FormatBytes(requiredSize)))
}
}
func copyVerbing(source, dest ulloc.Location) (verb string) { func copyVerbing(source, dest ulloc.Location) (verb string) {
return copyVerb(source, dest) + "ing" return copyVerb(source, dest) + "ing"
} }
@ -429,11 +488,142 @@ func joinDestWith(dest ulloc.Location, suffix string) ulloc.Location {
return dest return dest
} }
func (c *cmdCp) copy( func (c *cmdCp) parallelCopy(
ctx context.Context, ctx context.Context,
source, dest ulloc.Location, source, dest ulloc.Location,
dst ulfs.WriteHandle,
src ulfs.MultiReadHandle, src ulfs.MultiReadHandle,
dst ulfs.MultiWriteHandle,
p int, chunkSize int64,
offset, length int64,
bar *mpb.Bar) error {
if offset != 0 {
if err := src.SetOffset(offset); err != nil {
return err
}
}
var (
limiter = sync2.NewLimiter(p)
es errs.Group
mu sync.Mutex
)
ctx, cancel := context.WithCancel(ctx)
defer func() { _ = src.Close() }()
defer func() {
nocancel := context2.WithoutCancellation(ctx)
timedctx, cancel := context.WithTimeout(nocancel, 5*time.Second)
defer cancel()
_ = dst.Abort(timedctx)
}()
defer cancel()
addError := func(err error) {
if err == nil {
return
}
mu.Lock()
defer mu.Unlock()
es.Add(err)
// abort all other concurrenty copies
cancel()
}
var readBufs *ulfs.BytesPool
if p > 1 && chunkSize > 0 && (source.Std() || dest.Std()) {
// Create the read buffer pool only for uploads from stdin and downloads to stdout with parallelism > 1.
readBufs = ulfs.NewBytesPool(int(chunkSize))
}
for i := 0; length != 0; i++ {
i := i
chunk := chunkSize
if length > 0 && chunkSize > length {
chunk = length
}
length -= chunk
rh, err := src.NextPart(ctx, chunk)
if err != nil {
if !errors.Is(err, io.EOF) {
addError(errs.New("error getting reader for part %d: %v", i, err))
}
break
}
wh, err := dst.NextPart(ctx, chunk)
if err != nil {
_ = rh.Close()
addError(errs.New("error getting writer for part %d: %v", i, err))
break
}
ok := limiter.Go(ctx, func() {
defer func() { _ = rh.Close() }()
defer func() { _ = wh.Abort() }()
if readBufs != nil {
buf := readBufs.Get()
defer readBufs.Put(buf)
rh = ulfs.NewBufferedReadHandle(ctx, rh, buf)
}
var w io.Writer = wh
if bar != nil {
bar.SetTotal(rh.Info().ContentLength, false)
bar.EnableTriggerComplete()
pw := bar.ProxyWriter(w)
defer func() {
_ = pw.Close()
}()
w = pw
}
_, err := sync2.Copy(ctx, w, rh)
if err == nil {
err = wh.Commit()
}
if err != nil {
// TODO: it would be also nice to use wh.Abort and rh.Close directly
// to avoid some of the waiting that's caused by sync2.Copy.
//
// However, some of the source / destination implementations don't seem
// to have concurrent safe API with that regards.
//
// Also, we may want to check that it actually helps, before implementing it.
addError(errs.New("failed to %s part %d: %v", copyVerb(source, dest), i, err))
}
})
if !ok {
break
}
}
limiter.Wait()
// don't try to commit if any error occur
if len(es) == 0 {
es.Add(dst.Commit(ctx))
}
return errs.Wrap(combineErrs(es))
}
func (c *cmdCp) singleCopy(
ctx context.Context,
source, dest ulloc.Location,
src ulfs.MultiReadHandle,
dst ulfs.MultiWriteHandle,
offset, length int64, offset, length int64,
bar *mpb.Bar) error { bar *mpb.Bar) error {
@ -446,16 +636,19 @@ func (c *cmdCp) copy(
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
defer func() { _ = src.Close() }()
defer func() { _ = dst.Abort() }()
rh, err := src.NextPart(ctx, length) rh, err := src.NextPart(ctx, length)
if err != nil { if err != nil {
return errs.Wrap(err) return errs.Wrap(err)
} }
defer func() { _ = rh.Close() }() defer func() { _ = rh.Close() }()
var w io.Writer = dst wh, err := dst.NextPart(ctx, length)
if err != nil {
return errs.Wrap(err)
}
defer func() { _ = wh.Abort() }()
var w io.Writer = wh
if bar != nil { if bar != nil {
bar.SetTotal(rh.Info().ContentLength, false) bar.SetTotal(rh.Info().ContentLength, false)
bar.EnableTriggerComplete() bar.EnableTriggerComplete()
@ -464,12 +657,19 @@ func (c *cmdCp) copy(
w = pw w = pw
} }
_, err = sync2.Copy(ctx, w, rh) if _, err := sync2.Copy(ctx, w, rh); err != nil {
if err == nil { return errs.Wrap(err)
err = dst.Commit()
} }
return errs.Wrap(err) if err := wh.Commit(); err != nil {
return errs.Wrap(err)
}
if err := dst.Commit(ctx); err != nil {
return errs.Wrap(err)
}
return nil
} }
func newProgressBar(progress *mpb.Progress, name string, which, total int) *mpb.Bar { func newProgressBar(progress *mpb.Progress, name string, which, total int) *mpb.Bar {

View File

@ -6,6 +6,9 @@ package main
import ( import (
"testing" "testing"
"github.com/stretchr/testify/require"
"storj.io/common/memory"
"storj.io/storj/cmd/uplink/ultest" "storj.io/storj/cmd/uplink/ultest"
) )
@ -93,6 +96,51 @@ func TestCpDownload(t *testing.T) {
}) })
} }
func TestCpPartSize(t *testing.T) {
c := newCmdCp(nil)
// 1GiB file, should return 64MiB
partSize, err := c.calculatePartSize(memory.GiB.Int64(), c.parallelismChunkSize.Int64())
require.NoError(t, err)
require.EqualValues(t, memory.MiB*64, partSize)
// 640 GB file, should return 64MiB.
partSize, err = c.calculatePartSize(memory.GB.Int64()*640, c.parallelismChunkSize.Int64())
require.NoError(t, err)
require.EqualValues(t, memory.MiB*64, partSize)
// 640GiB file, should return 128MiB.
partSize, err = c.calculatePartSize(memory.GiB.Int64()*640, c.parallelismChunkSize.Int64())
require.NoError(t, err)
require.EqualValues(t, memory.MiB*128, partSize)
// 1TiB file, should return 128MiB.
partSize, err = c.calculatePartSize(memory.TiB.Int64(), c.parallelismChunkSize.Int64())
require.NoError(t, err)
require.EqualValues(t, memory.MiB*128, partSize)
// 1.3TiB file, should return 192MiB.
partSize, err = c.calculatePartSize(memory.GiB.Int64()*1300, c.parallelismChunkSize.Int64())
require.NoError(t, err)
require.EqualValues(t, memory.MiB*192, partSize)
// should return 1GiB as requested.
partSize, err = c.calculatePartSize(memory.GiB.Int64()*1300, memory.GiB.Int64())
require.NoError(t, err)
require.EqualValues(t, memory.GiB, partSize)
// should return 192 MiB and error, since preferred is too low.
partSize, err = c.calculatePartSize(memory.GiB.Int64()*1300, memory.MiB.Int64())
require.Error(t, err)
require.Equal(t, "the specified chunk size 1.0 MiB is too small, requires 192.0 MiB or larger", err.Error())
require.Zero(t, partSize)
// negative length should return 64MiB part size
partSize, err = c.calculatePartSize(-1, c.parallelismChunkSize.Int64())
require.NoError(t, err)
require.EqualValues(t, memory.MiB*64, partSize)
}
func TestCpUpload(t *testing.T) { func TestCpUpload(t *testing.T) {
state := ultest.Setup(commands, state := ultest.Setup(commands,
ultest.WithFile("/home/user/file1.txt", "local"), ultest.WithFile("/home/user/file1.txt", "local"),

View File

@ -39,7 +39,7 @@ func (ro *RemoveOptions) isPending() bool { return ro != nil && ro.Pending }
type Filesystem interface { type Filesystem interface {
Close() error Close() error
Open(ctx context.Context, loc ulloc.Location) (MultiReadHandle, error) Open(ctx context.Context, loc ulloc.Location) (MultiReadHandle, error)
Create(ctx context.Context, loc ulloc.Location, opts *CreateOptions) (WriteHandle, error) Create(ctx context.Context, loc ulloc.Location, opts *CreateOptions) (MultiWriteHandle, error)
Move(ctx context.Context, source, dest ulloc.Location) error Move(ctx context.Context, source, dest ulloc.Location) error
Copy(ctx context.Context, source, dest ulloc.Location) error Copy(ctx context.Context, source, dest ulloc.Location) error
Remove(ctx context.Context, loc ulloc.Location, opts *RemoveOptions) error Remove(ctx context.Context, loc ulloc.Location, opts *RemoveOptions) error
@ -52,7 +52,7 @@ type Filesystem interface {
type FilesystemLocal interface { type FilesystemLocal interface {
IsLocalDir(ctx context.Context, path string) bool IsLocalDir(ctx context.Context, path string) bool
Open(ctx context.Context, path string) (MultiReadHandle, error) Open(ctx context.Context, path string) (MultiReadHandle, error)
Create(ctx context.Context, path string) (WriteHandle, error) Create(ctx context.Context, path string) (MultiWriteHandle, error)
Move(ctx context.Context, oldpath string, newpath string) error Move(ctx context.Context, oldpath string, newpath string) error
Copy(ctx context.Context, oldpath string, newpath string) error Copy(ctx context.Context, oldpath string, newpath string) error
Remove(ctx context.Context, path string, opts *RemoveOptions) error Remove(ctx context.Context, path string, opts *RemoveOptions) error
@ -64,7 +64,7 @@ type FilesystemLocal interface {
type FilesystemRemote interface { type FilesystemRemote interface {
Close() error Close() error
Open(ctx context.Context, bucket, key string) (MultiReadHandle, error) Open(ctx context.Context, bucket, key string) (MultiReadHandle, error)
Create(ctx context.Context, bucket, key string, opts *CreateOptions) (WriteHandle, error) Create(ctx context.Context, bucket, key string, opts *CreateOptions) (MultiWriteHandle, error)
Move(ctx context.Context, oldbucket, oldkey string, newbucket, newkey string) error Move(ctx context.Context, oldbucket, oldkey string, newbucket, newkey string) error
Copy(ctx context.Context, oldbucket, oldkey string, newbucket, newkey string) error Copy(ctx context.Context, oldbucket, oldkey string, newbucket, newkey string) error
Remove(ctx context.Context, bucket, key string, opts *RemoveOptions) error Remove(ctx context.Context, bucket, key string, opts *RemoveOptions) error
@ -142,6 +142,18 @@ type ReadHandle interface {
// write handles // write handles
// //
// MultiWriteHandle lets one create multiple sequential WriteHandles for
// different sections of something.
//
// The returned WriteHandle will error if data is attempted to be written
// past the provided length. A negative length implies an unknown amount
// of data, and future calls to NextPart will error.
type MultiWriteHandle interface {
NextPart(ctx context.Context, length int64) (WriteHandle, error)
Commit(ctx context.Context) error
Abort(ctx context.Context) error
}
// WriteHandle is anything that can be written to with commit/abort semantics. // WriteHandle is anything that can be written to with commit/abort semantics.
type WriteHandle interface { type WriteHandle interface {
io.Writer io.Writer

View File

@ -32,37 +32,22 @@ func newOSMultiReadHandle(fh LocalBackendFile) (MultiReadHandle, error) {
// //
type fileGenericWriter struct { type fileGenericWriter struct {
fs LocalBackend fs LocalBackend
raw LocalBackendFile raw LocalBackendFile
done bool
}
func (f *fileGenericWriter) Write(b []byte) (int, error) { return f.raw.Write(b) }
func (f *fileGenericWriter) Commit() error {
if f.done {
return errs.New("already commit/aborted")
}
f.done = true
return f.raw.Close()
} }
func (f *fileGenericWriter) WriteAt(b []byte, off int64) (int, error) { return f.raw.WriteAt(b, off) }
func (f *fileGenericWriter) Commit() error { return f.raw.Close() }
func (f *fileGenericWriter) Abort() error { func (f *fileGenericWriter) Abort() error {
if f.done {
return errs.New("already commit/aborted")
}
f.done = true
return errs.Combine( return errs.Combine(
f.raw.Close(), f.raw.Close(),
f.fs.Remove(f.raw.Name()), f.fs.Remove(f.raw.Name()),
) )
} }
func newOSWriteHandle(fs LocalBackend, fh LocalBackendFile) WriteHandle { func newOSMultiWriteHandle(fs LocalBackend, fh LocalBackendFile) MultiWriteHandle {
return &fileGenericWriter{ return NewGenericMultiWriteHandle(&fileGenericWriter{
fs: fs, fs: fs,
raw: fh, raw: fh,
} })
} }

View File

@ -133,3 +133,161 @@ func (o *genericReadHandle) Read(p []byte) (int, error) {
o.len -= int64(n) o.len -= int64(n)
return n, err return n, err
} }
//
// write handles
//
// GenericWriter is an interface that can be turned into a GenericMultiWriteHandle.
type GenericWriter interface {
io.WriterAt
Commit() error
Abort() error
}
// GenericMultiWriteHandle implements MultiWriteHandle for *os.Files.
type GenericMultiWriteHandle struct {
w GenericWriter
mu sync.Mutex
off int64
tail bool
done bool
abort bool
}
// NewGenericMultiWriteHandle constructs an *GenericMultiWriteHandle from a GenericWriter.
func NewGenericMultiWriteHandle(w GenericWriter) *GenericMultiWriteHandle {
return &GenericMultiWriteHandle{
w: w,
}
}
func (o *GenericMultiWriteHandle) childAbort() {
o.mu.Lock()
defer o.mu.Unlock()
if !o.done {
o.abort = true
}
}
func (o *GenericMultiWriteHandle) status() (done, abort bool) {
o.mu.Lock()
defer o.mu.Unlock()
return o.done, o.abort
}
// NextPart returns a WriteHandle expecting length bytes to be written to it.
func (o *GenericMultiWriteHandle) NextPart(ctx context.Context, length int64) (WriteHandle, error) {
o.mu.Lock()
defer o.mu.Unlock()
if o.done {
return nil, errs.New("already closed")
} else if o.tail {
return nil, errs.New("unable to make part after tail part")
}
w := &genericWriteHandle{
parent: o,
w: o.w,
off: o.off,
tail: length < 0,
len: length,
}
if w.tail {
o.tail = true
} else {
o.off += length
}
return w, nil
}
// Commit commits the overall GenericMultiWriteHandle. It errors if
// any parts were aborted.
func (o *GenericMultiWriteHandle) Commit(ctx context.Context) error {
o.mu.Lock()
defer o.mu.Unlock()
if o.done {
return nil
}
o.done = true
if o.abort {
return errs.Combine(
errs.New("commit failed: not every child was committed"),
o.w.Abort(),
)
}
return o.w.Commit()
}
// Abort aborts the overall GenericMultiWriteHandle.
func (o *GenericMultiWriteHandle) Abort(ctx context.Context) error {
o.mu.Lock()
defer o.mu.Unlock()
if o.done {
return nil
}
o.done = true
o.abort = true
return o.w.Abort()
}
type genericWriteHandle struct {
parent *GenericMultiWriteHandle
w GenericWriter
done bool
off int64
tail bool
len int64
}
func (o *genericWriteHandle) Write(p []byte) (int, error) {
if !o.tail {
if o.len <= 0 {
return 0, errs.New("write past maximum length")
} else if o.len < int64(len(p)) {
p = p[:o.len]
}
}
n, err := o.w.WriteAt(p, o.off)
o.off += int64(n)
if !o.tail {
o.len -= int64(n)
}
return n, err
}
func (o *genericWriteHandle) Commit() error {
if o.done {
return nil
}
o.done = true
done, abort := o.parent.status()
if abort {
return errs.New("commit failed: parent write handle aborted")
} else if done {
return errs.New("commit failed: parent write handle done")
}
return nil
}
func (o *genericWriteHandle) Abort() error {
if o.done {
return nil
}
o.done = true
o.parent.childAbort()
return nil
}

View File

@ -140,17 +140,143 @@ func (o *stdReadHandle) Read(p []byte) (int, error) {
// write handles // write handles
// //
// stdWriteHandle implements WriteHandle for stdouts. // stdMultiWriteHandle implements MultiWriteHandle for stdouts.
type stdWriteHandle struct { type stdMultiWriteHandle struct {
stdout io.Writer stdout closableWriter
mu sync.Mutex
next *sync.Mutex
tail bool
done bool
} }
func newStdWriteHandle(stdout io.Writer) *stdWriteHandle { func newStdMultiWriteHandle(stdout io.Writer) *stdMultiWriteHandle {
return &stdWriteHandle{ return &stdMultiWriteHandle{
stdout: stdout, stdout: closableWriter{Writer: stdout},
next: new(sync.Mutex),
} }
} }
func (s *stdWriteHandle) Write(b []byte) (int, error) { return s.stdout.Write(b) } func (s *stdMultiWriteHandle) NextPart(ctx context.Context, length int64) (WriteHandle, error) {
func (s *stdWriteHandle) Commit() error { return nil } s.mu.Lock()
func (s *stdWriteHandle) Abort() error { return nil } defer s.mu.Unlock()
if s.done {
return nil, errs.New("already closed")
} else if s.tail {
return nil, errs.New("unable to make part after tail part")
}
next := new(sync.Mutex)
next.Lock()
w := &stdWriteHandle{
stdout: &s.stdout,
mu: s.next,
next: next,
tail: length < 0,
len: length,
}
s.tail = w.tail
s.next = next
return w, nil
}
func (s *stdMultiWriteHandle) Commit(ctx context.Context) error {
s.mu.Lock()
defer s.mu.Unlock()
s.done = true
return nil
}
func (s *stdMultiWriteHandle) Abort(ctx context.Context) error {
s.mu.Lock()
defer s.mu.Unlock()
s.done = true
return nil
}
// stdWriteHandle implements WriteHandle for stdouts.
type stdWriteHandle struct {
stdout *closableWriter
mu *sync.Mutex
next *sync.Mutex
tail bool
len int64
}
func (s *stdWriteHandle) unlockNext(err error) {
if s.next != nil {
if err != nil {
s.stdout.close(err)
}
s.next.Unlock()
s.next = nil
}
}
func (s *stdWriteHandle) Write(p []byte) (int, error) {
s.mu.Lock()
defer s.mu.Unlock()
if !s.tail {
if s.len <= 0 {
return 0, errs.New("write past maximum length")
} else if s.len < int64(len(p)) {
p = p[:s.len]
}
}
n, err := s.stdout.Write(p)
if !s.tail {
s.len -= int64(n)
if s.len == 0 {
s.unlockNext(err)
}
}
return n, err
}
func (s *stdWriteHandle) Commit() error {
s.mu.Lock()
defer s.mu.Unlock()
s.len = 0
s.unlockNext(nil)
return nil
}
func (s *stdWriteHandle) Abort() error {
s.mu.Lock()
defer s.mu.Unlock()
s.len = 0
s.unlockNext(context.Canceled)
return nil
}
type closableWriter struct {
io.Writer
err error
}
func (out *closableWriter) Write(p []byte) (int, error) {
if out.err != nil {
return 0, out.err
}
n, err := out.Writer.Write(p)
out.err = err
return n, err
}
func (out *closableWriter) close(err error) {
out.err = err
}

View File

@ -0,0 +1,101 @@
// Copyright (C) 2022 Storj Labs, Inc.
// See LICENSE for copying information.
package ulfs
import (
"bytes"
"errors"
"sync/atomic"
"testing"
"github.com/stretchr/testify/require"
"storj.io/common/testcontext"
"storj.io/common/testrand"
)
type writeThrottle struct {
entered chan struct{}
release chan error
}
type throttledWriter struct {
writex int64
write []writeThrottle
data bytes.Buffer
}
func newThrottledWriter(maxWrites int) *throttledWriter {
tw := &throttledWriter{
writex: 0,
write: make([]writeThrottle, maxWrites),
}
for i := range tw.write {
tw.write[i] = writeThrottle{
entered: make(chan struct{}),
release: make(chan error, 1),
}
}
return tw
}
func (tw *throttledWriter) Write(data []byte) (n int, _ error) {
index := atomic.AddInt64(&tw.writex, 1) - 1
close(tw.write[index].entered)
forceErr := <-tw.write[index].release
n, writeErr := tw.data.Write(data)
if writeErr != nil {
return n, writeErr
}
return n, forceErr
}
func TestStdMultiWriteAbort(t *testing.T) {
ctx := testcontext.New(t)
stdout := newThrottledWriter(2)
multi := newStdMultiWriteHandle(stdout)
head := testrand.Bytes(256)
tail := testrand.Bytes(256)
part1, err := multi.NextPart(ctx, 256)
require.NoError(t, err)
ctx.Go(func() error {
defer func() { _ = part1.Abort() }()
_, err := part1.Write(head)
if err == nil {
return errors.New("expected an error")
}
return nil
})
part2, err := multi.NextPart(ctx, 256)
require.NoError(t, err)
ctx.Go(func() error {
defer func() { _ = part2.Commit() }()
// wait for the above part to enter write first
<-stdout.write[0].entered
_, err := part2.Write(tail)
if err == nil {
return errors.New("expected an error")
}
return nil
})
// wait until we enter both writes
<-stdout.write[0].entered
stdout.write[0].release <- errors.New("fail 0")
stdout.write[1].release <- nil
ctx.Wait()
require.Equal(t, head, stdout.data.Bytes())
}

View File

@ -166,16 +166,127 @@ func (u *uplinkReadHandle) Info() ObjectInfo { return *u.info }
// write handles // write handles
// //
type uplinkWriteHandle struct { type uplinkMultiWriteHandle struct {
upload *uplink.Upload project *uplink.Project
bucket string
info uplink.UploadInfo
metadata uplink.CustomMetadata
mu sync.Mutex
tail bool
part uint32
commitErr *error
abortErr *error
} }
func newUplinkWriteHandle(upload *uplink.Upload) *uplinkWriteHandle { func newUplinkMultiWriteHandle(project *uplink.Project, bucket string, info uplink.UploadInfo, metadata uplink.CustomMetadata) *uplinkMultiWriteHandle {
return &uplinkWriteHandle{ return &uplinkMultiWriteHandle{
upload: upload, project: project,
bucket: bucket,
info: info,
metadata: metadata,
} }
} }
func (u *uplinkWriteHandle) Write(b []byte) (int, error) { return u.upload.Write(b) } func (u *uplinkMultiWriteHandle) NextPart(ctx context.Context, length int64) (WriteHandle, error) {
func (u *uplinkWriteHandle) Commit() error { return u.upload.Commit() } part, err := func() (uint32, error) {
func (u *uplinkWriteHandle) Abort() error { return u.upload.Abort() } u.mu.Lock()
defer u.mu.Unlock()
switch {
case u.abortErr != nil:
return 0, errs.New("cannot make part after multipart write has been aborted")
case u.commitErr != nil:
return 0, errs.New("cannot make part after multipart write has been committed")
}
if u.tail {
return 0, errs.New("unable to make part after tail part")
}
u.tail = length < 0
u.part++
return u.part, nil
}()
if err != nil {
return nil, err
}
ul, err := u.project.UploadPart(ctx, u.bucket, u.info.Key, u.info.UploadID, part)
if err != nil {
return nil, err
}
return &uplinkWriteHandle{
ul: ul,
tail: length < 0,
len: length,
}, nil
}
func (u *uplinkMultiWriteHandle) Commit(ctx context.Context) error {
u.mu.Lock()
defer u.mu.Unlock()
switch {
case u.abortErr != nil:
return errs.New("cannot commit an aborted multipart write")
case u.commitErr != nil:
return *u.commitErr
}
_, err := u.project.CommitUpload(ctx, u.bucket, u.info.Key, u.info.UploadID, &uplink.CommitUploadOptions{
CustomMetadata: u.metadata,
})
u.commitErr = &err
return err
}
func (u *uplinkMultiWriteHandle) Abort(ctx context.Context) error {
u.mu.Lock()
defer u.mu.Unlock()
switch {
case u.abortErr != nil:
return *u.abortErr
case u.commitErr != nil:
return errs.New("cannot abort a committed multipart write")
}
err := u.project.AbortUpload(ctx, u.bucket, u.info.Key, u.info.UploadID)
u.abortErr = &err
return err
}
// uplinkWriteHandle implements writeHandle for *uplink.Uploads.
type uplinkWriteHandle struct {
ul *uplink.PartUpload
tail bool
len int64
}
func (u *uplinkWriteHandle) Write(p []byte) (int, error) {
if !u.tail {
if u.len <= 0 {
return 0, errs.New("write past maximum length")
} else if u.len < int64(len(p)) {
p = p[:u.len]
}
}
n, err := u.ul.Write(p)
if !u.tail {
u.len -= int64(n)
}
return n, err
}
func (u *uplinkWriteHandle) Commit() error {
return u.ul.Commit()
}
func (u *uplinkWriteHandle) Abort() error {
return u.ul.Abort()
}

View File

@ -19,8 +19,8 @@ import (
// LocalBackendFile represents a file in the filesystem. // LocalBackendFile represents a file in the filesystem.
type LocalBackendFile interface { type LocalBackendFile interface {
io.Closer io.Closer
io.Writer
io.ReaderAt io.ReaderAt
io.WriterAt
Name() string Name() string
Stat() (os.FileInfo, error) Stat() (os.FileInfo, error)
Readdir(int) ([]os.FileInfo, error) Readdir(int) ([]os.FileInfo, error)
@ -58,7 +58,7 @@ func (l *Local) Open(ctx context.Context, path string) (MultiReadHandle, error)
} }
// Create makes any directories necessary to create a file at path and returns a WriteHandle. // Create makes any directories necessary to create a file at path and returns a WriteHandle.
func (l *Local) Create(ctx context.Context, path string) (WriteHandle, error) { func (l *Local) Create(ctx context.Context, path string) (MultiWriteHandle, error) {
fi, err := l.fs.Stat(path) fi, err := l.fs.Stat(path)
if err != nil && !os.IsNotExist(err) { if err != nil && !os.IsNotExist(err) {
return nil, errs.Wrap(err) return nil, errs.Wrap(err)
@ -75,7 +75,7 @@ func (l *Local) Create(ctx context.Context, path string) (WriteHandle, error) {
if err != nil { if err != nil {
return nil, errs.Wrap(err) return nil, errs.Wrap(err)
} }
return newOSWriteHandle(l.fs, fh), nil return newOSMultiWriteHandle(l.fs, fh), nil
} }
// Move moves file to provided path. // Move moves file to provided path.

View File

@ -200,9 +200,11 @@ func (mf *memFile) ReadAt(p []byte, off int64) (int, error) {
return copy(p, mf.buf[off:]), nil return copy(p, mf.buf[off:]), nil
} }
func (mf *memFile) Write(p []byte) (int, error) { func (mf *memFile) WriteAt(p []byte, off int64) (int, error) {
mf.buf = append(mf.buf, p...) if delta := (off + int64(len(p))) - int64(len(mf.buf)); delta > 0 {
return len(p), nil mf.buf = append(mf.buf, make([]byte, delta)...)
}
return copy(mf.buf[off:], p), nil
} }
func (mf *memFile) Stat() (os.FileInfo, error) { func (mf *memFile) Stat() (os.FileInfo, error) {
@ -254,7 +256,7 @@ func (md *memDir) ReadAt(p []byte, off int64) (int, error) {
return 0, errs.New("readat on directory") return 0, errs.New("readat on directory")
} }
func (md *memDir) Write(p []byte) (int, error) { func (md *memDir) WriteAt(p []byte, off int64) (int, error) {
return 0, errs.New("writeat on directory") return 0, errs.New("writeat on directory")
} }

View File

@ -42,13 +42,13 @@ func (m *Mixed) Open(ctx context.Context, loc ulloc.Location) (MultiReadHandle,
} }
// Create returns a WriteHandle to either a local file, remote object, or stdout. // Create returns a WriteHandle to either a local file, remote object, or stdout.
func (m *Mixed) Create(ctx context.Context, loc ulloc.Location, opts *CreateOptions) (WriteHandle, error) { func (m *Mixed) Create(ctx context.Context, loc ulloc.Location, opts *CreateOptions) (MultiWriteHandle, error) {
if bucket, key, ok := loc.RemoteParts(); ok { if bucket, key, ok := loc.RemoteParts(); ok {
return m.remote.Create(ctx, bucket, key, opts) return m.remote.Create(ctx, bucket, key, opts)
} else if path, ok := loc.LocalParts(); ok { } else if path, ok := loc.LocalParts(); ok {
return m.local.Create(ctx, path) return m.local.Create(ctx, path)
} }
return newStdWriteHandle(clingy.Stdout(ctx)), nil return newStdMultiWriteHandle(clingy.Stdout(ctx)), nil
} }
// Move moves either a local file or remote object. // Move moves either a local file or remote object.

View File

@ -46,22 +46,23 @@ func (r *Remote) Stat(ctx context.Context, bucket, key string) (*ObjectInfo, err
} }
// Create returns a MultiWriteHandle for the object identified by a given bucket and key. // Create returns a MultiWriteHandle for the object identified by a given bucket and key.
func (r *Remote) Create(ctx context.Context, bucket, key string, opts *CreateOptions) (WriteHandle, error) { func (r *Remote) Create(ctx context.Context, bucket, key string, opts *CreateOptions) (MultiWriteHandle, error) {
upload, err := r.project.UploadObject(ctx, bucket, key, &uplink.UploadOptions{ var customMetadata uplink.CustomMetadata
if opts.Metadata != nil {
customMetadata = uplink.CustomMetadata(opts.Metadata)
if err := customMetadata.Verify(); err != nil {
return nil, err
}
}
info, err := r.project.BeginUpload(ctx, bucket, key, &uplink.UploadOptions{
Expires: opts.Expires, Expires: opts.Expires,
}) })
if err != nil { if err != nil {
return nil, err return nil, err
} }
return newUplinkMultiWriteHandle(r.project, bucket, info, customMetadata), nil
if opts.Metadata != nil {
if err := upload.SetCustomMetadata(ctx, uplink.CustomMetadata(opts.Metadata)); err != nil {
_ = upload.Abort()
return nil, err
}
}
return newUplinkWriteHandle(upload), nil
} }
// Move moves object to provided key and bucket. // Move moves object to provided key and bucket.

View File

@ -112,7 +112,7 @@ func (rfs *remoteFilesystem) Open(ctx context.Context, bucket, key string) (ulfs
return newMultiReadHandle(mf.contents), nil return newMultiReadHandle(mf.contents), nil
} }
func (rfs *remoteFilesystem) Create(ctx context.Context, bucket, key string, opts *ulfs.CreateOptions) (_ ulfs.WriteHandle, err error) { func (rfs *remoteFilesystem) Create(ctx context.Context, bucket, key string, opts *ulfs.CreateOptions) (_ ulfs.MultiWriteHandle, err error) {
rfs.mu.Lock() rfs.mu.Lock()
defer rfs.mu.Unlock() defer rfs.mu.Unlock()
@ -140,7 +140,7 @@ func (rfs *remoteFilesystem) Create(ctx context.Context, bucket, key string, opt
rfs.pending[loc] = append(rfs.pending[loc], wh) rfs.pending[loc] = append(rfs.pending[loc], wh)
return wh, nil return ulfs.NewGenericMultiWriteHandle(wh), nil
} }
func (rfs *remoteFilesystem) Move(ctx context.Context, oldbucket, oldkey string, newbucket, newkey string) error { func (rfs *remoteFilesystem) Move(ctx context.Context, oldbucket, oldkey string, newbucket, newkey string) error {
@ -282,12 +282,15 @@ type memWriteHandle struct {
done bool done bool
} }
func (b *memWriteHandle) Write(p []byte) (int, error) { func (b *memWriteHandle) WriteAt(p []byte, off int64) (int, error) {
if b.done { if b.done {
return 0, errs.New("write to closed handle") return 0, errs.New("write to closed handle")
} }
b.buf = append(b.buf, p...) end := int64(len(p)) + off
return len(p), nil if grow := end - int64(len(b.buf)); grow > 0 {
b.buf = append(b.buf, make([]byte, grow)...)
}
return copy(b.buf[off:], p), nil
} }
func (b *memWriteHandle) Commit() error { func (b *memWriteHandle) Commit() error {

View File

@ -199,7 +199,11 @@ func WithFile(location string, contents ...string) ExecuteOption {
cs.rfs.ensureBucket(bucket) cs.rfs.ensureBucket(bucket)
} }
wh, err := cs.fs.Create(ctx, loc, nil) mwh, err := cs.fs.Create(ctx, loc, nil)
require.NoError(t, err)
defer func() { _ = mwh.Abort(ctx) }()
wh, err := mwh.NextPart(ctx, -1)
require.NoError(t, err) require.NoError(t, err)
defer func() { _ = wh.Abort() }() defer func() { _ = wh.Abort() }()
@ -213,6 +217,7 @@ func WithFile(location string, contents ...string) ExecuteOption {
} }
require.NoError(t, wh.Commit()) require.NoError(t, wh.Commit())
require.NoError(t, mwh.Commit(ctx))
}} }}
} }

2
go.mod
View File

@ -43,7 +43,7 @@ require (
github.com/vivint/infectious v0.0.0-20200605153912-25a574ae18a3 github.com/vivint/infectious v0.0.0-20200605153912-25a574ae18a3
github.com/zeebo/assert v1.3.1 github.com/zeebo/assert v1.3.1
github.com/zeebo/blake3 v0.2.3 github.com/zeebo/blake3 v0.2.3
github.com/zeebo/clingy v0.0.0-20230301225531-f2d4117c8e8c github.com/zeebo/clingy v0.0.0-20230602044025-906be850f10d
github.com/zeebo/errs v1.3.0 github.com/zeebo/errs v1.3.0
github.com/zeebo/ini v0.0.0-20210514163846-cc8fbd8d9599 github.com/zeebo/ini v0.0.0-20210514163846-cc8fbd8d9599
github.com/zyedidia/generic v1.2.1 github.com/zyedidia/generic v1.2.1

4
go.sum
View File

@ -605,8 +605,8 @@ github.com/zeebo/assert v1.3.1 h1:vukIABvugfNMZMQO1ABsyQDJDTVQbn+LWSMy1ol1h6A=
github.com/zeebo/assert v1.3.1/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0= github.com/zeebo/assert v1.3.1/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0=
github.com/zeebo/blake3 v0.2.3 h1:TFoLXsjeXqRNFxSbk35Dk4YtszE/MQQGK10BH4ptoTg= github.com/zeebo/blake3 v0.2.3 h1:TFoLXsjeXqRNFxSbk35Dk4YtszE/MQQGK10BH4ptoTg=
github.com/zeebo/blake3 v0.2.3/go.mod h1:mjJjZpnsyIVtVgTOSpJ9vmRE4wgDeyt2HU3qXvvKCaQ= github.com/zeebo/blake3 v0.2.3/go.mod h1:mjJjZpnsyIVtVgTOSpJ9vmRE4wgDeyt2HU3qXvvKCaQ=
github.com/zeebo/clingy v0.0.0-20230301225531-f2d4117c8e8c h1:bE9vXPFKa9wkCCq1HJi2Ms4pWuBoIKQEMe6CZzu/TKE= github.com/zeebo/clingy v0.0.0-20230602044025-906be850f10d h1:INuKdI3R6zDA8UEgbBxDFkb7qwO/nSvnJRdrBCMW+To=
github.com/zeebo/clingy v0.0.0-20230301225531-f2d4117c8e8c/go.mod h1:MHEhXvEfewflU7SSVKHI7nkdU+fpyxZ5XPPzj+5gYNw= github.com/zeebo/clingy v0.0.0-20230602044025-906be850f10d/go.mod h1:MHEhXvEfewflU7SSVKHI7nkdU+fpyxZ5XPPzj+5gYNw=
github.com/zeebo/errs v1.2.2/go.mod h1:sgbWHsvVuTPHcqJJGQ1WhI5KbWlHYz+2+2C/LSEtCw4= github.com/zeebo/errs v1.2.2/go.mod h1:sgbWHsvVuTPHcqJJGQ1WhI5KbWlHYz+2+2C/LSEtCw4=
github.com/zeebo/errs v1.3.0 h1:hmiaKqgYZzcVgRL1Vkc1Mn2914BbzB0IBxs+ebeutGs= github.com/zeebo/errs v1.3.0 h1:hmiaKqgYZzcVgRL1Vkc1Mn2914BbzB0IBxs+ebeutGs=
github.com/zeebo/errs v1.3.0/go.mod h1:sgbWHsvVuTPHcqJJGQ1WhI5KbWlHYz+2+2C/LSEtCw4= github.com/zeebo/errs v1.3.0/go.mod h1:sgbWHsvVuTPHcqJJGQ1WhI5KbWlHYz+2+2C/LSEtCw4=