diff --git a/satellite/audit/reservoirservice.go b/satellite/audit/reservoirservice.go new file mode 100644 index 000000000..175954faa --- /dev/null +++ b/satellite/audit/reservoirservice.go @@ -0,0 +1,66 @@ +// Copyright (C) 2019 Storj Labs, Inc. +// See LICENSE for copying information. + +package audit + +import ( + "context" + "math/rand" + "time" + + "go.uber.org/zap" + + "storj.io/storj/internal/sync2" + "storj.io/storj/pkg/storj" + "storj.io/storj/satellite/metainfo" +) + +// ReservoirService is a temp name for the service struct during the audit 2.0 refactor. +// Once V3-2363 and V3-2364 are implemented, ReservoirService will replace the existing Service struct. +type ReservoirService struct { + log *zap.Logger + + reservoirSlots int + Reservoirs map[storj.NodeID]*Reservoir + rand *rand.Rand + + MetainfoLoop *metainfo.Loop + Loop sync2.Cycle +} + +// NewReservoirService instantiates ReservoirService +func NewReservoirService(log *zap.Logger, metaLoop *metainfo.Loop, config Config) *ReservoirService { + return &ReservoirService{ + log: log, + + reservoirSlots: config.Slots, + rand: rand.New(rand.NewSource(time.Now().Unix())), + + MetainfoLoop: metaLoop, + Loop: *sync2.NewCycle(config.Interval), + } +} + +// Run runs auditing service 2.0 +func (service *ReservoirService) Run(ctx context.Context) (err error) { + defer mon.Task()(&ctx)(&err) + service.log.Info("audit 2.0 is starting up") + + return service.Loop.Run(ctx, func(ctx context.Context) (err error) { + defer mon.Task()(&ctx)(&err) + pathCollector := NewPathCollector(service.reservoirSlots, service.rand) + err = service.MetainfoLoop.Join(ctx, pathCollector) + if err != nil { + service.log.Error("error joining metainfoloop", zap.Error(err)) + return nil + } + service.Reservoirs = pathCollector.Reservoirs + return nil + }) +} + +// Close halts the reservoir service loop +func (service *ReservoirService) Close() error { + service.Loop.Close() + return nil +} diff --git a/satellite/audit/service.go b/satellite/audit/service.go index 0916a6572..53ecded90 100644 --- a/satellite/audit/service.go +++ b/satellite/audit/service.go @@ -5,7 +5,6 @@ package audit import ( "context" - "math/rand" "time" "github.com/zeebo/errs" @@ -61,50 +60,6 @@ func NewService(log *zap.Logger, config Config, metainfo *metainfo.Service, }, nil } -// ReservoirService is a temp name for the service struct during the audit 2.0 refactor. -// Once V3-2363 and V3-2364 are implemented, Service2 will replace the existing Service struct. -type ReservoirService struct { - log *zap.Logger - - reservoirSlots int - Reservoirs map[storj.NodeID]*Reservoir - rand *rand.Rand - - MetainfoLoop *metainfo.Loop - Loop sync2.Cycle -} - -// NewReservoirService instantiates Service2 -func NewReservoirService(log *zap.Logger, metaLoop *metainfo.Loop, config Config) (*ReservoirService, error) { - return &ReservoirService{ - log: log, - - reservoirSlots: config.Slots, - rand: rand.New(rand.NewSource(time.Now().Unix())), - - MetainfoLoop: metaLoop, - Loop: *sync2.NewCycle(config.Interval), - }, nil -} - -// Run runs auditing service 2.0 -func (service *ReservoirService) Run(ctx context.Context) (err error) { - defer mon.Task()(&ctx)(&err) - service.log.Info("audit 2.0 is starting up") - - return service.Loop.Run(ctx, func(ctx context.Context) (err error) { - defer mon.Task()(&ctx)(&err) - pathCollector := NewPathCollector(service.reservoirSlots, service.rand) - err = service.MetainfoLoop.Join(ctx, pathCollector) - if err != nil { - service.log.Error("error joining metainfoloop", zap.Error(err)) - return nil - } - service.Reservoirs = pathCollector.Reservoirs - return nil - }) -} - // Run runs auditing service func (service *Service) Run(ctx context.Context) (err error) { defer mon.Task()(&ctx)(&err) diff --git a/satellite/peer.go b/satellite/peer.go index 9416f453a..f9717ce8d 100644 --- a/satellite/peer.go +++ b/satellite/peer.go @@ -489,13 +489,10 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten } // setup audit 2.0 - peer.Audit.ReservoirService, err = audit.NewReservoirService(peer.Log.Named("reservoir service"), + peer.Audit.ReservoirService = audit.NewReservoirService(peer.Log.Named("reservoir service"), peer.Metainfo.Loop, config, ) - if err != nil { - return nil, errs.Combine(err, peer.Close()) - } } { // setup garbage collection