compensation: always generate invoices for every node
Instead of generating invoices only for nodes that had some activity, we generate one for every node, so that we can find and pay terminal nodes that did not meet thresholds before we recognized them as terminal.

Change-Id: Ibb3433e1b35f1ddcfbe292c034238c9fa1b66c44
parent 035c393da0
commit a65aecfd98
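In outline, the first two hunks below replace "invoice whatever had usage" with "index the usage, then invoice everything". A minimal runnable sketch of that flow; NodeID, PeriodUsage, Node, and buildInvoices here are simplified hypothetical stand-ins for the satellite's real types, not the storj.io APIs:

package main

import "fmt"

// Hypothetical stand-ins for the satellite's real types.
type NodeID string

type PeriodUsage struct {
	NodeID NodeID
	AtRest float64
}

type Node struct {
	ID     NodeID
	Wallet string
}

// buildInvoices mirrors the new flow: index period usage by node, then emit
// one invoice per known node. A node with no activity falls back to the map's
// zero value, so terminal nodes below payout thresholds still get an invoice.
func buildInvoices(allNodes []Node, periodUsage []PeriodUsage) []string {
	usageByNode := make(map[NodeID]PeriodUsage, len(periodUsage))
	for _, u := range periodUsage {
		usageByNode[u.NodeID] = u
	}

	invoices := make([]string, 0, len(allNodes))
	for _, node := range allNodes {
		u := usageByNode[node.ID] // zero value when the node had no usage
		invoices = append(invoices, fmt.Sprintf("node=%s wallet=%s at_rest=%.1f", node.ID, node.Wallet, u.AtRest))
	}
	return invoices
}

func main() {
	nodes := []Node{{ID: "n1", Wallet: "0xaaa"}, {ID: "n2", Wallet: "0xbbb"}}
	usage := []PeriodUsage{{NodeID: "n1", AtRest: 42}}
	for _, inv := range buildInvoices(nodes, usage) {
		fmt.Println(inv) // n2 appears with zero usage
	}
}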
@@ -12,7 +12,10 @@ import (
 	"github.com/zeebo/errs"
 	"go.uber.org/zap"
 
+	"storj.io/common/storj"
+	"storj.io/storj/satellite/accounting"
 	"storj.io/storj/satellite/compensation"
+	"storj.io/storj/satellite/overlay"
 	"storj.io/storj/satellite/satellitedb"
 )
 
@@ -48,36 +51,49 @@ func generateInvoicesCSV(ctx context.Context, period compensation.Period, out io
 		return err
 	}
 
-	invoices := make([]compensation.Invoice, 0, len(periodUsage))
+	periodUsageByNode := make(map[storj.NodeID]accounting.StorageNodePeriodUsage, len(periodUsage))
 	for _, usage := range periodUsage {
-		totalAmounts, err := db.Compensation().QueryTotalAmounts(ctx, usage.NodeID)
+		periodUsageByNode[usage.NodeID] = usage
+	}
+
+	var allNodes []*overlay.NodeDossier
+	err = db.OverlayCache().IterateAllNodeDossiers(ctx,
+		func(ctx context.Context, node *overlay.NodeDossier) error {
+			allNodes = append(allNodes, node)
+			return nil
+		})
+	if err != nil {
+		return err
+	}
+
+	invoices := make([]compensation.Invoice, 0, len(allNodes))
+	for _, node := range allNodes {
+		totalAmounts, err := db.Compensation().QueryTotalAmounts(ctx, node.Id)
 		if err != nil {
 			return err
 		}
 
-		node, err := db.OverlayCache().Get(ctx, usage.NodeID)
-		if err != nil {
-			zap.L().Warn("failed to get node, skipping", zap.String("nodeID", usage.NodeID.String()), zap.Error(err))
-			continue
-		}
 		var gracefulExit *time.Time
 		if node.ExitStatus.ExitSuccess {
 			gracefulExit = node.ExitStatus.ExitFinishedAt
 		}
 		nodeAddress, _, err := net.SplitHostPort(node.Address.Address)
 		if err != nil {
-			return errs.New("unable to split node %q address %q", usage.NodeID, node.Address.Address)
+			return errs.New("unable to split node %q address %q", node.Id, node.Address.Address)
 		}
 		var nodeLastIP string
 		if node.LastIPPort != "" {
 			nodeLastIP, _, err = net.SplitHostPort(node.LastIPPort)
 			if err != nil {
-				return errs.New("unable to split node %q last ip:port %q", usage.NodeID, node.LastIPPort)
+				return errs.New("unable to split node %q last ip:port %q", node.Id, node.LastIPPort)
 			}
 		}
 
+		// the zero value of period usage is acceptable if the node does not have
+		// any usage for the period.
+		usage := periodUsageByNode[node.Id]
 		nodeInfo := compensation.NodeInfo{
-			ID:                 usage.NodeID,
+			ID:                 node.Id,
 			CreatedAt:          node.CreatedAt,
 			LastContactSuccess: node.Reputation.LastContactSuccess,
 			Disqualified:       node.Disqualified,
@@ -96,7 +112,7 @@ func generateInvoicesCSV(ctx context.Context, period compensation.Period, out io
 
 		invoice := compensation.Invoice{
 			Period:             period,
-			NodeID:             compensation.NodeID(usage.NodeID),
+			NodeID:             compensation.NodeID(node.Id),
 			NodeWallet:         node.Operator.Wallet,
 			NodeWalletFeatures: node.Operator.WalletFeatures,
 			NodeAddress:        nodeAddress,
@@ -106,6 +106,8 @@ type DB interface {
 
 	// IterateAllNodes will call cb on all known nodes (used in restore trash contexts).
 	IterateAllNodes(context.Context, func(context.Context, *SelectedNode) error) error
+	// IterateAllNodeDossiers will call cb on all known nodes (used for invoice generation).
+	IterateAllNodeDossiers(context.Context, func(context.Context, *NodeDossier) error) error
 }
 
 // NodeCheckInInfo contains all the info that will be updated when a node checkins.
@@ -144,7 +144,7 @@ model node (
 	field vetted_at timestamp ( updatable, nullable )
 	field uptime_success_count int64 ( updatable, default 0 )
 	field total_uptime_count int64 ( updatable, default 0 )
 
 	field created_at timestamp ( autoinsert, default current_timestamp )
 	field updated_at timestamp ( autoinsert, autoupdate, default current_timestamp )
 	field last_contact_success timestamp ( updatable, default "epoch" )
@@ -192,9 +192,13 @@ read all (
 	select node.id
 )
 
+read paged (
+	select node
+)
+
 read all (
 	select node.id node.piece_count
 	where node.piece_count != 0
 )
 
 //--- audit history ---//
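For readers unfamiliar with dbx: the read paged ( select node ) declaration above is what drives the generated code in the hunks that follow. From it, dbx emits a Paged_Node_Continuation token type and a Paged_Node method on each backend, with roughly this signature (as it appears later in this diff, in the Methods interface):

	Paged_Node(ctx context.Context, limit int, start *Paged_Node_Continuation) (
		rows []*Node, next *Paged_Node_Continuation, err error)

Callers pass nil as the start token for the first page and feed next back in until it comes back nil, which is exactly what the new IterateAllNodeDossiers below does.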
@@ -9146,6 +9146,11 @@ type Paged_BucketBandwidthRollup_By_IntervalStart_GreaterOrEqual_Continuation st
 	_set   bool
 }
 
+type Paged_Node_Continuation struct {
+	_value_id []byte
+	_set      bool
+}
+
 type Paged_StoragenodeBandwidthRollupArchive_By_IntervalStart_GreaterOrEqual_Continuation struct {
 	_value_storagenode_id []byte
 	_value_interval_start time.Time
@@ -10293,6 +10298,65 @@ func (obj *pgxImpl) All_Node_Id(ctx context.Context) (
 
 }
 
+func (obj *pgxImpl) Paged_Node(ctx context.Context,
+	limit int, start *Paged_Node_Continuation) (
+	rows []*Node, next *Paged_Node_Continuation, err error) {
+	defer mon.Task()(&ctx)(&err)
+
+	var __embed_stmt = __sqlbundle_Literal("SELECT nodes.id, nodes.address, nodes.last_net, nodes.last_ip_port, nodes.protocol, nodes.type, nodes.email, nodes.wallet, nodes.wallet_features, nodes.free_disk, nodes.piece_count, nodes.major, nodes.minor, nodes.patch, nodes.hash, nodes.timestamp, nodes.release, nodes.latency_90, nodes.audit_success_count, nodes.total_audit_count, nodes.vetted_at, nodes.uptime_success_count, nodes.total_uptime_count, nodes.created_at, nodes.updated_at, nodes.last_contact_success, nodes.last_contact_failure, nodes.contained, nodes.disqualified, nodes.suspended, nodes.unknown_audit_suspended, nodes.offline_suspended, nodes.under_review, nodes.online_score, nodes.audit_reputation_alpha, nodes.audit_reputation_beta, nodes.unknown_audit_reputation_alpha, nodes.unknown_audit_reputation_beta, nodes.exit_initiated_at, nodes.exit_loop_completed_at, nodes.exit_finished_at, nodes.exit_success, nodes.id FROM nodes WHERE (nodes.id) > ? ORDER BY nodes.id LIMIT ?")
+
+	var __embed_first_stmt = __sqlbundle_Literal("SELECT nodes.id, nodes.address, nodes.last_net, nodes.last_ip_port, nodes.protocol, nodes.type, nodes.email, nodes.wallet, nodes.wallet_features, nodes.free_disk, nodes.piece_count, nodes.major, nodes.minor, nodes.patch, nodes.hash, nodes.timestamp, nodes.release, nodes.latency_90, nodes.audit_success_count, nodes.total_audit_count, nodes.vetted_at, nodes.uptime_success_count, nodes.total_uptime_count, nodes.created_at, nodes.updated_at, nodes.last_contact_success, nodes.last_contact_failure, nodes.contained, nodes.disqualified, nodes.suspended, nodes.unknown_audit_suspended, nodes.offline_suspended, nodes.under_review, nodes.online_score, nodes.audit_reputation_alpha, nodes.audit_reputation_beta, nodes.unknown_audit_reputation_alpha, nodes.unknown_audit_reputation_beta, nodes.exit_initiated_at, nodes.exit_loop_completed_at, nodes.exit_finished_at, nodes.exit_success, nodes.id FROM nodes ORDER BY nodes.id LIMIT ?")
+
+	var __values []interface{}
+
+	var __stmt string
+	if start != nil && start._set {
+		__values = append(__values, start._value_id, limit)
+		__stmt = __sqlbundle_Render(obj.dialect, __embed_stmt)
+	} else {
+		__values = append(__values, limit)
+		__stmt = __sqlbundle_Render(obj.dialect, __embed_first_stmt)
+	}
+	obj.logStmt(__stmt, __values...)
+
+	for {
+		rows, next, err = func() (rows []*Node, next *Paged_Node_Continuation, err error) {
+			__rows, err := obj.driver.QueryContext(ctx, __stmt, __values...)
+			if err != nil {
+				return nil, nil, err
+			}
+			defer __rows.Close()
+
+			var __continuation Paged_Node_Continuation
+			__continuation._set = true
+
+			for __rows.Next() {
+				node := &Node{}
+				err = __rows.Scan(&node.Id, &node.Address, &node.LastNet, &node.LastIpPort, &node.Protocol, &node.Type, &node.Email, &node.Wallet, &node.WalletFeatures, &node.FreeDisk, &node.PieceCount, &node.Major, &node.Minor, &node.Patch, &node.Hash, &node.Timestamp, &node.Release, &node.Latency90, &node.AuditSuccessCount, &node.TotalAuditCount, &node.VettedAt, &node.UptimeSuccessCount, &node.TotalUptimeCount, &node.CreatedAt, &node.UpdatedAt, &node.LastContactSuccess, &node.LastContactFailure, &node.Contained, &node.Disqualified, &node.Suspended, &node.UnknownAuditSuspended, &node.OfflineSuspended, &node.UnderReview, &node.OnlineScore, &node.AuditReputationAlpha, &node.AuditReputationBeta, &node.UnknownAuditReputationAlpha, &node.UnknownAuditReputationBeta, &node.ExitInitiatedAt, &node.ExitLoopCompletedAt, &node.ExitFinishedAt, &node.ExitSuccess, &__continuation._value_id)
+				if err != nil {
+					return nil, nil, err
+				}
+				rows = append(rows, node)
+				next = &__continuation
+			}
+
+			if err := __rows.Err(); err != nil {
+				return nil, nil, err
+			}
+
+			return rows, next, nil
+		}()
+		if err != nil {
+			if obj.shouldRetry(err) {
+				continue
+			}
+			return nil, nil, obj.makeErr(err)
+		}
+		return rows, next, nil
+	}
+
+}
+
 func (obj *pgxImpl) All_Node_Id_Node_PieceCount_By_PieceCount_Not_Number(ctx context.Context) (
 	rows []*Id_PieceCount_Row, err error) {
 	defer mon.Task()(&ctx)(&err)
@@ -15727,6 +15791,65 @@ func (obj *pgxcockroachImpl) All_Node_Id(ctx context.Context) (
 
 }
 
+func (obj *pgxcockroachImpl) Paged_Node(ctx context.Context,
+	limit int, start *Paged_Node_Continuation) (
+	rows []*Node, next *Paged_Node_Continuation, err error) {
+	defer mon.Task()(&ctx)(&err)
+
+	var __embed_stmt = __sqlbundle_Literal("SELECT nodes.id, nodes.address, nodes.last_net, nodes.last_ip_port, nodes.protocol, nodes.type, nodes.email, nodes.wallet, nodes.wallet_features, nodes.free_disk, nodes.piece_count, nodes.major, nodes.minor, nodes.patch, nodes.hash, nodes.timestamp, nodes.release, nodes.latency_90, nodes.audit_success_count, nodes.total_audit_count, nodes.vetted_at, nodes.uptime_success_count, nodes.total_uptime_count, nodes.created_at, nodes.updated_at, nodes.last_contact_success, nodes.last_contact_failure, nodes.contained, nodes.disqualified, nodes.suspended, nodes.unknown_audit_suspended, nodes.offline_suspended, nodes.under_review, nodes.online_score, nodes.audit_reputation_alpha, nodes.audit_reputation_beta, nodes.unknown_audit_reputation_alpha, nodes.unknown_audit_reputation_beta, nodes.exit_initiated_at, nodes.exit_loop_completed_at, nodes.exit_finished_at, nodes.exit_success, nodes.id FROM nodes WHERE (nodes.id) > ? ORDER BY nodes.id LIMIT ?")
+
+	var __embed_first_stmt = __sqlbundle_Literal("SELECT nodes.id, nodes.address, nodes.last_net, nodes.last_ip_port, nodes.protocol, nodes.type, nodes.email, nodes.wallet, nodes.wallet_features, nodes.free_disk, nodes.piece_count, nodes.major, nodes.minor, nodes.patch, nodes.hash, nodes.timestamp, nodes.release, nodes.latency_90, nodes.audit_success_count, nodes.total_audit_count, nodes.vetted_at, nodes.uptime_success_count, nodes.total_uptime_count, nodes.created_at, nodes.updated_at, nodes.last_contact_success, nodes.last_contact_failure, nodes.contained, nodes.disqualified, nodes.suspended, nodes.unknown_audit_suspended, nodes.offline_suspended, nodes.under_review, nodes.online_score, nodes.audit_reputation_alpha, nodes.audit_reputation_beta, nodes.unknown_audit_reputation_alpha, nodes.unknown_audit_reputation_beta, nodes.exit_initiated_at, nodes.exit_loop_completed_at, nodes.exit_finished_at, nodes.exit_success, nodes.id FROM nodes ORDER BY nodes.id LIMIT ?")
+
+	var __values []interface{}
+
+	var __stmt string
+	if start != nil && start._set {
+		__values = append(__values, start._value_id, limit)
+		__stmt = __sqlbundle_Render(obj.dialect, __embed_stmt)
+	} else {
+		__values = append(__values, limit)
+		__stmt = __sqlbundle_Render(obj.dialect, __embed_first_stmt)
+	}
+	obj.logStmt(__stmt, __values...)
+
+	for {
+		rows, next, err = func() (rows []*Node, next *Paged_Node_Continuation, err error) {
+			__rows, err := obj.driver.QueryContext(ctx, __stmt, __values...)
+			if err != nil {
+				return nil, nil, err
+			}
+			defer __rows.Close()
+
+			var __continuation Paged_Node_Continuation
+			__continuation._set = true
+
+			for __rows.Next() {
+				node := &Node{}
+				err = __rows.Scan(&node.Id, &node.Address, &node.LastNet, &node.LastIpPort, &node.Protocol, &node.Type, &node.Email, &node.Wallet, &node.WalletFeatures, &node.FreeDisk, &node.PieceCount, &node.Major, &node.Minor, &node.Patch, &node.Hash, &node.Timestamp, &node.Release, &node.Latency90, &node.AuditSuccessCount, &node.TotalAuditCount, &node.VettedAt, &node.UptimeSuccessCount, &node.TotalUptimeCount, &node.CreatedAt, &node.UpdatedAt, &node.LastContactSuccess, &node.LastContactFailure, &node.Contained, &node.Disqualified, &node.Suspended, &node.UnknownAuditSuspended, &node.OfflineSuspended, &node.UnderReview, &node.OnlineScore, &node.AuditReputationAlpha, &node.AuditReputationBeta, &node.UnknownAuditReputationAlpha, &node.UnknownAuditReputationBeta, &node.ExitInitiatedAt, &node.ExitLoopCompletedAt, &node.ExitFinishedAt, &node.ExitSuccess, &__continuation._value_id)
+				if err != nil {
+					return nil, nil, err
+				}
+				rows = append(rows, node)
+				next = &__continuation
+			}
+
+			if err := __rows.Err(); err != nil {
+				return nil, nil, err
+			}
+
+			return rows, next, nil
+		}()
+		if err != nil {
+			if obj.shouldRetry(err) {
+				continue
+			}
+			return nil, nil, obj.makeErr(err)
+		}
+		return rows, next, nil
+	}
+
+}
+
 func (obj *pgxcockroachImpl) All_Node_Id_Node_PieceCount_By_PieceCount_Not_Number(ctx context.Context) (
 	rows []*Id_PieceCount_Row, err error) {
 	defer mon.Task()(&ctx)(&err)
@@ -21263,6 +21386,16 @@ func (rx *Rx) Paged_BucketBandwidthRollup_By_IntervalStart_GreaterOrEqual(ctx co
 	return tx.Paged_BucketBandwidthRollup_By_IntervalStart_GreaterOrEqual(ctx, bucket_bandwidth_rollup_interval_start_greater_or_equal, limit, start)
 }
 
+func (rx *Rx) Paged_Node(ctx context.Context,
+	limit int, start *Paged_Node_Continuation) (
+	rows []*Node, next *Paged_Node_Continuation, err error) {
+	var tx *Tx
+	if tx, err = rx.getTx(ctx); err != nil {
+		return
+	}
+	return tx.Paged_Node(ctx, limit, start)
+}
+
 func (rx *Rx) Paged_StoragenodeBandwidthRollupArchive_By_IntervalStart_GreaterOrEqual(ctx context.Context,
 	storagenode_bandwidth_rollup_archive_interval_start_greater_or_equal StoragenodeBandwidthRollupArchive_IntervalStart_Field,
 	limit int, start *Paged_StoragenodeBandwidthRollupArchive_By_IntervalStart_GreaterOrEqual_Continuation) (
@@ -22074,6 +22207,10 @@ type Methods interface {
 		limit int, start *Paged_BucketBandwidthRollup_By_IntervalStart_GreaterOrEqual_Continuation) (
 		rows []*BucketBandwidthRollup, next *Paged_BucketBandwidthRollup_By_IntervalStart_GreaterOrEqual_Continuation, err error)
 
+	Paged_Node(ctx context.Context,
+		limit int, start *Paged_Node_Continuation) (
+		rows []*Node, next *Paged_Node_Continuation, err error)
+
 	Paged_StoragenodeBandwidthRollupArchive_By_IntervalStart_GreaterOrEqual(ctx context.Context,
 		storagenode_bandwidth_rollup_archive_interval_start_greater_or_equal StoragenodeBandwidthRollupArchive_IntervalStart_Field,
 		limit int, start *Paged_StoragenodeBandwidthRollupArchive_By_IntervalStart_GreaterOrEqual_Continuation) (
@@ -256,10 +256,10 @@ func (db *satelliteDB) PostgresMigration() *migrate.Migration {
 				exit_initiated_at timestamp with time zone,
 				exit_finished_at timestamp with time zone,
 				exit_success boolean NOT NULL DEFAULT FALSE,
-				last_ip_port text,
-				suspended timestamp with time zone,
 				unknown_audit_reputation_alpha double precision NOT NULL DEFAULT 1,
 				unknown_audit_reputation_beta double precision NOT NULL DEFAULT 0,
+				suspended timestamp with time zone,
+				last_ip_port text,
 				vetted_at timestamp with time zone,
 				PRIMARY KEY ( id )
 			);`,
@@ -1802,3 +1802,33 @@ func (cache *overlaycache) IterateAllNodes(ctx context.Context, cb func(context.
 
 	return rows.Err()
 }
+
+// IterateAllNodeDossiers will call cb on all known nodes (used for invoice generation).
+func (cache *overlaycache) IterateAllNodeDossiers(ctx context.Context, cb func(context.Context, *overlay.NodeDossier) error) (err error) {
+	defer mon.Task()(&ctx)(&err)
+
+	const nodesPerPage = 1000
+	var cont *dbx.Paged_Node_Continuation
+	var dbxNodes []*dbx.Node
+
+	for {
+		dbxNodes, cont, err = cache.db.Paged_Node(ctx, nodesPerPage, cont)
+		if err != nil {
+			return err
+		}
+
+		for _, node := range dbxNodes {
+			dossier, err := convertDBNode(ctx, node)
+			if err != nil {
+				return err
+			}
+			if err := cb(ctx, dossier); err != nil {
+				return err
+			}
+		}
+
+		if cont == nil {
+			return nil
+		}
+	}
+}
@@ -249,17 +249,17 @@ func (db *ProjectAccounting) GetProjectTotal(ctx context.Context, projectID uuid
 
 	storageQuery := db.db.Rebind(`
 		SELECT
 			bucket_storage_tallies.interval_start,
 			bucket_storage_tallies.inline,
 			bucket_storage_tallies.remote,
 			bucket_storage_tallies.object_count
 		FROM
 			bucket_storage_tallies
 		WHERE
 			bucket_storage_tallies.project_id = ? AND
 			bucket_storage_tallies.bucket_name = ? AND
 			bucket_storage_tallies.interval_start >= ? AND
 			bucket_storage_tallies.interval_start <= ?
 		ORDER BY bucket_storage_tallies.interval_start DESC
 	`)
 
@@ -320,14 +320,14 @@ func (db *ProjectAccounting) GetProjectTotal(ctx context.Context, projectID uuid
 // only process PieceAction_GET.
 func (db *ProjectAccounting) getTotalEgress(ctx context.Context, projectID uuid.UUID, since, before time.Time) (totalEgress int64, err error) {
 	totalEgressQuery := db.db.Rebind(`
 		SELECT
 			COALESCE(SUM(settled) + SUM(inline), 0)
 		FROM
 			bucket_bandwidth_rollups
 		WHERE
 			project_id = ? AND
 			interval_start >= ? AND
 			interval_start <= ? AND
 			action = ?;
 	`)
 
@@ -542,7 +542,7 @@ func (db *ProjectAccounting) GetBucketTotals(ctx context.Context, projectID uuid
 	}
 
 	var buckets []string
 	bucketsQuery := db.db.Rebind(`SELECT name FROM bucket_metainfos
 		WHERE project_id = ? AND ` + bucketNameRange + `ORDER BY name ASC LIMIT ? OFFSET ?`)
 
 	args = []interface{}{
@@ -645,15 +645,15 @@ func (db *ProjectAccounting) ArchiveRollupsBefore(ctx context.Context, before ti
 		for {
 			row := db.db.QueryRow(ctx, `
 				WITH rollups_to_move AS (
 					DELETE FROM bucket_bandwidth_rollups
 					WHERE interval_start <= $1
 					LIMIT $2 RETURNING *
 				), moved_rollups AS (
 					INSERT INTO bucket_bandwidth_rollup_archives(bucket_name, project_id, interval_start, interval_seconds, action, inline, allocated, settled)
 					SELECT bucket_name, project_id, interval_start, interval_seconds, action, inline, allocated, settled FROM rollups_to_move
 					RETURNING *
 				)
 				SELECT count(*) FROM moved_rollups
 			`, before, batchSize)
 
 			var rowCount int
@@ -677,8 +677,8 @@ func (db *ProjectAccounting) ArchiveRollupsBefore(ctx context.Context, before ti
 				INSERT INTO bucket_bandwidth_rollup_archives(bucket_name, project_id, interval_start, interval_seconds, action, inline, allocated, settled)
 				SELECT bucket_name, project_id, interval_start, interval_seconds, action, inline, allocated, settled FROM rollups_to_move
 				RETURNING *
 			)
 			SELECT count(*) FROM moved_rollups
 		`
 		row := db.db.DB.QueryRow(ctx, bwStatement, before)
 		var rowCount int
@@ -697,7 +697,7 @@ func (db *ProjectAccounting) getBucketsSinceAndBefore(ctx context.Context, proje
 	bucketsQuery := db.db.Rebind(`SELECT DISTINCT bucket_name
 		FROM bucket_storage_tallies
 		WHERE project_id = ?
 		AND interval_start >= ?
 		AND interval_start <= ?`)
 	bucketRows, err := db.db.QueryContext(ctx, bucketsQuery, projectID[:], since, before)
 	if err != nil {
@@ -773,7 +773,7 @@ func (db *ProjectAccounting) GetRollupsSince(ctx context.Context, since time.Tim
 				Settled: int64(dbxRollup.Settled),
 			})
 		}
-		if len(dbxRollups) < pageLimit {
+		if cursor == nil {
 			return bwRollups, nil
 		}
 	}
@@ -811,7 +811,7 @@ func (db *ProjectAccounting) GetArchivedRollupsSince(ctx context.Context, since
 				Settled: int64(dbxRollup.Settled),
 			})
 		}
-		if len(dbxRollups) < pageLimit {
+		if cursor == nil {
 			return bwRollups, nil
 		}
 	}
|
@ -157,7 +157,7 @@ func (db *StoragenodeAccounting) getBandwidthByNodeSince(ctx context.Context, la
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if len(rollups) < pageLimit {
|
if cursor == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -196,7 +196,7 @@ func (db *StoragenodeAccounting) getBandwidthPhase2ByNodeSince(ctx context.Conte
 				return err
 			}
 		}
-		if len(rollups) < pageLimit {
+		if cursor == nil {
 			return nil
 		}
 	}
@@ -611,7 +611,7 @@ func (db *StoragenodeAccounting) GetRollupsSince(ctx context.Context, since time
 				Settled: dbxRollup.Settled,
 			})
 		}
-		if len(dbxRollups) < pageLimit {
+		if cursor == nil {
 			return bwRollups, nil
 		}
 	}
@@ -647,7 +647,7 @@ func (db *StoragenodeAccounting) GetArchivedRollupsSince(ctx context.Context, si
 				Settled: dbxRollup.Settled,
 			})
 		}
-		if len(dbxRollups) < pageLimit {
+		if cursor == nil {
 			return bwRollups, nil
 		}
 	}
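The last six hunks all make the same substitution: the accounting pagination loops now stop when the continuation cursor comes back nil rather than when a page arrives shorter than pageLimit, making the continuation token the single signal for end-of-data. A minimal runnable sketch of that pattern; cursor, fetchPage, and data here are hypothetical stand-ins for the dbx Paged_* machinery, not the real API:

package main

import "fmt"

var data = []int{1, 2, 3, 4, 5, 6}

// cursor is a hypothetical continuation token, like dbx's Paged_*_Continuation.
type cursor struct{ next int }

// fetchPage returns up to limit items after c, and a nil cursor once the data
// is exhausted, mirroring how the generated Paged_* methods signal the final page.
func fetchPage(c *cursor, limit int) ([]int, *cursor) {
	start := 0
	if c != nil {
		start = c.next
	}
	end := start + limit
	if end >= len(data) {
		return data[start:], nil // final page: no continuation
	}
	return data[start:end], &cursor{next: end}
}

func main() {
	var c *cursor
	for {
		page, next := fetchPage(c, 3)
		fmt.Println(page)
		// Terminate on a nil continuation, as the diff above now does; a page
		// exactly as long as the limit does not by itself prove that more
		// data exists, so the short-page test is not the token of record.
		if next == nil {
			return
		}
		c = next
	}
}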