diff --git a/cmd/metabase-verify/main.go b/cmd/metabase-verify/main.go
new file mode 100644
index 000000000..4f7723abb
--- /dev/null
+++ b/cmd/metabase-verify/main.go
@@ -0,0 +1,91 @@
+// Copyright (C) 2021 Storj Labs, Inc.
+// See LICENSE for copying information.
+
+package main
+
+import (
+	"time"
+
+	"github.com/spf13/cobra"
+	"github.com/zeebo/errs"
+	"go.uber.org/zap"
+
+	"storj.io/private/process"
+	"storj.io/storj/cmd/metabase-verify/verify"
+	"storj.io/storj/satellite/metainfo"
+)
+
+// Error is the default error class for the package.
+var Error = errs.Class("metabase-verify")
+
+// main registers the profiling flag and the "run" subcommand on the root
+// command and hands control over to process.Exec.
+func main() {
+	log := zap.L()
+
+	root := &cobra.Command{
+		// Use matches the name of the binary (cmd/metabase-verify).
+		Use: "metabase-verify",
+	}
+	IncludeProfiling(root)
+
+	root.AddCommand(VerifyCommand(log))
+
+	process.Exec(root)
+}
+
+// VerifyCommand creates the "run" command, which opens the metabase given by
+// --metabasedb, checks its version and runs the verifications once.
+//
+// log is used by the command itself and is handed to the verifications.
+func VerifyCommand(log *zap.Logger) *cobra.Command {
+	var metabaseDB string
+	var ignoreVersionMismatch bool
+	var verifyConfig verify.Config
+
+	cmd := &cobra.Command{
+		Use:   "run",
+		Short: "run metabase verification",
+	}
+
+	flag := cmd.Flags()
+
+	flag.StringVar(&metabaseDB, "metabasedb", "", "connection URL for MetabaseDB")
+	// MarkFlagRequired only fails for an unknown flag; it was registered just above.
+	_ = cmd.MarkFlagRequired("metabasedb")
+
+	flag.BoolVar(&ignoreVersionMismatch, "ignore-version-mismatch", false, "ignore version mismatch")
+
+	flag.DurationVar(&verifyConfig.Loop.CoalesceDuration, "loop.coalesce-duration", 5*time.Second, "how long to wait for new observers before starting iteration")
+	flag.Float64Var(&verifyConfig.Loop.RateLimit, "loop.rate-limit", 0, "rate limit (default is 0 which is unlimited segments per second)")
+	flag.IntVar(&verifyConfig.Loop.ListLimit, "loop.list-limit", 2500, "how many items to query in a batch")
+
+	flag.Int64Var(&verifyConfig.ProgressPrintFrequency, "progress-frequency", 1000000, "how often should we print progress (every object)")
+
+	cmd.RunE = func(cmd *cobra.Command, args []string) error {
+		ctx, cancel := process.Ctx(cmd)
+		defer cancel()
+
+		mdb, err := 
metainfo.OpenMetabase(ctx, log.Named("mdb"), metabaseDB)
+		if err != nil {
+			return Error.Wrap(err)
+		}
+		defer func() { _ = mdb.Close() }()
+
+		versionErr := mdb.CheckVersion(ctx)
+		if versionErr != nil {
+			log.Error("versions skewed", zap.Error(versionErr))
+			if !ignoreVersionMismatch {
+				return Error.Wrap(versionErr)
+			}
+		}
+
+		chore := verify.New(log, mdb, verifyConfig)
+		if err := chore.RunOnce(ctx); err != nil {
+			return Error.Wrap(err)
+		}
+		return nil
+	}
+
+	return cmd
+}
diff --git a/cmd/metabase-verify/profile.go b/cmd/metabase-verify/profile.go
new file mode 100644
index 000000000..63edc38aa
--- /dev/null
+++ b/cmd/metabase-verify/profile.go
@@ -0,0 +1,92 @@
+// Copyright (C) 2021 Storj Labs, Inc.
+// See LICENSE for copying information.
+
+package main
+
+import (
+	"os"
+	"runtime/pprof"
+
+	"github.com/spf13/cobra"
+	"github.com/zeebo/errs"
+)
+
+// errProfile is the error class for CPU profiling failures.
+var errProfile = errs.Class("profile")
+
+// IncludeProfiling adds a persistent --cpuprofile flag to cmd and chains
+// itself into the persistent pre- and post-run hooks of cmd, so that CPU
+// profiling starts before the command runs and is stopped afterwards.
+// Hooks that were already set on cmd keep being called.
+func IncludeProfiling(cmd *cobra.Command) {
+	var path string
+	var profile *CPUProfile
+
+	flag := cmd.PersistentFlags()
+	flag.StringVar(&path, "cpuprofile", "", "write cpu profile to file")
+
+	preRunE := cmd.PersistentPreRunE
+	cmd.PersistentPreRunE = func(cmd *cobra.Command, args []string) (err error) {
+		profile, err = NewProfile(path)
+		if err != nil {
+			return err
+		}
+		if preRunE == nil {
+			return nil
+		}
+		if err = preRunE(cmd, args); err != nil {
+			// Do not leave the profile running when the chained hook fails.
+			profile.Close()
+			return err
+		}
+		return nil
+	}
+
+	postRunE := cmd.PersistentPostRunE
+	cmd.PersistentPostRunE = func(cmd *cobra.Command, args []string) (err error) {
+		// Stop the profile even when a previously set hook exists or fails;
+		// returning early here would skip stopping the profile.
+		defer profile.Close()
+		if postRunE != nil {
+			return postRunE(cmd, args)
+		}
+		return nil
+	}
+}
+
+// CPUProfile contains active profiling information.
+type CPUProfile struct{ file *os.File }
+
+// NewProfile starts a new profile on `path`.
+//
+// It returns (nil, nil) when path is empty, which means profiling is disabled.
+// When the profile cannot be started the created file is closed again and a
+// nil profile is returned together with the error.
+func NewProfile(path string) (*CPUProfile, error) {
+	if path == "" {
+		return nil, nil
+	}
+
+	f, err := os.Create(path)
+	if err != nil {
+		return nil, errProfile.New("unable to create file: %w", err)
+	}
+
+	if err := pprof.StartCPUProfile(f); err != nil {
+		return nil, errProfile.Wrap(errs.Combine(err, f.Close()))
+	}
+	return &CPUProfile{file: f}, nil
+}
+
+// Close finishes the profile and closes the underlying file.
+// It is safe to call on a nil profile and to call more than once.
+func (p *CPUProfile) Close() {
+	if p == nil || p.file == nil {
+		return
+	}
+	pprof.StopCPUProfile()
+	// StopCPUProfile returns after the profile has been written; a close error
+	// cannot be reported through this signature and is dropped.
+	_ = p.file.Close()
+	p.file = nil
+}
diff --git a/cmd/metabase-verify/verify/progress.go b/cmd/metabase-verify/verify/progress.go
new file mode 100644
index 000000000..bdd60810a
--- /dev/null
+++ b/cmd/metabase-verify/verify/progress.go
@@ -0,0 +1,73 @@
+// Copyright (C) 2021 Storj Labs, Inc.
+// See LICENSE for copying information.
+
+package verify
+
+import (
+	"context"
+	"runtime"
+
+	"go.uber.org/zap"
+
+	"storj.io/common/memory"
+	"storj.io/storj/satellite/metabase/metaloop"
+)
+
+// ProgressObserver counts and prints progress of metabase loop.
+type ProgressObserver struct {
+	// Log receives the progress and memory reports.
+	Log *zap.Logger
+
+	// ProgressPrintFrequency is the number of observed objects between two reports.
+	ProgressPrintFrequency int64
+
+	// Counters of the items observed so far.
+	ObjectCount        int64
+	RemoteSegmentCount int64
+	InlineSegmentCount int64
+}
+
+// Report logs the counters and a few runtime memory statistics at debug
+// level.
+func (progress *ProgressObserver) Report() {
+	progress.Log.Debug("progress",
+		zap.Int64("objects", progress.ObjectCount),
+		zap.Int64("remote segments", progress.RemoteSegmentCount),
+		zap.Int64("inline segments", progress.InlineSegmentCount),
+	)
+
+	var m runtime.MemStats
+	runtime.ReadMemStats(&m)
+	progress.Log.Debug("memory",
+		zap.String("Alloc", memory.Size(int64(m.Alloc)).String()),
+		zap.String("TotalAlloc", memory.Size(int64(m.TotalAlloc)).String()),
+		zap.String("Sys", memory.Size(int64(m.Sys)).String()),
+		zap.Uint32("NumGC", m.NumGC),
+	)
+}
+
+// Object implements the Observer interface.
+func (progress *ProgressObserver) Object(context.Context, *metaloop.Object) error { + progress.ObjectCount++ + if progress.ObjectCount%progress.ProgressPrintFrequency == 0 { + progress.Report() + } + return nil +} + +// RemoteSegment implements the Observer interface. +func (progress *ProgressObserver) RemoteSegment(context.Context, *metaloop.Segment) error { + progress.RemoteSegmentCount++ + return nil +} + +// InlineSegment implements the Observer interface. +func (progress *ProgressObserver) InlineSegment(context.Context, *metaloop.Segment) error { + progress.InlineSegmentCount++ + return nil +} + +// LoopStarted is called at each start of a loop. +func (progress *ProgressObserver) LoopStarted(ctx context.Context, info metaloop.LoopInfo) (err error) { + return nil +} diff --git a/cmd/metabase-verify/verify/segmentsizes.go b/cmd/metabase-verify/verify/segmentsizes.go new file mode 100644 index 000000000..f8fe29215 --- /dev/null +++ b/cmd/metabase-verify/verify/segmentsizes.go @@ -0,0 +1,78 @@ +// Copyright (C) 2021 Storj Labs, Inc. +// See LICENSE for copying information. + +package verify + +import ( + "context" + + "go.uber.org/zap" + + "storj.io/storj/satellite/metabase" + "storj.io/storj/satellite/metabase/metaloop" +) + +// SegmentSizes verifies segments table plain_offset and plain_size. +type SegmentSizes struct { + Log *zap.Logger + + segmentState +} + +type segmentState struct { + Current metabase.ObjectStream + Status metabase.ObjectStatus + + ExpectedOffset int64 +} + +// Object implements the Observer interface. +func (verify *SegmentSizes) Object(ctx context.Context, obj *metaloop.Object) error { + verify.segmentState = segmentState{ + Current: obj.ObjectStream, + Status: obj.Status, + } + return nil +} + +// LoopStarted is called at each start of a loop. +func (verify *SegmentSizes) LoopStarted(ctx context.Context, info metaloop.LoopInfo) (err error) { + return nil +} + +// RemoteSegment implements the Observer interface. 
+func (verify *SegmentSizes) RemoteSegment(ctx context.Context, seg *metaloop.Segment) error { + return verify.advanceSegment(ctx, seg) +} + +// InlineSegment implements the Observer interface. +func (verify *SegmentSizes) InlineSegment(ctx context.Context, seg *metaloop.Segment) error { + return verify.advanceSegment(ctx, seg) +} + +func (verify *SegmentSizes) advanceSegment(ctx context.Context, seg *metaloop.Segment) error { + if seg.PlainSize > seg.EncryptedSize { + verify.Log.Error("plain size larger than encrypted size", + zap.Any("object", formatObject(verify.Current)), + zap.Any("position", seg.Position), + + zap.Int32("plain size", seg.PlainSize), + zap.Int32("encrypted size", seg.EncryptedSize)) + } + + if verify.Status != metabase.Committed { + return nil + } + + if verify.ExpectedOffset != seg.PlainOffset { + verify.Log.Error("invalid offset", + zap.Any("object", formatObject(verify.Current)), + zap.Any("position", seg.Position), + + zap.Int64("expected", verify.ExpectedOffset), + zap.Int64("actual", seg.PlainOffset)) + } + verify.ExpectedOffset += int64(seg.PlainSize) + + return nil +} diff --git a/cmd/metabase-verify/verify/verify.go b/cmd/metabase-verify/verify/verify.go new file mode 100644 index 000000000..017f7657d --- /dev/null +++ b/cmd/metabase-verify/verify/verify.go @@ -0,0 +1,77 @@ +// Copyright (C) 2021 Storj Labs, Inc. +// See LICENSE for copying information. + +package verify + +import ( + "context" + "fmt" + + "github.com/zeebo/errs" + "go.uber.org/zap" + + "storj.io/common/errs2" + "storj.io/storj/satellite/metabase" + "storj.io/storj/satellite/metabase/metaloop" +) + +// Error is the default error class for the package. +var Error = errs.Class("verify") + +// Chore runs different verifications on metabase loop. +type Chore struct { + Log *zap.Logger + + Config Config + + DB metaloop.MetabaseDB +} + +// Config contains configuration for all the services. 
+type Config struct { + ProgressPrintFrequency int64 + Loop metaloop.Config +} + +// New creates new verification. +func New(log *zap.Logger, mdb metaloop.MetabaseDB, config Config) *Chore { + return &Chore{ + Log: log, + Config: config, + DB: mdb, + } +} + +// RunOnce creates a new metaloop and runs the verifications. +func (chore *Chore) RunOnce(ctx context.Context) error { + loop := metaloop.New(chore.Config.Loop, chore.DB) + + var group errs2.Group + group.Go(func() error { + progress := &ProgressObserver{ + Log: chore.Log.Named("progress"), + ProgressPrintFrequency: chore.Config.ProgressPrintFrequency, + } + + plainOffset := &SegmentSizes{ + Log: chore.Log.Named("segment-sizes"), + } + + err := loop.Join(ctx, progress, plainOffset) + if err != nil { + return Error.Wrap(err) + } + + progress.Report() + + return nil + }) + group.Go(func() error { + return Error.Wrap(loop.RunOnce(ctx)) + }) + return Error.Wrap(errs.Combine(group.Wait()...)) +} + +func formatObject(obj metabase.ObjectStream) string { + return fmt.Sprintf("project:%q bucket:%q key:%q stream:\"%x\"", obj.ProjectID, obj.BucketName, obj.ObjectKey, obj.StreamID[:]) +} diff --git a/satellite/metabase/loop.go b/satellite/metabase/loop.go index 4da751d79..c79232211 100644 --- a/satellite/metabase/loop.go +++ b/satellite/metabase/loop.go @@ -43,11 +43,12 @@ type LoopObjectsIterator interface { // LoopObjectEntry contains information about object needed by metainfo loop. type LoopObjectEntry struct { - ObjectStream // metrics, repair, tally - CreatedAt time.Time // temp used by metabase-createdat-migration - ExpiresAt *time.Time // tally - SegmentCount int32 // metrics - EncryptedMetadataSize int // tally + ObjectStream // metrics, repair, tally + Status ObjectStatus // verify + CreatedAt time.Time // temp used by metabase-createdat-migration + ExpiresAt *time.Time // tally + SegmentCount int32 // metrics + EncryptedMetadataSize int // tally } // IterateLoopObjects iterates through all objects in metabase. 
@@ -162,6 +163,7 @@ func (it *loopIterator) doNextQuery(ctx context.Context) (_ tagsql.Rows, err err SELECT project_id, bucket_name, object_key, stream_id, version, + status, created_at, expires_at, segment_count, LENGTH(COALESCE(encrypted_metadata,'')) @@ -181,6 +183,7 @@ func (it *loopIterator) scanItem(item *LoopObjectEntry) error { return it.curRows.Scan( &item.ProjectID, &item.BucketName, &item.ObjectKey, &item.StreamID, &item.Version, + &item.Status, &item.CreatedAt, &item.ExpiresAt, &item.SegmentCount, &item.EncryptedMetadataSize, @@ -205,6 +208,8 @@ type LoopSegmentEntry struct { RepairedAt *time.Time // repair RootPieceID storj.PieceID EncryptedSize int32 // size of the whole segment (not a piece) + PlainOffset int64 // verify + PlainSize int32 // verify Redundancy storj.RedundancyScheme Pieces Pieces } @@ -247,6 +252,7 @@ func (db *DB) IterateLoopStreams(ctx context.Context, opts IterateLoopStreams, h created_at, repaired_at, root_piece_id, encrypted_size, + plain_offset, plain_size, redundancy, remote_alias_pieces FROM segments @@ -291,6 +297,7 @@ func (db *DB) IterateLoopStreams(ctx context.Context, opts IterateLoopStreams, h &segment.CreatedAt, &segment.RepairedAt, &segment.RootPieceID, &segment.EncryptedSize, + &segment.PlainOffset, &segment.PlainSize, redundancyScheme{&segment.Redundancy}, &aliasPieces, ) diff --git a/satellite/metabase/loop_test.go b/satellite/metabase/loop_test.go index 60783de5f..da26b36f9 100644 --- a/satellite/metabase/loop_test.go +++ b/satellite/metabase/loop_test.go @@ -100,10 +100,12 @@ func TestIterateLoopObjects(t *testing.T) { expected := []metabase.LoopObjectEntry{ { ObjectStream: pending, + Status: metabase.Pending, CreatedAt: createdAt, }, { ObjectStream: committed, + Status: metabase.Committed, EncryptedMetadataSize: len(encryptedMetadata), CreatedAt: createdAt, }, @@ -370,6 +372,8 @@ func TestIterateLoopStreams(t *testing.T) { CreatedAt: &now, RootPieceID: storj.PieceID{1}, EncryptedSize: 1024, + PlainSize: 512, + 
PlainOffset: int64(i * 512), Pieces: metabase.Pieces{{Number: 0, StorageNode: storj.NodeID{2}}}, Redundancy: defaultTestRedundancy, } @@ -444,6 +448,7 @@ func createFullObjectsWithKeys(ctx *testcontext.Context, t *testing.T, db *metab objects[key] = metabase.LoopObjectEntry{ ObjectStream: obj, + Status: metabase.Committed, CreatedAt: time.Now(), } } @@ -454,8 +459,9 @@ func createFullObjectsWithKeys(ctx *testcontext.Context, t *testing.T, db *metab func loopObjectEntryFromRaw(m metabase.RawObject) metabase.LoopObjectEntry { return metabase.LoopObjectEntry{ ObjectStream: m.ObjectStream, + Status: metabase.Committed, + CreatedAt: m.CreatedAt, ExpiresAt: m.ExpiresAt, SegmentCount: m.SegmentCount, - CreatedAt: m.CreatedAt, } }