satellite/metabase: delete bucket deletes also from pending_objects
While deleting bucket we need also to delete pending objects from pending_objects table. Part of https://github.com/storj/storj/issues/6048 Change-Id: Icc83eaecf8388704e0b6329c397e8028debcf672
This commit is contained in:
parent
ffa50f0758
commit
16588033fd
@ -32,18 +32,36 @@ func (db *DB) DeleteBucketObjects(ctx context.Context, opts DeleteBucketObjects)
|
||||
|
||||
deleteBatchSizeLimit.Ensure(&opts.BatchSize)
|
||||
|
||||
for {
|
||||
// TODO we may think about doing pending and committed objects in parallel
|
||||
deletedBatchCount := int64(opts.BatchSize)
|
||||
for deletedBatchCount > 0 {
|
||||
if err := ctx.Err(); err != nil {
|
||||
return deletedObjectCount, err
|
||||
}
|
||||
|
||||
deletedBatchCount, err := db.deleteBucketObjects(ctx, opts)
|
||||
deletedBatchCount, err = db.deleteBucketObjects(ctx, opts)
|
||||
deletedObjectCount += deletedBatchCount
|
||||
|
||||
if err != nil || deletedBatchCount == 0 {
|
||||
if err != nil {
|
||||
return deletedObjectCount, err
|
||||
}
|
||||
}
|
||||
|
||||
deletedBatchCount = int64(opts.BatchSize)
|
||||
for deletedBatchCount > 0 {
|
||||
if err := ctx.Err(); err != nil {
|
||||
return deletedObjectCount, err
|
||||
}
|
||||
|
||||
deletedBatchCount, err = db.deleteBucketPendingObjects(ctx, opts)
|
||||
deletedObjectCount += deletedBatchCount
|
||||
|
||||
if err != nil {
|
||||
return deletedObjectCount, err
|
||||
}
|
||||
}
|
||||
|
||||
return deletedObjectCount, nil
|
||||
}
|
||||
|
||||
func (db *DB) deleteBucketObjects(ctx context.Context, opts DeleteBucketObjects) (deletedObjectCount int64, err error) {
|
||||
@ -97,3 +115,54 @@ func (db *DB) deleteBucketObjects(ctx context.Context, opts DeleteBucketObjects)
|
||||
|
||||
return deletedObjectCount, nil
|
||||
}
|
||||
|
||||
func (db *DB) deleteBucketPendingObjects(ctx context.Context, opts DeleteBucketObjects) (deletedObjectCount int64, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
var query string
|
||||
|
||||
// TODO handle number of deleted segments
|
||||
switch db.impl {
|
||||
case dbutil.Cockroach:
|
||||
query = `
|
||||
WITH deleted_objects AS (
|
||||
DELETE FROM pending_objects
|
||||
WHERE project_id = $1 AND bucket_name = $2 LIMIT $3
|
||||
RETURNING pending_objects.stream_id
|
||||
), deleted_segments AS (
|
||||
DELETE FROM segments
|
||||
WHERE segments.stream_id IN (SELECT deleted_objects.stream_id FROM deleted_objects)
|
||||
RETURNING segments.stream_id
|
||||
)
|
||||
SELECT COUNT(1) FROM deleted_objects
|
||||
`
|
||||
case dbutil.Postgres:
|
||||
query = `
|
||||
WITH deleted_objects AS (
|
||||
DELETE FROM pending_objects
|
||||
WHERE stream_id IN (
|
||||
SELECT stream_id FROM pending_objects
|
||||
WHERE project_id = $1 AND bucket_name = $2
|
||||
LIMIT $3
|
||||
)
|
||||
RETURNING pending_objects.stream_id
|
||||
), deleted_segments AS (
|
||||
DELETE FROM segments
|
||||
WHERE segments.stream_id IN (SELECT deleted_objects.stream_id FROM deleted_objects)
|
||||
RETURNING segments.stream_id
|
||||
)
|
||||
SELECT COUNT(1) FROM deleted_objects
|
||||
`
|
||||
default:
|
||||
return 0, Error.New("unhandled database: %v", db.impl)
|
||||
}
|
||||
|
||||
err = db.db.QueryRowContext(ctx, query, opts.Bucket.ProjectID, []byte(opts.Bucket.BucketName), opts.BatchSize).Scan(&deletedObjectCount)
|
||||
if err != nil {
|
||||
return 0, Error.Wrap(err)
|
||||
}
|
||||
|
||||
mon.Meter("object_delete").Mark64(deletedObjectCount)
|
||||
|
||||
return deletedObjectCount, nil
|
||||
}
|
||||
|
@ -238,6 +238,53 @@ func TestDeleteBucketObjects(t *testing.T) {
|
||||
|
||||
metabasetest.Verify{}.Check(ctx, t, db)
|
||||
})
|
||||
|
||||
t.Run("pending and committed objects", func(t *testing.T) {
|
||||
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||
|
||||
metabasetest.CreateObject(ctx, t, db, obj1, 2)
|
||||
|
||||
obj1.ObjectKey = "some key"
|
||||
obj1.Version = metabase.NextVersion
|
||||
metabasetest.BeginObjectNextVersion{
|
||||
Opts: metabase.BeginObjectNextVersion{
|
||||
ObjectStream: obj1,
|
||||
Encryption: metabasetest.DefaultEncryption,
|
||||
UsePendingObjectsTable: true,
|
||||
},
|
||||
Version: 1,
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
metabasetest.DeleteBucketObjects{
|
||||
Opts: metabase.DeleteBucketObjects{
|
||||
Bucket: obj1.Location().Bucket(),
|
||||
BatchSize: 2,
|
||||
},
|
||||
Deleted: 2,
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
metabasetest.Verify{}.Check(ctx, t, db)
|
||||
|
||||
// object only in pending_objects table
|
||||
metabasetest.BeginObjectNextVersion{
|
||||
Opts: metabase.BeginObjectNextVersion{
|
||||
ObjectStream: obj1,
|
||||
Encryption: metabasetest.DefaultEncryption,
|
||||
UsePendingObjectsTable: true,
|
||||
},
|
||||
Version: 1,
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
metabasetest.DeleteBucketObjects{
|
||||
Opts: metabase.DeleteBucketObjects{
|
||||
Bucket: obj1.Location().Bucket(),
|
||||
BatchSize: 2,
|
||||
},
|
||||
Deleted: 1,
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
metabasetest.Verify{}.Check(ctx, t, db)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user