From c053bdbd703f2ee10350358ad350cb20db0a53fc Mon Sep 17 00:00:00 2001 From: paul cannon Date: Tue, 10 Aug 2021 17:30:23 -0500 Subject: [PATCH] satellite/satellitedb: prepare to remove big.Float from db Why: big.Float is not an ideal type for dealing with monetary amounts, because no matter how high the precision, some non-integer decimal values can not be represented exactly in base-2 floating point. Also, storing gob-encoded big.Float values in the database makes it very hard to use those values in meaningful queries, making it difficult to do any sort of analysis on billing. Now that we have amounts represented using monetary.Amount, we can simply store them in the database using integers (as given by the .BaseUnits() method on monetary.Amount). We should move toward storing the currency along with any monetary amount, wherever we are storing amounts, because satellites might want to deal with currencies other than STORJ and USD. Even better, it becomes much clearer what currency each monetary value is _supposed_ to be in (I had to dig through code to find that out for our current monetary columns). Deployment ---------- Getting rid of the big.Float columns will take multiple deployment steps. There does not seem to be any way to make the change in a way that lets existing queries continue to work on CockroachDB (it could be done with rules and triggers and a stored procedure that knows how to gob-decode big.Float objects, but CockroachDB doesn't have rules _or_ triggers _or_ stored procedures). Instead, in this first step, we make no changes to the database schema, but add code that knows how to deal with the planned changes to the schema when they are made in a future "step 2" deployment. All functions that deal with the coinbase_transactions table have been taught to recognize the "undefined column" error, and when it is seen, to call a separate "transition shim" function to accomplish the task. Once all the services are running this code, and the step 2 deployment makes breaking changes to the schema, any services that are still running and connected to the database will keep working correctly because of the fallback code included here. The step 2 deployment can be made without these transition shims included, because it will apply the database schema changes before any of its code runs. Step 1: No schema changes; just include code that recognizes the "undefined column" error when dealing with the coinbase_transactions or stripecoinpayments_tx_conversion_rates tables, and if found, assumes that the column changes from Step 2 have already been made. Step 2: In coinbase_transactions: * change the names of the 'amount' and 'received' columns to 'amount_gob' and 'received_gob' respectively * add new 'amount_numeric' and 'received_numeric' columns with INT8 type. In stripecoinpayments_tx_conversion_rates: * change the name of the 'rate' column to 'rate_gob' * add new 'rate_numeric' column with NUMERIC(8, 8) type Code reading from either of these tables must query both the X_gob and X_numeric columns. If X_numeric is not null, its value should be used; otherwise, the gob-encoded big.Float in X_gob should be used. A chore might be included in this step that transitions values from X_gob to X_numeric a few rows at a time. Step 3: Once all prod satellites have no values left in the _gob columns, we can drop those columns and add NOT NULL constraints to the _numeric columns. Change-Id: Id6db304b404e6fde44f5a8c23cdaeeaaa2324f20 --- .../payments/stripecoinpayments/tokens.go | 4 +- .../stripecoinpayments/transactions.go | 2 +- .../stripecoinpayments/transactions_test.go | 225 +++++++- satellite/satellitedb/coinpaymentstxs.go | 503 +++++++++++++++++- satellite/satellitedb/satellitedbtest/run.go | 7 + 5 files changed, 710 insertions(+), 31 deletions(-) diff --git a/satellite/payments/stripecoinpayments/tokens.go b/satellite/payments/stripecoinpayments/tokens.go index 270b84f37..55a917d35 100644 --- a/satellite/payments/stripecoinpayments/tokens.go +++ b/satellite/payments/stripecoinpayments/tokens.go @@ -89,7 +89,7 @@ func (tokens *storjTokens) Deposit(ctx context.Context, userID uuid.UUID, amount return nil, Error.Wrap(err) } - cpTX, err := tokens.service.db.Transactions().Insert(ctx, + createTime, err := tokens.service.db.Transactions().Insert(ctx, Transaction{ ID: tx.ID, AccountID: userID, @@ -112,7 +112,7 @@ func (tokens *storjTokens) Deposit(ctx context.Context, userID uuid.UUID, amount Status: payments.TransactionStatusPending, Timeout: tx.Timeout, Link: tx.CheckoutURL, - CreatedAt: cpTX.CreatedAt, + CreatedAt: createTime, }, nil } diff --git a/satellite/payments/stripecoinpayments/transactions.go b/satellite/payments/stripecoinpayments/transactions.go index 0478b521f..6a9cedd35 100644 --- a/satellite/payments/stripecoinpayments/transactions.go +++ b/satellite/payments/stripecoinpayments/transactions.go @@ -24,7 +24,7 @@ var ErrTransactionConsumed = errs.New("error transaction already consumed") // architecture: Database type TransactionsDB interface { // Insert inserts new coinpayments transaction into DB. - Insert(ctx context.Context, tx Transaction) (*Transaction, error) + Insert(ctx context.Context, tx Transaction) (time.Time, error) // Update updates status and received for set of transactions. Update(ctx context.Context, updates []TransactionUpdate, applies coinpayments.TransactionIDList) error // Consume marks transaction as consumed, so it won't participate in apply account balance loop. diff --git a/satellite/payments/stripecoinpayments/transactions_test.go b/satellite/payments/stripecoinpayments/transactions_test.go index f4e58bed8..fdccd0907 100644 --- a/satellite/payments/stripecoinpayments/transactions_test.go +++ b/satellite/payments/stripecoinpayments/transactions_test.go @@ -4,6 +4,7 @@ package stripecoinpayments_test import ( + "context" "encoding/base64" "errors" "sync" @@ -51,9 +52,9 @@ func TestTransactionsDB(t *testing.T) { } t.Run("insert", func(t *testing.T) { - tx, err := transactions.Insert(ctx, createTx) + createdAt, err := transactions.Insert(ctx, createTx) require.NoError(t, err) - requireSaneTimestamp(t, tx.CreatedAt) + requireSaneTimestamp(t, createdAt) txs, err := transactions.ListAccount(ctx, userID) require.NoError(t, err) require.Len(t, txs, 1) @@ -139,7 +140,7 @@ func TestConcurrentConsume(t *testing.T) { userID := testrand.UUID() txID := coinpayments.TransactionID("testID") - tx, err := transactions.Insert(ctx, + _, err = transactions.Insert(ctx, stripecoinpayments.Transaction{ ID: txID, AccountID: userID, @@ -155,12 +156,12 @@ func TestConcurrentConsume(t *testing.T) { err = transactions.Update(ctx, []stripecoinpayments.TransactionUpdate{{ - TransactionID: tx.ID, + TransactionID: txID, Status: coinpayments.StatusCompleted, Received: received, }}, coinpayments.TransactionIDList{ - tx.ID, + txID, }, ) require.NoError(t, err) @@ -178,7 +179,7 @@ func TestConcurrentConsume(t *testing.T) { var group errs2.Group for i := 0; i < concurrentTries; i++ { group.Go(func() error { - err := transactions.Consume(ctx, tx.ID) + err := transactions.Consume(ctx, txID) if err == nil { return nil @@ -422,3 +423,215 @@ func TestTransactions_ApplyTransactionBalance(t *testing.T) { require.EqualValues(t, "0.2", cbt.Metadata["storj_usd_rate"]) }) } + +func TestDatabaseBigFloatTransition(t *testing.T) { + satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) { + transactions := db.StripeCoinPayments().Transactions() + + accountID := testrand.UUID() + amount, err := monetary.AmountFromString("100000000.00000001", monetary.StorjToken) + require.NoError(t, err) + received, err := monetary.AmountFromString("0.00000002", monetary.StorjToken) + require.NoError(t, err) + rate1, err := decimal.NewFromString("1001.23456789") + require.NoError(t, err) + rate2, err := decimal.NewFromString("3.14159265") + require.NoError(t, err) + + tx1 := stripecoinpayments.Transaction{ + ID: "abcdefg", + AccountID: accountID, + Address: "0xbeedeebeedeebeedeebeedeebeedeebeedeebeed", + Amount: amount, + Received: received, + Status: coinpayments.StatusPending, + Key: "beep beep im a sheep", + Timeout: time.Hour * 48, + } + tx2 := stripecoinpayments.Transaction{ + ID: "hijklmn", + AccountID: accountID, + Address: "0x77687927642075206576656e20626f746865723f", + Amount: amount, + Received: received, + Status: coinpayments.StatusPending, + Key: "how now im a cow", + Timeout: 0, + } + + // perform an Insert on old schema + { + createdAt, err := transactions.Insert(ctx, tx1) + require.NoError(t, err) + requireSaneTimestamp(t, createdAt) + tx1.CreatedAt = createdAt + } + + // perform an Update on old schema + { + newReceived, err := monetary.AmountFromString("0.12345678", monetary.StorjToken) + require.NoError(t, err) + upd := stripecoinpayments.TransactionUpdate{ + TransactionID: tx1.ID, + Status: coinpayments.StatusReceived, + Received: newReceived, + } + err = transactions.Update(ctx, []stripecoinpayments.TransactionUpdate{upd}, coinpayments.TransactionIDList{tx1.ID}) + require.NoError(t, err) + tx1.Status = coinpayments.StatusReceived + tx1.Received = newReceived + } + + // perform a ListAccount on old schema + { + accountTxs, err := transactions.ListAccount(ctx, accountID) + require.NoError(t, err) + require.Len(t, accountTxs, 1) + require.Equal(t, tx1, accountTxs[0]) + } + + // perform a ListPending on old schema + { + pendingTxs, err := transactions.ListPending(ctx, 0, 10, time.Now().UTC()) + require.NoError(t, err) + require.Len(t, pendingTxs.Transactions, 1) + // ListPending doesn't get the timeout column, so set it manually in order to check equality of other fields + pendingTxs.Transactions[0].Timeout = tx1.Timeout + require.Equal(t, tx1, pendingTxs.Transactions[0]) + require.False(t, pendingTxs.Next) + require.Equal(t, int64(0), pendingTxs.NextOffset) + } + + // perform a ListUnapplied on old schema + { + unappliedTxs, err := transactions.ListUnapplied(ctx, 0, 10, time.Now().UTC()) + require.NoError(t, err) + require.Len(t, unappliedTxs.Transactions, 1) + // ListUnapplied doesn't get the timeout column, so set it manually in order to check equality of other fields + unappliedTxs.Transactions[0].Timeout = tx1.Timeout + require.Equal(t, tx1, unappliedTxs.Transactions[0]) + require.False(t, unappliedTxs.Next) + require.Equal(t, int64(0), unappliedTxs.NextOffset) + } + + // perform a LockRate on old schema + { + err = transactions.LockRate(ctx, tx1.ID, rate1) + require.NoError(t, err) + } + + // perform a GetLockedRate on old schema + { + gotRate, err := transactions.GetLockedRate(ctx, tx1.ID) + require.NoError(t, err) + require.Equal(t, rate1, gotRate) + } + + // do schema update + { + transitionDB := transactions.(interface { + DebugPerformBigFloatTransition(ctx context.Context) error + }) + err = transitionDB.DebugPerformBigFloatTransition(ctx) + require.NoError(t, err) + } + + // perform an Insert on new schema + { + createdAt, err := transactions.Insert(ctx, tx2) + require.NoError(t, err) + requireSaneTimestamp(t, createdAt) + tx2.CreatedAt = createdAt + } + + // perform a ListAccount on new schema, while tx1 has a received_gob + { + accountTxs, err := transactions.ListAccount(ctx, accountID) + require.NoError(t, err) + require.Len(t, accountTxs, 2) + // results should be ordered in reverse order of creation + require.Equal(t, tx2, accountTxs[0]) + require.Equal(t, tx1, accountTxs[1]) + } + + // perform an Update on new schema, on a row with a received_gob + { + newReceived, err := monetary.AmountFromString("1.11111111", monetary.StorjToken) + require.NoError(t, err) + upd := stripecoinpayments.TransactionUpdate{ + TransactionID: tx1.ID, + Status: coinpayments.StatusPending, + Received: newReceived, + } + err = transactions.Update(ctx, []stripecoinpayments.TransactionUpdate{upd}, nil) + require.NoError(t, err) + tx1.Status = coinpayments.StatusPending + tx1.Received = newReceived + } + + // perform an Update on new schema, on a row with a received_numeric + { + newReceived, err := monetary.AmountFromString("2.12121212", monetary.StorjToken) + require.NoError(t, err) + upd := stripecoinpayments.TransactionUpdate{ + TransactionID: tx2.ID, + Status: coinpayments.StatusCompleted, + Received: newReceived, + } + err = transactions.Update(ctx, []stripecoinpayments.TransactionUpdate{upd}, coinpayments.TransactionIDList{tx2.ID}) + require.NoError(t, err) + tx2.Status = coinpayments.StatusCompleted + tx2.Received = newReceived + } + + // perform a ListAccount on new schema, after the above changes + { + accountTxs, err := transactions.ListAccount(ctx, accountID) + require.NoError(t, err) + require.Len(t, accountTxs, 2) + // results should be ordered in reverse order of creation + require.Equal(t, tx2, accountTxs[0]) + require.Equal(t, tx1, accountTxs[1]) + } + + // perform a ListPending on new schema + { + pendingTxs, err := transactions.ListPending(ctx, 0, 10, time.Now().UTC()) + require.NoError(t, err) + require.Len(t, pendingTxs.Transactions, 1) + // ListPending doesn't get the timeout column, so set it manually in order to check equality of other fields + pendingTxs.Transactions[0].Timeout = tx1.Timeout + require.Equal(t, tx1, pendingTxs.Transactions[0]) + require.False(t, pendingTxs.Next) + require.Equal(t, int64(0), pendingTxs.NextOffset) + } + + // perform a ListUnapplied on new schema + { + unappliedTxs, err := transactions.ListUnapplied(ctx, 0, 10, time.Now().UTC()) + require.NoError(t, err) + require.Len(t, unappliedTxs.Transactions, 1) + // ListUnapplied doesn't get the timeout column, so set it manually in order to check equality of other fields + unappliedTxs.Transactions[0].Timeout = tx2.Timeout + require.Equal(t, tx2, unappliedTxs.Transactions[0]) + require.False(t, unappliedTxs.Next) + require.Equal(t, int64(0), unappliedTxs.NextOffset) + } + + // perform a LockRate on new schema + { + err = transactions.LockRate(ctx, tx2.ID, rate2) + require.NoError(t, err) + } + + // perform a GetLockedRate on new schema + { + gotRate, err := transactions.GetLockedRate(ctx, tx1.ID) + require.NoError(t, err) + require.Equal(t, rate1, gotRate) + gotRate, err = transactions.GetLockedRate(ctx, tx2.ID) + require.NoError(t, err) + require.Equal(t, rate2, gotRate) + } + }) +} diff --git a/satellite/satellitedb/coinpaymentstxs.go b/satellite/satellitedb/coinpaymentstxs.go index 4a327bd68..69e04b94d 100644 --- a/satellite/satellitedb/coinpaymentstxs.go +++ b/satellite/satellitedb/coinpaymentstxs.go @@ -8,10 +8,12 @@ import ( "math/big" "time" + pgxerrcode "github.com/jackc/pgerrcode" "github.com/shopspring/decimal" "github.com/zeebo/errs" "storj.io/common/uuid" + "storj.io/private/dbutil/pgutil/pgerrcode" "storj.io/storj/satellite/payments/coinpayments" "storj.io/storj/satellite/payments/monetary" "storj.io/storj/satellite/payments/stripecoinpayments" @@ -44,16 +46,16 @@ type coinPaymentsTransactions struct { } // Insert inserts new coinpayments transaction into DB. -func (db *coinPaymentsTransactions) Insert(ctx context.Context, tx stripecoinpayments.Transaction) (_ *stripecoinpayments.Transaction, err error) { +func (db *coinPaymentsTransactions) Insert(ctx context.Context, tx stripecoinpayments.Transaction) (createTime time.Time, err error) { defer mon.Task()(&ctx)(&err) amount, err := tx.Amount.AsBigFloat().GobEncode() if err != nil { - return nil, errs.Wrap(err) + return time.Time{}, errs.Wrap(err) } received, err := tx.Received.AsBigFloat().GobEncode() if err != nil { - return nil, errs.Wrap(err) + return time.Time{}, errs.Wrap(err) } dbxCPTX, err := db.db.Create_CoinpaymentsTransaction(ctx, @@ -67,10 +69,34 @@ func (db *coinPaymentsTransactions) Insert(ctx context.Context, tx stripecoinpay dbx.CoinpaymentsTransaction_Timeout(int(tx.Timeout.Seconds())), ) if err != nil { - return nil, err + if errCode := pgerrcode.FromError(err); errCode == pgxerrcode.UndefinedColumn { + // TEMPORARY: fall back to expected new schema to facilitate transition + return db.insertTransitionShim(ctx, tx) + } + return time.Time{}, err } + return dbxCPTX.CreatedAt, nil +} - return fromDBXCoinpaymentsTransaction(dbxCPTX) +// insertTransitionShim inserts new coinpayments transaction into DB. +// +// It is to be used only during the transition from gob-encoded 'amount' and +// 'received' columns to 'amount_numeric'/'received_numeric'. +// +// When the transition is complete, this method will go away and Insert() +// will operate only on the _numeric columns. +func (db *coinPaymentsTransactions) insertTransitionShim(ctx context.Context, tx stripecoinpayments.Transaction) (createTime time.Time, err error) { + row := db.db.DB.QueryRowContext(ctx, db.db.Rebind(` + INSERT INTO coinpayments_transactions ( + id, user_id, address, amount_numeric, received_numeric, status, key, timeout, created_at + ) VALUES ( + ?, ?, ?, ?, ?, ?, ?, ?, now() + ) RETURNING created_at; + `), tx.ID.String(), tx.AccountID[:], tx.Address, tx.Amount.BaseUnits(), tx.Received.BaseUnits(), tx.Status.Int(), tx.Key, int(tx.Timeout.Seconds())) + if err := row.Scan(&createTime); err != nil { + return time.Time{}, Error.Wrap(err) + } + return createTime, nil } // Update updates status and received for set of transactions. @@ -81,9 +107,9 @@ func (db *coinPaymentsTransactions) Update(ctx context.Context, updates []stripe return nil } - return db.db.WithTx(ctx, func(ctx context.Context, tx *dbx.Tx) error { + err = db.db.WithTx(ctx, func(ctx context.Context, tx *dbx.Tx) error { for _, update := range updates { - received, err := update.Received.AsBigFloat().GobEncode() + receivedGob, err := update.Received.AsBigFloat().GobEncode() if err != nil { return errs.Wrap(err) } @@ -91,7 +117,7 @@ func (db *coinPaymentsTransactions) Update(ctx context.Context, updates []stripe _, err = tx.Update_CoinpaymentsTransaction_By_Id(ctx, dbx.CoinpaymentsTransaction_Id(update.TransactionID.String()), dbx.CoinpaymentsTransaction_Update_Fields{ - Received: dbx.CoinpaymentsTransaction_Received(received), + Received: dbx.CoinpaymentsTransaction_Received(receivedGob), Status: dbx.CoinpaymentsTransaction_Status(update.Status.Int()), }, ) @@ -111,19 +137,73 @@ func (db *coinPaymentsTransactions) Update(ctx context.Context, updates []stripe return nil }) + + if err != nil { + if errCode := pgerrcode.FromError(err); errCode == pgxerrcode.UndefinedColumn { + // TEMPORARY: fall back to expected new schema to facilitate transition + return db.updateTransitionShim(ctx, updates, applies) + } + } + return err +} + +// updateTransitionShim updates status and received for set of transactions. +// +// It is to be used only during the transition from gob-encoded 'amount' and +// 'received' columns to 'amount_numeric'/'received_numeric'. During the +// transition, the gob-encoded columns will still exist but under a different +// name ('amount_gob'/'received_gob'). If the _numeric column value for a given +// row is non-null, it takes precedence over the corresponding _gob column. +// +// When the transition is complete, this method will go away and +// Update() will operate only on the _numeric columns. +func (db *coinPaymentsTransactions) updateTransitionShim(ctx context.Context, updates []stripecoinpayments.TransactionUpdate, applies coinpayments.TransactionIDList) (err error) { + defer mon.Task()(&ctx)(&err) + + if len(updates) == 0 { + return nil + } + + return db.db.WithTx(ctx, func(ctx context.Context, tx *dbx.Tx) error { + for _, update := range updates { + query := db.db.Rebind(` + UPDATE coinpayments_transactions + SET + received_gob = NULL, + received_numeric = ?, + status = ? + WHERE id = ? + `) + _, err := tx.Tx.ExecContext(ctx, query, update.Received.BaseUnits(), update.Status.Int(), update.TransactionID.String()) + if err != nil { + return err + } + } + + for _, txID := range applies { + query := db.db.Rebind(`INSERT INTO stripecoinpayments_apply_balance_intents ( tx_id, state, created_at ) + VALUES ( ?, ?, ? ) ON CONFLICT DO NOTHING`) + _, err = tx.Tx.ExecContext(ctx, query, txID.String(), applyBalanceIntentStateUnapplied.Int(), db.db.Hooks.Now().UTC()) + if err != nil { + return err + } + } + + return nil + }) } // Consume marks transaction as consumed, so it won't participate in apply account balance loop. func (db *coinPaymentsTransactions) Consume(ctx context.Context, id coinpayments.TransactionID) (err error) { defer mon.Task()(&ctx)(&err) - query := db.db.Rebind(` + query := db.db.Rebind(` WITH intent AS ( - SELECT tx_id, state FROM stripecoinpayments_apply_balance_intents WHERE tx_id = ? + SELECT tx_id, state FROM stripecoinpayments_apply_balance_intents WHERE tx_id = ? ), updated AS ( UPDATE stripecoinpayments_apply_balance_intents AS ints - SET - state = ? + SET + state = ? FROM intent WHERE intent.tx_id = ints.tx_id AND ints.state = ? RETURNING 1 @@ -154,14 +234,39 @@ func (db *coinPaymentsTransactions) LockRate(ctx context.Context, id coinpayment buff, err := rate.BigFloat().GobEncode() if err != nil { - return errs.Wrap(err) + return Error.Wrap(err) } _, err = db.db.Create_StripecoinpaymentsTxConversionRate(ctx, dbx.StripecoinpaymentsTxConversionRate_TxId(id.String()), dbx.StripecoinpaymentsTxConversionRate_Rate(buff)) - return err + if err != nil { + if errCode := pgerrcode.FromError(err); errCode == pgxerrcode.UndefinedColumn { + // TEMPORARY: fall back to expected new schema to facilitate transition + return db.lockRateTransitionShim(ctx, id, rate) + } + } + return Error.Wrap(err) +} + +// lockRateTransitionShim locks conversion rate for transaction. +// +// It is to be used only during the transition from the gob-encoded 'rate' +// column to 'rate_numeric'. +// +// When the transition is complete, this method will go away and +// LockRate() will operate only on the _numeric column. +func (db *coinPaymentsTransactions) lockRateTransitionShim(ctx context.Context, id coinpayments.TransactionID, rate decimal.Decimal) (err error) { + defer mon.Task()(&ctx)(&err) + + now := time.Now().UTC() + query := db.db.Rebind(` + INSERT INTO stripecoinpayments_tx_conversion_rates ( tx_id, rate_numeric, created_at ) VALUES ( ?, ?, ? ) + `) + + _, err = db.db.DB.ExecContext(ctx, query, id.String(), rate.String(), now) + return Error.Wrap(err) } // GetLockedRate returns locked conversion rate for transaction or error if non exists. @@ -172,6 +277,10 @@ func (db *coinPaymentsTransactions) GetLockedRate(ctx context.Context, id coinpa dbx.StripecoinpaymentsTxConversionRate_TxId(id.String()), ) if err != nil { + if errCode := pgerrcode.FromError(err); errCode == pgxerrcode.UndefinedColumn { + // TEMPORARY: fall back to expected new schema to facilitate transition + return db.getLockedRateTransitionShim(ctx, id) + } return decimal.Decimal{}, err } @@ -187,6 +296,45 @@ func (db *coinPaymentsTransactions) GetLockedRate(ctx context.Context, id coinpa return rate, nil } +// getLockedRateTransitionShim returns locked conversion rate for transaction +// or error if none exists. +// +// It is to be used only during the transition from the gob-encoded 'rate' +// column to 'rate_numeric'. During the transition, the gob-encoded column will +// still exist but under a different name ('rate_gob'). If rate_numeric for a +// given row is non-null, it takes precedence over rate_gob. +// +// When the transition is complete, this method will go away and +// GetLockedRate() will operate only on rate_numeric. +func (db *coinPaymentsTransactions) getLockedRateTransitionShim(ctx context.Context, id coinpayments.TransactionID) (_ decimal.Decimal, err error) { + defer mon.Task()(&ctx)(&err) + + var rateGob []byte + var rateNumeric *string + query := db.db.Rebind(` + SELECT rate_gob, rate_numeric + FROM stripecoinpayments_tx_conversion_rates + WHERE tx_id = ? + `) + row := db.db.DB.QueryRowContext(ctx, query, id.String()) + err = row.Scan(&rateGob, &rateNumeric) + if err != nil { + return decimal.Decimal{}, Error.Wrap(err) + } + + if rateNumeric == nil { + // This row does not have a numeric rate value yet + var rateF big.Float + if err = rateF.GobDecode(rateGob); err != nil { + return decimal.Decimal{}, Error.Wrap(err) + } + rate, err := monetary.DecimalFromBigFloat(&rateF) + return rate, Error.Wrap(err) + } + rate, err := decimal.NewFromString(*rateNumeric) + return rate, Error.Wrap(err) +} + // ListAccount returns all transaction for specific user. func (db *coinPaymentsTransactions) ListAccount(ctx context.Context, userID uuid.UUID) (_ []stripecoinpayments.Transaction, err error) { defer mon.Task()(&ctx)(&err) @@ -195,6 +343,10 @@ func (db *coinPaymentsTransactions) ListAccount(ctx context.Context, userID uuid dbx.CoinpaymentsTransaction_UserId(userID[:]), ) if err != nil { + if errCode := pgerrcode.FromError(err); errCode == pgxerrcode.UndefinedColumn { + // TEMPORARY: fall back to expected new schema to facilitate transition + return db.listAccountTransitionShim(ctx, userID) + } return nil, err } @@ -211,11 +363,86 @@ func (db *coinPaymentsTransactions) ListAccount(ctx context.Context, userID uuid return txs, nil } +// listAccountTransitionShim returns all transaction for specific user. +// +// It is to be used only during the transition from gob-encoded 'amount' and +// 'received' columns to 'amount_numeric'/'received_numeric'. During the +// transition, the gob-encoded columns will still exist but under a different +// name ('amount_gob'/'received_gob'). If the _numeric column value for a given +// row is non-null, it takes precedence over the corresponding _gob column. +// +// When the transition is complete, this method will go away and ListAccount() +// will operate only on the _numeric columns. +func (db *coinPaymentsTransactions) listAccountTransitionShim(ctx context.Context, userID uuid.UUID) (_ []stripecoinpayments.Transaction, err error) { + defer mon.Task()(&ctx)(&err) + + query := db.db.Rebind(` + SELECT + id, + user_id, + address, + amount_gob, + amount_numeric, + received_gob, + received_numeric, + status, + key, + timeout, + created_at + FROM coinpayments_transactions + WHERE user_id = ? + ORDER BY created_at DESC + `) + rows, err := db.db.DB.QueryContext(ctx, query, userID[:]) + if err != nil { + return nil, err + } + + defer func() { err = errs.Combine(err, rows.Close()) }() + + var txs []stripecoinpayments.Transaction + for rows.Next() { + var tx stripecoinpayments.Transaction + var amountGob, receivedGob []byte + var amountNumeric, receivedNumeric *int64 + var timeoutSeconds int + err := rows.Scan(&tx.ID, &tx.AccountID, &tx.Address, &amountGob, &amountNumeric, &receivedGob, &receivedNumeric, &tx.Status, &tx.Key, &timeoutSeconds, &tx.CreatedAt) + if err != nil { + return nil, Error.Wrap(err) + } + tx.Timeout = time.Second * time.Duration(timeoutSeconds) + + if amountNumeric == nil { + tx.Amount, err = monetaryAmountFromGobEncodedBigFloat(amountGob, monetary.StorjToken) + if err != nil { + return nil, Error.New("amount column: %v", err) + } + } else { + tx.Amount = monetary.AmountFromBaseUnits(*amountNumeric, monetary.StorjToken) + } + if receivedNumeric == nil { + tx.Received, err = monetaryAmountFromGobEncodedBigFloat(receivedGob, monetary.StorjToken) + if err != nil { + return nil, Error.New("received column: %v", err) + } + } else { + tx.Received = monetary.AmountFromBaseUnits(*receivedNumeric, monetary.StorjToken) + } + txs = append(txs, tx) + } + + if err = rows.Err(); err != nil { + return nil, err + } + + return txs, nil +} + // ListPending returns paginated list of pending transactions. func (db *coinPaymentsTransactions) ListPending(ctx context.Context, offset int64, limit int, before time.Time) (_ stripecoinpayments.TransactionsPage, err error) { defer mon.Task()(&ctx)(&err) - query := db.db.Rebind(`SELECT + query := db.db.Rebind(`SELECT id, user_id, address, @@ -224,7 +451,7 @@ func (db *coinPaymentsTransactions) ListPending(ctx context.Context, offset int6 status, key, created_at - FROM coinpayments_transactions + FROM coinpayments_transactions WHERE status IN (?,?) AND created_at <= ? ORDER by created_at DESC @@ -232,6 +459,10 @@ func (db *coinPaymentsTransactions) ListPending(ctx context.Context, offset int6 rows, err := db.db.QueryContext(ctx, query, coinpayments.StatusPending, coinpayments.StatusReceived, before, limit+1, offset) if err != nil { + if errCode := pgerrcode.FromError(err); errCode == pgxerrcode.UndefinedColumn { + // TEMPORARY: fall back to expected new schema to facilitate transition + return db.listPendingTransitionShim(ctx, offset, limit, before) + } return stripecoinpayments.TransactionsPage{}, err } @@ -294,11 +525,112 @@ func (db *coinPaymentsTransactions) ListPending(ctx context.Context, offset int6 return page, nil } +// listPendingTransitionShim returns paginated list of pending transactions. +// +// It is to be used only during the transition from gob-encoded 'amount' and +// 'received' columns to 'amount_numeric'/'received_numeric'. During the +// transition, the gob-encoded columns will still exist but under a different +// name ('amount_gob'/'received_gob'). If the _numeric column value for a given +// row is non-null, it takes precedence over the corresponding _gob column. +// +// When the transition is complete, this method will go away and ListPending() +// will operate only on the _numeric columns. +func (db *coinPaymentsTransactions) listPendingTransitionShim(ctx context.Context, offset int64, limit int, before time.Time) (_ stripecoinpayments.TransactionsPage, err error) { + defer mon.Task()(&ctx)(&err) + + query := db.db.Rebind(`SELECT + id, + user_id, + address, + amount_gob, + amount_numeric, + received_gob, + received_numeric, + status, + key, + created_at + FROM coinpayments_transactions + WHERE status IN (?,?) + AND created_at <= ? + LIMIT ? OFFSET ?`) + + rows, err := db.db.QueryContext(ctx, query, coinpayments.StatusPending, coinpayments.StatusReceived, before, limit+1, offset) + if err != nil { + return stripecoinpayments.TransactionsPage{}, Error.Wrap(err) + } + + defer func() { + err = errs.Combine(err, rows.Close()) + }() + + var page stripecoinpayments.TransactionsPage + + for rows.Next() { + var id, address string + var userID uuid.UUID + var amountGob, receivedGob []byte + var amountNumeric, receivedNumeric *int64 + var amount, received monetary.Amount + var status int + var key string + var createdAt time.Time + + err := rows.Scan(&id, &userID, &address, &amountGob, &amountNumeric, &receivedGob, &receivedNumeric, &status, &key, &createdAt) + if err != nil { + return stripecoinpayments.TransactionsPage{}, err + } + + if amountNumeric == nil { + // 'amount' in this row hasn't yet been updated to a numeric value + amount, err = monetaryAmountFromGobEncodedBigFloat(amountGob, monetary.StorjToken) + if err != nil { + return stripecoinpayments.TransactionsPage{}, Error.Wrap(err) + } + } else { + amount = monetary.AmountFromBaseUnits(*amountNumeric, monetary.StorjToken) + } + if receivedNumeric == nil { + // 'received' in this row hasn't yet been updated to a numeric value + received, err = monetaryAmountFromGobEncodedBigFloat(receivedGob, monetary.StorjToken) + if err != nil { + return stripecoinpayments.TransactionsPage{}, Error.Wrap(err) + } + } else { + received = monetary.AmountFromBaseUnits(*receivedNumeric, monetary.StorjToken) + } + + page.Transactions = append(page.Transactions, + stripecoinpayments.Transaction{ + ID: coinpayments.TransactionID(id), + AccountID: userID, + Address: address, + Amount: amount, + Received: received, + Status: coinpayments.Status(status), + Key: key, + CreatedAt: createdAt, + }, + ) + } + + if err = rows.Err(); err != nil { + return stripecoinpayments.TransactionsPage{}, err + } + + if len(page.Transactions) == limit+1 { + page.Next = true + page.NextOffset = offset + int64(limit) + page.Transactions = page.Transactions[:len(page.Transactions)-1] + } + + return page, nil +} + // ListUnapplied returns TransactionsPage with a pending or completed status, that should be applied to account balance. func (db *coinPaymentsTransactions) ListUnapplied(ctx context.Context, offset int64, limit int, before time.Time) (_ stripecoinpayments.TransactionsPage, err error) { defer mon.Task()(&ctx)(&err) - query := db.db.Rebind(`SELECT + query := db.db.Rebind(`SELECT txs.id, txs.user_id, txs.address, @@ -307,7 +639,7 @@ func (db *coinPaymentsTransactions) ListUnapplied(ctx context.Context, offset in txs.status, txs.key, txs.created_at - FROM coinpayments_transactions as txs + FROM coinpayments_transactions as txs INNER JOIN stripecoinpayments_apply_balance_intents as ints ON txs.id = ints.tx_id WHERE txs.status >= ? @@ -318,6 +650,9 @@ func (db *coinPaymentsTransactions) ListUnapplied(ctx context.Context, offset in rows, err := db.db.QueryContext(ctx, query, coinpayments.StatusReceived, before, applyBalanceIntentStateUnapplied, limit+1, offset) if err != nil { + if errCode := pgerrcode.FromError(err); errCode == pgxerrcode.UndefinedColumn { + return db.listUnappliedTransitionShim(ctx, offset, limit, before) + } return stripecoinpayments.TransactionsPage{}, err } defer func() { err = errs.Combine(err, rows.Close()) }() @@ -343,11 +678,114 @@ func (db *coinPaymentsTransactions) ListUnapplied(ctx context.Context, offset in amount, err := monetaryAmountFromGobEncodedBigFloat(amountB, currency) if err != nil { - return stripecoinpayments.TransactionsPage{}, errs.Wrap(err) + return stripecoinpayments.TransactionsPage{}, Error.New("amount column: %v", err) } received, err := monetaryAmountFromGobEncodedBigFloat(receivedB, currency) if err != nil { - return stripecoinpayments.TransactionsPage{}, errs.Wrap(err) + return stripecoinpayments.TransactionsPage{}, Error.New("received column: %v", err) + } + + page.Transactions = append(page.Transactions, + stripecoinpayments.Transaction{ + ID: coinpayments.TransactionID(id), + AccountID: userID, + Address: address, + Amount: amount, + Received: received, + Status: coinpayments.Status(status), + Key: key, + CreatedAt: createdAt, + }, + ) + } + + if err = rows.Err(); err != nil { + return stripecoinpayments.TransactionsPage{}, err + } + + if len(page.Transactions) == limit+1 { + page.Next = true + page.NextOffset = offset + int64(limit) + page.Transactions = page.Transactions[:len(page.Transactions)-1] + } + + return page, nil +} + +// listUnappliedTransitionShim returns TransactionsPage with a pending or +// completed status, that should be applied to account balance. +// +// It is to be used only during the transition from gob-encoded 'amount' and +// 'received' columns to 'amount_numeric'/'received_numeric'. During the +// transition, the gob-encoded columns will still exist but under a different +// name ('amount_gob'/'received_gob'). If the _numeric column value for a given +// row is non-null, it takes precedence over the corresponding _gob column. +// +// When the transition is complete, this method will go away and +// ListUnapplied() will operate only on the _numeric columns. +func (db *coinPaymentsTransactions) listUnappliedTransitionShim(ctx context.Context, offset int64, limit int, before time.Time) (_ stripecoinpayments.TransactionsPage, err error) { + defer mon.Task()(&ctx)(&err) + + query := db.db.Rebind(`SELECT + txs.id, + txs.user_id, + txs.address, + txs.amount_gob, + txs.amount_numeric, + txs.received_gob, + txs.received_numeric, + txs.status, + txs.key, + txs.created_at + FROM coinpayments_transactions as txs + INNER JOIN stripecoinpayments_apply_balance_intents as ints + ON txs.id = ints.tx_id + WHERE txs.status >= ? + AND txs.created_at <= ? + AND ints.state = ? + ORDER by txs.created_at DESC + LIMIT ? OFFSET ?`) + + rows, err := db.db.QueryContext(ctx, query, coinpayments.StatusReceived, before, applyBalanceIntentStateUnapplied, limit+1, offset) + if err != nil { + return stripecoinpayments.TransactionsPage{}, err + } + defer func() { err = errs.Combine(err, rows.Close()) }() + + var page stripecoinpayments.TransactionsPage + + for rows.Next() { + var id, address string + var userID uuid.UUID + var amountGob, receivedGob []byte + var amountNumeric, receivedNumeric *int64 + var status int + var key string + var createdAt time.Time + + err := rows.Scan(&id, &userID, &address, &amountGob, &amountNumeric, &receivedGob, &receivedNumeric, &status, &key, &createdAt) + if err != nil { + return stripecoinpayments.TransactionsPage{}, err + } + + var amount, received monetary.Amount + if amountNumeric == nil { + // 'amount' in this row hasn't yet been updated to a numeric value + amount, err = monetaryAmountFromGobEncodedBigFloat(amountGob, monetary.StorjToken) + if err != nil { + return stripecoinpayments.TransactionsPage{}, Error.Wrap(err) + } + } else { + amount = monetary.AmountFromBaseUnits(*amountNumeric, monetary.StorjToken) + } + if receivedNumeric == nil { + // 'received' in this row hasn't yet been updated to a numeric value + received, err = monetaryAmountFromGobEncodedBigFloat(receivedGob, monetary.StorjToken) + if err != nil { + return stripecoinpayments.TransactionsPage{}, Error.Wrap(err) + } + } else { + received = monetary.AmountFromBaseUnits(*receivedNumeric, monetary.StorjToken) } page.Transactions = append(page.Transactions, @@ -390,11 +828,11 @@ func fromDBXCoinpaymentsTransaction(dbxCPTX *dbx.CoinpaymentsTransaction) (*stri amount, err := monetaryAmountFromGobEncodedBigFloat(dbxCPTX.Amount, currency) if err != nil { - return nil, errs.Wrap(err) + return nil, Error.New("amount column: %v", err) } received, err := monetaryAmountFromGobEncodedBigFloat(dbxCPTX.Received, currency) if err != nil { - return nil, errs.Wrap(err) + return nil, Error.New("received column: %v", err) } return &stripecoinpayments.Transaction{ @@ -417,3 +855,24 @@ func monetaryAmountFromGobEncodedBigFloat(encoded []byte, currency *monetary.Cur } return monetary.AmountFromBigFloat(&bf, currency) } + +// DebugPerformBigFloatTransition performs the schema changes expected as part +// of Step 2 of the transition away from gob-encoded big.Float columns in the +// database. +// +// This is for testing purposes only, to ensure that no data is lost and that +// code still works after the transition. +func (db *coinPaymentsTransactions) DebugPerformBigFloatTransition(ctx context.Context) error { + _, err := db.db.DB.ExecContext(ctx, ` + ALTER TABLE coinpayments_transactions ALTER COLUMN amount DROP NOT NULL; + ALTER TABLE coinpayments_transactions ALTER COLUMN received DROP NOT NULL; + ALTER TABLE coinpayments_transactions RENAME COLUMN amount TO amount_gob; + ALTER TABLE coinpayments_transactions RENAME COLUMN received TO received_gob; + ALTER TABLE coinpayments_transactions ADD COLUMN amount_numeric INT8; + ALTER TABLE coinpayments_transactions ADD COLUMN received_numeric INT8; + ALTER TABLE stripecoinpayments_tx_conversion_rates ALTER COLUMN rate DROP NOT NULL; + ALTER TABLE stripecoinpayments_tx_conversion_rates RENAME COLUMN rate TO rate_gob; + ALTER TABLE stripecoinpayments_tx_conversion_rates ADD COLUMN rate_numeric NUMERIC(20, 8); + `) + return Error.Wrap(err) +} diff --git a/satellite/satellitedb/satellitedbtest/run.go b/satellite/satellitedb/satellitedbtest/run.go index 7215800d4..5ddabcb02 100644 --- a/satellite/satellitedb/satellitedbtest/run.go +++ b/satellite/satellitedb/satellitedbtest/run.go @@ -21,6 +21,7 @@ import ( "storj.io/private/dbutil/pgtest" "storj.io/private/dbutil/pgutil" "storj.io/private/dbutil/tempdb" + "storj.io/private/tagsql" "storj.io/storj/satellite" "storj.io/storj/satellite/metabase" "storj.io/storj/satellite/satellitedb" @@ -105,6 +106,12 @@ func (db *tempMasterDB) Close() error { return errs.Combine(db.DB.Close(), db.tempDB.Close()) } +// DebugGetDBHandle exposes a handle to the raw database object. This is intended +// only for testing purposes and is temporary. +func (db *tempMasterDB) DebugGetDBHandle() tagsql.DB { + return db.tempDB.DB +} + // CreateMasterDB creates a new satellite database for testing. func CreateMasterDB(ctx context.Context, log *zap.Logger, name string, category string, index int, dbInfo Database) (db satellite.DB, err error) { if dbInfo.URL == "" {