satellite/metainfo/metabase: streams iterator

Iterate over streams/segments rather than loading all of them into
memory. This reduces the memory overhead of the metainfo loop.

Change-Id: I9e98ab98f0d5f6e80668677269b62d6549526e57
This commit is contained in:
Egon Elbre 2021-03-01 17:27:04 +02:00
parent c51ea68ad3
commit b0b7b81105
5 changed files with 266 additions and 206 deletions

View File

@ -205,8 +205,6 @@ type MetabaseDB interface {
GetLatestObjectLastSegment(ctx context.Context, opts metabase.GetLatestObjectLastSegment) (segment metabase.Segment, err error)
// ListSegments lists specified stream segments.
ListSegments(ctx context.Context, opts metabase.ListSegments) (result metabase.ListSegmentsResult, err error)
// ListLoopSegmentEntries lists streams loop segment entries.
ListLoopSegmentEntries(ctx context.Context, opts metabase.ListLoopSegmentEntries) (result metabase.ListLoopSegmentEntriesResult, err error)
// IterateObjectsAllVersions iterates through all versions of all objects.
IterateObjectsAllVersions(ctx context.Context, opts metabase.IterateObjects, fn func(context.Context, metabase.ObjectsIterator) error) (err error)
// IterateObjectsAllVersionsWithStatus iterates through all versions of all objects with specified status.
@ -215,6 +213,8 @@ type MetabaseDB interface {
IteratePendingObjectsByKey(ctx context.Context, opts metabase.IteratePendingObjectsByKey, fn func(context.Context, metabase.ObjectsIterator) error) (err error)
// IterateLoopObjects iterates through all objects in metabase for metainfo loop purpose.
IterateLoopObjects(ctx context.Context, opts metabase.IterateLoopObjects, fn func(context.Context, metabase.LoopObjectsIterator) error) (err error)
// IterateLoopStreams iterates through all streams passed in as arguments.
IterateLoopStreams(ctx context.Context, opts metabase.IterateLoopStreams, handleStream func(ctx context.Context, streamID uuid.UUID, next metabase.SegmentIterator) error) (err error)
// BucketEmpty returns true if bucket does not contain objects (pending or committed).
// This method doesn't check bucket existence.
BucketEmpty(ctx context.Context, opts metabase.BucketEmpty) (empty bool, err error)

View File

@ -296,6 +296,8 @@ func iterateObjects(ctx context.Context, metabaseDB MetabaseDB, observers []*obs
limit = batchsizeLimit
}
startingTime := time.Now()
noObserversErr := errs.New("no observers")
// TODO we may consider keeping only expiration time as its
@ -308,77 +310,64 @@ func iterateObjects(ctx context.Context, metabaseDB MetabaseDB, observers []*obs
return nil
}
segments, err := metabaseDB.ListLoopSegmentEntries(ctx, metabase.ListLoopSegmentEntries{
StreamIDs: ids,
})
if err != nil {
return err
}
var lastEntry metabase.LoopObjectEntry
for _, segment := range segments.Segments {
if segment.StreamID != lastEntry.StreamID {
var ok bool
lastEntry, ok = objectsMap[segment.StreamID]
if !ok {
return errs.New("unable to find corresponding object: %v", segment.StreamID)
}
delete(objectsMap, lastEntry.StreamID)
// TODO should we move this directly to iterator to have object
// state as close as possible to time of reading
observers = withObservers(observers, func(observer *observerContext) bool {
object := Object(lastEntry)
return handleObject(ctx, observer, &object)
})
if len(observers) == 0 {
return noObserversErr
}
// if context has been canceled exit. Otherwise, continue
if err := ctx.Err(); err != nil {
return err
}
}
location := metabase.SegmentLocation{
ProjectID: lastEntry.ProjectID,
BucketName: lastEntry.BucketName,
ObjectKey: lastEntry.ObjectKey,
Position: segment.Position,
}
segment := segment
observers = withObservers(observers, func(observer *observerContext) bool {
return handleSegment(ctx, observer, location, segment, lastEntry.ExpiresAt)
})
if len(observers) == 0 {
return noObserversErr
}
// if context has been canceled exit. Otherwise, continue
err := metabaseDB.IterateLoopStreams(ctx, metabase.IterateLoopStreams{
StreamIDs: ids,
AsOfSystemTime: startingTime,
}, func(ctx context.Context, streamID uuid.UUID, next metabase.SegmentIterator) error {
if err := ctx.Err(); err != nil {
return err
}
}
// we have now only objects without segments
for id, entry := range objectsMap {
delete(objectsMap, id)
obj, ok := objectsMap[streamID]
if !ok {
return Error.New("unable to find corresponding object: %v", streamID)
}
delete(objectsMap, streamID)
object := Object(entry)
observers = withObservers(observers, func(observer *observerContext) bool {
object := Object(obj)
return handleObject(ctx, observer, &object)
})
if len(observers) == 0 {
return noObserversErr
}
// if context has been canceled exit. Otherwise, continue
if err := ctx.Err(); err != nil {
return err
for {
// if context has been canceled exit. Otherwise, continue
if err := ctx.Err(); err != nil {
return err
}
var segment metabase.LoopSegmentEntry
if !next(&segment) {
break
}
location := metabase.SegmentLocation{
ProjectID: obj.ProjectID,
BucketName: obj.BucketName,
ObjectKey: obj.ObjectKey,
Position: segment.Position,
}
observers = withObservers(observers, func(observer *observerContext) bool {
return handleSegment(ctx, observer, location, segment, obj.ExpiresAt)
})
if len(observers) == 0 {
return noObserversErr
}
}
return nil
})
if err != nil {
return Error.Wrap(err)
}
if len(objectsMap) > 0 {
return Error.New("unhandled objects %#v", objectsMap)
}
return nil
}

View File

@ -6,8 +6,7 @@ package metabase
import (
"bytes"
"context"
"database/sql"
"errors"
"fmt"
"sort"
"time"
@ -15,6 +14,7 @@ import (
"storj.io/common/storj"
"storj.io/common/uuid"
"storj.io/storj/private/dbutil"
"storj.io/storj/private/dbutil/pgutil"
"storj.io/storj/private/tagsql"
)
@ -176,6 +176,16 @@ func (it *loopIterator) scanItem(item *LoopObjectEntry) error {
)
}
// IterateLoopStreams contains arguments necessary for listing multiple streams segments.
type IterateLoopStreams struct {
// StreamIDs lists the streams whose segments will be iterated.
StreamIDs []uuid.UUID
// AsOfSystemTime, when non-zero, makes the query run with
// `AS OF SYSTEM TIME` on CockroachDB; ignored otherwise.
AsOfSystemTime time.Time
}
// SegmentIterator returns the next segment.
// It writes the segment into the argument and reports whether a segment was available.
type SegmentIterator func(segment *LoopSegmentEntry) bool
// LoopSegmentEntry contains information about segment metadata needed by metainfo loop.
type LoopSegmentEntry struct {
StreamID uuid.UUID
@ -191,40 +201,34 @@ func (s LoopSegmentEntry) Inline() bool {
return s.Redundancy.IsZero() && len(s.Pieces) == 0
}
// ListLoopSegmentEntries contains arguments necessary for listing streams loop segment entries.
type ListLoopSegmentEntries struct {
// StreamIDs lists the streams whose segment entries will be listed.
StreamIDs []uuid.UUID
}
// ListLoopSegmentEntriesResult result of listing streams loop segment entries.
type ListLoopSegmentEntriesResult struct {
// Segments holds all listed segment entries loaded into memory.
Segments []LoopSegmentEntry
}
// ListLoopSegmentEntries lists streams loop segment entries.
func (db *DB) ListLoopSegmentEntries(ctx context.Context, opts ListLoopSegmentEntries) (result ListLoopSegmentEntriesResult, err error) {
// IterateLoopStreams lists multiple streams segments.
func (db *DB) IterateLoopStreams(ctx context.Context, opts IterateLoopStreams, handleStream func(ctx context.Context, streamID uuid.UUID, next SegmentIterator) error) (err error) {
defer mon.Task()(&ctx)(&err)
if len(opts.StreamIDs) == 0 {
return ListLoopSegmentEntriesResult{}, ErrInvalidRequest.New("StreamIDs list is empty")
return ErrInvalidRequest.New("StreamIDs list is empty")
}
// TODO do something like pgutil.UUIDArray()
ids := make([][]byte, len(opts.StreamIDs))
for i, streamID := range opts.StreamIDs {
if streamID.IsZero() {
return ListLoopSegmentEntriesResult{}, ErrInvalidRequest.New("StreamID missing: index %d", i)
}
id := streamID
ids[i] = id[:]
}
sort.Slice(ids, func(i, j int) bool {
return bytes.Compare(ids[i], ids[j]) < 0
sort.Slice(opts.StreamIDs, func(i, k int) bool {
return bytes.Compare(opts.StreamIDs[i][:], opts.StreamIDs[k][:]) < 0
})
err = withRows(db.db.Query(ctx, `
// TODO do something like pgutil.UUIDArray()
bytesIDs := make([][]byte, len(opts.StreamIDs))
for i, streamID := range opts.StreamIDs {
if streamID.IsZero() {
return ErrInvalidRequest.New("StreamID missing: index %d", i)
}
id := streamID
bytesIDs[i] = id[:]
}
var asOfSystemTime string
if !opts.AsOfSystemTime.IsZero() && db.implementation == dbutil.Cockroach {
asOfSystemTime = fmt.Sprintf(` AS OF SYSTEM TIME '%d' `, opts.AsOfSystemTime.UnixNano())
}
rows, err := db.db.Query(ctx, `
SELECT
stream_id, position,
root_piece_id,
@ -232,12 +236,40 @@ func (db *DB) ListLoopSegmentEntries(ctx context.Context, opts ListLoopSegmentEn
redundancy,
remote_alias_pieces
FROM segments
`+asOfSystemTime+`
WHERE
-- this turns out to be a little bit faster than stream_id IN (SELECT unnest($1::BYTEA[]))
stream_id = ANY ($1::BYTEA[])
ORDER BY stream_id ASC, position ASC
`, pgutil.ByteaArray(ids)))(func(rows tagsql.Rows) error {
for rows.Next() {
`, pgutil.ByteaArray(bytesIDs))
if err != nil {
return Error.Wrap(err)
}
defer func() { err = errs.Combine(err, rows.Err(), rows.Close()) }()
var noMoreData bool
var nextSegment *LoopSegmentEntry
for _, streamID := range opts.StreamIDs {
streamID := streamID
var internalError error
err := handleStream(ctx, streamID, func(output *LoopSegmentEntry) bool {
if nextSegment != nil {
if nextSegment.StreamID != streamID {
return false
}
*output = *nextSegment
nextSegment = nil
return true
}
if noMoreData {
return false
}
if !rows.Next() {
noMoreData = true
return false
}
var segment LoopSegmentEntry
var aliasPieces AliasPieces
err = rows.Scan(
@ -248,24 +280,28 @@ func (db *DB) ListLoopSegmentEntries(ctx context.Context, opts ListLoopSegmentEn
&aliasPieces,
)
if err != nil {
return Error.New("failed to scan segments: %w", err)
internalError = Error.New("failed to scan segments: %w", err)
return false
}
segment.Pieces, err = db.aliasCache.ConvertAliasesToPieces(ctx, aliasPieces)
if err != nil {
return Error.New("failed to convert aliases to pieces: %w", err)
internalError = Error.New("failed to convert aliases to pieces: %w", err)
return false
}
result.Segments = append(result.Segments, segment)
if segment.StreamID != streamID {
nextSegment = &segment
return false
}
*output = segment
return true
})
if internalError != nil || err != nil {
return Error.Wrap(errs.Combine(internalError, err))
}
return nil
})
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return ListLoopSegmentEntriesResult{}, nil
}
return ListLoopSegmentEntriesResult{}, Error.New("unable to fetch object segments: %w", err)
}
return result, nil
return Error.Wrap(err)
}

View File

@ -8,6 +8,7 @@ import (
"sort"
"strings"
"testing"
"time"
"storj.io/common/storj"
"storj.io/common/testcontext"
@ -236,6 +237,127 @@ func TestIterateLoopObjects(t *testing.T) {
})
}
// TestIterateLoopStreams exercises DB.IterateLoopStreams: argument
// validation and iteration over the segments of multiple streams.
func TestIterateLoopStreams(t *testing.T) {
All(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) {
// An empty StreamIDs list must be rejected as an invalid request.
t.Run("StreamIDs list is empty", func(t *testing.T) {
defer DeleteAll{}.Check(ctx, t, db)
IterateLoopStreams{
Opts: metabase.IterateLoopStreams{},
Result: map[uuid.UUID][]metabase.LoopSegmentEntry{},
ErrClass: &metabase.ErrInvalidRequest,
ErrText: "StreamIDs list is empty",
}.Check(ctx, t, db)
Verify{}.Check(ctx, t, db)
})
// A zero UUID in the StreamIDs list must be rejected, reporting its index.
t.Run("StreamIDs list contains empty ID", func(t *testing.T) {
defer DeleteAll{}.Check(ctx, t, db)
IterateLoopStreams{
Opts: metabase.IterateLoopStreams{
StreamIDs: []uuid.UUID{{}},
},
Result: map[uuid.UUID][]metabase.LoopSegmentEntry{},
ErrClass: &metabase.ErrInvalidRequest,
ErrText: "StreamID missing: index 0",
}.Check(ctx, t, db)
Verify{}.Check(ctx, t, db)
})
// Happy path: create objects with 0, 1, 5 and 3 segments and verify the
// iterator yields exactly the expected segments per stream.
t.Run("List objects segments", func(t *testing.T) {
defer DeleteAll{}.Check(ctx, t, db)
expectedObject00 := createObject(ctx, t, db, randObjectStream(), 0)
expectedObject01 := createObject(ctx, t, db, randObjectStream(), 1)
expectedObject02 := createObject(ctx, t, db, randObjectStream(), 5)
expectedObject03 := createObject(ctx, t, db, randObjectStream(), 3)
expectedRawSegments := []metabase.RawSegment{}
objects := []metabase.Object{
expectedObject00,
expectedObject01,
expectedObject02,
expectedObject03,
}
// Sort by stream ID to match the ordering the query returns
// (ORDER BY stream_id ASC, position ASC).
sort.Slice(objects, func(i, j int) bool {
return bytes.Compare(objects[i].StreamID[:], objects[j].StreamID[:]) < 0
})
// Build the expected per-stream segment lists and the raw segments
// used by the final Verify step.
expectedMap := make(map[uuid.UUID][]metabase.LoopSegmentEntry)
for _, object := range objects {
var expectedSegments []metabase.LoopSegmentEntry
for i := 0; i < int(object.SegmentCount); i++ {
segment := metabase.LoopSegmentEntry{
StreamID: object.StreamID,
Position: metabase.SegmentPosition{
Index: uint32(i),
},
RootPieceID: storj.PieceID{1},
EncryptedSize: 1024,
Pieces: metabase.Pieces{{Number: 0, StorageNode: storj.NodeID{2}}},
Redundancy: defaultTestRedundancy,
}
expectedSegments = append(expectedSegments, segment)
expectedRawSegments = append(expectedRawSegments, metabase.RawSegment{
StreamID: segment.StreamID,
Position: segment.Position,
EncryptedSize: segment.EncryptedSize,
Pieces: segment.Pieces,
Redundancy: segment.Redundancy,
RootPieceID: segment.RootPieceID,
PlainSize: 512,
EncryptedKey: []byte{3},
EncryptedKeyNonce: []byte{4},
})
}
expectedMap[object.StreamID] = expectedSegments
}
// Iterate with AsOfSystemTime set (CockroachDB time-travel path).
IterateLoopStreams{
Opts: metabase.IterateLoopStreams{
StreamIDs: []uuid.UUID{
expectedObject00.StreamID,
expectedObject01.StreamID,
expectedObject02.StreamID,
expectedObject03.StreamID,
},
AsOfSystemTime: time.Now(),
},
Result: expectedMap,
}.Check(ctx, t, db)
// Iterate again without AsOfSystemTime; results must be identical.
IterateLoopStreams{
Opts: metabase.IterateLoopStreams{
StreamIDs: []uuid.UUID{
expectedObject00.StreamID,
expectedObject01.StreamID,
expectedObject02.StreamID,
expectedObject03.StreamID,
},
},
Result: expectedMap,
}.Check(ctx, t, db)
// Confirm the database still holds exactly the created objects/segments.
Verify{
Objects: []metabase.RawObject{
metabase.RawObject(expectedObject00),
metabase.RawObject(expectedObject01),
metabase.RawObject(expectedObject02),
metabase.RawObject(expectedObject03),
},
Segments: expectedRawSegments,
}.Check(ctx, t, db)
})
})
}
func createFullObjectsWithKeys(ctx *testcontext.Context, t *testing.T, db *metabase.DB, projectID uuid.UUID, bucketName string, keys []metabase.ObjectKey) map[metabase.ObjectKey]metabase.LoopObjectEntry {
objects := make(map[metabase.ObjectKey]metabase.LoopObjectEntry, len(keys))
for _, key := range keys {
@ -261,103 +383,3 @@ func loopObjectEntryFromRaw(m metabase.RawObject) metabase.LoopObjectEntry {
SegmentCount: m.SegmentCount,
}
}
// TestListLoopSegmentEntries exercises DB.ListLoopSegmentEntries:
// argument validation and listing the segments of multiple streams.
func TestListLoopSegmentEntries(t *testing.T) {
All(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) {
// An empty StreamIDs list must be rejected as an invalid request.
t.Run("StreamIDs list is empty", func(t *testing.T) {
defer DeleteAll{}.Check(ctx, t, db)
ListLoopSegmentEntries{
Opts: metabase.ListLoopSegmentEntries{},
ErrClass: &metabase.ErrInvalidRequest,
ErrText: "StreamIDs list is empty",
}.Check(ctx, t, db)
Verify{}.Check(ctx, t, db)
})
// A zero UUID in the StreamIDs list must be rejected, reporting its index.
t.Run("StreamIDs list contains empty ID", func(t *testing.T) {
defer DeleteAll{}.Check(ctx, t, db)
ListLoopSegmentEntries{
Opts: metabase.ListLoopSegmentEntries{
StreamIDs: []uuid.UUID{{}},
},
ErrClass: &metabase.ErrInvalidRequest,
ErrText: "StreamID missing: index 0",
}.Check(ctx, t, db)
Verify{}.Check(ctx, t, db)
})
// Happy path: create objects with 1, 5 and 3 segments and verify all of
// their segments are listed in stream-ID order.
t.Run("List objects segments", func(t *testing.T) {
defer DeleteAll{}.Check(ctx, t, db)
expectedObject01 := createObject(ctx, t, db, randObjectStream(), 1)
expectedObject02 := createObject(ctx, t, db, randObjectStream(), 5)
expectedObject03 := createObject(ctx, t, db, randObjectStream(), 3)
expectedSegments := []metabase.LoopSegmentEntry{}
expectedRawSegments := []metabase.RawSegment{}
objects := []metabase.Object{expectedObject01, expectedObject02, expectedObject03}
// Sort by stream ID to match the ordering of the listing result.
sort.Slice(objects, func(i, j int) bool {
return bytes.Compare(objects[i].StreamID[:], objects[j].StreamID[:]) < 0
})
// addSegments appends the expected loop entries and raw segments for
// every segment of the given object.
addSegments := func(object metabase.Object) {
for i := 0; i < int(object.SegmentCount); i++ {
segment := metabase.LoopSegmentEntry{
StreamID: object.StreamID,
Position: metabase.SegmentPosition{
Index: uint32(i),
},
RootPieceID: storj.PieceID{1},
EncryptedSize: 1024,
Pieces: metabase.Pieces{{Number: 0, StorageNode: storj.NodeID{2}}},
Redundancy: defaultTestRedundancy,
}
expectedSegments = append(expectedSegments, segment)
expectedRawSegments = append(expectedRawSegments, metabase.RawSegment{
StreamID: segment.StreamID,
Position: segment.Position,
EncryptedSize: segment.EncryptedSize,
Pieces: segment.Pieces,
Redundancy: segment.Redundancy,
RootPieceID: segment.RootPieceID,
PlainSize: 512,
EncryptedKey: []byte{3},
EncryptedKeyNonce: []byte{4},
})
}
}
for _, object := range objects {
addSegments(object)
}
ListLoopSegmentEntries{
Opts: metabase.ListLoopSegmentEntries{
StreamIDs: []uuid.UUID{
expectedObject01.StreamID,
expectedObject02.StreamID,
expectedObject03.StreamID,
},
},
Result: metabase.ListLoopSegmentEntriesResult{
Segments: expectedSegments,
},
}.Check(ctx, t, db)
// Confirm the database still holds exactly the created objects/segments.
Verify{
Objects: []metabase.RawObject{
metabase.RawObject(expectedObject01),
metabase.RawObject(expectedObject02),
metabase.RawObject(expectedObject03),
},
Segments: expectedRawSegments,
}.Check(ctx, t, db)
})
})
}

View File

@ -274,15 +274,28 @@ func (step ListSegments) Check(ctx *testcontext.Context, t testing.TB, db *metab
require.Zero(t, diff)
}
type ListLoopSegmentEntries struct {
Opts metabase.ListLoopSegmentEntries
Result metabase.ListLoopSegmentEntriesResult
type IterateLoopStreams struct {
Opts metabase.IterateLoopStreams
Result map[uuid.UUID][]metabase.LoopSegmentEntry
ErrClass *errs.Class
ErrText string
}
func (step ListLoopSegmentEntries) Check(ctx *testcontext.Context, t testing.TB, db *metabase.DB) {
result, err := db.ListLoopSegmentEntries(ctx, step.Opts)
func (step IterateLoopStreams) Check(ctx *testcontext.Context, t testing.TB, db *metabase.DB) {
result := make(map[uuid.UUID][]metabase.LoopSegmentEntry)
err := db.IterateLoopStreams(ctx, step.Opts,
func(ctx context.Context, streamID uuid.UUID, next metabase.SegmentIterator) error {
var segments []metabase.LoopSegmentEntry
for {
var segment metabase.LoopSegmentEntry
if !next(&segment) {
break
}
segments = append(segments, segment)
}
result[streamID] = segments
return nil
})
checkError(t, err, step.ErrClass, step.ErrText)
diff := cmp.Diff(step.Result, result, cmpopts.EquateApproxTime(5*time.Second))