satellite/rangedloop: migrate segments verification from segment loop

The segments loop has a built-in sanity check to verify that the number of
segments processed by the loop is roughly correct. We want the same
verification for the ranged loop.

https://github.com/storj/storj/issues/5544

Change-Id: Ia19edc0fb4aa8dc45993498a8e6a4eb5928485e9
Michal Niewrzal 2023-03-06 10:30:53 +01:00 committed by Antonio Franco (He/Him)
parent f4ea730e69
commit 67ad792d1a
5 changed files with 99 additions and 18 deletions
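
The migrated check is essentially a before/after comparison: record the table's segment count when the loop starts, record it again when the loop finishes, and require the processed count to land near that range. Below is a minimal standalone sketch of that idea; the function name and numbers are illustrative only, and the real logic lives in LiveCountObserver.verifyCount in the first file of the diff.

package main

import "fmt"

// verifySegmentCount is a simplified, illustrative version of the check this
// commit adds: the processed count should fall between the table counts taken
// before and after the loop, and missing that range by more than the
// configured ratio marks the run as suspicious.
func verifySegmentCount(before, after, processed int64, suspiciousRatio float64) error {
    low, high := before, after
    if low > high {
        low, high = high, low
    }

    var delta int64
    var ratio float64
    switch {
    case processed < low:
        delta = low - processed
        ratio = float64(delta) / float64(low+1) // +1 avoids division by zero
    case processed > high:
        delta = processed - high
        ratio = float64(delta) / float64(high+1)
    }

    // Being off by only a handful of segments is expected churn and is never
    // treated as suspicious, regardless of the ratio.
    if delta < 100 {
        return nil
    }
    if ratio > suspiciousRatio {
        return fmt.Errorf("processed count looks suspicious: before:%v after:%v processed:%v ratio:%v", before, after, processed, ratio)
    }
    return nil
}

func main() {
    // Processing 9.6M segments against a table of roughly 10M misses the
    // expected range by about 4%, which exceeds the default 0.03 threshold.
    fmt.Println(verifySegmentCount(10_000_000, 10_000_050, 9_600_000, 0.03))
}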

View File

@@ -10,6 +10,7 @@ import (
"github.com/spacemonkeygo/monkit/v3"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/metabase/segmentloop"
)
@@ -19,22 +20,44 @@ var _ Partial = (*LiveCountObserver)(nil)
// LiveCountObserver reports a count of segments during loop execution.
// This can be used to report the rate and progress of the loop.
// TODO we may need a better name for this type.
type LiveCountObserver struct {
numSegments int64
metabase *metabase.DB
suspiciousProcessedRatio float64
asOfSystemInterval time.Duration
segmentsProcessed int64
segmentsBefore int64
}
// NewLiveCountObserver .
// To avoid pollution, make sure to only use one instance of this observer.
// Also make sure to only add it to instances of the loop which are actually doing something.
func NewLiveCountObserver() *LiveCountObserver {
liveCount := &LiveCountObserver{}
func NewLiveCountObserver(metabase *metabase.DB, suspiciousProcessedRatio float64, asOfSystemInterval time.Duration) *LiveCountObserver {
liveCount := &LiveCountObserver{
metabase: metabase,
suspiciousProcessedRatio: suspiciousProcessedRatio,
asOfSystemInterval: asOfSystemInterval,
}
mon.Chain(liveCount)
return liveCount
}
// Start resets the count at start of the ranged segment loop.
func (o *LiveCountObserver) Start(context.Context, time.Time) error {
atomic.StoreInt64(&o.numSegments, 0)
// Start resets the count at the start of the ranged segment loop and gets
// statistics about the segments table.
func (o *LiveCountObserver) Start(ctx context.Context, startTime time.Time) (err error) {
defer mon.Task()(&ctx)(&err)
atomic.StoreInt64(&o.segmentsProcessed, 0)
stats, err := o.metabase.GetTableStats(ctx, metabase.GetTableStats{
AsOfSystemInterval: o.asOfSystemInterval,
})
if err != nil {
return err
}
o.segmentsBefore = stats.SegmentCount
return nil
}
@@ -48,20 +71,70 @@ func (o *LiveCountObserver) Join(ctx context.Context, partial Partial) error {
return nil
}
// Finish does nothing at loop end.
func (o *LiveCountObserver) Finish(ctx context.Context) error {
return nil
}
// Process increments the counter.
func (o *LiveCountObserver) Process(ctx context.Context, segments []segmentloop.Segment) error {
processed := atomic.AddInt64(&o.numSegments, int64(len(segments)))
processed := atomic.AddInt64(&o.segmentsProcessed, int64(len(segments)))
mon.IntVal("segmentsProcessed").Observe(processed)
return nil
}
// Finish gets the segment count after the loop execution and verifies it against
// the processed segment count and the count taken before the loop execution.
func (o *LiveCountObserver) Finish(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
stats, err := o.metabase.GetTableStats(ctx, metabase.GetTableStats{
AsOfSystemInterval: o.asOfSystemInterval,
})
if err != nil {
return err
}
segmentsProcessed := atomic.LoadInt64(&o.segmentsProcessed)
return o.verifyCount(o.segmentsBefore, stats.SegmentCount, segmentsProcessed)
}
// Stats implements monkit.StatSource to report the number of segments.
func (o *LiveCountObserver) Stats(cb func(key monkit.SeriesKey, field string, val float64)) {
cb(monkit.NewSeriesKey("rangedloop_live"), "num_segments", float64(atomic.LoadInt64(&o.numSegments)))
cb(monkit.NewSeriesKey("rangedloop_live"), "num_segments", float64(atomic.LoadInt64(&o.segmentsProcessed)))
}
func (o *LiveCountObserver) verifyCount(before, after, processed int64) error {
low, high := before, after
if low > high {
low, high = high, low
}
var deltaFromBounds int64
var ratio float64
if processed < low {
deltaFromBounds = low - processed
// +1 to avoid division by zero
ratio = float64(deltaFromBounds) / float64(low+1)
} else if processed > high {
deltaFromBounds = processed - high
// +1 to avoid division by zero
ratio = float64(deltaFromBounds) / float64(high+1)
}
mon.IntVal("segmentloop_verify_before").Observe(before)
mon.IntVal("segmentloop_verify_after").Observe(after)
mon.IntVal("segmentloop_verify_processed").Observe(processed)
mon.IntVal("segmentloop_verify_outside").Observe(deltaFromBounds)
mon.FloatVal("segmentloop_verify_outside_ratio").Observe(ratio)
// If the processed count is off the bounds by only a few items, that's expected and the ratio does not capture it well.
const minimumDeltaThreshold = 100
if deltaFromBounds < minimumDeltaThreshold {
return nil
}
if ratio > o.suspiciousProcessedRatio {
mon.Event("ranged_loop_suspicious_segments_count")
return Error.New("processed count looks suspicious: before:%v after:%v processed:%v ratio:%v threshold:%v", before, after, processed, ratio, o.suspiciousProcessedRatio)
}
return nil
}
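
Concretely, with the default 0.03 threshold: if the table held 1,000,000 segments both before and after the loop but only 950,000 were processed, deltaFromBounds is 50,000 and the ratio is about 0.05, so the run is reported as suspicious; a loop that is short by only a few dozen segments stays under minimumDeltaThreshold (100) and passes regardless of the ratio.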

View File

@@ -21,6 +21,9 @@ import (
var (
mon = monkit.Package()
ev = eventkit.Package()
// Error is a standard error class for this component.
Error = errs.Class("ranged loop")
)
// Config contains configurable values for the shared loop.
@@ -29,6 +32,8 @@ type Config struct {
BatchSize int `help:"how many items to query in a batch" default:"2500"`
AsOfSystemInterval time.Duration `help:"as of system interval" releaseDefault:"-5m" devDefault:"-1us" testDefault:"-1us"`
Interval time.Duration `help:"how often to run the loop" releaseDefault:"2h" devDefault:"10s" testDefault:"10s"`
SuspiciousProcessedRatio float64 `help:"ratio where to consider processed count as suspicious" default:"0.03"`
}
// Service iterates through all segments and calls the attached observers for every segment
@@ -159,7 +164,7 @@ func (service *Service) RunOnce(ctx context.Context) (observerDurations []Observ
return nil, errs.Combine(errList...)
}
return finishObservers(ctx, service.log, observerStates)
return finishObservers(ctx, service.log, observerStates), nil
}
func createGoroutineClosure(ctx context.Context, rangeProvider SegmentProvider, states []*rangeObserverState) func() error {
@@ -205,14 +210,14 @@ func startObserver(ctx context.Context, log *zap.Logger, startTime time.Time, ob
}
}
func finishObservers(ctx context.Context, log *zap.Logger, observerStates []observerState) (observerDurations []ObserverDuration, err error) {
func finishObservers(ctx context.Context, log *zap.Logger, observerStates []observerState) (observerDurations []ObserverDuration) {
for _, state := range observerStates {
observerDurations = append(observerDurations, finishObserver(ctx, log, state))
}
sendObserverDurations(observerDurations)
return observerDurations, nil
return observerDurations
}
// Iterating over the segments is done.

View File

@@ -402,7 +402,7 @@ func TestAllInOne(t *testing.T) {
bfConfig.AccessGrant = accessGrant
service := rangedloop.NewService(log, config, metabaseProvider, []rangedloop.Observer{
rangedloop.NewLiveCountObserver(),
rangedloop.NewLiveCountObserver(satellite.Metabase.DB, config.SuspiciousProcessedRatio, config.AsOfSystemInterval),
metrics.NewObserver(),
nodetally.NewRangedLoopObserver(log.Named("accounting:nodetally"),
satellite.DB.StoragenodeAccounting(),

View File

@@ -171,7 +171,7 @@ func NewRangedLoop(log *zap.Logger, db DB, metabaseDB *metabase.DB, config *Conf
{ // setup ranged loop
observers := []rangedloop.Observer{
rangedloop.NewLiveCountObserver(),
rangedloop.NewLiveCountObserver(metabaseDB, config.RangedLoop.SuspiciousProcessedRatio, config.RangedLoop.AsOfSystemInterval),
}
if config.Audit.UseRangedLoop {

View File

@@ -910,6 +910,9 @@ identity.key-path: /root/.local/share/storj/identity/satellite/identity.key
# how many chunks of segments to process in parallel
# ranged-loop.parallelism: 2
# ratio where to consider processed count as suspicious
# ranged-loop.suspicious-processed-ratio: 0.03
# time limit for downloading pieces from a node for repair
# repairer.download-timeout: 5m0s