diff --git a/cmd/satellite/compensation.go b/cmd/satellite/compensation.go
index 7f0b721c9..5de645e5f 100644
--- a/cmd/satellite/compensation.go
+++ b/cmd/satellite/compensation.go
@@ -12,7 +12,10 @@ import (
 	"github.com/zeebo/errs"
 	"go.uber.org/zap"
 
+	"storj.io/common/storj"
+	"storj.io/storj/satellite/accounting"
 	"storj.io/storj/satellite/compensation"
+	"storj.io/storj/satellite/overlay"
 	"storj.io/storj/satellite/satellitedb"
 )
 
@@ -48,36 +51,49 @@ func generateInvoicesCSV(ctx context.Context, period compensation.Period, out io
 		return err
 	}
 
-	invoices := make([]compensation.Invoice, 0, len(periodUsage))
+	periodUsageByNode := make(map[storj.NodeID]accounting.StorageNodePeriodUsage, len(periodUsage))
 	for _, usage := range periodUsage {
-		totalAmounts, err := db.Compensation().QueryTotalAmounts(ctx, usage.NodeID)
+		periodUsageByNode[usage.NodeID] = usage
+	}
+
+	var allNodes []*overlay.NodeDossier
+	err = db.OverlayCache().IterateAllNodeDossiers(ctx,
+		func(ctx context.Context, node *overlay.NodeDossier) error {
+			allNodes = append(allNodes, node)
+			return nil
+		})
+	if err != nil {
+		return err
+	}
+
+	invoices := make([]compensation.Invoice, 0, len(allNodes))
+	for _, node := range allNodes {
+		totalAmounts, err := db.Compensation().QueryTotalAmounts(ctx, node.Id)
 		if err != nil {
 			return err
 		}
 
-		node, err := db.OverlayCache().Get(ctx, usage.NodeID)
-		if err != nil {
-			zap.L().Warn("failed to get node, skipping", zap.String("nodeID", usage.NodeID.String()), zap.Error(err))
-			continue
-		}
 		var gracefulExit *time.Time
 		if node.ExitStatus.ExitSuccess {
 			gracefulExit = node.ExitStatus.ExitFinishedAt
 		}
 		nodeAddress, _, err := net.SplitHostPort(node.Address.Address)
 		if err != nil {
-			return errs.New("unable to split node %q address %q", usage.NodeID, node.Address.Address)
+			return errs.New("unable to split node %q address %q", node.Id, node.Address.Address)
 		}
 		var nodeLastIP string
 		if node.LastIPPort != "" {
 			nodeLastIP, _, err = net.SplitHostPort(node.LastIPPort)
 			if err != nil {
-				return errs.New("unable to split node %q last ip:port %q", usage.NodeID, node.LastIPPort)
+				return errs.New("unable to split node %q last ip:port %q", node.Id, node.LastIPPort)
 			}
 		}
 
+		// the zero value of period usage is acceptable if the node does not
+		// have any usage for the period.
+		usage := periodUsageByNode[node.Id]
 		nodeInfo := compensation.NodeInfo{
-			ID:                 usage.NodeID,
+			ID:                 node.Id,
 			CreatedAt:          node.CreatedAt,
 			LastContactSuccess: node.Reputation.LastContactSuccess,
 			Disqualified:       node.Disqualified,
@@ -96,7 +112,7 @@ func generateInvoicesCSV(ctx context.Context, period compensation.Period, out io
 
 		invoice := compensation.Invoice{
 			Period:             period,
-			NodeID:             compensation.NodeID(usage.NodeID),
+			NodeID:             compensation.NodeID(node.Id),
 			NodeWallet:         node.Operator.Wallet,
 			NodeWalletFeatures: node.Operator.WalletFeatures,
 			NodeAddress:        nodeAddress,
diff --git a/satellite/overlay/service.go b/satellite/overlay/service.go
index b62b6530b..4a942247a 100644
--- a/satellite/overlay/service.go
+++ b/satellite/overlay/service.go
@@ -106,6 +106,8 @@ type DB interface {
 	// IterateAllNodes will call cb on all known nodes (used in restore trash contexts).
 	IterateAllNodes(context.Context, func(context.Context, *SelectedNode) error) error
+	// IterateAllNodeDossiers will call cb on all known nodes (used for invoice generation).
+	IterateAllNodeDossiers(context.Context, func(context.Context, *NodeDossier) error) error
 }
 
 // NodeCheckInInfo contains all the info that will be updated when a node checkins.
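Note on the rewritten flow above: invoices were previously generated only for nodes appearing in periodUsage, with a per-node overlay lookup that could fail and skip the node. Now every node dossier is visited and its usage is looked up in a map, so nodes with no activity in the period still get an invoice (with zero amounts), and the warn-and-skip path disappears. The lookup needs no ", ok" form because a Go map miss yields the element type's zero value. A minimal standalone sketch of that idiom — the types here are illustrative stand-ins, not the real accounting.StorageNodePeriodUsage:

    package main

    import "fmt"

    type nodeID string

    // periodUsage is a hypothetical stand-in for accounting.StorageNodePeriodUsage.
    type periodUsage struct {
    	AtRestTotal float64
    	GetTotal    int64
    }

    func main() {
    	usageByNode := map[nodeID]periodUsage{
    		"node-a": {AtRestTotal: 12.5, GetTotal: 300},
    	}

    	// A miss returns the zero value, which reads naturally as
    	// "no usage this period", so no ok-check is needed before
    	// building the invoice.
    	for _, id := range []nodeID{"node-a", "node-b"} {
    		u := usageByNode[id]
    		fmt.Printf("%s: at-rest=%v get=%v\n", id, u.AtRestTotal, u.GetTotal)
    	}
    }

Running this prints zero values for "node-b", which is exactly the behavior the comment about the zero value of period usage relies on.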
diff --git a/satellite/satellitedb/dbx/satellitedb.dbx b/satellite/satellitedb/dbx/satellitedb.dbx
index fa812974e..775a03d4c 100644
--- a/satellite/satellitedb/dbx/satellitedb.dbx
+++ b/satellite/satellitedb/dbx/satellitedb.dbx
@@ -144,7 +144,7 @@ model node (
 	field vetted_at timestamp ( updatable, nullable )
 	field uptime_success_count int64 ( updatable, default 0 )
 	field total_uptime_count int64 ( updatable, default 0 )
-
+
 	field created_at timestamp ( autoinsert, default current_timestamp )
 	field updated_at timestamp ( autoinsert, autoupdate, default current_timestamp )
 	field last_contact_success timestamp ( updatable, default "epoch" )
@@ -192,9 +192,13 @@ read all (
 	select node.id
 )
 
+read paged (
+	select node
+)
+
 read all (
-	select node.id node.piece_count
-	where node.piece_count != 0
+	select node.id node.piece_count
+	where node.piece_count != 0
 )
 
 //--- audit history ---//
diff --git a/satellite/satellitedb/dbx/satellitedb.dbx.go b/satellite/satellitedb/dbx/satellitedb.dbx.go
index 721a26b3a..687ac58f7 100644
--- a/satellite/satellitedb/dbx/satellitedb.dbx.go
+++ b/satellite/satellitedb/dbx/satellitedb.dbx.go
@@ -9146,6 +9146,11 @@ type Paged_BucketBandwidthRollup_By_IntervalStart_GreaterOrEqual_Continuation st
 	_set bool
 }
 
+type Paged_Node_Continuation struct {
+	_value_id []byte
+	_set      bool
+}
+
 type Paged_StoragenodeBandwidthRollupArchive_By_IntervalStart_GreaterOrEqual_Continuation struct {
 	_value_storagenode_id []byte
 	_value_interval_start time.Time
@@ -10293,6 +10298,65 @@ func (obj *pgxImpl) All_Node_Id(ctx context.Context) (
 
 }
 
+func (obj *pgxImpl) Paged_Node(ctx context.Context,
+	limit int, start *Paged_Node_Continuation) (
+	rows []*Node, next *Paged_Node_Continuation, err error) {
+	defer mon.Task()(&ctx)(&err)
+
+	var __embed_stmt = __sqlbundle_Literal("SELECT nodes.id, nodes.address, nodes.last_net, nodes.last_ip_port, nodes.protocol, nodes.type, nodes.email, nodes.wallet, nodes.wallet_features, nodes.free_disk, nodes.piece_count, nodes.major, nodes.minor, nodes.patch, nodes.hash, nodes.timestamp, nodes.release, nodes.latency_90, nodes.audit_success_count, nodes.total_audit_count, nodes.vetted_at, nodes.uptime_success_count, nodes.total_uptime_count, nodes.created_at, nodes.updated_at, nodes.last_contact_success, nodes.last_contact_failure, nodes.contained, nodes.disqualified, nodes.suspended, nodes.unknown_audit_suspended, nodes.offline_suspended, nodes.under_review, nodes.online_score, nodes.audit_reputation_alpha, nodes.audit_reputation_beta, nodes.unknown_audit_reputation_alpha, nodes.unknown_audit_reputation_beta, nodes.exit_initiated_at, nodes.exit_loop_completed_at, nodes.exit_finished_at, nodes.exit_success, nodes.id FROM nodes WHERE (nodes.id) > ? ORDER BY nodes.id LIMIT ?")
+
+	var __embed_first_stmt = __sqlbundle_Literal("SELECT nodes.id, nodes.address, nodes.last_net, nodes.last_ip_port, nodes.protocol, nodes.type, nodes.email, nodes.wallet, nodes.wallet_features, nodes.free_disk, nodes.piece_count, nodes.major, nodes.minor, nodes.patch, nodes.hash, nodes.timestamp, nodes.release, nodes.latency_90, nodes.audit_success_count, nodes.total_audit_count, nodes.vetted_at, nodes.uptime_success_count, nodes.total_uptime_count, nodes.created_at, nodes.updated_at, nodes.last_contact_success, nodes.last_contact_failure, nodes.contained, nodes.disqualified, nodes.suspended, nodes.unknown_audit_suspended, nodes.offline_suspended, nodes.under_review, nodes.online_score, nodes.audit_reputation_alpha, nodes.audit_reputation_beta, nodes.unknown_audit_reputation_alpha, nodes.unknown_audit_reputation_beta, nodes.exit_initiated_at, nodes.exit_loop_completed_at, nodes.exit_finished_at, nodes.exit_success, nodes.id FROM nodes ORDER BY nodes.id LIMIT ?")
+
+	var __values []interface{}
+
+	var __stmt string
+	if start != nil && start._set {
+		__values = append(__values, start._value_id, limit)
+		__stmt = __sqlbundle_Render(obj.dialect, __embed_stmt)
+	} else {
+		__values = append(__values, limit)
+		__stmt = __sqlbundle_Render(obj.dialect, __embed_first_stmt)
+	}
+	obj.logStmt(__stmt, __values...)
+
+	for {
+		rows, next, err = func() (rows []*Node, next *Paged_Node_Continuation, err error) {
+			__rows, err := obj.driver.QueryContext(ctx, __stmt, __values...)
+			if err != nil {
+				return nil, nil, err
+			}
+			defer __rows.Close()
+
+			var __continuation Paged_Node_Continuation
+			__continuation._set = true
+
+			for __rows.Next() {
+				node := &Node{}
+				err = __rows.Scan(&node.Id, &node.Address, &node.LastNet, &node.LastIpPort, &node.Protocol, &node.Type, &node.Email, &node.Wallet, &node.WalletFeatures, &node.FreeDisk, &node.PieceCount, &node.Major, &node.Minor, &node.Patch, &node.Hash, &node.Timestamp, &node.Release, &node.Latency90, &node.AuditSuccessCount, &node.TotalAuditCount, &node.VettedAt, &node.UptimeSuccessCount, &node.TotalUptimeCount, &node.CreatedAt, &node.UpdatedAt, &node.LastContactSuccess, &node.LastContactFailure, &node.Contained, &node.Disqualified, &node.Suspended, &node.UnknownAuditSuspended, &node.OfflineSuspended, &node.UnderReview, &node.OnlineScore, &node.AuditReputationAlpha, &node.AuditReputationBeta, &node.UnknownAuditReputationAlpha, &node.UnknownAuditReputationBeta, &node.ExitInitiatedAt, &node.ExitLoopCompletedAt, &node.ExitFinishedAt, &node.ExitSuccess, &__continuation._value_id)
+				if err != nil {
+					return nil, nil, err
+				}
+				rows = append(rows, node)
+				next = &__continuation
+			}
+
+			if err := __rows.Err(); err != nil {
+				return nil, nil, err
+			}
+
+			return rows, next, nil
+		}()
+		if err != nil {
+			if obj.shouldRetry(err) {
+				continue
+			}
+			return nil, nil, obj.makeErr(err)
+		}
+		return rows, next, nil
+	}
+
+}
+
 func (obj *pgxImpl) All_Node_Id_Node_PieceCount_By_PieceCount_Not_Number(ctx context.Context) (
 	rows []*Id_PieceCount_Row, err error) {
 	defer mon.Task()(&ctx)(&err)
@@ -15727,6 +15791,65 @@ func (obj *pgxcockroachImpl) All_Node_Id(ctx context.Context) (
 
 }
 
+func (obj *pgxcockroachImpl) Paged_Node(ctx context.Context,
+	limit int, start *Paged_Node_Continuation) (
+	rows []*Node, next *Paged_Node_Continuation, err error) {
+	defer mon.Task()(&ctx)(&err)
+
+	var __embed_stmt = __sqlbundle_Literal("SELECT nodes.id, nodes.address, nodes.last_net, nodes.last_ip_port, nodes.protocol, nodes.type, nodes.email, nodes.wallet, nodes.wallet_features, nodes.free_disk, nodes.piece_count, nodes.major, nodes.minor, nodes.patch, nodes.hash, nodes.timestamp, nodes.release, nodes.latency_90, nodes.audit_success_count, nodes.total_audit_count, nodes.vetted_at, nodes.uptime_success_count, nodes.total_uptime_count, nodes.created_at, nodes.updated_at, nodes.last_contact_success, nodes.last_contact_failure, nodes.contained, nodes.disqualified, nodes.suspended, nodes.unknown_audit_suspended, nodes.offline_suspended, nodes.under_review, nodes.online_score, nodes.audit_reputation_alpha, nodes.audit_reputation_beta, nodes.unknown_audit_reputation_alpha, nodes.unknown_audit_reputation_beta, nodes.exit_initiated_at, nodes.exit_loop_completed_at, nodes.exit_finished_at, nodes.exit_success, nodes.id FROM nodes WHERE (nodes.id) > ? ORDER BY nodes.id LIMIT ?")
+
+	var __embed_first_stmt = __sqlbundle_Literal("SELECT nodes.id, nodes.address, nodes.last_net, nodes.last_ip_port, nodes.protocol, nodes.type, nodes.email, nodes.wallet, nodes.wallet_features, nodes.free_disk, nodes.piece_count, nodes.major, nodes.minor, nodes.patch, nodes.hash, nodes.timestamp, nodes.release, nodes.latency_90, nodes.audit_success_count, nodes.total_audit_count, nodes.vetted_at, nodes.uptime_success_count, nodes.total_uptime_count, nodes.created_at, nodes.updated_at, nodes.last_contact_success, nodes.last_contact_failure, nodes.contained, nodes.disqualified, nodes.suspended, nodes.unknown_audit_suspended, nodes.offline_suspended, nodes.under_review, nodes.online_score, nodes.audit_reputation_alpha, nodes.audit_reputation_beta, nodes.unknown_audit_reputation_alpha, nodes.unknown_audit_reputation_beta, nodes.exit_initiated_at, nodes.exit_loop_completed_at, nodes.exit_finished_at, nodes.exit_success, nodes.id FROM nodes ORDER BY nodes.id LIMIT ?")
+
+	var __values []interface{}
+
+	var __stmt string
+	if start != nil && start._set {
+		__values = append(__values, start._value_id, limit)
+		__stmt = __sqlbundle_Render(obj.dialect, __embed_stmt)
+	} else {
+		__values = append(__values, limit)
+		__stmt = __sqlbundle_Render(obj.dialect, __embed_first_stmt)
+	}
+	obj.logStmt(__stmt, __values...)
+
+	for {
+		rows, next, err = func() (rows []*Node, next *Paged_Node_Continuation, err error) {
+			__rows, err := obj.driver.QueryContext(ctx, __stmt, __values...)
+			if err != nil {
+				return nil, nil, err
+			}
+			defer __rows.Close()
+
+			var __continuation Paged_Node_Continuation
+			__continuation._set = true
+
+			for __rows.Next() {
+				node := &Node{}
+				err = __rows.Scan(&node.Id, &node.Address, &node.LastNet, &node.LastIpPort, &node.Protocol, &node.Type, &node.Email, &node.Wallet, &node.WalletFeatures, &node.FreeDisk, &node.PieceCount, &node.Major, &node.Minor, &node.Patch, &node.Hash, &node.Timestamp, &node.Release, &node.Latency90, &node.AuditSuccessCount, &node.TotalAuditCount, &node.VettedAt, &node.UptimeSuccessCount, &node.TotalUptimeCount, &node.CreatedAt, &node.UpdatedAt, &node.LastContactSuccess, &node.LastContactFailure, &node.Contained, &node.Disqualified, &node.Suspended, &node.UnknownAuditSuspended, &node.OfflineSuspended, &node.UnderReview, &node.OnlineScore, &node.AuditReputationAlpha, &node.AuditReputationBeta, &node.UnknownAuditReputationAlpha, &node.UnknownAuditReputationBeta, &node.ExitInitiatedAt, &node.ExitLoopCompletedAt, &node.ExitFinishedAt, &node.ExitSuccess, &__continuation._value_id)
+				if err != nil {
+					return nil, nil, err
+				}
+				rows = append(rows, node)
+				next = &__continuation
+			}
+
+			if err := __rows.Err(); err != nil {
+				return nil, nil, err
+			}
+
+			return rows, next, nil
+		}()
+		if err != nil {
+			if obj.shouldRetry(err) {
+				continue
+			}
+			return nil, nil, obj.makeErr(err)
+		}
+		return rows, next, nil
+	}
+
+}
+
 func (obj *pgxcockroachImpl) All_Node_Id_Node_PieceCount_By_PieceCount_Not_Number(ctx context.Context) (
 	rows []*Id_PieceCount_Row, err error) {
 	defer mon.Task()(&ctx)(&err)
@@ -21263,6 +21386,16 @@ func (rx *Rx) Paged_BucketBandwidthRollup_By_IntervalStart_GreaterOrEqual(ctx co
 	return tx.Paged_BucketBandwidthRollup_By_IntervalStart_GreaterOrEqual(ctx, bucket_bandwidth_rollup_interval_start_greater_or_equal, limit, start)
 }
 
+func (rx *Rx) Paged_Node(ctx context.Context,
+	limit int, start *Paged_Node_Continuation) (
+	rows []*Node, next *Paged_Node_Continuation, err error) {
+	var tx *Tx
+	if tx, err = rx.getTx(ctx); err != nil {
+		return
+	}
+	return tx.Paged_Node(ctx, limit, start)
+}
+
 func (rx *Rx) Paged_StoragenodeBandwidthRollupArchive_By_IntervalStart_GreaterOrEqual(ctx context.Context,
 	storagenode_bandwidth_rollup_archive_interval_start_greater_or_equal StoragenodeBandwidthRollupArchive_IntervalStart_Field,
 	limit int, start *Paged_StoragenodeBandwidthRollupArchive_By_IntervalStart_GreaterOrEqual_Continuation) (
@@ -22074,6 +22207,10 @@ type Methods interface {
 		limit int, start *Paged_BucketBandwidthRollup_By_IntervalStart_GreaterOrEqual_Continuation) (
 		rows []*BucketBandwidthRollup, next *Paged_BucketBandwidthRollup_By_IntervalStart_GreaterOrEqual_Continuation, err error)
 
+	Paged_Node(ctx context.Context,
+		limit int, start *Paged_Node_Continuation) (
+		rows []*Node, next *Paged_Node_Continuation, err error)
+
 	Paged_StoragenodeBandwidthRollupArchive_By_IntervalStart_GreaterOrEqual(ctx context.Context,
 		storagenode_bandwidth_rollup_archive_interval_start_greater_or_equal StoragenodeBandwidthRollupArchive_IntervalStart_Field,
 		limit int, start *Paged_StoragenodeBandwidthRollupArchive_By_IntervalStart_GreaterOrEqual_Continuation) (
diff --git a/satellite/satellitedb/migrate.go b/satellite/satellitedb/migrate.go
index 2f81e9f5a..94c6db0b9 100644
--- a/satellite/satellitedb/migrate.go
+++ b/satellite/satellitedb/migrate.go
@@ -256,10 +256,10 @@ func (db *satelliteDB) PostgresMigration() *migrate.Migration {
 				exit_initiated_at timestamp with time zone,
 				exit_finished_at timestamp with time zone,
 				exit_success boolean NOT NULL DEFAULT FALSE,
-				last_ip_port text,
-				suspended timestamp with time zone,
 				unknown_audit_reputation_alpha double precision NOT NULL DEFAULT 1,
 				unknown_audit_reputation_beta double precision NOT NULL DEFAULT 0,
+				suspended timestamp with time zone,
+				last_ip_port text,
 				vetted_at timestamp with time zone,
 				PRIMARY KEY ( id )
 			);`,
diff --git a/satellite/satellitedb/overlaycache.go b/satellite/satellitedb/overlaycache.go
index 4182c082f..e2396aa29 100644
--- a/satellite/satellitedb/overlaycache.go
+++ b/satellite/satellitedb/overlaycache.go
@@ -1802,3 +1802,33 @@ func (cache *overlaycache) IterateAllNodes(ctx context.Context, cb func(context.
 
 	return rows.Err()
 }
+
+// IterateAllNodeDossiers will call cb on all known nodes (used for invoice generation).
+func (cache *overlaycache) IterateAllNodeDossiers(ctx context.Context, cb func(context.Context, *overlay.NodeDossier) error) (err error) {
+	defer mon.Task()(&ctx)(&err)
+
+	const nodesPerPage = 1000
+	var cont *dbx.Paged_Node_Continuation
+	var dbxNodes []*dbx.Node
+
+	for {
+		dbxNodes, cont, err = cache.db.Paged_Node(ctx, nodesPerPage, cont)
+		if err != nil {
+			return err
+		}
+
+		for _, node := range dbxNodes {
+			dossier, err := convertDBNode(ctx, node)
+			if err != nil {
+				return err
+			}
+			if err := cb(ctx, dossier); err != nil {
+				return err
+			}
+		}
+
+		if cont == nil {
+			return nil
+		}
+	}
+}
diff --git a/satellite/satellitedb/projectaccounting.go b/satellite/satellitedb/projectaccounting.go
index 2f9d1aebc..4efc11fcf 100644
--- a/satellite/satellitedb/projectaccounting.go
+++ b/satellite/satellitedb/projectaccounting.go
@@ -249,17 +249,17 @@ func (db *ProjectAccounting) GetProjectTotal(ctx context.Context, projectID uuid
 
 	storageQuery := db.db.Rebind(`
 		SELECT
-			bucket_storage_tallies.interval_start,
+			bucket_storage_tallies.interval_start,
 			bucket_storage_tallies.inline,
 			bucket_storage_tallies.remote,
 			bucket_storage_tallies.object_count
-		FROM
-			bucket_storage_tallies
-		WHERE
-			bucket_storage_tallies.project_id = ? AND
+		FROM
+			bucket_storage_tallies
+		WHERE
+			bucket_storage_tallies.project_id = ? AND
 			bucket_storage_tallies.bucket_name = ? AND
-			bucket_storage_tallies.interval_start >= ? AND
-			bucket_storage_tallies.interval_start <= ?
+			bucket_storage_tallies.interval_start >= ? AND
+			bucket_storage_tallies.interval_start <= ?
 		ORDER BY bucket_storage_tallies.interval_start DESC
 	`)
 
@@ -320,14 +320,14 @@ func (db *ProjectAccounting) GetProjectTotal(ctx context.Context, projectID uuid
 // only process PieceAction_GET.
 func (db *ProjectAccounting) getTotalEgress(ctx context.Context, projectID uuid.UUID, since, before time.Time) (totalEgress int64, err error) {
 	totalEgressQuery := db.db.Rebind(`
-		SELECT
-			COALESCE(SUM(settled) + SUM(inline), 0)
-		FROM
-			bucket_bandwidth_rollups
-		WHERE
-			project_id = ? AND
-			interval_start >= ? AND
-			interval_start <= ? AND
+		SELECT
+			COALESCE(SUM(settled) + SUM(inline), 0)
+		FROM
+			bucket_bandwidth_rollups
+		WHERE
+			project_id = ? AND
+			interval_start >= ? AND
+			interval_start <= ? AND
 			action = ?;
 	`)
 
@@ -542,7 +542,7 @@ func (db *ProjectAccounting) GetBucketTotals(ctx context.Context, projectID uuid
 	}
 
 	var buckets []string
-	bucketsQuery := db.db.Rebind(`SELECT name FROM bucket_metainfos
+	bucketsQuery := db.db.Rebind(`SELECT name FROM bucket_metainfos
 	WHERE project_id = ? AND ` + bucketNameRange + `ORDER BY name ASC LIMIT ? OFFSET ?`)
 
 	args = []interface{}{
@@ -645,15 +645,15 @@ func (db *ProjectAccounting) ArchiveRollupsBefore(ctx context.Context, before ti
 		for {
 			row := db.db.QueryRow(ctx, `
 				WITH rollups_to_move AS (
-					DELETE FROM bucket_bandwidth_rollups
-					WHERE interval_start <= $1
-					LIMIT $2 RETURNING *
-				), moved_rollups AS (
+					DELETE FROM bucket_bandwidth_rollups
+					WHERE interval_start <= $1
+					LIMIT $2 RETURNING *
+				), moved_rollups AS (
 					INSERT INTO bucket_bandwidth_rollup_archives(bucket_name, project_id, interval_start, interval_seconds, action, inline, allocated, settled)
 					SELECT bucket_name, project_id, interval_start, interval_seconds, action, inline, allocated, settled FROM rollups_to_move
 					RETURNING *
-				)
-				SELECT count(*) FROM moved_rollups
+				)
+				SELECT count(*) FROM moved_rollups
 			`, before, batchSize)
 
 			var rowCount int
@@ -677,8 +677,8 @@ func (db *ProjectAccounting) ArchiveRollupsBefore(ctx context.Context, before ti
 				INSERT INTO bucket_bandwidth_rollup_archives(bucket_name, project_id, interval_start, interval_seconds, action, inline, allocated, settled)
 				SELECT bucket_name, project_id, interval_start, interval_seconds, action, inline, allocated, settled FROM rollups_to_move
 				RETURNING *
-			)
-			SELECT count(*) FROM moved_rollups
+			)
+			SELECT count(*) FROM moved_rollups
 		`
 		row := db.db.DB.QueryRow(ctx, bwStatement, before)
 		var rowCount int
@@ -697,7 +697,7 @@ func (db *ProjectAccounting) getBucketsSinceAndBefore(ctx context.Context, proje
 	bucketsQuery := db.db.Rebind(`SELECT DISTINCT bucket_name FROM bucket_storage_tallies
 		WHERE project_id = ?
-		AND interval_start >= ?
+		AND interval_start >= ?
 		AND interval_start <= ?`)
 
 	bucketRows, err := db.db.QueryContext(ctx, bucketsQuery, projectID[:], since, before)
 	if err != nil {
@@ -773,7 +773,7 @@ func (db *ProjectAccounting) GetRollupsSince(ctx context.Context, since time.Tim
 				Settled:       int64(dbxRollup.Settled),
 			})
 		}
-		if len(dbxRollups) < pageLimit {
+		if cursor == nil {
 			return bwRollups, nil
 		}
 	}
@@ -811,7 +811,7 @@ func (db *ProjectAccounting) GetArchivedRollupsSince(ctx context.Context, since
 				Settled:       int64(dbxRollup.Settled),
 			})
 		}
-		if len(dbxRollups) < pageLimit {
+		if cursor == nil {
 			return bwRollups, nil
 		}
 	}
diff --git a/satellite/satellitedb/storagenodeaccounting.go b/satellite/satellitedb/storagenodeaccounting.go
index a5372c6cf..8ec534522 100644
--- a/satellite/satellitedb/storagenodeaccounting.go
+++ b/satellite/satellitedb/storagenodeaccounting.go
@@ -157,7 +157,7 @@ func (db *StoragenodeAccounting) getBandwidthByNodeSince(ctx context.Context, la
 				return err
 			}
 		}
-		if len(rollups) < pageLimit {
+		if cursor == nil {
 			return nil
 		}
 	}
@@ -196,7 +196,7 @@ func (db *StoragenodeAccounting) getBandwidthPhase2ByNodeSince(ctx context.Conte
 				return err
 			}
 		}
-		if len(rollups) < pageLimit {
+		if cursor == nil {
 			return nil
 		}
 	}
@@ -611,7 +611,7 @@ func (db *StoragenodeAccounting) GetRollupsSince(ctx context.Context, since time
 				Settled:      dbxRollup.Settled,
 			})
 		}
-		if len(dbxRollups) < pageLimit {
+		if cursor == nil {
 			return bwRollups, nil
 		}
 	}
@@ -647,7 +647,7 @@ func (db *StoragenodeAccounting) GetArchivedRollupsSince(ctx context.Context, si
 				Settled:      dbxRollup.Settled,
 			})
 		}
-		if len(dbxRollups) < pageLimit {
+		if cursor == nil {
 			return bwRollups, nil
 		}
 	}
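The repeated `len(...) < pageLimit` → `cursor == nil` substitutions in the last two files all encode the same termination rule. In the generated Paged_Node above, `next` is set only when the page contained at least one row, so a nil continuation means the result set is exhausted; stopping on `cursor == nil` keeps iterating through short-but-nonempty pages instead of wrongly treating them as the end. A minimal sketch of the drain loop under that assumption — drainPaged, row, and continuation are illustrative stand-ins, not dbx-generated names:

    package db

    import "context"

    // row and continuation are hypothetical stand-ins for dbx-generated
    // types such as dbx.Node and dbx.Paged_Node_Continuation.
    type row struct{}
    type continuation struct{}

    // drainPaged consumes a dbx-style paged query until its continuation
    // is exhausted. Stopping on a nil continuation, not on a short page,
    // mirrors the change above: a page smaller than the limit may still
    // be followed by more rows.
    func drainPaged(ctx context.Context,
    	pagedFn func(ctx context.Context, limit int, start *continuation) ([]*row, *continuation, error),
    	cb func(*row) error,
    ) error {
    	const pageLimit = 1000

    	var cursor *continuation
    	for {
    		rows, next, err := pagedFn(ctx, pageLimit, cursor)
    		if err != nil {
    			return err
    		}
    		for _, r := range rows {
    			if err := cb(r); err != nil {
    				return err
    			}
    		}
    		if next == nil {
    			return nil // nil continuation signals exhaustion
    		}
    		cursor = next
    	}
    }

IterateAllNodeDossiers in overlaycache.go follows exactly this shape, with Paged_Node as the paged query and convertDBNode plus the callback in place of cb.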