From 76ad83f12c61a50e711b9a042c839ec480bf0e57 Mon Sep 17 00:00:00 2001 From: Cameron Date: Wed, 16 Oct 2019 12:50:29 -0400 Subject: [PATCH] satellite/accounting: add redis support to live accounting (#3213) * set up redis support in live accounting * move live.Service interface into accounting package and rename to Cache, pass into satellite * refactor Cache to store one int64 total, add IncrBy method to redis client implementation * add monkit tracing to live accounting --- cmd/satellite/main.go | 11 +- internal/testplanet/satellite.go | 20 +- satellite/accounting/db.go | 10 + satellite/accounting/live/cache.go | 45 +++++ satellite/accounting/live/live.go | 103 ----------- satellite/accounting/live/live_test.go | 193 +++++++++++++++----- satellite/accounting/live/memory.go | 77 ++++++++ satellite/accounting/live/redis.go | 72 ++++++++ satellite/accounting/projectusage.go | 21 +-- satellite/accounting/tally/tally.go | 10 +- satellite/api.go | 14 +- satellite/peer.go | 15 +- scripts/testdata/satellite-config.yaml.lock | 2 +- storage/redis/client.go | 10 + 14 files changed, 418 insertions(+), 185 deletions(-) create mode 100644 satellite/accounting/live/cache.go delete mode 100644 satellite/accounting/live/live.go create mode 100644 satellite/accounting/live/memory.go create mode 100644 satellite/accounting/live/redis.go diff --git a/cmd/satellite/main.go b/cmd/satellite/main.go index 467615723..d19ab2d45 100644 --- a/cmd/satellite/main.go +++ b/cmd/satellite/main.go @@ -23,6 +23,7 @@ import ( "storj.io/storj/pkg/process" "storj.io/storj/pkg/revocation" "storj.io/storj/satellite" + "storj.io/storj/satellite/accounting/live" "storj.io/storj/satellite/metainfo" "storj.io/storj/satellite/satellitedb" ) @@ -147,7 +148,15 @@ func cmdRun(cmd *cobra.Command, args []string) (err error) { err = errs.Combine(err, revocationDB.Close()) }() - peer, err := satellite.New(log, identity, db, pointerDB, revocationDB, version.Build, &runCfg.Config) + liveAccounting, err := live.NewCache(log.Named("live-accounting"), runCfg.LiveAccounting) + if err != nil { + return errs.New("Error creating live accounting cache: %+v", err) + } + defer func() { + err = errs.Combine(err, liveAccounting.Close()) + }() + + peer, err := satellite.New(log, identity, db, pointerDB, revocationDB, liveAccounting, version.Build, &runCfg.Config) if err != nil { return err } diff --git a/internal/testplanet/satellite.go b/internal/testplanet/satellite.go index 34e0a51ef..98b384d61 100644 --- a/internal/testplanet/satellite.go +++ b/internal/testplanet/satellite.go @@ -121,7 +121,7 @@ type SatelliteSystem struct { } LiveAccounting struct { - Service live.Service + Cache accounting.Cache } Mail struct { @@ -370,9 +370,17 @@ func (planet *Planet) newSatellites(count int) ([]*SatelliteSystem, error) { if err != nil { return xs, errs.Wrap(err) } + planet.databases = append(planet.databases, revocationDB) - peer, err := satellite.New(log, identity, db, pointerDB, revocationDB, versionInfo, &config) + liveAccountingCache, err := live.NewCache(log.Named("live-accounting"), config.LiveAccounting) + if err != nil { + return xs, errs.Wrap(err) + } + + planet.databases = append(planet.databases, liveAccountingCache) + + peer, err := satellite.New(log, identity, db, pointerDB, revocationDB, liveAccountingCache, versionInfo, &config) if err != nil { return xs, err } @@ -465,5 +473,11 @@ func (planet *Planet) newAPI(count int, identity *identity.FullIdentity, db sate } planet.databases = append(planet.databases, revocationDB) - return satellite.NewAPI(log, identity, db, pointerDB, revocationDB, &config, versionInfo) + liveAccounting, err := live.NewCache(log.Named("live-accounting"), config.LiveAccounting) + if err != nil { + return nil, errs.Wrap(err) + } + planet.databases = append(planet.databases, liveAccounting) + + return satellite.NewAPI(log, identity, db, pointerDB, revocationDB, liveAccounting, &config, versionInfo) } diff --git a/satellite/accounting/db.go b/satellite/accounting/db.go index 49f98f21f..8c7434e62 100644 --- a/satellite/accounting/db.go +++ b/satellite/accounting/db.go @@ -94,3 +94,13 @@ type ProjectAccounting interface { // GetProjectUsageLimits returns project usage limit GetProjectUsageLimits(ctx context.Context, projectID uuid.UUID) (memory.Size, error) } + +// Cache stores live information about project storage which has not yet been synced to ProjectAccounting. +// +// architecture: Database +type Cache interface { + GetProjectStorageUsage(ctx context.Context, projectID uuid.UUID) (totalUsed int64, err error) + AddProjectStorageUsage(ctx context.Context, projectID uuid.UUID, inlineSpaceUsed, remoteSpaceUsed int64) error + ResetTotals(ctx context.Context) error + Close() error +} diff --git a/satellite/accounting/live/cache.go b/satellite/accounting/live/cache.go new file mode 100644 index 000000000..9aeac66bb --- /dev/null +++ b/satellite/accounting/live/cache.go @@ -0,0 +1,45 @@ +// Copyright (C) 2019 Storj Labs, Inc. +// See LICENSE for copying information. + +package live + +import ( + "strings" + + "github.com/zeebo/errs" + "go.uber.org/zap" + "gopkg.in/spacemonkeygo/monkit.v2" + + "storj.io/storj/satellite/accounting" +) + +var ( + // Error is the default error class for live-accounting + Error = errs.Class("live-accounting") + mon = monkit.Package() +) + +// Config contains configurable values for the live accounting service. +type Config struct { + StorageBackend string `help:"what to use for storing real-time accounting data" default:"memory"` +} + +// NewCache creates a new accounting.Cache instance using the type specified backend in +// the provided config. +func NewCache(log *zap.Logger, config Config) (accounting.Cache, error) { + parts := strings.SplitN(config.StorageBackend, ":", 2) + var backendType string + if len(parts) == 0 || parts[0] == "" { + backendType = "memory" + } else { + backendType = parts[0] + } + switch backendType { + case "memory": + return newMemoryLiveAccounting(log) + case "redis": + return newRedisLiveAccounting(log, config.StorageBackend) + default: + return nil, Error.New("unrecognized live accounting backend specifier %q", backendType) + } +} diff --git a/satellite/accounting/live/live.go b/satellite/accounting/live/live.go deleted file mode 100644 index c385f7e48..000000000 --- a/satellite/accounting/live/live.go +++ /dev/null @@ -1,103 +0,0 @@ -// Copyright (C) 2019 Storj Labs, Inc. -// See LICENSE for copying information. - -package live - -import ( - "context" - "strings" - "sync" - - "github.com/skyrings/skyring-common/tools/uuid" - "github.com/zeebo/errs" - "go.uber.org/zap" -) - -// Config contains configurable values for the live accounting service. -type Config struct { - StorageBackend string `help:"what to use for storing real-time accounting data" default:"plainmemory"` -} - -// Service represents the external interface to the live accounting -// functionality. -// -// architecture: Service -type Service interface { - GetProjectStorageUsage(ctx context.Context, projectID uuid.UUID) (int64, int64, error) - AddProjectStorageUsage(ctx context.Context, projectID uuid.UUID, inlineSpaceUsed, remoteSpaceUsed int64) error - ResetTotals() -} - -// New creates a new live.Service instance of the type specified in -// the provided config. -func New(log *zap.Logger, config Config) (Service, error) { - parts := strings.SplitN(config.StorageBackend, ":", 2) - var backendType string - if len(parts) == 0 || parts[0] == "" { - backendType = "plainmemory" - } else { - backendType = parts[0] - } - if backendType == "plainmemory" { - return newPlainMemoryLiveAccounting(log) - } - return nil, errs.New("unrecognized live accounting backend specifier %q", backendType) -} - -// plainMemoryLiveAccounting represents an live.Service-implementing -// instance using plain memory (no coordination with other servers). It can be -// used to coordinate tracking of how much space a project has used. -// -// This should probably only be used at small scale or for testing areas where -// the accounting cache does not matter significantly. For production, an -// implementation that allows multiple servers to participate together would -// be preferable. -type plainMemoryLiveAccounting struct { - log *zap.Logger - - spaceMapLock sync.RWMutex - spaceDeltas map[uuid.UUID]spaceUsedAccounting -} - -type spaceUsedAccounting struct { - inlineSpace int64 - remoteSpace int64 -} - -func newPlainMemoryLiveAccounting(log *zap.Logger) (*plainMemoryLiveAccounting, error) { - pmac := &plainMemoryLiveAccounting{log: log} - pmac.ResetTotals() - return pmac, nil -} - -// GetProjectStorageUsage gets inline and remote storage totals for a given -// project, back to the time of the last accounting tally. -func (pmac *plainMemoryLiveAccounting) GetProjectStorageUsage(ctx context.Context, projectID uuid.UUID) (inlineTotal, remoteTotal int64, err error) { - pmac.spaceMapLock.Lock() - defer pmac.spaceMapLock.Unlock() - curVal := pmac.spaceDeltas[projectID] - return curVal.inlineSpace, curVal.remoteSpace, nil -} - -// AddProjectStorageUsage lets the live accounting know that the given -// project has just added inlineSpaceUsed bytes of inline space usage -// and remoteSpaceUsed bytes of remote space usage. -func (pmac *plainMemoryLiveAccounting) AddProjectStorageUsage(ctx context.Context, projectID uuid.UUID, inlineSpaceUsed, remoteSpaceUsed int64) error { - pmac.spaceMapLock.Lock() - defer pmac.spaceMapLock.Unlock() - curVal := pmac.spaceDeltas[projectID] - curVal.inlineSpace += inlineSpaceUsed - curVal.remoteSpace += remoteSpaceUsed - pmac.spaceDeltas[projectID] = curVal - return nil -} - -// ResetTotals reset all space-used totals for all projects back to zero. This -// would normally be done in concert with calculating new tally counts in the -// accountingDB. -func (pmac *plainMemoryLiveAccounting) ResetTotals() { - pmac.log.Info("Resetting real-time accounting data") - pmac.spaceMapLock.Lock() - pmac.spaceDeltas = make(map[uuid.UUID]spaceUsedAccounting) - pmac.spaceMapLock.Unlock() -} diff --git a/satellite/accounting/live/live_test.go b/satellite/accounting/live/live_test.go index c2acfb418..771be6c28 100644 --- a/satellite/accounting/live/live_test.go +++ b/satellite/accounting/live/live_test.go @@ -1,10 +1,11 @@ // Copyright (C) 2019 Storj Labs, Inc. // See LICENSE for copying information. -package live +package live_test import ( "context" + "fmt" "math/rand" "testing" @@ -14,35 +15,168 @@ import ( "go.uber.org/zap/zaptest" "golang.org/x/sync/errgroup" + "storj.io/storj/internal/testcontext" "storj.io/storj/internal/testrand" + "storj.io/storj/satellite/accounting" + "storj.io/storj/satellite/accounting/live" + "storj.io/storj/storage/redis/redisserver" ) -func TestPlainMemoryLiveAccounting(t *testing.T) { +func TestLiveAccountingCache(t *testing.T) { + tests := []struct { + backend string + }{ + { + backend: "memory", + }, + { + backend: "redis", + }, + } + ctx := testcontext.New(t) + defer ctx.Cleanup() + + address, cleanup, err := redisserver.Start() + require.NoError(t, err) + defer cleanup() + + for _, tt := range tests { + var config live.Config + if tt.backend == "redis" { + config = live.Config{ + StorageBackend: "redis://" + address + "?db=0", + } + } + + cache, err := live.NewCache(zaptest.NewLogger(t).Named("live-accounting"), config) + require.NoError(t, err) + + projectIDs, sum, err := populateCache(ctx, cache) + require.NoError(t, err) + + // make sure all of the "projects" got all space updates and got right totals + for _, projID := range projectIDs { + spaceUsed, err := cache.GetProjectStorageUsage(ctx, projID) + require.NoError(t, err) + assert.Equalf(t, sum, spaceUsed, "projectID %v", projID) + } + + err = cache.ResetTotals(ctx) + require.NoError(t, err) + + for _, projID := range projectIDs { + spaceUsed, err := cache.GetProjectStorageUsage(ctx, projID) + require.NoError(t, err) + assert.EqualValues(t, 0, spaceUsed) + } + } +} + +func TestRedisCacheConcurrency(t *testing.T) { + ctx := testcontext.New(t) + defer ctx.Cleanup() + + address, cleanup, err := redisserver.Start() + require.NoError(t, err) + defer cleanup() + + config := live.Config{ + StorageBackend: "redis://" + address + "?db=0", + } + cache, err := live.NewCache(zaptest.NewLogger(t).Named("live-accounting"), config) + require.NoError(t, err) + + projectID := testrand.UUID() + + const ( + numConcurrent = 100 + inlineAmount = 10 + remoteAmount = 10 + ) + expectedSum := (inlineAmount * numConcurrent) + (remoteAmount * numConcurrent) + + var group errgroup.Group + for i := 0; i < numConcurrent; i++ { + group.Go(func() error { + return cache.AddProjectStorageUsage(ctx, projectID, inlineAmount, remoteAmount) + }) + } + require.NoError(t, group.Wait()) + + spaceUsed, err := cache.GetProjectStorageUsage(ctx, projectID) + require.NoError(t, err) + + require.EqualValues(t, expectedSum, spaceUsed) +} + +func TestPlainMemoryCacheConcurrency(t *testing.T) { + ctx := testcontext.New(t) + defer ctx.Cleanup() + + config := live.Config{ + StorageBackend: "memory", + } + cache, err := live.NewCache(zaptest.NewLogger(t).Named("live-accounting"), config) + require.NoError(t, err) + + projectID := testrand.UUID() + + const ( + numConcurrent = 100 + inlineAmount = 10 + remoteAmount = 10 + ) + expectedSum := (inlineAmount * numConcurrent) + (remoteAmount * numConcurrent) + + var group errgroup.Group + for i := 0; i < numConcurrent; i++ { + group.Go(func() error { + return cache.AddProjectStorageUsage(ctx, projectID, inlineAmount, remoteAmount) + }) + } + require.NoError(t, group.Wait()) + + spaceUsed, err := cache.GetProjectStorageUsage(ctx, projectID) + require.NoError(t, err) + + require.EqualValues(t, expectedSum, spaceUsed) +} + +func TestNegativeSpaceUsed(t *testing.T) { + ctx := testcontext.New(t) + defer ctx.Cleanup() + + config := live.Config{ + StorageBackend: "memory:", + } + cache, err := live.NewCache(zaptest.NewLogger(t).Named("live-accounting"), config) + require.NoError(t, err) + + projectID := testrand.UUID() + inline := int64(-10) + remote := int64(-20) + + expectedError := fmt.Sprintf("live-accounting: Used space amounts must be greater than 0. Inline: %d, Remote: %d", inline, remote) + + err = cache.AddProjectStorageUsage(ctx, projectID, inline, remote) + require.EqualError(t, err, expectedError) +} + +func populateCache(ctx context.Context, cache accounting.Cache) (projectIDs []uuid.UUID, sum int64, _ error) { const ( valuesListSize = 1000 valueMultiplier = 4096 numProjects = 200 ) - config := Config{ - StorageBackend: "plainmemory:", - } - service, err := New(zaptest.NewLogger(t).Named("live-accounting"), config) - require.NoError(t, err) - - // ensure we are using the expected underlying type - _, ok := service.(*plainMemoryLiveAccounting) - require.True(t, ok) - // make a largish list of varying values someValues := make([]int64, valuesListSize) - sum := int64(0) for i := range someValues { someValues[i] = int64((i + 1) * valueMultiplier) - sum += someValues[i] + sum += someValues[i] * 2 } // make up some project IDs - projectIDs := make([]uuid.UUID, numProjects) + projectIDs = make([]uuid.UUID, numProjects) for i := range projectIDs { projectIDs[i] = testrand.UUID() } @@ -61,38 +195,13 @@ func TestPlainMemoryLiveAccounting(t *testing.T) { }) for _, val := range myValues { - if err := service.AddProjectStorageUsage(ctx, projID, val, val); err != nil { + if err := cache.AddProjectStorageUsage(ctx, projID, val, val); err != nil { return err } } return nil }) } - require.NoError(t, errg.Wait()) - // make sure all of the "projects" got all space updates and got right totals - for _, projID := range projectIDs { - inlineUsed, remoteUsed, err := service.GetProjectStorageUsage(ctx, projID) - require.NoError(t, err) - assert.Equalf(t, sum, inlineUsed, "projectID %v", projID) - assert.Equalf(t, sum, remoteUsed, "projectID %v", projID) - } -} - -func TestResetTotals(t *testing.T) { - config := Config{ - StorageBackend: "plainmemory:", - } - service, err := New(zaptest.NewLogger(t).Named("live-accounting"), config) - require.NoError(t, err) - - // ensure we are using the expected underlying type - _, ok := service.(*plainMemoryLiveAccounting) - require.True(t, ok) - - ctx := context.Background() - - projectID := testrand.UUID() - err = service.AddProjectStorageUsage(ctx, projectID, 0, -20) - require.NoError(t, err) + return projectIDs, sum, errg.Wait() } diff --git a/satellite/accounting/live/memory.go b/satellite/accounting/live/memory.go new file mode 100644 index 000000000..749613c4e --- /dev/null +++ b/satellite/accounting/live/memory.go @@ -0,0 +1,77 @@ +// Copyright (C) 2019 Storj Labs, Inc. +// See LICENSE for copying information. + +package live + +import ( + "context" + "sync" + + "github.com/skyrings/skyring-common/tools/uuid" + "go.uber.org/zap" +) + +// memoryLiveAccounting represents an accounting.Cache-implementing +// instance using plain memory (no coordination with other servers). It can be +// used to coordinate tracking of how much space a project has used. +// +// This should probably only be used at small scale or for testing areas where +// the accounting cache does not matter significantly. For production, an +// implementation that allows multiple servers to participate together would +// be preferable. +type memoryLiveAccounting struct { + log *zap.Logger + + spaceMapLock sync.RWMutex + spaceDeltas map[uuid.UUID]int64 +} + +func newMemoryLiveAccounting(log *zap.Logger) (*memoryLiveAccounting, error) { + pmac := &memoryLiveAccounting{log: log} + pmac.spaceDeltas = make(map[uuid.UUID]int64, 0) + return pmac, nil +} + +// GetProjectStorageUsage gets inline and remote storage totals for a given +// project, back to the time of the last accounting tally. +func (mac *memoryLiveAccounting) GetProjectStorageUsage(ctx context.Context, projectID uuid.UUID) (totalUsed int64, err error) { + defer mon.Task()(&ctx, projectID)(&err) + mac.spaceMapLock.Lock() + defer mac.spaceMapLock.Unlock() + curVal, ok := mac.spaceDeltas[projectID] + if !ok { + return 0, nil + } + return curVal, nil +} + +// AddProjectStorageUsage lets the live accounting know that the given +// project has just added inlineSpaceUsed bytes of inline space usage +// and remoteSpaceUsed bytes of remote space usage. +func (mac *memoryLiveAccounting) AddProjectStorageUsage(ctx context.Context, projectID uuid.UUID, inlineSpaceUsed, remoteSpaceUsed int64) (err error) { + defer mon.Task()(&ctx, projectID, inlineSpaceUsed, remoteSpaceUsed)(&err) + if inlineSpaceUsed < 0 || remoteSpaceUsed < 0 { + return Error.New("Used space amounts must be greater than 0. Inline: %d, Remote: %d", inlineSpaceUsed, remoteSpaceUsed) + } + mac.spaceMapLock.Lock() + defer mac.spaceMapLock.Unlock() + curVal := mac.spaceDeltas[projectID] + newTotal := curVal + inlineSpaceUsed + remoteSpaceUsed + mac.spaceDeltas[projectID] = newTotal + return nil +} + +// ResetTotals reset all space-used totals for all projects back to zero. This +// would normally be done in concert with calculating new tally counts in the +// accountingDB. +func (mac *memoryLiveAccounting) ResetTotals(ctx context.Context) (err error) { + defer mon.Task()(&ctx)(&err) + mac.log.Debug("Resetting real-time accounting data") + mac.spaceMapLock.Lock() + mac.spaceDeltas = make(map[uuid.UUID]int64) + mac.spaceMapLock.Unlock() + return nil +} + +// Close matches the accounting.LiveAccounting interface. +func (mac *memoryLiveAccounting) Close() error { return nil } diff --git a/satellite/accounting/live/redis.go b/satellite/accounting/live/redis.go new file mode 100644 index 000000000..928f022c9 --- /dev/null +++ b/satellite/accounting/live/redis.go @@ -0,0 +1,72 @@ +// Copyright (C) 2019 Storj Labs, Inc. +// See LICENSE for copying information. + +package live + +import ( + "context" + "strconv" + + "github.com/skyrings/skyring-common/tools/uuid" + "go.uber.org/zap" + + "storj.io/storj/storage" + "storj.io/storj/storage/redis" +) + +type redisLiveAccounting struct { + log *zap.Logger + + client *redis.Client +} + +func newRedisLiveAccounting(log *zap.Logger, address string) (*redisLiveAccounting, error) { + client, err := redis.NewClientFrom(address) + if err != nil { + return nil, Error.Wrap(err) + } + return &redisLiveAccounting{ + log: log, + client: client, + }, nil +} + +// GetProjectStorageUsage gets inline and remote storage totals for a given +// project, back to the time of the last accounting tally. +func (cache *redisLiveAccounting) GetProjectStorageUsage(ctx context.Context, projectID uuid.UUID) (totalUsed int64, err error) { + defer mon.Task()(&ctx, projectID)(&err) + val, err := cache.client.Get(ctx, []byte(projectID.String())) + if err != nil { + if storage.ErrKeyNotFound.Has(err) { + return 0, nil + } + return 0, Error.Wrap(err) + } + intval, err := strconv.Atoi(string(val)) + return int64(intval), Error.Wrap(err) +} + +// AddProjectStorageUsage lets the live accounting know that the given +// project has just added inlineSpaceUsed bytes of inline space usage +// and remoteSpaceUsed bytes of remote space usage. +func (cache *redisLiveAccounting) AddProjectStorageUsage(ctx context.Context, projectID uuid.UUID, inlineSpaceUsed, remoteSpaceUsed int64) (err error) { + defer mon.Task()(&ctx, projectID, inlineSpaceUsed, remoteSpaceUsed)(&err) + if inlineSpaceUsed < 0 || remoteSpaceUsed < 0 { + return Error.New("Used space amounts must be greater than 0. Inline: %d, Remote: %d", inlineSpaceUsed, remoteSpaceUsed) + } + return cache.client.IncrBy(ctx, []byte(projectID.String()), inlineSpaceUsed+remoteSpaceUsed) +} + +// ResetTotals reset all space-used totals for all projects back to zero. This +// would normally be done in concert with calculating new tally counts in the +// accountingDB. +func (cache *redisLiveAccounting) ResetTotals(ctx context.Context) (err error) { + defer mon.Task()(&ctx)(&err) + cache.log.Debug("Resetting real-time accounting data") + return cache.client.FlushDB() +} + +// Close the DB connection. +func (cache *redisLiveAccounting) Close() error { + return cache.client.Close() +} diff --git a/satellite/accounting/projectusage.go b/satellite/accounting/projectusage.go index 9cf768114..0080c3d9d 100644 --- a/satellite/accounting/projectusage.go +++ b/satellite/accounting/projectusage.go @@ -13,7 +13,6 @@ import ( monkit "gopkg.in/spacemonkeygo/monkit.v2" "storj.io/storj/internal/memory" - "storj.io/storj/satellite/accounting/live" ) var mon = monkit.Package() @@ -36,12 +35,12 @@ var ( // architecture: Service type ProjectUsage struct { projectAccountingDB ProjectAccounting - liveAccounting live.Service + liveAccounting Cache maxAlphaUsage memory.Size } // NewProjectUsage created new instance of project usage service -func NewProjectUsage(projectAccountingDB ProjectAccounting, liveAccounting live.Service, maxAlphaUsage memory.Size) *ProjectUsage { +func NewProjectUsage(projectAccountingDB ProjectAccounting, liveAccounting Cache, maxAlphaUsage memory.Size) *ProjectUsage { return &ProjectUsage{ projectAccountingDB: projectAccountingDB, liveAccounting: liveAccounting, @@ -96,7 +95,7 @@ func (usage *ProjectUsage) ExceedsStorageUsage(ctx context.Context, projectID uu defer mon.Task()(&ctx)(&err) var group errgroup.Group - var inlineTotal, remoteTotal int64 + var totalUsed int64 limit = usage.maxAlphaUsage // TODO(michal): to reduce db load, consider using a cache to retrieve the project.UsageLimit value if needed @@ -109,7 +108,7 @@ func (usage *ProjectUsage) ExceedsStorageUsage(ctx context.Context, projectID uu }) group.Go(func() error { var err error - inlineTotal, remoteTotal, err = usage.getProjectStorageTotals(ctx, projectID) + totalUsed, err = usage.getProjectStorageTotals(ctx, projectID) return err }) err = group.Wait() @@ -118,25 +117,25 @@ func (usage *ProjectUsage) ExceedsStorageUsage(ctx context.Context, projectID uu } maxUsage := limit.Int64() * int64(ExpansionFactor) - if inlineTotal+remoteTotal >= maxUsage { + if totalUsed >= maxUsage { return true, limit, nil } return false, limit, nil } -func (usage *ProjectUsage) getProjectStorageTotals(ctx context.Context, projectID uuid.UUID) (inline int64, remote int64, err error) { +func (usage *ProjectUsage) getProjectStorageTotals(ctx context.Context, projectID uuid.UUID) (total int64, err error) { defer mon.Task()(&ctx)(&err) lastCountInline, lastCountRemote, err := usage.projectAccountingDB.GetStorageTotals(ctx, projectID) if err != nil { - return 0, 0, err + return 0, err } - rtInline, rtRemote, err := usage.liveAccounting.GetProjectStorageUsage(ctx, projectID) + cachedTotal, err := usage.liveAccounting.GetProjectStorageUsage(ctx, projectID) if err != nil { - return 0, 0, err + return 0, err } - return lastCountInline + rtInline, lastCountRemote + rtRemote, nil + return lastCountInline + lastCountRemote + cachedTotal, nil } // AddProjectStorageUsage lets the live accounting know that the given diff --git a/satellite/accounting/tally/tally.go b/satellite/accounting/tally/tally.go index 3f0934158..43c35231f 100644 --- a/satellite/accounting/tally/tally.go +++ b/satellite/accounting/tally/tally.go @@ -15,7 +15,6 @@ import ( "storj.io/storj/pkg/pb" "storj.io/storj/pkg/storj" "storj.io/storj/satellite/accounting" - "storj.io/storj/satellite/accounting/live" "storj.io/storj/satellite/metainfo" ) @@ -38,13 +37,13 @@ type Service struct { Loop sync2.Cycle metainfoLoop *metainfo.Loop - liveAccounting live.Service + liveAccounting accounting.Cache storagenodeAccountingDB accounting.StoragenodeAccounting projectAccountingDB accounting.ProjectAccounting } // New creates a new tally Service -func New(log *zap.Logger, sdb accounting.StoragenodeAccounting, pdb accounting.ProjectAccounting, liveAccounting live.Service, metainfoLoop *metainfo.Loop, interval time.Duration) *Service { +func New(log *zap.Logger, sdb accounting.StoragenodeAccounting, pdb accounting.ProjectAccounting, liveAccounting accounting.Cache, metainfoLoop *metainfo.Loop, interval time.Duration) *Service { return &Service{ log: log, Loop: *sync2.NewCycle(interval), @@ -87,7 +86,10 @@ func (service *Service) Tally(ctx context.Context) (err error) { // double-counted (counted in the tally and also counted as a delta to // the tally). If that happens, it will be fixed at the time of the next // tally run. - service.liveAccounting.ResetTotals() + err = service.liveAccounting.ResetTotals(ctx) + if err != nil { + return Error.Wrap(err) + } // Fetch when the last tally happened so we can roughly calculate the byte-hours. lastTime, err := service.storagenodeAccountingDB.LastTimestamp(ctx, accounting.LastAtRestTally) diff --git a/satellite/api.go b/satellite/api.go index 232ea83f6..4e37269ef 100644 --- a/satellite/api.go +++ b/satellite/api.go @@ -27,7 +27,6 @@ import ( "storj.io/storj/pkg/signing" "storj.io/storj/pkg/storj" "storj.io/storj/satellite/accounting" - "storj.io/storj/satellite/accounting/live" "storj.io/storj/satellite/console" "storj.io/storj/satellite/console/consoleauth" "storj.io/storj/satellite/console/consoleweb" @@ -96,7 +95,7 @@ type API struct { } LiveAccounting struct { - Service live.Service + Cache accounting.Cache } Mail struct { @@ -124,7 +123,7 @@ type API struct { } // NewAPI creates a new satellite API process -func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB, pointerDB metainfo.PointerDB, revocationDB extensions.RevocationDB, config *Config, versionInfo version.Info) (*API, error) { +func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB, pointerDB metainfo.PointerDB, revocationDB extensions.RevocationDB, liveAccounting accounting.Cache, config *Config, versionInfo version.Info) (*API, error) { peer := &API{ Log: log, Identity: full, @@ -208,19 +207,14 @@ func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB, pointerDB metai { // setup live accounting log.Debug("Satellite API Process setting up live accounting") - config := config.LiveAccounting - liveAccountingService, err := live.New(peer.Log.Named("live-accounting"), config) - if err != nil { - return nil, err - } - peer.LiveAccounting.Service = liveAccountingService + peer.LiveAccounting.Cache = liveAccounting } { // setup accounting project usage log.Debug("Satellite API Process setting up accounting project usage") peer.Accounting.ProjectUsage = accounting.NewProjectUsage( peer.DB.ProjectAccounting(), - peer.LiveAccounting.Service, + peer.LiveAccounting.Cache, config.Rollup.MaxAlphaUsage, ) } diff --git a/satellite/peer.go b/satellite/peer.go index 2f558c4d1..891c28251 100644 --- a/satellite/peer.go +++ b/satellite/peer.go @@ -207,7 +207,7 @@ type Peer struct { } LiveAccounting struct { - Service live.Service + Cache accounting.Cache } Mail struct { @@ -240,7 +240,7 @@ type Peer struct { } // New creates a new satellite -func New(log *zap.Logger, full *identity.FullIdentity, db DB, pointerDB metainfo.PointerDB, revocationDB extensions.RevocationDB, versionInfo version.Info, config *Config) (*Peer, error) { +func New(log *zap.Logger, full *identity.FullIdentity, db DB, pointerDB metainfo.PointerDB, revocationDB extensions.RevocationDB, liveAccounting accounting.Cache, versionInfo version.Info, config *Config) (*Peer, error) { peer := &Peer{ Log: log, Identity: full, @@ -326,19 +326,14 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, pointerDB metainfo { // setup live accounting log.Debug("Setting up live accounting") - config := config.LiveAccounting - liveAccountingService, err := live.New(peer.Log.Named("live-accounting"), config) - if err != nil { - return nil, err - } - peer.LiveAccounting.Service = liveAccountingService + peer.LiveAccounting.Cache = liveAccounting } { // setup accounting project usage log.Debug("Setting up accounting project usage") peer.Accounting.ProjectUsage = accounting.NewProjectUsage( peer.DB.ProjectAccounting(), - peer.LiveAccounting.Service, + peer.LiveAccounting.Cache, config.Rollup.MaxAlphaUsage, ) } @@ -493,7 +488,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, pointerDB metainfo { // setup accounting log.Debug("Setting up accounting") - peer.Accounting.Tally = tally.New(peer.Log.Named("tally"), peer.DB.StoragenodeAccounting(), peer.DB.ProjectAccounting(), peer.LiveAccounting.Service, peer.Metainfo.Loop, config.Tally.Interval) + peer.Accounting.Tally = tally.New(peer.Log.Named("tally"), peer.DB.StoragenodeAccounting(), peer.DB.ProjectAccounting(), peer.LiveAccounting.Cache, peer.Metainfo.Loop, config.Tally.Interval) peer.Accounting.Rollup = rollup.New(peer.Log.Named("rollup"), peer.DB.StoragenodeAccounting(), config.Rollup.Interval, config.Rollup.DeleteTallies) } diff --git a/scripts/testdata/satellite-config.yaml.lock b/scripts/testdata/satellite-config.yaml.lock index 62735a689..8350cbd2f 100644 --- a/scripts/testdata/satellite-config.yaml.lock +++ b/scripts/testdata/satellite-config.yaml.lock @@ -128,7 +128,7 @@ identity.cert-path: /root/.local/share/storj/identity/satellite/identity.cert identity.key-path: /root/.local/share/storj/identity/satellite/identity.key # what to use for storing real-time accounting data -# live-accounting.storage-backend: plainmemory +# live-accounting.storage-backend: memory # if true, log function filename and line number # log.caller: false diff --git a/storage/redis/client.go b/storage/redis/client.go index 33a46ade5..5111b926a 100644 --- a/storage/redis/client.go +++ b/storage/redis/client.go @@ -93,6 +93,16 @@ func (client *Client) Put(ctx context.Context, key storage.Key, value storage.Va return put(ctx, client.db, key, value, client.TTL) } +// IncrBy increments the value stored in key by the specified value. +func (client *Client) IncrBy(ctx context.Context, key storage.Key, value int64) (err error) { + defer mon.Task()(&ctx)(&err) + if key.IsZero() { + return storage.ErrEmptyKey.New("") + } + _, err = client.db.IncrBy(key.String(), value).Result() + return err +} + // List returns either a list of keys for which boltdb has values or an error. func (client *Client) List(ctx context.Context, first storage.Key, limit int) (_ storage.Keys, err error) { defer mon.Task()(&ctx)(&err)