cmd/uplink: fix recursive copy and improve tests
recursive copy had a bug with relative local paths. this fixes that bug and changes the test framework to use more of the code that actually runs in uplink and only mocks out the direct interaction with the operating system. Change-Id: I9da2a80bfda8f86a8d05879b87171f299f759c7e
This commit is contained in:
parent
41c5879f7c
commit
89ccfe2dd7
@ -177,12 +177,12 @@ func TestCpRecursiveDifficult(t *testing.T) {
|
||||
|
||||
t.Run("DirectoryConflict", func(t *testing.T) {
|
||||
state := ultest.Setup(commands,
|
||||
ultest.WithFile("sj://user/fileder"),
|
||||
ultest.WithFile("sj://user/fileder/file"),
|
||||
ultest.WithFile("sj://user/filedir"),
|
||||
ultest.WithFile("sj://user/filedir/file"),
|
||||
)
|
||||
|
||||
state.Fail(t, "cp", "sj://user", "root", "--recursive").RequireLocalFiles(t,
|
||||
ultest.File{Loc: "root/fileder", Contents: "sj://user/fileder"},
|
||||
ultest.File{Loc: "root/filedir", Contents: "sj://user/filedir"},
|
||||
)
|
||||
})
|
||||
|
||||
@ -196,8 +196,8 @@ func TestCpRecursiveDifficult(t *testing.T) {
|
||||
|
||||
t.Run("ExistingDirectory", func(t *testing.T) {
|
||||
state := ultest.Setup(commands,
|
||||
ultest.WithFile("sj://user/fileder"),
|
||||
ultest.WithFile("/home/user/fileder/file"),
|
||||
ultest.WithFile("sj://user/filedir"),
|
||||
ultest.WithFile("/home/user/filedir/file"),
|
||||
)
|
||||
|
||||
state.Fail(t, "cp", "sj://user", "/home/user", "--recursive")
|
||||
@ -351,8 +351,8 @@ func TestCpLocalToLocal(t *testing.T) {
|
||||
)
|
||||
})
|
||||
|
||||
t.Run("EmptyToFolder", func(t *testing.T) {
|
||||
state.Succeed(t, "cp", "", "/pre", "--recursive").RequireFiles(t,
|
||||
t.Run("RootToFolder", func(t *testing.T) {
|
||||
state.Succeed(t, "cp", "/", "/pre", "--recursive").RequireFiles(t,
|
||||
ultest.File{Loc: "/home/user1/folder1/file1.txt", Contents: "data1"},
|
||||
ultest.File{Loc: "/home/user1/folder1/file2.txt", Contents: "data2"},
|
||||
ultest.File{Loc: "/home/user1/folder2/file3.txt", Contents: "data3"},
|
||||
@ -414,6 +414,7 @@ func TestCpStandard(t *testing.T) {
|
||||
state := ultest.Setup(commands,
|
||||
ultest.WithFile("sj://user/foo"),
|
||||
ultest.WithFile("/home/user/foo"),
|
||||
ultest.WithStdin("-"),
|
||||
)
|
||||
|
||||
t.Run("StdinToRemote", func(t *testing.T) {
|
||||
|
@ -357,15 +357,15 @@ func TestLsLocal(t *testing.T) {
|
||||
)
|
||||
|
||||
t.Run("Recursive", func(t *testing.T) {
|
||||
state.Succeed(t, "ls", "/user", "--recursive", "--utc").RequireStdout(t, `
|
||||
KIND CREATED SIZE KEY
|
||||
OBJ 1970-01-01 00:00:01 0 /user/deep/aaa/bbb/1
|
||||
OBJ 1970-01-01 00:00:02 0 /user/deep/aaa/bbb/2
|
||||
OBJ 1970-01-01 00:00:03 0 /user/deep/aaa/bbb/3
|
||||
OBJ 1970-01-01 00:00:04 0 /user/foobar/1
|
||||
OBJ 1970-01-01 00:00:05 0 /user/foobar/2
|
||||
OBJ 1970-01-01 00:00:06 0 /user/foobar/3
|
||||
OBJ 1970-01-01 00:00:07 0 /user/foobaz/1
|
||||
state.Succeed(t, "ls", "/user", "--recursive", "--utc").RequireStdoutGlob(t, `
|
||||
KIND CREATED SIZE KEY
|
||||
OBJ 20 /user/deep/aaa/bbb/1
|
||||
OBJ 20 /user/deep/aaa/bbb/2
|
||||
OBJ 20 /user/deep/aaa/bbb/3
|
||||
OBJ 14 /user/foobar/1
|
||||
OBJ 14 /user/foobar/2
|
||||
OBJ 14 /user/foobar/3
|
||||
OBJ 14 /user/foobaz/1
|
||||
`)
|
||||
})
|
||||
|
||||
@ -374,20 +374,20 @@ func TestLsLocal(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("ExactPrefix", func(t *testing.T) {
|
||||
state.Succeed(t, "ls", "/user/foobar", "--utc").RequireStdout(t, `
|
||||
KIND CREATED SIZE KEY
|
||||
OBJ 1970-01-01 00:00:04 0 1
|
||||
OBJ 1970-01-01 00:00:05 0 2
|
||||
OBJ 1970-01-01 00:00:06 0 3
|
||||
state.Succeed(t, "ls", "/user/foobar", "--utc").RequireStdoutGlob(t, `
|
||||
KIND CREATED SIZE KEY
|
||||
OBJ 14 1
|
||||
OBJ 14 2
|
||||
OBJ 14 3
|
||||
`)
|
||||
})
|
||||
|
||||
t.Run("ExactPrefixWithSlash", func(t *testing.T) {
|
||||
state.Succeed(t, "ls", "/user/foobar/", "--utc").RequireStdout(t, `
|
||||
KIND CREATED SIZE KEY
|
||||
OBJ 1970-01-01 00:00:04 0 1
|
||||
OBJ 1970-01-01 00:00:05 0 2
|
||||
OBJ 1970-01-01 00:00:06 0 3
|
||||
state.Succeed(t, "ls", "/user/foobar/", "--utc").RequireStdoutGlob(t, `
|
||||
KIND CREATED SIZE KEY
|
||||
OBJ 14 1
|
||||
OBJ 14 2
|
||||
OBJ 14 3
|
||||
`)
|
||||
})
|
||||
|
||||
@ -402,11 +402,11 @@ func TestLsLocal(t *testing.T) {
|
||||
PRE bbb/
|
||||
`)
|
||||
|
||||
state.Succeed(t, "ls", "/user/deep/aaa/bbb/", "--utc").RequireStdout(t, `
|
||||
KIND CREATED SIZE KEY
|
||||
OBJ 1970-01-01 00:00:01 0 1
|
||||
OBJ 1970-01-01 00:00:02 0 2
|
||||
OBJ 1970-01-01 00:00:03 0 3
|
||||
state.Succeed(t, "ls", "/user/deep/aaa/bbb/", "--utc").RequireStdoutGlob(t, `
|
||||
KIND CREATED SIZE KEY
|
||||
OBJ 20 1
|
||||
OBJ 20 2
|
||||
OBJ 20 3
|
||||
`)
|
||||
})
|
||||
}
|
||||
@ -442,26 +442,26 @@ func TestLsRelative(t *testing.T) {
|
||||
})
|
||||
|
||||
recursive := `
|
||||
KIND CREATED SIZE KEY
|
||||
OBJ 1970-01-01 00:00:01 0 deep/aaa/bbb/1
|
||||
OBJ 1970-01-01 00:00:02 0 deep/aaa/bbb/2
|
||||
OBJ 1970-01-01 00:00:03 0 deep/aaa/bbb/3
|
||||
OBJ 1970-01-01 00:00:04 0 foobar/1
|
||||
OBJ 1970-01-01 00:00:05 0 foobar/2
|
||||
OBJ 1970-01-01 00:00:06 0 foobar/3
|
||||
OBJ 1970-01-01 00:00:07 0 foobaz/1
|
||||
KIND CREATED SIZE KEY
|
||||
OBJ 14 deep/aaa/bbb/1
|
||||
OBJ 14 deep/aaa/bbb/2
|
||||
OBJ 14 deep/aaa/bbb/3
|
||||
OBJ 8 foobar/1
|
||||
OBJ 8 foobar/2
|
||||
OBJ 8 foobar/3
|
||||
OBJ 8 foobaz/1
|
||||
`
|
||||
|
||||
t.Run("Recursive", func(t *testing.T) {
|
||||
state.Succeed(t, "ls", "", "--recursive", "--utc").RequireStdout(t, recursive)
|
||||
state.Succeed(t, "ls", "", "--recursive", "--utc").RequireStdoutGlob(t, recursive)
|
||||
})
|
||||
|
||||
t.Run("RecursiveDot", func(t *testing.T) {
|
||||
state.Succeed(t, "ls", ".", "--recursive", "--utc").RequireStdout(t, recursive)
|
||||
state.Succeed(t, "ls", ".", "--recursive", "--utc").RequireStdoutGlob(t, recursive)
|
||||
})
|
||||
|
||||
t.Run("RecursiveDotSlash", func(t *testing.T) {
|
||||
state.Succeed(t, "ls", "./", "--recursive", "--utc").RequireStdout(t, recursive)
|
||||
state.Succeed(t, "ls", "./", "--recursive", "--utc").RequireStdoutGlob(t, recursive)
|
||||
})
|
||||
|
||||
}
|
||||
|
@ -22,7 +22,7 @@ func (ex *external) OpenFilesystem(ctx context.Context, accessName string, optio
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return ulfs.NewMixed(ulfs.NewLocal(), ulfs.NewRemote(project)), nil
|
||||
return ulfs.NewMixed(ulfs.NewLocal(ulfs.NewLocalBackendOS()), ulfs.NewRemote(project)), nil
|
||||
}
|
||||
|
||||
func (ex *external) OpenProject(ctx context.Context, accessName string, options ...ulext.Option) (*uplink.Project, error) {
|
||||
|
@ -36,7 +36,7 @@ type RemoveOptions struct {
|
||||
|
||||
func (ro *RemoveOptions) isPending() bool { return ro != nil && ro.Pending }
|
||||
|
||||
// Filesystem represents either the local Filesystem or the data backed by a project.
|
||||
// Filesystem represents either the local filesystem or the data backed by a project.
|
||||
type Filesystem interface {
|
||||
Close() error
|
||||
Open(ctx clingy.Context, loc ulloc.Location) (MultiReadHandle, error)
|
||||
@ -49,6 +49,30 @@ type Filesystem interface {
|
||||
Stat(ctx context.Context, loc ulloc.Location) (*ObjectInfo, error)
|
||||
}
|
||||
|
||||
// FilesystemLocal is the interface for a local filesystem.
|
||||
type FilesystemLocal interface {
|
||||
IsLocalDir(ctx context.Context, path string) bool
|
||||
Open(ctx context.Context, path string) (MultiReadHandle, error)
|
||||
Create(ctx context.Context, path string) (MultiWriteHandle, error)
|
||||
Move(ctx context.Context, oldpath string, newpath string) error
|
||||
Copy(ctx context.Context, oldpath string, newpath string) error
|
||||
Remove(ctx context.Context, path string, opts *RemoveOptions) error
|
||||
List(ctx context.Context, path string, opts *ListOptions) (ObjectIterator, error)
|
||||
Stat(ctx context.Context, path string) (*ObjectInfo, error)
|
||||
}
|
||||
|
||||
// FilesystemRemote is the interface for a remote filesystem.
|
||||
type FilesystemRemote interface {
|
||||
Close() error
|
||||
Open(ctx context.Context, bucket, key string) (MultiReadHandle, error)
|
||||
Create(ctx context.Context, bucket, key string, opts *CreateOptions) (MultiWriteHandle, error)
|
||||
Move(ctx context.Context, oldbucket, oldkey string, newbucket, newkey string) error
|
||||
Copy(ctx context.Context, oldbucket, oldkey string, newbucket, newkey string) error
|
||||
Remove(ctx context.Context, bucket, key string, opts *RemoveOptions) error
|
||||
List(ctx context.Context, bucket, key string, opts *ListOptions) ObjectIterator
|
||||
Stat(ctx context.Context, bucket, key string) (*ObjectInfo, error)
|
||||
}
|
||||
|
||||
//
|
||||
// object info
|
||||
//
|
||||
|
@ -4,8 +4,6 @@
|
||||
package ulfs
|
||||
|
||||
import (
|
||||
"os"
|
||||
|
||||
"github.com/zeebo/errs"
|
||||
|
||||
"storj.io/storj/cmd/uplink/ulloc"
|
||||
@ -16,7 +14,7 @@ import (
|
||||
//
|
||||
|
||||
// osMultiReadHandle implements MultiReadHandle for *os.Files.
|
||||
func newOSMultiReadHandle(fh *os.File) (MultiReadHandle, error) {
|
||||
func newOSMultiReadHandle(fh LocalBackendFile) (MultiReadHandle, error) {
|
||||
fi, err := fh.Stat()
|
||||
if err != nil {
|
||||
return nil, errs.Wrap(err)
|
||||
@ -33,19 +31,23 @@ func newOSMultiReadHandle(fh *os.File) (MultiReadHandle, error) {
|
||||
// write handles
|
||||
//
|
||||
|
||||
type fileGenericWriter os.File
|
||||
type fileGenericWriter struct {
|
||||
fs LocalBackend
|
||||
raw LocalBackendFile
|
||||
}
|
||||
|
||||
func (f *fileGenericWriter) raw() *os.File { return (*os.File)(f) }
|
||||
|
||||
func (f *fileGenericWriter) WriteAt(b []byte, off int64) (int, error) { return f.raw().WriteAt(b, off) }
|
||||
func (f *fileGenericWriter) Commit() error { return f.raw().Close() }
|
||||
func (f *fileGenericWriter) WriteAt(b []byte, off int64) (int, error) { return f.raw.WriteAt(b, off) }
|
||||
func (f *fileGenericWriter) Commit() error { return f.raw.Close() }
|
||||
func (f *fileGenericWriter) Abort() error {
|
||||
return errs.Combine(
|
||||
f.raw().Close(),
|
||||
os.Remove(f.raw().Name()),
|
||||
f.raw.Close(),
|
||||
f.fs.Remove(f.raw.Name()),
|
||||
)
|
||||
}
|
||||
|
||||
func newOSMultiWriteHandle(fh *os.File) MultiWriteHandle {
|
||||
return NewGenericMultiWriteHandle((*fileGenericWriter)(fh))
|
||||
func newOSMultiWriteHandle(fs LocalBackend, fh LocalBackendFile) MultiWriteHandle {
|
||||
return NewGenericMultiWriteHandle(&fileGenericWriter{
|
||||
fs: fs,
|
||||
raw: fh,
|
||||
})
|
||||
}
|
||||
|
@ -5,28 +5,51 @@ package ulfs
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io/ioutil"
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"strings"
|
||||
|
||||
"github.com/zeebo/errs"
|
||||
|
||||
"storj.io/storj/cmd/uplink/ulloc"
|
||||
)
|
||||
|
||||
// LocalBackendFile represents a file in the filesystem.
|
||||
type LocalBackendFile interface {
|
||||
io.Closer
|
||||
io.ReaderAt
|
||||
io.WriterAt
|
||||
Name() string
|
||||
Stat() (os.FileInfo, error)
|
||||
Readdir(int) ([]os.FileInfo, error)
|
||||
}
|
||||
|
||||
// LocalBackend abstracts what the Local filesystem interacts with.
|
||||
type LocalBackend interface {
|
||||
Create(name string) (LocalBackendFile, error)
|
||||
MkdirAll(path string, perm os.FileMode) error
|
||||
Open(name string) (LocalBackendFile, error)
|
||||
Remove(name string) error
|
||||
Rename(oldname, newname string) error
|
||||
Stat(name string) (os.FileInfo, error)
|
||||
}
|
||||
|
||||
// Local implements something close to a filesystem but backed by the local disk.
|
||||
type Local struct{}
|
||||
type Local struct {
|
||||
fs LocalBackend
|
||||
}
|
||||
|
||||
// NewLocal constructs a Local filesystem.
|
||||
func NewLocal() *Local {
|
||||
return &Local{}
|
||||
func NewLocal(fs LocalBackend) *Local {
|
||||
return &Local{
|
||||
fs: fs,
|
||||
}
|
||||
}
|
||||
|
||||
// Open returns a read ReadHandle for the given local path.
|
||||
func (l *Local) Open(ctx context.Context, path string) (MultiReadHandle, error) {
|
||||
fh, err := os.Open(path)
|
||||
fh, err := l.fs.Open(path)
|
||||
if err != nil {
|
||||
return nil, errs.Wrap(err)
|
||||
}
|
||||
@ -35,28 +58,28 @@ func (l *Local) Open(ctx context.Context, path string) (MultiReadHandle, error)
|
||||
|
||||
// Create makes any directories necessary to create a file at path and returns a WriteHandle.
|
||||
func (l *Local) Create(ctx context.Context, path string) (MultiWriteHandle, error) {
|
||||
fi, err := os.Stat(path)
|
||||
fi, err := l.fs.Stat(path)
|
||||
if err != nil && !os.IsNotExist(err) {
|
||||
return nil, errs.Wrap(err)
|
||||
} else if err == nil && fi.IsDir() {
|
||||
return nil, errs.New("path exists as a directory already")
|
||||
}
|
||||
|
||||
if err := os.MkdirAll(filepath.Dir(path), 0755); err != nil {
|
||||
if err := l.fs.MkdirAll(filepath.Dir(path), 0755); err != nil {
|
||||
return nil, errs.Wrap(err)
|
||||
}
|
||||
|
||||
// TODO: atomic rename
|
||||
fh, err := os.Create(path)
|
||||
fh, err := l.fs.Create(path)
|
||||
if err != nil {
|
||||
return nil, errs.Wrap(err)
|
||||
}
|
||||
return newOSMultiWriteHandle(fh), nil
|
||||
return newOSMultiWriteHandle(l.fs, fh), nil
|
||||
}
|
||||
|
||||
// Move moves file to provided path.
|
||||
func (l *Local) Move(ctx context.Context, oldpath, newpath string) error {
|
||||
return os.Rename(oldpath, newpath)
|
||||
return l.fs.Rename(oldpath, newpath)
|
||||
}
|
||||
|
||||
// Copy copies file to provided path.
|
||||
@ -70,7 +93,7 @@ func (l *Local) Remove(ctx context.Context, path string, opts *RemoveOptions) er
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := os.Remove(path); os.IsNotExist(err) {
|
||||
if err := l.fs.Remove(path); os.IsNotExist(err) {
|
||||
return nil
|
||||
} else if err != nil {
|
||||
return err
|
||||
@ -86,20 +109,13 @@ func (l *Local) List(ctx context.Context, path string, opts *ListOptions) (Objec
|
||||
return emptyObjectIterator{}, nil
|
||||
}
|
||||
|
||||
prefix := path
|
||||
if idx := strings.LastIndex(path, "/"); idx >= 0 {
|
||||
prefix = path[:idx+1]
|
||||
}
|
||||
|
||||
prefix, err := filepath.Abs(prefix)
|
||||
if err != nil {
|
||||
return nil, errs.Wrap(err)
|
||||
}
|
||||
prefix += string(filepath.Separator)
|
||||
prefix := filepath.Clean(path) + string(filepath.Separator)
|
||||
|
||||
var err error
|
||||
var files []os.FileInfo
|
||||
|
||||
if opts.isRecursive() {
|
||||
err = filepath.Walk(prefix, func(path string, info os.FileInfo, err error) error {
|
||||
err = walk(l.fs, filepath.Clean(path), func(path string, info os.FileInfo, err error) error {
|
||||
if err == nil && !info.IsDir() {
|
||||
rel, err := filepath.Rel(prefix, path)
|
||||
if err != nil {
|
||||
@ -113,10 +129,13 @@ func (l *Local) List(ctx context.Context, path string, opts *ListOptions) (Objec
|
||||
return nil
|
||||
})
|
||||
} else {
|
||||
files, err = ioutil.ReadDir(prefix)
|
||||
files, err = readdir(l.fs, prefix, -1)
|
||||
if os.IsNotExist(err) {
|
||||
return emptyObjectIterator{}, nil
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, errs.Wrap(err)
|
||||
}
|
||||
|
||||
sort.Slice(files, func(i, j int) bool {
|
||||
@ -146,7 +165,7 @@ func (l *Local) List(ctx context.Context, path string, opts *ListOptions) (Objec
|
||||
|
||||
// IsLocalDir returns true if the path is a directory.
|
||||
func (l *Local) IsLocalDir(ctx context.Context, path string) bool {
|
||||
fi, err := os.Stat(path)
|
||||
fi, err := l.fs.Stat(path)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
@ -155,7 +174,7 @@ func (l *Local) IsLocalDir(ctx context.Context, path string) bool {
|
||||
|
||||
// Stat returns an ObjectInfo describing the provided path.
|
||||
func (l *Local) Stat(ctx context.Context, path string) (*ObjectInfo, error) {
|
||||
fi, err := os.Stat(path)
|
||||
fi, err := l.fs.Stat(path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -203,3 +222,55 @@ func (fi *fileinfoObjectIterator) Item() ObjectInfo {
|
||||
ContentLength: fi.current.Size(),
|
||||
}
|
||||
}
|
||||
|
||||
func walk(fs LocalBackend, path string, cb func(path string, info os.FileInfo, err error) error) error {
|
||||
info, err := fs.Stat(path)
|
||||
if err != nil {
|
||||
return cb(path, nil, err)
|
||||
}
|
||||
return walkHelper(fs, path, info, cb)
|
||||
}
|
||||
|
||||
func walkHelper(fs LocalBackend, path string, info os.FileInfo, cb func(path string, info os.FileInfo, err error) error) error {
|
||||
if !info.IsDir() {
|
||||
return cb(path, info, nil)
|
||||
}
|
||||
|
||||
infos, err := readdir(fs, path, -1)
|
||||
err1 := cb(path, info, err)
|
||||
if err != nil || err1 != nil {
|
||||
return err1
|
||||
}
|
||||
|
||||
for _, info := range infos {
|
||||
filename := filepath.Join(path, info.Name())
|
||||
fileInfo, err := fs.Stat(filename)
|
||||
if err != nil {
|
||||
if err := cb(filename, fileInfo, err); err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
err := walkHelper(fs, filename, fileInfo, cb)
|
||||
if err != nil && !fileInfo.IsDir() {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func readdir(fs LocalBackend, path string, n int) ([]os.FileInfo, error) {
|
||||
f, err := fs.Open(path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
list, err := f.Readdir(n)
|
||||
_ = f.Close()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
sort.Slice(list, func(i, j int) bool {
|
||||
return list[i].Name() < list[j].Name()
|
||||
})
|
||||
return list, nil
|
||||
}
|
||||
|
310
cmd/uplink/ulfs/local_backend_mem.go
Normal file
310
cmd/uplink/ulfs/local_backend_mem.go
Normal file
@ -0,0 +1,310 @@
|
||||
// Copyright (C) 2022 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package ulfs
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"io/fs"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/zeebo/errs"
|
||||
)
|
||||
|
||||
// LocalBackendMem implements LocalBackend with memory backed files.
|
||||
type LocalBackendMem struct {
|
||||
root *memDir
|
||||
cwd *memDir
|
||||
}
|
||||
|
||||
// NewLocalBackendMem creates a new LocalBackendMem.
|
||||
func NewLocalBackendMem() *LocalBackendMem {
|
||||
return &LocalBackendMem{
|
||||
root: newMemDir("/"),
|
||||
cwd: newMemDir(""),
|
||||
}
|
||||
}
|
||||
|
||||
func (l *LocalBackendMem) openRoot(name string) (string, *memDir) {
|
||||
if strings.HasPrefix(name, string(filepath.Separator)) {
|
||||
return name, l.root
|
||||
} else if name == "." {
|
||||
return name[1:], l.cwd
|
||||
} else if strings.HasPrefix(name, "./") {
|
||||
return name[2:], l.cwd
|
||||
}
|
||||
return name, l.cwd
|
||||
}
|
||||
|
||||
func (l *LocalBackendMem) openParent(name string) (*memDir, string, error) {
|
||||
dir := filepath.Dir(name)
|
||||
|
||||
fh, err := l.Open(dir)
|
||||
if err != nil {
|
||||
return nil, "", err
|
||||
}
|
||||
|
||||
md, ok := fh.(*memDir)
|
||||
if !ok {
|
||||
return nil, "", errs.New("parent not a directory: %q", dir)
|
||||
}
|
||||
return md, filepath.Base(name), nil
|
||||
}
|
||||
|
||||
// Create creates a new file for the given name.
|
||||
func (l *LocalBackendMem) Create(name string) (LocalBackendFile, error) {
|
||||
name = filepath.Clean(name)
|
||||
|
||||
md, base, err := l.openParent(name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if fh, ok := md.children[base]; ok {
|
||||
if _, ok := fh.(*memDir); ok {
|
||||
return nil, errs.New("file already exists: %q", name)
|
||||
}
|
||||
}
|
||||
mf := newMemFile(name)
|
||||
md.children[base] = mf
|
||||
return mf, nil
|
||||
}
|
||||
|
||||
// MkdirAll recursively creates directories to make name a directory.
|
||||
func (l *LocalBackendMem) MkdirAll(name string, perm os.FileMode) error {
|
||||
name = filepath.Clean(name)
|
||||
|
||||
name, root := l.openRoot(name)
|
||||
return iterateComponents(name, func(name, ent string) error {
|
||||
fh, ok := root.children[ent]
|
||||
if !ok {
|
||||
fh = newMemDir(name)
|
||||
root.children[ent] = fh
|
||||
}
|
||||
md, ok := fh.(*memDir)
|
||||
if !ok {
|
||||
return errs.New("file already exists: %q", name)
|
||||
}
|
||||
root = md
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// Open opens the file with the given name.
|
||||
func (l *LocalBackendMem) Open(name string) (LocalBackendFile, error) {
|
||||
name = filepath.Clean(name)
|
||||
|
||||
var root LocalBackendFile
|
||||
name, root = l.openRoot(name)
|
||||
err := iterateComponents(name, func(name, ent string) error {
|
||||
md, ok := root.(*memDir)
|
||||
if !ok {
|
||||
return errs.New("not a directory: %q", name)
|
||||
}
|
||||
fh, ok := md.children[ent]
|
||||
if !ok {
|
||||
return os.ErrNotExist
|
||||
}
|
||||
root = fh
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return root, nil
|
||||
}
|
||||
|
||||
// Remove deletes the file with the given name.
|
||||
func (l *LocalBackendMem) Remove(name string) error {
|
||||
name = filepath.Clean(name)
|
||||
|
||||
md, base, err := l.openParent(name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if _, ok := md.children[base]; !ok {
|
||||
return errs.New("file does not exists: %q", name)
|
||||
}
|
||||
delete(md.children, base)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Rename causes the file at oldname to be moved to newname.
|
||||
func (l *LocalBackendMem) Rename(oldname, newname string) error {
|
||||
oldname = filepath.Clean(oldname)
|
||||
newname = filepath.Clean(newname)
|
||||
|
||||
omd, obase, err := l.openParent(oldname)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
nmd, nbase, err := l.openParent(newname)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
f, ok := omd.children[obase]
|
||||
if !ok {
|
||||
return os.ErrNotExist
|
||||
}
|
||||
|
||||
switch f := f.(type) {
|
||||
case *memFile:
|
||||
f.name = newname
|
||||
case *memDir:
|
||||
f.name = newname
|
||||
}
|
||||
|
||||
nmd.children[nbase] = f
|
||||
delete(omd.children, obase)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Stat returns file info for the given name.
|
||||
func (l *LocalBackendMem) Stat(name string) (os.FileInfo, error) {
|
||||
fh, err := l.Open(name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return fh.Stat()
|
||||
}
|
||||
|
||||
//
|
||||
// memFile
|
||||
//
|
||||
|
||||
type memFile struct {
|
||||
name string
|
||||
buf []byte
|
||||
}
|
||||
|
||||
func newMemFile(name string) *memFile {
|
||||
return &memFile{
|
||||
name: name,
|
||||
}
|
||||
}
|
||||
|
||||
func (mf *memFile) String() string { return fmt.Sprintf("File[%q]", mf.name) }
|
||||
|
||||
func (mf *memFile) Name() string { return mf.name }
|
||||
func (mf *memFile) Close() error { return nil }
|
||||
|
||||
func (mf *memFile) ReadAt(p []byte, off int64) (int, error) {
|
||||
if off >= int64(len(mf.buf)) {
|
||||
return 0, io.EOF
|
||||
}
|
||||
return copy(p, mf.buf[off:]), nil
|
||||
}
|
||||
|
||||
func (mf *memFile) WriteAt(p []byte, off int64) (int, error) {
|
||||
if delta := (off + int64(len(p))) - int64(len(mf.buf)); delta > 0 {
|
||||
mf.buf = append(mf.buf, make([]byte, delta)...)
|
||||
}
|
||||
return copy(mf.buf[off:], p), nil
|
||||
}
|
||||
|
||||
func (mf *memFile) Stat() (os.FileInfo, error) {
|
||||
return (*memFileInfo)(mf), nil
|
||||
}
|
||||
|
||||
func (mf *memFile) Readdir(n int) ([]os.FileInfo, error) {
|
||||
return nil, errs.New("readdir on regular file")
|
||||
}
|
||||
|
||||
type memFileInfo memFile
|
||||
|
||||
var _ os.FileInfo = (*memFileInfo)(nil)
|
||||
|
||||
func (mfi *memFileInfo) Name() string {
|
||||
return filepath.Base((*memFile)(mfi).name)
|
||||
}
|
||||
|
||||
func (mfi *memFileInfo) Size() int64 { return int64(len((*memFile)(mfi).buf)) }
|
||||
func (mfi *memFileInfo) Mode() fs.FileMode { return 0777 }
|
||||
func (mfi *memFileInfo) ModTime() time.Time { return time.Time{} }
|
||||
func (mfi *memFileInfo) IsDir() bool { return false }
|
||||
func (mfi *memFileInfo) Sys() interface{} { return nil }
|
||||
|
||||
//
|
||||
// memDir
|
||||
//
|
||||
|
||||
type memDir struct {
|
||||
name string
|
||||
children map[string]LocalBackendFile
|
||||
}
|
||||
|
||||
func newMemDir(name string) *memDir {
|
||||
return &memDir{
|
||||
name: name,
|
||||
children: make(map[string]LocalBackendFile),
|
||||
}
|
||||
}
|
||||
|
||||
var _ LocalBackendFile = (*memDir)(nil)
|
||||
|
||||
func (md *memDir) String() string { return fmt.Sprintf("Dir[%q, %v]", md.name, md.children) }
|
||||
|
||||
func (md *memDir) Name() string { return md.name }
|
||||
func (md *memDir) Close() error { return nil }
|
||||
|
||||
func (md *memDir) ReadAt(p []byte, off int64) (int, error) {
|
||||
return 0, errs.New("readat on directory")
|
||||
}
|
||||
|
||||
func (md *memDir) WriteAt(p []byte, off int64) (int, error) {
|
||||
return 0, errs.New("writeat on directory")
|
||||
}
|
||||
|
||||
func (md *memDir) Stat() (os.FileInfo, error) {
|
||||
return (*memDirInfo)(md), nil
|
||||
}
|
||||
|
||||
func (md *memDir) Readdir(n int) ([]os.FileInfo, error) {
|
||||
if n != -1 {
|
||||
return nil, errs.New("can only read all entries")
|
||||
}
|
||||
out := make([]os.FileInfo, 0, len(md.children))
|
||||
for _, child := range md.children {
|
||||
info, _ := child.Stat()
|
||||
out = append(out, info)
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
type memDirInfo memDir
|
||||
|
||||
var _ os.FileInfo = (*memDirInfo)(nil)
|
||||
|
||||
func (dfi *memDirInfo) Name() string {
|
||||
return filepath.Base((*memDir)(dfi).name)
|
||||
}
|
||||
|
||||
func (dfi *memDirInfo) Size() int64 { return 0 }
|
||||
func (dfi *memDirInfo) Mode() fs.FileMode { return 0777 }
|
||||
func (dfi *memDirInfo) ModTime() time.Time { return time.Time{} }
|
||||
func (dfi *memDirInfo) IsDir() bool { return true }
|
||||
func (dfi *memDirInfo) Sys() interface{} { return nil }
|
||||
|
||||
//
|
||||
// helpers
|
||||
//
|
||||
|
||||
func iterateComponents(name string, cb func(name, ent string) error) error {
|
||||
i := 0
|
||||
for i < len(name) {
|
||||
part := strings.IndexByte(name[i:], filepath.Separator)
|
||||
if part == -1 {
|
||||
return cb(name, name[i:])
|
||||
}
|
||||
if err := cb(name[:i+part], name[i:i+part]); err != nil {
|
||||
return err
|
||||
}
|
||||
i += part + 1
|
||||
}
|
||||
return nil
|
||||
}
|
44
cmd/uplink/ulfs/local_backend_os.go
Normal file
44
cmd/uplink/ulfs/local_backend_os.go
Normal file
@ -0,0 +1,44 @@
|
||||
// Copyright (C) 2022 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package ulfs
|
||||
|
||||
import "os"
|
||||
|
||||
// LocalBackendOS implements LocalBackend by using the os package.
|
||||
type LocalBackendOS struct{}
|
||||
|
||||
// NewLocalBackendOS constructs a new LocalBackendOS.
|
||||
func NewLocalBackendOS() *LocalBackendOS {
|
||||
return new(LocalBackendOS)
|
||||
}
|
||||
|
||||
// Create calls os.Create.
|
||||
func (l *LocalBackendOS) Create(name string) (LocalBackendFile, error) {
|
||||
return os.Create(name)
|
||||
}
|
||||
|
||||
// MkdirAll calls os.MkdirAll.
|
||||
func (l *LocalBackendOS) MkdirAll(path string, perm os.FileMode) error {
|
||||
return os.MkdirAll(path, perm)
|
||||
}
|
||||
|
||||
// Open calls os.Open.
|
||||
func (l *LocalBackendOS) Open(name string) (LocalBackendFile, error) {
|
||||
return os.Open(name)
|
||||
}
|
||||
|
||||
// Remove calls os.Remove.
|
||||
func (l *LocalBackendOS) Remove(name string) error {
|
||||
return os.Remove(name)
|
||||
}
|
||||
|
||||
// Rename calls os.Rename.
|
||||
func (l *LocalBackendOS) Rename(oldname, newname string) error {
|
||||
return os.Rename(oldname, newname)
|
||||
}
|
||||
|
||||
// Stat calls os.Stat.
|
||||
func (l *LocalBackendOS) Stat(name string) (os.FileInfo, error) {
|
||||
return os.Stat(name)
|
||||
}
|
@ -14,12 +14,12 @@ import (
|
||||
|
||||
// Mixed dispatches to either the local or remote filesystem depending on the location.
|
||||
type Mixed struct {
|
||||
local *Local
|
||||
remote *Remote
|
||||
local FilesystemLocal
|
||||
remote FilesystemRemote
|
||||
}
|
||||
|
||||
// NewMixed returns a Mixed backed by the provided local and remote filesystems.
|
||||
func NewMixed(local *Local, remote *Remote) *Mixed {
|
||||
func NewMixed(local FilesystemLocal, remote FilesystemRemote) *Mixed {
|
||||
return &Mixed{
|
||||
local: local,
|
||||
remote: remote,
|
||||
|
@ -7,13 +7,10 @@ import (
|
||||
"bytes"
|
||||
"context"
|
||||
"io"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/zeebo/clingy"
|
||||
"github.com/zeebo/errs"
|
||||
|
||||
"storj.io/storj/cmd/uplink/ulfs"
|
||||
@ -24,22 +21,19 @@ import (
|
||||
// ulfs.Filesystem
|
||||
//
|
||||
|
||||
type testFilesystem struct {
|
||||
stdin string
|
||||
type remoteFilesystem struct {
|
||||
created int64
|
||||
files map[ulloc.Location]memFileData
|
||||
pending map[ulloc.Location][]*memWriteHandle
|
||||
locals map[string]bool // true means path is a directory
|
||||
buckets map[string]struct{}
|
||||
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
func newTestFilesystem() *testFilesystem {
|
||||
return &testFilesystem{
|
||||
func newRemoteFilesystem() *remoteFilesystem {
|
||||
return &remoteFilesystem{
|
||||
files: make(map[ulloc.Location]memFileData),
|
||||
pending: make(map[ulloc.Location][]*memWriteHandle),
|
||||
locals: make(map[string]bool),
|
||||
buckets: make(map[string]struct{}),
|
||||
}
|
||||
}
|
||||
@ -54,12 +48,12 @@ func (mf memFileData) expired() bool {
|
||||
return mf.expires != time.Time{} && mf.expires.Before(time.Now())
|
||||
}
|
||||
|
||||
func (tfs *testFilesystem) ensureBucket(name string) {
|
||||
tfs.buckets[name] = struct{}{}
|
||||
func (rfs *remoteFilesystem) ensureBucket(name string) {
|
||||
rfs.buckets[name] = struct{}{}
|
||||
}
|
||||
|
||||
func (tfs *testFilesystem) Files() (files []File) {
|
||||
for loc, mf := range tfs.files {
|
||||
func (rfs *remoteFilesystem) Files() (files []File) {
|
||||
for loc, mf := range rfs.files {
|
||||
if mf.expired() {
|
||||
continue
|
||||
}
|
||||
@ -72,8 +66,8 @@ func (tfs *testFilesystem) Files() (files []File) {
|
||||
return files
|
||||
}
|
||||
|
||||
func (tfs *testFilesystem) Pending() (files []File) {
|
||||
for loc, mh := range tfs.pending {
|
||||
func (rfs *remoteFilesystem) Pending() (files []File) {
|
||||
for loc, mh := range rfs.pending {
|
||||
for _, h := range mh {
|
||||
files = append(files, File{
|
||||
Loc: loc.String(),
|
||||
@ -85,7 +79,7 @@ func (tfs *testFilesystem) Pending() (files []File) {
|
||||
return files
|
||||
}
|
||||
|
||||
func (tfs *testFilesystem) Close() error {
|
||||
func (rfs *remoteFilesystem) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -101,15 +95,13 @@ func newMultiReadHandle(contents string) ulfs.MultiReadHandle {
|
||||
})
|
||||
}
|
||||
|
||||
func (tfs *testFilesystem) Open(ctx clingy.Context, loc ulloc.Location) (ulfs.MultiReadHandle, error) {
|
||||
tfs.mu.Lock()
|
||||
defer tfs.mu.Unlock()
|
||||
func (rfs *remoteFilesystem) Open(ctx context.Context, bucket, key string) (ulfs.MultiReadHandle, error) {
|
||||
rfs.mu.Lock()
|
||||
defer rfs.mu.Unlock()
|
||||
|
||||
if loc.Std() {
|
||||
return newMultiReadHandle("-"), nil
|
||||
}
|
||||
loc := ulloc.NewRemote(bucket, key)
|
||||
|
||||
mf, ok := tfs.files[loc]
|
||||
mf, ok := rfs.files[loc]
|
||||
if !ok {
|
||||
return nil, errs.New("file does not exist %q", loc)
|
||||
}
|
||||
@ -117,28 +109,14 @@ func (tfs *testFilesystem) Open(ctx clingy.Context, loc ulloc.Location) (ulfs.Mu
|
||||
return newMultiReadHandle(mf.contents), nil
|
||||
}
|
||||
|
||||
func (tfs *testFilesystem) Create(ctx clingy.Context, loc ulloc.Location, opts *ulfs.CreateOptions) (_ ulfs.MultiWriteHandle, err error) {
|
||||
tfs.mu.Lock()
|
||||
defer tfs.mu.Unlock()
|
||||
func (rfs *remoteFilesystem) Create(ctx context.Context, bucket, key string, opts *ulfs.CreateOptions) (_ ulfs.MultiWriteHandle, err error) {
|
||||
rfs.mu.Lock()
|
||||
defer rfs.mu.Unlock()
|
||||
|
||||
if loc.Std() {
|
||||
return ulfs.NewGenericMultiWriteHandle(new(discardWriteHandle)), nil
|
||||
}
|
||||
loc := ulloc.NewRemote(bucket, key)
|
||||
|
||||
if bucket, _, ok := loc.RemoteParts(); ok {
|
||||
if _, ok := tfs.buckets[bucket]; !ok {
|
||||
return nil, errs.New("bucket %q does not exist", bucket)
|
||||
}
|
||||
}
|
||||
|
||||
if path, ok := loc.LocalParts(); ok {
|
||||
if loc.Directoryish() || tfs.isLocalDir(ctx, loc) {
|
||||
return nil, errs.New("unable to open file for writing: %q", loc)
|
||||
}
|
||||
dir := ulloc.CleanPath(filepath.Dir(path))
|
||||
if err := tfs.mkdirAll(ctx, dir); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if _, ok := rfs.buckets[bucket]; !ok {
|
||||
return nil, errs.New("bucket %q does not exist", bucket)
|
||||
}
|
||||
|
||||
expires := time.Time{}
|
||||
@ -146,71 +124,79 @@ func (tfs *testFilesystem) Create(ctx clingy.Context, loc ulloc.Location, opts *
|
||||
expires = opts.Expires
|
||||
}
|
||||
|
||||
tfs.created++
|
||||
rfs.created++
|
||||
wh := &memWriteHandle{
|
||||
loc: loc,
|
||||
tfs: tfs,
|
||||
cre: tfs.created,
|
||||
rfs: rfs,
|
||||
cre: rfs.created,
|
||||
expires: expires,
|
||||
}
|
||||
|
||||
if loc.Remote() {
|
||||
tfs.pending[loc] = append(tfs.pending[loc], wh)
|
||||
}
|
||||
rfs.pending[loc] = append(rfs.pending[loc], wh)
|
||||
|
||||
return ulfs.NewGenericMultiWriteHandle(wh), nil
|
||||
}
|
||||
|
||||
func (tfs *testFilesystem) Move(ctx clingy.Context, source, dest ulloc.Location) error {
|
||||
tfs.mu.Lock()
|
||||
defer tfs.mu.Unlock()
|
||||
func (rfs *remoteFilesystem) Move(ctx context.Context, oldbucket, oldkey string, newbucket, newkey string) error {
|
||||
rfs.mu.Lock()
|
||||
defer rfs.mu.Unlock()
|
||||
|
||||
mf, ok := tfs.files[source]
|
||||
source := ulloc.NewRemote(oldbucket, oldkey)
|
||||
dest := ulloc.NewRemote(newbucket, newkey)
|
||||
|
||||
mf, ok := rfs.files[source]
|
||||
if !ok {
|
||||
return errs.New("file does not exist %q", source)
|
||||
}
|
||||
delete(tfs.files, source)
|
||||
tfs.files[dest] = mf
|
||||
delete(rfs.files, source)
|
||||
rfs.files[dest] = mf
|
||||
return nil
|
||||
}
|
||||
|
||||
func (tfs *testFilesystem) Copy(ctx clingy.Context, source, dest ulloc.Location) error {
|
||||
tfs.mu.Lock()
|
||||
defer tfs.mu.Unlock()
|
||||
func (rfs *remoteFilesystem) Copy(ctx context.Context, oldbucket, oldkey string, newbucket, newkey string) error {
|
||||
rfs.mu.Lock()
|
||||
defer rfs.mu.Unlock()
|
||||
|
||||
mf, ok := tfs.files[source]
|
||||
source := ulloc.NewRemote(oldbucket, oldkey)
|
||||
dest := ulloc.NewRemote(newbucket, newkey)
|
||||
|
||||
mf, ok := rfs.files[source]
|
||||
if !ok {
|
||||
return errs.New("file does not exist %q", source)
|
||||
}
|
||||
tfs.files[dest] = mf
|
||||
rfs.files[dest] = mf
|
||||
return nil
|
||||
}
|
||||
|
||||
func (tfs *testFilesystem) Remove(ctx context.Context, loc ulloc.Location, opts *ulfs.RemoveOptions) error {
|
||||
tfs.mu.Lock()
|
||||
defer tfs.mu.Unlock()
|
||||
func (rfs *remoteFilesystem) Remove(ctx context.Context, bucket, key string, opts *ulfs.RemoveOptions) error {
|
||||
rfs.mu.Lock()
|
||||
defer rfs.mu.Unlock()
|
||||
|
||||
loc := ulloc.NewRemote(bucket, key)
|
||||
|
||||
if opts == nil || !opts.Pending {
|
||||
delete(tfs.files, loc)
|
||||
delete(rfs.files, loc)
|
||||
} else {
|
||||
// TODO: Remove needs an API that understands that multiple pending files may exist
|
||||
delete(tfs.pending, loc)
|
||||
delete(rfs.pending, loc)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (tfs *testFilesystem) List(ctx context.Context, prefix ulloc.Location, opts *ulfs.ListOptions) (ulfs.ObjectIterator, error) {
|
||||
tfs.mu.Lock()
|
||||
defer tfs.mu.Unlock()
|
||||
func (rfs *remoteFilesystem) List(ctx context.Context, bucket, key string, opts *ulfs.ListOptions) ulfs.ObjectIterator {
|
||||
rfs.mu.Lock()
|
||||
defer rfs.mu.Unlock()
|
||||
|
||||
prefix := ulloc.NewRemote(bucket, key)
|
||||
|
||||
if opts != nil && opts.Pending {
|
||||
return tfs.listPending(ctx, prefix, opts)
|
||||
return rfs.listPending(ctx, prefix, opts)
|
||||
}
|
||||
|
||||
prefixDir := prefix.AsDirectoryish()
|
||||
|
||||
var infos []ulfs.ObjectInfo
|
||||
for loc, mf := range tfs.files {
|
||||
for loc, mf := range rfs.files {
|
||||
if (loc.HasPrefix(prefixDir) || loc == prefix) && !mf.expired() {
|
||||
infos = append(infos, ulfs.ObjectInfo{
|
||||
Loc: loc,
|
||||
@ -226,18 +212,14 @@ func (tfs *testFilesystem) List(ctx context.Context, prefix ulloc.Location, opts
|
||||
infos = collapseObjectInfos(prefix, infos)
|
||||
}
|
||||
|
||||
return &objectInfoIterator{infos: infos}, nil
|
||||
return &objectInfoIterator{infos: infos}
|
||||
}
|
||||
|
||||
func (tfs *testFilesystem) listPending(ctx context.Context, prefix ulloc.Location, opts *ulfs.ListOptions) (ulfs.ObjectIterator, error) {
|
||||
if prefix.Local() {
|
||||
return &objectInfoIterator{}, nil
|
||||
}
|
||||
|
||||
func (rfs *remoteFilesystem) listPending(ctx context.Context, prefix ulloc.Location, opts *ulfs.ListOptions) ulfs.ObjectIterator {
|
||||
prefixDir := prefix.AsDirectoryish()
|
||||
|
||||
var infos []ulfs.ObjectInfo
|
||||
for loc, whs := range tfs.pending {
|
||||
for loc, whs := range rfs.pending {
|
||||
if loc.HasPrefix(prefixDir) || loc == prefix {
|
||||
for _, wh := range whs {
|
||||
infos = append(infos, ulfs.ObjectInfo{
|
||||
@ -254,27 +236,16 @@ func (tfs *testFilesystem) listPending(ctx context.Context, prefix ulloc.Locatio
|
||||
infos = collapseObjectInfos(prefix, infos)
|
||||
}
|
||||
|
||||
return &objectInfoIterator{infos: infos}, nil
|
||||
return &objectInfoIterator{infos: infos}
|
||||
}
|
||||
|
||||
func (tfs *testFilesystem) IsLocalDir(ctx context.Context, loc ulloc.Location) (local bool) {
|
||||
tfs.mu.Lock()
|
||||
defer tfs.mu.Unlock()
|
||||
func (rfs *remoteFilesystem) Stat(ctx context.Context, bucket, key string) (*ulfs.ObjectInfo, error) {
|
||||
rfs.mu.Lock()
|
||||
defer rfs.mu.Unlock()
|
||||
|
||||
return tfs.isLocalDir(ctx, loc)
|
||||
}
|
||||
loc := ulloc.NewRemote(bucket, key)
|
||||
|
||||
func (tfs *testFilesystem) isLocalDir(ctx context.Context, loc ulloc.Location) (local bool) {
|
||||
path, ok := loc.LocalParts()
|
||||
return ok && (ulloc.CleanPath(path) == "." || tfs.locals[path])
|
||||
}
|
||||
|
||||
func (tfs *testFilesystem) Stat(ctx context.Context, loc ulloc.Location) (*ulfs.ObjectInfo, error) {
|
||||
if loc.Std() {
|
||||
return nil, errs.New("unable to stat loc %q", loc.Loc())
|
||||
}
|
||||
|
||||
mf, ok := tfs.files[loc]
|
||||
mf, ok := rfs.files[loc]
|
||||
if !ok {
|
||||
return nil, errs.New("file does not exist: %q", loc.Loc())
|
||||
}
|
||||
@ -291,32 +262,6 @@ func (tfs *testFilesystem) Stat(ctx context.Context, loc ulloc.Location) (*ulfs.
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (tfs *testFilesystem) mkdirAll(ctx context.Context, dir string) error {
|
||||
i := 0
|
||||
for i < len(dir) {
|
||||
slash := strings.Index(dir[i:], "/")
|
||||
if slash == -1 {
|
||||
break
|
||||
}
|
||||
if err := tfs.mkdir(ctx, dir[:i+slash]); err != nil {
|
||||
return err
|
||||
}
|
||||
i += slash + 1
|
||||
}
|
||||
if len(dir) > 0 {
|
||||
return tfs.mkdir(ctx, dir)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (tfs *testFilesystem) mkdir(ctx context.Context, dir string) error {
|
||||
if isDir, ok := tfs.locals[dir]; ok && !isDir {
|
||||
return errs.New("cannot create directory: %q is a file", dir)
|
||||
}
|
||||
tfs.locals[dir] = true
|
||||
return nil
|
||||
}
|
||||
|
||||
//
|
||||
// ulfs.WriteHandle
|
||||
//
|
||||
@ -324,7 +269,7 @@ func (tfs *testFilesystem) mkdir(ctx context.Context, dir string) error {
|
||||
type memWriteHandle struct {
|
||||
buf []byte
|
||||
loc ulloc.Location
|
||||
tfs *testFilesystem
|
||||
rfs *remoteFilesystem
|
||||
cre int64
|
||||
expires time.Time
|
||||
done bool
|
||||
@ -342,18 +287,14 @@ func (b *memWriteHandle) WriteAt(p []byte, off int64) (int, error) {
|
||||
}
|
||||
|
||||
func (b *memWriteHandle) Commit() error {
|
||||
b.tfs.mu.Lock()
|
||||
defer b.tfs.mu.Unlock()
|
||||
b.rfs.mu.Lock()
|
||||
defer b.rfs.mu.Unlock()
|
||||
|
||||
if err := b.close(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if path, ok := b.loc.LocalParts(); ok {
|
||||
b.tfs.locals[path] = false
|
||||
}
|
||||
|
||||
b.tfs.files[b.loc] = memFileData{
|
||||
b.rfs.files[b.loc] = memFileData{
|
||||
contents: string(b.buf),
|
||||
created: b.cre,
|
||||
expires: b.expires,
|
||||
@ -363,8 +304,8 @@ func (b *memWriteHandle) Commit() error {
|
||||
}
|
||||
|
||||
func (b *memWriteHandle) Abort() error {
|
||||
b.tfs.mu.Lock()
|
||||
defer b.tfs.mu.Unlock()
|
||||
b.rfs.mu.Lock()
|
||||
defer b.rfs.mu.Unlock()
|
||||
|
||||
if err := b.close(); err != nil {
|
||||
return err
|
||||
@ -379,7 +320,7 @@ func (b *memWriteHandle) close() error {
|
||||
}
|
||||
b.done = true
|
||||
|
||||
handles := b.tfs.pending[b.loc]
|
||||
handles := b.rfs.pending[b.loc]
|
||||
for i, v := range handles {
|
||||
if v == b {
|
||||
handles = append(handles[:i], handles[i+1:]...)
|
||||
@ -388,20 +329,14 @@ func (b *memWriteHandle) close() error {
|
||||
}
|
||||
|
||||
if len(handles) > 0 {
|
||||
b.tfs.pending[b.loc] = handles
|
||||
b.rfs.pending[b.loc] = handles
|
||||
} else {
|
||||
delete(b.tfs.pending, b.loc)
|
||||
delete(b.rfs.pending, b.loc)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type discardWriteHandle struct{}
|
||||
|
||||
func (discardWriteHandle) WriteAt(p []byte, off int64) (int, error) { return len(p), nil }
|
||||
func (discardWriteHandle) Commit() error { return nil }
|
||||
func (discardWriteHandle) Abort() error { return nil }
|
||||
|
||||
//
|
||||
// ulfs.ObjectIterator
|
||||
//
|
||||
|
@ -6,6 +6,8 @@ package ultest
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"io/ioutil"
|
||||
"sort"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
@ -62,7 +64,15 @@ func (st State) Run(t *testing.T, args ...string) Result {
|
||||
var stdin bytes.Buffer
|
||||
var ran bool
|
||||
|
||||
tfs := newTestFilesystem()
|
||||
ctx := context.Background()
|
||||
lfs := ulfs.NewLocal(ulfs.NewLocalBackendMem())
|
||||
rfs := newRemoteFilesystem()
|
||||
fs := ulfs.NewMixed(lfs, rfs)
|
||||
|
||||
cs := &callbackState{
|
||||
fs: fs,
|
||||
rfs: rfs,
|
||||
}
|
||||
|
||||
ok, err := clingy.Environment{
|
||||
Name: "uplink-test",
|
||||
@ -74,72 +84,122 @@ func (st State) Run(t *testing.T, args ...string) Result {
|
||||
|
||||
Wrap: func(ctx clingy.Context, cmd clingy.Command) error {
|
||||
for _, opt := range st.opts {
|
||||
opt.fn(t, ctx, tfs)
|
||||
opt.fn(t, ctx, cs)
|
||||
}
|
||||
|
||||
if len(tfs.stdin) > 0 {
|
||||
_, _ = stdin.WriteString(tfs.stdin)
|
||||
if len(cs.stdin) > 0 {
|
||||
_, _ = stdin.WriteString(cs.stdin)
|
||||
}
|
||||
|
||||
ran = true
|
||||
return cmd.Execute(ctx)
|
||||
},
|
||||
}.Run(context.Background(), func(cmds clingy.Commands) {
|
||||
st.cmds(cmds, newExternal(tfs, nil))
|
||||
}.Run(ctx, func(cmds clingy.Commands) {
|
||||
st.cmds(cmds, newExternal(fs, nil))
|
||||
})
|
||||
|
||||
if ok && err == nil {
|
||||
require.True(t, ran, "no command was executed: %q", args)
|
||||
}
|
||||
|
||||
files := rfs.Files()
|
||||
files = gatherLocalFiles(ctx, t, lfs, files)
|
||||
sort.Slice(files, func(i, j int) bool { return files[i].less(files[j]) })
|
||||
|
||||
return Result{
|
||||
Stdout: stdout.String(),
|
||||
Stderr: stderr.String(),
|
||||
Ok: ok,
|
||||
Err: err,
|
||||
Files: tfs.Files(),
|
||||
Pending: tfs.Pending(),
|
||||
Files: files,
|
||||
Pending: rfs.Pending(),
|
||||
}
|
||||
}
|
||||
|
||||
func gatherLocalFiles(ctx context.Context, t *testing.T, fs ulfs.FilesystemLocal, files []File) []File {
|
||||
{
|
||||
iter, err := fs.List(ctx, "", &ulfs.ListOptions{Recursive: true})
|
||||
require.NoError(t, err)
|
||||
files = collectIterator(ctx, t, fs, iter, files)
|
||||
}
|
||||
{
|
||||
iter, err := fs.List(ctx, "/", &ulfs.ListOptions{Recursive: true})
|
||||
require.NoError(t, err)
|
||||
files = collectIterator(ctx, t, fs, iter, files)
|
||||
}
|
||||
return files
|
||||
}
|
||||
|
||||
func collectIterator(ctx context.Context, t *testing.T, fs ulfs.FilesystemLocal, iter ulfs.ObjectIterator, files []File) []File {
|
||||
for iter.Next() {
|
||||
func() {
|
||||
loc := iter.Item().Loc.Loc()
|
||||
|
||||
mrh, err := fs.Open(ctx, loc)
|
||||
require.NoError(t, err)
|
||||
defer func() { _ = mrh.Close() }()
|
||||
|
||||
rh, err := mrh.NextPart(ctx, -1)
|
||||
require.NoError(t, err)
|
||||
defer func() { _ = rh.Close() }()
|
||||
|
||||
data, err := ioutil.ReadAll(rh)
|
||||
require.NoError(t, err)
|
||||
files = append(files, File{
|
||||
Loc: loc,
|
||||
Contents: string(data),
|
||||
})
|
||||
}()
|
||||
}
|
||||
require.NoError(t, iter.Err())
|
||||
|
||||
return files
|
||||
}
|
||||
|
||||
type callbackState struct {
|
||||
stdin string
|
||||
fs ulfs.Filesystem
|
||||
rfs *remoteFilesystem
|
||||
}
|
||||
|
||||
// ExecuteOption allows one to control the environment that a command executes in.
|
||||
type ExecuteOption struct {
|
||||
fn func(t *testing.T, ctx clingy.Context, tfs *testFilesystem)
|
||||
fn func(t *testing.T, ctx clingy.Context, cs *callbackState)
|
||||
}
|
||||
|
||||
// WithFilesystem lets one do arbitrary setup on the filesystem in a callback.
|
||||
func WithFilesystem(cb func(t *testing.T, ctx clingy.Context, fs ulfs.Filesystem)) ExecuteOption {
|
||||
return ExecuteOption{func(t *testing.T, ctx clingy.Context, tfs *testFilesystem) {
|
||||
cb(t, ctx, tfs)
|
||||
return ExecuteOption{func(t *testing.T, ctx clingy.Context, cs *callbackState) {
|
||||
cb(t, ctx, cs.fs)
|
||||
}}
|
||||
}
|
||||
|
||||
// WithBucket ensures the bucket exists.
|
||||
func WithBucket(name string) ExecuteOption {
|
||||
return ExecuteOption{func(_ *testing.T, _ clingy.Context, tfs *testFilesystem) {
|
||||
tfs.ensureBucket(name)
|
||||
return ExecuteOption{func(_ *testing.T, _ clingy.Context, cs *callbackState) {
|
||||
cs.rfs.ensureBucket(name)
|
||||
}}
|
||||
}
|
||||
|
||||
// WithStdin sets the command to execute with the provided string as standard input.
|
||||
func WithStdin(stdin string) ExecuteOption {
|
||||
return ExecuteOption{func(_ *testing.T, _ clingy.Context, tfs *testFilesystem) {
|
||||
tfs.stdin = stdin
|
||||
return ExecuteOption{func(_ *testing.T, _ clingy.Context, cs *callbackState) {
|
||||
cs.stdin = stdin
|
||||
}}
|
||||
}
|
||||
|
||||
// WithFile sets the command to execute with a file created at the given location.
|
||||
func WithFile(location string, contents ...string) ExecuteOption {
|
||||
contents = append([]string(nil), contents...)
|
||||
return ExecuteOption{func(t *testing.T, ctx clingy.Context, tfs *testFilesystem) {
|
||||
return ExecuteOption{func(t *testing.T, ctx clingy.Context, cs *callbackState) {
|
||||
loc, err := ulloc.Parse(location)
|
||||
require.NoError(t, err)
|
||||
|
||||
if bucket, _, ok := loc.RemoteParts(); ok {
|
||||
tfs.ensureBucket(bucket)
|
||||
cs.rfs.ensureBucket(bucket)
|
||||
}
|
||||
|
||||
mwh, err := tfs.Create(ctx, loc, nil)
|
||||
mwh, err := cs.fs.Create(ctx, loc, nil)
|
||||
require.NoError(t, err)
|
||||
defer func() { _ = mwh.Abort(ctx) }()
|
||||
|
||||
@ -164,17 +224,17 @@ func WithFile(location string, contents ...string) ExecuteOption {
|
||||
// WithPendingFile sets the command to execute with a pending upload happening to
|
||||
// the provided location.
|
||||
func WithPendingFile(location string) ExecuteOption {
|
||||
return ExecuteOption{func(t *testing.T, ctx clingy.Context, tfs *testFilesystem) {
|
||||
return ExecuteOption{func(t *testing.T, ctx clingy.Context, cs *callbackState) {
|
||||
loc, err := ulloc.Parse(location)
|
||||
require.NoError(t, err)
|
||||
|
||||
if bucket, _, ok := loc.RemoteParts(); ok {
|
||||
tfs.ensureBucket(bucket)
|
||||
cs.rfs.ensureBucket(bucket)
|
||||
} else {
|
||||
t.Fatalf("Invalid pending local file: %s", loc)
|
||||
}
|
||||
|
||||
_, err = tfs.Create(ctx, loc, nil)
|
||||
_, err = cs.fs.Create(ctx, loc, nil)
|
||||
require.NoError(t, err)
|
||||
}}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user