satellite/repair/checker: optimize processing, part 2

Optimizations for collecting monkit metrics (a minimal sketch of both patterns follows this list):
* initialize metric instances once at the beginning instead of per processed segment
* avoid using a string map key when looking up the stats structs kept per redundancy scheme
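For illustration, a minimal sketch of both patterns outside the real checker code. The observer, rsStats, statsFor and processSegment names below are simplified stand-ins invented for this sketch; monkit v3 and storj.RedundancyScheme are the actual package and type used by the change:

package checker

import (
	"github.com/spacemonkeygo/monkit/v3"

	"storj.io/common/storj"
)

var mon = monkit.Package()

// Resolving the metric instance once at package initialization avoids calling
// mon.IntVal(name) for every processed segment.
var segmentTotalCountIntVal = mon.IntVal("checker_segment_total_count")

// rsStats is a simplified stand-in for the per-redundancy stats struct.
type rsStats struct{ segmentsChecked int64 }

// observer is a simplified stand-in; stats are keyed by the
// storj.RedundancyScheme value itself. The scheme is a small comparable
// struct, so lookups need no string key built on every call.
type observer struct {
	statsCollector map[storj.RedundancyScheme]*rsStats
}

func (o *observer) statsFor(rs storj.RedundancyScheme) *rsStats {
	stats, ok := o.statsCollector[rs]
	if !ok {
		stats = &rsStats{}
		o.statsCollector[rs] = stats
	}
	return stats
}

func (o *observer) processSegment(rs storj.RedundancyScheme, pieces int) {
	segmentTotalCountIntVal.Observe(int64(pieces)) // no per-call name lookup
	o.statsFor(rs).segmentsChecked++
}

In the real code the map values are *observerRSStats and *partialRSStats, and the metric name string (via getRSString) is still computed, but only once per scheme when the entry is first created.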

Benchmark results (compared against the part 1 optimization change):
name                                       old time/op    new time/op    delta
RemoteSegment/Cockroach/healthy_segment-8    31.4µs ± 6%    21.7µs ± 8%  -30.73%  (p=0.008 n=5+5)

name                                       old alloc/op   new alloc/op   delta
RemoteSegment/Cockroach/healthy_segment-8    10.2kB ± 0%     7.4kB ± 0%  -27.03%  (p=0.008 n=5+5)

name                                       old allocs/op  new allocs/op  delta
RemoteSegment/Cockroach/healthy_segment-8       250 ± 0%       150 ± 0%  -40.00%  (p=0.008 n=5+5)

Change-Id: Ie09476eb469a4d6c09e52550c8ba92b3b4b34271
Michal Niewrzal 2023-10-06 11:33:28 +02:00
parent de4559d862
commit e3e303754b
2 changed files with 52 additions and 39 deletions

@@ -50,7 +50,7 @@ type Observer struct {
TotalStats aggregateStats
mu sync.Mutex
statsCollector map[string]*observerRSStats
statsCollector map[storj.RedundancyScheme]*observerRSStats
}
// NewObserver creates a new checker observer instance.
@@ -75,7 +75,7 @@ func NewObserver(logger *zap.Logger, repairQueue queue.RepairQueue, overlay *ove
doDeclumping: config.DoDeclumping,
doPlacementCheck: config.DoPlacementCheck,
placementRules: placementRules,
statsCollector: make(map[string]*observerRSStats),
statsCollector: make(map[storj.RedundancyScheme]*observerRSStats),
}
}
@@ -213,20 +213,32 @@ func (observer *Observer) collectAggregates() {
}
}
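// getObserverStats returns the observer's stats struct for a redundancy
// scheme, lazily creating the entry and chaining it into monkit on first use.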
func (observer *Observer) getObserverStats(rsString string) *observerRSStats {
func (observer *Observer) getObserverStats(redundancy storj.RedundancyScheme) *observerRSStats {
observer.mu.Lock()
defer observer.mu.Unlock()
observerStats, exists := observer.statsCollector[rsString]
observerStats, exists := observer.statsCollector[redundancy]
if !exists {
rsString := getRSString(loadRedundancy(redundancy, observer.repairOverrides))
observerStats = &observerRSStats{aggregateStats{}, newIterationRSStats(rsString), newSegmentRSStats(rsString)}
mon.Chain(observerStats)
observer.statsCollector[rsString] = observerStats
observer.statsCollector[redundancy] = observerStats
}
return observerStats
}
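// loadRedundancy returns the required, repair, optimal, and total share
// counts for the given redundancy scheme, with the repair threshold replaced
// by the configured override when one is set (for example, with an override
// of 52 it returns (required, 52, optimal, total)).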
func loadRedundancy(redundancy storj.RedundancyScheme, repairOverrides RepairOverridesMap) (int, int, int, int) {
repair := int(redundancy.RepairShares)
overrideValue := repairOverrides.GetOverrideValue(redundancy)
if overrideValue != 0 {
repair = int(overrideValue)
}
return int(redundancy.RequiredShares), repair, int(redundancy.OptimalShares), int(redundancy.TotalShares)
}
// RefreshReliabilityCache forces refreshing node online status cache.
func (observer *Observer) RefreshReliabilityCache(ctx context.Context) error {
return observer.nodesCache.Refresh(ctx)
@@ -237,7 +249,7 @@ type observerFork struct {
repairQueue *queue.InsertBuffer
nodesCache *ReliabilityCache
overlayService *overlay.Service
rsStats map[string]*partialRSStats
rsStats map[storj.RedundancyScheme]*partialRSStats
repairOverrides RepairOverridesMap
nodeFailureRate float64
getNodesEstimate func(ctx context.Context) (int, error)
@@ -255,7 +267,7 @@ type observerFork struct {
doPlacementCheck bool
placementRules overlay.PlacementRules
getObserverStats func(string) *observerRSStats
getObserverStats func(storj.RedundancyScheme) *observerRSStats
}
// newObserverFork creates a new observer partial instance.
@@ -265,7 +277,7 @@ func newObserverFork(observer *Observer) rangedloop.Partial {
repairQueue: observer.createInsertBuffer(),
nodesCache: observer.nodesCache,
overlayService: observer.overlayService,
rsStats: make(map[string]*partialRSStats),
rsStats: make(map[storj.RedundancyScheme]*partialRSStats),
repairOverrides: observer.repairOverrides,
nodeFailureRate: observer.nodeFailureRate,
getNodesEstimate: observer.getNodesEstimate,
@@ -279,33 +291,20 @@ func newObserverFork(observer *Observer) rangedloop.Partial {
}
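// getStatsByRS returns this fork's partial stats for the given redundancy
// scheme, creating the entry from the shared observer stats on first use.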
func (fork *observerFork) getStatsByRS(redundancy storj.RedundancyScheme) *partialRSStats {
rsString := getRSString(fork.loadRedundancy(redundancy))
stats, ok := fork.rsStats[rsString]
stats, ok := fork.rsStats[redundancy]
if !ok {
observerStats := fork.getObserverStats(rsString)
observerStats := fork.getObserverStats(redundancy)
fork.rsStats[rsString] = &partialRSStats{
fork.rsStats[redundancy] = &partialRSStats{
iterationAggregates: aggregateStats{},
segmentStats: observerStats.segmentStats,
}
return fork.rsStats[rsString]
return fork.rsStats[redundancy]
}
return stats
}
func (fork *observerFork) loadRedundancy(redundancy storj.RedundancyScheme) (int, int, int, int) {
repair := int(redundancy.RepairShares)
overrideValue := fork.repairOverrides.GetOverrideValue(redundancy)
if overrideValue != 0 {
repair = int(overrideValue)
}
return int(redundancy.RequiredShares), repair, int(redundancy.OptimalShares), int(redundancy.TotalShares)
}
// Process is called repeatedly with batches of segments. It is not called
// concurrently on the same instance. The method is not concurrent-safe on its own.
func (fork *observerFork) Process(ctx context.Context, segments []rangedloop.Segment) (err error) {
@@ -318,6 +317,20 @@ func (fork *observerFork) Process(ctx context.Context, segments []rangedloop.Seg
return nil
}
var (
// initialize monkit metrics once for better performance.
segmentTotalCountIntVal = mon.IntVal("checker_segment_total_count") //mon:locked
segmentHealthyCountIntVal = mon.IntVal("checker_segment_healthy_count") //mon:locked
segmentClumpedCountIntVal = mon.IntVal("checker_segment_clumped_count") //mon:locked
segmentExitingCountIntVal = mon.IntVal("checker_segment_exiting_count")
segmentOffPlacementCountIntVal = mon.IntVal("checker_segment_off_placement_count") //mon:locked
segmentAgeIntVal = mon.IntVal("checker_segment_age") //mon:locked
segmentHealthFloatVal = mon.FloatVal("checker_segment_health") //mon:locked
segmentsBelowMinReqCounter = mon.Counter("checker_segments_below_min_req") //mon:locked
injuredSegmentHealthFloatVal = mon.FloatVal("checker_injured_segment_health") //mon:locked
segmentTimeUntilIrreparableIntVal = mon.IntVal("checker_segment_time_until_irreparable") //mon:locked
)
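// process checks a single segment's health, records the checker metrics for
// its redundancy scheme, and queues the segment for repair when needed.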
func (fork *observerFork) process(ctx context.Context, segment *rangedloop.Segment) (err error) {
if segment.Inline() {
if fork.lastStreamID.Compare(segment.StreamID) != 0 {
@@ -344,7 +357,7 @@ func (fork *observerFork) process(ctx context.Context, segment *rangedloop.Segme
stats.iterationAggregates.remoteSegmentsChecked++
// ensure we get values, even if only zero values, so that redash can have an alert based on this
mon.Counter("checker_segments_below_min_req").Inc(0) //mon:locked
segmentsBelowMinReqCounter.Inc(0)
pieces := segment.Pieces
if len(pieces) == 0 {
fork.log.Debug("no pieces on remote segment")
@@ -378,25 +391,25 @@ func (fork *observerFork) process(ctx context.Context, segment *rangedloop.Segme
fork.doDeclumping, fork.placementRules(segment.Placement), fork.nodeIDs)
numHealthy := len(piecesCheck.Healthy)
mon.IntVal("checker_segment_total_count").Observe(int64(len(pieces))) //mon:locked
segmentTotalCountIntVal.Observe(int64(len(pieces)))
stats.segmentStats.segmentTotalCount.Observe(int64(len(pieces)))
mon.IntVal("checker_segment_healthy_count").Observe(int64(numHealthy)) //mon:locked
segmentHealthyCountIntVal.Observe(int64(numHealthy))
stats.segmentStats.segmentHealthyCount.Observe(int64(numHealthy))
mon.IntVal("checker_segment_clumped_count").Observe(int64(len(piecesCheck.Clumped))) //mon:locked
segmentClumpedCountIntVal.Observe(int64(len(piecesCheck.Clumped)))
stats.segmentStats.segmentClumpedCount.Observe(int64(len(piecesCheck.Clumped)))
mon.IntVal("checker_segment_exiting_count").Observe(int64(len(piecesCheck.Exiting)))
segmentExitingCountIntVal.Observe(int64(len(piecesCheck.Exiting)))
stats.segmentStats.segmentExitingCount.Observe(int64(len(piecesCheck.Exiting)))
mon.IntVal("checker_segment_off_placement_count").Observe(int64(len(piecesCheck.OutOfPlacement))) //mon:locked
segmentOffPlacementCountIntVal.Observe(int64(len(piecesCheck.OutOfPlacement)))
stats.segmentStats.segmentOffPlacementCount.Observe(int64(len(piecesCheck.OutOfPlacement)))
segmentAge := time.Since(segment.CreatedAt)
mon.IntVal("checker_segment_age").Observe(int64(segmentAge.Seconds())) //mon:locked
segmentAgeIntVal.Observe(int64(segmentAge.Seconds()))
stats.segmentStats.segmentAge.Observe(int64(segmentAge.Seconds()))
required, repairThreshold, successThreshold, _ := fork.loadRedundancy(segment.Redundancy)
required, repairThreshold, successThreshold, _ := loadRedundancy(segment.Redundancy, fork.repairOverrides)
segmentHealth := repair.SegmentHealth(numHealthy, required, totalNumNodes, fork.nodeFailureRate)
mon.FloatVal("checker_segment_health").Observe(segmentHealth) //mon:locked
segmentHealthFloatVal.Observe(segmentHealth)
stats.segmentStats.segmentHealth.Observe(segmentHealth)
// we repair when the number of healthy pieces is less than or equal to the repair threshold and is greater or equal to
@@ -405,7 +418,7 @@ func (fork *observerFork) process(ctx context.Context, segment *rangedloop.Segme
// A separate case is when we find pieces that are outside the segment placement; in that case we put the segment
// into the repair queue right away.
if (numHealthy <= repairThreshold && numHealthy < successThreshold) || len(piecesCheck.ForcingRepair) > 0 {
mon.FloatVal("checker_injured_segment_health").Observe(segmentHealth) //mon:locked
injuredSegmentHealthFloatVal.Observe(segmentHealth)
stats.segmentStats.injuredSegmentHealth.Observe(segmentHealth)
fork.totalStats.remoteSegmentsNeedingRepair++
stats.iterationAggregates.remoteSegmentsNeedingRepair++
@@ -448,13 +461,13 @@ func (fork *observerFork) process(ctx context.Context, segment *rangedloop.Segme
segmentAge = time.Since(segment.CreatedAt)
}
mon.IntVal("checker_segment_time_until_irreparable").Observe(int64(segmentAge.Seconds())) //mon:locked
segmentTimeUntilIrreparableIntVal.Observe(int64(segmentAge.Seconds()))
stats.segmentStats.segmentTimeUntilIrreparable.Observe(int64(segmentAge.Seconds()))
fork.totalStats.remoteSegmentsLost++
stats.iterationAggregates.remoteSegmentsLost++
mon.Counter("checker_segments_below_min_req").Inc(1) //mon:locked
segmentsBelowMinReqCounter.Inc(1)
stats.segmentStats.segmentsBelowMinReq.Inc(1)
var missingNodes []string

@@ -52,7 +52,7 @@ func TestObserverForkProcess(t *testing.T) {
require.NoError(t, err)
createDefaultObserver := func() *Observer {
o := &Observer{
statsCollector: make(map[string]*observerRSStats),
statsCollector: make(map[storj.RedundancyScheme]*observerRSStats),
nodesCache: &ReliabilityCache{
staleness: time.Hour,
},
@@ -72,7 +72,7 @@ func TestObserverForkProcess(t *testing.T) {
return &observerFork{
log: zaptest.NewLogger(t),
getObserverStats: o.getObserverStats,
rsStats: make(map[string]*partialRSStats),
rsStats: make(map[storj.RedundancyScheme]*partialRSStats),
doDeclumping: o.doDeclumping,
doPlacementCheck: o.doPlacementCheck,
placementRules: o.placementRules,