storage/cockroachkv: add check if retry is needed during iteration
This changeset replaces https://review.dev.storj.io/c/storj/storj/+/1839 which did the same thing but Nat couldn't figure out how to fix conflicting files the correct gerrity way. Change-Id: If05a8902aca986ea9f6c9168a90b31beebab839a
This commit is contained in:
parent
88a2561317
commit
8bd4d7b43e
@ -89,7 +89,7 @@ func (c *cockroachConn) Close() error {
|
||||
// retry semantics for single statements.
|
||||
func (c *cockroachConn) ExecContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Result, error) {
|
||||
result, err := c.underlying.ExecContext(ctx, query, args)
|
||||
for err != nil && !c.isInTransaction() && needsRetry(err) {
|
||||
for err != nil && !c.isInTransaction() && NeedsRetry(err) {
|
||||
result, err = c.underlying.ExecContext(ctx, query, args)
|
||||
}
|
||||
return result, err
|
||||
@ -145,7 +145,7 @@ func (c *cockroachConn) QueryContext(ctx context.Context, query string, args []d
|
||||
for {
|
||||
result, err := c.underlying.QueryContext(ctx, query, args)
|
||||
if err != nil {
|
||||
if needsRetry(err) {
|
||||
if NeedsRetry(err) {
|
||||
if c.isInTransaction() {
|
||||
return nil, err
|
||||
}
|
||||
@ -158,7 +158,7 @@ func (c *cockroachConn) QueryContext(ctx context.Context, query string, args []d
|
||||
// If this returns an error it's probably the same error
|
||||
// we got from calling Next inside wrapRows.
|
||||
_ = result.Close()
|
||||
if needsRetry(err) {
|
||||
if NeedsRetry(err) {
|
||||
if c.isInTransaction() {
|
||||
return nil, err
|
||||
}
|
||||
@ -247,7 +247,7 @@ func (stmt *cockroachStmt) Exec(args []driver.Value) (driver.Result, error) {
|
||||
namedArgs[i] = driver.NamedValue{Ordinal: i + 1, Value: arg}
|
||||
}
|
||||
result, err := stmt.underlyingStmt.ExecContext(context.Background(), namedArgs)
|
||||
for err != nil && !stmt.conn.isInTransaction() && needsRetry(err) {
|
||||
for err != nil && !stmt.conn.isInTransaction() && NeedsRetry(err) {
|
||||
result, err = stmt.underlyingStmt.ExecContext(context.Background(), namedArgs)
|
||||
}
|
||||
return result, err
|
||||
@ -267,7 +267,7 @@ func (stmt *cockroachStmt) Query(args []driver.Value) (driver.Rows, error) {
|
||||
// ExecContext executes SQL statements in the specified context.
|
||||
func (stmt *cockroachStmt) ExecContext(ctx context.Context, args []driver.NamedValue) (driver.Result, error) {
|
||||
result, err := stmt.underlyingStmt.ExecContext(ctx, args)
|
||||
for err != nil && !stmt.conn.isInTransaction() && needsRetry(err) {
|
||||
for err != nil && !stmt.conn.isInTransaction() && NeedsRetry(err) {
|
||||
result, err = stmt.underlyingStmt.ExecContext(ctx, args)
|
||||
}
|
||||
return result, err
|
||||
@ -279,7 +279,7 @@ func (stmt *cockroachStmt) QueryContext(ctx context.Context, args []driver.Named
|
||||
for {
|
||||
result, err := stmt.underlyingStmt.QueryContext(ctx, args)
|
||||
if err != nil {
|
||||
if needsRetry(err) {
|
||||
if NeedsRetry(err) {
|
||||
if stmt.conn.isInTransaction() {
|
||||
return nil, err
|
||||
}
|
||||
@ -292,7 +292,7 @@ func (stmt *cockroachStmt) QueryContext(ctx context.Context, args []driver.Named
|
||||
// If this returns an error it's probably the same error
|
||||
// we got from calling Next inside wrapRows.
|
||||
_ = result.Close()
|
||||
if needsRetry(err) {
|
||||
if NeedsRetry(err) {
|
||||
if stmt.conn.isInTransaction() {
|
||||
return nil, err
|
||||
}
|
||||
@ -313,8 +313,9 @@ func translateName(name string) string {
|
||||
return name
|
||||
}
|
||||
|
||||
// borrowed from code in crdb
|
||||
func needsRetry(err error) bool {
|
||||
// NeedsRetry checks if the error code means a retry is needed,
|
||||
// borrowed from code in crdb.
|
||||
func NeedsRetry(err error) bool {
|
||||
code := errCode(err)
|
||||
return code == "40001" || code == "CR000"
|
||||
}
|
||||
|
@ -11,6 +11,7 @@ import (
|
||||
|
||||
"github.com/zeebo/errs"
|
||||
|
||||
"storj.io/storj/private/dbutil/cockroachutil"
|
||||
"storj.io/storj/storage"
|
||||
)
|
||||
|
||||
@ -82,15 +83,29 @@ func (opi *orderedCockroachIterator) Next(ctx context.Context, item *storage.Lis
|
||||
result := func() bool {
|
||||
defer mon.TaskNamed("acquire_new_query")(nil)(nil)
|
||||
|
||||
retry := false
|
||||
if err := opi.curRows.Err(); err != nil && err != sql.ErrNoRows {
|
||||
// This NeedsRetry needs to be exported here because it is
|
||||
// expected behavior for cockroach to return retryable errors
|
||||
// that will be captured in this Rows object.
|
||||
if cockroachutil.NeedsRetry(err) {
|
||||
mon.Event("needed_retry")
|
||||
retry = true
|
||||
} else {
|
||||
opi.errEncountered = errs.Wrap(err)
|
||||
return false
|
||||
}
|
||||
}
|
||||
if err := opi.curRows.Close(); err != nil {
|
||||
if cockroachutil.NeedsRetry(err) {
|
||||
mon.Event("needed_retry")
|
||||
retry = true
|
||||
} else {
|
||||
opi.errEncountered = errs.Wrap(err)
|
||||
return false
|
||||
}
|
||||
if opi.curIndex < opi.batchSize {
|
||||
}
|
||||
if opi.curIndex < opi.batchSize && !retry {
|
||||
return false
|
||||
}
|
||||
newRows, err := opi.doNextQuery(ctx)
|
||||
|
Loading…
Reference in New Issue
Block a user