storj/satellite/reputation/service.go
Moby von Briesen 395426977f satellite/reputation: Alter logic for hasReputationChanged
This change modifies "reputation change trigger" logic for dq and
suspension in order to:
* count only nil -> not-nil and not-nil -> nil as "reputation changes"
* do not count the timestamp updating (not-nil -> not-nil) as a
  reputation change

Change-Id: Ie57c97e80e5f3b368e2f2dc004e680a2f9416d1c
2023-02-03 21:17:57 -05:00

326 lines
11 KiB
Go

// Copyright (C) 2021 Storj Labs, Inc.
// See LICENSE for copying information.
package reputation
import (
"context"
"time"
"go.uber.org/zap"
"storj.io/common/pb"
"storj.io/common/storj"
"storj.io/storj/satellite/nodeevents"
"storj.io/storj/satellite/overlay"
)
// DB is an interface for storing reputation data.
type DB interface {
Update(ctx context.Context, request UpdateRequest, now time.Time) (_ *Info, err error)
Get(ctx context.Context, nodeID storj.NodeID) (*Info, error)
// ApplyUpdates applies multiple updates (defined by the updates
// parameter) to a node's reputations record.
ApplyUpdates(ctx context.Context, nodeID storj.NodeID, updates Mutations, reputationConfig Config, now time.Time) (_ *Info, err error)
// UnsuspendNodeUnknownAudit unsuspends a storage node for unknown audits.
UnsuspendNodeUnknownAudit(ctx context.Context, nodeID storj.NodeID) (err error)
// DisqualifyNode disqualifies a storage node.
DisqualifyNode(ctx context.Context, nodeID storj.NodeID, disqualifiedAt time.Time, reason overlay.DisqualificationReason) (err error)
// SuspendNodeUnknownAudit suspends a storage node for unknown audits.
SuspendNodeUnknownAudit(ctx context.Context, nodeID storj.NodeID, suspendedAt time.Time) (err error)
}
// Info contains all reputation data to be stored in DB.
type Info struct {
AuditSuccessCount int64
TotalAuditCount int64
VettedAt *time.Time
UnknownAuditSuspended *time.Time
OfflineSuspended *time.Time
UnderReview *time.Time
Disqualified *time.Time
DisqualificationReason overlay.DisqualificationReason
OnlineScore float64
AuditHistory *pb.AuditHistory
AuditReputationAlpha float64
AuditReputationBeta float64
UnknownAuditReputationAlpha float64
UnknownAuditReputationBeta float64
}
// Copy creates a deep copy of the Info object.
func (i *Info) Copy() *Info {
i2 := *i
i2.VettedAt = cloneTime(i.VettedAt)
i2.UnknownAuditSuspended = cloneTime(i.UnknownAuditSuspended)
i2.OfflineSuspended = cloneTime(i.OfflineSuspended)
i2.UnderReview = cloneTime(i.UnderReview)
i2.Disqualified = cloneTime(i.Disqualified)
if i.AuditHistory != nil {
i2.AuditHistory = &pb.AuditHistory{
Score: i.AuditHistory.Score,
Windows: make([]*pb.AuditWindow, len(i.AuditHistory.Windows)),
}
for i, window := range i.AuditHistory.Windows {
w := *window
i2.AuditHistory.Windows[i] = &w
}
}
return &i2
}
// Mutations represents changes which should be made to a particular node's
// reputation, in terms of counts and/or timestamps of events which have
// occurred. A Mutations record can be applied to a reputations row without
// prior knowledge of that row's contents.
type Mutations struct {
PositiveResults int
FailureResults int
UnknownResults int
OfflineResults int
OnlineHistory *pb.AuditHistory
}
// Service handles storing node reputation data and updating
// the overlay cache when a node's status changes.
type Service struct {
log *zap.Logger
overlay *overlay.Service
db DB
config Config
}
// NewService creates a new reputation service.
func NewService(log *zap.Logger, overlay *overlay.Service, db DB, config Config) *Service {
return &Service{
log: log,
overlay: overlay,
db: db,
config: config,
}
}
// ApplyAudit receives an audit result and applies it to the relevant node in DB.
func (service *Service) ApplyAudit(ctx context.Context, nodeID storj.NodeID, reputation overlay.ReputationStatus, result AuditType) (err error) {
defer mon.Task()(&ctx)(&err)
now := time.Now()
statusUpdate, err := service.db.Update(ctx, UpdateRequest{
NodeID: nodeID,
AuditOutcome: result,
Config: service.config,
}, now)
if err != nil {
return err
}
// There are some cases where the caller did not get updated reputation-status information.
// (Usually this means the node was offline and we skipped creating an order limit for it.)
if reputation.Email == "" {
dossier, err := service.overlay.Get(ctx, nodeID)
if err != nil {
return err
}
reputation = dossier.Reputation.Status
}
// only update node if its health status has changed, or it's a newly vetted
// node.
// this prevents the need to require caller of ApplyAudit() to always know
// the previous VettedAt time for a node.
// Due to inconsistencies in the precision of time.Now() on different platforms and databases, the time comparison
// for the VettedAt status is done using time values that are truncated to second precision.
changed, repChanges := hasReputationChanged(*statusUpdate, reputation, now)
if changed {
reputationUpdate := &overlay.ReputationUpdate{
Disqualified: statusUpdate.Disqualified,
DisqualificationReason: statusUpdate.DisqualificationReason,
UnknownAuditSuspended: statusUpdate.UnknownAuditSuspended,
OfflineSuspended: statusUpdate.OfflineSuspended,
VettedAt: statusUpdate.VettedAt,
}
err = service.overlay.UpdateReputation(ctx, nodeID, reputation.Email, *reputationUpdate, repChanges)
if err != nil {
return err
}
}
return err
}
// Get returns a node's reputation info from DB.
// If a node is not found in the DB, default reputation information is returned.
func (service *Service) Get(ctx context.Context, nodeID storj.NodeID) (info *Info, err error) {
defer mon.Task()(&ctx)(&err)
info, err = service.db.Get(ctx, nodeID)
if err != nil {
if ErrNodeNotFound.Has(err) {
// if there is no audit reputation for the node, that's fine and we
// return default reputation values
info = &Info{
UnknownAuditReputationAlpha: 1,
AuditReputationAlpha: service.config.InitialAlpha,
AuditReputationBeta: service.config.InitialBeta,
OnlineScore: 1,
}
return info, nil
}
return nil, Error.Wrap(err)
}
return info, nil
}
// TestSuspendNodeUnknownAudit suspends a storage node for unknown audits.
func (service *Service) TestSuspendNodeUnknownAudit(ctx context.Context, nodeID storj.NodeID, suspendedAt time.Time) (err error) {
err = service.db.SuspendNodeUnknownAudit(ctx, nodeID, suspendedAt)
if err != nil {
return err
}
n, err := service.overlay.Get(ctx, nodeID)
if err != nil {
return err
}
update := overlay.ReputationUpdate{
Disqualified: n.Disqualified,
UnknownAuditSuspended: &suspendedAt,
OfflineSuspended: n.OfflineSuspended,
VettedAt: n.Reputation.Status.VettedAt,
}
if n.DisqualificationReason != nil {
update.DisqualificationReason = *n.DisqualificationReason
}
return service.overlay.UpdateReputation(ctx, nodeID, "", update, []nodeevents.Type{nodeevents.UnknownAuditSuspended})
}
// TestDisqualifyNode disqualifies a storage node.
func (service *Service) TestDisqualifyNode(ctx context.Context, nodeID storj.NodeID, reason overlay.DisqualificationReason) (err error) {
disqualifiedAt := time.Now()
err = service.db.DisqualifyNode(ctx, nodeID, disqualifiedAt, reason)
if err != nil {
return err
}
return service.overlay.DisqualifyNode(ctx, nodeID, reason)
}
// TestUnsuspendNodeUnknownAudit unsuspends a storage node for unknown audits.
func (service *Service) TestUnsuspendNodeUnknownAudit(ctx context.Context, nodeID storj.NodeID) (err error) {
err = service.db.UnsuspendNodeUnknownAudit(ctx, nodeID)
if err != nil {
return err
}
n, err := service.overlay.Get(ctx, nodeID)
if err != nil {
return err
}
update := overlay.ReputationUpdate{
Disqualified: n.Disqualified,
UnknownAuditSuspended: nil,
OfflineSuspended: n.OfflineSuspended,
VettedAt: n.Reputation.Status.VettedAt,
}
if n.DisqualificationReason != nil {
update.DisqualificationReason = *n.DisqualificationReason
}
return service.overlay.UpdateReputation(ctx, nodeID, "", update, []nodeevents.Type{nodeevents.UnknownAuditUnsuspended})
}
// TestFlushAllNodeInfo flushes any and all cached information about all
// nodes to the backing store, if the attached reputationDB does any caching
// at all.
func (service *Service) TestFlushAllNodeInfo(ctx context.Context) (err error) {
if db, ok := service.db.(*CachingDB); ok {
return db.FlushAll(ctx)
}
return nil
}
// FlushNodeInfo flushes any cached information about the specified node to
// the backing store, if the attached reputationDB does any caching at all.
func (service *Service) FlushNodeInfo(ctx context.Context, nodeID storj.NodeID) (err error) {
if db, ok := service.db.(*CachingDB); ok {
return db.RequestSync(ctx, nodeID)
}
return nil
}
// Close closes resources.
func (service *Service) Close() error { return nil }
// hasReputationChanged determines if the current node reputation is different from the newly updated reputation. This
// function will only consider the Disqualified, UnknownAudiSuspended and OfflineSuspended statuses for changes.
func hasReputationChanged(updated Info, current overlay.ReputationStatus, now time.Time) (changed bool, repChanges []nodeevents.Type) {
if statusChanged(current.Disqualified, updated.Disqualified) {
repChanges = append(repChanges, nodeevents.Disqualified)
changed = true
}
if statusChanged(current.UnknownAuditSuspended, updated.UnknownAuditSuspended) {
if updated.UnknownAuditSuspended != nil {
repChanges = append(repChanges, nodeevents.UnknownAuditSuspended)
} else {
repChanges = append(repChanges, nodeevents.UnknownAuditUnsuspended)
}
changed = true
}
if statusChanged(current.OfflineSuspended, updated.OfflineSuspended) {
if updated.OfflineSuspended != nil {
repChanges = append(repChanges, nodeevents.OfflineSuspended)
} else {
repChanges = append(repChanges, nodeevents.OfflineUnsuspended)
}
changed = true
}
// check for newly vetted nodes.
// Due to inconsistencies in the precision of time.Now() on different platforms and databases, the time comparison
// for the VettedAt status is done using time values that are truncated to second precision.
if updated.VettedAt != nil && updated.VettedAt.Truncate(time.Second).Equal(now.Truncate(time.Second)) {
changed = true
}
return changed, repChanges
}
// statusChanged determines if the two given statuses are different.
// a status is considered "different" if it went from nil to not-nil, or not-nil to nil.
// if not-nil and the only difference is the time, this is considered "not changed".
func statusChanged(s1, s2 *time.Time) bool {
return (s1 == nil && s2 != nil) || (s1 != nil && s2 == nil)
}
// UpdateRequestToMutations transforms an UpdateRequest into the equivalent
// Mutations structure, which can be used with ApplyUpdates.
func UpdateRequestToMutations(updateReq UpdateRequest, now time.Time) (Mutations, error) {
updates := Mutations{}
switch updateReq.AuditOutcome {
case AuditSuccess:
updates.PositiveResults = 1
case AuditFailure:
updates.FailureResults = 1
case AuditUnknown:
updates.UnknownResults = 1
case AuditOffline:
updates.OfflineResults = 1
}
updates.OnlineHistory = &pb.AuditHistory{}
err := AddAuditToHistory(updates.OnlineHistory, updateReq.AuditOutcome != AuditOffline, now, updateReq.Config.AuditHistory)
return updates, err
}
func cloneTime(t *time.Time) *time.Time {
if t == nil {
return nil
}
tCopy := *t
return &tCopy
}