satellite/payments: add payments loop, update pending transactions (#3318)
This commit is contained in:
parent
521c39bda0
commit
1a304f5ef9
@ -41,6 +41,8 @@ import (
|
||||
"storj.io/storj/satellite/nodestats"
|
||||
"storj.io/storj/satellite/orders"
|
||||
"storj.io/storj/satellite/overlay"
|
||||
"storj.io/storj/satellite/payments"
|
||||
"storj.io/storj/satellite/payments/paymentsconfig"
|
||||
"storj.io/storj/satellite/payments/stripecoinpayments"
|
||||
"storj.io/storj/satellite/repair/irreparable"
|
||||
"storj.io/storj/satellite/vouchers"
|
||||
@ -104,6 +106,11 @@ type API struct {
|
||||
Service *mailservice.Service
|
||||
}
|
||||
|
||||
Payments struct {
|
||||
Accounts payments.Accounts
|
||||
Clearing payments.Clearing
|
||||
}
|
||||
|
||||
Console struct {
|
||||
Listener net.Listener
|
||||
Service *console.Service
|
||||
@ -354,6 +361,24 @@ func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB, pointerDB metai
|
||||
}
|
||||
}
|
||||
|
||||
{ // setup payments
|
||||
config := paymentsconfig.Config{}
|
||||
|
||||
service := stripecoinpayments.NewService(
|
||||
peer.Log.Named("stripecoinpayments service"),
|
||||
config.StripeCoinPayments,
|
||||
peer.DB.Customers(),
|
||||
peer.DB.CoinpaymentsTransactions())
|
||||
|
||||
clearing := stripecoinpayments.NewClearing(
|
||||
peer.Log.Named("stripecoinpayments clearing loop"),
|
||||
service,
|
||||
config.StripeCoinPayments.TransactionUpdateInterval)
|
||||
|
||||
peer.Payments.Accounts = service.Accounts()
|
||||
peer.Payments.Clearing = clearing
|
||||
}
|
||||
|
||||
{ // setup console
|
||||
log.Debug("Satellite API Process setting up console")
|
||||
consoleConfig := config.Console
|
||||
@ -365,14 +390,12 @@ func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB, pointerDB metai
|
||||
return nil, errs.New("Auth token secret required")
|
||||
}
|
||||
|
||||
payments := stripecoinpayments.NewService(stripecoinpayments.Config{}, peer.DB.Customers(), peer.DB.CoinpaymentsTransactions())
|
||||
|
||||
peer.Console.Service, err = console.NewService(
|
||||
peer.Log.Named("console:service"),
|
||||
&consoleauth.Hmac{Secret: []byte(consoleConfig.AuthTokenSecret)},
|
||||
peer.DB.Console(),
|
||||
peer.DB.Rewards(),
|
||||
payments.Accounts(),
|
||||
peer.Payments.Accounts,
|
||||
consoleConfig.PasswordCost,
|
||||
)
|
||||
if err != nil {
|
||||
|
@ -51,7 +51,7 @@ func TestGrapqhlMutation(t *testing.T) {
|
||||
log := zaptest.NewLogger(t)
|
||||
|
||||
paymentsConfig := stripecoinpayments.Config{}
|
||||
payments := stripecoinpayments.NewService(paymentsConfig, db.Customers(), db.CoinpaymentsTransactions())
|
||||
payments := stripecoinpayments.NewService(log, paymentsConfig, db.Customers(), db.CoinpaymentsTransactions())
|
||||
|
||||
service, err := console.NewService(
|
||||
log,
|
||||
|
@ -32,7 +32,7 @@ func TestGraphqlQuery(t *testing.T) {
|
||||
log := zaptest.NewLogger(t)
|
||||
|
||||
paymentsConfig := stripecoinpayments.Config{}
|
||||
payments := stripecoinpayments.NewService(paymentsConfig, db.Customers(), db.CoinpaymentsTransactions())
|
||||
payments := stripecoinpayments.NewService(log, paymentsConfig, db.Customers(), db.CoinpaymentsTransactions())
|
||||
|
||||
service, err := console.NewService(
|
||||
log,
|
||||
|
15
satellite/payments/clearing.go
Normal file
15
satellite/payments/clearing.go
Normal file
@ -0,0 +1,15 @@
|
||||
// Copyright (C) 2019 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package payments
|
||||
|
||||
import "context"
|
||||
|
||||
// Clearing runs process of reconciling transactions deposits,
|
||||
// customer balance, invoices and usages.
|
||||
type Clearing interface {
|
||||
// Run runs payments clearing loop.
|
||||
Run(ctx context.Context) error
|
||||
// Closes closes payments clearing loop.
|
||||
Close() error
|
||||
}
|
12
satellite/payments/paymentsconfig/config.go
Normal file
12
satellite/payments/paymentsconfig/config.go
Normal file
@ -0,0 +1,12 @@
|
||||
// Copyright (C) 2019 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package paymentsconfig
|
||||
|
||||
import "storj.io/storj/satellite/payments/stripecoinpayments"
|
||||
|
||||
// Config defines global payments config.
|
||||
type Config struct {
|
||||
Provider string `help:"payments provider to use" default:""`
|
||||
StripeCoinPayments stripecoinpayments.Config
|
||||
}
|
61
satellite/payments/stripecoinpayments/clearing.go
Normal file
61
satellite/payments/stripecoinpayments/clearing.go
Normal file
@ -0,0 +1,61 @@
|
||||
// Copyright (C) 2019 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package stripecoinpayments
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/zeebo/errs"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"storj.io/storj/internal/sync2"
|
||||
)
|
||||
|
||||
// ErrClearing is stripecoinpayments clearing loop error class.
|
||||
var ErrClearing = errs.Class("stripecoinpayments clearing error")
|
||||
|
||||
// Clearing runs process of reconciling transactions deposits,
|
||||
// customer balance, invoices and usages.
|
||||
type Clearing struct {
|
||||
log *zap.Logger
|
||||
service *Service
|
||||
TransactionCycle sync2.Cycle
|
||||
}
|
||||
|
||||
// NewClearing creates new clearing loop.
|
||||
func NewClearing(log *zap.Logger, service *Service, txInterval time.Duration) *Clearing {
|
||||
return &Clearing{
|
||||
log: log,
|
||||
service: service,
|
||||
TransactionCycle: *sync2.NewCycle(txInterval),
|
||||
}
|
||||
}
|
||||
|
||||
// Run runs all clearing related cycles.
|
||||
func (clearing *Clearing) Run(ctx context.Context) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
err = clearing.TransactionCycle.Run(ctx,
|
||||
func(ctx context.Context) error {
|
||||
clearing.log.Info("running transactions update cycle")
|
||||
|
||||
if err := clearing.service.updateTransactionsLoop(ctx); err != nil {
|
||||
clearing.log.Error("transaction update cycle failed", zap.Error(ErrClearing.Wrap(err)))
|
||||
}
|
||||
|
||||
return nil
|
||||
},
|
||||
)
|
||||
|
||||
return ErrClearing.Wrap(err)
|
||||
}
|
||||
|
||||
// Close closes all underlying resources.
|
||||
func (clearing *Clearing) Close() (err error) {
|
||||
defer mon.Task()(nil)(&err)
|
||||
|
||||
clearing.TransactionCycle.Close()
|
||||
return nil
|
||||
}
|
@ -4,8 +4,12 @@
|
||||
package stripecoinpayments
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/stripe/stripe-go/client"
|
||||
"github.com/zeebo/errs"
|
||||
"go.uber.org/zap"
|
||||
monkit "gopkg.in/spacemonkeygo/monkit.v2"
|
||||
|
||||
"storj.io/storj/satellite/payments"
|
||||
@ -19,24 +23,26 @@ var Error = errs.Class("stripecoinpayments service error")
|
||||
|
||||
// Config stores needed information for payment service initialization.
|
||||
type Config struct {
|
||||
StripeSecretKey string
|
||||
CoinpaymentsPublicKey string
|
||||
CoinpaymentsPrivateKey string
|
||||
StripeSecretKey string `help:"stripe API secret key" default:""`
|
||||
CoinpaymentsPublicKey string `help:"coinpayments API public key" default:""`
|
||||
CoinpaymentsPrivateKey string `help:"coinpayments API preivate key key" default:""`
|
||||
TransactionUpdateInterval time.Duration `help:"amount of time we wait before running next transaction update loop" devDefault:"1m" releaseDefault:"30m"`
|
||||
}
|
||||
|
||||
// Service is an implementation for payment service via Stripe and Coinpayments.
|
||||
type Service struct {
|
||||
log *zap.Logger
|
||||
customers CustomersDB
|
||||
transactionsDB TransactionsDB
|
||||
stripeClient *client.API
|
||||
coinpayments coinpayments.Client
|
||||
coinPayments *coinpayments.Client
|
||||
}
|
||||
|
||||
// NewService creates a Service instance.
|
||||
func NewService(config Config, customers CustomersDB, transactionsDB TransactionsDB) *Service {
|
||||
func NewService(log *zap.Logger, config Config, customers CustomersDB, transactionsDB TransactionsDB) *Service {
|
||||
stripeClient := client.New(config.StripeSecretKey, nil)
|
||||
|
||||
coinpaymentsClient := coinpayments.NewClient(
|
||||
coinPaymentsClient := coinpayments.NewClient(
|
||||
coinpayments.Credentials{
|
||||
PublicKey: config.CoinpaymentsPublicKey,
|
||||
PrivateKey: config.CoinpaymentsPrivateKey,
|
||||
@ -44,10 +50,11 @@ func NewService(config Config, customers CustomersDB, transactionsDB Transaction
|
||||
)
|
||||
|
||||
return &Service{
|
||||
log: log,
|
||||
customers: customers,
|
||||
transactionsDB: transactionsDB,
|
||||
stripeClient: stripeClient,
|
||||
coinpayments: *coinpaymentsClient,
|
||||
coinPayments: coinPaymentsClient,
|
||||
}
|
||||
}
|
||||
|
||||
@ -55,3 +62,72 @@ func NewService(config Config, customers CustomersDB, transactionsDB Transaction
|
||||
func (service *Service) Accounts() payments.Accounts {
|
||||
return &accounts{service: service}
|
||||
}
|
||||
|
||||
// updateTransactionsLoop updates all pending transactions in a loop.
|
||||
func (service *Service) updateTransactionsLoop(ctx context.Context) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
const (
|
||||
limit = 25
|
||||
)
|
||||
|
||||
before := time.Now()
|
||||
|
||||
txsPage, err := service.transactionsDB.ListPending(ctx, 0, limit, before)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := service.updateTransactions(ctx, txsPage.IDList()); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for txsPage.Next {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
txsPage, err = service.transactionsDB.ListPending(ctx, txsPage.NextOffset, limit, before)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := service.updateTransactions(ctx, txsPage.IDList()); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// updateTransactions updates statuses and received amount for given transactions.
|
||||
func (service *Service) updateTransactions(ctx context.Context, ids coinpayments.TransactionIDList) (err error) {
|
||||
defer mon.Task()(&ctx, ids)(&err)
|
||||
|
||||
if len(ids) == 0 {
|
||||
service.log.Debug("no transactions found, skipping update")
|
||||
return nil
|
||||
}
|
||||
|
||||
infos, err := service.coinPayments.Transactions().ListInfos(ctx, ids)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var updates []TransactionUpdate
|
||||
for id, info := range infos {
|
||||
updates = append(updates,
|
||||
TransactionUpdate{
|
||||
TransactionID: id,
|
||||
Status: info.Status,
|
||||
Received: info.Received,
|
||||
},
|
||||
)
|
||||
|
||||
// TODO: update stripe customer balance
|
||||
}
|
||||
|
||||
return service.transactionsDB.Update(ctx, updates)
|
||||
}
|
||||
|
@ -38,7 +38,7 @@ func (tokens *storjTokens) Deposit(ctx context.Context, userID uuid.UUID, amount
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
|
||||
tx, err := tokens.service.coinpayments.Transactions().Create(ctx,
|
||||
tx, err := tokens.service.coinPayments.Transactions().Create(ctx,
|
||||
coinpayments.CreateTX{
|
||||
Amount: amount,
|
||||
CurrencyIn: coinpayments.CurrencyLTCT,
|
||||
|
@ -20,6 +20,10 @@ import (
|
||||
type TransactionsDB interface {
|
||||
// Insert inserts new coinpayments transaction into DB.
|
||||
Insert(ctx context.Context, tx Transaction) (*Transaction, error)
|
||||
// Update updates status and received for set of transactions.
|
||||
Update(ctx context.Context, updates []TransactionUpdate) error
|
||||
// ListPending returns TransactionsPage with pending transactions.
|
||||
ListPending(ctx context.Context, offset int64, limit int, before time.Time) (TransactionsPage, error)
|
||||
}
|
||||
|
||||
// Transaction defines coinpayments transaction info that is stored in the DB.
|
||||
@ -33,3 +37,27 @@ type Transaction struct {
|
||||
Key string
|
||||
CreatedAt time.Time
|
||||
}
|
||||
|
||||
// TransactionUpdate holds transaction update info.
|
||||
type TransactionUpdate struct {
|
||||
TransactionID coinpayments.TransactionID
|
||||
Status coinpayments.Status
|
||||
Received big.Float
|
||||
}
|
||||
|
||||
// TransactionsPage holds set of transaction and indicates if
|
||||
// there are more transactions to fetch.
|
||||
type TransactionsPage struct {
|
||||
Transactions []Transaction
|
||||
Next bool
|
||||
NextOffset int64
|
||||
}
|
||||
|
||||
// IDList returns transaction id list of page's transactions.
|
||||
func (page *TransactionsPage) IDList() coinpayments.TransactionIDList {
|
||||
var list coinpayments.TransactionIDList
|
||||
for _, tx := range page.Transactions {
|
||||
list = append(list, tx.ID)
|
||||
}
|
||||
return list
|
||||
}
|
||||
|
@ -4,57 +4,142 @@
|
||||
package stripecoinpayments_test
|
||||
|
||||
import (
|
||||
"encoding/base64"
|
||||
"math/big"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/skyrings/skyring-common/tools/uuid"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"storj.io/storj/internal/memory"
|
||||
"storj.io/storj/internal/testcontext"
|
||||
"storj.io/storj/internal/testrand"
|
||||
"storj.io/storj/satellite"
|
||||
"storj.io/storj/satellite/payments/coinpayments"
|
||||
"storj.io/storj/satellite/payments/stripecoinpayments"
|
||||
"storj.io/storj/satellite/satellitedb/satellitedbtest"
|
||||
)
|
||||
|
||||
func TestTransactionsDB(t *testing.T) {
|
||||
func TestInsertUpdate(t *testing.T) {
|
||||
satellitedbtest.Run(t, func(t *testing.T, db satellite.DB) {
|
||||
ctx := testcontext.New(t)
|
||||
defer ctx.Cleanup()
|
||||
|
||||
transactions := db.CoinpaymentsTransactions()
|
||||
|
||||
amount, ok := new(big.Float).SetPrec(1000).SetString("2.0000000000000000005")
|
||||
require.True(t, ok)
|
||||
received, ok := new(big.Float).SetPrec(1000).SetString("1.0000000000000000003")
|
||||
require.True(t, ok)
|
||||
|
||||
createTx := stripecoinpayments.Transaction{
|
||||
ID: "testID",
|
||||
AccountID: uuid.UUID{1, 2, 3},
|
||||
Address: "testAddress",
|
||||
Amount: *amount,
|
||||
Received: *received,
|
||||
Status: coinpayments.StatusReceived,
|
||||
Key: "testKey",
|
||||
}
|
||||
|
||||
t.Run("insert", func(t *testing.T) {
|
||||
amount, received := new(big.Float).SetPrec(1000), new(big.Float).SetPrec(1000)
|
||||
|
||||
amount, ok := amount.SetString("2.0000000000000000005")
|
||||
require.True(t, ok)
|
||||
received, ok = received.SetString("1.0000000000000000003")
|
||||
require.True(t, ok)
|
||||
|
||||
createTx := stripecoinpayments.Transaction{
|
||||
ID: "testID",
|
||||
AccountID: uuid.UUID{1, 2, 3},
|
||||
Address: "testAddress",
|
||||
Amount: *amount,
|
||||
Received: *received,
|
||||
Status: coinpayments.StatusReceived,
|
||||
Key: "testKey",
|
||||
}
|
||||
|
||||
tx, err := transactions.Insert(ctx, createTx)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, tx)
|
||||
|
||||
assert.Equal(t, createTx.ID, tx.ID)
|
||||
assert.Equal(t, createTx.AccountID, tx.AccountID)
|
||||
assert.Equal(t, createTx.Address, tx.Address)
|
||||
assert.Equal(t, createTx.Amount, tx.Amount)
|
||||
assert.Equal(t, createTx.Received, tx.Received)
|
||||
assert.Equal(t, createTx.Status, tx.Status)
|
||||
assert.Equal(t, createTx.Key, tx.Key)
|
||||
assert.False(t, tx.CreatedAt.IsZero())
|
||||
compareTransactions(t, createTx, *tx)
|
||||
})
|
||||
|
||||
t.Run("update", func(t *testing.T) {
|
||||
received, ok := new(big.Float).SetPrec(1000).SetString("6.0000000000000000001")
|
||||
require.True(t, ok)
|
||||
|
||||
update := stripecoinpayments.TransactionUpdate{
|
||||
TransactionID: createTx.ID,
|
||||
Status: coinpayments.StatusPending,
|
||||
Received: *received,
|
||||
}
|
||||
|
||||
err := transactions.Update(ctx, []stripecoinpayments.TransactionUpdate{update})
|
||||
require.NoError(t, err)
|
||||
|
||||
page, err := transactions.ListPending(ctx, 0, 1, time.Now())
|
||||
require.NoError(t, err)
|
||||
|
||||
require.NotNil(t, page.Transactions)
|
||||
require.Equal(t, 1, len(page.Transactions))
|
||||
assert.Equal(t, createTx.ID, page.Transactions[0].ID)
|
||||
assert.Equal(t, update.Received, page.Transactions[0].Received)
|
||||
assert.Equal(t, update.Status, page.Transactions[0].Status)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func TestList(t *testing.T) {
|
||||
satellitedbtest.Run(t, func(t *testing.T, db satellite.DB) {
|
||||
ctx := testcontext.New(t)
|
||||
defer ctx.Cleanup()
|
||||
|
||||
transactions := db.CoinpaymentsTransactions()
|
||||
|
||||
const (
|
||||
transactionCount = 10
|
||||
)
|
||||
|
||||
// create transactions
|
||||
amount, ok := new(big.Float).SetPrec(1000).SetString("4.0000000000000000005")
|
||||
require.True(t, ok)
|
||||
received, ok := new(big.Float).SetPrec(1000).SetString("5.0000000000000000003")
|
||||
require.True(t, ok)
|
||||
|
||||
var txs []stripecoinpayments.Transaction
|
||||
for i := 0; i < transactionCount; i++ {
|
||||
id := base64.StdEncoding.EncodeToString(testrand.Bytes(4 * memory.B))
|
||||
addr := base64.StdEncoding.EncodeToString(testrand.Bytes(4 * memory.B))
|
||||
key := base64.StdEncoding.EncodeToString(testrand.Bytes(4 * memory.B))
|
||||
|
||||
createTX := stripecoinpayments.Transaction{
|
||||
ID: coinpayments.TransactionID(id),
|
||||
AccountID: uuid.UUID{},
|
||||
Address: addr,
|
||||
Amount: *amount,
|
||||
Received: *received,
|
||||
Status: coinpayments.StatusPending,
|
||||
Key: key,
|
||||
}
|
||||
|
||||
_, err := transactions.Insert(ctx, createTX)
|
||||
require.NoError(t, err)
|
||||
|
||||
txs = append(txs, createTX)
|
||||
}
|
||||
|
||||
page, err := transactions.ListPending(ctx, 0, transactionCount, time.Now())
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, transactionCount, len(page.Transactions))
|
||||
|
||||
for _, act := range page.Transactions {
|
||||
for _, exp := range txs {
|
||||
if act.ID == exp.ID {
|
||||
compareTransactions(t, exp, act)
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// compareTransactions is a helper method to compare tx used to create db entry,
|
||||
// with the tx returned from the db. Method doesn't compare created at field, but
|
||||
// ensures that is not empty.
|
||||
func compareTransactions(t *testing.T, exp, act stripecoinpayments.Transaction) {
|
||||
assert.Equal(t, exp.ID, act.ID)
|
||||
assert.Equal(t, exp.AccountID, act.AccountID)
|
||||
assert.Equal(t, exp.Address, act.Address)
|
||||
assert.Equal(t, exp.Amount, act.Amount)
|
||||
assert.Equal(t, exp.Received, act.Received)
|
||||
assert.Equal(t, exp.Status, act.Status)
|
||||
assert.Equal(t, exp.Key, act.Key)
|
||||
assert.False(t, act.CreatedAt.IsZero())
|
||||
}
|
||||
|
@ -50,6 +50,8 @@ import (
|
||||
"storj.io/storj/satellite/nodestats"
|
||||
"storj.io/storj/satellite/orders"
|
||||
"storj.io/storj/satellite/overlay"
|
||||
"storj.io/storj/satellite/payments"
|
||||
"storj.io/storj/satellite/payments/paymentsconfig"
|
||||
"storj.io/storj/satellite/payments/stripecoinpayments"
|
||||
"storj.io/storj/satellite/repair/checker"
|
||||
"storj.io/storj/satellite/repair/irreparable"
|
||||
@ -224,6 +226,11 @@ type Peer struct {
|
||||
Endpoint *vouchers.Endpoint
|
||||
}
|
||||
|
||||
Payments struct {
|
||||
Accounts payments.Accounts
|
||||
Clearing payments.Clearing
|
||||
}
|
||||
|
||||
Console struct {
|
||||
Listener net.Listener
|
||||
Service *console.Service
|
||||
@ -581,6 +588,24 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, pointerDB metainfo
|
||||
}
|
||||
}
|
||||
|
||||
{ // setup payments
|
||||
config := paymentsconfig.Config{}
|
||||
|
||||
service := stripecoinpayments.NewService(
|
||||
peer.Log.Named("stripecoinpayments service"),
|
||||
config.StripeCoinPayments,
|
||||
peer.DB.Customers(),
|
||||
peer.DB.CoinpaymentsTransactions())
|
||||
|
||||
clearing := stripecoinpayments.NewClearing(
|
||||
peer.Log.Named("stripecoinpayments clearing loop"),
|
||||
service,
|
||||
config.StripeCoinPayments.TransactionUpdateInterval)
|
||||
|
||||
peer.Payments.Accounts = service.Accounts()
|
||||
peer.Payments.Clearing = clearing
|
||||
}
|
||||
|
||||
{ // setup console
|
||||
log.Debug("Setting up console")
|
||||
consoleConfig := config.Console
|
||||
@ -594,14 +619,12 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, pointerDB metainfo
|
||||
return nil, errs.New("Auth token secret required")
|
||||
}
|
||||
|
||||
payments := stripecoinpayments.NewService(stripecoinpayments.Config{}, peer.DB.Customers(), peer.DB.CoinpaymentsTransactions())
|
||||
|
||||
peer.Console.Service, err = console.NewService(
|
||||
peer.Log.Named("console:service"),
|
||||
&consoleauth.Hmac{Secret: []byte(consoleConfig.AuthTokenSecret)},
|
||||
peer.DB.Console(),
|
||||
peer.DB.Rewards(),
|
||||
payments.Accounts(),
|
||||
peer.Payments.Accounts,
|
||||
consoleConfig.PasswordCost,
|
||||
)
|
||||
|
||||
|
@ -6,6 +6,7 @@ package satellitedb
|
||||
import (
|
||||
"context"
|
||||
"math/big"
|
||||
"time"
|
||||
|
||||
"github.com/zeebo/errs"
|
||||
|
||||
@ -51,6 +52,71 @@ func (db *coinpaymentsTransactions) Insert(ctx context.Context, tx stripecoinpay
|
||||
return fromDBXCoinpaymentsTransaction(dbxCPTX)
|
||||
}
|
||||
|
||||
// Update updates status and received for set of transactions.
|
||||
func (db *coinpaymentsTransactions) Update(ctx context.Context, updates []stripecoinpayments.TransactionUpdate) error {
|
||||
if len(updates) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
return db.db.WithTx(ctx, func(ctx context.Context, tx *dbx.Tx) error {
|
||||
for _, update := range updates {
|
||||
received, err := update.Received.GobEncode()
|
||||
if err != nil {
|
||||
return errs.Wrap(err)
|
||||
}
|
||||
|
||||
_, err = tx.Update_CoinpaymentsTransaction_By_Id(ctx,
|
||||
dbx.CoinpaymentsTransaction_Id(update.TransactionID.String()),
|
||||
dbx.CoinpaymentsTransaction_Update_Fields{
|
||||
Received: dbx.CoinpaymentsTransaction_Received(received),
|
||||
Status: dbx.CoinpaymentsTransaction_Status(update.Status.Int()),
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// ListPending returns paginated list of pending transactions.
|
||||
func (db *coinpaymentsTransactions) ListPending(ctx context.Context, offset int64, limit int, before time.Time) (stripecoinpayments.TransactionsPage, error) {
|
||||
var page stripecoinpayments.TransactionsPage
|
||||
|
||||
dbxTXs, err := db.db.Limited_CoinpaymentsTransaction_By_CreatedAt_LessOrEqual_And_Status_OrderBy_Desc_CreatedAt(
|
||||
ctx,
|
||||
dbx.CoinpaymentsTransaction_CreatedAt(before.UTC()),
|
||||
dbx.CoinpaymentsTransaction_Status(coinpayments.StatusPending.Int()),
|
||||
limit+1,
|
||||
offset,
|
||||
)
|
||||
if err != nil {
|
||||
return stripecoinpayments.TransactionsPage{}, err
|
||||
}
|
||||
|
||||
if len(dbxTXs) == limit+1 {
|
||||
page.Next = true
|
||||
page.NextOffset = offset + int64(limit) + 1
|
||||
|
||||
dbxTXs = dbxTXs[:len(dbxTXs)-1]
|
||||
}
|
||||
|
||||
var txs []stripecoinpayments.Transaction
|
||||
for _, dbxTX := range dbxTXs {
|
||||
tx, err := fromDBXCoinpaymentsTransaction(dbxTX)
|
||||
if err != nil {
|
||||
return stripecoinpayments.TransactionsPage{}, err
|
||||
}
|
||||
|
||||
txs = append(txs, *tx)
|
||||
}
|
||||
|
||||
page.Transactions = txs
|
||||
return page, nil
|
||||
}
|
||||
|
||||
// fromDBXCoinpaymentsTransaction converts *dbx.CoinpaymentsTransaction to *stripecoinpayments.Transaction.
|
||||
func fromDBXCoinpaymentsTransaction(dbxCPTX *dbx.CoinpaymentsTransaction) (*stripecoinpayments.Transaction, error) {
|
||||
userID, err := bytesToUUID(dbxCPTX.UserId)
|
||||
|
@ -909,3 +909,11 @@ model coinpayments_transaction (
|
||||
)
|
||||
|
||||
create coinpayments_transaction ()
|
||||
update coinpayments_transaction ( where coinpayments_transaction.id = ? )
|
||||
|
||||
read limitoffset (
|
||||
select coinpayments_transaction
|
||||
where coinpayments_transaction.created_at <= ?
|
||||
where coinpayments_transaction.status = ?
|
||||
orderby desc coinpayments_transaction.created_at
|
||||
)
|
||||
|
@ -8239,6 +8239,43 @@ func (obj *postgresImpl) Get_StripeCustomer_CustomerId_By_UserId(ctx context.Con
|
||||
|
||||
}
|
||||
|
||||
func (obj *postgresImpl) Limited_CoinpaymentsTransaction_By_CreatedAt_LessOrEqual_And_Status_OrderBy_Desc_CreatedAt(ctx context.Context,
|
||||
coinpayments_transaction_created_at_less_or_equal CoinpaymentsTransaction_CreatedAt_Field,
|
||||
coinpayments_transaction_status CoinpaymentsTransaction_Status_Field,
|
||||
limit int, offset int64) (
|
||||
rows []*CoinpaymentsTransaction, err error) {
|
||||
|
||||
var __embed_stmt = __sqlbundle_Literal("SELECT coinpayments_transactions.id, coinpayments_transactions.user_id, coinpayments_transactions.address, coinpayments_transactions.amount, coinpayments_transactions.received, coinpayments_transactions.status, coinpayments_transactions.key, coinpayments_transactions.created_at FROM coinpayments_transactions WHERE coinpayments_transactions.created_at <= ? AND coinpayments_transactions.status = ? ORDER BY coinpayments_transactions.created_at DESC LIMIT ? OFFSET ?")
|
||||
|
||||
var __values []interface{}
|
||||
__values = append(__values, coinpayments_transaction_created_at_less_or_equal.value(), coinpayments_transaction_status.value())
|
||||
|
||||
__values = append(__values, limit, offset)
|
||||
|
||||
var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt)
|
||||
obj.logStmt(__stmt, __values...)
|
||||
|
||||
__rows, err := obj.driver.Query(__stmt, __values...)
|
||||
if err != nil {
|
||||
return nil, obj.makeErr(err)
|
||||
}
|
||||
defer __rows.Close()
|
||||
|
||||
for __rows.Next() {
|
||||
coinpayments_transaction := &CoinpaymentsTransaction{}
|
||||
err = __rows.Scan(&coinpayments_transaction.Id, &coinpayments_transaction.UserId, &coinpayments_transaction.Address, &coinpayments_transaction.Amount, &coinpayments_transaction.Received, &coinpayments_transaction.Status, &coinpayments_transaction.Key, &coinpayments_transaction.CreatedAt)
|
||||
if err != nil {
|
||||
return nil, obj.makeErr(err)
|
||||
}
|
||||
rows = append(rows, coinpayments_transaction)
|
||||
}
|
||||
if err := __rows.Err(); err != nil {
|
||||
return nil, obj.makeErr(err)
|
||||
}
|
||||
return rows, nil
|
||||
|
||||
}
|
||||
|
||||
func (obj *postgresImpl) Update_PendingAudits_By_NodeId(ctx context.Context,
|
||||
pending_audits_node_id PendingAudits_NodeId_Field,
|
||||
update PendingAudits_Update_Fields) (
|
||||
@ -9258,6 +9295,51 @@ func (obj *postgresImpl) UpdateNoReturn_GracefulExitTransferQueue_By_NodeId_And_
|
||||
return nil
|
||||
}
|
||||
|
||||
func (obj *postgresImpl) Update_CoinpaymentsTransaction_By_Id(ctx context.Context,
|
||||
coinpayments_transaction_id CoinpaymentsTransaction_Id_Field,
|
||||
update CoinpaymentsTransaction_Update_Fields) (
|
||||
coinpayments_transaction *CoinpaymentsTransaction, err error) {
|
||||
var __sets = &__sqlbundle_Hole{}
|
||||
|
||||
var __embed_stmt = __sqlbundle_Literals{Join: "", SQLs: []__sqlbundle_SQL{__sqlbundle_Literal("UPDATE coinpayments_transactions SET "), __sets, __sqlbundle_Literal(" WHERE coinpayments_transactions.id = ? RETURNING coinpayments_transactions.id, coinpayments_transactions.user_id, coinpayments_transactions.address, coinpayments_transactions.amount, coinpayments_transactions.received, coinpayments_transactions.status, coinpayments_transactions.key, coinpayments_transactions.created_at")}}
|
||||
|
||||
__sets_sql := __sqlbundle_Literals{Join: ", "}
|
||||
var __values []interface{}
|
||||
var __args []interface{}
|
||||
|
||||
if update.Received._set {
|
||||
__values = append(__values, update.Received.value())
|
||||
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("received = ?"))
|
||||
}
|
||||
|
||||
if update.Status._set {
|
||||
__values = append(__values, update.Status.value())
|
||||
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("status = ?"))
|
||||
}
|
||||
|
||||
if len(__sets_sql.SQLs) == 0 {
|
||||
return nil, emptyUpdate()
|
||||
}
|
||||
|
||||
__args = append(__args, coinpayments_transaction_id.value())
|
||||
|
||||
__values = append(__values, __args...)
|
||||
__sets.SQL = __sets_sql
|
||||
|
||||
var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt)
|
||||
obj.logStmt(__stmt, __values...)
|
||||
|
||||
coinpayments_transaction = &CoinpaymentsTransaction{}
|
||||
err = obj.driver.QueryRow(__stmt, __values...).Scan(&coinpayments_transaction.Id, &coinpayments_transaction.UserId, &coinpayments_transaction.Address, &coinpayments_transaction.Amount, &coinpayments_transaction.Received, &coinpayments_transaction.Status, &coinpayments_transaction.Key, &coinpayments_transaction.CreatedAt)
|
||||
if err == sql.ErrNoRows {
|
||||
return nil, nil
|
||||
}
|
||||
if err != nil {
|
||||
return nil, obj.makeErr(err)
|
||||
}
|
||||
return coinpayments_transaction, nil
|
||||
}
|
||||
|
||||
func (obj *postgresImpl) Delete_ValueAttribution_By_ProjectId_And_BucketName(ctx context.Context,
|
||||
value_attribution_project_id ValueAttribution_ProjectId_Field,
|
||||
value_attribution_bucket_name ValueAttribution_BucketName_Field) (
|
||||
@ -11215,6 +11297,18 @@ func (rx *Rx) Limited_BucketUsage_By_BucketId_And_RollupEndTime_Greater_And_Roll
|
||||
return tx.Limited_BucketUsage_By_BucketId_And_RollupEndTime_Greater_And_RollupEndTime_LessOrEqual_OrderBy_Desc_RollupEndTime(ctx, bucket_usage_bucket_id, bucket_usage_rollup_end_time_greater, bucket_usage_rollup_end_time_less_or_equal, limit, offset)
|
||||
}
|
||||
|
||||
func (rx *Rx) Limited_CoinpaymentsTransaction_By_CreatedAt_LessOrEqual_And_Status_OrderBy_Desc_CreatedAt(ctx context.Context,
|
||||
coinpayments_transaction_created_at_less_or_equal CoinpaymentsTransaction_CreatedAt_Field,
|
||||
coinpayments_transaction_status CoinpaymentsTransaction_Status_Field,
|
||||
limit int, offset int64) (
|
||||
rows []*CoinpaymentsTransaction, err error) {
|
||||
var tx *Tx
|
||||
if tx, err = rx.getTx(ctx); err != nil {
|
||||
return
|
||||
}
|
||||
return tx.Limited_CoinpaymentsTransaction_By_CreatedAt_LessOrEqual_And_Status_OrderBy_Desc_CreatedAt(ctx, coinpayments_transaction_created_at_less_or_equal, coinpayments_transaction_status, limit, offset)
|
||||
}
|
||||
|
||||
func (rx *Rx) Limited_Irreparabledb_By_Segmentpath_Greater_OrderBy_Asc_Segmentpath(ctx context.Context,
|
||||
irreparabledb_segmentpath_greater Irreparabledb_Segmentpath_Field,
|
||||
limit int, offset int64) (
|
||||
@ -11360,6 +11454,17 @@ func (rx *Rx) Update_BucketMetainfo_By_ProjectId_And_Name(ctx context.Context,
|
||||
return tx.Update_BucketMetainfo_By_ProjectId_And_Name(ctx, bucket_metainfo_project_id, bucket_metainfo_name, update)
|
||||
}
|
||||
|
||||
func (rx *Rx) Update_CoinpaymentsTransaction_By_Id(ctx context.Context,
|
||||
coinpayments_transaction_id CoinpaymentsTransaction_Id_Field,
|
||||
update CoinpaymentsTransaction_Update_Fields) (
|
||||
coinpayments_transaction *CoinpaymentsTransaction, err error) {
|
||||
var tx *Tx
|
||||
if tx, err = rx.getTx(ctx); err != nil {
|
||||
return
|
||||
}
|
||||
return tx.Update_CoinpaymentsTransaction_By_Id(ctx, coinpayments_transaction_id, update)
|
||||
}
|
||||
|
||||
func (rx *Rx) Update_Node_By_Id(ctx context.Context,
|
||||
node_id Node_Id_Field,
|
||||
update Node_Update_Fields) (
|
||||
@ -11952,6 +12057,12 @@ type Methods interface {
|
||||
limit int, offset int64) (
|
||||
rows []*BucketUsage, err error)
|
||||
|
||||
Limited_CoinpaymentsTransaction_By_CreatedAt_LessOrEqual_And_Status_OrderBy_Desc_CreatedAt(ctx context.Context,
|
||||
coinpayments_transaction_created_at_less_or_equal CoinpaymentsTransaction_CreatedAt_Field,
|
||||
coinpayments_transaction_status CoinpaymentsTransaction_Status_Field,
|
||||
limit int, offset int64) (
|
||||
rows []*CoinpaymentsTransaction, err error)
|
||||
|
||||
Limited_Irreparabledb_By_Segmentpath_Greater_OrderBy_Asc_Segmentpath(ctx context.Context,
|
||||
irreparabledb_segmentpath_greater Irreparabledb_Segmentpath_Field,
|
||||
limit int, offset int64) (
|
||||
@ -12019,6 +12130,11 @@ type Methods interface {
|
||||
update BucketMetainfo_Update_Fields) (
|
||||
bucket_metainfo *BucketMetainfo, err error)
|
||||
|
||||
Update_CoinpaymentsTransaction_By_Id(ctx context.Context,
|
||||
coinpayments_transaction_id CoinpaymentsTransaction_Id_Field,
|
||||
update CoinpaymentsTransaction_Update_Fields) (
|
||||
coinpayments_transaction *CoinpaymentsTransaction, err error)
|
||||
|
||||
Update_Node_By_Id(ctx context.Context,
|
||||
node_id Node_Id_Field,
|
||||
update Node_Update_Fields) (
|
||||
|
Loading…
Reference in New Issue
Block a user