From 507b099d4482d90a6f684cf06babe248c886a1db Mon Sep 17 00:00:00 2001 From: Egon Elbre Date: Wed, 14 Sep 2022 17:15:58 +0300 Subject: [PATCH] cmd/tools/segment-verify: add monitoring / error Change-Id: I6fd0369719ddf176a98208348560004a4134f810 --- cmd/tools/segment-verify/batch.go | 5 ++++- cmd/tools/segment-verify/process.go | 15 +++++++++------ cmd/tools/segment-verify/service.go | 25 +++++++++++++++++-------- 3 files changed, 30 insertions(+), 15 deletions(-) diff --git a/cmd/tools/segment-verify/batch.go b/cmd/tools/segment-verify/batch.go index 04fdd4220..50fd9ab22 100644 --- a/cmd/tools/segment-verify/batch.go +++ b/cmd/tools/segment-verify/batch.go @@ -4,13 +4,16 @@ package main import ( + "context" "sort" "storj.io/storj/satellite/metabase" ) // CreateBatches creates load-balanced queues of segments to verify. -func (service *Service) CreateBatches(segments []*Segment) ([]*Batch, error) { +func (service *Service) CreateBatches(ctx context.Context, segments []*Segment) (_ []*Batch, err error) { + defer mon.Task()(&ctx)(&err) + // Remove offline nodes and prioritize nodes. for _, segment := range segments { service.selectOnlinePieces(segment) diff --git a/cmd/tools/segment-verify/process.go b/cmd/tools/segment-verify/process.go index e943a72c4..57cda3a44 100644 --- a/cmd/tools/segment-verify/process.go +++ b/cmd/tools/segment-verify/process.go @@ -7,21 +7,22 @@ import ( "context" "sync" - "github.com/zeebo/errs" "go.uber.org/zap" "storj.io/common/sync2" ) // Verify verifies a collection of segments. -func (service *Service) Verify(ctx context.Context, segments []*Segment) error { +func (service *Service) Verify(ctx context.Context, segments []*Segment) (err error) { + defer mon.Task()(&ctx)(&err) + for _, segment := range segments { segment.Status.Retry = VerifyPieces } - batches, err := service.CreateBatches(segments) + batches, err := service.CreateBatches(ctx, segments) if err != nil { - return errs.Wrap(err) + return Error.Wrap(err) } service.VerifyBatches(ctx, batches) @@ -43,9 +44,9 @@ func (service *Service) Verify(ctx context.Context, segments []*Segment) error { service.removePriorityPieces(segment) } - retryBatches, err := service.CreateBatches(retrySegments) + retryBatches, err := service.CreateBatches(ctx, retrySegments) if err != nil { - return errs.Wrap(err) + return Error.Wrap(err) } service.VerifyBatches(ctx, retryBatches) @@ -55,6 +56,8 @@ func (service *Service) Verify(ctx context.Context, segments []*Segment) error { // VerifyBatches verifies batches. func (service *Service) VerifyBatches(ctx context.Context, batches []*Batch) { + defer mon.Task()(&ctx)(nil) + var mu sync.Mutex limiter := sync2.NewLimiter(ConcurrentRequests) diff --git a/cmd/tools/segment-verify/service.go b/cmd/tools/segment-verify/service.go index 356415ba1..50969b470 100644 --- a/cmd/tools/segment-verify/service.go +++ b/cmd/tools/segment-verify/service.go @@ -8,6 +8,7 @@ import ( "fmt" "sync/atomic" + "github.com/spacemonkeygo/monkit/v3" "github.com/zeebo/errs" "go.uber.org/zap" @@ -16,6 +17,8 @@ import ( "storj.io/storj/satellite/metabase" ) +var mon = monkit.Package() + // Error is global error class. var Error = errs.Class("segment-verify") @@ -60,7 +63,9 @@ func NewService(log *zap.Logger) *Service { } // Process processes segments between low and high uuid.UUID with the specified batchSize. -func (service *Service) Process(ctx context.Context, low, high uuid.UUID, batchSize int) error { +func (service *Service) Process(ctx context.Context, low, high uuid.UUID, batchSize int) (err error) { + defer mon.Task()(&ctx)(&err) + cursorStreamID := low if !low.IsZero() { cursorStreamID = uuidBefore(low) @@ -76,7 +81,7 @@ func (service *Service) Process(ctx context.Context, low, high uuid.UUID, batchS // TODO: add AS OF SYSTEM time. }) if err != nil { - return errs.Wrap(err) + return Error.Wrap(err) } verifySegments := result.Segments result.Segments = nil @@ -102,13 +107,15 @@ func (service *Service) Process(ctx context.Context, low, high uuid.UUID, batchS // Process the data. err = service.ProcessSegments(ctx, segments) if err != nil { - return errs.Wrap(err) + return Error.Wrap(err) } } } // ProcessSegments processes a collection of segments. -func (service *Service) ProcessSegments(ctx context.Context, segments []*Segment) error { +func (service *Service) ProcessSegments(ctx context.Context, segments []*Segment) (err error) { + defer mon.Task()(&ctx)(&err) + service.log.Info("processing segments", zap.Int("count", len(segments)), zap.Stringer("first", segments[0].StreamID), @@ -116,9 +123,9 @@ func (service *Service) ProcessSegments(ctx context.Context, segments []*Segment ) // Verify all the segments against storage nodes. - err := service.Verify(ctx, segments) + err = service.Verify(ctx, segments) if err != nil { - return errs.Wrap(err) + return Error.Wrap(err) } notFound := []*Segment{} @@ -141,11 +148,11 @@ func (service *Service) ProcessSegments(ctx context.Context, segments []*Segment // segments from the list. notFound, err = service.RemoveDeleted(ctx, notFound) if err != nil { - return errs.Wrap(err) + return Error.Wrap(err) } retry, err = service.RemoveDeleted(ctx, retry) if err != nil { - return errs.Wrap(err) + return Error.Wrap(err) } // Output the problematic segments: @@ -158,6 +165,8 @@ func (service *Service) ProcessSegments(ctx context.Context, segments []*Segment // RemoveDeleted modifies the slice and returns only the segments that // still exist in the database. func (service *Service) RemoveDeleted(ctx context.Context, segments []*Segment) (_ []*Segment, err error) { + defer mon.Task()(&ctx)(&err) + valid := segments[:0] for _, seg := range segments { _, err := service.metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{