cmd/uplinkng: parallel rm

Change-Id: I4f911ce1f384f13812986a34b96a94ded70cfb4e
This commit is contained in:
Jeff Wendling 2021-08-13 15:33:27 -04:00
parent d57583dfd4
commit 2782e000ac

View File

@ -5,11 +5,14 @@ package main
import (
"fmt"
"io"
"strconv"
"sync"
"github.com/zeebo/clingy"
"github.com/zeebo/errs"
"storj.io/common/sync2"
"storj.io/storj/cmd/uplinkng/ulext"
"storj.io/storj/cmd/uplinkng/ulloc"
)
@ -17,9 +20,10 @@ import (
type cmdRm struct {
ex ulext.External
access string
recursive bool
encrypted bool
access string
recursive bool
parallelism int
encrypted bool
location ulloc.Location
}
@ -34,6 +38,16 @@ func (c *cmdRm) Setup(params clingy.Parameters) {
clingy.Short('r'),
clingy.Transform(strconv.ParseBool),
).(bool)
c.parallelism = params.Flag("parallelism", "Controls how many uploads/downloads to perform in parallel", 1,
clingy.Short('p'),
clingy.Transform(strconv.Atoi),
clingy.Transform(func(n int) (int, error) {
if n <= 0 {
return 0, errs.New("parallelism must be at least 1")
}
return n, nil
}),
).(int)
c.encrypted = params.Flag("encrypted", "Interprets keys base64 encoded without decrypting", false,
clingy.Transform(strconv.ParseBool),
).(bool)
@ -64,22 +78,48 @@ func (c *cmdRm) Execute(ctx clingy.Context) error {
return err
}
anyFailed := false
var (
limiter = sync2.NewLimiter(c.parallelism)
es errs.Group
mu sync.Mutex
)
fprintln := func(w io.Writer, args ...interface{}) {
mu.Lock()
defer mu.Unlock()
fmt.Fprintln(w, args...)
}
addError := func(err error) {
mu.Lock()
defer mu.Unlock()
es.Add(err)
}
for iter.Next() {
loc := iter.Item().Loc
if err := fs.Remove(ctx, loc); err != nil {
fmt.Fprintln(ctx.Stderr(), "remove", loc, "failed:", err.Error())
anyFailed = true
} else {
fmt.Fprintln(ctx.Stdout(), "removed", loc)
ok := limiter.Go(ctx, func() {
if err := fs.Remove(ctx, loc); err != nil {
fprintln(ctx.Stderr(), "remove", loc, "failed:", err.Error())
addError(err)
} else {
fprintln(ctx.Stdout(), "removed", loc)
}
})
if !ok {
break
}
}
limiter.Wait()
if err := iter.Err(); err != nil {
return errs.Wrap(err)
} else if anyFailed {
return errs.New("some removals failed")
} else if len(es) > 0 {
return es.Err()
}
return nil
}