satellite/metabase/{meta,segment}loop: avoid passing config
Currently the iterate is being called in only one location so there's no benefit in passing them as arguments over using the receiver. Change-Id: I433a5d8b795b1bcc1f1e9320d87b10820cf537f1
This commit is contained in:
parent
46a3242ed4
commit
4469d229f8
@ -334,7 +334,7 @@ waitformore:
|
||||
}
|
||||
close(earlyExitDone)
|
||||
|
||||
return iterateDatabase(ctx, loop.metabaseDB, observers, loop.config.ListLimit, rate.NewLimiter(rate.Limit(loop.config.RateLimit), 1), loop.config.MaxAsOfSystemDuration)
|
||||
return loop.iterateDatabase(ctx, observers)
|
||||
}
|
||||
|
||||
func stopTimer(t *time.Timer) {
|
||||
@ -354,7 +354,7 @@ func (loop *Service) Wait() {
|
||||
|
||||
var errNoObservers = errs.New("no observers")
|
||||
|
||||
func iterateDatabase(ctx context.Context, metabaseDB MetabaseDB, observers []*observerContext, limit int, rateLimiter *rate.Limiter, maxAsOfSystemDuration time.Duration) (err error) {
|
||||
func (loop *Service) iterateDatabase(ctx context.Context, observers []*observerContext) (err error) {
|
||||
defer func() {
|
||||
if err != nil {
|
||||
errorObservers(observers, err)
|
||||
@ -363,7 +363,7 @@ func iterateDatabase(ctx context.Context, metabaseDB MetabaseDB, observers []*ob
|
||||
finishObservers(observers)
|
||||
}()
|
||||
|
||||
observers, err = iterateObjects(ctx, metabaseDB, observers, limit, rateLimiter, maxAsOfSystemDuration)
|
||||
observers, err = loop.iterateObjects(ctx, observers)
|
||||
if errors.Is(err, errNoObservers) {
|
||||
return nil
|
||||
}
|
||||
@ -374,14 +374,17 @@ func iterateDatabase(ctx context.Context, metabaseDB MetabaseDB, observers []*ob
|
||||
return nil
|
||||
}
|
||||
|
||||
func iterateObjects(ctx context.Context, metabaseDB MetabaseDB, observers []*observerContext, limit int, rateLimiter *rate.Limiter, maxAsOfSystemDuration time.Duration) (_ []*observerContext, err error) {
|
||||
func (loop *Service) iterateObjects(ctx context.Context, observers []*observerContext) (_ []*observerContext, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
limit := loop.config.ListLimit
|
||||
if limit <= 0 || limit > batchsizeLimit {
|
||||
limit = batchsizeLimit
|
||||
}
|
||||
|
||||
startingTime, err := metabaseDB.Now(ctx)
|
||||
rateLimiter := rate.NewLimiter(rate.Limit(loop.config.RateLimit), 1)
|
||||
|
||||
startingTime, err := loop.metabaseDB.Now(ctx)
|
||||
if err != nil {
|
||||
return observers, Error.Wrap(err)
|
||||
}
|
||||
@ -410,11 +413,11 @@ func iterateObjects(ctx context.Context, metabaseDB MetabaseDB, observers []*obs
|
||||
return nil
|
||||
}
|
||||
|
||||
if time.Since(currentAsOfSystemTime) > maxAsOfSystemDuration {
|
||||
currentAsOfSystemTime = time.Now().Add(-maxAsOfSystemDuration)
|
||||
if time.Since(currentAsOfSystemTime) > loop.config.MaxAsOfSystemDuration {
|
||||
currentAsOfSystemTime = time.Now().Add(-loop.config.MaxAsOfSystemDuration)
|
||||
}
|
||||
|
||||
err = metabaseDB.IterateLoopStreams(ctx, metabase.IterateLoopStreams{
|
||||
err = loop.metabaseDB.IterateLoopStreams(ctx, metabase.IterateLoopStreams{
|
||||
StreamIDs: ids,
|
||||
AsOfSystemTime: currentAsOfSystemTime,
|
||||
}, func(ctx context.Context, streamID uuid.UUID, next metabase.SegmentIterator) (err error) {
|
||||
@ -487,7 +490,7 @@ func iterateObjects(ctx context.Context, metabaseDB MetabaseDB, observers []*obs
|
||||
var objectsIterated int64
|
||||
|
||||
segmentsInBatch := int32(0)
|
||||
err = metabaseDB.IterateLoopObjects(ctx, metabase.IterateLoopObjects{
|
||||
err = loop.metabaseDB.IterateLoopObjects(ctx, metabase.IterateLoopObjects{
|
||||
BatchSize: limit,
|
||||
AsOfSystemTime: startingTime,
|
||||
}, func(ctx context.Context, it metabase.LoopObjectsIterator) (err error) {
|
||||
|
@ -298,7 +298,7 @@ waitformore:
|
||||
}
|
||||
close(earlyExitDone)
|
||||
|
||||
return iterateDatabase(ctx, loop.metabaseDB, observers, loop.config.ListLimit, rate.NewLimiter(rate.Limit(loop.config.RateLimit), 1))
|
||||
return loop.iterateDatabase(ctx, observers)
|
||||
}
|
||||
|
||||
func stopTimer(t *time.Timer) {
|
||||
@ -318,7 +318,7 @@ func (loop *Service) Wait() {
|
||||
|
||||
var errNoObservers = errs.New("no observers")
|
||||
|
||||
func iterateDatabase(ctx context.Context, metabaseDB MetabaseDB, observers []*observerContext, limit int, rateLimiter *rate.Limiter) (err error) {
|
||||
func (loop *Service) iterateDatabase(ctx context.Context, observers []*observerContext) (err error) {
|
||||
defer func() {
|
||||
if err != nil {
|
||||
errorObservers(observers, err)
|
||||
@ -327,7 +327,7 @@ func iterateDatabase(ctx context.Context, metabaseDB MetabaseDB, observers []*ob
|
||||
finishObservers(observers)
|
||||
}()
|
||||
|
||||
observers, err = iterateSegments(ctx, metabaseDB, observers, limit, rateLimiter)
|
||||
observers, err = loop.iterateSegments(ctx, observers)
|
||||
if errors.Is(err, errNoObservers) {
|
||||
return nil
|
||||
}
|
||||
@ -338,14 +338,16 @@ func iterateDatabase(ctx context.Context, metabaseDB MetabaseDB, observers []*ob
|
||||
return err
|
||||
}
|
||||
|
||||
func iterateSegments(ctx context.Context, metabaseDB MetabaseDB, observers []*observerContext, limit int, rateLimiter *rate.Limiter) (_ []*observerContext, err error) {
|
||||
func (loop *Service) iterateSegments(ctx context.Context, observers []*observerContext) (_ []*observerContext, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
rateLimiter := rate.NewLimiter(rate.Limit(loop.config.RateLimit), 1)
|
||||
limit := loop.config.ListLimit
|
||||
if limit <= 0 || limit > batchsizeLimit {
|
||||
limit = batchsizeLimit
|
||||
}
|
||||
|
||||
startingTime, err := metabaseDB.Now(ctx)
|
||||
startingTime, err := loop.metabaseDB.Now(ctx)
|
||||
if err != nil {
|
||||
return observers, Error.Wrap(err)
|
||||
}
|
||||
@ -361,7 +363,7 @@ func iterateSegments(ctx context.Context, metabaseDB MetabaseDB, observers []*ob
|
||||
|
||||
var segmentsProcessed int64
|
||||
|
||||
err = metabaseDB.IterateLoopSegments(ctx, metabase.IterateLoopSegments{
|
||||
err = loop.metabaseDB.IterateLoopSegments(ctx, metabase.IterateLoopSegments{
|
||||
BatchSize: limit,
|
||||
AsOfSystemTime: startingTime,
|
||||
}, func(ctx context.Context, iterator metabase.LoopSegmentsIterator) error {
|
||||
|
Loading…
Reference in New Issue
Block a user