diff --git a/Jenkinsfile.public b/Jenkinsfile.public index 9d18fb98d..1e9e7c8f0 100644 --- a/Jenkinsfile.public +++ b/Jenkinsfile.public @@ -167,7 +167,9 @@ pipeline { steps { sh 'psql -U postgres -c \'create database teststorj3;\'' - sh 'make test-sim-backwards-compatible' + catchError { + sh 'make test-sim-backwards-compatible' + } } } @@ -182,7 +184,9 @@ pipeline { steps { sh 'cockroach sql --insecure --host=localhost:26257 -e \'create database testcockroach5;\'' - sh 'make test-sim-backwards-compatible' + catchError { + sh 'make test-sim-backwards-compatible' + } sh 'cockroach sql --insecure --host=localhost:26257 -e \'drop database testcockroach5;\'' } } diff --git a/satellite/metainfo/metabase/alias.go b/satellite/metainfo/metabase/alias.go index c99bd004b..3c0e72fe5 100644 --- a/satellite/metainfo/metabase/alias.go +++ b/satellite/metainfo/metabase/alias.go @@ -10,6 +10,7 @@ import ( "storj.io/common/storj" "storj.io/storj/private/dbutil/pgutil" + "storj.io/storj/private/tagsql" ) // NodeAlias is a metabase local alias for NodeID-s to reduce segment table size. @@ -42,11 +43,7 @@ func (db *DB) EnsureNodeAliases(ctx context.Context, opts EnsureNodeAliases) (er SELECT unnest($1::BYTEA[]) ON CONFLICT DO NOTHING `, pgutil.NodeIDArray(opts.Nodes)) - if err != nil { - return Error.Wrap(err) - } - - return nil + return Error.Wrap(err) } // ListNodeAliases lists all node alias mappings. @@ -77,3 +74,57 @@ func (db *DB) ListNodeAliases(ctx context.Context) (_ []NodeAliasEntry, err erro return aliases, nil } + +// txNodeAliases is used inside a migration. +// This will be removed once the migration has been completed. +type txNodeAliases struct { + db tagsql.Tx +} + +// EnsureNodeAliases ensures that the supplied node ID-s have a alias. +// It's safe to concurrently try and create node ID-s for the same NodeID. +func (db *txNodeAliases) EnsureNodeAliases(ctx context.Context, opts EnsureNodeAliases) (err error) { + defer mon.Task()(&ctx)(&err) + + for _, node := range opts.Nodes { + if node.IsZero() { + return Error.New("tried to add alias to zero node") + } + } + + _, err = db.db.ExecContext(ctx, ` + INSERT INTO node_aliases(node_id) + SELECT unnest($1::BYTEA[]) + ON CONFLICT DO NOTHING + `, pgutil.NodeIDArray(opts.Nodes)) + return Error.Wrap(err) +} + +// ListNodeAliases lists all node alias mappings. +func (db *txNodeAliases) ListNodeAliases(ctx context.Context) (_ []NodeAliasEntry, err error) { + defer mon.Task()(&ctx)(&err) + + var aliases []NodeAliasEntry + rows, err := db.db.Query(ctx, ` + SELECT node_id, node_alias + FROM node_aliases + `) + if err != nil { + return nil, Error.New("ListNodeAliases query: %w", err) + } + defer func() { err = errs.Combine(err, rows.Close()) }() + + for rows.Next() { + var entry NodeAliasEntry + err := rows.Scan(&entry.ID, &entry.Alias) + if err != nil { + return nil, Error.New("ListNodeAliases scan failed: %w", err) + } + aliases = append(aliases, entry) + } + if err := rows.Err(); err != nil { + return nil, Error.New("ListNodeAliases scan failed: %w", err) + } + + return aliases, nil +} diff --git a/satellite/metainfo/metabase/aliascache.go b/satellite/metainfo/metabase/aliascache.go index 90ca2457e..097886417 100644 --- a/satellite/metainfo/metabase/aliascache.go +++ b/satellite/metainfo/metabase/aliascache.go @@ -126,6 +126,52 @@ func (cache *NodeAliasCache) refresh(ctx context.Context) (_ *NodeAliasMap, err return cache.latest, nil } +// ConvertPiecesToAliases converts pieces to alias pieces. +func (cache *NodeAliasCache) ConvertPiecesToAliases(ctx context.Context, pieces Pieces) (_ AliasPieces, err error) { + defer mon.Task()(&ctx)(&err) + + nodes := make([]storj.NodeID, len(pieces)) + for i, p := range pieces { + nodes[i] = p.StorageNode + } + + aliases, err := cache.Aliases(ctx, nodes) + if err != nil { + return nil, Error.Wrap(err) + } + + aliasPieces := make(AliasPieces, len(aliases)) + for i, alias := range aliases { + aliasPieces[i] = AliasPiece{ + Number: pieces[i].Number, + Alias: alias, + } + } + + return aliasPieces, nil +} + +// ConvertAliasesToPieces converts alias pieces to pieces. +func (cache *NodeAliasCache) ConvertAliasesToPieces(ctx context.Context, aliasPieces AliasPieces) (_ Pieces, err error) { + defer mon.Task()(&ctx)(&err) + + aliases := make([]NodeAlias, len(aliasPieces)) + pieces := make(Pieces, len(aliasPieces)) + for i, aliasPiece := range aliasPieces { + pieces[i].Number, aliases[i] = aliasPiece.Number, aliasPiece.Alias + } + + nodes, err := cache.Nodes(ctx, aliases) + if err != nil { + return nil, Error.Wrap(err) + } + for i, n := range nodes { + pieces[i].StorageNode = n + } + + return pieces, nil +} + // NodeAliasMap contains bidirectional mapping between node ID and a NodeAlias. type NodeAliasMap struct { node map[NodeAlias]storj.NodeID diff --git a/satellite/metainfo/metabase/bench_test.go b/satellite/metainfo/metabase/bench_test.go index 7bb7e03bc..3d5f58336 100644 --- a/satellite/metainfo/metabase/bench_test.go +++ b/satellite/metainfo/metabase/bench_test.go @@ -94,6 +94,9 @@ func (s *scenario) run(ctx *testcontext.Context, b *testing.B, db *metabase.DB) // wipe data so we can do the exact same test b.StopTimer() DeleteAll{}.Check(ctx, b, db) + if err := db.EnsureNodeAliases(ctx, metabase.EnsureNodeAliases{Nodes: nodes}); err != nil { + require.NoError(b, err) + } b.StartTimer() s.objectStream = nil diff --git a/satellite/metainfo/metabase/commit.go b/satellite/metainfo/metabase/commit.go index 89a8fcf05..5e00f396f 100644 --- a/satellite/metainfo/metabase/commit.go +++ b/satellite/metainfo/metabase/commit.go @@ -257,6 +257,11 @@ func (db *DB) CommitSegment(ctx context.Context, opts CommitSegment) (err error) // TODO: verify opts.Pieces is compatible with opts.Redundancy + aliasPieces, err := db.aliasCache.ConvertPiecesToAliases(ctx, opts.Pieces) + if err != nil { + return Error.New("unable to convert pieces to aliases: %w", err) + } + // Verify that object exists and is partial. _, err = db.db.ExecContext(ctx, ` INSERT INTO segments ( @@ -264,7 +269,7 @@ func (db *DB) CommitSegment(ctx context.Context, opts CommitSegment) (err error) root_piece_id, encrypted_key_nonce, encrypted_key, encrypted_size, plain_offset, plain_size, redundancy, - remote_pieces + remote_alias_pieces ) VALUES ( (SELECT stream_id FROM objects WHERE @@ -283,7 +288,7 @@ func (db *DB) CommitSegment(ctx context.Context, opts CommitSegment) (err error) opts.RootPieceID, opts.EncryptedKeyNonce, opts.EncryptedKey, opts.EncryptedSize, opts.PlainOffset, opts.PlainSize, redundancyScheme{&opts.Redundancy}, - opts.Pieces, + aliasPieces, opts.ProjectID, opts.BucketName, []byte(opts.ObjectKey), opts.Version, opts.StreamID, ) if err != nil { diff --git a/satellite/metainfo/metabase/commit_object.go b/satellite/metainfo/metabase/commit_object.go index 6ea6b46eb..c8aeebaa2 100644 --- a/satellite/metainfo/metabase/commit_object.go +++ b/satellite/metainfo/metabase/commit_object.go @@ -60,7 +60,7 @@ func (db *DB) CommitObjectWithSegments(ctx context.Context, opts CommitObjectWit return err } - deletedSegments, err = deleteSegmentsNotInCommit(ctx, tx, opts.StreamID, segmentsToDelete) + deletedSegments, err = db.deleteSegmentsNotInCommit(ctx, tx, opts.StreamID, segmentsToDelete) if err != nil { return err } @@ -291,7 +291,7 @@ func updateSegmentOffsets(ctx context.Context, tx tagsql.Tx, streamID uuid.UUID, } // deleteSegmentsNotInCommit deletes the listed segments inside the tx. -func deleteSegmentsNotInCommit(ctx context.Context, tx tagsql.Tx, streamID uuid.UUID, segments []SegmentPosition) (deletedSegments []DeletedSegmentInfo, err error) { +func (db *DB) deleteSegmentsNotInCommit(ctx context.Context, tx tagsql.Tx, streamID uuid.UUID, segments []SegmentPosition) (deletedSegments []DeletedSegmentInfo, err error) { defer mon.Task()(&ctx)(&err) if len(segments) == 0 { return nil, nil @@ -306,11 +306,12 @@ func deleteSegmentsNotInCommit(ctx context.Context, tx tagsql.Tx, streamID uuid. err = withRows(tx.Query(ctx, ` DELETE FROM segments WHERE stream_id = $1 AND position = ANY($2) - RETURNING root_piece_id, remote_pieces + RETURNING root_piece_id, remote_alias_pieces `, streamID, pgutil.Int8Array(positions)))(func(rows tagsql.Rows) error { for rows.Next() { var deleted DeletedSegmentInfo - err := rows.Scan(&deleted.RootPieceID, &deleted.Pieces) + var aliasPieces AliasPieces + err := rows.Scan(&deleted.RootPieceID, &aliasPieces) if err != nil { return Error.New("failed to scan segments: %w", err) } @@ -318,6 +319,11 @@ func deleteSegmentsNotInCommit(ctx context.Context, tx tagsql.Tx, streamID uuid. if deleted.RootPieceID.IsZero() { continue } + + deleted.Pieces, err = db.aliasCache.ConvertAliasesToPieces(ctx, aliasPieces) + if err != nil { + return Error.New("failed to convert aliases: %w", err) + } deletedSegments = append(deletedSegments, deleted) } return nil diff --git a/satellite/metainfo/metabase/common.go b/satellite/metainfo/metabase/common.go index 00c58af5f..522d74963 100644 --- a/satellite/metainfo/metabase/common.go +++ b/satellite/metainfo/metabase/common.go @@ -295,6 +295,12 @@ const ( // Pieces defines information for pieces. type Pieces []Piece +// Piece defines information for a segment piece. +type Piece struct { + Number uint16 + StorageNode storj.NodeID +} + // Verify verifies pieces. func (p Pieces) Verify() error { if len(p) == 0 { @@ -360,9 +366,3 @@ func (p Pieces) Less(i, j int) bool { return p[i].Number < p[j].Number } // Swap swaps the pieces with indexes i and j. func (p Pieces) Swap(i, j int) { p[i], p[j] = p[j], p[i] } - -// Piece defines information for a segment piece. -type Piece struct { - Number uint16 - StorageNode storj.NodeID -} diff --git a/satellite/metainfo/metabase/db.go b/satellite/metainfo/metabase/db.go index 1fd53cb81..ecc9dab21 100644 --- a/satellite/metainfo/metabase/db.go +++ b/satellite/metainfo/metabase/db.go @@ -6,13 +6,17 @@ package metabase import ( "context" + "sort" "strconv" _ "github.com/jackc/pgx/v4" // registers pgx as a tagsql driver. _ "github.com/jackc/pgx/v4/stdlib" // registers pgx as a tagsql driver. "github.com/spacemonkeygo/monkit/v3" + "github.com/zeebo/errs" "go.uber.org/zap" + "storj.io/common/storj" + "storj.io/common/uuid" "storj.io/storj/private/dbutil" "storj.io/storj/private/migrate" "storj.io/storj/private/tagsql" @@ -26,23 +30,31 @@ var ( type DB struct { log *zap.Logger db tagsql.DB + + aliasCache *NodeAliasCache } // Open opens a connection to metabase. func Open(ctx context.Context, log *zap.Logger, driverName, connstr string) (*DB, error) { - db, err := tagsql.Open(ctx, driverName, connstr) + rawdb, err := tagsql.Open(ctx, driverName, connstr) if err != nil { return nil, Error.Wrap(err) } - dbutil.Configure(ctx, db, "metabase", mon) + dbutil.Configure(ctx, rawdb, "metabase", mon) - return &DB{log: log, db: postgresRebind{db}}, nil + db := &DB{log: log, db: postgresRebind{rawdb}} + db.aliasCache = NewNodeAliasCache(db) + return db, nil } -// InternalImplementation returns *metabase.DB +// InternalImplementation returns *metabase.DB. // TODO: remove. func (db *DB) InternalImplementation() interface{} { return db } +// UnderlyingTagSQL returns *tagsql.DB. +// TODO: remove. +func (db *DB) UnderlyingTagSQL() tagsql.DB { return db.db } + // Ping checks whether connection has been established. func (db *DB) Ping(ctx context.Context) error { return Error.Wrap(db.db.PingContext(ctx)) @@ -63,6 +75,7 @@ func (db *DB) DestroyTables(ctx context.Context) error { DROP TABLE IF EXISTS node_aliases; DROP SEQUENCE IF EXISTS node_alias_seq; `) + db.aliasCache = NewNodeAliasCache(db) return Error.Wrap(err) } @@ -159,6 +172,105 @@ func (db *DB) PostgresMigration() *migrate.Migration { )`, }, }, + { + DB: &db.db, + Description: "add remote_alias_pieces column", + Version: 4, + Action: migrate.SQL{ + `ALTER TABLE segments ADD COLUMN remote_alias_pieces BYTEA`, + }, + }, + { + DB: &db.db, + Description: "convert remote_pieces to remote_alias_pieces", + Version: 5, + Action: migrate.Func(func(ctx context.Context, log *zap.Logger, db tagsql.DB, tx tagsql.Tx) error { + type segmentPieces struct { + StreamID uuid.UUID + Position SegmentPosition + RemotePieces Pieces + } + + var allSegments []segmentPieces + + err := withRows(tx.QueryContext(ctx, `SELECT stream_id, position, remote_pieces FROM segments WHERE remote_pieces IS NOT NULL`))( + func(rows tagsql.Rows) error { + for rows.Next() { + var seg segmentPieces + if err := rows.Scan(&seg.StreamID, &seg.Position, &seg.RemotePieces); err != nil { + return Error.Wrap(err) + } + allSegments = append(allSegments, seg) + } + return nil + }) + if err != nil { + return Error.Wrap(err) + } + + allNodes := map[storj.NodeID]struct{}{} + for i := range allSegments { + seg := &allSegments[i] + for k := range seg.RemotePieces { + p := &seg.RemotePieces[k] + allNodes[p.StorageNode] = struct{}{} + } + } + + nodesList := []storj.NodeID{} + for id := range allNodes { + nodesList = append(nodesList, id) + } + aliasCache := NewNodeAliasCache(&txNodeAliases{tx}) + _, err = aliasCache.Aliases(ctx, nodesList) + if err != nil { + return Error.Wrap(err) + } + + err = func() (err error) { + stmt, err := tx.PrepareContext(ctx, `UPDATE segments SET remote_alias_pieces = $3 WHERE stream_id = $1 AND position = $2`) + if err != nil { + return Error.Wrap(err) + } + defer func() { err = errs.Combine(err, Error.Wrap(stmt.Close())) }() + + for i := range allSegments { + seg := &allSegments[i] + if len(seg.RemotePieces) == 0 { + continue + } + + aliases, err := aliasCache.ConvertPiecesToAliases(ctx, seg.RemotePieces) + if err != nil { + return Error.Wrap(err) + } + sort.Slice(aliases, func(i, k int) bool { + return aliases[i].Number < aliases[k].Number + }) + + _, err = stmt.ExecContext(ctx, seg.StreamID, seg.Position, aliases) + if err != nil { + return Error.Wrap(err) + } + } + + return nil + }() + if err != nil { + return err + } + + return nil + }), + }, + { + DB: &db.db, + Description: "drop remote_pieces from segments table", + Version: 6, + Action: migrate.SQL{ + `ALTER TABLE segments DROP COLUMN remote_pieces`, + }, + }, }, } } diff --git a/satellite/metainfo/metabase/db_migrate_test.go b/satellite/metainfo/metabase/db_migrate_test.go new file mode 100644 index 000000000..e5f35f1f5 --- /dev/null +++ b/satellite/metainfo/metabase/db_migrate_test.go @@ -0,0 +1,117 @@ +// Copyright (C) 2021 Storj Labs, Inc. +// See LICENSE for copying information. + +package metabase_test + +import ( + "sort" + "testing" + + "github.com/stretchr/testify/require" + "go.uber.org/zap/zaptest" + + "storj.io/common/testcontext" + "storj.io/common/testrand" + "storj.io/common/uuid" + "storj.io/storj/satellite/metainfo/metabase" + "storj.io/storj/satellite/satellitedb/satellitedbtest" +) + +func TestMigrateToAliases(t *testing.T) { + for _, info := range databaseInfos() { + info := info + t.Run(info.name, func(t *testing.T) { + t.Parallel() + + ctx := testcontext.New(t) + defer ctx.Cleanup() + + db, err := satellitedbtest.CreateMetabaseDB(ctx, zaptest.NewLogger(t), t.Name(), "M", 0, satellitedbtest.Database{ + Name: info.name, + URL: info.connstr, + Message: "", + }) + require.NoError(t, err) + defer ctx.Check(db.Close) + + mdb := db.InternalImplementation().(*metabase.DB) + + allMigrations := mdb.PostgresMigration() + + beforeAliases := allMigrations.TargetVersion(2) + err = beforeAliases.Run(ctx, zaptest.NewLogger(t)) + require.NoError(t, err) + + rawdb := mdb.UnderlyingTagSQL() + require.NotNil(t, rawdb) + + type segmentEntry struct { + StreamID uuid.UUID + Position metabase.SegmentPosition + Pieces metabase.Pieces + } + + s1, s2 := testrand.UUID(), testrand.UUID() + n1, n2, n3 := testrand.NodeID(), testrand.NodeID(), testrand.NodeID() + + entries := []segmentEntry{ + { + StreamID: s1, + Position: metabase.SegmentPosition{Index: 1}, + Pieces: metabase.Pieces{{1, n1}, {2, n2}}, + }, + { + StreamID: s1, + Position: metabase.SegmentPosition{Part: 1, Index: 2}, + Pieces: metabase.Pieces{{3, n3}, {2, n2}}, + }, + { + StreamID: s2, + Position: metabase.SegmentPosition{Part: 1, Index: 0}, + Pieces: metabase.Pieces{{1, n1}}, + }, + { + StreamID: s2, + Position: metabase.SegmentPosition{Part: 1, Index: 1}, + Pieces: metabase.Pieces{}, + }, + } + + for _, e := range entries { + _, err = rawdb.ExecContext(ctx, ` + INSERT INTO segments ( + stream_id, position, remote_pieces, + root_piece_id, encrypted_key_nonce, encrypted_key, + encrypted_size, plain_offset, plain_size, redundancy + ) VALUES ( + $1, $2, $3, + $4, $5, $6, + $7, $8, $9, $10 + )`, + e.StreamID, e.Position, e.Pieces, + // mock values + testrand.PieceID(), []byte{1, 2}, []byte{1, 2}, + 100, 100, 100, int64(0x10), + ) + require.NoError(t, err) + } + + err = allMigrations.Run(ctx, zaptest.NewLogger(t)) + require.NoError(t, err) + + for _, e := range entries { + seg, err := db.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{ + StreamID: e.StreamID, + Position: e.Position, + }) + require.NoError(t, err) + + sortedPieces := e.Pieces + sort.Slice(sortedPieces, func(i, k int) bool { + return sortedPieces[i].Number < sortedPieces[k].Number + }) + require.Equal(t, sortedPieces, seg.Pieces) + } + }) + } +} diff --git a/satellite/metainfo/metabase/delete.go b/satellite/metainfo/metabase/delete.go index 27d37350d..b76c05757 100644 --- a/satellite/metainfo/metabase/delete.go +++ b/satellite/metainfo/metabase/delete.go @@ -135,7 +135,7 @@ func (db *DB) DeleteObjectExactVersion(ctx context.Context, opts DeleteObjectExa return storj.ErrObjectNotFound.Wrap(Error.New("no rows deleted")) } - segmentInfos, err := deleteSegments(ctx, tx, result.Objects) + segmentInfos, err := db.deleteSegments(ctx, tx, result.Objects) if err != nil { return err } @@ -215,7 +215,7 @@ func (db *DB) DeletePendingObject(ctx context.Context, opts DeletePendingObject) return storj.ErrObjectNotFound.Wrap(Error.New("no rows deleted")) } - segmentInfos, err := deleteSegments(ctx, tx, result.Objects) + segmentInfos, err := db.deleteSegments(ctx, tx, result.Objects) if err != nil { return err } @@ -294,7 +294,7 @@ func (db *DB) DeleteObjectLatestVersion(ctx context.Context, opts DeleteObjectLa return storj.ErrObjectNotFound.Wrap(Error.New("no rows deleted")) } - segmentInfos, err := deleteSegments(ctx, tx, result.Objects) + segmentInfos, err := db.deleteSegments(ctx, tx, result.Objects) if err != nil { return err } @@ -349,7 +349,7 @@ func (db *DB) DeleteObjectAnyStatusAllVersions(ctx context.Context, opts DeleteO return storj.ErrObjectNotFound.Wrap(Error.New("no rows deleted")) } - segmentInfos, err := deleteSegments(ctx, tx, result.Objects) + segmentInfos, err := db.deleteSegments(ctx, tx, result.Objects) if err != nil { return err } @@ -428,7 +428,7 @@ func (db *DB) DeleteObjectsAllVersions(ctx context.Context, opts DeleteObjectsAl return nil } - segmentInfos, err := deleteSegments(ctx, tx, result.Objects) + segmentInfos, err := db.deleteSegments(ctx, tx, result.Objects) if err != nil { return err } @@ -504,7 +504,7 @@ func scanMultipleObjectsDeletion(rows tagsql.Rows) (objects []Object, err error) return objects, nil } -func deleteSegments(ctx context.Context, tx tagsql.Tx, objects []Object) (_ []DeletedSegmentInfo, err error) { +func (db *DB) deleteSegments(ctx context.Context, tx tagsql.Tx, objects []Object) (_ []DeletedSegmentInfo, err error) { defer mon.Task()(&ctx)(&err) // TODO we need to figure out how integrate this with piece deletion code @@ -524,7 +524,7 @@ func deleteSegments(ctx context.Context, tx tagsql.Tx, objects []Object) (_ []De segmentsRows, err := tx.Query(ctx, ` DELETE FROM segments WHERE stream_id = ANY ($1) - RETURNING root_piece_id, remote_pieces; + RETURNING root_piece_id, remote_alias_pieces; `, pgutil.ByteaArray(streamIDs)) if err != nil { return []DeletedSegmentInfo{}, Error.New("unable to delete object: %w", err) @@ -534,11 +534,16 @@ func deleteSegments(ctx context.Context, tx tagsql.Tx, objects []Object) (_ []De infos := make([]DeletedSegmentInfo, 0, len(objects)) for segmentsRows.Next() { var segmentInfo DeletedSegmentInfo - err = segmentsRows.Scan(&segmentInfo.RootPieceID, &segmentInfo.Pieces) + var aliasPieces AliasPieces + err = segmentsRows.Scan(&segmentInfo.RootPieceID, &aliasPieces) if err != nil { return []DeletedSegmentInfo{}, Error.New("unable to delete object: %w", err) } + segmentInfo.Pieces, err = db.aliasCache.ConvertAliasesToPieces(ctx, aliasPieces) + if err != nil { + return []DeletedSegmentInfo{}, Error.New("failed to convert aliases: %w", err) + } if len(segmentInfo.Pieces) != 0 { infos = append(infos, segmentInfo) } diff --git a/satellite/metainfo/metabase/delete_bucket.go b/satellite/metainfo/metabase/delete_bucket.go index 897702b8b..2f86e96fe 100644 --- a/satellite/metainfo/metabase/delete_bucket.go +++ b/satellite/metainfo/metabase/delete_bucket.go @@ -54,16 +54,22 @@ func (db *DB) DeleteBucketObjects(ctx context.Context, opts DeleteBucketObjects) ) DELETE FROM segments WHERE segments.stream_id in (SELECT deleted_objects.stream_id FROM deleted_objects) - RETURNING segments.stream_id, segments.root_piece_id, segments.remote_pieces + RETURNING segments.stream_id, segments.root_piece_id, segments.remote_alias_pieces `, opts.Bucket.ProjectID, opts.Bucket.BucketName, batchSize))(func(rows tagsql.Rows) error { ids := map[uuid.UUID]struct{}{} // TODO: avoid map here for rows.Next() { var streamID uuid.UUID var segment DeletedSegmentInfo - err := rows.Scan(&streamID, &segment.RootPieceID, &segment.Pieces) + var aliasPieces AliasPieces + err := rows.Scan(&streamID, &segment.RootPieceID, &aliasPieces) if err != nil { return Error.Wrap(err) } + segment.Pieces, err = db.aliasCache.ConvertAliasesToPieces(ctx, aliasPieces) + if err != nil { + return Error.Wrap(err) + } + ids[streamID] = struct{}{} deleteSegments = append(deleteSegments, segment) } diff --git a/satellite/metainfo/metabase/get.go b/satellite/metainfo/metabase/get.go index d0350bc0e..6d75a6d55 100644 --- a/satellite/metainfo/metabase/get.go +++ b/satellite/metainfo/metabase/get.go @@ -168,13 +168,14 @@ func (db *DB) GetSegmentByLocation(ctx context.Context, opts GetSegmentByLocatio return Segment{}, err } + var aliasPieces AliasPieces err = db.db.QueryRow(ctx, ` SELECT stream_id, root_piece_id, encrypted_key_nonce, encrypted_key, encrypted_size, plain_offset, plain_size, redundancy, - inline_data, remote_pieces + inline_data, remote_alias_pieces FROM segments WHERE stream_id IN (SELECT stream_id FROM objects WHERE @@ -191,7 +192,7 @@ func (db *DB) GetSegmentByLocation(ctx context.Context, opts GetSegmentByLocatio &segment.RootPieceID, &segment.EncryptedKeyNonce, &segment.EncryptedKey, &segment.EncryptedSize, &segment.PlainOffset, &segment.PlainSize, redundancyScheme{&segment.Redundancy}, - &segment.InlineData, &segment.Pieces, + &segment.InlineData, &aliasPieces, ) if err != nil { if errors.Is(err, sql.ErrNoRows) { @@ -200,6 +201,10 @@ func (db *DB) GetSegmentByLocation(ctx context.Context, opts GetSegmentByLocatio return Segment{}, Error.New("unable to query segment: %w", err) } + segment.Pieces, err = db.aliasCache.ConvertAliasesToPieces(ctx, aliasPieces) + if err != nil { + return Segment{}, Error.New("unable to convert aliases to pieces: %w", err) + } segment.Position = opts.Position return segment, nil @@ -227,12 +232,13 @@ func (db *DB) GetSegmentByPosition(ctx context.Context, opts GetSegmentByPositio return Segment{}, err } + var aliasPieces AliasPieces err = db.db.QueryRow(ctx, ` SELECT root_piece_id, encrypted_key_nonce, encrypted_key, encrypted_size, plain_offset, plain_size, redundancy, - inline_data, remote_pieces + inline_data, remote_alias_pieces FROM segments WHERE stream_id = $1 AND @@ -242,7 +248,7 @@ func (db *DB) GetSegmentByPosition(ctx context.Context, opts GetSegmentByPositio &segment.RootPieceID, &segment.EncryptedKeyNonce, &segment.EncryptedKey, &segment.EncryptedSize, &segment.PlainOffset, &segment.PlainSize, redundancyScheme{&segment.Redundancy}, - &segment.InlineData, &segment.Pieces, + &segment.InlineData, &aliasPieces, ) if err != nil { if errors.Is(err, sql.ErrNoRows) { @@ -251,6 +257,11 @@ func (db *DB) GetSegmentByPosition(ctx context.Context, opts GetSegmentByPositio return Segment{}, Error.New("unable to query segment: %w", err) } + segment.Pieces, err = db.aliasCache.ConvertAliasesToPieces(ctx, aliasPieces) + if err != nil { + return Segment{}, Error.New("unable to convert aliases to pieces: %w", err) + } + segment.StreamID = opts.StreamID segment.Position = opts.Position @@ -270,13 +281,14 @@ func (db *DB) GetLatestObjectLastSegment(ctx context.Context, opts GetLatestObje return Segment{}, err } + var aliasPieces AliasPieces err = db.db.QueryRow(ctx, ` SELECT stream_id, position, root_piece_id, encrypted_key_nonce, encrypted_key, encrypted_size, plain_offset, plain_size, redundancy, - inline_data, remote_pieces + inline_data, remote_alias_pieces FROM segments WHERE stream_id IN (SELECT stream_id FROM objects WHERE @@ -295,7 +307,7 @@ func (db *DB) GetLatestObjectLastSegment(ctx context.Context, opts GetLatestObje &segment.RootPieceID, &segment.EncryptedKeyNonce, &segment.EncryptedKey, &segment.EncryptedSize, &segment.PlainOffset, &segment.PlainSize, redundancyScheme{&segment.Redundancy}, - &segment.InlineData, &segment.Pieces, + &segment.InlineData, &aliasPieces, ) if err != nil { if errors.Is(err, sql.ErrNoRows) { @@ -304,6 +316,11 @@ func (db *DB) GetLatestObjectLastSegment(ctx context.Context, opts GetLatestObje return Segment{}, Error.New("unable to query segment: %w", err) } + segment.Pieces, err = db.aliasCache.ConvertAliasesToPieces(ctx, aliasPieces) + if err != nil { + return Segment{}, Error.New("unable to convert aliases to pieces: %w", err) + } + return segment, nil } @@ -325,13 +342,14 @@ func (db *DB) GetSegmentByOffset(ctx context.Context, opts GetSegmentByOffset) ( return Segment{}, ErrInvalidRequest.New("Invalid PlainOffset: %d", opts.PlainOffset) } + var aliasPieces AliasPieces err = db.db.QueryRow(ctx, ` SELECT stream_id, position, root_piece_id, encrypted_key_nonce, encrypted_key, encrypted_size, plain_offset, plain_size, redundancy, - inline_data, remote_pieces + inline_data, remote_alias_pieces FROM segments WHERE stream_id IN (SELECT stream_id FROM objects WHERE @@ -352,7 +370,7 @@ func (db *DB) GetSegmentByOffset(ctx context.Context, opts GetSegmentByOffset) ( &segment.RootPieceID, &segment.EncryptedKeyNonce, &segment.EncryptedKey, &segment.EncryptedSize, &segment.PlainOffset, &segment.PlainSize, redundancyScheme{&segment.Redundancy}, - &segment.InlineData, &segment.Pieces, + &segment.InlineData, &aliasPieces, ) if err != nil { if errors.Is(err, sql.ErrNoRows) { @@ -361,6 +379,11 @@ func (db *DB) GetSegmentByOffset(ctx context.Context, opts GetSegmentByOffset) ( return Segment{}, Error.New("unable to query segment: %w", err) } + segment.Pieces, err = db.aliasCache.ConvertAliasesToPieces(ctx, aliasPieces) + if err != nil { + return Segment{}, Error.New("unable to convert aliases to pieces: %w", err) + } + return segment, nil } diff --git a/satellite/metainfo/metabase/list_segments.go b/satellite/metainfo/metabase/list_segments.go index aeae4eacf..46f41a324 100644 --- a/satellite/metainfo/metabase/list_segments.go +++ b/satellite/metainfo/metabase/list_segments.go @@ -47,7 +47,7 @@ func (db *DB) ListSegments(ctx context.Context, opts ListSegments) (result ListS root_piece_id, encrypted_key_nonce, encrypted_key, encrypted_size, plain_offset, plain_size, redundancy, - inline_data, remote_pieces + inline_data, remote_alias_pieces FROM segments WHERE stream_id = $1 AND @@ -57,17 +57,23 @@ func (db *DB) ListSegments(ctx context.Context, opts ListSegments) (result ListS `, opts.StreamID, opts.Cursor, opts.Limit+1))(func(rows tagsql.Rows) error { for rows.Next() { var segment Segment + var aliasPieces AliasPieces err = rows.Scan( &segment.Position, &segment.RootPieceID, &segment.EncryptedKeyNonce, &segment.EncryptedKey, &segment.EncryptedSize, &segment.PlainOffset, &segment.PlainSize, redundancyScheme{&segment.Redundancy}, - &segment.InlineData, &segment.Pieces, + &segment.InlineData, &aliasPieces, ) if err != nil { return Error.New("failed to scan segments: %w", err) } + segment.Pieces, err = db.aliasCache.ConvertAliasesToPieces(ctx, aliasPieces) + if err != nil { + return Error.New("failed to convert aliases to pieces: %w", err) + } + segment.StreamID = opts.StreamID result.Segments = append(result.Segments, segment) } diff --git a/satellite/metainfo/metabase/raw.go b/satellite/metainfo/metabase/raw.go index 9beea55e0..48bbfddfd 100644 --- a/satellite/metainfo/metabase/raw.go +++ b/satellite/metainfo/metabase/raw.go @@ -90,6 +90,7 @@ func (db *DB) TestingDeleteAll(ctx context.Context) (err error) { DELETE FROM node_aliases; SELECT setval('node_alias_seq', 1, false); `) + db.aliasCache = NewNodeAliasCache(db) return Error.Wrap(err) } @@ -165,7 +166,7 @@ func (db *DB) testingGetAllSegments(ctx context.Context) (_ []RawSegment, err er encrypted_size, plain_offset, plain_size, redundancy, - inline_data, remote_pieces + inline_data, remote_alias_pieces FROM segments ORDER BY stream_id ASC, position ASC `) @@ -175,6 +176,7 @@ func (db *DB) testingGetAllSegments(ctx context.Context) (_ []RawSegment, err er defer func() { err = errs.Combine(err, rows.Close()) }() for rows.Next() { var seg RawSegment + var aliasPieces AliasPieces err := rows.Scan( &seg.StreamID, &seg.Position, @@ -190,11 +192,17 @@ func (db *DB) testingGetAllSegments(ctx context.Context) (_ []RawSegment, err er redundancyScheme{&seg.Redundancy}, &seg.InlineData, - &seg.Pieces, + &aliasPieces, ) if err != nil { return nil, Error.New("testingGetAllSegments scan failed: %w", err) } + + seg.Pieces, err = db.aliasCache.ConvertAliasesToPieces(ctx, aliasPieces) + if err != nil { + return nil, Error.New("testingGetAllSegments convert aliases to pieces failed: %w", err) + } + segs = append(segs, seg) } if err := rows.Err(); err != nil { diff --git a/satellite/metainfo/metabase/update.go b/satellite/metainfo/metabase/update.go index 5caa88846..6afddf681 100644 --- a/satellite/metainfo/metabase/update.go +++ b/satellite/metainfo/metabase/update.go @@ -46,18 +46,28 @@ func (db *DB) UpdateSegmentPieces(ctx context.Context, opts UpdateSegmentPieces) return err } - var pieces Pieces + oldPieces, err := db.aliasCache.ConvertPiecesToAliases(ctx, opts.OldPieces) + if err != nil { + return Error.New("unable to convert pieces to aliases: %w", err) + } + + newPieces, err := db.aliasCache.ConvertPiecesToAliases(ctx, opts.NewPieces) + if err != nil { + return Error.New("unable to convert pieces to aliases: %w", err) + } + + var resultPieces AliasPieces err = db.db.QueryRow(ctx, ` UPDATE segments SET - remote_pieces = CASE - WHEN remote_pieces = $3 THEN $4 - ELSE remote_pieces + remote_alias_pieces = CASE + WHEN remote_alias_pieces = $3 THEN $4 + ELSE remote_alias_pieces END WHERE stream_id = $1 AND position = $2 - RETURNING remote_pieces - `, opts.StreamID, opts.Position, opts.OldPieces, opts.NewPieces).Scan(&pieces) + RETURNING remote_alias_pieces + `, opts.StreamID, opts.Position, oldPieces, newPieces).Scan(&resultPieces) if err != nil { if errors.Is(err, sql.ErrNoRows) { return ErrSegmentNotFound.New("segment missing") @@ -65,8 +75,8 @@ func (db *DB) UpdateSegmentPieces(ctx context.Context, opts UpdateSegmentPieces) return Error.New("unable to update segment pieces: %w", err) } - if !opts.NewPieces.Equal(pieces) { - return storage.ErrValueChanged.New("segment remote_pieces field was changed") + if !EqualAliasPieces(newPieces, resultPieces) { + return storage.ErrValueChanged.New("segment remote_alias_pieces field was changed") } return nil diff --git a/satellite/metainfo/metabase/update_test.go b/satellite/metainfo/metabase/update_test.go index 2cde39344..3bf747175 100644 --- a/satellite/metainfo/metabase/update_test.go +++ b/satellite/metainfo/metabase/update_test.go @@ -227,7 +227,7 @@ func TestUpdateSegmentPieces(t *testing.T) { }, }, ErrClass: &storage.ErrValueChanged, - ErrText: "segment remote_pieces field was changed", + ErrText: "segment remote_alias_pieces field was changed", }.Check(ctx, t, db) })