sql based tally (#1166)
* WIP * wacky changes * more * it builds.... IT BUILDS!!! * fixed SQL, broke out saving tallies for testing * shorter lines * fixed SQL, moved tally_test to testplanet * lint * WIP logic error preventing PUT and GETs to same serialnum * fixed BWA test * fixed temporary brain failure * eliminated magic numbers * cleaned up satellite uplink stats * use errs.Combine instead * thrashing * fixed tally erroneous error msg * fixed tally test * lint * SQL syntax attempt to fix * spelling error * made bwa db test resist old postgres data * postgres pk error msg hunting * postgres pk error msg hunting * postgres * err might be nil? * fixed error logging bug * hopefully solved postgres issue * using rebind * moved tests to _test package * fixing test dirs * finally made sense of Egons package name feedback * UTC, array fixes
This commit is contained in:
parent
f7c692f844
commit
0f662b8e38
@ -8,7 +8,6 @@ import (
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"text/tabwriter"
|
||||
"time"
|
||||
|
||||
@ -18,9 +17,7 @@ import (
|
||||
|
||||
"storj.io/storj/internal/fpath"
|
||||
"storj.io/storj/pkg/cfgstruct"
|
||||
"storj.io/storj/pkg/pb"
|
||||
"storj.io/storj/pkg/process"
|
||||
"storj.io/storj/pkg/storj"
|
||||
"storj.io/storj/satellite"
|
||||
"storj.io/storj/satellite/satellitedb"
|
||||
)
|
||||
@ -196,58 +193,20 @@ func cmdDiag(cmd *cobra.Command, args []string) (err error) {
|
||||
}()
|
||||
|
||||
//get all bandwidth agreements rows already ordered
|
||||
baRows, err := database.BandwidthAgreement().GetAgreements(context.Background())
|
||||
stats, err := database.BandwidthAgreement().GetUplinkStats(context.Background(), time.Time{}, time.Now())
|
||||
if err != nil {
|
||||
fmt.Printf("error reading satellite database %v: %v\n", diagCfg.Database, err)
|
||||
return err
|
||||
}
|
||||
|
||||
// Agreement is a struct that contains a bandwidth agreement and the associated signature
|
||||
type UplinkSummary struct {
|
||||
TotalBytes int64
|
||||
PutActionCount int64
|
||||
GetActionCount int64
|
||||
TotalTransactions int64
|
||||
// additional attributes add here ...
|
||||
}
|
||||
|
||||
// attributes per uplinkid
|
||||
summaries := make(map[storj.NodeID]*UplinkSummary)
|
||||
uplinkIDs := storj.NodeIDList{}
|
||||
|
||||
for _, baRow := range baRows {
|
||||
// deserializing rbad you get payerbwallocation, total & storage node id
|
||||
rbad := baRow.Agreement
|
||||
pbad := rbad.PayerAllocation
|
||||
uplinkID := pbad.UplinkId
|
||||
summary, ok := summaries[uplinkID]
|
||||
if !ok {
|
||||
summaries[uplinkID] = &UplinkSummary{}
|
||||
uplinkIDs = append(uplinkIDs, uplinkID)
|
||||
summary = summaries[uplinkID]
|
||||
}
|
||||
|
||||
// fill the summary info
|
||||
summary.TotalBytes += rbad.Total
|
||||
summary.TotalTransactions++
|
||||
switch pbad.GetAction() {
|
||||
case pb.BandwidthAction_PUT:
|
||||
summary.PutActionCount++
|
||||
case pb.BandwidthAction_GET:
|
||||
summary.GetActionCount++
|
||||
}
|
||||
}
|
||||
|
||||
// initialize the table header (fields)
|
||||
const padding = 3
|
||||
w := tabwriter.NewWriter(os.Stdout, 0, 0, padding, ' ', tabwriter.AlignRight|tabwriter.Debug)
|
||||
fmt.Fprintln(w, "UplinkID\tTotal\t# Of Transactions\tPUT Action\tGET Action\t")
|
||||
|
||||
// populate the row fields
|
||||
sort.Sort(uplinkIDs)
|
||||
for _, uplinkID := range uplinkIDs {
|
||||
summary := summaries[uplinkID]
|
||||
fmt.Fprint(w, uplinkID, "\t", summary.TotalBytes, "\t", summary.TotalTransactions, "\t", summary.PutActionCount, "\t", summary.GetActionCount, "\t\n")
|
||||
for _, s := range stats {
|
||||
fmt.Fprint(w, s.NodeID, "\t", s.TotalBytes, "\t", s.TotalTransactions, "\t", s.PutActionCount, "\t", s.GetActionCount, "\t\n")
|
||||
}
|
||||
|
||||
// display the data
|
||||
|
@ -7,13 +7,9 @@ import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"storj.io/storj/pkg/pb"
|
||||
"storj.io/storj/pkg/storj"
|
||||
)
|
||||
|
||||
//BWTally is a convenience alias
|
||||
type BWTally [pb.BandwidthAction_PUT_REPAIR + 1]map[storj.NodeID]int64
|
||||
|
||||
//RollupStats is a convenience alias
|
||||
type RollupStats map[time.Time]map[storj.NodeID]*Rollup
|
||||
|
||||
@ -42,18 +38,18 @@ type Rollup struct {
|
||||
|
||||
// DB stores information about bandwidth usage
|
||||
type DB interface {
|
||||
// LastRawTime records the latest last tallied time.
|
||||
LastRawTime(ctx context.Context, timestampType string) (time.Time, bool, error)
|
||||
// SaveBWRaw records raw sums of agreement values to the database and updates the LastRawTime.
|
||||
SaveBWRaw(ctx context.Context, latestBwa time.Time, isNew bool, bwTotals BWTally) error
|
||||
// LastTimestamp records the latest last tallied time.
|
||||
LastTimestamp(ctx context.Context, timestampType string) (time.Time, error)
|
||||
// SaveBWRaw records raw sums of agreement values to the database and updates the LastTimestamp.
|
||||
SaveBWRaw(ctx context.Context, tallyEnd time.Time, bwTotals map[storj.NodeID][]int64) error
|
||||
// SaveAtRestRaw records raw tallies of at-rest-data.
|
||||
SaveAtRestRaw(ctx context.Context, latestTally time.Time, isNew bool, nodeData map[storj.NodeID]float64) error
|
||||
SaveAtRestRaw(ctx context.Context, latestTally time.Time, nodeData map[storj.NodeID]float64) error
|
||||
// GetRaw retrieves all raw tallies
|
||||
GetRaw(ctx context.Context) ([]*Raw, error)
|
||||
// GetRawSince r retrieves all raw tallies sinces
|
||||
GetRawSince(ctx context.Context, latestRollup time.Time) ([]*Raw, error)
|
||||
// SaveRollup records raw tallies of at rest data to the database
|
||||
SaveRollup(ctx context.Context, latestTally time.Time, isNew bool, stats RollupStats) error
|
||||
SaveRollup(ctx context.Context, latestTally time.Time, stats RollupStats) error
|
||||
// QueryPaymentInfo queries StatDB, Accounting Rollup on nodeID
|
||||
QueryPaymentInfo(ctx context.Context, start time.Time, end time.Time) ([]*CSVRow, error)
|
||||
}
|
||||
|
@ -56,17 +56,11 @@ func (r *Rollup) Run(ctx context.Context) (err error) {
|
||||
func (r *Rollup) Query(ctx context.Context) error {
|
||||
// only Rollup new things - get LastRollup
|
||||
var latestTally time.Time
|
||||
lastRollup, isNil, err := r.db.LastRawTime(ctx, accounting.LastRollup)
|
||||
lastRollup, err := r.db.LastTimestamp(ctx, accounting.LastRollup)
|
||||
if err != nil {
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
var tallies []*accounting.Raw
|
||||
if isNil {
|
||||
r.logger.Info("Rollup found no existing raw tally data")
|
||||
tallies, err = r.db.GetRaw(ctx)
|
||||
} else {
|
||||
tallies, err = r.db.GetRawSince(ctx, lastRollup)
|
||||
}
|
||||
tallies, err := r.db.GetRawSince(ctx, lastRollup)
|
||||
if err != nil {
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
@ -115,5 +109,5 @@ func (r *Rollup) Query(ctx context.Context) error {
|
||||
r.logger.Info("Rollup only found tallies for today")
|
||||
return nil
|
||||
}
|
||||
return Error.Wrap(r.db.SaveRollup(ctx, latestTally, isNil, rollupStats))
|
||||
return Error.Wrap(r.db.SaveRollup(ctx, latestTally, rollupStats))
|
||||
}
|
||||
|
@ -27,7 +27,7 @@ func TestQueryOneDay(t *testing.T) {
|
||||
now := time.Now().UTC()
|
||||
later := now.Add(time.Hour * 24)
|
||||
|
||||
err := db.Accounting().SaveAtRestRaw(ctx, now, true, nodeData)
|
||||
err := db.Accounting().SaveAtRestRaw(ctx, now, nodeData)
|
||||
assert.NoError(t, err)
|
||||
|
||||
// test should return error because we delete latest day's rollup
|
||||
@ -48,7 +48,7 @@ func TestQueryTwoDays(t *testing.T) {
|
||||
now := time.Now().UTC()
|
||||
then := now.Add(time.Hour * -24)
|
||||
|
||||
err := db.Accounting().SaveAtRestRaw(ctx, now, true, nodeData)
|
||||
err := db.Accounting().SaveAtRestRaw(ctx, now, nodeData)
|
||||
assert.NoError(t, err)
|
||||
|
||||
// db.db.Exec("UPDATE accounting_raws SET created_at= WHERE ")
|
||||
|
@ -8,6 +8,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/gogo/protobuf/proto"
|
||||
"github.com/zeebo/errs"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"storj.io/storj/pkg/accounting"
|
||||
@ -52,15 +53,9 @@ func (t *Tally) Run(ctx context.Context) (err error) {
|
||||
t.logger.Info("Tally service starting up")
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
for {
|
||||
err = t.calculateAtRestData(ctx)
|
||||
if err != nil {
|
||||
t.logger.Error("calculateAtRestData failed", zap.Error(err))
|
||||
if err = t.Tally(ctx); err != nil {
|
||||
t.logger.Error("Tally failed", zap.Error(err))
|
||||
}
|
||||
err = t.QueryBW(ctx)
|
||||
if err != nil {
|
||||
t.logger.Error("Query for bandwidth failed", zap.Error(err))
|
||||
}
|
||||
|
||||
select {
|
||||
case <-t.ticker.C: // wait for the next interval to happen
|
||||
case <-ctx.Done(): // or the Tally is canceled via context
|
||||
@ -69,17 +64,42 @@ func (t *Tally) Run(ctx context.Context) (err error) {
|
||||
}
|
||||
}
|
||||
|
||||
//Tally calculates data-at-rest and bandwidth usage once
|
||||
func (t *Tally) Tally(ctx context.Context) error {
|
||||
//data at rest
|
||||
var errAtRest, errBWA error
|
||||
latestTally, nodeData, err := t.calculateAtRestData(ctx)
|
||||
if err != nil {
|
||||
errAtRest = errs.New("Query for data-at-rest failed : %v", err)
|
||||
} else if len(nodeData) > 0 {
|
||||
err = t.SaveAtRestRaw(ctx, latestTally, nodeData)
|
||||
if err != nil {
|
||||
errAtRest = errs.New("Saving data-at-rest failed : %v", err)
|
||||
}
|
||||
}
|
||||
//bandwdith
|
||||
tallyEnd, bwTotals, err := t.QueryBW(ctx)
|
||||
if err != nil {
|
||||
errBWA = errs.New("Query for bandwidth failed: %v", err)
|
||||
} else if len(bwTotals) > 0 {
|
||||
err = t.SaveBWRaw(ctx, tallyEnd, bwTotals)
|
||||
if err != nil {
|
||||
errBWA = errs.New("Saving for bandwidth failed : %v", err)
|
||||
}
|
||||
}
|
||||
return errs.Combine(errAtRest, errBWA)
|
||||
}
|
||||
|
||||
// calculateAtRestData iterates through the pieces on pointerdb and calculates
|
||||
// the amount of at-rest data stored on each respective node
|
||||
func (t *Tally) calculateAtRestData(ctx context.Context) (err error) {
|
||||
func (t *Tally) calculateAtRestData(ctx context.Context) (latestTally time.Time, nodeData map[storj.NodeID]float64, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
latestTally, isNil, err := t.accountingDB.LastRawTime(ctx, accounting.LastAtRestTally)
|
||||
latestTally, err = t.accountingDB.LastTimestamp(ctx, accounting.LastAtRestTally)
|
||||
if err != nil {
|
||||
return Error.Wrap(err)
|
||||
return latestTally, nodeData, Error.Wrap(err)
|
||||
}
|
||||
|
||||
var nodeData = make(map[storj.NodeID]float64)
|
||||
err = t.pointerdb.Iterate("", "", true, false,
|
||||
func(it storage.Iterator) error {
|
||||
var item storage.ListItem
|
||||
@ -119,60 +139,49 @@ func (t *Tally) calculateAtRestData(ctx context.Context) (err error) {
|
||||
},
|
||||
)
|
||||
if len(nodeData) == 0 {
|
||||
return nil
|
||||
return latestTally, nodeData, nil
|
||||
}
|
||||
if err != nil {
|
||||
return Error.Wrap(err)
|
||||
return latestTally, nodeData, Error.Wrap(err)
|
||||
}
|
||||
//store byte hours, not just bytes
|
||||
numHours := 1.0 //todo: something more considered?
|
||||
if !isNil {
|
||||
numHours = time.Now().UTC().Sub(latestTally).Hours()
|
||||
numHours := time.Now().Sub(latestTally).Hours()
|
||||
if latestTally.IsZero() {
|
||||
numHours = 1.0 //todo: something more considered?
|
||||
}
|
||||
|
||||
latestTally = time.Now().UTC()
|
||||
|
||||
latestTally = time.Now()
|
||||
for k := range nodeData {
|
||||
nodeData[k] *= numHours
|
||||
nodeData[k] *= numHours //calculate byte hours
|
||||
}
|
||||
return Error.Wrap(t.accountingDB.SaveAtRestRaw(ctx, latestTally, isNil, nodeData))
|
||||
return latestTally, nodeData, err
|
||||
}
|
||||
|
||||
// SaveAtRestRaw records raw tallies of at-rest-data and updates the LastTimestamp
|
||||
func (t *Tally) SaveAtRestRaw(ctx context.Context, latestTally time.Time, nodeData map[storj.NodeID]float64) error {
|
||||
return t.accountingDB.SaveAtRestRaw(ctx, latestTally, nodeData)
|
||||
}
|
||||
|
||||
// QueryBW queries bandwidth allocation database, selecting all new contracts since the last collection run time.
|
||||
// Grouping by action type, storage node ID and adding total of bandwidth to granular data table.
|
||||
func (t *Tally) QueryBW(ctx context.Context) error {
|
||||
lastBwTally, isNil, err := t.accountingDB.LastRawTime(ctx, accounting.LastBandwidthTally)
|
||||
func (t *Tally) QueryBW(ctx context.Context) (time.Time, map[storj.NodeID][]int64, error) {
|
||||
var bwTotals map[storj.NodeID][]int64
|
||||
now := time.Now()
|
||||
lastBwTally, err := t.accountingDB.LastTimestamp(ctx, accounting.LastBandwidthTally)
|
||||
if err != nil {
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
|
||||
var bwAgreements []bwagreement.Agreement
|
||||
if isNil {
|
||||
t.logger.Info("Tally found no existing bandwidth tracking data")
|
||||
bwAgreements, err = t.bwAgreementDB.GetAgreements(ctx)
|
||||
} else {
|
||||
bwAgreements, err = t.bwAgreementDB.GetAgreementsSince(ctx, lastBwTally)
|
||||
return now, bwTotals, Error.Wrap(err)
|
||||
}
|
||||
bwTotals, err = t.bwAgreementDB.GetTotals(ctx, lastBwTally, now)
|
||||
if err != nil {
|
||||
return Error.Wrap(err)
|
||||
return now, bwTotals, Error.Wrap(err)
|
||||
}
|
||||
if len(bwAgreements) == 0 {
|
||||
if len(bwTotals) == 0 {
|
||||
t.logger.Info("Tally found no new bandwidth allocations")
|
||||
return nil
|
||||
return now, bwTotals, nil
|
||||
}
|
||||
|
||||
// sum totals by node id ... todo: add nodeid as SQL column so DB can do this?
|
||||
var bwTotals accounting.BWTally
|
||||
for i := range bwTotals {
|
||||
bwTotals[i] = make(map[storj.NodeID]int64)
|
||||
}
|
||||
var latestBwa time.Time
|
||||
for _, baRow := range bwAgreements {
|
||||
rba := baRow.Agreement
|
||||
if baRow.CreatedAt.After(latestBwa) {
|
||||
latestBwa = baRow.CreatedAt
|
||||
}
|
||||
bwTotals[rba.PayerAllocation.Action][rba.StorageNodeId] += rba.Total
|
||||
}
|
||||
return Error.Wrap(t.accountingDB.SaveBWRaw(ctx, latestBwa, isNil, bwTotals))
|
||||
return now, bwTotals, nil
|
||||
}
|
||||
|
||||
// SaveBWRaw records granular tallies (sums of bw agreement values) to the database and updates the LastTimestamp
|
||||
func (t *Tally) SaveBWRaw(ctx context.Context, tallyEnd time.Time, bwTotals map[storj.NodeID][]int64) error {
|
||||
return t.accountingDB.SaveBWRaw(ctx, tallyEnd, bwTotals)
|
||||
}
|
||||
|
@ -8,82 +8,70 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"go.uber.org/zap"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"storj.io/storj/internal/testcontext"
|
||||
"storj.io/storj/internal/testidentity"
|
||||
"storj.io/storj/internal/teststorj"
|
||||
"storj.io/storj/pkg/accounting/tally"
|
||||
"storj.io/storj/pkg/bwagreement"
|
||||
"storj.io/storj/internal/testplanet"
|
||||
"storj.io/storj/pkg/bwagreement/testbwagreement"
|
||||
"storj.io/storj/pkg/identity"
|
||||
"storj.io/storj/pkg/overlay/mocks"
|
||||
"storj.io/storj/pkg/pb"
|
||||
"storj.io/storj/pkg/pointerdb"
|
||||
"storj.io/storj/satellite/satellitedb"
|
||||
"storj.io/storj/storage/teststore"
|
||||
"storj.io/storj/pkg/piecestore/psserver/psdb"
|
||||
)
|
||||
|
||||
func TestQueryNoAgreements(t *testing.T) {
|
||||
func runPlanet(t *testing.T, f func(context.Context, *testplanet.Planet)) {
|
||||
ctx := testcontext.New(t)
|
||||
defer ctx.Cleanup()
|
||||
planet, err := testplanet.New(t, 1, 1, 1)
|
||||
//time.Sleep(5 * time.Second)
|
||||
require.NoError(t, err)
|
||||
defer ctx.Check(planet.Shutdown)
|
||||
planet.Start(ctx)
|
||||
f(ctx, planet)
|
||||
}
|
||||
|
||||
// TODO: use testplanet
|
||||
|
||||
service := pointerdb.NewService(zap.NewNop(), teststore.New())
|
||||
overlayServer := mocks.NewOverlay([]*pb.Node{})
|
||||
db, err := satellitedb.NewInMemory()
|
||||
assert.NoError(t, err)
|
||||
defer ctx.Check(db.Close)
|
||||
assert.NoError(t, db.CreateTables())
|
||||
|
||||
tally := tally.New(zap.NewNop(), db.Accounting(), db.BandwidthAgreement(), service, overlayServer, 0, time.Second)
|
||||
|
||||
err = tally.QueryBW(ctx)
|
||||
assert.NoError(t, err)
|
||||
func TestQueryNoAgreements(t *testing.T) {
|
||||
runPlanet(t, func(ctx context.Context, planet *testplanet.Planet) {
|
||||
tally := planet.Satellites[0].Accounting.Tally
|
||||
_, bwTotals, err := tally.QueryBW(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, bwTotals, 0)
|
||||
})
|
||||
}
|
||||
|
||||
func TestQueryWithBw(t *testing.T) {
|
||||
ctx := testcontext.New(t)
|
||||
defer ctx.Cleanup()
|
||||
|
||||
// TODO: use testplanet
|
||||
|
||||
service := pointerdb.NewService(zap.NewNop(), teststore.New())
|
||||
overlayServer := mocks.NewOverlay([]*pb.Node{})
|
||||
|
||||
db, err := satellitedb.NewInMemory()
|
||||
assert.NoError(t, err)
|
||||
defer ctx.Check(db.Close)
|
||||
|
||||
assert.NoError(t, db.CreateTables())
|
||||
|
||||
bwDb := db.BandwidthAgreement()
|
||||
tally := tally.New(zap.NewNop(), db.Accounting(), bwDb, service, overlayServer, 0, time.Second)
|
||||
|
||||
//get a private key
|
||||
fiC, err := testidentity.NewTestIdentity(ctx)
|
||||
assert.NoError(t, err)
|
||||
|
||||
makeBWA(ctx, t, bwDb, fiC, pb.BandwidthAction_PUT)
|
||||
makeBWA(ctx, t, bwDb, fiC, pb.BandwidthAction_GET)
|
||||
makeBWA(ctx, t, bwDb, fiC, pb.BandwidthAction_GET_AUDIT)
|
||||
makeBWA(ctx, t, bwDb, fiC, pb.BandwidthAction_GET_REPAIR)
|
||||
makeBWA(ctx, t, bwDb, fiC, pb.BandwidthAction_PUT_REPAIR)
|
||||
|
||||
//check the db
|
||||
err = tally.QueryBW(ctx)
|
||||
assert.NoError(t, err)
|
||||
runPlanet(t, func(ctx context.Context, planet *testplanet.Planet) {
|
||||
tally := planet.Satellites[0].Accounting.Tally
|
||||
makeBWAs(ctx, t, planet)
|
||||
//check the db
|
||||
tallyEnd, bwTotals, err := tally.QueryBW(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, bwTotals, 1)
|
||||
for id, nodeTotals := range bwTotals {
|
||||
require.Len(t, nodeTotals, 5)
|
||||
for _, total := range nodeTotals {
|
||||
require.Equal(t, planet.StorageNodes[0].Identity.ID, id)
|
||||
require.Equal(t, int64(1000), total)
|
||||
}
|
||||
}
|
||||
err = tally.SaveBWRaw(ctx, tallyEnd, bwTotals)
|
||||
require.NoError(t, err)
|
||||
})
|
||||
}
|
||||
|
||||
func makeBWA(ctx context.Context, t *testing.T, bwDb bwagreement.DB, fiC *identity.FullIdentity, action pb.BandwidthAction) {
|
||||
//generate an agreement with the key
|
||||
pba, err := testbwagreement.GeneratePayerBandwidthAllocation(action, fiC, fiC, time.Hour)
|
||||
assert.NoError(t, err)
|
||||
rba, err := testbwagreement.GenerateRenterBandwidthAllocation(pba, teststorj.NodeIDFromString("StorageNodeID"), fiC, 666)
|
||||
assert.NoError(t, err)
|
||||
//save to db
|
||||
err = bwDb.CreateAgreement(ctx, rba)
|
||||
assert.NoError(t, err)
|
||||
func makeBWAs(ctx context.Context, t *testing.T, planet *testplanet.Planet) {
|
||||
satID := planet.Satellites[0].Identity
|
||||
upID := planet.Uplinks[0].Identity
|
||||
snID := planet.StorageNodes[0].Identity
|
||||
sender := planet.StorageNodes[0].Agreements.Sender
|
||||
actions := []pb.BandwidthAction{pb.BandwidthAction_PUT, pb.BandwidthAction_GET,
|
||||
pb.BandwidthAction_GET_AUDIT, pb.BandwidthAction_GET_REPAIR, pb.BandwidthAction_PUT_REPAIR}
|
||||
agreements := make([]*psdb.Agreement, len(actions))
|
||||
for i, action := range actions {
|
||||
pba, err := testbwagreement.GeneratePayerBandwidthAllocation(action, satID, upID, time.Hour)
|
||||
require.NoError(t, err)
|
||||
rba, err := testbwagreement.GenerateRenterBandwidthAllocation(pba, snID.ID, upID, 1000)
|
||||
require.NoError(t, err)
|
||||
agreements[i] = &psdb.Agreement{Agreement: *rba}
|
||||
}
|
||||
sender.SendAgreementsToSatellite(ctx, satID.ID, agreements)
|
||||
|
||||
}
|
||||
|
80
pkg/bwagreement/bwagreement_test.go
Normal file
80
pkg/bwagreement/bwagreement_test.go
Normal file
@ -0,0 +1,80 @@
|
||||
// Copyright (C) 2019 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package bwagreement_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"storj.io/storj/internal/testcontext"
|
||||
"storj.io/storj/internal/testidentity"
|
||||
"storj.io/storj/pkg/bwagreement"
|
||||
"storj.io/storj/pkg/identity"
|
||||
"storj.io/storj/pkg/pb"
|
||||
"storj.io/storj/satellite"
|
||||
"storj.io/storj/satellite/satellitedb/satellitedbtest"
|
||||
)
|
||||
|
||||
func TestBandwidthDBAgreement(t *testing.T) {
|
||||
satellitedbtest.Run(t, func(t *testing.T, db satellite.DB) {
|
||||
ctx := testcontext.New(t)
|
||||
defer ctx.Cleanup()
|
||||
|
||||
upID, err := testidentity.NewTestIdentity(ctx)
|
||||
require.NoError(t, err)
|
||||
snID, err := testidentity.NewTestIdentity(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.NoError(t, testCreateAgreement(ctx, t, db.BandwidthAgreement(), pb.BandwidthAction_PUT, "1", upID, snID))
|
||||
require.Error(t, testCreateAgreement(ctx, t, db.BandwidthAgreement(), pb.BandwidthAction_GET, "1", upID, snID))
|
||||
require.NoError(t, testCreateAgreement(ctx, t, db.BandwidthAgreement(), pb.BandwidthAction_GET, "2", upID, snID))
|
||||
testGetTotals(ctx, t, db.BandwidthAgreement(), snID)
|
||||
testGetUplinkStats(ctx, t, db.BandwidthAgreement(), upID)
|
||||
})
|
||||
}
|
||||
|
||||
func testCreateAgreement(ctx context.Context, t *testing.T, b bwagreement.DB, action pb.BandwidthAction,
|
||||
serialNum string, upID, snID *identity.FullIdentity) error {
|
||||
rba := &pb.RenterBandwidthAllocation{
|
||||
PayerAllocation: pb.PayerBandwidthAllocation{
|
||||
Action: action,
|
||||
SerialNumber: serialNum,
|
||||
UplinkId: upID.ID,
|
||||
},
|
||||
Total: 1000,
|
||||
StorageNodeId: snID.ID,
|
||||
}
|
||||
return b.CreateAgreement(ctx, rba)
|
||||
}
|
||||
|
||||
func testGetUplinkStats(ctx context.Context, t *testing.T, b bwagreement.DB, upID *identity.FullIdentity) {
|
||||
stats, err := b.GetUplinkStats(ctx, time.Time{}, time.Now().UTC())
|
||||
require.NoError(t, err)
|
||||
var found int
|
||||
for _, s := range stats {
|
||||
if upID.ID == s.NodeID {
|
||||
found++
|
||||
require.Equal(t, int64(2000), s.TotalBytes)
|
||||
require.Equal(t, 1, s.GetActionCount)
|
||||
require.Equal(t, 1, s.PutActionCount)
|
||||
require.Equal(t, 2, s.TotalTransactions)
|
||||
}
|
||||
}
|
||||
require.Equal(t, 1, found)
|
||||
}
|
||||
|
||||
func testGetTotals(ctx context.Context, t *testing.T, b bwagreement.DB, snID *identity.FullIdentity) {
|
||||
totals, err := b.GetTotals(ctx, time.Time{}, time.Now().UTC())
|
||||
require.NoError(t, err)
|
||||
total := totals[snID.ID]
|
||||
require.Len(t, total, 5)
|
||||
require.Equal(t, int64(1000), total[pb.BandwidthAction_PUT])
|
||||
require.Equal(t, int64(1000), total[pb.BandwidthAction_GET])
|
||||
require.Equal(t, int64(0), total[pb.BandwidthAction_GET_AUDIT])
|
||||
require.Equal(t, int64(0), total[pb.BandwidthAction_GET_REPAIR])
|
||||
require.Equal(t, int64(0), total[pb.BandwidthAction_PUT_REPAIR])
|
||||
}
|
@ -5,6 +5,7 @@ package bwagreement
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/zeebo/errs"
|
||||
@ -28,14 +29,23 @@ var (
|
||||
type Config struct {
|
||||
}
|
||||
|
||||
//UplinkStat contains information about an uplink's returned bandwidth agreement
|
||||
type UplinkStat struct {
|
||||
NodeID storj.NodeID
|
||||
TotalBytes int64
|
||||
PutActionCount int
|
||||
GetActionCount int
|
||||
TotalTransactions int
|
||||
}
|
||||
|
||||
// DB stores bandwidth agreements.
|
||||
type DB interface {
|
||||
// CreateAgreement adds a new bandwidth agreement.
|
||||
CreateAgreement(context.Context, *pb.RenterBandwidthAllocation) error
|
||||
// GetAgreements gets all bandwidth agreements.
|
||||
GetAgreements(context.Context) ([]Agreement, error)
|
||||
// GetAgreementsSince gets all bandwidth agreements since specific time.
|
||||
GetAgreementsSince(context.Context, time.Time) ([]Agreement, error)
|
||||
// GetTotalsSince returns the sum of each bandwidth type after (exluding) a given date range
|
||||
GetTotals(context.Context, time.Time, time.Time) (map[storj.NodeID][]int64, error)
|
||||
//GetTotals returns stats about an uplink
|
||||
GetUplinkStats(context.Context, time.Time, time.Time) ([]UplinkStat, error)
|
||||
}
|
||||
|
||||
// Server is an implementation of the pb.BandwidthServer interface
|
||||
@ -45,12 +55,6 @@ type Server struct {
|
||||
logger *zap.Logger
|
||||
}
|
||||
|
||||
// Agreement is a struct that contains a bandwidth agreement and the associated signature
|
||||
type Agreement struct {
|
||||
Agreement pb.RenterBandwidthAllocation
|
||||
CreatedAt time.Time
|
||||
}
|
||||
|
||||
// NewServer creates instance of Server
|
||||
func NewServer(db DB, logger *zap.Logger, nodeID storj.NodeID) *Server {
|
||||
// TODO: reorder arguments, rename logger -> log
|
||||
@ -71,11 +75,11 @@ func (s *Server) BandwidthAgreements(ctx context.Context, rba *pb.RenterBandwidt
|
||||
//verify message content
|
||||
pi, err := identity.PeerIdentityFromContext(ctx)
|
||||
if err != nil || rba.StorageNodeId != pi.ID {
|
||||
return reply, auth.ErrBadID.New("Storage Node ID: %s vs %s", rba.StorageNodeId, pi.ID)
|
||||
return reply, auth.ErrBadID.New("Storage Node ID: %v vs %v", rba.StorageNodeId, pi.ID)
|
||||
}
|
||||
//todo: use whitelist for uplinks?
|
||||
if pba.SatelliteId != s.NodeID {
|
||||
return reply, pb.ErrPayer.New("Satellite ID: %s vs %s", pba.SatelliteId, s.NodeID)
|
||||
return reply, pb.ErrPayer.New("Satellite ID: %v vs %v", pba.SatelliteId, s.NodeID)
|
||||
}
|
||||
exp := time.Unix(pba.GetExpirationUnixSec(), 0).UTC()
|
||||
if exp.Before(time.Now().UTC()) {
|
||||
@ -90,10 +94,13 @@ func (s *Server) BandwidthAgreements(ctx context.Context, rba *pb.RenterBandwidt
|
||||
}
|
||||
|
||||
//save and return rersults
|
||||
err = s.db.CreateAgreement(ctx, rba)
|
||||
if err != nil {
|
||||
//todo: better classify transport errors (AgreementsSummary_FAIL) vs logical (AgreementsSummary_REJECTED)
|
||||
return reply, pb.ErrPayer.Wrap(auth.ErrSerial.Wrap(err))
|
||||
if err = s.db.CreateAgreement(ctx, rba); err != nil {
|
||||
if strings.Contains(err.Error(), "UNIQUE constraint failed") ||
|
||||
strings.Contains(err.Error(), "violates unique constraint") {
|
||||
return reply, pb.ErrPayer.Wrap(auth.ErrSerial.Wrap(err))
|
||||
}
|
||||
reply.Status = pb.AgreementsSummary_FAIL
|
||||
return reply, pb.ErrPayer.Wrap(err)
|
||||
}
|
||||
reply.Status = pb.AgreementsSummary_OK
|
||||
s.logger.Debug("Stored Agreement...")
|
||||
|
@ -112,7 +112,7 @@ func testDatabase(ctx context.Context, t *testing.T, bwdb bwagreement.DB) {
|
||||
assert.NoError(t, err)
|
||||
|
||||
reply, err := satellite.BandwidthAgreements(ctxSN1, rbaNode1)
|
||||
assert.True(t, auth.ErrSerial.Has(err))
|
||||
assert.True(t, auth.ErrSerial.Has(err), err.Error())
|
||||
assert.Equal(t, pb.AgreementsSummary_REJECTED, reply.Status)
|
||||
}
|
||||
|
||||
|
@ -52,7 +52,7 @@ func (as *AgreementSender) Run(ctx context.Context) error {
|
||||
continue
|
||||
}
|
||||
for satellite, agreements := range agreementGroups {
|
||||
as.sendAgreementsToSatellite(ctx, satellite, agreements)
|
||||
as.SendAgreementsToSatellite(ctx, satellite, agreements)
|
||||
}
|
||||
select {
|
||||
case <-ticker.C:
|
||||
@ -62,7 +62,8 @@ func (as *AgreementSender) Run(ctx context.Context) error {
|
||||
}
|
||||
}
|
||||
|
||||
func (as *AgreementSender) sendAgreementsToSatellite(ctx context.Context, satID storj.NodeID, agreements []*psdb.Agreement) {
|
||||
//SendAgreementsToSatellite uploads agreements to the satellite
|
||||
func (as *AgreementSender) SendAgreementsToSatellite(ctx context.Context, satID storj.NodeID, agreements []*psdb.Agreement) {
|
||||
as.log.Info("Sending agreements to satellite", zap.Int("number of agreements", len(agreements)), zap.String("satellite id", satID.String()))
|
||||
// todo: cache kad responses if this interval is very small
|
||||
// Get satellite ip from kademlia
|
||||
|
@ -7,6 +7,8 @@ import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/zeebo/errs"
|
||||
|
||||
"storj.io/storj/pkg/accounting"
|
||||
"storj.io/storj/pkg/storj"
|
||||
"storj.io/storj/pkg/utils"
|
||||
@ -18,18 +20,31 @@ type accountingDB struct {
|
||||
db *dbx.DB
|
||||
}
|
||||
|
||||
// LastRawTime records the greatest last tallied time
|
||||
func (db *accountingDB) LastRawTime(ctx context.Context, timestampType string) (time.Time, bool, error) {
|
||||
lastTally, err := db.db.Find_AccountingTimestamps_Value_By_Name(ctx, dbx.AccountingTimestamps_Name(timestampType))
|
||||
if lastTally == nil {
|
||||
return time.Time{}, true, err
|
||||
// LastTimestamp records the greatest last tallied time
|
||||
func (db *accountingDB) LastTimestamp(ctx context.Context, timestampType string) (last time.Time, err error) {
|
||||
// todo: use WithTx https://github.com/spacemonkeygo/dbx#transactions
|
||||
tx, err := db.db.Open(ctx)
|
||||
if err != nil {
|
||||
return last, Error.Wrap(err)
|
||||
}
|
||||
return lastTally.Value, false, err
|
||||
defer func() {
|
||||
if err == nil {
|
||||
err = tx.Commit()
|
||||
} else {
|
||||
err = errs.Combine(err, tx.Rollback())
|
||||
}
|
||||
}()
|
||||
lastTally, err := tx.Find_AccountingTimestamps_Value_By_Name(ctx, dbx.AccountingTimestamps_Name(timestampType))
|
||||
if lastTally == nil {
|
||||
update := dbx.AccountingTimestamps_Value(time.Time{})
|
||||
_, err = tx.Create_AccountingTimestamps(ctx, dbx.AccountingTimestamps_Name(timestampType), update)
|
||||
return time.Time{}, err
|
||||
}
|
||||
return lastTally.Value, err
|
||||
}
|
||||
|
||||
// SaveBWRaw records granular tallies (sums of bw agreement values) to the database
|
||||
// and updates the LastRawTime
|
||||
func (db *accountingDB) SaveBWRaw(ctx context.Context, latestBwa time.Time, isNew bool, bwTotals accounting.BWTally) (err error) {
|
||||
// SaveBWRaw records granular tallies (sums of bw agreement values) to the database and updates the LastTimestamp
|
||||
func (db *accountingDB) SaveBWRaw(ctx context.Context, tallyEnd time.Time, bwTotals map[storj.NodeID][]int64) (err error) {
|
||||
// We use the latest bandwidth agreement value of a batch of records as the start of the next batch
|
||||
// todo: consider finding the sum of bwagreements using SQL sum() direct against the bwa table
|
||||
if len(bwTotals) == 0 {
|
||||
@ -48,11 +63,11 @@ func (db *accountingDB) SaveBWRaw(ctx context.Context, latestBwa time.Time, isNe
|
||||
}
|
||||
}()
|
||||
//create a granular record per node id
|
||||
for actionType, bwActionTotals := range bwTotals {
|
||||
for k, v := range bwActionTotals {
|
||||
nID := dbx.AccountingRaw_NodeId(k.Bytes())
|
||||
end := dbx.AccountingRaw_IntervalEndTime(latestBwa)
|
||||
total := dbx.AccountingRaw_DataTotal(float64(v))
|
||||
for nodeID, totals := range bwTotals {
|
||||
for actionType, total := range totals {
|
||||
nID := dbx.AccountingRaw_NodeId(nodeID.Bytes())
|
||||
end := dbx.AccountingRaw_IntervalEndTime(tallyEnd)
|
||||
total := dbx.AccountingRaw_DataTotal(float64(total))
|
||||
dataType := dbx.AccountingRaw_DataType(actionType)
|
||||
_, err = tx.Create_AccountingRaw(ctx, nID, end, total, dataType)
|
||||
if err != nil {
|
||||
@ -61,19 +76,13 @@ func (db *accountingDB) SaveBWRaw(ctx context.Context, latestBwa time.Time, isNe
|
||||
}
|
||||
}
|
||||
//save this batch's greatest time
|
||||
|
||||
if isNew {
|
||||
update := dbx.AccountingTimestamps_Value(latestBwa)
|
||||
_, err = tx.Create_AccountingTimestamps(ctx, dbx.AccountingTimestamps_Name(accounting.LastBandwidthTally), update)
|
||||
} else {
|
||||
update := dbx.AccountingTimestamps_Update_Fields{Value: dbx.AccountingTimestamps_Value(latestBwa)}
|
||||
_, err = tx.Update_AccountingTimestamps_By_Name(ctx, dbx.AccountingTimestamps_Name(accounting.LastBandwidthTally), update)
|
||||
}
|
||||
update := dbx.AccountingTimestamps_Update_Fields{Value: dbx.AccountingTimestamps_Value(tallyEnd)}
|
||||
_, err = tx.Update_AccountingTimestamps_By_Name(ctx, dbx.AccountingTimestamps_Name(accounting.LastBandwidthTally), update)
|
||||
return err
|
||||
}
|
||||
|
||||
// SaveAtRestRaw records raw tallies of at rest data to the database
|
||||
func (db *accountingDB) SaveAtRestRaw(ctx context.Context, latestTally time.Time, isNew bool, nodeData map[storj.NodeID]float64) error {
|
||||
func (db *accountingDB) SaveAtRestRaw(ctx context.Context, latestTally time.Time, nodeData map[storj.NodeID]float64) error {
|
||||
if len(nodeData) == 0 {
|
||||
return Error.New("In SaveAtRestRaw with empty nodeData")
|
||||
}
|
||||
@ -98,14 +107,8 @@ func (db *accountingDB) SaveAtRestRaw(ctx context.Context, latestTally time.Time
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
}
|
||||
|
||||
if isNew {
|
||||
update := dbx.AccountingTimestamps_Value(latestTally)
|
||||
_, err = tx.Create_AccountingTimestamps(ctx, dbx.AccountingTimestamps_Name(accounting.LastAtRestTally), update)
|
||||
} else {
|
||||
update := dbx.AccountingTimestamps_Update_Fields{Value: dbx.AccountingTimestamps_Value(latestTally)}
|
||||
_, err = tx.Update_AccountingTimestamps_By_Name(ctx, dbx.AccountingTimestamps_Name(accounting.LastAtRestTally), update)
|
||||
}
|
||||
update := dbx.AccountingTimestamps_Update_Fields{Value: dbx.AccountingTimestamps_Value(latestTally)}
|
||||
_, err = tx.Update_AccountingTimestamps_By_Name(ctx, dbx.AccountingTimestamps_Name(accounting.LastAtRestTally), update)
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
|
||||
@ -152,7 +155,7 @@ func (db *accountingDB) GetRawSince(ctx context.Context, latestRollup time.Time)
|
||||
}
|
||||
|
||||
// SaveRollup records raw tallies of at rest data to the database
|
||||
func (db *accountingDB) SaveRollup(ctx context.Context, latestRollup time.Time, isNew bool, stats accounting.RollupStats) error {
|
||||
func (db *accountingDB) SaveRollup(ctx context.Context, latestRollup time.Time, stats accounting.RollupStats) error {
|
||||
if len(stats) == 0 {
|
||||
return Error.New("In SaveRollup with empty nodeData")
|
||||
}
|
||||
@ -183,13 +186,8 @@ func (db *accountingDB) SaveRollup(ctx context.Context, latestRollup time.Time,
|
||||
}
|
||||
}
|
||||
}
|
||||
if isNew {
|
||||
update := dbx.AccountingTimestamps_Value(latestRollup)
|
||||
_, err = tx.Create_AccountingTimestamps(ctx, dbx.AccountingTimestamps_Name(accounting.LastRollup), update)
|
||||
} else {
|
||||
update := dbx.AccountingTimestamps_Update_Fields{Value: dbx.AccountingTimestamps_Value(latestRollup)}
|
||||
_, err = tx.Update_AccountingTimestamps_By_Name(ctx, dbx.AccountingTimestamps_Name(accounting.LastRollup), update)
|
||||
}
|
||||
update := dbx.AccountingTimestamps_Update_Fields{Value: dbx.AccountingTimestamps_Value(latestRollup)}
|
||||
_, err = tx.Update_AccountingTimestamps_By_Name(ctx, dbx.AccountingTimestamps_Name(accounting.LastRollup), update)
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
|
||||
|
@ -5,12 +5,14 @@ package satellitedb
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/golang/protobuf/proto"
|
||||
"github.com/zeebo/errs"
|
||||
|
||||
"storj.io/storj/pkg/bwagreement"
|
||||
"storj.io/storj/pkg/pb"
|
||||
"storj.io/storj/pkg/storj"
|
||||
dbx "storj.io/storj/satellite/satellitedb/dbx"
|
||||
)
|
||||
|
||||
@ -18,17 +20,13 @@ type bandwidthagreement struct {
|
||||
db *dbx.DB
|
||||
}
|
||||
|
||||
func (b *bandwidthagreement) CreateAgreement(ctx context.Context, rba *pb.RenterBandwidthAllocation) error {
|
||||
rbaBytes, err := proto.Marshal(rba)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
func (b *bandwidthagreement) CreateAgreement(ctx context.Context, rba *pb.RenterBandwidthAllocation) (err error) {
|
||||
expiration := time.Unix(rba.PayerAllocation.ExpirationUnixSec, 0)
|
||||
_, err = b.db.Create_Bwagreement(
|
||||
ctx,
|
||||
dbx.Bwagreement_Serialnum(rba.PayerAllocation.SerialNumber+rba.StorageNodeId.String()),
|
||||
dbx.Bwagreement_Data(rbaBytes),
|
||||
dbx.Bwagreement_StorageNode(rba.StorageNodeId.Bytes()),
|
||||
dbx.Bwagreement_StorageNodeId(rba.StorageNodeId.Bytes()),
|
||||
dbx.Bwagreement_UplinkId(rba.PayerAllocation.UplinkId.Bytes()),
|
||||
dbx.Bwagreement_Action(int64(rba.PayerAllocation.Action)),
|
||||
dbx.Bwagreement_Total(rba.Total),
|
||||
dbx.Bwagreement_ExpiresAt(expiration),
|
||||
@ -36,43 +34,71 @@ func (b *bandwidthagreement) CreateAgreement(ctx context.Context, rba *pb.Renter
|
||||
return err
|
||||
}
|
||||
|
||||
func (b *bandwidthagreement) GetAgreements(ctx context.Context) ([]bwagreement.Agreement, error) {
|
||||
rows, err := b.db.All_Bwagreement(ctx)
|
||||
//GetTotals returns stats about an uplink
|
||||
func (b *bandwidthagreement) GetUplinkStats(ctx context.Context, from, to time.Time) (stats []bwagreement.UplinkStat, err error) {
|
||||
|
||||
var uplinkSQL = fmt.Sprintf(`SELECT uplink_id, SUM(total),
|
||||
COUNT(CASE WHEN action = %d THEN total ELSE null END),
|
||||
COUNT(CASE WHEN action = %d THEN total ELSE null END), COUNT(*)
|
||||
FROM bwagreements WHERE created_at > ?
|
||||
AND created_at <= ? GROUP BY uplink_id ORDER BY uplink_id`,
|
||||
pb.BandwidthAction_PUT, pb.BandwidthAction_GET)
|
||||
rows, err := b.db.DB.Query(b.db.Rebind(uplinkSQL), from, to)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
agreements := make([]bwagreement.Agreement, len(rows))
|
||||
for i, entry := range rows {
|
||||
rba := pb.RenterBandwidthAllocation{}
|
||||
err := proto.Unmarshal(entry.Data, &rba)
|
||||
defer func() { err = errs.Combine(err, rows.Close()) }()
|
||||
for rows.Next() {
|
||||
var nodeID []byte
|
||||
stat := bwagreement.UplinkStat{}
|
||||
err := rows.Scan(&nodeID, &stat.TotalBytes, &stat.PutActionCount, &stat.GetActionCount, &stat.TotalTransactions)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return stats, err
|
||||
}
|
||||
agreement := &agreements[i]
|
||||
agreement.Agreement = rba
|
||||
agreement.CreatedAt = entry.CreatedAt
|
||||
id, err := storj.NodeIDFromBytes(nodeID)
|
||||
if err != nil {
|
||||
return stats, err
|
||||
}
|
||||
stat.NodeID = id
|
||||
stats = append(stats, stat)
|
||||
}
|
||||
return agreements, nil
|
||||
return stats, nil
|
||||
}
|
||||
|
||||
func (b *bandwidthagreement) GetAgreementsSince(ctx context.Context, since time.Time) ([]bwagreement.Agreement, error) {
|
||||
rows, err := b.db.All_Bwagreement_By_CreatedAt_Greater(ctx, dbx.Bwagreement_CreatedAt(since))
|
||||
//GetTotals returns the sum of each bandwidth type after (exluding) a given date range
|
||||
func (b *bandwidthagreement) GetTotals(ctx context.Context, from, to time.Time) (bwa map[storj.NodeID][]int64, err error) {
|
||||
var getTotalsSQL = fmt.Sprintf(`SELECT storage_node_id,
|
||||
SUM(CASE WHEN action = %d THEN total ELSE 0 END),
|
||||
SUM(CASE WHEN action = %d THEN total ELSE 0 END),
|
||||
SUM(CASE WHEN action = %d THEN total ELSE 0 END),
|
||||
SUM(CASE WHEN action = %d THEN total ELSE 0 END),
|
||||
SUM(CASE WHEN action = %d THEN total ELSE 0 END)
|
||||
FROM bwagreements WHERE created_at > ? AND created_at <= ?
|
||||
GROUP BY storage_node_id ORDER BY storage_node_id`, pb.BandwidthAction_PUT,
|
||||
pb.BandwidthAction_GET, pb.BandwidthAction_GET_AUDIT,
|
||||
pb.BandwidthAction_GET_REPAIR, pb.BandwidthAction_PUT_REPAIR)
|
||||
rows, err := b.db.DB.Query(b.db.Rebind(getTotalsSQL), from, to)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer func() { err = errs.Combine(err, rows.Close()) }()
|
||||
|
||||
agreements := make([]bwagreement.Agreement, len(rows))
|
||||
for i, entry := range rows {
|
||||
rba := pb.RenterBandwidthAllocation{}
|
||||
err := proto.Unmarshal(entry.Data, &rba)
|
||||
totals := make(map[storj.NodeID][]int64)
|
||||
for i := 0; rows.Next(); i++ {
|
||||
var nodeID []byte
|
||||
data := make([]int64, len(pb.BandwidthAction_value))
|
||||
err := rows.Scan(&nodeID, &data[pb.BandwidthAction_PUT], &data[pb.BandwidthAction_GET],
|
||||
&data[pb.BandwidthAction_GET_AUDIT], &data[pb.BandwidthAction_GET_REPAIR], &data[pb.BandwidthAction_PUT_REPAIR])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return totals, err
|
||||
}
|
||||
agreement := &agreements[i]
|
||||
agreement.Agreement = rba
|
||||
agreement.CreatedAt = entry.CreatedAt
|
||||
id, err := storj.NodeIDFromBytes(nodeID)
|
||||
if err != nil {
|
||||
return totals, err
|
||||
}
|
||||
totals[id] = data
|
||||
}
|
||||
return agreements, nil
|
||||
return totals, nil
|
||||
}
|
||||
|
||||
func (b *bandwidthagreement) DeletePaidAndExpired(ctx context.Context) error {
|
||||
|
@ -5,32 +5,18 @@
|
||||
model bwagreement (
|
||||
key serialnum
|
||||
|
||||
field serialnum text
|
||||
field data blob
|
||||
field storage_node blob
|
||||
field action int64
|
||||
field total int64
|
||||
field created_at timestamp ( autoinsert )
|
||||
field expires_at timestamp
|
||||
field serialnum text
|
||||
field storage_node_id blob
|
||||
field uplink_id blob
|
||||
field action int64
|
||||
field total int64
|
||||
field created_at timestamp ( autoinsert )
|
||||
field expires_at timestamp
|
||||
)
|
||||
|
||||
create bwagreement ( )
|
||||
delete bwagreement ( where bwagreement.serialnum = ? )
|
||||
delete bwagreement ( where bwagreement.expires_at <= ?)
|
||||
|
||||
read one (
|
||||
select bwagreement
|
||||
where bwagreement.serialnum = ?
|
||||
)
|
||||
|
||||
read limitoffset (
|
||||
select bwagreement
|
||||
)
|
||||
|
||||
read all (
|
||||
select bwagreement
|
||||
)
|
||||
|
||||
read limitoffset ( select bwagreement)
|
||||
read all ( select bwagreement)
|
||||
read all (
|
||||
select bwagreement
|
||||
where bwagreement.created_at > ?
|
||||
|
@ -301,8 +301,8 @@ CREATE TABLE accounting_timestamps (
|
||||
);
|
||||
CREATE TABLE bwagreements (
|
||||
serialnum text NOT NULL,
|
||||
data bytea NOT NULL,
|
||||
storage_node bytea NOT NULL,
|
||||
storage_node_id bytea NOT NULL,
|
||||
uplink_id bytea NOT NULL,
|
||||
action bigint NOT NULL,
|
||||
total bigint NOT NULL,
|
||||
created_at timestamp with time zone NOT NULL,
|
||||
@ -483,8 +483,8 @@ CREATE TABLE accounting_timestamps (
|
||||
);
|
||||
CREATE TABLE bwagreements (
|
||||
serialnum TEXT NOT NULL,
|
||||
data BLOB NOT NULL,
|
||||
storage_node BLOB NOT NULL,
|
||||
storage_node_id BLOB NOT NULL,
|
||||
uplink_id BLOB NOT NULL,
|
||||
action INTEGER NOT NULL,
|
||||
total INTEGER NOT NULL,
|
||||
created_at TIMESTAMP NOT NULL,
|
||||
@ -1002,13 +1002,13 @@ func (f AccountingTimestamps_Value_Field) value() interface{} {
|
||||
func (AccountingTimestamps_Value_Field) _Column() string { return "value" }
|
||||
|
||||
type Bwagreement struct {
|
||||
Serialnum string
|
||||
Data []byte
|
||||
StorageNode []byte
|
||||
Action int64
|
||||
Total int64
|
||||
CreatedAt time.Time
|
||||
ExpiresAt time.Time
|
||||
Serialnum string
|
||||
StorageNodeId []byte
|
||||
UplinkId []byte
|
||||
Action int64
|
||||
Total int64
|
||||
CreatedAt time.Time
|
||||
ExpiresAt time.Time
|
||||
}
|
||||
|
||||
func (Bwagreement) _Table() string { return "bwagreements" }
|
||||
@ -1035,43 +1035,43 @@ func (f Bwagreement_Serialnum_Field) value() interface{} {
|
||||
|
||||
func (Bwagreement_Serialnum_Field) _Column() string { return "serialnum" }
|
||||
|
||||
type Bwagreement_Data_Field struct {
|
||||
type Bwagreement_StorageNodeId_Field struct {
|
||||
_set bool
|
||||
_null bool
|
||||
_value []byte
|
||||
}
|
||||
|
||||
func Bwagreement_Data(v []byte) Bwagreement_Data_Field {
|
||||
return Bwagreement_Data_Field{_set: true, _value: v}
|
||||
func Bwagreement_StorageNodeId(v []byte) Bwagreement_StorageNodeId_Field {
|
||||
return Bwagreement_StorageNodeId_Field{_set: true, _value: v}
|
||||
}
|
||||
|
||||
func (f Bwagreement_Data_Field) value() interface{} {
|
||||
func (f Bwagreement_StorageNodeId_Field) value() interface{} {
|
||||
if !f._set || f._null {
|
||||
return nil
|
||||
}
|
||||
return f._value
|
||||
}
|
||||
|
||||
func (Bwagreement_Data_Field) _Column() string { return "data" }
|
||||
func (Bwagreement_StorageNodeId_Field) _Column() string { return "storage_node_id" }
|
||||
|
||||
type Bwagreement_StorageNode_Field struct {
|
||||
type Bwagreement_UplinkId_Field struct {
|
||||
_set bool
|
||||
_null bool
|
||||
_value []byte
|
||||
}
|
||||
|
||||
func Bwagreement_StorageNode(v []byte) Bwagreement_StorageNode_Field {
|
||||
return Bwagreement_StorageNode_Field{_set: true, _value: v}
|
||||
func Bwagreement_UplinkId(v []byte) Bwagreement_UplinkId_Field {
|
||||
return Bwagreement_UplinkId_Field{_set: true, _value: v}
|
||||
}
|
||||
|
||||
func (f Bwagreement_StorageNode_Field) value() interface{} {
|
||||
func (f Bwagreement_UplinkId_Field) value() interface{} {
|
||||
if !f._set || f._null {
|
||||
return nil
|
||||
}
|
||||
return f._value
|
||||
}
|
||||
|
||||
func (Bwagreement_StorageNode_Field) _Column() string { return "storage_node" }
|
||||
func (Bwagreement_UplinkId_Field) _Column() string { return "uplink_id" }
|
||||
|
||||
type Bwagreement_Action_Field struct {
|
||||
_set bool
|
||||
@ -2502,8 +2502,8 @@ type Value_Row struct {
|
||||
|
||||
func (obj *postgresImpl) Create_Bwagreement(ctx context.Context,
|
||||
bwagreement_serialnum Bwagreement_Serialnum_Field,
|
||||
bwagreement_data Bwagreement_Data_Field,
|
||||
bwagreement_storage_node Bwagreement_StorageNode_Field,
|
||||
bwagreement_storage_node_id Bwagreement_StorageNodeId_Field,
|
||||
bwagreement_uplink_id Bwagreement_UplinkId_Field,
|
||||
bwagreement_action Bwagreement_Action_Field,
|
||||
bwagreement_total Bwagreement_Total_Field,
|
||||
bwagreement_expires_at Bwagreement_ExpiresAt_Field) (
|
||||
@ -2511,20 +2511,20 @@ func (obj *postgresImpl) Create_Bwagreement(ctx context.Context,
|
||||
|
||||
__now := obj.db.Hooks.Now().UTC()
|
||||
__serialnum_val := bwagreement_serialnum.value()
|
||||
__data_val := bwagreement_data.value()
|
||||
__storage_node_val := bwagreement_storage_node.value()
|
||||
__storage_node_id_val := bwagreement_storage_node_id.value()
|
||||
__uplink_id_val := bwagreement_uplink_id.value()
|
||||
__action_val := bwagreement_action.value()
|
||||
__total_val := bwagreement_total.value()
|
||||
__created_at_val := __now
|
||||
__expires_at_val := bwagreement_expires_at.value()
|
||||
|
||||
var __embed_stmt = __sqlbundle_Literal("INSERT INTO bwagreements ( serialnum, data, storage_node, action, total, created_at, expires_at ) VALUES ( ?, ?, ?, ?, ?, ?, ? ) RETURNING bwagreements.serialnum, bwagreements.data, bwagreements.storage_node, bwagreements.action, bwagreements.total, bwagreements.created_at, bwagreements.expires_at")
|
||||
var __embed_stmt = __sqlbundle_Literal("INSERT INTO bwagreements ( serialnum, storage_node_id, uplink_id, action, total, created_at, expires_at ) VALUES ( ?, ?, ?, ?, ?, ?, ? ) RETURNING bwagreements.serialnum, bwagreements.storage_node_id, bwagreements.uplink_id, bwagreements.action, bwagreements.total, bwagreements.created_at, bwagreements.expires_at")
|
||||
|
||||
var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt)
|
||||
obj.logStmt(__stmt, __serialnum_val, __data_val, __storage_node_val, __action_val, __total_val, __created_at_val, __expires_at_val)
|
||||
obj.logStmt(__stmt, __serialnum_val, __storage_node_id_val, __uplink_id_val, __action_val, __total_val, __created_at_val, __expires_at_val)
|
||||
|
||||
bwagreement = &Bwagreement{}
|
||||
err = obj.driver.QueryRow(__stmt, __serialnum_val, __data_val, __storage_node_val, __action_val, __total_val, __created_at_val, __expires_at_val).Scan(&bwagreement.Serialnum, &bwagreement.Data, &bwagreement.StorageNode, &bwagreement.Action, &bwagreement.Total, &bwagreement.CreatedAt, &bwagreement.ExpiresAt)
|
||||
err = obj.driver.QueryRow(__stmt, __serialnum_val, __storage_node_id_val, __uplink_id_val, __action_val, __total_val, __created_at_val, __expires_at_val).Scan(&bwagreement.Serialnum, &bwagreement.StorageNodeId, &bwagreement.UplinkId, &bwagreement.Action, &bwagreement.Total, &bwagreement.CreatedAt, &bwagreement.ExpiresAt)
|
||||
if err != nil {
|
||||
return nil, obj.makeErr(err)
|
||||
}
|
||||
@ -2874,32 +2874,11 @@ func (obj *postgresImpl) Create_BucketInfo(ctx context.Context,
|
||||
|
||||
}
|
||||
|
||||
func (obj *postgresImpl) Get_Bwagreement_By_Serialnum(ctx context.Context,
|
||||
bwagreement_serialnum Bwagreement_Serialnum_Field) (
|
||||
bwagreement *Bwagreement, err error) {
|
||||
|
||||
var __embed_stmt = __sqlbundle_Literal("SELECT bwagreements.serialnum, bwagreements.data, bwagreements.storage_node, bwagreements.action, bwagreements.total, bwagreements.created_at, bwagreements.expires_at FROM bwagreements WHERE bwagreements.serialnum = ?")
|
||||
|
||||
var __values []interface{}
|
||||
__values = append(__values, bwagreement_serialnum.value())
|
||||
|
||||
var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt)
|
||||
obj.logStmt(__stmt, __values...)
|
||||
|
||||
bwagreement = &Bwagreement{}
|
||||
err = obj.driver.QueryRow(__stmt, __values...).Scan(&bwagreement.Serialnum, &bwagreement.Data, &bwagreement.StorageNode, &bwagreement.Action, &bwagreement.Total, &bwagreement.CreatedAt, &bwagreement.ExpiresAt)
|
||||
if err != nil {
|
||||
return nil, obj.makeErr(err)
|
||||
}
|
||||
return bwagreement, nil
|
||||
|
||||
}
|
||||
|
||||
func (obj *postgresImpl) Limited_Bwagreement(ctx context.Context,
|
||||
limit int, offset int64) (
|
||||
rows []*Bwagreement, err error) {
|
||||
|
||||
var __embed_stmt = __sqlbundle_Literal("SELECT bwagreements.serialnum, bwagreements.data, bwagreements.storage_node, bwagreements.action, bwagreements.total, bwagreements.created_at, bwagreements.expires_at FROM bwagreements LIMIT ? OFFSET ?")
|
||||
var __embed_stmt = __sqlbundle_Literal("SELECT bwagreements.serialnum, bwagreements.storage_node_id, bwagreements.uplink_id, bwagreements.action, bwagreements.total, bwagreements.created_at, bwagreements.expires_at FROM bwagreements LIMIT ? OFFSET ?")
|
||||
|
||||
var __values []interface{}
|
||||
__values = append(__values)
|
||||
@ -2917,7 +2896,7 @@ func (obj *postgresImpl) Limited_Bwagreement(ctx context.Context,
|
||||
|
||||
for __rows.Next() {
|
||||
bwagreement := &Bwagreement{}
|
||||
err = __rows.Scan(&bwagreement.Serialnum, &bwagreement.Data, &bwagreement.StorageNode, &bwagreement.Action, &bwagreement.Total, &bwagreement.CreatedAt, &bwagreement.ExpiresAt)
|
||||
err = __rows.Scan(&bwagreement.Serialnum, &bwagreement.StorageNodeId, &bwagreement.UplinkId, &bwagreement.Action, &bwagreement.Total, &bwagreement.CreatedAt, &bwagreement.ExpiresAt)
|
||||
if err != nil {
|
||||
return nil, obj.makeErr(err)
|
||||
}
|
||||
@ -2933,7 +2912,7 @@ func (obj *postgresImpl) Limited_Bwagreement(ctx context.Context,
|
||||
func (obj *postgresImpl) All_Bwagreement(ctx context.Context) (
|
||||
rows []*Bwagreement, err error) {
|
||||
|
||||
var __embed_stmt = __sqlbundle_Literal("SELECT bwagreements.serialnum, bwagreements.data, bwagreements.storage_node, bwagreements.action, bwagreements.total, bwagreements.created_at, bwagreements.expires_at FROM bwagreements")
|
||||
var __embed_stmt = __sqlbundle_Literal("SELECT bwagreements.serialnum, bwagreements.storage_node_id, bwagreements.uplink_id, bwagreements.action, bwagreements.total, bwagreements.created_at, bwagreements.expires_at FROM bwagreements")
|
||||
|
||||
var __values []interface{}
|
||||
__values = append(__values)
|
||||
@ -2949,7 +2928,7 @@ func (obj *postgresImpl) All_Bwagreement(ctx context.Context) (
|
||||
|
||||
for __rows.Next() {
|
||||
bwagreement := &Bwagreement{}
|
||||
err = __rows.Scan(&bwagreement.Serialnum, &bwagreement.Data, &bwagreement.StorageNode, &bwagreement.Action, &bwagreement.Total, &bwagreement.CreatedAt, &bwagreement.ExpiresAt)
|
||||
err = __rows.Scan(&bwagreement.Serialnum, &bwagreement.StorageNodeId, &bwagreement.UplinkId, &bwagreement.Action, &bwagreement.Total, &bwagreement.CreatedAt, &bwagreement.ExpiresAt)
|
||||
if err != nil {
|
||||
return nil, obj.makeErr(err)
|
||||
}
|
||||
@ -2966,7 +2945,7 @@ func (obj *postgresImpl) All_Bwagreement_By_CreatedAt_Greater(ctx context.Contex
|
||||
bwagreement_created_at_greater Bwagreement_CreatedAt_Field) (
|
||||
rows []*Bwagreement, err error) {
|
||||
|
||||
var __embed_stmt = __sqlbundle_Literal("SELECT bwagreements.serialnum, bwagreements.data, bwagreements.storage_node, bwagreements.action, bwagreements.total, bwagreements.created_at, bwagreements.expires_at FROM bwagreements WHERE bwagreements.created_at > ?")
|
||||
var __embed_stmt = __sqlbundle_Literal("SELECT bwagreements.serialnum, bwagreements.storage_node_id, bwagreements.uplink_id, bwagreements.action, bwagreements.total, bwagreements.created_at, bwagreements.expires_at FROM bwagreements WHERE bwagreements.created_at > ?")
|
||||
|
||||
var __values []interface{}
|
||||
__values = append(__values, bwagreement_created_at_greater.value())
|
||||
@ -2982,7 +2961,7 @@ func (obj *postgresImpl) All_Bwagreement_By_CreatedAt_Greater(ctx context.Contex
|
||||
|
||||
for __rows.Next() {
|
||||
bwagreement := &Bwagreement{}
|
||||
err = __rows.Scan(&bwagreement.Serialnum, &bwagreement.Data, &bwagreement.StorageNode, &bwagreement.Action, &bwagreement.Total, &bwagreement.CreatedAt, &bwagreement.ExpiresAt)
|
||||
err = __rows.Scan(&bwagreement.Serialnum, &bwagreement.StorageNodeId, &bwagreement.UplinkId, &bwagreement.Action, &bwagreement.Total, &bwagreement.CreatedAt, &bwagreement.ExpiresAt)
|
||||
if err != nil {
|
||||
return nil, obj.makeErr(err)
|
||||
}
|
||||
@ -4143,58 +4122,6 @@ func (obj *postgresImpl) Update_ApiKey_By_Id(ctx context.Context,
|
||||
return api_key, nil
|
||||
}
|
||||
|
||||
func (obj *postgresImpl) Delete_Bwagreement_By_Serialnum(ctx context.Context,
|
||||
bwagreement_serialnum Bwagreement_Serialnum_Field) (
|
||||
deleted bool, err error) {
|
||||
|
||||
var __embed_stmt = __sqlbundle_Literal("DELETE FROM bwagreements WHERE bwagreements.serialnum = ?")
|
||||
|
||||
var __values []interface{}
|
||||
__values = append(__values, bwagreement_serialnum.value())
|
||||
|
||||
var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt)
|
||||
obj.logStmt(__stmt, __values...)
|
||||
|
||||
__res, err := obj.driver.Exec(__stmt, __values...)
|
||||
if err != nil {
|
||||
return false, obj.makeErr(err)
|
||||
}
|
||||
|
||||
__count, err := __res.RowsAffected()
|
||||
if err != nil {
|
||||
return false, obj.makeErr(err)
|
||||
}
|
||||
|
||||
return __count > 0, nil
|
||||
|
||||
}
|
||||
|
||||
func (obj *postgresImpl) Delete_Bwagreement_By_ExpiresAt_LessOrEqual(ctx context.Context,
|
||||
bwagreement_expires_at_less_or_equal Bwagreement_ExpiresAt_Field) (
|
||||
count int64, err error) {
|
||||
|
||||
var __embed_stmt = __sqlbundle_Literal("DELETE FROM bwagreements WHERE bwagreements.expires_at <= ?")
|
||||
|
||||
var __values []interface{}
|
||||
__values = append(__values, bwagreement_expires_at_less_or_equal.value())
|
||||
|
||||
var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt)
|
||||
obj.logStmt(__stmt, __values...)
|
||||
|
||||
__res, err := obj.driver.Exec(__stmt, __values...)
|
||||
if err != nil {
|
||||
return 0, obj.makeErr(err)
|
||||
}
|
||||
|
||||
count, err = __res.RowsAffected()
|
||||
if err != nil {
|
||||
return 0, obj.makeErr(err)
|
||||
}
|
||||
|
||||
return count, nil
|
||||
|
||||
}
|
||||
|
||||
func (obj *postgresImpl) Delete_Irreparabledb_By_Segmentpath(ctx context.Context,
|
||||
irreparabledb_segmentpath Irreparabledb_Segmentpath_Field) (
|
||||
deleted bool, err error) {
|
||||
@ -4632,8 +4559,8 @@ func (obj *postgresImpl) deleteAll(ctx context.Context) (count int64, err error)
|
||||
|
||||
func (obj *sqlite3Impl) Create_Bwagreement(ctx context.Context,
|
||||
bwagreement_serialnum Bwagreement_Serialnum_Field,
|
||||
bwagreement_data Bwagreement_Data_Field,
|
||||
bwagreement_storage_node Bwagreement_StorageNode_Field,
|
||||
bwagreement_storage_node_id Bwagreement_StorageNodeId_Field,
|
||||
bwagreement_uplink_id Bwagreement_UplinkId_Field,
|
||||
bwagreement_action Bwagreement_Action_Field,
|
||||
bwagreement_total Bwagreement_Total_Field,
|
||||
bwagreement_expires_at Bwagreement_ExpiresAt_Field) (
|
||||
@ -4641,19 +4568,19 @@ func (obj *sqlite3Impl) Create_Bwagreement(ctx context.Context,
|
||||
|
||||
__now := obj.db.Hooks.Now().UTC()
|
||||
__serialnum_val := bwagreement_serialnum.value()
|
||||
__data_val := bwagreement_data.value()
|
||||
__storage_node_val := bwagreement_storage_node.value()
|
||||
__storage_node_id_val := bwagreement_storage_node_id.value()
|
||||
__uplink_id_val := bwagreement_uplink_id.value()
|
||||
__action_val := bwagreement_action.value()
|
||||
__total_val := bwagreement_total.value()
|
||||
__created_at_val := __now
|
||||
__expires_at_val := bwagreement_expires_at.value()
|
||||
|
||||
var __embed_stmt = __sqlbundle_Literal("INSERT INTO bwagreements ( serialnum, data, storage_node, action, total, created_at, expires_at ) VALUES ( ?, ?, ?, ?, ?, ?, ? )")
|
||||
var __embed_stmt = __sqlbundle_Literal("INSERT INTO bwagreements ( serialnum, storage_node_id, uplink_id, action, total, created_at, expires_at ) VALUES ( ?, ?, ?, ?, ?, ?, ? )")
|
||||
|
||||
var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt)
|
||||
obj.logStmt(__stmt, __serialnum_val, __data_val, __storage_node_val, __action_val, __total_val, __created_at_val, __expires_at_val)
|
||||
obj.logStmt(__stmt, __serialnum_val, __storage_node_id_val, __uplink_id_val, __action_val, __total_val, __created_at_val, __expires_at_val)
|
||||
|
||||
__res, err := obj.driver.Exec(__stmt, __serialnum_val, __data_val, __storage_node_val, __action_val, __total_val, __created_at_val, __expires_at_val)
|
||||
__res, err := obj.driver.Exec(__stmt, __serialnum_val, __storage_node_id_val, __uplink_id_val, __action_val, __total_val, __created_at_val, __expires_at_val)
|
||||
if err != nil {
|
||||
return nil, obj.makeErr(err)
|
||||
}
|
||||
@ -5043,32 +4970,11 @@ func (obj *sqlite3Impl) Create_BucketInfo(ctx context.Context,
|
||||
|
||||
}
|
||||
|
||||
func (obj *sqlite3Impl) Get_Bwagreement_By_Serialnum(ctx context.Context,
|
||||
bwagreement_serialnum Bwagreement_Serialnum_Field) (
|
||||
bwagreement *Bwagreement, err error) {
|
||||
|
||||
var __embed_stmt = __sqlbundle_Literal("SELECT bwagreements.serialnum, bwagreements.data, bwagreements.storage_node, bwagreements.action, bwagreements.total, bwagreements.created_at, bwagreements.expires_at FROM bwagreements WHERE bwagreements.serialnum = ?")
|
||||
|
||||
var __values []interface{}
|
||||
__values = append(__values, bwagreement_serialnum.value())
|
||||
|
||||
var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt)
|
||||
obj.logStmt(__stmt, __values...)
|
||||
|
||||
bwagreement = &Bwagreement{}
|
||||
err = obj.driver.QueryRow(__stmt, __values...).Scan(&bwagreement.Serialnum, &bwagreement.Data, &bwagreement.StorageNode, &bwagreement.Action, &bwagreement.Total, &bwagreement.CreatedAt, &bwagreement.ExpiresAt)
|
||||
if err != nil {
|
||||
return nil, obj.makeErr(err)
|
||||
}
|
||||
return bwagreement, nil
|
||||
|
||||
}
|
||||
|
||||
func (obj *sqlite3Impl) Limited_Bwagreement(ctx context.Context,
|
||||
limit int, offset int64) (
|
||||
rows []*Bwagreement, err error) {
|
||||
|
||||
var __embed_stmt = __sqlbundle_Literal("SELECT bwagreements.serialnum, bwagreements.data, bwagreements.storage_node, bwagreements.action, bwagreements.total, bwagreements.created_at, bwagreements.expires_at FROM bwagreements LIMIT ? OFFSET ?")
|
||||
var __embed_stmt = __sqlbundle_Literal("SELECT bwagreements.serialnum, bwagreements.storage_node_id, bwagreements.uplink_id, bwagreements.action, bwagreements.total, bwagreements.created_at, bwagreements.expires_at FROM bwagreements LIMIT ? OFFSET ?")
|
||||
|
||||
var __values []interface{}
|
||||
__values = append(__values)
|
||||
@ -5086,7 +4992,7 @@ func (obj *sqlite3Impl) Limited_Bwagreement(ctx context.Context,
|
||||
|
||||
for __rows.Next() {
|
||||
bwagreement := &Bwagreement{}
|
||||
err = __rows.Scan(&bwagreement.Serialnum, &bwagreement.Data, &bwagreement.StorageNode, &bwagreement.Action, &bwagreement.Total, &bwagreement.CreatedAt, &bwagreement.ExpiresAt)
|
||||
err = __rows.Scan(&bwagreement.Serialnum, &bwagreement.StorageNodeId, &bwagreement.UplinkId, &bwagreement.Action, &bwagreement.Total, &bwagreement.CreatedAt, &bwagreement.ExpiresAt)
|
||||
if err != nil {
|
||||
return nil, obj.makeErr(err)
|
||||
}
|
||||
@ -5102,7 +5008,7 @@ func (obj *sqlite3Impl) Limited_Bwagreement(ctx context.Context,
|
||||
func (obj *sqlite3Impl) All_Bwagreement(ctx context.Context) (
|
||||
rows []*Bwagreement, err error) {
|
||||
|
||||
var __embed_stmt = __sqlbundle_Literal("SELECT bwagreements.serialnum, bwagreements.data, bwagreements.storage_node, bwagreements.action, bwagreements.total, bwagreements.created_at, bwagreements.expires_at FROM bwagreements")
|
||||
var __embed_stmt = __sqlbundle_Literal("SELECT bwagreements.serialnum, bwagreements.storage_node_id, bwagreements.uplink_id, bwagreements.action, bwagreements.total, bwagreements.created_at, bwagreements.expires_at FROM bwagreements")
|
||||
|
||||
var __values []interface{}
|
||||
__values = append(__values)
|
||||
@ -5118,7 +5024,7 @@ func (obj *sqlite3Impl) All_Bwagreement(ctx context.Context) (
|
||||
|
||||
for __rows.Next() {
|
||||
bwagreement := &Bwagreement{}
|
||||
err = __rows.Scan(&bwagreement.Serialnum, &bwagreement.Data, &bwagreement.StorageNode, &bwagreement.Action, &bwagreement.Total, &bwagreement.CreatedAt, &bwagreement.ExpiresAt)
|
||||
err = __rows.Scan(&bwagreement.Serialnum, &bwagreement.StorageNodeId, &bwagreement.UplinkId, &bwagreement.Action, &bwagreement.Total, &bwagreement.CreatedAt, &bwagreement.ExpiresAt)
|
||||
if err != nil {
|
||||
return nil, obj.makeErr(err)
|
||||
}
|
||||
@ -5135,7 +5041,7 @@ func (obj *sqlite3Impl) All_Bwagreement_By_CreatedAt_Greater(ctx context.Context
|
||||
bwagreement_created_at_greater Bwagreement_CreatedAt_Field) (
|
||||
rows []*Bwagreement, err error) {
|
||||
|
||||
var __embed_stmt = __sqlbundle_Literal("SELECT bwagreements.serialnum, bwagreements.data, bwagreements.storage_node, bwagreements.action, bwagreements.total, bwagreements.created_at, bwagreements.expires_at FROM bwagreements WHERE bwagreements.created_at > ?")
|
||||
var __embed_stmt = __sqlbundle_Literal("SELECT bwagreements.serialnum, bwagreements.storage_node_id, bwagreements.uplink_id, bwagreements.action, bwagreements.total, bwagreements.created_at, bwagreements.expires_at FROM bwagreements WHERE bwagreements.created_at > ?")
|
||||
|
||||
var __values []interface{}
|
||||
__values = append(__values, bwagreement_created_at_greater.value())
|
||||
@ -5151,7 +5057,7 @@ func (obj *sqlite3Impl) All_Bwagreement_By_CreatedAt_Greater(ctx context.Context
|
||||
|
||||
for __rows.Next() {
|
||||
bwagreement := &Bwagreement{}
|
||||
err = __rows.Scan(&bwagreement.Serialnum, &bwagreement.Data, &bwagreement.StorageNode, &bwagreement.Action, &bwagreement.Total, &bwagreement.CreatedAt, &bwagreement.ExpiresAt)
|
||||
err = __rows.Scan(&bwagreement.Serialnum, &bwagreement.StorageNodeId, &bwagreement.UplinkId, &bwagreement.Action, &bwagreement.Total, &bwagreement.CreatedAt, &bwagreement.ExpiresAt)
|
||||
if err != nil {
|
||||
return nil, obj.makeErr(err)
|
||||
}
|
||||
@ -6382,58 +6288,6 @@ func (obj *sqlite3Impl) Update_ApiKey_By_Id(ctx context.Context,
|
||||
return api_key, nil
|
||||
}
|
||||
|
||||
func (obj *sqlite3Impl) Delete_Bwagreement_By_Serialnum(ctx context.Context,
|
||||
bwagreement_serialnum Bwagreement_Serialnum_Field) (
|
||||
deleted bool, err error) {
|
||||
|
||||
var __embed_stmt = __sqlbundle_Literal("DELETE FROM bwagreements WHERE bwagreements.serialnum = ?")
|
||||
|
||||
var __values []interface{}
|
||||
__values = append(__values, bwagreement_serialnum.value())
|
||||
|
||||
var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt)
|
||||
obj.logStmt(__stmt, __values...)
|
||||
|
||||
__res, err := obj.driver.Exec(__stmt, __values...)
|
||||
if err != nil {
|
||||
return false, obj.makeErr(err)
|
||||
}
|
||||
|
||||
__count, err := __res.RowsAffected()
|
||||
if err != nil {
|
||||
return false, obj.makeErr(err)
|
||||
}
|
||||
|
||||
return __count > 0, nil
|
||||
|
||||
}
|
||||
|
||||
func (obj *sqlite3Impl) Delete_Bwagreement_By_ExpiresAt_LessOrEqual(ctx context.Context,
|
||||
bwagreement_expires_at_less_or_equal Bwagreement_ExpiresAt_Field) (
|
||||
count int64, err error) {
|
||||
|
||||
var __embed_stmt = __sqlbundle_Literal("DELETE FROM bwagreements WHERE bwagreements.expires_at <= ?")
|
||||
|
||||
var __values []interface{}
|
||||
__values = append(__values, bwagreement_expires_at_less_or_equal.value())
|
||||
|
||||
var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt)
|
||||
obj.logStmt(__stmt, __values...)
|
||||
|
||||
__res, err := obj.driver.Exec(__stmt, __values...)
|
||||
if err != nil {
|
||||
return 0, obj.makeErr(err)
|
||||
}
|
||||
|
||||
count, err = __res.RowsAffected()
|
||||
if err != nil {
|
||||
return 0, obj.makeErr(err)
|
||||
}
|
||||
|
||||
return count, nil
|
||||
|
||||
}
|
||||
|
||||
func (obj *sqlite3Impl) Delete_Irreparabledb_By_Segmentpath(ctx context.Context,
|
||||
irreparabledb_segmentpath Irreparabledb_Segmentpath_Field) (
|
||||
deleted bool, err error) {
|
||||
@ -6725,13 +6579,13 @@ func (obj *sqlite3Impl) getLastBwagreement(ctx context.Context,
|
||||
pk int64) (
|
||||
bwagreement *Bwagreement, err error) {
|
||||
|
||||
var __embed_stmt = __sqlbundle_Literal("SELECT bwagreements.serialnum, bwagreements.data, bwagreements.storage_node, bwagreements.action, bwagreements.total, bwagreements.created_at, bwagreements.expires_at FROM bwagreements WHERE _rowid_ = ?")
|
||||
var __embed_stmt = __sqlbundle_Literal("SELECT bwagreements.serialnum, bwagreements.storage_node_id, bwagreements.uplink_id, bwagreements.action, bwagreements.total, bwagreements.created_at, bwagreements.expires_at FROM bwagreements WHERE _rowid_ = ?")
|
||||
|
||||
var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt)
|
||||
obj.logStmt(__stmt, pk)
|
||||
|
||||
bwagreement = &Bwagreement{}
|
||||
err = obj.driver.QueryRow(__stmt, pk).Scan(&bwagreement.Serialnum, &bwagreement.Data, &bwagreement.StorageNode, &bwagreement.Action, &bwagreement.Total, &bwagreement.CreatedAt, &bwagreement.ExpiresAt)
|
||||
err = obj.driver.QueryRow(__stmt, pk).Scan(&bwagreement.Serialnum, &bwagreement.StorageNodeId, &bwagreement.UplinkId, &bwagreement.Action, &bwagreement.Total, &bwagreement.CreatedAt, &bwagreement.ExpiresAt)
|
||||
if err != nil {
|
||||
return nil, obj.makeErr(err)
|
||||
}
|
||||
@ -7339,8 +7193,8 @@ func (rx *Rx) Create_BucketInfo(ctx context.Context,
|
||||
|
||||
func (rx *Rx) Create_Bwagreement(ctx context.Context,
|
||||
bwagreement_serialnum Bwagreement_Serialnum_Field,
|
||||
bwagreement_data Bwagreement_Data_Field,
|
||||
bwagreement_storage_node Bwagreement_StorageNode_Field,
|
||||
bwagreement_storage_node_id Bwagreement_StorageNodeId_Field,
|
||||
bwagreement_uplink_id Bwagreement_UplinkId_Field,
|
||||
bwagreement_action Bwagreement_Action_Field,
|
||||
bwagreement_total Bwagreement_Total_Field,
|
||||
bwagreement_expires_at Bwagreement_ExpiresAt_Field) (
|
||||
@ -7349,7 +7203,7 @@ func (rx *Rx) Create_Bwagreement(ctx context.Context,
|
||||
if tx, err = rx.getTx(ctx); err != nil {
|
||||
return
|
||||
}
|
||||
return tx.Create_Bwagreement(ctx, bwagreement_serialnum, bwagreement_data, bwagreement_storage_node, bwagreement_action, bwagreement_total, bwagreement_expires_at)
|
||||
return tx.Create_Bwagreement(ctx, bwagreement_serialnum, bwagreement_storage_node_id, bwagreement_uplink_id, bwagreement_action, bwagreement_total, bwagreement_expires_at)
|
||||
|
||||
}
|
||||
|
||||
@ -7501,27 +7355,6 @@ func (rx *Rx) Delete_BucketInfo_By_Name(ctx context.Context,
|
||||
return tx.Delete_BucketInfo_By_Name(ctx, bucket_info_name)
|
||||
}
|
||||
|
||||
func (rx *Rx) Delete_Bwagreement_By_ExpiresAt_LessOrEqual(ctx context.Context,
|
||||
bwagreement_expires_at_less_or_equal Bwagreement_ExpiresAt_Field) (
|
||||
count int64, err error) {
|
||||
var tx *Tx
|
||||
if tx, err = rx.getTx(ctx); err != nil {
|
||||
return
|
||||
}
|
||||
return tx.Delete_Bwagreement_By_ExpiresAt_LessOrEqual(ctx, bwagreement_expires_at_less_or_equal)
|
||||
|
||||
}
|
||||
|
||||
func (rx *Rx) Delete_Bwagreement_By_Serialnum(ctx context.Context,
|
||||
bwagreement_serialnum Bwagreement_Serialnum_Field) (
|
||||
deleted bool, err error) {
|
||||
var tx *Tx
|
||||
if tx, err = rx.getTx(ctx); err != nil {
|
||||
return
|
||||
}
|
||||
return tx.Delete_Bwagreement_By_Serialnum(ctx, bwagreement_serialnum)
|
||||
}
|
||||
|
||||
func (rx *Rx) Delete_Injuredsegment_By_Id(ctx context.Context,
|
||||
injuredsegment_id Injuredsegment_Id_Field) (
|
||||
deleted bool, err error) {
|
||||
@ -7662,16 +7495,6 @@ func (rx *Rx) Get_BucketInfo_By_Name(ctx context.Context,
|
||||
return tx.Get_BucketInfo_By_Name(ctx, bucket_info_name)
|
||||
}
|
||||
|
||||
func (rx *Rx) Get_Bwagreement_By_Serialnum(ctx context.Context,
|
||||
bwagreement_serialnum Bwagreement_Serialnum_Field) (
|
||||
bwagreement *Bwagreement, err error) {
|
||||
var tx *Tx
|
||||
if tx, err = rx.getTx(ctx); err != nil {
|
||||
return
|
||||
}
|
||||
return tx.Get_Bwagreement_By_Serialnum(ctx, bwagreement_serialnum)
|
||||
}
|
||||
|
||||
func (rx *Rx) Get_Irreparabledb_By_Segmentpath(ctx context.Context,
|
||||
irreparabledb_segmentpath Irreparabledb_Segmentpath_Field) (
|
||||
irreparabledb *Irreparabledb, err error) {
|
||||
@ -7944,8 +7767,8 @@ type Methods interface {
|
||||
|
||||
Create_Bwagreement(ctx context.Context,
|
||||
bwagreement_serialnum Bwagreement_Serialnum_Field,
|
||||
bwagreement_data Bwagreement_Data_Field,
|
||||
bwagreement_storage_node Bwagreement_StorageNode_Field,
|
||||
bwagreement_storage_node_id Bwagreement_StorageNodeId_Field,
|
||||
bwagreement_uplink_id Bwagreement_UplinkId_Field,
|
||||
bwagreement_action Bwagreement_Action_Field,
|
||||
bwagreement_total Bwagreement_Total_Field,
|
||||
bwagreement_expires_at Bwagreement_ExpiresAt_Field) (
|
||||
@ -8026,14 +7849,6 @@ type Methods interface {
|
||||
bucket_info_name BucketInfo_Name_Field) (
|
||||
deleted bool, err error)
|
||||
|
||||
Delete_Bwagreement_By_ExpiresAt_LessOrEqual(ctx context.Context,
|
||||
bwagreement_expires_at_less_or_equal Bwagreement_ExpiresAt_Field) (
|
||||
count int64, err error)
|
||||
|
||||
Delete_Bwagreement_By_Serialnum(ctx context.Context,
|
||||
bwagreement_serialnum Bwagreement_Serialnum_Field) (
|
||||
deleted bool, err error)
|
||||
|
||||
Delete_Injuredsegment_By_Id(ctx context.Context,
|
||||
injuredsegment_id Injuredsegment_Id_Field) (
|
||||
deleted bool, err error)
|
||||
@ -8090,10 +7905,6 @@ type Methods interface {
|
||||
bucket_info_name BucketInfo_Name_Field) (
|
||||
bucket_info *BucketInfo, err error)
|
||||
|
||||
Get_Bwagreement_By_Serialnum(ctx context.Context,
|
||||
bwagreement_serialnum Bwagreement_Serialnum_Field) (
|
||||
bwagreement *Bwagreement, err error)
|
||||
|
||||
Get_Irreparabledb_By_Segmentpath(ctx context.Context,
|
||||
irreparabledb_segmentpath Irreparabledb_Segmentpath_Field) (
|
||||
irreparabledb *Irreparabledb, err error)
|
||||
|
@ -28,8 +28,8 @@ CREATE TABLE accounting_timestamps (
|
||||
);
|
||||
CREATE TABLE bwagreements (
|
||||
serialnum text NOT NULL,
|
||||
data bytea NOT NULL,
|
||||
storage_node bytea NOT NULL,
|
||||
storage_node_id bytea NOT NULL,
|
||||
uplink_id bytea NOT NULL,
|
||||
action bigint NOT NULL,
|
||||
total bigint NOT NULL,
|
||||
created_at timestamp with time zone NOT NULL,
|
||||
|
@ -28,8 +28,8 @@ CREATE TABLE accounting_timestamps (
|
||||
);
|
||||
CREATE TABLE bwagreements (
|
||||
serialnum TEXT NOT NULL,
|
||||
data BLOB NOT NULL,
|
||||
storage_node BLOB NOT NULL,
|
||||
storage_node_id BLOB NOT NULL,
|
||||
uplink_id BLOB NOT NULL,
|
||||
action INTEGER NOT NULL,
|
||||
total INTEGER NOT NULL,
|
||||
created_at TIMESTAMP NOT NULL,
|
||||
|
@ -62,11 +62,11 @@ func (m *lockedAccounting) GetRawSince(ctx context.Context, latestRollup time.Ti
|
||||
return m.db.GetRawSince(ctx, latestRollup)
|
||||
}
|
||||
|
||||
// LastRawTime records the latest last tallied time.
|
||||
func (m *lockedAccounting) LastRawTime(ctx context.Context, timestampType string) (time.Time, bool, error) {
|
||||
// LastTimestamp records the latest last tallied time.
|
||||
func (m *lockedAccounting) LastTimestamp(ctx context.Context, timestampType string) (time.Time, error) {
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
return m.db.LastRawTime(ctx, timestampType)
|
||||
return m.db.LastTimestamp(ctx, timestampType)
|
||||
}
|
||||
|
||||
// QueryPaymentInfo queries StatDB, Accounting Rollup on nodeID
|
||||
@ -77,24 +77,24 @@ func (m *lockedAccounting) QueryPaymentInfo(ctx context.Context, start time.Time
|
||||
}
|
||||
|
||||
// SaveAtRestRaw records raw tallies of at-rest-data.
|
||||
func (m *lockedAccounting) SaveAtRestRaw(ctx context.Context, latestTally time.Time, isNew bool, nodeData map[storj.NodeID]float64) error {
|
||||
func (m *lockedAccounting) SaveAtRestRaw(ctx context.Context, latestTally time.Time, nodeData map[storj.NodeID]float64) error {
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
return m.db.SaveAtRestRaw(ctx, latestTally, isNew, nodeData)
|
||||
return m.db.SaveAtRestRaw(ctx, latestTally, nodeData)
|
||||
}
|
||||
|
||||
// SaveBWRaw records raw sums of agreement values to the database and updates the LastRawTime.
|
||||
func (m *lockedAccounting) SaveBWRaw(ctx context.Context, latestBwa time.Time, isNew bool, bwTotals accounting.BWTally) error {
|
||||
// SaveBWRaw records raw sums of agreement values to the database and updates the LastTimestamp.
|
||||
func (m *lockedAccounting) SaveBWRaw(ctx context.Context, tallyEnd time.Time, bwTotals map[storj.NodeID][]int64) error {
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
return m.db.SaveBWRaw(ctx, latestBwa, isNew, bwTotals)
|
||||
return m.db.SaveBWRaw(ctx, tallyEnd, bwTotals)
|
||||
}
|
||||
|
||||
// SaveRollup records raw tallies of at rest data to the database
|
||||
func (m *lockedAccounting) SaveRollup(ctx context.Context, latestTally time.Time, isNew bool, stats accounting.RollupStats) error {
|
||||
func (m *lockedAccounting) SaveRollup(ctx context.Context, latestTally time.Time, stats accounting.RollupStats) error {
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
return m.db.SaveRollup(ctx, latestTally, isNew, stats)
|
||||
return m.db.SaveRollup(ctx, latestTally, stats)
|
||||
}
|
||||
|
||||
// BandwidthAgreement returns database for storing bandwidth agreements
|
||||
@ -117,18 +117,18 @@ func (m *lockedBandwidthAgreement) CreateAgreement(ctx context.Context, a1 *pb.R
|
||||
return m.db.CreateAgreement(ctx, a1)
|
||||
}
|
||||
|
||||
// GetAgreements gets all bandwidth agreements.
|
||||
func (m *lockedBandwidthAgreement) GetAgreements(ctx context.Context) ([]bwagreement.Agreement, error) {
|
||||
// GetTotalsSince returns the sum of each bandwidth type after (exluding) a given date range
|
||||
func (m *lockedBandwidthAgreement) GetTotals(ctx context.Context, a1 time.Time, a2 time.Time) (map[storj.NodeID][]int64, error) {
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
return m.db.GetAgreements(ctx)
|
||||
return m.db.GetTotals(ctx, a1, a2)
|
||||
}
|
||||
|
||||
// GetAgreementsSince gets all bandwidth agreements since specific time.
|
||||
func (m *lockedBandwidthAgreement) GetAgreementsSince(ctx context.Context, a1 time.Time) ([]bwagreement.Agreement, error) {
|
||||
// GetTotals returns stats about an uplink
|
||||
func (m *lockedBandwidthAgreement) GetUplinkStats(ctx context.Context, a1 time.Time, a2 time.Time) ([]bwagreement.UplinkStat, error) {
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
return m.db.GetAgreementsSince(ctx, a1)
|
||||
return m.db.GetUplinkStats(ctx, a1, a2)
|
||||
}
|
||||
|
||||
// Close closes the database
|
||||
|
Loading…
Reference in New Issue
Block a user