changes to masterdb to support accountingDB (#846)
* added rollup to captplanet, moved accountingDB to masterdb
This commit is contained in:
parent
7363e6cfd8
commit
f9845e7e92
@ -12,6 +12,7 @@ import (
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/zeebo/errs"
|
||||
|
||||
"storj.io/storj/pkg/accounting/rollup"
|
||||
"storj.io/storj/pkg/accounting/tally"
|
||||
"storj.io/storj/pkg/audit"
|
||||
"storj.io/storj/pkg/auth/grpcauth"
|
||||
@ -52,6 +53,7 @@ type Satellite struct {
|
||||
Web satelliteweb.Config
|
||||
Database string `help:"satellite database connection string" default:"sqlite3://$CONFDIR/master.db"`
|
||||
Tally tally.Config
|
||||
Rollup rollup.Config
|
||||
}
|
||||
|
||||
// StorageNode is for configuring storage nodes
|
||||
@ -136,6 +138,7 @@ func cmdRun(cmd *cobra.Command, args []string) (err error) {
|
||||
runCfg.Satellite.BwAgreement,
|
||||
runCfg.Satellite.Web,
|
||||
runCfg.Satellite.Tally,
|
||||
runCfg.Satellite.Rollup,
|
||||
|
||||
// NB(dylan): Inspector is only used for local development and testing.
|
||||
// It should not be added to the Satellite startup
|
||||
|
@ -5,57 +5,16 @@ package accounting
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/zeebo/errs"
|
||||
|
||||
"storj.io/storj/internal/migrate"
|
||||
dbx "storj.io/storj/pkg/accounting/dbx"
|
||||
"storj.io/storj/pkg/utils"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
var (
|
||||
// Error is the default accountingdb errs class
|
||||
Error = errs.Class("accountingdb")
|
||||
|
||||
// LastBandwidthTally is a name in the accounting timestamps database
|
||||
LastBandwidthTally = dbx.Timestamps_Name("LastBandwidthTally")
|
||||
)
|
||||
|
||||
// Database contains access to accounting database
|
||||
type Database struct {
|
||||
db *dbx.DB
|
||||
}
|
||||
|
||||
// NewDB - constructor for Database
|
||||
func NewDB(databaseURL string) (*Database, error) {
|
||||
driver, source, err := utils.SplitDBURL(databaseURL)
|
||||
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
db, err := dbx.Open(driver, source)
|
||||
if err != nil {
|
||||
return nil, Error.New("failed opening database %q, %q: %v",
|
||||
driver, source, err)
|
||||
}
|
||||
err = migrate.Create("accounting", db)
|
||||
if err != nil {
|
||||
return nil, utils.CombineErrors(err, db.Close())
|
||||
}
|
||||
return &Database{db: db}, nil
|
||||
}
|
||||
|
||||
// BeginTx is used to open db connection
|
||||
func (db *Database) BeginTx(ctx context.Context) (*dbx.Tx, error) {
|
||||
return db.db.Open(ctx)
|
||||
}
|
||||
|
||||
// Close is used to close db connection
|
||||
func (db *Database) Close() error {
|
||||
return db.db.Close()
|
||||
}
|
||||
|
||||
// FindLastBwTally returns the timestamp of the last bandwidth tally
|
||||
func (db *Database) FindLastBwTally(ctx context.Context) (*dbx.Value_Row, error) {
|
||||
return db.db.Find_Timestamps_Value_By_Name(ctx, LastBandwidthTally)
|
||||
//DB is an interface for interacting with accounting stuff
|
||||
type DB interface {
|
||||
// LastGranularTime records the greatest last tallied bandwidth agreement time
|
||||
LastGranularTime(ctx context.Context) (time.Time, bool, error)
|
||||
// SaveGranulars records granular tallies (sums of bw agreement values) to the database
|
||||
// and updates the LastGranularTime
|
||||
SaveGranulars(ctx context.Context, logger *zap.Logger, latestBwa time.Time, bwTotals map[string]int64) error
|
||||
}
|
||||
|
@ -1,64 +0,0 @@
|
||||
// dbx.v1 golang accounting.dbx .
|
||||
|
||||
// timestamps just allows us to save the last time/thing that happened
|
||||
model timestamps (
|
||||
key name
|
||||
|
||||
field name text
|
||||
field value timestamp ( updatable )
|
||||
)
|
||||
create timestamps ( )
|
||||
update timestamps ( where timestamps.name = ? )
|
||||
read scalar (
|
||||
select timestamps.value
|
||||
where timestamps.name = ?
|
||||
)
|
||||
|
||||
model rollup (
|
||||
key id
|
||||
|
||||
field id serial64
|
||||
field node_id text
|
||||
field start_time timestamp
|
||||
field interval int64
|
||||
field data_type int
|
||||
field created_at timestamp ( autoinsert )
|
||||
field updated_at timestamp ( autoinsert, autoupdate )
|
||||
)
|
||||
|
||||
create rollup ( )
|
||||
update rollup ( where rollup.id = ? )
|
||||
delete rollup ( where rollup.id = ? )
|
||||
read one (
|
||||
select rollup
|
||||
where rollup.id = ?
|
||||
)
|
||||
read all (
|
||||
select rollup
|
||||
where rollup.node_id = ?
|
||||
)
|
||||
|
||||
|
||||
model raw (
|
||||
key id
|
||||
|
||||
field id serial64
|
||||
field node_id text
|
||||
field interval_end_time timestamp
|
||||
field data_total int64
|
||||
field data_type int
|
||||
field created_at timestamp ( autoinsert )
|
||||
field updated_at timestamp ( autoinsert, autoupdate )
|
||||
)
|
||||
|
||||
create raw ( )
|
||||
update raw ( where raw.id = ? )
|
||||
delete raw ( where raw.id = ? )
|
||||
read one (
|
||||
select raw
|
||||
where raw.id = ?
|
||||
)
|
||||
read all (
|
||||
select raw
|
||||
where raw.node_id = ?
|
||||
)
|
File diff suppressed because it is too large
Load Diff
@ -1,27 +0,0 @@
|
||||
-- AUTOGENERATED BY gopkg.in/spacemonkeygo/dbx.v1
|
||||
-- DO NOT EDIT
|
||||
CREATE TABLE raws (
|
||||
id bigserial NOT NULL,
|
||||
node_id text NOT NULL,
|
||||
interval_end_time timestamp with time zone NOT NULL,
|
||||
data_total bigint NOT NULL,
|
||||
data_type integer NOT NULL,
|
||||
created_at timestamp with time zone NOT NULL,
|
||||
updated_at timestamp with time zone NOT NULL,
|
||||
PRIMARY KEY ( id )
|
||||
);
|
||||
CREATE TABLE rollups (
|
||||
id bigserial NOT NULL,
|
||||
node_id text NOT NULL,
|
||||
start_time timestamp with time zone NOT NULL,
|
||||
interval bigint NOT NULL,
|
||||
data_type integer NOT NULL,
|
||||
created_at timestamp with time zone NOT NULL,
|
||||
updated_at timestamp with time zone NOT NULL,
|
||||
PRIMARY KEY ( id )
|
||||
);
|
||||
CREATE TABLE timestamps (
|
||||
name text NOT NULL,
|
||||
value timestamp with time zone NOT NULL,
|
||||
PRIMARY KEY ( name )
|
||||
);
|
@ -1,27 +0,0 @@
|
||||
-- AUTOGENERATED BY gopkg.in/spacemonkeygo/dbx.v1
|
||||
-- DO NOT EDIT
|
||||
CREATE TABLE raws (
|
||||
id INTEGER NOT NULL,
|
||||
node_id TEXT NOT NULL,
|
||||
interval_end_time TIMESTAMP NOT NULL,
|
||||
data_total INTEGER NOT NULL,
|
||||
data_type INTEGER NOT NULL,
|
||||
created_at TIMESTAMP NOT NULL,
|
||||
updated_at TIMESTAMP NOT NULL,
|
||||
PRIMARY KEY ( id )
|
||||
);
|
||||
CREATE TABLE rollups (
|
||||
id INTEGER NOT NULL,
|
||||
node_id TEXT NOT NULL,
|
||||
start_time TIMESTAMP NOT NULL,
|
||||
interval INTEGER NOT NULL,
|
||||
data_type INTEGER NOT NULL,
|
||||
created_at TIMESTAMP NOT NULL,
|
||||
updated_at TIMESTAMP NOT NULL,
|
||||
PRIMARY KEY ( id )
|
||||
);
|
||||
CREATE TABLE timestamps (
|
||||
name TEXT NOT NULL,
|
||||
value TIMESTAMP NOT NULL,
|
||||
PRIMARY KEY ( name )
|
||||
);
|
@ -1,17 +0,0 @@
|
||||
// Copyright (C) 2018 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package dbx
|
||||
|
||||
// go:generate dbx.v1 schema -d postgres -d sqlite3 accounting.dbx .
|
||||
// go:generate dbx.v1 golang -d postgres -d sqlite3 -p dbx accounting.dbx .
|
||||
|
||||
import (
|
||||
"github.com/zeebo/errs"
|
||||
)
|
||||
|
||||
func init() {
|
||||
// catch dbx errors
|
||||
c := errs.Class("accountingdb")
|
||||
WrapErr = func(e *Error) error { return c.Wrap(e) }
|
||||
}
|
@ -7,24 +7,25 @@ import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/zeebo/errs"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"storj.io/storj/pkg/accounting"
|
||||
"storj.io/storj/pkg/provider"
|
||||
)
|
||||
|
||||
// Config contains configurable values for rollup
|
||||
type Config struct {
|
||||
Interval time.Duration `help:"how frequently rollup should run" default:"30s"`
|
||||
DatabaseURL string `help:"the database connection string to use" default:"sqlite3://$CONFDIR/stats.db"`
|
||||
Interval time.Duration `help:"how frequently rollup should run" default:"30s"`
|
||||
}
|
||||
|
||||
// Initialize a rollup struct
|
||||
func (c Config) initialize(ctx context.Context) (Rollup, error) {
|
||||
db, err := accounting.NewDB(c.DatabaseURL)
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(err)
|
||||
db, ok := ctx.Value("masterdb").(interface{ Accounting() accounting.DB })
|
||||
if !ok {
|
||||
return nil, Error.Wrap(errs.New("unable to get master db instance"))
|
||||
}
|
||||
return newRollup(zap.L(), db, c.Interval)
|
||||
return newRollup(zap.L(), db.Accounting(), c.Interval), nil
|
||||
}
|
||||
|
||||
// Run runs the rollup with configured values
|
||||
|
@ -20,15 +20,15 @@ type Rollup interface {
|
||||
type rollup struct {
|
||||
logger *zap.Logger
|
||||
ticker *time.Ticker
|
||||
db *accounting.Database
|
||||
db accounting.DB
|
||||
}
|
||||
|
||||
func newRollup(logger *zap.Logger, db *accounting.Database, interval time.Duration) (*rollup, error) {
|
||||
func newRollup(logger *zap.Logger, db accounting.DB, interval time.Duration) *rollup {
|
||||
return &rollup{
|
||||
logger: logger,
|
||||
ticker: time.NewTicker(interval),
|
||||
db: db,
|
||||
}, nil
|
||||
}
|
||||
}
|
||||
|
||||
// Run the rollup loop
|
||||
|
@ -19,24 +19,21 @@ import (
|
||||
|
||||
// Config contains configurable values for tally
|
||||
type Config struct {
|
||||
Interval time.Duration `help:"how frequently tally should run" default:"30s"`
|
||||
DatabaseURL string `help:"the database connection string to use" default:"sqlite3://$CONFDIR/accounting.db"`
|
||||
Interval time.Duration `help:"how frequently tally should run" default:"30s"`
|
||||
}
|
||||
|
||||
// Initialize a tally struct
|
||||
func (c Config) initialize(ctx context.Context) (Tally, error) {
|
||||
pointerdb := pointerdb.LoadFromContext(ctx)
|
||||
overlay := overlay.LoadServerFromContext(ctx)
|
||||
db, err := accounting.NewDB(c.DatabaseURL)
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
|
||||
masterDB, ok := ctx.Value("masterdb").(interface{ BandwidthAgreement() bwagreement.DB })
|
||||
db, ok := ctx.Value("masterdb").(interface {
|
||||
BandwidthAgreement() bwagreement.DB
|
||||
Accounting() accounting.DB
|
||||
})
|
||||
if !ok {
|
||||
return nil, errs.New("unable to get master db instance")
|
||||
return nil, Error.Wrap(errs.New("unable to get master db instance"))
|
||||
}
|
||||
return newTally(zap.L(), db, masterDB.BandwidthAgreement(), pointerdb, overlay, 0, c.Interval), nil
|
||||
return newTally(zap.L(), db.Accounting(), db.BandwidthAgreement(), pointerdb, overlay, 0, c.Interval), nil
|
||||
}
|
||||
|
||||
// Run runs the tally with configured values
|
||||
|
@ -11,7 +11,6 @@ import (
|
||||
"go.uber.org/zap"
|
||||
|
||||
"storj.io/storj/pkg/accounting"
|
||||
dbx "storj.io/storj/pkg/accounting/dbx"
|
||||
"storj.io/storj/pkg/bwagreement"
|
||||
"storj.io/storj/pkg/pb"
|
||||
"storj.io/storj/pkg/pointerdb"
|
||||
@ -25,24 +24,24 @@ type Tally interface {
|
||||
}
|
||||
|
||||
type tally struct {
|
||||
pointerdb *pointerdb.Server
|
||||
overlay pb.OverlayServer
|
||||
limit int
|
||||
logger *zap.Logger
|
||||
ticker *time.Ticker
|
||||
db *accounting.Database
|
||||
bwAgreement bwagreement.DB // bwagreements database
|
||||
pointerdb *pointerdb.Server
|
||||
overlay pb.OverlayServer
|
||||
limit int
|
||||
logger *zap.Logger
|
||||
ticker *time.Ticker
|
||||
accountingDB accounting.DB
|
||||
bwAgreementDB bwagreement.DB // bwagreements database
|
||||
}
|
||||
|
||||
func newTally(logger *zap.Logger, db *accounting.Database, bwAgreement bwagreement.DB, pointerdb *pointerdb.Server, overlay pb.OverlayServer, limit int, interval time.Duration) *tally {
|
||||
func newTally(logger *zap.Logger, accountingDB accounting.DB, bwAgreementDB bwagreement.DB, pointerdb *pointerdb.Server, overlay pb.OverlayServer, limit int, interval time.Duration) *tally {
|
||||
return &tally{
|
||||
pointerdb: pointerdb,
|
||||
overlay: overlay,
|
||||
limit: limit,
|
||||
logger: logger,
|
||||
ticker: time.NewTicker(interval),
|
||||
db: db,
|
||||
bwAgreement: bwAgreement,
|
||||
pointerdb: pointerdb,
|
||||
overlay: overlay,
|
||||
limit: limit,
|
||||
logger: logger,
|
||||
ticker: time.NewTicker(interval),
|
||||
accountingDB: accountingDB,
|
||||
bwAgreementDB: bwAgreementDB,
|
||||
}
|
||||
}
|
||||
|
||||
@ -110,7 +109,10 @@ func (t *tally) calculateAtRestData(ctx context.Context) (err error) {
|
||||
return nil
|
||||
},
|
||||
)
|
||||
return t.updateRawTable(ctx, nodeData)
|
||||
if err != nil {
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
return Error.Wrap(t.updateRawTable(ctx, nodeData))
|
||||
}
|
||||
|
||||
func (t *tally) updateRawTable(ctx context.Context, nodeData map[storj.NodeID]int64) error {
|
||||
@ -121,16 +123,19 @@ func (t *tally) updateRawTable(ctx context.Context, nodeData map[storj.NodeID]in
|
||||
// Query bandwidth allocation database, selecting all new contracts since the last collection run time.
|
||||
// Grouping by storage node ID and adding total of bandwidth to granular data table.
|
||||
func (t *tally) Query(ctx context.Context) error {
|
||||
lastBwTally, err := t.db.FindLastBwTally(ctx)
|
||||
lastBwTally, isNil, err := t.accountingDB.LastGranularTime(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
var bwAgreements []bwagreement.Agreement
|
||||
if lastBwTally == nil {
|
||||
if isNil {
|
||||
t.logger.Info("Tally found no existing bandwith tracking data")
|
||||
bwAgreements, err = t.bwAgreement.GetAgreements(ctx)
|
||||
bwAgreements, err = t.bwAgreementDB.GetAgreements(ctx)
|
||||
} else {
|
||||
bwAgreements, err = t.bwAgreement.GetAgreementsSince(ctx, lastBwTally.Value)
|
||||
bwAgreements, err = t.bwAgreementDB.GetAgreementsSince(ctx, lastBwTally)
|
||||
}
|
||||
if err != nil {
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
if len(bwAgreements) == 0 {
|
||||
t.logger.Info("Tally found no new bandwidth allocations")
|
||||
@ -152,46 +157,5 @@ func (t *tally) Query(ctx context.Context) error {
|
||||
bwTotals[rbad.StorageNodeId.String()] += rbad.GetTotal() // todo: check for overflow?
|
||||
}
|
||||
|
||||
//todo: consider if we actually need EndTime in granular
|
||||
if lastBwTally == nil {
|
||||
t.logger.Info("No previous bandwidth timestamp found in tally query")
|
||||
lastBwTally = &dbx.Value_Row{Value: latestBwa} //todo: something better here?
|
||||
}
|
||||
|
||||
//insert all records in a transaction so if we fail, we don't have partial info stored
|
||||
//todo: replace with a WithTx() method per DBX docs?
|
||||
tx, err := t.db.BeginTx(ctx)
|
||||
if err != nil {
|
||||
t.logger.DPanic("Failed to create DB txn in tally query")
|
||||
return err
|
||||
}
|
||||
defer func() {
|
||||
if err == nil {
|
||||
err = tx.Commit()
|
||||
} else {
|
||||
t.logger.Warn("DB txn was rolled back in tally query")
|
||||
err = tx.Rollback()
|
||||
}
|
||||
}()
|
||||
|
||||
//todo: switch to bulk update SQL?
|
||||
for k, v := range bwTotals {
|
||||
nID := dbx.Raw_NodeId(k)
|
||||
end := dbx.Raw_IntervalEndTime(latestBwa)
|
||||
total := dbx.Raw_DataTotal(v)
|
||||
dataType := dbx.Raw_DataType(accounting.Bandwith)
|
||||
_, err = tx.Create_Raw(ctx, nID, end, total, dataType)
|
||||
if err != nil {
|
||||
t.logger.DPanic("Create granular SQL failed in tally query")
|
||||
return err //todo: retry strategy?
|
||||
}
|
||||
}
|
||||
|
||||
//todo: move this into txn when we have masterdb?
|
||||
update := dbx.Timestamps_Update_Fields{Value: dbx.Timestamps_Value(latestBwa)}
|
||||
_, err = tx.Update_Timestamps_By_Name(ctx, accounting.LastBandwidthTally, update)
|
||||
if err != nil {
|
||||
t.logger.DPanic("Failed to update bandwith timestamp in tally query")
|
||||
}
|
||||
return err
|
||||
return Error.Wrap(t.accountingDB.SaveGranulars(ctx, t.logger, lastBwTally, bwTotals))
|
||||
}
|
||||
|
@ -13,7 +13,6 @@ import (
|
||||
|
||||
testidentity "storj.io/storj/internal/identity"
|
||||
"storj.io/storj/internal/testcontext"
|
||||
"storj.io/storj/pkg/accounting"
|
||||
"storj.io/storj/pkg/bwagreement"
|
||||
"storj.io/storj/pkg/bwagreement/test"
|
||||
"storj.io/storj/pkg/overlay"
|
||||
@ -31,15 +30,12 @@ func TestQueryNoAgreements(t *testing.T) {
|
||||
pointerdb := pointerdb.NewServer(teststore.New(), &overlay.Cache{}, zap.NewNop(), pointerdb.Config{}, nil)
|
||||
overlayServer := mocks.NewOverlay([]*pb.Node{})
|
||||
|
||||
accountingDb, err := accounting.NewDB("sqlite3://file::memory:?mode=memory&cache=shared")
|
||||
db, err := satellitedb.NewInMemory()
|
||||
assert.NoError(t, err)
|
||||
defer ctx.Check(accountingDb.Close)
|
||||
defer ctx.Check(db.Close)
|
||||
assert.NoError(t, db.CreateTables())
|
||||
|
||||
masterDB, err := satellitedb.NewInMemory()
|
||||
assert.NoError(t, err)
|
||||
defer ctx.Check(masterDB.Close)
|
||||
|
||||
tally := newTally(zap.NewNop(), accountingDb, masterDB.BandwidthAgreement(), pointerdb, overlayServer, 0, time.Second)
|
||||
tally := newTally(zap.NewNop(), db.Accounting(), db.BandwidthAgreement(), pointerdb, overlayServer, 0, time.Second)
|
||||
|
||||
err = tally.Query(ctx)
|
||||
assert.NoError(t, err)
|
||||
@ -52,18 +48,13 @@ func TestQueryWithBw(t *testing.T) {
|
||||
pointerdb := pointerdb.NewServer(teststore.New(), &overlay.Cache{}, zap.NewNop(), pointerdb.Config{}, nil)
|
||||
overlayServer := mocks.NewOverlay([]*pb.Node{})
|
||||
|
||||
accountingDb, err := accounting.NewDB("sqlite3://file::memory:?mode=memory&cache=shared")
|
||||
db, err := satellitedb.NewInMemory()
|
||||
assert.NoError(t, err)
|
||||
defer ctx.Check(accountingDb.Close)
|
||||
defer ctx.Check(db.Close)
|
||||
assert.NoError(t, db.CreateTables())
|
||||
|
||||
masterDB, err := satellitedb.NewInMemory()
|
||||
assert.NoError(t, err)
|
||||
defer ctx.Check(masterDB.Close)
|
||||
err = masterDB.CreateTables()
|
||||
assert.NoError(t, err)
|
||||
|
||||
bwDb := masterDB.BandwidthAgreement()
|
||||
tally := newTally(zap.NewNop(), accountingDb, bwDb, pointerdb, overlayServer, 0, time.Second)
|
||||
bwDb := db.BandwidthAgreement()
|
||||
tally := newTally(zap.NewNop(), db.Accounting(), bwDb, pointerdb, overlayServer, 0, time.Second)
|
||||
|
||||
//get a private key
|
||||
fiC, err := testidentity.NewTestIdentity()
|
||||
|
74
satellite/satellitedb/accounting.go
Normal file
74
satellite/satellitedb/accounting.go
Normal file
@ -0,0 +1,74 @@
|
||||
// Copyright (C) 2018 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package satellitedb
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
||||
"storj.io/storj/pkg/accounting"
|
||||
dbx "storj.io/storj/satellite/satellitedb/dbx"
|
||||
)
|
||||
|
||||
//database implements DB
|
||||
type accountingDB struct {
|
||||
db *dbx.DB
|
||||
}
|
||||
|
||||
// LastGranularTime records the greatest last tallied bandwidth agreement time
|
||||
func (db *accountingDB) LastGranularTime(ctx context.Context) (time.Time, bool, error) {
|
||||
lastBwTally, err := db.db.Find_Timestamps_Value_By_Name(ctx, dbx.Timestamps_Name("LastBandwidthTally"))
|
||||
if lastBwTally == nil {
|
||||
return time.Time{}, true, err
|
||||
}
|
||||
return lastBwTally.Value, false, err
|
||||
}
|
||||
|
||||
// SaveGranulars records granular tallies (sums of bw agreement values) to the database
|
||||
// and updates the LastGranularTime
|
||||
func (db *accountingDB) SaveGranulars(ctx context.Context, logger *zap.Logger, latestBwa time.Time, bwTotals map[string]int64) (err error) {
|
||||
// We use the latest bandwidth agreement value of a batch of records as the start of the next batch
|
||||
// This enables us to not use:
|
||||
// 1) local time (which may deviate from DB time)
|
||||
// 2) absolute time intervals (where in processing time could exceed the interval, causing issues)
|
||||
// 3) per-node latest times (which simply would require a lot more work, albeit more precise)
|
||||
// Any change in these assumptions would result in a change to this function
|
||||
// in particular, we should consider finding the sum of bwagreements using SQL sum() direct against the bwa table
|
||||
if len(bwTotals) == 0 {
|
||||
logger.Warn("In SaveGranulars with empty bwtotals")
|
||||
return nil
|
||||
}
|
||||
//insert all records in a transaction so if we fail, we don't have partial info stored
|
||||
tx, err := db.db.Open(ctx)
|
||||
if err != nil {
|
||||
logger.DPanic("Failed to create DB txn in tally query")
|
||||
return err
|
||||
}
|
||||
defer func() {
|
||||
if err == nil {
|
||||
err = tx.Commit()
|
||||
} else {
|
||||
logger.Warn("DB txn was rolled back in tally query")
|
||||
err = tx.Rollback()
|
||||
}
|
||||
}()
|
||||
//create a granular record per node id
|
||||
for k, v := range bwTotals {
|
||||
nID := dbx.Raw_NodeId(k)
|
||||
end := dbx.Raw_IntervalEndTime(latestBwa)
|
||||
total := dbx.Raw_DataTotal(v)
|
||||
dataType := dbx.Raw_DataType(accounting.Bandwith)
|
||||
_, err = tx.Create_Raw(ctx, nID, end, total, dataType)
|
||||
if err != nil {
|
||||
logger.DPanic("Create granular SQL failed in tally query")
|
||||
return err
|
||||
}
|
||||
}
|
||||
//save this batch's greatest time
|
||||
update := dbx.Timestamps_Update_Fields{Value: dbx.Timestamps_Value(latestBwa)}
|
||||
_, err = tx.Update_Timestamps_By_Name(ctx, dbx.Timestamps_Name("LastBandwidthTally"), update)
|
||||
return err
|
||||
}
|
@ -7,6 +7,7 @@ import (
|
||||
"github.com/zeebo/errs"
|
||||
|
||||
"storj.io/storj/internal/migrate"
|
||||
"storj.io/storj/pkg/accounting"
|
||||
"storj.io/storj/pkg/bwagreement"
|
||||
"storj.io/storj/pkg/datarepair/irreparable"
|
||||
"storj.io/storj/pkg/utils"
|
||||
@ -67,10 +68,10 @@ func (db *DB) BandwidthAgreement() bwagreement.DB {
|
||||
// return &repairQueueDB{db: db.db}
|
||||
// }
|
||||
|
||||
// // AccountingDB is a getter for AccountingDB repository
|
||||
// func (db *DB) AccountingDB() accounting.DB {
|
||||
// return &accountingDB{db: db.db}
|
||||
// }
|
||||
// Accounting returns database for tracking bandwidth agreements over time
|
||||
func (db *DB) Accounting() accounting.DB {
|
||||
return &accountingDB{db: db.db}
|
||||
}
|
||||
|
||||
// Irreparable returns database for storing segments that failed repair
|
||||
func (db *DB) Irreparable() irreparable.DB {
|
||||
|
@ -1,5 +1,7 @@
|
||||
// dbx.v1 golang satellitedb.dbx .
|
||||
|
||||
//--- bwagreement ---//
|
||||
|
||||
model bwagreement (
|
||||
key signature
|
||||
|
||||
@ -27,7 +29,8 @@ read all (
|
||||
where bwagreement.created_at > ?
|
||||
)
|
||||
|
||||
// datarepair.irreparableDB
|
||||
//--- datarepair.irreparableDB ---//
|
||||
|
||||
model irreparabledb (
|
||||
key segmentpath
|
||||
|
||||
@ -44,4 +47,69 @@ delete irreparabledb ( where irreparabledb.segmentpath = ? )
|
||||
read one (
|
||||
select irreparabledb
|
||||
where irreparabledb.segmentpath = ?
|
||||
)
|
||||
|
||||
//--- accounting ---//
|
||||
|
||||
// timestamps just allows us to save the last time/thing that happened
|
||||
model timestamps (
|
||||
key name
|
||||
|
||||
field name text
|
||||
field value timestamp ( updatable )
|
||||
)
|
||||
create timestamps ( )
|
||||
update timestamps ( where timestamps.name = ? )
|
||||
read scalar (
|
||||
select timestamps.value
|
||||
where timestamps.name = ?
|
||||
)
|
||||
|
||||
model rollup (
|
||||
key id
|
||||
|
||||
field id serial64
|
||||
field node_id text
|
||||
field start_time timestamp
|
||||
field interval int64
|
||||
field data_type int
|
||||
field created_at timestamp ( autoinsert )
|
||||
field updated_at timestamp ( autoinsert, autoupdate )
|
||||
)
|
||||
|
||||
create rollup ( )
|
||||
update rollup ( where rollup.id = ? )
|
||||
delete rollup ( where rollup.id = ? )
|
||||
read one (
|
||||
select rollup
|
||||
where rollup.id = ?
|
||||
)
|
||||
read all (
|
||||
select rollup
|
||||
where rollup.node_id = ?
|
||||
)
|
||||
|
||||
|
||||
model raw (
|
||||
key id
|
||||
|
||||
field id serial64
|
||||
field node_id text
|
||||
field interval_end_time timestamp
|
||||
field data_total int64
|
||||
field data_type int
|
||||
field created_at timestamp ( autoinsert )
|
||||
field updated_at timestamp ( autoinsert, autoupdate )
|
||||
)
|
||||
|
||||
create raw ( )
|
||||
update raw ( where raw.id = ? )
|
||||
delete raw ( where raw.id = ? )
|
||||
read one (
|
||||
select raw
|
||||
where raw.id = ?
|
||||
)
|
||||
read all (
|
||||
select raw
|
||||
where raw.node_id = ?
|
||||
)
|
File diff suppressed because it is too large
Load Diff
Loading…
Reference in New Issue
Block a user