// Copyright (C) 2021 Storj Labs, Inc. // See LICENSE for copying information. package main import ( "fmt" "io" "strconv" "sync" "github.com/zeebo/clingy" "github.com/zeebo/errs" "storj.io/common/sync2" "storj.io/storj/cmd/uplinkng/ulext" "storj.io/storj/cmd/uplinkng/ulfs" "storj.io/storj/cmd/uplinkng/ulloc" ) type cmdRm struct { ex ulext.External access string recursive bool parallelism int encrypted bool pending bool location ulloc.Location } func newCmdRm(ex ulext.External) *cmdRm { return &cmdRm{ex: ex} } func (c *cmdRm) Setup(params clingy.Parameters) { c.access = params.Flag("access", "Access name or value to use", "").(string) c.recursive = params.Flag("recursive", "Remove recursively", false, clingy.Short('r'), clingy.Transform(strconv.ParseBool), ).(bool) c.parallelism = params.Flag("parallelism", "Controls how many uploads/downloads to perform in parallel", 1, clingy.Short('p'), clingy.Transform(strconv.Atoi), clingy.Transform(func(n int) (int, error) { if n <= 0 { return 0, errs.New("parallelism must be at least 1") } return n, nil }), ).(int) c.encrypted = params.Flag("encrypted", "Interprets keys base64 encoded without decrypting", false, clingy.Transform(strconv.ParseBool), ).(bool) c.pending = params.Flag("pending", "Remove pending object uploads instead", false, clingy.Transform(strconv.ParseBool), ).(bool) c.location = params.Arg("location", "Location to remove (sj://BUCKET[/KEY])", clingy.Transform(ulloc.Parse), ).(ulloc.Location) } func (c *cmdRm) Execute(ctx clingy.Context) error { fs, err := c.ex.OpenFilesystem(ctx, c.access, ulext.BypassEncryption(c.encrypted)) if err != nil { return err } defer func() { _ = fs.Close() }() if !c.recursive { err := fs.Remove(ctx, c.location, &ulfs.RemoveOptions{ Pending: c.pending, }) if err != nil { return err } fmt.Fprintln(ctx.Stdout(), "removed", c.location) return nil } iter, err := fs.List(ctx, c.location, &ulfs.ListOptions{ Recursive: true, Pending: c.pending, }) if err != nil { return err } var ( limiter = sync2.NewLimiter(c.parallelism) es errs.Group mu sync.Mutex ) fprintln := func(w io.Writer, args ...interface{}) { mu.Lock() defer mu.Unlock() fmt.Fprintln(w, args...) } addError := func(err error) { mu.Lock() defer mu.Unlock() es.Add(err) } for iter.Next() { loc := iter.Item().Loc ok := limiter.Go(ctx, func() { err := fs.Remove(ctx, loc, &ulfs.RemoveOptions{ Pending: c.pending, }) if err != nil { fprintln(ctx.Stderr(), "remove", loc, "failed:", err.Error()) addError(err) } else { fprintln(ctx.Stdout(), "removed", loc) } }) if !ok { break } } limiter.Wait() if err := iter.Err(); err != nil { return errs.Wrap(err) } else if len(es) > 0 { return es.Err() } return nil }