diff --git a/satellite/repair/checker/checker.go b/satellite/repair/checker/checker.go
index f1c2a4f8d..2f99527f4 100644
--- a/satellite/repair/checker/checker.go
+++ b/satellite/repair/checker/checker.go
@@ -84,6 +84,28 @@ func (checker *Checker) Run(ctx context.Context) (err error) {
     return group.Wait()
 }
 
+// getNodesEstimate updates the estimate of the total number of nodes. It is guaranteed
+// to return a number greater than 0 when the error is nil.
+//
+// We can't calculate this upon first starting a Checker, because there may not be any
+// nodes yet. We expect that there will be nodes before there are segments, though.
+func (checker *Checker) getNodesEstimate(ctx context.Context) (int, error) {
+    // this should be safe to call frequently; it is an efficient caching lookup.
+    totalNumNodes, err := checker.nodestate.NumNodes(ctx)
+    if err != nil {
+        // We could proceed here by returning the last good value, or by returning a fallback
+        // constant estimate, like "20000", and we'd probably be fine, but it would be better
+        // not to have that happen silently for too long. Also, if we can't get this from the
+        // database, we probably can't modify the injured segments queue, so it won't help to
+        // proceed with this repair operation.
+        return 0, err
+    }
+    if totalNumNodes == 0 {
+        return 0, Error.New("segment health is meaningless: there are no nodes")
+    }
+    return totalNumNodes, nil
+}
+
 // RefreshReliabilityCache forces refreshing node online status cache.
 func (checker *Checker) RefreshReliabilityCache(ctx context.Context) error {
     return checker.nodestate.Refresh(ctx)
@@ -102,14 +124,15 @@ func (checker *Checker) IdentifyInjuredSegments(ctx context.Context) (err error)
     startTime := time.Now()
 
     observer := &checkerObserver{
-        repairQueue:     checker.repairQueue,
-        irrdb:           checker.irrdb,
-        nodestate:       checker.nodestate,
-        statsCollector:  checker.statsCollector,
-        monStats:        aggregateStats{},
-        repairOverrides: checker.repairOverrides,
-        nodeFailureRate: checker.nodeFailureRate,
-        log:             checker.logger,
+        repairQueue:      checker.repairQueue,
+        irrdb:            checker.irrdb,
+        nodestate:        checker.nodestate,
+        statsCollector:   checker.statsCollector,
+        monStats:         aggregateStats{},
+        repairOverrides:  checker.repairOverrides,
+        nodeFailureRate:  checker.nodeFailureRate,
+        getNodesEstimate: checker.getNodesEstimate,
+        log:              checker.logger,
     }
     err = checker.metaLoop.Join(ctx, observer)
     if err != nil {
@@ -187,6 +210,11 @@ func (checker *Checker) updateIrreparableSegmentStatus(ctx context.Context, poin
         repairThreshold = overrideValue
     }
 
+    totalNumNodes, err := checker.getNodesEstimate(ctx)
+    if err != nil {
+        return Error.New("could not get estimate of total number of nodes: %w", err)
+    }
+
     // we repair when the number of healthy pieces is less than or equal to the repair threshold and is greater or equal to
     // minimum required pieces in redundancy
     // except for the case when the repair and success thresholds are the same (a case usually seen during testing)
@@ -194,7 +222,7 @@ func (checker *Checker) updateIrreparableSegmentStatus(ctx context.Context, poin
     // If the segment is suddenly entirely healthy again, we don't need to repair and we don't need to
     // keep it in the irreparabledb queue either.
     if numHealthy >= redundancy.MinReq && numHealthy <= repairThreshold && numHealthy < redundancy.SuccessThreshold {
-        segmentHealth := float64(numHealthy)
+        segmentHealth := repair.SegmentHealth(int(numHealthy), int(redundancy.MinReq), totalNumNodes, checker.nodeFailureRate)
         _, err = checker.repairQueue.Insert(ctx, &internalpb.InjuredSegment{
             Path:       key,
             LostPieces: missingPieces,
@@ -240,14 +268,15 @@ var _ metainfo.Observer = (*checkerObserver)(nil)
 //
 // architecture: Observer
 type checkerObserver struct {
-    repairQueue     queue.RepairQueue
-    irrdb           irreparable.DB
-    nodestate       *ReliabilityCache
-    statsCollector  *statsCollector
-    monStats        aggregateStats // TODO(cam): once we verify statsCollector reports data correctly, remove this
-    repairOverrides RepairOverridesMap
-    nodeFailureRate float64
-    log             *zap.Logger
+    repairQueue      queue.RepairQueue
+    irrdb            irreparable.DB
+    nodestate        *ReliabilityCache
+    statsCollector   *statsCollector
+    monStats         aggregateStats // TODO(cam): once we verify statsCollector reports data correctly, remove this
+    repairOverrides  RepairOverridesMap
+    nodeFailureRate  float64
+    getNodesEstimate func(ctx context.Context) (int, error)
+    log              *zap.Logger
 }
 
 func (obs *checkerObserver) getStatsByRS(redundancy storj.RedundancyScheme) *stats {
@@ -295,6 +324,11 @@ func (obs *checkerObserver) RemoteSegment(ctx context.Context, segment *metainfo
         }
     }
 
+    totalNumNodes, err := obs.getNodesEstimate(ctx)
+    if err != nil {
+        return Error.New("could not get estimate of total number of nodes: %w", err)
+    }
+
     // TODO: update MissingPieces to accept metabase.Pieces
     missingPieces, err := obs.nodestate.MissingPieces(ctx, segment.CreationDate, pbPieces)
     if err != nil {
@@ -315,7 +349,7 @@ func (obs *checkerObserver) RemoteSegment(ctx context.Context, segment *metainfo
 
     required, repairThreshold, successThreshold, _ := obs.loadRedundancy(segment.Redundancy)
 
-    segmentHealth := repair.SegmentHealth(numHealthy, required, obs.nodeFailureRate)
+    segmentHealth := repair.SegmentHealth(numHealthy, required, totalNumNodes, obs.nodeFailureRate)
     mon.FloatVal("checker_segment_health").Observe(segmentHealth) //mon:locked
     stats.segmentHealth.Observe(segmentHealth)
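
The checker changes above reduce to one pattern: ask the reliability cache for a total-node estimate, refuse to score segments when that estimate is unavailable or zero, and pass the estimate through to repair.SegmentHealth. The standalone sketch below illustrates that flow; nodeCounter, staticCounter, and segmentHealthFor are hypothetical stand-ins for the satellite's own types, and only repair.SegmentHealth (with the new signature introduced by this patch) is real. The 10000-node count and failure rate are arbitrary example values.

    package main

    import (
        "context"
        "errors"
        "fmt"

        "storj.io/storj/satellite/repair"
    )

    // nodeCounter stands in for the checker's reliability cache; only NumNodes is needed here.
    type nodeCounter interface {
        NumNodes(ctx context.Context) (int, error)
    }

    // staticCounter is a toy node counter that always reports the same total.
    type staticCounter int

    func (s staticCounter) NumNodes(ctx context.Context) (int, error) { return int(s), nil }

    // segmentHealthFor mirrors the flow added to the checker: get the node estimate,
    // fail loudly if it is missing or zero, then delegate to repair.SegmentHealth.
    func segmentHealthFor(ctx context.Context, nodes nodeCounter, numHealthy, required int, failureRate float64) (float64, error) {
        totalNumNodes, err := nodes.NumNodes(ctx)
        if err != nil {
            return 0, fmt.Errorf("could not get estimate of total number of nodes: %w", err)
        }
        if totalNumNodes == 0 {
            return 0, errors.New("segment health is meaningless: there are no nodes")
        }
        return repair.SegmentHealth(numHealthy, required, totalNumNodes, failureRate), nil
    }

    func main() {
        health, err := segmentHealthFor(context.Background(), staticCounter(10000), 36, 29, 0.00005435)
        if err != nil {
            panic(err)
        }
        fmt.Println(health)
    }
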
diff --git a/satellite/repair/checker/online.go b/satellite/repair/checker/online.go
index 050265934..f8d0ea4eb 100644
--- a/satellite/repair/checker/online.go
+++ b/satellite/repair/checker/online.go
@@ -39,7 +39,8 @@ func NewReliabilityCache(overlay *overlay.Service, staleness time.Duration) *Rel
     }
 }
 
-// LastUpdate returns when the cache was last updated.
+// LastUpdate returns when the cache was last updated, or the zero value (time.Time{}) if it
+// has never yet been updated. LastUpdate() does not trigger an update itself.
 func (cache *ReliabilityCache) LastUpdate() time.Time {
     if state, ok := cache.state.Load().(*reliabilityState); ok {
         return state.created
@@ -47,10 +48,40 @@ func (cache *ReliabilityCache) LastUpdate() time.Time {
     return time.Time{}
 }
 
+// NumNodes returns the number of online active nodes (as determined by the reliability cache).
+// This number is not guaranteed to be consistent with either the nodes database or the
+// reliability cache after returning; it is just a best-effort count and should be treated as an
+// estimate.
+func (cache *ReliabilityCache) NumNodes(ctx context.Context) (numNodes int, err error) {
+    defer mon.Task()(&ctx)(&err)
+
+    state, err := cache.loadFast(ctx, time.Time{})
+    if err != nil {
+        return 0, err
+    }
+    return len(state.reliable), nil
+}
+
 // MissingPieces returns piece indices that are unreliable with the given staleness period.
 func (cache *ReliabilityCache) MissingPieces(ctx context.Context, created time.Time, pieces []*pb.RemotePiece) (_ []int32, err error) {
     defer mon.Task()(&ctx)(&err)
 
+    state, err := cache.loadFast(ctx, created)
+    if err != nil {
+        return nil, err
+    }
+    var unreliable []int32
+    for _, piece := range pieces {
+        if _, ok := state.reliable[piece.NodeId]; !ok {
+            unreliable = append(unreliable, piece.PieceNum)
+        }
+    }
+    return unreliable, nil
+}
+
+func (cache *ReliabilityCache) loadFast(ctx context.Context, validUpTo time.Time) (_ *reliabilityState, err error) {
+    defer mon.Task()(&ctx)(&err)
+
     // This code is designed to be very fast in the case where a refresh is not needed: just an
     // atomic load from rarely written to bit of shared memory. The general strategy is to first
     // read if the state suffices to answer the query. If not (due to it not existing, being
@@ -60,10 +91,10 @@ func (cache *ReliabilityCache) MissingPieces(ctx context.Context, created time.T
     // the acquisition. Only then do we refresh and can then proceed answering the query.
 
     state, ok := cache.state.Load().(*reliabilityState)
-    if !ok || created.After(state.created) || time.Since(state.created) > cache.staleness {
+    if !ok || validUpTo.After(state.created) || time.Since(state.created) > cache.staleness {
         cache.mu.Lock()
         state, ok = cache.state.Load().(*reliabilityState)
-        if !ok || created.After(state.created) || time.Since(state.created) > cache.staleness {
+        if !ok || validUpTo.After(state.created) || time.Since(state.created) > cache.staleness {
             state, err = cache.refreshLocked(ctx)
         }
         cache.mu.Unlock()
@@ -71,14 +102,7 @@ func (cache *ReliabilityCache) MissingPieces(ctx context.Context, created time.T
             return nil, err
         }
     }
-
-    var unreliable []int32
-    for _, piece := range pieces {
-        if _, ok := state.reliable[piece.NodeId]; !ok {
-            unreliable = append(unreliable, piece.PieceNum)
-        }
-    }
-    return unreliable, nil
+    return state, nil
 }
 
 // Refresh refreshes the cache.
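
The loadFast extraction keeps the cache's existing double-checked strategy: an atomic load on the fast path, then a mutex plus a second staleness check before refreshing, so NumNodes and MissingPieces now share one code path. Below is a stripped-down sketch of that pattern with hypothetical snapshot and cachedValue types and no error handling; it is not the satellite code itself, just the shape of the technique.

    package main

    import (
        "fmt"
        "sync"
        "sync/atomic"
        "time"
    )

    // snapshot plays the role of reliabilityState: an immutable view plus its creation time.
    type snapshot struct {
        created time.Time
        value   int
    }

    type cachedValue struct {
        mu        sync.Mutex
        state     atomic.Value // holds *snapshot
        staleness time.Duration
    }

    // loadFast returns a snapshot that is at least as new as validUpTo and within the
    // staleness window, refreshing under the mutex only when the atomic load is not enough.
    func (c *cachedValue) loadFast(validUpTo time.Time, refresh func() *snapshot) *snapshot {
        state, ok := c.state.Load().(*snapshot)
        if !ok || validUpTo.After(state.created) || time.Since(state.created) > c.staleness {
            c.mu.Lock()
            // double-check: another goroutine may have refreshed while we waited for the lock.
            state, ok = c.state.Load().(*snapshot)
            if !ok || validUpTo.After(state.created) || time.Since(state.created) > c.staleness {
                state = refresh()
                c.state.Store(state)
            }
            c.mu.Unlock()
        }
        return state
    }

    func main() {
        c := &cachedValue{staleness: time.Minute}
        refresh := func() *snapshot { return &snapshot{created: time.Now(), value: 42} }
        fmt.Println(c.loadFast(time.Time{}, refresh).value) // first call refreshes
        fmt.Println(c.loadFast(time.Time{}, refresh).value) // second call hits the fast path
    }
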
diff --git a/satellite/repair/priority.go b/satellite/repair/priority.go
index 4194483c8..eb8c35116 100644
--- a/satellite/repair/priority.go
+++ b/satellite/repair/priority.go
@@ -3,144 +3,53 @@
 
 package repair
 
-import (
-    "math"
-)
+import "math"
 
-// SegmentHealth returns a value corresponding to the health of a segment
-// in the repair queue. Lower health segments should be repaired first.
-func SegmentHealth(numHealthy, minPieces int, failureRate float64) float64 {
-    return 1.0 / SegmentDanger(numHealthy, minPieces, failureRate)
+// SegmentHealth returns a value corresponding to the health of a segment in the
+// repair queue. Lower health segments should be repaired first.
+//
+// This calculation purports to find the number of iterations for which a
+// segment can be expected to survive, with the given failureRate. The number of
+// iterations for the segment to survive (X) can be modeled with the negative
+// binomial distribution, with the number of pieces that must be lost as the
+// success threshold r, and the chance of losing a single piece in a round as
+// the trial success probability p.
+//
+// First, we calculate the expected number of iterations for a segment to
+// survive if we were to lose exactly one node every iteration:
+//
+//   r = numHealthy - minPieces + 1
+//   p = (totalNodes - numHealthy) / totalNodes
+//   X ~ NB(r, p)
+//
+// Then we take the mean of that distribution to use as our expected value,
+// which is pr/(1-p).
+//
+// Finally, to get away from the "one node per iteration" simplification, we
+// just scale the magnitude of the iterations in the model so that there really
+// is one node being lost. For example, if our failureRate and totalNodes imply
+// a churn rate of 3 nodes per day, we just take 1/3 of a day and call that an
+// "iteration" for purposes of the model. To convert iterations in the model to
+// days, we divide the mean of the negative binomial distribution (X, above) by
+// the number of nodes that we estimate will churn in one day.
+func SegmentHealth(numHealthy, minPieces, totalNodes int, failureRate float64) float64 {
+    churnPerRound := float64(totalNodes) * failureRate
+    if churnPerRound < minChurnPerRound {
+        // we artificially limit churnPerRound from going too low in cases
+        // where there are not many nodes, so that health values do not
+        // start to approach the floating point maximum
+        churnPerRound = minChurnPerRound
+    }
+    p := float64(totalNodes-numHealthy) / float64(totalNodes)
+    if p == 1.0 {
+        // floating point precision is insufficient to represent the difference
+        // from p to 1. there are too many nodes for this model, or else
+        // numHealthy is 0 somehow. we can't proceed with the normal calculation
+        // or we will divide by zero.
+        return math.Inf(1)
+    }
+    mean1 := float64(numHealthy-minPieces+1) * p / (1 - p)
+    return mean1 / churnPerRound
 }
 
-// SegmentDanger returns the chance of a segment with the given minPieces
-// and the given number of healthy pieces of being lost in the next time
-// period.
-//
-// It assumes:
-//
-// * Nodes fail at the given failureRate (i.e., each node has a failureRate
-//   chance of going offline within the next time period).
-// * Node failures are entirely independent. Obviously this is not the case,
-//   because many nodes may be operated by a single entity or share network
-//   infrastructure, in which case their failures would be correlated. But we
-//   can't easily model that, so our best hope is to try to avoid putting
-//   pieces for the same segment on related nodes to maximize failure
-//   independence.
-//
-// (The "time period" we are talking about here could be anything. The returned
-// danger value will be given in terms of whatever time period was used to
-// determine failureRate. If it simplifies things, you can think of the time
-// period as "one repair worker iteration".)
-//
-// If those things are true, then the number of nodes holding this segment
-// that will go offline follows the Binomial distribution:
-//
-//   X ~ Binom(numHealthy, failureRate)
-//
-// A segment is lost if the number of nodes that go offline is higher than
-// (numHealthy - minPieces). So we want to find
-//
-//   Pr[X > (numHealthy - minPieces)]
-//
-// If we invert the logic here, we can use the standard CDF for the binomial
-// distribution.
-//
-//   Pr[X > (numHealthy - minPieces)] = 1 - Pr[X <= (numHealthy - minPieces)]
-//
-// And that gives us the danger value.
-func SegmentDanger(numHealthy, minPieces int, failureRate float64) float64 {
-    return 1.0 - binomialCDF(float64(numHealthy-minPieces), float64(numHealthy), failureRate)
-}
-
-// math.Lgamma without the returned sign parameter; it's unneeded here.
-func lnGamma(x float64) float64 {
-    lg, _ := math.Lgamma(x)
-    return lg
-}
-
-// The following functions are based on code from
-// Numerical Recipes in C, Second Edition, Section 6.4 (pp. 227-228).
-
-// betaI calculates the incomplete beta function I_x(a, b).
-func betaI(a, b, x float64) float64 {
-    if x < 0.0 || x > 1.0 {
-        return math.NaN()
-    }
-    bt := 0.0
-    if x > 0.0 && x < 1.0 {
-        // factors in front of the continued function
-        bt = math.Exp(lnGamma(a+b) - lnGamma(a) - lnGamma(b) + a*math.Log(x) + b*math.Log(1.0-x))
-    }
-    if x < (a+1.0)/(a+b+2.0) {
-        // use continued fraction directly
-        return bt * betaCF(a, b, x) / a
-    }
-    // use continued fraction after making the symmetry transformation
-    return 1.0 - bt*betaCF(b, a, 1.0-x)/b
-}
-
-const (
-    // unlikely to go this far, as betaCF is expected to converge quickly for
-    // typical values.
-    maxIter = 100
-
-    // betaI outputs will be accurate to within this amount.
-    epsilon = 1.0e-14
-)
-
-// betaCF evaluates the continued fraction for the incomplete beta function
-// by a modified Lentz's method.
-func betaCF(a, b, x float64) float64 {
-    avoidZero := func(f float64) float64 {
-        if math.Abs(f) < math.SmallestNonzeroFloat64 {
-            return math.SmallestNonzeroFloat64
-        }
-        return f
-    }
-
-    qab := a + b
-    qap := a + 1.0
-    qam := a - 1.0
-    c := 1.0
-    d := 1.0 / avoidZero(1.0-qab*x/qap)
-    h := d
-
-    for m := 1; m <= maxIter; m++ {
-        m := float64(m)
-        m2 := 2.0 * m
-        aa := m * (b - m) * x / ((qam + m2) * (a + m2))
-        // one step (the even one) of the recurrence
-        d = 1.0 / avoidZero(1.0+aa*d)
-        c = avoidZero(1.0 + aa/c)
-        h *= d * c
-        aa = -(a + m) * (qab + m) * x / ((a + m2) * (qap + m2))
-        // next step of the recurrence (the odd one)
-        d = 1.0 / avoidZero(1.0+aa*d)
-        c = avoidZero(1.0 + aa/c)
-        del := d * c
-        h *= del
-        if math.Abs(del-1.0) < epsilon {
-            return h
-        }
-    }
-    // a or b too big, or maxIter too small
-    return math.NaN()
-}
-
-// binomialCDF evaluates the CDF of the binomial distribution Binom(n, p) at k.
-// This is done using (1-p)**(n-k) when k is 0, or with the incomplete beta
-// function otherwise.
-func binomialCDF(k, n, p float64) float64 {
-    k = math.Floor(k)
-    if k < 0.0 || n < k {
-        return math.NaN()
-    }
-    if k == n {
-        return 1.0
-    }
-    if k == 0 {
-        return math.Pow(1.0-p, n-k)
-    }
-    return betaI(n-k, k+1.0, 1.0-p)
-}
+const minChurnPerRound = 1e-10
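
As a rough illustration of the new model, the sketch below calls the exported repair.SegmentHealth with its new totalNodes parameter; the segment shapes, the 10000-node estimate, and the failure rate are arbitrary example values. The point it demonstrates is the ordering: a segment with little headroom above its minimum gets a much smaller health value than one with plenty of headroom, so it sorts earlier in the repair queue.

    package main

    import (
        "fmt"

        "storj.io/storj/satellite/repair"
    )

    func main() {
        const (
            totalNodes  = 10000      // stand-in for the ReliabilityCache.NumNodes estimate
            failureRate = 0.00005435 // example per-iteration node failure rate
        )

        // Plenty of headroom above the minimum: expected to survive many iterations,
        // so the health value is comparatively large and repair can wait.
        fmt.Println("healthy:", repair.SegmentHealth(52, 29, totalNodes, failureRate))

        // Only one piece of headroom: a much smaller health value, so this segment
        // is picked up by the repairer sooner.
        fmt.Println("at risk:", repair.SegmentHealth(36, 35, totalNodes, failureRate))
    }
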
diff --git a/satellite/repair/priority_test.go b/satellite/repair/priority_test.go
index c09b050cb..59959da08 100644
--- a/satellite/repair/priority_test.go
+++ b/satellite/repair/priority_test.go
@@ -10,59 +10,45 @@ import (
     "github.com/stretchr/testify/assert"
 )
 
-func TestBetaI(t *testing.T) {
-    // check a few places where betaI has some easily representable values
-    assert.Equal(t, 0.0, betaI(0.5, 5, 0))
-    assert.Equal(t, 0.0, betaI(1, 3, 0))
-    assert.Equal(t, 0.0, betaI(8, 10, 0))
-    assert.Equal(t, 0.0, betaI(8, 10, 0))
-    assert.InDelta(t, 0.5, betaI(0.5, 0.5, 0.5), epsilon)
-    assert.InDelta(t, 1.0/3.0, betaI(0.5, 0.5, 0.25), epsilon)
-    assert.InDelta(t, 0.488, betaI(1, 3, 0.2), epsilon)
-}
-
-func BenchmarkBetaI(b *testing.B) {
-    for i := 0; i < b.N; i++ {
-        assert.InDelta(b, 1.0/3.0, betaI(0.5, 0.5, 0.25), epsilon)
-    }
-}
-
-func TestSegmentDanger(t *testing.T) {
-    const failureRate = 0.01
-    assert.Greater(t,
-        SegmentDanger(11, 10, failureRate),
-        SegmentDanger(10, 5, failureRate))
-    assert.Greater(t,
-        SegmentDanger(11, 10, failureRate),
-        SegmentDanger(10, 9, failureRate))
-    assert.Greater(t,
-        SegmentDanger(10, 10, failureRate),
-        SegmentDanger(9, 9, failureRate))
-    assert.Less(t,
-        SegmentDanger(11, 10, failureRate),
-        SegmentDanger(12, 11, failureRate))
-}
-
 func TestSegmentHealth(t *testing.T) {
     const failureRate = 0.01
     assert.Less(t,
-        SegmentHealth(11, 10, failureRate),
-        SegmentHealth(10, 5, failureRate))
+        SegmentHealth(11, 10, 10000, failureRate),
+        SegmentHealth(10, 5, 10000, failureRate))
     assert.Less(t,
-        SegmentHealth(11, 10, failureRate),
-        SegmentHealth(10, 9, failureRate))
+        SegmentHealth(11, 10, 10000, failureRate),
+        SegmentHealth(10, 9, 10000, failureRate))
     assert.Less(t,
-        SegmentHealth(10, 10, failureRate),
-        SegmentHealth(9, 9, failureRate))
+        SegmentHealth(10, 10, 10000, failureRate),
+        SegmentHealth(9, 9, 10000, failureRate))
     assert.Greater(t,
-        SegmentHealth(11, 10, failureRate),
-        SegmentHealth(12, 11, failureRate))
+        SegmentHealth(11, 10, 10000, failureRate),
+        SegmentHealth(12, 11, 10000, failureRate))
     assert.Greater(t,
-        SegmentHealth(13, 10, failureRate),
-        SegmentHealth(12, 10, failureRate))
+        SegmentHealth(13, 10, 10000, failureRate),
+        SegmentHealth(12, 10, 10000, failureRate))
 }
 
 func TestSegmentHealthForDecayedSegment(t *testing.T) {
     const failureRate = 0.01
-    assert.True(t, math.IsNaN(SegmentHealth(9, 10, failureRate)))
+    got := SegmentHealth(9, 10, 10000, failureRate)
+    assert.Equal(t, float64(0), got)
+}
+
+func TestHighHealthAndLowFailureRate(t *testing.T) {
+    const failureRate = 0.00005435
+    assert.Less(t,
+        SegmentHealth(36, 35, 10000, failureRate), math.Inf(1))
+    assert.Greater(t,
+        SegmentHealth(36, 35, 10000, failureRate),
+        SegmentHealth(35, 35, 10000, failureRate))
+    assert.Less(t,
+        SegmentHealth(60, 29, 10000, failureRate), math.Inf(1))
+    assert.Greater(t,
+        SegmentHealth(61, 29, 10000, failureRate),
+        SegmentHealth(60, 29, 10000, failureRate))
+
+    assert.Greater(t,
+        SegmentHealth(11, 10, 10000, failureRate),
+        SegmentHealth(39, 34, 10000, failureRate))
 }
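
Finally, a small sketch of the two guard rails exercised by the updated tests, again with arbitrary example values: a segment already below its minimum required pieces gets health 0, and a numHealthy of 0 makes p exactly 1.0, which the explicit check turns into +Inf rather than a division by zero.

    package main

    import (
        "fmt"
        "math"

        "storj.io/storj/satellite/repair"
    )

    func main() {
        const totalNodes, failureRate = 10000, 0.01

        // Fewer healthy pieces than the minimum required: the "pieces left to lose"
        // term is zero, so the health comes out as 0 and the segment sorts first.
        fmt.Println(repair.SegmentHealth(9, 10, totalNodes, failureRate)) // prints 0

        // No healthy pieces at all: p would be exactly 1.0, so the function returns
        // +Inf instead of dividing by zero; such a segment cannot be repaired anyway.
        fmt.Println(math.IsInf(repair.SegmentHealth(0, 10, totalNodes, failureRate), 1)) // prints true
    }
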