satellite/metainfo/metabase: fix iterator boundaries

Currently the old encrypted keys may not match the path component
encoding. Change the iterator such that the prefixes handle arbitrary
byte sequences.

Change-Id: I0a50049f4ef9887e1c4df6f9692f967a054430eb
This commit is contained in:
Egon Elbre 2021-02-28 20:27:38 +02:00
parent 68605f32ed
commit 261a4c1c09
3 changed files with 184 additions and 57 deletions

View File

@ -21,14 +21,16 @@ type objectsIterator struct {
projectID uuid.UUID
bucketName string
status ObjectStatus
limitKey ObjectKey
prefix ObjectKey
prefixLimit ObjectKey
batchSize int
recursive bool
includePrefixes bool
curIndex int
curRows tagsql.Rows
cursor iterateCursor
curIndex int
curRows tagsql.Rows
cursor iterateCursor
inclusiveCursor bool
skipPrefix ObjectKey
doNextQuery func(context.Context, *objectsIterator) (_ tagsql.Rows, err error)
@ -48,10 +50,12 @@ func iterateAllVersions(ctx context.Context, db *DB, opts IterateObjects, fn fun
projectID: opts.ProjectID,
bucketName: opts.BucketName,
limitKey: nextPrefix(opts.Prefix),
prefix: opts.Prefix,
prefixLimit: prefixLimit(opts.Prefix),
batchSize: opts.BatchSize,
recursive: true,
includePrefixes: true,
inclusiveCursor: false,
curIndex: 0,
cursor: iterateCursor{
@ -63,8 +67,9 @@ func iterateAllVersions(ctx context.Context, db *DB, opts IterateObjects, fn fun
// start from either the cursor or prefix, depending on which is larger
if lessKey(it.cursor.Key, opts.Prefix) {
it.cursor.Key = beforeKey(opts.Prefix)
it.cursor.Key = opts.Prefix
it.cursor.Version = -1
it.inclusiveCursor = true
}
return iterate(ctx, it, fn)
@ -79,7 +84,8 @@ func iterateAllVersionsWithStatus(ctx context.Context, db *DB, opts IterateObjec
projectID: opts.ProjectID,
bucketName: opts.BucketName,
status: opts.Status,
limitKey: nextPrefix(opts.Prefix),
prefix: opts.Prefix,
prefixLimit: prefixLimit(opts.Prefix),
batchSize: opts.BatchSize,
recursive: opts.Recursive,
includePrefixes: true,
@ -94,8 +100,9 @@ func iterateAllVersionsWithStatus(ctx context.Context, db *DB, opts IterateObjec
// start from either the cursor or prefix, depending on which is larger
if lessKey(it.cursor.Key, opts.Prefix) {
it.cursor.Key = beforeKey(opts.Prefix)
it.cursor.Key = opts.Prefix
it.cursor.Version = -1
it.inclusiveCursor = true
}
return iterate(ctx, it, fn)
@ -115,7 +122,8 @@ func iteratePendingObjectsByKey(ctx context.Context, db *DB, opts IteratePending
projectID: opts.ProjectID,
bucketName: opts.BucketName,
limitKey: "",
prefix: "",
prefixLimit: "",
batchSize: opts.BatchSize,
recursive: false,
includePrefixes: false,
@ -143,6 +151,7 @@ func iterate(ctx context.Context, it *objectsIterator, fn func(context.Context,
if err != nil {
return err
}
it.inclusiveCursor = false
defer func() {
if rowsErr := it.curRows.Err(); rowsErr != nil {
@ -237,7 +246,12 @@ func (it *objectsIterator) next(ctx context.Context, item *ObjectEntry) bool {
func doNextQueryAllVersionsWithoutStatus(ctx context.Context, it *objectsIterator) (_ tagsql.Rows, err error) {
defer mon.Task()(&ctx)(&err)
if it.limitKey == "" {
cursorCompare := ">"
if it.inclusiveCursor {
cursorCompare = ">="
}
if it.prefixLimit == "" {
return it.db.db.Query(ctx, `
SELECT
object_key, stream_id, version, status,
@ -249,7 +263,7 @@ func doNextQueryAllVersionsWithoutStatus(ctx context.Context, it *objectsIterato
FROM objects
WHERE
project_id = $1 AND bucket_name = $2
AND (object_key, version) > ($3, $4)
AND (object_key, version) `+cursorCompare+` ($3, $4)
ORDER BY object_key ASC, version ASC
LIMIT $5
`, it.projectID, it.bucketName,
@ -271,24 +285,28 @@ func doNextQueryAllVersionsWithoutStatus(ctx context.Context, it *objectsIterato
FROM objects
WHERE
project_id = $1 AND bucket_name = $2
AND (object_key, version) > ($3, $4)
AND (object_key, version) `+cursorCompare+` ($3, $4)
AND object_key < $5
ORDER BY object_key ASC, version ASC
LIMIT $6
`, it.projectID, it.bucketName,
[]byte(it.cursor.Key), int(it.cursor.Version),
[]byte(it.limitKey),
[]byte(it.prefixLimit),
it.batchSize,
// len(it.limitKey)+1, // TODO uncomment when CRDB issue will be fixed
// len(it.prefix)+1, // TODO uncomment when CRDB issue will be fixed
)
}
func doNextQueryAllVersionsWithStatus(ctx context.Context, it *objectsIterator) (_ tagsql.Rows, err error) {
defer mon.Task()(&ctx)(&err)
if it.limitKey == "" {
cursorCompare := ">"
if it.inclusiveCursor {
cursorCompare = ">="
}
if it.prefixLimit == "" {
return it.db.db.Query(ctx, `
SELECT
object_key, stream_id, version, status,
@ -301,7 +319,7 @@ func doNextQueryAllVersionsWithStatus(ctx context.Context, it *objectsIterator)
WHERE
project_id = $1 AND bucket_name = $2
AND status = $3
AND (object_key, version) > ($4, $5)
AND (object_key, version) `+cursorCompare+` ($4, $5)
ORDER BY object_key ASC, version ASC
LIMIT $6
`, it.projectID, it.bucketName,
@ -325,17 +343,17 @@ func doNextQueryAllVersionsWithStatus(ctx context.Context, it *objectsIterator)
WHERE
project_id = $1 AND bucket_name = $2
AND status = $3
AND (object_key, version) > ($4, $5)
AND (object_key, version) `+cursorCompare+` ($4, $5)
AND object_key < $6
ORDER BY object_key ASC, version ASC
LIMIT $7
`, it.projectID, it.bucketName,
it.status,
[]byte(it.cursor.Key), int(it.cursor.Version),
[]byte(it.limitKey),
[]byte(it.prefixLimit),
it.batchSize,
// len(it.limitKey)+1, // TODO uncomment when CRDB issue will be fixed
// len(it.prefix)+1, // TODO uncomment when CRDB issue will be fixed
)
}
@ -381,30 +399,28 @@ func (it *objectsIterator) scanItem(item *ObjectEntry) error {
return err
}
if it.prefix != "" {
if !strings.HasPrefix(string(item.ObjectKey), string(it.prefix)) {
return Error.New("internal server error: project:%q bucket:%q key:%q prefix:%q", it.projectID, it.bucketName, item.ObjectKey, it.prefix)
}
}
// TODO this should be done with SQL query
item.ObjectKey = item.ObjectKey[len(it.limitKey):]
item.ObjectKey = item.ObjectKey[len(it.prefix):]
return nil
}
// nextPrefix returns the next prefix of the same length.
func nextPrefix(key ObjectKey) ObjectKey {
if key == "" {
func prefixLimit(a ObjectKey) ObjectKey {
if a == "" {
return ""
}
after := []byte(key)
after[len(after)-1]++
return ObjectKey(after)
}
// beforeKey returns the key just before the key.
func beforeKey(key ObjectKey) ObjectKey {
if key == "" {
return ""
if a[len(a)-1] == 0xFF {
return a + "\x00"
}
before := []byte(key)
before[len(before)-1]--
return ObjectKey(append(before, 0xFF))
key := []byte(a)
key[len(key)-1]++
return ObjectKey(key)
}
// lessKey returns whether a < b.

View File

@ -8,6 +8,8 @@ import (
"testing"
"time"
"github.com/stretchr/testify/require"
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/common/uuid"
@ -345,6 +347,61 @@ func TestIterateObjects(t *testing.T) {
Result: nil,
}.Check(ctx, t, db)
})
t.Run("boundaries", func(t *testing.T) {
defer DeleteAll{}.Check(ctx, t, db)
projectID, bucketName := uuid.UUID{1}, "bucky"
queries := []metabase.ObjectKey{""}
for a := 0; a <= 0xFF; a++ {
if 5 < a && a < 250 {
continue
}
queries = append(queries, metabase.ObjectKey([]byte{byte(a)}))
for b := 0; b <= 0xFF; b++ {
if 5 < b && b < 250 {
continue
}
queries = append(queries, metabase.ObjectKey([]byte{byte(a), byte(b)}))
}
}
createObjectsWithKeys(ctx, t, db, projectID, bucketName, queries[1:])
for _, cursor := range queries {
for _, prefix := range queries {
var collector IterateCollector
err := db.IterateObjectsAllVersions(ctx, metabase.IterateObjects{
ProjectID: projectID,
BucketName: bucketName,
Cursor: metabase.IterateCursor{
Key: cursor,
Version: -1,
},
Prefix: prefix,
}, collector.Add)
require.NoError(t, err)
}
}
})
t.Run("verify-iterator-boundary", func(t *testing.T) {
defer DeleteAll{}.Check(ctx, t, db)
projectID, bucketName := uuid.UUID{1}, "bucky"
queries := []metabase.ObjectKey{"\x00\xFF"}
createObjectsWithKeys(ctx, t, db, projectID, bucketName, queries)
var collector IterateCollector
err := db.IterateObjectsAllVersions(ctx, metabase.IterateObjects{
ProjectID: projectID,
BucketName: bucketName,
Cursor: metabase.IterateCursor{
Key: metabase.ObjectKey([]byte{}),
Version: -1,
},
Prefix: metabase.ObjectKey([]byte{1}),
}, collector.Add)
require.NoError(t, err)
})
})
}
@ -880,8 +937,79 @@ func TestIterateObjectsWithStatus(t *testing.T) {
),
}.Check(ctx, t, db)
})
t.Run("boundaries", func(t *testing.T) {
defer DeleteAll{}.Check(ctx, t, db)
projectID, bucketName := uuid.UUID{1}, "bucky"
queries := []metabase.ObjectKey{""}
for a := 0; a <= 0xFF; a++ {
if 5 < a && a < 250 {
continue
}
queries = append(queries, metabase.ObjectKey([]byte{byte(a)}))
for b := 0; b <= 0xFF; b++ {
if 5 < b && b < 250 {
continue
}
queries = append(queries, metabase.ObjectKey([]byte{byte(a), byte(b)}))
}
}
createObjectsWithKeys(ctx, t, db, projectID, bucketName, queries[1:])
for _, cursor := range queries {
for _, prefix := range queries {
var collector IterateCollector
err := db.IterateObjectsAllVersionsWithStatus(ctx, metabase.IterateObjectsWithStatus{
ProjectID: projectID,
BucketName: bucketName,
Cursor: metabase.IterateCursor{
Key: cursor,
Version: -1,
},
Prefix: prefix,
Status: metabase.Committed,
}, collector.Add)
require.NoError(t, err)
err = db.IterateObjectsAllVersionsWithStatus(ctx, metabase.IterateObjectsWithStatus{
ProjectID: projectID,
BucketName: bucketName,
Cursor: metabase.IterateCursor{
Key: cursor,
Version: -1,
},
Prefix: prefix,
Recursive: true,
Status: metabase.Committed,
}, collector.Add)
require.NoError(t, err)
}
}
})
t.Run("verify-iterator-boundary", func(t *testing.T) {
defer DeleteAll{}.Check(ctx, t, db)
projectID, bucketName := uuid.UUID{1}, "bucky"
queries := []metabase.ObjectKey{"\x00\xFF"}
createObjectsWithKeys(ctx, t, db, projectID, bucketName, queries)
var collector IterateCollector
err := db.IterateObjectsAllVersionsWithStatus(ctx, metabase.IterateObjectsWithStatus{
ProjectID: projectID,
BucketName: bucketName,
Cursor: metabase.IterateCursor{
Key: metabase.ObjectKey([]byte{}),
Version: -1,
},
Prefix: metabase.ObjectKey([]byte{1}),
Status: metabase.Committed,
}, collector.Add)
require.NoError(t, err)
})
})
}
func TestIteratePendingObjectsWithObjectKey(t *testing.T) {
All(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) {
obj := randObjectStream()

View File

@ -9,38 +9,21 @@ import (
"github.com/stretchr/testify/require"
)
func TestNextPrefix(t *testing.T) {
func TestPrefixLimit(t *testing.T) {
unchanged := ObjectKey("unchanged")
_ = nextPrefix(unchanged)
_ = prefixLimit(unchanged)
require.Equal(t, ObjectKey("unchanged"), unchanged)
tests := []struct{ in, exp ObjectKey }{
{"", ""},
{"a", "b"},
{"\xF1", "\xF2"},
{"\xFF", "\xFF\x00"},
}
for _, test := range tests {
require.Equal(t, test.exp, nextPrefix(test.in))
require.Equal(t, test.exp, prefixLimit(test.in))
if test.in != "" {
require.True(t, lessKey(test.in, test.exp))
}
}
}
func TestBeforeKey(t *testing.T) {
unchanged := ObjectKey("unchanged")
_ = beforeKey(unchanged)
require.Equal(t, ObjectKey("unchanged"), unchanged)
tests := []struct{ in, exp ObjectKey }{
{"", ""},
{"b", "a\xFF"},
{"\xF1", "\xF0\xFF"},
}
for _, test := range tests {
require.Equal(t, test.exp, beforeKey(test.in))
if test.in != "" {
require.True(t, lessKey(test.exp, test.in))
}
}
}