satellite/metainfo: add metabase to metainfo service

Change-Id: Ie3ff238b138d8a57d99e32b13f7a71aa624d53e3
This commit is contained in:
Kaloyan Raev 2020-10-29 18:54:35 +02:00
parent 995900e02f
commit b8c6fb764c
12 changed files with 179 additions and 33 deletions

View File

@ -50,6 +50,14 @@ func cmdAPIRun(cmd *cobra.Command, args []string) (err error) {
err = errs.Combine(err, pointerDB.Close())
}()
metabaseDB, err := metainfo.OpenMetabase(ctx, log.Named("metabase"), runCfg.Config.Metainfo.DatabaseURL)
if err != nil {
return errs.New("Error creating metabase connection on satellite api: %+v", err)
}
defer func() {
err = errs.Combine(err, metabaseDB.Close())
}()
revocationDB, err := revocation.OpenDBFromCfg(ctx, runCfg.Config.Server.Config)
if err != nil {
return errs.New("Error creating revocation database on satellite api: %+v", err)
@ -71,7 +79,7 @@ func cmdAPIRun(cmd *cobra.Command, args []string) (err error) {
err = errs.Combine(err, rollupsWriteCache.CloseAndFlush(context2.WithoutCancellation(ctx)))
}()
peer, err := satellite.NewAPI(log, identity, db, pointerDB, revocationDB, accountingCache, rollupsWriteCache, &runCfg.Config, version.Build, process.AtomicLevel(cmd))
peer, err := satellite.NewAPI(log, identity, db, pointerDB, metabaseDB, revocationDB, accountingCache, rollupsWriteCache, &runCfg.Config, version.Build, process.AtomicLevel(cmd))
if err != nil {
return err
}
@ -90,6 +98,11 @@ func cmdAPIRun(cmd *cobra.Command, args []string) (err error) {
return errs.New("Error creating metainfodb tables on satellite api: %+v", err)
}
err = metabaseDB.MigrateToLatest(ctx)
if err != nil {
return errs.New("Error creating metabase tables on satellite api: %+v", err)
}
err = db.CheckVersion(ctx)
if err != nil {
log.Error("Failed satellite database version check.", zap.Error(err))

View File

@ -343,6 +343,14 @@ func cmdRun(cmd *cobra.Command, args []string) (err error) {
err = errs.Combine(err, pointerDB.Close())
}()
metabaseDB, err := metainfo.OpenMetabase(ctx, log.Named("metabase"), runCfg.Metainfo.DatabaseURL)
if err != nil {
return errs.New("Error creating metabase connection: %+v", err)
}
defer func() {
err = errs.Combine(err, metabaseDB.Close())
}()
revocationDB, err := revocation.OpenDBFromCfg(ctx, runCfg.Server.Config)
if err != nil {
return errs.New("Error creating revocation database: %+v", err)
@ -364,7 +372,7 @@ func cmdRun(cmd *cobra.Command, args []string) (err error) {
err = errs.Combine(err, rollupsWriteCache.CloseAndFlush(context2.WithoutCancellation(ctx)))
}()
peer, err := satellite.New(log, identity, db, pointerDB, revocationDB, liveAccounting, rollupsWriteCache, version.Build, &runCfg.Config, process.AtomicLevel(cmd))
peer, err := satellite.New(log, identity, db, pointerDB, metabaseDB, revocationDB, liveAccounting, rollupsWriteCache, version.Build, &runCfg.Config, process.AtomicLevel(cmd))
if err != nil {
return err
}
@ -384,6 +392,11 @@ func cmdRun(cmd *cobra.Command, args []string) (err error) {
return errs.New("Error creating metainfodb tables: %+v", err)
}
err = metabaseDB.MigrateToLatest(ctx)
if err != nil {
return errs.New("Error creating metabase tables: %+v", err)
}
err = db.CheckVersion(ctx)
if err != nil {
log.Error("Failed satellite database version check.", zap.Error(err))

View File

@ -46,6 +46,14 @@ func cmdRepairerRun(cmd *cobra.Command, args []string) (err error) {
err = errs.Combine(err, pointerDB.Close())
}()
metabaseDB, err := metainfo.OpenMetabase(ctx, log.Named("metabase"), runCfg.Metainfo.DatabaseURL)
if err != nil {
return errs.New("Error creating metabase connection: %+v", err)
}
defer func() {
err = errs.Combine(err, metabaseDB.Close())
}()
revocationDB, err := revocation.OpenDBFromCfg(ctx, runCfg.Server.Config)
if err != nil {
return errs.New("Error creating revocation database: %+v", err)
@ -63,6 +71,7 @@ func cmdRepairerRun(cmd *cobra.Command, args []string) (err error) {
log,
identity,
pointerDB,
metabaseDB,
revocationDB,
db.RepairQueue(),
db.Buckets(),
@ -91,6 +100,11 @@ func cmdRepairerRun(cmd *cobra.Command, args []string) (err error) {
return errs.New("Error creating tables for metainfo database: %+v", err)
}
err = metabaseDB.MigrateToLatest(ctx)
if err != nil {
return errs.New("Error creating tables for metabase: %+v", err)
}
err = db.CheckVersion(ctx)
if err != nil {
log.Error("Failed satellite database version check.", zap.Error(err))

View File

@ -20,9 +20,10 @@ import (
// Reconfigure allows to change node configurations.
type Reconfigure struct {
SatelliteDB func(log *zap.Logger, index int, db satellite.DB) (satellite.DB, error)
SatellitePointerDB func(log *zap.Logger, index int, db metainfo.PointerDB) (metainfo.PointerDB, error)
Satellite func(log *zap.Logger, index int, config *satellite.Config)
SatelliteDB func(log *zap.Logger, index int, db satellite.DB) (satellite.DB, error)
SatellitePointerDB func(log *zap.Logger, index int, db metainfo.PointerDB) (metainfo.PointerDB, error)
SatelliteMetabaseDB func(log *zap.Logger, index int, db metainfo.MetabaseDB) (metainfo.MetabaseDB, error)
Satellite func(log *zap.Logger, index int, config *satellite.Config)
ReferralManagerServer func(log *zap.Logger) pb.DRPCReferralManagerServer

View File

@ -386,6 +386,21 @@ func (planet *Planet) newSatellite(ctx context.Context, prefix string, index int
}
planet.databases = append(planet.databases, pointerDB)
metabaseDB, err := satellitedbtest.CreateMetabaseDB(context.TODO(), log.Named("metabase"), planet.config.Name, "M", index, databases.MetabaseDB)
if err != nil {
return nil, err
}
if planet.config.Reconfigure.SatelliteMetabaseDB != nil {
var newMetabaseDB metainfo.MetabaseDB
newMetabaseDB, err = planet.config.Reconfigure.SatelliteMetabaseDB(log.Named("metabase"), index, metabaseDB)
if err != nil {
return nil, errs.Combine(err, metabaseDB.Close())
}
metabaseDB = newMetabaseDB
}
planet.databases = append(planet.databases, metabaseDB)
redis, err := redisserver.Mini(ctx)
if err != nil {
return nil, err
@ -656,7 +671,7 @@ func (planet *Planet) newSatellite(ctx context.Context, prefix string, index int
rollupsWriteCache := orders.NewRollupsWriteCache(log.Named("orders-write-cache"), db.Orders(), config.Orders.FlushBatchSize)
planet.databases = append(planet.databases, rollupsWriteCacheCloser{rollupsWriteCache})
peer, err := satellite.New(log, identity, db, pointerDB, revocationDB, liveAccounting, rollupsWriteCache, versionInfo, &config, nil)
peer, err := satellite.New(log, identity, db, pointerDB, metabaseDB, revocationDB, liveAccounting, rollupsWriteCache, versionInfo, &config, nil)
if err != nil {
return nil, err
}
@ -666,7 +681,7 @@ func (planet *Planet) newSatellite(ctx context.Context, prefix string, index int
return nil, err
}
api, err := planet.newAPI(ctx, index, identity, db, pointerDB, config, versionInfo)
api, err := planet.newAPI(ctx, index, identity, db, pointerDB, metabaseDB, config, versionInfo)
if err != nil {
return nil, err
}
@ -676,7 +691,7 @@ func (planet *Planet) newSatellite(ctx context.Context, prefix string, index int
return nil, err
}
repairerPeer, err := planet.newRepairer(ctx, index, identity, db, pointerDB, config, versionInfo)
repairerPeer, err := planet.newRepairer(ctx, index, identity, db, pointerDB, metabaseDB, config, versionInfo)
if err != nil {
return nil, err
}
@ -769,7 +784,7 @@ func createNewSystem(name string, log *zap.Logger, config satellite.Config, peer
return system
}
func (planet *Planet) newAPI(ctx context.Context, index int, identity *identity.FullIdentity, db satellite.DB, pointerDB metainfo.PointerDB, config satellite.Config, versionInfo version.Info) (*satellite.API, error) {
func (planet *Planet) newAPI(ctx context.Context, index int, identity *identity.FullIdentity, db satellite.DB, pointerDB metainfo.PointerDB, metabaseDB metainfo.MetabaseDB, config satellite.Config, versionInfo version.Info) (*satellite.API, error) {
prefix := "satellite-api" + strconv.Itoa(index)
log := planet.log.Named(prefix)
var err error
@ -789,7 +804,7 @@ func (planet *Planet) newAPI(ctx context.Context, index int, identity *identity.
rollupsWriteCache := orders.NewRollupsWriteCache(log.Named("orders-write-cache"), db.Orders(), config.Orders.FlushBatchSize)
planet.databases = append(planet.databases, rollupsWriteCacheCloser{rollupsWriteCache})
return satellite.NewAPI(log, identity, db, pointerDB, revocationDB, liveAccounting, rollupsWriteCache, &config, versionInfo, nil)
return satellite.NewAPI(log, identity, db, pointerDB, metabaseDB, revocationDB, liveAccounting, rollupsWriteCache, &config, versionInfo, nil)
}
func (planet *Planet) newAdmin(ctx context.Context, index int, identity *identity.FullIdentity, db satellite.DB, config satellite.Config, versionInfo version.Info) (*satellite.Admin, error) {
@ -799,7 +814,7 @@ func (planet *Planet) newAdmin(ctx context.Context, index int, identity *identit
return satellite.NewAdmin(log, identity, db, versionInfo, &config, nil)
}
func (planet *Planet) newRepairer(ctx context.Context, index int, identity *identity.FullIdentity, db satellite.DB, pointerDB metainfo.PointerDB, config satellite.Config, versionInfo version.Info) (*satellite.Repairer, error) {
func (planet *Planet) newRepairer(ctx context.Context, index int, identity *identity.FullIdentity, db satellite.DB, pointerDB metainfo.PointerDB, metabaseDB metainfo.MetabaseDB, config satellite.Config, versionInfo version.Info) (*satellite.Repairer, error) {
prefix := "satellite-repairer" + strconv.Itoa(index)
log := planet.log.Named(prefix)
@ -812,7 +827,7 @@ func (planet *Planet) newRepairer(ctx context.Context, index int, identity *iden
rollupsWriteCache := orders.NewRollupsWriteCache(log.Named("orders-write-cache"), db.Orders(), config.Orders.FlushBatchSize)
planet.databases = append(planet.databases, rollupsWriteCacheCloser{rollupsWriteCache})
return satellite.NewRepairer(log, identity, pointerDB, revocationDB, db.RepairQueue(), db.Buckets(), db.OverlayCache(), rollupsWriteCache, db.Irreparable(), versionInfo, &config, nil)
return satellite.NewRepairer(log, identity, pointerDB, metabaseDB, revocationDB, db.RepairQueue(), db.Buckets(), db.OverlayCache(), rollupsWriteCache, db.Irreparable(), versionInfo, &config, nil)
}
type rollupsWriteCacheCloser struct {

View File

@ -97,6 +97,7 @@ type API struct {
Metainfo struct {
Database metainfo.PointerDB
Metabase metainfo.MetabaseDB
Service *metainfo.Service
PieceDeletion *piecedeletion.Service
Endpoint2 *metainfo.Endpoint
@ -167,7 +168,8 @@ type API struct {
// NewAPI creates a new satellite API process.
func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB,
pointerDB metainfo.PointerDB, revocationDB extensions.RevocationDB, liveAccounting accounting.Cache, rollupsWriteCache *orders.RollupsWriteCache,
pointerDB metainfo.PointerDB, metabaseDB metainfo.MetabaseDB, revocationDB extensions.RevocationDB,
liveAccounting accounting.Cache, rollupsWriteCache *orders.RollupsWriteCache,
config *Config, versionInfo version.Info, atomicLogLevel *zap.AtomicLevel) (*API, error) {
peer := &API{
Log: log,
@ -392,9 +394,11 @@ func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB,
{ // setup metainfo
peer.Metainfo.Database = pointerDB
peer.Metainfo.Metabase = metabaseDB
peer.Metainfo.Service = metainfo.NewService(peer.Log.Named("metainfo:service"),
peer.Metainfo.Database,
peer.DB.Buckets(),
peer.Metainfo.Metabase,
)
peer.Metainfo.PieceDeletion, err = piecedeletion.NewService(

View File

@ -81,6 +81,7 @@ type Core struct {
Metainfo struct {
Database metainfo.PointerDB // TODO: move into pointerDB
Metabase metainfo.MetabaseDB
Service *metainfo.Service
Loop *metainfo.Loop
}
@ -147,8 +148,8 @@ type Core struct {
// New creates a new satellite.
func New(log *zap.Logger, full *identity.FullIdentity, db DB,
pointerDB metainfo.PointerDB, revocationDB extensions.RevocationDB, liveAccounting accounting.Cache,
rollupsWriteCache *orders.RollupsWriteCache,
pointerDB metainfo.PointerDB, metabaseDB metainfo.MetabaseDB, revocationDB extensions.RevocationDB,
liveAccounting accounting.Cache, rollupsWriteCache *orders.RollupsWriteCache,
versionInfo version.Info, config *Config, atomicLogLevel *zap.AtomicLevel) (*Core, error) {
peer := &Core{
Log: log,
@ -272,9 +273,11 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB,
{ // setup metainfo
peer.Metainfo.Database = pointerDB // for logging: storelogger.New(peer.Log.Named("pdb"), db)
peer.Metainfo.Metabase = metabaseDB
peer.Metainfo.Service = metainfo.NewService(peer.Log.Named("metainfo:service"),
peer.Metainfo.Database,
peer.DB.Buckets(),
peer.Metainfo.Metabase,
)
peer.Metainfo.Loop = metainfo.NewLoop(config.Metainfo.Loop, peer.Metainfo.Database)
peer.Services.Add(lifecycle.Item{

View File

@ -5,12 +5,14 @@ package metainfo
import (
"context"
"io"
"time"
"go.uber.org/zap"
"storj.io/common/memory"
"storj.io/storj/private/dbutil"
"storj.io/storj/satellite/metainfo/metabase"
"storj.io/storj/satellite/metainfo/objectdeletion"
"storj.io/storj/satellite/metainfo/piecedeletion"
"storj.io/storj/storage"
@ -104,3 +106,34 @@ func OpenStore(ctx context.Context, logger *zap.Logger, dbURLString string) (db
logger.Debug("Connected to:", zap.String("db source", source))
return db, nil
}
// MetabaseDB stores objects and segments.
type MetabaseDB interface {
io.Closer
// MigrateToLatest migrates to latest schema version.
MigrateToLatest(ctx context.Context) error
}
// OpenMetabase returns database for storing objects and segments.
func OpenMetabase(ctx context.Context, logger *zap.Logger, dbURLString string) (db MetabaseDB, err error) {
_, source, implementation, err := dbutil.SplitConnStr(dbURLString)
if err != nil {
return nil, err
}
switch implementation {
case dbutil.Postgres:
db, err = metabase.Open(ctx, "pgx", dbURLString)
case dbutil.Cockroach:
db, err = metabase.Open(ctx, "cockroach", dbURLString)
default:
err = Error.New("unsupported db implementation: %s", dbURLString)
}
if err != nil {
return nil, err
}
logger.Debug("Connected to:", zap.String("db source", source))
return db, nil
}

View File

@ -62,7 +62,7 @@ func (db *DB) MigrateToLatest(ctx context.Context) error {
// TODO: verify whether this is all we need.
_, err = db.db.ExecContext(ctx, `
CREATE TABLE objects (
CREATE TABLE IF NOT EXISTS objects (
project_id BYTEA NOT NULL,
bucket_name BYTEA NOT NULL, -- we're using bucket_name here to avoid a lookup into buckets table
object_key BYTEA NOT NULL, -- using 'object_key' instead of 'key' to avoid reserved word
@ -94,7 +94,7 @@ func (db *DB) MigrateToLatest(ctx context.Context) error {
// TODO: verify whether this is all we need.
_, err = db.db.ExecContext(ctx, `
CREATE TABLE segments (
CREATE TABLE IF NOT EXISTS segments (
stream_id BYTEA NOT NULL,
position INT8 NOT NULL,

View File

@ -28,14 +28,20 @@ var (
//
// architecture: Service
type Service struct {
logger *zap.Logger
db PointerDB
bucketsDB BucketsDB
logger *zap.Logger
db PointerDB
bucketsDB BucketsDB
metabaseDB MetabaseDB
}
// NewService creates new metainfo service.
func NewService(logger *zap.Logger, db PointerDB, bucketsDB BucketsDB) *Service {
return &Service{logger: logger, db: db, bucketsDB: bucketsDB}
func NewService(logger *zap.Logger, db PointerDB, bucketsDB BucketsDB, metabaseDB MetabaseDB) *Service {
return &Service{
logger: logger,
db: db,
bucketsDB: bucketsDB,
metabaseDB: metabaseDB,
}
}
// Put puts pointer to db under specific path.

View File

@ -67,7 +67,7 @@ type Repairer struct {
// NewRepairer creates a new repairer peer.
func NewRepairer(log *zap.Logger, full *identity.FullIdentity,
pointerDB metainfo.PointerDB,
pointerDB metainfo.PointerDB, metabaseDB metainfo.MetabaseDB,
revocationDB extensions.RevocationDB, repairQueue queue.RepairQueue,
bucketsDB metainfo.BucketsDB, overlayCache overlay.DB,
rollupsWriteCache *orders.RollupsWriteCache, irrDB irreparable.DB,
@ -128,7 +128,7 @@ func NewRepairer(log *zap.Logger, full *identity.FullIdentity,
}
{ // setup metainfo
peer.Metainfo = metainfo.NewService(log.Named("metainfo"), pointerDB, bucketsDB)
peer.Metainfo = metainfo.NewService(log.Named("metainfo"), pointerDB, bucketsDB, metabaseDB)
}
{ // setup overlay

View File

@ -29,9 +29,10 @@ import (
// SatelliteDatabases maybe name can be better.
type SatelliteDatabases struct {
Name string
MasterDB Database
PointerDB Database
Name string
MasterDB Database
PointerDB Database
MetabaseDB Database
}
// Database describes a test database.
@ -51,14 +52,16 @@ func Databases() []SatelliteDatabases {
postgresConnStr := pgtest.PickPostgres(ignoreSkip{})
return []SatelliteDatabases{
{
Name: "Postgres",
MasterDB: Database{"Postgres", postgresConnStr, "Postgres flag missing, example: -postgres-test-db=" + pgtest.DefaultPostgres + " or use STORJ_TEST_POSTGRES environment variable."},
PointerDB: Database{"Postgres", postgresConnStr, ""},
Name: "Postgres",
MasterDB: Database{"Postgres", postgresConnStr, "Postgres flag missing, example: -postgres-test-db=" + pgtest.DefaultPostgres + " or use STORJ_TEST_POSTGRES environment variable."},
PointerDB: Database{"Postgres", postgresConnStr, ""},
MetabaseDB: Database{"Postgres", postgresConnStr, ""},
},
{
Name: "Cockroach",
MasterDB: Database{"Cockroach", cockroachConnStr, "Cockroach flag missing, example: -cockroach-test-db=" + pgtest.DefaultCockroach + " or use STORJ_TEST_COCKROACH environment variable."},
PointerDB: Database{"Cockroach", cockroachConnStr, ""},
Name: "Cockroach",
MasterDB: Database{"Cockroach", cockroachConnStr, "Cockroach flag missing, example: -cockroach-test-db=" + pgtest.DefaultCockroach + " or use STORJ_TEST_COCKROACH environment variable."},
PointerDB: Database{"Cockroach", cockroachConnStr, ""},
MetabaseDB: Database{"Cockroach", cockroachConnStr, ""},
},
}
}
@ -170,6 +173,47 @@ func CreatePointerDBOnTopOf(ctx context.Context, log *zap.Logger, tempDB *dbutil
return &tempPointerDB{PointerDB: pointerDB, tempDB: tempDB}, err
}
// tempMetabaseDB is a metabase.DB-implementing type that cleans up after itself when closed.
type tempMetabaseDB struct {
metainfo.MetabaseDB
tempDB *dbutil.TempDatabase
}
// Close closes a tempPointerDB and cleans it up afterward.
func (db *tempMetabaseDB) Close() error {
return errs.Combine(db.MetabaseDB.Close(), db.tempDB.Close())
}
// CreateMetabaseDB creates a new satellite metabase for testing.
func CreateMetabaseDB(ctx context.Context, log *zap.Logger, name string, category string, index int, dbInfo Database) (db metainfo.MetabaseDB, err error) {
if dbInfo.URL == "" {
return nil, fmt.Errorf("Database %s connection string not provided. %s", dbInfo.Name, dbInfo.Message)
}
schemaSuffix := SchemaSuffix()
log.Debug("creating", zap.String("suffix", schemaSuffix))
schema := SchemaName(name, category, index, schemaSuffix)
tempDB, err := tempdb.OpenUnique(ctx, dbInfo.URL, schema)
if err != nil {
return nil, err
}
return CreateMetabaseDBOnTopOf(ctx, log, tempDB)
}
// CreateMetabaseDBOnTopOf creates a new metabase on top of an already existing
// temporary database.
func CreateMetabaseDBOnTopOf(ctx context.Context, log *zap.Logger, tempDB *dbutil.TempDatabase) (db metainfo.MetabaseDB, err error) {
metabaseDB, err := metainfo.OpenMetabase(ctx, log.Named("metabase"), tempDB.ConnStr)
if err != nil {
return nil, err
}
err = metabaseDB.MigrateToLatest(ctx)
return &tempMetabaseDB{MetabaseDB: metabaseDB, tempDB: tempDB}, err
}
// Run method will iterate over all supported databases. Will establish
// connection and will create tables for each DB.
func Run(t *testing.T, test func(ctx *testcontext.Context, t *testing.T, db satellite.DB)) {