From 31bb6d54c7e126325e7182c7a2e1d7c5159f4fd2 Mon Sep 17 00:00:00 2001
From: Michal Niewrzal
Date: Thu, 6 Jul 2023 14:57:58 +0200
Subject: [PATCH] cmd/tools: add tool to migrate segment copies metadata

We need to migrate all existing segment copies so that they contain
the same metadata as their original segments. So far we have not been
duplicating stored pieces, but we are changing this behavior now. We
will use this tool after enabling the new way of doing server-side
copies.

Fixes https://github.com/storj/storj/issues/5890

Change-Id: Ia9ca12486f3c527abd28949eb438d1c4c7138d55
---
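A minimal usage sketch (the kebab-case flag names are assumed to follow
cfgstruct's derivation from the Config fields below; the connection URL
is a placeholder):

    migrate-segment-copies run \
        --metabasedb 'postgres://user:password@localhost:5432/metabase' \
        --batch-size 2000 \
        --segment-copies-backup segment-copies-backup.csv
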
 cmd/tools/migrate-segment-copies/main.go      | 248 ++++++++++++++
 cmd/tools/migrate-segment-copies/main_test.go | 324 ++++++++++++++++++
 2 files changed, 572 insertions(+)
 create mode 100644 cmd/tools/migrate-segment-copies/main.go
 create mode 100644 cmd/tools/migrate-segment-copies/main_test.go

diff --git a/cmd/tools/migrate-segment-copies/main.go b/cmd/tools/migrate-segment-copies/main.go
new file mode 100644
index 000000000..a5337eed7
--- /dev/null
+++ b/cmd/tools/migrate-segment-copies/main.go
@@ -0,0 +1,248 @@
+// Copyright (C) 2023 Storj Labs, Inc.
+// See LICENSE for copying information.
+
+package main
+
+import (
+	"context"
+	"encoding/csv"
+	"errors"
+	"os"
+	"time"
+
+	"github.com/spacemonkeygo/monkit/v3"
+	"github.com/spf13/cobra"
+	"github.com/spf13/pflag"
+	"github.com/zeebo/errs"
+	"go.uber.org/zap"
+
+	"storj.io/common/uuid"
+	"storj.io/private/cfgstruct"
+	"storj.io/private/dbutil/pgutil"
+	"storj.io/private/process"
+	"storj.io/private/tagsql"
+	"storj.io/storj/satellite/metabase"
+)
+
+var mon = monkit.Package()
+
+var (
+	rootCmd = &cobra.Command{
+		Use:   "migrate-segment-copies",
+		Short: "migrate-segment-copies",
+	}
+
+	runCmd = &cobra.Command{
+		Use:   "run",
+		Short: "run migrate-segment-copies",
+		RunE:  run,
+	}
+
+	config Config
+)
+
+func init() {
+	rootCmd.AddCommand(runCmd)
+
+	cfgstruct.Bind(pflag.CommandLine, &config)
+}
+
+// Config defines configuration for the migration.
+type Config struct {
+	MetabaseDB          string `help:"connection URL for metabaseDB"`
+	BatchSize           int    `help:"number of entries from segment_copies processed at once" default:"2000"`
+	SegmentCopiesBackup string `help:"csv file to which processed segment_copies entries will be backed up"`
+}
+
+// VerifyFlags verifies whether the values provided are valid.
+func (config *Config) VerifyFlags() error {
+	var errlist errs.Group
+	if config.MetabaseDB == "" {
+		errlist.Add(errors.New("flag '--metabasedb' is not set"))
+	}
+	return errlist.Err()
+}
+
+func run(cmd *cobra.Command, args []string) error {
+	if err := config.VerifyFlags(); err != nil {
+		return err
+	}
+
+	ctx, _ := process.Ctx(cmd)
+	log := zap.L()
+	return Migrate(ctx, log, config)
+}
+
+func main() {
+	process.Exec(rootCmd)
+}
+
+// Migrate starts the segment copies migration.
+func Migrate(ctx context.Context, log *zap.Logger, config Config) (err error) {
+	defer mon.Task()(&ctx)(&err)
+
+	db, err := metabase.Open(ctx, log, config.MetabaseDB, metabase.Config{})
+	if err != nil {
+		return errs.New("unable to connect %q: %w", config.MetabaseDB, err)
+	}
+	defer func() {
+		err = errs.Combine(err, db.Close())
+	}()
+
+	return MigrateSegments(ctx, log, db, config)
+}
+
+// MigrateSegments updates segment copies with proper metadata (pieces and placement).
+func MigrateSegments(ctx context.Context, log *zap.Logger, metabaseDB *metabase.DB, config Config) (err error) {
+	defer mon.Task()(&ctx)(&err)
+
+	var backupCSV *csv.Writer
+	if config.SegmentCopiesBackup != "" {
+		// Use a distinct name for the create error so that the deferred
+		// close below updates the named return value err instead of a
+		// shadowed local variable.
+		f, createErr := os.Create(config.SegmentCopiesBackup)
+		if createErr != nil {
+			return createErr
+		}
+
+		defer func() {
+			err = errs.Combine(err, f.Close())
+		}()
+
+		backupCSV = csv.NewWriter(f)
+
+		defer backupCSV.Flush()
+
+		if err := backupCSV.Write([]string{"stream_id", "ancestor_stream_id"}); err != nil {
+			return err
+		}
+	}
+
+	db := metabaseDB.UnderlyingTagSQL()
+
+	var streamIDCursor uuid.UUID
+	ancestorStreamIDs := []uuid.UUID{}
+	streamIDs := []uuid.UUID{}
+	processed := 0
+
+	// what we are doing here:
+	// * read a batch of entries from the segment_copies table
+	// * read the ancestor (original) segments' metadata from the segments table
+	// * update the segment copies with the missing metadata, one by one
+	// * optionally back up the processed segment_copies entries to a CSV file
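+	//
+	// Batches are read with keyset pagination: the cursor is the last
+	// stream_id seen, so each iteration continues where the previous one
+	// ended without resorting to OFFSET.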
+	for {
+		log.Info("Processed entries", zap.Int("processed", processed))
+
+		ancestorStreamIDs = ancestorStreamIDs[:0]
+		streamIDs = streamIDs[:0]
+
+		idsMap := map[uuid.UUID][]uuid.UUID{}
+		err := withRows(db.QueryContext(ctx, `
+			SELECT stream_id, ancestor_stream_id FROM segment_copies WHERE stream_id > $1 ORDER BY stream_id LIMIT $2
+		`, streamIDCursor, config.BatchSize))(func(rows tagsql.Rows) error {
+			for rows.Next() {
+				var streamID, ancestorStreamID uuid.UUID
+				err := rows.Scan(&streamID, &ancestorStreamID)
+				if err != nil {
+					return err
+				}
+
+				streamIDCursor = streamID
+				ancestorStreamIDs = append(ancestorStreamIDs, ancestorStreamID)
+				streamIDs = append(streamIDs, streamID)
+
+				idsMap[ancestorStreamID] = append(idsMap[ancestorStreamID], streamID)
+			}
+			return nil
+		})
+		if err != nil {
+			return err
+		}
+
+		type Update struct {
+			StreamID          uuid.UUID
+			AncestorStreamID  uuid.UUID
+			Position          int64
+			RemoteAliasPieces []byte
+			RootPieceID       []byte
+			RepairedAt        *time.Time
+			Placement         int64
+		}
+
+		updates := []Update{}
+		err = withRows(db.QueryContext(ctx, `
+			SELECT stream_id, position, remote_alias_pieces, root_piece_id, repaired_at, placement FROM segments WHERE stream_id = ANY($1::BYTEA[])
+		`, pgutil.UUIDArray(ancestorStreamIDs)))(func(rows tagsql.Rows) error {
+			for rows.Next() {
+				var ancestorStreamID uuid.UUID
+				var position int64
+				var remoteAliasPieces, rootPieceID []byte
+				var repairedAt *time.Time
+				var placement int64
+				err := rows.Scan(&ancestorStreamID, &position, &remoteAliasPieces, &rootPieceID, &repairedAt, &placement)
+				if err != nil {
+					return err
+				}
+
+				streamIDs, ok := idsMap[ancestorStreamID]
+				if !ok {
+					return errs.New("unable to map ancestor stream id: %s", ancestorStreamID)
+				}
+
+				for _, streamID := range streamIDs {
+					updates = append(updates, Update{
+						StreamID:          streamID,
+						AncestorStreamID:  ancestorStreamID,
+						Position:          position,
+						RemoteAliasPieces: remoteAliasPieces,
+						RootPieceID:       rootPieceID,
+						RepairedAt:        repairedAt,
+						Placement:         placement,
+					})
+				}
+			}
+			return nil
+		})
+		if err != nil {
+			return err
+		}
+
+		for _, update := range updates {
+			_, err := db.ExecContext(ctx, `
+				UPDATE segments SET
+					remote_alias_pieces = $3,
+					root_piece_id = $4,
+					repaired_at = $5,
+					placement = $6
+				WHERE (stream_id, position) = ($1, $2)
+			`, update.StreamID, update.Position, update.RemoteAliasPieces, update.RootPieceID, update.RepairedAt, update.Placement)
+			if err != nil {
+				return err
+			}
+
+			if backupCSV != nil {
+				if err := backupCSV.Write([]string{update.StreamID.String(), update.AncestorStreamID.String()}); err != nil {
+					return err
+				}
+			}
+		}
+
+		if backupCSV != nil {
+			backupCSV.Flush()
+		}
+
+		processed += len(streamIDs)
+
+		// Stop only when segment_copies has been exhausted; an empty
+		// updates slice alone could also mean a batch whose ancestor
+		// segments are missing from the segments table.
+		if len(streamIDs) == 0 {
+			return nil
+		}
+	}
+}
+
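+// withRows wraps a (tagsql.Rows, error) query result so the caller can
+// consume the rows in a callback while rows.Err and rows.Close are always
+// checked and combined with the callback's error.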
+func withRows(rows tagsql.Rows, err error) func(func(tagsql.Rows) error) error {
+	return func(callback func(tagsql.Rows) error) error {
+		if err != nil {
+			return err
+		}
+		err := callback(rows)
+		return errs.Combine(rows.Err(), rows.Close(), err)
+	}
+}
diff --git a/cmd/tools/migrate-segment-copies/main_test.go b/cmd/tools/migrate-segment-copies/main_test.go
new file mode 100644
index 000000000..5a15fae16
--- /dev/null
+++ b/cmd/tools/migrate-segment-copies/main_test.go
@@ -0,0 +1,324 @@
+// Copyright (C) 2023 Storj Labs, Inc.
+// See LICENSE for copying information.
+
+package main_test
+
+import (
+	"os"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+	"go.uber.org/zap/zaptest"
+
+	"storj.io/common/memory"
+	"storj.io/common/storj"
+	"storj.io/common/testcontext"
+	"storj.io/common/testrand"
+	"storj.io/common/uuid"
+	cmd "storj.io/storj/cmd/tools/migrate-segment-copies"
+	"storj.io/storj/private/testplanet"
+	"storj.io/storj/satellite/metabase"
+	"storj.io/storj/satellite/metabase/metabasetest"
+)
+
+func TestMigrateSingleCopy(t *testing.T) {
+	metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, metabaseDB *metabase.DB) {
+		obj := metabasetest.RandObjectStream()
+
+		expectedPieces := metabase.Pieces{
+			{Number: 1, StorageNode: testrand.NodeID()},
+			{Number: 3, StorageNode: testrand.NodeID()},
+		}
+
+		object, _ := metabasetest.CreateTestObject{
+			CreateSegment: func(object metabase.Object, index int) metabase.Segment {
+				metabasetest.CommitSegment{
+					Opts: metabase.CommitSegment{
+						ObjectStream: obj,
+						Position:     metabase.SegmentPosition{Part: 0, Index: uint32(index)},
+						RootPieceID:  testrand.PieceID(),
+
+						Pieces: expectedPieces,
+
+						EncryptedKey:      []byte{3},
+						EncryptedKeyNonce: []byte{4},
+						EncryptedETag:     []byte{5},
+
+						EncryptedSize: 1024,
+						PlainSize:     512,
+						PlainOffset:   0,
+						Redundancy:    metabasetest.DefaultRedundancy,
+						Placement:     storj.EEA,
+					},
+				}.Check(ctx, t, metabaseDB)
+
+				return metabase.Segment{}
+			},
+		}.Run(ctx, t, metabaseDB, obj, 50)
+
+		copyObject, _, _ := metabasetest.CreateObjectCopy{
+			OriginalObject: object,
+		}.Run(ctx, t, metabaseDB, false)
+
+		segments, err := metabaseDB.TestingAllSegments(ctx)
+		require.NoError(t, err)
+		for _, segment := range segments {
+			if segment.StreamID == copyObject.StreamID {
+				require.Len(t, segment.Pieces, 0)
+				require.Equal(t, storj.EveryCountry, segment.Placement)
+			}
+		}
+
+		require.NotZero(t, numberOfSegmentCopies(t, ctx, metabaseDB))
+
+		err = cmd.MigrateSegments(ctx, zaptest.NewLogger(t), metabaseDB, cmd.Config{
+			BatchSize: 3,
+		})
+		require.NoError(t, err)
+
+		segments, err = metabaseDB.TestingAllSegments(ctx)
+		require.NoError(t, err)
+		for _, segment := range segments {
+			require.Equal(t, expectedPieces, segment.Pieces)
+			require.Equal(t, storj.EEA, segment.Placement)
+		}
+	})
+}
+
+func TestMigrateManyCopies(t *testing.T) {
+	metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, metabaseDB *metabase.DB) {
+		obj := metabasetest.RandObjectStream()
+
+		expectedPieces := metabase.Pieces{
+			{Number: 1, StorageNode: testrand.NodeID()},
+			{Number: 3, StorageNode: testrand.NodeID()},
+		}
+
+		object, _ := metabasetest.CreateTestObject{
+			CreateSegment: func(object metabase.Object, index int) metabase.Segment {
+				metabasetest.CommitSegment{
+					Opts: metabase.CommitSegment{
+						ObjectStream: obj,
+						Position:     metabase.SegmentPosition{Part: 0, Index: uint32(index)},
+						RootPieceID:  testrand.PieceID(),
+
+						Pieces: expectedPieces,
+
+						EncryptedKey:      []byte{3},
+						EncryptedKeyNonce: []byte{4},
+						EncryptedETag:     []byte{5},
+
+						EncryptedSize: 1024,
+						PlainSize:     512,
+						PlainOffset:   0,
+						Redundancy:    metabasetest.DefaultRedundancy,
+						Placement:     storj.EEA,
+					},
+				}.Check(ctx, t, metabaseDB)
+
+				return metabase.Segment{}
+			},
+		}.Run(ctx, t, metabaseDB, obj, 20)
+
+		for i := 0; i < 10; i++ {
+			copyObject, _, _ := metabasetest.CreateObjectCopy{
+				OriginalObject: object,
+			}.Run(ctx, t, metabaseDB, false)
+
+			segments, err := metabaseDB.TestingAllSegments(ctx)
+			require.NoError(t, err)
+			for _, segment := range segments {
+				if segment.StreamID == copyObject.StreamID {
+					require.Len(t, segment.Pieces, 0)
+					require.Equal(t, storj.EveryCountry, segment.Placement)
+				}
+			}
+		}
+
+		require.NotZero(t, numberOfSegmentCopies(t, ctx, metabaseDB))
+
+		err := cmd.MigrateSegments(ctx, zaptest.NewLogger(t), metabaseDB, cmd.Config{
+			BatchSize: 7,
+		})
+		require.NoError(t, err)
+
+		segments, err := metabaseDB.TestingAllSegments(ctx)
+		require.NoError(t, err)
+		for _, segment := range segments {
+			require.Equal(t, expectedPieces, segment.Pieces)
+			require.Equal(t, storj.EEA, segment.Placement)
+		}
+	})
+}
+
+func TestMigrateDifferentSegment(t *testing.T) {
+	metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, metabaseDB *metabase.DB) {
+		type Segment struct {
+			StreamID uuid.UUID
+			Position int64
+		}
+
+		expectedResults := map[Segment]metabase.Pieces{}
+		createData := func(numberOfObjects int, pieces metabase.Pieces) {
+			for i := 0; i < numberOfObjects; i++ {
+				numberOfSegments := 3
+				obj := metabasetest.RandObjectStream()
+				object, _ := metabasetest.CreateTestObject{
+					CreateSegment: func(object metabase.Object, index int) metabase.Segment {
+						metabasetest.CommitSegment{
+							Opts: metabase.CommitSegment{
+								ObjectStream: obj,
+								Position:     metabase.SegmentPosition{Part: 0, Index: uint32(index)},
+								RootPieceID:  testrand.PieceID(),
+
+								Pieces: pieces,
+
+								EncryptedKey:      []byte{3},
+								EncryptedKeyNonce: []byte{4},
+								EncryptedETag:     []byte{5},
+
+								EncryptedSize: 1024,
+								PlainSize:     512,
+								PlainOffset:   0,
+								Redundancy:    metabasetest.DefaultRedundancy,
+								Placement:     storj.EEA,
+							},
+						}.Check(ctx, t, metabaseDB)
+
+						return metabase.Segment{}
+					},
+				}.Run(ctx, t, metabaseDB, obj, numberOfSegments)
+				for n := 0; n < numberOfSegments; n++ {
+					expectedResults[Segment{
+						StreamID: object.StreamID,
+						Position: int64(n),
+					}] = pieces
+				}
+
+				copyObject, _, _ := metabasetest.CreateObjectCopy{
+					OriginalObject: object,
+				}.Run(ctx, t, metabaseDB, false)
+
+				for n := 0; n < numberOfSegments; n++ {
+					expectedResults[Segment{
+						StreamID: copyObject.StreamID,
+						Position: int64(n),
+					}] = pieces
+
+					segments, err := metabaseDB.TestingAllSegments(ctx)
+					require.NoError(t, err)
+					for _, segment := range segments {
+						if segment.StreamID == copyObject.StreamID {
+							require.Len(t, segment.Pieces, 0)
+							require.Equal(t, storj.EveryCountry, segment.Placement)
+						}
+					}
+				}
+			}
+		}
+
+		expectedPieces := metabase.Pieces{
+			{Number: 1, StorageNode: testrand.NodeID()},
+			{Number: 3, StorageNode: testrand.NodeID()},
+		}
+		createData(5, expectedPieces)
+
+		expectedPieces = metabase.Pieces{
+			{Number: 2, StorageNode: testrand.NodeID()},
+			{Number: 4, StorageNode: testrand.NodeID()},
+		}
+		createData(5, expectedPieces)
+
+		require.NotZero(t, numberOfSegmentCopies(t, ctx, metabaseDB))
+
+		err := cmd.MigrateSegments(ctx, zaptest.NewLogger(t), metabaseDB, cmd.Config{
+			BatchSize: 7,
+		})
+		require.NoError(t, err)
+
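+		// After migration every segment, original and copy alike, should
+		// match the pieces recorded for it in expectedResults and keep the
+		// EEA placement.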
+		segments, err := metabaseDB.TestingAllSegments(ctx)
+		require.NoError(t, err)
+		require.Equal(t, len(expectedResults), len(segments))
+		for _, segment := range segments {
+			pieces := expectedResults[Segment{
+				StreamID: segment.StreamID,
+				Position: int64(segment.Position.Encode()),
+			}]
+			require.Equal(t, pieces, segment.Pieces)
+			require.Equal(t, storj.EEA, segment.Placement)
+		}
+	})
+}
+
+func numberOfSegmentCopies(t *testing.T, ctx *testcontext.Context, metabaseDB *metabase.DB) int {
+	var count int
+	err := metabaseDB.UnderlyingTagSQL().QueryRow(ctx, "SELECT count(1) FROM segment_copies").Scan(&count)
+	require.NoError(t, err)
+	return count
+}
+
+func TestMigrateEndToEnd(t *testing.T) {
+	testplanet.Run(t, testplanet.Config{
+		SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
+	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
+		expectedData := testrand.Bytes(10 * memory.KiB)
+		err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "test", "object", expectedData)
+		require.NoError(t, err)
+
+		project, err := planet.Uplinks[0].OpenProject(ctx, planet.Satellites[0])
+		require.NoError(t, err)
+		defer ctx.Check(project.Close)
+
+		_, err = project.CopyObject(ctx, "test", "object", "test", "object-copy", nil)
+		require.NoError(t, err)
+
+		data, err := planet.Uplinks[0].Download(ctx, planet.Satellites[0], "test", "object-copy")
+		require.NoError(t, err)
+		require.Equal(t, expectedData, data)
+
+		err = cmd.MigrateSegments(ctx, zaptest.NewLogger(t), planet.Satellites[0].Metabase.DB, cmd.Config{
+			BatchSize: 1,
+		})
+		require.NoError(t, err)
+
+		data, err = planet.Uplinks[0].Download(ctx, planet.Satellites[0], "test", "object-copy")
+		require.NoError(t, err)
+		require.Equal(t, expectedData, data)
+	})
+}
+
+func TestMigrateBackupCSV(t *testing.T) {
+	testplanet.Run(t, testplanet.Config{
+		SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
+	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
+		expectedData := testrand.Bytes(10 * memory.KiB)
+		err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "test", "object", expectedData)
+		require.NoError(t, err)
+
+		project, err := planet.Uplinks[0].OpenProject(ctx, planet.Satellites[0])
+		require.NoError(t, err)
+		defer ctx.Check(project.Close)
+
+		_, err = project.CopyObject(ctx, "test", "object", "test", "object-copy", nil)
+		require.NoError(t, err)
+
+		data, err := planet.Uplinks[0].Download(ctx, planet.Satellites[0], "test", "object-copy")
+		require.NoError(t, err)
+		require.Equal(t, expectedData, data)
+
+		backupFile := ctx.File("backupcsv")
+		err = cmd.MigrateSegments(ctx, zaptest.NewLogger(t), planet.Satellites[0].Metabase.DB, cmd.Config{
+			BatchSize:           1,
+			SegmentCopiesBackup: backupFile,
+		})
+		require.NoError(t, err)
+
+		data, err = planet.Uplinks[0].Download(ctx, planet.Satellites[0], "test", "object-copy")
+		require.NoError(t, err)
+		require.Equal(t, expectedData, data)
+
+		fileBytes, err := os.ReadFile(backupFile)
+		require.NoError(t, err)
+		require.NotEmpty(t, fileBytes)
+	})
+}
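
Note on the backup file: the CSV written when SegmentCopiesBackup is set
contains a header row followed by one row per migrated copy, e.g. (the
UUIDs below are placeholders):

    stream_id,ancestor_stream_id
    0197e6a2-...,0197e69f-...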