diff --git a/cmd/tools/segment-verify/process_test.go b/cmd/tools/segment-verify/process_test.go index 43c9c91a7..92455ee02 100644 --- a/cmd/tools/segment-verify/process_test.go +++ b/cmd/tools/segment-verify/process_test.go @@ -62,7 +62,7 @@ func TestProcess(t *testing.T) { } } - err = service.ProcessRange(ctx, uuid.UUID{}, maxUUID) + err = service.ProcessRange(ctx, uuid.UUID{}, uuid.Max()) require.NoError(t, err) require.NoError(t, service.Close()) diff --git a/cmd/tools/segment-verify/service_test.go b/cmd/tools/segment-verify/service_test.go index ef4414f49..177824d71 100644 --- a/cmd/tools/segment-verify/service_test.go +++ b/cmd/tools/segment-verify/service_test.go @@ -26,8 +26,6 @@ import ( "storj.io/storj/satellite/overlay" ) -var maxUUID = uuid.UUID{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff} - func TestService_EmptyRange(t *testing.T) { ctx := testcontext.New(t) log := testplanet.NewLogger(t) @@ -48,7 +46,7 @@ func TestService_EmptyRange(t *testing.T) { defer ctx.Check(service.Close) - err = service.ProcessRange(ctx, uuid.UUID{}, maxUUID) + err = service.ProcessRange(ctx, uuid.UUID{}, uuid.Max()) require.NoError(t, err) } @@ -271,7 +269,7 @@ func TestService_Failures(t *testing.T) { defer ctx.Check(service.Close) - err = service.ProcessRange(ctx, uuid.UUID{}, maxUUID) + err = service.ProcessRange(ctx, uuid.UUID{}, uuid.Max()) require.NoError(t, err) for node, list := range verifier.processed { diff --git a/satellite/console/dbcleanup/chore.go b/satellite/console/dbcleanup/chore.go new file mode 100644 index 000000000..70dcc6d48 --- /dev/null +++ b/satellite/console/dbcleanup/chore.go @@ -0,0 +1,65 @@ +// Copyright (C) 2023 Storj Labs, Inc. +// See LICENSE for copying information. 
+ +package dbcleanup + +import ( + "context" + "time" + + "github.com/spacemonkeygo/monkit/v3" + "go.uber.org/zap" + + "storj.io/common/sync2" + "storj.io/storj/satellite/console" +) + +var mon = monkit.Package() + +// Config contains the configuration for the console DB cleanup chore. +type Config struct { + Enabled bool `help:"whether to run this chore" default:"false"` + Interval time.Duration `help:"interval between chore cycles" default:"24h"` + + AsOfSystemTimeInterval time.Duration `help:"interval for 'AS OF SYSTEM TIME' clause (CockroachDB specific) to read from the DB at a specific time in the past" default:"-5m" testDefault:"0"` + PageSize int `help:"maximum number of database records to scan at once" default:"1000"` + + MaxUnverifiedUserAge time.Duration `help:"maximum lifetime of unverified user account records" default:"168h"` +} + +// Chore periodically removes unwanted records from the satellite console database. +type Chore struct { + log *zap.Logger + db console.DB + Loop *sync2.Cycle + config Config +} + +// NewChore creates a new console DB cleanup chore. +func NewChore(log *zap.Logger, db console.DB, config Config) *Chore { + return &Chore{ + log: log, + db: db, + config: config, + Loop: sync2.NewCycle(config.Interval), + } +} + +// Run runs the console DB cleanup chore. +func (chore *Chore) Run(ctx context.Context) (err error) { + defer mon.Task()(&ctx)(&err) + return chore.Loop.Run(ctx, func(ctx context.Context) error { + before := time.Now().Add(-chore.config.MaxUnverifiedUserAge) + err := chore.db.Users().DeleteUnverifiedBefore(ctx, before, chore.config.AsOfSystemTimeInterval, chore.config.PageSize) + if err != nil { + chore.log.Error("Error deleting unverified users", zap.Error(err)) + } + return nil + }) +} + +// Close stops the console DB cleanup chore. 
+func (chore *Chore) Close() error { + chore.Loop.Stop() + return nil +} diff --git a/satellite/console/users.go b/satellite/console/users.go index 680903810..8ad6ecf16 100644 --- a/satellite/console/users.go +++ b/satellite/console/users.go @@ -33,8 +33,10 @@ type Users interface { GetByEmail(ctx context.Context, email string) (*User, error) // Insert is a method for inserting user into the database. Insert(ctx context.Context, user *User) (*User, error) - // Delete is a method for deleting user by Id from the database. + // Delete is a method for deleting user by ID from the database. Delete(ctx context.Context, id uuid.UUID) error + // DeleteUnverifiedBefore deletes unverified users created prior to some time from the database. + DeleteUnverifiedBefore(ctx context.Context, before time.Time, asOfSystemTimeInterval time.Duration, pageSize int) error // Update is a method for updating user entity. Update(ctx context.Context, userID uuid.UUID, request UpdateUserRequest) error // UpdatePaidTier sets whether the user is in the paid tier. diff --git a/satellite/core.go b/satellite/core.go index 76aa3974a..10b6a0356 100644 --- a/satellite/core.go +++ b/satellite/core.go @@ -32,6 +32,7 @@ import ( "storj.io/storj/satellite/audit" "storj.io/storj/satellite/console" "storj.io/storj/satellite/console/consoleauth" + "storj.io/storj/satellite/console/dbcleanup" "storj.io/storj/satellite/console/emailreminders" "storj.io/storj/satellite/mailservice" "storj.io/storj/satellite/metabase" @@ -140,6 +141,10 @@ type Core struct { StorjscanService *storjscan.Service StorjscanChore *storjscan.Chore } + + ConsoleDBCleanup struct { + Chore *dbcleanup.Chore + } } // New creates a new satellite. 
@@ -568,6 +573,21 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, } } + // setup console DB cleanup service + if config.ConsoleDBCleanup.Enabled { + peer.ConsoleDBCleanup.Chore = dbcleanup.NewChore( + peer.Log.Named("console.dbcleanup:chore"), + peer.DB.Console(), + config.ConsoleDBCleanup, + ) + + peer.Services.Add(lifecycle.Item{ + Name: "dbcleanup:chore", + Run: peer.ConsoleDBCleanup.Chore.Run, + Close: peer.ConsoleDBCleanup.Chore.Close, + }) + } + return peer, nil } diff --git a/satellite/metabase/loop.go b/satellite/metabase/loop.go index 3c5895cea..999d572f2 100644 --- a/satellite/metabase/loop.go +++ b/satellite/metabase/loop.go @@ -274,10 +274,7 @@ func (db *DB) IterateLoopSegments(ctx context.Context, opts IterateLoopSegments, it.cursor.StartPosition = SegmentPosition{math.MaxInt32, math.MaxInt32} } if it.cursor.EndStreamID.IsZero() { - it.cursor.EndStreamID, err = maxUUID() - if err != nil { - return err - } + it.cursor.EndStreamID = uuid.Max() } loopIteratorBatchSizeLimit.Ensure(&it.batchSize) @@ -410,11 +407,6 @@ func (it *loopSegmentIterator) scanItem(ctx context.Context, item *LoopSegmentEn return nil } -func maxUUID() (uuid.UUID, error) { - maxUUID, err := uuid.FromString("ffffffff-ffff-ffff-ffff-ffffffffffff") - return maxUUID, err -} - // BucketTally contains information about aggregate data stored in a bucket. 
type BucketTally struct { BucketLocation diff --git a/satellite/peer.go b/satellite/peer.go index a787f64a7..6e1f877ae 100644 --- a/satellite/peer.go +++ b/satellite/peer.go @@ -37,6 +37,7 @@ import ( "storj.io/storj/satellite/console" "storj.io/storj/satellite/console/consoleauth" "storj.io/storj/satellite/console/consoleweb" + "storj.io/storj/satellite/console/dbcleanup" "storj.io/storj/satellite/console/emailreminders" "storj.io/storj/satellite/console/restkeys" "storj.io/storj/satellite/console/userinfo" @@ -197,10 +198,11 @@ type Config struct { Payments paymentsconfig.Config - RESTKeys restkeys.Config - Console consoleweb.Config - ConsoleAuth consoleauth.Config - EmailReminders emailreminders.Config + RESTKeys restkeys.Config + Console consoleweb.Config + ConsoleAuth consoleauth.Config + EmailReminders emailreminders.Config + ConsoleDBCleanup dbcleanup.Config AccountFreeze accountfreeze.Config diff --git a/satellite/satellitedb/users.go b/satellite/satellitedb/users.go index bed8f81b0..10cc7888e 100644 --- a/satellite/satellitedb/users.go +++ b/satellite/satellitedb/users.go @@ -194,7 +194,7 @@ func (users *users) Insert(ctx context.Context, user *console.User) (_ *console. return userFromDBX(ctx, createdUser) } -// Delete is a method for deleting user by Id from the database. +// Delete is a method for deleting user by ID from the database. func (users *users) Delete(ctx context.Context, id uuid.UUID) (err error) { defer mon.Task()(&ctx)(&err) _, err = users.db.Delete_User_By_Id(ctx, dbx.User_Id(id[:])) @@ -202,6 +202,71 @@ func (users *users) Delete(ctx context.Context, id uuid.UUID) (err error) { return err } +// DeleteUnverifiedBefore deletes unverified users created prior to some time from the database. 
+func (users *users) DeleteUnverifiedBefore( + ctx context.Context, before time.Time, asOfSystemTimeInterval time.Duration, pageSize int) (err error) { + defer mon.Task()(&ctx)(&err) + + if pageSize <= 0 { + return Error.New("expected page size to be positive; got %d", pageSize) + } + + var pageCursor, pageEnd uuid.UUID + aost := users.db.impl.AsOfSystemInterval(asOfSystemTimeInterval) + for { + // Select the ID beginning this page of records + err = users.db.QueryRowContext(ctx, ` + SELECT id FROM users + `+aost+` + WHERE id > $1 AND users.status = $2 AND users.created_at < $3 + ORDER BY id LIMIT 1 + `, pageCursor, console.Inactive, before).Scan(&pageCursor) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return nil + } + return Error.Wrap(err) + } + + // Select the ID ending this page of records + err = users.db.QueryRowContext(ctx, ` + SELECT id FROM users + `+aost+` + WHERE id >= $1 + ORDER BY id LIMIT 1 OFFSET $2 + `, pageCursor, pageSize).Scan(&pageEnd) + if err != nil { + if !errors.Is(err, sql.ErrNoRows) { + return Error.Wrap(err) + } + // Since this is the last page, extend the range so the delete covers all remaining IDs + pageEnd = uuid.Max() + } + + // Delete all old, unverified users in the range between the beginning and ending IDs + _, err = users.db.ExecContext(ctx, ` + DELETE FROM users + WHERE id IN ( + SELECT id FROM users + `+aost+` + WHERE id >= $1 AND id <= $2 + AND users.status = $3 AND users.created_at < $4 + ORDER BY id + ) + `, pageCursor, pageEnd, console.Inactive, before) + if err != nil { + return Error.Wrap(err) + } + + if pageEnd == uuid.Max() { + return nil + } + + // Advance the cursor to the next page + pageCursor = pageEnd + } +} + // Update is a method for updating user entity. 
func (users *users) Update(ctx context.Context, userID uuid.UUID, updateRequest console.UpdateUserRequest) (err error) { defer mon.Task()(&ctx)(&err) diff --git a/satellite/satellitedb/users_test.go b/satellite/satellitedb/users_test.go index e02fe0d7b..0b5edbc28 100644 --- a/satellite/satellitedb/users_test.go +++ b/satellite/satellitedb/users_test.go @@ -13,6 +13,7 @@ import ( "storj.io/common/testcontext" "storj.io/common/testrand" + "storj.io/common/uuid" "storj.io/storj/satellite" "storj.io/storj/satellite/console" "storj.io/storj/satellite/satellitedb/satellitedbtest" @@ -386,3 +387,52 @@ func TestUserSettings(t *testing.T) { }) }) } + +func TestDeleteUnverifiedBefore(t *testing.T) { + maxUnverifiedAge := time.Hour + now := time.Now() + expiration := now.Add(-maxUnverifiedAge) + + satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) { + usersDB := db.Console().Users() + now := time.Now() + + // Only positive page sizes should be allowed. + require.Error(t, usersDB.DeleteUnverifiedBefore(ctx, time.Time{}, 0, 0)) + require.Error(t, usersDB.DeleteUnverifiedBefore(ctx, time.Time{}, 0, -1)) + + createUser := func(status console.UserStatus, createdAt time.Time) uuid.UUID { + user, err := usersDB.Insert(ctx, &console.User{ + ID: testrand.UUID(), + PasswordHash: testrand.Bytes(8), + }) + require.NoError(t, err) + + result, err := db.Testing().RawDB().ExecContext(ctx, + "UPDATE users SET created_at = $1, status = $2 WHERE id = $3", + createdAt, status, user.ID, + ) + require.NoError(t, err) + + count, err := result.RowsAffected() + require.NoError(t, err) + require.EqualValues(t, 1, count) + + return user.ID + } + + oldActive := createUser(console.Active, expiration.Add(-time.Second)) + newUnverified := createUser(console.Inactive, now) + oldUnverified := createUser(console.Inactive, expiration.Add(-time.Second)) + + require.NoError(t, usersDB.DeleteUnverifiedBefore(ctx, expiration, 0, 1)) + + // Ensure that the old, unverified user 
record was deleted and the others remain. + _, err := usersDB.Get(ctx, oldUnverified) + require.ErrorIs(t, err, sql.ErrNoRows) + _, err = usersDB.Get(ctx, newUnverified) + require.NoError(t, err) + _, err = usersDB.Get(ctx, oldActive) + require.NoError(t, err) + }) +} diff --git a/scripts/testdata/satellite-config.yaml.lock b/scripts/testdata/satellite-config.yaml.lock index 7d644ba34..f4b8cdba2 100755 --- a/scripts/testdata/satellite-config.yaml.lock +++ b/scripts/testdata/satellite-config.yaml.lock @@ -130,6 +130,21 @@ compensation.withheld-percents: 75,75,75,50,50,50,25,25,25,0,0,0,0,0,0 # expiration time for account recovery and activation tokens # console-auth.token-expiration-time: 30m0s +# interval for 'AS OF SYSTEM TIME' clause (CockroachDB specific) to read from the DB at a specific time in the past +# console-db-cleanup.as-of-system-time-interval: -5m0s + +# whether to run this chore +# console-db-cleanup.enabled: false + +# interval between chore cycles +# console-db-cleanup.interval: 24h0m0s + +# maximum lifetime of unverified user account records +# console-db-cleanup.max-unverified-user-age: 168h0m0s + +# maximum number of database records to scan at once +# console-db-cleanup.page-size: 1000 + # the Flagship API key # console.ab-testing.api-key: ""