satellite/metainfo: use project limit cache with limiter
Metainfo needs to know rate and burst limit to be able to limit users requests. We made cache for per project limiter but to make single instance we need to know about limits. So far we were doing direct DB call to get rate/burst limit for project but it's generating lots of DB requests and can be easily cached as we even have project limit cache. This change extends project limit cache with rate/burst limit and starts using this change while creating project limiter instance for metainfo. Because data size kept in project limit cache is quite small this change also bumps a bit default capacity of the cache. Fixes https://github.com/storj/storj/issues/5663 Change-Id: Icb42ec1632bfa0c9f74857b559083dcbd054d071
This commit is contained in:
parent
59ebb0ef27
commit
ee720040c9
@ -86,11 +86,14 @@ type ProjectObjectsSegments struct {
|
||||
ObjectCount int64 `json:"objectCount"`
|
||||
}
|
||||
|
||||
// ProjectLimits contains the storage, bandwidth and segments limits.
|
||||
// ProjectLimits contains the project limits.
|
||||
type ProjectLimits struct {
|
||||
Usage *int64
|
||||
Bandwidth *int64
|
||||
Segments *int64
|
||||
|
||||
RateLimit *int
|
||||
BurstLimit *int
|
||||
}
|
||||
|
||||
// ProjectDailyUsage holds project daily usage.
|
||||
|
@ -33,7 +33,7 @@ type ProjectLimitDB interface {
|
||||
|
||||
// ProjectLimitConfig is a configuration struct for project limit.
|
||||
type ProjectLimitConfig struct {
|
||||
CacheCapacity int `help:"number of projects to cache." releaseDefault:"10000" devDefault:"100"`
|
||||
CacheCapacity int `help:"number of projects to cache." releaseDefault:"20000" devDefault:"100"`
|
||||
CacheExpiration time.Duration `help:"how long to cache the project limits." releaseDefault:"10m" devDefault:"30s"`
|
||||
}
|
||||
|
||||
|
@ -58,21 +58,33 @@ func TestProjectLimitCache(t *testing.T) {
|
||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||
saPeer := planet.Satellites[0]
|
||||
projectUsageSvc := saPeer.Accounting.ProjectUsage
|
||||
projects := saPeer.DB.Console().Projects()
|
||||
accountingDB := saPeer.DB.ProjectAccounting()
|
||||
projectLimitCache := saPeer.ProjectLimits.Cache
|
||||
defaultUsageLimit := saPeer.Config.Console.UsageLimits.Storage.Free.Int64()
|
||||
defaultBandwidthLimit := saPeer.Config.Console.UsageLimits.Bandwidth.Free.Int64()
|
||||
defaultSegmentLimit := int64(1000000)
|
||||
dbDefaultLimits := accounting.ProjectLimits{Usage: &defaultUsageLimit, Bandwidth: &defaultBandwidthLimit, Segments: &defaultSegmentLimit}
|
||||
dbDefaultLimits := accounting.ProjectLimits{
|
||||
Usage: &defaultUsageLimit,
|
||||
Bandwidth: &defaultBandwidthLimit,
|
||||
Segments: &defaultSegmentLimit,
|
||||
RateLimit: nil,
|
||||
BurstLimit: nil,
|
||||
}
|
||||
|
||||
testProject, err := saPeer.DB.Console().Projects().Insert(ctx, &console.Project{Name: "test", OwnerID: testrand.UUID()})
|
||||
require.NoError(t, err)
|
||||
|
||||
secondTestProject, err := saPeer.DB.Console().Projects().Insert(ctx, &console.Project{Name: "second project", OwnerID: testrand.UUID()})
|
||||
require.NoError(t, err)
|
||||
|
||||
const (
|
||||
errorLimit = 0
|
||||
expectedUsageLimit = 1
|
||||
expectedBandwidthLimit = 2
|
||||
expectedSegmentLimit = 3
|
||||
expectedRateLimit = 4
|
||||
expectedBurstLimit = 5
|
||||
)
|
||||
|
||||
t.Run("project ID doesn't exist", func(t *testing.T) {
|
||||
@ -185,6 +197,15 @@ func TestProjectLimitCache(t *testing.T) {
|
||||
actualSegmentLimitFromDB, err := accountingDB.GetProjectSegmentLimit(ctx, testProject.ID)
|
||||
require.NoError(t, err)
|
||||
require.EqualValues(t, expectedSegmentLimit, *actualSegmentLimitFromDB)
|
||||
|
||||
// rate and burst limit
|
||||
require.NoError(t, projects.UpdateRateLimit(ctx, secondTestProject.ID, expectedRateLimit))
|
||||
require.NoError(t, projects.UpdateBurstLimit(ctx, secondTestProject.ID, expectedBurstLimit))
|
||||
|
||||
limits, err := projectLimitCache.GetLimits(ctx, secondTestProject.ID)
|
||||
require.NoError(t, err)
|
||||
require.EqualValues(t, expectedRateLimit, *limits.RateLimit)
|
||||
require.EqualValues(t, expectedBurstLimit, *limits.BurstLimit)
|
||||
})
|
||||
|
||||
t.Run("cache is used", func(t *testing.T) {
|
||||
|
@ -458,6 +458,7 @@ func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB,
|
||||
peer.DB.PeerIdentities(),
|
||||
peer.DB.Console().APIKeys(),
|
||||
peer.Accounting.ProjectUsage,
|
||||
peer.ProjectLimits.Cache,
|
||||
peer.DB.Console().Projects(),
|
||||
signing.SignerFromFullIdentity(peer.Identity),
|
||||
peer.DB.Revocation(),
|
||||
|
@ -67,6 +67,7 @@ type Endpoint struct {
|
||||
attributions attribution.DB
|
||||
pointerVerification *pointerverification.Service
|
||||
projectUsage *accounting.Service
|
||||
projectLimits *accounting.ProjectLimitCache
|
||||
projects console.Projects
|
||||
apiKeys APIKeys
|
||||
satellite signing.Signer
|
||||
@ -82,7 +83,7 @@ type Endpoint struct {
|
||||
func NewEndpoint(log *zap.Logger, buckets *buckets.Service, metabaseDB *metabase.DB,
|
||||
deletePieces *piecedeletion.Service, orders *orders.Service, cache *overlay.Service,
|
||||
attributions attribution.DB, peerIdentities overlay.PeerIdentities,
|
||||
apiKeys APIKeys, projectUsage *accounting.Service, projects console.Projects,
|
||||
apiKeys APIKeys, projectUsage *accounting.Service, projectLimits *accounting.ProjectLimitCache, projects console.Projects,
|
||||
satellite signing.Signer, revocations revocation.DB, config Config) (*Endpoint, error) {
|
||||
// TODO do something with too many params
|
||||
|
||||
@ -114,6 +115,7 @@ func NewEndpoint(log *zap.Logger, buckets *buckets.Service, metabaseDB *metabase
|
||||
pointerVerification: pointerverification.NewService(peerIdentities),
|
||||
apiKeys: apiKeys,
|
||||
projectUsage: projectUsage,
|
||||
projectLimits: projectLimits,
|
||||
projects: projects,
|
||||
satellite: satellite,
|
||||
limiterCache: lrucache.New(lrucache.Options{
|
||||
|
@ -212,17 +212,17 @@ func (endpoint *Endpoint) checkRate(ctx context.Context, projectID uuid.UUID) (e
|
||||
rateLimit := rate.Limit(endpoint.config.RateLimiter.Rate)
|
||||
burstLimit := int(endpoint.config.RateLimiter.Rate)
|
||||
|
||||
project, err := endpoint.projects.Get(ctx, projectID)
|
||||
limits, err := endpoint.projectLimits.GetLimits(ctx, projectID)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if project.RateLimit != nil {
|
||||
rateLimit = rate.Limit(*project.RateLimit)
|
||||
burstLimit = *project.RateLimit
|
||||
if limits.RateLimit != nil {
|
||||
rateLimit = rate.Limit(*limits.RateLimit)
|
||||
burstLimit = *limits.RateLimit
|
||||
}
|
||||
// use the explicitly set burst value if it's defined
|
||||
if project.BurstLimit != nil {
|
||||
burstLimit = *project.BurstLimit
|
||||
if limits.BurstLimit != nil {
|
||||
burstLimit = *limits.BurstLimit
|
||||
}
|
||||
|
||||
return rate.NewLimiter(rateLimit, burstLimit), nil
|
||||
|
@ -89,7 +89,7 @@ read one (
|
||||
where project.id = ?
|
||||
)
|
||||
read one (
|
||||
select project.bandwidth_limit project.usage_limit project.segment_limit
|
||||
select project.bandwidth_limit project.usage_limit project.segment_limit project.rate_limit project.burst_limit
|
||||
where project.id = ?
|
||||
)
|
||||
|
||||
|
@ -11632,10 +11632,12 @@ type BandwidthLimit_Row struct {
|
||||
BandwidthLimit *int64
|
||||
}
|
||||
|
||||
type BandwidthLimit_UsageLimit_SegmentLimit_Row struct {
|
||||
type BandwidthLimit_UsageLimit_SegmentLimit_RateLimit_BurstLimit_Row struct {
|
||||
BandwidthLimit *int64
|
||||
UsageLimit *int64
|
||||
SegmentLimit *int64
|
||||
RateLimit *int
|
||||
BurstLimit *int
|
||||
}
|
||||
|
||||
type BlockNumber_Row struct {
|
||||
@ -15331,12 +15333,12 @@ func (obj *pgxImpl) Get_Project_MaxBuckets_By_Id(ctx context.Context,
|
||||
|
||||
}
|
||||
|
||||
func (obj *pgxImpl) Get_Project_BandwidthLimit_Project_UsageLimit_Project_SegmentLimit_By_Id(ctx context.Context,
|
||||
func (obj *pgxImpl) Get_Project_BandwidthLimit_Project_UsageLimit_Project_SegmentLimit_Project_RateLimit_Project_BurstLimit_By_Id(ctx context.Context,
|
||||
project_id Project_Id_Field) (
|
||||
row *BandwidthLimit_UsageLimit_SegmentLimit_Row, err error) {
|
||||
row *BandwidthLimit_UsageLimit_SegmentLimit_RateLimit_BurstLimit_Row, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
var __embed_stmt = __sqlbundle_Literal("SELECT projects.bandwidth_limit, projects.usage_limit, projects.segment_limit FROM projects WHERE projects.id = ?")
|
||||
var __embed_stmt = __sqlbundle_Literal("SELECT projects.bandwidth_limit, projects.usage_limit, projects.segment_limit, projects.rate_limit, projects.burst_limit FROM projects WHERE projects.id = ?")
|
||||
|
||||
var __values []interface{}
|
||||
__values = append(__values, project_id.value())
|
||||
@ -15344,10 +15346,10 @@ func (obj *pgxImpl) Get_Project_BandwidthLimit_Project_UsageLimit_Project_Segmen
|
||||
var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt)
|
||||
obj.logStmt(__stmt, __values...)
|
||||
|
||||
row = &BandwidthLimit_UsageLimit_SegmentLimit_Row{}
|
||||
err = obj.queryRowContext(ctx, __stmt, __values...).Scan(&row.BandwidthLimit, &row.UsageLimit, &row.SegmentLimit)
|
||||
row = &BandwidthLimit_UsageLimit_SegmentLimit_RateLimit_BurstLimit_Row{}
|
||||
err = obj.queryRowContext(ctx, __stmt, __values...).Scan(&row.BandwidthLimit, &row.UsageLimit, &row.SegmentLimit, &row.RateLimit, &row.BurstLimit)
|
||||
if err != nil {
|
||||
return (*BandwidthLimit_UsageLimit_SegmentLimit_Row)(nil), obj.makeErr(err)
|
||||
return (*BandwidthLimit_UsageLimit_SegmentLimit_RateLimit_BurstLimit_Row)(nil), obj.makeErr(err)
|
||||
}
|
||||
return row, nil
|
||||
|
||||
@ -22917,12 +22919,12 @@ func (obj *pgxcockroachImpl) Get_Project_MaxBuckets_By_Id(ctx context.Context,
|
||||
|
||||
}
|
||||
|
||||
func (obj *pgxcockroachImpl) Get_Project_BandwidthLimit_Project_UsageLimit_Project_SegmentLimit_By_Id(ctx context.Context,
|
||||
func (obj *pgxcockroachImpl) Get_Project_BandwidthLimit_Project_UsageLimit_Project_SegmentLimit_Project_RateLimit_Project_BurstLimit_By_Id(ctx context.Context,
|
||||
project_id Project_Id_Field) (
|
||||
row *BandwidthLimit_UsageLimit_SegmentLimit_Row, err error) {
|
||||
row *BandwidthLimit_UsageLimit_SegmentLimit_RateLimit_BurstLimit_Row, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
var __embed_stmt = __sqlbundle_Literal("SELECT projects.bandwidth_limit, projects.usage_limit, projects.segment_limit FROM projects WHERE projects.id = ?")
|
||||
var __embed_stmt = __sqlbundle_Literal("SELECT projects.bandwidth_limit, projects.usage_limit, projects.segment_limit, projects.rate_limit, projects.burst_limit FROM projects WHERE projects.id = ?")
|
||||
|
||||
var __values []interface{}
|
||||
__values = append(__values, project_id.value())
|
||||
@ -22930,10 +22932,10 @@ func (obj *pgxcockroachImpl) Get_Project_BandwidthLimit_Project_UsageLimit_Proje
|
||||
var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt)
|
||||
obj.logStmt(__stmt, __values...)
|
||||
|
||||
row = &BandwidthLimit_UsageLimit_SegmentLimit_Row{}
|
||||
err = obj.queryRowContext(ctx, __stmt, __values...).Scan(&row.BandwidthLimit, &row.UsageLimit, &row.SegmentLimit)
|
||||
row = &BandwidthLimit_UsageLimit_SegmentLimit_RateLimit_BurstLimit_Row{}
|
||||
err = obj.queryRowContext(ctx, __stmt, __values...).Scan(&row.BandwidthLimit, &row.UsageLimit, &row.SegmentLimit, &row.RateLimit, &row.BurstLimit)
|
||||
if err != nil {
|
||||
return (*BandwidthLimit_UsageLimit_SegmentLimit_Row)(nil), obj.makeErr(err)
|
||||
return (*BandwidthLimit_UsageLimit_SegmentLimit_RateLimit_BurstLimit_Row)(nil), obj.makeErr(err)
|
||||
}
|
||||
return row, nil
|
||||
|
||||
@ -28096,14 +28098,14 @@ func (rx *Rx) Get_Project_BandwidthLimit_By_Id(ctx context.Context,
|
||||
return tx.Get_Project_BandwidthLimit_By_Id(ctx, project_id)
|
||||
}
|
||||
|
||||
func (rx *Rx) Get_Project_BandwidthLimit_Project_UsageLimit_Project_SegmentLimit_By_Id(ctx context.Context,
|
||||
func (rx *Rx) Get_Project_BandwidthLimit_Project_UsageLimit_Project_SegmentLimit_Project_RateLimit_Project_BurstLimit_By_Id(ctx context.Context,
|
||||
project_id Project_Id_Field) (
|
||||
row *BandwidthLimit_UsageLimit_SegmentLimit_Row, err error) {
|
||||
row *BandwidthLimit_UsageLimit_SegmentLimit_RateLimit_BurstLimit_Row, err error) {
|
||||
var tx *Tx
|
||||
if tx, err = rx.getTx(ctx); err != nil {
|
||||
return
|
||||
}
|
||||
return tx.Get_Project_BandwidthLimit_Project_UsageLimit_Project_SegmentLimit_By_Id(ctx, project_id)
|
||||
return tx.Get_Project_BandwidthLimit_Project_UsageLimit_Project_SegmentLimit_Project_RateLimit_Project_BurstLimit_By_Id(ctx, project_id)
|
||||
}
|
||||
|
||||
func (rx *Rx) Get_Project_By_Id(ctx context.Context,
|
||||
@ -29434,9 +29436,9 @@ type Methods interface {
|
||||
project_id Project_Id_Field) (
|
||||
row *BandwidthLimit_Row, err error)
|
||||
|
||||
Get_Project_BandwidthLimit_Project_UsageLimit_Project_SegmentLimit_By_Id(ctx context.Context,
|
||||
Get_Project_BandwidthLimit_Project_UsageLimit_Project_SegmentLimit_Project_RateLimit_Project_BurstLimit_By_Id(ctx context.Context,
|
||||
project_id Project_Id_Field) (
|
||||
row *BandwidthLimit_UsageLimit_SegmentLimit_Row, err error)
|
||||
row *BandwidthLimit_UsageLimit_SegmentLimit_RateLimit_BurstLimit_Row, err error)
|
||||
|
||||
Get_Project_By_Id(ctx context.Context,
|
||||
project_id Project_Id_Field) (
|
||||
|
@ -1090,7 +1090,7 @@ func timeTruncateDown(t time.Time) time.Time {
|
||||
func (db *ProjectAccounting) GetProjectLimits(ctx context.Context, projectID uuid.UUID) (_ accounting.ProjectLimits, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
row, err := db.db.Get_Project_BandwidthLimit_Project_UsageLimit_Project_SegmentLimit_By_Id(ctx,
|
||||
row, err := db.db.Get_Project_BandwidthLimit_Project_UsageLimit_Project_SegmentLimit_Project_RateLimit_Project_BurstLimit_By_Id(ctx,
|
||||
dbx.Project_Id(projectID[:]),
|
||||
)
|
||||
if err != nil {
|
||||
@ -1101,6 +1101,9 @@ func (db *ProjectAccounting) GetProjectLimits(ctx context.Context, projectID uui
|
||||
Usage: row.UsageLimit,
|
||||
Bandwidth: row.BandwidthLimit,
|
||||
Segments: row.SegmentLimit,
|
||||
|
||||
RateLimit: row.RateLimit,
|
||||
BurstLimit: row.BurstLimit,
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
2
scripts/testdata/satellite-config.yaml.lock
vendored
2
scripts/testdata/satellite-config.yaml.lock
vendored
@ -893,7 +893,7 @@ identity.key-path: /root/.local/share/storj/identity/satellite/identity.key
|
||||
# project-bw-cleanup.retain-months: 2
|
||||
|
||||
# number of projects to cache.
|
||||
# project-limit.cache-capacity: 10000
|
||||
# project-limit.cache-capacity: 20000
|
||||
|
||||
# how long to cache the project limits.
|
||||
# project-limit.cache-expiration: 10m0s
|
||||
|
Loading…
Reference in New Issue
Block a user