Ivan Fraixedes ce26616647
satellite/accounting/live: Use Redis client directly
We have to adapt the live accounting to allow the packages that use it
to differentiate about errors for being able to ignore them and make our
satellite resilient to Redis downtime.

For differentiating errors we should make changes in the live accounting
but also in the storage/redis.Client, however, we may need to do some
dirty workarounds or break other parts of the implementation that
depends on it.

On the other hand we want to get rid of the storage/redis.Client because
it has more functionality that the one that we are using and some
process has been started to remove it.

Hence, we have refactored the live accounting to directly use the Redis
client library for later on (in a future commit) adapt the satellite for
being resilient to Redis downtime.

Last but not least, a test for expired bandwidth keys have been added
and with it a bug was spotted and fix it.

Change-Id: Ibd191522cd20f6a9a15e5ccb7beb83a678e530ff
2021-01-12 15:33:29 +01:00

269 lines
6.9 KiB

// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package live_test
import (
func TestAddGetProjectStorageAndBandwidthUsage(t *testing.T) {
tests := []struct {
backend string
backend: "redis",
ctx := testcontext.New(t)
defer ctx.Cleanup()
redis, err := redisserver.Start(ctx)
require.NoError(t, err)
defer ctx.Check(redis.Close)
for _, tt := range tests {
tt := tt
t.Run(tt.backend, func(t *testing.T) {
ctx := testcontext.New(t)
var config live.Config
if tt.backend == "redis" {
config = live.Config{
StorageBackend: "redis://" + redis.Addr() + "?db=0",
cache, err := live.NewCache(zaptest.NewLogger(t).Named("live-accounting"), config)
require.NoError(t, err)
defer ctx.Check(cache.Close)
populatedData, err := populateCache(ctx, cache)
require.NoError(t, err)
// make sure all of the "projects" got all space updates and got right totals
for _, pdata := range populatedData {
pdata := pdata
t.Run("storage", func(t *testing.T) {
spaceUsed, err := cache.GetProjectStorageUsage(ctx, pdata.projectID)
require.NoError(t, err)
assert.Equalf(t, pdata.storageSum, spaceUsed, "projectID %v", pdata.projectID)
// upate it again and check
negativeVal := -(rand.Int63n(pdata.storageSum) + 1)
pdata.storageSum += negativeVal
err = cache.AddProjectStorageUsage(ctx, pdata.projectID, negativeVal)
require.NoError(t, err)
spaceUsed, err = cache.GetProjectStorageUsage(ctx, pdata.projectID)
require.NoError(t, err)
assert.EqualValues(t, pdata.storageSum, spaceUsed)
t.Run("bandwidth", func(t *testing.T) {
bandwidthUsed, err := cache.GetProjectBandwidthUsage(ctx, pdata.projectID, pdata.bandwidthNow)
require.NoError(t, err)
assert.Equalf(t, pdata.bandwidthSum, bandwidthUsed, "projectID %v", pdata.projectID)
// upate it again and check
negativeVal := -(rand.Int63n(pdata.bandwidthSum) + 1)
pdata.bandwidthSum += negativeVal
err = cache.UpdateProjectBandwidthUsage(ctx, pdata.projectID, negativeVal, time.Second*2, pdata.bandwidthNow)
require.NoError(t, err)
bandwidthUsed, err = cache.GetProjectBandwidthUsage(ctx, pdata.projectID, pdata.bandwidthNow)
require.NoError(t, err)
assert.EqualValues(t, pdata.bandwidthSum, bandwidthUsed)
func TestGetAllProjectTotals(t *testing.T) {
tests := []struct {
backend string
backend: "redis",
ctx := testcontext.New(t)
defer ctx.Cleanup()
redis, err := redisserver.Start(ctx)
require.NoError(t, err)
defer ctx.Check(redis.Close)
for _, tt := range tests {
tt := tt
t.Run(tt.backend, func(t *testing.T) {
ctx := testcontext.New(t)
var config live.Config
if tt.backend == "redis" {
config = live.Config{
StorageBackend: "redis://" + redis.Addr() + "?db=0",
cache, err := live.NewCache(zaptest.NewLogger(t).Named("live-accounting"), config)
require.NoError(t, err)
defer ctx.Check(cache.Close)
projectIDs := make([]uuid.UUID, 1000)
for i := range projectIDs {
projectIDs[i] = testrand.UUID()
err := cache.AddProjectStorageUsage(ctx, projectIDs[i], int64(i))
require.NoError(t, err)
projectTotals, err := cache.GetAllProjectTotals(ctx)
require.NoError(t, err)
require.Len(t, projectTotals, len(projectIDs))
// make sure each project ID and total was received
for _, projID := range projectIDs {
total, err := cache.GetProjectStorageUsage(ctx, projID)
require.NoError(t, err)
assert.Equal(t, total, projectTotals[projID])
func TestLiveAccountingCache_ProjectBandwidthUsage_expiration(t *testing.T) {
tests := []struct {
backend string
backend: "redis",
ctx := testcontext.New(t)
defer ctx.Cleanup()
redis, err := redisserver.Start(ctx)
require.NoError(t, err)
defer ctx.Check(redis.Close)
for _, tt := range tests {
tt := tt
t.Run(tt.backend, func(t *testing.T) {
ctx := testcontext.New(t)
var config live.Config
if tt.backend == "redis" {
config = live.Config{
StorageBackend: "redis://" + redis.Addr() + "?db=0",
cache, err := live.NewCache(zaptest.NewLogger(t).Named("live-accounting"), config)
require.NoError(t, err)
defer ctx.Check(cache.Close)
var (
projectID = testrand.UUID()
now = time.Now()
err = cache.UpdateProjectBandwidthUsage(ctx, projectID, rand.Int63n(4096)+1, time.Second, now)
require.NoError(t, err)
if tt.backend == "redis" {
time.Sleep(2 * time.Second)
_, err = cache.GetProjectBandwidthUsage(ctx, projectID, now)
require.Error(t, err)
type populateCacheData struct {
projectID uuid.UUID
storageSum int64
bandwidthSum int64
bandwidthNow time.Time
func populateCache(ctx context.Context, cache accounting.Cache) ([]populateCacheData, error) {
var (
valuesListSize = rand.Intn(10) + 10
numProjects = rand.Intn(100) + 100
valueStorageMultiplier = rand.Int63n(4095) + 1
valueBandwdithMultiplier = rand.Int63n(4095) + 1
// make a largish list of varying values
baseValues := make([]int64, valuesListSize)
for i := range baseValues {
baseValues[i] = rand.Int63n(int64(valuesListSize)) + 1
// make up some project IDs
populatedData := make([]populateCacheData, numProjects)
for i := range populatedData {
populatedData[i] = populateCacheData{
projectID: testrand.UUID(),
// send lots of space used updates for all of these projects to the live
// accounting store.
errg, ctx := errgroup.WithContext(context.Background())
for i, pdata := range populatedData {
var (
i = i
projID = pdata.projectID
errg.Go(func() error {
// have each project sending the values in a different order
myValues := make([]int64, valuesListSize)
copy(myValues, baseValues)
rand.Shuffle(valuesListSize, func(v1, v2 int) {
myValues[v1], myValues[v2] = myValues[v2], myValues[v1]
now := time.Now()
populatedData[i].bandwidthNow = now
for _, val := range myValues {
storageVal := val * valueStorageMultiplier
populatedData[i].storageSum += storageVal
if err := cache.AddProjectStorageUsage(ctx, projID, storageVal); err != nil {
return err
bandwidthVal := val * valueBandwdithMultiplier
populatedData[i].bandwidthSum += bandwidthVal
if err := cache.UpdateProjectBandwidthUsage(ctx, projID, bandwidthVal, time.Hour, now); err != nil {
return err
return nil
return populatedData, errg.Wait()