satellite/accounting: refactor live accounting to hold current estimated totals

live accounting used to be a cache to store writes before they are picked up during
the tally iteration, after which the cache is cleared. This created a window in which
users could potentially exceed the storage limit. This PR refactors live accounting to
hold current estimations of space used per project. This should also reduce DB load
since we no longer need to query the satellite DB when checking space used for limiting.

The mechanism by which the new live accounting system works is as follows:

During the upload of any segment, the size of that segment is added to its respective
project total in live accounting. At the beginning of the tally iteration we record
the current values in live accounting as `initialLiveTotals`. At the end of the tally
iteration we again record the current totals in live accounting as `latestLiveTotals`.
The metainfo loop observer in tally allows us to get the project totals from what it
observed in metainfo DB which are stored in `tallyProjectTotals`. However, for any
particular segment uploaded during the metainfo loop, the observer may or may not
have seen it. Thus, we take half of the difference between `latestLiveTotals` and
`initialLiveTotals`, and add that to the total that was found during tally and set that
as the new live accounting total.

Initially, live accounting was storing the total stored amount across all nodes rather than
the segment size, which is inconsistent with how we record amounts stored in the project
accounting DB, so we have refactored live accounting to record segment size

Change-Id: Ie48bfdef453428fcdc180b2d781a69d58fd927fb
This commit is contained in:
Cameron Ayer 2019-10-31 13:27:38 -04:00
parent 81d53b8097
commit 4424697d7f
16 changed files with 295 additions and 280 deletions

View File

@ -13,7 +13,6 @@ import (
"storj.io/storj/satellite"
"storj.io/storj/satellite/metainfo"
"storj.io/storj/satellite/satellitedb/satellitedbtest"
"storj.io/storj/storage/redis/redisserver"
)
// Run runs testplanet in multiple configurations.
@ -30,22 +29,7 @@ func Run(t *testing.T, config Config, test func(t *testing.T, ctx *testcontext.C
t.Skipf("Database %s connection string not provided. %s", satelliteDB.MasterDB.Name, satelliteDB.MasterDB.Message)
}
addr, cleanup, err := redisserver.Mini()
if err != nil {
t.Fatal(err)
}
defer cleanup()
planetConfig := config
reconfigSat := planetConfig.Reconfigure.Satellite
planetConfig.Reconfigure.Satellite = func(log *zap.Logger, index int, config *satellite.Config) {
config.LiveAccounting.StorageBackend = "redis://" + addr + "?db=0"
if reconfigSat != nil {
reconfigSat(log, index, config)
}
}
planetConfig.Reconfigure.NewSatelliteDB = func(log *zap.Logger, index int) (satellite.DB, error) {
return satellitedbtest.CreateMasterDB(ctx, t, "S", index, satelliteDB.MasterDB)
}

View File

@ -57,6 +57,7 @@ import (
"storj.io/storj/satellite/repair/repairer"
"storj.io/storj/satellite/satellitedb/satellitedbtest"
"storj.io/storj/satellite/vouchers"
"storj.io/storj/storage/redis/redisserver"
)
// SatelliteSystem contains all the processes needed to run a full Satellite setup
@ -268,6 +269,13 @@ func (planet *Planet) newSatellites(count int) ([]*SatelliteSystem, error) {
return nil, err
}
liveAccountingServer := redisserver.NewMini()
addr, _, err := liveAccountingServer.Run()
if err != nil {
return xs, errs.Wrap(err)
}
planet.databases = append(planet.databases, liveAccountingServer)
config := satellite.Config{
Server: server.Config{
Address: "127.0.0.1:0",
@ -373,6 +381,9 @@ func (planet *Planet) newSatellites(count int) ([]*SatelliteSystem, error) {
ReportedRollup: reportedrollup.Config{
Interval: defaultInterval,
},
LiveAccounting: live.Config{
StorageBackend: "redis://" + addr + "?db=0",
},
Mail: mailservice.Config{
SMTPServerAddress: "smtp.mail.test:587",
From: "Labs <storj@mail.test>",

View File

@ -177,7 +177,7 @@ type ProjectAccounting interface {
// architecture: Database
type Cache interface {
GetProjectStorageUsage(ctx context.Context, projectID uuid.UUID) (totalUsed int64, err error)
AddProjectStorageUsage(ctx context.Context, projectID uuid.UUID, inlineSpaceUsed, remoteSpaceUsed int64) error
ResetTotals(ctx context.Context) error
AddProjectStorageUsage(ctx context.Context, projectID uuid.UUID, spaceUsed int64) error
GetAllProjectTotals(ctx context.Context) (map[uuid.UUID]int64, error)
Close() error
}

View File

@ -21,7 +21,7 @@ var (
// Config contains configurable values for the live accounting service.
type Config struct {
StorageBackend string `help:"what to use for storing real-time accounting data" default:"memory"`
StorageBackend string `help:"what to use for storing real-time accounting data"`
}
// NewCache creates a new accounting.Cache instance using the type specified backend in
@ -30,16 +30,14 @@ func NewCache(log *zap.Logger, config Config) (accounting.Cache, error) {
parts := strings.SplitN(config.StorageBackend, ":", 2)
var backendType string
if len(parts) == 0 || parts[0] == "" {
backendType = "memory"
} else {
backendType = parts[0]
return nil, Error.New("please specify a backend for live accounting")
}
backendType = parts[0]
switch backendType {
case "memory":
return newMemoryLiveAccounting(log)
case "redis":
return newRedisLiveAccounting(log, config.StorageBackend)
default:
return nil, Error.New("unrecognized live accounting backend specifier %q", backendType)
return nil, Error.New("unrecognized live accounting backend specifier %q. Currently only redis is supported", backendType)
}
}

View File

@ -5,7 +5,6 @@ package live_test
import (
"context"
"fmt"
"math/rand"
"testing"
@ -26,9 +25,6 @@ func TestLiveAccountingCache(t *testing.T) {
tests := []struct {
backend string
}{
{
backend: "memory",
},
{
backend: "redis",
},
@ -61,13 +57,16 @@ func TestLiveAccountingCache(t *testing.T) {
assert.Equalf(t, sum, spaceUsed, "projectID %v", projID)
}
err = cache.ResetTotals(ctx)
require.NoError(t, err)
negativeVal := int64(-100)
sum += negativeVal
for _, projID := range projectIDs {
err = cache.AddProjectStorageUsage(ctx, projID, negativeVal)
require.NoError(t, err)
spaceUsed, err := cache.GetProjectStorageUsage(ctx, projID)
require.NoError(t, err)
assert.EqualValues(t, 0, spaceUsed)
assert.EqualValues(t, sum, spaceUsed)
}
}
}
@ -90,76 +89,22 @@ func TestRedisCacheConcurrency(t *testing.T) {
const (
numConcurrent = 100
inlineAmount = 10
remoteAmount = 10
spaceUsed = 10
)
expectedSum := (inlineAmount * numConcurrent) + (remoteAmount * numConcurrent)
expectedSum := spaceUsed * numConcurrent
var group errgroup.Group
for i := 0; i < numConcurrent; i++ {
group.Go(func() error {
return cache.AddProjectStorageUsage(ctx, projectID, inlineAmount, remoteAmount)
return cache.AddProjectStorageUsage(ctx, projectID, spaceUsed)
})
}
require.NoError(t, group.Wait())
spaceUsed, err := cache.GetProjectStorageUsage(ctx, projectID)
total, err := cache.GetProjectStorageUsage(ctx, projectID)
require.NoError(t, err)
require.EqualValues(t, expectedSum, spaceUsed)
}
func TestPlainMemoryCacheConcurrency(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
config := live.Config{
StorageBackend: "memory",
}
cache, err := live.NewCache(zaptest.NewLogger(t).Named("live-accounting"), config)
require.NoError(t, err)
projectID := testrand.UUID()
const (
numConcurrent = 100
inlineAmount = 10
remoteAmount = 10
)
expectedSum := (inlineAmount * numConcurrent) + (remoteAmount * numConcurrent)
var group errgroup.Group
for i := 0; i < numConcurrent; i++ {
group.Go(func() error {
return cache.AddProjectStorageUsage(ctx, projectID, inlineAmount, remoteAmount)
})
}
require.NoError(t, group.Wait())
spaceUsed, err := cache.GetProjectStorageUsage(ctx, projectID)
require.NoError(t, err)
require.EqualValues(t, expectedSum, spaceUsed)
}
func TestNegativeSpaceUsed(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
config := live.Config{
StorageBackend: "memory:",
}
cache, err := live.NewCache(zaptest.NewLogger(t).Named("live-accounting"), config)
require.NoError(t, err)
projectID := testrand.UUID()
inline := int64(-10)
remote := int64(-20)
expectedError := fmt.Sprintf("live-accounting: Used space amounts must be greater than 0. Inline: %d, Remote: %d", inline, remote)
err = cache.AddProjectStorageUsage(ctx, projectID, inline, remote)
require.EqualError(t, err, expectedError)
require.EqualValues(t, expectedSum, total)
}
func populateCache(ctx context.Context, cache accounting.Cache) (projectIDs []uuid.UUID, sum int64, _ error) {
@ -172,7 +117,7 @@ func populateCache(ctx context.Context, cache accounting.Cache) (projectIDs []uu
someValues := make([]int64, valuesListSize)
for i := range someValues {
someValues[i] = int64((i + 1) * valueMultiplier)
sum += someValues[i] * 2
sum += someValues[i]
}
// make up some project IDs
@ -195,7 +140,7 @@ func populateCache(ctx context.Context, cache accounting.Cache) (projectIDs []uu
})
for _, val := range myValues {
if err := cache.AddProjectStorageUsage(ctx, projID, val, val); err != nil {
if err := cache.AddProjectStorageUsage(ctx, projID, val); err != nil {
return err
}
}
@ -205,3 +150,49 @@ func populateCache(ctx context.Context, cache accounting.Cache) (projectIDs []uu
return projectIDs, sum, errg.Wait()
}
func TestGetAllProjectTotals(t *testing.T) {
tests := []struct {
backend string
}{
{
backend: "redis",
},
}
ctx := testcontext.New(t)
defer ctx.Cleanup()
address, cleanup, err := redisserver.Start()
require.NoError(t, err)
defer cleanup()
for _, tt := range tests {
var config live.Config
if tt.backend == "redis" {
config = live.Config{
StorageBackend: "redis://" + address + "?db=0",
}
}
cache, err := live.NewCache(zaptest.NewLogger(t).Named("live-accounting"), config)
require.NoError(t, err)
projectIDs := make([]uuid.UUID, 1000)
for i := range projectIDs {
projectIDs[i] = testrand.UUID()
err := cache.AddProjectStorageUsage(ctx, projectIDs[i], int64(i))
require.NoError(t, err)
}
projectTotals, err := cache.GetAllProjectTotals(ctx)
require.NoError(t, err)
require.Len(t, projectTotals, len(projectIDs))
// make sure each project ID and total was received
for _, projID := range projectIDs {
total, err := cache.GetProjectStorageUsage(ctx, projID)
require.NoError(t, err)
assert.Equal(t, total, projectTotals[projID])
}
}
}

View File

@ -1,77 +0,0 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package live
import (
"context"
"sync"
"github.com/skyrings/skyring-common/tools/uuid"
"go.uber.org/zap"
)
// memoryLiveAccounting represents an accounting.Cache-implementing
// instance using plain memory (no coordination with other servers). It can be
// used to coordinate tracking of how much space a project has used.
//
// This should probably only be used at small scale or for testing areas where
// the accounting cache does not matter significantly. For production, an
// implementation that allows multiple servers to participate together would
// be preferable.
type memoryLiveAccounting struct {
log *zap.Logger
spaceMapLock sync.RWMutex
spaceDeltas map[uuid.UUID]int64
}
func newMemoryLiveAccounting(log *zap.Logger) (*memoryLiveAccounting, error) {
pmac := &memoryLiveAccounting{log: log}
pmac.spaceDeltas = make(map[uuid.UUID]int64)
return pmac, nil
}
// GetProjectStorageUsage gets inline and remote storage totals for a given
// project, back to the time of the last accounting tally.
func (mac *memoryLiveAccounting) GetProjectStorageUsage(ctx context.Context, projectID uuid.UUID) (totalUsed int64, err error) {
defer mon.Task()(&ctx, projectID)(&err)
mac.spaceMapLock.Lock()
defer mac.spaceMapLock.Unlock()
curVal, ok := mac.spaceDeltas[projectID]
if !ok {
return 0, nil
}
return curVal, nil
}
// AddProjectStorageUsage lets the live accounting know that the given
// project has just added inlineSpaceUsed bytes of inline space usage
// and remoteSpaceUsed bytes of remote space usage.
func (mac *memoryLiveAccounting) AddProjectStorageUsage(ctx context.Context, projectID uuid.UUID, inlineSpaceUsed, remoteSpaceUsed int64) (err error) {
defer mon.Task()(&ctx, projectID, inlineSpaceUsed, remoteSpaceUsed)(&err)
if inlineSpaceUsed < 0 || remoteSpaceUsed < 0 {
return Error.New("Used space amounts must be greater than 0. Inline: %d, Remote: %d", inlineSpaceUsed, remoteSpaceUsed)
}
mac.spaceMapLock.Lock()
defer mac.spaceMapLock.Unlock()
curVal := mac.spaceDeltas[projectID]
newTotal := curVal + inlineSpaceUsed + remoteSpaceUsed
mac.spaceDeltas[projectID] = newTotal
return nil
}
// ResetTotals reset all space-used totals for all projects back to zero. This
// would normally be done in concert with calculating new tally counts in the
// accountingDB.
func (mac *memoryLiveAccounting) ResetTotals(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
mac.log.Debug("Resetting real-time accounting data")
mac.spaceMapLock.Lock()
mac.spaceDeltas = make(map[uuid.UUID]int64)
mac.spaceMapLock.Unlock()
return nil
}
// Close matches the accounting.LiveAccounting interface.
func (mac *memoryLiveAccounting) Close() error { return nil }

View File

@ -47,23 +47,36 @@ func (cache *redisLiveAccounting) GetProjectStorageUsage(ctx context.Context, pr
}
// AddProjectStorageUsage lets the live accounting know that the given
// project has just added inlineSpaceUsed bytes of inline space usage
// and remoteSpaceUsed bytes of remote space usage.
func (cache *redisLiveAccounting) AddProjectStorageUsage(ctx context.Context, projectID uuid.UUID, inlineSpaceUsed, remoteSpaceUsed int64) (err error) {
defer mon.Task()(&ctx, projectID, inlineSpaceUsed, remoteSpaceUsed)(&err)
if inlineSpaceUsed < 0 || remoteSpaceUsed < 0 {
return Error.New("Used space amounts must be greater than 0. Inline: %d, Remote: %d", inlineSpaceUsed, remoteSpaceUsed)
}
return cache.client.IncrBy(ctx, projectID[:], inlineSpaceUsed+remoteSpaceUsed)
// project has just added spaceUsed bytes of storage (from the user's
// perspective; i.e. segment size).
func (cache *redisLiveAccounting) AddProjectStorageUsage(ctx context.Context, projectID uuid.UUID, spaceUsed int64) (err error) {
defer mon.Task()(&ctx, projectID, spaceUsed)(&err)
return cache.client.IncrBy(ctx, projectID[:], spaceUsed)
}
// ResetTotals reset all space-used totals for all projects back to zero. This
// would normally be done in concert with calculating new tally counts in the
// accountingDB.
func (cache *redisLiveAccounting) ResetTotals(ctx context.Context) (err error) {
// GetAllProjectTotals iterates through the live accounting DB and returns a map of project IDs and totals.
func (cache *redisLiveAccounting) GetAllProjectTotals(ctx context.Context) (_ map[uuid.UUID]int64, err error) {
defer mon.Task()(&ctx)(&err)
cache.log.Debug("Resetting real-time accounting data")
return cache.client.FlushDB()
projects := make(map[uuid.UUID]int64)
err = cache.client.Iterate(ctx, storage.IterateOptions{Recurse: true}, func(ctx context.Context, it storage.Iterator) error {
var item storage.ListItem
for it.Next(ctx, &item) {
if item.Key == nil {
return Error.New("nil key")
}
id := new(uuid.UUID)
copy(id[:], item.Key[:])
intval, err := strconv.Atoi(string(item.Value))
if err != nil {
return Error.New("could not get total for project %s", id.String())
}
projects[*id] = int64(intval)
}
return nil
})
return projects, err
}
// Close the DB connection.

View File

@ -83,10 +83,7 @@ func (usage *Service) ExceedsBandwidthUsage(ctx context.Context, projectID uuid.
return false, limit, nil
}
// ExceedsStorageUsage returns true if the storage usage limits have been exceeded
// for a project in the past month (30 days). The usage limit is (e.g. 25GB) multiplied by the redundancy
// expansion factor, so that the uplinks have a raw limit.
// Ref: https://storjlabs.atlassian.net/browse/V3-1274
// ExceedsStorageUsage returns true if the storage usage for a project is currently over that project's limit.
func (usage *Service) ExceedsStorageUsage(ctx context.Context, projectID uuid.UUID) (_ bool, limit memory.Size, err error) {
defer mon.Task()(&ctx)(&err)
@ -110,8 +107,7 @@ func (usage *Service) ExceedsStorageUsage(ctx context.Context, projectID uuid.UU
return false, 0, ErrProjectUsage.Wrap(err)
}
maxUsage := limit.Int64() * int64(ExpansionFactor)
if totalUsed >= maxUsage {
if totalUsed >= limit.Int64() {
return true, limit, nil
}
@ -122,15 +118,9 @@ func (usage *Service) ExceedsStorageUsage(ctx context.Context, projectID uuid.UU
func (usage *Service) GetProjectStorageTotals(ctx context.Context, projectID uuid.UUID) (total int64, err error) {
defer mon.Task()(&ctx, projectID)(&err)
lastCountInline, lastCountRemote, err := usage.projectAccountingDB.GetStorageTotals(ctx, projectID)
if err != nil {
return 0, ErrProjectUsage.Wrap(err)
}
cachedTotal, err := usage.liveAccounting.GetProjectStorageUsage(ctx, projectID)
if err != nil {
return 0, ErrProjectUsage.Wrap(err)
}
return lastCountInline + lastCountRemote + cachedTotal, nil
total, err = usage.liveAccounting.GetProjectStorageUsage(ctx, projectID)
return total, ErrProjectUsage.Wrap(err)
}
// GetProjectBandwidthTotals returns total amount of allocated bandwidth used for past 30 days.
@ -181,9 +171,9 @@ func (usage *Service) UpdateProjectLimits(ctx context.Context, projectID uuid.UU
}
// AddProjectStorageUsage lets the live accounting know that the given
// project has just added inlineSpaceUsed bytes of inline space usage
// and remoteSpaceUsed bytes of remote space usage.
func (usage *Service) AddProjectStorageUsage(ctx context.Context, projectID uuid.UUID, inlineSpaceUsed, remoteSpaceUsed int64) (err error) {
defer mon.Task()(&ctx)(&err)
return usage.liveAccounting.AddProjectStorageUsage(ctx, projectID, inlineSpaceUsed, remoteSpaceUsed)
// project has just added spaceUsed bytes of storage (from the user's
// perspective; i.e. segment size).
func (usage *Service) AddProjectStorageUsage(ctx context.Context, projectID uuid.UUID, spaceUsed int64) (err error) {
defer mon.Task()(&ctx, projectID)(&err)
return usage.liveAccounting.AddProjectStorageUsage(ctx, projectID, spaceUsed)
}

View File

@ -4,20 +4,26 @@
package accounting_test
import (
"context"
"encoding/binary"
"fmt"
"sync/atomic"
"testing"
"time"
"github.com/skyrings/skyring-common/tools/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/zeebo/errs"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"storj.io/common/errs2"
"storj.io/common/memory"
"storj.io/common/pb"
"storj.io/common/rpc/rpcstatus"
"storj.io/common/storj"
"storj.io/common/sync2"
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/storj/private/testplanet"
@ -40,46 +46,75 @@ func TestProjectUsageStorage(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 6, UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
config.Rollup.MaxAlphaUsage = 2 * memory.MB
},
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
planet.Satellites[0].Accounting.Tally.Loop.Pause()
projectID := planet.Uplinks[0].ProjectID[planet.Satellites[0].ID()]
saDB := planet.Satellites[0].DB
acctDB := saDB.ProjectAccounting()
var uploaded uint32
// Setup: create a new project to use the projectID
projects, err := planet.Satellites[0].DB.Console().Projects().GetAll(ctx)
require.NoError(t, err)
projectID := projects[0].ID
checkctx, checkcancel := context.WithCancel(ctx)
defer checkcancel()
projectUsage := planet.Satellites[0].Accounting.ProjectUsage
var group errgroup.Group
group.Go(func() error {
// wait things to be uploaded
for atomic.LoadUint32(&uploaded) == 0 {
if !sync2.Sleep(checkctx, time.Microsecond) {
return nil
}
}
for {
if !sync2.Sleep(checkctx, time.Microsecond) {
return nil
}
total, err := planet.Satellites[0].Accounting.ProjectUsage.GetProjectStorageTotals(ctx, projectID)
if err != nil {
return errs.Wrap(err)
}
if total == 0 {
return errs.New("got 0 from GetProjectStorageTotals")
}
}
})
for _, ttc := range cases {
testCase := ttc
t.Run(testCase.name, func(t *testing.T) {
// Setup: create some bytes for the uplink to upload
expectedData := testrand.Bytes(1 * memory.MB)
// Setup: create BucketStorageTally records to test exceeding storage project limit
// Setup: upload data to test exceeding storage project limit
if testCase.expectedResource == "storage" {
now := time.Now()
err := setUpStorageTallies(ctx, projectID, acctDB, 25, now)
err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "testbucket", "test/path/0", expectedData)
atomic.StoreUint32(&uploaded, 1)
require.NoError(t, err)
// by triggering tally we ensure that this segment is 100% accounted for
planet.Satellites[0].Accounting.Tally.Loop.TriggerWait()
}
actualExceeded, _, err := projectUsage.ExceedsStorageUsage(ctx, projectID)
require.NoError(t, err)
require.Equal(t, testCase.expectedExceeded, actualExceeded)
// Setup: create some bytes for the uplink to upload
expectedData := testrand.Bytes(50 * memory.KiB)
// Execute test: check that the uplink gets an error when they have exceeded storage limits and try to upload a file
actualErr := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "testbucket", "test/path", expectedData)
actualErr := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "testbucket", "test/path/1", expectedData)
atomic.StoreUint32(&uploaded, 1)
if testCase.expectedResource == "storage" {
require.True(t, errs2.IsRPC(actualErr, testCase.expectedStatus))
} else {
require.NoError(t, actualErr)
}
planet.Satellites[0].Accounting.Tally.Loop.TriggerWait()
})
}
checkcancel()
if err := group.Wait(); err != nil {
t.Fatal(err)
}
})
}
@ -100,7 +135,6 @@ func TestProjectUsageBandwidth(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 6, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
planet.Satellites[0].Accounting.Tally.Loop.Pause()
saDB := planet.Satellites[0].DB
orderDB := saDB.Orders()
@ -151,28 +185,6 @@ func createBucketID(projectID uuid.UUID, bucket []byte) []byte {
return []byte(storj.JoinPaths(entries...))
}
func setUpStorageTallies(ctx *testcontext.Context, projectID uuid.UUID, acctDB accounting.ProjectAccounting, numberOfGB int, time time.Time) error {
// Create many records that sum greater than project usage limit of 25GB
for i := 0; i < numberOfGB; i++ {
bucketName := fmt.Sprintf("%s%d", "testbucket", i)
tally := accounting.BucketStorageTally{
BucketName: bucketName,
ProjectID: projectID,
IntervalStart: time,
// In order to exceed the project limits, create storage tally records
// that sum greater than the maxAlphaUsage * expansionFactor
RemoteBytes: memory.GB.Int64() * accounting.ExpansionFactor,
}
err := acctDB.CreateStorageTally(ctx, tally)
if err != nil {
return err
}
}
return nil
}
func createBucketBandwidthRollups(ctx *testcontext.Context, satelliteDB satellite.DB, projectID uuid.UUID) (int64, error) {
var expectedSum int64
ordersDB := satelliteDB.Orders()
@ -274,9 +286,8 @@ func TestProjectUsageCustomLimit(t *testing.T) {
projectUsage := planet.Satellites[0].Accounting.ProjectUsage
// Setup: create BucketStorageTally records to test exceeding storage project limit
now := time.Now()
err = setUpStorageTallies(ctx, project.ID, acctDB, 11, now)
// Setup: add data to live accounting to exceed new limit
err = projectUsage.AddProjectStorageUsage(ctx, project.ID, expectedLimit.Int64())
require.NoError(t, err)
actualExceeded, limit, err := projectUsage.ExceedsStorageUsage(ctx, project.ID)

View File

@ -7,6 +7,7 @@ import (
"context"
"time"
"github.com/skyrings/skyring-common/tools/uuid"
"github.com/zeebo/errs"
"go.uber.org/zap"
"gopkg.in/spacemonkeygo/monkit.v2"
@ -78,14 +79,7 @@ func (service *Service) Close() error {
func (service *Service) Tally(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
// The live accounting store will only keep a delta to space used relative
// to the latest tally. Since a new tally is beginning, we will zero it out
// now. There is a window between this call and the point where the tally DB
// transaction starts, during which some changes in space usage may be
// double-counted (counted in the tally and also counted as a delta to
// the tally). If that happens, it will be fixed at the time of the next
// tally run.
err = service.liveAccounting.ResetTotals(ctx)
initialLiveTotals, err := service.liveAccounting.GetAllProjectTotals(ctx)
if err != nil {
return Error.Wrap(err)
}
@ -123,10 +117,25 @@ func (service *Service) Tally(ctx context.Context) (err error) {
}
if len(observer.Bucket) > 0 {
// record bucket tallies to DB
err = service.projectAccountingDB.SaveTallies(ctx, finishTime, observer.Bucket)
if err != nil {
errAtRest = errs.New("ProjectAccounting.SaveTallies failed: %v", err)
}
// update live accounting totals
tallyProjectTotals := projectTotalsFromBuckets(observer.Bucket)
latestLiveTotals, err := service.liveAccounting.GetAllProjectTotals(ctx)
if err != nil {
return Error.Wrap(err)
}
for projectID, latest := range latestLiveTotals {
delta := latest - initialLiveTotals[projectID]
err = service.liveAccounting.AddProjectStorageUsage(ctx, projectID, -latest+tallyProjectTotals[projectID]+(delta/2))
if err != nil {
return Error.Wrap(err)
}
}
}
// report bucket metrics
@ -234,5 +243,13 @@ func (observer *Observer) RemoteSegment(ctx context.Context, path metainfo.Scope
return nil
}
func projectTotalsFromBuckets(buckets map[string]*accounting.BucketTally) map[uuid.UUID]int64 {
projectTallyTotals := make(map[uuid.UUID]int64)
for _, bucket := range buckets {
projectTallyTotals[bucket.ProjectID] += (bucket.InlineBytes + bucket.RemoteBytes)
}
return projectTallyTotals
}
// using custom name to avoid breaking monitoring
var monAccounting = monkit.ScopeNamed("storj.io/storj/satellite/accounting")

View File

@ -202,6 +202,52 @@ func TestCalculateBucketAtRestData(t *testing.T) {
})
}
func TestTallyLiveAccounting(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 6, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
tally := planet.Satellites[0].Accounting.Tally
projectID := planet.Uplinks[0].ProjectID[planet.Satellites[0].ID()]
tally.Loop.Pause()
expectedData := testrand.Bytes(5 * memory.MB)
err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "testbucket", "test/path", expectedData)
require.NoError(t, err)
key, err := planet.Satellites[0].Metainfo.Database.List(ctx, nil, 10)
require.NoError(t, err)
require.Len(t, key, 1)
ptr, err := planet.Satellites[0].Metainfo.Service.Get(ctx, key[0].String())
require.NoError(t, err)
require.NotNil(t, ptr)
segmentSize := ptr.GetSegmentSize()
tally.Loop.TriggerWait()
expectedSize := segmentSize
total, err := planet.Satellites[0].Accounting.ProjectUsage.GetProjectStorageTotals(ctx, projectID)
require.NoError(t, err)
require.Equal(t, expectedSize, total)
for i := 0; i < 5; i++ {
err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "testbucket", fmt.Sprintf("test/path/%d", i), expectedData)
require.NoError(t, err)
tally.Loop.TriggerWait()
expectedSize += segmentSize
total, err := planet.Satellites[0].Accounting.ProjectUsage.GetProjectStorageTotals(ctx, projectID)
require.NoError(t, err)
require.Equal(t, expectedSize, total)
}
})
}
// addBucketTally creates a new expected bucket tally based on the
// pointer that was just created for the test case
func addBucketTally(existingTally *accounting.BucketTally, inline, last bool) *accounting.BucketTally {

View File

@ -27,6 +27,7 @@ import (
"storj.io/storj/satellite/payments/stripecoinpayments"
"storj.io/storj/satellite/rewards"
"storj.io/storj/satellite/satellitedb/satellitedbtest"
"storj.io/storj/storage/redis/redisserver"
)
// discardSender discard sending of an actual email
@ -68,7 +69,12 @@ func TestGrapqhlMutation(t *testing.T) {
0, 0, 0,
)
cache, err := live.NewCache(log.Named("cache"), live.Config{StorageBackend: "memory"})
miniredis := redisserver.NewMini()
addr, cleanup, err := miniredis.Run()
require.NoError(t, err)
defer cleanup()
cache, err := live.NewCache(log.Named("cache"), live.Config{StorageBackend: "redis://" + addr + "?db=0"})
require.NoError(t, err)
projectUsage := accounting.NewService(db.ProjectAccounting(), cache, 0)

View File

@ -25,6 +25,7 @@ import (
"storj.io/storj/satellite/payments/stripecoinpayments"
"storj.io/storj/satellite/rewards"
"storj.io/storj/satellite/satellitedb/satellitedbtest"
"storj.io/storj/storage/redis/redisserver"
)
func TestGraphqlQuery(t *testing.T) {
@ -53,7 +54,12 @@ func TestGraphqlQuery(t *testing.T) {
0, 0, 0,
)
cache, err := live.NewCache(log.Named("cache"), live.Config{StorageBackend: "memory"})
miniredis := redisserver.NewMini()
addr, cleanup, err := miniredis.Run()
require.NoError(t, err)
defer cleanup()
cache, err := live.NewCache(log.Named("cache"), live.Config{StorageBackend: "redis://" + addr + "?db=0"})
require.NoError(t, err)
projectUsage := accounting.NewService(db.ProjectAccounting(), cache, 0)

View File

@ -215,12 +215,13 @@ func (endpoint *Endpoint) CreateSegmentOld(ctx context.Context, req *pb.SegmentW
return &pb.SegmentWriteResponseOld{AddressedLimits: addressedLimits, RootPieceId: rootPieceID, PrivateKey: piecePrivateKey}, nil
}
func calculateSpaceUsed(ptr *pb.Pointer) (inlineSpace, remoteSpace int64) {
func calculateSpaceUsed(ptr *pb.Pointer) (segmentSize, totalStored int64) {
inline := ptr.GetInlineSegment()
if inline != nil {
return int64(len(inline)), 0
inlineSize := int64(len(inline))
return inlineSize, inlineSize
}
segmentSize := ptr.GetSegmentSize()
segmentSize = ptr.GetSegmentSize()
remote := ptr.GetRemote()
if remote == nil {
return 0, 0
@ -228,7 +229,7 @@ func calculateSpaceUsed(ptr *pb.Pointer) (inlineSpace, remoteSpace int64) {
minReq := remote.GetRedundancy().GetMinReq()
pieceSize := segmentSize / int64(minReq)
pieces := remote.GetRemotePieces()
return 0, pieceSize * int64(len(pieces))
return segmentSize, pieceSize * int64(len(pieces))
}
// CommitSegmentOld commits segment metadata
@ -282,19 +283,19 @@ func (endpoint *Endpoint) CommitSegmentOld(ctx context.Context, req *pb.SegmentC
}
req.Pointer.PieceHashesVerified = true
inlineUsed, remoteUsed := calculateSpaceUsed(req.Pointer)
segmentSize, totalStored := calculateSpaceUsed(req.Pointer)
// ToDo: Replace with hash & signature validation
// Ensure neither uplink or storage nodes are cheating on us
if req.Pointer.Type == pb.Pointer_REMOTE {
//We cannot have more redundancy than total/min
if float64(remoteUsed) > (float64(req.Pointer.SegmentSize)/float64(req.Pointer.Remote.Redundancy.MinReq))*float64(req.Pointer.Remote.Redundancy.Total) {
endpoint.log.Sugar().Debugf("data size mismatch, got segment: %d, pieces: %d, RS Min, Total: %d,%d", req.Pointer.SegmentSize, remoteUsed, req.Pointer.Remote.Redundancy.MinReq, req.Pointer.Remote.Redundancy.Total)
if float64(totalStored) > (float64(req.Pointer.SegmentSize)/float64(req.Pointer.Remote.Redundancy.MinReq))*float64(req.Pointer.Remote.Redundancy.Total) {
endpoint.log.Sugar().Debugf("data size mismatch, got segment: %d, pieces: %d, RS Min, Total: %d,%d", req.Pointer.SegmentSize, totalStored, req.Pointer.Remote.Redundancy.MinReq, req.Pointer.Remote.Redundancy.Total)
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, "mismatched segment size and piece usage")
}
}
if err := endpoint.projectUsage.AddProjectStorageUsage(ctx, keyInfo.ProjectID, inlineUsed, remoteUsed); err != nil {
if err := endpoint.projectUsage.AddProjectStorageUsage(ctx, keyInfo.ProjectID, segmentSize); err != nil {
endpoint.log.Sugar().Errorf("Could not track new storage usage by project %q: %v", keyInfo.ProjectID, err)
// but continue. it's most likely our own fault that we couldn't track it, and the only thing
// that will be affected is our per-project bandwidth and storage limits.
@ -1638,16 +1639,16 @@ func (endpoint *Endpoint) CommitSegment(ctx context.Context, req *pb.SegmentComm
piece.Hash = nil
}
inlineUsed, remoteUsed := calculateSpaceUsed(pointer)
segmentSize, totalStored := calculateSpaceUsed(pointer)
// ToDo: Replace with hash & signature validation
// Ensure neither uplink or storage nodes are cheating on us
if pointer.Type == pb.Pointer_REMOTE {
//We cannot have more redundancy than total/min
if float64(remoteUsed) > (float64(pointer.SegmentSize)/float64(pointer.Remote.Redundancy.MinReq))*float64(pointer.Remote.Redundancy.Total) {
if float64(totalStored) > (float64(pointer.SegmentSize)/float64(pointer.Remote.Redundancy.MinReq))*float64(pointer.Remote.Redundancy.Total) {
endpoint.log.Debug("data size mismatch",
zap.Int64("segment", pointer.SegmentSize),
zap.Int64("pieces", remoteUsed),
zap.Int64("pieces", totalStored),
zap.Int32("redundancy minimum requested", pointer.Remote.Redundancy.MinReq),
zap.Int32("redundancy total", pointer.Remote.Redundancy.Total),
)
@ -1655,7 +1656,7 @@ func (endpoint *Endpoint) CommitSegment(ctx context.Context, req *pb.SegmentComm
}
}
if err := endpoint.projectUsage.AddProjectStorageUsage(ctx, keyInfo.ProjectID, inlineUsed, remoteUsed); err != nil {
if err := endpoint.projectUsage.AddProjectStorageUsage(ctx, keyInfo.ProjectID, segmentSize); err != nil {
endpoint.log.Error("Could not track new storage usage by project",
zap.Stringer("Project ID", keyInfo.ProjectID),
zap.Error(err),
@ -1715,7 +1716,7 @@ func (endpoint *Endpoint) MakeInlineSegment(ctx context.Context, req *pb.Segment
inlineUsed := int64(len(req.EncryptedInlineData))
if err := endpoint.projectUsage.AddProjectStorageUsage(ctx, keyInfo.ProjectID, inlineUsed, 0); err != nil {
if err := endpoint.projectUsage.AddProjectStorageUsage(ctx, keyInfo.ProjectID, inlineUsed); err != nil {
endpoint.log.Sugar().Errorf("Could not track new storage usage by project %v: %v", keyInfo.ProjectID, err)
// but continue. it's most likely our own fault that we couldn't track it, and the only thing
// that will be affected is our per-project bandwidth and storage limits.

View File

@ -167,7 +167,7 @@ identity.cert-path: /root/.local/share/storj/identity/satellite/identity.cert
identity.key-path: /root/.local/share/storj/identity/satellite/identity.key
# what to use for storing real-time accounting data
# live-accounting.storage-backend: memory
# live-accounting.storage-backend: ""
# if true, log function filename and line number
# log.caller: false

View File

@ -30,6 +30,11 @@ const (
fallbackPort = 6379
)
// Mini is a wrapper for *miniredis.MiniRedis which implements the io.Closer interface.
type Mini struct {
server *miniredis.Miniredis
}
func freeport() (addr string, port int) {
listener, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
@ -49,7 +54,8 @@ func Start() (addr string, cleanup func(), err error) {
addr, cleanup, err = Process()
if err != nil {
log.Println("failed to start redis-server: ", err)
return Mini()
mini := NewMini()
return mini.Run()
}
return addr, cleanup, err
}
@ -145,14 +151,26 @@ func pingServer(addr string) error {
return client.Ping().Err()
}
// Mini starts miniredis server
func Mini() (addr string, cleanup func(), err error) {
server, err := miniredis.Run()
// NewMini creates a new Mini.
func NewMini() *Mini {
return &Mini{
server: miniredis.NewMiniRedis(),
}
}
// Run starts the miniredis server.
func (mini *Mini) Run() (addr string, cleanup func(), err error) {
err = mini.server.Start()
if err != nil {
return "", nil, err
}
return server.Addr(), func() {
server.Close()
return mini.server.Addr(), func() {
mini.server.Close()
}, nil
}
// Close closes the miniredis server.
func (mini *Mini) Close() error {
mini.server.Close()
return nil
}