satellite/metainfo/metabase: use alias pieces in segments table

This makes all tables automatically convert between aliases and piece
ID-s.

Change-Id: I27fa42c82bbb09e05e3327f85e13a000b48faffd
This commit is contained in:
Egon Elbre 2021-02-08 11:33:45 +02:00
parent 25f81f353c
commit 2848bf488f
16 changed files with 456 additions and 54 deletions

View File

@ -167,7 +167,9 @@ pipeline {
steps {
sh 'psql -U postgres -c \'create database teststorj3;\''
sh 'make test-sim-backwards-compatible'
catchError {
sh 'make test-sim-backwards-compatible'
}
}
}
@ -182,7 +184,9 @@ pipeline {
steps {
sh 'cockroach sql --insecure --host=localhost:26257 -e \'create database testcockroach5;\''
sh 'make test-sim-backwards-compatible'
catchError {
sh 'make test-sim-backwards-compatible'
}
sh 'cockroach sql --insecure --host=localhost:26257 -e \'drop database testcockroach5;\''
}
}

View File

@ -10,6 +10,7 @@ import (
"storj.io/common/storj"
"storj.io/storj/private/dbutil/pgutil"
"storj.io/storj/private/tagsql"
)
// NodeAlias is a metabase local alias for NodeID-s to reduce segment table size.
@ -42,11 +43,7 @@ func (db *DB) EnsureNodeAliases(ctx context.Context, opts EnsureNodeAliases) (er
SELECT unnest($1::BYTEA[])
ON CONFLICT DO NOTHING
`, pgutil.NodeIDArray(opts.Nodes))
if err != nil {
return Error.Wrap(err)
}
return nil
return Error.Wrap(err)
}
// ListNodeAliases lists all node alias mappings.
@ -77,3 +74,57 @@ func (db *DB) ListNodeAliases(ctx context.Context) (_ []NodeAliasEntry, err erro
return aliases, nil
}
// txNodeAliases is used inside a migration.
// This will be removed once the migration has been completed.
type txNodeAliases struct {
db tagsql.Tx
}
// EnsureNodeAliases ensures that the supplied node ID-s have a alias.
// It's safe to concurrently try and create node ID-s for the same NodeID.
func (db *txNodeAliases) EnsureNodeAliases(ctx context.Context, opts EnsureNodeAliases) (err error) {
defer mon.Task()(&ctx)(&err)
for _, node := range opts.Nodes {
if node.IsZero() {
return Error.New("tried to add alias to zero node")
}
}
_, err = db.db.ExecContext(ctx, `
INSERT INTO node_aliases(node_id)
SELECT unnest($1::BYTEA[])
ON CONFLICT DO NOTHING
`, pgutil.NodeIDArray(opts.Nodes))
return Error.Wrap(err)
}
// ListNodeAliases lists all node alias mappings.
func (db *txNodeAliases) ListNodeAliases(ctx context.Context) (_ []NodeAliasEntry, err error) {
defer mon.Task()(&ctx)(&err)
var aliases []NodeAliasEntry
rows, err := db.db.Query(ctx, `
SELECT node_id, node_alias
FROM node_aliases
`)
if err != nil {
return nil, Error.New("ListNodeAliases query: %w", err)
}
defer func() { err = errs.Combine(err, rows.Close()) }()
for rows.Next() {
var entry NodeAliasEntry
err := rows.Scan(&entry.ID, &entry.Alias)
if err != nil {
return nil, Error.New("ListNodeAliases scan failed: %w", err)
}
aliases = append(aliases, entry)
}
if err := rows.Err(); err != nil {
return nil, Error.New("ListNodeAliases scan failed: %w", err)
}
return aliases, nil
}

View File

@ -126,6 +126,52 @@ func (cache *NodeAliasCache) refresh(ctx context.Context) (_ *NodeAliasMap, err
return cache.latest, nil
}
// ConvertPiecesToAliases converts pieces to alias pieces.
func (cache *NodeAliasCache) ConvertPiecesToAliases(ctx context.Context, pieces Pieces) (_ AliasPieces, err error) {
defer mon.Task()(&ctx)(&err)
nodes := make([]storj.NodeID, len(pieces))
for i, p := range pieces {
nodes[i] = p.StorageNode
}
aliases, err := cache.Aliases(ctx, nodes)
if err != nil {
return nil, Error.Wrap(err)
}
aliasPieces := make(AliasPieces, len(aliases))
for i, alias := range aliases {
aliasPieces[i] = AliasPiece{
Number: pieces[i].Number,
Alias: alias,
}
}
return aliasPieces, nil
}
// ConvertAliasesToPieces converts alias pieces to pieces.
func (cache *NodeAliasCache) ConvertAliasesToPieces(ctx context.Context, aliasPieces AliasPieces) (_ Pieces, err error) {
defer mon.Task()(&ctx)(&err)
aliases := make([]NodeAlias, len(aliasPieces))
pieces := make(Pieces, len(aliasPieces))
for i, aliasPiece := range aliasPieces {
pieces[i].Number, aliases[i] = aliasPiece.Number, aliasPiece.Alias
}
nodes, err := cache.Nodes(ctx, aliases)
if err != nil {
return nil, Error.Wrap(err)
}
for i, n := range nodes {
pieces[i].StorageNode = n
}
return pieces, nil
}
// NodeAliasMap contains bidirectional mapping between node ID and a NodeAlias.
type NodeAliasMap struct {
node map[NodeAlias]storj.NodeID

View File

@ -94,6 +94,9 @@ func (s *scenario) run(ctx *testcontext.Context, b *testing.B, db *metabase.DB)
// wipe data so we can do the exact same test
b.StopTimer()
DeleteAll{}.Check(ctx, b, db)
if err := db.EnsureNodeAliases(ctx, metabase.EnsureNodeAliases{Nodes: nodes}); err != nil {
require.NoError(b, err)
}
b.StartTimer()
s.objectStream = nil

View File

@ -257,6 +257,11 @@ func (db *DB) CommitSegment(ctx context.Context, opts CommitSegment) (err error)
// TODO: verify opts.Pieces is compatible with opts.Redundancy
aliasPieces, err := db.aliasCache.ConvertPiecesToAliases(ctx, opts.Pieces)
if err != nil {
return Error.New("unable to convert pieces to aliases: %w", err)
}
// Verify that object exists and is partial.
_, err = db.db.ExecContext(ctx, `
INSERT INTO segments (
@ -264,7 +269,7 @@ func (db *DB) CommitSegment(ctx context.Context, opts CommitSegment) (err error)
root_piece_id, encrypted_key_nonce, encrypted_key,
encrypted_size, plain_offset, plain_size,
redundancy,
remote_pieces
remote_alias_pieces
) VALUES (
(SELECT stream_id
FROM objects WHERE
@ -283,7 +288,7 @@ func (db *DB) CommitSegment(ctx context.Context, opts CommitSegment) (err error)
opts.RootPieceID, opts.EncryptedKeyNonce, opts.EncryptedKey,
opts.EncryptedSize, opts.PlainOffset, opts.PlainSize,
redundancyScheme{&opts.Redundancy},
opts.Pieces,
aliasPieces,
opts.ProjectID, opts.BucketName, []byte(opts.ObjectKey), opts.Version, opts.StreamID,
)
if err != nil {

View File

@ -60,7 +60,7 @@ func (db *DB) CommitObjectWithSegments(ctx context.Context, opts CommitObjectWit
return err
}
deletedSegments, err = deleteSegmentsNotInCommit(ctx, tx, opts.StreamID, segmentsToDelete)
deletedSegments, err = db.deleteSegmentsNotInCommit(ctx, tx, opts.StreamID, segmentsToDelete)
if err != nil {
return err
}
@ -291,7 +291,7 @@ func updateSegmentOffsets(ctx context.Context, tx tagsql.Tx, streamID uuid.UUID,
}
// deleteSegmentsNotInCommit deletes the listed segments inside the tx.
func deleteSegmentsNotInCommit(ctx context.Context, tx tagsql.Tx, streamID uuid.UUID, segments []SegmentPosition) (deletedSegments []DeletedSegmentInfo, err error) {
func (db *DB) deleteSegmentsNotInCommit(ctx context.Context, tx tagsql.Tx, streamID uuid.UUID, segments []SegmentPosition) (deletedSegments []DeletedSegmentInfo, err error) {
defer mon.Task()(&ctx)(&err)
if len(segments) == 0 {
return nil, nil
@ -306,11 +306,12 @@ func deleteSegmentsNotInCommit(ctx context.Context, tx tagsql.Tx, streamID uuid.
err = withRows(tx.Query(ctx, `
DELETE FROM segments
WHERE stream_id = $1 AND position = ANY($2)
RETURNING root_piece_id, remote_pieces
RETURNING root_piece_id, remote_alias_pieces
`, streamID, pgutil.Int8Array(positions)))(func(rows tagsql.Rows) error {
for rows.Next() {
var deleted DeletedSegmentInfo
err := rows.Scan(&deleted.RootPieceID, &deleted.Pieces)
var aliasPieces AliasPieces
err := rows.Scan(&deleted.RootPieceID, &aliasPieces)
if err != nil {
return Error.New("failed to scan segments: %w", err)
}
@ -318,6 +319,11 @@ func deleteSegmentsNotInCommit(ctx context.Context, tx tagsql.Tx, streamID uuid.
if deleted.RootPieceID.IsZero() {
continue
}
deleted.Pieces, err = db.aliasCache.ConvertAliasesToPieces(ctx, aliasPieces)
if err != nil {
return Error.New("failed to convert aliases: %w", err)
}
deletedSegments = append(deletedSegments, deleted)
}
return nil

View File

@ -295,6 +295,12 @@ const (
// Pieces defines information for pieces.
type Pieces []Piece
// Piece defines information for a segment piece.
type Piece struct {
Number uint16
StorageNode storj.NodeID
}
// Verify verifies pieces.
func (p Pieces) Verify() error {
if len(p) == 0 {
@ -360,9 +366,3 @@ func (p Pieces) Less(i, j int) bool { return p[i].Number < p[j].Number }
// Swap swaps the pieces with indexes i and j.
func (p Pieces) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
// Piece defines information for a segment piece.
type Piece struct {
Number uint16
StorageNode storj.NodeID
}

View File

@ -6,13 +6,17 @@ package metabase
import (
"context"
"sort"
"strconv"
_ "github.com/jackc/pgx/v4" // registers pgx as a tagsql driver.
_ "github.com/jackc/pgx/v4/stdlib" // registers pgx as a tagsql driver.
"github.com/spacemonkeygo/monkit/v3"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/common/storj"
"storj.io/common/uuid"
"storj.io/storj/private/dbutil"
"storj.io/storj/private/migrate"
"storj.io/storj/private/tagsql"
@ -26,23 +30,31 @@ var (
type DB struct {
log *zap.Logger
db tagsql.DB
aliasCache *NodeAliasCache
}
// Open opens a connection to metabase.
func Open(ctx context.Context, log *zap.Logger, driverName, connstr string) (*DB, error) {
db, err := tagsql.Open(ctx, driverName, connstr)
rawdb, err := tagsql.Open(ctx, driverName, connstr)
if err != nil {
return nil, Error.Wrap(err)
}
dbutil.Configure(ctx, db, "metabase", mon)
dbutil.Configure(ctx, rawdb, "metabase", mon)
return &DB{log: log, db: postgresRebind{db}}, nil
db := &DB{log: log, db: postgresRebind{rawdb}}
db.aliasCache = NewNodeAliasCache(db)
return db, nil
}
// InternalImplementation returns *metabase.DB
// InternalImplementation returns *metabase.DB.
// TODO: remove.
func (db *DB) InternalImplementation() interface{} { return db }
// UnderlyingTagSQL returns *tagsql.DB.
// TODO: remove.
func (db *DB) UnderlyingTagSQL() tagsql.DB { return db.db }
// Ping checks whether connection has been established.
func (db *DB) Ping(ctx context.Context) error {
return Error.Wrap(db.db.PingContext(ctx))
@ -63,6 +75,7 @@ func (db *DB) DestroyTables(ctx context.Context) error {
DROP TABLE IF EXISTS node_aliases;
DROP SEQUENCE IF EXISTS node_alias_seq;
`)
db.aliasCache = NewNodeAliasCache(db)
return Error.Wrap(err)
}
@ -159,6 +172,105 @@ func (db *DB) PostgresMigration() *migrate.Migration {
)`,
},
},
{
DB: &db.db,
Description: "add remote_alias_pieces column",
Version: 4,
Action: migrate.SQL{
`ALTER TABLE segments ADD COLUMN remote_alias_pieces BYTEA`,
},
},
{
DB: &db.db,
Description: "convert remote_pieces to remote_alias_pieces",
Version: 5,
Action: migrate.Func(func(ctx context.Context, log *zap.Logger, db tagsql.DB, tx tagsql.Tx) error {
type segmentPieces struct {
StreamID uuid.UUID
Position SegmentPosition
RemotePieces Pieces
}
var allSegments []segmentPieces
err := withRows(tx.QueryContext(ctx, `SELECT stream_id, position, remote_pieces FROM segments WHERE remote_pieces IS NOT NULL`))(
func(rows tagsql.Rows) error {
for rows.Next() {
var seg segmentPieces
if err := rows.Scan(&seg.StreamID, &seg.Position, &seg.RemotePieces); err != nil {
return Error.Wrap(err)
}
allSegments = append(allSegments, seg)
}
return nil
})
if err != nil {
return Error.Wrap(err)
}
allNodes := map[storj.NodeID]struct{}{}
for i := range allSegments {
seg := &allSegments[i]
for k := range seg.RemotePieces {
p := &seg.RemotePieces[k]
allNodes[p.StorageNode] = struct{}{}
}
}
nodesList := []storj.NodeID{}
for id := range allNodes {
nodesList = append(nodesList, id)
}
aliasCache := NewNodeAliasCache(&txNodeAliases{tx})
_, err = aliasCache.Aliases(ctx, nodesList)
if err != nil {
return Error.Wrap(err)
}
err = func() (err error) {
stmt, err := tx.PrepareContext(ctx, `UPDATE segments SET remote_alias_pieces = $3 WHERE stream_id = $1 AND position = $2`)
if err != nil {
return Error.Wrap(err)
}
defer func() { err = errs.Combine(err, Error.Wrap(stmt.Close())) }()
for i := range allSegments {
seg := &allSegments[i]
if len(seg.RemotePieces) == 0 {
continue
}
aliases, err := aliasCache.ConvertPiecesToAliases(ctx, seg.RemotePieces)
if err != nil {
return Error.Wrap(err)
}
sort.Slice(aliases, func(i, k int) bool {
return aliases[i].Number < aliases[k].Number
})
_, err = stmt.ExecContext(ctx, seg.StreamID, seg.Position, aliases)
if err != nil {
return Error.Wrap(err)
}
}
return nil
}()
if err != nil {
return err
}
return nil
}),
},
{
DB: &db.db,
Description: "drop remote_pieces from segments table",
Version: 6,
Action: migrate.SQL{
`ALTER TABLE segments DROP COLUMN remote_pieces`,
},
},
},
}
}

View File

@ -0,0 +1,117 @@
// Copyright (C) 2021 Storj Labs, Inc.
// See LICENSE for copying information.
package metabase_test
import (
"sort"
"testing"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/common/uuid"
"storj.io/storj/satellite/metainfo/metabase"
"storj.io/storj/satellite/satellitedb/satellitedbtest"
)
func TestMigrateToAliases(t *testing.T) {
for _, info := range databaseInfos() {
info := info
t.Run(info.name, func(t *testing.T) {
t.Parallel()
ctx := testcontext.New(t)
defer ctx.Cleanup()
db, err := satellitedbtest.CreateMetabaseDB(ctx, zaptest.NewLogger(t), t.Name(), "M", 0, satellitedbtest.Database{
Name: info.name,
URL: info.connstr,
Message: "",
})
require.NoError(t, err)
defer ctx.Check(db.Close)
mdb := db.InternalImplementation().(*metabase.DB)
allMigrations := mdb.PostgresMigration()
beforeAliases := allMigrations.TargetVersion(2)
err = beforeAliases.Run(ctx, zaptest.NewLogger(t))
require.NoError(t, err)
rawdb := mdb.UnderlyingTagSQL()
require.NotNil(t, rawdb)
type segmentEntry struct {
StreamID uuid.UUID
Position metabase.SegmentPosition
Pieces metabase.Pieces
}
s1, s2 := testrand.UUID(), testrand.UUID()
n1, n2, n3 := testrand.NodeID(), testrand.NodeID(), testrand.NodeID()
entries := []segmentEntry{
{
StreamID: s1,
Position: metabase.SegmentPosition{Index: 1},
Pieces: metabase.Pieces{{1, n1}, {2, n2}},
},
{
StreamID: s1,
Position: metabase.SegmentPosition{Part: 1, Index: 2},
Pieces: metabase.Pieces{{3, n3}, {2, n2}},
},
{
StreamID: s2,
Position: metabase.SegmentPosition{Part: 1, Index: 0},
Pieces: metabase.Pieces{{1, n1}},
},
{
StreamID: s2,
Position: metabase.SegmentPosition{Part: 1, Index: 1},
Pieces: metabase.Pieces{},
},
}
for _, e := range entries {
_, err = rawdb.ExecContext(ctx, `
INSERT INTO segments (
stream_id, position, remote_pieces,
root_piece_id, encrypted_key_nonce, encrypted_key,
encrypted_size, plain_offset, plain_size, redundancy
) VALUES (
$1, $2, $3,
$4, $5, $6,
$7, $8, $9, $10
)`,
e.StreamID, e.Position, e.Pieces,
// mock values
testrand.PieceID(), []byte{1, 2}, []byte{1, 2},
100, 100, 100, int64(0x10),
)
require.NoError(t, err)
}
err = allMigrations.Run(ctx, zaptest.NewLogger(t))
require.NoError(t, err)
for _, e := range entries {
seg, err := db.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
StreamID: e.StreamID,
Position: e.Position,
})
require.NoError(t, err)
sortedPieces := e.Pieces
sort.Slice(sortedPieces, func(i, k int) bool {
return sortedPieces[i].Number < sortedPieces[k].Number
})
require.Equal(t, sortedPieces, seg.Pieces)
}
})
}
}

View File

@ -135,7 +135,7 @@ func (db *DB) DeleteObjectExactVersion(ctx context.Context, opts DeleteObjectExa
return storj.ErrObjectNotFound.Wrap(Error.New("no rows deleted"))
}
segmentInfos, err := deleteSegments(ctx, tx, result.Objects)
segmentInfos, err := db.deleteSegments(ctx, tx, result.Objects)
if err != nil {
return err
}
@ -215,7 +215,7 @@ func (db *DB) DeletePendingObject(ctx context.Context, opts DeletePendingObject)
return storj.ErrObjectNotFound.Wrap(Error.New("no rows deleted"))
}
segmentInfos, err := deleteSegments(ctx, tx, result.Objects)
segmentInfos, err := db.deleteSegments(ctx, tx, result.Objects)
if err != nil {
return err
}
@ -294,7 +294,7 @@ func (db *DB) DeleteObjectLatestVersion(ctx context.Context, opts DeleteObjectLa
return storj.ErrObjectNotFound.Wrap(Error.New("no rows deleted"))
}
segmentInfos, err := deleteSegments(ctx, tx, result.Objects)
segmentInfos, err := db.deleteSegments(ctx, tx, result.Objects)
if err != nil {
return err
}
@ -349,7 +349,7 @@ func (db *DB) DeleteObjectAnyStatusAllVersions(ctx context.Context, opts DeleteO
return storj.ErrObjectNotFound.Wrap(Error.New("no rows deleted"))
}
segmentInfos, err := deleteSegments(ctx, tx, result.Objects)
segmentInfos, err := db.deleteSegments(ctx, tx, result.Objects)
if err != nil {
return err
}
@ -428,7 +428,7 @@ func (db *DB) DeleteObjectsAllVersions(ctx context.Context, opts DeleteObjectsAl
return nil
}
segmentInfos, err := deleteSegments(ctx, tx, result.Objects)
segmentInfos, err := db.deleteSegments(ctx, tx, result.Objects)
if err != nil {
return err
}
@ -504,7 +504,7 @@ func scanMultipleObjectsDeletion(rows tagsql.Rows) (objects []Object, err error)
return objects, nil
}
func deleteSegments(ctx context.Context, tx tagsql.Tx, objects []Object) (_ []DeletedSegmentInfo, err error) {
func (db *DB) deleteSegments(ctx context.Context, tx tagsql.Tx, objects []Object) (_ []DeletedSegmentInfo, err error) {
defer mon.Task()(&ctx)(&err)
// TODO we need to figure out how integrate this with piece deletion code
@ -524,7 +524,7 @@ func deleteSegments(ctx context.Context, tx tagsql.Tx, objects []Object) (_ []De
segmentsRows, err := tx.Query(ctx, `
DELETE FROM segments
WHERE stream_id = ANY ($1)
RETURNING root_piece_id, remote_pieces;
RETURNING root_piece_id, remote_alias_pieces;
`, pgutil.ByteaArray(streamIDs))
if err != nil {
return []DeletedSegmentInfo{}, Error.New("unable to delete object: %w", err)
@ -534,11 +534,16 @@ func deleteSegments(ctx context.Context, tx tagsql.Tx, objects []Object) (_ []De
infos := make([]DeletedSegmentInfo, 0, len(objects))
for segmentsRows.Next() {
var segmentInfo DeletedSegmentInfo
err = segmentsRows.Scan(&segmentInfo.RootPieceID, &segmentInfo.Pieces)
var aliasPieces AliasPieces
err = segmentsRows.Scan(&segmentInfo.RootPieceID, &aliasPieces)
if err != nil {
return []DeletedSegmentInfo{}, Error.New("unable to delete object: %w", err)
}
segmentInfo.Pieces, err = db.aliasCache.ConvertAliasesToPieces(ctx, aliasPieces)
if err != nil {
return []DeletedSegmentInfo{}, Error.New("failed to convert aliases: %w", err)
}
if len(segmentInfo.Pieces) != 0 {
infos = append(infos, segmentInfo)
}

View File

@ -54,16 +54,22 @@ func (db *DB) DeleteBucketObjects(ctx context.Context, opts DeleteBucketObjects)
)
DELETE FROM segments
WHERE segments.stream_id in (SELECT deleted_objects.stream_id FROM deleted_objects)
RETURNING segments.stream_id, segments.root_piece_id, segments.remote_pieces
RETURNING segments.stream_id, segments.root_piece_id, segments.remote_alias_pieces
`, opts.Bucket.ProjectID, opts.Bucket.BucketName, batchSize))(func(rows tagsql.Rows) error {
ids := map[uuid.UUID]struct{}{} // TODO: avoid map here
for rows.Next() {
var streamID uuid.UUID
var segment DeletedSegmentInfo
err := rows.Scan(&streamID, &segment.RootPieceID, &segment.Pieces)
var aliasPieces AliasPieces
err := rows.Scan(&streamID, &segment.RootPieceID, &aliasPieces)
if err != nil {
return Error.Wrap(err)
}
segment.Pieces, err = db.aliasCache.ConvertAliasesToPieces(ctx, aliasPieces)
if err != nil {
return Error.Wrap(err)
}
ids[streamID] = struct{}{}
deleteSegments = append(deleteSegments, segment)
}

View File

@ -168,13 +168,14 @@ func (db *DB) GetSegmentByLocation(ctx context.Context, opts GetSegmentByLocatio
return Segment{}, err
}
var aliasPieces AliasPieces
err = db.db.QueryRow(ctx, `
SELECT
stream_id,
root_piece_id, encrypted_key_nonce, encrypted_key,
encrypted_size, plain_offset, plain_size,
redundancy,
inline_data, remote_pieces
inline_data, remote_alias_pieces
FROM segments
WHERE
stream_id IN (SELECT stream_id FROM objects WHERE
@ -191,7 +192,7 @@ func (db *DB) GetSegmentByLocation(ctx context.Context, opts GetSegmentByLocatio
&segment.RootPieceID, &segment.EncryptedKeyNonce, &segment.EncryptedKey,
&segment.EncryptedSize, &segment.PlainOffset, &segment.PlainSize,
redundancyScheme{&segment.Redundancy},
&segment.InlineData, &segment.Pieces,
&segment.InlineData, &aliasPieces,
)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
@ -200,6 +201,10 @@ func (db *DB) GetSegmentByLocation(ctx context.Context, opts GetSegmentByLocatio
return Segment{}, Error.New("unable to query segment: %w", err)
}
segment.Pieces, err = db.aliasCache.ConvertAliasesToPieces(ctx, aliasPieces)
if err != nil {
return Segment{}, Error.New("unable to convert aliases to pieces: %w", err)
}
segment.Position = opts.Position
return segment, nil
@ -227,12 +232,13 @@ func (db *DB) GetSegmentByPosition(ctx context.Context, opts GetSegmentByPositio
return Segment{}, err
}
var aliasPieces AliasPieces
err = db.db.QueryRow(ctx, `
SELECT
root_piece_id, encrypted_key_nonce, encrypted_key,
encrypted_size, plain_offset, plain_size,
redundancy,
inline_data, remote_pieces
inline_data, remote_alias_pieces
FROM segments
WHERE
stream_id = $1 AND
@ -242,7 +248,7 @@ func (db *DB) GetSegmentByPosition(ctx context.Context, opts GetSegmentByPositio
&segment.RootPieceID, &segment.EncryptedKeyNonce, &segment.EncryptedKey,
&segment.EncryptedSize, &segment.PlainOffset, &segment.PlainSize,
redundancyScheme{&segment.Redundancy},
&segment.InlineData, &segment.Pieces,
&segment.InlineData, &aliasPieces,
)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
@ -251,6 +257,11 @@ func (db *DB) GetSegmentByPosition(ctx context.Context, opts GetSegmentByPositio
return Segment{}, Error.New("unable to query segment: %w", err)
}
segment.Pieces, err = db.aliasCache.ConvertAliasesToPieces(ctx, aliasPieces)
if err != nil {
return Segment{}, Error.New("unable to convert aliases to pieces: %w", err)
}
segment.StreamID = opts.StreamID
segment.Position = opts.Position
@ -270,13 +281,14 @@ func (db *DB) GetLatestObjectLastSegment(ctx context.Context, opts GetLatestObje
return Segment{}, err
}
var aliasPieces AliasPieces
err = db.db.QueryRow(ctx, `
SELECT
stream_id, position,
root_piece_id, encrypted_key_nonce, encrypted_key,
encrypted_size, plain_offset, plain_size,
redundancy,
inline_data, remote_pieces
inline_data, remote_alias_pieces
FROM segments
WHERE
stream_id IN (SELECT stream_id FROM objects WHERE
@ -295,7 +307,7 @@ func (db *DB) GetLatestObjectLastSegment(ctx context.Context, opts GetLatestObje
&segment.RootPieceID, &segment.EncryptedKeyNonce, &segment.EncryptedKey,
&segment.EncryptedSize, &segment.PlainOffset, &segment.PlainSize,
redundancyScheme{&segment.Redundancy},
&segment.InlineData, &segment.Pieces,
&segment.InlineData, &aliasPieces,
)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
@ -304,6 +316,11 @@ func (db *DB) GetLatestObjectLastSegment(ctx context.Context, opts GetLatestObje
return Segment{}, Error.New("unable to query segment: %w", err)
}
segment.Pieces, err = db.aliasCache.ConvertAliasesToPieces(ctx, aliasPieces)
if err != nil {
return Segment{}, Error.New("unable to convert aliases to pieces: %w", err)
}
return segment, nil
}
@ -325,13 +342,14 @@ func (db *DB) GetSegmentByOffset(ctx context.Context, opts GetSegmentByOffset) (
return Segment{}, ErrInvalidRequest.New("Invalid PlainOffset: %d", opts.PlainOffset)
}
var aliasPieces AliasPieces
err = db.db.QueryRow(ctx, `
SELECT
stream_id, position,
root_piece_id, encrypted_key_nonce, encrypted_key,
encrypted_size, plain_offset, plain_size,
redundancy,
inline_data, remote_pieces
inline_data, remote_alias_pieces
FROM segments
WHERE
stream_id IN (SELECT stream_id FROM objects WHERE
@ -352,7 +370,7 @@ func (db *DB) GetSegmentByOffset(ctx context.Context, opts GetSegmentByOffset) (
&segment.RootPieceID, &segment.EncryptedKeyNonce, &segment.EncryptedKey,
&segment.EncryptedSize, &segment.PlainOffset, &segment.PlainSize,
redundancyScheme{&segment.Redundancy},
&segment.InlineData, &segment.Pieces,
&segment.InlineData, &aliasPieces,
)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
@ -361,6 +379,11 @@ func (db *DB) GetSegmentByOffset(ctx context.Context, opts GetSegmentByOffset) (
return Segment{}, Error.New("unable to query segment: %w", err)
}
segment.Pieces, err = db.aliasCache.ConvertAliasesToPieces(ctx, aliasPieces)
if err != nil {
return Segment{}, Error.New("unable to convert aliases to pieces: %w", err)
}
return segment, nil
}

View File

@ -47,7 +47,7 @@ func (db *DB) ListSegments(ctx context.Context, opts ListSegments) (result ListS
root_piece_id, encrypted_key_nonce, encrypted_key,
encrypted_size, plain_offset, plain_size,
redundancy,
inline_data, remote_pieces
inline_data, remote_alias_pieces
FROM segments
WHERE
stream_id = $1 AND
@ -57,17 +57,23 @@ func (db *DB) ListSegments(ctx context.Context, opts ListSegments) (result ListS
`, opts.StreamID, opts.Cursor, opts.Limit+1))(func(rows tagsql.Rows) error {
for rows.Next() {
var segment Segment
var aliasPieces AliasPieces
err = rows.Scan(
&segment.Position,
&segment.RootPieceID, &segment.EncryptedKeyNonce, &segment.EncryptedKey,
&segment.EncryptedSize, &segment.PlainOffset, &segment.PlainSize,
redundancyScheme{&segment.Redundancy},
&segment.InlineData, &segment.Pieces,
&segment.InlineData, &aliasPieces,
)
if err != nil {
return Error.New("failed to scan segments: %w", err)
}
segment.Pieces, err = db.aliasCache.ConvertAliasesToPieces(ctx, aliasPieces)
if err != nil {
return Error.New("failed to convert aliases to pieces: %w", err)
}
segment.StreamID = opts.StreamID
result.Segments = append(result.Segments, segment)
}

View File

@ -90,6 +90,7 @@ func (db *DB) TestingDeleteAll(ctx context.Context) (err error) {
DELETE FROM node_aliases;
SELECT setval('node_alias_seq', 1, false);
`)
db.aliasCache = NewNodeAliasCache(db)
return Error.Wrap(err)
}
@ -165,7 +166,7 @@ func (db *DB) testingGetAllSegments(ctx context.Context) (_ []RawSegment, err er
encrypted_size,
plain_offset, plain_size,
redundancy,
inline_data, remote_pieces
inline_data, remote_alias_pieces
FROM segments
ORDER BY stream_id ASC, position ASC
`)
@ -175,6 +176,7 @@ func (db *DB) testingGetAllSegments(ctx context.Context) (_ []RawSegment, err er
defer func() { err = errs.Combine(err, rows.Close()) }()
for rows.Next() {
var seg RawSegment
var aliasPieces AliasPieces
err := rows.Scan(
&seg.StreamID,
&seg.Position,
@ -190,11 +192,17 @@ func (db *DB) testingGetAllSegments(ctx context.Context) (_ []RawSegment, err er
redundancyScheme{&seg.Redundancy},
&seg.InlineData,
&seg.Pieces,
&aliasPieces,
)
if err != nil {
return nil, Error.New("testingGetAllSegments scan failed: %w", err)
}
seg.Pieces, err = db.aliasCache.ConvertAliasesToPieces(ctx, aliasPieces)
if err != nil {
return nil, Error.New("testingGetAllSegments convert aliases to pieces failed: %w", err)
}
segs = append(segs, seg)
}
if err := rows.Err(); err != nil {

View File

@ -46,18 +46,28 @@ func (db *DB) UpdateSegmentPieces(ctx context.Context, opts UpdateSegmentPieces)
return err
}
var pieces Pieces
oldPieces, err := db.aliasCache.ConvertPiecesToAliases(ctx, opts.OldPieces)
if err != nil {
return Error.New("unable to convert pieces to aliases: %w", err)
}
newPieces, err := db.aliasCache.ConvertPiecesToAliases(ctx, opts.NewPieces)
if err != nil {
return Error.New("unable to convert pieces to aliases: %w", err)
}
var resultPieces AliasPieces
err = db.db.QueryRow(ctx, `
UPDATE segments SET
remote_pieces = CASE
WHEN remote_pieces = $3 THEN $4
ELSE remote_pieces
remote_alias_pieces = CASE
WHEN remote_alias_pieces = $3 THEN $4
ELSE remote_alias_pieces
END
WHERE
stream_id = $1 AND
position = $2
RETURNING remote_pieces
`, opts.StreamID, opts.Position, opts.OldPieces, opts.NewPieces).Scan(&pieces)
RETURNING remote_alias_pieces
`, opts.StreamID, opts.Position, oldPieces, newPieces).Scan(&resultPieces)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return ErrSegmentNotFound.New("segment missing")
@ -65,8 +75,8 @@ func (db *DB) UpdateSegmentPieces(ctx context.Context, opts UpdateSegmentPieces)
return Error.New("unable to update segment pieces: %w", err)
}
if !opts.NewPieces.Equal(pieces) {
return storage.ErrValueChanged.New("segment remote_pieces field was changed")
if !EqualAliasPieces(newPieces, resultPieces) {
return storage.ErrValueChanged.New("segment remote_alias_pieces field was changed")
}
return nil

View File

@ -227,7 +227,7 @@ func TestUpdateSegmentPieces(t *testing.T) {
},
},
ErrClass: &storage.ErrValueChanged,
ErrText: "segment remote_pieces field was changed",
ErrText: "segment remote_alias_pieces field was changed",
}.Check(ctx, t, db)
})