metainfo-migrator: handle objects with missing segments
We can have objects with missing segments. Such objects will eventually be removed by the segment reaper, but they can still exist between its executions. We should not interrupt the migration when we encounter them; instead, we collect such objects so that the migrated DB can be cleaned up later. Change-Id: I5cc4a66395c1773a6430f34cc25a0f2449133f80
This commit is contained in:
parent
981f1ca449
commit
ce4024a521
@ -27,6 +27,7 @@ var (
|
||||
writeParallelLimit = flag.Int("writeParallelLimit", defaultWriteParallelLimit, "limit of parallel batch writes")
|
||||
preGeneratedStreamIDs = flag.Int("preGeneratedStreamIDs", defaultPreGeneratedStreamIDs, "number of pre generated stream ids for segment")
|
||||
nodes = flag.String("nodes", "", "file with nodes ids")
|
||||
invalidObjects = flag.String("invalidObjects", "", "file for storing invalid objects")
|
||||
|
||||
pointerdb = flag.String("pointerdb", "", "connection URL for PointerDB")
|
||||
metabasedb = flag.String("metabasedb", "", "connection URL for MetabaseDB")
|
||||
@ -72,6 +73,7 @@ func main() {
|
||||
WriteBatchSize: *writeBatchSize,
|
||||
WriteParallelLimit: *writeParallelLimit,
|
||||
Nodes: *nodes,
|
||||
InvalidObjectsFile: *invalidObjects,
|
||||
}
|
||||
migrator := NewMigrator(log, *pointerdb, *metabasedb, config)
|
||||
err = migrator.MigrateProjects(ctx)
|
||||
|
@ -8,6 +8,7 @@ import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"encoding/csv"
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
"os"
|
||||
@ -62,6 +63,7 @@ type Config struct {
|
||||
WriteBatchSize int
|
||||
WriteParallelLimit int
|
||||
Nodes string
|
||||
InvalidObjectsFile string
|
||||
}
|
||||
|
||||
// Migrator defines metainfo migrator.
|
||||
@ -166,6 +168,24 @@ func (m *Migrator) MigrateProjects(ctx context.Context) (err error) {
|
||||
var fullpath, lastFullPath, metadata []byte
|
||||
var allObjects, allSegments, zombieSegments int64
|
||||
|
||||
var invalidObjectsWriter *csv.Writer
|
||||
if m.config.InvalidObjectsFile != "" {
|
||||
objFile, err := os.Create(m.config.InvalidObjectsFile)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer func() { err = errs.Combine(err, objFile.Close()) }()
|
||||
|
||||
invalidObjectsWriter = csv.NewWriter(objFile)
|
||||
} else {
|
||||
invalidObjectsWriter = csv.NewWriter(os.Stdout)
|
||||
}
|
||||
|
||||
err = invalidObjectsWriter.Write([]string{"project_id", "bucket_name", "object_key", "stream_id", "expected_segments", "read_segments"})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
start := time.Now()
|
||||
if m.config.Nodes != "" {
|
||||
err = m.aliasNodes(ctx, mb)
|
||||
@ -215,12 +235,32 @@ func (m *Migrator) MigrateProjects(ctx context.Context) (err error) {
|
||||
return err
|
||||
}
|
||||
if !bytes.Equal(currentProject[:], segmentKey.ProjectID[:]) {
|
||||
currentProject = segmentKey.ProjectID
|
||||
|
||||
if len(objects) != 0 {
|
||||
return errs.New("Object map should be empty after processing whole project")
|
||||
// TODO should we add such incomplete object into metabase?
|
||||
for key, object := range objects {
|
||||
err = invalidObjectsWriter.Write([]string{
|
||||
currentProject.String(),
|
||||
key.Bucket,
|
||||
hex.EncodeToString([]byte(key.Key)),
|
||||
object.StreamID.String(),
|
||||
strconv.FormatInt(object.SegmentsExpected, 10),
|
||||
strconv.FormatInt(object.SegmentsRead, 10),
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
invalidObjectsWriter.Flush()
|
||||
|
||||
if err := invalidObjectsWriter.Error(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
m.log.Warn("Object map should be empty after processing whole project", zap.String("ProjectID", currentProject.String()), zap.Int("Number of objects", len(objects)))
|
||||
}
|
||||
|
||||
currentProject = segmentKey.ProjectID
|
||||
|
||||
for b := range objects {
|
||||
delete(objects, b)
|
||||
}
|
||||
|
@ -178,6 +178,7 @@ func test(t *testing.T, createPointers func(t *testing.T, ctx context.Context, p
|
||||
PreGeneratedStreamIDs: 1000,
|
||||
WriteBatchSize: 3,
|
||||
WriteParallelLimit: 6,
|
||||
InvalidObjectsFile: ctx.File(satelliteDB.Name + "_invalid_objects.csv"),
|
||||
})
|
||||
err = migrator.MigrateProjects(ctx)
|
||||
require.NoError(t, err)
|
||||
|
Loading…
Reference in New Issue
Block a user