storage/{cockroachkv,postgreskv}: detailed monitoring for list

Change-Id: Iedba10776367233e59f3a6523efdb303b836b241
This commit is contained in:
Jeff Wendling 2020-01-27 18:39:49 -07:00 committed by Egon Elbre
parent dbf46c4aa7
commit 5d6cb68cd7
3 changed files with 60 additions and 34 deletions

View File

@ -61,6 +61,8 @@ func newOrderedCockroachIterator(ctx context.Context, cli *Client, opts storage.
}
func (opi *orderedCockroachIterator) Close() error {
defer mon.Task()(nil)(nil)
return errs.Combine(opi.errEncountered, opi.curRows.Close())
}
@ -70,28 +72,38 @@ func (opi *orderedCockroachIterator) Next(ctx context.Context, item *storage.Lis
for {
for !opi.curRows.Next() {
if err := opi.curRows.Err(); err != nil && err != sql.ErrNoRows {
opi.errEncountered = errs.Wrap(err)
return false
result := func() bool {
defer mon.TaskNamed("acquire_new_query")(nil)(nil)
if err := opi.curRows.Err(); err != nil && err != sql.ErrNoRows {
opi.errEncountered = errs.Wrap(err)
return false
}
if err := opi.curRows.Close(); err != nil {
opi.errEncountered = errs.Wrap(err)
return false
}
if opi.curIndex < opi.batchSize {
return false
}
newRows, err := opi.doNextQuery(ctx)
if err != nil {
opi.errEncountered = errs.Wrap(err)
return false
}
opi.curRows = newRows
opi.curIndex = 0
return true
}()
if !result {
return result
}
if err := opi.curRows.Close(); err != nil {
opi.errEncountered = errs.Wrap(err)
return false
}
if opi.curIndex < opi.batchSize {
return false
}
newRows, err := opi.doNextQuery(ctx)
if err != nil {
opi.errEncountered = errs.Wrap(err)
return false
}
opi.curRows = newRows
opi.curIndex = 0
}
var k, v []byte
scanTask := mon.TaskNamed("scan_next_row")(nil)
err := opi.curRows.Scan(&k, &v)
scanTask(&err)
if err != nil {
opi.errEncountered = errs.Wrap(err)
return false

View File

@ -54,6 +54,7 @@ func ListV2(ctx context.Context, store KeyValueStore, opts ListOptions) (result
}
}
task := mon.TaskNamed("appending_to_results")(nil)
if opts.IncludeValue {
result = append(result, ListItem{
Key: CloneKey(relativeKey),
@ -66,6 +67,7 @@ func ListV2(ctx context.Context, store KeyValueStore, opts ListOptions) (result
IsPrefix: item.IsPrefix,
})
}
task(nil)
}
// we still need to consume one item for the more flag

View File

@ -61,6 +61,8 @@ func newOrderedPostgresIterator(ctx context.Context, cli *Client, opts storage.I
}
func (opi *orderedPostgresIterator) Close() error {
defer mon.Task()(nil)(nil)
return errs.Combine(opi.errEncountered, opi.curRows.Close())
}
@ -70,28 +72,38 @@ func (opi *orderedPostgresIterator) Next(ctx context.Context, item *storage.List
for {
for !opi.curRows.Next() {
if err := opi.curRows.Err(); err != nil && err != sql.ErrNoRows {
opi.errEncountered = errs.Wrap(err)
return false
result := func() bool {
defer mon.TaskNamed("acquire_new_query")(nil)(nil)
if err := opi.curRows.Err(); err != nil && err != sql.ErrNoRows {
opi.errEncountered = errs.Wrap(err)
return false
}
if err := opi.curRows.Close(); err != nil {
opi.errEncountered = errs.Wrap(err)
return false
}
if opi.curIndex < opi.batchSize {
return false
}
newRows, err := opi.doNextQuery(ctx)
if err != nil {
opi.errEncountered = errs.Wrap(err)
return false
}
opi.curRows = newRows
opi.curIndex = 0
return true
}()
if !result {
return result
}
if err := opi.curRows.Close(); err != nil {
opi.errEncountered = errs.Wrap(err)
return false
}
if opi.curIndex < opi.batchSize {
return false
}
newRows, err := opi.doNextQuery(ctx)
if err != nil {
opi.errEncountered = errs.Wrap(err)
return false
}
opi.curRows = newRows
opi.curIndex = 0
}
var k, v []byte
scanTask := mon.TaskNamed("scan_next_row")(nil)
err := opi.curRows.Scan(&k, &v)
scanTask(&err)
if err != nil {
opi.errEncountered = errs.Wrap(err)
return false