satellite/metainfo/metabase: fix delete expiration
The cursor was not being used in the batch deletion. The stream ID was not being used while deleting, which could in rare circumstances delete a newly uploaded object. Use the stream id in deletion, rather than passing that information from one query to another. Change-Id: I03271c6e72747e345dfb0bb70989f29e835efd8e
This commit is contained in:
parent
9cd17fd804
commit
b2be1f1629
@ -5,6 +5,7 @@ package metabase
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
@ -27,13 +28,6 @@ type DeleteExpiredObjects struct {
|
||||
BatchSize int
|
||||
}
|
||||
|
||||
type expiredObject struct {
|
||||
projectID uuid.UUID
|
||||
bucketName string
|
||||
objectKey ObjectKey
|
||||
version Version
|
||||
}
|
||||
|
||||
// DeleteExpiredObjects deletes all objects that expired before expiredBefore.
|
||||
func (db *DB) DeleteExpiredObjects(ctx context.Context, opts DeleteExpiredObjects) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
@ -42,7 +36,7 @@ func (db *DB) DeleteExpiredObjects(ctx context.Context, opts DeleteExpiredObject
|
||||
if opts.BatchSize == 0 || opts.BatchSize > expiredBatchsizeLimit {
|
||||
batchsize = expiredBatchsizeLimit
|
||||
}
|
||||
var startAfter Object
|
||||
var startAfter ObjectStream
|
||||
for {
|
||||
lastDeleted, err := db.deleteExpiredObjectsBatch(ctx, startAfter, opts.ExpiredBefore, opts.AsOfSystemTime, batchsize)
|
||||
if err != nil {
|
||||
@ -55,7 +49,7 @@ func (db *DB) DeleteExpiredObjects(ctx context.Context, opts DeleteExpiredObject
|
||||
}
|
||||
}
|
||||
|
||||
func (db *DB) deleteExpiredObjectsBatch(ctx context.Context, startAfter Object, expiredBefore time.Time, asOfSystemTime time.Time, batchsize int) (lastDeleted Object, err error) {
|
||||
func (db *DB) deleteExpiredObjectsBatch(ctx context.Context, startAfter ObjectStream, expiredBefore time.Time, asOfSystemTime time.Time, batchsize int) (last ObjectStream, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
var asOfSystemTimeString string
|
||||
@ -64,8 +58,8 @@ func (db *DB) deleteExpiredObjectsBatch(ctx context.Context, startAfter Object,
|
||||
asOfSystemTimeString = fmt.Sprintf(` AS OF SYSTEM TIME '%d' `, asOfSystemTime.Add(1*time.Second).UTC().UnixNano())
|
||||
}
|
||||
query := `
|
||||
SELECT project_id, bucket_name,
|
||||
object_key, version,
|
||||
SELECT
|
||||
project_id, bucket_name, object_key, version, stream_id,
|
||||
expires_at
|
||||
FROM objects
|
||||
` + asOfSystemTimeString + `
|
||||
@ -75,51 +69,48 @@ func (db *DB) deleteExpiredObjectsBatch(ctx context.Context, startAfter Object,
|
||||
ORDER BY project_id, bucket_name, object_key, version
|
||||
LIMIT $6;`
|
||||
|
||||
expiredObjects := make([]expiredObject, 0, batchsize)
|
||||
expiredObjects := make([]ObjectStream, 0, batchsize)
|
||||
|
||||
err = withRows(db.db.QueryContext(ctx, query, lastDeleted.ProjectID, []byte(lastDeleted.BucketName), []byte(lastDeleted.ObjectKey), lastDeleted.Version,
|
||||
err = withRows(db.db.QueryContext(ctx, query,
|
||||
startAfter.ProjectID, []byte(startAfter.BucketName), []byte(startAfter.ObjectKey), startAfter.Version,
|
||||
expiredBefore,
|
||||
batchsize),
|
||||
)(func(rows tagsql.Rows) error {
|
||||
for rows.Next() {
|
||||
err = rows.Scan(&lastDeleted.ProjectID, &lastDeleted.BucketName,
|
||||
&lastDeleted.ObjectKey, &lastDeleted.Version,
|
||||
&lastDeleted.ExpiresAt)
|
||||
var expiresAt time.Time
|
||||
err = rows.Scan(
|
||||
&last.ProjectID, &last.BucketName, &last.ObjectKey, &last.Version, &last.StreamID,
|
||||
&expiresAt)
|
||||
if err != nil {
|
||||
return Error.New("unable to delete expired objects: %w", err)
|
||||
}
|
||||
|
||||
db.log.Info("Deleting expired object",
|
||||
zap.Stringer("Project", lastDeleted.ProjectID),
|
||||
zap.String("Bucket", lastDeleted.BucketName),
|
||||
zap.String("Object Key", string(lastDeleted.ObjectKey)),
|
||||
zap.Int64("Version", int64(lastDeleted.Version)),
|
||||
zap.Time("Expired At", *lastDeleted.ExpiresAt),
|
||||
zap.Stringer("Project", last.ProjectID),
|
||||
zap.String("Bucket", last.BucketName),
|
||||
zap.String("Object Key", string(last.ObjectKey)),
|
||||
zap.Int64("Version", int64(last.Version)),
|
||||
zap.String("StreamID", hex.EncodeToString(last.StreamID[:])),
|
||||
zap.Time("Expired At", expiresAt),
|
||||
)
|
||||
expiredObjects = append(expiredObjects, expiredObject{
|
||||
lastDeleted.ProjectID, lastDeleted.BucketName, lastDeleted.ObjectKey, lastDeleted.Version,
|
||||
})
|
||||
expiredObjects = append(expiredObjects, last)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return Object{}, Error.New("unable to delete expired objects: %w", err)
|
||||
return ObjectStream{}, Error.New("unable to delete expired objects: %w", err)
|
||||
}
|
||||
|
||||
err = db.deleteExpiredObjects(ctx, expiredObjects)
|
||||
if err != nil {
|
||||
return Object{}, err
|
||||
return ObjectStream{}, err
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return Object{}, err
|
||||
return last, nil
|
||||
}
|
||||
|
||||
return lastDeleted, nil
|
||||
}
|
||||
|
||||
func (db *DB) deleteExpiredObjects(ctx context.Context, expiredObjects []expiredObject) (err error) {
|
||||
func (db *DB) deleteExpiredObjects(ctx context.Context, expiredObjects []ObjectStream) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
if len(expiredObjects) == 0 {
|
||||
@ -130,28 +121,31 @@ func (db *DB) deleteExpiredObjects(ctx context.Context, expiredObjects []expired
|
||||
buckets := make([][]byte, len(expiredObjects))
|
||||
objectKeys := make([][]byte, len(expiredObjects))
|
||||
versions := make([]int32, len(expiredObjects))
|
||||
streamIDs := make([]uuid.UUID, len(expiredObjects))
|
||||
|
||||
for i, object := range expiredObjects {
|
||||
projectIds[i] = object.projectID
|
||||
buckets[i] = []byte(object.bucketName)
|
||||
objectKeys[i] = []byte(object.objectKey)
|
||||
versions[i] = int32(object.version)
|
||||
projectIds[i] = object.ProjectID
|
||||
buckets[i] = []byte(object.BucketName)
|
||||
objectKeys[i] = []byte(object.ObjectKey)
|
||||
versions[i] = int32(object.Version)
|
||||
streamIDs[i] = object.StreamID
|
||||
}
|
||||
query := `
|
||||
WITH deleted_objects AS (
|
||||
DELETE FROM objects
|
||||
WHERE
|
||||
(project_id, bucket_name, object_key, version) IN (
|
||||
(project_id, bucket_name, object_key, version, stream_id) IN (
|
||||
SELECT
|
||||
unnest($1::BYTEA[]),
|
||||
unnest($2::BYTEA[]),
|
||||
unnest($3::BYTEA[]),
|
||||
unnest($4::INT4[])
|
||||
unnest($4::INT4[]),
|
||||
unnest($5::BYTEA[])
|
||||
)
|
||||
RETURNING stream_id
|
||||
RETURNING 1
|
||||
)
|
||||
DELETE FROM segments
|
||||
WHERE segments.stream_id IN (SELECT deleted_objects.stream_id FROM deleted_objects)
|
||||
WHERE segments.stream_id = ANY($5::BYTEA[])
|
||||
`
|
||||
_, err = db.db.ExecContext(ctx,
|
||||
query,
|
||||
@ -159,6 +153,7 @@ func (db *DB) deleteExpiredObjects(ctx context.Context, expiredObjects []expired
|
||||
pgutil.ByteaArray(buckets),
|
||||
pgutil.ByteaArray(objectKeys),
|
||||
pgutil.Int4Array(versions),
|
||||
pgutil.UUIDArray(streamIDs),
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
|
@ -22,14 +22,18 @@ func TestDeleteExpiredObjects(t *testing.T) {
|
||||
pastTime := now.Add(-1 * time.Hour)
|
||||
futureTime := now.Add(1 * time.Hour)
|
||||
|
||||
t.Run("Empty metabase", func(t *testing.T) {
|
||||
t.Run("none", func(t *testing.T) {
|
||||
defer DeleteAll{}.Check(ctx, t, db)
|
||||
|
||||
DeleteExpiredObjects{}.Check(ctx, t, db)
|
||||
DeleteExpiredObjects{
|
||||
Opts: metabase.DeleteExpiredObjects{
|
||||
ExpiredBefore: time.Now(),
|
||||
},
|
||||
}.Check(ctx, t, db)
|
||||
Verify{}.Check(ctx, t, db)
|
||||
})
|
||||
|
||||
t.Run("Delete expired partial objects", func(t *testing.T) {
|
||||
t.Run("partial objects", func(t *testing.T) {
|
||||
defer DeleteAll{}.Check(ctx, t, db)
|
||||
|
||||
// pending object without expiration time
|
||||
@ -61,7 +65,11 @@ func TestDeleteExpiredObjects(t *testing.T) {
|
||||
Version: 1,
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
DeleteExpiredObjects{}.Check(ctx, t, db)
|
||||
DeleteExpiredObjects{
|
||||
Opts: metabase.DeleteExpiredObjects{
|
||||
ExpiredBefore: time.Now(),
|
||||
},
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
Verify{ // the object with expiration time in the past is gone
|
||||
Objects: []metabase.RawObject{
|
||||
@ -84,7 +92,22 @@ func TestDeleteExpiredObjects(t *testing.T) {
|
||||
}.Check(ctx, t, db)
|
||||
})
|
||||
|
||||
t.Run("Delete expired committed objects", func(t *testing.T) {
|
||||
t.Run("batch size", func(t *testing.T) {
|
||||
expiresAt := time.Now().Add(-30 * 24 * time.Hour)
|
||||
for i := 0; i < 32; i++ {
|
||||
_ = createExpiredObject(ctx, t, db, randObjectStream(), 3, expiresAt)
|
||||
}
|
||||
DeleteExpiredObjects{
|
||||
Opts: metabase.DeleteExpiredObjects{
|
||||
ExpiredBefore: time.Now().Add(time.Hour),
|
||||
BatchSize: 4,
|
||||
},
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
Verify{}.Check(ctx, t, db)
|
||||
})
|
||||
|
||||
t.Run("committed objects", func(t *testing.T) {
|
||||
defer DeleteAll{}.Check(ctx, t, db)
|
||||
|
||||
object1 := CreateTestObject{}.Run(ctx, t, db, obj1, 1)
|
||||
@ -119,7 +142,11 @@ func TestDeleteExpiredObjects(t *testing.T) {
|
||||
expectedObj3Segment := expectedObj1Segment
|
||||
expectedObj3Segment.StreamID = obj3.StreamID
|
||||
|
||||
DeleteExpiredObjects{}.Check(ctx, t, db)
|
||||
DeleteExpiredObjects{
|
||||
Opts: metabase.DeleteExpiredObjects{
|
||||
ExpiredBefore: time.Now(),
|
||||
},
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
Verify{ // the object with expiration time in the past is gone
|
||||
Objects: []metabase.RawObject{
|
||||
|
@ -1309,3 +1309,52 @@ func createObject(ctx *testcontext.Context, t *testing.T, db *metabase.DB, obj m
|
||||
},
|
||||
}.Check(ctx, t, db)
|
||||
}
|
||||
|
||||
func createExpiredObject(ctx *testcontext.Context, t *testing.T, db *metabase.DB, obj metabase.ObjectStream, numberOfSegments byte, expiresAt time.Time) metabase.Object {
|
||||
BeginObjectExactVersion{
|
||||
Opts: metabase.BeginObjectExactVersion{
|
||||
ObjectStream: obj,
|
||||
Encryption: defaultTestEncryption,
|
||||
ExpiresAt: &expiresAt,
|
||||
},
|
||||
Version: obj.Version,
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
for i := byte(0); i < numberOfSegments; i++ {
|
||||
BeginSegment{
|
||||
Opts: metabase.BeginSegment{
|
||||
ObjectStream: obj,
|
||||
Position: metabase.SegmentPosition{Part: 0, Index: uint32(i)},
|
||||
RootPieceID: storj.PieceID{i + 1},
|
||||
Pieces: []metabase.Piece{{
|
||||
Number: 1,
|
||||
StorageNode: testrand.NodeID(),
|
||||
}},
|
||||
},
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
CommitSegment{
|
||||
Opts: metabase.CommitSegment{
|
||||
ObjectStream: obj,
|
||||
Position: metabase.SegmentPosition{Part: 0, Index: uint32(i)},
|
||||
RootPieceID: storj.PieceID{1},
|
||||
Pieces: metabase.Pieces{{Number: 0, StorageNode: storj.NodeID{2}}},
|
||||
|
||||
EncryptedKey: []byte{3},
|
||||
EncryptedKeyNonce: []byte{4},
|
||||
EncryptedETag: []byte{5},
|
||||
|
||||
EncryptedSize: 1024,
|
||||
PlainSize: 512,
|
||||
PlainOffset: 0,
|
||||
Redundancy: defaultTestRedundancy,
|
||||
},
|
||||
}.Check(ctx, t, db)
|
||||
}
|
||||
|
||||
return CommitObject{
|
||||
Opts: metabase.CommitObject{
|
||||
ObjectStream: obj,
|
||||
},
|
||||
}.Check(ctx, t, db)
|
||||
}
|
||||
|
@ -411,14 +411,14 @@ func (step DeleteObjectsAllVersions) Check(ctx *testcontext.Context, t testing.T
|
||||
}
|
||||
|
||||
type DeleteExpiredObjects struct {
|
||||
Opts metabase.DeleteExpiredObjects
|
||||
|
||||
ErrClass *errs.Class
|
||||
ErrText string
|
||||
}
|
||||
|
||||
func (step DeleteExpiredObjects) Check(ctx *testcontext.Context, t testing.TB, db *metabase.DB) {
|
||||
err := db.DeleteExpiredObjects(ctx, metabase.DeleteExpiredObjects{
|
||||
ExpiredBefore: time.Now(),
|
||||
})
|
||||
err := db.DeleteExpiredObjects(ctx, step.Opts)
|
||||
checkError(t, err, step.ErrClass, step.ErrText)
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user