satellite/payments/stripecoinpayments: retry API calls with exp backoff
A Stripe backend implementation has been added that uses an exponential backoff strategy to retry failed API calls. This behavior can be configured in the satellite config. References #5156 Change-Id: I16ff21a39775ea331c442457f976be0c95a7b695
This commit is contained in:
parent
2dee4c9afc
commit
01f0e602b4
@ -4,15 +4,25 @@
|
||||
package stripecoinpayments
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"math"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/stripe/stripe-go/v72"
|
||||
"github.com/stripe/stripe-go/v72/charge"
|
||||
"github.com/stripe/stripe-go/v72/client"
|
||||
"github.com/stripe/stripe-go/v72/customerbalancetransaction"
|
||||
"github.com/stripe/stripe-go/v72/form"
|
||||
"github.com/stripe/stripe-go/v72/invoice"
|
||||
"github.com/stripe/stripe-go/v72/invoiceitem"
|
||||
"github.com/stripe/stripe-go/v72/paymentmethod"
|
||||
"github.com/stripe/stripe-go/v72/promotioncode"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"storj.io/common/time2"
|
||||
)
|
||||
|
||||
// StripeClient Stripe client interface.
|
||||
@ -119,17 +129,145 @@ func (s *stripeClient) CreditNotes() StripeCreditNotes {
|
||||
|
||||
// NewStripeClient creates Stripe client from configuration.
|
||||
func NewStripeClient(log *zap.Logger, config Config) StripeClient {
|
||||
backendConfig := &stripe.BackendConfig{
|
||||
LeveledLogger: log.Sugar(),
|
||||
}
|
||||
|
||||
sClient := client.New(config.StripeSecretKey,
|
||||
&stripe.Backends{
|
||||
API: stripe.GetBackendWithConfig(stripe.APIBackend, backendConfig),
|
||||
Connect: stripe.GetBackendWithConfig(stripe.ConnectBackend, backendConfig),
|
||||
Uploads: stripe.GetBackendWithConfig(stripe.UploadsBackend, backendConfig),
|
||||
API: NewBackendWrapper(log, stripe.APIBackend, config.Retries),
|
||||
Connect: NewBackendWrapper(log, stripe.ConnectBackend, config.Retries),
|
||||
Uploads: NewBackendWrapper(log, stripe.UploadsBackend, config.Retries),
|
||||
},
|
||||
)
|
||||
|
||||
return &stripeClient{client: sClient}
|
||||
}
|
||||
|
||||
// RetryConfig contains the configuration for an exponential backoff strategy when retrying Stripe API calls.
|
||||
type RetryConfig struct {
|
||||
InitialBackoff time.Duration `help:"the duration of the first retry interval" default:"20ms"`
|
||||
MaxBackoff time.Duration `help:"the maximum duration of any retry interval" default:"5s"`
|
||||
Multiplier float64 `help:"the factor by which the retry interval will be multiplied on each iteration" default:"2"`
|
||||
MaxRetries int64 `help:"the maximum number of times to retry a request" default:"10"`
|
||||
}
|
||||
|
||||
// BackendWrapper is a wrapper for the Stripe backend that uses an exponential backoff strategy for retrying Stripe API calls.
|
||||
type BackendWrapper struct {
|
||||
backend stripe.Backend
|
||||
retryCfg RetryConfig
|
||||
clock time2.Clock
|
||||
}
|
||||
|
||||
// NewBackendWrapper creates a new wrapper for a Stripe backend.
|
||||
func NewBackendWrapper(log *zap.Logger, backendType stripe.SupportedBackend, retryCfg RetryConfig) *BackendWrapper {
|
||||
backendConfig := &stripe.BackendConfig{
|
||||
LeveledLogger: log.Sugar(),
|
||||
// Disable internal retries since we have our own retry+backoff strategy.
|
||||
MaxNetworkRetries: stripe.Int64(0),
|
||||
}
|
||||
|
||||
return &BackendWrapper{
|
||||
retryCfg: retryCfg,
|
||||
backend: stripe.GetBackendWithConfig(backendType, backendConfig),
|
||||
}
|
||||
}
|
||||
|
||||
// TestSwapBackend replaces the wrapped backend with the one specified for use in testing.
|
||||
func (w *BackendWrapper) TestSwapBackend(backend stripe.Backend) {
|
||||
w.backend = backend
|
||||
}
|
||||
|
||||
// TestSwapClock replaces the internal clock with the one specified for use in testing.
|
||||
func (w *BackendWrapper) TestSwapClock(clock time2.Clock) {
|
||||
w.clock = clock
|
||||
}
|
||||
|
||||
// Call implements the stripe.Backend interface.
|
||||
func (w *BackendWrapper) Call(method, path, key string, params stripe.ParamsContainer, v stripe.LastResponseSetter) error {
|
||||
return w.withRetries(params, func() error {
|
||||
return w.backend.Call(method, path, key, params, v)
|
||||
})
|
||||
}
|
||||
|
||||
// CallStreaming implements the stripe.Backend interface.
|
||||
func (w *BackendWrapper) CallStreaming(method, path, key string, params stripe.ParamsContainer, v stripe.StreamingLastResponseSetter) error {
|
||||
return w.withRetries(params, func() error {
|
||||
return w.backend.CallStreaming(method, path, key, params, v)
|
||||
})
|
||||
}
|
||||
|
||||
// CallRaw implements the stripe.Backend interface.
|
||||
func (w *BackendWrapper) CallRaw(method, path, key string, body *form.Values, params *stripe.Params, v stripe.LastResponseSetter) error {
|
||||
return w.withRetries(params, func() error {
|
||||
return w.backend.CallRaw(method, path, key, body, params, v)
|
||||
})
|
||||
}
|
||||
|
||||
// CallMultipart implements the stripe.Backend interface.
|
||||
func (w *BackendWrapper) CallMultipart(method, path, key, boundary string, body *bytes.Buffer, params *stripe.Params, v stripe.LastResponseSetter) error {
|
||||
return w.withRetries(params, func() error {
|
||||
return w.backend.CallMultipart(method, path, key, boundary, body, params, v)
|
||||
})
|
||||
}
|
||||
|
||||
// SetMaxNetworkRetries sets the maximum number of times to retry failed requests.
|
||||
func (w *BackendWrapper) SetMaxNetworkRetries(max int64) {
|
||||
w.retryCfg.MaxRetries = max
|
||||
}
|
||||
|
||||
// withRetries executes the provided Stripe API call using an exponential backoff strategy
|
||||
// for retrying in the case of failure.
|
||||
func (w *BackendWrapper) withRetries(params stripe.ParamsContainer, call func() error) error {
|
||||
ctx := context.Background()
|
||||
if params != nil {
|
||||
innerParams := params.GetParams()
|
||||
if innerParams != nil && innerParams.Context != nil {
|
||||
ctx = innerParams.Context
|
||||
}
|
||||
}
|
||||
|
||||
if err := ctx.Err(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
backoff := float64(w.retryCfg.InitialBackoff)
|
||||
for retry := int64(0); ; retry++ {
|
||||
err := call()
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
if !w.shouldRetry(retry, err) {
|
||||
return err
|
||||
}
|
||||
|
||||
if !w.clock.Sleep(ctx, time.Duration(backoff)) {
|
||||
return ctx.Err()
|
||||
}
|
||||
|
||||
backoff = math.Min(backoff*w.retryCfg.Multiplier, float64(w.retryCfg.MaxBackoff))
|
||||
}
|
||||
}
|
||||
|
||||
// shouldRetry returns whether a Stripe API call should be retried.
|
||||
func (w *BackendWrapper) shouldRetry(retry int64, err error) bool {
|
||||
if retry >= w.retryCfg.MaxRetries {
|
||||
return false
|
||||
}
|
||||
|
||||
var stripeErr *stripe.Error
|
||||
if !errors.As(err, &stripeErr) {
|
||||
return false
|
||||
}
|
||||
|
||||
resp := stripeErr.LastResponse
|
||||
if resp == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
switch resp.Header.Get("Stripe-Should-Retry") {
|
||||
case "true":
|
||||
return true
|
||||
case "false":
|
||||
return false
|
||||
}
|
||||
|
||||
return resp.StatusCode == http.StatusTooManyRequests
|
||||
}
|
||||
|
133
satellite/payments/stripecoinpayments/client_test.go
Normal file
133
satellite/payments/stripecoinpayments/client_test.go
Normal file
@ -0,0 +1,133 @@
|
||||
// Copyright (C) 2023 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package stripecoinpayments_test
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"net/http"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/stripe/stripe-go/v72"
|
||||
"github.com/stripe/stripe-go/v72/form"
|
||||
"go.uber.org/zap/zaptest"
|
||||
|
||||
"storj.io/common/testcontext"
|
||||
"storj.io/common/time2"
|
||||
"storj.io/storj/satellite/payments/stripecoinpayments"
|
||||
)
|
||||
|
||||
var backendError = &stripe.Error{
|
||||
APIResource: stripe.APIResource{
|
||||
LastResponse: &stripe.APIResponse{
|
||||
Header: http.Header{
|
||||
"Stripe-Should-Retry": []string{"true"},
|
||||
},
|
||||
StatusCode: http.StatusTooManyRequests,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
type mockBackend struct {
|
||||
calls int64
|
||||
}
|
||||
|
||||
func (b *mockBackend) Call(method, path, key string, params stripe.ParamsContainer, v stripe.LastResponseSetter) error {
|
||||
b.calls++
|
||||
return backendError
|
||||
}
|
||||
|
||||
func (b *mockBackend) CallStreaming(method, path, key string, params stripe.ParamsContainer, v stripe.StreamingLastResponseSetter) error {
|
||||
return b.Call("", "", "", nil, nil)
|
||||
}
|
||||
|
||||
func (b *mockBackend) CallRaw(method, path, key string, body *form.Values, params *stripe.Params, v stripe.LastResponseSetter) error {
|
||||
return b.Call("", "", "", nil, nil)
|
||||
}
|
||||
|
||||
func (b *mockBackend) CallMultipart(method, path, key, boundary string, body *bytes.Buffer, params *stripe.Params, v stripe.LastResponseSetter) error {
|
||||
return b.Call("", "", "", nil, nil)
|
||||
}
|
||||
|
||||
func (b *mockBackend) SetMaxNetworkRetries(max int64) {}
|
||||
|
||||
func TestBackendWrapper(t *testing.T) {
|
||||
tm := time2.NewMachine()
|
||||
retryCfg := stripecoinpayments.RetryConfig{
|
||||
InitialBackoff: time.Millisecond,
|
||||
MaxBackoff: 3 * time.Millisecond,
|
||||
Multiplier: 2,
|
||||
MaxRetries: 5,
|
||||
}
|
||||
backend := stripecoinpayments.NewBackendWrapper(zaptest.NewLogger(t), stripe.APIBackend, retryCfg)
|
||||
mock := &mockBackend{}
|
||||
backend.TestSwapBackend(mock)
|
||||
backend.TestSwapClock(tm.Clock())
|
||||
|
||||
newCall := func(t *testing.T, ctx context.Context) (wait func(context.Context) error) {
|
||||
mock.calls = 0
|
||||
done := make(chan error)
|
||||
|
||||
go func() {
|
||||
done <- backend.Call("", "", "", &stripe.Params{Context: ctx}, nil)
|
||||
}()
|
||||
|
||||
wait = func(ctx context.Context) error {
|
||||
select {
|
||||
case err := <-done:
|
||||
return err
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
return wait
|
||||
}
|
||||
|
||||
t.Run("backoff intervals", func(t *testing.T) {
|
||||
ctx := testcontext.New(t)
|
||||
wait := newCall(t, ctx)
|
||||
|
||||
expectedBackoff := retryCfg.InitialBackoff
|
||||
for i := 0; i < int(retryCfg.MaxRetries); i++ {
|
||||
if !tm.BlockThenAdvance(ctx, 1, expectedBackoff) {
|
||||
t.Fatal("failed waiting for the client to attempt retry", i+1)
|
||||
}
|
||||
expectedBackoff *= 2
|
||||
if expectedBackoff > retryCfg.MaxBackoff {
|
||||
expectedBackoff = retryCfg.MaxBackoff
|
||||
}
|
||||
}
|
||||
|
||||
require.Error(t, wait(ctx), backendError)
|
||||
require.Equal(t, retryCfg.MaxRetries+1, mock.calls)
|
||||
})
|
||||
|
||||
t.Run("context cancellation", func(t *testing.T) {
|
||||
t.Run("before retries", func(t *testing.T) {
|
||||
ctx := testcontext.New(t)
|
||||
subCtx, cancel := context.WithCancel(ctx)
|
||||
cancel()
|
||||
|
||||
wait := newCall(t, subCtx)
|
||||
require.ErrorIs(t, wait(ctx), context.Canceled)
|
||||
require.Zero(t, mock.calls)
|
||||
})
|
||||
|
||||
t.Run("during retries", func(t *testing.T) {
|
||||
ctx := testcontext.New(t)
|
||||
subCtx, cancel := context.WithCancel(ctx)
|
||||
wait := newCall(t, subCtx)
|
||||
|
||||
if !tm.BlockThenAdvance(ctx, 1, retryCfg.InitialBackoff) {
|
||||
t.Fatal("failed waiting for the client to attempt first retry")
|
||||
}
|
||||
|
||||
cancel()
|
||||
require.ErrorIs(t, wait(ctx), context.Canceled)
|
||||
require.Equal(t, int64(2), mock.calls)
|
||||
})
|
||||
})
|
||||
}
|
@ -46,6 +46,7 @@ type Config struct {
|
||||
AutoAdvance bool `help:"toggle autoadvance feature for invoice creation" default:"false"`
|
||||
ListingLimit int `help:"sets the maximum amount of items before we start paging on requests" default:"100" hidden:"true"`
|
||||
SkipEmptyInvoices bool `help:"if set, skips the creation of empty invoices for customers with zero usage for the billing period" default:"true"`
|
||||
Retries RetryConfig
|
||||
}
|
||||
|
||||
// Service is an implementation for payment service via Stripe and Coinpayments.
|
||||
|
12
scripts/testdata/satellite-config.yaml.lock
vendored
12
scripts/testdata/satellite-config.yaml.lock
vendored
@ -862,6 +862,18 @@ identity.key-path: /root/.local/share/storj/identity/satellite/identity.key
|
||||
# toggle autoadvance feature for invoice creation
|
||||
# payments.stripe-coin-payments.auto-advance: false
|
||||
|
||||
# the duration of the first retry interval
|
||||
# payments.stripe-coin-payments.retries.initial-backoff: 20ms
|
||||
|
||||
# the maximum duration of any retry interval
|
||||
# payments.stripe-coin-payments.retries.max-backoff: 5s
|
||||
|
||||
# the maximum number of times to retry a request
|
||||
# payments.stripe-coin-payments.retries.max-retries: 10
|
||||
|
||||
# the factor by which the retry interval will be multiplied on each iteration
|
||||
# payments.stripe-coin-payments.retries.multiplier: 2
|
||||
|
||||
# if set, skips the creation of empty invoices for customers with zero usage for the billing period
|
||||
# payments.stripe-coin-payments.skip-empty-invoices: true
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user