cmd: add command to migrate project public_ids

This updates all projects to have a public_id if they do not
already have one.

github issue: https://github.com/storj/storj/issues/4861

Change-Id: Icfa42b62e15ca75d3c04a0aab48a3c3b0f3a9d6e
Authored by Cameron on 2022-08-12 16:58:56 -04:00; committed by Cameron.
parent df5b0d0044
commit 7933e0c4c7
3 changed files with 373 additions and 0 deletions
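
For reference, a sketch of how the new command might be invoked once built (the binary name is assumed from the cobra Use value below, and the connection URL is only a placeholder):

migrate-public-ids run --satellitedb '<satelliteDB connection URL>' --limit 1000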


@@ -0,0 +1,93 @@
// Copyright (C) 2021 Storj Labs, Inc.
// See LICENSE for copying information.
package main
import (
"context"
"errors"
pgx "github.com/jackc/pgx/v4"
"github.com/spacemonkeygo/monkit/v3"
"github.com/spf13/cobra"
flag "github.com/spf13/pflag"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/private/process"
)
var mon = monkit.Package()
var (
rootCmd = &cobra.Command{
Use: "migrate-public-ids",
Short: "migrate-public-ids",
}
runCmd = &cobra.Command{
Use: "run",
Short: "run migrate-public-ids",
RunE: run,
}
config Config
)
func init() {
rootCmd.AddCommand(runCmd)
config.BindFlags(runCmd.Flags())
}
// Config defines configuration for migration.
type Config struct {
SatelliteDB string
Limit int
MaxUpdates int
}
// BindFlags adds the migration flags to the flag set.
func (config *Config) BindFlags(flag *flag.FlagSet) {
flag.StringVar(&config.SatelliteDB, "satellitedb", "", "connection URL for satelliteDB")
flag.IntVar(&config.Limit, "limit", 1000, "number of updates to perform at once")
flag.IntVar(&config.MaxUpdates, "max-updates", 0, "max number of updates to perform on each table")
}
// VerifyFlags verifies whether the values provided are valid.
func (config *Config) VerifyFlags() error {
var errlist errs.Group
if config.SatelliteDB == "" {
errlist.Add(errors.New("flag '--satellitedb' is not set"))
}
return errlist.Err()
}
func run(cmd *cobra.Command, args []string) error {
if err := config.VerifyFlags(); err != nil {
return err
}
ctx, _ := process.Ctx(cmd)
log := zap.L()
return Migrate(ctx, log, config)
}
func main() {
process.Exec(rootCmd)
}
// Migrate updates projects with a new public_id where public_id is null.
func Migrate(ctx context.Context, log *zap.Logger, config Config) (err error) {
defer mon.Task()(&ctx)(&err)
conn, err := pgx.Connect(ctx, config.SatelliteDB)
if err != nil {
return errs.New("unable to connect %q: %w", config.SatelliteDB, err)
}
defer func() {
err = errs.Combine(err, conn.Close(ctx))
}()
return MigrateProjects(ctx, log, conn, config)
}


@@ -0,0 +1,176 @@
// Copyright (C) 2021 Storj Labs, Inc.
// See LICENSE for copying information.
package main_test
import (
"context"
"strings"
"testing"
pgx "github.com/jackc/pgx/v4"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/common/uuid"
"storj.io/private/dbutil"
"storj.io/private/dbutil/tempdb"
migrator "storj.io/storj/cmd/migrate-public-ids"
"storj.io/storj/satellite"
"storj.io/storj/satellite/console"
"storj.io/storj/satellite/satellitedb/satellitedbtest"
)
// Test that a projects table with no entries does not cause an error.
func TestMigrateProjectsSelectNoRows(t *testing.T) {
t.Parallel()
ctx := testcontext.New(t)
defer ctx.Cleanup()
prepare := func(t *testing.T, ctx *testcontext.Context, rawDB *dbutil.TempDatabase, db satellite.DB, conn *pgx.Conn, log *zap.Logger) {
}
check := func(t *testing.T, ctx context.Context, db satellite.DB) {}
test(t, prepare, migrator.MigrateProjects, check, &migrator.Config{
Limit: 8,
})
}
// Test that the public_id field is updated correctly.
func TestMigrateProjectsTest(t *testing.T) {
t.Parallel()
ctx := testcontext.New(t)
defer ctx.Cleanup()
var n int
var notUpdate *console.Project
prepare := func(t *testing.T, ctx *testcontext.Context, rawDB *dbutil.TempDatabase, db satellite.DB, conn *pgx.Conn, log *zap.Logger) {
_, err := db.Console().Projects().Insert(ctx, &console.Project{
Name: "test",
Description: "test",
OwnerID: testrand.UUID(),
})
require.NoError(t, err)
n++
_, err = db.Console().Projects().Insert(ctx, &console.Project{
Name: "test1",
Description: "test1",
OwnerID: testrand.UUID(),
})
require.NoError(t, err)
n++
_, err = db.Console().Projects().Insert(ctx, &console.Project{
Name: "test",
Description: "test",
OwnerID: testrand.UUID(),
})
require.NoError(t, err)
n++
notUpdate, err = db.Console().Projects().Insert(ctx, &console.Project{
Name: "test",
Description: "test",
OwnerID: testrand.UUID(),
})
require.NoError(t, err)
err = testNullifyPublicIDs(ctx, log, conn, notUpdate.ID)
require.NoError(t, err)
projects, err := db.Console().Projects().GetAll(ctx)
require.NoError(t, err)
for _, p := range projects {
if p.ID == notUpdate.ID {
require.False(t, p.PublicID.IsZero())
} else {
require.True(t, p.PublicID.IsZero())
}
}
}
check := func(t *testing.T, ctx context.Context, db satellite.DB) {
projects, err := db.Console().Projects().GetAll(ctx)
require.NoError(t, err)
var updated int
var checkedNotUpdate bool
publicIDs := make(map[uuid.UUID]bool)
for _, prj := range projects {
if prj.ID == notUpdate.ID {
checkedNotUpdate = true
require.Equal(t, notUpdate.PublicID, prj.PublicID)
} else if !prj.PublicID.IsZero() {
updated++
}
if _, ok := publicIDs[prj.PublicID]; !ok {
publicIDs[prj.PublicID] = true
} else {
t.Fatalf("duplicate public_id: %v", prj.PublicID)
}
}
require.Equal(t, n, updated)
require.True(t, checkedNotUpdate)
n = 0
notUpdate = &console.Project{}
}
test(t, prepare, migrator.MigrateProjects, check, &migrator.Config{
Limit: 2,
})
}
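// test runs a migration test against each configured satellite database backend: it opens a
// temporary database, migrates it to the latest schema, runs prepare, executes migrate, and
// finally runs check against the result.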
func test(t *testing.T, prepare func(t *testing.T, ctx *testcontext.Context, rawDB *dbutil.TempDatabase, db satellite.DB, conn *pgx.Conn, log *zap.Logger),
migrate func(ctx context.Context, log *zap.Logger, conn *pgx.Conn, config migrator.Config) (err error),
check func(t *testing.T, ctx context.Context, db satellite.DB), config *migrator.Config) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
log := zaptest.NewLogger(t)
for _, satelliteDB := range satellitedbtest.Databases() {
satelliteDB := satelliteDB
t.Run(satelliteDB.Name, func(t *testing.T) {
schemaSuffix := satellitedbtest.SchemaSuffix()
schema := satellitedbtest.SchemaName(t.Name(), "category", 0, schemaSuffix)
tempDB, err := tempdb.OpenUnique(ctx, satelliteDB.MasterDB.URL, schema)
require.NoError(t, err)
db, err := satellitedbtest.CreateMasterDBOnTopOf(ctx, log, tempDB)
require.NoError(t, err)
defer ctx.Check(db.Close)
err = db.TestingMigrateToLatest(ctx)
require.NoError(t, err)
mConnStr := strings.Replace(tempDB.ConnStr, "cockroach", "postgres", 1)
conn, err := pgx.Connect(ctx, mConnStr)
require.NoError(t, err)
prepare(t, ctx, tempDB, db, conn, log)
err = migrate(ctx, log, conn, *config)
require.NoError(t, err)
check(t, ctx, db)
})
}
}
// This is required to test the migration because all projects are now inserted with a public_id.
//
// * * * THIS IS ONLY FOR TESTING!!! * * *.
func testNullifyPublicIDs(ctx context.Context, log *zap.Logger, conn *pgx.Conn, exclude uuid.UUID) error {
_, err := conn.Exec(ctx, `
UPDATE projects
SET public_id = NULL
WHERE id != $1;
`, exclude.Bytes())
return err
}


@@ -0,0 +1,104 @@
// Copyright (C) 2021 Storj Labs, Inc.
// See LICENSE for copying information.
package main
import (
"context"
pgx "github.com/jackc/pgx/v4"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/common/uuid"
"storj.io/private/dbutil/cockroachutil"
"storj.io/private/dbutil/pgutil"
)
// MigrateProjects updates all rows in the projects table, giving them a new UUID if they do not already have one.
func MigrateProjects(ctx context.Context, log *zap.Logger, conn *pgx.Conn, config Config) (err error) {
defer mon.Task()(&ctx)(&err)
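// lastID is a keyset-pagination cursor: each batch selects project ids greater than the last id already processed.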
lastID := []byte{}
var total int
var rowsFound bool
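// Each pass selects up to config.Limit projects that still lack a public_id (NULL or all zeroes),
// generates a new UUID for each, and writes the batch back in a single UPDATE.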
for {
rowsFound = false
idsToUpdate := struct {
ids [][]byte
publicIDs [][]byte
}{}
err = func() error {
rows, err := conn.Query(ctx, `
SELECT id FROM projects
WHERE id > $1
AND (public_id IS NULL OR public_id = '\x00000000000000000000000000000000')
ORDER BY id
LIMIT $2
`, lastID, config.Limit)
if err != nil {
return errs.New("error select ids for update: %w", err)
}
defer rows.Close()
for rows.Next() {
rowsFound = true
var id []byte
err = rows.Scan(&id)
if err != nil {
return errs.New("error scanning results from select: %w", err)
}
publicID, err := uuid.New()
if err != nil {
return errs.New("error creating new uuid for public_id: %w", err)
}
idsToUpdate.ids = append(idsToUpdate.ids, id)
idsToUpdate.publicIDs = append(idsToUpdate.publicIDs, publicID.Bytes())
lastID = id
}
return rows.Err()
}()
if err != nil {
return err
}
if !rowsFound {
break
}
var updated int
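// Apply the whole batch in one statement, retrying on transient CockroachDB retry errors.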
for {
row := conn.QueryRow(ctx, `
WITH to_update AS (
SELECT unnest($1::bytea[]) as id,
unnest($2::bytea[]) as public_id
),
updated as (
UPDATE projects
SET public_id = to_update.public_id
FROM to_update
WHERE projects.id = to_update.id
RETURNING 1
)
SELECT count(*)
FROM updated;
`, pgutil.ByteaArray(idsToUpdate.ids), pgutil.ByteaArray(idsToUpdate.publicIDs),
)
err := row.Scan(&updated)
if err != nil {
if cockroachutil.NeedsRetry(err) {
continue
} else if errs.Is(err, pgx.ErrNoRows) {
break
}
return errs.New("error updating projects %w", err)
}
break
}
total += updated
log.Info("batch update complete", zap.Int("rows updated", updated), zap.Binary("last id", lastID))
}
log.Info("projects migration complete", zap.Int("total rows updated", total))
return nil
}