cmd/uplink: pass in a maximum concurrent segments value

also change the config creation to be more robust to
changes that add defaults in the future by not fully
reconstructing the config value passed in to the
project.

Change-Id: I673e8b54ce0b951ae735bf4658525c477c26ac5a
This commit is contained in:
Jeff Wendling 2023-05-25 10:53:38 -04:00
parent a0fbc87b31
commit ccfe5cae49

View File

@ -27,7 +27,6 @@ import (
"storj.io/storj/cmd/uplink/ulext"
"storj.io/storj/cmd/uplink/ulfs"
"storj.io/storj/cmd/uplink/ulloc"
"storj.io/uplink/private/eestream/scheduler"
"storj.io/uplink/private/testuplink"
)
@ -46,8 +45,7 @@ type cmdCp struct {
parallelism int
parallelismChunkSize memory.Size
maximumConcurrentPieces int
longTailMargin int
uploadConfig testuplink.ConcurrentSegmentUploadsConfig
uploadLogFile string
inmemoryEC bool
@ -105,12 +103,25 @@ func (c *cmdCp) Setup(params clingy.Parameters) {
}),
).(memory.Size)
def := testuplink.DefaultConcurrentSegmentUploadsConfig()
c.maximumConcurrentPieces = params.Flag("maximum-concurrent-pieces", "Maximum concurrent pieces to upload at once per transfer", def.SchedulerOptions.MaximumConcurrent,
c.uploadConfig = testuplink.DefaultConcurrentSegmentUploadsConfig()
c.uploadConfig.SchedulerOptions.MaximumConcurrent = params.Flag(
"maximum-concurrent-pieces",
"Maximum concurrent pieces to upload at once per transfer",
c.uploadConfig.SchedulerOptions.MaximumConcurrent,
clingy.Transform(strconv.Atoi),
clingy.Advanced,
).(int)
c.longTailMargin = params.Flag("long-tail-margin", "How many extra pieces to upload and cancel per segment", def.LongTailMargin,
c.uploadConfig.SchedulerOptions.MaximumConcurrentHandles = params.Flag(
"maximum-concurrent-segments",
"Maximum concurrent segments to upload at once per transfer",
c.uploadConfig.SchedulerOptions.MaximumConcurrentHandles,
clingy.Transform(strconv.Atoi),
clingy.Advanced,
).(int)
c.uploadConfig.LongTailMargin = params.Flag(
"long-tail-margin",
"How many extra pieces to upload and cancel per segment",
c.uploadConfig.LongTailMargin,
clingy.Transform(strconv.Atoi),
clingy.Advanced,
).(int)
@ -155,14 +166,10 @@ func (c *cmdCp) Execute(ctx context.Context) error {
}
fs, err := c.ex.OpenFilesystem(ctx, c.access,
ulext.ConcurrentSegmentUploadsConfig(testuplink.ConcurrentSegmentUploadsConfig{
SchedulerOptions: scheduler.Options{
MaximumConcurrent: c.maximumConcurrentPieces,
},
LongTailMargin: c.longTailMargin,
}),
ulext.ConcurrentSegmentUploadsConfig(c.uploadConfig),
ulext.ConnectionPoolOptions(rpcpool.Options{
Capacity: c.maximumConcurrentPieces,
// Add a bit more capacity for connections to the satellite
Capacity: c.uploadConfig.SchedulerOptions.MaximumConcurrent + 5,
KeyCapacity: 5,
IdleExpiration: 2 * time.Minute,
}))