diff --git a/satellite/metainfo/config.go b/satellite/metainfo/config.go index fae6bf9bd..747892de9 100644 --- a/satellite/metainfo/config.go +++ b/satellite/metainfo/config.go @@ -112,6 +112,14 @@ type RateLimiterConfig struct { CacheExpiration time.Duration `help:"how long to cache the projects limiter." releaseDefault:"10m" devDefault:"10s"` } +// UploadLimiterConfig is a configuration struct for endpoint upload limiting. +type UploadLimiterConfig struct { + Enabled bool `help:"whether rate limiting is enabled." releaseDefault:"true" devDefault:"true"` + SingleObjectLimit time.Duration `help:"how often we can upload to the single object (the same location) per API instance" default:"1s" devDefault:"1ms"` + + CacheCapacity int `help:"number of object locations to cache." releaseDefault:"10000" devDefault:"10" testDefault:"100"` +} + // ProjectLimitConfig is a configuration struct for default project limits. type ProjectLimitConfig struct { MaxBuckets int `help:"max bucket count for a project." default:"100" testDefault:"10"` @@ -134,6 +142,7 @@ type Config struct { RS RSConfig `releaseDefault:"29/35/80/110-256B" devDefault:"4/6/8/10-256B" help:"redundancy scheme configuration in the format k/m/o/n-sharesize"` SegmentLoop segmentloop.Config `help:"segment loop configuration"` RateLimiter RateLimiterConfig `help:"rate limiter configuration"` + UploadLimiter UploadLimiterConfig `help:"object upload limiter configuration"` ProjectLimits ProjectLimitConfig `help:"project limit configuration"` PieceDeletion piecedeletion.Config `help:"piece deletion configuration"` // TODO remove this flag when server-side copy implementation will be finished diff --git a/satellite/metainfo/endpoint.go b/satellite/metainfo/endpoint.go index 9a10a39f4..3898c9336 100644 --- a/satellite/metainfo/endpoint.go +++ b/satellite/metainfo/endpoint.go @@ -58,25 +58,26 @@ type APIKeys interface { type Endpoint struct { pb.DRPCMetainfoUnimplementedServer - log *zap.Logger - buckets *buckets.Service - metabase *metabase.DB - deletePieces *piecedeletion.Service - orders *orders.Service - overlay *overlay.Service - attributions attribution.DB - pointerVerification *pointerverification.Service - projectUsage *accounting.Service - projectLimits *accounting.ProjectLimitCache - projects console.Projects - apiKeys APIKeys - satellite signing.Signer - limiterCache *lrucache.ExpiringLRU - encInlineSegmentSize int64 // max inline segment size + encryption overhead - revocations revocation.DB - defaultRS *pb.RedundancyScheme - config Config - versionCollector *versionCollector + log *zap.Logger + buckets *buckets.Service + metabase *metabase.DB + deletePieces *piecedeletion.Service + orders *orders.Service + overlay *overlay.Service + attributions attribution.DB + pointerVerification *pointerverification.Service + projectUsage *accounting.Service + projectLimits *accounting.ProjectLimitCache + projects console.Projects + apiKeys APIKeys + satellite signing.Signer + limiterCache *lrucache.ExpiringLRU + singleObjectLimitCache *lrucache.ExpiringLRU + encInlineSegmentSize int64 // max inline segment size + encryption overhead + revocations revocation.DB + defaultRS *pb.RedundancyScheme + config Config + versionCollector *versionCollector } // NewEndpoint creates new metainfo endpoint instance. @@ -123,6 +124,10 @@ func NewEndpoint(log *zap.Logger, buckets *buckets.Service, metabaseDB *metabase Expiration: config.RateLimiter.CacheExpiration, Name: "metainfo-ratelimit", }), + singleObjectLimitCache: lrucache.New(lrucache.Options{ + Expiration: config.UploadLimiter.SingleObjectLimit, + Capacity: config.UploadLimiter.CacheCapacity, + }), encInlineSegmentSize: encInlineSegmentSize, revocations: revocations, defaultRS: defaultRSScheme, diff --git a/satellite/metainfo/endpoint_object.go b/satellite/metainfo/endpoint_object.go index 0cb411485..58f53f7f1 100644 --- a/satellite/metainfo/endpoint_object.go +++ b/satellite/metainfo/endpoint_object.go @@ -79,6 +79,10 @@ func (endpoint *Endpoint) BeginObject(ctx context.Context, req *pb.ObjectBeginRe return nil, err } + if err := endpoint.checkObjectUploadRate(keyInfo.ProjectID, req.Bucket, req.EncryptedObjectKey); err != nil { + return nil, err + } + // TODO this needs to be optimized to avoid DB call on each request placement, err := endpoint.buckets.GetBucketPlacement(ctx, req.Bucket, keyInfo.ProjectID) if err != nil { diff --git a/satellite/metainfo/endpoint_object_test.go b/satellite/metainfo/endpoint_object_test.go index 7e7271f16..403bacd41 100644 --- a/satellite/metainfo/endpoint_object_test.go +++ b/satellite/metainfo/endpoint_object_test.go @@ -660,6 +660,57 @@ func TestEndpoint_Object_No_StorageNodes(t *testing.T) { }) } }) + + }) +} + +func TestEndpoint_Object_UploadLimit(t *testing.T) { + testplanet.Run(t, testplanet.Config{ + SatelliteCount: 1, UplinkCount: 1, + Reconfigure: testplanet.Reconfigure{ + Satellite: func(log *zap.Logger, index int, config *satellite.Config) { + config.Metainfo.UploadLimiter.SingleObjectLimit = 200 * time.Millisecond + }, + }, + }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { + apiKey := planet.Uplinks[0].APIKey[planet.Satellites[0].ID()] + + metainfoClient, err := planet.Uplinks[0].DialMetainfo(ctx, planet.Satellites[0], apiKey) + require.NoError(t, err) + defer ctx.Check(metainfoClient.Close) + + bucketName := "testbucket" + deleteBucket := func() error { + _, err := metainfoClient.DeleteBucket(ctx, metaclient.DeleteBucketParams{ + Name: []byte(bucketName), + DeleteAll: true, + }) + return err + } + + t.Run("limit single object upload", func(t *testing.T) { + defer ctx.Check(deleteBucket) + + // upload to the same location one by one should fail + err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], bucketName, "single-object", []byte("test")) + require.NoError(t, err) + + err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], bucketName, "single-object", []byte("test")) + require.Error(t, err) + require.True(t, errs2.IsRPC(err, rpcstatus.ResourceExhausted)) + + time.Sleep(500 * time.Millisecond) + + err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], bucketName, "single-object", []byte("test")) + require.NoError(t, err) + + // upload to different locations one by one should NOT fail + err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], bucketName, "single-objectA", []byte("test")) + require.NoError(t, err) + + err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], bucketName, "single-objectB", []byte("test")) + require.NoError(t, err) + }) }) } diff --git a/satellite/metainfo/validation.go b/satellite/metainfo/validation.go index 4130a8eee..802983bcc 100644 --- a/satellite/metainfo/validation.go +++ b/satellite/metainfo/validation.go @@ -9,6 +9,7 @@ import ( "crypto/subtle" "regexp" "strconv" + "strings" "time" "github.com/jtolio/eventkit" @@ -492,3 +493,23 @@ func (endpoint *Endpoint) checkEncryptedMetadataSize(encryptedMetadata, encrypte } return nil } + +func (endpoint *Endpoint) checkObjectUploadRate(projectID uuid.UUID, bucketName []byte, objectKey []byte) error { + if !endpoint.config.UploadLimiter.Enabled { + return nil + } + + limited := true + // if object location is in cache it means that we won't allow to upload yet here, + // if it's not or internally key expired we are good to go + key := strings.Join([]string{string(projectID[:]), string(bucketName), string(objectKey)}, "/") + _, _ = endpoint.singleObjectLimitCache.Get(key, func() (interface{}, error) { + limited = false + return struct{}{}, nil + }) + if limited { + return rpcstatus.Error(rpcstatus.ResourceExhausted, "Too Many Requests") + } + + return nil +} diff --git a/scripts/testdata/satellite-config.yaml.lock b/scripts/testdata/satellite-config.yaml.lock index d91a89586..a30881249 100755 --- a/scripts/testdata/satellite-config.yaml.lock +++ b/scripts/testdata/satellite-config.yaml.lock @@ -679,6 +679,15 @@ identity.key-path: /root/.local/share/storj/identity/satellite/identity.key # test the new query for non-recursive listing # metainfo.test-listing-query: false +# number of object locations to cache. +# metainfo.upload-limiter.cache-capacity: 10000 + +# whether rate limiting is enabled. +# metainfo.upload-limiter.enabled: true + +# how often we can upload to the single object (the same location) per API instance +# metainfo.upload-limiter.single-object-limit: 1s + # address(es) to send telemetry to (comma-separated) # metrics.addr: collectora.storj.io:9000