diff --git a/cmd/uplink/cmd_cp_test.go b/cmd/uplink/cmd_cp_test.go index b51bc36e3..d808cc2d2 100644 --- a/cmd/uplink/cmd_cp_test.go +++ b/cmd/uplink/cmd_cp_test.go @@ -177,12 +177,12 @@ func TestCpRecursiveDifficult(t *testing.T) { t.Run("DirectoryConflict", func(t *testing.T) { state := ultest.Setup(commands, - ultest.WithFile("sj://user/fileder"), - ultest.WithFile("sj://user/fileder/file"), + ultest.WithFile("sj://user/filedir"), + ultest.WithFile("sj://user/filedir/file"), ) state.Fail(t, "cp", "sj://user", "root", "--recursive").RequireLocalFiles(t, - ultest.File{Loc: "root/fileder", Contents: "sj://user/fileder"}, + ultest.File{Loc: "root/filedir", Contents: "sj://user/filedir"}, ) }) @@ -196,8 +196,8 @@ func TestCpRecursiveDifficult(t *testing.T) { t.Run("ExistingDirectory", func(t *testing.T) { state := ultest.Setup(commands, - ultest.WithFile("sj://user/fileder"), - ultest.WithFile("/home/user/fileder/file"), + ultest.WithFile("sj://user/filedir"), + ultest.WithFile("/home/user/filedir/file"), ) state.Fail(t, "cp", "sj://user", "/home/user", "--recursive") @@ -351,8 +351,8 @@ func TestCpLocalToLocal(t *testing.T) { ) }) - t.Run("EmptyToFolder", func(t *testing.T) { - state.Succeed(t, "cp", "", "/pre", "--recursive").RequireFiles(t, + t.Run("RootToFolder", func(t *testing.T) { + state.Succeed(t, "cp", "/", "/pre", "--recursive").RequireFiles(t, ultest.File{Loc: "/home/user1/folder1/file1.txt", Contents: "data1"}, ultest.File{Loc: "/home/user1/folder1/file2.txt", Contents: "data2"}, ultest.File{Loc: "/home/user1/folder2/file3.txt", Contents: "data3"}, @@ -414,6 +414,7 @@ func TestCpStandard(t *testing.T) { state := ultest.Setup(commands, ultest.WithFile("sj://user/foo"), ultest.WithFile("/home/user/foo"), + ultest.WithStdin("-"), ) t.Run("StdinToRemote", func(t *testing.T) { diff --git a/cmd/uplink/cmd_ls_test.go b/cmd/uplink/cmd_ls_test.go index f71370b2f..ac0fdbfb4 100644 --- a/cmd/uplink/cmd_ls_test.go +++ b/cmd/uplink/cmd_ls_test.go @@ -357,15 +357,15 @@ func TestLsLocal(t *testing.T) { ) t.Run("Recursive", func(t *testing.T) { - state.Succeed(t, "ls", "/user", "--recursive", "--utc").RequireStdout(t, ` - KIND CREATED SIZE KEY - OBJ 1970-01-01 00:00:01 0 /user/deep/aaa/bbb/1 - OBJ 1970-01-01 00:00:02 0 /user/deep/aaa/bbb/2 - OBJ 1970-01-01 00:00:03 0 /user/deep/aaa/bbb/3 - OBJ 1970-01-01 00:00:04 0 /user/foobar/1 - OBJ 1970-01-01 00:00:05 0 /user/foobar/2 - OBJ 1970-01-01 00:00:06 0 /user/foobar/3 - OBJ 1970-01-01 00:00:07 0 /user/foobaz/1 + state.Succeed(t, "ls", "/user", "--recursive", "--utc").RequireStdoutGlob(t, ` + KIND CREATED SIZE KEY + OBJ 20 /user/deep/aaa/bbb/1 + OBJ 20 /user/deep/aaa/bbb/2 + OBJ 20 /user/deep/aaa/bbb/3 + OBJ 14 /user/foobar/1 + OBJ 14 /user/foobar/2 + OBJ 14 /user/foobar/3 + OBJ 14 /user/foobaz/1 `) }) @@ -374,20 +374,20 @@ func TestLsLocal(t *testing.T) { }) t.Run("ExactPrefix", func(t *testing.T) { - state.Succeed(t, "ls", "/user/foobar", "--utc").RequireStdout(t, ` - KIND CREATED SIZE KEY - OBJ 1970-01-01 00:00:04 0 1 - OBJ 1970-01-01 00:00:05 0 2 - OBJ 1970-01-01 00:00:06 0 3 + state.Succeed(t, "ls", "/user/foobar", "--utc").RequireStdoutGlob(t, ` + KIND CREATED SIZE KEY + OBJ 14 1 + OBJ 14 2 + OBJ 14 3 `) }) t.Run("ExactPrefixWithSlash", func(t *testing.T) { - state.Succeed(t, "ls", "/user/foobar/", "--utc").RequireStdout(t, ` - KIND CREATED SIZE KEY - OBJ 1970-01-01 00:00:04 0 1 - OBJ 1970-01-01 00:00:05 0 2 - OBJ 1970-01-01 00:00:06 0 3 + state.Succeed(t, "ls", "/user/foobar/", "--utc").RequireStdoutGlob(t, ` + KIND CREATED SIZE KEY + OBJ 14 1 + OBJ 14 2 + OBJ 14 3 `) }) @@ -402,11 +402,11 @@ func TestLsLocal(t *testing.T) { PRE bbb/ `) - state.Succeed(t, "ls", "/user/deep/aaa/bbb/", "--utc").RequireStdout(t, ` - KIND CREATED SIZE KEY - OBJ 1970-01-01 00:00:01 0 1 - OBJ 1970-01-01 00:00:02 0 2 - OBJ 1970-01-01 00:00:03 0 3 + state.Succeed(t, "ls", "/user/deep/aaa/bbb/", "--utc").RequireStdoutGlob(t, ` + KIND CREATED SIZE KEY + OBJ 20 1 + OBJ 20 2 + OBJ 20 3 `) }) } @@ -442,26 +442,26 @@ func TestLsRelative(t *testing.T) { }) recursive := ` - KIND CREATED SIZE KEY - OBJ 1970-01-01 00:00:01 0 deep/aaa/bbb/1 - OBJ 1970-01-01 00:00:02 0 deep/aaa/bbb/2 - OBJ 1970-01-01 00:00:03 0 deep/aaa/bbb/3 - OBJ 1970-01-01 00:00:04 0 foobar/1 - OBJ 1970-01-01 00:00:05 0 foobar/2 - OBJ 1970-01-01 00:00:06 0 foobar/3 - OBJ 1970-01-01 00:00:07 0 foobaz/1 + KIND CREATED SIZE KEY + OBJ 14 deep/aaa/bbb/1 + OBJ 14 deep/aaa/bbb/2 + OBJ 14 deep/aaa/bbb/3 + OBJ 8 foobar/1 + OBJ 8 foobar/2 + OBJ 8 foobar/3 + OBJ 8 foobaz/1 ` t.Run("Recursive", func(t *testing.T) { - state.Succeed(t, "ls", "", "--recursive", "--utc").RequireStdout(t, recursive) + state.Succeed(t, "ls", "", "--recursive", "--utc").RequireStdoutGlob(t, recursive) }) t.Run("RecursiveDot", func(t *testing.T) { - state.Succeed(t, "ls", ".", "--recursive", "--utc").RequireStdout(t, recursive) + state.Succeed(t, "ls", ".", "--recursive", "--utc").RequireStdoutGlob(t, recursive) }) t.Run("RecursiveDotSlash", func(t *testing.T) { - state.Succeed(t, "ls", "./", "--recursive", "--utc").RequireStdout(t, recursive) + state.Succeed(t, "ls", "./", "--recursive", "--utc").RequireStdoutGlob(t, recursive) }) } diff --git a/cmd/uplink/external_project.go b/cmd/uplink/external_project.go index 8324e9fb0..d47d2608e 100644 --- a/cmd/uplink/external_project.go +++ b/cmd/uplink/external_project.go @@ -22,7 +22,7 @@ func (ex *external) OpenFilesystem(ctx context.Context, accessName string, optio if err != nil { return nil, err } - return ulfs.NewMixed(ulfs.NewLocal(), ulfs.NewRemote(project)), nil + return ulfs.NewMixed(ulfs.NewLocal(ulfs.NewLocalBackendOS()), ulfs.NewRemote(project)), nil } func (ex *external) OpenProject(ctx context.Context, accessName string, options ...ulext.Option) (*uplink.Project, error) { diff --git a/cmd/uplink/ulfs/filesystem.go b/cmd/uplink/ulfs/filesystem.go index 566d4d18f..0dfa86395 100644 --- a/cmd/uplink/ulfs/filesystem.go +++ b/cmd/uplink/ulfs/filesystem.go @@ -36,7 +36,7 @@ type RemoveOptions struct { func (ro *RemoveOptions) isPending() bool { return ro != nil && ro.Pending } -// Filesystem represents either the local Filesystem or the data backed by a project. +// Filesystem represents either the local filesystem or the data backed by a project. type Filesystem interface { Close() error Open(ctx clingy.Context, loc ulloc.Location) (MultiReadHandle, error) @@ -49,6 +49,30 @@ type Filesystem interface { Stat(ctx context.Context, loc ulloc.Location) (*ObjectInfo, error) } +// FilesystemLocal is the interface for a local filesystem. +type FilesystemLocal interface { + IsLocalDir(ctx context.Context, path string) bool + Open(ctx context.Context, path string) (MultiReadHandle, error) + Create(ctx context.Context, path string) (MultiWriteHandle, error) + Move(ctx context.Context, oldpath string, newpath string) error + Copy(ctx context.Context, oldpath string, newpath string) error + Remove(ctx context.Context, path string, opts *RemoveOptions) error + List(ctx context.Context, path string, opts *ListOptions) (ObjectIterator, error) + Stat(ctx context.Context, path string) (*ObjectInfo, error) +} + +// FilesystemRemote is the interface for a remote filesystem. +type FilesystemRemote interface { + Close() error + Open(ctx context.Context, bucket, key string) (MultiReadHandle, error) + Create(ctx context.Context, bucket, key string, opts *CreateOptions) (MultiWriteHandle, error) + Move(ctx context.Context, oldbucket, oldkey string, newbucket, newkey string) error + Copy(ctx context.Context, oldbucket, oldkey string, newbucket, newkey string) error + Remove(ctx context.Context, bucket, key string, opts *RemoveOptions) error + List(ctx context.Context, bucket, key string, opts *ListOptions) ObjectIterator + Stat(ctx context.Context, bucket, key string) (*ObjectInfo, error) +} + // // object info // diff --git a/cmd/uplink/ulfs/handle_file.go b/cmd/uplink/ulfs/handle_file.go index c7ebb108b..555a5b6c1 100644 --- a/cmd/uplink/ulfs/handle_file.go +++ b/cmd/uplink/ulfs/handle_file.go @@ -4,8 +4,6 @@ package ulfs import ( - "os" - "github.com/zeebo/errs" "storj.io/storj/cmd/uplink/ulloc" @@ -16,7 +14,7 @@ import ( // // osMultiReadHandle implements MultiReadHandle for *os.Files. -func newOSMultiReadHandle(fh *os.File) (MultiReadHandle, error) { +func newOSMultiReadHandle(fh LocalBackendFile) (MultiReadHandle, error) { fi, err := fh.Stat() if err != nil { return nil, errs.Wrap(err) @@ -33,19 +31,23 @@ func newOSMultiReadHandle(fh *os.File) (MultiReadHandle, error) { // write handles // -type fileGenericWriter os.File +type fileGenericWriter struct { + fs LocalBackend + raw LocalBackendFile +} -func (f *fileGenericWriter) raw() *os.File { return (*os.File)(f) } - -func (f *fileGenericWriter) WriteAt(b []byte, off int64) (int, error) { return f.raw().WriteAt(b, off) } -func (f *fileGenericWriter) Commit() error { return f.raw().Close() } +func (f *fileGenericWriter) WriteAt(b []byte, off int64) (int, error) { return f.raw.WriteAt(b, off) } +func (f *fileGenericWriter) Commit() error { return f.raw.Close() } func (f *fileGenericWriter) Abort() error { return errs.Combine( - f.raw().Close(), - os.Remove(f.raw().Name()), + f.raw.Close(), + f.fs.Remove(f.raw.Name()), ) } -func newOSMultiWriteHandle(fh *os.File) MultiWriteHandle { - return NewGenericMultiWriteHandle((*fileGenericWriter)(fh)) +func newOSMultiWriteHandle(fs LocalBackend, fh LocalBackendFile) MultiWriteHandle { + return NewGenericMultiWriteHandle(&fileGenericWriter{ + fs: fs, + raw: fh, + }) } diff --git a/cmd/uplink/ulfs/local.go b/cmd/uplink/ulfs/local.go index 80c5c4192..bad3e3df6 100644 --- a/cmd/uplink/ulfs/local.go +++ b/cmd/uplink/ulfs/local.go @@ -5,28 +5,51 @@ package ulfs import ( "context" - "io/ioutil" + "io" "os" "path/filepath" "sort" - "strings" "github.com/zeebo/errs" "storj.io/storj/cmd/uplink/ulloc" ) +// LocalBackendFile represents a file in the filesystem. +type LocalBackendFile interface { + io.Closer + io.ReaderAt + io.WriterAt + Name() string + Stat() (os.FileInfo, error) + Readdir(int) ([]os.FileInfo, error) +} + +// LocalBackend abstracts what the Local filesystem interacts with. +type LocalBackend interface { + Create(name string) (LocalBackendFile, error) + MkdirAll(path string, perm os.FileMode) error + Open(name string) (LocalBackendFile, error) + Remove(name string) error + Rename(oldname, newname string) error + Stat(name string) (os.FileInfo, error) +} + // Local implements something close to a filesystem but backed by the local disk. -type Local struct{} +type Local struct { + fs LocalBackend +} // NewLocal constructs a Local filesystem. -func NewLocal() *Local { - return &Local{} +func NewLocal(fs LocalBackend) *Local { + return &Local{ + fs: fs, + } } // Open returns a read ReadHandle for the given local path. func (l *Local) Open(ctx context.Context, path string) (MultiReadHandle, error) { - fh, err := os.Open(path) + fh, err := l.fs.Open(path) if err != nil { return nil, errs.Wrap(err) } @@ -35,28 +58,28 @@ func (l *Local) Open(ctx context.Context, path string) (MultiReadHandle, error) // Create makes any directories necessary to create a file at path and returns a WriteHandle. func (l *Local) Create(ctx context.Context, path string) (MultiWriteHandle, error) { - fi, err := os.Stat(path) + fi, err := l.fs.Stat(path) if err != nil && !os.IsNotExist(err) { return nil, errs.Wrap(err) } else if err == nil && fi.IsDir() { return nil, errs.New("path exists as a directory already") } - if err := os.MkdirAll(filepath.Dir(path), 0755); err != nil { + if err := l.fs.MkdirAll(filepath.Dir(path), 0755); err != nil { return nil, errs.Wrap(err) } // TODO: atomic rename - fh, err := os.Create(path) + fh, err := l.fs.Create(path) if err != nil { return nil, errs.Wrap(err) } - return newOSMultiWriteHandle(fh), nil + return newOSMultiWriteHandle(l.fs, fh), nil } // Move moves file to provided path. func (l *Local) Move(ctx context.Context, oldpath, newpath string) error { - return os.Rename(oldpath, newpath) + return l.fs.Rename(oldpath, newpath) } // Copy copies file to provided path. @@ -70,7 +93,7 @@ func (l *Local) Remove(ctx context.Context, path string, opts *RemoveOptions) er return nil } - if err := os.Remove(path); os.IsNotExist(err) { + if err := l.fs.Remove(path); os.IsNotExist(err) { return nil } else if err != nil { return err @@ -86,20 +109,13 @@ func (l *Local) List(ctx context.Context, path string, opts *ListOptions) (Objec return emptyObjectIterator{}, nil } - prefix := path - if idx := strings.LastIndex(path, "/"); idx >= 0 { - prefix = path[:idx+1] - } - - prefix, err := filepath.Abs(prefix) - if err != nil { - return nil, errs.Wrap(err) - } - prefix += string(filepath.Separator) + prefix := filepath.Clean(path) + string(filepath.Separator) + var err error var files []os.FileInfo + if opts.isRecursive() { - err = filepath.Walk(prefix, func(path string, info os.FileInfo, err error) error { + err = walk(l.fs, filepath.Clean(path), func(path string, info os.FileInfo, err error) error { if err == nil && !info.IsDir() { rel, err := filepath.Rel(prefix, path) if err != nil { @@ -113,10 +129,13 @@ func (l *Local) List(ctx context.Context, path string, opts *ListOptions) (Objec return nil }) } else { - files, err = ioutil.ReadDir(prefix) + files, err = readdir(l.fs, prefix, -1) + if os.IsNotExist(err) { + return emptyObjectIterator{}, nil + } } if err != nil { - return nil, err + return nil, errs.Wrap(err) } sort.Slice(files, func(i, j int) bool { @@ -146,7 +165,7 @@ func (l *Local) List(ctx context.Context, path string, opts *ListOptions) (Objec // IsLocalDir returns true if the path is a directory. func (l *Local) IsLocalDir(ctx context.Context, path string) bool { - fi, err := os.Stat(path) + fi, err := l.fs.Stat(path) if err != nil { return false } @@ -155,7 +174,7 @@ func (l *Local) IsLocalDir(ctx context.Context, path string) bool { // Stat returns an ObjectInfo describing the provided path. func (l *Local) Stat(ctx context.Context, path string) (*ObjectInfo, error) { - fi, err := os.Stat(path) + fi, err := l.fs.Stat(path) if err != nil { return nil, err } @@ -203,3 +222,55 @@ func (fi *fileinfoObjectIterator) Item() ObjectInfo { ContentLength: fi.current.Size(), } } + +func walk(fs LocalBackend, path string, cb func(path string, info os.FileInfo, err error) error) error { + info, err := fs.Stat(path) + if err != nil { + return cb(path, nil, err) + } + return walkHelper(fs, path, info, cb) +} + +func walkHelper(fs LocalBackend, path string, info os.FileInfo, cb func(path string, info os.FileInfo, err error) error) error { + if !info.IsDir() { + return cb(path, info, nil) + } + + infos, err := readdir(fs, path, -1) + err1 := cb(path, info, err) + if err != nil || err1 != nil { + return err1 + } + + for _, info := range infos { + filename := filepath.Join(path, info.Name()) + fileInfo, err := fs.Stat(filename) + if err != nil { + if err := cb(filename, fileInfo, err); err != nil { + return err + } + } else { + err := walkHelper(fs, filename, fileInfo, cb) + if err != nil && !fileInfo.IsDir() { + return err + } + } + } + return nil +} + +func readdir(fs LocalBackend, path string, n int) ([]os.FileInfo, error) { + f, err := fs.Open(path) + if err != nil { + return nil, err + } + list, err := f.Readdir(n) + _ = f.Close() + if err != nil { + return nil, err + } + sort.Slice(list, func(i, j int) bool { + return list[i].Name() < list[j].Name() + }) + return list, nil +} diff --git a/cmd/uplink/ulfs/local_backend_mem.go b/cmd/uplink/ulfs/local_backend_mem.go new file mode 100644 index 000000000..9f7a24911 --- /dev/null +++ b/cmd/uplink/ulfs/local_backend_mem.go @@ -0,0 +1,310 @@ +// Copyright (C) 2022 Storj Labs, Inc. +// See LICENSE for copying information. + +package ulfs + +import ( + "fmt" + "io" + "io/fs" + "os" + "path/filepath" + "strings" + "time" + + "github.com/zeebo/errs" +) + +// LocalBackendMem implements LocalBackend with memory backed files. +type LocalBackendMem struct { + root *memDir + cwd *memDir +} + +// NewLocalBackendMem creates a new LocalBackendMem. +func NewLocalBackendMem() *LocalBackendMem { + return &LocalBackendMem{ + root: newMemDir("/"), + cwd: newMemDir(""), + } +} + +func (l *LocalBackendMem) openRoot(name string) (string, *memDir) { + if strings.HasPrefix(name, string(filepath.Separator)) { + return name, l.root + } else if name == "." { + return name[1:], l.cwd + } else if strings.HasPrefix(name, "./") { + return name[2:], l.cwd + } + return name, l.cwd +} + +func (l *LocalBackendMem) openParent(name string) (*memDir, string, error) { + dir := filepath.Dir(name) + + fh, err := l.Open(dir) + if err != nil { + return nil, "", err + } + + md, ok := fh.(*memDir) + if !ok { + return nil, "", errs.New("parent not a directory: %q", dir) + } + return md, filepath.Base(name), nil +} + +// Create creates a new file for the given name. +func (l *LocalBackendMem) Create(name string) (LocalBackendFile, error) { + name = filepath.Clean(name) + + md, base, err := l.openParent(name) + if err != nil { + return nil, err + } + if fh, ok := md.children[base]; ok { + if _, ok := fh.(*memDir); ok { + return nil, errs.New("file already exists: %q", name) + } + } + mf := newMemFile(name) + md.children[base] = mf + return mf, nil +} + +// MkdirAll recursively creates directories to make name a directory. +func (l *LocalBackendMem) MkdirAll(name string, perm os.FileMode) error { + name = filepath.Clean(name) + + name, root := l.openRoot(name) + return iterateComponents(name, func(name, ent string) error { + fh, ok := root.children[ent] + if !ok { + fh = newMemDir(name) + root.children[ent] = fh + } + md, ok := fh.(*memDir) + if !ok { + return errs.New("file already exists: %q", name) + } + root = md + return nil + }) +} + +// Open opens the file with the given name. +func (l *LocalBackendMem) Open(name string) (LocalBackendFile, error) { + name = filepath.Clean(name) + + var root LocalBackendFile + name, root = l.openRoot(name) + err := iterateComponents(name, func(name, ent string) error { + md, ok := root.(*memDir) + if !ok { + return errs.New("not a directory: %q", name) + } + fh, ok := md.children[ent] + if !ok { + return os.ErrNotExist + } + root = fh + return nil + }) + if err != nil { + return nil, err + } + return root, nil +} + +// Remove deletes the file with the given name. +func (l *LocalBackendMem) Remove(name string) error { + name = filepath.Clean(name) + + md, base, err := l.openParent(name) + if err != nil { + return err + } + if _, ok := md.children[base]; !ok { + return errs.New("file does not exists: %q", name) + } + delete(md.children, base) + return nil +} + +// Rename causes the file at oldname to be moved to newname. +func (l *LocalBackendMem) Rename(oldname, newname string) error { + oldname = filepath.Clean(oldname) + newname = filepath.Clean(newname) + + omd, obase, err := l.openParent(oldname) + if err != nil { + return err + } + nmd, nbase, err := l.openParent(newname) + if err != nil { + return err + } + + f, ok := omd.children[obase] + if !ok { + return os.ErrNotExist + } + + switch f := f.(type) { + case *memFile: + f.name = newname + case *memDir: + f.name = newname + } + + nmd.children[nbase] = f + delete(omd.children, obase) + + return nil +} + +// Stat returns file info for the given name. +func (l *LocalBackendMem) Stat(name string) (os.FileInfo, error) { + fh, err := l.Open(name) + if err != nil { + return nil, err + } + return fh.Stat() +} + +// +// memFile +// + +type memFile struct { + name string + buf []byte +} + +func newMemFile(name string) *memFile { + return &memFile{ + name: name, + } +} + +func (mf *memFile) String() string { return fmt.Sprintf("File[%q]", mf.name) } + +func (mf *memFile) Name() string { return mf.name } +func (mf *memFile) Close() error { return nil } + +func (mf *memFile) ReadAt(p []byte, off int64) (int, error) { + if off >= int64(len(mf.buf)) { + return 0, io.EOF + } + return copy(p, mf.buf[off:]), nil +} + +func (mf *memFile) WriteAt(p []byte, off int64) (int, error) { + if delta := (off + int64(len(p))) - int64(len(mf.buf)); delta > 0 { + mf.buf = append(mf.buf, make([]byte, delta)...) + } + return copy(mf.buf[off:], p), nil +} + +func (mf *memFile) Stat() (os.FileInfo, error) { + return (*memFileInfo)(mf), nil +} + +func (mf *memFile) Readdir(n int) ([]os.FileInfo, error) { + return nil, errs.New("readdir on regular file") +} + +type memFileInfo memFile + +var _ os.FileInfo = (*memFileInfo)(nil) + +func (mfi *memFileInfo) Name() string { + return filepath.Base((*memFile)(mfi).name) +} + +func (mfi *memFileInfo) Size() int64 { return int64(len((*memFile)(mfi).buf)) } +func (mfi *memFileInfo) Mode() fs.FileMode { return 0777 } +func (mfi *memFileInfo) ModTime() time.Time { return time.Time{} } +func (mfi *memFileInfo) IsDir() bool { return false } +func (mfi *memFileInfo) Sys() interface{} { return nil } + +// +// memDir +// + +type memDir struct { + name string + children map[string]LocalBackendFile +} + +func newMemDir(name string) *memDir { + return &memDir{ + name: name, + children: make(map[string]LocalBackendFile), + } +} + +var _ LocalBackendFile = (*memDir)(nil) + +func (md *memDir) String() string { return fmt.Sprintf("Dir[%q, %v]", md.name, md.children) } + +func (md *memDir) Name() string { return md.name } +func (md *memDir) Close() error { return nil } + +func (md *memDir) ReadAt(p []byte, off int64) (int, error) { + return 0, errs.New("readat on directory") +} + +func (md *memDir) WriteAt(p []byte, off int64) (int, error) { + return 0, errs.New("writeat on directory") +} + +func (md *memDir) Stat() (os.FileInfo, error) { + return (*memDirInfo)(md), nil +} + +func (md *memDir) Readdir(n int) ([]os.FileInfo, error) { + if n != -1 { + return nil, errs.New("can only read all entries") + } + out := make([]os.FileInfo, 0, len(md.children)) + for _, child := range md.children { + info, _ := child.Stat() + out = append(out, info) + } + return out, nil +} + +type memDirInfo memDir + +var _ os.FileInfo = (*memDirInfo)(nil) + +func (dfi *memDirInfo) Name() string { + return filepath.Base((*memDir)(dfi).name) +} + +func (dfi *memDirInfo) Size() int64 { return 0 } +func (dfi *memDirInfo) Mode() fs.FileMode { return 0777 } +func (dfi *memDirInfo) ModTime() time.Time { return time.Time{} } +func (dfi *memDirInfo) IsDir() bool { return true } +func (dfi *memDirInfo) Sys() interface{} { return nil } + +// +// helpers +// + +func iterateComponents(name string, cb func(name, ent string) error) error { + i := 0 + for i < len(name) { + part := strings.IndexByte(name[i:], filepath.Separator) + if part == -1 { + return cb(name, name[i:]) + } + if err := cb(name[:i+part], name[i:i+part]); err != nil { + return err + } + i += part + 1 + } + return nil +} diff --git a/cmd/uplink/ulfs/local_backend_os.go b/cmd/uplink/ulfs/local_backend_os.go new file mode 100644 index 000000000..8cd117873 --- /dev/null +++ b/cmd/uplink/ulfs/local_backend_os.go @@ -0,0 +1,44 @@ +// Copyright (C) 2022 Storj Labs, Inc. +// See LICENSE for copying information. + +package ulfs + +import "os" + +// LocalBackendOS implements LocalBackend by using the os package. +type LocalBackendOS struct{} + +// NewLocalBackendOS constructs a new LocalBackendOS. +func NewLocalBackendOS() *LocalBackendOS { + return new(LocalBackendOS) +} + +// Create calls os.Create. +func (l *LocalBackendOS) Create(name string) (LocalBackendFile, error) { + return os.Create(name) +} + +// MkdirAll calls os.MkdirAll. +func (l *LocalBackendOS) MkdirAll(path string, perm os.FileMode) error { + return os.MkdirAll(path, perm) +} + +// Open calls os.Open. +func (l *LocalBackendOS) Open(name string) (LocalBackendFile, error) { + return os.Open(name) +} + +// Remove calls os.Remove. +func (l *LocalBackendOS) Remove(name string) error { + return os.Remove(name) +} + +// Rename calls os.Rename. +func (l *LocalBackendOS) Rename(oldname, newname string) error { + return os.Rename(oldname, newname) +} + +// Stat calls os.Stat. +func (l *LocalBackendOS) Stat(name string) (os.FileInfo, error) { + return os.Stat(name) +} diff --git a/cmd/uplink/ulfs/mixed.go b/cmd/uplink/ulfs/mixed.go index 36e4ab143..f0b8087fe 100644 --- a/cmd/uplink/ulfs/mixed.go +++ b/cmd/uplink/ulfs/mixed.go @@ -14,12 +14,12 @@ import ( // Mixed dispatches to either the local or remote filesystem depending on the location. type Mixed struct { - local *Local - remote *Remote + local FilesystemLocal + remote FilesystemRemote } // NewMixed returns a Mixed backed by the provided local and remote filesystems. -func NewMixed(local *Local, remote *Remote) *Mixed { +func NewMixed(local FilesystemLocal, remote FilesystemRemote) *Mixed { return &Mixed{ local: local, remote: remote, diff --git a/cmd/uplink/ultest/filesystem.go b/cmd/uplink/ultest/filesystem.go index ce3591b4b..0a9fb01c3 100644 --- a/cmd/uplink/ultest/filesystem.go +++ b/cmd/uplink/ultest/filesystem.go @@ -7,13 +7,10 @@ import ( "bytes" "context" "io" - "path/filepath" "sort" - "strings" "sync" "time" - "github.com/zeebo/clingy" "github.com/zeebo/errs" "storj.io/storj/cmd/uplink/ulfs" @@ -24,22 +21,19 @@ import ( // ulfs.Filesystem // -type testFilesystem struct { - stdin string +type remoteFilesystem struct { created int64 files map[ulloc.Location]memFileData pending map[ulloc.Location][]*memWriteHandle - locals map[string]bool // true means path is a directory buckets map[string]struct{} mu sync.Mutex } -func newTestFilesystem() *testFilesystem { - return &testFilesystem{ +func newRemoteFilesystem() *remoteFilesystem { + return &remoteFilesystem{ files: make(map[ulloc.Location]memFileData), pending: make(map[ulloc.Location][]*memWriteHandle), - locals: make(map[string]bool), buckets: make(map[string]struct{}), } } @@ -54,12 +48,12 @@ func (mf memFileData) expired() bool { return mf.expires != time.Time{} && mf.expires.Before(time.Now()) } -func (tfs *testFilesystem) ensureBucket(name string) { - tfs.buckets[name] = struct{}{} +func (rfs *remoteFilesystem) ensureBucket(name string) { + rfs.buckets[name] = struct{}{} } -func (tfs *testFilesystem) Files() (files []File) { - for loc, mf := range tfs.files { +func (rfs *remoteFilesystem) Files() (files []File) { + for loc, mf := range rfs.files { if mf.expired() { continue } @@ -72,8 +66,8 @@ func (tfs *testFilesystem) Files() (files []File) { return files } -func (tfs *testFilesystem) Pending() (files []File) { - for loc, mh := range tfs.pending { +func (rfs *remoteFilesystem) Pending() (files []File) { + for loc, mh := range rfs.pending { for _, h := range mh { files = append(files, File{ Loc: loc.String(), @@ -85,7 +79,7 @@ func (tfs *testFilesystem) Pending() (files []File) { return files } -func (tfs *testFilesystem) Close() error { +func (rfs *remoteFilesystem) Close() error { return nil } @@ -101,15 +95,13 @@ func newMultiReadHandle(contents string) ulfs.MultiReadHandle { }) } -func (tfs *testFilesystem) Open(ctx clingy.Context, loc ulloc.Location) (ulfs.MultiReadHandle, error) { - tfs.mu.Lock() - defer tfs.mu.Unlock() +func (rfs *remoteFilesystem) Open(ctx context.Context, bucket, key string) (ulfs.MultiReadHandle, error) { + rfs.mu.Lock() + defer rfs.mu.Unlock() - if loc.Std() { - return newMultiReadHandle("-"), nil - } + loc := ulloc.NewRemote(bucket, key) - mf, ok := tfs.files[loc] + mf, ok := rfs.files[loc] if !ok { return nil, errs.New("file does not exist %q", loc) } @@ -117,28 +109,14 @@ func (tfs *testFilesystem) Open(ctx clingy.Context, loc ulloc.Location) (ulfs.Mu return newMultiReadHandle(mf.contents), nil } -func (tfs *testFilesystem) Create(ctx clingy.Context, loc ulloc.Location, opts *ulfs.CreateOptions) (_ ulfs.MultiWriteHandle, err error) { - tfs.mu.Lock() - defer tfs.mu.Unlock() +func (rfs *remoteFilesystem) Create(ctx context.Context, bucket, key string, opts *ulfs.CreateOptions) (_ ulfs.MultiWriteHandle, err error) { + rfs.mu.Lock() + defer rfs.mu.Unlock() - if loc.Std() { - return ulfs.NewGenericMultiWriteHandle(new(discardWriteHandle)), nil - } + loc := ulloc.NewRemote(bucket, key) - if bucket, _, ok := loc.RemoteParts(); ok { - if _, ok := tfs.buckets[bucket]; !ok { - return nil, errs.New("bucket %q does not exist", bucket) - } - } - - if path, ok := loc.LocalParts(); ok { - if loc.Directoryish() || tfs.isLocalDir(ctx, loc) { - return nil, errs.New("unable to open file for writing: %q", loc) - } - dir := ulloc.CleanPath(filepath.Dir(path)) - if err := tfs.mkdirAll(ctx, dir); err != nil { - return nil, err - } + if _, ok := rfs.buckets[bucket]; !ok { + return nil, errs.New("bucket %q does not exist", bucket) } expires := time.Time{} @@ -146,71 +124,79 @@ func (tfs *testFilesystem) Create(ctx clingy.Context, loc ulloc.Location, opts * expires = opts.Expires } - tfs.created++ + rfs.created++ wh := &memWriteHandle{ loc: loc, - tfs: tfs, - cre: tfs.created, + rfs: rfs, + cre: rfs.created, expires: expires, } - if loc.Remote() { - tfs.pending[loc] = append(tfs.pending[loc], wh) - } + rfs.pending[loc] = append(rfs.pending[loc], wh) return ulfs.NewGenericMultiWriteHandle(wh), nil } -func (tfs *testFilesystem) Move(ctx clingy.Context, source, dest ulloc.Location) error { - tfs.mu.Lock() - defer tfs.mu.Unlock() +func (rfs *remoteFilesystem) Move(ctx context.Context, oldbucket, oldkey string, newbucket, newkey string) error { + rfs.mu.Lock() + defer rfs.mu.Unlock() - mf, ok := tfs.files[source] + source := ulloc.NewRemote(oldbucket, oldkey) + dest := ulloc.NewRemote(newbucket, newkey) + + mf, ok := rfs.files[source] if !ok { return errs.New("file does not exist %q", source) } - delete(tfs.files, source) - tfs.files[dest] = mf + delete(rfs.files, source) + rfs.files[dest] = mf return nil } -func (tfs *testFilesystem) Copy(ctx clingy.Context, source, dest ulloc.Location) error { - tfs.mu.Lock() - defer tfs.mu.Unlock() +func (rfs *remoteFilesystem) Copy(ctx context.Context, oldbucket, oldkey string, newbucket, newkey string) error { + rfs.mu.Lock() + defer rfs.mu.Unlock() - mf, ok := tfs.files[source] + source := ulloc.NewRemote(oldbucket, oldkey) + dest := ulloc.NewRemote(newbucket, newkey) + + mf, ok := rfs.files[source] if !ok { return errs.New("file does not exist %q", source) } - tfs.files[dest] = mf + rfs.files[dest] = mf return nil } -func (tfs *testFilesystem) Remove(ctx context.Context, loc ulloc.Location, opts *ulfs.RemoveOptions) error { - tfs.mu.Lock() - defer tfs.mu.Unlock() +func (rfs *remoteFilesystem) Remove(ctx context.Context, bucket, key string, opts *ulfs.RemoveOptions) error { + rfs.mu.Lock() + defer rfs.mu.Unlock() + + loc := ulloc.NewRemote(bucket, key) if opts == nil || !opts.Pending { - delete(tfs.files, loc) + delete(rfs.files, loc) } else { // TODO: Remove needs an API that understands that multiple pending files may exist - delete(tfs.pending, loc) + delete(rfs.pending, loc) } return nil } -func (tfs *testFilesystem) List(ctx context.Context, prefix ulloc.Location, opts *ulfs.ListOptions) (ulfs.ObjectIterator, error) { - tfs.mu.Lock() - defer tfs.mu.Unlock() +func (rfs *remoteFilesystem) List(ctx context.Context, bucket, key string, opts *ulfs.ListOptions) ulfs.ObjectIterator { + rfs.mu.Lock() + defer rfs.mu.Unlock() + + prefix := ulloc.NewRemote(bucket, key) if opts != nil && opts.Pending { - return tfs.listPending(ctx, prefix, opts) + return rfs.listPending(ctx, prefix, opts) } prefixDir := prefix.AsDirectoryish() var infos []ulfs.ObjectInfo - for loc, mf := range tfs.files { + for loc, mf := range rfs.files { if (loc.HasPrefix(prefixDir) || loc == prefix) && !mf.expired() { infos = append(infos, ulfs.ObjectInfo{ Loc: loc, @@ -226,18 +212,14 @@ func (tfs *testFilesystem) List(ctx context.Context, prefix ulloc.Location, opts infos = collapseObjectInfos(prefix, infos) } - return &objectInfoIterator{infos: infos}, nil + return &objectInfoIterator{infos: infos} } -func (tfs *testFilesystem) listPending(ctx context.Context, prefix ulloc.Location, opts *ulfs.ListOptions) (ulfs.ObjectIterator, error) { - if prefix.Local() { - return &objectInfoIterator{}, nil - } - +func (rfs *remoteFilesystem) listPending(ctx context.Context, prefix ulloc.Location, opts *ulfs.ListOptions) ulfs.ObjectIterator { prefixDir := prefix.AsDirectoryish() var infos []ulfs.ObjectInfo - for loc, whs := range tfs.pending { + for loc, whs := range rfs.pending { if loc.HasPrefix(prefixDir) || loc == prefix { for _, wh := range whs { infos = append(infos, ulfs.ObjectInfo{ @@ -254,27 +236,16 @@ func (tfs *testFilesystem) listPending(ctx context.Context, prefix ulloc.Locatio infos = collapseObjectInfos(prefix, infos) } - return &objectInfoIterator{infos: infos}, nil + return &objectInfoIterator{infos: infos} } -func (tfs *testFilesystem) IsLocalDir(ctx context.Context, loc ulloc.Location) (local bool) { - tfs.mu.Lock() - defer tfs.mu.Unlock() +func (rfs *remoteFilesystem) Stat(ctx context.Context, bucket, key string) (*ulfs.ObjectInfo, error) { + rfs.mu.Lock() + defer rfs.mu.Unlock() - return tfs.isLocalDir(ctx, loc) -} + loc := ulloc.NewRemote(bucket, key) -func (tfs *testFilesystem) isLocalDir(ctx context.Context, loc ulloc.Location) (local bool) { - path, ok := loc.LocalParts() - return ok && (ulloc.CleanPath(path) == "." || tfs.locals[path]) -} - -func (tfs *testFilesystem) Stat(ctx context.Context, loc ulloc.Location) (*ulfs.ObjectInfo, error) { - if loc.Std() { - return nil, errs.New("unable to stat loc %q", loc.Loc()) - } - - mf, ok := tfs.files[loc] + mf, ok := rfs.files[loc] if !ok { return nil, errs.New("file does not exist: %q", loc.Loc()) } @@ -291,32 +262,6 @@ func (tfs *testFilesystem) Stat(ctx context.Context, loc ulloc.Location) (*ulfs. }, nil } -func (tfs *testFilesystem) mkdirAll(ctx context.Context, dir string) error { - i := 0 - for i < len(dir) { - slash := strings.Index(dir[i:], "/") - if slash == -1 { - break - } - if err := tfs.mkdir(ctx, dir[:i+slash]); err != nil { - return err - } - i += slash + 1 - } - if len(dir) > 0 { - return tfs.mkdir(ctx, dir) - } - return nil -} - -func (tfs *testFilesystem) mkdir(ctx context.Context, dir string) error { - if isDir, ok := tfs.locals[dir]; ok && !isDir { - return errs.New("cannot create directory: %q is a file", dir) - } - tfs.locals[dir] = true - return nil -} - // // ulfs.WriteHandle // @@ -324,7 +269,7 @@ func (tfs *testFilesystem) mkdir(ctx context.Context, dir string) error { type memWriteHandle struct { buf []byte loc ulloc.Location - tfs *testFilesystem + rfs *remoteFilesystem cre int64 expires time.Time done bool @@ -342,18 +287,14 @@ func (b *memWriteHandle) WriteAt(p []byte, off int64) (int, error) { } func (b *memWriteHandle) Commit() error { - b.tfs.mu.Lock() - defer b.tfs.mu.Unlock() + b.rfs.mu.Lock() + defer b.rfs.mu.Unlock() if err := b.close(); err != nil { return err } - if path, ok := b.loc.LocalParts(); ok { - b.tfs.locals[path] = false - } - - b.tfs.files[b.loc] = memFileData{ + b.rfs.files[b.loc] = memFileData{ contents: string(b.buf), created: b.cre, expires: b.expires, @@ -363,8 +304,8 @@ func (b *memWriteHandle) Commit() error { } func (b *memWriteHandle) Abort() error { - b.tfs.mu.Lock() - defer b.tfs.mu.Unlock() + b.rfs.mu.Lock() + defer b.rfs.mu.Unlock() if err := b.close(); err != nil { return err @@ -379,7 +320,7 @@ func (b *memWriteHandle) close() error { } b.done = true - handles := b.tfs.pending[b.loc] + handles := b.rfs.pending[b.loc] for i, v := range handles { if v == b { handles = append(handles[:i], handles[i+1:]...) @@ -388,20 +329,14 @@ func (b *memWriteHandle) close() error { } if len(handles) > 0 { - b.tfs.pending[b.loc] = handles + b.rfs.pending[b.loc] = handles } else { - delete(b.tfs.pending, b.loc) + delete(b.rfs.pending, b.loc) } return nil } -type discardWriteHandle struct{} - -func (discardWriteHandle) WriteAt(p []byte, off int64) (int, error) { return len(p), nil } -func (discardWriteHandle) Commit() error { return nil } -func (discardWriteHandle) Abort() error { return nil } - // // ulfs.ObjectIterator // diff --git a/cmd/uplink/ultest/setup.go b/cmd/uplink/ultest/setup.go index 40450e9a5..09a5929dd 100644 --- a/cmd/uplink/ultest/setup.go +++ b/cmd/uplink/ultest/setup.go @@ -6,6 +6,8 @@ package ultest import ( "bytes" "context" + "io/ioutil" + "sort" "testing" "github.com/stretchr/testify/require" @@ -62,7 +64,15 @@ func (st State) Run(t *testing.T, args ...string) Result { var stdin bytes.Buffer var ran bool - tfs := newTestFilesystem() + ctx := context.Background() + lfs := ulfs.NewLocal(ulfs.NewLocalBackendMem()) + rfs := newRemoteFilesystem() + fs := ulfs.NewMixed(lfs, rfs) + + cs := &callbackState{ + fs: fs, + rfs: rfs, + } ok, err := clingy.Environment{ Name: "uplink-test", @@ -74,72 +84,122 @@ func (st State) Run(t *testing.T, args ...string) Result { Wrap: func(ctx clingy.Context, cmd clingy.Command) error { for _, opt := range st.opts { - opt.fn(t, ctx, tfs) + opt.fn(t, ctx, cs) } - if len(tfs.stdin) > 0 { - _, _ = stdin.WriteString(tfs.stdin) + if len(cs.stdin) > 0 { + _, _ = stdin.WriteString(cs.stdin) } ran = true return cmd.Execute(ctx) }, - }.Run(context.Background(), func(cmds clingy.Commands) { - st.cmds(cmds, newExternal(tfs, nil)) + }.Run(ctx, func(cmds clingy.Commands) { + st.cmds(cmds, newExternal(fs, nil)) }) if ok && err == nil { require.True(t, ran, "no command was executed: %q", args) } + files := rfs.Files() + files = gatherLocalFiles(ctx, t, lfs, files) + sort.Slice(files, func(i, j int) bool { return files[i].less(files[j]) }) + return Result{ Stdout: stdout.String(), Stderr: stderr.String(), Ok: ok, Err: err, - Files: tfs.Files(), - Pending: tfs.Pending(), + Files: files, + Pending: rfs.Pending(), } } +func gatherLocalFiles(ctx context.Context, t *testing.T, fs ulfs.FilesystemLocal, files []File) []File { + { + iter, err := fs.List(ctx, "", &ulfs.ListOptions{Recursive: true}) + require.NoError(t, err) + files = collectIterator(ctx, t, fs, iter, files) + } + { + iter, err := fs.List(ctx, "/", &ulfs.ListOptions{Recursive: true}) + require.NoError(t, err) + files = collectIterator(ctx, t, fs, iter, files) + } + return files +} + +func collectIterator(ctx context.Context, t *testing.T, fs ulfs.FilesystemLocal, iter ulfs.ObjectIterator, files []File) []File { + for iter.Next() { + func() { + loc := iter.Item().Loc.Loc() + + mrh, err := fs.Open(ctx, loc) + require.NoError(t, err) + defer func() { _ = mrh.Close() }() + + rh, err := mrh.NextPart(ctx, -1) + require.NoError(t, err) + defer func() { _ = rh.Close() }() + + data, err := ioutil.ReadAll(rh) + require.NoError(t, err) + files = append(files, File{ + Loc: loc, + Contents: string(data), + }) + }() + } + require.NoError(t, iter.Err()) + + return files +} + +type callbackState struct { + stdin string + fs ulfs.Filesystem + rfs *remoteFilesystem +} + // ExecuteOption allows one to control the environment that a command executes in. type ExecuteOption struct { - fn func(t *testing.T, ctx clingy.Context, tfs *testFilesystem) + fn func(t *testing.T, ctx clingy.Context, cs *callbackState) } // WithFilesystem lets one do arbitrary setup on the filesystem in a callback. func WithFilesystem(cb func(t *testing.T, ctx clingy.Context, fs ulfs.Filesystem)) ExecuteOption { - return ExecuteOption{func(t *testing.T, ctx clingy.Context, tfs *testFilesystem) { - cb(t, ctx, tfs) + return ExecuteOption{func(t *testing.T, ctx clingy.Context, cs *callbackState) { + cb(t, ctx, cs.fs) }} } // WithBucket ensures the bucket exists. func WithBucket(name string) ExecuteOption { - return ExecuteOption{func(_ *testing.T, _ clingy.Context, tfs *testFilesystem) { - tfs.ensureBucket(name) + return ExecuteOption{func(_ *testing.T, _ clingy.Context, cs *callbackState) { + cs.rfs.ensureBucket(name) }} } // WithStdin sets the command to execute with the provided string as standard input. func WithStdin(stdin string) ExecuteOption { - return ExecuteOption{func(_ *testing.T, _ clingy.Context, tfs *testFilesystem) { - tfs.stdin = stdin + return ExecuteOption{func(_ *testing.T, _ clingy.Context, cs *callbackState) { + cs.stdin = stdin }} } // WithFile sets the command to execute with a file created at the given location. func WithFile(location string, contents ...string) ExecuteOption { contents = append([]string(nil), contents...) - return ExecuteOption{func(t *testing.T, ctx clingy.Context, tfs *testFilesystem) { + return ExecuteOption{func(t *testing.T, ctx clingy.Context, cs *callbackState) { loc, err := ulloc.Parse(location) require.NoError(t, err) if bucket, _, ok := loc.RemoteParts(); ok { - tfs.ensureBucket(bucket) + cs.rfs.ensureBucket(bucket) } - mwh, err := tfs.Create(ctx, loc, nil) + mwh, err := cs.fs.Create(ctx, loc, nil) require.NoError(t, err) defer func() { _ = mwh.Abort(ctx) }() @@ -164,17 +224,17 @@ func WithFile(location string, contents ...string) ExecuteOption { // WithPendingFile sets the command to execute with a pending upload happening to // the provided location. func WithPendingFile(location string) ExecuteOption { - return ExecuteOption{func(t *testing.T, ctx clingy.Context, tfs *testFilesystem) { + return ExecuteOption{func(t *testing.T, ctx clingy.Context, cs *callbackState) { loc, err := ulloc.Parse(location) require.NoError(t, err) if bucket, _, ok := loc.RemoteParts(); ok { - tfs.ensureBucket(bucket) + cs.rfs.ensureBucket(bucket) } else { t.Fatalf("Invalid pending local file: %s", loc) } - _, err = tfs.Create(ctx, loc, nil) + _, err = cs.fs.Create(ctx, loc, nil) require.NoError(t, err) }} }