satellite/payments/stripecoinpayments: storjscan invoice generation

Add line item with unclaimed Storjscan wallet balance during invoice generation.

Change-Id: I018bfa01abfcf7bfdffba0c5a1350a69188f63d5
This commit is contained in:
dlamarmorgan 2022-05-10 12:19:53 -07:00
parent 5f6892f95c
commit 92be1d878f
21 changed files with 352 additions and 118 deletions

View File

@ -60,6 +60,8 @@ func setupPayments(log *zap.Logger, db satellite.DB) (*stripecoinpayments.Servic
stripeClient,
pc.StripeCoinPayments,
db.StripeCoinPayments(),
db.Wallets(),
db.Billing(),
db.Console().Projects(),
db.ProjectAccounting(),
pc.StorageTBPrice,

View File

@ -200,12 +200,18 @@ var (
Args: cobra.ExactArgs(1),
RunE: cmdPrepareCustomerInvoiceRecords,
}
createCustomerInvoiceItemsCmd = &cobra.Command{
Use: "create-invoice-items [period]",
Short: "Creates stripe invoice line items",
createCustomerProjectInvoiceItemsCmd = &cobra.Command{
Use: "create-project-invoice-items [period]",
Short: "Creates stripe invoice line items for project charges",
Long: "Creates stripe invoice line items for not consumed project records.",
Args: cobra.ExactArgs(1),
RunE: cmdCreateCustomerInvoiceItems,
RunE: cmdCreateCustomerProjectInvoiceItems,
}
createCustomerTokenInvoiceItemsCmd = &cobra.Command{
Use: "create-token-invoice-items [period]",
Short: "Creates stripe invoice line items for token payments",
Long: "Creates stripe invoice line items for unapplied token balances.",
RunE: cmdCreateCustomerTokenInvoiceItems,
}
createCustomerInvoicesCmd = &cobra.Command{
Use: "create-invoices [period]",
@ -331,7 +337,8 @@ func init() {
compensationCmd.AddCommand(recordOneOffPaymentsCmd)
billingCmd.AddCommand(applyFreeTierCouponsCmd)
billingCmd.AddCommand(prepareCustomerInvoiceRecordsCmd)
billingCmd.AddCommand(createCustomerInvoiceItemsCmd)
billingCmd.AddCommand(createCustomerProjectInvoiceItemsCmd)
billingCmd.AddCommand(createCustomerTokenInvoiceItemsCmd)
billingCmd.AddCommand(createCustomerInvoicesCmd)
billingCmd.AddCommand(finalizeCustomerInvoicesCmd)
billingCmd.AddCommand(stripeCustomerCmd)
@ -356,7 +363,8 @@ func init() {
process.Bind(partnerAttributionCmd, &partnerAttribtionCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
process.Bind(applyFreeTierCouponsCmd, &runCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
process.Bind(prepareCustomerInvoiceRecordsCmd, &runCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
process.Bind(createCustomerInvoiceItemsCmd, &runCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
process.Bind(createCustomerProjectInvoiceItemsCmd, &runCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
process.Bind(createCustomerTokenInvoiceItemsCmd, &runCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
process.Bind(createCustomerInvoicesCmd, &runCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
process.Bind(finalizeCustomerInvoicesCmd, &runCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
process.Bind(stripeCustomerCmd, &runCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
@ -718,7 +726,7 @@ func cmdPrepareCustomerInvoiceRecords(cmd *cobra.Command, args []string) (err er
})
}
func cmdCreateCustomerInvoiceItems(cmd *cobra.Command, args []string) (err error) {
func cmdCreateCustomerProjectInvoiceItems(cmd *cobra.Command, args []string) (err error) {
ctx, _ := process.Ctx(cmd)
period, err := parseBillingPeriod(args[0])
@ -731,6 +739,14 @@ func cmdCreateCustomerInvoiceItems(cmd *cobra.Command, args []string) (err error
})
}
func cmdCreateCustomerTokenInvoiceItems(cmd *cobra.Command, args []string) (err error) {
ctx, _ := process.Ctx(cmd)
return runBillingCmd(ctx, func(ctx context.Context, payments *stripecoinpayments.Service, _ satellite.DB) error {
return payments.InvoiceApplyTokenBalance(ctx)
})
}
func cmdCreateCustomerInvoices(cmd *cobra.Command, args []string) (err error) {
ctx, _ := process.Ctx(cmd)

View File

@ -150,6 +150,8 @@ func NewAdmin(log *zap.Logger, full *identity.FullIdentity, db DB, metabaseDB *m
stripeClient,
pc.StripeCoinPayments,
peer.DB.StripeCoinPayments(),
peer.DB.Wallets(),
peer.DB.Billing(),
peer.DB.Console().Projects(),
peer.DB.ProjectAccounting(),
pc.StorageTBPrice,

View File

@ -506,6 +506,8 @@ func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB,
stripeClient,
pc.StripeCoinPayments,
peer.DB.StripeCoinPayments(),
peer.DB.Wallets(),
peer.DB.Billing(),
peer.DB.Console().Projects(),
peer.DB.ProjectAccounting(),
pc.StorageTBPrice,

View File

@ -87,6 +87,8 @@ func TestGraphqlMutation(t *testing.T) {
),
pc.StripeCoinPayments,
db.StripeCoinPayments(),
db.Wallets(),
db.Billing(),
db.Console().Projects(),
db.ProjectAccounting(),
pc.StorageTBPrice,

View File

@ -71,6 +71,8 @@ func TestGraphqlQuery(t *testing.T) {
),
pc.StripeCoinPayments,
db.StripeCoinPayments(),
db.Wallets(),
db.Billing(),
db.Console().Projects(),
db.ProjectAccounting(),
pc.StorageTBPrice,

View File

@ -525,6 +525,8 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB,
stripeClient,
pc.StripeCoinPayments,
peer.DB.StripeCoinPayments(),
peer.DB.Wallets(),
peer.DB.Billing(),
peer.DB.Console().Projects(),
peer.DB.ProjectAccounting(),
pc.StorageTBPrice,

View File

@ -7,6 +7,8 @@ import (
"context"
"time"
"github.com/zeebo/errs"
"storj.io/common/uuid"
"storj.io/storj/satellite/payments/monetary"
)
@ -14,13 +16,16 @@ import (
// TransactionStatus indicates transaction status.
type TransactionStatus string
// ErrInsufficientFunds represents err when a user balance is too low for some transaction.
var ErrInsufficientFunds = errs.New("Insufficient funds for this transaction")
const (
// TransactionStatusPending indicates that status of this transaction is pending.
TransactionStatusPending = "pending"
// TransactionStatusCancelled indicates that status of this transaction is cancelled.
TransactionStatusCancelled = "cancelled"
// TransactionStatusComplete indicates that status of this transaction is complete.
TransactionStatusComplete = "complete"
// TransactionStatusCompleted indicates that status of this transaction is complete.
TransactionStatusCompleted = "complete"
)
// TransactionType indicates transaction type.

View File

@ -20,10 +20,6 @@ import (
"storj.io/storj/satellite/satellitedb/satellitedbtest"
)
func TestInsertBatch(t *testing.T) {
}
func TestTransactionsDBList(t *testing.T) {
const (
limit = 3
@ -38,7 +34,7 @@ func TestTransactionsDBList(t *testing.T) {
var txType billing.TransactionType
for i := 0; i < transactionCount; i++ {
txSource := "storjscan"
txStatus = billing.TransactionStatusComplete
txStatus = billing.TransactionStatusCompleted
txType = billing.TransactionTypeCredit
if i%2 == 0 {
txSource = "stripe"
@ -115,7 +111,7 @@ func TestTransactionsDBBalance(t *testing.T) {
Amount: tenUSD,
Description: "credit from storjscan payment",
Source: "storjscan",
Status: billing.TransactionStatusComplete,
Status: billing.TransactionStatusCompleted,
Type: billing.TransactionTypeCredit,
Metadata: metadata,
Timestamp: time.Now().Add(time.Second),
@ -127,7 +123,7 @@ func TestTransactionsDBBalance(t *testing.T) {
Amount: thirtyUSD,
Description: "credit from storjscan payment",
Source: "storjscan",
Status: billing.TransactionStatusComplete,
Status: billing.TransactionStatusCompleted,
Type: billing.TransactionTypeCredit,
Metadata: metadata,
Timestamp: time.Now().Add(time.Second * 2),
@ -139,7 +135,7 @@ func TestTransactionsDBBalance(t *testing.T) {
Amount: negativeTwentyUSD,
Description: "charge for storage and bandwidth",
Source: "storjscan",
Status: billing.TransactionStatusComplete,
Status: billing.TransactionStatusCompleted,
Type: billing.TransactionTypeDebit,
Metadata: metadata,
Timestamp: time.Now().Add(time.Second * 3),
@ -231,7 +227,7 @@ func TestUpdateTransactions(t *testing.T) {
Amount: tenUSD,
Description: "credit from storjscan payment",
Source: "storjscan",
Status: billing.TransactionStatusComplete,
Status: billing.TransactionStatusCompleted,
Type: billing.TransactionTypeCredit,
Metadata: metadata,
Timestamp: time.Now().Add(time.Second),

View File

@ -1,12 +0,0 @@
// Copyright (C) 2022 Storj Labs, Inc.
// See LICENSE for copying information.
package storjscan
// DB is storjscan DB interface.
//
// architecture: Database
type DB interface {
// Wallets is getter for wallets db.
Wallets() WalletsDB
}

View File

@ -19,6 +19,12 @@ type WalletsDB interface {
Add(ctx context.Context, userID uuid.UUID, walletAddress blockchain.Address) error
// Get returns the wallet address associated with the given user.
Get(ctx context.Context, userID uuid.UUID) (blockchain.Address, error)
// GetAllUsers returns all user IDs that have associated storjscan wallets.
GetAllUsers(ctx context.Context) (_ []uuid.UUID, err error)
// GetAll returns all saved wallet entries.
GetAll(ctx context.Context) (_ []Wallet, err error)
}
// Wallet associates a user ID and a wallet address.
type Wallet struct {
UserID uuid.UUID
Address blockchain.Address
}

View File

@ -65,6 +65,8 @@ func TestSignupCouponCodes(t *testing.T) {
),
pc.StripeCoinPayments,
db.StripeCoinPayments(),
db.Wallets(),
db.Billing(),
db.Console().Projects(),
db.ProjectAccounting(),
pc.StorageTBPrice,

View File

@ -51,7 +51,9 @@ type StripeInvoices interface {
// StripeInvoiceItems Stripe InvoiceItems interface.
type StripeInvoiceItems interface {
New(params *stripe.InvoiceItemParams) (*stripe.InvoiceItem, error)
Update(id string, params *stripe.InvoiceItemParams) (*stripe.InvoiceItem, error)
List(listParams *stripe.InvoiceItemListParams) *invoiceitem.Iter
Del(id string, params *stripe.InvoiceItemParams) (*stripe.InvoiceItem, error)
}
// StripeCharges Stripe Charges interface.

View File

@ -5,6 +5,7 @@ package stripecoinpayments
import (
"context"
"encoding/json"
"errors"
"fmt"
"strconv"
@ -18,11 +19,14 @@ import (
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/common/uuid"
"storj.io/storj/satellite/accounting"
"storj.io/storj/satellite/console"
"storj.io/storj/satellite/payments"
"storj.io/storj/satellite/payments/billing"
"storj.io/storj/satellite/payments/coinpayments"
"storj.io/storj/satellite/payments/monetary"
"storj.io/storj/satellite/payments/storjscan"
)
var (
@ -53,8 +57,12 @@ type Config struct {
//
// architecture: Service
type Service struct {
log *zap.Logger
db DB
log *zap.Logger
db DB
walletsDB storjscan.WalletsDB
billingDB billing.TransactionsDB
projectsDB console.Projects
usageDB accounting.ProjectAccounting
stripeClient StripeClient
@ -80,7 +88,7 @@ type Service struct {
}
// NewService creates a Service instance.
func NewService(log *zap.Logger, stripeClient StripeClient, config Config, db DB, projectsDB console.Projects, usageDB accounting.ProjectAccounting, storageTBPrice, egressTBPrice, segmentPrice string, bonusRate int64) (*Service, error) {
func NewService(log *zap.Logger, stripeClient StripeClient, config Config, db DB, walletsDB storjscan.WalletsDB, billingDB billing.TransactionsDB, projectsDB console.Projects, usageDB accounting.ProjectAccounting, storageTBPrice, egressTBPrice, segmentPrice string, bonusRate int64) (*Service, error) {
coinPaymentsClient := coinpayments.NewClient(
coinpayments.Credentials{
PublicKey: config.CoinpaymentsPublicKey,
@ -109,6 +117,8 @@ func NewService(log *zap.Logger, stripeClient StripeClient, config Config, db DB
return &Service{
log: log,
db: db,
walletsDB: walletsDB,
billingDB: billingDB,
projectsDB: projectsDB,
usageDB: usageDB,
stripeClient: stripeClient,
@ -532,6 +542,155 @@ func (service *Service) InvoiceApplyProjectRecords(ctx context.Context, period t
return nil
}
// InvoiceApplyTokenBalance iterates through customer storjscan wallets and creates invoice line items
// for stripe customer.
func (service *Service) InvoiceApplyTokenBalance(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
// get all wallet entries
wallets, err := service.walletsDB.GetAll(ctx)
if err != nil {
return Error.New("unable to get users in the wallets table")
}
var errGrp errs.Group
for _, wallet := range wallets {
// get the user token balance, if it's not > 0, don't bother with the rest
tokenBalance, err := service.billingDB.GetBalance(ctx, wallet.UserID)
if err != nil {
errGrp.Add(Error.New("unable to compute balance for user ID %s", wallet.UserID.String()))
continue
}
if tokenBalance <= 0 {
continue
}
// get the stripe customer invoice balance
cusID, err := service.db.Customers().GetCustomerID(ctx, wallet.UserID)
if err != nil {
errGrp.Add(Error.New("unable to get stripe customer ID for user ID %s", wallet.UserID.String()))
continue
}
invoices, err := service.getInvoices(ctx, cusID)
if err != nil {
errGrp.Add(Error.New("unable to get invoice balance for stripe customer ID %s", cusID))
continue
}
for _, invoice := range invoices {
// if no balance due, do nothing
if invoice.AmountDue <= 0 {
continue
}
var tokenCreditAmount int64
if invoice.AmountDue >= tokenBalance {
tokenCreditAmount = -tokenBalance
} else {
tokenCreditAmount = -invoice.AmountDue
}
txID, err := service.createTokenPaymentBillingTransaction(ctx, wallet.UserID, invoice.ID, wallet.Address.Hex(), tokenCreditAmount)
if err != nil {
errGrp.Add(Error.New("unable to create token payment billing transaction for user %s", wallet.UserID.String()))
continue
}
invoiceItem, err := service.createTokenPaymentInvoiceItem(ctx, cusID, tokenCreditAmount, txID, wallet.Address.Hex())
if err != nil {
errGrp.Add(Error.New("unable to create token payment invoice item for user %s", wallet.UserID.String()))
continue
}
metadata, err := json.Marshal(map[string]interface{}{
"ItemID": invoiceItem.ID,
})
if err != nil {
errGrp.Add(Error.New("unable to marshall invoice item ID %s", invoiceItem.ID))
continue
}
err = service.billingDB.UpdateMetadata(ctx, txID, metadata)
if err != nil {
errGrp.Add(Error.New("unable to add invoice item ID to billing transaction for user %s", wallet.UserID.String()))
continue
}
}
}
return errGrp.Err()
}
// getInvoiceBalance returns the stripe customer's current invoices.
func (service *Service) getInvoices(ctx context.Context, cusID string) (_ []stripe.Invoice, err error) {
defer mon.Task()(&ctx)(&err)
params := &stripe.InvoiceListParams{
Customer: stripe.String(cusID),
Status: stripe.String(string(stripe.InvoiceStatusDraft)),
}
invoicesIterator := service.stripeClient.Invoices().List(params)
var stripeInvoices []stripe.Invoice
for invoicesIterator.Next() {
stripeInvoice := invoicesIterator.Invoice()
if stripeInvoice != nil {
stripeInvoices = append(stripeInvoices, *stripeInvoice)
}
}
return stripeInvoices, nil
}
// createTokenPaymentInvoiceItem creates an invoice line item for the user token payment.
func (service *Service) createTokenPaymentInvoiceItem(ctx context.Context, cusID string, amount int64, txID int64, wallet string) (invoiceItem *stripe.InvoiceItem, err error) {
defer mon.Task()(&ctx)(&err)
// add an invoice item for the total invoice amount
tokenCredit := &stripe.InvoiceItemParams{
Currency: stripe.String(string(stripe.CurrencyUSD)),
Customer: stripe.String(cusID),
Description: stripe.String("payment from tokens"),
UnitAmount: stripe.Int64(amount),
Params: stripe.Params{
Metadata: map[string]string{
"transaction ID": strconv.FormatInt(txID, 10),
"wallet address": wallet,
},
},
}
invoiceItem, err = service.stripeClient.InvoiceItems().New(tokenCredit)
if err != nil {
service.log.Warn("unable to add invoice item for stripe customer", zap.String("Customer ID", cusID))
return nil, Error.Wrap(err)
}
return
}
// createTokenPaymentBillingTransaction creates a billing DB entry for the user token payment.
func (service *Service) createTokenPaymentBillingTransaction(ctx context.Context, userID uuid.UUID, invoiceID, wallet string, amount int64) (_ int64, err error) {
defer mon.Task()(&ctx)(&err)
metadata, err := json.Marshal(map[string]interface{}{
"InvoiceID": invoiceID,
"Wallet": wallet,
})
transaction := billing.Transaction{
UserID: userID,
Amount: monetary.AmountFromBaseUnits(amount, monetary.USDollars),
Description: "Paid Stripe Invoice",
Source: "stripe",
Status: billing.TransactionStatusPending,
Type: billing.TransactionTypeDebit,
Metadata: metadata,
Timestamp: time.Now(),
}
txID, err := service.billingDB.Insert(ctx, transaction)
if err != nil {
service.log.Warn("unable to add transaction to billing DB for user", zap.String("User ID", userID.String()))
return 0, Error.Wrap(err)
}
return txID, nil
}
// applyProjectRecords applies invoice intents as invoice line items to stripe customer.
func (service *Service) applyProjectRecords(ctx context.Context, records []ProjectRecord) (err error) {
defer mon.Task()(&ctx)(&err)
@ -772,6 +931,16 @@ func (service *Service) FinalizeInvoices(ctx context.Context) (err error) {
if err != nil {
return Error.Wrap(err)
}
if transactionID, ok := stripeInvoice.Metadata["transaction ID"]; ok {
txID, err := strconv.ParseInt(transactionID, 10, 64)
if err != nil {
return Error.Wrap(err)
}
err = service.billingDB.UpdateStatus(ctx, txID, billing.TransactionStatusCompleted)
if err != nil {
return Error.Wrap(err)
}
}
}
return Error.Wrap(invoicesIterator.Err())

View File

@ -14,11 +14,15 @@ import (
"storj.io/common/memory"
"storj.io/common/pb"
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/storj/private/blockchain"
"storj.io/storj/private/testplanet"
"storj.io/storj/satellite"
"storj.io/storj/satellite/accounting"
"storj.io/storj/satellite/console"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/payments/billing"
"storj.io/storj/satellite/payments/monetary"
"storj.io/storj/satellite/payments/stripecoinpayments"
)
@ -171,6 +175,7 @@ func TestService_InvoiceUserWithManyProjects(t *testing.T) {
err = payments.StripeService.CreateInvoices(ctx, period)
require.NoError(t, err)
})
}
@ -294,3 +299,46 @@ func TestService_InvoiceItemsFromProjectRecord(t *testing.T) {
}
})
}
func TestService_InvoiceItemsFromZeroTokenBalance(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 0, UplinkCount: 0,
Reconfigure: testplanet.Reconfigure{
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
config.Payments.StripeCoinPayments.ListingLimit = 4
},
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
payments := satellite.API.Payments
user, err := satellite.AddUser(ctx, console.CreateUser{
FullName: "testuser",
Email: "user@test",
}, 1)
require.NoError(t, err)
// setup storjscan wallet
address, err := blockchain.BytesToAddress(testrand.Bytes(20))
require.NoError(t, err)
userID := user.ID
err = satellite.DB.Wallets().Add(ctx, userID, address)
require.NoError(t, err)
_, err = satellite.DB.Billing().Insert(ctx, billing.Transaction{
UserID: userID,
Amount: monetary.AmountFromBaseUnits(10, monetary.USDollars),
Description: "token payment credit",
Source: "storjscan",
Status: billing.TransactionStatusCompleted,
Type: billing.TransactionTypeCredit,
Metadata: nil,
Timestamp: time.Now(),
CreatedAt: time.Now(),
})
require.NoError(t, err)
// run apply token balance to see if there are no unexpected errors
err = payments.StripeService.InvoiceApplyTokenBalance(ctx)
require.NoError(t, err)
})
}

View File

@ -433,6 +433,14 @@ func (m *mockInvoices) FinalizeInvoice(id string, params *stripe.InvoiceFinalize
type mockInvoiceItems struct {
}
func (m *mockInvoiceItems) Update(id string, params *stripe.InvoiceItemParams) (*stripe.InvoiceItem, error) {
return nil, nil
}
func (m *mockInvoiceItems) Del(id string, params *stripe.InvoiceItemParams) (*stripe.InvoiceItem, error) {
return nil, nil
}
func (m *mockInvoiceItems) New(params *stripe.InvoiceItemParams) (*stripe.InvoiceItem, error) {
return nil, nil
}

View File

@ -38,7 +38,7 @@ func (db billingDB) Insert(ctx context.Context, billingTX billing.Transaction) (
return 0, Error.Wrap(err)
}
if balance+billingTX.Amount.BaseUnits() < 0 {
return 0, Error.New("Insufficient funds for this transaction")
return 0, billing.ErrInsufficientFunds
}
var dbxTX *dbx.BillingTransaction
@ -73,10 +73,16 @@ func (db billingDB) Insert(ctx context.Context, billingTX billing.Transaction) (
dbx.BillingTransaction_Source(billingTX.Source),
dbx.BillingTransaction_Status(string(billingTX.Status)),
dbx.BillingTransaction_Type(string(billingTX.Type)),
dbx.BillingTransaction_Metadata(billingTX.Metadata),
dbx.BillingTransaction_Metadata(handleMetaDataZeroValue(billingTX.Metadata)),
dbx.BillingTransaction_Timestamp(billingTX.Timestamp))
return err
})
if err != nil {
return 0, err
}
if dbxTX == nil {
return 0, err
}
return dbxTX.Id, err
}
@ -84,9 +90,9 @@ func (db billingDB) InsertBatch(ctx context.Context, billingTXs []billing.Transa
err = pgxutil.Conn(ctx, db.db, func(conn *pgx.Conn) error {
var batch pgx.Batch
for _, billingTX := range billingTXs {
statement := db.db.Rebind(`INSERT INTO billing_transactions ( user_id, amount, currency, description, source, status, type, metadata, timestamp, created_at ) VALUES ( ?, ?, ?, ?, ?, ?, ?, ?, ?, NOW() ) ON CONFLICT DO NOTHING`)
statement := db.db.Rebind(`INSERT INTO billing_transactions ( user_id, amount, currency, description, source, status, type, metadata, timestamp, created_at ) VALUES ( ?, ?, ?, ?, ?, ?, ?, ?, ?, NOW() )`)
batch.Queue(statement, billingTX.UserID.Bytes(), billingTX.Amount.BaseUnits(), monetary.USDollars.Symbol(), billingTX.Description, billingTX.Source,
billingTX.Status, billingTX.Type, billingTX.Metadata, billingTX.Timestamp)
billingTX.Status, billingTX.Type, handleMetaDataZeroValue(billingTX.Metadata), billingTX.Timestamp)
if err != nil {
db.db.log.Warn("invalid metadata skipped in query ", zap.String("statement query", statement))
}
@ -106,40 +112,28 @@ func (db billingDB) InsertBatch(ctx context.Context, billingTXs []billing.Transa
})
return err
}
func (db billingDB) UpdateStatus(ctx context.Context, txID int64, status billing.TransactionStatus) error {
return db.update(ctx, txID, status, nil)
}
func (db billingDB) UpdateMetadata(ctx context.Context, txID int64, metadata []byte) error {
return db.update(ctx, txID, "", metadata)
}
// update the transaction for any provided fields that have non-zero value.
func (db billingDB) update(ctx context.Context, txID int64, status billing.TransactionStatus, newMetadata []byte) (err error) {
func (db billingDB) UpdateStatus(ctx context.Context, txID int64, status billing.TransactionStatus) (err error) {
defer mon.Task()(&ctx)(&err)
return db.db.UpdateNoReturn_BillingTransaction_By_Id(ctx, dbx.BillingTransaction_Id(txID), dbx.BillingTransaction_Update_Fields{
Status: dbx.BillingTransaction_Status(string(status)),
})
}
updateFields := dbx.BillingTransaction_Update_Fields{}
func (db billingDB) UpdateMetadata(ctx context.Context, txID int64, newMetadata []byte) (err error) {
if status != "" {
updateFields.Status = dbx.BillingTransaction_Status(string(status))
dbxTX, err := db.db.Get_BillingTransaction_Metadata_By_Id(ctx, dbx.BillingTransaction_Id(txID))
if err != nil {
return Error.Wrap(err)
}
if newMetadata != nil {
dbxTX, err := db.db.Get_BillingTransaction_Metadata_By_Id(ctx, dbx.BillingTransaction_Id(txID))
if err != nil {
return Error.Wrap(err)
}
updatedMetadata, err := updateMetadata(dbxTX.Metadata, newMetadata)
if err != nil {
return Error.Wrap(err)
}
updateFields.Metadata = dbx.BillingTransaction_Metadata(updatedMetadata)
updatedMetadata, err := updateMetadata(dbxTX.Metadata, newMetadata)
if err != nil {
return Error.Wrap(err)
}
return db.db.UpdateNoReturn_BillingTransaction_By_Id(ctx, dbx.BillingTransaction_Id(txID), updateFields)
return db.db.UpdateNoReturn_BillingTransaction_By_Id(ctx, dbx.BillingTransaction_Id(txID), dbx.BillingTransaction_Update_Fields{
Metadata: dbx.BillingTransaction_Metadata(updatedMetadata),
})
}
func (db billingDB) LastTransaction(ctx context.Context, txSource string, txType billing.TransactionType) (_ time.Time, err error) {
@ -217,10 +211,17 @@ func updateMetadata(oldMetaData []byte, newMetaData []byte) ([]byte, error) {
return nil, err
}
err = json.Unmarshal(newMetaData, &updatedMetadata)
err = json.Unmarshal(handleMetaDataZeroValue(newMetaData), &updatedMetadata)
if err != nil {
return nil, err
}
return json.Marshal(updatedMetadata)
}
func handleMetaDataZeroValue(metaData []byte) []byte {
if metaData != nil {
return metaData
}
return []byte(`{}`)
}

View File

@ -1232,7 +1232,7 @@ read one (
)
read all (
select storjscan_wallet.user_id
select storjscan_wallet
)
model coinpayments_transaction (

View File

@ -15742,11 +15742,11 @@ func (obj *pgxImpl) Get_StorjscanWallet_WalletAddress_By_UserId(ctx context.Cont
}
func (obj *pgxImpl) All_StorjscanWallet_UserId(ctx context.Context) (
rows []*UserId_Row, err error) {
func (obj *pgxImpl) All_StorjscanWallet(ctx context.Context) (
rows []*StorjscanWallet, err error) {
defer mon.Task()(&ctx)(&err)
var __embed_stmt = __sqlbundle_Literal("SELECT storjscan_wallets.user_id FROM storjscan_wallets")
var __embed_stmt = __sqlbundle_Literal("SELECT storjscan_wallets.user_id, storjscan_wallets.wallet_address, storjscan_wallets.created_at FROM storjscan_wallets")
var __values []interface{}
@ -15754,7 +15754,7 @@ func (obj *pgxImpl) All_StorjscanWallet_UserId(ctx context.Context) (
obj.logStmt(__stmt, __values...)
for {
rows, err = func() (rows []*UserId_Row, err error) {
rows, err = func() (rows []*StorjscanWallet, err error) {
__rows, err := obj.driver.QueryContext(ctx, __stmt, __values...)
if err != nil {
return nil, err
@ -15762,12 +15762,12 @@ func (obj *pgxImpl) All_StorjscanWallet_UserId(ctx context.Context) (
defer __rows.Close()
for __rows.Next() {
row := &UserId_Row{}
err = __rows.Scan(&row.UserId)
storjscan_wallet := &StorjscanWallet{}
err = __rows.Scan(&storjscan_wallet.UserId, &storjscan_wallet.WalletAddress, &storjscan_wallet.CreatedAt)
if err != nil {
return nil, err
}
rows = append(rows, row)
rows = append(rows, storjscan_wallet)
}
if err := __rows.Err(); err != nil {
return nil, err
@ -23153,11 +23153,11 @@ func (obj *pgxcockroachImpl) Get_StorjscanWallet_WalletAddress_By_UserId(ctx con
}
func (obj *pgxcockroachImpl) All_StorjscanWallet_UserId(ctx context.Context) (
rows []*UserId_Row, err error) {
func (obj *pgxcockroachImpl) All_StorjscanWallet(ctx context.Context) (
rows []*StorjscanWallet, err error) {
defer mon.Task()(&ctx)(&err)
var __embed_stmt = __sqlbundle_Literal("SELECT storjscan_wallets.user_id FROM storjscan_wallets")
var __embed_stmt = __sqlbundle_Literal("SELECT storjscan_wallets.user_id, storjscan_wallets.wallet_address, storjscan_wallets.created_at FROM storjscan_wallets")
var __values []interface{}
@ -23165,7 +23165,7 @@ func (obj *pgxcockroachImpl) All_StorjscanWallet_UserId(ctx context.Context) (
obj.logStmt(__stmt, __values...)
for {
rows, err = func() (rows []*UserId_Row, err error) {
rows, err = func() (rows []*StorjscanWallet, err error) {
__rows, err := obj.driver.QueryContext(ctx, __stmt, __values...)
if err != nil {
return nil, err
@ -23173,12 +23173,12 @@ func (obj *pgxcockroachImpl) All_StorjscanWallet_UserId(ctx context.Context) (
defer __rows.Close()
for __rows.Next() {
row := &UserId_Row{}
err = __rows.Scan(&row.UserId)
storjscan_wallet := &StorjscanWallet{}
err = __rows.Scan(&storjscan_wallet.UserId, &storjscan_wallet.WalletAddress, &storjscan_wallet.CreatedAt)
if err != nil {
return nil, err
}
rows = append(rows, row)
rows = append(rows, storjscan_wallet)
}
if err := __rows.Err(); err != nil {
return nil, err
@ -26965,13 +26965,13 @@ func (rx *Rx) All_StorjscanPayment_OrderBy_Asc_BlockNumber_Asc_LogIndex(ctx cont
return tx.All_StorjscanPayment_OrderBy_Asc_BlockNumber_Asc_LogIndex(ctx)
}
func (rx *Rx) All_StorjscanWallet_UserId(ctx context.Context) (
rows []*UserId_Row, err error) {
func (rx *Rx) All_StorjscanWallet(ctx context.Context) (
rows []*StorjscanWallet, err error) {
var tx *Tx
if tx, err = rx.getTx(ctx); err != nil {
return
}
return tx.All_StorjscanWallet_UserId(ctx)
return tx.All_StorjscanWallet(ctx)
}
func (rx *Rx) All_User_By_NormalizedEmail(ctx context.Context,
@ -28736,8 +28736,8 @@ type Methods interface {
All_StorjscanPayment_OrderBy_Asc_BlockNumber_Asc_LogIndex(ctx context.Context) (
rows []*StorjscanPayment, err error)
All_StorjscanWallet_UserId(ctx context.Context) (
rows []*UserId_Row, err error)
All_StorjscanWallet(ctx context.Context) (
rows []*StorjscanWallet, err error)
All_User_By_NormalizedEmail(ctx context.Context,
user_normalized_email User_NormalizedEmail_Field) (

View File

@ -1,23 +0,0 @@
// Copyright (C) 2022 Storj Labs, Inc.
// See LICENSE for copying information.
package satellitedb
import (
"storj.io/storj/satellite/payments/storjscan"
)
// ensures that *storjscanDB implements storjscan.DB.
var _ storjscan.DB = (*storjscanDB)(nil)
// storjscanDB is storjscan DB.
//
// architecture: Database
type storjscanDB struct {
db *satelliteDB
}
// Wallets is getter for wallets db.
func (db storjscanDB) Wallets() storjscan.WalletsDB {
return &storjscanWalletsDB{db: db.db}
}

View File

@ -44,20 +44,24 @@ func (walletsDB storjscanWalletsDB) Get(ctx context.Context, userID uuid.UUID) (
return address, nil
}
// GetAllUsers returns with all the users which has associated wallet.
func (walletsDB storjscanWalletsDB) GetAllUsers(ctx context.Context) (_ []uuid.UUID, err error) {
// GetAll returns all saved wallet entries.
func (walletsDB storjscanWalletsDB) GetAll(ctx context.Context) (_ []storjscan.Wallet, err error) {
defer mon.Task()(&ctx)(&err)
users, err := walletsDB.db.All_StorjscanWallet_UserId(ctx)
entries, err := walletsDB.db.All_StorjscanWallet(ctx)
if err != nil {
return nil, Error.Wrap(err)
}
var userIDs []uuid.UUID
for _, user := range users {
userID, err := uuid.FromBytes(user.UserId)
var wallets []storjscan.Wallet
for _, entry := range entries {
userID, err := uuid.FromBytes(entry.UserId)
if err != nil {
return nil, Error.Wrap(err)
}
userIDs = append(userIDs, userID)
address, err := blockchain.BytesToAddress(entry.WalletAddress)
if err != nil {
return nil, Error.Wrap(err)
}
wallets = append(wallets, storjscan.Wallet{UserID: userID, Address: address})
}
return userIDs, nil
return wallets, nil
}