289ae3325a
We implemented filtering of system metadata to reduce DB load in case user will need only list of objects but we missed that encryption configuration is needed to decrypt object key. This change is always including encryption configuration into list objects results. Change-Id: Iaf7b3d8441480e3ad787f19a28f1b648584d5f7d
512 lines
12 KiB
Go
512 lines
12 KiB
Go
// Copyright (C) 2020 Storj Labs, Inc.
|
|
// See LICENSE for copying information.
|
|
|
|
package metabase
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"strings"
|
|
|
|
"github.com/zeebo/errs"
|
|
|
|
"storj.io/common/uuid"
|
|
"storj.io/private/tagsql"
|
|
)
|
|
|
|
// objectIterator enables iteration on objects in a bucket.
|
|
type objectsIterator struct {
|
|
db *DB
|
|
|
|
projectID uuid.UUID
|
|
bucketName []byte
|
|
status ObjectStatus
|
|
prefix ObjectKey
|
|
prefixLimit ObjectKey
|
|
batchSize int
|
|
recursive bool
|
|
includeCustomMetadata bool
|
|
includeSystemMetadata bool
|
|
|
|
curIndex int
|
|
curRows tagsql.Rows
|
|
cursor iterateCursor // not relative to prefix
|
|
|
|
skipPrefix ObjectKey // relative to prefix
|
|
doNextQuery func(context.Context, *objectsIterator) (_ tagsql.Rows, err error)
|
|
|
|
// failErr is set when either scan or next query fails during iteration.
|
|
failErr error
|
|
}
|
|
|
|
type iterateCursor struct {
|
|
Key ObjectKey
|
|
Version Version
|
|
StreamID uuid.UUID
|
|
Inclusive bool
|
|
}
|
|
|
|
func iterateAllVersions(ctx context.Context, db *DB, opts IterateObjects, fn func(context.Context, ObjectsIterator) error) (err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
it := &objectsIterator{
|
|
db: db,
|
|
|
|
projectID: opts.ProjectID,
|
|
bucketName: []byte(opts.BucketName),
|
|
prefix: opts.Prefix,
|
|
prefixLimit: prefixLimit(opts.Prefix),
|
|
batchSize: opts.BatchSize,
|
|
recursive: true,
|
|
includeCustomMetadata: true,
|
|
includeSystemMetadata: true,
|
|
|
|
curIndex: 0,
|
|
cursor: firstIterateCursor(true, opts.Cursor, opts.Prefix),
|
|
doNextQuery: doNextQueryAllVersionsWithoutStatus,
|
|
}
|
|
|
|
// start from either the cursor or prefix, depending on which is larger
|
|
if lessKey(it.cursor.Key, opts.Prefix) {
|
|
it.cursor.Key = opts.Prefix
|
|
it.cursor.Version = -1
|
|
it.cursor.Inclusive = true
|
|
}
|
|
|
|
return iterate(ctx, it, fn)
|
|
}
|
|
|
|
func iterateAllVersionsWithStatus(ctx context.Context, db *DB, opts IterateObjectsWithStatus, fn func(context.Context, ObjectsIterator) error) (err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
it := &objectsIterator{
|
|
db: db,
|
|
|
|
projectID: opts.ProjectID,
|
|
bucketName: []byte(opts.BucketName),
|
|
status: opts.Status,
|
|
prefix: opts.Prefix,
|
|
prefixLimit: prefixLimit(opts.Prefix),
|
|
batchSize: opts.BatchSize,
|
|
recursive: opts.Recursive,
|
|
includeCustomMetadata: opts.IncludeCustomMetadata,
|
|
includeSystemMetadata: opts.IncludeSystemMetadata,
|
|
|
|
curIndex: 0,
|
|
cursor: firstIterateCursor(opts.Recursive, opts.Cursor, opts.Prefix),
|
|
|
|
doNextQuery: doNextQueryAllVersionsWithStatus,
|
|
}
|
|
|
|
// start from either the cursor or prefix, depending on which is larger
|
|
if lessKey(it.cursor.Key, opts.Prefix) {
|
|
it.cursor.Key = opts.Prefix
|
|
it.cursor.Version = -1
|
|
it.cursor.Inclusive = true
|
|
}
|
|
|
|
return iterate(ctx, it, fn)
|
|
}
|
|
|
|
func iteratePendingObjectsByKey(ctx context.Context, db *DB, opts IteratePendingObjectsByKey, fn func(context.Context, ObjectsIterator) error) (err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
cursor := opts.Cursor
|
|
|
|
if cursor.StreamID.IsZero() {
|
|
cursor.StreamID = uuid.UUID{}
|
|
}
|
|
|
|
it := &objectsIterator{
|
|
db: db,
|
|
|
|
projectID: opts.ProjectID,
|
|
bucketName: []byte(opts.BucketName),
|
|
prefix: "",
|
|
prefixLimit: "",
|
|
batchSize: opts.BatchSize,
|
|
recursive: true,
|
|
includeCustomMetadata: true,
|
|
includeSystemMetadata: true,
|
|
|
|
curIndex: 0,
|
|
cursor: iterateCursor{
|
|
Key: opts.ObjectKey,
|
|
Version: 0,
|
|
StreamID: opts.Cursor.StreamID,
|
|
},
|
|
doNextQuery: doNextQueryStreamsByKey,
|
|
}
|
|
|
|
return iterate(ctx, it, fn)
|
|
|
|
}
|
|
|
|
func iterate(ctx context.Context, it *objectsIterator, fn func(context.Context, ObjectsIterator) error) (err error) {
|
|
batchsizeLimit.Ensure(&it.batchSize)
|
|
|
|
it.curRows, err = it.doNextQuery(ctx, it)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
it.cursor.Inclusive = false
|
|
|
|
defer func() {
|
|
if rowsErr := it.curRows.Err(); rowsErr != nil {
|
|
err = errs.Combine(err, rowsErr)
|
|
}
|
|
err = errs.Combine(err, it.failErr, it.curRows.Close())
|
|
}()
|
|
|
|
return fn(ctx, it)
|
|
}
|
|
|
|
// Next returns true if there was another item and copy it in item.
|
|
func (it *objectsIterator) Next(ctx context.Context, item *ObjectEntry) bool {
|
|
if it.recursive {
|
|
return it.next(ctx, item)
|
|
}
|
|
|
|
// TODO: implement this on the database side
|
|
|
|
ok := it.next(ctx, item)
|
|
if !ok {
|
|
return false
|
|
}
|
|
|
|
// skip until we are past the prefix we returned before.
|
|
if it.skipPrefix != "" {
|
|
for strings.HasPrefix(string(item.ObjectKey), string(it.skipPrefix)) {
|
|
if !it.next(ctx, item) {
|
|
return false
|
|
}
|
|
}
|
|
it.skipPrefix = ""
|
|
}
|
|
|
|
// should this be treated as a prefix?
|
|
p := strings.IndexByte(string(item.ObjectKey), Delimiter)
|
|
if p >= 0 {
|
|
it.skipPrefix = item.ObjectKey[:p+1]
|
|
*item = ObjectEntry{
|
|
IsPrefix: true,
|
|
ObjectKey: item.ObjectKey[:p+1],
|
|
Status: it.status,
|
|
}
|
|
}
|
|
|
|
return true
|
|
}
|
|
|
|
// next returns true if there was another item and copy it in item.
|
|
func (it *objectsIterator) next(ctx context.Context, item *ObjectEntry) bool {
|
|
next := it.curRows.Next()
|
|
if !next {
|
|
if it.curIndex < it.batchSize {
|
|
return false
|
|
}
|
|
|
|
if it.curRows.Err() != nil {
|
|
return false
|
|
}
|
|
|
|
rows, err := it.doNextQuery(ctx, it)
|
|
if err != nil {
|
|
it.failErr = errs.Combine(it.failErr, err)
|
|
return false
|
|
}
|
|
|
|
if closeErr := it.curRows.Close(); closeErr != nil {
|
|
it.failErr = errs.Combine(it.failErr, closeErr, rows.Close())
|
|
return false
|
|
}
|
|
|
|
it.curRows = rows
|
|
it.curIndex = 0
|
|
if !it.curRows.Next() {
|
|
return false
|
|
}
|
|
}
|
|
|
|
err := it.scanItem(item)
|
|
if err != nil {
|
|
it.failErr = errs.Combine(it.failErr, err)
|
|
return false
|
|
}
|
|
|
|
it.curIndex++
|
|
it.cursor.Key = item.ObjectKey
|
|
it.cursor.Version = item.Version
|
|
it.cursor.StreamID = item.StreamID
|
|
|
|
if it.prefix != "" {
|
|
if !strings.HasPrefix(string(item.ObjectKey), string(it.prefix)) {
|
|
return false
|
|
}
|
|
}
|
|
|
|
// TODO this should be done with SQL query
|
|
item.ObjectKey = item.ObjectKey[len(it.prefix):]
|
|
|
|
return true
|
|
}
|
|
|
|
func doNextQueryAllVersionsWithoutStatus(ctx context.Context, it *objectsIterator) (_ tagsql.Rows, err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
cursorCompare := ">"
|
|
if it.cursor.Inclusive {
|
|
cursorCompare = ">="
|
|
}
|
|
|
|
if it.prefixLimit == "" {
|
|
return it.db.db.QueryContext(ctx, `
|
|
SELECT
|
|
object_key, stream_id, version, encryption, status,
|
|
created_at, expires_at,
|
|
segment_count,
|
|
total_plain_size, total_encrypted_size, fixed_segment_size,
|
|
encrypted_metadata_nonce, encrypted_metadata, encrypted_metadata_encrypted_key
|
|
FROM objects
|
|
WHERE
|
|
project_id = $1 AND bucket_name = $2
|
|
AND (object_key, version) `+cursorCompare+` ($3, $4)
|
|
ORDER BY object_key ASC, version ASC
|
|
LIMIT $5
|
|
`, it.projectID, it.bucketName,
|
|
[]byte(it.cursor.Key), int(it.cursor.Version),
|
|
it.batchSize,
|
|
)
|
|
}
|
|
|
|
// TODO this query should use SUBSTRING(object_key from $8) but there is a problem how it
|
|
// works with CRDB.
|
|
return it.db.db.QueryContext(ctx, `
|
|
SELECT
|
|
object_key, stream_id, version, encryption, status,
|
|
created_at, expires_at,
|
|
segment_count,
|
|
total_plain_size, total_encrypted_size, fixed_segment_size,
|
|
encrypted_metadata_nonce, encrypted_metadata, encrypted_metadata_encrypted_key
|
|
FROM objects
|
|
WHERE
|
|
project_id = $1 AND bucket_name = $2
|
|
AND (object_key, version) `+cursorCompare+` ($3, $4)
|
|
AND object_key < $5
|
|
ORDER BY object_key ASC, version ASC
|
|
LIMIT $6
|
|
`, it.projectID, it.bucketName,
|
|
[]byte(it.cursor.Key), int(it.cursor.Version),
|
|
[]byte(it.prefixLimit),
|
|
it.batchSize,
|
|
|
|
// len(it.prefix)+1, // TODO uncomment when CRDB issue will be fixed
|
|
)
|
|
}
|
|
|
|
func doNextQueryAllVersionsWithStatus(ctx context.Context, it *objectsIterator) (_ tagsql.Rows, err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
// minimum needed for cursor
|
|
querySelectFields := `
|
|
object_key
|
|
,stream_id
|
|
,version
|
|
,encryption`
|
|
|
|
if it.includeSystemMetadata {
|
|
querySelectFields += `
|
|
,status
|
|
,created_at
|
|
,expires_at
|
|
,segment_count
|
|
,total_plain_size
|
|
,total_encrypted_size
|
|
,fixed_segment_size`
|
|
}
|
|
|
|
if it.includeCustomMetadata {
|
|
querySelectFields += `
|
|
,encrypted_metadata_nonce
|
|
,encrypted_metadata
|
|
,encrypted_metadata_encrypted_key`
|
|
}
|
|
|
|
cursorCompare := ">"
|
|
if it.cursor.Inclusive {
|
|
cursorCompare = ">="
|
|
}
|
|
|
|
if it.prefixLimit == "" {
|
|
return it.db.db.QueryContext(ctx, `
|
|
SELECT
|
|
`+querySelectFields+`
|
|
FROM objects
|
|
WHERE
|
|
(project_id, bucket_name, object_key, version) `+cursorCompare+` ($1, $2, $4, $5)
|
|
AND (project_id, bucket_name) < ($1, $7)
|
|
AND status = $3
|
|
ORDER BY (project_id, bucket_name, object_key, version) ASC
|
|
LIMIT $6
|
|
`, it.projectID, it.bucketName,
|
|
it.status,
|
|
[]byte(it.cursor.Key), int(it.cursor.Version),
|
|
it.batchSize,
|
|
nextBucket(it.bucketName),
|
|
)
|
|
}
|
|
|
|
// TODO this query should use SUBSTRING(object_key from $8) but there is a problem how it
|
|
// works with CRDB.
|
|
return it.db.db.QueryContext(ctx, `
|
|
SELECT
|
|
`+querySelectFields+`
|
|
FROM objects
|
|
WHERE
|
|
(project_id, bucket_name, object_key, version) `+cursorCompare+` ($1, $2, $4, $5)
|
|
AND (project_id, bucket_name, object_key) < ($1, $2, $6)
|
|
AND status = $3
|
|
ORDER BY (project_id, bucket_name, object_key, version) ASC
|
|
LIMIT $7
|
|
`, it.projectID, it.bucketName,
|
|
it.status,
|
|
[]byte(it.cursor.Key), int(it.cursor.Version),
|
|
[]byte(it.prefixLimit),
|
|
it.batchSize,
|
|
// len(it.prefix)+1, // TODO uncomment when CRDB issue will be fixed
|
|
)
|
|
}
|
|
|
|
// nextBucket returns the lexicographically next bucket.
|
|
func nextBucket(b []byte) []byte {
|
|
xs := make([]byte, len(b)+1)
|
|
copy(xs, b)
|
|
return xs
|
|
}
|
|
|
|
// doNextQuery executes query to fetch the next batch returning the rows.
|
|
func doNextQueryStreamsByKey(ctx context.Context, it *objectsIterator) (_ tagsql.Rows, err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
return it.db.db.QueryContext(ctx, `
|
|
SELECT
|
|
object_key, stream_id, version, encryption, status,
|
|
created_at, expires_at,
|
|
segment_count,
|
|
total_plain_size, total_encrypted_size, fixed_segment_size,
|
|
encrypted_metadata_nonce, encrypted_metadata, encrypted_metadata_encrypted_key
|
|
FROM objects
|
|
WHERE
|
|
project_id = $1 AND bucket_name = $2
|
|
AND object_key = $3
|
|
AND stream_id > $4::BYTEA
|
|
AND status = `+pendingStatus+`
|
|
ORDER BY stream_id ASC
|
|
LIMIT $5
|
|
`, it.projectID, it.bucketName,
|
|
[]byte(it.cursor.Key),
|
|
it.cursor.StreamID,
|
|
it.batchSize,
|
|
)
|
|
}
|
|
|
|
// scanItem scans doNextQuery results into ObjectEntry.
|
|
func (it *objectsIterator) scanItem(item *ObjectEntry) (err error) {
|
|
item.IsPrefix = false
|
|
|
|
fields := []interface{}{
|
|
&item.ObjectKey,
|
|
&item.StreamID,
|
|
&item.Version,
|
|
encryptionParameters{&item.Encryption},
|
|
}
|
|
|
|
if it.includeSystemMetadata {
|
|
fields = append(fields,
|
|
&item.Status,
|
|
&item.CreatedAt,
|
|
&item.ExpiresAt,
|
|
&item.SegmentCount,
|
|
&item.TotalPlainSize,
|
|
&item.TotalEncryptedSize,
|
|
&item.FixedSegmentSize,
|
|
)
|
|
}
|
|
|
|
if it.includeCustomMetadata {
|
|
fields = append(fields,
|
|
&item.EncryptedMetadataNonce,
|
|
&item.EncryptedMetadata,
|
|
&item.EncryptedMetadataEncryptedKey,
|
|
)
|
|
}
|
|
|
|
err = it.curRows.Scan(fields...)
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func prefixLimit(a ObjectKey) ObjectKey {
|
|
if a == "" {
|
|
return ""
|
|
}
|
|
if a[len(a)-1] == 0xFF {
|
|
return a + "\x00"
|
|
}
|
|
|
|
key := []byte(a)
|
|
key[len(key)-1]++
|
|
return ObjectKey(key)
|
|
}
|
|
|
|
// lessKey returns whether a < b.
|
|
func lessKey(a, b ObjectKey) bool {
|
|
return bytes.Compare([]byte(a), []byte(b)) < 0
|
|
}
|
|
|
|
// firstIterateCursor adjust the cursor for a non-recursive iteration.
|
|
// The cursor is non-inclusive and we need to adjust to handle prefix as cursor properly.
|
|
// We return the next possible key from the prefix.
|
|
func firstIterateCursor(recursive bool, cursor IterateCursor, prefix ObjectKey) iterateCursor {
|
|
if recursive {
|
|
return iterateCursor{
|
|
Key: cursor.Key,
|
|
Version: cursor.Version,
|
|
}
|
|
}
|
|
|
|
// when the cursor does not match the prefix, we'll return the original cursor.
|
|
if !strings.HasPrefix(string(cursor.Key), string(prefix)) {
|
|
return iterateCursor{
|
|
Key: cursor.Key,
|
|
Version: cursor.Version,
|
|
}
|
|
}
|
|
|
|
// handle case where:
|
|
// prefix: x/y/
|
|
// cursor: x/y/z/w
|
|
// In this case, we want the skip prefix to be `x/y/z` + string('/' + 1).
|
|
|
|
cursorWithoutPrefix := cursor.Key[len(prefix):]
|
|
p := strings.IndexByte(string(cursorWithoutPrefix), Delimiter)
|
|
if p < 0 {
|
|
// The cursor is not a prefix, but instead a path inside the prefix,
|
|
// so we can use it directly.
|
|
return iterateCursor{
|
|
Key: cursor.Key,
|
|
Version: cursor.Version,
|
|
}
|
|
}
|
|
|
|
// return the next prefix given a scoped path
|
|
return iterateCursor{
|
|
Key: cursor.Key[:len(prefix)+p] + ObjectKey(Delimiter+1),
|
|
Version: -1,
|
|
Inclusive: true,
|
|
}
|
|
}
|