From 818242f4520e4a1477b80c1a69ae5fbb7a854098 Mon Sep 17 00:00:00 2001 From: ccase Date: Wed, 22 Jan 2020 11:10:14 -0500 Subject: [PATCH] storage/postgreskv: Reverting back to the venerable PG CAS This was inadvertently converted to the Cockroach version. This reverts most of that and keeps the changes since then. Change-Id: Ia440eeebb01bc89fbfa8ce266668030173061469 --- storage/postgreskv/client.go | 94 +++++++++++++++++------------------- 1 file changed, 43 insertions(+), 51 deletions(-) diff --git a/storage/postgreskv/client.go b/storage/postgreskv/client.go index 3940c6641..0d700719e 100644 --- a/storage/postgreskv/client.go +++ b/storage/postgreskv/client.go @@ -4,7 +4,6 @@ package postgreskv import ( - "bytes" "context" "database/sql" @@ -14,7 +13,6 @@ import ( "storj.io/storj/private/dbutil" "storj.io/storj/private/dbutil/pgutil" - "storj.io/storj/private/dbutil/txutil" "storj.io/storj/private/tagsql" "storj.io/storj/storage" "storj.io/storj/storage/postgreskv/schema" @@ -201,6 +199,7 @@ func (client *Client) CompareAndSwap(ctx context.Context, key storage.Key, oldVa if err != nil { return Error.Wrap(err) } + return storage.ErrValueChanged.New("%q", key) } @@ -217,62 +216,55 @@ func (client *Client) CompareAndSwap(ctx context.Context, key storage.Key, oldVa if err == sql.ErrNoRows { return storage.ErrValueChanged.New("%q", key) } + return Error.Wrap(err) } - return txutil.WithTx(ctx, client.db, nil, func(_ context.Context, txn tagsql.Tx) error { - q := "SELECT metadata FROM pathdata WHERE fullpath = $1::BYTEA;" - row := txn.QueryRowContext(ctx, q, []byte(key)) - - var metadata []byte - err = row.Scan(&metadata) - if err == sql.ErrNoRows { - // Row not found for this fullpath. - // Potentially because another concurrent transaction changed the row. - return storage.ErrKeyNotFound.New("%q", key) - } - if err != nil { - return Error.Wrap(err) - } - - if equal := bytes.Compare(metadata, oldValue); equal != 0 { - // If the row is found but the metadata has been already changed - // we can't continue to delete it. - return storage.ErrValueChanged.New("%q", key) - } - - var res sql.Result - if newValue == nil { - q = ` - DELETE FROM pathdata - WHERE pathdata.fullpath = $1::BYTEA - AND pathdata.metadata = $2::BYTEA + var row *sql.Row + if newValue == nil { + q := ` + WITH matching_key AS ( + SELECT * FROM pathdata WHERE fullpath = $1::BYTEA + ), updated AS ( + DELETE FROM pathdata + USING matching_key mk + WHERE pathdata.metadata = $2::BYTEA + AND pathdata.fullpath = mk.fullpath + RETURNING 1 + ) + SELECT EXISTS(SELECT 1 FROM matching_key) AS key_present, EXISTS(SELECT 1 FROM updated) AS value_updated ` - - res, err = txn.ExecContext(ctx, q, []byte(key), []byte(oldValue)) - } else { - q = ` - UPDATE pathdata - SET metadata = $3::BYTEA - WHERE pathdata.fullpath = $1::BYTEA - AND pathdata.metadata = $2::BYTEA + row = client.db.QueryRow(ctx, q, []byte(key), []byte(oldValue)) + } else { + q := ` + WITH matching_key AS ( + SELECT * FROM pathdata WHERE fullpath = $1::BYTEA + ), updated AS ( + UPDATE pathdata + SET metadata = $3::BYTEA + FROM matching_key mk + WHERE pathdata.metadata = $2::BYTEA + AND pathdata.fullpath = mk.fullpath + RETURNING 1 + ) + SELECT EXISTS(SELECT 1 FROM matching_key) AS key_present, EXISTS(SELECT 1 FROM updated) AS value_updated; ` - res, err = txn.ExecContext(ctx, q, []byte(key), []byte(oldValue), []byte(newValue)) - } + row = client.db.QueryRow(ctx, q, []byte(key), []byte(oldValue), []byte(newValue)) + } - if err != nil { - return Error.Wrap(err) - } + var keyPresent, valueUpdated bool + err = row.Scan(&keyPresent, &valueUpdated) + if err != nil { + return Error.Wrap(err) + } - affected, err := res.RowsAffected() - if err != nil { - return Error.Wrap(err) - } + if !keyPresent { + return storage.ErrKeyNotFound.New("%q", key) + } - if affected != 1 { - return storage.ErrValueChanged.New("%q", key) - } + if !valueUpdated { + return storage.ErrValueChanged.New("%q", key) + } - return nil - }) + return nil }