satellite/metabase: reuse Pieces while looping segments

The segments loop implementation uses a lot of memory converting
alias pieces to pieces for each segment during iteration. To improve
the situation, this change reuses Pieces between batch pages. This
should significantly reduce memory usage for ranged loop executions.

Change-Id: I469188779908facb19ad85c6bb7bc3657111cc9a
Michal Niewrzal 2023-07-12 10:07:16 +02:00 committed by Michał Niewrzał
parent 062ca285a0
commit 99128ab551
2 changed files with 26 additions and 7 deletions
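The pattern at the heart of the change, shown as a minimal self-contained sketch (Piece and Pieces here are stand-ins, not the actual metabase types): keep a scratch slice alive across batches, allocate only when capacity falls short, and re-slice otherwise, so the per-segment make(Pieces, ...) disappears from the hot loop.

package main

import "fmt"

// Piece and Pieces are stand-ins for the metabase types.
type Piece struct{ Number uint16 }
type Pieces []Piece

func main() {
	batches := [][]uint16{{1, 2, 3}, {4, 5}, {6, 7, 8, 9}}

	var buf Pieces // reused across batches instead of re-made for every segment
	for _, batch := range batches {
		if cap(buf) < len(batch) {
			buf = make(Pieces, len(batch)) // grow only when needed
		} else {
			buf = buf[:len(batch)] // otherwise reuse the old backing array
		}
		for i, n := range batch {
			buf[i] = Piece{Number: n}
		}
		fmt.Println(buf) // consume before the next batch overwrites the buffer
	}
}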


@@ -201,13 +201,21 @@ func (cache *NodeAliasCache) EnsurePiecesToAliases(ctx context.Context, pieces P
 // ConvertAliasesToPieces converts alias pieces to pieces.
 func (cache *NodeAliasCache) ConvertAliasesToPieces(ctx context.Context, aliasPieces AliasPieces) (_ Pieces, err error) {
+	return cache.convertAliasesToPieces(ctx, aliasPieces, make(Pieces, len(aliasPieces)))
+}
+
+// convertAliasesToPieces converts AliasPieces by populating Pieces with converted data.
+func (cache *NodeAliasCache) convertAliasesToPieces(ctx context.Context, aliasPieces AliasPieces, pieces Pieces) (_ Pieces, err error) {
 	if len(aliasPieces) == 0 {
 		return Pieces{}, nil
 	}
+	if len(aliasPieces) != len(pieces) {
+		return Pieces{}, Error.New("aliasPieces and pieces length must be equal")
+	}
+
 	latest := cache.getLatest()
-	pieces := make(Pieces, len(aliasPieces))
 	var missing []NodeAlias
 	for i, aliasPiece := range aliasPieces {
@@ -224,13 +232,13 @@ func (cache *NodeAliasCache) ConvertAliasesToPieces(ctx context.Context, aliasPi
 		var err error
 		latest, err = cache.refresh(ctx, nil, missing)
 		if err != nil {
-			return nil, Error.New("failed to refresh node alias db: %w", err)
+			return Pieces{}, Error.New("failed to refresh node alias db: %w", err)
 		}
 		for i, aliasPiece := range aliasPieces {
 			node, ok := latest.Node(aliasPiece.Alias)
 			if !ok {
-				return nil, Error.New("aliases missing in database: %v", missing)
+				return Pieces{}, Error.New("aliases missing in database: %v", missing)
 			}
 			pieces[i].Number = aliasPiece.Number
 			pieces[i].StorageNode = node
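The exported/unexported split above follows a common Go shape: the public function keeps its old allocate-per-call behavior, while the unexported worker fills a caller-owned buffer. A hedged sketch of that shape with stand-in types (not the metabase code):

package main

import (
	"errors"
	"fmt"
)

// Stand-in types; the real conversion consults the node alias cache.
type AliasPiece struct{ Alias int }
type Piece struct{ Node int }
type Pieces []Piece

// Convert keeps the old behavior: a fresh allocation per call.
func Convert(aliasPieces []AliasPiece) (Pieces, error) {
	return convert(aliasPieces, make(Pieces, len(aliasPieces)))
}

// convert writes into a caller-supplied buffer so hot loops can reuse it.
func convert(aliasPieces []AliasPiece, pieces Pieces) (Pieces, error) {
	if len(aliasPieces) != len(pieces) {
		return Pieces{}, errors.New("aliasPieces and pieces length must be equal")
	}
	for i, ap := range aliasPieces {
		pieces[i] = Piece{Node: ap.Alias} // placeholder for the cache lookup
	}
	return pieces, nil
}

func main() {
	out, err := Convert([]AliasPiece{{Alias: 7}, {Alias: 8}})
	fmt.Println(out, err)
}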


@@ -255,12 +255,15 @@ func (db *DB) IterateLoopSegments(ctx context.Context, opts IterateLoopSegments,
 		return err
 	}
+	loopIteratorBatchSizeLimit.Ensure(&opts.BatchSize)
+
 	it := &loopSegmentIterator{
 		db: db,
 		asOfSystemTime:     opts.AsOfSystemTime,
 		asOfSystemInterval: opts.AsOfSystemInterval,
 		batchSize:          opts.BatchSize,
+		batchPieces:        make([]Pieces, opts.BatchSize),
 		curIndex:           0,
 		cursor: loopSegmentIteratorCursor{
@@ -277,8 +280,6 @@ func (db *DB) IterateLoopSegments(ctx context.Context, opts IterateLoopSegments,
 		it.cursor.EndStreamID = uuid.Max()
 	}
-	loopIteratorBatchSizeLimit.Ensure(&it.batchSize)
-
 	it.curRows, err = it.doNextQuery(ctx)
 	if err != nil {
 		return err
@@ -298,7 +299,10 @@ func (db *DB) IterateLoopSegments(ctx context.Context, opts IterateLoopSegments,
 type loopSegmentIterator struct {
 	db *DB
-	batchSize int
+	batchSize int
+	// batchPieces are reused between result pages to reduce memory consumption
+	batchPieces []Pieces
 	asOfSystemTime     time.Time
 	asOfSystemInterval time.Duration
@@ -399,7 +403,14 @@ func (it *loopSegmentIterator) scanItem(ctx context.Context, item *LoopSegmentEn
 		return Error.New("failed to scan segments: %w", err)
 	}
-	item.Pieces, err = it.db.aliasCache.ConvertAliasesToPieces(ctx, item.AliasPieces)
+
+	// allocate new Pieces only if the existing ones don't have enough capacity
+	if cap(it.batchPieces[it.curIndex]) < len(item.AliasPieces) {
+		it.batchPieces[it.curIndex] = make(Pieces, len(item.AliasPieces))
+	} else {
+		it.batchPieces[it.curIndex] = it.batchPieces[it.curIndex][:len(item.AliasPieces)]
+	}
+	item.Pieces, err = it.db.aliasCache.convertAliasesToPieces(ctx, item.AliasPieces, it.batchPieces[it.curIndex])
 	if err != nil {
 		return Error.New("failed to convert aliases to pieces: %w", err)
 	}
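One consequence of the reuse, implied by the code rather than stated in the commit message: item.Pieces now aliases the iterator-owned batchPieces slot for its index, so it stays valid for the current page but is overwritten once the next page is scanned. An observer that retains pieces across pages should copy them first; a small sketch with a stand-in type:

package main

import "fmt"

type Piece struct{ Number uint16 }
type Pieces []Piece

func main() {
	// itemPieces stands in for item.Pieces, which shares the iterator's buffer.
	itemPieces := Pieces{{Number: 1}, {Number: 2}}

	kept := append(Pieces(nil), itemPieces...) // copy before the buffer is reused

	itemPieces[0].Number = 99 // simulates the next page overwriting the buffer
	fmt.Println(kept)         // kept still prints [{1} {2}]
}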