satellite/downtime: add concurrency to downtime estimation
To increase throughput of downtime estimation, this commit reaches out to multiple nodes concurrently. The number of concurrent routines is controlled by a new config flag, EstimationConcurrencyLimit. It also raises the default EstimationBatchSize to 1000.

Change-Id: I800ce7ec1035885afa194c3c3f64eedd4f6f61eb
commit 3ee6c14f54 (parent 17ccf36c15)
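Before the diffs, a quick orientation: the estimation chore used to ping offline nodes one at a time; with this change each ping-back runs in its own goroutine managed by a sync2.Limiter, and the chore waits for all of them before finishing the cycle. A minimal sketch of the same bounded-concurrency pattern using only the standard library (a buffered-channel semaphore plus a WaitGroup; the actual change uses sync2.Limiter, as shown in the hunks below):

package main

import (
	"fmt"
	"sync"
)

func main() {
	const concurrencyLimit = 10 // plays the role of EstimationConcurrencyLimit

	nodes := []string{"node-a", "node-b", "node-c"} // placeholder work items

	var wg sync.WaitGroup
	sem := make(chan struct{}, concurrencyLimit) // at most concurrencyLimit goroutines in flight

	for _, node := range nodes {
		wg.Add(1)
		sem <- struct{}{} // acquire a slot; blocks once the limit is reached
		go func(node string) {
			defer wg.Done()
			defer func() { <-sem }() // release the slot
			fmt.Println("pinging", node) // stand-in for the real availability check
		}(node)
	}
	wg.Wait() // analogous to chore.limiter.Wait()
}

The semaphore capacity is the knob the new EstimationConcurrencyLimit flag exposes: raising it lets more ping-backs overlap, which is what makes the larger default EstimationBatchSize of 1000 practical.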
@@ -466,9 +466,10 @@ func (planet *Planet) newSatellites(count int, satelliteDatabases satellitedbtes
 				ChoreInterval: defaultInterval,
 			},
 			Downtime: downtime.Config{
-				DetectionInterval:   defaultInterval,
-				EstimationInterval:  defaultInterval,
-				EstimationBatchSize: 0,
+				DetectionInterval:          defaultInterval,
+				EstimationInterval:         defaultInterval,
+				EstimationBatchSize:        5,
+				EstimationConcurrencyLimit: 5,
 			},
 		}
@@ -15,7 +15,8 @@ var (

 // Config for the chore.
 type Config struct {
-	DetectionInterval   time.Duration `help:"how often to run the downtime detection chore." releaseDefault:"1h0s" devDefault:"30s"`
-	EstimationInterval  time.Duration `help:"how often to run the downtime estimation chore" releaseDefault:"1h0s" devDefault:"30s"`
-	EstimationBatchSize int           `help:"the downtime estimation chore should check this many offline nodes" releaseDefault:"100" devDefault:"100"`
+	DetectionInterval          time.Duration `help:"how often to run the downtime detection chore." releaseDefault:"1h0s" devDefault:"30s"`
+	EstimationInterval         time.Duration `help:"how often to run the downtime estimation chore" releaseDefault:"1h0s" devDefault:"30s"`
+	EstimationBatchSize        int           `help:"the downtime estimation chore should check this many offline nodes" releaseDefault:"1000" devDefault:"100"`
+	EstimationConcurrencyLimit int           `help:"max number of concurrent connections in estimation chore" default:"10"`
 }
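As a hedged usage sketch (illustrative values only, not part of this change), the new field is populated like any other member of downtime.Config, the same way TestEstimationChoreSatelliteDowntime fills it in further down:

package main

import (
	"fmt"
	"time"

	"storj.io/storj/satellite/downtime"
)

func main() {
	// Illustrative values; the field names and defaults come from the Config
	// struct in the hunk above.
	cfg := downtime.Config{
		DetectionInterval:          time.Hour,
		EstimationInterval:         time.Hour,
		EstimationBatchSize:        1000,
		EstimationConcurrencyLimit: 10,
	}
	fmt.Printf("%+v\n", cfg)
}

Through the struct tags these fields surface as the downtime.estimation-batch-size and downtime.estimation-concurrency-limit keys visible in the satellite-config.yaml.lock hunk at the end of the diff.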
@@ -19,6 +19,7 @@ import (
 type EstimationChore struct {
 	log       *zap.Logger
 	Loop      *sync2.Cycle
+	limiter   *sync2.Limiter
 	config    Config
 	startTime time.Time
 	overlay   *overlay.Service
@@ -28,9 +29,13 @@ type EstimationChore struct {

 // NewEstimationChore instantiates EstimationChore.
 func NewEstimationChore(log *zap.Logger, config Config, overlay *overlay.Service, service *Service, db DB) *EstimationChore {
+	if config.EstimationConcurrencyLimit <= 0 {
+		config.EstimationConcurrencyLimit = 1
+	}
 	return &EstimationChore{
 		log:       log,
 		Loop:      sync2.NewCycle(config.EstimationInterval),
+		limiter:   sync2.NewLimiter(config.EstimationConcurrencyLimit),
 		config:    config,
 		startTime: time.Now().UTC(),
 		overlay:   overlay,
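Note the guard at the top of the constructor: a zero or negative EstimationConcurrencyLimit is raised to 1, so existing configurations that never set the new flag still run, effectively sequentially. A minimal sketch of that behavior (nil dependencies purely for illustration; real callers pass the overlay service, downtime service, and DB as in the tests below):

package main

import (
	"go.uber.org/zap"

	"storj.io/storj/satellite/downtime"
)

func main() {
	// Zero-value Config: EstimationConcurrencyLimit is 0, which the
	// constructor clamps to 1 before creating its limiter.
	chore := downtime.NewEstimationChore(zap.NewNop(), downtime.Config{}, nil, nil, nil)
	_ = chore
}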
@@ -55,26 +60,30 @@ func (chore *EstimationChore) Run(ctx context.Context) (err error) {
 		}

 		for _, node := range offlineNodes {
-			success, err := chore.service.CheckAndUpdateNodeAvailability(ctx, node.ID, node.Address)
-			if err != nil {
-				chore.log.Error("error during downtime estimation ping back",
-					zap.Bool("success", success),
-					zap.Error(err))
-				continue
-			}
-			if !success && node.LastContactFailure.After(chore.startTime) {
-				now := time.Now().UTC()
-				duration := now.Sub(node.LastContactFailure)
-
-				err = chore.db.Add(ctx, node.ID, now, duration)
-				if err != nil {
-					chore.log.Error("error adding node seconds offline information.",
-						zap.Stringer("node ID", node.ID),
-						zap.Stringer("duration", duration),
-						zap.Error(err))
-				}
-			}
+			node := node
+			chore.limiter.Go(ctx, func() {
+				success, err := chore.service.CheckAndUpdateNodeAvailability(ctx, node.ID, node.Address)
+				if err != nil {
+					chore.log.Error("error during downtime estimation ping back",
+						zap.Bool("success", success),
+						zap.Error(err))
+					return
+				}
+				if !success && node.LastContactFailure.After(chore.startTime) {
+					now := time.Now().UTC()
+					duration := now.Sub(node.LastContactFailure)
+
+					err = chore.db.Add(ctx, node.ID, now, duration)
+					if err != nil {
+						chore.log.Error("error adding node seconds offline information.",
+							zap.Stringer("node ID", node.ID),
+							zap.Stringer("duration", duration),
+							zap.Error(err))
+					}
+				}
+			})
 		}
+		chore.limiter.Wait()
 		return nil
 	})
 }
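Two details of the rewritten loop are easy to miss: the early `continue` becomes a `return`, since each node is now handled inside its own closure and only that goroutine should stop, and the `node := node` line makes a per-iteration copy so each closure captures its own node rather than the shared loop variable. A small standard-library sketch of that capture rule (not code from this change):

package main

import (
	"fmt"
	"sync"
)

func main() {
	nodes := []string{"node-a", "node-b", "node-c"}

	var wg sync.WaitGroup
	for _, node := range nodes {
		node := node // per-iteration copy; without it (pre-Go 1.22) every closure shares one loop variable
		wg.Add(1)
		go func() {
			defer wg.Done()
			fmt.Println("checking", node)
		}()
	}
	wg.Wait()
}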
@@ -15,6 +15,7 @@ import (
 	"storj.io/storj/private/testplanet"
 	"storj.io/storj/satellite"
 	"storj.io/storj/satellite/downtime"
+	"storj.io/storj/satellite/overlay"
 )

 // TestEstimationChoreBasic tests the basic functionality of the downtime estimation chore:
@@ -22,60 +23,72 @@ import (
 // 2. Test that when a node that had one failed ping, and another failed ping >1s later has at least 1s of recorded downtime
 func TestEstimationChoreBasic(t *testing.T) {
 	testplanet.Run(t, testplanet.Config{
-		SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 0,
+		SatelliteCount: 1, StorageNodeCount: 2, UplinkCount: 0,
 		Reconfigure: testplanet.Reconfigure{
 			Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
-				config.Downtime.EstimationBatchSize = 1
+				config.Downtime.EstimationBatchSize = 2
 			},
 		},
 	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
-		node := planet.StorageNodes[0]
 		satellite := planet.Satellites[0]
-		node.Contact.Chore.Pause(ctx)
 		satellite.DowntimeTracking.EstimationChore.Loop.Pause()
-		{ // test estimation chore updates uptime correctly for an online node
-			// mark node as failing an uptime check so the estimation chore picks it up
-			_, err := satellite.DB.OverlayCache().UpdateUptime(ctx, node.ID(), false)
-			require.NoError(t, err)
-			oldNode, err := satellite.DB.OverlayCache().Get(ctx, node.ID())
-			require.NoError(t, err)
-			require.True(t, oldNode.Reputation.LastContactSuccess.Before(oldNode.Reputation.LastContactFailure))
+
+		{ // test last_contact_success is updated for nodes where last_contact_failure > last_contact_success, but node is online
+			var oldNodes []*overlay.NodeDossier
+			for _, node := range planet.StorageNodes {
+				node.Contact.Chore.Pause(ctx)
+				// mark node as failing an uptime check so the estimation chore picks it up
+				_, err := satellite.DB.OverlayCache().UpdateUptime(ctx, node.ID(), false)
+				require.NoError(t, err)
+				oldNode, err := satellite.DB.OverlayCache().Get(ctx, node.ID())
+				require.NoError(t, err)
+				require.True(t, oldNode.Reputation.LastContactSuccess.Before(oldNode.Reputation.LastContactFailure))
+				oldNodes = append(oldNodes, oldNode)
+			}
 			// run estimation chore
 			time.Sleep(1 * time.Second) // wait for 1s because estimation chore truncates offline duration to seconds
 			satellite.DowntimeTracking.EstimationChore.Loop.TriggerWait()
-			// get offline time for node, expect it to be 0 since node was online when chore pinged it
-			downtime, err := satellite.DB.DowntimeTracking().GetOfflineTime(ctx, node.ID(), time.Now().Add(-5*time.Hour), time.Now())
-			require.NoError(t, err)
-			require.True(t, downtime == 0)
-			// expect node last contact success was updated
-			newNode, err := satellite.DB.OverlayCache().Get(ctx, node.ID())
-			require.NoError(t, err)
-			require.Equal(t, oldNode.Reputation.LastContactFailure, newNode.Reputation.LastContactFailure)
-			require.True(t, oldNode.Reputation.LastContactSuccess.Before(newNode.Reputation.LastContactSuccess))
-			require.True(t, newNode.Reputation.LastContactFailure.Before(newNode.Reputation.LastContactSuccess))
+			for i, node := range planet.StorageNodes {
+				// get offline time for node, expect it to be 0 since node was online when chore pinged it
+				downtime, err := satellite.DB.DowntimeTracking().GetOfflineTime(ctx, node.ID(), time.Now().Add(-5*time.Hour), time.Now())
+				require.NoError(t, err)
+				require.True(t, downtime == 0)
+				// expect node last contact success was updated
+				newNode, err := satellite.DB.OverlayCache().Get(ctx, node.ID())
+				require.NoError(t, err)
+				require.Equal(t, oldNodes[i].Reputation.LastContactFailure, newNode.Reputation.LastContactFailure)
+				require.True(t, oldNodes[i].Reputation.LastContactSuccess.Before(newNode.Reputation.LastContactSuccess))
+				require.True(t, newNode.Reputation.LastContactFailure.Before(newNode.Reputation.LastContactSuccess))
+			}
 		}
-		{ // test estimation chore correctly aggregates offline time
-			// mark node as failing an uptime check so the estimation chore picks it up
-			_, err := satellite.DB.OverlayCache().UpdateUptime(ctx, node.ID(), false)
-			require.NoError(t, err)
-			oldNode, err := satellite.DB.OverlayCache().Get(ctx, node.ID())
-			require.NoError(t, err)
-			require.True(t, oldNode.Reputation.LastContactSuccess.Before(oldNode.Reputation.LastContactFailure))
-			// close the node service so the ping back will fail
-			err = node.Server.Close()
-			require.NoError(t, err)
+		{ // test last_contact_failure is updated and downtime is recorded for nodes where last_contact_failure > last_contact_success and node is offline
+			var oldNodes []*overlay.NodeDossier
+			for _, node := range planet.StorageNodes {
+				// mark node as failing an uptime check so the estimation chore picks it up
+				_, err := satellite.DB.OverlayCache().UpdateUptime(ctx, node.ID(), false)
+				require.NoError(t, err)
+				oldNode, err := satellite.DB.OverlayCache().Get(ctx, node.ID())
+				require.NoError(t, err)
+				require.True(t, oldNode.Reputation.LastContactSuccess.Before(oldNode.Reputation.LastContactFailure))
+				// close the node service so the ping back will fail
+				err = node.Server.Close()
+				require.NoError(t, err)
+				oldNodes = append(oldNodes, oldNode)
+			}
 			// run estimation chore
 			time.Sleep(1 * time.Second) // wait for 1s because estimation chore truncates offline duration to seconds
 			satellite.DowntimeTracking.EstimationChore.Loop.TriggerWait()
-			// get offline time for node, expect it to be greater than 0 since node has been offline for at least 1s
-			downtime, err := satellite.DB.DowntimeTracking().GetOfflineTime(ctx, node.ID(), time.Now().Add(-5*time.Hour), time.Now())
-			require.NoError(t, err)
-			require.True(t, downtime > 0)
-			// expect node last contact failure was updated
-			newNode, err := satellite.DB.OverlayCache().Get(ctx, node.ID())
-			require.NoError(t, err)
-			require.Equal(t, oldNode.Reputation.LastContactSuccess, newNode.Reputation.LastContactSuccess)
-			require.True(t, oldNode.Reputation.LastContactFailure.Before(newNode.Reputation.LastContactFailure))
+			for i, node := range planet.StorageNodes {
+				// get offline time for node, expect it to be greater than 0 since node has been offline for at least 1s
+				downtime, err := satellite.DB.DowntimeTracking().GetOfflineTime(ctx, node.ID(), time.Now().Add(-5*time.Hour), time.Now())
+				require.NoError(t, err)
+				require.True(t, downtime > 0)
+				// expect node last contact failure was updated
+				newNode, err := satellite.DB.OverlayCache().Get(ctx, node.ID())
+				require.NoError(t, err)
+				require.Equal(t, oldNodes[i].Reputation.LastContactSuccess, newNode.Reputation.LastContactSuccess)
+				require.True(t, oldNodes[i].Reputation.LastContactFailure.Before(newNode.Reputation.LastContactFailure))
+			}
 		}
 	})
 }
@@ -111,8 +124,9 @@ func TestEstimationChoreSatelliteDowntime(t *testing.T) {
 		newEstimationChore := downtime.NewEstimationChore(
 			satellite.Log,
 			downtime.Config{
-				EstimationInterval:  1 * time.Second,
-				EstimationBatchSize: 10,
+				EstimationInterval:         1 * time.Second,
+				EstimationBatchSize:        10,
+				EstimationConcurrencyLimit: 10,
 			},
 			satellite.Overlay.Service,
 			satellite.DowntimeTracking.Service,
scripts/testdata/satellite-config.yaml.lock (vendored)
@@ -146,7 +146,10 @@ contact.external-address: ""
 # downtime.detection-interval: 1h0m0s

 # the downtime estimation chore should check this many offline nodes
-# downtime.estimation-batch-size: 100
+# downtime.estimation-batch-size: 1000
+
+# max number of concurrent connections in estimation chore
+# downtime.estimation-concurrency-limit: 10

 # how often to run the downtime estimation chore
 # downtime.estimation-interval: 1h0m0s