satellite/accounting: add cache for getting project storage and bw limits
This PR adds the following items: 1) an in-memory read-only cache that stores project limit info for projectIDs. This cache is stored in-memory since this is expected to be a small amount of data. In this implementation we are only storing in the cache projects that have been accessed. Currently for the largest Satellite (eu-west) there is about 4500 total projects. So storing the storage limit (int64) and the bandwidth limit (int64), this would end up being about 200kb (including the 32 byte project ID) if all 4500 projectIDs were in the cache. So this all fits in memory for the time being. At some point it may not as usage grows, but that seems years out. The cache is a read only cache. When requests come in to upload/download a file, we will read from the cache what the current limits are for that project. If the cache does not contain the projectID, it will get the info from the database (satellitedb project table), then add it to the cache. The only time the values in the cache are modified is when either a) the project ID is not in the cache, or b) the item in the cache has expired (default 10mins), then the data gets refreshed out of the database. This occurs by default every 10 mins. This means that if we update the usage limits in the database, that change might not show up in the cache for 10 mins, which means it will not be reflected to limit end users uploading/downloading files for that time period. Change-Id: I3fd7056cf963676009834fcbcf9c4a0922ca4a8f
This commit is contained in:
parent
888bfaae4b
commit
4a2c66fa06
@ -152,6 +152,10 @@ type Satellite struct {
|
||||
Cache accounting.Cache
|
||||
}
|
||||
|
||||
ProjectLimits struct {
|
||||
Cache *accounting.ProjectLimitCache
|
||||
}
|
||||
|
||||
Mail struct {
|
||||
Service *mailservice.Service
|
||||
}
|
||||
@ -726,12 +730,14 @@ func createNewSystem(log *zap.Logger, config satellite.Config, peer *satellite.C
|
||||
|
||||
system.Accounting.Tally = peer.Accounting.Tally
|
||||
system.Accounting.Rollup = peer.Accounting.Rollup
|
||||
system.Accounting.ProjectUsage = peer.Accounting.ProjectUsage
|
||||
system.Accounting.ProjectUsage = api.Accounting.ProjectUsage
|
||||
system.Accounting.ReportedRollup = peer.Accounting.ReportedRollupChore
|
||||
system.Accounting.ProjectBWCleanup = peer.Accounting.ProjectBWCleanupChore
|
||||
|
||||
system.LiveAccounting = peer.LiveAccounting
|
||||
|
||||
system.ProjectLimits.Cache = api.ProjectLimits.Cache
|
||||
|
||||
system.Marketing.Listener = api.Marketing.Listener
|
||||
system.Marketing.Endpoint = api.Marketing.Endpoint
|
||||
|
||||
|
@ -76,6 +76,12 @@ type ProjectUsage struct {
|
||||
Before time.Time `json:"before"`
|
||||
}
|
||||
|
||||
// ProjectLimits contains the storage and bandwidth limits.
type ProjectLimits struct {
	// Usage is the storage usage limit; nil indicates the project has no
	// explicit limit stored, so callers fall back to the configured default.
	Usage *int64
	// Bandwidth is the bandwidth limit; nil indicates no explicit limit,
	// so callers fall back to the configured default.
	Bandwidth *int64
}
|
||||
|
||||
// BucketUsage consist of total bucket usage for period.
|
||||
type BucketUsage struct {
|
||||
ProjectID uuid.UUID
|
||||
@ -185,6 +191,8 @@ type ProjectAccounting interface {
|
||||
GetProjectStorageLimit(ctx context.Context, projectID uuid.UUID) (*int64, error)
|
||||
// GetProjectBandwidthLimit returns project bandwidth usage limit.
|
||||
GetProjectBandwidthLimit(ctx context.Context, projectID uuid.UUID) (*int64, error)
|
||||
// GetProjectLimits returns current project limit for both storage and bandwidth.
|
||||
GetProjectLimits(ctx context.Context, projectID uuid.UUID) (ProjectLimits, error)
|
||||
// GetProjectTotal returns project usage summary for specified period of time.
|
||||
GetProjectTotal(ctx context.Context, projectID uuid.UUID, since, before time.Time) (*ProjectUsage, error)
|
||||
// GetBucketUsageRollups returns usage rollup per each bucket for specified period of time.
|
||||
|
124
satellite/accounting/projectlimitcache.go
Normal file
124
satellite/accounting/projectlimitcache.go
Normal file
@ -0,0 +1,124 @@
|
||||
// Copyright (C) 2019 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package accounting
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/zeebo/errs"
|
||||
|
||||
"storj.io/common/memory"
|
||||
"storj.io/common/uuid"
|
||||
lrucache "storj.io/storj/pkg/cache"
|
||||
)
|
||||
|
||||
var (
|
||||
// ErrProjectLimitType error for project limit type.
|
||||
ErrProjectLimitType = errs.Class("project limit type error")
|
||||
// ErrGetProjectLimit error for getting project limits from database.
|
||||
ErrGetProjectLimit = errs.Class("get project limits error")
|
||||
// ErrGetProjectLimitCache error for getting project limits from cache.
|
||||
ErrGetProjectLimitCache = errs.Class("get project limits from cache error")
|
||||
)
|
||||
|
||||
// ProjectLimitDB stores information about projects limits for storage and bandwidth limits.
//
// architecture: Database
type ProjectLimitDB interface {
	// GetProjectLimits returns current project limit for both storage and bandwidth.
	// Either limit may be nil when the project has no explicit limit stored.
	GetProjectLimits(ctx context.Context, projectID uuid.UUID) (ProjectLimits, error)
}
|
||||
|
||||
// ProjectLimitConfig is a configuration struct for project limit.
type ProjectLimitConfig struct {
	// CacheCapacity bounds how many projects the LRU cache may hold at once.
	CacheCapacity int `help:"number of projects to cache." releaseDefault:"10000" devDefault:"100"`
	// CacheExpiration is how long a cached entry is served before it is
	// refreshed from the database; limit changes may lag by up to this long.
	CacheExpiration time.Duration `help:"how long to cache the project limits." releaseDefault:"10m" devDefault:"30s"`
}
|
||||
|
||||
// ProjectLimitCache stores the values for both storage usage limit and bandwidth limit for
// each project ID if they differ from the default limits.
type ProjectLimitCache struct {
	// projectLimitDB is the backing store queried on cache miss or expiry.
	projectLimitDB ProjectLimitDB
	// defaultMaxUsage is substituted when a project has no explicit storage limit.
	defaultMaxUsage memory.Size
	// defaultMaxBandwidth is substituted when a project has no explicit bandwidth limit.
	defaultMaxBandwidth memory.Size

	// state is the expiring LRU holding ProjectLimits keyed by projectID string.
	state *lrucache.ExpiringLRU
}
|
||||
|
||||
// NewProjectLimitCache creates a new project limit cache to store the project limits for each project ID.
|
||||
func NewProjectLimitCache(db ProjectLimitDB, defaultMaxUsage, defaultMaxBandwidth memory.Size, config ProjectLimitConfig) *ProjectLimitCache {
|
||||
return &ProjectLimitCache{
|
||||
projectLimitDB: db,
|
||||
defaultMaxUsage: defaultMaxUsage,
|
||||
defaultMaxBandwidth: defaultMaxBandwidth,
|
||||
state: lrucache.New(lrucache.Options{
|
||||
Capacity: config.CacheCapacity,
|
||||
Expiration: config.CacheExpiration,
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
// GetProjectLimits returns current project limit for both storage and bandwidth.
|
||||
func (c *ProjectLimitCache) GetProjectLimits(ctx context.Context, projectID uuid.UUID) (_ ProjectLimits, err error) {
|
||||
defer mon.Task()(&ctx, projectID)(&err)
|
||||
|
||||
projectLimits, err := c.projectLimitDB.GetProjectLimits(ctx, projectID)
|
||||
if err != nil {
|
||||
return ProjectLimits{}, ErrGetProjectLimit.Wrap(err)
|
||||
}
|
||||
if projectLimits.Bandwidth == nil {
|
||||
defaultBandwidth := c.defaultMaxBandwidth.Int64()
|
||||
projectLimits.Bandwidth = &defaultBandwidth
|
||||
}
|
||||
if projectLimits.Usage == nil {
|
||||
defaultUsage := c.defaultMaxUsage.Int64()
|
||||
projectLimits.Usage = &defaultUsage
|
||||
}
|
||||
|
||||
return projectLimits, nil
|
||||
}
|
||||
|
||||
// Get returns the cached storage and bandwidth limits for a project ID,
// loading them from the database (and populating the cache) when the entry
// is missing or has expired.
func (c *ProjectLimitCache) Get(ctx context.Context, projectID uuid.UUID) (ProjectLimits, error) {
	// fn is invoked by the LRU only on a miss/expiry to refresh the entry.
	fn := func() (interface{}, error) {
		return c.GetProjectLimits(ctx, projectID)
	}
	projectLimits, err := c.state.Get(projectID.String(), fn)
	if err != nil {
		return ProjectLimits{}, ErrGetProjectLimitCache.Wrap(err)
	}
	// The LRU stores interface{} values; guard against an unexpected type.
	limits, ok := projectLimits.(ProjectLimits)
	if !ok {
		return ProjectLimits{}, ErrProjectLimitType.New("cache Get error")
	}
	return limits, nil
}
|
||||
|
||||
// GetProjectStorageLimit returns the storage usage limit for a project ID.
|
||||
func (c *ProjectLimitCache) GetProjectStorageLimit(ctx context.Context, projectID uuid.UUID) (_ memory.Size, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
projectLimits, err := c.Get(ctx, projectID)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
if projectLimits.Usage == nil {
|
||||
return c.defaultMaxUsage, nil
|
||||
}
|
||||
return memory.Size(*projectLimits.Usage), nil
|
||||
|
||||
}
|
||||
|
||||
// GetProjectBandwidthLimit return the bandwidth usage limit for a project ID.
|
||||
func (c *ProjectLimitCache) GetProjectBandwidthLimit(ctx context.Context, projectID uuid.UUID) (_ memory.Size, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
projectLimits, err := c.Get(ctx, projectID)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
if projectLimits.Bandwidth == nil {
|
||||
return c.defaultMaxBandwidth, nil
|
||||
}
|
||||
return memory.Size(*projectLimits.Bandwidth), nil
|
||||
}
|
161
satellite/accounting/projectlimitcache_test.go
Normal file
161
satellite/accounting/projectlimitcache_test.go
Normal file
@ -0,0 +1,161 @@
|
||||
// Copyright (C) 2020 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package accounting_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"storj.io/common/memory"
|
||||
"storj.io/common/testcontext"
|
||||
"storj.io/common/testrand"
|
||||
"storj.io/common/uuid"
|
||||
"storj.io/storj/private/testplanet"
|
||||
"storj.io/storj/satellite"
|
||||
"storj.io/storj/satellite/accounting"
|
||||
"storj.io/storj/satellite/console"
|
||||
"storj.io/storj/satellite/satellitedb/satellitedbtest"
|
||||
)
|
||||
|
||||
// mockDB implements accounting.ProjectLimitDB and counts database queries so
// tests can verify the cache avoids repeated lookups. Not safe for concurrent
// use; the tests call it sequentially.
type mockDB struct {
	// callCount is the number of GetProjectLimits invocations so far.
	callCount int
}

// GetProjectLimits records the call and returns empty (unset) limits,
// so the cache falls back to its configured defaults.
func (mdb *mockDB) GetProjectLimits(ctx context.Context, projectID uuid.UUID) (accounting.ProjectLimits, error) {
	mdb.callCount++
	return accounting.ProjectLimits{}, nil
}
|
||||
func TestProjectLimitCacheCallCount(t *testing.T) {
|
||||
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
|
||||
mdb := mockDB{}
|
||||
projectLimitCache := accounting.NewProjectLimitCache(&mdb, 0, 0, accounting.ProjectLimitConfig{CacheCapacity: 100})
|
||||
|
||||
testProject, err := db.Console().Projects().Insert(ctx, &console.Project{Name: "test", OwnerID: testrand.UUID()})
|
||||
require.NoError(t, err)
|
||||
|
||||
const expectedCallCount = 1
|
||||
|
||||
_, err = projectLimitCache.GetProjectBandwidthLimit(ctx, testProject.ID)
|
||||
require.NoError(t, err)
|
||||
// if the data isn't in the cache we call into the database to get it
|
||||
require.Equal(t, expectedCallCount, mdb.callCount)
|
||||
|
||||
_, err = projectLimitCache.GetProjectBandwidthLimit(ctx, testProject.ID)
|
||||
require.NoError(t, err)
|
||||
// call count should still be 1 since the data is in the cache and we don't need
|
||||
// to get it from the db
|
||||
require.Equal(t, expectedCallCount, mdb.callCount)
|
||||
})
|
||||
}
|
||||
|
||||
// TestProjectLimitCache exercises project limits through three layers — the
// project accounting DB, the ProjectLimitCache, and the project usage
// service — for a missing project, for the DB's default limits, and after
// updating limits in the database.
func TestProjectLimitCache(t *testing.T) {
	testplanet.Run(t, testplanet.Config{
		SatelliteCount: 1, StorageNodeCount: 0, UplinkCount: 0,
	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
		saPeer := planet.Satellites[0]
		projectUsageSvc := saPeer.Accounting.ProjectUsage
		accountingDB := saPeer.DB.ProjectAccounting()
		projectLimitCache := saPeer.ProjectLimits.Cache

		testProject, err := saPeer.DB.Console().Projects().Insert(ctx, &console.Project{Name: "test", OwnerID: testrand.UUID()})
		require.NoError(t, err)

		const (
			// dbDefaultLimits is the limit the projects table assigns by
			// default on insert (50 GB).
			dbDefaultLimits = 50000000000
			// errorLimit is the zero memory.Size expected alongside an error.
			errorLimit             = 0
			expectedUsageLimit     = 1
			expectedBandwidthLimit = 2
		)

		t.Run("project ID doesn't exist", func(t *testing.T) {
			// All three layers should report an error for an unknown project.
			projectID := testrand.UUID()
			actualStorageLimitFromDB, err := accountingDB.GetProjectStorageLimit(ctx, projectID)
			assert.Error(t, err)
			assert.Nil(t, actualStorageLimitFromDB)

			actualLimitsFromDB, err := accountingDB.GetProjectLimits(ctx, projectID)
			assert.Error(t, err)
			assert.Equal(t, accounting.ProjectLimits{}, actualLimitsFromDB)

			actualStorageLimitFromCache, err := projectLimitCache.GetProjectStorageLimit(ctx, projectID)
			assert.Error(t, err)
			assert.Equal(t, memory.Size(errorLimit), actualStorageLimitFromCache)

			actualStorageLimitFromSvc, err := projectUsageSvc.GetProjectStorageLimit(ctx, projectID)
			assert.Error(t, err)
			assert.Equal(t, memory.Size(errorLimit), actualStorageLimitFromSvc)
		})

		t.Run("default limits", func(t *testing.T) {
			// Fresh project: every layer should report the DB default limits.
			actualStorageLimitFromDB, err := accountingDB.GetProjectStorageLimit(ctx, testProject.ID)
			assert.NoError(t, err)
			assert.Equal(t, int64(dbDefaultLimits), *actualStorageLimitFromDB)

			actualLimitsFromDB, err := accountingDB.GetProjectLimits(ctx, testProject.ID)
			assert.NoError(t, err)
			defaultLimits := int64(dbDefaultLimits)
			assert.Equal(t, accounting.ProjectLimits{Usage: &defaultLimits, Bandwidth: &defaultLimits}, actualLimitsFromDB)

			actualStorageLimitFromCache, err := projectLimitCache.GetProjectStorageLimit(ctx, testProject.ID)
			assert.NoError(t, err)
			assert.Equal(t, memory.Size(dbDefaultLimits), actualStorageLimitFromCache)

			actualStorageLimitFromSvc, err := projectUsageSvc.GetProjectStorageLimit(ctx, testProject.ID)
			assert.NoError(t, err)
			assert.Equal(t, memory.Size(dbDefaultLimits), actualStorageLimitFromSvc)

			actualBandwidthLimitFromDB, err := accountingDB.GetProjectBandwidthLimit(ctx, testProject.ID)
			assert.NoError(t, err)
			assert.Equal(t, int64(dbDefaultLimits), *actualBandwidthLimitFromDB)

			actualBandwidthLimitFromCache, err := projectLimitCache.GetProjectBandwidthLimit(ctx, testProject.ID)
			assert.NoError(t, err)
			assert.Equal(t, memory.Size(dbDefaultLimits), actualBandwidthLimitFromCache)

			actualBandwidthLimitFromSvc, err := projectUsageSvc.GetProjectBandwidthLimit(ctx, testProject.ID)
			assert.NoError(t, err)
			assert.Equal(t, memory.Size(dbDefaultLimits), actualBandwidthLimitFromSvc)
		})

		t.Run("update limits in the database", func(t *testing.T) {
			// NOTE(review): the "default limits" subtest already populated the
			// cache for this project, yet the cache assertions below expect
			// the freshly updated values — confirm how the testplanet cache
			// config (capacity/expiration) makes the stale entry disappear.
			err = accountingDB.UpdateProjectUsageLimit(ctx, testProject.ID, expectedUsageLimit)
			require.NoError(t, err)
			err = accountingDB.UpdateProjectBandwidthLimit(ctx, testProject.ID, expectedBandwidthLimit)
			require.NoError(t, err)

			actualStorageLimitFromDB, err := accountingDB.GetProjectStorageLimit(ctx, testProject.ID)
			assert.NoError(t, err)
			require.Equal(t, int64(expectedUsageLimit), *actualStorageLimitFromDB)

			actualLimitsFromDB, err := accountingDB.GetProjectLimits(ctx, testProject.ID)
			assert.NoError(t, err)
			usageLimits := int64(expectedUsageLimit)
			bwLimits := int64(expectedBandwidthLimit)
			assert.Equal(t, accounting.ProjectLimits{Usage: &usageLimits, Bandwidth: &bwLimits}, actualLimitsFromDB)

			actualStorageLimitFromCache, err := projectLimitCache.GetProjectStorageLimit(ctx, testProject.ID)
			assert.NoError(t, err)
			require.Equal(t, memory.Size(expectedUsageLimit), actualStorageLimitFromCache)

			actualStorageLimitFromSvc, err := projectUsageSvc.GetProjectStorageLimit(ctx, testProject.ID)
			assert.NoError(t, err)
			require.Equal(t, memory.Size(expectedUsageLimit), actualStorageLimitFromSvc)

			actualBandwidthLimitFromDB, err := accountingDB.GetProjectBandwidthLimit(ctx, testProject.ID)
			require.NoError(t, err)
			require.Equal(t, int64(expectedBandwidthLimit), *actualBandwidthLimitFromDB)

			actualBandwidthLimitFromCache, err := projectLimitCache.GetProjectBandwidthLimit(ctx, testProject.ID)
			assert.NoError(t, err)
			require.Equal(t, memory.Size(expectedBandwidthLimit), actualBandwidthLimitFromCache)

			actualBandwidthLimitFromSvc, err := projectUsageSvc.GetProjectBandwidthLimit(ctx, testProject.ID)
			assert.NoError(t, err)
			require.Equal(t, memory.Size(expectedBandwidthLimit), actualBandwidthLimitFromSvc)
		})
	})
}
|
@ -29,19 +29,17 @@ var (
|
||||
type Service struct {
|
||||
projectAccountingDB ProjectAccounting
|
||||
liveAccounting Cache
|
||||
defaultMaxUsage memory.Size
|
||||
defaultMaxBandwidth memory.Size
|
||||
projectLimitCache *ProjectLimitCache
|
||||
bandwidthCacheTTL time.Duration
|
||||
nowFn func() time.Time
|
||||
}
|
||||
|
||||
// NewService created new instance of project usage service.
|
||||
func NewService(projectAccountingDB ProjectAccounting, liveAccounting Cache, defaultMaxUsage, defaultMaxBandwidth memory.Size, bandwidthCacheTTL time.Duration) *Service {
|
||||
func NewService(projectAccountingDB ProjectAccounting, liveAccounting Cache, limitCache *ProjectLimitCache, bandwidthCacheTTL time.Duration) *Service {
|
||||
return &Service{
|
||||
projectAccountingDB: projectAccountingDB,
|
||||
liveAccounting: liveAccounting,
|
||||
defaultMaxUsage: defaultMaxUsage,
|
||||
defaultMaxBandwidth: defaultMaxBandwidth,
|
||||
projectLimitCache: limitCache,
|
||||
bandwidthCacheTTL: bandwidthCacheTTL,
|
||||
nowFn: time.Now,
|
||||
}
|
||||
@ -50,7 +48,6 @@ func NewService(projectAccountingDB ProjectAccounting, liveAccounting Cache, def
|
||||
// ExceedsBandwidthUsage returns true if the bandwidth usage limits have been exceeded
|
||||
// for a project in the past month (30 days). The usage limit is (e.g 25GB) multiplied by the redundancy
|
||||
// expansion factor, so that the uplinks have a raw limit.
|
||||
// Ref: https://storjlabs.atlassian.net/browse/V3-1274
|
||||
func (usage *Service) ExceedsBandwidthUsage(ctx context.Context, projectID uuid.UUID) (_ bool, limit memory.Size, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
@ -58,10 +55,9 @@ func (usage *Service) ExceedsBandwidthUsage(ctx context.Context, projectID uuid.
|
||||
var bandwidthGetTotal int64
|
||||
var bandwidthUsage int64
|
||||
|
||||
// TODO(michal): to reduce db load, consider using a cache to retrieve the project.UsageLimit value if needed
|
||||
group.Go(func() error {
|
||||
var err error
|
||||
limit, err = usage.GetProjectBandwidthLimit(ctx, projectID)
|
||||
limit, err = usage.projectLimitCache.GetProjectBandwidthLimit(ctx, projectID)
|
||||
return err
|
||||
})
|
||||
group.Go(func() error {
|
||||
@ -113,10 +109,9 @@ func (usage *Service) ExceedsStorageUsage(ctx context.Context, projectID uuid.UU
|
||||
var group errgroup.Group
|
||||
var totalUsed int64
|
||||
|
||||
// TODO(michal): to reduce db load, consider using a cache to retrieve the project.UsageLimit value if needed
|
||||
group.Go(func() error {
|
||||
var err error
|
||||
limit, err = usage.GetProjectStorageLimit(ctx, projectID)
|
||||
limit, err = usage.projectLimitCache.GetProjectStorageLimit(ctx, projectID)
|
||||
return err
|
||||
})
|
||||
group.Go(func() error {
|
||||
@ -169,31 +164,13 @@ func (usage *Service) GetProjectAllocatedBandwidth(ctx context.Context, projectI
|
||||
// GetProjectStorageLimit returns current project storage limit.
|
||||
func (usage *Service) GetProjectStorageLimit(ctx context.Context, projectID uuid.UUID) (_ memory.Size, err error) {
|
||||
defer mon.Task()(&ctx, projectID)(&err)
|
||||
|
||||
limit, err := usage.projectAccountingDB.GetProjectStorageLimit(ctx, projectID)
|
||||
if err != nil {
|
||||
return 0, ErrProjectUsage.Wrap(err)
|
||||
}
|
||||
if limit == nil {
|
||||
return usage.defaultMaxUsage, nil
|
||||
}
|
||||
|
||||
return memory.Size(*limit), nil
|
||||
return usage.projectLimitCache.GetProjectStorageLimit(ctx, projectID)
|
||||
}
|
||||
|
||||
// GetProjectBandwidthLimit returns current project bandwidth limit.
|
||||
func (usage *Service) GetProjectBandwidthLimit(ctx context.Context, projectID uuid.UUID) (_ memory.Size, err error) {
|
||||
defer mon.Task()(&ctx, projectID)(&err)
|
||||
|
||||
limit, err := usage.projectAccountingDB.GetProjectBandwidthLimit(ctx, projectID)
|
||||
if err != nil {
|
||||
return 0, ErrProjectUsage.Wrap(err)
|
||||
}
|
||||
if limit == nil {
|
||||
return usage.defaultMaxBandwidth, nil
|
||||
}
|
||||
|
||||
return memory.Size(*limit), nil
|
||||
return usage.projectLimitCache.GetProjectBandwidthLimit(ctx, projectID)
|
||||
}
|
||||
|
||||
// UpdateProjectLimits sets new value for project's bandwidth and storage limit.
|
||||
|
@ -328,7 +328,6 @@ func TestProjectUsageCustomLimit(t *testing.T) {
|
||||
project := projects[0]
|
||||
// set custom usage limit for project
|
||||
expectedLimit := memory.Size(memory.GiB.Int64() * 10)
|
||||
|
||||
err = acctDB.UpdateProjectUsageLimit(ctx, project.ID, expectedLimit)
|
||||
require.NoError(t, err)
|
||||
|
||||
|
@ -118,6 +118,10 @@ type API struct {
|
||||
Cache accounting.Cache
|
||||
}
|
||||
|
||||
ProjectLimits struct {
|
||||
Cache *accounting.ProjectLimitCache
|
||||
}
|
||||
|
||||
Mail struct {
|
||||
Service *mailservice.Service
|
||||
}
|
||||
@ -293,12 +297,19 @@ func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB,
|
||||
peer.LiveAccounting.Cache = liveAccounting
|
||||
}
|
||||
|
||||
{ // setup project limits
|
||||
peer.ProjectLimits.Cache = accounting.NewProjectLimitCache(peer.DB.ProjectAccounting(),
|
||||
config.Metainfo.ProjectLimits.DefaultMaxUsage,
|
||||
config.Metainfo.ProjectLimits.DefaultMaxBandwidth,
|
||||
config.ProjectLimit,
|
||||
)
|
||||
}
|
||||
|
||||
{ // setup accounting project usage
|
||||
peer.Accounting.ProjectUsage = accounting.NewService(
|
||||
peer.DB.ProjectAccounting(),
|
||||
peer.LiveAccounting.Cache,
|
||||
config.Metainfo.ProjectLimits.DefaultMaxUsage,
|
||||
config.Metainfo.ProjectLimits.DefaultMaxBandwidth,
|
||||
peer.ProjectLimits.Cache,
|
||||
config.LiveAccounting.BandwidthCacheTTL,
|
||||
)
|
||||
}
|
||||
|
@ -67,7 +67,9 @@ func TestGrapqhlMutation(t *testing.T) {
|
||||
cache, err := live.NewCache(log.Named("cache"), live.Config{StorageBackend: "redis://" + redis.Addr() + "?db=0"})
|
||||
require.NoError(t, err)
|
||||
|
||||
projectUsage := accounting.NewService(db.ProjectAccounting(), cache, 0, 0, 5*time.Minute)
|
||||
projectLimitCache := accounting.NewProjectLimitCache(db.ProjectAccounting(), 0, 0, accounting.ProjectLimitConfig{CacheCapacity: 100})
|
||||
|
||||
projectUsage := accounting.NewService(db.ProjectAccounting(), cache, projectLimitCache, 5*time.Minute)
|
||||
|
||||
// TODO maybe switch this test to testplanet to avoid defining config and Stripe service
|
||||
pc := paymentsconfig.Config{
|
||||
|
@ -51,7 +51,9 @@ func TestGraphqlQuery(t *testing.T) {
|
||||
cache, err := live.NewCache(log.Named("cache"), live.Config{StorageBackend: "redis://" + redis.Addr() + "?db=0"})
|
||||
require.NoError(t, err)
|
||||
|
||||
projectUsage := accounting.NewService(db.ProjectAccounting(), cache, 0, 0, 5*time.Minute)
|
||||
projectLimitCache := accounting.NewProjectLimitCache(db.ProjectAccounting(), 0, 0, accounting.ProjectLimitConfig{CacheCapacity: 100})
|
||||
|
||||
projectUsage := accounting.NewService(db.ProjectAccounting(), cache, projectLimitCache, 5*time.Minute)
|
||||
|
||||
// TODO maybe switch this test to testplanet to avoid defining config and Stripe service
|
||||
pc := paymentsconfig.Config{
|
||||
|
@ -117,7 +117,6 @@ type Core struct {
|
||||
Accounting struct {
|
||||
Tally *tally.Service
|
||||
Rollup *rollup.Service
|
||||
ProjectUsage *accounting.Service
|
||||
ReportedRollupChore *reportedrollup.Chore
|
||||
ProjectBWCleanupChore *projectbwcleanup.Chore
|
||||
}
|
||||
@ -245,16 +244,6 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB,
|
||||
peer.LiveAccounting.Cache = liveAccounting
|
||||
}
|
||||
|
||||
{ // setup accounting project usage
|
||||
peer.Accounting.ProjectUsage = accounting.NewService(
|
||||
peer.DB.ProjectAccounting(),
|
||||
peer.LiveAccounting.Cache,
|
||||
config.Metainfo.ProjectLimits.DefaultMaxUsage,
|
||||
config.Metainfo.ProjectLimits.DefaultMaxBandwidth,
|
||||
config.LiveAccounting.BandwidthCacheTTL,
|
||||
)
|
||||
}
|
||||
|
||||
{ // setup orders
|
||||
peer.Orders.DB = rollupsWriteCache
|
||||
peer.Orders.Chore = orders.NewChore(log.Named("orders:chore"), rollupsWriteCache, config.Orders)
|
||||
|
@ -159,4 +159,6 @@ type Config struct {
|
||||
Downtime downtime.Config
|
||||
|
||||
Compensation compensation.Config
|
||||
|
||||
ProjectLimit accounting.ProjectLimitConfig
|
||||
}
|
||||
|
@ -350,6 +350,10 @@ read one (
|
||||
select project.max_buckets
|
||||
where project.id = ?
|
||||
)
|
||||
read one (
|
||||
select project.bandwidth_limit project.usage_limit
|
||||
where project.id = ?
|
||||
)
|
||||
|
||||
read all (
|
||||
select project
|
||||
|
@ -8715,6 +8715,11 @@ type BandwidthLimit_Row struct {
|
||||
BandwidthLimit *int64
|
||||
}
|
||||
|
||||
type BandwidthLimit_UsageLimit_Row struct {
|
||||
BandwidthLimit int64
|
||||
UsageLimit int64
|
||||
}
|
||||
|
||||
type BucketId_Row struct {
|
||||
BucketId []byte
|
||||
}
|
||||
@ -10850,6 +10855,28 @@ func (obj *pgxImpl) Get_Project_MaxBuckets_By_Id(ctx context.Context,
|
||||
|
||||
}
|
||||
|
||||
// Get_Project_BandwidthLimit_Project_UsageLimit_By_Id fetches both the
// bandwidth_limit and usage_limit columns for one project in a single query.
// Appears to be dbx-generated (sqlbundle pattern) — do not edit by hand;
// regenerate from the .dbx definition instead.
func (obj *pgxImpl) Get_Project_BandwidthLimit_Project_UsageLimit_By_Id(ctx context.Context,
	project_id Project_Id_Field) (
	row *BandwidthLimit_UsageLimit_Row, err error) {
	defer mon.Task()(&ctx)(&err)

	var __embed_stmt = __sqlbundle_Literal("SELECT projects.bandwidth_limit, projects.usage_limit FROM projects WHERE projects.id = ?")

	var __values []interface{}
	__values = append(__values, project_id.value())

	var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt)
	obj.logStmt(__stmt, __values...)

	row = &BandwidthLimit_UsageLimit_Row{}
	err = obj.driver.QueryRowContext(ctx, __stmt, __values...).Scan(&row.BandwidthLimit, &row.UsageLimit)
	if err != nil {
		// No row (unknown project) surfaces here as a driver error.
		return (*BandwidthLimit_UsageLimit_Row)(nil), obj.makeErr(err)
	}
	return row, nil

}
|
||||
|
||||
func (obj *pgxImpl) All_Project(ctx context.Context) (
|
||||
rows []*Project, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
@ -17117,6 +17144,28 @@ func (obj *pgxcockroachImpl) Get_Project_MaxBuckets_By_Id(ctx context.Context,
|
||||
|
||||
}
|
||||
|
||||
// Get_Project_BandwidthLimit_Project_UsageLimit_By_Id fetches both the
// bandwidth_limit and usage_limit columns for one project in a single query
// (CockroachDB variant). Appears to be dbx-generated (sqlbundle pattern) —
// do not edit by hand; regenerate from the .dbx definition instead.
func (obj *pgxcockroachImpl) Get_Project_BandwidthLimit_Project_UsageLimit_By_Id(ctx context.Context,
	project_id Project_Id_Field) (
	row *BandwidthLimit_UsageLimit_Row, err error) {
	defer mon.Task()(&ctx)(&err)

	var __embed_stmt = __sqlbundle_Literal("SELECT projects.bandwidth_limit, projects.usage_limit FROM projects WHERE projects.id = ?")

	var __values []interface{}
	__values = append(__values, project_id.value())

	var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt)
	obj.logStmt(__stmt, __values...)

	row = &BandwidthLimit_UsageLimit_Row{}
	err = obj.driver.QueryRowContext(ctx, __stmt, __values...).Scan(&row.BandwidthLimit, &row.UsageLimit)
	if err != nil {
		// No row (unknown project) surfaces here as a driver error.
		return (*BandwidthLimit_UsageLimit_Row)(nil), obj.makeErr(err)
	}
	return row, nil

}
|
||||
|
||||
func (obj *pgxcockroachImpl) All_Project(ctx context.Context) (
|
||||
rows []*Project, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
@ -22585,6 +22634,16 @@ func (rx *Rx) Get_Project_BandwidthLimit_By_Id(ctx context.Context,
|
||||
return tx.Get_Project_BandwidthLimit_By_Id(ctx, project_id)
|
||||
}
|
||||
|
||||
// Get_Project_BandwidthLimit_Project_UsageLimit_By_Id runs the combined
// limits query inside the transaction managed by rx, opening one lazily if
// needed. Appears to be dbx-generated — do not edit by hand.
func (rx *Rx) Get_Project_BandwidthLimit_Project_UsageLimit_By_Id(ctx context.Context,
	project_id Project_Id_Field) (
	row *BandwidthLimit_UsageLimit_Row, err error) {
	var tx *Tx
	if tx, err = rx.getTx(ctx); err != nil {
		return
	}
	return tx.Get_Project_BandwidthLimit_Project_UsageLimit_By_Id(ctx, project_id)
}
|
||||
|
||||
func (rx *Rx) Get_Project_By_Id(ctx context.Context,
|
||||
project_id Project_Id_Field) (
|
||||
project *Project, err error) {
|
||||
@ -23750,6 +23809,10 @@ type Methods interface {
|
||||
project_id Project_Id_Field) (
|
||||
row *BandwidthLimit_Row, err error)
|
||||
|
||||
Get_Project_BandwidthLimit_Project_UsageLimit_By_Id(ctx context.Context,
|
||||
project_id Project_Id_Field) (
|
||||
row *BandwidthLimit_UsageLimit_Row, err error)
|
||||
|
||||
Get_Project_By_Id(ctx context.Context,
|
||||
project_id Project_Id_Field) (
|
||||
project *Project, err error)
|
||||
|
@ -656,3 +656,20 @@ func (db *ProjectAccounting) getBuckets(ctx context.Context, projectID uuid.UUID
|
||||
func timeTruncateDown(t time.Time) time.Time {
|
||||
return time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), 0, 0, 0, t.Location())
|
||||
}
|
||||
|
||||
// GetProjectLimits returns current project limit for both storage and bandwidth.
|
||||
func (db *ProjectAccounting) GetProjectLimits(ctx context.Context, projectID uuid.UUID) (_ accounting.ProjectLimits, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
row, err := db.db.Get_Project_BandwidthLimit_Project_UsageLimit_By_Id(ctx,
|
||||
dbx.Project_Id(projectID[:]),
|
||||
)
|
||||
if err != nil {
|
||||
return accounting.ProjectLimits{}, err
|
||||
}
|
||||
|
||||
return accounting.ProjectLimits{
|
||||
Usage: &row.UsageLimit,
|
||||
Bandwidth: &row.BandwidthLimit,
|
||||
}, nil
|
||||
}
|
||||
|
6
scripts/testdata/satellite-config.yaml.lock
vendored
6
scripts/testdata/satellite-config.yaml.lock
vendored
@ -607,6 +607,12 @@ identity.key-path: /root/.local/share/storj/identity/satellite/identity.key
|
||||
# number of months of project bandwidth rollups to retain, not including the current month
|
||||
# project-bw-cleanup.retain-months: 2
|
||||
|
||||
# number of projects to cache.
|
||||
# project-limit.cache-capacity: 10000
|
||||
|
||||
# how long to cache the project limits.
|
||||
# project-limit.cache-expiration: 10m0s
|
||||
|
||||
# the URL for referral manager
|
||||
# referrals.referral-manager-url: ""
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user