c053bdbd70
Why: big.Float is not an ideal type for dealing with monetary amounts, because no matter how high the precision, some non-integer decimal values can not be represented exactly in base-2 floating point. Also, storing gob-encoded big.Float values in the database makes it very hard to use those values in meaningful queries, making it difficult to do any sort of analysis on billing. Now that we have amounts represented using monetary.Amount, we can simply store them in the database using integers (as given by the .BaseUnits() method on monetary.Amount). We should move toward storing the currency along with any monetary amount, wherever we are storing amounts, because satellites might want to deal with currencies other than STORJ and USD. Even better, it becomes much clearer what currency each monetary value is _supposed_ to be in (I had to dig through code to find that out for our current monetary columns). Deployment ---------- Getting rid of the big.Float columns will take multiple deployment steps. There does not seem to be any way to make the change in a way that lets existing queries continue to work on CockroachDB (it could be done with rules and triggers and a stored procedure that knows how to gob-decode big.Float objects, but CockroachDB doesn't have rules _or_ triggers _or_ stored procedures). Instead, in this first step, we make no changes to the database schema, but add code that knows how to deal with the planned changes to the schema when they are made in a future "step 2" deployment. All functions that deal with the coinbase_transactions table have been taught to recognize the "undefined column" error, and when it is seen, to call a separate "transition shim" function to accomplish the task. Once all the services are running this code, and the step 2 deployment makes breaking changes to the schema, any services that are still running and connected to the database will keep working correctly because of the fallback code included here. 
The step 2 deployment can be made without these transition shims included, because it will apply the database schema changes before any of its code runs. Step 1: No schema changes; just include code that recognizes the "undefined column" error when dealing with the coinbase_transactions or stripecoinpayments_tx_conversion_rates tables, and if found, assumes that the column changes from Step 2 have already been made. Step 2: In coinbase_transactions: * change the names of the 'amount' and 'received' columns to 'amount_gob' and 'received_gob' respectively * add new 'amount_numeric' and 'received_numeric' columns with INT8 type. In stripecoinpayments_tx_conversion_rates: * change the name of the 'rate' column to 'rate_gob' * add new 'rate_numeric' column with NUMERIC(8, 8) type Code reading from either of these tables must query both the X_gob and X_numeric columns. If X_numeric is not null, its value should be used; otherwise, the gob-encoded big.Float in X_gob should be used. A chore might be included in this step that transitions values from X_gob to X_numeric a few rows at a time. Step 3: Once all prod satellites have no values left in the _gob columns, we can drop those columns and add NOT NULL constraints to the _numeric columns. Change-Id: Id6db304b404e6fde44f5a8c23cdaeeaaa2324f20
879 lines
28 KiB
Go
879 lines
28 KiB
Go
// Copyright (C) 2019 Storj Labs, Inc.
|
|
// See LICENSE for copying information.
|
|
|
|
package satellitedb
|
|
|
|
import (
|
|
"context"
|
|
"math/big"
|
|
"time"
|
|
|
|
pgxerrcode "github.com/jackc/pgerrcode"
|
|
"github.com/shopspring/decimal"
|
|
"github.com/zeebo/errs"
|
|
|
|
"storj.io/common/uuid"
|
|
"storj.io/private/dbutil/pgutil/pgerrcode"
|
|
"storj.io/storj/satellite/payments/coinpayments"
|
|
"storj.io/storj/satellite/payments/monetary"
|
|
"storj.io/storj/satellite/payments/stripecoinpayments"
|
|
"storj.io/storj/satellite/satellitedb/dbx"
|
|
)
|
|
|
|
// ensure that coinPaymentsTransactions implements stripecoinpayments.TransactionsDB.
|
|
var _ stripecoinpayments.TransactionsDB = (*coinPaymentsTransactions)(nil)
|
|
|
|
// applyBalanceIntentState enumerates the states an apply balance intent can
// be in. The numeric values are written to the database, so they are spelled
// out explicitly.
type applyBalanceIntentState int

const (
	// applyBalanceIntentStateUnapplied marks an intent that is still waiting
	// to be applied.
	applyBalanceIntentStateUnapplied applyBalanceIntentState = 0
	// applyBalanceIntentStateConsumed marks an intent whose transaction has
	// already been consumed.
	applyBalanceIntentStateConsumed applyBalanceIntentState = 1
)

// Int converts the intent state to a plain int.
func (state applyBalanceIntentState) Int() int {
	return int(state)
}
|
|
|
|
// coinPaymentsTransactions is CoinPayments transactions DB.
|
|
//
|
|
// architecture: Database
|
|
type coinPaymentsTransactions struct {
|
|
db *satelliteDB
|
|
}
|
|
|
|
// Insert inserts new coinpayments transaction into DB.
|
|
func (db *coinPaymentsTransactions) Insert(ctx context.Context, tx stripecoinpayments.Transaction) (createTime time.Time, err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
amount, err := tx.Amount.AsBigFloat().GobEncode()
|
|
if err != nil {
|
|
return time.Time{}, errs.Wrap(err)
|
|
}
|
|
received, err := tx.Received.AsBigFloat().GobEncode()
|
|
if err != nil {
|
|
return time.Time{}, errs.Wrap(err)
|
|
}
|
|
|
|
dbxCPTX, err := db.db.Create_CoinpaymentsTransaction(ctx,
|
|
dbx.CoinpaymentsTransaction_Id(tx.ID.String()),
|
|
dbx.CoinpaymentsTransaction_UserId(tx.AccountID[:]),
|
|
dbx.CoinpaymentsTransaction_Address(tx.Address),
|
|
dbx.CoinpaymentsTransaction_Amount(amount),
|
|
dbx.CoinpaymentsTransaction_Received(received),
|
|
dbx.CoinpaymentsTransaction_Status(tx.Status.Int()),
|
|
dbx.CoinpaymentsTransaction_Key(tx.Key),
|
|
dbx.CoinpaymentsTransaction_Timeout(int(tx.Timeout.Seconds())),
|
|
)
|
|
if err != nil {
|
|
if errCode := pgerrcode.FromError(err); errCode == pgxerrcode.UndefinedColumn {
|
|
// TEMPORARY: fall back to expected new schema to facilitate transition
|
|
return db.insertTransitionShim(ctx, tx)
|
|
}
|
|
return time.Time{}, err
|
|
}
|
|
return dbxCPTX.CreatedAt, nil
|
|
}
|
|
|
|
// insertTransitionShim inserts new coinpayments transaction into DB.
|
|
//
|
|
// It is to be used only during the transition from gob-encoded 'amount' and
|
|
// 'received' columns to 'amount_numeric'/'received_numeric'.
|
|
//
|
|
// When the transition is complete, this method will go away and Insert()
|
|
// will operate only on the _numeric columns.
|
|
func (db *coinPaymentsTransactions) insertTransitionShim(ctx context.Context, tx stripecoinpayments.Transaction) (createTime time.Time, err error) {
|
|
row := db.db.DB.QueryRowContext(ctx, db.db.Rebind(`
|
|
INSERT INTO coinpayments_transactions (
|
|
id, user_id, address, amount_numeric, received_numeric, status, key, timeout, created_at
|
|
) VALUES (
|
|
?, ?, ?, ?, ?, ?, ?, ?, now()
|
|
) RETURNING created_at;
|
|
`), tx.ID.String(), tx.AccountID[:], tx.Address, tx.Amount.BaseUnits(), tx.Received.BaseUnits(), tx.Status.Int(), tx.Key, int(tx.Timeout.Seconds()))
|
|
if err := row.Scan(&createTime); err != nil {
|
|
return time.Time{}, Error.Wrap(err)
|
|
}
|
|
return createTime, nil
|
|
}
|
|
|
|
// Update updates status and received for set of transactions.
|
|
func (db *coinPaymentsTransactions) Update(ctx context.Context, updates []stripecoinpayments.TransactionUpdate, applies coinpayments.TransactionIDList) (err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
if len(updates) == 0 {
|
|
return nil
|
|
}
|
|
|
|
err = db.db.WithTx(ctx, func(ctx context.Context, tx *dbx.Tx) error {
|
|
for _, update := range updates {
|
|
receivedGob, err := update.Received.AsBigFloat().GobEncode()
|
|
if err != nil {
|
|
return errs.Wrap(err)
|
|
}
|
|
|
|
_, err = tx.Update_CoinpaymentsTransaction_By_Id(ctx,
|
|
dbx.CoinpaymentsTransaction_Id(update.TransactionID.String()),
|
|
dbx.CoinpaymentsTransaction_Update_Fields{
|
|
Received: dbx.CoinpaymentsTransaction_Received(receivedGob),
|
|
Status: dbx.CoinpaymentsTransaction_Status(update.Status.Int()),
|
|
},
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
for _, txID := range applies {
|
|
query := db.db.Rebind(`INSERT INTO stripecoinpayments_apply_balance_intents ( tx_id, state, created_at )
|
|
VALUES ( ?, ?, ? ) ON CONFLICT DO NOTHING`)
|
|
_, err = tx.Tx.ExecContext(ctx, query, txID.String(), applyBalanceIntentStateUnapplied.Int(), db.db.Hooks.Now().UTC())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
})
|
|
|
|
if err != nil {
|
|
if errCode := pgerrcode.FromError(err); errCode == pgxerrcode.UndefinedColumn {
|
|
// TEMPORARY: fall back to expected new schema to facilitate transition
|
|
return db.updateTransitionShim(ctx, updates, applies)
|
|
}
|
|
}
|
|
return err
|
|
}
|
|
|
|
// updateTransitionShim updates status and received for set of transactions.
|
|
//
|
|
// It is to be used only during the transition from gob-encoded 'amount' and
|
|
// 'received' columns to 'amount_numeric'/'received_numeric'. During the
|
|
// transition, the gob-encoded columns will still exist but under a different
|
|
// name ('amount_gob'/'received_gob'). If the _numeric column value for a given
|
|
// row is non-null, it takes precedence over the corresponding _gob column.
|
|
//
|
|
// When the transition is complete, this method will go away and
|
|
// Update() will operate only on the _numeric columns.
|
|
func (db *coinPaymentsTransactions) updateTransitionShim(ctx context.Context, updates []stripecoinpayments.TransactionUpdate, applies coinpayments.TransactionIDList) (err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
if len(updates) == 0 {
|
|
return nil
|
|
}
|
|
|
|
return db.db.WithTx(ctx, func(ctx context.Context, tx *dbx.Tx) error {
|
|
for _, update := range updates {
|
|
query := db.db.Rebind(`
|
|
UPDATE coinpayments_transactions
|
|
SET
|
|
received_gob = NULL,
|
|
received_numeric = ?,
|
|
status = ?
|
|
WHERE id = ?
|
|
`)
|
|
_, err := tx.Tx.ExecContext(ctx, query, update.Received.BaseUnits(), update.Status.Int(), update.TransactionID.String())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
for _, txID := range applies {
|
|
query := db.db.Rebind(`INSERT INTO stripecoinpayments_apply_balance_intents ( tx_id, state, created_at )
|
|
VALUES ( ?, ?, ? ) ON CONFLICT DO NOTHING`)
|
|
_, err = tx.Tx.ExecContext(ctx, query, txID.String(), applyBalanceIntentStateUnapplied.Int(), db.db.Hooks.Now().UTC())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// Consume marks transaction as consumed, so it won't participate in apply account balance loop.
|
|
func (db *coinPaymentsTransactions) Consume(ctx context.Context, id coinpayments.TransactionID) (err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
query := db.db.Rebind(`
|
|
WITH intent AS (
|
|
SELECT tx_id, state FROM stripecoinpayments_apply_balance_intents WHERE tx_id = ?
|
|
), updated AS (
|
|
UPDATE stripecoinpayments_apply_balance_intents AS ints
|
|
SET
|
|
state = ?
|
|
FROM intent
|
|
WHERE intent.tx_id = ints.tx_id AND ints.state = ?
|
|
RETURNING 1
|
|
)
|
|
SELECT EXISTS(SELECT 1 FROM intent) AS intent_exists, EXISTS(SELECT 1 FROM updated) AS intent_consumed;
|
|
`)
|
|
|
|
row := db.db.QueryRowContext(ctx, query, id, applyBalanceIntentStateConsumed, applyBalanceIntentStateUnapplied)
|
|
|
|
var exists, consumed bool
|
|
if err = row.Scan(&exists, &consumed); err != nil {
|
|
return err
|
|
}
|
|
|
|
if !exists {
|
|
return errs.New("can not consume transaction without apply balance intent")
|
|
}
|
|
if !consumed {
|
|
return stripecoinpayments.ErrTransactionConsumed
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
// LockRate locks conversion rate for transaction.
|
|
func (db *coinPaymentsTransactions) LockRate(ctx context.Context, id coinpayments.TransactionID, rate decimal.Decimal) (err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
buff, err := rate.BigFloat().GobEncode()
|
|
if err != nil {
|
|
return Error.Wrap(err)
|
|
}
|
|
|
|
_, err = db.db.Create_StripecoinpaymentsTxConversionRate(ctx,
|
|
dbx.StripecoinpaymentsTxConversionRate_TxId(id.String()),
|
|
dbx.StripecoinpaymentsTxConversionRate_Rate(buff))
|
|
|
|
if err != nil {
|
|
if errCode := pgerrcode.FromError(err); errCode == pgxerrcode.UndefinedColumn {
|
|
// TEMPORARY: fall back to expected new schema to facilitate transition
|
|
return db.lockRateTransitionShim(ctx, id, rate)
|
|
}
|
|
}
|
|
return Error.Wrap(err)
|
|
}
|
|
|
|
// lockRateTransitionShim locks conversion rate for transaction.
|
|
//
|
|
// It is to be used only during the transition from the gob-encoded 'rate'
|
|
// column to 'rate_numeric'.
|
|
//
|
|
// When the transition is complete, this method will go away and
|
|
// LockRate() will operate only on the _numeric column.
|
|
func (db *coinPaymentsTransactions) lockRateTransitionShim(ctx context.Context, id coinpayments.TransactionID, rate decimal.Decimal) (err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
now := time.Now().UTC()
|
|
query := db.db.Rebind(`
|
|
INSERT INTO stripecoinpayments_tx_conversion_rates ( tx_id, rate_numeric, created_at ) VALUES ( ?, ?, ? )
|
|
`)
|
|
|
|
_, err = db.db.DB.ExecContext(ctx, query, id.String(), rate.String(), now)
|
|
return Error.Wrap(err)
|
|
}
|
|
|
|
// GetLockedRate returns locked conversion rate for transaction or error if non exists.
|
|
func (db *coinPaymentsTransactions) GetLockedRate(ctx context.Context, id coinpayments.TransactionID) (_ decimal.Decimal, err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
dbxRate, err := db.db.Get_StripecoinpaymentsTxConversionRate_By_TxId(ctx,
|
|
dbx.StripecoinpaymentsTxConversionRate_TxId(id.String()),
|
|
)
|
|
if err != nil {
|
|
if errCode := pgerrcode.FromError(err); errCode == pgxerrcode.UndefinedColumn {
|
|
// TEMPORARY: fall back to expected new schema to facilitate transition
|
|
return db.getLockedRateTransitionShim(ctx, id)
|
|
}
|
|
return decimal.Decimal{}, err
|
|
}
|
|
|
|
var rateF big.Float
|
|
if err = rateF.GobDecode(dbxRate.Rate); err != nil {
|
|
return decimal.Decimal{}, errs.Wrap(err)
|
|
}
|
|
rate, err := monetary.DecimalFromBigFloat(&rateF)
|
|
if err != nil {
|
|
return decimal.Decimal{}, errs.Wrap(err)
|
|
}
|
|
|
|
return rate, nil
|
|
}
|
|
|
|
// getLockedRateTransitionShim returns locked conversion rate for transaction
|
|
// or error if none exists.
|
|
//
|
|
// It is to be used only during the transition from the gob-encoded 'rate'
|
|
// column to 'rate_numeric'. During the transition, the gob-encoded column will
|
|
// still exist but under a different name ('rate_gob'). If rate_numeric for a
|
|
// given row is non-null, it takes precedence over rate_gob.
|
|
//
|
|
// When the transition is complete, this method will go away and
|
|
// GetLockedRate() will operate only on rate_numeric.
|
|
func (db *coinPaymentsTransactions) getLockedRateTransitionShim(ctx context.Context, id coinpayments.TransactionID) (_ decimal.Decimal, err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
var rateGob []byte
|
|
var rateNumeric *string
|
|
query := db.db.Rebind(`
|
|
SELECT rate_gob, rate_numeric
|
|
FROM stripecoinpayments_tx_conversion_rates
|
|
WHERE tx_id = ?
|
|
`)
|
|
row := db.db.DB.QueryRowContext(ctx, query, id.String())
|
|
err = row.Scan(&rateGob, &rateNumeric)
|
|
if err != nil {
|
|
return decimal.Decimal{}, Error.Wrap(err)
|
|
}
|
|
|
|
if rateNumeric == nil {
|
|
// This row does not have a numeric rate value yet
|
|
var rateF big.Float
|
|
if err = rateF.GobDecode(rateGob); err != nil {
|
|
return decimal.Decimal{}, Error.Wrap(err)
|
|
}
|
|
rate, err := monetary.DecimalFromBigFloat(&rateF)
|
|
return rate, Error.Wrap(err)
|
|
}
|
|
rate, err := decimal.NewFromString(*rateNumeric)
|
|
return rate, Error.Wrap(err)
|
|
}
|
|
|
|
// ListAccount returns all transaction for specific user.
|
|
func (db *coinPaymentsTransactions) ListAccount(ctx context.Context, userID uuid.UUID) (_ []stripecoinpayments.Transaction, err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
dbxTXs, err := db.db.All_CoinpaymentsTransaction_By_UserId_OrderBy_Desc_CreatedAt(ctx,
|
|
dbx.CoinpaymentsTransaction_UserId(userID[:]),
|
|
)
|
|
if err != nil {
|
|
if errCode := pgerrcode.FromError(err); errCode == pgxerrcode.UndefinedColumn {
|
|
// TEMPORARY: fall back to expected new schema to facilitate transition
|
|
return db.listAccountTransitionShim(ctx, userID)
|
|
}
|
|
return nil, err
|
|
}
|
|
|
|
var txs []stripecoinpayments.Transaction
|
|
for _, dbxTX := range dbxTXs {
|
|
tx, err := fromDBXCoinpaymentsTransaction(dbxTX)
|
|
if err != nil {
|
|
return nil, errs.Wrap(err)
|
|
}
|
|
|
|
txs = append(txs, *tx)
|
|
}
|
|
|
|
return txs, nil
|
|
}
|
|
|
|
// listAccountTransitionShim returns all transaction for specific user.
|
|
//
|
|
// It is to be used only during the transition from gob-encoded 'amount' and
|
|
// 'received' columns to 'amount_numeric'/'received_numeric'. During the
|
|
// transition, the gob-encoded columns will still exist but under a different
|
|
// name ('amount_gob'/'received_gob'). If the _numeric column value for a given
|
|
// row is non-null, it takes precedence over the corresponding _gob column.
|
|
//
|
|
// When the transition is complete, this method will go away and ListAccount()
|
|
// will operate only on the _numeric columns.
|
|
func (db *coinPaymentsTransactions) listAccountTransitionShim(ctx context.Context, userID uuid.UUID) (_ []stripecoinpayments.Transaction, err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
query := db.db.Rebind(`
|
|
SELECT
|
|
id,
|
|
user_id,
|
|
address,
|
|
amount_gob,
|
|
amount_numeric,
|
|
received_gob,
|
|
received_numeric,
|
|
status,
|
|
key,
|
|
timeout,
|
|
created_at
|
|
FROM coinpayments_transactions
|
|
WHERE user_id = ?
|
|
ORDER BY created_at DESC
|
|
`)
|
|
rows, err := db.db.DB.QueryContext(ctx, query, userID[:])
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
defer func() { err = errs.Combine(err, rows.Close()) }()
|
|
|
|
var txs []stripecoinpayments.Transaction
|
|
for rows.Next() {
|
|
var tx stripecoinpayments.Transaction
|
|
var amountGob, receivedGob []byte
|
|
var amountNumeric, receivedNumeric *int64
|
|
var timeoutSeconds int
|
|
err := rows.Scan(&tx.ID, &tx.AccountID, &tx.Address, &amountGob, &amountNumeric, &receivedGob, &receivedNumeric, &tx.Status, &tx.Key, &timeoutSeconds, &tx.CreatedAt)
|
|
if err != nil {
|
|
return nil, Error.Wrap(err)
|
|
}
|
|
tx.Timeout = time.Second * time.Duration(timeoutSeconds)
|
|
|
|
if amountNumeric == nil {
|
|
tx.Amount, err = monetaryAmountFromGobEncodedBigFloat(amountGob, monetary.StorjToken)
|
|
if err != nil {
|
|
return nil, Error.New("amount column: %v", err)
|
|
}
|
|
} else {
|
|
tx.Amount = monetary.AmountFromBaseUnits(*amountNumeric, monetary.StorjToken)
|
|
}
|
|
if receivedNumeric == nil {
|
|
tx.Received, err = monetaryAmountFromGobEncodedBigFloat(receivedGob, monetary.StorjToken)
|
|
if err != nil {
|
|
return nil, Error.New("received column: %v", err)
|
|
}
|
|
} else {
|
|
tx.Received = monetary.AmountFromBaseUnits(*receivedNumeric, monetary.StorjToken)
|
|
}
|
|
txs = append(txs, tx)
|
|
}
|
|
|
|
if err = rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return txs, nil
|
|
}
|
|
|
|
// ListPending returns paginated list of pending transactions.
|
|
func (db *coinPaymentsTransactions) ListPending(ctx context.Context, offset int64, limit int, before time.Time) (_ stripecoinpayments.TransactionsPage, err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
query := db.db.Rebind(`SELECT
|
|
id,
|
|
user_id,
|
|
address,
|
|
amount,
|
|
received,
|
|
status,
|
|
key,
|
|
created_at
|
|
FROM coinpayments_transactions
|
|
WHERE status IN (?,?)
|
|
AND created_at <= ?
|
|
ORDER by created_at DESC
|
|
LIMIT ? OFFSET ?`)
|
|
|
|
rows, err := db.db.QueryContext(ctx, query, coinpayments.StatusPending, coinpayments.StatusReceived, before, limit+1, offset)
|
|
if err != nil {
|
|
if errCode := pgerrcode.FromError(err); errCode == pgxerrcode.UndefinedColumn {
|
|
// TEMPORARY: fall back to expected new schema to facilitate transition
|
|
return db.listPendingTransitionShim(ctx, offset, limit, before)
|
|
}
|
|
return stripecoinpayments.TransactionsPage{}, err
|
|
}
|
|
|
|
defer func() {
|
|
err = errs.Combine(err, rows.Close())
|
|
}()
|
|
|
|
var page stripecoinpayments.TransactionsPage
|
|
|
|
for rows.Next() {
|
|
var id, address string
|
|
var userID uuid.UUID
|
|
var amountB, receivedB []byte
|
|
var status int
|
|
var key string
|
|
var createdAt time.Time
|
|
|
|
err := rows.Scan(&id, &userID, &address, &amountB, &receivedB, &status, &key, &createdAt)
|
|
if err != nil {
|
|
return stripecoinpayments.TransactionsPage{}, err
|
|
}
|
|
|
|
// TODO: the currency here should be passed in to this function or stored
|
|
// in the database.
|
|
currency := monetary.StorjToken
|
|
|
|
amount, err := monetaryAmountFromGobEncodedBigFloat(amountB, currency)
|
|
if err != nil {
|
|
return stripecoinpayments.TransactionsPage{}, err
|
|
}
|
|
received, err := monetaryAmountFromGobEncodedBigFloat(receivedB, currency)
|
|
if err != nil {
|
|
return stripecoinpayments.TransactionsPage{}, err
|
|
}
|
|
|
|
page.Transactions = append(page.Transactions,
|
|
stripecoinpayments.Transaction{
|
|
ID: coinpayments.TransactionID(id),
|
|
AccountID: userID,
|
|
Address: address,
|
|
Amount: amount,
|
|
Received: received,
|
|
Status: coinpayments.Status(status),
|
|
Key: key,
|
|
CreatedAt: createdAt,
|
|
},
|
|
)
|
|
}
|
|
|
|
if err = rows.Err(); err != nil {
|
|
return stripecoinpayments.TransactionsPage{}, err
|
|
}
|
|
|
|
if len(page.Transactions) == limit+1 {
|
|
page.Next = true
|
|
page.NextOffset = offset + int64(limit)
|
|
page.Transactions = page.Transactions[:len(page.Transactions)-1]
|
|
}
|
|
|
|
return page, nil
|
|
}
|
|
|
|
// listPendingTransitionShim returns paginated list of pending transactions.
|
|
//
|
|
// It is to be used only during the transition from gob-encoded 'amount' and
|
|
// 'received' columns to 'amount_numeric'/'received_numeric'. During the
|
|
// transition, the gob-encoded columns will still exist but under a different
|
|
// name ('amount_gob'/'received_gob'). If the _numeric column value for a given
|
|
// row is non-null, it takes precedence over the corresponding _gob column.
|
|
//
|
|
// When the transition is complete, this method will go away and ListPending()
|
|
// will operate only on the _numeric columns.
|
|
func (db *coinPaymentsTransactions) listPendingTransitionShim(ctx context.Context, offset int64, limit int, before time.Time) (_ stripecoinpayments.TransactionsPage, err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
query := db.db.Rebind(`SELECT
|
|
id,
|
|
user_id,
|
|
address,
|
|
amount_gob,
|
|
amount_numeric,
|
|
received_gob,
|
|
received_numeric,
|
|
status,
|
|
key,
|
|
created_at
|
|
FROM coinpayments_transactions
|
|
WHERE status IN (?,?)
|
|
AND created_at <= ?
|
|
LIMIT ? OFFSET ?`)
|
|
|
|
rows, err := db.db.QueryContext(ctx, query, coinpayments.StatusPending, coinpayments.StatusReceived, before, limit+1, offset)
|
|
if err != nil {
|
|
return stripecoinpayments.TransactionsPage{}, Error.Wrap(err)
|
|
}
|
|
|
|
defer func() {
|
|
err = errs.Combine(err, rows.Close())
|
|
}()
|
|
|
|
var page stripecoinpayments.TransactionsPage
|
|
|
|
for rows.Next() {
|
|
var id, address string
|
|
var userID uuid.UUID
|
|
var amountGob, receivedGob []byte
|
|
var amountNumeric, receivedNumeric *int64
|
|
var amount, received monetary.Amount
|
|
var status int
|
|
var key string
|
|
var createdAt time.Time
|
|
|
|
err := rows.Scan(&id, &userID, &address, &amountGob, &amountNumeric, &receivedGob, &receivedNumeric, &status, &key, &createdAt)
|
|
if err != nil {
|
|
return stripecoinpayments.TransactionsPage{}, err
|
|
}
|
|
|
|
if amountNumeric == nil {
|
|
// 'amount' in this row hasn't yet been updated to a numeric value
|
|
amount, err = monetaryAmountFromGobEncodedBigFloat(amountGob, monetary.StorjToken)
|
|
if err != nil {
|
|
return stripecoinpayments.TransactionsPage{}, Error.Wrap(err)
|
|
}
|
|
} else {
|
|
amount = monetary.AmountFromBaseUnits(*amountNumeric, monetary.StorjToken)
|
|
}
|
|
if receivedNumeric == nil {
|
|
// 'received' in this row hasn't yet been updated to a numeric value
|
|
received, err = monetaryAmountFromGobEncodedBigFloat(receivedGob, monetary.StorjToken)
|
|
if err != nil {
|
|
return stripecoinpayments.TransactionsPage{}, Error.Wrap(err)
|
|
}
|
|
} else {
|
|
received = monetary.AmountFromBaseUnits(*receivedNumeric, monetary.StorjToken)
|
|
}
|
|
|
|
page.Transactions = append(page.Transactions,
|
|
stripecoinpayments.Transaction{
|
|
ID: coinpayments.TransactionID(id),
|
|
AccountID: userID,
|
|
Address: address,
|
|
Amount: amount,
|
|
Received: received,
|
|
Status: coinpayments.Status(status),
|
|
Key: key,
|
|
CreatedAt: createdAt,
|
|
},
|
|
)
|
|
}
|
|
|
|
if err = rows.Err(); err != nil {
|
|
return stripecoinpayments.TransactionsPage{}, err
|
|
}
|
|
|
|
if len(page.Transactions) == limit+1 {
|
|
page.Next = true
|
|
page.NextOffset = offset + int64(limit)
|
|
page.Transactions = page.Transactions[:len(page.Transactions)-1]
|
|
}
|
|
|
|
return page, nil
|
|
}
|
|
|
|
// ListUnapplied returns TransactionsPage with a pending or completed status, that should be applied to account balance.
|
|
func (db *coinPaymentsTransactions) ListUnapplied(ctx context.Context, offset int64, limit int, before time.Time) (_ stripecoinpayments.TransactionsPage, err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
query := db.db.Rebind(`SELECT
|
|
txs.id,
|
|
txs.user_id,
|
|
txs.address,
|
|
txs.amount,
|
|
txs.received,
|
|
txs.status,
|
|
txs.key,
|
|
txs.created_at
|
|
FROM coinpayments_transactions as txs
|
|
INNER JOIN stripecoinpayments_apply_balance_intents as ints
|
|
ON txs.id = ints.tx_id
|
|
WHERE txs.status >= ?
|
|
AND txs.created_at <= ?
|
|
AND ints.state = ?
|
|
ORDER by txs.created_at DESC
|
|
LIMIT ? OFFSET ?`)
|
|
|
|
rows, err := db.db.QueryContext(ctx, query, coinpayments.StatusReceived, before, applyBalanceIntentStateUnapplied, limit+1, offset)
|
|
if err != nil {
|
|
if errCode := pgerrcode.FromError(err); errCode == pgxerrcode.UndefinedColumn {
|
|
return db.listUnappliedTransitionShim(ctx, offset, limit, before)
|
|
}
|
|
return stripecoinpayments.TransactionsPage{}, err
|
|
}
|
|
defer func() { err = errs.Combine(err, rows.Close()) }()
|
|
|
|
var page stripecoinpayments.TransactionsPage
|
|
|
|
for rows.Next() {
|
|
var id, address string
|
|
var userID uuid.UUID
|
|
var amountB, receivedB []byte
|
|
var status int
|
|
var key string
|
|
var createdAt time.Time
|
|
|
|
err := rows.Scan(&id, &userID, &address, &amountB, &receivedB, &status, &key, &createdAt)
|
|
if err != nil {
|
|
return stripecoinpayments.TransactionsPage{}, err
|
|
}
|
|
|
|
// TODO: the currency here should be passed in to this function or stored
|
|
// in the database.
|
|
currency := monetary.StorjToken
|
|
|
|
amount, err := monetaryAmountFromGobEncodedBigFloat(amountB, currency)
|
|
if err != nil {
|
|
return stripecoinpayments.TransactionsPage{}, Error.New("amount column: %v", err)
|
|
}
|
|
received, err := monetaryAmountFromGobEncodedBigFloat(receivedB, currency)
|
|
if err != nil {
|
|
return stripecoinpayments.TransactionsPage{}, Error.New("received column: %v", err)
|
|
}
|
|
|
|
page.Transactions = append(page.Transactions,
|
|
stripecoinpayments.Transaction{
|
|
ID: coinpayments.TransactionID(id),
|
|
AccountID: userID,
|
|
Address: address,
|
|
Amount: amount,
|
|
Received: received,
|
|
Status: coinpayments.Status(status),
|
|
Key: key,
|
|
CreatedAt: createdAt,
|
|
},
|
|
)
|
|
}
|
|
|
|
if err = rows.Err(); err != nil {
|
|
return stripecoinpayments.TransactionsPage{}, err
|
|
}
|
|
|
|
if len(page.Transactions) == limit+1 {
|
|
page.Next = true
|
|
page.NextOffset = offset + int64(limit)
|
|
page.Transactions = page.Transactions[:len(page.Transactions)-1]
|
|
}
|
|
|
|
return page, nil
|
|
}
|
|
|
|
// listUnappliedTransitionShim returns TransactionsPage with a pending or
|
|
// completed status, that should be applied to account balance.
|
|
//
|
|
// It is to be used only during the transition from gob-encoded 'amount' and
|
|
// 'received' columns to 'amount_numeric'/'received_numeric'. During the
|
|
// transition, the gob-encoded columns will still exist but under a different
|
|
// name ('amount_gob'/'received_gob'). If the _numeric column value for a given
|
|
// row is non-null, it takes precedence over the corresponding _gob column.
|
|
//
|
|
// When the transition is complete, this method will go away and
|
|
// ListUnapplied() will operate only on the _numeric columns.
|
|
func (db *coinPaymentsTransactions) listUnappliedTransitionShim(ctx context.Context, offset int64, limit int, before time.Time) (_ stripecoinpayments.TransactionsPage, err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
query := db.db.Rebind(`SELECT
|
|
txs.id,
|
|
txs.user_id,
|
|
txs.address,
|
|
txs.amount_gob,
|
|
txs.amount_numeric,
|
|
txs.received_gob,
|
|
txs.received_numeric,
|
|
txs.status,
|
|
txs.key,
|
|
txs.created_at
|
|
FROM coinpayments_transactions as txs
|
|
INNER JOIN stripecoinpayments_apply_balance_intents as ints
|
|
ON txs.id = ints.tx_id
|
|
WHERE txs.status >= ?
|
|
AND txs.created_at <= ?
|
|
AND ints.state = ?
|
|
ORDER by txs.created_at DESC
|
|
LIMIT ? OFFSET ?`)
|
|
|
|
rows, err := db.db.QueryContext(ctx, query, coinpayments.StatusReceived, before, applyBalanceIntentStateUnapplied, limit+1, offset)
|
|
if err != nil {
|
|
return stripecoinpayments.TransactionsPage{}, err
|
|
}
|
|
defer func() { err = errs.Combine(err, rows.Close()) }()
|
|
|
|
var page stripecoinpayments.TransactionsPage
|
|
|
|
for rows.Next() {
|
|
var id, address string
|
|
var userID uuid.UUID
|
|
var amountGob, receivedGob []byte
|
|
var amountNumeric, receivedNumeric *int64
|
|
var status int
|
|
var key string
|
|
var createdAt time.Time
|
|
|
|
err := rows.Scan(&id, &userID, &address, &amountGob, &amountNumeric, &receivedGob, &receivedNumeric, &status, &key, &createdAt)
|
|
if err != nil {
|
|
return stripecoinpayments.TransactionsPage{}, err
|
|
}
|
|
|
|
var amount, received monetary.Amount
|
|
if amountNumeric == nil {
|
|
// 'amount' in this row hasn't yet been updated to a numeric value
|
|
amount, err = monetaryAmountFromGobEncodedBigFloat(amountGob, monetary.StorjToken)
|
|
if err != nil {
|
|
return stripecoinpayments.TransactionsPage{}, Error.Wrap(err)
|
|
}
|
|
} else {
|
|
amount = monetary.AmountFromBaseUnits(*amountNumeric, monetary.StorjToken)
|
|
}
|
|
if receivedNumeric == nil {
|
|
// 'received' in this row hasn't yet been updated to a numeric value
|
|
received, err = monetaryAmountFromGobEncodedBigFloat(receivedGob, monetary.StorjToken)
|
|
if err != nil {
|
|
return stripecoinpayments.TransactionsPage{}, Error.Wrap(err)
|
|
}
|
|
} else {
|
|
received = monetary.AmountFromBaseUnits(*receivedNumeric, monetary.StorjToken)
|
|
}
|
|
|
|
page.Transactions = append(page.Transactions,
|
|
stripecoinpayments.Transaction{
|
|
ID: coinpayments.TransactionID(id),
|
|
AccountID: userID,
|
|
Address: address,
|
|
Amount: amount,
|
|
Received: received,
|
|
Status: coinpayments.Status(status),
|
|
Key: key,
|
|
CreatedAt: createdAt,
|
|
},
|
|
)
|
|
}
|
|
|
|
if err = rows.Err(); err != nil {
|
|
return stripecoinpayments.TransactionsPage{}, err
|
|
}
|
|
|
|
if len(page.Transactions) == limit+1 {
|
|
page.Next = true
|
|
page.NextOffset = offset + int64(limit)
|
|
page.Transactions = page.Transactions[:len(page.Transactions)-1]
|
|
}
|
|
|
|
return page, nil
|
|
}
|
|
|
|
// fromDBXCoinpaymentsTransaction converts *dbx.CoinpaymentsTransaction to *stripecoinpayments.Transaction.
|
|
func fromDBXCoinpaymentsTransaction(dbxCPTX *dbx.CoinpaymentsTransaction) (*stripecoinpayments.Transaction, error) {
|
|
userID, err := uuid.FromBytes(dbxCPTX.UserId)
|
|
if err != nil {
|
|
return nil, errs.Wrap(err)
|
|
}
|
|
|
|
// TODO: the currency here should be passed in to this function or stored
|
|
// in the database.
|
|
currency := monetary.StorjToken
|
|
|
|
amount, err := monetaryAmountFromGobEncodedBigFloat(dbxCPTX.Amount, currency)
|
|
if err != nil {
|
|
return nil, Error.New("amount column: %v", err)
|
|
}
|
|
received, err := monetaryAmountFromGobEncodedBigFloat(dbxCPTX.Received, currency)
|
|
if err != nil {
|
|
return nil, Error.New("received column: %v", err)
|
|
}
|
|
|
|
return &stripecoinpayments.Transaction{
|
|
ID: coinpayments.TransactionID(dbxCPTX.Id),
|
|
AccountID: userID,
|
|
Address: dbxCPTX.Address,
|
|
Amount: amount,
|
|
Received: received,
|
|
Status: coinpayments.Status(dbxCPTX.Status),
|
|
Key: dbxCPTX.Key,
|
|
Timeout: time.Second * time.Duration(dbxCPTX.Timeout),
|
|
CreatedAt: dbxCPTX.CreatedAt,
|
|
}, nil
|
|
}
|
|
|
|
func monetaryAmountFromGobEncodedBigFloat(encoded []byte, currency *monetary.Currency) (_ monetary.Amount, err error) {
|
|
var bf big.Float
|
|
if err := bf.GobDecode(encoded); err != nil {
|
|
return monetary.Amount{}, Error.Wrap(err)
|
|
}
|
|
return monetary.AmountFromBigFloat(&bf, currency)
|
|
}
|
|
|
|
// DebugPerformBigFloatTransition performs the schema changes expected as part
|
|
// of Step 2 of the transition away from gob-encoded big.Float columns in the
|
|
// database.
|
|
//
|
|
// This is for testing purposes only, to ensure that no data is lost and that
|
|
// code still works after the transition.
|
|
func (db *coinPaymentsTransactions) DebugPerformBigFloatTransition(ctx context.Context) error {
|
|
_, err := db.db.DB.ExecContext(ctx, `
|
|
ALTER TABLE coinpayments_transactions ALTER COLUMN amount DROP NOT NULL;
|
|
ALTER TABLE coinpayments_transactions ALTER COLUMN received DROP NOT NULL;
|
|
ALTER TABLE coinpayments_transactions RENAME COLUMN amount TO amount_gob;
|
|
ALTER TABLE coinpayments_transactions RENAME COLUMN received TO received_gob;
|
|
ALTER TABLE coinpayments_transactions ADD COLUMN amount_numeric INT8;
|
|
ALTER TABLE coinpayments_transactions ADD COLUMN received_numeric INT8;
|
|
ALTER TABLE stripecoinpayments_tx_conversion_rates ALTER COLUMN rate DROP NOT NULL;
|
|
ALTER TABLE stripecoinpayments_tx_conversion_rates RENAME COLUMN rate TO rate_gob;
|
|
ALTER TABLE stripecoinpayments_tx_conversion_rates ADD COLUMN rate_numeric NUMERIC(20, 8);
|
|
`)
|
|
return Error.Wrap(err)
|
|
}
|