implement basic migration (#1261)
* Implement database migrations. * Add few notes * fix linter errors * add non working example * initial improvements + basic tests * update test * add more tests * more changes * small changes * revert unnecessary changes * revert unnecessary changes * don't log ignored steps * remove OnCreate action * rename method + validate steps * fix version comparing
This commit is contained in:
parent
b2f9453184
commit
0c6d5fd9ad
@ -1,7 +1,7 @@
|
||||
// Copyright (C) 2019 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package migrate
|
||||
package migrate_test
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
@ -12,6 +12,8 @@ import (
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
"storj.io/storj/internal/migrate"
|
||||
|
||||
_ "github.com/lib/pq"
|
||||
_ "github.com/mattn/go-sqlite3"
|
||||
)
|
||||
@ -24,19 +26,19 @@ func TestCreate_Sqlite(t *testing.T) {
|
||||
defer func() { assert.NoError(t, db.Close()) }()
|
||||
|
||||
// should create table
|
||||
err = Create("example", &sqliteDB{db, "CREATE TABLE example_table (id text)"})
|
||||
err = migrate.Create("example", &sqliteDB{db, "CREATE TABLE example_table (id text)"})
|
||||
assert.NoError(t, err)
|
||||
|
||||
// shouldn't create a new table
|
||||
err = Create("example", &sqliteDB{db, "CREATE TABLE example_table (id text)"})
|
||||
err = migrate.Create("example", &sqliteDB{db, "CREATE TABLE example_table (id text)"})
|
||||
assert.NoError(t, err)
|
||||
|
||||
// should fail, because schema changed
|
||||
err = Create("example", &sqliteDB{db, "CREATE TABLE example_table (id text, version int)"})
|
||||
err = migrate.Create("example", &sqliteDB{db, "CREATE TABLE example_table (id text, version int)"})
|
||||
assert.Error(t, err)
|
||||
|
||||
// should fail, because of trying to CREATE TABLE with same name
|
||||
err = Create("conflict", &sqliteDB{db, "CREATE TABLE example_table (id text, version int)"})
|
||||
err = migrate.Create("conflict", &sqliteDB{db, "CREATE TABLE example_table (id text, version int)"})
|
||||
assert.Error(t, err)
|
||||
}
|
||||
|
||||
@ -57,19 +59,19 @@ func TestCreate_Postgres(t *testing.T) {
|
||||
defer func() { assert.NoError(t, db.Close()) }()
|
||||
|
||||
// should create table
|
||||
err = Create("example", &postgresDB{db, "CREATE TABLE example_table (id text)"})
|
||||
err = migrate.Create("example", &postgresDB{db, "CREATE TABLE example_table (id text)"})
|
||||
assert.NoError(t, err)
|
||||
|
||||
// shouldn't create a new table
|
||||
err = Create("example", &postgresDB{db, "CREATE TABLE example_table (id text)"})
|
||||
err = migrate.Create("example", &postgresDB{db, "CREATE TABLE example_table (id text)"})
|
||||
assert.NoError(t, err)
|
||||
|
||||
// should fail, because schema changed
|
||||
err = Create("example", &postgresDB{db, "CREATE TABLE example_table (id text, version integer)"})
|
||||
err = migrate.Create("example", &postgresDB{db, "CREATE TABLE example_table (id text, version integer)"})
|
||||
assert.Error(t, err)
|
||||
|
||||
// should fail, because of trying to CREATE TABLE with same name
|
||||
err = Create("conflict", &postgresDB{db, "CREATE TABLE example_table (id text, version integer)"})
|
||||
err = migrate.Create("conflict", &postgresDB{db, "CREATE TABLE example_table (id text, version integer)"})
|
||||
assert.Error(t, err)
|
||||
}
|
||||
|
||||
|
203
internal/migrate/versions.go
Normal file
203
internal/migrate/versions.go
Normal file
@ -0,0 +1,203 @@
|
||||
// Copyright (C) 2019 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package migrate
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"regexp"
|
||||
"sort"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/zeebo/errs"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
/*
|
||||
|
||||
Scenarios it doesn't handle properly.
|
||||
|
||||
1. Rollback to initial state on multi-step migration.
|
||||
|
||||
Let's say there's a scenario where we run migration steps:
|
||||
1. update a table schema
|
||||
2. move files
|
||||
3. update a table schema
|
||||
4. update a table schema, which fails
|
||||
|
||||
In this case there's no easy way to rollback the moving of files.
|
||||
|
||||
2. Undoing migrations.
|
||||
|
||||
Intentionally left out, because we do not gain that much from currently.
|
||||
|
||||
3. Snapshotting the whole state.
|
||||
|
||||
This probably should be done by the user of this library, when there's disk-space available.
|
||||
|
||||
4. Figuring out what the exact executed steps are.
|
||||
*/
|
||||
|
||||
// Migration describes a migration steps
|
||||
type Migration struct {
|
||||
Table string
|
||||
Steps []*Step
|
||||
}
|
||||
|
||||
// Step describes a single step in migration.
|
||||
type Step struct {
|
||||
Description string
|
||||
Version int // Versions should start at 0
|
||||
Action Action
|
||||
}
|
||||
|
||||
// Action is something that needs to be done
|
||||
type Action interface {
|
||||
Run(log *zap.Logger, db DB, tx *sql.Tx) error
|
||||
}
|
||||
|
||||
// ValidTableName checks whether the specified table name is valid
|
||||
func (migration *Migration) ValidTableName() error {
|
||||
matched, err := regexp.MatchString(`^[a-z_]+$`, migration.Table)
|
||||
if !matched || err != nil {
|
||||
return Error.New("invalid table name: %v", migration.Table)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ValidateSteps checks whether the specified table name is valid
|
||||
func (migration *Migration) ValidateSteps() error {
|
||||
sorted := sort.SliceIsSorted(migration.Steps, func(i, j int) bool {
|
||||
return migration.Steps[i].Version <= migration.Steps[j].Version
|
||||
})
|
||||
if !sorted {
|
||||
return Error.New("steps have incorrect order")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Run runs the migration steps
|
||||
func (migration *Migration) Run(log *zap.Logger, db DB) error {
|
||||
err := migration.ValidTableName()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = migration.ValidateSteps()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = migration.ensureVersionTable(log, db)
|
||||
if err != nil {
|
||||
return Error.New("creating version table failed: %v", err)
|
||||
}
|
||||
|
||||
version, err := migration.getLatestVersion(log, db)
|
||||
if err != nil {
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
|
||||
if version >= 0 {
|
||||
log.Info("Latest Version", zap.Int("version", version))
|
||||
} else {
|
||||
log.Info("No Version")
|
||||
}
|
||||
|
||||
for _, step := range migration.Steps {
|
||||
if step.Version <= version {
|
||||
continue
|
||||
}
|
||||
|
||||
log := log.Named(strconv.Itoa(step.Version))
|
||||
log.Info(step.Description)
|
||||
|
||||
tx, err := db.Begin()
|
||||
if err != nil {
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
|
||||
err = step.Action.Run(log, db, tx)
|
||||
if err != nil {
|
||||
return Error.Wrap(errs.Combine(err, tx.Rollback()))
|
||||
}
|
||||
|
||||
err = migration.addVersion(tx, db, step.Version)
|
||||
if err != nil {
|
||||
return Error.Wrap(errs.Combine(err, tx.Rollback()))
|
||||
}
|
||||
|
||||
if err := tx.Commit(); err != nil {
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// createVersionTable creates a new version table
|
||||
func (migration *Migration) ensureVersionTable(log *zap.Logger, db DB) error {
|
||||
tx, err := db.Begin()
|
||||
if err != nil {
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
|
||||
_, err = tx.Exec(db.Rebind(`CREATE TABLE IF NOT EXISTS ` + migration.Table + ` (version int, commited_at text)`))
|
||||
if err != nil {
|
||||
return Error.Wrap(errs.Combine(err, tx.Rollback()))
|
||||
}
|
||||
|
||||
return Error.Wrap(tx.Commit())
|
||||
}
|
||||
|
||||
// getLatestVersion finds the latest version table
|
||||
func (migration *Migration) getLatestVersion(log *zap.Logger, db DB) (int, error) {
|
||||
tx, err := db.Begin()
|
||||
if err != nil {
|
||||
return -1, Error.Wrap(err)
|
||||
}
|
||||
|
||||
var version sql.NullInt64
|
||||
err = tx.QueryRow(db.Rebind(`SELECT MAX(version) FROM ` + migration.Table)).Scan(&version)
|
||||
if err == sql.ErrNoRows || !version.Valid {
|
||||
return -1, Error.Wrap(tx.Commit())
|
||||
}
|
||||
if err != nil {
|
||||
return -1, Error.Wrap(errs.Combine(err, tx.Rollback()))
|
||||
}
|
||||
|
||||
return int(version.Int64), Error.Wrap(tx.Commit())
|
||||
}
|
||||
|
||||
// addVersion adds information about a new migration
|
||||
func (migration *Migration) addVersion(tx *sql.Tx, db DB, version int) error {
|
||||
_, err := tx.Exec(db.Rebind(`
|
||||
INSERT INTO `+migration.Table+` (version, commited_at)
|
||||
VALUES (?, ?)`),
|
||||
version, time.Now().String(),
|
||||
)
|
||||
return err
|
||||
}
|
||||
|
||||
// SQL statements that are executed on the database
|
||||
type SQL []string
|
||||
|
||||
// Run runs the SQL statements
|
||||
func (sql SQL) Run(log *zap.Logger, db DB, tx *sql.Tx) (err error) {
|
||||
for _, query := range sql {
|
||||
_, err := tx.Exec(db.Rebind(query))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Func is an arbitrary operation
|
||||
type Func func(log *zap.Logger, db DB, tx *sql.Tx) error
|
||||
|
||||
// Run runs the migration
|
||||
func (fn Func) Run(log *zap.Logger, db DB, tx *sql.Tx) error {
|
||||
return fn(log, db, tx)
|
||||
}
|
251
internal/migrate/versions_test.go
Normal file
251
internal/migrate/versions_test.go
Normal file
@ -0,0 +1,251 @@
|
||||
// Copyright (C) 2019 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package migrate_test
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/zeebo/errs"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"storj.io/storj/internal/migrate"
|
||||
"storj.io/storj/internal/testcontext"
|
||||
)
|
||||
|
||||
func TestBasicMigrationSqlite(t *testing.T) {
|
||||
db, err := sql.Open("sqlite3", ":memory:")
|
||||
require.NoError(t, err)
|
||||
defer func() { assert.NoError(t, db.Close()) }()
|
||||
|
||||
basicMigration(t, db, &sqliteDB{DB: db})
|
||||
}
|
||||
|
||||
func TestBasicMigrationPostgres(t *testing.T) {
|
||||
if *testPostgres == "" {
|
||||
t.Skipf("postgres flag missing, example:\n-postgres-test-db=%s", defaultPostgresConn)
|
||||
}
|
||||
|
||||
db, err := sql.Open("postgres", *testPostgres)
|
||||
require.NoError(t, err)
|
||||
defer func() { assert.NoError(t, db.Close()) }()
|
||||
|
||||
basicMigration(t, db, &postgresDB{DB: db})
|
||||
}
|
||||
|
||||
func basicMigration(t *testing.T, db *sql.DB, testDB migrate.DB) {
|
||||
ctx := testcontext.New(t)
|
||||
defer ctx.Cleanup()
|
||||
|
||||
dbName := strings.ToLower(`versions_` + t.Name())
|
||||
defer func() { assert.NoError(t, dropTables(db, dbName, "users")) }()
|
||||
|
||||
err := ioutil.WriteFile(ctx.File("alpha.txt"), []byte("test"), 0644)
|
||||
require.NoError(t, err)
|
||||
m := migrate.Migration{
|
||||
Table: dbName,
|
||||
Steps: []*migrate.Step{
|
||||
{
|
||||
Description: "Initialize Table",
|
||||
Version: 1,
|
||||
Action: migrate.SQL{
|
||||
`CREATE TABLE users (id int)`,
|
||||
`INSERT INTO users (id) VALUES (1)`,
|
||||
},
|
||||
},
|
||||
{
|
||||
Description: "Move files",
|
||||
Version: 2,
|
||||
Action: migrate.Func(func(log *zap.Logger, _ migrate.DB, tx *sql.Tx) error {
|
||||
return os.Rename(ctx.File("alpha.txt"), ctx.File("beta.txt"))
|
||||
}),
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
err = m.Run(zap.NewNop(), testDB)
|
||||
assert.NoError(t, err)
|
||||
|
||||
var version int
|
||||
err = db.QueryRow(`SELECT MAX(version) FROM ` + dbName).Scan(&version)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 2, version)
|
||||
|
||||
var id int
|
||||
err = db.QueryRow(`SELECT MAX(id) FROM users`).Scan(&id)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 1, id)
|
||||
|
||||
// file not exists
|
||||
_, err = os.Stat(ctx.File("alpha.txt"))
|
||||
assert.Error(t, err)
|
||||
|
||||
// file exists
|
||||
_, err = os.Stat(ctx.File("beta.txt"))
|
||||
assert.NoError(t, err)
|
||||
data, err := ioutil.ReadFile(ctx.File("beta.txt"))
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, []byte("test"), data)
|
||||
}
|
||||
|
||||
func TestMultipleMigrationSqlite(t *testing.T) {
|
||||
db, err := sql.Open("sqlite3", ":memory:")
|
||||
require.NoError(t, err)
|
||||
defer func() { assert.NoError(t, db.Close()) }()
|
||||
|
||||
multipleMigration(t, db, &sqliteDB{DB: db})
|
||||
}
|
||||
|
||||
func TestMultipleMigrationPostgres(t *testing.T) {
|
||||
if *testPostgres == "" {
|
||||
t.Skipf("postgres flag missing, example:\n-postgres-test-db=%s", defaultPostgresConn)
|
||||
}
|
||||
|
||||
db, err := sql.Open("postgres", *testPostgres)
|
||||
require.NoError(t, err)
|
||||
defer func() { assert.NoError(t, db.Close()) }()
|
||||
|
||||
multipleMigration(t, db, &postgresDB{DB: db})
|
||||
}
|
||||
|
||||
func multipleMigration(t *testing.T, db *sql.DB, testDB migrate.DB) {
|
||||
ctx := testcontext.New(t)
|
||||
defer ctx.Cleanup()
|
||||
|
||||
dbName := strings.ToLower(`versions_` + t.Name())
|
||||
defer func() { assert.NoError(t, dropTables(db, dbName)) }()
|
||||
|
||||
steps := 0
|
||||
m := migrate.Migration{
|
||||
Table: dbName,
|
||||
Steps: []*migrate.Step{
|
||||
{
|
||||
Description: "Step 1",
|
||||
Version: 1,
|
||||
Action: migrate.Func(func(log *zap.Logger, _ migrate.DB, tx *sql.Tx) error {
|
||||
steps++
|
||||
return nil
|
||||
}),
|
||||
},
|
||||
{
|
||||
Description: "Step 2",
|
||||
Version: 2,
|
||||
Action: migrate.Func(func(log *zap.Logger, _ migrate.DB, tx *sql.Tx) error {
|
||||
steps++
|
||||
return nil
|
||||
}),
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
err := m.Run(zap.NewNop(), testDB)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 2, steps)
|
||||
|
||||
m.Steps = append(m.Steps, &migrate.Step{
|
||||
Description: "Step 3",
|
||||
Version: 3,
|
||||
Action: migrate.Func(func(log *zap.Logger, _ migrate.DB, tx *sql.Tx) error {
|
||||
steps++
|
||||
return nil
|
||||
}),
|
||||
})
|
||||
err = m.Run(zap.NewNop(), testDB)
|
||||
assert.NoError(t, err)
|
||||
|
||||
var version int
|
||||
err = db.QueryRow(`SELECT MAX(version) FROM ` + dbName).Scan(&version)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 3, version)
|
||||
|
||||
assert.Equal(t, 3, steps)
|
||||
}
|
||||
|
||||
func TestFailedMigrationSqlite(t *testing.T) {
|
||||
db, err := sql.Open("sqlite3", ":memory:")
|
||||
require.NoError(t, err)
|
||||
defer func() { assert.NoError(t, db.Close()) }()
|
||||
|
||||
failedMigration(t, db, &sqliteDB{DB: db})
|
||||
}
|
||||
|
||||
func TestFailedMigrationPostgres(t *testing.T) {
|
||||
if *testPostgres == "" {
|
||||
t.Skipf("postgres flag missing, example:\n-postgres-test-db=%s", defaultPostgresConn)
|
||||
}
|
||||
|
||||
db, err := sql.Open("postgres", *testPostgres)
|
||||
require.NoError(t, err)
|
||||
defer func() { assert.NoError(t, db.Close()) }()
|
||||
|
||||
failedMigration(t, db, &postgresDB{DB: db})
|
||||
}
|
||||
|
||||
func failedMigration(t *testing.T, db *sql.DB, testDB migrate.DB) {
|
||||
ctx := testcontext.New(t)
|
||||
defer ctx.Cleanup()
|
||||
|
||||
dbName := strings.ToLower(`versions_` + t.Name())
|
||||
defer func() { assert.NoError(t, dropTables(db, dbName)) }()
|
||||
|
||||
m := migrate.Migration{
|
||||
Table: dbName,
|
||||
Steps: []*migrate.Step{
|
||||
{
|
||||
Description: "Step 1",
|
||||
Version: 1,
|
||||
Action: migrate.Func(func(log *zap.Logger, _ migrate.DB, tx *sql.Tx) error {
|
||||
return fmt.Errorf("migration failed")
|
||||
}),
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
err := m.Run(zap.NewNop(), testDB)
|
||||
require.Error(t, err, "migration failed")
|
||||
|
||||
var version sql.NullInt64
|
||||
err = db.QueryRow(`SELECT MAX(version) FROM ` + dbName).Scan(&version)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, false, version.Valid)
|
||||
}
|
||||
|
||||
func TestInvalidStepsOrder(t *testing.T) {
|
||||
m := migrate.Migration{
|
||||
Table: "test",
|
||||
Steps: []*migrate.Step{
|
||||
{
|
||||
Version: 0,
|
||||
},
|
||||
{
|
||||
Version: 1,
|
||||
},
|
||||
{
|
||||
Version: 4,
|
||||
},
|
||||
{
|
||||
Version: 2,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
err := m.ValidateSteps()
|
||||
require.Error(t, err, "migrate: steps have incorrect order")
|
||||
}
|
||||
|
||||
func dropTables(db *sql.DB, names ...string) error {
|
||||
var errlist errs.Group
|
||||
for _, name := range names {
|
||||
_, err := db.Exec(`DROP TABLE ` + name)
|
||||
errlist.Add(err)
|
||||
}
|
||||
|
||||
return errlist.Err()
|
||||
}
|
Loading…
Reference in New Issue
Block a user