diff --git a/satellite/accounting/db.go b/satellite/accounting/db.go index 4378804c8..f1aa31437 100644 --- a/satellite/accounting/db.go +++ b/satellite/accounting/db.go @@ -248,14 +248,20 @@ type ProjectAccounting interface { // // architecture: Database type Cache interface { - // GetProjectStorageUsage returns the project's storage usage. + // GetProjectStorageUsage returns the project's storage usage. GetProjectStorageUsage(ctx context.Context, projectID uuid.UUID) (totalUsed int64, err error) - // GetProjectBandwidthUsage returns the project's bandwidth usage. + // GetProjectBandwidthUsage returns the project's bandwidth usage. GetProjectBandwidthUsage(ctx context.Context, projectID uuid.UUID, now time.Time) (currentUsed int64, err error) + // GetProjectSegmentUsage returns the project's segment usage. + GetProjectSegmentUsage(ctx context.Context, projectID uuid.UUID) (currentUsed int64, err error) // UpdateProjectBandthUsage updates the project's bandwidth usage increasing // it. The projectID is inserted to the increment when it doesn't exists, // hence this method will never return ErrKeyNotFound error's class. UpdateProjectBandwidthUsage(ctx context.Context, projectID uuid.UUID, increment int64, ttl time.Duration, now time.Time) error + // UpdateProjectSegmentUsage updates the project's segment usage increasing + // it. The projectID is inserted to the increment when it doesn't exists, + // hence this method will never return ErrKeyNotFound error's class. + UpdateProjectSegmentUsage(ctx context.Context, projectID uuid.UUID, increment int64, ttl time.Duration) error // AddProjectStorageUsage adds to the projects storage usage the spacedUsed. // The projectID is inserted to the spaceUsed when it doesn't exists, hence // this method will never return ErrKeyNotFound. diff --git a/satellite/accounting/live/redis.go b/satellite/accounting/live/redis.go index a589e4cb4..cd03e35b9 100644 --- a/satellite/accounting/live/redis.go +++ b/satellite/accounting/live/redis.go @@ -114,6 +114,41 @@ func (cache *redisLiveAccounting) UpdateProjectBandwidthUsage(ctx context.Contex return nil } +// GetProjectSegmentUsage returns the current segment usage from specific project. +func (cache *redisLiveAccounting) GetProjectSegmentUsage(ctx context.Context, projectID uuid.UUID) (currentUsed int64, err error) { + defer mon.Task()(&ctx, projectID)(&err) + + return cache.getInt64(ctx, createSegmentProjectIDKey(projectID)) +} + +// UpdateProjectSegmentUsage increment the segment cache key value. +func (cache *redisLiveAccounting) UpdateProjectSegmentUsage(ctx context.Context, projectID uuid.UUID, increment int64, ttl time.Duration) (err error) { + mon.Task()(&ctx, projectID, increment, ttl)(&err) + + // The following script will increment the cache key + // by a specific value. If the key does not exist, it is + // set to 0 before performing the operation. + // The key expiration will be set only in the first iteration. + // To achieve this we compare the increment and key value, + // if they are equal its the first iteration. + // More details on rate limiter section: https://redis.io/commands/incr + script := fmt.Sprintf(`local current + current = redis.call("incrby", KEYS[1], "%d") + if tonumber(current) == %d then + redis.call("expire",KEYS[1], %d) + end + return current + `, increment, increment, int(ttl.Seconds())) + + key := createSegmentProjectIDKey(projectID) + err = cache.client.Eval(ctx, script, []string{key}).Err() + if err != nil { + return accounting.ErrSystemOrNetError.New("Redis eval failed: %w", err) + } + + return nil +} + // AddProjectStorageUsage lets the live accounting know that the given // project has just added spaceUsed bytes of storage (from the user's // perspective; i.e. segment size). @@ -206,3 +241,8 @@ func createBandwidthProjectIDKey(projectID uuid.UUID, now time.Time) string { _, month, day := now.Date() return string(projectID[:]) + string(byte(month)) + string(byte(day)) + ":bandwidth" } + +// createSegmentProjectIDKey creates the segment project key. +func createSegmentProjectIDKey(projectID uuid.UUID) string { + return string(projectID[:]) + ":segment" +} diff --git a/satellite/accounting/projectusage.go b/satellite/accounting/projectusage.go index 8ed692d1b..d2fcb656c 100644 --- a/satellite/accounting/projectusage.go +++ b/satellite/accounting/projectusage.go @@ -215,6 +215,23 @@ func (usage *Service) UpdateProjectBandwidthUsage(ctx context.Context, projectID return usage.liveAccounting.UpdateProjectBandwidthUsage(ctx, projectID, increment, usage.bandwidthCacheTTL, usage.nowFn()) } +// GetProjectSegmentUsage get the current segment usage from cache. +// +// It can return one of the following errors returned by +// storj.io/storj/satellite/accounting.Cache.GetProjectSegmentUsage. +func (usage *Service) GetProjectSegmentUsage(ctx context.Context, projectID uuid.UUID) (currentUsed int64, err error) { + return usage.liveAccounting.GetProjectSegmentUsage(ctx, projectID) +} + +// UpdateProjectSegmentUsage increments the segment cache key for a specific project. +// +// It can return one of the following errors returned by +// storj.io/storj/satellite/accounting.Cache.UpdatProjectSegmentUsage. +func (usage *Service) UpdateProjectSegmentUsage(ctx context.Context, projectID uuid.UUID, increment int64) (err error) { + // TODO rename bandwidthCacheTTL to cacheTTL + return usage.liveAccounting.UpdateProjectSegmentUsage(ctx, projectID, increment, usage.bandwidthCacheTTL) +} + // AddProjectStorageUsage lets the live accounting know that the given // project has just added spaceUsed bytes of storage (from the user's // perspective; i.e. segment size). diff --git a/satellite/accounting/projectusage_test.go b/satellite/accounting/projectusage_test.go index 570fa2003..57d4d4f3e 100644 --- a/satellite/accounting/projectusage_test.go +++ b/satellite/accounting/projectusage_test.go @@ -756,7 +756,7 @@ func TestProjectUsage_ResetLimitsFirstDayOfNextMonth(t *testing.T) { func TestProjectUsage_BandwidthCache(t *testing.T) { testplanet.Run(t, testplanet.Config{ - SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1, + SatelliteCount: 1, StorageNodeCount: 0, UplinkCount: 1, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { project := planet.Uplinks[0].Projects[0] projectUsage := planet.Satellites[0].Accounting.ProjectUsage @@ -781,6 +781,33 @@ func TestProjectUsage_BandwidthCache(t *testing.T) { }) } +func TestProjectUsage_SegmentCache(t *testing.T) { + testplanet.Run(t, testplanet.Config{ + SatelliteCount: 1, StorageNodeCount: 0, UplinkCount: 1, + }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { + project := planet.Uplinks[0].Projects[0] + projectUsage := planet.Satellites[0].Accounting.ProjectUsage + + segmentsUsed := int64(42) + + err := projectUsage.UpdateProjectSegmentUsage(ctx, project.ID, segmentsUsed) + require.NoError(t, err) + + // verify cache key creation. + fromCache, err := projectUsage.GetProjectSegmentUsage(ctx, project.ID) + require.NoError(t, err) + require.Equal(t, segmentsUsed, fromCache) + + // verify cache key increment. + increment := int64(10) + err = projectUsage.UpdateProjectSegmentUsage(ctx, project.ID, increment) + require.NoError(t, err) + fromCache, err = projectUsage.GetProjectSegmentUsage(ctx, project.ID) + require.NoError(t, err) + require.Equal(t, segmentsUsed+increment, fromCache) + }) +} + func TestProjectUsage_BandwidthDownloadLimit(t *testing.T) { testplanet.Run(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,