diff --git a/cmd/satellite/api.go b/cmd/satellite/api.go
index 5533008f1..4c9d36111 100644
--- a/cmd/satellite/api.go
+++ b/cmd/satellite/api.go
@@ -10,10 +10,12 @@ import (
 
 	"storj.io/storj/pkg/process"
 	"storj.io/storj/pkg/revocation"
+	"storj.io/storj/private/context2"
 	"storj.io/storj/private/version"
 	"storj.io/storj/satellite"
 	"storj.io/storj/satellite/accounting/live"
 	"storj.io/storj/satellite/metainfo"
+	"storj.io/storj/satellite/orders"
 	"storj.io/storj/satellite/satellitedb"
 )
 
@@ -60,7 +62,12 @@ func cmdAPIRun(cmd *cobra.Command, args []string) (err error) {
 		err = errs.Combine(err, accountingCache.Close())
 	}()
 
-	peer, err := satellite.NewAPI(log, identity, db, pointerDB, revocationDB, accountingCache, &runCfg.Config, version.Build)
+	rollupsWriteCache := orders.NewRollupsWriteCache(log.Named("orders-write-cache"), db.Orders(), runCfg.Orders.FlushBatchSize)
+	defer func() {
+		err = errs.Combine(err, rollupsWriteCache.CloseAndFlush(context2.WithoutCancellation(ctx)))
+	}()
+
+	peer, err := satellite.NewAPI(log, identity, db, pointerDB, revocationDB, accountingCache, rollupsWriteCache, &runCfg.Config, version.Build)
 	if err != nil {
 		return err
 	}
diff --git a/cmd/satellite/main.go b/cmd/satellite/main.go
index acd2269b6..8a70e3a5a 100644
--- a/cmd/satellite/main.go
+++ b/cmd/satellite/main.go
@@ -23,10 +23,12 @@ import (
 
 	"storj.io/storj/pkg/cfgstruct"
 	"storj.io/storj/pkg/process"
 	"storj.io/storj/pkg/revocation"
+	"storj.io/storj/private/context2"
 	"storj.io/storj/private/version"
 	"storj.io/storj/satellite"
 	"storj.io/storj/satellite/accounting/live"
 	"storj.io/storj/satellite/metainfo"
+	"storj.io/storj/satellite/orders"
 	"storj.io/storj/satellite/satellitedb"
 )
@@ -222,7 +224,12 @@ func cmdRun(cmd *cobra.Command, args []string) (err error) {
 		err = errs.Combine(err, liveAccounting.Close())
 	}()
 
-	peer, err := satellite.New(log, identity, db, pointerDB, revocationDB, liveAccounting, version.Build, &runCfg.Config)
+	rollupsWriteCache := orders.NewRollupsWriteCache(log.Named("orders-write-cache"), db.Orders(), runCfg.Orders.FlushBatchSize)
+	defer func() {
+		err = errs.Combine(err, rollupsWriteCache.CloseAndFlush(context2.WithoutCancellation(ctx)))
+	}()
+
+	peer, err := satellite.New(log, identity, db, pointerDB, revocationDB, liveAccounting, rollupsWriteCache, version.Build, &runCfg.Config)
 	if err != nil {
 		return err
 	}
diff --git a/cmd/satellite/repairer.go b/cmd/satellite/repairer.go
index b4524643b..b80e273ba 100644
--- a/cmd/satellite/repairer.go
+++ b/cmd/satellite/repairer.go
@@ -10,9 +10,11 @@ import (
 
 	"storj.io/storj/pkg/process"
 	"storj.io/storj/pkg/revocation"
+	"storj.io/storj/private/context2"
 	"storj.io/storj/private/version"
 	"storj.io/storj/satellite"
 	"storj.io/storj/satellite/metainfo"
+	"storj.io/storj/satellite/orders"
 	"storj.io/storj/satellite/satellitedb"
 )
 
@@ -49,6 +51,11 @@ func cmdRepairerRun(cmd *cobra.Command, args []string) (err error) {
 		err = errs.Combine(err, revocationDB.Close())
 	}()
 
+	rollupsWriteCache := orders.NewRollupsWriteCache(log.Named("orders-write-cache"), db.Orders(), runCfg.Orders.FlushBatchSize)
+	defer func() {
+		err = errs.Combine(err, rollupsWriteCache.CloseAndFlush(context2.WithoutCancellation(ctx)))
+	}()
+
 	peer, err := satellite.NewRepairer(
 		log,
 		identity,
@@ -58,6 +65,7 @@
 		db.Buckets(),
 		db.OverlayCache(),
 		db.Orders(),
+		rollupsWriteCache,
 		version.Build,
 		&runCfg.Config,
 	)
diff --git a/private/testplanet/satellite.go b/private/testplanet/satellite.go
index b2a523a8b..42bb2b9fb 100644
--- a/private/testplanet/satellite.go
+++ b/private/testplanet/satellite.go
@@ -439,10 +439,12 @@ func (planet *Planet) newSatellites(count int) ([]*SatelliteSystem, error) {
 		if err != nil {
 			return xs, errs.Wrap(err)
 		}
-
 		planet.databases = append(planet.databases, liveAccounting)
 
-		peer, err := satellite.New(log, identity, db, pointerDB, revocationDB, liveAccounting, versionInfo, &config)
+		rollupsWriteCache := orders.NewRollupsWriteCache(log.Named("orders-write-cache"), db.Orders(), config.Orders.FlushBatchSize)
+		planet.databases = append(planet.databases, rollupsWriteCacheCloser{rollupsWriteCache})
+
+		peer, err := satellite.New(log, identity, db, pointerDB, revocationDB, liveAccounting, rollupsWriteCache, versionInfo, &config)
 		if err != nil {
 			return xs, err
 		}
@@ -558,7 +560,10 @@ func (planet *Planet) newAPI(count int, identity *identity.FullIdentity, db sate
 	}
 	planet.databases = append(planet.databases, liveAccounting)
 
-	return satellite.NewAPI(log, identity, db, pointerDB, revocationDB, liveAccounting, &config, versionInfo)
+	rollupsWriteCache := orders.NewRollupsWriteCache(log.Named("orders-write-cache"), db.Orders(), config.Orders.FlushBatchSize)
+	planet.databases = append(planet.databases, rollupsWriteCacheCloser{rollupsWriteCache})
+
+	return satellite.NewAPI(log, identity, db, pointerDB, revocationDB, liveAccounting, rollupsWriteCache, &config, versionInfo)
 }
 
 func (planet *Planet) newRepairer(count int, identity *identity.FullIdentity, db satellite.DB, pointerDB metainfo.PointerDB, config satellite.Config,
@@ -571,5 +576,16 @@
 	if err != nil {
 		return nil, errs.Wrap(err)
 	}
-	return satellite.NewRepairer(log, identity, pointerDB, revocationDB, db.RepairQueue(), db.Buckets(), db.OverlayCache(), db.Orders(), versionInfo, &config)
+	rollupsWriteCache := orders.NewRollupsWriteCache(log.Named("orders-write-cache"), db.Orders(), config.Orders.FlushBatchSize)
+	planet.databases = append(planet.databases, rollupsWriteCacheCloser{rollupsWriteCache})
+
+	return satellite.NewRepairer(log, identity, pointerDB, revocationDB, db.RepairQueue(), db.Buckets(), db.OverlayCache(), db.Orders(), rollupsWriteCache, versionInfo, &config)
+}
+
+type rollupsWriteCacheCloser struct {
+	*orders.RollupsWriteCache
+}
+
+func (cache rollupsWriteCacheCloser) Close() error {
+	return cache.RollupsWriteCache.CloseAndFlush(context.TODO())
 }
diff --git a/satellite/api.go b/satellite/api.go
index dc104e4b0..1b4762172 100644
--- a/satellite/api.go
+++ b/satellite/api.go
@@ -147,7 +147,9 @@ type API struct {
 }
 
 // NewAPI creates a new satellite API process
-func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB, pointerDB metainfo.PointerDB, revocationDB extensions.RevocationDB, liveAccounting accounting.Cache, config *Config, versionInfo version.Info) (*API, error) {
+func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB,
+	pointerDB metainfo.PointerDB, revocationDB extensions.RevocationDB, liveAccounting accounting.Cache, rollupsWriteCache *orders.RollupsWriteCache,
+	config *Config, versionInfo version.Info) (*API, error) {
 	peer := &API{
 		Log:      log,
 		Identity: full,
@@ -237,9 +239,8 @@ func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB, pointerDB metai
 	}
 
 	{ // setup orders
-		ordersWriteCache := orders.NewRollupsWriteCache(log, peer.DB.Orders(), config.Orders.FlushBatchSize)
-		peer.Orders.DB = ordersWriteCache
-		peer.Orders.Chore = orders.NewChore(log.Named("orders chore"), ordersWriteCache, config.Orders)
+		peer.Orders.DB = rollupsWriteCache
+		peer.Orders.Chore = orders.NewChore(log.Named("orders chore"), rollupsWriteCache, config.Orders)
 		satelliteSignee := signing.SigneeFromPeerIdentity(peer.Identity.PeerIdentity())
 		peer.Orders.Endpoint = orders.NewEndpoint(
 			peer.Log.Named("orders:endpoint"),
diff --git a/satellite/core.go b/satellite/core.go
index 72c38c32d..7fcf23bb5 100644
--- a/satellite/core.go
+++ b/satellite/core.go
@@ -128,7 +128,10 @@ type Core struct {
 }
 
 // New creates a new satellite
-func New(log *zap.Logger, full *identity.FullIdentity, db DB, pointerDB metainfo.PointerDB, revocationDB extensions.RevocationDB, liveAccounting accounting.Cache, versionInfo version.Info, config *Config) (*Core, error) {
+func New(log *zap.Logger, full *identity.FullIdentity, db DB,
+	pointerDB metainfo.PointerDB, revocationDB extensions.RevocationDB, liveAccounting accounting.Cache,
+	rollupsWriteCache *orders.RollupsWriteCache,
+	versionInfo version.Info, config *Config) (*Core, error) {
 	peer := &Core{
 		Log:      log,
 		Identity: full,
@@ -194,9 +197,8 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, pointerDB metainfo
 	}
 
 	{ // setup orders
-		ordersWriteCache := orders.NewRollupsWriteCache(log, peer.DB.Orders(), config.Orders.FlushBatchSize)
-		peer.Orders.DB = ordersWriteCache
-		peer.Orders.Chore = orders.NewChore(log.Named("orders chore"), ordersWriteCache, config.Orders)
+		peer.Orders.DB = rollupsWriteCache
+		peer.Orders.Chore = orders.NewChore(log.Named("orders chore"), rollupsWriteCache, config.Orders)
 		peer.Orders.Service = orders.NewService(
 			peer.Log.Named("orders:service"),
 			signing.SignerFromFullIdentity(peer.Identity),
@@ -454,6 +456,10 @@ func (peer *Core) Close() error {
 
 	// TODO: ensure that Close can be called on nil-s that way this code won't need the checks.
 
+	if peer.Orders.Chore != nil {
+		errlist.Add(peer.Orders.Chore.Close())
+	}
+
 	// close servers, to avoid new connections to closing subsystems
 	if peer.DowntimeTracking.EstimationChore != nil {
 		errlist.Add(peer.DowntimeTracking.EstimationChore.Close())
@@ -509,9 +515,6 @@ func (peer *Core) Close() error {
 	if peer.Metainfo.Loop != nil {
 		errlist.Add(peer.Metainfo.Loop.Close())
 	}
-	if peer.Orders.Chore.Loop != nil {
-		errlist.Add(peer.Orders.Chore.Close())
-	}
 
 	return errlist.Err()
 }
diff --git a/satellite/orders/chore.go b/satellite/orders/rollups_chore.go
similarity index 66%
rename from satellite/orders/chore.go
rename to satellite/orders/rollups_chore.go
index 12a665a6a..921dbe428 100644
--- a/satellite/orders/chore.go
+++ b/satellite/orders/rollups_chore.go
@@ -15,17 +15,17 @@ import (
 //
 // architecture: Chore
 type Chore struct {
-	log              *zap.Logger
-	ordersWriteCache *RollupsWriteCache
-	Loop             *sync2.Cycle
+	log               *zap.Logger
+	rollupsWriteCache *RollupsWriteCache
+	Loop              *sync2.Cycle
 }
 
 // NewChore creates new chore for flushing the orders write cache to the database.
-func NewChore(log *zap.Logger, ordersWriteCache *RollupsWriteCache, config Config) *Chore {
+func NewChore(log *zap.Logger, rollupsWriteCache *RollupsWriteCache, config Config) *Chore {
 	return &Chore{
-		log:              log,
-		ordersWriteCache: ordersWriteCache,
-		Loop:             sync2.NewCycle(config.FlushInterval),
+		log:               log,
+		rollupsWriteCache: rollupsWriteCache,
+		Loop:              sync2.NewCycle(config.FlushInterval),
 	}
 }
 
@@ -33,7 +33,7 @@ func NewChore(log *zap.Logger, ordersWriteCache *RollupsWriteCache, config Confi
 func (chore *Chore) Run(ctx context.Context) (err error) {
 	defer mon.Task()(&ctx)(&err)
 	return chore.Loop.Run(ctx, func(ctx context.Context) error {
-		chore.ordersWriteCache.FlushToDB(ctx)
+		chore.rollupsWriteCache.Flush(ctx)
 		return nil
 	})
 }
diff --git a/satellite/orders/rollups_write_cache.go b/satellite/orders/rollups_write_cache.go
index 2383cd74f..8a86861f5 100644
--- a/satellite/orders/rollups_write_cache.go
+++ b/satellite/orders/rollups_write_cache.go
@@ -35,13 +35,15 @@ type RollupData map[CacheKey]CacheData
 // RollupsWriteCache stores information needed to update bucket bandwidth rollups
 type RollupsWriteCache struct {
 	DB
-	batchSize   int
-	currentSize int
-	latestTime  time.Time
+	batchSize int
+	wg        sync.WaitGroup
+	log       *zap.Logger
 
-	log *zap.Logger
 	mu             sync.Mutex
 	pendingRollups RollupData
+	currentSize    int
+	latestTime     time.Time
+	stopped        bool
 
 	nextFlushCompletion *sync2.Fence
 }
@@ -59,14 +61,12 @@ func NewRollupsWriteCache(log *zap.Logger, db DB, batchSize int) *RollupsWriteCa
 
 // UpdateBucketBandwidthAllocation updates the rollups cache adding allocated data for a bucket bandwidth rollup
 func (cache *RollupsWriteCache) UpdateBucketBandwidthAllocation(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, amount int64, intervalStart time.Time) error {
-	cache.updateCacheValue(ctx, projectID, bucketName, action, amount, 0, intervalStart.UTC())
-	return nil
+	return cache.updateCacheValue(ctx, projectID, bucketName, action, amount, 0, intervalStart.UTC())
}
 
 // UpdateBucketBandwidthInline updates the rollups cache adding inline data for a bucket bandwidth rollup
 func (cache *RollupsWriteCache) UpdateBucketBandwidthInline(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, amount int64, intervalStart time.Time) error {
-	cache.updateCacheValue(ctx, projectID, bucketName, action, 0, amount, intervalStart.UTC())
-	return nil
+	return cache.updateCacheValue(ctx, projectID, bucketName, action, 0, amount, intervalStart.UTC())
 }
 
 // resetCache should only be called after you have acquired the cache lock. It
@@ -82,17 +82,28 @@ func (cache *RollupsWriteCache) resetCache() (RollupData, time.Time, int) {
 	return pendingRollups, latestTime, oldSize
 }
 
-// FlushToDB resets cache then flushes the everything in the rollups write cache to the database
-func (cache *RollupsWriteCache) FlushToDB(ctx context.Context) {
+// Flush resets cache then flushes the everything in the rollups write cache to the database
+func (cache *RollupsWriteCache) Flush(ctx context.Context) {
 	defer mon.Task()(&ctx)(nil)
 
 	cache.mu.Lock()
-	defer cache.mu.Unlock()
 	pendingRollups, latestTime, oldSize := cache.resetCache()
-	go cache.flushToDB(ctx, pendingRollups, latestTime, oldSize)
+	cache.mu.Unlock()
+	cache.flush(ctx, pendingRollups, latestTime, oldSize)
 }
 
-// flushToDB flushes the everything in the rollups write cache to the database
-func (cache *RollupsWriteCache) flushToDB(ctx context.Context, pendingRollups RollupData, latestTime time.Time, oldSize int) {
+// CloseAndFlush flushes anything in the cache and marks the cache as stopped.
+func (cache *RollupsWriteCache) CloseAndFlush(ctx context.Context) error {
+	cache.mu.Lock()
+	cache.stopped = true
+	cache.mu.Unlock()
+	cache.wg.Wait()
+
+	cache.Flush(ctx)
+	return nil
+}
+
+// flush flushes the everything in the rollups write cache to the database
+func (cache *RollupsWriteCache) flush(ctx context.Context, pendingRollups RollupData, latestTime time.Time, oldSize int) {
 	defer mon.Task()(&ctx)(nil)
 	rollups := make([]BucketBandwidthRollup, 0, oldSize)
@@ -120,12 +131,16 @@ func (cache *RollupsWriteCache) flushToDB(ctx context.Context, pendingRollups Ro
 	completion.Release()
 }
 
-func (cache *RollupsWriteCache) updateCacheValue(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, allocated, inline int64, intervalStart time.Time) {
+func (cache *RollupsWriteCache) updateCacheValue(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, allocated, inline int64, intervalStart time.Time) error {
 	defer mon.Task()(&ctx)(nil)
 
 	cache.mu.Lock()
 	defer cache.mu.Unlock()
 
+	if cache.stopped {
+		return Error.New("RollupsWriteCache is stopped")
+	}
+
 	if intervalStart.After(cache.latestTime) {
 		cache.latestTime = intervalStart
 	}
@@ -145,13 +160,20 @@ func (cache *RollupsWriteCache) updateCacheValue(ctx context.Context, projectID
 	cache.pendingRollups[key] = data
 
 	if cache.currentSize < cache.batchSize {
-		return
+		return nil
 	}
 	pendingRollups, latestTime, oldSize := cache.resetCache()
-	go cache.flushToDB(ctx, pendingRollups, latestTime, oldSize)
+
+	cache.wg.Add(1)
+	go func() {
+		cache.flush(ctx, pendingRollups, latestTime, oldSize)
+		cache.wg.Done()
+	}()
+
+	return nil
 }
 
-// OnNextFlush waits until the next time a flushToDB call is made, then closes
+// OnNextFlush waits until the next time a flush call is made, then closes
 // the returned channel.
 func (cache *RollupsWriteCache) OnNextFlush() <-chan struct{} {
 	cache.mu.Lock()
diff --git a/satellite/orders/rollups_write_cache_test.go b/satellite/orders/rollups_write_cache_test.go
index e9ad2b761..21c23faa7 100644
--- a/satellite/orders/rollups_write_cache_test.go
+++ b/satellite/orders/rollups_write_cache_test.go
@@ -36,23 +36,23 @@ func getTotalBandwidthInGB(ctx context.Context, accountingDB accounting.ProjectA
 	return total, nil
 }
 
-// TestOrdersWriteCacheBatchLimitReached makes sure bandwidth rollup values are not written to the
+// TestRollupsWriteCacheBatchLimitReached makes sure bandwidth rollup values are not written to the
 // db until the batch size is reached.
-func TestOrdersWriteCacheBatchLimitReached(t *testing.T) {
+func TestRollupsWriteCacheBatchLimitReached(t *testing.T) {
 	satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
 		useBatchSize := 10
 		amount := (memory.MB * 500).Int64()
 		projectID := testrand.UUID()
 		startTime := time.Now().UTC()
 
-		owc := orders.NewRollupsWriteCache(zaptest.NewLogger(t), db.Orders(), useBatchSize)
+		rwc := orders.NewRollupsWriteCache(zaptest.NewLogger(t), db.Orders(), useBatchSize)
 
 		accountingDB := db.ProjectAccounting()
 
 		// use different bucketName for each write, so they don't get aggregated yet
 		for i := 0; i < useBatchSize-1; i++ {
 			bucketName := fmt.Sprintf("my_files_%d", i)
-			err := owc.UpdateBucketBandwidthAllocation(ctx, projectID, []byte(bucketName), pb.PieceAction_GET, amount, startTime)
+			err := rwc.UpdateBucketBandwidthAllocation(ctx, projectID, []byte(bucketName), pb.PieceAction_GET, amount, startTime)
 			require.NoError(t, err)
 
 			// check that nothing was actually written since it should just be stored
@@ -61,9 +61,9 @@ func TestOrdersWriteCacheBatchLimitReached(t *testing.T) {
 			require.Equal(t, int64(0), total)
 		}
 
-		whenDone := owc.OnNextFlush()
+		whenDone := rwc.OnNextFlush()
 		// write one more rollup record to hit the threshold
-		err := owc.UpdateBucketBandwidthAllocation(ctx, projectID, []byte("my_files_last"), pb.PieceAction_GET, amount, startTime)
+		err := rwc.UpdateBucketBandwidthAllocation(ctx, projectID, []byte("my_files_last"), pb.PieceAction_GET, amount, startTime)
 		require.NoError(t, err)
 
 		// make sure flushing is done
@@ -80,9 +80,9 @@ func TestOrdersWriteCacheBatchLimitReached(t *testing.T) {
 	})
 }
 
-// TestOrdersWriteCacheBatchChore makes sure bandwidth rollup values are not written to the
+// TestRollupsWriteCacheBatchChore makes sure bandwidth rollup values are not written to the
 // db until the chore flushes the DB (assuming the batch size is not reached).
-func TestOrdersWriteCacheBatchChore(t *testing.T) {
+func TestRollupsWriteCacheBatchChore(t *testing.T) {
 	testplanet.Run(t, testplanet.Config{
 		SatelliteCount: 1,
 	},
@@ -109,10 +109,9 @@ func TestOrdersWriteCacheBatchChore(t *testing.T) {
 				require.Equal(t, int64(0), total)
 			}
 
-			owc := ordersDB.(*orders.RollupsWriteCache)
-			whenDone := owc.OnNextFlush()
+			rwc := ordersDB.(*orders.RollupsWriteCache)
+			whenDone := rwc.OnNextFlush()
 			// wait for Loop to complete
-			planet.Satellites[0].Orders.Chore.Loop.Restart()
 			planet.Satellites[0].Orders.Chore.Loop.TriggerWait()
 
 			// make sure flushing is done
diff --git a/satellite/repairer.go b/satellite/repairer.go
index dad43bea0..8ac2794e1 100644
--- a/satellite/repairer.go
+++ b/satellite/repairer.go
@@ -49,8 +49,12 @@ type Repairer struct {
 }
 
 // NewRepairer creates a new repairer peer.
-func NewRepairer(log *zap.Logger, full *identity.FullIdentity, pointerDB metainfo.PointerDB, revocationDB extensions.RevocationDB, repairQueue queue.RepairQueue,
-	bucketsDB metainfo.BucketsDB, overlayCache overlay.DB, ordersDB orders.DB, versionInfo version.Info, config *Config) (*Repairer, error) {
+func NewRepairer(log *zap.Logger, full *identity.FullIdentity,
+	pointerDB metainfo.PointerDB,
+	revocationDB extensions.RevocationDB, repairQueue queue.RepairQueue,
+	bucketsDB metainfo.BucketsDB, overlayCache overlay.DB, ordersDB orders.DB,
+	rollupsWriteCache *orders.RollupsWriteCache,
+	versionInfo version.Info, config *Config) (*Repairer, error) {
 	peer := &Repairer{
 		Log:      log,
 		Identity: full,
@@ -84,9 +88,8 @@ func NewRepairer(log *zap.Logger, full *identity.FullIdentity, pointerDB metainf
 	}
 
 	{ // setup orders
-		ordersWriteCache := orders.NewRollupsWriteCache(log, ordersDB, config.Orders.FlushBatchSize)
-		peer.Orders.DB = ordersWriteCache
-		peer.Orders.Chore = orders.NewChore(log.Named("orders chore"), ordersWriteCache, config.Orders)
+		peer.Orders.DB = rollupsWriteCache
+		peer.Orders.Chore = orders.NewChore(log.Named("orders chore"), rollupsWriteCache, config.Orders)
 		peer.Orders.Service = orders.NewService(
 			log.Named("orders"),
 			signing.SignerFromFullIdentity(peer.Identity),
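
Note: the net effect of the patch is that each process entry point now owns the RollupsWriteCache and is responsible for the final flush, instead of the peer constructing the cache internally. A minimal sketch of the resulting lifecycle, using only identifiers from the diff above and assuming the usual entry-point locals (ctx, log, db, runCfg, and a named err result) are in scope:

	// The caller constructs the cache and hands it to the peer; while the
	// peer runs, its orders chore calls Flush on an interval, and
	// updateCacheValue kicks off async flushes whenever batchSize is hit.
	rollupsWriteCache := orders.NewRollupsWriteCache(log.Named("orders-write-cache"), db.Orders(), runCfg.Orders.FlushBatchSize)
	defer func() {
		// CloseAndFlush marks the cache stopped (further writes return an
		// error), waits for in-flight async flushes, then flushes the rest.
		// context2.WithoutCancellation keeps that last flush alive even
		// though ctx has typically been canceled by shutdown time.
		err = errs.Combine(err, rollupsWriteCache.CloseAndFlush(context2.WithoutCancellation(ctx)))
	}()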