satellite/metabase: drop alias migration code
We have migrated all of the satellites and we shouldn't keep dead-code around. Change-Id: I539d6766cfafa2f278ff7767ceb2d39f6777ace3
This commit is contained in:
parent
2af7e4ef26
commit
6e6051b172
@ -10,7 +10,6 @@ import (
|
||||
|
||||
"storj.io/common/storj"
|
||||
"storj.io/private/dbutil/pgutil"
|
||||
"storj.io/private/tagsql"
|
||||
)
|
||||
|
||||
// NodeAlias is a metabase local alias for NodeID-s to reduce segment table size.
|
||||
@ -74,57 +73,3 @@ func (db *DB) ListNodeAliases(ctx context.Context) (_ []NodeAliasEntry, err erro
|
||||
|
||||
return aliases, nil
|
||||
}
|
||||
|
||||
// txNodeAliases is used inside a migration.
|
||||
// This will be removed once the migration has been completed.
|
||||
type txNodeAliases struct {
|
||||
db tagsql.Tx
|
||||
}
|
||||
|
||||
// EnsureNodeAliases ensures that the supplied node ID-s have a alias.
|
||||
// It's safe to concurrently try and create node ID-s for the same NodeID.
|
||||
func (db *txNodeAliases) EnsureNodeAliases(ctx context.Context, opts EnsureNodeAliases) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
for _, node := range opts.Nodes {
|
||||
if node.IsZero() {
|
||||
return Error.New("tried to add alias to zero node")
|
||||
}
|
||||
}
|
||||
|
||||
_, err = db.db.ExecContext(ctx, `
|
||||
INSERT INTO node_aliases(node_id)
|
||||
SELECT unnest($1::BYTEA[])
|
||||
ON CONFLICT DO NOTHING
|
||||
`, pgutil.NodeIDArray(opts.Nodes))
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
|
||||
// ListNodeAliases lists all node alias mappings.
|
||||
func (db *txNodeAliases) ListNodeAliases(ctx context.Context) (_ []NodeAliasEntry, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
var aliases []NodeAliasEntry
|
||||
rows, err := db.db.Query(ctx, `
|
||||
SELECT node_id, node_alias
|
||||
FROM node_aliases
|
||||
`)
|
||||
if err != nil {
|
||||
return nil, Error.New("ListNodeAliases query: %w", err)
|
||||
}
|
||||
defer func() { err = errs.Combine(err, rows.Close()) }()
|
||||
|
||||
for rows.Next() {
|
||||
var entry NodeAliasEntry
|
||||
err := rows.Scan(&entry.ID, &entry.Alias)
|
||||
if err != nil {
|
||||
return nil, Error.New("ListNodeAliases scan failed: %w", err)
|
||||
}
|
||||
aliases = append(aliases, entry)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, Error.New("ListNodeAliases scan failed: %w", err)
|
||||
}
|
||||
|
||||
return aliases, nil
|
||||
}
|
||||
|
@ -7,7 +7,6 @@ package metabase
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sort"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
@ -17,8 +16,6 @@ import (
|
||||
"github.com/zeebo/errs"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"storj.io/common/storj"
|
||||
"storj.io/common/uuid"
|
||||
"storj.io/private/dbutil"
|
||||
"storj.io/private/dbutil/pgutil"
|
||||
"storj.io/private/tagsql"
|
||||
@ -227,89 +224,6 @@ func (db *DB) PostgresMigration() *migrate.Migration {
|
||||
`ALTER TABLE segments ADD COLUMN remote_alias_pieces BYTEA`,
|
||||
},
|
||||
},
|
||||
{
|
||||
DB: &db.db,
|
||||
Description: "convert remote_pieces to remote_alias_pieces",
|
||||
Version: 5,
|
||||
Action: migrate.Func(func(ctx context.Context, log *zap.Logger, db tagsql.DB, tx tagsql.Tx) error {
|
||||
type segmentPieces struct {
|
||||
StreamID uuid.UUID
|
||||
Position SegmentPosition
|
||||
RemotePieces Pieces
|
||||
}
|
||||
|
||||
var allSegments []segmentPieces
|
||||
|
||||
err := withRows(tx.QueryContext(ctx, `SELECT stream_id, position, remote_pieces FROM segments WHERE remote_pieces IS NOT NULL`))(
|
||||
func(rows tagsql.Rows) error {
|
||||
for rows.Next() {
|
||||
var seg segmentPieces
|
||||
if err := rows.Scan(&seg.StreamID, &seg.Position, &seg.RemotePieces); err != nil {
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
allSegments = append(allSegments, seg)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
|
||||
allNodes := map[storj.NodeID]struct{}{}
|
||||
for i := range allSegments {
|
||||
seg := &allSegments[i]
|
||||
for k := range seg.RemotePieces {
|
||||
p := &seg.RemotePieces[k]
|
||||
allNodes[p.StorageNode] = struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
nodesList := []storj.NodeID{}
|
||||
for id := range allNodes {
|
||||
nodesList = append(nodesList, id)
|
||||
}
|
||||
aliasCache := NewNodeAliasCache(&txNodeAliases{tx})
|
||||
_, err = aliasCache.Aliases(ctx, nodesList)
|
||||
if err != nil {
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
|
||||
err = func() (err error) {
|
||||
stmt, err := tx.PrepareContext(ctx, `UPDATE segments SET remote_alias_pieces = $3 WHERE stream_id = $1 AND position = $2`)
|
||||
if err != nil {
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
defer func() { err = errs.Combine(err, Error.Wrap(stmt.Close())) }()
|
||||
|
||||
for i := range allSegments {
|
||||
seg := &allSegments[i]
|
||||
if len(seg.RemotePieces) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
aliases, err := aliasCache.ConvertPiecesToAliases(ctx, seg.RemotePieces)
|
||||
if err != nil {
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
sort.Slice(aliases, func(i, k int) bool {
|
||||
return aliases[i].Number < aliases[k].Number
|
||||
})
|
||||
|
||||
_, err = stmt.ExecContext(ctx, seg.StreamID, seg.Position, aliases)
|
||||
if err != nil {
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}),
|
||||
},
|
||||
{
|
||||
DB: &db.db,
|
||||
Description: "drop remote_pieces from segments table",
|
||||
|
@ -4,120 +4,16 @@
|
||||
package metabase_test
|
||||
|
||||
import (
|
||||
"sort"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/zap/zaptest"
|
||||
|
||||
"storj.io/common/testcontext"
|
||||
"storj.io/common/testrand"
|
||||
"storj.io/common/uuid"
|
||||
"storj.io/storj/satellite/metabase"
|
||||
"storj.io/storj/satellite/metabase/metabasetest"
|
||||
"storj.io/storj/satellite/satellitedb/satellitedbtest"
|
||||
)
|
||||
|
||||
func TestMigrateToAliases(t *testing.T) {
|
||||
for _, info := range metabasetest.DatabaseEntries() {
|
||||
info := info
|
||||
t.Run(info.Name, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ctx := testcontext.New(t)
|
||||
defer ctx.Cleanup()
|
||||
|
||||
db, err := satellitedbtest.CreateMetabaseDB(ctx, zaptest.NewLogger(t), t.Name(), "M", 0, satellitedbtest.Database{
|
||||
Name: info.Name,
|
||||
URL: info.ConnStr,
|
||||
Message: "",
|
||||
})
|
||||
require.NoError(t, err)
|
||||
defer ctx.Check(db.Close)
|
||||
|
||||
mdb := db.InternalImplementation().(*metabase.DB)
|
||||
|
||||
allMigrations := mdb.PostgresMigration()
|
||||
|
||||
beforeAliases := allMigrations.TargetVersion(2)
|
||||
err = beforeAliases.Run(ctx, zaptest.NewLogger(t))
|
||||
require.NoError(t, err)
|
||||
|
||||
rawdb := mdb.UnderlyingTagSQL()
|
||||
require.NotNil(t, rawdb)
|
||||
|
||||
type segmentEntry struct {
|
||||
StreamID uuid.UUID
|
||||
Position metabase.SegmentPosition
|
||||
Pieces metabase.Pieces
|
||||
}
|
||||
|
||||
s1, s2 := testrand.UUID(), testrand.UUID()
|
||||
n1, n2, n3 := testrand.NodeID(), testrand.NodeID(), testrand.NodeID()
|
||||
|
||||
entries := []segmentEntry{
|
||||
{
|
||||
StreamID: s1,
|
||||
Position: metabase.SegmentPosition{Index: 1},
|
||||
Pieces: metabase.Pieces{{1, n1}, {2, n2}},
|
||||
},
|
||||
{
|
||||
StreamID: s1,
|
||||
Position: metabase.SegmentPosition{Part: 1, Index: 2},
|
||||
Pieces: metabase.Pieces{{3, n3}, {2, n2}},
|
||||
},
|
||||
{
|
||||
StreamID: s2,
|
||||
Position: metabase.SegmentPosition{Part: 1, Index: 0},
|
||||
Pieces: metabase.Pieces{{1, n1}},
|
||||
},
|
||||
{
|
||||
StreamID: s2,
|
||||
Position: metabase.SegmentPosition{Part: 1, Index: 1},
|
||||
Pieces: metabase.Pieces{},
|
||||
},
|
||||
}
|
||||
|
||||
for _, e := range entries {
|
||||
_, err = rawdb.ExecContext(ctx, `
|
||||
INSERT INTO segments (
|
||||
stream_id, position, remote_pieces,
|
||||
root_piece_id, encrypted_key_nonce, encrypted_key,
|
||||
encrypted_size, plain_offset, plain_size, redundancy
|
||||
) VALUES (
|
||||
$1, $2, $3,
|
||||
$4, $5, $6,
|
||||
$7, $8, $9, $10
|
||||
)`,
|
||||
e.StreamID, e.Position, e.Pieces,
|
||||
// mock values
|
||||
testrand.PieceID(), []byte{1, 2}, []byte{1, 2},
|
||||
100, 100, 100, int64(0x10),
|
||||
)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
err = allMigrations.Run(ctx, zaptest.NewLogger(t))
|
||||
require.NoError(t, err)
|
||||
|
||||
for _, e := range entries {
|
||||
seg, err := db.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
|
||||
StreamID: e.StreamID,
|
||||
Position: e.Position,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
sortedPieces := e.Pieces
|
||||
sort.Slice(sortedPieces, func(i, k int) bool {
|
||||
return sortedPieces[i].Number < sortedPieces[k].Number
|
||||
})
|
||||
require.Equal(t, sortedPieces, seg.Pieces)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestNow(t *testing.T) {
|
||||
metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) {
|
||||
sysnow := time.Now()
|
||||
|
Loading…
Reference in New Issue
Block a user