satellite/satellitedb: prepare to remove big.Float from db

Why: big.Float is not an ideal type for dealing with monetary amounts,
because no matter how high the precision, some non-integer decimal
values can not be represented exactly in base-2 floating point. Also,
storing gob-encoded big.Float values in the database makes it very hard
to use those values in meaningful queries, making it difficult to do
any sort of analysis on billing.

Now that we have amounts represented using monetary.Amount, we can
simply store them in the database using integers (as given by the
.BaseUnits() method on monetary.Amount).

We should move toward storing the currency along with any monetary
amount, wherever we are storing amounts, because satellites might want
to deal with currencies other than STORJ and USD. Even better, it
becomes much clearer what currency each monetary value is _supposed_ to
be in (I had to dig through code to find that out for our current
monetary columns).

Deployment
----------

Getting rid of the big.Float columns will take multiple deployment
steps. There does not seem to be any way to make the change in a way
that lets existing queries continue to work on CockroachDB (it could be
done with rules and triggers and a stored procedure that knows how to
gob-decode big.Float objects, but CockroachDB doesn't have rules _or_
triggers _or_ stored procedures). Instead, in this first step, we make
no changes to the database schema, but add code that knows how to deal
with the planned changes to the schema when they are made in a future
"step 2" deployment. All functions that deal with the
coinbase_transactions table have been taught to recognize the "undefined
column" error, and when it is seen, to call a separate "transition shim"
function to accomplish the task. Once all the services are running this
code, and the step 2 deployment makes breaking changes to the schema,
any services that are still running and connected to the database will
keep working correctly because of the fallback code included here. The
step 2 deployment can be made without these transition shims included,
because it will apply the database schema changes before any of its code
runs.

Step 1:

    No schema changes; just include code that recognizes the
    "undefined column" error when dealing with the
    coinbase_transactions or stripecoinpayments_tx_conversion_rates
    tables, and if found, assumes that the column changes from Step
    2 have already been made.

Step 2:

    In coinbase_transactions:

     * change the names of the 'amount' and 'received' columns to
       'amount_gob' and 'received_gob' respectively
     * add new 'amount_numeric' and 'received_numeric' columns with
       INT8 type.

    In stripecoinpayments_tx_conversion_rates:

     * change the name of the 'rate' column to 'rate_gob'
     * add new 'rate_numeric' column with NUMERIC(8, 8) type

    Code reading from either of these tables must query both the X_gob
    and X_numeric columns. If X_numeric is not null, its value should
    be used; otherwise, the gob-encoded big.Float in X_gob should be
    used. A chore might be included in this step that transitions values
    from X_gob to X_numeric a few rows at a time.

Step 3:

    Once all prod satellites have no values left in the _gob columns, we
    can drop those columns and add NOT NULL constraints to the _numeric
    columns.

Change-Id: Id6db304b404e6fde44f5a8c23cdaeeaaa2324f20
This commit is contained in:
paul cannon 2021-08-10 17:30:23 -05:00
parent a16aecfa96
commit c053bdbd70
5 changed files with 710 additions and 31 deletions

View File

@ -89,7 +89,7 @@ func (tokens *storjTokens) Deposit(ctx context.Context, userID uuid.UUID, amount
return nil, Error.Wrap(err) return nil, Error.Wrap(err)
} }
cpTX, err := tokens.service.db.Transactions().Insert(ctx, createTime, err := tokens.service.db.Transactions().Insert(ctx,
Transaction{ Transaction{
ID: tx.ID, ID: tx.ID,
AccountID: userID, AccountID: userID,
@ -112,7 +112,7 @@ func (tokens *storjTokens) Deposit(ctx context.Context, userID uuid.UUID, amount
Status: payments.TransactionStatusPending, Status: payments.TransactionStatusPending,
Timeout: tx.Timeout, Timeout: tx.Timeout,
Link: tx.CheckoutURL, Link: tx.CheckoutURL,
CreatedAt: cpTX.CreatedAt, CreatedAt: createTime,
}, nil }, nil
} }

View File

@ -24,7 +24,7 @@ var ErrTransactionConsumed = errs.New("error transaction already consumed")
// architecture: Database // architecture: Database
type TransactionsDB interface { type TransactionsDB interface {
// Insert inserts new coinpayments transaction into DB. // Insert inserts new coinpayments transaction into DB.
Insert(ctx context.Context, tx Transaction) (*Transaction, error) Insert(ctx context.Context, tx Transaction) (time.Time, error)
// Update updates status and received for set of transactions. // Update updates status and received for set of transactions.
Update(ctx context.Context, updates []TransactionUpdate, applies coinpayments.TransactionIDList) error Update(ctx context.Context, updates []TransactionUpdate, applies coinpayments.TransactionIDList) error
// Consume marks transaction as consumed, so it won't participate in apply account balance loop. // Consume marks transaction as consumed, so it won't participate in apply account balance loop.

View File

@ -4,6 +4,7 @@
package stripecoinpayments_test package stripecoinpayments_test
import ( import (
"context"
"encoding/base64" "encoding/base64"
"errors" "errors"
"sync" "sync"
@ -51,9 +52,9 @@ func TestTransactionsDB(t *testing.T) {
} }
t.Run("insert", func(t *testing.T) { t.Run("insert", func(t *testing.T) {
tx, err := transactions.Insert(ctx, createTx) createdAt, err := transactions.Insert(ctx, createTx)
require.NoError(t, err) require.NoError(t, err)
requireSaneTimestamp(t, tx.CreatedAt) requireSaneTimestamp(t, createdAt)
txs, err := transactions.ListAccount(ctx, userID) txs, err := transactions.ListAccount(ctx, userID)
require.NoError(t, err) require.NoError(t, err)
require.Len(t, txs, 1) require.Len(t, txs, 1)
@ -139,7 +140,7 @@ func TestConcurrentConsume(t *testing.T) {
userID := testrand.UUID() userID := testrand.UUID()
txID := coinpayments.TransactionID("testID") txID := coinpayments.TransactionID("testID")
tx, err := transactions.Insert(ctx, _, err = transactions.Insert(ctx,
stripecoinpayments.Transaction{ stripecoinpayments.Transaction{
ID: txID, ID: txID,
AccountID: userID, AccountID: userID,
@ -155,12 +156,12 @@ func TestConcurrentConsume(t *testing.T) {
err = transactions.Update(ctx, err = transactions.Update(ctx,
[]stripecoinpayments.TransactionUpdate{{ []stripecoinpayments.TransactionUpdate{{
TransactionID: tx.ID, TransactionID: txID,
Status: coinpayments.StatusCompleted, Status: coinpayments.StatusCompleted,
Received: received, Received: received,
}}, }},
coinpayments.TransactionIDList{ coinpayments.TransactionIDList{
tx.ID, txID,
}, },
) )
require.NoError(t, err) require.NoError(t, err)
@ -178,7 +179,7 @@ func TestConcurrentConsume(t *testing.T) {
var group errs2.Group var group errs2.Group
for i := 0; i < concurrentTries; i++ { for i := 0; i < concurrentTries; i++ {
group.Go(func() error { group.Go(func() error {
err := transactions.Consume(ctx, tx.ID) err := transactions.Consume(ctx, txID)
if err == nil { if err == nil {
return nil return nil
@ -422,3 +423,215 @@ func TestTransactions_ApplyTransactionBalance(t *testing.T) {
require.EqualValues(t, "0.2", cbt.Metadata["storj_usd_rate"]) require.EqualValues(t, "0.2", cbt.Metadata["storj_usd_rate"])
}) })
} }
func TestDatabaseBigFloatTransition(t *testing.T) {
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
transactions := db.StripeCoinPayments().Transactions()
accountID := testrand.UUID()
amount, err := monetary.AmountFromString("100000000.00000001", monetary.StorjToken)
require.NoError(t, err)
received, err := monetary.AmountFromString("0.00000002", monetary.StorjToken)
require.NoError(t, err)
rate1, err := decimal.NewFromString("1001.23456789")
require.NoError(t, err)
rate2, err := decimal.NewFromString("3.14159265")
require.NoError(t, err)
tx1 := stripecoinpayments.Transaction{
ID: "abcdefg",
AccountID: accountID,
Address: "0xbeedeebeedeebeedeebeedeebeedeebeedeebeed",
Amount: amount,
Received: received,
Status: coinpayments.StatusPending,
Key: "beep beep im a sheep",
Timeout: time.Hour * 48,
}
tx2 := stripecoinpayments.Transaction{
ID: "hijklmn",
AccountID: accountID,
Address: "0x77687927642075206576656e20626f746865723f",
Amount: amount,
Received: received,
Status: coinpayments.StatusPending,
Key: "how now im a cow",
Timeout: 0,
}
// perform an Insert on old schema
{
createdAt, err := transactions.Insert(ctx, tx1)
require.NoError(t, err)
requireSaneTimestamp(t, createdAt)
tx1.CreatedAt = createdAt
}
// perform an Update on old schema
{
newReceived, err := monetary.AmountFromString("0.12345678", monetary.StorjToken)
require.NoError(t, err)
upd := stripecoinpayments.TransactionUpdate{
TransactionID: tx1.ID,
Status: coinpayments.StatusReceived,
Received: newReceived,
}
err = transactions.Update(ctx, []stripecoinpayments.TransactionUpdate{upd}, coinpayments.TransactionIDList{tx1.ID})
require.NoError(t, err)
tx1.Status = coinpayments.StatusReceived
tx1.Received = newReceived
}
// perform a ListAccount on old schema
{
accountTxs, err := transactions.ListAccount(ctx, accountID)
require.NoError(t, err)
require.Len(t, accountTxs, 1)
require.Equal(t, tx1, accountTxs[0])
}
// perform a ListPending on old schema
{
pendingTxs, err := transactions.ListPending(ctx, 0, 10, time.Now().UTC())
require.NoError(t, err)
require.Len(t, pendingTxs.Transactions, 1)
// ListPending doesn't get the timeout column, so set it manually in order to check equality of other fields
pendingTxs.Transactions[0].Timeout = tx1.Timeout
require.Equal(t, tx1, pendingTxs.Transactions[0])
require.False(t, pendingTxs.Next)
require.Equal(t, int64(0), pendingTxs.NextOffset)
}
// perform a ListUnapplied on old schema
{
unappliedTxs, err := transactions.ListUnapplied(ctx, 0, 10, time.Now().UTC())
require.NoError(t, err)
require.Len(t, unappliedTxs.Transactions, 1)
// ListUnapplied doesn't get the timeout column, so set it manually in order to check equality of other fields
unappliedTxs.Transactions[0].Timeout = tx1.Timeout
require.Equal(t, tx1, unappliedTxs.Transactions[0])
require.False(t, unappliedTxs.Next)
require.Equal(t, int64(0), unappliedTxs.NextOffset)
}
// perform a LockRate on old schema
{
err = transactions.LockRate(ctx, tx1.ID, rate1)
require.NoError(t, err)
}
// perform a GetLockedRate on old schema
{
gotRate, err := transactions.GetLockedRate(ctx, tx1.ID)
require.NoError(t, err)
require.Equal(t, rate1, gotRate)
}
// do schema update
{
transitionDB := transactions.(interface {
DebugPerformBigFloatTransition(ctx context.Context) error
})
err = transitionDB.DebugPerformBigFloatTransition(ctx)
require.NoError(t, err)
}
// perform an Insert on new schema
{
createdAt, err := transactions.Insert(ctx, tx2)
require.NoError(t, err)
requireSaneTimestamp(t, createdAt)
tx2.CreatedAt = createdAt
}
// perform a ListAccount on new schema, while tx1 has a received_gob
{
accountTxs, err := transactions.ListAccount(ctx, accountID)
require.NoError(t, err)
require.Len(t, accountTxs, 2)
// results should be ordered in reverse order of creation
require.Equal(t, tx2, accountTxs[0])
require.Equal(t, tx1, accountTxs[1])
}
// perform an Update on new schema, on a row with a received_gob
{
newReceived, err := monetary.AmountFromString("1.11111111", monetary.StorjToken)
require.NoError(t, err)
upd := stripecoinpayments.TransactionUpdate{
TransactionID: tx1.ID,
Status: coinpayments.StatusPending,
Received: newReceived,
}
err = transactions.Update(ctx, []stripecoinpayments.TransactionUpdate{upd}, nil)
require.NoError(t, err)
tx1.Status = coinpayments.StatusPending
tx1.Received = newReceived
}
// perform an Update on new schema, on a row with a received_numeric
{
newReceived, err := monetary.AmountFromString("2.12121212", monetary.StorjToken)
require.NoError(t, err)
upd := stripecoinpayments.TransactionUpdate{
TransactionID: tx2.ID,
Status: coinpayments.StatusCompleted,
Received: newReceived,
}
err = transactions.Update(ctx, []stripecoinpayments.TransactionUpdate{upd}, coinpayments.TransactionIDList{tx2.ID})
require.NoError(t, err)
tx2.Status = coinpayments.StatusCompleted
tx2.Received = newReceived
}
// perform a ListAccount on new schema, after the above changes
{
accountTxs, err := transactions.ListAccount(ctx, accountID)
require.NoError(t, err)
require.Len(t, accountTxs, 2)
// results should be ordered in reverse order of creation
require.Equal(t, tx2, accountTxs[0])
require.Equal(t, tx1, accountTxs[1])
}
// perform a ListPending on new schema
{
pendingTxs, err := transactions.ListPending(ctx, 0, 10, time.Now().UTC())
require.NoError(t, err)
require.Len(t, pendingTxs.Transactions, 1)
// ListPending doesn't get the timeout column, so set it manually in order to check equality of other fields
pendingTxs.Transactions[0].Timeout = tx1.Timeout
require.Equal(t, tx1, pendingTxs.Transactions[0])
require.False(t, pendingTxs.Next)
require.Equal(t, int64(0), pendingTxs.NextOffset)
}
// perform a ListUnapplied on new schema
{
unappliedTxs, err := transactions.ListUnapplied(ctx, 0, 10, time.Now().UTC())
require.NoError(t, err)
require.Len(t, unappliedTxs.Transactions, 1)
// ListUnapplied doesn't get the timeout column, so set it manually in order to check equality of other fields
unappliedTxs.Transactions[0].Timeout = tx2.Timeout
require.Equal(t, tx2, unappliedTxs.Transactions[0])
require.False(t, unappliedTxs.Next)
require.Equal(t, int64(0), unappliedTxs.NextOffset)
}
// perform a LockRate on new schema
{
err = transactions.LockRate(ctx, tx2.ID, rate2)
require.NoError(t, err)
}
// perform a GetLockedRate on new schema
{
gotRate, err := transactions.GetLockedRate(ctx, tx1.ID)
require.NoError(t, err)
require.Equal(t, rate1, gotRate)
gotRate, err = transactions.GetLockedRate(ctx, tx2.ID)
require.NoError(t, err)
require.Equal(t, rate2, gotRate)
}
})
}

View File

@ -8,10 +8,12 @@ import (
"math/big" "math/big"
"time" "time"
pgxerrcode "github.com/jackc/pgerrcode"
"github.com/shopspring/decimal" "github.com/shopspring/decimal"
"github.com/zeebo/errs" "github.com/zeebo/errs"
"storj.io/common/uuid" "storj.io/common/uuid"
"storj.io/private/dbutil/pgutil/pgerrcode"
"storj.io/storj/satellite/payments/coinpayments" "storj.io/storj/satellite/payments/coinpayments"
"storj.io/storj/satellite/payments/monetary" "storj.io/storj/satellite/payments/monetary"
"storj.io/storj/satellite/payments/stripecoinpayments" "storj.io/storj/satellite/payments/stripecoinpayments"
@ -44,16 +46,16 @@ type coinPaymentsTransactions struct {
} }
// Insert inserts new coinpayments transaction into DB. // Insert inserts new coinpayments transaction into DB.
func (db *coinPaymentsTransactions) Insert(ctx context.Context, tx stripecoinpayments.Transaction) (_ *stripecoinpayments.Transaction, err error) { func (db *coinPaymentsTransactions) Insert(ctx context.Context, tx stripecoinpayments.Transaction) (createTime time.Time, err error) {
defer mon.Task()(&ctx)(&err) defer mon.Task()(&ctx)(&err)
amount, err := tx.Amount.AsBigFloat().GobEncode() amount, err := tx.Amount.AsBigFloat().GobEncode()
if err != nil { if err != nil {
return nil, errs.Wrap(err) return time.Time{}, errs.Wrap(err)
} }
received, err := tx.Received.AsBigFloat().GobEncode() received, err := tx.Received.AsBigFloat().GobEncode()
if err != nil { if err != nil {
return nil, errs.Wrap(err) return time.Time{}, errs.Wrap(err)
} }
dbxCPTX, err := db.db.Create_CoinpaymentsTransaction(ctx, dbxCPTX, err := db.db.Create_CoinpaymentsTransaction(ctx,
@ -67,10 +69,34 @@ func (db *coinPaymentsTransactions) Insert(ctx context.Context, tx stripecoinpay
dbx.CoinpaymentsTransaction_Timeout(int(tx.Timeout.Seconds())), dbx.CoinpaymentsTransaction_Timeout(int(tx.Timeout.Seconds())),
) )
if err != nil { if err != nil {
return nil, err if errCode := pgerrcode.FromError(err); errCode == pgxerrcode.UndefinedColumn {
// TEMPORARY: fall back to expected new schema to facilitate transition
return db.insertTransitionShim(ctx, tx)
}
return time.Time{}, err
} }
return dbxCPTX.CreatedAt, nil
}
return fromDBXCoinpaymentsTransaction(dbxCPTX) // insertTransitionShim inserts new coinpayments transaction into DB.
//
// It is to be used only during the transition from gob-encoded 'amount' and
// 'received' columns to 'amount_numeric'/'received_numeric'.
//
// When the transition is complete, this method will go away and Insert()
// will operate only on the _numeric columns.
func (db *coinPaymentsTransactions) insertTransitionShim(ctx context.Context, tx stripecoinpayments.Transaction) (createTime time.Time, err error) {
row := db.db.DB.QueryRowContext(ctx, db.db.Rebind(`
INSERT INTO coinpayments_transactions (
id, user_id, address, amount_numeric, received_numeric, status, key, timeout, created_at
) VALUES (
?, ?, ?, ?, ?, ?, ?, ?, now()
) RETURNING created_at;
`), tx.ID.String(), tx.AccountID[:], tx.Address, tx.Amount.BaseUnits(), tx.Received.BaseUnits(), tx.Status.Int(), tx.Key, int(tx.Timeout.Seconds()))
if err := row.Scan(&createTime); err != nil {
return time.Time{}, Error.Wrap(err)
}
return createTime, nil
} }
// Update updates status and received for set of transactions. // Update updates status and received for set of transactions.
@ -81,9 +107,9 @@ func (db *coinPaymentsTransactions) Update(ctx context.Context, updates []stripe
return nil return nil
} }
return db.db.WithTx(ctx, func(ctx context.Context, tx *dbx.Tx) error { err = db.db.WithTx(ctx, func(ctx context.Context, tx *dbx.Tx) error {
for _, update := range updates { for _, update := range updates {
received, err := update.Received.AsBigFloat().GobEncode() receivedGob, err := update.Received.AsBigFloat().GobEncode()
if err != nil { if err != nil {
return errs.Wrap(err) return errs.Wrap(err)
} }
@ -91,7 +117,7 @@ func (db *coinPaymentsTransactions) Update(ctx context.Context, updates []stripe
_, err = tx.Update_CoinpaymentsTransaction_By_Id(ctx, _, err = tx.Update_CoinpaymentsTransaction_By_Id(ctx,
dbx.CoinpaymentsTransaction_Id(update.TransactionID.String()), dbx.CoinpaymentsTransaction_Id(update.TransactionID.String()),
dbx.CoinpaymentsTransaction_Update_Fields{ dbx.CoinpaymentsTransaction_Update_Fields{
Received: dbx.CoinpaymentsTransaction_Received(received), Received: dbx.CoinpaymentsTransaction_Received(receivedGob),
Status: dbx.CoinpaymentsTransaction_Status(update.Status.Int()), Status: dbx.CoinpaymentsTransaction_Status(update.Status.Int()),
}, },
) )
@ -111,19 +137,73 @@ func (db *coinPaymentsTransactions) Update(ctx context.Context, updates []stripe
return nil return nil
}) })
if err != nil {
if errCode := pgerrcode.FromError(err); errCode == pgxerrcode.UndefinedColumn {
// TEMPORARY: fall back to expected new schema to facilitate transition
return db.updateTransitionShim(ctx, updates, applies)
}
}
return err
}
// updateTransitionShim updates status and received for set of transactions.
//
// It is to be used only during the transition from gob-encoded 'amount' and
// 'received' columns to 'amount_numeric'/'received_numeric'. During the
// transition, the gob-encoded columns will still exist but under a different
// name ('amount_gob'/'received_gob'). If the _numeric column value for a given
// row is non-null, it takes precedence over the corresponding _gob column.
//
// When the transition is complete, this method will go away and
// Update() will operate only on the _numeric columns.
func (db *coinPaymentsTransactions) updateTransitionShim(ctx context.Context, updates []stripecoinpayments.TransactionUpdate, applies coinpayments.TransactionIDList) (err error) {
defer mon.Task()(&ctx)(&err)
if len(updates) == 0 {
return nil
}
return db.db.WithTx(ctx, func(ctx context.Context, tx *dbx.Tx) error {
for _, update := range updates {
query := db.db.Rebind(`
UPDATE coinpayments_transactions
SET
received_gob = NULL,
received_numeric = ?,
status = ?
WHERE id = ?
`)
_, err := tx.Tx.ExecContext(ctx, query, update.Received.BaseUnits(), update.Status.Int(), update.TransactionID.String())
if err != nil {
return err
}
}
for _, txID := range applies {
query := db.db.Rebind(`INSERT INTO stripecoinpayments_apply_balance_intents ( tx_id, state, created_at )
VALUES ( ?, ?, ? ) ON CONFLICT DO NOTHING`)
_, err = tx.Tx.ExecContext(ctx, query, txID.String(), applyBalanceIntentStateUnapplied.Int(), db.db.Hooks.Now().UTC())
if err != nil {
return err
}
}
return nil
})
} }
// Consume marks transaction as consumed, so it won't participate in apply account balance loop. // Consume marks transaction as consumed, so it won't participate in apply account balance loop.
func (db *coinPaymentsTransactions) Consume(ctx context.Context, id coinpayments.TransactionID) (err error) { func (db *coinPaymentsTransactions) Consume(ctx context.Context, id coinpayments.TransactionID) (err error) {
defer mon.Task()(&ctx)(&err) defer mon.Task()(&ctx)(&err)
query := db.db.Rebind(` query := db.db.Rebind(`
WITH intent AS ( WITH intent AS (
SELECT tx_id, state FROM stripecoinpayments_apply_balance_intents WHERE tx_id = ? SELECT tx_id, state FROM stripecoinpayments_apply_balance_intents WHERE tx_id = ?
), updated AS ( ), updated AS (
UPDATE stripecoinpayments_apply_balance_intents AS ints UPDATE stripecoinpayments_apply_balance_intents AS ints
SET SET
state = ? state = ?
FROM intent FROM intent
WHERE intent.tx_id = ints.tx_id AND ints.state = ? WHERE intent.tx_id = ints.tx_id AND ints.state = ?
RETURNING 1 RETURNING 1
@ -154,14 +234,39 @@ func (db *coinPaymentsTransactions) LockRate(ctx context.Context, id coinpayment
buff, err := rate.BigFloat().GobEncode() buff, err := rate.BigFloat().GobEncode()
if err != nil { if err != nil {
return errs.Wrap(err) return Error.Wrap(err)
} }
_, err = db.db.Create_StripecoinpaymentsTxConversionRate(ctx, _, err = db.db.Create_StripecoinpaymentsTxConversionRate(ctx,
dbx.StripecoinpaymentsTxConversionRate_TxId(id.String()), dbx.StripecoinpaymentsTxConversionRate_TxId(id.String()),
dbx.StripecoinpaymentsTxConversionRate_Rate(buff)) dbx.StripecoinpaymentsTxConversionRate_Rate(buff))
return err if err != nil {
if errCode := pgerrcode.FromError(err); errCode == pgxerrcode.UndefinedColumn {
// TEMPORARY: fall back to expected new schema to facilitate transition
return db.lockRateTransitionShim(ctx, id, rate)
}
}
return Error.Wrap(err)
}
// lockRateTransitionShim locks conversion rate for transaction.
//
// It is to be used only during the transition from the gob-encoded 'rate'
// column to 'rate_numeric'.
//
// When the transition is complete, this method will go away and
// LockRate() will operate only on the _numeric column.
func (db *coinPaymentsTransactions) lockRateTransitionShim(ctx context.Context, id coinpayments.TransactionID, rate decimal.Decimal) (err error) {
defer mon.Task()(&ctx)(&err)
now := time.Now().UTC()
query := db.db.Rebind(`
INSERT INTO stripecoinpayments_tx_conversion_rates ( tx_id, rate_numeric, created_at ) VALUES ( ?, ?, ? )
`)
_, err = db.db.DB.ExecContext(ctx, query, id.String(), rate.String(), now)
return Error.Wrap(err)
} }
// GetLockedRate returns locked conversion rate for transaction or error if non exists. // GetLockedRate returns locked conversion rate for transaction or error if non exists.
@ -172,6 +277,10 @@ func (db *coinPaymentsTransactions) GetLockedRate(ctx context.Context, id coinpa
dbx.StripecoinpaymentsTxConversionRate_TxId(id.String()), dbx.StripecoinpaymentsTxConversionRate_TxId(id.String()),
) )
if err != nil { if err != nil {
if errCode := pgerrcode.FromError(err); errCode == pgxerrcode.UndefinedColumn {
// TEMPORARY: fall back to expected new schema to facilitate transition
return db.getLockedRateTransitionShim(ctx, id)
}
return decimal.Decimal{}, err return decimal.Decimal{}, err
} }
@ -187,6 +296,45 @@ func (db *coinPaymentsTransactions) GetLockedRate(ctx context.Context, id coinpa
return rate, nil return rate, nil
} }
// getLockedRateTransitionShim returns locked conversion rate for transaction
// or error if none exists.
//
// It is to be used only during the transition from the gob-encoded 'rate'
// column to 'rate_numeric'. During the transition, the gob-encoded column will
// still exist but under a different name ('rate_gob'). If rate_numeric for a
// given row is non-null, it takes precedence over rate_gob.
//
// When the transition is complete, this method will go away and
// GetLockedRate() will operate only on rate_numeric.
func (db *coinPaymentsTransactions) getLockedRateTransitionShim(ctx context.Context, id coinpayments.TransactionID) (_ decimal.Decimal, err error) {
defer mon.Task()(&ctx)(&err)
var rateGob []byte
var rateNumeric *string
query := db.db.Rebind(`
SELECT rate_gob, rate_numeric
FROM stripecoinpayments_tx_conversion_rates
WHERE tx_id = ?
`)
row := db.db.DB.QueryRowContext(ctx, query, id.String())
err = row.Scan(&rateGob, &rateNumeric)
if err != nil {
return decimal.Decimal{}, Error.Wrap(err)
}
if rateNumeric == nil {
// This row does not have a numeric rate value yet
var rateF big.Float
if err = rateF.GobDecode(rateGob); err != nil {
return decimal.Decimal{}, Error.Wrap(err)
}
rate, err := monetary.DecimalFromBigFloat(&rateF)
return rate, Error.Wrap(err)
}
rate, err := decimal.NewFromString(*rateNumeric)
return rate, Error.Wrap(err)
}
// ListAccount returns all transaction for specific user. // ListAccount returns all transaction for specific user.
func (db *coinPaymentsTransactions) ListAccount(ctx context.Context, userID uuid.UUID) (_ []stripecoinpayments.Transaction, err error) { func (db *coinPaymentsTransactions) ListAccount(ctx context.Context, userID uuid.UUID) (_ []stripecoinpayments.Transaction, err error) {
defer mon.Task()(&ctx)(&err) defer mon.Task()(&ctx)(&err)
@ -195,6 +343,10 @@ func (db *coinPaymentsTransactions) ListAccount(ctx context.Context, userID uuid
dbx.CoinpaymentsTransaction_UserId(userID[:]), dbx.CoinpaymentsTransaction_UserId(userID[:]),
) )
if err != nil { if err != nil {
if errCode := pgerrcode.FromError(err); errCode == pgxerrcode.UndefinedColumn {
// TEMPORARY: fall back to expected new schema to facilitate transition
return db.listAccountTransitionShim(ctx, userID)
}
return nil, err return nil, err
} }
@ -211,11 +363,86 @@ func (db *coinPaymentsTransactions) ListAccount(ctx context.Context, userID uuid
return txs, nil return txs, nil
} }
// listAccountTransitionShim returns all transaction for specific user.
//
// It is to be used only during the transition from gob-encoded 'amount' and
// 'received' columns to 'amount_numeric'/'received_numeric'. During the
// transition, the gob-encoded columns will still exist but under a different
// name ('amount_gob'/'received_gob'). If the _numeric column value for a given
// row is non-null, it takes precedence over the corresponding _gob column.
//
// When the transition is complete, this method will go away and ListAccount()
// will operate only on the _numeric columns.
func (db *coinPaymentsTransactions) listAccountTransitionShim(ctx context.Context, userID uuid.UUID) (_ []stripecoinpayments.Transaction, err error) {
defer mon.Task()(&ctx)(&err)
query := db.db.Rebind(`
SELECT
id,
user_id,
address,
amount_gob,
amount_numeric,
received_gob,
received_numeric,
status,
key,
timeout,
created_at
FROM coinpayments_transactions
WHERE user_id = ?
ORDER BY created_at DESC
`)
rows, err := db.db.DB.QueryContext(ctx, query, userID[:])
if err != nil {
return nil, err
}
defer func() { err = errs.Combine(err, rows.Close()) }()
var txs []stripecoinpayments.Transaction
for rows.Next() {
var tx stripecoinpayments.Transaction
var amountGob, receivedGob []byte
var amountNumeric, receivedNumeric *int64
var timeoutSeconds int
err := rows.Scan(&tx.ID, &tx.AccountID, &tx.Address, &amountGob, &amountNumeric, &receivedGob, &receivedNumeric, &tx.Status, &tx.Key, &timeoutSeconds, &tx.CreatedAt)
if err != nil {
return nil, Error.Wrap(err)
}
tx.Timeout = time.Second * time.Duration(timeoutSeconds)
if amountNumeric == nil {
tx.Amount, err = monetaryAmountFromGobEncodedBigFloat(amountGob, monetary.StorjToken)
if err != nil {
return nil, Error.New("amount column: %v", err)
}
} else {
tx.Amount = monetary.AmountFromBaseUnits(*amountNumeric, monetary.StorjToken)
}
if receivedNumeric == nil {
tx.Received, err = monetaryAmountFromGobEncodedBigFloat(receivedGob, monetary.StorjToken)
if err != nil {
return nil, Error.New("received column: %v", err)
}
} else {
tx.Received = monetary.AmountFromBaseUnits(*receivedNumeric, monetary.StorjToken)
}
txs = append(txs, tx)
}
if err = rows.Err(); err != nil {
return nil, err
}
return txs, nil
}
// ListPending returns paginated list of pending transactions. // ListPending returns paginated list of pending transactions.
func (db *coinPaymentsTransactions) ListPending(ctx context.Context, offset int64, limit int, before time.Time) (_ stripecoinpayments.TransactionsPage, err error) { func (db *coinPaymentsTransactions) ListPending(ctx context.Context, offset int64, limit int, before time.Time) (_ stripecoinpayments.TransactionsPage, err error) {
defer mon.Task()(&ctx)(&err) defer mon.Task()(&ctx)(&err)
query := db.db.Rebind(`SELECT query := db.db.Rebind(`SELECT
id, id,
user_id, user_id,
address, address,
@ -224,7 +451,7 @@ func (db *coinPaymentsTransactions) ListPending(ctx context.Context, offset int6
status, status,
key, key,
created_at created_at
FROM coinpayments_transactions FROM coinpayments_transactions
WHERE status IN (?,?) WHERE status IN (?,?)
AND created_at <= ? AND created_at <= ?
ORDER by created_at DESC ORDER by created_at DESC
@ -232,6 +459,10 @@ func (db *coinPaymentsTransactions) ListPending(ctx context.Context, offset int6
rows, err := db.db.QueryContext(ctx, query, coinpayments.StatusPending, coinpayments.StatusReceived, before, limit+1, offset) rows, err := db.db.QueryContext(ctx, query, coinpayments.StatusPending, coinpayments.StatusReceived, before, limit+1, offset)
if err != nil { if err != nil {
if errCode := pgerrcode.FromError(err); errCode == pgxerrcode.UndefinedColumn {
// TEMPORARY: fall back to expected new schema to facilitate transition
return db.listPendingTransitionShim(ctx, offset, limit, before)
}
return stripecoinpayments.TransactionsPage{}, err return stripecoinpayments.TransactionsPage{}, err
} }
@ -294,11 +525,112 @@ func (db *coinPaymentsTransactions) ListPending(ctx context.Context, offset int6
return page, nil return page, nil
} }
// listPendingTransitionShim returns paginated list of pending transactions.
//
// It is to be used only during the transition from gob-encoded 'amount' and
// 'received' columns to 'amount_numeric'/'received_numeric'. During the
// transition, the gob-encoded columns will still exist but under a different
// name ('amount_gob'/'received_gob'). If the _numeric column value for a given
// row is non-null, it takes precedence over the corresponding _gob column.
//
// When the transition is complete, this method will go away and ListPending()
// will operate only on the _numeric columns.
func (db *coinPaymentsTransactions) listPendingTransitionShim(ctx context.Context, offset int64, limit int, before time.Time) (_ stripecoinpayments.TransactionsPage, err error) {
defer mon.Task()(&ctx)(&err)
query := db.db.Rebind(`SELECT
id,
user_id,
address,
amount_gob,
amount_numeric,
received_gob,
received_numeric,
status,
key,
created_at
FROM coinpayments_transactions
WHERE status IN (?,?)
AND created_at <= ?
LIMIT ? OFFSET ?`)
rows, err := db.db.QueryContext(ctx, query, coinpayments.StatusPending, coinpayments.StatusReceived, before, limit+1, offset)
if err != nil {
return stripecoinpayments.TransactionsPage{}, Error.Wrap(err)
}
defer func() {
err = errs.Combine(err, rows.Close())
}()
var page stripecoinpayments.TransactionsPage
for rows.Next() {
var id, address string
var userID uuid.UUID
var amountGob, receivedGob []byte
var amountNumeric, receivedNumeric *int64
var amount, received monetary.Amount
var status int
var key string
var createdAt time.Time
err := rows.Scan(&id, &userID, &address, &amountGob, &amountNumeric, &receivedGob, &receivedNumeric, &status, &key, &createdAt)
if err != nil {
return stripecoinpayments.TransactionsPage{}, err
}
if amountNumeric == nil {
// 'amount' in this row hasn't yet been updated to a numeric value
amount, err = monetaryAmountFromGobEncodedBigFloat(amountGob, monetary.StorjToken)
if err != nil {
return stripecoinpayments.TransactionsPage{}, Error.Wrap(err)
}
} else {
amount = monetary.AmountFromBaseUnits(*amountNumeric, monetary.StorjToken)
}
if receivedNumeric == nil {
// 'received' in this row hasn't yet been updated to a numeric value
received, err = monetaryAmountFromGobEncodedBigFloat(receivedGob, monetary.StorjToken)
if err != nil {
return stripecoinpayments.TransactionsPage{}, Error.Wrap(err)
}
} else {
received = monetary.AmountFromBaseUnits(*receivedNumeric, monetary.StorjToken)
}
page.Transactions = append(page.Transactions,
stripecoinpayments.Transaction{
ID: coinpayments.TransactionID(id),
AccountID: userID,
Address: address,
Amount: amount,
Received: received,
Status: coinpayments.Status(status),
Key: key,
CreatedAt: createdAt,
},
)
}
if err = rows.Err(); err != nil {
return stripecoinpayments.TransactionsPage{}, err
}
if len(page.Transactions) == limit+1 {
page.Next = true
page.NextOffset = offset + int64(limit)
page.Transactions = page.Transactions[:len(page.Transactions)-1]
}
return page, nil
}
// ListUnapplied returns TransactionsPage with a pending or completed status, that should be applied to account balance. // ListUnapplied returns TransactionsPage with a pending or completed status, that should be applied to account balance.
func (db *coinPaymentsTransactions) ListUnapplied(ctx context.Context, offset int64, limit int, before time.Time) (_ stripecoinpayments.TransactionsPage, err error) { func (db *coinPaymentsTransactions) ListUnapplied(ctx context.Context, offset int64, limit int, before time.Time) (_ stripecoinpayments.TransactionsPage, err error) {
defer mon.Task()(&ctx)(&err) defer mon.Task()(&ctx)(&err)
query := db.db.Rebind(`SELECT query := db.db.Rebind(`SELECT
txs.id, txs.id,
txs.user_id, txs.user_id,
txs.address, txs.address,
@ -307,7 +639,7 @@ func (db *coinPaymentsTransactions) ListUnapplied(ctx context.Context, offset in
txs.status, txs.status,
txs.key, txs.key,
txs.created_at txs.created_at
FROM coinpayments_transactions as txs FROM coinpayments_transactions as txs
INNER JOIN stripecoinpayments_apply_balance_intents as ints INNER JOIN stripecoinpayments_apply_balance_intents as ints
ON txs.id = ints.tx_id ON txs.id = ints.tx_id
WHERE txs.status >= ? WHERE txs.status >= ?
@ -318,6 +650,9 @@ func (db *coinPaymentsTransactions) ListUnapplied(ctx context.Context, offset in
rows, err := db.db.QueryContext(ctx, query, coinpayments.StatusReceived, before, applyBalanceIntentStateUnapplied, limit+1, offset) rows, err := db.db.QueryContext(ctx, query, coinpayments.StatusReceived, before, applyBalanceIntentStateUnapplied, limit+1, offset)
if err != nil { if err != nil {
if errCode := pgerrcode.FromError(err); errCode == pgxerrcode.UndefinedColumn {
return db.listUnappliedTransitionShim(ctx, offset, limit, before)
}
return stripecoinpayments.TransactionsPage{}, err return stripecoinpayments.TransactionsPage{}, err
} }
defer func() { err = errs.Combine(err, rows.Close()) }() defer func() { err = errs.Combine(err, rows.Close()) }()
@ -343,11 +678,114 @@ func (db *coinPaymentsTransactions) ListUnapplied(ctx context.Context, offset in
amount, err := monetaryAmountFromGobEncodedBigFloat(amountB, currency) amount, err := monetaryAmountFromGobEncodedBigFloat(amountB, currency)
if err != nil { if err != nil {
return stripecoinpayments.TransactionsPage{}, errs.Wrap(err) return stripecoinpayments.TransactionsPage{}, Error.New("amount column: %v", err)
} }
received, err := monetaryAmountFromGobEncodedBigFloat(receivedB, currency) received, err := monetaryAmountFromGobEncodedBigFloat(receivedB, currency)
if err != nil { if err != nil {
return stripecoinpayments.TransactionsPage{}, errs.Wrap(err) return stripecoinpayments.TransactionsPage{}, Error.New("received column: %v", err)
}
page.Transactions = append(page.Transactions,
stripecoinpayments.Transaction{
ID: coinpayments.TransactionID(id),
AccountID: userID,
Address: address,
Amount: amount,
Received: received,
Status: coinpayments.Status(status),
Key: key,
CreatedAt: createdAt,
},
)
}
if err = rows.Err(); err != nil {
return stripecoinpayments.TransactionsPage{}, err
}
if len(page.Transactions) == limit+1 {
page.Next = true
page.NextOffset = offset + int64(limit)
page.Transactions = page.Transactions[:len(page.Transactions)-1]
}
return page, nil
}
// listUnappliedTransitionShim returns TransactionsPage with a pending or
// completed status, that should be applied to account balance.
//
// It is to be used only during the transition from gob-encoded 'amount' and
// 'received' columns to 'amount_numeric'/'received_numeric'. During the
// transition, the gob-encoded columns will still exist but under a different
// name ('amount_gob'/'received_gob'). If the _numeric column value for a given
// row is non-null, it takes precedence over the corresponding _gob column.
//
// When the transition is complete, this method will go away and
// ListUnapplied() will operate only on the _numeric columns.
func (db *coinPaymentsTransactions) listUnappliedTransitionShim(ctx context.Context, offset int64, limit int, before time.Time) (_ stripecoinpayments.TransactionsPage, err error) {
defer mon.Task()(&ctx)(&err)
query := db.db.Rebind(`SELECT
txs.id,
txs.user_id,
txs.address,
txs.amount_gob,
txs.amount_numeric,
txs.received_gob,
txs.received_numeric,
txs.status,
txs.key,
txs.created_at
FROM coinpayments_transactions as txs
INNER JOIN stripecoinpayments_apply_balance_intents as ints
ON txs.id = ints.tx_id
WHERE txs.status >= ?
AND txs.created_at <= ?
AND ints.state = ?
ORDER by txs.created_at DESC
LIMIT ? OFFSET ?`)
rows, err := db.db.QueryContext(ctx, query, coinpayments.StatusReceived, before, applyBalanceIntentStateUnapplied, limit+1, offset)
if err != nil {
return stripecoinpayments.TransactionsPage{}, err
}
defer func() { err = errs.Combine(err, rows.Close()) }()
var page stripecoinpayments.TransactionsPage
for rows.Next() {
var id, address string
var userID uuid.UUID
var amountGob, receivedGob []byte
var amountNumeric, receivedNumeric *int64
var status int
var key string
var createdAt time.Time
err := rows.Scan(&id, &userID, &address, &amountGob, &amountNumeric, &receivedGob, &receivedNumeric, &status, &key, &createdAt)
if err != nil {
return stripecoinpayments.TransactionsPage{}, err
}
var amount, received monetary.Amount
if amountNumeric == nil {
// 'amount' in this row hasn't yet been updated to a numeric value
amount, err = monetaryAmountFromGobEncodedBigFloat(amountGob, monetary.StorjToken)
if err != nil {
return stripecoinpayments.TransactionsPage{}, Error.Wrap(err)
}
} else {
amount = monetary.AmountFromBaseUnits(*amountNumeric, monetary.StorjToken)
}
if receivedNumeric == nil {
// 'received' in this row hasn't yet been updated to a numeric value
received, err = monetaryAmountFromGobEncodedBigFloat(receivedGob, monetary.StorjToken)
if err != nil {
return stripecoinpayments.TransactionsPage{}, Error.Wrap(err)
}
} else {
received = monetary.AmountFromBaseUnits(*receivedNumeric, monetary.StorjToken)
} }
page.Transactions = append(page.Transactions, page.Transactions = append(page.Transactions,
@ -390,11 +828,11 @@ func fromDBXCoinpaymentsTransaction(dbxCPTX *dbx.CoinpaymentsTransaction) (*stri
amount, err := monetaryAmountFromGobEncodedBigFloat(dbxCPTX.Amount, currency) amount, err := monetaryAmountFromGobEncodedBigFloat(dbxCPTX.Amount, currency)
if err != nil { if err != nil {
return nil, errs.Wrap(err) return nil, Error.New("amount column: %v", err)
} }
received, err := monetaryAmountFromGobEncodedBigFloat(dbxCPTX.Received, currency) received, err := monetaryAmountFromGobEncodedBigFloat(dbxCPTX.Received, currency)
if err != nil { if err != nil {
return nil, errs.Wrap(err) return nil, Error.New("received column: %v", err)
} }
return &stripecoinpayments.Transaction{ return &stripecoinpayments.Transaction{
@ -417,3 +855,24 @@ func monetaryAmountFromGobEncodedBigFloat(encoded []byte, currency *monetary.Cur
} }
return monetary.AmountFromBigFloat(&bf, currency) return monetary.AmountFromBigFloat(&bf, currency)
} }
// DebugPerformBigFloatTransition performs the schema changes expected as part
// of Step 2 of the transition away from gob-encoded big.Float columns in the
// database.
//
// This is for testing purposes only, to ensure that no data is lost and that
// code still works after the transition.
func (db *coinPaymentsTransactions) DebugPerformBigFloatTransition(ctx context.Context) error {
_, err := db.db.DB.ExecContext(ctx, `
ALTER TABLE coinpayments_transactions ALTER COLUMN amount DROP NOT NULL;
ALTER TABLE coinpayments_transactions ALTER COLUMN received DROP NOT NULL;
ALTER TABLE coinpayments_transactions RENAME COLUMN amount TO amount_gob;
ALTER TABLE coinpayments_transactions RENAME COLUMN received TO received_gob;
ALTER TABLE coinpayments_transactions ADD COLUMN amount_numeric INT8;
ALTER TABLE coinpayments_transactions ADD COLUMN received_numeric INT8;
ALTER TABLE stripecoinpayments_tx_conversion_rates ALTER COLUMN rate DROP NOT NULL;
ALTER TABLE stripecoinpayments_tx_conversion_rates RENAME COLUMN rate TO rate_gob;
ALTER TABLE stripecoinpayments_tx_conversion_rates ADD COLUMN rate_numeric NUMERIC(20, 8);
`)
return Error.Wrap(err)
}

View File

@ -21,6 +21,7 @@ import (
"storj.io/private/dbutil/pgtest" "storj.io/private/dbutil/pgtest"
"storj.io/private/dbutil/pgutil" "storj.io/private/dbutil/pgutil"
"storj.io/private/dbutil/tempdb" "storj.io/private/dbutil/tempdb"
"storj.io/private/tagsql"
"storj.io/storj/satellite" "storj.io/storj/satellite"
"storj.io/storj/satellite/metabase" "storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/satellitedb" "storj.io/storj/satellite/satellitedb"
@ -105,6 +106,12 @@ func (db *tempMasterDB) Close() error {
return errs.Combine(db.DB.Close(), db.tempDB.Close()) return errs.Combine(db.DB.Close(), db.tempDB.Close())
} }
// DebugGetDBHandle exposes a handle to the raw database object. This is intended
// only for testing purposes and is temporary.
func (db *tempMasterDB) DebugGetDBHandle() tagsql.DB {
return db.tempDB.DB
}
// CreateMasterDB creates a new satellite database for testing. // CreateMasterDB creates a new satellite database for testing.
func CreateMasterDB(ctx context.Context, log *zap.Logger, name string, category string, index int, dbInfo Database) (db satellite.DB, err error) { func CreateMasterDB(ctx context.Context, log *zap.Logger, name string, category string, index int, dbInfo Database) (db satellite.DB, err error) {
if dbInfo.URL == "" { if dbInfo.URL == "" {