cmd/tools/metabase-verify: switch to ranged loop

We would like to remove segments loop so we need to refactor
our tools to use ranged loop.

To simplify change ranged loop is used with single range only.

https://github.com/storj/storj/issues/5237

Change-Id: I94d96d54f9d0e37b06def4f4fc16b71c5b79baba
This commit is contained in:
Michal Niewrzal 2023-05-16 14:45:21 +02:00
parent 19cb08b025
commit cf5ff537e3
4 changed files with 108 additions and 75 deletions

View File

@ -4,8 +4,6 @@
package main
import (
"time"
"github.com/spf13/cobra"
"github.com/zeebo/errs"
"go.uber.org/zap"
@ -49,9 +47,7 @@ func VerifyCommand(log *zap.Logger) *cobra.Command {
flag.BoolVar(&ignoreVersionMismatch, "ignore-version-mismatch", false, "ignore version mismatch")
flag.DurationVar(&verifyConfig.Loop.CoalesceDuration, "loop.coalesce-duration", 5*time.Second, "how long to wait for new observers before starting iteration")
flag.Float64Var(&verifyConfig.Loop.RateLimit, "loop.rate-limit", 0, "rate limit (default is 0 which is unlimited segments per second)")
flag.IntVar(&verifyConfig.Loop.ListLimit, "loop.list-limit", 2500, "how many items to query in a batch")
flag.IntVar(&verifyConfig.Loop.BatchSize, "loop.batch-size", 2500, "how many items to query in a batch")
flag.Int64Var(&verifyConfig.ProgressPrintFrequency, "progress-frequency", 1000000, "how often should we print progress (every object)")

View File

@ -6,21 +6,62 @@ package verify
import (
"context"
"runtime"
"sync"
"time"
"go.uber.org/zap"
"storj.io/common/memory"
"storj.io/storj/satellite/metabase/segmentloop"
"storj.io/storj/satellite/metabase/rangedloop"
)
// ProgressObserver counts and prints progress of metabase loop.
type ProgressObserver struct {
Log *zap.Logger
mu sync.Mutex
ProgressPrintFrequency int64
RemoteSegmentCount int64
InlineSegmentCount int64
}
RemoteSegmentCount int64
InlineSegmentCount int64
// Start is called at the beginning of each segment loop.
func (progress *ProgressObserver) Start(context.Context, time.Time) error {
return nil
}
// Fork creates a Partial to process a chunk of all the segments. It is
// called after Start. It is not called concurrently.
func (progress *ProgressObserver) Fork(context.Context) (rangedloop.Partial, error) {
return progress, nil
}
// Join is called for each partial returned by Fork.
func (progress *ProgressObserver) Join(context.Context, rangedloop.Partial) error {
return nil
}
// Finish is called after all segments are processed by all observers.
func (progress *ProgressObserver) Finish(context.Context) error {
return nil
}
// Process is called repeatedly with batches of segments.
func (progress *ProgressObserver) Process(ctx context.Context, segments []rangedloop.Segment) error {
progress.mu.Lock()
defer progress.mu.Unlock()
for _, segment := range segments {
if segment.Inline() {
progress.InlineSegmentCount++
} else {
progress.RemoteSegmentCount++
}
if (progress.RemoteSegmentCount+progress.InlineSegmentCount)%progress.ProgressPrintFrequency == 0 {
progress.Report()
}
}
return nil
}
// Report reports the current progress.
@ -39,26 +80,3 @@ func (progress *ProgressObserver) Report() {
zap.Uint32("NumGC", m.NumGC),
)
}
// RemoteSegment implements the Observer interface.
func (progress *ProgressObserver) RemoteSegment(context.Context, *segmentloop.Segment) error {
progress.RemoteSegmentCount++
if (progress.RemoteSegmentCount+progress.InlineSegmentCount)%progress.ProgressPrintFrequency == 0 {
progress.Report()
}
return nil
}
// InlineSegment implements the Observer interface.
func (progress *ProgressObserver) InlineSegment(context.Context, *segmentloop.Segment) error {
progress.InlineSegmentCount++
if (progress.RemoteSegmentCount+progress.InlineSegmentCount)%progress.ProgressPrintFrequency == 0 {
progress.Report()
}
return nil
}
// LoopStarted is called at each start of a loop.
func (progress *ProgressObserver) LoopStarted(ctx context.Context, info segmentloop.LoopInfo) (err error) {
return nil
}

View File

@ -5,42 +5,64 @@ package verify
import (
"context"
"sync"
"time"
"go.uber.org/zap"
"storj.io/common/uuid"
"storj.io/storj/satellite/metabase/segmentloop"
"storj.io/storj/satellite/metabase/rangedloop"
)
// SegmentSizes verifies segments table plain_offset and plain_size.
type SegmentSizes struct {
Log *zap.Logger
segmentState
}
type segmentState struct {
StreamID uuid.UUID
ExpectedOffset int64
}
// LoopStarted is called at each start of a loop.
func (verify *SegmentSizes) LoopStarted(ctx context.Context, info segmentloop.LoopInfo) (err error) {
// SegmentSizes verifies segments table plain_offset and plain_size.
type SegmentSizes struct {
Log *zap.Logger
mu sync.Mutex
segmentState
}
// Start is called at the beginning of each segment loop.
func (verify *SegmentSizes) Start(context.Context, time.Time) error {
return nil
}
// RemoteSegment implements the Observer interface.
func (verify *SegmentSizes) RemoteSegment(ctx context.Context, seg *segmentloop.Segment) error {
return verify.advanceSegment(ctx, seg)
// Fork creates a Partial to process a chunk of all the segments. It is
// called after Start. It is not called concurrently.
func (verify *SegmentSizes) Fork(context.Context) (rangedloop.Partial, error) {
return verify, nil
}
// InlineSegment implements the Observer interface.
func (verify *SegmentSizes) InlineSegment(ctx context.Context, seg *segmentloop.Segment) error {
return verify.advanceSegment(ctx, seg)
// Join is called for each partial returned by Fork.
func (verify *SegmentSizes) Join(context.Context, rangedloop.Partial) error {
return nil
}
func (verify *SegmentSizes) advanceSegment(ctx context.Context, seg *segmentloop.Segment) error {
// Finish is called after all segments are processed by all observers.
func (verify *SegmentSizes) Finish(context.Context) error {
return nil
}
// Process is called repeatedly with batches of segments.
func (verify *SegmentSizes) Process(ctx context.Context, segments []rangedloop.Segment) error {
verify.mu.Lock()
defer verify.mu.Unlock()
for _, segment := range segments {
if err := verify.advanceSegment(ctx, segment); err != nil {
return err
}
}
return nil
}
func (verify *SegmentSizes) advanceSegment(ctx context.Context, seg rangedloop.Segment) error {
if verify.segmentState.StreamID != seg.StreamID {
verify.segmentState = segmentState{
StreamID: seg.StreamID,

View File

@ -5,12 +5,13 @@ package verify
import (
"context"
"time"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/common/errs2"
"storj.io/storj/satellite/metabase/segmentloop"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/metabase/rangedloop"
)
// Error is the default error class for the package.
@ -22,17 +23,18 @@ type Chore struct {
Config Config
DB segmentloop.MetabaseDB
DB *metabase.DB
}
// Config contains configuration for all the services.
type Config struct {
ProgressPrintFrequency int64
Loop segmentloop.Config
Loop rangedloop.Config
}
// New creates new verification.
func New(log *zap.Logger, mdb segmentloop.MetabaseDB, config Config) *Chore {
func New(log *zap.Logger, mdb *metabase.DB, config Config) *Chore {
return &Chore{
Log: log,
Config: config,
@ -42,28 +44,23 @@ func New(log *zap.Logger, mdb segmentloop.MetabaseDB, config Config) *Chore {
// RunOnce creates a new segmentloop and runs the verifications.
func (chore *Chore) RunOnce(ctx context.Context) error {
loop := segmentloop.New(chore.Log, chore.Config.Loop, chore.DB)
plainOffset := &SegmentSizes{
Log: chore.Log.Named("segment-sizes"),
}
progress := &ProgressObserver{
Log: chore.Log.Named("progress"),
ProgressPrintFrequency: chore.Config.ProgressPrintFrequency,
}
var group errs2.Group
group.Go(func() error {
plainOffset := &SegmentSizes{
Log: chore.Log.Named("segment-sizes"),
}
err := loop.Join(ctx, plainOffset)
return Error.Wrap(err)
})
// override parallelism to simulate old segments loop
chore.Config.Loop.Parallelism = 1
provider := rangedloop.NewMetabaseRangeSplitter(chore.DB, 5*time.Second, 2500)
loop := rangedloop.NewService(chore.Log, chore.Config.Loop, provider,
[]rangedloop.Observer{
plainOffset,
progress,
})
group.Go(func() error {
progress := &ProgressObserver{
Log: chore.Log.Named("progress"),
ProgressPrintFrequency: chore.Config.ProgressPrintFrequency,
}
err := loop.Monitor(ctx, progress)
progress.Report()
return Error.Wrap(err)
})
group.Go(func() error {
return Error.Wrap(loop.RunOnce(ctx))
})
return Error.Wrap(errs.Combine(group.Wait()...))
_, err := loop.RunOnce(ctx)
return Error.Wrap(err)
}