satellite/metainfo: remove unneeded dependencies from Loop
metainfo.Loop doesn't require buckets or pointerdb anymore.

Also:
* fix comments
* update full iterator limit to 2500

Change-Id: I6604402868f5c34079197c407f969ac8015e63c5
parent a25e35f0b0
commit 4c0ea717eb
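For orientation, the sketch below shows the constructor shape before and after this change, as reflected in the diff. It is a minimal, self-contained illustration: the LoopConfig fields and the stand-in interfaces here are placeholders, not the real satellite types.

package metaloopsketch

// Stand-in interfaces; the real types live in the satellite packages.
type (
	MetabaseDB interface{}
	PointerDB  interface{}
	BucketsDB  interface{}
)

// LoopConfig is trimmed down to what this sketch needs.
type LoopConfig struct {
	ListLimit int
	RateLimit float64
}

// Loop keeps only the metabase dependency after this change.
type Loop struct {
	config     LoopConfig
	metabaseDB MetabaseDB
}

// Before: NewLoop(config LoopConfig, db PointerDB, bucketsDB BucketsDB, metabaseDB MetabaseDB) *Loop
// After:  the pointerdb and buckets arguments are gone.
func NewLoop(config LoopConfig, metabaseDB MetabaseDB) *Loop {
	return &Loop{config: config, metabaseDB: metabaseDB}
}

Call sites in the satellite core, the garbage-collection peer, the loop benchmark, and the tests drop the two extra arguments accordingly, as the hunks below show.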
@@ -17,7 +17,6 @@ import (

 	"storj.io/common/errs2"
 	"storj.io/storj/satellite/metainfo"
-	"storj.io/storj/satellite/satellitedb"
 )

 var mon = monkit.Package()
@@ -28,7 +27,6 @@ var Error = errs.Class("metaloop-benchmark")
 // Bench benchmarks metainfo loop performance.
 type Bench struct {
 	CPUProfile string
-	Database   string
 	MetabaseDB string

 	IgnoreVersionMismatch bool
@@ -41,7 +39,6 @@ type Bench struct {
 // BindFlags adds bench flags to the the flagset.
 func (bench *Bench) BindFlags(flag *flag.FlagSet) {
 	flag.StringVar(&bench.CPUProfile, "cpuprofile", "", "write cpu profile to file")
-	flag.StringVar(&bench.Database, "database", "", "connection URL for Database")
 	flag.StringVar(&bench.MetabaseDB, "metabasedb", "", "connection URL for MetabaseDB")

 	flag.BoolVar(&bench.IgnoreVersionMismatch, "ignore-version-mismatch", false, "ignore version mismatch")
@@ -56,9 +53,6 @@ func (bench *Bench) BindFlags(flag *flag.FlagSet) {
 // VerifyFlags verifies whether the values provided are valid.
 func (bench *Bench) VerifyFlags() error {
 	var errlist errs.Group
-	if bench.Database == "" {
-		errlist.Add(errors.New("flag '--database' is not set"))
-	}
 	if bench.MetabaseDB == "" {
 		errlist.Add(errors.New("flag '--metabasedb' is not set"))
 	}
@@ -85,29 +79,18 @@ func (bench *Bench) Run(ctx context.Context, log *zap.Logger) (err error) {

 	// setup databases

-	db, err := satellitedb.Open(ctx, log.Named("db"), bench.Database, satellitedb.Options{
-		ApplicationName: "metainfo-loop-benchmark",
-	})
-	if err != nil {
-		return Error.Wrap(err)
-	}
-	defer func() { _ = db.Close() }()
-
 	mdb, err := metainfo.OpenMetabase(ctx, log.Named("mdb"), bench.MetabaseDB)
 	if err != nil {
 		return Error.Wrap(err)
 	}
 	defer func() { _ = mdb.Close() }()

-	checkDatabase := db.CheckVersion(ctx)
 	checkMetabase := mdb.CheckVersion(ctx)

-	if checkDatabase != nil || checkMetabase != nil {
-		log.Error("versions skewed",
-			zap.Any("database version", checkDatabase),
-			zap.Any("metabase version", checkMetabase))
+	if checkMetabase != nil {
+		log.Error("versions skewed", zap.Any("metabase version", checkMetabase))
 		if !bench.IgnoreVersionMismatch {
-			return errs.Combine(checkDatabase, checkMetabase)
+			return checkMetabase
 		}
 	}

@@ -116,7 +99,7 @@ func (bench *Bench) Run(ctx context.Context, log *zap.Logger) (err error) {
 	var group errs2.Group

-	// Passing PointerDB as nil, since metainfo.Loop actually doesn't need it.
-	loop := metainfo.NewLoop(bench.Loop, nil, db.Buckets(), mdb)
+	loop := metainfo.NewLoop(bench.Loop, mdb)

 	group.Go(func() error {
 		progress := &ProgressObserver{
@@ -282,8 +282,6 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB,
 		)
 		peer.Metainfo.Loop = metainfo.NewLoop(
 			config.Metainfo.Loop,
-			peer.Metainfo.Database,
-			peer.DB.Buckets(),
 			peer.Metainfo.Metabase,
 		)
 		peer.Services.Add(lifecycle.Item{
@@ -142,8 +142,6 @@ func NewGarbageCollection(log *zap.Logger, full *identity.FullIdentity, db DB,
 		// the metainfo loop will only run when the garbage collection joins (which happens every GarbageCollection.Interval)
 		peer.Metainfo.Loop = metainfo.NewLoop(
 			config.Metainfo.Loop,
-			peer.Metainfo.Database,
-			peer.DB.Buckets(),
 			metabaseDB,
 		)
 		peer.Services.Add(lifecycle.Item{
@@ -166,18 +166,14 @@ type LoopConfig struct {
 // architecture: Service
 type Loop struct {
 	config     LoopConfig
-	db         PointerDB
-	bucketsDB  BucketsDB
 	metabaseDB MetabaseDB
 	join       chan []*observerContext
 	done       chan struct{}
 }

 // NewLoop creates a new metainfo loop service.
-func NewLoop(config LoopConfig, db PointerDB, bucketsDB BucketsDB, metabaseDB MetabaseDB) *Loop {
+func NewLoop(config LoopConfig, metabaseDB MetabaseDB) *Loop {
 	return &Loop{
-		db:         db,
-		bucketsDB:  bucketsDB,
 		metabaseDB: metabaseDB,
 		config:     config,
 		join:       make(chan []*observerContext),
@@ -262,18 +258,18 @@ waitformore:
 			return ctx.Err()
 		}
 	}
-	return iterateDatabase(ctx, loop.db, loop.bucketsDB, loop.metabaseDB, observers, loop.config.ListLimit, rate.NewLimiter(rate.Limit(loop.config.RateLimit), 1))
+	return iterateDatabase(ctx, loop.metabaseDB, observers, loop.config.ListLimit, rate.NewLimiter(rate.Limit(loop.config.RateLimit), 1))
 }

 // IterateDatabase iterates over PointerDB and notifies specified observers about results.
 //
 // It uses 10000 as the lookup limit for iterating.
-func IterateDatabase(ctx context.Context, rateLimit float64, db PointerDB, bucketsDB BucketsDB, metabaseDB MetabaseDB, observers ...Observer) error {
+func IterateDatabase(ctx context.Context, rateLimit float64, metabaseDB MetabaseDB, observers ...Observer) error {
 	obsContexts := make([]*observerContext, len(observers))
 	for i, observer := range observers {
 		obsContexts[i] = newObserverContext(ctx, observer)
 	}
-	return iterateDatabase(ctx, db, bucketsDB, metabaseDB, obsContexts, 10000, rate.NewLimiter(rate.Limit(rateLimit), 1))
+	return iterateDatabase(ctx, metabaseDB, obsContexts, 10000, rate.NewLimiter(rate.Limit(rateLimit), 1))
 }

 // Wait waits for run to be finished.
@@ -282,7 +278,7 @@ func (loop *Loop) Wait() {
 	<-loop.done
 }

-func iterateDatabase(ctx context.Context, db PointerDB, bucketsDB BucketsDB, metabaseDB MetabaseDB, observers []*observerContext, limit int, rateLimiter *rate.Limiter) (err error) {
+func iterateDatabase(ctx context.Context, metabaseDB MetabaseDB, observers []*observerContext, limit int, rateLimiter *rate.Limiter) (err error) {
 	defer func() {
 		if err != nil {
 			for _, observer := range observers {
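The signatures above still take a limiter built with rate.NewLimiter(rate.Limit(rateLimit), 1). The following sketch shows that rate-limited fan-out pattern in isolation; Entry and Observer are simplified stand-ins (the real loop observers have separate methods for objects, remote segments, and inline segments), so this illustrates the technique rather than the satellite implementation.

package metaloopsketch

import (
	"context"

	"golang.org/x/time/rate"
)

// Entry is a simplified stand-in for a metabase object entry.
type Entry struct {
	Key string
}

// Observer is a simplified stand-in for the loop observer interface.
type Observer interface {
	Object(ctx context.Context, entry *Entry) error
}

// iterateEntries waits on the limiter before each batch and then hands every
// entry in the batch to every observer, stopping on the first error.
func iterateEntries(ctx context.Context, batches [][]Entry, observers []Observer, limiter *rate.Limiter) error {
	for _, batch := range batches {
		// One token per batch, mirroring rate.NewLimiter(rate.Limit(rateLimit), 1).
		if err := limiter.Wait(ctx); err != nil {
			return err
		}
		for i := range batch {
			for _, obs := range observers {
				if err := obs.Object(ctx, &batch[i]); err != nil {
					return err
				}
			}
		}
	}
	return nil
}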
@@ -260,7 +260,7 @@ func TestLoopCancel(t *testing.T) {
 		metaLoop := metainfo.NewLoop(metainfo.LoopConfig{
 			CoalesceDuration: 1 * time.Second,
 			ListLimit:        10000,
-		}, satellite.Metainfo.Database, satellite.DB.Buckets(), satellite.Metainfo.Metabase)
+		}, satellite.Metainfo.Metabase)

 		// create a cancelable context to pass into metaLoop.Run
 		loopCtx, cancel := context.WithCancel(ctx)
@@ -14,6 +14,8 @@ import (
 	"storj.io/storj/private/tagsql"
 )

+const fullIteratorBatchSizeLimit = 2500
+
 // FullObjectEntry contains information about and object in metabase.
 type FullObjectEntry struct {
 	ObjectStream
@@ -76,8 +78,8 @@ func (db *DB) FullIterateObjects(ctx context.Context, opts FullIterateObjects, f
 	}

 	// ensure batch size is reasonable
-	if it.batchSize <= 0 || it.batchSize > batchsizeLimit {
-		it.batchSize = batchsizeLimit
+	if it.batchSize <= 0 || it.batchSize > fullIteratorBatchSizeLimit {
+		it.batchSize = fullIteratorBatchSizeLimit
 	}

 	it.curRows, err = it.doNextQuery(ctx)
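The batch-size handling above is a straightforward clamp against the new 2500 limit. A self-contained sketch of that logic follows; the constant name comes from the diff, but the helper function is only illustrative, since the real code clamps the iterator field inline.

package metaloopsketch

const fullIteratorBatchSizeLimit = 2500

// clampBatchSize returns the limit for non-positive or oversized requests,
// otherwise the requested size, mirroring the check in FullIterateObjects.
func clampBatchSize(requested int) int {
	if requested <= 0 || requested > fullIteratorBatchSizeLimit {
		return fullIteratorBatchSizeLimit
	}
	return requested
}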
@@ -476,7 +476,7 @@ func (obs *checkerObserver) InlineSegment(ctx context.Context, segment *metainfo
 	// TODO: check for expired segments

 	if !obs.objectCounted {
-		// Note: this may give may give false stats when an object starts with a inline segment.
+		// Note: this may give false stats when an object starts with a inline segment.
 		obs.objectCounted = true
 		stats := obs.getStatsByRS(storj.RedundancyScheme{})
 		stats.iterationAggregates.objectsChecked++