satellite/payments/stripecoinpayments: parallelize invoice methods
Invoicing-related payment service methods have been modified to send Stripe API requests in parallel. Additionally, randomness has been added to the Stripe backend wrapper's exponential backoff strategy in order to reduce the effects of the thundering herd problem, which arises when executing many simultaneous API calls. Resolves #5156 Change-Id: I568f933284f4229ef41c155377ca0cc33f0eb5a4
This commit is contained in:
parent
21249e6c00
commit
e0bb410192
@ -8,6 +8,7 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
"math"
|
||||
"math/rand"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
@ -227,7 +228,6 @@ func (w *BackendWrapper) withRetries(params stripe.ParamsContainer, call func()
|
||||
return err
|
||||
}
|
||||
|
||||
backoff := float64(w.retryCfg.InitialBackoff)
|
||||
for retry := int64(0); ; retry++ {
|
||||
err := call()
|
||||
if err == nil {
|
||||
@ -238,11 +238,16 @@ func (w *BackendWrapper) withRetries(params stripe.ParamsContainer, call func()
|
||||
return err
|
||||
}
|
||||
|
||||
minBackoff := float64(w.retryCfg.InitialBackoff)
|
||||
maxBackoff := math.Min(
|
||||
float64(w.retryCfg.MaxBackoff),
|
||||
minBackoff*math.Pow(w.retryCfg.Multiplier, float64(retry)),
|
||||
)
|
||||
backoff := minBackoff + rand.Float64()*(maxBackoff-minBackoff)
|
||||
|
||||
if !w.clock.Sleep(ctx, time.Duration(backoff)) {
|
||||
return ctx.Err()
|
||||
}
|
||||
|
||||
backoff = math.Min(backoff*w.retryCfg.Multiplier, float64(w.retryCfg.MaxBackoff))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -11,6 +11,7 @@ import (
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/shopspring/decimal"
|
||||
@ -20,6 +21,7 @@ import (
|
||||
"go.uber.org/zap"
|
||||
|
||||
"storj.io/common/currency"
|
||||
"storj.io/common/sync2"
|
||||
"storj.io/common/uuid"
|
||||
"storj.io/storj/satellite/accounting"
|
||||
"storj.io/storj/satellite/console"
|
||||
@ -46,6 +48,7 @@ type Config struct {
|
||||
AutoAdvance bool `help:"toggle autoadvance feature for invoice creation" default:"false"`
|
||||
ListingLimit int `help:"sets the maximum amount of items before we start paging on requests" default:"100" hidden:"true"`
|
||||
SkipEmptyInvoices bool `help:"if set, skips the creation of empty invoices for customers with zero usage for the billing period" default:"true"`
|
||||
MaxParallelCalls int `help:"the maximum number of concurrent Stripe API calls in invoicing methods" default:"10"`
|
||||
Retries RetryConfig
|
||||
}
|
||||
|
||||
@ -78,6 +81,7 @@ type Service struct {
|
||||
|
||||
listingLimit int
|
||||
skipEmptyInvoices bool
|
||||
maxParallelCalls int
|
||||
nowFn func() time.Time
|
||||
}
|
||||
|
||||
@ -106,6 +110,7 @@ func NewService(log *zap.Logger, stripeClient StripeClient, config Config, db DB
|
||||
AutoAdvance: config.AutoAdvance,
|
||||
listingLimit: config.ListingLimit,
|
||||
skipEmptyInvoices: config.SkipEmptyInvoices,
|
||||
maxParallelCalls: config.MaxParallelCalls,
|
||||
nowFn: time.Now,
|
||||
}, nil
|
||||
}
|
||||
@ -239,6 +244,12 @@ func (service *Service) InvoiceApplyProjectRecords(ctx context.Context, period t
|
||||
var totalRecords int
|
||||
var totalSkipped int
|
||||
|
||||
for {
|
||||
if err = ctx.Err(); err != nil {
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
|
||||
// we are always starting from offset 0 because applyProjectRecords is changing project record state to applied
|
||||
recordsPage, err := service.db.ProjectRecords().ListUnapplied(ctx, 0, service.listingLimit, start, end)
|
||||
if err != nil {
|
||||
return Error.Wrap(err)
|
||||
@ -251,23 +262,9 @@ func (service *Service) InvoiceApplyProjectRecords(ctx context.Context, period t
|
||||
}
|
||||
totalSkipped += skipped
|
||||
|
||||
for recordsPage.Next {
|
||||
if err = ctx.Err(); err != nil {
|
||||
return Error.Wrap(err)
|
||||
if !recordsPage.Next {
|
||||
break
|
||||
}
|
||||
|
||||
// we are always starting from offset 0 because applyProjectRecords is changing project record state to applied
|
||||
recordsPage, err = service.db.ProjectRecords().ListUnapplied(ctx, 0, service.listingLimit, start, end)
|
||||
if err != nil {
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
totalRecords += len(recordsPage.Records)
|
||||
|
||||
skipped, err := service.applyProjectRecords(ctx, recordsPage.Records)
|
||||
if err != nil {
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
totalSkipped += skipped
|
||||
}
|
||||
|
||||
service.log.Info("Processed project records.",
|
||||
@ -446,6 +443,15 @@ func (service *Service) createTokenPaymentBillingTransaction(ctx context.Context
|
||||
func (service *Service) applyProjectRecords(ctx context.Context, records []ProjectRecord) (skipCount int, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
var mu sync.Mutex
|
||||
var errGrp errs.Group
|
||||
limiter := sync2.NewLimiter(service.maxParallelCalls)
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer func() {
|
||||
cancel()
|
||||
limiter.Wait()
|
||||
}()
|
||||
|
||||
for _, record := range records {
|
||||
if err = ctx.Err(); err != nil {
|
||||
return 0, errs.Wrap(err)
|
||||
@ -468,14 +474,26 @@ func (service *Service) applyProjectRecords(ctx context.Context, records []Proje
|
||||
return 0, errs.Wrap(err)
|
||||
}
|
||||
|
||||
if skipped, err := service.createInvoiceItems(ctx, cusID, proj.Name, record); err != nil {
|
||||
return 0, errs.Wrap(err)
|
||||
} else if skipped {
|
||||
skipCount++
|
||||
record := record
|
||||
limiter.Go(ctx, func() {
|
||||
skipped, err := service.createInvoiceItems(ctx, cusID, proj.Name, record)
|
||||
if err != nil {
|
||||
mu.Lock()
|
||||
errGrp.Add(errs.Wrap(err))
|
||||
mu.Unlock()
|
||||
return
|
||||
}
|
||||
if skipped {
|
||||
mu.Lock()
|
||||
skipCount++
|
||||
mu.Unlock()
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
return skipCount, nil
|
||||
limiter.Wait()
|
||||
|
||||
return skipCount, errGrp.Err()
|
||||
}
|
||||
|
||||
// createInvoiceItems creates invoice line items for stripe customer.
|
||||
@ -568,7 +586,15 @@ func (service *Service) ApplyFreeTierCoupons(ctx context.Context) (err error) {
|
||||
|
||||
customers := service.db.Customers()
|
||||
|
||||
appliedCoupons := 0
|
||||
limiter := sync2.NewLimiter(service.maxParallelCalls)
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer func() {
|
||||
cancel()
|
||||
limiter.Wait()
|
||||
}()
|
||||
|
||||
var mu sync.Mutex
|
||||
var appliedCoupons int
|
||||
failedUsers := []string{}
|
||||
morePages := true
|
||||
nextOffset := int64(0)
|
||||
@ -583,29 +609,25 @@ func (service *Service) ApplyFreeTierCoupons(ctx context.Context) (err error) {
|
||||
nextOffset = customersPage.NextOffset
|
||||
|
||||
for _, c := range customersPage.Customers {
|
||||
params := &stripe.CustomerParams{Params: stripe.Params{Context: ctx}}
|
||||
stripeCust, err := service.stripeClient.Customers().Get(c.ID, params)
|
||||
cusID := c.ID
|
||||
limiter.Go(ctx, func() {
|
||||
applied, err := service.applyFreeTierCoupon(ctx, cusID)
|
||||
if err != nil {
|
||||
service.log.Error("Failed to get customer", zap.Error(err))
|
||||
failedUsers = append(failedUsers, c.ID)
|
||||
continue
|
||||
}
|
||||
// if customer does not have a coupon, apply the free tier coupon
|
||||
if stripeCust.Discount == nil || stripeCust.Discount.Coupon == nil {
|
||||
params := &stripe.CustomerParams{
|
||||
Params: stripe.Params{Context: ctx},
|
||||
Coupon: stripe.String(service.StripeFreeTierCouponID),
|
||||
}
|
||||
_, err := service.stripeClient.Customers().Update(c.ID, params)
|
||||
if err != nil {
|
||||
service.log.Error("Failed to update customer with free tier coupon", zap.Error(err))
|
||||
failedUsers = append(failedUsers, c.ID)
|
||||
continue
|
||||
mu.Lock()
|
||||
failedUsers = append(failedUsers, cusID)
|
||||
mu.Unlock()
|
||||
return
|
||||
}
|
||||
if applied {
|
||||
mu.Lock()
|
||||
appliedCoupons++
|
||||
mu.Unlock()
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
limiter.Wait()
|
||||
|
||||
if len(failedUsers) > 0 {
|
||||
service.log.Warn("Failed to get or apply free tier coupon to some customers:", zap.String("idlist", strings.Join(failedUsers, ", ")))
|
||||
@ -615,6 +637,35 @@ func (service *Service) ApplyFreeTierCoupons(ctx context.Context) (err error) {
|
||||
return nil
|
||||
}
|
||||
|
||||
// applyFreeTierCoupon applies the free tier Stripe coupon to a customer if it doesn't already have a coupon.
|
||||
func (service *Service) applyFreeTierCoupon(ctx context.Context, cusID string) (applied bool, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
params := &stripe.CustomerParams{Params: stripe.Params{Context: ctx}}
|
||||
stripeCust, err := service.stripeClient.Customers().Get(cusID, params)
|
||||
if err != nil {
|
||||
service.log.Error("Failed to get customer", zap.Error(err))
|
||||
return false, err
|
||||
}
|
||||
|
||||
// if customer has a coupon, don't apply the free tier coupon
|
||||
if stripeCust.Discount != nil && stripeCust.Discount.Coupon != nil {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
params = &stripe.CustomerParams{
|
||||
Params: stripe.Params{Context: ctx},
|
||||
Coupon: stripe.String(service.StripeFreeTierCouponID),
|
||||
}
|
||||
_, err = service.stripeClient.Customers().Update(cusID, params)
|
||||
if err != nil {
|
||||
service.log.Error("Failed to update customer with free tier coupon", zap.Error(err))
|
||||
return false, err
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// CreateInvoices lists through all customers and creates invoices.
|
||||
func (service *Service) CreateInvoices(ctx context.Context, period time.Time) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
@ -630,31 +681,19 @@ func (service *Service) CreateInvoices(ctx context.Context, period time.Time) (e
|
||||
}
|
||||
|
||||
var nextOffset int64
|
||||
var draft, scheduled int
|
||||
var totalDraft, totalScheduled int
|
||||
for {
|
||||
cusPage, err := service.db.Customers().List(ctx, nextOffset, service.listingLimit, end)
|
||||
if err != nil {
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
|
||||
for _, cus := range cusPage.Customers {
|
||||
if err = ctx.Err(); err != nil {
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
|
||||
stripeInvoice, err := service.createInvoice(ctx, cus.ID, start)
|
||||
scheduled, draft, err := service.createInvoices(ctx, cusPage.Customers, start)
|
||||
if err != nil {
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
|
||||
switch {
|
||||
case stripeInvoice == nil:
|
||||
case stripeInvoice.AutoAdvance:
|
||||
scheduled++
|
||||
default:
|
||||
draft++
|
||||
}
|
||||
}
|
||||
totalScheduled += scheduled
|
||||
totalDraft += draft
|
||||
|
||||
if !cusPage.Next {
|
||||
break
|
||||
@ -662,17 +701,15 @@ func (service *Service) CreateInvoices(ctx context.Context, period time.Time) (e
|
||||
nextOffset = cusPage.NextOffset
|
||||
}
|
||||
|
||||
service.log.Info("Number of created invoices", zap.Int("Draft", draft), zap.Int("Scheduled", scheduled))
|
||||
service.log.Info("Number of created invoices", zap.Int("Draft", totalDraft), zap.Int("Scheduled", totalScheduled))
|
||||
return nil
|
||||
}
|
||||
|
||||
// createInvoice creates invoice for stripe customer. Returns nil error and nil invoice
|
||||
// if there are no pending invoice line items for customer.
|
||||
// createInvoice creates invoice for Stripe customer.
|
||||
func (service *Service) createInvoice(ctx context.Context, cusID string, period time.Time) (stripeInvoice *stripe.Invoice, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
description := fmt.Sprintf("Storj DCS Cloud Storage for %s %d", period.Month(), period.Year())
|
||||
|
||||
stripeInvoice, err = service.stripeClient.Invoices().New(
|
||||
&stripe.InvoiceParams{
|
||||
Params: stripe.Params{Context: ctx},
|
||||
@ -686,7 +723,7 @@ func (service *Service) createInvoice(ctx context.Context, cusID string, period
|
||||
var stripErr *stripe.Error
|
||||
if errors.As(err, &stripErr) {
|
||||
if stripErr.Code == stripe.ErrorCodeInvoiceNoCustomerLineItems {
|
||||
return nil, nil
|
||||
return stripeInvoice, nil
|
||||
}
|
||||
}
|
||||
return nil, err
|
||||
@ -707,6 +744,41 @@ func (service *Service) createInvoice(ctx context.Context, cusID string, period
|
||||
return stripeInvoice, nil
|
||||
}
|
||||
|
||||
// createInvoices creates invoices for Stripe customers.
|
||||
func (service *Service) createInvoices(ctx context.Context, customers []Customer, period time.Time) (scheduled, draft int, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
limiter := sync2.NewLimiter(service.maxParallelCalls)
|
||||
var errGrp errs.Group
|
||||
var mu sync.Mutex
|
||||
|
||||
for _, cus := range customers {
|
||||
cusID := cus.ID
|
||||
limiter.Go(ctx, func() {
|
||||
inv, err := service.createInvoice(ctx, cusID, period)
|
||||
if err != nil {
|
||||
mu.Lock()
|
||||
errGrp.Add(err)
|
||||
mu.Unlock()
|
||||
return
|
||||
}
|
||||
if inv != nil {
|
||||
mu.Lock()
|
||||
if inv.AutoAdvance {
|
||||
scheduled++
|
||||
} else {
|
||||
draft++
|
||||
}
|
||||
mu.Unlock()
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
limiter.Wait()
|
||||
|
||||
return scheduled, draft, errGrp.Err()
|
||||
}
|
||||
|
||||
// GenerateInvoices performs all tasks necessary to generate Stripe invoices.
|
||||
// This is equivalent to invoking ApplyFreeTierCoupons, PrepareInvoiceProjectRecords,
|
||||
// InvoiceApplyProjectRecords, and CreateInvoices in order.
|
||||
|
@ -7,6 +7,7 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math"
|
||||
"sort"
|
||||
"strconv"
|
||||
"testing"
|
||||
"time"
|
||||
@ -625,12 +626,15 @@ func TestProjectUsagePrice(t *testing.T) {
|
||||
|
||||
items := getCustomerInvoiceItems(ctx, sat.API.Payments.StripeClient, cusID)
|
||||
require.Len(t, items, 3)
|
||||
storage, _ := tt.expectedPrice.StorageMBMonthCents.Float64()
|
||||
require.Equal(t, storage, items[0].UnitAmountDecimal)
|
||||
sort.Slice(items, func(i, j int) bool {
|
||||
return items[i].Description < items[j].Description
|
||||
})
|
||||
egress, _ := tt.expectedPrice.EgressMBCents.Float64()
|
||||
require.Equal(t, egress, items[1].UnitAmountDecimal)
|
||||
require.Equal(t, egress, items[0].UnitAmountDecimal)
|
||||
segment, _ := tt.expectedPrice.SegmentMonthCents.Float64()
|
||||
require.Equal(t, segment, items[2].UnitAmountDecimal)
|
||||
require.Equal(t, segment, items[1].UnitAmountDecimal)
|
||||
storage, _ := tt.expectedPrice.StorageMBMonthCents.Float64()
|
||||
require.Equal(t, storage, items[2].UnitAmountDecimal)
|
||||
})
|
||||
}
|
||||
})
|
||||
|
@ -651,6 +651,9 @@ func (m *mockInvoiceItems) New(params *stripe.InvoiceItemParams) (*stripe.Invoic
|
||||
item := &stripe.InvoiceItem{
|
||||
Metadata: params.Metadata,
|
||||
}
|
||||
if params.Description != nil {
|
||||
item.Description = *params.Description
|
||||
}
|
||||
if params.UnitAmountDecimal != nil {
|
||||
item.UnitAmountDecimal = *params.UnitAmountDecimal
|
||||
}
|
||||
|
3
scripts/testdata/satellite-config.yaml.lock
vendored
3
scripts/testdata/satellite-config.yaml.lock
vendored
@ -853,6 +853,9 @@ identity.key-path: /root/.local/share/storj/identity/satellite/identity.key
|
||||
# toggle autoadvance feature for invoice creation
|
||||
# payments.stripe-coin-payments.auto-advance: false
|
||||
|
||||
# the maximum number of concurrent Stripe API calls in invoicing methods
|
||||
# payments.stripe-coin-payments.max-parallel-calls: 10
|
||||
|
||||
# the duration of the first retry interval
|
||||
# payments.stripe-coin-payments.retries.initial-backoff: 20ms
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user