satellite/overlay: Return NodeLastContact instead of a node dossier from

overlay.GetOfflineNodesLimited

We only care about node ID, address, and last contact success/failure
from the downtime service, so the overlay should only return these
values for the downtime-specific queries.

Change-Id: I08a6ecfdd2a12b82cae62e87d6adeab53975bfce
This commit is contained in:
Moby von Briesen 2020-01-06 15:06:05 -05:00
parent 4203e25c54
commit 6c2e4cc0cd
5 changed files with 42 additions and 33 deletions

View File

@ -90,7 +90,7 @@ type DB interface {
// GetSuccesfulNodesNotCheckedInSince returns all nodes that last check-in was successful, but haven't checked-in within a given duration.
GetSuccesfulNodesNotCheckedInSince(ctx context.Context, duration time.Duration) (nodeAddresses []NodeLastContact, err error)
// GetOfflineNodesLimited returns a list of the first N offline nodes ordered by least recently contacted.
GetOfflineNodesLimited(ctx context.Context, limit int) ([]*pb.Node, error)
GetOfflineNodesLimited(ctx context.Context, limit int) ([]NodeLastContact, error)
}
// NodeCheckInInfo contains all the info that will be updated when a node checkins

View File

@ -579,18 +579,18 @@ func TestCache_DowntimeTracking(t *testing.T) {
require.NoError(t, err)
require.Len(t, nodes, 4)
// order of nodes should be least recently checked first
require.Equal(t, allIDs[2], nodes[0].Id)
require.Equal(t, allIDs[4], nodes[1].Id)
require.Equal(t, allIDs[6], nodes[2].Id)
require.Equal(t, allIDs[8], nodes[3].Id)
require.Equal(t, allIDs[2], nodes[0].ID)
require.Equal(t, allIDs[4], nodes[1].ID)
require.Equal(t, allIDs[6], nodes[2].ID)
require.Equal(t, allIDs[8], nodes[3].ID)
// test with limit
nodes, err = cache.GetOfflineNodesLimited(ctx, 2)
require.NoError(t, err)
require.Len(t, nodes, 2)
// order of nodes should be least recently checked first
require.Equal(t, allIDs[2], nodes[0].Id)
require.Equal(t, allIDs[4], nodes[1].Id)
require.Equal(t, allIDs[2], nodes[0].ID)
require.Equal(t, allIDs[4], nodes[1].ID)
})
}

View File

@ -211,7 +211,7 @@ read all (
)
read limitoffset (
select node
select node.id node.address node.last_contact_success node.last_contact_failure
where node.last_contact_success < node.last_contact_failure
where node.disqualified = null
orderby asc node.last_contact_failure

View File

@ -8216,12 +8216,12 @@ func (obj *postgresImpl) All_Node_Id_Node_PieceCount_By_PieceCount_Not_Number(ct
}
func (obj *postgresImpl) Limited_Node_By_LastContactSuccess_Less_LastContactFailure_And_Disqualified_Is_Null_OrderBy_Asc_LastContactFailure(ctx context.Context,
func (obj *postgresImpl) Limited_Node_Id_Node_Address_Node_LastContactSuccess_Node_LastContactFailure_By_LastContactSuccess_Less_LastContactFailure_And_Disqualified_Is_Null_OrderBy_Asc_LastContactFailure(ctx context.Context,
limit int, offset int64) (
rows []*Node, err error) {
rows []*Id_Address_LastContactSuccess_LastContactFailure_Row, err error) {
defer mon.Task()(&ctx)(&err)
var __embed_stmt = __sqlbundle_Literal("SELECT nodes.id, nodes.address, nodes.last_net, nodes.protocol, nodes.type, nodes.email, nodes.wallet, nodes.free_bandwidth, nodes.free_disk, nodes.piece_count, nodes.major, nodes.minor, nodes.patch, nodes.hash, nodes.timestamp, nodes.release, nodes.latency_90, nodes.audit_success_count, nodes.total_audit_count, nodes.uptime_success_count, nodes.total_uptime_count, nodes.created_at, nodes.updated_at, nodes.last_contact_success, nodes.last_contact_failure, nodes.contained, nodes.disqualified, nodes.audit_reputation_alpha, nodes.audit_reputation_beta, nodes.uptime_reputation_alpha, nodes.uptime_reputation_beta, nodes.exit_initiated_at, nodes.exit_loop_completed_at, nodes.exit_finished_at, nodes.exit_success FROM nodes WHERE nodes.last_contact_success < nodes.last_contact_failure AND nodes.disqualified is NULL ORDER BY nodes.last_contact_failure LIMIT ? OFFSET ?")
var __embed_stmt = __sqlbundle_Literal("SELECT nodes.id, nodes.address, nodes.last_contact_success, nodes.last_contact_failure FROM nodes WHERE nodes.last_contact_success < nodes.last_contact_failure AND nodes.disqualified is NULL ORDER BY nodes.last_contact_failure LIMIT ? OFFSET ?")
var __values []interface{}
__values = append(__values)
@ -8238,12 +8238,12 @@ func (obj *postgresImpl) Limited_Node_By_LastContactSuccess_Less_LastContactFail
defer __rows.Close()
for __rows.Next() {
node := &Node{}
err = __rows.Scan(&node.Id, &node.Address, &node.LastNet, &node.Protocol, &node.Type, &node.Email, &node.Wallet, &node.FreeBandwidth, &node.FreeDisk, &node.PieceCount, &node.Major, &node.Minor, &node.Patch, &node.Hash, &node.Timestamp, &node.Release, &node.Latency90, &node.AuditSuccessCount, &node.TotalAuditCount, &node.UptimeSuccessCount, &node.TotalUptimeCount, &node.CreatedAt, &node.UpdatedAt, &node.LastContactSuccess, &node.LastContactFailure, &node.Contained, &node.Disqualified, &node.AuditReputationAlpha, &node.AuditReputationBeta, &node.UptimeReputationAlpha, &node.UptimeReputationBeta, &node.ExitInitiatedAt, &node.ExitLoopCompletedAt, &node.ExitFinishedAt, &node.ExitSuccess)
row := &Id_Address_LastContactSuccess_LastContactFailure_Row{}
err = __rows.Scan(&row.Id, &row.Address, &row.LastContactSuccess, &row.LastContactFailure)
if err != nil {
return nil, obj.makeErr(err)
}
rows = append(rows, node)
rows = append(rows, row)
}
if err := __rows.Err(); err != nil {
return nil, obj.makeErr(err)
@ -13342,12 +13342,12 @@ func (obj *cockroachImpl) All_Node_Id_Node_PieceCount_By_PieceCount_Not_Number(c
}
func (obj *cockroachImpl) Limited_Node_By_LastContactSuccess_Less_LastContactFailure_And_Disqualified_Is_Null_OrderBy_Asc_LastContactFailure(ctx context.Context,
func (obj *cockroachImpl) Limited_Node_Id_Node_Address_Node_LastContactSuccess_Node_LastContactFailure_By_LastContactSuccess_Less_LastContactFailure_And_Disqualified_Is_Null_OrderBy_Asc_LastContactFailure(ctx context.Context,
limit int, offset int64) (
rows []*Node, err error) {
rows []*Id_Address_LastContactSuccess_LastContactFailure_Row, err error) {
defer mon.Task()(&ctx)(&err)
var __embed_stmt = __sqlbundle_Literal("SELECT nodes.id, nodes.address, nodes.last_net, nodes.protocol, nodes.type, nodes.email, nodes.wallet, nodes.free_bandwidth, nodes.free_disk, nodes.piece_count, nodes.major, nodes.minor, nodes.patch, nodes.hash, nodes.timestamp, nodes.release, nodes.latency_90, nodes.audit_success_count, nodes.total_audit_count, nodes.uptime_success_count, nodes.total_uptime_count, nodes.created_at, nodes.updated_at, nodes.last_contact_success, nodes.last_contact_failure, nodes.contained, nodes.disqualified, nodes.audit_reputation_alpha, nodes.audit_reputation_beta, nodes.uptime_reputation_alpha, nodes.uptime_reputation_beta, nodes.exit_initiated_at, nodes.exit_loop_completed_at, nodes.exit_finished_at, nodes.exit_success FROM nodes WHERE nodes.last_contact_success < nodes.last_contact_failure AND nodes.disqualified is NULL ORDER BY nodes.last_contact_failure LIMIT ? OFFSET ?")
var __embed_stmt = __sqlbundle_Literal("SELECT nodes.id, nodes.address, nodes.last_contact_success, nodes.last_contact_failure FROM nodes WHERE nodes.last_contact_success < nodes.last_contact_failure AND nodes.disqualified is NULL ORDER BY nodes.last_contact_failure LIMIT ? OFFSET ?")
var __values []interface{}
__values = append(__values)
@ -13364,12 +13364,12 @@ func (obj *cockroachImpl) Limited_Node_By_LastContactSuccess_Less_LastContactFai
defer __rows.Close()
for __rows.Next() {
node := &Node{}
err = __rows.Scan(&node.Id, &node.Address, &node.LastNet, &node.Protocol, &node.Type, &node.Email, &node.Wallet, &node.FreeBandwidth, &node.FreeDisk, &node.PieceCount, &node.Major, &node.Minor, &node.Patch, &node.Hash, &node.Timestamp, &node.Release, &node.Latency90, &node.AuditSuccessCount, &node.TotalAuditCount, &node.UptimeSuccessCount, &node.TotalUptimeCount, &node.CreatedAt, &node.UpdatedAt, &node.LastContactSuccess, &node.LastContactFailure, &node.Contained, &node.Disqualified, &node.AuditReputationAlpha, &node.AuditReputationBeta, &node.UptimeReputationAlpha, &node.UptimeReputationBeta, &node.ExitInitiatedAt, &node.ExitLoopCompletedAt, &node.ExitFinishedAt, &node.ExitSuccess)
row := &Id_Address_LastContactSuccess_LastContactFailure_Row{}
err = __rows.Scan(&row.Id, &row.Address, &row.LastContactSuccess, &row.LastContactFailure)
if err != nil {
return nil, obj.makeErr(err)
}
rows = append(rows, node)
rows = append(rows, row)
}
if err := __rows.Err(); err != nil {
return nil, obj.makeErr(err)
@ -18514,14 +18514,14 @@ func (rx *Rx) Limited_Node_By_Id_GreaterOrEqual_OrderBy_Asc_Id(ctx context.Conte
return tx.Limited_Node_By_Id_GreaterOrEqual_OrderBy_Asc_Id(ctx, node_id_greater_or_equal, limit, offset)
}
func (rx *Rx) Limited_Node_By_LastContactSuccess_Less_LastContactFailure_And_Disqualified_Is_Null_OrderBy_Asc_LastContactFailure(ctx context.Context,
func (rx *Rx) Limited_Node_Id_Node_Address_Node_LastContactSuccess_Node_LastContactFailure_By_LastContactSuccess_Less_LastContactFailure_And_Disqualified_Is_Null_OrderBy_Asc_LastContactFailure(ctx context.Context,
limit int, offset int64) (
rows []*Node, err error) {
rows []*Id_Address_LastContactSuccess_LastContactFailure_Row, err error) {
var tx *Tx
if tx, err = rx.getTx(ctx); err != nil {
return
}
return tx.Limited_Node_By_LastContactSuccess_Less_LastContactFailure_And_Disqualified_Is_Null_OrderBy_Asc_LastContactFailure(ctx, limit, offset)
return tx.Limited_Node_Id_Node_Address_Node_LastContactSuccess_Node_LastContactFailure_By_LastContactSuccess_Less_LastContactFailure_And_Disqualified_Is_Null_OrderBy_Asc_LastContactFailure(ctx, limit, offset)
}
func (rx *Rx) Limited_Node_Id_Node_LastNet_Node_Address_Node_Protocol_By_Id_GreaterOrEqual_And_Disqualified_Is_Null_OrderBy_Asc_Id(ctx context.Context,
@ -19403,9 +19403,9 @@ type Methods interface {
limit int, offset int64) (
rows []*Node, err error)
Limited_Node_By_LastContactSuccess_Less_LastContactFailure_And_Disqualified_Is_Null_OrderBy_Asc_LastContactFailure(ctx context.Context,
Limited_Node_Id_Node_Address_Node_LastContactSuccess_Node_LastContactFailure_By_LastContactSuccess_Less_LastContactFailure_And_Disqualified_Is_Null_OrderBy_Asc_LastContactFailure(ctx context.Context,
limit int, offset int64) (
rows []*Node, err error)
rows []*Id_Address_LastContactSuccess_LastContactFailure_Row, err error)
Limited_Node_Id_Node_LastNet_Node_Address_Node_Protocol_By_Id_GreaterOrEqual_And_Disqualified_Is_Null_OrderBy_Asc_Id(ctx context.Context,
node_id_greater_or_equal Node_Id_Field,

View File

@ -1068,22 +1068,31 @@ func populateExitStatusFields(req *overlay.ExitStatusRequest) dbx.Node_Update_Fi
}
// GetOfflineNodesLimited returns a list of the first N offline nodes ordered by least recently contacted.
func (cache *overlaycache) GetOfflineNodesLimited(ctx context.Context, limit int) (nodes []*pb.Node, err error) {
func (cache *overlaycache) GetOfflineNodesLimited(ctx context.Context, limit int) (nodeLastContacts []overlay.NodeLastContact, err error) {
defer mon.Task()(&ctx)(&err)
rows, err := cache.db.DB.Limited_Node_By_LastContactSuccess_Less_LastContactFailure_And_Disqualified_Is_Null_OrderBy_Asc_LastContactFailure(ctx,
limit, 0)
dbxNodes, err := cache.db.DB.Limited_Node_Id_Node_Address_Node_LastContactSuccess_Node_LastContactFailure_By_LastContactSuccess_Less_LastContactFailure_And_Disqualified_Is_Null_OrderBy_Asc_LastContactFailure(
ctx, limit, 0)
if err != nil {
return nil, Error.Wrap(err)
}
for _, row := range rows {
nextNode, err := convertDBNode(ctx, row)
for _, node := range dbxNodes {
nodeID, err := storj.NodeIDFromBytes(node.Id)
if err != nil {
return nil, Error.Wrap(err)
return nil, err
}
nodes = append(nodes, &nextNode.Node)
nodeLastContact := overlay.NodeLastContact{
ID: nodeID,
Address: node.Address,
LastContactSuccess: node.LastContactSuccess.UTC(),
LastContactFailure: node.LastContactFailure.UTC(),
}
nodeLastContacts = append(nodeLastContacts, nodeLastContact)
}
return nodes, nil
return nodeLastContacts, nil
}
func convertDBNode(ctx context.Context, info *dbx.Node) (_ *overlay.NodeDossier, err error) {