satellite/audit: mv ReservoirService into its own file (#2886)
This commit is contained in:
parent
599324c364
commit
49303ea3ac
66
satellite/audit/reservoirservice.go
Normal file
66
satellite/audit/reservoirservice.go
Normal file
@ -0,0 +1,66 @@
|
|||||||
|
// Copyright (C) 2019 Storj Labs, Inc.
|
||||||
|
// See LICENSE for copying information.
|
||||||
|
|
||||||
|
package audit
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"math/rand"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"go.uber.org/zap"
|
||||||
|
|
||||||
|
"storj.io/storj/internal/sync2"
|
||||||
|
"storj.io/storj/pkg/storj"
|
||||||
|
"storj.io/storj/satellite/metainfo"
|
||||||
|
)
|
||||||
|
|
||||||
|
// ReservoirService is a temp name for the service struct during the audit 2.0 refactor.
|
||||||
|
// Once V3-2363 and V3-2364 are implemented, ReservoirService will replace the existing Service struct.
|
||||||
|
type ReservoirService struct {
|
||||||
|
log *zap.Logger
|
||||||
|
|
||||||
|
reservoirSlots int
|
||||||
|
Reservoirs map[storj.NodeID]*Reservoir
|
||||||
|
rand *rand.Rand
|
||||||
|
|
||||||
|
MetainfoLoop *metainfo.Loop
|
||||||
|
Loop sync2.Cycle
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewReservoirService instantiates ReservoirService
|
||||||
|
func NewReservoirService(log *zap.Logger, metaLoop *metainfo.Loop, config Config) *ReservoirService {
|
||||||
|
return &ReservoirService{
|
||||||
|
log: log,
|
||||||
|
|
||||||
|
reservoirSlots: config.Slots,
|
||||||
|
rand: rand.New(rand.NewSource(time.Now().Unix())),
|
||||||
|
|
||||||
|
MetainfoLoop: metaLoop,
|
||||||
|
Loop: *sync2.NewCycle(config.Interval),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Run runs auditing service 2.0
|
||||||
|
func (service *ReservoirService) Run(ctx context.Context) (err error) {
|
||||||
|
defer mon.Task()(&ctx)(&err)
|
||||||
|
service.log.Info("audit 2.0 is starting up")
|
||||||
|
|
||||||
|
return service.Loop.Run(ctx, func(ctx context.Context) (err error) {
|
||||||
|
defer mon.Task()(&ctx)(&err)
|
||||||
|
pathCollector := NewPathCollector(service.reservoirSlots, service.rand)
|
||||||
|
err = service.MetainfoLoop.Join(ctx, pathCollector)
|
||||||
|
if err != nil {
|
||||||
|
service.log.Error("error joining metainfoloop", zap.Error(err))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
service.Reservoirs = pathCollector.Reservoirs
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close halts the reservoir service loop
|
||||||
|
func (service *ReservoirService) Close() error {
|
||||||
|
service.Loop.Close()
|
||||||
|
return nil
|
||||||
|
}
|
@ -5,7 +5,6 @@ package audit
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"math/rand"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/zeebo/errs"
|
"github.com/zeebo/errs"
|
||||||
@ -61,50 +60,6 @@ func NewService(log *zap.Logger, config Config, metainfo *metainfo.Service,
|
|||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// ReservoirService is a temp name for the service struct during the audit 2.0 refactor.
|
|
||||||
// Once V3-2363 and V3-2364 are implemented, Service2 will replace the existing Service struct.
|
|
||||||
type ReservoirService struct {
|
|
||||||
log *zap.Logger
|
|
||||||
|
|
||||||
reservoirSlots int
|
|
||||||
Reservoirs map[storj.NodeID]*Reservoir
|
|
||||||
rand *rand.Rand
|
|
||||||
|
|
||||||
MetainfoLoop *metainfo.Loop
|
|
||||||
Loop sync2.Cycle
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewReservoirService instantiates Service2
|
|
||||||
func NewReservoirService(log *zap.Logger, metaLoop *metainfo.Loop, config Config) (*ReservoirService, error) {
|
|
||||||
return &ReservoirService{
|
|
||||||
log: log,
|
|
||||||
|
|
||||||
reservoirSlots: config.Slots,
|
|
||||||
rand: rand.New(rand.NewSource(time.Now().Unix())),
|
|
||||||
|
|
||||||
MetainfoLoop: metaLoop,
|
|
||||||
Loop: *sync2.NewCycle(config.Interval),
|
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Run runs auditing service 2.0
|
|
||||||
func (service *ReservoirService) Run(ctx context.Context) (err error) {
|
|
||||||
defer mon.Task()(&ctx)(&err)
|
|
||||||
service.log.Info("audit 2.0 is starting up")
|
|
||||||
|
|
||||||
return service.Loop.Run(ctx, func(ctx context.Context) (err error) {
|
|
||||||
defer mon.Task()(&ctx)(&err)
|
|
||||||
pathCollector := NewPathCollector(service.reservoirSlots, service.rand)
|
|
||||||
err = service.MetainfoLoop.Join(ctx, pathCollector)
|
|
||||||
if err != nil {
|
|
||||||
service.log.Error("error joining metainfoloop", zap.Error(err))
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
service.Reservoirs = pathCollector.Reservoirs
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
// Run runs auditing service
|
// Run runs auditing service
|
||||||
func (service *Service) Run(ctx context.Context) (err error) {
|
func (service *Service) Run(ctx context.Context) (err error) {
|
||||||
defer mon.Task()(&ctx)(&err)
|
defer mon.Task()(&ctx)(&err)
|
||||||
|
@ -489,13 +489,10 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten
|
|||||||
}
|
}
|
||||||
|
|
||||||
// setup audit 2.0
|
// setup audit 2.0
|
||||||
peer.Audit.ReservoirService, err = audit.NewReservoirService(peer.Log.Named("reservoir service"),
|
peer.Audit.ReservoirService = audit.NewReservoirService(peer.Log.Named("reservoir service"),
|
||||||
peer.Metainfo.Loop,
|
peer.Metainfo.Loop,
|
||||||
config,
|
config,
|
||||||
)
|
)
|
||||||
if err != nil {
|
|
||||||
return nil, errs.Combine(err, peer.Close())
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
{ // setup garbage collection
|
{ // setup garbage collection
|
||||||
|
Loading…
Reference in New Issue
Block a user