cmd/tools/segment-verify: add monitoring / error

Change-Id: I6fd0369719ddf176a98208348560004a4134f810
This commit is contained in:
Egon Elbre 2022-09-14 17:15:58 +03:00
parent 6127f465dc
commit 507b099d44
3 changed files with 30 additions and 15 deletions

View File

@ -4,13 +4,16 @@
package main package main
import ( import (
"context"
"sort" "sort"
"storj.io/storj/satellite/metabase" "storj.io/storj/satellite/metabase"
) )
// CreateBatches creates load-balanced queues of segments to verify. // CreateBatches creates load-balanced queues of segments to verify.
func (service *Service) CreateBatches(segments []*Segment) ([]*Batch, error) { func (service *Service) CreateBatches(ctx context.Context, segments []*Segment) (_ []*Batch, err error) {
defer mon.Task()(&ctx)(&err)
// Remove offline nodes and prioritize nodes. // Remove offline nodes and prioritize nodes.
for _, segment := range segments { for _, segment := range segments {
service.selectOnlinePieces(segment) service.selectOnlinePieces(segment)

View File

@ -7,21 +7,22 @@ import (
"context" "context"
"sync" "sync"
"github.com/zeebo/errs"
"go.uber.org/zap" "go.uber.org/zap"
"storj.io/common/sync2" "storj.io/common/sync2"
) )
// Verify verifies a collection of segments. // Verify verifies a collection of segments.
func (service *Service) Verify(ctx context.Context, segments []*Segment) error { func (service *Service) Verify(ctx context.Context, segments []*Segment) (err error) {
defer mon.Task()(&ctx)(&err)
for _, segment := range segments { for _, segment := range segments {
segment.Status.Retry = VerifyPieces segment.Status.Retry = VerifyPieces
} }
batches, err := service.CreateBatches(segments) batches, err := service.CreateBatches(ctx, segments)
if err != nil { if err != nil {
return errs.Wrap(err) return Error.Wrap(err)
} }
service.VerifyBatches(ctx, batches) service.VerifyBatches(ctx, batches)
@ -43,9 +44,9 @@ func (service *Service) Verify(ctx context.Context, segments []*Segment) error {
service.removePriorityPieces(segment) service.removePriorityPieces(segment)
} }
retryBatches, err := service.CreateBatches(retrySegments) retryBatches, err := service.CreateBatches(ctx, retrySegments)
if err != nil { if err != nil {
return errs.Wrap(err) return Error.Wrap(err)
} }
service.VerifyBatches(ctx, retryBatches) service.VerifyBatches(ctx, retryBatches)
@ -55,6 +56,8 @@ func (service *Service) Verify(ctx context.Context, segments []*Segment) error {
// VerifyBatches verifies batches. // VerifyBatches verifies batches.
func (service *Service) VerifyBatches(ctx context.Context, batches []*Batch) { func (service *Service) VerifyBatches(ctx context.Context, batches []*Batch) {
defer mon.Task()(&ctx)(nil)
var mu sync.Mutex var mu sync.Mutex
limiter := sync2.NewLimiter(ConcurrentRequests) limiter := sync2.NewLimiter(ConcurrentRequests)

View File

@ -8,6 +8,7 @@ import (
"fmt" "fmt"
"sync/atomic" "sync/atomic"
"github.com/spacemonkeygo/monkit/v3"
"github.com/zeebo/errs" "github.com/zeebo/errs"
"go.uber.org/zap" "go.uber.org/zap"
@ -16,6 +17,8 @@ import (
"storj.io/storj/satellite/metabase" "storj.io/storj/satellite/metabase"
) )
var mon = monkit.Package()
// Error is global error class. // Error is global error class.
var Error = errs.Class("segment-verify") var Error = errs.Class("segment-verify")
@ -60,7 +63,9 @@ func NewService(log *zap.Logger) *Service {
} }
// Process processes segments between low and high uuid.UUID with the specified batchSize. // Process processes segments between low and high uuid.UUID with the specified batchSize.
func (service *Service) Process(ctx context.Context, low, high uuid.UUID, batchSize int) error { func (service *Service) Process(ctx context.Context, low, high uuid.UUID, batchSize int) (err error) {
defer mon.Task()(&ctx)(&err)
cursorStreamID := low cursorStreamID := low
if !low.IsZero() { if !low.IsZero() {
cursorStreamID = uuidBefore(low) cursorStreamID = uuidBefore(low)
@ -76,7 +81,7 @@ func (service *Service) Process(ctx context.Context, low, high uuid.UUID, batchS
// TODO: add AS OF SYSTEM time. // TODO: add AS OF SYSTEM time.
}) })
if err != nil { if err != nil {
return errs.Wrap(err) return Error.Wrap(err)
} }
verifySegments := result.Segments verifySegments := result.Segments
result.Segments = nil result.Segments = nil
@ -102,13 +107,15 @@ func (service *Service) Process(ctx context.Context, low, high uuid.UUID, batchS
// Process the data. // Process the data.
err = service.ProcessSegments(ctx, segments) err = service.ProcessSegments(ctx, segments)
if err != nil { if err != nil {
return errs.Wrap(err) return Error.Wrap(err)
} }
} }
} }
// ProcessSegments processes a collection of segments. // ProcessSegments processes a collection of segments.
func (service *Service) ProcessSegments(ctx context.Context, segments []*Segment) error { func (service *Service) ProcessSegments(ctx context.Context, segments []*Segment) (err error) {
defer mon.Task()(&ctx)(&err)
service.log.Info("processing segments", service.log.Info("processing segments",
zap.Int("count", len(segments)), zap.Int("count", len(segments)),
zap.Stringer("first", segments[0].StreamID), zap.Stringer("first", segments[0].StreamID),
@ -116,9 +123,9 @@ func (service *Service) ProcessSegments(ctx context.Context, segments []*Segment
) )
// Verify all the segments against storage nodes. // Verify all the segments against storage nodes.
err := service.Verify(ctx, segments) err = service.Verify(ctx, segments)
if err != nil { if err != nil {
return errs.Wrap(err) return Error.Wrap(err)
} }
notFound := []*Segment{} notFound := []*Segment{}
@ -141,11 +148,11 @@ func (service *Service) ProcessSegments(ctx context.Context, segments []*Segment
// segments from the list. // segments from the list.
notFound, err = service.RemoveDeleted(ctx, notFound) notFound, err = service.RemoveDeleted(ctx, notFound)
if err != nil { if err != nil {
return errs.Wrap(err) return Error.Wrap(err)
} }
retry, err = service.RemoveDeleted(ctx, retry) retry, err = service.RemoveDeleted(ctx, retry)
if err != nil { if err != nil {
return errs.Wrap(err) return Error.Wrap(err)
} }
// Output the problematic segments: // Output the problematic segments:
@ -158,6 +165,8 @@ func (service *Service) ProcessSegments(ctx context.Context, segments []*Segment
// RemoveDeleted modifies the slice and returns only the segments that // RemoveDeleted modifies the slice and returns only the segments that
// still exist in the database. // still exist in the database.
func (service *Service) RemoveDeleted(ctx context.Context, segments []*Segment) (_ []*Segment, err error) { func (service *Service) RemoveDeleted(ctx context.Context, segments []*Segment) (_ []*Segment, err error) {
defer mon.Task()(&ctx)(&err)
valid := segments[:0] valid := segments[:0]
for _, seg := range segments { for _, seg := range segments {
_, err := service.metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{ _, err := service.metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{