diff --git a/pkg/accounting/live/doc.go b/pkg/accounting/live/doc.go new file mode 100644 index 000000000..8f3620cb9 --- /dev/null +++ b/pkg/accounting/live/doc.go @@ -0,0 +1,9 @@ +// Copyright (C) 2019 Storj Labs, Inc. +// See LICENSE for copying information. + +/* +Package live provides live accounting functionality. That is, it keeps track +of deltas in the amount of storage used by each project relative to the last +tally operation (see pkg/accounting/tally). +*/ +package live diff --git a/pkg/accounting/live/live.go b/pkg/accounting/live/live.go new file mode 100644 index 000000000..014e72fc1 --- /dev/null +++ b/pkg/accounting/live/live.go @@ -0,0 +1,102 @@ +// Copyright (C) 2019 Storj Labs, Inc. +// See LICENSE for copying information. + +package live + +import ( + "context" + "strings" + "sync" + + "github.com/skyrings/skyring-common/tools/uuid" + "github.com/zeebo/errs" + "go.uber.org/zap" +) + +// Config contains configurable values for the live accounting service. +type Config struct { + StorageBackend string `help:"what to use for storing real-time accounting data"` +} + +// Service represents the external interface to the live accounting +// functionality. +type Service interface { + GetProjectStorageUsage(ctx context.Context, projectID uuid.UUID) (int64, int64, error) + AddProjectStorageUsage(ctx context.Context, projectID uuid.UUID, inlineSpaceUsed, remoteSpaceUsed int64) error + ResetTotals() +} + +// New creates a new live.Service instance of the type specified in +// the provided config. 
+func New(log *zap.Logger, config Config) (Service, error) { + parts := strings.SplitN(config.StorageBackend, ":", 2) + var backendType string + if len(parts) == 0 || parts[0] == "" { + backendType = "plainmemory" + } else { + backendType = parts[0] + } + switch backendType { + case "plainmemory": + return newPlainMemoryLiveAccounting(log) + } + return nil, errs.New("unrecognized live accounting backend specifier %q", backendType) +} + +// plainMemoryLiveAccounting represents a live.Service-implementing +// instance using plain memory (no coordination with other servers). It can be +// used to coordinate tracking of how much space a project has used. +// +// This should probably only be used at small scale or for testing areas where +// the accounting cache does not matter significantly. For production, an +// implementation that allows multiple servers to participate together would +// be preferable. +type plainMemoryLiveAccounting struct { + log *zap.Logger + + spaceMapLock sync.RWMutex + spaceDeltas map[uuid.UUID]spaceUsedAccounting +} + +type spaceUsedAccounting struct { + inlineSpace int64 + remoteSpace int64 +} + +func newPlainMemoryLiveAccounting(log *zap.Logger) (*plainMemoryLiveAccounting, error) { + pmac := &plainMemoryLiveAccounting{log: log} + pmac.ResetTotals() + return pmac, nil +} + +// GetProjectStorageUsage gets inline and remote storage totals for a given +// project, back to the time of the last accounting tally. +func (pmac *plainMemoryLiveAccounting) GetProjectStorageUsage(ctx context.Context, projectID uuid.UUID) (inlineTotal, remoteTotal int64, err error) { + pmac.spaceMapLock.Lock() + defer pmac.spaceMapLock.Unlock() + curVal := pmac.spaceDeltas[projectID] + return curVal.inlineSpace, curVal.remoteSpace, nil +} + +// AddProjectStorageUsage lets the live accounting know that the given +// project has just added inlineSpaceUsed bytes of inline space usage +// and remoteSpaceUsed bytes of remote space usage. 
+func (pmac *plainMemoryLiveAccounting) AddProjectStorageUsage(ctx context.Context, projectID uuid.UUID, inlineSpaceUsed, remoteSpaceUsed int64) error { + pmac.spaceMapLock.Lock() + defer pmac.spaceMapLock.Unlock() + curVal := pmac.spaceDeltas[projectID] + curVal.inlineSpace += inlineSpaceUsed + curVal.remoteSpace += remoteSpaceUsed + pmac.spaceDeltas[projectID] = curVal + return nil +} + +// ResetTotals resets all space-used totals for all projects back to zero. This +// would normally be done in concert with calculating new tally counts in the +// accountingDB. +func (pmac *plainMemoryLiveAccounting) ResetTotals() { + pmac.log.Info("Resetting real-time accounting data") + pmac.spaceMapLock.Lock() + pmac.spaceDeltas = make(map[uuid.UUID]spaceUsedAccounting) + pmac.spaceMapLock.Unlock() +} diff --git a/pkg/accounting/live/live_test.go b/pkg/accounting/live/live_test.go new file mode 100644 index 000000000..c851993f9 --- /dev/null +++ b/pkg/accounting/live/live_test.go @@ -0,0 +1,99 @@ +// Copyright (C) 2019 Storj Labs, Inc. +// See LICENSE for copying information. 
+ +package live + +import ( + "context" + "encoding/binary" + "math/rand" + "testing" + + "github.com/skyrings/skyring-common/tools/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + "golang.org/x/sync/errgroup" +) + +func TestPlainMemoryLiveAccounting(t *testing.T) { + const ( + valuesListSize = 1000 + valueMultiplier = 4096 + numProjects = 200 + ) + config := Config{ + StorageBackend: "plainmemory:", + } + service, err := New(zap.L().Named("live-accounting"), config) + require.NoError(t, err) + + // ensure we are using the expected underlying type + _, ok := service.(*plainMemoryLiveAccounting) + require.True(t, ok) + + // make a largish list of varying values + someValues := make([]int64, valuesListSize) + sum := int64(0) + for i := range someValues { + someValues[i] = int64((i + 1) * valueMultiplier) + sum += someValues[i] + } + + // make up some project IDs + projectIDs := make([]uuid.UUID, numProjects) + for i := range projectIDs { + var u uuid.UUID + binary.BigEndian.PutUint64(u[len(u)-8:], uint64(i)) + projectIDs[i] = u + } + + // send lots of space used updates for all of these projects to the live + // accounting store. 
+ errg, ctx := errgroup.WithContext(context.Background()) + for _, projID := range projectIDs { + projID := projID + errg.Go(func() error { + // have each project sending the values in a different order + myValues := make([]int64, valuesListSize) + copy(myValues, someValues) + rand.Shuffle(valuesListSize, func(v1, v2 int) { + myValues[v1], myValues[v2] = myValues[v2], myValues[v1] + }) + + for _, val := range myValues { + if err := service.AddProjectStorageUsage(ctx, projID, val, val); err != nil { + return err + } + } + return nil + }) + } + require.NoError(t, errg.Wait()) + + // make sure all of the "projects" got all space updates and got right totals + for _, projID := range projectIDs { + inlineUsed, remoteUsed, err := service.GetProjectStorageUsage(ctx, projID) + require.NoError(t, err) + assert.Equalf(t, sum, inlineUsed, "projectID %v", projID) + assert.Equalf(t, sum, remoteUsed, "projectID %v", projID) + } +} + +func TestResetTotals(t *testing.T) { + config := Config{ + StorageBackend: "plainmemory:", + } + service, err := New(zap.L().Named("live-accounting"), config) + require.NoError(t, err) + + // ensure we are using the expected underlying type + _, ok := service.(*plainMemoryLiveAccounting) + require.True(t, ok) + + ctx := context.Background() + projID, err := uuid.New() + require.NoError(t, err) + err = service.AddProjectStorageUsage(ctx, *projID, 0, -20) + require.NoError(t, err) +} diff --git a/pkg/accounting/tally/tally.go b/pkg/accounting/tally/tally.go index 78c6c5abd..9cfb63354 100644 --- a/pkg/accounting/tally/tally.go +++ b/pkg/accounting/tally/tally.go @@ -12,6 +12,7 @@ import ( "go.uber.org/zap" "storj.io/storj/pkg/accounting" + "storj.io/storj/pkg/accounting/live" "storj.io/storj/pkg/overlay" "storj.io/storj/pkg/pb" "storj.io/storj/pkg/storj" @@ -26,23 +27,25 @@ type Config struct { // Service is the tally service for data stored on each storage node type Service struct { - logger *zap.Logger - metainfo *metainfo.Service - overlay 
*overlay.Cache - limit int - ticker *time.Ticker - accountingDB accounting.DB + logger *zap.Logger + metainfo *metainfo.Service + overlay *overlay.Cache + limit int + ticker *time.Ticker + accountingDB accounting.DB + liveAccounting live.Service } // New creates a new tally Service -func New(logger *zap.Logger, accountingDB accounting.DB, metainfo *metainfo.Service, overlay *overlay.Cache, limit int, interval time.Duration) *Service { +func New(logger *zap.Logger, accountingDB accounting.DB, liveAccounting live.Service, metainfo *metainfo.Service, overlay *overlay.Cache, limit int, interval time.Duration) *Service { return &Service{ - logger: logger, - metainfo: metainfo, - overlay: overlay, - limit: limit, - ticker: time.NewTicker(interval), - accountingDB: accountingDB, + logger: logger, + metainfo: metainfo, + overlay: overlay, + limit: limit, + ticker: time.NewTicker(interval), + accountingDB: accountingDB, + liveAccounting: liveAccounting, } } @@ -65,6 +68,15 @@ func (t *Service) Run(ctx context.Context) (err error) { // Tally calculates data-at-rest usage once func (t *Service) Tally(ctx context.Context) error { + // The live accounting store will only keep a delta to space used relative + // to the latest tally. Since a new tally is beginning, we will zero it out + // now. There is a window between this call and the point where the tally DB + // transaction starts, during which some changes in space usage may be + // double-counted (counted in the tally and also counted as a delta to + // the tally). If that happens, it will be fixed at the time of the next + // tally run. 
+ t.liveAccounting.ResetTotals() + var errAtRest, errBucketInfo error latestTally, nodeData, bucketData, err := t.CalculateAtRestData(ctx) if err != nil { diff --git a/satellite/metainfo/metainfo.go b/satellite/metainfo/metainfo.go index 1dfbc697f..a2a236d1d 100644 --- a/satellite/metainfo/metainfo.go +++ b/satellite/metainfo/metainfo.go @@ -19,6 +19,7 @@ import ( "storj.io/storj/internal/memory" "storj.io/storj/pkg/accounting" + "storj.io/storj/pkg/accounting/live" "storj.io/storj/pkg/auth" "storj.io/storj/pkg/eestream" "storj.io/storj/pkg/identity" @@ -43,26 +44,28 @@ type APIKeys interface { // Endpoint metainfo endpoint type Endpoint struct { - log *zap.Logger - metainfo *Service - orders *orders.Service - cache *overlay.Cache - apiKeys APIKeys - accountingDB accounting.DB - maxAlphaUsage memory.Size + log *zap.Logger + metainfo *Service + orders *orders.Service + cache *overlay.Cache + apiKeys APIKeys + accountingDB accounting.DB + liveAccounting live.Service + maxAlphaUsage memory.Size } // NewEndpoint creates new metainfo endpoint instance -func NewEndpoint(log *zap.Logger, metainfo *Service, orders *orders.Service, cache *overlay.Cache, apiKeys APIKeys, acctDB accounting.DB, maxAlphaUsage memory.Size) *Endpoint { +func NewEndpoint(log *zap.Logger, metainfo *Service, orders *orders.Service, cache *overlay.Cache, apiKeys APIKeys, acctDB accounting.DB, liveAccounting live.Service, maxAlphaUsage memory.Size) *Endpoint { // TODO do something with too many params return &Endpoint{ - log: log, - metainfo: metainfo, - orders: orders, - cache: cache, - apiKeys: apiKeys, - accountingDB: acctDB, - maxAlphaUsage: maxAlphaUsage, + log: log, + metainfo: metainfo, + orders: orders, + cache: cache, + apiKeys: apiKeys, + accountingDB: acctDB, + liveAccounting: liveAccounting, + maxAlphaUsage: maxAlphaUsage, } } @@ -144,10 +147,9 @@ func (endpoint *Endpoint) CreateSegment(ctx context.Context, req *pb.SegmentWrit // Check if this projectID has exceeded alpha usage limits, 
i.e. 25GB of bandwidth or storage used in the past month // TODO: remove this code once we no longer need usage limiting for alpha release // Ref: https://storjlabs.atlassian.net/browse/V3-1274 - bucketID := createBucketID(keyInfo.ProjectID, req.Bucket) - inlineTotal, remoteTotal, err := endpoint.accountingDB.ProjectStorageTotals(ctx, keyInfo.ProjectID) + inlineTotal, remoteTotal, err := endpoint.getProjectStorageTotals(ctx, keyInfo.ProjectID) if err != nil { - endpoint.log.Error("retrieving ProjectStorageTotals", zap.Error(err)) + endpoint.log.Error("retrieving project storage totals", zap.Error(err)) } exceeded, resource := accounting.ExceedsAlphaUsage(0, inlineTotal, remoteTotal, endpoint.maxAlphaUsage) if exceeded { @@ -180,6 +182,7 @@ func (endpoint *Endpoint) CreateSegment(ctx context.Context, req *pb.SegmentWrit return nil, status.Errorf(codes.Internal, err.Error()) } + bucketID := createBucketID(keyInfo.ProjectID, req.Bucket) rootPieceID, addressedLimits, err := endpoint.orders.CreatePutOrderLimits(ctx, uplinkIdentity, bucketID, nodes, req.Expiration, maxPieceSize) if err != nil { return nil, Error.Wrap(err) @@ -188,6 +191,34 @@ func (endpoint *Endpoint) CreateSegment(ctx context.Context, req *pb.SegmentWrit return &pb.SegmentWriteResponse{AddressedLimits: addressedLimits, RootPieceId: rootPieceID}, nil } +func (endpoint *Endpoint) getProjectStorageTotals(ctx context.Context, projectID uuid.UUID) (int64, int64, error) { + lastCountInline, lastCountRemote, err := endpoint.accountingDB.ProjectStorageTotals(ctx, projectID) + if err != nil { + return 0, 0, err + } + rtInline, rtRemote, err := endpoint.liveAccounting.GetProjectStorageUsage(ctx, projectID) + if err != nil { + return 0, 0, err + } + return lastCountInline + rtInline, lastCountRemote + rtRemote, nil +} + +func calculateSpaceUsed(ptr *pb.Pointer) (inlineSpace, remoteSpace int64) { + inline := ptr.GetInlineSegment() + if inline != nil { + return int64(len(inline)), 0 + } + segmentSize := 
ptr.GetSegmentSize() + remote := ptr.GetRemote() + if remote == nil { + return 0, 0 + } + minReq := remote.GetRedundancy().GetMinReq() + pieceSize := segmentSize / int64(minReq) + pieces := remote.GetRemotePieces() + return 0, pieceSize * int64(len(pieces)) +} + // CommitSegment commits segment metadata func (endpoint *Endpoint) CommitSegment(ctx context.Context, req *pb.SegmentCommitRequest) (resp *pb.SegmentCommitResponse, err error) { defer mon.Task()(&ctx)(&err) @@ -217,6 +248,13 @@ func (endpoint *Endpoint) CommitSegment(ctx context.Context, req *pb.SegmentComm return nil, status.Errorf(codes.InvalidArgument, err.Error()) } + inlineUsed, remoteUsed := calculateSpaceUsed(req.Pointer) + if err := endpoint.liveAccounting.AddProjectStorageUsage(ctx, keyInfo.ProjectID, inlineUsed, remoteUsed); err != nil { + endpoint.log.Sugar().Errorf("Could not track new storage usage by project %v: %v", keyInfo.ProjectID, err) + // but continue. it's most likely our own fault that we couldn't track it, and the only thing + // that will be affected is our per-project bandwidth and storage limits. 
+ } + err = endpoint.metainfo.Put(path, req.Pointer) if err != nil { return nil, status.Errorf(codes.Internal, err.Error()) diff --git a/satellite/peer.go b/satellite/peer.go index dd283e148..b5f0bd36b 100644 --- a/satellite/peer.go +++ b/satellite/peer.go @@ -22,6 +22,7 @@ import ( "storj.io/storj/internal/post/oauth2" "storj.io/storj/internal/version" "storj.io/storj/pkg/accounting" + "storj.io/storj/pkg/accounting/live" "storj.io/storj/pkg/accounting/rollup" "storj.io/storj/pkg/accounting/tally" "storj.io/storj/pkg/audit" @@ -102,8 +103,9 @@ type Config struct { Repairer repairer.Config Audit audit.Config - Tally tally.Config - Rollup rollup.Config + Tally tally.Config + Rollup rollup.Config + LiveAccounting live.Config Mail mailservice.Config Console consoleweb.Config @@ -176,6 +178,10 @@ type Peer struct { Rollup *rollup.Service } + LiveAccounting struct { + Service live.Service + } + Mail struct { Service *mailservice.Service } @@ -302,6 +308,16 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, config *Config, ve peer.Discovery.Service = discovery.New(peer.Log.Named("discovery"), peer.Overlay.Service, peer.Kademlia.Service, config) } + { // setup live accounting + log.Debug("Setting up live accounting") + config := config.LiveAccounting + liveAccountingService, err := live.New(peer.Log.Named("live-accounting"), config) + if err != nil { + return nil, err + } + peer.LiveAccounting.Service = liveAccountingService + } + { // setup orders log.Debug("Setting up orders") satelliteSignee := signing.SigneeFromPeerIdentity(peer.Identity.PeerIdentity()) @@ -339,6 +355,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, config *Config, ve peer.Overlay.Service, peer.DB.Console().APIKeys(), peer.DB.Accounting(), + peer.LiveAccounting.Service, config.Rollup.MaxAlphaUsage, ) @@ -396,7 +413,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, config *Config, ve { // setup accounting log.Debug("Setting up accounting") - 
peer.Accounting.Tally = tally.New(peer.Log.Named("tally"), peer.DB.Accounting(), peer.Metainfo.Service, peer.Overlay.Service, 0, config.Tally.Interval) + peer.Accounting.Tally = tally.New(peer.Log.Named("tally"), peer.DB.Accounting(), peer.LiveAccounting.Service, peer.Metainfo.Service, peer.Overlay.Service, 0, config.Tally.Interval) peer.Accounting.Rollup = rollup.New(peer.Log.Named("rollup"), peer.DB.Accounting(), config.Rollup.Interval, config.Rollup.DeleteTallies) } diff --git a/scripts/testdata/satellite-config.yaml.lock b/scripts/testdata/satellite-config.yaml.lock index 5e5cdf5be..28c7e9ae8 100644 --- a/scripts/testdata/satellite-config.yaml.lock +++ b/scripts/testdata/satellite-config.yaml.lock @@ -85,6 +85,9 @@ kademlia.operator.wallet: "" # size of Kademlia replacement cache # kademlia.replacement-cache-size: 5 +# what to use for storing real-time accounting data +# live-accounting.storage-backend: "" + # if true, log function filename and line number # log.caller: false