satellite/accounting: GetAllProjectTotals returns storage and segment totals combined

return storage and segment totals as a single result, instead of returning only storage
and bandwidth and segment values are filtered out, https://github.com/storj/storj/issues/4744

Change-Id: I624d67ed5205ae21ecd5a2f39775f63ed042e629
This commit is contained in:
Qweder93 2022-05-02 09:09:51 +03:00 committed by Fadila
parent ff64e9ffb3
commit a3c9ca653f
4 changed files with 75 additions and 29 deletions

View File

@ -159,6 +159,12 @@ type BucketUsageRollup struct {
Before time.Time `json:"before"`
}
// Usage contains project's usage split on segments and storage.
type Usage struct {
Storage int64
Segments int64
}
// StoragenodeAccounting stores information about bandwidth and storage usage for storage nodes.
//
// architecture: Database
@ -283,8 +289,8 @@ type Cache interface {
// The projectID is inserted to the spaceUsed when it doesn't exists, hence
// this method will never return ErrKeyNotFound.
AddProjectStorageUsage(ctx context.Context, projectID uuid.UUID, spaceUsed int64) error
// GetAllProjectTotals return the total projects' storage used space.
GetAllProjectTotals(ctx context.Context) (map[uuid.UUID]int64, error)
// GetAllProjectTotals return the total projects' storage and segments used space.
GetAllProjectTotals(ctx context.Context) (map[uuid.UUID]Usage, error)
// Close the client, releasing any open resources. Once it's called any other
// method must be called.
Close() error

View File

@ -132,17 +132,23 @@ func TestGetAllProjectTotals(t *testing.T) {
projectIDs[i] = testrand.UUID()
err := cache.AddProjectStorageUsage(ctx, projectIDs[i], int64(i))
require.NoError(t, err)
err = cache.UpdateProjectSegmentUsage(ctx, projectIDs[i], int64(i), time.Hour)
require.NoError(t, err)
}
projectTotals, err := cache.GetAllProjectTotals(ctx)
usage, err := cache.GetAllProjectTotals(ctx)
require.NoError(t, err)
require.Len(t, projectTotals, len(projectIDs))
require.Len(t, usage, len(projectIDs))
// make sure each project ID and total was received
for _, projID := range projectIDs {
total, err := cache.GetProjectStorageUsage(ctx, projID)
totalStorage, err := cache.GetProjectStorageUsage(ctx, projID)
require.NoError(t, err)
assert.Equal(t, total, projectTotals[projID])
assert.Equal(t, totalStorage, usage[projID].Storage)
totalSegments, err := cache.GetProjectSegmentUsage(ctx, projID)
require.NoError(t, err)
assert.Equal(t, totalSegments, usage[projID].Segments)
}
})
}

View File

@ -163,42 +163,76 @@ func (cache *redisLiveAccounting) AddProjectStorageUsage(ctx context.Context, pr
return nil
}
// GetAllProjectTotals iterates through the live accounting DB and returns a map of project IDs and totals.
// GetAllProjectTotals iterates through the live accounting DB and returns a map of project IDs and totals, amount of segments.
//
// TODO (https://storjlabs.atlassian.net/browse/IN-173): see if it possible to
// get key/value pairs with one single call.
func (cache *redisLiveAccounting) GetAllProjectTotals(ctx context.Context) (_ map[uuid.UUID]int64, err error) {
func (cache *redisLiveAccounting) GetAllProjectTotals(ctx context.Context) (_ map[uuid.UUID]accounting.Usage, err error) {
defer mon.Task()(&ctx)(&err)
projects := make(map[uuid.UUID]int64)
projects := make(map[uuid.UUID]accounting.Usage)
it := cache.client.Scan(ctx, 0, "*", 0).Iterator()
for it.Next(ctx) {
key := it.Val()
// skip bandwidth and segment keys
if strings.HasSuffix(key, "bandwidth") || strings.HasSuffix(key, "segment") {
// skip bandwidth keys
if strings.HasSuffix(key, "bandwidth") {
continue
}
projectID, err := uuid.FromBytes([]byte(key))
if err != nil {
return nil, accounting.ErrUnexpectedValue.New("cannot parse the key as UUID; key=%q", key)
}
if _, seen := projects[projectID]; seen {
continue
}
val, err := cache.getInt64(ctx, key)
if err != nil {
if accounting.ErrKeyNotFound.Has(err) {
continue
if strings.HasSuffix(key, "segment") {
projectID, err := uuid.FromBytes([]byte(strings.TrimSuffix(key, ":segment")))
if err != nil {
return nil, accounting.ErrUnexpectedValue.New("cannot parse the key as UUID; key=%q", key)
}
return nil, err
}
usage := accounting.Usage{}
if seenUsage, seen := projects[projectID]; seen {
if seenUsage.Segments != 0 {
continue
}
projects[projectID] = val
usage = seenUsage
}
segmentUsage, err := cache.GetProjectSegmentUsage(ctx, projectID)
if err != nil {
if accounting.ErrKeyNotFound.Has(err) {
continue
}
return nil, err
}
usage.Segments = segmentUsage
projects[projectID] = usage
} else {
projectID, err := uuid.FromBytes([]byte(key))
if err != nil {
return nil, accounting.ErrUnexpectedValue.New("cannot parse the key as UUID; key=%q", key)
}
usage := accounting.Usage{}
if seenUsage, seen := projects[projectID]; seen {
if seenUsage.Storage != 0 {
continue
}
usage = seenUsage
}
storageUsage, err := cache.getInt64(ctx, key)
if err != nil {
if accounting.ErrKeyNotFound.Has(err) {
continue
}
return nil, err
}
usage.Storage = storageUsage
projects[projectID] = usage
}
}
return projects, nil

View File

@ -141,14 +141,14 @@ func (service *Service) Tally(ctx context.Context) (err error) {
}
for projectID, tallyTotal := range tallyProjectTotals {
delta := latestLiveTotals[projectID] - initialLiveTotals[projectID]
delta := latestLiveTotals[projectID].Storage - initialLiveTotals[projectID].Storage
if delta < 0 {
delta = 0
}
// read the method documentation why the increase passed to this method
// is calculated in this way
err = service.liveAccounting.AddProjectStorageUsage(ctx, projectID, -latestLiveTotals[projectID]+tallyTotal+(delta/2))
err = service.liveAccounting.AddProjectStorageUsage(ctx, projectID, -latestLiveTotals[projectID].Storage+tallyTotal+(delta/2))
if err != nil {
if accounting.ErrSystemOrNetError.Has(err) {
service.log.Error(