satellite/metainfo/metainfo: simplify expired objects deletion query
Change-Id: Iba5fc498527338ed6d2c5dd779c508ec8b6ce443
This commit is contained in:
parent
afcc55fb47
commit
c3f8e06798
@ -32,7 +32,6 @@ type expiredObject struct {
|
||||
bucketName string
|
||||
objectKey ObjectKey
|
||||
version Version
|
||||
streamID uuid.UUID
|
||||
}
|
||||
|
||||
// DeleteExpiredObjects deletes all objects that expired before expiredBefore.
|
||||
@ -66,7 +65,7 @@ func (db *DB) deleteExpiredObjectsBatch(ctx context.Context, startAfter Object,
|
||||
}
|
||||
query := `
|
||||
SELECT project_id, bucket_name,
|
||||
object_key, version, stream_id,
|
||||
object_key, version,
|
||||
expires_at
|
||||
FROM objects
|
||||
` + asOfSystemTimeString + `
|
||||
@ -84,7 +83,7 @@ func (db *DB) deleteExpiredObjectsBatch(ctx context.Context, startAfter Object,
|
||||
)(func(rows tagsql.Rows) error {
|
||||
for rows.Next() {
|
||||
err = rows.Scan(&lastDeleted.ProjectID, &lastDeleted.BucketName,
|
||||
&lastDeleted.ObjectKey, &lastDeleted.Version, &lastDeleted.StreamID,
|
||||
&lastDeleted.ObjectKey, &lastDeleted.Version,
|
||||
&lastDeleted.ExpiresAt)
|
||||
if err != nil {
|
||||
return Error.New("unable to delete expired objects: %w", err)
|
||||
@ -99,7 +98,6 @@ func (db *DB) deleteExpiredObjectsBatch(ctx context.Context, startAfter Object,
|
||||
)
|
||||
expiredObjects = append(expiredObjects, expiredObject{
|
||||
lastDeleted.ProjectID, lastDeleted.BucketName, lastDeleted.ObjectKey, lastDeleted.Version,
|
||||
lastDeleted.StreamID,
|
||||
})
|
||||
}
|
||||
|
||||
@ -132,31 +130,28 @@ func (db *DB) deleteExpiredObjects(ctx context.Context, expiredObjects []expired
|
||||
buckets := make([][]byte, len(expiredObjects))
|
||||
objectKeys := make([][]byte, len(expiredObjects))
|
||||
versions := make([]int32, len(expiredObjects))
|
||||
streamIds := make([][]byte, len(expiredObjects))
|
||||
|
||||
for i, object := range expiredObjects {
|
||||
projectIds[i] = object.projectID
|
||||
buckets[i] = []byte(object.bucketName)
|
||||
objectKeys[i] = []byte(object.objectKey)
|
||||
versions[i] = int32(object.version)
|
||||
streamIds[i] = object.streamID[:]
|
||||
}
|
||||
query := `
|
||||
WITH deleted_objects AS (
|
||||
DELETE FROM objects
|
||||
WHERE
|
||||
(project_id, bucket_name, object_key, version, stream_id) IN (
|
||||
(project_id, bucket_name, object_key, version) IN (
|
||||
SELECT
|
||||
unnest($1::BYTEA[]),
|
||||
unnest($2::BYTEA[]),
|
||||
unnest($3::BYTEA[]),
|
||||
unnest($4::INT4[]),
|
||||
unnest($5::BYTEA[])
|
||||
unnest($4::INT4[])
|
||||
)
|
||||
RETURNING stream_id
|
||||
)
|
||||
DELETE FROM segments
|
||||
WHERE segments.stream_id IN (SELECT deleted_objects.stream_id FROM deleted_objects ORDER BY deleted_objects.stream_id)
|
||||
WHERE segments.stream_id IN (SELECT deleted_objects.stream_id FROM deleted_objects)
|
||||
`
|
||||
_, err = db.db.ExecContext(ctx,
|
||||
query,
|
||||
@ -164,7 +159,6 @@ func (db *DB) deleteExpiredObjects(ctx context.Context, expiredObjects []expired
|
||||
pgutil.ByteaArray(buckets),
|
||||
pgutil.ByteaArray(objectKeys),
|
||||
pgutil.Int4Array(versions),
|
||||
pgutil.ByteaArray(streamIds),
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
|
Loading…
Reference in New Issue
Block a user