satellite/satellitedb: replace explicit transaction with dbx query for

UpdateReputation

Change-Id: I7c139ededea83d4b58107536c3a031c4f92d6eb4
This commit is contained in:
Yingrong Zhao 2021-08-04 13:33:47 -04:00 committed by Yingrong Zhao
parent 19852a767b
commit e4cc965c39
3 changed files with 466 additions and 32 deletions

View File

@ -175,6 +175,13 @@ update node (
noreturn noreturn
) )
update node (
where node.id = ?
where node.disqualified = null
where node.exit_finished_at = null
noreturn
)
// "Get" query; fails if node not found // "Get" query; fails if node not found
read one ( read one (
select node select node

View File

@ -14382,6 +14382,224 @@ func (obj *pgxImpl) UpdateNoReturn_Node_By_Id(ctx context.Context,
return nil return nil
} }
func (obj *pgxImpl) UpdateNoReturn_Node_By_Id_And_Disqualified_Is_Null_And_ExitFinishedAt_Is_Null(ctx context.Context,
node_id Node_Id_Field,
update Node_Update_Fields) (
err error) {
defer mon.Task()(&ctx)(&err)
var __sets = &__sqlbundle_Hole{}
var __embed_stmt = __sqlbundle_Literals{Join: "", SQLs: []__sqlbundle_SQL{__sqlbundle_Literal("UPDATE nodes SET "), __sets, __sqlbundle_Literal(" WHERE nodes.id = ? AND nodes.disqualified is NULL AND nodes.exit_finished_at is NULL")}}
__sets_sql := __sqlbundle_Literals{Join: ", "}
var __values []interface{}
var __args []interface{}
if update.Address._set {
__values = append(__values, update.Address.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("address = ?"))
}
if update.LastNet._set {
__values = append(__values, update.LastNet.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("last_net = ?"))
}
if update.LastIpPort._set {
__values = append(__values, update.LastIpPort.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("last_ip_port = ?"))
}
if update.Protocol._set {
__values = append(__values, update.Protocol.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("protocol = ?"))
}
if update.Type._set {
__values = append(__values, update.Type.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("type = ?"))
}
if update.Email._set {
__values = append(__values, update.Email.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("email = ?"))
}
if update.Wallet._set {
__values = append(__values, update.Wallet.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("wallet = ?"))
}
if update.WalletFeatures._set {
__values = append(__values, update.WalletFeatures.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("wallet_features = ?"))
}
if update.FreeDisk._set {
__values = append(__values, update.FreeDisk.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("free_disk = ?"))
}
if update.PieceCount._set {
__values = append(__values, update.PieceCount.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("piece_count = ?"))
}
if update.Major._set {
__values = append(__values, update.Major.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("major = ?"))
}
if update.Minor._set {
__values = append(__values, update.Minor.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("minor = ?"))
}
if update.Patch._set {
__values = append(__values, update.Patch.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("patch = ?"))
}
if update.Hash._set {
__values = append(__values, update.Hash.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("hash = ?"))
}
if update.Timestamp._set {
__values = append(__values, update.Timestamp.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("timestamp = ?"))
}
if update.Release._set {
__values = append(__values, update.Release.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("release = ?"))
}
if update.Latency90._set {
__values = append(__values, update.Latency90.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("latency_90 = ?"))
}
if update.AuditSuccessCount._set {
__values = append(__values, update.AuditSuccessCount.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("audit_success_count = ?"))
}
if update.TotalAuditCount._set {
__values = append(__values, update.TotalAuditCount.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("total_audit_count = ?"))
}
if update.VettedAt._set {
__values = append(__values, update.VettedAt.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("vetted_at = ?"))
}
if update.LastContactSuccess._set {
__values = append(__values, update.LastContactSuccess.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("last_contact_success = ?"))
}
if update.LastContactFailure._set {
__values = append(__values, update.LastContactFailure.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("last_contact_failure = ?"))
}
if update.Contained._set {
__values = append(__values, update.Contained.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("contained = ?"))
}
if update.Disqualified._set {
__values = append(__values, update.Disqualified.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("disqualified = ?"))
}
if update.Suspended._set {
__values = append(__values, update.Suspended.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("suspended = ?"))
}
if update.UnknownAuditSuspended._set {
__values = append(__values, update.UnknownAuditSuspended.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("unknown_audit_suspended = ?"))
}
if update.OfflineSuspended._set {
__values = append(__values, update.OfflineSuspended.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("offline_suspended = ?"))
}
if update.UnderReview._set {
__values = append(__values, update.UnderReview.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("under_review = ?"))
}
if update.OnlineScore._set {
__values = append(__values, update.OnlineScore.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("online_score = ?"))
}
if update.AuditReputationAlpha._set {
__values = append(__values, update.AuditReputationAlpha.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("audit_reputation_alpha = ?"))
}
if update.AuditReputationBeta._set {
__values = append(__values, update.AuditReputationBeta.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("audit_reputation_beta = ?"))
}
if update.UnknownAuditReputationAlpha._set {
__values = append(__values, update.UnknownAuditReputationAlpha.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("unknown_audit_reputation_alpha = ?"))
}
if update.UnknownAuditReputationBeta._set {
__values = append(__values, update.UnknownAuditReputationBeta.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("unknown_audit_reputation_beta = ?"))
}
if update.ExitInitiatedAt._set {
__values = append(__values, update.ExitInitiatedAt.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("exit_initiated_at = ?"))
}
if update.ExitLoopCompletedAt._set {
__values = append(__values, update.ExitLoopCompletedAt.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("exit_loop_completed_at = ?"))
}
if update.ExitFinishedAt._set {
__values = append(__values, update.ExitFinishedAt.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("exit_finished_at = ?"))
}
if update.ExitSuccess._set {
__values = append(__values, update.ExitSuccess.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("exit_success = ?"))
}
__now := obj.db.Hooks.Now().UTC()
__values = append(__values, __now)
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("updated_at = ?"))
__args = append(__args, node_id.value())
__values = append(__values, __args...)
__sets.SQL = __sets_sql
var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt)
obj.logStmt(__stmt, __values...)
_, err = obj.driver.ExecContext(ctx, __stmt, __values...)
if err != nil {
return obj.makeErr(err)
}
return nil
}
func (obj *pgxImpl) Update_AuditHistory_By_NodeId(ctx context.Context, func (obj *pgxImpl) Update_AuditHistory_By_NodeId(ctx context.Context,
audit_history_node_id AuditHistory_NodeId_Field, audit_history_node_id AuditHistory_NodeId_Field,
update AuditHistory_Update_Fields) ( update AuditHistory_Update_Fields) (
@ -20241,6 +20459,224 @@ func (obj *pgxcockroachImpl) UpdateNoReturn_Node_By_Id(ctx context.Context,
return nil return nil
} }
func (obj *pgxcockroachImpl) UpdateNoReturn_Node_By_Id_And_Disqualified_Is_Null_And_ExitFinishedAt_Is_Null(ctx context.Context,
node_id Node_Id_Field,
update Node_Update_Fields) (
err error) {
defer mon.Task()(&ctx)(&err)
var __sets = &__sqlbundle_Hole{}
var __embed_stmt = __sqlbundle_Literals{Join: "", SQLs: []__sqlbundle_SQL{__sqlbundle_Literal("UPDATE nodes SET "), __sets, __sqlbundle_Literal(" WHERE nodes.id = ? AND nodes.disqualified is NULL AND nodes.exit_finished_at is NULL")}}
__sets_sql := __sqlbundle_Literals{Join: ", "}
var __values []interface{}
var __args []interface{}
if update.Address._set {
__values = append(__values, update.Address.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("address = ?"))
}
if update.LastNet._set {
__values = append(__values, update.LastNet.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("last_net = ?"))
}
if update.LastIpPort._set {
__values = append(__values, update.LastIpPort.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("last_ip_port = ?"))
}
if update.Protocol._set {
__values = append(__values, update.Protocol.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("protocol = ?"))
}
if update.Type._set {
__values = append(__values, update.Type.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("type = ?"))
}
if update.Email._set {
__values = append(__values, update.Email.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("email = ?"))
}
if update.Wallet._set {
__values = append(__values, update.Wallet.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("wallet = ?"))
}
if update.WalletFeatures._set {
__values = append(__values, update.WalletFeatures.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("wallet_features = ?"))
}
if update.FreeDisk._set {
__values = append(__values, update.FreeDisk.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("free_disk = ?"))
}
if update.PieceCount._set {
__values = append(__values, update.PieceCount.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("piece_count = ?"))
}
if update.Major._set {
__values = append(__values, update.Major.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("major = ?"))
}
if update.Minor._set {
__values = append(__values, update.Minor.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("minor = ?"))
}
if update.Patch._set {
__values = append(__values, update.Patch.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("patch = ?"))
}
if update.Hash._set {
__values = append(__values, update.Hash.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("hash = ?"))
}
if update.Timestamp._set {
__values = append(__values, update.Timestamp.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("timestamp = ?"))
}
if update.Release._set {
__values = append(__values, update.Release.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("release = ?"))
}
if update.Latency90._set {
__values = append(__values, update.Latency90.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("latency_90 = ?"))
}
if update.AuditSuccessCount._set {
__values = append(__values, update.AuditSuccessCount.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("audit_success_count = ?"))
}
if update.TotalAuditCount._set {
__values = append(__values, update.TotalAuditCount.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("total_audit_count = ?"))
}
if update.VettedAt._set {
__values = append(__values, update.VettedAt.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("vetted_at = ?"))
}
if update.LastContactSuccess._set {
__values = append(__values, update.LastContactSuccess.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("last_contact_success = ?"))
}
if update.LastContactFailure._set {
__values = append(__values, update.LastContactFailure.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("last_contact_failure = ?"))
}
if update.Contained._set {
__values = append(__values, update.Contained.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("contained = ?"))
}
if update.Disqualified._set {
__values = append(__values, update.Disqualified.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("disqualified = ?"))
}
if update.Suspended._set {
__values = append(__values, update.Suspended.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("suspended = ?"))
}
if update.UnknownAuditSuspended._set {
__values = append(__values, update.UnknownAuditSuspended.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("unknown_audit_suspended = ?"))
}
if update.OfflineSuspended._set {
__values = append(__values, update.OfflineSuspended.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("offline_suspended = ?"))
}
if update.UnderReview._set {
__values = append(__values, update.UnderReview.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("under_review = ?"))
}
if update.OnlineScore._set {
__values = append(__values, update.OnlineScore.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("online_score = ?"))
}
if update.AuditReputationAlpha._set {
__values = append(__values, update.AuditReputationAlpha.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("audit_reputation_alpha = ?"))
}
if update.AuditReputationBeta._set {
__values = append(__values, update.AuditReputationBeta.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("audit_reputation_beta = ?"))
}
if update.UnknownAuditReputationAlpha._set {
__values = append(__values, update.UnknownAuditReputationAlpha.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("unknown_audit_reputation_alpha = ?"))
}
if update.UnknownAuditReputationBeta._set {
__values = append(__values, update.UnknownAuditReputationBeta.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("unknown_audit_reputation_beta = ?"))
}
if update.ExitInitiatedAt._set {
__values = append(__values, update.ExitInitiatedAt.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("exit_initiated_at = ?"))
}
if update.ExitLoopCompletedAt._set {
__values = append(__values, update.ExitLoopCompletedAt.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("exit_loop_completed_at = ?"))
}
if update.ExitFinishedAt._set {
__values = append(__values, update.ExitFinishedAt.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("exit_finished_at = ?"))
}
if update.ExitSuccess._set {
__values = append(__values, update.ExitSuccess.value())
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("exit_success = ?"))
}
__now := obj.db.Hooks.Now().UTC()
__values = append(__values, __now)
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("updated_at = ?"))
__args = append(__args, node_id.value())
__values = append(__values, __args...)
__sets.SQL = __sets_sql
var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt)
obj.logStmt(__stmt, __values...)
_, err = obj.driver.ExecContext(ctx, __stmt, __values...)
if err != nil {
return obj.makeErr(err)
}
return nil
}
func (obj *pgxcockroachImpl) Update_AuditHistory_By_NodeId(ctx context.Context, func (obj *pgxcockroachImpl) Update_AuditHistory_By_NodeId(ctx context.Context,
audit_history_node_id AuditHistory_NodeId_Field, audit_history_node_id AuditHistory_NodeId_Field,
update AuditHistory_Update_Fields) ( update AuditHistory_Update_Fields) (
@ -23525,6 +23961,17 @@ func (rx *Rx) UpdateNoReturn_Node_By_Id(ctx context.Context,
return tx.UpdateNoReturn_Node_By_Id(ctx, node_id, update) return tx.UpdateNoReturn_Node_By_Id(ctx, node_id, update)
} }
func (rx *Rx) UpdateNoReturn_Node_By_Id_And_Disqualified_Is_Null_And_ExitFinishedAt_Is_Null(ctx context.Context,
node_id Node_Id_Field,
update Node_Update_Fields) (
err error) {
var tx *Tx
if tx, err = rx.getTx(ctx); err != nil {
return
}
return tx.UpdateNoReturn_Node_By_Id_And_Disqualified_Is_Null_And_ExitFinishedAt_Is_Null(ctx, node_id, update)
}
func (rx *Rx) UpdateNoReturn_PeerIdentity_By_NodeId(ctx context.Context, func (rx *Rx) UpdateNoReturn_PeerIdentity_By_NodeId(ctx context.Context,
peer_identity_node_id PeerIdentity_NodeId_Field, peer_identity_node_id PeerIdentity_NodeId_Field,
update PeerIdentity_Update_Fields) ( update PeerIdentity_Update_Fields) (
@ -24295,6 +24742,11 @@ type Methods interface {
update Node_Update_Fields) ( update Node_Update_Fields) (
err error) err error)
UpdateNoReturn_Node_By_Id_And_Disqualified_Is_Null_And_ExitFinishedAt_Is_Null(ctx context.Context,
node_id Node_Id_Field,
update Node_Update_Fields) (
err error)
UpdateNoReturn_PeerIdentity_By_NodeId(ctx context.Context, UpdateNoReturn_PeerIdentity_By_NodeId(ctx context.Context,
peer_identity_node_id PeerIdentity_NodeId_Field, peer_identity_node_id PeerIdentity_NodeId_Field,
update PeerIdentity_Update_Fields) ( update PeerIdentity_Update_Fields) (

View File

@ -498,26 +498,6 @@ func (cache *overlaycache) reliable(ctx context.Context, criteria *overlay.NodeC
func (cache *overlaycache) UpdateReputation(ctx context.Context, id storj.NodeID, request *overlay.ReputationStatus) (err error) { func (cache *overlaycache) UpdateReputation(ctx context.Context, id storj.NodeID, request *overlay.ReputationStatus) (err error) {
defer mon.Task()(&ctx)(&err) defer mon.Task()(&ctx)(&err)
err = cache.db.WithTx(ctx, func(ctx context.Context, tx *dbx.Tx) (err error) {
_, err = tx.Tx.ExecContext(ctx, "SET TRANSACTION ISOLATION LEVEL SERIALIZABLE")
if err != nil {
return err
}
dbNode, err := tx.Get_Node_By_Id(ctx, dbx.Node_Id(id.Bytes()))
if err != nil {
return err
}
// do not update reputation if node has been disqualified already
if dbNode.Disqualified != nil {
return nil
}
// do not update reputation if node has gracefully exited
if dbNode.ExitFinishedAt != nil {
return nil
}
updateFields := dbx.Node_Update_Fields{} updateFields := dbx.Node_Update_Fields{}
updateFields.Contained = dbx.Node_Contained(request.Contained) updateFields.Contained = dbx.Node_Contained(request.Contained)
updateFields.UnknownAuditSuspended = dbx.Node_UnknownAuditSuspended_Raw(request.UnknownAuditSuspended) updateFields.UnknownAuditSuspended = dbx.Node_UnknownAuditSuspended_Raw(request.UnknownAuditSuspended)
@ -525,12 +505,7 @@ func (cache *overlaycache) UpdateReputation(ctx context.Context, id storj.NodeID
updateFields.OfflineSuspended = dbx.Node_OfflineSuspended_Raw(request.OfflineSuspended) updateFields.OfflineSuspended = dbx.Node_OfflineSuspended_Raw(request.OfflineSuspended)
updateFields.VettedAt = dbx.Node_VettedAt_Raw(request.VettedAt) updateFields.VettedAt = dbx.Node_VettedAt_Raw(request.VettedAt)
_, err = cache.db.Update_Node_By_Id(ctx, dbx.Node_Id(id.Bytes()), updateFields) err = cache.db.UpdateNoReturn_Node_By_Id_And_Disqualified_Is_Null_And_ExitFinishedAt_Is_Null(ctx, dbx.Node_Id(id.Bytes()), updateFields)
if err != nil {
return err
}
return nil
})
return Error.Wrap(err) return Error.Wrap(err)
} }