cmd/uplink: add back parallelism

for machines with very fast (>10Gbit) connections it is still
useful to have parallelism for uploads: we are actually bound
by acquiring new pieces from the satellite, so doing that in
parallel provides a big win.

this change adds the parallelism flag back for uploads, and
removes the backwards compatibility code coupling it to
maximum-concurrent-pieces, as the two flags are now independent.
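
for example, the two flags can now be set independently (values
here are illustrative, not recommendations):

    uplink cp -p 4 --maximum-concurrent-pieces 200 big.file sj://bucket/big.file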

the upload code parallelism story is now this (worked example below):

    - each object is a transfer
    - each transfer happens in N parts (size dynamically
      chosen to avoid having >10000 parts)
    - parts are uploaded in parallel, up to the specified
      parallelism limit
    - each in-flight part can use up to the configured maximum
      of concurrent pieces and segments
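
as a worked example (values are illustrative): a 25GiB upload
with the default 1GiB smallest part size becomes one transfer
with 25 parts; --parallelism 4 lets up to 4 of those parts
upload at once, and each in-flight part is further capped by
--maximum-concurrent-pieces and --maximum-concurrent-segments.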

this change also improves some defaults (see the capacity example below).

    - the connection pool capacity now takes into account
      transfers, parallelism, and max concurrent pieces
    - the default smallest part size is 1GiB, allowing the
      new upload code path to upload multiple segments per part
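
for example (illustrative values): with 2 concurrent transfers,
--parallelism 4, and --maximum-concurrent-pieces 100, the pool
is sized for 2 * 4 * 100 + 5 = 805 connections.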

Change-Id: Iff6709ae73425fbc2858ed360faa2d3ece297c2d
Author: Jeff Wendling
Date:   2023-07-05 09:48:46 -04:00
parent 03690daa35
commit 1cbad0fcab
2 changed files with 44 additions and 124 deletions

cmd/uplink/cmd_cp.go

@@ -85,8 +85,7 @@ func (c *cmdCp) Setup(params clingy.Parameters) {
     ).(bool)
     c.byteRange = params.Flag("range", "Downloads the specified range bytes of an object. For more information about the HTTP Range header, see https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35", "").(string)
-    parallelism := params.Flag("parallelism", "Controls how many parallel chunks to upload/download from a file", nil,
-        clingy.Optional,
+    c.parallelism = params.Flag("parallelism", "Controls how many parallel parts to upload/download from a file", 1,
         clingy.Short('p'),
         clingy.Transform(strconv.Atoi),
         clingy.Transform(func(n int) (int, error) {
@@ -95,8 +94,8 @@ func (c *cmdCp) Setup(params clingy.Parameters) {
             }
             return n, nil
         }),
-    ).(*int)
-    c.parallelismChunkSize = params.Flag("parallelism-chunk-size", "Set the size of the chunks for parallelism, 0 means automatic adjustment", memory.Size(0),
+    ).(int)
+    c.parallelismChunkSize = params.Flag("parallelism-chunk-size", "Set the size of the parts for parallelism, 0 means automatic adjustment", memory.Size(0),
         clingy.Transform(memory.ParseString),
         clingy.Transform(func(n int64) (memory.Size, error) {
             if n < 0 {
@@ -107,17 +106,16 @@ func (c *cmdCp) Setup(params clingy.Parameters) {
     ).(memory.Size)

     c.uploadConfig = testuplink.DefaultConcurrentSegmentUploadsConfig()
-    maxConcurrent := params.Flag(
+    c.uploadConfig.SchedulerOptions.MaximumConcurrent = params.Flag(
         "maximum-concurrent-pieces",
-        "Maximum concurrent pieces to upload at once per transfer",
-        nil,
-        clingy.Optional,
+        "Maximum concurrent pieces to upload at once per part",
+        c.uploadConfig.SchedulerOptions.MaximumConcurrent,
         clingy.Transform(strconv.Atoi),
         clingy.Advanced,
-    ).(*int)
+    ).(int)
     c.uploadConfig.SchedulerOptions.MaximumConcurrentHandles = params.Flag(
         "maximum-concurrent-segments",
-        "Maximum concurrent segments to upload at once per transfer",
+        "Maximum concurrent segments to upload at once per part",
         c.uploadConfig.SchedulerOptions.MaximumConcurrentHandles,
         clingy.Transform(strconv.Atoi),
         clingy.Advanced,
@@ -133,28 +131,6 @@ func (c *cmdCp) Setup(params clingy.Parameters) {
         clingy.Advanced,
     ).(string)

-    { // handle backwards compatibility around parallelism and maximum concurrent pieces
-        addr := func(x int) *int { return &x }
-
-        switch {
-        // if neither are actively set, use defaults
-        case parallelism == nil && maxConcurrent == nil:
-            parallelism = addr(1)
-            maxConcurrent = addr(c.uploadConfig.SchedulerOptions.MaximumConcurrent)
-
-        // if parallelism is not set, use a value based on maxConcurrent
-        case parallelism == nil:
-            parallelism = addr((*maxConcurrent + 99) / 100)
-
-        // if maxConcurrent is not set, use a value based on parallelism
-        case maxConcurrent == nil:
-            maxConcurrent = addr(100 * *parallelism)
-        }
-
-        c.uploadConfig.SchedulerOptions.MaximumConcurrent = *maxConcurrent
-        c.parallelism = *parallelism
-    }
-
     c.inmemoryEC = params.Flag("inmemory-erasure-coding", "Keep erasure-coded pieces in-memory instead of writing them on the disk during upload", false,
         clingy.Transform(strconv.ParseBool),
         clingy.Boolean,
@@ -194,9 +170,10 @@ func (c *cmdCp) Execute(ctx context.Context) error {
     fs, err := c.ex.OpenFilesystem(ctx, c.access,
         ulext.ConcurrentSegmentUploadsConfig(c.uploadConfig),
         ulext.ConnectionPoolOptions(rpcpool.Options{
-            // Add a bit more capacity for connections to the satellite
-            Capacity:       c.uploadConfig.SchedulerOptions.MaximumConcurrent + 5,
-            KeyCapacity:    5,
+            // Allow at least as many connections as the maximum concurrent pieces per
+            // parallel part per transfer, plus a few extra for the satellite.
+            Capacity:       c.transfers*c.parallelism*c.uploadConfig.SchedulerOptions.MaximumConcurrent + 5,
+            KeyCapacity:    2,
             IdleExpiration: 2 * time.Minute,
         }))
     if err != nil {
@@ -419,17 +396,6 @@ func (c *cmdCp) copyFile(ctx context.Context, fs ulfs.Filesystem, source, dest u
     }
     defer func() { _ = mwh.Abort(ctx) }()

-    // if we're uploading, do a single part of maximum size
-    if dest.Remote() {
-        return errs.Wrap(c.singleCopy(
-            ctx,
-            source, dest,
-            mrh, mwh,
-            offset, length,
-            bar,
-        ))
-    }
-
     partSize, err := c.calculatePartSize(mrh.Length(), c.parallelismChunkSize.Int64())
     if err != nil {
         return err
@@ -448,13 +414,15 @@ func (c *cmdCp) copyFile(ctx context.Context, fs ulfs.Filesystem, source, dest u
 // calculatePartSize returns the needed part size in order to upload the file with size of 'length'.
 // It hereby respects if the client requests/prefers a certain size and only increases if needed.
 func (c *cmdCp) calculatePartSize(length, preferredSize int64) (requiredSize int64, err error) {
-    segC := (length / maxPartCount / (memory.MiB * 64).Int64()) + 1
-    requiredSize = segC * (memory.MiB * 64).Int64()
+    segC := (length / maxPartCount / memory.GiB.Int64()) + 1
+    requiredSize = segC * memory.GiB.Int64()
     switch {
     case preferredSize == 0:
         return requiredSize, nil
     case requiredSize <= preferredSize:
         return preferredSize, nil
+    case length < 0: // let the user pick their size if we don't have a length to know better
+        return preferredSize, nil
     default:
         return 0, errs.New(fmt.Sprintf("the specified chunk size %s is too small, requires %s or larger",
             memory.FormatBytes(preferredSize), memory.FormatBytes(requiredSize)))
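
A minimal standalone sketch of the new sizing rule, assuming
maxPartCount is 10000 (per the commit message's ">10000 parts"
note) and mirroring the segC/requiredSize computation above; this
is illustrative, not the actual uplink code:

    package main

    import "fmt"

    const (
        gib          = int64(1) << 30 // 1 GiB
        maxPartCount = 10000          // assumed from the ">10000 parts" note
    )

    // requiredPartSize returns the smallest multiple of 1GiB that keeps
    // a file of the given length under maxPartCount parts.
    func requiredPartSize(length int64) int64 {
        segC := length/maxPartCount/gib + 1 // integer division
        return segC * gib
    }

    func main() {
        fmt.Println(requiredPartSize(10*gib) / gib)    // 1 (GiB), 10GiB file
        fmt.Println(requiredPartSize(10000*gib) / gib) // 2 (GiB), 10000GiB file
        fmt.Println(requiredPartSize(20001*gib) / gib) // 3 (GiB), 20001GiB file
    }

These values match the updated TestCpPartSize expectations below.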
@@ -535,8 +503,8 @@ func (c *cmdCp) parallelCopy(
     }

     var readBufs *ulfs.BytesPool
-    if p > 1 && chunkSize > 0 && (source.Std() || dest.Std()) {
-        // Create the read buffer pool only for uploads from stdin and downloads to stdout with parallelism > 1.
+    if p > 1 && chunkSize > 0 && source.Std() {
+        // Create the read buffer pool only for uploads from stdin with parallelism > 1.
         readBufs = ulfs.NewBytesPool(int(chunkSize))
     }
@@ -619,59 +587,6 @@ func (c *cmdCp) parallelCopy(
     return errs.Wrap(combineErrs(es))
 }

-func (c *cmdCp) singleCopy(
-    ctx context.Context,
-    source, dest ulloc.Location,
-    src ulfs.MultiReadHandle,
-    dst ulfs.MultiWriteHandle,
-    offset, length int64,
-    bar *mpb.Bar) error {
-    if offset != 0 {
-        if err := src.SetOffset(offset); err != nil {
-            return err
-        }
-    }
-
-    ctx, cancel := context.WithCancel(ctx)
-    defer cancel()
-
-    rh, err := src.NextPart(ctx, length)
-    if err != nil {
-        return errs.Wrap(err)
-    }
-    defer func() { _ = rh.Close() }()
-
-    wh, err := dst.NextPart(ctx, length)
-    if err != nil {
-        return errs.Wrap(err)
-    }
-    defer func() { _ = wh.Abort() }()
-
-    var w io.Writer = wh
-    if bar != nil {
-        bar.SetTotal(rh.Info().ContentLength, false)
-        bar.EnableTriggerComplete()
-
-        pw := bar.ProxyWriter(w)
-        defer func() { _ = pw.Close() }()
-        w = pw
-    }
-
-    if _, err := sync2.Copy(ctx, w, rh); err != nil {
-        return errs.Wrap(err)
-    }
-
-    if err := wh.Commit(); err != nil {
-        return errs.Wrap(err)
-    }
-
-    if err := dst.Commit(ctx); err != nil {
-        return errs.Wrap(err)
-    }
-
-    return nil
-}
-
 func newProgressBar(progress *mpb.Progress, name string, which, total int) *mpb.Bar {
     const counterFmt = " % .2f / % .2f"
     const percentageFmt = "%.2f "

cmd/uplink/cmd_cp_test.go

@@ -99,46 +99,51 @@ func TestCpDownload(t *testing.T) {
 func TestCpPartSize(t *testing.T) {
     c := newCmdCp(nil)

-    // 1GiB file, should return 64MiB
-    partSize, err := c.calculatePartSize(memory.GiB.Int64(), c.parallelismChunkSize.Int64())
+    // 10 GiB file, should return 1 GiB
+    partSize, err := c.calculatePartSize(10*memory.GiB.Int64(), c.parallelismChunkSize.Int64())
     require.NoError(t, err)
-    require.EqualValues(t, memory.MiB*64, partSize)
+    require.EqualValues(t, 1*memory.GiB, partSize)

-    // 640 GB file, should return 64MiB.
-    partSize, err = c.calculatePartSize(memory.GB.Int64()*640, c.parallelismChunkSize.Int64())
+    // 10000 GB file, should return 1 GiB.
+    partSize, err = c.calculatePartSize(10000*memory.GB.Int64(), c.parallelismChunkSize.Int64())
     require.NoError(t, err)
-    require.EqualValues(t, memory.MiB*64, partSize)
+    require.EqualValues(t, 1*memory.GiB, partSize)

-    // 640GiB file, should return 128MiB.
-    partSize, err = c.calculatePartSize(memory.GiB.Int64()*640, c.parallelismChunkSize.Int64())
+    // 10000 GiB file, should return 2 GiB.
+    partSize, err = c.calculatePartSize(10000*memory.GiB.Int64(), c.parallelismChunkSize.Int64())
     require.NoError(t, err)
-    require.EqualValues(t, memory.MiB*128, partSize)
+    require.EqualValues(t, 2*memory.GiB, partSize)

-    // 1TiB file, should return 128MiB.
-    partSize, err = c.calculatePartSize(memory.TiB.Int64(), c.parallelismChunkSize.Int64())
+    // 10 TiB file, should return 2 GiB.
+    partSize, err = c.calculatePartSize(10*memory.TiB.Int64(), c.parallelismChunkSize.Int64())
     require.NoError(t, err)
-    require.EqualValues(t, memory.MiB*128, partSize)
+    require.EqualValues(t, 2*memory.GiB, partSize)

-    // 1.3TiB file, should return 192MiB.
-    partSize, err = c.calculatePartSize(memory.GiB.Int64()*1300, c.parallelismChunkSize.Int64())
+    // 20001 GiB file, should return 3 GiB.
+    partSize, err = c.calculatePartSize(20001*memory.GiB.Int64(), c.parallelismChunkSize.Int64())
     require.NoError(t, err)
-    require.EqualValues(t, memory.MiB*192, partSize)
+    require.EqualValues(t, 3*memory.GiB, partSize)

     // should return 1GiB as requested.
     partSize, err = c.calculatePartSize(memory.GiB.Int64()*1300, memory.GiB.Int64())
     require.NoError(t, err)
     require.EqualValues(t, memory.GiB, partSize)

-    // should return 192 MiB and error, since preferred is too low.
-    partSize, err = c.calculatePartSize(memory.GiB.Int64()*1300, memory.MiB.Int64())
+    // should return 1 GiB and error, since preferred is too low.
+    partSize, err = c.calculatePartSize(1300*memory.GiB.Int64(), memory.MiB.Int64())
     require.Error(t, err)
-    require.Equal(t, "the specified chunk size 1.0 MiB is too small, requires 192.0 MiB or larger", err.Error())
+    require.Equal(t, "the specified chunk size 1.0 MiB is too small, requires 1.0 GiB or larger", err.Error())
     require.Zero(t, partSize)

-    // negative length should return 64MiB part size
-    partSize, err = c.calculatePartSize(-1, c.parallelismChunkSize.Int64())
+    // negative length should return asked for amount
+    partSize, err = c.calculatePartSize(-1, 1*memory.GiB.Int64())
     require.NoError(t, err)
-    require.EqualValues(t, memory.MiB*64, partSize)
+    require.EqualValues(t, 1*memory.GiB, partSize)
+
+    // negative length should return specified amount
+    partSize, err = c.calculatePartSize(-1, 100)
+    require.NoError(t, err)
+    require.EqualValues(t, 100, partSize)
 }

 func TestCpUpload(t *testing.T) {