satellite/metainfo: avoid temporary list

Currently ListV2 loaded the whole data into memory, even when all the
data wasn't being used, using up more memory than needed.

Change-Id: I5846d979344729b447c108a6cc9f4227229ec981
This commit is contained in:
Egon Elbre 2020-04-13 19:50:25 +03:00
parent 921b574554
commit 2c0d61b18e
2 changed files with 42 additions and 18 deletions

View File

@ -232,20 +232,20 @@ func (s *Service) List(ctx context.Context, prefix string, startAfter string, re
}
}
rawItems, more, err := storage.ListV2(ctx, s.db, storage.ListOptions{
more, err = storage.ListV2Iterate(ctx, s.db, storage.ListOptions{
Prefix: prefixKey,
StartAfter: storage.Key(startAfter),
Recursive: recursive,
Limit: int(limit),
IncludeValue: metaFlags != meta.None,
}, func(ctx context.Context, item *storage.ListItem) error {
items = append(items, s.createListItem(ctx, *item, metaFlags))
return nil
})
if err != nil {
return nil, false, Error.Wrap(err)
}
for _, rawItem := range rawItems {
items = append(items, s.createListItem(ctx, rawItem, metaFlags))
}
return items, more, nil
}

View File

@ -24,6 +24,36 @@ type ListOptions struct {
// If true then the caller must call List again to get more
// results by setting `StartAfter` appropriately.
func ListV2(ctx context.Context, store KeyValueStore, opts ListOptions) (result Items, more bool, err error) {
more, err = ListV2Iterate(ctx, store, opts, func(ctx context.Context, item *ListItem) error {
if opts.IncludeValue {
result = append(result, ListItem{
Key: CloneKey(item.Key),
Value: CloneValue(item.Value),
IsPrefix: item.IsPrefix,
})
} else {
result = append(result, ListItem{
Key: CloneKey(item.Key),
IsPrefix: item.IsPrefix,
})
}
return nil
})
return result, more, err
}
// ListV2Iterate lists all keys corresponding to ListOptions.
// limit is capped to LookupLimit.
//
// more indicates if the result was truncated. If false
// then the result []ListItem includes all requested keys.
// If true then the caller must call List again to get more
// results by setting `StartAfter` appropriately.
//
// The opts.IncludeValue is ignored for this func.
// The callback item will be reused for next calls.
// If the user needs the preserve the value, it must call storage.CloneValue or storage.CloneKey.
func ListV2Iterate(ctx context.Context, store KeyValueStore, opts ListOptions, fn func(context.Context, *ListItem) error) (more bool, err error) {
defer mon.Task()(&ctx)(&err)
limit := opts.Limit
@ -54,20 +84,14 @@ func ListV2(ctx context.Context, store KeyValueStore, opts ListOptions) (result
}
}
task := mon.TaskNamed("appending_to_results")(nil)
if opts.IncludeValue {
result = append(result, ListItem{
Key: CloneKey(relativeKey),
Value: CloneValue(item.Value),
IsPrefix: item.IsPrefix,
})
} else {
result = append(result, ListItem{
Key: CloneKey(relativeKey),
IsPrefix: item.IsPrefix,
})
}
task := mon.TaskNamed("handling_item")(nil)
item.Key = relativeKey
err := fn(ctx, &item)
task(nil)
if err != nil {
return err
}
}
// we still need to consume one item for the more flag
@ -86,7 +110,7 @@ func ListV2(ctx context.Context, store KeyValueStore, opts ListOptions) (result
Limit: limit,
}, iterate)
return result, more, err
return more, err
}
func joinKey(a, b Key) Key {