satellite/payments: use stripe idempotency on create/update invoice items

Add idempotency key for every create/update invoice item request.
Key consists of public project ID, line item description and formatted invoicing period.
This should eliminate the possibility of making duplicate create/update actions.
Also, with this change, we mark a project record as consumed only after the appropriate invoice items are created or the record itself is skipped.

Issue:
https://github.com/storj/storj/issues/6501

Change-Id: I7506a8f3886d7f575bcc0facc3f107513352a312
This commit is contained in:
Vitalii 2023-12-01 16:59:06 +02:00
parent 504c72f29d
commit b31e417546
2 changed files with 85 additions and 16 deletions

View File

@ -58,6 +58,7 @@ type Config struct {
SkipEmptyInvoices bool `help:"if set, skips the creation of empty invoices for customers with zero usage for the billing period" default:"true"`
MaxParallelCalls int `help:"the maximum number of concurrent Stripe API calls in invoicing methods" default:"10"`
RemoveExpiredCredit bool `help:"whether to remove expired package credit or not" default:"true"`
UseIdempotency bool `help:"whether to use idempotency for create/update requests" default:"false"`
Retries RetryConfig
}
@ -94,6 +95,7 @@ type Service struct {
skipEmptyInvoices bool
maxParallelCalls int
removeExpiredCredit bool
useIdempotency bool
nowFn func() time.Time
}
@ -125,6 +127,7 @@ func NewService(log *zap.Logger, stripeClient Client, config Config, db DB, wall
skipEmptyInvoices: config.SkipEmptyInvoices,
maxParallelCalls: config.MaxParallelCalls,
removeExpiredCredit: config.RemoveExpiredCredit,
useIdempotency: config.UseIdempotency,
nowFn: time.Now,
}, nil
}
@ -291,7 +294,7 @@ func (service *Service) InvoiceApplyProjectRecords(ctx context.Context, period t
}
totalRecords += len(recordsPage.Records)
skipped, err := service.applyProjectRecords(ctx, recordsPage.Records)
skipped, err := service.applyProjectRecords(ctx, recordsPage.Records, period)
if err != nil {
return Error.Wrap(err)
}
@ -338,7 +341,7 @@ func (service *Service) InvoiceApplyToBeAggregatedProjectRecords(ctx context.Con
}
totalRecords += len(recordsPage.Records)
skipped, err := service.applyToBeAggregatedProjectRecords(ctx, recordsPage.Records)
skipped, err := service.applyToBeAggregatedProjectRecords(ctx, recordsPage.Records, period)
if err != nil {
return Error.Wrap(err)
}
@ -489,7 +492,7 @@ func (service *Service) createTokenPaymentBillingTransaction(ctx context.Context
}
// applyProjectRecords applies invoice intents as invoice line items to stripe customer.
func (service *Service) applyProjectRecords(ctx context.Context, records []ProjectRecord) (skipCount int, err error) {
func (service *Service) applyProjectRecords(ctx context.Context, records []ProjectRecord, period time.Time) (skipCount int, err error) {
defer mon.Task()(&ctx)(&err)
var mu sync.Mutex
@ -534,7 +537,7 @@ func (service *Service) applyProjectRecords(ctx context.Context, records []Proje
record := record
limiter.Go(ctx, func() {
skipped, err := service.createInvoiceItems(ctx, cusID, proj.Name, record)
skipped, err := service.createInvoiceItems(ctx, cusID, proj.Name, record, period)
if err != nil {
mu.Lock()
errGrp.Add(errs.Wrap(err))
@ -555,7 +558,7 @@ func (service *Service) applyProjectRecords(ctx context.Context, records []Proje
}
// applyToBeAggregatedProjectRecords applies to be aggregated invoice intents as invoice line items to stripe customer.
func (service *Service) applyToBeAggregatedProjectRecords(ctx context.Context, records []ProjectRecord) (skipCount int, err error) {
func (service *Service) applyToBeAggregatedProjectRecords(ctx context.Context, records []ProjectRecord, period time.Time) (skipCount int, err error) {
defer mon.Task()(&ctx)(&err)
for _, record := range records {
@ -587,7 +590,7 @@ func (service *Service) applyToBeAggregatedProjectRecords(ctx context.Context, r
}
record := record
skipped, err := service.processProjectRecord(ctx, cusID, proj.Name, record)
skipped, err := service.processProjectRecord(ctx, cusID, proj.Name, record, period)
if err != nil {
return 0, errs.Wrap(err)
}
@ -600,14 +603,22 @@ func (service *Service) applyToBeAggregatedProjectRecords(ctx context.Context, r
}
// createInvoiceItems creates invoice line items for stripe customer.
func (service *Service) createInvoiceItems(ctx context.Context, cusID, projName string, record ProjectRecord) (skipped bool, err error) {
func (service *Service) createInvoiceItems(ctx context.Context, cusID, projName string, record ProjectRecord, period time.Time) (skipped bool, err error) {
defer mon.Task()(&ctx)(&err)
if !service.useIdempotency {
if err = service.db.ProjectRecords().Consume(ctx, record.ID); err != nil {
return false, err
}
}
if service.skipEmptyInvoices && doesProjectRecordHaveNoUsage(record) {
if service.useIdempotency {
if err = service.db.ProjectRecords().Consume(ctx, record.ID); err != nil {
return false, err
}
}
return true, nil
}
@ -621,14 +632,25 @@ func (service *Service) createInvoiceItems(ctx context.Context, cusID, projName
item.Params = stripe.Params{Context: ctx}
item.Currency = stripe.String(string(stripe.CurrencyUSD))
item.Customer = stripe.String(cusID)
// TODO: do not expose regular project ID.
item.AddMetadata("projectID", record.ProjectID.String())
if service.useIdempotency {
item.SetIdempotencyKey(getIdempotencyKey(record.ProjectID, *item.Description, period))
}
_, err = service.stripeClient.InvoiceItems().New(item)
if err != nil {
return false, err
}
}
if service.useIdempotency {
if err = service.db.ProjectRecords().Consume(ctx, record.ID); err != nil {
return false, err
}
}
return false, nil
}
@ -641,14 +663,22 @@ const (
)
// processProjectRecord creates or updates invoice line items for stripe customer.
func (service *Service) processProjectRecord(ctx context.Context, cusID, projName string, record ProjectRecord) (skipped bool, err error) {
func (service *Service) processProjectRecord(ctx context.Context, cusID, projName string, record ProjectRecord, period time.Time) (skipped bool, err error) {
defer mon.Task()(&ctx)(&err)
if !service.useIdempotency {
if err = service.db.ProjectRecords().Consume(ctx, record.ID); err != nil {
return false, err
}
}
if service.skipEmptyInvoices && doesProjectRecordHaveNoUsage(record) {
if service.useIdempotency {
if err = service.db.ProjectRecords().Consume(ctx, record.ID); err != nil {
return false, err
}
}
return true, nil
}
@ -669,23 +699,53 @@ func (service *Service) processProjectRecord(ctx context.Context, cusID, projNam
item.Params = stripe.Params{Context: ctx}
item.Currency = stripe.String(string(stripe.CurrencyUSD))
item.Customer = stripe.String(cusID)
// TODO: do not expose regular project ID.
item.AddMetadata("projectID", record.ProjectID.String())
if service.useIdempotency {
item.SetIdempotencyKey(getIdempotencyKey(record.ProjectID, *item.Description, period))
}
_, err = service.stripeClient.InvoiceItems().New(item)
if err != nil {
return false, err
}
}
} else {
err = service.updateExistingInvoiceItems(ctx, existingItems, newItems)
err = service.updateExistingInvoiceItems(ctx, existingItems, newItems, record.ProjectID, period)
if err != nil {
return false, err
}
}
if service.useIdempotency {
if err = service.db.ProjectRecords().Consume(ctx, record.ID); err != nil {
return false, err
}
}
return false, nil
}
// getIdempotencyKey creates new unique idempotency key for given invoice item.
func getIdempotencyKey(projectID uuid.UUID, itemDesc string, period time.Time) string {
// We can't just use item.Description because it includes project name.
// There is a chance project name can be updated by the user during invoicing process.
itemIdentifier := itemDesc
if strings.Contains(itemDesc, storageInvoiceItemDesc) {
itemIdentifier = "storage"
} else if strings.Contains(itemDesc, egressInvoiceItemDesc) {
itemIdentifier = "egress"
} else if strings.Contains(itemDesc, segmentInvoiceItemDesc) {
itemIdentifier = "segment"
}
key := fmt.Sprintf("%s-%s-%s", projectID, itemIdentifier, period.Format("2006-01"))
key = strings.ToLower(strings.ReplaceAll(key, " ", "-"))
return key
}
// getExistingInvoiceItems lists 3 existing pending invoice line items for stripe customer.
func (service *Service) getExistingInvoiceItems(ctx context.Context, cusID string) (map[usage]*stripe.InvoiceItem, error) {
existingItemsIter := service.stripeClient.InvoiceItems().List(&stripe.InvoiceItemListParams{
@ -718,7 +778,7 @@ func (service *Service) getExistingInvoiceItems(ctx context.Context, cusID strin
}
// updateExistingInvoiceItems updates 3 existing pending invoice line items for stripe customer.
func (service *Service) updateExistingInvoiceItems(ctx context.Context, existingItems map[usage]*stripe.InvoiceItem, newItems []*stripe.InvoiceItemParams) (err error) {
func (service *Service) updateExistingInvoiceItems(ctx context.Context, existingItems map[usage]*stripe.InvoiceItem, newItems []*stripe.InvoiceItemParams, projectID uuid.UUID, period time.Time) (err error) {
for _, item := range newItems {
if strings.Contains(*item.Description, storageInvoiceItemDesc) {
existingItems[storage].Quantity += *item.Quantity
@ -730,10 +790,16 @@ func (service *Service) updateExistingInvoiceItems(ctx context.Context, existing
}
for _, item := range existingItems {
_, err = service.stripeClient.InvoiceItems().Update(item.ID, &stripe.InvoiceItemParams{
params := &stripe.InvoiceItemParams{
Params: stripe.Params{Context: ctx},
Quantity: stripe.Int64(item.Quantity),
})
}
if service.useIdempotency {
params.SetIdempotencyKey(getIdempotencyKey(projectID, item.Description, period))
}
_, err = service.stripeClient.InvoiceItems().Update(item.ID, params)
if err != nil {
return err
}

View File

@ -967,6 +967,9 @@ identity.key-path: /root/.local/share/storj/identity/satellite/identity.key
# stripe API secret key
# payments.stripe-coin-payments.stripe-secret-key: ""
# whether to use idempotency for create/update requests
# payments.stripe-coin-payments.use-idempotency: false
# semicolon-separated usage price overrides in the format partner:storage,egress,segment,egress_discount_ratio. The egress discount ratio is the ratio of free egress per unit-month of storage
# payments.usage-price-overrides: ""