storj/satellite/metabase/rangedloop/observerlivecount.go
Michal Niewrzal 4bdbb25d83 satellite/metabase/rangedloop: move Segment definition
We will remove the segments loop soon, so we first need to move
the Segment definition to the rangedloop package.

https://github.com/storj/storj/issues/5237

Change-Id: Ibe6aad316ffb7073cc4de166f1f17b87aac07363
2023-05-16 12:37:17 +00:00

140 lines
4.4 KiB
Go

// Copyright (C) 2022 Storj Labs, Inc.
// See LICENSE for copying information.
package rangedloop
import (
"context"
"sync/atomic"
"time"
"github.com/spacemonkeygo/monkit/v3"
"storj.io/storj/satellite/metabase"
)
// Compile-time assertions that *LiveCountObserver implements the interfaces
// it is used through: monkit.StatSource, Observer and Partial.
var _ monkit.StatSource = (*LiveCountObserver)(nil)
var _ Observer = (*LiveCountObserver)(nil)
var _ Partial = (*LiveCountObserver)(nil)
// LiveCountObserver reports a count of segments during loop execution.
// This can be used to report the rate and progress of the loop.
// TODO we may need better name for this type.
type LiveCountObserver struct {
	// segmentsProcessed is the running number of segments seen by Process.
	// It is read and written only through sync/atomic functions. It is kept
	// as the first field so that it is 64-bit aligned on 32-bit platforms as
	// well, which 64-bit atomic operations require.
	segmentsProcessed int64

	// metabase is queried for table statistics in Start and Finish.
	metabase *metabase.DB

	// suspiciousProcessedRatio is the threshold above which verifyCount
	// reports the processed count as suspicious.
	suspiciousProcessedRatio float64

	// asOfSystemInterval is passed through to the table statistics queries.
	asOfSystemInterval time.Duration

	// segmentsBefore is the segment count recorded by Start and compared
	// against the processed count in Finish.
	segmentsBefore int64
}
// NewLiveCountObserver creates a LiveCountObserver that queries table
// statistics from the given metabase (using asOfSystemInterval) and treats a
// processed-count deviation ratio above suspiciousProcessedRatio as suspicious.
//
// The new observer is chained into the package monkit scope before it is
// returned. To avoid pollution, make sure to only use one instance of this
// observer. Also make sure to only add it to instances of the loop which are
// actually doing something.
func NewLiveCountObserver(metabase *metabase.DB, suspiciousProcessedRatio float64, asOfSystemInterval time.Duration) *LiveCountObserver {
	observer := new(LiveCountObserver)
	observer.metabase = metabase
	observer.suspiciousProcessedRatio = suspiciousProcessedRatio
	observer.asOfSystemInterval = asOfSystemInterval

	// Register the observer as a stat source so Stats gets reported.
	mon.Chain(observer)

	return observer
}
// Start zeroes the processed-segments counter and records the current segment
// count of the segments table, so that Finish can compare against it.
// The startTime argument is not used. An error from the table statistics
// query is returned unchanged.
func (o *LiveCountObserver) Start(ctx context.Context, startTime time.Time) (err error) {
	defer mon.Task()(&ctx)(&err)

	// The counter is reset before the query, so it is cleared even when
	// fetching the statistics fails.
	atomic.StoreInt64(&o.segmentsProcessed, 0)

	opts := metabase.GetTableStats{AsOfSystemInterval: o.asOfSystemInterval}
	tableStats, statsErr := o.metabase.GetTableStats(ctx, opts)
	if statsErr != nil {
		return statsErr
	}

	o.segmentsBefore = tableStats.SegmentCount
	return nil
}
// Fork hands out the observer itself as the Partial, so every range updates
// the same counter and the observer keeps a view of all loop ranges.
func (o *LiveCountObserver) Fork(ctx context.Context) (Partial, error) {
	var shared Partial = o
	return shared, nil
}
// Join is a no-op and always returns nil: Fork returns the observer itself,
// so there is no per-range state to merge back.
func (o *LiveCountObserver) Join(ctx context.Context, partial Partial) error {
	// Nothing to merge; the counter was already updated in place by Process.
	return nil
}
// Process atomically adds the size of the given batch to the processed
// counter and reports the new total to the "segmentsProcessed" metric.
// It always returns nil.
func (o *LiveCountObserver) Process(ctx context.Context, segments []Segment) error {
	batchSize := int64(len(segments))
	total := atomic.AddInt64(&o.segmentsProcessed, batchSize)

	mon.IntVal("segmentsProcessed").Observe(total)

	return nil
}
// Finish fetches the segment count of the segments table after the loop has
// run and checks the number of processed segments against the counts taken
// before and after the loop. An error from the table statistics query is
// returned unchanged; otherwise the result of verifyCount is returned.
func (o *LiveCountObserver) Finish(ctx context.Context) (err error) {
	defer mon.Task()(&ctx)(&err)

	opts := metabase.GetTableStats{AsOfSystemInterval: o.asOfSystemInterval}
	tableStats, statsErr := o.metabase.GetTableStats(ctx, opts)
	if statsErr != nil {
		return statsErr
	}

	processed := atomic.LoadInt64(&o.segmentsProcessed)
	segmentsAfter := tableStats.SegmentCount

	return o.verifyCount(o.segmentsBefore, segmentsAfter, processed)
}
// Stats implements monkit.StatSource. It invokes cb once, reporting the
// current processed-segments counter as the "num_segments" field of the
// "rangedloop_live" series.
func (o *LiveCountObserver) Stats(cb func(key monkit.SeriesKey, field string, val float64)) {
	series := monkit.NewSeriesKey("rangedloop_live")
	processed := atomic.LoadInt64(&o.segmentsProcessed)

	cb(series, "num_segments", float64(processed))
}
// verifyCount checks whether processed lies within the range spanned by the
// before and after segment counts (in either order) and reports the inputs
// and the deviation as metrics.
//
// When processed falls outside that range, the distance to the nearest bound
// and its ratio relative to that bound are computed. An error is returned only
// if the distance is at least minimumDeltaThreshold and the ratio exceeds
// o.suspiciousProcessedRatio; otherwise nil is returned.
func (o *LiveCountObserver) verifyCount(before, after, processed int64) error {
	// The table may have grown or shrunk during the loop, so order the bounds.
	lower, upper := before, after
	if upper < lower {
		lower, upper = upper, lower
	}

	var (
		outside      int64
		outsideRatio float64
	)
	switch {
	case processed < lower:
		outside = lower - processed
		// +1 to avoid division by zero
		outsideRatio = float64(outside) / float64(lower+1)
	case processed > upper:
		outside = processed - upper
		// +1 to avoid division by zero
		outsideRatio = float64(outside) / float64(upper+1)
	}

	mon.IntVal("segmentloop_verify_before").Observe(before)
	mon.IntVal("segmentloop_verify_after").Observe(after)
	mon.IntVal("segmentloop_verify_processed").Observe(processed)
	mon.IntVal("segmentloop_verify_outside").Observe(outside)
	mon.FloatVal("segmentloop_verify_outside_ratio").Observe(outsideRatio)

	// If we have very few items from the bounds, then it's expected and the
	// ratio does not capture it well, so small deviations are never reported.
	const minimumDeltaThreshold = 100

	if outside >= minimumDeltaThreshold && outsideRatio > o.suspiciousProcessedRatio {
		mon.Event("ranged_loop_suspicious_segments_count")
		return Error.New("processed count looks suspicious: before:%v after:%v processed:%v ratio:%v threshold:%v", before, after, processed, outsideRatio, o.suspiciousProcessedRatio)
	}

	return nil
}