storage/postgreskv: Reverting back to the venerable PG CAS
This was inadvertently converted to the Cockroach version. This reverts most of that and keeps the changes since then. Change-Id: Ia440eeebb01bc89fbfa8ce266668030173061469
This commit is contained in:
parent
b2c454dbd6
commit
818242f452
@ -4,7 +4,6 @@
|
|||||||
package postgreskv
|
package postgreskv
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
|
||||||
"context"
|
"context"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
|
|
||||||
@ -14,7 +13,6 @@ import (
|
|||||||
|
|
||||||
"storj.io/storj/private/dbutil"
|
"storj.io/storj/private/dbutil"
|
||||||
"storj.io/storj/private/dbutil/pgutil"
|
"storj.io/storj/private/dbutil/pgutil"
|
||||||
"storj.io/storj/private/dbutil/txutil"
|
|
||||||
"storj.io/storj/private/tagsql"
|
"storj.io/storj/private/tagsql"
|
||||||
"storj.io/storj/storage"
|
"storj.io/storj/storage"
|
||||||
"storj.io/storj/storage/postgreskv/schema"
|
"storj.io/storj/storage/postgreskv/schema"
|
||||||
@ -201,6 +199,7 @@ func (client *Client) CompareAndSwap(ctx context.Context, key storage.Key, oldVa
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return Error.Wrap(err)
|
return Error.Wrap(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return storage.ErrValueChanged.New("%q", key)
|
return storage.ErrValueChanged.New("%q", key)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -217,62 +216,55 @@ func (client *Client) CompareAndSwap(ctx context.Context, key storage.Key, oldVa
|
|||||||
if err == sql.ErrNoRows {
|
if err == sql.ErrNoRows {
|
||||||
return storage.ErrValueChanged.New("%q", key)
|
return storage.ErrValueChanged.New("%q", key)
|
||||||
}
|
}
|
||||||
|
|
||||||
return Error.Wrap(err)
|
return Error.Wrap(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return txutil.WithTx(ctx, client.db, nil, func(_ context.Context, txn tagsql.Tx) error {
|
var row *sql.Row
|
||||||
q := "SELECT metadata FROM pathdata WHERE fullpath = $1::BYTEA;"
|
|
||||||
row := txn.QueryRowContext(ctx, q, []byte(key))
|
|
||||||
|
|
||||||
var metadata []byte
|
|
||||||
err = row.Scan(&metadata)
|
|
||||||
if err == sql.ErrNoRows {
|
|
||||||
// Row not found for this fullpath.
|
|
||||||
// Potentially because another concurrent transaction changed the row.
|
|
||||||
return storage.ErrKeyNotFound.New("%q", key)
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
return Error.Wrap(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if equal := bytes.Compare(metadata, oldValue); equal != 0 {
|
|
||||||
// If the row is found but the metadata has been already changed
|
|
||||||
// we can't continue to delete it.
|
|
||||||
return storage.ErrValueChanged.New("%q", key)
|
|
||||||
}
|
|
||||||
|
|
||||||
var res sql.Result
|
|
||||||
if newValue == nil {
|
if newValue == nil {
|
||||||
q = `
|
q := `
|
||||||
|
WITH matching_key AS (
|
||||||
|
SELECT * FROM pathdata WHERE fullpath = $1::BYTEA
|
||||||
|
), updated AS (
|
||||||
DELETE FROM pathdata
|
DELETE FROM pathdata
|
||||||
WHERE pathdata.fullpath = $1::BYTEA
|
USING matching_key mk
|
||||||
AND pathdata.metadata = $2::BYTEA
|
WHERE pathdata.metadata = $2::BYTEA
|
||||||
|
AND pathdata.fullpath = mk.fullpath
|
||||||
|
RETURNING 1
|
||||||
|
)
|
||||||
|
SELECT EXISTS(SELECT 1 FROM matching_key) AS key_present, EXISTS(SELECT 1 FROM updated) AS value_updated
|
||||||
`
|
`
|
||||||
|
row = client.db.QueryRow(ctx, q, []byte(key), []byte(oldValue))
|
||||||
res, err = txn.ExecContext(ctx, q, []byte(key), []byte(oldValue))
|
|
||||||
} else {
|
} else {
|
||||||
q = `
|
q := `
|
||||||
|
WITH matching_key AS (
|
||||||
|
SELECT * FROM pathdata WHERE fullpath = $1::BYTEA
|
||||||
|
), updated AS (
|
||||||
UPDATE pathdata
|
UPDATE pathdata
|
||||||
SET metadata = $3::BYTEA
|
SET metadata = $3::BYTEA
|
||||||
WHERE pathdata.fullpath = $1::BYTEA
|
FROM matching_key mk
|
||||||
AND pathdata.metadata = $2::BYTEA
|
WHERE pathdata.metadata = $2::BYTEA
|
||||||
|
AND pathdata.fullpath = mk.fullpath
|
||||||
|
RETURNING 1
|
||||||
|
)
|
||||||
|
SELECT EXISTS(SELECT 1 FROM matching_key) AS key_present, EXISTS(SELECT 1 FROM updated) AS value_updated;
|
||||||
`
|
`
|
||||||
res, err = txn.ExecContext(ctx, q, []byte(key), []byte(oldValue), []byte(newValue))
|
row = client.db.QueryRow(ctx, q, []byte(key), []byte(oldValue), []byte(newValue))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var keyPresent, valueUpdated bool
|
||||||
|
err = row.Scan(&keyPresent, &valueUpdated)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return Error.Wrap(err)
|
return Error.Wrap(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
affected, err := res.RowsAffected()
|
if !keyPresent {
|
||||||
if err != nil {
|
return storage.ErrKeyNotFound.New("%q", key)
|
||||||
return Error.Wrap(err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if affected != 1 {
|
if !valueUpdated {
|
||||||
return storage.ErrValueChanged.New("%q", key)
|
return storage.ErrValueChanged.New("%q", key)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user