From 31f5e2cb65751c7534d43c29e47ff4f23524a7fb Mon Sep 17 00:00:00 2001 From: Michal Niewrzal Date: Wed, 29 Mar 2023 17:12:16 +0200 Subject: [PATCH] satellite/metainfo: limit uploads to the same location We would like to have the ability to limit burst uploads to a single object (the same location). With this change we limit such uploads to one per second. Change-Id: Ib9351df1017cbc07d7fc2f846c2dbdbfcd3a360c --- satellite/metainfo/config.go | 9 ++++ satellite/metainfo/endpoint.go | 43 +++++++++-------- satellite/metainfo/endpoint_object.go | 4 ++ satellite/metainfo/endpoint_object_test.go | 51 +++++++++++++++++++++ satellite/metainfo/validation.go | 21 +++++++++ scripts/testdata/satellite-config.yaml.lock | 9 ++++ 6 files changed, 118 insertions(+), 19 deletions(-) diff --git a/satellite/metainfo/config.go b/satellite/metainfo/config.go index fae6bf9bd..747892de9 100644 --- a/satellite/metainfo/config.go +++ b/satellite/metainfo/config.go @@ -112,6 +112,14 @@ type RateLimiterConfig struct { CacheExpiration time.Duration `help:"how long to cache the projects limiter." releaseDefault:"10m" devDefault:"10s"` } +// UploadLimiterConfig is a configuration struct for endpoint upload limiting. +type UploadLimiterConfig struct { + Enabled bool `help:"whether rate limiting is enabled." releaseDefault:"true" devDefault:"true"` + SingleObjectLimit time.Duration `help:"how often we can upload to the single object (the same location) per API instance" default:"1s" devDefault:"1ms"` + + CacheCapacity int `help:"number of object locations to cache." releaseDefault:"10000" devDefault:"10" testDefault:"100"` +} + // ProjectLimitConfig is a configuration struct for default project limits. type ProjectLimitConfig struct { MaxBuckets int `help:"max bucket count for a project." 
default:"100" testDefault:"10"` @@ -134,6 +142,7 @@ type Config struct { RS RSConfig `releaseDefault:"29/35/80/110-256B" devDefault:"4/6/8/10-256B" help:"redundancy scheme configuration in the format k/m/o/n-sharesize"` SegmentLoop segmentloop.Config `help:"segment loop configuration"` RateLimiter RateLimiterConfig `help:"rate limiter configuration"` + UploadLimiter UploadLimiterConfig `help:"object upload limiter configuration"` ProjectLimits ProjectLimitConfig `help:"project limit configuration"` PieceDeletion piecedeletion.Config `help:"piece deletion configuration"` // TODO remove this flag when server-side copy implementation will be finished diff --git a/satellite/metainfo/endpoint.go b/satellite/metainfo/endpoint.go index 9a10a39f4..3898c9336 100644 --- a/satellite/metainfo/endpoint.go +++ b/satellite/metainfo/endpoint.go @@ -58,25 +58,26 @@ type APIKeys interface { type Endpoint struct { pb.DRPCMetainfoUnimplementedServer - log *zap.Logger - buckets *buckets.Service - metabase *metabase.DB - deletePieces *piecedeletion.Service - orders *orders.Service - overlay *overlay.Service - attributions attribution.DB - pointerVerification *pointerverification.Service - projectUsage *accounting.Service - projectLimits *accounting.ProjectLimitCache - projects console.Projects - apiKeys APIKeys - satellite signing.Signer - limiterCache *lrucache.ExpiringLRU - encInlineSegmentSize int64 // max inline segment size + encryption overhead - revocations revocation.DB - defaultRS *pb.RedundancyScheme - config Config - versionCollector *versionCollector + log *zap.Logger + buckets *buckets.Service + metabase *metabase.DB + deletePieces *piecedeletion.Service + orders *orders.Service + overlay *overlay.Service + attributions attribution.DB + pointerVerification *pointerverification.Service + projectUsage *accounting.Service + projectLimits *accounting.ProjectLimitCache + projects console.Projects + apiKeys APIKeys + satellite signing.Signer + limiterCache *lrucache.ExpiringLRU + 
singleObjectLimitCache *lrucache.ExpiringLRU + encInlineSegmentSize int64 // max inline segment size + encryption overhead + revocations revocation.DB + defaultRS *pb.RedundancyScheme + config Config + versionCollector *versionCollector } // NewEndpoint creates new metainfo endpoint instance. @@ -123,6 +124,10 @@ func NewEndpoint(log *zap.Logger, buckets *buckets.Service, metabaseDB *metabase Expiration: config.RateLimiter.CacheExpiration, Name: "metainfo-ratelimit", }), + singleObjectLimitCache: lrucache.New(lrucache.Options{ + Expiration: config.UploadLimiter.SingleObjectLimit, + Capacity: config.UploadLimiter.CacheCapacity, + }), encInlineSegmentSize: encInlineSegmentSize, revocations: revocations, defaultRS: defaultRSScheme, diff --git a/satellite/metainfo/endpoint_object.go b/satellite/metainfo/endpoint_object.go index 0cb411485..58f53f7f1 100644 --- a/satellite/metainfo/endpoint_object.go +++ b/satellite/metainfo/endpoint_object.go @@ -79,6 +79,10 @@ func (endpoint *Endpoint) BeginObject(ctx context.Context, req *pb.ObjectBeginRe return nil, err } + if err := endpoint.checkObjectUploadRate(keyInfo.ProjectID, req.Bucket, req.EncryptedObjectKey); err != nil { + return nil, err + } + // TODO this needs to be optimized to avoid DB call on each request placement, err := endpoint.buckets.GetBucketPlacement(ctx, req.Bucket, keyInfo.ProjectID) if err != nil { diff --git a/satellite/metainfo/endpoint_object_test.go b/satellite/metainfo/endpoint_object_test.go index 7e7271f16..403bacd41 100644 --- a/satellite/metainfo/endpoint_object_test.go +++ b/satellite/metainfo/endpoint_object_test.go @@ -660,6 +660,57 @@ func TestEndpoint_Object_No_StorageNodes(t *testing.T) { }) } }) + + }) +} + +func TestEndpoint_Object_UploadLimit(t *testing.T) { + testplanet.Run(t, testplanet.Config{ + SatelliteCount: 1, UplinkCount: 1, + Reconfigure: testplanet.Reconfigure{ + Satellite: func(log *zap.Logger, index int, config *satellite.Config) { + 
config.Metainfo.UploadLimiter.SingleObjectLimit = 200 * time.Millisecond + }, + }, + }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { + apiKey := planet.Uplinks[0].APIKey[planet.Satellites[0].ID()] + + metainfoClient, err := planet.Uplinks[0].DialMetainfo(ctx, planet.Satellites[0], apiKey) + require.NoError(t, err) + defer ctx.Check(metainfoClient.Close) + + bucketName := "testbucket" + deleteBucket := func() error { + _, err := metainfoClient.DeleteBucket(ctx, metaclient.DeleteBucketParams{ + Name: []byte(bucketName), + DeleteAll: true, + }) + return err + } + + t.Run("limit single object upload", func(t *testing.T) { + defer ctx.Check(deleteBucket) + + // upload to the same location one by one should fail + err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], bucketName, "single-object", []byte("test")) + require.NoError(t, err) + + err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], bucketName, "single-object", []byte("test")) + require.Error(t, err) + require.True(t, errs2.IsRPC(err, rpcstatus.ResourceExhausted)) + + time.Sleep(500 * time.Millisecond) + + err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], bucketName, "single-object", []byte("test")) + require.NoError(t, err) + + // upload to different locations one by one should NOT fail + err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], bucketName, "single-objectA", []byte("test")) + require.NoError(t, err) + + err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], bucketName, "single-objectB", []byte("test")) + require.NoError(t, err) + }) }) } diff --git a/satellite/metainfo/validation.go b/satellite/metainfo/validation.go index 4130a8eee..802983bcc 100644 --- a/satellite/metainfo/validation.go +++ b/satellite/metainfo/validation.go @@ -9,6 +9,7 @@ import ( "crypto/subtle" "regexp" "strconv" + "strings" "time" "github.com/jtolio/eventkit" @@ -492,3 +493,23 @@ func (endpoint *Endpoint) checkEncryptedMetadataSize(encryptedMetadata, encrypte } return 
nil } + +func (endpoint *Endpoint) checkObjectUploadRate(projectID uuid.UUID, bucketName []byte, objectKey []byte) error { + if !endpoint.config.UploadLimiter.Enabled { + return nil + } + + limited := true + // if object location is in cache it means that we won't allow to upload yet here, + // if it's not or internally key expired we are good to go + key := strings.Join([]string{string(projectID[:]), string(bucketName), string(objectKey)}, "/") + _, _ = endpoint.singleObjectLimitCache.Get(key, func() (interface{}, error) { + limited = false + return struct{}{}, nil + }) + if limited { + return rpcstatus.Error(rpcstatus.ResourceExhausted, "Too Many Requests") + } + + return nil +} diff --git a/scripts/testdata/satellite-config.yaml.lock b/scripts/testdata/satellite-config.yaml.lock index d91a89586..a30881249 100755 --- a/scripts/testdata/satellite-config.yaml.lock +++ b/scripts/testdata/satellite-config.yaml.lock @@ -679,6 +679,15 @@ identity.key-path: /root/.local/share/storj/identity/satellite/identity.key # test the new query for non-recursive listing # metainfo.test-listing-query: false +# number of object locations to cache. +# metainfo.upload-limiter.cache-capacity: 10000 + +# whether rate limiting is enabled. +# metainfo.upload-limiter.enabled: true + +# how often we can upload to the single object (the same location) per API instance +# metainfo.upload-limiter.single-object-limit: 1s + # address(es) to send telemetry to (comma-separated) # metrics.addr: collectora.storj.io:9000