From 799b159bba8673d3c08f150e05f78f6ae4857bc3 Mon Sep 17 00:00:00 2001
From: paul cannon
Date: Wed, 27 Jul 2022 18:43:02 -0500
Subject: [PATCH] satellite/reputation: offset write times by random, not by
 satelliteID

In an effort to distribute load on the reputation database, the
reputation write cache scheduled nodes to be written at a time offset
by the local nodeID. The idea was that no two repair workers would
have the same nodeID, so they would not tend to write to the same row
at the same time.

Instead, since all satellite processes share the same satellite ID
(duh), this caused _all_ workers to try and write to the same row at
the same time _always_. This was not ideal.

This change uses a random number instead of the satellite ID. The
random number is sourced from the number of nanoseconds since the Unix
epoch. As long as workers are not started at the exact same nanosecond,
they ought to get well-distributed offsets.

Change-Id: I149bdaa6ca1ee6043cfedcf1489dd9d3e3c7a163
---
 satellite/api.go                             |  2 +-
 satellite/core.go                            |  2 +-
 satellite/repairer.go                        |  2 +-
 satellite/reputation/db_test.go              |  2 +-
 satellite/reputation/writecache.go           | 21 +++++++++++----------
 .../reputation/writecache_service_test.go    |  4 ++--
 satellite/reputation/writecache_unit_test.go | 16 +++++++++++-----
 7 files changed, 28 insertions(+), 21 deletions(-)

diff --git a/satellite/api.go b/satellite/api.go
index f3f689177..fbf47ab31 100644
--- a/satellite/api.go
+++ b/satellite/api.go
@@ -288,7 +288,7 @@ func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB,
 	{ // setup reputation
 		reputationDB := peer.DB.Reputation()
 		if config.Reputation.FlushInterval > 0 {
-			cachingDB := reputation.NewCachingDB(log.Named("reputation:writecache"), peer.Identity.ID, reputationDB, config.Reputation)
+			cachingDB := reputation.NewCachingDB(log.Named("reputation:writecache"), reputationDB, config.Reputation)
 			peer.Services.Add(lifecycle.Item{
 				Name: "reputation:writecache",
 				Run:  cachingDB.Manage,
diff --git a/satellite/core.go b/satellite/core.go
index 1847d245c..1d6e96d8e 100644
--- a/satellite/core.go
+++ b/satellite/core.go
@@ -342,7 +342,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB,
 	{ // setup reputation
 		reputationDB := peer.DB.Reputation()
 		if config.Reputation.FlushInterval > 0 {
-			cachingDB := reputation.NewCachingDB(log.Named("reputation:writecache"), peer.Identity.ID, reputationDB, config.Reputation)
+			cachingDB := reputation.NewCachingDB(log.Named("reputation:writecache"), reputationDB, config.Reputation)
 			peer.Services.Add(lifecycle.Item{
 				Name: "reputation:writecache",
 				Run:  cachingDB.Manage,
diff --git a/satellite/repairer.go b/satellite/repairer.go
index 71f38bcec..1a93f1475 100644
--- a/satellite/repairer.go
+++ b/satellite/repairer.go
@@ -158,7 +158,7 @@ func NewRepairer(log *zap.Logger, full *identity.FullIdentity,
 	{ // setup reputation
 		if config.Reputation.FlushInterval > 0 {
-			cachingDB := reputation.NewCachingDB(log.Named("reputation:writecache"), peer.Identity.ID, reputationdb, config.Reputation)
+			cachingDB := reputation.NewCachingDB(log.Named("reputation:writecache"), reputationdb, config.Reputation)
 			peer.Services.Add(lifecycle.Item{
 				Name: "reputation:writecache",
 				Run:  cachingDB.Manage,
diff --git a/satellite/reputation/db_test.go b/satellite/reputation/db_test.go
index 162b2b8b8..1523e8cfa 100644
--- a/satellite/reputation/db_test.go
+++ b/satellite/reputation/db_test.go
@@ -223,7 +223,7 @@ func TestApplyUpdatesEquivalentToMultipleUpdatesCached(t *testing.T) {
 	}

 	satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
-		cachingDB := reputation.NewCachingDB(zaptest.NewLogger(t), testrand.NodeID(), db.Reputation(), config)
+		cachingDB := reputation.NewCachingDB(zaptest.NewLogger(t), db.Reputation(), config)
 		cancelCtx, cancel := context.WithCancel(ctx)
 		defer cancel()
 		ctx.Go(func() error {
diff --git a/satellite/reputation/writecache.go b/satellite/reputation/writecache.go
index dbb8e0fc7..04790faad 100644
--- a/satellite/reputation/writecache.go
+++ b/satellite/reputation/writecache.go
@@ -8,6 +8,7 @@ import (
 	"context"
 	"encoding/binary"
 	"errors"
+	"math/rand"
 	"sync"
 	"time"

@@ -22,10 +23,11 @@ import (
 var _ DB = (*CachingDB)(nil)

 // NewCachingDB creates a new CachingDB instance.
-func NewCachingDB(log *zap.Logger, peerID storj.NodeID, backingStore DB, reputationConfig Config) *CachingDB {
+func NewCachingDB(log *zap.Logger, backingStore DB, reputationConfig Config) *CachingDB {
+	randSource := rand.New(rand.NewSource(time.Now().UnixNano()))
 	return &CachingDB{
 		log:              log,
-		peerID:           peerID,
+		instanceOffset:   randSource.Uint64(),
 		backingStore:     backingStore,
 		nowFunc:          time.Now,
 		reputationConfig: reputationConfig,
@@ -43,7 +45,7 @@ type CachingDB struct {
 	// These fields must be populated before the cache starts being used.
 	// They are not expected to change.
 	log              *zap.Logger
-	peerID           storj.NodeID
+	instanceOffset   uint64
 	backingStore     DB
 	nowFunc          func() time.Time
 	reputationConfig Config
@@ -558,7 +560,7 @@ func (cdb *CachingDB) getEntriesToSync(now time.Time, f func(entryToSync *cached
 }

 func (cdb *CachingDB) nextTimeForSync(nodeID storj.NodeID, now time.Time) time.Time {
-	return nextTimeForSync(cdb.peerID, nodeID, now, cdb.syncInterval)
+	return nextTimeForSync(cdb.instanceOffset, nodeID, now, cdb.syncInterval)
 }

 // nextTimeForSync decides the next time at which the given nodeID should next
@@ -566,15 +568,14 @@ func (cdb *CachingDB) nextTimeForSync(nodeID storj.NodeID, now time.Time) time.T
 //
 // We make an effort to distribute the nodes in time, so that the service
 // is not usually trying to retrieve or update many rows at the same time. We
-// also make an effort to offset this sync schedule by the instance ID of this
-// process so that in most cases, instances will not be trying to update the
-// same row at the same time, minimizing contention.
+// also make an effort to offset this sync schedule by a random value unique
+// to this process so that in most cases, instances will not be trying to
+// update the same row at the same time, minimizing contention.
-func nextTimeForSync(peerID, nodeID storj.NodeID, now time.Time, syncInterval time.Duration) time.Time {
+func nextTimeForSync(instanceOffset uint64, nodeID storj.NodeID, now time.Time, syncInterval time.Duration) time.Time {
 	// calculate the fraction into the FlushInterval at which this node
 	// should always be synchronized.
 	initialPosition := binary.BigEndian.Uint64(nodeID[:8])
-	offset := binary.BigEndian.Uint64(peerID[:8])
-	finalPosition := initialPosition + offset
+	finalPosition := initialPosition + instanceOffset
 	positionAsFraction := float64(finalPosition) / (1 << 64)
 	// and apply that fraction to the actual interval
 	periodStart := now.Truncate(syncInterval)
diff --git a/satellite/reputation/writecache_service_test.go b/satellite/reputation/writecache_service_test.go
index 7f36cc45b..a487d3b7b 100644
--- a/satellite/reputation/writecache_service_test.go
+++ b/satellite/reputation/writecache_service_test.go
@@ -68,7 +68,7 @@ func TestHeavyLockContention(t *testing.T) {
 		ErrorRetryInterval: 0,
 	}
 	reputationDB := db.Reputation()
-	writecacheDB := reputation.NewCachingDB(zaptest.NewLogger(t), testrand.NodeID(), reputationDB, config)
+	writecacheDB := reputation.NewCachingDB(zaptest.NewLogger(t), reputationDB, config)
 	var group errgroup.Group

 	// Make room for results ahead of time, so we don't need to use any
@@ -148,7 +148,7 @@ func TestFetchingInfoWhileEntryIsSyncing(t *testing.T) {
 	}
 	logger := zaptest.NewLogger(t)
 	reputationDB := db.Reputation()
-	writecache := reputation.NewCachingDB(logger.Named("writecache"), testrand.NodeID(), reputationDB, config)
+	writecache := reputation.NewCachingDB(logger.Named("writecache"), reputationDB, config)

 	const positiveAudits = 123
 	for i := 0; i < numRounds; i++ {
diff --git a/satellite/reputation/writecache_unit_test.go b/satellite/reputation/writecache_unit_test.go
index 118b6edc6..0244e4948 100644
--- a/satellite/reputation/writecache_unit_test.go
+++ b/satellite/reputation/writecache_unit_test.go
@@ -23,22 +23,28 @@ func TestNextTimeForSync(t *testing.T) {
 	var quarterwayID storj.NodeID
 	binary.BigEndian.PutUint64(quarterwayID[:8], 1<<62)

+	const (
+		zeroOffset       = 0
+		halfwayOffset    = 1 << 63
+		quarterwayOffset = 1 << 62
+	)
+
 	startOfHour := time.Now().Truncate(time.Hour)
 	now := startOfHour.Add(15 * time.Minute)

-	nextTime := nextTimeForSync(zeroesID, halfwayID, now, time.Hour)
+	nextTime := nextTimeForSync(zeroOffset, halfwayID, now, time.Hour)
 	requireInDeltaTime(t, startOfHour.Add(30*time.Minute), nextTime, time.Second)

-	nextTime = nextTimeForSync(halfwayID, zeroesID, now, time.Hour)
+	nextTime = nextTimeForSync(halfwayOffset, zeroesID, now, time.Hour)
 	requireInDeltaTime(t, startOfHour.Add(30*time.Minute), nextTime, time.Second)

-	nextTime = nextTimeForSync(zeroesID, zeroesID, now, time.Hour)
+	nextTime = nextTimeForSync(zeroOffset, zeroesID, now, time.Hour)
 	requireInDeltaTime(t, startOfHour.Add(time.Hour), nextTime, time.Second)

-	nextTime = nextTimeForSync(halfwayID, halfwayID, now, time.Hour)
+	nextTime = nextTimeForSync(halfwayOffset, halfwayID, now, time.Hour)
 	requireInDeltaTime(t, startOfHour.Add(time.Hour), nextTime, time.Second)

-	nextTime = nextTimeForSync(quarterwayID, halfwayID, now, time.Hour)
+	nextTime = nextTimeForSync(quarterwayOffset, halfwayID, now, time.Hour)
 	requireInDeltaTime(t, startOfHour.Add(45*time.Minute), nextTime, time.Second)
 }
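
Reviewer note (not part of the patch): the standalone Go sketch below illustrates the scheduling scheme this change relies on, i.e. how a per-process random offset plus the node ID prefix picks a stable slot within each flush interval. It is a minimal illustration under assumptions, not satellite code: a plain [32]byte array stands in for storj.NodeID, and the roll-over to the next period when a node's slot has already passed is inferred from the expectations in writecache_unit_test.go.

package main

import (
	"encoding/binary"
	"fmt"
	"math/rand"
	"time"
)

// nextTimeForSync mirrors the core math of the patched helper: the first 8
// bytes of the node ID plus a per-process random offset choose a stable
// fraction of the sync interval, and the node is scheduled at that fraction
// of the current period.
func nextTimeForSync(instanceOffset uint64, nodeID [32]byte, now time.Time, syncInterval time.Duration) time.Time {
	initialPosition := binary.BigEndian.Uint64(nodeID[:8])
	finalPosition := initialPosition + instanceOffset // wraps modulo 2^64, which is fine
	positionAsFraction := float64(finalPosition) / (1 << 64)
	periodStart := now.Truncate(syncInterval)
	syncTime := periodStart.Add(time.Duration(positionAsFraction * float64(syncInterval)))
	if !syncTime.After(now) {
		// This period's slot is already behind us; take the same slot in the
		// next period (assumed behavior, consistent with the unit tests above).
		syncTime = syncTime.Add(syncInterval)
	}
	return syncTime
}

func main() {
	// Per-process offset, seeded from the start time in nanoseconds, as the
	// patched NewCachingDB does.
	instanceOffset := rand.New(rand.NewSource(time.Now().UnixNano())).Uint64()

	now := time.Now()
	for i := 0; i < 3; i++ {
		var nodeID [32]byte
		binary.BigEndian.PutUint64(nodeID[:8], rand.Uint64())
		fmt.Println(nextTimeForSync(instanceOffset, nodeID, now, time.Hour))
	}
}

Because the offset is fixed per process but effectively random across processes, two workers that cache the same node will usually land on different slots within the interval, which is the contention-avoidance property the commit message describes.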