satellite/{metainfo,metabase}: optimize expired/zombie objects deletion
* optimize the SQL for the zombie objects deletion query by reducing the number of direct selects against the segments table
* set AOST (AS OF SYSTEM TIME) for expired/zombie object deletion (previously it was effectively 0, i.e. "now")

https://github.com/storj/storj/issues/5881

Change-Id: I50482151d056a86fe0e31678a463f413d410759d
parent aea3baf6a9
commit 61933bc6f0
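Note on the change: switching from a fixed AsOfSystemTime timestamp (which the chores left at zero, i.e. "now") to an AsOfSystemInterval duration lets the candidate-selection queries read a slightly stale snapshot on CockroachDB, reducing contention with live writes; on PostgreSQL the clause is simply omitted. A minimal sketch of how such an interval can be rendered into the AS OF SYSTEM TIME fragment that `db.impl.AsOfSystemInterval(...)` splices into the queries below — `asOfSystemInterval` here is a hypothetical stand-in, not the actual storj dbutil helper:

package main

import (
	"fmt"
	"time"
)

// asOfSystemInterval is a hypothetical helper: for CockroachDB it renders a
// negative interval into an AS OF SYSTEM TIME clause; for other backends it
// returns an empty string so the query is unchanged.
func asOfSystemInterval(implementation string, interval time.Duration) string {
	if implementation != "cockroach" || interval >= 0 {
		return ""
	}
	return fmt.Sprintf(" AS OF SYSTEM TIME '%ds' ", int64(interval/time.Second))
}

func main() {
	query := `SELECT stream_id FROM objects` +
		asOfSystemInterval("cockroach", -5*time.Minute) +
		`WHERE expires_at < $1`
	fmt.Println(query)
	// Output: SELECT stream_id FROM objects AS OF SYSTEM TIME '-300s' WHERE expires_at < $1
}
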
@@ -22,9 +22,9 @@ const (

 // DeleteExpiredObjects contains all the information necessary to delete expired objects and segments.
 type DeleteExpiredObjects struct {
 	ExpiredBefore time.Time
-	AsOfSystemTime time.Time
+	AsOfSystemInterval time.Duration
 	BatchSize int
 }

 // DeleteExpiredObjects deletes all objects that expired before expiredBefore.
@@ -37,7 +37,7 @@ func (db *DB) DeleteExpiredObjects(ctx context.Context, opts DeleteExpiredObject
 			project_id, bucket_name, object_key, version, stream_id,
 			expires_at
 		FROM objects
-		` + db.impl.AsOfSystemTime(opts.AsOfSystemTime) + `
+		` + db.impl.AsOfSystemInterval(opts.AsOfSystemInterval) + `
 		WHERE
 			(project_id, bucket_name, object_key, version) > ($1, $2, $3, $4)
 			AND expires_at < $5
@@ -95,10 +95,10 @@ func (db *DB) DeleteExpiredObjects(ctx context.Context, opts DeleteExpiredObject

 // DeleteZombieObjects contains all the information necessary to delete zombie objects and segments.
 type DeleteZombieObjects struct {
 	DeadlineBefore time.Time
 	InactiveDeadline time.Time
-	AsOfSystemTime time.Time
+	AsOfSystemInterval time.Duration
 	BatchSize int
 }

 // DeleteZombieObjects deletes all objects that zombie deletion deadline passed.
@@ -112,7 +112,7 @@ func (db *DB) DeleteZombieObjects(ctx context.Context, opts DeleteZombieObjects)
 		SELECT
 			project_id, bucket_name, object_key, version, stream_id
 		FROM objects
-		` + db.impl.AsOfSystemTime(opts.AsOfSystemTime) + `
+		` + db.impl.AsOfSystemInterval(opts.AsOfSystemInterval) + `
 		WHERE
 			(project_id, bucket_name, object_key, version) > ($1, $2, $3, $4)
 			AND status = ` + pendingStatus + `
@@ -155,7 +155,7 @@ func (db *DB) DeleteZombieObjects(ctx context.Context, opts DeleteZombieObjects)
 			return ObjectStream{}, nil
 		}

-		err = db.deleteInactiveObjectsAndSegments(ctx, objects, opts.InactiveDeadline)
+		err = db.deleteInactiveObjectsAndSegments(ctx, objects, opts)
 		if err != nil {
 			db.log.Warn("delete from DB zombie objects", zap.Error(err))
 			return ObjectStream{}, nil
@@ -237,7 +237,7 @@ func (db *DB) deleteObjectsAndSegments(ctx context.Context, objects []ObjectStre
 	return nil
 }

-func (db *DB) deleteInactiveObjectsAndSegments(ctx context.Context, objects []ObjectStream, inactiveDeadline time.Time) (err error) {
+func (db *DB) deleteInactiveObjectsAndSegments(ctx context.Context, objects []ObjectStream, opts DeleteZombieObjects) (err error) {
 	defer mon.Task()(&ctx)(&err)

 	if len(objects) == 0 {
@@ -248,24 +248,20 @@ func (db *DB) deleteInactiveObjectsAndSegments(ctx context.Context, objects []Ob
 	var batch pgx.Batch
 	for _, obj := range objects {
 		batch.Queue(`
-			WITH deleted_objects AS (
+			WITH check_segments AS (
+				SELECT 1 FROM segments WHERE stream_id = $5::BYTEA AND created_at > $6
+			), deleted_objects AS (
 				DELETE FROM objects
 				WHERE
 					(project_id, bucket_name, object_key, version) = ($1::BYTEA, $2::BYTEA, $3::BYTEA, $4) AND
-					stream_id = $5::BYTEA AND (
-						-- TODO figure out something more optimal
-						NOT EXISTS (SELECT stream_id FROM segments WHERE stream_id = $5::BYTEA)
-						OR
-						-- check that all segments where created before inactive time
-						NOT EXISTS (SELECT stream_id FROM segments WHERE stream_id = $5::BYTEA AND created_at > $6)
-					)
-				RETURNING version -- return anything
+					NOT EXISTS (SELECT 1 FROM check_segments)
+				RETURNING stream_id
 			)
 			DELETE FROM segments
+			`+db.impl.AsOfSystemInterval(opts.AsOfSystemInterval)+`
 			WHERE
-				segments.stream_id = $5::BYTEA AND
-				NOT EXISTS (SELECT stream_id FROM segments WHERE stream_id = $5::BYTEA AND created_at > $6)
-		`, obj.ProjectID, []byte(obj.BucketName), []byte(obj.ObjectKey), obj.Version, obj.StreamID, inactiveDeadline)
+				segments.stream_id IN (SELECT stream_id FROM deleted_objects)
+		`, obj.ProjectID, []byte(obj.BucketName), []byte(obj.ObjectKey), obj.Version, obj.StreamID, opts.InactiveDeadline)
 	}

 	results := conn.SendBatch(ctx, &batch)
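Note on the rewritten batch statement above: the old query ran two correlated NOT EXISTS probes against segments per object (one for "no segments at all", one for "no segments created after $6") and then repeated the activity check inside the segments DELETE. The new form folds both cases into the single check_segments CTE — an empty result means the object either has no segments or none newer than the inactive deadline — and the segments DELETE now keys off the stream_ids returned by deleted_objects instead of re-probing the segments table. A hypothetical Go sketch of the predicate the CTE encodes (names are illustrative, not from the codebase):

package main

import (
	"fmt"
	"time"
)

// shouldDeleteZombie mirrors the check_segments predicate: a pending object is
// deletable when it has no segments at all, or when none of its segments were
// created after the inactive deadline ($6 in the query above).
func shouldDeleteZombie(segmentCreatedAt []time.Time, inactiveDeadline time.Time) bool {
	for _, createdAt := range segmentCreatedAt {
		if createdAt.After(inactiveDeadline) {
			return false // recent upload activity, keep the object
		}
	}
	return true // no segments, or only segments older than the deadline
}

func main() {
	deadline := time.Now().Add(-24 * time.Hour)
	fmt.Println(shouldDeleteZombie(nil, deadline))                                   // true: no segments
	fmt.Println(shouldDeleteZombie([]time.Time{time.Now()}, deadline))               // false: recent segment
	fmt.Println(shouldDeleteZombie([]time.Time{deadline.Add(-time.Hour)}, deadline)) // true: only old segments
}
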
@@ -23,10 +23,11 @@ var (

 // Config contains configurable values for zombie object cleanup.
 type Config struct {
 	Interval    time.Duration `help:"the time between each attempt to go through the db and clean up zombie objects" releaseDefault:"12h" devDefault:"10s"`
 	Enabled     bool          `help:"set if zombie object cleanup is enabled or not" default:"true"`
 	ListLimit   int           `help:"how many objects to query in a batch" default:"100"`
 	InactiveFor time.Duration `help:"after what time object will be deleted if there where no new upload activity" default:"24h"`
+	AsOfSystemInterval time.Duration `help:"as of system interval" releaseDefault:"-5m" devDefault:"-1us" testDefault:"-1us"`
 }

 // Chore implements the zombie objects cleanup chore.
@@ -80,8 +81,9 @@ func (chore *Chore) deleteZombieObjects(ctx context.Context) (err error) {
 	chore.log.Debug("deleting zombie objects")

 	return chore.metabase.DeleteZombieObjects(ctx, metabase.DeleteZombieObjects{
 		DeadlineBefore:   chore.nowFn(),
 		InactiveDeadline: chore.nowFn().Add(-chore.config.InactiveFor),
 		BatchSize:        chore.config.ListLimit,
+		AsOfSystemInterval: chore.config.AsOfSystemInterval,
 	})
 }
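For context on the defaults above: with the release default of -5m, the candidate listing on CockroachDB may read a snapshot up to five minutes old, which is harmless here because a zombie object is by definition at least InactiveFor (24h by default) old, and the per-object created_at check still runs at delete time. A hypothetical illustration of the options the chore ends up passing down — the struct below is a stand-in mirroring the diffed metabase.DeleteZombieObjects fields, and the values are the release defaults, not code from the repository:

package main

import (
	"fmt"
	"time"
)

// deleteZombieObjectsOptions is an illustrative stand-in for the
// metabase.DeleteZombieObjects options shown in the hunks above.
type deleteZombieObjectsOptions struct {
	DeadlineBefore     time.Time
	InactiveDeadline   time.Time
	AsOfSystemInterval time.Duration
	BatchSize          int
}

func main() {
	now := time.Now()
	opts := deleteZombieObjectsOptions{
		DeadlineBefore:     now,                      // the object's zombie deletion deadline has already passed
		InactiveDeadline:   now.Add(-24 * time.Hour), // InactiveFor release default
		AsOfSystemInterval: -5 * time.Minute,         // release default; candidate listing may be slightly stale
		BatchSize:          100,                      // ListLimit default
	}
	fmt.Printf("%+v\n", opts)
}
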
@@ -26,8 +26,8 @@ func TestZombieDeletion(t *testing.T) {
 		SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 1,
 		Reconfigure: testplanet.Reconfigure{
 			Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
-				config.ZombieDeletion.Enabled = true
 				config.ZombieDeletion.Interval = 500 * time.Millisecond
+				config.ZombieDeletion.AsOfSystemInterval = 0
 			},
 		},
 	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
@@ -53,6 +53,7 @@ func TestZombieDeletion(t *testing.T) {
 		require.NoError(t, err)
 		defer ctx.Check(project.Close)

+		// upload pending object with multipart upload but without segment
 		_, err = project.BeginUpload(ctx, "testbucket2", "zombie_object_multipart_no_segment", nil)
 		require.NoError(t, err)

@@ -23,9 +23,10 @@ var (

 // Config contains configurable values for expired segment cleanup.
 type Config struct {
 	Interval  time.Duration `help:"the time between each attempt to go through the db and clean up expired segments" releaseDefault:"24h" devDefault:"10s" testDefault:"$TESTINTERVAL"`
 	Enabled   bool          `help:"set if expired segment cleanup is enabled or not" releaseDefault:"true" devDefault:"true"`
 	ListLimit int           `help:"how many expired objects to query in a batch" default:"100"`
+	AsOfSystemInterval time.Duration `help:"as of system interval" releaseDefault:"-5m" devDefault:"-1us" testDefault:"-1us" hidden:"true"`
 }

 // Chore implements the expired segment cleanup chore.
@@ -81,8 +82,9 @@ func (chore *Chore) deleteExpiredObjects(ctx context.Context) (err error) {
 	// TODO log error instead of crashing core until we will be sure
 	// that queries for deleting expired objects are stable
 	err = chore.metabase.DeleteExpiredObjects(ctx, metabase.DeleteExpiredObjects{
 		ExpiredBefore: chore.nowFn(),
 		BatchSize:     chore.config.ListLimit,
+		AsOfSystemInterval: chore.config.AsOfSystemInterval,
 	})
 	if err != nil {
 		chore.log.Error("deleting expired objects failed", zap.Error(err))
scripts/testdata/satellite-config.yaml.lock (vendored): 3 additions
@@ -1171,6 +1171,9 @@ server.private-address: 127.0.0.1:7778
 # server address to check its version against
 # version.server-address: https://version.storj.io

+# as of system interval
+# zombie-deletion.as-of-system-interval: -5m0s
+
 # set if zombie object cleanup is enabled or not
 # zombie-deletion.enabled: true
