satellite/metabase: capture iterator errors

It was possible for the iterator to silently ignore scanning, nextQuery
and close errors.

Change-Id: I7e44674d9eae53267a3ed649b7657d932743bf73
This commit is contained in:
Egon Elbre 2021-06-04 20:32:25 +03:00
parent 5044337440
commit e6fe9d209e
2 changed files with 24 additions and 9 deletions

View File

@ -34,6 +34,9 @@ type objectsIterator struct {
skipPrefix ObjectKey
doNextQuery func(context.Context, *objectsIterator) (_ tagsql.Rows, err error)
// failErr is set when either scan or next query fails during iteration.
failErr error
}
type iterateCursor struct {
@ -157,7 +160,7 @@ func iterate(ctx context.Context, it *objectsIterator, fn func(context.Context,
if rowsErr := it.curRows.Err(); rowsErr != nil {
err = errs.Combine(err, rowsErr)
}
err = errs.Combine(err, it.curRows.Close())
err = errs.Combine(err, it.failErr, it.curRows.Close())
}()
return fn(ctx, it)
@ -215,11 +218,12 @@ func (it *objectsIterator) next(ctx context.Context, item *ObjectEntry) bool {
rows, err := it.doNextQuery(ctx, it)
if err != nil {
it.failErr = errs.Combine(it.failErr, err)
return false
}
if it.curRows.Close() != nil {
_ = rows.Close()
if closeErr := it.curRows.Close(); closeErr != nil {
it.failErr = errs.Combine(it.failErr, closeErr, rows.Close())
return false
}
@ -232,6 +236,7 @@ func (it *objectsIterator) next(ctx context.Context, item *ObjectEntry) bool {
err := it.scanItem(item)
if err != nil {
it.failErr = errs.Combine(it.failErr, err)
return false
}

View File

@ -81,7 +81,7 @@ func (db *DB) IterateLoopObjects(ctx context.Context, opts IterateLoopObjects, f
if rowsErr := it.curRows.Err(); rowsErr != nil {
err = errs.Combine(err, rowsErr)
}
err = errs.Combine(err, it.curRows.Close())
err = errs.Combine(err, it.failErr, it.curRows.Close())
}()
return fn(ctx, it)
@ -97,6 +97,9 @@ type loopIterator struct {
curIndex int
curRows tagsql.Rows
cursor loopIterateCursor
// failErr is set when either scan or next query fails during iteration.
failErr error
}
type loopIterateCursor struct {
@ -120,11 +123,12 @@ func (it *loopIterator) Next(ctx context.Context, item *LoopObjectEntry) bool {
rows, err := it.doNextQuery(ctx)
if err != nil {
it.failErr = errs.Combine(it.failErr, err)
return false
}
if it.curRows.Close() != nil {
_ = rows.Close()
if closeErr := it.curRows.Close(); closeErr != nil {
it.failErr = errs.Combine(it.failErr, closeErr, rows.Close())
return false
}
@ -137,6 +141,7 @@ func (it *loopIterator) Next(ctx context.Context, item *LoopObjectEntry) bool {
err := it.scanItem(item)
if err != nil {
it.failErr = errs.Combine(it.failErr, err)
return false
}
@ -371,7 +376,7 @@ func (db *DB) IterateLoopSegments(ctx context.Context, opts IterateLoopSegments,
if rowsErr := it.curRows.Err(); rowsErr != nil {
err = errs.Combine(err, rowsErr)
}
err = errs.Combine(err, it.curRows.Close())
err = errs.Combine(err, it.failErr, it.curRows.Close())
}()
return fn(ctx, it)
@ -387,6 +392,9 @@ type loopSegmentIterator struct {
curIndex int
curRows tagsql.Rows
cursor loopSegmentIteratorCursor
// failErr is set when either scan or next query fails during iteration.
failErr error
}
type loopSegmentIteratorCursor struct {
@ -408,11 +416,12 @@ func (it *loopSegmentIterator) Next(ctx context.Context, item *LoopSegmentEntry)
rows, err := it.doNextQuery(ctx)
if err != nil {
it.failErr = errs.Combine(it.failErr, err)
return false
}
if it.curRows.Close() != nil {
_ = rows.Close()
if failErr := it.curRows.Close(); failErr != nil {
it.failErr = errs.Combine(it.failErr, failErr, rows.Close())
return false
}
@ -425,6 +434,7 @@ func (it *loopSegmentIterator) Next(ctx context.Context, item *LoopSegmentEntry)
err := it.scanItem(ctx, item)
if err != nil {
it.failErr = errs.Combine(it.failErr, err)
return false
}