storagenode: add pkg/debug support
Change-Id: If941095b886c28a0d53fff4c9bf9fa0ce7471dea
This commit is contained in:
parent
a2b2bc676b
commit
10be538602
@ -27,7 +27,7 @@ type Config struct {
|
||||
type Service struct {
|
||||
log *zap.Logger
|
||||
db DB
|
||||
Loop sync2.Cycle
|
||||
Loop *sync2.Cycle
|
||||
}
|
||||
|
||||
// NewService creates a new bandwidth service.
|
||||
@ -35,7 +35,7 @@ func NewService(log *zap.Logger, db DB, config Config) *Service {
|
||||
return &Service{
|
||||
log: log,
|
||||
db: db,
|
||||
Loop: *sync2.NewCycle(config.Interval),
|
||||
Loop: sync2.NewCycle(config.Interval),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -31,7 +31,7 @@ type Service struct {
|
||||
pieces *pieces.Store
|
||||
usedSerials piecestore.UsedSerials
|
||||
|
||||
Loop sync2.Cycle
|
||||
Loop *sync2.Cycle
|
||||
}
|
||||
|
||||
// NewService creates a new collector service.
|
||||
@ -40,7 +40,7 @@ func NewService(log *zap.Logger, pieces *pieces.Store, usedSerials piecestore.Us
|
||||
log: log,
|
||||
pieces: pieces,
|
||||
usedSerials: usedSerials,
|
||||
Loop: *sync2.NewCycle(config.Interval),
|
||||
Loop: sync2.NewCycle(config.Interval),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -29,8 +29,8 @@ type Chore struct {
|
||||
config Config
|
||||
|
||||
exitingMap sync.Map
|
||||
Loop sync2.Cycle
|
||||
limiter sync2.Limiter
|
||||
Loop *sync2.Cycle
|
||||
limiter *sync2.Limiter
|
||||
}
|
||||
|
||||
// NewChore instantiates Chore.
|
||||
@ -42,8 +42,8 @@ func NewChore(log *zap.Logger, config Config, store *pieces.Store, trust *trust.
|
||||
trust: trust,
|
||||
dialer: dialer,
|
||||
config: config,
|
||||
Loop: *sync2.NewCycle(config.ChoreInterval),
|
||||
limiter: *sync2.NewLimiter(config.NumWorkers),
|
||||
Loop: sync2.NewCycle(config.ChoreInterval),
|
||||
limiter: sync2.NewLimiter(config.NumWorkers),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -43,7 +43,7 @@ type Service struct {
|
||||
usageDB bandwidth.DB
|
||||
allocatedDiskSpace int64
|
||||
allocatedBandwidth int64
|
||||
Loop sync2.Cycle
|
||||
Loop *sync2.Cycle
|
||||
Config Config
|
||||
}
|
||||
|
||||
@ -58,7 +58,7 @@ func NewService(log *zap.Logger, store *pieces.Store, contact *contact.Service,
|
||||
usageDB: usageDB,
|
||||
allocatedDiskSpace: allocatedDiskSpace,
|
||||
allocatedBandwidth: allocatedBandwidth,
|
||||
Loop: *sync2.NewCycle(interval),
|
||||
Loop: sync2.NewCycle(interval),
|
||||
Config: config,
|
||||
}
|
||||
}
|
||||
|
@ -50,8 +50,8 @@ type Cache struct {
|
||||
trust *trust.Pool
|
||||
|
||||
maxSleep time.Duration
|
||||
reputationCycle sync2.Cycle
|
||||
storageCycle sync2.Cycle
|
||||
Reputation *sync2.Cycle
|
||||
Storage *sync2.Cycle
|
||||
}
|
||||
|
||||
// NewCache creates new caching service instance
|
||||
@ -62,8 +62,8 @@ func NewCache(log *zap.Logger, config Config, db CacheStorage, service *Service,
|
||||
service: service,
|
||||
trust: trust,
|
||||
maxSleep: config.MaxSleep,
|
||||
reputationCycle: *sync2.NewCycle(config.ReputationSync),
|
||||
storageCycle: *sync2.NewCycle(config.StorageSync),
|
||||
Reputation: sync2.NewCycle(config.ReputationSync),
|
||||
Storage: sync2.NewCycle(config.StorageSync),
|
||||
}
|
||||
}
|
||||
|
||||
@ -71,7 +71,7 @@ func NewCache(log *zap.Logger, config Config, db CacheStorage, service *Service,
|
||||
func (cache *Cache) Run(ctx context.Context) error {
|
||||
var group errgroup.Group
|
||||
|
||||
cache.reputationCycle.Start(ctx, &group, func(ctx context.Context) error {
|
||||
cache.Reputation.Start(ctx, &group, func(ctx context.Context) error {
|
||||
if err := cache.sleep(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
@ -83,7 +83,7 @@ func (cache *Cache) Run(ctx context.Context) error {
|
||||
|
||||
return nil
|
||||
})
|
||||
cache.storageCycle.Start(ctx, &group, func(ctx context.Context) error {
|
||||
cache.Storage.Start(ctx, &group, func(ctx context.Context) error {
|
||||
if err := cache.sleep(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
@ -176,7 +176,7 @@ func (cache *Cache) satelliteLoop(ctx context.Context, fn func(id storj.NodeID)
|
||||
// Close closes underlying cycles
|
||||
func (cache *Cache) Close() error {
|
||||
defer mon.Task()(nil)(nil)
|
||||
cache.reputationCycle.Close()
|
||||
cache.storageCycle.Close()
|
||||
cache.Reputation.Close()
|
||||
cache.Storage.Close()
|
||||
return nil
|
||||
}
|
||||
|
@ -102,8 +102,8 @@ type Service struct {
|
||||
orders DB
|
||||
trust *trust.Pool
|
||||
|
||||
Sender sync2.Cycle
|
||||
Cleanup sync2.Cycle
|
||||
Sender *sync2.Cycle
|
||||
Cleanup *sync2.Cycle
|
||||
}
|
||||
|
||||
// NewService creates an order service.
|
||||
@ -115,8 +115,8 @@ func NewService(log *zap.Logger, dialer rpc.Dialer, orders DB, trust *trust.Pool
|
||||
config: config,
|
||||
trust: trust,
|
||||
|
||||
Sender: *sync2.NewCycle(config.SenderInterval),
|
||||
Cleanup: *sync2.NewCycle(config.CleanupInterval),
|
||||
Sender: sync2.NewCycle(config.SenderInterval),
|
||||
Cleanup: sync2.NewCycle(config.CleanupInterval),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -386,6 +386,8 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten
|
||||
Run: peer.Storage2.CacheService.Run,
|
||||
Close: peer.Storage2.CacheService.Close,
|
||||
})
|
||||
peer.Debug.Server.Panel.Add(
|
||||
debug.Cycle("Piecestore Cache", peer.Storage2.CacheService.Loop))
|
||||
|
||||
peer.Storage2.Monitor = monitor.NewService(
|
||||
log.Named("piecestore:monitor"),
|
||||
@ -403,6 +405,8 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten
|
||||
Run: peer.Storage2.Monitor.Run,
|
||||
Close: peer.Storage2.Monitor.Close,
|
||||
})
|
||||
peer.Debug.Server.Panel.Add(
|
||||
debug.Cycle("Piecestore Monitor", peer.Storage2.Monitor.Loop))
|
||||
|
||||
peer.Storage2.RetainService = retain.NewService(
|
||||
peer.Log.Named("retain"),
|
||||
@ -457,6 +461,10 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten
|
||||
Run: peer.Storage2.Orders.Run,
|
||||
Close: peer.Storage2.Orders.Close,
|
||||
})
|
||||
peer.Debug.Server.Panel.Add(
|
||||
debug.Cycle("Orders Sender", peer.Storage2.Orders.Sender))
|
||||
peer.Debug.Server.Panel.Add(
|
||||
debug.Cycle("Orders Cleanup", peer.Storage2.Orders.Cleanup))
|
||||
}
|
||||
|
||||
{ // setup node stats service
|
||||
@ -476,12 +484,15 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten
|
||||
peer.NodeStats.Service,
|
||||
peer.Storage2.Trust,
|
||||
)
|
||||
|
||||
peer.Services.Add(lifecycle.Item{
|
||||
Name: "nodestats:cache",
|
||||
Run: peer.NodeStats.Cache.Run,
|
||||
Close: peer.NodeStats.Cache.Close,
|
||||
})
|
||||
peer.Debug.Server.Panel.Add(
|
||||
debug.Cycle("Node Stats Cache Reputation", peer.NodeStats.Cache.Reputation))
|
||||
peer.Debug.Server.Panel.Add(
|
||||
debug.Cycle("Node Stats Cache Storage", peer.NodeStats.Cache.Storage))
|
||||
}
|
||||
|
||||
{ // setup storage node operator dashboard
|
||||
@ -567,6 +578,8 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten
|
||||
Run: peer.GracefulExit.Chore.Run,
|
||||
Close: peer.GracefulExit.Chore.Close,
|
||||
})
|
||||
peer.Debug.Server.Panel.Add(
|
||||
debug.Cycle("Graceful Exit", peer.GracefulExit.Chore.Loop))
|
||||
}
|
||||
|
||||
peer.Collector = collector.NewService(peer.Log.Named("collector"), peer.Storage2.Store, peer.DB.UsedSerials(), config.Collector)
|
||||
@ -575,6 +588,8 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten
|
||||
Run: peer.Collector.Run,
|
||||
Close: peer.Collector.Close,
|
||||
})
|
||||
peer.Debug.Server.Panel.Add(
|
||||
debug.Cycle("Collector", peer.Collector.Loop))
|
||||
|
||||
peer.Bandwidth = bandwidth.NewService(peer.Log.Named("bandwidth"), peer.DB.Bandwidth(), config.Bandwidth)
|
||||
peer.Services.Add(lifecycle.Item{
|
||||
@ -582,6 +597,8 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten
|
||||
Run: peer.Bandwidth.Run,
|
||||
Close: peer.Bandwidth.Close,
|
||||
})
|
||||
peer.Debug.Server.Panel.Add(
|
||||
debug.Cycle("Bandwidth", peer.Bandwidth.Loop))
|
||||
|
||||
return peer, nil
|
||||
}
|
||||
|
@ -23,7 +23,7 @@ type CacheService struct {
|
||||
log *zap.Logger
|
||||
usageCache *BlobsUsageCache
|
||||
store *Store
|
||||
loop sync2.Cycle
|
||||
Loop *sync2.Cycle
|
||||
}
|
||||
|
||||
// NewService creates a new cache service that updates the space usage cache on startup and syncs the cache values to
|
||||
@ -33,7 +33,7 @@ func NewService(log *zap.Logger, usageCache *BlobsUsageCache, pieces *Store, int
|
||||
log: log,
|
||||
usageCache: usageCache,
|
||||
store: pieces,
|
||||
loop: *sync2.NewCycle(interval),
|
||||
Loop: sync2.NewCycle(interval),
|
||||
}
|
||||
}
|
||||
|
||||
@ -68,7 +68,7 @@ func (service *CacheService) Run(ctx context.Context) (err error) {
|
||||
service.log.Error("error during init space usage db: ", zap.Error(err))
|
||||
}
|
||||
|
||||
return service.loop.Run(ctx, func(ctx context.Context) (err error) {
|
||||
return service.Loop.Run(ctx, func(ctx context.Context) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
// on a loop sync the cache values to the db so that we have the them saved
|
||||
@ -125,7 +125,7 @@ func (service *CacheService) Init(ctx context.Context) (err error) {
|
||||
|
||||
// Close closes the loop
|
||||
func (service *CacheService) Close() (err error) {
|
||||
service.loop.Close()
|
||||
service.Loop.Close()
|
||||
return nil
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user