From d3604a5e90036ae7d1b4dcdb9a5729e40950e2ec Mon Sep 17 00:00:00 2001 From: paul cannon Date: Mon, 7 Dec 2020 22:18:00 -0600 Subject: [PATCH] satellite/repair: use survivability model for segment health The chief segment health models we've come up with are the "immediate danger" model and the "survivability" model. The former calculates the chance of losing a segment becoming lost in the next time period (using the CDF of the binomial distribution to estimate the chance of x nodes failing in that period), while the latter estimates the number of iterations for which a segment can be expected to survive (using the mean of the negative binomial distribution). The immediate danger model was a promising one for comparing segment health across segments with different RS parameters, as it is more precisely what we want to prevent, but it turns out that practically all segments in production have infinite health, as the chance of losing segments with any reasonable estimate of node failure rate is smaller than DBL_EPSILON, the smallest possible difference from 1.0 representable in a float64 (about 1e-16). Leaving aside the wisdom of worrying about the repair of segments that have less than a 1e-16 chance of being lost, we want to be extremely conservative and proactive in our repair efforts, and the health of the segments we have been repairing thus far also evaluates to infinity under the immediate danger model. Thus, we find ourselves reaching for an alternative. Dr. Ben saves the day: the survivability model is a reasonably close approximation of the immediate danger model, and even better, it is far simpler to calculate and yields manageable values for real-world segments. The downside to it is that it requires as input an estimate of the total number of active nodes. This change replaces the segment health calculation to use the survivability model, and reinstates the call to SegmentHealth() where it was reverted. It gets estimates for the total number of active nodes by leveraging the reliability cache. Change-Id: Ia5d9b9031b9f6cf0fa7b9005a7011609415527dc --- satellite/repair/checker/checker.go | 70 ++++++++--- satellite/repair/checker/online.go | 46 +++++-- satellite/repair/priority.go | 185 +++++++--------------------- satellite/repair/priority_test.go | 74 +++++------ 4 files changed, 164 insertions(+), 211 deletions(-) diff --git a/satellite/repair/checker/checker.go b/satellite/repair/checker/checker.go index f1c2a4f8d..2f99527f4 100644 --- a/satellite/repair/checker/checker.go +++ b/satellite/repair/checker/checker.go @@ -84,6 +84,28 @@ func (checker *Checker) Run(ctx context.Context) (err error) { return group.Wait() } +// getNodesEstimate updates the estimate of the total number of nodes. It is guaranteed +// to return a number greater than 0 when the error is nil. +// +// We can't calculate this upon first starting a Checker, because there may not be any +// nodes yet. We expect that there will be nodes before there are segments, though. +func (checker *Checker) getNodesEstimate(ctx context.Context) (int, error) { + // this should be safe to call frequently; it is an efficient caching lookup. + totalNumNodes, err := checker.nodestate.NumNodes(ctx) + if err != nil { + // We could proceed here by returning the last good value, or by returning a fallback + // constant estimate, like "20000", and we'd probably be fine, but it would be better + // not to have that happen silently for too long. Also, if we can't get this from the + // database, we probably can't modify the injured segments queue, so it won't help to + // proceed with this repair operation. + return 0, err + } + if totalNumNodes == 0 { + return 0, Error.New("segment health is meaningless: there are no nodes") + } + return totalNumNodes, nil +} + // RefreshReliabilityCache forces refreshing node online status cache. func (checker *Checker) RefreshReliabilityCache(ctx context.Context) error { return checker.nodestate.Refresh(ctx) @@ -102,14 +124,15 @@ func (checker *Checker) IdentifyInjuredSegments(ctx context.Context) (err error) startTime := time.Now() observer := &checkerObserver{ - repairQueue: checker.repairQueue, - irrdb: checker.irrdb, - nodestate: checker.nodestate, - statsCollector: checker.statsCollector, - monStats: aggregateStats{}, - repairOverrides: checker.repairOverrides, - nodeFailureRate: checker.nodeFailureRate, - log: checker.logger, + repairQueue: checker.repairQueue, + irrdb: checker.irrdb, + nodestate: checker.nodestate, + statsCollector: checker.statsCollector, + monStats: aggregateStats{}, + repairOverrides: checker.repairOverrides, + nodeFailureRate: checker.nodeFailureRate, + getNodesEstimate: checker.getNodesEstimate, + log: checker.logger, } err = checker.metaLoop.Join(ctx, observer) if err != nil { @@ -187,6 +210,11 @@ func (checker *Checker) updateIrreparableSegmentStatus(ctx context.Context, poin repairThreshold = overrideValue } + totalNumNodes, err := checker.getNodesEstimate(ctx) + if err != nil { + return Error.New("could not get estimate of total number of nodes: %w", err) + } + // we repair when the number of healthy pieces is less than or equal to the repair threshold and is greater or equal to // minimum required pieces in redundancy // except for the case when the repair and success thresholds are the same (a case usually seen during testing) @@ -194,7 +222,7 @@ func (checker *Checker) updateIrreparableSegmentStatus(ctx context.Context, poin // If the segment is suddenly entirely healthy again, we don't need to repair and we don't need to // keep it in the irreparabledb queue either. if numHealthy >= redundancy.MinReq && numHealthy <= repairThreshold && numHealthy < redundancy.SuccessThreshold { - segmentHealth := float64(numHealthy) + segmentHealth := repair.SegmentHealth(int(numHealthy), int(redundancy.MinReq), totalNumNodes, checker.nodeFailureRate) _, err = checker.repairQueue.Insert(ctx, &internalpb.InjuredSegment{ Path: key, LostPieces: missingPieces, @@ -240,14 +268,15 @@ var _ metainfo.Observer = (*checkerObserver)(nil) // // architecture: Observer type checkerObserver struct { - repairQueue queue.RepairQueue - irrdb irreparable.DB - nodestate *ReliabilityCache - statsCollector *statsCollector - monStats aggregateStats // TODO(cam): once we verify statsCollector reports data correctly, remove this - repairOverrides RepairOverridesMap - nodeFailureRate float64 - log *zap.Logger + repairQueue queue.RepairQueue + irrdb irreparable.DB + nodestate *ReliabilityCache + statsCollector *statsCollector + monStats aggregateStats // TODO(cam): once we verify statsCollector reports data correctly, remove this + repairOverrides RepairOverridesMap + nodeFailureRate float64 + getNodesEstimate func(ctx context.Context) (int, error) + log *zap.Logger } func (obs *checkerObserver) getStatsByRS(redundancy storj.RedundancyScheme) *stats { @@ -295,6 +324,11 @@ func (obs *checkerObserver) RemoteSegment(ctx context.Context, segment *metainfo } } + totalNumNodes, err := obs.getNodesEstimate(ctx) + if err != nil { + return Error.New("could not get estimate of total number of nodes: %w", err) + } + // TODO: update MissingPieces to accept metabase.Pieces missingPieces, err := obs.nodestate.MissingPieces(ctx, segment.CreationDate, pbPieces) if err != nil { @@ -315,7 +349,7 @@ func (obs *checkerObserver) RemoteSegment(ctx context.Context, segment *metainfo required, repairThreshold, successThreshold, _ := obs.loadRedundancy(segment.Redundancy) - segmentHealth := repair.SegmentHealth(numHealthy, required, obs.nodeFailureRate) + segmentHealth := repair.SegmentHealth(numHealthy, required, totalNumNodes, obs.nodeFailureRate) mon.FloatVal("checker_segment_health").Observe(segmentHealth) //mon:locked stats.segmentHealth.Observe(segmentHealth) diff --git a/satellite/repair/checker/online.go b/satellite/repair/checker/online.go index 050265934..f8d0ea4eb 100644 --- a/satellite/repair/checker/online.go +++ b/satellite/repair/checker/online.go @@ -39,7 +39,8 @@ func NewReliabilityCache(overlay *overlay.Service, staleness time.Duration) *Rel } } -// LastUpdate returns when the cache was last updated. +// LastUpdate returns when the cache was last updated, or the zero value (time.Time{}) if it +// has never yet been updated. LastUpdate() does not trigger an update itself. func (cache *ReliabilityCache) LastUpdate() time.Time { if state, ok := cache.state.Load().(*reliabilityState); ok { return state.created @@ -47,10 +48,40 @@ func (cache *ReliabilityCache) LastUpdate() time.Time { return time.Time{} } +// NumNodes returns the number of online active nodes (as determined by the reliability cache). +// This number is not guaranteed to be consistent with either the nodes database or the +// reliability cache after returning; it is just a best-effort count and should be treated as an +// estimate. +func (cache *ReliabilityCache) NumNodes(ctx context.Context) (numNodes int, err error) { + defer mon.Task()(&ctx)(&err) + + state, err := cache.loadFast(ctx, time.Time{}) + if err != nil { + return 0, err + } + return len(state.reliable), nil +} + // MissingPieces returns piece indices that are unreliable with the given staleness period. func (cache *ReliabilityCache) MissingPieces(ctx context.Context, created time.Time, pieces []*pb.RemotePiece) (_ []int32, err error) { defer mon.Task()(&ctx)(&err) + state, err := cache.loadFast(ctx, created) + if err != nil { + return nil, err + } + var unreliable []int32 + for _, piece := range pieces { + if _, ok := state.reliable[piece.NodeId]; !ok { + unreliable = append(unreliable, piece.PieceNum) + } + } + return unreliable, nil +} + +func (cache *ReliabilityCache) loadFast(ctx context.Context, validUpTo time.Time) (_ *reliabilityState, err error) { + defer mon.Task()(&ctx)(&err) + // This code is designed to be very fast in the case where a refresh is not needed: just an // atomic load from rarely written to bit of shared memory. The general strategy is to first // read if the state suffices to answer the query. If not (due to it not existing, being @@ -60,10 +91,10 @@ func (cache *ReliabilityCache) MissingPieces(ctx context.Context, created time.T // the acquisition. Only then do we refresh and can then proceed answering the query. state, ok := cache.state.Load().(*reliabilityState) - if !ok || created.After(state.created) || time.Since(state.created) > cache.staleness { + if !ok || validUpTo.After(state.created) || time.Since(state.created) > cache.staleness { cache.mu.Lock() state, ok = cache.state.Load().(*reliabilityState) - if !ok || created.After(state.created) || time.Since(state.created) > cache.staleness { + if !ok || validUpTo.After(state.created) || time.Since(state.created) > cache.staleness { state, err = cache.refreshLocked(ctx) } cache.mu.Unlock() @@ -71,14 +102,7 @@ func (cache *ReliabilityCache) MissingPieces(ctx context.Context, created time.T return nil, err } } - - var unreliable []int32 - for _, piece := range pieces { - if _, ok := state.reliable[piece.NodeId]; !ok { - unreliable = append(unreliable, piece.PieceNum) - } - } - return unreliable, nil + return state, nil } // Refresh refreshes the cache. diff --git a/satellite/repair/priority.go b/satellite/repair/priority.go index 4194483c8..eb8c35116 100644 --- a/satellite/repair/priority.go +++ b/satellite/repair/priority.go @@ -3,144 +3,53 @@ package repair -import ( - "math" -) +import "math" -// SegmentHealth returns a value corresponding to the health of a segment -// in the repair queue. Lower health segments should be repaired first. -func SegmentHealth(numHealthy, minPieces int, failureRate float64) float64 { - return 1.0 / SegmentDanger(numHealthy, minPieces, failureRate) +// SegmentHealth returns a value corresponding to the health of a segment in the +// repair queue. Lower health segments should be repaired first. +// +// This calculation purports to find the number of iterations for which a +// segment can be expected to survive, with the given failureRate. The number of +// iterations for the segment to survive (X) can be modeled with the negative +// binomial distribution, with the number of pieces that must be lost as the +// success threshold r, and the chance of losing a single piece in a round as +// the trial success probability p. +// +// First, we calculate the expected number of iterations for a segment to +// survive if we were to lose exactly one node every iteration: +// +// r = numHealthy - minPieces + 1 +// p = (totalNodes - numHealthy) / totalNodes +// X ~ NB(r, p) +// +// Then we take the mean of that distribution to use as our expected value, +// which is pr/(1-p). +// +// Finally, to get away from the "one node per iteration" simplification, we +// just scale the magnitude of the iterations in the model so that there really +// is one node being lost. For example, if our failureRate and totalNodes imply +// a churn rate of 3 nodes per day, we just take 1/3 of a day and call that an +// "iteration" for purposes of the model. To convert iterations in the model to +// days, we divide the mean of the negative binomial distribution (X, above) by +// the number of nodes that we estimate will churn in one day. +func SegmentHealth(numHealthy, minPieces, totalNodes int, failureRate float64) float64 { + churnPerRound := float64(totalNodes) * failureRate + if churnPerRound < minChurnPerRound { + // we artificially limit churnPerRound from going too low in cases + // where there are not many nodes, so that health values do not + // start to approach the floating point maximum + churnPerRound = minChurnPerRound + } + p := float64(totalNodes-numHealthy) / float64(totalNodes) + if p == 1.0 { + // floating point precision is insufficient to represent the difference + // from p to 1. there are too many nodes for this model, or else + // numHealthy is 0 somehow. we can't proceed with the normal calculation + // or we will divide by zero. + return math.Inf(1) + } + mean1 := float64(numHealthy-minPieces+1) * p / (1 - p) + return mean1 / churnPerRound } -// SegmentDanger returns the chance of a segment with the given minPieces -// and the given number of healthy pieces of being lost in the next time -// period. -// -// It assumes: -// -// * Nodes fail at the given failureRate (i.e., each node has a failureRate -// chance of going offline within the next time period). -// * Node failures are entirely independent. Obviously this is not the case, -// because many nodes may be operated by a single entity or share network -// infrastructure, in which case their failures would be correlated. But we -// can't easily model that, so our best hope is to try to avoid putting -// pieces for the same segment on related nodes to maximize failure -// independence. -// -// (The "time period" we are talking about here could be anything. The returned -// danger value will be given in terms of whatever time period was used to -// determine failureRate. If it simplifies things, you can think of the time -// period as "one repair worker iteration".) -// -// If those things are true, then the number of nodes holding this segment -// that will go offline follows the Binomial distribution: -// -// X ~ Binom(numHealthy, failureRate) -// -// A segment is lost if the number of nodes that go offline is higher than -// (numHealthy - minPieces). So we want to find -// -// Pr[X > (numHealthy - minPieces)] -// -// If we invert the logic here, we can use the standard CDF for the binomial -// distribution. -// -// Pr[X > (numHealthy - minPieces)] = 1 - Pr[X <= (numHealthy - minPieces)] -// -// And that gives us the danger value. -func SegmentDanger(numHealthy, minPieces int, failureRate float64) float64 { - return 1.0 - binomialCDF(float64(numHealthy-minPieces), float64(numHealthy), failureRate) -} - -// math.Lgamma without the returned sign parameter; it's unneeded here. -func lnGamma(x float64) float64 { - lg, _ := math.Lgamma(x) - return lg -} - -// The following functions are based on code from -// Numerical Recipes in C, Second Edition, Section 6.4 (pp. 227-228). - -// betaI calculates the incomplete beta function I_x(a, b). -func betaI(a, b, x float64) float64 { - if x < 0.0 || x > 1.0 { - return math.NaN() - } - bt := 0.0 - if x > 0.0 && x < 1.0 { - // factors in front of the continued function - bt = math.Exp(lnGamma(a+b) - lnGamma(a) - lnGamma(b) + a*math.Log(x) + b*math.Log(1.0-x)) - } - if x < (a+1.0)/(a+b+2.0) { - // use continued fraction directly - return bt * betaCF(a, b, x) / a - } - // use continued fraction after making the symmetry transformation - return 1.0 - bt*betaCF(b, a, 1.0-x)/b -} - -const ( - // unlikely to go this far, as betaCF is expected to converge quickly for - // typical values. - maxIter = 100 - - // betaI outputs will be accurate to within this amount. - epsilon = 1.0e-14 -) - -// betaCF evaluates the continued fraction for the incomplete beta function -// by a modified Lentz's method. -func betaCF(a, b, x float64) float64 { - avoidZero := func(f float64) float64 { - if math.Abs(f) < math.SmallestNonzeroFloat64 { - return math.SmallestNonzeroFloat64 - } - return f - } - - qab := a + b - qap := a + 1.0 - qam := a - 1.0 - c := 1.0 - d := 1.0 / avoidZero(1.0-qab*x/qap) - h := d - - for m := 1; m <= maxIter; m++ { - m := float64(m) - m2 := 2.0 * m - aa := m * (b - m) * x / ((qam + m2) * (a + m2)) - // one step (the even one) of the recurrence - d = 1.0 / avoidZero(1.0+aa*d) - c = avoidZero(1.0 + aa/c) - h *= d * c - aa = -(a + m) * (qab + m) * x / ((a + m2) * (qap + m2)) - // next step of the recurrence (the odd one) - d = 1.0 / avoidZero(1.0+aa*d) - c = avoidZero(1.0 + aa/c) - del := d * c - h *= del - if math.Abs(del-1.0) < epsilon { - return h - } - } - // a or b too big, or maxIter too small - return math.NaN() -} - -// binomialCDF evaluates the CDF of the binomial distribution Binom(n, p) at k. -// This is done using (1-p)**(n-k) when k is 0, or with the incomplete beta -// function otherwise. -func binomialCDF(k, n, p float64) float64 { - k = math.Floor(k) - if k < 0.0 || n < k { - return math.NaN() - } - if k == n { - return 1.0 - } - if k == 0 { - return math.Pow(1.0-p, n-k) - } - return betaI(n-k, k+1.0, 1.0-p) -} +const minChurnPerRound = 1e-10 diff --git a/satellite/repair/priority_test.go b/satellite/repair/priority_test.go index c09b050cb..59959da08 100644 --- a/satellite/repair/priority_test.go +++ b/satellite/repair/priority_test.go @@ -10,59 +10,45 @@ import ( "github.com/stretchr/testify/assert" ) -func TestBetaI(t *testing.T) { - // check a few places where betaI has some easily representable values - assert.Equal(t, 0.0, betaI(0.5, 5, 0)) - assert.Equal(t, 0.0, betaI(1, 3, 0)) - assert.Equal(t, 0.0, betaI(8, 10, 0)) - assert.Equal(t, 0.0, betaI(8, 10, 0)) - assert.InDelta(t, 0.5, betaI(0.5, 0.5, 0.5), epsilon) - assert.InDelta(t, 1.0/3.0, betaI(0.5, 0.5, 0.25), epsilon) - assert.InDelta(t, 0.488, betaI(1, 3, 0.2), epsilon) -} - -func BenchmarkBetaI(b *testing.B) { - for i := 0; i < b.N; i++ { - assert.InDelta(b, 1.0/3.0, betaI(0.5, 0.5, 0.25), epsilon) - } -} - -func TestSegmentDanger(t *testing.T) { - const failureRate = 0.01 - assert.Greater(t, - SegmentDanger(11, 10, failureRate), - SegmentDanger(10, 5, failureRate)) - assert.Greater(t, - SegmentDanger(11, 10, failureRate), - SegmentDanger(10, 9, failureRate)) - assert.Greater(t, - SegmentDanger(10, 10, failureRate), - SegmentDanger(9, 9, failureRate)) - assert.Less(t, - SegmentDanger(11, 10, failureRate), - SegmentDanger(12, 11, failureRate)) -} - func TestSegmentHealth(t *testing.T) { const failureRate = 0.01 assert.Less(t, - SegmentHealth(11, 10, failureRate), - SegmentHealth(10, 5, failureRate)) + SegmentHealth(11, 10, 10000, failureRate), + SegmentHealth(10, 5, 10000, failureRate)) assert.Less(t, - SegmentHealth(11, 10, failureRate), - SegmentHealth(10, 9, failureRate)) + SegmentHealth(11, 10, 10000, failureRate), + SegmentHealth(10, 9, 10000, failureRate)) assert.Less(t, - SegmentHealth(10, 10, failureRate), - SegmentHealth(9, 9, failureRate)) + SegmentHealth(10, 10, 10000, failureRate), + SegmentHealth(9, 9, 10000, failureRate)) assert.Greater(t, - SegmentHealth(11, 10, failureRate), - SegmentHealth(12, 11, failureRate)) + SegmentHealth(11, 10, 10000, failureRate), + SegmentHealth(12, 11, 10000, failureRate)) assert.Greater(t, - SegmentHealth(13, 10, failureRate), - SegmentHealth(12, 10, failureRate)) + SegmentHealth(13, 10, 10000, failureRate), + SegmentHealth(12, 10, 10000, failureRate)) } func TestSegmentHealthForDecayedSegment(t *testing.T) { const failureRate = 0.01 - assert.True(t, math.IsNaN(SegmentHealth(9, 10, failureRate))) + got := SegmentHealth(9, 10, 10000, failureRate) + assert.Equal(t, float64(0), got) +} + +func TestHighHealthAndLowFailureRate(t *testing.T) { + const failureRate = 0.00005435 + assert.Less(t, + SegmentHealth(36, 35, 10000, failureRate), math.Inf(1)) + assert.Greater(t, + SegmentHealth(36, 35, 10000, failureRate), + SegmentHealth(35, 35, 10000, failureRate)) + assert.Less(t, + SegmentHealth(60, 29, 10000, failureRate), math.Inf(1)) + assert.Greater(t, + SegmentHealth(61, 29, 10000, failureRate), + SegmentHealth(60, 29, 10000, failureRate)) + + assert.Greater(t, + SegmentHealth(11, 10, 10000, failureRate), + SegmentHealth(39, 34, 10000, failureRate)) }