storj/cmd/metabase-createdat-migration/main.go

// Copyright (C) 2021 Storj Labs, Inc.
// See LICENSE for copying information.
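
// Command metabase-createdat-migration backfills segments.created_at from the
// owning object for segments where the value is still NULL.
//
// A sketch of the invocation, assuming it is run from the repository root;
// the connection URL is only an example:
//
//	go run ./cmd/metabase-createdat-migration \
//		--metabasedb "postgres://user:password@localhost:5432/metabase" \
//		--loop-batch-size 10000 \
//		--update-batch-size 1000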
package main

import (
	"context"
	"flag"
	"log"
	"time"

	"github.com/jackc/pgx/v4"
	"github.com/zeebo/errs"
	"go.uber.org/zap"
	"go.uber.org/zap/zapcore"

	"storj.io/common/uuid"
	"storj.io/storj/satellite/metainfo"
	"storj.io/storj/satellite/metainfo/metabase"
)

// Config defines configuration for migration.
type Config struct {
	LoopBatchSize   int
	UpdateBatchSize int
}
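
// update describes a single backfill: the stream id that identifies an
// object's segments and the object's creation time to copy into them.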
type update struct {
	StreamID  uuid.UUID
	CreatedAt time.Time
}

func main() {
	var metabasedb string
	var loopBatchSize, updateBatchSize int
	flag.StringVar(&metabasedb, "metabasedb", "", "connection URL for MetabaseDB")
	flag.IntVar(&loopBatchSize, "loop-batch-size", 10000, "number of objects to process at once")
	flag.IntVar(&updateBatchSize, "update-batch-size", 1000, "number of update requests in a single batch call")
	flag.Parse()

	if metabasedb == "" {
		log.Fatalln("Flag '--metabasedb' is not set")
	}

	ctx := context.Background()

	// Build a console logger that writes everything, including errors, to stdout.
	// From here on the zap logger shadows the standard library log package.
	log, err := zap.Config{
		Encoding:         "console",
		Level:            zap.NewAtomicLevelAt(zapcore.DebugLevel),
		OutputPaths:      []string{"stdout"},
		ErrorOutputPaths: []string{"stdout"},
		EncoderConfig: zapcore.EncoderConfig{
			LevelKey:       "L",
			NameKey:        "N",
			CallerKey:      "C",
			MessageKey:     "M",
			StacktraceKey:  "S",
			LineEnding:     zapcore.DefaultLineEnding,
			EncodeLevel:    zapcore.CapitalLevelEncoder,
			EncodeTime:     zapcore.ISO8601TimeEncoder,
			EncodeDuration: zapcore.StringDurationEncoder,
			EncodeCaller:   zapcore.ShortCallerEncoder,
		},
	}.Build()
	if err != nil {
		panic(err)
	}
	defer func() { _ = log.Sync() }()

	err = Migrate(ctx, log, metabasedb, Config{
		LoopBatchSize:   loopBatchSize,
		UpdateBatchSize: updateBatchSize,
	})
	if err != nil {
		panic(err)
	}
}

// Migrate copies created_at from each object to its corresponding segments
// where the segment value is missing.
func Migrate(ctx context.Context, log *zap.Logger, metabaseDBStr string, config Config) (err error) {
	rawMetabaseDB, err := pgx.Connect(ctx, metabaseDBStr)
	if err != nil {
		return errs.New("unable to connect %q: %w", metabaseDBStr, err)
	}
	defer func() { err = errs.Combine(err, rawMetabaseDB.Close(ctx)) }()

	metabaseDB, err := metainfo.OpenMetabase(ctx, log.Named("metabase"), metabaseDBStr)
	if err != nil {
		return errs.New("unable to connect %q: %w", metabaseDBStr, err)
	}
	defer func() { err = errs.Combine(err, metabaseDB.Close()) }()

	startingTime := time.Now()

	updates := make([]update, 0, config.UpdateBatchSize)
	numberOfObjects := 0
	return metabaseDB.IterateLoopObjects(ctx, metabase.IterateLoopObjects{
		BatchSize:      config.LoopBatchSize,
		AsOfSystemTime: startingTime,
	}, func(ctx context.Context, it metabase.LoopObjectsIterator) error {
		var entry metabase.LoopObjectEntry
		for it.Next(ctx, &entry) {
			updates = append(updates, update{
				StreamID:  entry.StreamID,
				CreatedAt: entry.CreatedAt,
			})

			// Flush once a full batch of updates has been collected.
			if len(updates) == config.UpdateBatchSize {
				sendUpdates(ctx, log, rawMetabaseDB, updates)
				updates = updates[:0]
			}

			if numberOfObjects%1000000 == 0 {
				log.Info("updated", zap.Int("objects", numberOfObjects))
			}

			numberOfObjects++
		}

		// Flush whatever is left over from the final partial batch.
		sendUpdates(ctx, log, rawMetabaseDB, updates)

		log.Info("finished", zap.Int("objects", numberOfObjects))

		return nil
	})
}
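
// sendUpdates queues one UPDATE per collected object and sends them to the
// database as a single pgx batch, logging per-row errors instead of aborting.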
func sendUpdates(ctx context.Context, log *zap.Logger, conn *pgx.Conn, updates []update) {
	if len(updates) == 0 {
		return
	}

	batch := &pgx.Batch{}
	for _, update := range updates {
		batch.Queue("UPDATE segments SET created_at = $2 WHERE stream_id = $1 AND created_at IS NULL;", update.StreamID, update.CreatedAt)
	}

	br := conn.SendBatch(ctx, batch)
	for _, update := range updates {
		_, err := br.Exec()
		if err != nil {
			log.Error("error during updating segment", zap.String("StreamID", update.StreamID.String()), zap.Error(err))
		}
	}
	if err := br.Close(); err != nil {
		log.Error("error during closing batch result", zap.Error(err))
	}
}