From a25e35f0b0886e1c84cdb30bd56ffcf8f2c1c2bc Mon Sep 17 00:00:00 2001 From: Egon Elbre Date: Wed, 17 Feb 2021 12:46:44 +0200 Subject: [PATCH] cmd/metainfo-loop-benchmark: add benchmark Change-Id: I0745cfcf9f8c9d73fd025dcba6ee8a7480273fe2 --- cmd/metainfo-loop-benchmark/bench.go | 183 +++++++++++++++++++++++++++ cmd/metainfo-loop-benchmark/main.go | 56 ++++++++ satellite/metainfo/config.go | 4 + satellite/metainfo/loop.go | 8 +- satellite/metainfo/metabase/db.go | 8 +- 5 files changed, 254 insertions(+), 5 deletions(-) create mode 100644 cmd/metainfo-loop-benchmark/bench.go create mode 100644 cmd/metainfo-loop-benchmark/main.go diff --git a/cmd/metainfo-loop-benchmark/bench.go b/cmd/metainfo-loop-benchmark/bench.go new file mode 100644 index 000000000..2bcc1f828 --- /dev/null +++ b/cmd/metainfo-loop-benchmark/bench.go @@ -0,0 +1,183 @@ +// Copyright (C) 2021 Storj Labs, Inc. +// See LICENSE for copying information. + +package main + +import ( + "context" + "errors" + "flag" + "os" + "runtime/pprof" + "time" + + "github.com/spacemonkeygo/monkit/v3" + "github.com/zeebo/errs" + "go.uber.org/zap" + + "storj.io/common/errs2" + "storj.io/storj/satellite/metainfo" + "storj.io/storj/satellite/satellitedb" +) + +var mon = monkit.Package() + +// Error is the default error class for the package. +var Error = errs.Class("metaloop-benchmark") + +// Bench benchmarks metainfo loop performance. +type Bench struct { + CPUProfile string + Database string + MetabaseDB string + + IgnoreVersionMismatch bool + + ProgressPrintFrequency int64 + + Loop metainfo.LoopConfig +} + +// BindFlags adds bench flags to the flagset.
+func (bench *Bench) BindFlags(flag *flag.FlagSet) {
+	// NOTE: inside this method "flag" names the flagset argument, which
+	// shadows the standard library package of the same name.
+	const (
+		defaultProgressFrequency = 1000000
+		defaultCoalesceDuration  = 5 * time.Second
+		defaultListLimit         = 2500
+	)
+
+	// Profiling output and database connections.
+	flag.StringVar(&bench.CPUProfile, "cpuprofile", "", "write cpu profile to file")
+	flag.StringVar(&bench.Database, "database", "", "connection URL for Database")
+	flag.StringVar(&bench.MetabaseDB, "metabasedb", "", "connection URL for MetabaseDB")
+
+	// Benchmark behaviour.
+	flag.BoolVar(&bench.IgnoreVersionMismatch, "ignore-version-mismatch", false, "ignore version mismatch")
+	flag.Int64Var(&bench.ProgressPrintFrequency, "progress.frequency", defaultProgressFrequency, "how often should we print progress (every object)")
+
+	// Metainfo loop tuning.
+	loop := &bench.Loop
+	flag.DurationVar(&loop.CoalesceDuration, "loop.coalesce-duration", defaultCoalesceDuration, "how long to wait for new observers before starting iteration")
+	flag.Float64Var(&loop.RateLimit, "loop.rate-limit", 0, "rate limit (default is 0 which is unlimited segments per second)")
+	flag.IntVar(&loop.ListLimit, "loop.list-limit", defaultListLimit, "how many items to query in a batch")
+}
+
+// VerifyFlags checks that every required flag was given a value.
+//
+// Both --database and --metabasedb are required. Each one that is still
+// empty contributes one entry to the returned error group.
+func (bench *Bench) VerifyFlags() error {
+	required := []struct {
+		value   string
+		problem string
+	}{
+		{value: bench.Database, problem: "flag '--database' is not set"},
+		{value: bench.MetabaseDB, problem: "flag '--metabasedb' is not set"},
+	}
+
+	var missing errs.Group
+	for _, entry := range required {
+		if entry.value == "" {
+			missing.Add(errors.New(entry.problem))
+		}
+	}
+	return missing.Err()
+}
+
+// Run runs the benchmark.
+//
+// When CPUProfile is set, a CPU profile is written to that file for the
+// duration of the run. Run opens the satellite database and the metabase,
+// checks both versions and then makes a single pass of the metainfo loop
+// with a ProgressObserver joined to it.
+//
+// A failed version check is always logged; it aborts the run unless
+// IgnoreVersionMismatch is set, in which case the benchmark continues.
+func (bench *Bench) Run(ctx context.Context, log *zap.Logger) (err error) {
+	defer mon.Task()(&ctx)(&err)
+
+	// setup profiling
+
+	if bench.CPUProfile != "" {
+		// Use distinct names here: shadowing the named result err would hide
+		// it from the deferred close below.
+		profile, createErr := os.Create(bench.CPUProfile)
+		if createErr != nil {
+			return Error.Wrap(createErr)
+		}
+		if startErr := pprof.StartCPUProfile(profile); startErr != nil {
+			// Profiling never started, so nothing else would release the file.
+			return Error.Wrap(errs.Combine(startErr, profile.Close()))
+		}
+		defer func() {
+			// StopCPUProfile returns once the profile is fully written; only
+			// then is it safe to close the file.
+			pprof.StopCPUProfile()
+			err = errs.Combine(err, Error.Wrap(profile.Close()))
+		}()
+	}
+
+	// setup databases
+
+	db, err := satellitedb.Open(ctx, log.Named("db"), bench.Database, satellitedb.Options{
+		ApplicationName: "metainfo-loop-benchmark",
+	})
+	if err != nil {
+		return Error.Wrap(err)
+	}
+	defer func() { _ = db.Close() }()
+
+	mdb, err := metainfo.OpenMetabase(ctx, log.Named("mdb"), bench.MetabaseDB)
+	if err != nil {
+		return Error.Wrap(err)
+	}
+	defer func() { _ = mdb.Close() }()
+
+	checkDatabase := db.CheckVersion(ctx)
+	checkMetabase := mdb.CheckVersion(ctx)
+
+	if checkDatabase != nil || checkMetabase != nil {
+		log.Error("versions skewed",
+			zap.Any("database version", checkDatabase),
+			zap.Any("metabase version", checkMetabase))
+		if !bench.IgnoreVersionMismatch {
+			return errs.Combine(checkDatabase, checkMetabase)
+		}
+	}
+
+	// setup metainfo loop
+
+	var group errs2.Group
+
+	// Passing PointerDB as nil, since metainfo.Loop actually doesn't need it.
+	loop := metainfo.NewLoop(bench.Loop, nil, db.Buckets(), mdb)
+
+	group.Go(func() error {
+		progress := &ProgressObserver{
+			Log:                    log.Named("progress"),
+			ProgressPrintFrequency: bench.ProgressPrintFrequency,
+		}
+		err := loop.Join(ctx, progress)
+		// Emit the final totals once Join has returned.
+		progress.Report()
+		return Error.Wrap(err)
+	})
+
+	group.Go(func() error {
+		err := loop.RunOnce(ctx)
+		return Error.Wrap(err)
+	})
+
+	// wait for loop to finish
+	if allErrors := group.Wait(); len(allErrors) > 0 {
+		return Error.Wrap(errs.Combine(allErrors...))
+	}
+
+	return nil
+}
+
+// ProgressObserver counts and prints progress of metainfo loop.
+type ProgressObserver struct {
+	// Log receives the progress reports.
+	Log *zap.Logger
+
+	// ProgressPrintFrequency is the number of objects between two periodic
+	// reports. A value of zero or less disables periodic reports.
+	ProgressPrintFrequency int64
+
+	// Running totals, incremented by the matching observer callbacks.
+	ObjectCount        int64
+	RemoteSegmentCount int64
+	InlineSegmentCount int64
+}
+
+// Report reports the current progress.
+//
+// The three running totals are logged at debug level.
+func (progress *ProgressObserver) Report() {
+	progress.Log.Debug("progress",
+		zap.Int64("objects", progress.ObjectCount),
+		zap.Int64("remote segments", progress.RemoteSegmentCount),
+		zap.Int64("inline segments", progress.InlineSegmentCount),
+	)
+}
+
+// Object implements the Observer interface.
+//
+// It counts the object and calls Report every ProgressPrintFrequency objects.
+// A non-positive frequency is never used as a divisor: a zero value would
+// otherwise panic with an integer divide by zero on the first object.
+func (progress *ProgressObserver) Object(context.Context, *metainfo.Object) error {
+	progress.ObjectCount++
+	if every := progress.ProgressPrintFrequency; every > 0 && progress.ObjectCount%every == 0 {
+		progress.Report()
+	}
+	return nil
+}
+
+// RemoteSegment implements the Observer interface.
+//
+// It only counts the segment.
+func (progress *ProgressObserver) RemoteSegment(context.Context, *metainfo.Segment) error {
+	progress.RemoteSegmentCount++
+	return nil
+}
+
+// InlineSegment implements the Observer interface.
+//
+// It only counts the segment.
+func (progress *ProgressObserver) InlineSegment(context.Context, *metainfo.Segment) error {
+	progress.InlineSegmentCount++
+	return nil
+}
diff --git a/cmd/metainfo-loop-benchmark/main.go b/cmd/metainfo-loop-benchmark/main.go
new file mode 100644
index 000000000..da16f0436
--- /dev/null
+++ b/cmd/metainfo-loop-benchmark/main.go
@@ -0,0 +1,56 @@
+// Copyright (C) 2021 Storj Labs, Inc.
+// See LICENSE for copying information.
+
+package main
+
+import (
+	"context"
+	"flag"
+	"fmt"
+	"os"
+
+	"go.uber.org/zap"
+	"go.uber.org/zap/zapcore"
+)
+
+// main binds and verifies the benchmark flags, builds a console logger that
+// writes to stdout at debug level, and runs the benchmark once. Any failure
+// is printed to stderr and the process exits with status 1.
+func main() {
+	// fail prints err to stderr and terminates the process with status 1.
+	fail := func(err error) {
+		fmt.Fprintln(os.Stderr, err)
+		os.Exit(1)
+	}
+
+	var bench Bench
+	bench.BindFlags(flag.CommandLine)
+	flag.Parse()
+	if err := bench.VerifyFlags(); err != nil {
+		fail(err)
+	}
+
+	encoder := zapcore.EncoderConfig{
+		LevelKey:       "L",
+		NameKey:        "N",
+		TimeKey:        "T",
+		CallerKey:      "C",
+		MessageKey:     "M",
+		StacktraceKey:  "S",
+		LineEnding:     zapcore.DefaultLineEnding,
+		EncodeLevel:    zapcore.CapitalLevelEncoder,
+		EncodeTime:     zapcore.ISO8601TimeEncoder,
+		EncodeDuration: zapcore.StringDurationEncoder,
+		EncodeCaller:   zapcore.ShortCallerEncoder,
+	}
+	logConfig := zap.Config{
+		Encoding:         "console",
+		Level:            zap.NewAtomicLevelAt(zapcore.DebugLevel),
+		OutputPaths:      []string{"stdout"},
+		ErrorOutputPaths: []string{"stdout"},
+		EncoderConfig:    encoder,
+	}
+
+	log, err := logConfig.Build()
+	if err != nil {
+		fail(err)
+	}
+
+	err = bench.Run(context.Background(), log)
+	if err == nil {
+		return
+	}
+
+	// Sync the logger before the error is printed and the process exits.
+	_ = log.Sync()
+	fail(err)
+}
diff --git a/satellite/metainfo/config.go b/satellite/metainfo/config.go index f236be63d..c0e3c0477 100644 --- a/satellite/metainfo/config.go +++ b/satellite/metainfo/config.go @@ -167,8 +167,12 @@ func OpenStore(ctx context.Context, logger *zap.Logger, dbURLString string, app // MetabaseDB stores objects and segments. type MetabaseDB interface { io.Closer + // MigrateToLatest migrates to latest schema version. MigrateToLatest(ctx context.Context) error + // CheckVersion checks the database is the correct version + CheckVersion(ctx context.Context) error + // DeleteObjectAnyStatusAllVersions deletes all object versions. DeleteObjectAnyStatusAllVersions(ctx context.Context, opts metabase.DeleteObjectAnyStatusAllVersions) (result metabase.DeleteObjectResult, err error) // DeleteObjectsAllVersions deletes all versions of multiple objects from the same bucket.
diff --git a/satellite/metainfo/loop.go b/satellite/metainfo/loop.go index 0aa55ccdc..8b654986c 100644 --- a/satellite/metainfo/loop.go +++ b/satellite/metainfo/loop.go @@ -219,7 +219,7 @@ func (loop *Loop) Run(ctx context.Context) (err error) { defer mon.Task()(&ctx)(&err) for { - err := loop.runOnce(ctx) + err := loop.RunOnce(ctx) if err != nil { return err } @@ -232,8 +232,10 @@ func (loop *Loop) Close() (err error) { return nil } -// runOnce goes through metainfo one time and sends information to observers. -func (loop *Loop) runOnce(ctx context.Context) (err error) { +// RunOnce goes through metainfo one time and sends information to observers. +// +// It is not safe to call this concurrently with Run. +func (loop *Loop) RunOnce(ctx context.Context) (err error) { defer mon.Task()(&ctx)(&err) var observers []*observerContext diff --git a/satellite/metainfo/metabase/db.go b/satellite/metainfo/metabase/db.go index ecc9dab21..c7089b78e 100644 --- a/satellite/metainfo/metabase/db.go +++ b/satellite/metainfo/metabase/db.go @@ -80,13 +80,17 @@ func (db *DB) DestroyTables(ctx context.Context) error { } // MigrateToLatest migrates database to the latest version. -// -// TODO: use migrate package. func (db *DB) MigrateToLatest(ctx context.Context) error { migration := db.PostgresMigration() return migration.Run(ctx, db.log.Named("migrate")) } +// CheckVersion checks the database is the correct version. +func (db *DB) CheckVersion(ctx context.Context) error { + migration := db.PostgresMigration() + return migration.ValidateVersions(ctx, db.log) +} + // PostgresMigration returns steps needed for migrating postgres database. func (db *DB) PostgresMigration() *migrate.Migration { // TODO: merge this with satellite migration code or a way to keep them in sync.