From 99128ab551438c585f8d86d29d5831b81d8d3784 Mon Sep 17 00:00:00 2001 From: Michal Niewrzal Date: Wed, 12 Jul 2023 10:07:16 +0200 Subject: [PATCH] satellite/metabase: reuse Pieces while looping segments Segments loop implementation is using lots of memory to convert alias pieces to pieces for each segment while iteration. To improve situation this change is reusing Pieces between batch pages. This should signifcantly reduce memory usage for ranged loop executions. Change-Id: I469188779908facb19ad85c6bb7bc3657111cc9a --- satellite/metabase/aliascache.go | 14 +++++++++++--- satellite/metabase/loop.go | 19 +++++++++++++++---- 2 files changed, 26 insertions(+), 7 deletions(-) diff --git a/satellite/metabase/aliascache.go b/satellite/metabase/aliascache.go index bf0d98860..67b7d4680 100644 --- a/satellite/metabase/aliascache.go +++ b/satellite/metabase/aliascache.go @@ -201,13 +201,21 @@ func (cache *NodeAliasCache) EnsurePiecesToAliases(ctx context.Context, pieces P // ConvertAliasesToPieces converts alias pieces to pieces. func (cache *NodeAliasCache) ConvertAliasesToPieces(ctx context.Context, aliasPieces AliasPieces) (_ Pieces, err error) { + return cache.convertAliasesToPieces(ctx, aliasPieces, make(Pieces, len(aliasPieces))) +} + +// convertAliasesToPieces converts AliasPieces by populating Pieces with converted data. +func (cache *NodeAliasCache) convertAliasesToPieces(ctx context.Context, aliasPieces AliasPieces, pieces Pieces) (_ Pieces, err error) { if len(aliasPieces) == 0 { return Pieces{}, nil } + if len(aliasPieces) != len(pieces) { + return Pieces{}, Error.New("aliasPieces and pieces length must be equal") + } + latest := cache.getLatest() - pieces := make(Pieces, len(aliasPieces)) var missing []NodeAlias for i, aliasPiece := range aliasPieces { @@ -224,13 +232,13 @@ func (cache *NodeAliasCache) ConvertAliasesToPieces(ctx context.Context, aliasPi var err error latest, err = cache.refresh(ctx, nil, missing) if err != nil { - return nil, Error.New("failed to refresh node alias db: %w", err) + return Pieces{}, Error.New("failed to refresh node alias db: %w", err) } for i, aliasPiece := range aliasPieces { node, ok := latest.Node(aliasPiece.Alias) if !ok { - return nil, Error.New("aliases missing in database: %v", missing) + return Pieces{}, Error.New("aliases missing in database: %v", missing) } pieces[i].Number = aliasPiece.Number pieces[i].StorageNode = node diff --git a/satellite/metabase/loop.go b/satellite/metabase/loop.go index 2b299a45c..974049363 100644 --- a/satellite/metabase/loop.go +++ b/satellite/metabase/loop.go @@ -255,12 +255,15 @@ func (db *DB) IterateLoopSegments(ctx context.Context, opts IterateLoopSegments, return err } + loopIteratorBatchSizeLimit.Ensure(&opts.BatchSize) + it := &loopSegmentIterator{ db: db, asOfSystemTime: opts.AsOfSystemTime, asOfSystemInterval: opts.AsOfSystemInterval, batchSize: opts.BatchSize, + batchPieces: make([]Pieces, opts.BatchSize), curIndex: 0, cursor: loopSegmentIteratorCursor{ @@ -277,8 +280,6 @@ func (db *DB) IterateLoopSegments(ctx context.Context, opts IterateLoopSegments, it.cursor.EndStreamID = uuid.Max() } - loopIteratorBatchSizeLimit.Ensure(&it.batchSize) - it.curRows, err = it.doNextQuery(ctx) if err != nil { return err @@ -298,7 +299,10 @@ func (db *DB) IterateLoopSegments(ctx context.Context, opts IterateLoopSegments, type loopSegmentIterator struct { db *DB - batchSize int + batchSize int + // batchPieces are reused between result pages to reduce memory consumption + batchPieces []Pieces + asOfSystemTime time.Time asOfSystemInterval time.Duration @@ -399,7 +403,14 @@ func (it *loopSegmentIterator) scanItem(ctx context.Context, item *LoopSegmentEn return Error.New("failed to scan segments: %w", err) } - item.Pieces, err = it.db.aliasCache.ConvertAliasesToPieces(ctx, item.AliasPieces) + // allocate new Pieces only if existing have not enough capacity + if cap(it.batchPieces[it.curIndex]) < len(item.AliasPieces) { + it.batchPieces[it.curIndex] = make(Pieces, len(item.AliasPieces)) + } else { + it.batchPieces[it.curIndex] = it.batchPieces[it.curIndex][:len(item.AliasPieces)] + } + + item.Pieces, err = it.db.aliasCache.convertAliasesToPieces(ctx, item.AliasPieces, it.batchPieces[it.curIndex]) if err != nil { return Error.New("failed to convert aliases to pieces: %w", err) }