satellite/metabase: while deleting bucket objects delete pieces in batches
It's possible that a single object has a very large number of segments. At the moment we collect all of its pieces at once and send the deletion information to the storage nodes in one go, which can use an excessive amount of memory. This change addresses the problem by calling the DeletePieces function multiple times, passing only a portion of the pieces to delete in each call.

Change-Id: Ie1e66cd9d86d130eb89a61cf6e23f38b8cb8859e
parent a59232bb1d
commit 1f5fbbd24a
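The pattern the change applies is plain flush-when-full batching: append to a fixed-capacity slice, hand the slice to the callback once it reaches the limit, reset it, and flush whatever remains at the end. Below is a minimal standalone sketch of that pattern, assuming nothing about the metabase API; the names processBatch and batchLimit are illustrative only.

package main

import (
	"context"
	"fmt"
)

// processBatch stands in for a callback like DeletePieces: it only ever
// receives at most batchLimit items, so memory stays bounded no matter how
// many items the producer yields.
func processBatch(ctx context.Context, batch []int) error {
	fmt.Printf("processing %d items\n", len(batch))
	return nil
}

func main() {
	const batchLimit = 1000

	ctx := context.Background()
	batch := make([]int, 0, batchLimit)

	for item := 0; item < 2500; item++ {
		batch = append(batch, item)
		if len(batch) == batchLimit {
			if err := processBatch(ctx, batch); err != nil {
				panic(err)
			}
			batch = batch[:0] // reuse the backing array for the next batch
		}
	}

	// Flush the final, partially filled batch.
	if len(batch) > 0 {
		if err := processBatch(ctx, batch); err != nil {
			panic(err)
		}
	}
}

With 2500 items and a limit of 1000 this flushes two full batches and a final batch of 500, which mirrors the new test below: 104 segments with DeletePiecesBatchSize 10 produce callbacks of 10 segments plus a final callback of 4.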
@@ -15,11 +15,16 @@ import (
 
 const deleteBatchSizeLimit = 100
 
+const deletePieceBatchLimit = 1000
+
 // DeleteBucketObjects contains arguments for deleting a whole bucket.
 type DeleteBucketObjects struct {
 	Bucket    BucketLocation
 	BatchSize int
 
+	// DeletePiecesBatchSize maximum number of DeletedSegmentInfo entries
+	// passed to DeletePieces function at once.
+	DeletePiecesBatchSize int
 	// DeletePieces is called for every batch of objects.
 	// Slice `segments` will be reused between calls.
 	DeletePieces func(ctx context.Context, segments []DeletedSegmentInfo) error
@@ -38,6 +43,11 @@ func (db *DB) DeleteBucketObjects(ctx context.Context, opts DeleteBucketObjects)
 		batchSize = deleteBatchSizeLimit
 	}
 
+	deletePiecesBatchSize := opts.DeletePiecesBatchSize
+	if deletePiecesBatchSize <= 0 || deletePiecesBatchSize > deletePieceBatchLimit {
+		deletePiecesBatchSize = deletePieceBatchLimit
+	}
+
 	var query string
 	switch db.impl {
 	case dbutil.Cockroach:
@@ -71,10 +81,11 @@ func (db *DB) DeleteBucketObjects(ctx context.Context, opts DeleteBucketObjects)
 	}
 
 	// TODO: fix the count for objects without segments
-	var deleteSegments []DeletedSegmentInfo
+	deletedSegmentsBatch := make([]DeletedSegmentInfo, 0, deletePiecesBatchSize)
 	for {
-		deleteSegments = deleteSegments[:0]
+		deletedSegmentsBatch = deletedSegmentsBatch[:0]
 		batchDeletedObjects := 0
+		deletedSegments := 0
 		err = withRows(db.db.Query(ctx, query,
 			opts.Bucket.ProjectID, []byte(opts.Bucket.BucketName), batchSize))(func(rows tagsql.Rows) error {
 			ids := map[uuid.UUID]struct{}{} // TODO: avoid map here
@@ -92,7 +103,19 @@ func (db *DB) DeleteBucketObjects(ctx context.Context, opts DeleteBucketObjects)
 				}
 
 				ids[streamID] = struct{}{}
-				deleteSegments = append(deleteSegments, segment)
+				deletedSegmentsBatch = append(deletedSegmentsBatch, segment)
+
+				if len(deletedSegmentsBatch) == deletePiecesBatchSize {
+					if opts.DeletePieces != nil {
+						err = opts.DeletePieces(ctx, deletedSegmentsBatch)
+						if err != nil {
+							return Error.Wrap(err)
+						}
+						deletedSegmentsBatch = deletedSegmentsBatch[:0]
+					}
+				}
+
+				deletedSegments++
 			}
 			batchDeletedObjects = len(ids)
 			deletedObjectCount += int64(len(ids))
@@ -100,7 +123,7 @@ func (db *DB) DeleteBucketObjects(ctx context.Context, opts DeleteBucketObjects)
 		})
 
 		mon.Meter("object_delete").Mark(batchDeletedObjects)
-		mon.Meter("segment_delete").Mark(len(deleteSegments))
+		mon.Meter("segment_delete").Mark(deletedSegments)
 
 		if err != nil {
 			if errors.Is(err, sql.ErrNoRows) {
@@ -108,12 +131,12 @@ func (db *DB) DeleteBucketObjects(ctx context.Context, opts DeleteBucketObjects)
 			}
 			return deletedObjectCount, Error.Wrap(err)
 		}
-		if len(deleteSegments) == 0 {
+		if len(deletedSegmentsBatch) == 0 {
 			return deletedObjectCount, nil
 		}
 
 		if opts.DeletePieces != nil {
-			err = opts.DeletePieces(ctx, deleteSegments)
+			err = opts.DeletePieces(ctx, deletedSegmentsBatch)
 			if err != nil {
 				return deletedObjectCount, Error.Wrap(err)
 			}
@@ -226,5 +226,28 @@ func TestDeleteBucketObjects(t *testing.T) {
 				},
 			}.Check(ctx, t, db)
 		})
+
+		t.Run("object with multiple segments", func(t *testing.T) {
+			defer metabasetest.DeleteAll{}.Check(ctx, t, db)
+
+			metabasetest.CreateObject(ctx, t, db, obj1, 104)
+
+			metabasetest.DeleteBucketObjects{
+				Opts: metabase.DeleteBucketObjects{
+					Bucket:                obj1.Location().Bucket(),
+					BatchSize:             2,
+					DeletePiecesBatchSize: 10,
+					DeletePieces: func(ctx context.Context, segments []metabase.DeletedSegmentInfo) error {
+						if len(segments) != 10 && len(segments) != 4 {
+							return errors.New("expected 4 or 10 segments")
+						}
+						return nil
+					},
+				},
+				Deleted: 1,
+			}.Check(ctx, t, db)
+
+			metabasetest.Verify{}.Check(ctx, t, db)
+		})
 	})
 }