satellite/accounting/live: get project totals in batches
After the infrastructure changes, the redis instance is not necessarily close to the core instance (where tally is calculated), and round trips to fetch data from redis can be very costly: a calculation that used to take less than an hour can take a few hours on a larger satellite. This change combines the 'segment' and 'storage' usage requests into batches to reduce the latency impact on tally calculation.

https://github.com/storj/storj/issues/5800

Change-Id: I87e57ec09e88fd167060a4ed51dc8b0274a095c5
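As background, here is a minimal sketch of the batching idea, assuming a generic go-redis client and a hypothetical helper name (fetchTotalsBatched is illustrative, not part of this change): one GET per key costs one round trip per project, while one MGET per batch costs one round trip for the whole batch, so 100,000 projects at the default batch size of 5,000 need roughly 20 round trips per usage type instead of 100,000.

    // Rough sketch only; not the satellite's actual code.
    package example

    import (
        "context"
        "strconv"

        "github.com/redis/go-redis/v9" // assumption: any go-redis client with MGet behaves the same way
    )

    // fetchTotalsBatched reads integer totals for many keys using one MGET per
    // batch instead of one GET per key, so the number of Redis round trips
    // drops from len(keys) to roughly len(keys)/batchSize.
    func fetchTotalsBatched(ctx context.Context, client *redis.Client, keys []string, batchSize int) (map[string]int64, error) {
        totals := make(map[string]int64, len(keys))

        for start := 0; start < len(keys); start += batchSize {
            end := start + batchSize
            if end > len(keys) {
                end = len(keys)
            }
            batch := keys[start:end]

            // one round trip for the whole batch
            values, err := client.MGet(ctx, batch...).Result()
            if err != nil {
                return nil, err
            }

            for i, raw := range values {
                if raw == nil {
                    // key does not exist in redis; skip it
                    continue
                }
                n, err := strconv.ParseInt(raw.(string), 10, 64)
                if err != nil {
                    return nil, err
                }
                totals[batch[i]] = n
            }
        }

        return totals, nil
    }

The change below applies the same idea inside the live accounting cache: GetAllProjectTotals first collects project IDs from the key scan, and a new fillUsage helper then fetches the 'segment' and 'storage' keys with MGET in batches of live-accounting.batch-size.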
parent f30e0986b6
commit 73a279235a
@@ -26,6 +26,7 @@ type Config struct {
     StorageBackend     string        `help:"what to use for storing real-time accounting data"`
     BandwidthCacheTTL  time.Duration `default:"5m" help:"bandwidth cache key time to live"`
     AsOfSystemInterval time.Duration `default:"-10s" help:"as of system interval"`
+    BatchSize          int           `default:"5000" help:"how many projects' usage should be requested from the redis cache at once"`
 }

 // OpenCache creates a new accounting.Cache instance using the type specified backend in
@@ -49,7 +50,7 @@ func OpenCache(ctx context.Context, log *zap.Logger, config Config) (accounting.
     backendType = parts[0]
     switch backendType {
     case "redis":
-        return openRedisLiveAccounting(ctx, config.StorageBackend)
+        return openRedisLiveAccounting(ctx, config.StorageBackend, config.BatchSize)
     default:
         return nil, Error.New("unrecognized live accounting backend specifier %q. Currently only redis is supported", backendType)
     }
@@ -6,6 +6,7 @@ package live_test
 import (
     "context"
     "math/rand"
+    "strconv"
     "testing"
     "time"
@@ -136,19 +137,28 @@ func TestGetAllProjectTotals(t *testing.T) {
         require.NoError(t, err)
     }

-    usage, err := cache.GetAllProjectTotals(ctx)
-    require.NoError(t, err)
-    require.Len(t, usage, len(projectIDs))
+    for _, batchSize := range []int{1, 2, 3, 10, 13, 10000} {
+        t.Run("batch-size-"+strconv.Itoa(batchSize), func(t *testing.T) {
+            config.BatchSize = batchSize
+            testCache, err := live.OpenCache(ctx, zaptest.NewLogger(t).Named("live-accounting"), config)
+            require.NoError(t, err)
+            defer ctx.Check(testCache.Close)

-    // make sure each project ID and total was received
-    for _, projID := range projectIDs {
-        totalStorage, err := cache.GetProjectStorageUsage(ctx, projID)
-        require.NoError(t, err)
-        assert.Equal(t, totalStorage, usage[projID].Storage)
+            usage, err := testCache.GetAllProjectTotals(ctx)
+            require.NoError(t, err)
+            require.Len(t, usage, len(projectIDs))

-        totalSegments, err := cache.GetProjectSegmentUsage(ctx, projID)
-        require.NoError(t, err)
-        assert.Equal(t, totalSegments, usage[projID].Segments)
+            // make sure each project ID and total was received
+            for _, projID := range projectIDs {
+                totalStorage, err := testCache.GetProjectStorageUsage(ctx, projID)
+                require.NoError(t, err)
+                assert.Equal(t, totalStorage, usage[projID].Storage)
+
+                totalSegments, err := testCache.GetProjectSegmentUsage(ctx, projID)
+                require.NoError(t, err)
+                assert.Equal(t, totalSegments, usage[projID].Segments)
+            }
+        })
     }
 }
@@ -18,6 +18,8 @@ import (

 type redisLiveAccounting struct {
     client *redis.Client
+
+    batchSize int
 }

 // openRedisLiveAccounting returns a redisLiveAccounting cache instance.
@@ -29,14 +31,15 @@ type redisLiveAccounting struct {
 // it fails then it returns an instance and accounting.ErrSystemOrNetError
 // because it means that Redis may not be operative at this precise moment but
 // it may be in future method calls as it handles automatically reconnects.
-func openRedisLiveAccounting(ctx context.Context, address string) (*redisLiveAccounting, error) {
+func openRedisLiveAccounting(ctx context.Context, address string, batchSize int) (*redisLiveAccounting, error) {
     opts, err := redis.ParseURL(address)
     if err != nil {
         return nil, accounting.ErrInvalidArgument.Wrap(err)
     }

     cache := &redisLiveAccounting{
-        client: redis.NewClient(opts),
+        client:    redis.NewClient(opts),
+        batchSize: batchSize,
     }

     // ping here to verify we are able to connect to Redis with the initialized client.
@@ -52,7 +55,7 @@ func openRedisLiveAccounting(ctx context.Context, address string) (*redisLiveAcc
 func (cache *redisLiveAccounting) GetProjectStorageUsage(ctx context.Context, projectID uuid.UUID) (totalUsed int64, err error) {
     defer mon.Task()(&ctx, projectID)(&err)

-    return cache.getInt64(ctx, string(projectID[:]))
+    return cache.getInt64(ctx, createStorageProjectIDKey(projectID))
 }

 // GetProjectBandwidthUsage returns the current bandwidth usage
@@ -175,7 +178,7 @@ func (cache *redisLiveAccounting) AddProjectSegmentUsageUpToLimit(ctx context.Co
 func (cache *redisLiveAccounting) AddProjectStorageUsage(ctx context.Context, projectID uuid.UUID, spaceUsed int64) (err error) {
     defer mon.Task()(&ctx, projectID, spaceUsed)(&err)

-    _, err = cache.client.IncrBy(ctx, string(projectID[:]), spaceUsed).Result()
+    _, err = cache.client.IncrBy(ctx, createStorageProjectIDKey(projectID), spaceUsed).Result()
     if err != nil {
         return accounting.ErrSystemOrNetError.New("Redis incrby failed: %w", err)
     }
@@ -216,6 +219,7 @@ func (cache *redisLiveAccounting) GetAllProjectTotals(ctx context.Context) (_ ma
     defer mon.Task()(&ctx)(&err)

     projects := make(map[uuid.UUID]accounting.Usage)
+
     it := cache.client.Scan(ctx, 0, "*", 0).Iterator()
     for it.Next(ctx) {
         key := it.Val()
@@ -231,55 +235,90 @@ func (cache *redisLiveAccounting) GetAllProjectTotals(ctx context.Context) (_ ma
                 return nil, accounting.ErrUnexpectedValue.New("cannot parse the key as UUID; key=%q", key)
             }

-            usage := accounting.Usage{}
-            if seenUsage, seen := projects[projectID]; seen {
-                if seenUsage.Segments != 0 {
-                    continue
-                }
-
-                usage = seenUsage
-            }
-
-            segmentUsage, err := cache.GetProjectSegmentUsage(ctx, projectID)
-            if err != nil {
-                if accounting.ErrKeyNotFound.Has(err) {
-                    continue
-                }
-
-                return nil, err
-            }
-
-            usage.Segments = segmentUsage
-            projects[projectID] = usage
+            projects[projectID] = accounting.Usage{}
         } else {
             projectID, err := uuid.FromBytes([]byte(key))
             if err != nil {
                 return nil, accounting.ErrUnexpectedValue.New("cannot parse the key as UUID; key=%q", key)
             }

-            usage := accounting.Usage{}
-            if seenUsage, seen := projects[projectID]; seen {
-                if seenUsage.Storage != 0 {
-                    continue
-                }
-
-                usage = seenUsage
-            }
-
-            storageUsage, err := cache.getInt64(ctx, key)
-            if err != nil {
-                if accounting.ErrKeyNotFound.Has(err) {
-                    continue
-                }
-
-                return nil, err
-            }
-
-            usage.Storage = storageUsage
-            projects[projectID] = usage
+            projects[projectID] = accounting.Usage{}
         }
     }

-    return projects, nil
+    return cache.fillUsage(ctx, projects)
 }
+
+func (cache *redisLiveAccounting) fillUsage(ctx context.Context, projects map[uuid.UUID]accounting.Usage) (_ map[uuid.UUID]accounting.Usage, err error) {
+    defer mon.Task()(&ctx)(&err)
+
+    if len(projects) == 0 {
+        return nil, nil
+    }
+
+    projectIDs := make([]uuid.UUID, 0, cache.batchSize)
+    segmentKeys := make([]string, 0, cache.batchSize)
+    storageKeys := make([]string, 0, cache.batchSize)
+
+    fetchProjectsUsage := func() error {
+        if len(projectIDs) == 0 {
+            return nil
+        }
+
+        segmentResult, err := cache.client.MGet(ctx, segmentKeys...).Result()
+        if err != nil {
+            return accounting.ErrGetProjectLimitCache.Wrap(err)
+        }
+
+        storageResult, err := cache.client.MGet(ctx, storageKeys...).Result()
+        if err != nil {
+            return accounting.ErrGetProjectLimitCache.Wrap(err)
+        }
+
+        for i, projectID := range projectIDs {
+            segmentUsageValue := segmentResult[i].(string)
+            segmentsUsage, err := strconv.ParseInt(segmentUsageValue, 10, 64)
+            if err != nil {
+                return accounting.ErrUnexpectedValue.New("cannot parse the value as int64; val=%q", segmentResult[i])
+            }
+
+            storageUsageValue := storageResult[i].(string)
+            storageUsage, err := strconv.ParseInt(storageUsageValue, 10, 64)
+            if err != nil {
+                return accounting.ErrUnexpectedValue.New("cannot parse the value as int64; val=%q", storageResult[i])
+            }
+
+            projects[projectID] = accounting.Usage{
+                Segments: segmentsUsage,
+                Storage:  storageUsage,
+            }
+        }
+
+        return nil
+    }
+
+    for projectID := range projects {
+        projectIDs = append(projectIDs, projectID)
+        segmentKeys = append(segmentKeys, createSegmentProjectIDKey(projectID))
+        storageKeys = append(storageKeys, createStorageProjectIDKey(projectID))
+
+        if len(projectIDs) >= cache.batchSize {
+            err := fetchProjectsUsage()
+            if err != nil {
+                return nil, err
+            }
+
+            projectIDs = projectIDs[:0]
+            segmentKeys = segmentKeys[:0]
+            storageKeys = storageKeys[:0]
+        }
+    }
+
+    err = fetchProjectsUsage()
+    if err != nil {
+        return nil, err
+    }
+
+    return projects, nil
+}
@@ -325,3 +364,8 @@ func createBandwidthProjectIDKey(projectID uuid.UUID, now time.Time) string {
 func createSegmentProjectIDKey(projectID uuid.UUID) string {
     return string(projectID[:]) + ":segment"
 }
+
+// createStorageProjectIDKey creates the storage project key.
+func createStorageProjectIDKey(projectID uuid.UUID) string {
+    return string(projectID[:])
+}
scripts/testdata/satellite-config.yaml.lock (vendored, 3 changed lines)
@@ -583,6 +583,9 @@ identity.key-path: /root/.local/share/storj/identity/satellite/identity.key
 # bandwidth cache key time to live
 # live-accounting.bandwidth-cache-ttl: 5m0s

+# how many projects' usage should be requested from the redis cache at once
+# live-accounting.batch-size: 5000
+
 # what to use for storing real-time accounting data
 # live-accounting.storage-backend: ""