From c0e7f463fe90eeef5229de44a0b0303125f62179 Mon Sep 17 00:00:00 2001 From: Michal Niewrzal Date: Wed, 10 May 2023 10:36:52 +0200 Subject: [PATCH] satellite/metabase: remove segmentsloop package Last change to remove segments loop from codebase. https://github.com/storj/storj/issues/5237 Change-Id: I77b12911b6b4e390a7385e6e8057c7587e74b70a --- cmd/tools/metabase-verify/verify/verify.go | 2 +- monkit.lock | 3 - private/testplanet/satellite.go | 8 +- satellite/core.go | 15 +- satellite/gc-bf.go | 5 - satellite/gc/bloomfilter/observer_test.go | 6 +- satellite/metabase/segmentloop/service.go | 542 ------------------ .../metabase/segmentloop/service_test.go | 426 -------------- satellite/metabase/segmentloop/stats.go | 79 --- satellite/metainfo/config.go | 2 - scripts/testdata/satellite-config.yaml.lock | 15 - 11 files changed, 4 insertions(+), 1099 deletions(-) delete mode 100644 satellite/metabase/segmentloop/service.go delete mode 100644 satellite/metabase/segmentloop/service_test.go delete mode 100644 satellite/metabase/segmentloop/stats.go diff --git a/cmd/tools/metabase-verify/verify/verify.go b/cmd/tools/metabase-verify/verify/verify.go index b93e52303..f2caeb8ac 100644 --- a/cmd/tools/metabase-verify/verify/verify.go +++ b/cmd/tools/metabase-verify/verify/verify.go @@ -42,7 +42,7 @@ func New(log *zap.Logger, mdb *metabase.DB, config Config) *Chore { } } -// RunOnce creates a new segmentloop and runs the verifications. +// RunOnce creates a new rangedloop and runs the verifications. func (chore *Chore) RunOnce(ctx context.Context) error { plainOffset := &SegmentSizes{ Log: chore.Log.Named("segment-sizes"), diff --git a/monkit.lock b/monkit.lock index 1ef55c88e..ccfd32e9b 100644 --- a/monkit.lock +++ b/monkit.lock @@ -78,9 +78,6 @@ storj.io/storj/satellite/gracefulexit."graceful_exit_successful_pieces_transfer_ storj.io/storj/satellite/gracefulexit."graceful_exit_transfer_piece_fail" Meter storj.io/storj/satellite/gracefulexit."graceful_exit_transfer_piece_success" Meter storj.io/storj/satellite/metabase/rangedloop."rangedloop_error" Event -storj.io/storj/satellite/metabase/segmentloop."segmentloop_error" Event -storj.io/storj/satellite/metabase/segmentloop."segmentsProcessed" IntVal -storj.io/storj/satellite/metabase/segmentloop.*Service.RunOnce Task storj.io/storj/satellite/metainfo."metainfo_rate_limit_exceeded" Event storj.io/storj/satellite/metainfo/piecedeletion."delete_batch_size" IntVal storj.io/storj/satellite/metainfo/piecedeletion."deletion_pieces_unhandled_count" IntVal diff --git a/private/testplanet/satellite.go b/private/testplanet/satellite.go index f3a7898e6..6dded8236 100644 --- a/private/testplanet/satellite.go +++ b/private/testplanet/satellite.go @@ -45,7 +45,6 @@ import ( "storj.io/storj/satellite/inspector" "storj.io/storj/satellite/mailservice" "storj.io/storj/satellite/metabase" - "storj.io/storj/satellite/metabase/segmentloop" "storj.io/storj/satellite/metabase/zombiedeletion" "storj.io/storj/satellite/metainfo" "storj.io/storj/satellite/metainfo/expireddeletion" @@ -107,8 +106,6 @@ type Satellite struct { // TODO remove when uplink will be adjusted to use Metabase.DB Metabase *metabase.DB Endpoint *metainfo.Endpoint - // TODO remove when uplink will be adjusted to use Metabase.SegmentLoop - SegmentLoop *segmentloop.Service } Userinfo struct { @@ -116,8 +113,7 @@ type Satellite struct { } Metabase struct { - DB *metabase.DB - SegmentLoop *segmentloop.Service + DB *metabase.DB } Inspector struct { @@ -452,7 +448,6 @@ func (planet *Planet) newSatellite(ctx 
context.Context, prefix string, index int config.Compensation.DisposePercent = 0 config.ProjectLimit.CacheCapacity = 0 config.ProjectLimit.CacheExpiration = 0 - config.Metainfo.SegmentLoop.ListLimit = 0 // Actual testplanet-specific configuration config.Server.Address = planet.NewListenAddress() @@ -619,7 +614,6 @@ func createNewSystem(name string, log *zap.Logger, config satellite.Config, peer system.Userinfo.Endpoint = api.Userinfo.Endpoint system.Metabase.DB = api.Metainfo.Metabase - system.Metabase.SegmentLoop = peer.Metainfo.SegmentLoop system.Inspector.Endpoint = api.Inspector.Endpoint diff --git a/satellite/core.go b/satellite/core.go index 0b1bfa83c..b50d54738 100644 --- a/satellite/core.go +++ b/satellite/core.go @@ -36,7 +36,6 @@ import ( "storj.io/storj/satellite/console/emailreminders" "storj.io/storj/satellite/mailservice" "storj.io/storj/satellite/metabase" - "storj.io/storj/satellite/metabase/segmentloop" "storj.io/storj/satellite/metabase/zombiedeletion" "storj.io/storj/satellite/metainfo/expireddeletion" "storj.io/storj/satellite/nodeevents" @@ -95,8 +94,7 @@ type Core struct { } Metainfo struct { - Metabase *metabase.DB - SegmentLoop *segmentloop.Service + Metabase *metabase.DB } Reputation struct { @@ -304,17 +302,6 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, { // setup metainfo peer.Metainfo.Metabase = metabaseDB - - peer.Metainfo.SegmentLoop = segmentloop.New( - peer.Log.Named("metainfo:segmentloop"), - config.Metainfo.SegmentLoop, - peer.Metainfo.Metabase, - ) - peer.Services.Add(lifecycle.Item{ - Name: "metainfo:segmentloop", - Run: peer.Metainfo.SegmentLoop.Run, - Close: peer.Metainfo.SegmentLoop.Close, - }) } { // setup reputation diff --git a/satellite/gc-bf.go b/satellite/gc-bf.go index e2c286d87..114e60abe 100644 --- a/satellite/gc-bf.go +++ b/satellite/gc-bf.go @@ -21,7 +21,6 @@ import ( "storj.io/storj/satellite/gc/bloomfilter" "storj.io/storj/satellite/metabase" "storj.io/storj/satellite/metabase/rangedloop" - "storj.io/storj/satellite/metabase/segmentloop" "storj.io/storj/satellite/overlay" ) @@ -44,10 +43,6 @@ type GarbageCollectionBF struct { DB overlay.DB } - Metainfo struct { - SegmentLoop *segmentloop.Service - } - GarbageCollection struct { Config bloomfilter.Config } diff --git a/satellite/gc/bloomfilter/observer_test.go b/satellite/gc/bloomfilter/observer_test.go index 3ed766e66..fb0558689 100644 --- a/satellite/gc/bloomfilter/observer_test.go +++ b/satellite/gc/bloomfilter/observer_test.go @@ -35,10 +35,7 @@ func TestObserverGarbageCollectionBloomFilters(t *testing.T) { StorageNodeCount: 7, UplinkCount: 1, Reconfigure: testplanet.Reconfigure{ - Satellite: func(log *zap.Logger, index int, config *satellite.Config) { - config.Metainfo.SegmentLoop.AsOfSystemInterval = 1 - testplanet.ReconfigureRS(2, 2, 7, 7)(log, index, config) - }, + Satellite: testplanet.ReconfigureRS(2, 2, 7, 7), }, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "testbucket", "object", testrand.Bytes(10*memory.KiB)) @@ -172,7 +169,6 @@ func TestObserverGarbageCollectionBloomFilters_AllowNotEmptyBucket(t *testing.T) UplinkCount: 1, Reconfigure: testplanet.Reconfigure{ Satellite: func(log *zap.Logger, index int, config *satellite.Config) { - config.Metainfo.SegmentLoop.AsOfSystemInterval = 1 testplanet.ReconfigureRS(2, 2, 4, 4)(log, index, config) }, }, diff --git a/satellite/metabase/segmentloop/service.go b/satellite/metabase/segmentloop/service.go deleted file mode 100644 
index 4412e6303..000000000 --- a/satellite/metabase/segmentloop/service.go +++ /dev/null @@ -1,542 +0,0 @@ -// Copyright (C) 2021 Storj Labs, Inc. -// See LICENSE for copying information. - -package segmentloop - -import ( - "context" - "errors" - "fmt" - "time" - - "github.com/spacemonkeygo/monkit/v3" - "github.com/zeebo/errs" - "go.uber.org/zap" - "golang.org/x/time/rate" - - "storj.io/common/errs2" - "storj.io/common/sync2" - "storj.io/storj/satellite/metabase" -) - -const batchsizeLimit = 5000 - -var ( - mon = monkit.Package() - - // Error is a standard error class for this component. - Error = errs.Class("segments loop") - // ErrClosed is a loop closed error. - ErrClosed = Error.New("loop closed") -) - -// Segment contains information about segment metadata which will be received by observers. -type Segment metabase.LoopSegmentEntry - -// Inline returns true if segment is inline. -func (s Segment) Inline() bool { - return s.Redundancy.IsZero() && len(s.Pieces) == 0 -} - -// Expired checks if segment expired relative to now. -func (s *Segment) Expired(now time.Time) bool { - return s.ExpiresAt != nil && s.ExpiresAt.Before(now) -} - -// PieceSize returns calculated piece size for segment. -func (s Segment) PieceSize() int64 { - return s.Redundancy.PieceSize(int64(s.EncryptedSize)) -} - -// Observer is an interface defining an observer that can subscribe to the segments loop. -// -// architecture: Observer -type Observer interface { - LoopStarted(context.Context, LoopInfo) error - RemoteSegment(context.Context, *Segment) error - InlineSegment(context.Context, *Segment) error -} - -// LoopInfo contains information about the current loop. -type LoopInfo struct { - Started time.Time -} - -// NullObserver is an observer that does nothing. This is useful for joining -// and ensuring the segments loop runs once before you use a real observer. -type NullObserver struct{} - -// LoopStarted is called at each loop start. -func (NullObserver) LoopStarted(context.Context, LoopInfo) error { - return nil -} - -// RemoteSegment implements the Observer interface. -func (NullObserver) RemoteSegment(context.Context, *Segment) error { - return nil -} - -// InlineSegment implements the Observer interface. 
-func (NullObserver) InlineSegment(context.Context, *Segment) error { - return nil -} - -type observerContext struct { - immediate bool - trigger bool - observer Observer - - ctx context.Context - done chan error - - remote *monkit.DurationDist - inline *monkit.DurationDist -} - -func newObserverContext(ctx context.Context, obs Observer) *observerContext { - name := fmt.Sprintf("%T", obs) - key := monkit.NewSeriesKey("observer").WithTag("name", name) - - return &observerContext{ - observer: obs, - - ctx: ctx, - done: make(chan error), - - inline: monkit.NewDurationDist(key.WithTag("pointer_type", "inline")), - remote: monkit.NewDurationDist(key.WithTag("pointer_type", "remote")), - } -} - -func (observer *observerContext) RemoteSegment(ctx context.Context, segment *Segment) error { - start := time.Now() - defer func() { observer.remote.Insert(time.Since(start)) }() - - return observer.observer.RemoteSegment(ctx, segment) -} - -func (observer *observerContext) InlineSegment(ctx context.Context, segment *Segment) error { - start := time.Now() - defer func() { observer.inline.Insert(time.Since(start)) }() - - return observer.observer.InlineSegment(ctx, segment) -} - -func (observer *observerContext) HandleError(err error) bool { - if err != nil { - observer.done <- err - observer.Finish() - return true - } - return false -} - -func (observer *observerContext) Finish() { - close(observer.done) - - name := fmt.Sprintf("%T", observer.observer) - stats := allObserverStatsCollectors.GetStats(name) - stats.Observe(observer) -} - -func (observer *observerContext) Wait() error { - return <-observer.done -} - -// Config contains configurable values for the segments loop. -type Config struct { - CoalesceDuration time.Duration `help:"how long to wait for new observers before starting iteration" releaseDefault:"5s" devDefault:"5s" testDefault:"1s"` - RateLimit float64 `help:"rate limit (default is 0 which is unlimited segments per second)" default:"0"` - ListLimit int `help:"how many items to query in a batch" default:"2500"` - - AsOfSystemInterval time.Duration `help:"as of system interval" releaseDefault:"-5m" devDefault:"-1us" testDefault:"-1us"` - - SuspiciousProcessedRatio float64 `help:"ratio where to consider processed count as supicious" default:"0.03"` -} - -// MetabaseDB contains iterators for the metabase data. -type MetabaseDB interface { - // Now returns the time on the database. - Now(ctx context.Context) (time.Time, error) - // IterateLoopSegments iterates through all streams passed in as arguments. - IterateLoopSegments(ctx context.Context, opts metabase.IterateLoopSegments, fn func(context.Context, metabase.LoopSegmentsIterator) error) (err error) - - // GetTableStats gathers statistics about the tables. - GetTableStats(context.Context, metabase.GetTableStats) (metabase.TableStats, error) -} - -// Service is a segments loop service. -// -// architecture: Service -type Service struct { - log *zap.Logger - config Config - metabaseDB MetabaseDB - join chan *observerContext - done chan struct{} -} - -// New creates a new segments loop service. -func New(log *zap.Logger, config Config, metabaseDB MetabaseDB) *Service { - return &Service{ - log: log, - metabaseDB: metabaseDB, - config: config, - join: make(chan *observerContext), - done: make(chan struct{}), - } -} - -// Join will join the looper for one full cycle until completion and then returns. -// Joining will trigger a new iteration after coalesce duration. -// On ctx cancel the observer will return without completely finishing. 
-// Only on full complete iteration it will return nil. -// Safe to be called concurrently. -func (loop *Service) Join(ctx context.Context, observer Observer) (err error) { - return loop.joinObserver(ctx, true, observer) -} - -// Monitor will join the looper for one full cycle until completion and then returns. -// Joining with monitoring won't trigger after coalesce duration. -// On ctx cancel the observer will return without completely finishing. -// Only on full complete iteration it will return nil. -// Safe to be called concurrently. -func (loop *Service) Monitor(ctx context.Context, observer Observer) (err error) { - return loop.joinObserver(ctx, false, observer) -} - -// joinObserver will join the looper for one full cycle until completion and then returns. -// On ctx cancel the observer will return without completely finishing. -// Only on full complete iteration it will return nil. -// Safe to be called concurrently. -func (loop *Service) joinObserver(ctx context.Context, trigger bool, obs Observer) (err error) { - defer mon.Task()(&ctx)(&err) - - obsctx := newObserverContext(ctx, obs) - obsctx.immediate = sync2.IsManuallyTriggeredCycle(ctx) - obsctx.trigger = trigger || obsctx.immediate - - select { - case loop.join <- obsctx: - case <-ctx.Done(): - return ctx.Err() - case <-loop.done: - return ErrClosed - } - - return obsctx.Wait() -} - -// Run starts the looping service. -// It can only be called once, otherwise a panic will occur. -func (loop *Service) Run(ctx context.Context) (err error) { - defer mon.Task()(&ctx)(&err) - - for { - err := loop.RunOnce(ctx) - if err != nil { - loop.log.Error("segment loop failure", zap.Error(err)) - - if errs2.IsCanceled(err) { - return err - } - if ctx.Err() != nil { - return errs.Combine(err, ctx.Err()) - } - - mon.Event("segmentloop_error") //mon:locked - } - } -} - -// Close closes the looping services. -func (loop *Service) Close() (err error) { - close(loop.done) - return nil -} - -// RunOnce goes through segments one time and sends information to observers. -// -// It is not safe to call this concurrently with Run. -func (loop *Service) RunOnce(ctx context.Context) (err error) { - defer mon.Task()(&ctx)(&err) //mon:locked - - coalesceTimer := time.NewTimer(loop.config.CoalesceDuration) - defer coalesceTimer.Stop() - stopTimer(coalesceTimer) - - earlyExit := make(chan *observerContext) - earlyExitDone := make(chan struct{}) - monitorEarlyExit := func(obs *observerContext) { - select { - case <-obs.ctx.Done(): - select { - case <-earlyExitDone: - case earlyExit <- obs: - } - case <-earlyExitDone: - } - } - - timerStarted := false - observers := []*observerContext{} - -waitformore: - for { - select { - // when the coalesce timer hits, we have waited enough for observers to join. - case <-coalesceTimer.C: - break waitformore - - // wait for a new observer to join. - case obsctx := <-loop.join: - // when the observer triggers the loop and it's the first one, - // then start the coalescing timer. - if obsctx.trigger { - if !timerStarted { - coalesceTimer.Reset(loop.config.CoalesceDuration) - timerStarted = true - } - } - - observers = append(observers, obsctx) - go monitorEarlyExit(obsctx) - - if obsctx.immediate { - break waitformore - } - - // remove an observer from waiting when it's canceled before the loop starts. - case obsctx := <-earlyExit: - for i, obs := range observers { - if obs == obsctx { - observers = append(observers[:i], observers[i+1:]...) 
- break - } - } - - obsctx.HandleError(obsctx.ctx.Err()) - - // reevalute, whether we acually need to start the loop. - timerShouldRun := false - for _, obs := range observers { - timerShouldRun = timerShouldRun || obs.trigger - } - - if !timerShouldRun && timerStarted { - stopTimer(coalesceTimer) - } - - // when ctx done happens we can finish all the waiting observers. - case <-ctx.Done(): - close(earlyExitDone) - errorObservers(observers, ctx.Err()) - return ctx.Err() - } - } - close(earlyExitDone) - - return loop.iterateDatabase(ctx, observers) -} - -func stopTimer(t *time.Timer) { - t.Stop() - // drain if it contains something - select { - case <-t.C: - default: - } -} - -// Wait waits for run to be finished. -// Safe to be called concurrently. -func (loop *Service) Wait() { - <-loop.done -} - -var errNoObservers = errs.New("no observers") - -func (loop *Service) iterateDatabase(ctx context.Context, observers []*observerContext) (err error) { - defer mon.Task()(&ctx)(&err) - - defer func() { - if err != nil { - errorObservers(observers, err) - return - } - finishObservers(observers) - }() - - before, err := loop.metabaseDB.GetTableStats(ctx, metabase.GetTableStats{ - AsOfSystemInterval: loop.config.AsOfSystemInterval, - }) - if err != nil { - return Error.Wrap(err) - } - - var processed processedStats - processed, observers, err = loop.iterateSegments(ctx, observers) - if errors.Is(err, errNoObservers) { - return nil - } - if err != nil { - return Error.Wrap(err) - } - - after, err := loop.metabaseDB.GetTableStats(ctx, metabase.GetTableStats{ - AsOfSystemInterval: loop.config.AsOfSystemInterval, - }) - if err != nil { - return Error.Wrap(err) - } - - if err := loop.verifyCount(before.SegmentCount, after.SegmentCount, processed.segments); err != nil { - return Error.Wrap(err) - } - - return err -} - -func (loop *Service) verifyCount(before, after, processed int64) error { - low, high := before, after - if low > high { - low, high = high, low - } - - var deltaFromBounds int64 - var ratio float64 - if processed < low { - deltaFromBounds = low - processed - // +1 to avoid division by zero - ratio = float64(deltaFromBounds) / float64(low+1) - } else if processed > high { - deltaFromBounds = processed - high - // +1 to avoid division by zero - ratio = float64(deltaFromBounds) / float64(high+1) - } - - mon.IntVal("segmentloop_verify_before").Observe(before) - mon.IntVal("segmentloop_verify_after").Observe(after) - mon.IntVal("segmentloop_verify_processed").Observe(processed) - mon.IntVal("segmentloop_verify_outside").Observe(deltaFromBounds) - mon.FloatVal("segmentloop_verify_outside_ratio").Observe(ratio) - - // If we have very few items from the bounds, then it's expected and the ratio does not capture it well. 
- const minimumDeltaThreshold = 100 - if deltaFromBounds < minimumDeltaThreshold { - return nil - } - - if ratio > loop.config.SuspiciousProcessedRatio { - return Error.New("processed count looks suspicious: before:%v after:%v processed:%v ratio:%v threshold:%v", before, after, processed, ratio, loop.config.SuspiciousProcessedRatio) - } - - return nil -} - -type processedStats struct { - segments int64 -} - -func (loop *Service) iterateSegments(ctx context.Context, observers []*observerContext) (processed processedStats, _ []*observerContext, err error) { - defer mon.Task()(&ctx)(&err) - - rateLimiter := rate.NewLimiter(rate.Limit(loop.config.RateLimit), 1) - - limit := loop.config.ListLimit - if limit <= 0 || limit > batchsizeLimit { - limit = batchsizeLimit - } - - startingTime, err := loop.metabaseDB.Now(ctx) - if err != nil { - return processed, observers, Error.Wrap(err) - } - - observers = withObservers(ctx, observers, func(ctx context.Context, observer *observerContext) bool { - err := observer.observer.LoopStarted(ctx, LoopInfo{Started: startingTime}) - return !observer.HandleError(err) - }) - - if len(observers) == 0 { - return processed, observers, errNoObservers - } - - err = loop.metabaseDB.IterateLoopSegments(ctx, metabase.IterateLoopSegments{ - BatchSize: limit, - AsOfSystemTime: startingTime, - AsOfSystemInterval: loop.config.AsOfSystemInterval, - }, func(ctx context.Context, iterator metabase.LoopSegmentsIterator) error { - defer mon.TaskNamed("iterateLoopSegmentsCB")(&ctx)(&err) - - var entry metabase.LoopSegmentEntry - for iterator.Next(ctx, &entry) { - if err := ctx.Err(); err != nil { - return err - } - - if loop.config.RateLimit > 0 { - timer := mon.Timer("iterateLoopSegmentsRateLimit").Start() - if err := rateLimiter.Wait(ctx); err != nil { - // We don't really execute concurrent batches so we should never - // exceed the burst size of 1 and this should never happen. - // We can also enter here if the context is cancelled. 
- timer.Stop() - return err - } - timer.Stop() - } - - observers = withObservers(ctx, observers, func(ctx context.Context, observer *observerContext) bool { - segment := Segment(entry) - return !observer.HandleError(handleSegment(ctx, observer, &segment)) - }) - if len(observers) == 0 { - return errNoObservers - } - - processed.segments++ - mon.IntVal("segmentsProcessed").Observe(processed.segments) //mon:locked - } - return nil - }) - - return processed, observers, err -} - -func withObservers(ctx context.Context, observers []*observerContext, handleObserver func(ctx context.Context, observer *observerContext) bool) []*observerContext { - nextObservers := observers[:0] - for _, observer := range observers { - keepObserver := handleObserver(ctx, observer) - if keepObserver { - nextObservers = append(nextObservers, observer) - } - } - return nextObservers -} - -func handleSegment(ctx context.Context, observer *observerContext, segment *Segment) (err error) { - if segment.Inline() { - if err := observer.InlineSegment(ctx, segment); err != nil { - return err - } - } else { - if err := observer.RemoteSegment(ctx, segment); err != nil { - return err - } - } - - return observer.ctx.Err() -} - -func finishObservers(observers []*observerContext) { - for _, observer := range observers { - observer.Finish() - } -} - -func errorObservers(observers []*observerContext, err error) { - for _, observer := range observers { - observer.HandleError(err) - } -} diff --git a/satellite/metabase/segmentloop/service_test.go b/satellite/metabase/segmentloop/service_test.go deleted file mode 100644 index bd0d67b2a..000000000 --- a/satellite/metabase/segmentloop/service_test.go +++ /dev/null @@ -1,426 +0,0 @@ -// Copyright (C) 2021 Storj Labs, Inc. -// See LICENSE for copying information. - -package segmentloop_test - -import ( - "context" - "errors" - "strconv" - "strings" - "sync/atomic" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "go.uber.org/zap" - "go.uber.org/zap/zaptest" - "golang.org/x/sync/errgroup" - - "storj.io/common/errs2" - "storj.io/common/memory" - "storj.io/common/testcontext" - "storj.io/common/testrand" - "storj.io/common/uuid" - "storj.io/storj/private/testplanet" - "storj.io/storj/satellite" - "storj.io/storj/satellite/metabase" - "storj.io/storj/satellite/metabase/segmentloop" -) - -// TestSegmentsLoop does the following -// * upload 5 remote files with 1 segment -// * upload 2 remote files with 2 segments -// * upload 2 inline files -// * connect two observers to the segments loop -// * run the segments loop. 
-func TestSegmentsLoop(t *testing.T) { - segmentSize := 50 * memory.KiB - - testplanet.Run(t, testplanet.Config{ - SatelliteCount: 1, - StorageNodeCount: 4, - UplinkCount: 1, - Reconfigure: testplanet.Reconfigure{ - Satellite: func(log *zap.Logger, index int, config *satellite.Config) { - config.Metainfo.SegmentLoop.CoalesceDuration = 1 * time.Second - config.Metainfo.MaxSegmentSize = segmentSize - }, - }, - }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { - ul := planet.Uplinks[0] - satellite := planet.Satellites[0] - segmentLoop := satellite.Metabase.SegmentLoop - - // upload 5 remote objects with 1 segment - for i := 0; i < 5; i++ { - testData := testrand.Bytes(8 * memory.KiB) - path := "/some/remote/path/" + strconv.Itoa(i) - err := ul.Upload(ctx, satellite, "bucket", path, testData) - require.NoError(t, err) - } - - // upload 2 remote objects with 2 segment each - for i := 0; i < 2; i++ { - // exact 2*segmentSize will make inline segment at the end of object - testData := testrand.Bytes(2*segmentSize - 1000) - path := "/some/other/remote/path/" + strconv.Itoa(i) - err := ul.Upload(ctx, satellite, "bucket", path, testData) - require.NoError(t, err) - } - - // upload 2 inline files - for i := 0; i < 2; i++ { - testData := testrand.Bytes(1 * memory.KiB) - path := "/some/inline/path/" + strconv.Itoa(i) - err := ul.Upload(ctx, satellite, "bucket", path, testData) - require.NoError(t, err) - } - - // create 2 observers - obs1 := newTestObserver(nil) - obs2 := newTestObserver(nil) - - var group errgroup.Group - group.Go(func() error { - return segmentLoop.Join(ctx, obs1) - }) - group.Go(func() error { - return segmentLoop.Join(ctx, obs2) - }) - - err := group.Wait() - require.NoError(t, err) - - for _, obs := range []*testObserver{obs1, obs2} { - assert.EqualValues(t, 9, obs.remoteSegCount) - assert.EqualValues(t, 2, obs.inlineSegCount) - assert.EqualValues(t, 11, len(obs.uniqueKeys)) - } - }) -} - -func TestSegmentsLoop_AllData(t *testing.T) { - segmentSize := 8 * memory.KiB - testplanet.Run(t, testplanet.Config{ - SatelliteCount: 1, - StorageNodeCount: 4, - UplinkCount: 3, - Reconfigure: testplanet.Reconfigure{ - Satellite: func(log *zap.Logger, index int, config *satellite.Config) { - config.Metainfo.SegmentLoop.CoalesceDuration = 1 * time.Second - config.Metainfo.SegmentLoop.ListLimit = 2 - }, - }, - }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { - bucketNames := strings.Split("abc", "") - - data := testrand.Bytes(segmentSize) - for _, up := range planet.Uplinks { - for _, bucketName := range bucketNames { - err := up.Upload(ctx, planet.Satellites[0], "zzz"+bucketName, "1", data) - require.NoError(t, err) - } - } - - loop := planet.Satellites[0].Metabase.SegmentLoop - - obs := newTestObserver(nil) - err := loop.Join(ctx, obs) - require.NoError(t, err) - - gotItems := len(obs.uniqueKeys) - require.Equal(t, len(bucketNames)*len(planet.Uplinks), gotItems) - }) -} - -// TestsegmentsLoopObserverCancel does the following: -// * upload 3 remote segments -// * hook three observers up to segments loop -// * let observer 1 run normally -// * let observer 2 return an error from one of its handlers -// * let observer 3's context be canceled -// * expect observer 1 to see all segments -// * expect observers 2 and 3 to finish with errors. 
-func TestSegmentsLoopObserverCancel(t *testing.T) { - segmentSize := 8 * memory.KiB - - testplanet.Run(t, testplanet.Config{ - SatelliteCount: 1, - StorageNodeCount: 4, - UplinkCount: 1, - Reconfigure: testplanet.Reconfigure{ - Satellite: func(log *zap.Logger, index int, config *satellite.Config) { - config.Metainfo.SegmentLoop.CoalesceDuration = 1 * time.Second - }, - }, - }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { - ul := planet.Uplinks[0] - satellite := planet.Satellites[0] - loop := satellite.Metabase.SegmentLoop - - // upload 3 remote files with 1 segment - for i := 0; i < 3; i++ { - testData := testrand.Bytes(segmentSize) - path := "/some/remote/path/" + strconv.Itoa(i) - err := ul.Upload(ctx, satellite, "bucket", path, testData) - require.NoError(t, err) - } - - // create 1 "good" observer - obs1 := newTestObserver(nil) - mon1 := newTestObserver(nil) - - // create observer that will return an error from RemoteSegment - obs2 := newTestObserver(func(ctx context.Context) error { - return errors.New("test error") - }) - - // create observer that will cancel its own context from RemoteSegment - obs3Ctx, cancel := context.WithCancel(ctx) - var once int64 - obs3 := newTestObserver(func(ctx context.Context) error { - if atomic.AddInt64(&once, 1) == 1 { - cancel() - <-obs3Ctx.Done() // ensure we wait for cancellation to propagate - } else { - panic("multiple calls to observer after loop cancel") - } - return nil - }) - - var group errgroup.Group - group.Go(func() error { - return loop.Join(ctx, obs1) - }) - group.Go(func() error { - return loop.Monitor(ctx, mon1) - }) - group.Go(func() error { - err := loop.Join(ctx, obs2) - if err == nil { - return errors.New("got no error") - } - if !strings.Contains(err.Error(), "test error") { - return errors.New("expected to find error") - } - return nil - }) - group.Go(func() error { - err := loop.Join(obs3Ctx, obs3) - if !errs2.IsCanceled(err) { - return errors.New("expected canceled") - } - return nil - }) - - err := group.Wait() - require.NoError(t, err) - - // expect that obs1 saw all three segments, but obs2 and obs3 only saw the first one - assert.EqualValues(t, 3, obs1.remoteSegCount) - assert.EqualValues(t, 3, mon1.remoteSegCount) - assert.EqualValues(t, 1, obs2.remoteSegCount) - assert.EqualValues(t, 1, obs3.remoteSegCount) - }) -} - -// TestSegmentsLoopCancel does the following: -// * upload 3 remote segments -// * hook two observers up to segments loop -// * cancel loop context partway through -// * expect both observers to exit with an error and see fewer than 3 remote segments -// * expect that a new observer attempting to join at this point receives a loop closed error. 
-func TestSegmentsLoopCancel(t *testing.T) { - segmentSize := 8 * memory.KiB - - testplanet.Run(t, testplanet.Config{ - SatelliteCount: 1, - StorageNodeCount: 4, - UplinkCount: 1, - }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { - ul := planet.Uplinks[0] - satellite := planet.Satellites[0] - - // upload 3 remote files with 1 segment - for i := 0; i < 3; i++ { - testData := testrand.Bytes(segmentSize) - path := "/some/remote/path/" + strconv.Itoa(i) - err := ul.Upload(ctx, satellite, "bucket", path, testData) - require.NoError(t, err) - } - - loop := segmentloop.New(zaptest.NewLogger(t), segmentloop.Config{ - CoalesceDuration: 1 * time.Second, - ListLimit: 10000, - }, satellite.Metabase.DB) - - // create a cancelable context to pass into metaLoop.Run - loopCtx, cancel := context.WithCancel(ctx) - - // create 1 normal observer - obs1 := newTestObserver(nil) - - var once int64 - // create another normal observer that will wait before returning during RemoteSegment so we can sync with context cancelation - obs2 := newTestObserver(func(ctx context.Context) error { - // cancel context during call to obs2.RemoteSegment inside loop - if atomic.AddInt64(&once, 1) == 1 { - cancel() - <-ctx.Done() // ensure we wait for cancellation to propagate - } else { - panic("multiple calls to observer after loop cancel") - } - return nil - }) - - var group errgroup.Group - - // start loop with cancelable context - group.Go(func() error { - err := loop.Run(loopCtx) - if !errs2.IsCanceled(err) { - return errors.New("expected context canceled") - } - return nil - }) - group.Go(func() error { - err := loop.Join(ctx, obs1) - if !errs2.IsCanceled(err) { - return errors.New("expected context canceled") - } - return nil - }) - group.Go(func() error { - err := loop.Join(ctx, obs2) - if !errs2.IsCanceled(err) { - return errors.New("expected context canceled") - } - return nil - }) - - err := group.Wait() - require.NoError(t, err) - - err = loop.Close() - require.NoError(t, err) - - obs3 := newTestObserver(nil) - err = loop.Join(ctx, obs3) - require.Error(t, err) - assert.Contains(t, err.Error(), "loop closed") - - // expect that obs1 and obs2 each saw fewer than three remote segments - assert.True(t, obs1.remoteSegCount < 3) - assert.True(t, obs2.remoteSegCount < 3) - }) -} - -func TestSegmentsLoop_MonitorCancel(t *testing.T) { - testplanet.Run(t, testplanet.Config{ - SatelliteCount: 1, - }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { - satellite := planet.Satellites[0] - - loop := segmentloop.New(zaptest.NewLogger(t), segmentloop.Config{ - CoalesceDuration: time.Nanosecond, - ListLimit: 10000, - }, satellite.Metabase.DB) - - obs1 := newTestObserver(func(ctx context.Context) error { - return errors.New("test error") - }) - - var group errgroup.Group - - loopCtx, loopCancel := context.WithCancel(ctx) - group.Go(func() error { - err := loop.Run(loopCtx) - t.Log("segments loop stopped") - if !errs2.IsCanceled(err) { - return errors.New("expected context canceled") - } - return nil - }) - - obsCtx, obsCancel := context.WithCancel(ctx) - group.Go(func() error { - defer loopCancel() - err := loop.Monitor(obsCtx, obs1) - t.Log("observer stopped") - if !errs2.IsCanceled(err) { - return errors.New("expected context canceled") - } - return nil - }) - - obsCancel() - - err := group.Wait() - require.NoError(t, err) - - err = loop.Close() - require.NoError(t, err) - }) -} - -type testKey struct { - StreamID uuid.UUID - Position metabase.SegmentPosition -} - -type 
testObserver struct { - remoteSegCount int - inlineSegCount int - uniqueKeys map[testKey]struct{} - onSegment func(context.Context) error // if set, run this during RemoteSegment() -} - -func newTestObserver(onSegment func(context.Context) error) *testObserver { - return &testObserver{ - remoteSegCount: 0, - inlineSegCount: 0, - uniqueKeys: make(map[testKey]struct{}), - onSegment: onSegment, - } -} - -// LoopStarted is called at each start of a loop. -func (obs *testObserver) LoopStarted(ctx context.Context, info segmentloop.LoopInfo) (err error) { - return nil -} - -func (obs *testObserver) RemoteSegment(ctx context.Context, segment *segmentloop.Segment) error { - obs.remoteSegCount++ - - key := testKey{ - StreamID: segment.StreamID, - Position: segment.Position, - } - if _, ok := obs.uniqueKeys[key]; ok { - // TODO: collect the errors and check in test - panic("Expected unique pair StreamID/Position in observer.RemoteSegment") - } - obs.uniqueKeys[key] = struct{}{} - - if obs.onSegment != nil { - return obs.onSegment(ctx) - } - - return nil -} - -func (obs *testObserver) InlineSegment(ctx context.Context, segment *segmentloop.Segment) error { - obs.inlineSegCount++ - key := testKey{ - StreamID: segment.StreamID, - Position: segment.Position, - } - if _, ok := obs.uniqueKeys[key]; ok { - // TODO: collect the errors and check in test - panic("Expected unique pair StreamID/Position in observer.InlineSegment") - } - obs.uniqueKeys[key] = struct{}{} - return nil -} diff --git a/satellite/metabase/segmentloop/stats.go b/satellite/metabase/segmentloop/stats.go deleted file mode 100644 index 827325be9..000000000 --- a/satellite/metabase/segmentloop/stats.go +++ /dev/null @@ -1,79 +0,0 @@ -// Copyright (C) 2021 Storj Labs, Inc. -// See LICENSE for copying information. - -package segmentloop - -import ( - "sync" - "time" - - "github.com/spacemonkeygo/monkit/v3" -) - -var allObserverStatsCollectors = newObserverStatsCollectors() - -type observerStatsCollectors struct { - mu sync.Mutex - observer map[string]*observerStats -} - -func newObserverStatsCollectors() *observerStatsCollectors { - return &observerStatsCollectors{ - observer: make(map[string]*observerStats), - } -} - -func (list *observerStatsCollectors) GetStats(name string) *observerStats { - list.mu.Lock() - defer list.mu.Unlock() - - stats, ok := list.observer[name] - if !ok { - stats = newObserverStats(name) - mon.Chain(stats) - list.observer[name] = stats - } - return stats -} - -// observerStats tracks the most recent observer stats. 
-type observerStats struct { - mu sync.Mutex - - key monkit.SeriesKey - total time.Duration - inline *monkit.DurationDist - remote *monkit.DurationDist -} - -func newObserverStats(name string) *observerStats { - return &observerStats{ - key: monkit.NewSeriesKey("segment-observer").WithTag("name", name), - total: 0, - inline: nil, - remote: nil, - } -} - -func (stats *observerStats) Observe(observer *observerContext) { - stats.mu.Lock() - defer stats.mu.Unlock() - - stats.total = observer.inline.Sum + observer.remote.Sum - stats.inline = observer.inline - stats.remote = observer.remote -} - -func (stats *observerStats) Stats(cb func(key monkit.SeriesKey, field string, val float64)) { - stats.mu.Lock() - defer stats.mu.Unlock() - - cb(stats.key, "sum", stats.total.Seconds()) - - if stats.inline != nil { - stats.inline.Stats(cb) - } - if stats.remote != nil { - stats.remote.Stats(cb) - } -} diff --git a/satellite/metainfo/config.go b/satellite/metainfo/config.go index 3ce7c0d28..1f1b3b4ad 100644 --- a/satellite/metainfo/config.go +++ b/satellite/metainfo/config.go @@ -13,7 +13,6 @@ import ( "storj.io/common/memory" "storj.io/storj/satellite/metabase" - "storj.io/storj/satellite/metabase/segmentloop" "storj.io/storj/satellite/metainfo/piecedeletion" "storj.io/uplink/private/eestream" ) @@ -140,7 +139,6 @@ type Config struct { MaxNumberOfParts int `default:"10000" help:"maximum number of parts object can contain"` Overlay bool `default:"true" help:"toggle flag if overlay is enabled"` RS RSConfig `releaseDefault:"29/35/80/110-256B" devDefault:"4/6/8/10-256B" help:"redundancy scheme configuration in the format k/m/o/n-sharesize"` - SegmentLoop segmentloop.Config `help:"segment loop configuration"` RateLimiter RateLimiterConfig `help:"rate limiter configuration"` UploadLimiter UploadLimiterConfig `help:"object upload limiter configuration"` ProjectLimits ProjectLimitConfig `help:"project limit configuration"` diff --git a/scripts/testdata/satellite-config.yaml.lock b/scripts/testdata/satellite-config.yaml.lock index 54ac75a27..48c641fb5 100755 --- a/scripts/testdata/satellite-config.yaml.lock +++ b/scripts/testdata/satellite-config.yaml.lock @@ -670,21 +670,6 @@ identity.key-path: /root/.local/share/storj/identity/satellite/identity.key # redundancy scheme configuration in the format k/m/o/n-sharesize # metainfo.rs: 29/35/80/110-256 B -# as of system interval -# metainfo.segment-loop.as-of-system-interval: -5m0s - -# how long to wait for new observers before starting iteration -# metainfo.segment-loop.coalesce-duration: 5s - -# how many items to query in a batch -# metainfo.segment-loop.list-limit: 2500 - -# rate limit (default is 0 which is unlimited segments per second) -# metainfo.segment-loop.rate-limit: 0 - -# ratio where to consider processed count as supicious -# metainfo.segment-loop.suspicious-processed-ratio: 0.03 - # enable code for server-side copy, deprecated. please leave this to true. # metainfo.server-side-copy: true
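Note for reviewers: components that still need to walk all segments now do so through the ranged loop (satellite/metabase/rangedloop), which gc-bf.go above already wires in. The sketch below shows roughly what a counting observer for that loop looks like; it is illustrative only — countObserver and countPartial are made-up names, the Start/Fork/Join/Finish plus Partial.Process interface shape is quoted from memory of the rangedloop package and should be checked against the current definitions, and the inline check simply repeats the field test that the removed segmentloop.Segment.Inline helper used.

package example

import (
	"context"
	"time"

	"storj.io/storj/satellite/metabase/rangedloop"
)

// countObserver counts remote and inline segments across all ranges,
// roughly what a segmentloop.Observer used to do in a single pass.
type countObserver struct {
	remote int64
	inline int64
}

// Start runs once before the loop splits the segment table into ranges.
func (o *countObserver) Start(ctx context.Context, startTime time.Time) error { return nil }

// Fork returns a Partial that will process one range of segments.
func (o *countObserver) Fork(ctx context.Context) (rangedloop.Partial, error) {
	return &countPartial{}, nil
}

// Join folds a finished Partial back into the shared totals.
func (o *countObserver) Join(ctx context.Context, partial rangedloop.Partial) error {
	p := partial.(*countPartial)
	o.remote += p.remote
	o.inline += p.inline
	return nil
}

// Finish runs once after every range has been processed.
func (o *countObserver) Finish(ctx context.Context) error { return nil }

// countPartial accumulates counts for a single range.
type countPartial struct {
	remote int64
	inline int64
}

// Process receives batches of segments belonging to this range.
func (p *countPartial) Process(ctx context.Context, segments []rangedloop.Segment) error {
	for _, segment := range segments {
		// Same test the removed segmentloop.Segment.Inline helper performed.
		if segment.Redundancy.IsZero() && len(segment.Pieces) == 0 {
			p.inline++
		} else {
			p.remote++
		}
	}
	return nil
}

Because each Partial only sees its own range, observers written this way can process ranges in parallel, which the single coalesced pass of the old segments loop could not do.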