// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package satellitedb
import (
"context"
"database/sql"
"encoding/hex"
"fmt"
"sort"
"time"
"github.com/lib/pq"
"github.com/spacemonkeygo/monkit/v3"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/common/pb"
"storj.io/common/storj"
"storj.io/private/version"
"storj.io/storj/satellite/overlay"
"storj.io/storj/satellite/satellitedb/dbx"
)
var (
mon = monkit.Package()
)
var _ overlay.DB = (*overlaycache)(nil)
type overlaycache struct {
db *satelliteDB
}
// SelectAllStorageNodesUpload returns all nodes that qualify to store data, organized as reputable nodes and new nodes
func (cache *overlaycache) SelectAllStorageNodesUpload(ctx context.Context, selectionCfg overlay.NodeSelectionConfig) (reputable, new []*overlay.SelectedNode, err error) {
defer mon.Task()(&ctx)(&err)
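// A single query both filters qualified nodes and classifies each one as
// "new" or "reputable" via the computed isnew column, so callers get both
// lists in one round trip.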
query := `
SELECT id, address, last_net, last_ip_port, (total_audit_count < $1 OR total_uptime_count < $2) as isnew
FROM nodes
WHERE disqualified IS NULL
AND suspended IS NULL
AND exit_initiated_at IS NULL
AND type = $3
AND free_disk >= $4
AND last_contact_success > $5
`
args := []interface{}{
// $1, $2
selectionCfg.AuditCount, selectionCfg.UptimeCount,
// $3
int(pb.NodeType_STORAGE),
// $4
selectionCfg.MinimumDiskSpace.Int64(),
// $5
time.Now().Add(-selectionCfg.OnlineWindow),
}
if selectionCfg.MinimumVersion != "" {
version, err := version.NewSemVer(selectionCfg.MinimumVersion)
if err != nil {
return nil, nil, err
}
query += `AND (major > $6 OR (major = $7 AND (minor > $8 OR (minor = $9 AND patch >= $10)))) AND release`
args = append(args,
// $6 - $10
version.Major, version.Major, version.Minor, version.Minor, version.Patch,
)
}
rows, err := cache.db.Query(ctx, query, args...)
if err != nil {
return nil, nil, err
}
defer func() { err = errs.Combine(err, rows.Close()) }()
var reputableNodes []*overlay.SelectedNode
var newNodes []*overlay.SelectedNode
for rows.Next() {
var node overlay.SelectedNode
node.Address = &pb.NodeAddress{}
var lastIPPort sql.NullString
var isnew bool
err = rows.Scan(&node.ID, &node.Address.Address, &node.LastNet, &lastIPPort, &isnew)
if err != nil {
return nil, nil, err
}
if lastIPPort.Valid {
node.LastIPPort = lastIPPort.String
}
if isnew {
newNodes = append(newNodes, &node)
continue
}
reputableNodes = append(reputableNodes, &node)
}
return reputableNodes, newNodes, Error.Wrap(rows.Err())
}
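// A minimal SelectAllStorageNodesUpload caller sketch (hypothetical config
// variable cfg, shown for illustration only):
//
//   reputable, newNodes, err := cache.SelectAllStorageNodesUpload(ctx, cfg)
//   if err != nil {
//       return err
//   }
//   // newNodes are the qualified nodes that have not yet reached
//   // cfg.AuditCount audits or cfg.UptimeCount uptime checks.
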
// GetNodesNetwork returns the /24 subnet for each storage node; order is not guaranteed.
func (cache *overlaycache) GetNodesNetwork(ctx context.Context, nodeIDs []storj.NodeID) (nodeNets []string, err error) {
defer mon.Task()(&ctx)(&err)
var rows *sql.Rows
rows, err = cache.db.Query(ctx, cache.db.Rebind(`
SELECT last_net FROM nodes
WHERE id = any($1::bytea[])
`), postgresNodeIDList(nodeIDs),
)
if err != nil {
return nil, err
}
defer func() { err = errs.Combine(err, rows.Close()) }()
for rows.Next() {
var ip string
err = rows.Scan(&ip)
if err != nil {
return nil, err
}
nodeNets = append(nodeNets, ip)
}
return nodeNets, Error.Wrap(rows.Err())
}
// Get looks up the node by nodeID
func (cache *overlaycache) Get(ctx context.Context, id storj.NodeID) (_ *overlay.NodeDossier, err error) {
defer mon.Task()(&ctx)(&err)
if id.IsZero() {
return nil, overlay.ErrEmptyNode
}
node, err := cache.db.Get_Node_By_Id(ctx, dbx.Node_Id(id.Bytes()))
if err == sql.ErrNoRows {
return nil, overlay.ErrNodeNotFound.New("%v", id)
}
if err != nil {
return nil, err
}
return convertDBNode(ctx, node)
}
// GetOnlineNodesForGetDelete returns a map of online, non-disqualified nodes for the supplied nodeIDs
func (cache *overlaycache) GetOnlineNodesForGetDelete(ctx context.Context, nodeIDs []storj.NodeID, onlineWindow time.Duration) (_ map[storj.NodeID]*overlay.SelectedNode, err error) {
defer mon.Task()(&ctx)(&err)
var rows *sql.Rows
rows, err = cache.db.Query(ctx, cache.db.Rebind(`
SELECT last_net, id, address, last_ip_port
FROM nodes
WHERE id = any($1::bytea[])
AND disqualified IS NULL
AND last_contact_success > $2
`), postgresNodeIDList(nodeIDs), time.Now().Add(-onlineWindow))
if err != nil {
return nil, err
}
defer func() { err = errs.Combine(err, rows.Close()) }()
nodes := make(map[storj.NodeID]*overlay.SelectedNode)
for rows.Next() {
var node overlay.SelectedNode
node.Address = &pb.NodeAddress{Transport: pb.NodeTransport_TCP_TLS_GRPC}
var lastIPPort sql.NullString
err = rows.Scan(&node.LastNet, &node.ID, &node.Address.Address, &lastIPPort)
if err != nil {
return nil, err
}
if lastIPPort.Valid {
node.LastIPPort = lastIPPort.String
}
nodes[node.ID] = &node
}
return nodes, Error.Wrap(rows.Err())
}
// KnownOffline filters a set of nodes to offline nodes
func (cache *overlaycache) KnownOffline(ctx context.Context, criteria *overlay.NodeCriteria, nodeIds storj.NodeIDList) (offlineNodes storj.NodeIDList, err error) {
defer mon.Task()(&ctx)(&err)
if len(nodeIds) == 0 {
return nil, Error.New("no ids provided")
}
// get offline nodes
var rows *sql.Rows
rows, err = cache.db.Query(ctx, cache.db.Rebind(`
SELECT id FROM nodes
WHERE id = any($1::bytea[])
AND last_contact_success < $2
`), postgresNodeIDList(nodeIds), time.Now().Add(-criteria.OnlineWindow),
)
if err != nil {
return nil, err
}
defer func() { err = errs.Combine(err, rows.Close()) }()
for rows.Next() {
var id storj.NodeID
err = rows.Scan(&id)
if err != nil {
return nil, err
}
offlineNodes = append(offlineNodes, id)
}
return offlineNodes, Error.Wrap(rows.Err())
}
// KnownUnreliableOrOffline filters a set of nodes to unreliable or offline nodes, independent of new
func (cache *overlaycache) KnownUnreliableOrOffline(ctx context.Context, criteria *overlay.NodeCriteria, nodeIds storj.NodeIDList) (badNodes storj.NodeIDList, err error) {
defer mon.Task()(&ctx)(&err)
if len(nodeIds) == 0 {
return nil, Error.New("no ids provided")
}
// get reliable and online nodes
var rows *sql.Rows
rows, err = cache.db.Query(ctx, cache.db.Rebind(`
SELECT id FROM nodes
WHERE id = any($1::bytea[])
AND disqualified IS NULL
AND suspended IS NULL
AND exit_finished_at IS NULL
AND last_contact_success > $2
`), postgresNodeIDList(nodeIds), time.Now().Add(-criteria.OnlineWindow),
)
if err != nil {
return nil, err
}
defer func() { err = errs.Combine(err, rows.Close()) }()
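// The query above returns only the good (reliable and online) nodes; every
// requested ID that is missing from that result is reported as bad.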
goodNodes := make(map[storj.NodeID]struct{}, len(nodeIds))
for rows.Next() {
var id storj.NodeID
err = rows.Scan(&id)
if err != nil {
return nil, err
}
goodNodes[id] = struct{}{}
}
for _, id := range nodeIds {
if _, ok := goodNodes[id]; !ok {
badNodes = append(badNodes, id)
}
}
return badNodes, Error.Wrap(rows.Err())
}
// KnownReliable filters a set of nodes to reliable (online and qualified) nodes.
func (cache *overlaycache) KnownReliable(ctx context.Context, onlineWindow time.Duration, nodeIDs storj.NodeIDList) (nodes []*pb.Node, err error) {
defer mon.Task()(&ctx)(&err)
if len(nodeIDs) == 0 {
return nil, Error.New("no ids provided")
}
// get online nodes
rows, err := cache.db.Query(ctx, cache.db.Rebind(`
SELECT id, last_net, last_ip_port, address, protocol
FROM nodes
WHERE id = any($1::bytea[])
AND disqualified IS NULL
AND suspended IS NULL
AND exit_finished_at IS NULL
AND last_contact_success > $2
`), postgresNodeIDList(nodeIDs), time.Now().Add(-onlineWindow),
)
if err != nil {
return nil, err
}
defer func() { err = errs.Combine(err, rows.Close()) }()
for rows.Next() {
row := &dbx.Node{}
err = rows.Scan(&row.Id, &row.LastNet, &row.LastIpPort, &row.Address, &row.Protocol)
if err != nil {
return nil, err
}
node, err := convertDBNode(ctx, row)
if err != nil {
return nil, err
}
nodes = append(nodes, &node.Node)
}
return nodes, Error.Wrap(rows.Err())
}
// Reliable returns all reliable nodes.
func (cache *overlaycache) Reliable(ctx context.Context, criteria *overlay.NodeCriteria) (nodes storj.NodeIDList, err error) {
// get reliable and online nodes
rows, err := cache.db.Query(ctx, cache.db.Rebind(`
SELECT id FROM nodes
WHERE disqualified IS NULL
AND suspended IS NULL
AND exit_finished_at IS NULL
AND last_contact_success > ?
`), time.Now().Add(-criteria.OnlineWindow))
if err != nil {
return nil, err
}
defer func() {
err = errs.Combine(err, rows.Close())
}()
for rows.Next() {
var id storj.NodeID
err = rows.Scan(&id)
if err != nil {
return nil, err
}
nodes = append(nodes, id)
}
return nodes, Error.Wrap(rows.Err())
}
// BatchUpdateStats updates multiple storage nodes' stats in batches, one transaction per batch
func (cache *overlaycache) BatchUpdateStats(ctx context.Context, updateRequests []*overlay.UpdateRequest, batchSize int) (failed storj.NodeIDList, err error) {
defer mon.Task()(&ctx)(&err)
if len(updateRequests) == 0 {
return failed, nil
}
// ensure updates happen in-order
sort.Slice(updateRequests, func(i, k int) bool {
return updateRequests[i].NodeID.Less(updateRequests[k].NodeID)
})
doUpdate := func(updateSlice []*overlay.UpdateRequest) (duf storj.NodeIDList, err error) {
appendAll := func() {
for _, ur := range updateSlice {
duf = append(duf, ur.NodeID)
}
}
doAppendAll := true
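// doAppendAll controls whether the node IDs are blanket-reported as failed
// when the transaction returns an error; it is cleared when the error came
// from looking up an individual node.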
err = cache.db.WithTx(ctx, func(ctx context.Context, tx *dbx.Tx) (err error) {
var allSQL string
for _, updateReq := range updateSlice {
dbNode, err := tx.Get_Node_By_Id(ctx, dbx.Node_Id(updateReq.NodeID.Bytes()))
if err != nil {
doAppendAll = false
return err
}
// do not update reputation if node is disqualified
if dbNode.Disqualified != nil {
continue
}
// do not update reputation if node has gracefully exited
if dbNode.ExitFinishedAt != nil {
continue
}
updateNodeStats := cache.populateUpdateNodeStats(dbNode, updateReq)
sql := buildUpdateStatement(updateNodeStats)
allSQL += sql
}
if allSQL != "" {
results, err := tx.Tx.Exec(ctx, allSQL)
if err != nil {
return err
}
_, err = results.RowsAffected()
if err != nil {
return err
}
}
return nil
})
if err != nil {
if doAppendAll {
appendAll()
}
return duf, Error.Wrap(err)
}
return duf, nil
}
var errlist errs.Group
length := len(updateRequests)
for i := 0; i < length; i += batchSize {
end := i + batchSize
if end > length {
end = length
}
failedBatch, err := doUpdate(updateRequests[i:end])
if err != nil && len(failedBatch) > 0 {
for _, fb := range failedBatch {
errlist.Add(err)
failed = append(failed, fb)
}
}
}
return failed, errlist.Err()
}
// UpdateStats updates a single storage node's stats in the db
func (cache *overlaycache) UpdateStats(ctx context.Context, updateReq *overlay.UpdateRequest) (stats *overlay.NodeStats, err error) {
defer mon.Task()(&ctx)(&err)
nodeID := updateReq.NodeID
var dbNode *dbx.Node
err = cache.db.WithTx(ctx, func(ctx context.Context, tx *dbx.Tx) (err error) {
dbNode, err = tx.Get_Node_By_Id(ctx, dbx.Node_Id(nodeID.Bytes()))
if err != nil {
return err
}
// do not update reputation if node is disqualified
if dbNode.Disqualified != nil {
return nil
}
// do not update reputation if node has gracefully exited
if dbNode.ExitFinishedAt != nil {
return nil
}
updateFields := cache.populateUpdateFields(dbNode, updateReq)
dbNode, err = tx.Update_Node_By_Id(ctx, dbx.Node_Id(nodeID.Bytes()), updateFields)
if err != nil {
return err
}
// Cleanup containment table too
_, err = tx.Delete_PendingAudits_By_NodeId(ctx, dbx.PendingAudits_NodeId(nodeID.Bytes()))
return err
})
if err != nil {
return nil, Error.Wrap(err)
}
// TODO: Allegedly tx.Get_Node_By_Id and tx.Update_Node_By_Id should never return a nil value for dbNode,
// however we've seen from some crashes that it does. We need to track down the cause of these crashes
// but for now we're adding a nil check to prevent a panic.
if dbNode == nil {
return nil, Error.New("unable to get node by ID: %v", nodeID)
}
return getNodeStats(dbNode), nil
}
// UpdateNodeInfo updates the following fields for a given node ID:
// node type, wallet and email for the node operator, free disk, and version
func (cache *overlaycache) UpdateNodeInfo(ctx context.Context, nodeID storj.NodeID, nodeInfo *pb.InfoResponse) (stats *overlay.NodeDossier, err error) {
defer mon.Task()(&ctx)(&err)
var updateFields dbx.Node_Update_Fields
if nodeInfo != nil {
if nodeInfo.GetType() != pb.NodeType_INVALID {
updateFields.Type = dbx.Node_Type(int(nodeInfo.GetType()))
}
if nodeInfo.GetOperator() != nil {
updateFields.Wallet = dbx.Node_Wallet(nodeInfo.GetOperator().GetWallet())
updateFields.Email = dbx.Node_Email(nodeInfo.GetOperator().GetEmail())
}
if nodeInfo.GetCapacity() != nil {
updateFields.FreeDisk = dbx.Node_FreeDisk(nodeInfo.GetCapacity().GetFreeDisk())
}
if nodeInfo.GetVersion() != nil {
semVer, err := version.NewSemVer(nodeInfo.GetVersion().GetVersion())
if err != nil {
return nil, errs.New("unable to convert version to semVer")
}
updateFields.Major = dbx.Node_Major(int64(semVer.Major))
updateFields.Minor = dbx.Node_Minor(int64(semVer.Minor))
updateFields.Patch = dbx.Node_Patch(int64(semVer.Patch))
updateFields.Hash = dbx.Node_Hash(nodeInfo.GetVersion().GetCommitHash())
updateFields.Timestamp = dbx.Node_Timestamp(nodeInfo.GetVersion().Timestamp)
updateFields.Release = dbx.Node_Release(nodeInfo.GetVersion().GetRelease())
}
}
updatedDBNode, err := cache.db.Update_Node_By_Id(ctx, dbx.Node_Id(nodeID.Bytes()), updateFields)
if err != nil {
return nil, Error.Wrap(err)
}
return convertDBNode(ctx, updatedDBNode)
}
// UpdateUptime updates a single storage node's uptime stats in the db
func (cache *overlaycache) UpdateUptime(ctx context.Context, nodeID storj.NodeID, isUp bool) (stats *overlay.NodeStats, err error) {
defer mon.Task()(&ctx)(&err)
var dbNode *dbx.Node
err = cache.db.WithTx(ctx, func(ctx context.Context, tx *dbx.Tx) (err error) {
dbNode, err = tx.Get_Node_By_Id(ctx, dbx.Node_Id(nodeID.Bytes()))
if err != nil {
return err
}
// do not update reputation if node is disqualified
if dbNode.Disqualified != nil {
return nil
}
// do not update reputation if node has gracefully exited
if dbNode.ExitFinishedAt != nil {
return nil
}
updateFields := dbx.Node_Update_Fields{}
totalUptimeCount := dbNode.TotalUptimeCount
lastContactSuccess := dbNode.LastContactSuccess
lastContactFailure := dbNode.LastContactFailure
mon.Meter("uptime_updates").Mark(1)
if isUp {
totalUptimeCount++
updateFields.UptimeSuccessCount = dbx.Node_UptimeSuccessCount(dbNode.UptimeSuccessCount + 1)
updateFields.LastContactSuccess = dbx.Node_LastContactSuccess(time.Now())
mon.Meter("uptime_update_successes").Mark(1)
// no contact failure has been recorded for this node in the past 24 hours
if time.Since(lastContactFailure) > time.Hour*24 {
mon.Meter("uptime_seen_24h").Mark(1)
}
// no contact failure has been recorded for this node in the past week
if time.Since(lastContactFailure) > time.Hour*24*7 {
mon.Meter("uptime_seen_week").Mark(1)
}
} else {
updateFields.LastContactFailure = dbx.Node_LastContactFailure(time.Now())
mon.Meter("uptime_update_failures").Mark(1)
// it's been over 24 hours since we've seen this node
if time.Since(lastContactSuccess) > time.Hour*24 {
mon.Meter("uptime_not_seen_24h").Mark(1)
}
// it's been over a week since we've seen this node
if time.Since(lastContactSuccess) > time.Hour*24*7 {
mon.Meter("uptime_not_seen_week").Mark(1)
}
}
updateFields.TotalUptimeCount = dbx.Node_TotalUptimeCount(totalUptimeCount)
dbNode, err = tx.Update_Node_By_Id(ctx, dbx.Node_Id(nodeID.Bytes()), updateFields)
return err
})
if err != nil {
return nil, Error.Wrap(err)
}
// TODO: Allegedly tx.Get_Node_By_Id and tx.Update_Node_By_Id should never return a nil value for dbNode,
// however we've seen from some crashes that it does. We need to track down the cause of these crashes
// but for now we're adding a nil check to prevent a panic.
if dbNode == nil {
return nil, Error.New("unable to get node by ID: %v", nodeID)
}
return getNodeStats(dbNode), nil
}
// DisqualifyNode disqualifies a storage node.
func (cache *overlaycache) DisqualifyNode(ctx context.Context, nodeID storj.NodeID) (err error) {
defer mon.Task()(&ctx)(&err)
updateFields := dbx.Node_Update_Fields{}
updateFields.Disqualified = dbx.Node_Disqualified(time.Now().UTC())
dbNode, err := cache.db.Update_Node_By_Id(ctx, dbx.Node_Id(nodeID.Bytes()), updateFields)
if err != nil {
return err
}
if dbNode == nil {
return errs.New("unable to get node by ID: %v", nodeID)
}
return nil
}
// SuspendNode suspends a storage node.
func (cache *overlaycache) SuspendNode(ctx context.Context, nodeID storj.NodeID, suspendedAt time.Time) (err error) {
defer mon.Task()(&ctx)(&err)
updateFields := dbx.Node_Update_Fields{}
updateFields.Suspended = dbx.Node_Suspended(suspendedAt.UTC())
dbNode, err := cache.db.Update_Node_By_Id(ctx, dbx.Node_Id(nodeID.Bytes()), updateFields)
if err != nil {
return err
}
if dbNode == nil {
return errs.New("unable to get node by ID: %v", nodeID)
}
return nil
}
// UnsuspendNode unsuspends a storage node.
func (cache *overlaycache) UnsuspendNode(ctx context.Context, nodeID storj.NodeID) (err error) {
defer mon.Task()(&ctx)(&err)
updateFields := dbx.Node_Update_Fields{}
updateFields.Suspended = dbx.Node_Suspended_Null()
dbNode, err := cache.db.Update_Node_By_Id(ctx, dbx.Node_Id(nodeID.Bytes()), updateFields)
if err != nil {
return err
}
if dbNode == nil {
return errs.New("unable to get node by ID: %v", nodeID)
}
return nil
}
// AllPieceCounts returns a map of node IDs to piece counts from the db.
// NB: a valid, partial piece map can be returned even if node ID parsing error(s) are returned.
func (cache *overlaycache) AllPieceCounts(ctx context.Context) (_ map[storj.NodeID]int, err error) {
defer mon.Task()(&ctx)(&err)
// NB: `All_Node_Id_Node_PieceCount_By_PieceCount_Not_Number` selects node
// ID and piece count from the nodes table where piece count is not zero.
rows, err := cache.db.All_Node_Id_Node_PieceCount_By_PieceCount_Not_Number(ctx)
if err != nil {
return nil, Error.Wrap(err)
}
pieceCounts := make(map[storj.NodeID]int)
nodeIDErrs := errs.Group{}
for _, row := range rows {
nodeID, err := storj.NodeIDFromBytes(row.Id)
if err != nil {
nodeIDErrs.Add(err)
continue
}
pieceCounts[nodeID] = int(row.PieceCount)
}
return pieceCounts, nodeIDErrs.Err()
}
func (cache *overlaycache) UpdatePieceCounts(ctx context.Context, pieceCounts map[storj.NodeID]int) (err error) {
defer mon.Task()(&ctx)(&err)
if len(pieceCounts) == 0 {
return nil
}
// TODO: pass in the appropriate struct to database, rather than constructing it here
type NodeCount struct {
ID storj.NodeID
Count int64
}
var counts []NodeCount
for nodeid, count := range pieceCounts {
counts = append(counts, NodeCount{
ID: nodeid,
Count: int64(count),
})
}
sort.Slice(counts, func(i, k int) bool {
return counts[i].ID.Less(counts[k].ID)
})
var nodeIDs []storj.NodeID
var countNumbers []int64
for _, count := range counts {
nodeIDs = append(nodeIDs, count.ID)
countNumbers = append(countNumbers, count.Count)
}
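// Send the IDs and counts as two parallel arrays and let unnest() zip them
// back into rows, so all counts are updated in a single statement.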
_, err = cache.db.ExecContext(ctx, `
UPDATE nodes
SET piece_count = update.count
FROM (
SELECT unnest($1::bytea[]) as id, unnest($2::bigint[]) as count
) as update
WHERE nodes.id = update.id
`, postgresNodeIDList(nodeIDs), pq.Array(countNumbers))
return Error.Wrap(err)
}
// GetExitingNodes returns nodes that have initiated a graceful exit and are not disqualified, but have not completed it.
func (cache *overlaycache) GetExitingNodes(ctx context.Context) (exitingNodes []*overlay.ExitStatus, err error) {
defer mon.Task()(&ctx)(&err)
rows, err := cache.db.Query(ctx, cache.db.Rebind(`
SELECT id, exit_initiated_at, exit_loop_completed_at, exit_finished_at, exit_success FROM nodes
WHERE exit_initiated_at IS NOT NULL
AND exit_finished_at IS NULL
AND disqualified is NULL
`))
if err != nil {
return nil, err
}
defer func() { err = errs.Combine(err, rows.Close()) }()
for rows.Next() {
var exitingNodeStatus overlay.ExitStatus
err = rows.Scan(&exitingNodeStatus.NodeID, &exitingNodeStatus.ExitInitiatedAt, &exitingNodeStatus.ExitLoopCompletedAt, &exitingNodeStatus.ExitFinishedAt, &exitingNodeStatus.ExitSuccess)
if err != nil {
return nil, err
}
exitingNodes = append(exitingNodes, &exitingNodeStatus)
}
return exitingNodes, Error.Wrap(rows.Err())
}
// GetExitStatus returns a node's graceful exit status.
func (cache *overlaycache) GetExitStatus(ctx context.Context, nodeID storj.NodeID) (_ *overlay.ExitStatus, err error) {
defer mon.Task()(&ctx)(&err)
rows, err := cache.db.Query(ctx, cache.db.Rebind(`
SELECT id, exit_initiated_at, exit_loop_completed_at, exit_finished_at, exit_success
FROM nodes
WHERE id = ?
`), nodeID)
if err != nil {
return nil, Error.Wrap(err)
}
defer func() { err = errs.Combine(err, rows.Close()) }()
exitStatus := &overlay.ExitStatus{}
if rows.Next() {
err = rows.Scan(&exitStatus.NodeID, &exitStatus.ExitInitiatedAt, &exitStatus.ExitLoopCompletedAt, &exitStatus.ExitFinishedAt, &exitStatus.ExitSuccess)
if err != nil {
return nil, err
}
}
return exitStatus, Error.Wrap(rows.Err())
}
// GetGracefulExitCompletedByTimeFrame returns nodes who have completed graceful exit within a time window (time window is around graceful exit completion).
func (cache *overlaycache) GetGracefulExitCompletedByTimeFrame(ctx context.Context, begin, end time.Time) (exitedNodes storj.NodeIDList, err error) {
defer mon.Task()(&ctx)(&err)
rows, err := cache.db.Query(ctx, cache.db.Rebind(`
SELECT id FROM nodes
WHERE exit_initiated_at IS NOT NULL
AND exit_finished_at IS NOT NULL
AND exit_finished_at >= ?
AND exit_finished_at < ?
`), begin, end)
if err != nil {
return nil, err
}
defer func() {
err = errs.Combine(err, rows.Close())
}()
for rows.Next() {
var id storj.NodeID
err = rows.Scan(&id)
if err != nil {
return nil, err
}
exitedNodes = append(exitedNodes, id)
}
return exitedNodes, Error.Wrap(rows.Err())
}
// GetGracefulExitIncompleteByTimeFrame returns nodes who have initiated, but not completed graceful exit within a time window (time window is around graceful exit initiation).
func (cache *overlaycache) GetGracefulExitIncompleteByTimeFrame(ctx context.Context, begin, end time.Time) (exitingNodes storj.NodeIDList, err error) {
defer mon.Task()(&ctx)(&err)
rows, err := cache.db.Query(ctx, cache.db.Rebind(`
SELECT id FROM nodes
WHERE exit_initiated_at IS NOT NULL
AND exit_finished_at IS NULL
AND exit_initiated_at >= ?
AND exit_initiated_at < ?
`), begin, end)
if err != nil {
return nil, err
}
defer func() {
err = errs.Combine(err, rows.Close())
}()
// TODO return more than just ID
for rows.Next() {
var id storj.NodeID
err = rows.Scan(&id)
if err != nil {
return nil, err
}
exitingNodes = append(exitingNodes, id)
}
return exitingNodes, Error.Wrap(rows.Err())
}
// UpdateExitStatus is used to update a node's graceful exit status.
func (cache *overlaycache) UpdateExitStatus(ctx context.Context, request *overlay.ExitStatusRequest) (_ *overlay.NodeDossier, err error) {
defer mon.Task()(&ctx)(&err)
nodeID := request.NodeID
updateFields := populateExitStatusFields(request)
dbNode, err := cache.db.Update_Node_By_Id(ctx, dbx.Node_Id(nodeID.Bytes()), updateFields)
if err != nil {
return nil, Error.Wrap(err)
}
if dbNode == nil {
return nil, Error.Wrap(errs.New("unable to get node by ID: %v", nodeID))
}
return convertDBNode(ctx, dbNode)
}
// GetSuccesfulNodesNotCheckedInSince returns all nodes whose last check-in was successful, but that haven't checked in within the given duration.
func (cache *overlaycache) GetSuccesfulNodesNotCheckedInSince(ctx context.Context, duration time.Duration) (nodeLastContacts []overlay.NodeLastContact, err error) {
// get successful nodes that have not checked in within the given duration
defer mon.Task()(&ctx)(&err)
dbxNodes, err := cache.db.DB.All_Node_Id_Node_Address_Node_LastIpPort_Node_LastContactSuccess_Node_LastContactFailure_By_LastContactSuccess_Less_And_LastContactSuccess_Greater_LastContactFailure_And_Disqualified_Is_Null_OrderBy_Asc_LastContactSuccess(
ctx, dbx.Node_LastContactSuccess(time.Now().UTC().Add(-duration)))
if err != nil {
return nil, Error.Wrap(err)
}
for _, node := range dbxNodes {
nodeID, err := storj.NodeIDFromBytes(node.Id)
if err != nil {
return nil, err
}
nodeLastContact := overlay.NodeLastContact{
ID: nodeID,
Address: node.Address,
LastContactSuccess: node.LastContactSuccess.UTC(),
LastContactFailure: node.LastContactFailure.UTC(),
}
if node.LastIpPort != nil {
nodeLastContact.LastIPPort = *node.LastIpPort
}
nodeLastContacts = append(nodeLastContacts, nodeLastContact)
}
return nodeLastContacts, nil
}
func populateExitStatusFields(req *overlay.ExitStatusRequest) dbx.Node_Update_Fields {
dbxUpdateFields := dbx.Node_Update_Fields{}
if !req.ExitInitiatedAt.IsZero() {
dbxUpdateFields.ExitInitiatedAt = dbx.Node_ExitInitiatedAt(req.ExitInitiatedAt)
}
if !req.ExitLoopCompletedAt.IsZero() {
dbxUpdateFields.ExitLoopCompletedAt = dbx.Node_ExitLoopCompletedAt(req.ExitLoopCompletedAt)
}
if !req.ExitFinishedAt.IsZero() {
dbxUpdateFields.ExitFinishedAt = dbx.Node_ExitFinishedAt(req.ExitFinishedAt)
}
dbxUpdateFields.ExitSuccess = dbx.Node_ExitSuccess(req.ExitSuccess)
return dbxUpdateFields
}
// GetOfflineNodesLimited returns a list of the first N offline nodes ordered by least recently contacted.
func (cache *overlaycache) GetOfflineNodesLimited(ctx context.Context, limit int) (nodeLastContacts []overlay.NodeLastContact, err error) {
defer mon.Task()(&ctx)(&err)
dbxNodes, err := cache.db.DB.Limited_Node_Id_Node_Address_Node_LastIpPort_Node_LastContactSuccess_Node_LastContactFailure_By_LastContactSuccess_Less_LastContactFailure_And_Disqualified_Is_Null_OrderBy_Asc_LastContactFailure(
ctx, limit, 0)
if err != nil {
return nil, Error.Wrap(err)
}
for _, node := range dbxNodes {
nodeID, err := storj.NodeIDFromBytes(node.Id)
if err != nil {
return nil, err
}
nodeLastContact := overlay.NodeLastContact{
ID: nodeID,
Address: node.Address,
LastContactSuccess: node.LastContactSuccess.UTC(),
LastContactFailure: node.LastContactFailure.UTC(),
}
if node.LastIpPort != nil {
nodeLastContact.LastIPPort = *node.LastIpPort
}
nodeLastContacts = append(nodeLastContacts, nodeLastContact)
}
return nodeLastContacts, nil
}
func convertDBNode(ctx context.Context, info *dbx.Node) (_ *overlay.NodeDossier, err error) {
defer mon.Task()(&ctx)(&err)
if info == nil {
return nil, Error.New("missing info")
}
id, err := storj.NodeIDFromBytes(info.Id)
if err != nil {
return nil, err
}
ver, err := version.NewSemVer(fmt.Sprintf("%d.%d.%d", info.Major, info.Minor, info.Patch))
if err != nil {
return nil, err
}
exitStatus := overlay.ExitStatus{NodeID: id}
exitStatus.ExitInitiatedAt = info.ExitInitiatedAt
exitStatus.ExitLoopCompletedAt = info.ExitLoopCompletedAt
exitStatus.ExitFinishedAt = info.ExitFinishedAt
exitStatus.ExitSuccess = info.ExitSuccess
node := &overlay.NodeDossier{
Node: pb.Node{
Id: id,
Address: &pb.NodeAddress{
Address: info.Address,
Transport: pb.NodeTransport(info.Protocol),
},
},
Type: pb.NodeType(info.Type),
Operator: pb.NodeOperator{
Email: info.Email,
Wallet: info.Wallet,
},
Capacity: pb.NodeCapacity{
FreeDisk: info.FreeDisk,
},
Reputation: *getNodeStats(info),
Version: pb.NodeVersion{
Version: ver.String(),
CommitHash: info.Hash,
Timestamp: info.Timestamp,
Release: info.Release,
},
Contained: info.Contained,
Disqualified: info.Disqualified,
Suspended: info.Suspended,
PieceCount: info.PieceCount,
ExitStatus: exitStatus,
CreatedAt: info.CreatedAt,
LastNet: info.LastNet,
}
if info.LastIpPort != nil {
node.LastIPPort = *info.LastIpPort
}
return node, nil
}
func getNodeStats(dbNode *dbx.Node) *overlay.NodeStats {
nodeStats := &overlay.NodeStats{
Latency90: dbNode.Latency90,
AuditCount: dbNode.TotalAuditCount,
AuditSuccessCount: dbNode.AuditSuccessCount,
UptimeCount: dbNode.TotalUptimeCount,
UptimeSuccessCount: dbNode.UptimeSuccessCount,
LastContactSuccess: dbNode.LastContactSuccess,
LastContactFailure: dbNode.LastContactFailure,
AuditReputationAlpha: dbNode.AuditReputationAlpha,
AuditReputationBeta: dbNode.AuditReputationBeta,
Disqualified: dbNode.Disqualified,
UnknownAuditReputationAlpha: dbNode.UnknownAuditReputationAlpha,
UnknownAuditReputationBeta: dbNode.UnknownAuditReputationBeta,
Suspended: dbNode.Suspended,
}
return nodeStats
}
// updateReputation uses the Beta distribution model to determine a node's reputation.
// lambda is the "forgetting factor" which determines how much past info is kept when determining current reputation score.
// w is the normalization weight that affects how severely new updates affect the current reputation distribution.
func updateReputation(isSuccess bool, alpha, beta, lambda, w float64, totalCount int64) (newAlpha, newBeta float64, updatedCount int64) {
// v is a single feedback value that allows us to update both alpha and beta
var v float64 = -1
if isSuccess {
v = 1
}
newAlpha = lambda*alpha + w*(1+v)/2
newBeta = lambda*beta + w*(1-v)/2
return newAlpha, newBeta, totalCount + 1
}
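// Expanding updateReputation's formula: on success (v = 1) newAlpha = lambda*alpha + w
// and newBeta = lambda*beta; on failure (v = -1) newAlpha = lambda*alpha and
// newBeta = lambda*beta + w.
//
// Illustrative example (hypothetical values, not configuration defaults):
// with lambda = 0.95, w = 1, alpha = 10, beta = 2, a successful audit yields
// newAlpha = 0.95*10 + 1 = 10.5 and newBeta = 0.95*2 = 1.9, moving the score
// alpha/(alpha+beta) from ~0.833 to ~0.847.
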
func buildUpdateStatement(update updateNodeStats) string {
if update.NodeID.IsZero() {
return ""
}
atLeastOne := false
sql := "UPDATE nodes SET "
if update.TotalAuditCount.set {
atLeastOne = true
sql += fmt.Sprintf("total_audit_count = %v", update.TotalAuditCount.value)
}
if update.TotalUptimeCount.set {
if atLeastOne {
sql += ","
}
atLeastOne = true
sql += fmt.Sprintf("total_uptime_count = %v", update.TotalUptimeCount.value)
}
if update.AuditReputationAlpha.set {
if atLeastOne {
sql += ","
}
atLeastOne = true
sql += fmt.Sprintf("audit_reputation_alpha = %v", update.AuditReputationAlpha.value)
}
if update.AuditReputationBeta.set {
if atLeastOne {
sql += ","
}
atLeastOne = true
sql += fmt.Sprintf("audit_reputation_beta = %v", update.AuditReputationBeta.value)
}
if update.UnknownAuditReputationAlpha.set {
if atLeastOne {
sql += ","
}
atLeastOne = true
sql += fmt.Sprintf("unknown_audit_reputation_alpha = %v", update.UnknownAuditReputationAlpha.value)
}
if update.UnknownAuditReputationBeta.set {
if atLeastOne {
sql += ","
}
atLeastOne = true
sql += fmt.Sprintf("unknown_audit_reputation_beta = %v", update.UnknownAuditReputationBeta.value)
}
if update.Disqualified.set {
if atLeastOne {
sql += ","
}
atLeastOne = true
sql += fmt.Sprintf("disqualified = '%v'", update.Disqualified.value.Format(time.RFC3339Nano))
}
if update.Suspended.set {
if atLeastOne {
sql += ","
}
atLeastOne = true
if update.Suspended.isNil {
sql += fmt.Sprintf("suspended = NULL")
} else {
sql += fmt.Sprintf("suspended = '%v'", update.Suspended.value.Format(time.RFC3339Nano))
}
}
if update.UptimeSuccessCount.set {
if atLeastOne {
sql += ","
}
atLeastOne = true
sql += fmt.Sprintf("uptime_success_count = %v", update.UptimeSuccessCount.value)
}
if update.LastContactSuccess.set {
if atLeastOne {
sql += ","
}
atLeastOne = true
sql += fmt.Sprintf("last_contact_success = '%v'", update.LastContactSuccess.value.Format(time.RFC3339Nano))
}
if update.LastContactFailure.set {
if atLeastOne {
sql += ","
}
atLeastOne = true
sql += fmt.Sprintf("last_contact_failure = '%v'", update.LastContactFailure.value.Format(time.RFC3339Nano))
}
if update.AuditSuccessCount.set {
if atLeastOne {
sql += ","
}
atLeastOne = true
sql += fmt.Sprintf("audit_success_count = %v", update.AuditSuccessCount.value)
}
if update.Contained.set {
if atLeastOne {
sql += ","
}
atLeastOne = true
sql += fmt.Sprintf("contained = %v", update.Contained.value)
}
if !atLeastOne {
return ""
}
hexNodeID := hex.EncodeToString(update.NodeID.Bytes())
sql += fmt.Sprintf(" WHERE nodes.id = decode('%v', 'hex');\n", hexNodeID)
sql += fmt.Sprintf("DELETE FROM pending_audits WHERE pending_audits.node_id = decode('%v', 'hex');\n", hexNodeID)
return sql
}
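// For illustration, buildUpdateStatement output for an update that sets only
// total_audit_count and total_uptime_count is shaped like the following
// (hypothetical values, hex ID shortened):
//
//   UPDATE nodes SET total_audit_count = 11,total_uptime_count = 25 WHERE nodes.id = decode('abc123', 'hex');
//   DELETE FROM pending_audits WHERE pending_audits.node_id = decode('abc123', 'hex');
//
// BatchUpdateStats concatenates one such pair of statements per node and
// executes the whole string inside a single transaction.
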
type int64Field struct {
set bool
value int64
}
type float64Field struct {
set bool
value float64
}
type boolField struct {
set bool
value bool
}
type timeField struct {
set bool
isNil bool
value time.Time
}
type updateNodeStats struct {
NodeID storj.NodeID
TotalAuditCount int64Field
TotalUptimeCount int64Field
AuditReputationAlpha float64Field
AuditReputationBeta float64Field
Disqualified timeField
UnknownAuditReputationAlpha float64Field
UnknownAuditReputationBeta float64Field
Suspended timeField
UptimeSuccessCount int64Field
LastContactSuccess timeField
LastContactFailure timeField
AuditSuccessCount int64Field
Contained boolField
}
func (cache *overlaycache) populateUpdateNodeStats(dbNode *dbx.Node, updateReq *overlay.UpdateRequest) updateNodeStats {
// there are three audit outcomes: success, failure, and unknown
// if a node fails enough audits, it gets disqualified
// if a node gets enough "unknown" audits, it gets put into suspension
// if a node gets enough successful audits, and is in suspension, it gets removed from suspension
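// the score compared against the DQ/suspension threshold below is alpha / (alpha + beta) for the relevant alpha/beta pair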
auditAlpha := dbNode.AuditReputationAlpha
auditBeta := dbNode.AuditReputationBeta
unknownAuditAlpha := dbNode.UnknownAuditReputationAlpha
unknownAuditBeta := dbNode.UnknownAuditReputationBeta
totalAuditCount := dbNode.TotalAuditCount
var updatedTotalAuditCount int64
switch updateReq.AuditOutcome {
case overlay.AuditSuccess:
// for a successful audit, increase reputation for normal *and* unknown audits
auditAlpha, auditBeta, updatedTotalAuditCount = updateReputation(
true,
auditAlpha,
auditBeta,
updateReq.AuditLambda,
updateReq.AuditWeight,
totalAuditCount,
)
// we will use updatedTotalAuditCount from the updateReputation call above
unknownAuditAlpha, unknownAuditBeta, _ = updateReputation(
true,
unknownAuditAlpha,
unknownAuditBeta,
updateReq.AuditLambda,
updateReq.AuditWeight,
totalAuditCount,
)
case overlay.AuditFailure:
// for audit failure, only update normal alpha/beta
auditAlpha, auditBeta, updatedTotalAuditCount = updateReputation(
false,
auditAlpha,
auditBeta,
updateReq.AuditLambda,
updateReq.AuditWeight,
totalAuditCount,
)
case overlay.AuditUnknown:
// for audit unknown, only update unknown alpha/beta
unknownAuditAlpha, unknownAuditBeta, updatedTotalAuditCount = updateReputation(
false,
unknownAuditAlpha,
unknownAuditBeta,
updateReq.AuditLambda,
updateReq.AuditWeight,
totalAuditCount,
)
}
mon.FloatVal("audit_reputation_alpha").Observe(auditAlpha) //locked
mon.FloatVal("audit_reputation_beta").Observe(auditBeta) //locked
mon.FloatVal("unknown_audit_reputation_alpha").Observe(unknownAuditAlpha) //locked
mon.FloatVal("unknown_audit_reputation_beta").Observe(unknownAuditBeta) //locked
totalUptimeCount := dbNode.TotalUptimeCount
if updateReq.IsUp {
totalUptimeCount++
}
updateFields := updateNodeStats{
NodeID: updateReq.NodeID,
TotalAuditCount: int64Field{set: true, value: updatedTotalAuditCount},
TotalUptimeCount: int64Field{set: true, value: totalUptimeCount},
AuditReputationAlpha: float64Field{set: true, value: auditAlpha},
AuditReputationBeta: float64Field{set: true, value: auditBeta},
UnknownAuditReputationAlpha: float64Field{set: true, value: unknownAuditAlpha},
UnknownAuditReputationBeta: float64Field{set: true, value: unknownAuditBeta},
}
// disqualification case a
// a) Success/fail audit reputation falls below audit DQ threshold
auditRep := auditAlpha / (auditAlpha + auditBeta)
if auditRep <= updateReq.AuditDQ {
cache.db.log.Info("Disqualified", zap.String("DQ type", "audit failure"), zap.String("Node ID", updateReq.NodeID.String()))
updateFields.Disqualified = timeField{set: true, value: time.Now().UTC()}
}
// if unknown audit rep goes below threshold, suspend node. Otherwise unsuspend node.
unknownAuditRep := unknownAuditAlpha / (unknownAuditAlpha + unknownAuditBeta)
if unknownAuditRep <= updateReq.AuditDQ {
if dbNode.Suspended == nil {
cache.db.log.Info("Suspended", zap.String("Node ID", updateFields.NodeID.String()), zap.String("Category", "Unknown Audits"))
updateFields.Suspended = timeField{set: true, value: time.Now().UTC()}
}
// disqualification case b
// b) Node is suspended (success/unknown reputation below audit DQ threshold)
// AND the suspended grace period has elapsed
// AND audit outcome is unknown or failed
// if suspended grace period has elapsed and audit outcome was failed or unknown,
// disqualify node. Set suspended to nil if node is disqualified
// NOTE: if updateFields.Suspended is set, we just suspended the node so it will not be disqualified
if updateReq.AuditOutcome != overlay.AuditSuccess {
if dbNode.Suspended != nil && !updateFields.Suspended.set &&
time.Since(*dbNode.Suspended) > updateReq.SuspensionGracePeriod {
cache.db.log.Info("Disqualified", zap.String("DQ type", "suspension grace period expired"), zap.String("Node ID", updateReq.NodeID.String()))
updateFields.Disqualified = timeField{set: true, value: time.Now().UTC()}
updateFields.Suspended = timeField{set: true, isNil: true}
}
}
} else if dbNode.Suspended != nil {
cache.db.log.Info("Suspension lifted", zap.String("Category", "Unknown Audits"), zap.String("Node ID", updateFields.NodeID.String()))
updateFields.Suspended = timeField{set: true, isNil: true}
}
if updateReq.IsUp {
updateFields.UptimeSuccessCount = int64Field{set: true, value: dbNode.UptimeSuccessCount + 1}
updateFields.LastContactSuccess = timeField{set: true, value: time.Now()}
} else {
updateFields.LastContactFailure = timeField{set: true, value: time.Now()}
}
if updateReq.AuditOutcome == overlay.AuditSuccess {
updateFields.AuditSuccessCount = int64Field{set: true, value: dbNode.AuditSuccessCount + 1}
}
// Updating node stats always exits the node from containment mode
updateFields.Contained = boolField{set: true, value: false}
return updateFields
}
func (cache *overlaycache) populateUpdateFields(dbNode *dbx.Node, updateReq *overlay.UpdateRequest) dbx.Node_Update_Fields {
update := cache.populateUpdateNodeStats(dbNode, updateReq)
updateFields := dbx.Node_Update_Fields{}
if update.TotalAuditCount.set {
updateFields.TotalAuditCount = dbx.Node_TotalAuditCount(update.TotalAuditCount.value)
}
if update.TotalUptimeCount.set {
updateFields.TotalUptimeCount = dbx.Node_TotalUptimeCount(update.TotalUptimeCount.value)
}
if update.AuditReputationAlpha.set {
updateFields.AuditReputationAlpha = dbx.Node_AuditReputationAlpha(update.AuditReputationAlpha.value)
}
if update.AuditReputationBeta.set {
updateFields.AuditReputationBeta = dbx.Node_AuditReputationBeta(update.AuditReputationBeta.value)
}
if update.Disqualified.set {
updateFields.Disqualified = dbx.Node_Disqualified(update.Disqualified.value)
}
if update.UnknownAuditReputationAlpha.set {
updateFields.UnknownAuditReputationAlpha = dbx.Node_UnknownAuditReputationAlpha(update.UnknownAuditReputationAlpha.value)
}
if update.UnknownAuditReputationBeta.set {
updateFields.UnknownAuditReputationBeta = dbx.Node_UnknownAuditReputationBeta(update.UnknownAuditReputationBeta.value)
}
if update.Suspended.set {
if update.Suspended.isNil {
updateFields.Suspended = dbx.Node_Suspended_Null()
} else {
updateFields.Suspended = dbx.Node_Suspended(update.Suspended.value)
}
}
if update.UptimeSuccessCount.set {
updateFields.UptimeSuccessCount = dbx.Node_UptimeSuccessCount(update.UptimeSuccessCount.value)
}
if update.LastContactSuccess.set {
updateFields.LastContactSuccess = dbx.Node_LastContactSuccess(update.LastContactSuccess.value)
}
if update.LastContactFailure.set {
updateFields.LastContactFailure = dbx.Node_LastContactFailure(update.LastContactFailure.value)
}
if update.AuditSuccessCount.set {
updateFields.AuditSuccessCount = dbx.Node_AuditSuccessCount(update.AuditSuccessCount.value)
}
if update.Contained.set {
updateFields.Contained = dbx.Node_Contained(update.Contained.value)
}
if updateReq.AuditOutcome == overlay.AuditSuccess {
updateFields.AuditSuccessCount = dbx.Node_AuditSuccessCount(dbNode.AuditSuccessCount + 1)
}
return updateFields
}
// UpdateCheckIn updates a single storage node with info from when the node last checked in.
func (cache *overlaycache) UpdateCheckIn(ctx context.Context, node overlay.NodeCheckInInfo, timestamp time.Time, config overlay.NodeSelectionConfig) (err error) {
defer mon.Task()(&ctx)(&err)
if node.Address.GetAddress() == "" {
return Error.New("error UpdateCheckIn: missing the storage node address")
}
semVer, err := version.NewSemVer(node.Version.GetVersion())
if err != nil {
return Error.New("unable to convert version to semVer")
}
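// Single upsert: on first contact the node row is inserted, otherwise the
// existing row is updated. $9 is the IsUp flag; $9::bool::int turns it into a
// 0/1 increment for uptime_success_count, and the CASE expressions route the
// check-in timestamp into last_contact_success or last_contact_failure.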
query := `
INSERT INTO nodes
(
id, address, last_net, protocol, type,
email, wallet, free_disk,
uptime_success_count, total_uptime_count,
last_contact_success,
last_contact_failure,
audit_reputation_alpha, audit_reputation_beta,
unknown_audit_reputation_alpha, unknown_audit_reputation_beta,
major, minor, patch, hash, timestamp, release,
last_ip_port
)
VALUES (
$1, $2, $3, $4, $5,
$6, $7, $8,
$9::bool::int, 1,
CASE WHEN $9::bool IS TRUE THEN $18::timestamptz
ELSE '0001-01-01 00:00:00+00'::timestamptz
END,
CASE WHEN $9::bool IS FALSE THEN $18::timestamptz
ELSE '0001-01-01 00:00:00+00'::timestamptz
END,
$10, $11,
$10, $11,
$12, $13, $14, $15, $16, $17,
$19
)
ON CONFLICT (id)
DO UPDATE
SET
address=$2,
last_net=$3,
protocol=$4,
email=$6,
wallet=$7,
free_disk=$8,
major=$12, minor=$13, patch=$14, hash=$15, timestamp=$16, release=$17,
total_uptime_count=nodes.total_uptime_count+1,
uptime_success_count = nodes.uptime_success_count + $9::bool::int,
last_contact_success = CASE WHEN $9::bool IS TRUE
THEN $18::timestamptz
ELSE nodes.last_contact_success
END,
last_contact_failure = CASE WHEN $9::bool IS FALSE
THEN $18::timestamptz
ELSE nodes.last_contact_failure
END,
last_ip_port=$19;
`
_, err = cache.db.ExecContext(ctx, query,
// args $1 - $5
node.NodeID.Bytes(), node.Address.GetAddress(), node.LastNet, node.Address.GetTransport(), int(pb.NodeType_STORAGE),
// args $6 - $8
node.Operator.GetEmail(), node.Operator.GetWallet(), node.Capacity.GetFreeDisk(),
// args $9
node.IsUp,
// args $10 - $11
1, 0,
// args $12 - $17
semVer.Major, semVer.Minor, semVer.Patch, node.Version.GetCommitHash(), node.Version.Timestamp, node.Version.GetRelease(),
// args $18
timestamp,
// args $19
node.LastIPPort,
)
if err != nil {
return Error.Wrap(err)
}
return nil
}