satellite/metabase: drop bucket deletion code for copy with references
With this change we are removing the code responsible for handling bucket deletion while supporting server-side copies created with references. In practice we are restoring the delete queries we had before the server-side copy implementation (with one small exception, see below). We are also removing the parts of the deletion queries that returned segment metadata, because we no longer send explicit delete requests to storage nodes.

https://github.com/storj/storj/issues/5891

Change-Id: If866d9f3a1b01e9ebd9b49c4740a6425ba06dd43
This commit is contained in:
parent 00531fe2b0
commit a5cbec7b3b
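Note: the restored per-batch query, as it appears in the Cockroach branch of the diff below, deletes objects and their segments in one statement and returns only counters; the Postgres branch differs only in how the batch LIMIT is applied:

```sql
WITH deleted_objects AS (
    DELETE FROM objects
    WHERE project_id = $1 AND bucket_name = $2 LIMIT $3
    RETURNING objects.stream_id, objects.segment_count
), deleted_segments AS (
    DELETE FROM segments
    WHERE segments.stream_id IN (SELECT deleted_objects.stream_id FROM deleted_objects)
    RETURNING segments.stream_id
)
SELECT COUNT(1), COALESCE(SUM(segment_count), 0) FROM deleted_objects;
```

Since nothing beyond the two counts comes back, the code that scanned returned segment metadata for the storage-node delete fan-out is gone entirely.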
@@ -5,17 +5,8 @@ package metabase
 
 import (
 	"context"
-	"database/sql"
-	"errors"
-	"fmt"
 
-	"github.com/zeebo/errs"
-
-	"storj.io/common/storj"
-	"storj.io/common/uuid"
 	"storj.io/private/dbutil"
-	"storj.io/private/dbutil/txutil"
 	"storj.io/private/tagsql"
 )
 
 const (
@@ -28,43 +19,6 @@ type DeleteBucketObjects struct {
 	BatchSize int
 }
 
-var deleteObjectsCockroachSubSQL = `
-DELETE FROM objects
-WHERE project_id = $1 AND bucket_name = $2
-LIMIT $3
-`
-
-// postgres does not support LIMIT in DELETE.
-var deleteObjectsPostgresSubSQL = `
-DELETE FROM objects
-WHERE (objects.project_id, objects.bucket_name) IN (
-	SELECT project_id, bucket_name FROM objects
-	WHERE project_id = $1 AND bucket_name = $2
-	LIMIT $3
-)`
-
-var deleteBucketObjectsWithCopyFeaturePostgresSQL = fmt.Sprintf(
-	deleteBucketObjectsWithCopyFeatureSQL,
-	deleteObjectsPostgresSubSQL,
-	"", "",
-)
-var deleteBucketObjectsWithCopyFeatureCockroachSQL = fmt.Sprintf(
-	deleteBucketObjectsWithCopyFeatureSQL,
-	deleteObjectsCockroachSubSQL,
-	"", "",
-)
-
-func getDeleteBucketObjectsSQLWithCopyFeature(impl dbutil.Implementation) (string, error) {
-	switch impl {
-	case dbutil.Cockroach:
-		return deleteBucketObjectsWithCopyFeatureCockroachSQL, nil
-	case dbutil.Postgres:
-		return deleteBucketObjectsWithCopyFeaturePostgresSQL, nil
-	default:
-		return "", Error.New("unhandled database: %v", impl)
-	}
-}
-
 // DeleteBucketObjects deletes all objects in the specified bucket.
 // Deletion performs in batches, so in case of error while processing,
 // this method will return the number of objects deleted to the moment
@@ -78,22 +32,12 @@ func (db *DB) DeleteBucketObjects(ctx context.Context, opts DeleteBucketObjects)
 
 	deleteBatchSizeLimit.Ensure(&opts.BatchSize)
 
-	if db.config.ServerSideCopy {
-		return db.deleteBucketObjectsWithCopyFeatureEnabled(ctx, opts)
-	}
-
-	return db.deleteBucketObjectsWithCopyFeatureDisabled(ctx, opts)
-}
-
-func (db *DB) deleteBucketObjectsWithCopyFeatureEnabled(ctx context.Context, opts DeleteBucketObjects) (deletedObjectCount int64, err error) {
-	defer mon.Task()(&ctx)(&err)
-
 	for {
 		if err := ctx.Err(); err != nil {
 			return deletedObjectCount, err
 		}
 
-		deletedBatchCount, err := db.deleteBucketObjectBatchWithCopyFeatureEnabled(ctx, opts)
+		deletedBatchCount, err := db.deleteBucketObjects(ctx, opts)
 		deletedObjectCount += deletedBatchCount
 
 		if err != nil || deletedBatchCount == 0 {
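Note: DeleteBucketObjects keeps its documented contract — on failure it still returns the number of objects deleted so far, and it loops until a batch deletes nothing. A hypothetical caller (package, function name, and batch size are illustrative, not from this commit):

```go
package example

import (
	"context"
	"fmt"

	"storj.io/storj/satellite/metabase"
)

// emptyBucket is an illustrative caller of DeleteBucketObjects. On error the
// returned count still reflects the objects removed before the failure.
func emptyBucket(ctx context.Context, db *metabase.DB, loc metabase.BucketLocation) error {
	deleted, err := db.DeleteBucketObjects(ctx, metabase.DeleteBucketObjects{
		Bucket:    loc,
		BatchSize: 1000, // assumed value; clamped by deleteBatchSizeLimit
	})
	if err != nil {
		return fmt.Errorf("bucket delete failed after removing %d objects: %w", deleted, err)
	}
	return nil
}
```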
@@ -102,85 +46,7 @@ func (db *DB) deleteBucketObjectsWithCopyFeatureEnabled(ctx context.Context, opt
 	}
 }
 
-// deleteBucketObjectBatchWithCopyFeatureEnabled deletes a single batch from metabase.
-// This function has been factored out for metric purposes.
-func (db *DB) deleteBucketObjectBatchWithCopyFeatureEnabled(ctx context.Context, opts DeleteBucketObjects) (deletedObjectCount int64, err error) {
-	defer mon.Task()(&ctx)(&err)
-
-	query, err := getDeleteBucketObjectsSQLWithCopyFeature(db.impl)
-	if err != nil {
-		return 0, err
-	}
-
-	var objects []deletedObjectInfo
-	err = txutil.WithTx(ctx, db.db, nil, func(ctx context.Context, tx tagsql.Tx) (err error) {
-		err = withRows(
-			tx.QueryContext(ctx, query, opts.Bucket.ProjectID, []byte(opts.Bucket.BucketName), opts.BatchSize),
-		)(func(rows tagsql.Rows) error {
-			objects, err = db.scanBucketObjectsDeletionServerSideCopy(ctx, opts.Bucket, rows)
-			return err
-		})
-		if err != nil {
-			return err
-		}
-
-		return db.promoteNewAncestors(ctx, tx, objects)
-	})
-	if err != nil {
-		return 0, err
-	}
-
-	return int64(len(objects)), err
-}
-
-func (db *DB) scanBucketObjectsDeletionServerSideCopy(ctx context.Context, location BucketLocation, rows tagsql.Rows) (result []deletedObjectInfo, err error) {
-	defer mon.Task()(&ctx)(&err)
-	defer func() { err = errs.Combine(err, rows.Close()) }()
-
-	result = make([]deletedObjectInfo, 0, 10)
-	var rootPieceID *storj.PieceID
-	var object deletedObjectInfo
-	var segment deletedRemoteSegmentInfo
-	var aliasPieces AliasPieces
-	var segmentPosition *SegmentPosition
-
-	for rows.Next() {
-		object.ProjectID = location.ProjectID
-		object.BucketName = location.BucketName
-
-		err = rows.Scan(
-			&object.StreamID,
-			&segmentPosition,
-			&rootPieceID,
-			&aliasPieces,
-			&object.PromotedAncestor,
-		)
-		if err != nil {
-			return nil, Error.New("unable to delete bucket objects: %w", err)
-		}
-
-		if len(result) == 0 || result[len(result)-1].StreamID != object.StreamID {
-			result = append(result, object)
-		}
-		if rootPieceID != nil {
-			segment.Position = *segmentPosition
-			segment.RootPieceID = *rootPieceID
-			segment.Pieces, err = db.aliasCache.ConvertAliasesToPieces(ctx, aliasPieces)
-			if err != nil {
-				return nil, Error.Wrap(err)
-			}
-			if len(segment.Pieces) > 0 {
-				result[len(result)-1].Segments = append(result[len(result)-1].Segments, segment)
-			}
-		}
-	}
-	if err := rows.Err(); err != nil {
-		return nil, Error.New("unable to delete object: %w", err)
-	}
-	return result, nil
-}
-
-func (db *DB) deleteBucketObjectsWithCopyFeatureDisabled(ctx context.Context, opts DeleteBucketObjects) (deletedObjectCount int64, err error) {
+func (db *DB) deleteBucketObjects(ctx context.Context, opts DeleteBucketObjects) (deletedObjectCount int64, err error) {
 	defer mon.Task()(&ctx)(&err)
 
 	var query string
@@ -191,11 +57,13 @@ func (db *DB) deleteBucketObjectsWithCopyFeatureDisabled(ctx context.Context, op
 		WITH deleted_objects AS (
 			DELETE FROM objects
 			WHERE project_id = $1 AND bucket_name = $2 LIMIT $3
-			RETURNING objects.stream_id
+			RETURNING objects.stream_id, objects.segment_count
+		), deleted_segments AS (
+			DELETE FROM segments
+			WHERE segments.stream_id IN (SELECT deleted_objects.stream_id FROM deleted_objects)
+			RETURNING segments.stream_id
 		)
-		DELETE FROM segments
-		WHERE segments.stream_id IN (SELECT deleted_objects.stream_id FROM deleted_objects)
-		RETURNING segments.stream_id, segments.root_piece_id, segments.remote_alias_pieces
+		SELECT COUNT(1), COALESCE(SUM(segment_count), 0) FROM deleted_objects
 		`
 	case dbutil.Postgres:
 		query = `
@@ -206,61 +74,26 @@ func (db *DB) deleteBucketObjectsWithCopyFeatureDisabled(ctx context.Context, op
 			WHERE project_id = $1 AND bucket_name = $2
 			LIMIT $3
 		)
-		RETURNING objects.stream_id
+		RETURNING objects.stream_id, objects.segment_count
+	), deleted_segments AS (
+		DELETE FROM segments
+		WHERE segments.stream_id IN (SELECT deleted_objects.stream_id FROM deleted_objects)
+		RETURNING segments.stream_id
 	)
-	DELETE FROM segments
-	WHERE segments.stream_id IN (SELECT deleted_objects.stream_id FROM deleted_objects)
-	RETURNING segments.stream_id, segments.root_piece_id, segments.remote_alias_pieces
+	SELECT COUNT(1), COALESCE(SUM(segment_count), 0) FROM deleted_objects
 	`
 	default:
 		return 0, Error.New("unhandled database: %v", db.impl)
 	}
 
-	// TODO: fix the count for objects without segments
-	deletedSegments := make([]DeletedSegmentInfo, 0, 100)
-	for {
-		if err := ctx.Err(); err != nil {
-			return 0, err
-		}
-
-		deletedSegments = deletedSegments[:0]
-		deletedObjects := 0
-		err = withRows(db.db.QueryContext(ctx, query,
-			opts.Bucket.ProjectID, []byte(opts.Bucket.BucketName), opts.BatchSize))(func(rows tagsql.Rows) error {
-			ids := map[uuid.UUID]struct{}{} // TODO: avoid map here
-			for rows.Next() {
-				var streamID uuid.UUID
-				var segment DeletedSegmentInfo
-				var aliasPieces AliasPieces
-				err := rows.Scan(&streamID, &segment.RootPieceID, &aliasPieces)
-				if err != nil {
-					return Error.Wrap(err)
-				}
-				segment.Pieces, err = db.aliasCache.ConvertAliasesToPieces(ctx, aliasPieces)
-				if err != nil {
-					return Error.Wrap(err)
-				}
-
-				ids[streamID] = struct{}{}
-				deletedSegments = append(deletedSegments, segment)
-			}
-			deletedObjects = len(ids)
-			deletedObjectCount += int64(deletedObjects)
-			return nil
-		})
-
-		mon.Meter("object_delete").Mark(deletedObjects)
-		mon.Meter("segment_delete").Mark(len(deletedSegments))
-
-		if err != nil {
-			if errors.Is(err, sql.ErrNoRows) {
-				return deletedObjectCount, nil
-			}
-			return deletedObjectCount, Error.Wrap(err)
-		}
-
-		if len(deletedSegments) == 0 {
-			return deletedObjectCount, nil
-		}
+	var deletedSegmentCount int64
+	err = db.db.QueryRowContext(ctx, query, opts.Bucket.ProjectID, []byte(opts.Bucket.BucketName), opts.BatchSize).Scan(&deletedObjectCount, &deletedSegmentCount)
+	if err != nil {
+		return 0, Error.Wrap(err)
+	}
 
+	mon.Meter("object_delete").Mark64(deletedObjectCount)
+	mon.Meter("segment_delete").Mark64(deletedSegmentCount)
 
 	return deletedObjectCount, nil
 }
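Note: as the removed comment said, Postgres does not support LIMIT in DELETE, which is why the Postgres branch pushes the batch limit into a sub-select while CockroachDB accepts `DELETE ... LIMIT` directly. A generic illustration of the two patterns (schematic, not the exact metabase statements):

```sql
-- CockroachDB: LIMIT is valid directly on DELETE.
DELETE FROM objects
WHERE project_id = $1 AND bucket_name = $2
LIMIT $3;

-- Postgres: emulate the limit by selecting the keys to delete first.
DELETE FROM objects
WHERE stream_id IN (
    SELECT stream_id FROM objects
    WHERE project_id = $1 AND bucket_name = $2
    LIMIT $3
);
```

The meters also switch from Mark(int) to Mark64(int64), since the counts now arrive directly as int64 from the single Scan instead of from len() over scanned rows.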

@@ -297,6 +297,7 @@ func TestDeleteBucketObjectsCancel(t *testing.T) {
 }
 
 func TestDeleteBucketWithCopies(t *testing.T) {
+	duplicateMetadata := true
 	metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) {
 		for _, numberOfSegments := range []int{0, 1, 3} {
 			t.Run(fmt.Sprintf("%d segments", numberOfSegments), func(t *testing.T) {
@@ -321,7 +322,7 @@ func TestDeleteBucketWithCopies(t *testing.T) {
 				metabasetest.CreateObjectCopy{
 					OriginalObject:   originalObj,
 					CopyObjectStream: &copyObjectStream,
-				}.Run(ctx, t, db, false)
+				}.Run(ctx, t, db, duplicateMetadata)
 
 				_, err := db.DeleteBucketObjects(ctx, metabase.DeleteBucketObjects{
 					Bucket: metabase.BucketLocation{
@@ -362,7 +363,7 @@ func TestDeleteBucketWithCopies(t *testing.T) {
 				copyObj, _, copySegments := metabasetest.CreateObjectCopy{
 					OriginalObject:   originalObj,
 					CopyObjectStream: &copyObjectStream,
-				}.Run(ctx, t, db, false)
+				}.Run(ctx, t, db, duplicateMetadata)
 
 				_, err := db.DeleteBucketObjects(ctx, metabase.DeleteBucketObjects{
 					Bucket: metabase.BucketLocation{
@@ -420,12 +421,12 @@ func TestDeleteBucketWithCopies(t *testing.T) {
 				metabasetest.CreateObjectCopy{
 					OriginalObject:   originalObj1,
 					CopyObjectStream: &copyObjectStream1,
-				}.Run(ctx, t, db, false)
+				}.Run(ctx, t, db, duplicateMetadata)
 
 				copyObj2, _, copySegments2 := metabasetest.CreateObjectCopy{
 					OriginalObject:   originalObj2,
 					CopyObjectStream: &copyObjectStream2,
-				}.Run(ctx, t, db, false)
+				}.Run(ctx, t, db, duplicateMetadata)
 
 				// done preparing, delete bucket 1
 				_, err := db.DeleteBucketObjects(ctx, metabase.DeleteBucketObjects{
@@ -486,12 +487,12 @@ func TestDeleteBucketWithCopies(t *testing.T) {
 				metabasetest.CreateObjectCopy{
 					OriginalObject:   originalObj1,
 					CopyObjectStream: &copyObjectStream1,
-				}.Run(ctx, t, db, true)
+				}.Run(ctx, t, db, duplicateMetadata)
 
 				copyObj2, _, copySegments2 := metabasetest.CreateObjectCopy{
 					OriginalObject:   originalObj2,
 					CopyObjectStream: &copyObjectStream2,
-				}.Run(ctx, t, db, true)
+				}.Run(ctx, t, db, duplicateMetadata)
 
 				// done preparing, delete bucket 1
 				_, err := db.DeleteBucketObjects(ctx, metabase.DeleteBucketObjects{
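Note: every CreateObjectCopy in these tests now takes the shared duplicateMetadata flag declared at the top of TestDeleteBucketWithCopies instead of per-call false/true literals — with reference-based copies gone, duplicating segment metadata is the only remaining mode, so the former split no longer exercised distinct code paths. Condensed from the hunks above (a fragment, not a standalone test):

```go
duplicateMetadata := true // the only supported copy mode after this change

// ...inside each subtest:
metabasetest.CreateObjectCopy{
	OriginalObject:   originalObj,
	CopyObjectStream: &copyObjectStream,
}.Run(ctx, t, db, duplicateMetadata)
```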

@@ -264,7 +264,7 @@ func (endpoint *Endpoint) deleteBucketNotEmpty(ctx context.Context, projectID uu
 	deletedCount, err := endpoint.deleteBucketObjects(ctx, projectID, bucketName)
 	if err != nil {
 		endpoint.log.Error("internal", zap.Error(err))
-		return nil, 0, rpcstatus.Error(rpcstatus.Internal, err.Error())
+		return nil, 0, rpcstatus.Error(rpcstatus.Internal, "internal error")
 	}
 
 	err = endpoint.deleteBucket(ctx, bucketName, projectID)
@@ -276,7 +276,7 @@ func (endpoint *Endpoint) deleteBucketNotEmpty(ctx context.Context, projectID uu
 			return bucketName, 0, nil
 		}
 		endpoint.log.Error("internal", zap.Error(err))
-		return nil, deletedCount, rpcstatus.Error(rpcstatus.Internal, err.Error())
+		return nil, deletedCount, rpcstatus.Error(rpcstatus.Internal, "internal error")
 	}
 
 	return bucketName, deletedCount, nil
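Note: the endpoint hunks are a hardening independent of the deletion rewrite — the full error is still logged server-side, but the RPC response now carries a fixed message so internal error text is not leaked to clients. A minimal sketch of the pattern, assuming the zap logger and storj.io/common/rpc/rpcstatus used in the diff (the helper name is invented for illustration):

```go
package example

import (
	"go.uber.org/zap"

	"storj.io/common/rpc/rpcstatus"
)

// internalError logs the detailed error server-side and returns a generic
// Internal status to the client, mirroring the pattern in the diff above.
func internalError(log *zap.Logger, err error) error {
	log.Error("internal", zap.Error(err))
	return rpcstatus.Error(rpcstatus.Internal, "internal error")
}
```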