satellite/repair/checker: buffer repair queue
Integrate previous changes. Speed up the segment loop by batch inserting into repair queue. Change-Id: Ib9f4962d91960d21bad298f7771345b0dd270276
This commit is contained in:
parent
9b49ffe48a
commit
db1cc8ca95
@ -43,6 +43,7 @@ type Checker struct {
|
||||
statsCollector *statsCollector
|
||||
repairOverrides RepairOverridesMap
|
||||
nodeFailureRate float64
|
||||
repairQueueBatchSize int
|
||||
Loop *sync2.Cycle
|
||||
}
|
||||
|
||||
@ -58,6 +59,7 @@ func NewChecker(logger *zap.Logger, repairQueue queue.RepairQueue, metabase *met
|
||||
statsCollector: newStatsCollector(),
|
||||
repairOverrides: config.RepairOverrides.GetMap(),
|
||||
nodeFailureRate: config.NodeFailureRate,
|
||||
repairQueueBatchSize: config.RepairQueueInsertBatchSize,
|
||||
|
||||
Loop: sync2.NewCycle(config.Interval),
|
||||
}
|
||||
@ -92,6 +94,10 @@ func (checker *Checker) getNodesEstimate(ctx context.Context) (int, error) {
|
||||
return totalNumNodes, nil
|
||||
}
|
||||
|
||||
func (checker *Checker) createInsertBuffer() *queue.InsertBuffer {
|
||||
return queue.NewInsertBuffer(checker.repairQueue, checker.repairQueueBatchSize)
|
||||
}
|
||||
|
||||
// RefreshReliabilityCache forces refreshing node online status cache.
|
||||
func (checker *Checker) RefreshReliabilityCache(ctx context.Context) error {
|
||||
return checker.nodestate.Refresh(ctx)
|
||||
@ -110,7 +116,7 @@ func (checker *Checker) IdentifyInjuredSegments(ctx context.Context) (err error)
|
||||
startTime := time.Now()
|
||||
|
||||
observer := &checkerObserver{
|
||||
repairQueue: checker.repairQueue,
|
||||
repairQueue: checker.createInsertBuffer(),
|
||||
nodestate: checker.nodestate,
|
||||
statsCollector: checker.statsCollector,
|
||||
monStats: aggregateStats{},
|
||||
@ -127,6 +133,11 @@ func (checker *Checker) IdentifyInjuredSegments(ctx context.Context) (err error)
|
||||
return nil
|
||||
}
|
||||
|
||||
err = observer.repairQueue.Flush(ctx)
|
||||
if err != nil {
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
|
||||
// remove all segments which were not seen as unhealthy by this checker iteration
|
||||
healthyDeleted, err := checker.repairQueue.Clean(ctx, startTime)
|
||||
if err != nil {
|
||||
@ -163,7 +174,7 @@ var _ segmentloop.Observer = (*checkerObserver)(nil)
|
||||
//
|
||||
// architecture: Observer
|
||||
type checkerObserver struct {
|
||||
repairQueue queue.RepairQueue
|
||||
repairQueue *queue.InsertBuffer
|
||||
nodestate *ReliabilityCache
|
||||
statsCollector *statsCollector
|
||||
monStats aggregateStats // TODO(cam): once we verify statsCollector reports data correctly, remove this
|
||||
@ -282,22 +293,22 @@ func (obs *checkerObserver) RemoteSegment(ctx context.Context, segment *segmentl
|
||||
stats.injuredSegmentHealth.Observe(segmentHealth)
|
||||
obs.monStats.remoteSegmentsNeedingRepair++
|
||||
stats.iterationAggregates.remoteSegmentsNeedingRepair++
|
||||
alreadyInserted, err := obs.repairQueue.Insert(ctx, &queue.InjuredSegment{
|
||||
err := obs.repairQueue.Insert(ctx, &queue.InjuredSegment{
|
||||
StreamID: segment.StreamID,
|
||||
Position: segment.Position,
|
||||
UpdatedAt: time.Now().UTC(),
|
||||
SegmentHealth: segmentHealth,
|
||||
}, func() {
|
||||
// Counters are increased after the queue has determined
|
||||
// that the segment wasn't already queued for repair.
|
||||
obs.monStats.newRemoteSegmentsNeedingRepair++
|
||||
stats.iterationAggregates.newRemoteSegmentsNeedingRepair++
|
||||
})
|
||||
if err != nil {
|
||||
obs.log.Error("error adding injured segment to queue", zap.Error(err))
|
||||
return nil
|
||||
}
|
||||
|
||||
if !alreadyInserted {
|
||||
obs.monStats.newRemoteSegmentsNeedingRepair++
|
||||
stats.iterationAggregates.newRemoteSegmentsNeedingRepair++
|
||||
}
|
||||
|
||||
// monitor irreperable segments
|
||||
if numHealthy < required {
|
||||
if !containsStreamID(obs.monStats.objectsLost, segment.StreamID) {
|
||||
|
@ -22,6 +22,7 @@ type Config struct {
|
||||
// Node failure rate is an estimation based on a 6 hour checker run interval (4 checker iterations per day), a network of about 9200 nodes, and about 2 nodes churning per day.
|
||||
// This results in `2/9200/4 = 0.00005435` being the probability of any single node going down in the interval of one checker iteration.
|
||||
NodeFailureRate float64 `help:"the probability of a single node going down within the next checker iteration" default:"0.00005435" `
|
||||
RepairQueueInsertBatchSize int `help:"Number of damaged segments to buffer in-memory before flushing to the repair queue" default:"100" `
|
||||
}
|
||||
|
||||
// RepairOverride is a configuration struct that contains an override repair
|
||||
|
@ -5,8 +5,12 @@ package queue
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/spacemonkeygo/monkit/v3"
|
||||
)
|
||||
|
||||
var mon = monkit.Package()
|
||||
|
||||
// InsertBuffer exposes a synchronous API to buffer a batch of segments
|
||||
// and insert them at once. Not threadsafe. Call Flush() before discarding.
|
||||
type InsertBuffer struct {
|
||||
@ -56,6 +60,8 @@ func (r *InsertBuffer) Insert(
|
||||
|
||||
// Flush inserts the remaining segments into the database.
|
||||
func (r *InsertBuffer) Flush(ctx context.Context) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
newlyInsertedSegments, err := r.queue.InsertBatch(ctx, r.batch)
|
||||
if err != nil {
|
||||
return err
|
||||
|
3
scripts/testdata/satellite-config.yaml.lock
vendored
3
scripts/testdata/satellite-config.yaml.lock
vendored
@ -112,6 +112,9 @@
|
||||
# comma-separated override values for repair threshold in the format k/o/n-override (min/optimal/total-override)
|
||||
# checker.repair-overrides: 29/80/110-52,29/80/95-52,29/80/130-52
|
||||
|
||||
# Number of damaged segments to buffer in-memory before flushing to the repair queue
|
||||
# checker.repair-queue-insert-batch-size: 100
|
||||
|
||||
# percent of held amount disposed to node after leaving withheld
|
||||
compensation.dispose-percent: 50
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user