satellite/satellitedb: improve Update query for reputation

Change-Id: Iee140f726cd05c34028c7b532e1f855e2473ddbc
This commit is contained in:
Yingrong Zhao 2021-08-03 22:01:19 -04:00 committed by Yingrong Zhao
parent 6b153192a3
commit c074a5666b
7 changed files with 400 additions and 64 deletions

View File

@ -15,6 +15,7 @@ import (
"storj.io/storj/private/testplanet"
"storj.io/storj/satellite"
"storj.io/storj/satellite/internalpb"
"storj.io/storj/satellite/reputation"
)
func TestAuditHistoryBasic(t *testing.T) {
@ -40,13 +41,18 @@ func TestAuditHistoryBasic(t *testing.T) {
newHistory := &internalpb.AuditHistory{}
historyBytes, err := pb.Marshal(newHistory)
require.NoError(t, err)
updateReq := reputation.UpdateRequest{
AuditOutcome: reputation.AuditOffline,
AuditHistory: config,
}
// online score should be 1 until the first window is finished
res, err := db.UpdateAuditHistory(ctx, historyBytes, currentWindow.Add(2*time.Minute), false, config)
res, err := db.UpdateAuditHistory(ctx, historyBytes, updateReq, currentWindow.Add(2*time.Minute))
require.NoError(t, err)
require.EqualValues(t, 1, res.NewScore)
require.False(t, res.TrackingPeriodFull)
res, err = db.UpdateAuditHistory(ctx, res.History, currentWindow.Add(20*time.Minute), true, config)
updateReq.AuditOutcome = reputation.AuditSuccess
res, err = db.UpdateAuditHistory(ctx, res.History, updateReq, currentWindow.Add(20*time.Minute))
require.NoError(t, err)
require.EqualValues(t, 1, res.NewScore)
require.False(t, res.TrackingPeriodFull)
@ -55,12 +61,14 @@ func TestAuditHistoryBasic(t *testing.T) {
currentWindow = currentWindow.Add(time.Hour)
// online score should be now be 0.5 since the first window is complete with one online audit and one offline audit
res, err = db.UpdateAuditHistory(ctx, res.History, currentWindow.Add(2*time.Minute), false, config)
updateReq.AuditOutcome = reputation.AuditOffline
res, err = db.UpdateAuditHistory(ctx, res.History, updateReq, currentWindow.Add(2*time.Minute))
require.NoError(t, err)
require.EqualValues(t, 0.5, res.NewScore)
require.False(t, res.TrackingPeriodFull)
res, err = db.UpdateAuditHistory(ctx, res.History, currentWindow.Add(20*time.Minute), true, config)
updateReq.AuditOutcome = reputation.AuditSuccess
res, err = db.UpdateAuditHistory(ctx, res.History, updateReq, currentWindow.Add(20*time.Minute))
require.NoError(t, err)
require.EqualValues(t, 0.5, res.NewScore)
require.False(t, res.TrackingPeriodFull)
@ -69,17 +77,20 @@ func TestAuditHistoryBasic(t *testing.T) {
currentWindow = currentWindow.Add(time.Hour)
// try to add an audit for an old window, expect error
_, err = db.UpdateAuditHistory(ctx, res.History, startingWindow, true, config)
updateReq.AuditOutcome = reputation.AuditSuccess
_, err = db.UpdateAuditHistory(ctx, res.History, updateReq, startingWindow)
require.Error(t, err)
// add another online audit for the latest window; score should still be 0.5
res, err = db.UpdateAuditHistory(ctx, res.History, currentWindow, true, config)
updateReq.AuditOutcome = reputation.AuditSuccess
res, err = db.UpdateAuditHistory(ctx, res.History, updateReq, currentWindow)
require.NoError(t, err)
require.EqualValues(t, 0.5, res.NewScore)
// now that we have two full windows other than the current one, tracking period should be considered full.
require.True(t, res.TrackingPeriodFull)
// add another online audit for the latest window; score should still be 0.5
res, err = db.UpdateAuditHistory(ctx, res.History, currentWindow.Add(45*time.Minute), true, config)
updateReq.AuditOutcome = reputation.AuditSuccess
res, err = db.UpdateAuditHistory(ctx, res.History, updateReq, currentWindow.Add(45*time.Minute))
require.NoError(t, err)
require.EqualValues(t, 0.5, res.NewScore)
require.True(t, res.TrackingPeriodFull)
@ -90,7 +101,8 @@ func TestAuditHistoryBasic(t *testing.T) {
// window gets included in the tracking period, and the earliest 0.5 window gets dropped.
expectedScore := (0.5*float64(windowsInTrackingPeriod-1) + 1) / float64(windowsInTrackingPeriod)
// add online audit for next window; score should now be expectedScore
res, err = db.UpdateAuditHistory(ctx, res.History, currentWindow.Add(time.Minute), true, config)
updateReq.AuditOutcome = reputation.AuditSuccess
res, err = db.UpdateAuditHistory(ctx, res.History, updateReq, currentWindow.Add(time.Minute))
require.NoError(t, err)
require.EqualValues(t, expectedScore, res.NewScore)
require.True(t, res.TrackingPeriodFull)

View File

@ -26,7 +26,7 @@ type DB interface {
// SuspendNodeUnknownAudit suspends a storage node for unknown audits.
SuspendNodeUnknownAudit(ctx context.Context, nodeID storj.NodeID, suspendedAt time.Time) (err error)
// UpdateAuditHistory updates a node's audit history
UpdateAuditHistory(ctx context.Context, oldHistory []byte, auditTime time.Time, online bool, config AuditHistoryConfig) (res *UpdateAuditHistoryResponse, err error)
UpdateAuditHistory(ctx context.Context, oldHistory []byte, updateReq UpdateRequest, auditTime time.Time) (res *UpdateAuditHistoryResponse, err error)
}
// Info contains all reputation data to be stored in DB.

View File

@ -104,10 +104,6 @@ func TestAuditSuspendWithUpdateStats(t *testing.T) {
func TestAuditSuspendFailedAudit(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 0,
Reconfigure: testplanet.Reconfigure{
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
},
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
nodeID := planet.StorageNodes[0].ID()
oc := planet.Satellites[0].Overlay.DB

View File

@ -77,9 +77,13 @@ func addAudit(a *internalpb.AuditHistory, auditTime time.Time, online bool, conf
return nil
}
func (reputations *reputations) UpdateAuditHistory(ctx context.Context, oldHistory []byte, auditTime time.Time, online bool, config reputation.AuditHistoryConfig) (res *reputation.UpdateAuditHistoryResponse, err error) {
func (reputations *reputations) UpdateAuditHistory(ctx context.Context, oldHistory []byte, updateReq reputation.UpdateRequest, auditTime time.Time) (res *reputation.UpdateAuditHistoryResponse, err error) {
defer mon.Task()(&ctx)(&err)
config := updateReq.AuditHistory
online := updateReq.AuditOutcome != reputation.AuditOffline
res = &reputation.UpdateAuditHistoryResponse{
NewScore: 1,
TrackingPeriodFull: false,
@ -105,7 +109,7 @@ func (reputations *reputations) UpdateAuditHistory(ctx context.Context, oldHisto
windowsPerTrackingPeriod := int(config.TrackingPeriod.Seconds() / config.WindowSize.Seconds())
res.TrackingPeriodFull = len(history.Windows)-1 >= windowsPerTrackingPeriod
res.NewScore = history.Score
return res, Error.Wrap(err)
return res, nil
}
// GetAuditHistory gets a node's audit history.

View File

@ -253,6 +253,10 @@ model reputation (
create reputation ()
update reputation ( where reputation.id = ? )
update reputation (
where reputation.id = ?
where reputation.audit_history = ?
)
update reputation (
where reputation.id = ?
noreturn

View File

@ -14753,6 +14753,119 @@ func (obj *pgxImpl) Update_Reputation_By_Id(ctx context.Context,
return reputation, nil
}
func (obj *pgxImpl) Update_Reputation_By_Id_And_AuditHistory(ctx context.Context,
reputation_id Reputation_Id_Field,
reputation_audit_history Reputation_AuditHistory_Field,
update Reputation_Update_Fields) (
reputation *Reputation, err error) {
defer mon.Task()(&ctx)(&err)
var __sets = &__sqlbundle_Hole{}
var __embed_stmt = __sqlbundle_Literals{Join: "", SQLs: []__sqlbundle_SQL{__sqlbundle_Literal("UPDATE reputations SET "), __sets, __sqlbundle_Literal(" WHERE reputations.id = ? AND reputations.audit_history = ? RETURNING reputations.id, reputations.audit_success_count, reputations.total_audit_count, reputations.vetted_at, reputations.created_at, reputations.updated_at, reputations.contained, reputations.disqualified, reputations.suspended, reputations.unknown_audit_suspended, reputations.offline_suspended, reputations.under_review, reputations.online_score, reputations.audit_history, reputations.audit_reputation_alpha, reputations.audit_reputation_beta, reputations.unknown_audit_reputation_alpha, reputations.unknown_audit_reputation_beta")}}
__sets_sql := __sqlbundle_Literals{Join: ", "}
var __values []interface{}
var __args []interface{}
if update.AuditSuccessCount._set {
__values = append(__values, update.AuditSuccessCount.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("audit_success_count = ?"))
}
if update.TotalAuditCount._set {
__values = append(__values, update.TotalAuditCount.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("total_audit_count = ?"))
}
if update.VettedAt._set {
__values = append(__values, update.VettedAt.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("vetted_at = ?"))
}
if update.Contained._set {
__values = append(__values, update.Contained.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("contained = ?"))
}
if update.Disqualified._set {
__values = append(__values, update.Disqualified.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("disqualified = ?"))
}
if update.Suspended._set {
__values = append(__values, update.Suspended.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("suspended = ?"))
}
if update.UnknownAuditSuspended._set {
__values = append(__values, update.UnknownAuditSuspended.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("unknown_audit_suspended = ?"))
}
if update.OfflineSuspended._set {
__values = append(__values, update.OfflineSuspended.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("offline_suspended = ?"))
}
if update.UnderReview._set {
__values = append(__values, update.UnderReview.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("under_review = ?"))
}
if update.OnlineScore._set {
__values = append(__values, update.OnlineScore.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("online_score = ?"))
}
if update.AuditHistory._set {
__values = append(__values, update.AuditHistory.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("audit_history = ?"))
}
if update.AuditReputationAlpha._set {
__values = append(__values, update.AuditReputationAlpha.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("audit_reputation_alpha = ?"))
}
if update.AuditReputationBeta._set {
__values = append(__values, update.AuditReputationBeta.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("audit_reputation_beta = ?"))
}
if update.UnknownAuditReputationAlpha._set {
__values = append(__values, update.UnknownAuditReputationAlpha.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("unknown_audit_reputation_alpha = ?"))
}
if update.UnknownAuditReputationBeta._set {
__values = append(__values, update.UnknownAuditReputationBeta.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("unknown_audit_reputation_beta = ?"))
}
__now := obj.db.Hooks.Now().UTC()
__values = append(__values, __now)
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("updated_at = ?"))
__args = append(__args, reputation_id.value(), reputation_audit_history.value())
__values = append(__values, __args...)
__sets.SQL = __sets_sql
var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt)
obj.logStmt(__stmt, __values...)
reputation = &Reputation{}
err = obj.queryRowContext(ctx, __stmt, __values...).Scan(&reputation.Id, &reputation.AuditSuccessCount, &reputation.TotalAuditCount, &reputation.VettedAt, &reputation.CreatedAt, &reputation.UpdatedAt, &reputation.Contained, &reputation.Disqualified, &reputation.Suspended, &reputation.UnknownAuditSuspended, &reputation.OfflineSuspended, &reputation.UnderReview, &reputation.OnlineScore, &reputation.AuditHistory, &reputation.AuditReputationAlpha, &reputation.AuditReputationBeta, &reputation.UnknownAuditReputationAlpha, &reputation.UnknownAuditReputationBeta)
if err == sql.ErrNoRows {
return nil, nil
}
if err != nil {
return nil, obj.makeErr(err)
}
return reputation, nil
}
func (obj *pgxImpl) UpdateNoReturn_Reputation_By_Id(ctx context.Context,
reputation_id Reputation_Id_Field,
update Reputation_Update_Fields) (
@ -20830,6 +20943,119 @@ func (obj *pgxcockroachImpl) Update_Reputation_By_Id(ctx context.Context,
return reputation, nil
}
func (obj *pgxcockroachImpl) Update_Reputation_By_Id_And_AuditHistory(ctx context.Context,
reputation_id Reputation_Id_Field,
reputation_audit_history Reputation_AuditHistory_Field,
update Reputation_Update_Fields) (
reputation *Reputation, err error) {
defer mon.Task()(&ctx)(&err)
var __sets = &__sqlbundle_Hole{}
var __embed_stmt = __sqlbundle_Literals{Join: "", SQLs: []__sqlbundle_SQL{__sqlbundle_Literal("UPDATE reputations SET "), __sets, __sqlbundle_Literal(" WHERE reputations.id = ? AND reputations.audit_history = ? RETURNING reputations.id, reputations.audit_success_count, reputations.total_audit_count, reputations.vetted_at, reputations.created_at, reputations.updated_at, reputations.contained, reputations.disqualified, reputations.suspended, reputations.unknown_audit_suspended, reputations.offline_suspended, reputations.under_review, reputations.online_score, reputations.audit_history, reputations.audit_reputation_alpha, reputations.audit_reputation_beta, reputations.unknown_audit_reputation_alpha, reputations.unknown_audit_reputation_beta")}}
__sets_sql := __sqlbundle_Literals{Join: ", "}
var __values []interface{}
var __args []interface{}
if update.AuditSuccessCount._set {
__values = append(__values, update.AuditSuccessCount.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("audit_success_count = ?"))
}
if update.TotalAuditCount._set {
__values = append(__values, update.TotalAuditCount.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("total_audit_count = ?"))
}
if update.VettedAt._set {
__values = append(__values, update.VettedAt.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("vetted_at = ?"))
}
if update.Contained._set {
__values = append(__values, update.Contained.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("contained = ?"))
}
if update.Disqualified._set {
__values = append(__values, update.Disqualified.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("disqualified = ?"))
}
if update.Suspended._set {
__values = append(__values, update.Suspended.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("suspended = ?"))
}
if update.UnknownAuditSuspended._set {
__values = append(__values, update.UnknownAuditSuspended.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("unknown_audit_suspended = ?"))
}
if update.OfflineSuspended._set {
__values = append(__values, update.OfflineSuspended.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("offline_suspended = ?"))
}
if update.UnderReview._set {
__values = append(__values, update.UnderReview.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("under_review = ?"))
}
if update.OnlineScore._set {
__values = append(__values, update.OnlineScore.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("online_score = ?"))
}
if update.AuditHistory._set {
__values = append(__values, update.AuditHistory.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("audit_history = ?"))
}
if update.AuditReputationAlpha._set {
__values = append(__values, update.AuditReputationAlpha.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("audit_reputation_alpha = ?"))
}
if update.AuditReputationBeta._set {
__values = append(__values, update.AuditReputationBeta.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("audit_reputation_beta = ?"))
}
if update.UnknownAuditReputationAlpha._set {
__values = append(__values, update.UnknownAuditReputationAlpha.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("unknown_audit_reputation_alpha = ?"))
}
if update.UnknownAuditReputationBeta._set {
__values = append(__values, update.UnknownAuditReputationBeta.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("unknown_audit_reputation_beta = ?"))
}
__now := obj.db.Hooks.Now().UTC()
__values = append(__values, __now)
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("updated_at = ?"))
__args = append(__args, reputation_id.value(), reputation_audit_history.value())
__values = append(__values, __args...)
__sets.SQL = __sets_sql
var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt)
obj.logStmt(__stmt, __values...)
reputation = &Reputation{}
err = obj.queryRowContext(ctx, __stmt, __values...).Scan(&reputation.Id, &reputation.AuditSuccessCount, &reputation.TotalAuditCount, &reputation.VettedAt, &reputation.CreatedAt, &reputation.UpdatedAt, &reputation.Contained, &reputation.Disqualified, &reputation.Suspended, &reputation.UnknownAuditSuspended, &reputation.OfflineSuspended, &reputation.UnderReview, &reputation.OnlineScore, &reputation.AuditHistory, &reputation.AuditReputationAlpha, &reputation.AuditReputationBeta, &reputation.UnknownAuditReputationAlpha, &reputation.UnknownAuditReputationBeta)
if err == sql.ErrNoRows {
return nil, nil
}
if err != nil {
return nil, obj.makeErr(err)
}
return reputation, nil
}
func (obj *pgxcockroachImpl) UpdateNoReturn_Reputation_By_Id(ctx context.Context,
reputation_id Reputation_Id_Field,
update Reputation_Update_Fields) (
@ -24095,6 +24321,18 @@ func (rx *Rx) Update_Reputation_By_Id(ctx context.Context,
return tx.Update_Reputation_By_Id(ctx, reputation_id, update)
}
func (rx *Rx) Update_Reputation_By_Id_And_AuditHistory(ctx context.Context,
reputation_id Reputation_Id_Field,
reputation_audit_history Reputation_AuditHistory_Field,
update Reputation_Update_Fields) (
reputation *Reputation, err error) {
var tx *Tx
if tx, err = rx.getTx(ctx); err != nil {
return
}
return tx.Update_Reputation_By_Id_And_AuditHistory(ctx, reputation_id, reputation_audit_history, update)
}
func (rx *Rx) Update_StripecoinpaymentsInvoiceProjectRecord_By_Id(ctx context.Context,
stripecoinpayments_invoice_project_record_id StripecoinpaymentsInvoiceProjectRecord_Id_Field,
update StripecoinpaymentsInvoiceProjectRecord_Update_Fields) (
@ -24804,6 +25042,12 @@ type Methods interface {
update Reputation_Update_Fields) (
reputation *Reputation, err error)
Update_Reputation_By_Id_And_AuditHistory(ctx context.Context,
reputation_id Reputation_Id_Field,
reputation_audit_history Reputation_AuditHistory_Field,
update Reputation_Update_Fields) (
reputation *Reputation, err error)
Update_StripecoinpaymentsInvoiceProjectRecord_By_Id(ctx context.Context,
stripecoinpayments_invoice_project_record_id StripecoinpaymentsInvoiceProjectRecord_Id_Field,
update StripecoinpaymentsInvoiceProjectRecord_Update_Fields) (

View File

@ -26,79 +26,93 @@ type reputations struct {
db *satelliteDB
}
// Update updates a node's reputation stats.
// The update is done in a loop to handle concurrent update calls and to avoid
// the need for a explicit transaction.
// There are two main steps go into the update process:
// 1. Get existing row for the node
// 2. Depends on the result of the first step,
// a. if existing row is returned, do compare-and-swap.
// b. if no row found, insert a new row.
func (reputations *reputations) Update(ctx context.Context, updateReq reputation.UpdateRequest, now time.Time) (_ *overlay.ReputationStatus, changed bool, err error) {
defer mon.Task()(&ctx)(&err)
nodeID := updateReq.NodeID
var dbNode *dbx.Reputation
var oldStatus overlay.ReputationStatus
err = reputations.db.WithTx(ctx, func(ctx context.Context, tx *dbx.Tx) (err error) {
_, err = tx.Tx.ExecContext(ctx, "SET TRANSACTION ISOLATION LEVEL SERIALIZABLE")
if err != nil {
return err
for {
// get existing reputation stats
dbNode, err := reputations.db.Get_Reputation_By_Id(ctx, dbx.Reputation_Id(updateReq.NodeID.Bytes()))
if err != nil && !errors.Is(err, sql.ErrNoRows) {
return nil, false, Error.Wrap(err)
}
dbNode, err = tx.Get_Reputation_By_Id(ctx, dbx.Reputation_Id(nodeID.Bytes()))
if errors.Is(err, sql.ErrNoRows) {
// if this is a new node, we will insert a new entry into the table
if dbNode == nil {
historyBytes, err := pb.Marshal(&internalpb.AuditHistory{})
if err != nil {
return err
return nil, false, Error.Wrap(err)
}
_, err = tx.Tx.ExecContext(ctx, `
INSERT INTO reputations (id, audit_history)
VALUES ($1, $2);
`, nodeID.Bytes(), historyBytes)
auditHistoryResponse, err := reputations.UpdateAuditHistory(ctx, historyBytes, updateReq, now)
if err != nil {
return err
return nil, false, Error.Wrap(err)
}
dbNode, err = tx.Get_Reputation_By_Id(ctx, dbx.Reputation_Id(nodeID.Bytes()))
if err != nil {
return err
// set default reputation stats for new node
newNode := dbx.Reputation{
Id: updateReq.NodeID.Bytes(),
UnknownAuditReputationAlpha: 1,
AuditReputationAlpha: 1,
OnlineScore: 1,
AuditHistory: auditHistoryResponse.History,
}
} else if err != nil {
return err
createFields := reputations.populateCreateFields(&newNode, updateReq, auditHistoryResponse, now)
stats, err := reputations.db.Create_Reputation(ctx, dbx.Reputation_Id(updateReq.NodeID.Bytes()), dbx.Reputation_AuditHistory(auditHistoryResponse.History), createFields)
if err != nil {
// if node has been added into the table during a concurrent
// Update call happened between Get and Insert, we will try again so the audit is recorded
// correctly
if dbx.IsConstraintError(err) {
mon.Event("reputations_update_query_retry_create")
continue
}
return nil, false, Error.Wrap(err)
}
rep := getNodeStatus(stats)
return &rep, !rep.Equal(overlay.ReputationStatus{}), nil
}
// do not update reputation if node is disqualified
if dbNode.Disqualified != nil {
return nil
return nil, false, nil
}
oldStatus = overlay.ReputationStatus{
Contained: dbNode.Contained,
Disqualified: dbNode.Disqualified,
UnknownAuditSuspended: dbNode.UnknownAuditSuspended,
OfflineSuspended: dbNode.OfflineSuspended,
VettedAt: dbNode.VettedAt,
}
oldStats := getNodeStatus(dbNode)
isUp := updateReq.AuditOutcome != reputation.AuditOffline
auditHistoryResponse, err := reputations.UpdateAuditHistory(ctx, dbNode.AuditHistory, now, isUp, updateReq.AuditHistory)
auditHistoryResponse, err := reputations.UpdateAuditHistory(ctx, dbNode.AuditHistory, updateReq, now)
if err != nil {
return err
return nil, false, Error.Wrap(err)
}
updateFields := reputations.populateUpdateFields(dbNode, updateReq, auditHistoryResponse, now)
dbNode, err = tx.Update_Reputation_By_Id(ctx, dbx.Reputation_Id(nodeID.Bytes()), updateFields)
oldAuditHistory := dbx.Reputation_AuditHistory(dbNode.AuditHistory)
dbNode, err = reputations.db.Update_Reputation_By_Id_And_AuditHistory(ctx, dbx.Reputation_Id(updateReq.NodeID.Bytes()), oldAuditHistory, updateFields)
if err != nil && !errors.Is(err, sql.ErrNoRows) {
return nil, false, Error.Wrap(err)
}
return err
})
if err != nil {
return nil, false, Error.Wrap(err)
// if update failed due to concurrent audit_history updates, we will try
// again to get the latest data and update it
if dbNode == nil {
mon.Event("reputations_update_query_retry_update")
continue
}
newStats := getNodeStatus(dbNode)
return &newStats, !newStats.Equal(oldStats), nil
}
newStatus := overlay.ReputationStatus{
Contained: dbNode.Contained,
Disqualified: dbNode.Disqualified,
UnknownAuditSuspended: dbNode.UnknownAuditSuspended,
OfflineSuspended: dbNode.OfflineSuspended,
VettedAt: dbNode.VettedAt,
}
return getNodeStatus(dbNode), !oldStatus.Equal(newStatus), nil
}
// SetNodeStatus updates node reputation status.
@ -269,8 +283,70 @@ func (reputations *reputations) UnsuspendNodeUnknownAudit(ctx context.Context, n
return Error.Wrap(err)
}
func (reputations *reputations) populateUpdateFields(dbNode *dbx.Reputation, updateReq reputation.UpdateRequest, auditHistoryResponse *reputation.UpdateAuditHistoryResponse, now time.Time) dbx.Reputation_Update_Fields {
func (reputations *reputations) populateCreateFields(dbNode *dbx.Reputation, updateReq reputation.UpdateRequest, auditHistoryResponse *reputation.UpdateAuditHistoryResponse, now time.Time) dbx.Reputation_Create_Fields {
update := reputations.populateUpdateNodeStats(dbNode, updateReq, auditHistoryResponse, now)
createFields := dbx.Reputation_Create_Fields{}
if update.VettedAt.set {
createFields.VettedAt = dbx.Reputation_VettedAt(update.VettedAt.value)
}
if update.TotalAuditCount.set {
createFields.TotalAuditCount = dbx.Reputation_TotalAuditCount(update.TotalAuditCount.value)
}
if update.AuditReputationAlpha.set {
createFields.AuditReputationAlpha = dbx.Reputation_AuditReputationAlpha(update.AuditReputationAlpha.value)
}
if update.AuditReputationBeta.set {
createFields.AuditReputationBeta = dbx.Reputation_AuditReputationBeta(update.AuditReputationBeta.value)
}
if update.Disqualified.set {
createFields.Disqualified = dbx.Reputation_Disqualified(update.Disqualified.value)
}
if update.UnknownAuditReputationAlpha.set {
createFields.UnknownAuditReputationAlpha = dbx.Reputation_UnknownAuditReputationAlpha(update.UnknownAuditReputationAlpha.value)
}
if update.UnknownAuditReputationBeta.set {
createFields.UnknownAuditReputationBeta = dbx.Reputation_UnknownAuditReputationBeta(update.UnknownAuditReputationBeta.value)
}
if update.UnknownAuditSuspended.set {
if update.UnknownAuditSuspended.isNil {
createFields.UnknownAuditSuspended = dbx.Reputation_UnknownAuditSuspended_Null()
} else {
createFields.UnknownAuditSuspended = dbx.Reputation_UnknownAuditSuspended(update.UnknownAuditSuspended.value)
}
}
if update.AuditSuccessCount.set {
createFields.AuditSuccessCount = dbx.Reputation_AuditSuccessCount(update.AuditSuccessCount.value)
}
if update.Contained.set {
createFields.Contained = dbx.Reputation_Contained(update.Contained.value)
}
if updateReq.AuditOutcome == reputation.AuditSuccess {
createFields.AuditSuccessCount = dbx.Reputation_AuditSuccessCount(dbNode.AuditSuccessCount + 1)
}
if update.OnlineScore.set {
createFields.OnlineScore = dbx.Reputation_OnlineScore(update.OnlineScore.value)
}
if update.OfflineSuspended.set {
if update.OfflineSuspended.isNil {
createFields.OfflineSuspended = dbx.Reputation_OfflineSuspended_Null()
} else {
createFields.OfflineSuspended = dbx.Reputation_OfflineSuspended(update.OfflineSuspended.value)
}
}
if update.OfflineUnderReview.set {
if update.OfflineUnderReview.isNil {
createFields.UnderReview = dbx.Reputation_UnderReview_Null()
} else {
createFields.UnderReview = dbx.Reputation_UnderReview(update.OfflineUnderReview.value)
}
}
return createFields
}
func (reputations *reputations) populateUpdateFields(dbNode *dbx.Reputation, updateReq reputation.UpdateRequest, auditHistoryResponse *reputation.UpdateAuditHistoryResponse, now time.Time) dbx.Reputation_Update_Fields {
update := reputations.populateUpdateNodeStats(dbNode, updateReq, auditHistoryResponse, now)
updateFields := dbx.Reputation_Update_Fields{
@ -563,8 +639,8 @@ type updateNodeStats struct {
OnlineScore float64Field
}
func getNodeStatus(dbNode *dbx.Reputation) *overlay.ReputationStatus {
return &overlay.ReputationStatus{
func getNodeStatus(dbNode *dbx.Reputation) overlay.ReputationStatus {
return overlay.ReputationStatus{
Contained: dbNode.Contained,
VettedAt: dbNode.VettedAt,
Disqualified: dbNode.Disqualified,