satellite/metabase: fix listing prefixes with cursor set

We were not skipping the initial prefix from the cursor.

Change-Id: I2bb472e960b92cae77fd1226de0b26fac79c429b
This commit is contained in:
Michał Niewrzał 2021-06-25 16:01:12 +02:00 committed by Egon Elbre
parent 8f4505f532
commit 2e9d3a737c
4 changed files with 378 additions and 31 deletions

View File

@ -26,12 +26,11 @@ type objectsIterator struct {
batchSize int batchSize int
recursive bool recursive bool
curIndex int curIndex int
curRows tagsql.Rows curRows tagsql.Rows
cursor iterateCursor cursor iterateCursor // not relative to prefix
inclusiveCursor bool
skipPrefix ObjectKey skipPrefix ObjectKey // relative to prefix
doNextQuery func(context.Context, *objectsIterator) (_ tagsql.Rows, err error) doNextQuery func(context.Context, *objectsIterator) (_ tagsql.Rows, err error)
// failErr is set when either scan or next query fails during iteration. // failErr is set when either scan or next query fails during iteration.
@ -39,9 +38,10 @@ type objectsIterator struct {
} }
type iterateCursor struct { type iterateCursor struct {
Key ObjectKey Key ObjectKey
Version Version Version Version
StreamID uuid.UUID StreamID uuid.UUID
Inclusive bool
} }
func iterateAllVersions(ctx context.Context, db *DB, opts IterateObjects, fn func(context.Context, ObjectsIterator) error) (err error) { func iterateAllVersions(ctx context.Context, db *DB, opts IterateObjects, fn func(context.Context, ObjectsIterator) error) (err error) {
@ -50,19 +50,15 @@ func iterateAllVersions(ctx context.Context, db *DB, opts IterateObjects, fn fun
it := &objectsIterator{ it := &objectsIterator{
db: db, db: db,
projectID: opts.ProjectID, projectID: opts.ProjectID,
bucketName: []byte(opts.BucketName), bucketName: []byte(opts.BucketName),
prefix: opts.Prefix, prefix: opts.Prefix,
prefixLimit: prefixLimit(opts.Prefix), prefixLimit: prefixLimit(opts.Prefix),
batchSize: opts.BatchSize, batchSize: opts.BatchSize,
recursive: true, recursive: true,
inclusiveCursor: false,
curIndex: 0, curIndex: 0,
cursor: iterateCursor{ cursor: firstIterateCursor(true, opts.Cursor, opts.Prefix),
Key: opts.Cursor.Key,
Version: opts.Cursor.Version,
},
doNextQuery: doNextQueryAllVersionsWithoutStatus, doNextQuery: doNextQueryAllVersionsWithoutStatus,
} }
@ -70,7 +66,7 @@ func iterateAllVersions(ctx context.Context, db *DB, opts IterateObjects, fn fun
if lessKey(it.cursor.Key, opts.Prefix) { if lessKey(it.cursor.Key, opts.Prefix) {
it.cursor.Key = opts.Prefix it.cursor.Key = opts.Prefix
it.cursor.Version = -1 it.cursor.Version = -1
it.inclusiveCursor = true it.cursor.Inclusive = true
} }
return iterate(ctx, it, fn) return iterate(ctx, it, fn)
@ -91,10 +87,8 @@ func iterateAllVersionsWithStatus(ctx context.Context, db *DB, opts IterateObjec
recursive: opts.Recursive, recursive: opts.Recursive,
curIndex: 0, curIndex: 0,
cursor: iterateCursor{ cursor: firstIterateCursor(opts.Recursive, opts.Cursor, opts.Prefix),
Key: opts.Cursor.Key,
Version: opts.Cursor.Version,
},
doNextQuery: doNextQueryAllVersionsWithStatus, doNextQuery: doNextQueryAllVersionsWithStatus,
} }
@ -102,7 +96,7 @@ func iterateAllVersionsWithStatus(ctx context.Context, db *DB, opts IterateObjec
if lessKey(it.cursor.Key, opts.Prefix) { if lessKey(it.cursor.Key, opts.Prefix) {
it.cursor.Key = opts.Prefix it.cursor.Key = opts.Prefix
it.cursor.Version = -1 it.cursor.Version = -1
it.inclusiveCursor = true it.cursor.Inclusive = true
} }
return iterate(ctx, it, fn) return iterate(ctx, it, fn)
@ -150,7 +144,7 @@ func iterate(ctx context.Context, it *objectsIterator, fn func(context.Context,
if err != nil { if err != nil {
return err return err
} }
it.inclusiveCursor = false it.cursor.Inclusive = false
defer func() { defer func() {
if rowsErr := it.curRows.Err(); rowsErr != nil { if rowsErr := it.curRows.Err(); rowsErr != nil {
@ -256,7 +250,7 @@ func doNextQueryAllVersionsWithoutStatus(ctx context.Context, it *objectsIterato
defer mon.Task()(&ctx)(&err) defer mon.Task()(&ctx)(&err)
cursorCompare := ">" cursorCompare := ">"
if it.inclusiveCursor { if it.cursor.Inclusive {
cursorCompare = ">=" cursorCompare = ">="
} }
@ -311,7 +305,7 @@ func doNextQueryAllVersionsWithStatus(ctx context.Context, it *objectsIterator)
defer mon.Task()(&ctx)(&err) defer mon.Task()(&ctx)(&err)
cursorCompare := ">" cursorCompare := ">"
if it.inclusiveCursor { if it.cursor.Inclusive {
cursorCompare = ">=" cursorCompare = ">="
} }
@ -433,3 +427,46 @@ func prefixLimit(a ObjectKey) ObjectKey {
func lessKey(a, b ObjectKey) bool { func lessKey(a, b ObjectKey) bool {
return bytes.Compare([]byte(a), []byte(b)) < 0 return bytes.Compare([]byte(a), []byte(b)) < 0
} }
// firstIterateCursor adjust the cursor for a non-recursive iteration.
// The cursor is non-inclusive and we need to adjust to handle prefix as cursor properly.
// We return the next possible key from the prefix.
func firstIterateCursor(recursive bool, cursor IterateCursor, prefix ObjectKey) iterateCursor {
if recursive {
return iterateCursor{
Key: cursor.Key,
Version: cursor.Version,
}
}
// when the cursor does not match the prefix, we'll return the original cursor.
if !strings.HasPrefix(string(cursor.Key), string(prefix)) {
return iterateCursor{
Key: cursor.Key,
Version: cursor.Version,
}
}
// handle case where:
// prefix: x/y/
// cursor: x/y/z/w
// In this case, we want the skip prefix to be `x/y/z` + string('/' + 1).
cursorWithoutPrefix := cursor.Key[len(prefix):]
p := strings.IndexByte(string(cursorWithoutPrefix), Delimiter)
if p < 0 {
// The cursor is not a prefix, but instead a path inside the prefix,
// so we can use it directly.
return iterateCursor{
Key: cursor.Key,
Version: cursor.Version,
}
}
// return the next prefix given a scoped path
return iterateCursor{
Key: cursor.Key[:len(prefix)+p] + ObjectKey(Delimiter+1),
Version: -1,
Inclusive: true,
}
}

View File

@ -256,6 +256,45 @@ func TestIterateObjects(t *testing.T) {
}, },
}.Check(ctx, t, db) }.Check(ctx, t, db)
metabasetest.IterateObjects{
Opts: metabase.IterateObjects{
ProjectID: projectID,
BucketName: bucketName,
Cursor: metabase.IterateCursor{Key: "a", Version: 0},
},
Result: []metabase.ObjectEntry{
objects["a"],
objects["b/1"],
objects["b/2"],
objects["b/3"],
objects["c"],
objects["c/"],
objects["c//"],
objects["c/1"],
objects["g"],
},
}.Check(ctx, t, db)
metabasetest.IterateObjects{
Opts: metabase.IterateObjects{
ProjectID: projectID,
BucketName: bucketName,
Cursor: metabase.IterateCursor{Key: "a", Version: 1},
},
Result: []metabase.ObjectEntry{
objects["b/1"],
objects["b/2"],
objects["b/3"],
objects["c"],
objects["c/"],
objects["c//"],
objects["c/1"],
objects["g"],
},
}.Check(ctx, t, db)
metabasetest.IterateObjects{ metabasetest.IterateObjects{
Opts: metabase.IterateObjects{ Opts: metabase.IterateObjects{
ProjectID: projectID, ProjectID: projectID,
@ -628,12 +667,16 @@ func TestIterateObjectsWithStatus(t *testing.T) {
t.Run("objects in one bucket in project with 2 buckets", func(t *testing.T) { t.Run("objects in one bucket in project with 2 buckets", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db) defer metabasetest.DeleteAll{}.Check(ctx, t, db)
numberOfObjectsPerBucket := 5 numberOfObjectsPerBucket := 5
expected := make([]metabase.ObjectEntry, numberOfObjectsPerBucket) expected := make([]metabase.ObjectEntry, numberOfObjectsPerBucket)
objectsBucketA := createObjects(ctx, t, db, numberOfObjectsPerBucket, uuid.UUID{1}, "bucket-a") objectsBucketA := createObjects(ctx, t, db, numberOfObjectsPerBucket, uuid.UUID{1}, "bucket-a")
objectsBucketB := createObjects(ctx, t, db, numberOfObjectsPerBucket, uuid.UUID{1}, "bucket-b") objectsBucketB := createObjects(ctx, t, db, numberOfObjectsPerBucket, uuid.UUID{1}, "bucket-b")
for i, obj := range objectsBucketA { for i, obj := range objectsBucketA {
expected[i] = objectEntryFromRaw(obj) expected[i] = objectEntryFromRaw(obj)
} }
metabasetest.IterateObjectsWithStatus{ metabasetest.IterateObjectsWithStatus{
Opts: metabase.IterateObjectsWithStatus{ Opts: metabase.IterateObjectsWithStatus{
ProjectID: uuid.UUID{1}, ProjectID: uuid.UUID{1},
@ -643,18 +686,24 @@ func TestIterateObjectsWithStatus(t *testing.T) {
}, },
Result: expected, Result: expected,
}.Check(ctx, t, db) }.Check(ctx, t, db)
metabasetest.Verify{Objects: append(objectsBucketA, objectsBucketB...)}.Check(ctx, t, db)
metabasetest.Verify{
Objects: append(objectsBucketA, objectsBucketB...),
}.Check(ctx, t, db)
}) })
t.Run("objects in one bucket with same bucketName in another project", func(t *testing.T) { t.Run("objects in one bucket with same bucketName in another project", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db) defer metabasetest.DeleteAll{}.Check(ctx, t, db)
numberOfObjectsPerBucket := 5 numberOfObjectsPerBucket := 5
expected := make([]metabase.ObjectEntry, numberOfObjectsPerBucket) expected := make([]metabase.ObjectEntry, numberOfObjectsPerBucket)
objectsProject1 := createObjects(ctx, t, db, numberOfObjectsPerBucket, uuid.UUID{1}, "mybucket") objectsProject1 := createObjects(ctx, t, db, numberOfObjectsPerBucket, uuid.UUID{1}, "mybucket")
objectsProject2 := createObjects(ctx, t, db, numberOfObjectsPerBucket, uuid.UUID{2}, "mybucket") objectsProject2 := createObjects(ctx, t, db, numberOfObjectsPerBucket, uuid.UUID{2}, "mybucket")
for i, obj := range objectsProject1 { for i, obj := range objectsProject1 {
expected[i] = objectEntryFromRaw(obj) expected[i] = objectEntryFromRaw(obj)
} }
metabasetest.IterateObjectsWithStatus{ metabasetest.IterateObjectsWithStatus{
Opts: metabase.IterateObjectsWithStatus{ Opts: metabase.IterateObjectsWithStatus{
ProjectID: uuid.UUID{1}, ProjectID: uuid.UUID{1},
@ -664,7 +713,10 @@ func TestIterateObjectsWithStatus(t *testing.T) {
}, },
Result: expected, Result: expected,
}.Check(ctx, t, db) }.Check(ctx, t, db)
metabasetest.Verify{Objects: append(objectsProject1, objectsProject2...)}.Check(ctx, t, db)
metabasetest.Verify{
Objects: append(objectsProject1, objectsProject2...),
}.Check(ctx, t, db)
}) })
t.Run("recursive", func(t *testing.T) { t.Run("recursive", func(t *testing.T) {
@ -1052,6 +1104,228 @@ func TestIterateObjectsWithStatus(t *testing.T) {
}) })
} }
func TestIterateObjectsSkipCursor(t *testing.T) {
metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) {
projectID, bucketName := uuid.UUID{1}, "bucky"
t.Run("no prefix", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
createObjectsWithKeys(ctx, t, db, projectID, bucketName, []metabase.ObjectKey{
"08/test",
"09/test",
"10/test",
})
metabasetest.IterateObjectsWithStatus{
Opts: metabase.IterateObjectsWithStatus{
ProjectID: projectID,
BucketName: bucketName,
Recursive: false,
Prefix: "",
Cursor: metabase.IterateCursor{
Key: metabase.ObjectKey("08/"),
Version: 1,
},
Status: metabase.Committed,
},
Result: []metabase.ObjectEntry{
prefixEntry(metabase.ObjectKey("09/"), metabase.Committed),
prefixEntry(metabase.ObjectKey("10/"), metabase.Committed),
},
}.Check(ctx, t, db)
metabasetest.IterateObjectsWithStatus{
Opts: metabase.IterateObjectsWithStatus{
ProjectID: projectID,
BucketName: bucketName,
Recursive: false,
Prefix: "",
Cursor: metabase.IterateCursor{
Key: metabase.ObjectKey("08"),
Version: 1,
},
Status: metabase.Committed,
},
Result: []metabase.ObjectEntry{
prefixEntry(metabase.ObjectKey("08/"), metabase.Committed),
prefixEntry(metabase.ObjectKey("09/"), metabase.Committed),
prefixEntry(metabase.ObjectKey("10/"), metabase.Committed),
},
}.Check(ctx, t, db)
metabasetest.IterateObjectsWithStatus{
Opts: metabase.IterateObjectsWithStatus{
ProjectID: projectID,
BucketName: bucketName,
Recursive: false,
Prefix: "",
Cursor: metabase.IterateCursor{
Key: metabase.ObjectKey("08/a/x"),
Version: 1,
},
Status: metabase.Committed,
},
Result: []metabase.ObjectEntry{
prefixEntry(metabase.ObjectKey("09/"), metabase.Committed),
prefixEntry(metabase.ObjectKey("10/"), metabase.Committed),
},
}.Check(ctx, t, db)
})
t.Run("prefix", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
createObjectsWithKeys(ctx, t, db, projectID, bucketName, []metabase.ObjectKey{
"2017/05/08/test",
"2017/05/09/test",
"2017/05/10/test",
})
metabasetest.IterateObjectsWithStatus{
Opts: metabase.IterateObjectsWithStatus{
ProjectID: projectID,
BucketName: bucketName,
Recursive: false,
Prefix: metabase.ObjectKey("2017/05/"),
Cursor: metabase.IterateCursor{
Key: metabase.ObjectKey("2017/05/08"),
Version: 1,
},
Status: metabase.Committed,
},
Result: []metabase.ObjectEntry{
prefixEntry(metabase.ObjectKey("08/"), metabase.Committed),
prefixEntry(metabase.ObjectKey("09/"), metabase.Committed),
prefixEntry(metabase.ObjectKey("10/"), metabase.Committed),
},
}.Check(ctx, t, db)
metabasetest.IterateObjectsWithStatus{
Opts: metabase.IterateObjectsWithStatus{
ProjectID: projectID,
BucketName: bucketName,
Recursive: false,
Prefix: metabase.ObjectKey("2017/05/"),
Cursor: metabase.IterateCursor{
Key: metabase.ObjectKey("2017/05/08/"),
Version: 1,
},
Status: metabase.Committed,
},
Result: []metabase.ObjectEntry{
prefixEntry(metabase.ObjectKey("09/"), metabase.Committed),
prefixEntry(metabase.ObjectKey("10/"), metabase.Committed),
},
}.Check(ctx, t, db)
metabasetest.IterateObjectsWithStatus{
Opts: metabase.IterateObjectsWithStatus{
ProjectID: projectID,
BucketName: bucketName,
Recursive: false,
Prefix: metabase.ObjectKey("2017/05/"),
Cursor: metabase.IterateCursor{
Key: metabase.ObjectKey("2017/05/08/a/x"),
Version: 1,
},
Status: metabase.Committed,
},
Result: []metabase.ObjectEntry{
prefixEntry(metabase.ObjectKey("09/"), metabase.Committed),
prefixEntry(metabase.ObjectKey("10/"), metabase.Committed),
},
}.Check(ctx, t, db)
})
t.Run("batch-size", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
afterDelimiter := metabase.ObjectKey(metabase.Delimiter + 1)
objects := createObjectsWithKeys(ctx, t, db, projectID, bucketName, []metabase.ObjectKey{
"2017/05/08",
"2017/05/08/a",
"2017/05/08/b",
"2017/05/08/c",
"2017/05/08/d",
"2017/05/08/e",
"2017/05/08" + afterDelimiter,
"2017/05/09/a",
"2017/05/09/b",
"2017/05/09/c",
"2017/05/09/d",
"2017/05/09/e",
"2017/05/10/a",
"2017/05/10/b",
"2017/05/10/c",
"2017/05/10/d",
"2017/05/10/e",
})
metabasetest.IterateObjectsWithStatus{
Opts: metabase.IterateObjectsWithStatus{
ProjectID: projectID,
BucketName: bucketName,
Recursive: false,
BatchSize: 3,
Prefix: metabase.ObjectKey("2017/05/"),
Cursor: metabase.IterateCursor{
Key: metabase.ObjectKey("2017/05/08"),
Version: 1,
},
Status: metabase.Committed,
},
Result: []metabase.ObjectEntry{
prefixEntry(metabase.ObjectKey("08/"), metabase.Committed),
withoutPrefix1("2017/05/", objects["2017/05/08"+afterDelimiter]),
prefixEntry(metabase.ObjectKey("09/"), metabase.Committed),
prefixEntry(metabase.ObjectKey("10/"), metabase.Committed),
},
}.Check(ctx, t, db)
metabasetest.IterateObjectsWithStatus{
Opts: metabase.IterateObjectsWithStatus{
ProjectID: projectID,
BucketName: bucketName,
Recursive: false,
BatchSize: 3,
Prefix: metabase.ObjectKey("2017/05/"),
Cursor: metabase.IterateCursor{
Key: metabase.ObjectKey("2017/05/08/"),
Version: 1,
},
Status: metabase.Committed,
},
Result: []metabase.ObjectEntry{
withoutPrefix1("2017/05/", objects["2017/05/08"+afterDelimiter]),
prefixEntry(metabase.ObjectKey("09/"), metabase.Committed),
prefixEntry(metabase.ObjectKey("10/"), metabase.Committed),
},
}.Check(ctx, t, db)
metabasetest.IterateObjectsWithStatus{
Opts: metabase.IterateObjectsWithStatus{
ProjectID: projectID,
BucketName: bucketName,
Recursive: false,
Prefix: metabase.ObjectKey("2017/05/"),
Cursor: metabase.IterateCursor{
Key: metabase.ObjectKey("2017/05/08/a/x"),
Version: 1,
},
Status: metabase.Committed,
},
Result: []metabase.ObjectEntry{
withoutPrefix1("2017/05/", objects["2017/05/08"+afterDelimiter]),
prefixEntry(metabase.ObjectKey("09/"), metabase.Committed),
prefixEntry(metabase.ObjectKey("10/"), metabase.Committed),
},
}.Check(ctx, t, db)
})
})
}
func TestIteratePendingObjectsWithObjectKey(t *testing.T) { func TestIteratePendingObjectsWithObjectKey(t *testing.T) {
metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) { metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) {
obj := metabasetest.RandObjectStream() obj := metabasetest.RandObjectStream()
@ -1299,6 +1573,11 @@ func withoutPrefix(prefix metabase.ObjectKey, entries ...metabase.ObjectEntry) [
return xs return xs
} }
func withoutPrefix1(prefix metabase.ObjectKey, entry metabase.ObjectEntry) metabase.ObjectEntry {
entry.ObjectKey = entry.ObjectKey[len(prefix):]
return entry
}
func prefixEntry(key metabase.ObjectKey, status metabase.ObjectStatus) metabase.ObjectEntry { func prefixEntry(key metabase.ObjectKey, status metabase.ObjectStatus) metabase.ObjectEntry {
return metabase.ObjectEntry{ return metabase.ObjectEntry{
IsPrefix: true, IsPrefix: true,

View File

@ -6,6 +6,7 @@ package metabase
import ( import (
"testing" "testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
@ -27,3 +28,31 @@ func TestPrefixLimit(t *testing.T) {
} }
} }
} }
func TestFirstIterateCursor(t *testing.T) {
afterDelimiter := ObjectKey('/' + 1)
assert.Equal(t,
iterateCursor{Key: "a"},
firstIterateCursor(false, IterateCursor{Key: "a"}, ""))
assert.Equal(t,
iterateCursor{Key: "a" + afterDelimiter, Version: -1, Inclusive: true},
firstIterateCursor(false, IterateCursor{Key: "a/"}, ""))
assert.Equal(t,
iterateCursor{Key: "a" + afterDelimiter, Version: -1, Inclusive: true},
firstIterateCursor(false, IterateCursor{Key: "a/x/y"}, ""))
assert.Equal(t,
iterateCursor{Key: "a/x/y"},
firstIterateCursor(false, IterateCursor{Key: "a/x/y"}, "a/x/"))
assert.Equal(t,
iterateCursor{Key: "2017/05/08" + afterDelimiter, Version: -1, Inclusive: true},
firstIterateCursor(false, IterateCursor{Key: "2017/05/08/"}, "2017/05/"))
assert.Equal(t,
iterateCursor{Key: "2017/05/08" + afterDelimiter, Version: -1, Inclusive: true},
firstIterateCursor(false, IterateCursor{Key: "2017/05/08/x/y"}, "2017/05/"))
}

View File

@ -42,6 +42,8 @@ type ObjectsIterator interface {
} }
// IterateCursor is a cursor used during iteration through objects. // IterateCursor is a cursor used during iteration through objects.
//
// The cursor is exclusive.
type IterateCursor struct { type IterateCursor struct {
Key ObjectKey Key ObjectKey
Version Version Version Version