diff --git a/cmd/metainfo-loop-benchmark/bench.go b/cmd/metainfo-loop-benchmark/bench.go index 2bcc1f828..285644767 100644 --- a/cmd/metainfo-loop-benchmark/bench.go +++ b/cmd/metainfo-loop-benchmark/bench.go @@ -17,7 +17,6 @@ import ( "storj.io/common/errs2" "storj.io/storj/satellite/metainfo" - "storj.io/storj/satellite/satellitedb" ) var mon = monkit.Package() @@ -28,7 +27,6 @@ var Error = errs.Class("metaloop-benchmark") // Bench benchmarks metainfo loop performance. type Bench struct { CPUProfile string - Database string MetabaseDB string IgnoreVersionMismatch bool @@ -41,7 +39,6 @@ type Bench struct { // BindFlags adds bench flags to the the flagset. func (bench *Bench) BindFlags(flag *flag.FlagSet) { flag.StringVar(&bench.CPUProfile, "cpuprofile", "", "write cpu profile to file") - flag.StringVar(&bench.Database, "database", "", "connection URL for Database") flag.StringVar(&bench.MetabaseDB, "metabasedb", "", "connection URL for MetabaseDB") flag.BoolVar(&bench.IgnoreVersionMismatch, "ignore-version-mismatch", false, "ignore version mismatch") @@ -56,9 +53,6 @@ func (bench *Bench) BindFlags(flag *flag.FlagSet) { // VerifyFlags verifies whether the values provided are valid. func (bench *Bench) VerifyFlags() error { var errlist errs.Group - if bench.Database == "" { - errlist.Add(errors.New("flag '--database' is not set")) - } if bench.MetabaseDB == "" { errlist.Add(errors.New("flag '--metabasedb' is not set")) } @@ -85,29 +79,18 @@ func (bench *Bench) Run(ctx context.Context, log *zap.Logger) (err error) { // setup databases - db, err := satellitedb.Open(ctx, log.Named("db"), bench.Database, satellitedb.Options{ - ApplicationName: "metainfo-loop-benchmark", - }) - if err != nil { - return Error.Wrap(err) - } - defer func() { _ = db.Close() }() - mdb, err := metainfo.OpenMetabase(ctx, log.Named("mdb"), bench.MetabaseDB) if err != nil { return Error.Wrap(err) } defer func() { _ = mdb.Close() }() - checkDatabase := db.CheckVersion(ctx) checkMetabase := mdb.CheckVersion(ctx) - if checkDatabase != nil || checkMetabase != nil { - log.Error("versions skewed", - zap.Any("database version", checkDatabase), - zap.Any("metabase version", checkMetabase)) + if checkMetabase != nil { + log.Error("versions skewed", zap.Any("metabase version", checkMetabase)) if !bench.IgnoreVersionMismatch { - return errs.Combine(checkDatabase, checkMetabase) + return checkMetabase } } @@ -116,7 +99,7 @@ func (bench *Bench) Run(ctx context.Context, log *zap.Logger) (err error) { var group errs2.Group // Passing PointerDB as nil, since metainfo.Loop actually doesn't need it. - loop := metainfo.NewLoop(bench.Loop, nil, db.Buckets(), mdb) + loop := metainfo.NewLoop(bench.Loop, mdb) group.Go(func() error { progress := &ProgressObserver{ diff --git a/satellite/core.go b/satellite/core.go index 387b973e3..351835f59 100644 --- a/satellite/core.go +++ b/satellite/core.go @@ -282,8 +282,6 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, ) peer.Metainfo.Loop = metainfo.NewLoop( config.Metainfo.Loop, - peer.Metainfo.Database, - peer.DB.Buckets(), peer.Metainfo.Metabase, ) peer.Services.Add(lifecycle.Item{ diff --git a/satellite/gc.go b/satellite/gc.go index 52ff697eb..8d04a16f4 100644 --- a/satellite/gc.go +++ b/satellite/gc.go @@ -142,8 +142,6 @@ func NewGarbageCollection(log *zap.Logger, full *identity.FullIdentity, db DB, // the metainfo loop will only run when the garbage collection joins (which happens every GarbageCollection.Interval) peer.Metainfo.Loop = metainfo.NewLoop( config.Metainfo.Loop, - peer.Metainfo.Database, - peer.DB.Buckets(), metabaseDB, ) peer.Services.Add(lifecycle.Item{ diff --git a/satellite/metainfo/loop.go b/satellite/metainfo/loop.go index 8b654986c..96917ddb8 100644 --- a/satellite/metainfo/loop.go +++ b/satellite/metainfo/loop.go @@ -166,18 +166,14 @@ type LoopConfig struct { // architecture: Service type Loop struct { config LoopConfig - db PointerDB - bucketsDB BucketsDB metabaseDB MetabaseDB join chan []*observerContext done chan struct{} } // NewLoop creates a new metainfo loop service. -func NewLoop(config LoopConfig, db PointerDB, bucketsDB BucketsDB, metabaseDB MetabaseDB) *Loop { +func NewLoop(config LoopConfig, metabaseDB MetabaseDB) *Loop { return &Loop{ - db: db, - bucketsDB: bucketsDB, metabaseDB: metabaseDB, config: config, join: make(chan []*observerContext), @@ -262,18 +258,18 @@ waitformore: return ctx.Err() } } - return iterateDatabase(ctx, loop.db, loop.bucketsDB, loop.metabaseDB, observers, loop.config.ListLimit, rate.NewLimiter(rate.Limit(loop.config.RateLimit), 1)) + return iterateDatabase(ctx, loop.metabaseDB, observers, loop.config.ListLimit, rate.NewLimiter(rate.Limit(loop.config.RateLimit), 1)) } // IterateDatabase iterates over PointerDB and notifies specified observers about results. // // It uses 10000 as the lookup limit for iterating. -func IterateDatabase(ctx context.Context, rateLimit float64, db PointerDB, bucketsDB BucketsDB, metabaseDB MetabaseDB, observers ...Observer) error { +func IterateDatabase(ctx context.Context, rateLimit float64, metabaseDB MetabaseDB, observers ...Observer) error { obsContexts := make([]*observerContext, len(observers)) for i, observer := range observers { obsContexts[i] = newObserverContext(ctx, observer) } - return iterateDatabase(ctx, db, bucketsDB, metabaseDB, obsContexts, 10000, rate.NewLimiter(rate.Limit(rateLimit), 1)) + return iterateDatabase(ctx, metabaseDB, obsContexts, 10000, rate.NewLimiter(rate.Limit(rateLimit), 1)) } // Wait waits for run to be finished. @@ -282,7 +278,7 @@ func (loop *Loop) Wait() { <-loop.done } -func iterateDatabase(ctx context.Context, db PointerDB, bucketsDB BucketsDB, metabaseDB MetabaseDB, observers []*observerContext, limit int, rateLimiter *rate.Limiter) (err error) { +func iterateDatabase(ctx context.Context, metabaseDB MetabaseDB, observers []*observerContext, limit int, rateLimiter *rate.Limiter) (err error) { defer func() { if err != nil { for _, observer := range observers { diff --git a/satellite/metainfo/loop_test.go b/satellite/metainfo/loop_test.go index 3537643c2..43eb0e8ef 100644 --- a/satellite/metainfo/loop_test.go +++ b/satellite/metainfo/loop_test.go @@ -260,7 +260,7 @@ func TestLoopCancel(t *testing.T) { metaLoop := metainfo.NewLoop(metainfo.LoopConfig{ CoalesceDuration: 1 * time.Second, ListLimit: 10000, - }, satellite.Metainfo.Database, satellite.DB.Buckets(), satellite.Metainfo.Metabase) + }, satellite.Metainfo.Metabase) // create a cancelable context to pass into metaLoop.Run loopCtx, cancel := context.WithCancel(ctx) diff --git a/satellite/metainfo/metabase/fulliterator.go b/satellite/metainfo/metabase/fulliterator.go index ae88e1b78..86c0ffd1c 100644 --- a/satellite/metainfo/metabase/fulliterator.go +++ b/satellite/metainfo/metabase/fulliterator.go @@ -14,6 +14,8 @@ import ( "storj.io/storj/private/tagsql" ) +const fullIteratorBatchSizeLimit = 2500 + // FullObjectEntry contains information about and object in metabase. type FullObjectEntry struct { ObjectStream @@ -76,8 +78,8 @@ func (db *DB) FullIterateObjects(ctx context.Context, opts FullIterateObjects, f } // ensure batch size is reasonable - if it.batchSize <= 0 || it.batchSize > batchsizeLimit { - it.batchSize = batchsizeLimit + if it.batchSize <= 0 || it.batchSize > fullIteratorBatchSizeLimit { + it.batchSize = fullIteratorBatchSizeLimit } it.curRows, err = it.doNextQuery(ctx) diff --git a/satellite/repair/checker/checker.go b/satellite/repair/checker/checker.go index 6ae06e1cd..b4c788edd 100644 --- a/satellite/repair/checker/checker.go +++ b/satellite/repair/checker/checker.go @@ -476,7 +476,7 @@ func (obs *checkerObserver) InlineSegment(ctx context.Context, segment *metainfo // TODO: check for expired segments if !obs.objectCounted { - // Note: this may give may give false stats when an object starts with a inline segment. + // Note: this may give false stats when an object starts with a inline segment. obs.objectCounted = true stats := obs.getStatsByRS(storj.RedundancyScheme{}) stats.iterationAggregates.objectsChecked++