metabase-createdat-migration: add migration tool
We recently added a created_at column to the segments table. Old segments need to get this value from the objects table. This tool will iterate over all objects and update the corresponding segments if the created_at column is not set. Change-Id: Ib5aedc384637e739ee9af84454af0639e2559416
This commit is contained in:
parent
372bb1140e
commit
6b88a675c5
151
cmd/metabase-createdat-migration/main.go
Normal file
151
cmd/metabase-createdat-migration/main.go
Normal file
@ -0,0 +1,151 @@
|
||||
// Copyright (C) 2021 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"flag"
|
||||
"log"
|
||||
"time"
|
||||
|
||||
"github.com/jackc/pgx/v4"
|
||||
"github.com/zeebo/errs"
|
||||
"go.uber.org/zap"
|
||||
"go.uber.org/zap/zapcore"
|
||||
|
||||
"storj.io/common/uuid"
|
||||
"storj.io/storj/satellite/metainfo"
|
||||
"storj.io/storj/satellite/metainfo/metabase"
|
||||
)
|
||||
|
||||
// Config defines configuration for migration.
type Config struct {
	// LoopBatchSize is the number of objects fetched per iteration of the
	// metabase object loop.
	LoopBatchSize int
	// UpdateBatchSize is the number of UPDATE statements queued into a
	// single pgx batch call before it is sent.
	UpdateBatchSize int
}
|
||||
|
||||
// update is a single pending segment update: which stream to touch and
// the created_at value (taken from the parent object) to write.
type update struct {
	StreamID  uuid.UUID
	CreatedAt time.Time
}
|
||||
|
||||
func main() {
|
||||
var metabasedb string
|
||||
var loopBatchSize, updateBatchSize int
|
||||
flag.StringVar(&metabasedb, "metabasedb", "", "connection URL for MetabaseDB")
|
||||
flag.IntVar(&loopBatchSize, "loop-batch-size", 10000, "number of objects to process at once")
|
||||
flag.IntVar(&updateBatchSize, "update-batch-size", 1000, "number of update requests in a single batch call")
|
||||
|
||||
flag.Parse()
|
||||
|
||||
if metabasedb == "" {
|
||||
log.Fatalln("Flag '--metabasedb' is not set")
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
log, err := zap.Config{
|
||||
Encoding: "console",
|
||||
Level: zap.NewAtomicLevelAt(zapcore.DebugLevel),
|
||||
OutputPaths: []string{"stdout"},
|
||||
ErrorOutputPaths: []string{"stdout"},
|
||||
EncoderConfig: zapcore.EncoderConfig{
|
||||
LevelKey: "L",
|
||||
NameKey: "N",
|
||||
CallerKey: "C",
|
||||
MessageKey: "M",
|
||||
StacktraceKey: "S",
|
||||
LineEnding: zapcore.DefaultLineEnding,
|
||||
EncodeLevel: zapcore.CapitalLevelEncoder,
|
||||
EncodeTime: zapcore.ISO8601TimeEncoder,
|
||||
EncodeDuration: zapcore.StringDurationEncoder,
|
||||
EncodeCaller: zapcore.ShortCallerEncoder,
|
||||
},
|
||||
}.Build()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
defer func() { _ = log.Sync() }()
|
||||
|
||||
err = Migrate(ctx, log, metabasedb, Config{
|
||||
LoopBatchSize: loopBatchSize,
|
||||
UpdateBatchSize: updateBatchSize,
|
||||
})
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
// Migrate migrates created_at from object to corresponding segment if value is missing there.
|
||||
func Migrate(ctx context.Context, log *zap.Logger, metabaseDBStr string, config Config) error {
|
||||
rawMetabaseDB, err := pgx.Connect(ctx, metabaseDBStr)
|
||||
if err != nil {
|
||||
return errs.New("unable to connect %q: %w", metabaseDBStr, err)
|
||||
}
|
||||
defer func() { err = errs.Combine(err, rawMetabaseDB.Close(ctx)) }()
|
||||
|
||||
metabaseDB, err := metainfo.OpenMetabase(ctx, log.Named("metabase"), metabaseDBStr)
|
||||
if err != nil {
|
||||
return errs.New("unable to connect %q: %w", metabaseDBStr, err)
|
||||
}
|
||||
defer func() { err = errs.Combine(err, metabaseDB.Close()) }()
|
||||
|
||||
startingTime := time.Now()
|
||||
|
||||
updates := make([]update, 0, config.UpdateBatchSize)
|
||||
|
||||
numberOfObjects := 0
|
||||
return metabaseDB.IterateLoopObjects(ctx, metabase.IterateLoopObjects{
|
||||
BatchSize: config.LoopBatchSize,
|
||||
AsOfSystemTime: startingTime,
|
||||
}, func(ctx context.Context, it metabase.LoopObjectsIterator) error {
|
||||
var entry metabase.LoopObjectEntry
|
||||
for it.Next(ctx, &entry) {
|
||||
updates = append(updates, update{
|
||||
StreamID: entry.StreamID,
|
||||
CreatedAt: entry.CreatedAt,
|
||||
})
|
||||
|
||||
if len(updates) == config.UpdateBatchSize {
|
||||
sendUpdates(ctx, log, rawMetabaseDB, updates)
|
||||
|
||||
updates = updates[:0]
|
||||
}
|
||||
|
||||
if numberOfObjects%1000000 == 0 {
|
||||
log.Info("updated", zap.Int("objects", numberOfObjects))
|
||||
}
|
||||
|
||||
numberOfObjects++
|
||||
}
|
||||
|
||||
sendUpdates(ctx, log, rawMetabaseDB, updates)
|
||||
|
||||
log.Info("finished", zap.Int("objects", numberOfObjects))
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func sendUpdates(ctx context.Context, log *zap.Logger, conn *pgx.Conn, updates []update) {
|
||||
if len(updates) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
batch := &pgx.Batch{}
|
||||
for _, update := range updates {
|
||||
batch.Queue("UPDATE segments SET created_at = $2 WHERE stream_id = $1 AND created_at IS NULL;", update.StreamID, update.CreatedAt)
|
||||
}
|
||||
|
||||
br := conn.SendBatch(ctx, batch)
|
||||
for _, update := range updates {
|
||||
_, err := br.Exec()
|
||||
if err != nil {
|
||||
log.Error("error during updating segment", zap.String("StreamID", update.StreamID.String()), zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
if err := br.Close(); err != nil {
|
||||
log.Error("error during closing batch result", zap.Error(err))
|
||||
}
|
||||
}
|
271
cmd/metabase-createdat-migration/main_test.go
Normal file
271
cmd/metabase-createdat-migration/main_test.go
Normal file
@ -0,0 +1,271 @@
|
||||
// Copyright (C) 2021 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package main_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/zap/zaptest"
|
||||
|
||||
"storj.io/common/storj"
|
||||
"storj.io/common/testcontext"
|
||||
"storj.io/common/testrand"
|
||||
"storj.io/common/uuid"
|
||||
migrator "storj.io/storj/cmd/metabase-createdat-migration"
|
||||
"storj.io/storj/private/dbutil"
|
||||
"storj.io/storj/private/dbutil/tempdb"
|
||||
"storj.io/storj/satellite/metainfo"
|
||||
"storj.io/storj/satellite/metainfo/metabase"
|
||||
"storj.io/storj/satellite/satellitedb/satellitedbtest"
|
||||
)
|
||||
|
||||
var defaultTestRedundancy = storj.RedundancyScheme{
|
||||
Algorithm: storj.ReedSolomon,
|
||||
ShareSize: 2048,
|
||||
RequiredShares: 1,
|
||||
RepairShares: 1,
|
||||
OptimalShares: 1,
|
||||
TotalShares: 1,
|
||||
}
|
||||
|
||||
var defaultTestEncryption = storj.EncryptionParameters{
|
||||
CipherSuite: storj.EncAESGCM,
|
||||
BlockSize: 29 * 256,
|
||||
}
|
||||
|
||||
func TestMigrator_NoSegments(t *testing.T) {
|
||||
prepare := func(t *testing.T, ctx context.Context, rawDB *dbutil.TempDatabase, metabaseDB metainfo.MetabaseDB) {
|
||||
createObject(ctx, t, metabaseDB, 0)
|
||||
}
|
||||
|
||||
check := func(t *testing.T, ctx context.Context, metabaseDB metainfo.MetabaseDB) {
|
||||
segments, err := metabaseDB.TestingAllSegments(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, segments, 0)
|
||||
}
|
||||
test(t, prepare, check)
|
||||
}
|
||||
|
||||
// TestMigrator_SingleSegment verifies that a single segment whose created_at
// was cleared gets it restored from the parent object's created_at.
func TestMigrator_SingleSegment(t *testing.T) {
	var expectedCreatedAt time.Time
	prepare := func(t *testing.T, ctx context.Context, rawDB *dbutil.TempDatabase, metabaseDB metainfo.MetabaseDB) {
		commitedObject := createObject(ctx, t, metabaseDB, 1)
		expectedCreatedAt = commitedObject.CreatedAt

		// Sanity-check: the freshly committed segment has created_at set.
		segments, err := metabaseDB.TestingAllSegments(ctx)
		require.NoError(t, err)
		require.Len(t, segments, 1)
		require.NotNil(t, segments[0].CreatedAt)

		// Simulate a pre-migration row by clearing created_at via raw SQL.
		_, err = rawDB.ExecContext(ctx, `UPDATE segments SET created_at = NULL`)
		require.NoError(t, err)

		segments, err = metabaseDB.TestingAllSegments(ctx)
		require.NoError(t, err)
		require.Len(t, segments, 1)
		require.Nil(t, segments[0].CreatedAt)
	}

	check := func(t *testing.T, ctx context.Context, metabaseDB metainfo.MetabaseDB) {
		segments, err := metabaseDB.TestingAllSegments(ctx)
		require.NoError(t, err)
		require.Len(t, segments, 1)
		require.NotNil(t, segments[0].CreatedAt)
		// The migrator must have copied the object's created_at back.
		require.Equal(t, expectedCreatedAt, *segments[0].CreatedAt)
	}
	test(t, prepare, check)
}
|
||||
|
||||
// TestMigrator_ManySegments verifies that the migrator restores created_at
// for segments of many objects, exercising the batched update path.
func TestMigrator_ManySegments(t *testing.T) {
	numberOfObjects := 100
	// expectedCreatedAt maps each stream ID to its object's created_at.
	expectedCreatedAt := map[uuid.UUID]time.Time{}

	prepare := func(t *testing.T, ctx context.Context, rawDB *dbutil.TempDatabase, metabaseDB metainfo.MetabaseDB) {
		for i := 0; i < numberOfObjects; i++ {
			commitedObject := createObject(ctx, t, metabaseDB, 1)
			expectedCreatedAt[commitedObject.StreamID] = commitedObject.CreatedAt
		}

		segments, err := metabaseDB.TestingAllSegments(ctx)
		require.NoError(t, err)
		require.Len(t, segments, numberOfObjects)
		for _, segment := range segments {
			require.NotNil(t, segment.CreatedAt)
		}

		// Simulate pre-migration rows: clear created_at on every segment.
		_, err = rawDB.ExecContext(ctx, `UPDATE segments SET created_at = NULL`)
		require.NoError(t, err)

		segments, err = metabaseDB.TestingAllSegments(ctx)
		require.NoError(t, err)
		require.Len(t, segments, numberOfObjects)
		for _, segment := range segments {
			require.Nil(t, segment.CreatedAt)
		}
	}

	check := func(t *testing.T, ctx context.Context, metabaseDB metainfo.MetabaseDB) {
		segments, err := metabaseDB.TestingAllSegments(ctx)
		require.NoError(t, err)
		require.Len(t, segments, numberOfObjects)
		for _, segment := range segments {
			require.NotNil(t, segment.CreatedAt)
			// Each segment must get its own parent object's created_at.
			createdAt, found := expectedCreatedAt[segment.StreamID]
			require.True(t, found)
			require.Equal(t, createdAt, *segment.CreatedAt)
		}
	}
	test(t, prepare, check)
}
|
||||
|
||||
func TestMigrator_SegmentsWithAndWithoutCreatedAt(t *testing.T) {
|
||||
var expectedCreatedAt time.Time
|
||||
var segmentsBefore []metabase.Segment
|
||||
prepare := func(t *testing.T, ctx context.Context, rawDB *dbutil.TempDatabase, metabaseDB metainfo.MetabaseDB) {
|
||||
commitedObject := createObject(ctx, t, metabaseDB, 10)
|
||||
expectedCreatedAt = commitedObject.CreatedAt
|
||||
|
||||
segments, err := metabaseDB.TestingAllSegments(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, segments, 10)
|
||||
for _, segment := range segments {
|
||||
require.NotNil(t, segment.CreatedAt)
|
||||
}
|
||||
|
||||
// set created_at to null for half of segments
|
||||
_, err = rawDB.ExecContext(ctx, `UPDATE segments SET created_at = NULL WHERE position < 5`)
|
||||
require.NoError(t, err)
|
||||
|
||||
segmentsBefore, err = metabaseDB.TestingAllSegments(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, segmentsBefore, 10)
|
||||
for i := 0; i < len(segmentsBefore); i++ {
|
||||
if i < 5 {
|
||||
require.Nil(t, segmentsBefore[i].CreatedAt)
|
||||
} else {
|
||||
require.NotNil(t, segmentsBefore[i].CreatedAt)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
check := func(t *testing.T, ctx context.Context, metabaseDB metainfo.MetabaseDB) {
|
||||
segments, err := metabaseDB.TestingAllSegments(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, segments, 10)
|
||||
for i := 0; i < len(segments); i++ {
|
||||
require.NotNil(t, segments[i].CreatedAt)
|
||||
if i < 5 {
|
||||
require.Equal(t, expectedCreatedAt, *segments[i].CreatedAt)
|
||||
} else {
|
||||
require.NotEqual(t, expectedCreatedAt, segments[i].CreatedAt)
|
||||
require.Equal(t, segmentsBefore[i].CreatedAt, segments[i].CreatedAt)
|
||||
}
|
||||
}
|
||||
}
|
||||
test(t, prepare, check)
|
||||
}
|
||||
|
||||
// test runs prepare, then the migrator, then check against every configured
// satellite test database, each in its own subtest with a unique temp schema.
func test(t *testing.T, prepare func(t *testing.T, ctx context.Context, rawDB *dbutil.TempDatabase, metabaseDB metainfo.MetabaseDB),
	check func(t *testing.T, ctx context.Context, metabaseDB metainfo.MetabaseDB)) {
	ctx := testcontext.New(t)
	defer ctx.Cleanup()

	log := zaptest.NewLogger(t)

	for _, satelliteDB := range satellitedbtest.Databases() {
		satelliteDB := satelliteDB // capture loop variable for the subtest closure
		if strings.EqualFold(satelliteDB.MasterDB.URL, "omit") {
			continue
		}
		t.Run(satelliteDB.Name, func(t *testing.T) {
			schemaSuffix := satellitedbtest.SchemaSuffix()
			schema := satellitedbtest.SchemaName(t.Name(), "category", 0, schemaSuffix)

			// Unique temp database/schema so concurrent runs do not collide.
			metabaseTempDB, err := tempdb.OpenUnique(ctx, satelliteDB.MetabaseDB.URL, schema)
			require.NoError(t, err)

			metabaseDB, err := satellitedbtest.CreateMetabaseDBOnTopOf(ctx, log, metabaseTempDB)
			require.NoError(t, err)
			defer ctx.Check(metabaseDB.Close)

			err = metabaseDB.MigrateToLatest(ctx)
			require.NoError(t, err)

			prepare(t, ctx, metabaseTempDB, metabaseDB)

			// TODO workaround for pgx
			mConnStr := strings.Replace(metabaseTempDB.ConnStr, "cockroach", "postgres", 1)
			// Small batch sizes so the tests exercise the batching paths.
			err = migrator.Migrate(ctx, log, mConnStr, migrator.Config{
				LoopBatchSize:   40,
				UpdateBatchSize: 10,
			})
			require.NoError(t, err)

			check(t, ctx, metabaseDB)
		})
	}
}
|
||||
|
||||
func randObjectStream() metabase.ObjectStream {
|
||||
return metabase.ObjectStream{
|
||||
ProjectID: testrand.UUID(),
|
||||
BucketName: testrand.BucketName(),
|
||||
ObjectKey: metabase.ObjectKey(testrand.Bytes(16)),
|
||||
Version: 1,
|
||||
StreamID: testrand.UUID(),
|
||||
}
|
||||
}
|
||||
|
||||
// createObject begins an object with a random stream, begins and commits
// numberOfSegments segments for it (part 0, indexes 0..n-1), commits the
// object, and returns the committed object.
func createObject(ctx context.Context, t *testing.T, metabaseDB metainfo.MetabaseDB, numberOfSegments int) metabase.Object {
	object, err := metabaseDB.BeginObjectExactVersion(ctx, metabase.BeginObjectExactVersion{
		ObjectStream: randObjectStream(),
	})
	require.NoError(t, err)

	// Shared fixtures reused for every segment of this object.
	rootPieceID := testrand.PieceID()
	pieces := metabase.Pieces{{Number: 0, StorageNode: testrand.NodeID()}}
	encryptedKey := testrand.Bytes(32)
	encryptedKeyNonce := testrand.Bytes(32)

	for i := 0; i < numberOfSegments; i++ {
		err = metabaseDB.BeginSegment(ctx, metabase.BeginSegment{
			ObjectStream: object.ObjectStream,
			Position:     metabase.SegmentPosition{Part: 0, Index: uint32(i)},
			RootPieceID:  rootPieceID,
			Pieces: []metabase.Piece{{
				Number:      1,
				StorageNode: testrand.NodeID(),
			}},
		})
		require.NoError(t, err)

		err = metabaseDB.CommitSegment(ctx, metabase.CommitSegment{
			ObjectStream: object.ObjectStream,
			Position:     metabase.SegmentPosition{Part: 0, Index: uint32(i)},
			RootPieceID:  rootPieceID,
			Pieces:       pieces,

			EncryptedKey:      encryptedKey,
			EncryptedKeyNonce: encryptedKeyNonce,

			EncryptedSize: 1024,
			PlainSize:     512,
			PlainOffset:   0,
			Redundancy:    defaultTestRedundancy,
		})
		require.NoError(t, err)
	}

	commitedObject, err := metabaseDB.CommitObject(ctx, metabase.CommitObject{
		ObjectStream: object.ObjectStream,
		Encryption:   defaultTestEncryption,
	})
	require.NoError(t, err)

	return commitedObject
}
|
@ -44,6 +44,7 @@ type LoopObjectsIterator interface {
|
||||
// LoopObjectEntry contains information about object needed by metainfo loop.
|
||||
type LoopObjectEntry struct {
|
||||
ObjectStream // metrics, repair, tally
|
||||
CreatedAt time.Time // temp used by metabase-createdat-migration
|
||||
ExpiresAt *time.Time // tally
|
||||
SegmentCount int32 // metrics
|
||||
EncryptedMetadataSize int // tally
|
||||
@ -160,7 +161,7 @@ func (it *loopIterator) doNextQuery(ctx context.Context) (_ tagsql.Rows, err err
|
||||
SELECT
|
||||
project_id, bucket_name,
|
||||
object_key, stream_id, version,
|
||||
expires_at,
|
||||
created_at, expires_at,
|
||||
segment_count,
|
||||
LENGTH(COALESCE(encrypted_metadata,''))
|
||||
FROM objects
|
||||
@ -179,7 +180,7 @@ func (it *loopIterator) scanItem(item *LoopObjectEntry) error {
|
||||
return it.curRows.Scan(
|
||||
&item.ProjectID, &item.BucketName,
|
||||
&item.ObjectKey, &item.StreamID, &item.Version,
|
||||
&item.ExpiresAt,
|
||||
&item.CreatedAt, &item.ExpiresAt,
|
||||
&item.SegmentCount,
|
||||
&item.EncryptedMetadataSize,
|
||||
)
|
||||
|
@ -96,13 +96,16 @@ func TestIterateLoopObjects(t *testing.T) {
|
||||
},
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
createdAt := time.Now()
|
||||
expected := []metabase.LoopObjectEntry{
|
||||
{
|
||||
ObjectStream: pending,
|
||||
CreatedAt: createdAt,
|
||||
},
|
||||
{
|
||||
ObjectStream: committed,
|
||||
EncryptedMetadataSize: len(encryptedMetadata),
|
||||
CreatedAt: createdAt,
|
||||
},
|
||||
}
|
||||
|
||||
@ -441,6 +444,7 @@ func createFullObjectsWithKeys(ctx *testcontext.Context, t *testing.T, db *metab
|
||||
|
||||
objects[key] = metabase.LoopObjectEntry{
|
||||
ObjectStream: obj,
|
||||
CreatedAt: time.Now(),
|
||||
}
|
||||
}
|
||||
|
||||
@ -452,5 +456,6 @@ func loopObjectEntryFromRaw(m metabase.RawObject) metabase.LoopObjectEntry {
|
||||
ObjectStream: m.ObjectStream,
|
||||
ExpiresAt: m.ExpiresAt,
|
||||
SegmentCount: m.SegmentCount,
|
||||
CreatedAt: m.CreatedAt,
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user