From 4ad51209235f38b7f64daf5e9e95fbda34ade503 Mon Sep 17 00:00:00 2001
From: aligeti <34487396+aligeti@users.noreply.github.com>
Date: Fri, 31 May 2019 10:12:49 -0400
Subject: [PATCH] Checker service refactor (v3-1871) (#2082)

* refactor the checker service

* monkit update
---
 pkg/datarepair/checker/checker.go | 62 +++++++++++---------------------
 1 file changed, 21 insertions(+), 41 deletions(-)

diff --git a/pkg/datarepair/checker/checker.go b/pkg/datarepair/checker/checker.go
index b570cc9a7..cf1b46e61 100644
--- a/pkg/datarepair/checker/checker.go
+++ b/pkg/datarepair/checker/checker.go
@@ -10,6 +10,7 @@ import (
 	"github.com/gogo/protobuf/proto"
 	"github.com/zeebo/errs"
 	"go.uber.org/zap"
+	"golang.org/x/sync/errgroup"
 	monkit "gopkg.in/spacemonkeygo/monkit.v2"
 
 	"storj.io/storj/internal/sync2"
@@ -53,6 +54,7 @@ type Checker struct {
 	logger          *zap.Logger
 	Loop            sync2.Cycle
 	IrreparableLoop sync2.Cycle
+	monStats        durabilityStats
 }
 
 // NewChecker creates a new instance of checker
@@ -67,6 +69,7 @@ func NewChecker(metainfo *metainfo.Service, repairQueue queue.RepairQueue, overl
 		logger:          logger,
 		Loop:            *sync2.NewCycle(repairInterval),
 		IrreparableLoop: *sync2.NewCycle(irreparableInterval),
+		monStats:        durabilityStats{},
 	}
 	return checker
 }
@@ -75,34 +78,17 @@ func NewChecker(metainfo *metainfo.Service, repairQueue queue.RepairQueue, overl
 func (checker *Checker) Run(ctx context.Context) (err error) {
 	defer mon.Task()(&ctx)(&err)
 
-	c := make(chan error)
+	group, ctx := errgroup.WithContext(ctx)
 
-	go func() {
-		c <- checker.Loop.Run(ctx, func(ctx context.Context) error {
-			err := checker.IdentifyInjuredSegments(ctx)
-			if err != nil {
-				checker.logger.Error("error with injured segments identification: ", zap.Error(err))
-			}
-			return nil
-		})
-	}()
+	group.Go(func() error {
+		return checker.Loop.Run(ctx, checker.IdentifyInjuredSegments)
+	})
 
-	go func() {
-		c <- checker.IrreparableLoop.Run(ctx, func(ctx context.Context) error {
-			err := checker.IrreparableProcess(ctx)
-			if err != nil {
-				checker.logger.Error("error with irreparable segments identification", zap.Error(err))
-			}
-			return nil
-		})
-	}()
+	group.Go(func() error {
+		return checker.IrreparableLoop.Run(ctx, checker.IrreparableProcess)
+	})
 
-	for err := range c {
-		if err != nil {
-			return err
-		}
-	}
-	return nil
+	return group.Wait()
 }
 
 // Close halts the Checker loop
@@ -115,8 +101,6 @@ func (checker *Checker) Close() error {
 func (checker *Checker) IdentifyInjuredSegments(ctx context.Context) (err error) {
 	defer mon.Task()(&ctx)(&err)
 
-	var monStats durabilityStats
-
 	err = checker.metainfo.Iterate("", checker.lastChecked, true, false,
 		func(it storage.Iterator) error {
 			var item storage.ListItem
@@ -129,11 +113,14 @@ func (checker *Checker) IdentifyInjuredSegments(ctx context.Context) (err error)
 			// if we have finished iterating, send and reset durability stats
 			if checker.lastChecked == "" {
 				// send durability stats
-				mon.IntVal("remote_files_checked").Observe(monStats.remoteFilesChecked)
-				mon.IntVal("remote_segments_checked").Observe(monStats.remoteSegmentsChecked)
-				mon.IntVal("remote_segments_needing_repair").Observe(monStats.remoteSegmentsNeedingRepair)
-				mon.IntVal("remote_segments_lost").Observe(monStats.remoteSegmentsLost)
-				mon.IntVal("remote_files_lost").Observe(int64(len(monStats.remoteSegmentInfo)))
+				mon.IntVal("remote_files_checked").Observe(checker.monStats.remoteFilesChecked)
+				mon.IntVal("remote_segments_checked").Observe(checker.monStats.remoteSegmentsChecked)
+				mon.IntVal("remote_segments_needing_repair").Observe(checker.monStats.remoteSegmentsNeedingRepair)
+				mon.IntVal("remote_segments_lost").Observe(checker.monStats.remoteSegmentsLost)
+				mon.IntVal("remote_files_lost").Observe(int64(len(checker.monStats.remoteSegmentInfo)))
+
+				// reset durability stats for next iteration
+				checker.monStats = durabilityStats{}
 			}
 		}()
 
@@ -145,7 +132,7 @@ func (checker *Checker) IdentifyInjuredSegments(ctx context.Context) (err error)
 				return Error.New("error unmarshalling pointer %s", err)
 			}
 
-			err = checker.updateSegmentStatus(ctx, pointer, item.Key.String(), &monStats)
+			err = checker.updateSegmentStatus(ctx, pointer, item.Key.String(), &checker.monStats)
 			if err != nil {
 				return err
 			}
@@ -253,7 +240,6 @@ func (checker *Checker) IrreparableProcess(ctx context.Context) (err error) {
 	defer mon.Task()(&ctx)(&err)
 	limit := 1
 	var offset int64
-	var monStats durabilityStats
 
 	for {
 		seg, err := checker.irrdb.GetLimited(ctx, limit, offset)
@@ -266,18 +252,12 @@ func (checker *Checker) IrreparableProcess(ctx context.Context) (err error) {
 			break
 		}
 
-		err = checker.updateSegmentStatus(ctx, seg[0].GetSegmentDetail(), string(seg[0].GetPath()), &monStats)
+		err = checker.updateSegmentStatus(ctx, seg[0].GetSegmentDetail(), string(seg[0].GetPath()), &durabilityStats{})
 		if err != nil {
 			checker.logger.Error("irrepair segment checker failed: ", zap.Error(err))
 		}
 		offset++
 	}
 
-	// send durability stats
-	mon.IntVal("remote_files_checked").Observe(monStats.remoteFilesChecked)
-	mon.IntVal("remote_segments_checked").Observe(monStats.remoteSegmentsChecked)
-	mon.IntVal("remote_segments_needing_repair").Observe(monStats.remoteSegmentsNeedingRepair)
-	mon.IntVal("remote_segments_lost").Observe(monStats.remoteSegmentsLost)
-	mon.IntVal("remote_files_lost").Observe(int64(len(monStats.remoteSegmentInfo)))
 	return nil
 }
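The heart of the refactor is `Run`: the hand-rolled `chan error` fan-in is replaced by `golang.org/x/sync/errgroup`. Two things fall out of this. First, the old closures logged each loop error and returned `nil`, so failures were effectively swallowed; the new version hands `checker.IdentifyInjuredSegments` and `checker.IrreparableProcess` straight to the cycles, and `group.Wait()` surfaces the first error. Second, `errgroup.WithContext` cancels the shared context when either loop fails, so the sibling loop shuts down instead of running on alone. A minimal standalone sketch of the same pattern (the `tick` helper and its `fail` flag are invented stand-ins for the `sync2.Cycle` loop bodies):

```go
package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

// tick is a hypothetical stand-in for a sync2.Cycle loop body: it does
// periodic work until its context is cancelled or it hits an error.
func tick(ctx context.Context, name string, fail bool) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(10 * time.Millisecond):
			if fail {
				return fmt.Errorf("%s: simulated failure", name)
			}
		}
	}
}

func main() {
	// WithContext derives a context that is cancelled as soon as any
	// function passed to group.Go returns a non-nil error.
	group, ctx := errgroup.WithContext(context.Background())

	group.Go(func() error { return tick(ctx, "checker", true) })
	group.Go(func() error { return tick(ctx, "irreparable", false) })

	// Wait blocks until both goroutines finish and returns the first
	// error; the healthy loop exits via ctx.Done() after the failure.
	fmt.Println(group.Wait())
}
```

This also removes a subtle leak in the deleted code: `c` was unbuffered and never closed, so once `for err := range c` returned on the first non-nil error, the other goroutine stayed blocked on its send forever.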
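The `monStats` change is subtler. Because `IdentifyInjuredSegments` resumes the metainfo iteration from `checker.lastChecked`, a single call may cover only part of the keyspace, and a local `durabilityStats` started its counters from zero on every call. Promoting it to a field on `Checker` lets the counters accumulate across calls; they are observed and zeroed only once a full pass completes (`lastChecked == ""`). `IrreparableProcess`, by contrast, stops reporting these metrics entirely: it passes a throwaway `&durabilityStats{}` and its trailing `mon.IntVal` block is deleted, so the stats are published from exactly one place. A hedged sketch of the accumulate/observe/reset pattern (`checker`, `observeSegment`, and `finishPass` here are hypothetical names; the counter fields and metric names are the patch's own, trimmed to a subset):

```go
package main

import (
	monkit "gopkg.in/spacemonkeygo/monkit.v2"
)

var mon = monkit.Package()

// durabilityStats carries a subset of the counters from the patch.
type durabilityStats struct {
	remoteSegmentsChecked       int64
	remoteSegmentsNeedingRepair int64
}

type checker struct {
	lastChecked string // iteration resume point; empty after a full pass
	monStats    durabilityStats
}

// observeSegment accumulates into the long-lived field rather than a
// per-call local, so a pass split across calls still counts everything.
func (c *checker) observeSegment(needsRepair bool) {
	c.monStats.remoteSegmentsChecked++
	if needsRepair {
		c.monStats.remoteSegmentsNeedingRepair++
	}
}

// finishPass mirrors the patch's end-of-pass step: publish the counters
// to monkit, then reset them for the next iteration.
func (c *checker) finishPass() {
	mon.IntVal("remote_segments_checked").Observe(c.monStats.remoteSegmentsChecked)
	mon.IntVal("remote_segments_needing_repair").Observe(c.monStats.remoteSegmentsNeedingRepair)

	// reset durability stats for next iteration
	c.monStats = durabilityStats{}
}

func main() {
	c := &checker{}
	c.observeSegment(true)
	c.observeSegment(false)
	c.finishPass() // counters are published once per completed pass
}
```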