satellite/metainfo: limit uploads to the same location

We would like to have ability to limit burst uploads to the single
object (the same location). This change we are limiting such upload to
one per second.

Change-Id: Ib9351df1017cbc07d7fc2f846c2dbdbfcd3a360c
This commit is contained in:
Michal Niewrzal 2023-03-29 17:12:16 +02:00 committed by Storj Robot
parent c1f1aacffe
commit 31f5e2cb65
6 changed files with 118 additions and 19 deletions

View File

@ -112,6 +112,14 @@ type RateLimiterConfig struct {
CacheExpiration time.Duration `help:"how long to cache the projects limiter." releaseDefault:"10m" devDefault:"10s"`
}
// UploadLimiterConfig is a configuration struct for endpoint upload limiting.
type UploadLimiterConfig struct {
Enabled bool `help:"whether rate limiting is enabled." releaseDefault:"true" devDefault:"true"`
SingleObjectLimit time.Duration `help:"how often we can upload to the single object (the same location) per API instance" default:"1s" devDefault:"1ms"`
CacheCapacity int `help:"number of object locations to cache." releaseDefault:"10000" devDefault:"10" testDefault:"100"`
}
// ProjectLimitConfig is a configuration struct for default project limits.
type ProjectLimitConfig struct {
MaxBuckets int `help:"max bucket count for a project." default:"100" testDefault:"10"`
@ -134,6 +142,7 @@ type Config struct {
RS RSConfig `releaseDefault:"29/35/80/110-256B" devDefault:"4/6/8/10-256B" help:"redundancy scheme configuration in the format k/m/o/n-sharesize"`
SegmentLoop segmentloop.Config `help:"segment loop configuration"`
RateLimiter RateLimiterConfig `help:"rate limiter configuration"`
UploadLimiter UploadLimiterConfig `help:"object upload limiter configuration"`
ProjectLimits ProjectLimitConfig `help:"project limit configuration"`
PieceDeletion piecedeletion.Config `help:"piece deletion configuration"`
// TODO remove this flag when server-side copy implementation will be finished

View File

@ -58,25 +58,26 @@ type APIKeys interface {
type Endpoint struct {
pb.DRPCMetainfoUnimplementedServer
log *zap.Logger
buckets *buckets.Service
metabase *metabase.DB
deletePieces *piecedeletion.Service
orders *orders.Service
overlay *overlay.Service
attributions attribution.DB
pointerVerification *pointerverification.Service
projectUsage *accounting.Service
projectLimits *accounting.ProjectLimitCache
projects console.Projects
apiKeys APIKeys
satellite signing.Signer
limiterCache *lrucache.ExpiringLRU
encInlineSegmentSize int64 // max inline segment size + encryption overhead
revocations revocation.DB
defaultRS *pb.RedundancyScheme
config Config
versionCollector *versionCollector
log *zap.Logger
buckets *buckets.Service
metabase *metabase.DB
deletePieces *piecedeletion.Service
orders *orders.Service
overlay *overlay.Service
attributions attribution.DB
pointerVerification *pointerverification.Service
projectUsage *accounting.Service
projectLimits *accounting.ProjectLimitCache
projects console.Projects
apiKeys APIKeys
satellite signing.Signer
limiterCache *lrucache.ExpiringLRU
singleObjectLimitCache *lrucache.ExpiringLRU
encInlineSegmentSize int64 // max inline segment size + encryption overhead
revocations revocation.DB
defaultRS *pb.RedundancyScheme
config Config
versionCollector *versionCollector
}
// NewEndpoint creates new metainfo endpoint instance.
@ -123,6 +124,10 @@ func NewEndpoint(log *zap.Logger, buckets *buckets.Service, metabaseDB *metabase
Expiration: config.RateLimiter.CacheExpiration,
Name: "metainfo-ratelimit",
}),
singleObjectLimitCache: lrucache.New(lrucache.Options{
Expiration: config.UploadLimiter.SingleObjectLimit,
Capacity: config.UploadLimiter.CacheCapacity,
}),
encInlineSegmentSize: encInlineSegmentSize,
revocations: revocations,
defaultRS: defaultRSScheme,

View File

@ -79,6 +79,10 @@ func (endpoint *Endpoint) BeginObject(ctx context.Context, req *pb.ObjectBeginRe
return nil, err
}
if err := endpoint.checkObjectUploadRate(keyInfo.ProjectID, req.Bucket, req.EncryptedObjectKey); err != nil {
return nil, err
}
// TODO this needs to be optimized to avoid DB call on each request
placement, err := endpoint.buckets.GetBucketPlacement(ctx, req.Bucket, keyInfo.ProjectID)
if err != nil {

View File

@ -660,6 +660,57 @@ func TestEndpoint_Object_No_StorageNodes(t *testing.T) {
})
}
})
})
}
func TestEndpoint_Object_UploadLimit(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
config.Metainfo.UploadLimiter.SingleObjectLimit = 200 * time.Millisecond
},
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
apiKey := planet.Uplinks[0].APIKey[planet.Satellites[0].ID()]
metainfoClient, err := planet.Uplinks[0].DialMetainfo(ctx, planet.Satellites[0], apiKey)
require.NoError(t, err)
defer ctx.Check(metainfoClient.Close)
bucketName := "testbucket"
deleteBucket := func() error {
_, err := metainfoClient.DeleteBucket(ctx, metaclient.DeleteBucketParams{
Name: []byte(bucketName),
DeleteAll: true,
})
return err
}
t.Run("limit single object upload", func(t *testing.T) {
defer ctx.Check(deleteBucket)
// upload to the same location one by one should fail
err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], bucketName, "single-object", []byte("test"))
require.NoError(t, err)
err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], bucketName, "single-object", []byte("test"))
require.Error(t, err)
require.True(t, errs2.IsRPC(err, rpcstatus.ResourceExhausted))
time.Sleep(500 * time.Millisecond)
err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], bucketName, "single-object", []byte("test"))
require.NoError(t, err)
// upload to different locations one by one should NOT fail
err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], bucketName, "single-objectA", []byte("test"))
require.NoError(t, err)
err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], bucketName, "single-objectB", []byte("test"))
require.NoError(t, err)
})
})
}

View File

@ -9,6 +9,7 @@ import (
"crypto/subtle"
"regexp"
"strconv"
"strings"
"time"
"github.com/jtolio/eventkit"
@ -492,3 +493,23 @@ func (endpoint *Endpoint) checkEncryptedMetadataSize(encryptedMetadata, encrypte
}
return nil
}
func (endpoint *Endpoint) checkObjectUploadRate(projectID uuid.UUID, bucketName []byte, objectKey []byte) error {
if !endpoint.config.UploadLimiter.Enabled {
return nil
}
limited := true
// if object location is in cache it means that we won't allow to upload yet here,
// if it's not or internally key expired we are good to go
key := strings.Join([]string{string(projectID[:]), string(bucketName), string(objectKey)}, "/")
_, _ = endpoint.singleObjectLimitCache.Get(key, func() (interface{}, error) {
limited = false
return struct{}{}, nil
})
if limited {
return rpcstatus.Error(rpcstatus.ResourceExhausted, "Too Many Requests")
}
return nil
}

View File

@ -679,6 +679,15 @@ identity.key-path: /root/.local/share/storj/identity/satellite/identity.key
# test the new query for non-recursive listing
# metainfo.test-listing-query: false
# number of object locations to cache.
# metainfo.upload-limiter.cache-capacity: 10000
# whether rate limiting is enabled.
# metainfo.upload-limiter.enabled: true
# how often we can upload to the single object (the same location) per API instance
# metainfo.upload-limiter.single-object-limit: 1s
# address(es) to send telemetry to (comma-separated)
# metrics.addr: collectora.storj.io:9000