storj/satellite/satellitedb/coinpaymentstxs.go
paul cannon c053bdbd70 satellite/satellitedb: prepare to remove big.Float from db
Why: big.Float is not an ideal type for dealing with monetary amounts,
because no matter how high the precision, some non-integer decimal
values can not be represented exactly in base-2 floating point. Also,
storing gob-encoded big.Float values in the database makes it very hard
to use those values in meaningful queries, making it difficult to do
any sort of analysis on billing.

Now that we have amounts represented using monetary.Amount, we can
simply store them in the database using integers (as given by the
.BaseUnits() method on monetary.Amount).

We should move toward storing the currency along with any monetary
amount, wherever we are storing amounts, because satellites might want
to deal with currencies other than STORJ and USD. Even better, it
becomes much clearer what currency each monetary value is _supposed_ to
be in (I had to dig through code to find that out for our current
monetary columns).

Deployment
----------

Getting rid of the big.Float columns will take multiple deployment
steps. There does not seem to be any way to make the change in a way
that lets existing queries continue to work on CockroachDB (it could be
done with rules and triggers and a stored procedure that knows how to
gob-decode big.Float objects, but CockroachDB doesn't have rules _or_
triggers _or_ stored procedures). Instead, in this first step, we make
no changes to the database schema, but add code that knows how to deal
with the planned changes to the schema when they are made in a future
"step 2" deployment. All functions that deal with the
coinbase_transactions table have been taught to recognize the "undefined
column" error, and when it is seen, to call a separate "transition shim"
function to accomplish the task. Once all the services are running this
code, and the step 2 deployment makes breaking changes to the schema,
any services that are still running and connected to the database will
keep working correctly because of the fallback code included here. The
step 2 deployment can be made without these transition shims included,
because it will apply the database schema changes before any of its code
runs.

Step 1:

    No schema changes; just include code that recognizes the
    "undefined column" error when dealing with the
    coinbase_transactions or stripecoinpayments_tx_conversion_rates
    tables, and if found, assumes that the column changes from Step
    2 have already been made.

Step 2:

    In coinbase_transactions:

     * change the names of the 'amount' and 'received' columns to
       'amount_gob' and 'received_gob' respectively
     * add new 'amount_numeric' and 'received_numeric' columns with
       INT8 type.

    In stripecoinpayments_tx_conversion_rates:

     * change the name of the 'rate' column to 'rate_gob'
     * add new 'rate_numeric' column with NUMERIC(8, 8) type

    Code reading from either of these tables must query both the X_gob
    and X_numeric columns. If X_numeric is not null, its value should
    be used; otherwise, the gob-encoded big.Float in X_gob should be
    used. A chore might be included in this step that transitions values
    from X_gob to X_numeric a few rows at a time.

Step 3:

    Once all prod satellites have no values left in the _gob columns, we
    can drop those columns and add NOT NULL constraints to the _numeric
    columns.

Change-Id: Id6db304b404e6fde44f5a8c23cdaeeaaa2324f20
2021-09-29 00:23:44 +00:00

879 lines
28 KiB
Go

// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package satellitedb
import (
"context"
"math/big"
"time"
pgxerrcode "github.com/jackc/pgerrcode"
"github.com/shopspring/decimal"
"github.com/zeebo/errs"
"storj.io/common/uuid"
"storj.io/private/dbutil/pgutil/pgerrcode"
"storj.io/storj/satellite/payments/coinpayments"
"storj.io/storj/satellite/payments/monetary"
"storj.io/storj/satellite/payments/stripecoinpayments"
"storj.io/storj/satellite/satellitedb/dbx"
)
// ensure that coinpaymentsTransactions implements stripecoinpayments.TransactionsDB.
var _ stripecoinpayments.TransactionsDB = (*coinPaymentsTransactions)(nil)
// applyBalanceIntentState defines states of the apply balance intents.
type applyBalanceIntentState int
const (
// apply balance intent waits to be applied.
applyBalanceIntentStateUnapplied applyBalanceIntentState = 0
// transaction which balance intent points to has been consumed.
applyBalanceIntentStateConsumed applyBalanceIntentState = 1
)
// Int returns intent state as int.
func (intent applyBalanceIntentState) Int() int {
return int(intent)
}
// coinPaymentsTransactions is CoinPayments transactions DB.
//
// architecture: Database
type coinPaymentsTransactions struct {
db *satelliteDB
}
// Insert inserts new coinpayments transaction into DB.
func (db *coinPaymentsTransactions) Insert(ctx context.Context, tx stripecoinpayments.Transaction) (createTime time.Time, err error) {
defer mon.Task()(&ctx)(&err)
amount, err := tx.Amount.AsBigFloat().GobEncode()
if err != nil {
return time.Time{}, errs.Wrap(err)
}
received, err := tx.Received.AsBigFloat().GobEncode()
if err != nil {
return time.Time{}, errs.Wrap(err)
}
dbxCPTX, err := db.db.Create_CoinpaymentsTransaction(ctx,
dbx.CoinpaymentsTransaction_Id(tx.ID.String()),
dbx.CoinpaymentsTransaction_UserId(tx.AccountID[:]),
dbx.CoinpaymentsTransaction_Address(tx.Address),
dbx.CoinpaymentsTransaction_Amount(amount),
dbx.CoinpaymentsTransaction_Received(received),
dbx.CoinpaymentsTransaction_Status(tx.Status.Int()),
dbx.CoinpaymentsTransaction_Key(tx.Key),
dbx.CoinpaymentsTransaction_Timeout(int(tx.Timeout.Seconds())),
)
if err != nil {
if errCode := pgerrcode.FromError(err); errCode == pgxerrcode.UndefinedColumn {
// TEMPORARY: fall back to expected new schema to facilitate transition
return db.insertTransitionShim(ctx, tx)
}
return time.Time{}, err
}
return dbxCPTX.CreatedAt, nil
}
// insertTransitionShim inserts new coinpayments transaction into DB.
//
// It is to be used only during the transition from gob-encoded 'amount' and
// 'received' columns to 'amount_numeric'/'received_numeric'.
//
// When the transition is complete, this method will go away and Insert()
// will operate only on the _numeric columns.
func (db *coinPaymentsTransactions) insertTransitionShim(ctx context.Context, tx stripecoinpayments.Transaction) (createTime time.Time, err error) {
row := db.db.DB.QueryRowContext(ctx, db.db.Rebind(`
INSERT INTO coinpayments_transactions (
id, user_id, address, amount_numeric, received_numeric, status, key, timeout, created_at
) VALUES (
?, ?, ?, ?, ?, ?, ?, ?, now()
) RETURNING created_at;
`), tx.ID.String(), tx.AccountID[:], tx.Address, tx.Amount.BaseUnits(), tx.Received.BaseUnits(), tx.Status.Int(), tx.Key, int(tx.Timeout.Seconds()))
if err := row.Scan(&createTime); err != nil {
return time.Time{}, Error.Wrap(err)
}
return createTime, nil
}
// Update updates status and received for set of transactions.
func (db *coinPaymentsTransactions) Update(ctx context.Context, updates []stripecoinpayments.TransactionUpdate, applies coinpayments.TransactionIDList) (err error) {
defer mon.Task()(&ctx)(&err)
if len(updates) == 0 {
return nil
}
err = db.db.WithTx(ctx, func(ctx context.Context, tx *dbx.Tx) error {
for _, update := range updates {
receivedGob, err := update.Received.AsBigFloat().GobEncode()
if err != nil {
return errs.Wrap(err)
}
_, err = tx.Update_CoinpaymentsTransaction_By_Id(ctx,
dbx.CoinpaymentsTransaction_Id(update.TransactionID.String()),
dbx.CoinpaymentsTransaction_Update_Fields{
Received: dbx.CoinpaymentsTransaction_Received(receivedGob),
Status: dbx.CoinpaymentsTransaction_Status(update.Status.Int()),
},
)
if err != nil {
return err
}
}
for _, txID := range applies {
query := db.db.Rebind(`INSERT INTO stripecoinpayments_apply_balance_intents ( tx_id, state, created_at )
VALUES ( ?, ?, ? ) ON CONFLICT DO NOTHING`)
_, err = tx.Tx.ExecContext(ctx, query, txID.String(), applyBalanceIntentStateUnapplied.Int(), db.db.Hooks.Now().UTC())
if err != nil {
return err
}
}
return nil
})
if err != nil {
if errCode := pgerrcode.FromError(err); errCode == pgxerrcode.UndefinedColumn {
// TEMPORARY: fall back to expected new schema to facilitate transition
return db.updateTransitionShim(ctx, updates, applies)
}
}
return err
}
// updateTransitionShim updates status and received for set of transactions.
//
// It is to be used only during the transition from gob-encoded 'amount' and
// 'received' columns to 'amount_numeric'/'received_numeric'. During the
// transition, the gob-encoded columns will still exist but under a different
// name ('amount_gob'/'received_gob'). If the _numeric column value for a given
// row is non-null, it takes precedence over the corresponding _gob column.
//
// When the transition is complete, this method will go away and
// Update() will operate only on the _numeric columns.
func (db *coinPaymentsTransactions) updateTransitionShim(ctx context.Context, updates []stripecoinpayments.TransactionUpdate, applies coinpayments.TransactionIDList) (err error) {
defer mon.Task()(&ctx)(&err)
if len(updates) == 0 {
return nil
}
return db.db.WithTx(ctx, func(ctx context.Context, tx *dbx.Tx) error {
for _, update := range updates {
query := db.db.Rebind(`
UPDATE coinpayments_transactions
SET
received_gob = NULL,
received_numeric = ?,
status = ?
WHERE id = ?
`)
_, err := tx.Tx.ExecContext(ctx, query, update.Received.BaseUnits(), update.Status.Int(), update.TransactionID.String())
if err != nil {
return err
}
}
for _, txID := range applies {
query := db.db.Rebind(`INSERT INTO stripecoinpayments_apply_balance_intents ( tx_id, state, created_at )
VALUES ( ?, ?, ? ) ON CONFLICT DO NOTHING`)
_, err = tx.Tx.ExecContext(ctx, query, txID.String(), applyBalanceIntentStateUnapplied.Int(), db.db.Hooks.Now().UTC())
if err != nil {
return err
}
}
return nil
})
}
// Consume marks transaction as consumed, so it won't participate in apply account balance loop.
func (db *coinPaymentsTransactions) Consume(ctx context.Context, id coinpayments.TransactionID) (err error) {
defer mon.Task()(&ctx)(&err)
query := db.db.Rebind(`
WITH intent AS (
SELECT tx_id, state FROM stripecoinpayments_apply_balance_intents WHERE tx_id = ?
), updated AS (
UPDATE stripecoinpayments_apply_balance_intents AS ints
SET
state = ?
FROM intent
WHERE intent.tx_id = ints.tx_id AND ints.state = ?
RETURNING 1
)
SELECT EXISTS(SELECT 1 FROM intent) AS intent_exists, EXISTS(SELECT 1 FROM updated) AS intent_consumed;
`)
row := db.db.QueryRowContext(ctx, query, id, applyBalanceIntentStateConsumed, applyBalanceIntentStateUnapplied)
var exists, consumed bool
if err = row.Scan(&exists, &consumed); err != nil {
return err
}
if !exists {
return errs.New("can not consume transaction without apply balance intent")
}
if !consumed {
return stripecoinpayments.ErrTransactionConsumed
}
return err
}
// LockRate locks conversion rate for transaction.
func (db *coinPaymentsTransactions) LockRate(ctx context.Context, id coinpayments.TransactionID, rate decimal.Decimal) (err error) {
defer mon.Task()(&ctx)(&err)
buff, err := rate.BigFloat().GobEncode()
if err != nil {
return Error.Wrap(err)
}
_, err = db.db.Create_StripecoinpaymentsTxConversionRate(ctx,
dbx.StripecoinpaymentsTxConversionRate_TxId(id.String()),
dbx.StripecoinpaymentsTxConversionRate_Rate(buff))
if err != nil {
if errCode := pgerrcode.FromError(err); errCode == pgxerrcode.UndefinedColumn {
// TEMPORARY: fall back to expected new schema to facilitate transition
return db.lockRateTransitionShim(ctx, id, rate)
}
}
return Error.Wrap(err)
}
// lockRateTransitionShim locks conversion rate for transaction.
//
// It is to be used only during the transition from the gob-encoded 'rate'
// column to 'rate_numeric'.
//
// When the transition is complete, this method will go away and
// LockRate() will operate only on the _numeric column.
func (db *coinPaymentsTransactions) lockRateTransitionShim(ctx context.Context, id coinpayments.TransactionID, rate decimal.Decimal) (err error) {
defer mon.Task()(&ctx)(&err)
now := time.Now().UTC()
query := db.db.Rebind(`
INSERT INTO stripecoinpayments_tx_conversion_rates ( tx_id, rate_numeric, created_at ) VALUES ( ?, ?, ? )
`)
_, err = db.db.DB.ExecContext(ctx, query, id.String(), rate.String(), now)
return Error.Wrap(err)
}
// GetLockedRate returns locked conversion rate for transaction or error if non exists.
func (db *coinPaymentsTransactions) GetLockedRate(ctx context.Context, id coinpayments.TransactionID) (_ decimal.Decimal, err error) {
defer mon.Task()(&ctx)(&err)
dbxRate, err := db.db.Get_StripecoinpaymentsTxConversionRate_By_TxId(ctx,
dbx.StripecoinpaymentsTxConversionRate_TxId(id.String()),
)
if err != nil {
if errCode := pgerrcode.FromError(err); errCode == pgxerrcode.UndefinedColumn {
// TEMPORARY: fall back to expected new schema to facilitate transition
return db.getLockedRateTransitionShim(ctx, id)
}
return decimal.Decimal{}, err
}
var rateF big.Float
if err = rateF.GobDecode(dbxRate.Rate); err != nil {
return decimal.Decimal{}, errs.Wrap(err)
}
rate, err := monetary.DecimalFromBigFloat(&rateF)
if err != nil {
return decimal.Decimal{}, errs.Wrap(err)
}
return rate, nil
}
// getLockedRateTransitionShim returns locked conversion rate for transaction
// or error if none exists.
//
// It is to be used only during the transition from the gob-encoded 'rate'
// column to 'rate_numeric'. During the transition, the gob-encoded column will
// still exist but under a different name ('rate_gob'). If rate_numeric for a
// given row is non-null, it takes precedence over rate_gob.
//
// When the transition is complete, this method will go away and
// GetLockedRate() will operate only on rate_numeric.
func (db *coinPaymentsTransactions) getLockedRateTransitionShim(ctx context.Context, id coinpayments.TransactionID) (_ decimal.Decimal, err error) {
defer mon.Task()(&ctx)(&err)
var rateGob []byte
var rateNumeric *string
query := db.db.Rebind(`
SELECT rate_gob, rate_numeric
FROM stripecoinpayments_tx_conversion_rates
WHERE tx_id = ?
`)
row := db.db.DB.QueryRowContext(ctx, query, id.String())
err = row.Scan(&rateGob, &rateNumeric)
if err != nil {
return decimal.Decimal{}, Error.Wrap(err)
}
if rateNumeric == nil {
// This row does not have a numeric rate value yet
var rateF big.Float
if err = rateF.GobDecode(rateGob); err != nil {
return decimal.Decimal{}, Error.Wrap(err)
}
rate, err := monetary.DecimalFromBigFloat(&rateF)
return rate, Error.Wrap(err)
}
rate, err := decimal.NewFromString(*rateNumeric)
return rate, Error.Wrap(err)
}
// ListAccount returns all transaction for specific user.
func (db *coinPaymentsTransactions) ListAccount(ctx context.Context, userID uuid.UUID) (_ []stripecoinpayments.Transaction, err error) {
defer mon.Task()(&ctx)(&err)
dbxTXs, err := db.db.All_CoinpaymentsTransaction_By_UserId_OrderBy_Desc_CreatedAt(ctx,
dbx.CoinpaymentsTransaction_UserId(userID[:]),
)
if err != nil {
if errCode := pgerrcode.FromError(err); errCode == pgxerrcode.UndefinedColumn {
// TEMPORARY: fall back to expected new schema to facilitate transition
return db.listAccountTransitionShim(ctx, userID)
}
return nil, err
}
var txs []stripecoinpayments.Transaction
for _, dbxTX := range dbxTXs {
tx, err := fromDBXCoinpaymentsTransaction(dbxTX)
if err != nil {
return nil, errs.Wrap(err)
}
txs = append(txs, *tx)
}
return txs, nil
}
// listAccountTransitionShim returns all transaction for specific user.
//
// It is to be used only during the transition from gob-encoded 'amount' and
// 'received' columns to 'amount_numeric'/'received_numeric'. During the
// transition, the gob-encoded columns will still exist but under a different
// name ('amount_gob'/'received_gob'). If the _numeric column value for a given
// row is non-null, it takes precedence over the corresponding _gob column.
//
// When the transition is complete, this method will go away and ListAccount()
// will operate only on the _numeric columns.
func (db *coinPaymentsTransactions) listAccountTransitionShim(ctx context.Context, userID uuid.UUID) (_ []stripecoinpayments.Transaction, err error) {
defer mon.Task()(&ctx)(&err)
query := db.db.Rebind(`
SELECT
id,
user_id,
address,
amount_gob,
amount_numeric,
received_gob,
received_numeric,
status,
key,
timeout,
created_at
FROM coinpayments_transactions
WHERE user_id = ?
ORDER BY created_at DESC
`)
rows, err := db.db.DB.QueryContext(ctx, query, userID[:])
if err != nil {
return nil, err
}
defer func() { err = errs.Combine(err, rows.Close()) }()
var txs []stripecoinpayments.Transaction
for rows.Next() {
var tx stripecoinpayments.Transaction
var amountGob, receivedGob []byte
var amountNumeric, receivedNumeric *int64
var timeoutSeconds int
err := rows.Scan(&tx.ID, &tx.AccountID, &tx.Address, &amountGob, &amountNumeric, &receivedGob, &receivedNumeric, &tx.Status, &tx.Key, &timeoutSeconds, &tx.CreatedAt)
if err != nil {
return nil, Error.Wrap(err)
}
tx.Timeout = time.Second * time.Duration(timeoutSeconds)
if amountNumeric == nil {
tx.Amount, err = monetaryAmountFromGobEncodedBigFloat(amountGob, monetary.StorjToken)
if err != nil {
return nil, Error.New("amount column: %v", err)
}
} else {
tx.Amount = monetary.AmountFromBaseUnits(*amountNumeric, monetary.StorjToken)
}
if receivedNumeric == nil {
tx.Received, err = monetaryAmountFromGobEncodedBigFloat(receivedGob, monetary.StorjToken)
if err != nil {
return nil, Error.New("received column: %v", err)
}
} else {
tx.Received = monetary.AmountFromBaseUnits(*receivedNumeric, monetary.StorjToken)
}
txs = append(txs, tx)
}
if err = rows.Err(); err != nil {
return nil, err
}
return txs, nil
}
// ListPending returns paginated list of pending transactions.
func (db *coinPaymentsTransactions) ListPending(ctx context.Context, offset int64, limit int, before time.Time) (_ stripecoinpayments.TransactionsPage, err error) {
defer mon.Task()(&ctx)(&err)
query := db.db.Rebind(`SELECT
id,
user_id,
address,
amount,
received,
status,
key,
created_at
FROM coinpayments_transactions
WHERE status IN (?,?)
AND created_at <= ?
ORDER by created_at DESC
LIMIT ? OFFSET ?`)
rows, err := db.db.QueryContext(ctx, query, coinpayments.StatusPending, coinpayments.StatusReceived, before, limit+1, offset)
if err != nil {
if errCode := pgerrcode.FromError(err); errCode == pgxerrcode.UndefinedColumn {
// TEMPORARY: fall back to expected new schema to facilitate transition
return db.listPendingTransitionShim(ctx, offset, limit, before)
}
return stripecoinpayments.TransactionsPage{}, err
}
defer func() {
err = errs.Combine(err, rows.Close())
}()
var page stripecoinpayments.TransactionsPage
for rows.Next() {
var id, address string
var userID uuid.UUID
var amountB, receivedB []byte
var status int
var key string
var createdAt time.Time
err := rows.Scan(&id, &userID, &address, &amountB, &receivedB, &status, &key, &createdAt)
if err != nil {
return stripecoinpayments.TransactionsPage{}, err
}
// TODO: the currency here should be passed in to this function or stored
// in the database.
currency := monetary.StorjToken
amount, err := monetaryAmountFromGobEncodedBigFloat(amountB, currency)
if err != nil {
return stripecoinpayments.TransactionsPage{}, err
}
received, err := monetaryAmountFromGobEncodedBigFloat(receivedB, currency)
if err != nil {
return stripecoinpayments.TransactionsPage{}, err
}
page.Transactions = append(page.Transactions,
stripecoinpayments.Transaction{
ID: coinpayments.TransactionID(id),
AccountID: userID,
Address: address,
Amount: amount,
Received: received,
Status: coinpayments.Status(status),
Key: key,
CreatedAt: createdAt,
},
)
}
if err = rows.Err(); err != nil {
return stripecoinpayments.TransactionsPage{}, err
}
if len(page.Transactions) == limit+1 {
page.Next = true
page.NextOffset = offset + int64(limit)
page.Transactions = page.Transactions[:len(page.Transactions)-1]
}
return page, nil
}
// listPendingTransitionShim returns paginated list of pending transactions.
//
// It is to be used only during the transition from gob-encoded 'amount' and
// 'received' columns to 'amount_numeric'/'received_numeric'. During the
// transition, the gob-encoded columns will still exist but under a different
// name ('amount_gob'/'received_gob'). If the _numeric column value for a given
// row is non-null, it takes precedence over the corresponding _gob column.
//
// When the transition is complete, this method will go away and ListPending()
// will operate only on the _numeric columns.
func (db *coinPaymentsTransactions) listPendingTransitionShim(ctx context.Context, offset int64, limit int, before time.Time) (_ stripecoinpayments.TransactionsPage, err error) {
defer mon.Task()(&ctx)(&err)
query := db.db.Rebind(`SELECT
id,
user_id,
address,
amount_gob,
amount_numeric,
received_gob,
received_numeric,
status,
key,
created_at
FROM coinpayments_transactions
WHERE status IN (?,?)
AND created_at <= ?
LIMIT ? OFFSET ?`)
rows, err := db.db.QueryContext(ctx, query, coinpayments.StatusPending, coinpayments.StatusReceived, before, limit+1, offset)
if err != nil {
return stripecoinpayments.TransactionsPage{}, Error.Wrap(err)
}
defer func() {
err = errs.Combine(err, rows.Close())
}()
var page stripecoinpayments.TransactionsPage
for rows.Next() {
var id, address string
var userID uuid.UUID
var amountGob, receivedGob []byte
var amountNumeric, receivedNumeric *int64
var amount, received monetary.Amount
var status int
var key string
var createdAt time.Time
err := rows.Scan(&id, &userID, &address, &amountGob, &amountNumeric, &receivedGob, &receivedNumeric, &status, &key, &createdAt)
if err != nil {
return stripecoinpayments.TransactionsPage{}, err
}
if amountNumeric == nil {
// 'amount' in this row hasn't yet been updated to a numeric value
amount, err = monetaryAmountFromGobEncodedBigFloat(amountGob, monetary.StorjToken)
if err != nil {
return stripecoinpayments.TransactionsPage{}, Error.Wrap(err)
}
} else {
amount = monetary.AmountFromBaseUnits(*amountNumeric, monetary.StorjToken)
}
if receivedNumeric == nil {
// 'received' in this row hasn't yet been updated to a numeric value
received, err = monetaryAmountFromGobEncodedBigFloat(receivedGob, monetary.StorjToken)
if err != nil {
return stripecoinpayments.TransactionsPage{}, Error.Wrap(err)
}
} else {
received = monetary.AmountFromBaseUnits(*receivedNumeric, monetary.StorjToken)
}
page.Transactions = append(page.Transactions,
stripecoinpayments.Transaction{
ID: coinpayments.TransactionID(id),
AccountID: userID,
Address: address,
Amount: amount,
Received: received,
Status: coinpayments.Status(status),
Key: key,
CreatedAt: createdAt,
},
)
}
if err = rows.Err(); err != nil {
return stripecoinpayments.TransactionsPage{}, err
}
if len(page.Transactions) == limit+1 {
page.Next = true
page.NextOffset = offset + int64(limit)
page.Transactions = page.Transactions[:len(page.Transactions)-1]
}
return page, nil
}
// ListUnapplied returns TransactionsPage with a pending or completed status, that should be applied to account balance.
func (db *coinPaymentsTransactions) ListUnapplied(ctx context.Context, offset int64, limit int, before time.Time) (_ stripecoinpayments.TransactionsPage, err error) {
defer mon.Task()(&ctx)(&err)
query := db.db.Rebind(`SELECT
txs.id,
txs.user_id,
txs.address,
txs.amount,
txs.received,
txs.status,
txs.key,
txs.created_at
FROM coinpayments_transactions as txs
INNER JOIN stripecoinpayments_apply_balance_intents as ints
ON txs.id = ints.tx_id
WHERE txs.status >= ?
AND txs.created_at <= ?
AND ints.state = ?
ORDER by txs.created_at DESC
LIMIT ? OFFSET ?`)
rows, err := db.db.QueryContext(ctx, query, coinpayments.StatusReceived, before, applyBalanceIntentStateUnapplied, limit+1, offset)
if err != nil {
if errCode := pgerrcode.FromError(err); errCode == pgxerrcode.UndefinedColumn {
return db.listUnappliedTransitionShim(ctx, offset, limit, before)
}
return stripecoinpayments.TransactionsPage{}, err
}
defer func() { err = errs.Combine(err, rows.Close()) }()
var page stripecoinpayments.TransactionsPage
for rows.Next() {
var id, address string
var userID uuid.UUID
var amountB, receivedB []byte
var status int
var key string
var createdAt time.Time
err := rows.Scan(&id, &userID, &address, &amountB, &receivedB, &status, &key, &createdAt)
if err != nil {
return stripecoinpayments.TransactionsPage{}, err
}
// TODO: the currency here should be passed in to this function or stored
// in the database.
currency := monetary.StorjToken
amount, err := monetaryAmountFromGobEncodedBigFloat(amountB, currency)
if err != nil {
return stripecoinpayments.TransactionsPage{}, Error.New("amount column: %v", err)
}
received, err := monetaryAmountFromGobEncodedBigFloat(receivedB, currency)
if err != nil {
return stripecoinpayments.TransactionsPage{}, Error.New("received column: %v", err)
}
page.Transactions = append(page.Transactions,
stripecoinpayments.Transaction{
ID: coinpayments.TransactionID(id),
AccountID: userID,
Address: address,
Amount: amount,
Received: received,
Status: coinpayments.Status(status),
Key: key,
CreatedAt: createdAt,
},
)
}
if err = rows.Err(); err != nil {
return stripecoinpayments.TransactionsPage{}, err
}
if len(page.Transactions) == limit+1 {
page.Next = true
page.NextOffset = offset + int64(limit)
page.Transactions = page.Transactions[:len(page.Transactions)-1]
}
return page, nil
}
// listUnappliedTransitionShim returns TransactionsPage with a pending or
// completed status, that should be applied to account balance.
//
// It is to be used only during the transition from gob-encoded 'amount' and
// 'received' columns to 'amount_numeric'/'received_numeric'. During the
// transition, the gob-encoded columns will still exist but under a different
// name ('amount_gob'/'received_gob'). If the _numeric column value for a given
// row is non-null, it takes precedence over the corresponding _gob column.
//
// When the transition is complete, this method will go away and
// ListUnapplied() will operate only on the _numeric columns.
func (db *coinPaymentsTransactions) listUnappliedTransitionShim(ctx context.Context, offset int64, limit int, before time.Time) (_ stripecoinpayments.TransactionsPage, err error) {
defer mon.Task()(&ctx)(&err)
query := db.db.Rebind(`SELECT
txs.id,
txs.user_id,
txs.address,
txs.amount_gob,
txs.amount_numeric,
txs.received_gob,
txs.received_numeric,
txs.status,
txs.key,
txs.created_at
FROM coinpayments_transactions as txs
INNER JOIN stripecoinpayments_apply_balance_intents as ints
ON txs.id = ints.tx_id
WHERE txs.status >= ?
AND txs.created_at <= ?
AND ints.state = ?
ORDER by txs.created_at DESC
LIMIT ? OFFSET ?`)
rows, err := db.db.QueryContext(ctx, query, coinpayments.StatusReceived, before, applyBalanceIntentStateUnapplied, limit+1, offset)
if err != nil {
return stripecoinpayments.TransactionsPage{}, err
}
defer func() { err = errs.Combine(err, rows.Close()) }()
var page stripecoinpayments.TransactionsPage
for rows.Next() {
var id, address string
var userID uuid.UUID
var amountGob, receivedGob []byte
var amountNumeric, receivedNumeric *int64
var status int
var key string
var createdAt time.Time
err := rows.Scan(&id, &userID, &address, &amountGob, &amountNumeric, &receivedGob, &receivedNumeric, &status, &key, &createdAt)
if err != nil {
return stripecoinpayments.TransactionsPage{}, err
}
var amount, received monetary.Amount
if amountNumeric == nil {
// 'amount' in this row hasn't yet been updated to a numeric value
amount, err = monetaryAmountFromGobEncodedBigFloat(amountGob, monetary.StorjToken)
if err != nil {
return stripecoinpayments.TransactionsPage{}, Error.Wrap(err)
}
} else {
amount = monetary.AmountFromBaseUnits(*amountNumeric, monetary.StorjToken)
}
if receivedNumeric == nil {
// 'received' in this row hasn't yet been updated to a numeric value
received, err = monetaryAmountFromGobEncodedBigFloat(receivedGob, monetary.StorjToken)
if err != nil {
return stripecoinpayments.TransactionsPage{}, Error.Wrap(err)
}
} else {
received = monetary.AmountFromBaseUnits(*receivedNumeric, monetary.StorjToken)
}
page.Transactions = append(page.Transactions,
stripecoinpayments.Transaction{
ID: coinpayments.TransactionID(id),
AccountID: userID,
Address: address,
Amount: amount,
Received: received,
Status: coinpayments.Status(status),
Key: key,
CreatedAt: createdAt,
},
)
}
if err = rows.Err(); err != nil {
return stripecoinpayments.TransactionsPage{}, err
}
if len(page.Transactions) == limit+1 {
page.Next = true
page.NextOffset = offset + int64(limit)
page.Transactions = page.Transactions[:len(page.Transactions)-1]
}
return page, nil
}
// fromDBXCoinpaymentsTransaction converts *dbx.CoinpaymentsTransaction to *stripecoinpayments.Transaction.
func fromDBXCoinpaymentsTransaction(dbxCPTX *dbx.CoinpaymentsTransaction) (*stripecoinpayments.Transaction, error) {
userID, err := uuid.FromBytes(dbxCPTX.UserId)
if err != nil {
return nil, errs.Wrap(err)
}
// TODO: the currency here should be passed in to this function or stored
// in the database.
currency := monetary.StorjToken
amount, err := monetaryAmountFromGobEncodedBigFloat(dbxCPTX.Amount, currency)
if err != nil {
return nil, Error.New("amount column: %v", err)
}
received, err := monetaryAmountFromGobEncodedBigFloat(dbxCPTX.Received, currency)
if err != nil {
return nil, Error.New("received column: %v", err)
}
return &stripecoinpayments.Transaction{
ID: coinpayments.TransactionID(dbxCPTX.Id),
AccountID: userID,
Address: dbxCPTX.Address,
Amount: amount,
Received: received,
Status: coinpayments.Status(dbxCPTX.Status),
Key: dbxCPTX.Key,
Timeout: time.Second * time.Duration(dbxCPTX.Timeout),
CreatedAt: dbxCPTX.CreatedAt,
}, nil
}
func monetaryAmountFromGobEncodedBigFloat(encoded []byte, currency *monetary.Currency) (_ monetary.Amount, err error) {
var bf big.Float
if err := bf.GobDecode(encoded); err != nil {
return monetary.Amount{}, Error.Wrap(err)
}
return monetary.AmountFromBigFloat(&bf, currency)
}
// DebugPerformBigFloatTransition performs the schema changes expected as part
// of Step 2 of the transition away from gob-encoded big.Float columns in the
// database.
//
// This is for testing purposes only, to ensure that no data is lost and that
// code still works after the transition.
func (db *coinPaymentsTransactions) DebugPerformBigFloatTransition(ctx context.Context) error {
_, err := db.db.DB.ExecContext(ctx, `
ALTER TABLE coinpayments_transactions ALTER COLUMN amount DROP NOT NULL;
ALTER TABLE coinpayments_transactions ALTER COLUMN received DROP NOT NULL;
ALTER TABLE coinpayments_transactions RENAME COLUMN amount TO amount_gob;
ALTER TABLE coinpayments_transactions RENAME COLUMN received TO received_gob;
ALTER TABLE coinpayments_transactions ADD COLUMN amount_numeric INT8;
ALTER TABLE coinpayments_transactions ADD COLUMN received_numeric INT8;
ALTER TABLE stripecoinpayments_tx_conversion_rates ALTER COLUMN rate DROP NOT NULL;
ALTER TABLE stripecoinpayments_tx_conversion_rates RENAME COLUMN rate TO rate_gob;
ALTER TABLE stripecoinpayments_tx_conversion_rates ADD COLUMN rate_numeric NUMERIC(20, 8);
`)
return Error.Wrap(err)
}