satellite/satellitedb: use transaction helpers in overlaycache

Transactions in our code that might need to work against CockroachDB
need to be retried in the event of a retryable error. The transaction
helper functions in dbutil do that automatically. I am changing this
code to use those helpers instead.

Change-Id: Icd3da71448a84c582c6afdc6b52d1f345fe9469f
This commit is contained in:
paul cannon 2019-12-19 04:03:20 -06:00 committed by paul cannon
parent b072e16ff7
commit 4203e25c54

View File

@ -517,70 +517,66 @@ func (cache *overlaycache) UpdateAddress(ctx context.Context, info *pb.Node, def
return overlay.ErrEmptyNode
}
tx, err := cache.db.Open(ctx)
if err != nil {
return Error.Wrap(err)
}
// TODO: use upsert
_, err = tx.Get_Node_By_Id(ctx, dbx.Node_Id(info.Id.Bytes()))
err = cache.db.WithTx(ctx, func(ctx context.Context, tx *dbx.Tx) (err error) {
// TODO: use upsert
_, err = tx.Get_Node_By_Id(ctx, dbx.Node_Id(info.Id.Bytes()))
address := info.Address
if address == nil {
address = &pb.NodeAddress{}
}
if err != nil {
// add the node to DB for first time
err = tx.CreateNoReturn_Node(
ctx,
dbx.Node_Id(info.Id.Bytes()),
dbx.Node_Address(address.Address),
dbx.Node_LastNet(info.LastIp),
dbx.Node_Protocol(int(address.Transport)),
dbx.Node_Type(int(pb.NodeType_INVALID)),
dbx.Node_Email(""),
dbx.Node_Wallet(""),
dbx.Node_FreeBandwidth(-1),
dbx.Node_FreeDisk(-1),
dbx.Node_Major(0),
dbx.Node_Minor(0),
dbx.Node_Patch(0),
dbx.Node_Hash(""),
dbx.Node_Timestamp(time.Time{}),
dbx.Node_Release(false),
dbx.Node_Latency90(0),
dbx.Node_AuditSuccessCount(0),
dbx.Node_TotalAuditCount(0),
dbx.Node_UptimeSuccessCount(0),
dbx.Node_TotalUptimeCount(0),
dbx.Node_LastContactSuccess(time.Now()),
dbx.Node_LastContactFailure(time.Time{}),
dbx.Node_Contained(false),
dbx.Node_AuditReputationAlpha(defaults.AuditReputationAlpha0),
dbx.Node_AuditReputationBeta(defaults.AuditReputationBeta0),
dbx.Node_UptimeReputationAlpha(defaults.UptimeReputationAlpha0),
dbx.Node_UptimeReputationBeta(defaults.UptimeReputationBeta0),
dbx.Node_ExitSuccess(false),
dbx.Node_Create_Fields{
Disqualified: dbx.Node_Disqualified_Null(),
},
)
if err != nil {
return Error.Wrap(errs.Combine(err, tx.Rollback()))
address := info.Address
if address == nil {
address = &pb.NodeAddress{}
}
} else {
err = tx.UpdateNoReturn_Node_By_Id(ctx, dbx.Node_Id(info.Id.Bytes()),
dbx.Node_Update_Fields{
Address: dbx.Node_Address(address.Address),
LastNet: dbx.Node_LastNet(info.LastIp),
Protocol: dbx.Node_Protocol(int(address.Transport)),
})
if err != nil {
return Error.Wrap(errs.Combine(err, tx.Rollback()))
}
}
return Error.Wrap(tx.Commit())
if err != nil {
if err != sql.ErrNoRows {
return err
}
// add the node to DB for first time
err = tx.CreateNoReturn_Node(
ctx,
dbx.Node_Id(info.Id.Bytes()),
dbx.Node_Address(address.Address),
dbx.Node_LastNet(info.LastIp),
dbx.Node_Protocol(int(address.Transport)),
dbx.Node_Type(int(pb.NodeType_INVALID)),
dbx.Node_Email(""),
dbx.Node_Wallet(""),
dbx.Node_FreeBandwidth(-1),
dbx.Node_FreeDisk(-1),
dbx.Node_Major(0),
dbx.Node_Minor(0),
dbx.Node_Patch(0),
dbx.Node_Hash(""),
dbx.Node_Timestamp(time.Time{}),
dbx.Node_Release(false),
dbx.Node_Latency90(0),
dbx.Node_AuditSuccessCount(0),
dbx.Node_TotalAuditCount(0),
dbx.Node_UptimeSuccessCount(0),
dbx.Node_TotalUptimeCount(0),
dbx.Node_LastContactSuccess(time.Now()),
dbx.Node_LastContactFailure(time.Time{}),
dbx.Node_Contained(false),
dbx.Node_AuditReputationAlpha(defaults.AuditReputationAlpha0),
dbx.Node_AuditReputationBeta(defaults.AuditReputationBeta0),
dbx.Node_UptimeReputationAlpha(defaults.UptimeReputationAlpha0),
dbx.Node_UptimeReputationBeta(defaults.UptimeReputationBeta0),
dbx.Node_ExitSuccess(false),
dbx.Node_Create_Fields{
Disqualified: dbx.Node_Disqualified_Null(),
},
)
} else {
err = tx.UpdateNoReturn_Node_By_Id(ctx, dbx.Node_Id(info.Id.Bytes()),
dbx.Node_Update_Fields{
Address: dbx.Node_Address(address.Address),
LastNet: dbx.Node_LastNet(info.LastIp),
Protocol: dbx.Node_Protocol(int(address.Transport)),
})
}
return err
})
return Error.Wrap(err)
}
// BatchUpdateStats updates multiple storagenode's stats in one transaction
@ -602,45 +598,47 @@ func (cache *overlaycache) BatchUpdateStats(ctx context.Context, updateRequests
}
}
tx, err := cache.db.Open(ctx)
doAppendAll := true
err = cache.db.WithTx(ctx, func(ctx context.Context, tx *dbx.Tx) (err error) {
var allSQL string
for _, updateReq := range updateSlice {
dbNode, err := tx.Get_Node_By_Id(ctx, dbx.Node_Id(updateReq.NodeID.Bytes()))
if err != nil {
doAppendAll = false
return err
}
// do not update reputation if node is disqualified
if dbNode.Disqualified != nil {
continue
}
updateNodeStats := populateUpdateNodeStats(dbNode, updateReq)
sql := buildUpdateStatement(updateNodeStats)
allSQL += sql
}
if allSQL != "" {
results, err := tx.Tx.Exec(allSQL)
if err != nil {
return err
}
_, err = results.RowsAffected()
if err != nil {
return err
}
}
return nil
})
if err != nil {
appendAll()
if doAppendAll {
appendAll()
}
return duf, Error.Wrap(err)
}
var allSQL string
for _, updateReq := range updateSlice {
dbNode, err := tx.Get_Node_By_Id(ctx, dbx.Node_Id(updateReq.NodeID.Bytes()))
if err != nil {
return nil, Error.Wrap(errs.Combine(err, tx.Rollback()))
}
// do not update reputation if node is disqualified
if dbNode.Disqualified != nil {
continue
}
updateNodeStats := populateUpdateNodeStats(dbNode, updateReq)
sql := buildUpdateStatement(updateNodeStats)
allSQL += sql
}
if allSQL != "" {
results, err := tx.Tx.Exec(allSQL)
if results == nil || err != nil {
appendAll()
return duf, errs.Combine(err, tx.Rollback())
}
_, err = results.RowsAffected()
if err != nil {
appendAll()
return duf, errs.Combine(err, tx.Rollback())
}
}
return duf, Error.Wrap(tx.Commit())
return duf, nil
}
var errlist errs.Group
@ -667,40 +665,39 @@ func (cache *overlaycache) UpdateStats(ctx context.Context, updateReq *overlay.U
defer mon.Task()(&ctx)(&err)
nodeID := updateReq.NodeID
tx, err := cache.db.Open(ctx)
var dbNode *dbx.Node
err = cache.db.WithTx(ctx, func(ctx context.Context, tx *dbx.Tx) (err error) {
dbNode, err = tx.Get_Node_By_Id(ctx, dbx.Node_Id(nodeID.Bytes()))
if err != nil {
return err
}
// do not update reputation if node is disqualified
if dbNode.Disqualified != nil {
return nil
}
updateFields := populateUpdateFields(dbNode, updateReq)
dbNode, err = tx.Update_Node_By_Id(ctx, dbx.Node_Id(nodeID.Bytes()), updateFields)
if err != nil {
return err
}
// Cleanup containment table too
_, err = tx.Delete_PendingAudits_By_NodeId(ctx, dbx.PendingAudits_NodeId(nodeID.Bytes()))
return err
})
if err != nil {
return nil, Error.Wrap(err)
}
dbNode, err := tx.Get_Node_By_Id(ctx, dbx.Node_Id(nodeID.Bytes()))
if err != nil {
return nil, Error.Wrap(errs.Combine(err, tx.Rollback()))
}
// do not update reputation if node is disqualified
if dbNode.Disqualified != nil {
return getNodeStats(dbNode), Error.Wrap(tx.Commit())
}
updateFields := populateUpdateFields(dbNode, updateReq)
dbNode, err = tx.Update_Node_By_Id(ctx, dbx.Node_Id(nodeID.Bytes()), updateFields)
if err != nil {
return nil, Error.Wrap(errs.Combine(err, tx.Rollback()))
}
// Cleanup containment table too
_, err = tx.Delete_PendingAudits_By_NodeId(ctx, dbx.PendingAudits_NodeId(nodeID.Bytes()))
if err != nil {
return nil, Error.Wrap(errs.Combine(err, tx.Rollback()))
}
// TODO: Allegedly tx.Get_Node_By_Id and tx.Update_Node_By_Id should never return a nil value for dbNode,
// however we've seen from some crashes that it does. We need to track down the cause of these crashes
// but for now we're adding a nil check to prevent a panic.
if dbNode == nil {
return nil, Error.Wrap(errs.Combine(errs.New("unable to get node by ID: %v", nodeID), tx.Rollback()))
return nil, Error.New("unable to get node by ID: %v", nodeID)
}
return getNodeStats(dbNode), Error.Wrap(tx.Commit())
return getNodeStats(dbNode), nil
}
// UpdateNodeInfo updates the following fields for a given node ID:
@ -747,82 +744,83 @@ func (cache *overlaycache) UpdateNodeInfo(ctx context.Context, nodeID storj.Node
func (cache *overlaycache) UpdateUptime(ctx context.Context, nodeID storj.NodeID, isUp bool, lambda, weight, uptimeDQ float64) (stats *overlay.NodeStats, err error) {
defer mon.Task()(&ctx)(&err)
tx, err := cache.db.Open(ctx)
var dbNode *dbx.Node
err = cache.db.WithTx(ctx, func(ctx context.Context, tx *dbx.Tx) (err error) {
dbNode, err = tx.Get_Node_By_Id(ctx, dbx.Node_Id(nodeID.Bytes()))
if err != nil {
return err
}
// do not update reputation if node is disqualified
if dbNode.Disqualified != nil {
return nil
}
updateFields := dbx.Node_Update_Fields{}
uptimeAlpha, uptimeBeta, totalUptimeCount := updateReputation(
isUp,
dbNode.UptimeReputationAlpha,
dbNode.UptimeReputationBeta,
lambda,
weight,
dbNode.TotalUptimeCount,
)
mon.FloatVal("uptime_reputation_alpha").Observe(uptimeAlpha)
mon.FloatVal("uptime_reputation_beta").Observe(uptimeBeta)
updateFields.UptimeReputationAlpha = dbx.Node_UptimeReputationAlpha(uptimeAlpha)
updateFields.UptimeReputationBeta = dbx.Node_UptimeReputationBeta(uptimeBeta)
updateFields.TotalUptimeCount = dbx.Node_TotalUptimeCount(totalUptimeCount)
uptimeRep := uptimeAlpha / (uptimeAlpha + uptimeBeta)
if uptimeRep <= uptimeDQ {
updateFields.Disqualified = dbx.Node_Disqualified(time.Now().UTC())
}
lastContactSuccess := dbNode.LastContactSuccess
lastContactFailure := dbNode.LastContactFailure
mon.Meter("uptime_updates").Mark(1)
if isUp {
updateFields.UptimeSuccessCount = dbx.Node_UptimeSuccessCount(dbNode.UptimeSuccessCount + 1)
updateFields.LastContactSuccess = dbx.Node_LastContactSuccess(time.Now())
mon.Meter("uptime_update_successes").Mark(1)
// we have seen this node in the past 24 hours
if time.Since(lastContactFailure) > time.Hour*24 {
mon.Meter("uptime_seen_24h").Mark(1)
}
// we have seen this node in the past week
if time.Since(lastContactFailure) > time.Hour*24*7 {
mon.Meter("uptime_seen_week").Mark(1)
}
} else {
updateFields.LastContactFailure = dbx.Node_LastContactFailure(time.Now())
mon.Meter("uptime_update_failures").Mark(1)
// it's been over 24 hours since we've seen this node
if time.Since(lastContactSuccess) > time.Hour*24 {
mon.Meter("uptime_not_seen_24h").Mark(1)
}
// it's been over a week since we've seen this node
if time.Since(lastContactSuccess) > time.Hour*24*7 {
mon.Meter("uptime_not_seen_week").Mark(1)
}
}
dbNode, err = tx.Update_Node_By_Id(ctx, dbx.Node_Id(nodeID.Bytes()), updateFields)
return err
})
if err != nil {
return nil, Error.Wrap(err)
}
dbNode, err := tx.Get_Node_By_Id(ctx, dbx.Node_Id(nodeID.Bytes()))
if err != nil {
return nil, Error.Wrap(errs.Combine(err, tx.Rollback()))
}
// do not update reputation if node is disqualified
if dbNode.Disqualified != nil {
return getNodeStats(dbNode), Error.Wrap(tx.Commit())
}
updateFields := dbx.Node_Update_Fields{}
uptimeAlpha, uptimeBeta, totalUptimeCount := updateReputation(
isUp,
dbNode.UptimeReputationAlpha,
dbNode.UptimeReputationBeta,
lambda,
weight,
dbNode.TotalUptimeCount,
)
mon.FloatVal("uptime_reputation_alpha").Observe(uptimeAlpha)
mon.FloatVal("uptime_reputation_beta").Observe(uptimeBeta)
updateFields.UptimeReputationAlpha = dbx.Node_UptimeReputationAlpha(uptimeAlpha)
updateFields.UptimeReputationBeta = dbx.Node_UptimeReputationBeta(uptimeBeta)
updateFields.TotalUptimeCount = dbx.Node_TotalUptimeCount(totalUptimeCount)
uptimeRep := uptimeAlpha / (uptimeAlpha + uptimeBeta)
if uptimeRep <= uptimeDQ {
updateFields.Disqualified = dbx.Node_Disqualified(time.Now().UTC())
}
lastContactSuccess := dbNode.LastContactSuccess
lastContactFailure := dbNode.LastContactFailure
mon.Meter("uptime_updates").Mark(1)
if isUp {
updateFields.UptimeSuccessCount = dbx.Node_UptimeSuccessCount(dbNode.UptimeSuccessCount + 1)
updateFields.LastContactSuccess = dbx.Node_LastContactSuccess(time.Now())
mon.Meter("uptime_update_successes").Mark(1)
// we have seen this node in the past 24 hours
if time.Since(lastContactFailure) > time.Hour*24 {
mon.Meter("uptime_seen_24h").Mark(1)
}
// we have seen this node in the past week
if time.Since(lastContactFailure) > time.Hour*24*7 {
mon.Meter("uptime_seen_week").Mark(1)
}
} else {
updateFields.LastContactFailure = dbx.Node_LastContactFailure(time.Now())
mon.Meter("uptime_update_failures").Mark(1)
// it's been over 24 hours since we've seen this node
if time.Since(lastContactSuccess) > time.Hour*24 {
mon.Meter("uptime_not_seen_24h").Mark(1)
}
// it's been over a week since we've seen this node
if time.Since(lastContactSuccess) > time.Hour*24*7 {
mon.Meter("uptime_not_seen_week").Mark(1)
}
}
dbNode, err = tx.Update_Node_By_Id(ctx, dbx.Node_Id(nodeID.Bytes()), updateFields)
if err != nil {
return nil, Error.Wrap(errs.Combine(err, tx.Rollback()))
}
// TODO: Allegedly tx.Get_Node_By_Id and tx.Update_Node_By_Id should never return a nil value for dbNode,
// however we've seen from some crashes that it does. We need to track down the cause of these crashes
// but for now we're adding a nil check to prevent a panic.
if dbNode == nil {
return nil, Error.Wrap(errs.Combine(errs.New("unable to get node by ID: %v", nodeID), tx.Rollback()))
return nil, Error.New("unable to get node by ID: %v", nodeID)
}
return getNodeStats(dbNode), Error.Wrap(tx.Commit())
return getNodeStats(dbNode), nil
}
// AllPieceCounts returns a map of node IDs to piece counts from the db.