cmd/tools: remove migrate-segment-copies tool

The migration is done, so we can remove the tool now.
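For reference, "done" is verifiable: the migration is complete once the segment_copies table holds no rows. A minimal check, mirroring the numberOfSegmentCopies helper deleted below:

var count int
err := metabaseDB.UnderlyingTagSQL().QueryRow(ctx, "SELECT count(1) FROM segment_copies").Scan(&count)
// count == 0 means no copy still depends on a segment_copies entry.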

https://github.com/storj/storj/issues/5891

Change-Id: I5d56bad1ac680cd77dabfcf271788e100a6a435b
Michal Niewrzal 2023-08-04 10:08:51 +02:00 committed by Michał Niewrzał
parent a5cbec7b3b
commit 7f249ab7ca
2 changed files with 0 additions and 574 deletions

cmd/tools/migrate-segment-copies/main.go

@@ -1,250 +0,0 @@
// Copyright (C) 2023 Storj Labs, Inc.
// See LICENSE for copying information.

package main

import (
"context"
"encoding/csv"
"errors"
"os"
"time"
"github.com/spacemonkeygo/monkit/v3"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/common/uuid"
"storj.io/private/cfgstruct"
"storj.io/private/dbutil/pgutil"
"storj.io/private/process"
"storj.io/private/tagsql"
"storj.io/storj/satellite/metabase"
)

var mon = monkit.Package()

var (
rootCmd = &cobra.Command{
Use: "migrate-segment-copies",
Short: "migrate-segment-copies",
}
runCmd = &cobra.Command{
Use: "run",
Short: "run migrate-segment-copies",
RunE: run,
}
config Config
)

func init() {
rootCmd.AddCommand(runCmd)
cfgstruct.Bind(pflag.CommandLine, &config)
}

// Config defines configuration for migration.
type Config struct {
MetabaseDB string `help:"connection URL for metabaseDB"`
BatchSize int `help:"number of entries from segment_copies processed at once" default:"2000"`
SegmentCopiesBackup string `help:"csv file where segment copies entries will be backed up"`
}
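
// Example invocation (a sketch: cfgstruct typically derives kebab-case flag
// names from the fields above; only --metabasedb is confirmed by VerifyFlags):
//
//	migrate-segment-copies run --metabasedb 'postgres://...' --batch-size 2000 --segment-copies-backup backup.csv
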
// VerifyFlags verifies whether the values provided are valid.
func (config *Config) VerifyFlags() error {
var errlist errs.Group
if config.MetabaseDB == "" {
errlist.Add(errors.New("flag '--metabasedb' is not set"))
}
return errlist.Err()
}

func run(cmd *cobra.Command, args []string) error {
if err := config.VerifyFlags(); err != nil {
return err
}
ctx, _ := process.Ctx(cmd)
log := zap.L()
return Migrate(ctx, log, config)
}

func main() {
process.Exec(rootCmd)
}

// Migrate starts segment copies migration.
func Migrate(ctx context.Context, log *zap.Logger, config Config) (err error) {
defer mon.Task()(&ctx)(&err)
db, err := metabase.Open(ctx, log, config.MetabaseDB, metabase.Config{
ApplicationName: "migrate-segment-copies",
})
if err != nil {
return errs.New("unable to connect %q: %w", config.MetabaseDB, err)
}
defer func() {
err = errs.Combine(err, db.Close())
}()
return MigrateSegments(ctx, log, db, config)
}

// MigrateSegments updates segment copies with the proper metadata (pieces and placement).
func MigrateSegments(ctx context.Context, log *zap.Logger, metabaseDB *metabase.DB, config Config) (err error) {
defer mon.Task()(&ctx)(&err)
var backupCSV *csv.Writer
if config.SegmentCopiesBackup != "" {
f, err := os.Create(config.SegmentCopiesBackup)
if err != nil {
return err
}
defer func() {
err = errs.Combine(err, f.Close())
}()
backupCSV = csv.NewWriter(f)
defer backupCSV.Flush()
if err := backupCSV.Write([]string{"stream_id", "ancestor_stream_id"}); err != nil {
return err
}
}
db := metabaseDB.UnderlyingTagSQL()
var streamIDCursor uuid.UUID
ancestorStreamIDs := []uuid.UUID{}
streamIDs := []uuid.UUID{}
processed := 0
// What we are doing here:
// * read a batch of entries from the segment_copies table
// * read the ancestor (original) segments' metadata from the segments table
// * update the segment copies with the missing metadata, one by one
// * delete entries from the segment_copies table
for {
log.Info("Processed entries", zap.Int("processed", processed))
ancestorStreamIDs = ancestorStreamIDs[:0]
streamIDs = streamIDs[:0]
idsMap := map[uuid.UUID][]uuid.UUID{}
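// Keyset pagination: streamIDCursor advances to the last stream_id seen,
// so each batch resumes where the previous one left off.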
err := withRows(db.QueryContext(ctx, `
SELECT stream_id, ancestor_stream_id FROM segment_copies WHERE stream_id > $1 ORDER BY stream_id LIMIT $2
`, streamIDCursor, config.BatchSize))(func(rows tagsql.Rows) error {
for rows.Next() {
var streamID, ancestorStreamID uuid.UUID
err := rows.Scan(&streamID, &ancestorStreamID)
if err != nil {
return err
}
streamIDCursor = streamID
ancestorStreamIDs = append(ancestorStreamIDs, ancestorStreamID)
streamIDs = append(streamIDs, streamID)
idsMap[ancestorStreamID] = append(idsMap[ancestorStreamID], streamID)
}
return nil
})
if err != nil {
return err
}
type Update struct {
StreamID uuid.UUID
AncestorStreamID uuid.UUID
Position int64
RemoteAliasPieces []byte
RootPieceID []byte
RepairedAt *time.Time
Placement *int64
}
updates := []Update{}
err = withRows(db.QueryContext(ctx, `
SELECT stream_id, position, remote_alias_pieces, root_piece_id, repaired_at, placement FROM segments WHERE stream_id = ANY($1::BYTEA[])
`, pgutil.UUIDArray(ancestorStreamIDs)))(func(rows tagsql.Rows) error {
for rows.Next() {
var ancestorStreamID uuid.UUID
var position int64
var remoteAliasPieces, rootPieceID []byte
var repairedAt *time.Time
var placement *int64
err := rows.Scan(&ancestorStreamID, &position, &remoteAliasPieces, &rootPieceID, &repairedAt, &placement)
if err != nil {
return err
}
streamIDs, ok := idsMap[ancestorStreamID]
if !ok {
return errs.New("unable to map ancestor stream id: %s", ancestorStreamID)
}
for _, streamID := range streamIDs {
updates = append(updates, Update{
StreamID: streamID,
AncestorStreamID: ancestorStreamID,
Position: position,
RemoteAliasPieces: remoteAliasPieces,
RootPieceID: rootPieceID,
RepairedAt: repairedAt,
Placement: placement,
})
}
}
return nil
})
if err != nil {
return err
}
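// Apply the ancestor's pieces and placement to each copied segment, so the
// copy no longer depends on its segment_copies entry.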
for _, update := range updates {
_, err := db.ExecContext(ctx, `
UPDATE segments SET
remote_alias_pieces = $3,
root_piece_id = $4,
repaired_at = $5,
placement = $6
WHERE (stream_id, position) = ($1, $2)
`, update.StreamID, update.Position, update.RemoteAliasPieces, update.RootPieceID, update.RepairedAt, update.Placement)
if err != nil {
return err
}
if backupCSV != nil {
if err := backupCSV.Write([]string{update.StreamID.String(), update.AncestorStreamID.String()}); err != nil {
return err
}
}
}
if backupCSV != nil {
backupCSV.Flush()
}
processed += len(streamIDs)
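// Stop once a pass produces no updates: there are no more copies to migrate.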
if len(updates) == 0 {
return nil
}
}
}
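
// withRows runs the callback only when the query succeeded, and always
// combines rows.Err and rows.Close into the returned error.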
func withRows(rows tagsql.Rows, err error) func(func(tagsql.Rows) error) error {
return func(callback func(tagsql.Rows) error) error {
if err != nil {
return err
}
err := callback(rows)
return errs.Combine(rows.Err(), rows.Close(), err)
}
}

cmd/tools/migrate-segment-copies/main_test.go

@@ -1,324 +0,0 @@
// Copyright (C) 2023 Storj Labs, Inc.
// See LICENSE for copying information.

package main_test

import (
"os"
"testing"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
"storj.io/common/memory"
"storj.io/common/storj"
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/common/uuid"
cmd "storj.io/storj/cmd/tools/migrate-segment-copies"
"storj.io/storj/private/testplanet"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/metabase/metabasetest"
)
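
// The tests below create object copies (which start with no pieces and
// EveryCountry placement), run MigrateSegments, and verify that each copy
// ends up with its ancestor's pieces and placement; the testplanet tests
// additionally verify the copies stay downloadable end to end.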
func TestMigrateSingleCopy(t *testing.T) {
metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, metabaseDB *metabase.DB) {
obj := metabasetest.RandObjectStream()
expectedPieces := metabase.Pieces{
{Number: 1, StorageNode: testrand.NodeID()},
{Number: 3, StorageNode: testrand.NodeID()},
}
object, _ := metabasetest.CreateTestObject{
CreateSegment: func(object metabase.Object, index int) metabase.Segment {
metabasetest.CommitSegment{
Opts: metabase.CommitSegment{
ObjectStream: obj,
Position: metabase.SegmentPosition{Part: 0, Index: uint32(index)},
RootPieceID: testrand.PieceID(),
Pieces: expectedPieces,
EncryptedKey: []byte{3},
EncryptedKeyNonce: []byte{4},
EncryptedETag: []byte{5},
EncryptedSize: 1024,
PlainSize: 512,
PlainOffset: 0,
Redundancy: metabasetest.DefaultRedundancy,
Placement: storj.EEA,
},
}.Check(ctx, t, metabaseDB)
return metabase.Segment{}
},
}.Run(ctx, t, metabaseDB, obj, 50)
copyObject, _, _ := metabasetest.CreateObjectCopy{
OriginalObject: object,
}.Run(ctx, t, metabaseDB, false)
segments, err := metabaseDB.TestingAllSegments(ctx)
require.NoError(t, err)
for _, segment := range segments {
if segment.StreamID == copyObject.StreamID {
require.Len(t, segment.Pieces, 0)
require.Equal(t, storj.EveryCountry, segment.Placement)
}
}
require.NotZero(t, numberOfSegmentCopies(t, ctx, metabaseDB))
err = cmd.MigrateSegments(ctx, zaptest.NewLogger(t), metabaseDB, cmd.Config{
BatchSize: 3,
})
require.NoError(t, err)
segments, err = metabaseDB.TestingAllSegments(ctx)
require.NoError(t, err)
for _, segment := range segments {
require.Equal(t, expectedPieces, segment.Pieces)
require.Equal(t, storj.EEA, segment.Placement)
}
})
}

func TestMigrateManyCopies(t *testing.T) {
metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, metabaseDB *metabase.DB) {
obj := metabasetest.RandObjectStream()
expectedPieces := metabase.Pieces{
{Number: 1, StorageNode: testrand.NodeID()},
{Number: 3, StorageNode: testrand.NodeID()},
}
object, _ := metabasetest.CreateTestObject{
CreateSegment: func(object metabase.Object, index int) metabase.Segment {
metabasetest.CommitSegment{
Opts: metabase.CommitSegment{
ObjectStream: obj,
Position: metabase.SegmentPosition{Part: 0, Index: uint32(index)},
RootPieceID: testrand.PieceID(),
Pieces: expectedPieces,
EncryptedKey: []byte{3},
EncryptedKeyNonce: []byte{4},
EncryptedETag: []byte{5},
EncryptedSize: 1024,
PlainSize: 512,
PlainOffset: 0,
Redundancy: metabasetest.DefaultRedundancy,
Placement: storj.EEA,
},
}.Check(ctx, t, metabaseDB)
return metabase.Segment{}
},
}.Run(ctx, t, metabaseDB, obj, 20)
for i := 0; i < 10; i++ {
copyObject, _, _ := metabasetest.CreateObjectCopy{
OriginalObject: object,
}.Run(ctx, t, metabaseDB, false)
segments, err := metabaseDB.TestingAllSegments(ctx)
require.NoError(t, err)
for _, segment := range segments {
if segment.StreamID == copyObject.StreamID {
require.Len(t, segment.Pieces, 0)
require.Equal(t, storj.EveryCountry, segment.Placement)
}
}
}
require.NotZero(t, numberOfSegmentCopies(t, ctx, metabaseDB))
err := cmd.MigrateSegments(ctx, zaptest.NewLogger(t), metabaseDB, cmd.Config{
BatchSize: 7,
})
require.NoError(t, err)
segments, err := metabaseDB.TestingAllSegments(ctx)
require.NoError(t, err)
for _, segment := range segments {
require.Equal(t, expectedPieces, segment.Pieces)
require.Equal(t, storj.EEA, segment.Placement)
}
})
}

func TestMigrateDifferentSegment(t *testing.T) {
metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, metabaseDB *metabase.DB) {
type Segment struct {
StreamID uuid.UUID
Position int64
}
expectedResults := map[Segment]metabase.Pieces{}
createData := func(numberOfObjects int, pieces metabase.Pieces) {
for i := 0; i < numberOfObjects; i++ {
numberOfSegments := 3
obj := metabasetest.RandObjectStream()
object, _ := metabasetest.CreateTestObject{
CreateSegment: func(object metabase.Object, index int) metabase.Segment {
metabasetest.CommitSegment{
Opts: metabase.CommitSegment{
ObjectStream: obj,
Position: metabase.SegmentPosition{Part: 0, Index: uint32(index)},
RootPieceID: testrand.PieceID(),
Pieces: pieces,
EncryptedKey: []byte{3},
EncryptedKeyNonce: []byte{4},
EncryptedETag: []byte{5},
EncryptedSize: 1024,
PlainSize: 512,
PlainOffset: 0,
Redundancy: metabasetest.DefaultRedundancy,
Placement: storj.EEA,
},
}.Check(ctx, t, metabaseDB)
return metabase.Segment{}
},
}.Run(ctx, t, metabaseDB, obj, 3)
for n := 0; n < numberOfSegments; n++ {
expectedResults[Segment{
StreamID: object.StreamID,
Position: int64(n),
}] = pieces
}
copyObject, _, _ := metabasetest.CreateObjectCopy{
OriginalObject: object,
}.Run(ctx, t, metabaseDB, false)
for n := 0; n < numberOfSegments; n++ {
expectedResults[Segment{
StreamID: copyObject.StreamID,
Position: int64(n),
}] = pieces
segments, err := metabaseDB.TestingAllSegments(ctx)
require.NoError(t, err)
for _, segment := range segments {
if segment.StreamID == copyObject.StreamID {
require.Len(t, segment.Pieces, 0)
require.Equal(t, storj.EveryCountry, segment.Placement)
}
}
}
}
}
expectedPieces := metabase.Pieces{
{Number: 1, StorageNode: testrand.NodeID()},
{Number: 3, StorageNode: testrand.NodeID()},
}
createData(5, expectedPieces)
expectedPieces = metabase.Pieces{
{Number: 2, StorageNode: testrand.NodeID()},
{Number: 4, StorageNode: testrand.NodeID()},
}
createData(5, expectedPieces)
require.NotZero(t, numberOfSegmentCopies(t, ctx, metabaseDB))
err := cmd.MigrateSegments(ctx, zaptest.NewLogger(t), metabaseDB, cmd.Config{
BatchSize: 7,
})
require.NoError(t, err)
segments, err := metabaseDB.TestingAllSegments(ctx)
require.NoError(t, err)
require.Equal(t, len(expectedResults), len(segments))
for _, segment := range segments {
pieces := expectedResults[Segment{
StreamID: segment.StreamID,
Position: int64(segment.Position.Encode()),
}]
require.Equal(t, pieces, segment.Pieces)
require.Equal(t, storj.EEA, segment.Placement)
}
})
}

func numberOfSegmentCopies(t *testing.T, ctx *testcontext.Context, metabaseDB *metabase.DB) int {
var count int
err := metabaseDB.UnderlyingTagSQL().QueryRow(ctx, "SELECT count(1) FROM segment_copies").Scan(&count)
require.NoError(t, err)
return count
}

func TestMigrateEndToEnd(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
expectedData := testrand.Bytes(10 * memory.KiB)
err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "test", "object", expectedData)
require.NoError(t, err)
project, err := planet.Uplinks[0].OpenProject(ctx, planet.Satellites[0])
require.NoError(t, err)
defer ctx.Check(project.Close)
_, err = project.CopyObject(ctx, "test", "object", "test", "object-copy", nil)
require.NoError(t, err)
data, err := planet.Uplinks[0].Download(ctx, planet.Satellites[0], "test", "object-copy")
require.NoError(t, err)
require.Equal(t, expectedData, data)
err = cmd.MigrateSegments(ctx, zaptest.NewLogger(t), planet.Satellites[0].Metabase.DB, cmd.Config{
BatchSize: 1,
})
require.NoError(t, err)
data, err = planet.Uplinks[0].Download(ctx, planet.Satellites[0], "test", "object-copy")
require.NoError(t, err)
require.Equal(t, expectedData, data)
})
}

func TestMigrateBackupCSV(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
expectedData := testrand.Bytes(10 * memory.KiB)
err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "test", "object", expectedData)
require.NoError(t, err)
project, err := planet.Uplinks[0].OpenProject(ctx, planet.Satellites[0])
require.NoError(t, err)
defer ctx.Check(project.Close)
_, err = project.CopyObject(ctx, "test", "object", "test", "object-copy", nil)
require.NoError(t, err)
data, err := planet.Uplinks[0].Download(ctx, planet.Satellites[0], "test", "object-copy")
require.NoError(t, err)
require.Equal(t, expectedData, data)
backupFile := ctx.File("backupcsv")
err = cmd.MigrateSegments(ctx, zaptest.NewLogger(t), planet.Satellites[0].Metabase.DB, cmd.Config{
BatchSize: 1,
SegmentCopiesBackup: backupFile,
})
require.NoError(t, err)
data, err = planet.Uplinks[0].Download(ctx, planet.Satellites[0], "test", "object-copy")
require.NoError(t, err)
require.Equal(t, expectedData, data)
fileBytes, err := os.ReadFile(backupFile)
require.NoError(t, err)
require.NotEmpty(t, fileBytes)
})
}