satellite/satellitedb: switch to postgres only (#3320)
This commit is contained in:
parent
c0ffd59887
commit
89ed997706
@ -30,7 +30,7 @@ import (
|
||||
|
||||
// Satellite defines satellite configuration
|
||||
type Satellite struct {
|
||||
Database string `help:"satellite database connection string" releaseDefault:"postgres://" devDefault:"sqlite3://$CONFDIR/master.db"`
|
||||
Database string `help:"satellite database connection string" releaseDefault:"postgres://" devDefault:"postgres://"`
|
||||
|
||||
satellite.Config
|
||||
}
|
||||
@ -84,15 +84,15 @@ var (
|
||||
setupCfg Satellite
|
||||
|
||||
qdiagCfg struct {
|
||||
Database string `help:"satellite database connection string" releaseDefault:"postgres://" devDefault:"sqlite3://$CONFDIR/master.db"`
|
||||
Database string `help:"satellite database connection string" releaseDefault:"postgres://" devDefault:"postgres://"`
|
||||
QListLimit int `help:"maximum segments that can be requested" default:"1000"`
|
||||
}
|
||||
nodeUsageCfg struct {
|
||||
Database string `help:"satellite database connection string" releaseDefault:"postgres://" devDefault:"sqlite3://$CONFDIR/master.db"`
|
||||
Database string `help:"satellite database connection string" releaseDefault:"postgres://" devDefault:"postgres://"`
|
||||
Output string `help:"destination of report output" default:""`
|
||||
}
|
||||
partnerAttribtionCfg struct {
|
||||
Database string `help:"satellite database connection string" releaseDefault:"postgres://" devDefault:"sqlite3://$CONFDIR/master.db"`
|
||||
Database string `help:"satellite database connection string" releaseDefault:"postgres://" devDefault:"postgres://"`
|
||||
Output string `help:"destination of report output" default:""`
|
||||
}
|
||||
confDir string
|
||||
|
@ -20,6 +20,7 @@ import (
|
||||
"go.uber.org/zap/zaptest"
|
||||
"golang.org/x/sync/errgroup"
|
||||
|
||||
"storj.io/storj/internal/dbutil/pgutil"
|
||||
"storj.io/storj/internal/testidentity"
|
||||
"storj.io/storj/pkg/identity"
|
||||
"storj.io/storj/pkg/storj"
|
||||
@ -50,10 +51,13 @@ type Config struct {
|
||||
Identities *testidentity.Identities
|
||||
IdentityVersion *storj.IDVersion
|
||||
Reconfigure Reconfigure
|
||||
|
||||
Name string
|
||||
}
|
||||
|
||||
// Planet is a full storj system setup.
|
||||
type Planet struct {
|
||||
id string
|
||||
log *zap.Logger
|
||||
config Config
|
||||
directory string // TODO: ensure that everything is in-memory to speed things up
|
||||
@ -105,7 +109,13 @@ func New(t zaptest.TestingT, satelliteCount, storageNodeCount, uplinkCount int)
|
||||
log = zaptest.NewLogger(t)
|
||||
}
|
||||
|
||||
return NewWithLogger(log, satelliteCount, storageNodeCount, uplinkCount)
|
||||
return NewCustom(log, Config{
|
||||
SatelliteCount: satelliteCount,
|
||||
StorageNodeCount: storageNodeCount,
|
||||
UplinkCount: uplinkCount,
|
||||
|
||||
Name: t.Name(),
|
||||
})
|
||||
}
|
||||
|
||||
// NewWithIdentityVersion creates a new full system with the given version for node identities and the given number of nodes.
|
||||
@ -122,15 +132,8 @@ func NewWithIdentityVersion(t zaptest.TestingT, identityVersion *storj.IDVersion
|
||||
StorageNodeCount: storageNodeCount,
|
||||
UplinkCount: uplinkCount,
|
||||
IdentityVersion: identityVersion,
|
||||
})
|
||||
}
|
||||
|
||||
// NewWithLogger creates a new full system with the given number of nodes.
|
||||
func NewWithLogger(log *zap.Logger, satelliteCount, storageNodeCount, uplinkCount int) (*Planet, error) {
|
||||
return NewCustom(log, Config{
|
||||
SatelliteCount: satelliteCount,
|
||||
StorageNodeCount: storageNodeCount,
|
||||
UplinkCount: uplinkCount,
|
||||
Name: t.Name(),
|
||||
})
|
||||
}
|
||||
|
||||
@ -146,6 +149,7 @@ func NewCustom(log *zap.Logger, config Config) (*Planet, error) {
|
||||
|
||||
planet := &Planet{
|
||||
log: log,
|
||||
id: config.Name + "/" + pgutil.CreateRandomTestingSchemaName(6),
|
||||
config: config,
|
||||
identities: config.Identities,
|
||||
}
|
||||
|
@ -4,8 +4,6 @@
|
||||
package testplanet
|
||||
|
||||
import (
|
||||
"strconv"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/zeebo/errs"
|
||||
@ -23,23 +21,13 @@ import (
|
||||
|
||||
// Run runs testplanet in multiple configurations.
|
||||
func Run(t *testing.T, config Config, test func(t *testing.T, ctx *testcontext.Context, planet *Planet)) {
|
||||
schemaSuffix := pgutil.CreateRandomTestingSchemaName(6)
|
||||
t.Log("schema-suffix ", schemaSuffix)
|
||||
|
||||
for _, satelliteDB := range satellitedbtest.Databases() {
|
||||
satelliteDB := satelliteDB
|
||||
t.Run(satelliteDB.MasterDB.Name, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
// postgres has a maximum schema length of 64
|
||||
// we need additional 6 bytes for the random suffix
|
||||
// and 4 bytes for the satellite index "/S0/""
|
||||
const MaxTestNameLength = 64 - 6 - 4
|
||||
|
||||
testname := t.Name()
|
||||
if len(testname) > MaxTestNameLength {
|
||||
testname = testname[:MaxTestNameLength]
|
||||
}
|
||||
schemaSuffix := satellitedbtest.SchemaSuffix()
|
||||
t.Log("schema-suffix ", schemaSuffix)
|
||||
|
||||
ctx := testcontext.New(t)
|
||||
defer ctx.Cleanup()
|
||||
@ -50,26 +38,23 @@ func Run(t *testing.T, config Config, test func(t *testing.T, ctx *testcontext.C
|
||||
|
||||
planetConfig := config
|
||||
planetConfig.Reconfigure.NewSatelliteDB = func(log *zap.Logger, index int) (satellite.DB, error) {
|
||||
schema := strings.ToLower(testname + "/S" + strconv.Itoa(index) + "/" + schemaSuffix)
|
||||
schema := satellitedbtest.SchemaName(t.Name(), "S", index, schemaSuffix)
|
||||
|
||||
db, err := satellitedb.New(log, pgutil.ConnstrWithSchema(satelliteDB.MasterDB.URL, schema))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
err = db.CreateSchema(schema)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
return &satelliteSchema{
|
||||
DB: db,
|
||||
schema: schema,
|
||||
return &satellitedbtest.SchemaDB{
|
||||
DB: db,
|
||||
Schema: schema,
|
||||
AutoDrop: true,
|
||||
}, nil
|
||||
}
|
||||
|
||||
if satelliteDB.PointerDB.URL != "" {
|
||||
planetConfig.Reconfigure.NewSatellitePointerDB = func(log *zap.Logger, index int) (metainfo.PointerDB, error) {
|
||||
schema := strings.ToLower(testname + "/P" + strconv.Itoa(index) + "/" + schemaSuffix)
|
||||
schema := satellitedbtest.SchemaName(t.Name(), "P", index, schemaSuffix)
|
||||
|
||||
db, err := postgreskv.New(pgutil.ConnstrWithSchema(satelliteDB.PointerDB.URL, schema))
|
||||
if err != nil {
|
||||
@ -96,20 +81,6 @@ func Run(t *testing.T, config Config, test func(t *testing.T, ctx *testcontext.C
|
||||
}
|
||||
}
|
||||
|
||||
// satelliteSchema closes database and drops the associated schema
|
||||
type satelliteSchema struct {
|
||||
satellite.DB
|
||||
schema string
|
||||
}
|
||||
|
||||
// Close closes the database and drops the schema.
|
||||
func (db *satelliteSchema) Close() error {
|
||||
return errs.Combine(
|
||||
db.DB.DropSchema(db.schema),
|
||||
db.DB.Close(),
|
||||
)
|
||||
}
|
||||
|
||||
// satellitePointerSchema closes database and drops the associated schema
|
||||
type satellitePointerSchema struct {
|
||||
*postgreskv.Client
|
||||
|
@ -48,7 +48,7 @@ import (
|
||||
"storj.io/storj/satellite/repair/checker"
|
||||
"storj.io/storj/satellite/repair/irreparable"
|
||||
"storj.io/storj/satellite/repair/repairer"
|
||||
"storj.io/storj/satellite/satellitedb"
|
||||
"storj.io/storj/satellite/satellitedb/satellitedbtest"
|
||||
"storj.io/storj/satellite/vouchers"
|
||||
)
|
||||
|
||||
@ -220,7 +220,8 @@ func (planet *Planet) newSatellites(count int) ([]*SatelliteSystem, error) {
|
||||
if planet.config.Reconfigure.NewSatelliteDB != nil {
|
||||
db, err = planet.config.Reconfigure.NewSatelliteDB(log.Named("db"), i)
|
||||
} else {
|
||||
db, err = satellitedb.NewInMemory(log.Named("db"))
|
||||
schema := satellitedbtest.SchemaName(planet.id, "S", i, "")
|
||||
db, err = satellitedbtest.NewPostgres(log.Named("db"), schema)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -13,7 +13,6 @@ import (
|
||||
"github.com/gogo/protobuf/proto"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/zeebo/errs"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"storj.io/storj/internal/errs2"
|
||||
@ -351,8 +350,7 @@ func TestCommitSegment(t *testing.T) {
|
||||
}
|
||||
_, err = metainfo.CommitSegment(ctx, "bucket", "path", -1, pointer, limits)
|
||||
require.Error(t, err)
|
||||
err = errs.Unwrap(err)
|
||||
require.Equal(t, rpcstatus.Code(err), rpcstatus.InvalidArgument)
|
||||
require.True(t, errs2.IsRPC(err, rpcstatus.InvalidArgument))
|
||||
require.Contains(t, err.Error(), "is less than or equal to the repair threshold")
|
||||
}
|
||||
|
||||
|
@ -29,8 +29,6 @@ var (
|
||||
Error = errs.Class("satellitedb")
|
||||
)
|
||||
|
||||
//go:generate go run ../../scripts/lockedgen.go -o locked.go -p satellitedb -i storj.io/storj/satellite.DB
|
||||
|
||||
// DB contains access to different database tables
|
||||
type DB struct {
|
||||
log *zap.Logger
|
||||
@ -39,15 +37,18 @@ type DB struct {
|
||||
source string
|
||||
}
|
||||
|
||||
// New creates instance of database (supports: postgres, sqlite3)
|
||||
// New creates instance of database supports postgres
|
||||
func New(log *zap.Logger, databaseURL string) (satellite.DB, error) {
|
||||
driver, source, err := dbutil.SplitConnstr(databaseURL)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if driver == "postgres" {
|
||||
source = pgutil.CheckApplicationName(source)
|
||||
if driver != "postgres" {
|
||||
return nil, Error.New("unsupported driver %q", driver)
|
||||
}
|
||||
|
||||
source = pgutil.CheckApplicationName(source)
|
||||
|
||||
db, err := dbx.Open(driver, source)
|
||||
if err != nil {
|
||||
return nil, Error.New("failed opening database %q, %q: %v",
|
||||
@ -58,17 +59,9 @@ func New(log *zap.Logger, databaseURL string) (satellite.DB, error) {
|
||||
dbutil.Configure(db.DB, mon)
|
||||
|
||||
core := &DB{log: log, db: db, driver: driver, source: source}
|
||||
if driver == "sqlite3" {
|
||||
return newLocked(core), nil
|
||||
}
|
||||
return core, nil
|
||||
}
|
||||
|
||||
// NewInMemory creates instance of Sqlite in memory satellite database
|
||||
func NewInMemory(log *zap.Logger) (satellite.DB, error) {
|
||||
return New(log, "sqlite3://file::memory:?mode=memory")
|
||||
}
|
||||
|
||||
// Close is used to close db connection
|
||||
func (db *DB) Close() error {
|
||||
return db.db.Close()
|
||||
@ -76,10 +69,7 @@ func (db *DB) Close() error {
|
||||
|
||||
// CreateSchema creates a schema if it doesn't exist.
|
||||
func (db *DB) CreateSchema(schema string) error {
|
||||
if db.driver == "postgres" {
|
||||
return pgutil.CreateSchema(db.db, schema)
|
||||
}
|
||||
return nil
|
||||
return pgutil.CreateSchema(db.db, schema)
|
||||
}
|
||||
|
||||
// TestDBAccess for raw database access,
|
||||
@ -94,10 +84,7 @@ func (db *locked) TestDBAccess() *dbx.DB {
|
||||
|
||||
// DropSchema drops the named schema
|
||||
func (db *DB) DropSchema(schema string) error {
|
||||
if db.driver == "postgres" {
|
||||
return pgutil.DropSchema(db.db, schema)
|
||||
}
|
||||
return nil
|
||||
return pgutil.DropSchema(db.db, schema)
|
||||
}
|
||||
|
||||
// PeerIdentities returns a storage for peer identities
|
||||
|
63
satellite/satellitedb/satellitedbtest/pgschema.go
Normal file
63
satellite/satellitedb/satellitedbtest/pgschema.go
Normal file
@ -0,0 +1,63 @@
|
||||
// Copyright (C) 2019 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package satellitedbtest
|
||||
|
||||
import (
|
||||
"github.com/zeebo/errs"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"storj.io/storj/internal/dbutil/pgutil"
|
||||
"storj.io/storj/internal/dbutil/pgutil/pgtest"
|
||||
"storj.io/storj/satellite"
|
||||
"storj.io/storj/satellite/satellitedb"
|
||||
dbx "storj.io/storj/satellite/satellitedb/dbx"
|
||||
)
|
||||
|
||||
// NewPostgres returns the default postgres satellite.DB for testing.
|
||||
func NewPostgres(log *zap.Logger, schema string) (satellite.DB, error) {
|
||||
db, err := satellitedb.New(log, pgutil.ConnstrWithSchema(*pgtest.ConnStr, schema))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &SchemaDB{
|
||||
DB: db,
|
||||
Schema: schema,
|
||||
AutoDrop: true,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// SchemaDB implements automatic schema handling for satellite.DB
|
||||
type SchemaDB struct {
|
||||
satellite.DB
|
||||
|
||||
Schema string
|
||||
AutoDrop bool
|
||||
}
|
||||
|
||||
// TestDBAccess for raw database access,
|
||||
// should not be used outside of migration tests.
|
||||
func (db *SchemaDB) TestDBAccess() *dbx.DB {
|
||||
return db.DB.(interface{ TestDBAccess() *dbx.DB }).TestDBAccess()
|
||||
}
|
||||
|
||||
// CreateTables creates the schema and creates tables.
|
||||
func (db *SchemaDB) CreateTables() error {
|
||||
err := db.DB.CreateSchema(db.Schema)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return db.DB.CreateTables()
|
||||
}
|
||||
|
||||
// Close closes the database and drops the schema, when `AutoDrop` is set.
|
||||
func (db *SchemaDB) Close() error {
|
||||
var dropErr error
|
||||
if db.AutoDrop {
|
||||
dropErr = db.DB.DropSchema(db.Schema)
|
||||
}
|
||||
|
||||
return errs.Combine(dropErr, db.DB.Close())
|
||||
}
|
@ -6,11 +6,10 @@ package satellitedbtest
|
||||
// This package should be referenced only in test files!
|
||||
|
||||
import (
|
||||
"strconv"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/zeebo/errs"
|
||||
"go.uber.org/zap"
|
||||
"go.uber.org/zap/zaptest"
|
||||
|
||||
"storj.io/storj/internal/dbutil/pgutil"
|
||||
@ -19,11 +18,6 @@ import (
|
||||
"storj.io/storj/satellite/satellitedb"
|
||||
)
|
||||
|
||||
const (
|
||||
// DefaultSqliteConn is a connstring that is inmemory
|
||||
DefaultSqliteConn = "sqlite3://file::memory:?mode=memory"
|
||||
)
|
||||
|
||||
// SatelliteDatabases maybe name can be better
|
||||
type SatelliteDatabases struct {
|
||||
MasterDB Database
|
||||
@ -41,48 +35,68 @@ type Database struct {
|
||||
func Databases() []SatelliteDatabases {
|
||||
return []SatelliteDatabases{
|
||||
{
|
||||
MasterDB: Database{"Sqlite", DefaultSqliteConn, ""},
|
||||
PointerDB: Database{"Bolt", "", "should use preconfigured URL"},
|
||||
},
|
||||
{
|
||||
MasterDB: Database{"Postgres", *pgtest.ConnStr, "Postgres flag missing, example: -postgres-test-db=" + pgtest.DefaultConnStr},
|
||||
MasterDB: Database{"Postgres", *pgtest.ConnStr, "Postgres flag missing, example: -postgres-test-db=" + pgtest.DefaultConnStr + " or use STORJ_POSTGRES_TEST environment variable."},
|
||||
PointerDB: Database{"Postgres", *pgtest.ConnStr, ""},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// SchemaSuffix returns a suffix for schemas.
|
||||
func SchemaSuffix() string {
|
||||
return pgutil.CreateRandomTestingSchemaName(6)
|
||||
}
|
||||
|
||||
// SchemaName returns a properly formatted schema string.
|
||||
func SchemaName(testname, category string, index int, schemaSuffix string) string {
|
||||
// postgres has a maximum schema length of 64
|
||||
// we need additional 6 bytes for the random suffix
|
||||
// and 4 bytes for the satellite index "/S0/""
|
||||
|
||||
indexStr := strconv.Itoa(index)
|
||||
|
||||
var maxTestNameLen = 64 - len(category) - len(indexStr) - len(schemaSuffix) - 2
|
||||
if len(testname) > maxTestNameLen {
|
||||
testname = testname[:maxTestNameLen]
|
||||
}
|
||||
|
||||
if schemaSuffix == "" {
|
||||
return strings.ToLower(testname + "/" + category + indexStr)
|
||||
}
|
||||
|
||||
return strings.ToLower(testname + "/" + schemaSuffix + "/" + category + indexStr)
|
||||
}
|
||||
|
||||
// Run method will iterate over all supported databases. Will establish
|
||||
// connection and will create tables for each DB.
|
||||
func Run(t *testing.T, test func(t *testing.T, db satellite.DB)) {
|
||||
schemaSuffix := pgutil.CreateRandomTestingSchemaName(8)
|
||||
t.Log("schema-suffix ", schemaSuffix)
|
||||
|
||||
for _, dbInfo := range Databases() {
|
||||
dbInfo := dbInfo
|
||||
t.Run(dbInfo.MasterDB.Name+"/"+dbInfo.PointerDB.Name, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
if dbInfo.MasterDB.URL == "" {
|
||||
t.Skipf("Database %s connection string not provided. %s", dbInfo.MasterDB.Name, dbInfo.MasterDB.Message)
|
||||
t.Fatalf("Database %s connection string not provided. %s", dbInfo.MasterDB.Name, dbInfo.MasterDB.Message)
|
||||
}
|
||||
|
||||
schemaSuffix := SchemaSuffix()
|
||||
t.Log("schema-suffix ", schemaSuffix)
|
||||
|
||||
log := zaptest.NewLogger(t)
|
||||
schema := SchemaName(t.Name(), "T", 0, schemaSuffix)
|
||||
|
||||
schema := strings.ToLower(t.Name() + "-satellite/x-" + schemaSuffix)
|
||||
connstr := pgutil.ConnstrWithSchema(dbInfo.MasterDB.URL, schema)
|
||||
db, err := satellitedb.New(log, connstr)
|
||||
pgdb, err := satellitedb.New(log, pgutil.ConnstrWithSchema(dbInfo.MasterDB.URL, schema))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
err = db.CreateSchema(schema)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
db := &SchemaDB{
|
||||
DB: pgdb,
|
||||
Schema: schema,
|
||||
AutoDrop: true,
|
||||
}
|
||||
|
||||
defer func() {
|
||||
dropErr := db.DropSchema(schema)
|
||||
err := errs.Combine(dropErr, db.Close())
|
||||
err := db.Close()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -101,9 +115,6 @@ func Run(t *testing.T, test func(t *testing.T, db satellite.DB)) {
|
||||
// Bench method will iterate over all supported databases. Will establish
|
||||
// connection and will create tables for each DB.
|
||||
func Bench(b *testing.B, bench func(b *testing.B, db satellite.DB)) {
|
||||
schemaSuffix := pgutil.CreateRandomTestingSchemaName(8)
|
||||
b.Log("schema-suffix ", schemaSuffix)
|
||||
|
||||
for _, dbInfo := range Databases() {
|
||||
dbInfo := dbInfo
|
||||
b.Run(dbInfo.MasterDB.Name+"/"+dbInfo.PointerDB.Name, func(b *testing.B) {
|
||||
@ -111,23 +122,25 @@ func Bench(b *testing.B, bench func(b *testing.B, db satellite.DB)) {
|
||||
b.Skipf("Database %s connection string not provided. %s", dbInfo.MasterDB.Name, dbInfo.MasterDB.Message)
|
||||
}
|
||||
|
||||
log := zap.NewNop()
|
||||
schemaSuffix := SchemaSuffix()
|
||||
b.Log("schema-suffix ", schemaSuffix)
|
||||
|
||||
schema := strings.ToLower(b.Name() + "-satellite/x-" + schemaSuffix)
|
||||
connstr := pgutil.ConnstrWithSchema(dbInfo.MasterDB.URL, schema)
|
||||
db, err := satellitedb.New(log, connstr)
|
||||
log := zaptest.NewLogger(b)
|
||||
schema := SchemaName(b.Name(), "X", 0, schemaSuffix)
|
||||
|
||||
pgdb, err := satellitedb.New(log, pgutil.ConnstrWithSchema(dbInfo.MasterDB.URL, schema))
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
|
||||
err = db.CreateSchema(schema)
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
db := &SchemaDB{
|
||||
DB: pgdb,
|
||||
Schema: schema,
|
||||
AutoDrop: true,
|
||||
}
|
||||
|
||||
defer func() {
|
||||
dropErr := db.DropSchema(schema)
|
||||
err := errs.Combine(dropErr, db.Close())
|
||||
err := db.Close()
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user