cmd/uplinkng: allow removing pending objects

Change-Id: I12457e7d6fb28492ed4b6c5816f78aa7820fed6b
This commit is contained in:
Jeff Wendling 2021-10-01 19:47:53 -04:00
parent 2df41028a3
commit 987cb6ab11
11 changed files with 213 additions and 91 deletions

View File

@ -110,7 +110,9 @@ func (c *cmdCp) copyRecursive(ctx clingy.Context, fs ulfs.Filesystem) error {
return errs.New("cannot recursively copy to stdin/stdout")
}
iter, err := fs.ListObjects(ctx, c.source, true)
iter, err := fs.List(ctx, c.source, &ulfs.ListOptions{
Recursive: true,
})
if err != nil {
return err
}

View File

@ -91,12 +91,10 @@ func (c *cmdLs) listLocation(ctx clingy.Context, prefix ulloc.Location) error {
defer tw.Done()
// create the object iterator of either existing objects or pending multipart uploads
var iter ulfs.ObjectIterator
if c.pending {
iter, err = fs.ListUploads(ctx, prefix, c.recursive)
} else {
iter, err = fs.ListObjects(ctx, prefix, c.recursive)
}
iter, err := fs.List(ctx, prefix, &ulfs.ListOptions{
Recursive: c.recursive,
Pending: c.pending,
})
if err != nil {
return err
}

View File

@ -14,6 +14,7 @@ import (
"storj.io/common/sync2"
"storj.io/storj/cmd/uplinkng/ulext"
"storj.io/storj/cmd/uplinkng/ulfs"
"storj.io/storj/cmd/uplinkng/ulloc"
)
@ -24,6 +25,7 @@ type cmdRm struct {
recursive bool
parallelism int
encrypted bool
pending bool
location ulloc.Location
}
@ -51,6 +53,9 @@ func (c *cmdRm) Setup(params clingy.Parameters) {
c.encrypted = params.Flag("encrypted", "Interprets keys base64 encoded without decrypting", false,
clingy.Transform(strconv.ParseBool),
).(bool)
c.pending = params.Flag("pending", "Remove pending object uploads instead", false,
clingy.Transform(strconv.ParseBool),
).(bool)
c.location = params.Arg("location", "Location to remove (sj://BUCKET[/KEY])",
clingy.Transform(ulloc.Parse),
@ -65,7 +70,10 @@ func (c *cmdRm) Execute(ctx clingy.Context) error {
defer func() { _ = fs.Close() }()
if !c.recursive {
if err := fs.Remove(ctx, c.location); err != nil {
err := fs.Remove(ctx, c.location, &ulfs.RemoveOptions{
Pending: c.pending,
})
if err != nil {
return err
}
@ -73,7 +81,10 @@ func (c *cmdRm) Execute(ctx clingy.Context) error {
return nil
}
iter, err := fs.ListObjects(ctx, c.location, c.recursive)
iter, err := fs.List(ctx, c.location, &ulfs.ListOptions{
Recursive: true,
Pending: c.pending,
})
if err != nil {
return err
}
@ -102,7 +113,10 @@ func (c *cmdRm) Execute(ctx clingy.Context) error {
loc := iter.Item().Loc
ok := limiter.Go(ctx, func() {
if err := fs.Remove(ctx, loc); err != nil {
err := fs.Remove(ctx, loc, &ulfs.RemoveOptions{
Pending: c.pending,
})
if err != nil {
fprintln(ctx.Stderr(), "remove", loc, "failed:", err.Error())
addError(err)
} else {

View File

@ -40,6 +40,31 @@ func TestRmRemote(t *testing.T) {
ultest.File{Loc: "/home/user/files/file2.txt"},
)
})
t.Run("Pending", func(t *testing.T) {
state := ultest.Setup(commands,
ultest.WithPendingFile("sj://user/files/file1.txt"),
ultest.WithPendingFile("sj://user/files/file2.txt"),
ultest.WithPendingFile("sj://user/other_file1.txt"),
)
state.Succeed(t, "rm", "sj://user/files/file1.txt", "--pending").RequirePending(t,
ultest.File{Loc: "sj://user/files/file2.txt"},
ultest.File{Loc: "sj://user/other_file1.txt"},
)
})
t.Run("Pending Recursive", func(t *testing.T) {
state := ultest.Setup(commands,
ultest.WithPendingFile("sj://user/files/file1.txt"),
ultest.WithPendingFile("sj://user/files/file2.txt"),
ultest.WithPendingFile("sj://user/other_file1.txt"),
)
state.Succeed(t, "rm", "sj://user/files", "-r", "--pending").RequirePending(t,
ultest.File{Loc: "sj://user/other_file1.txt"},
)
})
}
func TestRmLocal(t *testing.T) {
@ -73,4 +98,20 @@ func TestRmLocal(t *testing.T) {
ultest.File{Loc: "/home/user/other_file1.txt"},
)
})
t.Run("Pending", func(t *testing.T) {
state := ultest.Setup(commands,
ultest.WithFile("sj://user/file1.txt"),
ultest.WithFile("sj://user/file2.txt"),
ultest.WithFile("/home/user/file1.txt"),
ultest.WithFile("/home/user/file2.txt"),
)
state.Succeed(t, "rm", "/home/user/file1.txt", "--pending").RequireFiles(t,
ultest.File{Loc: "sj://user/file1.txt"},
ultest.File{Loc: "sj://user/file2.txt"},
ultest.File{Loc: "/home/user/file1.txt"},
ultest.File{Loc: "/home/user/file2.txt"},
)
})
}

View File

@ -16,14 +16,29 @@ import (
"storj.io/uplink"
)
// ListOptions describes options to the List command.
type ListOptions struct {
Recursive bool
Pending bool
}
func (lo *ListOptions) isRecursive() bool { return lo != nil && lo.Recursive }
func (lo *ListOptions) isPending() bool { return lo != nil && lo.Pending }
// RemoveOptions describes options to the Remove command.
type RemoveOptions struct {
Pending bool
}
func (ro *RemoveOptions) isPending() bool { return ro != nil && ro.Pending }
// Filesystem represents either the local Filesystem or the data backed by a project.
type Filesystem interface {
Close() error
Open(ctx clingy.Context, loc ulloc.Location) (ReadHandle, error)
Create(ctx clingy.Context, loc ulloc.Location) (WriteHandle, error)
Remove(ctx context.Context, loc ulloc.Location) error
ListObjects(ctx context.Context, prefix ulloc.Location, recursive bool) (ObjectIterator, error)
ListUploads(ctx context.Context, prefix ulloc.Location, recursive bool) (ObjectIterator, error)
Remove(ctx context.Context, loc ulloc.Location, opts *RemoveOptions) error
List(ctx context.Context, prefix ulloc.Location, opts *ListOptions) (ObjectIterator, error)
IsLocalDir(ctx context.Context, loc ulloc.Location) bool
}

View File

@ -55,7 +55,11 @@ func (l *Local) Create(ctx context.Context, path string) (WriteHandle, error) {
}
// Remove unlinks the file at the path. It is not an error if the file does not exist.
func (l *Local) Remove(ctx context.Context, path string) error {
func (l *Local) Remove(ctx context.Context, path string, opts *RemoveOptions) error {
if opts.isPending() {
return nil
}
if err := os.Remove(path); os.IsNotExist(err) {
return nil
} else if err != nil {
@ -65,9 +69,13 @@ func (l *Local) Remove(ctx context.Context, path string) error {
return nil
}
// ListObjects returns an ObjectIterator listing files and directories that have string prefix
// List returns an ObjectIterator listing files and directories that have string prefix
// with the provided path.
func (l *Local) ListObjects(ctx context.Context, path string, recursive bool) (ObjectIterator, error) {
func (l *Local) List(ctx context.Context, path string, opts *ListOptions) (ObjectIterator, error) {
if opts.isPending() {
return emptyObjectIterator{}, nil
}
prefix := path
if idx := strings.LastIndex(path, "/"); idx >= 0 {
prefix = path[:idx+1]
@ -80,7 +88,7 @@ func (l *Local) ListObjects(ctx context.Context, path string, recursive bool) (O
prefix += string(filepath.Separator)
var files []os.FileInfo
if recursive {
if opts.isRecursive() {
err = filepath.Walk(prefix, func(path string, info os.FileInfo, err error) error {
if err == nil && !info.IsDir() {
rel, err := filepath.Rel(prefix, path)
@ -112,7 +120,7 @@ func (l *Local) ListObjects(ctx context.Context, path string, recursive bool) (O
})
var trim ulloc.Location
if !recursive {
if !opts.isRecursive() {
trim = ulloc.NewLocal(prefix)
}

View File

@ -52,36 +52,26 @@ func (m *Mixed) Create(ctx clingy.Context, loc ulloc.Location) (WriteHandle, err
}
// Remove deletes either a local file or remote object.
func (m *Mixed) Remove(ctx context.Context, loc ulloc.Location) error {
func (m *Mixed) Remove(ctx context.Context, loc ulloc.Location, opts *RemoveOptions) error {
if bucket, key, ok := loc.RemoteParts(); ok {
return m.remote.Remove(ctx, bucket, key)
return m.remote.Remove(ctx, bucket, key, opts)
} else if path, ok := loc.LocalParts(); ok {
return m.local.Remove(ctx, path)
return m.local.Remove(ctx, path, opts)
}
return nil
}
// ListObjects lists either files and directories with some local path prefix or remote objects
// List lists either files and directories with some local path prefix or remote objects
// with a given bucket and key.
func (m *Mixed) ListObjects(ctx context.Context, prefix ulloc.Location, recursive bool) (ObjectIterator, error) {
func (m *Mixed) List(ctx context.Context, prefix ulloc.Location, opts *ListOptions) (ObjectIterator, error) {
if bucket, key, ok := prefix.RemoteParts(); ok {
return m.remote.ListObjects(ctx, bucket, key, recursive), nil
return m.remote.List(ctx, bucket, key, opts), nil
} else if path, ok := prefix.LocalParts(); ok {
return m.local.ListObjects(ctx, path, recursive)
return m.local.List(ctx, path, opts)
}
return nil, errs.New("unable to list objects for prefix %q", prefix)
}
// ListUploads lists all of the pending uploads for remote objects with some given bucket and key.
func (m *Mixed) ListUploads(ctx context.Context, prefix ulloc.Location, recursive bool) (ObjectIterator, error) {
if bucket, key, ok := prefix.RemoteParts(); ok {
return m.remote.ListUploads(ctx, bucket, key, recursive), nil
} else if prefix.Local() {
return emptyObjectIterator{}, nil
}
return nil, errs.New("unable to list uploads for prefix %q", prefix)
}
// IsLocalDir returns true if the location is a directory that is local.
func (m *Mixed) IsLocalDir(ctx context.Context, loc ulloc.Location) bool {
if path, ok := loc.LocalParts(); ok {

View File

@ -43,65 +43,74 @@ func (r *Remote) Open(ctx context.Context, bucket, key string) (ReadHandle, erro
func (r *Remote) Create(ctx context.Context, bucket, key string) (WriteHandle, error) {
fh, err := r.project.UploadObject(ctx, bucket, key, nil)
if err != nil {
return nil, err
return nil, errs.Wrap(err)
}
return newUplinkWriteHandle(fh), nil
}
// Remove deletes the object at the provided key and bucket.
func (r *Remote) Remove(ctx context.Context, bucket, key string) error {
_, err := r.project.DeleteObject(ctx, bucket, key)
if err != nil {
return err
func (r *Remote) Remove(ctx context.Context, bucket, key string, opts *RemoveOptions) error {
if !opts.isPending() {
_, err := r.project.DeleteObject(ctx, bucket, key)
if err != nil {
return errs.Wrap(err)
}
return nil
}
// TODO: we may need a dedicated endpoint for deleting pending object streams
list := r.project.ListUploads(ctx, bucket, &uplink.ListUploadsOptions{Prefix: key})
// TODO: modify when we can have several pending objects for the same object key
if list.Next() {
err := r.project.AbortUpload(ctx, bucket, key, list.Item().UploadID)
if err != nil {
return errs.Wrap(err)
}
}
if err := list.Err(); err != nil {
return errs.Wrap(err)
}
return nil
}
// ListObjects lists all of the objects in some bucket that begin with the given prefix.
func (r *Remote) ListObjects(ctx context.Context, bucket, prefix string, recursive bool) ObjectIterator {
// List lists all of the objects in some bucket that begin with the given prefix.
func (r *Remote) List(ctx context.Context, bucket, prefix string, opts *ListOptions) ObjectIterator {
parentPrefix := ""
if idx := strings.LastIndexByte(prefix, '/'); idx >= 0 {
parentPrefix = prefix[:idx+1]
}
trim := ulloc.NewRemote(bucket, "")
if !recursive {
if !opts.isRecursive() {
trim = ulloc.NewRemote(bucket, parentPrefix)
}
var iter ObjectIterator
if opts.isPending() {
iter = newUplinkUploadIterator(
bucket,
r.project.ListUploads(ctx, bucket, &uplink.ListUploadsOptions{
Prefix: parentPrefix,
Recursive: opts.Recursive,
System: true,
}),
)
} else {
iter = newUplinkObjectIterator(
bucket,
r.project.ListObjects(ctx, bucket, &uplink.ListObjectsOptions{
Prefix: parentPrefix,
Recursive: opts.Recursive,
System: true,
}),
)
}
return &filteredObjectIterator{
trim: trim,
filter: ulloc.NewRemote(bucket, prefix),
iter: newUplinkObjectIterator(bucket, r.project.ListObjects(ctx, bucket,
&uplink.ListObjectsOptions{
Prefix: parentPrefix,
Recursive: recursive,
System: true,
})),
}
}
// ListUploads lists all of the pending uploads in some bucket that begin with the given prefix.
func (r *Remote) ListUploads(ctx context.Context, bucket, prefix string, recursive bool) ObjectIterator {
parentPrefix := ""
if idx := strings.LastIndexByte(prefix, '/'); idx >= 0 {
parentPrefix = prefix[:idx+1]
}
trim := ulloc.NewRemote(bucket, "")
if !recursive {
trim = ulloc.NewRemote(bucket, parentPrefix)
}
return &filteredObjectIterator{
trim: trim,
filter: ulloc.NewRemote(bucket, prefix),
iter: newUplinkUploadIterator(bucket, r.project.ListUploads(ctx, bucket,
&uplink.ListUploadsOptions{
Prefix: parentPrefix,
Recursive: recursive,
System: true,
})),
iter: iter,
}
}

View File

@ -60,6 +60,19 @@ func (tfs *testFilesystem) Files() (files []File) {
return files
}
func (tfs *testFilesystem) Pending() (files []File) {
for loc, mh := range tfs.pending {
for _, h := range mh {
files = append(files, File{
Loc: loc.String(),
Contents: h.buf.String(),
})
}
}
sort.Slice(files, func(i, j int) bool { return files[i].less(files[j]) })
return files
}
func (tfs *testFilesystem) Close() error {
return nil
}
@ -105,17 +118,28 @@ func (tfs *testFilesystem) Create(ctx clingy.Context, loc ulloc.Location) (_ ulf
cre: tfs.created,
}
tfs.pending[loc] = append(tfs.pending[loc], wh)
if loc.Remote() {
tfs.pending[loc] = append(tfs.pending[loc], wh)
}
return wh, nil
}
func (tfs *testFilesystem) Remove(ctx context.Context, loc ulloc.Location) error {
delete(tfs.files, loc)
func (tfs *testFilesystem) Remove(ctx context.Context, loc ulloc.Location, opts *ulfs.RemoveOptions) error {
if opts == nil || !opts.Pending {
delete(tfs.files, loc)
} else {
// TODO: Remove needs an API that understands that multiple pending files may exist
delete(tfs.pending, loc)
}
return nil
}
func (tfs *testFilesystem) ListObjects(ctx context.Context, prefix ulloc.Location, recursive bool) (ulfs.ObjectIterator, error) {
func (tfs *testFilesystem) List(ctx context.Context, prefix ulloc.Location, opts *ulfs.ListOptions) (ulfs.ObjectIterator, error) {
if opts != nil && opts.Pending {
return tfs.listPending(ctx, prefix, opts)
}
prefixDir := prefix.AsDirectoryish()
var infos []ulfs.ObjectInfo
@ -130,19 +154,23 @@ func (tfs *testFilesystem) ListObjects(ctx context.Context, prefix ulloc.Locatio
sort.Sort(objectInfos(infos))
if !recursive {
if opts == nil || !opts.Recursive {
infos = collapseObjectInfos(prefix, infos)
}
return &objectInfoIterator{infos: infos}, nil
}
func (tfs *testFilesystem) ListUploads(ctx context.Context, prefix ulloc.Location, recursive bool) (ulfs.ObjectIterator, error) {
func (tfs *testFilesystem) listPending(ctx context.Context, prefix ulloc.Location, opts *ulfs.ListOptions) (ulfs.ObjectIterator, error) {
if prefix.Local() {
return &objectInfoIterator{}, nil
}
prefixDir := prefix.AsDirectoryish()
var infos []ulfs.ObjectInfo
for loc, whs := range tfs.pending {
if loc.Remote() && loc.HasPrefix(prefixDir) || loc == prefix {
if loc.HasPrefix(prefixDir) || loc == prefix {
for _, wh := range whs {
infos = append(infos, ulfs.ObjectInfo{
Loc: loc,
@ -154,7 +182,7 @@ func (tfs *testFilesystem) ListUploads(ctx context.Context, prefix ulloc.Locatio
sort.Sort(objectInfos(infos))
if !recursive {
if opts == nil || !opts.Recursive {
infos = collapseObjectInfos(prefix, infos)
}

View File

@ -15,11 +15,12 @@ import (
// Result captures all the output of running a command for inspection.
type Result struct {
Stdout string
Stderr string
Ok bool
Err error
Files []File
Stdout string
Stderr string
Ok bool
Err error
Files []File
Pending []File
}
// RequireSuccess fails if the Result did not observe a successful execution.
@ -60,6 +61,13 @@ func (r Result) RequireFiles(t *testing.T, files ...File) Result {
return r
}
// RequirePending requires that the set of files provided are all of the files that
// existed as pending at the end of the execution.
func (r Result) RequirePending(t *testing.T, files ...File) Result {
require.Equal(t, canonicalizePendingFiles(files), r.Pending)
return r
}
// RequireLocalFiles requires that the set of files provided are all of the
// local files that existed at the end of the execution. It assumes any passed
// in files with no contents contain the filename as the contents instead.
@ -98,6 +106,12 @@ func canonicalizeFiles(files []File) (out []File) {
return out
}
func canonicalizePendingFiles(files []File) (out []File) {
out = append(out, files...)
sort.Slice(out, func(i, j int) bool { return out[i].less(out[j]) })
return out
}
func fileIsLocal(file File) bool {
loc, _ := ulloc.Parse(file.Loc)
return loc.Local()

View File

@ -93,11 +93,12 @@ func (st State) Run(t *testing.T, args ...string) Result {
}
return Result{
Stdout: stdout.String(),
Stderr: stderr.String(),
Ok: ok,
Err: err,
Files: tfs.Files(),
Stdout: stdout.String(),
Stderr: stderr.String(),
Ok: ok,
Err: err,
Files: tfs.Files(),
Pending: tfs.Pending(),
}
}
@ -164,6 +165,8 @@ func WithPendingFile(location string) ExecuteOption {
if bucket, _, ok := loc.RemoteParts(); ok {
tfs.ensureBucket(bucket)
} else {
t.Fatalf("Invalid pending local file: %s", loc)
}
_, err = tfs.Create(ctx, loc)