storagenode/suspension: fixed timings of downtime suspension notifications

Change-Id: Ie5b4f369efc0e66910b2f939aa8c5b11dc92946c
This commit is contained in:
Qweder93 2021-01-29 16:36:16 +02:00
parent e86041a5cf
commit dc6a8cbcf3
3 changed files with 234 additions and 97 deletions

View File

@ -7,7 +7,7 @@ import (
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"storj.io/common/identity/testidentity"
"storj.io/common/storj"
@ -51,43 +51,43 @@ func TestNotificationsDB(t *testing.T) {
}
notificationFromDB0, err := notificationsdb.Insert(ctx, expectedNotification0)
assert.NoError(t, err)
assert.Equal(t, expectedNotification0.SenderID, notificationFromDB0.SenderID)
assert.Equal(t, expectedNotification0.Type, notificationFromDB0.Type)
assert.Equal(t, expectedNotification0.Title, notificationFromDB0.Title)
assert.Equal(t, expectedNotification0.Message, notificationFromDB0.Message)
require.NoError(t, err)
require.Equal(t, expectedNotification0.SenderID, notificationFromDB0.SenderID)
require.Equal(t, expectedNotification0.Type, notificationFromDB0.Type)
require.Equal(t, expectedNotification0.Title, notificationFromDB0.Title)
require.Equal(t, expectedNotification0.Message, notificationFromDB0.Message)
// Ensure that every insert gets a different "created at" time.
waitForTimeToChange()
notificationFromDB1, err := notificationsdb.Insert(ctx, expectedNotification1)
assert.NoError(t, err)
assert.Equal(t, expectedNotification1.SenderID, notificationFromDB1.SenderID)
assert.Equal(t, expectedNotification1.Type, notificationFromDB1.Type)
assert.Equal(t, expectedNotification1.Title, notificationFromDB1.Title)
assert.Equal(t, expectedNotification1.Message, notificationFromDB1.Message)
require.NoError(t, err)
require.Equal(t, expectedNotification1.SenderID, notificationFromDB1.SenderID)
require.Equal(t, expectedNotification1.Type, notificationFromDB1.Type)
require.Equal(t, expectedNotification1.Title, notificationFromDB1.Title)
require.Equal(t, expectedNotification1.Message, notificationFromDB1.Message)
waitForTimeToChange()
notificationFromDB2, err := notificationsdb.Insert(ctx, expectedNotification2)
assert.NoError(t, err)
assert.Equal(t, expectedNotification2.SenderID, notificationFromDB2.SenderID)
assert.Equal(t, expectedNotification2.Type, notificationFromDB2.Type)
assert.Equal(t, expectedNotification2.Title, notificationFromDB2.Title)
assert.Equal(t, expectedNotification2.Message, notificationFromDB2.Message)
require.NoError(t, err)
require.Equal(t, expectedNotification2.SenderID, notificationFromDB2.SenderID)
require.Equal(t, expectedNotification2.Type, notificationFromDB2.Type)
require.Equal(t, expectedNotification2.Title, notificationFromDB2.Title)
require.Equal(t, expectedNotification2.Message, notificationFromDB2.Message)
page := notifications.Page{}
// test List method to return right form of page depending on cursor.
t.Run("test paged list", func(t *testing.T) {
page, err = notificationsdb.List(ctx, notificationCursor)
assert.NoError(t, err)
assert.Equal(t, 2, len(page.Notifications))
assert.Equal(t, notificationFromDB1, page.Notifications[1])
assert.Equal(t, notificationFromDB2, page.Notifications[0])
assert.Equal(t, notificationCursor.Limit, page.Limit)
assert.Equal(t, uint64(0), page.Offset)
assert.Equal(t, uint(2), page.PageCount)
assert.Equal(t, uint64(3), page.TotalCount)
assert.Equal(t, uint(1), page.CurrentPage)
require.NoError(t, err)
require.Equal(t, 2, len(page.Notifications))
require.Equal(t, notificationFromDB1, page.Notifications[1])
require.Equal(t, notificationFromDB2, page.Notifications[0])
require.Equal(t, notificationCursor.Limit, page.Limit)
require.Equal(t, uint64(0), page.Offset)
require.Equal(t, uint(2), page.PageCount)
require.Equal(t, uint64(3), page.TotalCount)
require.Equal(t, uint(1), page.CurrentPage)
})
notificationCursor = notifications.Cursor{
@ -98,32 +98,32 @@ func TestNotificationsDB(t *testing.T) {
// test Read method to make specific notification's status as read.
t.Run("test notification read", func(t *testing.T) {
err = notificationsdb.Read(ctx, notificationFromDB0.ID)
assert.NoError(t, err)
require.NoError(t, err)
page, err = notificationsdb.List(ctx, notificationCursor)
assert.NoError(t, err)
assert.NotEqual(t, page.Notifications[2].ReadAt, (*time.Time)(nil))
require.NoError(t, err)
require.NotEqual(t, page.Notifications[2].ReadAt, (*time.Time)(nil))
err = notificationsdb.Read(ctx, notificationFromDB1.ID)
assert.NoError(t, err)
require.NoError(t, err)
page, err = notificationsdb.List(ctx, notificationCursor)
assert.NoError(t, err)
assert.NotEqual(t, page.Notifications[1].ReadAt, (*time.Time)(nil))
require.NoError(t, err)
require.NotEqual(t, page.Notifications[1].ReadAt, (*time.Time)(nil))
assert.Equal(t, page.Notifications[0].ReadAt, (*time.Time)(nil))
require.Equal(t, page.Notifications[0].ReadAt, (*time.Time)(nil))
})
// test ReadAll method to make all notifications' status as read.
t.Run("test notification read all", func(t *testing.T) {
err = notificationsdb.ReadAll(ctx)
assert.NoError(t, err)
require.NoError(t, err)
page, err = notificationsdb.List(ctx, notificationCursor)
assert.NoError(t, err)
assert.NotEqual(t, page.Notifications[2].ReadAt, (*time.Time)(nil))
assert.NotEqual(t, page.Notifications[1].ReadAt, (*time.Time)(nil))
assert.NotEqual(t, page.Notifications[0].ReadAt, (*time.Time)(nil))
require.NoError(t, err)
require.NotEqual(t, page.Notifications[2].ReadAt, (*time.Time)(nil))
require.NotEqual(t, page.Notifications[1].ReadAt, (*time.Time)(nil))
require.NotEqual(t, page.Notifications[0].ReadAt, (*time.Time)(nil))
})
})
}
@ -140,25 +140,25 @@ func TestEmptyNotificationsDB(t *testing.T) {
// test List method to return right form of page depending on cursor with empty database.
t.Run("test empty paged list", func(t *testing.T) {
page, err := notificationsdb.List(ctx, notificationCursor)
assert.NoError(t, err)
assert.Equal(t, len(page.Notifications), 0)
assert.Equal(t, page.Limit, notificationCursor.Limit)
assert.Equal(t, page.Offset, uint64(0))
assert.Equal(t, page.PageCount, uint(0))
assert.Equal(t, page.TotalCount, uint64(0))
assert.Equal(t, page.CurrentPage, uint(0))
require.NoError(t, err)
require.Equal(t, len(page.Notifications), 0)
require.Equal(t, page.Limit, notificationCursor.Limit)
require.Equal(t, page.Offset, uint64(0))
require.Equal(t, page.PageCount, uint(0))
require.Equal(t, page.TotalCount, uint64(0))
require.Equal(t, page.CurrentPage, uint(0))
})
// test notification read with not existing id.
t.Run("test notification read with not existing id", func(t *testing.T) {
err := notificationsdb.Read(ctx, testrand.UUID())
assert.Error(t, err, "no rows affected")
require.Error(t, err, "no rows affected")
})
// test read for all notifications if they don't exist.
t.Run("test notification readAll on empty page", func(t *testing.T) {
err := notificationsdb.ReadAll(ctx)
assert.NoError(t, err)
require.NoError(t, err)
})
})
}

View File

@ -7,13 +7,15 @@ import (
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
"storj.io/common/pb"
"storj.io/common/storj"
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/storj/storagenode"
"storj.io/storj/storagenode/notifications"
"storj.io/storj/storagenode/reputation"
"storj.io/storj/storagenode/storagenodedb/storagenodedbtest"
)
@ -46,22 +48,22 @@ func TestReputationDBGetInsert(t *testing.T) {
t.Run("insert", func(t *testing.T) {
err := reputationDB.Store(ctx, stats)
assert.NoError(t, err)
require.NoError(t, err)
})
t.Run("get", func(t *testing.T) {
res, err := reputationDB.Get(ctx, stats.SatelliteID)
assert.NoError(t, err)
require.NoError(t, err)
assert.Equal(t, res.SatelliteID, stats.SatelliteID)
assert.True(t, res.DisqualifiedAt.Equal(*stats.DisqualifiedAt))
assert.True(t, res.SuspendedAt.Equal(*stats.SuspendedAt))
assert.True(t, res.UpdatedAt.Equal(stats.UpdatedAt))
assert.True(t, res.JoinedAt.Equal(stats.JoinedAt))
assert.True(t, res.OfflineSuspendedAt.Equal(*stats.OfflineSuspendedAt))
assert.True(t, res.OfflineUnderReviewAt.Equal(*stats.OfflineUnderReviewAt))
assert.Equal(t, res.OnlineScore, stats.OnlineScore)
assert.Nil(t, res.AuditHistory)
require.Equal(t, res.SatelliteID, stats.SatelliteID)
require.True(t, res.DisqualifiedAt.Equal(*stats.DisqualifiedAt))
require.True(t, res.SuspendedAt.Equal(*stats.SuspendedAt))
require.True(t, res.UpdatedAt.Equal(stats.UpdatedAt))
require.True(t, res.JoinedAt.Equal(stats.JoinedAt))
require.True(t, res.OfflineSuspendedAt.Equal(*stats.OfflineSuspendedAt))
require.True(t, res.OfflineUnderReviewAt.Equal(*stats.OfflineUnderReviewAt))
require.Equal(t, res.OnlineScore, stats.OnlineScore)
require.Nil(t, res.AuditHistory)
compareReputationMetric(t, &res.Audit, &stats.Audit)
})
@ -105,22 +107,22 @@ func TestReputationDBGetAll(t *testing.T) {
}
res, err := reputationDB.All(ctx)
assert.NoError(t, err)
assert.NotNil(t, res)
assert.Equal(t, len(stats), len(res))
require.NoError(t, err)
require.NotNil(t, res)
require.Equal(t, len(stats), len(res))
for _, rep := range res {
assert.Contains(t, stats, rep)
require.Contains(t, stats, rep)
if rep.SatelliteID == stats[0].SatelliteID {
assert.Equal(t, rep.DisqualifiedAt, stats[0].DisqualifiedAt)
assert.Equal(t, rep.SuspendedAt, stats[0].SuspendedAt)
assert.Equal(t, rep.UpdatedAt, stats[0].UpdatedAt)
assert.Equal(t, rep.JoinedAt, stats[0].JoinedAt)
assert.Equal(t, rep.OfflineSuspendedAt, stats[0].OfflineSuspendedAt)
assert.Equal(t, rep.OfflineUnderReviewAt, stats[0].OfflineUnderReviewAt)
assert.Equal(t, rep.OnlineScore, stats[0].OnlineScore)
assert.Nil(t, rep.AuditHistory)
require.Equal(t, rep.DisqualifiedAt, stats[0].DisqualifiedAt)
require.Equal(t, rep.SuspendedAt, stats[0].SuspendedAt)
require.Equal(t, rep.UpdatedAt, stats[0].UpdatedAt)
require.Equal(t, rep.JoinedAt, stats[0].JoinedAt)
require.Equal(t, rep.OfflineSuspendedAt, stats[0].OfflineSuspendedAt)
require.Equal(t, rep.OfflineUnderReviewAt, stats[0].OfflineUnderReviewAt)
require.Equal(t, rep.OnlineScore, stats[0].OnlineScore)
require.Nil(t, rep.AuditHistory)
compareReputationMetric(t, &rep.Audit, &stats[0].Audit)
}
@ -130,11 +132,11 @@ func TestReputationDBGetAll(t *testing.T) {
// compareReputationMetric compares two reputation metrics and asserts that they are equal.
func compareReputationMetric(t *testing.T, a, b *reputation.Metric) {
assert.Equal(t, a.SuccessCount, b.SuccessCount)
assert.Equal(t, a.TotalCount, b.TotalCount)
assert.Equal(t, a.Alpha, b.Alpha)
assert.Equal(t, a.Beta, b.Beta)
assert.Equal(t, a.Score, b.Score)
require.Equal(t, a.SuccessCount, b.SuccessCount)
require.Equal(t, a.TotalCount, b.TotalCount)
require.Equal(t, a.Alpha, b.Alpha)
require.Equal(t, a.Beta, b.Beta)
require.Equal(t, a.Score, b.Score)
}
func TestReputationDBGetInsertAuditHistory(t *testing.T) {
@ -159,20 +161,136 @@ func TestReputationDBGetInsertAuditHistory(t *testing.T) {
t.Run("insert", func(t *testing.T) {
err := reputationDB.Store(ctx, stats)
assert.NoError(t, err)
require.NoError(t, err)
})
t.Run("get", func(t *testing.T) {
res, err := reputationDB.Get(ctx, stats.SatelliteID)
assert.NoError(t, err)
require.NoError(t, err)
assert.Equal(t, res.AuditHistory.Score, stats.AuditHistory.Score)
assert.Equal(t, len(res.AuditHistory.Windows), len(stats.AuditHistory.Windows))
require.Equal(t, res.AuditHistory.Score, stats.AuditHistory.Score)
require.Equal(t, len(res.AuditHistory.Windows), len(stats.AuditHistory.Windows))
resWindow := res.AuditHistory.Windows[0]
statsWindow := stats.AuditHistory.Windows[0]
assert.True(t, resWindow.WindowStart.Equal(statsWindow.WindowStart))
assert.Equal(t, resWindow.TotalCount, statsWindow.TotalCount)
assert.Equal(t, resWindow.OnlineCount, statsWindow.OnlineCount)
require.True(t, resWindow.WindowStart.Equal(statsWindow.WindowStart))
require.Equal(t, resWindow.TotalCount, statsWindow.TotalCount)
require.Equal(t, resWindow.OnlineCount, statsWindow.OnlineCount)
})
})
}
func TestServiceStore(t *testing.T) {
storagenodedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db storagenode.DB) {
reputationDB := db.Reputation()
notificationsDB := db.Notifications()
log := zaptest.NewLogger(t)
notificationService := notifications.NewService(log, notificationsDB)
reputationService := reputation.NewService(log, reputationDB, storj.NodeID{}, notificationService)
id := testrand.NodeID()
now := time.Now().AddDate(0, 0, -2)
later := time.Now().AddDate(0, 0, -1)
stats := reputation.Stats{
SatelliteID: id,
}
err := reputationDB.Store(ctx, stats)
require.NoError(t, err)
statsNew := reputation.Stats{
SatelliteID: id,
OfflineSuspendedAt: &now,
}
err = reputationService.Store(ctx, statsNew, id)
require.NoError(t, err)
amount, err := notificationsDB.UnreadAmount(ctx)
require.NoError(t, err)
require.Equal(t, amount, 1)
statsNew = reputation.Stats{
SatelliteID: id,
OfflineSuspendedAt: &later,
}
err = reputationService.Store(ctx, statsNew, id)
require.NoError(t, err)
amount, err = notificationsDB.UnreadAmount(ctx)
require.NoError(t, err)
require.Equal(t, amount, 2)
statsNew = reputation.Stats{
SatelliteID: id,
OfflineSuspendedAt: &later,
DisqualifiedAt: &later,
}
err = reputationService.Store(ctx, statsNew, id)
require.NoError(t, err)
amount, err = notificationsDB.UnreadAmount(ctx)
require.NoError(t, err)
require.Equal(t, amount, 2)
statsNew = reputation.Stats{
SatelliteID: id,
OfflineSuspendedAt: &now,
DisqualifiedAt: &later,
}
err = reputationService.Store(ctx, statsNew, id)
require.NoError(t, err)
amount, err = notificationsDB.UnreadAmount(ctx)
require.NoError(t, err)
require.Equal(t, amount, 2)
statsNew = reputation.Stats{
SatelliteID: id,
OfflineSuspendedAt: &later,
DisqualifiedAt: nil,
}
err = reputationService.Store(ctx, statsNew, id)
require.NoError(t, err)
amount, err = notificationsDB.UnreadAmount(ctx)
require.NoError(t, err)
require.Equal(t, amount, 3)
later = later.AddDate(0, 1, 0)
statsNew = reputation.Stats{
SatelliteID: id,
OfflineSuspendedAt: &later,
}
err = reputationService.Store(ctx, statsNew, id)
require.NoError(t, err)
amount, err = notificationsDB.UnreadAmount(ctx)
require.NoError(t, err)
require.Equal(t, amount, 4)
statsNew = reputation.Stats{
SatelliteID: id,
OfflineSuspendedAt: &later,
}
err = reputationService.Store(ctx, statsNew, id)
require.NoError(t, err)
amount, err = notificationsDB.UnreadAmount(ctx)
require.NoError(t, err)
require.Equal(t, amount, 4)
id2 := testrand.NodeID()
statsNew = reputation.Stats{
SatelliteID: id2,
OfflineSuspendedAt: &later,
}
err = reputationService.Store(ctx, statsNew, id2)
require.NoError(t, err)
amount, err = notificationsDB.UnreadAmount(ctx)
require.NoError(t, err)
require.Equal(t, amount, 5)
})
}

View File

@ -5,6 +5,7 @@ package reputation
import (
"context"
"time"
"go.uber.org/zap"
@ -35,33 +36,51 @@ func NewService(log *zap.Logger, db DB, nodeID storj.NodeID, notifications *noti
// Store stores reputation stats into db, and notify's in case of offline suspension.
func (s *Service) Store(ctx context.Context, stats Stats, satelliteID storj.NodeID) error {
if err := s.db.Store(ctx, stats); err != nil {
rep, err := s.db.Get(ctx, satelliteID)
if err != nil {
return err
}
if stats.DisqualifiedAt == nil && stats.OfflineSuspendedAt != nil {
s.notifyOfflineSuspension(ctx, satelliteID)
err = s.db.Store(ctx, stats)
if err != nil {
return err
}
if stats.DisqualifiedAt == nil && isSuspended(stats, *rep) {
notification := newSuspensionNotification(satelliteID, s.nodeID, *stats.OfflineSuspendedAt)
_, err = s.notifications.Receive(ctx, notification)
if err != nil {
s.log.Sugar().Errorf("Failed to receive notification", err.Error())
}
}
return nil
}
// NotifyOfflineSuspension notifies storagenode about offline suspension.
func (s *Service) notifyOfflineSuspension(ctx context.Context, satelliteID storj.NodeID) {
notification := NewSuspensionNotification(satelliteID, s.nodeID)
_, err := s.notifications.Receive(ctx, notification)
if err != nil {
s.log.Sugar().Errorf("Failed to receive notification", err.Error())
// isSuspended returns if there's new downtime suspension.
func isSuspended(new, old Stats) bool {
if new.OfflineSuspendedAt == nil {
return false
}
if old.OfflineSuspendedAt == nil {
return true
}
if !old.OfflineSuspendedAt.Equal(*new.OfflineSuspendedAt) {
return true
}
return false
}
// NewSuspensionNotification - returns offline suspension notification.
func NewSuspensionNotification(satelliteID storj.NodeID, senderID storj.NodeID) (_ notifications.NewNotification) {
// newSuspensionNotification - returns offline suspension notification.
func newSuspensionNotification(satelliteID storj.NodeID, senderID storj.NodeID, time time.Time) (_ notifications.NewNotification) {
return notifications.NewNotification{
SenderID: senderID,
Type: notifications.TypeCustom,
Title: "Your Node was suspended!",
Message: "This is a reminder that your Storage Node on " + satelliteID.String() + "Satellite is suspended",
Type: notifications.TypeSuspension,
Title: "Your Node was suspended " + time.String(),
Message: "This is a reminder that your StorageNode on " + satelliteID.String() + "Satellite is suspended",
}
}