private/dbutil: retry single statements on cockroachdb
This ought to make it so that all single statements (Exec- or Query-) on a CockroachDB backend will get retried as necessary. As there is no need for savepoints to be allocated or released in this case, there is no round-trip overhead except when statements actually do need to be retried. Change-Id: Ibd7f1725ff727477c456cb309120d080f3cd7099
This commit is contained in:
parent
665ed3b6b1
commit
5a1838bc28
1
go.mod
1
go.mod
@ -50,6 +50,7 @@ require (
|
||||
github.com/hashicorp/raft v1.0.0 // indirect
|
||||
github.com/howeyc/gopass v0.0.0-20170109162249-bf9dde6d0d2c // indirect
|
||||
github.com/inconshreveable/go-update v0.0.0-20160112193335-8152e7eb6ccf // indirect
|
||||
github.com/jackc/pgx v3.2.0+incompatible
|
||||
github.com/jtolds/gls v4.2.1+incompatible // indirect
|
||||
github.com/jtolds/go-luar v0.0.0-20170419063437-0786921db8c0
|
||||
github.com/jtolds/monkit-hw v0.0.0-20190108155550-0f753668cf20
|
||||
|
@ -4,11 +4,16 @@
|
||||
package cockroachutil
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"database/sql/driver"
|
||||
"errors"
|
||||
"reflect"
|
||||
"strings"
|
||||
|
||||
"github.com/jackc/pgx"
|
||||
"github.com/lib/pq"
|
||||
"github.com/zeebo/errs"
|
||||
)
|
||||
|
||||
// Driver is the type for the "cockroach" sql/database driver.
|
||||
@ -21,11 +26,6 @@ type Driver struct {
|
||||
|
||||
// Open opens a new cockroachDB connection.
|
||||
func (cd *Driver) Open(name string) (driver.Conn, error) {
|
||||
return Open(name)
|
||||
}
|
||||
|
||||
// Open opens a new cockroachDB connection.
|
||||
func Open(name string) (driver.Conn, error) {
|
||||
name = translateName(name)
|
||||
return pq.Open(name)
|
||||
}
|
||||
@ -38,7 +38,188 @@ func (cd *Driver) OpenConnector(name string) (driver.Connector, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &Connector{pgConnector}, nil
|
||||
return &cockroachConnector{pgConnector}, nil
|
||||
}
|
||||
|
||||
// cockroachConnector is a thin wrapper around a pq-based connector. This allows
|
||||
// Driver to supply our custom cockroachConn type for connections.
|
||||
type cockroachConnector struct {
|
||||
pgConnector driver.Connector
|
||||
}
|
||||
|
||||
// Driver returns the driver being used for this connector.
|
||||
func (c *cockroachConnector) Driver() driver.Driver {
|
||||
return &Driver{}
|
||||
}
|
||||
|
||||
// Connect creates a new connection using the connector.
|
||||
func (c *cockroachConnector) Connect(ctx context.Context) (driver.Conn, error) {
|
||||
pgConn, err := c.pgConnector.Connect(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if pgConnAll, ok := pgConn.(connAll); ok {
|
||||
return &cockroachConn{pgConnAll}, nil
|
||||
}
|
||||
return nil, errs.New("Underlying connector type %T does not implement connAll?!", pgConn)
|
||||
}
|
||||
|
||||
type connAll interface {
|
||||
driver.Conn
|
||||
driver.ConnBeginTx
|
||||
driver.ExecerContext
|
||||
driver.QueryerContext
|
||||
}
|
||||
|
||||
// cockroachConn is a connection to a database. It is not used concurrently by multiple goroutines.
|
||||
type cockroachConn struct {
|
||||
underlying connAll
|
||||
}
|
||||
|
||||
// Assert that cockroachConn fulfills connAll.
|
||||
var _ connAll = (*cockroachConn)(nil)
|
||||
|
||||
// Close closes the cockroachConn.
|
||||
func (c *cockroachConn) Close() error {
|
||||
return c.underlying.Close()
|
||||
}
|
||||
|
||||
// ExecContext (when implemented by a driver.Conn) provides ExecContext
|
||||
// functionality to a sql.DB instance. This implementation provides
|
||||
// retry semantics for single statements.
|
||||
func (c *cockroachConn) ExecContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Result, error) {
|
||||
result, err := c.underlying.ExecContext(ctx, query, args)
|
||||
for err != nil && !c.isInTransaction() && needsRetry(err) {
|
||||
result, err = c.underlying.ExecContext(ctx, query, args)
|
||||
}
|
||||
return result, err
|
||||
}
|
||||
|
||||
// QueryContext (when implemented by a driver.Conn) provides QueryContext
|
||||
// functionality to a sql.DB instance. This implementation provides
|
||||
// retry semantics for single statements.
|
||||
func (c *cockroachConn) QueryContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Rows, error) {
|
||||
result, err := c.underlying.QueryContext(ctx, query, args)
|
||||
for err != nil && !c.isInTransaction() && needsRetry(err) {
|
||||
result, err = c.underlying.QueryContext(ctx, query, args)
|
||||
}
|
||||
return result, err
|
||||
}
|
||||
|
||||
// Begin starts a new transaction.
|
||||
func (c *cockroachConn) Begin() (driver.Tx, error) {
|
||||
return c.BeginTx(context.Background(), driver.TxOptions{})
|
||||
}
|
||||
|
||||
// BeginTx begins a new transaction using the specified context and with the specified options.
|
||||
func (c *cockroachConn) BeginTx(ctx context.Context, opts driver.TxOptions) (driver.Tx, error) {
|
||||
return c.underlying.BeginTx(ctx, opts)
|
||||
}
|
||||
|
||||
// Prepare prepares a statement for future execution.
|
||||
func (c *cockroachConn) Prepare(query string) (driver.Stmt, error) {
|
||||
pqStmt, err := c.underlying.Prepare(query)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
adapted, ok := pqStmt.(stmtAll)
|
||||
if !ok {
|
||||
return nil, errs.New("Stmt type %T does not provide stmtAll?!", adapted)
|
||||
}
|
||||
return &cockroachStmt{underlyingStmt: adapted, conn: c}, nil
|
||||
}
|
||||
|
||||
type transactionStatus byte
|
||||
|
||||
const (
|
||||
txnStatusIdle transactionStatus = 'I'
|
||||
txnStatusIdleInTransaction transactionStatus = 'T'
|
||||
txnStatusInFailedTransaction transactionStatus = 'E'
|
||||
)
|
||||
|
||||
func (c *cockroachConn) txnStatus() transactionStatus {
|
||||
// access c.underlying -> c.underlying.(*pq.conn) -> (*c.underlying.(*pq.conn)).txnStatus
|
||||
//
|
||||
// this is of course brittle if lib/pq internals change, so a test is necessary to make
|
||||
// sure we stay on the same page.
|
||||
return transactionStatus(reflect.ValueOf(c.underlying).Elem().Field(4).Uint())
|
||||
}
|
||||
|
||||
func (c *cockroachConn) isInTransaction() bool {
|
||||
txnStatus := c.txnStatus()
|
||||
return txnStatus == txnStatusIdleInTransaction || txnStatus == txnStatusInFailedTransaction
|
||||
}
|
||||
|
||||
type stmtAll interface {
|
||||
driver.Stmt
|
||||
driver.StmtExecContext
|
||||
driver.StmtQueryContext
|
||||
}
|
||||
|
||||
type cockroachStmt struct {
|
||||
underlyingStmt stmtAll
|
||||
conn *cockroachConn
|
||||
}
|
||||
|
||||
// Assert that cockroachStmt satisfies StmtExecContext and StmtQueryContext.
|
||||
var _ stmtAll = (*cockroachStmt)(nil)
|
||||
|
||||
// Close closes a prepared statement.
|
||||
func (stmt *cockroachStmt) Close() error {
|
||||
return stmt.underlyingStmt.Close()
|
||||
}
|
||||
|
||||
// NumInput returns the number of placeholder parameters.
|
||||
func (stmt *cockroachStmt) NumInput() int {
|
||||
return stmt.underlyingStmt.NumInput()
|
||||
}
|
||||
|
||||
// Exec executes a SQL statement in the background context.
|
||||
func (stmt *cockroachStmt) Exec(args []driver.Value) (driver.Result, error) {
|
||||
// since (driver.Stmt).Exec() is deprecated, we translate our Value args to NamedValue args
|
||||
// and pass in background context to ExecContext instead.
|
||||
namedArgs := make([]driver.NamedValue, len(args))
|
||||
for i, arg := range args {
|
||||
namedArgs[i] = driver.NamedValue{Ordinal: i + 1, Value: arg}
|
||||
}
|
||||
result, err := stmt.underlyingStmt.ExecContext(context.Background(), namedArgs)
|
||||
for err != nil && !stmt.conn.isInTransaction() && needsRetry(err) {
|
||||
result, err = stmt.underlyingStmt.ExecContext(context.Background(), namedArgs)
|
||||
}
|
||||
return result, err
|
||||
}
|
||||
|
||||
// Query executes a query in the background context.
|
||||
func (stmt *cockroachStmt) Query(args []driver.Value) (driver.Rows, error) {
|
||||
// since (driver.Stmt).Query() is deprecated, we translate our Value args to NamedValue args
|
||||
// and pass in background context to QueryContext instead.
|
||||
namedArgs := make([]driver.NamedValue, len(args))
|
||||
for i, arg := range args {
|
||||
namedArgs[i] = driver.NamedValue{Ordinal: i + 1, Value: arg}
|
||||
}
|
||||
result, err := stmt.underlyingStmt.QueryContext(context.Background(), namedArgs)
|
||||
for err != nil && !stmt.conn.isInTransaction() && needsRetry(err) {
|
||||
result, err = stmt.underlyingStmt.QueryContext(context.Background(), namedArgs)
|
||||
}
|
||||
return result, err
|
||||
}
|
||||
|
||||
// ExecContext executes SQL statements in the specified context.
|
||||
func (stmt *cockroachStmt) ExecContext(ctx context.Context, args []driver.NamedValue) (driver.Result, error) {
|
||||
result, err := stmt.underlyingStmt.ExecContext(ctx, args)
|
||||
for err != nil && !stmt.conn.isInTransaction() && needsRetry(err) {
|
||||
result, err = stmt.underlyingStmt.ExecContext(ctx, args)
|
||||
}
|
||||
return result, err
|
||||
}
|
||||
|
||||
// QueryContext executes a query in the specified context.
|
||||
func (stmt *cockroachStmt) QueryContext(ctx context.Context, args []driver.NamedValue) (driver.Rows, error) {
|
||||
rows, err := stmt.underlyingStmt.QueryContext(ctx, args)
|
||||
for err != nil && !stmt.conn.isInTransaction() && needsRetry(err) {
|
||||
rows, err = stmt.underlyingStmt.QueryContext(ctx, args)
|
||||
}
|
||||
return rows, err
|
||||
}
|
||||
|
||||
// translateName changes the scheme name in a `cockroach://` URL to
|
||||
@ -50,18 +231,36 @@ func translateName(name string) string {
|
||||
return name
|
||||
}
|
||||
|
||||
// Connector is a thin wrapper around a pq-based connector. This allows
|
||||
// Driver to satisfy driver.DriverContext, and avoids weird breakage if
|
||||
// and when we upgrade from pq 1.0 to pq 1.2 or higher.
|
||||
type Connector struct {
|
||||
driver.Connector
|
||||
// borrowed from code in crdb
|
||||
func needsRetry(err error) bool {
|
||||
code := errCode(err)
|
||||
return code == "40001" || code == "CR000"
|
||||
}
|
||||
|
||||
// Driver returns the driver being used for this connector.
|
||||
func (conn *Connector) Driver() driver.Driver {
|
||||
return &Driver{}
|
||||
// borrowed from crdb
|
||||
func errCode(err error) string {
|
||||
switch t := errorCause(err).(type) {
|
||||
case *pq.Error:
|
||||
return string(t.Code)
|
||||
case *pgx.PgError:
|
||||
return t.Code
|
||||
default:
|
||||
return ""
|
||||
}
|
||||
}
|
||||
|
||||
func errorCause(err error) error {
|
||||
for err != nil {
|
||||
cause := errors.Unwrap(err)
|
||||
if cause == nil {
|
||||
break
|
||||
}
|
||||
err = cause
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// Assert that Driver satisfies DriverContext.
|
||||
var _ driver.DriverContext = &Driver{}
|
||||
|
||||
func init() {
|
||||
|
110
private/dbutil/cockroachutil/driver_test.go
Normal file
110
private/dbutil/cockroachutil/driver_test.go
Normal file
@ -0,0 +1,110 @@
|
||||
// Copyright (C) 2020 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package cockroachutil
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/zeebo/errs"
|
||||
|
||||
"storj.io/common/testcontext"
|
||||
"storj.io/storj/private/dbutil/pgutil/pgtest"
|
||||
"storj.io/storj/private/tagsql"
|
||||
)
|
||||
|
||||
func TestLibPqCompatibility(t *testing.T) {
|
||||
ctx := testcontext.New(t)
|
||||
defer ctx.Cleanup()
|
||||
|
||||
if *pgtest.CrdbConnStr == "" {
|
||||
t.Skip("CockroachDB flag missing")
|
||||
}
|
||||
testDB, err := OpenUnique(ctx, *pgtest.CrdbConnStr, "TestLibPqCompatibility")
|
||||
require.NoError(t, err)
|
||||
defer ctx.Check(testDB.Close)
|
||||
|
||||
// use a single dedicated conn for testing
|
||||
conn, err := testDB.Conn(ctx)
|
||||
require.NoError(t, err)
|
||||
defer ctx.Check(conn.Close)
|
||||
|
||||
// should be in idle status, no transaction, initially
|
||||
require.Equal(t, txnStatusIdle, getTxnStatus(ctx, t, conn))
|
||||
require.False(t, checkIsInTx(ctx, t, conn))
|
||||
|
||||
// start a transaction
|
||||
tx, err := conn.BeginTx(ctx, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
func() {
|
||||
defer func() { err = tx.Rollback() }()
|
||||
|
||||
// should be idle in transaction now
|
||||
require.Equal(t, txnStatusIdleInTransaction, getTxnStatus(ctx, t, conn))
|
||||
require.True(t, checkIsInTx(ctx, t, conn))
|
||||
|
||||
// issue successful query
|
||||
rows, err := tx.QueryContext(ctx, `SELECT 1`)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.True(t, rows.Next())
|
||||
var n int
|
||||
err = rows.Scan(&n)
|
||||
require.NoError(t, err)
|
||||
require.False(t, rows.Next())
|
||||
err = rows.Err()
|
||||
require.NoError(t, err)
|
||||
err = rows.Close()
|
||||
require.NoError(t, err)
|
||||
|
||||
// should still be idle in transaction
|
||||
require.Equal(t, txnStatusIdleInTransaction, getTxnStatus(ctx, t, conn))
|
||||
require.True(t, checkIsInTx(ctx, t, conn))
|
||||
|
||||
// issue bad query
|
||||
_, err = tx.QueryContext(ctx, `SELECT BALONEY SANDWICHES`)
|
||||
require.Error(t, err)
|
||||
|
||||
// should be in a failed transaction now
|
||||
require.Equal(t, txnStatusInFailedTransaction, getTxnStatus(ctx, t, conn))
|
||||
require.True(t, checkIsInTx(ctx, t, conn))
|
||||
}()
|
||||
|
||||
// check rollback error
|
||||
require.NoError(t, err)
|
||||
|
||||
// should be back out of any transaction
|
||||
require.Equal(t, txnStatusIdle, getTxnStatus(ctx, t, conn))
|
||||
require.False(t, checkIsInTx(ctx, t, conn))
|
||||
}
|
||||
|
||||
func withCockroachConn(ctx context.Context, sqlConn tagsql.Conn, fn func(conn *cockroachConn) error) error {
|
||||
return sqlConn.Raw(ctx, func(rawConn interface{}) error {
|
||||
crConn, ok := rawConn.(*cockroachConn)
|
||||
if !ok {
|
||||
return errs.New("conn object is %T, not *cockroachConn", crConn)
|
||||
}
|
||||
return fn(crConn)
|
||||
})
|
||||
}
|
||||
|
||||
func getTxnStatus(ctx context.Context, t *testing.T, sqlConn tagsql.Conn) (txnStatus transactionStatus) {
|
||||
err := withCockroachConn(ctx, sqlConn, func(crConn *cockroachConn) error {
|
||||
txnStatus = crConn.txnStatus()
|
||||
return nil
|
||||
})
|
||||
require.NoError(t, err)
|
||||
return txnStatus
|
||||
}
|
||||
|
||||
func checkIsInTx(ctx context.Context, t *testing.T, sqlConn tagsql.Conn) (isInTx bool) {
|
||||
err := withCockroachConn(ctx, sqlConn, func(crConn *cockroachConn) error {
|
||||
isInTx = crConn.isInTransaction()
|
||||
return nil
|
||||
})
|
||||
require.NoError(t, err)
|
||||
return isInTx
|
||||
}
|
Loading…
Reference in New Issue
Block a user