satellite/accounting/live: Use Redis client directly
We have to adapt the live accounting to allow the packages that use it to differentiate about errors for being able to ignore them and make our satellite resilient to Redis downtime. For differentiating errors we should make changes in the live accounting but also in the storage/redis.Client, however, we may need to do some dirty workarounds or break other parts of the implementation that depends on it. On the other hand we want to get rid of the storage/redis.Client because it has more functionality that the one that we are using and some process has been started to remove it. Hence, we have refactored the live accounting to directly use the Redis client library for later on (in a future commit) adapt the satellite for being resilient to Redis downtime. Last but not least, a test for expired bandwidth keys have been added and with it a bug was spotted and fix it. Change-Id: Ibd191522cd20f6a9a15e5ccb7beb83a678e530ff
This commit is contained in:
parent
1ad69b9f96
commit
ce26616647
@ -6,6 +6,8 @@ package accounting
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/zeebo/errs"
|
||||
|
||||
"storj.io/common/storj"
|
||||
)
|
||||
|
||||
@ -19,6 +21,20 @@ const (
|
||||
LastRollup = "LastRollup"
|
||||
)
|
||||
|
||||
var (
|
||||
// ErrInvalidArgument is returned when a function argument has an invalid
|
||||
// business domain value.
|
||||
ErrInvalidArgument = errs.Class("invalid argument")
|
||||
// ErrSystemOrNetError is returned when the used storage backend returns an
|
||||
// internal system or network error.
|
||||
ErrSystemOrNetError = errs.Class("backend system error")
|
||||
// ErrKeyNotFound is returned when the key is not found in the cache.
|
||||
ErrKeyNotFound = errs.Class("key not found")
|
||||
// ErrUnexpectedValue is returned when an unexpected value according the
|
||||
// business domain is in the cache.
|
||||
ErrUnexpectedValue = errs.Class("unexpected value")
|
||||
)
|
||||
|
||||
// CSVRow represents data from QueryPaymentInfo without exposing dbx.
|
||||
type CSVRow struct {
|
||||
NodeID storj.NodeID
|
||||
|
@ -203,12 +203,40 @@ type ProjectAccounting interface {
|
||||
|
||||
// Cache stores live information about project storage which has not yet been synced to ProjectAccounting.
|
||||
//
|
||||
// All the implementations must follow the convention of returning errors of one
|
||||
// of the classes defined in this package.
|
||||
//
|
||||
// All the methods return:
|
||||
//
|
||||
// ErrInvalidArgument: an implementation may return if some parameter contain a
|
||||
// value which isn't accepted, nonetheless, not all the implementations impose
|
||||
// the same constraints on them.
|
||||
//
|
||||
// ErrSystemOrNetError: any method will return this if there is an error with
|
||||
// the underlining system or the network.
|
||||
//
|
||||
// ErrKeyNotFound: returned when a key is not found.
|
||||
//
|
||||
// ErrUnexpectedValue: returned when a key or value stored in the underlying
|
||||
// system isn't of the expected format or type according the business domain.
|
||||
//
|
||||
// architecture: Database
|
||||
type Cache interface {
|
||||
// GetProjectStorageUsage returns the project's storage usage.
|
||||
GetProjectStorageUsage(ctx context.Context, projectID uuid.UUID) (totalUsed int64, err error)
|
||||
// GetProjectBandwidthUsage returns the project's bandwidth usage.
|
||||
GetProjectBandwidthUsage(ctx context.Context, projectID uuid.UUID, now time.Time) (currentUsed int64, err error)
|
||||
// UpdateProjectBandthUsage updates the project's bandwidth usage increasing
|
||||
// it. The projectID is inserted to the increment when it doesn't exists,
|
||||
// hence this method will never return ErrKeyNotFound error's class.
|
||||
UpdateProjectBandwidthUsage(ctx context.Context, projectID uuid.UUID, increment int64, ttl time.Duration, now time.Time) error
|
||||
// AddProjectStorageUsage adds to the projects storage usage the spacedUsed.
|
||||
// The projectID is inserted to the spaceUsed when it doesn't exists, hence
|
||||
// this method will never return ErrKeyNotFound.
|
||||
AddProjectStorageUsage(ctx context.Context, projectID uuid.UUID, spaceUsed int64) error
|
||||
// GetAllProjectTotals return the total projects' storage used space.
|
||||
GetAllProjectTotals(ctx context.Context) (map[uuid.UUID]int64, error)
|
||||
// Close the client, releasing any open resources. Once it's called any other
|
||||
// method must be called.
|
||||
Close() error
|
||||
}
|
||||
|
@ -28,6 +28,15 @@ type Config struct {
|
||||
|
||||
// NewCache creates a new accounting.Cache instance using the type specified backend in
|
||||
// the provided config.
|
||||
//
|
||||
// The cache instance may be returned despite of returning the
|
||||
// accounting.ErrSystemOrNetError because some backends allows to reconnect on
|
||||
// each operation if the connection was not established or it was disconnected,
|
||||
// which is what it could happen at the moment to instance it and the cache will
|
||||
// work one the backend system will be reachable later on.
|
||||
// For this reason, the components that uses the cache should operate despite
|
||||
// the backend is not responding successfully although their service is
|
||||
// degraded.
|
||||
func NewCache(log *zap.Logger, config Config) (accounting.Cache, error) {
|
||||
parts := strings.SplitN(config.StorageBackend, ":", 2)
|
||||
var backendType string
|
||||
@ -38,7 +47,7 @@ func NewCache(log *zap.Logger, config Config) (accounting.Cache, error) {
|
||||
backendType = parts[0]
|
||||
switch backendType {
|
||||
case "redis":
|
||||
return newRedisLiveAccounting(log, config.StorageBackend)
|
||||
return newRedisLiveAccounting(config.StorageBackend)
|
||||
default:
|
||||
return nil, Error.New("unrecognized live accounting backend specifier %q. Currently only redis is supported", backendType)
|
||||
}
|
||||
|
@ -7,6 +7,7 @@ import (
|
||||
"context"
|
||||
"math/rand"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
@ -21,7 +22,7 @@ import (
|
||||
"storj.io/storj/storage/redis/redisserver"
|
||||
)
|
||||
|
||||
func TestLiveAccountingCache(t *testing.T) {
|
||||
func TestAddGetProjectStorageAndBandwidthUsage(t *testing.T) {
|
||||
tests := []struct {
|
||||
backend string
|
||||
}{
|
||||
@ -32,123 +33,67 @@ func TestLiveAccountingCache(t *testing.T) {
|
||||
ctx := testcontext.New(t)
|
||||
defer ctx.Cleanup()
|
||||
|
||||
redis, err := redisserver.Mini(ctx)
|
||||
redis, err := redisserver.Start(ctx)
|
||||
require.NoError(t, err)
|
||||
defer ctx.Check(redis.Close)
|
||||
|
||||
for _, tt := range tests {
|
||||
var config live.Config
|
||||
if tt.backend == "redis" {
|
||||
config = live.Config{
|
||||
StorageBackend: "redis://" + redis.Addr() + "?db=0",
|
||||
}
|
||||
}
|
||||
tt := tt
|
||||
t.Run(tt.backend, func(t *testing.T) {
|
||||
ctx := testcontext.New(t)
|
||||
|
||||
cache, err := live.NewCache(zaptest.NewLogger(t).Named("live-accounting"), config)
|
||||
require.NoError(t, err)
|
||||
|
||||
projectIDs, sum, err := populateCache(ctx, cache)
|
||||
require.NoError(t, err)
|
||||
|
||||
// make sure all of the "projects" got all space updates and got right totals
|
||||
for _, projID := range projectIDs {
|
||||
spaceUsed, err := cache.GetProjectStorageUsage(ctx, projID)
|
||||
require.NoError(t, err)
|
||||
assert.Equalf(t, sum, spaceUsed, "projectID %v", projID)
|
||||
}
|
||||
|
||||
negativeVal := int64(-100)
|
||||
sum += negativeVal
|
||||
|
||||
for _, projID := range projectIDs {
|
||||
err = cache.AddProjectStorageUsage(ctx, projID, negativeVal)
|
||||
require.NoError(t, err)
|
||||
|
||||
spaceUsed, err := cache.GetProjectStorageUsage(ctx, projID)
|
||||
require.NoError(t, err)
|
||||
assert.EqualValues(t, sum, spaceUsed)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestRedisCacheConcurrency(t *testing.T) {
|
||||
ctx := testcontext.New(t)
|
||||
defer ctx.Cleanup()
|
||||
|
||||
redis, err := redisserver.Mini(ctx)
|
||||
require.NoError(t, err)
|
||||
defer ctx.Check(redis.Close)
|
||||
|
||||
config := live.Config{
|
||||
StorageBackend: "redis://" + redis.Addr() + "?db=0",
|
||||
}
|
||||
cache, err := live.NewCache(zaptest.NewLogger(t).Named("live-accounting"), config)
|
||||
require.NoError(t, err)
|
||||
|
||||
projectID := testrand.UUID()
|
||||
|
||||
const (
|
||||
numConcurrent = 100
|
||||
spaceUsed = 10
|
||||
)
|
||||
expectedSum := spaceUsed * numConcurrent
|
||||
|
||||
var group errgroup.Group
|
||||
for i := 0; i < numConcurrent; i++ {
|
||||
group.Go(func() error {
|
||||
return cache.AddProjectStorageUsage(ctx, projectID, spaceUsed)
|
||||
})
|
||||
}
|
||||
require.NoError(t, group.Wait())
|
||||
|
||||
total, err := cache.GetProjectStorageUsage(ctx, projectID)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.EqualValues(t, expectedSum, total)
|
||||
}
|
||||
|
||||
func populateCache(ctx context.Context, cache accounting.Cache) (projectIDs []uuid.UUID, sum int64, _ error) {
|
||||
const (
|
||||
valuesListSize = 10
|
||||
valueMultiplier = 4096
|
||||
numProjects = 100
|
||||
)
|
||||
// make a largish list of varying values
|
||||
someValues := make([]int64, valuesListSize)
|
||||
for i := range someValues {
|
||||
someValues[i] = int64((i + 1) * valueMultiplier)
|
||||
sum += someValues[i]
|
||||
}
|
||||
|
||||
// make up some project IDs
|
||||
projectIDs = make([]uuid.UUID, numProjects)
|
||||
for i := range projectIDs {
|
||||
projectIDs[i] = testrand.UUID()
|
||||
}
|
||||
|
||||
// send lots of space used updates for all of these projects to the live
|
||||
// accounting store.
|
||||
errg, ctx := errgroup.WithContext(context.Background())
|
||||
for _, projID := range projectIDs {
|
||||
projID := projID
|
||||
errg.Go(func() error {
|
||||
// have each project sending the values in a different order
|
||||
myValues := make([]int64, valuesListSize)
|
||||
copy(myValues, someValues)
|
||||
rand.Shuffle(valuesListSize, func(v1, v2 int) {
|
||||
myValues[v1], myValues[v2] = myValues[v2], myValues[v1]
|
||||
})
|
||||
|
||||
for _, val := range myValues {
|
||||
if err := cache.AddProjectStorageUsage(ctx, projID, val); err != nil {
|
||||
return err
|
||||
var config live.Config
|
||||
if tt.backend == "redis" {
|
||||
config = live.Config{
|
||||
StorageBackend: "redis://" + redis.Addr() + "?db=0",
|
||||
}
|
||||
}
|
||||
return nil
|
||||
|
||||
cache, err := live.NewCache(zaptest.NewLogger(t).Named("live-accounting"), config)
|
||||
require.NoError(t, err)
|
||||
defer ctx.Check(cache.Close)
|
||||
|
||||
populatedData, err := populateCache(ctx, cache)
|
||||
require.NoError(t, err)
|
||||
|
||||
// make sure all of the "projects" got all space updates and got right totals
|
||||
for _, pdata := range populatedData {
|
||||
pdata := pdata
|
||||
|
||||
t.Run("storage", func(t *testing.T) {
|
||||
spaceUsed, err := cache.GetProjectStorageUsage(ctx, pdata.projectID)
|
||||
require.NoError(t, err)
|
||||
assert.Equalf(t, pdata.storageSum, spaceUsed, "projectID %v", pdata.projectID)
|
||||
|
||||
// upate it again and check
|
||||
negativeVal := -(rand.Int63n(pdata.storageSum) + 1)
|
||||
pdata.storageSum += negativeVal
|
||||
err = cache.AddProjectStorageUsage(ctx, pdata.projectID, negativeVal)
|
||||
require.NoError(t, err)
|
||||
|
||||
spaceUsed, err = cache.GetProjectStorageUsage(ctx, pdata.projectID)
|
||||
require.NoError(t, err)
|
||||
assert.EqualValues(t, pdata.storageSum, spaceUsed)
|
||||
})
|
||||
|
||||
t.Run("bandwidth", func(t *testing.T) {
|
||||
bandwidthUsed, err := cache.GetProjectBandwidthUsage(ctx, pdata.projectID, pdata.bandwidthNow)
|
||||
require.NoError(t, err)
|
||||
assert.Equalf(t, pdata.bandwidthSum, bandwidthUsed, "projectID %v", pdata.projectID)
|
||||
|
||||
// upate it again and check
|
||||
negativeVal := -(rand.Int63n(pdata.bandwidthSum) + 1)
|
||||
pdata.bandwidthSum += negativeVal
|
||||
err = cache.UpdateProjectBandwidthUsage(ctx, pdata.projectID, negativeVal, time.Second*2, pdata.bandwidthNow)
|
||||
require.NoError(t, err)
|
||||
|
||||
bandwidthUsed, err = cache.GetProjectBandwidthUsage(ctx, pdata.projectID, pdata.bandwidthNow)
|
||||
require.NoError(t, err)
|
||||
assert.EqualValues(t, pdata.bandwidthSum, bandwidthUsed)
|
||||
})
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
return projectIDs, sum, errg.Wait()
|
||||
}
|
||||
|
||||
func TestGetAllProjectTotals(t *testing.T) {
|
||||
@ -162,37 +107,162 @@ func TestGetAllProjectTotals(t *testing.T) {
|
||||
ctx := testcontext.New(t)
|
||||
defer ctx.Cleanup()
|
||||
|
||||
redis, err := redisserver.Mini(ctx)
|
||||
redis, err := redisserver.Start(ctx)
|
||||
require.NoError(t, err)
|
||||
defer ctx.Check(redis.Close)
|
||||
|
||||
for _, tt := range tests {
|
||||
var config live.Config
|
||||
if tt.backend == "redis" {
|
||||
config = live.Config{
|
||||
StorageBackend: "redis://" + redis.Addr() + "?db=0",
|
||||
tt := tt
|
||||
t.Run(tt.backend, func(t *testing.T) {
|
||||
ctx := testcontext.New(t)
|
||||
|
||||
var config live.Config
|
||||
if tt.backend == "redis" {
|
||||
config = live.Config{
|
||||
StorageBackend: "redis://" + redis.Addr() + "?db=0",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
cache, err := live.NewCache(zaptest.NewLogger(t).Named("live-accounting"), config)
|
||||
require.NoError(t, err)
|
||||
|
||||
projectIDs := make([]uuid.UUID, 1000)
|
||||
for i := range projectIDs {
|
||||
projectIDs[i] = testrand.UUID()
|
||||
err := cache.AddProjectStorageUsage(ctx, projectIDs[i], int64(i))
|
||||
cache, err := live.NewCache(zaptest.NewLogger(t).Named("live-accounting"), config)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
defer ctx.Check(cache.Close)
|
||||
|
||||
projectTotals, err := cache.GetAllProjectTotals(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, projectTotals, len(projectIDs))
|
||||
projectIDs := make([]uuid.UUID, 1000)
|
||||
for i := range projectIDs {
|
||||
projectIDs[i] = testrand.UUID()
|
||||
err := cache.AddProjectStorageUsage(ctx, projectIDs[i], int64(i))
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
// make sure each project ID and total was received
|
||||
for _, projID := range projectIDs {
|
||||
total, err := cache.GetProjectStorageUsage(ctx, projID)
|
||||
projectTotals, err := cache.GetAllProjectTotals(ctx)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, total, projectTotals[projID])
|
||||
}
|
||||
require.Len(t, projectTotals, len(projectIDs))
|
||||
|
||||
// make sure each project ID and total was received
|
||||
for _, projID := range projectIDs {
|
||||
total, err := cache.GetProjectStorageUsage(ctx, projID)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, total, projectTotals[projID])
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestLiveAccountingCache_ProjectBandwidthUsage_expiration(t *testing.T) {
|
||||
tests := []struct {
|
||||
backend string
|
||||
}{
|
||||
{
|
||||
backend: "redis",
|
||||
},
|
||||
}
|
||||
ctx := testcontext.New(t)
|
||||
defer ctx.Cleanup()
|
||||
|
||||
redis, err := redisserver.Start(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
defer ctx.Check(redis.Close)
|
||||
|
||||
for _, tt := range tests {
|
||||
tt := tt
|
||||
t.Run(tt.backend, func(t *testing.T) {
|
||||
ctx := testcontext.New(t)
|
||||
|
||||
var config live.Config
|
||||
if tt.backend == "redis" {
|
||||
config = live.Config{
|
||||
StorageBackend: "redis://" + redis.Addr() + "?db=0",
|
||||
}
|
||||
}
|
||||
|
||||
cache, err := live.NewCache(zaptest.NewLogger(t).Named("live-accounting"), config)
|
||||
require.NoError(t, err)
|
||||
defer ctx.Check(cache.Close)
|
||||
|
||||
var (
|
||||
projectID = testrand.UUID()
|
||||
now = time.Now()
|
||||
)
|
||||
err = cache.UpdateProjectBandwidthUsage(ctx, projectID, rand.Int63n(4096)+1, time.Second, now)
|
||||
require.NoError(t, err)
|
||||
|
||||
if tt.backend == "redis" {
|
||||
redis.TestingFastForward(time.Second)
|
||||
}
|
||||
|
||||
time.Sleep(2 * time.Second)
|
||||
_, err = cache.GetProjectBandwidthUsage(ctx, projectID, now)
|
||||
require.Error(t, err)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
type populateCacheData struct {
|
||||
projectID uuid.UUID
|
||||
storageSum int64
|
||||
bandwidthSum int64
|
||||
bandwidthNow time.Time
|
||||
}
|
||||
|
||||
func populateCache(ctx context.Context, cache accounting.Cache) ([]populateCacheData, error) {
|
||||
var (
|
||||
valuesListSize = rand.Intn(10) + 10
|
||||
numProjects = rand.Intn(100) + 100
|
||||
valueStorageMultiplier = rand.Int63n(4095) + 1
|
||||
valueBandwdithMultiplier = rand.Int63n(4095) + 1
|
||||
)
|
||||
// make a largish list of varying values
|
||||
baseValues := make([]int64, valuesListSize)
|
||||
for i := range baseValues {
|
||||
baseValues[i] = rand.Int63n(int64(valuesListSize)) + 1
|
||||
}
|
||||
|
||||
// make up some project IDs
|
||||
populatedData := make([]populateCacheData, numProjects)
|
||||
for i := range populatedData {
|
||||
populatedData[i] = populateCacheData{
|
||||
projectID: testrand.UUID(),
|
||||
}
|
||||
}
|
||||
|
||||
// send lots of space used updates for all of these projects to the live
|
||||
// accounting store.
|
||||
errg, ctx := errgroup.WithContext(context.Background())
|
||||
for i, pdata := range populatedData {
|
||||
var (
|
||||
i = i
|
||||
projID = pdata.projectID
|
||||
)
|
||||
|
||||
errg.Go(func() error {
|
||||
// have each project sending the values in a different order
|
||||
myValues := make([]int64, valuesListSize)
|
||||
copy(myValues, baseValues)
|
||||
rand.Shuffle(valuesListSize, func(v1, v2 int) {
|
||||
myValues[v1], myValues[v2] = myValues[v2], myValues[v1]
|
||||
})
|
||||
|
||||
now := time.Now()
|
||||
populatedData[i].bandwidthNow = now
|
||||
|
||||
for _, val := range myValues {
|
||||
storageVal := val * valueStorageMultiplier
|
||||
populatedData[i].storageSum += storageVal
|
||||
if err := cache.AddProjectStorageUsage(ctx, projID, storageVal); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
bandwidthVal := val * valueBandwdithMultiplier
|
||||
populatedData[i].bandwidthSum += bandwidthVal
|
||||
if err := cache.UpdateProjectBandwidthUsage(ctx, projID, bandwidthVal, time.Hour, now); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
return populatedData, errg.Wait()
|
||||
}
|
||||
|
@ -5,73 +5,90 @@ package live
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"go.uber.org/zap"
|
||||
"github.com/go-redis/redis"
|
||||
|
||||
"storj.io/common/uuid"
|
||||
"storj.io/storj/storage"
|
||||
"storj.io/storj/storage/redis"
|
||||
"storj.io/storj/satellite/accounting"
|
||||
)
|
||||
|
||||
type redisLiveAccounting struct {
|
||||
log *zap.Logger
|
||||
|
||||
client *redis.Client
|
||||
}
|
||||
|
||||
func newRedisLiveAccounting(log *zap.Logger, address string) (*redisLiveAccounting, error) {
|
||||
client, err := redis.NewClientFrom(address)
|
||||
// newRedisLiveAccounting returns a redisLiveAccounting cache instance.
|
||||
//
|
||||
// It returns accounting.ErrInvalidArgument if the connection address is invalid
|
||||
// according to Redis.
|
||||
//
|
||||
// The function pings to the Redis server for verifying the connectivity but if
|
||||
// it fails then it returns an instance and accounting.ErrSystemOrNetError
|
||||
// because it means that Redis may not be operative at this precise moment but
|
||||
// it may be in future method calls as it handles automatically reconnects.
|
||||
func newRedisLiveAccounting(address string) (*redisLiveAccounting, error) {
|
||||
redisurl, err := url.Parse(address)
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(err)
|
||||
return nil, accounting.ErrInvalidArgument.New("address: invalid URL; %w", err)
|
||||
}
|
||||
return &redisLiveAccounting{
|
||||
log: log,
|
||||
|
||||
if redisurl.Scheme != "redis" {
|
||||
return nil, accounting.ErrInvalidArgument.New("address: not a redis:// formatted address")
|
||||
}
|
||||
|
||||
q := redisurl.Query()
|
||||
db := q.Get("db")
|
||||
if db == "" {
|
||||
return nil, accounting.ErrInvalidArgument.New("address: a database number has to be specified")
|
||||
}
|
||||
|
||||
dbn, err := strconv.Atoi(db)
|
||||
if err != nil {
|
||||
return nil, accounting.ErrInvalidArgument.New("address: invalid database number %s", db)
|
||||
}
|
||||
|
||||
client := redis.NewClient(&redis.Options{
|
||||
Addr: redisurl.Host,
|
||||
Password: q.Get("password"),
|
||||
DB: dbn,
|
||||
})
|
||||
|
||||
cache := &redisLiveAccounting{
|
||||
client: client,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// ping here to verify we are able to connect to Redis with the initialized client.
|
||||
if err := client.Ping().Err(); err != nil {
|
||||
return cache, accounting.ErrSystemOrNetError.New("Redis ping failed: %w", err)
|
||||
}
|
||||
|
||||
return cache, nil
|
||||
}
|
||||
|
||||
// GetProjectStorageUsage gets inline and remote storage totals for a given
|
||||
// project, back to the time of the last accounting tally.
|
||||
func (cache *redisLiveAccounting) GetProjectStorageUsage(ctx context.Context, projectID uuid.UUID) (totalUsed int64, err error) {
|
||||
defer mon.Task()(&ctx, projectID)(&err)
|
||||
val, err := cache.client.Get(ctx, projectID[:])
|
||||
if err != nil {
|
||||
if storage.ErrKeyNotFound.Has(err) {
|
||||
return 0, nil
|
||||
}
|
||||
return 0, Error.Wrap(err)
|
||||
}
|
||||
intval, err := strconv.ParseInt(string([]byte(val)), 10, 64)
|
||||
return intval, Error.Wrap(err)
|
||||
}
|
||||
|
||||
// createBandwidthProjectIDKey creates the bandwidth project key.
|
||||
// The current month is combined with projectID to create a prefix.
|
||||
func createBandwidthProjectIDKey(projectID uuid.UUID, now time.Time) []byte {
|
||||
// Add current month as prefix
|
||||
_, month, _ := now.Date()
|
||||
key := append(projectID[:], byte(int(month)))
|
||||
|
||||
return append(key, []byte(":bandwidth")...)
|
||||
return cache.getInt64(ctx, string(projectID[:]))
|
||||
}
|
||||
|
||||
// GetProjectBandwidthUsage returns the current bandwidth usage
|
||||
// from specific project.
|
||||
func (cache *redisLiveAccounting) GetProjectBandwidthUsage(ctx context.Context, projectID uuid.UUID, now time.Time) (currentUsed int64, err error) {
|
||||
val, err := cache.client.Get(ctx, createBandwidthProjectIDKey(projectID, now))
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
intval, err := strconv.ParseInt(string([]byte(val)), 10, 64)
|
||||
return intval, Error.Wrap(err)
|
||||
defer mon.Task()(&ctx, projectID, now)(&err)
|
||||
|
||||
return cache.getInt64(ctx, createBandwidthProjectIDKey(projectID, now))
|
||||
}
|
||||
|
||||
// UpdateProjectBandwidthUsage increment the bandwidth cache key value.
|
||||
func (cache *redisLiveAccounting) UpdateProjectBandwidthUsage(ctx context.Context, projectID uuid.UUID, increment int64, ttl time.Duration, now time.Time) (err error) {
|
||||
mon.Task()(&ctx, projectID, increment, ttl, now)(&err)
|
||||
|
||||
// The following script will increment the cache key
|
||||
// by a specific value. If the key does not exist, it is
|
||||
@ -80,18 +97,21 @@ func (cache *redisLiveAccounting) UpdateProjectBandwidthUsage(ctx context.Contex
|
||||
// To achieve this we compare the increment and key value,
|
||||
// if they are equal its the first iteration.
|
||||
// More details on rate limiter section: https://redis.io/commands/incr
|
||||
|
||||
script := fmt.Sprintf(`local current
|
||||
current = redis.call("incrby", KEYS[1], "%d")
|
||||
if tonumber(current) == "%d" then
|
||||
if tonumber(current) == %d then
|
||||
redis.call("expire",KEYS[1], %d)
|
||||
end
|
||||
return current
|
||||
`, increment, increment, int(ttl.Seconds()))
|
||||
|
||||
key := createBandwidthProjectIDKey(projectID, now)
|
||||
err = cache.client.Eval(script, []string{key}).Err()
|
||||
if err != nil {
|
||||
return accounting.ErrSystemOrNetError.New("Redis eval failed: %w", err)
|
||||
}
|
||||
|
||||
return cache.client.Eval(ctx, script, []string{string(key)})
|
||||
return nil
|
||||
}
|
||||
|
||||
// AddProjectStorageUsage lets the live accounting know that the given
|
||||
@ -99,37 +119,92 @@ func (cache *redisLiveAccounting) UpdateProjectBandwidthUsage(ctx context.Contex
|
||||
// perspective; i.e. segment size).
|
||||
func (cache *redisLiveAccounting) AddProjectStorageUsage(ctx context.Context, projectID uuid.UUID, spaceUsed int64) (err error) {
|
||||
defer mon.Task()(&ctx, projectID, spaceUsed)(&err)
|
||||
return cache.client.IncrBy(ctx, projectID[:], spaceUsed)
|
||||
|
||||
_, err = cache.client.IncrBy(string(projectID[:]), spaceUsed).Result()
|
||||
if err != nil {
|
||||
return accounting.ErrSystemOrNetError.New("Redis incrby failed: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetAllProjectTotals iterates through the live accounting DB and returns a map of project IDs and totals.
|
||||
//
|
||||
// TODO (https://storjlabs.atlassian.net/browse/IN-173): see if it possible to
|
||||
// get key/value pairs with one single call.
|
||||
func (cache *redisLiveAccounting) GetAllProjectTotals(ctx context.Context) (_ map[uuid.UUID]int64, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
projects := make(map[uuid.UUID]int64)
|
||||
it := cache.client.Scan(0, "*", 0).Iterator()
|
||||
for it.Next() {
|
||||
key := it.Val()
|
||||
|
||||
err = cache.client.Iterate(ctx, storage.IterateOptions{Recurse: true}, func(ctx context.Context, it storage.Iterator) error {
|
||||
var item storage.ListItem
|
||||
for it.Next(ctx, &item) {
|
||||
if item.Key == nil {
|
||||
return Error.New("nil key")
|
||||
}
|
||||
id := new(uuid.UUID)
|
||||
copy(id[:], item.Key[:])
|
||||
intval, err := strconv.ParseInt(string([]byte(item.Value)), 10, 64)
|
||||
if err != nil {
|
||||
return Error.New("could not get total for project %s", id.String())
|
||||
}
|
||||
if !strings.HasSuffix(item.Key.String(), "bandwidth") {
|
||||
projects[*id] = intval
|
||||
}
|
||||
// skip bandwidth keys
|
||||
if strings.HasSuffix(key, "bandwidth") {
|
||||
continue
|
||||
}
|
||||
return nil
|
||||
})
|
||||
return projects, err
|
||||
|
||||
projectID, err := uuid.FromBytes([]byte(key))
|
||||
if err != nil {
|
||||
return nil, accounting.ErrUnexpectedValue.New("cannot parse the key as UUID; key=%q", key)
|
||||
}
|
||||
|
||||
if _, seen := projects[projectID]; seen {
|
||||
continue
|
||||
}
|
||||
|
||||
val, err := cache.getInt64(ctx, key)
|
||||
if err != nil {
|
||||
if accounting.ErrKeyNotFound.Has(err) {
|
||||
continue
|
||||
}
|
||||
|
||||
return nil, err
|
||||
}
|
||||
|
||||
projects[projectID] = val
|
||||
}
|
||||
|
||||
return projects, nil
|
||||
}
|
||||
|
||||
// Close the DB connection.
|
||||
func (cache *redisLiveAccounting) Close() error {
|
||||
return cache.client.Close()
|
||||
err := cache.client.Close()
|
||||
if err != nil {
|
||||
return accounting.ErrSystemOrNetError.New("Redis close failed: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cache *redisLiveAccounting) getInt64(ctx context.Context, key string) (_ int64, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
val, err := cache.client.Get(key).Bytes()
|
||||
if err != nil {
|
||||
if errors.Is(err, redis.Nil) {
|
||||
return 0, accounting.ErrKeyNotFound.New("%q", key)
|
||||
}
|
||||
|
||||
return 0, accounting.ErrSystemOrNetError.New("Redis get failed: %w", err)
|
||||
}
|
||||
|
||||
intval, err := strconv.ParseInt(string(val), 10, 64)
|
||||
if err != nil {
|
||||
return 0, accounting.ErrUnexpectedValue.New("cannot parse the value as int64; key=%q val=%q", key, val)
|
||||
}
|
||||
|
||||
return intval, nil
|
||||
}
|
||||
|
||||
// createBandwidthProjectIDKey creates the bandwidth project key.
|
||||
// The current month is combined with projectID to create a prefix.
|
||||
func createBandwidthProjectIDKey(projectID uuid.UUID, now time.Time) string {
|
||||
// Add current month as prefix
|
||||
_, month, _ := now.Date()
|
||||
key := append(projectID[:], byte(int(month)))
|
||||
|
||||
return string(key) + ":bandwidth"
|
||||
}
|
||||
|
@ -13,7 +13,6 @@ import (
|
||||
|
||||
"storj.io/common/memory"
|
||||
"storj.io/common/uuid"
|
||||
"storj.io/storj/storage"
|
||||
)
|
||||
|
||||
var mon = monkit.Package()
|
||||
@ -48,6 +47,10 @@ func NewService(projectAccountingDB ProjectAccounting, liveAccounting Cache, lim
|
||||
// ExceedsBandwidthUsage returns true if the bandwidth usage limits have been exceeded
|
||||
// for a project in the past month (30 days). The usage limit is (e.g 25GB) multiplied by the redundancy
|
||||
// expansion factor, so that the uplinks have a raw limit.
|
||||
//
|
||||
// Among others,it can return one of the following errors returned by
|
||||
// storj.io/storj/satellite/accounting.Cache except the ErrKeyNotFound, wrapped
|
||||
// by ErrProjectUsage.
|
||||
func (usage *Service) ExceedsBandwidthUsage(ctx context.Context, projectID uuid.UUID) (_ bool, limit memory.Size, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
@ -65,10 +68,9 @@ func (usage *Service) ExceedsBandwidthUsage(ctx context.Context, projectID uuid.
|
||||
|
||||
// Get the current bandwidth usage from cache.
|
||||
bandwidthUsage, err = usage.liveAccounting.GetProjectBandwidthUsage(ctx, projectID, usage.nowFn())
|
||||
|
||||
if err != nil {
|
||||
// Verify If the cache key was not found
|
||||
if storage.ErrKeyNotFound.Has(err) {
|
||||
if ErrKeyNotFound.Has(err) {
|
||||
|
||||
// Get current bandwidth value from database.
|
||||
now := usage.nowFn()
|
||||
@ -133,10 +135,17 @@ func (usage *Service) ExceedsStorageUsage(ctx context.Context, projectID uuid.UU
|
||||
}
|
||||
|
||||
// GetProjectStorageTotals returns total amount of storage used by project.
|
||||
//
|
||||
// It can return one of the following errors returned by
|
||||
// storj.io/storj/satellite/accounting.Cache.GetProjectStorageUsage except the
|
||||
// ErrKeyNotFound, wrapped by ErrProjectUsage.
|
||||
func (usage *Service) GetProjectStorageTotals(ctx context.Context, projectID uuid.UUID) (total int64, err error) {
|
||||
defer mon.Task()(&ctx, projectID)(&err)
|
||||
|
||||
total, err = usage.liveAccounting.GetProjectStorageUsage(ctx, projectID)
|
||||
if ErrKeyNotFound.Has(err) {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
return total, ErrProjectUsage.Wrap(err)
|
||||
}
|
||||
@ -181,11 +190,19 @@ func (usage *Service) UpdateProjectLimits(ctx context.Context, projectID uuid.UU
|
||||
}
|
||||
|
||||
// GetProjectBandwidthUsage get the current bandwidth usage from cache.
|
||||
//
|
||||
// It can return one of the following errors returned by
|
||||
// storj.io/storj/satellite/accounting.Cache.GetProjectBandwidthUsage, wrapped
|
||||
// by ErrProjectUsage.
|
||||
func (usage *Service) GetProjectBandwidthUsage(ctx context.Context, projectID uuid.UUID) (currentUsed int64, err error) {
|
||||
return usage.liveAccounting.GetProjectBandwidthUsage(ctx, projectID, usage.nowFn())
|
||||
}
|
||||
|
||||
// UpdateProjectBandwidthUsage increments the bandwidth cache key for a specific project.
|
||||
//
|
||||
// It can return one of the following errors returned by
|
||||
// storj.io/storj/satellite/accounting.Cache.UpdatProjectBandwidthUsage, wrapped
|
||||
// by ErrProjectUsage.
|
||||
func (usage *Service) UpdateProjectBandwidthUsage(ctx context.Context, projectID uuid.UUID, increment int64) (err error) {
|
||||
return usage.liveAccounting.UpdateProjectBandwidthUsage(ctx, projectID, increment, usage.bandwidthCacheTTL, usage.nowFn())
|
||||
}
|
||||
@ -193,6 +210,10 @@ func (usage *Service) UpdateProjectBandwidthUsage(ctx context.Context, projectID
|
||||
// AddProjectStorageUsage lets the live accounting know that the given
|
||||
// project has just added spaceUsed bytes of storage (from the user's
|
||||
// perspective; i.e. segment size).
|
||||
//
|
||||
// It can return one of the following errors returned by
|
||||
// storj.io/storj/satellite/accounting.Cache.AddProjectStorageUsage, wrapped by
|
||||
// ErrProjectUsage.
|
||||
func (usage *Service) AddProjectStorageUsage(ctx context.Context, projectID uuid.UUID, spaceUsed int64) (err error) {
|
||||
defer mon.Task()(&ctx, projectID)(&err)
|
||||
return usage.liveAccounting.AddProjectStorageUsage(ctx, projectID, spaceUsed)
|
||||
|
@ -36,6 +36,13 @@ const (
|
||||
type Server interface {
|
||||
Addr() string
|
||||
Close() error
|
||||
// TestingFastForward is a function for enforce the TTL of keys in
|
||||
// implementations what they have not exercise the expiration by themselves
|
||||
// (e.g. Minitredis). This method is a no-op in implementations which support
|
||||
// the expiration as usual.
|
||||
//
|
||||
// All the keys whose TTL minus d become <= 0 will be removed.
|
||||
TestingFastForward(d time.Duration)
|
||||
}
|
||||
|
||||
func freeport() (addr string, port int) {
|
||||
@ -151,8 +158,16 @@ type process struct {
|
||||
close func()
|
||||
}
|
||||
|
||||
func (process *process) Addr() string { return process.addr }
|
||||
func (process *process) Close() error { process.close(); return nil }
|
||||
func (process *process) Addr() string {
|
||||
return process.addr
|
||||
}
|
||||
|
||||
func (process *process) Close() error {
|
||||
process.close()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (process *process) TestingFastForward(_ time.Duration) {}
|
||||
|
||||
func pingServer(addr string) error {
|
||||
client := redis.NewClient(&redis.Options{Addr: addr, DB: 1})
|
||||
@ -184,3 +199,7 @@ func (s *miniserver) Close() error {
|
||||
s.Miniredis.Close()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *miniserver) TestingFastForward(d time.Duration) {
|
||||
s.FastForward(d)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user