satellite/metabase: use implicit tx when batch deleting

Change-Id: I2dc577b95209674b626a76633dedf5b5b1641db0
This commit is contained in:
Egon Elbre 2021-07-27 15:19:23 +03:00
parent d0596c060d
commit 707c4a7b0e

View File

@ -179,44 +179,38 @@ func (db *DB) deleteObjectsAndSegments(ctx context.Context, objects []ObjectStre
for _, obj := range objects {
obj := obj
batch.Queue(`START TRANSACTION`)
batch.Queue(`
DELETE FROM objects
WHERE (project_id, bucket_name, object_key, version) = ($1::BYTEA, $2::BYTEA, $3::BYTEA, $4)
AND stream_id = $5::BYTEA
`, obj.ProjectID, []byte(obj.BucketName), []byte(obj.ObjectKey), obj.Version, obj.StreamID)
batch.Queue(`
WITH deleted_objects AS (
DELETE FROM objects
WHERE (project_id, bucket_name, object_key, version, stream_id) = ($1::BYTEA, $2, $3, $4, $5::BYTEA)
RETURNING stream_id
)
DELETE FROM segments
WHERE segments.stream_id = $1::BYTEA
`, obj.StreamID)
batch.Queue(`COMMIT TRANSACTION`)
WHERE segments.stream_id = $5::BYTEA
`, obj.ProjectID, []byte(obj.BucketName), []byte(obj.ObjectKey), obj.Version, obj.StreamID)
}
results := conn.SendBatch(ctx, &batch)
defer func() { err = errs.Combine(err, results.Close()) }()
var objectsDeleted, segmentsDeleted int64
var objectsDeletedGuess, segmentsDeleted int64
var errlist errs.Group
for i := 0; i < batch.Len(); i++ {
result, err := results.Exec()
errlist.Add(err)
switch i % 3 {
case 0: // start transcation
case 1: // delete objects
if err == nil {
objectsDeleted += result.RowsAffected()
}
case 2: // delete segments
if err == nil {
segmentsDeleted += result.RowsAffected()
}
case 3: // commit transaction
if affectedSegmentCount := result.RowsAffected(); affectedSegmentCount > 0 {
// Note, this slightly miscounts objects without any segments
// there doesn't seem to be a simple work around for this.
// Luckily, this is used only for metrics, where it's not a
// significant problem to slightly miscount.
objectsDeletedGuess++
segmentsDeleted += affectedSegmentCount
}
}
mon.Meter("object_delete").Mark64(objectsDeleted)
mon.Meter("object_delete").Mark64(objectsDeletedGuess)
mon.Meter("segment_delete").Mark64(segmentsDeleted)
return errlist.Err()