satellite/metainfo: add MigrateToLatest to PointerDB
In cases like the segment reaper script connecting to the metainfodb, we don't want a db migration to happen automatically when we call metainfo.NewStore. This adds MigrateToLatest method for postgreskv and cockroackv, and calls MigrateToLatest in places where NewStore used to create tables. Change-Id: I682d0f26d609af0601dfdb32a24866cdf5d32a7e
This commit is contained in:
parent
13bf0c62ab
commit
6f84be133a
@ -39,7 +39,7 @@ func cmdAdminRun(cmd *cobra.Command, args []string) (err error) {
|
||||
|
||||
pointerDB, err := metainfo.NewStore(log.Named("pointerdb"), runCfg.Config.Metainfo.DatabaseURL)
|
||||
if err != nil {
|
||||
return errs.New("Error creating metainfo database on satellite api: %+v", err)
|
||||
return errs.New("Error creating metainfodb connection on satellite api: %+v", err)
|
||||
}
|
||||
defer func() {
|
||||
err = errs.Combine(err, pointerDB.Close())
|
||||
@ -67,6 +67,11 @@ func cmdAdminRun(cmd *cobra.Command, args []string) (err error) {
|
||||
log.Warn("Failed to initialize telemetry batcher on satellite admin", zap.Error(err))
|
||||
}
|
||||
|
||||
err = pointerDB.MigrateToLatest(ctx)
|
||||
if err != nil {
|
||||
return errs.New("Error creating metainfodb tables on satellite api: %+v", err)
|
||||
}
|
||||
|
||||
err = db.CheckVersion(ctx)
|
||||
if err != nil {
|
||||
log.Fatal("Failed satellite database version check.", zap.Error(err))
|
||||
|
@ -42,7 +42,7 @@ func cmdAPIRun(cmd *cobra.Command, args []string) (err error) {
|
||||
|
||||
pointerDB, err := metainfo.NewStore(log.Named("pointerdb"), runCfg.Config.Metainfo.DatabaseURL)
|
||||
if err != nil {
|
||||
return errs.New("Error creating metainfo database on satellite api: %+v", err)
|
||||
return errs.New("Error creating metainfodb connection on satellite api: %+v", err)
|
||||
}
|
||||
defer func() {
|
||||
err = errs.Combine(err, pointerDB.Close())
|
||||
@ -83,6 +83,11 @@ func cmdAPIRun(cmd *cobra.Command, args []string) (err error) {
|
||||
log.Warn("Failed to initialize telemetry batcher on satellite api", zap.Error(err))
|
||||
}
|
||||
|
||||
err = pointerDB.MigrateToLatest(ctx)
|
||||
if err != nil {
|
||||
return errs.New("Error creating metainfodb tables on satellite api: %+v", err)
|
||||
}
|
||||
|
||||
err = db.CheckVersion(ctx)
|
||||
if err != nil {
|
||||
log.Fatal("Failed satellite database version check.", zap.Error(err))
|
||||
|
@ -37,7 +37,7 @@ func cmdGCRun(cmd *cobra.Command, args []string) (err error) {
|
||||
|
||||
pointerDB, err := metainfo.NewStore(log.Named("pointerdb"), runCfg.Metainfo.DatabaseURL)
|
||||
if err != nil {
|
||||
return errs.New("Error creating pointerDB GC: %+v", err)
|
||||
return errs.New("Error creating pointerDB connection GC: %+v", err)
|
||||
}
|
||||
defer func() {
|
||||
err = errs.Combine(err, pointerDB.Close())
|
||||
@ -65,6 +65,11 @@ func cmdGCRun(cmd *cobra.Command, args []string) (err error) {
|
||||
log.Warn("Failed to initialize telemetry batcher on satellite GC", zap.Error(err))
|
||||
}
|
||||
|
||||
err = pointerDB.MigrateToLatest(ctx)
|
||||
if err != nil {
|
||||
return errs.New("Error creating pointerDB tables GC: %+v", err)
|
||||
}
|
||||
|
||||
err = db.CheckVersion(ctx)
|
||||
if err != nil {
|
||||
log.Fatal("Failed satellite database version check for GC.", zap.Error(err))
|
||||
|
@ -272,7 +272,7 @@ func cmdRun(cmd *cobra.Command, args []string) (err error) {
|
||||
|
||||
pointerDB, err := metainfo.NewStore(log.Named("pointerdb"), runCfg.Metainfo.DatabaseURL)
|
||||
if err != nil {
|
||||
return errs.New("Error creating revocation database: %+v", err)
|
||||
return errs.New("Error creating metainfodb connection: %+v", err)
|
||||
}
|
||||
defer func() {
|
||||
err = errs.Combine(err, pointerDB.Close())
|
||||
@ -314,6 +314,11 @@ func cmdRun(cmd *cobra.Command, args []string) (err error) {
|
||||
log.Warn("Failed to initialize telemetry batcher", zap.Error(err))
|
||||
}
|
||||
|
||||
err = pointerDB.MigrateToLatest(ctx)
|
||||
if err != nil {
|
||||
return errs.New("Error creating metainfodb tables: %+v", err)
|
||||
}
|
||||
|
||||
err = db.CheckVersion(ctx)
|
||||
if err != nil {
|
||||
log.Fatal("Failed satellite database version check.", zap.Error(err))
|
||||
@ -342,15 +347,17 @@ func cmdMigrationRun(cmd *cobra.Command, args []string) (err error) {
|
||||
return errs.New("Error creating tables for master database on satellite: %+v", err)
|
||||
}
|
||||
|
||||
// There should be an explicit CreateTables call for the pointerdb as well.
|
||||
// This is tracked in jira ticket #3337.
|
||||
pdb, err := metainfo.NewStore(log.Named("migration"), runCfg.Metainfo.DatabaseURL)
|
||||
if err != nil {
|
||||
return errs.New("Error creating tables for pointer database on satellite: %+v", err)
|
||||
return errs.New("Error creating pointer database connection on satellite: %+v", err)
|
||||
}
|
||||
defer func() {
|
||||
err = errs.Combine(err, pdb.Close())
|
||||
}()
|
||||
err = pdb.MigrateToLatest(ctx)
|
||||
if err != nil {
|
||||
return errs.New("Error creating tables for pointer database on satellite: %+v", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -39,7 +39,7 @@ func cmdRepairerRun(cmd *cobra.Command, args []string) (err error) {
|
||||
|
||||
pointerDB, err := metainfo.NewStore(log.Named("pointerdb"), runCfg.Metainfo.DatabaseURL)
|
||||
if err != nil {
|
||||
return errs.New("Error creating metainfo database: %+v", err)
|
||||
return errs.New("Error creating metainfo database connection: %+v", err)
|
||||
}
|
||||
defer func() {
|
||||
err = errs.Combine(err, pointerDB.Close())
|
||||
@ -84,6 +84,11 @@ func cmdRepairerRun(cmd *cobra.Command, args []string) (err error) {
|
||||
log.Warn("Failed to initialize telemetry batcher on repairer", zap.Error(err))
|
||||
}
|
||||
|
||||
err = pointerDB.MigrateToLatest(ctx)
|
||||
if err != nil {
|
||||
return errs.New("Error creating tables for metainfo database: %+v", err)
|
||||
}
|
||||
|
||||
err = db.CheckVersion(ctx)
|
||||
if err != nil {
|
||||
log.Fatal("Failed satellite database version check.", zap.Error(err))
|
||||
|
@ -4,6 +4,7 @@
|
||||
package metainfo
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"go.uber.org/zap"
|
||||
@ -63,10 +64,13 @@ type Config struct {
|
||||
//
|
||||
// architecture: Database
|
||||
type PointerDB interface {
|
||||
// MigrateToLatest migrates to latest schema version.
|
||||
MigrateToLatest(ctx context.Context) error
|
||||
|
||||
storage.KeyValueStore
|
||||
}
|
||||
|
||||
// NewStore returns database for storing pointer data
|
||||
// NewStore returns database for storing pointer data.
|
||||
func NewStore(logger *zap.Logger, dbURLString string) (db PointerDB, err error) {
|
||||
_, source, implementation, err := dbutil.SplitConnStr(dbURLString)
|
||||
if err != nil {
|
||||
|
@ -157,6 +157,10 @@ func CreatePointerDB(ctx context.Context, log *zap.Logger, name string, category
|
||||
// temporary database.
|
||||
func CreatePointerDBOnTopOf(ctx context.Context, log *zap.Logger, tempDB *dbutil.TempDatabase) (db metainfo.PointerDB, err error) {
|
||||
pointerDB, err := metainfo.NewStore(log.Named("pointerdb"), tempDB.ConnStr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
err = pointerDB.MigrateToLatest(ctx)
|
||||
return &tempPointerDB{PointerDB: pointerDB, tempDB: tempDB}, err
|
||||
}
|
||||
|
||||
|
@ -26,8 +26,8 @@ var (
|
||||
|
||||
// Client is the entrypoint into a cockroachkv data store
|
||||
type Client struct {
|
||||
db tagsql.DB
|
||||
|
||||
db tagsql.DB
|
||||
dbURL string
|
||||
lookupLimit int
|
||||
}
|
||||
|
||||
@ -42,18 +42,17 @@ func New(dbURL string) (*Client, error) {
|
||||
|
||||
dbutil.Configure(db, "cockroachkv", mon)
|
||||
|
||||
// TODO: new shouldn't be taking ctx as argument
|
||||
err = schema.PrepareDB(context.TODO(), db)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return NewWith(db), nil
|
||||
return NewWith(db, dbURL), nil
|
||||
}
|
||||
|
||||
// NewWith instantiates a new cockroachkv client given db.
|
||||
func NewWith(db tagsql.DB) *Client {
|
||||
return &Client{db: db, lookupLimit: storage.DefaultLookupLimit}
|
||||
func NewWith(db tagsql.DB, dbURL string) *Client {
|
||||
return &Client{db: db, dbURL: dbURL, lookupLimit: storage.DefaultLookupLimit}
|
||||
}
|
||||
|
||||
// MigrateToLatest migrates to latest schema version.
|
||||
func (client *Client) MigrateToLatest(ctx context.Context) error {
|
||||
return schema.PrepareDB(ctx, client.db)
|
||||
}
|
||||
|
||||
// SetLookupLimit sets the lookup limit.
|
||||
|
@ -7,11 +7,11 @@ import (
|
||||
"testing"
|
||||
|
||||
_ "github.com/lib/pq"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"storj.io/common/testcontext"
|
||||
"storj.io/storj/private/dbutil/cockroachutil"
|
||||
"storj.io/storj/private/dbutil/pgutil/pgtest"
|
||||
"storj.io/storj/storage/cockroachkv/schema"
|
||||
"storj.io/storj/storage/testsuite"
|
||||
)
|
||||
|
||||
@ -25,12 +25,7 @@ func newTestCockroachDB(ctx context.Context, t testing.TB) (store *Client, clean
|
||||
t.Fatalf("init: %+v", err)
|
||||
}
|
||||
|
||||
err = schema.PrepareDB(ctx, tdb.DB)
|
||||
if err != nil {
|
||||
t.Fatalf("init: %+v", err)
|
||||
}
|
||||
|
||||
return NewWith(tdb.DB), func() {
|
||||
return NewWith(tdb.DB, *pgtest.CrdbConnStr), func() {
|
||||
if err := tdb.Close(); err != nil {
|
||||
t.Fatalf("failed to close db: %v", err)
|
||||
}
|
||||
@ -44,6 +39,9 @@ func TestSuite(t *testing.T) {
|
||||
store, cleanup := newTestCockroachDB(ctx, t)
|
||||
defer cleanup()
|
||||
|
||||
err := store.MigrateToLatest(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
store.SetLookupLimit(500)
|
||||
testsuite.RunTests(t, store)
|
||||
}
|
||||
|
@ -24,12 +24,12 @@ var (
|
||||
|
||||
// Client is the entrypoint into a postgreskv data store
|
||||
type Client struct {
|
||||
db tagsql.DB
|
||||
|
||||
db tagsql.DB
|
||||
dbURL string
|
||||
lookupLimit int
|
||||
}
|
||||
|
||||
// New instantiates a new postgreskv client given db URL
|
||||
// New instantiates a new postgreskv client given db URL.
|
||||
func New(dbURL string) (*Client, error) {
|
||||
dbURL = pgutil.CheckApplicationName(dbURL)
|
||||
|
||||
@ -39,18 +39,17 @@ func New(dbURL string) (*Client, error) {
|
||||
}
|
||||
|
||||
dbutil.Configure(db, "postgreskv", mon)
|
||||
//TODO: Fix the parameters!!
|
||||
err = schema.PrepareDB(context.TODO(), db, dbURL)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return NewWith(db), nil
|
||||
return NewWith(db, dbURL), nil
|
||||
}
|
||||
|
||||
// NewWith instantiates a new postgreskv client given db.
|
||||
func NewWith(db tagsql.DB) *Client {
|
||||
return &Client{db: db, lookupLimit: storage.DefaultLookupLimit}
|
||||
func NewWith(db tagsql.DB, dbURL string) *Client {
|
||||
return &Client{db: db, lookupLimit: storage.DefaultLookupLimit, dbURL: dbURL}
|
||||
}
|
||||
|
||||
// MigrateToLatest migrates to latest schema version.
|
||||
func (client *Client) MigrateToLatest(ctx context.Context) error {
|
||||
return schema.PrepareDB(ctx, client.db, client.dbURL)
|
||||
}
|
||||
|
||||
// SetLookupLimit sets the lookup limit.
|
||||
|
@ -9,6 +9,7 @@ import (
|
||||
"testing"
|
||||
|
||||
"github.com/lib/pq"
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/zeebo/errs"
|
||||
|
||||
"storj.io/common/testcontext"
|
||||
@ -40,6 +41,10 @@ func TestSuite(t *testing.T) {
|
||||
store, cleanup := newTestPostgres(t)
|
||||
defer cleanup()
|
||||
|
||||
ctx := testcontext.New(t)
|
||||
err := store.MigrateToLatest(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
// zap := zaptest.NewLogger(t)
|
||||
// loggedStore := storelogger.New(zap, store)
|
||||
store.SetLookupLimit(500)
|
||||
|
@ -44,6 +44,9 @@ type Client struct {
|
||||
// New creates a new in-memory key-value store
|
||||
func New() *Client { return &Client{lookupLimit: storage.DefaultLookupLimit} }
|
||||
|
||||
// MigrateToLatest pretends to migrate to latest db schema version.
|
||||
func (store *Client) MigrateToLatest(ctx context.Context) error { return nil }
|
||||
|
||||
// SetLookupLimit sets the lookup limit.
|
||||
func (store *Client) SetLookupLimit(v int) { store.lookupLimit = v }
|
||||
|
||||
|
@ -23,7 +23,6 @@ func RunTests(t *testing.T, store storage.KeyValueStore) {
|
||||
// store = storelogger.NewTest(t, store)
|
||||
ctx := testcontext.New(t)
|
||||
defer ctx.Cleanup()
|
||||
|
||||
t.Run("CRUD", func(t *testing.T) { testCRUD(t, ctx, store) })
|
||||
t.Run("Constraints", func(t *testing.T) { testConstraints(t, ctx, store) })
|
||||
t.Run("Iterate", func(t *testing.T) { testIterate(t, ctx, store) })
|
||||
|
Loading…
Reference in New Issue
Block a user