satellite/repair: use survivability model for segment health

The chief segment health models we've come up with are the "immediate
danger" model and the "survivability" model. The former calculates the
chance of a segment becoming lost in the next time period (using the
CDF of the binomial distribution to estimate the chance of more nodes
failing in that period than the segment can afford to lose), while the
latter estimates the number of iterations for which a segment can be
expected to survive (using the mean of the negative binomial
distribution). The immediate danger model was a promising way to
compare segment health across segments with different RS parameters,
as it measures precisely the thing we want to prevent, but it turns
out that practically all segments in production have infinite health:
with any reasonable estimate of the node failure rate, the chance of
losing a segment is smaller than DBL_EPSILON, the smallest
representable difference from 1.0 in a float64 (about 1e-16), so the
computed danger rounds to zero and its reciprocal, the health, comes
out as infinity.
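
To see the underflow concretely, here is a small standalone Go sketch
(not part of this change). It computes the binomial tail directly
rather than via the incomplete beta function the old code used, the
numbers (numHealthy=52, minPieces=29, failureRate=0.01) are only
illustrative, and binomTail is a helper written just for this example:

package main

import (
	"fmt"
	"math"
)

// binomTail returns Pr[X >= k] for X ~ Binom(n, p), summing pmf terms
// in log space so the binomial coefficients don't overflow.
func binomTail(k, n int, p float64) float64 {
	total := 0.0
	for i := k; i <= n; i++ {
		lgN, _ := math.Lgamma(float64(n) + 1)
		lgI, _ := math.Lgamma(float64(i) + 1)
		lgNI, _ := math.Lgamma(float64(n-i) + 1)
		logPmf := lgN - lgI - lgNI + float64(i)*math.Log(p) + float64(n-i)*math.Log(1-p)
		total += math.Exp(logPmf)
	}
	return total
}

func main() {
	const numHealthy, minPieces = 52, 29
	const failureRate = 0.01

	// chance of losing more pieces than the segment can afford to lose
	danger := binomTail(numHealthy-minPieces+1, numHealthy, failureRate)
	cdf := 1 - danger       // rounds to exactly 1.0 in float64
	health := 1 / (1 - cdf) // the immediate danger model's health: 1/0 = +Inf
	fmt.Println(danger, cdf, health) // roughly 3.2e-34, 1, +Inf
}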

Leaving aside the wisdom of worrying about the repair of segments that
have less than a 1e-16 chance of being lost, we want to be extremely
conservative and proactive in our repair efforts, and the health of the
segments we have been repairing thus far also evaluates to infinity
under the immediate danger model. Thus, we find ourselves reaching for
an alternative.

Dr. Ben saves the day: the survivability model is a reasonably close
approximation of the immediate danger model, and even better, it is
far simpler to calculate and yields manageable values for real-world
segments. The downside to it is that it requires as input an estimate
of the total number of active nodes.
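
For a sense of scale, a rough worked example (the failure rate matches
the one used in the new tests; the other numbers are made up for
illustration): with totalNodes = 10000, failureRate = 0.00005435,
numHealthy = 52, and minPieces = 29,

    r = 52 - 29 + 1 = 24
    p = (10000 - 52) / 10000 = 0.9948
    mean of NB(r, p) = r*p/(1-p) = 24 * 0.9948 / 0.0052 ≈ 4591
    churnPerRound = 10000 * 0.00005435 = 0.5435 nodes per iteration
    health ≈ 4591 / 0.5435 ≈ 8400

a finite value that can be compared meaningfully across segments, where
the immediate danger model would have yielded infinity.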

This change switches the segment health calculation to the
survivability model and reinstates the call to SegmentHealth() where it
was previously reverted. The estimate of the total number of active
nodes comes from the reliability cache.
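
For reference, the new SegmentHealth signature takes the node count
estimate as its third parameter. A minimal standalone usage sketch
(assuming the repair package's usual import path; the node count is
made up, and the failure rate is the one used in the new tests):

package main

import (
	"fmt"

	"storj.io/storj/satellite/repair"
)

func main() {
	const totalNodes = 10000
	const failureRate = 0.00005435

	// Lower health means the segment should be repaired sooner.
	atRisk := repair.SegmentHealth(36, 35, totalNodes, failureRate)
	comfortable := repair.SegmentHealth(60, 29, totalNodes, failureRate)
	fmt.Println(atRisk < comfortable) // expected: true
}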

Change-Id: Ia5d9b9031b9f6cf0fa7b9005a7011609415527dc
paul cannon 2020-12-07 22:18:00 -06:00 committed by Jennifer Li Johnson
parent 3feee9f4f8
commit d3604a5e90
4 changed files with 164 additions and 211 deletions


@@ -84,6 +84,28 @@ func (checker *Checker) Run(ctx context.Context) (err error) {
 	return group.Wait()
 }
 
+// getNodesEstimate updates the estimate of the total number of nodes. It is guaranteed
+// to return a number greater than 0 when the error is nil.
+//
+// We can't calculate this upon first starting a Checker, because there may not be any
+// nodes yet. We expect that there will be nodes before there are segments, though.
+func (checker *Checker) getNodesEstimate(ctx context.Context) (int, error) {
+	// this should be safe to call frequently; it is an efficient caching lookup.
+	totalNumNodes, err := checker.nodestate.NumNodes(ctx)
+	if err != nil {
+		// We could proceed here by returning the last good value, or by returning a fallback
+		// constant estimate, like "20000", and we'd probably be fine, but it would be better
+		// not to have that happen silently for too long. Also, if we can't get this from the
+		// database, we probably can't modify the injured segments queue, so it won't help to
+		// proceed with this repair operation.
+		return 0, err
+	}
+	if totalNumNodes == 0 {
+		return 0, Error.New("segment health is meaningless: there are no nodes")
+	}
+	return totalNumNodes, nil
+}
+
 // RefreshReliabilityCache forces refreshing node online status cache.
 func (checker *Checker) RefreshReliabilityCache(ctx context.Context) error {
 	return checker.nodestate.Refresh(ctx)
@@ -109,6 +131,7 @@ func (checker *Checker) IdentifyInjuredSegments(ctx context.Context) (err error)
 		monStats:         aggregateStats{},
 		repairOverrides:  checker.repairOverrides,
 		nodeFailureRate:  checker.nodeFailureRate,
+		getNodesEstimate: checker.getNodesEstimate,
 		log:              checker.logger,
 	}
 	err = checker.metaLoop.Join(ctx, observer)
@@ -187,6 +210,11 @@ func (checker *Checker) updateIrreparableSegmentStatus(ctx context.Context, poin
 		repairThreshold = overrideValue
 	}
 
+	totalNumNodes, err := checker.getNodesEstimate(ctx)
+	if err != nil {
+		return Error.New("could not get estimate of total number of nodes: %w", err)
+	}
+
 	// we repair when the number of healthy pieces is less than or equal to the repair threshold and is greater or equal to
 	// minimum required pieces in redundancy
 	// except for the case when the repair and success thresholds are the same (a case usually seen during testing)
@@ -194,7 +222,7 @@ func (checker *Checker) updateIrreparableSegmentStatus(ctx context.Context, poin
 	// If the segment is suddenly entirely healthy again, we don't need to repair and we don't need to
 	// keep it in the irreparabledb queue either.
 	if numHealthy >= redundancy.MinReq && numHealthy <= repairThreshold && numHealthy < redundancy.SuccessThreshold {
-		segmentHealth := float64(numHealthy)
+		segmentHealth := repair.SegmentHealth(int(numHealthy), int(redundancy.MinReq), totalNumNodes, checker.nodeFailureRate)
 		_, err = checker.repairQueue.Insert(ctx, &internalpb.InjuredSegment{
 			Path:       key,
 			LostPieces: missingPieces,
@@ -247,6 +275,7 @@ type checkerObserver struct {
 	monStats         aggregateStats // TODO(cam): once we verify statsCollector reports data correctly, remove this
 	repairOverrides  RepairOverridesMap
 	nodeFailureRate  float64
+	getNodesEstimate func(ctx context.Context) (int, error)
 	log              *zap.Logger
 }
@@ -295,6 +324,11 @@ func (obs *checkerObserver) RemoteSegment(ctx context.Context, segment *metainfo
 		}
 	}
 
+	totalNumNodes, err := obs.getNodesEstimate(ctx)
+	if err != nil {
+		return Error.New("could not get estimate of total number of nodes: %w", err)
+	}
+
 	// TODO: update MissingPieces to accept metabase.Pieces
 	missingPieces, err := obs.nodestate.MissingPieces(ctx, segment.CreationDate, pbPieces)
 	if err != nil {
@@ -315,7 +349,7 @@ func (obs *checkerObserver) RemoteSegment(ctx context.Context, segment *metainfo
 	required, repairThreshold, successThreshold, _ := obs.loadRedundancy(segment.Redundancy)
-	segmentHealth := repair.SegmentHealth(numHealthy, required, obs.nodeFailureRate)
+	segmentHealth := repair.SegmentHealth(numHealthy, required, totalNumNodes, obs.nodeFailureRate)
 	mon.FloatVal("checker_segment_health").Observe(segmentHealth) //mon:locked
 	stats.segmentHealth.Observe(segmentHealth)


@@ -39,7 +39,8 @@ func NewReliabilityCache(overlay *overlay.Service, staleness time.Duration) *Rel
 	}
 }
 
-// LastUpdate returns when the cache was last updated.
+// LastUpdate returns when the cache was last updated, or the zero value (time.Time{}) if it
+// has never yet been updated. LastUpdate() does not trigger an update itself.
 func (cache *ReliabilityCache) LastUpdate() time.Time {
 	if state, ok := cache.state.Load().(*reliabilityState); ok {
 		return state.created
@@ -47,10 +48,40 @@ func (cache *ReliabilityCache) LastUpdate() time.Time {
 	return time.Time{}
 }
 
+// NumNodes returns the number of online active nodes (as determined by the reliability cache).
+// This number is not guaranteed to be consistent with either the nodes database or the
+// reliability cache after returning; it is just a best-effort count and should be treated as an
+// estimate.
+func (cache *ReliabilityCache) NumNodes(ctx context.Context) (numNodes int, err error) {
+	defer mon.Task()(&ctx)(&err)
+
+	state, err := cache.loadFast(ctx, time.Time{})
+	if err != nil {
+		return 0, err
+	}
+	return len(state.reliable), nil
+}
+
 // MissingPieces returns piece indices that are unreliable with the given staleness period.
 func (cache *ReliabilityCache) MissingPieces(ctx context.Context, created time.Time, pieces []*pb.RemotePiece) (_ []int32, err error) {
 	defer mon.Task()(&ctx)(&err)
+
+	state, err := cache.loadFast(ctx, created)
+	if err != nil {
+		return nil, err
+	}
+	var unreliable []int32
+	for _, piece := range pieces {
+		if _, ok := state.reliable[piece.NodeId]; !ok {
+			unreliable = append(unreliable, piece.PieceNum)
+		}
+	}
+	return unreliable, nil
+}
+
+func (cache *ReliabilityCache) loadFast(ctx context.Context, validUpTo time.Time) (_ *reliabilityState, err error) {
+	defer mon.Task()(&ctx)(&err)
+
 	// This code is designed to be very fast in the case where a refresh is not needed: just an
 	// atomic load from rarely written to bit of shared memory. The general strategy is to first
 	// read if the state suffices to answer the query. If not (due to it not existing, being
@@ -60,10 +91,10 @@ func (cache *ReliabilityCache) MissingPieces(ctx context.Context, created time.T
 	// the acquisition. Only then do we refresh and can then proceed answering the query.
 
 	state, ok := cache.state.Load().(*reliabilityState)
-	if !ok || created.After(state.created) || time.Since(state.created) > cache.staleness {
+	if !ok || validUpTo.After(state.created) || time.Since(state.created) > cache.staleness {
 		cache.mu.Lock()
 		state, ok = cache.state.Load().(*reliabilityState)
-		if !ok || created.After(state.created) || time.Since(state.created) > cache.staleness {
+		if !ok || validUpTo.After(state.created) || time.Since(state.created) > cache.staleness {
 			state, err = cache.refreshLocked(ctx)
 		}
 		cache.mu.Unlock()
@@ -71,14 +102,7 @@ func (cache *ReliabilityCache) MissingPieces(ctx context.Context, created time.T
 			return nil, err
 		}
 	}
-
-	var unreliable []int32
-	for _, piece := range pieces {
-		if _, ok := state.reliable[piece.NodeId]; !ok {
-			unreliable = append(unreliable, piece.PieceNum)
-		}
-	}
-	return unreliable, nil
+	return state, nil
 }
 
 // Refresh refreshes the cache.


@@ -3,144 +3,53 @@
 package repair
 
-import (
-	"math"
-)
+import "math"
 
-// SegmentHealth returns a value corresponding to the health of a segment
-// in the repair queue. Lower health segments should be repaired first.
-func SegmentHealth(numHealthy, minPieces int, failureRate float64) float64 {
-	return 1.0 / SegmentDanger(numHealthy, minPieces, failureRate)
-}
-
-// SegmentDanger returns the chance of a segment with the given minPieces
-// and the given number of healthy pieces of being lost in the next time
-// period.
-//
-// It assumes:
-//
-// * Nodes fail at the given failureRate (i.e., each node has a failureRate
-//   chance of going offline within the next time period).
-// * Node failures are entirely independent. Obviously this is not the case,
-//   because many nodes may be operated by a single entity or share network
-//   infrastructure, in which case their failures would be correlated. But we
-//   can't easily model that, so our best hope is to try to avoid putting
-//   pieces for the same segment on related nodes to maximize failure
-//   independence.
-//
-// (The "time period" we are talking about here could be anything. The returned
-// danger value will be given in terms of whatever time period was used to
-// determine failureRate. If it simplifies things, you can think of the time
-// period as "one repair worker iteration".)
-//
-// If those things are true, then the number of nodes holding this segment
-// that will go offline follows the Binomial distribution:
-//
-//     X ~ Binom(numHealthy, failureRate)
-//
-// A segment is lost if the number of nodes that go offline is higher than
-// (numHealthy - minPieces). So we want to find
-//
-//     Pr[X > (numHealthy - minPieces)]
-//
-// If we invert the logic here, we can use the standard CDF for the binomial
-// distribution.
-//
-//     Pr[X > (numHealthy - minPieces)] = 1 - Pr[X <= (numHealthy - minPieces)]
-//
-// And that gives us the danger value.
-func SegmentDanger(numHealthy, minPieces int, failureRate float64) float64 {
-	return 1.0 - binomialCDF(float64(numHealthy-minPieces), float64(numHealthy), failureRate)
-}
-
-// math.Lgamma without the returned sign parameter; it's unneeded here.
-func lnGamma(x float64) float64 {
-	lg, _ := math.Lgamma(x)
-	return lg
-}
-
-// The following functions are based on code from
-// Numerical Recipes in C, Second Edition, Section 6.4 (pp. 227-228).
-
-// betaI calculates the incomplete beta function I_x(a, b).
-func betaI(a, b, x float64) float64 {
-	if x < 0.0 || x > 1.0 {
-		return math.NaN()
-	}
-	bt := 0.0
-	if x > 0.0 && x < 1.0 {
-		// factors in front of the continued function
-		bt = math.Exp(lnGamma(a+b) - lnGamma(a) - lnGamma(b) + a*math.Log(x) + b*math.Log(1.0-x))
-	}
-	if x < (a+1.0)/(a+b+2.0) {
-		// use continued fraction directly
-		return bt * betaCF(a, b, x) / a
-	}
-	// use continued fraction after making the symmetry transformation
-	return 1.0 - bt*betaCF(b, a, 1.0-x)/b
-}
-
-const (
-	// unlikely to go this far, as betaCF is expected to converge quickly for
-	// typical values.
-	maxIter = 100
-	// betaI outputs will be accurate to within this amount.
-	epsilon = 1.0e-14
-)
-
-// betaCF evaluates the continued fraction for the incomplete beta function
-// by a modified Lentz's method.
-func betaCF(a, b, x float64) float64 {
-	avoidZero := func(f float64) float64 {
-		if math.Abs(f) < math.SmallestNonzeroFloat64 {
-			return math.SmallestNonzeroFloat64
-		}
-		return f
-	}
-	qab := a + b
-	qap := a + 1.0
-	qam := a - 1.0
-	c := 1.0
-	d := 1.0 / avoidZero(1.0-qab*x/qap)
-	h := d
-	for m := 1; m <= maxIter; m++ {
-		m := float64(m)
-		m2 := 2.0 * m
-		aa := m * (b - m) * x / ((qam + m2) * (a + m2))
-		// one step (the even one) of the recurrence
-		d = 1.0 / avoidZero(1.0+aa*d)
-		c = avoidZero(1.0 + aa/c)
-		h *= d * c
-		aa = -(a + m) * (qab + m) * x / ((a + m2) * (qap + m2))
-		// next step of the recurrence (the odd one)
-		d = 1.0 / avoidZero(1.0+aa*d)
-		c = avoidZero(1.0 + aa/c)
-		del := d * c
-		h *= del
-		if math.Abs(del-1.0) < epsilon {
-			return h
-		}
-	}
-	// a or b too big, or maxIter too small
-	return math.NaN()
-}
-
-// binomialCDF evaluates the CDF of the binomial distribution Binom(n, p) at k.
-// This is done using (1-p)**(n-k) when k is 0, or with the incomplete beta
-// function otherwise.
-func binomialCDF(k, n, p float64) float64 {
-	k = math.Floor(k)
-	if k < 0.0 || n < k {
-		return math.NaN()
-	}
-	if k == n {
-		return 1.0
-	}
-	if k == 0 {
-		return math.Pow(1.0-p, n-k)
-	}
-	return betaI(n-k, k+1.0, 1.0-p)
-}
+// SegmentHealth returns a value corresponding to the health of a segment in the
+// repair queue. Lower health segments should be repaired first.
+//
+// This calculation purports to find the number of iterations for which a
+// segment can be expected to survive, with the given failureRate. The number of
+// iterations for the segment to survive (X) can be modeled with the negative
+// binomial distribution, with the number of pieces that must be lost as the
+// success threshold r, and the chance of losing a single piece in a round as
+// the trial success probability p.
+//
+// First, we calculate the expected number of iterations for a segment to
+// survive if we were to lose exactly one node every iteration:
+//
+//     r = numHealthy - minPieces + 1
+//     p = (totalNodes - numHealthy) / totalNodes
+//     X ~ NB(r, p)
+//
+// Then we take the mean of that distribution to use as our expected value,
+// which is pr/(1-p).
+//
+// Finally, to get away from the "one node per iteration" simplification, we
+// just scale the magnitude of the iterations in the model so that there really
+// is one node being lost. For example, if our failureRate and totalNodes imply
+// a churn rate of 3 nodes per day, we just take 1/3 of a day and call that an
+// "iteration" for purposes of the model. To convert iterations in the model to
+// days, we divide the mean of the negative binomial distribution (X, above) by
+// the number of nodes that we estimate will churn in one day.
+func SegmentHealth(numHealthy, minPieces, totalNodes int, failureRate float64) float64 {
+	churnPerRound := float64(totalNodes) * failureRate
+	if churnPerRound < minChurnPerRound {
+		// we artificially limit churnPerRound from going too low in cases
+		// where there are not many nodes, so that health values do not
+		// start to approach the floating point maximum
+		churnPerRound = minChurnPerRound
+	}
+	p := float64(totalNodes-numHealthy) / float64(totalNodes)
+	if p == 1.0 {
+		// floating point precision is insufficient to represent the difference
+		// from p to 1. there are too many nodes for this model, or else
+		// numHealthy is 0 somehow. we can't proceed with the normal calculation
+		// or we will divide by zero.
+		return math.Inf(1)
+	}
+	mean1 := float64(numHealthy-minPieces+1) * p / (1 - p)
+	return mean1 / churnPerRound
+}
+
+const minChurnPerRound = 1e-10


@@ -10,59 +10,45 @@ import (
 	"github.com/stretchr/testify/assert"
 )
 
-func TestBetaI(t *testing.T) {
-	// check a few places where betaI has some easily representable values
-	assert.Equal(t, 0.0, betaI(0.5, 5, 0))
-	assert.Equal(t, 0.0, betaI(1, 3, 0))
-	assert.Equal(t, 0.0, betaI(8, 10, 0))
-	assert.Equal(t, 0.0, betaI(8, 10, 0))
-	assert.InDelta(t, 0.5, betaI(0.5, 0.5, 0.5), epsilon)
-	assert.InDelta(t, 1.0/3.0, betaI(0.5, 0.5, 0.25), epsilon)
-	assert.InDelta(t, 0.488, betaI(1, 3, 0.2), epsilon)
-}
-
-func BenchmarkBetaI(b *testing.B) {
-	for i := 0; i < b.N; i++ {
-		assert.InDelta(b, 1.0/3.0, betaI(0.5, 0.5, 0.25), epsilon)
-	}
-}
-
-func TestSegmentDanger(t *testing.T) {
-	const failureRate = 0.01
-	assert.Greater(t,
-		SegmentDanger(11, 10, failureRate),
-		SegmentDanger(10, 5, failureRate))
-	assert.Greater(t,
-		SegmentDanger(11, 10, failureRate),
-		SegmentDanger(10, 9, failureRate))
-	assert.Greater(t,
-		SegmentDanger(10, 10, failureRate),
-		SegmentDanger(9, 9, failureRate))
-	assert.Less(t,
-		SegmentDanger(11, 10, failureRate),
-		SegmentDanger(12, 11, failureRate))
-}
-
 func TestSegmentHealth(t *testing.T) {
 	const failureRate = 0.01
 	assert.Less(t,
-		SegmentHealth(11, 10, failureRate),
-		SegmentHealth(10, 5, failureRate))
+		SegmentHealth(11, 10, 10000, failureRate),
+		SegmentHealth(10, 5, 10000, failureRate))
 	assert.Less(t,
-		SegmentHealth(11, 10, failureRate),
-		SegmentHealth(10, 9, failureRate))
+		SegmentHealth(11, 10, 10000, failureRate),
+		SegmentHealth(10, 9, 10000, failureRate))
 	assert.Less(t,
-		SegmentHealth(10, 10, failureRate),
-		SegmentHealth(9, 9, failureRate))
+		SegmentHealth(10, 10, 10000, failureRate),
+		SegmentHealth(9, 9, 10000, failureRate))
 	assert.Greater(t,
-		SegmentHealth(11, 10, failureRate),
-		SegmentHealth(12, 11, failureRate))
+		SegmentHealth(11, 10, 10000, failureRate),
+		SegmentHealth(12, 11, 10000, failureRate))
 	assert.Greater(t,
-		SegmentHealth(13, 10, failureRate),
-		SegmentHealth(12, 10, failureRate))
+		SegmentHealth(13, 10, 10000, failureRate),
+		SegmentHealth(12, 10, 10000, failureRate))
 }
 
 func TestSegmentHealthForDecayedSegment(t *testing.T) {
 	const failureRate = 0.01
-	assert.True(t, math.IsNaN(SegmentHealth(9, 10, failureRate)))
+	got := SegmentHealth(9, 10, 10000, failureRate)
+	assert.Equal(t, float64(0), got)
+}
+
+func TestHighHealthAndLowFailureRate(t *testing.T) {
+	const failureRate = 0.00005435
+	assert.Less(t,
+		SegmentHealth(36, 35, 10000, failureRate), math.Inf(1))
+	assert.Greater(t,
+		SegmentHealth(36, 35, 10000, failureRate),
+		SegmentHealth(35, 35, 10000, failureRate))
+	assert.Less(t,
+		SegmentHealth(60, 29, 10000, failureRate), math.Inf(1))
+	assert.Greater(t,
+		SegmentHealth(61, 29, 10000, failureRate),
+		SegmentHealth(60, 29, 10000, failureRate))
+	assert.Greater(t,
+		SegmentHealth(11, 10, 10000, failureRate),
+		SegmentHealth(39, 34, 10000, failureRate))
 }