satellite: remove unused coinpayments code and chores

issue: https://github.com/storj/storj/issues/4824

Change-Id: I2e3e63151d1def96270279719f6eceda0acba66c
This commit is contained in:
Yaroslav Vorobiov 2022-11-29 13:36:41 +01:00
parent 4df1f5b50f
commit bb1e86c790
18 changed files with 72 additions and 1770 deletions

View File

@ -135,7 +135,6 @@ type API struct {
StorjscanService *storjscan.Service
StorjscanClient *storjscan.Client
Conversion *stripecoinpayments.ConversionService
StripeService *stripecoinpayments.Service
StripeClient stripecoinpayments.StripeClient
}
@ -533,16 +532,6 @@ func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB,
peer.Payments.StripeClient = stripeClient
peer.Payments.Accounts = peer.Payments.StripeService.Accounts()
peer.Payments.Conversion = stripecoinpayments.NewConversionService(
peer.Log.Named("payments.stripe:version"),
peer.Payments.StripeService,
pc.StripeCoinPayments.ConversionRatesCycleInterval)
peer.Services.Add(lifecycle.Item{
Name: "payments.stripe:version",
Run: peer.Payments.Conversion.Run,
Close: peer.Payments.Conversion.Close,
})
peer.Payments.StorjscanClient = storjscan.NewClient(
pc.Storjscan.Endpoint,

View File

@ -1003,9 +1003,9 @@ func TestPaymentsWalletPayments(t *testing.T) {
Timeout: 0,
}
createdAt, err := sat.DB.StripeCoinPayments().Transactions().Insert(ctx, tx)
createdAt, err := sat.DB.StripeCoinPayments().Transactions().TestInsert(ctx, tx)
require.NoError(t, err)
err = sat.DB.StripeCoinPayments().Transactions().LockRate(ctx, tx.ID, decimal.NewFromInt(1))
err = sat.DB.StripeCoinPayments().Transactions().TestLockRate(ctx, tx.ID, decimal.NewFromInt(1))
require.NoError(t, err)
tx.CreatedAt = createdAt.UTC()

View File

@ -146,7 +146,6 @@ type Core struct {
Payments struct {
Accounts payments.Accounts
BillingChore *billing.Chore
Chore *stripecoinpayments.Chore
StorjscanClient *storjscan.Client
StorjscanService *storjscan.Service
StorjscanChore *storjscan.Chore
@ -565,21 +564,6 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB,
peer.Payments.Accounts = service.Accounts()
peer.Payments.Chore = stripecoinpayments.NewChore(
peer.Log.Named("payments.stripe:clearing"),
service,
pc.StripeCoinPayments.TransactionUpdateInterval,
pc.StripeCoinPayments.AccountBalanceUpdateInterval,
)
peer.Services.Add(lifecycle.Item{
Name: "payments.stripe:service",
Run: peer.Payments.Chore.Run,
})
peer.Debug.Server.Panel.Add(
debug.Cycle("Payments Stripe Transactions", peer.Payments.Chore.TransactionCycle),
debug.Cycle("Payments Stripe Account Balance", peer.Payments.Chore.AccountBalanceCycle),
)
peer.Payments.StorjscanClient = storjscan.NewClient(
pc.Storjscan.Endpoint,
pc.Storjscan.Auth.Identifier,

View File

@ -1,120 +0,0 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package coinpayments
import (
"bytes"
"context"
"crypto/hmac"
"crypto/sha512"
"encoding/hex"
"encoding/json"
"net/http"
"net/url"
"github.com/zeebo/errs"
)
// Error is error class API errors.
var Error = errs.Class("coinpayments client")
// ErrMissingPublicKey is returned when Coinpayments client is missing public key.
var ErrMissingPublicKey = errs.Class("missing public key")
// Credentials contains public and private API keys for client.
type Credentials struct {
PublicKey string
PrivateKey string
}
type httpClient interface {
Do(*http.Request) (*http.Response, error)
}
// Client handles base API processing.
type Client struct {
creds Credentials
http httpClient
}
// NewClient creates new instance of client with provided credentials.
func NewClient(creds Credentials) *Client {
client := &Client{
creds: creds,
http: &http.Client{
Timeout: 0,
},
}
return client
}
// Transactions returns transactions API.
func (c *Client) Transactions() Transactions {
return Transactions{client: c}
}
// ConversionRates returns ConversionRates API.
func (c *Client) ConversionRates() ConversionRates {
return ConversionRates{client: c}
}
// do handles base API request routines.
func (c *Client) do(ctx context.Context, cmd string, values url.Values) (_ json.RawMessage, err error) {
if c.creds.PublicKey == "" {
return nil, Error.Wrap(ErrMissingPublicKey.New(""))
}
values.Set("version", "1")
values.Set("format", "json")
values.Set("key", c.creds.PublicKey)
values.Set("cmd", cmd)
encoded := values.Encode()
buff := bytes.NewBufferString(encoded)
req, err := http.NewRequestWithContext(ctx, http.MethodPost, "https://www.coinpayments.net/api.php", buff)
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
req.Header.Set("HMAC", c.hmac([]byte(encoded)))
resp, err := c.http.Do(req.WithContext(ctx))
if err != nil {
return nil, err
}
defer func() {
err = errs.Combine(err, resp.Body.Close())
}()
if resp.StatusCode != http.StatusOK {
return nil, errs.New("internal server error")
}
var data struct {
Error string `json:"error"`
Result json.RawMessage `json:"result"`
}
if err = json.NewDecoder(resp.Body).Decode(&data); err != nil {
return nil, err
}
if data.Error != "ok" {
return nil, errs.New(data.Error)
}
return data.Result, nil
}
// hmac returns string representation of HMAC signature
// signed with clients private key.
func (c *Client) hmac(payload []byte) string {
mac := hmac.New(sha512.New, []byte(c.creds.PrivateKey))
_, _ = mac.Write(payload)
return hex.EncodeToString(mac.Sum(nil))
}

View File

@ -1,116 +0,0 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package coinpayments
import (
"context"
"encoding/json"
"net/url"
"strconv"
"time"
"github.com/shopspring/decimal"
"storj.io/common/currency"
)
// cmdRates is API command for retrieving currency rate infos.
const cmdRates = "rates"
// ExchangeStatus defines if currency is exchangeable.
type ExchangeStatus string
const (
// ExchangeStatusOnline defines exchangeable currency.
ExchangeStatusOnline ExchangeStatus = "online"
// ExchangeStatusOffline defines currency that can not be convertible at the moment.
ExchangeStatusOffline ExchangeStatus = "offline"
)
// CurrencyRateInfo holds currency conversion info.
type CurrencyRateInfo struct {
IsFiat bool
RateBTC decimal.Decimal
TXFee decimal.Decimal
Status ExchangeStatus
LastUpdate time.Time
}
// UnmarshalJSON converts JSON string to currency rate info.
func (rateInfo *CurrencyRateInfo) UnmarshalJSON(b []byte) error {
var rateRaw struct {
IsFiat int `json:"is_fiat"`
RateBTC string `json:"rate_btc"`
TXFee string `json:"tx_fee"`
Status string `json:"status"`
LastUpdate string `json:"last_update"`
}
if err := json.Unmarshal(b, &rateRaw); err != nil {
return err
}
rateBTC, err := decimal.NewFromString(rateRaw.RateBTC)
if err != nil {
return err
}
txFee, err := decimal.NewFromString(rateRaw.TXFee)
if err != nil {
return err
}
lastUpdate, err := strconv.ParseInt(rateRaw.LastUpdate, 10, 64)
if err != nil {
return err
}
*rateInfo = CurrencyRateInfo{
IsFiat: rateRaw.IsFiat > 0,
RateBTC: rateBTC,
TXFee: txFee,
Status: ExchangeStatus(rateRaw.Status),
LastUpdate: time.Unix(lastUpdate, 0),
}
return nil
}
// CurrencyRateInfos maps currency to currency rate info.
type CurrencyRateInfos map[CurrencySymbol]CurrencyRateInfo
// ForCurrency allows lookup into a CurrencyRateInfos map by currency
// object, instead of by its coinpayments.net-specific symbol.
func (infos CurrencyRateInfos) ForCurrency(currency *currency.Currency) (info CurrencyRateInfo, ok bool) {
coinpaymentsSymbol, ok := currencySymbols[currency]
if !ok {
return info, false
}
info, ok = infos[coinpaymentsSymbol]
return info, ok
}
// ConversionRates collection of API methods for retrieving currency
// conversion rates.
type ConversionRates struct {
client *Client
}
// Get returns USD rate for specified currency.
func (rates ConversionRates) Get(ctx context.Context) (CurrencyRateInfos, error) {
values := make(url.Values)
values.Set("short", "1")
rateInfos := make(CurrencyRateInfos)
res, err := rates.client.do(ctx, cmdRates, values)
if err != nil {
return nil, Error.Wrap(err)
}
if err = json.Unmarshal(res, &rateInfos); err != nil {
return nil, Error.Wrap(err)
}
return rateInfos, nil
}

View File

@ -1,101 +0,0 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package coinpayments
import (
"bytes"
"io"
"net/http"
"testing"
"github.com/shopspring/decimal"
"github.com/stretchr/testify/require"
"storj.io/common/currency"
"storj.io/common/testcontext"
)
const (
// reference https://www.coinpayments.net/apidoc-rates
ratesJSON = `{
"USD": {
"is_fiat": 1,
"rate_btc": "0.0001234215748657146721341",
"last_update": "1633015701",
"tx_fee": "0.00000000",
"status": "online",
"name": "United States Dollar",
"confirms": "3",
"capabilities": [
"payments", "wallet", "transfers", "convert"
]
},
"BTC": {
"is_fiat": 0,
"rate_btc": "1.000000000000000000000000",
"last_update": "1632931502",
"tx_fee": "0.00100000",
"status": "online",
"name": "Bitcoin",
"confirms": "2",
"capabilities": [
"payments", "wallet", "transfers", "convert"
]
},
"LTCT": {
"is_fiat": 0,
"rate_btc": "999999.999999999999999999",
"last_update": "1628027418",
"tx_fee": "0.00000000",
"status": "online",
"name": "LTCT test coins",
"confirms": "2",
"capabilities": []
}
}`
resultJSON = `{"error": "ok", "result": ` + ratesJSON + `}`
publicKey = "hi i am a public key"
privateKey = "hi i am a private key"
)
type dumbMockClient struct {
response string
}
func (c *dumbMockClient) Do(req *http.Request) (*http.Response, error) {
return &http.Response{
Status: "OK",
StatusCode: http.StatusOK,
Body: io.NopCloser(bytes.NewBuffer([]byte(c.response))),
ContentLength: int64(len(c.response)),
}, nil
}
func TestProcessingConversionRates(t *testing.T) {
rateService := Client{
creds: Credentials{PublicKey: publicKey, PrivateKey: privateKey},
http: &dumbMockClient{response: resultJSON},
}
rateInfos, err := rateService.ConversionRates().Get(testcontext.New(t))
require.NoError(t, err)
require.Truef(t, rateInfos["BTC"].RateBTC.Equal(decimal.NewFromFloat(1.0)),
"expected 1.0, but got %v", rateInfos["BTC"].RateBTC.String())
require.Truef(t, rateInfos["USD"].RateBTC.LessThan(decimal.NewFromInt(1)),
"expected value less than 1, but got %v", rateInfos["USD"].RateBTC.String())
rateInfo, ok := rateInfos.ForCurrency(currency.USDollars)
require.True(t, ok)
require.True(t, rateInfo.IsFiat)
_, ok = rateInfos.ForCurrency(currency.LiveGoats)
require.False(t, ok)
rateInfo, ok = rateInfos.ForCurrency(CurrencyLTCT)
require.True(t, ok)
require.True(t, rateInfo.TXFee.Equal(decimal.NewFromInt(0)))
}

View File

@ -1,28 +0,0 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package coinpayments
import (
"storj.io/common/currency"
)
// CurrencySymbol is a symbol for a currency as recognized by coinpayments.net.
type CurrencySymbol string
var (
// CurrencyLTCT defines LTCT, coins used for testing purpose.
CurrencyLTCT = currency.New("LTCT test coins", "LTCT", 8)
// currencySymbols maps known currency objects to the currency symbols
// as recognized on coinpayments.net. In many cases, the currency's own
// idea of its symbol (currency.Symbol()) will be the same as this
// CurrencySymbol, but we should probably not count on that always being
// the case.
currencySymbols = map[*currency.Currency]CurrencySymbol{
currency.USDollars: "USD",
currency.StorjToken: "STORJ",
currency.Bitcoin: "BTC",
CurrencyLTCT: "LTCT",
}
)

View File

@ -3,25 +3,7 @@
package coinpayments
import (
"context"
"encoding/json"
"net/url"
"strconv"
"strings"
"time"
"github.com/shopspring/decimal"
"github.com/zeebo/errs"
"storj.io/common/currency"
)
const (
cmdCreateTransaction = "create_transaction"
cmdGetTransactionInfo = "get_tx_info"
cmdGetTransactionInfoList = "get_tx_info_multi"
)
import "strings"
// Status is a type wrapper for transaction statuses.
type Status int
@ -87,231 +69,3 @@ func (list TransactionIDList) Encode() string {
builder.WriteString(string(list[len(list)-1]))
return builder.String()
}
// Transaction contains data returned on transaction creation.
type Transaction struct {
ID TransactionID
Address string
Amount currency.Amount
DestTag string
ConfirmsNeeded int
Timeout time.Duration
CheckoutURL string
StatusURL string
QRCodeURL string
}
// UnmarshalJSON handles json unmarshaling for transaction.
func (tx *Transaction) UnmarshalJSON(b []byte) error {
var txRaw struct {
Amount string `json:"amount"`
Address string `json:"address"`
DestTag string `json:"dest_tag"`
TxID string `json:"txn_id"`
ConfirmsNeeded string `json:"confirms_needed"`
Timeout int `json:"timeout"`
CheckoutURL string `json:"checkout_url"`
StatusURL string `json:"status_url"`
QRCodeURL string `json:"qrcode_url"`
}
if err := json.Unmarshal(b, &txRaw); err != nil {
return err
}
amount, err := currency.AmountFromString(txRaw.Amount, currency.StorjToken)
if err != nil {
return err
}
confirms, err := strconv.ParseInt(txRaw.ConfirmsNeeded, 10, 64)
if err != nil {
return err
}
*tx = Transaction{
ID: TransactionID(txRaw.TxID),
Address: txRaw.Address,
Amount: amount,
DestTag: txRaw.DestTag,
ConfirmsNeeded: int(confirms),
Timeout: time.Second * time.Duration(txRaw.Timeout),
CheckoutURL: txRaw.CheckoutURL,
StatusURL: txRaw.StatusURL,
QRCodeURL: txRaw.QRCodeURL,
}
return nil
}
// TransactionInfo holds transaction information.
type TransactionInfo struct {
Address string
Coin CurrencySymbol
Amount decimal.Decimal
Received decimal.Decimal
ConfirmsReceived int
Status Status
ExpiresAt time.Time
CreatedAt time.Time
}
// UnmarshalJSON handles json unmarshaling for transaction info.
func (info *TransactionInfo) UnmarshalJSON(b []byte) error {
var txInfoRaw struct {
Address string `json:"payment_address"`
Coin string `json:"coin"`
Status int `json:"status"`
Amount decimal.Decimal `json:"amountf"`
Received decimal.Decimal `json:"receivedf"`
ConfirmsRecv int `json:"recv_confirms"`
ExpiresAt int64 `json:"time_expires"`
CreatedAt int64 `json:"time_created"`
}
if err := json.Unmarshal(b, &txInfoRaw); err != nil {
return err
}
*info = TransactionInfo{
Address: txInfoRaw.Address,
Coin: CurrencySymbol(txInfoRaw.Coin),
Amount: txInfoRaw.Amount,
Received: txInfoRaw.Received,
ConfirmsReceived: txInfoRaw.ConfirmsRecv,
Status: Status(txInfoRaw.Status),
ExpiresAt: time.Unix(txInfoRaw.ExpiresAt, 0),
CreatedAt: time.Unix(txInfoRaw.CreatedAt, 0),
}
return nil
}
// TransactionInfos is map of transaction infos by transaction id.
type TransactionInfos map[TransactionID]TransactionInfo
// UnmarshalJSON handles json unmarshaling for TransactionInfos.
func (infos *TransactionInfos) UnmarshalJSON(b []byte) error {
var _infos map[TransactionID]TransactionInfo
var errors map[TransactionID]struct {
Error string `json:"error"`
}
if err := json.Unmarshal(b, &errors); err != nil {
return err
}
var errg errs.Group
for _, info := range errors {
if info.Error != "ok" {
errg.Add(errs.New(info.Error))
}
}
if err := errg.Err(); err != nil {
return err
}
if err := json.Unmarshal(b, &_infos); err != nil {
return err
}
for id, info := range _infos {
(*infos)[id] = info
}
return nil
}
// CreateTX defines parameters for transaction creating.
type CreateTX struct {
Amount decimal.Decimal
CurrencyIn *currency.Currency
CurrencyOut *currency.Currency
BuyerEmail string
}
// Transactions defines transaction related API methods.
type Transactions struct {
client *Client
}
// Create creates new transaction.
func (t Transactions) Create(ctx context.Context, params *CreateTX) (*Transaction, error) {
cpSymbolIn, ok := currencySymbols[params.CurrencyIn]
if !ok {
return nil, Error.New("can't identify coinpayments currency symbol for %q", params.CurrencyIn.Name())
}
cpSymbolOut, ok := currencySymbols[params.CurrencyOut]
if !ok {
return nil, Error.New("can't identify coinpayments currency symbol for %q", params.CurrencyOut.Name())
}
values := make(url.Values)
values.Set("amount", params.Amount.String())
values.Set("currency1", string(cpSymbolIn))
values.Set("currency2", string(cpSymbolOut))
values.Set("buyer_email", params.BuyerEmail)
tx := new(Transaction)
res, err := t.client.do(ctx, cmdCreateTransaction, values)
if err != nil {
return nil, Error.Wrap(err)
}
if err = json.Unmarshal(res, tx); err != nil {
return nil, Error.Wrap(err)
}
return tx, nil
}
// Info receives transaction info by transaction id.
func (t Transactions) Info(ctx context.Context, id TransactionID) (*TransactionInfo, error) {
values := make(url.Values)
values.Set("txid", id.String())
txInfo := new(TransactionInfo)
res, err := t.client.do(ctx, cmdGetTransactionInfo, values)
if err != nil {
return nil, Error.Wrap(err)
}
if err = json.Unmarshal(res, txInfo); err != nil {
return nil, Error.Wrap(err)
}
return txInfo, nil
}
// ListInfos returns transaction infos.
func (t Transactions) ListInfos(ctx context.Context, ids TransactionIDList) (TransactionInfos, error) {
// The service supports a max batch size of 25 items
const batchSize = 25
var allErrors error
numIds := len(ids)
txInfos := make(TransactionInfos, numIds)
for i := 0; i < len(ids); i += batchSize {
j := i + batchSize
if j > numIds {
j = numIds
}
batchInfos := make(TransactionInfos, j-i)
values := make(url.Values, j-i)
values.Set("txid", ids[i:j].Encode())
res, err := t.client.do(ctx, cmdGetTransactionInfoList, values)
if err != nil {
allErrors = errs.Combine(allErrors, err)
}
if err = json.Unmarshal(res, &batchInfos); err != nil {
allErrors = errs.Combine(allErrors, err)
}
for k, v := range batchInfos {
txInfos[k] = v
}
}
return txInfos, allErrors
}

View File

@ -1,60 +0,0 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package coinpayments_test
import (
"testing"
"github.com/shopspring/decimal"
"github.com/stretchr/testify/assert"
"storj.io/common/currency"
"storj.io/common/testcontext"
"storj.io/storj/satellite"
"storj.io/storj/satellite/payments/coinpayments"
"storj.io/storj/satellite/satellitedb/satellitedbtest"
)
func TestListInfos(t *testing.T) {
// This test is deliberately skipped as it requires credentials to coinpayments.net
t.SkipNow()
ctx := testcontext.New(t)
defer ctx.Cleanup()
payments := coinpayments.NewClient(coinpayments.Credentials{
PublicKey: "ask-littleskunk-on-keybase",
PrivateKey: "ask-littleskunk-on-keybase",
}).Transactions()
// verify that bad ids fail
infos, err := payments.ListInfos(ctx, coinpayments.TransactionIDList{"an_unlikely_id"})
assert.Error(t, err)
assert.Len(t, infos, 0)
// verify that ListInfos can handle more than 25 good ids
ids := coinpayments.TransactionIDList{}
for x := 0; x < 27; x++ {
tx, err := payments.Create(ctx,
&coinpayments.CreateTX{
Amount: decimal.NewFromInt(100),
CurrencyIn: currency.StorjToken,
CurrencyOut: currency.StorjToken,
BuyerEmail: "test@test.com",
},
)
ids = append(ids, tx.ID)
assert.NoError(t, err)
}
infos, err = payments.ListInfos(ctx, ids)
assert.NoError(t, err)
assert.Len(t, infos, 27)
}
func TestUpdateSameAppliesDoesNotExplode(t *testing.T) {
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
tdb := db.StripeCoinPayments().Transactions()
assert.NoError(t, tdb.Update(ctx, nil, coinpayments.TransactionIDList{"blah", "blah"}))
assert.NoError(t, tdb.Update(ctx, nil, coinpayments.TransactionIDList{"blah", "blah"}))
})
}

View File

@ -3,31 +3,7 @@
package coinpayments
import (
"net/url"
"github.com/zeebo/errs"
)
// ErrNoAuthorizationKey is error that indicates that there is no authorization key.
var ErrNoAuthorizationKey = Error.New("no authorization key")
// GetTransactionKeyFromURL parses provided raw url string
// and extracts authorization key from it. Returns ErrNoAuthorizationKey if
// there is no authorization key and error if rawurl cannot be parsed.
func GetTransactionKeyFromURL(rawurl string) (string, error) {
u, err := url.Parse(rawurl)
if err != nil {
return "", errs.Wrap(err)
}
key := u.Query().Get("key")
if key == "" {
return "", ErrNoAuthorizationKey
}
return key, nil
}
import "net/url"
// GetCheckoutURL constructs checkout url from auth key and transaction id.
func GetCheckoutURL(key string, id TransactionID) string {

View File

@ -4,6 +4,7 @@
package coinpayments_test
import (
"net/url"
"testing"
"github.com/stretchr/testify/assert"
@ -15,10 +16,10 @@ import (
func TestGetCheckoutURL(t *testing.T) {
expected := "example"
url := coinpayments.GetCheckoutURL(expected, "id")
link := coinpayments.GetCheckoutURL(expected, "id")
key, err := coinpayments.GetTransactionKeyFromURL(url)
u, err := url.Parse(link)
require.NoError(t, err)
assert.Equal(t, expected, key)
assert.Equal(t, expected, u.Query().Get("key"))
}

View File

@ -1,81 +0,0 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package stripecoinpayments
import (
"context"
"time"
"github.com/zeebo/errs"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"storj.io/common/sync2"
)
// ErrChore is stripecoinpayments clearing loop chore error class.
var ErrChore = errs.Class("stripecoinpayments chore")
// Chore runs clearing process of reconciling transactions deposits,
// customer balance, invoices and usages.
//
// architecture: Chore
type Chore struct {
log *zap.Logger
service *Service
TransactionCycle *sync2.Cycle
AccountBalanceCycle *sync2.Cycle
}
// NewChore creates new clearing loop chore.
// TODO: uncomment new interval when coupons will be finished.
func NewChore(log *zap.Logger, service *Service, txInterval, accBalanceInterval time.Duration) *Chore {
return &Chore{
log: log,
service: service,
TransactionCycle: sync2.NewCycle(txInterval),
AccountBalanceCycle: sync2.NewCycle(accBalanceInterval),
}
}
// Run runs all clearing related cycles.
func (chore *Chore) Run(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
var group errgroup.Group
chore.TransactionCycle.Start(ctx, &group,
func(ctx context.Context) error {
chore.log.Info("running transactions update cycle")
if err := chore.service.updateTransactionsLoop(ctx); err != nil {
chore.log.Error("transaction update cycle failed", zap.Error(ErrChore.Wrap(err)))
}
return nil
},
)
chore.AccountBalanceCycle.Start(ctx, &group,
func(ctx context.Context) error {
chore.log.Info("running account balance update cycle")
if err := chore.service.updateAccountBalanceLoop(ctx); err != nil {
chore.log.Error("account balance update cycle failed", zap.Error(ErrChore.Wrap(err)))
}
return nil
},
)
return ErrChore.Wrap(group.Wait())
}
// Close closes all underlying resources.
func (chore *Chore) Close() (err error) {
defer mon.Task()(nil)(&err)
chore.TransactionCycle.Close()
chore.AccountBalanceCycle.Close()
return nil
}

View File

@ -4,15 +4,9 @@
package stripecoinpayments
import (
"context"
"time"
"github.com/shopspring/decimal"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/common/currency"
"storj.io/common/sync2"
)
// convertToCents convert amount to USD cents with given rate.
@ -22,49 +16,3 @@ func convertToCents(rate decimal.Decimal, amount currency.Amount) int64 {
usdCents := usd.Shift(2)
return usdCents.Round(0).IntPart()
}
// ErrConversion defines version service error.
var ErrConversion = errs.Class("conversion service")
// ConversionService updates conversion rates in a loop.
//
// architecture: Service
type ConversionService struct {
log *zap.Logger
service *Service
Cycle sync2.Cycle
}
// NewConversionService creates new instance of ConversionService.
func NewConversionService(log *zap.Logger, service *Service, interval time.Duration) *ConversionService {
return &ConversionService{
log: log,
service: service,
Cycle: *sync2.NewCycle(interval),
}
}
// Run runs loop which updates conversion rates for service.
func (conversion *ConversionService) Run(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
return ErrConversion.Wrap(conversion.Cycle.Run(ctx,
func(ctx context.Context) error {
conversion.log.Debug("running conversion rates update cycle")
if err := conversion.service.UpdateRates(ctx); err != nil {
conversion.log.Error("conversion rates update cycle failed", zap.Error(ErrChore.Wrap(err)))
}
return nil
},
))
}
// Close closes underlying cycle.
func (conversion *ConversionService) Close() (err error) {
defer mon.Task()(nil)(&err)
conversion.Cycle.Close()
return nil
}

View File

@ -10,7 +10,6 @@ import (
"fmt"
"strconv"
"strings"
"sync"
"time"
"github.com/shopspring/decimal"
@ -25,7 +24,6 @@ import (
"storj.io/storj/satellite/console"
"storj.io/storj/satellite/payments"
"storj.io/storj/satellite/payments/billing"
"storj.io/storj/satellite/payments/coinpayments"
"storj.io/storj/satellite/payments/storjscan"
)
@ -47,11 +45,6 @@ type Config struct {
StripeSecretKey string `help:"stripe API secret key" default:""`
StripePublicKey string `help:"stripe API public key" default:""`
StripeFreeTierCouponID string `help:"stripe free tier coupon ID" default:""`
CoinpaymentsPublicKey string `help:"coinpayments API public key" default:""`
CoinpaymentsPrivateKey string `help:"coinpayments API private key key" default:""`
TransactionUpdateInterval time.Duration `help:"amount of time we wait before running next transaction update loop" default:"2m" testDefault:"$TESTINTERVAL"`
AccountBalanceUpdateInterval time.Duration `help:"amount of time we wait before running next account balance update loop" default:"2m" testDefault:"$TESTINTERVAL"`
ConversionRatesCycleInterval time.Duration `help:"amount of time we wait before running next conversion rates update loop" default:"10m" testDefault:"$TESTINTERVAL"`
AutoAdvance bool `help:"toogle autoadvance feature for invoice creation" default:"false"`
ListingLimit int `help:"sets the maximum amount of items before we start paging on requests" default:"100" hidden:"true"`
}
@ -69,7 +62,6 @@ type Service struct {
projectsDB console.Projects
usageDB accounting.ProjectAccounting
stripeClient StripeClient
coinPayments *coinpayments.Client
StorageMBMonthPriceCents decimal.Decimal
EgressMBPriceCents decimal.Decimal
@ -82,23 +74,12 @@ type Service struct {
// Stripe Extended Features
AutoAdvance bool
mu sync.Mutex
rates coinpayments.CurrencyRateInfos
ratesErr error
listingLimit int
nowFn func() time.Time
}
// NewService creates a Service instance.
func NewService(log *zap.Logger, stripeClient StripeClient, config Config, db DB, walletsDB storjscan.WalletsDB, billingDB billing.TransactionsDB, projectsDB console.Projects, usageDB accounting.ProjectAccounting, storageTBPrice, egressTBPrice, segmentPrice string, bonusRate int64) (*Service, error) {
coinPaymentsClient := coinpayments.NewClient(
coinpayments.Credentials{
PublicKey: config.CoinpaymentsPublicKey,
PrivateKey: config.CoinpaymentsPrivateKey,
},
)
storageTBMonthDollars, err := decimal.NewFromString(storageTBPrice)
if err != nil {
return nil, err
@ -125,7 +106,6 @@ func NewService(log *zap.Logger, stripeClient StripeClient, config Config, db DB
projectsDB: projectsDB,
usageDB: usageDB,
stripeClient: stripeClient,
coinPayments: coinPaymentsClient,
StorageMBMonthPriceCents: storageMBMonthPriceCents,
EgressMBPriceCents: egressMBPriceCents,
SegmentMonthPriceCents: segmentMonthPriceCents,
@ -142,255 +122,6 @@ func (service *Service) Accounts() payments.Accounts {
return &accounts{service: service}
}
// updateTransactionsLoop updates all pending transactions in a loop.
func (service *Service) updateTransactionsLoop(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
before := service.nowFn()
txsPage, err := service.db.Transactions().ListPending(ctx, 0, service.listingLimit, before)
if err != nil {
return err
}
if err := service.updateTransactions(ctx, txsPage.IDList(), txsPage.CreationTimes()); err != nil {
return err
}
for txsPage.Next {
if err = ctx.Err(); err != nil {
return err
}
txsPage, err = service.db.Transactions().ListPending(ctx, txsPage.NextOffset, service.listingLimit, before)
if err != nil {
return err
}
if err := service.updateTransactions(ctx, txsPage.IDList(), txsPage.CreationTimes()); err != nil {
return err
}
}
return nil
}
// updateTransactions updates statuses and received amount for given transactions.
func (service *Service) updateTransactions(ctx context.Context, ids TransactionAndUserList, creationTimes map[coinpayments.TransactionID]time.Time) (err error) {
defer mon.Task()(&ctx, ids)(&err)
if len(ids) == 0 {
service.log.Debug("no transactions found, skipping update")
return nil
}
infos, err := service.coinPayments.Transactions().ListInfos(ctx, ids.IDList())
if err != nil {
return err
}
var updates []TransactionUpdate
var applies coinpayments.TransactionIDList
for id, info := range infos {
service.log.Debug("Coinpayments results: ", zap.String("status", info.Status.String()), zap.String("id", id.String()))
updates = append(updates,
TransactionUpdate{
TransactionID: id,
Status: info.Status,
Received: currency.AmountFromDecimal(info.Received, currency.StorjToken),
},
)
// moment of CoinPayments receives funds, not when STORJ does
// this was a business decision to not wait until StatusCompleted
if info.Status >= coinpayments.StatusReceived {
// monkit currently does not have a DurationVal
mon.IntVal("coinpayment_duration").Observe(int64(time.Since(creationTimes[id])))
applies = append(applies, id)
}
}
return service.db.Transactions().Update(ctx, updates, applies)
}
// applyAccountBalanceLoop fetches all unapplied transaction in a loop, applying transaction
// received amount to stripe customer balance.
func (service *Service) updateAccountBalanceLoop(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
before := service.nowFn()
txsPage, err := service.db.Transactions().ListUnapplied(ctx, 0, service.listingLimit, before)
if err != nil {
return err
}
for _, tx := range txsPage.Transactions {
if err = ctx.Err(); err != nil {
return err
}
if err = service.applyTransactionBalance(ctx, tx); err != nil {
return err
}
}
for txsPage.Next {
if err = ctx.Err(); err != nil {
return err
}
txsPage, err = service.db.Transactions().ListUnapplied(ctx, txsPage.NextOffset, service.listingLimit, before)
if err != nil {
return err
}
for _, tx := range txsPage.Transactions {
if err = ctx.Err(); err != nil {
return err
}
if err = service.applyTransactionBalance(ctx, tx); err != nil {
return err
}
}
}
return nil
}
// applyTransactionBalance applies transaction received amount to stripe customer balance.
func (service *Service) applyTransactionBalance(ctx context.Context, tx Transaction) (err error) {
defer mon.Task()(&ctx)(&err)
cusID, err := service.db.Customers().GetCustomerID(ctx, tx.AccountID)
if err != nil {
return err
}
rate, err := service.db.Transactions().GetLockedRate(ctx, tx.ID)
if err != nil {
return err
}
cents := convertToCents(rate, tx.Received)
if cents <= 0 {
service.log.Warn("Trying to deposit non-positive amount.",
zap.Int64("USD cents", cents),
zap.Stringer("Transaction ID", tx.ID),
zap.Stringer("User ID", tx.AccountID),
)
return service.db.Transactions().Consume(ctx, tx.ID)
}
// Check for balance transactions created from previous failed attempt
var depositDone, bonusDone bool
it := service.stripeClient.CustomerBalanceTransactions().List(&stripe.CustomerBalanceTransactionListParams{Customer: stripe.String(cusID)})
for it.Next() {
cbt := it.CustomerBalanceTransaction()
if cbt.Type != stripe.CustomerBalanceTransactionTypeAdjustment {
continue
}
txID, ok := cbt.Metadata["txID"]
if !ok {
continue
}
if txID != tx.ID.String() {
continue
}
switch cbt.Description {
case StripeDepositTransactionDescription:
depositDone = true
case StripeDepositBonusTransactionDescription:
bonusDone = true
}
}
// The first balance transaction is for the actual deposit
if !depositDone {
params := &stripe.CustomerBalanceTransactionParams{
Amount: stripe.Int64(-cents),
Customer: stripe.String(cusID),
Currency: stripe.String(string(stripe.CurrencyUSD)),
Description: stripe.String(StripeDepositTransactionDescription),
}
params.AddMetadata("txID", tx.ID.String())
params.AddMetadata("storj_amount", tx.Amount.AsDecimal().String())
params.AddMetadata("storj_usd_rate", rate.String())
_, err = service.stripeClient.CustomerBalanceTransactions().New(params)
if err != nil {
return err
}
}
// The second balance transaction for the bonus
if !bonusDone {
params := &stripe.CustomerBalanceTransactionParams{
Amount: stripe.Int64(-cents * service.BonusRate / 100),
Customer: stripe.String(cusID),
Currency: stripe.String(string(stripe.CurrencyUSD)),
Description: stripe.String(StripeDepositBonusTransactionDescription),
}
params.AddMetadata("txID", tx.ID.String())
params.AddMetadata("percentage", strconv.Itoa(int(service.BonusRate)))
_, err = service.stripeClient.CustomerBalanceTransactions().New(params)
if err != nil {
return err
}
}
return service.db.Transactions().Consume(ctx, tx.ID)
}
// UpdateRates fetches new rates and updates service rate cache.
func (service *Service) UpdateRates(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
rates, err := service.coinPayments.ConversionRates().Get(ctx)
if coinpayments.ErrMissingPublicKey.Has(err) {
rates = coinpayments.CurrencyRateInfos{}
err = nil
service.log.Info("Coinpayment client is missing public key")
}
service.mu.Lock()
defer service.mu.Unlock()
service.rates = rates
service.ratesErr = err
return err
}
// GetRate returns conversion rate for specified currencies.
func (service *Service) GetRate(ctx context.Context, curr1, curr2 *currency.Currency) (_ decimal.Decimal, err error) {
defer mon.Task()(&ctx)(&err)
service.mu.Lock()
defer service.mu.Unlock()
if service.ratesErr != nil {
return decimal.Decimal{}, Error.Wrap(err)
}
info1, ok := service.rates.ForCurrency(curr1)
if !ok {
return decimal.Decimal{}, Error.New("no rate for currency %s", curr1.Name())
}
info2, ok := service.rates.ForCurrency(curr2)
if !ok {
return decimal.Decimal{}, Error.New("no rate for currency %s", curr2.Name())
}
return info1.RateBTC.Div(info2.RateBTC), nil
}
// PrepareInvoiceProjectRecords iterates through all projects and creates invoice records if none exist.
func (service *Service) PrepareInvoiceProjectRecords(ctx context.Context, period time.Time) (err error) {
defer mon.Task()(&ctx)(&err)

View File

@ -8,37 +8,25 @@ import (
"time"
"github.com/shopspring/decimal"
"github.com/zeebo/errs"
"storj.io/common/currency"
"storj.io/common/uuid"
"storj.io/storj/satellite/payments/coinpayments"
)
// ErrTransactionConsumed is thrown when trying to consume already consumed transaction.
var ErrTransactionConsumed = errs.New("error transaction already consumed")
// TransactionsDB is an interface which defines functionality
// of DB which stores coinpayments transactions.
//
// architecture: Database
type TransactionsDB interface {
// Insert inserts new coinpayments transaction into DB.
Insert(ctx context.Context, tx Transaction) (time.Time, error)
// Update updates status and received for set of transactions.
Update(ctx context.Context, updates []TransactionUpdate, applies coinpayments.TransactionIDList) error
// Consume marks transaction as consumed, so it won't participate in apply account balance loop.
Consume(ctx context.Context, id coinpayments.TransactionID) error
// LockRate locks conversion rate for transaction.
LockRate(ctx context.Context, id coinpayments.TransactionID, rate decimal.Decimal) error
// GetLockedRate returns locked conversion rate for transaction or error if non exists.
GetLockedRate(ctx context.Context, id coinpayments.TransactionID) (decimal.Decimal, error)
// ListAccount returns all transaction for specific user.
ListAccount(ctx context.Context, userID uuid.UUID) ([]Transaction, error)
// ListPending returns TransactionsPage with pending transactions.
ListPending(ctx context.Context, offset int64, limit int, before time.Time) (TransactionsPage, error)
// ListUnapplied returns TransactionsPage with completed transaction that should be applied to account balance.
ListUnapplied(ctx context.Context, offset int64, limit int, before time.Time) (TransactionsPage, error)
// TestInsert inserts new coinpayments transaction into DB.
TestInsert(ctx context.Context, tx Transaction) (time.Time, error)
// TestLockRate locks conversion rate for transaction.
TestLockRate(ctx context.Context, id coinpayments.TransactionID, rate decimal.Decimal) error
}
// Transaction defines coinpayments transaction info that is stored in the DB.
@ -53,48 +41,3 @@ type Transaction struct {
Timeout time.Duration
CreatedAt time.Time
}
// TransactionUpdate holds transaction update info.
type TransactionUpdate struct {
TransactionID coinpayments.TransactionID
Status coinpayments.Status
Received currency.Amount
}
// TransactionsPage holds set of transaction and indicates if
// there are more transactions to fetch.
type TransactionsPage struct {
Transactions []Transaction
Next bool
NextOffset int64
}
// IDList returns transaction id list of page's transactions.
func (page *TransactionsPage) IDList() TransactionAndUserList {
ids := make(TransactionAndUserList)
for _, tx := range page.Transactions {
ids[tx.ID] = tx.AccountID
}
return ids
}
// CreationTimes returns a map of creation times of page's transactions.
func (page *TransactionsPage) CreationTimes() map[coinpayments.TransactionID]time.Time {
creationTimes := make(map[coinpayments.TransactionID]time.Time)
for _, tx := range page.Transactions {
creationTimes[tx.ID] = tx.CreatedAt
}
return creationTimes
}
// TransactionAndUserList is a composite type for storing userID and txID.
type TransactionAndUserList map[coinpayments.TransactionID]uuid.UUID
// IDList returns transaction id list.
func (idMap TransactionAndUserList) IDList() coinpayments.TransactionIDList {
var list coinpayments.TransactionIDList
for transactionID := range idMap {
list = append(list, transactionID)
}
return list
}

View File

@ -5,24 +5,18 @@ package stripecoinpayments_test
import (
"encoding/base64"
"errors"
"sync"
"testing"
"time"
"github.com/shopspring/decimal"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stripe/stripe-go/v72"
"github.com/zeebo/errs"
"storj.io/common/currency"
"storj.io/common/errs2"
"storj.io/common/memory"
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/common/uuid"
"storj.io/storj/private/testplanet"
"storj.io/storj/satellite"
"storj.io/storj/satellite/payments/coinpayments"
"storj.io/storj/satellite/payments/stripecoinpayments"
@ -51,7 +45,7 @@ func TestTransactionsDB(t *testing.T) {
}
t.Run("insert", func(t *testing.T) {
createdAt, err := transactions.Insert(ctx, createTx)
createdAt, err := transactions.TestInsert(ctx, createTx)
require.NoError(t, err)
requireSaneTimestamp(t, createdAt)
txs, err := transactions.ListAccount(ctx, userID)
@ -59,62 +53,6 @@ func TestTransactionsDB(t *testing.T) {
require.Len(t, txs, 1)
compareTransactions(t, createTx, txs[0])
})
t.Run("update", func(t *testing.T) {
received, err := currency.AmountFromString("6.0000000000000000001", currency.StorjToken)
require.NoError(t, err)
update := stripecoinpayments.TransactionUpdate{
TransactionID: createTx.ID,
Status: coinpayments.StatusReceived,
Received: received,
}
err = transactions.Update(ctx, []stripecoinpayments.TransactionUpdate{update}, nil)
require.NoError(t, err)
page, err := transactions.ListPending(ctx, 0, 1, time.Now())
require.NoError(t, err)
require.Len(t, page.Transactions, 1)
assert.Equal(t, createTx.ID, page.Transactions[0].ID)
assert.Equal(t, update.Received, page.Transactions[0].Received)
assert.Equal(t, update.Status, page.Transactions[0].Status)
err = transactions.Update(ctx,
[]stripecoinpayments.TransactionUpdate{
{
TransactionID: createTx.ID,
Status: coinpayments.StatusCompleted,
Received: received,
},
},
coinpayments.TransactionIDList{
createTx.ID,
},
)
require.NoError(t, err)
page, err = transactions.ListUnapplied(ctx, 0, 1, time.Now())
require.NoError(t, err)
require.NotNil(t, page.Transactions)
require.Equal(t, 1, len(page.Transactions))
assert.Equal(t, createTx.ID, page.Transactions[0].ID)
assert.Equal(t, update.Received, page.Transactions[0].Received)
assert.Equal(t, coinpayments.StatusCompleted, page.Transactions[0].Status)
})
t.Run("consume", func(t *testing.T) {
err := transactions.Consume(ctx, createTx.ID)
require.NoError(t, err)
page, err := transactions.ListUnapplied(ctx, 0, 1, time.Now())
require.NoError(t, err)
assert.Nil(t, page.Transactions)
assert.Equal(t, 0, len(page.Transactions))
})
})
}
@ -126,77 +64,6 @@ func requireSaneTimestamp(t *testing.T, when time.Time) {
"%s seems too large to be a valid creation timestamp", when)
}
func TestConcurrentConsume(t *testing.T) {
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
transactions := db.StripeCoinPayments().Transactions()
const concurrentTries = 30
amount, err := currency.AmountFromString("2.0000000000000000005", currency.StorjToken)
require.NoError(t, err)
received, err := currency.AmountFromString("1.0000000000000000003", currency.StorjToken)
require.NoError(t, err)
userID := testrand.UUID()
txID := coinpayments.TransactionID("testID")
_, err = transactions.Insert(ctx,
stripecoinpayments.Transaction{
ID: txID,
AccountID: userID,
Address: "testAddress",
Amount: amount,
Received: received,
Status: coinpayments.StatusPending,
Key: "testKey",
Timeout: time.Second * 60,
},
)
require.NoError(t, err)
err = transactions.Update(ctx,
[]stripecoinpayments.TransactionUpdate{{
TransactionID: txID,
Status: coinpayments.StatusCompleted,
Received: received,
}},
coinpayments.TransactionIDList{
txID,
},
)
require.NoError(t, err)
var errLock sync.Mutex
var alreadyConsumed []error
appendError := func(err error) {
defer errLock.Unlock()
errLock.Lock()
alreadyConsumed = append(alreadyConsumed, err)
}
var group errs2.Group
for i := 0; i < concurrentTries; i++ {
group.Go(func() error {
err := transactions.Consume(ctx, txID)
if err == nil {
return nil
}
if errors.Is(err, stripecoinpayments.ErrTransactionConsumed) {
appendError(err)
return nil
}
return err
})
}
require.NoError(t, errs.Combine(group.Wait()...))
require.Equal(t, concurrentTries-1, len(alreadyConsumed))
})
}
func TestTransactionsDBList(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
@ -218,47 +85,30 @@ func TestTransactionsDBList(t *testing.T) {
addr := base64.StdEncoding.EncodeToString(testrand.Bytes(4 * memory.B))
key := base64.StdEncoding.EncodeToString(testrand.Bytes(4 * memory.B))
status := coinpayments.StatusPending
if i%2 == 0 {
status = coinpayments.StatusReceived
}
createTX := stripecoinpayments.Transaction{
ID: coinpayments.TransactionID(id),
AccountID: uuid.UUID{},
Address: addr,
Amount: amount,
Received: received,
Status: status,
Status: coinpayments.StatusCompleted,
Key: key,
}
txs = append(txs, createTX)
}
t.Run("pending transactions", func(t *testing.T) {
t.Run("account", func(t *testing.T) {
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
for _, tx := range txs {
_, err := db.StripeCoinPayments().Transactions().Insert(ctx, tx)
_, err := db.StripeCoinPayments().Transactions().TestInsert(ctx, tx)
require.NoError(t, err)
}
page, err := db.StripeCoinPayments().Transactions().ListPending(ctx, 0, limit, time.Now())
accTxs, err := db.StripeCoinPayments().Transactions().ListAccount(ctx, uuid.UUID{})
require.NoError(t, err)
pendingTXs := page.Transactions
for page.Next {
page, err = db.StripeCoinPayments().Transactions().ListPending(ctx, page.NextOffset, limit, time.Now())
require.NoError(t, err)
pendingTXs = append(pendingTXs, page.Transactions...)
}
require.False(t, page.Next)
require.Equal(t, transactionCount, len(pendingTXs))
for _, act := range page.Transactions {
for _, act := range accTxs {
for _, exp := range txs {
if act.ID == exp.ID {
compareTransactions(t, exp, act)
@ -267,58 +117,6 @@ func TestTransactionsDBList(t *testing.T) {
}
})
})
t.Run("unapplied transaction", func(t *testing.T) {
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
var updatedTxs []stripecoinpayments.Transaction
var updates []stripecoinpayments.TransactionUpdate
var applies coinpayments.TransactionIDList
for _, tx := range txs {
_, err := db.StripeCoinPayments().Transactions().Insert(ctx, tx)
require.NoError(t, err)
tx.Status = coinpayments.StatusCompleted
updates = append(updates,
stripecoinpayments.TransactionUpdate{
TransactionID: tx.ID,
Status: tx.Status,
Received: tx.Received,
},
)
applies = append(applies, tx.ID)
updatedTxs = append(updatedTxs, tx)
}
err := db.StripeCoinPayments().Transactions().Update(ctx, updates, applies)
require.NoError(t, err)
page, err := db.StripeCoinPayments().Transactions().ListUnapplied(ctx, 0, limit, time.Now())
require.NoError(t, err)
unappliedTXs := page.Transactions
for page.Next {
page, err = db.StripeCoinPayments().Transactions().ListUnapplied(ctx, page.NextOffset, limit, time.Now())
require.NoError(t, err)
unappliedTXs = append(unappliedTXs, page.Transactions...)
}
require.False(t, page.Next)
require.Equal(t, transactionCount, len(unappliedTXs))
for _, act := range page.Transactions {
for _, exp := range updatedTxs {
if act.ID == exp.ID {
compareTransactions(t, exp, act)
}
}
}
})
})
}
func TestTransactionsDBRates(t *testing.T) {
@ -330,7 +128,7 @@ func TestTransactionsDBRates(t *testing.T) {
const txID = "tx_id"
err = transactions.LockRate(ctx, txID, val)
err = transactions.TestLockRate(ctx, txID, val)
require.NoError(t, err)
rate, err := transactions.GetLockedRate(ctx, txID)
@ -354,71 +152,3 @@ func compareTransactions(t *testing.T, exp, act stripecoinpayments.Transaction)
assert.Equal(t, exp.Timeout, act.Timeout)
assert.False(t, act.CreatedAt.IsZero())
}
func TestTransactions_ApplyTransactionBalance(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 0, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
transactions := satellite.API.DB.StripeCoinPayments().Transactions()
userID := planet.Uplinks[0].Projects[0].Owner.ID
satellite.Core.Payments.Chore.TransactionCycle.Pause()
satellite.Core.Payments.Chore.AccountBalanceCycle.Pause()
// Emulate a deposit through CoinPayments.
txID := coinpayments.TransactionID("testID")
storjAmount, err := currency.AmountFromString("100", currency.StorjToken)
require.NoError(t, err)
storjUSDRate, err := decimal.NewFromString("0.2")
require.NoError(t, err)
createTx := stripecoinpayments.Transaction{
ID: txID,
AccountID: userID,
Address: "testAddress",
Amount: storjAmount,
Received: storjAmount,
Status: coinpayments.StatusPending,
Key: "testKey",
Timeout: time.Second * 60,
}
tx, err := transactions.Insert(ctx, createTx)
require.NoError(t, err)
require.NotNil(t, tx)
update := stripecoinpayments.TransactionUpdate{
TransactionID: createTx.ID,
Status: coinpayments.StatusReceived,
Received: storjAmount,
}
err = transactions.Update(ctx, []stripecoinpayments.TransactionUpdate{update}, coinpayments.TransactionIDList{createTx.ID})
require.NoError(t, err)
// Check that the CoinPayments transaction is waiting to be applied to the Stripe customer balance.
page, err := transactions.ListUnapplied(ctx, 0, 1, time.Now())
require.NoError(t, err)
require.Len(t, page.Transactions, 1)
err = transactions.LockRate(ctx, txID, storjUSDRate)
require.NoError(t, err)
// Trigger the AccountBalanceCycle. This calls Service.applyTransactionBalance()
satellite.Core.Payments.Chore.AccountBalanceCycle.TriggerWait()
cusID, err := satellite.API.DB.StripeCoinPayments().Customers().GetCustomerID(ctx, userID)
require.NoError(t, err)
// Check that the CoinPayments deposit is reflected in the Stripe customer balance.
it := satellite.API.Payments.StripeClient.CustomerBalanceTransactions().List(&stripe.CustomerBalanceTransactionListParams{Customer: stripe.String(cusID)})
require.NoError(t, it.Err())
require.True(t, it.Next())
cbt := it.CustomerBalanceTransaction()
require.EqualValues(t, -2000, cbt.Amount)
require.EqualValues(t, txID, cbt.Metadata["txID"])
require.EqualValues(t, "100", cbt.Metadata["storj_amount"])
require.EqualValues(t, "0.2", cbt.Metadata["storj_usd_rate"])
})
}

View File

@ -20,21 +20,6 @@ import (
// ensure that coinpaymentsTransactions implements stripecoinpayments.TransactionsDB.
var _ stripecoinpayments.TransactionsDB = (*coinPaymentsTransactions)(nil)
// applyBalanceIntentState defines states of the apply balance intents.
type applyBalanceIntentState int
const (
// apply balance intent waits to be applied.
applyBalanceIntentStateUnapplied applyBalanceIntentState = 0
// transaction which balance intent points to has been consumed.
applyBalanceIntentStateConsumed applyBalanceIntentState = 1
)
// Int returns intent state as int.
func (intent applyBalanceIntentState) Int() int {
return int(intent)
}
// coinPaymentsTransactions is CoinPayments transactions DB.
//
// architecture: Database
@ -42,138 +27,6 @@ type coinPaymentsTransactions struct {
db *satelliteDB
}
// Insert inserts new coinpayments transaction into DB.
func (db *coinPaymentsTransactions) Insert(ctx context.Context, tx stripecoinpayments.Transaction) (createTime time.Time, err error) {
defer mon.Task()(&ctx)(&err)
dbxCPTX, err := db.db.Create_CoinpaymentsTransaction(ctx,
dbx.CoinpaymentsTransaction_Id(tx.ID.String()),
dbx.CoinpaymentsTransaction_UserId(tx.AccountID[:]),
dbx.CoinpaymentsTransaction_Address(tx.Address),
dbx.CoinpaymentsTransaction_AmountNumeric(tx.Amount.BaseUnits()),
dbx.CoinpaymentsTransaction_ReceivedNumeric(tx.Received.BaseUnits()),
dbx.CoinpaymentsTransaction_Status(tx.Status.Int()),
dbx.CoinpaymentsTransaction_Key(tx.Key),
dbx.CoinpaymentsTransaction_Timeout(int(tx.Timeout.Seconds())),
)
if err != nil {
return time.Time{}, err
}
return dbxCPTX.CreatedAt, nil
}
// Update updates status and received for set of transactions.
func (db *coinPaymentsTransactions) Update(ctx context.Context, updates []stripecoinpayments.TransactionUpdate, applies coinpayments.TransactionIDList) (err error) {
defer mon.Task()(&ctx)(&err)
if len(updates) == 0 {
return nil
}
err = db.db.WithTx(ctx, func(ctx context.Context, tx *dbx.Tx) error {
for _, update := range updates {
_, err = tx.Update_CoinpaymentsTransaction_By_Id(ctx,
dbx.CoinpaymentsTransaction_Id(update.TransactionID.String()),
dbx.CoinpaymentsTransaction_Update_Fields{
ReceivedNumeric: dbx.CoinpaymentsTransaction_ReceivedNumeric(update.Received.BaseUnits()),
Status: dbx.CoinpaymentsTransaction_Status(update.Status.Int()),
},
)
if err != nil {
return err
}
}
for _, txID := range applies {
query := tx.Rebind(`INSERT INTO stripecoinpayments_apply_balance_intents ( tx_id, state, created_at )
VALUES ( ?, ?, ? ) ON CONFLICT DO NOTHING`)
_, err = tx.Tx.ExecContext(ctx, query, txID.String(), applyBalanceIntentStateUnapplied.Int(), db.db.Hooks.Now().UTC())
if err != nil {
return err
}
}
return nil
})
return err
}
// Consume marks transaction as consumed, so it won't participate in apply account balance loop.
func (db *coinPaymentsTransactions) Consume(ctx context.Context, id coinpayments.TransactionID) (err error) {
defer mon.Task()(&ctx)(&err)
query := db.db.Rebind(`
WITH intent AS (
SELECT tx_id, state FROM stripecoinpayments_apply_balance_intents WHERE tx_id = ?
), updated AS (
UPDATE stripecoinpayments_apply_balance_intents AS ints
SET
state = ?
FROM intent
WHERE intent.tx_id = ints.tx_id AND ints.state = ?
RETURNING 1
)
SELECT EXISTS(SELECT 1 FROM intent) AS intent_exists, EXISTS(SELECT 1 FROM updated) AS intent_consumed;
`)
row := db.db.QueryRowContext(ctx, query, id, applyBalanceIntentStateConsumed, applyBalanceIntentStateUnapplied)
var exists, consumed bool
if err = row.Scan(&exists, &consumed); err != nil {
return err
}
if !exists {
return errs.New("can not consume transaction without apply balance intent")
}
if !consumed {
return stripecoinpayments.ErrTransactionConsumed
}
return err
}
// LockRate locks conversion rate for transaction.
func (db *coinPaymentsTransactions) LockRate(ctx context.Context, id coinpayments.TransactionID, rate decimal.Decimal) (err error) {
defer mon.Task()(&ctx)(&err)
rateFloat, exact := rate.Float64()
if !exact {
// It's not clear at the time of writing whether this
// inexactness will ever be something we need to worry about.
// According to the example in the API docs for
// coinpayments.net, exchange rates are given to 24 decimal
// places (!!), which is several digits more precision than we
// can represent exactly in IEEE754 double-precision floating
// point. However, that might not matter, since an exchange rate
// that is correct to ~15 decimal places multiplied by a precise
// monetary.Amount should give results that are correct to
// around 15 decimal places still. At current exchange rates,
// for example, a USD transaction would need to have a value of
// more than $1,000,000,000,000 USD before a calculation using
// this "inexact" rate would get the equivalent number of BTC
// wrong by a single satoshi (10^-8 BTC).
//
// We could avoid all of this by preserving the exact rates as
// given by our provider, but this would involve either (a)
// abuse of the SQL schema (e.g. storing rates as decimal values
// in VARCHAR), (b) storing rates in a way that is opaque to the
// db engine (e.g. gob-encoding, decimal coefficient with
// separate exponents), or (c) adding support for parameterized
// types like NUMERIC to dbx. None of those are very ideal
// either.
delta, _ := rate.Sub(decimal.NewFromFloat(rateFloat)).Float64()
mon.FloatVal("inexact-float64-exchange-rate-delta").Observe(delta)
}
_, err = db.db.Create_StripecoinpaymentsTxConversionRate(ctx,
dbx.StripecoinpaymentsTxConversionRate_TxId(id.String()),
dbx.StripecoinpaymentsTxConversionRate_RateNumeric(rateFloat),
)
return Error.Wrap(err)
}
// GetLockedRate returns locked conversion rate for transaction or error if non exists.
func (db *coinPaymentsTransactions) GetLockedRate(ctx context.Context, id coinpayments.TransactionID) (rate decimal.Decimal, err error) {
defer mon.Task()(&ctx)(&err)
@ -213,150 +66,64 @@ func (db *coinPaymentsTransactions) ListAccount(ctx context.Context, userID uuid
return txs, nil
}
// ListPending returns paginated list of pending transactions.
func (db *coinPaymentsTransactions) ListPending(ctx context.Context, offset int64, limit int, before time.Time) (_ stripecoinpayments.TransactionsPage, err error) {
// TestInsert inserts new coinpayments transaction into DB.
func (db *coinPaymentsTransactions) TestInsert(ctx context.Context, tx stripecoinpayments.Transaction) (createTime time.Time, err error) {
defer mon.Task()(&ctx)(&err)
query := db.db.Rebind(`SELECT
id,
user_id,
address,
amount_numeric,
received_numeric,
status,
key,
created_at
FROM coinpayments_transactions
WHERE status IN (?,?)
AND created_at <= ?
ORDER by created_at DESC
LIMIT ? OFFSET ?`)
rows, err := db.db.QueryContext(ctx, query, coinpayments.StatusPending, coinpayments.StatusReceived, before, limit+1, offset)
if err != nil {
return stripecoinpayments.TransactionsPage{}, Error.Wrap(err)
}
defer func() {
err = errs.Combine(err, rows.Close())
}()
var page stripecoinpayments.TransactionsPage
for rows.Next() {
var id, address string
var userID uuid.UUID
var amount, received *int64
var status int
var key string
var createdAt time.Time
err := rows.Scan(&id, &userID, &address, &amount, &received, &status, &key, &createdAt)
if err != nil {
return stripecoinpayments.TransactionsPage{}, Error.Wrap(err)
}
// TODO: the currency here should be passed in to this function or stored
// in the database.
page.Transactions = append(page.Transactions,
stripecoinpayments.Transaction{
ID: coinpayments.TransactionID(id),
AccountID: userID,
Address: address,
Amount: currency.AmountFromBaseUnits(*amount, currency.StorjToken),
Received: currency.AmountFromBaseUnits(*received, currency.StorjToken),
Status: coinpayments.Status(status),
Key: key,
CreatedAt: createdAt,
},
dbxCPTX, err := db.db.Create_CoinpaymentsTransaction(ctx,
dbx.CoinpaymentsTransaction_Id(tx.ID.String()),
dbx.CoinpaymentsTransaction_UserId(tx.AccountID[:]),
dbx.CoinpaymentsTransaction_Address(tx.Address),
dbx.CoinpaymentsTransaction_AmountNumeric(tx.Amount.BaseUnits()),
dbx.CoinpaymentsTransaction_ReceivedNumeric(tx.Received.BaseUnits()),
dbx.CoinpaymentsTransaction_Status(tx.Status.Int()),
dbx.CoinpaymentsTransaction_Key(tx.Key),
dbx.CoinpaymentsTransaction_Timeout(int(tx.Timeout.Seconds())),
)
if err != nil {
return time.Time{}, err
}
if err = rows.Err(); err != nil {
return stripecoinpayments.TransactionsPage{}, err
}
if len(page.Transactions) == limit+1 {
page.Next = true
page.NextOffset = offset + int64(limit)
page.Transactions = page.Transactions[:len(page.Transactions)-1]
}
return page, nil
return dbxCPTX.CreatedAt, nil
}
// ListUnapplied returns TransactionsPage with a pending or completed status, that should be applied to account balance.
func (db *coinPaymentsTransactions) ListUnapplied(ctx context.Context, offset int64, limit int, before time.Time) (_ stripecoinpayments.TransactionsPage, err error) {
// TestLockRate locks conversion rate for transaction.
func (db *coinPaymentsTransactions) TestLockRate(ctx context.Context, id coinpayments.TransactionID, rate decimal.Decimal) (err error) {
defer mon.Task()(&ctx)(&err)
query := db.db.Rebind(`SELECT
txs.id,
txs.user_id,
txs.address,
txs.amount_numeric,
txs.received_numeric,
txs.status,
txs.key,
txs.created_at
FROM coinpayments_transactions as txs
INNER JOIN stripecoinpayments_apply_balance_intents as ints
ON txs.id = ints.tx_id
WHERE txs.status >= ?
AND txs.created_at <= ?
AND ints.state = ?
ORDER by txs.created_at DESC
LIMIT ? OFFSET ?`)
rows, err := db.db.QueryContext(ctx, query, coinpayments.StatusReceived, before, applyBalanceIntentStateUnapplied, limit+1, offset)
if err != nil {
return stripecoinpayments.TransactionsPage{}, err
}
defer func() { err = errs.Combine(err, rows.Close()) }()
var page stripecoinpayments.TransactionsPage
for rows.Next() {
var id, address string
var userID uuid.UUID
var amountNumeric, receivedNumeric *int64
var status int
var key string
var createdAt time.Time
err := rows.Scan(&id, &userID, &address, &amountNumeric, &receivedNumeric, &status, &key, &createdAt)
if err != nil {
return stripecoinpayments.TransactionsPage{}, err
rateFloat, exact := rate.Float64()
if !exact {
// It's not clear at the time of writing whether this
// inexactness will ever be something we need to worry about.
// According to the example in the API docs for
// coinpayments.net, exchange rates are given to 24 decimal
// places (!!), which is several digits more precision than we
// can represent exactly in IEEE754 double-precision floating
// point. However, that might not matter, since an exchange rate
// that is correct to ~15 decimal places multiplied by a precise
// monetary.Amount should give results that are correct to
// around 15 decimal places still. At current exchange rates,
// for example, a USD transaction would need to have a value of
// more than $1,000,000,000,000 USD before a calculation using
// this "inexact" rate would get the equivalent number of BTC
// wrong by a single satoshi (10^-8 BTC).
//
// We could avoid all of this by preserving the exact rates as
// given by our provider, but this would involve either (a)
// abuse of the SQL schema (e.g. storing rates as decimal values
// in VARCHAR), (b) storing rates in a way that is opaque to the
// db engine (e.g. gob-encoding, decimal coefficient with
// separate exponents), or (c) adding support for parameterized
// types like NUMERIC to dbx. None of those are very ideal
// either.
delta, _ := rate.Sub(decimal.NewFromFloat(rateFloat)).Float64()
mon.FloatVal("inexact-float64-exchange-rate-delta").Observe(delta)
}
// TODO: the currency here should be passed in to this function or stored
// in the database.
page.Transactions = append(page.Transactions,
stripecoinpayments.Transaction{
ID: coinpayments.TransactionID(id),
AccountID: userID,
Address: address,
Amount: currency.AmountFromBaseUnits(*amountNumeric, currency.StorjToken),
Received: currency.AmountFromBaseUnits(*receivedNumeric, currency.StorjToken),
Status: coinpayments.Status(status),
Key: key,
CreatedAt: createdAt,
},
_, err = db.db.Create_StripecoinpaymentsTxConversionRate(ctx,
dbx.StripecoinpaymentsTxConversionRate_TxId(id.String()),
dbx.StripecoinpaymentsTxConversionRate_RateNumeric(rateFloat),
)
}
if err = rows.Err(); err != nil {
return stripecoinpayments.TransactionsPage{}, err
}
if len(page.Transactions) == limit+1 {
page.Next = true
page.NextOffset = offset + int64(limit)
page.Transactions = page.Transactions[:len(page.Transactions)-1]
}
return page, nil
return Error.Wrap(err)
}
// fromDBXCoinpaymentsTransaction converts *dbx.CoinpaymentsTransaction to *stripecoinpayments.Transaction.

View File

@ -814,21 +814,9 @@ identity.key-path: /root/.local/share/storj/identity/satellite/identity.key
# storjscan chore interval to query new payments for all satellite deposit wallets
# payments.storjscan.interval: 1m0s
# amount of time we wait before running next account balance update loop
# payments.stripe-coin-payments.account-balance-update-interval: 2m0s
# toogle autoadvance feature for invoice creation
# payments.stripe-coin-payments.auto-advance: false
# coinpayments API private key key
# payments.stripe-coin-payments.coinpayments-private-key: ""
# coinpayments API public key
# payments.stripe-coin-payments.coinpayments-public-key: ""
# amount of time we wait before running next conversion rates update loop
# payments.stripe-coin-payments.conversion-rates-cycle-interval: 10m0s
# stripe free tier coupon ID
# payments.stripe-coin-payments.stripe-free-tier-coupon-id: ""
@ -838,9 +826,6 @@ identity.key-path: /root/.local/share/storj/identity/satellite/identity.key
# stripe API secret key
# payments.stripe-coin-payments.stripe-secret-key: ""
# amount of time we wait before running next transaction update loop
# payments.stripe-coin-payments.transaction-update-interval: 2m0s
# how often to remove unused project bandwidth rollups
# project-bw-cleanup.interval: 168h0m0s