From 4469d229f8f8ec207cbef03ea39281ebef57e47b Mon Sep 17 00:00:00 2001 From: Egon Elbre Date: Wed, 2 Jun 2021 15:04:40 +0300 Subject: [PATCH] satellite/metabase/{meta,segment}loop: avoid passing config Currently the iterate is being called in only one location so there's no benefit in passing them as arguments over using the receiver. Change-Id: I433a5d8b795b1bcc1f1e9320d87b10820cf537f1 --- satellite/metabase/metaloop/service.go | 21 ++++++++++++--------- satellite/metabase/segmentloop/service.go | 14 ++++++++------ 2 files changed, 20 insertions(+), 15 deletions(-) diff --git a/satellite/metabase/metaloop/service.go b/satellite/metabase/metaloop/service.go index f7dcd0e15..cda723bfc 100644 --- a/satellite/metabase/metaloop/service.go +++ b/satellite/metabase/metaloop/service.go @@ -334,7 +334,7 @@ waitformore: } close(earlyExitDone) - return iterateDatabase(ctx, loop.metabaseDB, observers, loop.config.ListLimit, rate.NewLimiter(rate.Limit(loop.config.RateLimit), 1), loop.config.MaxAsOfSystemDuration) + return loop.iterateDatabase(ctx, observers) } func stopTimer(t *time.Timer) { @@ -354,7 +354,7 @@ func (loop *Service) Wait() { var errNoObservers = errs.New("no observers") -func iterateDatabase(ctx context.Context, metabaseDB MetabaseDB, observers []*observerContext, limit int, rateLimiter *rate.Limiter, maxAsOfSystemDuration time.Duration) (err error) { +func (loop *Service) iterateDatabase(ctx context.Context, observers []*observerContext) (err error) { defer func() { if err != nil { errorObservers(observers, err) @@ -363,7 +363,7 @@ func iterateDatabase(ctx context.Context, metabaseDB MetabaseDB, observers []*ob finishObservers(observers) }() - observers, err = iterateObjects(ctx, metabaseDB, observers, limit, rateLimiter, maxAsOfSystemDuration) + observers, err = loop.iterateObjects(ctx, observers) if errors.Is(err, errNoObservers) { return nil } @@ -374,14 +374,17 @@ func iterateDatabase(ctx context.Context, metabaseDB MetabaseDB, observers []*ob return nil } -func iterateObjects(ctx context.Context, metabaseDB MetabaseDB, observers []*observerContext, limit int, rateLimiter *rate.Limiter, maxAsOfSystemDuration time.Duration) (_ []*observerContext, err error) { +func (loop *Service) iterateObjects(ctx context.Context, observers []*observerContext) (_ []*observerContext, err error) { defer mon.Task()(&ctx)(&err) + limit := loop.config.ListLimit if limit <= 0 || limit > batchsizeLimit { limit = batchsizeLimit } - startingTime, err := metabaseDB.Now(ctx) + rateLimiter := rate.NewLimiter(rate.Limit(loop.config.RateLimit), 1) + + startingTime, err := loop.metabaseDB.Now(ctx) if err != nil { return observers, Error.Wrap(err) } @@ -410,11 +413,11 @@ func iterateObjects(ctx context.Context, metabaseDB MetabaseDB, observers []*obs return nil } - if time.Since(currentAsOfSystemTime) > maxAsOfSystemDuration { - currentAsOfSystemTime = time.Now().Add(-maxAsOfSystemDuration) + if time.Since(currentAsOfSystemTime) > loop.config.MaxAsOfSystemDuration { + currentAsOfSystemTime = time.Now().Add(-loop.config.MaxAsOfSystemDuration) } - err = metabaseDB.IterateLoopStreams(ctx, metabase.IterateLoopStreams{ + err = loop.metabaseDB.IterateLoopStreams(ctx, metabase.IterateLoopStreams{ StreamIDs: ids, AsOfSystemTime: currentAsOfSystemTime, }, func(ctx context.Context, streamID uuid.UUID, next metabase.SegmentIterator) (err error) { @@ -487,7 +490,7 @@ func iterateObjects(ctx context.Context, metabaseDB MetabaseDB, observers []*obs var objectsIterated int64 segmentsInBatch := int32(0) - err = metabaseDB.IterateLoopObjects(ctx, metabase.IterateLoopObjects{ + err = loop.metabaseDB.IterateLoopObjects(ctx, metabase.IterateLoopObjects{ BatchSize: limit, AsOfSystemTime: startingTime, }, func(ctx context.Context, it metabase.LoopObjectsIterator) (err error) { diff --git a/satellite/metabase/segmentloop/service.go b/satellite/metabase/segmentloop/service.go index edf68c4fb..cff950208 100644 --- a/satellite/metabase/segmentloop/service.go +++ b/satellite/metabase/segmentloop/service.go @@ -298,7 +298,7 @@ waitformore: } close(earlyExitDone) - return iterateDatabase(ctx, loop.metabaseDB, observers, loop.config.ListLimit, rate.NewLimiter(rate.Limit(loop.config.RateLimit), 1)) + return loop.iterateDatabase(ctx, observers) } func stopTimer(t *time.Timer) { @@ -318,7 +318,7 @@ func (loop *Service) Wait() { var errNoObservers = errs.New("no observers") -func iterateDatabase(ctx context.Context, metabaseDB MetabaseDB, observers []*observerContext, limit int, rateLimiter *rate.Limiter) (err error) { +func (loop *Service) iterateDatabase(ctx context.Context, observers []*observerContext) (err error) { defer func() { if err != nil { errorObservers(observers, err) @@ -327,7 +327,7 @@ func iterateDatabase(ctx context.Context, metabaseDB MetabaseDB, observers []*ob finishObservers(observers) }() - observers, err = iterateSegments(ctx, metabaseDB, observers, limit, rateLimiter) + observers, err = loop.iterateSegments(ctx, observers) if errors.Is(err, errNoObservers) { return nil } @@ -338,14 +338,16 @@ func iterateDatabase(ctx context.Context, metabaseDB MetabaseDB, observers []*ob return err } -func iterateSegments(ctx context.Context, metabaseDB MetabaseDB, observers []*observerContext, limit int, rateLimiter *rate.Limiter) (_ []*observerContext, err error) { +func (loop *Service) iterateSegments(ctx context.Context, observers []*observerContext) (_ []*observerContext, err error) { defer mon.Task()(&ctx)(&err) + rateLimiter := rate.NewLimiter(rate.Limit(loop.config.RateLimit), 1) + limit := loop.config.ListLimit if limit <= 0 || limit > batchsizeLimit { limit = batchsizeLimit } - startingTime, err := metabaseDB.Now(ctx) + startingTime, err := loop.metabaseDB.Now(ctx) if err != nil { return observers, Error.Wrap(err) } @@ -361,7 +363,7 @@ func iterateSegments(ctx context.Context, metabaseDB MetabaseDB, observers []*ob var segmentsProcessed int64 - err = metabaseDB.IterateLoopSegments(ctx, metabase.IterateLoopSegments{ + err = loop.metabaseDB.IterateLoopSegments(ctx, metabase.IterateLoopSegments{ BatchSize: limit, AsOfSystemTime: startingTime, }, func(ctx context.Context, iterator metabase.LoopSegmentsIterator) error {