satellitedb: retry GetBandwidthSince on cockroach
Change-Id: I2bf20f3a19e7f3af97630d8a679410feba70661e
This commit is contained in:
parent
5dc013d3bd
commit
5d8a67a4f7
@ -12,6 +12,7 @@ import (
|
||||
|
||||
"storj.io/common/storj"
|
||||
"storj.io/storj/private/dbutil"
|
||||
"storj.io/storj/private/dbutil/cockroachutil"
|
||||
"storj.io/storj/private/dbutil/pgutil"
|
||||
"storj.io/storj/satellite/accounting"
|
||||
"storj.io/storj/satellite/compensation"
|
||||
@ -100,6 +101,110 @@ func (db *StoragenodeAccounting) GetTalliesSince(ctx context.Context, latestRoll
|
||||
return out, Error.Wrap(err)
|
||||
}
|
||||
|
||||
func (db *StoragenodeAccounting) getNodeIds(ctx context.Context) (nodeids [][]byte, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
rows, err := db.db.QueryContext(ctx, db.db.Rebind(`select distinct storagenode_id from storagenode_bandwidth_rollups`))
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
defer func() {
|
||||
err = errs.Combine(err, Error.Wrap(rows.Close()))
|
||||
}()
|
||||
|
||||
for rows.Next() {
|
||||
var nodeid []byte
|
||||
err := rows.Scan(&nodeid)
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
nodeids = append(nodeids, nodeid)
|
||||
}
|
||||
err = rows.Err()
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(rows.Err())
|
||||
}
|
||||
|
||||
return nodeids, nil
|
||||
}
|
||||
|
||||
func (db *StoragenodeAccounting) getBandwidthByNodeSince(ctx context.Context, latestRollup time.Time, nodeid []byte,
|
||||
cb func(context.Context, *accounting.StoragenodeBandwidthRollup) error) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
pageLimit := db.db.opts.ReadRollupBatchSize
|
||||
if pageLimit <= 0 {
|
||||
pageLimit = 10000
|
||||
}
|
||||
|
||||
var cursor *dbx.Paged_StoragenodeBandwidthRollup_By_StoragenodeId_And_IntervalStart_GreaterOrEqual_Continuation
|
||||
for {
|
||||
rollups, next, err := db.db.Paged_StoragenodeBandwidthRollup_By_StoragenodeId_And_IntervalStart_GreaterOrEqual(ctx,
|
||||
dbx.StoragenodeBandwidthRollup_StoragenodeId(nodeid), dbx.StoragenodeBandwidthRollup_IntervalStart(latestRollup),
|
||||
pageLimit, cursor)
|
||||
if err != nil {
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
cursor = next
|
||||
for _, r := range rollups {
|
||||
nodeID, err := storj.NodeIDFromBytes(r.StoragenodeId)
|
||||
if err != nil {
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
err = cb(ctx, &accounting.StoragenodeBandwidthRollup{
|
||||
NodeID: nodeID,
|
||||
IntervalStart: r.IntervalStart,
|
||||
Action: r.Action,
|
||||
Settled: r.Settled,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if len(rollups) < pageLimit {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (db *StoragenodeAccounting) getBandwidthPhase2ByNodeSince(ctx context.Context, latestRollup time.Time, nodeid []byte,
|
||||
cb func(context.Context, *accounting.StoragenodeBandwidthRollup) error) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
pageLimit := db.db.opts.ReadRollupBatchSize
|
||||
if pageLimit <= 0 {
|
||||
pageLimit = 10000
|
||||
}
|
||||
|
||||
var cursor *dbx.Paged_StoragenodeBandwidthRollupPhase2_By_StoragenodeId_And_IntervalStart_GreaterOrEqual_Continuation
|
||||
for {
|
||||
rollups, next, err := db.db.Paged_StoragenodeBandwidthRollupPhase2_By_StoragenodeId_And_IntervalStart_GreaterOrEqual(ctx,
|
||||
dbx.StoragenodeBandwidthRollupPhase2_StoragenodeId(nodeid), dbx.StoragenodeBandwidthRollupPhase2_IntervalStart(latestRollup),
|
||||
pageLimit, cursor)
|
||||
if err != nil {
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
cursor = next
|
||||
for _, r := range rollups {
|
||||
nodeID, err := storj.NodeIDFromBytes(r.StoragenodeId)
|
||||
if err != nil {
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
err = cb(ctx, &accounting.StoragenodeBandwidthRollup{
|
||||
NodeID: nodeID,
|
||||
IntervalStart: r.IntervalStart,
|
||||
Action: r.Action,
|
||||
Settled: r.Settled,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if len(rollups) < pageLimit {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// GetBandwidthSince retrieves all storagenode_bandwidth_rollup entires since latestRollup.
|
||||
func (db *StoragenodeAccounting) GetBandwidthSince(ctx context.Context, latestRollup time.Time,
|
||||
cb func(context.Context, *accounting.StoragenodeBandwidthRollup) error) (err error) {
|
||||
@ -108,97 +213,27 @@ func (db *StoragenodeAccounting) GetBandwidthSince(ctx context.Context, latestRo
|
||||
// This table's key structure is storagenode_id, interval_start, so we're going to try and make
|
||||
// things easier on the database by making individual requests node by node. This is also
|
||||
// going to allow us to avoid 16 minute queries.
|
||||
rows, err := db.db.QueryContext(ctx, db.db.Rebind(`select distinct storagenode_id from storagenode_bandwidth_rollups`))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer func() {
|
||||
err = errs.Combine(err, Error.Wrap(rows.Close()))
|
||||
}()
|
||||
|
||||
var nodeids [][]byte
|
||||
for rows.Next() {
|
||||
var nodeid []byte
|
||||
err := rows.Scan(&nodeid)
|
||||
for {
|
||||
nodeids, err = db.getNodeIds(ctx)
|
||||
if err != nil {
|
||||
return Error.Wrap(err)
|
||||
if cockroachutil.NeedsRetry(err) {
|
||||
continue
|
||||
}
|
||||
return err
|
||||
}
|
||||
nodeids = append(nodeids, nodeid)
|
||||
}
|
||||
err = rows.Err()
|
||||
if err != nil {
|
||||
return Error.Wrap(rows.Err())
|
||||
}
|
||||
|
||||
pageLimit := db.db.opts.ReadRollupBatchSize
|
||||
if pageLimit <= 0 {
|
||||
pageLimit = 10000
|
||||
break
|
||||
}
|
||||
|
||||
for _, nodeid := range nodeids {
|
||||
|
||||
// for each node, let's page through all rollups
|
||||
{
|
||||
var cursor *dbx.Paged_StoragenodeBandwidthRollup_By_StoragenodeId_And_IntervalStart_GreaterOrEqual_Continuation
|
||||
for {
|
||||
rollups, next, err := db.db.Paged_StoragenodeBandwidthRollup_By_StoragenodeId_And_IntervalStart_GreaterOrEqual(ctx,
|
||||
dbx.StoragenodeBandwidthRollup_StoragenodeId(nodeid), dbx.StoragenodeBandwidthRollup_IntervalStart(latestRollup),
|
||||
pageLimit, cursor)
|
||||
if err != nil {
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
cursor = next
|
||||
for _, r := range rollups {
|
||||
nodeID, err := storj.NodeIDFromBytes(r.StoragenodeId)
|
||||
if err != nil {
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
err = cb(ctx, &accounting.StoragenodeBandwidthRollup{
|
||||
NodeID: nodeID,
|
||||
IntervalStart: r.IntervalStart,
|
||||
Action: r.Action,
|
||||
Settled: r.Settled,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if len(rollups) < pageLimit {
|
||||
break
|
||||
}
|
||||
}
|
||||
err = db.getBandwidthByNodeSince(ctx, latestRollup, nodeid, cb)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// let's also do phase 2
|
||||
{
|
||||
var cursor *dbx.Paged_StoragenodeBandwidthRollupPhase2_By_StoragenodeId_And_IntervalStart_GreaterOrEqual_Continuation
|
||||
for {
|
||||
rollups, next, err := db.db.Paged_StoragenodeBandwidthRollupPhase2_By_StoragenodeId_And_IntervalStart_GreaterOrEqual(ctx,
|
||||
dbx.StoragenodeBandwidthRollupPhase2_StoragenodeId(nodeid), dbx.StoragenodeBandwidthRollupPhase2_IntervalStart(latestRollup),
|
||||
pageLimit, cursor)
|
||||
if err != nil {
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
cursor = next
|
||||
for _, r := range rollups {
|
||||
nodeID, err := storj.NodeIDFromBytes(r.StoragenodeId)
|
||||
if err != nil {
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
err = cb(ctx, &accounting.StoragenodeBandwidthRollup{
|
||||
NodeID: nodeID,
|
||||
IntervalStart: r.IntervalStart,
|
||||
Action: r.Action,
|
||||
Settled: r.Settled,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if len(rollups) < pageLimit {
|
||||
break
|
||||
}
|
||||
}
|
||||
err = db.getBandwidthPhase2ByNodeSince(ctx, latestRollup, nodeid, cb)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user