From 8893884044bcbe636c093e64df2931a5fd6b87b5 Mon Sep 17 00:00:00 2001 From: Egon Elbre Date: Fri, 11 Jan 2019 13:26:39 +0200 Subject: [PATCH] convert piecestorage into a struct (#1024) --- cmd/storagenode/main.go | 2 +- examples/piecestore-cli/main.go | 127 --------- internal/testcontext/context.go | 2 +- internal/testplanet/planet.go | 2 +- pkg/piecestore/psserver/config.go | 8 +- pkg/piecestore/psserver/psdb/psdb.go | 39 ++- pkg/piecestore/psserver/psdb/psdb_test.go | 11 +- pkg/piecestore/psserver/retrieve.go | 5 +- pkg/piecestore/psserver/server.go | 22 +- pkg/piecestore/psserver/server_test.go | 93 ++++--- pkg/piecestore/psserver/store.go | 3 +- pkg/piecestore/pstore.go | 106 ++++---- pkg/piecestore/pstore_test.go | 309 ++++------------------ storagenode/peer.go | 5 +- storagenode/storagenodedb/database.go | 22 +- 15 files changed, 229 insertions(+), 527 deletions(-) delete mode 100644 examples/piecestore-cli/main.go diff --git a/cmd/storagenode/main.go b/cmd/storagenode/main.go index 41c67f35f..f7b4fd6bb 100644 --- a/cmd/storagenode/main.go +++ b/cmd/storagenode/main.go @@ -208,7 +208,7 @@ func cmdDiag(cmd *cobra.Command, args []string) (err error) { // open the sql db dbpath := filepath.Join(diagDir, "storage", "piecestore.db") - db, err := psdb.Open(context.Background(), "", dbpath) + db, err := psdb.Open(context.Background(), nil, dbpath) if err != nil { fmt.Println("Storagenode database couldnt open:", dbpath) return err diff --git a/examples/piecestore-cli/main.go b/examples/piecestore-cli/main.go deleted file mode 100644 index 16e8b12af..000000000 --- a/examples/piecestore-cli/main.go +++ /dev/null @@ -1,127 +0,0 @@ -// Copyright (C) 2018 Storj Labs, Inc. -// See LICENSE for copying information. - -package main - -import ( - "context" - "fmt" - "io" - "os" - - "github.com/spf13/cobra" - - "storj.io/storj/pkg/piecestore" - "storj.io/storj/pkg/process" -) - -func main() { - cobra.EnableCommandSorting = false - root := &cobra.Command{ - Use: "piecestore-cli", - Short: "piecestore example cli", - } - - root.AddCommand( - storeMain, - retrieveMain, - deleteMain, - ) - - process.Exec(root) -} - -var storeMain = &cobra.Command{ - Use: "store [id] [dataPath] [storeDir]", - Aliases: []string{"s"}, - Short: "Store data by id", - Args: cobra.ExactArgs(3), - ValidArgs: []string{"id", "datapath", "storedir"}, - - RunE: func(cmd *cobra.Command, args []string) error { - id := args[0] - path := args[1] - outputDir := args[2] - - file, err := os.Open(path) - if err != nil { - return err - } - - // Close the file when we are done - defer printError(file.Close) - - fileInfo, err := os.Stat(path) - if err != nil { - return err - } - - if fileInfo.IsDir() { - return fmt.Errorf("Path (%s) is a directory, not a file", path) - } - - dataFileChunk, err := pstore.StoreWriter(id, outputDir) - if err != nil { - return err - } - - // Close when finished - defer printError(dataFileChunk.Close) - - _, err = io.Copy(dataFileChunk, file) - - return err - }, -} - -var retrieveMain = &cobra.Command{ - Use: "retrieve [id] [storeDir]", - Aliases: []string{"r"}, - Args: cobra.ExactArgs(2), - Short: "Retrieve data by id and print to Stdout", - - RunE: func(cmd *cobra.Command, args []string) error { - id := args[0] - path := args[1] - - fileInfo, err := os.Stat(path) - if err != nil { - return err - } - - if !fileInfo.IsDir() { - return fmt.Errorf("Path (%s) is a file, not a directory", path) - } - - dataFileChunk, err := pstore.RetrieveReader(context.Background(), id, 0, -1, path) - if err != nil { - return err - } - - // Close when finished - defer printError(dataFileChunk.Close) - - _, err = io.Copy(os.Stdout, dataFileChunk) - return err - }, -} - -var deleteMain = &cobra.Command{ - Use: "delete [id] [storeDir]", - Aliases: []string{"d"}, - Args: cobra.ExactArgs(2), - Short: "Delete data by id", - - RunE: func(cmd *cobra.Command, args []string) error { - id := args[0] - directory := args[1] - return pstore.Delete(id, directory) - }, -} - -func printError(fn func() error) { - err := fn() - if err != nil { - fmt.Println(err) - } -} diff --git a/internal/testcontext/context.go b/internal/testcontext/context.go index f96cedb6b..0a866c03f 100644 --- a/internal/testcontext/context.go +++ b/internal/testcontext/context.go @@ -118,7 +118,7 @@ func (ctx *Context) Dir(subs ...string) string { }) dir := filepath.Join(append([]string{ctx.directory}, subs...)...) - _ = os.MkdirAll(dir, 0644) + _ = os.MkdirAll(dir, 0744) return dir } diff --git a/internal/testplanet/planet.go b/internal/testplanet/planet.go index a34688f1a..3645e1eb3 100644 --- a/internal/testplanet/planet.go +++ b/internal/testplanet/planet.go @@ -292,7 +292,7 @@ func (planet *Planet) newStorageNodes(count int) ([]*storagenode.Peer, error) { }, }, Storage: psserver.Config{ - Path: db.Disk(), + Path: "", // TODO: this argument won't be needed with master storagenodedb AllocatedDiskSpace: memory.TB.Int64(), AllocatedBandwidth: memory.TB.Int64(), KBucketRefreshInterval: time.Minute, diff --git a/pkg/piecestore/psserver/config.go b/pkg/piecestore/psserver/config.go index 4b1d3a010..83cd9c049 100644 --- a/pkg/piecestore/psserver/config.go +++ b/pkg/piecestore/psserver/config.go @@ -15,6 +15,7 @@ import ( "storj.io/storj/pkg/kademlia" "storj.io/storj/pkg/pb" + pstore "storj.io/storj/pkg/piecestore" "storj.io/storj/pkg/piecestore/psserver/agreementsender" "storj.io/storj/pkg/piecestore/psserver/psdb" "storj.io/storj/pkg/provider" @@ -39,11 +40,14 @@ func (c Config) Run(ctx context.Context, server *provider.Provider) (err error) ctx, cancel := context.WithCancel(ctx) //piecestore - db, err := psdb.Open(ctx, filepath.Join(c.Path, "piece-store-data"), filepath.Join(c.Path, "piecestore.db")) + storage := pstore.NewStorage(filepath.Join(c.Path, "piece-store-data")) + + db, err := psdb.Open(ctx, storage, filepath.Join(c.Path, "piecestore.db")) if err != nil { return ServerError.Wrap(err) } - s, err := NewEndpoint(zap.L(), c, db, server.Identity().Key) + + s, err := NewEndpoint(zap.L(), c, storage, db, server.Identity().Key) if err != nil { return err } diff --git a/pkg/piecestore/psserver/psdb/psdb.go b/pkg/piecestore/psserver/psdb/psdb.go index f8317f06d..bd762907f 100644 --- a/pkg/piecestore/psserver/psdb/psdb.go +++ b/pkg/piecestore/psserver/psdb/psdb.go @@ -36,10 +36,10 @@ var ( // DB is a piece store database type DB struct { - dataPath string - mu sync.Mutex - DB *sql.DB // TODO: hide - check *time.Ticker + storage *pstore.Storage + mu sync.Mutex + DB *sql.DB // TODO: hide + check *time.Ticker } // Agreement is a struct that contains a bandwidth agreement and the associated signature @@ -49,7 +49,7 @@ type Agreement struct { } // Open opens DB at DBPath -func Open(ctx context.Context, dataPath, DBPath string) (db *DB, err error) { +func Open(ctx context.Context, storage *pstore.Storage, DBPath string) (db *DB, err error) { defer mon.Task()(&ctx)(&err) if err = os.MkdirAll(filepath.Dir(DBPath), 0700); err != nil { @@ -61,9 +61,9 @@ func Open(ctx context.Context, dataPath, DBPath string) (db *DB, err error) { return nil, Error.Wrap(err) } db = &DB{ - DB: sqlite, - dataPath: dataPath, - check: time.NewTicker(*defaultCheckInterval), + DB: sqlite, + storage: storage, + check: time.NewTicker(*defaultCheckInterval), } if err := db.init(); err != nil { return nil, utils.CombineErrors(err, db.DB.Close()) @@ -75,7 +75,7 @@ func Open(ctx context.Context, dataPath, DBPath string) (db *DB, err error) { } // OpenInMemory opens sqlite DB inmemory -func OpenInMemory(ctx context.Context, dataPath string) (db *DB, err error) { +func OpenInMemory(ctx context.Context, storage *pstore.Storage) (db *DB, err error) { defer mon.Task()(&ctx)(&err) sqlite, err := sql.Open("sqlite3", ":memory:") @@ -84,9 +84,9 @@ func OpenInMemory(ctx context.Context, dataPath string) (db *DB, err error) { } db = &DB{ - DB: sqlite, - dataPath: dataPath, - check: time.NewTicker(*defaultCheckInterval), + DB: sqlite, + storage: storage, + check: time.NewTicker(*defaultCheckInterval), } if err := db.init(); err != nil { return nil, utils.CombineErrors(err, db.DB.Close()) @@ -187,16 +187,15 @@ func (db *DB) DeleteExpired(ctx context.Context) (err error) { return tx.Commit() }() - var errs []error - for _, id := range expired { - err := pstore.Delete(id, db.dataPath) - if err != nil { - errs = append(errs, err) + if db.storage != nil { + var errlist errs.Group + for _, id := range expired { + errlist.Add(db.storage.Delete(id)) } - } - if len(errs) > 0 { - return utils.CombineErrors(errs...) + if len(errlist) > 0 { + return errlist.Err() + } } return nil diff --git a/pkg/piecestore/psserver/psdb/psdb_test.go b/pkg/piecestore/psserver/psdb/psdb_test.go index 910d2a4c7..5c95d16f3 100644 --- a/pkg/piecestore/psserver/psdb/psdb_test.go +++ b/pkg/piecestore/psserver/psdb/psdb_test.go @@ -18,6 +18,7 @@ import ( "storj.io/storj/internal/teststorj" "storj.io/storj/pkg/pb" + pstore "storj.io/storj/pkg/piecestore" "storj.io/storj/pkg/storj" ) @@ -32,7 +33,9 @@ func newDB(t testing.TB) (*DB, func()) { } dbpath := filepath.Join(tmpdir, "psdb.db") - db, err := Open(ctx, "", dbpath) + storage := pstore.NewStorage(tmpdir) + + db, err := Open(ctx, storage, dbpath) if err != nil { t.Fatal(err) } @@ -42,6 +45,10 @@ func newDB(t testing.TB) (*DB, func()) { if err != nil { t.Fatal(err) } + err = storage.Close() + if err != nil { + t.Fatal(err) + } err = os.RemoveAll(tmpdir) if err != nil { @@ -51,7 +58,7 @@ func newDB(t testing.TB) (*DB, func()) { } func TestNewInmemory(t *testing.T) { - db, err := OpenInMemory(context.Background(), "") + db, err := OpenInMemory(context.Background(), nil) if err != nil { t.Fatal(err) } diff --git a/pkg/piecestore/psserver/retrieve.go b/pkg/piecestore/psserver/retrieve.go index 159a72983..b2c5b1293 100644 --- a/pkg/piecestore/psserver/retrieve.go +++ b/pkg/piecestore/psserver/retrieve.go @@ -16,7 +16,6 @@ import ( "storj.io/storj/internal/sync2" "storj.io/storj/pkg/pb" - "storj.io/storj/pkg/piecestore" "storj.io/storj/pkg/utils" ) @@ -59,7 +58,7 @@ func (s *Server) Retrieve(stream pb.PieceStoreRoutes_RetrieveServer) (err error) } // Get path to data being retrieved - path, err := pstore.PathByID(id, s.DataDir) + path, err := s.storage.PiecePath(id) if err != nil { return err } @@ -95,7 +94,7 @@ func (s *Server) Retrieve(stream pb.PieceStoreRoutes_RetrieveServer) (err error) func (s *Server) retrieveData(ctx context.Context, stream pb.PieceStoreRoutes_RetrieveServer, id string, offset, length int64) (retrieved, allocated int64, err error) { defer mon.Task()(&ctx)(&err) - storeFile, err := pstore.RetrieveReader(ctx, id, offset, length, s.DataDir) + storeFile, err := s.storage.Reader(ctx, id, offset, length) if err != nil { return 0, 0, err } diff --git a/pkg/piecestore/psserver/server.go b/pkg/piecestore/psserver/server.go index 19a7c7459..73abb4e63 100644 --- a/pkg/piecestore/psserver/server.go +++ b/pkg/piecestore/psserver/server.go @@ -57,7 +57,7 @@ func DirSize(path string) (int64, error) { // Server -- GRPC server meta data used in route calls type Server struct { log *zap.Logger - DataDir string + storage *pstore.Storage DB *psdb.DB pkey crypto.PrivateKey totalAllocated int64 @@ -66,8 +66,7 @@ type Server struct { } // NewEndpoint -- initializes a new endpoint for a piecestore server -func NewEndpoint(log *zap.Logger, config Config, db *psdb.DB, pkey crypto.PrivateKey) (*Server, error) { - +func NewEndpoint(log *zap.Logger, config Config, storage *pstore.Storage, db *psdb.DB, pkey crypto.PrivateKey) (*Server, error) { // read the allocated disk space from the config file allocatedDiskSpace := config.AllocatedDiskSpace allocatedBandwidth := config.AllocatedBandwidth @@ -122,7 +121,7 @@ func NewEndpoint(log *zap.Logger, config Config, db *psdb.DB, pkey crypto.Privat return &Server{ log: log, - DataDir: filepath.Join(config.Path, "piece-store-data"), + storage: storage, DB: db, pkey: pkey, totalAllocated: allocatedDiskSpace, @@ -132,10 +131,10 @@ func NewEndpoint(log *zap.Logger, config Config, db *psdb.DB, pkey crypto.Privat } // New creates a Server with custom db -func New(log *zap.Logger, dataDir string, db *psdb.DB, config Config, pkey crypto.PrivateKey) *Server { +func New(log *zap.Logger, storage *pstore.Storage, db *psdb.DB, config Config, pkey crypto.PrivateKey) *Server { return &Server{ log: log, - DataDir: dataDir, + storage: storage, DB: db, pkey: pkey, totalAllocated: config.AllocatedDiskSpace, @@ -148,7 +147,12 @@ func New(log *zap.Logger, dataDir string, db *psdb.DB, config Config, pkey crypt func (s *Server) Close() error { return nil } // Stop the piececstore node -func (s *Server) Stop(ctx context.Context) error { return s.DB.Close() } +func (s *Server) Stop(ctx context.Context) error { + return errs.Combine( + s.DB.Close(), + s.storage.Close(), + ) +} // Piece -- Send meta data about a stored by by Id func (s *Server) Piece(ctx context.Context, in *pb.PieceId) (*pb.PieceSummary, error) { @@ -164,7 +168,7 @@ func (s *Server) Piece(ctx context.Context, in *pb.PieceId) (*pb.PieceSummary, e return nil, err } - path, err := pstore.PathByID(id, s.DataDir) + path, err := s.storage.PiecePath(id) if err != nil { return nil, err } @@ -231,7 +235,7 @@ func (s *Server) Delete(ctx context.Context, in *pb.PieceDelete) (*pb.PieceDelet } func (s *Server) deleteByID(id string) error { - if err := pstore.Delete(id, s.DataDir); err != nil { + if err := s.storage.Delete(id); err != nil { return err } diff --git a/pkg/piecestore/psserver/server_test.go b/pkg/piecestore/psserver/server_test.go index c1357c456..b6e423b63 100644 --- a/pkg/piecestore/psserver/server_test.go +++ b/pkg/piecestore/psserver/server_test.go @@ -4,7 +4,6 @@ package psserver import ( - "bytes" "crypto" "crypto/ecdsa" "fmt" @@ -14,7 +13,6 @@ import ( "math" "net" "os" - "path" "path/filepath" "runtime" "strings" @@ -24,10 +22,12 @@ import ( "github.com/gtank/cryptopasta" _ "github.com/mattn/go-sqlite3" "github.com/stretchr/testify/assert" + "github.com/zeebo/errs" "go.uber.org/zap/zaptest" "golang.org/x/net/context" "google.golang.org/grpc" + "storj.io/storj/internal/testcontext" "storj.io/storj/internal/testidentity" "storj.io/storj/internal/teststorj" "storj.io/storj/pkg/pb" @@ -36,33 +36,30 @@ import ( "storj.io/storj/pkg/storj" ) -var ctx = context.Background() - -func writeFileToDir(name, dir string) error { - file, err := pstore.StoreWriter(name, dir) +func (TS *TestServer) writeFile(pieceID string) error { + file, err := TS.s.storage.Writer(pieceID) if err != nil { return err } - // Close when finished - _, err = io.Copy(file, bytes.NewReader([]byte("butts"))) - if err != nil { - _ = file.Close() - return err - } - return file.Close() + _, err = file.Write([]byte("xyzwq")) + return errs.Combine(err, file.Close()) + } func TestPiece(t *testing.T) { + ctx := testcontext.New(t) + defer ctx.Cleanup() + TS := NewTestServer(t) defer TS.Stop() - if err := writeFileToDir("11111111111111111111", TS.s.DataDir); err != nil { + if err := TS.writeFile("11111111111111111111"); err != nil { t.Errorf("Error: %v\nCould not create test piece", err) return } - defer func() { _ = pstore.Delete("11111111111111111111", TS.s.DataDir) }() + defer func() { _ = TS.s.storage.Delete("11111111111111111111") }() // set up test cases tests := []struct { @@ -81,13 +78,16 @@ func TestPiece(t *testing.T) { id: "123", size: 5, expiration: 9999999999, - err: "rpc error: code = Unknown desc = argError: invalid id length", + err: "rpc error: code = Unknown desc = piecestore error: invalid id length", }, { // server should err with nonexistent file id: "22222222222222222222", size: 5, expiration: 9999999999, - err: fmt.Sprintf("rpc error: code = Unknown desc = stat %s: no such file or directory", path.Join(TS.s.DataDir, "/22/22/2222222222222222")), + err: fmt.Sprintf("rpc error: code = Unknown desc = stat %s: no such file or directory", func() string { + path, _ := TS.s.storage.PiecePath("22222222222222222222") + return path + }()), }, { // server should err with invalid TTL id: "22222222222222222222;DELETE*FROM TTL;;;;", @@ -135,16 +135,18 @@ func TestPiece(t *testing.T) { func TestRetrieve(t *testing.T) { t.Skip("broken test") + ctx := testcontext.New(t) + defer ctx.Cleanup() + TS := NewTestServer(t) defer TS.Stop() - // simulate piece stored with storagenode - if err := writeFileToDir("11111111111111111111", TS.s.DataDir); err != nil { + if err := TS.writeFile("11111111111111111111"); err != nil { t.Errorf("Error: %v\nCould not create test piece", err) return } - defer func() { _ = pstore.Delete("11111111111111111111", TS.s.DataDir) }() + defer func() { _ = TS.s.storage.Delete("11111111111111111111") }() // set up test cases tests := []struct { @@ -162,7 +164,7 @@ func TestRetrieve(t *testing.T) { respSize: 5, allocSize: 5, offset: 0, - content: []byte("butts"), + content: []byte("xyzwq"), err: "", }, { // should successfully retrieve data in customizeable increments @@ -171,7 +173,7 @@ func TestRetrieve(t *testing.T) { respSize: 5, allocSize: 2, offset: 0, - content: []byte("butts"), + content: []byte("xyzwq"), err: "", }, { // should successfully retrieve data with lower allocations @@ -189,7 +191,7 @@ func TestRetrieve(t *testing.T) { respSize: 5, allocSize: 5, offset: 0, - content: []byte("butts"), + content: []byte("xyzwq"), err: "", }, { // server should err with invalid id @@ -198,8 +200,8 @@ func TestRetrieve(t *testing.T) { respSize: 5, allocSize: 5, offset: 0, - content: []byte("butts"), - err: "rpc error: code = Unknown desc = argError: invalid id length", + content: []byte("xyzwq"), + err: "rpc error: code = Unknown desc = piecestore error: invalid id length", }, { // server should err with nonexistent file id: "22222222222222222222", @@ -207,8 +209,11 @@ func TestRetrieve(t *testing.T) { respSize: 5, allocSize: 5, offset: 0, - content: []byte("butts"), - err: fmt.Sprintf("rpc error: code = Unknown desc = retrieve error: stat %s: no such file or directory", path.Join(TS.s.DataDir, "/22/22/2222222222222222")), + content: []byte("xyzwq"), + err: fmt.Sprintf("rpc error: code = Unknown desc = retrieve error: stat %s: no such file or directory", func() string { + path, _ := TS.s.storage.PiecePath("22222222222222222222") + return path + }()), }, { // server should return expected content and respSize with offset and excess reqSize id: "11111111111111111111", @@ -292,6 +297,9 @@ func TestRetrieve(t *testing.T) { } func TestStore(t *testing.T) { + ctx := testcontext.New(t) + defer ctx.Cleanup() + TS := NewTestServer(t) defer TS.Stop() @@ -308,7 +316,7 @@ func TestStore(t *testing.T) { { // should successfully store data id: "99999999999999999999", ttl: 9999999999, - content: []byte("butts"), + content: []byte("xyzwq"), message: "OK", totalReceived: 5, err: "", @@ -316,15 +324,15 @@ func TestStore(t *testing.T) { { // should err with invalid id length id: "butts", ttl: 9999999999, - content: []byte("butts"), + content: []byte("xyzwq"), message: "", totalReceived: 0, - err: "rpc error: code = Unknown desc = argError: invalid id length", + err: "rpc error: code = Unknown desc = piecestore error: invalid id length", }, { // should err with piece ID not specified id: "", ttl: 9999999999, - content: []byte("butts"), + content: []byte("xyzwq"), message: "", totalReceived: 0, err: "rpc error: code = Unknown desc = store error: piece ID not specified", @@ -417,6 +425,9 @@ func TestStore(t *testing.T) { } func TestPbaValidation(t *testing.T) { + ctx := testcontext.New(t) + defer ctx.Cleanup() + TS := NewTestServer(t) defer TS.Stop() @@ -497,6 +508,9 @@ func TestPbaValidation(t *testing.T) { } func TestDelete(t *testing.T) { + ctx := testcontext.New(t) + defer ctx.Cleanup() + TS := NewTestServer(t) defer TS.Stop() @@ -515,8 +529,8 @@ func TestDelete(t *testing.T) { }, { // should err with invalid id length id: "123", - message: "rpc error: code = Unknown desc = argError: invalid id length", - err: "rpc error: code = Unknown desc = argError: invalid id length", + message: "rpc error: code = Unknown desc = piecestore error: invalid id length", + err: "rpc error: code = Unknown desc = piecestore error: invalid id length", }, { // should return OK with nonexistent file id: "22222222222222222223", @@ -530,7 +544,7 @@ func TestDelete(t *testing.T) { assert := assert.New(t) // simulate piece stored with storagenode - if err := writeFileToDir("11111111111111111111", TS.s.DataDir); err != nil { + if err := TS.writeFile("11111111111111111111"); err != nil { t.Errorf("Error: %v\nCould not create test piece", err) return } @@ -545,7 +559,7 @@ func TestDelete(t *testing.T) { }() defer func() { - assert.NoError(pstore.Delete("11111111111111111111", TS.s.DataDir)) + assert.NoError(TS.s.storage.Delete("11111111111111111111")) }() req := &pb.PieceDelete{Id: tt.id} @@ -560,7 +574,7 @@ func TestDelete(t *testing.T) { assert.Equal(tt.message, resp.GetMessage()) // if test passes, check if file was indeed deleted - filePath, err := pstore.PathByID(tt.id, TS.s.DataDir) + filePath, err := TS.s.storage.PiecePath(tt.id) assert.NoError(err) if _, err = os.Stat(filePath); os.IsExist(err) { t.Errorf("File not deleted") @@ -578,8 +592,9 @@ func newTestServerStruct(t *testing.T) (*Server, func()) { tempDBPath := filepath.Join(tmp, "test.db") tempDir := filepath.Join(tmp, "test-data", "3000") + storage := pstore.NewStorage(tempDir) - psDB, err := psdb.Open(ctx, tempDir, tempDBPath) + psDB, err := psdb.Open(context.TODO(), storage, tempDBPath) if err != nil { t.Fatalf("failed open psdb: %v", err) } @@ -589,14 +604,14 @@ func newTestServerStruct(t *testing.T) (*Server, func()) { } server := &Server{ log: zaptest.NewLogger(t), - DataDir: tempDir, + storage: storage, DB: psDB, verifier: verifier, totalAllocated: math.MaxInt64, totalBwAllocated: math.MaxInt64, } return server, func() { - if serr := server.Stop(ctx); serr != nil { + if serr := server.Stop(context.TODO()); serr != nil { t.Fatal(serr) } // TODO:fix this error check diff --git a/pkg/piecestore/psserver/store.go b/pkg/piecestore/psserver/store.go index deeb731e3..59c4baa7a 100644 --- a/pkg/piecestore/psserver/store.go +++ b/pkg/piecestore/psserver/store.go @@ -13,7 +13,6 @@ import ( "go.uber.org/zap" "storj.io/storj/pkg/pb" - "storj.io/storj/pkg/piecestore" "storj.io/storj/pkg/utils" ) @@ -87,7 +86,7 @@ func (s *Server) storeData(ctx context.Context, stream pb.PieceStoreRoutes_Store }() // Initialize file for storing data - storeFile, err := pstore.StoreWriter(id, s.DataDir) + storeFile, err := s.storage.Writer(id) if err != nil { return 0, err } diff --git a/pkg/piecestore/pstore.go b/pkg/piecestore/pstore.go index a7dce8d2e..92fcc5cf6 100644 --- a/pkg/piecestore/pstore.go +++ b/pkg/piecestore/pstore.go @@ -7,7 +7,6 @@ import ( "context" "io" "os" - "path" "path/filepath" "github.com/zeebo/errs" @@ -15,84 +14,82 @@ import ( "storj.io/storj/pkg/ranger" ) +// Storage stores piecestore pieces +type Storage struct { + dir string +} + +// NewStorage creates database for storing pieces +func NewStorage(dir string) *Storage { + return &Storage{dir} +} + +// Close closes resources +func (storage *Storage) Close() error { return nil } + // IDLength -- Minimum ID length const IDLength = 20 // Errors var ( - ArgError = errs.Class("argError") - FSError = errs.Class("fsError") + Error = errs.Class("piecestore error") ) -// PathByID creates datapath from id and dir -func PathByID(id, dir string) (string, error) { - if len(id) < IDLength { - return "", ArgError.New("invalid id length") - } - if dir == "" { - return "", ArgError.New("no path provided") +// PiecePath creates piece storage path from id and dir +func (storage *Storage) PiecePath(pieceID string) (string, error) { + if len(pieceID) < IDLength { + return "", Error.New("invalid id length") } - folder1 := id[0:2] - folder2 := id[2:4] - fileName := id[4:] - - return path.Join(dir, folder1, folder2, fileName), nil + folder1, folder2, filename := pieceID[0:2], pieceID[2:4], pieceID[4:] + return filepath.Join(storage.dir, folder1, folder2, filename), nil } -// StoreWriter stores data into piece store in multiple writes -// id is the id of the data to be stored -// dir is the pstore directory containing all other data stored -// returns error if failed and nil if successful -func StoreWriter(id string, dir string) (io.WriteCloser, error) { - dataPath, err := PathByID(id, dir) +// Writer returns a writer that can be used to store piece. +func (storage *Storage) Writer(pieceID string) (io.WriteCloser, error) { + path, err := storage.PiecePath(pieceID) if err != nil { return nil, err } - // Create directory path on file system - if err = os.MkdirAll(filepath.Dir(dataPath), 0700); err != nil { - return nil, err + if err = os.MkdirAll(filepath.Dir(path), 0700); err != nil { + return nil, Error.Wrap(err) } - // Create File on file system - return os.OpenFile(dataPath, os.O_CREATE|os.O_EXCL|os.O_WRONLY, 0600) + file, err := os.OpenFile(path, os.O_CREATE|os.O_EXCL|os.O_WRONLY, 0600) + if err != nil { + return nil, Error.Wrap(err) + } + + return file, nil } -// RetrieveReader retrieves data from pstore directory -// id is the id of the stored data -// offset is the offset of the data that you are reading. Useful for multiple connections to split the data transfer -// length is the amount of data to read. Read all data if -1 -// dir is the pstore directory containing all other data stored -// returns error if failed and nil if successful -func RetrieveReader(ctx context.Context, id string, offset int64, length int64, dir string) (io.ReadCloser, error) { - dataPath, err := PathByID(id, dir) +// Reader returns a reader for the specified piece at the location +func (storage *Storage) Reader(ctx context.Context, pieceID string, offset int64, length int64) (io.ReadCloser, error) { + path, err := storage.PiecePath(pieceID) if err != nil { return nil, err } - fileInfo, err := os.Stat(dataPath) + info, err := os.Stat(path) if err != nil { return nil, err } - // If offset is greater than file size return - if offset >= fileInfo.Size() || offset < 0 { - return nil, ArgError.New("invalid offset: %v", offset) + if offset >= info.Size() || offset < 0 { + return nil, Error.New("invalid offset: %v", offset) } - // If length less than 0 read the entire file if length <= -1 { - length = fileInfo.Size() + length = info.Size() } // If trying to read past the end of the file, just read to the end - if fileInfo.Size() < offset+length { - length = fileInfo.Size() - offset + if info.Size() < offset+length { + length = info.Size() - offset } - // Created a section reader so that we can concurrently retrieve the same file. - rr, err := ranger.FileRanger(dataPath) + rr, err := ranger.FileRanger(path) if err != nil { return nil, err } @@ -100,23 +97,16 @@ func RetrieveReader(ctx context.Context, id string, offset int64, length int64, return rr.Range(ctx, offset, length) } -// Delete deletes data from storagenode -// id is the id of the data to be stored -// dir is the pstore directory containing all other data stored -// returns error if failed and nil if successful -func Delete(id string, dir string) error { - dataPath, err := PathByID(id, dir) +// Delete deletes piece from storage +func (storage *Storage) Delete(pieceID string) error { + path, err := storage.PiecePath(pieceID) if err != nil { return err } - if _, err = os.Stat(dataPath); os.IsNotExist(err) { - return nil + err = os.Remove(path) + if os.IsNotExist(err) { + err = nil } - - if err = os.Remove(dataPath); err != nil { - return err - } - - return nil + return err } diff --git a/pkg/piecestore/pstore_test.go b/pkg/piecestore/pstore_test.go index 754eb752f..32d44111d 100644 --- a/pkg/piecestore/pstore_test.go +++ b/pkg/piecestore/pstore_test.go @@ -4,271 +4,78 @@ package pstore import ( - "context" - "os" - "path" - "path/filepath" + "bytes" + "io" + "io/ioutil" + "math/rand" + "strings" "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "storj.io/storj/internal/testcontext" ) func TestStore(t *testing.T) { - tests := []struct { - it string - id string - content []byte - expectedContent []byte - err string - }{ - { - it: "should successfully store data", - id: "0123456789ABCDEFGHIJ", - content: []byte("butts"), - expectedContent: []byte("butts"), - err: "", - }, - { - it: "should return an error when given an invalid id", - id: "012", - content: []byte("butts"), - expectedContent: []byte("butts"), - err: "argError: invalid id length", - }, + ctx := testcontext.New(t) + defer ctx.Cleanup() + + store := NewStorage(ctx.Dir("example")) + defer ctx.Check(store.Close) + + pieceID := strings.Repeat("AB01", 10) + + source := make([]byte, 8000) + _, _ = rand.Read(source[:]) + + { // write data + w, err := store.Writer(pieceID) + require.NoError(t, err) + + n, err := io.Copy(w, bytes.NewReader(source)) + assert.Equal(t, n, int64(len(source))) + assert.NoError(t, err) + + assert.NoError(t, w.Close()) } - for _, tt := range tests { - t.Run(tt.it, func(t *testing.T) { - assert := assert.New(t) - storeFile, err := StoreWriter(tt.id, os.TempDir()) - if tt.err != "" { - if assert.NotNil(err) { - assert.Equal(tt.err, err.Error()) - } - return - } else if err != nil { - t.Errorf("Error: %s", err.Error()) - return + { // valid reads + read := func(offset, length int64) []byte { + reader, err := store.Reader(ctx, pieceID, offset, length) + if assert.NoError(t, err) { + data, err := ioutil.ReadAll(reader) + assert.NoError(t, err) + assert.NoError(t, reader.Close()) + return data } + return nil + } - // Write chunk received to disk - _, err = storeFile.Write(tt.content) - assert.NoError(err) + assert.Equal(t, source, read(0, -1)) + assert.Equal(t, source, read(0, 16000)) - assert.NoError(storeFile.Close()) + assert.Equal(t, source[10:1010], read(10, 1000)) + assert.Equal(t, source[10:11], read(10, 1)) + } - folder1 := tt.id[0:2] - folder2 := tt.id[2:4] - fileName := tt.id[4:] - - createdFilePath := path.Join(os.TempDir(), folder1, folder2, fileName) - - createdFile, err := os.Open(createdFilePath) - if err != nil { - t.Errorf("Error: %s opening created file %s", err.Error(), createdFilePath) - return + { // invalid reads + badread := func(offset, length int64) error { + reader, err := store.Reader(ctx, pieceID, offset, length) + if err == nil { + assert.NoError(t, reader.Close()) } - buffer := make([]byte, int64(len(tt.content))) - _, _ = createdFile.Seek(0, 0) - _, _ = createdFile.Read(buffer) + return err + } - assert.NoError(createdFile.Close()) - assert.NoError(os.RemoveAll(path.Join(os.TempDir(), folder1))) + assert.Error(t, badread(-100, 0)) + assert.Error(t, badread(-100, -10)) + } - if string(buffer) != string(tt.expectedContent) { - t.Errorf("Expected data butts does not equal Actual data %s", string(buffer)) - return - } - }) - } -} - -func TestRetrieve(t *testing.T) { - t.Skip("flaky") - - tests := []struct { - it string - id string - size int64 - offset int64 - content []byte - expectedContent []byte - err string - }{ - { - it: "should successfully retrieve data", - id: "0123456789ABCDEFGHIJ1", - size: 5, - offset: 0, - content: []byte("butts"), - expectedContent: []byte("butts"), - err: "", - }, - { - it: "should successfully retrieve data by offset", - id: "0123456789ABCDEFGHIJ2", - size: 5, - offset: 5, - content: []byte("butts"), - expectedContent: []byte("butts"), - err: "", - }, - { - it: "should successfully retrieve data by chunk", - id: "0123456789ABCDEFGHIJ3", - size: 2, - offset: 5, - content: []byte("bu"), - expectedContent: []byte("bu"), - err: "", - }, - { - it: "should return an error when given negative offset", - id: "0123456789ABCDEFGHIJ4", - size: 0, - offset: -1337, - content: []byte("butts"), - expectedContent: []byte(""), - err: "argError: invalid offset: -1337", - }, - { - it: "should successfully retrieve data with negative length", - id: "0123456789ABCDEFGHIJ5", - size: -1, - offset: 0, - content: []byte("butts"), - expectedContent: []byte("butts"), - err: "", - }, - } - - for _, tt := range tests { - t.Run(tt.it, func(t *testing.T) { - assert := assert.New(t) - - folder1 := tt.id[0:2] - folder2 := tt.id[2:4] - fileName := tt.id[4:] - - tmpdir := filepath.Join(os.TempDir(), folder1) - defer func() { - assert.NoError(os.RemoveAll(tmpdir)) - }() - - createdFilePath := path.Join(tmpdir, folder2, fileName) - - if err := os.MkdirAll(filepath.Dir(createdFilePath), 0700); err != nil { - t.Errorf("Error: %s when creating dir", err.Error()) - return - } - - createdFile, err := os.OpenFile(createdFilePath, os.O_RDWR|os.O_CREATE, 0755) - if err != nil { - t.Errorf("Error: %s opening created file %s", err.Error(), createdFilePath) - return - } - - _, err = createdFile.Seek(tt.offset, 0) - if tt.offset < 0 { - assert.Error(err) - } - _, err = createdFile.Write(tt.content) - if err != nil { - t.Errorf("Error: %s writing to created file", err.Error()) - return - } - assert.NoError(createdFile.Close()) - - storeFile, err := RetrieveReader(context.Background(), tt.id, tt.offset, tt.size, os.TempDir()) - if tt.err != "" { - if assert.NotNil(err) { - assert.Equal(tt.err, err.Error()) - } - return - } else if err != nil { - t.Errorf("Error: %s", err.Error()) - return - } - - size := tt.size - if tt.size < 0 { - size = int64(len(tt.content)) - } - buffer := make([]byte, size) - _, err = storeFile.Read(buffer) - assert.NoError(err) - - assert.NoError(storeFile.Close()) - - if string(buffer) != string(tt.expectedContent) { - t.Errorf("Expected data butts does not equal Actual data %s", string(buffer)) - return - } - }) - } -} - -func TestDelete(t *testing.T) { - tests := []struct { - it string - id string - err string - }{ - { - it: "should successfully delete data", - id: "11111111111111111111", - err: "", - }, - { - it: "should return nil-err with non-existent id", - id: "11111111111111111111", - err: "", - }, - { - it: "should err with invalid id length", - id: "111111", - err: "argError: invalid id length", - }, - } - - for _, tt := range tests { - t.Run(tt.it, func(t *testing.T) { - assert := assert.New(t) - - folder1 := tt.id[0:2] - folder2 := tt.id[2:4] - fileName := tt.id[4:] - - createdFilePath := path.Join(os.TempDir(), folder1, folder2, fileName) - - if err := os.MkdirAll(filepath.Dir(createdFilePath), 0700); err != nil { - t.Errorf("Error: %s when creating dir", err.Error()) - return - } - - createdFile, err := os.OpenFile(createdFilePath, os.O_RDWR|os.O_CREATE, 0755) - if err != nil { - t.Errorf("Error: %s opening created file %s", err.Error(), createdFilePath) - return - } - - assert.NoError(createdFile.Close()) - - err = Delete(tt.id, os.TempDir()) - if tt.err != "" { - if assert.NotNil(err) { - assert.Equal(tt.err, err.Error()) - } - return - } else if err != nil { - t.Errorf("Error: %s", err.Error()) - return - } - - if _, err = os.Stat(createdFilePath); os.IsExist(err) { - t.Errorf("Error deleting file") - return - } - }) + { // test delete + assert.NoError(t, store.Delete(pieceID)) + + _, err := store.Reader(ctx, pieceID, 0, -1) + assert.Error(t, err) } } diff --git a/storagenode/peer.go b/storagenode/peer.go index 8f8e71ca0..d6ff1960c 100644 --- a/storagenode/peer.go +++ b/storagenode/peer.go @@ -16,6 +16,7 @@ import ( "storj.io/storj/pkg/kademlia" "storj.io/storj/pkg/node" "storj.io/storj/pkg/pb" + pstore "storj.io/storj/pkg/piecestore" "storj.io/storj/pkg/piecestore/psserver" "storj.io/storj/pkg/piecestore/psserver/psdb" "storj.io/storj/pkg/server" @@ -26,7 +27,7 @@ import ( // DB is the master database for Storage Node type DB interface { // TODO: use better interfaces - Disk() string + Storage() *pstore.Storage PSDB() *psdb.DB RoutingTable() (kdb, ndb storage.KeyValueStore) } @@ -135,7 +136,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, config Config) (*P config := config.Storage // TODO: psserver shouldn't need the private key - peer.Piecestore = psserver.New(peer.Log.Named("piecestore"), peer.DB.Disk(), peer.DB.PSDB(), config, peer.Identity.Key) + peer.Piecestore = psserver.New(peer.Log.Named("piecestore"), peer.DB.Storage(), peer.DB.PSDB(), config, peer.Identity.Key) pb.RegisterPieceStoreRoutesServer(peer.Public.Server.GRPC(), peer.Piecestore) } diff --git a/storagenode/storagenodedb/database.go b/storagenode/storagenodedb/database.go index 7985b917e..67dbc241d 100644 --- a/storagenode/storagenodedb/database.go +++ b/storagenode/storagenodedb/database.go @@ -8,6 +8,7 @@ import ( "github.com/zeebo/errs" + pstore "storj.io/storj/pkg/piecestore" "storj.io/storj/pkg/piecestore/psserver/psdb" "storj.io/storj/storage" "storj.io/storj/storage/teststore" @@ -18,7 +19,7 @@ var _ storagenode.DB = (*DB)(nil) // DB contains access to different database tables type DB struct { - disk string + storage *pstore.Storage psdb *psdb.DB kdb, ndb storage.KeyValueStore } @@ -26,17 +27,19 @@ type DB struct { // NewInMemory creates new inmemory database for storagenode // TODO: still stores data on disk func NewInMemory(storageDir string) (*DB, error) { + storage := pstore.NewStorage(storageDir) + // TODO: OpenInMemory shouldn't need context argument - psdb, err := psdb.OpenInMemory(context.TODO(), storageDir) + psdb, err := psdb.OpenInMemory(context.TODO(), storage) if err != nil { return nil, err } return &DB{ - disk: storageDir, - psdb: psdb, - kdb: teststore.New(), - ndb: teststore.New(), + storage: storage, + psdb: psdb, + kdb: teststore.New(), + ndb: teststore.New(), }, nil } @@ -46,12 +49,13 @@ func (db *DB) Close() error { db.psdb.Close(), db.kdb.Close(), db.ndb.Close(), + db.storage.Close(), ) } -// Disk returns piecestore data folder -func (db *DB) Disk() string { - return db.disk +// Storage returns piecestore location +func (db *DB) Storage() *pstore.Storage { + return db.storage } // PSDB returns piecestore database