storj/satellite/metabase/rangedloop/service.go
Qweder93 8c69ee62fc {cmd/storj-sim, satellite/rangedloop}: added rangedloop to storj-sim, removed identity
added in storj-sim rangedloop for each satellite, to verify it works for metrics observer,
removed identity from rangedloop peer as we never use it, added logs on service run, added loop
to service instead of endless for loop, added interval value to config

Closes: https://github.com/storj/storj/issues/5414

Change-Id: Ibc3b06071b68feda4a35b45da2bbe36e22a02fc8
2023-01-05 11:29:00 +00:00

215 lines
5.6 KiB
Go

// Copyright (C) 2022 Storj Labs, Inc.
// See LICENSE for copying information.
package rangedloop
import (
"context"
"time"
"github.com/spacemonkeygo/monkit/v3"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/common/errs2"
"storj.io/common/sync2"
"storj.io/storj/satellite/metabase/segmentloop"
)
// mon is the monkit scope for this package; every mon.Task and mon.Event
// in the ranged loop reports through it.
var mon = monkit.Package()
// Config contains configurable values for the shared loop.
//
// The struct tags are read by the configuration system to register flags,
// so their text and defaults are part of the runtime behavior.
type Config struct {
	// Parallelism is handed to RangeSplitter.CreateRanges by Service.RunOnce;
	// each resulting range is processed on its own goroutine.
	Parallelism int `help:"how many chunks of segments to process in parallel" default:"2"`
	// BatchSize is handed to RangeSplitter.CreateRanges by Service.RunOnce.
	// NOTE(review): presumably the number of segments fetched per query and
	// passed to Partial.Process at once — confirm in the splitter.
	BatchSize int `help:"how many items to query in a batch" default:"2500"`
	// AsOfSystemInterval is not read in this file.
	// NOTE(review): presumably consumed by the RangeSplitter implementation
	// for "AS OF SYSTEM TIME" queries — confirm.
	AsOfSystemInterval time.Duration `help:"as of system interval" releaseDefault:"-5m" devDefault:"-1us" testDefault:"-1us"`
	// Interval is the period of the sync2.Cycle created in NewService, i.e. how
	// often Service.Run starts a new iteration.
	Interval time.Duration `help:"how often to run the loop" releaseDefault:"2h" devDefault:"10s" testDefault:"10s"`
}
// Service iterates through all segments and calls the attached observers for every segment.
//
// architecture: Service
type Service struct {
	// log receives the lifecycle and failure messages emitted by Run.
	log *zap.Logger
	// config supplies Parallelism/BatchSize to RunOnce and Interval to Loop.
	config Config
	// provider splits the segment space into ranges for each iteration.
	provider RangeSplitter
	// observers are started, forked per range, joined and finished on every
	// RunOnce call, in this slice's order.
	observers []Observer

	// Loop drives Run's periodic iterations; NewService creates it with
	// config.Interval. NOTE(review): exported presumably so callers or tests
	// can trigger/pause the cycle — confirm.
	Loop *sync2.Cycle
}
// NewService creates a new instance of the ranged loop service.
//
// The returned service's Loop cycles every config.Interval; the observers are
// stored as given and run in that order on every iteration.
func NewService(log *zap.Logger, config Config, provider RangeSplitter, observers []Observer) *Service {
	cycle := sync2.NewCycle(config.Interval)

	return &Service{
		Loop:      cycle,
		observers: observers,
		provider:  provider,
		config:    config,
		log:       log,
	}
}
// observerState contains information to manage an observer during a loop iteration.
//
// Per-range Process time is already tracked on each rangeObserverState.
// NOTE(review): the former "Improvement: track duration." note presumably
// meant also timing Start/Join/Finish of the observer itself — confirm.
type observerState struct {
	// observer is the top-level observer that was started for this iteration.
	observer Observer
	// rangeObservers holds one forked Partial per range; they are joined back
	// into observer once all ranges have been processed.
	rangeObservers []*rangeObserverState
}
// rangeObserverState pairs a forked Partial with the time it has spent
// processing segments of its range.
type rangeObserverState struct {
	// rangeObserver is the Partial obtained from Observer.Fork for one range.
	rangeObserver Partial
	// duration accumulates the time spent in successful Process calls.
	duration time.Duration
}
// ObserverDuration reports back on how long it took the observer to process all the segments.
//
// Duration is the sum of the Process time of every forked Partial of the
// observer. Because ranges are processed concurrently, this is aggregate
// processing time rather than elapsed wall-clock time.
type ObserverDuration struct {
	Observer Observer
	Duration time.Duration
}
// Run starts the looping service.
//
// Every tick of service.Loop performs one RunOnce. A failed iteration is
// logged; if the failure is a cancellation, or the context is done, the error
// is returned and the cycle stops. Any other failure only emits the
// "rangedloop_error" event, and the loop waits for the next tick.
func (service *Service) Run(ctx context.Context) (err error) {
	defer mon.Task()(&ctx)(&err)

	service.log.Info("ranged loop initialized")

	iteration := func(ctx context.Context) error {
		service.log.Info("ranged loop started")

		if _, runErr := service.RunOnce(ctx); runErr != nil {
			service.log.Error("ranged loop failure", zap.Error(runErr))

			switch {
			case errs2.IsCanceled(runErr):
				return runErr
			case ctx.Err() != nil:
				return errs.Combine(runErr, ctx.Err())
			}

			mon.Event("rangedloop_error") //mon:locked
		}

		service.log.Info("ranged loop finished")
		return nil
	}

	return service.Loop.Run(ctx, iteration)
}
// RunOnce goes through one time and sends information to observers.
//
// One iteration:
//  1. starts every observer,
//  2. splits the segment space into ranges via the RangeSplitter,
//  3. forks one Partial per observer for every range,
//  4. processes all ranges concurrently, one goroutine per range,
//  5. joins the partials back into their observers and finishes them.
//
// The returned durations hold, per observer, the summed time its partials
// spent in Process across all ranges.
func (service *Service) RunOnce(ctx context.Context) (observerDurations []ObserverDuration, err error) {
	defer mon.Task()(&ctx)(&err)

	observerStates, err := startObservers(ctx, service.observers)
	if err != nil {
		return nil, err
	}

	rangeProviders, err := service.provider.CreateRanges(service.config.Parallelism, service.config.BatchSize)
	if err != nil {
		return nil, err
	}

	// Fork every partial before any goroutine is launched. Forking inside the
	// launch loop meant a Fork error returned while goroutines for earlier
	// ranges were still running, never waited for and never joined. It also
	// let Fork run concurrently with Process calls on other partials.
	rangeStates, err := forkRangeObservers(ctx, observerStates, len(rangeProviders))
	if err != nil {
		return nil, err
	}

	group := errs2.Group{}
	for i, rangeProvider := range rangeProviders {
		// The helper receives its inputs as arguments, so the goroutine does
		// not capture loop variables.
		group.Go(createGoroutineClosure(ctx, rangeProvider, rangeStates[i]))
	}

	// Improvement: stop all ranges when one has an error.
	if errList := group.Wait(); errList != nil {
		return nil, errs.Combine(errList...)
	}

	return finishObservers(ctx, observerStates)
}

// forkRangeObservers forks one Partial per observer for each of rangeCount
// ranges. The result is indexed by range, and each inner slice follows the
// order of observerStates. Every fork is also appended to its observerState,
// so it can be joined later. It stops at the first Fork error.
func forkRangeObservers(ctx context.Context, observerStates []observerState, rangeCount int) ([][]*rangeObserverState, error) {
	rangeStates := make([][]*rangeObserverState, 0, rangeCount)
	for r := 0; r < rangeCount; r++ {
		states := make([]*rangeObserverState, 0, len(observerStates))
		for i := range observerStates {
			partial, err := observerStates[i].observer.Fork(ctx)
			if err != nil {
				return nil, err
			}
			state := &rangeObserverState{rangeObserver: partial}
			states = append(states, state)
			observerStates[i].rangeObservers = append(observerStates[i].rangeObservers, state)
		}
		rangeStates = append(rangeStates, states)
	}
	return rangeStates, nil
}
// createGoroutineClosure returns the work function for a single range.
//
// The function iterates the segments of rangeProvider batch by batch and
// hands every batch to each partial in states, in slice order. The time spent
// in each successful Process call is added to that state's duration, and the
// first Process error aborts the iteration and is returned.
func createGoroutineClosure(ctx context.Context, rangeProvider SegmentProvider, states []*rangeObserverState) func() error {
	return func() (err error) {
		defer mon.Task()(&ctx)(&err)

		processBatch := func(segments []segmentloop.Segment) error {
			for _, state := range states {
				began := time.Now()
				if processErr := state.rangeObserver.Process(ctx, segments); processErr != nil {
					return processErr
				}
				state.duration += time.Since(began)
			}
			return nil
		}

		return rangeProvider.Iterate(ctx, processBatch)
	}
}
// startObservers starts every observer, in order, giving all of them the same
// start time, and returns one observerState per observer. It stops at the
// first failure and returns only that error. Observers started before the
// failure are not finished here.
func startObservers(ctx context.Context, observers []Observer) (observerStates []observerState, err error) {
	startTime := time.Now()
	for i := range observers {
		state, startErr := startObserver(ctx, startTime, observers[i])
		if startErr != nil {
			return nil, startErr
		}
		observerStates = append(observerStates, state)
	}
	return observerStates, nil
}
// startObserver calls Start on observer with startTime and wraps it in a
// fresh observerState. The state is returned together with Start's error;
// callers are expected to discard it when the error is non-nil.
func startObserver(ctx context.Context, startTime time.Time, observer Observer) (observerState, error) {
	startErr := observer.Start(ctx, startTime)
	state := observerState{observer: observer}
	return state, startErr
}
// finishObservers joins and finishes every observer, in order, collecting one
// ObserverDuration per observer. On the first failure it returns that error
// without reporting durations. On success it passes the collected durations
// to sendObserverDurations before returning them.
func finishObservers(ctx context.Context, observerStates []observerState) (observerDurations []ObserverDuration, err error) {
	for i := range observerStates {
		result, finishErr := finishObserver(ctx, observerStates[i])
		if finishErr != nil {
			return nil, finishErr
		}
		observerDurations = append(observerDurations, result)
	}

	sendObserverDurations(observerDurations)
	return observerDurations, nil
}
// finishObserver performs the reduce step once iterating over the segments is
// done. It joins every forked partial back into the observer, in range order,
// and sums their Process durations. It then calls Finish on the observer.
//
// A Join error aborts immediately, without calling Finish. A Finish error is
// returned alongside the computed duration, which callers discard.
func finishObserver(ctx context.Context, state observerState) (ObserverDuration, error) {
	result := ObserverDuration{Observer: state.observer}
	for _, partialState := range state.rangeObservers {
		if err := state.observer.Join(ctx, partialState.rangeObserver); err != nil {
			return ObserverDuration{}, err
		}
		result.Duration += partialState.duration
	}
	return result, state.observer.Finish(ctx)
}