satellite/metabase/segmentloop: verify processed count

This adds verification for the processed count and before and after
segment/objects table counts.

This adds new flag:

  metainfo.segment-loop.suspicious-processed-ratio: 0.03

This defaults to 3%, which at 100M segments is 3M segments.

Change-Id: I5ee03e913ddc4e67e94010ced126a2a9ea51f41b
This commit is contained in:
Egon Elbre 2021-06-17 19:45:41 +03:00
parent 341033dda1
commit 9640cc2c06
2 changed files with 65 additions and 9 deletions

View File

@ -141,6 +141,8 @@ type Config struct {
ListLimit int `help:"how many items to query in a batch" default:"2500"`
AsOfSystemInterval time.Duration `help:"as of system interval" default:"-5m"`
SuspiciousProcessedRatio float64 `help:"ratio where to consider processed count as supicious" default:"0.03"`
}
// MetabaseDB contains iterators for the metabase data.
@ -149,6 +151,9 @@ type MetabaseDB interface {
Now(ctx context.Context) (time.Time, error)
// IterateLoopStreams iterates through all streams passed in as arguments.
IterateLoopSegments(ctx context.Context, opts metabase.IterateLoopSegments, fn func(context.Context, metabase.LoopSegmentsIterator) error) (err error)
// GetTableStats gathers statistics about the tables.
GetTableStats(context.Context, metabase.GetTableStats) (metabase.TableStats, error)
}
// Service is a segments loop service.
@ -334,7 +339,13 @@ func (loop *Service) iterateDatabase(ctx context.Context, observers []*observerC
finishObservers(observers)
}()
observers, err = loop.iterateSegments(ctx, observers)
before, err := loop.metabaseDB.GetTableStats(ctx, metabase.GetTableStats{})
if err != nil {
return Error.Wrap(err)
}
var processed processedStats
processed, observers, err = loop.iterateSegments(ctx, observers)
if errors.Is(err, errNoObservers) {
return nil
}
@ -342,10 +353,54 @@ func (loop *Service) iterateDatabase(ctx context.Context, observers []*observerC
return Error.Wrap(err)
}
after, err := loop.metabaseDB.GetTableStats(ctx, metabase.GetTableStats{})
if err != nil {
return Error.Wrap(err)
}
if err := loop.verifyCount(before.SegmentCount, after.SegmentCount, processed.segments); err != nil {
return Error.Wrap(err)
}
return err
}
func (loop *Service) iterateSegments(ctx context.Context, observers []*observerContext) (_ []*observerContext, err error) {
func (loop *Service) verifyCount(before, after, processed int64) error {
low, high := before, after
if low > high {
low, high = high, low
}
var deltaFromBounds int64
var ratio float64
if processed < low {
deltaFromBounds = low - processed
// +1 to avoid division by zero
ratio = float64(deltaFromBounds) / float64(low+1)
} else if processed > high {
deltaFromBounds = processed - high
// +1 to avoid division by zero
ratio = float64(deltaFromBounds) / float64(high+1)
}
mon.IntVal("segmentloop_verify_before").Observe(before)
mon.IntVal("segmentloop_verify_after").Observe(after)
mon.IntVal("segmentloop_verify_processed").Observe(processed)
mon.IntVal("segmentloop_verify_outside").Observe(deltaFromBounds)
mon.FloatVal("segmentloop_verify_outside_ratio").Observe(ratio)
if ratio > loop.config.SuspiciousProcessedRatio {
return Error.New("processed count looks suspicious: before:%v after:%v processed:%v ratio:%v threshold:%v", before, after, processed, ratio, loop.config.SuspiciousProcessedRatio)
}
return nil
}
type processedStats struct {
segments int64
}
func (loop *Service) iterateSegments(ctx context.Context, observers []*observerContext) (processed processedStats, _ []*observerContext, err error) {
defer mon.Task()(&ctx)(&err)
rateLimiter := rate.NewLimiter(rate.Limit(loop.config.RateLimit), 1)
@ -356,7 +411,7 @@ func (loop *Service) iterateSegments(ctx context.Context, observers []*observerC
startingTime, err := loop.metabaseDB.Now(ctx)
if err != nil {
return observers, Error.Wrap(err)
return processed, observers, Error.Wrap(err)
}
observers = withObservers(ctx, observers, func(ctx context.Context, observer *observerContext) bool {
@ -365,11 +420,9 @@ func (loop *Service) iterateSegments(ctx context.Context, observers []*observerC
})
if len(observers) == 0 {
return observers, errNoObservers
return processed, observers, errNoObservers
}
var segmentsProcessed int64
err = loop.metabaseDB.IterateLoopSegments(ctx, metabase.IterateLoopSegments{
BatchSize: limit,
AsOfSystemTime: startingTime,
@ -401,13 +454,13 @@ func (loop *Service) iterateSegments(ctx context.Context, observers []*observerC
return errNoObservers
}
segmentsProcessed++
mon.IntVal("segmentsProcessed").Observe(segmentsProcessed) //mon:locked
processed.segments++
mon.IntVal("segmentsProcessed").Observe(processed.segments) //mon:locked
}
return nil
})
return observers, err
return processed, observers, err
}
func withObservers(ctx context.Context, observers []*observerContext, handleObserver func(ctx context.Context, observer *observerContext) bool) []*observerContext {

View File

@ -433,6 +433,9 @@ identity.key-path: /root/.local/share/storj/identity/satellite/identity.key
# rate limit (default is 0 which is unlimited segments per second)
# metainfo.segment-loop.rate-limit: 0
# ratio where to consider processed count as supicious
# metainfo.segment-loop.suspicious-processed-ratio: 0.03
# address(es) to send telemetry to (comma-separated)
# metrics.addr: collectora.storj.io:9000