diff --git a/satellite/metainfo/metabase/delete_expired.go b/satellite/metainfo/metabase/delete_expired.go index 337bbb2e9..d180b70ff 100644 --- a/satellite/metainfo/metabase/delete_expired.go +++ b/satellite/metainfo/metabase/delete_expired.go @@ -5,6 +5,7 @@ package metabase import ( "context" + "encoding/hex" "fmt" "time" @@ -27,13 +28,6 @@ type DeleteExpiredObjects struct { BatchSize int } -type expiredObject struct { - projectID uuid.UUID - bucketName string - objectKey ObjectKey - version Version -} - // DeleteExpiredObjects deletes all objects that expired before expiredBefore. func (db *DB) DeleteExpiredObjects(ctx context.Context, opts DeleteExpiredObjects) (err error) { defer mon.Task()(&ctx)(&err) @@ -42,7 +36,7 @@ func (db *DB) DeleteExpiredObjects(ctx context.Context, opts DeleteExpiredObject if opts.BatchSize == 0 || opts.BatchSize > expiredBatchsizeLimit { batchsize = expiredBatchsizeLimit } - var startAfter Object + var startAfter ObjectStream for { lastDeleted, err := db.deleteExpiredObjectsBatch(ctx, startAfter, opts.ExpiredBefore, opts.AsOfSystemTime, batchsize) if err != nil { @@ -55,7 +49,7 @@ func (db *DB) DeleteExpiredObjects(ctx context.Context, opts DeleteExpiredObject } } -func (db *DB) deleteExpiredObjectsBatch(ctx context.Context, startAfter Object, expiredBefore time.Time, asOfSystemTime time.Time, batchsize int) (lastDeleted Object, err error) { +func (db *DB) deleteExpiredObjectsBatch(ctx context.Context, startAfter ObjectStream, expiredBefore time.Time, asOfSystemTime time.Time, batchsize int) (last ObjectStream, err error) { defer mon.Task()(&ctx)(&err) var asOfSystemTimeString string @@ -64,9 +58,9 @@ func (db *DB) deleteExpiredObjectsBatch(ctx context.Context, startAfter Object, asOfSystemTimeString = fmt.Sprintf(` AS OF SYSTEM TIME '%d' `, asOfSystemTime.Add(1*time.Second).UTC().UnixNano()) } query := ` - SELECT project_id, bucket_name, - object_key, version, - expires_at + SELECT + project_id, bucket_name, object_key, version, stream_id, + expires_at FROM objects ` + asOfSystemTimeString + ` WHERE @@ -75,51 +69,48 @@ func (db *DB) deleteExpiredObjectsBatch(ctx context.Context, startAfter Object, ORDER BY project_id, bucket_name, object_key, version LIMIT $6;` - expiredObjects := make([]expiredObject, 0, batchsize) + expiredObjects := make([]ObjectStream, 0, batchsize) - err = withRows(db.db.QueryContext(ctx, query, lastDeleted.ProjectID, []byte(lastDeleted.BucketName), []byte(lastDeleted.ObjectKey), lastDeleted.Version, + err = withRows(db.db.QueryContext(ctx, query, + startAfter.ProjectID, []byte(startAfter.BucketName), []byte(startAfter.ObjectKey), startAfter.Version, expiredBefore, batchsize), )(func(rows tagsql.Rows) error { for rows.Next() { - err = rows.Scan(&lastDeleted.ProjectID, &lastDeleted.BucketName, - &lastDeleted.ObjectKey, &lastDeleted.Version, - &lastDeleted.ExpiresAt) + var expiresAt time.Time + err = rows.Scan( + &last.ProjectID, &last.BucketName, &last.ObjectKey, &last.Version, &last.StreamID, + &expiresAt) if err != nil { return Error.New("unable to delete expired objects: %w", err) } db.log.Info("Deleting expired object", - zap.Stringer("Project", lastDeleted.ProjectID), - zap.String("Bucket", lastDeleted.BucketName), - zap.String("Object Key", string(lastDeleted.ObjectKey)), - zap.Int64("Version", int64(lastDeleted.Version)), - zap.Time("Expired At", *lastDeleted.ExpiresAt), + zap.Stringer("Project", last.ProjectID), + zap.String("Bucket", last.BucketName), + zap.String("Object Key", string(last.ObjectKey)), + zap.Int64("Version", int64(last.Version)), + zap.String("StreamID", hex.EncodeToString(last.StreamID[:])), + zap.Time("Expired At", expiresAt), ) - expiredObjects = append(expiredObjects, expiredObject{ - lastDeleted.ProjectID, lastDeleted.BucketName, lastDeleted.ObjectKey, lastDeleted.Version, - }) + expiredObjects = append(expiredObjects, last) } return nil }) if err != nil { - return Object{}, Error.New("unable to delete expired objects: %w", err) + return ObjectStream{}, Error.New("unable to delete expired objects: %w", err) } err = db.deleteExpiredObjects(ctx, expiredObjects) if err != nil { - return Object{}, err + return ObjectStream{}, err } - if err != nil { - return Object{}, err - } - - return lastDeleted, nil + return last, nil } -func (db *DB) deleteExpiredObjects(ctx context.Context, expiredObjects []expiredObject) (err error) { +func (db *DB) deleteExpiredObjects(ctx context.Context, expiredObjects []ObjectStream) (err error) { defer mon.Task()(&ctx)(&err) if len(expiredObjects) == 0 { @@ -130,28 +121,31 @@ func (db *DB) deleteExpiredObjects(ctx context.Context, expiredObjects []expired buckets := make([][]byte, len(expiredObjects)) objectKeys := make([][]byte, len(expiredObjects)) versions := make([]int32, len(expiredObjects)) + streamIDs := make([]uuid.UUID, len(expiredObjects)) for i, object := range expiredObjects { - projectIds[i] = object.projectID - buckets[i] = []byte(object.bucketName) - objectKeys[i] = []byte(object.objectKey) - versions[i] = int32(object.version) + projectIds[i] = object.ProjectID + buckets[i] = []byte(object.BucketName) + objectKeys[i] = []byte(object.ObjectKey) + versions[i] = int32(object.Version) + streamIDs[i] = object.StreamID } query := ` WITH deleted_objects AS ( DELETE FROM objects WHERE - (project_id, bucket_name, object_key, version) IN ( + (project_id, bucket_name, object_key, version, stream_id) IN ( SELECT unnest($1::BYTEA[]), unnest($2::BYTEA[]), unnest($3::BYTEA[]), - unnest($4::INT4[]) + unnest($4::INT4[]), + unnest($5::BYTEA[]) ) - RETURNING stream_id + RETURNING 1 ) DELETE FROM segments - WHERE segments.stream_id IN (SELECT deleted_objects.stream_id FROM deleted_objects) + WHERE segments.stream_id = ANY($5::BYTEA[]) ` _, err = db.db.ExecContext(ctx, query, @@ -159,6 +153,7 @@ func (db *DB) deleteExpiredObjects(ctx context.Context, expiredObjects []expired pgutil.ByteaArray(buckets), pgutil.ByteaArray(objectKeys), pgutil.Int4Array(versions), + pgutil.UUIDArray(streamIDs), ) if err != nil { diff --git a/satellite/metainfo/metabase/delete_expired_test.go b/satellite/metainfo/metabase/delete_expired_test.go index 68490434d..0c131e465 100644 --- a/satellite/metainfo/metabase/delete_expired_test.go +++ b/satellite/metainfo/metabase/delete_expired_test.go @@ -22,14 +22,18 @@ func TestDeleteExpiredObjects(t *testing.T) { pastTime := now.Add(-1 * time.Hour) futureTime := now.Add(1 * time.Hour) - t.Run("Empty metabase", func(t *testing.T) { + t.Run("none", func(t *testing.T) { defer DeleteAll{}.Check(ctx, t, db) - DeleteExpiredObjects{}.Check(ctx, t, db) + DeleteExpiredObjects{ + Opts: metabase.DeleteExpiredObjects{ + ExpiredBefore: time.Now(), + }, + }.Check(ctx, t, db) Verify{}.Check(ctx, t, db) }) - t.Run("Delete expired partial objects", func(t *testing.T) { + t.Run("partial objects", func(t *testing.T) { defer DeleteAll{}.Check(ctx, t, db) // pending object without expiration time @@ -61,7 +65,11 @@ func TestDeleteExpiredObjects(t *testing.T) { Version: 1, }.Check(ctx, t, db) - DeleteExpiredObjects{}.Check(ctx, t, db) + DeleteExpiredObjects{ + Opts: metabase.DeleteExpiredObjects{ + ExpiredBefore: time.Now(), + }, + }.Check(ctx, t, db) Verify{ // the object with expiration time in the past is gone Objects: []metabase.RawObject{ @@ -84,7 +92,22 @@ func TestDeleteExpiredObjects(t *testing.T) { }.Check(ctx, t, db) }) - t.Run("Delete expired committed objects", func(t *testing.T) { + t.Run("batch size", func(t *testing.T) { + expiresAt := time.Now().Add(-30 * 24 * time.Hour) + for i := 0; i < 32; i++ { + _ = createExpiredObject(ctx, t, db, randObjectStream(), 3, expiresAt) + } + DeleteExpiredObjects{ + Opts: metabase.DeleteExpiredObjects{ + ExpiredBefore: time.Now().Add(time.Hour), + BatchSize: 4, + }, + }.Check(ctx, t, db) + + Verify{}.Check(ctx, t, db) + }) + + t.Run("committed objects", func(t *testing.T) { defer DeleteAll{}.Check(ctx, t, db) object1 := CreateTestObject{}.Run(ctx, t, db, obj1, 1) @@ -119,7 +142,11 @@ func TestDeleteExpiredObjects(t *testing.T) { expectedObj3Segment := expectedObj1Segment expectedObj3Segment.StreamID = obj3.StreamID - DeleteExpiredObjects{}.Check(ctx, t, db) + DeleteExpiredObjects{ + Opts: metabase.DeleteExpiredObjects{ + ExpiredBefore: time.Now(), + }, + }.Check(ctx, t, db) Verify{ // the object with expiration time in the past is gone Objects: []metabase.RawObject{ diff --git a/satellite/metainfo/metabase/delete_test.go b/satellite/metainfo/metabase/delete_test.go index bea07c602..6d570e776 100644 --- a/satellite/metainfo/metabase/delete_test.go +++ b/satellite/metainfo/metabase/delete_test.go @@ -1309,3 +1309,52 @@ func createObject(ctx *testcontext.Context, t *testing.T, db *metabase.DB, obj m }, }.Check(ctx, t, db) } + +func createExpiredObject(ctx *testcontext.Context, t *testing.T, db *metabase.DB, obj metabase.ObjectStream, numberOfSegments byte, expiresAt time.Time) metabase.Object { + BeginObjectExactVersion{ + Opts: metabase.BeginObjectExactVersion{ + ObjectStream: obj, + Encryption: defaultTestEncryption, + ExpiresAt: &expiresAt, + }, + Version: obj.Version, + }.Check(ctx, t, db) + + for i := byte(0); i < numberOfSegments; i++ { + BeginSegment{ + Opts: metabase.BeginSegment{ + ObjectStream: obj, + Position: metabase.SegmentPosition{Part: 0, Index: uint32(i)}, + RootPieceID: storj.PieceID{i + 1}, + Pieces: []metabase.Piece{{ + Number: 1, + StorageNode: testrand.NodeID(), + }}, + }, + }.Check(ctx, t, db) + + CommitSegment{ + Opts: metabase.CommitSegment{ + ObjectStream: obj, + Position: metabase.SegmentPosition{Part: 0, Index: uint32(i)}, + RootPieceID: storj.PieceID{1}, + Pieces: metabase.Pieces{{Number: 0, StorageNode: storj.NodeID{2}}}, + + EncryptedKey: []byte{3}, + EncryptedKeyNonce: []byte{4}, + EncryptedETag: []byte{5}, + + EncryptedSize: 1024, + PlainSize: 512, + PlainOffset: 0, + Redundancy: defaultTestRedundancy, + }, + }.Check(ctx, t, db) + } + + return CommitObject{ + Opts: metabase.CommitObject{ + ObjectStream: obj, + }, + }.Check(ctx, t, db) +} diff --git a/satellite/metainfo/metabase/test_test.go b/satellite/metainfo/metabase/test_test.go index 5240ccd16..f356f2c26 100644 --- a/satellite/metainfo/metabase/test_test.go +++ b/satellite/metainfo/metabase/test_test.go @@ -411,14 +411,14 @@ func (step DeleteObjectsAllVersions) Check(ctx *testcontext.Context, t testing.T } type DeleteExpiredObjects struct { + Opts metabase.DeleteExpiredObjects + ErrClass *errs.Class ErrText string } func (step DeleteExpiredObjects) Check(ctx *testcontext.Context, t testing.TB, db *metabase.DB) { - err := db.DeleteExpiredObjects(ctx, metabase.DeleteExpiredObjects{ - ExpiredBefore: time.Now(), - }) + err := db.DeleteExpiredObjects(ctx, step.Opts) checkError(t, err, step.ErrClass, step.ErrText) }