storagenode/storagenodedb: refactor both data access objects and migrations to support multiple DB connections (#3057)

* Split the info.db database into multiple DBs using Backup API.

* Remove location. The previous refactor assumed we would need this, but we don't.

* Added VACUUM to reclaim space after splitting storage node databases.

* Added unique names to SQLite3 connection hooks to fix testplanet.

* Moving DB closing to the migration step.

* Removing the closing of the versions DB. It's already getting closed.

* Swapping the database connection references on reconnect.

* Moved sqlite closing logic away from the boltdb closing logic.

* Moved sqlite closing logic away from the boltdb closing logic.

* Remove certificate and vouchers from DB split migration.

* Removed vouchers and bumped up the migration version.

* Use same constructor in tests for storage node databases.

* Use same constructor in tests for storage node databases.

* Adding method to access underlying SQL database connections and cleanup

* Adding logging for migration diagnostics.

* Moved migration closing database logic to minimize disk usage.

* Cleaning up error handling.

* Fix missing copyright.

* Fix linting error.

* Add test for migration 21 (#3012)

* Refactoring migration code into a nicer to use object.

* Refactoring migration code into a nicer to use object.

* Fixing broken migration test.

* Removed unnecessary code that is no longer needed now that we close DBs.

* Removed unnecessary code that is no longer needed now that we close DBs.

* Fixed bug where an invalid database path was being opened.

* Fixed linting errors.

* Renamed VersionsDB to LegacyInfoDB and refactored DB lookup keys.

* Renamed VersionsDB to LegacyInfoDB and refactored DB lookup keys.

* Fix migration test. NOTE: This change does not address new tables satellites and satellite_exit_progress

* Removing v22 migration to move into it's own PR.

* Removing v22 migration to move into it's own PR.

* Refactored schema, rebind and configure functions to be re-useable.

* Renamed LegacyInfoDB to DeprecatedInfoDB.

* Cleaned up closeDatabase function.

* Renamed storageNodeSQLDB to migratableDB.

* Switched from using errs.Combine() to errs.Group in closeDatabases func.

* Removed constructors from storage node data access objects.

* Reformatted usage of const.

* Fixed broken test snapshots.

* Fixed linting error.
This commit is contained in:
Simon Guindon 2019-09-18 12:17:28 -04:00 committed by GitHub
parent 186e67e056
commit a2b1e9fa95
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
34 changed files with 283 additions and 265 deletions

View File

@ -21,22 +21,16 @@ import (
// ErrBandwidth represents errors from the bandwidthdb database.
var ErrBandwidth = errs.Class("bandwidthdb error")
// BandwidthDBName represents the database name.
const BandwidthDBName = "bandwidth"
type bandwidthDB struct {
// Moved to top of struct to resolve alignment issue with atomic operations on ARM
usedSpace int64
usedMu sync.RWMutex
usedSince time.Time
location string
SQLDB
}
// newBandwidthDB returns a new instance of usedSerials initialized with the specified database.
func newBandwidthDB(db SQLDB, location string) *bandwidthDB {
return &bandwidthDB{
location: location,
SQLDB: db,
}
migratableDB
}
// Add adds bandwidth usage to the table

View File

@ -8,12 +8,11 @@ import (
"database/sql"
"database/sql/driver"
"fmt"
"math/rand"
"os"
"path/filepath"
"time"
_ "github.com/mattn/go-sqlite3" // used indirectly
_ "github.com/mattn/go-sqlite3" // used indirectly.
"github.com/zeebo/errs"
"go.uber.org/zap"
monkit "gopkg.in/spacemonkeygo/monkit.v2"
@ -24,7 +23,6 @@ import (
"storj.io/storj/storage"
"storj.io/storj/storage/boltdb"
"storj.io/storj/storage/filestore"
"storj.io/storj/storage/teststore"
"storj.io/storj/storagenode"
"storj.io/storj/storagenode/bandwidth"
"storj.io/storj/storagenode/orders"
@ -88,18 +86,22 @@ type DB struct {
Close() error
}
versionsDB *versionsDB
dbDirectory string
deprecatedInfoDB *deprecatedInfoDB
v0PieceInfoDB *v0PieceInfoDB
bandwidthDB *bandwidthDB
ordersDB *ordersDB
pieceExpirationDB *pieceExpirationDB
pieceSpaceUsedDB *pieceSpaceUsedDB
reputationDB *reputationDB
storageUsageDB *storageusageDB
storageUsageDB *storageUsageDB
usedSerialsDB *usedSerialsDB
satellitesDB *satellitesDB
kdb, ndb, adb storage.KeyValueStore
sqlDatabases map[string]*sql.DB
}
// New creates a new master database for storage node
@ -115,12 +117,6 @@ func New(log *zap.Logger, config Config) (*DB, error) {
return nil, err
}
versionsPath := config.Info2
versionsDB, err := openDatabase(versionsPath)
if err != nil {
return nil, err
}
db := &DB{
log: log,
pieces: pieces,
@ -128,95 +124,76 @@ func New(log *zap.Logger, config Config) (*DB, error) {
ndb: dbs[1],
adb: dbs[2],
// Initialize databases. Currently shares one info.db database file but
// in the future these will initialize their own database connections.
versionsDB: newVersionsDB(versionsDB, versionsPath),
v0PieceInfoDB: newV0PieceInfoDB(versionsDB, versionsPath),
bandwidthDB: newBandwidthDB(versionsDB, versionsPath),
ordersDB: newOrdersDB(versionsDB, versionsPath),
pieceExpirationDB: newPieceExpirationDB(versionsDB, versionsPath),
pieceSpaceUsedDB: newPieceSpaceUsedDB(versionsDB, versionsPath),
reputationDB: newReputationDB(versionsDB, versionsPath),
storageUsageDB: newStorageusageDB(versionsDB, versionsPath),
usedSerialsDB: newUsedSerialsDB(versionsDB, versionsPath),
satellitesDB: newSatellitesDB(versionsDB, versionsPath),
dbDirectory: filepath.Dir(config.Info2),
sqlDatabases: make(map[string]*sql.DB),
deprecatedInfoDB: &deprecatedInfoDB{},
v0PieceInfoDB: &v0PieceInfoDB{},
bandwidthDB: &bandwidthDB{},
ordersDB: &ordersDB{},
pieceExpirationDB: &pieceExpirationDB{},
pieceSpaceUsedDB: &pieceSpaceUsedDB{},
reputationDB: &reputationDB{},
storageUsageDB: &storageUsageDB{},
usedSerialsDB: &usedSerialsDB{},
satellitesDB: &satellitesDB{},
}
err = db.openDatabases()
if err != nil {
return nil, err
}
return db, nil
}
// NewTest creates new test database for storage node.
func NewTest(log *zap.Logger, storageDir string) (*DB, error) {
piecesDir, err := filestore.NewDir(storageDir)
// openDatabases opens all the SQLite3 storage node databases and returns if any fails to open successfully.
func (db *DB) openDatabases() error {
// These objects have a Configure method to allow setting the underlining SQLDB connection
// that each uses internally to do data access to the SQLite3 databases.
// The reason it was done this way was because there's some outside consumers that are
// taking a reference to the business object.
deprecatedInfoDB, err := db.openDatabase(DeprecatedInfoDBName)
if err != nil {
return nil, err
return errs.Combine(err, db.closeDatabases())
}
pieces := filestore.New(log, piecesDir)
versionsPath := ""
versionsDB, err := openTestDatabase()
if err != nil {
return nil, err
}
db := &DB{
log: log,
pieces: pieces,
kdb: teststore.New(),
ndb: teststore.New(),
adb: teststore.New(),
// Initialize databases. Currently shares one info.db database file but
// in the future these will initialize their own database connections.
versionsDB: newVersionsDB(versionsDB, versionsPath),
v0PieceInfoDB: newV0PieceInfoDB(versionsDB, versionsPath),
bandwidthDB: newBandwidthDB(versionsDB, versionsPath),
ordersDB: newOrdersDB(versionsDB, versionsPath),
pieceExpirationDB: newPieceExpirationDB(versionsDB, versionsPath),
pieceSpaceUsedDB: newPieceSpaceUsedDB(versionsDB, versionsPath),
reputationDB: newReputationDB(versionsDB, versionsPath),
storageUsageDB: newStorageusageDB(versionsDB, versionsPath),
usedSerialsDB: newUsedSerialsDB(versionsDB, versionsPath),
satellitesDB: newSatellitesDB(versionsDB, versionsPath),
}
return db, nil
db.deprecatedInfoDB.Configure(deprecatedInfoDB)
db.bandwidthDB.Configure(deprecatedInfoDB)
db.ordersDB.Configure(deprecatedInfoDB)
db.pieceExpirationDB.Configure(deprecatedInfoDB)
db.v0PieceInfoDB.Configure(deprecatedInfoDB)
db.pieceSpaceUsedDB.Configure(deprecatedInfoDB)
db.reputationDB.Configure(deprecatedInfoDB)
db.storageUsageDB.Configure(deprecatedInfoDB)
db.usedSerialsDB.Configure(deprecatedInfoDB)
db.satellitesDB.Configure(deprecatedInfoDB)
return nil
}
// openDatabase opens or creates a database at the specified path.
func openDatabase(path string) (*sql.DB, error) {
func (db *DB) openDatabase(dbName string) (*sql.DB, error) {
path := filepath.Join(db.dbDirectory, db.FilenameFromDBName(dbName))
if err := os.MkdirAll(filepath.Dir(path), 0700); err != nil {
return nil, err
}
db, err := sql.Open("sqlite3", "file:"+path+"?_journal=WAL&_busy_timeout=10000")
sqlDB, err := sql.Open("sqlite3", "file:"+path+"?_journal=WAL&_busy_timeout=10000")
if err != nil {
return nil, ErrDatabase.Wrap(err)
}
dbutil.Configure(db, mon)
return db, nil
// This isn't safe for concurrent access but we don't currently access this map concurrently.
// If we do in the future it needs some protection.
db.sqlDatabases[dbName] = sqlDB
dbutil.Configure(sqlDB, mon)
db.log.Debug(fmt.Sprintf("opened database %s", dbName))
return sqlDB, nil
}
// // openTestDatabase creates an in memory database.
func openTestDatabase() (*sql.DB, error) {
// create memory DB with a shared cache and a unique name to avoid collisions
db, err := sql.Open("sqlite3", fmt.Sprintf("file:memdb%d?mode=memory&cache=shared", rand.Int63()))
if err != nil {
return nil, ErrDatabase.Wrap(err)
}
// Set max idle and max open to 1 to control concurrent access to the memory DB
// Setting max open higher than 1 results in table locked errors
db.SetMaxIdleConns(1)
db.SetMaxOpenConns(1)
db.SetConnMaxLifetime(-1)
mon.Chain("db_stats", monkit.StatSourceFunc(
func(cb func(name string, val float64)) {
monkit.StatSourceFromStruct(db.Stats()).Stats(cb)
}))
return db, nil
// FilenameFromDBName returns a constructed filename for the specified database name.
func (db *DB) FilenameFromDBName(dbName string) string {
return dbName + ".db"
}
// CreateTables creates any necessary tables.
@ -232,22 +209,33 @@ func (db *DB) Close() error {
db.ndb.Close(),
db.adb.Close(),
db.versionsDB.Close(),
db.v0PieceInfoDB.Close(),
db.bandwidthDB.Close(),
db.ordersDB.Close(),
db.pieceExpirationDB.Close(),
db.pieceSpaceUsedDB.Close(),
db.reputationDB.Close(),
db.storageUsageDB.Close(),
db.usedSerialsDB.Close(),
db.satellitesDB.Close(),
db.closeDatabases(),
)
}
// Versions returns the instance of the versions database.
func (db *DB) Versions() SQLDB {
return db.versionsDB
// closeDatabases closes all the SQLite database connections and removes them from the associated maps.
func (db *DB) closeDatabases() error {
var errlist errs.Group
for k := range db.sqlDatabases {
errlist.Add(db.closeDatabase(k))
}
return errlist.Err()
}
// closeDatabase closes the specified SQLite database connections and removes them from the associated maps.
func (db *DB) closeDatabase(dbName string) (err error) {
conn, ok := db.sqlDatabases[dbName]
if !ok {
return ErrDatabase.New("double close on database %s", dbName)
}
delete(db.sqlDatabases, dbName)
return ErrDatabase.Wrap(conn.Close())
}
// DeprecatedInfoDB returns the instance of the versions database.
func (db *DB) DeprecatedInfoDB() SQLDB {
return db.deprecatedInfoDB
}
// V0PieceInfo returns the instance of the V0PieceInfoDB database.
@ -303,7 +291,16 @@ func (db *DB) RoutingTable() (kdb, ndb, adb storage.KeyValueStore) {
// RawDatabases are required for testing purposes
func (db *DB) RawDatabases() map[string]SQLDB {
return map[string]SQLDB{
"versions": db.versionsDB,
BandwidthDBName: db.bandwidthDB,
OrdersDBName: db.ordersDB,
PieceExpirationDBName: db.pieceExpirationDB,
PieceSpaceUsedDBName: db.pieceSpaceUsedDB,
ReputationDBName: db.reputationDB,
StorageUsageDBName: db.storageUsageDB,
UsedSerialsDBName: db.usedSerialsDB,
PieceInfoDBName: db.v0PieceInfoDB,
DeprecatedInfoDBName: db.deprecatedInfoDB,
SatellitesDBName: db.satellitesDB,
}
}
@ -313,7 +310,7 @@ func (db *DB) Migration() *migrate.Migration {
Table: "versions",
Steps: []*migrate.Step{
{
DB: db.versionsDB,
DB: db.deprecatedInfoDB,
Description: "Initial setup",
Version: 0,
Action: migrate.SQL{
@ -395,7 +392,7 @@ func (db *DB) Migration() *migrate.Migration {
},
},
{
DB: db.versionsDB,
DB: db.deprecatedInfoDB,
Description: "Network Wipe #2",
Version: 1,
Action: migrate.SQL{
@ -403,7 +400,7 @@ func (db *DB) Migration() *migrate.Migration {
},
},
{
DB: db.versionsDB,
DB: db.deprecatedInfoDB,
Description: "Add tracking of deletion failures.",
Version: 2,
Action: migrate.SQL{
@ -411,7 +408,7 @@ func (db *DB) Migration() *migrate.Migration {
},
},
{
DB: db.versionsDB,
DB: db.deprecatedInfoDB,
Description: "Add vouchersDB for storing and retrieving vouchers.",
Version: 3,
Action: migrate.SQL{
@ -423,7 +420,7 @@ func (db *DB) Migration() *migrate.Migration {
},
},
{
DB: db.versionsDB,
DB: db.deprecatedInfoDB,
Description: "Add index on pieceinfo expireation",
Version: 4,
Action: migrate.SQL{
@ -432,7 +429,7 @@ func (db *DB) Migration() *migrate.Migration {
},
},
{
DB: db.versionsDB,
DB: db.deprecatedInfoDB,
Description: "Partial Network Wipe - Tardigrade Satellites",
Version: 5,
Action: migrate.SQL{
@ -444,7 +441,7 @@ func (db *DB) Migration() *migrate.Migration {
},
},
{
DB: db.versionsDB,
DB: db.deprecatedInfoDB,
Description: "Add creation date.",
Version: 6,
Action: migrate.SQL{
@ -452,7 +449,7 @@ func (db *DB) Migration() *migrate.Migration {
},
},
{
DB: db.versionsDB,
DB: db.deprecatedInfoDB,
Description: "Drop certificate table.",
Version: 7,
Action: migrate.SQL{
@ -461,7 +458,7 @@ func (db *DB) Migration() *migrate.Migration {
},
},
{
DB: db.versionsDB,
DB: db.deprecatedInfoDB,
Description: "Drop old used serials and remove pieceinfo_deletion_failed index.",
Version: 8,
Action: migrate.SQL{
@ -470,7 +467,7 @@ func (db *DB) Migration() *migrate.Migration {
},
},
{
DB: db.versionsDB,
DB: db.deprecatedInfoDB,
Description: "Add order limit table.",
Version: 9,
Action: migrate.SQL{
@ -478,7 +475,7 @@ func (db *DB) Migration() *migrate.Migration {
},
},
{
DB: db.versionsDB,
DB: db.deprecatedInfoDB,
Description: "Optimize index usage.",
Version: 10,
Action: migrate.SQL{
@ -489,7 +486,7 @@ func (db *DB) Migration() *migrate.Migration {
},
},
{
DB: db.versionsDB,
DB: db.deprecatedInfoDB,
Description: "Create bandwidth_usage_rollup table.",
Version: 11,
Action: migrate.SQL{
@ -503,7 +500,7 @@ func (db *DB) Migration() *migrate.Migration {
},
},
{
DB: db.versionsDB,
DB: db.deprecatedInfoDB,
Description: "Clear Tables from Alpha data",
Version: 12,
Action: migrate.SQL{
@ -551,28 +548,23 @@ func (db *DB) Migration() *migrate.Migration {
},
},
{
DB: db.versionsDB,
DB: db.deprecatedInfoDB,
Description: "Free Storagenodes from trash data",
Version: 13,
Action: migrate.Func(func(log *zap.Logger, mgdb migrate.DB, tx *sql.Tx) error {
// When using inmemory DB, skip deletion process
if db.versionsDB.location == "" {
return nil
}
err := os.RemoveAll(filepath.Join(filepath.Dir(db.versionsDB.location), "blob/ukfu6bhbboxilvt7jrwlqk7y2tapb5d2r2tsmj2sjxvw5qaaaaaa")) // us-central1
err := os.RemoveAll(filepath.Join(db.dbDirectory, "blob/ukfu6bhbboxilvt7jrwlqk7y2tapb5d2r2tsmj2sjxvw5qaaaaaa")) // us-central1
if err != nil {
log.Sugar().Debug(err)
}
err = os.RemoveAll(filepath.Join(filepath.Dir(db.versionsDB.location), "blob/v4weeab67sbgvnbwd5z7tweqsqqun7qox2agpbxy44mqqaaaaaaa")) // europe-west1
err = os.RemoveAll(filepath.Join(db.dbDirectory, "blob/v4weeab67sbgvnbwd5z7tweqsqqun7qox2agpbxy44mqqaaaaaaa")) // europe-west1
if err != nil {
log.Sugar().Debug(err)
}
err = os.RemoveAll(filepath.Join(filepath.Dir(db.versionsDB.location), "blob/qstuylguhrn2ozjv4h2c6xpxykd622gtgurhql2k7k75wqaaaaaa")) // asia-east1
err = os.RemoveAll(filepath.Join(db.dbDirectory, "blob/qstuylguhrn2ozjv4h2c6xpxykd622gtgurhql2k7k75wqaaaaaa")) // asia-east1
if err != nil {
log.Sugar().Debug(err)
}
err = os.RemoveAll(filepath.Join(filepath.Dir(db.versionsDB.location), "blob/abforhuxbzyd35blusvrifvdwmfx4hmocsva4vmpp3rgqaaaaaaa")) // "tothemoon (stefan)"
err = os.RemoveAll(filepath.Join(db.dbDirectory, "blob/abforhuxbzyd35blusvrifvdwmfx4hmocsva4vmpp3rgqaaaaaaa")) // "tothemoon (stefan)"
if err != nil {
log.Sugar().Debug(err)
}
@ -581,16 +573,11 @@ func (db *DB) Migration() *migrate.Migration {
}),
},
{
DB: db.versionsDB,
DB: db.deprecatedInfoDB,
Description: "Free Storagenodes from orphaned tmp data",
Version: 14,
Action: migrate.Func(func(log *zap.Logger, mgdb migrate.DB, tx *sql.Tx) error {
// When using inmemory DB, skip deletion process
if db.versionsDB.location == "" {
return nil
}
err := os.RemoveAll(filepath.Join(filepath.Dir(db.versionsDB.location), "tmp"))
err := os.RemoveAll(filepath.Join(db.dbDirectory, "tmp"))
if err != nil {
log.Sugar().Debug(err)
}
@ -599,7 +586,7 @@ func (db *DB) Migration() *migrate.Migration {
}),
},
{
DB: db.versionsDB,
DB: db.deprecatedInfoDB,
Description: "Start piece_expirations table, deprecate pieceinfo table",
Version: 15,
Action: migrate.SQL{
@ -616,7 +603,7 @@ func (db *DB) Migration() *migrate.Migration {
},
},
{
DB: db.versionsDB,
DB: db.deprecatedInfoDB,
Description: "Add reputation and storage usage cache tables",
Version: 16,
Action: migrate.SQL{
@ -644,7 +631,7 @@ func (db *DB) Migration() *migrate.Migration {
},
},
{
DB: db.versionsDB,
DB: db.deprecatedInfoDB,
Description: "Create piece_space_used table",
Version: 17,
Action: migrate.SQL{
@ -658,7 +645,7 @@ func (db *DB) Migration() *migrate.Migration {
},
},
{
DB: db.versionsDB,
DB: db.deprecatedInfoDB,
Description: "Drop vouchers table",
Version: 18,
Action: migrate.SQL{
@ -666,7 +653,7 @@ func (db *DB) Migration() *migrate.Migration {
},
},
{
DB: db.versionsDB,
DB: db.deprecatedInfoDB,
Description: "Add disqualified field to reputation",
Version: 19,
Action: migrate.SQL{
@ -690,7 +677,7 @@ func (db *DB) Migration() *migrate.Migration {
},
},
{
DB: db.versionsDB,
DB: db.deprecatedInfoDB,
Description: "Empty storage_usage table, rename storage_usage.timestamp to interval_start",
Version: 20,
Action: migrate.SQL{
@ -704,7 +691,7 @@ func (db *DB) Migration() *migrate.Migration {
},
},
{
DB: db.versionsDB,
DB: db.deprecatedInfoDB,
Description: "Create satellites table and satellites_exit_progress table",
Version: 21,
Action: migrate.SQL{

View File

@ -0,0 +1,12 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package storagenodedb
// DeprecatedInfoDBName represents the database name.
const DeprecatedInfoDBName = "info"
// deprecatedInfoDB represents the database that contains the original legacy sqlite3 database.
type deprecatedInfoDB struct {
migratableDB
}

View File

@ -3,25 +3,25 @@
package storagenodedb
// versions represents the database that contains the database schema version history.
type versionsDB struct {
location string
type migratableDB struct {
SQLDB
}
func newVersionsDB(db SQLDB, location string) *versionsDB {
return &versionsDB{
location: location,
SQLDB: db,
}
// Schema returns schema
// These are implemented because the migrate.DB interface requires them.
// Maybe in the future we should untangle those.
func (db *migratableDB) Schema() string {
return ""
}
// Rebind rebind parameters
// These are implemented because the migrate.DB interface requires them.
// Maybe in the future we should untangle those.
func (db *versionsDB) Rebind(s string) string { return s }
func (db *migratableDB) Rebind(s string) string {
return s
}
// Schema returns schema
// These are implemented because the migrate.DB interface requires them.
// Maybe in the future we should untangle those.
func (db *versionsDB) Schema() string { return "" }
// Configure sets the underlining SQLDB connection.
func (db *migratableDB) Configure(sqlDB SQLDB) {
db.SQLDB = sqlDB
}

View File

@ -5,6 +5,7 @@ package storagenodedb_test
import (
"fmt"
"path/filepath"
"testing"
"github.com/stretchr/testify/require"
@ -77,10 +78,13 @@ func TestMigrate(t *testing.T) {
log := zaptest.NewLogger(t)
storageDir := ctx.Dir("storage")
cfg := storagenodedb.Config{
Pieces: ctx.Dir("storage"),
Info2: ctx.Dir("storage") + "/info.db",
Kademlia: ctx.Dir("storage") + "/kademlia",
Pieces: storageDir,
Storage: storageDir,
Info: filepath.Join(storageDir, "piecestore.db"),
Info2: filepath.Join(storageDir, "info.db"),
Kademlia: filepath.Join(storageDir, "kademlia"),
}
// create a new satellitedb connection
@ -121,6 +125,17 @@ func TestMigrate(t *testing.T) {
// verify schema and data for each db in the expected snapshot
for dbName, dbSnapshot := range multiDBSnapshot.DBSnapshots {
// If the tables and indexes of the schema are empty, that's
// semantically the same as nil. Set to nil explicitly to help with
// comparison to snapshot.
schema, ok := schemas[dbName]
if ok && len(schema.Tables) == 0 {
schema.Tables = nil
}
if ok && len(schema.Indexes) == 0 {
schema.Indexes = nil
}
require.Equal(t, dbSnapshot.Schema, schemas[dbName], tag)
require.Equal(t, dbSnapshot.Data, data[dbName], tag)
}

View File

@ -19,17 +19,11 @@ import (
// ErrOrders represents errors from the ordersdb database.
var ErrOrders = errs.Class("ordersdb error")
type ordersDB struct {
location string
SQLDB
}
// OrdersDBName represents the database name.
const OrdersDBName = "orders"
// newOrdersDB returns a new instance of ordersdb initialized with the specified database.
func newOrdersDB(db SQLDB, location string) *ordersDB {
return &ordersDB{
location: location,
SQLDB: db,
}
type ordersDB struct {
migratableDB
}
// Enqueue inserts order to the unsent list

View File

@ -16,17 +16,11 @@ import (
// ErrPieceExpiration represents errors from the piece expiration database.
var ErrPieceExpiration = errs.Class("piece expiration error")
type pieceExpirationDB struct {
location string
SQLDB
}
// PieceExpirationDBName represents the database filename.
const PieceExpirationDBName = "piece_expiration"
// newPieceExpirationDB returns a new instance of pieceExpirationDB initialized with the specified database.
func newPieceExpirationDB(db SQLDB, location string) *pieceExpirationDB {
return &pieceExpirationDB{
location: location,
SQLDB: db,
}
type pieceExpirationDB struct {
migratableDB
}
// GetExpired gets piece IDs that expire or have expired before the given time

View File

@ -21,17 +21,11 @@ import (
// ErrPieceInfo represents errors from the piece info database.
var ErrPieceInfo = errs.Class("v0pieceinfodb error")
type v0PieceInfoDB struct {
location string
SQLDB
}
// PieceInfoDBName represents the database name.
const PieceInfoDBName = "pieceinfo"
// newV0PieceInfoDB returns a new instance of pieceinfo initialized with the specified database.
func newV0PieceInfoDB(db SQLDB, location string) *v0PieceInfoDB {
return &v0PieceInfoDB{
location: location,
SQLDB: db,
}
type v0PieceInfoDB struct {
migratableDB
}
// Add inserts piece information into the database.

View File

@ -15,17 +15,11 @@ import (
// ErrPieceSpaceUsed represents errors from the piece spaced used database.
var ErrPieceSpaceUsed = errs.Class("piece space used error")
type pieceSpaceUsedDB struct {
location string
SQLDB
}
// PieceSpaceUsedDBName represents the database name.
const PieceSpaceUsedDBName = "piece_spaced_used"
// newPieceSpaceUsedDB returns a new instance of pieceSpaceUsedDB initialized with the specified database.
func newPieceSpaceUsedDB(db SQLDB, location string) *pieceSpaceUsedDB {
return &pieceSpaceUsedDB{
location: location,
SQLDB: db,
}
type pieceSpaceUsedDB struct {
migratableDB
}
// Init creates the one total record if it doesn't already exist

View File

@ -16,18 +16,12 @@ import (
// ErrReputation represents errors from the reputation database.
var ErrReputation = errs.Class("reputation error")
// ReputationDBName represents the database name.
const ReputationDBName = "reputation"
// reputation works with node reputation DB
type reputationDB struct {
location string
SQLDB
}
// newReputationDB returns a new instance of reputationDB initialized with the specified database.
func newReputationDB(db SQLDB, location string) *reputationDB {
return &reputationDB{
location: location,
SQLDB: db,
}
migratableDB
}
// Store inserts or updates reputation stats into the db.

View File

@ -10,16 +10,10 @@ import (
// ErrSatellitesDB represents errors from the satellites database.
var ErrSatellitesDB = errs.Class("satellitesdb error")
// SatellitesDBName represents the database name.
const SatellitesDBName = "satellites"
// reputation works with node reputation DB
type satellitesDB struct {
location string
SQLDB
}
// newSatellitesDB returns a new instance of satellitesDB initialized with the specified database.
func newSatellitesDB(db SQLDB, location string) *satellitesDB {
return &satellitesDB{
location: location,
SQLDB: db,
}
migratableDB
}

View File

@ -6,6 +6,7 @@ package storagenodedbtest
// This package should be referenced only in test files!
import (
"path/filepath"
"testing"
"go.uber.org/zap/zaptest"
@ -25,7 +26,16 @@ func Run(t *testing.T, test func(t *testing.T, db storagenode.DB)) {
log := zaptest.NewLogger(t)
db, err := storagenodedb.NewTest(log, ctx.Dir("storage"))
storageDir := ctx.Dir("storage")
cfg := storagenodedb.Config{
Storage: storageDir,
Info: filepath.Join(storageDir, "piecestore.db"),
Info2: filepath.Join(storageDir, "info.db"),
Pieces: storageDir,
Kademlia: filepath.Join(storageDir, "kad.db"),
}
db, err := storagenodedb.New(log, cfg)
if err != nil {
t.Fatal(err)
}

View File

@ -4,6 +4,7 @@
package storagenodedbtest_test
import (
"path/filepath"
"runtime"
"sync"
"testing"
@ -55,7 +56,16 @@ func TestInMemoryConcurrency(t *testing.T) {
log := zaptest.NewLogger(t)
db, err := storagenodedb.NewTest(log, ctx.Dir("storage"))
storageDir := ctx.Dir("storage")
cfg := storagenodedb.Config{
Pieces: storageDir,
Storage: storageDir,
Info: filepath.Join(storageDir, "piecestore.db"),
Info2: filepath.Join(storageDir, "info.db"),
Kademlia: filepath.Join(storageDir, "kademlia"),
}
db, err := storagenodedb.New(log, cfg)
if err != nil {
t.Fatal(err)
}

View File

@ -14,22 +14,16 @@ import (
"storj.io/storj/storagenode/storageusage"
)
// storageusageDB storage usage DB
type storageusageDB struct {
location string
SQLDB
}
// StorageUsageDBName represents the database name.
const StorageUsageDBName = "storage_usage"
// newStorageusageDB returns a new instance of storageusageDB initialized with the specified database.
func newStorageusageDB(db SQLDB, location string) *storageusageDB {
return &storageusageDB{
location: location,
SQLDB: db,
}
// storageUsageDB storage usage DB
type storageUsageDB struct {
migratableDB
}
// Store stores storage usage stamps to db replacing conflicting entries
func (db *storageusageDB) Store(ctx context.Context, stamps []storageusage.Stamp) (err error) {
func (db *storageUsageDB) Store(ctx context.Context, stamps []storageusage.Stamp) (err error) {
defer mon.Task()(&ctx)(&err)
if len(stamps) == 0 {
@ -54,7 +48,7 @@ func (db *storageusageDB) Store(ctx context.Context, stamps []storageusage.Stamp
// GetDaily returns daily storage usage stamps for particular satellite
// for provided time range
func (db *storageusageDB) GetDaily(ctx context.Context, satelliteID storj.NodeID, from, to time.Time) (_ []storageusage.Stamp, err error) {
func (db *storageUsageDB) GetDaily(ctx context.Context, satelliteID storj.NodeID, from, to time.Time) (_ []storageusage.Stamp, err error) {
defer mon.Task()(&ctx)(&err)
query := `SELECT satellite_id,
@ -98,7 +92,7 @@ func (db *storageusageDB) GetDaily(ctx context.Context, satelliteID storj.NodeID
// GetDailyTotal returns daily storage usage stamps summed across all known satellites
// for provided time range
func (db *storageusageDB) GetDailyTotal(ctx context.Context, from, to time.Time) (_ []storageusage.Stamp, err error) {
func (db *storageUsageDB) GetDailyTotal(ctx context.Context, from, to time.Time) (_ []storageusage.Stamp, err error) {
defer mon.Task()(&ctx)(&err)
query := `SELECT SUM(at_rest_total), interval_start
@ -136,7 +130,7 @@ func (db *storageusageDB) GetDailyTotal(ctx context.Context, from, to time.Time)
}
// Summary returns aggregated storage usage across all satellites.
func (db *storageusageDB) Summary(ctx context.Context, from, to time.Time) (_ float64, err error) {
func (db *storageUsageDB) Summary(ctx context.Context, from, to time.Time) (_ float64, err error) {
defer mon.Task()(&ctx, from, to)(&err)
var summary sql.NullFloat64
@ -149,7 +143,7 @@ func (db *storageusageDB) Summary(ctx context.Context, from, to time.Time) (_ fl
}
// SatelliteSummary returns aggregated storage usage for a particular satellite.
func (db *storageusageDB) SatelliteSummary(ctx context.Context, satelliteID storj.NodeID, from, to time.Time) (_ float64, err error) {
func (db *storageUsageDB) SatelliteSummary(ctx context.Context, satelliteID storj.NodeID, from, to time.Time) (_ float64, err error) {
defer mon.Task()(&ctx, satelliteID, from, to)(&err)
var summary sql.NullFloat64
@ -163,7 +157,7 @@ func (db *storageusageDB) SatelliteSummary(ctx context.Context, satelliteID stor
}
// withTx is a helper method which executes callback in transaction scope
func (db *storageusageDB) withTx(ctx context.Context, cb func(tx *sql.Tx) error) error {
func (db *storageUsageDB) withTx(ctx context.Context, cb func(tx *sql.Tx) error) error {
tx, err := db.Begin()
if err != nil {
return err

View File

@ -3,10 +3,12 @@
package testdata
import "storj.io/storj/storagenode/storagenodedb"
var v0 = MultiDBState{
Version: 0,
DBStates: DBStates{
"versions": &DBState{
storagenodedb.DeprecatedInfoDBName: &DBState{
SQL: `-- table for keeping serials that need to be verified against
CREATE TABLE used_serial (
satellite_id BLOB NOT NULL,

View File

@ -3,10 +3,12 @@
package testdata
import "storj.io/storj/storagenode/storagenodedb"
var v1 = MultiDBState{
Version: 1,
DBStates: DBStates{
"versions": &DBState{
storagenodedb.DeprecatedInfoDBName: &DBState{
SQL: `
-- table for keeping serials that need to be verified against
CREATE TABLE used_serial (

View File

@ -3,10 +3,12 @@
package testdata
import "storj.io/storj/storagenode/storagenodedb"
var v10 = MultiDBState{
Version: 10,
DBStates: DBStates{
"versions": &DBState{
storagenodedb.DeprecatedInfoDBName: &DBState{
SQL: `
-- table for keeping serials that need to be verified against
CREATE TABLE used_serial (

View File

@ -3,10 +3,12 @@
package testdata
import "storj.io/storj/storagenode/storagenodedb"
var v11 = MultiDBState{
Version: 11,
DBStates: DBStates{
"versions": &DBState{
storagenodedb.DeprecatedInfoDBName: &DBState{
SQL: `
-- table for keeping serials that need to be verified against
CREATE TABLE used_serial (

View File

@ -3,10 +3,12 @@
package testdata
import "storj.io/storj/storagenode/storagenodedb"
var v12 = MultiDBState{
Version: 12,
DBStates: DBStates{
"versions": &DBState{
storagenodedb.DeprecatedInfoDBName: &DBState{
SQL: `
-- table for keeping serials that need to be verified against
CREATE TABLE used_serial_ (

View File

@ -3,10 +3,12 @@
package testdata
import "storj.io/storj/storagenode/storagenodedb"
var v15 = MultiDBState{
Version: 15,
DBStates: DBStates{
"versions": &DBState{
storagenodedb.DeprecatedInfoDBName: &DBState{
SQL: `
-- table for keeping serials that need to be verified against
CREATE TABLE used_serial_ (

View File

@ -3,10 +3,12 @@
package testdata
import "storj.io/storj/storagenode/storagenodedb"
var v16 = MultiDBState{
Version: 16,
DBStates: DBStates{
"versions": &DBState{
storagenodedb.DeprecatedInfoDBName: &DBState{
SQL: `
-- table for keeping serials that need to be verified against
CREATE TABLE used_serial_ (

View File

@ -3,10 +3,12 @@
package testdata
import "storj.io/storj/storagenode/storagenodedb"
var v17 = MultiDBState{
Version: 17,
DBStates: DBStates{
"versions": &DBState{
storagenodedb.DeprecatedInfoDBName: &DBState{
SQL: `
-- table for keeping serials that need to be verified against
CREATE TABLE used_serial_ (

View File

@ -3,10 +3,12 @@
package testdata
import "storj.io/storj/storagenode/storagenodedb"
var v18 = MultiDBState{
Version: 18,
DBStates: DBStates{
"versions": &DBState{
storagenodedb.DeprecatedInfoDBName: &DBState{
SQL: `
-- table for keeping serials that need to be verified against
CREATE TABLE used_serial_ (

View File

@ -3,10 +3,12 @@
package testdata
import "storj.io/storj/storagenode/storagenodedb"
var v19 = MultiDBState{
Version: 19,
DBStates: DBStates{
"versions": &DBState{
storagenodedb.DeprecatedInfoDBName: &DBState{
SQL: `
-- table for keeping serials that need to be verified against
CREATE TABLE used_serial_ (

View File

@ -3,10 +3,12 @@
package testdata
import "storj.io/storj/storagenode/storagenodedb"
var v2 = MultiDBState{
Version: 2,
DBStates: DBStates{
"versions": &DBState{
storagenodedb.DeprecatedInfoDBName: &DBState{
SQL: `
-- table for keeping serials that need to be verified against
CREATE TABLE used_serial (

View File

@ -3,10 +3,12 @@
package testdata
import "storj.io/storj/storagenode/storagenodedb"
var v20 = MultiDBState{
Version: 20,
DBStates: DBStates{
"versions": &DBState{
storagenodedb.DeprecatedInfoDBName: &DBState{
SQL: `
-- table for keeping serials that need to be verified against
CREATE TABLE used_serial_ (

View File

@ -3,10 +3,12 @@
package testdata
import "storj.io/storj/storagenode/storagenodedb"
var v21 = MultiDBState{
Version: 21,
DBStates: DBStates{
"versions": &DBState{
storagenodedb.DeprecatedInfoDBName: &DBState{
SQL: `
-- table for keeping serials that need to be verified against
CREATE TABLE used_serial_ (

View File

@ -3,10 +3,12 @@
package testdata
import "storj.io/storj/storagenode/storagenodedb"
var v3 = MultiDBState{
Version: 3,
DBStates: DBStates{
"versions": &DBState{
storagenodedb.DeprecatedInfoDBName: &DBState{
SQL: `
-- table for keeping serials that need to be verified against
CREATE TABLE used_serial (

View File

@ -3,10 +3,12 @@
package testdata
import "storj.io/storj/storagenode/storagenodedb"
var v4 = MultiDBState{
Version: 4,
DBStates: DBStates{
"versions": &DBState{
storagenodedb.DeprecatedInfoDBName: &DBState{
SQL: `
-- table for keeping serials that need to be verified against
CREATE TABLE used_serial (

View File

@ -3,10 +3,12 @@
package testdata
import "storj.io/storj/storagenode/storagenodedb"
var v6 = MultiDBState{
Version: 6,
DBStates: DBStates{
"versions": &DBState{
storagenodedb.DeprecatedInfoDBName: &DBState{
SQL: `
-- table for keeping serials that need to be verified against
CREATE TABLE used_serial (

View File

@ -3,10 +3,12 @@
package testdata
import "storj.io/storj/storagenode/storagenodedb"
var v7 = MultiDBState{
Version: 7,
DBStates: DBStates{
"versions": &DBState{
storagenodedb.DeprecatedInfoDBName: &DBState{
SQL: `
-- table for keeping serials that need to be verified against
CREATE TABLE used_serial (

View File

@ -3,10 +3,12 @@
package testdata
import "storj.io/storj/storagenode/storagenodedb"
var v8 = MultiDBState{
Version: 8,
DBStates: DBStates{
"versions": &DBState{
storagenodedb.DeprecatedInfoDBName: &DBState{
SQL: `
-- table for keeping serials that need to be verified against
CREATE TABLE used_serial (

View File

@ -3,10 +3,12 @@
package testdata
import "storj.io/storj/storagenode/storagenodedb"
var v9 = MultiDBState{
Version: 9,
DBStates: DBStates{
"versions": &DBState{
storagenodedb.DeprecatedInfoDBName: &DBState{
SQL: `
-- table for keeping serials that need to be verified against
CREATE TABLE used_serial (

View File

@ -16,17 +16,11 @@ import (
// ErrUsedSerials represents errors from the used serials database.
var ErrUsedSerials = errs.Class("usedserialsdb error")
type usedSerialsDB struct {
location string
SQLDB
}
// UsedSerialsDBName represents the database name.
const UsedSerialsDBName = "used_serial"
// newUsedSerialsDB returns a new instance of usedSerials initialized with the specified database.
func newUsedSerialsDB(db SQLDB, location string) *usedSerialsDB {
return &usedSerialsDB{
location: location,
SQLDB: db,
}
type usedSerialsDB struct {
migratableDB
}
// Add adds a serial to the database.