private/dbutil/txutil: create new transactions for retries
it was noticed that if you had a long lived transaction A that was blocking some other transaction B and A was being aborted due to retriable errors, then transaction B was never given priority. this was due to using savepoints to do lightweight retries. this behavior was problematic becaue we had some queries blocked for over 16 hours, so this commit addresses the issue with two prongs: 1. bound the amount of time we will retry a transaction 2. create new transactions when a retry is needed the first ensures that we never wait for 16 hours, and the value chosen is 10 minutes. that should be long enough for an ample amount of retries for small queries, and huge queries probably shouldn't be retried, even if possible: it's more preferrable to find a way to make them smaller. the second ensures that even in the case of retries, queries that are blocked on the aborted transaction gain priority to run. between those two changes, the maximum stall time due to retries should be bounded to around 10 minutes. Change-Id: Icf898501ef505a89738820a3fae2580988f9f5f4
This commit is contained in:
parent
71ff044edb
commit
d20db90cff
1
go.mod
1
go.mod
@ -26,7 +26,6 @@ require (
|
||||
github.com/boltdb/bolt v1.3.1
|
||||
github.com/cheggaaa/pb v1.0.5-0.20160713104425-73ae1d68fe0b // indirect
|
||||
github.com/cheggaaa/pb/v3 v3.0.1
|
||||
github.com/cockroachdb/cockroach-go v0.0.0-20181001143604-e0a95dfd547c
|
||||
github.com/djherbis/atime v1.0.0 // indirect
|
||||
github.com/dustin/go-humanize v1.0.0 // indirect
|
||||
github.com/eclipse/paho.mqtt.golang v1.1.1 // indirect
|
||||
|
2
go.sum
2
go.sum
@ -599,8 +599,6 @@ honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWh
|
||||
honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
|
||||
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
|
||||
storj.io/common v0.0.0-20200108114547-1c62e5708bce/go.mod h1:mDnchZF+e7g7u630Wwgu/X/zCmqpU0lvy5kQ3jQgs5k=
|
||||
storj.io/common v0.0.0-20200124181041-5b2cd8f217e2 h1:NnI0ikttUAambPxieAxRbbwacpSGxlInX9o39KhT+IU=
|
||||
storj.io/common v0.0.0-20200124181041-5b2cd8f217e2/go.mod h1:0yn1ANoDXETNBREGQHq8d7m1Kq0vWMu6Ul7C2YPZo/E=
|
||||
storj.io/common v0.0.0-20200127192906-afcfc1488e5e h1:m9led2A4O/rssM4loH44yncSM75GJLPYFgxSJbKMBjI=
|
||||
storj.io/common v0.0.0-20200127192906-afcfc1488e5e/go.mod h1:0yn1ANoDXETNBREGQHq8d7m1Kq0vWMu6Ul7C2YPZo/E=
|
||||
storj.io/common v0.0.0-20200128165341-932febe7e00f h1:1r+nfY0bKctX+sex/FyZ3ImLK3yGMh/YKlMvRJQFXZ4=
|
||||
|
@ -5,7 +5,6 @@ package sqliteutil
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"database/sql/driver"
|
||||
"fmt"
|
||||
|
||||
@ -13,17 +12,9 @@ import (
|
||||
"github.com/zeebo/errs"
|
||||
|
||||
"storj.io/storj/private/dbutil/txutil"
|
||||
"storj.io/storj/private/migrate"
|
||||
"storj.io/storj/private/tagsql"
|
||||
)
|
||||
|
||||
// DB is the minimal interface required to perform migrations.
|
||||
type DB interface {
|
||||
migrate.DB
|
||||
Conn(ctx context.Context) (tagsql.Conn, error)
|
||||
ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
|
||||
}
|
||||
|
||||
var (
|
||||
// ErrMigrateTables is error class for MigrateTables
|
||||
ErrMigrateTables = errs.Class("migrate tables")
|
||||
@ -49,7 +40,7 @@ func getSqlite3Conn(conn interface{}) (*sqlite3.SQLiteConn, error) {
|
||||
// MigrateTablesToDatabase copies the specified tables from srcDB into destDB.
|
||||
// All tables in destDB will be dropped other than those specified in
|
||||
// tablesToKeep.
|
||||
func MigrateTablesToDatabase(ctx context.Context, srcDB, destDB DB, tablesToKeep ...string) error {
|
||||
func MigrateTablesToDatabase(ctx context.Context, srcDB, destDB tagsql.DB, tablesToKeep ...string) error {
|
||||
err := backupDBs(ctx, srcDB, destDB)
|
||||
if err != nil {
|
||||
return ErrMigrateTables.Wrap(err)
|
||||
@ -59,7 +50,7 @@ func MigrateTablesToDatabase(ctx context.Context, srcDB, destDB DB, tablesToKeep
|
||||
return ErrMigrateTables.Wrap(KeepTables(ctx, destDB, tablesToKeep...))
|
||||
}
|
||||
|
||||
func backupDBs(ctx context.Context, srcDB, destDB DB) error {
|
||||
func backupDBs(ctx context.Context, srcDB, destDB tagsql.DB) error {
|
||||
// Retrieve the raw Sqlite3 driver connections for the src and dest so that
|
||||
// we can execute the backup API for a corruption safe clone.
|
||||
srcConn, err := srcDB.Conn(ctx)
|
||||
@ -164,7 +155,7 @@ func backupConns(ctx context.Context, sourceDB *sqlite3.SQLiteConn, destDB *sqli
|
||||
}
|
||||
|
||||
// KeepTables drops all the tables except the specified tables to keep.
|
||||
func KeepTables(ctx context.Context, db DB, tablesToKeep ...string) (err error) {
|
||||
func KeepTables(ctx context.Context, db tagsql.DB, tablesToKeep ...string) (err error) {
|
||||
err = dropTables(ctx, db, tablesToKeep...)
|
||||
if err != nil {
|
||||
return ErrKeepTables.Wrap(err)
|
||||
@ -182,12 +173,8 @@ func KeepTables(ctx context.Context, db DB, tablesToKeep ...string) (err error)
|
||||
}
|
||||
|
||||
// dropTables performs the table drops in a single transaction
|
||||
func dropTables(ctx context.Context, db DB, tablesToKeep ...string) (err error) {
|
||||
tx, err := db.BeginTx(ctx, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = txutil.ExecuteInTx(ctx, db.Driver(), tx, func() error {
|
||||
func dropTables(ctx context.Context, db tagsql.DB, tablesToKeep ...string) (err error) {
|
||||
err = txutil.WithTx(ctx, db, nil, func(ctx context.Context, tx tagsql.Tx) error {
|
||||
// Get a list of tables excluding sqlite3 system tables.
|
||||
rows, err := tx.QueryContext(ctx, "SELECT name FROM sqlite_master WHERE type ='table' AND name NOT LIKE 'sqlite_%';")
|
||||
if err != nil {
|
||||
|
@ -8,47 +8,17 @@ package txutil
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"database/sql/driver"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/cockroachdb/cockroach-go/crdb"
|
||||
"github.com/lib/pq"
|
||||
"github.com/zeebo/errs"
|
||||
"gopkg.in/spacemonkeygo/monkit.v2"
|
||||
|
||||
"storj.io/storj/private/dbutil/cockroachutil"
|
||||
"storj.io/storj/private/tagsql"
|
||||
)
|
||||
|
||||
// txLike is the minimal interface for transaction-like objects to work with the necessary retry
|
||||
// semantics on things like CockroachDB.
|
||||
type txLike interface {
|
||||
ExecContext(context.Context, string, ...interface{}) (sql.Result, error)
|
||||
Commit() error
|
||||
Rollback() error
|
||||
}
|
||||
|
||||
// ExecuteInTx runs the fn callback inside the specified transaction, restarting the transaction
|
||||
// as necessary (for systems like CockroachDB), and committing or rolling back the transaction
|
||||
// depending on whether fn returns an error.
|
||||
//
|
||||
// In most cases, WithTx() is to be preferred, but this variant is useful when working with a db
|
||||
// that isn't an *sql.DB.
|
||||
func ExecuteInTx(ctx context.Context, dbDriver driver.Driver, tx txLike, fn func() error) (err error) {
|
||||
if _, ok := dbDriver.(*cockroachutil.Driver); ok {
|
||||
return crdb.ExecuteInTx(ctx, tx, fn)
|
||||
}
|
||||
defer func() {
|
||||
if x := recover(); x != nil {
|
||||
// does nothing if tx is already rolled back or committed.
|
||||
_ = tx.Rollback()
|
||||
panic(x)
|
||||
}
|
||||
if err == nil {
|
||||
err = tx.Commit()
|
||||
} else {
|
||||
err = errs.Combine(err, tx.Rollback())
|
||||
}
|
||||
}()
|
||||
return fn()
|
||||
}
|
||||
var mon = monkit.Package()
|
||||
|
||||
// WithTx starts a transaction on the given sql.DB. The transaction is started in the appropriate
|
||||
// manner, and will be restarted if appropriate. While in the transaction, fn is called with a
|
||||
@ -57,12 +27,54 @@ func ExecuteInTx(ctx context.Context, dbDriver driver.Driver, tx txLike, fn func
|
||||
//
|
||||
// If fn has any side effects outside of changes to the database, they must be idempotent! fn may
|
||||
// be called more than one time.
|
||||
func WithTx(ctx context.Context, db tagsql.DB, txOpts *sql.TxOptions, fn func(context.Context, tagsql.Tx) error) error {
|
||||
func WithTx(ctx context.Context, db tagsql.DB, txOpts *sql.TxOptions, fn func(context.Context, tagsql.Tx) error) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
start := time.Now()
|
||||
|
||||
for i := 0; ; i++ {
|
||||
err, rollbackErr := withTxOnce(ctx, db, txOpts, fn)
|
||||
if time.Since(start) < 5*time.Minute && i < 10 {
|
||||
if code := errCode(err); code == "CR000" || code == "40001" {
|
||||
mon.Event(fmt.Sprintf("transaction_retry_%d", i+1))
|
||||
continue
|
||||
}
|
||||
}
|
||||
mon.IntVal("transaction_retries").Observe(int64(i))
|
||||
return errs.Wrap(errs.Combine(err, rollbackErr))
|
||||
}
|
||||
}
|
||||
|
||||
// withTxOnce creates a transaction, ensures that it is eventually released (commit or rollback)
|
||||
// and passes it to the provided callback. It does not handle retries or anything, delegating
|
||||
// that to callers.
|
||||
func withTxOnce(ctx context.Context, db tagsql.DB, txOpts *sql.TxOptions, fn func(context.Context, tagsql.Tx) error) (err, rollbackErr error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
tx, err := db.BeginTx(ctx, txOpts)
|
||||
if err != nil {
|
||||
return err
|
||||
return errs.Wrap(err), nil
|
||||
}
|
||||
return ExecuteInTx(ctx, db.Driver(), tx, func() error {
|
||||
return fn(ctx, tx)
|
||||
})
|
||||
defer func() {
|
||||
if err == nil {
|
||||
err = tx.Commit()
|
||||
} else {
|
||||
rollbackErr = tx.Rollback()
|
||||
}
|
||||
}()
|
||||
|
||||
return fn(ctx, tx), nil
|
||||
}
|
||||
|
||||
// errCode returns the error code associated with any postgres error in the chain of
|
||||
// errors walked by unwrapping.
|
||||
func errCode(err error) (code string) {
|
||||
errs.IsFunc(err, func(err error) bool {
|
||||
if pgerr, ok := err.(*pq.Error); ok {
|
||||
code = string(pgerr.Code)
|
||||
return true
|
||||
}
|
||||
return false
|
||||
})
|
||||
return code
|
||||
}
|
||||
|
@ -9,6 +9,7 @@ import (
|
||||
|
||||
"github.com/zeebo/errs"
|
||||
|
||||
"storj.io/storj/private/dbutil/txutil"
|
||||
"storj.io/storj/private/tagsql"
|
||||
)
|
||||
|
||||
@ -21,7 +22,7 @@ func Create(ctx context.Context, identifier string, db DBX) error {
|
||||
// when the schemas match.
|
||||
justRollbackPlease := errs.Class("only used to tell WithTx to do a rollback")
|
||||
|
||||
err := WithTx(ctx, db, func(ctx context.Context, tx tagsql.Tx) (err error) {
|
||||
err := txutil.WithTx(ctx, db, nil, func(ctx context.Context, tx tagsql.Tx) (err error) {
|
||||
schema := db.Schema()
|
||||
|
||||
_, err = tx.ExecContext(ctx, db.Rebind(`CREATE TABLE IF NOT EXISTS table_schemas (id text, schemaText text);`))
|
||||
|
@ -4,44 +4,20 @@
|
||||
package migrate
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"database/sql/driver"
|
||||
|
||||
"storj.io/storj/private/dbutil/txutil"
|
||||
"storj.io/storj/private/tagsql"
|
||||
)
|
||||
|
||||
// DB is the minimal implementation that is needed by migrations.
|
||||
//
|
||||
// DB can optionally have `Rebind(string) string` for translating `? queries for the specific database.
|
||||
type DB interface {
|
||||
BeginTx(ctx context.Context, txOptions *sql.TxOptions) (tagsql.Tx, error)
|
||||
Driver() driver.Driver
|
||||
}
|
||||
|
||||
// DBX contains additional methods for migrations.
|
||||
type DBX interface {
|
||||
DB
|
||||
tagsql.DB
|
||||
Schema() string
|
||||
Rebind(string) string
|
||||
}
|
||||
|
||||
// rebind uses Rebind method when the database has the func.
|
||||
func rebind(db DB, s string) string {
|
||||
func rebind(db tagsql.DB, s string) string {
|
||||
if dbx, ok := db.(interface{ Rebind(string) string }); ok {
|
||||
return dbx.Rebind(s)
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
||||
// WithTx runs the given callback in the context of a transaction.
|
||||
func WithTx(ctx context.Context, db DB, fn func(ctx context.Context, tx tagsql.Tx) error) error {
|
||||
tx, err := db.BeginTx(ctx, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return txutil.ExecuteInTx(ctx, db.Driver(), tx, func() error {
|
||||
return fn(ctx, tx)
|
||||
})
|
||||
}
|
||||
|
@ -14,6 +14,7 @@ import (
|
||||
"github.com/zeebo/errs"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"storj.io/storj/private/dbutil/txutil"
|
||||
"storj.io/storj/private/tagsql"
|
||||
)
|
||||
|
||||
@ -59,7 +60,7 @@ type Migration struct {
|
||||
|
||||
// Step describes a single step in migration.
|
||||
type Step struct {
|
||||
DB DB // The DB to execute this step on
|
||||
DB tagsql.DB // The DB to execute this step on
|
||||
Description string
|
||||
Version int // Versions should start at 0
|
||||
Action Action
|
||||
@ -67,7 +68,7 @@ type Step struct {
|
||||
|
||||
// Action is something that needs to be done
|
||||
type Action interface {
|
||||
Run(ctx context.Context, log *zap.Logger, db DB, tx tagsql.Tx) error
|
||||
Run(ctx context.Context, log *zap.Logger, db tagsql.DB, tx tagsql.Tx) error
|
||||
}
|
||||
|
||||
// TargetVersion returns migration with steps upto specified version
|
||||
@ -166,7 +167,7 @@ func (migration *Migration) Run(ctx context.Context, log *zap.Logger) error {
|
||||
stepLog.Info(step.Description)
|
||||
}
|
||||
|
||||
err = WithTx(ctx, step.DB, func(ctx context.Context, tx tagsql.Tx) error {
|
||||
err = txutil.WithTx(ctx, step.DB, nil, func(ctx context.Context, tx tagsql.Tx) error {
|
||||
err = step.Action.Run(ctx, stepLog, step.DB, tx)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -198,8 +199,8 @@ func (migration *Migration) Run(ctx context.Context, log *zap.Logger) error {
|
||||
}
|
||||
|
||||
// createVersionTable creates a new version table
|
||||
func (migration *Migration) ensureVersionTable(ctx context.Context, log *zap.Logger, db DB) error {
|
||||
err := WithTx(ctx, db, func(ctx context.Context, tx tagsql.Tx) error {
|
||||
func (migration *Migration) ensureVersionTable(ctx context.Context, log *zap.Logger, db tagsql.DB) error {
|
||||
err := txutil.WithTx(ctx, db, nil, func(ctx context.Context, tx tagsql.Tx) error {
|
||||
_, err := tx.Exec(ctx, rebind(db, `CREATE TABLE IF NOT EXISTS `+migration.Table+` (version int, commited_at text)`)) //nolint:misspell
|
||||
return err
|
||||
})
|
||||
@ -207,9 +208,9 @@ func (migration *Migration) ensureVersionTable(ctx context.Context, log *zap.Log
|
||||
}
|
||||
|
||||
// getLatestVersion finds the latest version table
|
||||
func (migration *Migration) getLatestVersion(ctx context.Context, log *zap.Logger, db DB) (int, error) {
|
||||
func (migration *Migration) getLatestVersion(ctx context.Context, log *zap.Logger, db tagsql.DB) (int, error) {
|
||||
var version sql.NullInt64
|
||||
err := WithTx(ctx, db, func(ctx context.Context, tx tagsql.Tx) error {
|
||||
err := txutil.WithTx(ctx, db, nil, func(ctx context.Context, tx tagsql.Tx) error {
|
||||
err := tx.QueryRow(ctx, rebind(db, `SELECT MAX(version) FROM `+migration.Table)).Scan(&version)
|
||||
if err == sql.ErrNoRows || !version.Valid {
|
||||
version.Int64 = -1
|
||||
@ -222,7 +223,7 @@ func (migration *Migration) getLatestVersion(ctx context.Context, log *zap.Logge
|
||||
}
|
||||
|
||||
// addVersion adds information about a new migration
|
||||
func (migration *Migration) addVersion(ctx context.Context, tx tagsql.Tx, db DB, version int) error {
|
||||
func (migration *Migration) addVersion(ctx context.Context, tx tagsql.Tx, db tagsql.DB, version int) error {
|
||||
_, err := tx.Exec(ctx, rebind(db, `
|
||||
INSERT INTO `+migration.Table+` (version, commited_at) VALUES (?, ?)`), //nolint:misspell
|
||||
version, time.Now().String(),
|
||||
@ -231,7 +232,7 @@ func (migration *Migration) addVersion(ctx context.Context, tx tagsql.Tx, db DB,
|
||||
}
|
||||
|
||||
// CurrentVersion finds the latest version for the db
|
||||
func (migration *Migration) CurrentVersion(ctx context.Context, log *zap.Logger, db DB) (int, error) {
|
||||
func (migration *Migration) CurrentVersion(ctx context.Context, log *zap.Logger, db tagsql.DB) (int, error) {
|
||||
err := migration.ensureVersionTable(ctx, log, db)
|
||||
if err != nil {
|
||||
return -1, Error.Wrap(err)
|
||||
@ -243,7 +244,7 @@ func (migration *Migration) CurrentVersion(ctx context.Context, log *zap.Logger,
|
||||
type SQL []string
|
||||
|
||||
// Run runs the SQL statements
|
||||
func (sql SQL) Run(ctx context.Context, log *zap.Logger, db DB, tx tagsql.Tx) (err error) {
|
||||
func (sql SQL) Run(ctx context.Context, log *zap.Logger, db tagsql.DB, tx tagsql.Tx) (err error) {
|
||||
for _, query := range sql {
|
||||
_, err := tx.Exec(ctx, rebind(db, query))
|
||||
if err != nil {
|
||||
@ -254,9 +255,9 @@ func (sql SQL) Run(ctx context.Context, log *zap.Logger, db DB, tx tagsql.Tx) (e
|
||||
}
|
||||
|
||||
// Func is an arbitrary operation
|
||||
type Func func(ctx context.Context, log *zap.Logger, db DB, tx tagsql.Tx) error
|
||||
type Func func(ctx context.Context, log *zap.Logger, db tagsql.DB, tx tagsql.Tx) error
|
||||
|
||||
// Run runs the migration
|
||||
func (fn Func) Run(ctx context.Context, log *zap.Logger, db DB, tx tagsql.Tx) error {
|
||||
func (fn Func) Run(ctx context.Context, log *zap.Logger, db tagsql.DB, tx tagsql.Tx) error {
|
||||
return fn(ctx, log, db, tx)
|
||||
}
|
||||
|
@ -76,7 +76,7 @@ func testBasicMigrationGeneric(ctx *testcontext.Context, t *testing.T, connStr s
|
||||
basicMigration(ctx, t, db.DB, &postgresDB{DB: db.DB})
|
||||
}
|
||||
|
||||
func basicMigration(ctx *testcontext.Context, t *testing.T, db tagsql.DB, testDB migrate.DB) {
|
||||
func basicMigration(ctx *testcontext.Context, t *testing.T, db tagsql.DB, testDB tagsql.DB) {
|
||||
dbName := strings.ToLower(`versions_` + t.Name())
|
||||
defer func() { assert.NoError(t, dropTables(ctx, db, dbName, "users")) }()
|
||||
|
||||
@ -98,7 +98,7 @@ func basicMigration(ctx *testcontext.Context, t *testing.T, db tagsql.DB, testDB
|
||||
DB: testDB,
|
||||
Description: "Move files",
|
||||
Version: 2,
|
||||
Action: migrate.Func(func(_ context.Context, log *zap.Logger, _ migrate.DB, tx tagsql.Tx) error {
|
||||
Action: migrate.Func(func(_ context.Context, log *zap.Logger, _ tagsql.DB, tx tagsql.Tx) error {
|
||||
return os.Rename(ctx.File("alpha.txt"), ctx.File("beta.txt"))
|
||||
}),
|
||||
},
|
||||
@ -171,7 +171,7 @@ func TestMultipleMigrationPostgres(t *testing.T) {
|
||||
multipleMigration(t, db, &postgresDB{DB: db})
|
||||
}
|
||||
|
||||
func multipleMigration(t *testing.T, db tagsql.DB, testDB migrate.DB) {
|
||||
func multipleMigration(t *testing.T, db tagsql.DB, testDB tagsql.DB) {
|
||||
ctx := testcontext.New(t)
|
||||
defer ctx.Cleanup()
|
||||
|
||||
@ -186,7 +186,7 @@ func multipleMigration(t *testing.T, db tagsql.DB, testDB migrate.DB) {
|
||||
DB: testDB,
|
||||
Description: "Step 1",
|
||||
Version: 1,
|
||||
Action: migrate.Func(func(ctx context.Context, log *zap.Logger, _ migrate.DB, tx tagsql.Tx) error {
|
||||
Action: migrate.Func(func(ctx context.Context, log *zap.Logger, _ tagsql.DB, tx tagsql.Tx) error {
|
||||
steps++
|
||||
return nil
|
||||
}),
|
||||
@ -195,7 +195,7 @@ func multipleMigration(t *testing.T, db tagsql.DB, testDB migrate.DB) {
|
||||
DB: testDB,
|
||||
Description: "Step 2",
|
||||
Version: 2,
|
||||
Action: migrate.Func(func(ctx context.Context, log *zap.Logger, _ migrate.DB, tx tagsql.Tx) error {
|
||||
Action: migrate.Func(func(ctx context.Context, log *zap.Logger, _ tagsql.DB, tx tagsql.Tx) error {
|
||||
steps++
|
||||
return nil
|
||||
}),
|
||||
@ -211,7 +211,7 @@ func multipleMigration(t *testing.T, db tagsql.DB, testDB migrate.DB) {
|
||||
DB: testDB,
|
||||
Description: "Step 3",
|
||||
Version: 3,
|
||||
Action: migrate.Func(func(ctx context.Context, log *zap.Logger, _ migrate.DB, tx tagsql.Tx) error {
|
||||
Action: migrate.Func(func(ctx context.Context, log *zap.Logger, _ tagsql.DB, tx tagsql.Tx) error {
|
||||
steps++
|
||||
return nil
|
||||
}),
|
||||
@ -247,7 +247,7 @@ func TestFailedMigrationPostgres(t *testing.T) {
|
||||
failedMigration(t, db, &postgresDB{DB: db})
|
||||
}
|
||||
|
||||
func failedMigration(t *testing.T, db tagsql.DB, testDB migrate.DB) {
|
||||
func failedMigration(t *testing.T, db tagsql.DB, testDB tagsql.DB) {
|
||||
ctx := testcontext.New(t)
|
||||
defer ctx.Cleanup()
|
||||
|
||||
@ -261,7 +261,7 @@ func failedMigration(t *testing.T, db tagsql.DB, testDB migrate.DB) {
|
||||
DB: testDB,
|
||||
Description: "Step 1",
|
||||
Version: 1,
|
||||
Action: migrate.Func(func(ctx context.Context, log *zap.Logger, _ migrate.DB, tx tagsql.Tx) error {
|
||||
Action: migrate.Func(func(ctx context.Context, log *zap.Logger, _ tagsql.DB, tx tagsql.Tx) error {
|
||||
return fmt.Errorf("migration failed")
|
||||
}),
|
||||
},
|
||||
|
@ -13,6 +13,7 @@ import (
|
||||
// load our cockroach sql driver for anywhere that uses this dbx.Open
|
||||
_ "storj.io/storj/private/dbutil/cockroachutil"
|
||||
"storj.io/storj/private/dbutil/txutil"
|
||||
"storj.io/storj/private/tagsql"
|
||||
)
|
||||
|
||||
//go:generate sh gen.sh
|
||||
@ -57,11 +58,10 @@ func (err *constraintError) Error() string {
|
||||
|
||||
// WithTx wraps DB code in a transaction
|
||||
func (db *DB) WithTx(ctx context.Context, fn func(context.Context, *Tx) error) (err error) {
|
||||
tx, err := db.Open(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return txutil.ExecuteInTx(ctx, db.Driver(), tx.Tx, func() error {
|
||||
return fn(ctx, tx)
|
||||
return txutil.WithTx(ctx, db, nil, func(ctx context.Context, tx tagsql.Tx) error {
|
||||
return fn(ctx, &Tx{
|
||||
Tx: tx,
|
||||
txMethods: db.wrapTx(tx),
|
||||
})
|
||||
})
|
||||
}
|
||||
|
@ -14,6 +14,7 @@ import (
|
||||
"storj.io/storj/private/dbutil"
|
||||
"storj.io/storj/private/dbutil/pgutil"
|
||||
"storj.io/storj/private/migrate"
|
||||
"storj.io/storj/private/tagsql"
|
||||
)
|
||||
|
||||
var (
|
||||
@ -143,7 +144,7 @@ func (db *satelliteDB) CheckVersion(ctx context.Context) error {
|
||||
}
|
||||
|
||||
func flattenMigration(m *migrate.Migration) (*migrate.Migration, error) {
|
||||
var db migrate.DB
|
||||
var db tagsql.DB
|
||||
var version int
|
||||
var statements migrate.SQL
|
||||
|
||||
|
@ -7,8 +7,6 @@ package storagenodedb
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"database/sql/driver"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
||||
@ -51,28 +49,14 @@ var (
|
||||
|
||||
var _ storagenode.DB = (*DB)(nil)
|
||||
|
||||
// SQLDB is an abstract database so that we can mock out what database
|
||||
// implementation we're using.
|
||||
type SQLDB interface {
|
||||
Close() error
|
||||
|
||||
Conn(ctx context.Context) (tagsql.Conn, error)
|
||||
Driver() driver.Driver
|
||||
|
||||
BeginTx(ctx context.Context, txOptions *sql.TxOptions) (tagsql.Tx, error)
|
||||
ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
|
||||
QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
|
||||
QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row
|
||||
}
|
||||
|
||||
// DBContainer defines an interface to allow accessing and setting a SQLDB
|
||||
type DBContainer interface {
|
||||
Configure(sqlDB SQLDB)
|
||||
GetDB() SQLDB
|
||||
Configure(sqlDB tagsql.DB)
|
||||
GetDB() tagsql.DB
|
||||
}
|
||||
|
||||
// withTx is a helper method which executes callback in transaction scope
|
||||
func withTx(ctx context.Context, db SQLDB, cb func(tx tagsql.Tx) error) error {
|
||||
func withTx(ctx context.Context, db tagsql.DB, cb func(tx tagsql.Tx) error) error {
|
||||
tx, err := db.BeginTx(ctx, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -247,7 +231,7 @@ func (db *DB) openDatabases() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (db *DB) rawDatabaseFromName(dbName string) SQLDB {
|
||||
func (db *DB) rawDatabaseFromName(dbName string) tagsql.DB {
|
||||
return db.SQLDBs[dbName].GetDB()
|
||||
}
|
||||
|
||||
@ -744,7 +728,7 @@ func (db *DB) Migration(ctx context.Context) *migrate.Migration {
|
||||
DB: db.deprecatedInfoDB,
|
||||
Description: "Free Storagenodes from trash data",
|
||||
Version: 13,
|
||||
Action: migrate.Func(func(ctx context.Context, log *zap.Logger, mgdb migrate.DB, tx tagsql.Tx) error {
|
||||
Action: migrate.Func(func(ctx context.Context, log *zap.Logger, mgdb tagsql.DB, tx tagsql.Tx) error {
|
||||
err := os.RemoveAll(filepath.Join(db.dbDirectory, "blob/ukfu6bhbboxilvt7jrwlqk7y2tapb5d2r2tsmj2sjxvw5qaaaaaa")) // us-central1
|
||||
if err != nil {
|
||||
log.Sugar().Debug(err)
|
||||
@ -769,7 +753,7 @@ func (db *DB) Migration(ctx context.Context) *migrate.Migration {
|
||||
DB: db.deprecatedInfoDB,
|
||||
Description: "Free Storagenodes from orphaned tmp data",
|
||||
Version: 14,
|
||||
Action: migrate.Func(func(ctx context.Context, log *zap.Logger, mgdb migrate.DB, tx tagsql.Tx) error {
|
||||
Action: migrate.Func(func(ctx context.Context, log *zap.Logger, mgdb tagsql.DB, tx tagsql.Tx) error {
|
||||
err := os.RemoveAll(filepath.Join(db.dbDirectory, "tmp"))
|
||||
if err != nil {
|
||||
log.Sugar().Debug(err)
|
||||
@ -910,7 +894,7 @@ func (db *DB) Migration(ctx context.Context) *migrate.Migration {
|
||||
DB: db.deprecatedInfoDB,
|
||||
Description: "Vacuum info db",
|
||||
Version: 22,
|
||||
Action: migrate.Func(func(ctx context.Context, log *zap.Logger, _ migrate.DB, tx tagsql.Tx) error {
|
||||
Action: migrate.Func(func(ctx context.Context, log *zap.Logger, _ tagsql.DB, tx tagsql.Tx) error {
|
||||
_, err := db.deprecatedInfoDB.GetDB().ExecContext(ctx, "VACUUM;")
|
||||
return err
|
||||
}),
|
||||
@ -919,7 +903,7 @@ func (db *DB) Migration(ctx context.Context) *migrate.Migration {
|
||||
DB: db.deprecatedInfoDB,
|
||||
Description: "Split into multiple sqlite databases",
|
||||
Version: 23,
|
||||
Action: migrate.Func(func(ctx context.Context, log *zap.Logger, _ migrate.DB, tx tagsql.Tx) error {
|
||||
Action: migrate.Func(func(ctx context.Context, log *zap.Logger, _ tagsql.DB, tx tagsql.Tx) error {
|
||||
// Migrate all the tables to new database files.
|
||||
if err := db.migrateToDB(ctx, BandwidthDBName, "bandwidth_usage", "bandwidth_usage_rollups"); err != nil {
|
||||
return ErrDatabase.Wrap(err)
|
||||
@ -956,7 +940,7 @@ func (db *DB) Migration(ctx context.Context) *migrate.Migration {
|
||||
DB: db.deprecatedInfoDB,
|
||||
Description: "Drop unneeded tables in deprecatedInfoDB",
|
||||
Version: 24,
|
||||
Action: migrate.Func(func(ctx context.Context, log *zap.Logger, _ migrate.DB, tx tagsql.Tx) error {
|
||||
Action: migrate.Func(func(ctx context.Context, log *zap.Logger, _ tagsql.DB, tx tagsql.Tx) error {
|
||||
// We drop the migrated tables from the deprecated database and VACUUM SQLite3
|
||||
// in migration step 23 because if we were to keep that as part of step 22
|
||||
// and an error occurred it would replay the entire migration but some tables
|
||||
|
@ -3,31 +3,19 @@
|
||||
|
||||
package storagenodedb
|
||||
|
||||
import "storj.io/storj/private/tagsql"
|
||||
|
||||
// dbContainerImpl fulfills the migrate.DB interface and the SQLDB interface.
|
||||
type dbContainerImpl struct {
|
||||
SQLDB
|
||||
}
|
||||
|
||||
// Schema returns schema
|
||||
// These are implemented because the migrate.DB interface requires them.
|
||||
// Maybe in the future we should untangle those.
|
||||
func (db *dbContainerImpl) Schema() string {
|
||||
return ""
|
||||
}
|
||||
|
||||
// Rebind rebind parameters
|
||||
// These are implemented because the migrate.DB interface requires them.
|
||||
// Maybe in the future we should untangle those.
|
||||
func (db *dbContainerImpl) Rebind(s string) string {
|
||||
return s
|
||||
tagsql.DB
|
||||
}
|
||||
|
||||
// Configure sets the underlining SQLDB connection.
|
||||
func (db *dbContainerImpl) Configure(sqlDB SQLDB) {
|
||||
db.SQLDB = sqlDB
|
||||
func (db *dbContainerImpl) Configure(newDB tagsql.DB) {
|
||||
db.DB = newDB
|
||||
}
|
||||
|
||||
// GetDB returns underlying implementation of dbContainerImpl.
|
||||
func (db *dbContainerImpl) GetDB() SQLDB {
|
||||
return db.SQLDB
|
||||
func (db *dbContainerImpl) GetDB() tagsql.DB {
|
||||
return db.DB
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user