satellite/dbcleanup: delete expired serials from satellite (#2867)

Creates a new chore, dbcleanup, which can be used for routine deletion of items from the satellite database and adds functionality for deletion of expired serial numbers
This commit is contained in:
Cameron 2019-08-27 13:12:38 -04:00 committed by GitHub
parent c309bd3fec
commit 599324c364
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 193 additions and 1 deletions

View File

@ -24,6 +24,7 @@ import (
"storj.io/storj/satellite/audit"
"storj.io/storj/satellite/console"
"storj.io/storj/satellite/console/consoleweb"
"storj.io/storj/satellite/dbcleanup"
"storj.io/storj/satellite/discovery"
"storj.io/storj/satellite/gc"
"storj.io/storj/satellite/mailservice"
@ -174,6 +175,9 @@ func (planet *Planet) newSatellites(count int) ([]*satellite.Peer, error) {
FalsePositiveRate: 0.1,
ConcurrentSends: 1,
},
DBCleanup: dbcleanup.Config{
SerialsInterval: time.Hour,
},
Tally: tally.Config{
Interval: 30 * time.Second,
},

View File

@ -0,0 +1,72 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package dbcleanup
import (
"context"
"time"
"github.com/zeebo/errs"
"go.uber.org/zap"
"gopkg.in/spacemonkeygo/monkit.v2"
"storj.io/storj/internal/sync2"
"storj.io/storj/satellite/orders"
)
var (
// Error the default dbcleanup errs class.
Error = errs.Class("dbcleanup error")
mon = monkit.Package()
)
// Config defines configuration struct for dbcleanup chore.
type Config struct {
SerialsInterval time.Duration `help:"how often to delete expired serial numbers" default:"24h"`
}
// Chore for deleting DB entries that are no longer needed.
type Chore struct {
log *zap.Logger
orders orders.DB
Serials sync2.Cycle
}
// NewChore creates new chore for deleting DB entries.
func NewChore(log *zap.Logger, orders orders.DB, config Config) *Chore {
return &Chore{
log: log,
orders: orders,
Serials: *sync2.NewCycle(config.SerialsInterval),
}
}
// Run starts the db cleanup chore.
func (chore *Chore) Run(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
return chore.Serials.Run(ctx, chore.deleteExpiredSerials)
}
func (chore *Chore) deleteExpiredSerials(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
chore.log.Debug("deleting expired serial numbers")
deleted, err := chore.orders.DeleteExpiredSerials(ctx, time.Now().UTC())
if err != nil {
chore.log.Error("deleting expired serial numbers", zap.Error(err))
return nil
}
chore.log.Debug("expired serials deleted", zap.Int("items deleted", deleted))
return nil
}
// Close stops the dbcleanup chore.
func (chore *Chore) Close() error {
chore.Serials.Close()
return nil
}

View File

@ -0,0 +1,68 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package dbcleanup_test
import (
"testing"
"time"
"github.com/stretchr/testify/require"
"storj.io/storj/internal/testcontext"
"storj.io/storj/internal/testplanet"
"storj.io/storj/pkg/storj"
)
func TestDeleteExpiredSerials(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 0,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
node := planet.StorageNodes[0].ID()
satellite.DBCleanup.Chore.Serials.Pause()
var expiredSerials []storj.SerialNumber
for i := 0; i < 5; i++ {
expiredSerials = append(expiredSerials, storj.SerialNumber{byte(i)})
}
var freshSerials []storj.SerialNumber
for i := 5; i < 10; i++ {
freshSerials = append(freshSerials, storj.SerialNumber{byte(i)})
}
yesterday := time.Now().UTC().Add(-24 * time.Hour)
for _, serial := range expiredSerials {
err := satellite.DB.Orders().CreateSerialInfo(ctx, serial, []byte("bucket"), yesterday)
require.NoError(t, err)
_, err = satellite.DB.Orders().UseSerialNumber(ctx, serial, node)
require.NoError(t, err)
}
tomorrow := yesterday.Add(48 * time.Hour)
for _, serial := range freshSerials {
err := satellite.DB.Orders().CreateSerialInfo(ctx, serial, []byte("bucket"), tomorrow)
require.NoError(t, err)
_, err = satellite.DB.Orders().UseSerialNumber(ctx, serial, node)
require.NoError(t, err)
}
// trigger expired serial number deletion
satellite.DBCleanup.Chore.Serials.TriggerWait()
// check expired serial numbers have been deleted from serial_numbers and used_serials
for _, serial := range expiredSerials {
_, err := satellite.DB.Orders().UseSerialNumber(ctx, serial, node)
require.EqualError(t, err, "serial number: serial number not found")
}
// check fresh serial numbers have not been deleted
for _, serial := range freshSerials {
_, err := satellite.DB.Orders().UseSerialNumber(ctx, serial, node)
require.EqualError(t, err, "serial number: serial number already used")
}
})
}

View File

@ -29,6 +29,8 @@ type DB interface {
UseSerialNumber(ctx context.Context, serialNumber storj.SerialNumber, storageNodeID storj.NodeID) ([]byte, error)
// UnuseSerialNumber removes pair serial number -> storage node id from database
UnuseSerialNumber(ctx context.Context, serialNumber storj.SerialNumber, storageNodeID storj.NodeID) error
// DeleteExpiredSerials deletes all expired serials in serial_number and used_serials table.
DeleteExpiredSerials(ctx context.Context, now time.Time) (_ int, err error)
// UpdateBucketBandwidthAllocation updates 'allocated' bandwidth for given bucket
UpdateBucketBandwidthAllocation(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, amount int64, intervalStart time.Time) error

View File

@ -48,5 +48,13 @@ func TestSerialNumbers(t *testing.T) {
require.Error(t, err)
require.True(t, orders.ErrUsingSerialNumber.Has(err))
require.Empty(t, bucketID)
deleted, err := ordersDB.DeleteExpiredSerials(ctx, time.Now().UTC())
require.NoError(t, err)
require.Equal(t, deleted, 1)
// check serial number has been deleted from serial_numbers and used_serials
_, err = ordersDB.UseSerialNumber(ctx, storj.SerialNumber{1}, storj.NodeID{1})
require.EqualError(t, err, "serial number: serial number not found")
})
}

View File

@ -40,6 +40,7 @@ import (
"storj.io/storj/satellite/console"
"storj.io/storj/satellite/console/consoleauth"
"storj.io/storj/satellite/console/consoleweb"
"storj.io/storj/satellite/dbcleanup"
"storj.io/storj/satellite/discovery"
"storj.io/storj/satellite/gc"
"storj.io/storj/satellite/inspector"
@ -121,6 +122,8 @@ type Config struct {
GarbageCollection gc.Config
DBCleanup dbcleanup.Config
Tally tally.Config
Rollup rollup.Config
LiveAccounting live.Config
@ -197,6 +200,10 @@ type Peer struct {
Service *gc.Service
}
DBCleanup struct {
Chore *dbcleanup.Chore
}
Accounting struct {
Tally *tally.Service
Rollup *rollup.Service
@ -503,6 +510,11 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten
)
}
{ // setup db cleanup
log.Debug("Setting up db cleanup")
peer.DBCleanup.Chore = dbcleanup.NewChore(peer.Log.Named("dbcleanup"), peer.DB.Orders(), config.DBCleanup)
}
{ // setup accounting
log.Debug("Setting up accounting")
peer.Accounting.Tally = tally.New(peer.Log.Named("tally"), peer.DB.StoragenodeAccounting(), peer.DB.ProjectAccounting(), peer.LiveAccounting.Service, peer.Metainfo.Service, peer.Overlay.Service, 0, config.Tally.Interval)
@ -721,6 +733,9 @@ func (peer *Peer) Run(ctx context.Context) (err error) {
group.Go(func() error {
return errs2.IgnoreCanceled(peer.Marketing.Endpoint.Run(ctx))
})
group.Go(func() error {
return errs2.IgnoreCanceled(peer.DBCleanup.Chore.Run(ctx))
})
return group.Wait()
}
@ -753,6 +768,9 @@ func (peer *Peer) Close() error {
}
// close services in reverse initialization order
if peer.DBCleanup.Chore != nil {
errlist.Add(peer.DBCleanup.Chore.Close())
}
if peer.Repair.Repairer != nil {
errlist.Add(peer.Repair.Repairer.Close())
}

View File

@ -760,6 +760,13 @@ func (m *lockedOrders) CreateSerialInfo(ctx context.Context, serialNumber storj.
return m.db.CreateSerialInfo(ctx, serialNumber, bucketID, limitExpiration)
}
// DeleteExpiredSerials deletes all expired serials in serial_number and used_serials table.
func (m *lockedOrders) DeleteExpiredSerials(ctx context.Context, now time.Time) (_ int, err error) {
m.Lock()
defer m.Unlock()
return m.db.DeleteExpiredSerials(ctx, now)
}
// GetBucketBandwidth gets total bucket bandwidth from period of time
func (m *lockedOrders) GetBucketBandwidth(ctx context.Context, projectID uuid.UUID, bucketName []byte, from time.Time, to time.Time) (int64, error) {
m.Lock()
@ -969,7 +976,7 @@ type lockedPeerIdentities struct {
}
// BatchGet gets all nodes peer identities in a transaction
func (m *lockedPeerIdentities) BatchGet(ctx context.Context, a1 storj.NodeIDList) (_ []*identity.PeerIdentity, err error) {
func (m *lockedPeerIdentities) BatchGet(ctx context.Context, a1 storj.NodeIDList) ([]*identity.PeerIdentity, error) {
m.Lock()
defer m.Unlock()
return m.db.BatchGet(ctx, a1)

View File

@ -43,6 +43,16 @@ func (db *ordersDB) CreateSerialInfo(ctx context.Context, serialNumber storj.Ser
return err
}
// DeleteExpiredSerials deletes all expired serials in serial_number and used_serials table.
func (db *ordersDB) DeleteExpiredSerials(ctx context.Context, now time.Time) (_ int, err error) {
defer mon.Task()(&ctx)(&err)
count, err := db.db.Delete_SerialNumber_By_ExpiresAt_LessOrEqual(ctx, dbx.SerialNumber_ExpiresAt(now))
if err != nil {
return 0, err
}
return int(count), nil
}
// UseSerialNumber creates serial number entry in database
func (db *ordersDB) UseSerialNumber(ctx context.Context, serialNumber storj.SerialNumber, storageNodeID storj.NodeID) (_ []byte, err error) {
defer mon.Task()(&ctx)(&err)

View File

@ -46,6 +46,9 @@
# satellite database connection string
# database: postgres://
# how often to delete expired serial numbers
# db-cleanup.serials-interval: 24h0m0s
# Maximum Database Connection Lifetime, -1ns means the stdlib default
# db.conn_max_lifetime: -1ns