diff --git a/satellite/metabase/delete_objects.go b/satellite/metabase/delete_objects.go index 7bba50d99..e963b4d49 100644 --- a/satellite/metabase/delete_objects.go +++ b/satellite/metabase/delete_objects.go @@ -22,9 +22,9 @@ const ( // DeleteExpiredObjects contains all the information necessary to delete expired objects and segments. type DeleteExpiredObjects struct { - ExpiredBefore time.Time - AsOfSystemTime time.Time - BatchSize int + ExpiredBefore time.Time + AsOfSystemInterval time.Duration + BatchSize int } // DeleteExpiredObjects deletes all objects that expired before expiredBefore. @@ -37,7 +37,7 @@ func (db *DB) DeleteExpiredObjects(ctx context.Context, opts DeleteExpiredObject project_id, bucket_name, object_key, version, stream_id, expires_at FROM objects - ` + db.impl.AsOfSystemTime(opts.AsOfSystemTime) + ` + ` + db.impl.AsOfSystemInterval(opts.AsOfSystemInterval) + ` WHERE (project_id, bucket_name, object_key, version) > ($1, $2, $3, $4) AND expires_at < $5 @@ -95,10 +95,10 @@ func (db *DB) DeleteExpiredObjects(ctx context.Context, opts DeleteExpiredObject // DeleteZombieObjects contains all the information necessary to delete zombie objects and segments. type DeleteZombieObjects struct { - DeadlineBefore time.Time - InactiveDeadline time.Time - AsOfSystemTime time.Time - BatchSize int + DeadlineBefore time.Time + InactiveDeadline time.Time + AsOfSystemInterval time.Duration + BatchSize int } // DeleteZombieObjects deletes all objects that zombie deletion deadline passed. @@ -112,7 +112,7 @@ func (db *DB) DeleteZombieObjects(ctx context.Context, opts DeleteZombieObjects) SELECT project_id, bucket_name, object_key, version, stream_id FROM objects - ` + db.impl.AsOfSystemTime(opts.AsOfSystemTime) + ` + ` + db.impl.AsOfSystemInterval(opts.AsOfSystemInterval) + ` WHERE (project_id, bucket_name, object_key, version) > ($1, $2, $3, $4) AND status = ` + pendingStatus + ` @@ -155,7 +155,7 @@ func (db *DB) DeleteZombieObjects(ctx context.Context, opts DeleteZombieObjects) return ObjectStream{}, nil } - err = db.deleteInactiveObjectsAndSegments(ctx, objects, opts.InactiveDeadline) + err = db.deleteInactiveObjectsAndSegments(ctx, objects, opts) if err != nil { db.log.Warn("delete from DB zombie objects", zap.Error(err)) return ObjectStream{}, nil @@ -237,7 +237,7 @@ func (db *DB) deleteObjectsAndSegments(ctx context.Context, objects []ObjectStre return nil } -func (db *DB) deleteInactiveObjectsAndSegments(ctx context.Context, objects []ObjectStream, inactiveDeadline time.Time) (err error) { +func (db *DB) deleteInactiveObjectsAndSegments(ctx context.Context, objects []ObjectStream, opts DeleteZombieObjects) (err error) { defer mon.Task()(&ctx)(&err) if len(objects) == 0 { @@ -248,24 +248,20 @@ func (db *DB) deleteInactiveObjectsAndSegments(ctx context.Context, objects []Ob var batch pgx.Batch for _, obj := range objects { batch.Queue(` - WITH deleted_objects AS ( + WITH check_segments AS ( + SELECT 1 FROM segments WHERE stream_id = $5::BYTEA AND created_at > $6 + ), deleted_objects AS ( DELETE FROM objects WHERE (project_id, bucket_name, object_key, version) = ($1::BYTEA, $2::BYTEA, $3::BYTEA, $4) AND - stream_id = $5::BYTEA AND ( - -- TODO figure out something more optimal - NOT EXISTS (SELECT stream_id FROM segments WHERE stream_id = $5::BYTEA) - OR - -- check that all segments where created before inactive time - NOT EXISTS (SELECT stream_id FROM segments WHERE stream_id = $5::BYTEA AND created_at > $6) - ) - RETURNING version -- return anything + NOT EXISTS (SELECT 1 FROM check_segments) + RETURNING stream_id ) DELETE FROM segments + `+db.impl.AsOfSystemInterval(opts.AsOfSystemInterval)+` WHERE - segments.stream_id = $5::BYTEA AND - NOT EXISTS (SELECT stream_id FROM segments WHERE stream_id = $5::BYTEA AND created_at > $6) - `, obj.ProjectID, []byte(obj.BucketName), []byte(obj.ObjectKey), obj.Version, obj.StreamID, inactiveDeadline) + segments.stream_id IN (SELECT stream_id FROM deleted_objects) + `, obj.ProjectID, []byte(obj.BucketName), []byte(obj.ObjectKey), obj.Version, obj.StreamID, opts.InactiveDeadline) } results := conn.SendBatch(ctx, &batch) diff --git a/satellite/metabase/zombiedeletion/chore.go b/satellite/metabase/zombiedeletion/chore.go index 329560932..5be6bac5f 100644 --- a/satellite/metabase/zombiedeletion/chore.go +++ b/satellite/metabase/zombiedeletion/chore.go @@ -23,10 +23,11 @@ var ( // Config contains configurable values for zombie object cleanup. type Config struct { - Interval time.Duration `help:"the time between each attempt to go through the db and clean up zombie objects" releaseDefault:"12h" devDefault:"10s"` - Enabled bool `help:"set if zombie object cleanup is enabled or not" default:"true"` - ListLimit int `help:"how many objects to query in a batch" default:"100"` - InactiveFor time.Duration `help:"after what time object will be deleted if there where no new upload activity" default:"24h"` + Interval time.Duration `help:"the time between each attempt to go through the db and clean up zombie objects" releaseDefault:"12h" devDefault:"10s"` + Enabled bool `help:"set if zombie object cleanup is enabled or not" default:"true"` + ListLimit int `help:"how many objects to query in a batch" default:"100"` + InactiveFor time.Duration `help:"after what time object will be deleted if there where no new upload activity" default:"24h"` + AsOfSystemInterval time.Duration `help:"as of system interval" releaseDefault:"-5m" devDefault:"-1us" testDefault:"-1us"` } // Chore implements the zombie objects cleanup chore. @@ -80,8 +81,9 @@ func (chore *Chore) deleteZombieObjects(ctx context.Context) (err error) { chore.log.Debug("deleting zombie objects") return chore.metabase.DeleteZombieObjects(ctx, metabase.DeleteZombieObjects{ - DeadlineBefore: chore.nowFn(), - InactiveDeadline: chore.nowFn().Add(-chore.config.InactiveFor), - BatchSize: chore.config.ListLimit, + DeadlineBefore: chore.nowFn(), + InactiveDeadline: chore.nowFn().Add(-chore.config.InactiveFor), + BatchSize: chore.config.ListLimit, + AsOfSystemInterval: chore.config.AsOfSystemInterval, }) } diff --git a/satellite/metabase/zombiedeletion/zombiedeletion_test.go b/satellite/metabase/zombiedeletion/zombiedeletion_test.go index 2512ab85a..47fb88c59 100644 --- a/satellite/metabase/zombiedeletion/zombiedeletion_test.go +++ b/satellite/metabase/zombiedeletion/zombiedeletion_test.go @@ -26,8 +26,8 @@ func TestZombieDeletion(t *testing.T) { SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 1, Reconfigure: testplanet.Reconfigure{ Satellite: func(log *zap.Logger, index int, config *satellite.Config) { - config.ZombieDeletion.Enabled = true config.ZombieDeletion.Interval = 500 * time.Millisecond + config.ZombieDeletion.AsOfSystemInterval = 0 }, }, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { @@ -53,6 +53,7 @@ func TestZombieDeletion(t *testing.T) { require.NoError(t, err) defer ctx.Check(project.Close) + // upload pending object with multipart upload but without segment _, err = project.BeginUpload(ctx, "testbucket2", "zombie_object_multipart_no_segment", nil) require.NoError(t, err) diff --git a/satellite/metainfo/expireddeletion/chore.go b/satellite/metainfo/expireddeletion/chore.go index 045369a88..e41c4f05d 100644 --- a/satellite/metainfo/expireddeletion/chore.go +++ b/satellite/metainfo/expireddeletion/chore.go @@ -23,9 +23,10 @@ var ( // Config contains configurable values for expired segment cleanup. type Config struct { - Interval time.Duration `help:"the time between each attempt to go through the db and clean up expired segments" releaseDefault:"24h" devDefault:"10s" testDefault:"$TESTINTERVAL"` - Enabled bool `help:"set if expired segment cleanup is enabled or not" releaseDefault:"true" devDefault:"true"` - ListLimit int `help:"how many expired objects to query in a batch" default:"100"` + Interval time.Duration `help:"the time between each attempt to go through the db and clean up expired segments" releaseDefault:"24h" devDefault:"10s" testDefault:"$TESTINTERVAL"` + Enabled bool `help:"set if expired segment cleanup is enabled or not" releaseDefault:"true" devDefault:"true"` + ListLimit int `help:"how many expired objects to query in a batch" default:"100"` + AsOfSystemInterval time.Duration `help:"as of system interval" releaseDefault:"-5m" devDefault:"-1us" testDefault:"-1us" hidden:"true"` } // Chore implements the expired segment cleanup chore. @@ -81,8 +82,9 @@ func (chore *Chore) deleteExpiredObjects(ctx context.Context) (err error) { // TODO log error instead of crashing core until we will be sure // that queries for deleting expired objects are stable err = chore.metabase.DeleteExpiredObjects(ctx, metabase.DeleteExpiredObjects{ - ExpiredBefore: chore.nowFn(), - BatchSize: chore.config.ListLimit, + ExpiredBefore: chore.nowFn(), + BatchSize: chore.config.ListLimit, + AsOfSystemInterval: chore.config.AsOfSystemInterval, }) if err != nil { chore.log.Error("deleting expired objects failed", zap.Error(err)) diff --git a/scripts/testdata/satellite-config.yaml.lock b/scripts/testdata/satellite-config.yaml.lock index 1c63288c9..0439c1052 100755 --- a/scripts/testdata/satellite-config.yaml.lock +++ b/scripts/testdata/satellite-config.yaml.lock @@ -1171,6 +1171,9 @@ server.private-address: 127.0.0.1:7778 # server address to check its version against # version.server-address: https://version.storj.io +# as of system interval +# zombie-deletion.as-of-system-interval: -5m0s + # set if zombie object cleanup is enabled or not # zombie-deletion.enabled: true