2019-01-24 20:15:10 +00:00
// Copyright (C) 2019 Storj Labs, Inc.
2018-12-17 20:14:16 +00:00
// See LICENSE for copying information.
package satellitedb
import (
"context"
"database/sql"
2020-07-14 14:04:38 +01:00
"errors"
2019-07-31 18:21:06 +01:00
"fmt"
2019-09-11 22:38:58 +01:00
"sort"
2021-01-18 14:33:13 +00:00
"strings"
2019-03-29 08:53:43 +00:00
"time"
2018-12-17 20:14:16 +00:00
2019-11-08 20:40:39 +00:00
"github.com/spacemonkeygo/monkit/v3"
2019-01-15 16:08:45 +00:00
"github.com/zeebo/errs"
2020-04-08 23:28:25 +01:00
"go.uber.org/zap"
2019-01-15 16:08:45 +00:00
2019-12-27 11:48:47 +00:00
"storj.io/common/pb"
"storj.io/common/storj"
2021-10-29 18:44:44 +01:00
"storj.io/common/storj/location"
2021-04-23 10:52:40 +01:00
"storj.io/private/dbutil/cockroachutil"
"storj.io/private/dbutil/pgutil"
"storj.io/private/tagsql"
2020-03-23 19:30:31 +00:00
"storj.io/private/version"
2019-07-28 06:55:36 +01:00
"storj.io/storj/satellite/overlay"
2020-01-15 02:29:51 +00:00
"storj.io/storj/satellite/satellitedb/dbx"
2018-12-17 20:14:16 +00:00
)
2019-03-25 22:25:09 +00:00
var (
2019-06-20 14:56:04 +01:00
mon = monkit . Package ( )
2019-03-25 22:25:09 +00:00
)
2019-01-15 16:08:45 +00:00
var _ overlay . DB = ( * overlaycache ) ( nil )
2018-12-17 20:14:16 +00:00
type overlaycache struct {
2019-12-14 02:29:54 +00:00
db * satelliteDB
2018-12-17 20:14:16 +00:00
}
2020-07-16 15:18:02 +01:00
// SelectAllStorageNodesUpload returns all nodes that qualify to store data, organized as reputable nodes and new nodes.
2020-04-14 21:50:02 +01:00
func ( cache * overlaycache ) SelectAllStorageNodesUpload ( ctx context . Context , selectionCfg overlay . NodeSelectionConfig ) ( reputable , new [ ] * overlay . SelectedNode , err error ) {
2020-11-29 20:54:03 +00:00
for {
reputable , new , err = cache . selectAllStorageNodesUpload ( ctx , selectionCfg )
if err != nil {
if cockroachutil . NeedsRetry ( err ) {
continue
}
return reputable , new , err
}
break
}
return reputable , new , err
}
func ( cache * overlaycache ) selectAllStorageNodesUpload ( ctx context . Context , selectionCfg overlay . NodeSelectionConfig ) ( reputable , new [ ] * overlay . SelectedNode , err error ) {
2020-04-14 21:50:02 +01:00
defer mon . Task ( ) ( & ctx ) ( & err )
query := `
2023-06-02 15:47:34 +01:00
SELECT id , address , last_net , last_ip_port , vetted_at , country_code , noise_proto , noise_public_key , debounce_limit , features , country_code
2021-05-11 09:49:26 +01:00
FROM nodes
2022-06-27 13:14:56 +01:00
` + cache.db.impl.AsOfSystemInterval(selectionCfg.AsOfSystemTime.Interval()) + `
2020-04-14 21:50:02 +01:00
WHERE disqualified IS NULL
2020-06-10 17:11:25 +01:00
AND unknown_audit_suspended IS NULL
2021-03-18 19:55:06 +00:00
AND offline_suspended IS NULL
2020-04-14 21:50:02 +01:00
AND exit_initiated_at IS NULL
2020-07-08 15:28:49 +01:00
AND type = $ 1
AND free_disk >= $ 2
AND last_contact_success > $ 3
2020-04-14 21:50:02 +01:00
`
args := [ ] interface { } {
2020-07-08 15:28:49 +01:00
// $1
2020-04-14 21:50:02 +01:00
int ( pb . NodeType_STORAGE ) ,
2020-07-08 15:28:49 +01:00
// $2
2020-04-14 21:50:02 +01:00
selectionCfg . MinimumDiskSpace . Int64 ( ) ,
2020-07-08 15:28:49 +01:00
// $3
2020-04-14 21:50:02 +01:00
time . Now ( ) . Add ( - selectionCfg . OnlineWindow ) ,
}
if selectionCfg . MinimumVersion != "" {
version , err := version . NewSemVer ( selectionCfg . MinimumVersion )
if err != nil {
return nil , nil , err
}
2020-07-08 15:28:49 +01:00
query += ` AND (major > $4 OR (major = $5 AND (minor > $6 OR (minor = $7 AND patch >= $8)))) AND release `
2020-04-14 21:50:02 +01:00
args = append ( args ,
2020-07-08 15:28:49 +01:00
// $4 - $8
2020-04-14 21:50:02 +01:00
version . Major , version . Major , version . Minor , version . Minor , version . Patch ,
)
}
rows , err := cache . db . Query ( ctx , query , args ... )
if err != nil {
return nil , nil , err
}
defer func ( ) { err = errs . Combine ( err , rows . Close ( ) ) } ( )
var reputableNodes [ ] * overlay . SelectedNode
var newNodes [ ] * overlay . SelectedNode
for rows . Next ( ) {
var node overlay . SelectedNode
node . Address = & pb . NodeAddress { }
var lastIPPort sql . NullString
2020-07-08 15:28:49 +01:00
var vettedAt * time . Time
2023-01-02 16:10:47 +00:00
var noise noiseScanner
2023-05-24 12:35:55 +01:00
err = rows . Scan ( & node . ID , & node . Address . Address , & node . LastNet , & lastIPPort , & vettedAt , & node . CountryCode , & noise . Proto ,
2023-06-02 15:47:34 +01:00
& noise . PublicKey , & node . Address . DebounceLimit , & node . Address . Features , & node . CountryCode )
2020-04-14 21:50:02 +01:00
if err != nil {
return nil , nil , err
}
if lastIPPort . Valid {
node . LastIPPort = lastIPPort . String
}
2023-01-02 16:10:47 +00:00
node . Address . NoiseInfo = noise . Convert ( )
2020-04-14 21:50:02 +01:00
2020-07-08 15:28:49 +01:00
if vettedAt == nil {
2020-04-14 21:50:02 +01:00
newNodes = append ( newNodes , & node )
continue
}
reputableNodes = append ( reputableNodes , & node )
}
return reputableNodes , newNodes , Error . Wrap ( rows . Err ( ) )
}
2021-01-28 14:33:53 +00:00
// SelectAllStorageNodesDownload returns all nodes that qualify to store data, organized as reputable nodes and new nodes.
func ( cache * overlaycache ) SelectAllStorageNodesDownload ( ctx context . Context , onlineWindow time . Duration , asOf overlay . AsOfSystemTimeConfig ) ( nodes [ ] * overlay . SelectedNode , err error ) {
for {
nodes , err = cache . selectAllStorageNodesDownload ( ctx , onlineWindow , asOf )
if err != nil {
if cockroachutil . NeedsRetry ( err ) {
continue
}
return nodes , err
}
break
}
return nodes , err
}
func ( cache * overlaycache ) selectAllStorageNodesDownload ( ctx context . Context , onlineWindow time . Duration , asOfConfig overlay . AsOfSystemTimeConfig ) ( _ [ ] * overlay . SelectedNode , err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
query := `
2023-06-02 15:47:34 +01:00
SELECT id , address , last_net , last_ip_port , noise_proto , noise_public_key , debounce_limit , features , country_code
2021-05-11 09:49:26 +01:00
FROM nodes
2022-06-27 13:14:56 +01:00
` + cache.db.impl.AsOfSystemInterval(asOfConfig.Interval()) + `
2021-01-28 14:33:53 +00:00
WHERE disqualified IS NULL
AND exit_finished_at IS NULL
AND last_contact_success > $ 1
`
args := [ ] interface { } {
// $1
time . Now ( ) . Add ( - onlineWindow ) ,
}
rows , err := cache . db . Query ( ctx , query , args ... )
if err != nil {
return nil , err
}
defer func ( ) { err = errs . Combine ( err , rows . Close ( ) ) } ( )
var nodes [ ] * overlay . SelectedNode
for rows . Next ( ) {
var node overlay . SelectedNode
node . Address = & pb . NodeAddress { }
var lastIPPort sql . NullString
2023-01-02 16:10:47 +00:00
var noise noiseScanner
2023-05-24 12:35:55 +01:00
err = rows . Scan ( & node . ID , & node . Address . Address , & node . LastNet , & lastIPPort , & noise . Proto ,
2023-06-02 15:47:34 +01:00
& noise . PublicKey , & node . Address . DebounceLimit , & node . Address . Features , & node . CountryCode )
2021-01-28 14:33:53 +00:00
if err != nil {
return nil , err
}
if lastIPPort . Valid {
node . LastIPPort = lastIPPort . String
}
2023-01-02 16:10:47 +00:00
node . Address . NoiseInfo = noise . Convert ( )
2021-01-28 14:33:53 +00:00
nodes = append ( nodes , & node )
}
return nodes , Error . Wrap ( rows . Err ( ) )
}
2022-12-13 20:40:15 +00:00
// GetNodesNetwork returns the /24 subnet for each storage node. Order is not guaranteed.
// If a requested node is not in the database, no corresponding last_net will be returned
// for that node.
2020-03-06 22:04:23 +00:00
func ( cache * overlaycache ) GetNodesNetwork ( ctx context . Context , nodeIDs [ ] storj . NodeID ) ( nodeNets [ ] string , err error ) {
2022-12-13 20:40:15 +00:00
query := `
SELECT last_net FROM nodes
WHERE id = any ( $ 1 : : bytea [ ] )
`
2020-11-29 20:54:03 +00:00
for {
2022-12-13 20:40:15 +00:00
nodeNets , err = cache . getNodesNetwork ( ctx , nodeIDs , query )
2020-11-29 20:54:03 +00:00
if err != nil {
if cockroachutil . NeedsRetry ( err ) {
continue
}
return nodeNets , err
}
break
}
return nodeNets , err
}
2022-12-13 20:40:15 +00:00
// GetNodesNetworkInOrder returns the /24 subnet for each storage node, in order. If a
// requested node is not in the database, an empty string will be returned corresponding
// to that node's last_net.
func ( cache * overlaycache ) GetNodesNetworkInOrder ( ctx context . Context , nodeIDs [ ] storj . NodeID ) ( nodeNets [ ] string , err error ) {
query := `
SELECT coalesce ( n . last_net , ' ' )
FROM unnest ( $ 1 : : bytea [ ] ) WITH ORDINALITY AS input ( node_id , ord )
LEFT OUTER JOIN nodes n ON input . node_id = n . id
ORDER BY input . ord
`
for {
nodeNets , err = cache . getNodesNetwork ( ctx , nodeIDs , query )
if err != nil {
if cockroachutil . NeedsRetry ( err ) {
continue
}
return nodeNets , err
}
break
}
return nodeNets , err
}
func ( cache * overlaycache ) getNodesNetwork ( ctx context . Context , nodeIDs [ ] storj . NodeID , query string ) ( nodeNets [ ] string , err error ) {
2019-11-06 21:38:52 +00:00
defer mon . Task ( ) ( & ctx ) ( & err )
2020-01-19 13:42:08 +00:00
var rows tagsql . Rows
2022-12-13 20:40:15 +00:00
rows , err = cache . db . Query ( ctx , cache . db . Rebind ( query ) , pgutil . NodeIDArray ( nodeIDs ) )
2019-11-06 21:38:52 +00:00
if err != nil {
return nil , err
}
defer func ( ) { err = errs . Combine ( err , rows . Close ( ) ) } ( )
for rows . Next ( ) {
var ip string
err = rows . Scan ( & ip )
if err != nil {
return nil , err
}
2020-03-06 22:04:23 +00:00
nodeNets = append ( nodeNets , ip )
2019-11-06 21:38:52 +00:00
}
2020-03-06 22:04:23 +00:00
return nodeNets , Error . Wrap ( rows . Err ( ) )
2019-11-06 21:38:52 +00:00
}
2020-07-16 15:18:02 +01:00
// Get looks up the node by nodeID.
2020-11-29 20:54:03 +00:00
func ( cache * overlaycache ) Get ( ctx context . Context , id storj . NodeID ) ( dossier * overlay . NodeDossier , err error ) {
2019-06-04 12:55:38 +01:00
defer mon . Task ( ) ( & ctx ) ( & err )
2019-01-15 16:08:45 +00:00
if id . IsZero ( ) {
return nil , overlay . ErrEmptyNode
2018-12-17 20:14:16 +00:00
}
2019-03-29 08:53:43 +00:00
node , err := cache . db . Get_Node_By_Id ( ctx , dbx . Node_Id ( id . Bytes ( ) ) )
2020-07-14 14:04:38 +01:00
if errors . Is ( err , sql . ErrNoRows ) {
2019-08-21 17:30:29 +01:00
return nil , overlay . ErrNodeNotFound . New ( "%v" , id )
2019-01-15 16:08:45 +00:00
}
2018-12-17 20:14:16 +00:00
if err != nil {
2019-01-15 16:08:45 +00:00
return nil , err
2018-12-17 20:14:16 +00:00
}
2019-06-04 12:55:38 +01:00
return convertDBNode ( ctx , node )
2019-01-15 16:08:45 +00:00
}
2021-11-08 20:51:04 +00:00
// GetOnlineNodesForAuditRepair returns a map of nodes for the supplied nodeIDs.
func ( cache * overlaycache ) GetOnlineNodesForAuditRepair ( ctx context . Context , nodeIDs [ ] storj . NodeID , onlineWindow time . Duration ) ( nodes map [ storj . NodeID ] * overlay . NodeReputation , err error ) {
for {
nodes , err = cache . getOnlineNodesForAuditRepair ( ctx , nodeIDs , onlineWindow )
if err != nil {
if cockroachutil . NeedsRetry ( err ) {
continue
}
return nodes , err
}
break
}
return nodes , err
}
func ( cache * overlaycache ) getOnlineNodesForAuditRepair ( ctx context . Context , nodeIDs [ ] storj . NodeID , onlineWindow time . Duration ) ( _ map [ storj . NodeID ] * overlay . NodeReputation , err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
var rows tagsql . Rows
rows , err = cache . db . Query ( ctx , cache . db . Rebind ( `
2023-06-02 15:47:34 +01:00
SELECT last_net , id , address , email , last_ip_port , noise_proto , noise_public_key , debounce_limit , features ,
2023-02-28 20:38:13 +00:00
vetted_at , unknown_audit_suspended , offline_suspended
2021-11-08 20:51:04 +00:00
FROM nodes
WHERE id = any ( $ 1 : : bytea [ ] )
AND disqualified IS NULL
AND exit_finished_at IS NULL
AND last_contact_success > $ 2
` ) , pgutil . NodeIDArray ( nodeIDs ) , time . Now ( ) . Add ( - onlineWindow ) )
if err != nil {
return nil , err
}
defer func ( ) { err = errs . Combine ( err , rows . Close ( ) ) } ( )
nodes := make ( map [ storj . NodeID ] * overlay . NodeReputation )
for rows . Next ( ) {
var node overlay . NodeReputation
2023-01-24 15:59:47 +00:00
node . Address = & pb . NodeAddress { }
2021-11-08 20:51:04 +00:00
var lastIPPort sql . NullString
2023-01-02 16:10:47 +00:00
var noise noiseScanner
2023-06-02 15:47:34 +01:00
err = rows . Scan ( & node . LastNet , & node . ID , & node . Address . Address , & node . Reputation . Email , & lastIPPort , & noise . Proto , & noise . PublicKey , & node . Address . DebounceLimit , & node . Address . Features , & node . Reputation . VettedAt , & node . Reputation . UnknownAuditSuspended , & node . Reputation . OfflineSuspended )
2021-11-08 20:51:04 +00:00
if err != nil {
return nil , err
}
if lastIPPort . Valid {
node . LastIPPort = lastIPPort . String
}
2023-01-02 16:10:47 +00:00
node . Address . NoiseInfo = noise . Convert ( )
2021-11-08 20:51:04 +00:00
nodes [ node . ID ] = & node
}
return nodes , Error . Wrap ( rows . Err ( ) )
}
2020-07-16 15:18:02 +01:00
// KnownUnreliableOrOffline filters a set of nodes to unreliable or offlines node, independent of new.
2020-11-29 20:54:03 +00:00
func ( cache * overlaycache ) KnownUnreliableOrOffline ( ctx context . Context , criteria * overlay . NodeCriteria , nodeIDs storj . NodeIDList ) ( badNodes storj . NodeIDList , err error ) {
for {
badNodes , err = cache . knownUnreliableOrOffline ( ctx , criteria , nodeIDs )
if err != nil {
if cockroachutil . NeedsRetry ( err ) {
continue
}
return badNodes , err
}
break
}
return badNodes , err
}
2022-10-07 21:24:43 +01:00
// GetOfflineNodesForEmail gets nodes that we want to send an email to. These are non-disqualified, non-exited nodes where
// last_contact_success is between two points: the point where it is considered offline (offlineWindow), and the point where we don't want
// to send more emails (cutoff). It also filters nodes where last_offline_email is too recent (cooldown).
func ( cache * overlaycache ) GetOfflineNodesForEmail ( ctx context . Context , offlineWindow , cutoff , cooldown time . Duration , limit int ) ( nodes map [ storj . NodeID ] string , err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
now := time . Now ( )
nodes = make ( map [ storj . NodeID ] string )
rows , err := cache . db . QueryContext ( ctx , `
SELECT id , email
FROM nodes
WHERE last_contact_success < $ 1
AND last_contact_success > $ 2
AND ( last_offline_email IS NULL OR last_offline_email < $ 3 )
AND email != ' '
AND disqualified is NULL
AND exit_finished_at is NULL
LIMIT $ 4
` , now . Add ( - offlineWindow ) , now . Add ( - cutoff ) , now . Add ( - cooldown ) , limit )
if err != nil {
return
}
defer func ( ) { err = errs . Combine ( err , rows . Close ( ) ) } ( )
var idBytes [ ] byte
var email string
var nodeID storj . NodeID
for rows . Next ( ) {
err = rows . Scan ( & idBytes , & email )
nodeID , err = storj . NodeIDFromBytes ( idBytes )
if err != nil {
return
}
nodes [ nodeID ] = email
}
return nodes , Error . Wrap ( rows . Err ( ) )
}
// UpdateLastOfflineEmail updates last_offline_email for a list of nodes.
func ( cache * overlaycache ) UpdateLastOfflineEmail ( ctx context . Context , nodeIDs storj . NodeIDList , timestamp time . Time ) ( err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
_ , err = cache . db . ExecContext ( ctx , `
UPDATE nodes
SET last_offline_email = $ 1
WHERE id = any ( $ 2 : : bytea [ ] )
` , timestamp , pgutil . NodeIDArray ( nodeIDs ) )
return err
}
2022-03-03 00:23:11 +00:00
// KnownReliableInExcludedCountries filters healthy nodes that are in excluded countries.
func ( cache * overlaycache ) KnownReliableInExcludedCountries ( ctx context . Context , criteria * overlay . NodeCriteria , nodeIDs storj . NodeIDList ) ( reliableInExcluded storj . NodeIDList , err error ) {
for {
reliableInExcluded , err = cache . knownReliableInExcludedCountries ( ctx , criteria , nodeIDs )
if err != nil {
if cockroachutil . NeedsRetry ( err ) {
continue
}
return reliableInExcluded , err
}
break
}
return reliableInExcluded , err
}
func ( cache * overlaycache ) knownReliableInExcludedCountries ( ctx context . Context , criteria * overlay . NodeCriteria , nodeIDs storj . NodeIDList ) ( reliableInExcluded storj . NodeIDList , err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
if len ( nodeIDs ) == 0 {
return nil , Error . New ( "no ids provided" )
}
args := [ ] interface { } {
pgutil . NodeIDArray ( nodeIDs ) ,
time . Now ( ) . Add ( - criteria . OnlineWindow ) ,
}
// When this config is not set, it's a string slice with one empty string. This is a sanity check just
// in case for some reason it's nil or has no elements.
if criteria . ExcludedCountries == nil || len ( criteria . ExcludedCountries ) == 0 {
return reliableInExcluded , nil
}
var excludedCountriesCondition string
if criteria . ExcludedCountries [ 0 ] == "" {
return reliableInExcluded , nil
}
excludedCountriesCondition = "AND country_code IN (SELECT UNNEST($3::TEXT[]))"
args = append ( args , pgutil . TextArray ( criteria . ExcludedCountries ) )
// get reliable and online nodes
var rows tagsql . Rows
rows , err = cache . db . Query ( ctx , cache . db . Rebind ( `
SELECT id
FROM nodes
` +cache.db.impl.AsOfSystemInterval(criteria.AsOfSystemInterval)+ `
WHERE id = any ( $ 1 : : bytea [ ] )
AND disqualified IS NULL
AND unknown_audit_suspended IS NULL
AND offline_suspended IS NULL
AND exit_finished_at IS NULL
AND last_contact_success > $ 2
` +excludedCountriesCondition+ `
` ) , args ... ,
)
if err != nil {
return nil , err
}
defer func ( ) { err = errs . Combine ( err , rows . Close ( ) ) } ( )
for rows . Next ( ) {
var id storj . NodeID
err = rows . Scan ( & id )
if err != nil {
return nil , err
}
reliableInExcluded = append ( reliableInExcluded , id )
}
return reliableInExcluded , Error . Wrap ( rows . Err ( ) )
}
2020-11-29 20:54:03 +00:00
func ( cache * overlaycache ) knownUnreliableOrOffline ( ctx context . Context , criteria * overlay . NodeCriteria , nodeIDs storj . NodeIDList ) ( badNodes storj . NodeIDList , err error ) {
2019-06-04 12:55:38 +01:00
defer mon . Task ( ) ( & ctx ) ( & err )
2020-11-29 20:54:03 +00:00
if len ( nodeIDs ) == 0 {
2019-05-01 14:45:52 +01:00
return nil , Error . New ( "no ids provided" )
}
2019-05-08 18:59:50 +01:00
// get reliable and online nodes
2020-01-19 13:42:08 +00:00
var rows tagsql . Rows
2020-01-17 20:07:00 +00:00
rows , err = cache . db . Query ( ctx , cache . db . Rebind ( `
2021-05-11 09:49:26 +01:00
SELECT id
FROM nodes
` +cache.db.impl.AsOfSystemInterval(criteria.AsOfSystemInterval)+ `
2019-10-18 22:27:57 +01:00
WHERE id = any ( $ 1 : : bytea [ ] )
2019-06-18 10:14:31 +01:00
AND disqualified IS NULL
2020-06-10 17:11:25 +01:00
AND unknown_audit_suspended IS NULL
2021-03-18 19:55:06 +00:00
AND offline_suspended IS NULL
2020-04-23 20:46:16 +01:00
AND exit_finished_at IS NULL
2019-11-15 22:43:06 +00:00
AND last_contact_success > $ 2
2020-11-29 20:54:03 +00:00
` ) , pgutil . NodeIDArray ( nodeIDs ) , time . Now ( ) . Add ( - criteria . OnlineWindow ) ,
2019-10-18 22:27:57 +01:00
)
2019-05-01 14:45:52 +01:00
if err != nil {
return nil , err
}
2019-10-18 22:27:57 +01:00
defer func ( ) { err = errs . Combine ( err , rows . Close ( ) ) } ( )
2019-05-08 18:59:50 +01:00
2020-11-29 20:54:03 +00:00
goodNodes := make ( map [ storj . NodeID ] struct { } , len ( nodeIDs ) )
2019-05-01 14:45:52 +01:00
for rows . Next ( ) {
var id storj . NodeID
err = rows . Scan ( & id )
2018-12-17 20:14:16 +00:00
if err != nil {
2019-05-01 14:45:52 +01:00
return nil , err
2018-12-17 20:14:16 +00:00
}
2019-05-19 16:10:46 +01:00
goodNodes [ id ] = struct { } { }
2019-05-08 18:59:50 +01:00
}
2020-11-29 20:54:03 +00:00
for _ , id := range nodeIDs {
2019-05-19 16:10:46 +01:00
if _ , ok := goodNodes [ id ] ; ! ok {
2019-05-08 18:59:50 +01:00
badNodes = append ( badNodes , id )
}
2018-12-17 20:14:16 +00:00
}
2020-01-16 14:27:24 +00:00
return badNodes , Error . Wrap ( rows . Err ( ) )
2018-12-17 20:14:16 +00:00
}
2019-12-16 13:45:13 +00:00
// KnownReliable filters a set of nodes to reliable (online and qualified) nodes.
func ( cache * overlaycache ) KnownReliable ( ctx context . Context , onlineWindow time . Duration , nodeIDs storj . NodeIDList ) ( nodes [ ] * pb . Node , err error ) {
2020-11-29 20:54:03 +00:00
for {
nodes , err = cache . knownReliable ( ctx , onlineWindow , nodeIDs )
if err != nil {
if cockroachutil . NeedsRetry ( err ) {
continue
}
return nodes , err
}
break
}
return nodes , err
}
func ( cache * overlaycache ) knownReliable ( ctx context . Context , onlineWindow time . Duration , nodeIDs storj . NodeIDList ) ( nodes [ ] * pb . Node , err error ) {
2019-12-16 13:45:13 +00:00
defer mon . Task ( ) ( & ctx ) ( & err )
if len ( nodeIDs ) == 0 {
return nil , Error . New ( "no ids provided" )
}
// get online nodes
2020-01-17 20:07:00 +00:00
rows , err := cache . db . Query ( ctx , cache . db . Rebind ( `
2023-06-02 15:47:34 +01:00
SELECT id , last_net , last_ip_port , address , protocol , noise_proto , noise_public_key , debounce_limit , features
2020-03-06 22:04:23 +00:00
FROM nodes
2019-12-16 13:45:13 +00:00
WHERE id = any ( $ 1 : : bytea [ ] )
AND disqualified IS NULL
2020-06-10 17:11:25 +01:00
AND unknown_audit_suspended IS NULL
2021-03-18 19:55:06 +00:00
AND offline_suspended IS NULL
2020-04-23 20:46:16 +01:00
AND exit_finished_at IS NULL
2019-12-16 13:45:13 +00:00
AND last_contact_success > $ 2
2020-06-28 04:56:29 +01:00
` ) , pgutil . NodeIDArray ( nodeIDs ) , time . Now ( ) . Add ( - onlineWindow ) ,
2019-12-16 13:45:13 +00:00
)
if err != nil {
return nil , err
}
defer func ( ) { err = errs . Combine ( err , rows . Close ( ) ) } ( )
for rows . Next ( ) {
2020-03-11 21:11:46 +00:00
row := & dbx . Node { }
2023-06-02 15:47:34 +01:00
err = rows . Scan ( & row . Id , & row . LastNet , & row . LastIpPort , & row . Address , & row . Protocol , & row . NoiseProto , & row . NoisePublicKey , & row . DebounceLimit , & row . Features )
2019-12-16 13:45:13 +00:00
if err != nil {
return nil , err
}
2020-03-11 21:11:46 +00:00
node , err := convertDBNode ( ctx , row )
2019-12-16 13:45:13 +00:00
if err != nil {
return nil , err
}
2020-03-11 21:11:46 +00:00
nodes = append ( nodes , & node . Node )
2019-12-16 13:45:13 +00:00
}
2020-01-16 14:27:24 +00:00
return nodes , Error . Wrap ( rows . Err ( ) )
2019-12-16 13:45:13 +00:00
}
2019-07-08 23:04:35 +01:00
// Reliable returns all reliable nodes.
func ( cache * overlaycache ) Reliable ( ctx context . Context , criteria * overlay . NodeCriteria ) ( nodes storj . NodeIDList , err error ) {
2020-11-29 20:54:03 +00:00
for {
nodes , err = cache . reliable ( ctx , criteria )
if err != nil {
if cockroachutil . NeedsRetry ( err ) {
continue
}
return nodes , err
}
break
}
return nodes , err
}
func ( cache * overlaycache ) reliable ( ctx context . Context , criteria * overlay . NodeCriteria ) ( nodes storj . NodeIDList , err error ) {
2022-03-03 00:23:11 +00:00
args := [ ] interface { } {
time . Now ( ) . Add ( - criteria . OnlineWindow ) ,
}
// When this config is not set, it's a string slice with one empty string. I added some sanity checks to make sure we don't
// dereference a nil pointer or index an element that doesn't exist.
var excludedCountriesCondition string
if criteria . ExcludedCountries != nil && len ( criteria . ExcludedCountries ) != 0 && criteria . ExcludedCountries [ 0 ] != "" {
excludedCountriesCondition = "AND country_code NOT IN (SELECT UNNEST($2::TEXT[]))"
args = append ( args , pgutil . TextArray ( criteria . ExcludedCountries ) )
}
2019-07-08 23:04:35 +01:00
// get reliable and online nodes
2020-01-17 20:07:00 +00:00
rows , err := cache . db . Query ( ctx , cache . db . Rebind ( `
2021-05-11 09:49:26 +01:00
SELECT id
FROM nodes
` +cache.db.impl.AsOfSystemInterval(criteria.AsOfSystemInterval)+ `
2019-07-08 23:04:35 +01:00
WHERE disqualified IS NULL
2020-06-10 17:11:25 +01:00
AND unknown_audit_suspended IS NULL
2021-03-18 19:55:06 +00:00
AND offline_suspended IS NULL
2020-04-23 20:46:16 +01:00
AND exit_finished_at IS NULL
2022-02-25 10:43:19 +00:00
AND last_contact_success > $ 1
2022-03-03 00:23:11 +00:00
` +excludedCountriesCondition+ `
` ) , args ... )
2019-07-08 23:04:35 +01:00
if err != nil {
return nil , err
}
defer func ( ) {
err = errs . Combine ( err , rows . Close ( ) )
} ( )
for rows . Next ( ) {
var id storj . NodeID
err = rows . Scan ( & id )
if err != nil {
return nil , err
}
nodes = append ( nodes , id )
}
2020-01-16 14:27:24 +00:00
return nodes , Error . Wrap ( rows . Err ( ) )
2019-07-08 23:04:35 +01:00
}
2021-10-25 21:40:41 +01:00
// UpdateReputation updates the DB columns for any of the reputation fields in ReputationUpdate.
func ( cache * overlaycache ) UpdateReputation ( ctx context . Context , id storj . NodeID , request overlay . ReputationUpdate ) ( err error ) {
2021-08-05 12:07:45 +01:00
defer mon . Task ( ) ( & ctx ) ( & err )
2021-06-17 15:01:21 +01:00
2021-08-04 18:33:47 +01:00
updateFields := dbx . Node_Update_Fields { }
updateFields . UnknownAuditSuspended = dbx . Node_UnknownAuditSuspended_Raw ( request . UnknownAuditSuspended )
updateFields . OfflineSuspended = dbx . Node_OfflineSuspended_Raw ( request . OfflineSuspended )
updateFields . VettedAt = dbx . Node_VettedAt_Raw ( request . VettedAt )
2021-10-27 11:58:29 +01:00
updateFields . Disqualified = dbx . Node_Disqualified_Raw ( request . Disqualified )
if request . Disqualified != nil {
updateFields . DisqualificationReason = dbx . Node_DisqualificationReason ( int ( request . DisqualificationReason ) )
}
2021-08-04 18:33:47 +01:00
err = cache . db . UpdateNoReturn_Node_By_Id_And_Disqualified_Is_Null_And_ExitFinishedAt_Is_Null ( ctx , dbx . Node_Id ( id . Bytes ( ) ) , updateFields )
2021-06-23 00:09:39 +01:00
return Error . Wrap ( err )
2021-06-17 15:01:21 +01:00
}
2019-09-10 17:05:07 +01:00
// UpdateNodeInfo updates the following fields for a given node ID:
2020-07-16 15:18:02 +01:00
// wallet, email for node operator, free disk, and version.
2020-06-16 13:16:55 +01:00
func ( cache * overlaycache ) UpdateNodeInfo ( ctx context . Context , nodeID storj . NodeID , nodeInfo * overlay . InfoResponse ) ( stats * overlay . NodeDossier , err error ) {
2019-03-25 22:25:09 +00:00
defer mon . Task ( ) ( & ctx ) ( & err )
2019-04-10 07:04:24 +01:00
var updateFields dbx . Node_Update_Fields
if nodeInfo != nil {
2020-06-16 13:16:55 +01:00
if nodeInfo . Type != pb . NodeType_INVALID {
updateFields . Type = dbx . Node_Type ( int ( nodeInfo . Type ) )
2019-04-22 10:07:50 +01:00
}
2020-06-16 13:16:55 +01:00
if nodeInfo . Operator != nil {
2021-01-18 14:33:13 +00:00
walletFeatures , err := encodeWalletFeatures ( nodeInfo . Operator . GetWalletFeatures ( ) )
if err != nil {
return nil , Error . Wrap ( err )
}
2020-06-16 13:16:55 +01:00
updateFields . Wallet = dbx . Node_Wallet ( nodeInfo . Operator . GetWallet ( ) )
updateFields . Email = dbx . Node_Email ( nodeInfo . Operator . GetEmail ( ) )
2021-01-18 14:33:13 +00:00
updateFields . WalletFeatures = dbx . Node_WalletFeatures ( walletFeatures )
2019-04-10 07:04:24 +01:00
}
2020-06-16 13:16:55 +01:00
if nodeInfo . Capacity != nil {
updateFields . FreeDisk = dbx . Node_FreeDisk ( nodeInfo . Capacity . GetFreeDisk ( ) )
2019-04-10 07:04:24 +01:00
}
2020-06-16 13:16:55 +01:00
if nodeInfo . Version != nil {
semVer , err := version . NewSemVer ( nodeInfo . Version . GetVersion ( ) )
2019-04-10 07:04:24 +01:00
if err != nil {
2019-04-22 10:07:50 +01:00
return nil , errs . New ( "unable to convert version to semVer" )
2019-04-10 07:04:24 +01:00
}
2019-10-21 11:50:59 +01:00
updateFields . Major = dbx . Node_Major ( int64 ( semVer . Major ) )
updateFields . Minor = dbx . Node_Minor ( int64 ( semVer . Minor ) )
updateFields . Patch = dbx . Node_Patch ( int64 ( semVer . Patch ) )
2020-06-16 13:16:55 +01:00
updateFields . Hash = dbx . Node_Hash ( nodeInfo . Version . GetCommitHash ( ) )
updateFields . Timestamp = dbx . Node_Timestamp ( nodeInfo . Version . Timestamp )
updateFields . Release = dbx . Node_Release ( nodeInfo . Version . GetRelease ( ) )
2019-04-10 07:04:24 +01:00
}
2019-03-25 22:25:09 +00:00
}
2019-04-04 17:34:36 +01:00
updatedDBNode , err := cache . db . Update_Node_By_Id ( ctx , dbx . Node_Id ( nodeID . Bytes ( ) ) , updateFields )
2019-03-25 22:25:09 +00:00
if err != nil {
2019-04-04 17:34:36 +01:00
return nil , Error . Wrap ( err )
2019-03-25 22:25:09 +00:00
}
2019-06-04 12:55:38 +01:00
return convertDBNode ( ctx , updatedDBNode )
2019-03-25 22:25:09 +00:00
}
2020-01-03 19:11:47 +00:00
// DisqualifyNode disqualifies a storage node.
2022-10-11 17:13:29 +01:00
func ( cache * overlaycache ) DisqualifyNode ( ctx context . Context , nodeID storj . NodeID , disqualifiedAt time . Time , reason overlay . DisqualificationReason ) ( email string , err error ) {
2020-01-03 19:11:47 +00:00
defer mon . Task ( ) ( & ctx ) ( & err )
updateFields := dbx . Node_Update_Fields { }
2021-10-27 11:58:29 +01:00
updateFields . Disqualified = dbx . Node_Disqualified ( disqualifiedAt . UTC ( ) )
updateFields . DisqualificationReason = dbx . Node_DisqualificationReason ( int ( reason ) )
2020-01-03 19:11:47 +00:00
dbNode , err := cache . db . Update_Node_By_Id ( ctx , dbx . Node_Id ( nodeID . Bytes ( ) ) , updateFields )
if err != nil {
2022-10-11 17:13:29 +01:00
return "" , err
2020-01-03 19:11:47 +00:00
}
if dbNode == nil {
2022-10-11 17:13:29 +01:00
return "" , errs . New ( "unable to get node by ID: %v" , nodeID )
2020-01-03 19:11:47 +00:00
}
2022-10-11 17:13:29 +01:00
return dbNode . Email , nil
2020-01-03 19:11:47 +00:00
}
2021-07-15 15:14:13 +01:00
// TestSuspendNodeUnknownAudit suspends a storage node for unknown audits.
func ( cache * overlaycache ) TestSuspendNodeUnknownAudit ( ctx context . Context , nodeID storj . NodeID , suspendedAt time . Time ) ( err error ) {
2020-03-09 15:35:54 +00:00
defer mon . Task ( ) ( & ctx ) ( & err )
updateFields := dbx . Node_Update_Fields { }
2020-06-10 17:11:25 +01:00
updateFields . UnknownAuditSuspended = dbx . Node_UnknownAuditSuspended ( suspendedAt . UTC ( ) )
2020-03-09 15:35:54 +00:00
dbNode , err := cache . db . Update_Node_By_Id ( ctx , dbx . Node_Id ( nodeID . Bytes ( ) ) , updateFields )
if err != nil {
return err
}
if dbNode == nil {
return errs . New ( "unable to get node by ID: %v" , nodeID )
}
return nil
}
2021-07-15 15:14:13 +01:00
// TestUnsuspendNodeUnknownAudit unsuspends a storage node for unknown audits.
func ( cache * overlaycache ) TestUnsuspendNodeUnknownAudit ( ctx context . Context , nodeID storj . NodeID ) ( err error ) {
2020-03-09 15:35:54 +00:00
defer mon . Task ( ) ( & ctx ) ( & err )
updateFields := dbx . Node_Update_Fields { }
2020-06-10 17:11:25 +01:00
updateFields . UnknownAuditSuspended = dbx . Node_UnknownAuditSuspended_Null ( )
2020-03-09 15:35:54 +00:00
dbNode , err := cache . db . Update_Node_By_Id ( ctx , dbx . Node_Id ( nodeID . Bytes ( ) ) , updateFields )
2020-12-04 22:21:07 +00:00
if err != nil {
return err
}
2020-03-09 15:35:54 +00:00
if dbNode == nil {
return errs . New ( "unable to get node by ID: %v" , nodeID )
}
return nil
}
2019-08-27 13:37:42 +01:00
// AllPieceCounts returns a map of node IDs to piece counts from the db.
// NB: a valid, partial piece map can be returned even if node ID parsing error(s) are returned.
2022-09-13 09:37:54 +01:00
func ( cache * overlaycache ) AllPieceCounts ( ctx context . Context ) ( _ map [ storj . NodeID ] int64 , err error ) {
2019-08-27 13:37:42 +01:00
defer mon . Task ( ) ( & ctx ) ( & err )
// NB: `All_Node_Id_Node_PieceCount_By_PieceCount_Not_Number` selects node
// ID and piece count from the nodes table where piece count is not zero.
rows , err := cache . db . All_Node_Id_Node_PieceCount_By_PieceCount_Not_Number ( ctx )
if err != nil {
return nil , Error . Wrap ( err )
}
2022-09-13 09:37:54 +01:00
pieceCounts := make ( map [ storj . NodeID ] int64 )
2019-08-27 13:37:42 +01:00
nodeIDErrs := errs . Group { }
for _ , row := range rows {
nodeID , err := storj . NodeIDFromBytes ( row . Id )
if err != nil {
nodeIDErrs . Add ( err )
continue
}
2022-09-13 09:37:54 +01:00
pieceCounts [ nodeID ] = row . PieceCount
2019-08-27 13:37:42 +01:00
}
2019-09-11 22:38:58 +01:00
2019-08-27 13:37:42 +01:00
return pieceCounts , nodeIDErrs . Err ( )
}
2022-09-13 09:37:54 +01:00
func ( cache * overlaycache ) UpdatePieceCounts ( ctx context . Context , pieceCounts map [ storj . NodeID ] int64 ) ( err error ) {
2019-08-27 13:37:42 +01:00
defer mon . Task ( ) ( & ctx ) ( & err )
if len ( pieceCounts ) == 0 {
return nil
}
2019-09-11 22:38:58 +01:00
// TODO: pass in the apprioriate struct to database, rather than constructing it here
type NodeCount struct {
ID storj . NodeID
Count int64
2019-08-27 13:37:42 +01:00
}
2019-09-11 22:38:58 +01:00
var counts [ ] NodeCount
2019-08-27 13:37:42 +01:00
2019-09-11 22:38:58 +01:00
for nodeid , count := range pieceCounts {
counts = append ( counts , NodeCount {
ID : nodeid ,
2022-09-13 09:37:54 +01:00
Count : count ,
2019-09-11 22:38:58 +01:00
} )
}
sort . Slice ( counts , func ( i , k int ) bool {
return counts [ i ] . ID . Less ( counts [ k ] . ID )
} )
2019-08-27 13:37:42 +01:00
2019-10-18 22:27:57 +01:00
var nodeIDs [ ] storj . NodeID
var countNumbers [ ] int64
for _ , count := range counts {
nodeIDs = append ( nodeIDs , count . ID )
countNumbers = append ( countNumbers , count . Count )
2019-08-27 13:37:42 +01:00
}
2019-09-11 22:38:58 +01:00
2019-10-18 22:27:57 +01:00
_ , err = cache . db . ExecContext ( ctx , `
UPDATE nodes
SET piece_count = update . count
FROM (
SELECT unnest ( $ 1 : : bytea [ ] ) as id , unnest ( $ 2 : : bigint [ ] ) as count
) as update
WHERE nodes . id = update . id
2020-06-28 04:56:29 +01:00
` , pgutil . NodeIDArray ( nodeIDs ) , pgutil . Int8Array ( countNumbers ) )
2019-10-18 22:27:57 +01:00
2019-09-11 22:38:58 +01:00
return Error . Wrap ( err )
2019-08-27 13:37:42 +01:00
}
2019-11-07 17:19:34 +00:00
// GetExitingNodes returns nodes who have initiated a graceful exit and is not disqualified, but have not completed it.
2019-10-24 17:24:42 +01:00
func ( cache * overlaycache ) GetExitingNodes ( ctx context . Context ) ( exitingNodes [ ] * overlay . ExitStatus , err error ) {
2020-11-29 20:54:03 +00:00
for {
exitingNodes , err = cache . getExitingNodes ( ctx )
if err != nil {
if cockroachutil . NeedsRetry ( err ) {
continue
}
return exitingNodes , err
}
break
}
return exitingNodes , err
}
func ( cache * overlaycache ) getExitingNodes ( ctx context . Context ) ( exitingNodes [ ] * overlay . ExitStatus , err error ) {
2019-10-01 23:18:21 +01:00
defer mon . Task ( ) ( & ctx ) ( & err )
2020-01-17 20:07:00 +00:00
rows , err := cache . db . Query ( ctx , cache . db . Rebind ( `
2019-10-24 17:24:42 +01:00
SELECT id , exit_initiated_at , exit_loop_completed_at , exit_finished_at , exit_success FROM nodes
2019-10-01 23:18:21 +01:00
WHERE exit_initiated_at IS NOT NULL
AND exit_finished_at IS NULL
2019-11-07 17:19:34 +00:00
AND disqualified is NULL
2020-01-16 14:27:24 +00:00
` ) )
2019-10-01 23:18:21 +01:00
if err != nil {
return nil , err
}
2020-01-16 14:27:24 +00:00
defer func ( ) { err = errs . Combine ( err , rows . Close ( ) ) } ( )
2019-10-01 23:18:21 +01:00
for rows . Next ( ) {
2019-10-24 17:24:42 +01:00
var exitingNodeStatus overlay . ExitStatus
err = rows . Scan ( & exitingNodeStatus . NodeID , & exitingNodeStatus . ExitInitiatedAt , & exitingNodeStatus . ExitLoopCompletedAt , & exitingNodeStatus . ExitFinishedAt , & exitingNodeStatus . ExitSuccess )
2019-10-01 23:18:21 +01:00
if err != nil {
return nil , err
}
2019-10-24 17:24:42 +01:00
exitingNodes = append ( exitingNodes , & exitingNodeStatus )
2019-10-01 23:18:21 +01:00
}
2020-01-16 14:27:24 +00:00
return exitingNodes , Error . Wrap ( rows . Err ( ) )
2019-10-01 23:18:21 +01:00
}
2019-10-23 02:06:01 +01:00
// GetExitStatus returns a node's graceful exit status.
2020-11-29 20:54:03 +00:00
func ( cache * overlaycache ) GetExitStatus ( ctx context . Context , nodeID storj . NodeID ) ( exitStatus * overlay . ExitStatus , err error ) {
for {
exitStatus , err = cache . getExitStatus ( ctx , nodeID )
if err != nil {
if cockroachutil . NeedsRetry ( err ) {
continue
}
return exitStatus , err
}
break
}
return exitStatus , err
}
func ( cache * overlaycache ) getExitStatus ( ctx context . Context , nodeID storj . NodeID ) ( _ * overlay . ExitStatus , err error ) {
2019-10-11 22:18:05 +01:00
defer mon . Task ( ) ( & ctx ) ( & err )
2020-01-17 20:07:00 +00:00
rows , err := cache . db . Query ( ctx , cache . db . Rebind ( `
2020-01-16 14:27:24 +00:00
SELECT id , exit_initiated_at , exit_loop_completed_at , exit_finished_at , exit_success
FROM nodes
WHERE id = ?
` ) , nodeID )
2019-10-11 22:18:05 +01:00
if err != nil {
return nil , Error . Wrap ( err )
}
2020-01-16 14:27:24 +00:00
defer func ( ) { err = errs . Combine ( err , rows . Close ( ) ) } ( )
2019-10-11 22:18:05 +01:00
exitStatus := & overlay . ExitStatus { }
if rows . Next ( ) {
2019-10-17 16:01:39 +01:00
err = rows . Scan ( & exitStatus . NodeID , & exitStatus . ExitInitiatedAt , & exitStatus . ExitLoopCompletedAt , & exitStatus . ExitFinishedAt , & exitStatus . ExitSuccess )
2020-01-16 14:27:24 +00:00
if err != nil {
return nil , err
}
2019-10-11 22:18:05 +01:00
}
2020-01-16 14:27:24 +00:00
return exitStatus , Error . Wrap ( rows . Err ( ) )
2019-10-11 22:18:05 +01:00
}
2019-10-01 23:18:21 +01:00
2019-10-23 02:06:01 +01:00
// GetGracefulExitCompletedByTimeFrame returns nodes who have completed graceful exit within a time window (time window is around graceful exit completion).
func ( cache * overlaycache ) GetGracefulExitCompletedByTimeFrame ( ctx context . Context , begin , end time . Time ) ( exitedNodes storj . NodeIDList , err error ) {
2020-11-29 20:54:03 +00:00
for {
exitedNodes , err = cache . getGracefulExitCompletedByTimeFrame ( ctx , begin , end )
if err != nil {
if cockroachutil . NeedsRetry ( err ) {
continue
}
return exitedNodes , err
}
break
}
return exitedNodes , err
}
func ( cache * overlaycache ) getGracefulExitCompletedByTimeFrame ( ctx context . Context , begin , end time . Time ) ( exitedNodes storj . NodeIDList , err error ) {
2019-10-23 02:06:01 +01:00
defer mon . Task ( ) ( & ctx ) ( & err )
2020-01-17 20:07:00 +00:00
rows , err := cache . db . Query ( ctx , cache . db . Rebind ( `
2019-10-23 02:06:01 +01:00
SELECT id FROM nodes
WHERE exit_initiated_at IS NOT NULL
AND exit_finished_at IS NOT NULL
AND exit_finished_at >= ?
AND exit_finished_at < ?
2020-01-16 14:27:24 +00:00
` ) , begin , end )
2019-10-23 02:06:01 +01:00
if err != nil {
return nil , err
}
defer func ( ) {
err = errs . Combine ( err , rows . Close ( ) )
} ( )
for rows . Next ( ) {
var id storj . NodeID
err = rows . Scan ( & id )
if err != nil {
return nil , err
}
exitedNodes = append ( exitedNodes , id )
}
2020-01-16 14:27:24 +00:00
return exitedNodes , Error . Wrap ( rows . Err ( ) )
2019-10-23 02:06:01 +01:00
}
// GetGracefulExitIncompleteByTimeFrame returns nodes who have initiated, but not completed graceful exit within a time window (time window is around graceful exit initiation).
func ( cache * overlaycache ) GetGracefulExitIncompleteByTimeFrame ( ctx context . Context , begin , end time . Time ) ( exitingNodes storj . NodeIDList , err error ) {
2020-11-29 20:54:03 +00:00
for {
exitingNodes , err = cache . getGracefulExitIncompleteByTimeFrame ( ctx , begin , end )
if err != nil {
if cockroachutil . NeedsRetry ( err ) {
continue
}
return exitingNodes , err
}
break
}
return exitingNodes , err
}
func ( cache * overlaycache ) getGracefulExitIncompleteByTimeFrame ( ctx context . Context , begin , end time . Time ) ( exitingNodes storj . NodeIDList , err error ) {
2019-10-23 02:06:01 +01:00
defer mon . Task ( ) ( & ctx ) ( & err )
2020-01-17 20:07:00 +00:00
rows , err := cache . db . Query ( ctx , cache . db . Rebind ( `
2019-10-23 02:06:01 +01:00
SELECT id FROM nodes
WHERE exit_initiated_at IS NOT NULL
AND exit_finished_at IS NULL
AND exit_initiated_at >= ?
AND exit_initiated_at < ?
2020-01-16 14:27:24 +00:00
` ) , begin , end )
2019-10-23 02:06:01 +01:00
if err != nil {
return nil , err
}
defer func ( ) {
err = errs . Combine ( err , rows . Close ( ) )
} ( )
// TODO return more than just ID
for rows . Next ( ) {
var id storj . NodeID
err = rows . Scan ( & id )
if err != nil {
return nil , err
}
exitingNodes = append ( exitingNodes , id )
}
2020-01-16 14:27:24 +00:00
return exitingNodes , Error . Wrap ( rows . Err ( ) )
2019-10-23 02:06:01 +01:00
}
2019-10-01 23:18:21 +01:00
// UpdateExitStatus is used to update a node's graceful exit status.
2019-10-29 20:22:20 +00:00
func ( cache * overlaycache ) UpdateExitStatus ( ctx context . Context , request * overlay . ExitStatusRequest ) ( _ * overlay . NodeDossier , err error ) {
2019-10-01 23:18:21 +01:00
defer mon . Task ( ) ( & ctx ) ( & err )
nodeID := request . NodeID
updateFields := populateExitStatusFields ( request )
2019-12-20 15:59:47 +00:00
dbNode , err := cache . db . Update_Node_By_Id ( ctx , dbx . Node_Id ( nodeID . Bytes ( ) ) , updateFields )
2019-10-01 23:18:21 +01:00
if err != nil {
return nil , Error . Wrap ( err )
}
if dbNode == nil {
2019-12-20 15:59:47 +00:00
return nil , Error . Wrap ( errs . New ( "unable to get node by ID: %v" , nodeID ) )
2019-10-29 20:22:20 +00:00
}
return convertDBNode ( ctx , dbNode )
2019-10-01 23:18:21 +01:00
}
func populateExitStatusFields ( req * overlay . ExitStatusRequest ) dbx . Node_Update_Fields {
dbxUpdateFields := dbx . Node_Update_Fields { }
if ! req . ExitInitiatedAt . IsZero ( ) {
dbxUpdateFields . ExitInitiatedAt = dbx . Node_ExitInitiatedAt ( req . ExitInitiatedAt )
}
if ! req . ExitLoopCompletedAt . IsZero ( ) {
dbxUpdateFields . ExitLoopCompletedAt = dbx . Node_ExitLoopCompletedAt ( req . ExitLoopCompletedAt )
}
if ! req . ExitFinishedAt . IsZero ( ) {
dbxUpdateFields . ExitFinishedAt = dbx . Node_ExitFinishedAt ( req . ExitFinishedAt )
}
2019-10-17 16:01:39 +01:00
dbxUpdateFields . ExitSuccess = dbx . Node_ExitSuccess ( req . ExitSuccess )
2019-10-01 23:18:21 +01:00
return dbxUpdateFields
}
2019-06-04 12:55:38 +01:00
func convertDBNode ( ctx context . Context , info * dbx . Node ) ( _ * overlay . NodeDossier , err error ) {
2019-01-15 16:08:45 +00:00
if info == nil {
return nil , Error . New ( "missing info" )
}
2019-03-29 08:53:43 +00:00
id , err := storj . NodeIDFromBytes ( info . Id )
2019-01-15 16:08:45 +00:00
if err != nil {
return nil , err
}
2019-10-21 11:50:59 +01:00
ver , err := version . NewSemVer ( fmt . Sprintf ( "%d.%d.%d" , info . Major , info . Minor , info . Patch ) )
if err != nil {
return nil , err
2019-04-10 07:04:24 +01:00
}
2022-12-22 20:28:53 +00:00
var noiseInfo * pb . NoiseInfo
if info . NoiseProto != nil && len ( info . NoisePublicKey ) > 0 {
noiseInfo = & pb . NoiseInfo {
Proto : pb . NoiseProtocol ( * info . NoiseProto ) ,
PublicKey : info . NoisePublicKey ,
}
}
2019-10-11 22:18:05 +01:00
exitStatus := overlay . ExitStatus { NodeID : id }
exitStatus . ExitInitiatedAt = info . ExitInitiatedAt
exitStatus . ExitLoopCompletedAt = info . ExitLoopCompletedAt
exitStatus . ExitFinishedAt = info . ExitFinishedAt
2020-03-10 20:42:11 +00:00
exitStatus . ExitSuccess = info . ExitSuccess
2019-10-11 22:18:05 +01:00
2019-04-04 17:34:36 +01:00
node := & overlay . NodeDossier {
Node : pb . Node {
2020-03-06 22:04:23 +00:00
Id : id ,
2019-04-04 17:34:36 +01:00
Address : & pb . NodeAddress {
2023-02-28 20:38:13 +00:00
Address : info . Address ,
NoiseInfo : noiseInfo ,
DebounceLimit : int32 ( info . DebounceLimit ) ,
2023-06-02 15:47:34 +01:00
Features : uint64 ( info . Features ) ,
2019-04-04 17:34:36 +01:00
} ,
2019-01-15 16:08:45 +00:00
} ,
2019-04-04 17:34:36 +01:00
Type : pb . NodeType ( info . Type ) ,
Operator : pb . NodeOperator {
2021-01-18 14:33:13 +00:00
Email : info . Email ,
Wallet : info . Wallet ,
WalletFeatures : decodeWalletFeatures ( info . WalletFeatures ) ,
2019-01-15 16:08:45 +00:00
} ,
2019-04-04 17:34:36 +01:00
Capacity : pb . NodeCapacity {
2020-02-12 21:19:42 +00:00
FreeDisk : info . FreeDisk ,
2019-01-15 16:08:45 +00:00
} ,
2019-06-20 14:56:04 +01:00
Reputation : * getNodeStats ( info ) ,
2019-04-10 07:04:24 +01:00
Version : pb . NodeVersion {
Version : ver . String ( ) ,
CommitHash : info . Hash ,
2019-07-08 19:24:42 +01:00
Timestamp : info . Timestamp ,
2019-04-10 07:04:24 +01:00
Release : info . Release ,
} ,
2022-09-30 16:26:24 +01:00
Disqualified : info . Disqualified ,
DisqualificationReason : ( * overlay . DisqualificationReason ) ( info . DisqualificationReason ) ,
UnknownAuditSuspended : info . UnknownAuditSuspended ,
OfflineSuspended : info . OfflineSuspended ,
OfflineUnderReview : info . UnderReview ,
PieceCount : info . PieceCount ,
ExitStatus : exitStatus ,
CreatedAt : info . CreatedAt ,
LastNet : info . LastNet ,
2022-10-07 21:24:43 +01:00
LastOfflineEmail : info . LastOfflineEmail ,
2022-09-30 16:26:24 +01:00
LastSoftwareUpdateEmail : info . LastSoftwareUpdateEmail ,
2020-03-06 22:04:23 +00:00
}
if info . LastIpPort != nil {
node . LastIPPort = * info . LastIpPort
2019-01-15 16:08:45 +00:00
}
2021-10-29 18:44:44 +01:00
if info . CountryCode != nil {
node . CountryCode = location . ToCountryCode ( * info . CountryCode )
}
2022-11-22 00:10:27 +00:00
if info . Contained != nil {
node . Contained = true
}
2019-01-15 16:08:45 +00:00
return node , nil
2018-12-17 20:14:16 +00:00
}
2019-03-25 22:25:09 +00:00
2021-01-18 14:33:13 +00:00
// encodeWalletFeatures encodes wallet features into comma separated list string.
func encodeWalletFeatures ( features [ ] string ) ( string , error ) {
var errGroup errs . Group
for _ , feature := range features {
if strings . Contains ( feature , "," ) {
errGroup . Add ( errs . New ( "error encoding %s, can not contain separator \",\"" , feature ) )
}
}
if err := errGroup . Err ( ) ; err != nil {
return "" , Error . Wrap ( err )
}
return strings . Join ( features , "," ) , nil
}
// decodeWalletFeatures decodes comma separated wallet features list string.
func decodeWalletFeatures ( encoded string ) [ ] string {
if encoded == "" {
return nil
}
return strings . Split ( encoded , "," )
}
2019-04-08 18:52:53 +01:00
func getNodeStats ( dbNode * dbx . Node ) * overlay . NodeStats {
2019-03-25 22:25:09 +00:00
nodeStats := & overlay . NodeStats {
2021-11-08 20:51:04 +00:00
Latency90 : dbNode . Latency90 ,
LastContactSuccess : dbNode . LastContactSuccess ,
LastContactFailure : dbNode . LastContactFailure ,
OfflineUnderReview : dbNode . UnderReview ,
Status : overlay . ReputationStatus {
2023-01-27 18:02:24 +00:00
Email : dbNode . Email ,
2021-11-08 20:51:04 +00:00
VettedAt : dbNode . VettedAt ,
Disqualified : dbNode . Disqualified ,
UnknownAuditSuspended : dbNode . UnknownAuditSuspended ,
OfflineSuspended : dbNode . OfflineSuspended ,
} ,
2019-03-25 22:25:09 +00:00
}
return nodeStats
}
2021-03-15 20:48:36 +00:00
// DQNodesLastSeenBefore disqualifies a limited number of nodes where last_contact_success < cutoff except those already disqualified
2021-04-22 14:43:56 +01:00
// or gracefully exited or where last_contact_success = '0001-01-01 00:00:00+00'.
2022-10-12 18:56:15 +01:00
func ( cache * overlaycache ) DQNodesLastSeenBefore ( ctx context . Context , cutoff time . Time , limit int ) ( nodeEmails map [ storj . NodeID ] string , count int , err error ) {
2020-12-31 18:43:13 +00:00
defer mon . Task ( ) ( & ctx ) ( & err )
2021-03-15 20:48:36 +00:00
2021-03-23 18:31:08 +00:00
var nodeIDs [ ] storj . NodeID
2022-10-12 18:56:15 +01:00
nodeEmails = make ( map [ storj . NodeID ] string )
2021-03-23 18:31:08 +00:00
for {
nodeIDs , err = cache . getNodesForDQLastSeenBefore ( ctx , cutoff , limit )
if err != nil {
if cockroachutil . NeedsRetry ( err ) {
continue
}
2022-10-12 18:56:15 +01:00
return nil , 0 , err
2021-03-23 18:31:08 +00:00
}
if len ( nodeIDs ) == 0 {
2022-10-12 18:56:15 +01:00
return nil , 0 , nil
2021-03-23 18:31:08 +00:00
}
break
}
var rows tagsql . Rows
rows , err = cache . db . Query ( ctx , cache . db . Rebind ( `
UPDATE nodes
2022-04-20 17:59:47 +01:00
SET disqualified = current_timestamp ,
disqualification_reason = $ 3
2021-03-23 18:31:08 +00:00
WHERE id = any ( $ 1 : : bytea [ ] )
AND disqualified IS NULL
AND exit_finished_at IS NULL
AND last_contact_success < $ 2
2021-04-22 14:43:56 +01:00
AND last_contact_success != ' 0001 - 01 - 01 00 : 00 : 00 + 00 ' : : timestamptz
2022-10-12 18:56:15 +01:00
RETURNING id , email , last_contact_success ;
2022-04-20 17:59:47 +01:00
` ) , pgutil . NodeIDArray ( nodeIDs ) , cutoff , overlay . DisqualificationReasonNodeOffline )
2021-02-22 20:01:24 +00:00
if err != nil {
2022-10-12 18:56:15 +01:00
return nil , 0 , err
2021-02-22 20:01:24 +00:00
}
2021-03-15 20:48:36 +00:00
defer func ( ) { err = errs . Combine ( err , rows . Close ( ) ) } ( )
for rows . Next ( ) {
var id storj . NodeID
2022-10-12 18:56:15 +01:00
var email string
2021-03-15 20:48:36 +00:00
var lcs time . Time
2022-10-12 18:56:15 +01:00
err = rows . Scan ( & id , & email , & lcs )
2021-03-15 20:48:36 +00:00
if err != nil {
2022-10-12 18:56:15 +01:00
return nil , count , err
2021-03-15 20:48:36 +00:00
}
cache . db . log . Info ( "Disqualified" ,
zap . String ( "DQ type" , "stray node" ) ,
zap . Stringer ( "Node ID" , id ) ,
zap . Stringer ( "Last contacted" , lcs ) )
2022-10-12 18:56:15 +01:00
nodeEmails [ id ] = email
2021-03-15 20:48:36 +00:00
count ++
2021-02-22 20:01:24 +00:00
}
2022-10-12 18:56:15 +01:00
return nodeEmails , count , rows . Err ( )
2020-12-31 18:43:13 +00:00
}
2021-03-23 18:31:08 +00:00
func ( cache * overlaycache ) getNodesForDQLastSeenBefore ( ctx context . Context , cutoff time . Time , limit int ) ( nodes [ ] storj . NodeID , err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
rows , err := cache . db . Query ( ctx , cache . db . Rebind ( `
SELECT id
FROM nodes
WHERE last_contact_success < $ 1
AND disqualified is NULL
AND exit_finished_at is NULL
2021-10-28 19:59:04 +01:00
AND last_contact_success != ' 0001 - 01 - 01 00 : 00 : 00 + 00 ' : : timestamptz
2021-03-23 18:31:08 +00:00
LIMIT $ 2
` ) , cutoff , limit )
if err != nil {
return nil , err
}
defer func ( ) { err = errs . Combine ( err , rows . Close ( ) ) } ( )
var nodeIDs [ ] storj . NodeID
for rows . Next ( ) {
var id storj . NodeID
err = rows . Scan ( & id )
if err != nil {
return nil , err
}
nodeIDs = append ( nodeIDs , id )
}
return nodeIDs , rows . Err ( )
}
2022-11-22 13:02:01 +00:00
func ( cache * overlaycache ) updateCheckInDirectUpdate ( ctx context . Context , node overlay . NodeCheckInInfo , timestamp time . Time , semVer version . SemVer , walletFeatures string ) ( updated bool , err error ) {
2022-12-22 20:28:53 +00:00
var noiseProto sql . NullInt32
var noisePublicKey [ ] byte
if node . Address . NoiseInfo != nil {
noiseProto = sql . NullInt32 {
Int32 : int32 ( node . Address . NoiseInfo . Proto ) ,
Valid : true ,
}
noisePublicKey = node . Address . NoiseInfo . PublicKey
}
2021-05-16 18:09:15 +01:00
// First try the fast path.
var res sql . Result
res , err = cache . db . ExecContext ( ctx , `
UPDATE nodes
SET
address = $ 2 ,
last_net = $ 3 ,
protocol = $ 4 ,
email = $ 5 ,
wallet = $ 6 ,
free_disk = $ 7 ,
major = $ 9 , minor = $ 10 , patch = $ 11 , hash = $ 12 , timestamp = $ 13 , release = $ 14 ,
last_contact_success = CASE WHEN $ 8 : : bool IS TRUE
THEN $ 15 : : timestamptz
ELSE nodes . last_contact_success
END ,
last_contact_failure = CASE WHEN $ 8 : : bool IS FALSE
THEN $ 15 : : timestamptz
ELSE nodes . last_contact_failure
END ,
last_ip_port = $ 16 ,
2021-10-29 18:44:44 +01:00
wallet_features = $ 17 ,
2022-11-22 13:02:01 +00:00
country_code = $ 18 ,
2022-12-22 20:28:53 +00:00
noise_proto = $ 21 ,
noise_public_key = $ 22 ,
2023-02-28 20:38:13 +00:00
debounce_limit = $ 23 ,
2023-06-02 15:47:34 +01:00
features = $ 24 ,
2022-09-30 16:26:24 +01:00
last_software_update_email = CASE
WHEN $ 19 : : bool IS TRUE THEN $ 15 : : timestamptz
WHEN $ 20 : : bool IS FALSE THEN NULL
2023-02-01 22:19:02 +00:00
ELSE nodes . last_software_update_email
2022-10-07 21:24:43 +01:00
END ,
last_offline_email = CASE WHEN $ 8 : : bool IS TRUE
THEN NULL
ELSE nodes . last_offline_email
2022-09-30 16:26:24 +01:00
END
2021-05-16 18:09:15 +01:00
WHERE id = $ 1
` , // args $1 - $4
2023-01-24 15:59:47 +00:00
node . NodeID . Bytes ( ) , node . Address . GetAddress ( ) , node . LastNet , pb . NodeTransport_TCP_TLS_RPC ,
2021-05-16 18:09:15 +01:00
// args $5 - $7
node . Operator . GetEmail ( ) , node . Operator . GetWallet ( ) , node . Capacity . GetFreeDisk ( ) ,
// args $8
node . IsUp ,
// args $9 - $14
semVer . Major , semVer . Minor , semVer . Patch , node . Version . GetCommitHash ( ) , node . Version . Timestamp , node . Version . GetRelease ( ) ,
// args $15
timestamp ,
// args $16
node . LastIPPort ,
// args $17,
walletFeatures ,
2021-10-29 18:44:44 +01:00
// args $18,
node . CountryCode . String ( ) ,
2022-09-30 16:26:24 +01:00
// args $19 - $20
node . SoftwareUpdateEmailSent , node . VersionBelowMin ,
2023-06-02 15:47:34 +01:00
// args $21 - $24
noiseProto , noisePublicKey , node . Address . DebounceLimit , node . Address . Features ,
2021-05-16 18:09:15 +01:00
)
2021-05-24 22:30:36 +01:00
2022-11-22 13:02:01 +00:00
if err != nil {
return false , Error . Wrap ( err )
}
affected , affectedErr := res . RowsAffected ( )
if affectedErr != nil {
return false , Error . Wrap ( err )
}
return affected > 0 , nil
}
// UpdateCheckIn updates a single storagenode with info from when the the node last checked in.
func ( cache * overlaycache ) UpdateCheckIn ( ctx context . Context , node overlay . NodeCheckInInfo , timestamp time . Time , config overlay . NodeSelectionConfig ) ( err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
if node . Address . GetAddress ( ) == "" {
return Error . New ( "error UpdateCheckIn: missing the storage node address" )
}
semVer , err := version . NewSemVer ( node . Version . GetVersion ( ) )
if err != nil {
return Error . New ( "unable to convert version to semVer" )
}
walletFeatures , err := encodeWalletFeatures ( node . Operator . GetWalletFeatures ( ) )
if err != nil {
return Error . Wrap ( err )
}
updated , err := cache . updateCheckInDirectUpdate ( ctx , node , timestamp , semVer , walletFeatures )
if err != nil {
return Error . Wrap ( err )
}
if updated {
return nil
2021-05-16 18:09:15 +01:00
}
2022-12-22 20:28:53 +00:00
var noiseProto sql . NullInt32
var noisePublicKey [ ] byte
if node . Address . NoiseInfo != nil {
noiseProto = sql . NullInt32 {
Int32 : int32 ( node . Address . NoiseInfo . Proto ) ,
Valid : true ,
}
noisePublicKey = node . Address . NoiseInfo . PublicKey
}
2021-05-16 18:09:15 +01:00
_ , err = cache . db . ExecContext ( ctx , `
2019-09-19 19:37:31 +01:00
INSERT INTO nodes
(
id , address , last_net , protocol , type ,
2020-02-12 21:19:42 +00:00
email , wallet , free_disk ,
2019-09-19 19:37:31 +01:00
last_contact_success ,
last_contact_failure ,
2020-03-06 22:04:23 +00:00
major , minor , patch , hash , timestamp , release ,
2022-12-22 20:28:53 +00:00
last_ip_port , wallet_features , country_code ,
2023-06-02 15:47:34 +01:00
noise_proto , noise_public_key , debounce_limit ,
features
2019-09-19 19:37:31 +01:00
)
VALUES (
$ 1 , $ 2 , $ 3 , $ 4 , $ 5 ,
2020-02-12 21:19:42 +00:00
$ 6 , $ 7 , $ 8 ,
2021-08-10 19:32:24 +01:00
CASE WHEN $ 9 : : bool IS TRUE THEN $ 16 : : timestamptz
2019-11-15 22:43:06 +00:00
ELSE ' 0001 - 01 - 01 00 : 00 : 00 + 00 ' : : timestamptz
2019-09-19 19:37:31 +01:00
END ,
2021-08-10 19:32:24 +01:00
CASE WHEN $ 9 : : bool IS FALSE THEN $ 16 : : timestamptz
2019-11-15 22:43:06 +00:00
ELSE ' 0001 - 01 - 01 00 : 00 : 00 + 00 ' : : timestamptz
2019-09-19 19:37:31 +01:00
END ,
2021-08-10 19:32:24 +01:00
$ 10 , $ 11 , $ 12 , $ 13 , $ 14 , $ 15 ,
2022-12-22 20:28:53 +00:00
$ 17 , $ 18 , $ 19 ,
2023-06-02 15:47:34 +01:00
$ 22 , $ 23 , $ 24 ,
$ 25
2019-09-19 19:37:31 +01:00
)
ON CONFLICT ( id )
DO UPDATE
SET
address = $ 2 ,
last_net = $ 3 ,
protocol = $ 4 ,
email = $ 6 ,
wallet = $ 7 ,
2020-02-12 21:19:42 +00:00
free_disk = $ 8 ,
2021-08-10 19:32:24 +01:00
major = $ 10 , minor = $ 11 , patch = $ 12 , hash = $ 13 , timestamp = $ 14 , release = $ 15 ,
2020-02-12 21:19:42 +00:00
last_contact_success = CASE WHEN $ 9 : : bool IS TRUE
2021-08-10 19:32:24 +01:00
THEN $ 16 : : timestamptz
2019-09-19 19:37:31 +01:00
ELSE nodes . last_contact_success
END ,
2020-02-12 21:19:42 +00:00
last_contact_failure = CASE WHEN $ 9 : : bool IS FALSE
2021-08-10 19:32:24 +01:00
THEN $ 16 : : timestamptz
2019-09-19 19:37:31 +01:00
ELSE nodes . last_contact_failure
2020-03-06 22:04:23 +00:00
END ,
2021-08-10 19:32:24 +01:00
last_ip_port = $ 17 ,
2021-10-29 18:44:44 +01:00
wallet_features = $ 18 ,
2022-09-30 16:26:24 +01:00
country_code = $ 19 ,
2022-12-22 20:28:53 +00:00
noise_proto = $ 22 ,
noise_public_key = $ 23 ,
2023-02-28 20:38:13 +00:00
debounce_limit = $ 24 ,
2023-06-02 15:47:34 +01:00
features = $ 25 ,
2022-09-30 16:26:24 +01:00
last_software_update_email = CASE
WHEN $ 20 : : bool IS TRUE THEN $ 16 : : timestamptz
WHEN $ 21 : : bool IS FALSE THEN NULL
2023-02-01 22:19:02 +00:00
ELSE nodes . last_software_update_email
2022-10-07 21:24:43 +01:00
END ,
last_offline_email = CASE WHEN $ 9 : : bool IS TRUE
THEN NULL
ELSE nodes . last_offline_email
END ;
2021-05-16 18:09:15 +01:00
` ,
2019-10-18 22:27:57 +01:00
// args $1 - $5
2023-01-24 15:59:47 +00:00
node . NodeID . Bytes ( ) , node . Address . GetAddress ( ) , node . LastNet , pb . NodeTransport_TCP_TLS_RPC , int ( pb . NodeType_STORAGE ) ,
2020-02-12 21:19:42 +00:00
// args $6 - $8
node . Operator . GetEmail ( ) , node . Operator . GetWallet ( ) , node . Capacity . GetFreeDisk ( ) ,
// args $9
2019-10-18 22:27:57 +01:00
node . IsUp ,
2021-08-10 19:32:24 +01:00
// args $10 - $15
2019-10-18 22:27:57 +01:00
semVer . Major , semVer . Minor , semVer . Patch , node . Version . GetCommitHash ( ) , node . Version . Timestamp , node . Version . GetRelease ( ) ,
2021-08-10 19:32:24 +01:00
// args $16
2019-11-15 22:43:06 +00:00
timestamp ,
2022-12-22 20:28:53 +00:00
// args $17 - $19
node . LastIPPort , walletFeatures , node . CountryCode . String ( ) ,
2022-09-30 16:26:24 +01:00
// args $20 - $21
node . SoftwareUpdateEmailSent , node . VersionBelowMin ,
2023-06-02 15:47:34 +01:00
// args $22 - $25
noiseProto , noisePublicKey , node . Address . DebounceLimit , node . Address . Features ,
2019-10-18 22:27:57 +01:00
)
if err != nil {
return Error . Wrap ( err )
2019-09-19 19:37:31 +01:00
}
return nil
}
2020-07-08 15:28:49 +01:00
2022-11-22 00:10:27 +00:00
// SetNodeContained updates the contained field for the node record. If
// `contained` is true, the contained field in the record is set to the current
// database time, if it is not already set. If `contained` is false, the
// contained field in the record is set to NULL. All other fields are left
// alone.
func ( cache * overlaycache ) SetNodeContained ( ctx context . Context , nodeID storj . NodeID , contained bool ) ( err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
var query string
if contained {
// only update the timestamp if it's not already set
query = `
UPDATE nodes SET contained = current_timestamp
WHERE id = $ 1 AND contained IS NULL
`
} else {
query = `
UPDATE nodes SET contained = NULL
WHERE id = $ 1
`
}
_ , err = cache . db . DB . ExecContext ( ctx , query , nodeID [ : ] )
return Error . Wrap ( err )
}
2023-01-26 00:09:51 +00:00
// SetAllContainedNodes updates the contained field for all nodes, as necessary.
// containedNodes is expected to be a set of all nodes that should be contained.
// All nodes which are in this set but do not already have a non-NULL contained
// field will be updated to be contained as of the current time, and all nodes
// which are not in this set but are contained in the table will be updated to
// have a NULL contained field.
func ( cache * overlaycache ) SetAllContainedNodes ( ctx context . Context , containedNodes [ ] storj . NodeID ) ( err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
updateQuery := `
WITH should_be AS (
SELECT nodes . id , EXISTS ( SELECT 1 FROM unnest ( $ 1 : : BYTEA [ ] ) sb ( i ) WHERE sb . i = id ) AS contained
FROM nodes
)
UPDATE nodes n SET contained =
CASE WHEN should_be . contained
THEN current_timestamp
ELSE NULL
END
FROM should_be
WHERE n . id = should_be . id
AND ( n . contained IS NOT NULL ) != should_be . contained
`
_ , err = cache . db . DB . ExecContext ( ctx , updateQuery , pgutil . NodeIDArray ( containedNodes ) )
return Error . Wrap ( err )
}
2020-07-08 15:28:49 +01:00
var (
// ErrVetting is the error class for the following test methods.
2021-04-28 09:06:17 +01:00
ErrVetting = errs . Class ( "vetting" )
2020-07-08 15:28:49 +01:00
)
// TestVetNode directly sets a node's vetted_at timestamp to make testing easier.
func ( cache * overlaycache ) TestVetNode ( ctx context . Context , nodeID storj . NodeID ) ( vettedTime * time . Time , err error ) {
updateFields := dbx . Node_Update_Fields {
VettedAt : dbx . Node_VettedAt ( time . Now ( ) . UTC ( ) ) ,
}
node , err := cache . db . Update_Node_By_Id ( ctx , dbx . Node_Id ( nodeID . Bytes ( ) ) , updateFields )
if err != nil {
return nil , err
}
return node . VettedAt , nil
}
// TestUnvetNode directly sets a node's vetted_at timestamp to null to make testing easier.
func ( cache * overlaycache ) TestUnvetNode ( ctx context . Context , nodeID storj . NodeID ) ( err error ) {
_ , err = cache . db . Exec ( ctx , ` UPDATE nodes SET vetted_at = NULL WHERE nodes.id = $1; ` , nodeID )
if err != nil {
return err
}
2020-11-28 16:23:39 +00:00
_ , err = cache . Get ( ctx , nodeID )
2020-07-08 15:28:49 +01:00
return err
}
2021-02-18 15:33:49 +00:00
2021-07-07 20:20:23 +01:00
// TestSuspendNodeOffline suspends a storage node for offline.
func ( cache * overlaycache ) TestSuspendNodeOffline ( ctx context . Context , nodeID storj . NodeID , suspendedAt time . Time ) ( err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
updateFields := dbx . Node_Update_Fields { }
updateFields . OfflineSuspended = dbx . Node_OfflineSuspended ( suspendedAt . UTC ( ) )
dbNode , err := cache . db . Update_Node_By_Id ( ctx , dbx . Node_Id ( nodeID . Bytes ( ) ) , updateFields )
if err != nil {
return err
}
if dbNode == nil {
return errs . New ( "unable to get node by ID: %v" , nodeID )
}
return nil
}
2022-02-25 10:43:19 +00:00
// TestNodeCountryCode sets node country code.
func ( cache * overlaycache ) TestNodeCountryCode ( ctx context . Context , nodeID storj . NodeID , countryCode string ) ( err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
updateFields := dbx . Node_Update_Fields { }
updateFields . CountryCode = dbx . Node_CountryCode ( countryCode )
dbNode , err := cache . db . Update_Node_By_Id ( ctx , dbx . Node_Id ( nodeID . Bytes ( ) ) , updateFields )
if err != nil {
return err
}
if dbNode == nil {
return errs . New ( "unable to set node country code: %v" , nodeID )
}
return nil
}
2022-06-26 02:58:30 +01:00
// IterateAllContactedNodes will call cb on all known nodes (used in restore trash contexts).
func ( cache * overlaycache ) IterateAllContactedNodes ( ctx context . Context , cb func ( context . Context , * overlay . SelectedNode ) error ) ( err error ) {
2021-02-18 15:33:49 +00:00
defer mon . Task ( ) ( & ctx ) ( & err )
var rows tagsql . Rows
2022-06-26 02:58:30 +01:00
// 2018-04-06 is the date of the first storj v3 commit.
2021-02-18 15:33:49 +00:00
rows , err = cache . db . Query ( ctx , cache . db . Rebind ( `
2023-06-02 15:47:34 +01:00
SELECT last_net , id , address , last_ip_port , noise_proto , noise_public_key , debounce_limit , features
2021-02-18 15:33:49 +00:00
FROM nodes
2022-06-26 02:58:30 +01:00
WHERE last_contact_success >= timestamp ' 2018 - 04 - 06 '
2021-02-18 15:33:49 +00:00
` ) )
if err != nil {
return Error . Wrap ( err )
}
defer func ( ) { err = errs . Combine ( err , rows . Close ( ) ) } ( )
for rows . Next ( ) {
var node overlay . SelectedNode
2023-01-24 15:59:47 +00:00
node . Address = & pb . NodeAddress { }
2021-02-18 15:33:49 +00:00
var lastIPPort sql . NullString
2023-01-02 16:10:47 +00:00
var noise noiseScanner
2023-06-02 15:47:34 +01:00
err = rows . Scan ( & node . LastNet , & node . ID , & node . Address . Address , & lastIPPort , & noise . Proto , & noise . PublicKey , & node . Address . DebounceLimit , & node . Address . Features )
2021-02-18 15:33:49 +00:00
if err != nil {
return Error . Wrap ( err )
}
if lastIPPort . Valid {
node . LastIPPort = lastIPPort . String
}
2023-01-02 16:10:47 +00:00
node . Address . NoiseInfo = noise . Convert ( )
2021-02-18 15:33:49 +00:00
err = cb ( ctx , & node )
if err != nil {
return err
}
}
return rows . Err ( )
}
2021-03-01 20:04:00 +00:00
// IterateAllNodeDossiers will call cb on all known nodes (used for invoice generation).
func ( cache * overlaycache ) IterateAllNodeDossiers ( ctx context . Context , cb func ( context . Context , * overlay . NodeDossier ) error ) ( err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
const nodesPerPage = 1000
var cont * dbx . Paged_Node_Continuation
var dbxNodes [ ] * dbx . Node
for {
dbxNodes , cont , err = cache . db . Paged_Node ( ctx , nodesPerPage , cont )
if err != nil {
return err
}
for _ , node := range dbxNodes {
dossier , err := convertDBNode ( ctx , node )
if err != nil {
return err
}
if err := cb ( ctx , dossier ) ; err != nil {
return err
}
}
if cont == nil {
return nil
}
}
}
2022-11-22 13:02:01 +00:00
func ( cache * overlaycache ) TestUpdateCheckInDirectUpdate ( ctx context . Context , node overlay . NodeCheckInInfo , timestamp time . Time , semVer version . SemVer , walletFeatures string ) ( updated bool , err error ) {
return cache . updateCheckInDirectUpdate ( ctx , node , timestamp , semVer , walletFeatures )
}
2023-01-02 16:10:47 +00:00
type noiseScanner struct {
Proto sql . NullInt32
PublicKey [ ] byte
}
func ( n * noiseScanner ) Convert ( ) * pb . NoiseInfo {
if ! n . Proto . Valid || len ( n . PublicKey ) == 0 {
return nil
}
return & pb . NoiseInfo {
Proto : pb . NoiseProtocol ( n . Proto . Int32 ) ,
PublicKey : n . PublicKey ,
}
}
2023-03-09 16:28:16 +00:00
// OneTimeFixLastNets updates the last_net values for all node records to be equal to their
// last_ip_port values.
//
2023-03-10 19:45:01 +00:00
// This is only appropriate to do when the satellite has DistinctIP=false, the satelliteDB is
// running on PostgreSQL (not CockroachDB), and has been upgraded from before changeset
// I0e7e92498c3da768df5b4d5fb213dcd2d4862924. These are not common circumstances, but they do
// exist. It is only necessary to run this once. When all satellite peer processes are upgraded,
// the function and trigger can be dropped if desired.
2023-03-09 16:28:16 +00:00
func ( cache * overlaycache ) OneTimeFixLastNets ( ctx context . Context ) error {
2023-03-10 19:45:01 +00:00
_ , err := cache . db . ExecContext ( ctx , `
CREATE OR REPLACE FUNCTION fix_last_nets ( )
RETURNS TRIGGER
LANGUAGE plpgsql AS $ $
BEGIN
NEW . last_net = NEW . last_ip_port ;
RETURN NEW ;
END
$ $ ;
CREATE TRIGGER nodes_fix_last_nets
BEFORE INSERT OR UPDATE ON nodes
FOR EACH ROW
EXECUTE PROCEDURE fix_last_nets ( ) ;
` )
if err != nil {
return Error . Wrap ( err )
}
_ , err = cache . db . ExecContext ( ctx , "UPDATE nodes SET last_net = last_ip_port" )
2023-03-09 16:28:16 +00:00
return Error . Wrap ( err )
}