metainfo-migration: extend test case

Change-Id: I9d1ac41a18bb08200ef20f7ff2f8df5531140f99
Michał Niewrzał 2021-02-09 13:06:18 +01:00
parent 5a23d9c167
commit 341a4b7f59
5 changed files with 191 additions and 56 deletions

View File

@ -167,9 +167,7 @@ pipeline {
steps {
sh 'psql -U postgres -c \'create database teststorj3;\''
// catchError {
// sh 'make test-sim-backwards-compatible'
// }
sh 'make test-sim-backwards-compatible'
}
}
@ -184,9 +182,7 @@ pipeline {
steps {
sh 'cockroach sql --insecure --host=localhost:26257 -e \'create database testcockroach5;\''
// catchError {
// sh 'make test-sim-backwards-compatible'
// }
sh 'make test-sim-backwards-compatible'
sh 'cockroach sql --insecure --host=localhost:26257 -e \'drop database testcockroach5;\''
}
}

View File

@ -6,8 +6,7 @@ package main
import (
"context"
"flag"
"fmt"
"os"
"log"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
@ -27,6 +26,7 @@ var (
writeBatchSize = flag.Int("writeBatchSize", defaultWriteBatchSize, "batch size for inserting objects and segments")
writeParallelLimit = flag.Int("writeParallelLimit", defaultWriteParallelLimit, "limit of parallel batch writes")
preGeneratedStreamIDs = flag.Int("preGeneratedStreamIDs", defaultPreGeneratedStreamIDs, "number of pre-generated stream IDs for segments")
nodes = flag.String("nodes", "", "file with node IDs")
pointerdb = flag.String("pointerdb", "", "connection URL for PointerDB")
metabasedb = flag.String("metabasedb", "", "connection URL for MetabaseDB")
@ -36,12 +36,10 @@ func main() {
flag.Parse()
if *pointerdb == "" {
fmt.Println("Flag '--pointerdb' is not set")
os.Exit(1)
log.Fatalln("Flag '--pointerdb' is not set")
}
if *metabasedb == "" {
fmt.Println("Flag '--metabasedb' is not set")
os.Exit(1)
log.Fatalln("Flag '--metabasedb' is not set")
}
ctx := context.Background()
@ -73,6 +71,7 @@ func main() {
ReadBatchSize: *readBatchSize,
WriteBatchSize: *writeBatchSize,
WriteParallelLimit: *writeParallelLimit,
Nodes: *nodes,
}
migrator := NewMigrator(log, *pointerdb, *metabasedb, config)
err = migrator.MigrateProjects(ctx)

View File

@ -4,9 +4,11 @@
package main
import (
"bufio"
"bytes"
"context"
"encoding/binary"
"encoding/hex"
"errors"
"os"
"runtime/pprof"
@ -30,7 +32,7 @@ import (
"storj.io/storj/satellite/metainfo/metabase"
)
const objectArgs = 13
const objectArgs = 14
const segmentArgs = 11
// EntryKey map key for object.
@ -41,16 +43,16 @@ type EntryKey struct {
// Object represents object metadata.
type Object struct {
StreamID uuid.UUID
CreationDate time.Time
ExpireAt *time.Time
EncryptedKey []byte
Encryption int64
Metadata []byte
TotalEncryptedSize int64
SegmentsRead int64
SegmentsExpected int64
StreamID uuid.UUID
CreationDate time.Time
ExpireAt *time.Time
EncryptedMetadata []byte
EncryptedMetadataKey []byte
EncryptedMetadataNonce []byte
Encryption int64
TotalEncryptedSize int64
SegmentsRead int64
SegmentsExpected int64
}
// Config initial settings for migrator.
@ -59,6 +61,7 @@ type Config struct {
ReadBatchSize int
WriteBatchSize int
WriteParallelLimit int
Nodes string
}
// Migrator defines metainfo migrator.
@ -116,7 +119,6 @@ func (m *Migrator) MigrateProjects(ctx context.Context) (err error) {
}
defer func() { err = errs.Combine(err, pointerDBConn.Close(ctx)) }()
// TODO use initial custom DB schema (e.g. to avoid indexes)
mb, err := metainfo.OpenMetabase(ctx, m.log.Named("metabase"), m.metabaseDBStr)
if err != nil {
return err
@ -124,9 +126,9 @@ func (m *Migrator) MigrateProjects(ctx context.Context) (err error) {
if err := mb.MigrateToLatest(ctx); err != nil {
return err
}
if err := mb.Close(); err != nil {
return err
}
defer func() { err = errs.Combine(err, mb.Close()) }()
aliasCache := metabase.NewNodeAliasCache(mb)
config, err := pgxpool.ParseConfig(m.metabaseDBStr)
if err != nil {
@ -162,16 +164,23 @@ func (m *Migrator) MigrateProjects(ctx context.Context) (err error) {
objects := make(map[EntryKey]Object)
var currentProject uuid.UUID
var fullpath, lastFullPath, metadata []byte
var allSegments, zombieSegments int64
var allObjects, allSegments, zombieSegments int64
start := time.Now()
if m.config.Nodes != "" {
err = m.aliasNodes(ctx, mb)
if err != nil {
return err
}
}
lastCheck := time.Now()
m.log.Info("Start generating StreamIDs", zap.Int("total", m.config.PreGeneratedStreamIDs))
ids, err := generateStreamIDs(m.config.PreGeneratedStreamIDs)
if err != nil {
return err
}
m.log.Info("Finished generating StreamIDs", zap.Duration("took", time.Since(start)))
m.log.Info("Finished generating StreamIDs", zap.Duration("took", time.Since(lastCheck)))
m.log.Info("Start", zap.Time("time", start),
zap.Int("readBatchSize", m.config.ReadBatchSize),
@ -179,7 +188,7 @@ func (m *Migrator) MigrateProjects(ctx context.Context) (err error) {
zap.Int("writeParallelLimit", m.config.WriteParallelLimit),
)
lastCheck := time.Now()
lastCheck = time.Now()
for {
hasResults := false
err = func() error {
@ -235,6 +244,9 @@ func (m *Migrator) MigrateProjects(ctx context.Context) (err error) {
// * detect empty objects and insert only object
if segmentKey.Position.Index == metabase.LastSegmentIndex {
// process the last segment; it contains information about the object as well as segment metadata
if len(ids) == 0 {
return errs.New("not enough generated stream ids")
}
streamID := ids[0]
err = proto.Unmarshal(pointer.Metadata, streamMeta)
if err != nil {
@ -262,8 +274,9 @@ func (m *Migrator) MigrateProjects(ctx context.Context) (err error) {
object.CreationDate = pointer.CreationDate
object.ExpireAt = expireAt
object.Encryption = encryption
object.EncryptedKey = streamMeta.LastSegmentMeta.EncryptedKey
object.Metadata = pointer.Metadata // TODO this needs to be striped to EncryptedStreamInfo
object.EncryptedMetadataKey = streamMeta.LastSegmentMeta.EncryptedKey
object.EncryptedMetadataNonce = streamMeta.LastSegmentMeta.KeyNonce
object.EncryptedMetadata = pointer.Metadata // TODO this needs to be stripped to EncryptedStreamInfo
object.SegmentsRead = 1
object.TotalEncryptedSize = pointer.SegmentSize
@ -278,12 +291,13 @@ func (m *Migrator) MigrateProjects(ctx context.Context) (err error) {
if err != nil {
return err
}
allObjects++
} else {
objects[key] = object
}
segmentPosition.Index = uint32(streamMeta.NumberOfSegments - 1)
err = m.insertSegment(ctx, metabaseConn, streamID, segmentPosition.Encode(), pointer, streamMeta.LastSegmentMeta)
err = m.insertSegment(ctx, metabaseConn, aliasCache, streamID, segmentPosition.Encode(), pointer, streamMeta.LastSegmentMeta)
if err != nil {
return err
}
@ -299,7 +313,7 @@ func (m *Migrator) MigrateProjects(ctx context.Context) (err error) {
}
segmentPosition.Index = segmentKey.Position.Index
err = m.insertSegment(ctx, metabaseConn, object.StreamID, segmentPosition.Encode(), pointer, segmentMeta)
err = m.insertSegment(ctx, metabaseConn, aliasCache, object.StreamID, segmentPosition.Encode(), pointer, segmentMeta)
if err != nil {
return err
}
@ -314,6 +328,7 @@ func (m *Migrator) MigrateProjects(ctx context.Context) (err error) {
if err != nil {
return err
}
allObjects++
delete(objects, key)
} else {
@ -346,7 +361,7 @@ func (m *Migrator) MigrateProjects(ctx context.Context) (err error) {
m.metabaseLimiter.Wait()
m.log.Info("Finished", zap.Int64("segments", allSegments), zap.Int64("invalid", zombieSegments), zap.Duration("total", time.Since(start)))
m.log.Info("Finished", zap.Int64("objects", allObjects), zap.Int64("segments", allSegments), zap.Int64("invalid", zombieSegments), zap.Duration("total", time.Since(start)))
return nil
}
@ -356,7 +371,7 @@ func (m *Migrator) insertObject(ctx context.Context, conn *pgxpool.Pool, locatio
location.ProjectID, location.BucketName, []byte(location.ObjectKey), 1, object.StreamID,
object.CreationDate, object.ExpireAt,
metabase.Committed, object.SegmentsRead,
object.Metadata, object.EncryptedKey,
object.EncryptedMetadata, object.EncryptedMetadataKey, object.EncryptedMetadataNonce,
object.TotalEncryptedSize,
object.Encryption,
})
@ -370,7 +385,7 @@ func (m *Migrator) insertObject(ctx context.Context, conn *pgxpool.Pool, locatio
return nil
}
func (m *Migrator) insertSegment(ctx context.Context, conn *pgxpool.Pool, streamID uuid.UUID, position uint64, pointer *fastpb.Pointer, segmentMeta *fastpb.SegmentMeta) (err error) {
func (m *Migrator) insertSegment(ctx context.Context, conn *pgxpool.Pool, aliasCache *metabase.NodeAliasCache, streamID uuid.UUID, position uint64, pointer *fastpb.Pointer, segmentMeta *fastpb.SegmentMeta) (err error) {
var rootPieceID storj.PieceID
var remotePieces metabase.Pieces
var redundancy int64
@ -389,13 +404,22 @@ func (m *Migrator) insertSegment(ctx context.Context, conn *pgxpool.Pool, stream
}
}
pieces, err := aliasCache.ConvertPiecesToAliases(ctx, remotePieces)
if err != nil {
return err
}
sort.Slice(pieces, func(i, j int) bool {
return pieces[i].Number < pieces[j].Number
})
m.segments = append(m.segments, []interface{}{
streamID, position,
rootPieceID,
segmentMeta.EncryptedKey, segmentMeta.KeyNonce,
pointer.SegmentSize, 0, 0,
redundancy,
pointer.InlineSegment, remotePieces,
pointer.InlineSegment, pieces,
})
if len(m.segments) >= m.config.WriteBatchSize {
@ -508,7 +532,7 @@ func prepareObjectsSQL(batchSize int) string {
project_id, bucket_name, object_key, version, stream_id,
created_at, expires_at,
status, segment_count,
encrypted_metadata, encrypted_metadata_encrypted_key,
encrypted_metadata, encrypted_metadata_encrypted_key, encrypted_metadata_nonce,
total_encrypted_size,
encryption
) VALUES
@ -562,3 +586,49 @@ func generateStreamIDs(numberOfIDs int) ([]uuid.UUID, error) {
})
return ids, nil
}
func (m *Migrator) aliasNodes(ctx context.Context, mb metainfo.MetabaseDB) (err error) {
start := time.Now()
m.log.Info("Start aliasing nodes")
file, err := os.Open(m.config.Nodes)
if err != nil {
return err
}
defer func() { err = errs.Combine(err, file.Close()) }()
scanner := bufio.NewScanner(file)
nodes := make([]storj.NodeID, 0, 30000)
for scanner.Scan() {
line := scanner.Text()
decoded, err := hex.DecodeString(line)
if err != nil {
m.log.Error("unable decode node id", zap.String("value", line), zap.Error(err))
continue
}
node, err := storj.NodeIDFromBytes(decoded)
if err != nil {
m.log.Error("unable create node id", zap.String("value", line), zap.Error(err))
continue
}
nodes = append(nodes, node)
}
// batches are used because we had an issue with CRDB when putting all nodes into a single insert
batch := 1000
for len(nodes) > 0 {
if len(nodes) < batch {
batch = len(nodes)
}
err = mb.EnsureNodeAliases(ctx, metabase.EnsureNodeAliases{
Nodes: nodes[:batch],
})
if err != nil {
return err
}
nodes = nodes[batch:]
m.log.Info("Left to insert", zap.Int("nodes", len(nodes)))
}
m.log.Info("Finished aliasing nodes", zap.Duration("took", time.Since(start)))
return nil
}
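For reference, the file passed via the new --nodes flag is expected to contain one hex-encoded node ID per line, which aliasNodes parses with hex.DecodeString and storj.NodeIDFromBytes. Below is a minimal sketch of producing such a file; writeNodesFile is a hypothetical helper, not part of this change, and the storj.io/common import paths are assumptions.

package main

import (
	"encoding/hex"
	"log"
	"os"

	"storj.io/common/storj"
	"storj.io/common/testrand"
)

// writeNodesFile writes node IDs as hex strings, one per line, in the format
// that aliasNodes above reads back with hex.DecodeString and storj.NodeIDFromBytes.
func writeNodesFile(path string, nodes []storj.NodeID) (err error) {
	file, err := os.Create(path)
	if err != nil {
		return err
	}
	defer func() {
		if closeErr := file.Close(); err == nil {
			err = closeErr
		}
	}()

	for _, node := range nodes {
		if _, err := file.WriteString(hex.EncodeToString(node.Bytes()) + "\n"); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	// Write a couple of random node IDs; a real run would use the satellite's known nodes.
	if err := writeNodesFile("nodes.txt", []storj.NodeID{testrand.NodeID(), testrand.NodeID()}); err != nil {
		log.Fatalln(err)
	}
}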

View File

@ -20,15 +20,21 @@ import (
migration "storj.io/storj/cmd/metainfo-migration"
"storj.io/storj/private/dbutil/tempdb"
"storj.io/storj/satellite/metainfo"
"storj.io/storj/satellite/metainfo/metabase"
"storj.io/storj/satellite/satellitedb/satellitedbtest"
"storj.io/storj/storage"
)
func TestMigrator_SingleSegmentObj(t *testing.T) {
expectedEntries := 1
expectedProjectID := testrand.UUID()
expectedBucket := []byte("bucket-name")
expectedObjectKey := []byte("encrypted-key")
var pointer *pb.Pointer
createPointers := func(t *testing.T, ctx context.Context, pointerDB metainfo.PointerDB) {
projectID := testrand.UUID()
err := createLastSegment(ctx, pointerDB, projectID, []byte("bucket-name"), []byte("encrypted-key"), 1)
var err error
pointer, err = createLastSegment(ctx, pointerDB, expectedProjectID, expectedBucket, expectedObjectKey, 1)
require.NoError(t, err)
}
@ -37,23 +43,67 @@ func TestMigrator_SingleSegmentObj(t *testing.T) {
require.NoError(t, err)
require.Len(t, objects, expectedEntries)
segments, err := metabaseDB.TestingAllSegments(ctx)
require.NoError(t, err)
require.Equal(t, len(segments), expectedEntries)
{ // verify object
require.EqualValues(t, expectedProjectID, objects[0].ProjectID)
require.EqualValues(t, expectedBucket, objects[0].BucketName)
require.EqualValues(t, expectedObjectKey, objects[0].ObjectKey)
require.EqualValues(t, pointer.SegmentSize, objects[0].TotalEncryptedSize)
require.EqualValues(t, 1, objects[0].SegmentCount)
require.Equal(t, metabase.Committed, objects[0].Status)
require.Zero(t, objects[0].TotalPlainSize)
require.WithinDuration(t, pointer.CreationDate, objects[0].CreatedAt, 5*time.Second)
require.WithinDuration(t, pointer.ExpirationDate, *objects[0].ExpiresAt, 5*time.Second)
require.EqualValues(t, 0, segments[0].Position.Part)
require.EqualValues(t, 0, segments[0].Position.Index)
streamMeta := &pb.StreamMeta{}
err = pb.Unmarshal(pointer.Metadata, streamMeta)
require.NoError(t, err)
require.Equal(t, pointer.Metadata, objects[0].EncryptedMetadata)
require.EqualValues(t, streamMeta.LastSegmentMeta.EncryptedKey, objects[0].EncryptedMetadataEncryptedKey)
require.EqualValues(t, streamMeta.LastSegmentMeta.KeyNonce, objects[0].EncryptedMetadataNonce)
require.EqualValues(t, streamMeta.EncryptionType, objects[0].Encryption.CipherSuite)
require.EqualValues(t, streamMeta.EncryptionBlockSize, objects[0].Encryption.BlockSize)
}
{ // verify segment
segments, err := metabaseDB.TestingAllSegments(ctx)
require.NoError(t, err)
require.Equal(t, len(segments), expectedEntries)
require.Zero(t, segments[0].Position.Part)
require.Zero(t, segments[0].Position.Index)
require.Zero(t, segments[0].PlainOffset)
require.Zero(t, segments[0].PlainSize)
require.EqualValues(t, pointer.Remote.RootPieceId, segments[0].RootPieceID)
redundancy := pointer.Remote.Redundancy
require.EqualValues(t, redundancy.ErasureShareSize, segments[0].Redundancy.ShareSize)
require.EqualValues(t, redundancy.Type, segments[0].Redundancy.Algorithm)
require.EqualValues(t, redundancy.MinReq, segments[0].Redundancy.RequiredShares)
require.EqualValues(t, redundancy.RepairThreshold, segments[0].Redundancy.RepairShares)
require.EqualValues(t, redundancy.SuccessThreshold, segments[0].Redundancy.OptimalShares)
require.EqualValues(t, redundancy.Total, segments[0].Redundancy.TotalShares)
require.Empty(t, segments[0].InlineData)
require.Equal(t, len(pointer.Remote.RemotePieces), len(segments[0].Pieces))
for i, piece := range pointer.Remote.RemotePieces {
require.EqualValues(t, piece.PieceNum, segments[0].Pieces[i].Number)
require.Equal(t, piece.NodeId, segments[0].Pieces[i].StorageNode)
}
}
}
test(t, createPointers, checkMigration)
}
func TestMigrator_ManyOneSegObj(t *testing.T) {
expectedEntries := 1000
expectedEntries := 300
createPointers := func(t *testing.T, ctx context.Context, pointerDB metainfo.PointerDB) {
projectID := testrand.UUID()
for i := 0; i < expectedEntries; i++ {
err := createLastSegment(ctx, pointerDB, projectID, []byte("bucket-name"), []byte("encrypted-key"+strconv.Itoa(i)), 1)
_, err := createLastSegment(ctx, pointerDB, projectID, []byte("bucket-name"), []byte("encrypted-key"+strconv.Itoa(i)), 1)
require.NoError(t, err)
}
}
@ -75,7 +125,7 @@ func TestMigrator_MultiSegmentObj(t *testing.T) {
createPointers := func(t *testing.T, ctx context.Context, pointerDB metainfo.PointerDB) {
projectID := testrand.UUID()
err := createLastSegment(ctx, pointerDB, projectID, []byte("bucket-name"), []byte("encrypted-key"), expectedEntries+1)
_, err := createLastSegment(ctx, pointerDB, projectID, []byte("bucket-name"), []byte("encrypted-key"), expectedEntries+1)
require.NoError(t, err)
for i := 0; i < expectedEntries; i++ {
err = createSegment(ctx, pointerDB, projectID, uint32(i), []byte("bucket-name"), []byte("encrypted-key"))
@ -137,18 +187,33 @@ func test(t *testing.T, createPointers func(t *testing.T, ctx context.Context, p
}
}
func createLastSegment(ctx context.Context, pointerDB metainfo.PointerDB, projectID uuid.UUID, bucket, encryptedKey []byte, numberOfSegments int) error {
func createLastSegment(ctx context.Context, pointerDB metainfo.PointerDB, projectID uuid.UUID, bucket, encryptedKey []byte, numberOfSegments int) (*pb.Pointer, error) {
pointer := &pb.Pointer{}
pointer.Type = pb.Pointer_REMOTE
pointer.SegmentSize = 10
pointer.CreationDate = time.Now()
pointer.ExpirationDate = time.Now()
pointer.Remote = &pb.RemoteSegment{
RootPieceId: testrand.PieceID(),
Redundancy: &pb.RedundancyScheme{},
RootPieceId: testrand.PieceID(),
Redundancy: &pb.RedundancyScheme{
ErasureShareSize: 256,
Type: pb.RedundancyScheme_RS,
MinReq: 1,
RepairThreshold: 2,
SuccessThreshold: 3,
Total: 4,
},
RemotePieces: []*pb.RemotePiece{},
}
for i := 0; i < 10; i++ {
pointer.Remote.RemotePieces = append(pointer.Remote.RemotePieces, &pb.RemotePiece{
PieceNum: int32(i),
NodeId: testrand.NodeID(),
})
}
streamMeta := &pb.StreamMeta{}
streamMeta.NumberOfSegments = int64(numberOfSegments)
streamMeta.EncryptedStreamInfo = testrand.Bytes(1024)
@ -160,7 +225,7 @@ func createLastSegment(ctx context.Context, pointerDB metainfo.PointerDB, projec
}
metaBytes, err := pb.Marshal(streamMeta)
if err != nil {
return err
return nil, err
}
pointer.Metadata = metaBytes
@ -168,13 +233,13 @@ func createLastSegment(ctx context.Context, pointerDB metainfo.PointerDB, projec
pointerBytes, err := pb.Marshal(pointer)
if err != nil {
return err
return nil, err
}
err = pointerDB.Put(ctx, storage.Key(path), storage.Value(pointerBytes))
if err != nil {
return err
return nil, err
}
return nil
return pointer, nil
}
func createSegment(ctx context.Context, pointerDB metainfo.PointerDB, projectID uuid.UUID, segmentIndex uint32, bucket, encryptedKey []byte) error {

View File

@ -212,6 +212,11 @@ type MetabaseDB interface {
BucketEmpty(ctx context.Context, opts metabase.BucketEmpty) (empty bool, err error)
// UpdateSegmentPieces updates pieces for specified segment. If provided old pieces won't match current database state update will fail.
UpdateSegmentPieces(ctx context.Context, opts metabase.UpdateSegmentPieces) (err error)
// EnsureNodeAliases ensures that the supplied node IDs have an alias.
// It's safe to concurrently try to create an alias for the same NodeID.
EnsureNodeAliases(ctx context.Context, opts metabase.EnsureNodeAliases) (err error)
// ListNodeAliases lists all node alias mappings.
ListNodeAliases(ctx context.Context) (_ []metabase.NodeAliasEntry, err error)
// TestingAllCommittedObjects gets all committed objects from bucket. Use only for testing purposes.
TestingAllCommittedObjects(ctx context.Context, projectID uuid.UUID, bucketName string) (objects []metabase.ObjectEntry, err error)
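The migrator itself only calls EnsureNodeAliases, but the two new interface methods are meant to work together: EnsureNodeAliases assigns compact aliases to nodes, and ListNodeAliases returns the resulting mapping (roughly what metabase.NodeAliasCache relies on). A rough usage sketch follows; nodeAliasMap is a hypothetical helper, and the NodeAliasEntry field names (ID, Alias) and the metabase.NodeAlias type are assumptions not shown in this diff.

package example

import (
	"context"

	"storj.io/common/storj"
	"storj.io/storj/satellite/metainfo"
	"storj.io/storj/satellite/metainfo/metabase"
)

// nodeAliasMap ensures every supplied node has an alias and returns the
// complete node ID -> alias mapping stored in the metabase.
func nodeAliasMap(ctx context.Context, mb metainfo.MetabaseDB, nodes []storj.NodeID) (map[storj.NodeID]metabase.NodeAlias, error) {
	// Same call the migrator's aliasNodes makes, here as a single batch.
	err := mb.EnsureNodeAliases(ctx, metabase.EnsureNodeAliases{Nodes: nodes})
	if err != nil {
		return nil, err
	}

	// Read back all known node -> alias mappings.
	entries, err := mb.ListNodeAliases(ctx)
	if err != nil {
		return nil, err
	}

	aliases := make(map[storj.NodeID]metabase.NodeAlias, len(entries))
	for _, entry := range entries {
		aliases[entry.ID] = entry.Alias // field names are assumed
	}
	return aliases, nil
}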