diff --git a/cmd/metabase-expireat-migration/main.go b/cmd/metabase-expireat-migration/main.go deleted file mode 100644 index b70199e88..000000000 --- a/cmd/metabase-expireat-migration/main.go +++ /dev/null @@ -1,187 +0,0 @@ -// Copyright (C) 2021 Storj Labs, Inc. -// See LICENSE for copying information. - -package main - -import ( - "context" - "errors" - "time" - - "github.com/jackc/pgx/v4" - "github.com/jackc/pgx/v4/pgxpool" - "github.com/spacemonkeygo/monkit/v3" - "github.com/spf13/cobra" - flag "github.com/spf13/pflag" - "github.com/zeebo/errs" - "go.uber.org/zap" - - "storj.io/common/sync2" - "storj.io/common/uuid" - "storj.io/private/process" - "storj.io/storj/satellite/metabase" -) - -var mon = monkit.Package() - -var ( - rootCmd = &cobra.Command{ - Use: "metainfo-expiresat-migration", - Short: "metainfo-expiresat-migration", - } - - runCmd = &cobra.Command{ - Use: "run", - Short: "run metainfo-expiresat-migration", - RunE: run, - } - - config Config -) - -func init() { - rootCmd.AddCommand(runCmd) - - config.BindFlags(runCmd.Flags()) -} - -// Config defines configuration for migration. -type Config struct { - MetabaseDB string - LoopBatchSize int - UpdateBatchSize int - Cockroach bool -} - -// BindFlags adds bench flags to the the flagset. -func (config *Config) BindFlags(flag *flag.FlagSet) { - flag.StringVar(&config.MetabaseDB, "metabasedb", "", "connection URL for MetabaseDB") - flag.IntVar(&config.LoopBatchSize, "loop-batch-size", 10000, "number of objects to process at once") - flag.IntVar(&config.UpdateBatchSize, "update-batch-size", 100, "number of update requests in a single batch call") - flag.BoolVar(&config.Cockroach, "cockroach", true, "metabase is on CRDB") -} - -// VerifyFlags verifies whether the values provided are valid. -func (config *Config) VerifyFlags() error { - var errlist errs.Group - if config.MetabaseDB == "" { - errlist.Add(errors.New("flag '--metabasedb' is not set")) - } - return errlist.Err() -} - -type update struct { - StreamID uuid.UUID - ExpiresAt time.Time -} - -func run(cmd *cobra.Command, args []string) error { - if err := config.VerifyFlags(); err != nil { - return err - } - - ctx, _ := process.Ctx(cmd) - log := zap.L() - return Migrate(ctx, log, config) -} - -func main() { - process.Exec(rootCmd) -} - -// Migrate migrates created_at from object to corresponding segment if value is missing there. -func Migrate(ctx context.Context, log *zap.Logger, config Config) (err error) { - defer mon.Task()(&ctx)(&err) - - poolConfig, err := pgxpool.ParseConfig(config.MetabaseDB) - if err != nil { - return err - } - poolConfig.MaxConns = 10 - - rawMetabaseDB, err := pgxpool.ConnectConfig(ctx, poolConfig) - if err != nil { - return errs.New("unable to connect %q: %w", config.MetabaseDB, err) - } - defer func() { rawMetabaseDB.Close() }() - - metabaseDB, err := metabase.Open(ctx, log.Named("metabase"), config.MetabaseDB) - if err != nil { - return errs.New("unable to connect %q: %w", config.MetabaseDB, err) - } - defer func() { err = errs.Combine(err, metabaseDB.Close()) }() - - startingTime, err := metabaseDB.Now(ctx) - if err != nil { - return err - } - - limiter := sync2.NewLimiter(10) - - updates := make([]update, 0, config.UpdateBatchSize) - numberOfObjects := 0 - return metabaseDB.IterateLoopObjects(ctx, metabase.IterateLoopObjects{ - BatchSize: config.LoopBatchSize, - AsOfSystemTime: startingTime, - }, func(ctx context.Context, it metabase.LoopObjectsIterator) error { - defer mon.TaskNamed("migrateExpiresAtLO")(&ctx, "objs", numberOfObjects)(&err) - - var entry metabase.LoopObjectEntry - for it.Next(ctx, &entry) { - if entry.ExpiresAt != nil { - updates = append(updates, update{ - StreamID: entry.StreamID, - ExpiresAt: *entry.ExpiresAt, - }) - - if len(updates) == config.UpdateBatchSize { - toSend := make([]update, len(updates)) - copy(toSend, updates) - limiter.Go(ctx, func() { - sendUpdates(ctx, log, rawMetabaseDB, toSend) - }) - updates = updates[:0] - } - } - - if numberOfObjects%100000 == 0 { - log.Info("updated", zap.Int("objects", numberOfObjects)) - } - - numberOfObjects++ - } - - limiter.Wait() - - sendUpdates(ctx, log, rawMetabaseDB, updates) - - log.Info("finished", zap.Int("objects", numberOfObjects)) - - return nil - }) -} - -func sendUpdates(ctx context.Context, log *zap.Logger, conn *pgxpool.Pool, updates []update) { - defer mon.Task()(&ctx)(nil) - - if len(updates) == 0 { - return - } - - batch := &pgx.Batch{} - for _, update := range updates { - batch.Queue("UPDATE segments SET expires_at = $2 WHERE stream_id = $1 AND expires_at IS NULL;", update.StreamID, update.ExpiresAt) - } - - br := conn.SendBatch(ctx, batch) - for _, update := range updates { - _, err := br.Exec() - if err != nil { - log.Error("error during updating segment", zap.String("StreamID", update.StreamID.String()), zap.Error(err)) - } - } - - if err := br.Close(); err != nil { - log.Error("error during closing batch result", zap.Error(err)) - } -} diff --git a/cmd/metabase-expireat-migration/main_test.go b/cmd/metabase-expireat-migration/main_test.go deleted file mode 100644 index 0f6f772e2..000000000 --- a/cmd/metabase-expireat-migration/main_test.go +++ /dev/null @@ -1,192 +0,0 @@ -// Copyright (C) 2021 Storj Labs, Inc. -// See LICENSE for copying information. - -package main_test - -import ( - "context" - "strings" - "testing" - "time" - - "github.com/stretchr/testify/require" - "go.uber.org/zap/zaptest" - - "storj.io/common/testcontext" - "storj.io/common/uuid" - "storj.io/private/dbutil" - "storj.io/private/dbutil/tempdb" - migrator "storj.io/storj/cmd/metabase-expireat-migration" - "storj.io/storj/satellite/metabase" - "storj.io/storj/satellite/metabase/metabasetest" - "storj.io/storj/satellite/satellitedb/satellitedbtest" -) - -func TestMigrator_NoSegments(t *testing.T) { - prepare := func(t *testing.T, ctx *testcontext.Context, rawDB *dbutil.TempDatabase, metabaseDB *metabase.DB) { - metabasetest.CreateObject(ctx, t, metabaseDB, metabasetest.RandObjectStream(), 0) - } - - check := func(t *testing.T, ctx context.Context, metabaseDB *metabase.DB) { - segments, err := metabaseDB.TestingAllSegments(ctx) - require.NoError(t, err) - require.Len(t, segments, 0) - } - test(t, prepare, check) -} - -func TestMigrator_SingleSegment(t *testing.T) { - expectedExpiresAt := time.Now().Add(27 * time.Hour) - prepare := func(t *testing.T, ctx *testcontext.Context, rawDB *dbutil.TempDatabase, metabaseDB *metabase.DB) { - metabasetest.CreateExpiredObject(ctx, t, metabaseDB, metabasetest.RandObjectStream(), - 1, expectedExpiresAt) - - segments, err := metabaseDB.TestingAllSegments(ctx) - require.NoError(t, err) - require.Len(t, segments, 1) - require.NotNil(t, segments[0].ExpiresAt) - - _, err = rawDB.ExecContext(ctx, `UPDATE segments SET expires_at = NULL`) - require.NoError(t, err) - - segments, err = metabaseDB.TestingAllSegments(ctx) - require.NoError(t, err) - require.Len(t, segments, 1) - require.Nil(t, segments[0].ExpiresAt) - } - - check := func(t *testing.T, ctx context.Context, metabaseDB *metabase.DB) { - segments, err := metabaseDB.TestingAllSegments(ctx) - require.NoError(t, err) - require.Len(t, segments, 1) - require.NotNil(t, segments[0].ExpiresAt) - require.Equal(t, expectedExpiresAt.Unix(), segments[0].ExpiresAt.Unix()) - } - test(t, prepare, check) -} - -func TestMigrator_ManySegments(t *testing.T) { - numberOfObjects := 100 - expectedExpiresAt := map[uuid.UUID]*time.Time{} - prepare := func(t *testing.T, ctx *testcontext.Context, rawDB *dbutil.TempDatabase, metabaseDB *metabase.DB) { - for i := 0; i < numberOfObjects; i++ { - commitedObject := metabasetest.CreateExpiredObject(ctx, t, metabaseDB, metabasetest.RandObjectStream(), - 1, time.Now().Add(5*time.Hour)) - expectedExpiresAt[commitedObject.StreamID] = commitedObject.ExpiresAt - } - - segments, err := metabaseDB.TestingAllSegments(ctx) - require.NoError(t, err) - require.Len(t, segments, numberOfObjects) - for _, segment := range segments { - require.NotNil(t, segment.ExpiresAt) - } - - _, err = rawDB.ExecContext(ctx, `UPDATE segments SET expires_at = NULL`) - require.NoError(t, err) - - segments, err = metabaseDB.TestingAllSegments(ctx) - require.NoError(t, err) - require.Len(t, segments, numberOfObjects) - for _, segment := range segments { - require.Nil(t, segment.ExpiresAt) - } - } - - check := func(t *testing.T, ctx context.Context, metabaseDB *metabase.DB) { - segments, err := metabaseDB.TestingAllSegments(ctx) - require.NoError(t, err) - require.Len(t, segments, numberOfObjects) - for _, segment := range segments { - require.NotNil(t, segment.ExpiresAt) - expiresAt, found := expectedExpiresAt[segment.StreamID] - require.True(t, found) - require.Equal(t, expiresAt, segment.ExpiresAt) - } - } - test(t, prepare, check) -} - -func TestMigrator_SegmentsWithAndWithoutExpiresAt(t *testing.T) { - expectedExpiresAt := time.Now().Add(27 * time.Hour) - var segmentsBefore []metabase.Segment - prepare := func(t *testing.T, ctx *testcontext.Context, rawDB *dbutil.TempDatabase, metabaseDB *metabase.DB) { - metabasetest.CreateExpiredObject(ctx, t, metabaseDB, metabasetest.RandObjectStream(), - 10, expectedExpiresAt) - - segments, err := metabaseDB.TestingAllSegments(ctx) - require.NoError(t, err) - require.Len(t, segments, 10) - for _, segment := range segments { - require.NotNil(t, segment.ExpiresAt) - } - - // set expires_at to null for half of segments - _, err = rawDB.ExecContext(ctx, `UPDATE segments SET expires_at = NULL WHERE position < 5`) - require.NoError(t, err) - - segmentsBefore, err = metabaseDB.TestingAllSegments(ctx) - require.NoError(t, err) - require.Len(t, segmentsBefore, 10) - for i := 0; i < len(segmentsBefore); i++ { - if i < 5 { - require.Nil(t, segmentsBefore[i].ExpiresAt) - } else { - require.NotNil(t, segmentsBefore[i].ExpiresAt) - } - } - } - - check := func(t *testing.T, ctx context.Context, metabaseDB *metabase.DB) { - segments, err := metabaseDB.TestingAllSegments(ctx) - require.NoError(t, err) - require.Len(t, segments, 10) - for i := 0; i < len(segments); i++ { - require.NotNil(t, segments[i].ExpiresAt) - require.Equal(t, expectedExpiresAt.Unix(), segments[i].ExpiresAt.Unix()) - } - } - test(t, prepare, check) -} - -func test(t *testing.T, prepare func(t *testing.T, ctx *testcontext.Context, rawDB *dbutil.TempDatabase, metabaseDB *metabase.DB), - check func(t *testing.T, ctx context.Context, metabaseDB *metabase.DB)) { - ctx := testcontext.New(t) - defer ctx.Cleanup() - - log := zaptest.NewLogger(t) - - for _, satelliteDB := range satellitedbtest.Databases() { - satelliteDB := satelliteDB - t.Run(satelliteDB.Name, func(t *testing.T) { - schemaSuffix := satellitedbtest.SchemaSuffix() - schema := satellitedbtest.SchemaName(t.Name(), "category", 0, schemaSuffix) - - metabaseTempDB, err := tempdb.OpenUnique(ctx, satelliteDB.MetabaseDB.URL, schema) - require.NoError(t, err) - - metabaseDB, err := satellitedbtest.CreateMetabaseDBOnTopOf(ctx, log, metabaseTempDB) - require.NoError(t, err) - defer ctx.Check(metabaseDB.Close) - - err = metabaseDB.MigrateToLatest(ctx) - require.NoError(t, err) - - prepare(t, ctx, metabaseTempDB, metabaseDB) - - cockroach := strings.HasPrefix(metabaseTempDB.ConnStr, "cockroach") - - // TODO workaround for pgx - mConnStr := strings.Replace(metabaseTempDB.ConnStr, "cockroach", "postgres", 1) - err = migrator.Migrate(ctx, log, migrator.Config{ - MetabaseDB: mConnStr, - LoopBatchSize: 40, - UpdateBatchSize: 10, - Cockroach: cockroach, - }) - require.NoError(t, err) - - check(t, ctx, metabaseDB) - }) - } -} diff --git a/go.sum b/go.sum index 633ff382b..b546b9a3d 100644 --- a/go.sum +++ b/go.sum @@ -275,7 +275,6 @@ github.com/jackc/puddle v0.0.0-20190413234325-e4ced69a3a2b/go.mod h1:m4B5Dj62Y0f github.com/jackc/puddle v0.0.0-20190608224051-11cab39313c9/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= github.com/jackc/puddle v1.1.0/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= github.com/jackc/puddle v1.1.1/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= -github.com/jackc/puddle v1.1.3 h1:JnPg/5Q9xVJGfjsO5CPUOjnJps1JaRUm8I9FXVCFK94= github.com/jackc/puddle v1.1.3/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU= github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=