From f19ef4afe5e1f7076c5c9dbbdcc950a816d4924b Mon Sep 17 00:00:00 2001 From: Egon Elbre Date: Tue, 23 Mar 2021 14:14:38 +0200 Subject: [PATCH] satellite/metainfo/metaloop: move loop to a separate package Change-Id: I94c931a27c1af6062185ec62688624ec02050f11 --- cmd/metainfo-loop-benchmark/bench.go | 12 ++--- private/testplanet/satellite.go | 5 +- satellite/accounting/tally/tally.go | 14 +++--- satellite/audit/chore.go | 6 +-- satellite/audit/collector.go | 10 ++-- satellite/audit/reservoir.go | 4 +- satellite/core.go | 5 +- satellite/gc.go | 5 +- satellite/gc/piecetracker.go | 10 ++-- satellite/gc/service.go | 8 +-- satellite/gracefulexit/chore.go | 6 +-- satellite/gracefulexit/pathcollector.go | 10 ++-- satellite/metainfo/config.go | 3 +- .../metainfo/{loop.go => metaloop/service.go} | 50 +++++++++++-------- .../service_test.go} | 12 ++--- .../{loopstats.go => metaloop/stats.go} | 2 +- satellite/metrics/chore.go | 6 +-- satellite/metrics/counter.go | 8 +-- satellite/repair/checker/checker.go | 13 ++--- storagenode/storagenodedb/schema.go | 1 - 20 files changed, 102 insertions(+), 88 deletions(-) rename satellite/metainfo/{loop.go => metaloop/service.go} (89%) rename satellite/metainfo/{loop_test.go => metaloop/service_test.go} (98%) rename satellite/metainfo/{loopstats.go => metaloop/stats.go} (99%) diff --git a/cmd/metainfo-loop-benchmark/bench.go b/cmd/metainfo-loop-benchmark/bench.go index 6d8c5f800..36f22fa23 100644 --- a/cmd/metainfo-loop-benchmark/bench.go +++ b/cmd/metainfo-loop-benchmark/bench.go @@ -19,6 +19,7 @@ import ( "storj.io/common/errs2" "storj.io/common/memory" "storj.io/storj/satellite/metainfo" + "storj.io/storj/satellite/metainfo/metaloop" ) var mon = monkit.Package() @@ -35,7 +36,7 @@ type Bench struct { ProgressPrintFrequency int64 - Loop metainfo.LoopConfig + Loop metaloop.Config } // BindFlags adds bench flags to the the flagset. @@ -100,8 +101,7 @@ func (bench *Bench) Run(ctx context.Context, log *zap.Logger) (err error) { var group errs2.Group - // Passing PointerDB as nil, since metainfo.Loop actually doesn't need it. - loop := metainfo.NewLoop(bench.Loop, mdb) + loop := metaloop.New(bench.Loop, mdb) group.Go(func() error { progress := &ProgressObserver{ @@ -156,7 +156,7 @@ func (progress *ProgressObserver) Report() { } // Object implements the Observer interface. -func (progress *ProgressObserver) Object(context.Context, *metainfo.Object) error { +func (progress *ProgressObserver) Object(context.Context, *metaloop.Object) error { progress.ObjectCount++ if progress.ObjectCount%progress.ProgressPrintFrequency == 0 { progress.Report() @@ -165,13 +165,13 @@ func (progress *ProgressObserver) Object(context.Context, *metainfo.Object) erro } // RemoteSegment implements the Observer interface. -func (progress *ProgressObserver) RemoteSegment(context.Context, *metainfo.Segment) error { +func (progress *ProgressObserver) RemoteSegment(context.Context, *metaloop.Segment) error { progress.RemoteSegmentCount++ return nil } // InlineSegment implements the Observer interface. -func (progress *ProgressObserver) InlineSegment(context.Context, *metainfo.Segment) error { +func (progress *ProgressObserver) InlineSegment(context.Context, *metaloop.Segment) error { progress.InlineSegmentCount++ return nil } diff --git a/private/testplanet/satellite.go b/private/testplanet/satellite.go index 0a8e466a6..3255df533 100644 --- a/private/testplanet/satellite.go +++ b/private/testplanet/satellite.go @@ -50,6 +50,7 @@ import ( "storj.io/storj/satellite/mailservice" "storj.io/storj/satellite/metainfo" "storj.io/storj/satellite/metainfo/expireddeletion" + "storj.io/storj/satellite/metainfo/metaloop" "storj.io/storj/satellite/metainfo/piecedeletion" "storj.io/storj/satellite/metrics" "storj.io/storj/satellite/nodestats" @@ -102,7 +103,7 @@ type Satellite struct { Metabase metainfo.MetabaseDB Service *metainfo.Service Endpoint2 *metainfo.Endpoint - Loop *metainfo.Loop + Loop *metaloop.Service } Inspector struct { @@ -471,7 +472,7 @@ func (planet *Planet) newSatellite(ctx context.Context, prefix string, index int Success: atLeastOne(planet.config.StorageNodeCount * 3 / 5), Total: atLeastOne(planet.config.StorageNodeCount * 4 / 5), }, - Loop: metainfo.LoopConfig{ + Loop: metaloop.Config{ CoalesceDuration: 1 * time.Second, ListLimit: 10000, }, diff --git a/satellite/accounting/tally/tally.go b/satellite/accounting/tally/tally.go index be4471531..b25eb0beb 100644 --- a/satellite/accounting/tally/tally.go +++ b/satellite/accounting/tally/tally.go @@ -15,8 +15,8 @@ import ( "storj.io/common/sync2" "storj.io/common/uuid" "storj.io/storj/satellite/accounting" - "storj.io/storj/satellite/metainfo" "storj.io/storj/satellite/metainfo/metabase" + "storj.io/storj/satellite/metainfo/metaloop" ) // Error is a standard error class for this package. @@ -39,7 +39,7 @@ type Service struct { log *zap.Logger Loop *sync2.Cycle - metainfoLoop *metainfo.Loop + metainfoLoop *metaloop.Service liveAccounting accounting.Cache storagenodeAccountingDB accounting.StoragenodeAccounting projectAccountingDB accounting.ProjectAccounting @@ -47,7 +47,7 @@ type Service struct { } // New creates a new tally Service. -func New(log *zap.Logger, sdb accounting.StoragenodeAccounting, pdb accounting.ProjectAccounting, liveAccounting accounting.Cache, metainfoLoop *metainfo.Loop, interval time.Duration) *Service { +func New(log *zap.Logger, sdb accounting.StoragenodeAccounting, pdb accounting.ProjectAccounting, liveAccounting accounting.Cache, metainfoLoop *metaloop.Service, interval time.Duration) *Service { return &Service{ log: log, Loop: sync2.NewCycle(interval), @@ -239,7 +239,7 @@ func (service *Service) Tally(ctx context.Context) (err error) { return errs.Combine(errAtRest, errBucketInfo) } -var _ metainfo.Observer = (*Observer)(nil) +var _ metaloop.Observer = (*Observer)(nil) // Observer observes metainfo and adds up tallies for nodes and buckets. type Observer struct { @@ -274,7 +274,7 @@ func (observer *Observer) ensureBucket(ctx context.Context, location metabase.Ob } // Object is called for each object once. -func (observer *Observer) Object(ctx context.Context, object *metainfo.Object) (err error) { +func (observer *Observer) Object(ctx context.Context, object *metaloop.Object) (err error) { if object.Expired(observer.Now) { return nil } @@ -287,7 +287,7 @@ func (observer *Observer) Object(ctx context.Context, object *metainfo.Object) ( } // InlineSegment is called for each inline segment. -func (observer *Observer) InlineSegment(ctx context.Context, segment *metainfo.Segment) (err error) { +func (observer *Observer) InlineSegment(ctx context.Context, segment *metaloop.Segment) (err error) { if segment.Expired(observer.Now) { return nil } @@ -300,7 +300,7 @@ func (observer *Observer) InlineSegment(ctx context.Context, segment *metainfo.S } // RemoteSegment is called for each remote segment. -func (observer *Observer) RemoteSegment(ctx context.Context, segment *metainfo.Segment) (err error) { +func (observer *Observer) RemoteSegment(ctx context.Context, segment *metaloop.Segment) (err error) { if segment.Expired(observer.Now) { return nil } diff --git a/satellite/audit/chore.go b/satellite/audit/chore.go index 802d889c8..e466e1590 100644 --- a/satellite/audit/chore.go +++ b/satellite/audit/chore.go @@ -11,7 +11,7 @@ import ( "go.uber.org/zap" "storj.io/common/sync2" - "storj.io/storj/satellite/metainfo" + "storj.io/storj/satellite/metainfo/metaloop" ) // Chore populates reservoirs and the audit queue. @@ -23,12 +23,12 @@ type Chore struct { queues *Queues Loop *sync2.Cycle - metainfoLoop *metainfo.Loop + metainfoLoop *metaloop.Service config Config } // NewChore instantiates Chore. -func NewChore(log *zap.Logger, queues *Queues, metaLoop *metainfo.Loop, config Config) *Chore { +func NewChore(log *zap.Logger, queues *Queues, metaLoop *metaloop.Service, config Config) *Chore { return &Chore{ log: log, rand: rand.New(rand.NewSource(time.Now().Unix())), diff --git a/satellite/audit/collector.go b/satellite/audit/collector.go index a87419b82..3d1474dc1 100644 --- a/satellite/audit/collector.go +++ b/satellite/audit/collector.go @@ -8,10 +8,10 @@ import ( "math/rand" "storj.io/common/storj" - "storj.io/storj/satellite/metainfo" + "storj.io/storj/satellite/metainfo/metaloop" ) -var _ metainfo.Observer = (*Collector)(nil) +var _ metaloop.Observer = (*Collector)(nil) // Collector uses the metainfo loop to add segments to node reservoirs. type Collector struct { @@ -30,7 +30,7 @@ func NewCollector(reservoirSlots int, r *rand.Rand) *Collector { } // RemoteSegment takes a remote segment found in metainfo and creates a reservoir for it if it doesn't exist already. -func (collector *Collector) RemoteSegment(ctx context.Context, segment *metainfo.Segment) (err error) { +func (collector *Collector) RemoteSegment(ctx context.Context, segment *metaloop.Segment) (err error) { for _, piece := range segment.Pieces { if _, ok := collector.Reservoirs[piece.StorageNode]; !ok { collector.Reservoirs[piece.StorageNode] = NewReservoir(collector.slotCount) @@ -41,11 +41,11 @@ func (collector *Collector) RemoteSegment(ctx context.Context, segment *metainfo } // Object returns nil because the audit service does not interact with objects. -func (collector *Collector) Object(ctx context.Context, object *metainfo.Object) (err error) { +func (collector *Collector) Object(ctx context.Context, object *metaloop.Object) (err error) { return nil } // InlineSegment returns nil because we're only auditing for storage nodes for now. -func (collector *Collector) InlineSegment(ctx context.Context, segment *metainfo.Segment) (err error) { +func (collector *Collector) InlineSegment(ctx context.Context, segment *metaloop.Segment) (err error) { return nil } diff --git a/satellite/audit/reservoir.go b/satellite/audit/reservoir.go index 770172cf7..2bbbd5232 100644 --- a/satellite/audit/reservoir.go +++ b/satellite/audit/reservoir.go @@ -8,8 +8,8 @@ import ( "time" "storj.io/common/uuid" - "storj.io/storj/satellite/metainfo" "storj.io/storj/satellite/metainfo/metabase" + "storj.io/storj/satellite/metainfo/metaloop" ) const maxReservoirSize = 3 @@ -56,7 +56,7 @@ type Segment struct { } // NewSegment creates a new segment to audit from a metainfo loop segment. -func NewSegment(loopSegment *metainfo.Segment) Segment { +func NewSegment(loopSegment *metaloop.Segment) Segment { return Segment{ SegmentLocation: loopSegment.Location, StreamID: loopSegment.StreamID, diff --git a/satellite/core.go b/satellite/core.go index 351835f59..1a294a69d 100644 --- a/satellite/core.go +++ b/satellite/core.go @@ -35,6 +35,7 @@ import ( "storj.io/storj/satellite/gracefulexit" "storj.io/storj/satellite/metainfo" "storj.io/storj/satellite/metainfo/expireddeletion" + "storj.io/storj/satellite/metainfo/metaloop" "storj.io/storj/satellite/metrics" "storj.io/storj/satellite/orders" "storj.io/storj/satellite/overlay" @@ -83,7 +84,7 @@ type Core struct { Database metainfo.PointerDB // TODO: move into pointerDB Metabase metainfo.MetabaseDB Service *metainfo.Service - Loop *metainfo.Loop + Loop *metaloop.Service } Orders struct { @@ -280,7 +281,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, peer.DB.Buckets(), peer.Metainfo.Metabase, ) - peer.Metainfo.Loop = metainfo.NewLoop( + peer.Metainfo.Loop = metaloop.New( config.Metainfo.Loop, peer.Metainfo.Metabase, ) diff --git a/satellite/gc.go b/satellite/gc.go index 8d04a16f4..7c0e2a402 100644 --- a/satellite/gc.go +++ b/satellite/gc.go @@ -24,6 +24,7 @@ import ( version_checker "storj.io/storj/private/version/checker" "storj.io/storj/satellite/gc" "storj.io/storj/satellite/metainfo" + "storj.io/storj/satellite/metainfo/metaloop" "storj.io/storj/satellite/metrics" "storj.io/storj/satellite/overlay" ) @@ -57,7 +58,7 @@ type GarbageCollection struct { Metainfo struct { Database metainfo.PointerDB - Loop *metainfo.Loop + Loop *metaloop.Service } GarbageCollection struct { @@ -140,7 +141,7 @@ func NewGarbageCollection(log *zap.Logger, full *identity.FullIdentity, db DB, // GC runs infrequently, this shouldn't add too much extra load on the metainfo db. // As long as garbage collection is the only observer joining the metainfo loop, then by default // the metainfo loop will only run when the garbage collection joins (which happens every GarbageCollection.Interval) - peer.Metainfo.Loop = metainfo.NewLoop( + peer.Metainfo.Loop = metaloop.New( config.Metainfo.Loop, metabaseDB, ) diff --git a/satellite/gc/piecetracker.go b/satellite/gc/piecetracker.go index 9f1bcf980..46b42e4be 100644 --- a/satellite/gc/piecetracker.go +++ b/satellite/gc/piecetracker.go @@ -12,10 +12,10 @@ import ( "storj.io/common/bloomfilter" "storj.io/common/memory" "storj.io/common/storj" - "storj.io/storj/satellite/metainfo" + "storj.io/storj/satellite/metainfo/metaloop" ) -var _ metainfo.Observer = (*PieceTracker)(nil) +var _ metaloop.Observer = (*PieceTracker)(nil) // PieceTracker implements the metainfo loop observer interface for garbage collection. // @@ -43,7 +43,7 @@ func NewPieceTracker(log *zap.Logger, config Config, pieceCounts map[storj.NodeI } // RemoteSegment takes a remote segment found in metainfo and adds pieces to bloom filters. -func (pieceTracker *PieceTracker) RemoteSegment(ctx context.Context, segment *metainfo.Segment) (err error) { +func (pieceTracker *PieceTracker) RemoteSegment(ctx context.Context, segment *metaloop.Segment) (err error) { defer mon.Task()(&ctx)(&err) for _, piece := range segment.Pieces { @@ -55,12 +55,12 @@ func (pieceTracker *PieceTracker) RemoteSegment(ctx context.Context, segment *me } // Object returns nil because gc does not interact with remote objects. -func (pieceTracker *PieceTracker) Object(ctx context.Context, object *metainfo.Object) (err error) { +func (pieceTracker *PieceTracker) Object(ctx context.Context, object *metaloop.Object) (err error) { return nil } // InlineSegment returns nil because we're only doing gc for storage nodes for now. -func (pieceTracker *PieceTracker) InlineSegment(ctx context.Context, segment *metainfo.Segment) (err error) { +func (pieceTracker *PieceTracker) InlineSegment(ctx context.Context, segment *metaloop.Segment) (err error) { return nil } diff --git a/satellite/gc/service.go b/satellite/gc/service.go index 678b7cbf8..7a6332e92 100644 --- a/satellite/gc/service.go +++ b/satellite/gc/service.go @@ -16,7 +16,7 @@ import ( "storj.io/common/rpc" "storj.io/common/storj" "storj.io/common/sync2" - "storj.io/storj/satellite/metainfo" + "storj.io/storj/satellite/metainfo/metaloop" "storj.io/storj/satellite/overlay" "storj.io/uplink/private/piecestore" ) @@ -51,7 +51,7 @@ type Service struct { dialer rpc.Dialer overlay overlay.DB - metainfoLoop *metainfo.Loop + metainfoLoop *metaloop.Service } // RetainInfo contains info needed for a storage node to retain important data and delete garbage data. @@ -62,7 +62,7 @@ type RetainInfo struct { } // NewService creates a new instance of the gc service. -func NewService(log *zap.Logger, config Config, dialer rpc.Dialer, overlay overlay.DB, loop *metainfo.Loop) *Service { +func NewService(log *zap.Logger, config Config, dialer rpc.Dialer, overlay overlay.DB, loop *metaloop.Service) *Service { return &Service{ log: log, config: config, @@ -83,7 +83,7 @@ func (service *Service) Run(ctx context.Context) (err error) { if service.config.SkipFirst { // make sure the metainfo loop runs once - err = service.metainfoLoop.Join(ctx, metainfo.NullObserver{}) + err = service.metainfoLoop.Join(ctx, metaloop.NullObserver{}) if err != nil { return err } diff --git a/satellite/gracefulexit/chore.go b/satellite/gracefulexit/chore.go index 66a620365..0b6f31088 100644 --- a/satellite/gracefulexit/chore.go +++ b/satellite/gracefulexit/chore.go @@ -13,7 +13,7 @@ import ( "storj.io/common/storj" "storj.io/common/sync2" - "storj.io/storj/satellite/metainfo" + "storj.io/storj/satellite/metainfo/metaloop" "storj.io/storj/satellite/overlay" ) @@ -26,11 +26,11 @@ type Chore struct { db DB config Config overlay overlay.DB - metainfoLoop *metainfo.Loop + metainfoLoop *metaloop.Service } // NewChore instantiates Chore. -func NewChore(log *zap.Logger, db DB, overlay overlay.DB, metaLoop *metainfo.Loop, config Config) *Chore { +func NewChore(log *zap.Logger, db DB, overlay overlay.DB, metaLoop *metaloop.Service, config Config) *Chore { return &Chore{ log: log, Loop: sync2.NewCycle(config.ChoreInterval), diff --git a/satellite/gracefulexit/pathcollector.go b/satellite/gracefulexit/pathcollector.go index e5a97e97e..50f7a5042 100644 --- a/satellite/gracefulexit/pathcollector.go +++ b/satellite/gracefulexit/pathcollector.go @@ -11,11 +11,11 @@ import ( "go.uber.org/zap" "storj.io/common/storj" - "storj.io/storj/satellite/metainfo" + "storj.io/storj/satellite/metainfo/metaloop" "storj.io/uplink/private/eestream" ) -var _ metainfo.Observer = (*PathCollector)(nil) +var _ metaloop.Observer = (*PathCollector)(nil) // PathCollector uses the metainfo loop to add paths to node reservoirs. // @@ -55,7 +55,7 @@ func (collector *PathCollector) Flush(ctx context.Context) (err error) { } // RemoteSegment takes a remote segment found in metainfo and creates a graceful exit transfer queue item if it doesn't exist already. -func (collector *PathCollector) RemoteSegment(ctx context.Context, segment *metainfo.Segment) (err error) { +func (collector *PathCollector) RemoteSegment(ctx context.Context, segment *metaloop.Segment) (err error) { if len(collector.nodeIDStorage) == 0 { return nil } @@ -99,12 +99,12 @@ func (collector *PathCollector) RemoteSegment(ctx context.Context, segment *meta } // Object returns nil because the audit service does not interact with objects. -func (collector *PathCollector) Object(ctx context.Context, object *metainfo.Object) (err error) { +func (collector *PathCollector) Object(ctx context.Context, object *metaloop.Object) (err error) { return nil } // InlineSegment returns nil because we're only auditing for storage nodes for now. -func (collector *PathCollector) InlineSegment(ctx context.Context, segment *metainfo.Segment) (err error) { +func (collector *PathCollector) InlineSegment(ctx context.Context, segment *metaloop.Segment) (err error) { return nil } diff --git a/satellite/metainfo/config.go b/satellite/metainfo/config.go index 9c81872de..04028af09 100644 --- a/satellite/metainfo/config.go +++ b/satellite/metainfo/config.go @@ -18,6 +18,7 @@ import ( "storj.io/common/uuid" "storj.io/storj/private/dbutil" "storj.io/storj/satellite/metainfo/metabase" + "storj.io/storj/satellite/metainfo/metaloop" "storj.io/storj/satellite/metainfo/piecedeletion" "storj.io/storj/storage" "storj.io/storj/storage/cockroachkv" @@ -125,7 +126,7 @@ type Config struct { MaxCommitInterval time.Duration `default:"48h" help:"maximum time allowed to pass between creating and committing a segment"` Overlay bool `default:"true" help:"toggle flag if overlay is enabled"` RS RSConfig `releaseDefault:"29/35/80/110-256B" devDefault:"4/6/8/10-256B" help:"redundancy scheme configuration in the format k/m/o/n-sharesize"` - Loop LoopConfig `help:"loop configuration"` + Loop metaloop.Config `help:"loop configuration"` RateLimiter RateLimiterConfig `help:"rate limiter configuration"` ProjectLimits ProjectLimitConfig `help:"project limit configuration"` PieceDeletion piecedeletion.Config `help:"piece deletion configuration"` diff --git a/satellite/metainfo/loop.go b/satellite/metainfo/metaloop/service.go similarity index 89% rename from satellite/metainfo/loop.go rename to satellite/metainfo/metaloop/service.go index 5a860c382..ed7dff6d6 100644 --- a/satellite/metainfo/loop.go +++ b/satellite/metainfo/metaloop/service.go @@ -1,7 +1,7 @@ // Copyright (C) 2019 Storj Labs, Inc. // See LICENSE for copying information. -package metainfo +package metaloop import ( "context" @@ -20,10 +20,12 @@ import ( const batchsizeLimit = 2500 var ( - // LoopError is a standard error class for this component. - LoopError = errs.Class("metainfo loop error") - // LoopClosedError is a loop closed error. - LoopClosedError = LoopError.New("loop closed") + mon = monkit.Package() + + // Error is a standard error class for this component. + Error = errs.Class("metainfo loop error") + // ErrClosed is a loop closed error. + ErrClosed = Error.New("loop closed") ) // Object is the object info passed to Observer by metainfo loop. @@ -146,26 +148,34 @@ func (observer *observerContext) Wait() error { return <-observer.done } -// LoopConfig contains configurable values for the metainfo loop. -type LoopConfig struct { +// Config contains configurable values for the metainfo loop. +type Config struct { CoalesceDuration time.Duration `help:"how long to wait for new observers before starting iteration" releaseDefault:"5s" devDefault:"5s"` RateLimit float64 `help:"rate limit (default is 0 which is unlimited segments per second)" default:"0"` ListLimit int `help:"how many items to query in a batch" default:"2500"` } -// Loop is a metainfo loop service. +// MetabaseDB contains iterators for the metabase data. +type MetabaseDB interface { + // IterateLoopObjects iterates through all objects in metabase for metainfo loop purpose. + IterateLoopObjects(ctx context.Context, opts metabase.IterateLoopObjects, fn func(context.Context, metabase.LoopObjectsIterator) error) (err error) + // IterateLoopStreams iterates through all streams passed in as arguments. + IterateLoopStreams(ctx context.Context, opts metabase.IterateLoopStreams, handleStream func(ctx context.Context, streamID uuid.UUID, next metabase.SegmentIterator) error) (err error) +} + +// Service is a metainfo loop service. // // architecture: Service -type Loop struct { - config LoopConfig +type Service struct { + config Config metabaseDB MetabaseDB join chan []*observerContext done chan struct{} } -// NewLoop creates a new metainfo loop service. -func NewLoop(config LoopConfig, metabaseDB MetabaseDB) *Loop { - return &Loop{ +// New creates a new metainfo loop service. +func New(config Config, metabaseDB MetabaseDB) *Service { + return &Service{ metabaseDB: metabaseDB, config: config, join: make(chan []*observerContext), @@ -177,7 +187,7 @@ func NewLoop(config LoopConfig, metabaseDB MetabaseDB) *Loop { // On ctx cancel the observer will return without completely finishing. // Only on full complete iteration it will return nil. // Safe to be called concurrently. -func (loop *Loop) Join(ctx context.Context, observers ...Observer) (err error) { +func (loop *Service) Join(ctx context.Context, observers ...Observer) (err error) { defer mon.Task()(&ctx)(&err) obsContexts := make([]*observerContext, len(observers)) @@ -190,7 +200,7 @@ func (loop *Loop) Join(ctx context.Context, observers ...Observer) (err error) { case <-ctx.Done(): return ctx.Err() case <-loop.done: - return LoopClosedError + return ErrClosed } var errList errs.Group @@ -203,7 +213,7 @@ func (loop *Loop) Join(ctx context.Context, observers ...Observer) (err error) { // Run starts the looping service. // It can only be called once, otherwise a panic will occur. -func (loop *Loop) Run(ctx context.Context) (err error) { +func (loop *Service) Run(ctx context.Context) (err error) { defer mon.Task()(&ctx)(&err) for { @@ -215,7 +225,7 @@ func (loop *Loop) Run(ctx context.Context) (err error) { } // Close closes the looping services. -func (loop *Loop) Close() (err error) { +func (loop *Service) Close() (err error) { close(loop.done) return nil } @@ -223,7 +233,7 @@ func (loop *Loop) Close() (err error) { // RunOnce goes through metainfo one time and sends information to observers. // // It is not safe to call this concurrently with Run. -func (loop *Loop) RunOnce(ctx context.Context) (err error) { +func (loop *Service) RunOnce(ctx context.Context) (err error) { defer mon.Task()(&ctx)(&err) var observers []*observerContext @@ -255,7 +265,7 @@ waitformore: // Wait waits for run to be finished. // Safe to be called concurrently. -func (loop *Loop) Wait() { +func (loop *Service) Wait() { <-loop.done } @@ -272,7 +282,7 @@ func iterateDatabase(ctx context.Context, metabaseDB MetabaseDB, observers []*ob observers, err = iterateObjects(ctx, metabaseDB, observers, limit, rateLimiter) if err != nil { - return LoopError.Wrap(err) + return Error.Wrap(err) } return err diff --git a/satellite/metainfo/loop_test.go b/satellite/metainfo/metaloop/service_test.go similarity index 98% rename from satellite/metainfo/loop_test.go rename to satellite/metainfo/metaloop/service_test.go index 886638170..157411baa 100644 --- a/satellite/metainfo/loop_test.go +++ b/satellite/metainfo/metaloop/service_test.go @@ -1,7 +1,7 @@ // Copyright (C) 2019 Storj Labs, Inc. // See LICENSE for copying information. -package metainfo_test +package metaloop_test import ( "context" @@ -23,8 +23,8 @@ import ( "storj.io/common/testrand" "storj.io/storj/private/testplanet" "storj.io/storj/satellite" - "storj.io/storj/satellite/metainfo" "storj.io/storj/satellite/metainfo/metabase" + "storj.io/storj/satellite/metainfo/metaloop" "storj.io/uplink/private/multipart" ) @@ -313,7 +313,7 @@ func TestLoopCancel(t *testing.T) { } // create a new metainfo loop - metaLoop := metainfo.NewLoop(metainfo.LoopConfig{ + metaLoop := metaloop.New(metaloop.Config{ CoalesceDuration: 1 * time.Second, ListLimit: 10000, }, satellite.Metainfo.Metabase) @@ -399,7 +399,7 @@ func newTestObserver(onSegment func(context.Context) error) *testObserver { } } -func (obs *testObserver) RemoteSegment(ctx context.Context, segment *metainfo.Segment) error { +func (obs *testObserver) RemoteSegment(ctx context.Context, segment *metaloop.Segment) error { obs.remoteSegCount++ key := segment.Location.Encode() @@ -416,13 +416,13 @@ func (obs *testObserver) RemoteSegment(ctx context.Context, segment *metainfo.Se return nil } -func (obs *testObserver) Object(ctx context.Context, object *metainfo.Object) error { +func (obs *testObserver) Object(ctx context.Context, object *metaloop.Object) error { obs.objectCount++ obs.totalMetadataSize += object.EncryptedMetadataSize return nil } -func (obs *testObserver) InlineSegment(ctx context.Context, segment *metainfo.Segment) error { +func (obs *testObserver) InlineSegment(ctx context.Context, segment *metaloop.Segment) error { obs.inlineSegCount++ key := segment.Location.Encode() if _, ok := obs.uniquePaths[string(key)]; ok { diff --git a/satellite/metainfo/loopstats.go b/satellite/metainfo/metaloop/stats.go similarity index 99% rename from satellite/metainfo/loopstats.go rename to satellite/metainfo/metaloop/stats.go index 242707026..2a7bae6e7 100644 --- a/satellite/metainfo/loopstats.go +++ b/satellite/metainfo/metaloop/stats.go @@ -1,7 +1,7 @@ // Copyright (C) 2020 Storj Labs, Inc. // See LICENSE for copying information. -package metainfo +package metaloop import ( "sync" diff --git a/satellite/metrics/chore.go b/satellite/metrics/chore.go index ffa37c7ee..e23bee077 100644 --- a/satellite/metrics/chore.go +++ b/satellite/metrics/chore.go @@ -12,7 +12,7 @@ import ( "go.uber.org/zap" "storj.io/common/sync2" - "storj.io/storj/satellite/metainfo" + "storj.io/storj/satellite/metainfo/metaloop" ) var ( @@ -33,12 +33,12 @@ type Chore struct { log *zap.Logger config Config Loop *sync2.Cycle - metainfoLoop *metainfo.Loop + metainfoLoop *metaloop.Service Counter *Counter } // NewChore creates a new instance of the metrics chore. -func NewChore(log *zap.Logger, config Config, loop *metainfo.Loop) *Chore { +func NewChore(log *zap.Logger, config Config, loop *metaloop.Service) *Chore { return &Chore{ log: log, config: config, diff --git a/satellite/metrics/counter.go b/satellite/metrics/counter.go index 2ea4d5edd..e083b46bf 100644 --- a/satellite/metrics/counter.go +++ b/satellite/metrics/counter.go @@ -7,7 +7,7 @@ import ( "context" "storj.io/common/uuid" - "storj.io/storj/satellite/metainfo" + "storj.io/storj/satellite/metainfo/metaloop" ) // Counter implements the metainfo loop observer interface for data science metrics collection. @@ -26,7 +26,7 @@ func NewCounter() *Counter { } // Object increments the count for total objects and for inline objects in case the object has no segments. -func (counter *Counter) Object(ctx context.Context, object *metainfo.Object) (err error) { +func (counter *Counter) Object(ctx context.Context, object *metaloop.Object) (err error) { defer mon.Task()(&ctx)(&err) counter.ObjectCount++ @@ -35,7 +35,7 @@ func (counter *Counter) Object(ctx context.Context, object *metainfo.Object) (er } // RemoteSegment increments the count for objects with remote segments. -func (counter *Counter) RemoteSegment(ctx context.Context, segment *metainfo.Segment) (err error) { +func (counter *Counter) RemoteSegment(ctx context.Context, segment *metaloop.Segment) (err error) { defer mon.Task()(&ctx)(&err) if counter.checkObjectRemoteness == segment.StreamID { @@ -47,7 +47,7 @@ func (counter *Counter) RemoteSegment(ctx context.Context, segment *metainfo.Seg } // InlineSegment increments the count for inline objects. -func (counter *Counter) InlineSegment(ctx context.Context, segment *metainfo.Segment) (err error) { +func (counter *Counter) InlineSegment(ctx context.Context, segment *metaloop.Segment) (err error) { return nil } diff --git a/satellite/repair/checker/checker.go b/satellite/repair/checker/checker.go index b4c788edd..53f4cf457 100644 --- a/satellite/repair/checker/checker.go +++ b/satellite/repair/checker/checker.go @@ -19,6 +19,7 @@ import ( "storj.io/storj/satellite/internalpb" "storj.io/storj/satellite/metainfo" "storj.io/storj/satellite/metainfo/metabase" + "storj.io/storj/satellite/metainfo/metaloop" "storj.io/storj/satellite/overlay" "storj.io/storj/satellite/repair" "storj.io/storj/satellite/repair/irreparable" @@ -39,7 +40,7 @@ type Checker struct { repairQueue queue.RepairQueue irrdb irreparable.DB metabase metainfo.MetabaseDB - metaLoop *metainfo.Loop + metaLoop *metaloop.Service nodestate *ReliabilityCache statsCollector *statsCollector repairOverrides RepairOverridesMap @@ -49,7 +50,7 @@ type Checker struct { } // NewChecker creates a new instance of checker. -func NewChecker(logger *zap.Logger, repairQueue queue.RepairQueue, irrdb irreparable.DB, metabase metainfo.MetabaseDB, metaLoop *metainfo.Loop, overlay *overlay.Service, config Config) *Checker { +func NewChecker(logger *zap.Logger, repairQueue queue.RepairQueue, irrdb irreparable.DB, metabase metainfo.MetabaseDB, metaLoop *metaloop.Service, overlay *overlay.Service, config Config) *Checker { return &Checker{ logger: logger, @@ -260,7 +261,7 @@ func (checker *Checker) updateIrreparableSegmentStatus(ctx context.Context, key return nil } -var _ metainfo.Observer = (*checkerObserver)(nil) +var _ metaloop.Observer = (*checkerObserver)(nil) // checkerObserver implements the metainfo loop Observer interface. // @@ -294,7 +295,7 @@ func (obs *checkerObserver) loadRedundancy(redundancy storj.RedundancyScheme) (i return int(redundancy.RequiredShares), repair, int(redundancy.OptimalShares), int(redundancy.TotalShares) } -func (obs *checkerObserver) RemoteSegment(ctx context.Context, segment *metainfo.Segment) (err error) { +func (obs *checkerObserver) RemoteSegment(ctx context.Context, segment *metaloop.Segment) (err error) { defer mon.Task()(&ctx)(&err) // ignore segment if expired @@ -453,7 +454,7 @@ func (obs *checkerObserver) RemoteSegment(ctx context.Context, segment *metainfo return nil } -func (obs *checkerObserver) Object(ctx context.Context, object *metainfo.Object) (err error) { +func (obs *checkerObserver) Object(ctx context.Context, object *metaloop.Object) (err error) { defer mon.Task()(&ctx)(&err) obs.monStats.objectsChecked++ @@ -470,7 +471,7 @@ func (obs *checkerObserver) Object(ctx context.Context, object *metainfo.Object) return nil } -func (obs *checkerObserver) InlineSegment(ctx context.Context, segment *metainfo.Segment) (err error) { +func (obs *checkerObserver) InlineSegment(ctx context.Context, segment *metaloop.Segment) (err error) { defer mon.Task()(&ctx)(&err) // TODO: check for expired segments diff --git a/storagenode/storagenodedb/schema.go b/storagenode/storagenodedb/schema.go index d17fb7c64..90de597b8 100644 --- a/storagenode/storagenodedb/schema.go +++ b/storagenode/storagenodedb/schema.go @@ -724,4 +724,3 @@ func Schema() map[string]*dbschema.Schema { "used_serial": &dbschema.Schema{}, } } -