Add inmemory psdb (#543)
This commit is contained in:
parent
0d03f2fbb5
commit
bd67288071
@ -37,7 +37,7 @@ type DB struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Open opens DB at DBPath
|
// Open opens DB at DBPath
|
||||||
func Open(ctx context.Context, DataPath, DBPath string) (db *DB, err error) {
|
func Open(ctx context.Context, dataPath, DBPath string) (db *DB, err error) {
|
||||||
defer mon.Task()(&ctx)(&err)
|
defer mon.Task()(&ctx)(&err)
|
||||||
|
|
||||||
if err = os.MkdirAll(filepath.Dir(DBPath), 0700); err != nil {
|
if err = os.MkdirAll(filepath.Dir(DBPath), 0700); err != nil {
|
||||||
@ -48,57 +48,82 @@ func Open(ctx context.Context, DataPath, DBPath string) (db *DB, err error) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
db = &DB{
|
||||||
// try to enable write-ahead-logging
|
DB: sqlite,
|
||||||
_, _ = sqlite.Exec(`PRAGMA journal_mode = WAL`)
|
dataPath: dataPath,
|
||||||
|
check: time.NewTicker(*defaultCheckInterval),
|
||||||
defer func() {
|
|
||||||
if err != nil {
|
|
||||||
_ = sqlite.Close()
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
tx, err := sqlite.Begin()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
}
|
||||||
defer func() { _ = tx.Rollback() }()
|
if err := db.init(); err != nil {
|
||||||
|
return nil, utils.CombineErrors(err, db.DB.Close())
|
||||||
_, err = tx.Exec("CREATE TABLE IF NOT EXISTS `ttl` (`id` BLOB UNIQUE, `created` INT(10), `expires` INT(10), `size` INT(10));")
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = tx.Exec("CREATE TABLE IF NOT EXISTS `bandwidth_agreements` (`agreement` BLOB, `signature` BLOB);")
|
go db.garbageCollect(ctx)
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
_, err = tx.Exec("CREATE INDEX IF NOT EXISTS idx_ttl_expires ON ttl (expires);")
|
return db, nil
|
||||||
if err != nil {
|
}
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
_, err = tx.Exec("CREATE TABLE IF NOT EXISTS `bwusagetbl` (`size` INT(10), `daystartdate` INT(10), `dayenddate` INT(10));")
|
// OpenInMemory opens sqlite DB inmemory
|
||||||
if err != nil {
|
func OpenInMemory(ctx context.Context, dataPath string) (db *DB, err error) {
|
||||||
return nil, err
|
defer mon.Task()(&ctx)(&err)
|
||||||
}
|
|
||||||
|
|
||||||
err = tx.Commit()
|
sqlite, err := sql.Open("sqlite3", ":memory:")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
db = &DB{
|
db = &DB{
|
||||||
DB: sqlite,
|
DB: sqlite,
|
||||||
dataPath: DataPath,
|
dataPath: dataPath,
|
||||||
check: time.NewTicker(*defaultCheckInterval),
|
check: time.NewTicker(*defaultCheckInterval),
|
||||||
}
|
}
|
||||||
|
if err := db.init(); err != nil {
|
||||||
|
return nil, utils.CombineErrors(err, db.DB.Close())
|
||||||
|
}
|
||||||
|
|
||||||
go db.garbageCollect(ctx)
|
go db.garbageCollect(ctx)
|
||||||
|
|
||||||
return db, nil
|
return db, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (db *DB) init() (err error) {
|
||||||
|
tx, err := db.DB.Begin()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
defer func() { _ = tx.Rollback() }()
|
||||||
|
|
||||||
|
_, err = tx.Exec("CREATE TABLE IF NOT EXISTS `ttl` (`id` BLOB UNIQUE, `created` INT(10), `expires` INT(10), `size` INT(10));")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = tx.Exec("CREATE TABLE IF NOT EXISTS `bandwidth_agreements` (`agreement` BLOB, `signature` BLOB);")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = tx.Exec("CREATE INDEX IF NOT EXISTS idx_ttl_expires ON ttl (expires);")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = tx.Exec("CREATE TABLE IF NOT EXISTS `bwusagetbl` (`size` INT(10), `daystartdate` INT(10), `dayenddate` INT(10));")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = tx.Commit()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// try to enable write-ahead-logging
|
||||||
|
_, _ = db.DB.Exec(`PRAGMA journal_mode = WAL`)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// Close the database
|
// Close the database
|
||||||
func (db *DB) Close() error {
|
func (db *DB) Close() error {
|
||||||
return db.DB.Close()
|
return db.DB.Close()
|
||||||
|
@ -5,6 +5,7 @@ package psdb
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"context"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
@ -14,7 +15,6 @@ import (
|
|||||||
|
|
||||||
"github.com/gogo/protobuf/proto"
|
"github.com/gogo/protobuf/proto"
|
||||||
_ "github.com/mattn/go-sqlite3"
|
_ "github.com/mattn/go-sqlite3"
|
||||||
"golang.org/x/net/context"
|
|
||||||
|
|
||||||
"storj.io/storj/pkg/pb"
|
"storj.io/storj/pkg/pb"
|
||||||
)
|
)
|
||||||
@ -23,7 +23,7 @@ var ctx = context.Background()
|
|||||||
|
|
||||||
const concurrency = 10
|
const concurrency = 10
|
||||||
|
|
||||||
func openTest(t testing.TB) (*DB, func()) {
|
func newDB(t testing.TB) (*DB, func()) {
|
||||||
tmpdir, err := ioutil.TempDir("", "storj-psdb")
|
tmpdir, err := ioutil.TempDir("", "storj-psdb")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
@ -48,8 +48,18 @@ func openTest(t testing.TB) (*DB, func()) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestNewInmemory(t *testing.T) {
|
||||||
|
db, err := OpenInMemory(context.Background(), "")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if err := db.Close(); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestHappyPath(t *testing.T) {
|
func TestHappyPath(t *testing.T) {
|
||||||
db, cleanup := openTest(t)
|
db, cleanup := newDB(t)
|
||||||
defer cleanup()
|
defer cleanup()
|
||||||
|
|
||||||
type TTL struct {
|
type TTL struct {
|
||||||
@ -186,7 +196,7 @@ func TestHappyPath(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestBandwidthUsage(t *testing.T) {
|
func TestBandwidthUsage(t *testing.T) {
|
||||||
db, cleanup := openTest(t)
|
db, cleanup := newDB(t)
|
||||||
defer cleanup()
|
defer cleanup()
|
||||||
|
|
||||||
type BWUSAGE struct {
|
type BWUSAGE struct {
|
||||||
@ -250,7 +260,7 @@ func TestBandwidthUsage(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func BenchmarkWriteBandwidthAllocation(b *testing.B) {
|
func BenchmarkWriteBandwidthAllocation(b *testing.B) {
|
||||||
db, cleanup := openTest(b)
|
db, cleanup := newDB(b)
|
||||||
defer cleanup()
|
defer cleanup()
|
||||||
|
|
||||||
const WritesPerLoop = 10
|
const WritesPerLoop = 10
|
||||||
|
@ -126,6 +126,7 @@ func Initialize(ctx context.Context, config Config, pkey crypto.PrivateKey) (*Se
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, ServerError.Wrap(err)
|
return nil, ServerError.Wrap(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if usedBandwidth > allocatedBandwidth {
|
if usedBandwidth > allocatedBandwidth {
|
||||||
zap.S().Warnf("Exceed the allowed Bandwidth setting")
|
zap.S().Warnf("Exceed the allowed Bandwidth setting")
|
||||||
} else {
|
} else {
|
||||||
@ -137,7 +138,6 @@ func Initialize(ctx context.Context, config Config, pkey crypto.PrivateKey) (*Se
|
|||||||
if (totalUsed == 0x00) && (freeDiskSpace < allocatedDiskSpace) {
|
if (totalUsed == 0x00) && (freeDiskSpace < allocatedDiskSpace) {
|
||||||
allocatedDiskSpace = freeDiskSpace
|
allocatedDiskSpace = freeDiskSpace
|
||||||
zap.S().Warnf("Disk space is less than requested allocated space, allocating = %d Bytes", allocatedDiskSpace)
|
zap.S().Warnf("Disk space is less than requested allocated space, allocating = %d Bytes", allocatedDiskSpace)
|
||||||
return &Server{DataDir: dataDir, DB: db, pkey: pkey, totalAllocated: allocatedDiskSpace, totalBwAllocated: allocatedBandwidth}, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// on restarting the Piece node server, assuming already been working as a node
|
// on restarting the Piece node server, assuming already been working as a node
|
||||||
@ -145,7 +145,6 @@ func Initialize(ctx context.Context, config Config, pkey crypto.PrivateKey) (*Se
|
|||||||
// before restarting
|
// before restarting
|
||||||
if totalUsed >= allocatedDiskSpace {
|
if totalUsed >= allocatedDiskSpace {
|
||||||
zap.S().Warnf("Used more space then allocated, allocating = %d Bytes", allocatedDiskSpace)
|
zap.S().Warnf("Used more space then allocated, allocating = %d Bytes", allocatedDiskSpace)
|
||||||
return &Server{DataDir: dataDir, DB: db, pkey: pkey, totalAllocated: allocatedDiskSpace, totalBwAllocated: allocatedBandwidth}, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// the available diskspace is less than remaining allocated space,
|
// the available diskspace is less than remaining allocated space,
|
||||||
@ -153,11 +152,28 @@ func Initialize(ctx context.Context, config Config, pkey crypto.PrivateKey) (*Se
|
|||||||
if freeDiskSpace < (allocatedDiskSpace - totalUsed) {
|
if freeDiskSpace < (allocatedDiskSpace - totalUsed) {
|
||||||
allocatedDiskSpace = freeDiskSpace
|
allocatedDiskSpace = freeDiskSpace
|
||||||
zap.S().Warnf("Disk space is less than requested allocated space, allocating = %d Bytes", allocatedDiskSpace)
|
zap.S().Warnf("Disk space is less than requested allocated space, allocating = %d Bytes", allocatedDiskSpace)
|
||||||
return &Server{DataDir: dataDir, DB: db, pkey: pkey, totalAllocated: allocatedDiskSpace, totalBwAllocated: allocatedBandwidth}, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
signatureVerifier := auth.NewSignedMessageVerifier()
|
return &Server{
|
||||||
return &Server{DataDir: dataDir, DB: db, pkey: pkey, totalAllocated: allocatedDiskSpace, totalBwAllocated: allocatedBandwidth, verifier: signatureVerifier}, nil
|
DataDir: dataDir,
|
||||||
|
DB: db,
|
||||||
|
pkey: pkey,
|
||||||
|
totalAllocated: allocatedDiskSpace,
|
||||||
|
totalBwAllocated: allocatedBandwidth,
|
||||||
|
verifier: auth.NewSignedMessageVerifier(),
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// New creates a Server with custom db
|
||||||
|
func New(dataDir string, db *psdb.DB, config Config, pkey crypto.PrivateKey) *Server {
|
||||||
|
return &Server{
|
||||||
|
DataDir: dataDir,
|
||||||
|
DB: db,
|
||||||
|
pkey: pkey,
|
||||||
|
totalAllocated: config.AllocatedDiskSpace,
|
||||||
|
totalBwAllocated: config.AllocatedBandwidth,
|
||||||
|
verifier: auth.NewSignedMessageVerifier(),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stop the piececstore node
|
// Stop the piececstore node
|
||||||
|
Loading…
Reference in New Issue
Block a user