diff --git a/satellite/payments/stripecoinpayments/tokens.go b/satellite/payments/stripecoinpayments/tokens.go index 270b84f37..55a917d35 100644 --- a/satellite/payments/stripecoinpayments/tokens.go +++ b/satellite/payments/stripecoinpayments/tokens.go @@ -89,7 +89,7 @@ func (tokens *storjTokens) Deposit(ctx context.Context, userID uuid.UUID, amount return nil, Error.Wrap(err) } - cpTX, err := tokens.service.db.Transactions().Insert(ctx, + createTime, err := tokens.service.db.Transactions().Insert(ctx, Transaction{ ID: tx.ID, AccountID: userID, @@ -112,7 +112,7 @@ func (tokens *storjTokens) Deposit(ctx context.Context, userID uuid.UUID, amount Status: payments.TransactionStatusPending, Timeout: tx.Timeout, Link: tx.CheckoutURL, - CreatedAt: cpTX.CreatedAt, + CreatedAt: createTime, }, nil } diff --git a/satellite/payments/stripecoinpayments/transactions.go b/satellite/payments/stripecoinpayments/transactions.go index 0478b521f..6a9cedd35 100644 --- a/satellite/payments/stripecoinpayments/transactions.go +++ b/satellite/payments/stripecoinpayments/transactions.go @@ -24,7 +24,7 @@ var ErrTransactionConsumed = errs.New("error transaction already consumed") // architecture: Database type TransactionsDB interface { // Insert inserts new coinpayments transaction into DB. - Insert(ctx context.Context, tx Transaction) (*Transaction, error) + Insert(ctx context.Context, tx Transaction) (time.Time, error) // Update updates status and received for set of transactions. Update(ctx context.Context, updates []TransactionUpdate, applies coinpayments.TransactionIDList) error // Consume marks transaction as consumed, so it won't participate in apply account balance loop. diff --git a/satellite/payments/stripecoinpayments/transactions_test.go b/satellite/payments/stripecoinpayments/transactions_test.go index f4e58bed8..fdccd0907 100644 --- a/satellite/payments/stripecoinpayments/transactions_test.go +++ b/satellite/payments/stripecoinpayments/transactions_test.go @@ -4,6 +4,7 @@ package stripecoinpayments_test import ( + "context" "encoding/base64" "errors" "sync" @@ -51,9 +52,9 @@ func TestTransactionsDB(t *testing.T) { } t.Run("insert", func(t *testing.T) { - tx, err := transactions.Insert(ctx, createTx) + createdAt, err := transactions.Insert(ctx, createTx) require.NoError(t, err) - requireSaneTimestamp(t, tx.CreatedAt) + requireSaneTimestamp(t, createdAt) txs, err := transactions.ListAccount(ctx, userID) require.NoError(t, err) require.Len(t, txs, 1) @@ -139,7 +140,7 @@ func TestConcurrentConsume(t *testing.T) { userID := testrand.UUID() txID := coinpayments.TransactionID("testID") - tx, err := transactions.Insert(ctx, + _, err = transactions.Insert(ctx, stripecoinpayments.Transaction{ ID: txID, AccountID: userID, @@ -155,12 +156,12 @@ func TestConcurrentConsume(t *testing.T) { err = transactions.Update(ctx, []stripecoinpayments.TransactionUpdate{{ - TransactionID: tx.ID, + TransactionID: txID, Status: coinpayments.StatusCompleted, Received: received, }}, coinpayments.TransactionIDList{ - tx.ID, + txID, }, ) require.NoError(t, err) @@ -178,7 +179,7 @@ func TestConcurrentConsume(t *testing.T) { var group errs2.Group for i := 0; i < concurrentTries; i++ { group.Go(func() error { - err := transactions.Consume(ctx, tx.ID) + err := transactions.Consume(ctx, txID) if err == nil { return nil @@ -422,3 +423,215 @@ func TestTransactions_ApplyTransactionBalance(t *testing.T) { require.EqualValues(t, "0.2", cbt.Metadata["storj_usd_rate"]) }) } + +func TestDatabaseBigFloatTransition(t *testing.T) { + satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) { + transactions := db.StripeCoinPayments().Transactions() + + accountID := testrand.UUID() + amount, err := monetary.AmountFromString("100000000.00000001", monetary.StorjToken) + require.NoError(t, err) + received, err := monetary.AmountFromString("0.00000002", monetary.StorjToken) + require.NoError(t, err) + rate1, err := decimal.NewFromString("1001.23456789") + require.NoError(t, err) + rate2, err := decimal.NewFromString("3.14159265") + require.NoError(t, err) + + tx1 := stripecoinpayments.Transaction{ + ID: "abcdefg", + AccountID: accountID, + Address: "0xbeedeebeedeebeedeebeedeebeedeebeedeebeed", + Amount: amount, + Received: received, + Status: coinpayments.StatusPending, + Key: "beep beep im a sheep", + Timeout: time.Hour * 48, + } + tx2 := stripecoinpayments.Transaction{ + ID: "hijklmn", + AccountID: accountID, + Address: "0x77687927642075206576656e20626f746865723f", + Amount: amount, + Received: received, + Status: coinpayments.StatusPending, + Key: "how now im a cow", + Timeout: 0, + } + + // perform an Insert on old schema + { + createdAt, err := transactions.Insert(ctx, tx1) + require.NoError(t, err) + requireSaneTimestamp(t, createdAt) + tx1.CreatedAt = createdAt + } + + // perform an Update on old schema + { + newReceived, err := monetary.AmountFromString("0.12345678", monetary.StorjToken) + require.NoError(t, err) + upd := stripecoinpayments.TransactionUpdate{ + TransactionID: tx1.ID, + Status: coinpayments.StatusReceived, + Received: newReceived, + } + err = transactions.Update(ctx, []stripecoinpayments.TransactionUpdate{upd}, coinpayments.TransactionIDList{tx1.ID}) + require.NoError(t, err) + tx1.Status = coinpayments.StatusReceived + tx1.Received = newReceived + } + + // perform a ListAccount on old schema + { + accountTxs, err := transactions.ListAccount(ctx, accountID) + require.NoError(t, err) + require.Len(t, accountTxs, 1) + require.Equal(t, tx1, accountTxs[0]) + } + + // perform a ListPending on old schema + { + pendingTxs, err := transactions.ListPending(ctx, 0, 10, time.Now().UTC()) + require.NoError(t, err) + require.Len(t, pendingTxs.Transactions, 1) + // ListPending doesn't get the timeout column, so set it manually in order to check equality of other fields + pendingTxs.Transactions[0].Timeout = tx1.Timeout + require.Equal(t, tx1, pendingTxs.Transactions[0]) + require.False(t, pendingTxs.Next) + require.Equal(t, int64(0), pendingTxs.NextOffset) + } + + // perform a ListUnapplied on old schema + { + unappliedTxs, err := transactions.ListUnapplied(ctx, 0, 10, time.Now().UTC()) + require.NoError(t, err) + require.Len(t, unappliedTxs.Transactions, 1) + // ListUnapplied doesn't get the timeout column, so set it manually in order to check equality of other fields + unappliedTxs.Transactions[0].Timeout = tx1.Timeout + require.Equal(t, tx1, unappliedTxs.Transactions[0]) + require.False(t, unappliedTxs.Next) + require.Equal(t, int64(0), unappliedTxs.NextOffset) + } + + // perform a LockRate on old schema + { + err = transactions.LockRate(ctx, tx1.ID, rate1) + require.NoError(t, err) + } + + // perform a GetLockedRate on old schema + { + gotRate, err := transactions.GetLockedRate(ctx, tx1.ID) + require.NoError(t, err) + require.Equal(t, rate1, gotRate) + } + + // do schema update + { + transitionDB := transactions.(interface { + DebugPerformBigFloatTransition(ctx context.Context) error + }) + err = transitionDB.DebugPerformBigFloatTransition(ctx) + require.NoError(t, err) + } + + // perform an Insert on new schema + { + createdAt, err := transactions.Insert(ctx, tx2) + require.NoError(t, err) + requireSaneTimestamp(t, createdAt) + tx2.CreatedAt = createdAt + } + + // perform a ListAccount on new schema, while tx1 has a received_gob + { + accountTxs, err := transactions.ListAccount(ctx, accountID) + require.NoError(t, err) + require.Len(t, accountTxs, 2) + // results should be ordered in reverse order of creation + require.Equal(t, tx2, accountTxs[0]) + require.Equal(t, tx1, accountTxs[1]) + } + + // perform an Update on new schema, on a row with a received_gob + { + newReceived, err := monetary.AmountFromString("1.11111111", monetary.StorjToken) + require.NoError(t, err) + upd := stripecoinpayments.TransactionUpdate{ + TransactionID: tx1.ID, + Status: coinpayments.StatusPending, + Received: newReceived, + } + err = transactions.Update(ctx, []stripecoinpayments.TransactionUpdate{upd}, nil) + require.NoError(t, err) + tx1.Status = coinpayments.StatusPending + tx1.Received = newReceived + } + + // perform an Update on new schema, on a row with a received_numeric + { + newReceived, err := monetary.AmountFromString("2.12121212", monetary.StorjToken) + require.NoError(t, err) + upd := stripecoinpayments.TransactionUpdate{ + TransactionID: tx2.ID, + Status: coinpayments.StatusCompleted, + Received: newReceived, + } + err = transactions.Update(ctx, []stripecoinpayments.TransactionUpdate{upd}, coinpayments.TransactionIDList{tx2.ID}) + require.NoError(t, err) + tx2.Status = coinpayments.StatusCompleted + tx2.Received = newReceived + } + + // perform a ListAccount on new schema, after the above changes + { + accountTxs, err := transactions.ListAccount(ctx, accountID) + require.NoError(t, err) + require.Len(t, accountTxs, 2) + // results should be ordered in reverse order of creation + require.Equal(t, tx2, accountTxs[0]) + require.Equal(t, tx1, accountTxs[1]) + } + + // perform a ListPending on new schema + { + pendingTxs, err := transactions.ListPending(ctx, 0, 10, time.Now().UTC()) + require.NoError(t, err) + require.Len(t, pendingTxs.Transactions, 1) + // ListPending doesn't get the timeout column, so set it manually in order to check equality of other fields + pendingTxs.Transactions[0].Timeout = tx1.Timeout + require.Equal(t, tx1, pendingTxs.Transactions[0]) + require.False(t, pendingTxs.Next) + require.Equal(t, int64(0), pendingTxs.NextOffset) + } + + // perform a ListUnapplied on new schema + { + unappliedTxs, err := transactions.ListUnapplied(ctx, 0, 10, time.Now().UTC()) + require.NoError(t, err) + require.Len(t, unappliedTxs.Transactions, 1) + // ListUnapplied doesn't get the timeout column, so set it manually in order to check equality of other fields + unappliedTxs.Transactions[0].Timeout = tx2.Timeout + require.Equal(t, tx2, unappliedTxs.Transactions[0]) + require.False(t, unappliedTxs.Next) + require.Equal(t, int64(0), unappliedTxs.NextOffset) + } + + // perform a LockRate on new schema + { + err = transactions.LockRate(ctx, tx2.ID, rate2) + require.NoError(t, err) + } + + // perform a GetLockedRate on new schema + { + gotRate, err := transactions.GetLockedRate(ctx, tx1.ID) + require.NoError(t, err) + require.Equal(t, rate1, gotRate) + gotRate, err = transactions.GetLockedRate(ctx, tx2.ID) + require.NoError(t, err) + require.Equal(t, rate2, gotRate) + } + }) +} diff --git a/satellite/satellitedb/coinpaymentstxs.go b/satellite/satellitedb/coinpaymentstxs.go index 4a327bd68..69e04b94d 100644 --- a/satellite/satellitedb/coinpaymentstxs.go +++ b/satellite/satellitedb/coinpaymentstxs.go @@ -8,10 +8,12 @@ import ( "math/big" "time" + pgxerrcode "github.com/jackc/pgerrcode" "github.com/shopspring/decimal" "github.com/zeebo/errs" "storj.io/common/uuid" + "storj.io/private/dbutil/pgutil/pgerrcode" "storj.io/storj/satellite/payments/coinpayments" "storj.io/storj/satellite/payments/monetary" "storj.io/storj/satellite/payments/stripecoinpayments" @@ -44,16 +46,16 @@ type coinPaymentsTransactions struct { } // Insert inserts new coinpayments transaction into DB. -func (db *coinPaymentsTransactions) Insert(ctx context.Context, tx stripecoinpayments.Transaction) (_ *stripecoinpayments.Transaction, err error) { +func (db *coinPaymentsTransactions) Insert(ctx context.Context, tx stripecoinpayments.Transaction) (createTime time.Time, err error) { defer mon.Task()(&ctx)(&err) amount, err := tx.Amount.AsBigFloat().GobEncode() if err != nil { - return nil, errs.Wrap(err) + return time.Time{}, errs.Wrap(err) } received, err := tx.Received.AsBigFloat().GobEncode() if err != nil { - return nil, errs.Wrap(err) + return time.Time{}, errs.Wrap(err) } dbxCPTX, err := db.db.Create_CoinpaymentsTransaction(ctx, @@ -67,10 +69,34 @@ func (db *coinPaymentsTransactions) Insert(ctx context.Context, tx stripecoinpay dbx.CoinpaymentsTransaction_Timeout(int(tx.Timeout.Seconds())), ) if err != nil { - return nil, err + if errCode := pgerrcode.FromError(err); errCode == pgxerrcode.UndefinedColumn { + // TEMPORARY: fall back to expected new schema to facilitate transition + return db.insertTransitionShim(ctx, tx) + } + return time.Time{}, err } + return dbxCPTX.CreatedAt, nil +} - return fromDBXCoinpaymentsTransaction(dbxCPTX) +// insertTransitionShim inserts new coinpayments transaction into DB. +// +// It is to be used only during the transition from gob-encoded 'amount' and +// 'received' columns to 'amount_numeric'/'received_numeric'. +// +// When the transition is complete, this method will go away and Insert() +// will operate only on the _numeric columns. +func (db *coinPaymentsTransactions) insertTransitionShim(ctx context.Context, tx stripecoinpayments.Transaction) (createTime time.Time, err error) { + row := db.db.DB.QueryRowContext(ctx, db.db.Rebind(` + INSERT INTO coinpayments_transactions ( + id, user_id, address, amount_numeric, received_numeric, status, key, timeout, created_at + ) VALUES ( + ?, ?, ?, ?, ?, ?, ?, ?, now() + ) RETURNING created_at; + `), tx.ID.String(), tx.AccountID[:], tx.Address, tx.Amount.BaseUnits(), tx.Received.BaseUnits(), tx.Status.Int(), tx.Key, int(tx.Timeout.Seconds())) + if err := row.Scan(&createTime); err != nil { + return time.Time{}, Error.Wrap(err) + } + return createTime, nil } // Update updates status and received for set of transactions. @@ -81,9 +107,9 @@ func (db *coinPaymentsTransactions) Update(ctx context.Context, updates []stripe return nil } - return db.db.WithTx(ctx, func(ctx context.Context, tx *dbx.Tx) error { + err = db.db.WithTx(ctx, func(ctx context.Context, tx *dbx.Tx) error { for _, update := range updates { - received, err := update.Received.AsBigFloat().GobEncode() + receivedGob, err := update.Received.AsBigFloat().GobEncode() if err != nil { return errs.Wrap(err) } @@ -91,7 +117,7 @@ func (db *coinPaymentsTransactions) Update(ctx context.Context, updates []stripe _, err = tx.Update_CoinpaymentsTransaction_By_Id(ctx, dbx.CoinpaymentsTransaction_Id(update.TransactionID.String()), dbx.CoinpaymentsTransaction_Update_Fields{ - Received: dbx.CoinpaymentsTransaction_Received(received), + Received: dbx.CoinpaymentsTransaction_Received(receivedGob), Status: dbx.CoinpaymentsTransaction_Status(update.Status.Int()), }, ) @@ -111,19 +137,73 @@ func (db *coinPaymentsTransactions) Update(ctx context.Context, updates []stripe return nil }) + + if err != nil { + if errCode := pgerrcode.FromError(err); errCode == pgxerrcode.UndefinedColumn { + // TEMPORARY: fall back to expected new schema to facilitate transition + return db.updateTransitionShim(ctx, updates, applies) + } + } + return err +} + +// updateTransitionShim updates status and received for set of transactions. +// +// It is to be used only during the transition from gob-encoded 'amount' and +// 'received' columns to 'amount_numeric'/'received_numeric'. During the +// transition, the gob-encoded columns will still exist but under a different +// name ('amount_gob'/'received_gob'). If the _numeric column value for a given +// row is non-null, it takes precedence over the corresponding _gob column. +// +// When the transition is complete, this method will go away and +// Update() will operate only on the _numeric columns. +func (db *coinPaymentsTransactions) updateTransitionShim(ctx context.Context, updates []stripecoinpayments.TransactionUpdate, applies coinpayments.TransactionIDList) (err error) { + defer mon.Task()(&ctx)(&err) + + if len(updates) == 0 { + return nil + } + + return db.db.WithTx(ctx, func(ctx context.Context, tx *dbx.Tx) error { + for _, update := range updates { + query := db.db.Rebind(` + UPDATE coinpayments_transactions + SET + received_gob = NULL, + received_numeric = ?, + status = ? + WHERE id = ? + `) + _, err := tx.Tx.ExecContext(ctx, query, update.Received.BaseUnits(), update.Status.Int(), update.TransactionID.String()) + if err != nil { + return err + } + } + + for _, txID := range applies { + query := db.db.Rebind(`INSERT INTO stripecoinpayments_apply_balance_intents ( tx_id, state, created_at ) + VALUES ( ?, ?, ? ) ON CONFLICT DO NOTHING`) + _, err = tx.Tx.ExecContext(ctx, query, txID.String(), applyBalanceIntentStateUnapplied.Int(), db.db.Hooks.Now().UTC()) + if err != nil { + return err + } + } + + return nil + }) } // Consume marks transaction as consumed, so it won't participate in apply account balance loop. func (db *coinPaymentsTransactions) Consume(ctx context.Context, id coinpayments.TransactionID) (err error) { defer mon.Task()(&ctx)(&err) - query := db.db.Rebind(` + query := db.db.Rebind(` WITH intent AS ( - SELECT tx_id, state FROM stripecoinpayments_apply_balance_intents WHERE tx_id = ? + SELECT tx_id, state FROM stripecoinpayments_apply_balance_intents WHERE tx_id = ? ), updated AS ( UPDATE stripecoinpayments_apply_balance_intents AS ints - SET - state = ? + SET + state = ? FROM intent WHERE intent.tx_id = ints.tx_id AND ints.state = ? RETURNING 1 @@ -154,14 +234,39 @@ func (db *coinPaymentsTransactions) LockRate(ctx context.Context, id coinpayment buff, err := rate.BigFloat().GobEncode() if err != nil { - return errs.Wrap(err) + return Error.Wrap(err) } _, err = db.db.Create_StripecoinpaymentsTxConversionRate(ctx, dbx.StripecoinpaymentsTxConversionRate_TxId(id.String()), dbx.StripecoinpaymentsTxConversionRate_Rate(buff)) - return err + if err != nil { + if errCode := pgerrcode.FromError(err); errCode == pgxerrcode.UndefinedColumn { + // TEMPORARY: fall back to expected new schema to facilitate transition + return db.lockRateTransitionShim(ctx, id, rate) + } + } + return Error.Wrap(err) +} + +// lockRateTransitionShim locks conversion rate for transaction. +// +// It is to be used only during the transition from the gob-encoded 'rate' +// column to 'rate_numeric'. +// +// When the transition is complete, this method will go away and +// LockRate() will operate only on the _numeric column. +func (db *coinPaymentsTransactions) lockRateTransitionShim(ctx context.Context, id coinpayments.TransactionID, rate decimal.Decimal) (err error) { + defer mon.Task()(&ctx)(&err) + + now := time.Now().UTC() + query := db.db.Rebind(` + INSERT INTO stripecoinpayments_tx_conversion_rates ( tx_id, rate_numeric, created_at ) VALUES ( ?, ?, ? ) + `) + + _, err = db.db.DB.ExecContext(ctx, query, id.String(), rate.String(), now) + return Error.Wrap(err) } // GetLockedRate returns locked conversion rate for transaction or error if non exists. @@ -172,6 +277,10 @@ func (db *coinPaymentsTransactions) GetLockedRate(ctx context.Context, id coinpa dbx.StripecoinpaymentsTxConversionRate_TxId(id.String()), ) if err != nil { + if errCode := pgerrcode.FromError(err); errCode == pgxerrcode.UndefinedColumn { + // TEMPORARY: fall back to expected new schema to facilitate transition + return db.getLockedRateTransitionShim(ctx, id) + } return decimal.Decimal{}, err } @@ -187,6 +296,45 @@ func (db *coinPaymentsTransactions) GetLockedRate(ctx context.Context, id coinpa return rate, nil } +// getLockedRateTransitionShim returns locked conversion rate for transaction +// or error if none exists. +// +// It is to be used only during the transition from the gob-encoded 'rate' +// column to 'rate_numeric'. During the transition, the gob-encoded column will +// still exist but under a different name ('rate_gob'). If rate_numeric for a +// given row is non-null, it takes precedence over rate_gob. +// +// When the transition is complete, this method will go away and +// GetLockedRate() will operate only on rate_numeric. +func (db *coinPaymentsTransactions) getLockedRateTransitionShim(ctx context.Context, id coinpayments.TransactionID) (_ decimal.Decimal, err error) { + defer mon.Task()(&ctx)(&err) + + var rateGob []byte + var rateNumeric *string + query := db.db.Rebind(` + SELECT rate_gob, rate_numeric + FROM stripecoinpayments_tx_conversion_rates + WHERE tx_id = ? + `) + row := db.db.DB.QueryRowContext(ctx, query, id.String()) + err = row.Scan(&rateGob, &rateNumeric) + if err != nil { + return decimal.Decimal{}, Error.Wrap(err) + } + + if rateNumeric == nil { + // This row does not have a numeric rate value yet + var rateF big.Float + if err = rateF.GobDecode(rateGob); err != nil { + return decimal.Decimal{}, Error.Wrap(err) + } + rate, err := monetary.DecimalFromBigFloat(&rateF) + return rate, Error.Wrap(err) + } + rate, err := decimal.NewFromString(*rateNumeric) + return rate, Error.Wrap(err) +} + // ListAccount returns all transaction for specific user. func (db *coinPaymentsTransactions) ListAccount(ctx context.Context, userID uuid.UUID) (_ []stripecoinpayments.Transaction, err error) { defer mon.Task()(&ctx)(&err) @@ -195,6 +343,10 @@ func (db *coinPaymentsTransactions) ListAccount(ctx context.Context, userID uuid dbx.CoinpaymentsTransaction_UserId(userID[:]), ) if err != nil { + if errCode := pgerrcode.FromError(err); errCode == pgxerrcode.UndefinedColumn { + // TEMPORARY: fall back to expected new schema to facilitate transition + return db.listAccountTransitionShim(ctx, userID) + } return nil, err } @@ -211,11 +363,86 @@ func (db *coinPaymentsTransactions) ListAccount(ctx context.Context, userID uuid return txs, nil } +// listAccountTransitionShim returns all transaction for specific user. +// +// It is to be used only during the transition from gob-encoded 'amount' and +// 'received' columns to 'amount_numeric'/'received_numeric'. During the +// transition, the gob-encoded columns will still exist but under a different +// name ('amount_gob'/'received_gob'). If the _numeric column value for a given +// row is non-null, it takes precedence over the corresponding _gob column. +// +// When the transition is complete, this method will go away and ListAccount() +// will operate only on the _numeric columns. +func (db *coinPaymentsTransactions) listAccountTransitionShim(ctx context.Context, userID uuid.UUID) (_ []stripecoinpayments.Transaction, err error) { + defer mon.Task()(&ctx)(&err) + + query := db.db.Rebind(` + SELECT + id, + user_id, + address, + amount_gob, + amount_numeric, + received_gob, + received_numeric, + status, + key, + timeout, + created_at + FROM coinpayments_transactions + WHERE user_id = ? + ORDER BY created_at DESC + `) + rows, err := db.db.DB.QueryContext(ctx, query, userID[:]) + if err != nil { + return nil, err + } + + defer func() { err = errs.Combine(err, rows.Close()) }() + + var txs []stripecoinpayments.Transaction + for rows.Next() { + var tx stripecoinpayments.Transaction + var amountGob, receivedGob []byte + var amountNumeric, receivedNumeric *int64 + var timeoutSeconds int + err := rows.Scan(&tx.ID, &tx.AccountID, &tx.Address, &amountGob, &amountNumeric, &receivedGob, &receivedNumeric, &tx.Status, &tx.Key, &timeoutSeconds, &tx.CreatedAt) + if err != nil { + return nil, Error.Wrap(err) + } + tx.Timeout = time.Second * time.Duration(timeoutSeconds) + + if amountNumeric == nil { + tx.Amount, err = monetaryAmountFromGobEncodedBigFloat(amountGob, monetary.StorjToken) + if err != nil { + return nil, Error.New("amount column: %v", err) + } + } else { + tx.Amount = monetary.AmountFromBaseUnits(*amountNumeric, monetary.StorjToken) + } + if receivedNumeric == nil { + tx.Received, err = monetaryAmountFromGobEncodedBigFloat(receivedGob, monetary.StorjToken) + if err != nil { + return nil, Error.New("received column: %v", err) + } + } else { + tx.Received = monetary.AmountFromBaseUnits(*receivedNumeric, monetary.StorjToken) + } + txs = append(txs, tx) + } + + if err = rows.Err(); err != nil { + return nil, err + } + + return txs, nil +} + // ListPending returns paginated list of pending transactions. func (db *coinPaymentsTransactions) ListPending(ctx context.Context, offset int64, limit int, before time.Time) (_ stripecoinpayments.TransactionsPage, err error) { defer mon.Task()(&ctx)(&err) - query := db.db.Rebind(`SELECT + query := db.db.Rebind(`SELECT id, user_id, address, @@ -224,7 +451,7 @@ func (db *coinPaymentsTransactions) ListPending(ctx context.Context, offset int6 status, key, created_at - FROM coinpayments_transactions + FROM coinpayments_transactions WHERE status IN (?,?) AND created_at <= ? ORDER by created_at DESC @@ -232,6 +459,10 @@ func (db *coinPaymentsTransactions) ListPending(ctx context.Context, offset int6 rows, err := db.db.QueryContext(ctx, query, coinpayments.StatusPending, coinpayments.StatusReceived, before, limit+1, offset) if err != nil { + if errCode := pgerrcode.FromError(err); errCode == pgxerrcode.UndefinedColumn { + // TEMPORARY: fall back to expected new schema to facilitate transition + return db.listPendingTransitionShim(ctx, offset, limit, before) + } return stripecoinpayments.TransactionsPage{}, err } @@ -294,11 +525,112 @@ func (db *coinPaymentsTransactions) ListPending(ctx context.Context, offset int6 return page, nil } +// listPendingTransitionShim returns paginated list of pending transactions. +// +// It is to be used only during the transition from gob-encoded 'amount' and +// 'received' columns to 'amount_numeric'/'received_numeric'. During the +// transition, the gob-encoded columns will still exist but under a different +// name ('amount_gob'/'received_gob'). If the _numeric column value for a given +// row is non-null, it takes precedence over the corresponding _gob column. +// +// When the transition is complete, this method will go away and ListPending() +// will operate only on the _numeric columns. +func (db *coinPaymentsTransactions) listPendingTransitionShim(ctx context.Context, offset int64, limit int, before time.Time) (_ stripecoinpayments.TransactionsPage, err error) { + defer mon.Task()(&ctx)(&err) + + query := db.db.Rebind(`SELECT + id, + user_id, + address, + amount_gob, + amount_numeric, + received_gob, + received_numeric, + status, + key, + created_at + FROM coinpayments_transactions + WHERE status IN (?,?) + AND created_at <= ? + LIMIT ? OFFSET ?`) + + rows, err := db.db.QueryContext(ctx, query, coinpayments.StatusPending, coinpayments.StatusReceived, before, limit+1, offset) + if err != nil { + return stripecoinpayments.TransactionsPage{}, Error.Wrap(err) + } + + defer func() { + err = errs.Combine(err, rows.Close()) + }() + + var page stripecoinpayments.TransactionsPage + + for rows.Next() { + var id, address string + var userID uuid.UUID + var amountGob, receivedGob []byte + var amountNumeric, receivedNumeric *int64 + var amount, received monetary.Amount + var status int + var key string + var createdAt time.Time + + err := rows.Scan(&id, &userID, &address, &amountGob, &amountNumeric, &receivedGob, &receivedNumeric, &status, &key, &createdAt) + if err != nil { + return stripecoinpayments.TransactionsPage{}, err + } + + if amountNumeric == nil { + // 'amount' in this row hasn't yet been updated to a numeric value + amount, err = monetaryAmountFromGobEncodedBigFloat(amountGob, monetary.StorjToken) + if err != nil { + return stripecoinpayments.TransactionsPage{}, Error.Wrap(err) + } + } else { + amount = monetary.AmountFromBaseUnits(*amountNumeric, monetary.StorjToken) + } + if receivedNumeric == nil { + // 'received' in this row hasn't yet been updated to a numeric value + received, err = monetaryAmountFromGobEncodedBigFloat(receivedGob, monetary.StorjToken) + if err != nil { + return stripecoinpayments.TransactionsPage{}, Error.Wrap(err) + } + } else { + received = monetary.AmountFromBaseUnits(*receivedNumeric, monetary.StorjToken) + } + + page.Transactions = append(page.Transactions, + stripecoinpayments.Transaction{ + ID: coinpayments.TransactionID(id), + AccountID: userID, + Address: address, + Amount: amount, + Received: received, + Status: coinpayments.Status(status), + Key: key, + CreatedAt: createdAt, + }, + ) + } + + if err = rows.Err(); err != nil { + return stripecoinpayments.TransactionsPage{}, err + } + + if len(page.Transactions) == limit+1 { + page.Next = true + page.NextOffset = offset + int64(limit) + page.Transactions = page.Transactions[:len(page.Transactions)-1] + } + + return page, nil +} + // ListUnapplied returns TransactionsPage with a pending or completed status, that should be applied to account balance. func (db *coinPaymentsTransactions) ListUnapplied(ctx context.Context, offset int64, limit int, before time.Time) (_ stripecoinpayments.TransactionsPage, err error) { defer mon.Task()(&ctx)(&err) - query := db.db.Rebind(`SELECT + query := db.db.Rebind(`SELECT txs.id, txs.user_id, txs.address, @@ -307,7 +639,7 @@ func (db *coinPaymentsTransactions) ListUnapplied(ctx context.Context, offset in txs.status, txs.key, txs.created_at - FROM coinpayments_transactions as txs + FROM coinpayments_transactions as txs INNER JOIN stripecoinpayments_apply_balance_intents as ints ON txs.id = ints.tx_id WHERE txs.status >= ? @@ -318,6 +650,9 @@ func (db *coinPaymentsTransactions) ListUnapplied(ctx context.Context, offset in rows, err := db.db.QueryContext(ctx, query, coinpayments.StatusReceived, before, applyBalanceIntentStateUnapplied, limit+1, offset) if err != nil { + if errCode := pgerrcode.FromError(err); errCode == pgxerrcode.UndefinedColumn { + return db.listUnappliedTransitionShim(ctx, offset, limit, before) + } return stripecoinpayments.TransactionsPage{}, err } defer func() { err = errs.Combine(err, rows.Close()) }() @@ -343,11 +678,114 @@ func (db *coinPaymentsTransactions) ListUnapplied(ctx context.Context, offset in amount, err := monetaryAmountFromGobEncodedBigFloat(amountB, currency) if err != nil { - return stripecoinpayments.TransactionsPage{}, errs.Wrap(err) + return stripecoinpayments.TransactionsPage{}, Error.New("amount column: %v", err) } received, err := monetaryAmountFromGobEncodedBigFloat(receivedB, currency) if err != nil { - return stripecoinpayments.TransactionsPage{}, errs.Wrap(err) + return stripecoinpayments.TransactionsPage{}, Error.New("received column: %v", err) + } + + page.Transactions = append(page.Transactions, + stripecoinpayments.Transaction{ + ID: coinpayments.TransactionID(id), + AccountID: userID, + Address: address, + Amount: amount, + Received: received, + Status: coinpayments.Status(status), + Key: key, + CreatedAt: createdAt, + }, + ) + } + + if err = rows.Err(); err != nil { + return stripecoinpayments.TransactionsPage{}, err + } + + if len(page.Transactions) == limit+1 { + page.Next = true + page.NextOffset = offset + int64(limit) + page.Transactions = page.Transactions[:len(page.Transactions)-1] + } + + return page, nil +} + +// listUnappliedTransitionShim returns TransactionsPage with a pending or +// completed status, that should be applied to account balance. +// +// It is to be used only during the transition from gob-encoded 'amount' and +// 'received' columns to 'amount_numeric'/'received_numeric'. During the +// transition, the gob-encoded columns will still exist but under a different +// name ('amount_gob'/'received_gob'). If the _numeric column value for a given +// row is non-null, it takes precedence over the corresponding _gob column. +// +// When the transition is complete, this method will go away and +// ListUnapplied() will operate only on the _numeric columns. +func (db *coinPaymentsTransactions) listUnappliedTransitionShim(ctx context.Context, offset int64, limit int, before time.Time) (_ stripecoinpayments.TransactionsPage, err error) { + defer mon.Task()(&ctx)(&err) + + query := db.db.Rebind(`SELECT + txs.id, + txs.user_id, + txs.address, + txs.amount_gob, + txs.amount_numeric, + txs.received_gob, + txs.received_numeric, + txs.status, + txs.key, + txs.created_at + FROM coinpayments_transactions as txs + INNER JOIN stripecoinpayments_apply_balance_intents as ints + ON txs.id = ints.tx_id + WHERE txs.status >= ? + AND txs.created_at <= ? + AND ints.state = ? + ORDER by txs.created_at DESC + LIMIT ? OFFSET ?`) + + rows, err := db.db.QueryContext(ctx, query, coinpayments.StatusReceived, before, applyBalanceIntentStateUnapplied, limit+1, offset) + if err != nil { + return stripecoinpayments.TransactionsPage{}, err + } + defer func() { err = errs.Combine(err, rows.Close()) }() + + var page stripecoinpayments.TransactionsPage + + for rows.Next() { + var id, address string + var userID uuid.UUID + var amountGob, receivedGob []byte + var amountNumeric, receivedNumeric *int64 + var status int + var key string + var createdAt time.Time + + err := rows.Scan(&id, &userID, &address, &amountGob, &amountNumeric, &receivedGob, &receivedNumeric, &status, &key, &createdAt) + if err != nil { + return stripecoinpayments.TransactionsPage{}, err + } + + var amount, received monetary.Amount + if amountNumeric == nil { + // 'amount' in this row hasn't yet been updated to a numeric value + amount, err = monetaryAmountFromGobEncodedBigFloat(amountGob, monetary.StorjToken) + if err != nil { + return stripecoinpayments.TransactionsPage{}, Error.Wrap(err) + } + } else { + amount = monetary.AmountFromBaseUnits(*amountNumeric, monetary.StorjToken) + } + if receivedNumeric == nil { + // 'received' in this row hasn't yet been updated to a numeric value + received, err = monetaryAmountFromGobEncodedBigFloat(receivedGob, monetary.StorjToken) + if err != nil { + return stripecoinpayments.TransactionsPage{}, Error.Wrap(err) + } + } else { + received = monetary.AmountFromBaseUnits(*receivedNumeric, monetary.StorjToken) } page.Transactions = append(page.Transactions, @@ -390,11 +828,11 @@ func fromDBXCoinpaymentsTransaction(dbxCPTX *dbx.CoinpaymentsTransaction) (*stri amount, err := monetaryAmountFromGobEncodedBigFloat(dbxCPTX.Amount, currency) if err != nil { - return nil, errs.Wrap(err) + return nil, Error.New("amount column: %v", err) } received, err := monetaryAmountFromGobEncodedBigFloat(dbxCPTX.Received, currency) if err != nil { - return nil, errs.Wrap(err) + return nil, Error.New("received column: %v", err) } return &stripecoinpayments.Transaction{ @@ -417,3 +855,24 @@ func monetaryAmountFromGobEncodedBigFloat(encoded []byte, currency *monetary.Cur } return monetary.AmountFromBigFloat(&bf, currency) } + +// DebugPerformBigFloatTransition performs the schema changes expected as part +// of Step 2 of the transition away from gob-encoded big.Float columns in the +// database. +// +// This is for testing purposes only, to ensure that no data is lost and that +// code still works after the transition. +func (db *coinPaymentsTransactions) DebugPerformBigFloatTransition(ctx context.Context) error { + _, err := db.db.DB.ExecContext(ctx, ` + ALTER TABLE coinpayments_transactions ALTER COLUMN amount DROP NOT NULL; + ALTER TABLE coinpayments_transactions ALTER COLUMN received DROP NOT NULL; + ALTER TABLE coinpayments_transactions RENAME COLUMN amount TO amount_gob; + ALTER TABLE coinpayments_transactions RENAME COLUMN received TO received_gob; + ALTER TABLE coinpayments_transactions ADD COLUMN amount_numeric INT8; + ALTER TABLE coinpayments_transactions ADD COLUMN received_numeric INT8; + ALTER TABLE stripecoinpayments_tx_conversion_rates ALTER COLUMN rate DROP NOT NULL; + ALTER TABLE stripecoinpayments_tx_conversion_rates RENAME COLUMN rate TO rate_gob; + ALTER TABLE stripecoinpayments_tx_conversion_rates ADD COLUMN rate_numeric NUMERIC(20, 8); + `) + return Error.Wrap(err) +} diff --git a/satellite/satellitedb/satellitedbtest/run.go b/satellite/satellitedb/satellitedbtest/run.go index 7215800d4..5ddabcb02 100644 --- a/satellite/satellitedb/satellitedbtest/run.go +++ b/satellite/satellitedb/satellitedbtest/run.go @@ -21,6 +21,7 @@ import ( "storj.io/private/dbutil/pgtest" "storj.io/private/dbutil/pgutil" "storj.io/private/dbutil/tempdb" + "storj.io/private/tagsql" "storj.io/storj/satellite" "storj.io/storj/satellite/metabase" "storj.io/storj/satellite/satellitedb" @@ -105,6 +106,12 @@ func (db *tempMasterDB) Close() error { return errs.Combine(db.DB.Close(), db.tempDB.Close()) } +// DebugGetDBHandle exposes a handle to the raw database object. This is intended +// only for testing purposes and is temporary. +func (db *tempMasterDB) DebugGetDBHandle() tagsql.DB { + return db.tempDB.DB +} + // CreateMasterDB creates a new satellite database for testing. func CreateMasterDB(ctx context.Context, log *zap.Logger, name string, category string, index int, dbInfo Database) (db satellite.DB, err error) { if dbInfo.URL == "" {