2019-12-03 17:40:04 +00:00
|
|
|
// Copyright (C) 2019 Storj Labs, Inc.
|
|
|
|
// See LICENSE for copying information.
|
|
|
|
|
|
|
|
package cockroachkv
|
|
|
|
|
|
|
|
import (
|
|
|
|
"bytes"
|
|
|
|
"context"
|
|
|
|
"database/sql"
|
2020-07-16 16:50:15 +01:00
|
|
|
"errors"
|
2019-12-03 17:40:04 +00:00
|
|
|
"fmt"
|
|
|
|
|
|
|
|
"github.com/zeebo/errs"
|
|
|
|
|
2020-05-26 16:58:05 +01:00
|
|
|
"storj.io/storj/private/dbutil/cockroachutil"
|
2020-01-19 13:42:08 +00:00
|
|
|
"storj.io/storj/private/tagsql"
|
2019-12-03 17:40:04 +00:00
|
|
|
"storj.io/storj/storage"
|
|
|
|
)
|
|
|
|
|
|
|
|
type orderedCockroachIterator struct {
|
|
|
|
client *Client
|
|
|
|
opts *storage.IterateOptions
|
|
|
|
delimiter byte
|
|
|
|
batchSize int
|
|
|
|
curIndex int
|
2020-01-19 13:42:08 +00:00
|
|
|
curRows tagsql.Rows
|
2019-12-03 17:40:04 +00:00
|
|
|
skipPrefix bool
|
|
|
|
lastKeySeen storage.Key
|
|
|
|
largestKey storage.Key
|
|
|
|
errEncountered error
|
|
|
|
}
|
|
|
|
|
2020-01-19 19:56:51 +00:00
|
|
|
func newOrderedCockroachIterator(ctx context.Context, cli *Client, opts storage.IterateOptions) (_ *orderedCockroachIterator, err error) {
|
2019-12-03 17:40:04 +00:00
|
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
if opts.Prefix == nil {
|
|
|
|
opts.Prefix = storage.Key("")
|
|
|
|
}
|
|
|
|
if opts.First == nil {
|
|
|
|
opts.First = storage.Key("")
|
|
|
|
}
|
|
|
|
if opts.First.Less(opts.Prefix) {
|
|
|
|
opts.First = opts.Prefix
|
|
|
|
}
|
|
|
|
|
2020-05-27 22:05:02 +01:00
|
|
|
oci := &orderedCockroachIterator{
|
2019-12-03 17:40:04 +00:00
|
|
|
client: cli,
|
|
|
|
opts: &opts,
|
|
|
|
delimiter: byte('/'),
|
2020-01-19 19:56:51 +00:00
|
|
|
batchSize: opts.Limit,
|
2019-12-03 17:40:04 +00:00
|
|
|
curIndex: 0,
|
|
|
|
}
|
|
|
|
|
|
|
|
if len(opts.Prefix) > 0 {
|
2020-05-27 22:05:02 +01:00
|
|
|
oci.largestKey = storage.AfterPrefix(opts.Prefix)
|
2019-12-03 17:40:04 +00:00
|
|
|
}
|
|
|
|
|
2020-05-27 22:05:02 +01:00
|
|
|
newRows, err := oci.doNextQuery(ctx)
|
2019-12-03 17:40:04 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2020-05-27 22:05:02 +01:00
|
|
|
oci.curRows = newRows
|
2019-12-03 17:40:04 +00:00
|
|
|
|
2020-05-27 22:05:02 +01:00
|
|
|
return oci, nil
|
2019-12-03 17:40:04 +00:00
|
|
|
}
|
|
|
|
|
2020-05-27 22:05:02 +01:00
|
|
|
func (oci *orderedCockroachIterator) Close() error {
|
2020-01-28 01:39:49 +00:00
|
|
|
defer mon.Task()(nil)(nil)
|
|
|
|
|
2020-01-19 13:42:08 +00:00
|
|
|
return errs.Combine(oci.curRows.Err(), oci.errEncountered, oci.curRows.Close())
|
2019-12-03 17:40:04 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Next fills in info for the next item in an ongoing listing.
|
2020-05-27 22:05:02 +01:00
|
|
|
func (oci *orderedCockroachIterator) Next(ctx context.Context, item *storage.ListItem) bool {
|
2019-12-03 17:40:04 +00:00
|
|
|
defer mon.Task()(&ctx)(nil)
|
|
|
|
|
|
|
|
for {
|
2020-05-05 21:05:57 +01:00
|
|
|
for {
|
|
|
|
nextTask := mon.TaskNamed("check_next_row")(nil)
|
2020-05-27 22:05:02 +01:00
|
|
|
next := oci.curRows.Next()
|
2020-05-05 21:05:57 +01:00
|
|
|
nextTask(nil)
|
|
|
|
if next {
|
|
|
|
break
|
|
|
|
}
|
|
|
|
|
2020-01-28 01:39:49 +00:00
|
|
|
result := func() bool {
|
|
|
|
defer mon.TaskNamed("acquire_new_query")(nil)(nil)
|
|
|
|
|
2020-05-26 16:58:05 +01:00
|
|
|
retry := false
|
2020-07-16 16:50:15 +01:00
|
|
|
if err := oci.curRows.Err(); err != nil && !errors.Is(err, sql.ErrNoRows) {
|
2020-05-26 16:58:05 +01:00
|
|
|
// This NeedsRetry needs to be exported here because it is
|
|
|
|
// expected behavior for cockroach to return retryable errors
|
|
|
|
// that will be captured in this Rows object.
|
|
|
|
if cockroachutil.NeedsRetry(err) {
|
|
|
|
mon.Event("needed_retry")
|
|
|
|
retry = true
|
|
|
|
} else {
|
2020-05-27 22:05:02 +01:00
|
|
|
oci.errEncountered = errs.Wrap(err)
|
2020-05-26 16:58:05 +01:00
|
|
|
return false
|
|
|
|
}
|
2020-01-28 01:39:49 +00:00
|
|
|
}
|
2020-05-27 22:05:02 +01:00
|
|
|
if err := oci.curRows.Close(); err != nil {
|
2020-05-26 16:58:05 +01:00
|
|
|
if cockroachutil.NeedsRetry(err) {
|
|
|
|
mon.Event("needed_retry")
|
|
|
|
retry = true
|
|
|
|
} else {
|
2020-05-27 22:05:02 +01:00
|
|
|
oci.errEncountered = errs.Wrap(err)
|
2020-05-26 16:58:05 +01:00
|
|
|
return false
|
|
|
|
}
|
2020-01-28 01:39:49 +00:00
|
|
|
}
|
2020-05-27 22:05:02 +01:00
|
|
|
if oci.curIndex < oci.batchSize && !retry {
|
2020-01-28 01:39:49 +00:00
|
|
|
return false
|
|
|
|
}
|
2020-05-27 22:05:02 +01:00
|
|
|
newRows, err := oci.doNextQuery(ctx)
|
2020-01-28 01:39:49 +00:00
|
|
|
if err != nil {
|
2020-05-27 22:05:02 +01:00
|
|
|
oci.errEncountered = errs.Wrap(err)
|
2020-01-28 01:39:49 +00:00
|
|
|
return false
|
|
|
|
}
|
2020-05-27 22:05:02 +01:00
|
|
|
oci.curRows = newRows
|
|
|
|
oci.curIndex = 0
|
2020-01-28 01:39:49 +00:00
|
|
|
return true
|
|
|
|
}()
|
|
|
|
if !result {
|
|
|
|
return result
|
2019-12-03 17:40:04 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
var k, v []byte
|
2020-01-28 01:39:49 +00:00
|
|
|
scanTask := mon.TaskNamed("scan_next_row")(nil)
|
2020-05-27 22:05:02 +01:00
|
|
|
err := oci.curRows.Scan(&k, &v)
|
2020-01-28 01:39:49 +00:00
|
|
|
scanTask(&err)
|
2019-12-03 17:40:04 +00:00
|
|
|
if err != nil {
|
2020-05-27 22:05:02 +01:00
|
|
|
oci.errEncountered = errs.Wrap(err)
|
2019-12-03 17:40:04 +00:00
|
|
|
return false
|
|
|
|
}
|
2020-05-27 22:05:02 +01:00
|
|
|
oci.curIndex++
|
2019-12-03 17:40:04 +00:00
|
|
|
|
2020-05-27 22:05:02 +01:00
|
|
|
if !bytes.HasPrefix(k, []byte(oci.opts.Prefix)) {
|
2019-12-03 17:40:04 +00:00
|
|
|
return false
|
|
|
|
}
|
|
|
|
|
|
|
|
item.Key = storage.Key(k)
|
|
|
|
item.Value = storage.Value(v)
|
|
|
|
item.IsPrefix = false
|
|
|
|
|
2020-05-27 22:05:02 +01:00
|
|
|
if !oci.opts.Recurse {
|
|
|
|
if idx := bytes.IndexByte(item.Key[len(oci.opts.Prefix):], oci.delimiter); idx >= 0 {
|
|
|
|
item.Key = item.Key[:len(oci.opts.Prefix)+idx+1]
|
2019-12-03 17:40:04 +00:00
|
|
|
item.Value = nil
|
|
|
|
item.IsPrefix = true
|
|
|
|
}
|
|
|
|
}
|
2020-05-27 22:05:02 +01:00
|
|
|
if oci.lastKeySeen.Equal(item.Key) {
|
2019-12-03 17:40:04 +00:00
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
2020-05-27 22:05:02 +01:00
|
|
|
oci.skipPrefix = item.IsPrefix
|
|
|
|
oci.lastKeySeen = item.Key
|
2019-12-03 17:40:04 +00:00
|
|
|
return true
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-01-19 13:42:08 +00:00
|
|
|
func (oci *orderedCockroachIterator) doNextQuery(ctx context.Context) (_ tagsql.Rows, err error) {
|
2019-12-03 17:40:04 +00:00
|
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
|
|
|
|
gt := ">"
|
2020-05-27 22:05:02 +01:00
|
|
|
start := oci.lastKeySeen
|
2019-12-03 17:40:04 +00:00
|
|
|
|
2020-06-28 04:56:29 +01:00
|
|
|
largestKey := []byte(oci.largestKey)
|
|
|
|
if largestKey == nil {
|
|
|
|
// github.com/lib/pq would treat nil as an empty bytea array, while
|
|
|
|
// github.com/jackc/pgx will treat nil as NULL. Make an explicit empty
|
|
|
|
// byte array so that they'll work the same.
|
|
|
|
largestKey = []byte{}
|
|
|
|
}
|
2019-12-03 17:40:04 +00:00
|
|
|
if len(start) == 0 {
|
2020-05-27 22:05:02 +01:00
|
|
|
start = oci.opts.First
|
2019-12-03 17:40:04 +00:00
|
|
|
gt = ">="
|
2020-05-27 22:05:02 +01:00
|
|
|
} else if oci.skipPrefix {
|
2019-12-03 17:40:04 +00:00
|
|
|
start = storage.AfterPrefix(start)
|
|
|
|
gt = ">="
|
|
|
|
}
|
|
|
|
|
2020-05-27 22:05:02 +01:00
|
|
|
return oci.client.db.QueryContext(ctx, fmt.Sprintf(`
|
2019-12-03 17:40:04 +00:00
|
|
|
SELECT pd.fullpath, pd.metadata
|
|
|
|
FROM pathdata pd
|
|
|
|
WHERE pd.fullpath %s $1:::BYTEA
|
|
|
|
AND ($2:::BYTEA = '':::BYTEA OR pd.fullpath < $2:::BYTEA)
|
|
|
|
LIMIT $3
|
2020-06-28 04:56:29 +01:00
|
|
|
`, gt), start, largestKey, oci.batchSize)
|
2019-12-03 17:40:04 +00:00
|
|
|
}
|