satellite/reputation: return full Info from reputation update

This will let us update our reputation cache when writing through to the
db.

Since the information is already being fetched from the db and returned
to the application, the extra cpu load here should be minimal.

Refs: https://github.com/storj/storj/issues/4601

Change-Id: I2b8619f2c0d541893c7d3e7d33b1863b96775ebd
This commit is contained in:
paul cannon 2022-04-20 12:00:25 -05:00
parent f08732f30d
commit ac7752ac3c
4 changed files with 64 additions and 55 deletions

View File

@ -7,6 +7,7 @@ import (
"time"
"storj.io/common/pb"
"storj.io/storj/satellite/internalpb"
)
// AuditHistory represents a node's audit history for the most recent tracking period.
@ -44,3 +45,22 @@ func AuditHistoryToPB(auditHistory AuditHistory) (historyPB *pb.AuditHistory) {
}
return historyPB
}
// AuditHistoryFromBytes decodes an AuditHistory from the givenprotobuf-encoded
// internalpb.AuditHistory object.
func AuditHistoryFromBytes(encodedHistory []byte) (history AuditHistory, err error) {
var pbHistory internalpb.AuditHistory
if err := pb.Unmarshal(encodedHistory, &pbHistory); err != nil {
return AuditHistory{}, err
}
history.Score = pbHistory.Score
history.Windows = make([]*AuditWindow, len(pbHistory.Windows))
for i, window := range pbHistory.Windows {
history.Windows[i] = &AuditWindow{
TotalCount: window.TotalCount,
OnlineCount: window.OnlineCount,
WindowStart: window.WindowStart,
}
}
return history, nil
}

View File

@ -15,7 +15,7 @@ import (
// DB is an interface for storing reputation data.
type DB interface {
Update(ctx context.Context, request UpdateRequest, now time.Time) (_ *overlay.ReputationUpdate, err error)
Update(ctx context.Context, request UpdateRequest, now time.Time) (_ *Info, err error)
Get(ctx context.Context, nodeID storj.NodeID) (*Info, error)
// UnsuspendNodeUnknownAudit unsuspends a storage node for unknown audits.
@ -93,7 +93,14 @@ func (service *Service) ApplyAudit(ctx context.Context, nodeID storj.NodeID, rep
// Due to inconsistencies in the precision of time.Now() on different platforms and databases, the time comparison
// for the VettedAt status is done using time values that are truncated to second precision.
if hasReputationChanged(*statusUpdate, reputation, now) {
err = service.overlay.UpdateReputation(ctx, nodeID, *statusUpdate)
reputationUpdate := &overlay.ReputationUpdate{
Disqualified: statusUpdate.Disqualified,
DisqualificationReason: statusUpdate.DisqualificationReason,
UnknownAuditSuspended: statusUpdate.UnknownAuditSuspended,
OfflineSuspended: statusUpdate.OfflineSuspended,
VettedAt: statusUpdate.VettedAt,
}
err = service.overlay.UpdateReputation(ctx, nodeID, *reputationUpdate)
if err != nil {
return err
}
@ -164,7 +171,7 @@ func (service *Service) Close() error { return nil }
// hasReputationChanged determines if the current node reputation is different from the newly updated reputation. This
// function will only consider the Disqualified, UnknownAudiSuspended and OfflineSuspended statuses for changes.
func hasReputationChanged(updated overlay.ReputationUpdate, current overlay.ReputationStatus, now time.Time) bool {
func hasReputationChanged(updated Info, current overlay.ReputationStatus, now time.Time) bool {
if statusChanged(current.Disqualified, updated.Disqualified) ||
statusChanged(current.UnknownAuditSuspended, updated.UnknownAuditSuspended) ||
statusChanged(current.OfflineSuspended, updated.OfflineSuspended) {

View File

@ -106,23 +106,3 @@ func (reputations *reputations) UpdateAuditHistory(ctx context.Context, oldHisto
res.NewScore = history.Score
return res, nil
}
func auditHistoryFromPB(historyBytes []byte) (auditHistory *reputation.AuditHistory, err error) {
historyPB := &internalpb.AuditHistory{}
err = pb.Unmarshal(historyBytes, historyPB)
if err != nil {
return nil, err
}
history := &reputation.AuditHistory{
Score: historyPB.Score,
Windows: make([]*reputation.AuditWindow, len(historyPB.Windows)),
}
for i, window := range historyPB.Windows {
history.Windows[i] = &reputation.AuditWindow{
TotalCount: window.TotalCount,
OnlineCount: window.OnlineCount,
WindowStart: window.WindowStart,
}
}
return history, nil
}

View File

@ -34,7 +34,7 @@ type reputations struct {
// 2. Depends on the result of the first step,
// a. if existing row is returned, do compare-and-swap.
// b. if no row found, insert a new row.
func (reputations *reputations) Update(ctx context.Context, updateReq reputation.UpdateRequest, now time.Time) (_ *overlay.ReputationUpdate, err error) {
func (reputations *reputations) Update(ctx context.Context, updateReq reputation.UpdateRequest, now time.Time) (_ *reputation.Info, err error) {
defer mon.Task()(&ctx)(&err)
for {
@ -81,17 +81,11 @@ func (reputations *reputations) Update(ctx context.Context, updateReq reputation
return nil, Error.Wrap(err)
}
status := getNodeStatus(stats)
repUpdate := overlay.ReputationUpdate{
Disqualified: status.Disqualified,
UnknownAuditSuspended: status.UnknownAuditSuspended,
OfflineSuspended: status.OfflineSuspended,
VettedAt: status.VettedAt,
status, err := dbxToReputationInfo(stats)
if err != nil {
return nil, Error.Wrap(err)
}
if status.DisqualificationReason != nil {
repUpdate.DisqualificationReason = *status.DisqualificationReason
}
return &repUpdate, nil
return &status, nil
}
auditHistoryResponse, err := reputations.UpdateAuditHistory(ctx, dbNode.AuditHistory, updateReq, now)
@ -115,17 +109,11 @@ func (reputations *reputations) Update(ctx context.Context, updateReq reputation
continue
}
status := getNodeStatus(dbNode)
repUpdate := overlay.ReputationUpdate{
Disqualified: status.Disqualified,
UnknownAuditSuspended: status.UnknownAuditSuspended,
OfflineSuspended: status.OfflineSuspended,
VettedAt: status.VettedAt,
status, err := dbxToReputationInfo(dbNode)
if err != nil {
return nil, Error.Wrap(err)
}
if status.DisqualificationReason != nil {
repUpdate.DisqualificationReason = *status.DisqualificationReason
}
return &repUpdate, nil
return &status, nil
}
}
@ -138,7 +126,7 @@ func (reputations *reputations) Get(ctx context.Context, nodeID storj.NodeID) (*
return nil, Error.Wrap(err)
}
history, err := auditHistoryFromPB(res.AuditHistory)
history, err := reputation.AuditHistoryFromBytes(res.AuditHistory)
if err != nil {
return nil, Error.Wrap(err)
}
@ -157,7 +145,7 @@ func (reputations *reputations) Get(ctx context.Context, nodeID storj.NodeID) (*
Disqualified: res.Disqualified,
DisqualificationReason: dqReason,
OnlineScore: res.OnlineScore,
AuditHistory: *history,
AuditHistory: history,
AuditReputationAlpha: res.AuditReputationAlpha,
AuditReputationBeta: res.AuditReputationBeta,
UnknownAuditReputationAlpha: res.UnknownAuditReputationAlpha,
@ -636,18 +624,32 @@ type updateNodeStats struct {
OnlineScore float64Field
}
func getNodeStatus(dbNode *dbx.Reputation) overlay.ReputationStatus {
status := overlay.ReputationStatus{
VettedAt: dbNode.VettedAt,
Disqualified: dbNode.Disqualified,
UnknownAuditSuspended: dbNode.UnknownAuditSuspended,
OfflineSuspended: dbNode.OfflineSuspended,
func dbxToReputationInfo(dbNode *dbx.Reputation) (reputation.Info, error) {
info := reputation.Info{
AuditSuccessCount: dbNode.AuditSuccessCount,
TotalAuditCount: dbNode.TotalAuditCount,
VettedAt: dbNode.VettedAt,
UnknownAuditSuspended: dbNode.UnknownAuditSuspended,
OfflineSuspended: dbNode.OfflineSuspended,
UnderReview: dbNode.UnderReview,
Disqualified: dbNode.Disqualified,
OnlineScore: dbNode.OnlineScore,
AuditReputationAlpha: dbNode.AuditReputationAlpha,
AuditReputationBeta: dbNode.AuditReputationBeta,
UnknownAuditReputationAlpha: dbNode.UnknownAuditReputationAlpha,
UnknownAuditReputationBeta: dbNode.UnknownAuditReputationBeta,
}
if dbNode.DisqualificationReason != nil {
status.DisqualificationReason = new(overlay.DisqualificationReason)
*status.DisqualificationReason = overlay.DisqualificationReason(*dbNode.DisqualificationReason)
info.DisqualificationReason = overlay.DisqualificationReason(*dbNode.DisqualificationReason)
}
return status
if dbNode.AuditHistory != nil {
auditHistory, err := reputation.AuditHistoryFromBytes(dbNode.AuditHistory)
if err != nil {
return info, err
}
info.AuditHistory = auditHistory
}
return info, nil
}
// updateReputation uses the Beta distribution model to determine a node's reputation.