satellite: try to stream rollups to aggregation function to use less memory

This change tries hard to avoid ever having all of the storage node
rollups in memory at the same time; rollups are only brought together
at the point where they are actually summed.

Change-Id: If67f49e7d71106798d996a6850b3e48671bd9e18
JT Olio 2020-11-29 09:13:06 -07:00
parent 6aae21541f
commit 6bce907cb0
9 changed files with 323 additions and 172 deletions
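
The commit's core idea is the interface change in the diffs below: instead of materializing every StoragenodeBandwidthRollup in one slice, GetBandwidthSince now streams rows to a caller-supplied callback. The following is a minimal, self-contained sketch of that pattern; Rollup, fetchPage, and streamRollups are hypothetical stand-ins for illustration, not the satellite's real types or methods.

```go
package main

import (
	"context"
	"fmt"
)

// Rollup is a hypothetical stand-in for accounting.StoragenodeBandwidthRollup.
type Rollup struct {
	NodeID  string
	Settled int64
}

// fetchPage fakes a paged database read for the sketch: two small pages.
func fetchPage(page int) (rows []Rollup, more bool) {
	if page > 1 {
		return nil, false
	}
	return []Rollup{
		{NodeID: "node-a", Settled: 100},
		{NodeID: "node-b", Settled: 200},
	}, page == 0
}

// streamRollups pages through storage and hands each row to cb, so at most
// one page of rows is resident in memory at a time.
func streamRollups(ctx context.Context, cb func(context.Context, *Rollup) error) error {
	for page := 0; ; page++ {
		rows, more := fetchPage(page)
		for i := range rows {
			if err := cb(ctx, &rows[i]); err != nil {
				return err
			}
		}
		if !more {
			return nil
		}
	}
}

func main() {
	var totalSettled int64
	err := streamRollups(context.Background(), func(_ context.Context, r *Rollup) error {
		totalSettled += r.Settled // only the running sum survives each page
		return nil
	})
	fmt.Println(totalSettled, err)
}
```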

View File

@ -328,6 +328,7 @@ func cmdRun(cmd *cobra.Command, args []string) (err error) {
db, err := satellitedb.Open(ctx, log.Named("db"), runCfg.Database, satellitedb.Options{
ReportedRollupsReadBatchSize: runCfg.Orders.SettlementBatchSize,
SaveRollupBatchSize: runCfg.Tally.SaveRollupBatchSize,
ReadRollupBatchSize: runCfg.Tally.ReadRollupBatchSize,
})
if err != nil {
return errs.New("Error starting master database on satellite: %+v", err)

View File

@ -149,7 +149,7 @@ type StoragenodeAccounting interface {
// GetTalliesSince retrieves all tallies since latestRollup
GetTalliesSince(ctx context.Context, latestRollup time.Time) ([]*StoragenodeStorageTally, error)
// GetBandwidthSince retrieves all bandwidth rollup entries since latestRollup
GetBandwidthSince(ctx context.Context, latestRollup time.Time) ([]*StoragenodeBandwidthRollup, error)
GetBandwidthSince(ctx context.Context, latestRollup time.Time, cb func(context.Context, *StoragenodeBandwidthRollup) error) error
// SaveRollup records tally and bandwidth rollup aggregations to the database
SaveRollup(ctx context.Context, latestTally time.Time, stats RollupStats) error
// LastTimestamp records and returns the latest last tallied time.

View File

@ -148,15 +148,7 @@ func (r *Service) RollupStorage(ctx context.Context, lastRollup time.Time, rollu
func (r *Service) RollupBW(ctx context.Context, lastRollup time.Time, rollupStats accounting.RollupStats) (err error) {
defer mon.Task()(&ctx)(&err)
var latestTally time.Time
bws, err := r.sdb.GetBandwidthSince(ctx, lastRollup.UTC())
if err != nil {
return Error.Wrap(err)
}
if len(bws) == 0 {
r.logger.Info("Rollup found no new bw rollups")
return nil
}
for _, row := range bws {
err = r.sdb.GetBandwidthSince(ctx, lastRollup.UTC(), func(ctx context.Context, row *accounting.StoragenodeBandwidthRollup) error {
nodeID := row.NodeID
// interval is the time the bw order was saved
interval := row.IntervalStart.UTC()
@ -186,7 +178,12 @@ func (r *Service) RollupBW(ctx context.Context, lastRollup time.Time, rollupStat
default:
r.logger.Info("delete order type")
}
}
return nil
})
if err != nil {
return Error.Wrap(err)
}
// TODO: we don't do anything with latestTally after figuring it out. should we?
return nil
}
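
With the callback form, RollupBW only keeps its running aggregates in memory while rows stream past. The sketch below shows that kind of accumulation with hypothetical row and nodeTotals types rather than the real accounting package; the nested map roughly mirrors the interval -> node -> totals shape the rollup service uses.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// row and nodeTotals are hypothetical stand-ins for the satellite's
// accounting.StoragenodeBandwidthRollup and per-node rollup types.
type row struct {
	NodeID        string
	IntervalStart time.Time
	Settled       int64
}

type nodeTotals struct{ Settled int64 }

// rollupStats: memory grows with distinct (interval, node) pairs, not rows.
type rollupStats map[time.Time]map[string]*nodeTotals

func accumulate(stats rollupStats) func(context.Context, *row) error {
	return func(_ context.Context, r *row) error {
		interval := r.IntervalStart.Truncate(time.Hour)
		if stats[interval] == nil {
			stats[interval] = map[string]*nodeTotals{}
		}
		if stats[interval][r.NodeID] == nil {
			stats[interval][r.NodeID] = &nodeTotals{}
		}
		stats[interval][r.NodeID].Settled += r.Settled
		return nil
	}
}

func main() {
	stats := rollupStats{}
	cb := accumulate(stats)
	now := time.Now()
	rows := []row{{"a", now, 10}, {"a", now, 5}, {"b", now, 7}}
	for i := range rows {
		_ = cb(context.Background(), &rows[i])
	}
	fmt.Println("intervals aggregated:", len(stats))
}
```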

View File

@ -29,6 +29,7 @@ var (
type Config struct {
Interval time.Duration `help:"how frequently the tally service should run" releaseDefault:"1h" devDefault:"30s"`
SaveRollupBatchSize int `help:"how large of batches SaveRollup should process at a time" default:"1000"`
ReadRollupBatchSize int `help:"how large of batches GetBandwidthSince should process at a time" default:"10000"`
}
// Service is the tally service for data stored on each storage node

View File

@ -74,8 +74,9 @@ type Options struct {
// billable bandwidth from the reported serials table.
ReportedRollupsReadBatchSize int
// How many storage node rollups to save in one batch.
// How many storage node rollups to save/read in one batch.
SaveRollupBatchSize int
ReadRollupBatchSize int
}
var _ dbx.DBMethods = &satelliteDB{}

View File

@ -657,17 +657,18 @@ model storagenode_bandwidth_rollup (
create storagenode_bandwidth_rollup()
read all (
select storagenode_bandwidth_rollup
where storagenode_bandwidth_rollup.interval_start >= ?
)
read all (
select storagenode_bandwidth_rollup
where storagenode_bandwidth_rollup.storagenode_id = ?
where storagenode_bandwidth_rollup.interval_start = ?
)
read paged (
select storagenode_bandwidth_rollup
where storagenode_bandwidth_rollup.storagenode_id = ?
where storagenode_bandwidth_rollup.interval_start >= ?
)
///////////////////////////////////////
// orders phase2->phase3 rollout table
///////////////////////////////////////
@ -688,9 +689,10 @@ model storagenode_bandwidth_rollup_phase2 (
create storagenode_bandwidth_rollup_phase2 ( )
read all (
select storagenode_bandwidth_rollup_phase2
where storagenode_bandwidth_rollup_phase2.interval_start >= ?
read paged (
select storagenode_bandwidth_rollup_phase2
where storagenode_bandwidth_rollup_phase2.storagenode_id = ?
where storagenode_bandwidth_rollup_phase2.interval_start >= ?
)
model storagenode_storage_tally (
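
Switching these reads from read all to read paged makes dbx emit Paged_... methods that return one page of rows plus a continuation token (the generated code appears in the next file). Below is a simplified sketch of the consumption loop, using stand-in names instead of the real generated signatures.

```go
package sketch

// continuation is a stand-in for the generated Paged_..._Continuation structs.
type continuation struct{ set bool }

// pagedRead is a stand-in for a generated Paged_StoragenodeBandwidthRollup_...
// method: it returns at most limit rows plus a cursor for the next page.
func pagedRead(limit int, start *continuation) (rows []int, next *continuation, err error) {
	return nil, &continuation{set: true}, nil
}

// readAll walks every page; a page shorter than limit signals the end, so at
// most one page of rows is ever held in memory.
func readAll(limit int, handle func(int) error) error {
	var cursor *continuation
	for {
		rows, next, err := pagedRead(limit, cursor)
		if err != nil {
			return err
		}
		cursor = next
		for _, r := range rows {
			if err := handle(r); err != nil {
				return err
			}
		}
		if len(rows) < limit {
			return nil
		}
	}
}
```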

View File

@ -8936,6 +8936,20 @@ type Paged_PendingSerialQueue_Continuation struct {
_set bool
}
type Paged_StoragenodeBandwidthRollupPhase2_By_StoragenodeId_And_IntervalStart_GreaterOrEqual_Continuation struct {
_value_storagenode_id []byte
_value_interval_start time.Time
_value_action uint
_set bool
}
type Paged_StoragenodeBandwidthRollup_By_StoragenodeId_And_IntervalStart_GreaterOrEqual_Continuation struct {
_value_storagenode_id []byte
_value_interval_start time.Time
_value_action uint
_set bool
}
type ProjectLimit_Row struct {
ProjectLimit int
}
@ -11713,40 +11727,6 @@ func (obj *pgxImpl) All_BucketStorageTally_By_ProjectId_And_BucketName_And_Inter
}
func (obj *pgxImpl) All_StoragenodeBandwidthRollup_By_IntervalStart_GreaterOrEqual(ctx context.Context,
storagenode_bandwidth_rollup_interval_start_greater_or_equal StoragenodeBandwidthRollup_IntervalStart_Field) (
rows []*StoragenodeBandwidthRollup, err error) {
defer mon.Task()(&ctx)(&err)
var __embed_stmt = __sqlbundle_Literal("SELECT storagenode_bandwidth_rollups.storagenode_id, storagenode_bandwidth_rollups.interval_start, storagenode_bandwidth_rollups.interval_seconds, storagenode_bandwidth_rollups.action, storagenode_bandwidth_rollups.allocated, storagenode_bandwidth_rollups.settled FROM storagenode_bandwidth_rollups WHERE storagenode_bandwidth_rollups.interval_start >= ?")
var __values []interface{}
__values = append(__values, storagenode_bandwidth_rollup_interval_start_greater_or_equal.value())
var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt)
obj.logStmt(__stmt, __values...)
__rows, err := obj.driver.QueryContext(ctx, __stmt, __values...)
if err != nil {
return nil, obj.makeErr(err)
}
defer __rows.Close()
for __rows.Next() {
storagenode_bandwidth_rollup := &StoragenodeBandwidthRollup{}
err = __rows.Scan(&storagenode_bandwidth_rollup.StoragenodeId, &storagenode_bandwidth_rollup.IntervalStart, &storagenode_bandwidth_rollup.IntervalSeconds, &storagenode_bandwidth_rollup.Action, &storagenode_bandwidth_rollup.Allocated, &storagenode_bandwidth_rollup.Settled)
if err != nil {
return nil, obj.makeErr(err)
}
rows = append(rows, storagenode_bandwidth_rollup)
}
if err := __rows.Err(); err != nil {
return nil, obj.makeErr(err)
}
return rows, nil
}
func (obj *pgxImpl) All_StoragenodeBandwidthRollup_By_StoragenodeId_And_IntervalStart(ctx context.Context,
storagenode_bandwidth_rollup_storagenode_id StoragenodeBandwidthRollup_StoragenodeId_Field,
storagenode_bandwidth_rollup_interval_start StoragenodeBandwidthRollup_IntervalStart_Field) (
@ -11782,37 +11762,103 @@ func (obj *pgxImpl) All_StoragenodeBandwidthRollup_By_StoragenodeId_And_Interval
}
func (obj *pgxImpl) All_StoragenodeBandwidthRollupPhase2_By_IntervalStart_GreaterOrEqual(ctx context.Context,
storagenode_bandwidth_rollup_phase2_interval_start_greater_or_equal StoragenodeBandwidthRollupPhase2_IntervalStart_Field) (
rows []*StoragenodeBandwidthRollupPhase2, err error) {
func (obj *pgxImpl) Paged_StoragenodeBandwidthRollup_By_StoragenodeId_And_IntervalStart_GreaterOrEqual(ctx context.Context,
storagenode_bandwidth_rollup_storagenode_id StoragenodeBandwidthRollup_StoragenodeId_Field,
storagenode_bandwidth_rollup_interval_start_greater_or_equal StoragenodeBandwidthRollup_IntervalStart_Field,
limit int, start *Paged_StoragenodeBandwidthRollup_By_StoragenodeId_And_IntervalStart_GreaterOrEqual_Continuation) (
rows []*StoragenodeBandwidthRollup, next *Paged_StoragenodeBandwidthRollup_By_StoragenodeId_And_IntervalStart_GreaterOrEqual_Continuation, err error) {
defer mon.Task()(&ctx)(&err)
var __embed_stmt = __sqlbundle_Literal("SELECT storagenode_bandwidth_rollups_phase2.storagenode_id, storagenode_bandwidth_rollups_phase2.interval_start, storagenode_bandwidth_rollups_phase2.interval_seconds, storagenode_bandwidth_rollups_phase2.action, storagenode_bandwidth_rollups_phase2.allocated, storagenode_bandwidth_rollups_phase2.settled FROM storagenode_bandwidth_rollups_phase2 WHERE storagenode_bandwidth_rollups_phase2.interval_start >= ?")
var __embed_stmt = __sqlbundle_Literal("SELECT storagenode_bandwidth_rollups.storagenode_id, storagenode_bandwidth_rollups.interval_start, storagenode_bandwidth_rollups.interval_seconds, storagenode_bandwidth_rollups.action, storagenode_bandwidth_rollups.allocated, storagenode_bandwidth_rollups.settled, storagenode_bandwidth_rollups.storagenode_id, storagenode_bandwidth_rollups.interval_start, storagenode_bandwidth_rollups.action FROM storagenode_bandwidth_rollups WHERE storagenode_bandwidth_rollups.storagenode_id = ? AND storagenode_bandwidth_rollups.interval_start >= ? AND (storagenode_bandwidth_rollups.storagenode_id, storagenode_bandwidth_rollups.interval_start, storagenode_bandwidth_rollups.action) > (?, ?, ?) ORDER BY storagenode_bandwidth_rollups.storagenode_id, storagenode_bandwidth_rollups.interval_start, storagenode_bandwidth_rollups.action LIMIT ?")
var __embed_first_stmt = __sqlbundle_Literal("SELECT storagenode_bandwidth_rollups.storagenode_id, storagenode_bandwidth_rollups.interval_start, storagenode_bandwidth_rollups.interval_seconds, storagenode_bandwidth_rollups.action, storagenode_bandwidth_rollups.allocated, storagenode_bandwidth_rollups.settled, storagenode_bandwidth_rollups.storagenode_id, storagenode_bandwidth_rollups.interval_start, storagenode_bandwidth_rollups.action FROM storagenode_bandwidth_rollups WHERE storagenode_bandwidth_rollups.storagenode_id = ? AND storagenode_bandwidth_rollups.interval_start >= ? ORDER BY storagenode_bandwidth_rollups.storagenode_id, storagenode_bandwidth_rollups.interval_start, storagenode_bandwidth_rollups.action LIMIT ?")
var __values []interface{}
__values = append(__values, storagenode_bandwidth_rollup_phase2_interval_start_greater_or_equal.value())
__values = append(__values, storagenode_bandwidth_rollup_storagenode_id.value(), storagenode_bandwidth_rollup_interval_start_greater_or_equal.value())
var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt)
var __stmt string
if start != nil && start._set {
__values = append(__values, start._value_storagenode_id, start._value_interval_start, start._value_action, limit)
__stmt = __sqlbundle_Render(obj.dialect, __embed_stmt)
} else {
__values = append(__values, limit)
__stmt = __sqlbundle_Render(obj.dialect, __embed_first_stmt)
}
obj.logStmt(__stmt, __values...)
__rows, err := obj.driver.QueryContext(ctx, __stmt, __values...)
if err != nil {
return nil, obj.makeErr(err)
return nil, nil, obj.makeErr(err)
}
defer __rows.Close()
var __continuation Paged_StoragenodeBandwidthRollup_By_StoragenodeId_And_IntervalStart_GreaterOrEqual_Continuation
__continuation._set = true
for __rows.Next() {
storagenode_bandwidth_rollup_phase2 := &StoragenodeBandwidthRollupPhase2{}
err = __rows.Scan(&storagenode_bandwidth_rollup_phase2.StoragenodeId, &storagenode_bandwidth_rollup_phase2.IntervalStart, &storagenode_bandwidth_rollup_phase2.IntervalSeconds, &storagenode_bandwidth_rollup_phase2.Action, &storagenode_bandwidth_rollup_phase2.Allocated, &storagenode_bandwidth_rollup_phase2.Settled)
storagenode_bandwidth_rollup := &StoragenodeBandwidthRollup{}
err = __rows.Scan(&storagenode_bandwidth_rollup.StoragenodeId, &storagenode_bandwidth_rollup.IntervalStart, &storagenode_bandwidth_rollup.IntervalSeconds, &storagenode_bandwidth_rollup.Action, &storagenode_bandwidth_rollup.Allocated, &storagenode_bandwidth_rollup.Settled, &__continuation._value_storagenode_id, &__continuation._value_interval_start, &__continuation._value_action)
if err != nil {
return nil, obj.makeErr(err)
return nil, nil, obj.makeErr(err)
}
rows = append(rows, storagenode_bandwidth_rollup_phase2)
rows = append(rows, storagenode_bandwidth_rollup)
next = &__continuation
}
if err := __rows.Err(); err != nil {
return nil, obj.makeErr(err)
return nil, nil, obj.makeErr(err)
}
return rows, nil
return rows, next, nil
}
func (obj *pgxImpl) Paged_StoragenodeBandwidthRollupPhase2_By_StoragenodeId_And_IntervalStart_GreaterOrEqual(ctx context.Context,
storagenode_bandwidth_rollup_phase2_storagenode_id StoragenodeBandwidthRollupPhase2_StoragenodeId_Field,
storagenode_bandwidth_rollup_phase2_interval_start_greater_or_equal StoragenodeBandwidthRollupPhase2_IntervalStart_Field,
limit int, start *Paged_StoragenodeBandwidthRollupPhase2_By_StoragenodeId_And_IntervalStart_GreaterOrEqual_Continuation) (
rows []*StoragenodeBandwidthRollupPhase2, next *Paged_StoragenodeBandwidthRollupPhase2_By_StoragenodeId_And_IntervalStart_GreaterOrEqual_Continuation, err error) {
defer mon.Task()(&ctx)(&err)
var __embed_stmt = __sqlbundle_Literal("SELECT storagenode_bandwidth_rollups_phase2.storagenode_id, storagenode_bandwidth_rollups_phase2.interval_start, storagenode_bandwidth_rollups_phase2.interval_seconds, storagenode_bandwidth_rollups_phase2.action, storagenode_bandwidth_rollups_phase2.allocated, storagenode_bandwidth_rollups_phase2.settled, storagenode_bandwidth_rollups_phase2.storagenode_id, storagenode_bandwidth_rollups_phase2.interval_start, storagenode_bandwidth_rollups_phase2.action FROM storagenode_bandwidth_rollups_phase2 WHERE storagenode_bandwidth_rollups_phase2.storagenode_id = ? AND storagenode_bandwidth_rollups_phase2.interval_start >= ? AND (storagenode_bandwidth_rollups_phase2.storagenode_id, storagenode_bandwidth_rollups_phase2.interval_start, storagenode_bandwidth_rollups_phase2.action) > (?, ?, ?) ORDER BY storagenode_bandwidth_rollups_phase2.storagenode_id, storagenode_bandwidth_rollups_phase2.interval_start, storagenode_bandwidth_rollups_phase2.action LIMIT ?")
var __embed_first_stmt = __sqlbundle_Literal("SELECT storagenode_bandwidth_rollups_phase2.storagenode_id, storagenode_bandwidth_rollups_phase2.interval_start, storagenode_bandwidth_rollups_phase2.interval_seconds, storagenode_bandwidth_rollups_phase2.action, storagenode_bandwidth_rollups_phase2.allocated, storagenode_bandwidth_rollups_phase2.settled, storagenode_bandwidth_rollups_phase2.storagenode_id, storagenode_bandwidth_rollups_phase2.interval_start, storagenode_bandwidth_rollups_phase2.action FROM storagenode_bandwidth_rollups_phase2 WHERE storagenode_bandwidth_rollups_phase2.storagenode_id = ? AND storagenode_bandwidth_rollups_phase2.interval_start >= ? ORDER BY storagenode_bandwidth_rollups_phase2.storagenode_id, storagenode_bandwidth_rollups_phase2.interval_start, storagenode_bandwidth_rollups_phase2.action LIMIT ?")
var __values []interface{}
__values = append(__values, storagenode_bandwidth_rollup_phase2_storagenode_id.value(), storagenode_bandwidth_rollup_phase2_interval_start_greater_or_equal.value())
var __stmt string
if start != nil && start._set {
__values = append(__values, start._value_storagenode_id, start._value_interval_start, start._value_action, limit)
__stmt = __sqlbundle_Render(obj.dialect, __embed_stmt)
} else {
__values = append(__values, limit)
__stmt = __sqlbundle_Render(obj.dialect, __embed_first_stmt)
}
obj.logStmt(__stmt, __values...)
__rows, err := obj.driver.QueryContext(ctx, __stmt, __values...)
if err != nil {
return nil, nil, obj.makeErr(err)
}
defer __rows.Close()
var __continuation Paged_StoragenodeBandwidthRollupPhase2_By_StoragenodeId_And_IntervalStart_GreaterOrEqual_Continuation
__continuation._set = true
for __rows.Next() {
storagenode_bandwidth_rollup_phase2 := &StoragenodeBandwidthRollupPhase2{}
err = __rows.Scan(&storagenode_bandwidth_rollup_phase2.StoragenodeId, &storagenode_bandwidth_rollup_phase2.IntervalStart, &storagenode_bandwidth_rollup_phase2.IntervalSeconds, &storagenode_bandwidth_rollup_phase2.Action, &storagenode_bandwidth_rollup_phase2.Allocated, &storagenode_bandwidth_rollup_phase2.Settled, &__continuation._value_storagenode_id, &__continuation._value_interval_start, &__continuation._value_action)
if err != nil {
return nil, nil, obj.makeErr(err)
}
rows = append(rows, storagenode_bandwidth_rollup_phase2)
next = &__continuation
}
if err := __rows.Err(); err != nil {
return nil, nil, obj.makeErr(err)
}
return rows, next, nil
}
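
The generated paged queries use keyset pagination rather than OFFSET: both statements order by (storagenode_id, interval_start, action), and the continuation carries the last key seen so the next page resumes with the row-value comparison (storagenode_id, interval_start, action) > (?, ?, ?). A small Go analogue of that lexicographic comparison, with a hypothetical key type, purely for illustration:

```go
package sketch

import (
	"bytes"
	"time"
)

// rollupKey is a hypothetical mirror of the paged query's ordering columns.
type rollupKey struct {
	NodeID        []byte
	IntervalStart time.Time
	Action        uint
}

// after reports whether k sorts strictly after last, mirroring the SQL
// (storagenode_id, interval_start, action) > (?, ?, ?) tuple comparison.
func after(k, last rollupKey) bool {
	if c := bytes.Compare(k.NodeID, last.NodeID); c != 0 {
		return c > 0
	}
	if !k.IntervalStart.Equal(last.IntervalStart) {
		return k.IntervalStart.After(last.IntervalStart)
	}
	return k.Action > last.Action
}
```
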
@ -17993,40 +18039,6 @@ func (obj *pgxcockroachImpl) All_BucketStorageTally_By_ProjectId_And_BucketName_
}
func (obj *pgxcockroachImpl) All_StoragenodeBandwidthRollup_By_IntervalStart_GreaterOrEqual(ctx context.Context,
storagenode_bandwidth_rollup_interval_start_greater_or_equal StoragenodeBandwidthRollup_IntervalStart_Field) (
rows []*StoragenodeBandwidthRollup, err error) {
defer mon.Task()(&ctx)(&err)
var __embed_stmt = __sqlbundle_Literal("SELECT storagenode_bandwidth_rollups.storagenode_id, storagenode_bandwidth_rollups.interval_start, storagenode_bandwidth_rollups.interval_seconds, storagenode_bandwidth_rollups.action, storagenode_bandwidth_rollups.allocated, storagenode_bandwidth_rollups.settled FROM storagenode_bandwidth_rollups WHERE storagenode_bandwidth_rollups.interval_start >= ?")
var __values []interface{}
__values = append(__values, storagenode_bandwidth_rollup_interval_start_greater_or_equal.value())
var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt)
obj.logStmt(__stmt, __values...)
__rows, err := obj.driver.QueryContext(ctx, __stmt, __values...)
if err != nil {
return nil, obj.makeErr(err)
}
defer __rows.Close()
for __rows.Next() {
storagenode_bandwidth_rollup := &StoragenodeBandwidthRollup{}
err = __rows.Scan(&storagenode_bandwidth_rollup.StoragenodeId, &storagenode_bandwidth_rollup.IntervalStart, &storagenode_bandwidth_rollup.IntervalSeconds, &storagenode_bandwidth_rollup.Action, &storagenode_bandwidth_rollup.Allocated, &storagenode_bandwidth_rollup.Settled)
if err != nil {
return nil, obj.makeErr(err)
}
rows = append(rows, storagenode_bandwidth_rollup)
}
if err := __rows.Err(); err != nil {
return nil, obj.makeErr(err)
}
return rows, nil
}
func (obj *pgxcockroachImpl) All_StoragenodeBandwidthRollup_By_StoragenodeId_And_IntervalStart(ctx context.Context,
storagenode_bandwidth_rollup_storagenode_id StoragenodeBandwidthRollup_StoragenodeId_Field,
storagenode_bandwidth_rollup_interval_start StoragenodeBandwidthRollup_IntervalStart_Field) (
@ -18062,37 +18074,103 @@ func (obj *pgxcockroachImpl) All_StoragenodeBandwidthRollup_By_StoragenodeId_And
}
func (obj *pgxcockroachImpl) All_StoragenodeBandwidthRollupPhase2_By_IntervalStart_GreaterOrEqual(ctx context.Context,
storagenode_bandwidth_rollup_phase2_interval_start_greater_or_equal StoragenodeBandwidthRollupPhase2_IntervalStart_Field) (
rows []*StoragenodeBandwidthRollupPhase2, err error) {
func (obj *pgxcockroachImpl) Paged_StoragenodeBandwidthRollup_By_StoragenodeId_And_IntervalStart_GreaterOrEqual(ctx context.Context,
storagenode_bandwidth_rollup_storagenode_id StoragenodeBandwidthRollup_StoragenodeId_Field,
storagenode_bandwidth_rollup_interval_start_greater_or_equal StoragenodeBandwidthRollup_IntervalStart_Field,
limit int, start *Paged_StoragenodeBandwidthRollup_By_StoragenodeId_And_IntervalStart_GreaterOrEqual_Continuation) (
rows []*StoragenodeBandwidthRollup, next *Paged_StoragenodeBandwidthRollup_By_StoragenodeId_And_IntervalStart_GreaterOrEqual_Continuation, err error) {
defer mon.Task()(&ctx)(&err)
var __embed_stmt = __sqlbundle_Literal("SELECT storagenode_bandwidth_rollups_phase2.storagenode_id, storagenode_bandwidth_rollups_phase2.interval_start, storagenode_bandwidth_rollups_phase2.interval_seconds, storagenode_bandwidth_rollups_phase2.action, storagenode_bandwidth_rollups_phase2.allocated, storagenode_bandwidth_rollups_phase2.settled FROM storagenode_bandwidth_rollups_phase2 WHERE storagenode_bandwidth_rollups_phase2.interval_start >= ?")
var __embed_stmt = __sqlbundle_Literal("SELECT storagenode_bandwidth_rollups.storagenode_id, storagenode_bandwidth_rollups.interval_start, storagenode_bandwidth_rollups.interval_seconds, storagenode_bandwidth_rollups.action, storagenode_bandwidth_rollups.allocated, storagenode_bandwidth_rollups.settled, storagenode_bandwidth_rollups.storagenode_id, storagenode_bandwidth_rollups.interval_start, storagenode_bandwidth_rollups.action FROM storagenode_bandwidth_rollups WHERE storagenode_bandwidth_rollups.storagenode_id = ? AND storagenode_bandwidth_rollups.interval_start >= ? AND (storagenode_bandwidth_rollups.storagenode_id, storagenode_bandwidth_rollups.interval_start, storagenode_bandwidth_rollups.action) > (?, ?, ?) ORDER BY storagenode_bandwidth_rollups.storagenode_id, storagenode_bandwidth_rollups.interval_start, storagenode_bandwidth_rollups.action LIMIT ?")
var __embed_first_stmt = __sqlbundle_Literal("SELECT storagenode_bandwidth_rollups.storagenode_id, storagenode_bandwidth_rollups.interval_start, storagenode_bandwidth_rollups.interval_seconds, storagenode_bandwidth_rollups.action, storagenode_bandwidth_rollups.allocated, storagenode_bandwidth_rollups.settled, storagenode_bandwidth_rollups.storagenode_id, storagenode_bandwidth_rollups.interval_start, storagenode_bandwidth_rollups.action FROM storagenode_bandwidth_rollups WHERE storagenode_bandwidth_rollups.storagenode_id = ? AND storagenode_bandwidth_rollups.interval_start >= ? ORDER BY storagenode_bandwidth_rollups.storagenode_id, storagenode_bandwidth_rollups.interval_start, storagenode_bandwidth_rollups.action LIMIT ?")
var __values []interface{}
__values = append(__values, storagenode_bandwidth_rollup_phase2_interval_start_greater_or_equal.value())
__values = append(__values, storagenode_bandwidth_rollup_storagenode_id.value(), storagenode_bandwidth_rollup_interval_start_greater_or_equal.value())
var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt)
var __stmt string
if start != nil && start._set {
__values = append(__values, start._value_storagenode_id, start._value_interval_start, start._value_action, limit)
__stmt = __sqlbundle_Render(obj.dialect, __embed_stmt)
} else {
__values = append(__values, limit)
__stmt = __sqlbundle_Render(obj.dialect, __embed_first_stmt)
}
obj.logStmt(__stmt, __values...)
__rows, err := obj.driver.QueryContext(ctx, __stmt, __values...)
if err != nil {
return nil, obj.makeErr(err)
return nil, nil, obj.makeErr(err)
}
defer __rows.Close()
var __continuation Paged_StoragenodeBandwidthRollup_By_StoragenodeId_And_IntervalStart_GreaterOrEqual_Continuation
__continuation._set = true
for __rows.Next() {
storagenode_bandwidth_rollup_phase2 := &StoragenodeBandwidthRollupPhase2{}
err = __rows.Scan(&storagenode_bandwidth_rollup_phase2.StoragenodeId, &storagenode_bandwidth_rollup_phase2.IntervalStart, &storagenode_bandwidth_rollup_phase2.IntervalSeconds, &storagenode_bandwidth_rollup_phase2.Action, &storagenode_bandwidth_rollup_phase2.Allocated, &storagenode_bandwidth_rollup_phase2.Settled)
storagenode_bandwidth_rollup := &StoragenodeBandwidthRollup{}
err = __rows.Scan(&storagenode_bandwidth_rollup.StoragenodeId, &storagenode_bandwidth_rollup.IntervalStart, &storagenode_bandwidth_rollup.IntervalSeconds, &storagenode_bandwidth_rollup.Action, &storagenode_bandwidth_rollup.Allocated, &storagenode_bandwidth_rollup.Settled, &__continuation._value_storagenode_id, &__continuation._value_interval_start, &__continuation._value_action)
if err != nil {
return nil, obj.makeErr(err)
return nil, nil, obj.makeErr(err)
}
rows = append(rows, storagenode_bandwidth_rollup_phase2)
rows = append(rows, storagenode_bandwidth_rollup)
next = &__continuation
}
if err := __rows.Err(); err != nil {
return nil, obj.makeErr(err)
return nil, nil, obj.makeErr(err)
}
return rows, nil
return rows, next, nil
}
func (obj *pgxcockroachImpl) Paged_StoragenodeBandwidthRollupPhase2_By_StoragenodeId_And_IntervalStart_GreaterOrEqual(ctx context.Context,
storagenode_bandwidth_rollup_phase2_storagenode_id StoragenodeBandwidthRollupPhase2_StoragenodeId_Field,
storagenode_bandwidth_rollup_phase2_interval_start_greater_or_equal StoragenodeBandwidthRollupPhase2_IntervalStart_Field,
limit int, start *Paged_StoragenodeBandwidthRollupPhase2_By_StoragenodeId_And_IntervalStart_GreaterOrEqual_Continuation) (
rows []*StoragenodeBandwidthRollupPhase2, next *Paged_StoragenodeBandwidthRollupPhase2_By_StoragenodeId_And_IntervalStart_GreaterOrEqual_Continuation, err error) {
defer mon.Task()(&ctx)(&err)
var __embed_stmt = __sqlbundle_Literal("SELECT storagenode_bandwidth_rollups_phase2.storagenode_id, storagenode_bandwidth_rollups_phase2.interval_start, storagenode_bandwidth_rollups_phase2.interval_seconds, storagenode_bandwidth_rollups_phase2.action, storagenode_bandwidth_rollups_phase2.allocated, storagenode_bandwidth_rollups_phase2.settled, storagenode_bandwidth_rollups_phase2.storagenode_id, storagenode_bandwidth_rollups_phase2.interval_start, storagenode_bandwidth_rollups_phase2.action FROM storagenode_bandwidth_rollups_phase2 WHERE storagenode_bandwidth_rollups_phase2.storagenode_id = ? AND storagenode_bandwidth_rollups_phase2.interval_start >= ? AND (storagenode_bandwidth_rollups_phase2.storagenode_id, storagenode_bandwidth_rollups_phase2.interval_start, storagenode_bandwidth_rollups_phase2.action) > (?, ?, ?) ORDER BY storagenode_bandwidth_rollups_phase2.storagenode_id, storagenode_bandwidth_rollups_phase2.interval_start, storagenode_bandwidth_rollups_phase2.action LIMIT ?")
var __embed_first_stmt = __sqlbundle_Literal("SELECT storagenode_bandwidth_rollups_phase2.storagenode_id, storagenode_bandwidth_rollups_phase2.interval_start, storagenode_bandwidth_rollups_phase2.interval_seconds, storagenode_bandwidth_rollups_phase2.action, storagenode_bandwidth_rollups_phase2.allocated, storagenode_bandwidth_rollups_phase2.settled, storagenode_bandwidth_rollups_phase2.storagenode_id, storagenode_bandwidth_rollups_phase2.interval_start, storagenode_bandwidth_rollups_phase2.action FROM storagenode_bandwidth_rollups_phase2 WHERE storagenode_bandwidth_rollups_phase2.storagenode_id = ? AND storagenode_bandwidth_rollups_phase2.interval_start >= ? ORDER BY storagenode_bandwidth_rollups_phase2.storagenode_id, storagenode_bandwidth_rollups_phase2.interval_start, storagenode_bandwidth_rollups_phase2.action LIMIT ?")
var __values []interface{}
__values = append(__values, storagenode_bandwidth_rollup_phase2_storagenode_id.value(), storagenode_bandwidth_rollup_phase2_interval_start_greater_or_equal.value())
var __stmt string
if start != nil && start._set {
__values = append(__values, start._value_storagenode_id, start._value_interval_start, start._value_action, limit)
__stmt = __sqlbundle_Render(obj.dialect, __embed_stmt)
} else {
__values = append(__values, limit)
__stmt = __sqlbundle_Render(obj.dialect, __embed_first_stmt)
}
obj.logStmt(__stmt, __values...)
__rows, err := obj.driver.QueryContext(ctx, __stmt, __values...)
if err != nil {
return nil, nil, obj.makeErr(err)
}
defer __rows.Close()
var __continuation Paged_StoragenodeBandwidthRollupPhase2_By_StoragenodeId_And_IntervalStart_GreaterOrEqual_Continuation
__continuation._set = true
for __rows.Next() {
storagenode_bandwidth_rollup_phase2 := &StoragenodeBandwidthRollupPhase2{}
err = __rows.Scan(&storagenode_bandwidth_rollup_phase2.StoragenodeId, &storagenode_bandwidth_rollup_phase2.IntervalStart, &storagenode_bandwidth_rollup_phase2.IntervalSeconds, &storagenode_bandwidth_rollup_phase2.Action, &storagenode_bandwidth_rollup_phase2.Allocated, &storagenode_bandwidth_rollup_phase2.Settled, &__continuation._value_storagenode_id, &__continuation._value_interval_start, &__continuation._value_action)
if err != nil {
return nil, nil, obj.makeErr(err)
}
rows = append(rows, storagenode_bandwidth_rollup_phase2)
next = &__continuation
}
if err := __rows.Err(); err != nil {
return nil, nil, obj.makeErr(err)
}
return rows, next, nil
}
@ -21731,26 +21809,6 @@ func (rx *Rx) All_Project_By_ProjectMember_MemberId_OrderBy_Asc_Project_Name(ctx
return tx.All_Project_By_ProjectMember_MemberId_OrderBy_Asc_Project_Name(ctx, project_member_member_id)
}
func (rx *Rx) All_StoragenodeBandwidthRollupPhase2_By_IntervalStart_GreaterOrEqual(ctx context.Context,
storagenode_bandwidth_rollup_phase2_interval_start_greater_or_equal StoragenodeBandwidthRollupPhase2_IntervalStart_Field) (
rows []*StoragenodeBandwidthRollupPhase2, err error) {
var tx *Tx
if tx, err = rx.getTx(ctx); err != nil {
return
}
return tx.All_StoragenodeBandwidthRollupPhase2_By_IntervalStart_GreaterOrEqual(ctx, storagenode_bandwidth_rollup_phase2_interval_start_greater_or_equal)
}
func (rx *Rx) All_StoragenodeBandwidthRollup_By_IntervalStart_GreaterOrEqual(ctx context.Context,
storagenode_bandwidth_rollup_interval_start_greater_or_equal StoragenodeBandwidthRollup_IntervalStart_Field) (
rows []*StoragenodeBandwidthRollup, err error) {
var tx *Tx
if tx, err = rx.getTx(ctx); err != nil {
return
}
return tx.All_StoragenodeBandwidthRollup_By_IntervalStart_GreaterOrEqual(ctx, storagenode_bandwidth_rollup_interval_start_greater_or_equal)
}
func (rx *Rx) All_StoragenodeBandwidthRollup_By_StoragenodeId_And_IntervalStart(ctx context.Context,
storagenode_bandwidth_rollup_storagenode_id StoragenodeBandwidthRollup_StoragenodeId_Field,
storagenode_bandwidth_rollup_interval_start StoragenodeBandwidthRollup_IntervalStart_Field) (
@ -23100,6 +23158,30 @@ func (rx *Rx) Paged_PendingSerialQueue(ctx context.Context,
return tx.Paged_PendingSerialQueue(ctx, limit, start)
}
func (rx *Rx) Paged_StoragenodeBandwidthRollupPhase2_By_StoragenodeId_And_IntervalStart_GreaterOrEqual(ctx context.Context,
storagenode_bandwidth_rollup_phase2_storagenode_id StoragenodeBandwidthRollupPhase2_StoragenodeId_Field,
storagenode_bandwidth_rollup_phase2_interval_start_greater_or_equal StoragenodeBandwidthRollupPhase2_IntervalStart_Field,
limit int, start *Paged_StoragenodeBandwidthRollupPhase2_By_StoragenodeId_And_IntervalStart_GreaterOrEqual_Continuation) (
rows []*StoragenodeBandwidthRollupPhase2, next *Paged_StoragenodeBandwidthRollupPhase2_By_StoragenodeId_And_IntervalStart_GreaterOrEqual_Continuation, err error) {
var tx *Tx
if tx, err = rx.getTx(ctx); err != nil {
return
}
return tx.Paged_StoragenodeBandwidthRollupPhase2_By_StoragenodeId_And_IntervalStart_GreaterOrEqual(ctx, storagenode_bandwidth_rollup_phase2_storagenode_id, storagenode_bandwidth_rollup_phase2_interval_start_greater_or_equal, limit, start)
}
func (rx *Rx) Paged_StoragenodeBandwidthRollup_By_StoragenodeId_And_IntervalStart_GreaterOrEqual(ctx context.Context,
storagenode_bandwidth_rollup_storagenode_id StoragenodeBandwidthRollup_StoragenodeId_Field,
storagenode_bandwidth_rollup_interval_start_greater_or_equal StoragenodeBandwidthRollup_IntervalStart_Field,
limit int, start *Paged_StoragenodeBandwidthRollup_By_StoragenodeId_And_IntervalStart_GreaterOrEqual_Continuation) (
rows []*StoragenodeBandwidthRollup, next *Paged_StoragenodeBandwidthRollup_By_StoragenodeId_And_IntervalStart_GreaterOrEqual_Continuation, err error) {
var tx *Tx
if tx, err = rx.getTx(ctx); err != nil {
return
}
return tx.Paged_StoragenodeBandwidthRollup_By_StoragenodeId_And_IntervalStart_GreaterOrEqual(ctx, storagenode_bandwidth_rollup_storagenode_id, storagenode_bandwidth_rollup_interval_start_greater_or_equal, limit, start)
}
func (rx *Rx) ReplaceNoReturn_AccountingRollup(ctx context.Context,
accounting_rollup_node_id AccountingRollup_NodeId_Field,
accounting_rollup_start_time AccountingRollup_StartTime_Field,
@ -23456,14 +23538,6 @@ type Methods interface {
project_member_member_id ProjectMember_MemberId_Field) (
rows []*Project, err error)
All_StoragenodeBandwidthRollupPhase2_By_IntervalStart_GreaterOrEqual(ctx context.Context,
storagenode_bandwidth_rollup_phase2_interval_start_greater_or_equal StoragenodeBandwidthRollupPhase2_IntervalStart_Field) (
rows []*StoragenodeBandwidthRollupPhase2, err error)
All_StoragenodeBandwidthRollup_By_IntervalStart_GreaterOrEqual(ctx context.Context,
storagenode_bandwidth_rollup_interval_start_greater_or_equal StoragenodeBandwidthRollup_IntervalStart_Field) (
rows []*StoragenodeBandwidthRollup, err error)
All_StoragenodeBandwidthRollup_By_StoragenodeId_And_IntervalStart(ctx context.Context,
storagenode_bandwidth_rollup_storagenode_id StoragenodeBandwidthRollup_StoragenodeId_Field,
storagenode_bandwidth_rollup_interval_start StoragenodeBandwidthRollup_IntervalStart_Field) (
@ -24095,6 +24169,18 @@ type Methods interface {
limit int, start *Paged_PendingSerialQueue_Continuation) (
rows []*PendingSerialQueue, next *Paged_PendingSerialQueue_Continuation, err error)
Paged_StoragenodeBandwidthRollupPhase2_By_StoragenodeId_And_IntervalStart_GreaterOrEqual(ctx context.Context,
storagenode_bandwidth_rollup_phase2_storagenode_id StoragenodeBandwidthRollupPhase2_StoragenodeId_Field,
storagenode_bandwidth_rollup_phase2_interval_start_greater_or_equal StoragenodeBandwidthRollupPhase2_IntervalStart_Field,
limit int, start *Paged_StoragenodeBandwidthRollupPhase2_By_StoragenodeId_And_IntervalStart_GreaterOrEqual_Continuation) (
rows []*StoragenodeBandwidthRollupPhase2, next *Paged_StoragenodeBandwidthRollupPhase2_By_StoragenodeId_And_IntervalStart_GreaterOrEqual_Continuation, err error)
Paged_StoragenodeBandwidthRollup_By_StoragenodeId_And_IntervalStart_GreaterOrEqual(ctx context.Context,
storagenode_bandwidth_rollup_storagenode_id StoragenodeBandwidthRollup_StoragenodeId_Field,
storagenode_bandwidth_rollup_interval_start_greater_or_equal StoragenodeBandwidthRollup_IntervalStart_Field,
limit int, start *Paged_StoragenodeBandwidthRollup_By_StoragenodeId_And_IntervalStart_GreaterOrEqual_Continuation) (
rows []*StoragenodeBandwidthRollup, next *Paged_StoragenodeBandwidthRollup_By_StoragenodeId_And_IntervalStart_GreaterOrEqual_Continuation, err error)
ReplaceNoReturn_AccountingRollup(ctx context.Context,
accounting_rollup_node_id AccountingRollup_NodeId_Field,
accounting_rollup_start_time AccountingRollup_StartTime_Field,

View File

@ -101,49 +101,109 @@ func (db *StoragenodeAccounting) GetTalliesSince(ctx context.Context, latestRoll
}
// GetBandwidthSince retrieves all storagenode_bandwidth_rollup entries since latestRollup.
func (db *StoragenodeAccounting) GetBandwidthSince(ctx context.Context, latestRollup time.Time) (out []*accounting.StoragenodeBandwidthRollup, err error) {
func (db *StoragenodeAccounting) GetBandwidthSince(ctx context.Context, latestRollup time.Time,
cb func(context.Context, *accounting.StoragenodeBandwidthRollup) error) (err error) {
defer mon.Task()(&ctx)(&err)
// get everything from the current rollups table
rollups, err := db.db.All_StoragenodeBandwidthRollup_By_IntervalStart_GreaterOrEqual(ctx,
dbx.StoragenodeBandwidthRollup_IntervalStart(latestRollup))
// This table's key structure is storagenode_id, interval_start, so we're going to try and make
// things easier on the database by making individual requests node by node. This is also
// going to allow us to avoid 16 minute queries.
rows, err := db.db.QueryContext(ctx, db.db.Rebind(`select distinct storagenode_id from storagenode_bandwidth_rollups`))
if err != nil {
return nil, Error.Wrap(err)
return err
}
defer func() {
err = errs.Combine(err, Error.Wrap(rows.Close()))
}()
for _, r := range rollups {
nodeID, err := storj.NodeIDFromBytes(r.StoragenodeId)
var nodeids [][]byte
for rows.Next() {
var nodeid []byte
err := rows.Scan(&nodeid)
if err != nil {
return nil, Error.Wrap(err)
return Error.Wrap(err)
}
out = append(out, &accounting.StoragenodeBandwidthRollup{
NodeID: nodeID,
IntervalStart: r.IntervalStart,
Action: r.Action,
Settled: r.Settled,
})
nodeids = append(nodeids, nodeid)
}
// include everything from the phase2 rollups table as well
phase2rollups, err := db.db.All_StoragenodeBandwidthRollupPhase2_By_IntervalStart_GreaterOrEqual(ctx,
dbx.StoragenodeBandwidthRollupPhase2_IntervalStart(latestRollup))
err = rows.Err()
if err != nil {
return nil, Error.Wrap(err)
return Error.Wrap(rows.Err())
}
for _, r := range phase2rollups {
nodeID, err := storj.NodeIDFromBytes(r.StoragenodeId)
if err != nil {
return nil, Error.Wrap(err)
pageLimit := db.db.opts.ReadRollupBatchSize
if pageLimit <= 0 {
pageLimit = 10000
}
for _, nodeid := range nodeids {
// for each node, let's page through all rollups
{
var cursor *dbx.Paged_StoragenodeBandwidthRollup_By_StoragenodeId_And_IntervalStart_GreaterOrEqual_Continuation
for {
rollups, next, err := db.db.Paged_StoragenodeBandwidthRollup_By_StoragenodeId_And_IntervalStart_GreaterOrEqual(ctx,
dbx.StoragenodeBandwidthRollup_StoragenodeId(nodeid), dbx.StoragenodeBandwidthRollup_IntervalStart(latestRollup),
pageLimit, cursor)
if err != nil {
return Error.Wrap(err)
}
cursor = next
for _, r := range rollups {
nodeID, err := storj.NodeIDFromBytes(r.StoragenodeId)
if err != nil {
return Error.Wrap(err)
}
err = cb(ctx, &accounting.StoragenodeBandwidthRollup{
NodeID: nodeID,
IntervalStart: r.IntervalStart,
Action: r.Action,
Settled: r.Settled,
})
if err != nil {
return err
}
}
if len(rollups) < pageLimit {
break
}
}
}
// let's also do phase 2
{
var cursor *dbx.Paged_StoragenodeBandwidthRollupPhase2_By_StoragenodeId_And_IntervalStart_GreaterOrEqual_Continuation
for {
rollups, next, err := db.db.Paged_StoragenodeBandwidthRollupPhase2_By_StoragenodeId_And_IntervalStart_GreaterOrEqual(ctx,
dbx.StoragenodeBandwidthRollupPhase2_StoragenodeId(nodeid), dbx.StoragenodeBandwidthRollupPhase2_IntervalStart(latestRollup),
pageLimit, cursor)
if err != nil {
return Error.Wrap(err)
}
cursor = next
for _, r := range rollups {
nodeID, err := storj.NodeIDFromBytes(r.StoragenodeId)
if err != nil {
return Error.Wrap(err)
}
err = cb(ctx, &accounting.StoragenodeBandwidthRollup{
NodeID: nodeID,
IntervalStart: r.IntervalStart,
Action: r.Action,
Settled: r.Settled,
})
if err != nil {
return err
}
}
if len(rollups) < pageLimit {
break
}
}
}
out = append(out, &accounting.StoragenodeBandwidthRollup{
NodeID: nodeID,
IntervalStart: r.IntervalStart,
Action: r.Action,
Settled: r.Settled,
})
}
return out, nil
return nil
}
// SaveRollup records raw tallies of at rest data to the database.
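
One consequence of the streaming shape worth noting: a non-nil error from the callback is returned immediately, so a caller can abort the scan partway through. The usage sketch below assumes a stand-in BandwidthStreamer interface and Rollup type in place of the real StoragenodeAccounting DB and accounting types.

```go
package sketch

import (
	"context"
	"errors"
	"time"
)

// Rollup and BandwidthStreamer are hypothetical stand-ins for
// accounting.StoragenodeBandwidthRollup and the StoragenodeAccounting DB.
type Rollup struct{ Settled int64 }

type BandwidthStreamer interface {
	GetBandwidthSince(ctx context.Context, since time.Time,
		cb func(context.Context, *Rollup) error) error
}

var errEnough = errors.New("enough rows")

// sampleSettled reads at most n rows, relying on the callback's error being
// propagated immediately to abort the remaining pages.
func sampleSettled(ctx context.Context, db BandwidthStreamer, since time.Time, n int) (total int64, err error) {
	count := 0
	err = db.GetBandwidthSince(ctx, since, func(_ context.Context, r *Rollup) error {
		total += r.Settled
		count++
		if count >= n {
			return errEnough // stops the scan early
		}
		return nil
	})
	if errors.Is(err, errEnough) {
		err = nil
	}
	return total, err
}
```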

View File

@ -667,6 +667,9 @@ server.private-address: 127.0.0.1:7778
# how frequently the tally service should run
# tally.interval: 1h0m0s
# how large of batches GetBandwidthSince should process at a time
# tally.read-rollup-batch-size: 10000
# how large of batches SaveRollup should process at a time
# tally.save-rollup-batch-size: 1000