From 5395ff5fe6c019e2773450900631f1e6c6353046 Mon Sep 17 00:00:00 2001 From: Jennifer Li Johnson Date: Fri, 10 May 2019 15:05:42 -0400 Subject: [PATCH] Refactor accountingdb interface (#1897) * splits accounting db into storagenodeaccounting and projectaccounting interfaces and renames methods to match --- cmd/satellite/usage.go | 2 +- pkg/accounting/db.go | 58 +- pkg/accounting/db_test.go | 4 +- pkg/accounting/projectusage_test.go | 16 +- pkg/accounting/rollup/rollup.go | 23 +- pkg/accounting/rollup/rollup_test.go | 20 +- pkg/accounting/tally/tally.go | 43 +- pkg/accounting/tally/tally_test.go | 13 +- pkg/overlay/cache.go | 3 +- satellite/metainfo/metainfo.go | 41 +- satellite/peer.go | 13 +- satellite/satellitedb/accounting.go | 296 ------ satellite/satellitedb/database.go | 11 +- satellite/satellitedb/dbx/satellitedb.dbx | 52 +- satellite/satellitedb/dbx/satellitedb.dbx.go | 971 ++++++++---------- .../dbx/satellitedb.dbx.postgres.sql | 18 +- .../dbx/satellitedb.dbx.sqlite3.sql | 18 +- satellite/satellitedb/locked.go | 222 ++-- satellite/satellitedb/migrate.go | 10 + satellite/satellitedb/projectaccounting.go | 116 +++ .../satellitedb/storagenodeaccounting.go | 194 ++++ .../satellitedb/testdata/postgres.v18.sql | 233 +++++ 22 files changed, 1228 insertions(+), 1149 deletions(-) delete mode 100644 satellite/satellitedb/accounting.go create mode 100644 satellite/satellitedb/projectaccounting.go create mode 100644 satellite/satellitedb/storagenodeaccounting.go create mode 100644 satellite/satellitedb/testdata/postgres.v18.sql diff --git a/cmd/satellite/usage.go b/cmd/satellite/usage.go index 7eeb5f2e9..b0c4176ac 100644 --- a/cmd/satellite/usage.go +++ b/cmd/satellite/usage.go @@ -29,7 +29,7 @@ func generateCSV(ctx context.Context, start time.Time, end time.Time, output io. err = errs.Combine(err, db.Close()) }() - rows, err := db.Accounting().QueryPaymentInfo(ctx, start, end) + rows, err := db.StoragenodeAccounting().QueryPaymentInfo(ctx, start, end) if err != nil { return err } diff --git a/pkg/accounting/db.go b/pkg/accounting/db.go index b735f23b5..6a6b19fad 100644 --- a/pkg/accounting/db.go +++ b/pkg/accounting/db.go @@ -15,14 +15,12 @@ import ( // RollupStats is a convenience alias type RollupStats map[time.Time]map[storj.NodeID]*Rollup -// Raw mirrors dbx.AccountingRaw, allowing us to use that struct without leaking dbx -type Raw struct { +// StoragenodeStorageTally mirrors dbx.StoragenodeStorageTally, allowing us to use that struct without leaking dbx +type StoragenodeStorageTally struct { ID int64 NodeID storj.NodeID IntervalEndTime time.Time DataTotal float64 - DataType int - CreatedAt time.Time } // StoragenodeBandwidthRollup mirrors dbx.StoragenodeBandwidthRollup, allowing us to use the struct without leaking dbx @@ -46,30 +44,34 @@ type Rollup struct { AtRestTotal float64 } -// DB stores information about bandwidth and storage usage -type DB interface { - // LastTimestamp records the latest last tallied time. - LastTimestamp(ctx context.Context, timestampType string) (time.Time, error) - // SaveAtRestRaw records raw tallies of at-rest-data. 
- SaveAtRestRaw(ctx context.Context, latestTally time.Time, created time.Time, nodeData map[storj.NodeID]float64) error - // GetRaw retrieves all raw tallies - GetRaw(ctx context.Context) ([]*Raw, error) - // GetRawSince retrieves all raw tallies since latestRollup - GetRawSince(ctx context.Context, latestRollup time.Time) ([]*Raw, error) - // GetStoragenodeBandwidthSince retrieves all storagenode_bandwidth_rollup entires since latestRollup - GetStoragenodeBandwidthSince(ctx context.Context, latestRollup time.Time) ([]*StoragenodeBandwidthRollup, error) - // SaveRollup records raw tallies of at rest data to the database +// StoragenodeAccounting stores information about bandwidth and storage usage for storage nodes +type StoragenodeAccounting interface { + // SaveTallies records tallies of data at rest + SaveTallies(ctx context.Context, latestTally time.Time, nodeData map[storj.NodeID]float64) error + // GetTallies retrieves all tallies + GetTallies(ctx context.Context) ([]*StoragenodeStorageTally, error) + // GetTalliesSince retrieves all tallies since latestRollup + GetTalliesSince(ctx context.Context, latestRollup time.Time) ([]*StoragenodeStorageTally, error) + // GetBandwidthSince retrieves all bandwidth rollup entries since latestRollup + GetBandwidthSince(ctx context.Context, latestRollup time.Time) ([]*StoragenodeBandwidthRollup, error) + // SaveRollup records tally and bandwidth rollup aggregations to the database SaveRollup(ctx context.Context, latestTally time.Time, stats RollupStats) error - // SaveBucketTallies saves the latest bucket info - SaveBucketTallies(ctx context.Context, intervalStart time.Time, bucketTallies map[string]*BucketTally) ([]BucketTally, error) - // QueryPaymentInfo queries Overlay, Accounting Rollup on nodeID + // LastTimestamp records and returns the latest last tallied time.
+ LastTimestamp(ctx context.Context, timestampType string) (time.Time, error) + // QueryPaymentInfo queries Nodes and Accounting_Rollup on nodeID QueryPaymentInfo(ctx context.Context, start time.Time, end time.Time) ([]*CSVRow, error) - // DeleteRawBefore deletes all raw tallies prior to some time - DeleteRawBefore(ctx context.Context, latestRollup time.Time) error - // CreateBucketStorageTally creates a record for BucketStorageTally in the accounting DB table - CreateBucketStorageTally(ctx context.Context, tally BucketStorageTally) error - // ProjectAllocatedBandwidthTotal returns the sum of GET bandwidth usage allocated for a projectID in the past time frame - ProjectAllocatedBandwidthTotal(ctx context.Context, bucketID []byte, from time.Time) (int64, error) - // ProjectStorageTotals returns the current inline and remote storage usage for a projectID - ProjectStorageTotals(ctx context.Context, projectID uuid.UUID) (int64, int64, error) + // DeleteTalliesBefore deletes all tallies prior to some time + DeleteTalliesBefore(ctx context.Context, latestRollup time.Time) error +} + +// ProjectAccounting stores information about bandwidth and storage usage for projects +type ProjectAccounting interface { + // SaveTallies saves the latest project info + SaveTallies(ctx context.Context, intervalStart time.Time, bucketTallies map[string]*BucketTally) ([]BucketTally, error) + // CreateStorageTally creates a record for BucketStorageTally in the accounting DB table + CreateStorageTally(ctx context.Context, tally BucketStorageTally) error + // GetAllocatedBandwidthTotal returns the sum of GET bandwidth usage allocated for a projectID in the past time frame + GetAllocatedBandwidthTotal(ctx context.Context, bucketID []byte, from time.Time) (int64, error) + // GetStorageTotals returns the current inline and remote storage usage for a projectID + GetStorageTotals(ctx context.Context, projectID uuid.UUID) (int64, int64, error) } diff --git a/pkg/accounting/db_test.go b/pkg/accounting/db_test.go index 9a7bb7958..1054d5abc 100644 --- a/pkg/accounting/db_test.go +++ b/pkg/accounting/db_test.go @@ -30,8 +30,8 @@ func TestSaveBucketTallies(t *testing.T) { // Execute test: retrieve the save tallies and confirm they contains the expected data intervalStart := time.Now() - accountingDB := db.Accounting() - actualTallies, err := accountingDB.SaveBucketTallies(ctx, intervalStart, bucketTallies) + pdb := db.ProjectAccounting() + actualTallies, err := pdb.SaveTallies(ctx, intervalStart, bucketTallies) require.NoError(t, err) for _, tally := range actualTallies { require.Contains(t, expectedTallies, tally) diff --git a/pkg/accounting/projectusage_test.go b/pkg/accounting/projectusage_test.go index 9dec173fb..260014a6a 100644 --- a/pkg/accounting/projectusage_test.go +++ b/pkg/accounting/projectusage_test.go @@ -39,7 +39,7 @@ func TestProjectUsageStorage(t *testing.T) { SatelliteCount: 1, StorageNodeCount: 6, UplinkCount: 1, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { saDB := planet.Satellites[0].DB - acctDB := saDB.Accounting() + acctDB := saDB.ProjectAccounting() // Setup: create a new project to use the projectID projects, err := planet.Satellites[0].DB.Console().Projects().GetAll(ctx) @@ -57,7 +57,7 @@ func TestProjectUsageStorage(t *testing.T) { } // Execute test: get storage totals for a project, then check if that exceeds the max usage limit - inlineTotal, remoteTotal, err := acctDB.ProjectStorageTotals(ctx, projectID) + inlineTotal, remoteTotal, err := acctDB.GetStorageTotals(ctx, 
projectID) require.NoError(t, err) maxAlphaUsage := 25 * memory.GB actualExceeded, actualResource := accounting.ExceedsAlphaUsage(0, inlineTotal, remoteTotal, maxAlphaUsage) @@ -97,7 +97,7 @@ func TestProjectUsageBandwidth(t *testing.T) { }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { saDB := planet.Satellites[0].DB orderDB := saDB.Orders() - acctDB := saDB.Accounting() + acctDB := saDB.ProjectAccounting() // Setup: get projectID and create bucketID projects, err := planet.Satellites[0].DB.Console().Projects().GetAll(ctx) @@ -128,7 +128,7 @@ func TestProjectUsageBandwidth(t *testing.T) { from := time.Now().AddDate(0, 0, -accounting.AverageDaysInMonth) // Execute test: get bandwidth totals for a project, then check if that exceeds the max usage limit - bandwidthTotal, err := acctDB.ProjectAllocatedBandwidthTotal(ctx, bucketID, from) + bandwidthTotal, err := acctDB.GetAllocatedBandwidthTotal(ctx, bucketID, from) require.NoError(t, err) maxAlphaUsage := 25 * memory.GB actualExceeded, actualResource := accounting.ExceedsAlphaUsage(bandwidthTotal, 0, 0, maxAlphaUsage) @@ -155,7 +155,7 @@ func createBucketID(projectID uuid.UUID, bucket []byte) []byte { return []byte(storj.JoinPaths(entries...)) } -func setUpStorageTallies(ctx *testcontext.Context, projectID uuid.UUID, acctDB accounting.DB, time time.Time) error { +func setUpStorageTallies(ctx *testcontext.Context, projectID uuid.UUID, acctDB accounting.ProjectAccounting, time time.Time) error { // Create many records that sum greater than project usage limit of 25GB for i := 0; i < 4; i++ { @@ -169,7 +169,7 @@ func setUpStorageTallies(ctx *testcontext.Context, projectID uuid.UUID, acctDB a // that sum greater than the maxAlphaUsage * expansionFactor RemoteBytes: 10 * memory.GB.Int64() * accounting.ExpansionFactor, } - err := acctDB.CreateBucketStorageTally(ctx, tally) + err := acctDB.CreateStorageTally(ctx, tally) if err != nil { return err } @@ -226,7 +226,7 @@ func TestProjectBandwidthTotal(t *testing.T) { ctx := testcontext.New(t) defer ctx.Cleanup() - accountingDB := db.Accounting() + pdb := db.ProjectAccounting() projectID, err := uuid.New() require.NoError(t, err) @@ -237,7 +237,7 @@ func TestProjectBandwidthTotal(t *testing.T) { // Execute test: get project bandwidth total bucketID := createBucketID(*projectID, []byte("testbucket")) from := time.Now().AddDate(0, 0, -accounting.AverageDaysInMonth) // past 30 days - actualBandwidthTotal, err := accountingDB.ProjectAllocatedBandwidthTotal(ctx, bucketID, from) + actualBandwidthTotal, err := pdb.GetAllocatedBandwidthTotal(ctx, bucketID, from) require.NoError(t, err) require.Equal(t, actualBandwidthTotal, expectedTotal) }) diff --git a/pkg/accounting/rollup/rollup.go b/pkg/accounting/rollup/rollup.go index f719fe29e..1c3b8cc13 100644 --- a/pkg/accounting/rollup/rollup.go +++ b/pkg/accounting/rollup/rollup.go @@ -26,16 +26,16 @@ type Config struct { type Service struct { logger *zap.Logger ticker *time.Ticker - db accounting.DB + sdb accounting.StoragenodeAccounting deleteTallies bool } // New creates a new rollup service -func New(logger *zap.Logger, db accounting.DB, interval time.Duration, deleteTallies bool) *Service { +func New(logger *zap.Logger, sdb accounting.StoragenodeAccounting, interval time.Duration, deleteTallies bool) *Service { return &Service{ logger: logger, ticker: time.NewTicker(interval), - db: db, + sdb: sdb, deleteTallies: deleteTallies, } } @@ -60,7 +60,7 @@ func (r *Service) Run(ctx context.Context) (err error) { // Rollup aggregates storage 
and bandwidth amounts for the time interval func (r *Service) Rollup(ctx context.Context) error { // only Rollup new things - get LastRollup - lastRollup, err := r.db.LastTimestamp(ctx, accounting.LastRollup) + lastRollup, err := r.sdb.LastTimestamp(ctx, accounting.LastRollup) if err != nil { return Error.Wrap(err) } @@ -83,14 +83,14 @@ func (r *Service) Rollup(ctx context.Context) error { return nil } - err = r.db.SaveRollup(ctx, latestTally, rollupStats) + err = r.sdb.SaveRollup(ctx, latestTally, rollupStats) if err != nil { return Error.Wrap(err) } if r.deleteTallies { // Delete already rolled up tallies - err = r.db.DeleteRawBefore(ctx, latestTally) + err = r.sdb.DeleteTalliesBefore(ctx, latestTally) if err != nil { return Error.Wrap(err) } @@ -101,7 +101,7 @@ func (r *Service) Rollup(ctx context.Context) error { // RollupStorage rolls up storage tally, modifies rollupStats map func (r *Service) RollupStorage(ctx context.Context, lastRollup time.Time, rollupStats accounting.RollupStats) (latestTally time.Time, err error) { - tallies, err := r.db.GetRawSince(ctx, lastRollup) + tallies, err := r.sdb.GetTalliesSince(ctx, lastRollup) if err != nil { return time.Now(), Error.Wrap(err) } @@ -126,12 +126,7 @@ func (r *Service) RollupStorage(ctx context.Context, lastRollup time.Time, rollu rollupStats[iDay][node] = &accounting.Rollup{NodeID: node, StartTime: iDay} } //increment data at rest sum - switch tallyRow.DataType { - case accounting.AtRest: - rollupStats[iDay][node].AtRestTotal += tallyRow.DataTotal - default: - r.logger.Info("rollupStorage no longer supports non-accounting.AtRest datatypes") - } + rollupStats[iDay][node].AtRestTotal += tallyRow.DataTotal } return latestTally, nil @@ -140,7 +135,7 @@ func (r *Service) RollupStorage(ctx context.Context, lastRollup time.Time, rollu // RollupBW aggregates the bandwidth rollups, modifies rollupStats map func (r *Service) RollupBW(ctx context.Context, lastRollup time.Time, rollupStats accounting.RollupStats) error { var latestTally time.Time - bws, err := r.db.GetStoragenodeBandwidthSince(ctx, lastRollup.UTC()) + bws, err := r.sdb.GetBandwidthSince(ctx, lastRollup.UTC()) if err != nil { return Error.Wrap(err) } diff --git a/pkg/accounting/rollup/rollup_test.go b/pkg/accounting/rollup/rollup_test.go index 8bbd5f6f0..84b9f7053 100644 --- a/pkg/accounting/rollup/rollup_test.go +++ b/pkg/accounting/rollup/rollup_test.go @@ -14,7 +14,6 @@ import ( "storj.io/storj/internal/testcontext" "storj.io/storj/internal/testplanet" - "storj.io/storj/pkg/accounting" "storj.io/storj/pkg/pb" "storj.io/storj/pkg/storj" "storj.io/storj/satellite" @@ -37,7 +36,7 @@ func TestRollupNoDeletes(t *testing.T) { start := timestamp for i := 0; i < days; i++ { - err := planet.Satellites[0].DB.Accounting().SaveAtRestRaw(ctx, timestamp, timestamp, testData[i].nodeData) + err := planet.Satellites[0].DB.StoragenodeAccounting().SaveTallies(ctx, timestamp, testData[i].nodeData) require.NoError(t, err) err = saveBW(ctx, planet, testData[i].bwTotals, timestamp) require.NoError(t, err) @@ -53,7 +52,7 @@ func TestRollupNoDeletes(t *testing.T) { start = time.Date(start.Year(), start.Month(), start.Day(), 0, 0, 0, 0, start.Location()) end = time.Date(end.Year(), end.Month(), end.Day(), 0, 0, 0, 0, end.Location()) - rows, err := planet.Satellites[0].DB.Accounting().QueryPaymentInfo(ctx, start, end) + rows, err := planet.Satellites[0].DB.StoragenodeAccounting().QueryPaymentInfo(ctx, start, end) require.NoError(t, err) if i == 0 { // we need at least two days for rollup to work 
assert.Equal(t, 0, len(rows)) @@ -74,7 +73,7 @@ func TestRollupNoDeletes(t *testing.T) { assert.NotEmpty(t, r.Wallet) } } - raw, err := planet.Satellites[0].DB.Accounting().GetRaw(ctx) + raw, err := planet.Satellites[0].DB.StoragenodeAccounting().GetTallies(ctx) require.NoError(t, err) assert.Equal(t, days*len(planet.StorageNodes), len(raw)) }) @@ -97,7 +96,7 @@ func TestRollupDeletes(t *testing.T) { start := timestamp for i := 0; i < days; i++ { - err := planet.Satellites[0].DB.Accounting().SaveAtRestRaw(ctx, timestamp, timestamp, testData[i].nodeData) + err := planet.Satellites[0].DB.StoragenodeAccounting().SaveTallies(ctx, timestamp, testData[i].nodeData) require.NoError(t, err) err = saveBW(ctx, planet, testData[i].bwTotals, timestamp) require.NoError(t, err) @@ -106,15 +105,12 @@ func TestRollupDeletes(t *testing.T) { require.NoError(t, err) // Assert that RollupStorage deleted all raws except for today's - raw, err := planet.Satellites[0].DB.Accounting().GetRaw(ctx) + raw, err := planet.Satellites[0].DB.StoragenodeAccounting().GetTallies(ctx) require.NoError(t, err) for _, r := range raw { assert.Equal(t, r.IntervalEndTime.UTC().Truncate(time.Second), timestamp.Truncate(time.Second)) - if r.DataType == accounting.AtRest { - assert.Equal(t, testData[i].nodeData[r.NodeID], r.DataTotal) - } else { - assert.Equal(t, testData[i].bwTotals[r.NodeID][r.DataType], int64(r.DataTotal)) - } + assert.Equal(t, testData[i].nodeData[r.NodeID], r.DataTotal) + } // Advance time by 24 hours @@ -125,7 +121,7 @@ func TestRollupDeletes(t *testing.T) { start = time.Date(start.Year(), start.Month(), start.Day(), 0, 0, 0, 0, start.Location()) end = time.Date(end.Year(), end.Month(), end.Day(), 0, 0, 0, 0, end.Location()) - rows, err := planet.Satellites[0].DB.Accounting().QueryPaymentInfo(ctx, start, end) + rows, err := planet.Satellites[0].DB.StoragenodeAccounting().QueryPaymentInfo(ctx, start, end) require.NoError(t, err) if i == 0 { // we need at least two days for rollup to work assert.Equal(t, 0, len(rows)) diff --git a/pkg/accounting/tally/tally.go b/pkg/accounting/tally/tally.go index 9cfb63354..33a99898a 100644 --- a/pkg/accounting/tally/tally.go +++ b/pkg/accounting/tally/tally.go @@ -27,25 +27,27 @@ type Config struct { // Service is the tally service for data stored on each storage node type Service struct { - logger *zap.Logger - metainfo *metainfo.Service - overlay *overlay.Cache - limit int - ticker *time.Ticker - accountingDB accounting.DB - liveAccounting live.Service + logger *zap.Logger + metainfo *metainfo.Service + overlay *overlay.Cache + limit int + ticker *time.Ticker + storagenodeAccountingDB accounting.StoragenodeAccounting + projectAccountingDB accounting.ProjectAccounting + liveAccounting live.Service } // New creates a new tally Service -func New(logger *zap.Logger, accountingDB accounting.DB, liveAccounting live.Service, metainfo *metainfo.Service, overlay *overlay.Cache, limit int, interval time.Duration) *Service { +func New(logger *zap.Logger, sdb accounting.StoragenodeAccounting, pdb accounting.ProjectAccounting, liveAccounting live.Service, metainfo *metainfo.Service, overlay *overlay.Cache, limit int, interval time.Duration) *Service { return &Service{ - logger: logger, - metainfo: metainfo, - overlay: overlay, - limit: limit, - ticker: time.NewTicker(interval), - accountingDB: accountingDB, - liveAccounting: liveAccounting, + logger: logger, + metainfo: metainfo, + overlay: overlay, + limit: limit, + ticker: time.NewTicker(interval), + storagenodeAccountingDB: sdb, + 
projectAccountingDB: pdb, + liveAccounting: liveAccounting, } } @@ -83,13 +85,13 @@ func (t *Service) Tally(ctx context.Context) error { errAtRest = errs.New("Query for data-at-rest failed : %v", err) } else { if len(nodeData) > 0 { - err = t.SaveAtRestRaw(ctx, latestTally, time.Now().UTC(), nodeData) + err = t.storagenodeAccountingDB.SaveTallies(ctx, latestTally, nodeData) if err != nil { errAtRest = errs.New("Saving storage node data-at-rest failed : %v", err) } } if len(bucketData) > 0 { - _, err = t.accountingDB.SaveBucketTallies(ctx, latestTally, bucketData) + _, err = t.projectAccountingDB.SaveTallies(ctx, latestTally, bucketData) if err != nil { errBucketInfo = errs.New("Saving bucket storage data failed") } @@ -103,7 +105,7 @@ func (t *Service) Tally(ctx context.Context) error { func (t *Service) CalculateAtRestData(ctx context.Context) (latestTally time.Time, nodeData map[storj.NodeID]float64, bucketTallies map[string]*accounting.BucketTally, err error) { defer mon.Task()(&ctx)(&err) - latestTally, err = t.accountingDB.LastTimestamp(ctx, accounting.LastAtRestTally) + latestTally, err = t.storagenodeAccountingDB.LastTimestamp(ctx, accounting.LastAtRestTally) if err != nil { return latestTally, nodeData, bucketTallies, Error.Wrap(err) } @@ -212,8 +214,3 @@ func (t *Service) CalculateAtRestData(ctx context.Context) (latestTally time.Tim } return latestTally, nodeData, bucketTallies, err } - -// SaveAtRestRaw records raw tallies of at-rest-data and updates the LastTimestamp -func (t *Service) SaveAtRestRaw(ctx context.Context, latestTally time.Time, created time.Time, nodeData map[storj.NodeID]float64) error { - return t.accountingDB.SaveAtRestRaw(ctx, latestTally, created, nodeData) -} diff --git a/pkg/accounting/tally/tally_test.go b/pkg/accounting/tally/tally_test.go index 97e151162..b04784740 100644 --- a/pkg/accounting/tally/tally_test.go +++ b/pkg/accounting/tally/tally_test.go @@ -20,19 +20,16 @@ import ( "storj.io/storj/pkg/storj" ) -func TestDeleteRawBefore(t *testing.T) { +func TestDeleteTalliesBefore(t *testing.T) { tests := []struct { - createdAt time.Time eraseBefore time.Time expectedRaws int }{ { - createdAt: time.Now(), eraseBefore: time.Now(), expectedRaws: 1, }, { - createdAt: time.Now(), eraseBefore: time.Now().Add(24 * time.Hour), expectedRaws: 0, }, @@ -46,13 +43,13 @@ func TestDeleteRawBefore(t *testing.T) { nodeData := make(map[storj.NodeID]float64) nodeData[id] = float64(1000) - err := planet.Satellites[0].DB.Accounting().SaveAtRestRaw(ctx, tt.createdAt, tt.createdAt, nodeData) + err := planet.Satellites[0].DB.StoragenodeAccounting().SaveTallies(ctx, time.Now(), nodeData) require.NoError(t, err) - err = planet.Satellites[0].DB.Accounting().DeleteRawBefore(ctx, tt.eraseBefore) + err = planet.Satellites[0].DB.StoragenodeAccounting().DeleteTalliesBefore(ctx, tt.eraseBefore) require.NoError(t, err) - raws, err := planet.Satellites[0].DB.Accounting().GetRaw(ctx) + raws, err := planet.Satellites[0].DB.StoragenodeAccounting().GetTallies(ctx) require.NoError(t, err) assert.Len(t, raws, tt.expectedRaws) }) @@ -98,7 +95,7 @@ func TestOnlyInline(t *testing.T) { require.NoError(t, err) assert.Len(t, actualNodeData, 0) - _, err = planet.Satellites[0].DB.Accounting().SaveBucketTallies(ctx, latestTally, actualBucketData) + _, err = planet.Satellites[0].DB.ProjectAccounting().SaveTallies(ctx, latestTally, actualBucketData) require.NoError(t, err) // Confirm the correct bucket storage tally was created diff --git a/pkg/overlay/cache.go b/pkg/overlay/cache.go index 
6e908601e..634b159e5 100644 --- a/pkg/overlay/cache.go +++ b/pkg/overlay/cache.go @@ -226,8 +226,7 @@ func (cache *Cache) FindStorageNodesWithPreferences(ctx context.Context, req Fin return nodes, nil } -// KnownUnreliableOrOffline filters a set of nodes to unhealth or offlines node, independent of new -// Note that KnownUnreliableOrOffline will not return node ids which are not in the database at all +// KnownUnreliableOrOffline filters a set of nodes to unhealthy or offline nodes, independent of newness. func (cache *Cache) KnownUnreliableOrOffline(ctx context.Context, nodeIds storj.NodeIDList) (badNodes storj.NodeIDList, err error) { defer mon.Task()(&ctx)(&err) criteria := &NodeCriteria{ diff --git a/satellite/metainfo/metainfo.go b/satellite/metainfo/metainfo.go index a2a236d1d..3f3a11697 100644 --- a/satellite/metainfo/metainfo.go +++ b/satellite/metainfo/metainfo.go @@ -44,28 +44,30 @@ type APIKeys interface { // Endpoint metainfo endpoint type Endpoint struct { - log *zap.Logger - metainfo *Service - orders *orders.Service - cache *overlay.Cache - apiKeys APIKeys - accountingDB accounting.DB - liveAccounting live.Service - maxAlphaUsage memory.Size + log *zap.Logger + metainfo *Service + orders *orders.Service + cache *overlay.Cache + apiKeys APIKeys + storagenodeAccountingDB accounting.StoragenodeAccounting + projectAccountingDB accounting.ProjectAccounting + liveAccounting live.Service + maxAlphaUsage memory.Size } // NewEndpoint creates new metainfo endpoint instance -func NewEndpoint(log *zap.Logger, metainfo *Service, orders *orders.Service, cache *overlay.Cache, apiKeys APIKeys, acctDB accounting.DB, liveAccounting live.Service, maxAlphaUsage memory.Size) *Endpoint { +func NewEndpoint(log *zap.Logger, metainfo *Service, orders *orders.Service, cache *overlay.Cache, apiKeys APIKeys, sdb accounting.StoragenodeAccounting, pdb accounting.ProjectAccounting, liveAccounting live.Service, maxAlphaUsage memory.Size) *Endpoint { // TODO do something with too many params return &Endpoint{ - log: log, - metainfo: metainfo, - orders: orders, - cache: cache, - apiKeys: apiKeys, - accountingDB: acctDB, - liveAccounting: liveAccounting, - maxAlphaUsage: maxAlphaUsage, + log: log, + metainfo: metainfo, + orders: orders, + cache: cache, + apiKeys: apiKeys, + storagenodeAccountingDB: sdb, + projectAccountingDB: pdb, + liveAccounting: liveAccounting, + maxAlphaUsage: maxAlphaUsage, } } @@ -150,6 +152,7 @@ func (endpoint *Endpoint) CreateSegment(ctx context.Context, req *pb.SegmentWrit inlineTotal, remoteTotal, err := endpoint.getProjectStorageTotals(ctx, keyInfo.ProjectID) if err != nil { endpoint.log.Error("retrieving project storage totals", zap.Error(err)) + } exceeded, resource := accounting.ExceedsAlphaUsage(0, inlineTotal, remoteTotal, endpoint.maxAlphaUsage) if exceeded { @@ -192,7 +195,7 @@ func (endpoint *Endpoint) CreateSegment(ctx context.Context, req *pb.SegmentWrit } func (endpoint *Endpoint) getProjectStorageTotals(ctx context.Context, projectID uuid.UUID) (int64, int64, error) { - lastCountInline, lastCountRemote, err := endpoint.accountingDB.ProjectStorageTotals(ctx, projectID) + lastCountInline, lastCountRemote, err := endpoint.projectAccountingDB.GetStorageTotals(ctx, projectID) if err != nil { return 0, 0, err } @@ -296,7 +299,7 @@ func (endpoint *Endpoint) DownloadSegment(ctx context.Context, req *pb.SegmentDo // Ref: https://storjlabs.atlassian.net/browse/V3-1274 bucketID := createBucketID(keyInfo.ProjectID, req.Bucket) from := time.Now().AddDate(0, 0,
-accounting.AverageDaysInMonth) // past 30 days - bandwidthTotal, err := endpoint.accountingDB.ProjectAllocatedBandwidthTotal(ctx, bucketID, from) + bandwidthTotal, err := endpoint.projectAccountingDB.GetAllocatedBandwidthTotal(ctx, bucketID, from) if err != nil { endpoint.log.Error("retrieving ProjectBandwidthTotal", zap.Error(err)) } diff --git a/satellite/peer.go b/satellite/peer.go index b5f0bd36b..6708781f6 100644 --- a/satellite/peer.go +++ b/satellite/peer.go @@ -73,8 +73,10 @@ type DB interface { CertDB() certdb.DB // OverlayCache returns database for caching overlay information OverlayCache() overlay.DB - // Accounting returns database for storing information about data use - Accounting() accounting.DB + // StoragenodeAccounting returns database for storing information about storagenode use + StoragenodeAccounting() accounting.StoragenodeAccounting + // ProjectAccounting returns database for storing information about project data use + ProjectAccounting() accounting.ProjectAccounting // RepairQueue returns queue for segments that need repairing RepairQueue() queue.RepairQueue // Irreparable returns database for failed repairs @@ -354,7 +356,8 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, config *Config, ve peer.Orders.Service, peer.Overlay.Service, peer.DB.Console().APIKeys(), - peer.DB.Accounting(), + peer.DB.StoragenodeAccounting(), + peer.DB.ProjectAccounting(), peer.LiveAccounting.Service, config.Rollup.MaxAlphaUsage, ) @@ -413,8 +416,8 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, config *Config, ve { // setup accounting log.Debug("Setting up accounting") - peer.Accounting.Tally = tally.New(peer.Log.Named("tally"), peer.DB.Accounting(), peer.LiveAccounting.Service, peer.Metainfo.Service, peer.Overlay.Service, 0, config.Tally.Interval) - peer.Accounting.Rollup = rollup.New(peer.Log.Named("rollup"), peer.DB.Accounting(), config.Rollup.Interval, config.Rollup.DeleteTallies) + peer.Accounting.Tally = tally.New(peer.Log.Named("tally"), peer.DB.StoragenodeAccounting(), peer.DB.ProjectAccounting(), peer.LiveAccounting.Service, peer.Metainfo.Service, peer.Overlay.Service, 0, config.Tally.Interval) + peer.Accounting.Rollup = rollup.New(peer.Log.Named("rollup"), peer.DB.StoragenodeAccounting(), config.Rollup.Interval, config.Rollup.DeleteTallies) } { // setup inspector diff --git a/satellite/satellitedb/accounting.go b/satellite/satellitedb/accounting.go deleted file mode 100644 index e985df470..000000000 --- a/satellite/satellitedb/accounting.go +++ /dev/null @@ -1,296 +0,0 @@ -// Copyright (C) 2019 Storj Labs, Inc. -// See LICENSE for copying information. - -package satellitedb - -import ( - "bytes" - "context" - "database/sql" - "time" - - "github.com/skyrings/skyring-common/tools/uuid" - "github.com/zeebo/errs" - - "storj.io/storj/pkg/accounting" - "storj.io/storj/pkg/pb" - "storj.io/storj/pkg/storj" - dbx "storj.io/storj/satellite/satellitedb/dbx" -) - -//database implements DB -type accountingDB struct { - db *dbx.DB -} - -// ProjectAllocatedBandwidthTotal returns the sum of GET bandwidth usage allocated for a projectID for a time frame -func (db *accountingDB) ProjectAllocatedBandwidthTotal(ctx context.Context, bucketID []byte, from time.Time) (int64, error) { - pathEl := bytes.Split(bucketID, []byte("/")) - _, projectID := pathEl[1], pathEl[0] - var sum *int64 - query := `SELECT SUM(allocated) FROM bucket_bandwidth_rollups WHERE project_id = ? AND action = ? 
AND interval_start > ?;` - err := db.db.QueryRow(db.db.Rebind(query), projectID, pb.PieceAction_GET, from).Scan(&sum) - if err == sql.ErrNoRows || sum == nil { - return 0, nil - } - - return *sum, err -} - -// ProjectStorageTotals returns the current inline and remote storage usage for a projectID -func (db *accountingDB) ProjectStorageTotals(ctx context.Context, projectID uuid.UUID) (int64, int64, error) { - var inlineSum, remoteSum sql.NullInt64 - var intervalStart time.Time - - // Sum all the inline and remote values for a project that all share the same interval_start. - // All records for a project that have the same interval start are part of the same tally run. - // This should represent the most recent calculation of a project's total at rest storage. - query := `SELECT interval_start, SUM(inline), SUM(remote) - FROM bucket_storage_tallies - WHERE project_id = ? - GROUP BY interval_start - ORDER BY interval_start DESC LIMIT 1;` - - err := db.db.QueryRow(db.db.Rebind(query), projectID[:]).Scan(&intervalStart, &inlineSum, &remoteSum) - if err != nil || !inlineSum.Valid || !remoteSum.Valid { - return 0, 0, nil - } - return inlineSum.Int64, remoteSum.Int64, err -} - -// CreateBucketStorageTally creates a record in the bucket_storage_tallies accounting table -func (db *accountingDB) CreateBucketStorageTally(ctx context.Context, tally accounting.BucketStorageTally) error { - _, err := db.db.Create_BucketStorageTally( - ctx, - dbx.BucketStorageTally_BucketName([]byte(tally.BucketName)), - dbx.BucketStorageTally_ProjectId(tally.ProjectID[:]), - dbx.BucketStorageTally_IntervalStart(tally.IntervalStart), - dbx.BucketStorageTally_Inline(uint64(tally.InlineBytes)), - dbx.BucketStorageTally_Remote(uint64(tally.RemoteBytes)), - dbx.BucketStorageTally_RemoteSegmentsCount(uint(tally.RemoteSegmentCount)), - dbx.BucketStorageTally_InlineSegmentsCount(uint(tally.InlineSegmentCount)), - dbx.BucketStorageTally_ObjectCount(uint(tally.ObjectCount)), - dbx.BucketStorageTally_MetadataSize(uint64(tally.MetadataSize)), - ) - if err != nil { - return err - } - return nil -} - -// LastTimestamp records the greatest last tallied time -func (db *accountingDB) LastTimestamp(ctx context.Context, timestampType string) (time.Time, error) { - lastTally := time.Time{} - err := db.db.WithTx(ctx, func(ctx context.Context, tx *dbx.Tx) error { - lt, err := tx.Find_AccountingTimestamps_Value_By_Name(ctx, dbx.AccountingTimestamps_Name(timestampType)) - if lt == nil { - update := dbx.AccountingTimestamps_Value(lastTally) - _, err = tx.Create_AccountingTimestamps(ctx, dbx.AccountingTimestamps_Name(timestampType), update) - return err - } - lastTally = lt.Value - return err - }) - return lastTally, err -} - -// SaveAtRestRaw records raw tallies of at rest data to the database -func (db *accountingDB) SaveAtRestRaw(ctx context.Context, latestTally time.Time, created time.Time, nodeData map[storj.NodeID]float64) error { - if len(nodeData) == 0 { - return Error.New("In SaveAtRestRaw with empty nodeData") - } - err := db.db.WithTx(ctx, func(ctx context.Context, tx *dbx.Tx) error { - for k, v := range nodeData { - nID := dbx.AccountingRaw_NodeId(k.Bytes()) - end := dbx.AccountingRaw_IntervalEndTime(latestTally) - total := dbx.AccountingRaw_DataTotal(v) - dataType := dbx.AccountingRaw_DataType(accounting.AtRest) - timestamp := dbx.AccountingRaw_CreatedAt(created) - _, err := tx.Create_AccountingRaw(ctx, nID, end, total, dataType, timestamp) - if err != nil { - return err - } - } - update := 
dbx.AccountingTimestamps_Update_Fields{Value: dbx.AccountingTimestamps_Value(latestTally)} - _, err := tx.Update_AccountingTimestamps_By_Name(ctx, dbx.AccountingTimestamps_Name(accounting.LastAtRestTally), update) - return err - }) - return Error.Wrap(err) -} - -// GetRaw retrieves all raw tallies -func (db *accountingDB) GetRaw(ctx context.Context) ([]*accounting.Raw, error) { - raws, err := db.db.All_AccountingRaw(ctx) - out := make([]*accounting.Raw, len(raws)) - for i, r := range raws { - nodeID, err := storj.NodeIDFromBytes(r.NodeId) - if err != nil { - return nil, Error.Wrap(err) - } - out[i] = &accounting.Raw{ - ID: r.Id, - NodeID: nodeID, - IntervalEndTime: r.IntervalEndTime, - DataTotal: r.DataTotal, - DataType: r.DataType, - CreatedAt: r.CreatedAt, - } - } - return out, Error.Wrap(err) -} - -// GetRawSince retrieves all raw tallies since latestRollup -func (db *accountingDB) GetRawSince(ctx context.Context, latestRollup time.Time) ([]*accounting.Raw, error) { - raws, err := db.db.All_AccountingRaw_By_IntervalEndTime_GreaterOrEqual(ctx, dbx.AccountingRaw_IntervalEndTime(latestRollup)) - out := make([]*accounting.Raw, len(raws)) - for i, r := range raws { - nodeID, err := storj.NodeIDFromBytes(r.NodeId) - if err != nil { - return nil, Error.Wrap(err) - } - out[i] = &accounting.Raw{ - ID: r.Id, - NodeID: nodeID, - IntervalEndTime: r.IntervalEndTime, - DataTotal: r.DataTotal, - DataType: r.DataType, - CreatedAt: r.CreatedAt, - } - } - return out, Error.Wrap(err) -} - -// GetStoragenodeBandwidthSince retrieves all storagenode_bandwidth_rollup entires since latestRollup -func (db *accountingDB) GetStoragenodeBandwidthSince(ctx context.Context, latestRollup time.Time) ([]*accounting.StoragenodeBandwidthRollup, error) { - rollups, err := db.db.All_StoragenodeBandwidthRollup_By_IntervalStart_GreaterOrEqual(ctx, dbx.StoragenodeBandwidthRollup_IntervalStart(latestRollup)) - out := make([]*accounting.StoragenodeBandwidthRollup, len(rollups)) - for i, r := range rollups { - nodeID, err := storj.NodeIDFromBytes(r.StoragenodeId) - if err != nil { - return nil, Error.Wrap(err) - } - out[i] = &accounting.StoragenodeBandwidthRollup{ - NodeID: nodeID, - IntervalStart: r.IntervalStart, - Action: r.Action, - Settled: r.Settled, - } - } - return out, Error.Wrap(err) -} - -// SaveRollup records raw tallies of at rest data to the database -func (db *accountingDB) SaveRollup(ctx context.Context, latestRollup time.Time, stats accounting.RollupStats) error { - if len(stats) == 0 { - return Error.New("In SaveRollup with empty nodeData") - } - err := db.db.WithTx(ctx, func(ctx context.Context, tx *dbx.Tx) error { - for _, arsByDate := range stats { - for _, ar := range arsByDate { - nID := dbx.AccountingRollup_NodeId(ar.NodeID.Bytes()) - start := dbx.AccountingRollup_StartTime(ar.StartTime) - put := dbx.AccountingRollup_PutTotal(ar.PutTotal) - get := dbx.AccountingRollup_GetTotal(ar.GetTotal) - audit := dbx.AccountingRollup_GetAuditTotal(ar.GetAuditTotal) - getRepair := dbx.AccountingRollup_GetRepairTotal(ar.GetRepairTotal) - putRepair := dbx.AccountingRollup_PutRepairTotal(ar.PutRepairTotal) - atRest := dbx.AccountingRollup_AtRestTotal(ar.AtRestTotal) - _, err := tx.Create_AccountingRollup(ctx, nID, start, put, get, audit, getRepair, putRepair, atRest) - if err != nil { - return err - } - } - } - update := dbx.AccountingTimestamps_Update_Fields{Value: dbx.AccountingTimestamps_Value(latestRollup)} - _, err := tx.Update_AccountingTimestamps_By_Name(ctx, dbx.AccountingTimestamps_Name(accounting.LastRollup), 
update) - return err - }) - return Error.Wrap(err) -} - -// SaveBucketTallies saves the latest bucket info -func (db *accountingDB) SaveBucketTallies(ctx context.Context, intervalStart time.Time, bucketTallies map[string]*accounting.BucketTally) ([]accounting.BucketTally, error) { - if len(bucketTallies) == 0 { - return nil, Error.New("In SaveBucketTallies with empty bucketTallies") - } - - var result []accounting.BucketTally - - for bucketID, info := range bucketTallies { - bucketIDComponents := storj.SplitPath(bucketID) - bucketName := dbx.BucketStorageTally_BucketName([]byte(bucketIDComponents[1])) - projectID := dbx.BucketStorageTally_ProjectId([]byte(bucketIDComponents[0])) - interval := dbx.BucketStorageTally_IntervalStart(intervalStart) - inlineBytes := dbx.BucketStorageTally_Inline(uint64(info.InlineBytes)) - remoteBytes := dbx.BucketStorageTally_Remote(uint64(info.RemoteBytes)) - rSegments := dbx.BucketStorageTally_RemoteSegmentsCount(uint(info.RemoteSegments)) - iSegments := dbx.BucketStorageTally_InlineSegmentsCount(uint(info.InlineSegments)) - objectCount := dbx.BucketStorageTally_ObjectCount(uint(info.Files)) - meta := dbx.BucketStorageTally_MetadataSize(uint64(info.MetadataSize)) - dbxTally, err := db.db.Create_BucketStorageTally(ctx, bucketName, projectID, interval, inlineBytes, remoteBytes, rSegments, iSegments, objectCount, meta) - if err != nil { - return nil, err - } - tally := accounting.BucketTally{ - BucketName: dbxTally.BucketName, - ProjectID: dbxTally.ProjectId, - InlineSegments: int64(dbxTally.InlineSegmentsCount), - RemoteSegments: int64(dbxTally.RemoteSegmentsCount), - Files: int64(dbxTally.ObjectCount), - InlineBytes: int64(dbxTally.Inline), - RemoteBytes: int64(dbxTally.Remote), - MetadataSize: int64(dbxTally.MetadataSize), - } - result = append(result, tally) - } - return result, nil -} - -// QueryPaymentInfo queries Overlay, Accounting Rollup on nodeID -func (db *accountingDB) QueryPaymentInfo(ctx context.Context, start time.Time, end time.Time) ([]*accounting.CSVRow, error) { - var sqlStmt = `SELECT n.id, n.created_at, n.audit_success_ratio, r.at_rest_total, r.get_repair_total, - r.put_repair_total, r.get_audit_total, r.put_total, r.get_total, n.wallet - FROM ( - SELECT node_id, SUM(at_rest_total) AS at_rest_total, SUM(get_repair_total) AS get_repair_total, - SUM(put_repair_total) AS put_repair_total, SUM(get_audit_total) AS get_audit_total, - SUM(put_total) AS put_total, SUM(get_total) AS get_total - FROM accounting_rollups - WHERE start_time >= ? AND start_time < ? 
- GROUP BY node_id - ) r - LEFT JOIN nodes n ON n.id = r.node_id - ORDER BY n.id` - rows, err := db.db.DB.QueryContext(ctx, db.db.Rebind(sqlStmt), start.UTC(), end.UTC()) - if err != nil { - return nil, Error.Wrap(err) - } - defer func() { err = errs.Combine(err, rows.Close()) }() - csv := make([]*accounting.CSVRow, 0, 0) - for rows.Next() { - var nodeID []byte - r := &accounting.CSVRow{} - var wallet sql.NullString - err := rows.Scan(&nodeID, &r.NodeCreationDate, &r.AuditSuccessRatio, &r.AtRestTotal, &r.GetRepairTotal, - &r.PutRepairTotal, &r.GetAuditTotal, &r.PutTotal, &r.GetTotal, &wallet) - if err != nil { - return csv, Error.Wrap(err) - } - if wallet.Valid { - r.Wallet = wallet.String - } - id, err := storj.NodeIDFromBytes(nodeID) - if err != nil { - return csv, Error.Wrap(err) - } - r.NodeID = id - csv = append(csv, r) - } - return csv, nil -} - -// DeleteRawBefore deletes all raw tallies prior to some time -func (db *accountingDB) DeleteRawBefore(ctx context.Context, latestRollup time.Time) error { - var deleteRawSQL = `DELETE FROM accounting_raws WHERE interval_end_time < ?` - _, err := db.db.DB.ExecContext(ctx, db.db.Rebind(deleteRawSQL), latestRollup) - return err -} diff --git a/satellite/satellitedb/database.go b/satellite/satellitedb/database.go index 20396733b..f344a8b86 100644 --- a/satellite/satellitedb/database.go +++ b/satellite/satellitedb/database.go @@ -114,9 +114,14 @@ func (db *DB) RepairQueue() queue.RepairQueue { return &repairQueue{db: db.db} } -// Accounting returns database for tracking bandwidth agreements over time -func (db *DB) Accounting() accounting.DB { - return &accountingDB{db: db.db} +// StoragenodeAccounting returns database for tracking storagenode usage +func (db *DB) StoragenodeAccounting() accounting.StoragenodeAccounting { + return &StoragenodeAccounting{db: db.db} +} + +// ProjectAccounting returns database for tracking project data use +func (db *DB) ProjectAccounting() accounting.ProjectAccounting { + return &ProjectAccounting{db: db.db} } // Irreparable returns database for storing segments that failed repair diff --git a/satellite/satellitedb/dbx/satellitedb.dbx b/satellite/satellitedb/dbx/satellitedb.dbx index 73cc3de0e..d9ab6c529 100644 --- a/satellite/satellitedb/dbx/satellitedb.dbx +++ b/satellite/satellitedb/dbx/satellitedb.dbx @@ -86,34 +86,6 @@ read all ( where accounting_rollup.start_time >= ? ) -model accounting_raw ( - key id - - field id serial64 - field node_id blob - field interval_end_time timestamp - field data_total float64 - field data_type int - field created_at timestamp -) - -create accounting_raw ( ) -delete accounting_raw ( where accounting_raw.id = ? ) - -read one ( - select accounting_raw - where accounting_raw.id = ? -) - -read all ( - select accounting_raw -) - -read all ( - select accounting_raw - where accounting_raw.interval_end_time >= ? -) - //--- statdb ---// model node ( @@ -482,12 +454,26 @@ read all ( ) model storagenode_storage_tally ( - key storagenode_id interval_start + key id + + field id serial64 + field node_id blob + field interval_end_time timestamp + field data_total float64 +) - field storagenode_id blob - field interval_start utimestamp - - field total uint64 +create storagenode_storage_tally () +delete storagenode_storage_tally ( where storagenode_storage_tally.id = ? ) +read one ( + select storagenode_storage_tally + where storagenode_storage_tally.id = ? 
+) +read all ( + select storagenode_storage_tally +) +read all ( + select storagenode_storage_tally + where storagenode_storage_tally.interval_end_time >= ? ) //--- certRecord ---// diff --git a/satellite/satellitedb/dbx/satellitedb.dbx.go b/satellite/satellitedb/dbx/satellitedb.dbx.go index eeae32ac5..7eb2aaaa2 100644 --- a/satellite/satellitedb/dbx/satellitedb.dbx.go +++ b/satellite/satellitedb/dbx/satellitedb.dbx.go @@ -272,16 +272,7 @@ func newpostgres(db *DB) *postgresDB { } func (obj *postgresDB) Schema() string { - return `CREATE TABLE accounting_raws ( - id bigserial NOT NULL, - node_id bytea NOT NULL, - interval_end_time timestamp with time zone NOT NULL, - data_total double precision NOT NULL, - data_type integer NOT NULL, - created_at timestamp with time zone NOT NULL, - PRIMARY KEY ( id ) -); -CREATE TABLE accounting_rollups ( + return `CREATE TABLE accounting_rollups ( id bigserial NOT NULL, node_id bytea NOT NULL, start_time timestamp with time zone NOT NULL, @@ -426,10 +417,11 @@ CREATE TABLE storagenode_bandwidth_rollups ( PRIMARY KEY ( storagenode_id, interval_start, action ) ); CREATE TABLE storagenode_storage_tallies ( - storagenode_id bytea NOT NULL, - interval_start timestamp NOT NULL, - total bigint NOT NULL, - PRIMARY KEY ( storagenode_id, interval_start ) + id bigserial NOT NULL, + node_id bytea NOT NULL, + interval_end_time timestamp with time zone NOT NULL, + data_total double precision NOT NULL, + PRIMARY KEY ( id ) ); CREATE TABLE users ( id bytea NOT NULL, @@ -530,16 +522,7 @@ func newsqlite3(db *DB) *sqlite3DB { } func (obj *sqlite3DB) Schema() string { - return `CREATE TABLE accounting_raws ( - id INTEGER NOT NULL, - node_id BLOB NOT NULL, - interval_end_time TIMESTAMP NOT NULL, - data_total REAL NOT NULL, - data_type INTEGER NOT NULL, - created_at TIMESTAMP NOT NULL, - PRIMARY KEY ( id ) -); -CREATE TABLE accounting_rollups ( + return `CREATE TABLE accounting_rollups ( id INTEGER NOT NULL, node_id BLOB NOT NULL, start_time TIMESTAMP NOT NULL, @@ -684,10 +667,11 @@ CREATE TABLE storagenode_bandwidth_rollups ( PRIMARY KEY ( storagenode_id, interval_start, action ) ); CREATE TABLE storagenode_storage_tallies ( - storagenode_id BLOB NOT NULL, - interval_start TIMESTAMP NOT NULL, - total INTEGER NOT NULL, - PRIMARY KEY ( storagenode_id, interval_start ) + id INTEGER NOT NULL, + node_id BLOB NOT NULL, + interval_end_time TIMESTAMP NOT NULL, + data_total REAL NOT NULL, + PRIMARY KEY ( id ) ); CREATE TABLE users ( id BLOB NOT NULL, @@ -787,134 +771,6 @@ nextval: fmt.Fprint(f, "]") } -type AccountingRaw struct { - Id int64 - NodeId []byte - IntervalEndTime time.Time - DataTotal float64 - DataType int - CreatedAt time.Time -} - -func (AccountingRaw) _Table() string { return "accounting_raws" } - -type AccountingRaw_Update_Fields struct { -} - -type AccountingRaw_Id_Field struct { - _set bool - _null bool - _value int64 -} - -func AccountingRaw_Id(v int64) AccountingRaw_Id_Field { - return AccountingRaw_Id_Field{_set: true, _value: v} -} - -func (f AccountingRaw_Id_Field) value() interface{} { - if !f._set || f._null { - return nil - } - return f._value -} - -func (AccountingRaw_Id_Field) _Column() string { return "id" } - -type AccountingRaw_NodeId_Field struct { - _set bool - _null bool - _value []byte -} - -func AccountingRaw_NodeId(v []byte) AccountingRaw_NodeId_Field { - return AccountingRaw_NodeId_Field{_set: true, _value: v} -} - -func (f AccountingRaw_NodeId_Field) value() interface{} { - if !f._set || f._null { - return nil - } - return f._value -} - -func 
(AccountingRaw_NodeId_Field) _Column() string { return "node_id" } - -type AccountingRaw_IntervalEndTime_Field struct { - _set bool - _null bool - _value time.Time -} - -func AccountingRaw_IntervalEndTime(v time.Time) AccountingRaw_IntervalEndTime_Field { - return AccountingRaw_IntervalEndTime_Field{_set: true, _value: v} -} - -func (f AccountingRaw_IntervalEndTime_Field) value() interface{} { - if !f._set || f._null { - return nil - } - return f._value -} - -func (AccountingRaw_IntervalEndTime_Field) _Column() string { return "interval_end_time" } - -type AccountingRaw_DataTotal_Field struct { - _set bool - _null bool - _value float64 -} - -func AccountingRaw_DataTotal(v float64) AccountingRaw_DataTotal_Field { - return AccountingRaw_DataTotal_Field{_set: true, _value: v} -} - -func (f AccountingRaw_DataTotal_Field) value() interface{} { - if !f._set || f._null { - return nil - } - return f._value -} - -func (AccountingRaw_DataTotal_Field) _Column() string { return "data_total" } - -type AccountingRaw_DataType_Field struct { - _set bool - _null bool - _value int -} - -func AccountingRaw_DataType(v int) AccountingRaw_DataType_Field { - return AccountingRaw_DataType_Field{_set: true, _value: v} -} - -func (f AccountingRaw_DataType_Field) value() interface{} { - if !f._set || f._null { - return nil - } - return f._value -} - -func (AccountingRaw_DataType_Field) _Column() string { return "data_type" } - -type AccountingRaw_CreatedAt_Field struct { - _set bool - _null bool - _value time.Time -} - -func AccountingRaw_CreatedAt(v time.Time) AccountingRaw_CreatedAt_Field { - return AccountingRaw_CreatedAt_Field{_set: true, _value: v} -} - -func (f AccountingRaw_CreatedAt_Field) value() interface{} { - if !f._set || f._null { - return nil - } - return f._value -} - -func (AccountingRaw_CreatedAt_Field) _Column() string { return "created_at" } - type AccountingRollup struct { Id int64 NodeId []byte @@ -3122,9 +2978,10 @@ func (f StoragenodeBandwidthRollup_Settled_Field) value() interface{} { func (StoragenodeBandwidthRollup_Settled_Field) _Column() string { return "settled" } type StoragenodeStorageTally struct { - StoragenodeId []byte - IntervalStart time.Time - Total uint64 + Id int64 + NodeId []byte + IntervalEndTime time.Time + DataTotal float64 } func (StoragenodeStorageTally) _Table() string { return "storagenode_storage_tallies" } @@ -3132,63 +2989,81 @@ func (StoragenodeStorageTally) _Table() string { return "storagenode_storage_tal type StoragenodeStorageTally_Update_Fields struct { } -type StoragenodeStorageTally_StoragenodeId_Field struct { +type StoragenodeStorageTally_Id_Field struct { + _set bool + _null bool + _value int64 +} + +func StoragenodeStorageTally_Id(v int64) StoragenodeStorageTally_Id_Field { + return StoragenodeStorageTally_Id_Field{_set: true, _value: v} +} + +func (f StoragenodeStorageTally_Id_Field) value() interface{} { + if !f._set || f._null { + return nil + } + return f._value +} + +func (StoragenodeStorageTally_Id_Field) _Column() string { return "id" } + +type StoragenodeStorageTally_NodeId_Field struct { _set bool _null bool _value []byte } -func StoragenodeStorageTally_StoragenodeId(v []byte) StoragenodeStorageTally_StoragenodeId_Field { - return StoragenodeStorageTally_StoragenodeId_Field{_set: true, _value: v} +func StoragenodeStorageTally_NodeId(v []byte) StoragenodeStorageTally_NodeId_Field { + return StoragenodeStorageTally_NodeId_Field{_set: true, _value: v} } -func (f StoragenodeStorageTally_StoragenodeId_Field) value() interface{} { +func (f 
StoragenodeStorageTally_NodeId_Field) value() interface{} { if !f._set || f._null { return nil } return f._value } -func (StoragenodeStorageTally_StoragenodeId_Field) _Column() string { return "storagenode_id" } +func (StoragenodeStorageTally_NodeId_Field) _Column() string { return "node_id" } -type StoragenodeStorageTally_IntervalStart_Field struct { +type StoragenodeStorageTally_IntervalEndTime_Field struct { _set bool _null bool _value time.Time } -func StoragenodeStorageTally_IntervalStart(v time.Time) StoragenodeStorageTally_IntervalStart_Field { - v = toUTC(v) - return StoragenodeStorageTally_IntervalStart_Field{_set: true, _value: v} +func StoragenodeStorageTally_IntervalEndTime(v time.Time) StoragenodeStorageTally_IntervalEndTime_Field { + return StoragenodeStorageTally_IntervalEndTime_Field{_set: true, _value: v} } -func (f StoragenodeStorageTally_IntervalStart_Field) value() interface{} { +func (f StoragenodeStorageTally_IntervalEndTime_Field) value() interface{} { if !f._set || f._null { return nil } return f._value } -func (StoragenodeStorageTally_IntervalStart_Field) _Column() string { return "interval_start" } +func (StoragenodeStorageTally_IntervalEndTime_Field) _Column() string { return "interval_end_time" } -type StoragenodeStorageTally_Total_Field struct { +type StoragenodeStorageTally_DataTotal_Field struct { _set bool _null bool - _value uint64 + _value float64 } -func StoragenodeStorageTally_Total(v uint64) StoragenodeStorageTally_Total_Field { - return StoragenodeStorageTally_Total_Field{_set: true, _value: v} +func StoragenodeStorageTally_DataTotal(v float64) StoragenodeStorageTally_DataTotal_Field { + return StoragenodeStorageTally_DataTotal_Field{_set: true, _value: v} } -func (f StoragenodeStorageTally_Total_Field) value() interface{} { +func (f StoragenodeStorageTally_DataTotal_Field) value() interface{} { if !f._set || f._null { return nil } return f._value } -func (StoragenodeStorageTally_Total_Field) _Column() string { return "total" } +func (StoragenodeStorageTally_DataTotal_Field) _Column() string { return "data_total" } type User struct { Id []byte @@ -3888,33 +3763,6 @@ func (obj *postgresImpl) Create_AccountingRollup(ctx context.Context, } -func (obj *postgresImpl) Create_AccountingRaw(ctx context.Context, - accounting_raw_node_id AccountingRaw_NodeId_Field, - accounting_raw_interval_end_time AccountingRaw_IntervalEndTime_Field, - accounting_raw_data_total AccountingRaw_DataTotal_Field, - accounting_raw_data_type AccountingRaw_DataType_Field, - accounting_raw_created_at AccountingRaw_CreatedAt_Field) ( - accounting_raw *AccountingRaw, err error) { - __node_id_val := accounting_raw_node_id.value() - __interval_end_time_val := accounting_raw_interval_end_time.value() - __data_total_val := accounting_raw_data_total.value() - __data_type_val := accounting_raw_data_type.value() - __created_at_val := accounting_raw_created_at.value() - - var __embed_stmt = __sqlbundle_Literal("INSERT INTO accounting_raws ( node_id, interval_end_time, data_total, data_type, created_at ) VALUES ( ?, ?, ?, ?, ? 
) RETURNING accounting_raws.id, accounting_raws.node_id, accounting_raws.interval_end_time, accounting_raws.data_total, accounting_raws.data_type, accounting_raws.created_at") - - var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt) - obj.logStmt(__stmt, __node_id_val, __interval_end_time_val, __data_total_val, __data_type_val, __created_at_val) - - accounting_raw = &AccountingRaw{} - err = obj.driver.QueryRow(__stmt, __node_id_val, __interval_end_time_val, __data_total_val, __data_type_val, __created_at_val).Scan(&accounting_raw.Id, &accounting_raw.NodeId, &accounting_raw.IntervalEndTime, &accounting_raw.DataTotal, &accounting_raw.DataType, &accounting_raw.CreatedAt) - if err != nil { - return nil, obj.makeErr(err) - } - return accounting_raw, nil - -} - func (obj *postgresImpl) Create_Node(ctx context.Context, node_id Node_Id_Field, node_address Node_Address_Field, @@ -4211,6 +4059,29 @@ func (obj *postgresImpl) Create_BucketStorageTally(ctx context.Context, } +func (obj *postgresImpl) Create_StoragenodeStorageTally(ctx context.Context, + storagenode_storage_tally_node_id StoragenodeStorageTally_NodeId_Field, + storagenode_storage_tally_interval_end_time StoragenodeStorageTally_IntervalEndTime_Field, + storagenode_storage_tally_data_total StoragenodeStorageTally_DataTotal_Field) ( + storagenode_storage_tally *StoragenodeStorageTally, err error) { + __node_id_val := storagenode_storage_tally_node_id.value() + __interval_end_time_val := storagenode_storage_tally_interval_end_time.value() + __data_total_val := storagenode_storage_tally_data_total.value() + + var __embed_stmt = __sqlbundle_Literal("INSERT INTO storagenode_storage_tallies ( node_id, interval_end_time, data_total ) VALUES ( ?, ?, ? ) RETURNING storagenode_storage_tallies.id, storagenode_storage_tallies.node_id, storagenode_storage_tallies.interval_end_time, storagenode_storage_tallies.data_total") + + var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt) + obj.logStmt(__stmt, __node_id_val, __interval_end_time_val, __data_total_val) + + storagenode_storage_tally = &StoragenodeStorageTally{} + err = obj.driver.QueryRow(__stmt, __node_id_val, __interval_end_time_val, __data_total_val).Scan(&storagenode_storage_tally.Id, &storagenode_storage_tally.NodeId, &storagenode_storage_tally.IntervalEndTime, &storagenode_storage_tally.DataTotal) + if err != nil { + return nil, obj.makeErr(err) + } + return storagenode_storage_tally, nil + +} + func (obj *postgresImpl) Create_CertRecord(ctx context.Context, certRecord_publickey CertRecord_Publickey_Field, certRecord_id CertRecord_Id_Field) ( @@ -4395,92 +4266,6 @@ func (obj *postgresImpl) All_AccountingRollup_By_StartTime_GreaterOrEqual(ctx co } -func (obj *postgresImpl) Get_AccountingRaw_By_Id(ctx context.Context, - accounting_raw_id AccountingRaw_Id_Field) ( - accounting_raw *AccountingRaw, err error) { - - var __embed_stmt = __sqlbundle_Literal("SELECT accounting_raws.id, accounting_raws.node_id, accounting_raws.interval_end_time, accounting_raws.data_total, accounting_raws.data_type, accounting_raws.created_at FROM accounting_raws WHERE accounting_raws.id = ?") - - var __values []interface{} - __values = append(__values, accounting_raw_id.value()) - - var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt) - obj.logStmt(__stmt, __values...) 
- - accounting_raw = &AccountingRaw{} - err = obj.driver.QueryRow(__stmt, __values...).Scan(&accounting_raw.Id, &accounting_raw.NodeId, &accounting_raw.IntervalEndTime, &accounting_raw.DataTotal, &accounting_raw.DataType, &accounting_raw.CreatedAt) - if err != nil { - return nil, obj.makeErr(err) - } - return accounting_raw, nil - -} - -func (obj *postgresImpl) All_AccountingRaw(ctx context.Context) ( - rows []*AccountingRaw, err error) { - - var __embed_stmt = __sqlbundle_Literal("SELECT accounting_raws.id, accounting_raws.node_id, accounting_raws.interval_end_time, accounting_raws.data_total, accounting_raws.data_type, accounting_raws.created_at FROM accounting_raws") - - var __values []interface{} - __values = append(__values) - - var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt) - obj.logStmt(__stmt, __values...) - - __rows, err := obj.driver.Query(__stmt, __values...) - if err != nil { - return nil, obj.makeErr(err) - } - defer __rows.Close() - - for __rows.Next() { - accounting_raw := &AccountingRaw{} - err = __rows.Scan(&accounting_raw.Id, &accounting_raw.NodeId, &accounting_raw.IntervalEndTime, &accounting_raw.DataTotal, &accounting_raw.DataType, &accounting_raw.CreatedAt) - if err != nil { - return nil, obj.makeErr(err) - } - rows = append(rows, accounting_raw) - } - if err := __rows.Err(); err != nil { - return nil, obj.makeErr(err) - } - return rows, nil - -} - -func (obj *postgresImpl) All_AccountingRaw_By_IntervalEndTime_GreaterOrEqual(ctx context.Context, - accounting_raw_interval_end_time_greater_or_equal AccountingRaw_IntervalEndTime_Field) ( - rows []*AccountingRaw, err error) { - - var __embed_stmt = __sqlbundle_Literal("SELECT accounting_raws.id, accounting_raws.node_id, accounting_raws.interval_end_time, accounting_raws.data_total, accounting_raws.data_type, accounting_raws.created_at FROM accounting_raws WHERE accounting_raws.interval_end_time >= ?") - - var __values []interface{} - __values = append(__values, accounting_raw_interval_end_time_greater_or_equal.value()) - - var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt) - obj.logStmt(__stmt, __values...) - - __rows, err := obj.driver.Query(__stmt, __values...) 
- if err != nil { - return nil, obj.makeErr(err) - } - defer __rows.Close() - - for __rows.Next() { - accounting_raw := &AccountingRaw{} - err = __rows.Scan(&accounting_raw.Id, &accounting_raw.NodeId, &accounting_raw.IntervalEndTime, &accounting_raw.DataTotal, &accounting_raw.DataType, &accounting_raw.CreatedAt) - if err != nil { - return nil, obj.makeErr(err) - } - rows = append(rows, accounting_raw) - } - if err := __rows.Err(); err != nil { - return nil, obj.makeErr(err) - } - return rows, nil - -} - func (obj *postgresImpl) Get_Node_By_Id(ctx context.Context, node_id Node_Id_Field) ( node *Node, err error) { @@ -5161,6 +4946,92 @@ func (obj *postgresImpl) All_StoragenodeBandwidthRollup_By_IntervalStart_Greater } +func (obj *postgresImpl) Get_StoragenodeStorageTally_By_Id(ctx context.Context, + storagenode_storage_tally_id StoragenodeStorageTally_Id_Field) ( + storagenode_storage_tally *StoragenodeStorageTally, err error) { + + var __embed_stmt = __sqlbundle_Literal("SELECT storagenode_storage_tallies.id, storagenode_storage_tallies.node_id, storagenode_storage_tallies.interval_end_time, storagenode_storage_tallies.data_total FROM storagenode_storage_tallies WHERE storagenode_storage_tallies.id = ?") + + var __values []interface{} + __values = append(__values, storagenode_storage_tally_id.value()) + + var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt) + obj.logStmt(__stmt, __values...) + + storagenode_storage_tally = &StoragenodeStorageTally{} + err = obj.driver.QueryRow(__stmt, __values...).Scan(&storagenode_storage_tally.Id, &storagenode_storage_tally.NodeId, &storagenode_storage_tally.IntervalEndTime, &storagenode_storage_tally.DataTotal) + if err != nil { + return nil, obj.makeErr(err) + } + return storagenode_storage_tally, nil + +} + +func (obj *postgresImpl) All_StoragenodeStorageTally(ctx context.Context) ( + rows []*StoragenodeStorageTally, err error) { + + var __embed_stmt = __sqlbundle_Literal("SELECT storagenode_storage_tallies.id, storagenode_storage_tallies.node_id, storagenode_storage_tallies.interval_end_time, storagenode_storage_tallies.data_total FROM storagenode_storage_tallies") + + var __values []interface{} + __values = append(__values) + + var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt) + obj.logStmt(__stmt, __values...) + + __rows, err := obj.driver.Query(__stmt, __values...) 
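+	// Iteration errors are handled below: the generated code defers __rows.Close,
+	// scans each row into a StoragenodeStorageTally, and checks __rows.Err()
+	// after the loop so driver-side failures are surfaced rather than dropped.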
+ if err != nil { + return nil, obj.makeErr(err) + } + defer __rows.Close() + + for __rows.Next() { + storagenode_storage_tally := &StoragenodeStorageTally{} + err = __rows.Scan(&storagenode_storage_tally.Id, &storagenode_storage_tally.NodeId, &storagenode_storage_tally.IntervalEndTime, &storagenode_storage_tally.DataTotal) + if err != nil { + return nil, obj.makeErr(err) + } + rows = append(rows, storagenode_storage_tally) + } + if err := __rows.Err(); err != nil { + return nil, obj.makeErr(err) + } + return rows, nil + +} + +func (obj *postgresImpl) All_StoragenodeStorageTally_By_IntervalEndTime_GreaterOrEqual(ctx context.Context, + storagenode_storage_tally_interval_end_time_greater_or_equal StoragenodeStorageTally_IntervalEndTime_Field) ( + rows []*StoragenodeStorageTally, err error) { + + var __embed_stmt = __sqlbundle_Literal("SELECT storagenode_storage_tallies.id, storagenode_storage_tallies.node_id, storagenode_storage_tallies.interval_end_time, storagenode_storage_tallies.data_total FROM storagenode_storage_tallies WHERE storagenode_storage_tallies.interval_end_time >= ?") + + var __values []interface{} + __values = append(__values, storagenode_storage_tally_interval_end_time_greater_or_equal.value()) + + var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt) + obj.logStmt(__stmt, __values...) + + __rows, err := obj.driver.Query(__stmt, __values...) + if err != nil { + return nil, obj.makeErr(err) + } + defer __rows.Close() + + for __rows.Next() { + storagenode_storage_tally := &StoragenodeStorageTally{} + err = __rows.Scan(&storagenode_storage_tally.Id, &storagenode_storage_tally.NodeId, &storagenode_storage_tally.IntervalEndTime, &storagenode_storage_tally.DataTotal) + if err != nil { + return nil, obj.makeErr(err) + } + rows = append(rows, storagenode_storage_tally) + } + if err := __rows.Err(); err != nil { + return nil, obj.makeErr(err) + } + return rows, nil + +} + func (obj *postgresImpl) Get_CertRecord_By_Id(ctx context.Context, certRecord_id CertRecord_Id_Field) ( certRecord *CertRecord, err error) { @@ -5740,32 +5611,6 @@ func (obj *postgresImpl) Delete_AccountingRollup_By_Id(ctx context.Context, } -func (obj *postgresImpl) Delete_AccountingRaw_By_Id(ctx context.Context, - accounting_raw_id AccountingRaw_Id_Field) ( - deleted bool, err error) { - - var __embed_stmt = __sqlbundle_Literal("DELETE FROM accounting_raws WHERE accounting_raws.id = ?") - - var __values []interface{} - __values = append(__values, accounting_raw_id.value()) - - var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt) - obj.logStmt(__stmt, __values...) - - __res, err := obj.driver.Exec(__stmt, __values...) 
- if err != nil { - return false, obj.makeErr(err) - } - - __count, err := __res.RowsAffected() - if err != nil { - return false, obj.makeErr(err) - } - - return __count > 0, nil - -} - func (obj *postgresImpl) Delete_Node_By_Id(ctx context.Context, node_id Node_Id_Field) ( deleted bool, err error) { @@ -5949,6 +5794,32 @@ func (obj *postgresImpl) Delete_SerialNumber_By_ExpiresAt_LessOrEqual(ctx contex } +func (obj *postgresImpl) Delete_StoragenodeStorageTally_By_Id(ctx context.Context, + storagenode_storage_tally_id StoragenodeStorageTally_Id_Field) ( + deleted bool, err error) { + + var __embed_stmt = __sqlbundle_Literal("DELETE FROM storagenode_storage_tallies WHERE storagenode_storage_tallies.id = ?") + + var __values []interface{} + __values = append(__values, storagenode_storage_tally_id.value()) + + var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt) + obj.logStmt(__stmt, __values...) + + __res, err := obj.driver.Exec(__stmt, __values...) + if err != nil { + return false, obj.makeErr(err) + } + + __count, err := __res.RowsAffected() + if err != nil { + return false, obj.makeErr(err) + } + + return __count > 0, nil + +} + func (obj *postgresImpl) Delete_CertRecord_By_Id(ctx context.Context, certRecord_id CertRecord_Id_Field) ( deleted bool, err error) { @@ -6173,16 +6044,6 @@ func (obj *postgresImpl) deleteAll(ctx context.Context) (count int64, err error) return 0, obj.makeErr(err) } - __count, err = __res.RowsAffected() - if err != nil { - return 0, obj.makeErr(err) - } - count += __count - __res, err = obj.driver.Exec("DELETE FROM accounting_raws;") - if err != nil { - return 0, obj.makeErr(err) - } - __count, err = __res.RowsAffected() if err != nil { return 0, obj.makeErr(err) @@ -6283,36 +6144,6 @@ func (obj *sqlite3Impl) Create_AccountingRollup(ctx context.Context, } -func (obj *sqlite3Impl) Create_AccountingRaw(ctx context.Context, - accounting_raw_node_id AccountingRaw_NodeId_Field, - accounting_raw_interval_end_time AccountingRaw_IntervalEndTime_Field, - accounting_raw_data_total AccountingRaw_DataTotal_Field, - accounting_raw_data_type AccountingRaw_DataType_Field, - accounting_raw_created_at AccountingRaw_CreatedAt_Field) ( - accounting_raw *AccountingRaw, err error) { - __node_id_val := accounting_raw_node_id.value() - __interval_end_time_val := accounting_raw_interval_end_time.value() - __data_total_val := accounting_raw_data_total.value() - __data_type_val := accounting_raw_data_type.value() - __created_at_val := accounting_raw_created_at.value() - - var __embed_stmt = __sqlbundle_Literal("INSERT INTO accounting_raws ( node_id, interval_end_time, data_total, data_type, created_at ) VALUES ( ?, ?, ?, ?, ? 
)") - - var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt) - obj.logStmt(__stmt, __node_id_val, __interval_end_time_val, __data_total_val, __data_type_val, __created_at_val) - - __res, err := obj.driver.Exec(__stmt, __node_id_val, __interval_end_time_val, __data_total_val, __data_type_val, __created_at_val) - if err != nil { - return nil, obj.makeErr(err) - } - __pk, err := __res.LastInsertId() - if err != nil { - return nil, obj.makeErr(err) - } - return obj.getLastAccountingRaw(ctx, __pk) - -} - func (obj *sqlite3Impl) Create_Node(ctx context.Context, node_id Node_Id_Field, node_address Node_Address_Field, @@ -6636,6 +6467,32 @@ func (obj *sqlite3Impl) Create_BucketStorageTally(ctx context.Context, } +func (obj *sqlite3Impl) Create_StoragenodeStorageTally(ctx context.Context, + storagenode_storage_tally_node_id StoragenodeStorageTally_NodeId_Field, + storagenode_storage_tally_interval_end_time StoragenodeStorageTally_IntervalEndTime_Field, + storagenode_storage_tally_data_total StoragenodeStorageTally_DataTotal_Field) ( + storagenode_storage_tally *StoragenodeStorageTally, err error) { + __node_id_val := storagenode_storage_tally_node_id.value() + __interval_end_time_val := storagenode_storage_tally_interval_end_time.value() + __data_total_val := storagenode_storage_tally_data_total.value() + + var __embed_stmt = __sqlbundle_Literal("INSERT INTO storagenode_storage_tallies ( node_id, interval_end_time, data_total ) VALUES ( ?, ?, ? )") + + var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt) + obj.logStmt(__stmt, __node_id_val, __interval_end_time_val, __data_total_val) + + __res, err := obj.driver.Exec(__stmt, __node_id_val, __interval_end_time_val, __data_total_val) + if err != nil { + return nil, obj.makeErr(err) + } + __pk, err := __res.LastInsertId() + if err != nil { + return nil, obj.makeErr(err) + } + return obj.getLastStoragenodeStorageTally(ctx, __pk) + +} + func (obj *sqlite3Impl) Create_CertRecord(ctx context.Context, certRecord_publickey CertRecord_Publickey_Field, certRecord_id CertRecord_Id_Field) ( @@ -6826,92 +6683,6 @@ func (obj *sqlite3Impl) All_AccountingRollup_By_StartTime_GreaterOrEqual(ctx con } -func (obj *sqlite3Impl) Get_AccountingRaw_By_Id(ctx context.Context, - accounting_raw_id AccountingRaw_Id_Field) ( - accounting_raw *AccountingRaw, err error) { - - var __embed_stmt = __sqlbundle_Literal("SELECT accounting_raws.id, accounting_raws.node_id, accounting_raws.interval_end_time, accounting_raws.data_total, accounting_raws.data_type, accounting_raws.created_at FROM accounting_raws WHERE accounting_raws.id = ?") - - var __values []interface{} - __values = append(__values, accounting_raw_id.value()) - - var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt) - obj.logStmt(__stmt, __values...) 
- - accounting_raw = &AccountingRaw{} - err = obj.driver.QueryRow(__stmt, __values...).Scan(&accounting_raw.Id, &accounting_raw.NodeId, &accounting_raw.IntervalEndTime, &accounting_raw.DataTotal, &accounting_raw.DataType, &accounting_raw.CreatedAt) - if err != nil { - return nil, obj.makeErr(err) - } - return accounting_raw, nil - -} - -func (obj *sqlite3Impl) All_AccountingRaw(ctx context.Context) ( - rows []*AccountingRaw, err error) { - - var __embed_stmt = __sqlbundle_Literal("SELECT accounting_raws.id, accounting_raws.node_id, accounting_raws.interval_end_time, accounting_raws.data_total, accounting_raws.data_type, accounting_raws.created_at FROM accounting_raws") - - var __values []interface{} - __values = append(__values) - - var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt) - obj.logStmt(__stmt, __values...) - - __rows, err := obj.driver.Query(__stmt, __values...) - if err != nil { - return nil, obj.makeErr(err) - } - defer __rows.Close() - - for __rows.Next() { - accounting_raw := &AccountingRaw{} - err = __rows.Scan(&accounting_raw.Id, &accounting_raw.NodeId, &accounting_raw.IntervalEndTime, &accounting_raw.DataTotal, &accounting_raw.DataType, &accounting_raw.CreatedAt) - if err != nil { - return nil, obj.makeErr(err) - } - rows = append(rows, accounting_raw) - } - if err := __rows.Err(); err != nil { - return nil, obj.makeErr(err) - } - return rows, nil - -} - -func (obj *sqlite3Impl) All_AccountingRaw_By_IntervalEndTime_GreaterOrEqual(ctx context.Context, - accounting_raw_interval_end_time_greater_or_equal AccountingRaw_IntervalEndTime_Field) ( - rows []*AccountingRaw, err error) { - - var __embed_stmt = __sqlbundle_Literal("SELECT accounting_raws.id, accounting_raws.node_id, accounting_raws.interval_end_time, accounting_raws.data_total, accounting_raws.data_type, accounting_raws.created_at FROM accounting_raws WHERE accounting_raws.interval_end_time >= ?") - - var __values []interface{} - __values = append(__values, accounting_raw_interval_end_time_greater_or_equal.value()) - - var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt) - obj.logStmt(__stmt, __values...) - - __rows, err := obj.driver.Query(__stmt, __values...) 
- if err != nil { - return nil, obj.makeErr(err) - } - defer __rows.Close() - - for __rows.Next() { - accounting_raw := &AccountingRaw{} - err = __rows.Scan(&accounting_raw.Id, &accounting_raw.NodeId, &accounting_raw.IntervalEndTime, &accounting_raw.DataTotal, &accounting_raw.DataType, &accounting_raw.CreatedAt) - if err != nil { - return nil, obj.makeErr(err) - } - rows = append(rows, accounting_raw) - } - if err := __rows.Err(); err != nil { - return nil, obj.makeErr(err) - } - return rows, nil - -} - func (obj *sqlite3Impl) Get_Node_By_Id(ctx context.Context, node_id Node_Id_Field) ( node *Node, err error) { @@ -7592,6 +7363,92 @@ func (obj *sqlite3Impl) All_StoragenodeBandwidthRollup_By_IntervalStart_GreaterO } +func (obj *sqlite3Impl) Get_StoragenodeStorageTally_By_Id(ctx context.Context, + storagenode_storage_tally_id StoragenodeStorageTally_Id_Field) ( + storagenode_storage_tally *StoragenodeStorageTally, err error) { + + var __embed_stmt = __sqlbundle_Literal("SELECT storagenode_storage_tallies.id, storagenode_storage_tallies.node_id, storagenode_storage_tallies.interval_end_time, storagenode_storage_tallies.data_total FROM storagenode_storage_tallies WHERE storagenode_storage_tallies.id = ?") + + var __values []interface{} + __values = append(__values, storagenode_storage_tally_id.value()) + + var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt) + obj.logStmt(__stmt, __values...) + + storagenode_storage_tally = &StoragenodeStorageTally{} + err = obj.driver.QueryRow(__stmt, __values...).Scan(&storagenode_storage_tally.Id, &storagenode_storage_tally.NodeId, &storagenode_storage_tally.IntervalEndTime, &storagenode_storage_tally.DataTotal) + if err != nil { + return nil, obj.makeErr(err) + } + return storagenode_storage_tally, nil + +} + +func (obj *sqlite3Impl) All_StoragenodeStorageTally(ctx context.Context) ( + rows []*StoragenodeStorageTally, err error) { + + var __embed_stmt = __sqlbundle_Literal("SELECT storagenode_storage_tallies.id, storagenode_storage_tallies.node_id, storagenode_storage_tallies.interval_end_time, storagenode_storage_tallies.data_total FROM storagenode_storage_tallies") + + var __values []interface{} + __values = append(__values) + + var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt) + obj.logStmt(__stmt, __values...) + + __rows, err := obj.driver.Query(__stmt, __values...) 
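+	// Same scanning pattern as the postgres implementation above; the embedded
+	// SQL is identical and __sqlbundle_Render adapts it to the sqlite3 dialect.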
+ if err != nil { + return nil, obj.makeErr(err) + } + defer __rows.Close() + + for __rows.Next() { + storagenode_storage_tally := &StoragenodeStorageTally{} + err = __rows.Scan(&storagenode_storage_tally.Id, &storagenode_storage_tally.NodeId, &storagenode_storage_tally.IntervalEndTime, &storagenode_storage_tally.DataTotal) + if err != nil { + return nil, obj.makeErr(err) + } + rows = append(rows, storagenode_storage_tally) + } + if err := __rows.Err(); err != nil { + return nil, obj.makeErr(err) + } + return rows, nil + +} + +func (obj *sqlite3Impl) All_StoragenodeStorageTally_By_IntervalEndTime_GreaterOrEqual(ctx context.Context, + storagenode_storage_tally_interval_end_time_greater_or_equal StoragenodeStorageTally_IntervalEndTime_Field) ( + rows []*StoragenodeStorageTally, err error) { + + var __embed_stmt = __sqlbundle_Literal("SELECT storagenode_storage_tallies.id, storagenode_storage_tallies.node_id, storagenode_storage_tallies.interval_end_time, storagenode_storage_tallies.data_total FROM storagenode_storage_tallies WHERE storagenode_storage_tallies.interval_end_time >= ?") + + var __values []interface{} + __values = append(__values, storagenode_storage_tally_interval_end_time_greater_or_equal.value()) + + var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt) + obj.logStmt(__stmt, __values...) + + __rows, err := obj.driver.Query(__stmt, __values...) + if err != nil { + return nil, obj.makeErr(err) + } + defer __rows.Close() + + for __rows.Next() { + storagenode_storage_tally := &StoragenodeStorageTally{} + err = __rows.Scan(&storagenode_storage_tally.Id, &storagenode_storage_tally.NodeId, &storagenode_storage_tally.IntervalEndTime, &storagenode_storage_tally.DataTotal) + if err != nil { + return nil, obj.makeErr(err) + } + rows = append(rows, storagenode_storage_tally) + } + if err := __rows.Err(); err != nil { + return nil, obj.makeErr(err) + } + return rows, nil + +} + func (obj *sqlite3Impl) Get_CertRecord_By_Id(ctx context.Context, certRecord_id CertRecord_Id_Field) ( certRecord *CertRecord, err error) { @@ -8251,32 +8108,6 @@ func (obj *sqlite3Impl) Delete_AccountingRollup_By_Id(ctx context.Context, } -func (obj *sqlite3Impl) Delete_AccountingRaw_By_Id(ctx context.Context, - accounting_raw_id AccountingRaw_Id_Field) ( - deleted bool, err error) { - - var __embed_stmt = __sqlbundle_Literal("DELETE FROM accounting_raws WHERE accounting_raws.id = ?") - - var __values []interface{} - __values = append(__values, accounting_raw_id.value()) - - var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt) - obj.logStmt(__stmt, __values...) - - __res, err := obj.driver.Exec(__stmt, __values...) 
- if err != nil { - return false, obj.makeErr(err) - } - - __count, err := __res.RowsAffected() - if err != nil { - return false, obj.makeErr(err) - } - - return __count > 0, nil - -} - func (obj *sqlite3Impl) Delete_Node_By_Id(ctx context.Context, node_id Node_Id_Field) ( deleted bool, err error) { @@ -8460,6 +8291,32 @@ func (obj *sqlite3Impl) Delete_SerialNumber_By_ExpiresAt_LessOrEqual(ctx context } +func (obj *sqlite3Impl) Delete_StoragenodeStorageTally_By_Id(ctx context.Context, + storagenode_storage_tally_id StoragenodeStorageTally_Id_Field) ( + deleted bool, err error) { + + var __embed_stmt = __sqlbundle_Literal("DELETE FROM storagenode_storage_tallies WHERE storagenode_storage_tallies.id = ?") + + var __values []interface{} + __values = append(__values, storagenode_storage_tally_id.value()) + + var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt) + obj.logStmt(__stmt, __values...) + + __res, err := obj.driver.Exec(__stmt, __values...) + if err != nil { + return false, obj.makeErr(err) + } + + __count, err := __res.RowsAffected() + if err != nil { + return false, obj.makeErr(err) + } + + return __count > 0, nil + +} + func (obj *sqlite3Impl) Delete_CertRecord_By_Id(ctx context.Context, certRecord_id CertRecord_Id_Field) ( deleted bool, err error) { @@ -8540,24 +8397,6 @@ func (obj *sqlite3Impl) getLastAccountingRollup(ctx context.Context, } -func (obj *sqlite3Impl) getLastAccountingRaw(ctx context.Context, - pk int64) ( - accounting_raw *AccountingRaw, err error) { - - var __embed_stmt = __sqlbundle_Literal("SELECT accounting_raws.id, accounting_raws.node_id, accounting_raws.interval_end_time, accounting_raws.data_total, accounting_raws.data_type, accounting_raws.created_at FROM accounting_raws WHERE _rowid_ = ?") - - var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt) - obj.logStmt(__stmt, pk) - - accounting_raw = &AccountingRaw{} - err = obj.driver.QueryRow(__stmt, pk).Scan(&accounting_raw.Id, &accounting_raw.NodeId, &accounting_raw.IntervalEndTime, &accounting_raw.DataTotal, &accounting_raw.DataType, &accounting_raw.CreatedAt) - if err != nil { - return nil, obj.makeErr(err) - } - return accounting_raw, nil - -} - func (obj *sqlite3Impl) getLastNode(ctx context.Context, pk int64) ( node *Node, err error) { @@ -8720,6 +8559,24 @@ func (obj *sqlite3Impl) getLastBucketStorageTally(ctx context.Context, } +func (obj *sqlite3Impl) getLastStoragenodeStorageTally(ctx context.Context, + pk int64) ( + storagenode_storage_tally *StoragenodeStorageTally, err error) { + + var __embed_stmt = __sqlbundle_Literal("SELECT storagenode_storage_tallies.id, storagenode_storage_tallies.node_id, storagenode_storage_tallies.interval_end_time, storagenode_storage_tallies.data_total FROM storagenode_storage_tallies WHERE _rowid_ = ?") + + var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt) + obj.logStmt(__stmt, pk) + + storagenode_storage_tally = &StoragenodeStorageTally{} + err = obj.driver.QueryRow(__stmt, pk).Scan(&storagenode_storage_tally.Id, &storagenode_storage_tally.NodeId, &storagenode_storage_tally.IntervalEndTime, &storagenode_storage_tally.DataTotal) + if err != nil { + return nil, obj.makeErr(err) + } + return storagenode_storage_tally, nil + +} + func (obj *sqlite3Impl) getLastCertRecord(ctx context.Context, pk int64) ( certRecord *CertRecord, err error) { @@ -8959,16 +8816,6 @@ func (obj *sqlite3Impl) deleteAll(ctx context.Context) (count int64, err error) return 0, obj.makeErr(err) } - __count, err = __res.RowsAffected() - if err != nil { - return 0, obj.makeErr(err) 
- } - count += __count - __res, err = obj.driver.Exec("DELETE FROM accounting_raws;") - if err != nil { - return 0, obj.makeErr(err) - } - __count, err = __res.RowsAffected() if err != nil { return 0, obj.makeErr(err) @@ -9021,25 +8868,6 @@ func (rx *Rx) Rollback() (err error) { return err } -func (rx *Rx) All_AccountingRaw(ctx context.Context) ( - rows []*AccountingRaw, err error) { - var tx *Tx - if tx, err = rx.getTx(ctx); err != nil { - return - } - return tx.All_AccountingRaw(ctx) -} - -func (rx *Rx) All_AccountingRaw_By_IntervalEndTime_GreaterOrEqual(ctx context.Context, - accounting_raw_interval_end_time_greater_or_equal AccountingRaw_IntervalEndTime_Field) ( - rows []*AccountingRaw, err error) { - var tx *Tx - if tx, err = rx.getTx(ctx); err != nil { - return - } - return tx.All_AccountingRaw_By_IntervalEndTime_GreaterOrEqual(ctx, accounting_raw_interval_end_time_greater_or_equal) -} - func (rx *Rx) All_AccountingRollup_By_StartTime_GreaterOrEqual(ctx context.Context, accounting_rollup_start_time_greater_or_equal AccountingRollup_StartTime_Field) ( rows []*AccountingRollup, err error) { @@ -9121,19 +8949,23 @@ func (rx *Rx) All_StoragenodeBandwidthRollup_By_IntervalStart_GreaterOrEqual(ctx return tx.All_StoragenodeBandwidthRollup_By_IntervalStart_GreaterOrEqual(ctx, storagenode_bandwidth_rollup_interval_start_greater_or_equal) } -func (rx *Rx) Create_AccountingRaw(ctx context.Context, - accounting_raw_node_id AccountingRaw_NodeId_Field, - accounting_raw_interval_end_time AccountingRaw_IntervalEndTime_Field, - accounting_raw_data_total AccountingRaw_DataTotal_Field, - accounting_raw_data_type AccountingRaw_DataType_Field, - accounting_raw_created_at AccountingRaw_CreatedAt_Field) ( - accounting_raw *AccountingRaw, err error) { +func (rx *Rx) All_StoragenodeStorageTally(ctx context.Context) ( + rows []*StoragenodeStorageTally, err error) { var tx *Tx if tx, err = rx.getTx(ctx); err != nil { return } - return tx.Create_AccountingRaw(ctx, accounting_raw_node_id, accounting_raw_interval_end_time, accounting_raw_data_total, accounting_raw_data_type, accounting_raw_created_at) + return tx.All_StoragenodeStorageTally(ctx) +} +func (rx *Rx) All_StoragenodeStorageTally_By_IntervalEndTime_GreaterOrEqual(ctx context.Context, + storagenode_storage_tally_interval_end_time_greater_or_equal StoragenodeStorageTally_IntervalEndTime_Field) ( + rows []*StoragenodeStorageTally, err error) { + var tx *Tx + if tx, err = rx.getTx(ctx); err != nil { + return + } + return tx.All_StoragenodeStorageTally_By_IntervalEndTime_GreaterOrEqual(ctx, storagenode_storage_tally_interval_end_time_greater_or_equal) } func (rx *Rx) Create_AccountingRollup(ctx context.Context, @@ -9332,6 +9164,19 @@ func (rx *Rx) Create_SerialNumber(ctx context.Context, } +func (rx *Rx) Create_StoragenodeStorageTally(ctx context.Context, + storagenode_storage_tally_node_id StoragenodeStorageTally_NodeId_Field, + storagenode_storage_tally_interval_end_time StoragenodeStorageTally_IntervalEndTime_Field, + storagenode_storage_tally_data_total StoragenodeStorageTally_DataTotal_Field) ( + storagenode_storage_tally *StoragenodeStorageTally, err error) { + var tx *Tx + if tx, err = rx.getTx(ctx); err != nil { + return + } + return tx.Create_StoragenodeStorageTally(ctx, storagenode_storage_tally_node_id, storagenode_storage_tally_interval_end_time, storagenode_storage_tally_data_total) + +} + func (rx *Rx) Create_UsedSerial(ctx context.Context, used_serial_serial_number_id UsedSerial_SerialNumberId_Field, used_serial_storage_node_id 
UsedSerial_StorageNodeId_Field) ( @@ -9359,16 +9204,6 @@ func (rx *Rx) Create_User(ctx context.Context, } -func (rx *Rx) Delete_AccountingRaw_By_Id(ctx context.Context, - accounting_raw_id AccountingRaw_Id_Field) ( - deleted bool, err error) { - var tx *Tx - if tx, err = rx.getTx(ctx); err != nil { - return - } - return tx.Delete_AccountingRaw_By_Id(ctx, accounting_raw_id) -} - func (rx *Rx) Delete_AccountingRollup_By_Id(ctx context.Context, accounting_rollup_id AccountingRollup_Id_Field) ( deleted bool, err error) { @@ -9461,6 +9296,16 @@ func (rx *Rx) Delete_SerialNumber_By_ExpiresAt_LessOrEqual(ctx context.Context, } +func (rx *Rx) Delete_StoragenodeStorageTally_By_Id(ctx context.Context, + storagenode_storage_tally_id StoragenodeStorageTally_Id_Field) ( + deleted bool, err error) { + var tx *Tx + if tx, err = rx.getTx(ctx); err != nil { + return + } + return tx.Delete_StoragenodeStorageTally_By_Id(ctx, storagenode_storage_tally_id) +} + func (rx *Rx) Delete_User_By_Id(ctx context.Context, user_id User_Id_Field) ( deleted bool, err error) { @@ -9526,16 +9371,6 @@ func (rx *Rx) First_BucketStorageTally_By_ProjectId_OrderBy_Desc_IntervalStart(c return tx.First_BucketStorageTally_By_ProjectId_OrderBy_Desc_IntervalStart(ctx, bucket_storage_tally_project_id) } -func (rx *Rx) Get_AccountingRaw_By_Id(ctx context.Context, - accounting_raw_id AccountingRaw_Id_Field) ( - accounting_raw *AccountingRaw, err error) { - var tx *Tx - if tx, err = rx.getTx(ctx); err != nil { - return - } - return tx.Get_AccountingRaw_By_Id(ctx, accounting_raw_id) -} - func (rx *Rx) Get_AccountingRollup_By_Id(ctx context.Context, accounting_rollup_id AccountingRollup_Id_Field) ( accounting_rollup *AccountingRollup, err error) { @@ -9636,6 +9471,16 @@ func (rx *Rx) Get_RegistrationToken_By_Secret(ctx context.Context, return tx.Get_RegistrationToken_By_Secret(ctx, registration_token_secret) } +func (rx *Rx) Get_StoragenodeStorageTally_By_Id(ctx context.Context, + storagenode_storage_tally_id StoragenodeStorageTally_Id_Field) ( + storagenode_storage_tally *StoragenodeStorageTally, err error) { + var tx *Tx + if tx, err = rx.getTx(ctx); err != nil { + return + } + return tx.Get_StoragenodeStorageTally_By_Id(ctx, storagenode_storage_tally_id) +} + func (rx *Rx) Get_User_By_Email_And_Status_Not_Number(ctx context.Context, user_email User_Email_Field) ( user *User, err error) { @@ -9803,13 +9648,6 @@ func (rx *Rx) Update_User_By_Id(ctx context.Context, } type Methods interface { - All_AccountingRaw(ctx context.Context) ( - rows []*AccountingRaw, err error) - - All_AccountingRaw_By_IntervalEndTime_GreaterOrEqual(ctx context.Context, - accounting_raw_interval_end_time_greater_or_equal AccountingRaw_IntervalEndTime_Field) ( - rows []*AccountingRaw, err error) - All_AccountingRollup_By_StartTime_GreaterOrEqual(ctx context.Context, accounting_rollup_start_time_greater_or_equal AccountingRollup_StartTime_Field) ( rows []*AccountingRollup, err error) @@ -9843,13 +9681,12 @@ type Methods interface { storagenode_bandwidth_rollup_interval_start_greater_or_equal StoragenodeBandwidthRollup_IntervalStart_Field) ( rows []*StoragenodeBandwidthRollup, err error) - Create_AccountingRaw(ctx context.Context, - accounting_raw_node_id AccountingRaw_NodeId_Field, - accounting_raw_interval_end_time AccountingRaw_IntervalEndTime_Field, - accounting_raw_data_total AccountingRaw_DataTotal_Field, - accounting_raw_data_type AccountingRaw_DataType_Field, - accounting_raw_created_at AccountingRaw_CreatedAt_Field) ( - accounting_raw *AccountingRaw, err 
error) + All_StoragenodeStorageTally(ctx context.Context) ( + rows []*StoragenodeStorageTally, err error) + + All_StoragenodeStorageTally_By_IntervalEndTime_GreaterOrEqual(ctx context.Context, + storagenode_storage_tally_interval_end_time_greater_or_equal StoragenodeStorageTally_IntervalEndTime_Field) ( + rows []*StoragenodeStorageTally, err error) Create_AccountingRollup(ctx context.Context, accounting_rollup_node_id AccountingRollup_NodeId_Field, @@ -9963,6 +9800,12 @@ type Methods interface { serial_number_expires_at SerialNumber_ExpiresAt_Field) ( serial_number *SerialNumber, err error) + Create_StoragenodeStorageTally(ctx context.Context, + storagenode_storage_tally_node_id StoragenodeStorageTally_NodeId_Field, + storagenode_storage_tally_interval_end_time StoragenodeStorageTally_IntervalEndTime_Field, + storagenode_storage_tally_data_total StoragenodeStorageTally_DataTotal_Field) ( + storagenode_storage_tally *StoragenodeStorageTally, err error) + Create_UsedSerial(ctx context.Context, used_serial_serial_number_id UsedSerial_SerialNumberId_Field, used_serial_storage_node_id UsedSerial_StorageNodeId_Field) ( @@ -9976,10 +9819,6 @@ type Methods interface { optional User_Create_Fields) ( user *User, err error) - Delete_AccountingRaw_By_Id(ctx context.Context, - accounting_raw_id AccountingRaw_Id_Field) ( - deleted bool, err error) - Delete_AccountingRollup_By_Id(ctx context.Context, accounting_rollup_id AccountingRollup_Id_Field) ( deleted bool, err error) @@ -10017,6 +9856,10 @@ type Methods interface { serial_number_expires_at_less_or_equal SerialNumber_ExpiresAt_Field) ( count int64, err error) + Delete_StoragenodeStorageTally_By_Id(ctx context.Context, + storagenode_storage_tally_id StoragenodeStorageTally_Id_Field) ( + deleted bool, err error) + Delete_User_By_Id(ctx context.Context, user_id User_Id_Field) ( deleted bool, err error) @@ -10046,10 +9889,6 @@ type Methods interface { bucket_storage_tally_project_id BucketStorageTally_ProjectId_Field) ( bucket_storage_tally *BucketStorageTally, err error) - Get_AccountingRaw_By_Id(ctx context.Context, - accounting_raw_id AccountingRaw_Id_Field) ( - accounting_raw *AccountingRaw, err error) - Get_AccountingRollup_By_Id(ctx context.Context, accounting_rollup_id AccountingRollup_Id_Field) ( accounting_rollup *AccountingRollup, err error) @@ -10090,6 +9929,10 @@ type Methods interface { registration_token_secret RegistrationToken_Secret_Field) ( registration_token *RegistrationToken, err error) + Get_StoragenodeStorageTally_By_Id(ctx context.Context, + storagenode_storage_tally_id StoragenodeStorageTally_Id_Field) ( + storagenode_storage_tally *StoragenodeStorageTally, err error) + Get_User_By_Email_And_Status_Not_Number(ctx context.Context, user_email User_Email_Field) ( user *User, err error) diff --git a/satellite/satellitedb/dbx/satellitedb.dbx.postgres.sql b/satellite/satellitedb/dbx/satellitedb.dbx.postgres.sql index c70e14de3..1a78f0b38 100644 --- a/satellite/satellitedb/dbx/satellitedb.dbx.postgres.sql +++ b/satellite/satellitedb/dbx/satellitedb.dbx.postgres.sql @@ -1,14 +1,5 @@ -- AUTOGENERATED BY gopkg.in/spacemonkeygo/dbx.v1 -- DO NOT EDIT -CREATE TABLE accounting_raws ( - id bigserial NOT NULL, - node_id bytea NOT NULL, - interval_end_time timestamp with time zone NOT NULL, - data_total double precision NOT NULL, - data_type integer NOT NULL, - created_at timestamp with time zone NOT NULL, - PRIMARY KEY ( id ) -); CREATE TABLE accounting_rollups ( id bigserial NOT NULL, node_id bytea NOT NULL, @@ -154,10 +145,11 @@ CREATE TABLE 
storagenode_bandwidth_rollups ( PRIMARY KEY ( storagenode_id, interval_start, action ) ); CREATE TABLE storagenode_storage_tallies ( - storagenode_id bytea NOT NULL, - interval_start timestamp NOT NULL, - total bigint NOT NULL, - PRIMARY KEY ( storagenode_id, interval_start ) + id bigserial NOT NULL, + node_id bytea NOT NULL, + interval_end_time timestamp with time zone NOT NULL, + data_total double precision NOT NULL, + PRIMARY KEY ( id ) ); CREATE TABLE users ( id bytea NOT NULL, diff --git a/satellite/satellitedb/dbx/satellitedb.dbx.sqlite3.sql b/satellite/satellitedb/dbx/satellitedb.dbx.sqlite3.sql index b75eb1203..d036f49bb 100644 --- a/satellite/satellitedb/dbx/satellitedb.dbx.sqlite3.sql +++ b/satellite/satellitedb/dbx/satellitedb.dbx.sqlite3.sql @@ -1,14 +1,5 @@ -- AUTOGENERATED BY gopkg.in/spacemonkeygo/dbx.v1 -- DO NOT EDIT -CREATE TABLE accounting_raws ( - id INTEGER NOT NULL, - node_id BLOB NOT NULL, - interval_end_time TIMESTAMP NOT NULL, - data_total REAL NOT NULL, - data_type INTEGER NOT NULL, - created_at TIMESTAMP NOT NULL, - PRIMARY KEY ( id ) -); CREATE TABLE accounting_rollups ( id INTEGER NOT NULL, node_id BLOB NOT NULL, @@ -154,10 +145,11 @@ CREATE TABLE storagenode_bandwidth_rollups ( PRIMARY KEY ( storagenode_id, interval_start, action ) ); CREATE TABLE storagenode_storage_tallies ( - storagenode_id BLOB NOT NULL, - interval_start TIMESTAMP NOT NULL, - total INTEGER NOT NULL, - PRIMARY KEY ( storagenode_id, interval_start ) + id INTEGER NOT NULL, + node_id BLOB NOT NULL, + interval_end_time TIMESTAMP NOT NULL, + data_total REAL NOT NULL, + PRIMARY KEY ( id ) ); CREATE TABLE users ( id BLOB NOT NULL, diff --git a/satellite/satellitedb/locked.go b/satellite/satellitedb/locked.go index 578f5646f..be5052ba1 100644 --- a/satellite/satellitedb/locked.go +++ b/satellite/satellitedb/locked.go @@ -37,103 +37,6 @@ func newLocked(db satellite.DB) satellite.DB { return &locked{&sync.Mutex{}, db} } -// Accounting returns database for storing information about data use -func (m *locked) Accounting() accounting.DB { - m.Lock() - defer m.Unlock() - return &lockedAccounting{m.Locker, m.db.Accounting()} -} - -// lockedAccounting implements locking wrapper for accounting.DB -type lockedAccounting struct { - sync.Locker - db accounting.DB -} - -// CreateBucketStorageTally creates a record for BucketStorageTally in the accounting DB table -func (m *lockedAccounting) CreateBucketStorageTally(ctx context.Context, tally accounting.BucketStorageTally) error { - m.Lock() - defer m.Unlock() - return m.db.CreateBucketStorageTally(ctx, tally) -} - -// DeleteRawBefore deletes all raw tallies prior to some time -func (m *lockedAccounting) DeleteRawBefore(ctx context.Context, latestRollup time.Time) error { - m.Lock() - defer m.Unlock() - return m.db.DeleteRawBefore(ctx, latestRollup) -} - -// GetRaw retrieves all raw tallies -func (m *lockedAccounting) GetRaw(ctx context.Context) ([]*accounting.Raw, error) { - m.Lock() - defer m.Unlock() - return m.db.GetRaw(ctx) -} - -// GetRawSince retrieves all raw tallies since latestRollup -func (m *lockedAccounting) GetRawSince(ctx context.Context, latestRollup time.Time) ([]*accounting.Raw, error) { - m.Lock() - defer m.Unlock() - return m.db.GetRawSince(ctx, latestRollup) -} - -// GetStoragenodeBandwidthSince retrieves all storagenode_bandwidth_rollup entires since latestRollup -func (m *lockedAccounting) GetStoragenodeBandwidthSince(ctx context.Context, latestRollup time.Time) ([]*accounting.StoragenodeBandwidthRollup, error) { - m.Lock() - defer 
m.Unlock()
-	return m.db.GetStoragenodeBandwidthSince(ctx, latestRollup)
-}
-
-// LastTimestamp records the latest last tallied time.
-func (m *lockedAccounting) LastTimestamp(ctx context.Context, timestampType string) (time.Time, error) {
-	m.Lock()
-	defer m.Unlock()
-	return m.db.LastTimestamp(ctx, timestampType)
-}
-
-// ProjectAllocatedBandwidthTotal returns the sum of GET bandwidth usage allocated for a projectID in the past time frame
-func (m *lockedAccounting) ProjectAllocatedBandwidthTotal(ctx context.Context, bucketID []byte, from time.Time) (int64, error) {
-	m.Lock()
-	defer m.Unlock()
-	return m.db.ProjectAllocatedBandwidthTotal(ctx, bucketID, from)
-}
-
-// ProjectStorageTotals returns the current inline and remote storage usage for a projectID
-func (m *lockedAccounting) ProjectStorageTotals(ctx context.Context, projectID uuid.UUID) (int64, int64, error) {
-	m.Lock()
-	defer m.Unlock()
-	return m.db.ProjectStorageTotals(ctx, projectID)
-}
-
-// QueryPaymentInfo queries Overlay, Accounting Rollup on nodeID
-func (m *lockedAccounting) QueryPaymentInfo(ctx context.Context, start time.Time, end time.Time) ([]*accounting.CSVRow, error) {
-	m.Lock()
-	defer m.Unlock()
-	return m.db.QueryPaymentInfo(ctx, start, end)
-}
-
-// SaveAtRestRaw records raw tallies of at-rest-data.
-func (m *lockedAccounting) SaveAtRestRaw(ctx context.Context, latestTally time.Time, created time.Time, nodeData map[storj.NodeID]float64) error {
-	m.Lock()
-	defer m.Unlock()
-	return m.db.SaveAtRestRaw(ctx, latestTally, created, nodeData)
-}
-
-// SaveBucketTallies saves the latest bucket info
-func (m *lockedAccounting) SaveBucketTallies(ctx context.Context, intervalStart time.Time, bucketTallies map[string]*accounting.BucketTally) ([]accounting.BucketTally, error) {
-	m.Lock()
-	defer m.Unlock()
-	return m.db.SaveBucketTallies(ctx, intervalStart, bucketTallies)
-}
-
-// SaveRollup records raw tallies of at rest data to the database
-func (m *lockedAccounting) SaveRollup(ctx context.Context, latestTally time.Time, stats accounting.RollupStats) error {
-	m.Lock()
-	defer m.Unlock()
-	return m.db.SaveRollup(ctx, latestTally, stats)
-}
-
 // BandwidthAgreement returns database for storing bandwidth agreements
 func (m *locked) BandwidthAgreement() bwagreement.DB {
 	m.Lock()
@@ -703,6 +606,13 @@ func (m *lockedOverlayCache) Get(ctx context.Context, nodeID storj.NodeID) (*ove
 	return m.db.Get(ctx, nodeID)
 }
 
+// KnownUnreliableOrOffline filters a set of nodes to unhealthy or offline nodes, independent of new
+func (m *lockedOverlayCache) KnownUnreliableOrOffline(ctx context.Context, a1 *overlay.NodeCriteria, a2 storj.NodeIDList) (storj.NodeIDList, error) {
+	m.Lock()
+	defer m.Unlock()
+	return m.db.KnownUnreliableOrOffline(ctx, a1, a2)
+}
+
 // Paginate will page through the database nodes
 func (m *lockedOverlayCache) Paginate(ctx context.Context, offset int64, limit int) ([]*overlay.NodeDossier, bool, error) {
 	m.Lock()
@@ -724,14 +634,6 @@ func (m *lockedOverlayCache) SelectStorageNodes(ctx context.Context, count int,
 	return m.db.SelectStorageNodes(ctx, count, criteria)
 }
 
-// KnownUnreliableOrOffline filters a set of nodes to unhealth or offlines node, independent of new
-// Note that KnownUnreliableOrOffline will not return node ids which are not in the database at all
-func (m *lockedOverlayCache) KnownUnreliableOrOffline(ctx context.Context, a1 *overlay.NodeCriteria, a2 storj.NodeIDList) (storj.NodeIDList, error) {
-	m.Lock()
-	defer m.Unlock()
-	return m.db.KnownUnreliableOrOffline(ctx, a1, a2)
-}
-
 // Update 
updates node address
 func (m *lockedOverlayCache) UpdateAddress(ctx context.Context, value *pb.Node) error {
 	m.Lock()
@@ -760,6 +662,47 @@ func (m *lockedOverlayCache) UpdateUptime(ctx context.Context, nodeID storj.Node
 	return m.db.UpdateUptime(ctx, nodeID, isUp)
 }
 
+// ProjectAccounting returns database for storing information about project data use
+func (m *locked) ProjectAccounting() accounting.ProjectAccounting {
+	m.Lock()
+	defer m.Unlock()
+	return &lockedProjectAccounting{m.Locker, m.db.ProjectAccounting()}
+}
+
+// lockedProjectAccounting implements locking wrapper for accounting.ProjectAccounting
+type lockedProjectAccounting struct {
+	sync.Locker
+	db accounting.ProjectAccounting
+}
+
+// CreateStorageTally creates a record for BucketStorageTally in the accounting DB table
+func (m *lockedProjectAccounting) CreateStorageTally(ctx context.Context, tally accounting.BucketStorageTally) error {
+	m.Lock()
+	defer m.Unlock()
+	return m.db.CreateStorageTally(ctx, tally)
+}
+
+// GetAllocatedBandwidthTotal returns the sum of GET bandwidth usage allocated for a projectID in the past time frame
+func (m *lockedProjectAccounting) GetAllocatedBandwidthTotal(ctx context.Context, bucketID []byte, from time.Time) (int64, error) {
+	m.Lock()
+	defer m.Unlock()
+	return m.db.GetAllocatedBandwidthTotal(ctx, bucketID, from)
+}
+
+// GetStorageTotals returns the current inline and remote storage usage for a projectID
+func (m *lockedProjectAccounting) GetStorageTotals(ctx context.Context, projectID uuid.UUID) (int64, int64, error) {
+	m.Lock()
+	defer m.Unlock()
+	return m.db.GetStorageTotals(ctx, projectID)
+}
+
+// SaveTallies saves the latest project info
+func (m *lockedProjectAccounting) SaveTallies(ctx context.Context, intervalStart time.Time, bucketTallies map[string]*accounting.BucketTally) ([]accounting.BucketTally, error) {
+	m.Lock()
+	defer m.Unlock()
+	return m.db.SaveTallies(ctx, intervalStart, bucketTallies)
+}
+
 // RepairQueue returns queue for segments that need repairing
 func (m *locked) RepairQueue() queue.RepairQueue {
 	m.Lock()
@@ -800,3 +743,72 @@ func (m *lockedRepairQueue) SelectN(ctx context.Context, limit int) ([]pb.Injure
 	defer m.Unlock()
 	return m.db.SelectN(ctx, limit)
 }
+
+// StoragenodeAccounting returns database for storing information about storagenode use
+func (m *locked) StoragenodeAccounting() accounting.StoragenodeAccounting {
+	m.Lock()
+	defer m.Unlock()
+	return &lockedStoragenodeAccounting{m.Locker, m.db.StoragenodeAccounting()}
+}
+
+// lockedStoragenodeAccounting implements locking wrapper for accounting.StoragenodeAccounting
+type lockedStoragenodeAccounting struct {
+	sync.Locker
+	db accounting.StoragenodeAccounting
+}
+
+// DeleteTalliesBefore deletes all tallies prior to some time
+func (m *lockedStoragenodeAccounting) DeleteTalliesBefore(ctx context.Context, latestRollup time.Time) error {
+	m.Lock()
+	defer m.Unlock()
+	return m.db.DeleteTalliesBefore(ctx, latestRollup)
+}
+
+// GetBandwidthSince retrieves all bandwidth rollup entries since latestRollup
+func (m *lockedStoragenodeAccounting) GetBandwidthSince(ctx context.Context, latestRollup time.Time) ([]*accounting.StoragenodeBandwidthRollup, error) {
+	m.Lock()
+	defer m.Unlock()
+	return m.db.GetBandwidthSince(ctx, latestRollup)
+}
+
+// GetTallies retrieves all tallies
+func (m *lockedStoragenodeAccounting) GetTallies(ctx context.Context) ([]*accounting.StoragenodeStorageTally, error) {
+	m.Lock()
+	defer m.Unlock()
+	return m.db.GetTallies(ctx)
+}
+
+// GetTalliesSince retrieves all tallies 
since latestRollup
+func (m *lockedStoragenodeAccounting) GetTalliesSince(ctx context.Context, latestRollup time.Time) ([]*accounting.StoragenodeStorageTally, error) {
+	m.Lock()
+	defer m.Unlock()
+	return m.db.GetTalliesSince(ctx, latestRollup)
+}
+
+// LastTimestamp records and returns the latest last tallied time.
+func (m *lockedStoragenodeAccounting) LastTimestamp(ctx context.Context, timestampType string) (time.Time, error) {
+	m.Lock()
+	defer m.Unlock()
+	return m.db.LastTimestamp(ctx, timestampType)
+}
+
+// QueryPaymentInfo queries Nodes and Accounting_Rollup on nodeID
+func (m *lockedStoragenodeAccounting) QueryPaymentInfo(ctx context.Context, start time.Time, end time.Time) ([]*accounting.CSVRow, error) {
+	m.Lock()
+	defer m.Unlock()
+	return m.db.QueryPaymentInfo(ctx, start, end)
+}
+
+// SaveRollup records tally and bandwidth rollup aggregations to the database
+func (m *lockedStoragenodeAccounting) SaveRollup(ctx context.Context, latestTally time.Time, stats accounting.RollupStats) error {
+	m.Lock()
+	defer m.Unlock()
+	return m.db.SaveRollup(ctx, latestTally, stats)
+}
+
+// SaveTallies records tallies of data at rest
+func (m *lockedStoragenodeAccounting) SaveTallies(ctx context.Context, latestTally time.Time, nodeData map[storj.NodeID]float64) error {
+	m.Lock()
+	defer m.Unlock()
+	return m.db.SaveTallies(ctx, latestTally, nodeData)
+}
diff --git a/satellite/satellitedb/migrate.go b/satellite/satellitedb/migrate.go
index 247319ec6..688a33ae4 100644
--- a/satellite/satellitedb/migrate.go
+++ b/satellite/satellitedb/migrate.go
@@ -610,6 +610,16 @@ func (db *DB) PostgresMigration() *migrate.Migration {
 				UPDATE nodes SET uptime_ratio = 1 WHERE total_uptime_count = 0;`,
 			},
 		},
+		{
+			Description: "Drops the old storagenode_storage_tallies table, renames accounting_raws to storagenode_storage_tallies, and drops the data_type and created_at columns",
+			Version:     18,
+			Action: migrate.SQL{
+				`DROP TABLE storagenode_storage_tallies CASCADE`,
+				`ALTER TABLE accounting_raws RENAME TO storagenode_storage_tallies`,
+				`ALTER TABLE storagenode_storage_tallies DROP COLUMN data_type`,
+				`ALTER TABLE storagenode_storage_tallies DROP COLUMN created_at`,
+			},
+		},
 	},
 	}
 }
diff --git a/satellite/satellitedb/projectaccounting.go b/satellite/satellitedb/projectaccounting.go
new file mode 100644
index 000000000..1c8886cfe
--- /dev/null
+++ b/satellite/satellitedb/projectaccounting.go
@@ -0,0 +1,116 @@
+// Copyright (C) 2019 Storj Labs, Inc.
+// See LICENSE for copying information. 
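+//
+// projectaccounting.go backs the accounting.ProjectAccounting interface with
+// dbx queries: bucket storage tallies plus per-project bandwidth and storage
+// totals.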
+ +package satellitedb + +import ( + "bytes" + "context" + "database/sql" + "time" + + "github.com/skyrings/skyring-common/tools/uuid" + + "storj.io/storj/pkg/accounting" + "storj.io/storj/pkg/pb" + "storj.io/storj/pkg/storj" + dbx "storj.io/storj/satellite/satellitedb/dbx" +) + +// ProjectAccounting implements the accounting/db ProjectAccounting interface +type ProjectAccounting struct { + db *dbx.DB +} + +// SaveTallies saves the latest bucket info +func (db *ProjectAccounting) SaveTallies(ctx context.Context, intervalStart time.Time, bucketTallies map[string]*accounting.BucketTally) ([]accounting.BucketTally, error) { + if len(bucketTallies) == 0 { + return nil, Error.New("In SaveTallies with empty bucketTallies") + } + + var result []accounting.BucketTally + + for bucketID, info := range bucketTallies { + bucketIDComponents := storj.SplitPath(bucketID) + bucketName := dbx.BucketStorageTally_BucketName([]byte(bucketIDComponents[1])) + projectID := dbx.BucketStorageTally_ProjectId([]byte(bucketIDComponents[0])) + interval := dbx.BucketStorageTally_IntervalStart(intervalStart) + inlineBytes := dbx.BucketStorageTally_Inline(uint64(info.InlineBytes)) + remoteBytes := dbx.BucketStorageTally_Remote(uint64(info.RemoteBytes)) + rSegments := dbx.BucketStorageTally_RemoteSegmentsCount(uint(info.RemoteSegments)) + iSegments := dbx.BucketStorageTally_InlineSegmentsCount(uint(info.InlineSegments)) + objectCount := dbx.BucketStorageTally_ObjectCount(uint(info.Files)) + meta := dbx.BucketStorageTally_MetadataSize(uint64(info.MetadataSize)) + dbxTally, err := db.db.Create_BucketStorageTally(ctx, bucketName, projectID, interval, inlineBytes, remoteBytes, rSegments, iSegments, objectCount, meta) + if err != nil { + return nil, err + } + tally := accounting.BucketTally{ + BucketName: dbxTally.BucketName, + ProjectID: dbxTally.ProjectId, + InlineSegments: int64(dbxTally.InlineSegmentsCount), + RemoteSegments: int64(dbxTally.RemoteSegmentsCount), + Files: int64(dbxTally.ObjectCount), + InlineBytes: int64(dbxTally.Inline), + RemoteBytes: int64(dbxTally.Remote), + MetadataSize: int64(dbxTally.MetadataSize), + } + result = append(result, tally) + } + return result, nil +} + +// CreateStorageTally creates a record in the bucket_storage_tallies accounting table +func (db *ProjectAccounting) CreateStorageTally(ctx context.Context, tally accounting.BucketStorageTally) error { + _, err := db.db.Create_BucketStorageTally( + ctx, + dbx.BucketStorageTally_BucketName([]byte(tally.BucketName)), + dbx.BucketStorageTally_ProjectId(tally.ProjectID[:]), + dbx.BucketStorageTally_IntervalStart(tally.IntervalStart), + dbx.BucketStorageTally_Inline(uint64(tally.InlineBytes)), + dbx.BucketStorageTally_Remote(uint64(tally.RemoteBytes)), + dbx.BucketStorageTally_RemoteSegmentsCount(uint(tally.RemoteSegmentCount)), + dbx.BucketStorageTally_InlineSegmentsCount(uint(tally.InlineSegmentCount)), + dbx.BucketStorageTally_ObjectCount(uint(tally.ObjectCount)), + dbx.BucketStorageTally_MetadataSize(uint64(tally.MetadataSize)), + ) + if err != nil { + return err + } + return nil +} + +// GetAllocatedBandwidthTotal returns the sum of GET bandwidth usage allocated for a projectID for a time frame +func (db *ProjectAccounting) GetAllocatedBandwidthTotal(ctx context.Context, bucketID []byte, from time.Time) (int64, error) { + pathEl := bytes.Split(bucketID, []byte("/")) + _, projectID := pathEl[1], pathEl[0] + var sum *int64 + query := `SELECT SUM(allocated) FROM bucket_bandwidth_rollups WHERE project_id = ? AND action = ? 
AND interval_start > ?;` + err := db.db.QueryRow(db.db.Rebind(query), projectID, pb.PieceAction_GET, from).Scan(&sum) + if err == sql.ErrNoRows || sum == nil { + return 0, nil + } + + return *sum, err +} + +// GetStorageTotals returns the current inline and remote storage usage for a projectID +func (db *ProjectAccounting) GetStorageTotals(ctx context.Context, projectID uuid.UUID) (int64, int64, error) { + var inlineSum, remoteSum sql.NullInt64 + var intervalStart time.Time + + // Sum all the inline and remote values for a project that all share the same interval_start. + // All records for a project that have the same interval start are part of the same tally run. + // This should represent the most recent calculation of a project's total at rest storage. + query := `SELECT interval_start, SUM(inline), SUM(remote) + FROM bucket_storage_tallies + WHERE project_id = ? + GROUP BY interval_start + ORDER BY interval_start DESC LIMIT 1;` + + err := db.db.QueryRow(db.db.Rebind(query), projectID[:]).Scan(&intervalStart, &inlineSum, &remoteSum) + if err != nil || !inlineSum.Valid || !remoteSum.Valid { + return 0, 0, nil + } + return inlineSum.Int64, remoteSum.Int64, err +} diff --git a/satellite/satellitedb/storagenodeaccounting.go b/satellite/satellitedb/storagenodeaccounting.go new file mode 100644 index 000000000..da6bc479f --- /dev/null +++ b/satellite/satellitedb/storagenodeaccounting.go @@ -0,0 +1,194 @@ +// Copyright (C) 2019 Storj Labs, Inc. +// See LICENSE for copying information. + +package satellitedb + +import ( + "context" + "database/sql" + "time" + + "github.com/zeebo/errs" + + "storj.io/storj/pkg/accounting" + "storj.io/storj/pkg/storj" + dbx "storj.io/storj/satellite/satellitedb/dbx" +) + +// StoragenodeAccounting implements the accounting/db StoragenodeAccounting interface +type StoragenodeAccounting struct { + db *dbx.DB +} + +// SaveTallies records raw tallies of at rest data to the database +func (db *StoragenodeAccounting) SaveTallies(ctx context.Context, latestTally time.Time, nodeData map[storj.NodeID]float64) error { + if len(nodeData) == 0 { + return Error.New("In SaveTallies with empty nodeData") + } + err := db.db.WithTx(ctx, func(ctx context.Context, tx *dbx.Tx) error { + for k, v := range nodeData { + nID := dbx.StoragenodeStorageTally_NodeId(k.Bytes()) + end := dbx.StoragenodeStorageTally_IntervalEndTime(latestTally) + total := dbx.StoragenodeStorageTally_DataTotal(v) + _, err := tx.Create_StoragenodeStorageTally(ctx, nID, end, total) + if err != nil { + return err + } + } + update := dbx.AccountingTimestamps_Update_Fields{Value: dbx.AccountingTimestamps_Value(latestTally)} + _, err := tx.Update_AccountingTimestamps_By_Name(ctx, dbx.AccountingTimestamps_Name(accounting.LastAtRestTally), update) + return err + }) + return Error.Wrap(err) +} + +// GetTallies retrieves all raw tallies +func (db *StoragenodeAccounting) GetTallies(ctx context.Context) ([]*accounting.StoragenodeStorageTally, error) { + raws, err := db.db.All_StoragenodeStorageTally(ctx) + out := make([]*accounting.StoragenodeStorageTally, len(raws)) + for i, r := range raws { + nodeID, err := storj.NodeIDFromBytes(r.NodeId) + if err != nil { + return nil, Error.Wrap(err) + } + out[i] = &accounting.StoragenodeStorageTally{ + ID: r.Id, + NodeID: nodeID, + IntervalEndTime: r.IntervalEndTime, + DataTotal: r.DataTotal, + } + } + return out, Error.Wrap(err) +} + +// GetTalliesSince retrieves all raw tallies since latestRollup +func (db *StoragenodeAccounting) GetTalliesSince(ctx context.Context, latestRollup 
time.Time) ([]*accounting.StoragenodeStorageTally, error) {
+	raws, err := db.db.All_StoragenodeStorageTally_By_IntervalEndTime_GreaterOrEqual(ctx, dbx.StoragenodeStorageTally_IntervalEndTime(latestRollup))
+	out := make([]*accounting.StoragenodeStorageTally, len(raws))
+	for i, r := range raws {
+		nodeID, err := storj.NodeIDFromBytes(r.NodeId)
+		if err != nil {
+			return nil, Error.Wrap(err)
+		}
+		out[i] = &accounting.StoragenodeStorageTally{
+			ID:              r.Id,
+			NodeID:          nodeID,
+			IntervalEndTime: r.IntervalEndTime,
+			DataTotal:       r.DataTotal,
+		}
+	}
+	return out, Error.Wrap(err)
+}
+
+// GetBandwidthSince retrieves all storagenode_bandwidth_rollup entries since latestRollup
+func (db *StoragenodeAccounting) GetBandwidthSince(ctx context.Context, latestRollup time.Time) ([]*accounting.StoragenodeBandwidthRollup, error) {
+	rollups, err := db.db.All_StoragenodeBandwidthRollup_By_IntervalStart_GreaterOrEqual(ctx, dbx.StoragenodeBandwidthRollup_IntervalStart(latestRollup))
+	out := make([]*accounting.StoragenodeBandwidthRollup, len(rollups))
+	for i, r := range rollups {
+		nodeID, err := storj.NodeIDFromBytes(r.StoragenodeId)
+		if err != nil {
+			return nil, Error.Wrap(err)
+		}
+		out[i] = &accounting.StoragenodeBandwidthRollup{
+			NodeID:        nodeID,
+			IntervalStart: r.IntervalStart,
+			Action:        r.Action,
+			Settled:       r.Settled,
+		}
+	}
+	return out, Error.Wrap(err)
+}
+
+// SaveRollup records tally and bandwidth rollup aggregations to the database
+func (db *StoragenodeAccounting) SaveRollup(ctx context.Context, latestRollup time.Time, stats accounting.RollupStats) error {
+	if len(stats) == 0 {
+		return Error.New("In SaveRollup with empty stats")
+	}
+	err := db.db.WithTx(ctx, func(ctx context.Context, tx *dbx.Tx) error {
+		for _, arsByDate := range stats {
+			for _, ar := range arsByDate {
+				nID := dbx.AccountingRollup_NodeId(ar.NodeID.Bytes())
+				start := dbx.AccountingRollup_StartTime(ar.StartTime)
+				put := dbx.AccountingRollup_PutTotal(ar.PutTotal)
+				get := dbx.AccountingRollup_GetTotal(ar.GetTotal)
+				audit := dbx.AccountingRollup_GetAuditTotal(ar.GetAuditTotal)
+				getRepair := dbx.AccountingRollup_GetRepairTotal(ar.GetRepairTotal)
+				putRepair := dbx.AccountingRollup_PutRepairTotal(ar.PutRepairTotal)
+				atRest := dbx.AccountingRollup_AtRestTotal(ar.AtRestTotal)
+				_, err := tx.Create_AccountingRollup(ctx, nID, start, put, get, audit, getRepair, putRepair, atRest)
+				if err != nil {
+					return err
+				}
+			}
+		}
+		update := dbx.AccountingTimestamps_Update_Fields{Value: dbx.AccountingTimestamps_Value(latestRollup)}
+		_, err := tx.Update_AccountingTimestamps_By_Name(ctx, dbx.AccountingTimestamps_Name(accounting.LastRollup), update)
+		return err
+	})
+	return Error.Wrap(err)
+}
+
+// LastTimestamp records and returns the latest last tallied time
+func (db *StoragenodeAccounting) LastTimestamp(ctx context.Context, timestampType string) (time.Time, error) {
+	lastTally := time.Time{}
+	err := db.db.WithTx(ctx, func(ctx context.Context, tx *dbx.Tx) error {
+		lt, err := tx.Find_AccountingTimestamps_Value_By_Name(ctx, dbx.AccountingTimestamps_Name(timestampType))
+		if lt == nil {
+			update := dbx.AccountingTimestamps_Value(lastTally)
+			_, err = tx.Create_AccountingTimestamps(ctx, dbx.AccountingTimestamps_Name(timestampType), update)
+			return err
+		}
+		lastTally = lt.Value
+		return err
+	})
+	return lastTally, err
+}
+
+// QueryPaymentInfo queries Nodes and Accounting_Rollup on nodeID
+func (db *StoragenodeAccounting) QueryPaymentInfo(ctx context.Context, start time.Time, end time.Time) ([]*accounting.CSVRow, error) {
+	var sqlStmt = `SELECT 
n.id, n.created_at, n.audit_success_ratio, r.at_rest_total, r.get_repair_total, + r.put_repair_total, r.get_audit_total, r.put_total, r.get_total, n.wallet + FROM ( + SELECT node_id, SUM(at_rest_total) AS at_rest_total, SUM(get_repair_total) AS get_repair_total, + SUM(put_repair_total) AS put_repair_total, SUM(get_audit_total) AS get_audit_total, + SUM(put_total) AS put_total, SUM(get_total) AS get_total + FROM accounting_rollups + WHERE start_time >= ? AND start_time < ? + GROUP BY node_id + ) r + LEFT JOIN nodes n ON n.id = r.node_id + ORDER BY n.id` + rows, err := db.db.DB.QueryContext(ctx, db.db.Rebind(sqlStmt), start.UTC(), end.UTC()) + if err != nil { + return nil, Error.Wrap(err) + } + defer func() { err = errs.Combine(err, rows.Close()) }() + csv := make([]*accounting.CSVRow, 0, 0) + for rows.Next() { + var nodeID []byte + r := &accounting.CSVRow{} + var wallet sql.NullString + err := rows.Scan(&nodeID, &r.NodeCreationDate, &r.AuditSuccessRatio, &r.AtRestTotal, &r.GetRepairTotal, + &r.PutRepairTotal, &r.GetAuditTotal, &r.PutTotal, &r.GetTotal, &wallet) + if err != nil { + return csv, Error.Wrap(err) + } + if wallet.Valid { + r.Wallet = wallet.String + } + id, err := storj.NodeIDFromBytes(nodeID) + if err != nil { + return csv, Error.Wrap(err) + } + r.NodeID = id + csv = append(csv, r) + } + return csv, nil +} + +// DeleteTalliesBefore deletes all raw tallies prior to some time +func (db *StoragenodeAccounting) DeleteTalliesBefore(ctx context.Context, latestRollup time.Time) error { + var deleteRawSQL = `DELETE FROM storagenode_storage_tallies WHERE interval_end_time < ?` + _, err := db.db.DB.ExecContext(ctx, db.db.Rebind(deleteRawSQL), latestRollup) + return err +} diff --git a/satellite/satellitedb/testdata/postgres.v18.sql b/satellite/satellitedb/testdata/postgres.v18.sql new file mode 100644 index 000000000..a0f547939 --- /dev/null +++ b/satellite/satellitedb/testdata/postgres.v18.sql @@ -0,0 +1,233 @@ +-- Copied from the corresponding version of dbx generated schema +CREATE TABLE accounting_rollups ( + id bigserial NOT NULL, + node_id bytea NOT NULL, + start_time timestamp with time zone NOT NULL, + put_total bigint NOT NULL, + get_total bigint NOT NULL, + get_audit_total bigint NOT NULL, + get_repair_total bigint NOT NULL, + put_repair_total bigint NOT NULL, + at_rest_total double precision NOT NULL, + PRIMARY KEY ( id ) +); +CREATE TABLE accounting_timestamps ( + name text NOT NULL, + value timestamp with time zone NOT NULL, + PRIMARY KEY ( name ) +); +CREATE TABLE bucket_bandwidth_rollups ( + bucket_name bytea NOT NULL, + project_id bytea NOT NULL, + interval_start timestamp NOT NULL, + interval_seconds integer NOT NULL, + action integer NOT NULL, + inline bigint NOT NULL, + allocated bigint NOT NULL, + settled bigint NOT NULL, + PRIMARY KEY ( bucket_name, project_id, interval_start, action ) +); +CREATE TABLE bucket_storage_tallies ( + bucket_name bytea NOT NULL, + project_id bytea NOT NULL, + interval_start timestamp NOT NULL, + inline bigint NOT NULL, + remote bigint NOT NULL, + remote_segments_count integer NOT NULL, + inline_segments_count integer NOT NULL, + object_count integer NOT NULL, + metadata_size bigint NOT NULL, + PRIMARY KEY ( bucket_name, project_id, interval_start ) +); +CREATE TABLE bucket_usages ( + id bytea NOT NULL, + bucket_id bytea NOT NULL, + rollup_end_time timestamp with time zone NOT NULL, + remote_stored_data bigint NOT NULL, + inline_stored_data bigint NOT NULL, + remote_segments integer NOT NULL, + inline_segments integer NOT NULL, + objects 
integer NOT NULL, + metadata_size bigint NOT NULL, + repair_egress bigint NOT NULL, + get_egress bigint NOT NULL, + audit_egress bigint NOT NULL, + PRIMARY KEY ( id ) +); +CREATE TABLE bwagreements ( + serialnum text NOT NULL, + storage_node_id bytea NOT NULL, + uplink_id bytea NOT NULL, + action bigint NOT NULL, + total bigint NOT NULL, + created_at timestamp with time zone NOT NULL, + expires_at timestamp with time zone NOT NULL, + PRIMARY KEY ( serialnum ) +); +CREATE TABLE certRecords ( + publickey bytea NOT NULL, + id bytea NOT NULL, + update_at timestamp with time zone NOT NULL, + PRIMARY KEY ( id ) +); +CREATE TABLE injuredsegments ( + path text NOT NULL, + data bytea NOT NULL, + attempted timestamp, + PRIMARY KEY ( path ) +); +CREATE TABLE irreparabledbs ( + segmentpath bytea NOT NULL, + segmentdetail bytea NOT NULL, + pieces_lost_count bigint NOT NULL, + seg_damaged_unix_sec bigint NOT NULL, + repair_attempt_count bigint NOT NULL, + PRIMARY KEY ( segmentpath ) +); +CREATE TABLE nodes ( + id bytea NOT NULL, + address text NOT NULL, + protocol integer NOT NULL, + type integer NOT NULL, + email text NOT NULL, + wallet text NOT NULL, + free_bandwidth bigint NOT NULL, + free_disk bigint NOT NULL, + major bigint NOT NULL, + minor bigint NOT NULL, + patch bigint NOT NULL, + hash text NOT NULL, + timestamp timestamp with time zone NOT NULL, + release boolean NOT NULL, + latency_90 bigint NOT NULL, + audit_success_count bigint NOT NULL, + total_audit_count bigint NOT NULL, + audit_success_ratio double precision NOT NULL, + uptime_success_count bigint NOT NULL, + total_uptime_count bigint NOT NULL, + uptime_ratio double precision NOT NULL, + created_at timestamp with time zone NOT NULL, + updated_at timestamp with time zone NOT NULL, + last_contact_success timestamp with time zone NOT NULL, + last_contact_failure timestamp with time zone NOT NULL, + PRIMARY KEY ( id ) +); +CREATE TABLE projects ( + id bytea NOT NULL, + name text NOT NULL, + description text NOT NULL, + created_at timestamp with time zone NOT NULL, + PRIMARY KEY ( id ) +); +CREATE TABLE registration_tokens ( + secret bytea NOT NULL, + owner_id bytea, + project_limit integer NOT NULL, + created_at timestamp with time zone NOT NULL, + PRIMARY KEY ( secret ), + UNIQUE ( owner_id ) +); +CREATE TABLE serial_numbers ( + id serial NOT NULL, + serial_number bytea NOT NULL, + bucket_id bytea NOT NULL, + expires_at timestamp NOT NULL, + PRIMARY KEY ( id ) +); +CREATE TABLE storagenode_bandwidth_rollups ( + storagenode_id bytea NOT NULL, + interval_start timestamp NOT NULL, + interval_seconds integer NOT NULL, + action integer NOT NULL, + allocated bigint NOT NULL, + settled bigint NOT NULL, + PRIMARY KEY ( storagenode_id, interval_start, action ) +); +CREATE TABLE storagenode_storage_tallies ( + id bigserial NOT NULL, + node_id bytea NOT NULL, + interval_end_time timestamp with time zone NOT NULL, + data_total double precision NOT NULL, + PRIMARY KEY ( id ) +); +CREATE TABLE users ( + id bytea NOT NULL, + full_name text NOT NULL, + short_name text, + email text NOT NULL, + password_hash bytea NOT NULL, + status integer NOT NULL, + created_at timestamp with time zone NOT NULL, + PRIMARY KEY ( id ) +); +CREATE TABLE api_keys ( + id bytea NOT NULL, + project_id bytea NOT NULL REFERENCES projects( id ) ON DELETE CASCADE, + key bytea NOT NULL, + name text NOT NULL, + created_at timestamp with time zone NOT NULL, + PRIMARY KEY ( id ), + UNIQUE ( key ), + UNIQUE ( name, project_id ) +); +CREATE TABLE project_members ( + member_id bytea NOT 
NULL REFERENCES users( id ) ON DELETE CASCADE, + project_id bytea NOT NULL REFERENCES projects( id ) ON DELETE CASCADE, + created_at timestamp with time zone NOT NULL, + PRIMARY KEY ( member_id, project_id ) +); +CREATE TABLE used_serials ( + serial_number_id integer NOT NULL REFERENCES serial_numbers( id ) ON DELETE CASCADE, + storage_node_id bytea NOT NULL, + PRIMARY KEY ( serial_number_id, storage_node_id ) +); +CREATE INDEX bucket_name_project_id_interval_start_interval_seconds ON bucket_bandwidth_rollups ( bucket_name, project_id, interval_start, interval_seconds ); +CREATE UNIQUE INDEX bucket_id_rollup ON bucket_usages ( bucket_id, rollup_end_time ); +CREATE UNIQUE INDEX serial_number ON serial_numbers ( serial_number ); +CREATE INDEX serial_numbers_expires_at_index ON serial_numbers ( expires_at ); +CREATE INDEX storagenode_id_interval_start_interval_seconds ON storagenode_bandwidth_rollups ( storagenode_id, interval_start, interval_seconds ); + +--- + +INSERT INTO "accounting_rollups"("id", "node_id", "start_time", "put_total", "get_total", "get_audit_total", "get_repair_total", "put_repair_total", "at_rest_total") VALUES (1, E'\\367M\\177\\251]t/\\022\\256\\214\\265\\025\\224\\204:\\217\\212\\0102<\\321\\374\\020&\\271Qc\\325\\261\\354\\246\\233'::bytea, '2019-02-09 00:00:00+00', 1000, 2000, 3000, 4000, 0, 5000); + +INSERT INTO "accounting_timestamps" VALUES ('LastAtRestTally', '0001-01-01 00:00:00+00'); +INSERT INTO "accounting_timestamps" VALUES ('LastRollup', '0001-01-01 00:00:00+00'); +INSERT INTO "accounting_timestamps" VALUES ('LastBandwidthTally', '0001-01-01 00:00:00+00'); + +INSERT INTO "nodes"("id", "address", "protocol", "type", "email", "wallet", "free_bandwidth", "free_disk", "major", "minor", "patch", "hash", "timestamp", "release","latency_90", "audit_success_count", "total_audit_count", "audit_success_ratio", "uptime_success_count", "total_uptime_count", "uptime_ratio", "created_at", "updated_at", "last_contact_success", "last_contact_failure") VALUES (E'\\153\\313\\233\\074\\327\\177\\136\\070\\346\\001', '127.0.0.1:55516', 0, 4, '', '', -1, -1, 0, 1, 0, '', 'epoch', false, 0, 0, 5, 0, 0, 5, 0, '2019-02-14 08:07:31.028103+00', '2019-02-14 08:07:31.108963+00', 'epoch', 'epoch'); +INSERT INTO "nodes"("id", "address", "protocol", "type", "email", "wallet", "free_bandwidth", "free_disk", "major", "minor", "patch", "hash", "timestamp", "release","latency_90", "audit_success_count", "total_audit_count", "audit_success_ratio", "uptime_success_count", "total_uptime_count", "uptime_ratio", "created_at", "updated_at", "last_contact_success", "last_contact_failure") VALUES (E'\\006\\223\\250R\\221\\005\\365\\377v>0\\266\\365\\216\\255?\\347\\244\\371?2\\264\\262\\230\\007<\\001\\262\\263\\237\\247n', '127.0.0.1:55518', 0, 4, '', '', -1, -1, 0, 1, 0, '', 'epoch', false, 0, 0, 0, 1, 3, 3, 1, '2019-02-14 08:07:31.028103+00', '2019-02-14 08:07:31.108963+00', 'epoch', 'epoch'); +INSERT INTO "nodes"("id", "address", "protocol", "type", "email", "wallet", "free_bandwidth", "free_disk", "major", "minor", "patch", "hash", "timestamp", "release","latency_90", "audit_success_count", "total_audit_count", "audit_success_ratio", "uptime_success_count", "total_uptime_count", "uptime_ratio", "created_at", "updated_at", "last_contact_success", "last_contact_failure") VALUES (E'\\363\\342\\363\\371>+F\\256\\263\\300\\273|\\342N\\347\\014', '127.0.0.1:55517', 0, 4, '', '', -1, -1, 0, 1, 0, '', 'epoch', false, 0, 0, 0, 1, 0, 0, 1, '2019-02-14 08:07:31.028103+00', '2019-02-14 
08:07:31.108963+00', 'epoch', 'epoch'); + + +INSERT INTO "projects"("id", "name", "description", "created_at") VALUES (E'\\022\\217/\\014\\376!K\\023\\276\\031\\311}m\\236\\205\\300'::bytea, 'ProjectName', 'projects description', '2019-02-14 08:28:24.254934+00'); +INSERT INTO "api_keys"("id", "project_id", "key", "name", "created_at") VALUES (E'\\334/\\302;\\225\\355O\\323\\276f\\247\\354/6\\241\\033'::bytea, E'\\022\\217/\\014\\376!K\\023\\276\\031\\311}m\\236\\205\\300'::bytea, E'\\000]\\326N \\343\\270L\\327\\027\\337\\242\\240\\322mOl\\0318\\251.P I'::bytea, 'key 2', '2019-02-14 08:28:24.267934+00'); + +INSERT INTO "users"("id", "full_name", "short_name", "email", "password_hash", "status", "created_at") VALUES (E'\\363\\311\\033w\\222\\303Ci\\265\\343U\\303\\312\\204",'::bytea, 'Noahson', 'William', '1email1@ukr.net', E'some_readable_hash'::bytea, 1, '2019-02-14 08:28:24.614594+00'); +INSERT INTO "projects"("id", "name", "description", "created_at") VALUES (E'\\363\\342\\363\\371>+F\\256\\263\\300\\273|\\342N\\347\\014'::bytea, 'projName1', 'Test project 1', '2019-02-14 08:28:24.636949+00'); +INSERT INTO "project_members"("member_id", "project_id", "created_at") VALUES (E'\\363\\311\\033w\\222\\303Ci\\265\\343U\\303\\312\\204",'::bytea, E'\\363\\342\\363\\371>+F\\256\\263\\300\\273|\\342N\\347\\014'::bytea, '2019-02-14 08:28:24.677953+00'); + +INSERT INTO "bwagreements"("serialnum", "storage_node_id", "action", "total", "created_at", "expires_at", "uplink_id") VALUES ('8fc0ceaa-984c-4d52-bcf4-b5429e1e35e812FpiifDbcJkePa12jxjDEutKrfLmwzT7sz2jfVwpYqgtM8B74c', E'\\245Z[/\\333\\022\\011\\001\\036\\003\\204\\005\\032.\\206\\333E\\261\\342\\227=y,}aRaH6\\240\\370\\000'::bytea, 1, 666, '2019-02-14 15:09:54.420181+00', '2019-02-14 16:09:54+00', E'\\253Z+\\374eFm\\245$\\036\\206\\335\\247\\263\\350x\\\\\\304+\\364\\343\\364+\\276fIJQ\\361\\014\\232\\000'::bytea); +INSERT INTO "irreparabledbs" ("segmentpath", "segmentdetail", "pieces_lost_count", "seg_damaged_unix_sec", "repair_attempt_count") VALUES ('\x49616d5365676d656e746b6579696e666f30', '\x49616d5365676d656e7464657461696c696e666f30', 10, 1550159554, 10); + +INSERT INTO "injuredsegments" ("path", "data") VALUES ('0', '\x0a0130120100'); +INSERT INTO "injuredsegments" ("path", "data") VALUES ('here''s/a/great/path', '\x0a136865726527732f612f67726561742f70617468120a0102030405060708090a'); +INSERT INTO "injuredsegments" ("path", "data") VALUES ('yet/another/cool/path', '\x0a157965742f616e6f746865722f636f6f6c2f70617468120a0102030405060708090a'); +INSERT INTO "injuredsegments" ("path", "data") VALUES ('so/many/iconic/paths/to/choose/from', '\x0a23736f2f6d616e792f69636f6e69632f70617468732f746f2f63686f6f73652f66726f6d120a0102030405060708090a'); + +INSERT INTO "certrecords" VALUES (E'0Y0\\023\\006\\007*\\206H\\316=\\002\\001\\006\\010*\\206H\\316=\\003\\001\\007\\003B\\000\\004\\360\\267\\227\\377\\253u\\222\\337Y\\324C:GQ\\010\\277v\\010\\315D\\271\\333\\337.\\203\\023=C\\343\\014T%6\\027\\362?\\214\\326\\017U\\334\\000\\260\\224\\260J\\221\\304\\331F\\304\\221\\236zF,\\325\\326l\\215\\306\\365\\200\\022', E'L\\301|\\200\\247}F|1\\320\\232\\037n\\335\\241\\206\\244\\242\\207\\204.\\253\\357\\326\\352\\033Dt\\202`\\022\\325', '2019-02-14 08:07:31.335028+00'); + +INSERT INTO "bucket_usages" ("id", "bucket_id", "rollup_end_time", "remote_stored_data", "inline_stored_data", "remote_segments", "inline_segments", "objects", "metadata_size", "repair_egress", "get_egress", "audit_egress") VALUES 
(E'\\153\\313\\233\\074\\327\\177\\136\\070\\346\\001",'::bytea, E'\\366\\146\\032\\321\\316\\161\\070\\133\\302\\271",'::bytea, '2019-03-06 08:28:24.677953+00', 10, 11, 12, 13, 14, 15, 16, 17, 18); + +INSERT INTO "registration_tokens" ("secret", "owner_id", "project_limit", "created_at") VALUES (E'\\070\\127\\144\\013\\332\\344\\102\\376\\306\\056\\303\\130\\106\\132\\321\\276\\321\\274\\170\\264\\054\\333\\221\\116\\154\\221\\335\\070\\220\\146\\344\\216'::bytea, null, 1, '2019-02-14 08:28:24.677953+00'); + +INSERT INTO "serial_numbers" ("id", "serial_number", "bucket_id", "expires_at") VALUES (1, E'0123456701234567'::bytea, E'\\363\\342\\363\\371>+F\\256\\263\\300\\273|\\342N\\347\\014/testbucket'::bytea, '2019-03-06 08:28:24.677953+00'); +INSERT INTO "used_serials" ("serial_number_id", "storage_node_id") VALUES (1, E'\\006\\223\\250R\\221\\005\\365\\377v>0\\266\\365\\216\\255?\\347\\244\\371?2\\264\\262\\230\\007<\\001\\262\\263\\237\\247n'); + +INSERT INTO "storagenode_bandwidth_rollups" ("storagenode_id", "interval_start", "interval_seconds", "action", "allocated", "settled") VALUES (E'\\006\\223\\250R\\221\\005\\365\\377v>0\\266\\365\\216\\255?\\347\\244\\371?2\\264\\262\\230\\007<\\001\\262\\263\\237\\247n', '2019-03-06 08:00:00.000000+00', 3600, 1, 1024, 2024); +INSERT INTO "storagenode_storage_tallies" VALUES (1, E'\\3510\\323\\225"~\\036<\\342\\330m\\0253Jhr\\246\\233K\\246#\\2303\\351\\256\\275j\\212UM\\362\\207', '2019-02-14 08:16:57.812849+00', 1000); + +INSERT INTO "bucket_bandwidth_rollups" ("bucket_name", "project_id", "interval_start", "interval_seconds", "action", "inline", "allocated", "settled") VALUES (E'testbucket'::bytea, E'\\363\\342\\363\\371>+F\\256\\263\\300\\273|\\342N\\347\\014'::bytea,'2019-03-06 08:00:00.000000+00', 3600, 1, 1024, 2024, 3024); +INSERT INTO "bucket_storage_tallies" ("bucket_name", "project_id", "interval_start", "inline", "remote", "remote_segments_count", "inline_segments_count", "object_count", "metadata_size") VALUES (E'testbucket'::bytea, E'\\363\\342\\363\\371>+F\\256\\263\\300\\273|\\342N\\347\\014'::bytea,'2019-03-06 08:00:00.000000+00', 4024, 5024, 0, 0, 0, 0); + +-- NEW DATA --
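
A minimal sketch of how a rollup pass might drive the renamed StoragenodeAccounting
methods end to end. The function name rollupOnce and the elided aggregation step are
illustrative assumptions; only the method names and signatures come from this change.

package main

import (
	"context"
	"time"

	"storj.io/storj/pkg/accounting"
)

// rollupOnce aggregates tallies and bandwidth recorded since the last rollup,
// persists the aggregates, and prunes tallies that are no longer needed.
func rollupOnce(ctx context.Context, db accounting.StoragenodeAccounting) error {
	// Resume from the previous rollup watermark (zero time on the first run).
	lastRollup, err := db.LastTimestamp(ctx, accounting.LastRollup)
	if err != nil {
		return err
	}
	tallies, err := db.GetTalliesSince(ctx, lastRollup)
	if err != nil {
		return err
	}
	bandwidth, err := db.GetBandwidthSince(ctx, lastRollup)
	if err != nil {
		return err
	}
	stats := make(accounting.RollupStats)
	// ... fold tallies and bandwidth into per-day, per-node *accounting.Rollup
	// entries here (that aggregation is unchanged by this refactor, so elided) ...
	_, _ = tallies, bandwidth
	if len(stats) == 0 {
		return nil // SaveRollup rejects empty stats
	}
	latestTally := time.Now().UTC()
	// SaveRollup also advances the LastRollup watermark in the same transaction.
	if err := db.SaveRollup(ctx, latestTally, stats); err != nil {
		return err
	}
	// Tallies before the new watermark have been rolled up and can be dropped.
	return db.DeleteTalliesBefore(ctx, latestTally)
}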