storagenode/suspensions: added offline-suspension notificatio chore + tests
Change-Id: I2521cd2e7d08a1dd379e717a554a026c7508c18f
This commit is contained in:
parent
e0dca4042d
commit
f5ba8b8009
@ -46,11 +46,11 @@ type CacheStorage struct {
|
||||
type Cache struct {
|
||||
log *zap.Logger
|
||||
|
||||
db CacheStorage
|
||||
service *Service
|
||||
payoutEndpoint *payout.Endpoint
|
||||
payoutService *payout.Service
|
||||
trust *trust.Pool
|
||||
db CacheStorage
|
||||
service *Service
|
||||
payoutEndpoint *payout.Endpoint
|
||||
reputationService *reputation.Service
|
||||
trust *trust.Pool
|
||||
|
||||
maxSleep time.Duration
|
||||
Reputation *sync2.Cycle
|
||||
@ -58,17 +58,17 @@ type Cache struct {
|
||||
}
|
||||
|
||||
// NewCache creates new caching service instance.
|
||||
func NewCache(log *zap.Logger, config Config, db CacheStorage, service *Service, heldamountEndpoint *payout.Endpoint, heldamountService *payout.Service, trust *trust.Pool) *Cache {
|
||||
func NewCache(log *zap.Logger, config Config, db CacheStorage, service *Service, payoutEndpoint *payout.Endpoint, reputationService *reputation.Service, trust *trust.Pool) *Cache {
|
||||
return &Cache{
|
||||
log: log,
|
||||
db: db,
|
||||
service: service,
|
||||
payoutEndpoint: heldamountEndpoint,
|
||||
payoutService: heldamountService,
|
||||
trust: trust,
|
||||
maxSleep: config.MaxSleep,
|
||||
Reputation: sync2.NewCycle(config.ReputationSync),
|
||||
Storage: sync2.NewCycle(config.StorageSync),
|
||||
log: log,
|
||||
db: db,
|
||||
service: service,
|
||||
payoutEndpoint: payoutEndpoint,
|
||||
reputationService: reputationService,
|
||||
trust: trust,
|
||||
maxSleep: config.MaxSleep,
|
||||
Reputation: sync2.NewCycle(config.ReputationSync),
|
||||
Storage: sync2.NewCycle(config.StorageSync),
|
||||
}
|
||||
}
|
||||
|
||||
@ -155,8 +155,8 @@ func (cache *Cache) CacheReputationStats(ctx context.Context) (err error) {
|
||||
return err
|
||||
}
|
||||
|
||||
if err = cache.db.Reputation.Store(ctx, *stats); err != nil {
|
||||
cache.log.Error("err", zap.Error(err))
|
||||
if err = cache.reputationService.Store(ctx, *stats, satellite); err != nil {
|
||||
cache.log.Error("failed to store reputation", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -9,9 +9,7 @@ import (
|
||||
"github.com/spacemonkeygo/monkit/v3"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"storj.io/common/storj"
|
||||
"storj.io/common/uuid"
|
||||
"storj.io/private/version"
|
||||
)
|
||||
|
||||
var (
|
||||
@ -108,30 +106,3 @@ func (service *Service) UnreadAmount(ctx context.Context) (_ int, err error) {
|
||||
|
||||
return amount, nil
|
||||
}
|
||||
|
||||
// NewVersionNotification - returns version update required notification.
|
||||
func NewVersionNotification(timesSent TimesNotified, suggestedVersion version.SemVer, senderID storj.NodeID) (_ NewNotification) {
|
||||
switch timesSent {
|
||||
case TimesNotifiedZero:
|
||||
return NewNotification{
|
||||
SenderID: senderID,
|
||||
Type: TypeCustom,
|
||||
Title: "Please update your Node to Version " + suggestedVersion.String(),
|
||||
Message: "It's time to update your Node's software, new version is available.",
|
||||
}
|
||||
case TimesNotifiedFirst:
|
||||
return NewNotification{
|
||||
SenderID: senderID,
|
||||
Type: TypeCustom,
|
||||
Title: "Please update your Node to Version " + suggestedVersion.String(),
|
||||
Message: "It's time to update your Node's software, you are running outdated version!",
|
||||
}
|
||||
default:
|
||||
return NewNotification{
|
||||
SenderID: senderID,
|
||||
Type: TypeCustom,
|
||||
Title: "Please update your Node to Version " + suggestedVersion.String(),
|
||||
Message: "Last chance to update your software! Your node is running outdated version!",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -276,6 +276,8 @@ type Peer struct {
|
||||
}
|
||||
|
||||
Bandwidth *bandwidth.Service
|
||||
|
||||
Reputation *reputation.Service
|
||||
}
|
||||
|
||||
// New creates a new Storage Node.
|
||||
@ -566,6 +568,15 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten
|
||||
)
|
||||
}
|
||||
|
||||
{ // setup reputation service.
|
||||
peer.Reputation = reputation.NewService(
|
||||
peer.Log.Named("reputation:service"),
|
||||
peer.DB.Reputation(),
|
||||
peer.Identity.ID,
|
||||
peer.Notifications.Service,
|
||||
)
|
||||
}
|
||||
|
||||
{ // setup node stats service
|
||||
peer.NodeStats.Service = nodestats.NewService(
|
||||
peer.Log.Named("nodestats:service"),
|
||||
@ -585,7 +596,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten
|
||||
},
|
||||
peer.NodeStats.Service,
|
||||
peer.Payout.Endpoint,
|
||||
peer.Payout.Service,
|
||||
peer.Reputation,
|
||||
peer.Storage2.Trust,
|
||||
)
|
||||
peer.Services.Add(lifecycle.Item{
|
||||
|
67
storagenode/reputation/service.go
Normal file
67
storagenode/reputation/service.go
Normal file
@ -0,0 +1,67 @@
|
||||
// Copyright (C) 2020 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package reputation
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
||||
"storj.io/common/storj"
|
||||
"storj.io/storj/storagenode/notifications"
|
||||
)
|
||||
|
||||
// Service is the reputation service.
|
||||
//
|
||||
// architecture: Service
|
||||
type Service struct {
|
||||
log *zap.Logger
|
||||
|
||||
db DB
|
||||
nodeID storj.NodeID
|
||||
notifications *notifications.Service
|
||||
}
|
||||
|
||||
// NewService creates new instance of service.
|
||||
func NewService(log *zap.Logger, db DB, nodeID storj.NodeID, notifications *notifications.Service) *Service {
|
||||
return &Service{
|
||||
log: log,
|
||||
db: db,
|
||||
nodeID: nodeID,
|
||||
notifications: notifications,
|
||||
}
|
||||
}
|
||||
|
||||
// Store stores reputation stats into db, and notify's in case of offline suspension.
|
||||
func (s *Service) Store(ctx context.Context, stats Stats, satelliteID storj.NodeID) error {
|
||||
if err := s.db.Store(ctx, stats); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if stats.DisqualifiedAt == nil && stats.OfflineSuspendedAt != nil {
|
||||
s.notifyOfflineSuspension(ctx, satelliteID)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// NotifyOfflineSuspension notifies storagenode about offline suspension.
|
||||
func (s *Service) notifyOfflineSuspension(ctx context.Context, satelliteID storj.NodeID) {
|
||||
notification := NewSuspensionNotification(satelliteID, s.nodeID)
|
||||
|
||||
_, err := s.notifications.Receive(ctx, notification)
|
||||
if err != nil {
|
||||
s.log.Sugar().Errorf("Failed to receive notification", err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
// NewSuspensionNotification - returns offline suspension notification.
|
||||
func NewSuspensionNotification(satelliteID storj.NodeID, senderID storj.NodeID) (_ notifications.NewNotification) {
|
||||
return notifications.NewNotification{
|
||||
SenderID: senderID,
|
||||
Type: notifications.TypeCustom,
|
||||
Title: "Your Node was suspended!",
|
||||
Message: "This is a reminder that your Storage Node on " + satelliteID.String() + "Satellite is suspended",
|
||||
}
|
||||
}
|
@ -80,15 +80,15 @@ func (chore *Chore) Run(ctx context.Context) (err error) {
|
||||
now := chore.nowFn()
|
||||
switch {
|
||||
case chore.version.FirstTimeSpotted.Add(time.Hour*336).Before(now) && chore.version.TimesNotified == notifications.TimesNotifiedSecond:
|
||||
notification = notifications.NewVersionNotification(notifications.TimesNotifiedSecond, suggested, chore.nodeID)
|
||||
notification = NewVersionNotification(notifications.TimesNotifiedSecond, suggested, chore.nodeID)
|
||||
chore.version.TimesNotified = notifications.TimesNotifiedLast
|
||||
|
||||
case chore.version.FirstTimeSpotted.Add(time.Hour*144).Before(now) && chore.version.TimesNotified == notifications.TimesNotifiedFirst:
|
||||
notification = notifications.NewVersionNotification(notifications.TimesNotifiedFirst, suggested, chore.nodeID)
|
||||
notification = NewVersionNotification(notifications.TimesNotifiedFirst, suggested, chore.nodeID)
|
||||
chore.version.TimesNotified = notifications.TimesNotifiedSecond
|
||||
|
||||
case chore.version.FirstTimeSpotted.Add(time.Hour*96).Before(now) && chore.version.TimesNotified == notifications.TimesNotifiedZero:
|
||||
notification = notifications.NewVersionNotification(notifications.TimesNotifiedZero, suggested, chore.nodeID)
|
||||
notification = NewVersionNotification(notifications.TimesNotifiedZero, suggested, chore.nodeID)
|
||||
chore.version.TimesNotified = notifications.TimesNotifiedFirst
|
||||
default:
|
||||
return nil
|
||||
@ -161,3 +161,30 @@ func (chore *Chore) TestSetNow(now func() time.Time) {
|
||||
func (chore *Chore) TestCheckVersion() (relevance Relevance) {
|
||||
return chore.version
|
||||
}
|
||||
|
||||
// NewVersionNotification - returns version update required notification.
|
||||
func NewVersionNotification(timesSent notifications.TimesNotified, suggestedVersion version.SemVer, senderID storj.NodeID) (_ notifications.NewNotification) {
|
||||
switch timesSent {
|
||||
case notifications.TimesNotifiedZero:
|
||||
return notifications.NewNotification{
|
||||
SenderID: senderID,
|
||||
Type: notifications.TypeCustom,
|
||||
Title: "Please update your Node to Version " + suggestedVersion.String(),
|
||||
Message: "It's time to update your Node's software, new version is available.",
|
||||
}
|
||||
case notifications.TimesNotifiedFirst:
|
||||
return notifications.NewNotification{
|
||||
SenderID: senderID,
|
||||
Type: notifications.TypeCustom,
|
||||
Title: "Please update your Node to Version " + suggestedVersion.String(),
|
||||
Message: "It's time to update your Node's software, you are running outdated version!",
|
||||
}
|
||||
default:
|
||||
return notifications.NewNotification{
|
||||
SenderID: senderID,
|
||||
Type: notifications.TypeCustom,
|
||||
Title: "Please update your Node to Version " + suggestedVersion.String(),
|
||||
Message: "Last chance to update your software! Your node is running outdated version!",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user