From 260a673327ecac1e164ffd7b040f247fafe2d623 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Micha=C5=82=20Niewrza=C5=82?=
Date: Thu, 5 Aug 2021 21:44:04 +0200
Subject: [PATCH] satellite/metabase: don't delete pieces until query results
 are received

To optimize memory consumption we were consuming segment data while
processing results from the delete query. It turns out there is a
chance that the query will be rolled back if something goes wrong while
reading the results. In that case the pieces would already be deleted
while the object/segment would still be in the DB.

This change removes piece deletion from the problematic place. Pieces
are still deleted in batches, but the batch size is not limited at the
moment. To avoid memory issues, the object deletion batch size was
decreased.

Change-Id: Icb3667220f9c25f64b73cf71d0cf3fdc7e5107c5
---
 satellite/metabase/delete_bucket.go      | 48 ++++++++++--------------
 satellite/metabase/delete_bucket_test.go | 47 +++++++++++++++++------
 2 files changed, 54 insertions(+), 41 deletions(-)

diff --git a/satellite/metabase/delete_bucket.go b/satellite/metabase/delete_bucket.go
index b20ae9fdd..df6386330 100644
--- a/satellite/metabase/delete_bucket.go
+++ b/satellite/metabase/delete_bucket.go
@@ -14,8 +14,7 @@ import (
 )
 
 const (
-	deleteBatchSizeLimit  = intLimitRange(100)
-	deletePieceBatchLimit = intLimitRange(1000)
+	deleteBatchSizeLimit = intLimitRange(50)
 )
 
 // DeleteBucketObjects contains arguments for deleting a whole bucket.
@@ -23,15 +22,15 @@ type DeleteBucketObjects struct {
 	Bucket    BucketLocation
 	BatchSize int
 
-	// DeletePiecesBatchSize maximum number of DeletedSegmentInfo entries
-	// passed to DeletePieces function at once.
-	DeletePiecesBatchSize int
 	// DeletePieces is called for every batch of objects.
 	// Slice `segments` will be reused between calls.
 	DeletePieces func(ctx context.Context, segments []DeletedSegmentInfo) error
 }
 
 // DeleteBucketObjects deletes all objects in the specified bucket.
+// Deletion is performed in batches, so if an error occurs while
+// processing, this method returns the number of objects deleted up to
+// the moment the error occurred.
 func (db *DB) DeleteBucketObjects(ctx context.Context, opts DeleteBucketObjects) (deletedObjectCount int64, err error) {
 	defer mon.Task()(&ctx)(&err)
 
@@ -40,7 +39,6 @@ func (db *DB) DeleteBucketObjects(ctx context.Context, opts DeleteBucketObjects)
 	}
 
 	deleteBatchSizeLimit.Ensure(&opts.BatchSize)
-	deletePieceBatchLimit.Ensure(&opts.DeletePiecesBatchSize)
 
 	var query string
 	switch db.impl {
@@ -75,11 +73,14 @@ func (db *DB) DeleteBucketObjects(ctx context.Context, opts DeleteBucketObjects)
 	}
 
 	// TODO: fix the count for objects without segments
-	deletedSegmentsBatch := make([]DeletedSegmentInfo, 0, opts.DeletePiecesBatchSize)
+	deletedSegments := make([]DeletedSegmentInfo, 0, 100)
 	for {
-		deletedSegmentsBatch = deletedSegmentsBatch[:0]
-		batchDeletedObjects := 0
-		deletedSegments := 0
+		if err := ctx.Err(); err != nil {
+			return 0, err
+		}
+
+		deletedSegments = deletedSegments[:0]
+		deletedObjects := 0
 
 		err = withRows(db.db.QueryContext(ctx, query, opts.Bucket.ProjectID, []byte(opts.Bucket.BucketName), opts.BatchSize))(func(rows tagsql.Rows) error {
 			ids := map[uuid.UUID]struct{}{} // TODO: avoid map here
@@ -97,27 +98,15 @@ func (db *DB) DeleteBucketObjects(ctx context.Context, opts DeleteBucketObjects)
 			}
 			ids[streamID] = struct{}{}
 
-			deletedSegmentsBatch = append(deletedSegmentsBatch, segment)
-
-			if len(deletedSegmentsBatch) >= opts.DeletePiecesBatchSize {
-				if opts.DeletePieces != nil {
-					err = opts.DeletePieces(ctx, deletedSegmentsBatch)
-					if err != nil {
-						return Error.Wrap(err)
-					}
-					deletedSegmentsBatch = deletedSegmentsBatch[:0]
-				}
-			}
-
-			deletedSegments++
+			deletedSegments = append(deletedSegments, segment)
 		}
 
-		batchDeletedObjects = len(ids)
-		deletedObjectCount += int64(len(ids))
+		deletedObjects = len(ids)
+		deletedObjectCount += int64(deletedObjects)
 		return nil
 	})
-	mon.Meter("object_delete").Mark(batchDeletedObjects)
-	mon.Meter("segment_delete").Mark(deletedSegments)
+	mon.Meter("object_delete").Mark(deletedObjects)
+	mon.Meter("segment_delete").Mark(len(deletedSegments))
 
 	if err != nil {
 		if errors.Is(err, sql.ErrNoRows) {
@@ -125,12 +114,13 @@ func (db *DB) DeleteBucketObjects(ctx context.Context, opts DeleteBucketObjects)
 		}
 		return deletedObjectCount, Error.Wrap(err)
 	}
-	if deletedSegments == 0 {
+
+	if len(deletedSegments) == 0 {
 		return deletedObjectCount, nil
 	}
 
 	if opts.DeletePieces != nil {
-		err = opts.DeletePieces(ctx, deletedSegmentsBatch)
+		err = opts.DeletePieces(ctx, deletedSegments)
 		if err != nil {
 			return deletedObjectCount, Error.Wrap(err)
 		}
diff --git a/satellite/metabase/delete_bucket_test.go b/satellite/metabase/delete_bucket_test.go
index f13e8da43..58bb703b6 100644
--- a/satellite/metabase/delete_bucket_test.go
+++ b/satellite/metabase/delete_bucket_test.go
@@ -232,16 +232,15 @@ func TestDeleteBucketObjects(t *testing.T) {
 		t.Run("object with multiple segments", func(t *testing.T) {
 			defer metabasetest.DeleteAll{}.Check(ctx, t, db)
 
-			metabasetest.CreateObject(ctx, t, db, obj1, 104)
+			metabasetest.CreateObject(ctx, t, db, obj1, 37)
 
 			metabasetest.DeleteBucketObjects{
 				Opts: metabase.DeleteBucketObjects{
-					Bucket:                obj1.Location().Bucket(),
-					BatchSize:             2,
-					DeletePiecesBatchSize: 10,
+					Bucket:    obj1.Location().Bucket(),
+					BatchSize: 2,
 					DeletePieces: func(ctx context.Context, segments []metabase.DeletedSegmentInfo) error {
-						if len(segments) != 10 && len(segments) != 4 {
-							return errors.New("expected 4 or 10 segments")
+						if len(segments) != 37 {
+							return errors.New("expected 37 segments")
 						}
 						return nil
 					},
@@ -266,9 +265,8 @@ func TestDeleteBucketObjects(t *testing.T) {
 			segmentsDeleted := 0
 			metabasetest.DeleteBucketObjects{
 				Opts: metabase.DeleteBucketObjects{
-					Bucket:                root.Location().Bucket(),
-					BatchSize:             1,
-					DeletePiecesBatchSize: 1,
+					Bucket:    root.Location().Bucket(),
+					BatchSize: 1,
 					DeletePieces: func(ctx context.Context, segments []metabase.DeletedSegmentInfo) error {
 						segmentsDeleted += len(segments)
 						return nil
@@ -302,9 +300,8 @@ func TestDeleteBucketObjectsParallel(t *testing.T) {
 		for i := 0; i < 3; i++ {
 			ctx.Go(func() error {
 				_, err := db.DeleteBucketObjects(ctx, metabase.DeleteBucketObjects{
-					Bucket:                root.Location().Bucket(),
-					BatchSize:             2,
-					DeletePiecesBatchSize: 10,
+					Bucket:    root.Location().Bucket(),
+					BatchSize: 2,
 					DeletePieces: func(ctx context.Context, segments []metabase.DeletedSegmentInfo) error {
 						return nil
 					},
@@ -318,3 +315,29 @@ func TestDeleteBucketObjectsParallel(t *testing.T) {
 		metabasetest.Verify{}.Check(ctx, t, db)
 	})
 }
+
+func TestDeleteBucketObjectsCancel(t *testing.T) {
+	metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) {
+		defer metabasetest.DeleteAll{}.Check(ctx, t, db)
+
+		object := metabasetest.CreateObject(ctx, t, db, metabasetest.RandObjectStream(), 1)
+
+		testCtx, cancel := context.WithCancel(ctx)
+		cancel()
+		_, err := db.DeleteBucketObjects(testCtx, metabase.DeleteBucketObjects{
+			Bucket:    object.Location().Bucket(),
+			BatchSize: 2,
+			DeletePieces: func(ctx context.Context, segments []metabase.DeletedSegmentInfo) error {
+				return nil
+			},
+		})
+		require.Error(t, err)
+
+		metabasetest.Verify{
+			Objects: []metabase.RawObject{metabase.RawObject(object)},
+			Segments: []metabase.RawSegment{
+				metabasetest.DefaultRawSegment(object.ObjectStream, metabase.SegmentPosition{}),
+			},
+		}.Check(ctx, t, db)
+	})
+}
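
Note (not part of the patch): below is a minimal sketch of the failure
mode this change fixes. The names scanRows, unsafeDelete, safeDelete,
and the trimmed-down DeletedSegmentInfo are hypothetical stand-ins, not
the real metabase API; they only mimic the shape of the row-consuming
loop in DeleteBucketObjects. If pieces are removed from storage nodes
while the delete query's result rows are still being consumed and the
stream then fails, the database rolls the query back, but the piece
deletions cannot be undone, leaving segments in the DB whose pieces are
gone.

package main

import (
	"context"
	"errors"
	"fmt"
)

// DeletedSegmentInfo is a hypothetical stand-in for the metabase type;
// it carries just enough to identify a segment.
type DeletedSegmentInfo struct{ StreamID int }

// errMidStream simulates the delete query failing while its results are
// being read, after which the database rolls the deletion back.
var errMidStream = errors.New("connection reset while reading results")

// scanRows simulates consuming query rows: yield is called once per row
// and the returned error mimics a mid-stream failure from the driver.
func scanRows(yield func(DeletedSegmentInfo)) error {
	for i := 0; i < 5; i++ {
		yield(DeletedSegmentInfo{StreamID: i})
	}
	return errMidStream // DB rolls back: no segment was actually deleted
}

// unsafeDelete mirrors the old flow: pieces are removed from storage
// nodes while rows are still being consumed. When the stream errors
// out, the DB still holds the segments, but their pieces are gone.
func unsafeDelete(ctx context.Context, deletePieces func(context.Context, []DeletedSegmentInfo) error) error {
	batch := make([]DeletedSegmentInfo, 0, 2)
	return scanRows(func(s DeletedSegmentInfo) {
		batch = append(batch, s)
		if len(batch) >= 2 {
			_ = deletePieces(ctx, batch) // irreversible side effect mid-query
			batch = batch[:0]
		}
	})
}

// safeDelete mirrors the patched flow: buffer the results and call
// deletePieces only after the query has completed without error.
func safeDelete(ctx context.Context, deletePieces func(context.Context, []DeletedSegmentInfo) error) error {
	segments := make([]DeletedSegmentInfo, 0, 100)
	if err := scanRows(func(s DeletedSegmentInfo) { segments = append(segments, s) }); err != nil {
		return err // rolled back in the DB; no pieces were touched
	}
	return deletePieces(ctx, segments)
}

func main() {
	ctx := context.Background()
	piecesDeleted := 0
	deletePieces := func(ctx context.Context, segs []DeletedSegmentInfo) error {
		piecesDeleted += len(segs)
		return nil
	}

	_ = unsafeDelete(ctx, deletePieces)
	fmt.Println("old flow, pieces deleted despite rollback:", piecesDeleted) // 4

	piecesDeleted = 0
	_ = safeDelete(ctx, deletePieces)
	fmt.Println("new flow, pieces deleted after rollback:", piecesDeleted) // 0
}

Because the buffered segments slice is no longer capped by
DeletePiecesBatchSize, the patch bounds per-batch memory elsewhere by
lowering deleteBatchSizeLimit from 100 to 50 objects per query.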