2020-11-09 14:55:10 +00:00
|
|
|
// Copyright (C) 2020 Storj Labs, Inc.
|
|
|
|
// See LICENSE for copying information.
|
|
|
|
|
|
|
|
package metabase
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
2021-04-16 10:14:27 +01:00
|
|
|
"encoding/hex"
|
2021-03-16 11:54:40 +00:00
|
|
|
"fmt"
|
2020-11-09 14:55:10 +00:00
|
|
|
"time"
|
|
|
|
|
2021-04-16 11:03:07 +01:00
|
|
|
"github.com/jackc/pgx/v4"
|
|
|
|
"github.com/jackc/pgx/v4/stdlib"
|
|
|
|
"github.com/zeebo/errs"
|
2020-11-09 14:55:10 +00:00
|
|
|
"go.uber.org/zap"
|
|
|
|
|
2021-04-23 10:52:40 +01:00
|
|
|
"storj.io/private/dbutil"
|
|
|
|
"storj.io/private/tagsql"
|
2020-11-09 14:55:10 +00:00
|
|
|
)
|
|
|
|
|
2021-03-16 11:54:40 +00:00
|
|
|
const (
	// deleteBatchsizeLimit is the maximum (and default) number of objects
	// deleted per round by deleteObjectsAndSegmentsBatch; it bounds how much
	// work a single SELECT+DELETE cycle does.
	deleteBatchsizeLimit = 1000
)
|
|
|
|
|
|
|
|
// DeleteExpiredObjects contains all the information necessary to delete expired objects and segments.
type DeleteExpiredObjects struct {
	// ExpiredBefore: objects whose expires_at is strictly before this time are deleted.
	ExpiredBefore time.Time
	// AsOfSystemTime, when non-zero and the backend is CockroachDB, adds an
	// AS OF SYSTEM TIME clause to the selection query to reduce contention.
	AsOfSystemTime time.Time
	// BatchSize is the number of objects deleted per round; zero (or a value
	// above deleteBatchsizeLimit) falls back to deleteBatchsizeLimit.
	BatchSize int
}
|
|
|
|
|
2020-11-09 14:55:10 +00:00
|
|
|
// DeleteExpiredObjects deletes all objects that expired before expiredBefore.
//
// It works in batches: each round selects up to batchsize expired objects in
// (project_id, bucket_name, object_key, version) order, starting strictly
// after the last object deleted by the previous round (keyset pagination),
// then deletes them together with their segments.
func (db *DB) DeleteExpiredObjects(ctx context.Context, opts DeleteExpiredObjects) (err error) {
	defer mon.Task()(&ctx)(&err)

	// AS OF SYSTEM TIME is CockroachDB-specific; skip it on other backends.
	var asOfSystemTimeString string
	if !opts.AsOfSystemTime.IsZero() && db.implementation == dbutil.Cockroach {
		asOfSystemTimeString = fmt.Sprintf(` AS OF SYSTEM TIME '%d' `, opts.AsOfSystemTime.UnixNano())
	}

	return db.deleteObjectsAndSegmentsBatch(ctx, opts.BatchSize, func(startAfter ObjectStream, batchsize int) (last ObjectStream, err error) {
		// Tuple comparison against startAfter implements the pagination
		// cursor; ORDER BY must match the tuple for it to be correct.
		query := `
			SELECT
				project_id, bucket_name, object_key, version, stream_id,
				expires_at
			FROM objects
			` + asOfSystemTimeString + `
			WHERE
				(project_id, bucket_name, object_key, version) > ($1, $2, $3, $4)
				AND expires_at < $5
			ORDER BY project_id, bucket_name, object_key, version
			LIMIT $6;`

		expiredObjects := make([]ObjectStream, 0, batchsize)

		err = withRows(db.db.QueryContext(ctx, query,
			startAfter.ProjectID, []byte(startAfter.BucketName), []byte(startAfter.ObjectKey), startAfter.Version,
			opts.ExpiredBefore,
			batchsize),
		)(func(rows tagsql.Rows) error {
			for rows.Next() {
				var expiresAt time.Time
				// Scan into `last` so the final row doubles as the
				// next round's pagination cursor.
				err = rows.Scan(
					&last.ProjectID, &last.BucketName, &last.ObjectKey, &last.Version, &last.StreamID,
					&expiresAt)
				if err != nil {
					return Error.New("unable to delete expired objects: %w", err)
				}

				db.log.Info("Deleting expired object",
					zap.Stringer("Project", last.ProjectID),
					zap.String("Bucket", last.BucketName),
					zap.String("Object Key", string(last.ObjectKey)),
					zap.Int64("Version", int64(last.Version)),
					zap.String("StreamID", hex.EncodeToString(last.StreamID[:])),
					zap.Time("Expired At", expiresAt),
				)
				expiredObjects = append(expiredObjects, last)
			}

			return nil
		})
		if err != nil {
			return ObjectStream{}, Error.New("unable to delete expired objects: %w", err)
		}

		// Delete the collected objects and their segments in one batch.
		err = db.deleteObjectsAndSegments(ctx, expiredObjects)
		if err != nil {
			return ObjectStream{}, err
		}

		// Returning the zero ObjectStream (when no rows matched) signals
		// the batch loop to stop.
		return last, nil
	})
}
|
|
|
|
|
2021-04-29 10:21:29 +01:00
|
|
|
// DeleteZombieObjects contains all the information necessary to delete zombie objects and segments.
type DeleteZombieObjects struct {
	// DeadlineBefore: pending objects whose zombie_deletion_deadline is
	// strictly before this time are deleted.
	DeadlineBefore time.Time
	// AsOfSystemTime, when non-zero and the backend is CockroachDB, adds an
	// AS OF SYSTEM TIME clause to the selection query to reduce contention.
	AsOfSystemTime time.Time
	// BatchSize is the number of objects deleted per round; zero (or a value
	// above deleteBatchsizeLimit) falls back to deleteBatchsizeLimit.
	BatchSize int
}
|
|
|
|
|
|
|
|
// DeleteZombieObjects deletes all objects that zombie deletion deadline passed.
//
// Only objects still in pending status are considered. It works in batches:
// each round selects up to batchsize matching objects in
// (project_id, bucket_name, object_key, version) order, starting strictly
// after the last object deleted by the previous round (keyset pagination),
// then deletes them together with their segments.
func (db *DB) DeleteZombieObjects(ctx context.Context, opts DeleteZombieObjects) (err error) {
	defer mon.Task()(&ctx)(&err)

	// AS OF SYSTEM TIME is CockroachDB-specific; skip it on other backends.
	var asOfSystemTimeString string
	if !opts.AsOfSystemTime.IsZero() && db.implementation == dbutil.Cockroach {
		asOfSystemTimeString = fmt.Sprintf(` AS OF SYSTEM TIME '%d' `, opts.AsOfSystemTime.UnixNano())
	}

	return db.deleteObjectsAndSegmentsBatch(ctx, opts.BatchSize, func(startAfter ObjectStream, batchsize int) (last ObjectStream, err error) {
		// Tuple comparison against startAfter implements the pagination
		// cursor; ORDER BY must match the tuple for it to be correct.
		// pendingStatus is a constant SQL fragment, not user input.
		query := `
			SELECT
				project_id, bucket_name, object_key, version, stream_id
			FROM objects
			` + asOfSystemTimeString + `
			WHERE
				(project_id, bucket_name, object_key, version) > ($1, $2, $3, $4)
				AND status = ` + pendingStatus + `
				AND zombie_deletion_deadline < $5
			ORDER BY project_id, bucket_name, object_key, version
			LIMIT $6;`

		objects := make([]ObjectStream, 0, batchsize)

		err = withRows(db.db.QueryContext(ctx, query,
			startAfter.ProjectID, []byte(startAfter.BucketName), []byte(startAfter.ObjectKey), startAfter.Version,
			opts.DeadlineBefore,
			batchsize),
		)(func(rows tagsql.Rows) error {
			for rows.Next() {
				// Scan into `last` so the final row doubles as the
				// next round's pagination cursor.
				err = rows.Scan(&last.ProjectID, &last.BucketName, &last.ObjectKey, &last.Version, &last.StreamID)
				if err != nil {
					return Error.New("unable to delete zombie objects: %w", err)
				}

				db.log.Info("Deleting zombie object",
					zap.Stringer("Project", last.ProjectID),
					zap.String("Bucket", last.BucketName),
					zap.String("Object Key", string(last.ObjectKey)),
					zap.Int64("Version", int64(last.Version)),
					zap.String("StreamID", hex.EncodeToString(last.StreamID[:])),
				)
				objects = append(objects, last)
			}

			return nil
		})
		if err != nil {
			return ObjectStream{}, Error.New("unable to delete zombie objects: %w", err)
		}

		// Delete the collected objects and their segments in one batch.
		err = db.deleteObjectsAndSegments(ctx, objects)
		if err != nil {
			return ObjectStream{}, err
		}

		// Returning the zero ObjectStream (when no rows matched) signals
		// the batch loop to stop.
		return last, nil
	})
}
|
2021-03-16 11:54:40 +00:00
|
|
|
|
2021-04-29 10:21:29 +01:00
|
|
|
func (db *DB) deleteObjectsAndSegmentsBatch(ctx context.Context, batchsize int, deleteBatch func(startAfter ObjectStream, batchsize int) (last ObjectStream, err error)) (err error) {
|
|
|
|
defer mon.Task()(&ctx)(&err)
|
2020-11-09 14:55:10 +00:00
|
|
|
|
2021-04-29 10:21:29 +01:00
|
|
|
bs := batchsize
|
|
|
|
if batchsize == 0 || batchsize > deleteBatchsizeLimit {
|
|
|
|
bs = deleteBatchsizeLimit
|
|
|
|
}
|
|
|
|
var startAfter ObjectStream
|
|
|
|
for {
|
|
|
|
lastDeleted, err := deleteBatch(startAfter, bs)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
if lastDeleted.StreamID.IsZero() {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
startAfter = lastDeleted
|
|
|
|
}
|
2020-11-09 14:55:10 +00:00
|
|
|
}
|
|
|
|
|
2021-04-29 10:21:29 +01:00
|
|
|
func (db *DB) deleteObjectsAndSegments(ctx context.Context, objects []ObjectStream) (err error) {
|
2020-11-09 14:55:10 +00:00
|
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
|
2021-04-29 10:21:29 +01:00
|
|
|
if len(objects) == 0 {
|
2020-11-09 14:55:10 +00:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2021-04-16 11:03:07 +01:00
|
|
|
conn, err := db.db.Conn(ctx)
|
|
|
|
if err != nil {
|
|
|
|
return Error.New("unable to get the raw conn: %w", err)
|
2021-03-16 11:54:40 +00:00
|
|
|
}
|
2021-04-16 11:03:07 +01:00
|
|
|
defer func() { err = errs.Combine(err, conn.Close()) }()
|
2020-11-09 14:55:10 +00:00
|
|
|
|
2021-04-16 11:03:07 +01:00
|
|
|
err = conn.Raw(ctx, func(driverConn interface{}) (err error) {
|
|
|
|
defer func() {
|
|
|
|
if closable, ok := driverConn.(interface{ Close() error }); ok {
|
|
|
|
err = errs.Combine(err, closable.Close())
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
|
|
|
|
var pgxconn *pgx.Conn
|
|
|
|
switch conn := driverConn.(type) {
|
|
|
|
case interface{ StdlibConn() *stdlib.Conn }:
|
|
|
|
pgxconn = conn.StdlibConn().Conn()
|
|
|
|
case *stdlib.Conn:
|
|
|
|
pgxconn = conn.Conn()
|
|
|
|
default:
|
|
|
|
return Error.New("invalid raw conn driver %T", driverConn)
|
|
|
|
}
|
|
|
|
|
|
|
|
var batch pgx.Batch
|
2021-04-29 10:21:29 +01:00
|
|
|
for _, obj := range objects {
|
2021-04-16 11:03:07 +01:00
|
|
|
obj := obj
|
|
|
|
|
|
|
|
batch.Queue(`START TRANSACTION`)
|
|
|
|
batch.Queue(`
|
|
|
|
DELETE FROM objects
|
|
|
|
WHERE (project_id, bucket_name, object_key, version) = ($1::BYTEA, $2::BYTEA, $3::BYTEA, $4)
|
|
|
|
AND stream_id = $5::BYTEA
|
|
|
|
`, obj.ProjectID, []byte(obj.BucketName), []byte(obj.ObjectKey), obj.Version, obj.StreamID)
|
|
|
|
batch.Queue(`
|
|
|
|
DELETE FROM segments
|
|
|
|
WHERE segments.stream_id = $1::BYTEA
|
|
|
|
`, obj.StreamID)
|
|
|
|
batch.Queue(`COMMIT TRANSACTION`)
|
|
|
|
}
|
|
|
|
|
|
|
|
results := pgxconn.SendBatch(ctx, &batch)
|
|
|
|
defer func() { err = errs.Combine(err, results.Close()) }()
|
|
|
|
|
|
|
|
var errlist errs.Group
|
|
|
|
for i := 0; i < batch.Len(); i++ {
|
|
|
|
_, err := results.Exec()
|
|
|
|
errlist.Add(err)
|
|
|
|
}
|
|
|
|
|
|
|
|
return errlist.Err()
|
|
|
|
})
|
2020-11-09 14:55:10 +00:00
|
|
|
if err != nil {
|
2021-03-16 11:54:40 +00:00
|
|
|
return Error.New("unable to delete expired objects: %w", err)
|
2020-11-09 14:55:10 +00:00
|
|
|
}
|
2021-04-16 11:03:07 +01:00
|
|
|
|
2020-11-09 14:55:10 +00:00
|
|
|
return nil
|
|
|
|
}
|