storj/satellite/metabase/delete_objects.go
Michal Niewrzal e129841130 satellite/metabase: remove AOST from deleteInactiveObjectsAndSegments
AOST was mistakenly added to the query in deleteInactiveObjectsAndSegments,
which is used by DeleteZombieObjects. The DELETE statement does not support it.
Unfortunately, the unit tests didn't cover this case. This change removes
AOST from that method and adds AOST cases to the unit tests.

Change-Id: Ib7f65134290df08c490c96b7e367d12f497a3373
2023-06-28 13:24:14 +00:00
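
For context, a hypothetical caller-side sketch (not part of this change; the package, helper name, and parameter values are illustrative) of invoking DeleteZombieObjects with an AOST interval, which only affects the candidate SELECT:

package zombiedeletion // hypothetical caller package, for illustration only

import (
	"context"
	"time"

	"storj.io/storj/satellite/metabase"
)

// runZombieDeletion is a hypothetical helper; only the option names come from delete_objects.go.
func runZombieDeletion(ctx context.Context, db *metabase.DB, asOfSystemInterval time.Duration) error {
	return db.DeleteZombieObjects(ctx, metabase.DeleteZombieObjects{
		DeadlineBefore:     time.Now(),                      // pending objects with a deadline before now are candidates
		InactiveDeadline:   time.Now().Add(-24 * time.Hour), // illustrative: keep objects with segments newer than this
		AsOfSystemInterval: asOfSystemInterval,              // applied only to the candidate SELECT, never to the DELETEs
		BatchSize:          100,                             // illustrative; clamped by deleteBatchsizeLimit
	})
}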


// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.

package metabase

import (
	"context"
	"encoding/hex"
	"time"

	"github.com/jackc/pgx/v5"
	"github.com/zeebo/errs"
	"go.uber.org/zap"

	"storj.io/private/dbutil/pgxutil"
	"storj.io/private/tagsql"
)
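
// deleteBatchsizeLimit bounds the batch size used when deleting objects and segments.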
const (
	deleteBatchsizeLimit = intLimitRange(1000)
)

// DeleteExpiredObjects contains all the information necessary to delete expired objects and segments.
type DeleteExpiredObjects struct {
	ExpiredBefore      time.Time
	AsOfSystemInterval time.Duration
	BatchSize          int
}

// DeleteExpiredObjects deletes all objects that expired before expiredBefore.
func (db *DB) DeleteExpiredObjects(ctx context.Context, opts DeleteExpiredObjects) (err error) {
	defer mon.Task()(&ctx)(&err)

	return db.deleteObjectsAndSegmentsBatch(ctx, opts.BatchSize, func(startAfter ObjectStream, batchsize int) (last ObjectStream, err error) {
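		// Page through objects in (project_id, bucket_name, object_key, version) order,
		// resuming each batch after the last object returned by the previous one.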
		query := `
			SELECT
				project_id, bucket_name, object_key, version, stream_id,
				expires_at
			FROM objects
			` + db.impl.AsOfSystemInterval(opts.AsOfSystemInterval) + `
			WHERE
				(project_id, bucket_name, object_key, version) > ($1, $2, $3, $4)
				AND expires_at < $5
			ORDER BY project_id, bucket_name, object_key, version
			LIMIT $6;`

		expiredObjects := make([]ObjectStream, 0, batchsize)

		scanErrClass := errs.Class("DB rows scan has failed")
		err = withRows(db.db.QueryContext(ctx, query,
			startAfter.ProjectID, []byte(startAfter.BucketName), []byte(startAfter.ObjectKey), startAfter.Version,
			opts.ExpiredBefore,
			batchsize),
		)(func(rows tagsql.Rows) error {
			for rows.Next() {
				var expiresAt time.Time
				err = rows.Scan(
					&last.ProjectID, &last.BucketName, &last.ObjectKey, &last.Version, &last.StreamID,
					&expiresAt)
				if err != nil {
					return scanErrClass.Wrap(err)
				}

				db.log.Info("Deleting expired object",
					zap.Stringer("Project", last.ProjectID),
					zap.String("Bucket", last.BucketName),
					zap.String("Object Key", string(last.ObjectKey)),
					zap.Int64("Version", int64(last.Version)),
					zap.String("StreamID", hex.EncodeToString(last.StreamID[:])),
					zap.Time("Expired At", expiresAt),
				)
				expiredObjects = append(expiredObjects, last)
			}

			return nil
		})
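		// A scan failure aborts the whole run; other query errors are only logged,
		// and returning an empty ObjectStream ends the batch loop while DeleteExpiredObjects still returns nil.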
		if err != nil {
			if scanErrClass.Has(err) {
				return ObjectStream{}, Error.New("unable to select expired objects for deletion: %w", err)
			}

			db.log.Warn("unable to select expired objects for deletion", zap.Error(Error.Wrap(err)))
			return ObjectStream{}, nil
		}

		err = db.deleteObjectsAndSegments(ctx, expiredObjects)
		if err != nil {
			db.log.Warn("unable to delete expired objects from DB", zap.Error(err))
			return ObjectStream{}, nil
		}

		return last, nil
	})
}

// DeleteZombieObjects contains all the information necessary to delete zombie objects and segments.
type DeleteZombieObjects struct {
	DeadlineBefore     time.Time
	InactiveDeadline   time.Time
	AsOfSystemInterval time.Duration
	BatchSize          int
}

// DeleteZombieObjects deletes all objects whose zombie deletion deadline has passed.
func (db *DB) DeleteZombieObjects(ctx context.Context, opts DeleteZombieObjects) (err error) {
	defer mon.Task()(&ctx)(&err)

	return db.deleteObjectsAndSegmentsBatch(ctx, opts.BatchSize, func(startAfter ObjectStream, batchsize int) (last ObjectStream, err error) {
		// Pending objects migrated to metabase didn't have the zombie_deletion_deadline column set,
		// so we also need to take into account objects with zombie_deletion_deadline set to NULL.
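		// NOTE: AS OF SYSTEM TIME is applied only to this SELECT of candidate objects;
		// the DELETE statements in deleteInactiveObjectsAndSegments must not use AOST,
		// since DELETE does not support it.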
		query := `
			SELECT
				project_id, bucket_name, object_key, version, stream_id
			FROM objects
			` + db.impl.AsOfSystemInterval(opts.AsOfSystemInterval) + `
			WHERE
				(project_id, bucket_name, object_key, version) > ($1, $2, $3, $4)
				AND status = ` + pendingStatus + `
				AND (zombie_deletion_deadline IS NULL OR zombie_deletion_deadline < $5)
			ORDER BY project_id, bucket_name, object_key, version
			LIMIT $6;`

		objects := make([]ObjectStream, 0, batchsize)

		scanErrClass := errs.Class("DB rows scan has failed")
		err = withRows(db.db.QueryContext(ctx, query,
			startAfter.ProjectID, []byte(startAfter.BucketName), []byte(startAfter.ObjectKey), startAfter.Version,
			opts.DeadlineBefore,
			batchsize),
		)(func(rows tagsql.Rows) error {
			for rows.Next() {
				err = rows.Scan(&last.ProjectID, &last.BucketName, &last.ObjectKey, &last.Version, &last.StreamID)
				if err != nil {
					return scanErrClass.Wrap(err)
				}

				db.log.Debug("selected zombie object for deletion",
					zap.Stringer("Project", last.ProjectID),
					zap.String("Bucket", last.BucketName),
					zap.String("Object Key", string(last.ObjectKey)),
					zap.Int64("Version", int64(last.Version)),
					zap.String("StreamID", hex.EncodeToString(last.StreamID[:])),
				)
				objects = append(objects, last)
			}

			return nil
		})
		if err != nil {
			if scanErrClass.Has(err) {
				return ObjectStream{}, Error.New("unable to select zombie objects for deletion: %w", err)
			}

			db.log.Warn("unable to select zombie objects for deletion", zap.Error(Error.Wrap(err)))
			return ObjectStream{}, nil
		}

		err = db.deleteInactiveObjectsAndSegments(ctx, objects, opts)
		if err != nil {
			db.log.Warn("unable to delete zombie objects from DB", zap.Error(err))
			return ObjectStream{}, nil
		}

		return last, nil
	})
}
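
// deleteObjectsAndSegmentsBatch repeatedly calls deleteBatch, resuming each call after the
// last deleted object from the previous one, until a batch reports nothing deleted
// (a zero StreamID).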
func (db *DB) deleteObjectsAndSegmentsBatch(ctx context.Context, batchsize int, deleteBatch func(startAfter ObjectStream, batchsize int) (last ObjectStream, err error)) (err error) {
	defer mon.Task()(&ctx)(&err)

	deleteBatchsizeLimit.Ensure(&batchsize)

	var startAfter ObjectStream
	for {
		lastDeleted, err := deleteBatch(startAfter, batchsize)
		if err != nil {
			return err
		}
		if lastDeleted.StreamID.IsZero() {
			return nil
		}
		startAfter = lastDeleted
	}
}
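
// deleteObjectsAndSegments deletes the given objects and all of their segments in a single
// batched round trip to the database.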
func (db *DB) deleteObjectsAndSegments(ctx context.Context, objects []ObjectStream) (err error) {
	defer mon.Task()(&ctx)(&err)

	if len(objects) == 0 {
		return nil
	}

	err = pgxutil.Conn(ctx, db.db, func(conn *pgx.Conn) error {
		var batch pgx.Batch
		for _, obj := range objects {
			obj := obj

			batch.Queue(`
				WITH deleted_objects AS (
					DELETE FROM objects
					WHERE (project_id, bucket_name, object_key, version, stream_id) = ($1::BYTEA, $2, $3, $4, $5::BYTEA)
					RETURNING stream_id
				)
				DELETE FROM segments
				WHERE segments.stream_id = $5::BYTEA
			`, obj.ProjectID, []byte(obj.BucketName), []byte(obj.ObjectKey), obj.Version, obj.StreamID)
		}

		results := conn.SendBatch(ctx, &batch)
		defer func() { err = errs.Combine(err, results.Close()) }()

		var objectsDeletedGuess, segmentsDeleted int64

		var errlist errs.Group
		for i := 0; i < batch.Len(); i++ {
			result, err := results.Exec()
			errlist.Add(err)

			if affectedSegmentCount := result.RowsAffected(); affectedSegmentCount > 0 {
				// Note: this slightly miscounts objects without any segments,
				// and there doesn't seem to be a simple workaround for this.
				// Luckily, this is used only for metrics, where it's not a
				// significant problem to slightly miscount.
				objectsDeletedGuess++
				segmentsDeleted += affectedSegmentCount
			}
		}

		mon.Meter("object_delete").Mark64(objectsDeletedGuess)
		mon.Meter("segment_delete").Mark64(segmentsDeleted)

		return errlist.Err()
	})
	if err != nil {
		return Error.New("unable to delete expired objects: %w", err)
	}
	return nil
}
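
// deleteInactiveObjectsAndSegments deletes the given pending objects and their segments,
// but skips any object that received new segments after opts.InactiveDeadline, since such
// an upload is still considered active.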
func (db *DB) deleteInactiveObjectsAndSegments(ctx context.Context, objects []ObjectStream, opts DeleteZombieObjects) (err error) {
	defer mon.Task()(&ctx)(&err)

	if len(objects) == 0 {
		return nil
	}

	err = pgxutil.Conn(ctx, db.db, func(conn *pgx.Conn) error {
		var batch pgx.Batch
		for _, obj := range objects {
			batch.Queue(`
				WITH check_segments AS (
					SELECT 1 FROM segments
					WHERE stream_id = $5::BYTEA AND created_at > $6
				), deleted_objects AS (
					DELETE FROM objects
					WHERE
						(project_id, bucket_name, object_key, version) = ($1::BYTEA, $2::BYTEA, $3::BYTEA, $4) AND
						NOT EXISTS (SELECT 1 FROM check_segments)
					RETURNING stream_id
				)
				DELETE FROM segments
				WHERE segments.stream_id IN (SELECT stream_id FROM deleted_objects)
			`, obj.ProjectID, []byte(obj.BucketName), []byte(obj.ObjectKey), obj.Version, obj.StreamID, opts.InactiveDeadline)
		}

		results := conn.SendBatch(ctx, &batch)
		defer func() { err = errs.Combine(err, results.Close()) }()

		var segmentsDeleted int64

		var errlist errs.Group
		for i := 0; i < batch.Len(); i++ {
			result, err := results.Exec()
			errlist.Add(err)

			if err == nil {
				segmentsDeleted += result.RowsAffected()
			}
		}

		// TODO calculate deleted objects
		mon.Meter("zombie_segment_delete").Mark64(segmentsDeleted)
		mon.Meter("segment_delete").Mark64(segmentsDeleted)

		return errlist.Err()
	})
	if err != nil {
		return Error.New("unable to delete zombie objects: %w", err)
	}
	return nil
}