From 5d8a67a4f78d6db06d7f3d0f48f699a8eb202d4e Mon Sep 17 00:00:00 2001 From: JT Olio Date: Sun, 29 Nov 2020 12:59:27 -0700 Subject: [PATCH] satellitedb: retry GetBandwidthSince on cockroach Change-Id: I2bf20f3a19e7f3af97630d8a679410feba70661e --- .../satellitedb/storagenodeaccounting.go | 201 ++++++++++-------- 1 file changed, 118 insertions(+), 83 deletions(-) diff --git a/satellite/satellitedb/storagenodeaccounting.go b/satellite/satellitedb/storagenodeaccounting.go index 1222c384d..71cbb6eae 100644 --- a/satellite/satellitedb/storagenodeaccounting.go +++ b/satellite/satellitedb/storagenodeaccounting.go @@ -12,6 +12,7 @@ import ( "storj.io/common/storj" "storj.io/storj/private/dbutil" + "storj.io/storj/private/dbutil/cockroachutil" "storj.io/storj/private/dbutil/pgutil" "storj.io/storj/satellite/accounting" "storj.io/storj/satellite/compensation" @@ -100,6 +101,110 @@ func (db *StoragenodeAccounting) GetTalliesSince(ctx context.Context, latestRoll return out, Error.Wrap(err) } +func (db *StoragenodeAccounting) getNodeIds(ctx context.Context) (nodeids [][]byte, err error) { + defer mon.Task()(&ctx)(&err) + rows, err := db.db.QueryContext(ctx, db.db.Rebind(`select distinct storagenode_id from storagenode_bandwidth_rollups`)) + if err != nil { + return nil, Error.Wrap(err) + } + defer func() { + err = errs.Combine(err, Error.Wrap(rows.Close())) + }() + + for rows.Next() { + var nodeid []byte + err := rows.Scan(&nodeid) + if err != nil { + return nil, Error.Wrap(err) + } + nodeids = append(nodeids, nodeid) + } + err = rows.Err() + if err != nil { + return nil, Error.Wrap(rows.Err()) + } + + return nodeids, nil +} + +func (db *StoragenodeAccounting) getBandwidthByNodeSince(ctx context.Context, latestRollup time.Time, nodeid []byte, + cb func(context.Context, *accounting.StoragenodeBandwidthRollup) error) (err error) { + defer mon.Task()(&ctx)(&err) + + pageLimit := db.db.opts.ReadRollupBatchSize + if pageLimit <= 0 { + pageLimit = 10000 + } + + var cursor *dbx.Paged_StoragenodeBandwidthRollup_By_StoragenodeId_And_IntervalStart_GreaterOrEqual_Continuation + for { + rollups, next, err := db.db.Paged_StoragenodeBandwidthRollup_By_StoragenodeId_And_IntervalStart_GreaterOrEqual(ctx, + dbx.StoragenodeBandwidthRollup_StoragenodeId(nodeid), dbx.StoragenodeBandwidthRollup_IntervalStart(latestRollup), + pageLimit, cursor) + if err != nil { + return Error.Wrap(err) + } + cursor = next + for _, r := range rollups { + nodeID, err := storj.NodeIDFromBytes(r.StoragenodeId) + if err != nil { + return Error.Wrap(err) + } + err = cb(ctx, &accounting.StoragenodeBandwidthRollup{ + NodeID: nodeID, + IntervalStart: r.IntervalStart, + Action: r.Action, + Settled: r.Settled, + }) + if err != nil { + return err + } + } + if len(rollups) < pageLimit { + return nil + } + } +} + +func (db *StoragenodeAccounting) getBandwidthPhase2ByNodeSince(ctx context.Context, latestRollup time.Time, nodeid []byte, + cb func(context.Context, *accounting.StoragenodeBandwidthRollup) error) (err error) { + defer mon.Task()(&ctx)(&err) + + pageLimit := db.db.opts.ReadRollupBatchSize + if pageLimit <= 0 { + pageLimit = 10000 + } + + var cursor *dbx.Paged_StoragenodeBandwidthRollupPhase2_By_StoragenodeId_And_IntervalStart_GreaterOrEqual_Continuation + for { + rollups, next, err := db.db.Paged_StoragenodeBandwidthRollupPhase2_By_StoragenodeId_And_IntervalStart_GreaterOrEqual(ctx, + dbx.StoragenodeBandwidthRollupPhase2_StoragenodeId(nodeid), dbx.StoragenodeBandwidthRollupPhase2_IntervalStart(latestRollup), + pageLimit, cursor) + if err != nil { + return Error.Wrap(err) + } + cursor = next + for _, r := range rollups { + nodeID, err := storj.NodeIDFromBytes(r.StoragenodeId) + if err != nil { + return Error.Wrap(err) + } + err = cb(ctx, &accounting.StoragenodeBandwidthRollup{ + NodeID: nodeID, + IntervalStart: r.IntervalStart, + Action: r.Action, + Settled: r.Settled, + }) + if err != nil { + return err + } + } + if len(rollups) < pageLimit { + return nil + } + } +} + // GetBandwidthSince retrieves all storagenode_bandwidth_rollup entires since latestRollup. func (db *StoragenodeAccounting) GetBandwidthSince(ctx context.Context, latestRollup time.Time, cb func(context.Context, *accounting.StoragenodeBandwidthRollup) error) (err error) { @@ -108,97 +213,27 @@ func (db *StoragenodeAccounting) GetBandwidthSince(ctx context.Context, latestRo // This table's key structure is storagenode_id, interval_start, so we're going to try and make // things easier on the database by making individual requests node by node. This is also // going to allow us to avoid 16 minute queries. - rows, err := db.db.QueryContext(ctx, db.db.Rebind(`select distinct storagenode_id from storagenode_bandwidth_rollups`)) - if err != nil { - return err - } - defer func() { - err = errs.Combine(err, Error.Wrap(rows.Close())) - }() - var nodeids [][]byte - for rows.Next() { - var nodeid []byte - err := rows.Scan(&nodeid) + for { + nodeids, err = db.getNodeIds(ctx) if err != nil { - return Error.Wrap(err) + if cockroachutil.NeedsRetry(err) { + continue + } + return err } - nodeids = append(nodeids, nodeid) - } - err = rows.Err() - if err != nil { - return Error.Wrap(rows.Err()) - } - - pageLimit := db.db.opts.ReadRollupBatchSize - if pageLimit <= 0 { - pageLimit = 10000 + break } for _, nodeid := range nodeids { - - // for each node, let's page through all rollups - { - var cursor *dbx.Paged_StoragenodeBandwidthRollup_By_StoragenodeId_And_IntervalStart_GreaterOrEqual_Continuation - for { - rollups, next, err := db.db.Paged_StoragenodeBandwidthRollup_By_StoragenodeId_And_IntervalStart_GreaterOrEqual(ctx, - dbx.StoragenodeBandwidthRollup_StoragenodeId(nodeid), dbx.StoragenodeBandwidthRollup_IntervalStart(latestRollup), - pageLimit, cursor) - if err != nil { - return Error.Wrap(err) - } - cursor = next - for _, r := range rollups { - nodeID, err := storj.NodeIDFromBytes(r.StoragenodeId) - if err != nil { - return Error.Wrap(err) - } - err = cb(ctx, &accounting.StoragenodeBandwidthRollup{ - NodeID: nodeID, - IntervalStart: r.IntervalStart, - Action: r.Action, - Settled: r.Settled, - }) - if err != nil { - return err - } - } - if len(rollups) < pageLimit { - break - } - } + err = db.getBandwidthByNodeSince(ctx, latestRollup, nodeid, cb) + if err != nil { + return err } - // let's also do phase 2 - { - var cursor *dbx.Paged_StoragenodeBandwidthRollupPhase2_By_StoragenodeId_And_IntervalStart_GreaterOrEqual_Continuation - for { - rollups, next, err := db.db.Paged_StoragenodeBandwidthRollupPhase2_By_StoragenodeId_And_IntervalStart_GreaterOrEqual(ctx, - dbx.StoragenodeBandwidthRollupPhase2_StoragenodeId(nodeid), dbx.StoragenodeBandwidthRollupPhase2_IntervalStart(latestRollup), - pageLimit, cursor) - if err != nil { - return Error.Wrap(err) - } - cursor = next - for _, r := range rollups { - nodeID, err := storj.NodeIDFromBytes(r.StoragenodeId) - if err != nil { - return Error.Wrap(err) - } - err = cb(ctx, &accounting.StoragenodeBandwidthRollup{ - NodeID: nodeID, - IntervalStart: r.IntervalStart, - Action: r.Action, - Settled: r.Settled, - }) - if err != nil { - return err - } - } - if len(rollups) < pageLimit { - break - } - } + err = db.getBandwidthPhase2ByNodeSince(ctx, latestRollup, nodeid, cb) + if err != nil { + return err } }