satellite/satellitedb/orders: Handle serial_numbers deletes in smaller increments on CRDB
CockroachDB (CRDB) handles large deletes poorly. While testing in the POC environment, we found that deletes on the serial_numbers table could take hours. This change limits each delete to 1000 rows at a time (configurable via db-cleanup.batch-size) to avoid blocking other queries. Change-Id: I08455e25db1574579dd4d7b7125a08e9c913dff1
This commit is contained in:
parent
2a981b86d5
commit
2b92bba563
@ -553,6 +553,7 @@ func (planet *Planet) newSatellite(ctx context.Context, prefix string, index int
|
||||
},
|
||||
DBCleanup: dbcleanup.Config{
|
||||
SerialsInterval: defaultInterval,
|
||||
BatchSize: 1000,
|
||||
},
|
||||
Tally: tally.Config{
|
||||
Interval: defaultInterval,
|
||||
|
@ -25,6 +25,7 @@ var (
|
||||
// Config defines the configuration struct for the dbcleanup chore.
|
||||
type Config struct {
|
||||
// SerialsInterval is the period handed to sync2.NewCycle for the serials cleanup cycle.
SerialsInterval time.Duration `help:"how often to delete expired serial numbers" default:"4h"`
|
||||
// BatchSize is forwarded to orders.SerialDeleteOptions by NewChore when it is greater than zero;
// otherwise NewChore leaves the delete options nil.
BatchSize int `help:"number of records to delete per delete execution. 0 indicates no limit. applicable to cockroach DB only." default:"1000"`
|
||||
}
|
||||
|
||||
// Chore for deleting DB entries that are no longer needed.
|
||||
@ -35,15 +36,26 @@ type Chore struct {
|
||||
orders orders.DB
|
||||
|
||||
Serials *sync2.Cycle
|
||||
config Config
|
||||
options *orders.SerialDeleteOptions
|
||||
}
|
||||
|
||||
// NewChore creates new chore for deleting DB entries.
|
||||
func NewChore(log *zap.Logger, orders orders.DB, config Config) *Chore {
|
||||
func NewChore(log *zap.Logger, ordersDB orders.DB, config Config) *Chore {
|
||||
var options *orders.SerialDeleteOptions
|
||||
if config.BatchSize > 0 {
|
||||
options = &orders.SerialDeleteOptions{
|
||||
BatchSize: config.BatchSize,
|
||||
}
|
||||
}
|
||||
|
||||
return &Chore{
|
||||
log: log,
|
||||
orders: orders,
|
||||
orders: ordersDB,
|
||||
|
||||
Serials: sync2.NewCycle(config.SerialsInterval),
|
||||
config: config,
|
||||
options: options,
|
||||
}
|
||||
}
|
||||
|
||||
@ -59,7 +71,7 @@ func (chore *Chore) deleteExpiredSerials(ctx context.Context) (err error) {
|
||||
|
||||
now := time.Now()
|
||||
|
||||
deleted, err := chore.orders.DeleteExpiredSerials(ctx, now)
|
||||
deleted, err := chore.orders.DeleteExpiredSerials(ctx, now, chore.options)
|
||||
if err != nil {
|
||||
chore.log.Error("deleting expired serial numbers", zap.Error(err))
|
||||
} else {
|
||||
|
@ -39,7 +39,7 @@ type DB interface {
|
||||
// UnuseSerialNumber removes pair serial number -> storage node id from database
|
||||
UnuseSerialNumber(ctx context.Context, serialNumber storj.SerialNumber, storageNodeID storj.NodeID) error
|
||||
// DeleteExpiredSerials deletes all expired serials in the serial_numbers, used_serials, and consumed_serials tables.
|
||||
DeleteExpiredSerials(ctx context.Context, now time.Time) (_ int, err error)
|
||||
DeleteExpiredSerials(ctx context.Context, now time.Time, options *SerialDeleteOptions) (_ int, err error)
|
||||
// DeleteExpiredConsumedSerials deletes all expired serials in the consumed_serials table.
|
||||
DeleteExpiredConsumedSerials(ctx context.Context, now time.Time) (_ int, err error)
|
||||
// GetBucketIDFromSerialNumber returns the bucket ID associated with the serial number
|
||||
@ -72,6 +72,11 @@ type DB interface {
|
||||
WithQueue(ctx context.Context, cb func(ctx context.Context, queue Queue) error) error
|
||||
}
|
||||
|
||||
// SerialDeleteOptions are options used when deleting from the serial tables.
|
||||
type SerialDeleteOptions struct {
|
||||
// BatchSize caps how many rows a single batched delete statement removes.
// NOTE(review): the batched delete loop only stops once a batch returns fewer than
// BatchSize rows, so a non-positive value looks unsafe there; the dbcleanup chore
// only sets this field when the configured value is > 0 — confirm for other callers.
BatchSize int
|
||||
}
|
||||
|
||||
// Transaction represents a database transaction but with higher level actions.
|
||||
type Transaction interface {
|
||||
// UpdateBucketBandwidthBatch updates all the bandwidth rollups in the database
|
||||
|
@ -46,7 +46,7 @@ func TestSerialNumbers(t *testing.T) {
|
||||
require.True(t, orders.ErrUsingSerialNumber.Has(err))
|
||||
require.Empty(t, bucketID)
|
||||
|
||||
deleted, err := ordersDB.DeleteExpiredSerials(ctx, time.Now())
|
||||
deleted, err := ordersDB.DeleteExpiredSerials(ctx, time.Now(), nil)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, deleted, 1)
|
||||
|
||||
@ -55,3 +55,30 @@ func TestSerialNumbers(t *testing.T) {
|
||||
require.EqualError(t, err, "serial number: serial number not found")
|
||||
})
|
||||
}
|
||||
|
||||
// TestDeleteExpiredWithOptions verifies that DeleteExpiredSerials reports and removes every
// expired serial when the requested batch size (1) is smaller than the number of rows (2).
func TestDeleteExpiredWithOptions(t *testing.T) {
|
||||
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
|
||||
ordersDB := db.Orders()
|
||||
|
||||
// create two serials; presumably the last argument is the expiration time, so both
// are already expired by the time the delete below runs — confirm against CreateSerialInfo
err := ordersDB.CreateSerialInfo(ctx, storj.SerialNumber{1}, []byte("bucketID1"), time.Now())
|
||||
require.NoError(t, err)
|
||||
|
||||
err = ordersDB.CreateSerialInfo(ctx, storj.SerialNumber{2}, []byte("bucketID2"), time.Now())
|
||||
require.NoError(t, err)
|
||||
|
||||
// a batch size below the row count means the batched path needs more than one delete
options := &orders.SerialDeleteOptions{
|
||||
BatchSize: 1,
|
||||
}
|
||||
|
||||
deleted, err := ordersDB.DeleteExpiredSerials(ctx, time.Now(), options)
|
||||
require.NoError(t, err)
|
||||
|
||||
// the returned count must cover all batches, not just the last one
require.Equal(t, 2, deleted)
|
||||
|
||||
// both serial numbers must be gone: UseSerialNumber has to report them as not found
|
||||
_, err = ordersDB.UseSerialNumber(ctx, storj.SerialNumber{1}, storj.NodeID{1})
|
||||
require.EqualError(t, err, "serial number: serial number not found")
|
||||
|
||||
_, err = ordersDB.UseSerialNumber(ctx, storj.SerialNumber{2}, storj.NodeID{1})
|
||||
require.EqualError(t, err, "serial number: serial number not found")
|
||||
})
|
||||
}
|
||||
|
@ -58,10 +58,48 @@ func (db *ordersDB) CreateSerialInfo(ctx context.Context, serialNumber storj.Ser
|
||||
)
|
||||
}
|
||||
|
||||
// DeleteExpiredSerials deletes all expired serials in serial_number and used_serials table.
|
||||
func (db *ordersDB) DeleteExpiredSerials(ctx context.Context, now time.Time) (_ int, err error) {
|
||||
// DeleteExpiredSerials deletes all expired serials in the serial_numbers table.
// On CockroachDB, when options is non-nil, rows are deleted in batches of options.BatchSize.
|
||||
func (db *ordersDB) DeleteExpiredSerials(ctx context.Context, now time.Time, options *orders.SerialDeleteOptions) (_ int, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
if db.db.implementation == dbutil.Cockroach && options != nil {
|
||||
var deleted int
|
||||
|
||||
for {
|
||||
d, err := func() (_ int, err error) {
|
||||
var r int
|
||||
rs, err := db.db.Query(ctx, "DELETE FROM serial_numbers WHERE expires_at <= $1 ORDER BY expires_at DESC LIMIT $2 RETURNING expires_at;", now.UTC(), options.BatchSize)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
defer func() { err = errs.Combine(err, rs.Close()) }()
|
||||
|
||||
for rs.Next() {
|
||||
err = rs.Scan(&now)
|
||||
if err != nil {
|
||||
return r, err
|
||||
}
|
||||
r++
|
||||
}
|
||||
if rs.Err() != nil {
|
||||
return r, rs.Err()
|
||||
}
|
||||
|
||||
return r, nil
|
||||
}()
|
||||
deleted += d
|
||||
if err != nil {
|
||||
return deleted, err
|
||||
}
|
||||
|
||||
if d < options.BatchSize {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
return deleted, err
|
||||
}
|
||||
|
||||
count, err := db.db.Delete_SerialNumber_By_ExpiresAt_LessOrEqual(ctx, dbx.SerialNumber_ExpiresAt(now.UTC()))
|
||||
if err != nil {
|
||||
return 0, err
|
||||
|
3
scripts/testdata/satellite-config.yaml.lock
vendored
3
scripts/testdata/satellite-config.yaml.lock
vendored
@ -157,6 +157,9 @@ contact.external-address: ""
|
||||
# macaroon revocation cache expiration
|
||||
# database-options.revocations-cache.expiration: 5m0s
|
||||
|
||||
# number of records to delete per delete execution. 0 indicates no limit. applicable to cockroach DB only.
|
||||
# db-cleanup.batch-size: 1000
|
||||
|
||||
# how often to delete expired serial numbers
|
||||
# db-cleanup.serials-interval: 4h0m0s
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user