satellite/metainfo: loop cleanups
Change-Id: I89b4f519ce18cb0ce34a61a4cbe1675d93741637
This commit is contained in:
parent
95b78e8011
commit
846bb895e0
@ -18,6 +18,8 @@ import (
|
||||
"storj.io/storj/satellite/metainfo/metabase"
|
||||
)
|
||||
|
||||
const batchsizeLimit = 2500
|
||||
|
||||
var (
|
||||
// LoopError is a standard error class for this component.
|
||||
LoopError = errs.Class("metainfo loop error")
|
||||
@ -301,6 +303,10 @@ func iterateDatabase(ctx context.Context, metabaseDB MetabaseDB, observers []*ob
|
||||
func iterateObjects(ctx context.Context, metabaseDB MetabaseDB, observers []*observerContext, limit int, rateLimiter *rate.Limiter) (_ []*observerContext, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
if limit <= 0 || limit > batchsizeLimit {
|
||||
limit = batchsizeLimit
|
||||
}
|
||||
|
||||
noObserversErr := errs.New("no observers")
|
||||
|
||||
// TODO we may consider keeping only expiration time as its
|
||||
@ -322,13 +328,6 @@ func iterateObjects(ctx context.Context, metabaseDB MetabaseDB, observers []*obs
|
||||
|
||||
var lastObject metabase.FullObjectEntry
|
||||
for _, segment := range segments.Segments {
|
||||
if err := rateLimiter.Wait(ctx); err != nil {
|
||||
// We don't really execute concurrent batches so we should never
|
||||
// exceed the burst size of 1 and this should never happen.
|
||||
// We can also enter here if the context is cancelled.
|
||||
return err
|
||||
}
|
||||
|
||||
if segment.StreamID != lastObject.StreamID {
|
||||
var ok bool
|
||||
lastObject, ok = objectsMap[segment.StreamID]
|
||||
|
Loading…
Reference in New Issue
Block a user