From 73a279235a394a91e2069c51f2a27f5514b4a372 Mon Sep 17 00:00:00 2001 From: Michal Niewrzal Date: Tue, 11 Jul 2023 14:06:36 +0200 Subject: [PATCH] satellite/accounting/live: get project totals in batches After infrastructure changes redis instance is not necessarily close to core instance (where tally is calculated) and round trips to get data from redis can be very costly. A calculation that took less than an hour can now take a few hours for a larger satellite. This change combines 'segment' and 'storage' usage requests into batches to reduce latency impact on tally calculation. https://github.com/storj/storj/issues/5800 Change-Id: I87e57ec09e88fd167060a4ed51dc8b0274a095c5 --- satellite/accounting/live/cache.go | 3 +- satellite/accounting/live/live_test.go | 32 ++++-- satellite/accounting/live/redis.go | 116 ++++++++++++++------ scripts/testdata/satellite-config.yaml.lock | 3 + 4 files changed, 106 insertions(+), 48 deletions(-) diff --git a/satellite/accounting/live/cache.go b/satellite/accounting/live/cache.go index 640ec17ca..e99734f3f 100644 --- a/satellite/accounting/live/cache.go +++ b/satellite/accounting/live/cache.go @@ -26,6 +26,7 @@ type Config struct { StorageBackend string `help:"what to use for storing real-time accounting data"` BandwidthCacheTTL time.Duration `default:"5m" help:"bandwidth cache key time to live"` AsOfSystemInterval time.Duration `default:"-10s" help:"as of system interval"` + BatchSize int `default:"5000" help:"how much projects usage should be requested from redis cache at once"` } // OpenCache creates a new accounting.Cache instance using the type specified backend in @@ -49,7 +50,7 @@ func OpenCache(ctx context.Context, log *zap.Logger, config Config) (accounting. backendType = parts[0] switch backendType { case "redis": - return openRedisLiveAccounting(ctx, config.StorageBackend) + return openRedisLiveAccounting(ctx, config.StorageBackend, config.BatchSize) default: return nil, Error.New("unrecognized live accounting backend specifier %q. 
Currently only redis is supported", backendType) } diff --git a/satellite/accounting/live/live_test.go b/satellite/accounting/live/live_test.go index 29836c607..885d45d5b 100644 --- a/satellite/accounting/live/live_test.go +++ b/satellite/accounting/live/live_test.go @@ -6,6 +6,7 @@ package live_test import ( "context" "math/rand" + "strconv" "testing" "time" @@ -136,19 +137,28 @@ func TestGetAllProjectTotals(t *testing.T) { require.NoError(t, err) } - usage, err := cache.GetAllProjectTotals(ctx) - require.NoError(t, err) - require.Len(t, usage, len(projectIDs)) + for _, batchSize := range []int{1, 2, 3, 10, 13, 10000} { + t.Run("batch-size-"+strconv.Itoa(batchSize), func(t *testing.T) { + config.BatchSize = batchSize + testCache, err := live.OpenCache(ctx, zaptest.NewLogger(t).Named("live-accounting"), config) + require.NoError(t, err) + defer ctx.Check(testCache.Close) - // make sure each project ID and total was received - for _, projID := range projectIDs { - totalStorage, err := cache.GetProjectStorageUsage(ctx, projID) - require.NoError(t, err) - assert.Equal(t, totalStorage, usage[projID].Storage) + usage, err := testCache.GetAllProjectTotals(ctx) + require.NoError(t, err) + require.Len(t, usage, len(projectIDs)) - totalSegments, err := cache.GetProjectSegmentUsage(ctx, projID) - require.NoError(t, err) - assert.Equal(t, totalSegments, usage[projID].Segments) + // make sure each project ID and total was received + for _, projID := range projectIDs { + totalStorage, err := testCache.GetProjectStorageUsage(ctx, projID) + require.NoError(t, err) + assert.Equal(t, totalStorage, usage[projID].Storage) + + totalSegments, err := testCache.GetProjectSegmentUsage(ctx, projID) + require.NoError(t, err) + assert.Equal(t, totalSegments, usage[projID].Segments) + } + }) } }) } diff --git a/satellite/accounting/live/redis.go b/satellite/accounting/live/redis.go index 90dca2012..b5cc55d6c 100644 --- a/satellite/accounting/live/redis.go +++ 
b/satellite/accounting/live/redis.go @@ -18,6 +18,8 @@ import ( type redisLiveAccounting struct { client *redis.Client + + batchSize int } // openRedisLiveAccounting returns a redisLiveAccounting cache instance. @@ -29,14 +31,15 @@ type redisLiveAccounting struct { // it fails then it returns an instance and accounting.ErrSystemOrNetError // because it means that Redis may not be operative at this precise moment but // it may be in future method calls as it handles automatically reconnects. -func openRedisLiveAccounting(ctx context.Context, address string) (*redisLiveAccounting, error) { +func openRedisLiveAccounting(ctx context.Context, address string, batchSize int) (*redisLiveAccounting, error) { opts, err := redis.ParseURL(address) if err != nil { return nil, accounting.ErrInvalidArgument.Wrap(err) } cache := &redisLiveAccounting{ - client: redis.NewClient(opts), + client: redis.NewClient(opts), + batchSize: batchSize, } // ping here to verify we are able to connect to Redis with the initialized client. 
@@ -52,7 +55,7 @@ func openRedisLiveAccounting(ctx context.Context, address string) (*redisLiveAcc func (cache *redisLiveAccounting) GetProjectStorageUsage(ctx context.Context, projectID uuid.UUID) (totalUsed int64, err error) { defer mon.Task()(&ctx, projectID)(&err) - return cache.getInt64(ctx, string(projectID[:])) + return cache.getInt64(ctx, createStorageProjectIDKey(projectID)) } // GetProjectBandwidthUsage returns the current bandwidth usage @@ -175,7 +178,7 @@ func (cache *redisLiveAccounting) AddProjectSegmentUsageUpToLimit(ctx context.Co func (cache *redisLiveAccounting) AddProjectStorageUsage(ctx context.Context, projectID uuid.UUID, spaceUsed int64) (err error) { defer mon.Task()(&ctx, projectID, spaceUsed)(&err) - _, err = cache.client.IncrBy(ctx, string(projectID[:]), spaceUsed).Result() + _, err = cache.client.IncrBy(ctx, createStorageProjectIDKey(projectID), spaceUsed).Result() if err != nil { return accounting.ErrSystemOrNetError.New("Redis incrby failed: %w", err) } @@ -216,6 +219,7 @@ func (cache *redisLiveAccounting) GetAllProjectTotals(ctx context.Context) (_ ma defer mon.Task()(&ctx)(&err) projects := make(map[uuid.UUID]accounting.Usage) + it := cache.client.Scan(ctx, 0, "*", 0).Iterator() for it.Next(ctx) { key := it.Val() @@ -231,55 +235,90 @@ func (cache *redisLiveAccounting) GetAllProjectTotals(ctx context.Context) (_ ma return nil, accounting.ErrUnexpectedValue.New("cannot parse the key as UUID; key=%q", key) } - usage := accounting.Usage{} - if seenUsage, seen := projects[projectID]; seen { - if seenUsage.Segments != 0 { - continue - } - - usage = seenUsage - } - - segmentUsage, err := cache.GetProjectSegmentUsage(ctx, projectID) - if err != nil { - if accounting.ErrKeyNotFound.Has(err) { - continue - } - - return nil, err - } - - usage.Segments = segmentUsage - projects[projectID] = usage + projects[projectID] = accounting.Usage{} } else { projectID, err := uuid.FromBytes([]byte(key)) if err != nil { return nil, 
accounting.ErrUnexpectedValue.New("cannot parse the key as UUID; key=%q", key) } - usage := accounting.Usage{} - if seenUsage, seen := projects[projectID]; seen { - if seenUsage.Storage != 0 { - continue - } + projects[projectID] = accounting.Usage{} + } + } - usage = seenUsage + return cache.fillUsage(ctx, projects) +} + +func (cache *redisLiveAccounting) fillUsage(ctx context.Context, projects map[uuid.UUID]accounting.Usage) (_ map[uuid.UUID]accounting.Usage, err error) { + defer mon.Task()(&ctx)(&err) + + if len(projects) == 0 { + return nil, nil + } + + projectIDs := make([]uuid.UUID, 0, cache.batchSize) + segmentKeys := make([]string, 0, cache.batchSize) + storageKeys := make([]string, 0, cache.batchSize) + + fetchProjectsUsage := func() error { + if len(projectIDs) == 0 { + return nil + } + + segmentResult, err := cache.client.MGet(ctx, segmentKeys...).Result() + if err != nil { + return accounting.ErrGetProjectLimitCache.Wrap(err) + } + + storageResult, err := cache.client.MGet(ctx, storageKeys...).Result() + if err != nil { + return accounting.ErrGetProjectLimitCache.Wrap(err) + } + + for i, projectID := range projectIDs { + segmentUsageValue := segmentResult[i].(string) + segmentsUsage, err := strconv.ParseInt(segmentUsageValue, 10, 64) + if err != nil { + return accounting.ErrUnexpectedValue.New("cannot parse the value as int64; val=%q", segmentResult[i]) } - storageUsage, err := cache.getInt64(ctx, key) + storageUsageValue := storageResult[i].(string) + storageUsage, err := strconv.ParseInt(storageUsageValue, 10, 64) if err != nil { - if accounting.ErrKeyNotFound.Has(err) { - continue - } + return accounting.ErrUnexpectedValue.New("cannot parse the value as int64; val=%q", storageResult[i]) + } + projects[projectID] = accounting.Usage{ + Segments: segmentsUsage, + Storage: storageUsage, + } + } + + return nil + } + + for projectID := range projects { + projectIDs = append(projectIDs, projectID) + segmentKeys = append(segmentKeys, 
createSegmentProjectIDKey(projectID)) + storageKeys = append(storageKeys, createStorageProjectIDKey(projectID)) + + if len(projectIDs) >= cache.batchSize { + err := fetchProjectsUsage() + if err != nil { return nil, err } - usage.Storage = storageUsage - projects[projectID] = usage + projectIDs = projectIDs[:0] + segmentKeys = segmentKeys[:0] + storageKeys = storageKeys[:0] } } + err = fetchProjectsUsage() + if err != nil { + return nil, err + } + return projects, nil } @@ -325,3 +364,8 @@ func createBandwidthProjectIDKey(projectID uuid.UUID, now time.Time) string { func createSegmentProjectIDKey(projectID uuid.UUID) string { return string(projectID[:]) + ":segment" } + +// createStorageProjectIDKey creates the storage project key. +func createStorageProjectIDKey(projectID uuid.UUID) string { + return string(projectID[:]) +} diff --git a/scripts/testdata/satellite-config.yaml.lock b/scripts/testdata/satellite-config.yaml.lock index 14ff6750b..5167f6387 100755 --- a/scripts/testdata/satellite-config.yaml.lock +++ b/scripts/testdata/satellite-config.yaml.lock @@ -583,6 +583,9 @@ identity.key-path: /root/.local/share/storj/identity/satellite/identity.key # bandwidth cache key time to live # live-accounting.bandwidth-cache-ttl: 5m0s +# how much projects usage should be requested from redis cache at once +# live-accounting.batch-size: 5000 + # what to use for storing real-time accounting data # live-accounting.storage-backend: ""