From 0f662b8e381b9c3cdc262a2434d1e32de386d696 Mon Sep 17 00:00:00 2001 From: Bill Thorp Date: Fri, 1 Feb 2019 13:50:12 -0500 Subject: [PATCH] sql based tally (#1166) * WIP * wacky changes * more * it builds.... IT BUILDS!!! * fixed SQL, broke out saving tallies for testing * shorter lines * fixed SQL, moved tally_test to testplanet * lint * WIP logic error preventing PUT and GETs to same serialnum * fixed BWA test * fixed temporary brain failure * eliminated magic numbers * cleaned up satellite uplink stats * use errs.Combine instead * thrashing * fixed tally erroneous error msg * fixed tally test * lint * SQL syntax attempt to fix * spelling error * made bwa db test resist old postgres data * postgres pk error msg hunting * postgres pk error msg hunting * postgres * err might be nil? * fixed error logging bug * hopefully solved postgres issue * using rebind * moved tests to _test package * fixing test dirs * finally made sense of Egons package name feedback * UTC, array fixes --- cmd/satellite/main.go | 47 +-- pkg/accounting/db.go | 16 +- pkg/accounting/rollup/rollup.go | 12 +- pkg/accounting/rollup/rollup_test.go | 4 +- pkg/accounting/tally/tally.go | 111 ++++--- pkg/accounting/tally/tally_test.go | 116 +++---- pkg/bwagreement/bwagreement_test.go | 80 +++++ pkg/bwagreement/server.go | 39 ++- pkg/bwagreement/server_test.go | 2 +- .../agreementsender/agreementsender.go | 5 +- satellite/satellitedb/accounting.go | 76 +++-- satellite/satellitedb/bandwidthagreement.go | 86 +++-- satellite/satellitedb/dbx/satellitedb.dbx | 32 +- satellite/satellitedb/dbx/satellitedb.dbx.go | 297 ++++-------------- .../dbx/satellitedb.dbx.postgres.sql | 4 +- .../dbx/satellitedb.dbx.sqlite3.sql | 4 +- satellite/satellitedb/locked.go | 32 +- 17 files changed, 409 insertions(+), 554 deletions(-) create mode 100644 pkg/bwagreement/bwagreement_test.go diff --git a/cmd/satellite/main.go b/cmd/satellite/main.go index 601d7c0f0..dbfaffc76 100644 --- a/cmd/satellite/main.go +++ b/cmd/satellite/main.go @@ -8,7 +8,6 @@ import ( "fmt" "os" "path/filepath" - "sort" "text/tabwriter" "time" @@ -18,9 +17,7 @@ import ( "storj.io/storj/internal/fpath" "storj.io/storj/pkg/cfgstruct" - "storj.io/storj/pkg/pb" "storj.io/storj/pkg/process" - "storj.io/storj/pkg/storj" "storj.io/storj/satellite" "storj.io/storj/satellite/satellitedb" ) @@ -196,58 +193,20 @@ func cmdDiag(cmd *cobra.Command, args []string) (err error) { }() //get all bandwidth agreements rows already ordered - baRows, err := database.BandwidthAgreement().GetAgreements(context.Background()) + stats, err := database.BandwidthAgreement().GetUplinkStats(context.Background(), time.Time{}, time.Now()) if err != nil { fmt.Printf("error reading satellite database %v: %v\n", diagCfg.Database, err) return err } - // Agreement is a struct that contains a bandwidth agreement and the associated signature - type UplinkSummary struct { - TotalBytes int64 - PutActionCount int64 - GetActionCount int64 - TotalTransactions int64 - // additional attributes add here ... 
- } - - // attributes per uplinkid - summaries := make(map[storj.NodeID]*UplinkSummary) - uplinkIDs := storj.NodeIDList{} - - for _, baRow := range baRows { - // deserializing rbad you get payerbwallocation, total & storage node id - rbad := baRow.Agreement - pbad := rbad.PayerAllocation - uplinkID := pbad.UplinkId - summary, ok := summaries[uplinkID] - if !ok { - summaries[uplinkID] = &UplinkSummary{} - uplinkIDs = append(uplinkIDs, uplinkID) - summary = summaries[uplinkID] - } - - // fill the summary info - summary.TotalBytes += rbad.Total - summary.TotalTransactions++ - switch pbad.GetAction() { - case pb.BandwidthAction_PUT: - summary.PutActionCount++ - case pb.BandwidthAction_GET: - summary.GetActionCount++ - } - } - // initialize the table header (fields) const padding = 3 w := tabwriter.NewWriter(os.Stdout, 0, 0, padding, ' ', tabwriter.AlignRight|tabwriter.Debug) fmt.Fprintln(w, "UplinkID\tTotal\t# Of Transactions\tPUT Action\tGET Action\t") // populate the row fields - sort.Sort(uplinkIDs) - for _, uplinkID := range uplinkIDs { - summary := summaries[uplinkID] - fmt.Fprint(w, uplinkID, "\t", summary.TotalBytes, "\t", summary.TotalTransactions, "\t", summary.PutActionCount, "\t", summary.GetActionCount, "\t\n") + for _, s := range stats { + fmt.Fprint(w, s.NodeID, "\t", s.TotalBytes, "\t", s.TotalTransactions, "\t", s.PutActionCount, "\t", s.GetActionCount, "\t\n") } // display the data diff --git a/pkg/accounting/db.go b/pkg/accounting/db.go index 30cdd8a42..910764727 100644 --- a/pkg/accounting/db.go +++ b/pkg/accounting/db.go @@ -7,13 +7,9 @@ import ( "context" "time" - "storj.io/storj/pkg/pb" "storj.io/storj/pkg/storj" ) -//BWTally is a convenience alias -type BWTally [pb.BandwidthAction_PUT_REPAIR + 1]map[storj.NodeID]int64 - //RollupStats is a convenience alias type RollupStats map[time.Time]map[storj.NodeID]*Rollup @@ -42,18 +38,18 @@ type Rollup struct { // DB stores information about bandwidth usage type DB interface { - // LastRawTime records the latest last tallied time. - LastRawTime(ctx context.Context, timestampType string) (time.Time, bool, error) - // SaveBWRaw records raw sums of agreement values to the database and updates the LastRawTime. - SaveBWRaw(ctx context.Context, latestBwa time.Time, isNew bool, bwTotals BWTally) error + // LastTimestamp records the latest last tallied time. + LastTimestamp(ctx context.Context, timestampType string) (time.Time, error) + // SaveBWRaw records raw sums of agreement values to the database and updates the LastTimestamp. + SaveBWRaw(ctx context.Context, tallyEnd time.Time, bwTotals map[storj.NodeID][]int64) error // SaveAtRestRaw records raw tallies of at-rest-data. 
- SaveAtRestRaw(ctx context.Context, latestTally time.Time, isNew bool, nodeData map[storj.NodeID]float64) error + SaveAtRestRaw(ctx context.Context, latestTally time.Time, nodeData map[storj.NodeID]float64) error // GetRaw retrieves all raw tallies GetRaw(ctx context.Context) ([]*Raw, error) // GetRawSince r retrieves all raw tallies sinces GetRawSince(ctx context.Context, latestRollup time.Time) ([]*Raw, error) // SaveRollup records raw tallies of at rest data to the database - SaveRollup(ctx context.Context, latestTally time.Time, isNew bool, stats RollupStats) error + SaveRollup(ctx context.Context, latestTally time.Time, stats RollupStats) error // QueryPaymentInfo queries StatDB, Accounting Rollup on nodeID QueryPaymentInfo(ctx context.Context, start time.Time, end time.Time) ([]*CSVRow, error) } diff --git a/pkg/accounting/rollup/rollup.go b/pkg/accounting/rollup/rollup.go index aee3051fb..d9d45dc2b 100644 --- a/pkg/accounting/rollup/rollup.go +++ b/pkg/accounting/rollup/rollup.go @@ -56,17 +56,11 @@ func (r *Rollup) Run(ctx context.Context) (err error) { func (r *Rollup) Query(ctx context.Context) error { // only Rollup new things - get LastRollup var latestTally time.Time - lastRollup, isNil, err := r.db.LastRawTime(ctx, accounting.LastRollup) + lastRollup, err := r.db.LastTimestamp(ctx, accounting.LastRollup) if err != nil { return Error.Wrap(err) } - var tallies []*accounting.Raw - if isNil { - r.logger.Info("Rollup found no existing raw tally data") - tallies, err = r.db.GetRaw(ctx) - } else { - tallies, err = r.db.GetRawSince(ctx, lastRollup) - } + tallies, err := r.db.GetRawSince(ctx, lastRollup) if err != nil { return Error.Wrap(err) } @@ -115,5 +109,5 @@ func (r *Rollup) Query(ctx context.Context) error { r.logger.Info("Rollup only found tallies for today") return nil } - return Error.Wrap(r.db.SaveRollup(ctx, latestTally, isNil, rollupStats)) + return Error.Wrap(r.db.SaveRollup(ctx, latestTally, rollupStats)) } diff --git a/pkg/accounting/rollup/rollup_test.go b/pkg/accounting/rollup/rollup_test.go index d5a69058a..6c2ab1cbe 100644 --- a/pkg/accounting/rollup/rollup_test.go +++ b/pkg/accounting/rollup/rollup_test.go @@ -27,7 +27,7 @@ func TestQueryOneDay(t *testing.T) { now := time.Now().UTC() later := now.Add(time.Hour * 24) - err := db.Accounting().SaveAtRestRaw(ctx, now, true, nodeData) + err := db.Accounting().SaveAtRestRaw(ctx, now, nodeData) assert.NoError(t, err) // test should return error because we delete latest day's rollup @@ -48,7 +48,7 @@ func TestQueryTwoDays(t *testing.T) { now := time.Now().UTC() then := now.Add(time.Hour * -24) - err := db.Accounting().SaveAtRestRaw(ctx, now, true, nodeData) + err := db.Accounting().SaveAtRestRaw(ctx, now, nodeData) assert.NoError(t, err) // db.db.Exec("UPDATE accounting_raws SET created_at= WHERE ") diff --git a/pkg/accounting/tally/tally.go b/pkg/accounting/tally/tally.go index d671ce0d5..c2a252eb1 100644 --- a/pkg/accounting/tally/tally.go +++ b/pkg/accounting/tally/tally.go @@ -8,6 +8,7 @@ import ( "time" "github.com/gogo/protobuf/proto" + "github.com/zeebo/errs" "go.uber.org/zap" "storj.io/storj/pkg/accounting" @@ -52,15 +53,9 @@ func (t *Tally) Run(ctx context.Context) (err error) { t.logger.Info("Tally service starting up") defer mon.Task()(&ctx)(&err) for { - err = t.calculateAtRestData(ctx) - if err != nil { - t.logger.Error("calculateAtRestData failed", zap.Error(err)) + if err = t.Tally(ctx); err != nil { + t.logger.Error("Tally failed", zap.Error(err)) } - err = t.QueryBW(ctx) - if err != nil { - 
t.logger.Error("Query for bandwidth failed", zap.Error(err)) - } - select { case <-t.ticker.C: // wait for the next interval to happen case <-ctx.Done(): // or the Tally is canceled via context @@ -69,17 +64,42 @@ func (t *Tally) Run(ctx context.Context) (err error) { } } +//Tally calculates data-at-rest and bandwidth usage once +func (t *Tally) Tally(ctx context.Context) error { + //data at rest + var errAtRest, errBWA error + latestTally, nodeData, err := t.calculateAtRestData(ctx) + if err != nil { + errAtRest = errs.New("Query for data-at-rest failed : %v", err) + } else if len(nodeData) > 0 { + err = t.SaveAtRestRaw(ctx, latestTally, nodeData) + if err != nil { + errAtRest = errs.New("Saving data-at-rest failed : %v", err) + } + } + //bandwdith + tallyEnd, bwTotals, err := t.QueryBW(ctx) + if err != nil { + errBWA = errs.New("Query for bandwidth failed: %v", err) + } else if len(bwTotals) > 0 { + err = t.SaveBWRaw(ctx, tallyEnd, bwTotals) + if err != nil { + errBWA = errs.New("Saving for bandwidth failed : %v", err) + } + } + return errs.Combine(errAtRest, errBWA) +} + // calculateAtRestData iterates through the pieces on pointerdb and calculates // the amount of at-rest data stored on each respective node -func (t *Tally) calculateAtRestData(ctx context.Context) (err error) { +func (t *Tally) calculateAtRestData(ctx context.Context) (latestTally time.Time, nodeData map[storj.NodeID]float64, err error) { defer mon.Task()(&ctx)(&err) - latestTally, isNil, err := t.accountingDB.LastRawTime(ctx, accounting.LastAtRestTally) + latestTally, err = t.accountingDB.LastTimestamp(ctx, accounting.LastAtRestTally) if err != nil { - return Error.Wrap(err) + return latestTally, nodeData, Error.Wrap(err) } - var nodeData = make(map[storj.NodeID]float64) err = t.pointerdb.Iterate("", "", true, false, func(it storage.Iterator) error { var item storage.ListItem @@ -119,60 +139,49 @@ func (t *Tally) calculateAtRestData(ctx context.Context) (err error) { }, ) if len(nodeData) == 0 { - return nil + return latestTally, nodeData, nil } if err != nil { - return Error.Wrap(err) + return latestTally, nodeData, Error.Wrap(err) } //store byte hours, not just bytes - numHours := 1.0 //todo: something more considered? - if !isNil { - numHours = time.Now().UTC().Sub(latestTally).Hours() + numHours := time.Now().Sub(latestTally).Hours() + if latestTally.IsZero() { + numHours = 1.0 //todo: something more considered? } - - latestTally = time.Now().UTC() - + latestTally = time.Now() for k := range nodeData { - nodeData[k] *= numHours + nodeData[k] *= numHours //calculate byte hours } - return Error.Wrap(t.accountingDB.SaveAtRestRaw(ctx, latestTally, isNil, nodeData)) + return latestTally, nodeData, err +} + +// SaveAtRestRaw records raw tallies of at-rest-data and updates the LastTimestamp +func (t *Tally) SaveAtRestRaw(ctx context.Context, latestTally time.Time, nodeData map[storj.NodeID]float64) error { + return t.accountingDB.SaveAtRestRaw(ctx, latestTally, nodeData) } // QueryBW queries bandwidth allocation database, selecting all new contracts since the last collection run time. // Grouping by action type, storage node ID and adding total of bandwidth to granular data table. 
-func (t *Tally) QueryBW(ctx context.Context) error { - lastBwTally, isNil, err := t.accountingDB.LastRawTime(ctx, accounting.LastBandwidthTally) +func (t *Tally) QueryBW(ctx context.Context) (time.Time, map[storj.NodeID][]int64, error) { + var bwTotals map[storj.NodeID][]int64 + now := time.Now() + lastBwTally, err := t.accountingDB.LastTimestamp(ctx, accounting.LastBandwidthTally) if err != nil { - return Error.Wrap(err) - } - - var bwAgreements []bwagreement.Agreement - if isNil { - t.logger.Info("Tally found no existing bandwidth tracking data") - bwAgreements, err = t.bwAgreementDB.GetAgreements(ctx) - } else { - bwAgreements, err = t.bwAgreementDB.GetAgreementsSince(ctx, lastBwTally) + return now, bwTotals, Error.Wrap(err) } + bwTotals, err = t.bwAgreementDB.GetTotals(ctx, lastBwTally, now) if err != nil { - return Error.Wrap(err) + return now, bwTotals, Error.Wrap(err) } - if len(bwAgreements) == 0 { + if len(bwTotals) == 0 { t.logger.Info("Tally found no new bandwidth allocations") - return nil + return now, bwTotals, nil } - - // sum totals by node id ... todo: add nodeid as SQL column so DB can do this? - var bwTotals accounting.BWTally - for i := range bwTotals { - bwTotals[i] = make(map[storj.NodeID]int64) - } - var latestBwa time.Time - for _, baRow := range bwAgreements { - rba := baRow.Agreement - if baRow.CreatedAt.After(latestBwa) { - latestBwa = baRow.CreatedAt - } - bwTotals[rba.PayerAllocation.Action][rba.StorageNodeId] += rba.Total - } - return Error.Wrap(t.accountingDB.SaveBWRaw(ctx, latestBwa, isNil, bwTotals)) + return now, bwTotals, nil +} + +// SaveBWRaw records granular tallies (sums of bw agreement values) to the database and updates the LastTimestamp +func (t *Tally) SaveBWRaw(ctx context.Context, tallyEnd time.Time, bwTotals map[storj.NodeID][]int64) error { + return t.accountingDB.SaveBWRaw(ctx, tallyEnd, bwTotals) } diff --git a/pkg/accounting/tally/tally_test.go b/pkg/accounting/tally/tally_test.go index 1e5ce4a2a..0b090b012 100644 --- a/pkg/accounting/tally/tally_test.go +++ b/pkg/accounting/tally/tally_test.go @@ -8,82 +8,70 @@ import ( "testing" "time" - "github.com/stretchr/testify/assert" - "go.uber.org/zap" + "github.com/stretchr/testify/require" "storj.io/storj/internal/testcontext" - "storj.io/storj/internal/testidentity" - "storj.io/storj/internal/teststorj" - "storj.io/storj/pkg/accounting/tally" - "storj.io/storj/pkg/bwagreement" + "storj.io/storj/internal/testplanet" "storj.io/storj/pkg/bwagreement/testbwagreement" - "storj.io/storj/pkg/identity" - "storj.io/storj/pkg/overlay/mocks" "storj.io/storj/pkg/pb" - "storj.io/storj/pkg/pointerdb" - "storj.io/storj/satellite/satellitedb" - "storj.io/storj/storage/teststore" + "storj.io/storj/pkg/piecestore/psserver/psdb" ) -func TestQueryNoAgreements(t *testing.T) { +func runPlanet(t *testing.T, f func(context.Context, *testplanet.Planet)) { ctx := testcontext.New(t) defer ctx.Cleanup() + planet, err := testplanet.New(t, 1, 1, 1) + //time.Sleep(5 * time.Second) + require.NoError(t, err) + defer ctx.Check(planet.Shutdown) + planet.Start(ctx) + f(ctx, planet) +} - // TODO: use testplanet - - service := pointerdb.NewService(zap.NewNop(), teststore.New()) - overlayServer := mocks.NewOverlay([]*pb.Node{}) - db, err := satellitedb.NewInMemory() - assert.NoError(t, err) - defer ctx.Check(db.Close) - assert.NoError(t, db.CreateTables()) - - tally := tally.New(zap.NewNop(), db.Accounting(), db.BandwidthAgreement(), service, overlayServer, 0, time.Second) - - err = tally.QueryBW(ctx) - assert.NoError(t, err) +func 
TestQueryNoAgreements(t *testing.T) { + runPlanet(t, func(ctx context.Context, planet *testplanet.Planet) { + tally := planet.Satellites[0].Accounting.Tally + _, bwTotals, err := tally.QueryBW(ctx) + require.NoError(t, err) + require.Len(t, bwTotals, 0) + }) } func TestQueryWithBw(t *testing.T) { - ctx := testcontext.New(t) - defer ctx.Cleanup() - - // TODO: use testplanet - - service := pointerdb.NewService(zap.NewNop(), teststore.New()) - overlayServer := mocks.NewOverlay([]*pb.Node{}) - - db, err := satellitedb.NewInMemory() - assert.NoError(t, err) - defer ctx.Check(db.Close) - - assert.NoError(t, db.CreateTables()) - - bwDb := db.BandwidthAgreement() - tally := tally.New(zap.NewNop(), db.Accounting(), bwDb, service, overlayServer, 0, time.Second) - - //get a private key - fiC, err := testidentity.NewTestIdentity(ctx) - assert.NoError(t, err) - - makeBWA(ctx, t, bwDb, fiC, pb.BandwidthAction_PUT) - makeBWA(ctx, t, bwDb, fiC, pb.BandwidthAction_GET) - makeBWA(ctx, t, bwDb, fiC, pb.BandwidthAction_GET_AUDIT) - makeBWA(ctx, t, bwDb, fiC, pb.BandwidthAction_GET_REPAIR) - makeBWA(ctx, t, bwDb, fiC, pb.BandwidthAction_PUT_REPAIR) - - //check the db - err = tally.QueryBW(ctx) - assert.NoError(t, err) + runPlanet(t, func(ctx context.Context, planet *testplanet.Planet) { + tally := planet.Satellites[0].Accounting.Tally + makeBWAs(ctx, t, planet) + //check the db + tallyEnd, bwTotals, err := tally.QueryBW(ctx) + require.NoError(t, err) + require.Len(t, bwTotals, 1) + for id, nodeTotals := range bwTotals { + require.Len(t, nodeTotals, 5) + for _, total := range nodeTotals { + require.Equal(t, planet.StorageNodes[0].Identity.ID, id) + require.Equal(t, int64(1000), total) + } + } + err = tally.SaveBWRaw(ctx, tallyEnd, bwTotals) + require.NoError(t, err) + }) } -func makeBWA(ctx context.Context, t *testing.T, bwDb bwagreement.DB, fiC *identity.FullIdentity, action pb.BandwidthAction) { - //generate an agreement with the key - pba, err := testbwagreement.GeneratePayerBandwidthAllocation(action, fiC, fiC, time.Hour) - assert.NoError(t, err) - rba, err := testbwagreement.GenerateRenterBandwidthAllocation(pba, teststorj.NodeIDFromString("StorageNodeID"), fiC, 666) - assert.NoError(t, err) - //save to db - err = bwDb.CreateAgreement(ctx, rba) - assert.NoError(t, err) +func makeBWAs(ctx context.Context, t *testing.T, planet *testplanet.Planet) { + satID := planet.Satellites[0].Identity + upID := planet.Uplinks[0].Identity + snID := planet.StorageNodes[0].Identity + sender := planet.StorageNodes[0].Agreements.Sender + actions := []pb.BandwidthAction{pb.BandwidthAction_PUT, pb.BandwidthAction_GET, + pb.BandwidthAction_GET_AUDIT, pb.BandwidthAction_GET_REPAIR, pb.BandwidthAction_PUT_REPAIR} + agreements := make([]*psdb.Agreement, len(actions)) + for i, action := range actions { + pba, err := testbwagreement.GeneratePayerBandwidthAllocation(action, satID, upID, time.Hour) + require.NoError(t, err) + rba, err := testbwagreement.GenerateRenterBandwidthAllocation(pba, snID.ID, upID, 1000) + require.NoError(t, err) + agreements[i] = &psdb.Agreement{Agreement: *rba} + } + sender.SendAgreementsToSatellite(ctx, satID.ID, agreements) + } diff --git a/pkg/bwagreement/bwagreement_test.go b/pkg/bwagreement/bwagreement_test.go new file mode 100644 index 000000000..c2e5e1d1f --- /dev/null +++ b/pkg/bwagreement/bwagreement_test.go @@ -0,0 +1,80 @@ +// Copyright (C) 2019 Storj Labs, Inc. +// See LICENSE for copying information. 
+ +package bwagreement_test + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "storj.io/storj/internal/testcontext" + "storj.io/storj/internal/testidentity" + "storj.io/storj/pkg/bwagreement" + "storj.io/storj/pkg/identity" + "storj.io/storj/pkg/pb" + "storj.io/storj/satellite" + "storj.io/storj/satellite/satellitedb/satellitedbtest" +) + +func TestBandwidthDBAgreement(t *testing.T) { + satellitedbtest.Run(t, func(t *testing.T, db satellite.DB) { + ctx := testcontext.New(t) + defer ctx.Cleanup() + + upID, err := testidentity.NewTestIdentity(ctx) + require.NoError(t, err) + snID, err := testidentity.NewTestIdentity(ctx) + require.NoError(t, err) + + require.NoError(t, testCreateAgreement(ctx, t, db.BandwidthAgreement(), pb.BandwidthAction_PUT, "1", upID, snID)) + require.Error(t, testCreateAgreement(ctx, t, db.BandwidthAgreement(), pb.BandwidthAction_GET, "1", upID, snID)) + require.NoError(t, testCreateAgreement(ctx, t, db.BandwidthAgreement(), pb.BandwidthAction_GET, "2", upID, snID)) + testGetTotals(ctx, t, db.BandwidthAgreement(), snID) + testGetUplinkStats(ctx, t, db.BandwidthAgreement(), upID) + }) +} + +func testCreateAgreement(ctx context.Context, t *testing.T, b bwagreement.DB, action pb.BandwidthAction, + serialNum string, upID, snID *identity.FullIdentity) error { + rba := &pb.RenterBandwidthAllocation{ + PayerAllocation: pb.PayerBandwidthAllocation{ + Action: action, + SerialNumber: serialNum, + UplinkId: upID.ID, + }, + Total: 1000, + StorageNodeId: snID.ID, + } + return b.CreateAgreement(ctx, rba) +} + +func testGetUplinkStats(ctx context.Context, t *testing.T, b bwagreement.DB, upID *identity.FullIdentity) { + stats, err := b.GetUplinkStats(ctx, time.Time{}, time.Now().UTC()) + require.NoError(t, err) + var found int + for _, s := range stats { + if upID.ID == s.NodeID { + found++ + require.Equal(t, int64(2000), s.TotalBytes) + require.Equal(t, 1, s.GetActionCount) + require.Equal(t, 1, s.PutActionCount) + require.Equal(t, 2, s.TotalTransactions) + } + } + require.Equal(t, 1, found) +} + +func testGetTotals(ctx context.Context, t *testing.T, b bwagreement.DB, snID *identity.FullIdentity) { + totals, err := b.GetTotals(ctx, time.Time{}, time.Now().UTC()) + require.NoError(t, err) + total := totals[snID.ID] + require.Len(t, total, 5) + require.Equal(t, int64(1000), total[pb.BandwidthAction_PUT]) + require.Equal(t, int64(1000), total[pb.BandwidthAction_GET]) + require.Equal(t, int64(0), total[pb.BandwidthAction_GET_AUDIT]) + require.Equal(t, int64(0), total[pb.BandwidthAction_GET_REPAIR]) + require.Equal(t, int64(0), total[pb.BandwidthAction_PUT_REPAIR]) +} diff --git a/pkg/bwagreement/server.go b/pkg/bwagreement/server.go index 85dc4ff46..313717986 100644 --- a/pkg/bwagreement/server.go +++ b/pkg/bwagreement/server.go @@ -5,6 +5,7 @@ package bwagreement import ( "context" + "strings" "time" "github.com/zeebo/errs" @@ -28,14 +29,23 @@ var ( type Config struct { } +//UplinkStat contains information about an uplink's returned bandwidth agreement +type UplinkStat struct { + NodeID storj.NodeID + TotalBytes int64 + PutActionCount int + GetActionCount int + TotalTransactions int +} + // DB stores bandwidth agreements. type DB interface { // CreateAgreement adds a new bandwidth agreement. CreateAgreement(context.Context, *pb.RenterBandwidthAllocation) error - // GetAgreements gets all bandwidth agreements. - GetAgreements(context.Context) ([]Agreement, error) - // GetAgreementsSince gets all bandwidth agreements since specific time. 
- GetAgreementsSince(context.Context, time.Time) ([]Agreement, error) + // GetTotalsSince returns the sum of each bandwidth type after (exluding) a given date range + GetTotals(context.Context, time.Time, time.Time) (map[storj.NodeID][]int64, error) + //GetTotals returns stats about an uplink + GetUplinkStats(context.Context, time.Time, time.Time) ([]UplinkStat, error) } // Server is an implementation of the pb.BandwidthServer interface @@ -45,12 +55,6 @@ type Server struct { logger *zap.Logger } -// Agreement is a struct that contains a bandwidth agreement and the associated signature -type Agreement struct { - Agreement pb.RenterBandwidthAllocation - CreatedAt time.Time -} - // NewServer creates instance of Server func NewServer(db DB, logger *zap.Logger, nodeID storj.NodeID) *Server { // TODO: reorder arguments, rename logger -> log @@ -71,11 +75,11 @@ func (s *Server) BandwidthAgreements(ctx context.Context, rba *pb.RenterBandwidt //verify message content pi, err := identity.PeerIdentityFromContext(ctx) if err != nil || rba.StorageNodeId != pi.ID { - return reply, auth.ErrBadID.New("Storage Node ID: %s vs %s", rba.StorageNodeId, pi.ID) + return reply, auth.ErrBadID.New("Storage Node ID: %v vs %v", rba.StorageNodeId, pi.ID) } //todo: use whitelist for uplinks? if pba.SatelliteId != s.NodeID { - return reply, pb.ErrPayer.New("Satellite ID: %s vs %s", pba.SatelliteId, s.NodeID) + return reply, pb.ErrPayer.New("Satellite ID: %v vs %v", pba.SatelliteId, s.NodeID) } exp := time.Unix(pba.GetExpirationUnixSec(), 0).UTC() if exp.Before(time.Now().UTC()) { @@ -90,10 +94,13 @@ func (s *Server) BandwidthAgreements(ctx context.Context, rba *pb.RenterBandwidt } //save and return rersults - err = s.db.CreateAgreement(ctx, rba) - if err != nil { - //todo: better classify transport errors (AgreementsSummary_FAIL) vs logical (AgreementsSummary_REJECTED) - return reply, pb.ErrPayer.Wrap(auth.ErrSerial.Wrap(err)) + if err = s.db.CreateAgreement(ctx, rba); err != nil { + if strings.Contains(err.Error(), "UNIQUE constraint failed") || + strings.Contains(err.Error(), "violates unique constraint") { + return reply, pb.ErrPayer.Wrap(auth.ErrSerial.Wrap(err)) + } + reply.Status = pb.AgreementsSummary_FAIL + return reply, pb.ErrPayer.Wrap(err) } reply.Status = pb.AgreementsSummary_OK s.logger.Debug("Stored Agreement...") diff --git a/pkg/bwagreement/server_test.go b/pkg/bwagreement/server_test.go index a10808607..aa0491515 100644 --- a/pkg/bwagreement/server_test.go +++ b/pkg/bwagreement/server_test.go @@ -112,7 +112,7 @@ func testDatabase(ctx context.Context, t *testing.T, bwdb bwagreement.DB) { assert.NoError(t, err) reply, err := satellite.BandwidthAgreements(ctxSN1, rbaNode1) - assert.True(t, auth.ErrSerial.Has(err)) + assert.True(t, auth.ErrSerial.Has(err), err.Error()) assert.Equal(t, pb.AgreementsSummary_REJECTED, reply.Status) } diff --git a/pkg/piecestore/psserver/agreementsender/agreementsender.go b/pkg/piecestore/psserver/agreementsender/agreementsender.go index 8c5435042..fe1f0bcef 100644 --- a/pkg/piecestore/psserver/agreementsender/agreementsender.go +++ b/pkg/piecestore/psserver/agreementsender/agreementsender.go @@ -52,7 +52,7 @@ func (as *AgreementSender) Run(ctx context.Context) error { continue } for satellite, agreements := range agreementGroups { - as.sendAgreementsToSatellite(ctx, satellite, agreements) + as.SendAgreementsToSatellite(ctx, satellite, agreements) } select { case <-ticker.C: @@ -62,7 +62,8 @@ func (as *AgreementSender) Run(ctx context.Context) error { } } -func (as 
*AgreementSender) sendAgreementsToSatellite(ctx context.Context, satID storj.NodeID, agreements []*psdb.Agreement) { +//SendAgreementsToSatellite uploads agreements to the satellite +func (as *AgreementSender) SendAgreementsToSatellite(ctx context.Context, satID storj.NodeID, agreements []*psdb.Agreement) { as.log.Info("Sending agreements to satellite", zap.Int("number of agreements", len(agreements)), zap.String("satellite id", satID.String())) // todo: cache kad responses if this interval is very small // Get satellite ip from kademlia diff --git a/satellite/satellitedb/accounting.go b/satellite/satellitedb/accounting.go index c6edd6a53..ad578ae79 100644 --- a/satellite/satellitedb/accounting.go +++ b/satellite/satellitedb/accounting.go @@ -7,6 +7,8 @@ import ( "context" "time" + "github.com/zeebo/errs" + "storj.io/storj/pkg/accounting" "storj.io/storj/pkg/storj" "storj.io/storj/pkg/utils" @@ -18,18 +20,31 @@ type accountingDB struct { db *dbx.DB } -// LastRawTime records the greatest last tallied time -func (db *accountingDB) LastRawTime(ctx context.Context, timestampType string) (time.Time, bool, error) { - lastTally, err := db.db.Find_AccountingTimestamps_Value_By_Name(ctx, dbx.AccountingTimestamps_Name(timestampType)) - if lastTally == nil { - return time.Time{}, true, err +// LastTimestamp records the greatest last tallied time +func (db *accountingDB) LastTimestamp(ctx context.Context, timestampType string) (last time.Time, err error) { + // todo: use WithTx https://github.com/spacemonkeygo/dbx#transactions + tx, err := db.db.Open(ctx) + if err != nil { + return last, Error.Wrap(err) } - return lastTally.Value, false, err + defer func() { + if err == nil { + err = tx.Commit() + } else { + err = errs.Combine(err, tx.Rollback()) + } + }() + lastTally, err := tx.Find_AccountingTimestamps_Value_By_Name(ctx, dbx.AccountingTimestamps_Name(timestampType)) + if lastTally == nil { + update := dbx.AccountingTimestamps_Value(time.Time{}) + _, err = tx.Create_AccountingTimestamps(ctx, dbx.AccountingTimestamps_Name(timestampType), update) + return time.Time{}, err + } + return lastTally.Value, err } -// SaveBWRaw records granular tallies (sums of bw agreement values) to the database -// and updates the LastRawTime -func (db *accountingDB) SaveBWRaw(ctx context.Context, latestBwa time.Time, isNew bool, bwTotals accounting.BWTally) (err error) { +// SaveBWRaw records granular tallies (sums of bw agreement values) to the database and updates the LastTimestamp +func (db *accountingDB) SaveBWRaw(ctx context.Context, tallyEnd time.Time, bwTotals map[storj.NodeID][]int64) (err error) { // We use the latest bandwidth agreement value of a batch of records as the start of the next batch // todo: consider finding the sum of bwagreements using SQL sum() direct against the bwa table if len(bwTotals) == 0 { @@ -48,11 +63,11 @@ func (db *accountingDB) SaveBWRaw(ctx context.Context, latestBwa time.Time, isNe } }() //create a granular record per node id - for actionType, bwActionTotals := range bwTotals { - for k, v := range bwActionTotals { - nID := dbx.AccountingRaw_NodeId(k.Bytes()) - end := dbx.AccountingRaw_IntervalEndTime(latestBwa) - total := dbx.AccountingRaw_DataTotal(float64(v)) + for nodeID, totals := range bwTotals { + for actionType, total := range totals { + nID := dbx.AccountingRaw_NodeId(nodeID.Bytes()) + end := dbx.AccountingRaw_IntervalEndTime(tallyEnd) + total := dbx.AccountingRaw_DataTotal(float64(total)) dataType := dbx.AccountingRaw_DataType(actionType) _, err = 
tx.Create_AccountingRaw(ctx, nID, end, total, dataType) if err != nil { @@ -61,19 +76,13 @@ func (db *accountingDB) SaveBWRaw(ctx context.Context, latestBwa time.Time, isNe } } //save this batch's greatest time - - if isNew { - update := dbx.AccountingTimestamps_Value(latestBwa) - _, err = tx.Create_AccountingTimestamps(ctx, dbx.AccountingTimestamps_Name(accounting.LastBandwidthTally), update) - } else { - update := dbx.AccountingTimestamps_Update_Fields{Value: dbx.AccountingTimestamps_Value(latestBwa)} - _, err = tx.Update_AccountingTimestamps_By_Name(ctx, dbx.AccountingTimestamps_Name(accounting.LastBandwidthTally), update) - } + update := dbx.AccountingTimestamps_Update_Fields{Value: dbx.AccountingTimestamps_Value(tallyEnd)} + _, err = tx.Update_AccountingTimestamps_By_Name(ctx, dbx.AccountingTimestamps_Name(accounting.LastBandwidthTally), update) return err } // SaveAtRestRaw records raw tallies of at rest data to the database -func (db *accountingDB) SaveAtRestRaw(ctx context.Context, latestTally time.Time, isNew bool, nodeData map[storj.NodeID]float64) error { +func (db *accountingDB) SaveAtRestRaw(ctx context.Context, latestTally time.Time, nodeData map[storj.NodeID]float64) error { if len(nodeData) == 0 { return Error.New("In SaveAtRestRaw with empty nodeData") } @@ -98,14 +107,8 @@ func (db *accountingDB) SaveAtRestRaw(ctx context.Context, latestTally time.Time return Error.Wrap(err) } } - - if isNew { - update := dbx.AccountingTimestamps_Value(latestTally) - _, err = tx.Create_AccountingTimestamps(ctx, dbx.AccountingTimestamps_Name(accounting.LastAtRestTally), update) - } else { - update := dbx.AccountingTimestamps_Update_Fields{Value: dbx.AccountingTimestamps_Value(latestTally)} - _, err = tx.Update_AccountingTimestamps_By_Name(ctx, dbx.AccountingTimestamps_Name(accounting.LastAtRestTally), update) - } + update := dbx.AccountingTimestamps_Update_Fields{Value: dbx.AccountingTimestamps_Value(latestTally)} + _, err = tx.Update_AccountingTimestamps_By_Name(ctx, dbx.AccountingTimestamps_Name(accounting.LastAtRestTally), update) return Error.Wrap(err) } @@ -152,7 +155,7 @@ func (db *accountingDB) GetRawSince(ctx context.Context, latestRollup time.Time) } // SaveRollup records raw tallies of at rest data to the database -func (db *accountingDB) SaveRollup(ctx context.Context, latestRollup time.Time, isNew bool, stats accounting.RollupStats) error { +func (db *accountingDB) SaveRollup(ctx context.Context, latestRollup time.Time, stats accounting.RollupStats) error { if len(stats) == 0 { return Error.New("In SaveRollup with empty nodeData") } @@ -183,13 +186,8 @@ func (db *accountingDB) SaveRollup(ctx context.Context, latestRollup time.Time, } } } - if isNew { - update := dbx.AccountingTimestamps_Value(latestRollup) - _, err = tx.Create_AccountingTimestamps(ctx, dbx.AccountingTimestamps_Name(accounting.LastRollup), update) - } else { - update := dbx.AccountingTimestamps_Update_Fields{Value: dbx.AccountingTimestamps_Value(latestRollup)} - _, err = tx.Update_AccountingTimestamps_By_Name(ctx, dbx.AccountingTimestamps_Name(accounting.LastRollup), update) - } + update := dbx.AccountingTimestamps_Update_Fields{Value: dbx.AccountingTimestamps_Value(latestRollup)} + _, err = tx.Update_AccountingTimestamps_By_Name(ctx, dbx.AccountingTimestamps_Name(accounting.LastRollup), update) return Error.Wrap(err) } diff --git a/satellite/satellitedb/bandwidthagreement.go b/satellite/satellitedb/bandwidthagreement.go index a32d769f7..b2efc6663 100644 --- a/satellite/satellitedb/bandwidthagreement.go +++ 
b/satellite/satellitedb/bandwidthagreement.go @@ -5,12 +5,14 @@ package satellitedb import ( "context" + "fmt" "time" - "github.com/golang/protobuf/proto" + "github.com/zeebo/errs" "storj.io/storj/pkg/bwagreement" "storj.io/storj/pkg/pb" + "storj.io/storj/pkg/storj" dbx "storj.io/storj/satellite/satellitedb/dbx" ) @@ -18,17 +20,13 @@ type bandwidthagreement struct { db *dbx.DB } -func (b *bandwidthagreement) CreateAgreement(ctx context.Context, rba *pb.RenterBandwidthAllocation) error { - rbaBytes, err := proto.Marshal(rba) - if err != nil { - return err - } +func (b *bandwidthagreement) CreateAgreement(ctx context.Context, rba *pb.RenterBandwidthAllocation) (err error) { expiration := time.Unix(rba.PayerAllocation.ExpirationUnixSec, 0) _, err = b.db.Create_Bwagreement( ctx, dbx.Bwagreement_Serialnum(rba.PayerAllocation.SerialNumber+rba.StorageNodeId.String()), - dbx.Bwagreement_Data(rbaBytes), - dbx.Bwagreement_StorageNode(rba.StorageNodeId.Bytes()), + dbx.Bwagreement_StorageNodeId(rba.StorageNodeId.Bytes()), + dbx.Bwagreement_UplinkId(rba.PayerAllocation.UplinkId.Bytes()), dbx.Bwagreement_Action(int64(rba.PayerAllocation.Action)), dbx.Bwagreement_Total(rba.Total), dbx.Bwagreement_ExpiresAt(expiration), @@ -36,43 +34,71 @@ func (b *bandwidthagreement) CreateAgreement(ctx context.Context, rba *pb.Renter return err } -func (b *bandwidthagreement) GetAgreements(ctx context.Context) ([]bwagreement.Agreement, error) { - rows, err := b.db.All_Bwagreement(ctx) +//GetTotals returns stats about an uplink +func (b *bandwidthagreement) GetUplinkStats(ctx context.Context, from, to time.Time) (stats []bwagreement.UplinkStat, err error) { + + var uplinkSQL = fmt.Sprintf(`SELECT uplink_id, SUM(total), + COUNT(CASE WHEN action = %d THEN total ELSE null END), + COUNT(CASE WHEN action = %d THEN total ELSE null END), COUNT(*) + FROM bwagreements WHERE created_at > ? + AND created_at <= ? 
GROUP BY uplink_id ORDER BY uplink_id`, + pb.BandwidthAction_PUT, pb.BandwidthAction_GET) + rows, err := b.db.DB.Query(b.db.Rebind(uplinkSQL), from, to) if err != nil { return nil, err } - agreements := make([]bwagreement.Agreement, len(rows)) - for i, entry := range rows { - rba := pb.RenterBandwidthAllocation{} - err := proto.Unmarshal(entry.Data, &rba) + defer func() { err = errs.Combine(err, rows.Close()) }() + for rows.Next() { + var nodeID []byte + stat := bwagreement.UplinkStat{} + err := rows.Scan(&nodeID, &stat.TotalBytes, &stat.PutActionCount, &stat.GetActionCount, &stat.TotalTransactions) if err != nil { - return nil, err + return stats, err } - agreement := &agreements[i] - agreement.Agreement = rba - agreement.CreatedAt = entry.CreatedAt + id, err := storj.NodeIDFromBytes(nodeID) + if err != nil { + return stats, err + } + stat.NodeID = id + stats = append(stats, stat) } - return agreements, nil + return stats, nil } -func (b *bandwidthagreement) GetAgreementsSince(ctx context.Context, since time.Time) ([]bwagreement.Agreement, error) { - rows, err := b.db.All_Bwagreement_By_CreatedAt_Greater(ctx, dbx.Bwagreement_CreatedAt(since)) +//GetTotals returns the sum of each bandwidth type after (exluding) a given date range +func (b *bandwidthagreement) GetTotals(ctx context.Context, from, to time.Time) (bwa map[storj.NodeID][]int64, err error) { + var getTotalsSQL = fmt.Sprintf(`SELECT storage_node_id, + SUM(CASE WHEN action = %d THEN total ELSE 0 END), + SUM(CASE WHEN action = %d THEN total ELSE 0 END), + SUM(CASE WHEN action = %d THEN total ELSE 0 END), + SUM(CASE WHEN action = %d THEN total ELSE 0 END), + SUM(CASE WHEN action = %d THEN total ELSE 0 END) + FROM bwagreements WHERE created_at > ? AND created_at <= ? + GROUP BY storage_node_id ORDER BY storage_node_id`, pb.BandwidthAction_PUT, + pb.BandwidthAction_GET, pb.BandwidthAction_GET_AUDIT, + pb.BandwidthAction_GET_REPAIR, pb.BandwidthAction_PUT_REPAIR) + rows, err := b.db.DB.Query(b.db.Rebind(getTotalsSQL), from, to) if err != nil { return nil, err } + defer func() { err = errs.Combine(err, rows.Close()) }() - agreements := make([]bwagreement.Agreement, len(rows)) - for i, entry := range rows { - rba := pb.RenterBandwidthAllocation{} - err := proto.Unmarshal(entry.Data, &rba) + totals := make(map[storj.NodeID][]int64) + for i := 0; rows.Next(); i++ { + var nodeID []byte + data := make([]int64, len(pb.BandwidthAction_value)) + err := rows.Scan(&nodeID, &data[pb.BandwidthAction_PUT], &data[pb.BandwidthAction_GET], + &data[pb.BandwidthAction_GET_AUDIT], &data[pb.BandwidthAction_GET_REPAIR], &data[pb.BandwidthAction_PUT_REPAIR]) if err != nil { - return nil, err + return totals, err } - agreement := &agreements[i] - agreement.Agreement = rba - agreement.CreatedAt = entry.CreatedAt + id, err := storj.NodeIDFromBytes(nodeID) + if err != nil { + return totals, err + } + totals[id] = data } - return agreements, nil + return totals, nil } func (b *bandwidthagreement) DeletePaidAndExpired(ctx context.Context) error { diff --git a/satellite/satellitedb/dbx/satellitedb.dbx b/satellite/satellitedb/dbx/satellitedb.dbx index 4b1bfa655..b674f8387 100644 --- a/satellite/satellitedb/dbx/satellitedb.dbx +++ b/satellite/satellitedb/dbx/satellitedb.dbx @@ -5,32 +5,18 @@ model bwagreement ( key serialnum - field serialnum text - field data blob - field storage_node blob - field action int64 - field total int64 - field created_at timestamp ( autoinsert ) - field expires_at timestamp + field serialnum text + field storage_node_id blob + field 
uplink_id blob + field action int64 + field total int64 + field created_at timestamp ( autoinsert ) + field expires_at timestamp ) create bwagreement ( ) -delete bwagreement ( where bwagreement.serialnum = ? ) -delete bwagreement ( where bwagreement.expires_at <= ?) - -read one ( - select bwagreement - where bwagreement.serialnum = ? -) - -read limitoffset ( - select bwagreement -) - -read all ( - select bwagreement -) - +read limitoffset ( select bwagreement) +read all ( select bwagreement) read all ( select bwagreement where bwagreement.created_at > ? diff --git a/satellite/satellitedb/dbx/satellitedb.dbx.go b/satellite/satellitedb/dbx/satellitedb.dbx.go index 7ee374087..f20c6a260 100644 --- a/satellite/satellitedb/dbx/satellitedb.dbx.go +++ b/satellite/satellitedb/dbx/satellitedb.dbx.go @@ -301,8 +301,8 @@ CREATE TABLE accounting_timestamps ( ); CREATE TABLE bwagreements ( serialnum text NOT NULL, - data bytea NOT NULL, - storage_node bytea NOT NULL, + storage_node_id bytea NOT NULL, + uplink_id bytea NOT NULL, action bigint NOT NULL, total bigint NOT NULL, created_at timestamp with time zone NOT NULL, @@ -483,8 +483,8 @@ CREATE TABLE accounting_timestamps ( ); CREATE TABLE bwagreements ( serialnum TEXT NOT NULL, - data BLOB NOT NULL, - storage_node BLOB NOT NULL, + storage_node_id BLOB NOT NULL, + uplink_id BLOB NOT NULL, action INTEGER NOT NULL, total INTEGER NOT NULL, created_at TIMESTAMP NOT NULL, @@ -1002,13 +1002,13 @@ func (f AccountingTimestamps_Value_Field) value() interface{} { func (AccountingTimestamps_Value_Field) _Column() string { return "value" } type Bwagreement struct { - Serialnum string - Data []byte - StorageNode []byte - Action int64 - Total int64 - CreatedAt time.Time - ExpiresAt time.Time + Serialnum string + StorageNodeId []byte + UplinkId []byte + Action int64 + Total int64 + CreatedAt time.Time + ExpiresAt time.Time } func (Bwagreement) _Table() string { return "bwagreements" } @@ -1035,43 +1035,43 @@ func (f Bwagreement_Serialnum_Field) value() interface{} { func (Bwagreement_Serialnum_Field) _Column() string { return "serialnum" } -type Bwagreement_Data_Field struct { +type Bwagreement_StorageNodeId_Field struct { _set bool _null bool _value []byte } -func Bwagreement_Data(v []byte) Bwagreement_Data_Field { - return Bwagreement_Data_Field{_set: true, _value: v} +func Bwagreement_StorageNodeId(v []byte) Bwagreement_StorageNodeId_Field { + return Bwagreement_StorageNodeId_Field{_set: true, _value: v} } -func (f Bwagreement_Data_Field) value() interface{} { +func (f Bwagreement_StorageNodeId_Field) value() interface{} { if !f._set || f._null { return nil } return f._value } -func (Bwagreement_Data_Field) _Column() string { return "data" } +func (Bwagreement_StorageNodeId_Field) _Column() string { return "storage_node_id" } -type Bwagreement_StorageNode_Field struct { +type Bwagreement_UplinkId_Field struct { _set bool _null bool _value []byte } -func Bwagreement_StorageNode(v []byte) Bwagreement_StorageNode_Field { - return Bwagreement_StorageNode_Field{_set: true, _value: v} +func Bwagreement_UplinkId(v []byte) Bwagreement_UplinkId_Field { + return Bwagreement_UplinkId_Field{_set: true, _value: v} } -func (f Bwagreement_StorageNode_Field) value() interface{} { +func (f Bwagreement_UplinkId_Field) value() interface{} { if !f._set || f._null { return nil } return f._value } -func (Bwagreement_StorageNode_Field) _Column() string { return "storage_node" } +func (Bwagreement_UplinkId_Field) _Column() string { return "uplink_id" } type Bwagreement_Action_Field 
struct { _set bool @@ -2502,8 +2502,8 @@ type Value_Row struct { func (obj *postgresImpl) Create_Bwagreement(ctx context.Context, bwagreement_serialnum Bwagreement_Serialnum_Field, - bwagreement_data Bwagreement_Data_Field, - bwagreement_storage_node Bwagreement_StorageNode_Field, + bwagreement_storage_node_id Bwagreement_StorageNodeId_Field, + bwagreement_uplink_id Bwagreement_UplinkId_Field, bwagreement_action Bwagreement_Action_Field, bwagreement_total Bwagreement_Total_Field, bwagreement_expires_at Bwagreement_ExpiresAt_Field) ( @@ -2511,20 +2511,20 @@ func (obj *postgresImpl) Create_Bwagreement(ctx context.Context, __now := obj.db.Hooks.Now().UTC() __serialnum_val := bwagreement_serialnum.value() - __data_val := bwagreement_data.value() - __storage_node_val := bwagreement_storage_node.value() + __storage_node_id_val := bwagreement_storage_node_id.value() + __uplink_id_val := bwagreement_uplink_id.value() __action_val := bwagreement_action.value() __total_val := bwagreement_total.value() __created_at_val := __now __expires_at_val := bwagreement_expires_at.value() - var __embed_stmt = __sqlbundle_Literal("INSERT INTO bwagreements ( serialnum, data, storage_node, action, total, created_at, expires_at ) VALUES ( ?, ?, ?, ?, ?, ?, ? ) RETURNING bwagreements.serialnum, bwagreements.data, bwagreements.storage_node, bwagreements.action, bwagreements.total, bwagreements.created_at, bwagreements.expires_at") + var __embed_stmt = __sqlbundle_Literal("INSERT INTO bwagreements ( serialnum, storage_node_id, uplink_id, action, total, created_at, expires_at ) VALUES ( ?, ?, ?, ?, ?, ?, ? ) RETURNING bwagreements.serialnum, bwagreements.storage_node_id, bwagreements.uplink_id, bwagreements.action, bwagreements.total, bwagreements.created_at, bwagreements.expires_at") var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt) - obj.logStmt(__stmt, __serialnum_val, __data_val, __storage_node_val, __action_val, __total_val, __created_at_val, __expires_at_val) + obj.logStmt(__stmt, __serialnum_val, __storage_node_id_val, __uplink_id_val, __action_val, __total_val, __created_at_val, __expires_at_val) bwagreement = &Bwagreement{} - err = obj.driver.QueryRow(__stmt, __serialnum_val, __data_val, __storage_node_val, __action_val, __total_val, __created_at_val, __expires_at_val).Scan(&bwagreement.Serialnum, &bwagreement.Data, &bwagreement.StorageNode, &bwagreement.Action, &bwagreement.Total, &bwagreement.CreatedAt, &bwagreement.ExpiresAt) + err = obj.driver.QueryRow(__stmt, __serialnum_val, __storage_node_id_val, __uplink_id_val, __action_val, __total_val, __created_at_val, __expires_at_val).Scan(&bwagreement.Serialnum, &bwagreement.StorageNodeId, &bwagreement.UplinkId, &bwagreement.Action, &bwagreement.Total, &bwagreement.CreatedAt, &bwagreement.ExpiresAt) if err != nil { return nil, obj.makeErr(err) } @@ -2874,32 +2874,11 @@ func (obj *postgresImpl) Create_BucketInfo(ctx context.Context, } -func (obj *postgresImpl) Get_Bwagreement_By_Serialnum(ctx context.Context, - bwagreement_serialnum Bwagreement_Serialnum_Field) ( - bwagreement *Bwagreement, err error) { - - var __embed_stmt = __sqlbundle_Literal("SELECT bwagreements.serialnum, bwagreements.data, bwagreements.storage_node, bwagreements.action, bwagreements.total, bwagreements.created_at, bwagreements.expires_at FROM bwagreements WHERE bwagreements.serialnum = ?") - - var __values []interface{} - __values = append(__values, bwagreement_serialnum.value()) - - var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt) - obj.logStmt(__stmt, __values...) 
- - bwagreement = &Bwagreement{} - err = obj.driver.QueryRow(__stmt, __values...).Scan(&bwagreement.Serialnum, &bwagreement.Data, &bwagreement.StorageNode, &bwagreement.Action, &bwagreement.Total, &bwagreement.CreatedAt, &bwagreement.ExpiresAt) - if err != nil { - return nil, obj.makeErr(err) - } - return bwagreement, nil - -} - func (obj *postgresImpl) Limited_Bwagreement(ctx context.Context, limit int, offset int64) ( rows []*Bwagreement, err error) { - var __embed_stmt = __sqlbundle_Literal("SELECT bwagreements.serialnum, bwagreements.data, bwagreements.storage_node, bwagreements.action, bwagreements.total, bwagreements.created_at, bwagreements.expires_at FROM bwagreements LIMIT ? OFFSET ?") + var __embed_stmt = __sqlbundle_Literal("SELECT bwagreements.serialnum, bwagreements.storage_node_id, bwagreements.uplink_id, bwagreements.action, bwagreements.total, bwagreements.created_at, bwagreements.expires_at FROM bwagreements LIMIT ? OFFSET ?") var __values []interface{} __values = append(__values) @@ -2917,7 +2896,7 @@ func (obj *postgresImpl) Limited_Bwagreement(ctx context.Context, for __rows.Next() { bwagreement := &Bwagreement{} - err = __rows.Scan(&bwagreement.Serialnum, &bwagreement.Data, &bwagreement.StorageNode, &bwagreement.Action, &bwagreement.Total, &bwagreement.CreatedAt, &bwagreement.ExpiresAt) + err = __rows.Scan(&bwagreement.Serialnum, &bwagreement.StorageNodeId, &bwagreement.UplinkId, &bwagreement.Action, &bwagreement.Total, &bwagreement.CreatedAt, &bwagreement.ExpiresAt) if err != nil { return nil, obj.makeErr(err) } @@ -2933,7 +2912,7 @@ func (obj *postgresImpl) Limited_Bwagreement(ctx context.Context, func (obj *postgresImpl) All_Bwagreement(ctx context.Context) ( rows []*Bwagreement, err error) { - var __embed_stmt = __sqlbundle_Literal("SELECT bwagreements.serialnum, bwagreements.data, bwagreements.storage_node, bwagreements.action, bwagreements.total, bwagreements.created_at, bwagreements.expires_at FROM bwagreements") + var __embed_stmt = __sqlbundle_Literal("SELECT bwagreements.serialnum, bwagreements.storage_node_id, bwagreements.uplink_id, bwagreements.action, bwagreements.total, bwagreements.created_at, bwagreements.expires_at FROM bwagreements") var __values []interface{} __values = append(__values) @@ -2949,7 +2928,7 @@ func (obj *postgresImpl) All_Bwagreement(ctx context.Context) ( for __rows.Next() { bwagreement := &Bwagreement{} - err = __rows.Scan(&bwagreement.Serialnum, &bwagreement.Data, &bwagreement.StorageNode, &bwagreement.Action, &bwagreement.Total, &bwagreement.CreatedAt, &bwagreement.ExpiresAt) + err = __rows.Scan(&bwagreement.Serialnum, &bwagreement.StorageNodeId, &bwagreement.UplinkId, &bwagreement.Action, &bwagreement.Total, &bwagreement.CreatedAt, &bwagreement.ExpiresAt) if err != nil { return nil, obj.makeErr(err) } @@ -2966,7 +2945,7 @@ func (obj *postgresImpl) All_Bwagreement_By_CreatedAt_Greater(ctx context.Contex bwagreement_created_at_greater Bwagreement_CreatedAt_Field) ( rows []*Bwagreement, err error) { - var __embed_stmt = __sqlbundle_Literal("SELECT bwagreements.serialnum, bwagreements.data, bwagreements.storage_node, bwagreements.action, bwagreements.total, bwagreements.created_at, bwagreements.expires_at FROM bwagreements WHERE bwagreements.created_at > ?") + var __embed_stmt = __sqlbundle_Literal("SELECT bwagreements.serialnum, bwagreements.storage_node_id, bwagreements.uplink_id, bwagreements.action, bwagreements.total, bwagreements.created_at, bwagreements.expires_at FROM bwagreements WHERE bwagreements.created_at > ?") var 
__values []interface{} __values = append(__values, bwagreement_created_at_greater.value()) @@ -2982,7 +2961,7 @@ func (obj *postgresImpl) All_Bwagreement_By_CreatedAt_Greater(ctx context.Contex for __rows.Next() { bwagreement := &Bwagreement{} - err = __rows.Scan(&bwagreement.Serialnum, &bwagreement.Data, &bwagreement.StorageNode, &bwagreement.Action, &bwagreement.Total, &bwagreement.CreatedAt, &bwagreement.ExpiresAt) + err = __rows.Scan(&bwagreement.Serialnum, &bwagreement.StorageNodeId, &bwagreement.UplinkId, &bwagreement.Action, &bwagreement.Total, &bwagreement.CreatedAt, &bwagreement.ExpiresAt) if err != nil { return nil, obj.makeErr(err) } @@ -4143,58 +4122,6 @@ func (obj *postgresImpl) Update_ApiKey_By_Id(ctx context.Context, return api_key, nil } -func (obj *postgresImpl) Delete_Bwagreement_By_Serialnum(ctx context.Context, - bwagreement_serialnum Bwagreement_Serialnum_Field) ( - deleted bool, err error) { - - var __embed_stmt = __sqlbundle_Literal("DELETE FROM bwagreements WHERE bwagreements.serialnum = ?") - - var __values []interface{} - __values = append(__values, bwagreement_serialnum.value()) - - var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt) - obj.logStmt(__stmt, __values...) - - __res, err := obj.driver.Exec(__stmt, __values...) - if err != nil { - return false, obj.makeErr(err) - } - - __count, err := __res.RowsAffected() - if err != nil { - return false, obj.makeErr(err) - } - - return __count > 0, nil - -} - -func (obj *postgresImpl) Delete_Bwagreement_By_ExpiresAt_LessOrEqual(ctx context.Context, - bwagreement_expires_at_less_or_equal Bwagreement_ExpiresAt_Field) ( - count int64, err error) { - - var __embed_stmt = __sqlbundle_Literal("DELETE FROM bwagreements WHERE bwagreements.expires_at <= ?") - - var __values []interface{} - __values = append(__values, bwagreement_expires_at_less_or_equal.value()) - - var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt) - obj.logStmt(__stmt, __values...) - - __res, err := obj.driver.Exec(__stmt, __values...) 
- if err != nil { - return 0, obj.makeErr(err) - } - - count, err = __res.RowsAffected() - if err != nil { - return 0, obj.makeErr(err) - } - - return count, nil - -} - func (obj *postgresImpl) Delete_Irreparabledb_By_Segmentpath(ctx context.Context, irreparabledb_segmentpath Irreparabledb_Segmentpath_Field) ( deleted bool, err error) { @@ -4632,8 +4559,8 @@ func (obj *postgresImpl) deleteAll(ctx context.Context) (count int64, err error) func (obj *sqlite3Impl) Create_Bwagreement(ctx context.Context, bwagreement_serialnum Bwagreement_Serialnum_Field, - bwagreement_data Bwagreement_Data_Field, - bwagreement_storage_node Bwagreement_StorageNode_Field, + bwagreement_storage_node_id Bwagreement_StorageNodeId_Field, + bwagreement_uplink_id Bwagreement_UplinkId_Field, bwagreement_action Bwagreement_Action_Field, bwagreement_total Bwagreement_Total_Field, bwagreement_expires_at Bwagreement_ExpiresAt_Field) ( @@ -4641,19 +4568,19 @@ func (obj *sqlite3Impl) Create_Bwagreement(ctx context.Context, __now := obj.db.Hooks.Now().UTC() __serialnum_val := bwagreement_serialnum.value() - __data_val := bwagreement_data.value() - __storage_node_val := bwagreement_storage_node.value() + __storage_node_id_val := bwagreement_storage_node_id.value() + __uplink_id_val := bwagreement_uplink_id.value() __action_val := bwagreement_action.value() __total_val := bwagreement_total.value() __created_at_val := __now __expires_at_val := bwagreement_expires_at.value() - var __embed_stmt = __sqlbundle_Literal("INSERT INTO bwagreements ( serialnum, data, storage_node, action, total, created_at, expires_at ) VALUES ( ?, ?, ?, ?, ?, ?, ? )") + var __embed_stmt = __sqlbundle_Literal("INSERT INTO bwagreements ( serialnum, storage_node_id, uplink_id, action, total, created_at, expires_at ) VALUES ( ?, ?, ?, ?, ?, ?, ? )") var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt) - obj.logStmt(__stmt, __serialnum_val, __data_val, __storage_node_val, __action_val, __total_val, __created_at_val, __expires_at_val) + obj.logStmt(__stmt, __serialnum_val, __storage_node_id_val, __uplink_id_val, __action_val, __total_val, __created_at_val, __expires_at_val) - __res, err := obj.driver.Exec(__stmt, __serialnum_val, __data_val, __storage_node_val, __action_val, __total_val, __created_at_val, __expires_at_val) + __res, err := obj.driver.Exec(__stmt, __serialnum_val, __storage_node_id_val, __uplink_id_val, __action_val, __total_val, __created_at_val, __expires_at_val) if err != nil { return nil, obj.makeErr(err) } @@ -5043,32 +4970,11 @@ func (obj *sqlite3Impl) Create_BucketInfo(ctx context.Context, } -func (obj *sqlite3Impl) Get_Bwagreement_By_Serialnum(ctx context.Context, - bwagreement_serialnum Bwagreement_Serialnum_Field) ( - bwagreement *Bwagreement, err error) { - - var __embed_stmt = __sqlbundle_Literal("SELECT bwagreements.serialnum, bwagreements.data, bwagreements.storage_node, bwagreements.action, bwagreements.total, bwagreements.created_at, bwagreements.expires_at FROM bwagreements WHERE bwagreements.serialnum = ?") - - var __values []interface{} - __values = append(__values, bwagreement_serialnum.value()) - - var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt) - obj.logStmt(__stmt, __values...) 
- - bwagreement = &Bwagreement{} - err = obj.driver.QueryRow(__stmt, __values...).Scan(&bwagreement.Serialnum, &bwagreement.Data, &bwagreement.StorageNode, &bwagreement.Action, &bwagreement.Total, &bwagreement.CreatedAt, &bwagreement.ExpiresAt) - if err != nil { - return nil, obj.makeErr(err) - } - return bwagreement, nil - -} - func (obj *sqlite3Impl) Limited_Bwagreement(ctx context.Context, limit int, offset int64) ( rows []*Bwagreement, err error) { - var __embed_stmt = __sqlbundle_Literal("SELECT bwagreements.serialnum, bwagreements.data, bwagreements.storage_node, bwagreements.action, bwagreements.total, bwagreements.created_at, bwagreements.expires_at FROM bwagreements LIMIT ? OFFSET ?") + var __embed_stmt = __sqlbundle_Literal("SELECT bwagreements.serialnum, bwagreements.storage_node_id, bwagreements.uplink_id, bwagreements.action, bwagreements.total, bwagreements.created_at, bwagreements.expires_at FROM bwagreements LIMIT ? OFFSET ?") var __values []interface{} __values = append(__values) @@ -5086,7 +4992,7 @@ func (obj *sqlite3Impl) Limited_Bwagreement(ctx context.Context, for __rows.Next() { bwagreement := &Bwagreement{} - err = __rows.Scan(&bwagreement.Serialnum, &bwagreement.Data, &bwagreement.StorageNode, &bwagreement.Action, &bwagreement.Total, &bwagreement.CreatedAt, &bwagreement.ExpiresAt) + err = __rows.Scan(&bwagreement.Serialnum, &bwagreement.StorageNodeId, &bwagreement.UplinkId, &bwagreement.Action, &bwagreement.Total, &bwagreement.CreatedAt, &bwagreement.ExpiresAt) if err != nil { return nil, obj.makeErr(err) } @@ -5102,7 +5008,7 @@ func (obj *sqlite3Impl) Limited_Bwagreement(ctx context.Context, func (obj *sqlite3Impl) All_Bwagreement(ctx context.Context) ( rows []*Bwagreement, err error) { - var __embed_stmt = __sqlbundle_Literal("SELECT bwagreements.serialnum, bwagreements.data, bwagreements.storage_node, bwagreements.action, bwagreements.total, bwagreements.created_at, bwagreements.expires_at FROM bwagreements") + var __embed_stmt = __sqlbundle_Literal("SELECT bwagreements.serialnum, bwagreements.storage_node_id, bwagreements.uplink_id, bwagreements.action, bwagreements.total, bwagreements.created_at, bwagreements.expires_at FROM bwagreements") var __values []interface{} __values = append(__values) @@ -5118,7 +5024,7 @@ func (obj *sqlite3Impl) All_Bwagreement(ctx context.Context) ( for __rows.Next() { bwagreement := &Bwagreement{} - err = __rows.Scan(&bwagreement.Serialnum, &bwagreement.Data, &bwagreement.StorageNode, &bwagreement.Action, &bwagreement.Total, &bwagreement.CreatedAt, &bwagreement.ExpiresAt) + err = __rows.Scan(&bwagreement.Serialnum, &bwagreement.StorageNodeId, &bwagreement.UplinkId, &bwagreement.Action, &bwagreement.Total, &bwagreement.CreatedAt, &bwagreement.ExpiresAt) if err != nil { return nil, obj.makeErr(err) } @@ -5135,7 +5041,7 @@ func (obj *sqlite3Impl) All_Bwagreement_By_CreatedAt_Greater(ctx context.Context bwagreement_created_at_greater Bwagreement_CreatedAt_Field) ( rows []*Bwagreement, err error) { - var __embed_stmt = __sqlbundle_Literal("SELECT bwagreements.serialnum, bwagreements.data, bwagreements.storage_node, bwagreements.action, bwagreements.total, bwagreements.created_at, bwagreements.expires_at FROM bwagreements WHERE bwagreements.created_at > ?") + var __embed_stmt = __sqlbundle_Literal("SELECT bwagreements.serialnum, bwagreements.storage_node_id, bwagreements.uplink_id, bwagreements.action, bwagreements.total, bwagreements.created_at, bwagreements.expires_at FROM bwagreements WHERE bwagreements.created_at > ?") var __values 
 	__values = append(__values, bwagreement_created_at_greater.value())
@@ -5151,7 +5057,7 @@ func (obj *sqlite3Impl) All_Bwagreement_By_CreatedAt_Greater(ctx context.Context
 
 	for __rows.Next() {
 		bwagreement := &Bwagreement{}
-		err = __rows.Scan(&bwagreement.Serialnum, &bwagreement.Data, &bwagreement.StorageNode, &bwagreement.Action, &bwagreement.Total, &bwagreement.CreatedAt, &bwagreement.ExpiresAt)
+		err = __rows.Scan(&bwagreement.Serialnum, &bwagreement.StorageNodeId, &bwagreement.UplinkId, &bwagreement.Action, &bwagreement.Total, &bwagreement.CreatedAt, &bwagreement.ExpiresAt)
 		if err != nil {
 			return nil, obj.makeErr(err)
 		}
@@ -6382,58 +6288,6 @@ func (obj *sqlite3Impl) Update_ApiKey_By_Id(ctx context.Context,
 	return api_key, nil
 }
 
-func (obj *sqlite3Impl) Delete_Bwagreement_By_Serialnum(ctx context.Context,
-	bwagreement_serialnum Bwagreement_Serialnum_Field) (
-	deleted bool, err error) {
-
-	var __embed_stmt = __sqlbundle_Literal("DELETE FROM bwagreements WHERE bwagreements.serialnum = ?")
-
-	var __values []interface{}
-	__values = append(__values, bwagreement_serialnum.value())
-
-	var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt)
-	obj.logStmt(__stmt, __values...)
-
-	__res, err := obj.driver.Exec(__stmt, __values...)
-	if err != nil {
-		return false, obj.makeErr(err)
-	}
-
-	__count, err := __res.RowsAffected()
-	if err != nil {
-		return false, obj.makeErr(err)
-	}
-
-	return __count > 0, nil
-
-}
-
-func (obj *sqlite3Impl) Delete_Bwagreement_By_ExpiresAt_LessOrEqual(ctx context.Context,
-	bwagreement_expires_at_less_or_equal Bwagreement_ExpiresAt_Field) (
-	count int64, err error) {
-
-	var __embed_stmt = __sqlbundle_Literal("DELETE FROM bwagreements WHERE bwagreements.expires_at <= ?")
-
-	var __values []interface{}
-	__values = append(__values, bwagreement_expires_at_less_or_equal.value())
-
-	var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt)
-	obj.logStmt(__stmt, __values...)
-
-	__res, err := obj.driver.Exec(__stmt, __values...)
-	if err != nil {
-		return 0, obj.makeErr(err)
-	}
-
-	count, err = __res.RowsAffected()
-	if err != nil {
-		return 0, obj.makeErr(err)
-	}
-
-	return count, nil
-
-}
-
 func (obj *sqlite3Impl) Delete_Irreparabledb_By_Segmentpath(ctx context.Context,
 	irreparabledb_segmentpath Irreparabledb_Segmentpath_Field) (
 	deleted bool, err error) {
@@ -6725,13 +6579,13 @@ func (obj *sqlite3Impl) getLastBwagreement(ctx context.Context,
 	pk int64) (
 	bwagreement *Bwagreement, err error) {
 
-	var __embed_stmt = __sqlbundle_Literal("SELECT bwagreements.serialnum, bwagreements.data, bwagreements.storage_node, bwagreements.action, bwagreements.total, bwagreements.created_at, bwagreements.expires_at FROM bwagreements WHERE _rowid_ = ?")
+	var __embed_stmt = __sqlbundle_Literal("SELECT bwagreements.serialnum, bwagreements.storage_node_id, bwagreements.uplink_id, bwagreements.action, bwagreements.total, bwagreements.created_at, bwagreements.expires_at FROM bwagreements WHERE _rowid_ = ?")
 
 	var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt)
 	obj.logStmt(__stmt, pk)
 
 	bwagreement = &Bwagreement{}
-	err = obj.driver.QueryRow(__stmt, pk).Scan(&bwagreement.Serialnum, &bwagreement.Data, &bwagreement.StorageNode, &bwagreement.Action, &bwagreement.Total, &bwagreement.CreatedAt, &bwagreement.ExpiresAt)
+	err = obj.driver.QueryRow(__stmt, pk).Scan(&bwagreement.Serialnum, &bwagreement.StorageNodeId, &bwagreement.UplinkId, &bwagreement.Action, &bwagreement.Total, &bwagreement.CreatedAt, &bwagreement.ExpiresAt)
 	if err != nil {
 		return nil, obj.makeErr(err)
 	}
@@ -7339,8 +7193,8 @@ func (rx *Rx) Create_BucketInfo(ctx context.Context,
 
 func (rx *Rx) Create_Bwagreement(ctx context.Context,
 	bwagreement_serialnum Bwagreement_Serialnum_Field,
-	bwagreement_data Bwagreement_Data_Field,
-	bwagreement_storage_node Bwagreement_StorageNode_Field,
+	bwagreement_storage_node_id Bwagreement_StorageNodeId_Field,
+	bwagreement_uplink_id Bwagreement_UplinkId_Field,
 	bwagreement_action Bwagreement_Action_Field,
 	bwagreement_total Bwagreement_Total_Field,
 	bwagreement_expires_at Bwagreement_ExpiresAt_Field) (
@@ -7349,7 +7203,7 @@ func (rx *Rx) Create_Bwagreement(ctx context.Context,
 	if tx, err = rx.getTx(ctx); err != nil {
 		return
 	}
-	return tx.Create_Bwagreement(ctx, bwagreement_serialnum, bwagreement_data, bwagreement_storage_node, bwagreement_action, bwagreement_total, bwagreement_expires_at)
+	return tx.Create_Bwagreement(ctx, bwagreement_serialnum, bwagreement_storage_node_id, bwagreement_uplink_id, bwagreement_action, bwagreement_total, bwagreement_expires_at)
 
 }
 
@@ -7501,27 +7355,6 @@ func (rx *Rx) Delete_BucketInfo_By_Name(ctx context.Context,
 	return tx.Delete_BucketInfo_By_Name(ctx, bucket_info_name)
 
 }
-
-func (rx *Rx) Delete_Bwagreement_By_ExpiresAt_LessOrEqual(ctx context.Context,
-	bwagreement_expires_at_less_or_equal Bwagreement_ExpiresAt_Field) (
-	count int64, err error) {
-	var tx *Tx
-	if tx, err = rx.getTx(ctx); err != nil {
-		return
-	}
-	return tx.Delete_Bwagreement_By_ExpiresAt_LessOrEqual(ctx, bwagreement_expires_at_less_or_equal)
-
-}
-
-func (rx *Rx) Delete_Bwagreement_By_Serialnum(ctx context.Context,
-	bwagreement_serialnum Bwagreement_Serialnum_Field) (
-	deleted bool, err error) {
-	var tx *Tx
-	if tx, err = rx.getTx(ctx); err != nil {
-		return
-	}
-	return tx.Delete_Bwagreement_By_Serialnum(ctx, bwagreement_serialnum)
-}
-
 func (rx *Rx) Delete_Injuredsegment_By_Id(ctx context.Context,
 	injuredsegment_id Injuredsegment_Id_Field) (
 	deleted bool, err error) {
@@ -7662,16 +7495,6 @@ func (rx *Rx) Get_BucketInfo_By_Name(ctx context.Context,
 	return tx.Get_BucketInfo_By_Name(ctx, bucket_info_name)
 }
 
-func (rx *Rx) Get_Bwagreement_By_Serialnum(ctx context.Context,
-	bwagreement_serialnum Bwagreement_Serialnum_Field) (
-	bwagreement *Bwagreement, err error) {
-	var tx *Tx
-	if tx, err = rx.getTx(ctx); err != nil {
-		return
-	}
-	return tx.Get_Bwagreement_By_Serialnum(ctx, bwagreement_serialnum)
-}
-
 func (rx *Rx) Get_Irreparabledb_By_Segmentpath(ctx context.Context,
 	irreparabledb_segmentpath Irreparabledb_Segmentpath_Field) (
 	irreparabledb *Irreparabledb, err error) {
@@ -7944,8 +7767,8 @@ type Methods interface {
 
 	Create_Bwagreement(ctx context.Context,
 		bwagreement_serialnum Bwagreement_Serialnum_Field,
-		bwagreement_data Bwagreement_Data_Field,
-		bwagreement_storage_node Bwagreement_StorageNode_Field,
+		bwagreement_storage_node_id Bwagreement_StorageNodeId_Field,
+		bwagreement_uplink_id Bwagreement_UplinkId_Field,
 		bwagreement_action Bwagreement_Action_Field,
 		bwagreement_total Bwagreement_Total_Field,
 		bwagreement_expires_at Bwagreement_ExpiresAt_Field) (
@@ -8026,14 +7849,6 @@ type Methods interface {
 		bucket_info_name BucketInfo_Name_Field) (
 		deleted bool, err error)
 
-	Delete_Bwagreement_By_ExpiresAt_LessOrEqual(ctx context.Context,
-		bwagreement_expires_at_less_or_equal Bwagreement_ExpiresAt_Field) (
-		count int64, err error)
-
-	Delete_Bwagreement_By_Serialnum(ctx context.Context,
-		bwagreement_serialnum Bwagreement_Serialnum_Field) (
-		deleted bool, err error)
-
 	Delete_Injuredsegment_By_Id(ctx context.Context,
 		injuredsegment_id Injuredsegment_Id_Field) (
 		deleted bool, err error)
@@ -8090,10 +7905,6 @@ type Methods interface {
 		bucket_info_name BucketInfo_Name_Field) (
 		bucket_info *BucketInfo, err error)
 
-	Get_Bwagreement_By_Serialnum(ctx context.Context,
-		bwagreement_serialnum Bwagreement_Serialnum_Field) (
-		bwagreement *Bwagreement, err error)
-
 	Get_Irreparabledb_By_Segmentpath(ctx context.Context,
 		irreparabledb_segmentpath Irreparabledb_Segmentpath_Field) (
 		irreparabledb *Irreparabledb, err error)
diff --git a/satellite/satellitedb/dbx/satellitedb.dbx.postgres.sql b/satellite/satellitedb/dbx/satellitedb.dbx.postgres.sql
index 70622a2ee..1b81f005b 100644
--- a/satellite/satellitedb/dbx/satellitedb.dbx.postgres.sql
+++ b/satellite/satellitedb/dbx/satellitedb.dbx.postgres.sql
@@ -28,8 +28,8 @@ CREATE TABLE accounting_timestamps (
 );
 CREATE TABLE bwagreements (
 	serialnum text NOT NULL,
-	data bytea NOT NULL,
-	storage_node bytea NOT NULL,
+	storage_node_id bytea NOT NULL,
+	uplink_id bytea NOT NULL,
 	action bigint NOT NULL,
 	total bigint NOT NULL,
 	created_at timestamp with time zone NOT NULL,
diff --git a/satellite/satellitedb/dbx/satellitedb.dbx.sqlite3.sql b/satellite/satellitedb/dbx/satellitedb.dbx.sqlite3.sql
index ab12952d4..d6e3787cd 100644
--- a/satellite/satellitedb/dbx/satellitedb.dbx.sqlite3.sql
+++ b/satellite/satellitedb/dbx/satellitedb.dbx.sqlite3.sql
@@ -28,8 +28,8 @@ CREATE TABLE accounting_timestamps (
 );
 CREATE TABLE bwagreements (
 	serialnum TEXT NOT NULL,
-	data BLOB NOT NULL,
-	storage_node BLOB NOT NULL,
+	storage_node_id BLOB NOT NULL,
+	uplink_id BLOB NOT NULL,
 	action INTEGER NOT NULL,
 	total INTEGER NOT NULL,
 	created_at TIMESTAMP NOT NULL,
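Note (editorial sketch, not part of the patch): with the serialized data column replaced by explicit storage_node_id and uplink_id columns, per-uplink bandwidth can be summed directly in SQL instead of deserializing every agreement in Go. The snippet below only illustrates the kind of aggregate the new schema allows; it is not the query shipped in satellite/satellitedb/bandwidthagreement.go. The helper name uplinkTotals, the Postgres-style placeholders, and the column aliases are assumptions, while the table and column names come from the schema above.

// Illustrative sketch only; assumes a *sql.DB connected to a Postgres
// satellite database containing the bwagreements table defined above.
package example

import (
	"context"
	"database/sql"
	"time"
)

// uplinkTotals (hypothetical helper) groups bwagreements rows by uplink_id
// and sums bytes and per-action transaction counts in a single query.
func uplinkTotals(ctx context.Context, db *sql.DB, putAction, getAction int64, from, to time.Time) (*sql.Rows, error) {
	// Callers would Scan each returned row into an UplinkStat-like struct.
	return db.QueryContext(ctx, `
		SELECT uplink_id,
		       SUM(total)                                   AS total_bytes,
		       COUNT(*)                                     AS transactions,
		       SUM(CASE WHEN action = $1 THEN 1 ELSE 0 END) AS put_count,
		       SUM(CASE WHEN action = $2 THEN 1 ELSE 0 END) AS get_count
		FROM bwagreements
		WHERE created_at > $3 AND created_at <= $4
		GROUP BY uplink_id`,
		putAction, getAction, from, to)
}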
diff --git a/satellite/satellitedb/locked.go b/satellite/satellitedb/locked.go
index 9f6e375a5..22a23ea24 100644
--- a/satellite/satellitedb/locked.go
+++ b/satellite/satellitedb/locked.go
@@ -62,11 +62,11 @@ func (m *lockedAccounting) GetRawSince(ctx context.Context, latestRollup time.Ti
 	return m.db.GetRawSince(ctx, latestRollup)
 }
 
-// LastRawTime records the latest last tallied time.
-func (m *lockedAccounting) LastRawTime(ctx context.Context, timestampType string) (time.Time, bool, error) {
+// LastTimestamp records the latest last tallied time.
+func (m *lockedAccounting) LastTimestamp(ctx context.Context, timestampType string) (time.Time, error) {
 	m.Lock()
 	defer m.Unlock()
-	return m.db.LastRawTime(ctx, timestampType)
+	return m.db.LastTimestamp(ctx, timestampType)
 }
 
 // QueryPaymentInfo queries StatDB, Accounting Rollup on nodeID
@@ -77,24 +77,24 @@ func (m *lockedAccounting) QueryPaymentInfo(ctx context.Context, start time.Time
 }
 
 // SaveAtRestRaw records raw tallies of at-rest-data.
-func (m *lockedAccounting) SaveAtRestRaw(ctx context.Context, latestTally time.Time, isNew bool, nodeData map[storj.NodeID]float64) error {
+func (m *lockedAccounting) SaveAtRestRaw(ctx context.Context, latestTally time.Time, nodeData map[storj.NodeID]float64) error {
 	m.Lock()
 	defer m.Unlock()
-	return m.db.SaveAtRestRaw(ctx, latestTally, isNew, nodeData)
+	return m.db.SaveAtRestRaw(ctx, latestTally, nodeData)
 }
 
-// SaveBWRaw records raw sums of agreement values to the database and updates the LastRawTime.
-func (m *lockedAccounting) SaveBWRaw(ctx context.Context, latestBwa time.Time, isNew bool, bwTotals accounting.BWTally) error {
+// SaveBWRaw records raw sums of agreement values to the database and updates the LastTimestamp.
+func (m *lockedAccounting) SaveBWRaw(ctx context.Context, tallyEnd time.Time, bwTotals map[storj.NodeID][]int64) error {
 	m.Lock()
 	defer m.Unlock()
-	return m.db.SaveBWRaw(ctx, latestBwa, isNew, bwTotals)
+	return m.db.SaveBWRaw(ctx, tallyEnd, bwTotals)
 }
 
 // SaveRollup records raw tallies of at rest data to the database
-func (m *lockedAccounting) SaveRollup(ctx context.Context, latestTally time.Time, isNew bool, stats accounting.RollupStats) error {
+func (m *lockedAccounting) SaveRollup(ctx context.Context, latestTally time.Time, stats accounting.RollupStats) error {
 	m.Lock()
 	defer m.Unlock()
-	return m.db.SaveRollup(ctx, latestTally, isNew, stats)
+	return m.db.SaveRollup(ctx, latestTally, stats)
 }
 
 // BandwidthAgreement returns database for storing bandwidth agreements
@@ -117,18 +117,18 @@ func (m *lockedBandwidthAgreement) CreateAgreement(ctx context.Context, a1 *pb.R
 	return m.db.CreateAgreement(ctx, a1)
 }
 
-// GetAgreements gets all bandwidth agreements.
-func (m *lockedBandwidthAgreement) GetAgreements(ctx context.Context) ([]bwagreement.Agreement, error) {
+// GetTotals returns the sum of each bandwidth type over a given date range (start exclusive)
+func (m *lockedBandwidthAgreement) GetTotals(ctx context.Context, a1 time.Time, a2 time.Time) (map[storj.NodeID][]int64, error) {
 	m.Lock()
 	defer m.Unlock()
-	return m.db.GetAgreements(ctx)
+	return m.db.GetTotals(ctx, a1, a2)
 }
 
-// GetAgreementsSince gets all bandwidth agreements since specific time.
-func (m *lockedBandwidthAgreement) GetAgreementsSince(ctx context.Context, a1 time.Time) ([]bwagreement.Agreement, error) {
+// GetUplinkStats returns bandwidth stats per uplink
+func (m *lockedBandwidthAgreement) GetUplinkStats(ctx context.Context, a1 time.Time, a2 time.Time) ([]bwagreement.UplinkStat, error) {
 	m.Lock()
 	defer m.Unlock()
-	return m.db.GetAgreementsSince(ctx, a1)
+	return m.db.GetUplinkStats(ctx, a1, a2)
 }
 
 // Close closes the database
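Note (editorial sketch, not part of the patch): the locked wrappers above show the reshaped accounting flow: LastTimestamp replaces LastRawTime's (time, bool) pair, GetTotals returns totals already summed in the database, keyed by node and indexed by bandwidth action, and SaveBWRaw takes that map together with the tally end time. The sketch below only shows how the pieces might compose; the real loop lives in pkg/accounting/tally/tally.go and may differ, and the helper name queryBW and the timestampType key "LastBandwidthTally" are assumptions.

// Illustrative sketch only; not the code in this patch.
package example

import (
	"context"
	"time"

	"storj.io/storj/pkg/accounting"
	"storj.io/storj/pkg/bwagreement"
)

func queryBW(ctx context.Context, accountingDB accounting.DB, bwDB bwagreement.DB) error {
	// Resume from the last saved bandwidth-tally timestamp (zero time on first run).
	lastTally, err := accountingDB.LastTimestamp(ctx, "LastBandwidthTally")
	if err != nil {
		return err
	}
	tallyEnd := time.Now().UTC()

	// One grouped SQL query replaces iterating over every serialized agreement;
	// each node ID maps to a slice of totals indexed by bandwidth action.
	bwTotals, err := bwDB.GetTotals(ctx, lastTally, tallyEnd)
	if err != nil {
		return err
	}
	if len(bwTotals) == 0 {
		return nil
	}
	return accountingDB.SaveBWRaw(ctx, tallyEnd, bwTotals)
}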