From dc6a8cbcf337ba70a0038d675aed7a63d2cc30ac Mon Sep 17 00:00:00 2001 From: Qweder93 Date: Fri, 29 Jan 2021 16:36:16 +0200 Subject: [PATCH] storagenode/suspension: fixed timings of downtime suspension notifications Change-Id: Ie5b4f369efc0e66910b2f939aa8c5b11dc92946c --- .../notifications/notifications_test.go | 92 ++++----- storagenode/reputation/reputation_test.go | 190 ++++++++++++++---- storagenode/reputation/service.go | 49 +++-- 3 files changed, 234 insertions(+), 97 deletions(-) diff --git a/storagenode/notifications/notifications_test.go b/storagenode/notifications/notifications_test.go index 2d7ef35f2..a6cf19292 100644 --- a/storagenode/notifications/notifications_test.go +++ b/storagenode/notifications/notifications_test.go @@ -7,7 +7,7 @@ import ( "testing" "time" - "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "storj.io/common/identity/testidentity" "storj.io/common/storj" @@ -51,43 +51,43 @@ func TestNotificationsDB(t *testing.T) { } notificationFromDB0, err := notificationsdb.Insert(ctx, expectedNotification0) - assert.NoError(t, err) - assert.Equal(t, expectedNotification0.SenderID, notificationFromDB0.SenderID) - assert.Equal(t, expectedNotification0.Type, notificationFromDB0.Type) - assert.Equal(t, expectedNotification0.Title, notificationFromDB0.Title) - assert.Equal(t, expectedNotification0.Message, notificationFromDB0.Message) + require.NoError(t, err) + require.Equal(t, expectedNotification0.SenderID, notificationFromDB0.SenderID) + require.Equal(t, expectedNotification0.Type, notificationFromDB0.Type) + require.Equal(t, expectedNotification0.Title, notificationFromDB0.Title) + require.Equal(t, expectedNotification0.Message, notificationFromDB0.Message) // Ensure that every insert gets a different "created at" time. waitForTimeToChange() notificationFromDB1, err := notificationsdb.Insert(ctx, expectedNotification1) - assert.NoError(t, err) - assert.Equal(t, expectedNotification1.SenderID, notificationFromDB1.SenderID) - assert.Equal(t, expectedNotification1.Type, notificationFromDB1.Type) - assert.Equal(t, expectedNotification1.Title, notificationFromDB1.Title) - assert.Equal(t, expectedNotification1.Message, notificationFromDB1.Message) + require.NoError(t, err) + require.Equal(t, expectedNotification1.SenderID, notificationFromDB1.SenderID) + require.Equal(t, expectedNotification1.Type, notificationFromDB1.Type) + require.Equal(t, expectedNotification1.Title, notificationFromDB1.Title) + require.Equal(t, expectedNotification1.Message, notificationFromDB1.Message) waitForTimeToChange() notificationFromDB2, err := notificationsdb.Insert(ctx, expectedNotification2) - assert.NoError(t, err) - assert.Equal(t, expectedNotification2.SenderID, notificationFromDB2.SenderID) - assert.Equal(t, expectedNotification2.Type, notificationFromDB2.Type) - assert.Equal(t, expectedNotification2.Title, notificationFromDB2.Title) - assert.Equal(t, expectedNotification2.Message, notificationFromDB2.Message) + require.NoError(t, err) + require.Equal(t, expectedNotification2.SenderID, notificationFromDB2.SenderID) + require.Equal(t, expectedNotification2.Type, notificationFromDB2.Type) + require.Equal(t, expectedNotification2.Title, notificationFromDB2.Title) + require.Equal(t, expectedNotification2.Message, notificationFromDB2.Message) page := notifications.Page{} // test List method to return right form of page depending on cursor. t.Run("test paged list", func(t *testing.T) { page, err = notificationsdb.List(ctx, notificationCursor) - assert.NoError(t, err) - assert.Equal(t, 2, len(page.Notifications)) - assert.Equal(t, notificationFromDB1, page.Notifications[1]) - assert.Equal(t, notificationFromDB2, page.Notifications[0]) - assert.Equal(t, notificationCursor.Limit, page.Limit) - assert.Equal(t, uint64(0), page.Offset) - assert.Equal(t, uint(2), page.PageCount) - assert.Equal(t, uint64(3), page.TotalCount) - assert.Equal(t, uint(1), page.CurrentPage) + require.NoError(t, err) + require.Equal(t, 2, len(page.Notifications)) + require.Equal(t, notificationFromDB1, page.Notifications[1]) + require.Equal(t, notificationFromDB2, page.Notifications[0]) + require.Equal(t, notificationCursor.Limit, page.Limit) + require.Equal(t, uint64(0), page.Offset) + require.Equal(t, uint(2), page.PageCount) + require.Equal(t, uint64(3), page.TotalCount) + require.Equal(t, uint(1), page.CurrentPage) }) notificationCursor = notifications.Cursor{ @@ -98,32 +98,32 @@ func TestNotificationsDB(t *testing.T) { // test Read method to make specific notification's status as read. t.Run("test notification read", func(t *testing.T) { err = notificationsdb.Read(ctx, notificationFromDB0.ID) - assert.NoError(t, err) + require.NoError(t, err) page, err = notificationsdb.List(ctx, notificationCursor) - assert.NoError(t, err) - assert.NotEqual(t, page.Notifications[2].ReadAt, (*time.Time)(nil)) + require.NoError(t, err) + require.NotEqual(t, page.Notifications[2].ReadAt, (*time.Time)(nil)) err = notificationsdb.Read(ctx, notificationFromDB1.ID) - assert.NoError(t, err) + require.NoError(t, err) page, err = notificationsdb.List(ctx, notificationCursor) - assert.NoError(t, err) - assert.NotEqual(t, page.Notifications[1].ReadAt, (*time.Time)(nil)) + require.NoError(t, err) + require.NotEqual(t, page.Notifications[1].ReadAt, (*time.Time)(nil)) - assert.Equal(t, page.Notifications[0].ReadAt, (*time.Time)(nil)) + require.Equal(t, page.Notifications[0].ReadAt, (*time.Time)(nil)) }) // test ReadAll method to make all notifications' status as read. t.Run("test notification read all", func(t *testing.T) { err = notificationsdb.ReadAll(ctx) - assert.NoError(t, err) + require.NoError(t, err) page, err = notificationsdb.List(ctx, notificationCursor) - assert.NoError(t, err) - assert.NotEqual(t, page.Notifications[2].ReadAt, (*time.Time)(nil)) - assert.NotEqual(t, page.Notifications[1].ReadAt, (*time.Time)(nil)) - assert.NotEqual(t, page.Notifications[0].ReadAt, (*time.Time)(nil)) + require.NoError(t, err) + require.NotEqual(t, page.Notifications[2].ReadAt, (*time.Time)(nil)) + require.NotEqual(t, page.Notifications[1].ReadAt, (*time.Time)(nil)) + require.NotEqual(t, page.Notifications[0].ReadAt, (*time.Time)(nil)) }) }) } @@ -140,25 +140,25 @@ func TestEmptyNotificationsDB(t *testing.T) { // test List method to return right form of page depending on cursor with empty database. t.Run("test empty paged list", func(t *testing.T) { page, err := notificationsdb.List(ctx, notificationCursor) - assert.NoError(t, err) - assert.Equal(t, len(page.Notifications), 0) - assert.Equal(t, page.Limit, notificationCursor.Limit) - assert.Equal(t, page.Offset, uint64(0)) - assert.Equal(t, page.PageCount, uint(0)) - assert.Equal(t, page.TotalCount, uint64(0)) - assert.Equal(t, page.CurrentPage, uint(0)) + require.NoError(t, err) + require.Equal(t, len(page.Notifications), 0) + require.Equal(t, page.Limit, notificationCursor.Limit) + require.Equal(t, page.Offset, uint64(0)) + require.Equal(t, page.PageCount, uint(0)) + require.Equal(t, page.TotalCount, uint64(0)) + require.Equal(t, page.CurrentPage, uint(0)) }) // test notification read with not existing id. t.Run("test notification read with not existing id", func(t *testing.T) { err := notificationsdb.Read(ctx, testrand.UUID()) - assert.Error(t, err, "no rows affected") + require.Error(t, err, "no rows affected") }) // test read for all notifications if they don't exist. t.Run("test notification readAll on empty page", func(t *testing.T) { err := notificationsdb.ReadAll(ctx) - assert.NoError(t, err) + require.NoError(t, err) }) }) } diff --git a/storagenode/reputation/reputation_test.go b/storagenode/reputation/reputation_test.go index 12a2baf89..0ff184aa0 100644 --- a/storagenode/reputation/reputation_test.go +++ b/storagenode/reputation/reputation_test.go @@ -7,13 +7,15 @@ import ( "testing" "time" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.uber.org/zap/zaptest" "storj.io/common/pb" + "storj.io/common/storj" "storj.io/common/testcontext" "storj.io/common/testrand" "storj.io/storj/storagenode" + "storj.io/storj/storagenode/notifications" "storj.io/storj/storagenode/reputation" "storj.io/storj/storagenode/storagenodedb/storagenodedbtest" ) @@ -46,22 +48,22 @@ func TestReputationDBGetInsert(t *testing.T) { t.Run("insert", func(t *testing.T) { err := reputationDB.Store(ctx, stats) - assert.NoError(t, err) + require.NoError(t, err) }) t.Run("get", func(t *testing.T) { res, err := reputationDB.Get(ctx, stats.SatelliteID) - assert.NoError(t, err) + require.NoError(t, err) - assert.Equal(t, res.SatelliteID, stats.SatelliteID) - assert.True(t, res.DisqualifiedAt.Equal(*stats.DisqualifiedAt)) - assert.True(t, res.SuspendedAt.Equal(*stats.SuspendedAt)) - assert.True(t, res.UpdatedAt.Equal(stats.UpdatedAt)) - assert.True(t, res.JoinedAt.Equal(stats.JoinedAt)) - assert.True(t, res.OfflineSuspendedAt.Equal(*stats.OfflineSuspendedAt)) - assert.True(t, res.OfflineUnderReviewAt.Equal(*stats.OfflineUnderReviewAt)) - assert.Equal(t, res.OnlineScore, stats.OnlineScore) - assert.Nil(t, res.AuditHistory) + require.Equal(t, res.SatelliteID, stats.SatelliteID) + require.True(t, res.DisqualifiedAt.Equal(*stats.DisqualifiedAt)) + require.True(t, res.SuspendedAt.Equal(*stats.SuspendedAt)) + require.True(t, res.UpdatedAt.Equal(stats.UpdatedAt)) + require.True(t, res.JoinedAt.Equal(stats.JoinedAt)) + require.True(t, res.OfflineSuspendedAt.Equal(*stats.OfflineSuspendedAt)) + require.True(t, res.OfflineUnderReviewAt.Equal(*stats.OfflineUnderReviewAt)) + require.Equal(t, res.OnlineScore, stats.OnlineScore) + require.Nil(t, res.AuditHistory) compareReputationMetric(t, &res.Audit, &stats.Audit) }) @@ -105,22 +107,22 @@ func TestReputationDBGetAll(t *testing.T) { } res, err := reputationDB.All(ctx) - assert.NoError(t, err) - assert.NotNil(t, res) - assert.Equal(t, len(stats), len(res)) + require.NoError(t, err) + require.NotNil(t, res) + require.Equal(t, len(stats), len(res)) for _, rep := range res { - assert.Contains(t, stats, rep) + require.Contains(t, stats, rep) if rep.SatelliteID == stats[0].SatelliteID { - assert.Equal(t, rep.DisqualifiedAt, stats[0].DisqualifiedAt) - assert.Equal(t, rep.SuspendedAt, stats[0].SuspendedAt) - assert.Equal(t, rep.UpdatedAt, stats[0].UpdatedAt) - assert.Equal(t, rep.JoinedAt, stats[0].JoinedAt) - assert.Equal(t, rep.OfflineSuspendedAt, stats[0].OfflineSuspendedAt) - assert.Equal(t, rep.OfflineUnderReviewAt, stats[0].OfflineUnderReviewAt) - assert.Equal(t, rep.OnlineScore, stats[0].OnlineScore) - assert.Nil(t, rep.AuditHistory) + require.Equal(t, rep.DisqualifiedAt, stats[0].DisqualifiedAt) + require.Equal(t, rep.SuspendedAt, stats[0].SuspendedAt) + require.Equal(t, rep.UpdatedAt, stats[0].UpdatedAt) + require.Equal(t, rep.JoinedAt, stats[0].JoinedAt) + require.Equal(t, rep.OfflineSuspendedAt, stats[0].OfflineSuspendedAt) + require.Equal(t, rep.OfflineUnderReviewAt, stats[0].OfflineUnderReviewAt) + require.Equal(t, rep.OnlineScore, stats[0].OnlineScore) + require.Nil(t, rep.AuditHistory) compareReputationMetric(t, &rep.Audit, &stats[0].Audit) } @@ -130,11 +132,11 @@ func TestReputationDBGetAll(t *testing.T) { // compareReputationMetric compares two reputation metrics and asserts that they are equal. func compareReputationMetric(t *testing.T, a, b *reputation.Metric) { - assert.Equal(t, a.SuccessCount, b.SuccessCount) - assert.Equal(t, a.TotalCount, b.TotalCount) - assert.Equal(t, a.Alpha, b.Alpha) - assert.Equal(t, a.Beta, b.Beta) - assert.Equal(t, a.Score, b.Score) + require.Equal(t, a.SuccessCount, b.SuccessCount) + require.Equal(t, a.TotalCount, b.TotalCount) + require.Equal(t, a.Alpha, b.Alpha) + require.Equal(t, a.Beta, b.Beta) + require.Equal(t, a.Score, b.Score) } func TestReputationDBGetInsertAuditHistory(t *testing.T) { @@ -159,20 +161,136 @@ func TestReputationDBGetInsertAuditHistory(t *testing.T) { t.Run("insert", func(t *testing.T) { err := reputationDB.Store(ctx, stats) - assert.NoError(t, err) + require.NoError(t, err) }) t.Run("get", func(t *testing.T) { res, err := reputationDB.Get(ctx, stats.SatelliteID) - assert.NoError(t, err) + require.NoError(t, err) - assert.Equal(t, res.AuditHistory.Score, stats.AuditHistory.Score) - assert.Equal(t, len(res.AuditHistory.Windows), len(stats.AuditHistory.Windows)) + require.Equal(t, res.AuditHistory.Score, stats.AuditHistory.Score) + require.Equal(t, len(res.AuditHistory.Windows), len(stats.AuditHistory.Windows)) resWindow := res.AuditHistory.Windows[0] statsWindow := stats.AuditHistory.Windows[0] - assert.True(t, resWindow.WindowStart.Equal(statsWindow.WindowStart)) - assert.Equal(t, resWindow.TotalCount, statsWindow.TotalCount) - assert.Equal(t, resWindow.OnlineCount, statsWindow.OnlineCount) + require.True(t, resWindow.WindowStart.Equal(statsWindow.WindowStart)) + require.Equal(t, resWindow.TotalCount, statsWindow.TotalCount) + require.Equal(t, resWindow.OnlineCount, statsWindow.OnlineCount) }) }) } + +func TestServiceStore(t *testing.T) { + storagenodedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db storagenode.DB) { + reputationDB := db.Reputation() + notificationsDB := db.Notifications() + log := zaptest.NewLogger(t) + notificationService := notifications.NewService(log, notificationsDB) + reputationService := reputation.NewService(log, reputationDB, storj.NodeID{}, notificationService) + + id := testrand.NodeID() + now := time.Now().AddDate(0, 0, -2) + later := time.Now().AddDate(0, 0, -1) + + stats := reputation.Stats{ + SatelliteID: id, + } + + err := reputationDB.Store(ctx, stats) + require.NoError(t, err) + + statsNew := reputation.Stats{ + SatelliteID: id, + OfflineSuspendedAt: &now, + } + + err = reputationService.Store(ctx, statsNew, id) + require.NoError(t, err) + amount, err := notificationsDB.UnreadAmount(ctx) + require.NoError(t, err) + require.Equal(t, amount, 1) + + statsNew = reputation.Stats{ + SatelliteID: id, + OfflineSuspendedAt: &later, + } + + err = reputationService.Store(ctx, statsNew, id) + require.NoError(t, err) + amount, err = notificationsDB.UnreadAmount(ctx) + require.NoError(t, err) + require.Equal(t, amount, 2) + + statsNew = reputation.Stats{ + SatelliteID: id, + OfflineSuspendedAt: &later, + DisqualifiedAt: &later, + } + + err = reputationService.Store(ctx, statsNew, id) + require.NoError(t, err) + amount, err = notificationsDB.UnreadAmount(ctx) + require.NoError(t, err) + require.Equal(t, amount, 2) + + statsNew = reputation.Stats{ + SatelliteID: id, + OfflineSuspendedAt: &now, + DisqualifiedAt: &later, + } + + err = reputationService.Store(ctx, statsNew, id) + require.NoError(t, err) + amount, err = notificationsDB.UnreadAmount(ctx) + require.NoError(t, err) + require.Equal(t, amount, 2) + + statsNew = reputation.Stats{ + SatelliteID: id, + OfflineSuspendedAt: &later, + DisqualifiedAt: nil, + } + + err = reputationService.Store(ctx, statsNew, id) + require.NoError(t, err) + amount, err = notificationsDB.UnreadAmount(ctx) + require.NoError(t, err) + require.Equal(t, amount, 3) + + later = later.AddDate(0, 1, 0) + + statsNew = reputation.Stats{ + SatelliteID: id, + OfflineSuspendedAt: &later, + } + + err = reputationService.Store(ctx, statsNew, id) + require.NoError(t, err) + amount, err = notificationsDB.UnreadAmount(ctx) + require.NoError(t, err) + require.Equal(t, amount, 4) + + statsNew = reputation.Stats{ + SatelliteID: id, + OfflineSuspendedAt: &later, + } + + err = reputationService.Store(ctx, statsNew, id) + require.NoError(t, err) + amount, err = notificationsDB.UnreadAmount(ctx) + require.NoError(t, err) + require.Equal(t, amount, 4) + + id2 := testrand.NodeID() + + statsNew = reputation.Stats{ + SatelliteID: id2, + OfflineSuspendedAt: &later, + } + + err = reputationService.Store(ctx, statsNew, id2) + require.NoError(t, err) + amount, err = notificationsDB.UnreadAmount(ctx) + require.NoError(t, err) + require.Equal(t, amount, 5) + }) +} diff --git a/storagenode/reputation/service.go b/storagenode/reputation/service.go index 3f179e9fa..d5b4c2fd4 100644 --- a/storagenode/reputation/service.go +++ b/storagenode/reputation/service.go @@ -5,6 +5,7 @@ package reputation import ( "context" + "time" "go.uber.org/zap" @@ -35,33 +36,51 @@ func NewService(log *zap.Logger, db DB, nodeID storj.NodeID, notifications *noti // Store stores reputation stats into db, and notify's in case of offline suspension. func (s *Service) Store(ctx context.Context, stats Stats, satelliteID storj.NodeID) error { - if err := s.db.Store(ctx, stats); err != nil { + rep, err := s.db.Get(ctx, satelliteID) + if err != nil { return err } - if stats.DisqualifiedAt == nil && stats.OfflineSuspendedAt != nil { - s.notifyOfflineSuspension(ctx, satelliteID) + err = s.db.Store(ctx, stats) + if err != nil { + return err + } + + if stats.DisqualifiedAt == nil && isSuspended(stats, *rep) { + notification := newSuspensionNotification(satelliteID, s.nodeID, *stats.OfflineSuspendedAt) + + _, err = s.notifications.Receive(ctx, notification) + if err != nil { + s.log.Sugar().Errorf("Failed to receive notification", err.Error()) + } } return nil } -// NotifyOfflineSuspension notifies storagenode about offline suspension. -func (s *Service) notifyOfflineSuspension(ctx context.Context, satelliteID storj.NodeID) { - notification := NewSuspensionNotification(satelliteID, s.nodeID) - - _, err := s.notifications.Receive(ctx, notification) - if err != nil { - s.log.Sugar().Errorf("Failed to receive notification", err.Error()) +// isSuspended returns if there's new downtime suspension. +func isSuspended(new, old Stats) bool { + if new.OfflineSuspendedAt == nil { + return false } + + if old.OfflineSuspendedAt == nil { + return true + } + + if !old.OfflineSuspendedAt.Equal(*new.OfflineSuspendedAt) { + return true + } + + return false } -// NewSuspensionNotification - returns offline suspension notification. -func NewSuspensionNotification(satelliteID storj.NodeID, senderID storj.NodeID) (_ notifications.NewNotification) { +// newSuspensionNotification - returns offline suspension notification. +func newSuspensionNotification(satelliteID storj.NodeID, senderID storj.NodeID, time time.Time) (_ notifications.NewNotification) { return notifications.NewNotification{ SenderID: senderID, - Type: notifications.TypeCustom, - Title: "Your Node was suspended!", - Message: "This is a reminder that your Storage Node on " + satelliteID.String() + "Satellite is suspended", + Type: notifications.TypeSuspension, + Title: "Your Node was suspended " + time.String(), + Message: "This is a reminder that your StorageNode on " + satelliteID.String() + "Satellite is suspended", } }