2019-12-19 08:58:21 +00:00
|
|
|
// Copyright (C) 2019 Storj Labs, Inc.
|
|
|
|
// See LICENSE for copying information.
|
|
|
|
|
|
|
|
// Package txutil provides safe transaction-encapsulation functions which have retry
|
|
|
|
// semantics as necessary.
|
|
|
|
package txutil
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"database/sql"
|
2020-01-30 19:38:25 +00:00
|
|
|
"time"
|
2019-12-19 08:58:21 +00:00
|
|
|
|
2019-11-08 20:40:39 +00:00
|
|
|
"github.com/spacemonkeygo/monkit/v3"
|
2019-12-19 08:58:21 +00:00
|
|
|
"github.com/zeebo/errs"
|
2020-01-08 13:40:19 +00:00
|
|
|
|
2020-09-30 09:27:39 +01:00
|
|
|
"storj.io/storj/private/dbutil/pgutil/pgerrcode"
|
2020-01-17 20:07:00 +00:00
|
|
|
"storj.io/storj/private/tagsql"
|
2019-12-19 08:58:21 +00:00
|
|
|
)
|
|
|
|
|
2020-01-30 19:38:25 +00:00
|
|
|
var mon = monkit.Package()
|
2019-12-19 08:58:21 +00:00
|
|
|
|
|
|
|
// WithTx starts a transaction on the given sql.DB. The transaction is started in the appropriate
|
|
|
|
// manner, and will be restarted if appropriate. While in the transaction, fn is called with a
|
|
|
|
// handle to the transaction in order to make use of it. If fn returns an error, the transaction
|
|
|
|
// is rolled back. If fn returns nil, the transaction is committed.
|
|
|
|
//
|
|
|
|
// If fn has any side effects outside of changes to the database, they must be idempotent! fn may
|
|
|
|
// be called more than one time.
|
2020-01-30 19:38:25 +00:00
|
|
|
func WithTx(ctx context.Context, db tagsql.DB, txOpts *sql.TxOptions, fn func(context.Context, tagsql.Tx) error) (err error) {
|
|
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
|
|
|
|
start := time.Now()
|
|
|
|
|
|
|
|
for i := 0; ; i++ {
|
2020-02-14 19:36:45 +00:00
|
|
|
var retryErr error
|
2020-01-30 19:38:25 +00:00
|
|
|
err, rollbackErr := withTxOnce(ctx, db, txOpts, fn)
|
2020-04-08 19:54:18 +01:00
|
|
|
// if we had any error, check to see if we should retry.
|
|
|
|
if err != nil || rollbackErr != nil {
|
|
|
|
// we will only retry if we have enough resources (duration and count).
|
|
|
|
if dur := time.Since(start); dur < 5*time.Minute && i < 10 {
|
|
|
|
// even though the resources (duration and count) allow us to issue a retry,
|
|
|
|
// we only should if the error claims we should.
|
2020-09-30 09:27:39 +01:00
|
|
|
if code := pgerrcode.FromError(err); code == "CR000" || code == "40001" {
|
2020-04-08 19:54:18 +01:00
|
|
|
continue
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
// we aren't issuing a retry due to resources (duration and count), so
|
|
|
|
// include a retry error in the output so that we know something is wrong.
|
|
|
|
retryErr = errs.New("unable to retry: duration:%v attempts:%d", dur, i)
|
2020-01-30 19:38:25 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
mon.IntVal("transaction_retries").Observe(int64(i))
|
2020-02-14 19:36:45 +00:00
|
|
|
return errs.Wrap(errs.Combine(err, rollbackErr, retryErr))
|
2020-01-30 19:38:25 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// withTxOnce creates a transaction, ensures that it is eventually released (commit or rollback)
|
|
|
|
// and passes it to the provided callback. It does not handle retries or anything, delegating
|
|
|
|
// that to callers.
|
|
|
|
func withTxOnce(ctx context.Context, db tagsql.DB, txOpts *sql.TxOptions, fn func(context.Context, tagsql.Tx) error) (err, rollbackErr error) {
|
|
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
|
2019-12-19 08:58:21 +00:00
|
|
|
tx, err := db.BeginTx(ctx, txOpts)
|
|
|
|
if err != nil {
|
2020-01-30 19:38:25 +00:00
|
|
|
return errs.Wrap(err), nil
|
2019-12-19 08:58:21 +00:00
|
|
|
}
|
2020-01-30 19:38:25 +00:00
|
|
|
defer func() {
|
|
|
|
if err == nil {
|
|
|
|
err = tx.Commit()
|
|
|
|
} else {
|
|
|
|
rollbackErr = tx.Rollback()
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
|
|
|
|
return fn(ctx, tx), nil
|
|
|
|
}
|