From f61230a670450b43d70badeac170ce42d44a37ed Mon Sep 17 00:00:00 2001 From: Jeremy Wharton Date: Fri, 28 Apr 2023 06:43:27 -0500 Subject: [PATCH] satellite/console/dbcleanup: create console DB cleanup chore A chore responsible for purging data from the console DB has been implemented. Currently, it removes old records for unverified user accounts. We plan to extend this functionality to include expired project member invitations in the future. Resolves #5790 References #5816 Change-Id: I1f3ef62fc96c10a42a383804b3b1d2846d7813f7 --- cmd/tools/segment-verify/process_test.go | 2 +- cmd/tools/segment-verify/service_test.go | 6 +- satellite/console/dbcleanup/chore.go | 65 ++++++++++++++++++++ satellite/console/users.go | 4 +- satellite/core.go | 20 ++++++ satellite/metabase/loop.go | 10 +-- satellite/peer.go | 10 +-- satellite/satellitedb/users.go | 67 ++++++++++++++++++++- satellite/satellitedb/users_test.go | 50 +++++++++++++++ scripts/testdata/satellite-config.yaml.lock | 15 +++++ 10 files changed, 229 insertions(+), 20 deletions(-) create mode 100644 satellite/console/dbcleanup/chore.go diff --git a/cmd/tools/segment-verify/process_test.go b/cmd/tools/segment-verify/process_test.go index 43c9c91a7..92455ee02 100644 --- a/cmd/tools/segment-verify/process_test.go +++ b/cmd/tools/segment-verify/process_test.go @@ -62,7 +62,7 @@ func TestProcess(t *testing.T) { } } - err = service.ProcessRange(ctx, uuid.UUID{}, maxUUID) + err = service.ProcessRange(ctx, uuid.UUID{}, uuid.Max()) require.NoError(t, err) require.NoError(t, service.Close()) diff --git a/cmd/tools/segment-verify/service_test.go b/cmd/tools/segment-verify/service_test.go index ef4414f49..177824d71 100644 --- a/cmd/tools/segment-verify/service_test.go +++ b/cmd/tools/segment-verify/service_test.go @@ -26,8 +26,6 @@ import ( "storj.io/storj/satellite/overlay" ) -var maxUUID = uuid.UUID{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff} - func TestService_EmptyRange(t 
*testing.T) {
 	ctx := testcontext.New(t)
 	log := testplanet.NewLogger(t)
@@ -48,7 +46,7 @@ func TestService_EmptyRange(t *testing.T) {
 
 	defer ctx.Check(service.Close)
 
-	err = service.ProcessRange(ctx, uuid.UUID{}, maxUUID)
+	err = service.ProcessRange(ctx, uuid.UUID{}, uuid.Max())
 	require.NoError(t, err)
 }
 
@@ -271,7 +269,7 @@ func TestService_Failures(t *testing.T) {
 
 	defer ctx.Check(service.Close)
 
-	err = service.ProcessRange(ctx, uuid.UUID{}, maxUUID)
+	err = service.ProcessRange(ctx, uuid.UUID{}, uuid.Max())
 	require.NoError(t, err)
 
 	for node, list := range verifier.processed {
diff --git a/satellite/console/dbcleanup/chore.go b/satellite/console/dbcleanup/chore.go
new file mode 100644
index 000000000..70dcc6d48
--- /dev/null
+++ b/satellite/console/dbcleanup/chore.go
@@ -0,0 +1,65 @@
+// Copyright (C) 2023 Storj Labs, Inc.
+// See LICENSE for copying information.
+
+package dbcleanup
+
+import (
+	"context"
+	"time"
+
+	"github.com/spacemonkeygo/monkit/v3"
+	"go.uber.org/zap"
+
+	"storj.io/common/sync2"
+	"storj.io/storj/satellite/console"
+)
+
+var mon = monkit.Package()
+
+// Config contains the configuration for the console DB cleanup chore; each field's help tag documents it.
+type Config struct {
+	Enabled  bool          `help:"whether to run this chore" default:"false"`
+	Interval time.Duration `help:"interval between chore cycles" default:"24h"`
+
+	AsOfSystemTimeInterval time.Duration `help:"interval for 'AS OF SYSTEM TIME' clause (CockroachDB specific) to read from the DB at a specific time in the past" default:"-5m" testDefault:"0"`
+	PageSize               int           `help:"maximum number of database records to scan at once" default:"1000"`
+
+	MaxUnverifiedUserAge time.Duration `help:"maximum lifetime of unverified user account records" default:"168h"`
+}
+
+// Chore periodically removes unwanted records from the satellite console database.
+type Chore struct {
+	log    *zap.Logger
+	db     console.DB
+	Loop   *sync2.Cycle // exported cycle that Run drives and Close stops
+	config Config
+}
+
+// NewChore creates a console DB cleanup chore whose cycle is built from config.Interval.
+func NewChore(log *zap.Logger, db console.DB, config Config) *Chore {
+	return &Chore{
+		log:    log,
+		db:     db,
+		config: config,
+		Loop:   sync2.NewCycle(config.Interval),
+	}
+}
+
+// Run starts the cleanup cycle; each pass deletes unverified users older than config.MaxUnverifiedUserAge.
+func (chore *Chore) Run(ctx context.Context) (err error) {
+	defer mon.Task()(&ctx)(&err)
+	return chore.Loop.Run(ctx, func(ctx context.Context) error {
+		before := time.Now().Add(-chore.config.MaxUnverifiedUserAge) // cutoff: users created earlier are eligible
+		err := chore.db.Users().DeleteUnverifiedBefore(ctx, before, chore.config.AsOfSystemTimeInterval, chore.config.PageSize)
+		if err != nil {
+			chore.log.Error("Error deleting unverified users", zap.Error(err))
+		}
+		return nil // failure is only logged; presumably so one bad pass does not end the cycle — confirm
+	})
+}
+
+// Close stops the chore's cycle. It always returns nil.
+func (chore *Chore) Close() error {
+	chore.Loop.Stop()
+	return nil
+}
diff --git a/satellite/console/users.go b/satellite/console/users.go
index 680903810..8ad6ecf16 100644
--- a/satellite/console/users.go
+++ b/satellite/console/users.go
@@ -33,8 +33,10 @@ type Users interface {
 	GetByEmail(ctx context.Context, email string) (*User, error)
 	// Insert is a method for inserting user into the database.
 	Insert(ctx context.Context, user *User) (*User, error)
-	// Delete is a method for deleting user by Id from the database.
+	// Delete is a method for deleting user by ID from the database.
 	Delete(ctx context.Context, id uuid.UUID) error
+	// DeleteUnverifiedBefore deletes unverified users created before the given time, working in pages sized by pageSize.
+	DeleteUnverifiedBefore(ctx context.Context, before time.Time, asOfSystemTimeInterval time.Duration, pageSize int) error
 	// Update is a method for updating user entity.
 	Update(ctx context.Context, userID uuid.UUID, request UpdateUserRequest) error
 	// UpdatePaidTier sets whether the user is in the paid tier.
diff --git a/satellite/core.go b/satellite/core.go index 76aa3974a..10b6a0356 100644 --- a/satellite/core.go +++ b/satellite/core.go @@ -32,6 +32,7 @@ import ( "storj.io/storj/satellite/audit" "storj.io/storj/satellite/console" "storj.io/storj/satellite/console/consoleauth" + "storj.io/storj/satellite/console/dbcleanup" "storj.io/storj/satellite/console/emailreminders" "storj.io/storj/satellite/mailservice" "storj.io/storj/satellite/metabase" @@ -140,6 +141,10 @@ type Core struct { StorjscanService *storjscan.Service StorjscanChore *storjscan.Chore } + + ConsoleDBCleanup struct { + Chore *dbcleanup.Chore + } } // New creates a new satellite. @@ -568,6 +573,21 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, } } + // setup console DB cleanup service + if config.ConsoleDBCleanup.Enabled { + peer.ConsoleDBCleanup.Chore = dbcleanup.NewChore( + peer.Log.Named("console.dbcleanup:chore"), + peer.DB.Console(), + config.ConsoleDBCleanup, + ) + + peer.Services.Add(lifecycle.Item{ + Name: "dbcleanup:chore", + Run: peer.ConsoleDBCleanup.Chore.Run, + Close: peer.ConsoleDBCleanup.Chore.Close, + }) + } + return peer, nil } diff --git a/satellite/metabase/loop.go b/satellite/metabase/loop.go index 3c5895cea..999d572f2 100644 --- a/satellite/metabase/loop.go +++ b/satellite/metabase/loop.go @@ -274,10 +274,7 @@ func (db *DB) IterateLoopSegments(ctx context.Context, opts IterateLoopSegments, it.cursor.StartPosition = SegmentPosition{math.MaxInt32, math.MaxInt32} } if it.cursor.EndStreamID.IsZero() { - it.cursor.EndStreamID, err = maxUUID() - if err != nil { - return err - } + it.cursor.EndStreamID = uuid.Max() } loopIteratorBatchSizeLimit.Ensure(&it.batchSize) @@ -410,11 +407,6 @@ func (it *loopSegmentIterator) scanItem(ctx context.Context, item *LoopSegmentEn return nil } -func maxUUID() (uuid.UUID, error) { - maxUUID, err := uuid.FromString("ffffffff-ffff-ffff-ffff-ffffffffffff") - return maxUUID, err -} - // BucketTally contains information about aggregate data 
stored in a bucket. type BucketTally struct { BucketLocation diff --git a/satellite/peer.go b/satellite/peer.go index a787f64a7..6e1f877ae 100644 --- a/satellite/peer.go +++ b/satellite/peer.go @@ -37,6 +37,7 @@ import ( "storj.io/storj/satellite/console" "storj.io/storj/satellite/console/consoleauth" "storj.io/storj/satellite/console/consoleweb" + "storj.io/storj/satellite/console/dbcleanup" "storj.io/storj/satellite/console/emailreminders" "storj.io/storj/satellite/console/restkeys" "storj.io/storj/satellite/console/userinfo" @@ -197,10 +198,11 @@ type Config struct { Payments paymentsconfig.Config - RESTKeys restkeys.Config - Console consoleweb.Config - ConsoleAuth consoleauth.Config - EmailReminders emailreminders.Config + RESTKeys restkeys.Config + Console consoleweb.Config + ConsoleAuth consoleauth.Config + EmailReminders emailreminders.Config + ConsoleDBCleanup dbcleanup.Config AccountFreeze accountfreeze.Config diff --git a/satellite/satellitedb/users.go b/satellite/satellitedb/users.go index bed8f81b0..10cc7888e 100644 --- a/satellite/satellitedb/users.go +++ b/satellite/satellitedb/users.go @@ -194,7 +194,7 @@ func (users *users) Insert(ctx context.Context, user *console.User) (_ *console. return userFromDBX(ctx, createdUser) } -// Delete is a method for deleting user by Id from the database. +// Delete is a method for deleting user by ID from the database. func (users *users) Delete(ctx context.Context, id uuid.UUID) (err error) { defer mon.Task()(&ctx)(&err) _, err = users.db.Delete_User_By_Id(ctx, dbx.User_Id(id[:])) @@ -202,6 +202,71 @@ func (users *users) Delete(ctx context.Context, id uuid.UUID) (err error) { return err } +// DeleteUnverifiedBefore deletes unverified users created prior to some time from the database. 
+func (users *users) DeleteUnverifiedBefore(
+	ctx context.Context, before time.Time, asOfSystemTimeInterval time.Duration, pageSize int) (err error) {
+	defer mon.Task()(&ctx)(&err)
+
+	if pageSize <= 0 {
+		return Error.New("expected page size to be positive; got %d", pageSize)
+	}
+
+	var pageCursor, pageEnd uuid.UUID
+	aost := users.db.impl.AsOfSystemInterval(asOfSystemTimeInterval) // clause text spliced into each SELECT below
+	for {
+		// Find the smallest matching ID after the cursor; it starts this page. No match means nothing is left to delete.
+		err = users.db.QueryRowContext(ctx, `
+			SELECT id FROM users
+			`+aost+`
+			WHERE id > $1 AND users.status = $2 AND users.created_at < $3
+			ORDER BY id LIMIT 1
+		`, pageCursor, console.Inactive, before).Scan(&pageCursor)
+		if err != nil {
+			if errors.Is(err, sql.ErrNoRows) {
+				return nil
+			}
+			return Error.Wrap(err)
+		}
+
+		// Find the ID pageSize rows past the page start (counting every user, not only matching ones) to end this page.
+		err = users.db.QueryRowContext(ctx, `
+			SELECT id FROM users
+			`+aost+`
+			WHERE id >= $1
+			ORDER BY id LIMIT 1 OFFSET $2
+		`, pageCursor, pageSize).Scan(&pageEnd)
+		if err != nil {
+			if !errors.Is(err, sql.ErrNoRows) {
+				return Error.Wrap(err)
+			}
+			// No row lies pageSize rows past the page start, so this is the last page: extend it to the maximum UUID.
+			pageEnd = uuid.Max()
+		}
+
+		// Delete old, unverified users with IDs in [pageCursor, pageEnd]. NOTE(review): the subquery also reads via aost, so it may match on stale status — confirm.
+		_, err = users.db.ExecContext(ctx, `
+			DELETE FROM users
+			WHERE id IN (
+				SELECT id FROM users
+				`+aost+`
+				WHERE id >= $1 AND id <= $2
+				AND users.status = $3 AND users.created_at < $4
+				ORDER BY id
+			)
+		`, pageCursor, pageEnd, console.Inactive, before)
+		if err != nil {
+			return Error.Wrap(err)
+		}
+
+		if pageEnd == uuid.Max() {
+			return nil
+		}
+
+		// Continue after pageEnd; the next page-start query selects id > pageCursor.
+		pageCursor = pageEnd
+	}
+}
+
 // Update is a method for updating user entity.
func (users *users) Update(ctx context.Context, userID uuid.UUID, updateRequest console.UpdateUserRequest) (err error) {
 	defer mon.Task()(&ctx)(&err)
diff --git a/satellite/satellitedb/users_test.go b/satellite/satellitedb/users_test.go
index e02fe0d7b..0b5edbc28 100644
--- a/satellite/satellitedb/users_test.go
+++ b/satellite/satellitedb/users_test.go
@@ -13,6 +13,7 @@ import (
 
 	"storj.io/common/testcontext"
 	"storj.io/common/testrand"
+	"storj.io/common/uuid"
 	"storj.io/storj/satellite"
 	"storj.io/storj/satellite/console"
 	"storj.io/storj/satellite/satellitedb/satellitedbtest"
@@ -386,3 +387,52 @@ func TestUserSettings(t *testing.T) {
 		})
 	})
 }
+
+func TestDeleteUnverifiedBefore(t *testing.T) {
+	maxUnverifiedAge := time.Hour
+	now := time.Now()
+	expiration := now.Add(-maxUnverifiedAge)
+
+	satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
+		usersDB := db.Console().Users()
+		now := time.Now() // NOTE(review): shadows the outer now; only used for newUnverified below
+
+		// Only positive page sizes should be allowed.
+		require.Error(t, usersDB.DeleteUnverifiedBefore(ctx, time.Time{}, 0, 0))
+		require.Error(t, usersDB.DeleteUnverifiedBefore(ctx, time.Time{}, 0, -1))
+
+		createUser := func(status console.UserStatus, createdAt time.Time) uuid.UUID { // inserts a user, then sets created_at and status via raw SQL
+			user, err := usersDB.Insert(ctx, &console.User{
+				ID:           testrand.UUID(),
+				PasswordHash: testrand.Bytes(8),
+			})
+			require.NoError(t, err)
+
+			result, err := db.Testing().RawDB().ExecContext(ctx,
+				"UPDATE users SET created_at = $1, status = $2 WHERE id = $3",
+				createdAt, status, user.ID,
+			)
+			require.NoError(t, err)
+
+			count, err := result.RowsAffected()
+			require.NoError(t, err)
+			require.EqualValues(t, 1, count)
+
+			return user.ID
+		}
+
+		oldActive := createUser(console.Active, expiration.Add(-time.Second))
+		newUnverified := createUser(console.Inactive, now)
+		oldUnverified := createUser(console.Inactive, expiration.Add(-time.Second))
+
+		require.NoError(t, usersDB.DeleteUnverifiedBefore(ctx, expiration, 0, 1)) // smallest allowed page size
+
+		// Ensure that only the old, unverified user record was deleted; the new unverified and old active users remain.
+		_, err := usersDB.Get(ctx, oldUnverified)
+		require.ErrorIs(t, err, sql.ErrNoRows)
+		_, err = usersDB.Get(ctx, newUnverified)
+		require.NoError(t, err)
+		_, err = usersDB.Get(ctx, oldActive)
+		require.NoError(t, err)
+	})
+}
diff --git a/scripts/testdata/satellite-config.yaml.lock b/scripts/testdata/satellite-config.yaml.lock
index 7d644ba34..f4b8cdba2 100755
--- a/scripts/testdata/satellite-config.yaml.lock
+++ b/scripts/testdata/satellite-config.yaml.lock
@@ -130,6 +130,21 @@ compensation.withheld-percents: 75,75,75,50,50,50,25,25,25,0,0,0,0,0,0
 # expiration time for account recovery and activation tokens
 # console-auth.token-expiration-time: 30m0s
 
+# interval for 'AS OF SYSTEM TIME' clause (CockroachDB specific) to read from the DB at a specific time in the past
+# console-db-cleanup.as-of-system-time-interval: -5m0s
+
+# whether to run this chore
+# console-db-cleanup.enabled: false
+
+# interval between chore cycles
+# console-db-cleanup.interval: 24h0m0s
+
+# maximum lifetime of unverified user account records
+# console-db-cleanup.max-unverified-user-age: 168h0m0s
+
+# maximum number of database records to scan at once
+# console-db-cleanup.page-size: 1000
+
 # the Flagship API key
 # console.ab-testing.api-key: ""