satellite: support pointing db components at different databases

the immediate need is to be able to move the repair queue back out
of cockroach if we can't save it.

Change-Id: If26001a4e6804f6bb8713b4aee7e4fd6254dc326
This commit is contained in:
JT Olio 2020-11-28 09:23:39 -07:00 committed by Stefan Benten
parent 75f0f713a3
commit 0ba516d405
10 changed files with 267 additions and 56 deletions

54
private/dbutil/mapping.go Normal file
View File

@ -0,0 +1,54 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package dbutil
import (
"fmt"
"strings"
)
// EscapableCommaSplit is like strings.Split(x, ","), but if
// it sees two ','s in a row, it will treat them like one
// unsplit comma. So "hello,there,,friend" will result in
// ["hello", "there,friend"].
func EscapableCommaSplit(val string) []string {
bytes := []byte(val)
var vals []string
current := make([]byte, 0, len(bytes))
for i := 0; i < len(bytes); i++ {
char := bytes[i]
if char == ',' {
if i < len(bytes)-1 && bytes[i+1] == ',' {
current = append(current, ',')
i++
} else {
vals = append(vals, string(current))
current = nil
}
} else {
current = append(current, char)
}
}
vals = append(vals, string(current))
return vals
}
// ParseDBMapping parses a mapping of database connection URLs, preceded
// by the default URL. An example that overrides the repairqueue looks like:
// cockroach://user:pw@host/database,repairqueue:postgres://user:pw@host/database.
// The default is stored in "".
func ParseDBMapping(urlSpec string) (map[string]string, error) {
parts := EscapableCommaSplit(urlSpec)
rv := map[string]string{
"": parts[0],
}
for _, other := range parts[1:] {
override := strings.SplitN(other, ":", 2)
if len(override) != 2 || strings.HasPrefix(override[1], "/") {
return nil, fmt.Errorf("invalid db mapping spec: %q", urlSpec)
}
rv[override[0]] = override[1]
}
return rv, nil
}

View File

@ -0,0 +1,52 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package dbutil
import (
"fmt"
"testing"
"github.com/stretchr/testify/require"
)
func TestEscapableCommaSplit(t *testing.T) {
for _, testcase := range []struct {
input string
expected []string
}{
{"", []string{""}},
{",", []string{"", ""}},
{",hello", []string{"", "hello"}},
{"hello,", []string{"hello", ""}},
{"hello,there", []string{"hello", "there"}},
{"hello,,there", []string{"hello,there"}},
{",,hello", []string{",hello"}},
{"hello,,", []string{"hello,"}},
{"hello,,,there", []string{"hello,", "there"}},
{"hello,,,,there", []string{"hello,,there"}},
} {
require.Equal(t, testcase.expected, EscapableCommaSplit(testcase.input))
}
}
func TestParseDBMapping(t *testing.T) {
for _, testcase := range []struct {
input string
expected map[string]string
err error
}{
{"db://host", map[string]string{"": "db://host"}, nil},
{"db://host,override:db2://host2/db,,name",
map[string]string{"": "db://host", "override": "db2://host2/db,name"}, nil},
{"db://host,db2://host2", nil,
fmt.Errorf("invalid db mapping spec: %q", "db://host,db2://host2")},
} {
actual, err := ParseDBMapping(testcase.input)
if testcase.err != nil {
require.Equal(t, testcase.err, err)
} else {
require.Equal(t, testcase.expected, actual)
}
}
}

View File

@ -12,6 +12,7 @@ import (
"storj.io/common/identity"
"storj.io/private/debug"
"storj.io/storj/pkg/server"
"storj.io/storj/private/migrate"
version_checker "storj.io/storj/private/version/checker"
"storj.io/storj/satellite/accounting"
"storj.io/storj/satellite/accounting/live"
@ -47,6 +48,7 @@ import (
"storj.io/storj/satellite/repair/repairer"
"storj.io/storj/satellite/revocation"
"storj.io/storj/satellite/rewards"
"storj.io/storj/satellite/satellitedb/dbx"
"storj.io/storj/satellite/snopayout"
)
@ -69,6 +71,12 @@ type DB interface {
// TestingMigrateToLatest initializes the database for testplanet.
TestingMigrateToLatest(ctx context.Context) error
// MigrationTestingDefaultDB assists in testing migrations themselves
// against the default database.
MigrationTestingDefaultDB() interface {
TestDBAccess() *dbx.DB
PostgresMigration() *migrate.Migration
}
// PeerIdentities returns a storage for peer identities
PeerIdentities() overlay.PeerIdentities

View File

@ -8,6 +8,7 @@ import (
"time"
"storj.io/storj/satellite/internalpb"
"storj.io/storj/satellite/satellitedb/dbx"
)
// RepairQueue implements queueing for segments that need repairing.
@ -27,4 +28,8 @@ type RepairQueue interface {
SelectN(ctx context.Context, limit int) ([]internalpb.InjuredSegment, error)
// Count counts the number of segments in the repair queue.
Count(ctx context.Context) (count int, err error)
// TestDBAccess returns raw RepairQueue database access for test purposes.
// TODO: remove dependency.
TestDBAccess() *dbx.DB
}

View File

@ -69,7 +69,7 @@ func TestOrder(t *testing.T) {
}
// TODO: remove dependency on *dbx.DB
dbAccess := db.(interface{ TestDBAccess() *dbx.DB }).TestDBAccess()
dbAccess := db.RepairQueue().TestDBAccess()
err := dbAccess.WithTx(ctx, func(ctx context.Context, tx *dbx.Tx) error {
updateList := []struct {
@ -133,7 +133,7 @@ func TestOrderHealthyPieces(t *testing.T) {
// ("path/h", 10, now-8h)
// TODO: remove dependency on *dbx.DB
dbAccess := db.(interface{ TestDBAccess() *dbx.DB }).TestDBAccess()
dbAccess := db.RepairQueue().TestDBAccess()
// insert the 8 segments according to the plan above
injuredSegList := []struct {

View File

@ -13,6 +13,7 @@ import (
"storj.io/storj/pkg/cache"
"storj.io/storj/private/dbutil"
"storj.io/storj/private/dbutil/pgutil"
"storj.io/storj/private/migrate"
"storj.io/storj/private/tagsql"
"storj.io/storj/satellite"
"storj.io/storj/satellite/accounting"
@ -22,6 +23,7 @@ import (
"storj.io/storj/satellite/console"
"storj.io/storj/satellite/downtime"
"storj.io/storj/satellite/gracefulexit"
"storj.io/storj/satellite/metainfo"
"storj.io/storj/satellite/nodeapiversion"
"storj.io/storj/satellite/orders"
"storj.io/storj/satellite/overlay"
@ -39,6 +41,10 @@ var (
Error = errs.Class("satellitedb")
)
type satelliteDBCollection struct {
dbs map[string]*satelliteDB
}
// satelliteDB combines access to different database tables with a record
// of the db driver, db implementation, and db source URL.
type satelliteDB struct {
@ -71,8 +77,39 @@ type Options struct {
var _ dbx.DBMethods = &satelliteDB{}
// Open creates instance of database supports postgres.
func Open(ctx context.Context, log *zap.Logger, databaseURL string, opts Options) (satellite.DB, error) {
var safelyPartitionableDBs = map[string]bool{
// WARNING: only list additional db names here after they have been
// validated to be safely partitionable and that they do not do
// cross-db queries.
"repairqueue": true,
}
// Open creates instance of satellite.DB.
func Open(ctx context.Context, log *zap.Logger, databaseURL string, opts Options) (rv satellite.DB, err error) {
dbMapping, err := dbutil.ParseDBMapping(databaseURL)
if err != nil {
return nil, err
}
dbc := &satelliteDBCollection{dbs: map[string]*satelliteDB{}}
defer func() {
if err != nil {
err = errs.Combine(err, dbc.Close())
}
}()
for key, val := range dbMapping {
db, err := open(ctx, log, val, opts, key)
if err != nil {
return nil, err
}
dbc.dbs[key] = db
}
return dbc, nil
}
func open(ctx context.Context, log *zap.Logger, databaseURL string, opts Options, override string) (*satelliteDB, error) {
driver, source, implementation, err := dbutil.SplitConnStr(databaseURL)
if err != nil {
return nil, err
@ -90,7 +127,11 @@ func Open(ctx context.Context, log *zap.Logger, databaseURL string, opts Options
}
log.Debug("Connected to:", zap.String("db source", source))
dbutil.Configure(ctx, dbxDB.DB, "satellitedb", mon)
name := "satellitedb"
if override != "" {
name += ":" + override
}
dbutil.Configure(ctx, dbxDB.DB, name, mon)
core := &satelliteDB{
DB: dbxDB,
@ -107,47 +148,66 @@ func Open(ctx context.Context, log *zap.Logger, databaseURL string, opts Options
return core, nil
}
func (dbc *satelliteDBCollection) getByName(name string) *satelliteDB {
if safelyPartitionableDBs[name] {
if db, exists := dbc.dbs[name]; exists {
return db
}
}
return dbc.dbs[""]
}
// TestDBAccess for raw database access,
// should not be used outside of migration tests.
func (db *satelliteDB) TestDBAccess() *dbx.DB { return db.DB }
// MigrationTestingDefaultDB assists in testing migrations themselves against
// the default database.
func (dbc *satelliteDBCollection) MigrationTestingDefaultDB() interface {
TestDBAccess() *dbx.DB
PostgresMigration() *migrate.Migration
} {
return dbc.getByName("")
}
// PeerIdentities returns a storage for peer identities.
func (db *satelliteDB) PeerIdentities() overlay.PeerIdentities {
return &peerIdentities{db: db}
func (dbc *satelliteDBCollection) PeerIdentities() overlay.PeerIdentities {
return &peerIdentities{db: dbc.getByName("peeridentities")}
}
// Attribution is a getter for value attribution repository.
func (db *satelliteDB) Attribution() attribution.DB {
return &attributionDB{db: db}
func (dbc *satelliteDBCollection) Attribution() attribution.DB {
return &attributionDB{db: dbc.getByName("attribution")}
}
// OverlayCache is a getter for overlay cache repository.
func (db *satelliteDB) OverlayCache() overlay.DB {
return &overlaycache{db: db}
func (dbc *satelliteDBCollection) OverlayCache() overlay.DB {
return &overlaycache{db: dbc.getByName("overlaycache")}
}
// RepairQueue is a getter for RepairQueue repository.
func (db *satelliteDB) RepairQueue() queue.RepairQueue {
return &repairQueue{db: db}
func (dbc *satelliteDBCollection) RepairQueue() queue.RepairQueue {
return &repairQueue{db: dbc.getByName("repairqueue")}
}
// StoragenodeAccounting returns database for tracking storagenode usage.
func (db *satelliteDB) StoragenodeAccounting() accounting.StoragenodeAccounting {
return &StoragenodeAccounting{db: db}
func (dbc *satelliteDBCollection) StoragenodeAccounting() accounting.StoragenodeAccounting {
return &StoragenodeAccounting{db: dbc.getByName("storagenodeaccounting")}
}
// ProjectAccounting returns database for tracking project data use.
func (db *satelliteDB) ProjectAccounting() accounting.ProjectAccounting {
return &ProjectAccounting{db: db}
func (dbc *satelliteDBCollection) ProjectAccounting() accounting.ProjectAccounting {
return &ProjectAccounting{db: dbc.getByName("projectaccounting")}
}
// Irreparable returns database for storing segments that failed repair.
func (db *satelliteDB) Irreparable() irreparable.DB {
return &irreparableDB{db: db}
func (dbc *satelliteDBCollection) Irreparable() irreparable.DB {
return &irreparableDB{db: dbc.getByName("irreparable")}
}
// Revocation returns the database to deal with macaroon revocation.
func (db *satelliteDB) Revocation() revocation.DB {
func (dbc *satelliteDBCollection) Revocation() revocation.DB {
db := dbc.getByName("revocation")
db.revocationDBOnce.Do(func() {
db.revocationDB = &revocationDB{
db: db,
@ -159,7 +219,8 @@ func (db *satelliteDB) Revocation() revocation.DB {
}
// Console returns database for storing users, projects and api keys.
func (db *satelliteDB) Console() console.DB {
func (dbc *satelliteDBCollection) Console() console.DB {
db := dbc.getByName("console")
db.consoleDBOnce.Do(func() {
db.consoleDB = &ConsoleDB{
apikeysLRUOptions: db.opts.APIKeysLRUOptions,
@ -175,46 +236,88 @@ func (db *satelliteDB) Console() console.DB {
}
// Rewards returns database for storing offers.
func (db *satelliteDB) Rewards() rewards.DB {
return &offersDB{db: db}
func (dbc *satelliteDBCollection) Rewards() rewards.DB {
return &offersDB{db: dbc.getByName("rewards")}
}
// Orders returns database for storing orders.
func (db *satelliteDB) Orders() orders.DB {
func (dbc *satelliteDBCollection) Orders() orders.DB {
db := dbc.getByName("orders")
return &ordersDB{db: db, reportedRollupsReadBatchSize: db.opts.ReportedRollupsReadBatchSize}
}
// Containment returns database for storing pending audit info.
func (db *satelliteDB) Containment() audit.Containment {
return &containment{db: db}
func (dbc *satelliteDBCollection) Containment() audit.Containment {
return &containment{db: dbc.getByName("containment")}
}
// GracefulExit returns database for graceful exit.
func (db *satelliteDB) GracefulExit() gracefulexit.DB {
return &gracefulexitDB{db: db}
func (dbc *satelliteDBCollection) GracefulExit() gracefulexit.DB {
return &gracefulexitDB{db: dbc.getByName("gracefulexit")}
}
// StripeCoinPayments returns database for stripecoinpayments.
func (db *satelliteDB) StripeCoinPayments() stripecoinpayments.DB {
return &stripeCoinPaymentsDB{db: db}
func (dbc *satelliteDBCollection) StripeCoinPayments() stripecoinpayments.DB {
return &stripeCoinPaymentsDB{db: dbc.getByName("stripecoinpayments")}
}
// DowntimeTracking returns database for downtime tracking.
func (db *satelliteDB) DowntimeTracking() downtime.DB {
return &downtimeTrackingDB{db: db}
func (dbc *satelliteDBCollection) DowntimeTracking() downtime.DB {
return &downtimeTrackingDB{db: dbc.getByName("downtimetracking")}
}
// SnoPayout returns database for storagenode payStubs and payments info.
func (db *satelliteDB) SnoPayout() snopayout.DB {
return &paymentStubs{db: db}
func (dbc *satelliteDBCollection) SnoPayout() snopayout.DB {
return &paymentStubs{db: dbc.getByName("snopayout")}
}
// Compenstation returns database for storage node compensation.
func (db *satelliteDB) Compensation() compensation.DB {
return &compensationDB{db: db}
func (dbc *satelliteDBCollection) Compensation() compensation.DB {
return &compensationDB{db: dbc.getByName("compensation")}
}
// NodeAPIVersion returns database for storage node api version lower bounds.
func (db *satelliteDB) NodeAPIVersion() nodeapiversion.DB {
return &nodeAPIVersionDB{db: db}
func (dbc *satelliteDBCollection) NodeAPIVersion() nodeapiversion.DB {
return &nodeAPIVersionDB{db: dbc.getByName("nodeapiversion")}
}
// Buckets returns database for interacting with buckets.
func (dbc *satelliteDBCollection) Buckets() metainfo.BucketsDB {
return &bucketsDB{db: dbc.getByName("buckets")}
}
// CheckVersion confirms all databases are at the desired version.
func (dbc *satelliteDBCollection) CheckVersion(ctx context.Context) error {
var eg errs.Group
for _, db := range dbc.dbs {
eg.Add(db.CheckVersion(ctx))
}
return eg.Err()
}
// MigrateToLatest migrates all databases to the latest version.
func (dbc *satelliteDBCollection) MigrateToLatest(ctx context.Context) error {
var eg errs.Group
for _, db := range dbc.dbs {
eg.Add(db.MigrateToLatest(ctx))
}
return eg.Err()
}
// TestingMigrateToLatest is a method for creating all tables for all database for testing.
func (dbc *satelliteDBCollection) TestingMigrateToLatest(ctx context.Context) error {
var eg errs.Group
for _, db := range dbc.dbs {
eg.Add(db.TestingMigrateToLatest(ctx))
}
return eg.Err()
}
// Close closes all satellite dbs.
func (dbc *satelliteDBCollection) Close() error {
var eg errs.Group
for _, db := range dbc.dbs {
eg.Add(db.Close())
}
return eg.Err()
}

View File

@ -25,9 +25,7 @@ import (
"storj.io/storj/private/dbutil/pgtest"
"storj.io/storj/private/dbutil/pgutil"
"storj.io/storj/private/dbutil/tempdb"
"storj.io/storj/private/migrate"
"storj.io/storj/satellite/satellitedb"
"storj.io/storj/satellite/satellitedb/dbx"
)
// loadSnapshots loads all the dbschemas from `testdata/postgres.*`.
@ -134,13 +132,6 @@ func loadSchemaFromSQL(ctx context.Context, connstr, script string) (_ *dbschema
func TestMigratePostgres(t *testing.T) { migrateTest(t, pgtest.PickPostgres(t)) }
func TestMigrateCockroach(t *testing.T) { migrateTest(t, pgtest.PickCockroachAlt(t)) }
// satelliteDB provides access to certain methods on a *satellitedb.satelliteDB
// instance, since that type is not exported.
type satelliteDB interface {
TestDBAccess() *dbx.DB
PostgresMigration() *migrate.Migration
}
func migrateTest(t *testing.T, connStr string) {
t.Parallel()
@ -160,7 +151,7 @@ func migrateTest(t *testing.T, connStr string) {
defer func() { require.NoError(t, db.Close()) }()
// we need raw database access unfortunately
rawdb := db.(satelliteDB).TestDBAccess()
rawdb := db.MigrationTestingDefaultDB().TestDBAccess()
snapshots, dbxschema, err := loadSnapshots(ctx, connStr, rawdb.Schema())
require.NoError(t, err)
@ -168,7 +159,7 @@ func migrateTest(t *testing.T, connStr string) {
var finalSchema *dbschema.Schema
// get migration for this database
migrations := db.(satelliteDB).PostgresMigration()
migrations := db.MigrationTestingDefaultDB().PostgresMigration()
for i, step := range migrations.Steps {
tag := fmt.Sprintf("#%d - v%d", i, step.Version)

View File

@ -1566,6 +1566,6 @@ func (cache *overlaycache) TestUnvetNode(ctx context.Context, nodeID storj.NodeI
if err != nil {
return err
}
_, err = cache.db.OverlayCache().Get(ctx, nodeID)
_, err = cache.Get(ctx, nodeID)
return err
}

View File

@ -24,6 +24,10 @@ type repairQueue struct {
db *satelliteDB
}
func (r *repairQueue) TestDBAccess() *dbx.DB {
return r.db.DB
}
func (r *repairQueue) Insert(ctx context.Context, seg *internalpb.InjuredSegment, segmentHealth float64) (alreadyInserted bool, err error) {
defer mon.Task()(&ctx)(&err)
// insert if not exists, or update healthy count if does exist

View File

@ -24,7 +24,6 @@ import (
"storj.io/storj/satellite"
"storj.io/storj/satellite/metainfo"
"storj.io/storj/satellite/satellitedb"
"storj.io/storj/satellite/satellitedb/dbx"
)
// SatelliteDatabases maybe name can be better.
@ -99,11 +98,6 @@ func (db *tempMasterDB) Close() error {
return errs.Combine(db.DB.Close(), db.tempDB.Close())
}
// TestDBAccess provides a somewhat regularized access to the underlying DB.
func (db *tempMasterDB) TestDBAccess() *dbx.DB {
return db.DB.(interface{ TestDBAccess() *dbx.DB }).TestDBAccess()
}
// CreateMasterDB creates a new satellite database for testing.
func CreateMasterDB(ctx context.Context, log *zap.Logger, name string, category string, index int, dbInfo Database) (db satellite.DB, err error) {
if dbInfo.URL == "" {