satellite/orders: Flush all pending bandwidth rollup writes on shutdown

Currently we risk losing pending bandwidth rollup writes even on a clean
shutdown. This change ensures that all pending writes are actually
written to the db when shutting down the satellite.

Change-Id: Ideab62fa9808937d3dce9585c52405d8c8a0e703
Author: Isaac Hess, 2020-01-17 15:55:53 -07:00
parent 960e103082
commit 40a890639d
10 changed files with 125 additions and 59 deletions
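
The fix follows one pattern in every command diff below: build the RollupsWriteCache once, pass it into the peer constructor, and defer a CloseAndFlush that runs even after the parent context has been canceled. A minimal sketch of that wiring, where db, log, flushBatchSize, and runPeer are placeholders standing in for each command's real database handle, logger, config value, and peer setup:

package example

import (
	"context"

	"github.com/zeebo/errs"
	"go.uber.org/zap"

	"storj.io/storj/private/context2"
	"storj.io/storj/satellite/orders"
)

// runWithRollupsCache sketches the shutdown wiring added to each satellite
// command. db, log, and flushBatchSize are placeholders for the command's
// real handles; runPeer stands in for constructing and running the peer.
func runWithRollupsCache(ctx context.Context, log *zap.Logger, db orders.DB, flushBatchSize int,
	runPeer func(ctx context.Context, cache *orders.RollupsWriteCache) error) (err error) {
	rollupsWriteCache := orders.NewRollupsWriteCache(log.Named("orders-write-cache"), db, flushBatchSize)
	defer func() {
		// Flush with a context that ignores cancellation, so pending bandwidth
		// rollups still reach the database when ctx is already canceled.
		err = errs.Combine(err, rollupsWriteCache.CloseAndFlush(context2.WithoutCancellation(ctx)))
	}()
	return runPeer(ctx, rollupsWriteCache)
}

Because the deferred flush uses a cancellation-free context, and CloseAndFlush waits for in-flight background flushes before writing out the remainder, a clean shutdown no longer drops rollups that were still buffered in memory.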

@ -10,10 +10,12 @@ import (
"storj.io/storj/pkg/process"
"storj.io/storj/pkg/revocation"
"storj.io/storj/private/context2"
"storj.io/storj/private/version"
"storj.io/storj/satellite"
"storj.io/storj/satellite/accounting/live"
"storj.io/storj/satellite/metainfo"
"storj.io/storj/satellite/orders"
"storj.io/storj/satellite/satellitedb"
)
@ -60,7 +62,12 @@ func cmdAPIRun(cmd *cobra.Command, args []string) (err error) {
err = errs.Combine(err, accountingCache.Close())
}()
peer, err := satellite.NewAPI(log, identity, db, pointerDB, revocationDB, accountingCache, &runCfg.Config, version.Build)
rollupsWriteCache := orders.NewRollupsWriteCache(log.Named("orders-write-cache"), db.Orders(), runCfg.Orders.FlushBatchSize)
defer func() {
err = errs.Combine(err, rollupsWriteCache.CloseAndFlush(context2.WithoutCancellation(ctx)))
}()
peer, err := satellite.NewAPI(log, identity, db, pointerDB, revocationDB, accountingCache, rollupsWriteCache, &runCfg.Config, version.Build)
if err != nil {
return err
}

@ -23,10 +23,12 @@ import (
"storj.io/storj/pkg/cfgstruct"
"storj.io/storj/pkg/process"
"storj.io/storj/pkg/revocation"
"storj.io/storj/private/context2"
"storj.io/storj/private/version"
"storj.io/storj/satellite"
"storj.io/storj/satellite/accounting/live"
"storj.io/storj/satellite/metainfo"
"storj.io/storj/satellite/orders"
"storj.io/storj/satellite/satellitedb"
)
@ -222,7 +224,12 @@ func cmdRun(cmd *cobra.Command, args []string) (err error) {
err = errs.Combine(err, liveAccounting.Close())
}()
peer, err := satellite.New(log, identity, db, pointerDB, revocationDB, liveAccounting, version.Build, &runCfg.Config)
rollupsWriteCache := orders.NewRollupsWriteCache(log.Named("orders-write-cache"), db.Orders(), runCfg.Orders.FlushBatchSize)
defer func() {
err = errs.Combine(err, rollupsWriteCache.CloseAndFlush(context2.WithoutCancellation(ctx)))
}()
peer, err := satellite.New(log, identity, db, pointerDB, revocationDB, liveAccounting, rollupsWriteCache, version.Build, &runCfg.Config)
if err != nil {
return err
}

@ -10,9 +10,11 @@ import (
"storj.io/storj/pkg/process"
"storj.io/storj/pkg/revocation"
"storj.io/storj/private/context2"
"storj.io/storj/private/version"
"storj.io/storj/satellite"
"storj.io/storj/satellite/metainfo"
"storj.io/storj/satellite/orders"
"storj.io/storj/satellite/satellitedb"
)
@ -49,6 +51,11 @@ func cmdRepairerRun(cmd *cobra.Command, args []string) (err error) {
err = errs.Combine(err, revocationDB.Close())
}()
rollupsWriteCache := orders.NewRollupsWriteCache(log.Named("orders-write-cache"), db.Orders(), runCfg.Orders.FlushBatchSize)
defer func() {
err = errs.Combine(err, rollupsWriteCache.CloseAndFlush(context2.WithoutCancellation(ctx)))
}()
peer, err := satellite.NewRepairer(
log,
identity,
@ -58,6 +65,7 @@ func cmdRepairerRun(cmd *cobra.Command, args []string) (err error) {
db.Buckets(),
db.OverlayCache(),
db.Orders(),
rollupsWriteCache,
version.Build,
&runCfg.Config,
)

@ -439,10 +439,12 @@ func (planet *Planet) newSatellites(count int) ([]*SatelliteSystem, error) {
if err != nil {
return xs, errs.Wrap(err)
}
planet.databases = append(planet.databases, liveAccounting)
peer, err := satellite.New(log, identity, db, pointerDB, revocationDB, liveAccounting, versionInfo, &config)
rollupsWriteCache := orders.NewRollupsWriteCache(log.Named("orders-write-cache"), db.Orders(), config.Orders.FlushBatchSize)
planet.databases = append(planet.databases, rollupsWriteCacheCloser{rollupsWriteCache})
peer, err := satellite.New(log, identity, db, pointerDB, revocationDB, liveAccounting, rollupsWriteCache, versionInfo, &config)
if err != nil {
return xs, err
}
@ -558,7 +560,10 @@ func (planet *Planet) newAPI(count int, identity *identity.FullIdentity, db sate
}
planet.databases = append(planet.databases, liveAccounting)
return satellite.NewAPI(log, identity, db, pointerDB, revocationDB, liveAccounting, &config, versionInfo)
rollupsWriteCache := orders.NewRollupsWriteCache(log.Named("orders-write-cache"), db.Orders(), config.Orders.FlushBatchSize)
planet.databases = append(planet.databases, rollupsWriteCacheCloser{rollupsWriteCache})
return satellite.NewAPI(log, identity, db, pointerDB, revocationDB, liveAccounting, rollupsWriteCache, &config, versionInfo)
}
func (planet *Planet) newRepairer(count int, identity *identity.FullIdentity, db satellite.DB, pointerDB metainfo.PointerDB, config satellite.Config,
@ -571,5 +576,16 @@ func (planet *Planet) newRepairer(count int, identity *identity.FullIdentity, db
return nil, errs.Wrap(err)
}
return satellite.NewRepairer(log, identity, pointerDB, revocationDB, db.RepairQueue(), db.Buckets(), db.OverlayCache(), db.Orders(), versionInfo, &config)
rollupsWriteCache := orders.NewRollupsWriteCache(log.Named("orders-write-cache"), db.Orders(), config.Orders.FlushBatchSize)
planet.databases = append(planet.databases, rollupsWriteCacheCloser{rollupsWriteCache})
return satellite.NewRepairer(log, identity, pointerDB, revocationDB, db.RepairQueue(), db.Buckets(), db.OverlayCache(), db.Orders(), rollupsWriteCache, versionInfo, &config)
}
type rollupsWriteCacheCloser struct {
*orders.RollupsWriteCache
}
func (cache rollupsWriteCacheCloser) Close() error {
return cache.RollupsWriteCache.CloseAndFlush(context.TODO())
}

@ -147,7 +147,9 @@ type API struct {
}
// NewAPI creates a new satellite API process
func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB, pointerDB metainfo.PointerDB, revocationDB extensions.RevocationDB, liveAccounting accounting.Cache, config *Config, versionInfo version.Info) (*API, error) {
func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB,
pointerDB metainfo.PointerDB, revocationDB extensions.RevocationDB, liveAccounting accounting.Cache, rollupsWriteCache *orders.RollupsWriteCache,
config *Config, versionInfo version.Info) (*API, error) {
peer := &API{
Log: log,
Identity: full,
@ -237,9 +239,8 @@ func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB, pointerDB metai
}
{ // setup orders
ordersWriteCache := orders.NewRollupsWriteCache(log, peer.DB.Orders(), config.Orders.FlushBatchSize)
peer.Orders.DB = ordersWriteCache
peer.Orders.Chore = orders.NewChore(log.Named("orders chore"), ordersWriteCache, config.Orders)
peer.Orders.DB = rollupsWriteCache
peer.Orders.Chore = orders.NewChore(log.Named("orders chore"), rollupsWriteCache, config.Orders)
satelliteSignee := signing.SigneeFromPeerIdentity(peer.Identity.PeerIdentity())
peer.Orders.Endpoint = orders.NewEndpoint(
peer.Log.Named("orders:endpoint"),

@ -128,7 +128,10 @@ type Core struct {
}
// New creates a new satellite
func New(log *zap.Logger, full *identity.FullIdentity, db DB, pointerDB metainfo.PointerDB, revocationDB extensions.RevocationDB, liveAccounting accounting.Cache, versionInfo version.Info, config *Config) (*Core, error) {
func New(log *zap.Logger, full *identity.FullIdentity, db DB,
pointerDB metainfo.PointerDB, revocationDB extensions.RevocationDB, liveAccounting accounting.Cache,
rollupsWriteCache *orders.RollupsWriteCache,
versionInfo version.Info, config *Config) (*Core, error) {
peer := &Core{
Log: log,
Identity: full,
@ -194,9 +197,8 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, pointerDB metainfo
}
{ // setup orders
ordersWriteCache := orders.NewRollupsWriteCache(log, peer.DB.Orders(), config.Orders.FlushBatchSize)
peer.Orders.DB = ordersWriteCache
peer.Orders.Chore = orders.NewChore(log.Named("orders chore"), ordersWriteCache, config.Orders)
peer.Orders.DB = rollupsWriteCache
peer.Orders.Chore = orders.NewChore(log.Named("orders chore"), rollupsWriteCache, config.Orders)
peer.Orders.Service = orders.NewService(
peer.Log.Named("orders:service"),
signing.SignerFromFullIdentity(peer.Identity),
@ -454,6 +456,10 @@ func (peer *Core) Close() error {
// TODO: ensure that Close can be called on nil-s that way this code won't need the checks.
if peer.Orders.Chore != nil {
errlist.Add(peer.Orders.Chore.Close())
}
// close servers, to avoid new connections to closing subsystems
if peer.DowntimeTracking.EstimationChore != nil {
errlist.Add(peer.DowntimeTracking.EstimationChore.Close())
@ -509,9 +515,6 @@ func (peer *Core) Close() error {
if peer.Metainfo.Loop != nil {
errlist.Add(peer.Metainfo.Loop.Close())
}
if peer.Orders.Chore.Loop != nil {
errlist.Add(peer.Orders.Chore.Close())
}
return errlist.Err()
}

@ -16,15 +16,15 @@ import (
// architecture: Chore
type Chore struct {
log *zap.Logger
ordersWriteCache *RollupsWriteCache
rollupsWriteCache *RollupsWriteCache
Loop *sync2.Cycle
}
// NewChore creates new chore for flushing the orders write cache to the database.
func NewChore(log *zap.Logger, ordersWriteCache *RollupsWriteCache, config Config) *Chore {
func NewChore(log *zap.Logger, rollupsWriteCache *RollupsWriteCache, config Config) *Chore {
return &Chore{
log: log,
ordersWriteCache: ordersWriteCache,
rollupsWriteCache: rollupsWriteCache,
Loop: sync2.NewCycle(config.FlushInterval),
}
}
@ -33,7 +33,7 @@ func NewChore(log *zap.Logger, ordersWriteCache *RollupsWriteCache, config Confi
func (chore *Chore) Run(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
return chore.Loop.Run(ctx, func(ctx context.Context) error {
chore.ordersWriteCache.FlushToDB(ctx)
chore.rollupsWriteCache.Flush(ctx)
return nil
})
}

@ -36,12 +36,14 @@ type RollupData map[CacheKey]CacheData
type RollupsWriteCache struct {
DB
batchSize int
currentSize int
latestTime time.Time
wg sync.WaitGroup
log *zap.Logger
mu sync.Mutex
pendingRollups RollupData
currentSize int
latestTime time.Time
stopped bool
nextFlushCompletion *sync2.Fence
}
@ -59,14 +61,12 @@ func NewRollupsWriteCache(log *zap.Logger, db DB, batchSize int) *RollupsWriteCa
// UpdateBucketBandwidthAllocation updates the rollups cache adding allocated data for a bucket bandwidth rollup
func (cache *RollupsWriteCache) UpdateBucketBandwidthAllocation(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, amount int64, intervalStart time.Time) error {
cache.updateCacheValue(ctx, projectID, bucketName, action, amount, 0, intervalStart.UTC())
return nil
return cache.updateCacheValue(ctx, projectID, bucketName, action, amount, 0, intervalStart.UTC())
}
// UpdateBucketBandwidthInline updates the rollups cache adding inline data for a bucket bandwidth rollup
func (cache *RollupsWriteCache) UpdateBucketBandwidthInline(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, amount int64, intervalStart time.Time) error {
cache.updateCacheValue(ctx, projectID, bucketName, action, 0, amount, intervalStart.UTC())
return nil
return cache.updateCacheValue(ctx, projectID, bucketName, action, 0, amount, intervalStart.UTC())
}
// resetCache should only be called after you have acquired the cache lock. It
@ -82,17 +82,28 @@ func (cache *RollupsWriteCache) resetCache() (RollupData, time.Time, int) {
return pendingRollups, latestTime, oldSize
}
// FlushToDB resets cache then flushes the everything in the rollups write cache to the database
func (cache *RollupsWriteCache) FlushToDB(ctx context.Context) {
// Flush resets the cache, then flushes everything in the rollups write cache to the database
func (cache *RollupsWriteCache) Flush(ctx context.Context) {
defer mon.Task()(&ctx)(nil)
cache.mu.Lock()
defer cache.mu.Unlock()
pendingRollups, latestTime, oldSize := cache.resetCache()
go cache.flushToDB(ctx, pendingRollups, latestTime, oldSize)
cache.mu.Unlock()
cache.flush(ctx, pendingRollups, latestTime, oldSize)
}
// flushToDB flushes the everything in the rollups write cache to the database
func (cache *RollupsWriteCache) flushToDB(ctx context.Context, pendingRollups RollupData, latestTime time.Time, oldSize int) {
// CloseAndFlush flushes anything in the cache and marks the cache as stopped.
func (cache *RollupsWriteCache) CloseAndFlush(ctx context.Context) error {
cache.mu.Lock()
cache.stopped = true
cache.mu.Unlock()
cache.wg.Wait()
cache.Flush(ctx)
return nil
}
// flush flushes everything in the rollups write cache to the database
func (cache *RollupsWriteCache) flush(ctx context.Context, pendingRollups RollupData, latestTime time.Time, oldSize int) {
defer mon.Task()(&ctx)(nil)
rollups := make([]BucketBandwidthRollup, 0, oldSize)
@ -120,12 +131,16 @@ func (cache *RollupsWriteCache) flushToDB(ctx context.Context, pendingRollups Ro
completion.Release()
}
func (cache *RollupsWriteCache) updateCacheValue(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, allocated, inline int64, intervalStart time.Time) {
func (cache *RollupsWriteCache) updateCacheValue(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, allocated, inline int64, intervalStart time.Time) error {
defer mon.Task()(&ctx)(nil)
cache.mu.Lock()
defer cache.mu.Unlock()
if cache.stopped {
return Error.New("RollupsWriteCache is stopped")
}
if intervalStart.After(cache.latestTime) {
cache.latestTime = intervalStart
}
@ -145,13 +160,20 @@ func (cache *RollupsWriteCache) updateCacheValue(ctx context.Context, projectID
cache.pendingRollups[key] = data
if cache.currentSize < cache.batchSize {
return
return nil
}
pendingRollups, latestTime, oldSize := cache.resetCache()
go cache.flushToDB(ctx, pendingRollups, latestTime, oldSize)
cache.wg.Add(1)
go func() {
cache.flush(ctx, pendingRollups, latestTime, oldSize)
cache.wg.Done()
}()
return nil
}
// OnNextFlush waits until the next time a flushToDB call is made, then closes
// OnNextFlush waits until the next time a flush call is made, then closes
// the returned channel.
func (cache *RollupsWriteCache) OnNextFlush() <-chan struct{} {
cache.mu.Lock()

@ -36,23 +36,23 @@ func getTotalBandwidthInGB(ctx context.Context, accountingDB accounting.ProjectA
return total, nil
}
// TestOrdersWriteCacheBatchLimitReached makes sure bandwidth rollup values are not written to the
// TestRollupsWriteCacheBatchLimitReached makes sure bandwidth rollup values are not written to the
// db until the batch size is reached.
func TestOrdersWriteCacheBatchLimitReached(t *testing.T) {
func TestRollupsWriteCacheBatchLimitReached(t *testing.T) {
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
useBatchSize := 10
amount := (memory.MB * 500).Int64()
projectID := testrand.UUID()
startTime := time.Now().UTC()
owc := orders.NewRollupsWriteCache(zaptest.NewLogger(t), db.Orders(), useBatchSize)
rwc := orders.NewRollupsWriteCache(zaptest.NewLogger(t), db.Orders(), useBatchSize)
accountingDB := db.ProjectAccounting()
// use different bucketName for each write, so they don't get aggregated yet
for i := 0; i < useBatchSize-1; i++ {
bucketName := fmt.Sprintf("my_files_%d", i)
err := owc.UpdateBucketBandwidthAllocation(ctx, projectID, []byte(bucketName), pb.PieceAction_GET, amount, startTime)
err := rwc.UpdateBucketBandwidthAllocation(ctx, projectID, []byte(bucketName), pb.PieceAction_GET, amount, startTime)
require.NoError(t, err)
// check that nothing was actually written since it should just be stored
@ -61,9 +61,9 @@ func TestOrdersWriteCacheBatchLimitReached(t *testing.T) {
require.Equal(t, int64(0), total)
}
whenDone := owc.OnNextFlush()
whenDone := rwc.OnNextFlush()
// write one more rollup record to hit the threshold
err := owc.UpdateBucketBandwidthAllocation(ctx, projectID, []byte("my_files_last"), pb.PieceAction_GET, amount, startTime)
err := rwc.UpdateBucketBandwidthAllocation(ctx, projectID, []byte("my_files_last"), pb.PieceAction_GET, amount, startTime)
require.NoError(t, err)
// make sure flushing is done
@ -80,9 +80,9 @@ func TestOrdersWriteCacheBatchLimitReached(t *testing.T) {
})
}
// TestOrdersWriteCacheBatchChore makes sure bandwidth rollup values are not written to the
// TestRollupsWriteCacheBatchChore makes sure bandwidth rollup values are not written to the
// db until the chore flushes the DB (assuming the batch size is not reached).
func TestOrdersWriteCacheBatchChore(t *testing.T) {
func TestRollupsWriteCacheBatchChore(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1,
},
@ -109,10 +109,9 @@ func TestOrdersWriteCacheBatchChore(t *testing.T) {
require.Equal(t, int64(0), total)
}
owc := ordersDB.(*orders.RollupsWriteCache)
whenDone := owc.OnNextFlush()
rwc := ordersDB.(*orders.RollupsWriteCache)
whenDone := rwc.OnNextFlush()
// wait for Loop to complete
planet.Satellites[0].Orders.Chore.Loop.Restart()
planet.Satellites[0].Orders.Chore.Loop.TriggerWait()
// make sure flushing is done

@ -49,8 +49,12 @@ type Repairer struct {
}
// NewRepairer creates a new repairer peer.
func NewRepairer(log *zap.Logger, full *identity.FullIdentity, pointerDB metainfo.PointerDB, revocationDB extensions.RevocationDB, repairQueue queue.RepairQueue,
bucketsDB metainfo.BucketsDB, overlayCache overlay.DB, ordersDB orders.DB, versionInfo version.Info, config *Config) (*Repairer, error) {
func NewRepairer(log *zap.Logger, full *identity.FullIdentity,
pointerDB metainfo.PointerDB,
revocationDB extensions.RevocationDB, repairQueue queue.RepairQueue,
bucketsDB metainfo.BucketsDB, overlayCache overlay.DB, ordersDB orders.DB,
rollupsWriteCache *orders.RollupsWriteCache,
versionInfo version.Info, config *Config) (*Repairer, error) {
peer := &Repairer{
Log: log,
Identity: full,
@ -84,9 +88,8 @@ func NewRepairer(log *zap.Logger, full *identity.FullIdentity, pointerDB metainf
}
{ // setup orders
ordersWriteCache := orders.NewRollupsWriteCache(log, ordersDB, config.Orders.FlushBatchSize)
peer.Orders.DB = ordersWriteCache
peer.Orders.Chore = orders.NewChore(log.Named("orders chore"), ordersWriteCache, config.Orders)
peer.Orders.DB = rollupsWriteCache
peer.Orders.Chore = orders.NewChore(log.Named("orders chore"), rollupsWriteCache, config.Orders)
peer.Orders.Service = orders.NewService(
log.Named("orders"),
signing.SignerFromFullIdentity(peer.Identity),