cmd/uplink: cp sets connection pool capacity based on parallelism

If the cp command is executed with higher level of parallelism, it would
open more connections to storage nodes at the same time. Therefore, the
connection pool capacity should be expanded accordingly.

The pool capacity is set to 100 * parallelism.

Change-Id: Ia8b3ab6a99340d8cbb87a7b80c3354b2b21c1958
This commit is contained in:
Kaloyan Raev 2022-04-15 17:10:39 +03:00
parent b24e5cbc44
commit 978e0f1a26
3 changed files with 21 additions and 2 deletions

View File

@ -18,6 +18,7 @@ import (
"github.com/zeebo/errs"
"storj.io/common/memory"
"storj.io/common/rpc/rpcpool"
"storj.io/common/sync2"
"storj.io/storj/cmd/uplink/ulext"
"storj.io/storj/cmd/uplink/ulfs"
@ -99,7 +100,11 @@ func (c *cmdCp) Setup(params clingy.Parameters) {
}
func (c *cmdCp) Execute(ctx clingy.Context) error {
fs, err := c.ex.OpenFilesystem(ctx, c.access)
fs, err := c.ex.OpenFilesystem(ctx, c.access, ulext.ConnectionPoolOptions(rpcpool.Options{
Capacity: 100 * c.parallelism,
KeyCapacity: 5,
IdleExpiration: 2 * time.Minute,
}))
if err != nil {
return err
}

View File

@ -7,6 +7,7 @@ import (
"context"
"storj.io/common/rpc"
"storj.io/common/rpc/rpcpool"
"storj.io/storj/cmd/uplink/ulext"
"storj.io/storj/cmd/uplink/ulfs"
"storj.io/uplink"
@ -46,5 +47,11 @@ func (ex *external) OpenProject(ctx context.Context, accessName string, options
transport.SetConnector(&config, rpc.NewHybridConnector())
}
if opts.ConnectionPoolOptions != (rpcpool.Options{}) {
if err := transport.SetConnectionPool(ctx, &config, rpcpool.New(opts.ConnectionPoolOptions)); err != nil {
return nil, err
}
}
return config.OpenProject(ctx, access)
}

View File

@ -16,6 +16,7 @@ import (
"github.com/zeebo/clingy"
"github.com/zeebo/errs"
"storj.io/common/rpc/rpcpool"
"storj.io/storj/cmd/uplink/ulfs"
"storj.io/uplink"
)
@ -42,7 +43,8 @@ type External interface {
// Options contains all of the possible options for opening a filesystem or project.
type Options struct {
EncryptionBypass bool
EncryptionBypass bool
ConnectionPoolOptions rpcpool.Options
}
// LoadOptions takes a slice of Option values and returns a filled out Options struct.
@ -63,6 +65,11 @@ func BypassEncryption(bypass bool) Option {
return Option{apply: func(opt *Options) { opt.EncryptionBypass = bypass }}
}
// ConnectionPoolOptions will initialize the connection pool with options.
func ConnectionPoolOptions(options rpcpool.Options) Option {
return Option{apply: func(opt *Options) { opt.ConnectionPoolOptions = options }}
}
// RegisterAccess registers an access grant with a Gateway Authorization Service.
func RegisterAccess(ctx context.Context, access *uplink.Access, authService string, public bool, timeout time.Duration) (accessKey, secretKey, endpoint string, err error) {
if authService == "" {