cmd/metabase-verify: tool for verifying metabase state

Currently the tool verifies (see the sketch below):
* validity of plain_offset
* that plain_size does not exceed encrypted_size
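
In rough terms, the SegmentSizes observer added below walks each object's segments in order and applies these invariants. A minimal sketch of the checks, using the field names from the new code:

    if seg.PlainSize > seg.EncryptedSize { /* report: plain size larger than encrypted size */ }
    if expectedOffset != seg.PlainOffset { /* report: invalid offset (committed objects only) */ }
    expectedOffset += int64(seg.PlainSize)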

Change-Id: I9ec4fb5ead3356a196392c26ca377fcdb367138e
Egon Elbre 2021-04-15 14:06:08 +03:00
parent 6307875203
commit fff21b330d
7 changed files with 396 additions and 6 deletions


@@ -0,0 +1,84 @@
// Copyright (C) 2021 Storj Labs, Inc.
// See LICENSE for copying information.

package main

import (
	"time"

	"github.com/spf13/cobra"
	"github.com/zeebo/errs"
	"go.uber.org/zap"

	"storj.io/private/process"
	"storj.io/storj/cmd/metabase-verify/verify"
	"storj.io/storj/satellite/metainfo"
)

// Error is the default error class for the package.
var Error = errs.Class("metabase-verify")

func main() {
	log := zap.L()

	root := &cobra.Command{
		Use: "metainfo-loop-verify",
	}

	IncludeProfiling(root)

	root.AddCommand(VerifyCommand(log))

	process.Exec(root)
}

// VerifyCommand creates command for running verifications.
func VerifyCommand(log *zap.Logger) *cobra.Command {
	var metabaseDB string
	var ignoreVersionMismatch bool
	var verifyConfig verify.Config

	cmd := &cobra.Command{
		Use:   "run",
		Short: "run metabase verification",
	}

	flag := cmd.Flags()
	flag.StringVar(&metabaseDB, "metabasedb", "", "connection URL for MetabaseDB")
	_ = cmd.MarkFlagRequired("metabasedb")
	flag.BoolVar(&ignoreVersionMismatch, "ignore-version-mismatch", false, "ignore version mismatch")

	flag.DurationVar(&verifyConfig.Loop.CoalesceDuration, "loop.coalesce-duration", 5*time.Second, "how long to wait for new observers before starting iteration")
	flag.Float64Var(&verifyConfig.Loop.RateLimit, "loop.rate-limit", 0, "rate limit (default is 0 which is unlimited segments per second)")
	flag.IntVar(&verifyConfig.Loop.ListLimit, "loop.list-limit", 2500, "how many items to query in a batch")

	flag.Int64Var(&verifyConfig.ProgressPrintFrequency, "progress-frequency", 1000000, "how often should we print progress (every object)")

	cmd.RunE = func(cmd *cobra.Command, args []string) error {
		ctx, cancel := process.Ctx(cmd)
		defer cancel()

		mdb, err := metainfo.OpenMetabase(ctx, log.Named("mdb"), metabaseDB)
		if err != nil {
			return Error.Wrap(err)
		}
		defer func() { _ = mdb.Close() }()

		versionErr := mdb.CheckVersion(ctx)
		if versionErr != nil {
			log.Error("versions skewed", zap.Error(versionErr))
			if !ignoreVersionMismatch {
				return Error.Wrap(versionErr)
			}
		}

		verify := verify.New(log, mdb, verifyConfig)
		if err := verify.RunOnce(ctx); err != nil {
			return Error.Wrap(err)
		}
		return nil
	}

	return cmd
}


@@ -0,0 +1,69 @@
// Copyright (C) 2021 Storj Labs, Inc.
// See LICENSE for copying information.

package main

import (
	"os"
	"runtime/pprof"

	"github.com/spf13/cobra"
	"github.com/zeebo/errs"
)

var errProfile = errs.Class("profile")

// IncludeProfiling adds persistent profiling to cmd.
func IncludeProfiling(cmd *cobra.Command) {
	var path string
	var profile *CPUProfile

	flag := cmd.PersistentFlags()
	flag.StringVar(&path, "cpuprofile", "", "write cpu profile to file")

	preRunE := cmd.PersistentPreRunE
	cmd.PersistentPreRunE = func(cmd *cobra.Command, args []string) (err error) {
		profile, err = NewProfile(path)
		if err != nil {
			return err
		}
		if preRunE != nil {
			return preRunE(cmd, args)
		}
		return nil
	}

	postRunE := cmd.PersistentPostRunE
	cmd.PersistentPostRunE = func(cmd *cobra.Command, args []string) (err error) {
		if postRunE != nil {
			return postRunE(cmd, args)
		}
		profile.Close()
		return nil
	}
}

// CPUProfile contains active profiling information.
type CPUProfile struct{ file *os.File }

// NewProfile starts a new profile on `path`.
func NewProfile(path string) (*CPUProfile, error) {
	if path == "" {
		return nil, nil
	}
	f, err := os.Create(path)
	if err != nil {
		return nil, errProfile.New("unable to create file: %w", err)
	}
	err = pprof.StartCPUProfile(f)
	return &CPUProfile{file: f}, Error.Wrap(err)
}

// Close finishes the profile.
func (p *CPUProfile) Close() {
	if p == nil || p.file == nil {
		return
	}
	pprof.StopCPUProfile()
}
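
For context, IncludeProfiling is attached to the root command in main.go above, so every subcommand gains the --cpuprofile flag. A minimal sketch of the wiring, using only names from this commit:

    root := &cobra.Command{Use: "metainfo-loop-verify"}
    IncludeProfiling(root) // registers --cpuprofile; NewProfile runs in PersistentPreRunE, Close in PersistentPostRunE
    process.Exec(root)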


@@ -0,0 +1,69 @@
// Copyright (C) 2021 Storj Labs, Inc.
// See LICENSE for copying information.

package verify

import (
	"context"
	"runtime"

	"go.uber.org/zap"

	"storj.io/common/memory"
	"storj.io/storj/satellite/metabase/metaloop"
)

// ProgressObserver counts and prints progress of metabase loop.
type ProgressObserver struct {
	Log *zap.Logger

	ProgressPrintFrequency int64

	ObjectCount        int64
	RemoteSegmentCount int64
	InlineSegmentCount int64
}

// Report reports the current progress.
func (progress *ProgressObserver) Report() {
	progress.Log.Debug("progress",
		zap.Int64("objects", progress.ObjectCount),
		zap.Int64("remote segments", progress.RemoteSegmentCount),
		zap.Int64("inline segments", progress.InlineSegmentCount),
	)

	var m runtime.MemStats
	runtime.ReadMemStats(&m)
	progress.Log.Debug("memory",
		zap.String("Alloc", memory.Size(int64(m.Alloc)).String()),
		zap.String("TotalAlloc", memory.Size(int64(m.TotalAlloc)).String()),
		zap.String("Sys", memory.Size(int64(m.Sys)).String()),
		zap.Uint32("NumGC", m.NumGC),
	)
}

// Object implements the Observer interface.
func (progress *ProgressObserver) Object(context.Context, *metaloop.Object) error {
	progress.ObjectCount++
	if progress.ObjectCount%progress.ProgressPrintFrequency == 0 {
		progress.Report()
	}
	return nil
}

// RemoteSegment implements the Observer interface.
func (progress *ProgressObserver) RemoteSegment(context.Context, *metaloop.Segment) error {
	progress.RemoteSegmentCount++
	return nil
}

// InlineSegment implements the Observer interface.
func (progress *ProgressObserver) InlineSegment(context.Context, *metaloop.Segment) error {
	progress.InlineSegmentCount++
	return nil
}

// LoopStarted is called at each start of a loop.
func (progress *ProgressObserver) LoopStarted(ctx context.Context, info metaloop.LoopInfo) (err error) {
	return nil
}


@@ -0,0 +1,78 @@
// Copyright (C) 2021 Storj Labs, Inc.
// See LICENSE for copying information.

package verify

import (
	"context"

	"go.uber.org/zap"

	"storj.io/storj/satellite/metabase"
	"storj.io/storj/satellite/metabase/metaloop"
)

// SegmentSizes verifies segments table plain_offset and plain_size.
type SegmentSizes struct {
	Log *zap.Logger

	segmentState
}

type segmentState struct {
	Current metabase.ObjectStream
	Status  metabase.ObjectStatus

	ExpectedOffset int64
}

// Object implements the Observer interface.
func (verify *SegmentSizes) Object(ctx context.Context, obj *metaloop.Object) error {
	verify.segmentState = segmentState{
		Current: obj.ObjectStream,
		Status:  obj.Status,
	}
	return nil
}

// LoopStarted is called at each start of a loop.
func (verify *SegmentSizes) LoopStarted(ctx context.Context, info metaloop.LoopInfo) (err error) {
	return nil
}

// RemoteSegment implements the Observer interface.
func (verify *SegmentSizes) RemoteSegment(ctx context.Context, seg *metaloop.Segment) error {
	return verify.advanceSegment(ctx, seg)
}

// InlineSegment implements the Observer interface.
func (verify *SegmentSizes) InlineSegment(ctx context.Context, seg *metaloop.Segment) error {
	return verify.advanceSegment(ctx, seg)
}

func (verify *SegmentSizes) advanceSegment(ctx context.Context, seg *metaloop.Segment) error {
	if seg.PlainSize > seg.EncryptedSize {
		verify.Log.Error("plain size larger than encrypted size",
			zap.Any("object", formatObject(verify.Current)),
			zap.Any("position", seg.Position),
			zap.Int32("plain size", seg.PlainSize),
			zap.Int32("encrypted size", seg.EncryptedSize))
	}

	if verify.Status != metabase.Committed {
		return nil
	}

	if verify.ExpectedOffset != seg.PlainOffset {
		verify.Log.Error("invalid offset",
			zap.Any("object", formatObject(verify.Current)),
			zap.Any("position", seg.Position),
			zap.Int64("expected", verify.ExpectedOffset),
			zap.Int64("actual", seg.PlainOffset))
	}
	verify.ExpectedOffset += int64(seg.PlainSize)

	return nil
}


@@ -0,0 +1,77 @@
// Copyright (C) 2021 Storj Labs, Inc.
// See LICENSE for copying information.

package verify

import (
	"context"
	"fmt"

	"github.com/zeebo/errs"
	"go.uber.org/zap"

	"storj.io/common/errs2"
	"storj.io/storj/satellite/metabase"
	"storj.io/storj/satellite/metabase/metaloop"
)

// Error is the default error class for the package.
var Error = errs.Class("verify")

// Chore runs different verifications on metabase loop.
type Chore struct {
	Log    *zap.Logger
	Config Config
	DB     metaloop.MetabaseDB
}

// Config contains configuration for all the services.
type Config struct {
	ProgressPrintFrequency int64

	Loop metaloop.Config
}

// New creates new verification.
func New(log *zap.Logger, mdb metaloop.MetabaseDB, config Config) *Chore {
	return &Chore{
		Log:    log,
		Config: config,
		DB:     mdb,
	}
}

// RunOnce creates a new metaloop and runs the verifications.
func (chore *Chore) RunOnce(ctx context.Context) error {
	loop := metaloop.New(chore.Config.Loop, chore.DB)

	var group errs2.Group
	group.Go(func() error {
		progress := &ProgressObserver{
			Log:                    chore.Log.Named("progress"),
			ProgressPrintFrequency: chore.Config.ProgressPrintFrequency,
		}
		plainOffset := &SegmentSizes{
			Log: chore.Log.Named("segment-sizes"),
		}
		err := loop.Join(ctx, progress, plainOffset)
		if err != nil {
			return Error.Wrap(err)
		}
		progress.Report()
		return nil
	})
	group.Go(func() error {
		return Error.Wrap(loop.RunOnce(ctx))
	})
	return Error.Wrap(errs.Combine(group.Wait()...))
}

func formatObject(obj metabase.ObjectStream) string {
	return fmt.Sprintf("project:%q bucket:%q key:%q stream:\"%x\"", obj.ProjectID, obj.BucketName, obj.ObjectKey, obj.StreamID[:])
}


@@ -44,6 +44,7 @@ type LoopObjectsIterator interface {
// LoopObjectEntry contains information about object needed by metainfo loop.
type LoopObjectEntry struct {
	ObjectStream                // metrics, repair, tally
	Status       ObjectStatus   // verify
	CreatedAt    time.Time      // temp used by metabase-createdat-migration
	ExpiresAt    *time.Time     // tally
	SegmentCount int32          // metrics
@@ -162,6 +163,7 @@ func (it *loopIterator) doNextQuery(ctx context.Context) (_ tagsql.Rows, err err
		SELECT
			project_id, bucket_name,
			object_key, stream_id, version,
			status,
			created_at, expires_at,
			segment_count,
			LENGTH(COALESCE(encrypted_metadata,''))
@@ -181,6 +183,7 @@ func (it *loopIterator) scanItem(item *LoopObjectEntry) error {
	return it.curRows.Scan(
		&item.ProjectID, &item.BucketName,
		&item.ObjectKey, &item.StreamID, &item.Version,
		&item.Status,
		&item.CreatedAt, &item.ExpiresAt,
		&item.SegmentCount,
		&item.EncryptedMetadataSize,
@@ -205,6 +208,8 @@ type LoopSegmentEntry struct {
	RepairedAt    *time.Time // repair
	RootPieceID   storj.PieceID
	EncryptedSize int32 // size of the whole segment (not a piece)
	PlainOffset   int64 // verify
	PlainSize     int32 // verify
	Redundancy    storj.RedundancyScheme
	Pieces        Pieces
}
@@ -247,6 +252,7 @@ func (db *DB) IterateLoopStreams(ctx context.Context, opts IterateLoopStreams, h
			created_at, repaired_at,
			root_piece_id,
			encrypted_size,
			plain_offset, plain_size,
			redundancy,
			remote_alias_pieces
		FROM segments
@@ -291,6 +297,7 @@ func (db *DB) IterateLoopStreams(ctx context.Context, opts IterateLoopStreams, h
			&segment.CreatedAt, &segment.RepairedAt,
			&segment.RootPieceID,
			&segment.EncryptedSize,
			&segment.PlainOffset, &segment.PlainSize,
			redundancyScheme{&segment.Redundancy},
			&aliasPieces,
		)


@@ -100,10 +100,12 @@ func TestIterateLoopObjects(t *testing.T) {
			expected := []metabase.LoopObjectEntry{
				{
					ObjectStream: pending,
					Status:       metabase.Pending,
					CreatedAt:    createdAt,
				},
				{
					ObjectStream:          committed,
					Status:                metabase.Committed,
					EncryptedMetadataSize: len(encryptedMetadata),
					CreatedAt:             createdAt,
				},
@@ -370,6 +372,8 @@ func TestIterateLoopStreams(t *testing.T) {
				CreatedAt:     &now,
				RootPieceID:   storj.PieceID{1},
				EncryptedSize: 1024,
				PlainSize:     512,
				PlainOffset:   int64(i * 512),
				Pieces:        metabase.Pieces{{Number: 0, StorageNode: storj.NodeID{2}}},
				Redundancy:    defaultTestRedundancy,
			}
@@ -444,6 +448,7 @@ func createFullObjectsWithKeys(ctx *testcontext.Context, t *testing.T, db *metab
		objects[key] = metabase.LoopObjectEntry{
			ObjectStream: obj,
			Status:       metabase.Committed,
			CreatedAt:    time.Now(),
		}
	}
@@ -454,8 +459,9 @@ func createFullObjectsWithKeys(ctx *testcontext.Context, t *testing.T, db *metab
func loopObjectEntryFromRaw(m metabase.RawObject) metabase.LoopObjectEntry {
	return metabase.LoopObjectEntry{
		ObjectStream: m.ObjectStream,
		Status:       metabase.Committed,
		CreatedAt:    m.CreatedAt,
		ExpiresAt:    m.ExpiresAt,
		SegmentCount: m.SegmentCount,
	}
}