convert piecestorage into a struct (#1024)

This commit is contained in:
Egon Elbre 2019-01-11 13:26:39 +02:00 committed by GitHub
parent dbc8d3ec91
commit 8893884044
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 229 additions and 527 deletions

View File

@ -208,7 +208,7 @@ func cmdDiag(cmd *cobra.Command, args []string) (err error) {
// open the sql db
dbpath := filepath.Join(diagDir, "storage", "piecestore.db")
db, err := psdb.Open(context.Background(), "", dbpath)
db, err := psdb.Open(context.Background(), nil, dbpath)
if err != nil {
fmt.Println("Storagenode database couldnt open:", dbpath)
return err

View File

@ -1,127 +0,0 @@
// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.
package main
import (
"context"
"fmt"
"io"
"os"
"github.com/spf13/cobra"
"storj.io/storj/pkg/piecestore"
"storj.io/storj/pkg/process"
)
func main() {
cobra.EnableCommandSorting = false
root := &cobra.Command{
Use: "piecestore-cli",
Short: "piecestore example cli",
}
root.AddCommand(
storeMain,
retrieveMain,
deleteMain,
)
process.Exec(root)
}
var storeMain = &cobra.Command{
Use: "store [id] [dataPath] [storeDir]",
Aliases: []string{"s"},
Short: "Store data by id",
Args: cobra.ExactArgs(3),
ValidArgs: []string{"id", "datapath", "storedir"},
RunE: func(cmd *cobra.Command, args []string) error {
id := args[0]
path := args[1]
outputDir := args[2]
file, err := os.Open(path)
if err != nil {
return err
}
// Close the file when we are done
defer printError(file.Close)
fileInfo, err := os.Stat(path)
if err != nil {
return err
}
if fileInfo.IsDir() {
return fmt.Errorf("Path (%s) is a directory, not a file", path)
}
dataFileChunk, err := pstore.StoreWriter(id, outputDir)
if err != nil {
return err
}
// Close when finished
defer printError(dataFileChunk.Close)
_, err = io.Copy(dataFileChunk, file)
return err
},
}
var retrieveMain = &cobra.Command{
Use: "retrieve [id] [storeDir]",
Aliases: []string{"r"},
Args: cobra.ExactArgs(2),
Short: "Retrieve data by id and print to Stdout",
RunE: func(cmd *cobra.Command, args []string) error {
id := args[0]
path := args[1]
fileInfo, err := os.Stat(path)
if err != nil {
return err
}
if !fileInfo.IsDir() {
return fmt.Errorf("Path (%s) is a file, not a directory", path)
}
dataFileChunk, err := pstore.RetrieveReader(context.Background(), id, 0, -1, path)
if err != nil {
return err
}
// Close when finished
defer printError(dataFileChunk.Close)
_, err = io.Copy(os.Stdout, dataFileChunk)
return err
},
}
var deleteMain = &cobra.Command{
Use: "delete [id] [storeDir]",
Aliases: []string{"d"},
Args: cobra.ExactArgs(2),
Short: "Delete data by id",
RunE: func(cmd *cobra.Command, args []string) error {
id := args[0]
directory := args[1]
return pstore.Delete(id, directory)
},
}
func printError(fn func() error) {
err := fn()
if err != nil {
fmt.Println(err)
}
}

View File

@ -118,7 +118,7 @@ func (ctx *Context) Dir(subs ...string) string {
})
dir := filepath.Join(append([]string{ctx.directory}, subs...)...)
_ = os.MkdirAll(dir, 0644)
_ = os.MkdirAll(dir, 0744)
return dir
}

View File

@ -292,7 +292,7 @@ func (planet *Planet) newStorageNodes(count int) ([]*storagenode.Peer, error) {
},
},
Storage: psserver.Config{
Path: db.Disk(),
Path: "", // TODO: this argument won't be needed with master storagenodedb
AllocatedDiskSpace: memory.TB.Int64(),
AllocatedBandwidth: memory.TB.Int64(),
KBucketRefreshInterval: time.Minute,

View File

@ -15,6 +15,7 @@ import (
"storj.io/storj/pkg/kademlia"
"storj.io/storj/pkg/pb"
pstore "storj.io/storj/pkg/piecestore"
"storj.io/storj/pkg/piecestore/psserver/agreementsender"
"storj.io/storj/pkg/piecestore/psserver/psdb"
"storj.io/storj/pkg/provider"
@ -39,11 +40,14 @@ func (c Config) Run(ctx context.Context, server *provider.Provider) (err error)
ctx, cancel := context.WithCancel(ctx)
//piecestore
db, err := psdb.Open(ctx, filepath.Join(c.Path, "piece-store-data"), filepath.Join(c.Path, "piecestore.db"))
storage := pstore.NewStorage(filepath.Join(c.Path, "piece-store-data"))
db, err := psdb.Open(ctx, storage, filepath.Join(c.Path, "piecestore.db"))
if err != nil {
return ServerError.Wrap(err)
}
s, err := NewEndpoint(zap.L(), c, db, server.Identity().Key)
s, err := NewEndpoint(zap.L(), c, storage, db, server.Identity().Key)
if err != nil {
return err
}

View File

@ -36,10 +36,10 @@ var (
// DB is a piece store database
type DB struct {
dataPath string
mu sync.Mutex
DB *sql.DB // TODO: hide
check *time.Ticker
storage *pstore.Storage
mu sync.Mutex
DB *sql.DB // TODO: hide
check *time.Ticker
}
// Agreement is a struct that contains a bandwidth agreement and the associated signature
@ -49,7 +49,7 @@ type Agreement struct {
}
// Open opens DB at DBPath
func Open(ctx context.Context, dataPath, DBPath string) (db *DB, err error) {
func Open(ctx context.Context, storage *pstore.Storage, DBPath string) (db *DB, err error) {
defer mon.Task()(&ctx)(&err)
if err = os.MkdirAll(filepath.Dir(DBPath), 0700); err != nil {
@ -61,9 +61,9 @@ func Open(ctx context.Context, dataPath, DBPath string) (db *DB, err error) {
return nil, Error.Wrap(err)
}
db = &DB{
DB: sqlite,
dataPath: dataPath,
check: time.NewTicker(*defaultCheckInterval),
DB: sqlite,
storage: storage,
check: time.NewTicker(*defaultCheckInterval),
}
if err := db.init(); err != nil {
return nil, utils.CombineErrors(err, db.DB.Close())
@ -75,7 +75,7 @@ func Open(ctx context.Context, dataPath, DBPath string) (db *DB, err error) {
}
// OpenInMemory opens sqlite DB inmemory
func OpenInMemory(ctx context.Context, dataPath string) (db *DB, err error) {
func OpenInMemory(ctx context.Context, storage *pstore.Storage) (db *DB, err error) {
defer mon.Task()(&ctx)(&err)
sqlite, err := sql.Open("sqlite3", ":memory:")
@ -84,9 +84,9 @@ func OpenInMemory(ctx context.Context, dataPath string) (db *DB, err error) {
}
db = &DB{
DB: sqlite,
dataPath: dataPath,
check: time.NewTicker(*defaultCheckInterval),
DB: sqlite,
storage: storage,
check: time.NewTicker(*defaultCheckInterval),
}
if err := db.init(); err != nil {
return nil, utils.CombineErrors(err, db.DB.Close())
@ -187,16 +187,15 @@ func (db *DB) DeleteExpired(ctx context.Context) (err error) {
return tx.Commit()
}()
var errs []error
for _, id := range expired {
err := pstore.Delete(id, db.dataPath)
if err != nil {
errs = append(errs, err)
if db.storage != nil {
var errlist errs.Group
for _, id := range expired {
errlist.Add(db.storage.Delete(id))
}
}
if len(errs) > 0 {
return utils.CombineErrors(errs...)
if len(errlist) > 0 {
return errlist.Err()
}
}
return nil

View File

@ -18,6 +18,7 @@ import (
"storj.io/storj/internal/teststorj"
"storj.io/storj/pkg/pb"
pstore "storj.io/storj/pkg/piecestore"
"storj.io/storj/pkg/storj"
)
@ -32,7 +33,9 @@ func newDB(t testing.TB) (*DB, func()) {
}
dbpath := filepath.Join(tmpdir, "psdb.db")
db, err := Open(ctx, "", dbpath)
storage := pstore.NewStorage(tmpdir)
db, err := Open(ctx, storage, dbpath)
if err != nil {
t.Fatal(err)
}
@ -42,6 +45,10 @@ func newDB(t testing.TB) (*DB, func()) {
if err != nil {
t.Fatal(err)
}
err = storage.Close()
if err != nil {
t.Fatal(err)
}
err = os.RemoveAll(tmpdir)
if err != nil {
@ -51,7 +58,7 @@ func newDB(t testing.TB) (*DB, func()) {
}
func TestNewInmemory(t *testing.T) {
db, err := OpenInMemory(context.Background(), "")
db, err := OpenInMemory(context.Background(), nil)
if err != nil {
t.Fatal(err)
}

View File

@ -16,7 +16,6 @@ import (
"storj.io/storj/internal/sync2"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/piecestore"
"storj.io/storj/pkg/utils"
)
@ -59,7 +58,7 @@ func (s *Server) Retrieve(stream pb.PieceStoreRoutes_RetrieveServer) (err error)
}
// Get path to data being retrieved
path, err := pstore.PathByID(id, s.DataDir)
path, err := s.storage.PiecePath(id)
if err != nil {
return err
}
@ -95,7 +94,7 @@ func (s *Server) Retrieve(stream pb.PieceStoreRoutes_RetrieveServer) (err error)
func (s *Server) retrieveData(ctx context.Context, stream pb.PieceStoreRoutes_RetrieveServer, id string, offset, length int64) (retrieved, allocated int64, err error) {
defer mon.Task()(&ctx)(&err)
storeFile, err := pstore.RetrieveReader(ctx, id, offset, length, s.DataDir)
storeFile, err := s.storage.Reader(ctx, id, offset, length)
if err != nil {
return 0, 0, err
}

View File

@ -57,7 +57,7 @@ func DirSize(path string) (int64, error) {
// Server -- GRPC server meta data used in route calls
type Server struct {
log *zap.Logger
DataDir string
storage *pstore.Storage
DB *psdb.DB
pkey crypto.PrivateKey
totalAllocated int64
@ -66,8 +66,7 @@ type Server struct {
}
// NewEndpoint -- initializes a new endpoint for a piecestore server
func NewEndpoint(log *zap.Logger, config Config, db *psdb.DB, pkey crypto.PrivateKey) (*Server, error) {
func NewEndpoint(log *zap.Logger, config Config, storage *pstore.Storage, db *psdb.DB, pkey crypto.PrivateKey) (*Server, error) {
// read the allocated disk space from the config file
allocatedDiskSpace := config.AllocatedDiskSpace
allocatedBandwidth := config.AllocatedBandwidth
@ -122,7 +121,7 @@ func NewEndpoint(log *zap.Logger, config Config, db *psdb.DB, pkey crypto.Privat
return &Server{
log: log,
DataDir: filepath.Join(config.Path, "piece-store-data"),
storage: storage,
DB: db,
pkey: pkey,
totalAllocated: allocatedDiskSpace,
@ -132,10 +131,10 @@ func NewEndpoint(log *zap.Logger, config Config, db *psdb.DB, pkey crypto.Privat
}
// New creates a Server with custom db
func New(log *zap.Logger, dataDir string, db *psdb.DB, config Config, pkey crypto.PrivateKey) *Server {
func New(log *zap.Logger, storage *pstore.Storage, db *psdb.DB, config Config, pkey crypto.PrivateKey) *Server {
return &Server{
log: log,
DataDir: dataDir,
storage: storage,
DB: db,
pkey: pkey,
totalAllocated: config.AllocatedDiskSpace,
@ -148,7 +147,12 @@ func New(log *zap.Logger, dataDir string, db *psdb.DB, config Config, pkey crypt
func (s *Server) Close() error { return nil }
// Stop the piececstore node
func (s *Server) Stop(ctx context.Context) error { return s.DB.Close() }
func (s *Server) Stop(ctx context.Context) error {
return errs.Combine(
s.DB.Close(),
s.storage.Close(),
)
}
// Piece -- Send meta data about a stored by by Id
func (s *Server) Piece(ctx context.Context, in *pb.PieceId) (*pb.PieceSummary, error) {
@ -164,7 +168,7 @@ func (s *Server) Piece(ctx context.Context, in *pb.PieceId) (*pb.PieceSummary, e
return nil, err
}
path, err := pstore.PathByID(id, s.DataDir)
path, err := s.storage.PiecePath(id)
if err != nil {
return nil, err
}
@ -231,7 +235,7 @@ func (s *Server) Delete(ctx context.Context, in *pb.PieceDelete) (*pb.PieceDelet
}
func (s *Server) deleteByID(id string) error {
if err := pstore.Delete(id, s.DataDir); err != nil {
if err := s.storage.Delete(id); err != nil {
return err
}

View File

@ -4,7 +4,6 @@
package psserver
import (
"bytes"
"crypto"
"crypto/ecdsa"
"fmt"
@ -14,7 +13,6 @@ import (
"math"
"net"
"os"
"path"
"path/filepath"
"runtime"
"strings"
@ -24,10 +22,12 @@ import (
"github.com/gtank/cryptopasta"
_ "github.com/mattn/go-sqlite3"
"github.com/stretchr/testify/assert"
"github.com/zeebo/errs"
"go.uber.org/zap/zaptest"
"golang.org/x/net/context"
"google.golang.org/grpc"
"storj.io/storj/internal/testcontext"
"storj.io/storj/internal/testidentity"
"storj.io/storj/internal/teststorj"
"storj.io/storj/pkg/pb"
@ -36,33 +36,30 @@ import (
"storj.io/storj/pkg/storj"
)
var ctx = context.Background()
func writeFileToDir(name, dir string) error {
file, err := pstore.StoreWriter(name, dir)
func (TS *TestServer) writeFile(pieceID string) error {
file, err := TS.s.storage.Writer(pieceID)
if err != nil {
return err
}
// Close when finished
_, err = io.Copy(file, bytes.NewReader([]byte("butts")))
if err != nil {
_ = file.Close()
return err
}
return file.Close()
_, err = file.Write([]byte("xyzwq"))
return errs.Combine(err, file.Close())
}
func TestPiece(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
TS := NewTestServer(t)
defer TS.Stop()
if err := writeFileToDir("11111111111111111111", TS.s.DataDir); err != nil {
if err := TS.writeFile("11111111111111111111"); err != nil {
t.Errorf("Error: %v\nCould not create test piece", err)
return
}
defer func() { _ = pstore.Delete("11111111111111111111", TS.s.DataDir) }()
defer func() { _ = TS.s.storage.Delete("11111111111111111111") }()
// set up test cases
tests := []struct {
@ -81,13 +78,16 @@ func TestPiece(t *testing.T) {
id: "123",
size: 5,
expiration: 9999999999,
err: "rpc error: code = Unknown desc = argError: invalid id length",
err: "rpc error: code = Unknown desc = piecestore error: invalid id length",
},
{ // server should err with nonexistent file
id: "22222222222222222222",
size: 5,
expiration: 9999999999,
err: fmt.Sprintf("rpc error: code = Unknown desc = stat %s: no such file or directory", path.Join(TS.s.DataDir, "/22/22/2222222222222222")),
err: fmt.Sprintf("rpc error: code = Unknown desc = stat %s: no such file or directory", func() string {
path, _ := TS.s.storage.PiecePath("22222222222222222222")
return path
}()),
},
{ // server should err with invalid TTL
id: "22222222222222222222;DELETE*FROM TTL;;;;",
@ -135,16 +135,18 @@ func TestPiece(t *testing.T) {
func TestRetrieve(t *testing.T) {
t.Skip("broken test")
ctx := testcontext.New(t)
defer ctx.Cleanup()
TS := NewTestServer(t)
defer TS.Stop()
// simulate piece stored with storagenode
if err := writeFileToDir("11111111111111111111", TS.s.DataDir); err != nil {
if err := TS.writeFile("11111111111111111111"); err != nil {
t.Errorf("Error: %v\nCould not create test piece", err)
return
}
defer func() { _ = pstore.Delete("11111111111111111111", TS.s.DataDir) }()
defer func() { _ = TS.s.storage.Delete("11111111111111111111") }()
// set up test cases
tests := []struct {
@ -162,7 +164,7 @@ func TestRetrieve(t *testing.T) {
respSize: 5,
allocSize: 5,
offset: 0,
content: []byte("butts"),
content: []byte("xyzwq"),
err: "",
},
{ // should successfully retrieve data in customizeable increments
@ -171,7 +173,7 @@ func TestRetrieve(t *testing.T) {
respSize: 5,
allocSize: 2,
offset: 0,
content: []byte("butts"),
content: []byte("xyzwq"),
err: "",
},
{ // should successfully retrieve data with lower allocations
@ -189,7 +191,7 @@ func TestRetrieve(t *testing.T) {
respSize: 5,
allocSize: 5,
offset: 0,
content: []byte("butts"),
content: []byte("xyzwq"),
err: "",
},
{ // server should err with invalid id
@ -198,8 +200,8 @@ func TestRetrieve(t *testing.T) {
respSize: 5,
allocSize: 5,
offset: 0,
content: []byte("butts"),
err: "rpc error: code = Unknown desc = argError: invalid id length",
content: []byte("xyzwq"),
err: "rpc error: code = Unknown desc = piecestore error: invalid id length",
},
{ // server should err with nonexistent file
id: "22222222222222222222",
@ -207,8 +209,11 @@ func TestRetrieve(t *testing.T) {
respSize: 5,
allocSize: 5,
offset: 0,
content: []byte("butts"),
err: fmt.Sprintf("rpc error: code = Unknown desc = retrieve error: stat %s: no such file or directory", path.Join(TS.s.DataDir, "/22/22/2222222222222222")),
content: []byte("xyzwq"),
err: fmt.Sprintf("rpc error: code = Unknown desc = retrieve error: stat %s: no such file or directory", func() string {
path, _ := TS.s.storage.PiecePath("22222222222222222222")
return path
}()),
},
{ // server should return expected content and respSize with offset and excess reqSize
id: "11111111111111111111",
@ -292,6 +297,9 @@ func TestRetrieve(t *testing.T) {
}
func TestStore(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
TS := NewTestServer(t)
defer TS.Stop()
@ -308,7 +316,7 @@ func TestStore(t *testing.T) {
{ // should successfully store data
id: "99999999999999999999",
ttl: 9999999999,
content: []byte("butts"),
content: []byte("xyzwq"),
message: "OK",
totalReceived: 5,
err: "",
@ -316,15 +324,15 @@ func TestStore(t *testing.T) {
{ // should err with invalid id length
id: "butts",
ttl: 9999999999,
content: []byte("butts"),
content: []byte("xyzwq"),
message: "",
totalReceived: 0,
err: "rpc error: code = Unknown desc = argError: invalid id length",
err: "rpc error: code = Unknown desc = piecestore error: invalid id length",
},
{ // should err with piece ID not specified
id: "",
ttl: 9999999999,
content: []byte("butts"),
content: []byte("xyzwq"),
message: "",
totalReceived: 0,
err: "rpc error: code = Unknown desc = store error: piece ID not specified",
@ -417,6 +425,9 @@ func TestStore(t *testing.T) {
}
func TestPbaValidation(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
TS := NewTestServer(t)
defer TS.Stop()
@ -497,6 +508,9 @@ func TestPbaValidation(t *testing.T) {
}
func TestDelete(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
TS := NewTestServer(t)
defer TS.Stop()
@ -515,8 +529,8 @@ func TestDelete(t *testing.T) {
},
{ // should err with invalid id length
id: "123",
message: "rpc error: code = Unknown desc = argError: invalid id length",
err: "rpc error: code = Unknown desc = argError: invalid id length",
message: "rpc error: code = Unknown desc = piecestore error: invalid id length",
err: "rpc error: code = Unknown desc = piecestore error: invalid id length",
},
{ // should return OK with nonexistent file
id: "22222222222222222223",
@ -530,7 +544,7 @@ func TestDelete(t *testing.T) {
assert := assert.New(t)
// simulate piece stored with storagenode
if err := writeFileToDir("11111111111111111111", TS.s.DataDir); err != nil {
if err := TS.writeFile("11111111111111111111"); err != nil {
t.Errorf("Error: %v\nCould not create test piece", err)
return
}
@ -545,7 +559,7 @@ func TestDelete(t *testing.T) {
}()
defer func() {
assert.NoError(pstore.Delete("11111111111111111111", TS.s.DataDir))
assert.NoError(TS.s.storage.Delete("11111111111111111111"))
}()
req := &pb.PieceDelete{Id: tt.id}
@ -560,7 +574,7 @@ func TestDelete(t *testing.T) {
assert.Equal(tt.message, resp.GetMessage())
// if test passes, check if file was indeed deleted
filePath, err := pstore.PathByID(tt.id, TS.s.DataDir)
filePath, err := TS.s.storage.PiecePath(tt.id)
assert.NoError(err)
if _, err = os.Stat(filePath); os.IsExist(err) {
t.Errorf("File not deleted")
@ -578,8 +592,9 @@ func newTestServerStruct(t *testing.T) (*Server, func()) {
tempDBPath := filepath.Join(tmp, "test.db")
tempDir := filepath.Join(tmp, "test-data", "3000")
storage := pstore.NewStorage(tempDir)
psDB, err := psdb.Open(ctx, tempDir, tempDBPath)
psDB, err := psdb.Open(context.TODO(), storage, tempDBPath)
if err != nil {
t.Fatalf("failed open psdb: %v", err)
}
@ -589,14 +604,14 @@ func newTestServerStruct(t *testing.T) (*Server, func()) {
}
server := &Server{
log: zaptest.NewLogger(t),
DataDir: tempDir,
storage: storage,
DB: psDB,
verifier: verifier,
totalAllocated: math.MaxInt64,
totalBwAllocated: math.MaxInt64,
}
return server, func() {
if serr := server.Stop(ctx); serr != nil {
if serr := server.Stop(context.TODO()); serr != nil {
t.Fatal(serr)
}
// TODO:fix this error check

View File

@ -13,7 +13,6 @@ import (
"go.uber.org/zap"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/piecestore"
"storj.io/storj/pkg/utils"
)
@ -87,7 +86,7 @@ func (s *Server) storeData(ctx context.Context, stream pb.PieceStoreRoutes_Store
}()
// Initialize file for storing data
storeFile, err := pstore.StoreWriter(id, s.DataDir)
storeFile, err := s.storage.Writer(id)
if err != nil {
return 0, err
}

View File

@ -7,7 +7,6 @@ import (
"context"
"io"
"os"
"path"
"path/filepath"
"github.com/zeebo/errs"
@ -15,84 +14,82 @@ import (
"storj.io/storj/pkg/ranger"
)
// Storage stores piecestore pieces
type Storage struct {
dir string
}
// NewStorage creates database for storing pieces
func NewStorage(dir string) *Storage {
return &Storage{dir}
}
// Close closes resources
func (storage *Storage) Close() error { return nil }
// IDLength -- Minimum ID length
const IDLength = 20
// Errors
var (
ArgError = errs.Class("argError")
FSError = errs.Class("fsError")
Error = errs.Class("piecestore error")
)
// PathByID creates datapath from id and dir
func PathByID(id, dir string) (string, error) {
if len(id) < IDLength {
return "", ArgError.New("invalid id length")
}
if dir == "" {
return "", ArgError.New("no path provided")
// PiecePath creates piece storage path from id and dir
func (storage *Storage) PiecePath(pieceID string) (string, error) {
if len(pieceID) < IDLength {
return "", Error.New("invalid id length")
}
folder1 := id[0:2]
folder2 := id[2:4]
fileName := id[4:]
return path.Join(dir, folder1, folder2, fileName), nil
folder1, folder2, filename := pieceID[0:2], pieceID[2:4], pieceID[4:]
return filepath.Join(storage.dir, folder1, folder2, filename), nil
}
// StoreWriter stores data into piece store in multiple writes
// id is the id of the data to be stored
// dir is the pstore directory containing all other data stored
// returns error if failed and nil if successful
func StoreWriter(id string, dir string) (io.WriteCloser, error) {
dataPath, err := PathByID(id, dir)
// Writer returns a writer that can be used to store piece.
func (storage *Storage) Writer(pieceID string) (io.WriteCloser, error) {
path, err := storage.PiecePath(pieceID)
if err != nil {
return nil, err
}
// Create directory path on file system
if err = os.MkdirAll(filepath.Dir(dataPath), 0700); err != nil {
return nil, err
if err = os.MkdirAll(filepath.Dir(path), 0700); err != nil {
return nil, Error.Wrap(err)
}
// Create File on file system
return os.OpenFile(dataPath, os.O_CREATE|os.O_EXCL|os.O_WRONLY, 0600)
file, err := os.OpenFile(path, os.O_CREATE|os.O_EXCL|os.O_WRONLY, 0600)
if err != nil {
return nil, Error.Wrap(err)
}
return file, nil
}
// RetrieveReader retrieves data from pstore directory
// id is the id of the stored data
// offset is the offset of the data that you are reading. Useful for multiple connections to split the data transfer
// length is the amount of data to read. Read all data if -1
// dir is the pstore directory containing all other data stored
// returns error if failed and nil if successful
func RetrieveReader(ctx context.Context, id string, offset int64, length int64, dir string) (io.ReadCloser, error) {
dataPath, err := PathByID(id, dir)
// Reader returns a reader for the specified piece at the location
func (storage *Storage) Reader(ctx context.Context, pieceID string, offset int64, length int64) (io.ReadCloser, error) {
path, err := storage.PiecePath(pieceID)
if err != nil {
return nil, err
}
fileInfo, err := os.Stat(dataPath)
info, err := os.Stat(path)
if err != nil {
return nil, err
}
// If offset is greater than file size return
if offset >= fileInfo.Size() || offset < 0 {
return nil, ArgError.New("invalid offset: %v", offset)
if offset >= info.Size() || offset < 0 {
return nil, Error.New("invalid offset: %v", offset)
}
// If length less than 0 read the entire file
if length <= -1 {
length = fileInfo.Size()
length = info.Size()
}
// If trying to read past the end of the file, just read to the end
if fileInfo.Size() < offset+length {
length = fileInfo.Size() - offset
if info.Size() < offset+length {
length = info.Size() - offset
}
// Created a section reader so that we can concurrently retrieve the same file.
rr, err := ranger.FileRanger(dataPath)
rr, err := ranger.FileRanger(path)
if err != nil {
return nil, err
}
@ -100,23 +97,16 @@ func RetrieveReader(ctx context.Context, id string, offset int64, length int64,
return rr.Range(ctx, offset, length)
}
// Delete deletes data from storagenode
// id is the id of the data to be stored
// dir is the pstore directory containing all other data stored
// returns error if failed and nil if successful
func Delete(id string, dir string) error {
dataPath, err := PathByID(id, dir)
// Delete deletes piece from storage
func (storage *Storage) Delete(pieceID string) error {
path, err := storage.PiecePath(pieceID)
if err != nil {
return err
}
if _, err = os.Stat(dataPath); os.IsNotExist(err) {
return nil
err = os.Remove(path)
if os.IsNotExist(err) {
err = nil
}
if err = os.Remove(dataPath); err != nil {
return err
}
return nil
return err
}

View File

@ -4,271 +4,78 @@
package pstore
import (
"context"
"os"
"path"
"path/filepath"
"bytes"
"io"
"io/ioutil"
"math/rand"
"strings"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"storj.io/storj/internal/testcontext"
)
func TestStore(t *testing.T) {
tests := []struct {
it string
id string
content []byte
expectedContent []byte
err string
}{
{
it: "should successfully store data",
id: "0123456789ABCDEFGHIJ",
content: []byte("butts"),
expectedContent: []byte("butts"),
err: "",
},
{
it: "should return an error when given an invalid id",
id: "012",
content: []byte("butts"),
expectedContent: []byte("butts"),
err: "argError: invalid id length",
},
ctx := testcontext.New(t)
defer ctx.Cleanup()
store := NewStorage(ctx.Dir("example"))
defer ctx.Check(store.Close)
pieceID := strings.Repeat("AB01", 10)
source := make([]byte, 8000)
_, _ = rand.Read(source[:])
{ // write data
w, err := store.Writer(pieceID)
require.NoError(t, err)
n, err := io.Copy(w, bytes.NewReader(source))
assert.Equal(t, n, int64(len(source)))
assert.NoError(t, err)
assert.NoError(t, w.Close())
}
for _, tt := range tests {
t.Run(tt.it, func(t *testing.T) {
assert := assert.New(t)
storeFile, err := StoreWriter(tt.id, os.TempDir())
if tt.err != "" {
if assert.NotNil(err) {
assert.Equal(tt.err, err.Error())
}
return
} else if err != nil {
t.Errorf("Error: %s", err.Error())
return
{ // valid reads
read := func(offset, length int64) []byte {
reader, err := store.Reader(ctx, pieceID, offset, length)
if assert.NoError(t, err) {
data, err := ioutil.ReadAll(reader)
assert.NoError(t, err)
assert.NoError(t, reader.Close())
return data
}
return nil
}
// Write chunk received to disk
_, err = storeFile.Write(tt.content)
assert.NoError(err)
assert.Equal(t, source, read(0, -1))
assert.Equal(t, source, read(0, 16000))
assert.NoError(storeFile.Close())
assert.Equal(t, source[10:1010], read(10, 1000))
assert.Equal(t, source[10:11], read(10, 1))
}
folder1 := tt.id[0:2]
folder2 := tt.id[2:4]
fileName := tt.id[4:]
createdFilePath := path.Join(os.TempDir(), folder1, folder2, fileName)
createdFile, err := os.Open(createdFilePath)
if err != nil {
t.Errorf("Error: %s opening created file %s", err.Error(), createdFilePath)
return
{ // invalid reads
badread := func(offset, length int64) error {
reader, err := store.Reader(ctx, pieceID, offset, length)
if err == nil {
assert.NoError(t, reader.Close())
}
buffer := make([]byte, int64(len(tt.content)))
_, _ = createdFile.Seek(0, 0)
_, _ = createdFile.Read(buffer)
return err
}
assert.NoError(createdFile.Close())
assert.NoError(os.RemoveAll(path.Join(os.TempDir(), folder1)))
assert.Error(t, badread(-100, 0))
assert.Error(t, badread(-100, -10))
}
if string(buffer) != string(tt.expectedContent) {
t.Errorf("Expected data butts does not equal Actual data %s", string(buffer))
return
}
})
}
}
func TestRetrieve(t *testing.T) {
t.Skip("flaky")
tests := []struct {
it string
id string
size int64
offset int64
content []byte
expectedContent []byte
err string
}{
{
it: "should successfully retrieve data",
id: "0123456789ABCDEFGHIJ1",
size: 5,
offset: 0,
content: []byte("butts"),
expectedContent: []byte("butts"),
err: "",
},
{
it: "should successfully retrieve data by offset",
id: "0123456789ABCDEFGHIJ2",
size: 5,
offset: 5,
content: []byte("butts"),
expectedContent: []byte("butts"),
err: "",
},
{
it: "should successfully retrieve data by chunk",
id: "0123456789ABCDEFGHIJ3",
size: 2,
offset: 5,
content: []byte("bu"),
expectedContent: []byte("bu"),
err: "",
},
{
it: "should return an error when given negative offset",
id: "0123456789ABCDEFGHIJ4",
size: 0,
offset: -1337,
content: []byte("butts"),
expectedContent: []byte(""),
err: "argError: invalid offset: -1337",
},
{
it: "should successfully retrieve data with negative length",
id: "0123456789ABCDEFGHIJ5",
size: -1,
offset: 0,
content: []byte("butts"),
expectedContent: []byte("butts"),
err: "",
},
}
for _, tt := range tests {
t.Run(tt.it, func(t *testing.T) {
assert := assert.New(t)
folder1 := tt.id[0:2]
folder2 := tt.id[2:4]
fileName := tt.id[4:]
tmpdir := filepath.Join(os.TempDir(), folder1)
defer func() {
assert.NoError(os.RemoveAll(tmpdir))
}()
createdFilePath := path.Join(tmpdir, folder2, fileName)
if err := os.MkdirAll(filepath.Dir(createdFilePath), 0700); err != nil {
t.Errorf("Error: %s when creating dir", err.Error())
return
}
createdFile, err := os.OpenFile(createdFilePath, os.O_RDWR|os.O_CREATE, 0755)
if err != nil {
t.Errorf("Error: %s opening created file %s", err.Error(), createdFilePath)
return
}
_, err = createdFile.Seek(tt.offset, 0)
if tt.offset < 0 {
assert.Error(err)
}
_, err = createdFile.Write(tt.content)
if err != nil {
t.Errorf("Error: %s writing to created file", err.Error())
return
}
assert.NoError(createdFile.Close())
storeFile, err := RetrieveReader(context.Background(), tt.id, tt.offset, tt.size, os.TempDir())
if tt.err != "" {
if assert.NotNil(err) {
assert.Equal(tt.err, err.Error())
}
return
} else if err != nil {
t.Errorf("Error: %s", err.Error())
return
}
size := tt.size
if tt.size < 0 {
size = int64(len(tt.content))
}
buffer := make([]byte, size)
_, err = storeFile.Read(buffer)
assert.NoError(err)
assert.NoError(storeFile.Close())
if string(buffer) != string(tt.expectedContent) {
t.Errorf("Expected data butts does not equal Actual data %s", string(buffer))
return
}
})
}
}
func TestDelete(t *testing.T) {
tests := []struct {
it string
id string
err string
}{
{
it: "should successfully delete data",
id: "11111111111111111111",
err: "",
},
{
it: "should return nil-err with non-existent id",
id: "11111111111111111111",
err: "",
},
{
it: "should err with invalid id length",
id: "111111",
err: "argError: invalid id length",
},
}
for _, tt := range tests {
t.Run(tt.it, func(t *testing.T) {
assert := assert.New(t)
folder1 := tt.id[0:2]
folder2 := tt.id[2:4]
fileName := tt.id[4:]
createdFilePath := path.Join(os.TempDir(), folder1, folder2, fileName)
if err := os.MkdirAll(filepath.Dir(createdFilePath), 0700); err != nil {
t.Errorf("Error: %s when creating dir", err.Error())
return
}
createdFile, err := os.OpenFile(createdFilePath, os.O_RDWR|os.O_CREATE, 0755)
if err != nil {
t.Errorf("Error: %s opening created file %s", err.Error(), createdFilePath)
return
}
assert.NoError(createdFile.Close())
err = Delete(tt.id, os.TempDir())
if tt.err != "" {
if assert.NotNil(err) {
assert.Equal(tt.err, err.Error())
}
return
} else if err != nil {
t.Errorf("Error: %s", err.Error())
return
}
if _, err = os.Stat(createdFilePath); os.IsExist(err) {
t.Errorf("Error deleting file")
return
}
})
{ // test delete
assert.NoError(t, store.Delete(pieceID))
_, err := store.Reader(ctx, pieceID, 0, -1)
assert.Error(t, err)
}
}

View File

@ -16,6 +16,7 @@ import (
"storj.io/storj/pkg/kademlia"
"storj.io/storj/pkg/node"
"storj.io/storj/pkg/pb"
pstore "storj.io/storj/pkg/piecestore"
"storj.io/storj/pkg/piecestore/psserver"
"storj.io/storj/pkg/piecestore/psserver/psdb"
"storj.io/storj/pkg/server"
@ -26,7 +27,7 @@ import (
// DB is the master database for Storage Node
type DB interface {
// TODO: use better interfaces
Disk() string
Storage() *pstore.Storage
PSDB() *psdb.DB
RoutingTable() (kdb, ndb storage.KeyValueStore)
}
@ -135,7 +136,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, config Config) (*P
config := config.Storage
// TODO: psserver shouldn't need the private key
peer.Piecestore = psserver.New(peer.Log.Named("piecestore"), peer.DB.Disk(), peer.DB.PSDB(), config, peer.Identity.Key)
peer.Piecestore = psserver.New(peer.Log.Named("piecestore"), peer.DB.Storage(), peer.DB.PSDB(), config, peer.Identity.Key)
pb.RegisterPieceStoreRoutesServer(peer.Public.Server.GRPC(), peer.Piecestore)
}

View File

@ -8,6 +8,7 @@ import (
"github.com/zeebo/errs"
pstore "storj.io/storj/pkg/piecestore"
"storj.io/storj/pkg/piecestore/psserver/psdb"
"storj.io/storj/storage"
"storj.io/storj/storage/teststore"
@ -18,7 +19,7 @@ var _ storagenode.DB = (*DB)(nil)
// DB contains access to different database tables
type DB struct {
disk string
storage *pstore.Storage
psdb *psdb.DB
kdb, ndb storage.KeyValueStore
}
@ -26,17 +27,19 @@ type DB struct {
// NewInMemory creates new inmemory database for storagenode
// TODO: still stores data on disk
func NewInMemory(storageDir string) (*DB, error) {
storage := pstore.NewStorage(storageDir)
// TODO: OpenInMemory shouldn't need context argument
psdb, err := psdb.OpenInMemory(context.TODO(), storageDir)
psdb, err := psdb.OpenInMemory(context.TODO(), storage)
if err != nil {
return nil, err
}
return &DB{
disk: storageDir,
psdb: psdb,
kdb: teststore.New(),
ndb: teststore.New(),
storage: storage,
psdb: psdb,
kdb: teststore.New(),
ndb: teststore.New(),
}, nil
}
@ -46,12 +49,13 @@ func (db *DB) Close() error {
db.psdb.Close(),
db.kdb.Close(),
db.ndb.Close(),
db.storage.Close(),
)
}
// Disk returns piecestore data folder
func (db *DB) Disk() string {
return db.disk
// Storage returns piecestore location
func (db *DB) Storage() *pstore.Storage {
return db.storage
}
// PSDB returns piecestore database