From 7815e647de1b11962bd0aed55abbd4be15c8c835 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Niewrza=C5=82?= Date: Mon, 7 Jun 2021 15:27:34 +0200 Subject: [PATCH] cmd: add metabase-expiresat-migration tool Expires_at column was added to segments table and we need to migrate this value for existing segments from corresponding objects. This standalone tool will read all objects and if expires_at is set then will send update query for this object's segments. Updates will be sent in batches and in parallel. Change-Id: I1fddf0af8cde0f560582d25c6d0e07a00b58e534 --- cmd/metabase-expireat-migration/main.go | 187 ++++++++++++++++++ cmd/metabase-expireat-migration/main_test.go | 192 +++++++++++++++++++ go.sum | 1 + 3 files changed, 380 insertions(+) create mode 100644 cmd/metabase-expireat-migration/main.go create mode 100644 cmd/metabase-expireat-migration/main_test.go diff --git a/cmd/metabase-expireat-migration/main.go b/cmd/metabase-expireat-migration/main.go new file mode 100644 index 000000000..b70199e88 --- /dev/null +++ b/cmd/metabase-expireat-migration/main.go @@ -0,0 +1,187 @@ +// Copyright (C) 2021 Storj Labs, Inc. +// See LICENSE for copying information. + +package main + +import ( + "context" + "errors" + "time" + + "github.com/jackc/pgx/v4" + "github.com/jackc/pgx/v4/pgxpool" + "github.com/spacemonkeygo/monkit/v3" + "github.com/spf13/cobra" + flag "github.com/spf13/pflag" + "github.com/zeebo/errs" + "go.uber.org/zap" + + "storj.io/common/sync2" + "storj.io/common/uuid" + "storj.io/private/process" + "storj.io/storj/satellite/metabase" +) + +var mon = monkit.Package() + +var ( + rootCmd = &cobra.Command{ + Use: "metainfo-expiresat-migration", + Short: "metainfo-expiresat-migration", + } + + runCmd = &cobra.Command{ + Use: "run", + Short: "run metainfo-expiresat-migration", + RunE: run, + } + + config Config +) + +func init() { + rootCmd.AddCommand(runCmd) + + config.BindFlags(runCmd.Flags()) +} + +// Config defines configuration for migration. 
+type Config struct { + MetabaseDB string + LoopBatchSize int + UpdateBatchSize int + Cockroach bool +} + +// BindFlags adds migration flags to the flagset. +func (config *Config) BindFlags(flag *flag.FlagSet) { + flag.StringVar(&config.MetabaseDB, "metabasedb", "", "connection URL for MetabaseDB") + flag.IntVar(&config.LoopBatchSize, "loop-batch-size", 10000, "number of objects to process at once") + flag.IntVar(&config.UpdateBatchSize, "update-batch-size", 100, "number of update requests in a single batch call") + flag.BoolVar(&config.Cockroach, "cockroach", true, "metabase is on CRDB") +} + +// VerifyFlags verifies whether the values provided are valid. +func (config *Config) VerifyFlags() error { + var errlist errs.Group + if config.MetabaseDB == "" { + errlist.Add(errors.New("flag '--metabasedb' is not set")) + } + return errlist.Err() +} + +type update struct { + StreamID uuid.UUID + ExpiresAt time.Time +} + +func run(cmd *cobra.Command, args []string) error { + if err := config.VerifyFlags(); err != nil { + return err + } + + ctx, _ := process.Ctx(cmd) + log := zap.L() + return Migrate(ctx, log, config) +} + +func main() { + process.Exec(rootCmd) +} + +// Migrate migrates expires_at from object to corresponding segment if value is missing there. 
+func Migrate(ctx context.Context, log *zap.Logger, config Config) (err error) { + defer mon.Task()(&ctx)(&err) + + poolConfig, err := pgxpool.ParseConfig(config.MetabaseDB) + if err != nil { + return err + } + poolConfig.MaxConns = 10 + + rawMetabaseDB, err := pgxpool.ConnectConfig(ctx, poolConfig) + if err != nil { + return errs.New("unable to connect %q: %w", config.MetabaseDB, err) + } + defer func() { rawMetabaseDB.Close() }() + + metabaseDB, err := metabase.Open(ctx, log.Named("metabase"), config.MetabaseDB) + if err != nil { + return errs.New("unable to connect %q: %w", config.MetabaseDB, err) + } + defer func() { err = errs.Combine(err, metabaseDB.Close()) }() + + startingTime, err := metabaseDB.Now(ctx) + if err != nil { + return err + } + + limiter := sync2.NewLimiter(10) + + updates := make([]update, 0, config.UpdateBatchSize) + numberOfObjects := 0 + return metabaseDB.IterateLoopObjects(ctx, metabase.IterateLoopObjects{ + BatchSize: config.LoopBatchSize, + AsOfSystemTime: startingTime, + }, func(ctx context.Context, it metabase.LoopObjectsIterator) error { + defer mon.TaskNamed("migrateExpiresAtLO")(&ctx, "objs", numberOfObjects)(&err) + + var entry metabase.LoopObjectEntry + for it.Next(ctx, &entry) { + if entry.ExpiresAt != nil { + updates = append(updates, update{ + StreamID: entry.StreamID, + ExpiresAt: *entry.ExpiresAt, + }) + + if len(updates) == config.UpdateBatchSize { + toSend := make([]update, len(updates)) + copy(toSend, updates) + limiter.Go(ctx, func() { + sendUpdates(ctx, log, rawMetabaseDB, toSend) + }) + updates = updates[:0] + } + } + + if numberOfObjects%100000 == 0 { + log.Info("updated", zap.Int("objects", numberOfObjects)) + } + + numberOfObjects++ + } + + limiter.Wait() + + sendUpdates(ctx, log, rawMetabaseDB, updates) + + log.Info("finished", zap.Int("objects", numberOfObjects)) + + return nil + }) +} + +func sendUpdates(ctx context.Context, log *zap.Logger, conn *pgxpool.Pool, updates []update) { + defer mon.Task()(&ctx)(nil) + + 
if len(updates) == 0 { + return + } + + batch := &pgx.Batch{} + for _, update := range updates { + batch.Queue("UPDATE segments SET expires_at = $2 WHERE stream_id = $1 AND expires_at IS NULL;", update.StreamID, update.ExpiresAt) + } + + br := conn.SendBatch(ctx, batch) + for _, update := range updates { + _, err := br.Exec() + if err != nil { + log.Error("error during updating segment", zap.String("StreamID", update.StreamID.String()), zap.Error(err)) + } + } + + if err := br.Close(); err != nil { + log.Error("error during closing batch result", zap.Error(err)) + } +} diff --git a/cmd/metabase-expireat-migration/main_test.go b/cmd/metabase-expireat-migration/main_test.go new file mode 100644 index 000000000..0f6f772e2 --- /dev/null +++ b/cmd/metabase-expireat-migration/main_test.go @@ -0,0 +1,192 @@ +// Copyright (C) 2021 Storj Labs, Inc. +// See LICENSE for copying information. + +package main_test + +import ( + "context" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/require" + "go.uber.org/zap/zaptest" + + "storj.io/common/testcontext" + "storj.io/common/uuid" + "storj.io/private/dbutil" + "storj.io/private/dbutil/tempdb" + migrator "storj.io/storj/cmd/metabase-expireat-migration" + "storj.io/storj/satellite/metabase" + "storj.io/storj/satellite/metabase/metabasetest" + "storj.io/storj/satellite/satellitedb/satellitedbtest" +) + +func TestMigrator_NoSegments(t *testing.T) { + prepare := func(t *testing.T, ctx *testcontext.Context, rawDB *dbutil.TempDatabase, metabaseDB *metabase.DB) { + metabasetest.CreateObject(ctx, t, metabaseDB, metabasetest.RandObjectStream(), 0) + } + + check := func(t *testing.T, ctx context.Context, metabaseDB *metabase.DB) { + segments, err := metabaseDB.TestingAllSegments(ctx) + require.NoError(t, err) + require.Len(t, segments, 0) + } + test(t, prepare, check) +} + +func TestMigrator_SingleSegment(t *testing.T) { + expectedExpiresAt := time.Now().Add(27 * time.Hour) + prepare := func(t *testing.T, ctx 
*testcontext.Context, rawDB *dbutil.TempDatabase, metabaseDB *metabase.DB) { + metabasetest.CreateExpiredObject(ctx, t, metabaseDB, metabasetest.RandObjectStream(), + 1, expectedExpiresAt) + + segments, err := metabaseDB.TestingAllSegments(ctx) + require.NoError(t, err) + require.Len(t, segments, 1) + require.NotNil(t, segments[0].ExpiresAt) + + _, err = rawDB.ExecContext(ctx, `UPDATE segments SET expires_at = NULL`) + require.NoError(t, err) + + segments, err = metabaseDB.TestingAllSegments(ctx) + require.NoError(t, err) + require.Len(t, segments, 1) + require.Nil(t, segments[0].ExpiresAt) + } + + check := func(t *testing.T, ctx context.Context, metabaseDB *metabase.DB) { + segments, err := metabaseDB.TestingAllSegments(ctx) + require.NoError(t, err) + require.Len(t, segments, 1) + require.NotNil(t, segments[0].ExpiresAt) + require.Equal(t, expectedExpiresAt.Unix(), segments[0].ExpiresAt.Unix()) + } + test(t, prepare, check) +} + +func TestMigrator_ManySegments(t *testing.T) { + numberOfObjects := 100 + expectedExpiresAt := map[uuid.UUID]*time.Time{} + prepare := func(t *testing.T, ctx *testcontext.Context, rawDB *dbutil.TempDatabase, metabaseDB *metabase.DB) { + for i := 0; i < numberOfObjects; i++ { + commitedObject := metabasetest.CreateExpiredObject(ctx, t, metabaseDB, metabasetest.RandObjectStream(), + 1, time.Now().Add(5*time.Hour)) + expectedExpiresAt[commitedObject.StreamID] = commitedObject.ExpiresAt + } + + segments, err := metabaseDB.TestingAllSegments(ctx) + require.NoError(t, err) + require.Len(t, segments, numberOfObjects) + for _, segment := range segments { + require.NotNil(t, segment.ExpiresAt) + } + + _, err = rawDB.ExecContext(ctx, `UPDATE segments SET expires_at = NULL`) + require.NoError(t, err) + + segments, err = metabaseDB.TestingAllSegments(ctx) + require.NoError(t, err) + require.Len(t, segments, numberOfObjects) + for _, segment := range segments { + require.Nil(t, segment.ExpiresAt) + } + } + + check := func(t *testing.T, ctx 
context.Context, metabaseDB *metabase.DB) { + segments, err := metabaseDB.TestingAllSegments(ctx) + require.NoError(t, err) + require.Len(t, segments, numberOfObjects) + for _, segment := range segments { + require.NotNil(t, segment.ExpiresAt) + expiresAt, found := expectedExpiresAt[segment.StreamID] + require.True(t, found) + require.Equal(t, expiresAt, segment.ExpiresAt) + } + } + test(t, prepare, check) +} + +func TestMigrator_SegmentsWithAndWithoutExpiresAt(t *testing.T) { + expectedExpiresAt := time.Now().Add(27 * time.Hour) + var segmentsBefore []metabase.Segment + prepare := func(t *testing.T, ctx *testcontext.Context, rawDB *dbutil.TempDatabase, metabaseDB *metabase.DB) { + metabasetest.CreateExpiredObject(ctx, t, metabaseDB, metabasetest.RandObjectStream(), + 10, expectedExpiresAt) + + segments, err := metabaseDB.TestingAllSegments(ctx) + require.NoError(t, err) + require.Len(t, segments, 10) + for _, segment := range segments { + require.NotNil(t, segment.ExpiresAt) + } + + // set expires_at to null for half of segments + _, err = rawDB.ExecContext(ctx, `UPDATE segments SET expires_at = NULL WHERE position < 5`) + require.NoError(t, err) + + segmentsBefore, err = metabaseDB.TestingAllSegments(ctx) + require.NoError(t, err) + require.Len(t, segmentsBefore, 10) + for i := 0; i < len(segmentsBefore); i++ { + if i < 5 { + require.Nil(t, segmentsBefore[i].ExpiresAt) + } else { + require.NotNil(t, segmentsBefore[i].ExpiresAt) + } + } + } + + check := func(t *testing.T, ctx context.Context, metabaseDB *metabase.DB) { + segments, err := metabaseDB.TestingAllSegments(ctx) + require.NoError(t, err) + require.Len(t, segments, 10) + for i := 0; i < len(segments); i++ { + require.NotNil(t, segments[i].ExpiresAt) + require.Equal(t, expectedExpiresAt.Unix(), segments[i].ExpiresAt.Unix()) + } + } + test(t, prepare, check) +} + +func test(t *testing.T, prepare func(t *testing.T, ctx *testcontext.Context, rawDB *dbutil.TempDatabase, metabaseDB *metabase.DB), + check func(t 
*testing.T, ctx context.Context, metabaseDB *metabase.DB)) { + ctx := testcontext.New(t) + defer ctx.Cleanup() + + log := zaptest.NewLogger(t) + + for _, satelliteDB := range satellitedbtest.Databases() { + satelliteDB := satelliteDB + t.Run(satelliteDB.Name, func(t *testing.T) { + schemaSuffix := satellitedbtest.SchemaSuffix() + schema := satellitedbtest.SchemaName(t.Name(), "category", 0, schemaSuffix) + + metabaseTempDB, err := tempdb.OpenUnique(ctx, satelliteDB.MetabaseDB.URL, schema) + require.NoError(t, err) + + metabaseDB, err := satellitedbtest.CreateMetabaseDBOnTopOf(ctx, log, metabaseTempDB) + require.NoError(t, err) + defer ctx.Check(metabaseDB.Close) + + err = metabaseDB.MigrateToLatest(ctx) + require.NoError(t, err) + + prepare(t, ctx, metabaseTempDB, metabaseDB) + + cockroach := strings.HasPrefix(metabaseTempDB.ConnStr, "cockroach") + + // TODO workaround for pgx + mConnStr := strings.Replace(metabaseTempDB.ConnStr, "cockroach", "postgres", 1) + err = migrator.Migrate(ctx, log, migrator.Config{ + MetabaseDB: mConnStr, + LoopBatchSize: 40, + UpdateBatchSize: 10, + Cockroach: cockroach, + }) + require.NoError(t, err) + + check(t, ctx, metabaseDB) + }) + } +} diff --git a/go.sum b/go.sum index 988a9d327..663ba0525 100644 --- a/go.sum +++ b/go.sum @@ -275,6 +275,7 @@ github.com/jackc/puddle v0.0.0-20190413234325-e4ced69a3a2b/go.mod h1:m4B5Dj62Y0f github.com/jackc/puddle v0.0.0-20190608224051-11cab39313c9/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= github.com/jackc/puddle v1.1.0/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= github.com/jackc/puddle v1.1.1/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= +github.com/jackc/puddle v1.1.3 h1:JnPg/5Q9xVJGfjsO5CPUOjnJps1JaRUm8I9FXVCFK94= github.com/jackc/puddle v1.1.3/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU= github.com/jessevdk/go-flags 
v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=