267506bb20
metabase has become a central concept and it's more suitable for it to be directly nested under satellite rather than being part of metainfo. metainfo is going to be the "endpoint" logic for handling requests. Change-Id: I53770d6761ac1e9a1283b5aa68f471b21e784198
152 lines
3.9 KiB
Go
152 lines
3.9 KiB
Go
// Copyright (C) 2021 Storj Labs, Inc.
|
|
// See LICENSE for copying information.
|
|
|
|
package main
|
|
|
|
import (
|
|
"context"
|
|
"flag"
|
|
"log"
|
|
"time"
|
|
|
|
"github.com/jackc/pgx/v4"
|
|
"github.com/zeebo/errs"
|
|
"go.uber.org/zap"
|
|
"go.uber.org/zap/zapcore"
|
|
|
|
"storj.io/common/uuid"
|
|
"storj.io/storj/satellite/metabase"
|
|
"storj.io/storj/satellite/metainfo"
|
|
)
|
|
|
|
// Config defines configuration for migration.
type Config struct {
	// LoopBatchSize is handed to the metabase objects loop as its batch
	// size, i.e. the number of objects to process at once.
	LoopBatchSize int

	// UpdateBatchSize is the number of update requests collected before
	// they are sent to the database in a single batch call.
	UpdateBatchSize int
}
|
|
|
|
type update struct {
|
|
StreamID uuid.UUID
|
|
CreatedAt time.Time
|
|
}
|
|
|
|
func main() {
|
|
var metabasedb string
|
|
var loopBatchSize, updateBatchSize int
|
|
flag.StringVar(&metabasedb, "metabasedb", "", "connection URL for MetabaseDB")
|
|
flag.IntVar(&loopBatchSize, "loop-batch-size", 10000, "number of objects to process at once")
|
|
flag.IntVar(&updateBatchSize, "update-batch-size", 1000, "number of update requests in a single batch call")
|
|
|
|
flag.Parse()
|
|
|
|
if metabasedb == "" {
|
|
log.Fatalln("Flag '--metabasedb' is not set")
|
|
}
|
|
|
|
ctx := context.Background()
|
|
log, err := zap.Config{
|
|
Encoding: "console",
|
|
Level: zap.NewAtomicLevelAt(zapcore.DebugLevel),
|
|
OutputPaths: []string{"stdout"},
|
|
ErrorOutputPaths: []string{"stdout"},
|
|
EncoderConfig: zapcore.EncoderConfig{
|
|
LevelKey: "L",
|
|
NameKey: "N",
|
|
CallerKey: "C",
|
|
MessageKey: "M",
|
|
StacktraceKey: "S",
|
|
LineEnding: zapcore.DefaultLineEnding,
|
|
EncodeLevel: zapcore.CapitalLevelEncoder,
|
|
EncodeTime: zapcore.ISO8601TimeEncoder,
|
|
EncodeDuration: zapcore.StringDurationEncoder,
|
|
EncodeCaller: zapcore.ShortCallerEncoder,
|
|
},
|
|
}.Build()
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
defer func() { _ = log.Sync() }()
|
|
|
|
err = Migrate(ctx, log, metabasedb, Config{
|
|
LoopBatchSize: loopBatchSize,
|
|
UpdateBatchSize: updateBatchSize,
|
|
})
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
}
|
|
|
|
// Migrate migrates created_at from object to corresponding segment if value is missing there.
|
|
func Migrate(ctx context.Context, log *zap.Logger, metabaseDBStr string, config Config) error {
|
|
rawMetabaseDB, err := pgx.Connect(ctx, metabaseDBStr)
|
|
if err != nil {
|
|
return errs.New("unable to connect %q: %w", metabaseDBStr, err)
|
|
}
|
|
defer func() { err = errs.Combine(err, rawMetabaseDB.Close(ctx)) }()
|
|
|
|
metabaseDB, err := metainfo.OpenMetabase(ctx, log.Named("metabase"), metabaseDBStr)
|
|
if err != nil {
|
|
return errs.New("unable to connect %q: %w", metabaseDBStr, err)
|
|
}
|
|
defer func() { err = errs.Combine(err, metabaseDB.Close()) }()
|
|
|
|
startingTime := time.Now()
|
|
|
|
updates := make([]update, 0, config.UpdateBatchSize)
|
|
|
|
numberOfObjects := 0
|
|
return metabaseDB.IterateLoopObjects(ctx, metabase.IterateLoopObjects{
|
|
BatchSize: config.LoopBatchSize,
|
|
AsOfSystemTime: startingTime,
|
|
}, func(ctx context.Context, it metabase.LoopObjectsIterator) error {
|
|
var entry metabase.LoopObjectEntry
|
|
for it.Next(ctx, &entry) {
|
|
updates = append(updates, update{
|
|
StreamID: entry.StreamID,
|
|
CreatedAt: entry.CreatedAt,
|
|
})
|
|
|
|
if len(updates) == config.UpdateBatchSize {
|
|
sendUpdates(ctx, log, rawMetabaseDB, updates)
|
|
|
|
updates = updates[:0]
|
|
}
|
|
|
|
if numberOfObjects%1000000 == 0 {
|
|
log.Info("updated", zap.Int("objects", numberOfObjects))
|
|
}
|
|
|
|
numberOfObjects++
|
|
}
|
|
|
|
sendUpdates(ctx, log, rawMetabaseDB, updates)
|
|
|
|
log.Info("finished", zap.Int("objects", numberOfObjects))
|
|
|
|
return nil
|
|
})
|
|
}
|
|
|
|
func sendUpdates(ctx context.Context, log *zap.Logger, conn *pgx.Conn, updates []update) {
|
|
if len(updates) == 0 {
|
|
return
|
|
}
|
|
|
|
batch := &pgx.Batch{}
|
|
for _, update := range updates {
|
|
batch.Queue("UPDATE segments SET created_at = $2 WHERE stream_id = $1 AND created_at IS NULL;", update.StreamID, update.CreatedAt)
|
|
}
|
|
|
|
br := conn.SendBatch(ctx, batch)
|
|
for _, update := range updates {
|
|
_, err := br.Exec()
|
|
if err != nil {
|
|
log.Error("error during updating segment", zap.String("StreamID", update.StreamID.String()), zap.Error(err))
|
|
}
|
|
}
|
|
|
|
if err := br.Close(); err != nil {
|
|
log.Error("error during closing batch result", zap.Error(err))
|
|
}
|
|
}
|