satellite/metainfo: limit number of batched segments in metainfo loop

New metainfo loop can have memory issues when in one batch we will have object with many segments. This change limits number of batched segments to defined limit. Solution is not perfect as if we will have single object with extreme large segments count it can cross defined limit a lot. We need to prepare safer solution soon.

Change-Id: Iefcf466d5bac76513d4219b1a9d99adc361c54ae
This commit is contained in:
Michał Niewrzał 2021-02-25 14:54:30 +01:00
parent 8bbabe57db
commit 68605f32ed

View File

@ -392,6 +392,7 @@ func iterateObjects(ctx context.Context, metabaseDB MetabaseDB, observers []*obs
return nil return nil
} }
segmentsInBatch := int32(0)
err = metabaseDB.FullIterateObjects(ctx, metabase.FullIterateObjects{ err = metabaseDB.FullIterateObjects(ctx, metabase.FullIterateObjects{
BatchSize: limit, BatchSize: limit,
}, func(ctx context.Context, it metabase.FullObjectsIterator) error { }, func(ctx context.Context, it metabase.FullObjectsIterator) error {
@ -407,7 +408,10 @@ func iterateObjects(ctx context.Context, metabaseDB MetabaseDB, observers []*obs
objectsMap[entry.StreamID] = entry objectsMap[entry.StreamID] = entry
ids = append(ids, entry.StreamID) ids = append(ids, entry.StreamID)
if len(objectsMap) == limit { // add +1 to reduce risk of crossing limit
segmentsInBatch += entry.SegmentCount + 1
if segmentsInBatch >= int32(limit) {
err := processBatch() err := processBatch()
if err != nil { if err != nil {
if errors.Is(err, noObserversErr) { if errors.Is(err, noObserversErr) {
@ -421,6 +425,7 @@ func iterateObjects(ctx context.Context, metabaseDB MetabaseDB, observers []*obs
} }
ids = ids[:0] ids = ids[:0]
segmentsInBatch = 0
} }
} }
err = processBatch() err = processBatch()