From 01f0e602b450a66fcc220b12388fac1f182e3706 Mon Sep 17 00:00:00 2001 From: Jeremy Wharton Date: Fri, 3 Mar 2023 02:20:28 -0600 Subject: [PATCH] satellite/payments/stripecoinpayments: retry API calls with exp backoff A Stripe backend implementation has been added that uses an exponential backoff strategy to retry failed API calls. This behavior can be configured in the satellite config. References #5156 Change-Id: I16ff21a39775ea331c442457f976be0c95a7b695 --- .../payments/stripecoinpayments/client.go | 152 +++++++++++++++++- .../stripecoinpayments/client_test.go | 133 +++++++++++++++ .../payments/stripecoinpayments/service.go | 1 + scripts/testdata/satellite-config.yaml.lock | 12 ++ 4 files changed, 291 insertions(+), 7 deletions(-) create mode 100644 satellite/payments/stripecoinpayments/client_test.go diff --git a/satellite/payments/stripecoinpayments/client.go b/satellite/payments/stripecoinpayments/client.go index 796d9922c..297fe660f 100644 --- a/satellite/payments/stripecoinpayments/client.go +++ b/satellite/payments/stripecoinpayments/client.go @@ -4,15 +4,25 @@ package stripecoinpayments import ( + "bytes" + "context" + "errors" + "math" + "net/http" + "time" + "github.com/stripe/stripe-go/v72" "github.com/stripe/stripe-go/v72/charge" "github.com/stripe/stripe-go/v72/client" "github.com/stripe/stripe-go/v72/customerbalancetransaction" + "github.com/stripe/stripe-go/v72/form" "github.com/stripe/stripe-go/v72/invoice" "github.com/stripe/stripe-go/v72/invoiceitem" "github.com/stripe/stripe-go/v72/paymentmethod" "github.com/stripe/stripe-go/v72/promotioncode" "go.uber.org/zap" + + "storj.io/common/time2" ) // StripeClient Stripe client interface. @@ -119,17 +129,145 @@ func (s *stripeClient) CreditNotes() StripeCreditNotes { // NewStripeClient creates Stripe client from configuration. func NewStripeClient(log *zap.Logger, config Config) StripeClient { - backendConfig := &stripe.BackendConfig{ - LeveledLogger: log.Sugar(), - } - sClient := client.New(config.StripeSecretKey, &stripe.Backends{ - API: stripe.GetBackendWithConfig(stripe.APIBackend, backendConfig), - Connect: stripe.GetBackendWithConfig(stripe.ConnectBackend, backendConfig), - Uploads: stripe.GetBackendWithConfig(stripe.UploadsBackend, backendConfig), + API: NewBackendWrapper(log, stripe.APIBackend, config.Retries), + Connect: NewBackendWrapper(log, stripe.ConnectBackend, config.Retries), + Uploads: NewBackendWrapper(log, stripe.UploadsBackend, config.Retries), }, ) return &stripeClient{client: sClient} } + +// RetryConfig contains the configuration for an exponential backoff strategy when retrying Stripe API calls. +type RetryConfig struct { + InitialBackoff time.Duration `help:"the duration of the first retry interval" default:"20ms"` + MaxBackoff time.Duration `help:"the maximum duration of any retry interval" default:"5s"` + Multiplier float64 `help:"the factor by which the retry interval will be multiplied on each iteration" default:"2"` + MaxRetries int64 `help:"the maximum number of times to retry a request" default:"10"` +} + +// BackendWrapper is a wrapper for the Stripe backend that uses an exponential backoff strategy for retrying Stripe API calls. +type BackendWrapper struct { + backend stripe.Backend + retryCfg RetryConfig + clock time2.Clock +} + +// NewBackendWrapper creates a new wrapper for a Stripe backend. +func NewBackendWrapper(log *zap.Logger, backendType stripe.SupportedBackend, retryCfg RetryConfig) *BackendWrapper { + backendConfig := &stripe.BackendConfig{ + LeveledLogger: log.Sugar(), + // Disable internal retries since we have our own retry+backoff strategy. + MaxNetworkRetries: stripe.Int64(0), + } + + return &BackendWrapper{ + retryCfg: retryCfg, + backend: stripe.GetBackendWithConfig(backendType, backendConfig), + } +} + +// TestSwapBackend replaces the wrapped backend with the one specified for use in testing. +func (w *BackendWrapper) TestSwapBackend(backend stripe.Backend) { + w.backend = backend +} + +// TestSwapClock replaces the internal clock with the one specified for use in testing. +func (w *BackendWrapper) TestSwapClock(clock time2.Clock) { + w.clock = clock +} + +// Call implements the stripe.Backend interface. +func (w *BackendWrapper) Call(method, path, key string, params stripe.ParamsContainer, v stripe.LastResponseSetter) error { + return w.withRetries(params, func() error { + return w.backend.Call(method, path, key, params, v) + }) +} + +// CallStreaming implements the stripe.Backend interface. +func (w *BackendWrapper) CallStreaming(method, path, key string, params stripe.ParamsContainer, v stripe.StreamingLastResponseSetter) error { + return w.withRetries(params, func() error { + return w.backend.CallStreaming(method, path, key, params, v) + }) +} + +// CallRaw implements the stripe.Backend interface. +func (w *BackendWrapper) CallRaw(method, path, key string, body *form.Values, params *stripe.Params, v stripe.LastResponseSetter) error { + return w.withRetries(params, func() error { + return w.backend.CallRaw(method, path, key, body, params, v) + }) +} + +// CallMultipart implements the stripe.Backend interface. +func (w *BackendWrapper) CallMultipart(method, path, key, boundary string, body *bytes.Buffer, params *stripe.Params, v stripe.LastResponseSetter) error { + return w.withRetries(params, func() error { + return w.backend.CallMultipart(method, path, key, boundary, body, params, v) + }) +} + +// SetMaxNetworkRetries sets the maximum number of times to retry failed requests. +func (w *BackendWrapper) SetMaxNetworkRetries(max int64) { + w.retryCfg.MaxRetries = max +} + +// withRetries executes the provided Stripe API call using an exponential backoff strategy +// for retrying in the case of failure. +func (w *BackendWrapper) withRetries(params stripe.ParamsContainer, call func() error) error { + ctx := context.Background() + if params != nil { + innerParams := params.GetParams() + if innerParams != nil && innerParams.Context != nil { + ctx = innerParams.Context + } + } + + if err := ctx.Err(); err != nil { + return err + } + + backoff := float64(w.retryCfg.InitialBackoff) + for retry := int64(0); ; retry++ { + err := call() + if err == nil { + return nil + } + + if !w.shouldRetry(retry, err) { + return err + } + + if !w.clock.Sleep(ctx, time.Duration(backoff)) { + return ctx.Err() + } + + backoff = math.Min(backoff*w.retryCfg.Multiplier, float64(w.retryCfg.MaxBackoff)) + } +} + +// shouldRetry returns whether a Stripe API call should be retried. +func (w *BackendWrapper) shouldRetry(retry int64, err error) bool { + if retry >= w.retryCfg.MaxRetries { + return false + } + + var stripeErr *stripe.Error + if !errors.As(err, &stripeErr) { + return false + } + + resp := stripeErr.LastResponse + if resp == nil { + return false + } + + switch resp.Header.Get("Stripe-Should-Retry") { + case "true": + return true + case "false": + return false + } + + return resp.StatusCode == http.StatusTooManyRequests +} diff --git a/satellite/payments/stripecoinpayments/client_test.go b/satellite/payments/stripecoinpayments/client_test.go new file mode 100644 index 000000000..7c8cdf675 --- /dev/null +++ b/satellite/payments/stripecoinpayments/client_test.go @@ -0,0 +1,133 @@ +// Copyright (C) 2023 Storj Labs, Inc. +// See LICENSE for copying information. + +package stripecoinpayments_test + +import ( + "bytes" + "context" + "net/http" + "testing" + "time" + + "github.com/stretchr/testify/require" + "github.com/stripe/stripe-go/v72" + "github.com/stripe/stripe-go/v72/form" + "go.uber.org/zap/zaptest" + + "storj.io/common/testcontext" + "storj.io/common/time2" + "storj.io/storj/satellite/payments/stripecoinpayments" +) + +var backendError = &stripe.Error{ + APIResource: stripe.APIResource{ + LastResponse: &stripe.APIResponse{ + Header: http.Header{ + "Stripe-Should-Retry": []string{"true"}, + }, + StatusCode: http.StatusTooManyRequests, + }, + }, +} + +type mockBackend struct { + calls int64 +} + +func (b *mockBackend) Call(method, path, key string, params stripe.ParamsContainer, v stripe.LastResponseSetter) error { + b.calls++ + return backendError +} + +func (b *mockBackend) CallStreaming(method, path, key string, params stripe.ParamsContainer, v stripe.StreamingLastResponseSetter) error { + return b.Call("", "", "", nil, nil) +} + +func (b *mockBackend) CallRaw(method, path, key string, body *form.Values, params *stripe.Params, v stripe.LastResponseSetter) error { + return b.Call("", "", "", nil, nil) +} + +func (b *mockBackend) CallMultipart(method, path, key, boundary string, body *bytes.Buffer, params *stripe.Params, v stripe.LastResponseSetter) error { + return b.Call("", "", "", nil, nil) +} + +func (b *mockBackend) SetMaxNetworkRetries(max int64) {} + +func TestBackendWrapper(t *testing.T) { + tm := time2.NewMachine() + retryCfg := stripecoinpayments.RetryConfig{ + InitialBackoff: time.Millisecond, + MaxBackoff: 3 * time.Millisecond, + Multiplier: 2, + MaxRetries: 5, + } + backend := stripecoinpayments.NewBackendWrapper(zaptest.NewLogger(t), stripe.APIBackend, retryCfg) + mock := &mockBackend{} + backend.TestSwapBackend(mock) + backend.TestSwapClock(tm.Clock()) + + newCall := func(t *testing.T, ctx context.Context) (wait func(context.Context) error) { + mock.calls = 0 + done := make(chan error) + + go func() { + done <- backend.Call("", "", "", &stripe.Params{Context: ctx}, nil) + }() + + wait = func(ctx context.Context) error { + select { + case err := <-done: + return err + case <-ctx.Done(): + return ctx.Err() + } + } + return wait + } + + t.Run("backoff intervals", func(t *testing.T) { + ctx := testcontext.New(t) + wait := newCall(t, ctx) + + expectedBackoff := retryCfg.InitialBackoff + for i := 0; i < int(retryCfg.MaxRetries); i++ { + if !tm.BlockThenAdvance(ctx, 1, expectedBackoff) { + t.Fatal("failed waiting for the client to attempt retry", i+1) + } + expectedBackoff *= 2 + if expectedBackoff > retryCfg.MaxBackoff { + expectedBackoff = retryCfg.MaxBackoff + } + } + + require.Error(t, wait(ctx), backendError) + require.Equal(t, retryCfg.MaxRetries+1, mock.calls) + }) + + t.Run("context cancellation", func(t *testing.T) { + t.Run("before retries", func(t *testing.T) { + ctx := testcontext.New(t) + subCtx, cancel := context.WithCancel(ctx) + cancel() + + wait := newCall(t, subCtx) + require.ErrorIs(t, wait(ctx), context.Canceled) + require.Zero(t, mock.calls) + }) + + t.Run("during retries", func(t *testing.T) { + ctx := testcontext.New(t) + subCtx, cancel := context.WithCancel(ctx) + wait := newCall(t, subCtx) + + if !tm.BlockThenAdvance(ctx, 1, retryCfg.InitialBackoff) { + t.Fatal("failed waiting for the client to attempt first retry") + } + + cancel() + require.ErrorIs(t, wait(ctx), context.Canceled) + require.Equal(t, int64(2), mock.calls) + }) + }) +} diff --git a/satellite/payments/stripecoinpayments/service.go b/satellite/payments/stripecoinpayments/service.go index 32befb2f6..2e83af7ed 100644 --- a/satellite/payments/stripecoinpayments/service.go +++ b/satellite/payments/stripecoinpayments/service.go @@ -46,6 +46,7 @@ type Config struct { AutoAdvance bool `help:"toggle autoadvance feature for invoice creation" default:"false"` ListingLimit int `help:"sets the maximum amount of items before we start paging on requests" default:"100" hidden:"true"` SkipEmptyInvoices bool `help:"if set, skips the creation of empty invoices for customers with zero usage for the billing period" default:"true"` + Retries RetryConfig } // Service is an implementation for payment service via Stripe and Coinpayments. diff --git a/scripts/testdata/satellite-config.yaml.lock b/scripts/testdata/satellite-config.yaml.lock index 8da8e2406..aa113c632 100755 --- a/scripts/testdata/satellite-config.yaml.lock +++ b/scripts/testdata/satellite-config.yaml.lock @@ -862,6 +862,18 @@ identity.key-path: /root/.local/share/storj/identity/satellite/identity.key # toggle autoadvance feature for invoice creation # payments.stripe-coin-payments.auto-advance: false +# the duration of the first retry interval +# payments.stripe-coin-payments.retries.initial-backoff: 20ms + +# the maximum duration of any retry interval +# payments.stripe-coin-payments.retries.max-backoff: 5s + +# the maximum number of times to retry a request +# payments.stripe-coin-payments.retries.max-retries: 10 + +# the factor by which the retry interval will be multiplied on each iteration +# payments.stripe-coin-payments.retries.multiplier: 2 + # if set, skips the creation of empty invoices for customers with zero usage for the billing period # payments.stripe-coin-payments.skip-empty-invoices: true