cmd: delete metabase-expireat-migration
This migration was applied to all satellites and can be removed. Change-Id: I6c63eeb60da17f6bb8b2d4dd3cd5e6e3d9612638
This commit is contained in:
parent
bab43af6ce
commit
7cbff95090
@ -1,187 +0,0 @@
|
||||
// Copyright (C) 2021 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"time"
|
||||
|
||||
"github.com/jackc/pgx/v4"
|
||||
"github.com/jackc/pgx/v4/pgxpool"
|
||||
"github.com/spacemonkeygo/monkit/v3"
|
||||
"github.com/spf13/cobra"
|
||||
flag "github.com/spf13/pflag"
|
||||
"github.com/zeebo/errs"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"storj.io/common/sync2"
|
||||
"storj.io/common/uuid"
|
||||
"storj.io/private/process"
|
||||
"storj.io/storj/satellite/metabase"
|
||||
)
|
||||
|
||||
var mon = monkit.Package()
|
||||
|
||||
var (
|
||||
rootCmd = &cobra.Command{
|
||||
Use: "metainfo-expiresat-migration",
|
||||
Short: "metainfo-expiresat-migration",
|
||||
}
|
||||
|
||||
runCmd = &cobra.Command{
|
||||
Use: "run",
|
||||
Short: "run metainfo-expiresat-migration",
|
||||
RunE: run,
|
||||
}
|
||||
|
||||
config Config
|
||||
)
|
||||
|
||||
func init() {
|
||||
rootCmd.AddCommand(runCmd)
|
||||
|
||||
config.BindFlags(runCmd.Flags())
|
||||
}
|
||||
|
||||
// Config defines configuration for migration.
|
||||
type Config struct {
|
||||
MetabaseDB string
|
||||
LoopBatchSize int
|
||||
UpdateBatchSize int
|
||||
Cockroach bool
|
||||
}
|
||||
|
||||
// BindFlags adds bench flags to the the flagset.
|
||||
func (config *Config) BindFlags(flag *flag.FlagSet) {
|
||||
flag.StringVar(&config.MetabaseDB, "metabasedb", "", "connection URL for MetabaseDB")
|
||||
flag.IntVar(&config.LoopBatchSize, "loop-batch-size", 10000, "number of objects to process at once")
|
||||
flag.IntVar(&config.UpdateBatchSize, "update-batch-size", 100, "number of update requests in a single batch call")
|
||||
flag.BoolVar(&config.Cockroach, "cockroach", true, "metabase is on CRDB")
|
||||
}
|
||||
|
||||
// VerifyFlags verifies whether the values provided are valid.
|
||||
func (config *Config) VerifyFlags() error {
|
||||
var errlist errs.Group
|
||||
if config.MetabaseDB == "" {
|
||||
errlist.Add(errors.New("flag '--metabasedb' is not set"))
|
||||
}
|
||||
return errlist.Err()
|
||||
}
|
||||
|
||||
type update struct {
|
||||
StreamID uuid.UUID
|
||||
ExpiresAt time.Time
|
||||
}
|
||||
|
||||
func run(cmd *cobra.Command, args []string) error {
|
||||
if err := config.VerifyFlags(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ctx, _ := process.Ctx(cmd)
|
||||
log := zap.L()
|
||||
return Migrate(ctx, log, config)
|
||||
}
|
||||
|
||||
func main() {
|
||||
process.Exec(rootCmd)
|
||||
}
|
||||
|
||||
// Migrate migrates created_at from object to corresponding segment if value is missing there.
|
||||
func Migrate(ctx context.Context, log *zap.Logger, config Config) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
poolConfig, err := pgxpool.ParseConfig(config.MetabaseDB)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
poolConfig.MaxConns = 10
|
||||
|
||||
rawMetabaseDB, err := pgxpool.ConnectConfig(ctx, poolConfig)
|
||||
if err != nil {
|
||||
return errs.New("unable to connect %q: %w", config.MetabaseDB, err)
|
||||
}
|
||||
defer func() { rawMetabaseDB.Close() }()
|
||||
|
||||
metabaseDB, err := metabase.Open(ctx, log.Named("metabase"), config.MetabaseDB)
|
||||
if err != nil {
|
||||
return errs.New("unable to connect %q: %w", config.MetabaseDB, err)
|
||||
}
|
||||
defer func() { err = errs.Combine(err, metabaseDB.Close()) }()
|
||||
|
||||
startingTime, err := metabaseDB.Now(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
limiter := sync2.NewLimiter(10)
|
||||
|
||||
updates := make([]update, 0, config.UpdateBatchSize)
|
||||
numberOfObjects := 0
|
||||
return metabaseDB.IterateLoopObjects(ctx, metabase.IterateLoopObjects{
|
||||
BatchSize: config.LoopBatchSize,
|
||||
AsOfSystemTime: startingTime,
|
||||
}, func(ctx context.Context, it metabase.LoopObjectsIterator) error {
|
||||
defer mon.TaskNamed("migrateExpiresAtLO")(&ctx, "objs", numberOfObjects)(&err)
|
||||
|
||||
var entry metabase.LoopObjectEntry
|
||||
for it.Next(ctx, &entry) {
|
||||
if entry.ExpiresAt != nil {
|
||||
updates = append(updates, update{
|
||||
StreamID: entry.StreamID,
|
||||
ExpiresAt: *entry.ExpiresAt,
|
||||
})
|
||||
|
||||
if len(updates) == config.UpdateBatchSize {
|
||||
toSend := make([]update, len(updates))
|
||||
copy(toSend, updates)
|
||||
limiter.Go(ctx, func() {
|
||||
sendUpdates(ctx, log, rawMetabaseDB, toSend)
|
||||
})
|
||||
updates = updates[:0]
|
||||
}
|
||||
}
|
||||
|
||||
if numberOfObjects%100000 == 0 {
|
||||
log.Info("updated", zap.Int("objects", numberOfObjects))
|
||||
}
|
||||
|
||||
numberOfObjects++
|
||||
}
|
||||
|
||||
limiter.Wait()
|
||||
|
||||
sendUpdates(ctx, log, rawMetabaseDB, updates)
|
||||
|
||||
log.Info("finished", zap.Int("objects", numberOfObjects))
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func sendUpdates(ctx context.Context, log *zap.Logger, conn *pgxpool.Pool, updates []update) {
|
||||
defer mon.Task()(&ctx)(nil)
|
||||
|
||||
if len(updates) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
batch := &pgx.Batch{}
|
||||
for _, update := range updates {
|
||||
batch.Queue("UPDATE segments SET expires_at = $2 WHERE stream_id = $1 AND expires_at IS NULL;", update.StreamID, update.ExpiresAt)
|
||||
}
|
||||
|
||||
br := conn.SendBatch(ctx, batch)
|
||||
for _, update := range updates {
|
||||
_, err := br.Exec()
|
||||
if err != nil {
|
||||
log.Error("error during updating segment", zap.String("StreamID", update.StreamID.String()), zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
if err := br.Close(); err != nil {
|
||||
log.Error("error during closing batch result", zap.Error(err))
|
||||
}
|
||||
}
|
@ -1,192 +0,0 @@
|
||||
// Copyright (C) 2021 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package main_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/zap/zaptest"
|
||||
|
||||
"storj.io/common/testcontext"
|
||||
"storj.io/common/uuid"
|
||||
"storj.io/private/dbutil"
|
||||
"storj.io/private/dbutil/tempdb"
|
||||
migrator "storj.io/storj/cmd/metabase-expireat-migration"
|
||||
"storj.io/storj/satellite/metabase"
|
||||
"storj.io/storj/satellite/metabase/metabasetest"
|
||||
"storj.io/storj/satellite/satellitedb/satellitedbtest"
|
||||
)
|
||||
|
||||
func TestMigrator_NoSegments(t *testing.T) {
|
||||
prepare := func(t *testing.T, ctx *testcontext.Context, rawDB *dbutil.TempDatabase, metabaseDB *metabase.DB) {
|
||||
metabasetest.CreateObject(ctx, t, metabaseDB, metabasetest.RandObjectStream(), 0)
|
||||
}
|
||||
|
||||
check := func(t *testing.T, ctx context.Context, metabaseDB *metabase.DB) {
|
||||
segments, err := metabaseDB.TestingAllSegments(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, segments, 0)
|
||||
}
|
||||
test(t, prepare, check)
|
||||
}
|
||||
|
||||
func TestMigrator_SingleSegment(t *testing.T) {
|
||||
expectedExpiresAt := time.Now().Add(27 * time.Hour)
|
||||
prepare := func(t *testing.T, ctx *testcontext.Context, rawDB *dbutil.TempDatabase, metabaseDB *metabase.DB) {
|
||||
metabasetest.CreateExpiredObject(ctx, t, metabaseDB, metabasetest.RandObjectStream(),
|
||||
1, expectedExpiresAt)
|
||||
|
||||
segments, err := metabaseDB.TestingAllSegments(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, segments, 1)
|
||||
require.NotNil(t, segments[0].ExpiresAt)
|
||||
|
||||
_, err = rawDB.ExecContext(ctx, `UPDATE segments SET expires_at = NULL`)
|
||||
require.NoError(t, err)
|
||||
|
||||
segments, err = metabaseDB.TestingAllSegments(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, segments, 1)
|
||||
require.Nil(t, segments[0].ExpiresAt)
|
||||
}
|
||||
|
||||
check := func(t *testing.T, ctx context.Context, metabaseDB *metabase.DB) {
|
||||
segments, err := metabaseDB.TestingAllSegments(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, segments, 1)
|
||||
require.NotNil(t, segments[0].ExpiresAt)
|
||||
require.Equal(t, expectedExpiresAt.Unix(), segments[0].ExpiresAt.Unix())
|
||||
}
|
||||
test(t, prepare, check)
|
||||
}
|
||||
|
||||
func TestMigrator_ManySegments(t *testing.T) {
|
||||
numberOfObjects := 100
|
||||
expectedExpiresAt := map[uuid.UUID]*time.Time{}
|
||||
prepare := func(t *testing.T, ctx *testcontext.Context, rawDB *dbutil.TempDatabase, metabaseDB *metabase.DB) {
|
||||
for i := 0; i < numberOfObjects; i++ {
|
||||
commitedObject := metabasetest.CreateExpiredObject(ctx, t, metabaseDB, metabasetest.RandObjectStream(),
|
||||
1, time.Now().Add(5*time.Hour))
|
||||
expectedExpiresAt[commitedObject.StreamID] = commitedObject.ExpiresAt
|
||||
}
|
||||
|
||||
segments, err := metabaseDB.TestingAllSegments(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, segments, numberOfObjects)
|
||||
for _, segment := range segments {
|
||||
require.NotNil(t, segment.ExpiresAt)
|
||||
}
|
||||
|
||||
_, err = rawDB.ExecContext(ctx, `UPDATE segments SET expires_at = NULL`)
|
||||
require.NoError(t, err)
|
||||
|
||||
segments, err = metabaseDB.TestingAllSegments(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, segments, numberOfObjects)
|
||||
for _, segment := range segments {
|
||||
require.Nil(t, segment.ExpiresAt)
|
||||
}
|
||||
}
|
||||
|
||||
check := func(t *testing.T, ctx context.Context, metabaseDB *metabase.DB) {
|
||||
segments, err := metabaseDB.TestingAllSegments(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, segments, numberOfObjects)
|
||||
for _, segment := range segments {
|
||||
require.NotNil(t, segment.ExpiresAt)
|
||||
expiresAt, found := expectedExpiresAt[segment.StreamID]
|
||||
require.True(t, found)
|
||||
require.Equal(t, expiresAt, segment.ExpiresAt)
|
||||
}
|
||||
}
|
||||
test(t, prepare, check)
|
||||
}
|
||||
|
||||
func TestMigrator_SegmentsWithAndWithoutExpiresAt(t *testing.T) {
|
||||
expectedExpiresAt := time.Now().Add(27 * time.Hour)
|
||||
var segmentsBefore []metabase.Segment
|
||||
prepare := func(t *testing.T, ctx *testcontext.Context, rawDB *dbutil.TempDatabase, metabaseDB *metabase.DB) {
|
||||
metabasetest.CreateExpiredObject(ctx, t, metabaseDB, metabasetest.RandObjectStream(),
|
||||
10, expectedExpiresAt)
|
||||
|
||||
segments, err := metabaseDB.TestingAllSegments(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, segments, 10)
|
||||
for _, segment := range segments {
|
||||
require.NotNil(t, segment.ExpiresAt)
|
||||
}
|
||||
|
||||
// set expires_at to null for half of segments
|
||||
_, err = rawDB.ExecContext(ctx, `UPDATE segments SET expires_at = NULL WHERE position < 5`)
|
||||
require.NoError(t, err)
|
||||
|
||||
segmentsBefore, err = metabaseDB.TestingAllSegments(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, segmentsBefore, 10)
|
||||
for i := 0; i < len(segmentsBefore); i++ {
|
||||
if i < 5 {
|
||||
require.Nil(t, segmentsBefore[i].ExpiresAt)
|
||||
} else {
|
||||
require.NotNil(t, segmentsBefore[i].ExpiresAt)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
check := func(t *testing.T, ctx context.Context, metabaseDB *metabase.DB) {
|
||||
segments, err := metabaseDB.TestingAllSegments(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, segments, 10)
|
||||
for i := 0; i < len(segments); i++ {
|
||||
require.NotNil(t, segments[i].ExpiresAt)
|
||||
require.Equal(t, expectedExpiresAt.Unix(), segments[i].ExpiresAt.Unix())
|
||||
}
|
||||
}
|
||||
test(t, prepare, check)
|
||||
}
|
||||
|
||||
func test(t *testing.T, prepare func(t *testing.T, ctx *testcontext.Context, rawDB *dbutil.TempDatabase, metabaseDB *metabase.DB),
|
||||
check func(t *testing.T, ctx context.Context, metabaseDB *metabase.DB)) {
|
||||
ctx := testcontext.New(t)
|
||||
defer ctx.Cleanup()
|
||||
|
||||
log := zaptest.NewLogger(t)
|
||||
|
||||
for _, satelliteDB := range satellitedbtest.Databases() {
|
||||
satelliteDB := satelliteDB
|
||||
t.Run(satelliteDB.Name, func(t *testing.T) {
|
||||
schemaSuffix := satellitedbtest.SchemaSuffix()
|
||||
schema := satellitedbtest.SchemaName(t.Name(), "category", 0, schemaSuffix)
|
||||
|
||||
metabaseTempDB, err := tempdb.OpenUnique(ctx, satelliteDB.MetabaseDB.URL, schema)
|
||||
require.NoError(t, err)
|
||||
|
||||
metabaseDB, err := satellitedbtest.CreateMetabaseDBOnTopOf(ctx, log, metabaseTempDB)
|
||||
require.NoError(t, err)
|
||||
defer ctx.Check(metabaseDB.Close)
|
||||
|
||||
err = metabaseDB.MigrateToLatest(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
prepare(t, ctx, metabaseTempDB, metabaseDB)
|
||||
|
||||
cockroach := strings.HasPrefix(metabaseTempDB.ConnStr, "cockroach")
|
||||
|
||||
// TODO workaround for pgx
|
||||
mConnStr := strings.Replace(metabaseTempDB.ConnStr, "cockroach", "postgres", 1)
|
||||
err = migrator.Migrate(ctx, log, migrator.Config{
|
||||
MetabaseDB: mConnStr,
|
||||
LoopBatchSize: 40,
|
||||
UpdateBatchSize: 10,
|
||||
Cockroach: cockroach,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
check(t, ctx, metabaseDB)
|
||||
})
|
||||
}
|
||||
}
|
1
go.sum
1
go.sum
@ -275,7 +275,6 @@ github.com/jackc/puddle v0.0.0-20190413234325-e4ced69a3a2b/go.mod h1:m4B5Dj62Y0f
|
||||
github.com/jackc/puddle v0.0.0-20190608224051-11cab39313c9/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
|
||||
github.com/jackc/puddle v1.1.0/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
|
||||
github.com/jackc/puddle v1.1.1/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
|
||||
github.com/jackc/puddle v1.1.3 h1:JnPg/5Q9xVJGfjsO5CPUOjnJps1JaRUm8I9FXVCFK94=
|
||||
github.com/jackc/puddle v1.1.3/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
|
||||
github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU=
|
||||
github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
|
||||
|
Loading…
Reference in New Issue
Block a user