From f5ba8b8009b37ce89f27590f68c64830ac113b85 Mon Sep 17 00:00:00 2001 From: Qweder93 Date: Wed, 21 Oct 2020 15:34:15 +0300 Subject: [PATCH] storagenode/suspensions: added offline-suspension notificatio chore + tests Change-Id: I2521cd2e7d08a1dd379e717a554a026c7508c18f --- storagenode/nodestats/cache.go | 34 +++++++------- storagenode/notifications/service.go | 29 ------------ storagenode/peer.go | 13 +++++- storagenode/reputation/service.go | 67 ++++++++++++++++++++++++++++ storagenode/version/chore.go | 33 ++++++++++++-- 5 files changed, 126 insertions(+), 50 deletions(-) create mode 100644 storagenode/reputation/service.go diff --git a/storagenode/nodestats/cache.go b/storagenode/nodestats/cache.go index d3d4731ac..2240e7c87 100644 --- a/storagenode/nodestats/cache.go +++ b/storagenode/nodestats/cache.go @@ -46,11 +46,11 @@ type CacheStorage struct { type Cache struct { log *zap.Logger - db CacheStorage - service *Service - payoutEndpoint *payout.Endpoint - payoutService *payout.Service - trust *trust.Pool + db CacheStorage + service *Service + payoutEndpoint *payout.Endpoint + reputationService *reputation.Service + trust *trust.Pool maxSleep time.Duration Reputation *sync2.Cycle @@ -58,17 +58,17 @@ type Cache struct { } // NewCache creates new caching service instance. -func NewCache(log *zap.Logger, config Config, db CacheStorage, service *Service, heldamountEndpoint *payout.Endpoint, heldamountService *payout.Service, trust *trust.Pool) *Cache { +func NewCache(log *zap.Logger, config Config, db CacheStorage, service *Service, payoutEndpoint *payout.Endpoint, reputationService *reputation.Service, trust *trust.Pool) *Cache { return &Cache{ - log: log, - db: db, - service: service, - payoutEndpoint: heldamountEndpoint, - payoutService: heldamountService, - trust: trust, - maxSleep: config.MaxSleep, - Reputation: sync2.NewCycle(config.ReputationSync), - Storage: sync2.NewCycle(config.StorageSync), + log: log, + db: db, + service: service, + payoutEndpoint: payoutEndpoint, + reputationService: reputationService, + trust: trust, + maxSleep: config.MaxSleep, + Reputation: sync2.NewCycle(config.ReputationSync), + Storage: sync2.NewCycle(config.StorageSync), } } @@ -155,8 +155,8 @@ func (cache *Cache) CacheReputationStats(ctx context.Context) (err error) { return err } - if err = cache.db.Reputation.Store(ctx, *stats); err != nil { - cache.log.Error("err", zap.Error(err)) + if err = cache.reputationService.Store(ctx, *stats, satellite); err != nil { + cache.log.Error("failed to store reputation", zap.Error(err)) return err } diff --git a/storagenode/notifications/service.go b/storagenode/notifications/service.go index dd906d6ca..63fbe0991 100644 --- a/storagenode/notifications/service.go +++ b/storagenode/notifications/service.go @@ -9,9 +9,7 @@ import ( "github.com/spacemonkeygo/monkit/v3" "go.uber.org/zap" - "storj.io/common/storj" "storj.io/common/uuid" - "storj.io/private/version" ) var ( @@ -108,30 +106,3 @@ func (service *Service) UnreadAmount(ctx context.Context) (_ int, err error) { return amount, nil } - -// NewVersionNotification - returns version update required notification. -func NewVersionNotification(timesSent TimesNotified, suggestedVersion version.SemVer, senderID storj.NodeID) (_ NewNotification) { - switch timesSent { - case TimesNotifiedZero: - return NewNotification{ - SenderID: senderID, - Type: TypeCustom, - Title: "Please update your Node to Version " + suggestedVersion.String(), - Message: "It's time to update your Node's software, new version is available.", - } - case TimesNotifiedFirst: - return NewNotification{ - SenderID: senderID, - Type: TypeCustom, - Title: "Please update your Node to Version " + suggestedVersion.String(), - Message: "It's time to update your Node's software, you are running outdated version!", - } - default: - return NewNotification{ - SenderID: senderID, - Type: TypeCustom, - Title: "Please update your Node to Version " + suggestedVersion.String(), - Message: "Last chance to update your software! Your node is running outdated version!", - } - } -} diff --git a/storagenode/peer.go b/storagenode/peer.go index 3e35e3483..697c0a2b0 100644 --- a/storagenode/peer.go +++ b/storagenode/peer.go @@ -276,6 +276,8 @@ type Peer struct { } Bandwidth *bandwidth.Service + + Reputation *reputation.Service } // New creates a new Storage Node. @@ -566,6 +568,15 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten ) } + { // setup reputation service. + peer.Reputation = reputation.NewService( + peer.Log.Named("reputation:service"), + peer.DB.Reputation(), + peer.Identity.ID, + peer.Notifications.Service, + ) + } + { // setup node stats service peer.NodeStats.Service = nodestats.NewService( peer.Log.Named("nodestats:service"), @@ -585,7 +596,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten }, peer.NodeStats.Service, peer.Payout.Endpoint, - peer.Payout.Service, + peer.Reputation, peer.Storage2.Trust, ) peer.Services.Add(lifecycle.Item{ diff --git a/storagenode/reputation/service.go b/storagenode/reputation/service.go new file mode 100644 index 000000000..3f179e9fa --- /dev/null +++ b/storagenode/reputation/service.go @@ -0,0 +1,67 @@ +// Copyright (C) 2020 Storj Labs, Inc. +// See LICENSE for copying information. + +package reputation + +import ( + "context" + + "go.uber.org/zap" + + "storj.io/common/storj" + "storj.io/storj/storagenode/notifications" +) + +// Service is the reputation service. +// +// architecture: Service +type Service struct { + log *zap.Logger + + db DB + nodeID storj.NodeID + notifications *notifications.Service +} + +// NewService creates new instance of service. +func NewService(log *zap.Logger, db DB, nodeID storj.NodeID, notifications *notifications.Service) *Service { + return &Service{ + log: log, + db: db, + nodeID: nodeID, + notifications: notifications, + } +} + +// Store stores reputation stats into db, and notify's in case of offline suspension. +func (s *Service) Store(ctx context.Context, stats Stats, satelliteID storj.NodeID) error { + if err := s.db.Store(ctx, stats); err != nil { + return err + } + + if stats.DisqualifiedAt == nil && stats.OfflineSuspendedAt != nil { + s.notifyOfflineSuspension(ctx, satelliteID) + } + + return nil +} + +// NotifyOfflineSuspension notifies storagenode about offline suspension. +func (s *Service) notifyOfflineSuspension(ctx context.Context, satelliteID storj.NodeID) { + notification := NewSuspensionNotification(satelliteID, s.nodeID) + + _, err := s.notifications.Receive(ctx, notification) + if err != nil { + s.log.Sugar().Errorf("Failed to receive notification", err.Error()) + } +} + +// NewSuspensionNotification - returns offline suspension notification. +func NewSuspensionNotification(satelliteID storj.NodeID, senderID storj.NodeID) (_ notifications.NewNotification) { + return notifications.NewNotification{ + SenderID: senderID, + Type: notifications.TypeCustom, + Title: "Your Node was suspended!", + Message: "This is a reminder that your Storage Node on " + satelliteID.String() + "Satellite is suspended", + } +} diff --git a/storagenode/version/chore.go b/storagenode/version/chore.go index 9f1773bca..10668c391 100644 --- a/storagenode/version/chore.go +++ b/storagenode/version/chore.go @@ -80,15 +80,15 @@ func (chore *Chore) Run(ctx context.Context) (err error) { now := chore.nowFn() switch { case chore.version.FirstTimeSpotted.Add(time.Hour*336).Before(now) && chore.version.TimesNotified == notifications.TimesNotifiedSecond: - notification = notifications.NewVersionNotification(notifications.TimesNotifiedSecond, suggested, chore.nodeID) + notification = NewVersionNotification(notifications.TimesNotifiedSecond, suggested, chore.nodeID) chore.version.TimesNotified = notifications.TimesNotifiedLast case chore.version.FirstTimeSpotted.Add(time.Hour*144).Before(now) && chore.version.TimesNotified == notifications.TimesNotifiedFirst: - notification = notifications.NewVersionNotification(notifications.TimesNotifiedFirst, suggested, chore.nodeID) + notification = NewVersionNotification(notifications.TimesNotifiedFirst, suggested, chore.nodeID) chore.version.TimesNotified = notifications.TimesNotifiedSecond case chore.version.FirstTimeSpotted.Add(time.Hour*96).Before(now) && chore.version.TimesNotified == notifications.TimesNotifiedZero: - notification = notifications.NewVersionNotification(notifications.TimesNotifiedZero, suggested, chore.nodeID) + notification = NewVersionNotification(notifications.TimesNotifiedZero, suggested, chore.nodeID) chore.version.TimesNotified = notifications.TimesNotifiedFirst default: return nil @@ -161,3 +161,30 @@ func (chore *Chore) TestSetNow(now func() time.Time) { func (chore *Chore) TestCheckVersion() (relevance Relevance) { return chore.version } + +// NewVersionNotification - returns version update required notification. +func NewVersionNotification(timesSent notifications.TimesNotified, suggestedVersion version.SemVer, senderID storj.NodeID) (_ notifications.NewNotification) { + switch timesSent { + case notifications.TimesNotifiedZero: + return notifications.NewNotification{ + SenderID: senderID, + Type: notifications.TypeCustom, + Title: "Please update your Node to Version " + suggestedVersion.String(), + Message: "It's time to update your Node's software, new version is available.", + } + case notifications.TimesNotifiedFirst: + return notifications.NewNotification{ + SenderID: senderID, + Type: notifications.TypeCustom, + Title: "Please update your Node to Version " + suggestedVersion.String(), + Message: "It's time to update your Node's software, you are running outdated version!", + } + default: + return notifications.NewNotification{ + SenderID: senderID, + Type: notifications.TypeCustom, + Title: "Please update your Node to Version " + suggestedVersion.String(), + Message: "Last chance to update your software! Your node is running outdated version!", + } + } +}