cmd/tools: add tool to migrate segment copies metadata

We need to migrate all existing segment copies so that they contain the
same metadata as the original segment. So far we were not duplicating
stored pieces metadata, but we are changing this behavior now. We will
use this tool after enabling the new way of doing server-side copies.

Fixes https://github.com/storj/storj/issues/5890

Change-Id: Ia9ca12486f3c527abd28949eb438d1c4c7138d55
This commit is contained in:
Michal Niewrzal 2023-07-06 14:57:58 +02:00 committed by Michał Niewrzał
parent 23631dc8bb
commit 31bb6d54c7
2 changed files with 572 additions and 0 deletions

View File

@ -0,0 +1,248 @@
// Copyright (C) 2023 Storj Labs, Inc.
// See LICENSE for copying information.
package main
import (
"context"
"encoding/csv"
"errors"
"os"
"time"
"github.com/spacemonkeygo/monkit/v3"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/common/uuid"
"storj.io/private/cfgstruct"
"storj.io/private/dbutil/pgutil"
"storj.io/private/process"
"storj.io/private/tagsql"
"storj.io/storj/satellite/metabase"
)
// mon collects monkit telemetry for this package.
var mon = monkit.Package()

var (
	// rootCmd is the top-level command of the migration tool.
	rootCmd = &cobra.Command{
		Use:   "migrate-segment-copies",
		Short: "migrate-segment-copies",
	}
	// runCmd executes the actual migration via run.
	runCmd = &cobra.Command{
		Use:   "run",
		Short: "run migrate-segment-copies",
		RunE:  run,
	}

	// config holds the flag values bound in init.
	config Config
)
func init() {
rootCmd.AddCommand(runCmd)
cfgstruct.Bind(pflag.CommandLine, &config)
}
// Config defines configuration for migration.
type Config struct {
	MetabaseDB string `help:"connection URL for metabaseDB"`
	BatchSize  int    `help:"number of entries from segment_copies processed at once" default:"2000"`
	// Help text fixed: "cvs" -> "csv", "will be backup" -> "will be backed up".
	SegmentCopiesBackup string `help:"csv file where segment copies entries will be backed up"`
}
// VerifyFlags verifies whether the values provided are valid.
// It returns a combined error describing every missing required flag.
func (config *Config) VerifyFlags() error {
	var group errs.Group

	// A metabase connection string is the only mandatory setting.
	if config.MetabaseDB == "" {
		group.Add(errors.New("flag '--metabasedb' is not set"))
	}

	return group.Err()
}
// run validates the provided flags and starts the migration.
func run(cmd *cobra.Command, args []string) error {
	err := config.VerifyFlags()
	if err != nil {
		return err
	}

	ctx, _ := process.Ctx(cmd)
	return Migrate(ctx, zap.L(), config)
}
// main is the tool entry point; process.Exec handles flag parsing,
// process setup and error reporting around rootCmd.
func main() {
	process.Exec(rootCmd)
}
// Migrate starts segment copies migration.
//
// It opens a metabase connection from config and delegates the actual
// work to MigrateSegments, making sure the connection is closed when done.
func Migrate(ctx context.Context, log *zap.Logger, config Config) (err error) {
	defer mon.Task()(&ctx)(&err)

	metabaseDB, openErr := metabase.Open(ctx, log, config.MetabaseDB, metabase.Config{})
	if openErr != nil {
		return errs.New("unable to connect %q: %w", config.MetabaseDB, openErr)
	}
	defer func() {
		// Surface close failures without masking the migration result.
		err = errs.Combine(err, metabaseDB.Close())
	}()

	return MigrateSegments(ctx, log, metabaseDB, config)
}
// MigrateSegments updates segment copies with proper metadata (pieces and placement).
//
// It pages through the segment_copies table in batches of config.BatchSize,
// reads the ancestor (original) segment rows from the segments table and
// writes remote_alias_pieces, root_piece_id, repaired_at and placement onto
// every copied segment. When config.SegmentCopiesBackup is set, each updated
// (stream_id, ancestor_stream_id) pair is also appended to that CSV file so
// the segment_copies content can be reconstructed if needed.
func MigrateSegments(ctx context.Context, log *zap.Logger, metabaseDB *metabase.DB, config Config) (err error) {
	defer mon.Task()(&ctx)(&err)

	var backupCSV *csv.Writer
	if config.SegmentCopiesBackup != "" {
		f, err := os.Create(config.SegmentCopiesBackup)
		if err != nil {
			return err
		}
		defer func() {
			err = errs.Combine(err, f.Close())
		}()

		backupCSV = csv.NewWriter(f)
		defer backupCSV.Flush()

		if err := backupCSV.Write([]string{"stream_id", "ancestor_stream_id"}); err != nil {
			return err
		}
	}

	db := metabaseDB.UnderlyingTagSQL()

	var streamIDCursor uuid.UUID
	ancestorStreamIDs := []uuid.UUID{}
	streamIDs := []uuid.UUID{}
	processed := 0

	// what we are doing here:
	// * read a batch of entries from the segment_copies table
	// * read ancestor (original) segments metadata from the segments table
	// * update segment copies with the missing metadata, one by one
	// NOTE(review): entries are NOT deleted from segment_copies by this tool;
	// cleanup is expected to happen separately once the migration is verified.
	for {
		log.Info("Processed entries", zap.Int("processed", processed))

		ancestorStreamIDs = ancestorStreamIDs[:0]
		streamIDs = streamIDs[:0]
		// idsMap maps an ancestor stream id to all copy stream ids pointing at it.
		idsMap := map[uuid.UUID][]uuid.UUID{}
		err := withRows(db.QueryContext(ctx, `
			SELECT stream_id, ancestor_stream_id FROM segment_copies WHERE stream_id > $1 ORDER BY stream_id LIMIT $2
		`, streamIDCursor, config.BatchSize))(func(rows tagsql.Rows) error {
			for rows.Next() {
				var streamID, ancestorStreamID uuid.UUID
				err := rows.Scan(&streamID, &ancestorStreamID)
				if err != nil {
					return err
				}
				// Cursor advances strictly, which guarantees loop termination.
				streamIDCursor = streamID

				ancestorStreamIDs = append(ancestorStreamIDs, ancestorStreamID)
				streamIDs = append(streamIDs, streamID)
				idsMap[ancestorStreamID] = append(idsMap[ancestorStreamID], streamID)
			}
			return nil
		})
		if err != nil {
			return err
		}

		// All segment_copies entries have been processed. Terminating on an
		// empty batch (instead of an empty update set, as before) prevents
		// stopping early and silently skipping the remaining rows when an
		// entire batch of copies has no surviving ancestor segments.
		if len(streamIDs) == 0 {
			return nil
		}

		type Update struct {
			StreamID          uuid.UUID
			AncestorStreamID  uuid.UUID
			Position          int64
			RemoteAliasPieces []byte
			RootPieceID       []byte
			RepairedAt        *time.Time
			Placement         int64
		}

		// Collect one Update per (copy, ancestor segment position) pair.
		updates := []Update{}
		err = withRows(db.QueryContext(ctx, `
			SELECT stream_id, position, remote_alias_pieces, root_piece_id, repaired_at, placement FROM segments WHERE stream_id = ANY($1::BYTEA[])
		`, pgutil.UUIDArray(ancestorStreamIDs)))(func(rows tagsql.Rows) error {
			for rows.Next() {
				var ancestorStreamID uuid.UUID
				var position int64
				var remoteAliasPieces, rootPieceID []byte
				var repairedAt *time.Time
				var placement int64
				err := rows.Scan(&ancestorStreamID, &position, &remoteAliasPieces, &rootPieceID, &repairedAt, &placement)
				if err != nil {
					return err
				}

				// Safety check: every returned row was requested via idsMap keys.
				streamIDs, ok := idsMap[ancestorStreamID]
				if !ok {
					return errs.New("unable to map ancestor stream id: %s", ancestorStreamID)
				}

				for _, streamID := range streamIDs {
					updates = append(updates, Update{
						StreamID:          streamID,
						AncestorStreamID:  ancestorStreamID,
						Position:          position,
						RemoteAliasPieces: remoteAliasPieces,
						RootPieceID:       rootPieceID,
						RepairedAt:        repairedAt,
						Placement:         placement,
					})
				}
			}
			return nil
		})
		if err != nil {
			return err
		}

		// Copy the ancestor's metadata onto each copy segment, one row at a time.
		for _, update := range updates {
			_, err := db.ExecContext(ctx, `
				UPDATE segments SET
					remote_alias_pieces = $3,
					root_piece_id = $4,
					repaired_at = $5,
					placement = $6
				WHERE (stream_id, position) = ($1, $2)
			`, update.StreamID, update.Position, update.RemoteAliasPieces, update.RootPieceID, update.RepairedAt, update.Placement)
			if err != nil {
				return err
			}

			if backupCSV != nil {
				if err := backupCSV.Write([]string{update.StreamID.String(), update.AncestorStreamID.String()}); err != nil {
					return err
				}
			}
		}

		if backupCSV != nil {
			// Flush after each batch so a crash loses at most one batch of
			// backup rows. NOTE(review): backupCSV.Error() could be checked
			// here as well, since Flush itself does not return an error.
			backupCSV.Flush()
		}

		processed += len(streamIDs)
	}
}
// withRows adapts the (rows, err) pair returned by QueryContext so that a
// callback can consume the rows while the query error, the callback error,
// rows.Err() and rows.Close() are all folded into one returned error.
func withRows(rows tagsql.Rows, err error) func(func(tagsql.Rows) error) error {
	return func(callback func(tagsql.Rows) error) error {
		if err != nil {
			return err
		}
		callbackErr := callback(rows)
		return errs.Combine(rows.Err(), rows.Close(), callbackErr)
	}
}

View File

@ -0,0 +1,324 @@
// Copyright (C) 2023 Storj Labs, Inc.
// See LICENSE for copying information.
package main_test
import (
"os"
"testing"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
"storj.io/common/memory"
"storj.io/common/storj"
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/common/uuid"
cmd "storj.io/storj/cmd/tools/migrate-segment-copies"
"storj.io/storj/private/testplanet"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/metabase/metabasetest"
)
// TestMigrateSingleCopy verifies that after MigrateSegments a single copy
// of an object carries the ancestor's pieces and placement.
func TestMigrateSingleCopy(t *testing.T) {
	metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, metabaseDB *metabase.DB) {
		obj := metabasetest.RandObjectStream()

		expectedPieces := metabase.Pieces{
			{Number: 1, StorageNode: testrand.NodeID()},
			{Number: 3, StorageNode: testrand.NodeID()},
		}

		// Create an object with 50 segments, each committed with
		// expectedPieces and EEA placement.
		object, _ := metabasetest.CreateTestObject{
			CreateSegment: func(object metabase.Object, index int) metabase.Segment {
				metabasetest.CommitSegment{
					Opts: metabase.CommitSegment{
						ObjectStream: obj,
						Position:     metabase.SegmentPosition{Part: 0, Index: uint32(index)},
						RootPieceID:  testrand.PieceID(),
						Pieces:       expectedPieces,

						EncryptedKey:      []byte{3},
						EncryptedKeyNonce: []byte{4},
						EncryptedETag:     []byte{5},

						EncryptedSize: 1024,
						PlainSize:     512,
						PlainOffset:   0,
						Redundancy:    metabasetest.DefaultRedundancy,

						Placement: storj.EEA,
					},
				}.Check(ctx, t, metabaseDB)
				return metabase.Segment{}
			},
		}.Run(ctx, t, metabaseDB, obj, 50)

		copyObject, _, _ := metabasetest.CreateObjectCopy{
			OriginalObject: object,
		}.Run(ctx, t, metabaseDB, false)

		// Before the migration copied segments have no pieces and the
		// default (EveryCountry) placement.
		segments, err := metabaseDB.TestingAllSegments(ctx)
		require.NoError(t, err)
		for _, segment := range segments {
			if segment.StreamID == copyObject.StreamID {
				require.Len(t, segment.Pieces, 0)
				require.Equal(t, storj.EveryCountry, segment.Placement)
			}
		}

		require.NotZero(t, numberOfSegmentCopies(t, ctx, metabaseDB))

		// Small batch size to exercise the batching logic.
		err = cmd.MigrateSegments(ctx, zaptest.NewLogger(t), metabaseDB, cmd.Config{
			BatchSize: 3,
		})
		require.NoError(t, err)

		// After the migration every segment — original and copy alike —
		// must carry the ancestor's pieces and placement.
		segments, err = metabaseDB.TestingAllSegments(ctx)
		require.NoError(t, err)
		for _, segment := range segments {
			require.Equal(t, expectedPieces, segment.Pieces)
			require.Equal(t, storj.EEA, segment.Placement)
		}
	})
}
// TestMigrateManyCopies verifies the migration when a single object has
// many copies, all of which must receive the ancestor's metadata.
func TestMigrateManyCopies(t *testing.T) {
	metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, metabaseDB *metabase.DB) {
		obj := metabasetest.RandObjectStream()

		expectedPieces := metabase.Pieces{
			{Number: 1, StorageNode: testrand.NodeID()},
			{Number: 3, StorageNode: testrand.NodeID()},
		}

		// Create an object with 20 segments carrying pieces and EEA placement.
		object, _ := metabasetest.CreateTestObject{
			CreateSegment: func(object metabase.Object, index int) metabase.Segment {
				metabasetest.CommitSegment{
					Opts: metabase.CommitSegment{
						ObjectStream: obj,
						Position:     metabase.SegmentPosition{Part: 0, Index: uint32(index)},
						RootPieceID:  testrand.PieceID(),
						Pieces:       expectedPieces,

						EncryptedKey:      []byte{3},
						EncryptedKeyNonce: []byte{4},
						EncryptedETag:     []byte{5},

						EncryptedSize: 1024,
						PlainSize:     512,
						PlainOffset:   0,
						Redundancy:    metabasetest.DefaultRedundancy,

						Placement: storj.EEA,
					},
				}.Check(ctx, t, metabaseDB)
				return metabase.Segment{}
			},
		}.Run(ctx, t, metabaseDB, obj, 20)

		// Create 10 copies; each starts with no pieces and default placement.
		for i := 0; i < 10; i++ {
			copyObject, _, _ := metabasetest.CreateObjectCopy{
				OriginalObject: object,
			}.Run(ctx, t, metabaseDB, false)

			segments, err := metabaseDB.TestingAllSegments(ctx)
			require.NoError(t, err)
			for _, segment := range segments {
				if segment.StreamID == copyObject.StreamID {
					require.Len(t, segment.Pieces, 0)
					require.Equal(t, storj.EveryCountry, segment.Placement)
				}
			}
		}

		require.NotZero(t, numberOfSegmentCopies(t, ctx, metabaseDB))

		// Batch size (7) does not evenly divide the 10 copies, exercising
		// the partial-last-batch path.
		err := cmd.MigrateSegments(ctx, zaptest.NewLogger(t), metabaseDB, cmd.Config{
			BatchSize: 7,
		})
		require.NoError(t, err)

		// Every segment must now carry the ancestor's pieces and placement.
		segments, err := metabaseDB.TestingAllSegments(ctx)
		require.NoError(t, err)
		for _, segment := range segments {
			require.Equal(t, expectedPieces, segment.Pieces)
			require.Equal(t, storj.EEA, segment.Placement)
		}
	})
}
// TestMigrateDifferentSegment verifies migration with copies of objects
// whose ancestors hold different pieces: after migration each copy must
// carry exactly its own ancestor's pieces, not another object's.
func TestMigrateDifferentSegment(t *testing.T) {
	metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, metabaseDB *metabase.DB) {
		type Segment struct {
			StreamID uuid.UUID
			Position int64
		}
		// expectedResults maps every segment (original and copy) to the
		// pieces it must have after the migration.
		expectedResults := map[Segment]metabase.Pieces{}

		// createData creates objects with 3 segments each plus one copy per
		// object, recording expected pieces for originals and copies.
		createData := func(numberOfObjects int, pieces metabase.Pieces) {
			for i := 0; i < numberOfObjects; i++ {
				numberOfSegments := 3
				obj := metabasetest.RandObjectStream()
				object, _ := metabasetest.CreateTestObject{
					CreateSegment: func(object metabase.Object, index int) metabase.Segment {
						metabasetest.CommitSegment{
							Opts: metabase.CommitSegment{
								ObjectStream: obj,
								Position:     metabase.SegmentPosition{Part: 0, Index: uint32(index)},
								RootPieceID:  testrand.PieceID(),
								Pieces:       pieces,

								EncryptedKey:      []byte{3},
								EncryptedKeyNonce: []byte{4},
								EncryptedETag:     []byte{5},

								EncryptedSize: 1024,
								PlainSize:     512,
								PlainOffset:   0,
								Redundancy:    metabasetest.DefaultRedundancy,

								Placement: storj.EEA,
							},
						}.Check(ctx, t, metabaseDB)
						return metabase.Segment{}
					},
				}.Run(ctx, t, metabaseDB, obj, 3)

				for n := 0; n < numberOfSegments; n++ {
					expectedResults[Segment{
						StreamID: object.StreamID,
						Position: int64(n),
					}] = pieces
				}

				copyObject, _, _ := metabasetest.CreateObjectCopy{
					OriginalObject: object,
				}.Run(ctx, t, metabaseDB, false)

				for n := 0; n < numberOfSegments; n++ {
					expectedResults[Segment{
						StreamID: copyObject.StreamID,
						Position: int64(n),
					}] = pieces
				}

				// Verify copied segments start without pieces and with the
				// default placement. This full-table read was previously
				// executed once per segment inside the loop above; one scan
				// per copy is sufficient.
				segments, err := metabaseDB.TestingAllSegments(ctx)
				require.NoError(t, err)
				for _, segment := range segments {
					if segment.StreamID == copyObject.StreamID {
						require.Len(t, segment.Pieces, 0)
						require.Equal(t, storj.EveryCountry, segment.Placement)
					}
				}
			}
		}

		// Two groups of objects with distinct piece sets.
		expectedPieces := metabase.Pieces{
			{Number: 1, StorageNode: testrand.NodeID()},
			{Number: 3, StorageNode: testrand.NodeID()},
		}
		createData(5, expectedPieces)

		expectedPieces = metabase.Pieces{
			{Number: 2, StorageNode: testrand.NodeID()},
			{Number: 4, StorageNode: testrand.NodeID()},
		}
		createData(5, expectedPieces)

		require.NotZero(t, numberOfSegmentCopies(t, ctx, metabaseDB))

		err := cmd.MigrateSegments(ctx, zaptest.NewLogger(t), metabaseDB, cmd.Config{
			BatchSize: 7,
		})
		require.NoError(t, err)

		// Every segment must match the pieces recorded for its own ancestor.
		segments, err := metabaseDB.TestingAllSegments(ctx)
		require.NoError(t, err)
		require.Equal(t, len(expectedResults), len(segments))
		for _, segment := range segments {
			pieces := expectedResults[Segment{
				StreamID: segment.StreamID,
				Position: int64(segment.Position.Encode()),
			}]
			require.Equal(t, pieces, segment.Pieces)
			require.Equal(t, storj.EEA, segment.Placement)
		}
	})
}
// numberOfSegmentCopies returns how many rows the segment_copies table
// currently holds.
func numberOfSegmentCopies(t *testing.T, ctx *testcontext.Context, metabaseDB *metabase.DB) int {
	count := 0
	row := metabaseDB.UnderlyingTagSQL().QueryRow(ctx, "SELECT count(1) FROM segment_copies")
	require.NoError(t, row.Scan(&count))
	return count
}
// TestMigrateEndToEnd uploads and copies an object through a full
// testplanet network, runs the migration, and verifies the copy is still
// downloadable with identical content afterwards.
func TestMigrateEndToEnd(t *testing.T) {
	testplanet.Run(t, testplanet.Config{
		SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
		expectedData := testrand.Bytes(10 * memory.KiB)
		err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "test", "object", expectedData)
		require.NoError(t, err)

		project, err := planet.Uplinks[0].OpenProject(ctx, planet.Satellites[0])
		require.NoError(t, err)
		defer ctx.Check(project.Close)

		_, err = project.CopyObject(ctx, "test", "object", "test", "object-copy", nil)
		require.NoError(t, err)

		// Sanity check: the copy is downloadable before the migration.
		data, err := planet.Uplinks[0].Download(ctx, planet.Satellites[0], "test", "object-copy")
		require.NoError(t, err)
		require.Equal(t, expectedData, data)

		err = cmd.MigrateSegments(ctx, zaptest.NewLogger(t), planet.Satellites[0].Metabase.DB, cmd.Config{
			BatchSize: 1,
		})
		require.NoError(t, err)

		// The copy must still download with the same bytes after migration.
		data, err = planet.Uplinks[0].Download(ctx, planet.Satellites[0], "test", "object-copy")
		require.NoError(t, err)
		require.Equal(t, expectedData, data)
	})
}
// TestMigrateBackupCSV runs the migration with the CSV backup enabled and
// verifies that the copy stays downloadable and the backup file was written.
func TestMigrateBackupCSV(t *testing.T) {
	testplanet.Run(t, testplanet.Config{
		SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
		uplink := planet.Uplinks[0]
		satellite := planet.Satellites[0]

		expectedData := testrand.Bytes(10 * memory.KiB)
		require.NoError(t, uplink.Upload(ctx, satellite, "test", "object", expectedData))

		project, err := uplink.OpenProject(ctx, satellite)
		require.NoError(t, err)
		defer ctx.Check(project.Close)

		_, err = project.CopyObject(ctx, "test", "object", "test", "object-copy", nil)
		require.NoError(t, err)

		// The copy must be downloadable before the migration runs.
		downloaded, err := uplink.Download(ctx, satellite, "test", "object-copy")
		require.NoError(t, err)
		require.Equal(t, expectedData, downloaded)

		backupFile := ctx.File("backupcsv")
		err = cmd.MigrateSegments(ctx, zaptest.NewLogger(t), satellite.Metabase.DB, cmd.Config{
			BatchSize:           1,
			SegmentCopiesBackup: backupFile,
		})
		require.NoError(t, err)

		// ... and still downloadable with identical content afterwards.
		downloaded, err = uplink.Download(ctx, satellite, "test", "object-copy")
		require.NoError(t, err)
		require.Equal(t, expectedData, downloaded)

		// The backup CSV must exist and be non-empty (at least the header row).
		csvContent, err := os.ReadFile(backupFile)
		require.NoError(t, err)
		require.NotEmpty(t, csvContent)
	})
}