satellite/metainfo/metabase: avoid full table scan
Change-Id: Id47ffb5e1287d9303ce6fb530e87dbdc23cf8307
This commit is contained in:
parent
646cf229a2
commit
ba0197a9b7
@ -286,6 +286,34 @@ func (s *scenario) run(ctx *testcontext.Context, b *testing.B, db *metabase.DB)
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
b.Run("IterateObjectsAllVersionsWithStatus", func(b *testing.B) {
|
||||
m := make(Metrics, 0, b.N*len(s.objectStream)*s.parts*s.segments)
|
||||
defer m.Report(b, "ns/seg")
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
for _, object := range s.objectStream {
|
||||
m.Record(func() {
|
||||
err := db.IterateObjectsAllVersionsWithStatus(ctx, metabase.IterateObjectsWithStatus{
|
||||
ProjectID: object.ProjectID,
|
||||
BucketName: object.BucketName,
|
||||
Recursive: true,
|
||||
BatchSize: 1,
|
||||
Cursor: metabase.IterateCursor{
|
||||
Key: object.ObjectKey,
|
||||
},
|
||||
Status: metabase.Committed,
|
||||
}, func(ctx context.Context, it metabase.ObjectsIterator) error {
|
||||
var item metabase.ObjectEntry
|
||||
for it.Next(ctx, &item) {
|
||||
}
|
||||
return nil
|
||||
})
|
||||
require.NoError(b, err)
|
||||
})
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// Metrics records a set of time.Durations.
|
||||
|
@ -326,15 +326,16 @@ func doNextQueryAllVersionsWithStatus(ctx context.Context, it *objectsIterator)
|
||||
encryption
|
||||
FROM objects
|
||||
WHERE
|
||||
project_id = $1 AND bucket_name = $2
|
||||
(project_id, bucket_name, object_key, version) `+cursorCompare+` ($1, $2, $4, $5)
|
||||
AND (project_id, bucket_name) < ($1, $7)
|
||||
AND status = $3
|
||||
AND (object_key, version) `+cursorCompare+` ($4, $5)
|
||||
ORDER BY object_key ASC, version ASC
|
||||
ORDER BY (project_id, bucket_name, object_key, version) ASC
|
||||
LIMIT $6
|
||||
`, it.projectID, it.bucketName,
|
||||
it.status,
|
||||
[]byte(it.cursor.Key), int(it.cursor.Version),
|
||||
it.batchSize,
|
||||
nextBucket(it.bucketName),
|
||||
)
|
||||
}
|
||||
|
||||
@ -350,22 +351,25 @@ func doNextQueryAllVersionsWithStatus(ctx context.Context, it *objectsIterator)
|
||||
encryption
|
||||
FROM objects
|
||||
WHERE
|
||||
project_id = $1 AND bucket_name = $2
|
||||
(project_id, bucket_name, object_key, version) `+cursorCompare+` ($1, $2, $4, $5)
|
||||
AND (project_id, bucket_name, object_key) < ($1, $2, $6)
|
||||
AND status = $3
|
||||
AND (object_key, version) `+cursorCompare+` ($4, $5)
|
||||
AND object_key < $6
|
||||
ORDER BY object_key ASC, version ASC
|
||||
ORDER BY (project_id, bucket_name, object_key, version) ASC
|
||||
LIMIT $7
|
||||
`, it.projectID, it.bucketName,
|
||||
it.status,
|
||||
[]byte(it.cursor.Key), int(it.cursor.Version),
|
||||
[]byte(it.prefixLimit),
|
||||
it.batchSize,
|
||||
|
||||
// len(it.prefix)+1, // TODO uncomment when CRDB issue will be fixed
|
||||
)
|
||||
}
|
||||
|
||||
// nextBucket returns the lexicographically next bucket.
|
||||
func nextBucket(b string) string {
|
||||
return b + "\x01"
|
||||
}
|
||||
|
||||
// doNextQuery executes query to fetch the next batch returning the rows.
|
||||
func doNextQueryStreamsByKey(ctx context.Context, it *objectsIterator) (_ tagsql.Rows, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
@ -382,8 +386,8 @@ func doNextQueryStreamsByKey(ctx context.Context, it *objectsIterator) (_ tagsql
|
||||
WHERE
|
||||
project_id = $1 AND bucket_name = $2
|
||||
AND object_key = $3
|
||||
AND status = `+pendingStatus+`
|
||||
AND stream_id > $4::BYTEA
|
||||
AND status = `+pendingStatus+`
|
||||
ORDER BY stream_id ASC
|
||||
LIMIT $5
|
||||
`, it.projectID, it.bucketName,
|
||||
|
Loading…
Reference in New Issue
Block a user