satellite/accounting: add functionality to get and cache project segment count
We want to set maximum number of segments per project. This change will add functionality to get number of segments currently used by project. To avoid often DB calls segment cound will be cached and refreshed every few minutes. Change-Id: I2ecb6484f5afc3875c0e0dfaea360e8872f9d196
This commit is contained in:
parent
af3edf2b78
commit
000957f64a
@ -248,14 +248,20 @@ type ProjectAccounting interface {
|
|||||||
//
|
//
|
||||||
// architecture: Database
|
// architecture: Database
|
||||||
type Cache interface {
|
type Cache interface {
|
||||||
// GetProjectStorageUsage returns the project's storage usage.
|
// GetProjectStorageUsage returns the project's storage usage.
|
||||||
GetProjectStorageUsage(ctx context.Context, projectID uuid.UUID) (totalUsed int64, err error)
|
GetProjectStorageUsage(ctx context.Context, projectID uuid.UUID) (totalUsed int64, err error)
|
||||||
// GetProjectBandwidthUsage returns the project's bandwidth usage.
|
// GetProjectBandwidthUsage returns the project's bandwidth usage.
|
||||||
GetProjectBandwidthUsage(ctx context.Context, projectID uuid.UUID, now time.Time) (currentUsed int64, err error)
|
GetProjectBandwidthUsage(ctx context.Context, projectID uuid.UUID, now time.Time) (currentUsed int64, err error)
|
||||||
|
// GetProjectSegmentUsage returns the project's segment usage.
|
||||||
|
GetProjectSegmentUsage(ctx context.Context, projectID uuid.UUID) (currentUsed int64, err error)
|
||||||
// UpdateProjectBandthUsage updates the project's bandwidth usage increasing
|
// UpdateProjectBandthUsage updates the project's bandwidth usage increasing
|
||||||
// it. The projectID is inserted to the increment when it doesn't exists,
|
// it. The projectID is inserted to the increment when it doesn't exists,
|
||||||
// hence this method will never return ErrKeyNotFound error's class.
|
// hence this method will never return ErrKeyNotFound error's class.
|
||||||
UpdateProjectBandwidthUsage(ctx context.Context, projectID uuid.UUID, increment int64, ttl time.Duration, now time.Time) error
|
UpdateProjectBandwidthUsage(ctx context.Context, projectID uuid.UUID, increment int64, ttl time.Duration, now time.Time) error
|
||||||
|
// UpdateProjectSegmentUsage updates the project's segment usage increasing
|
||||||
|
// it. The projectID is inserted to the increment when it doesn't exists,
|
||||||
|
// hence this method will never return ErrKeyNotFound error's class.
|
||||||
|
UpdateProjectSegmentUsage(ctx context.Context, projectID uuid.UUID, increment int64, ttl time.Duration) error
|
||||||
// AddProjectStorageUsage adds to the projects storage usage the spacedUsed.
|
// AddProjectStorageUsage adds to the projects storage usage the spacedUsed.
|
||||||
// The projectID is inserted to the spaceUsed when it doesn't exists, hence
|
// The projectID is inserted to the spaceUsed when it doesn't exists, hence
|
||||||
// this method will never return ErrKeyNotFound.
|
// this method will never return ErrKeyNotFound.
|
||||||
|
@ -114,6 +114,41 @@ func (cache *redisLiveAccounting) UpdateProjectBandwidthUsage(ctx context.Contex
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetProjectSegmentUsage returns the current segment usage from specific project.
|
||||||
|
func (cache *redisLiveAccounting) GetProjectSegmentUsage(ctx context.Context, projectID uuid.UUID) (currentUsed int64, err error) {
|
||||||
|
defer mon.Task()(&ctx, projectID)(&err)
|
||||||
|
|
||||||
|
return cache.getInt64(ctx, createSegmentProjectIDKey(projectID))
|
||||||
|
}
|
||||||
|
|
||||||
|
// UpdateProjectSegmentUsage increment the segment cache key value.
|
||||||
|
func (cache *redisLiveAccounting) UpdateProjectSegmentUsage(ctx context.Context, projectID uuid.UUID, increment int64, ttl time.Duration) (err error) {
|
||||||
|
mon.Task()(&ctx, projectID, increment, ttl)(&err)
|
||||||
|
|
||||||
|
// The following script will increment the cache key
|
||||||
|
// by a specific value. If the key does not exist, it is
|
||||||
|
// set to 0 before performing the operation.
|
||||||
|
// The key expiration will be set only in the first iteration.
|
||||||
|
// To achieve this we compare the increment and key value,
|
||||||
|
// if they are equal its the first iteration.
|
||||||
|
// More details on rate limiter section: https://redis.io/commands/incr
|
||||||
|
script := fmt.Sprintf(`local current
|
||||||
|
current = redis.call("incrby", KEYS[1], "%d")
|
||||||
|
if tonumber(current) == %d then
|
||||||
|
redis.call("expire",KEYS[1], %d)
|
||||||
|
end
|
||||||
|
return current
|
||||||
|
`, increment, increment, int(ttl.Seconds()))
|
||||||
|
|
||||||
|
key := createSegmentProjectIDKey(projectID)
|
||||||
|
err = cache.client.Eval(ctx, script, []string{key}).Err()
|
||||||
|
if err != nil {
|
||||||
|
return accounting.ErrSystemOrNetError.New("Redis eval failed: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// AddProjectStorageUsage lets the live accounting know that the given
|
// AddProjectStorageUsage lets the live accounting know that the given
|
||||||
// project has just added spaceUsed bytes of storage (from the user's
|
// project has just added spaceUsed bytes of storage (from the user's
|
||||||
// perspective; i.e. segment size).
|
// perspective; i.e. segment size).
|
||||||
@ -206,3 +241,8 @@ func createBandwidthProjectIDKey(projectID uuid.UUID, now time.Time) string {
|
|||||||
_, month, day := now.Date()
|
_, month, day := now.Date()
|
||||||
return string(projectID[:]) + string(byte(month)) + string(byte(day)) + ":bandwidth"
|
return string(projectID[:]) + string(byte(month)) + string(byte(day)) + ":bandwidth"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// createSegmentProjectIDKey creates the segment project key.
|
||||||
|
func createSegmentProjectIDKey(projectID uuid.UUID) string {
|
||||||
|
return string(projectID[:]) + ":segment"
|
||||||
|
}
|
||||||
|
@ -215,6 +215,23 @@ func (usage *Service) UpdateProjectBandwidthUsage(ctx context.Context, projectID
|
|||||||
return usage.liveAccounting.UpdateProjectBandwidthUsage(ctx, projectID, increment, usage.bandwidthCacheTTL, usage.nowFn())
|
return usage.liveAccounting.UpdateProjectBandwidthUsage(ctx, projectID, increment, usage.bandwidthCacheTTL, usage.nowFn())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetProjectSegmentUsage get the current segment usage from cache.
|
||||||
|
//
|
||||||
|
// It can return one of the following errors returned by
|
||||||
|
// storj.io/storj/satellite/accounting.Cache.GetProjectSegmentUsage.
|
||||||
|
func (usage *Service) GetProjectSegmentUsage(ctx context.Context, projectID uuid.UUID) (currentUsed int64, err error) {
|
||||||
|
return usage.liveAccounting.GetProjectSegmentUsage(ctx, projectID)
|
||||||
|
}
|
||||||
|
|
||||||
|
// UpdateProjectSegmentUsage increments the segment cache key for a specific project.
|
||||||
|
//
|
||||||
|
// It can return one of the following errors returned by
|
||||||
|
// storj.io/storj/satellite/accounting.Cache.UpdatProjectSegmentUsage.
|
||||||
|
func (usage *Service) UpdateProjectSegmentUsage(ctx context.Context, projectID uuid.UUID, increment int64) (err error) {
|
||||||
|
// TODO rename bandwidthCacheTTL to cacheTTL
|
||||||
|
return usage.liveAccounting.UpdateProjectSegmentUsage(ctx, projectID, increment, usage.bandwidthCacheTTL)
|
||||||
|
}
|
||||||
|
|
||||||
// AddProjectStorageUsage lets the live accounting know that the given
|
// AddProjectStorageUsage lets the live accounting know that the given
|
||||||
// project has just added spaceUsed bytes of storage (from the user's
|
// project has just added spaceUsed bytes of storage (from the user's
|
||||||
// perspective; i.e. segment size).
|
// perspective; i.e. segment size).
|
||||||
|
@ -756,7 +756,7 @@ func TestProjectUsage_ResetLimitsFirstDayOfNextMonth(t *testing.T) {
|
|||||||
|
|
||||||
func TestProjectUsage_BandwidthCache(t *testing.T) {
|
func TestProjectUsage_BandwidthCache(t *testing.T) {
|
||||||
testplanet.Run(t, testplanet.Config{
|
testplanet.Run(t, testplanet.Config{
|
||||||
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
SatelliteCount: 1, StorageNodeCount: 0, UplinkCount: 1,
|
||||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||||
project := planet.Uplinks[0].Projects[0]
|
project := planet.Uplinks[0].Projects[0]
|
||||||
projectUsage := planet.Satellites[0].Accounting.ProjectUsage
|
projectUsage := planet.Satellites[0].Accounting.ProjectUsage
|
||||||
@ -781,6 +781,33 @@ func TestProjectUsage_BandwidthCache(t *testing.T) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestProjectUsage_SegmentCache(t *testing.T) {
|
||||||
|
testplanet.Run(t, testplanet.Config{
|
||||||
|
SatelliteCount: 1, StorageNodeCount: 0, UplinkCount: 1,
|
||||||
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||||
|
project := planet.Uplinks[0].Projects[0]
|
||||||
|
projectUsage := planet.Satellites[0].Accounting.ProjectUsage
|
||||||
|
|
||||||
|
segmentsUsed := int64(42)
|
||||||
|
|
||||||
|
err := projectUsage.UpdateProjectSegmentUsage(ctx, project.ID, segmentsUsed)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// verify cache key creation.
|
||||||
|
fromCache, err := projectUsage.GetProjectSegmentUsage(ctx, project.ID)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, segmentsUsed, fromCache)
|
||||||
|
|
||||||
|
// verify cache key increment.
|
||||||
|
increment := int64(10)
|
||||||
|
err = projectUsage.UpdateProjectSegmentUsage(ctx, project.ID, increment)
|
||||||
|
require.NoError(t, err)
|
||||||
|
fromCache, err = projectUsage.GetProjectSegmentUsage(ctx, project.ID)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, segmentsUsed+increment, fromCache)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
func TestProjectUsage_BandwidthDownloadLimit(t *testing.T) {
|
func TestProjectUsage_BandwidthDownloadLimit(t *testing.T) {
|
||||||
testplanet.Run(t, testplanet.Config{
|
testplanet.Run(t, testplanet.Config{
|
||||||
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
||||||
|
Loading…
Reference in New Issue
Block a user