From 0c6d5fd9adeeb4f2cf77866846c3be31aeeb7448 Mon Sep 17 00:00:00 2001 From: Egon Elbre Date: Tue, 12 Feb 2019 18:57:26 +0200 Subject: [PATCH] implement basic migration (#1261) * Implement database migrations. * Add few notes * fix linter errors * add non working example * initial improvements + basic tests * update test * add more tests * more changes * small changes * revert unnecessary changes * revert unnecessary changes * don't log ignored steps * remove OnCreate action * rename method + validate steps * fix version comparing --- internal/migrate/create_test.go | 20 +-- internal/migrate/versions.go | 203 ++++++++++++++++++++++++ internal/migrate/versions_test.go | 251 ++++++++++++++++++++++++++++++ 3 files changed, 465 insertions(+), 9 deletions(-) create mode 100644 internal/migrate/versions.go create mode 100644 internal/migrate/versions_test.go diff --git a/internal/migrate/create_test.go b/internal/migrate/create_test.go index e4e4b06c0..acafc6ced 100644 --- a/internal/migrate/create_test.go +++ b/internal/migrate/create_test.go @@ -1,7 +1,7 @@ // Copyright (C) 2019 Storj Labs, Inc. // See LICENSE for copying information. -package migrate +package migrate_test import ( "database/sql" @@ -12,6 +12,8 @@ import ( "github.com/stretchr/testify/assert" + "storj.io/storj/internal/migrate" + _ "github.com/lib/pq" _ "github.com/mattn/go-sqlite3" ) @@ -24,19 +26,19 @@ func TestCreate_Sqlite(t *testing.T) { defer func() { assert.NoError(t, db.Close()) }() // should create table - err = Create("example", &sqliteDB{db, "CREATE TABLE example_table (id text)"}) + err = migrate.Create("example", &sqliteDB{db, "CREATE TABLE example_table (id text)"}) assert.NoError(t, err) // shouldn't create a new table - err = Create("example", &sqliteDB{db, "CREATE TABLE example_table (id text)"}) + err = migrate.Create("example", &sqliteDB{db, "CREATE TABLE example_table (id text)"}) assert.NoError(t, err) // should fail, because schema changed - err = Create("example", &sqliteDB{db, "CREATE TABLE example_table (id text, version int)"}) + err = migrate.Create("example", &sqliteDB{db, "CREATE TABLE example_table (id text, version int)"}) assert.Error(t, err) // should fail, because of trying to CREATE TABLE with same name - err = Create("conflict", &sqliteDB{db, "CREATE TABLE example_table (id text, version int)"}) + err = migrate.Create("conflict", &sqliteDB{db, "CREATE TABLE example_table (id text, version int)"}) assert.Error(t, err) } @@ -57,19 +59,19 @@ func TestCreate_Postgres(t *testing.T) { defer func() { assert.NoError(t, db.Close()) }() // should create table - err = Create("example", &postgresDB{db, "CREATE TABLE example_table (id text)"}) + err = migrate.Create("example", &postgresDB{db, "CREATE TABLE example_table (id text)"}) assert.NoError(t, err) // shouldn't create a new table - err = Create("example", &postgresDB{db, "CREATE TABLE example_table (id text)"}) + err = migrate.Create("example", &postgresDB{db, "CREATE TABLE example_table (id text)"}) assert.NoError(t, err) // should fail, because schema changed - err = Create("example", &postgresDB{db, "CREATE TABLE example_table (id text, version integer)"}) + err = migrate.Create("example", &postgresDB{db, "CREATE TABLE example_table (id text, version integer)"}) assert.Error(t, err) // should fail, because of trying to CREATE TABLE with same name - err = Create("conflict", &postgresDB{db, "CREATE TABLE example_table (id text, version integer)"}) + err = migrate.Create("conflict", &postgresDB{db, "CREATE TABLE example_table (id text, version integer)"}) assert.Error(t, err) } diff --git a/internal/migrate/versions.go b/internal/migrate/versions.go new file mode 100644 index 000000000..309767e6f --- /dev/null +++ b/internal/migrate/versions.go @@ -0,0 +1,203 @@ +// Copyright (C) 2019 Storj Labs, Inc. +// See LICENSE for copying information. + +package migrate + +import ( + "database/sql" + "regexp" + "sort" + "strconv" + "time" + + "github.com/zeebo/errs" + "go.uber.org/zap" +) + +/* + +Scenarios it doesn't handle properly. + +1. Rollback to initial state on multi-step migration. + + Let's say there's a scenario where we run migration steps: + 1. update a table schema + 2. move files + 3. update a table schema + 4. update a table schema, which fails + + In this case there's no easy way to rollback the moving of files. + +2. Undoing migrations. + + Intentionally left out, because we do not gain that much from currently. + +3. Snapshotting the whole state. + + This probably should be done by the user of this library, when there's disk-space available. + +4. Figuring out what the exact executed steps are. +*/ + +// Migration describes a migration steps +type Migration struct { + Table string + Steps []*Step +} + +// Step describes a single step in migration. +type Step struct { + Description string + Version int // Versions should start at 0 + Action Action +} + +// Action is something that needs to be done +type Action interface { + Run(log *zap.Logger, db DB, tx *sql.Tx) error +} + +// ValidTableName checks whether the specified table name is valid +func (migration *Migration) ValidTableName() error { + matched, err := regexp.MatchString(`^[a-z_]+$`, migration.Table) + if !matched || err != nil { + return Error.New("invalid table name: %v", migration.Table) + } + return nil +} + +// ValidateSteps checks whether the specified table name is valid +func (migration *Migration) ValidateSteps() error { + sorted := sort.SliceIsSorted(migration.Steps, func(i, j int) bool { + return migration.Steps[i].Version <= migration.Steps[j].Version + }) + if !sorted { + return Error.New("steps have incorrect order") + } + return nil +} + +// Run runs the migration steps +func (migration *Migration) Run(log *zap.Logger, db DB) error { + err := migration.ValidTableName() + if err != nil { + return err + } + + err = migration.ValidateSteps() + if err != nil { + return err + } + + err = migration.ensureVersionTable(log, db) + if err != nil { + return Error.New("creating version table failed: %v", err) + } + + version, err := migration.getLatestVersion(log, db) + if err != nil { + return Error.Wrap(err) + } + + if version >= 0 { + log.Info("Latest Version", zap.Int("version", version)) + } else { + log.Info("No Version") + } + + for _, step := range migration.Steps { + if step.Version <= version { + continue + } + + log := log.Named(strconv.Itoa(step.Version)) + log.Info(step.Description) + + tx, err := db.Begin() + if err != nil { + return Error.Wrap(err) + } + + err = step.Action.Run(log, db, tx) + if err != nil { + return Error.Wrap(errs.Combine(err, tx.Rollback())) + } + + err = migration.addVersion(tx, db, step.Version) + if err != nil { + return Error.Wrap(errs.Combine(err, tx.Rollback())) + } + + if err := tx.Commit(); err != nil { + return Error.Wrap(err) + } + } + + return nil +} + +// createVersionTable creates a new version table +func (migration *Migration) ensureVersionTable(log *zap.Logger, db DB) error { + tx, err := db.Begin() + if err != nil { + return Error.Wrap(err) + } + + _, err = tx.Exec(db.Rebind(`CREATE TABLE IF NOT EXISTS ` + migration.Table + ` (version int, commited_at text)`)) + if err != nil { + return Error.Wrap(errs.Combine(err, tx.Rollback())) + } + + return Error.Wrap(tx.Commit()) +} + +// getLatestVersion finds the latest version table +func (migration *Migration) getLatestVersion(log *zap.Logger, db DB) (int, error) { + tx, err := db.Begin() + if err != nil { + return -1, Error.Wrap(err) + } + + var version sql.NullInt64 + err = tx.QueryRow(db.Rebind(`SELECT MAX(version) FROM ` + migration.Table)).Scan(&version) + if err == sql.ErrNoRows || !version.Valid { + return -1, Error.Wrap(tx.Commit()) + } + if err != nil { + return -1, Error.Wrap(errs.Combine(err, tx.Rollback())) + } + + return int(version.Int64), Error.Wrap(tx.Commit()) +} + +// addVersion adds information about a new migration +func (migration *Migration) addVersion(tx *sql.Tx, db DB, version int) error { + _, err := tx.Exec(db.Rebind(` + INSERT INTO `+migration.Table+` (version, commited_at) + VALUES (?, ?)`), + version, time.Now().String(), + ) + return err +} + +// SQL statements that are executed on the database +type SQL []string + +// Run runs the SQL statements +func (sql SQL) Run(log *zap.Logger, db DB, tx *sql.Tx) (err error) { + for _, query := range sql { + _, err := tx.Exec(db.Rebind(query)) + if err != nil { + return err + } + } + return nil +} + +// Func is an arbitrary operation +type Func func(log *zap.Logger, db DB, tx *sql.Tx) error + +// Run runs the migration +func (fn Func) Run(log *zap.Logger, db DB, tx *sql.Tx) error { + return fn(log, db, tx) +} diff --git a/internal/migrate/versions_test.go b/internal/migrate/versions_test.go new file mode 100644 index 000000000..3836eaa71 --- /dev/null +++ b/internal/migrate/versions_test.go @@ -0,0 +1,251 @@ +// Copyright (C) 2019 Storj Labs, Inc. +// See LICENSE for copying information. + +package migrate_test + +import ( + "database/sql" + "fmt" + "io/ioutil" + "os" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/zeebo/errs" + "go.uber.org/zap" + + "storj.io/storj/internal/migrate" + "storj.io/storj/internal/testcontext" +) + +func TestBasicMigrationSqlite(t *testing.T) { + db, err := sql.Open("sqlite3", ":memory:") + require.NoError(t, err) + defer func() { assert.NoError(t, db.Close()) }() + + basicMigration(t, db, &sqliteDB{DB: db}) +} + +func TestBasicMigrationPostgres(t *testing.T) { + if *testPostgres == "" { + t.Skipf("postgres flag missing, example:\n-postgres-test-db=%s", defaultPostgresConn) + } + + db, err := sql.Open("postgres", *testPostgres) + require.NoError(t, err) + defer func() { assert.NoError(t, db.Close()) }() + + basicMigration(t, db, &postgresDB{DB: db}) +} + +func basicMigration(t *testing.T, db *sql.DB, testDB migrate.DB) { + ctx := testcontext.New(t) + defer ctx.Cleanup() + + dbName := strings.ToLower(`versions_` + t.Name()) + defer func() { assert.NoError(t, dropTables(db, dbName, "users")) }() + + err := ioutil.WriteFile(ctx.File("alpha.txt"), []byte("test"), 0644) + require.NoError(t, err) + m := migrate.Migration{ + Table: dbName, + Steps: []*migrate.Step{ + { + Description: "Initialize Table", + Version: 1, + Action: migrate.SQL{ + `CREATE TABLE users (id int)`, + `INSERT INTO users (id) VALUES (1)`, + }, + }, + { + Description: "Move files", + Version: 2, + Action: migrate.Func(func(log *zap.Logger, _ migrate.DB, tx *sql.Tx) error { + return os.Rename(ctx.File("alpha.txt"), ctx.File("beta.txt")) + }), + }, + }, + } + + err = m.Run(zap.NewNop(), testDB) + assert.NoError(t, err) + + var version int + err = db.QueryRow(`SELECT MAX(version) FROM ` + dbName).Scan(&version) + assert.NoError(t, err) + assert.Equal(t, 2, version) + + var id int + err = db.QueryRow(`SELECT MAX(id) FROM users`).Scan(&id) + assert.NoError(t, err) + assert.Equal(t, 1, id) + + // file not exists + _, err = os.Stat(ctx.File("alpha.txt")) + assert.Error(t, err) + + // file exists + _, err = os.Stat(ctx.File("beta.txt")) + assert.NoError(t, err) + data, err := ioutil.ReadFile(ctx.File("beta.txt")) + assert.NoError(t, err) + assert.Equal(t, []byte("test"), data) +} + +func TestMultipleMigrationSqlite(t *testing.T) { + db, err := sql.Open("sqlite3", ":memory:") + require.NoError(t, err) + defer func() { assert.NoError(t, db.Close()) }() + + multipleMigration(t, db, &sqliteDB{DB: db}) +} + +func TestMultipleMigrationPostgres(t *testing.T) { + if *testPostgres == "" { + t.Skipf("postgres flag missing, example:\n-postgres-test-db=%s", defaultPostgresConn) + } + + db, err := sql.Open("postgres", *testPostgres) + require.NoError(t, err) + defer func() { assert.NoError(t, db.Close()) }() + + multipleMigration(t, db, &postgresDB{DB: db}) +} + +func multipleMigration(t *testing.T, db *sql.DB, testDB migrate.DB) { + ctx := testcontext.New(t) + defer ctx.Cleanup() + + dbName := strings.ToLower(`versions_` + t.Name()) + defer func() { assert.NoError(t, dropTables(db, dbName)) }() + + steps := 0 + m := migrate.Migration{ + Table: dbName, + Steps: []*migrate.Step{ + { + Description: "Step 1", + Version: 1, + Action: migrate.Func(func(log *zap.Logger, _ migrate.DB, tx *sql.Tx) error { + steps++ + return nil + }), + }, + { + Description: "Step 2", + Version: 2, + Action: migrate.Func(func(log *zap.Logger, _ migrate.DB, tx *sql.Tx) error { + steps++ + return nil + }), + }, + }, + } + + err := m.Run(zap.NewNop(), testDB) + assert.NoError(t, err) + assert.Equal(t, 2, steps) + + m.Steps = append(m.Steps, &migrate.Step{ + Description: "Step 3", + Version: 3, + Action: migrate.Func(func(log *zap.Logger, _ migrate.DB, tx *sql.Tx) error { + steps++ + return nil + }), + }) + err = m.Run(zap.NewNop(), testDB) + assert.NoError(t, err) + + var version int + err = db.QueryRow(`SELECT MAX(version) FROM ` + dbName).Scan(&version) + assert.NoError(t, err) + assert.Equal(t, 3, version) + + assert.Equal(t, 3, steps) +} + +func TestFailedMigrationSqlite(t *testing.T) { + db, err := sql.Open("sqlite3", ":memory:") + require.NoError(t, err) + defer func() { assert.NoError(t, db.Close()) }() + + failedMigration(t, db, &sqliteDB{DB: db}) +} + +func TestFailedMigrationPostgres(t *testing.T) { + if *testPostgres == "" { + t.Skipf("postgres flag missing, example:\n-postgres-test-db=%s", defaultPostgresConn) + } + + db, err := sql.Open("postgres", *testPostgres) + require.NoError(t, err) + defer func() { assert.NoError(t, db.Close()) }() + + failedMigration(t, db, &postgresDB{DB: db}) +} + +func failedMigration(t *testing.T, db *sql.DB, testDB migrate.DB) { + ctx := testcontext.New(t) + defer ctx.Cleanup() + + dbName := strings.ToLower(`versions_` + t.Name()) + defer func() { assert.NoError(t, dropTables(db, dbName)) }() + + m := migrate.Migration{ + Table: dbName, + Steps: []*migrate.Step{ + { + Description: "Step 1", + Version: 1, + Action: migrate.Func(func(log *zap.Logger, _ migrate.DB, tx *sql.Tx) error { + return fmt.Errorf("migration failed") + }), + }, + }, + } + + err := m.Run(zap.NewNop(), testDB) + require.Error(t, err, "migration failed") + + var version sql.NullInt64 + err = db.QueryRow(`SELECT MAX(version) FROM ` + dbName).Scan(&version) + assert.NoError(t, err) + assert.Equal(t, false, version.Valid) +} + +func TestInvalidStepsOrder(t *testing.T) { + m := migrate.Migration{ + Table: "test", + Steps: []*migrate.Step{ + { + Version: 0, + }, + { + Version: 1, + }, + { + Version: 4, + }, + { + Version: 2, + }, + }, + } + + err := m.ValidateSteps() + require.Error(t, err, "migrate: steps have incorrect order") +} + +func dropTables(db *sql.DB, names ...string) error { + var errlist errs.Group + for _, name := range names { + _, err := db.Exec(`DROP TABLE ` + name) + errlist.Add(err) + } + + return errlist.Err() +}