satellite/accounting: add redis support to live accounting (#3213)
* set up redis support in live accounting * move live.Service interface into accounting package and rename to Cache, pass into satellite * refactor Cache to store one int64 total, add IncrBy method to redis client implementation * add monkit tracing to live accounting
This commit is contained in:
parent
21c6737b10
commit
76ad83f12c
@ -23,6 +23,7 @@ import (
|
||||
"storj.io/storj/pkg/process"
|
||||
"storj.io/storj/pkg/revocation"
|
||||
"storj.io/storj/satellite"
|
||||
"storj.io/storj/satellite/accounting/live"
|
||||
"storj.io/storj/satellite/metainfo"
|
||||
"storj.io/storj/satellite/satellitedb"
|
||||
)
|
||||
@ -147,7 +148,15 @@ func cmdRun(cmd *cobra.Command, args []string) (err error) {
|
||||
err = errs.Combine(err, revocationDB.Close())
|
||||
}()
|
||||
|
||||
peer, err := satellite.New(log, identity, db, pointerDB, revocationDB, version.Build, &runCfg.Config)
|
||||
liveAccounting, err := live.NewCache(log.Named("live-accounting"), runCfg.LiveAccounting)
|
||||
if err != nil {
|
||||
return errs.New("Error creating live accounting cache: %+v", err)
|
||||
}
|
||||
defer func() {
|
||||
err = errs.Combine(err, liveAccounting.Close())
|
||||
}()
|
||||
|
||||
peer, err := satellite.New(log, identity, db, pointerDB, revocationDB, liveAccounting, version.Build, &runCfg.Config)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -121,7 +121,7 @@ type SatelliteSystem struct {
|
||||
}
|
||||
|
||||
LiveAccounting struct {
|
||||
Service live.Service
|
||||
Cache accounting.Cache
|
||||
}
|
||||
|
||||
Mail struct {
|
||||
@ -370,9 +370,17 @@ func (planet *Planet) newSatellites(count int) ([]*SatelliteSystem, error) {
|
||||
if err != nil {
|
||||
return xs, errs.Wrap(err)
|
||||
}
|
||||
|
||||
planet.databases = append(planet.databases, revocationDB)
|
||||
|
||||
peer, err := satellite.New(log, identity, db, pointerDB, revocationDB, versionInfo, &config)
|
||||
liveAccountingCache, err := live.NewCache(log.Named("live-accounting"), config.LiveAccounting)
|
||||
if err != nil {
|
||||
return xs, errs.Wrap(err)
|
||||
}
|
||||
|
||||
planet.databases = append(planet.databases, liveAccountingCache)
|
||||
|
||||
peer, err := satellite.New(log, identity, db, pointerDB, revocationDB, liveAccountingCache, versionInfo, &config)
|
||||
if err != nil {
|
||||
return xs, err
|
||||
}
|
||||
@ -465,5 +473,11 @@ func (planet *Planet) newAPI(count int, identity *identity.FullIdentity, db sate
|
||||
}
|
||||
planet.databases = append(planet.databases, revocationDB)
|
||||
|
||||
return satellite.NewAPI(log, identity, db, pointerDB, revocationDB, &config, versionInfo)
|
||||
liveAccounting, err := live.NewCache(log.Named("live-accounting"), config.LiveAccounting)
|
||||
if err != nil {
|
||||
return nil, errs.Wrap(err)
|
||||
}
|
||||
planet.databases = append(planet.databases, liveAccounting)
|
||||
|
||||
return satellite.NewAPI(log, identity, db, pointerDB, revocationDB, liveAccounting, &config, versionInfo)
|
||||
}
|
||||
|
@ -94,3 +94,13 @@ type ProjectAccounting interface {
|
||||
// GetProjectUsageLimits returns project usage limit
|
||||
GetProjectUsageLimits(ctx context.Context, projectID uuid.UUID) (memory.Size, error)
|
||||
}
|
||||
|
||||
// Cache stores live information about project storage which has not yet been synced to ProjectAccounting.
|
||||
//
|
||||
// architecture: Database
|
||||
type Cache interface {
|
||||
GetProjectStorageUsage(ctx context.Context, projectID uuid.UUID) (totalUsed int64, err error)
|
||||
AddProjectStorageUsage(ctx context.Context, projectID uuid.UUID, inlineSpaceUsed, remoteSpaceUsed int64) error
|
||||
ResetTotals(ctx context.Context) error
|
||||
Close() error
|
||||
}
|
||||
|
45
satellite/accounting/live/cache.go
Normal file
45
satellite/accounting/live/cache.go
Normal file
@ -0,0 +1,45 @@
|
||||
// Copyright (C) 2019 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package live
|
||||
|
||||
import (
|
||||
"strings"
|
||||
|
||||
"github.com/zeebo/errs"
|
||||
"go.uber.org/zap"
|
||||
"gopkg.in/spacemonkeygo/monkit.v2"
|
||||
|
||||
"storj.io/storj/satellite/accounting"
|
||||
)
|
||||
|
||||
var (
|
||||
// Error is the default error class for live-accounting
|
||||
Error = errs.Class("live-accounting")
|
||||
mon = monkit.Package()
|
||||
)
|
||||
|
||||
// Config contains configurable values for the live accounting service.
|
||||
type Config struct {
|
||||
StorageBackend string `help:"what to use for storing real-time accounting data" default:"memory"`
|
||||
}
|
||||
|
||||
// NewCache creates a new accounting.Cache instance using the type specified backend in
|
||||
// the provided config.
|
||||
func NewCache(log *zap.Logger, config Config) (accounting.Cache, error) {
|
||||
parts := strings.SplitN(config.StorageBackend, ":", 2)
|
||||
var backendType string
|
||||
if len(parts) == 0 || parts[0] == "" {
|
||||
backendType = "memory"
|
||||
} else {
|
||||
backendType = parts[0]
|
||||
}
|
||||
switch backendType {
|
||||
case "memory":
|
||||
return newMemoryLiveAccounting(log)
|
||||
case "redis":
|
||||
return newRedisLiveAccounting(log, config.StorageBackend)
|
||||
default:
|
||||
return nil, Error.New("unrecognized live accounting backend specifier %q", backendType)
|
||||
}
|
||||
}
|
@ -1,103 +0,0 @@
|
||||
// Copyright (C) 2019 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package live
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/skyrings/skyring-common/tools/uuid"
|
||||
"github.com/zeebo/errs"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// Config contains configurable values for the live accounting service.
|
||||
type Config struct {
|
||||
StorageBackend string `help:"what to use for storing real-time accounting data" default:"plainmemory"`
|
||||
}
|
||||
|
||||
// Service represents the external interface to the live accounting
|
||||
// functionality.
|
||||
//
|
||||
// architecture: Service
|
||||
type Service interface {
|
||||
GetProjectStorageUsage(ctx context.Context, projectID uuid.UUID) (int64, int64, error)
|
||||
AddProjectStorageUsage(ctx context.Context, projectID uuid.UUID, inlineSpaceUsed, remoteSpaceUsed int64) error
|
||||
ResetTotals()
|
||||
}
|
||||
|
||||
// New creates a new live.Service instance of the type specified in
|
||||
// the provided config.
|
||||
func New(log *zap.Logger, config Config) (Service, error) {
|
||||
parts := strings.SplitN(config.StorageBackend, ":", 2)
|
||||
var backendType string
|
||||
if len(parts) == 0 || parts[0] == "" {
|
||||
backendType = "plainmemory"
|
||||
} else {
|
||||
backendType = parts[0]
|
||||
}
|
||||
if backendType == "plainmemory" {
|
||||
return newPlainMemoryLiveAccounting(log)
|
||||
}
|
||||
return nil, errs.New("unrecognized live accounting backend specifier %q", backendType)
|
||||
}
|
||||
|
||||
// plainMemoryLiveAccounting represents an live.Service-implementing
|
||||
// instance using plain memory (no coordination with other servers). It can be
|
||||
// used to coordinate tracking of how much space a project has used.
|
||||
//
|
||||
// This should probably only be used at small scale or for testing areas where
|
||||
// the accounting cache does not matter significantly. For production, an
|
||||
// implementation that allows multiple servers to participate together would
|
||||
// be preferable.
|
||||
type plainMemoryLiveAccounting struct {
|
||||
log *zap.Logger
|
||||
|
||||
spaceMapLock sync.RWMutex
|
||||
spaceDeltas map[uuid.UUID]spaceUsedAccounting
|
||||
}
|
||||
|
||||
type spaceUsedAccounting struct {
|
||||
inlineSpace int64
|
||||
remoteSpace int64
|
||||
}
|
||||
|
||||
func newPlainMemoryLiveAccounting(log *zap.Logger) (*plainMemoryLiveAccounting, error) {
|
||||
pmac := &plainMemoryLiveAccounting{log: log}
|
||||
pmac.ResetTotals()
|
||||
return pmac, nil
|
||||
}
|
||||
|
||||
// GetProjectStorageUsage gets inline and remote storage totals for a given
|
||||
// project, back to the time of the last accounting tally.
|
||||
func (pmac *plainMemoryLiveAccounting) GetProjectStorageUsage(ctx context.Context, projectID uuid.UUID) (inlineTotal, remoteTotal int64, err error) {
|
||||
pmac.spaceMapLock.Lock()
|
||||
defer pmac.spaceMapLock.Unlock()
|
||||
curVal := pmac.spaceDeltas[projectID]
|
||||
return curVal.inlineSpace, curVal.remoteSpace, nil
|
||||
}
|
||||
|
||||
// AddProjectStorageUsage lets the live accounting know that the given
|
||||
// project has just added inlineSpaceUsed bytes of inline space usage
|
||||
// and remoteSpaceUsed bytes of remote space usage.
|
||||
func (pmac *plainMemoryLiveAccounting) AddProjectStorageUsage(ctx context.Context, projectID uuid.UUID, inlineSpaceUsed, remoteSpaceUsed int64) error {
|
||||
pmac.spaceMapLock.Lock()
|
||||
defer pmac.spaceMapLock.Unlock()
|
||||
curVal := pmac.spaceDeltas[projectID]
|
||||
curVal.inlineSpace += inlineSpaceUsed
|
||||
curVal.remoteSpace += remoteSpaceUsed
|
||||
pmac.spaceDeltas[projectID] = curVal
|
||||
return nil
|
||||
}
|
||||
|
||||
// ResetTotals reset all space-used totals for all projects back to zero. This
|
||||
// would normally be done in concert with calculating new tally counts in the
|
||||
// accountingDB.
|
||||
func (pmac *plainMemoryLiveAccounting) ResetTotals() {
|
||||
pmac.log.Info("Resetting real-time accounting data")
|
||||
pmac.spaceMapLock.Lock()
|
||||
pmac.spaceDeltas = make(map[uuid.UUID]spaceUsedAccounting)
|
||||
pmac.spaceMapLock.Unlock()
|
||||
}
|
@ -1,10 +1,11 @@
|
||||
// Copyright (C) 2019 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package live
|
||||
package live_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"testing"
|
||||
|
||||
@ -14,35 +15,168 @@ import (
|
||||
"go.uber.org/zap/zaptest"
|
||||
"golang.org/x/sync/errgroup"
|
||||
|
||||
"storj.io/storj/internal/testcontext"
|
||||
"storj.io/storj/internal/testrand"
|
||||
"storj.io/storj/satellite/accounting"
|
||||
"storj.io/storj/satellite/accounting/live"
|
||||
"storj.io/storj/storage/redis/redisserver"
|
||||
)
|
||||
|
||||
func TestPlainMemoryLiveAccounting(t *testing.T) {
|
||||
func TestLiveAccountingCache(t *testing.T) {
|
||||
tests := []struct {
|
||||
backend string
|
||||
}{
|
||||
{
|
||||
backend: "memory",
|
||||
},
|
||||
{
|
||||
backend: "redis",
|
||||
},
|
||||
}
|
||||
ctx := testcontext.New(t)
|
||||
defer ctx.Cleanup()
|
||||
|
||||
address, cleanup, err := redisserver.Start()
|
||||
require.NoError(t, err)
|
||||
defer cleanup()
|
||||
|
||||
for _, tt := range tests {
|
||||
var config live.Config
|
||||
if tt.backend == "redis" {
|
||||
config = live.Config{
|
||||
StorageBackend: "redis://" + address + "?db=0",
|
||||
}
|
||||
}
|
||||
|
||||
cache, err := live.NewCache(zaptest.NewLogger(t).Named("live-accounting"), config)
|
||||
require.NoError(t, err)
|
||||
|
||||
projectIDs, sum, err := populateCache(ctx, cache)
|
||||
require.NoError(t, err)
|
||||
|
||||
// make sure all of the "projects" got all space updates and got right totals
|
||||
for _, projID := range projectIDs {
|
||||
spaceUsed, err := cache.GetProjectStorageUsage(ctx, projID)
|
||||
require.NoError(t, err)
|
||||
assert.Equalf(t, sum, spaceUsed, "projectID %v", projID)
|
||||
}
|
||||
|
||||
err = cache.ResetTotals(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
for _, projID := range projectIDs {
|
||||
spaceUsed, err := cache.GetProjectStorageUsage(ctx, projID)
|
||||
require.NoError(t, err)
|
||||
assert.EqualValues(t, 0, spaceUsed)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestRedisCacheConcurrency(t *testing.T) {
|
||||
ctx := testcontext.New(t)
|
||||
defer ctx.Cleanup()
|
||||
|
||||
address, cleanup, err := redisserver.Start()
|
||||
require.NoError(t, err)
|
||||
defer cleanup()
|
||||
|
||||
config := live.Config{
|
||||
StorageBackend: "redis://" + address + "?db=0",
|
||||
}
|
||||
cache, err := live.NewCache(zaptest.NewLogger(t).Named("live-accounting"), config)
|
||||
require.NoError(t, err)
|
||||
|
||||
projectID := testrand.UUID()
|
||||
|
||||
const (
|
||||
numConcurrent = 100
|
||||
inlineAmount = 10
|
||||
remoteAmount = 10
|
||||
)
|
||||
expectedSum := (inlineAmount * numConcurrent) + (remoteAmount * numConcurrent)
|
||||
|
||||
var group errgroup.Group
|
||||
for i := 0; i < numConcurrent; i++ {
|
||||
group.Go(func() error {
|
||||
return cache.AddProjectStorageUsage(ctx, projectID, inlineAmount, remoteAmount)
|
||||
})
|
||||
}
|
||||
require.NoError(t, group.Wait())
|
||||
|
||||
spaceUsed, err := cache.GetProjectStorageUsage(ctx, projectID)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.EqualValues(t, expectedSum, spaceUsed)
|
||||
}
|
||||
|
||||
func TestPlainMemoryCacheConcurrency(t *testing.T) {
|
||||
ctx := testcontext.New(t)
|
||||
defer ctx.Cleanup()
|
||||
|
||||
config := live.Config{
|
||||
StorageBackend: "memory",
|
||||
}
|
||||
cache, err := live.NewCache(zaptest.NewLogger(t).Named("live-accounting"), config)
|
||||
require.NoError(t, err)
|
||||
|
||||
projectID := testrand.UUID()
|
||||
|
||||
const (
|
||||
numConcurrent = 100
|
||||
inlineAmount = 10
|
||||
remoteAmount = 10
|
||||
)
|
||||
expectedSum := (inlineAmount * numConcurrent) + (remoteAmount * numConcurrent)
|
||||
|
||||
var group errgroup.Group
|
||||
for i := 0; i < numConcurrent; i++ {
|
||||
group.Go(func() error {
|
||||
return cache.AddProjectStorageUsage(ctx, projectID, inlineAmount, remoteAmount)
|
||||
})
|
||||
}
|
||||
require.NoError(t, group.Wait())
|
||||
|
||||
spaceUsed, err := cache.GetProjectStorageUsage(ctx, projectID)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.EqualValues(t, expectedSum, spaceUsed)
|
||||
}
|
||||
|
||||
func TestNegativeSpaceUsed(t *testing.T) {
|
||||
ctx := testcontext.New(t)
|
||||
defer ctx.Cleanup()
|
||||
|
||||
config := live.Config{
|
||||
StorageBackend: "memory:",
|
||||
}
|
||||
cache, err := live.NewCache(zaptest.NewLogger(t).Named("live-accounting"), config)
|
||||
require.NoError(t, err)
|
||||
|
||||
projectID := testrand.UUID()
|
||||
inline := int64(-10)
|
||||
remote := int64(-20)
|
||||
|
||||
expectedError := fmt.Sprintf("live-accounting: Used space amounts must be greater than 0. Inline: %d, Remote: %d", inline, remote)
|
||||
|
||||
err = cache.AddProjectStorageUsage(ctx, projectID, inline, remote)
|
||||
require.EqualError(t, err, expectedError)
|
||||
}
|
||||
|
||||
func populateCache(ctx context.Context, cache accounting.Cache) (projectIDs []uuid.UUID, sum int64, _ error) {
|
||||
const (
|
||||
valuesListSize = 1000
|
||||
valueMultiplier = 4096
|
||||
numProjects = 200
|
||||
)
|
||||
config := Config{
|
||||
StorageBackend: "plainmemory:",
|
||||
}
|
||||
service, err := New(zaptest.NewLogger(t).Named("live-accounting"), config)
|
||||
require.NoError(t, err)
|
||||
|
||||
// ensure we are using the expected underlying type
|
||||
_, ok := service.(*plainMemoryLiveAccounting)
|
||||
require.True(t, ok)
|
||||
|
||||
// make a largish list of varying values
|
||||
someValues := make([]int64, valuesListSize)
|
||||
sum := int64(0)
|
||||
for i := range someValues {
|
||||
someValues[i] = int64((i + 1) * valueMultiplier)
|
||||
sum += someValues[i]
|
||||
sum += someValues[i] * 2
|
||||
}
|
||||
|
||||
// make up some project IDs
|
||||
projectIDs := make([]uuid.UUID, numProjects)
|
||||
projectIDs = make([]uuid.UUID, numProjects)
|
||||
for i := range projectIDs {
|
||||
projectIDs[i] = testrand.UUID()
|
||||
}
|
||||
@ -61,38 +195,13 @@ func TestPlainMemoryLiveAccounting(t *testing.T) {
|
||||
})
|
||||
|
||||
for _, val := range myValues {
|
||||
if err := service.AddProjectStorageUsage(ctx, projID, val, val); err != nil {
|
||||
if err := cache.AddProjectStorageUsage(ctx, projID, val, val); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
require.NoError(t, errg.Wait())
|
||||
|
||||
// make sure all of the "projects" got all space updates and got right totals
|
||||
for _, projID := range projectIDs {
|
||||
inlineUsed, remoteUsed, err := service.GetProjectStorageUsage(ctx, projID)
|
||||
require.NoError(t, err)
|
||||
assert.Equalf(t, sum, inlineUsed, "projectID %v", projID)
|
||||
assert.Equalf(t, sum, remoteUsed, "projectID %v", projID)
|
||||
}
|
||||
}
|
||||
|
||||
func TestResetTotals(t *testing.T) {
|
||||
config := Config{
|
||||
StorageBackend: "plainmemory:",
|
||||
}
|
||||
service, err := New(zaptest.NewLogger(t).Named("live-accounting"), config)
|
||||
require.NoError(t, err)
|
||||
|
||||
// ensure we are using the expected underlying type
|
||||
_, ok := service.(*plainMemoryLiveAccounting)
|
||||
require.True(t, ok)
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
projectID := testrand.UUID()
|
||||
err = service.AddProjectStorageUsage(ctx, projectID, 0, -20)
|
||||
require.NoError(t, err)
|
||||
return projectIDs, sum, errg.Wait()
|
||||
}
|
||||
|
77
satellite/accounting/live/memory.go
Normal file
77
satellite/accounting/live/memory.go
Normal file
@ -0,0 +1,77 @@
|
||||
// Copyright (C) 2019 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package live
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
"github.com/skyrings/skyring-common/tools/uuid"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// memoryLiveAccounting represents an accounting.Cache-implementing
|
||||
// instance using plain memory (no coordination with other servers). It can be
|
||||
// used to coordinate tracking of how much space a project has used.
|
||||
//
|
||||
// This should probably only be used at small scale or for testing areas where
|
||||
// the accounting cache does not matter significantly. For production, an
|
||||
// implementation that allows multiple servers to participate together would
|
||||
// be preferable.
|
||||
type memoryLiveAccounting struct {
|
||||
log *zap.Logger
|
||||
|
||||
spaceMapLock sync.RWMutex
|
||||
spaceDeltas map[uuid.UUID]int64
|
||||
}
|
||||
|
||||
func newMemoryLiveAccounting(log *zap.Logger) (*memoryLiveAccounting, error) {
|
||||
pmac := &memoryLiveAccounting{log: log}
|
||||
pmac.spaceDeltas = make(map[uuid.UUID]int64, 0)
|
||||
return pmac, nil
|
||||
}
|
||||
|
||||
// GetProjectStorageUsage gets inline and remote storage totals for a given
|
||||
// project, back to the time of the last accounting tally.
|
||||
func (mac *memoryLiveAccounting) GetProjectStorageUsage(ctx context.Context, projectID uuid.UUID) (totalUsed int64, err error) {
|
||||
defer mon.Task()(&ctx, projectID)(&err)
|
||||
mac.spaceMapLock.Lock()
|
||||
defer mac.spaceMapLock.Unlock()
|
||||
curVal, ok := mac.spaceDeltas[projectID]
|
||||
if !ok {
|
||||
return 0, nil
|
||||
}
|
||||
return curVal, nil
|
||||
}
|
||||
|
||||
// AddProjectStorageUsage lets the live accounting know that the given
|
||||
// project has just added inlineSpaceUsed bytes of inline space usage
|
||||
// and remoteSpaceUsed bytes of remote space usage.
|
||||
func (mac *memoryLiveAccounting) AddProjectStorageUsage(ctx context.Context, projectID uuid.UUID, inlineSpaceUsed, remoteSpaceUsed int64) (err error) {
|
||||
defer mon.Task()(&ctx, projectID, inlineSpaceUsed, remoteSpaceUsed)(&err)
|
||||
if inlineSpaceUsed < 0 || remoteSpaceUsed < 0 {
|
||||
return Error.New("Used space amounts must be greater than 0. Inline: %d, Remote: %d", inlineSpaceUsed, remoteSpaceUsed)
|
||||
}
|
||||
mac.spaceMapLock.Lock()
|
||||
defer mac.spaceMapLock.Unlock()
|
||||
curVal := mac.spaceDeltas[projectID]
|
||||
newTotal := curVal + inlineSpaceUsed + remoteSpaceUsed
|
||||
mac.spaceDeltas[projectID] = newTotal
|
||||
return nil
|
||||
}
|
||||
|
||||
// ResetTotals reset all space-used totals for all projects back to zero. This
|
||||
// would normally be done in concert with calculating new tally counts in the
|
||||
// accountingDB.
|
||||
func (mac *memoryLiveAccounting) ResetTotals(ctx context.Context) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
mac.log.Debug("Resetting real-time accounting data")
|
||||
mac.spaceMapLock.Lock()
|
||||
mac.spaceDeltas = make(map[uuid.UUID]int64)
|
||||
mac.spaceMapLock.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close matches the accounting.LiveAccounting interface.
|
||||
func (mac *memoryLiveAccounting) Close() error { return nil }
|
72
satellite/accounting/live/redis.go
Normal file
72
satellite/accounting/live/redis.go
Normal file
@ -0,0 +1,72 @@
|
||||
// Copyright (C) 2019 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package live
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strconv"
|
||||
|
||||
"github.com/skyrings/skyring-common/tools/uuid"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"storj.io/storj/storage"
|
||||
"storj.io/storj/storage/redis"
|
||||
)
|
||||
|
||||
type redisLiveAccounting struct {
|
||||
log *zap.Logger
|
||||
|
||||
client *redis.Client
|
||||
}
|
||||
|
||||
func newRedisLiveAccounting(log *zap.Logger, address string) (*redisLiveAccounting, error) {
|
||||
client, err := redis.NewClientFrom(address)
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
return &redisLiveAccounting{
|
||||
log: log,
|
||||
client: client,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// GetProjectStorageUsage gets inline and remote storage totals for a given
|
||||
// project, back to the time of the last accounting tally.
|
||||
func (cache *redisLiveAccounting) GetProjectStorageUsage(ctx context.Context, projectID uuid.UUID) (totalUsed int64, err error) {
|
||||
defer mon.Task()(&ctx, projectID)(&err)
|
||||
val, err := cache.client.Get(ctx, []byte(projectID.String()))
|
||||
if err != nil {
|
||||
if storage.ErrKeyNotFound.Has(err) {
|
||||
return 0, nil
|
||||
}
|
||||
return 0, Error.Wrap(err)
|
||||
}
|
||||
intval, err := strconv.Atoi(string(val))
|
||||
return int64(intval), Error.Wrap(err)
|
||||
}
|
||||
|
||||
// AddProjectStorageUsage lets the live accounting know that the given
|
||||
// project has just added inlineSpaceUsed bytes of inline space usage
|
||||
// and remoteSpaceUsed bytes of remote space usage.
|
||||
func (cache *redisLiveAccounting) AddProjectStorageUsage(ctx context.Context, projectID uuid.UUID, inlineSpaceUsed, remoteSpaceUsed int64) (err error) {
|
||||
defer mon.Task()(&ctx, projectID, inlineSpaceUsed, remoteSpaceUsed)(&err)
|
||||
if inlineSpaceUsed < 0 || remoteSpaceUsed < 0 {
|
||||
return Error.New("Used space amounts must be greater than 0. Inline: %d, Remote: %d", inlineSpaceUsed, remoteSpaceUsed)
|
||||
}
|
||||
return cache.client.IncrBy(ctx, []byte(projectID.String()), inlineSpaceUsed+remoteSpaceUsed)
|
||||
}
|
||||
|
||||
// ResetTotals reset all space-used totals for all projects back to zero. This
|
||||
// would normally be done in concert with calculating new tally counts in the
|
||||
// accountingDB.
|
||||
func (cache *redisLiveAccounting) ResetTotals(ctx context.Context) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
cache.log.Debug("Resetting real-time accounting data")
|
||||
return cache.client.FlushDB()
|
||||
}
|
||||
|
||||
// Close the DB connection.
|
||||
func (cache *redisLiveAccounting) Close() error {
|
||||
return cache.client.Close()
|
||||
}
|
@ -13,7 +13,6 @@ import (
|
||||
monkit "gopkg.in/spacemonkeygo/monkit.v2"
|
||||
|
||||
"storj.io/storj/internal/memory"
|
||||
"storj.io/storj/satellite/accounting/live"
|
||||
)
|
||||
|
||||
var mon = monkit.Package()
|
||||
@ -36,12 +35,12 @@ var (
|
||||
// architecture: Service
|
||||
type ProjectUsage struct {
|
||||
projectAccountingDB ProjectAccounting
|
||||
liveAccounting live.Service
|
||||
liveAccounting Cache
|
||||
maxAlphaUsage memory.Size
|
||||
}
|
||||
|
||||
// NewProjectUsage created new instance of project usage service
|
||||
func NewProjectUsage(projectAccountingDB ProjectAccounting, liveAccounting live.Service, maxAlphaUsage memory.Size) *ProjectUsage {
|
||||
func NewProjectUsage(projectAccountingDB ProjectAccounting, liveAccounting Cache, maxAlphaUsage memory.Size) *ProjectUsage {
|
||||
return &ProjectUsage{
|
||||
projectAccountingDB: projectAccountingDB,
|
||||
liveAccounting: liveAccounting,
|
||||
@ -96,7 +95,7 @@ func (usage *ProjectUsage) ExceedsStorageUsage(ctx context.Context, projectID uu
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
var group errgroup.Group
|
||||
var inlineTotal, remoteTotal int64
|
||||
var totalUsed int64
|
||||
limit = usage.maxAlphaUsage
|
||||
|
||||
// TODO(michal): to reduce db load, consider using a cache to retrieve the project.UsageLimit value if needed
|
||||
@ -109,7 +108,7 @@ func (usage *ProjectUsage) ExceedsStorageUsage(ctx context.Context, projectID uu
|
||||
})
|
||||
group.Go(func() error {
|
||||
var err error
|
||||
inlineTotal, remoteTotal, err = usage.getProjectStorageTotals(ctx, projectID)
|
||||
totalUsed, err = usage.getProjectStorageTotals(ctx, projectID)
|
||||
return err
|
||||
})
|
||||
err = group.Wait()
|
||||
@ -118,25 +117,25 @@ func (usage *ProjectUsage) ExceedsStorageUsage(ctx context.Context, projectID uu
|
||||
}
|
||||
|
||||
maxUsage := limit.Int64() * int64(ExpansionFactor)
|
||||
if inlineTotal+remoteTotal >= maxUsage {
|
||||
if totalUsed >= maxUsage {
|
||||
return true, limit, nil
|
||||
}
|
||||
|
||||
return false, limit, nil
|
||||
}
|
||||
|
||||
func (usage *ProjectUsage) getProjectStorageTotals(ctx context.Context, projectID uuid.UUID) (inline int64, remote int64, err error) {
|
||||
func (usage *ProjectUsage) getProjectStorageTotals(ctx context.Context, projectID uuid.UUID) (total int64, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
lastCountInline, lastCountRemote, err := usage.projectAccountingDB.GetStorageTotals(ctx, projectID)
|
||||
if err != nil {
|
||||
return 0, 0, err
|
||||
return 0, err
|
||||
}
|
||||
rtInline, rtRemote, err := usage.liveAccounting.GetProjectStorageUsage(ctx, projectID)
|
||||
cachedTotal, err := usage.liveAccounting.GetProjectStorageUsage(ctx, projectID)
|
||||
if err != nil {
|
||||
return 0, 0, err
|
||||
return 0, err
|
||||
}
|
||||
return lastCountInline + rtInline, lastCountRemote + rtRemote, nil
|
||||
return lastCountInline + lastCountRemote + cachedTotal, nil
|
||||
}
|
||||
|
||||
// AddProjectStorageUsage lets the live accounting know that the given
|
||||
|
@ -15,7 +15,6 @@ import (
|
||||
"storj.io/storj/pkg/pb"
|
||||
"storj.io/storj/pkg/storj"
|
||||
"storj.io/storj/satellite/accounting"
|
||||
"storj.io/storj/satellite/accounting/live"
|
||||
"storj.io/storj/satellite/metainfo"
|
||||
)
|
||||
|
||||
@ -38,13 +37,13 @@ type Service struct {
|
||||
Loop sync2.Cycle
|
||||
|
||||
metainfoLoop *metainfo.Loop
|
||||
liveAccounting live.Service
|
||||
liveAccounting accounting.Cache
|
||||
storagenodeAccountingDB accounting.StoragenodeAccounting
|
||||
projectAccountingDB accounting.ProjectAccounting
|
||||
}
|
||||
|
||||
// New creates a new tally Service
|
||||
func New(log *zap.Logger, sdb accounting.StoragenodeAccounting, pdb accounting.ProjectAccounting, liveAccounting live.Service, metainfoLoop *metainfo.Loop, interval time.Duration) *Service {
|
||||
func New(log *zap.Logger, sdb accounting.StoragenodeAccounting, pdb accounting.ProjectAccounting, liveAccounting accounting.Cache, metainfoLoop *metainfo.Loop, interval time.Duration) *Service {
|
||||
return &Service{
|
||||
log: log,
|
||||
Loop: *sync2.NewCycle(interval),
|
||||
@ -87,7 +86,10 @@ func (service *Service) Tally(ctx context.Context) (err error) {
|
||||
// double-counted (counted in the tally and also counted as a delta to
|
||||
// the tally). If that happens, it will be fixed at the time of the next
|
||||
// tally run.
|
||||
service.liveAccounting.ResetTotals()
|
||||
err = service.liveAccounting.ResetTotals(ctx)
|
||||
if err != nil {
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
|
||||
// Fetch when the last tally happened so we can roughly calculate the byte-hours.
|
||||
lastTime, err := service.storagenodeAccountingDB.LastTimestamp(ctx, accounting.LastAtRestTally)
|
||||
|
@ -27,7 +27,6 @@ import (
|
||||
"storj.io/storj/pkg/signing"
|
||||
"storj.io/storj/pkg/storj"
|
||||
"storj.io/storj/satellite/accounting"
|
||||
"storj.io/storj/satellite/accounting/live"
|
||||
"storj.io/storj/satellite/console"
|
||||
"storj.io/storj/satellite/console/consoleauth"
|
||||
"storj.io/storj/satellite/console/consoleweb"
|
||||
@ -96,7 +95,7 @@ type API struct {
|
||||
}
|
||||
|
||||
LiveAccounting struct {
|
||||
Service live.Service
|
||||
Cache accounting.Cache
|
||||
}
|
||||
|
||||
Mail struct {
|
||||
@ -124,7 +123,7 @@ type API struct {
|
||||
}
|
||||
|
||||
// NewAPI creates a new satellite API process
|
||||
func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB, pointerDB metainfo.PointerDB, revocationDB extensions.RevocationDB, config *Config, versionInfo version.Info) (*API, error) {
|
||||
func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB, pointerDB metainfo.PointerDB, revocationDB extensions.RevocationDB, liveAccounting accounting.Cache, config *Config, versionInfo version.Info) (*API, error) {
|
||||
peer := &API{
|
||||
Log: log,
|
||||
Identity: full,
|
||||
@ -208,19 +207,14 @@ func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB, pointerDB metai
|
||||
|
||||
{ // setup live accounting
|
||||
log.Debug("Satellite API Process setting up live accounting")
|
||||
config := config.LiveAccounting
|
||||
liveAccountingService, err := live.New(peer.Log.Named("live-accounting"), config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
peer.LiveAccounting.Service = liveAccountingService
|
||||
peer.LiveAccounting.Cache = liveAccounting
|
||||
}
|
||||
|
||||
{ // setup accounting project usage
|
||||
log.Debug("Satellite API Process setting up accounting project usage")
|
||||
peer.Accounting.ProjectUsage = accounting.NewProjectUsage(
|
||||
peer.DB.ProjectAccounting(),
|
||||
peer.LiveAccounting.Service,
|
||||
peer.LiveAccounting.Cache,
|
||||
config.Rollup.MaxAlphaUsage,
|
||||
)
|
||||
}
|
||||
|
@ -207,7 +207,7 @@ type Peer struct {
|
||||
}
|
||||
|
||||
LiveAccounting struct {
|
||||
Service live.Service
|
||||
Cache accounting.Cache
|
||||
}
|
||||
|
||||
Mail struct {
|
||||
@ -240,7 +240,7 @@ type Peer struct {
|
||||
}
|
||||
|
||||
// New creates a new satellite
|
||||
func New(log *zap.Logger, full *identity.FullIdentity, db DB, pointerDB metainfo.PointerDB, revocationDB extensions.RevocationDB, versionInfo version.Info, config *Config) (*Peer, error) {
|
||||
func New(log *zap.Logger, full *identity.FullIdentity, db DB, pointerDB metainfo.PointerDB, revocationDB extensions.RevocationDB, liveAccounting accounting.Cache, versionInfo version.Info, config *Config) (*Peer, error) {
|
||||
peer := &Peer{
|
||||
Log: log,
|
||||
Identity: full,
|
||||
@ -326,19 +326,14 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, pointerDB metainfo
|
||||
|
||||
{ // setup live accounting
|
||||
log.Debug("Setting up live accounting")
|
||||
config := config.LiveAccounting
|
||||
liveAccountingService, err := live.New(peer.Log.Named("live-accounting"), config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
peer.LiveAccounting.Service = liveAccountingService
|
||||
peer.LiveAccounting.Cache = liveAccounting
|
||||
}
|
||||
|
||||
{ // setup accounting project usage
|
||||
log.Debug("Setting up accounting project usage")
|
||||
peer.Accounting.ProjectUsage = accounting.NewProjectUsage(
|
||||
peer.DB.ProjectAccounting(),
|
||||
peer.LiveAccounting.Service,
|
||||
peer.LiveAccounting.Cache,
|
||||
config.Rollup.MaxAlphaUsage,
|
||||
)
|
||||
}
|
||||
@ -493,7 +488,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, pointerDB metainfo
|
||||
|
||||
{ // setup accounting
|
||||
log.Debug("Setting up accounting")
|
||||
peer.Accounting.Tally = tally.New(peer.Log.Named("tally"), peer.DB.StoragenodeAccounting(), peer.DB.ProjectAccounting(), peer.LiveAccounting.Service, peer.Metainfo.Loop, config.Tally.Interval)
|
||||
peer.Accounting.Tally = tally.New(peer.Log.Named("tally"), peer.DB.StoragenodeAccounting(), peer.DB.ProjectAccounting(), peer.LiveAccounting.Cache, peer.Metainfo.Loop, config.Tally.Interval)
|
||||
peer.Accounting.Rollup = rollup.New(peer.Log.Named("rollup"), peer.DB.StoragenodeAccounting(), config.Rollup.Interval, config.Rollup.DeleteTallies)
|
||||
}
|
||||
|
||||
|
2
scripts/testdata/satellite-config.yaml.lock
vendored
2
scripts/testdata/satellite-config.yaml.lock
vendored
@ -128,7 +128,7 @@ identity.cert-path: /root/.local/share/storj/identity/satellite/identity.cert
|
||||
identity.key-path: /root/.local/share/storj/identity/satellite/identity.key
|
||||
|
||||
# what to use for storing real-time accounting data
|
||||
# live-accounting.storage-backend: plainmemory
|
||||
# live-accounting.storage-backend: memory
|
||||
|
||||
# if true, log function filename and line number
|
||||
# log.caller: false
|
||||
|
@ -93,6 +93,16 @@ func (client *Client) Put(ctx context.Context, key storage.Key, value storage.Va
|
||||
return put(ctx, client.db, key, value, client.TTL)
|
||||
}
|
||||
|
||||
// IncrBy increments the value stored in key by the specified value.
|
||||
func (client *Client) IncrBy(ctx context.Context, key storage.Key, value int64) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
if key.IsZero() {
|
||||
return storage.ErrEmptyKey.New("")
|
||||
}
|
||||
_, err = client.db.IncrBy(key.String(), value).Result()
|
||||
return err
|
||||
}
|
||||
|
||||
// List returns either a list of keys for which boltdb has values or an error.
|
||||
func (client *Client) List(ctx context.Context, first storage.Key, limit int) (_ storage.Keys, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
Loading…
Reference in New Issue
Block a user