storj/satellite/payments/stripecoinpayments/transactions_test.go
paul cannon c053bdbd70 satellite/satellitedb: prepare to remove big.Float from db
Why: big.Float is not an ideal type for dealing with monetary amounts,
because no matter how high the precision, some non-integer decimal
values cannot be represented exactly in base-2 floating point. Also,
storing gob-encoded big.Float values in the database makes it very hard
to use those values in meaningful queries, making it difficult to do
any sort of analysis on billing.
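
As an illustration of the representability problem described above, here is a minimal sketch using only the standard library. The helper name isExactTenth and the 8-decimal-place scale are assumptions made for this example; they are not taken from the satellite code.

```go
package main

import (
	"fmt"
	"math/big"
)

// isExactTenth reports whether a big.Float holding "0.1" at the given
// mantissa precision (in bits) is exactly equal to the rational 1/10.
// (Hypothetical helper, for illustration only.)
func isExactTenth(prec uint) bool {
	f, ok := new(big.Float).SetPrec(prec).SetString("0.1")
	if !ok {
		return false
	}
	// Rat returns the exact rational value stored in the binary float.
	r, _ := f.Rat(nil)
	return r.Cmp(big.NewRat(1, 10)) == 0
}

func main() {
	// A binary mantissa of any finite length stores a dyadic rational,
	// so raising the precision never makes 0.1 exact.
	for _, prec := range []uint{53, 256, 4096} {
		fmt.Printf("prec=%d exact=%v\n", prec, isExactTenth(prec))
	}

	// Integer base units avoid the problem entirely.
	// Assumption: 8 decimal places per token.
	const unitsPerToken = int64(100_000_000)
	tenth := unitsPerToken / 10
	fmt.Println(tenth*10 == unitsPerToken)
}
```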

Now that we have amounts represented using monetary.Amount, we can
simply store them in the database using integers (as given by the
.BaseUnits() method on monetary.Amount).
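
A rough sketch of what "storing integers" means in practice: a decimal string is converted to a count of base units that fits an INT8 column. The function toBaseUnits and the constant decimalPlaces are hypothetical stand-ins, not the monetary package API. This sketch rejects inputs with more decimal places than the currency supports, which may well differ from how monetary.AmountFromString treats them.

```go
package main

import (
	"errors"
	"fmt"
	"math/big"
)

// decimalPlaces is an assumption for this sketch (8 places per token).
const decimalPlaces = 8

// toBaseUnits converts a decimal string such as "1.5" into an integer
// number of base units, using exact rational arithmetic throughout.
func toBaseUnits(s string) (int64, error) {
	r, ok := new(big.Rat).SetString(s)
	if !ok {
		return 0, fmt.Errorf("invalid decimal %q", s)
	}
	scale := new(big.Int).Exp(big.NewInt(10), big.NewInt(decimalPlaces), nil)
	r.Mul(r, new(big.Rat).SetInt(scale))
	if !r.IsInt() {
		return 0, errors.New("more decimal places than the currency supports")
	}
	if !r.Num().IsInt64() {
		return 0, errors.New("amount overflows int64")
	}
	return r.Num().Int64(), nil
}

func main() {
	for _, s := range []string{"100", "0.00000002", "0.000000001"} {
		units, err := toBaseUnits(s)
		fmt.Println(s, units, err)
	}
}
```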

We should move toward storing the currency along with any monetary
amount, wherever we are storing amounts, because satellites might want
to deal with currencies other than STORJ and USD. Even better, it
becomes much clearer what currency each monetary value is _supposed_ to
be in (I had to dig through code to find that out for our current
monetary columns).

Deployment
----------

Getting rid of the big.Float columns will take multiple deployment
steps. There does not seem to be any way to make the change in a way
that lets existing queries continue to work on CockroachDB (it could be
done with rules and triggers and a stored procedure that knows how to
gob-decode big.Float objects, but CockroachDB doesn't have rules _or_
triggers _or_ stored procedures). Instead, in this first step, we make
no changes to the database schema, but add code that knows how to deal
with the planned changes to the schema when they are made in a future
"step 2" deployment. All functions that deal with the
coinbase_transactions table have been taught to recognize the "undefined
column" error, and when it is seen, to call a separate "transition shim"
function to accomplish the task. Once all the services are running this
code, and the step 2 deployment makes breaking changes to the schema,
any services that are still running and connected to the database will
keep working correctly because of the fallback code included here. The
step 2 deployment can be made without these transition shims included,
because it will apply the database schema changes before any of its code
runs.

Step 1:

    No schema changes; just include code that recognizes the
    "undefined column" error when dealing with the
    coinbase_transactions or stripecoinpayments_tx_conversion_rates
    tables, and if found, assumes that the column changes from Step
    2 have already been made.
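
The fallback described above could be sketched roughly as follows. This is not the code in this change; sqlStateError, isUndefinedColumn and withTransitionShim are hypothetical names, and the sketch assumes the driver error exposes a SQLState() method. The code "42703" is the PostgreSQL SQLSTATE for undefined_column.

```go
package main

import (
	"errors"
	"fmt"
)

// undefinedColumn is the PostgreSQL SQLSTATE code for "undefined_column".
const undefinedColumn = "42703"

// sqlStateError is a hypothetical stand-in for a driver error that
// carries a SQLSTATE code.
type sqlStateError struct{ code string }

func (e *sqlStateError) Error() string    { return "sql error " + e.code }
func (e *sqlStateError) SQLState() string { return e.code }

// isUndefinedColumn reports whether err (or anything it wraps) is an
// "undefined column" error.
func isUndefinedColumn(err error) bool {
	var se interface{ SQLState() string }
	return errors.As(err, &se) && se.SQLState() == undefinedColumn
}

// withTransitionShim runs the old-schema query first. Only when that
// fails with "undefined column" does it assume the step 2 schema is in
// place and run the shim instead.
func withTransitionShim(oldSchema, shim func() error) error {
	err := oldSchema()
	if isUndefinedColumn(err) {
		return shim()
	}
	return err
}

func main() {
	err := withTransitionShim(
		func() error {
			// Simulates a query against a column that has been renamed.
			return fmt.Errorf("insert: %w", &sqlStateError{code: undefinedColumn})
		},
		func() error {
			fmt.Println("shim: using the step 2 column names")
			return nil
		},
	)
	fmt.Println("err:", err)
}
```

Any other error is returned unchanged, so the shim never masks unrelated failures.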

Step 2:

    In coinbase_transactions:

     * change the names of the 'amount' and 'received' columns to
       'amount_gob' and 'received_gob' respectively
     * add new 'amount_numeric' and 'received_numeric' columns with
       INT8 type.

    In stripecoinpayments_tx_conversion_rates:

     * change the name of the 'rate' column to 'rate_gob'
     * add new 'rate_numeric' column with NUMERIC(8, 8) type

    Code reading from either of these tables must query both the X_gob
    and X_numeric columns. If X_numeric is not null, its value should
    be used; otherwise, the gob-encoded big.Float in X_gob should be
    used. A chore might be included in this step that transitions values
    from X_gob to X_numeric a few rows at a time.

Step 3:

    Once all prod satellites have no values left in the _gob columns, we
    can drop those columns and add NOT NULL constraints to the _numeric
    columns.

Change-Id: Id6db304b404e6fde44f5a8c23cdaeeaaa2324f20
2021-09-29 00:23:44 +00:00


// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.

package stripecoinpayments_test

import (
	"context"
	"encoding/base64"
	"errors"
	"sync"
	"testing"
	"time"

	"github.com/shopspring/decimal"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
	"github.com/stripe/stripe-go/v72"
	"github.com/zeebo/errs"

	"storj.io/common/errs2"
	"storj.io/common/memory"
	"storj.io/common/testcontext"
	"storj.io/common/testrand"
	"storj.io/common/uuid"
	"storj.io/storj/private/testplanet"
	"storj.io/storj/satellite"
	"storj.io/storj/satellite/payments/coinpayments"
	"storj.io/storj/satellite/payments/monetary"
	"storj.io/storj/satellite/payments/stripecoinpayments"
	"storj.io/storj/satellite/satellitedb/satellitedbtest"
)

func TestTransactionsDB(t *testing.T) {
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
transactions := db.StripeCoinPayments().Transactions()
amount, err := monetary.AmountFromString("2.0000000000000000005", monetary.StorjToken)
require.NoError(t, err)
received, err := monetary.AmountFromString("1.0000000000000000003", monetary.StorjToken)
require.NoError(t, err)
userID := testrand.UUID()
createTx := stripecoinpayments.Transaction{
ID: "testID",
AccountID: userID,
Address: "testAddress",
Amount: amount,
Received: received,
Status: coinpayments.StatusPending,
Key: "testKey",
Timeout: time.Second * 60,
}
t.Run("insert", func(t *testing.T) {
createdAt, err := transactions.Insert(ctx, createTx)
require.NoError(t, err)
requireSaneTimestamp(t, createdAt)
txs, err := transactions.ListAccount(ctx, userID)
require.NoError(t, err)
require.Len(t, txs, 1)
compareTransactions(t, createTx, txs[0])
})
t.Run("update", func(t *testing.T) {
received, err := monetary.AmountFromString("6.0000000000000000001", monetary.StorjToken)
require.NoError(t, err)
update := stripecoinpayments.TransactionUpdate{
TransactionID: createTx.ID,
Status: coinpayments.StatusReceived,
Received: received,
}
err = transactions.Update(ctx, []stripecoinpayments.TransactionUpdate{update}, nil)
require.NoError(t, err)
page, err := transactions.ListPending(ctx, 0, 1, time.Now())
require.NoError(t, err)
require.Len(t, page.Transactions, 1)
assert.Equal(t, createTx.ID, page.Transactions[0].ID)
assert.Equal(t, update.Received, page.Transactions[0].Received)
assert.Equal(t, update.Status, page.Transactions[0].Status)
err = transactions.Update(ctx,
[]stripecoinpayments.TransactionUpdate{
{
TransactionID: createTx.ID,
Status: coinpayments.StatusCompleted,
Received: received,
},
},
coinpayments.TransactionIDList{
createTx.ID,
},
)
require.NoError(t, err)
page, err = transactions.ListUnapplied(ctx, 0, 1, time.Now())
require.NoError(t, err)
require.NotNil(t, page.Transactions)
require.Equal(t, 1, len(page.Transactions))
assert.Equal(t, createTx.ID, page.Transactions[0].ID)
assert.Equal(t, update.Received, page.Transactions[0].Received)
assert.Equal(t, coinpayments.StatusCompleted, page.Transactions[0].Status)
})
t.Run("consume", func(t *testing.T) {
err := transactions.Consume(ctx, createTx.ID)
require.NoError(t, err)
page, err := transactions.ListUnapplied(ctx, 0, 1, time.Now())
require.NoError(t, err)
assert.Nil(t, page.Transactions)
assert.Equal(t, 0, len(page.Transactions))
})
})
}
func requireSaneTimestamp(t *testing.T, when time.Time) {
// ensure time value is sane. I apologize to you people of the future when this starts breaking
require.Truef(t, when.After(time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC)),
"%s seems too small to be a valid creation timestamp", when)
require.Truef(t, when.Before(time.Date(2500, 1, 1, 0, 0, 0, 0, time.UTC)),
"%s seems too large to be a valid creation timestamp", when)
}
func TestConcurrentConsume(t *testing.T) {
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
transactions := db.StripeCoinPayments().Transactions()
const concurrentTries = 30
amount, err := monetary.AmountFromString("2.0000000000000000005", monetary.StorjToken)
require.NoError(t, err)
received, err := monetary.AmountFromString("1.0000000000000000003", monetary.StorjToken)
require.NoError(t, err)
userID := testrand.UUID()
txID := coinpayments.TransactionID("testID")
_, err = transactions.Insert(ctx,
stripecoinpayments.Transaction{
ID: txID,
AccountID: userID,
Address: "testAddress",
Amount: amount,
Received: received,
Status: coinpayments.StatusPending,
Key: "testKey",
Timeout: time.Second * 60,
},
)
require.NoError(t, err)
err = transactions.Update(ctx,
[]stripecoinpayments.TransactionUpdate{{
TransactionID: txID,
Status: coinpayments.StatusCompleted,
Received: received,
}},
coinpayments.TransactionIDList{
txID,
},
)
require.NoError(t, err)
var errLock sync.Mutex
var alreadyConsumed []error
appendError := func(err error) {
errLock.Lock()
defer errLock.Unlock()
alreadyConsumed = append(alreadyConsumed, err)
}
var group errs2.Group
for i := 0; i < concurrentTries; i++ {
group.Go(func() error {
err := transactions.Consume(ctx, txID)
if err == nil {
return nil
}
if errors.Is(err, stripecoinpayments.ErrTransactionConsumed) {
appendError(err)
return nil
}
return err
})
}
require.NoError(t, errs.Combine(group.Wait()...))
require.Equal(t, concurrentTries-1, len(alreadyConsumed))
})
}
func TestTransactionsDBList(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
const (
limit = 5
transactionCount = limit * 4
)
// create transactions
amount, err := monetary.AmountFromString("4.0000000000000000005", monetary.StorjToken)
require.NoError(t, err)
received, err := monetary.AmountFromString("5.0000000000000000003", monetary.StorjToken)
require.NoError(t, err)
var txs []stripecoinpayments.Transaction
for i := 0; i < transactionCount; i++ {
id := base64.StdEncoding.EncodeToString(testrand.Bytes(4 * memory.B))
addr := base64.StdEncoding.EncodeToString(testrand.Bytes(4 * memory.B))
key := base64.StdEncoding.EncodeToString(testrand.Bytes(4 * memory.B))
status := coinpayments.StatusPending
if i%2 == 0 {
status = coinpayments.StatusReceived
}
createTX := stripecoinpayments.Transaction{
ID: coinpayments.TransactionID(id),
AccountID: uuid.UUID{},
Address: addr,
Amount: amount,
Received: received,
Status: status,
Key: key,
}
txs = append(txs, createTX)
}
t.Run("pending transactions", func(t *testing.T) {
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
for _, tx := range txs {
_, err := db.StripeCoinPayments().Transactions().Insert(ctx, tx)
require.NoError(t, err)
}
page, err := db.StripeCoinPayments().Transactions().ListPending(ctx, 0, limit, time.Now())
require.NoError(t, err)
pendingTXs := page.Transactions
for page.Next {
page, err = db.StripeCoinPayments().Transactions().ListPending(ctx, page.NextOffset, limit, time.Now())
require.NoError(t, err)
pendingTXs = append(pendingTXs, page.Transactions...)
}
require.False(t, page.Next)
require.Equal(t, transactionCount, len(pendingTXs))
// compare every collected transaction, not only those on the last page
for _, act := range pendingTXs {
for _, exp := range txs {
if act.ID == exp.ID {
compareTransactions(t, exp, act)
}
}
}
})
})
t.Run("unapplied transaction", func(t *testing.T) {
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
var updatedTxs []stripecoinpayments.Transaction
var updates []stripecoinpayments.TransactionUpdate
var applies coinpayments.TransactionIDList
for _, tx := range txs {
_, err := db.StripeCoinPayments().Transactions().Insert(ctx, tx)
require.NoError(t, err)
tx.Status = coinpayments.StatusCompleted
updates = append(updates,
stripecoinpayments.TransactionUpdate{
TransactionID: tx.ID,
Status: tx.Status,
Received: tx.Received,
},
)
applies = append(applies, tx.ID)
updatedTxs = append(updatedTxs, tx)
}
err := db.StripeCoinPayments().Transactions().Update(ctx, updates, applies)
require.NoError(t, err)
page, err := db.StripeCoinPayments().Transactions().ListUnapplied(ctx, 0, limit, time.Now())
require.NoError(t, err)
unappliedTXs := page.Transactions
for page.Next {
page, err = db.StripeCoinPayments().Transactions().ListUnapplied(ctx, page.NextOffset, limit, time.Now())
require.NoError(t, err)
unappliedTXs = append(unappliedTXs, page.Transactions...)
}
require.False(t, page.Next)
require.Equal(t, transactionCount, len(unappliedTXs))
// compare every collected transaction, not only those on the last page
for _, act := range unappliedTXs {
for _, exp := range updatedTxs {
if act.ID == exp.ID {
compareTransactions(t, exp, act)
}
}
}
})
})
}
func TestTransactionsDBRates(t *testing.T) {
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
transactions := db.StripeCoinPayments().Transactions()
val, err := decimal.NewFromString("4.000000000000000005")
require.NoError(t, err)
const txID = "tx_id"
err = transactions.LockRate(ctx, txID, val)
require.NoError(t, err)
rate, err := transactions.GetLockedRate(ctx, txID)
require.NoError(t, err)
assert.Equal(t, val, rate)
})
}
// compareTransactions is a helper that compares the tx used to create a db
// entry with the tx returned from the db. It does not compare the CreatedAt
// field, but ensures that it is not zero.
func compareTransactions(t *testing.T, exp, act stripecoinpayments.Transaction) {
assert.Equal(t, exp.ID, act.ID)
assert.Equal(t, exp.AccountID, act.AccountID)
assert.Equal(t, exp.Address, act.Address)
assert.Equal(t, exp.Amount, act.Amount)
assert.Equal(t, exp.Received, act.Received)
assert.Equal(t, exp.Status, act.Status)
assert.Equal(t, exp.Key, act.Key)
assert.Equal(t, exp.Timeout, act.Timeout)
assert.False(t, act.CreatedAt.IsZero())
}
func TestTransactions_ApplyTransactionBalance(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 0, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
transactions := satellite.API.DB.StripeCoinPayments().Transactions()
userID := planet.Uplinks[0].Projects[0].Owner.ID
satellite.Core.Payments.Chore.TransactionCycle.Pause()
satellite.Core.Payments.Chore.AccountBalanceCycle.Pause()
// Emulate a deposit through CoinPayments.
txID := coinpayments.TransactionID("testID")
storjAmount, err := monetary.AmountFromString("100", monetary.StorjToken)
require.NoError(t, err)
storjUSDRate, err := decimal.NewFromString("0.2")
require.NoError(t, err)
createTx := stripecoinpayments.Transaction{
ID: txID,
AccountID: userID,
Address: "testAddress",
Amount: storjAmount,
Received: storjAmount,
Status: coinpayments.StatusPending,
Key: "testKey",
Timeout: time.Second * 60,
}
tx, err := transactions.Insert(ctx, createTx)
require.NoError(t, err)
require.NotNil(t, tx)
update := stripecoinpayments.TransactionUpdate{
TransactionID: createTx.ID,
Status: coinpayments.StatusReceived,
Received: storjAmount,
}
err = transactions.Update(ctx, []stripecoinpayments.TransactionUpdate{update}, coinpayments.TransactionIDList{createTx.ID})
require.NoError(t, err)
// Check that the CoinPayments transaction is waiting to be applied to the Stripe customer balance.
page, err := transactions.ListUnapplied(ctx, 0, 1, time.Now())
require.NoError(t, err)
require.Len(t, page.Transactions, 1)
err = transactions.LockRate(ctx, txID, storjUSDRate)
require.NoError(t, err)
// Trigger the AccountBalanceCycle. This calls Service.applyTransactionBalance()
satellite.Core.Payments.Chore.AccountBalanceCycle.TriggerWait()
cusID, err := satellite.API.DB.StripeCoinPayments().Customers().GetCustomerID(ctx, userID)
require.NoError(t, err)
// Check that the CoinPayments deposit is reflected in the Stripe customer balance.
it := satellite.API.Payments.Stripe.CustomerBalanceTransactions().List(&stripe.CustomerBalanceTransactionListParams{Customer: stripe.String(cusID)})
require.NoError(t, it.Err())
require.True(t, it.Next())
cbt := it.CustomerBalanceTransaction()
require.EqualValues(t, -2000, cbt.Amount)
require.EqualValues(t, txID, cbt.Metadata["txID"])
require.EqualValues(t, "100", cbt.Metadata["storj_amount"])
require.EqualValues(t, "0.2", cbt.Metadata["storj_usd_rate"])
})
}
func TestDatabaseBigFloatTransition(t *testing.T) {
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
transactions := db.StripeCoinPayments().Transactions()
accountID := testrand.UUID()
amount, err := monetary.AmountFromString("100000000.00000001", monetary.StorjToken)
require.NoError(t, err)
received, err := monetary.AmountFromString("0.00000002", monetary.StorjToken)
require.NoError(t, err)
rate1, err := decimal.NewFromString("1001.23456789")
require.NoError(t, err)
rate2, err := decimal.NewFromString("3.14159265")
require.NoError(t, err)
tx1 := stripecoinpayments.Transaction{
ID: "abcdefg",
AccountID: accountID,
Address: "0xbeedeebeedeebeedeebeedeebeedeebeedeebeed",
Amount: amount,
Received: received,
Status: coinpayments.StatusPending,
Key: "beep beep im a sheep",
Timeout: time.Hour * 48,
}
tx2 := stripecoinpayments.Transaction{
ID: "hijklmn",
AccountID: accountID,
Address: "0x77687927642075206576656e20626f746865723f",
Amount: amount,
Received: received,
Status: coinpayments.StatusPending,
Key: "how now im a cow",
Timeout: 0,
}
// perform an Insert on old schema
{
createdAt, err := transactions.Insert(ctx, tx1)
require.NoError(t, err)
requireSaneTimestamp(t, createdAt)
tx1.CreatedAt = createdAt
}
// perform an Update on old schema
{
newReceived, err := monetary.AmountFromString("0.12345678", monetary.StorjToken)
require.NoError(t, err)
upd := stripecoinpayments.TransactionUpdate{
TransactionID: tx1.ID,
Status: coinpayments.StatusReceived,
Received: newReceived,
}
err = transactions.Update(ctx, []stripecoinpayments.TransactionUpdate{upd}, coinpayments.TransactionIDList{tx1.ID})
require.NoError(t, err)
tx1.Status = coinpayments.StatusReceived
tx1.Received = newReceived
}
// perform a ListAccount on old schema
{
accountTxs, err := transactions.ListAccount(ctx, accountID)
require.NoError(t, err)
require.Len(t, accountTxs, 1)
require.Equal(t, tx1, accountTxs[0])
}
// perform a ListPending on old schema
{
pendingTxs, err := transactions.ListPending(ctx, 0, 10, time.Now().UTC())
require.NoError(t, err)
require.Len(t, pendingTxs.Transactions, 1)
// ListPending doesn't get the timeout column, so set it manually in order to check equality of other fields
pendingTxs.Transactions[0].Timeout = tx1.Timeout
require.Equal(t, tx1, pendingTxs.Transactions[0])
require.False(t, pendingTxs.Next)
require.Equal(t, int64(0), pendingTxs.NextOffset)
}
// perform a ListUnapplied on old schema
{
unappliedTxs, err := transactions.ListUnapplied(ctx, 0, 10, time.Now().UTC())
require.NoError(t, err)
require.Len(t, unappliedTxs.Transactions, 1)
// ListUnapplied doesn't get the timeout column, so set it manually in order to check equality of other fields
unappliedTxs.Transactions[0].Timeout = tx1.Timeout
require.Equal(t, tx1, unappliedTxs.Transactions[0])
require.False(t, unappliedTxs.Next)
require.Equal(t, int64(0), unappliedTxs.NextOffset)
}
// perform a LockRate on old schema
{
err = transactions.LockRate(ctx, tx1.ID, rate1)
require.NoError(t, err)
}
// perform a GetLockedRate on old schema
{
gotRate, err := transactions.GetLockedRate(ctx, tx1.ID)
require.NoError(t, err)
require.Equal(t, rate1, gotRate)
}
// do schema update
{
transitionDB := transactions.(interface {
DebugPerformBigFloatTransition(ctx context.Context) error
})
err = transitionDB.DebugPerformBigFloatTransition(ctx)
require.NoError(t, err)
}
// perform an Insert on new schema
{
createdAt, err := transactions.Insert(ctx, tx2)
require.NoError(t, err)
requireSaneTimestamp(t, createdAt)
tx2.CreatedAt = createdAt
}
// perform a ListAccount on new schema, while tx1 has a received_gob
{
accountTxs, err := transactions.ListAccount(ctx, accountID)
require.NoError(t, err)
require.Len(t, accountTxs, 2)
// results should be ordered in reverse order of creation
require.Equal(t, tx2, accountTxs[0])
require.Equal(t, tx1, accountTxs[1])
}
// perform an Update on new schema, on a row with a received_gob
{
newReceived, err := monetary.AmountFromString("1.11111111", monetary.StorjToken)
require.NoError(t, err)
upd := stripecoinpayments.TransactionUpdate{
TransactionID: tx1.ID,
Status: coinpayments.StatusPending,
Received: newReceived,
}
err = transactions.Update(ctx, []stripecoinpayments.TransactionUpdate{upd}, nil)
require.NoError(t, err)
tx1.Status = coinpayments.StatusPending
tx1.Received = newReceived
}
// perform an Update on new schema, on a row with a received_numeric
{
newReceived, err := monetary.AmountFromString("2.12121212", monetary.StorjToken)
require.NoError(t, err)
upd := stripecoinpayments.TransactionUpdate{
TransactionID: tx2.ID,
Status: coinpayments.StatusCompleted,
Received: newReceived,
}
err = transactions.Update(ctx, []stripecoinpayments.TransactionUpdate{upd}, coinpayments.TransactionIDList{tx2.ID})
require.NoError(t, err)
tx2.Status = coinpayments.StatusCompleted
tx2.Received = newReceived
}
// perform a ListAccount on new schema, after the above changes
{
accountTxs, err := transactions.ListAccount(ctx, accountID)
require.NoError(t, err)
require.Len(t, accountTxs, 2)
// results should be ordered in reverse order of creation
require.Equal(t, tx2, accountTxs[0])
require.Equal(t, tx1, accountTxs[1])
}
// perform a ListPending on new schema
{
pendingTxs, err := transactions.ListPending(ctx, 0, 10, time.Now().UTC())
require.NoError(t, err)
require.Len(t, pendingTxs.Transactions, 1)
// ListPending doesn't get the timeout column, so set it manually in order to check equality of other fields
pendingTxs.Transactions[0].Timeout = tx1.Timeout
require.Equal(t, tx1, pendingTxs.Transactions[0])
require.False(t, pendingTxs.Next)
require.Equal(t, int64(0), pendingTxs.NextOffset)
}
// perform a ListUnapplied on new schema
{
unappliedTxs, err := transactions.ListUnapplied(ctx, 0, 10, time.Now().UTC())
require.NoError(t, err)
require.Len(t, unappliedTxs.Transactions, 1)
// ListUnapplied doesn't get the timeout column, so set it manually in order to check equality of other fields
unappliedTxs.Transactions[0].Timeout = tx2.Timeout
require.Equal(t, tx2, unappliedTxs.Transactions[0])
require.False(t, unappliedTxs.Next)
require.Equal(t, int64(0), unappliedTxs.NextOffset)
}
// perform a LockRate on new schema
{
err = transactions.LockRate(ctx, tx2.ID, rate2)
require.NoError(t, err)
}
// perform a GetLockedRate on new schema
{
gotRate, err := transactions.GetLockedRate(ctx, tx1.ID)
require.NoError(t, err)
require.Equal(t, rate1, gotRate)
gotRate, err = transactions.GetLockedRate(ctx, tx2.ID)
require.NoError(t, err)
require.Equal(t, rate2, gotRate)
}
})
}