satellite/satellitedb: use transaction helpers in containment

Transactions in our code that might need to work against CockroachDB
need to be retried in the event of a retryable error. The transaction
helper functions in dbutil do that automatically. I am changing this
code to use those helpers instead.

Change-Id: I660540885a0784fae844cf99376d1537e208fa69
This commit is contained in:
paul cannon 2019-12-19 03:52:17 -06:00 committed by paul cannon
parent 2549c601e9
commit f3aee1b758

View File

@ -8,8 +8,6 @@ import (
"context" "context"
"database/sql" "database/sql"
"github.com/zeebo/errs"
"storj.io/common/pb" "storj.io/common/pb"
"storj.io/common/storj" "storj.io/common/storj"
"storj.io/storj/satellite/audit" "storj.io/storj/satellite/audit"
@ -41,76 +39,64 @@ func (containment *containment) Get(ctx context.Context, id pb.NodeID) (_ *audit
// IncrementPending creates a new pending audit entry, or increases its reverify count if it already exists // IncrementPending creates a new pending audit entry, or increases its reverify count if it already exists
func (containment *containment) IncrementPending(ctx context.Context, pendingAudit *audit.PendingAudit) (err error) { func (containment *containment) IncrementPending(ctx context.Context, pendingAudit *audit.PendingAudit) (err error) {
defer mon.Task()(&ctx)(&err) defer mon.Task()(&ctx)(&err)
tx, err := containment.db.Open(ctx) err = containment.db.WithTx(ctx, func(ctx context.Context, tx *dbx.Tx) error {
if err != nil { existingAudit, err := tx.Get_PendingAudits_By_NodeId(ctx, dbx.PendingAudits_NodeId(pendingAudit.NodeID.Bytes()))
return audit.ContainError.Wrap(err) switch err {
} case sql.ErrNoRows:
statement := containment.db.Rebind(
existingAudit, err := tx.Get_PendingAudits_By_NodeId(ctx, dbx.PendingAudits_NodeId(pendingAudit.NodeID.Bytes())) `INSERT INTO pending_audits (node_id, piece_id, stripe_index, share_size, expected_share_hash, reverify_count, path)
switch err {
case sql.ErrNoRows:
statement := containment.db.Rebind(
`INSERT INTO pending_audits (node_id, piece_id, stripe_index, share_size, expected_share_hash, reverify_count, path)
VALUES (?, ?, ?, ?, ?, ?, ?)`, VALUES (?, ?, ?, ?, ?, ?, ?)`,
) )
_, err = tx.Tx.ExecContext(ctx, statement, pendingAudit.NodeID.Bytes(), pendingAudit.PieceID.Bytes(), pendingAudit.StripeIndex, _, err = tx.Tx.ExecContext(ctx, statement, pendingAudit.NodeID.Bytes(), pendingAudit.PieceID.Bytes(), pendingAudit.StripeIndex,
pendingAudit.ShareSize, pendingAudit.ExpectedShareHash, pendingAudit.ReverifyCount, []byte(pendingAudit.Path)) pendingAudit.ShareSize, pendingAudit.ExpectedShareHash, pendingAudit.ReverifyCount, []byte(pendingAudit.Path))
if err != nil { if err != nil {
return audit.ContainError.Wrap(errs.Combine(err, tx.Rollback())) return err
} }
case nil: case nil:
if !bytes.Equal(existingAudit.ExpectedShareHash, pendingAudit.ExpectedShareHash) { if !bytes.Equal(existingAudit.ExpectedShareHash, pendingAudit.ExpectedShareHash) {
return audit.ContainError.Wrap(errs.Combine(audit.ErrAlreadyExists.New("%v", pendingAudit.NodeID), tx.Rollback())) return audit.ErrAlreadyExists.New("%v", pendingAudit.NodeID)
} }
statement := containment.db.Rebind( statement := containment.db.Rebind(
`UPDATE pending_audits SET reverify_count = pending_audits.reverify_count + 1 `UPDATE pending_audits SET reverify_count = pending_audits.reverify_count + 1
WHERE pending_audits.node_id=?`, WHERE pending_audits.node_id=?`,
) )
_, err = tx.Tx.ExecContext(ctx, statement, pendingAudit.NodeID.Bytes()) _, err = tx.Tx.ExecContext(ctx, statement, pendingAudit.NodeID.Bytes())
if err != nil { if err != nil {
return audit.ContainError.Wrap(errs.Combine(err, tx.Rollback())) return err
}
default:
return err
} }
default:
return audit.ContainError.Wrap(errs.Combine(err, tx.Rollback()))
}
updateContained := dbx.Node_Update_Fields{ updateContained := dbx.Node_Update_Fields{
Contained: dbx.Node_Contained(true), Contained: dbx.Node_Contained(true),
} }
err = tx.UpdateNoReturn_Node_By_Id(ctx, dbx.Node_Id(pendingAudit.NodeID.Bytes()), updateContained) return tx.UpdateNoReturn_Node_By_Id(ctx, dbx.Node_Id(pendingAudit.NodeID.Bytes()), updateContained)
if err != nil { })
return audit.ContainError.Wrap(errs.Combine(err, tx.Rollback())) return audit.ContainError.Wrap(err)
}
return audit.ContainError.Wrap(tx.Commit())
} }
// Delete deletes the pending audit // Delete deletes the pending audit
func (containment *containment) Delete(ctx context.Context, id pb.NodeID) (_ bool, err error) { func (containment *containment) Delete(ctx context.Context, id pb.NodeID) (isDeleted bool, err error) {
defer mon.Task()(&ctx)(&err) defer mon.Task()(&ctx)(&err)
if id.IsZero() { if id.IsZero() {
return false, audit.ContainError.New("node ID empty") return false, audit.ContainError.New("node ID empty")
} }
tx, err := containment.db.Open(ctx) err = containment.db.WithTx(ctx, func(ctx context.Context, tx *dbx.Tx) (err error) {
if err != nil { isDeleted, err = tx.Delete_PendingAudits_By_NodeId(ctx, dbx.PendingAudits_NodeId(id.Bytes()))
return false, audit.ContainError.Wrap(err) if err != nil {
} return err
}
isDeleted, err := tx.Delete_PendingAudits_By_NodeId(ctx, dbx.PendingAudits_NodeId(id.Bytes())) updateContained := dbx.Node_Update_Fields{
if err != nil { Contained: dbx.Node_Contained(false),
return isDeleted, audit.ContainError.Wrap(errs.Combine(audit.ErrContainDelete.Wrap(err), tx.Rollback())) }
}
updateContained := dbx.Node_Update_Fields{ return tx.UpdateNoReturn_Node_By_Id(ctx, dbx.Node_Id(id.Bytes()), updateContained)
Contained: dbx.Node_Contained(false), })
} return isDeleted, audit.ContainError.Wrap(err)
err = tx.UpdateNoReturn_Node_By_Id(ctx, dbx.Node_Id(id.Bytes()), updateContained)
if err != nil {
return isDeleted, audit.ContainError.Wrap(errs.Combine(err, tx.Rollback()))
}
return isDeleted, audit.ContainError.Wrap(tx.Commit())
} }
func convertDBPending(ctx context.Context, info *dbx.PendingAudits) (_ *audit.PendingAudit, err error) { func convertDBPending(ctx context.Context, info *dbx.PendingAudits) (_ *audit.PendingAudit, err error) {