diff --git a/satellite/overlay/service.go b/satellite/overlay/service.go index a6b5a5edd..4a9eced2d 100644 --- a/satellite/overlay/service.go +++ b/satellite/overlay/service.go @@ -539,9 +539,19 @@ func (service *Service) Reliable(ctx context.Context) (nodes storj.NodeIDList, e } // UpdateReputation updates the DB columns for any of the reputation fields. -func (service *Service) UpdateReputation(ctx context.Context, id storj.NodeID, request ReputationUpdate) (err error) { +func (service *Service) UpdateReputation(ctx context.Context, id storj.NodeID, email string, request ReputationUpdate, reputationChanges []nodeevents.Type) (err error) { defer mon.Task()(&ctx)(&err) - return service.db.UpdateReputation(ctx, id, request) + + err = service.db.UpdateReputation(ctx, id, request) + if err != nil { + return err + } + + if service.config.SendNodeEmails { + service.insertReputationNodeEvents(ctx, email, id, reputationChanges) + } + + return nil } // UpdateNodeInfo updates node dossier with info requested from the node itself like node type, email, wallet, capacity, and version. @@ -765,3 +775,38 @@ func (service *Service) TestNodeCountryCode(ctx context.Context, nodeID storj.No return nil } + +func (service *Service) insertReputationNodeEvents(ctx context.Context, email string, id storj.NodeID, repEvents []nodeevents.Type) { + defer mon.Task()(&ctx)(nil) + + for _, event := range repEvents { + switch event { + case nodeevents.Disqualified: + _, err := service.nodeEvents.Insert(ctx, email, id, nodeevents.Disqualified) + if err != nil { + service.log.Error("could not insert node disqualified into node events", zap.Error(err)) + } + case nodeevents.UnknownAuditSuspended: + _, err := service.nodeEvents.Insert(ctx, email, id, nodeevents.UnknownAuditSuspended) + if err != nil { + service.log.Error("could not insert node unknown audit suspended into node events", zap.Error(err)) + } + case nodeevents.UnknownAuditUnsuspended: + _, err := service.nodeEvents.Insert(ctx, email, id, nodeevents.UnknownAuditUnsuspended) + if err != nil { + service.log.Error("could not insert node unknown audit unsuspended into node events", zap.Error(err)) + } + case nodeevents.OfflineSuspended: + _, err := service.nodeEvents.Insert(ctx, email, id, nodeevents.OfflineSuspended) + if err != nil { + service.log.Error("could not insert node offline suspended into node events", zap.Error(err)) + } + case nodeevents.OfflineUnsuspended: + _, err := service.nodeEvents.Insert(ctx, email, id, nodeevents.OfflineUnsuspended) + if err != nil { + service.log.Error("could not insert node offline unsuspended into node events", zap.Error(err)) + } + default: + } + } +} diff --git a/satellite/overlay/service_test.go b/satellite/overlay/service_test.go index d222685e9..ad3159035 100644 --- a/satellite/overlay/service_test.go +++ b/satellite/overlay/service_test.go @@ -740,30 +740,31 @@ func TestUpdateReputation(t *testing.T) { t2 := t0.Add(2 * time.Hour) t3 := t0.Add(3 * time.Hour) - reputationChange := overlay.ReputationUpdate{ + reputationUpdate := overlay.ReputationUpdate{ Disqualified: nil, UnknownAuditSuspended: &t1, OfflineSuspended: &t2, VettedAt: &t3, } - err = service.UpdateReputation(ctx, node.ID(), reputationChange) + repChange := []nodeevents.Type{nodeevents.UnknownAuditSuspended, nodeevents.OfflineSuspended} + err = service.UpdateReputation(ctx, node.ID(), "", reputationUpdate, repChange) require.NoError(t, err) info, err = service.Get(ctx, node.ID()) require.NoError(t, err) - require.Equal(t, reputationChange.Disqualified, info.Disqualified) - require.Equal(t, reputationChange.UnknownAuditSuspended, info.UnknownAuditSuspended) - require.Equal(t, reputationChange.OfflineSuspended, info.OfflineSuspended) - require.Equal(t, reputationChange.VettedAt, info.Reputation.Status.VettedAt) + require.Equal(t, reputationUpdate.Disqualified, info.Disqualified) + require.Equal(t, reputationUpdate.UnknownAuditSuspended, info.UnknownAuditSuspended) + require.Equal(t, reputationUpdate.OfflineSuspended, info.OfflineSuspended) + require.Equal(t, reputationUpdate.VettedAt, info.Reputation.Status.VettedAt) - reputationChange.Disqualified = &t0 - - err = service.UpdateReputation(ctx, node.ID(), reputationChange) + reputationUpdate.Disqualified = &t0 + repChange = []nodeevents.Type{nodeevents.Disqualified} + err = service.UpdateReputation(ctx, node.ID(), "", reputationUpdate, repChange) require.NoError(t, err) info, err = service.Get(ctx, node.ID()) require.NoError(t, err) - require.Equal(t, reputationChange.Disqualified, info.Disqualified) + require.Equal(t, reputationUpdate.Disqualified, info.Disqualified) nodeInfo, err := overlaydb.UpdateExitStatus(ctx, &overlay.ExitStatusRequest{ NodeID: node.ID(), @@ -777,8 +778,8 @@ func TestUpdateReputation(t *testing.T) { // make sure Disqualified field is not updated if a node has finished // graceful exit - reputationChange.Disqualified = &t0 - err = service.UpdateReputation(ctx, node.ID(), reputationChange) + reputationUpdate.Disqualified = &t0 + err = service.UpdateReputation(ctx, node.ID(), "", reputationUpdate, repChange) require.NoError(t, err) exitedNodeInfo, err := service.Get(ctx, node.ID()) @@ -882,6 +883,77 @@ func TestKnownReliableInExcludedCountries(t *testing.T) { }) } +func TestUpdateReputationNodeEvents(t *testing.T) { + testplanet.Run(t, testplanet.Config{ + SatelliteCount: 1, StorageNodeCount: 2, UplinkCount: 0, + Reconfigure: testplanet.Reconfigure{ + Satellite: func(log *zap.Logger, index int, config *satellite.Config) { + config.Overlay.SendNodeEmails = true + }, + }, + }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { + service := planet.Satellites[0].Overlay.Service + node := planet.StorageNodes[0] + email := "test@storj.test" + neDB := planet.Satellites[0].DB.NodeEvents() + + now := time.Now() + repUpdate := overlay.ReputationUpdate{ + UnknownAuditSuspended: &now, + } + + repChanges := []nodeevents.Type{nodeevents.UnknownAuditSuspended} + + require.NoError(t, service.UpdateReputation(ctx, node.ID(), email, repUpdate, repChanges)) + + ne, err := neDB.GetLatestByEmailAndEvent(ctx, email, nodeevents.UnknownAuditSuspended) + require.NoError(t, err) + require.Equal(t, email, ne.Email) + require.Equal(t, node.ID(), ne.NodeID) + require.Equal(t, nodeevents.UnknownAuditSuspended, ne.Event) + + repUpdate.UnknownAuditSuspended = nil + repChanges = []nodeevents.Type{nodeevents.UnknownAuditUnsuspended} + require.NoError(t, service.UpdateReputation(ctx, node.ID(), "test@storj.test", repUpdate, repChanges)) + + ne, err = neDB.GetLatestByEmailAndEvent(ctx, email, nodeevents.UnknownAuditUnsuspended) + require.NoError(t, err) + require.Equal(t, email, ne.Email) + require.Equal(t, node.ID(), ne.NodeID) + require.Equal(t, nodeevents.UnknownAuditUnsuspended, ne.Event) + + repUpdate.OfflineSuspended = &now + repChanges = []nodeevents.Type{nodeevents.OfflineSuspended} + require.NoError(t, service.UpdateReputation(ctx, node.ID(), "test@storj.test", repUpdate, repChanges)) + + ne, err = neDB.GetLatestByEmailAndEvent(ctx, email, nodeevents.OfflineSuspended) + require.NoError(t, err) + require.Equal(t, email, ne.Email) + require.Equal(t, node.ID(), ne.NodeID) + require.Equal(t, nodeevents.OfflineSuspended, ne.Event) + + repUpdate.OfflineSuspended = nil + repChanges = []nodeevents.Type{nodeevents.OfflineUnsuspended} + require.NoError(t, service.UpdateReputation(ctx, node.ID(), "test@storj.test", repUpdate, repChanges)) + + ne, err = neDB.GetLatestByEmailAndEvent(ctx, email, nodeevents.OfflineUnsuspended) + require.NoError(t, err) + require.Equal(t, email, ne.Email) + require.Equal(t, node.ID(), ne.NodeID) + require.Equal(t, nodeevents.OfflineUnsuspended, ne.Event) + + repUpdate.Disqualified = &now + repChanges = []nodeevents.Type{nodeevents.Disqualified} + require.NoError(t, service.UpdateReputation(ctx, node.ID(), "test@storj.test", repUpdate, repChanges)) + + ne, err = neDB.GetLatestByEmailAndEvent(ctx, email, nodeevents.Disqualified) + require.NoError(t, err) + require.Equal(t, email, ne.Email) + require.Equal(t, node.ID(), ne.NodeID) + require.Equal(t, nodeevents.Disqualified, ne.Event) + }) +} + func TestUpdateCheckInNodeEventOnline(t *testing.T) { testplanet.Run(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 2, UplinkCount: 0, diff --git a/satellite/reputation/service.go b/satellite/reputation/service.go index 7f51def6f..d2aa7c511 100644 --- a/satellite/reputation/service.go +++ b/satellite/reputation/service.go @@ -11,6 +11,7 @@ import ( "storj.io/common/pb" "storj.io/common/storj" + "storj.io/storj/satellite/nodeevents" "storj.io/storj/satellite/overlay" ) @@ -120,7 +121,8 @@ func (service *Service) ApplyAudit(ctx context.Context, nodeID storj.NodeID, rep // the previous VettedAt time for a node. // Due to inconsistencies in the precision of time.Now() on different platforms and databases, the time comparison // for the VettedAt status is done using time values that are truncated to second precision. - if hasReputationChanged(*statusUpdate, reputation, now) { + changed, repChanges := hasReputationChanged(*statusUpdate, reputation, now) + if changed { reputationUpdate := &overlay.ReputationUpdate{ Disqualified: statusUpdate.Disqualified, DisqualificationReason: statusUpdate.DisqualificationReason, @@ -128,7 +130,7 @@ func (service *Service) ApplyAudit(ctx context.Context, nodeID storj.NodeID, rep OfflineSuspended: statusUpdate.OfflineSuspended, VettedAt: statusUpdate.VettedAt, } - err = service.overlay.UpdateReputation(ctx, nodeID, *reputationUpdate) + err = service.overlay.UpdateReputation(ctx, nodeID, reputation.Email, *reputationUpdate, repChanges) if err != nil { return err } @@ -184,7 +186,7 @@ func (service *Service) TestSuspendNodeUnknownAudit(ctx context.Context, nodeID if n.DisqualificationReason != nil { update.DisqualificationReason = *n.DisqualificationReason } - return service.overlay.UpdateReputation(ctx, nodeID, update) + return service.overlay.UpdateReputation(ctx, nodeID, "", update, []nodeevents.Type{nodeevents.UnknownAuditSuspended}) } // TestDisqualifyNode disqualifies a storage node. @@ -220,7 +222,7 @@ func (service *Service) TestUnsuspendNodeUnknownAudit(ctx context.Context, nodeI if n.DisqualificationReason != nil { update.DisqualificationReason = *n.DisqualificationReason } - return service.overlay.UpdateReputation(ctx, nodeID, update) + return service.overlay.UpdateReputation(ctx, nodeID, "", update, []nodeevents.Type{nodeevents.UnknownAuditUnsuspended}) } // TestFlushAllNodeInfo flushes any and all cached information about all @@ -247,19 +249,35 @@ func (service *Service) Close() error { return nil } // hasReputationChanged determines if the current node reputation is different from the newly updated reputation. This // function will only consider the Disqualified, UnknownAudiSuspended and OfflineSuspended statuses for changes. -func hasReputationChanged(updated Info, current overlay.ReputationStatus, now time.Time) bool { - if statusChanged(current.Disqualified, updated.Disqualified) || - statusChanged(current.UnknownAuditSuspended, updated.UnknownAuditSuspended) || - statusChanged(current.OfflineSuspended, updated.OfflineSuspended) { - return true +func hasReputationChanged(updated Info, current overlay.ReputationStatus, now time.Time) (changed bool, repChanges []nodeevents.Type) { + if statusChanged(current.Disqualified, updated.Disqualified) { + repChanges = append(repChanges, nodeevents.Disqualified) + changed = true } + if statusChanged(current.UnknownAuditSuspended, updated.UnknownAuditSuspended) { + if updated.UnknownAuditSuspended != nil { + repChanges = append(repChanges, nodeevents.UnknownAuditSuspended) + } else { + repChanges = append(repChanges, nodeevents.UnknownAuditUnsuspended) + } + changed = true + } + if statusChanged(current.OfflineSuspended, updated.OfflineSuspended) { + if updated.OfflineSuspended != nil { + repChanges = append(repChanges, nodeevents.OfflineSuspended) + } else { + repChanges = append(repChanges, nodeevents.OfflineUnsuspended) + } + changed = true + } + // check for newly vetted nodes. // Due to inconsistencies in the precision of time.Now() on different platforms and databases, the time comparison // for the VettedAt status is done using time values that are truncated to second precision. if updated.VettedAt != nil && updated.VettedAt.Truncate(time.Second).Equal(now.Truncate(time.Second)) { - return true + changed = true } - return false + return changed, repChanges } // statusChanged determines if the two given statuses are different.