diff --git a/private/testplanet/satellite.go b/private/testplanet/satellite.go index 5dcef573a..c7b87be2c 100644 --- a/private/testplanet/satellite.go +++ b/private/testplanet/satellite.go @@ -553,6 +553,7 @@ func (planet *Planet) newSatellite(ctx context.Context, prefix string, index int }, DBCleanup: dbcleanup.Config{ SerialsInterval: defaultInterval, + BatchSize: 1000, }, Tally: tally.Config{ Interval: defaultInterval, diff --git a/satellite/dbcleanup/chore.go b/satellite/dbcleanup/chore.go index 51ef40249..62fe86de0 100644 --- a/satellite/dbcleanup/chore.go +++ b/satellite/dbcleanup/chore.go @@ -25,6 +25,7 @@ var ( // Config defines configuration struct for dbcleanup chore. type Config struct { SerialsInterval time.Duration `help:"how often to delete expired serial numbers" default:"4h"` + BatchSize int `help:"number of records to delete per delete execution. 0 indicates no limit. applicable to cockroach DB only." default:"1000"` } // Chore for deleting DB entries that are no longer needed. @@ -35,15 +36,26 @@ type Chore struct { orders orders.DB Serials *sync2.Cycle + config Config + options *orders.SerialDeleteOptions } // NewChore creates new chore for deleting DB entries. -func NewChore(log *zap.Logger, orders orders.DB, config Config) *Chore { +func NewChore(log *zap.Logger, ordersDB orders.DB, config Config) *Chore { + var options *orders.SerialDeleteOptions + if config.BatchSize > 0 { + options = &orders.SerialDeleteOptions{ + BatchSize: config.BatchSize, + } + } + return &Chore{ log: log, - orders: orders, + orders: ordersDB, Serials: sync2.NewCycle(config.SerialsInterval), + config: config, + options: options, } } @@ -59,7 +71,7 @@ func (chore *Chore) deleteExpiredSerials(ctx context.Context) (err error) { now := time.Now() - deleted, err := chore.orders.DeleteExpiredSerials(ctx, now) + deleted, err := chore.orders.DeleteExpiredSerials(ctx, now, chore.options) if err != nil { chore.log.Error("deleting expired serial numbers", zap.Error(err)) } else { diff --git a/satellite/orders/endpoint.go b/satellite/orders/endpoint.go index e587ccb03..aee836111 100644 --- a/satellite/orders/endpoint.go +++ b/satellite/orders/endpoint.go @@ -39,7 +39,7 @@ type DB interface { // UnuseSerialNumber removes pair serial number -> storage node id from database UnuseSerialNumber(ctx context.Context, serialNumber storj.SerialNumber, storageNodeID storj.NodeID) error // DeleteExpiredSerials deletes all expired serials in serial_number, used_serials, and consumed_serials table. - DeleteExpiredSerials(ctx context.Context, now time.Time) (_ int, err error) + DeleteExpiredSerials(ctx context.Context, now time.Time, options *SerialDeleteOptions) (_ int, err error) // DeleteExpiredConsumedSerials deletes all expired serials in the consumed_serials table. DeleteExpiredConsumedSerials(ctx context.Context, now time.Time) (_ int, err error) // GetBucketIDFromSerialNumber returns the bucket ID associated with the serial number @@ -72,6 +72,11 @@ type DB interface { WithQueue(ctx context.Context, cb func(ctx context.Context, queue Queue) error) error } +// SerialDeleteOptions are option when deleting from serial tables. +type SerialDeleteOptions struct { + BatchSize int +} + // Transaction represents a database transaction but with higher level actions. type Transaction interface { // UpdateBucketBandwidthBatch updates all the bandwidth rollups in the database diff --git a/satellite/orders/serials_test.go b/satellite/orders/serials_test.go index 83323a3e8..4ba0fb5dd 100644 --- a/satellite/orders/serials_test.go +++ b/satellite/orders/serials_test.go @@ -46,7 +46,7 @@ func TestSerialNumbers(t *testing.T) { require.True(t, orders.ErrUsingSerialNumber.Has(err)) require.Empty(t, bucketID) - deleted, err := ordersDB.DeleteExpiredSerials(ctx, time.Now()) + deleted, err := ordersDB.DeleteExpiredSerials(ctx, time.Now(), nil) require.NoError(t, err) require.Equal(t, deleted, 1) @@ -55,3 +55,30 @@ func TestSerialNumbers(t *testing.T) { require.EqualError(t, err, "serial number: serial number not found") }) } + +func TestDeleteExpiredWithOptions(t *testing.T) { + satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) { + ordersDB := db.Orders() + + err := ordersDB.CreateSerialInfo(ctx, storj.SerialNumber{1}, []byte("bucketID1"), time.Now()) + require.NoError(t, err) + + err = ordersDB.CreateSerialInfo(ctx, storj.SerialNumber{2}, []byte("bucketID2"), time.Now()) + require.NoError(t, err) + + options := &orders.SerialDeleteOptions{ + BatchSize: 1, + } + + deleted, err := ordersDB.DeleteExpiredSerials(ctx, time.Now(), options) + require.NoError(t, err) + require.Equal(t, 2, deleted) + + // check serial number has been deleted from serial_numbers and used_serials + _, err = ordersDB.UseSerialNumber(ctx, storj.SerialNumber{1}, storj.NodeID{1}) + require.EqualError(t, err, "serial number: serial number not found") + + _, err = ordersDB.UseSerialNumber(ctx, storj.SerialNumber{2}, storj.NodeID{1}) + require.EqualError(t, err, "serial number: serial number not found") + }) +} diff --git a/satellite/satellitedb/orders.go b/satellite/satellitedb/orders.go index 3cfb09291..d14647a61 100644 --- a/satellite/satellitedb/orders.go +++ b/satellite/satellitedb/orders.go @@ -58,10 +58,48 @@ func (db *ordersDB) CreateSerialInfo(ctx context.Context, serialNumber storj.Ser ) } -// DeleteExpiredSerials deletes all expired serials in serial_number and used_serials table. -func (db *ordersDB) DeleteExpiredSerials(ctx context.Context, now time.Time) (_ int, err error) { +// DeleteExpiredSerials deletes all expired serials in the serial_number table. +func (db *ordersDB) DeleteExpiredSerials(ctx context.Context, now time.Time, options *orders.SerialDeleteOptions) (_ int, err error) { defer mon.Task()(&ctx)(&err) + if db.db.implementation == dbutil.Cockroach && options != nil { + var deleted int + + for { + d, err := func() (_ int, err error) { + var r int + rs, err := db.db.Query(ctx, "DELETE FROM serial_numbers WHERE expires_at <= $1 ORDER BY expires_at DESC LIMIT $2 RETURNING expires_at;", now.UTC(), options.BatchSize) + if err != nil { + return 0, err + } + defer func() { err = errs.Combine(err, rs.Close()) }() + + for rs.Next() { + err = rs.Scan(&now) + if err != nil { + return r, err + } + r++ + } + if rs.Err() != nil { + return r, rs.Err() + } + + return r, nil + }() + deleted += d + if err != nil { + return deleted, err + } + + if d < options.BatchSize { + break + } + } + + return deleted, err + } + count, err := db.db.Delete_SerialNumber_By_ExpiresAt_LessOrEqual(ctx, dbx.SerialNumber_ExpiresAt(now.UTC())) if err != nil { return 0, err diff --git a/scripts/testdata/satellite-config.yaml.lock b/scripts/testdata/satellite-config.yaml.lock index b6eba20e4..b6bf87fc8 100755 --- a/scripts/testdata/satellite-config.yaml.lock +++ b/scripts/testdata/satellite-config.yaml.lock @@ -157,6 +157,9 @@ contact.external-address: "" # macaroon revocation cache expiration # database-options.revocations-cache.expiration: 5m0s +# number of records to delete per delete execution. 0 indicates no limit. applicable to cockroach DB only. +# db-cleanup.batch-size: 1000 + # how often to delete expired serial numbers # db-cleanup.serials-interval: 4h0m0s