satellite/console/dbcleanup: create console DB cleanup chore
A chore responsible for purging data from the console DB has been implemented. Currently, it removes old records for unverified user accounts. We plan to extend this functionality to include expired project member invitations in the future. Resolves #5790 References #5816 Change-Id: I1f3ef62fc96c10a42a383804b3b1d2846d7813f7
This commit is contained in:
parent
2bb636684e
commit
f61230a670
@ -62,7 +62,7 @@ func TestProcess(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
err = service.ProcessRange(ctx, uuid.UUID{}, maxUUID)
|
||||
err = service.ProcessRange(ctx, uuid.UUID{}, uuid.Max())
|
||||
require.NoError(t, err)
|
||||
|
||||
require.NoError(t, service.Close())
|
||||
|
@ -26,8 +26,6 @@ import (
|
||||
"storj.io/storj/satellite/overlay"
|
||||
)
|
||||
|
||||
var maxUUID = uuid.UUID{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff}
|
||||
|
||||
func TestService_EmptyRange(t *testing.T) {
|
||||
ctx := testcontext.New(t)
|
||||
log := testplanet.NewLogger(t)
|
||||
@ -48,7 +46,7 @@ func TestService_EmptyRange(t *testing.T) {
|
||||
|
||||
defer ctx.Check(service.Close)
|
||||
|
||||
err = service.ProcessRange(ctx, uuid.UUID{}, maxUUID)
|
||||
err = service.ProcessRange(ctx, uuid.UUID{}, uuid.Max())
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
@ -271,7 +269,7 @@ func TestService_Failures(t *testing.T) {
|
||||
|
||||
defer ctx.Check(service.Close)
|
||||
|
||||
err = service.ProcessRange(ctx, uuid.UUID{}, maxUUID)
|
||||
err = service.ProcessRange(ctx, uuid.UUID{}, uuid.Max())
|
||||
require.NoError(t, err)
|
||||
|
||||
for node, list := range verifier.processed {
|
||||
|
65
satellite/console/dbcleanup/chore.go
Normal file
65
satellite/console/dbcleanup/chore.go
Normal file
@ -0,0 +1,65 @@
|
||||
// Copyright (C) 2023 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package dbcleanup
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/spacemonkeygo/monkit/v3"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"storj.io/common/sync2"
|
||||
"storj.io/storj/satellite/console"
|
||||
)
|
||||
|
||||
var mon = monkit.Package()
|
||||
|
||||
// Config contains the configuration for the console DB cleanup chore.
|
||||
type Config struct {
|
||||
Enabled bool `help:"whether to run this chore" default:"false"`
|
||||
Interval time.Duration `help:"interval between chore cycles" default:"24h"`
|
||||
|
||||
AsOfSystemTimeInterval time.Duration `help:"interval for 'AS OF SYSTEM TIME' clause (CockroachDB specific) to read from the DB at a specific time in the past" default:"-5m" testDefault:"0"`
|
||||
PageSize int `help:"maximum number of database records to scan at once" default:"1000"`
|
||||
|
||||
MaxUnverifiedUserAge time.Duration `help:"maximum lifetime of unverified user account records" default:"168h"`
|
||||
}
|
||||
|
||||
// Chore periodically removes unwanted records from the satellite console database.
|
||||
type Chore struct {
|
||||
log *zap.Logger
|
||||
db console.DB
|
||||
Loop *sync2.Cycle
|
||||
config Config
|
||||
}
|
||||
|
||||
// NewChore creates a new console DB cleanup chore.
|
||||
func NewChore(log *zap.Logger, db console.DB, config Config) *Chore {
|
||||
return &Chore{
|
||||
log: log,
|
||||
db: db,
|
||||
config: config,
|
||||
Loop: sync2.NewCycle(config.Interval),
|
||||
}
|
||||
}
|
||||
|
||||
// Run runs the console DB cleanup chore.
|
||||
func (chore *Chore) Run(ctx context.Context) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
return chore.Loop.Run(ctx, func(ctx context.Context) error {
|
||||
before := time.Now().Add(-chore.config.MaxUnverifiedUserAge)
|
||||
err := chore.db.Users().DeleteUnverifiedBefore(ctx, before, chore.config.AsOfSystemTimeInterval, chore.config.PageSize)
|
||||
if err != nil {
|
||||
chore.log.Error("Error deleting unverified users", zap.Error(err))
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// Close stops the console DB cleanup chore.
|
||||
func (chore *Chore) Close() error {
|
||||
chore.Loop.Stop()
|
||||
return nil
|
||||
}
|
@ -33,8 +33,10 @@ type Users interface {
|
||||
GetByEmail(ctx context.Context, email string) (*User, error)
|
||||
// Insert is a method for inserting user into the database.
|
||||
Insert(ctx context.Context, user *User) (*User, error)
|
||||
// Delete is a method for deleting user by Id from the database.
|
||||
// Delete is a method for deleting user by ID from the database.
|
||||
Delete(ctx context.Context, id uuid.UUID) error
|
||||
// DeleteUnverifiedBefore deletes unverified users created prior to some time from the database.
|
||||
DeleteUnverifiedBefore(ctx context.Context, before time.Time, asOfSystemTimeInterval time.Duration, pageSize int) error
|
||||
// Update is a method for updating user entity.
|
||||
Update(ctx context.Context, userID uuid.UUID, request UpdateUserRequest) error
|
||||
// UpdatePaidTier sets whether the user is in the paid tier.
|
||||
|
@ -32,6 +32,7 @@ import (
|
||||
"storj.io/storj/satellite/audit"
|
||||
"storj.io/storj/satellite/console"
|
||||
"storj.io/storj/satellite/console/consoleauth"
|
||||
"storj.io/storj/satellite/console/dbcleanup"
|
||||
"storj.io/storj/satellite/console/emailreminders"
|
||||
"storj.io/storj/satellite/mailservice"
|
||||
"storj.io/storj/satellite/metabase"
|
||||
@ -140,6 +141,10 @@ type Core struct {
|
||||
StorjscanService *storjscan.Service
|
||||
StorjscanChore *storjscan.Chore
|
||||
}
|
||||
|
||||
ConsoleDBCleanup struct {
|
||||
Chore *dbcleanup.Chore
|
||||
}
|
||||
}
|
||||
|
||||
// New creates a new satellite.
|
||||
@ -568,6 +573,21 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB,
|
||||
}
|
||||
}
|
||||
|
||||
// setup console DB cleanup service
|
||||
if config.ConsoleDBCleanup.Enabled {
|
||||
peer.ConsoleDBCleanup.Chore = dbcleanup.NewChore(
|
||||
peer.Log.Named("console.dbcleanup:chore"),
|
||||
peer.DB.Console(),
|
||||
config.ConsoleDBCleanup,
|
||||
)
|
||||
|
||||
peer.Services.Add(lifecycle.Item{
|
||||
Name: "dbcleanup:chore",
|
||||
Run: peer.ConsoleDBCleanup.Chore.Run,
|
||||
Close: peer.ConsoleDBCleanup.Chore.Close,
|
||||
})
|
||||
}
|
||||
|
||||
return peer, nil
|
||||
}
|
||||
|
||||
|
@ -274,10 +274,7 @@ func (db *DB) IterateLoopSegments(ctx context.Context, opts IterateLoopSegments,
|
||||
it.cursor.StartPosition = SegmentPosition{math.MaxInt32, math.MaxInt32}
|
||||
}
|
||||
if it.cursor.EndStreamID.IsZero() {
|
||||
it.cursor.EndStreamID, err = maxUUID()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
it.cursor.EndStreamID = uuid.Max()
|
||||
}
|
||||
|
||||
loopIteratorBatchSizeLimit.Ensure(&it.batchSize)
|
||||
@ -410,11 +407,6 @@ func (it *loopSegmentIterator) scanItem(ctx context.Context, item *LoopSegmentEn
|
||||
return nil
|
||||
}
|
||||
|
||||
func maxUUID() (uuid.UUID, error) {
|
||||
maxUUID, err := uuid.FromString("ffffffff-ffff-ffff-ffff-ffffffffffff")
|
||||
return maxUUID, err
|
||||
}
|
||||
|
||||
// BucketTally contains information about aggregate data stored in a bucket.
|
||||
type BucketTally struct {
|
||||
BucketLocation
|
||||
|
@ -37,6 +37,7 @@ import (
|
||||
"storj.io/storj/satellite/console"
|
||||
"storj.io/storj/satellite/console/consoleauth"
|
||||
"storj.io/storj/satellite/console/consoleweb"
|
||||
"storj.io/storj/satellite/console/dbcleanup"
|
||||
"storj.io/storj/satellite/console/emailreminders"
|
||||
"storj.io/storj/satellite/console/restkeys"
|
||||
"storj.io/storj/satellite/console/userinfo"
|
||||
@ -201,6 +202,7 @@ type Config struct {
|
||||
Console consoleweb.Config
|
||||
ConsoleAuth consoleauth.Config
|
||||
EmailReminders emailreminders.Config
|
||||
ConsoleDBCleanup dbcleanup.Config
|
||||
|
||||
AccountFreeze accountfreeze.Config
|
||||
|
||||
|
@ -194,7 +194,7 @@ func (users *users) Insert(ctx context.Context, user *console.User) (_ *console.
|
||||
return userFromDBX(ctx, createdUser)
|
||||
}
|
||||
|
||||
// Delete is a method for deleting user by Id from the database.
|
||||
// Delete is a method for deleting user by ID from the database.
|
||||
func (users *users) Delete(ctx context.Context, id uuid.UUID) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
_, err = users.db.Delete_User_By_Id(ctx, dbx.User_Id(id[:]))
|
||||
@ -202,6 +202,71 @@ func (users *users) Delete(ctx context.Context, id uuid.UUID) (err error) {
|
||||
return err
|
||||
}
|
||||
|
||||
// DeleteUnverifiedBefore deletes unverified users created prior to some time from the database.
|
||||
func (users *users) DeleteUnverifiedBefore(
|
||||
ctx context.Context, before time.Time, asOfSystemTimeInterval time.Duration, pageSize int) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
if pageSize <= 0 {
|
||||
return Error.New("expected page size to be positive; got %d", pageSize)
|
||||
}
|
||||
|
||||
var pageCursor, pageEnd uuid.UUID
|
||||
aost := users.db.impl.AsOfSystemInterval(asOfSystemTimeInterval)
|
||||
for {
|
||||
// Select the ID beginning this page of records
|
||||
err = users.db.QueryRowContext(ctx, `
|
||||
SELECT id FROM users
|
||||
`+aost+`
|
||||
WHERE id > $1 AND users.status = $2 AND users.created_at < $3
|
||||
ORDER BY id LIMIT 1
|
||||
`, pageCursor, console.Inactive, before).Scan(&pageCursor)
|
||||
if err != nil {
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
return nil
|
||||
}
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
|
||||
// Select the ID ending this page of records
|
||||
err = users.db.QueryRowContext(ctx, `
|
||||
SELECT id FROM users
|
||||
`+aost+`
|
||||
WHERE id >= $1
|
||||
ORDER BY id LIMIT 1 OFFSET $2
|
||||
`, pageCursor, pageSize).Scan(&pageEnd)
|
||||
if err != nil {
|
||||
if !errors.Is(err, sql.ErrNoRows) {
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
// Since this is the last page, we want to return all remaining IDs
|
||||
pageEnd = uuid.Max()
|
||||
}
|
||||
|
||||
// Delete all old, unverified users in the range between the beginning and ending IDs
|
||||
_, err = users.db.ExecContext(ctx, `
|
||||
DELETE FROM users
|
||||
WHERE id IN (
|
||||
SELECT id FROM users
|
||||
`+aost+`
|
||||
WHERE id >= $1 AND id <= $2
|
||||
AND users.status = $3 AND users.created_at < $4
|
||||
ORDER BY id
|
||||
)
|
||||
`, pageCursor, pageEnd, console.Inactive, before)
|
||||
if err != nil {
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
|
||||
if pageEnd == uuid.Max() {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Advance the cursor to the next page
|
||||
pageCursor = pageEnd
|
||||
}
|
||||
}
|
||||
|
||||
// Update is a method for updating user entity.
|
||||
func (users *users) Update(ctx context.Context, userID uuid.UUID, updateRequest console.UpdateUserRequest) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
@ -13,6 +13,7 @@ import (
|
||||
|
||||
"storj.io/common/testcontext"
|
||||
"storj.io/common/testrand"
|
||||
"storj.io/common/uuid"
|
||||
"storj.io/storj/satellite"
|
||||
"storj.io/storj/satellite/console"
|
||||
"storj.io/storj/satellite/satellitedb/satellitedbtest"
|
||||
@ -386,3 +387,52 @@ func TestUserSettings(t *testing.T) {
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func TestDeleteUnverifiedBefore(t *testing.T) {
|
||||
maxUnverifiedAge := time.Hour
|
||||
now := time.Now()
|
||||
expiration := now.Add(-maxUnverifiedAge)
|
||||
|
||||
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
|
||||
usersDB := db.Console().Users()
|
||||
now := time.Now()
|
||||
|
||||
// Only positive page sizes should be allowed.
|
||||
require.Error(t, usersDB.DeleteUnverifiedBefore(ctx, time.Time{}, 0, 0))
|
||||
require.Error(t, usersDB.DeleteUnverifiedBefore(ctx, time.Time{}, 0, -1))
|
||||
|
||||
createUser := func(status console.UserStatus, createdAt time.Time) uuid.UUID {
|
||||
user, err := usersDB.Insert(ctx, &console.User{
|
||||
ID: testrand.UUID(),
|
||||
PasswordHash: testrand.Bytes(8),
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
result, err := db.Testing().RawDB().ExecContext(ctx,
|
||||
"UPDATE users SET created_at = $1, status = $2 WHERE id = $3",
|
||||
createdAt, status, user.ID,
|
||||
)
|
||||
require.NoError(t, err)
|
||||
|
||||
count, err := result.RowsAffected()
|
||||
require.NoError(t, err)
|
||||
require.EqualValues(t, 1, count)
|
||||
|
||||
return user.ID
|
||||
}
|
||||
|
||||
oldActive := createUser(console.Active, expiration.Add(-time.Second))
|
||||
newUnverified := createUser(console.Inactive, now)
|
||||
oldUnverified := createUser(console.Inactive, expiration.Add(-time.Second))
|
||||
|
||||
require.NoError(t, usersDB.DeleteUnverifiedBefore(ctx, expiration, 0, 1))
|
||||
|
||||
// Ensure that the old, unverified user record was deleted and the others remain.
|
||||
_, err := usersDB.Get(ctx, oldUnverified)
|
||||
require.ErrorIs(t, err, sql.ErrNoRows)
|
||||
_, err = usersDB.Get(ctx, newUnverified)
|
||||
require.NoError(t, err)
|
||||
_, err = usersDB.Get(ctx, oldActive)
|
||||
require.NoError(t, err)
|
||||
})
|
||||
}
|
||||
|
15
scripts/testdata/satellite-config.yaml.lock
vendored
15
scripts/testdata/satellite-config.yaml.lock
vendored
@ -130,6 +130,21 @@ compensation.withheld-percents: 75,75,75,50,50,50,25,25,25,0,0,0,0,0,0
|
||||
# expiration time for account recovery and activation tokens
|
||||
# console-auth.token-expiration-time: 30m0s
|
||||
|
||||
# interval for 'AS OF SYSTEM TIME' clause (CockroachDB specific) to read from the DB at a specific time in the past
|
||||
# console-db-cleanup.as-of-system-time-interval: -5m0s
|
||||
|
||||
# whether to run this chore
|
||||
# console-db-cleanup.enabled: false
|
||||
|
||||
# interval between chore cycles
|
||||
# console-db-cleanup.interval: 24h0m0s
|
||||
|
||||
# maximum lifetime of unverified user account records
|
||||
# console-db-cleanup.max-unverified-user-age: 168h0m0s
|
||||
|
||||
# maximum number of database records to scan at once
|
||||
# console-db-cleanup.page-size: 1000
|
||||
|
||||
# the Flagship API key
|
||||
# console.ab-testing.api-key: ""
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user