4bdbb25d83
We will remove segments loop soon so we need first to move Segment definition to rangedloop package. https://github.com/storj/storj/issues/5237 Change-Id: Ibe6aad316ffb7073cc4de166f1f17b87aac07363
301 lines
8.4 KiB
Go
301 lines
8.4 KiB
Go
// Copyright (C) 2022 Storj Labs, Inc.
|
|
// See LICENSE for copying information.
|
|
|
|
package rangedloop
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/jtolio/eventkit"
|
|
"github.com/spacemonkeygo/monkit/v3"
|
|
"github.com/zeebo/errs"
|
|
"go.uber.org/zap"
|
|
|
|
"storj.io/common/errs2"
|
|
"storj.io/common/sync2"
|
|
)
|
|
|
|
var (
|
|
mon = monkit.Package()
|
|
ev = eventkit.Package()
|
|
|
|
// Error is a standard error class for this component.
|
|
Error = errs.Class("ranged loop")
|
|
)
|
|
|
|
// Config contains configurable values for the shared loop.
|
|
type Config struct {
|
|
Parallelism int `help:"how many chunks of segments to process in parallel" default:"2"`
|
|
BatchSize int `help:"how many items to query in a batch" default:"2500"`
|
|
AsOfSystemInterval time.Duration `help:"as of system interval" releaseDefault:"-5m" devDefault:"-1us" testDefault:"-1us"`
|
|
Interval time.Duration `help:"how often to run the loop" releaseDefault:"2h" devDefault:"10s" testDefault:"10s"`
|
|
|
|
SuspiciousProcessedRatio float64 `help:"ratio where to consider processed count as supicious" default:"0.03"`
|
|
}
|
|
|
|
// Service iterates through all segments and calls the attached observers for every segment
|
|
//
|
|
// architecture: Service
|
|
type Service struct {
|
|
log *zap.Logger
|
|
config Config
|
|
provider RangeSplitter
|
|
observers []Observer
|
|
|
|
Loop *sync2.Cycle
|
|
}
|
|
|
|
// NewService creates a new instance of the ranged loop service.
|
|
func NewService(log *zap.Logger, config Config, provider RangeSplitter, observers []Observer) *Service {
|
|
return &Service{
|
|
log: log,
|
|
config: config,
|
|
provider: provider,
|
|
observers: observers,
|
|
Loop: sync2.NewCycle(config.Interval),
|
|
}
|
|
}
|
|
|
|
// observerState contains information to manage an observer during a loop iteration.
|
|
type observerState struct {
|
|
observer Observer
|
|
rangeObservers []*rangeObserverState
|
|
// err is the error that occurred during the observer's Start method.
|
|
// If err is set, the observer will be skipped during the loop iteration.
|
|
err error
|
|
}
|
|
|
|
type rangeObserverState struct {
|
|
rangeObserver Partial
|
|
duration time.Duration
|
|
// err is the error that is returned by the observer's Fork or Process method.
|
|
// If err is set, the range observer will be skipped during the loop iteration.
|
|
err error
|
|
}
|
|
|
|
// ObserverDuration reports back on how long it took the observer to process all the segments.
|
|
type ObserverDuration struct {
|
|
Observer Observer
|
|
// Duration is set to -1 when the observer has errored out
|
|
// so someone watching metrics can tell that something went wrong.
|
|
Duration time.Duration
|
|
}
|
|
|
|
// Close stops the ranged loop.
|
|
func (service *Service) Close() error {
|
|
service.Loop.Close()
|
|
return nil
|
|
}
|
|
|
|
// Run starts the looping service.
|
|
func (service *Service) Run(ctx context.Context) (err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
service.log.Info("ranged loop initialized")
|
|
|
|
return service.Loop.Run(ctx, func(ctx context.Context) error {
|
|
service.log.Info("ranged loop started",
|
|
zap.Int("parallelism", service.config.Parallelism),
|
|
zap.Int("batchSize", service.config.BatchSize),
|
|
)
|
|
_, err := service.RunOnce(ctx)
|
|
if err != nil {
|
|
service.log.Error("ranged loop failure", zap.Error(err))
|
|
|
|
if errs2.IsCanceled(err) {
|
|
return err
|
|
}
|
|
|
|
if ctx.Err() != nil {
|
|
return errs.Combine(err, ctx.Err())
|
|
}
|
|
|
|
mon.Event("rangedloop_error") //mon:locked
|
|
}
|
|
|
|
service.log.Info("ranged loop finished")
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// RunOnce goes through one time and sends information to observers.
|
|
func (service *Service) RunOnce(ctx context.Context) (observerDurations []ObserverDuration, err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
observerStates, err := startObservers(ctx, service.log, service.observers)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
rangeProviders, err := service.provider.CreateRanges(service.config.Parallelism, service.config.BatchSize)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
group := errs2.Group{}
|
|
for index, rangeProvider := range rangeProviders {
|
|
uuidRange := rangeProvider.Range()
|
|
service.log.Debug("creating range", zap.Int("index", index), zap.Stringer("start", uuidRange.Start), zap.Stringer("end", uuidRange.End))
|
|
|
|
rangeObservers := []*rangeObserverState{}
|
|
for i, observerState := range observerStates {
|
|
if observerState.err != nil {
|
|
service.log.Debug("observer returned error", zap.Error(observerState.err))
|
|
continue
|
|
}
|
|
rangeObserver, err := observerState.observer.Fork(ctx)
|
|
rangeState := &rangeObserverState{
|
|
rangeObserver: rangeObserver,
|
|
err: err,
|
|
}
|
|
rangeObservers = append(rangeObservers, rangeState)
|
|
observerStates[i].rangeObservers = append(observerStates[i].rangeObservers, rangeState)
|
|
}
|
|
|
|
// Create closure to capture loop variables.
|
|
group.Go(createGoroutineClosure(ctx, rangeProvider, rangeObservers))
|
|
}
|
|
|
|
// Improvement: stop all ranges when one has an error.
|
|
errList := group.Wait()
|
|
if errList != nil {
|
|
return nil, errs.Combine(errList...)
|
|
}
|
|
|
|
return finishObservers(ctx, service.log, observerStates), nil
|
|
}
|
|
|
|
func createGoroutineClosure(ctx context.Context, rangeProvider SegmentProvider, states []*rangeObserverState) func() error {
|
|
return func() (err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
return rangeProvider.Iterate(ctx, func(segments []Segment) error {
|
|
// check for cancellation every segment batch
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
default:
|
|
return processBatch(ctx, states, segments)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func startObservers(ctx context.Context, log *zap.Logger, observers []Observer) (observerStates []observerState, err error) {
|
|
startTime := time.Now()
|
|
|
|
for _, obs := range observers {
|
|
observerStates = append(observerStates, startObserver(ctx, log, startTime, obs))
|
|
}
|
|
|
|
return observerStates, nil
|
|
}
|
|
|
|
func startObserver(ctx context.Context, log *zap.Logger, startTime time.Time, observer Observer) observerState {
|
|
err := observer.Start(ctx, startTime)
|
|
|
|
if err != nil {
|
|
log.Error(
|
|
"Starting observer failed. This observer will be excluded from this run of the ranged segment loop.",
|
|
zap.String("observer", fmt.Sprintf("%T", observer)),
|
|
zap.Error(err),
|
|
)
|
|
}
|
|
|
|
return observerState{
|
|
observer: observer,
|
|
err: err,
|
|
}
|
|
}
|
|
|
|
func finishObservers(ctx context.Context, log *zap.Logger, observerStates []observerState) (observerDurations []ObserverDuration) {
|
|
for _, state := range observerStates {
|
|
observerDurations = append(observerDurations, finishObserver(ctx, log, state))
|
|
}
|
|
|
|
sendObserverDurations(observerDurations)
|
|
|
|
return observerDurations
|
|
}
|
|
|
|
// Iterating over the segments is done.
|
|
// This is the reduce step.
|
|
func finishObserver(ctx context.Context, log *zap.Logger, state observerState) ObserverDuration {
|
|
if state.err != nil {
|
|
return ObserverDuration{
|
|
Observer: state.observer,
|
|
Duration: -1 * time.Second,
|
|
}
|
|
}
|
|
for _, rangeObserver := range state.rangeObservers {
|
|
if rangeObserver.err != nil {
|
|
log.Error(
|
|
"Observer failed during Process(), it will not be finalized in this run of the ranged segment loop",
|
|
zap.String("observer", fmt.Sprintf("%T", state.observer)),
|
|
zap.Error(rangeObserver.err),
|
|
)
|
|
return ObserverDuration{
|
|
Observer: state.observer,
|
|
Duration: -1 * time.Second,
|
|
}
|
|
}
|
|
}
|
|
|
|
var duration time.Duration
|
|
for _, rangeObserver := range state.rangeObservers {
|
|
err := state.observer.Join(ctx, rangeObserver.rangeObserver)
|
|
if err != nil {
|
|
log.Error(
|
|
"Observer failed during Join(), it will not be finalized in this run of the ranged segment loop",
|
|
zap.String("observer", fmt.Sprintf("%T", state.observer)),
|
|
zap.Error(rangeObserver.err),
|
|
)
|
|
return ObserverDuration{
|
|
Observer: state.observer,
|
|
Duration: -1 * time.Second,
|
|
}
|
|
}
|
|
duration += rangeObserver.duration
|
|
}
|
|
|
|
err := state.observer.Finish(ctx)
|
|
if err != nil {
|
|
log.Error(
|
|
"Observer failed during Finish()",
|
|
zap.String("observer", fmt.Sprintf("%T", state.observer)),
|
|
zap.Error(err),
|
|
)
|
|
return ObserverDuration{
|
|
Observer: state.observer,
|
|
Duration: -1 * time.Second,
|
|
}
|
|
}
|
|
|
|
return ObserverDuration{
|
|
Duration: duration,
|
|
Observer: state.observer,
|
|
}
|
|
}
|
|
|
|
func processBatch(ctx context.Context, states []*rangeObserverState, segments []Segment) (err error) {
|
|
for _, state := range states {
|
|
if state.err != nil {
|
|
// this observer has errored in a previous batch
|
|
continue
|
|
}
|
|
start := time.Now()
|
|
err := state.rangeObserver.Process(ctx, segments)
|
|
state.duration += time.Since(start)
|
|
if err != nil {
|
|
// unsure if this is necessary here
|
|
if errs2.IsCanceled(err) {
|
|
return err
|
|
}
|
|
state.err = err
|
|
}
|
|
}
|
|
return nil
|
|
}
|