diff --git a/pkg/piecestore/rpc/server/psdb/psdb.go b/pkg/piecestore/rpc/server/psdb/psdb.go index 0bed20640..86d5e934a 100644 --- a/pkg/piecestore/rpc/server/psdb/psdb.go +++ b/pkg/piecestore/rpc/server/psdb/psdb.go @@ -37,7 +37,7 @@ type DB struct { } // Open opens DB at DBPath -func Open(ctx context.Context, DataPath, DBPath string) (db *DB, err error) { +func Open(ctx context.Context, dataPath, DBPath string) (db *DB, err error) { defer mon.Task()(&ctx)(&err) if err = os.MkdirAll(filepath.Dir(DBPath), 0700); err != nil { @@ -48,57 +48,82 @@ func Open(ctx context.Context, DataPath, DBPath string) (db *DB, err error) { if err != nil { return nil, err } - - // try to enable write-ahead-logging - _, _ = sqlite.Exec(`PRAGMA journal_mode = WAL`) - - defer func() { - if err != nil { - _ = sqlite.Close() - } - }() - - tx, err := sqlite.Begin() - if err != nil { - return nil, err + db = &DB{ + DB: sqlite, + dataPath: dataPath, + check: time.NewTicker(*defaultCheckInterval), } - defer func() { _ = tx.Rollback() }() - - _, err = tx.Exec("CREATE TABLE IF NOT EXISTS `ttl` (`id` BLOB UNIQUE, `created` INT(10), `expires` INT(10), `size` INT(10));") - if err != nil { - return nil, err + if err := db.init(); err != nil { + return nil, utils.CombineErrors(err, db.DB.Close()) } - _, err = tx.Exec("CREATE TABLE IF NOT EXISTS `bandwidth_agreements` (`agreement` BLOB, `signature` BLOB);") - if err != nil { - return nil, err - } + go db.garbageCollect(ctx) - _, err = tx.Exec("CREATE INDEX IF NOT EXISTS idx_ttl_expires ON ttl (expires);") - if err != nil { - return nil, err - } + return db, nil +} - _, err = tx.Exec("CREATE TABLE IF NOT EXISTS `bwusagetbl` (`size` INT(10), `daystartdate` INT(10), `dayenddate` INT(10));") - if err != nil { - return nil, err - } +// OpenInMemory opens sqlite DB inmemory +func OpenInMemory(ctx context.Context, dataPath string) (db *DB, err error) { + defer mon.Task()(&ctx)(&err) - err = tx.Commit() + sqlite, err := sql.Open("sqlite3", ":memory:") if err != nil { return nil, err } db = &DB{ DB: sqlite, - dataPath: DataPath, + dataPath: dataPath, check: time.NewTicker(*defaultCheckInterval), } + if err := db.init(); err != nil { + return nil, utils.CombineErrors(err, db.DB.Close()) + } + go db.garbageCollect(ctx) return db, nil } +func (db *DB) init() (err error) { + tx, err := db.DB.Begin() + if err != nil { + return err + } + + defer func() { _ = tx.Rollback() }() + + _, err = tx.Exec("CREATE TABLE IF NOT EXISTS `ttl` (`id` BLOB UNIQUE, `created` INT(10), `expires` INT(10), `size` INT(10));") + if err != nil { + return err + } + + _, err = tx.Exec("CREATE TABLE IF NOT EXISTS `bandwidth_agreements` (`agreement` BLOB, `signature` BLOB);") + if err != nil { + return err + } + + _, err = tx.Exec("CREATE INDEX IF NOT EXISTS idx_ttl_expires ON ttl (expires);") + if err != nil { + return err + } + + _, err = tx.Exec("CREATE TABLE IF NOT EXISTS `bwusagetbl` (`size` INT(10), `daystartdate` INT(10), `dayenddate` INT(10));") + if err != nil { + return err + } + + err = tx.Commit() + if err != nil { + return err + } + + // try to enable write-ahead-logging + _, _ = db.DB.Exec(`PRAGMA journal_mode = WAL`) + + return nil +} + // Close the database func (db *DB) Close() error { return db.DB.Close() diff --git a/pkg/piecestore/rpc/server/psdb/psdb_test.go b/pkg/piecestore/rpc/server/psdb/psdb_test.go index 321f8c247..8a1d12894 100644 --- a/pkg/piecestore/rpc/server/psdb/psdb_test.go +++ b/pkg/piecestore/rpc/server/psdb/psdb_test.go @@ -5,6 +5,7 @@ package psdb import ( "bytes" + "context" "io/ioutil" "os" "path/filepath" @@ -14,7 +15,6 @@ import ( "github.com/gogo/protobuf/proto" _ "github.com/mattn/go-sqlite3" - "golang.org/x/net/context" "storj.io/storj/pkg/pb" ) @@ -23,7 +23,7 @@ var ctx = context.Background() const concurrency = 10 -func openTest(t testing.TB) (*DB, func()) { +func newDB(t testing.TB) (*DB, func()) { tmpdir, err := ioutil.TempDir("", "storj-psdb") if err != nil { t.Fatal(err) @@ -48,8 +48,18 @@ func openTest(t testing.TB) (*DB, func()) { } } +func TestNewInmemory(t *testing.T) { + db, err := OpenInMemory(context.Background(), "") + if err != nil { + t.Fatal(err) + } + if err := db.Close(); err != nil { + t.Fatal(err) + } +} + func TestHappyPath(t *testing.T) { - db, cleanup := openTest(t) + db, cleanup := newDB(t) defer cleanup() type TTL struct { @@ -186,7 +196,7 @@ func TestHappyPath(t *testing.T) { } func TestBandwidthUsage(t *testing.T) { - db, cleanup := openTest(t) + db, cleanup := newDB(t) defer cleanup() type BWUSAGE struct { @@ -250,7 +260,7 @@ func TestBandwidthUsage(t *testing.T) { } func BenchmarkWriteBandwidthAllocation(b *testing.B) { - db, cleanup := openTest(b) + db, cleanup := newDB(b) defer cleanup() const WritesPerLoop = 10 diff --git a/pkg/piecestore/rpc/server/server.go b/pkg/piecestore/rpc/server/server.go index bd0fba9ac..698db4107 100644 --- a/pkg/piecestore/rpc/server/server.go +++ b/pkg/piecestore/rpc/server/server.go @@ -126,6 +126,7 @@ func Initialize(ctx context.Context, config Config, pkey crypto.PrivateKey) (*Se if err != nil { return nil, ServerError.Wrap(err) } + if usedBandwidth > allocatedBandwidth { zap.S().Warnf("Exceed the allowed Bandwidth setting") } else { @@ -137,7 +138,6 @@ func Initialize(ctx context.Context, config Config, pkey crypto.PrivateKey) (*Se if (totalUsed == 0x00) && (freeDiskSpace < allocatedDiskSpace) { allocatedDiskSpace = freeDiskSpace zap.S().Warnf("Disk space is less than requested allocated space, allocating = %d Bytes", allocatedDiskSpace) - return &Server{DataDir: dataDir, DB: db, pkey: pkey, totalAllocated: allocatedDiskSpace, totalBwAllocated: allocatedBandwidth}, nil } // on restarting the Piece node server, assuming already been working as a node @@ -145,7 +145,6 @@ func Initialize(ctx context.Context, config Config, pkey crypto.PrivateKey) (*Se // before restarting if totalUsed >= allocatedDiskSpace { zap.S().Warnf("Used more space then allocated, allocating = %d Bytes", allocatedDiskSpace) - return &Server{DataDir: dataDir, DB: db, pkey: pkey, totalAllocated: allocatedDiskSpace, totalBwAllocated: allocatedBandwidth}, nil } // the available diskspace is less than remaining allocated space, @@ -153,11 +152,28 @@ func Initialize(ctx context.Context, config Config, pkey crypto.PrivateKey) (*Se if freeDiskSpace < (allocatedDiskSpace - totalUsed) { allocatedDiskSpace = freeDiskSpace zap.S().Warnf("Disk space is less than requested allocated space, allocating = %d Bytes", allocatedDiskSpace) - return &Server{DataDir: dataDir, DB: db, pkey: pkey, totalAllocated: allocatedDiskSpace, totalBwAllocated: allocatedBandwidth}, nil } - signatureVerifier := auth.NewSignedMessageVerifier() - return &Server{DataDir: dataDir, DB: db, pkey: pkey, totalAllocated: allocatedDiskSpace, totalBwAllocated: allocatedBandwidth, verifier: signatureVerifier}, nil + return &Server{ + DataDir: dataDir, + DB: db, + pkey: pkey, + totalAllocated: allocatedDiskSpace, + totalBwAllocated: allocatedBandwidth, + verifier: auth.NewSignedMessageVerifier(), + }, nil +} + +// New creates a Server with custom db +func New(dataDir string, db *psdb.DB, config Config, pkey crypto.PrivateKey) *Server { + return &Server{ + DataDir: dataDir, + DB: db, + pkey: pkey, + totalAllocated: config.AllocatedDiskSpace, + totalBwAllocated: config.AllocatedBandwidth, + verifier: auth.NewSignedMessageVerifier(), + } } // Stop the piececstore node