satellite/payments: Remove expired package credits

During billing, before invoice creation, check if users are part of a
package plan. If so, and if the package plan is expired, remove unused
credit from the user's balance. If the user has credit in addition to
the package credit, send an analytics event to notify someone to handle
the credit removal manually.

Change-Id: Iad71d791f67c9733f9d9e42f962c64b2780264cc
This commit is contained in:
Cameron 2023-04-19 15:48:36 -04:00 committed by Storj Robot
parent 250704493d
commit 09ec5f107d
14 changed files with 332 additions and 29 deletions

View File

@ -14,6 +14,7 @@ import (
"storj.io/common/uuid"
"storj.io/private/process"
"storj.io/storj/satellite"
"storj.io/storj/satellite/analytics"
"storj.io/storj/satellite/payments/stripe"
"storj.io/storj/satellite/satellitedb"
)
@ -76,7 +77,9 @@ func setupPayments(log *zap.Logger, db satellite.DB) (*stripe.Service, error) {
prices,
priceOverrides,
pc.PackagePlans.Packages,
pc.BonusRate)
pc.BonusRate,
analytics.NewService(log.Named("analytics:service"), runCfg.Analytics, runCfg.Console.SatelliteName),
)
}
// parseYearMonth parses year and month from the provided string and returns a corresponding time.Time for the first day

View File

@ -53,6 +53,10 @@ type Admin struct {
Service *checker.Service
}
Analytics struct {
Service *analytics.Service
}
Payments struct {
Accounts payments.Accounts
Service *stripe.Service
@ -135,6 +139,16 @@ func NewAdmin(log *zap.Logger, full *identity.FullIdentity, db DB, metabaseDB *m
})
}
{ // setup analytics
peer.Analytics.Service = analytics.NewService(peer.Log.Named("analytics:service"), config.Analytics, config.Console.SatelliteName)
peer.Services.Add(lifecycle.Item{
Name: "analytics:service",
Run: peer.Analytics.Service.Run,
Close: peer.Analytics.Service.Close,
})
}
{ // setup payments
pc := config.Payments
@ -163,6 +177,13 @@ func NewAdmin(log *zap.Logger, full *identity.FullIdentity, db DB, metabaseDB *m
return nil, errs.Combine(err, peer.Close())
}
peer.FreezeAccounts.Service = console.NewAccountFreezeService(
db.Console().AccountFreezeEvents(),
db.Console().Users(),
db.Console().Projects(),
peer.Analytics.Service,
)
peer.Payments.Service, err = stripe.NewService(
peer.Log.Named("payments.stripe:service"),
stripeClient,
@ -176,7 +197,9 @@ func NewAdmin(log *zap.Logger, full *identity.FullIdentity, db DB, metabaseDB *m
prices,
priceOverrides,
pc.PackagePlans.Packages,
pc.BonusRate)
pc.BonusRate,
peer.Analytics.Service,
)
if err != nil {
return nil, errs.Combine(err, peer.Close())
@ -184,12 +207,6 @@ func NewAdmin(log *zap.Logger, full *identity.FullIdentity, db DB, metabaseDB *m
peer.Payments.Stripe = stripeClient
peer.Payments.Accounts = peer.Payments.Service.Accounts()
peer.FreezeAccounts.Service = console.NewAccountFreezeService(
db.Console().AccountFreezeEvents(),
db.Console().Users(),
db.Console().Projects(),
analytics.NewService(peer.Log.Named("analytics:service"), config.Analytics, config.Console.SatelliteName),
)
}
{ // setup admin endpoint

View File

@ -84,6 +84,8 @@ const (
eventAccountUnwarned = "Account Unwarned"
eventAccountFreezeWarning = "Account Freeze Warning"
eventUnpaidLargeInvoice = "Large Invoice Unpaid"
eventExpiredCreditNeedsRemoval = "Expired Credit Needs Removal"
eventExpiredCreditRemoved = "Expired Credit Removed"
)
var (
@ -636,3 +638,37 @@ func (service *Service) TrackProjectMemberDeletion(userID uuid.UUID, email strin
})
}
// TrackExpiredCreditNeedsRemoval sends an "Expired Credit Needs Removal" event to Segment.
func (service *Service) TrackExpiredCreditNeedsRemoval(userID uuid.UUID, customerID, packagePlan string) {
if !service.config.Enabled {
return
}
props := segment.NewProperties()
props.Set("customer ID", customerID)
props.Set("package plan", packagePlan)
service.enqueueMessage(segment.Track{
UserId: userID.String(),
Event: service.satelliteName + " " + eventExpiredCreditNeedsRemoval,
Properties: props,
})
}
// TrackExpiredCreditRemoved sends an "Expired Credit Removed" event to Segment.
func (service *Service) TrackExpiredCreditRemoved(userID uuid.UUID, customerID, packagePlan string) {
if !service.config.Enabled {
return
}
props := segment.NewProperties()
props.Set("customer ID", customerID)
props.Set("package plan", packagePlan)
service.enqueueMessage(segment.Track{
UserId: userID.String(),
Event: service.satelliteName + " " + eventExpiredCreditRemoved,
Properties: props,
})
}

View File

@ -540,7 +540,9 @@ func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB,
prices,
priceOverrides,
pc.PackagePlans.Packages,
pc.BonusRate)
pc.BonusRate,
peer.Analytics.Service,
)
if err != nil {
return nil, errs.Combine(err, peer.Close())

View File

@ -95,7 +95,9 @@ func TestGraphqlMutation(t *testing.T) {
prices,
priceOverrides,
pc.PackagePlans.Packages,
pc.BonusRate)
pc.BonusRate,
nil,
)
require.NoError(t, err)
service, err := console.NewService(

View File

@ -79,7 +79,9 @@ func TestGraphqlQuery(t *testing.T) {
prices,
priceOverrides,
pc.PackagePlans.Packages,
pc.BonusRate)
pc.BonusRate,
nil,
)
require.NoError(t, err)
service, err := console.NewService(

View File

@ -69,6 +69,10 @@ type Core struct {
Service *version_checker.Service
}
Analytics struct {
Service *analytics.Service
}
Mail struct {
Service *mailservice.Service
EmailReminders *emailreminders.Chore
@ -418,6 +422,16 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB,
}
}
{ // setup analytics service
peer.Analytics.Service = analytics.NewService(peer.Log.Named("analytics:service"), config.Analytics, config.Console.SatelliteName)
peer.Services.Add(lifecycle.Item{
Name: "analytics:service",
Run: peer.Analytics.Service.Run,
Close: peer.Analytics.Service.Close,
})
}
// TODO: remove in future, should be in API
{ // setup payments
pc := config.Payments
@ -460,7 +474,9 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB,
prices,
priceOverrides,
pc.PackagePlans.Packages,
pc.BonusRate)
pc.BonusRate,
peer.Analytics.Service,
)
if err != nil {
return nil, errs.Combine(err, peer.Close())
}
@ -513,14 +529,13 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB,
{ // setup account freeze
if config.AccountFreeze.Enabled {
analyticService := analytics.NewService(peer.Log.Named("analytics:service"), config.Analytics, config.Console.SatelliteName)
peer.Payments.AccountFreeze = accountfreeze.NewChore(
peer.Log.Named("payments.accountfreeze:chore"),
peer.DB.StripeCoinPayments(),
peer.Payments.Accounts,
peer.DB.Console().Users(),
console.NewAccountFreezeService(db.Console().AccountFreezeEvents(), db.Console().Users(), db.Console().Projects(), analyticService),
analyticService,
console.NewAccountFreezeService(db.Console().AccountFreezeEvents(), db.Console().Users(), db.Console().Projects(), peer.Analytics.Service),
peer.Analytics.Service,
config.AccountFreeze,
)

View File

@ -73,7 +73,9 @@ func TestSignupCouponCodes(t *testing.T) {
prices,
priceOverrides,
pc.PackagePlans.Packages,
pc.BonusRate)
pc.BonusRate,
nil,
)
require.NoError(t, err)
service, err := console.NewService(

View File

@ -58,12 +58,12 @@ func TestBalances(t *testing.T) {
list, err := balances.ListTransactions(ctx, userID)
require.NoError(t, err)
require.Len(t, list, 3)
require.Equal(t, tx1Amount, list[0].Amount)
require.Equal(t, tx1Desc, list[0].Description)
require.Equal(t, tx3Amount, list[0].Amount)
require.Equal(t, tx3Desc, list[0].Description)
require.Equal(t, tx2Amount, list[1].Amount)
require.Equal(t, tx2Desc, list[1].Description)
require.Equal(t, tx3Amount, list[2].Amount)
require.Equal(t, tx3Desc, list[2].Description)
require.Equal(t, tx1Amount, list[2].Amount)
require.Equal(t, tx1Desc, list[2].Description)
b, err = balances.ApplyCredit(ctx, userID, tx2Amount, stripe.MockCBTXsNewFailure)
require.Error(t, err)

View File

@ -25,6 +25,7 @@ import (
"storj.io/common/sync2"
"storj.io/common/uuid"
"storj.io/storj/satellite/accounting"
"storj.io/storj/satellite/analytics"
"storj.io/storj/satellite/console"
"storj.io/storj/satellite/payments"
"storj.io/storj/satellite/payments/billing"
@ -50,6 +51,7 @@ type Config struct {
ListingLimit int `help:"sets the maximum amount of items before we start paging on requests" default:"100" hidden:"true"`
SkipEmptyInvoices bool `help:"if set, skips the creation of empty invoices for customers with zero usage for the billing period" default:"true"`
MaxParallelCalls int `help:"the maximum number of concurrent Stripe API calls in invoicing methods" default:"10"`
RemoveExpiredCredit bool `help:"whether to remove expired package credit or not" default:"true"`
Retries RetryConfig
}
@ -68,6 +70,8 @@ type Service struct {
usageDB accounting.ProjectAccounting
stripeClient Client
analytics *analytics.Service
usagePrices payments.ProjectUsagePriceModel
usagePriceOverrides map[string]payments.ProjectUsagePriceModel
packagePlans map[string]payments.PackagePlan
@ -80,14 +84,15 @@ type Service struct {
// Stripe Extended Features
AutoAdvance bool
listingLimit int
skipEmptyInvoices bool
maxParallelCalls int
nowFn func() time.Time
listingLimit int
skipEmptyInvoices bool
maxParallelCalls int
removeExpiredCredit bool
nowFn func() time.Time
}
// NewService creates a Service instance.
func NewService(log *zap.Logger, stripeClient Client, config Config, db DB, walletsDB storjscan.WalletsDB, billingDB billing.TransactionsDB, projectsDB console.Projects, usersDB console.Users, usageDB accounting.ProjectAccounting, usagePrices payments.ProjectUsagePriceModel, usagePriceOverrides map[string]payments.ProjectUsagePriceModel, packagePlans map[string]payments.PackagePlan, bonusRate int64) (*Service, error) {
func NewService(log *zap.Logger, stripeClient Client, config Config, db DB, walletsDB storjscan.WalletsDB, billingDB billing.TransactionsDB, projectsDB console.Projects, usersDB console.Users, usageDB accounting.ProjectAccounting, usagePrices payments.ProjectUsagePriceModel, usagePriceOverrides map[string]payments.ProjectUsagePriceModel, packagePlans map[string]payments.PackagePlan, bonusRate int64, analyticsService *analytics.Service) (*Service, error) {
var partners []string
for partner := range usagePriceOverrides {
partners = append(partners, partner)
@ -102,6 +107,7 @@ func NewService(log *zap.Logger, stripeClient Client, config Config, db DB, wall
usersDB: usersDB,
usageDB: usageDB,
stripeClient: stripeClient,
analytics: analyticsService,
usagePrices: usagePrices,
usagePriceOverrides: usagePriceOverrides,
packagePlans: packagePlans,
@ -112,6 +118,7 @@ func NewService(log *zap.Logger, stripeClient Client, config Config, db DB, wall
listingLimit: config.ListingLimit,
skipEmptyInvoices: config.SkipEmptyInvoices,
maxParallelCalls: config.MaxParallelCalls,
removeExpiredCredit: config.RemoveExpiredCredit,
nowFn: time.Now,
}, nil
}
@ -573,6 +580,75 @@ func (service *Service) InvoiceItemsFromProjectUsage(projName string, partnerUsa
return result
}
// RemoveExpiredPackageCredit removes a user's package plan credit, or sends an analytics event, if it has expired.
// If the user has never received credit from anything other than the package, and it is expired, the remaining package
// credit is removed. If the user has received credit from another source, we send an analytics event instead of removing
// the remaining credit so someone can remove it manually. `sentEvent` indicates whether this analytics event was sent.
func (service *Service) RemoveExpiredPackageCredit(ctx context.Context, customer Customer) (sentEvent bool, err error) {
defer mon.Task()(&ctx)(&err)
// TODO: store the package expiration somewhere
if customer.PackagePlan == nil || customer.PackagePurchasedAt == nil ||
customer.PackagePurchasedAt.After(service.nowFn().AddDate(-1, -1, 0)) {
return false, nil
}
list := service.stripeClient.CustomerBalanceTransactions().List(&stripe.CustomerBalanceTransactionListParams{
Customer: stripe.String(customer.ID),
})
var balance int64
var gotBalance, foundOtherCredit bool
var tx *stripe.CustomerBalanceTransaction
for list.Next() {
tx = list.CustomerBalanceTransaction()
if !gotBalance {
// Stripe returns list ordered by most recent, so ending balance of the first item is current balance.
balance = tx.EndingBalance
gotBalance = true
// if user doesn't have credit, we're done.
if balance >= 0 {
break
}
}
// negative amount means credit
if tx.Amount < 0 {
if tx.Description != *customer.PackagePlan {
foundOtherCredit = true
}
}
}
// send analytics event to notify someone to handle removing credit if credit other than package exists.
if foundOtherCredit {
if service.analytics != nil {
service.analytics.TrackExpiredCreditNeedsRemoval(customer.UserID, customer.ID, *customer.PackagePlan)
}
return true, nil
}
// If no other credit found, we can set the balance to zero.
if balance < 0 {
_, err = service.stripeClient.CustomerBalanceTransactions().New(&stripe.CustomerBalanceTransactionParams{
Customer: stripe.String(customer.ID),
Amount: stripe.Int64(-balance),
Currency: stripe.String(string(stripe.CurrencyUSD)),
Description: stripe.String(fmt.Sprintf("%s expired", *customer.PackagePlan)),
})
if err != nil {
return false, Error.Wrap(err)
}
if service.analytics != nil {
service.analytics.TrackExpiredCreditRemoved(customer.UserID, customer.ID, *customer.PackagePlan)
}
}
err = service.Accounts().UpdatePackage(ctx, customer.UserID, nil, nil)
return false, Error.Wrap(err)
}
// ApplyFreeTierCoupons iterates through all customers in Stripe. For each customer,
// if that customer does not currently have a Stripe coupon, the free tier Stripe coupon
// is applied.
@ -661,7 +737,7 @@ func (service *Service) applyFreeTierCoupon(ctx context.Context, cusID string) (
return true, nil
}
// CreateInvoices lists through all customers and creates invoices.
// CreateInvoices lists through all customers, removes expired credit if applicable, and creates invoices.
func (service *Service) CreateInvoices(ctx context.Context, period time.Time) (err error) {
defer mon.Task()(&ctx)(&err)
@ -683,6 +759,16 @@ func (service *Service) CreateInvoices(ctx context.Context, period time.Time) (e
return Error.Wrap(err)
}
if service.removeExpiredCredit {
for _, c := range cusPage.Customers {
if c.PackagePlan != nil {
if _, err := service.RemoveExpiredPackageCredit(ctx, c); err != nil {
return Error.Wrap(err)
}
}
}
}
scheduled, draft, err := service.createInvoices(ctx, cusPage.Customers, start)
if err != nil {
return Error.Wrap(err)

View File

@ -12,6 +12,7 @@ import (
"testing"
"time"
"github.com/shopspring/decimal"
"github.com/stretchr/testify/require"
"github.com/stripe/stripe-go/v72"
"go.uber.org/zap"
@ -967,3 +968,136 @@ func TestPayInvoicesSkipDue(t *testing.T) {
}
})
}
func TestRemoveExpiredPackageCredit(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 0, UplinkCount: 4,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
p := satellite.API.Payments
u0 := planet.Uplinks[0].Projects[0].Owner.ID
u1 := planet.Uplinks[1].Projects[0].Owner.ID
u2 := planet.Uplinks[2].Projects[0].Owner.ID
u3 := planet.Uplinks[3].Projects[0].Owner.ID
credit := int64(1000)
pkgDesc := "test package plan"
now := time.Now()
expiredPurchase := now.AddDate(-1, -1, 0)
removeExpiredCredit := func(u uuid.UUID, expectAlert bool, purchaseTime *time.Time) {
require.NoError(t, p.Accounts.UpdatePackage(ctx, u, &pkgDesc, purchaseTime))
cPage, err := satellite.API.DB.StripeCoinPayments().Customers().List(ctx, uuid.UUID{}, 10, time.Now().Add(1*time.Hour))
require.NoError(t, err)
var c stripe1.Customer
for _, cus := range cPage.Customers {
if cus.UserID == u {
c = cus
}
}
alertSent, err := p.StripeService.RemoveExpiredPackageCredit(ctx, stripe1.Customer{
ID: c.ID,
UserID: c.UserID,
PackagePlan: c.PackagePlan,
PackagePurchasedAt: c.PackagePurchasedAt,
})
require.NoError(t, err)
if expectAlert {
require.True(t, alertSent)
} else {
require.False(t, alertSent)
}
}
checkCreditAndPackage := func(u uuid.UUID, expectedBalance int64, expectNilPackage bool) {
b, err := p.Accounts.Balances().Get(ctx, u)
require.NoError(t, err)
require.Equal(t, decimal.NewFromInt(expectedBalance), b.Credits)
dbPkgInfo, dbPurchaseTime, err := p.StripeService.Accounts().GetPackageInfo(ctx, u)
require.NoError(t, err)
if expectNilPackage {
require.Nil(t, dbPkgInfo)
require.Nil(t, dbPurchaseTime)
} else {
require.NotNil(t, dbPkgInfo)
require.NotNil(t, dbPurchaseTime)
}
}
t.Run("nil package plan returns safely", func(t *testing.T) {
_, err := p.StripeService.RemoveExpiredPackageCredit(ctx, stripe1.Customer{
ID: "test-customer-ID",
UserID: testrand.UUID(),
PackagePlan: nil,
PackagePurchasedAt: &now,
})
require.NoError(t, err)
})
t.Run("nil package purchase time returns safely", func(t *testing.T) {
_, err := p.StripeService.RemoveExpiredPackageCredit(ctx, stripe1.Customer{
ID: "test-customer-ID",
UserID: testrand.UUID(),
PackagePlan: new(string),
PackagePurchasedAt: nil,
})
require.NoError(t, err)
})
t.Run("package not expired retains credit", func(t *testing.T) {
b, err := p.Accounts.Balances().ApplyCredit(ctx, u3, credit, pkgDesc)
require.NoError(t, err)
require.Equal(t, decimal.NewFromInt(credit), b.Credits)
removeExpiredCredit(u3, false, &now)
checkCreditAndPackage(u3, credit, false)
})
t.Run("used all credit", func(t *testing.T) {
b, err := p.Accounts.Balances().ApplyCredit(ctx, u0, credit, pkgDesc)
require.NoError(t, err)
require.Equal(t, decimal.NewFromInt(credit), b.Credits)
// remove credit as if they used it all
b, err = p.Accounts.Balances().ApplyCredit(ctx, u0, -credit, pkgDesc)
require.NoError(t, err)
require.Equal(t, decimal.NewFromInt(0), b.Credits)
removeExpiredCredit(u0, false, &expiredPurchase)
checkCreditAndPackage(u0, 0, true)
})
t.Run("has remaining credit but no credit source other than package", func(t *testing.T) {
b, err := p.Accounts.Balances().ApplyCredit(ctx, u1, credit, pkgDesc)
require.NoError(t, err)
require.Equal(t, decimal.NewFromInt(credit), b.Credits)
// remove some credit, but not all, as if it were used
toRemove := credit / 2
remaining := credit - toRemove
b, err = p.Accounts.Balances().ApplyCredit(ctx, u1, -toRemove, pkgDesc)
require.NoError(t, err)
require.Equal(t, decimal.NewFromInt(remaining), b.Credits)
removeExpiredCredit(u1, false, &expiredPurchase)
checkCreditAndPackage(u1, 0, true)
})
t.Run("has additional credit source", func(t *testing.T) {
b, err := p.Accounts.Balances().ApplyCredit(ctx, u2, credit, pkgDesc)
require.NoError(t, err)
require.Equal(t, decimal.NewFromInt(credit), b.Credits)
// give additional credit
additional := int64(2000)
b, err = p.Accounts.Balances().ApplyCredit(ctx, u2, additional, "additional credit")
require.NoError(t, err)
require.Equal(t, decimal.NewFromInt(credit+additional), b.Credits)
removeExpiredCredit(u2, true, &expiredPurchase)
checkCreditAndPackage(u2, credit+additional, false)
})
})
}

View File

@ -780,7 +780,8 @@ func (m *mockCustomerBalanceTransactions) List(listParams *stripe.CustomerBalanc
ret := make([]interface{}, len(txs))
for i, v := range txs {
ret[i] = v
// stripe returns list of transactions ordered by most recent, so reverse the array.
ret[len(txs)-1-i] = v
}
listMeta := &stripe.ListMeta{

View File

@ -83,7 +83,7 @@ func (customers *customers) List(ctx context.Context, userIDCursor uuid.UUID, li
rows, err := customers.db.QueryContext(ctx, customers.db.Rebind(`
SELECT
stripe_customers.user_id, stripe_customers.customer_id
stripe_customers.user_id, stripe_customers.customer_id, stripe_customers.package_plan, stripe_customers.purchased_package_at
FROM
stripe_customers
WHERE
@ -103,7 +103,7 @@ func (customers *customers) List(ctx context.Context, userIDCursor uuid.UUID, li
results := []stripe.Customer{}
for rows.Next() {
var customer stripe.Customer
err := rows.Scan(&customer.UserID, &customer.ID)
err := rows.Scan(&customer.UserID, &customer.ID, &customer.PackagePlan, &customer.PackagePurchasedAt)
if err != nil {
return stripe.CustomersPage{}, errs.New("unable to get stripe customer: %+v", err)
}

View File

@ -865,6 +865,9 @@ identity.key-path: /root/.local/share/storj/identity/satellite/identity.key
# the maximum number of concurrent Stripe API calls in invoicing methods
# payments.stripe-coin-payments.max-parallel-calls: 10
# whether to remove expired package credit or not
# payments.stripe-coin-payments.remove-expired-credit: true
# the duration of the first retry interval
# payments.stripe-coin-payments.retries.initial-backoff: 20ms