satellite/metainfo/metabase: add prefix to iterator

Change-Id: I2dd7b7917aed9def0be3f131ca60ab12f2077d83
Egon Elbre 2020-11-16 16:02:11 +02:00
parent 7aba265db0
commit 402cfcb7c6
4 changed files with 315 additions and 40 deletions
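
For context, a minimal sketch of what the new Prefix option enables, assuming the (*DB).IterateObjectsAllVersions entry point and the IterateObjects / ObjectsIterator / ObjectEntry types visible in the hunks below; the helper name listPrefixed and the bucket/prefix values are illustrative only:

package example

import (
	"context"

	"storj.io/common/uuid"
	"storj.io/storj/satellite/metainfo/metabase"
)

// listPrefixed collects all committed objects under the "b/" prefix of a bucket.
func listPrefixed(ctx context.Context, db *metabase.DB, projectID uuid.UUID) ([]metabase.ObjectEntry, error) {
	var entries []metabase.ObjectEntry
	err := db.IterateObjectsAllVersions(ctx, metabase.IterateObjects{
		ProjectID:  projectID,
		BucketName: "bucky",
		Recursive:  true,
		Status:     metabase.Committed,
		Prefix:     "b/", // newly supported: only keys starting with "b/" are visited
	}, func(ctx context.Context, it metabase.ObjectsIterator) error {
		var entry metabase.ObjectEntry
		for it.Next(ctx, &entry) {
			entries = append(entries, entry)
		}
		return nil
	})
	return entries, err
}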

View File

@ -4,40 +4,57 @@
package metabase
import (
"bytes"
"context"
"github.com/zeebo/errs"
"storj.io/common/uuid"
"storj.io/storj/private/tagsql"
)
// objectsIterator enables iteration on objects in a bucket.
type objectsIterator struct {
opts *IterateObjects
db *DB
batchSize int
curIndex int
curRows tagsql.Rows
status ObjectStatus
cursor IterateCursor
db *DB
projectID uuid.UUID
bucketName string
status ObjectStatus
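// limitKey is the exclusive upper bound on object_key, derived from the
// prefix via nextPrefix; it is empty when no prefix is specified.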
limitKey ObjectKey
batchSize int
curIndex int
curRows tagsql.Rows
cursor IterateCursor
}
func iterateAllVersions(ctx context.Context, db *DB, opts IterateObjects, fn func(context.Context, ObjectsIterator) error) (err error) {
defer mon.Task()(&ctx)(&err)
it := &objectsIterator{
db: db,
opts: &opts,
batchSize: opts.BatchSize,
curIndex: 0,
status: opts.Status,
cursor: opts.Cursor,
db: db,
projectID: opts.ProjectID,
bucketName: opts.BucketName,
status: opts.Status,
limitKey: nextPrefix(opts.Prefix),
batchSize: opts.BatchSize,
curIndex: 0,
cursor: opts.Cursor,
}
// ensure batch size is reasonable
if it.batchSize <= 0 || it.batchSize > batchsizeLimit {
it.batchSize = batchsizeLimit
}
// start from either the cursor or prefix, depending on which is larger
if lessKey(it.cursor.Key, opts.Prefix) {
it.cursor.Key = beforeKey(opts.Prefix)
it.cursor.Version = -1
}
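// For example: with opts.Prefix = "b/" and an empty cursor, lessKey("", "b/")
// is true, so the cursor becomes beforeKey("b/") = "b.\xff" with version -1,
// which lets the key "b/" itself be returned; limitKey is
// nextPrefix("b/") = "b0", the exclusive upper bound applied in doNextQuery below.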
it.curRows, err = it.doNextQuery(ctx)
if err != nil {
return err
@ -87,8 +104,8 @@ func (it *objectsIterator) Next(ctx context.Context, item *ObjectEntry) bool {
return false
}
item.ProjectID = it.opts.ProjectID
item.BucketName = it.opts.BucketName
item.ProjectID = it.projectID
item.BucketName = it.bucketName
it.curIndex++
it.cursor.Key = item.ObjectKey
@ -97,9 +114,33 @@ func (it *objectsIterator) Next(ctx context.Context, item *ObjectEntry) bool {
return true
}
// doNextQuery executes the query that fetches the next batch of rows.
func (it *objectsIterator) doNextQuery(ctx context.Context) (_ tagsql.Rows, err error) {
defer mon.Task()(&ctx)(&err)
if it.limitKey == "" {
return it.db.db.Query(ctx, `
SELECT
object_key, stream_id, version, status,
created_at, expires_at,
segment_count,
encrypted_metadata_nonce, encrypted_metadata,
total_encrypted_size, fixed_segment_size,
encryption
FROM objects
WHERE
project_id = $1 AND bucket_name = $2
AND status = $3
AND (object_key, version) > ($4, $5)
ORDER BY object_key ASC, version ASC
LIMIT $6
`, it.projectID, it.bucketName,
it.status,
[]byte(it.cursor.Key), int(it.cursor.Version),
it.batchSize,
)
}
return it.db.db.Query(ctx, `
SELECT
object_key, stream_id, version, status,
@ -113,11 +154,18 @@ func (it *objectsIterator) doNextQuery(ctx context.Context) (_ tagsql.Rows, err
project_id = $1 AND bucket_name = $2
AND status = $3
AND (object_key, version) > ($4, $5)
AND object_key < $6
ORDER BY object_key ASC, version ASC
LIMIT $6
`, it.opts.ProjectID, it.opts.BucketName, it.status, []byte(it.cursor.Key), int(it.cursor.Version), it.opts.BatchSize)
LIMIT $7
`, it.projectID, it.bucketName,
it.status,
[]byte(it.cursor.Key), int(it.cursor.Version),
[]byte(it.limitKey),
it.batchSize,
)
}
// scanItem scans doNextQuery results into ObjectEntry.
func (it *objectsIterator) scanItem(item *ObjectEntry) error {
return it.curRows.Scan(
&item.ObjectKey, &item.StreamID, &item.Version, &item.Status,
@ -128,3 +176,29 @@ func (it *objectsIterator) scanItem(item *ObjectEntry) error {
encryptionParameters{&item.Encryption},
)
}
// nextPrefix returns the smallest key that sorts after every key starting with
// the given prefix, for use as an exclusive upper bound (e.g. nextPrefix("b/") == "b0").
func nextPrefix(key ObjectKey) ObjectKey {
if key == "" {
return ""
}
after := []byte(key)
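// Note: if the last byte is already 0xFF, the increment below wraps it to 0x00
// and the result would sort before the prefix; prefixes are assumed not to end in 0xFF.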
after[len(after)-1]++
return ObjectKey(after)
}
// beforeKey returns a key that sorts just before the given key, so that
// iteration starting strictly after it still includes the key itself.
func beforeKey(key ObjectKey) ObjectKey {
if key == "" {
return ""
}
before := []byte(key)
before[len(before)-1]--
return ObjectKey(append(before, 0xFF))
}
// lessKey returns whether a < b.
func lessKey(a, b ObjectKey) bool {
return bytes.Compare([]byte(a), []byte(b)) < 0
}

View File

@ -0,0 +1,46 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package metabase
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestNextPrefix(t *testing.T) {
unchanged := ObjectKey("unchanged")
_ = nextPrefix(unchanged)
require.Equal(t, ObjectKey("unchanged"), unchanged)
tests := []struct{ in, exp ObjectKey }{
{"", ""},
{"a", "b"},
{"\xF1", "\xF2"},
}
for _, test := range tests {
require.Equal(t, test.exp, nextPrefix(test.in))
if test.in != "" {
require.True(t, lessKey(test.in, test.exp))
}
}
}
func TestBeforeKey(t *testing.T) {
unchanged := ObjectKey("unchanged")
_ = beforeKey(unchanged)
require.Equal(t, ObjectKey("unchanged"), unchanged)
tests := []struct{ in, exp ObjectKey }{
{"", ""},
{"b", "a\xFF"},
{"\xF1", "\xF0\xFF"},
}
for _, test := range tests {
require.Equal(t, test.exp, beforeKey(test.in))
if test.in != "" {
require.True(t, lessKey(test.exp, test.in))
}
}
}

View File

@ -46,14 +46,12 @@ func (db *DB) IterateObjectsAllVersions(ctx context.Context, opts IterateObjects
// Verify verifies the fields of an iterate objects request.
func (opts *IterateObjects) Verify() error {
switch {
case opts.BucketName == "":
return ErrInvalidRequest.New("BucketName missing")
case opts.ProjectID.IsZero():
return ErrInvalidRequest.New("ProjectID missing")
case opts.BucketName == "":
return ErrInvalidRequest.New("BucketName missing")
case !opts.Recursive:
return ErrInvalidRequest.New("non-recursive listing not implemented yet")
case opts.Prefix != "":
return ErrInvalidRequest.New("prefixed listing not implemented yet")
case opts.BatchSize < 0:
return ErrInvalidRequest.New("BatchSize is negative")
case !(opts.Status == Pending || opts.Status == Committed):

View File

@ -17,6 +17,20 @@ func TestIterateObjects(t *testing.T) {
All(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) {
t.Run("BucketName missing", func(t *testing.T) {
defer DeleteAll{}.Check(ctx, t, db)
t.Run("ProjectID missing", func(t *testing.T) {
defer DeleteAll{}.Check(ctx, t, db)
IterateObjects{
Opts: metabase.IterateObjects{
ProjectID: uuid.UUID{},
BucketName: "sj://mybucket",
Recursive: true,
Status: metabase.Committed,
},
ErrClass: &metabase.ErrInvalidRequest,
ErrText: "ProjectID missing",
}.Check(ctx, t, db)
Verify{}.Check(ctx, t, db)
})
IterateObjects{
Opts: metabase.IterateObjects{
ProjectID: uuid.UUID{1},
@ -29,20 +43,6 @@ func TestIterateObjects(t *testing.T) {
}.Check(ctx, t, db)
Verify{}.Check(ctx, t, db)
})
t.Run("ProjectID missing", func(t *testing.T) {
defer DeleteAll{}.Check(ctx, t, db)
IterateObjects{
Opts: metabase.IterateObjects{
ProjectID: uuid.UUID{},
BucketName: "sj://mybucket",
Recursive: true,
Status: metabase.Committed,
},
ErrClass: &metabase.ErrInvalidRequest,
ErrText: "ProjectID missing",
}.Check(ctx, t, db)
Verify{}.Check(ctx, t, db)
})
t.Run("Limit is negative", func(t *testing.T) {
defer DeleteAll{}.Check(ctx, t, db)
IterateObjects{
@ -201,7 +201,6 @@ func TestIterateObjects(t *testing.T) {
t.Run("objects in one bucket in project with 2 buckets", func(t *testing.T) {
defer DeleteAll{}.Check(ctx, t, db)
numberOfObjectsPerBucket := 5
batchSize := 10
expected := make([]metabase.ObjectEntry, numberOfObjectsPerBucket)
objectsBucketA := createObjects(ctx, t, db, numberOfObjectsPerBucket, uuid.UUID{1}, "bucket-a")
objectsBucketB := createObjects(ctx, t, db, numberOfObjectsPerBucket, uuid.UUID{1}, "bucket-b")
@ -213,7 +212,6 @@ func TestIterateObjects(t *testing.T) {
ProjectID: uuid.UUID{1},
BucketName: "bucket-a",
Recursive: true,
BatchSize: batchSize,
Status: metabase.Committed,
},
Result: expected,
@ -224,7 +222,6 @@ func TestIterateObjects(t *testing.T) {
t.Run("objects in one bucket with same bucketName in another project", func(t *testing.T) {
defer DeleteAll{}.Check(ctx, t, db)
numberOfObjectsPerBucket := 5
batchSize := 10
expected := make([]metabase.ObjectEntry, numberOfObjectsPerBucket)
objectsProject1 := createObjects(ctx, t, db, numberOfObjectsPerBucket, uuid.UUID{1}, "mybucket")
objectsProject2 := createObjects(ctx, t, db, numberOfObjectsPerBucket, uuid.UUID{2}, "mybucket")
@ -236,18 +233,156 @@ func TestIterateObjects(t *testing.T) {
ProjectID: uuid.UUID{1},
BucketName: "mybucket",
Recursive: true,
BatchSize: batchSize,
Status: metabase.Committed,
},
Result: expected,
}.Check(ctx, t, db)
Verify{Objects: append(objectsProject1, objectsProject2...)}.Check(ctx, t, db)
})
t.Run("options", func(t *testing.T) {
projectID, bucketName := uuid.UUID{1}, "bucky"
objects := createObjectsWithKeys(ctx, t, db, projectID, bucketName, []metabase.ObjectKey{
"a",
"b/1",
"b/2",
"b/3",
"c",
"c/",
"c//",
"c/1",
"g",
})
IterateObjects{
Opts: metabase.IterateObjects{
ProjectID: projectID,
BucketName: bucketName,
Recursive: true,
Status: metabase.Committed,
},
Result: []metabase.ObjectEntry{
objects["a"],
objects["b/1"],
objects["b/2"],
objects["b/3"],
objects["c"],
objects["c/"],
objects["c//"],
objects["c/1"],
objects["g"],
},
}.Check(ctx, t, db)
IterateObjects{
Opts: metabase.IterateObjects{
ProjectID: projectID,
BucketName: bucketName,
Recursive: true,
Status: metabase.Committed,
Cursor: metabase.IterateCursor{Key: "a", Version: 10},
},
Result: []metabase.ObjectEntry{
objects["b/1"],
objects["b/2"],
objects["b/3"],
objects["c"],
objects["c/"],
objects["c//"],
objects["c/1"],
objects["g"],
},
}.Check(ctx, t, db)
IterateObjects{
Opts: metabase.IterateObjects{
ProjectID: projectID,
BucketName: bucketName,
Recursive: true,
Status: metabase.Committed,
Cursor: metabase.IterateCursor{Key: "b", Version: 0},
},
Result: []metabase.ObjectEntry{
objects["b/1"],
objects["b/2"],
objects["b/3"],
objects["c"],
objects["c/"],
objects["c//"],
objects["c/1"],
objects["g"],
},
}.Check(ctx, t, db)
IterateObjects{
Opts: metabase.IterateObjects{
ProjectID: projectID,
BucketName: bucketName,
Recursive: true,
Status: metabase.Committed,
Prefix: "b/",
},
Result: []metabase.ObjectEntry{
objects["b/1"],
objects["b/2"],
objects["b/3"],
},
}.Check(ctx, t, db)
IterateObjects{
Opts: metabase.IterateObjects{
ProjectID: projectID,
BucketName: bucketName,
Recursive: true,
Status: metabase.Committed,
Prefix: "b/",
Cursor: metabase.IterateCursor{Key: "a"},
},
Result: []metabase.ObjectEntry{
objects["b/1"],
objects["b/2"],
objects["b/3"],
},
}.Check(ctx, t, db)
IterateObjects{
Opts: metabase.IterateObjects{
ProjectID: projectID,
BucketName: bucketName,
Recursive: true,
Status: metabase.Committed,
Prefix: "b/",
Cursor: metabase.IterateCursor{Key: "b/2", Version: -3},
},
Result: []metabase.ObjectEntry{
objects["b/2"],
objects["b/3"],
},
}.Check(ctx, t, db)
IterateObjects{
Opts: metabase.IterateObjects{
ProjectID: projectID,
BucketName: bucketName,
Recursive: true,
Status: metabase.Committed,
Prefix: "b/",
Cursor: metabase.IterateCursor{Key: "c/"},
},
Result: nil,
}.Check(ctx, t, db)
})
})
}
func createObjects(ctx *testcontext.Context, t *testing.T, db *metabase.DB, numberOfObjects int, projectID uuid.UUID, bucketName string) []metabase.RawObject {
objects := make([]metabase.RawObject, numberOfObjects)
for i := 0; i < numberOfObjects; i++ {
obj := randObjectStream()
@ -269,3 +404,25 @@ func createObjects(ctx *testcontext.Context, t *testing.T, db *metabase.DB, numb
})
return objects
}
func createObjectsWithKeys(ctx *testcontext.Context, t *testing.T, db *metabase.DB, projectID uuid.UUID, bucketName string, keys []metabase.ObjectKey) map[metabase.ObjectKey]metabase.ObjectEntry {
objects := make(map[metabase.ObjectKey]metabase.ObjectEntry, len(keys))
for _, key := range keys {
obj := randObjectStream()
obj.ProjectID = projectID
obj.BucketName = bucketName
obj.ObjectKey = key
now := time.Now()
createObject(ctx, t, db, obj, 0)
objects[key] = metabase.ObjectEntry{
ObjectStream: obj,
CreatedAt: now,
Status: metabase.Committed,
Encryption: defaultTestEncryption,
}
}
return objects
}