satellite/metabase: Remove subquery on delete objects for CRDB

The subquery for DELETE FROM obects returns a stream_id field for filtering. Unfortunately stream_id is not indexed. This change removed the subquery from the CockroachDB delete bucket query.

Change-Id: If1abe21668c593e6d4bdc3ba8cdbad26c09d234e
This commit is contained in:
Ethan 2021-02-19 10:48:33 -05:00 committed by Ethan Adams
parent e0f15130a2
commit 79ecd80c0a
2 changed files with 44 additions and 16 deletions

View File

@ -28,8 +28,9 @@ var (
// DB implements a database for storing objects and segments. // DB implements a database for storing objects and segments.
type DB struct { type DB struct {
log *zap.Logger log *zap.Logger
db tagsql.DB db tagsql.DB
implementation dbutil.Implementation
aliasCache *NodeAliasCache aliasCache *NodeAliasCache
} }
@ -44,6 +45,12 @@ func Open(ctx context.Context, log *zap.Logger, driverName, connstr string) (*DB
db := &DB{log: log, db: postgresRebind{rawdb}} db := &DB{log: log, db: postgresRebind{rawdb}}
db.aliasCache = NewNodeAliasCache(db) db.aliasCache = NewNodeAliasCache(db)
_, _, db.implementation, err = dbutil.SplitConnStr(connstr)
if err != nil {
return nil, Error.Wrap(err)
}
return db, nil return db, nil
} }

View File

@ -9,6 +9,7 @@ import (
"errors" "errors"
"storj.io/common/uuid" "storj.io/common/uuid"
"storj.io/storj/private/dbutil"
"storj.io/storj/private/tagsql" "storj.io/storj/private/tagsql"
) )
@ -37,25 +38,45 @@ func (db *DB) DeleteBucketObjects(ctx context.Context, opts DeleteBucketObjects)
batchSize = deleteBatchSizeLimit batchSize = deleteBatchSizeLimit
} }
var query string
switch db.implementation {
case dbutil.Cockroach:
query = `
WITH deleted_objects AS (
DELETE FROM objects
WHERE project_id = $1 AND bucket_name = $2 LIMIT $3
RETURNING objects.stream_id
)
DELETE FROM segments
WHERE segments.stream_id in (SELECT deleted_objects.stream_id FROM deleted_objects)
RETURNING segments.stream_id, segments.root_piece_id, segments.remote_alias_pieces
`
case dbutil.Postgres:
query = `
WITH deleted_objects AS (
DELETE FROM objects
WHERE stream_id IN (
SELECT stream_id FROM objects
WHERE project_id = $1 AND bucket_name = $2
LIMIT $3
)
RETURNING objects.stream_id
)
DELETE FROM segments
WHERE segments.stream_id in (SELECT deleted_objects.stream_id FROM deleted_objects)
RETURNING segments.stream_id, segments.root_piece_id, segments.remote_alias_pieces
`
default:
return deletedObjectCount, Error.New("invalid dbType: %v", db.implementation)
}
// TODO: fix the count for objects without segments // TODO: fix the count for objects without segments
var deleteSegments []DeletedSegmentInfo var deleteSegments []DeletedSegmentInfo
for { for {
deleteSegments = deleteSegments[:0] deleteSegments = deleteSegments[:0]
err = withRows(db.db.Query(ctx, ` err = withRows(db.db.Query(ctx, query,
WITH deleted_objects AS ( opts.Bucket.ProjectID, opts.Bucket.BucketName, batchSize))(func(rows tagsql.Rows) error {
DELETE FROM objects
WHERE stream_id IN (
SELECT stream_id FROM objects
WHERE project_id = $1 AND bucket_name = $2
LIMIT $3
)
RETURNING objects.stream_id
)
DELETE FROM segments
WHERE segments.stream_id in (SELECT deleted_objects.stream_id FROM deleted_objects)
RETURNING segments.stream_id, segments.root_piece_id, segments.remote_alias_pieces
`, opts.Bucket.ProjectID, opts.Bucket.BucketName, batchSize))(func(rows tagsql.Rows) error {
ids := map[uuid.UUID]struct{}{} // TODO: avoid map here ids := map[uuid.UUID]struct{}{} // TODO: avoid map here
for rows.Next() { for rows.Next() {
var streamID uuid.UUID var streamID uuid.UUID