From df037564d7ef1db4be31c1147cf926ba04fd3b06 Mon Sep 17 00:00:00 2001 From: Michal Niewrzal Date: Wed, 9 Aug 2023 13:25:33 +0200 Subject: [PATCH] satellite/zombiedeletion: remove inactive uploads from pending_objects With the zombie deletion chore we are removing inactive pending objects from the objects table, but now we also need to do this for the pending_objects table. https://github.com/storj/storj/issues/6050 Change-Id: Ia29116c103673a1d9e10c2f16654022572210a8a --- satellite/metabase/delete_objects.go | 117 ++++++++ satellite/metabase/delete_objects_test.go | 308 +++++++++++++++++++++ satellite/metabase/metabasetest/test.go | 14 + satellite/metabase/zombiedeletion/chore.go | 11 +- 4 files changed, 448 insertions(+), 2 deletions(-) diff --git a/satellite/metabase/delete_objects.go b/satellite/metabase/delete_objects.go index de679ff0d..a5b9c25db 100644 --- a/satellite/metabase/delete_objects.go +++ b/satellite/metabase/delete_objects.go @@ -102,6 +102,7 @@ type DeleteZombieObjects struct { } // DeleteZombieObjects deletes all objects that zombie deletion deadline passed. +// TODO will be removed when objects table will be free from pending objects. func (db *DB) DeleteZombieObjects(ctx context.Context, opts DeleteZombieObjects) (err error) { defer mon.Task()(&ctx)(&err) @@ -289,3 +290,119 @@ func (db *DB) deleteInactiveObjectsAndSegments(ctx context.Context, objects []Ob return nil } + +// DeleteInactivePendingObjects deletes all pending objects that are inactive. Inactive means that zombie deletion deadline passed +// and no new segments were uploaded after opts.InactiveDeadline. 
+func (db *DB) DeleteInactivePendingObjects(ctx context.Context, opts DeleteZombieObjects) (err error) { + defer mon.Task()(&ctx)(&err) + + return db.deleteObjectsAndSegmentsBatch(ctx, opts.BatchSize, func(startAfter ObjectStream, batchsize int) (last ObjectStream, err error) { + defer mon.Task()(&ctx)(&err) + + query := ` + SELECT + project_id, bucket_name, object_key, stream_id + FROM pending_objects + ` + db.impl.AsOfSystemInterval(opts.AsOfSystemInterval) + ` + WHERE + (project_id, bucket_name, object_key, stream_id) > ($1, $2, $3, $4) + AND (zombie_deletion_deadline IS NULL OR zombie_deletion_deadline < $5) + ORDER BY project_id, bucket_name, object_key, stream_id + LIMIT $6;` + + objects := make([]ObjectStream, 0, batchsize) + + scanErrClass := errs.Class("DB rows scan has failed") + err = withRows(db.db.QueryContext(ctx, query, + startAfter.ProjectID, []byte(startAfter.BucketName), []byte(startAfter.ObjectKey), startAfter.StreamID, + opts.DeadlineBefore, + batchsize), + )(func(rows tagsql.Rows) error { + for rows.Next() { + err = rows.Scan(&last.ProjectID, &last.BucketName, &last.ObjectKey, &last.StreamID) + if err != nil { + return scanErrClass.Wrap(err) + } + + db.log.Debug("selected zombie object for deleting it", + zap.Stringer("Project", last.ProjectID), + zap.String("Bucket", last.BucketName), + zap.String("Object Key", string(last.ObjectKey)), + zap.Stringer("StreamID", last.StreamID), + ) + objects = append(objects, last) + } + + return nil + }) + if err != nil { + if scanErrClass.Has(err) { + return ObjectStream{}, Error.New("unable to select zombie objects for deletion: %w", err) + } + + db.log.Warn("unable to select zombie objects for deletion", zap.Error(Error.Wrap(err))) + return ObjectStream{}, nil + } + + err = db.deleteInactiveObjectsAndSegmentsNew(ctx, objects, opts) + if err != nil { + db.log.Warn("delete from DB zombie objects", zap.Error(err)) + return ObjectStream{}, nil + } + + return last, nil + }) +} + +func (db *DB) 
deleteInactiveObjectsAndSegmentsNew(ctx context.Context, objects []ObjectStream, opts DeleteZombieObjects) (err error) { + defer mon.Task()(&ctx)(&err) + + if len(objects) == 0 { + return nil + } + + err = pgxutil.Conn(ctx, db.db, func(conn *pgx.Conn) error { + var batch pgx.Batch + for _, obj := range objects { + batch.Queue(` + WITH check_segments AS ( + SELECT 1 FROM segments + WHERE stream_id = $4::BYTEA AND created_at > $5 + ), deleted_objects AS ( + DELETE FROM pending_objects + WHERE + (project_id, bucket_name, object_key, stream_id) = ($1::BYTEA, $2::BYTEA, $3::BYTEA, $4) AND + NOT EXISTS (SELECT 1 FROM check_segments) + RETURNING stream_id + ) + DELETE FROM segments + WHERE segments.stream_id IN (SELECT stream_id FROM deleted_objects) + `, obj.ProjectID, []byte(obj.BucketName), []byte(obj.ObjectKey), obj.StreamID, opts.InactiveDeadline) + } + + results := conn.SendBatch(ctx, &batch) + defer func() { err = errs.Combine(err, results.Close()) }() + + var segmentsDeleted int64 + var errlist errs.Group + for i := 0; i < batch.Len(); i++ { + result, err := results.Exec() + errlist.Add(err) + + if err == nil { + segmentsDeleted += result.RowsAffected() + } + } + + // TODO calculate deleted objects + mon.Meter("zombie_segment_delete").Mark64(segmentsDeleted) + mon.Meter("segment_delete").Mark64(segmentsDeleted) + + return errlist.Err() + }) + if err != nil { + return Error.New("unable to delete zombie objects: %w", err) + } + + return nil +} diff --git a/satellite/metabase/delete_objects_test.go b/satellite/metabase/delete_objects_test.go index 3266a42af..0c5980a18 100644 --- a/satellite/metabase/delete_objects_test.go +++ b/satellite/metabase/delete_objects_test.go @@ -490,3 +490,311 @@ func TestDeleteZombieObjects(t *testing.T) { }) }) } + +func TestDeleteInactivePendingObjects(t *testing.T) { + metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) { + obj1 := metabasetest.RandObjectStream() + obj1.Version = metabase.NextVersion + 
obj2 := metabasetest.RandObjectStream() + obj2.Version = metabase.NextVersion + obj3 := metabasetest.RandObjectStream() + obj3.Version = metabase.NextVersion + + now := time.Now() + zombieDeadline := now.Add(24 * time.Hour) + pastTime := now.Add(-1 * time.Hour) + futureTime := now.Add(1 * time.Hour) + + t.Run("none", func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + + metabasetest.DeleteInactivePendingObjects{ + Opts: metabase.DeleteZombieObjects{ + DeadlineBefore: now, + }, + }.Check(ctx, t, db) + metabasetest.Verify{}.Check(ctx, t, db) + }) + + t.Run("partial objects", func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + + // zombie object with default deadline + metabasetest.BeginObjectNextVersion{ + Opts: metabase.BeginObjectNextVersion{ + ObjectStream: obj1, + Encryption: metabasetest.DefaultEncryption, + UsePendingObjectsTable: true, + }, + Version: 1, + }.Check(ctx, t, db) + + // zombie object with deadline time in the past + metabasetest.BeginObjectNextVersion{ + Opts: metabase.BeginObjectNextVersion{ + ObjectStream: obj2, + ZombieDeletionDeadline: &pastTime, + Encryption: metabasetest.DefaultEncryption, + UsePendingObjectsTable: true, + }, + Version: 1, + }.Check(ctx, t, db) + + // pending object with expiration time in the future + metabasetest.BeginObjectNextVersion{ + Opts: metabase.BeginObjectNextVersion{ + ObjectStream: obj3, + ZombieDeletionDeadline: &futureTime, + Encryption: metabasetest.DefaultEncryption, + UsePendingObjectsTable: true, + }, + Version: 1, + }.Check(ctx, t, db) + + metabasetest.DeleteInactivePendingObjects{ + Opts: metabase.DeleteZombieObjects{ + DeadlineBefore: now, + InactiveDeadline: now, + }, + }.Check(ctx, t, db) + + metabasetest.Verify{ // the object with zombie deadline time in the past is gone + PendingObjects: []metabase.RawPendingObject{ + { + PendingObjectStream: metabasetest.ObjectStreamToPending(obj1), + CreatedAt: now, + + Encryption: metabasetest.DefaultEncryption, + 
ZombieDeletionDeadline: &zombieDeadline, + }, + { + PendingObjectStream: metabasetest.ObjectStreamToPending(obj3), + CreatedAt: now, + + Encryption: metabasetest.DefaultEncryption, + ZombieDeletionDeadline: &futureTime, + }, + }, + }.Check(ctx, t, db) + }) + + t.Run("partial object with segment", func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + + metabasetest.BeginObjectNextVersion{ + Opts: metabase.BeginObjectNextVersion{ + ObjectStream: obj1, + Encryption: metabasetest.DefaultEncryption, + ZombieDeletionDeadline: &now, + UsePendingObjectsTable: true, + }, + Version: 1, + }.Check(ctx, t, db) + metabasetest.BeginSegment{ + Opts: metabase.BeginSegment{ + ObjectStream: obj1, + RootPieceID: storj.PieceID{1}, + Pieces: []metabase.Piece{{ + Number: 1, + StorageNode: testrand.NodeID(), + }}, + UsePendingObjectsTable: true, + }, + }.Check(ctx, t, db) + metabasetest.CommitSegment{ + Opts: metabase.CommitSegment{ + ObjectStream: obj1, + RootPieceID: storj.PieceID{1}, + Pieces: metabase.Pieces{{Number: 0, StorageNode: storj.NodeID{2}}}, + + EncryptedKey: []byte{3}, + EncryptedKeyNonce: []byte{4}, + EncryptedETag: []byte{5}, + + EncryptedSize: 1024, + PlainSize: 512, + PlainOffset: 0, + Redundancy: metabasetest.DefaultRedundancy, + UsePendingObjectsTable: true, + }, + }.Check(ctx, t, db) + + // object will be checked if is inactive but inactive time is in future + metabasetest.DeleteInactivePendingObjects{ + Opts: metabase.DeleteZombieObjects{ + DeadlineBefore: now.Add(1 * time.Hour), + InactiveDeadline: now.Add(-1 * time.Hour), + }, + }.Check(ctx, t, db) + + metabasetest.Verify{ + PendingObjects: []metabase.RawPendingObject{ + { + PendingObjectStream: metabasetest.ObjectStreamToPending(obj1), + CreatedAt: now, + + Encryption: metabasetest.DefaultEncryption, + ZombieDeletionDeadline: &now, + }, + }, + Segments: []metabase.RawSegment{ + { + StreamID: obj1.StreamID, + RootPieceID: storj.PieceID{1}, + Pieces: metabase.Pieces{{Number: 0, StorageNode: 
storj.NodeID{2}}}, + CreatedAt: now, + + EncryptedKey: []byte{3}, + EncryptedKeyNonce: []byte{4}, + EncryptedETag: []byte{5}, + + EncryptedSize: 1024, + PlainSize: 512, + PlainOffset: 0, + Redundancy: metabasetest.DefaultRedundancy, + }, + }, + }.Check(ctx, t, db) + + // object will be checked if is inactive and will be deleted with segment + metabasetest.DeleteInactivePendingObjects{ + Opts: metabase.DeleteZombieObjects{ + DeadlineBefore: now.Add(1 * time.Hour), + InactiveDeadline: now.Add(2 * time.Hour), + AsOfSystemInterval: -1 * time.Microsecond, + }, + }.Check(ctx, t, db) + + metabasetest.Verify{}.Check(ctx, t, db) + }) + + t.Run("batch size", func(t *testing.T) { + for i := 0; i < 33; i++ { + obj := metabasetest.RandObjectStream() + obj.Version = metabase.NextVersion + + metabasetest.BeginObjectNextVersion{ + Opts: metabase.BeginObjectNextVersion{ + ObjectStream: obj, + Encryption: metabasetest.DefaultEncryption, + // use default 24h zombie deletion deadline + UsePendingObjectsTable: true, + }, + Version: 1, + }.Check(ctx, t, db) + + for i := byte(0); i < 3; i++ { + metabasetest.BeginSegment{ + Opts: metabase.BeginSegment{ + ObjectStream: obj, + Position: metabase.SegmentPosition{Part: 0, Index: uint32(i)}, + RootPieceID: storj.PieceID{i + 1}, + Pieces: []metabase.Piece{{ + Number: 1, + StorageNode: testrand.NodeID(), + }}, + UsePendingObjectsTable: true, + }, + }.Check(ctx, t, db) + + metabasetest.CommitSegment{ + Opts: metabase.CommitSegment{ + ObjectStream: obj, + Position: metabase.SegmentPosition{Part: 0, Index: uint32(i)}, + RootPieceID: storj.PieceID{1}, + Pieces: metabase.Pieces{{Number: 0, StorageNode: storj.NodeID{2}}}, + + EncryptedKey: []byte{3}, + EncryptedKeyNonce: []byte{4}, + EncryptedETag: []byte{5}, + + EncryptedSize: 1024, + PlainSize: 512, + PlainOffset: 0, + Redundancy: metabasetest.DefaultRedundancy, + UsePendingObjectsTable: true, + }, + }.Check(ctx, t, db) + } + } + + metabasetest.DeleteInactivePendingObjects{ + Opts: 
metabase.DeleteZombieObjects{ + DeadlineBefore: now.Add(25 * time.Hour), + InactiveDeadline: now.Add(48 * time.Hour), + BatchSize: 4, + }, + }.Check(ctx, t, db) + + metabasetest.Verify{}.Check(ctx, t, db) + }) + + t.Run("committed objects", func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + + obj1 := obj1 + obj1.Version = metabase.DefaultVersion + object1, _ := metabasetest.CreateTestObject{}.Run(ctx, t, db, obj1, 1) + + obj2 := obj2 + obj2.Version = metabase.DefaultVersion + object2 := object1 + object2.ObjectStream = obj2 + metabasetest.CreateTestObject{ + BeginObjectExactVersion: &metabase.BeginObjectExactVersion{ + ObjectStream: object2.ObjectStream, + ZombieDeletionDeadline: &pastTime, + Encryption: metabasetest.DefaultEncryption, + }, + }.Run(ctx, t, db, object2.ObjectStream, 1) + + obj3 := obj3 + obj3.Version = metabase.DefaultVersion + object3, _ := metabasetest.CreateTestObject{ + BeginObjectExactVersion: &metabase.BeginObjectExactVersion{ + ObjectStream: obj3, + ZombieDeletionDeadline: &futureTime, + Encryption: metabasetest.DefaultEncryption, + }, + }.Run(ctx, t, db, obj3, 1) + + expectedObj1Segment := metabase.Segment{ + StreamID: obj1.StreamID, + RootPieceID: storj.PieceID{1}, + CreatedAt: now, + EncryptedKey: []byte{3}, + EncryptedKeyNonce: []byte{4}, + EncryptedETag: []byte{5}, + EncryptedSize: 1060, + PlainSize: 512, + Pieces: metabase.Pieces{{Number: 0, StorageNode: storj.NodeID{2}}}, + Redundancy: metabasetest.DefaultRedundancy, + } + + expectedObj2Segment := expectedObj1Segment + expectedObj2Segment.StreamID = object2.StreamID + expectedObj3Segment := expectedObj1Segment + expectedObj3Segment.StreamID = object3.StreamID + + metabasetest.DeleteInactivePendingObjects{ + Opts: metabase.DeleteZombieObjects{ + DeadlineBefore: now, + InactiveDeadline: now.Add(1 * time.Hour), + }, + }.Check(ctx, t, db) + + metabasetest.Verify{ // all committed objects should NOT be deleted + Objects: []metabase.RawObject{ + 
metabase.RawObject(object1), + metabase.RawObject(object2), + metabase.RawObject(object3), + }, + Segments: []metabase.RawSegment{ + metabase.RawSegment(expectedObj1Segment), + metabase.RawSegment(expectedObj2Segment), + metabase.RawSegment(expectedObj3Segment), + }, + }.Check(ctx, t, db) + }) + }) +} diff --git a/satellite/metabase/metabasetest/test.go b/satellite/metabase/metabasetest/test.go index 0d77cf5ed..354b34db8 100644 --- a/satellite/metabase/metabasetest/test.go +++ b/satellite/metabase/metabasetest/test.go @@ -516,6 +516,20 @@ func (step DeleteZombieObjects) Check(ctx *testcontext.Context, t testing.TB, db checkError(t, err, step.ErrClass, step.ErrText) } +// DeleteInactivePendingObjects is for testing metabase.DeleteInactivePendingObjects. +type DeleteInactivePendingObjects struct { + Opts metabase.DeleteZombieObjects + + ErrClass *errs.Class + ErrText string +} + +// Check runs the test. +func (step DeleteInactivePendingObjects) Check(ctx *testcontext.Context, t testing.TB, db *metabase.DB) { + err := db.DeleteInactivePendingObjects(ctx, step.Opts) + checkError(t, err, step.ErrClass, step.ErrText) +} + // IterateCollector is for testing metabase.IterateCollector. 
type IterateCollector []metabase.ObjectEntry diff --git a/satellite/metabase/zombiedeletion/chore.go b/satellite/metabase/zombiedeletion/chore.go index 5be6bac5f..523e44e51 100644 --- a/satellite/metabase/zombiedeletion/chore.go +++ b/satellite/metabase/zombiedeletion/chore.go @@ -80,10 +80,17 @@ func (chore *Chore) deleteZombieObjects(ctx context.Context) (err error) { defer mon.Task()(&ctx)(&err) chore.log.Debug("deleting zombie objects") - return chore.metabase.DeleteZombieObjects(ctx, metabase.DeleteZombieObjects{ + opts := metabase.DeleteZombieObjects{ DeadlineBefore: chore.nowFn(), InactiveDeadline: chore.nowFn().Add(-chore.config.InactiveFor), BatchSize: chore.config.ListLimit, AsOfSystemInterval: chore.config.AsOfSystemInterval, - }) + } + + err = chore.metabase.DeleteZombieObjects(ctx, opts) + if err != nil { + return err + } + + return chore.metabase.DeleteInactivePendingObjects(ctx, opts) }