storj/storage/cockroachkv/ordered_iterator.go

182 lines
4.0 KiB
Go
Raw Normal View History

// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package cockroachkv
import (
"bytes"
"context"
"database/sql"
"fmt"
"github.com/zeebo/errs"
"storj.io/storj/private/dbutil/cockroachutil"
"storj.io/storj/storage"
)
// orderedCockroachIterator lists key/value rows (fullpath, metadata) from the
// pathdata table, fetching them in batches of at most batchSize rows and
// starting each new batch after the last item it returned.
type orderedCockroachIterator struct {
	// client provides the database handle (client.db) the batch queries run on.
	client *Client
	// opts holds the iteration options; the constructor replaces a nil
	// Prefix/First with an empty key and raises First to at least Prefix.
	opts *storage.IterateOptions
	// delimiter separates path segments; when opts.Recurse is false, keys are
	// collapsed to the prefix ending at the first delimiter after opts.Prefix.
	delimiter byte
	// batchSize is the LIMIT applied to each query (taken from opts.Limit).
	batchSize int
	// curIndex counts rows scanned from curRows; when curRows is drained with
	// fewer than batchSize rows scanned, there is nothing more to fetch.
	curIndex int
	// curRows is the result set currently being read.
	curRows *sql.Rows
	// skipPrefix records whether the last returned item was a collapsed
	// prefix, in which case the next query starts after that whole prefix.
	skipPrefix bool
	// lastKeySeen is the key of the last returned item and the point the next
	// query resumes from; while it is empty, queries start from opts.First.
	lastKeySeen storage.Key
	// largestKey is the exclusive upper bound of the scan (the key just past
	// opts.Prefix); empty means the scan is unbounded above.
	largestKey storage.Key
	// errEncountered holds the error that stopped iteration; Close reports it.
	errEncountered error
}
// newOrderedCockroachIterator builds an iterator over the pathdata table as
// described by opts and runs the query for the first batch.
//
// A nil Prefix or First is treated as an empty key, and First is raised to
// Prefix when it sorts before it, so the scan never starts before the
// prefix. When a non-empty prefix is given, the scan is bounded above by
// the first key past that prefix. Each batch holds at most opts.Limit rows.
func newOrderedCockroachIterator(ctx context.Context, cli *Client, opts storage.IterateOptions) (_ *orderedCockroachIterator, err error) {
	defer mon.Task()(&ctx)(&err)

	// Normalize the bounds before anything reads them.
	if opts.Prefix == nil {
		opts.Prefix = storage.Key("")
	}
	if opts.First == nil {
		opts.First = storage.Key("")
	}
	if opts.First.Less(opts.Prefix) {
		opts.First = opts.Prefix
	}

	// An empty upper bound means "no bound"; only a real prefix limits the scan.
	var upperBound storage.Key
	if len(opts.Prefix) > 0 {
		upperBound = storage.AfterPrefix(opts.Prefix)
	}

	iter := &orderedCockroachIterator{
		client:     cli,
		opts:       &opts,
		delimiter:  '/',
		batchSize:  opts.Limit,
		largestKey: upperBound,
	}

	rows, err := iter.doNextQuery(ctx)
	if err != nil {
		return nil, err
	}
	iter.curRows = rows
	return iter, nil
}
// Close closes the current result set. It returns the error that stopped
// iteration, if any, combined with any error from closing the result set.
func (oci *orderedCockroachIterator) Close() error {
	defer mon.Task()(nil)(nil)

	iterErr := oci.errEncountered
	closeErr := oci.curRows.Close()
	return errs.Combine(iterErr, closeErr)
}
// Next fills in info for the next item in an ongoing listing.
//
// It returns true after filling item, and false when the listing is
// exhausted or an error occurred. An error is recorded in errEncountered and
// reported by Close. Once an error has been recorded, every later call
// returns false without touching the database.
func (oci *orderedCockroachIterator) Next(ctx context.Context, item *storage.ListItem) bool {
	defer mon.Task()(&ctx)(nil)

	// After a failure the current result set may be half-consumed; resuming
	// from it could return items after an error was already reported.
	if oci.errEncountered != nil {
		return false
	}

	for {
		// Advance to the next row, refreshing the result set when it runs dry.
		for {
			nextTask := mon.TaskNamed("check_next_row")(nil)
			next := oci.curRows.Next()
			nextTask(nil)
			if next {
				break
			}
			if !oci.acquireNextBatch(ctx) {
				return false
			}
		}

		var k, v []byte
		scanTask := mon.TaskNamed("scan_next_row")(nil)
		err := oci.curRows.Scan(&k, &v)
		scanTask(&err)
		if err != nil {
			oci.errEncountered = errs.Wrap(err)
			return false
		}
		oci.curIndex++

		// A row outside the requested prefix ends the listing.
		if !bytes.HasPrefix(k, []byte(oci.opts.Prefix)) {
			return false
		}

		item.Key = storage.Key(k)
		item.Value = storage.Value(v)
		item.IsPrefix = false
		if !oci.opts.Recurse {
			// Collapse everything below the next delimiter into one prefix entry.
			if idx := bytes.IndexByte(item.Key[len(oci.opts.Prefix):], oci.delimiter); idx >= 0 {
				item.Key = item.Key[:len(oci.opts.Prefix)+idx+1]
				item.Value = nil
				item.IsPrefix = true
			}
		}
		// Consecutive rows that collapse to the same prefix are reported once.
		if oci.lastKeySeen.Equal(item.Key) {
			continue
		}
		oci.skipPrefix = item.IsPrefix
		oci.lastKeySeen = item.Key
		return true
	}
}

// acquireNextBatch runs once the current result set has no more rows. It
// checks the result set for errors, closes it, and then issues the follow-up
// query when the batch was full or a retryable error was seen. It returns
// true when a fresh result set is ready to read. It returns false when
// iteration should stop: either the listing is exhausted, or it failed and
// the failure was recorded in errEncountered.
func (oci *orderedCockroachIterator) acquireNextBatch(ctx context.Context) bool {
	defer mon.TaskNamed("acquire_new_query")(nil)(nil)

	retry := false
	// Cockroach is expected to report retryable errors through the Rows
	// object, so both Err and Close are checked for them.
	if err := oci.curRows.Err(); err != nil && err != sql.ErrNoRows {
		if !cockroachutil.NeedsRetry(err) {
			oci.errEncountered = errs.Wrap(err)
			return false
		}
		mon.Event("needed_retry")
		retry = true
	}
	if err := oci.curRows.Close(); err != nil {
		if !cockroachutil.NeedsRetry(err) {
			oci.errEncountered = errs.Wrap(err)
			return false
		}
		mon.Event("needed_retry")
		retry = true
	}

	// A short batch with no retryable error means the listing is exhausted.
	// An empty batch is always terminal too. Without that check, a zero
	// batchSize (LIMIT 0) would keep re-issuing the same empty query forever.
	if !retry && (oci.curIndex < oci.batchSize || oci.curIndex == 0) {
		return false
	}

	newRows, err := oci.doNextQuery(ctx)
	if err != nil {
		oci.errEncountered = errs.Wrap(err)
		return false
	}
	oci.curRows = newRows
	oci.curIndex = 0
	return true
}
// doNextQuery issues the query for the next batch of rows and returns the
// open result set.
//
// Where the batch starts depends on what has been returned so far:
//   - nothing yet: at opts.First, inclusive;
//   - a collapsed prefix: just past that whole prefix;
//   - otherwise: just after lastKeySeen.
//
// The batch is bounded above (exclusive) by largestKey when that is
// non-empty, is ordered by fullpath, and holds at most batchSize rows.
func (oci *orderedCockroachIterator) doNextQuery(ctx context.Context) (_ *sql.Rows, err error) {
	defer mon.Task()(&ctx)(&err)

	gt := ">"
	start := oci.lastKeySeen
	if len(start) == 0 {
		start = oci.opts.First
		gt = ">="
	} else if oci.skipPrefix {
		start = storage.AfterPrefix(start)
		gt = ">="
	}

	// Some drivers send a nil []byte as SQL NULL. That would make the
	// "$2 = ''" check evaluate to NULL and filter out every row. Always send
	// a non-nil value so "no upper bound" arrives as an empty bytea.
	largestKey := []byte(oci.largestKey)
	if largestKey == nil {
		largestKey = []byte{}
	}

	// ORDER BY is required for correctness: resuming with "> lastKeySeen" and
	// collapsing prefixes only work if rows arrive in ascending fullpath
	// order. SQL does not guarantee any order for LIMIT without ORDER BY.
	return oci.client.db.QueryContext(ctx, fmt.Sprintf(`
SELECT pd.fullpath, pd.metadata
FROM pathdata pd
WHERE pd.fullpath %s $1:::BYTEA
AND ($2:::BYTEA = '':::BYTEA OR pd.fullpath < $2:::BYTEA)
ORDER BY pd.fullpath
LIMIT $3
`, gt), start, largestKey, oci.batchSize)
}