2019-01-24 20:15:10 +00:00
// Copyright (C) 2019 Storj Labs, Inc.
2018-04-18 17:55:28 +01:00
// See LICENSE for copying information.
2018-06-13 19:22:32 +01:00
package overlay
2018-04-18 16:34:15 +01:00
import (
2018-05-16 19:47:59 +01:00
"context"
2019-01-15 16:08:45 +00:00
"errors"
2019-05-30 18:35:04 +01:00
"net"
2019-04-04 17:34:36 +01:00
"time"
2018-04-18 16:34:15 +01:00
2018-06-13 19:22:32 +01:00
"github.com/zeebo/errs"
2018-12-22 04:51:42 +00:00
"go.uber.org/zap"
2018-11-16 16:31:14 +00:00
2019-12-27 11:48:47 +00:00
"storj.io/common/pb"
"storj.io/common/storj"
2021-10-29 18:44:44 +01:00
"storj.io/common/storj/location"
"storj.io/storj/satellite/geoip"
2021-04-21 13:42:57 +01:00
"storj.io/storj/satellite/metabase"
2018-04-18 16:34:15 +01:00
)
2020-07-16 15:18:02 +01:00
// ErrEmptyNode is returned when the nodeID is empty.
2018-12-17 18:47:26 +00:00
var ErrEmptyNode = errs . New ( "empty node ID" )
2020-07-16 15:18:02 +01:00
// ErrNodeNotFound is returned if a node does not exist in database.
2019-03-29 08:53:43 +00:00
var ErrNodeNotFound = errs . Class ( "node not found" )
2018-11-21 17:31:27 +00:00
2020-07-16 15:18:02 +01:00
// ErrNodeOffline is returned if a nodes is offline.
2019-05-27 12:13:47 +01:00
var ErrNodeOffline = errs . Class ( "node is offline" )
2020-07-16 15:18:02 +01:00
// ErrNodeDisqualified is returned if a nodes is disqualified.
2019-06-24 15:46:10 +01:00
var ErrNodeDisqualified = errs . Class ( "node is disqualified" )
2020-08-13 13:00:56 +01:00
// ErrNodeFinishedGE is returned if a node has finished graceful exit.
var ErrNodeFinishedGE = errs . Class ( "node finished graceful exit" )
2020-07-16 15:18:02 +01:00
// ErrNotEnoughNodes is when selecting nodes failed with the given parameters.
2019-01-31 18:49:00 +00:00
var ErrNotEnoughNodes = errs . Class ( "not enough nodes" )
2020-12-05 16:01:42 +00:00
// DB implements the database for overlay.Service.
2019-09-10 14:24:16 +01:00
//
// architecture: Database
2019-01-15 16:08:45 +00:00
type DB interface {
2020-03-30 14:32:02 +01:00
// GetOnlineNodesForGetDelete returns a map of nodes for the supplied nodeIDs
GetOnlineNodesForGetDelete ( ctx context . Context , nodeIDs [ ] storj . NodeID , onlineWindow time . Duration ) ( map [ storj . NodeID ] * SelectedNode , error )
2021-11-08 20:51:04 +00:00
// GetOnlineNodesForAuditRepair returns a map of nodes for the supplied nodeIDs.
// The return value contains necessary information to create orders as well as nodes'
// current reputation status.
GetOnlineNodesForAuditRepair ( ctx context . Context , nodeIDs [ ] storj . NodeID , onlineWindow time . Duration ) ( map [ storj . NodeID ] * NodeReputation , error )
2019-02-11 19:24:51 +00:00
// SelectStorageNodes looks up nodes based on criteria
2020-04-09 16:19:44 +01:00
SelectStorageNodes ( ctx context . Context , totalNeededNodes , newNodeCount int , criteria * NodeCriteria ) ( [ ] * SelectedNode , error )
2020-04-14 21:50:02 +01:00
// SelectAllStorageNodesUpload returns all nodes that qualify to store data, organized as reputable nodes and new nodes
SelectAllStorageNodesUpload ( ctx context . Context , selectionCfg NodeSelectionConfig ) ( reputable , new [ ] * SelectedNode , err error )
2021-01-28 14:33:53 +00:00
// SelectAllStorageNodesDownload returns a nodes that are ready for downloading
SelectAllStorageNodesDownload ( ctx context . Context , onlineWindow time . Duration , asOf AsOfSystemTimeConfig ) ( [ ] * SelectedNode , error )
2020-04-14 21:50:02 +01:00
2019-01-15 16:08:45 +00:00
// Get looks up the node by nodeID
2019-04-04 17:34:36 +01:00
Get ( ctx context . Context , nodeID storj . NodeID ) ( * NodeDossier , error )
2019-06-18 23:22:14 +01:00
// KnownOffline filters a set of nodes to offline nodes
KnownOffline ( context . Context , * NodeCriteria , storj . NodeIDList ) ( storj . NodeIDList , error )
2019-05-01 14:45:52 +01:00
// KnownUnreliableOrOffline filters a set of nodes to unhealth or offlines node, independent of new
KnownUnreliableOrOffline ( context . Context , * NodeCriteria , storj . NodeIDList ) ( storj . NodeIDList , error )
2019-12-16 13:45:13 +00:00
// KnownReliable filters a set of nodes to reliable (online and qualified) nodes.
KnownReliable ( ctx context . Context , onlineWindow time . Duration , nodeIDs storj . NodeIDList ) ( [ ] * pb . Node , error )
2019-07-08 23:04:35 +01:00
// Reliable returns all nodes that are reliable
Reliable ( context . Context , * NodeCriteria ) ( storj . NodeIDList , error )
2021-06-17 15:01:21 +01:00
// UpdateReputation updates the DB columns for all reputation fields in ReputationStatus.
UpdateReputation ( ctx context . Context , id storj . NodeID , request * ReputationStatus ) error
2019-04-10 07:04:24 +01:00
// UpdateNodeInfo updates node dossier with info requested from the node itself like node type, email, wallet, capacity, and version.
2020-06-16 13:16:55 +01:00
UpdateNodeInfo ( ctx context . Context , node storj . NodeID , nodeInfo * InfoResponse ) ( stats * NodeDossier , err error )
2019-09-19 19:37:31 +01:00
// UpdateCheckIn updates a single storagenode's check-in stats.
2019-11-15 22:43:06 +00:00
UpdateCheckIn ( ctx context . Context , node NodeCheckInInfo , timestamp time . Time , config NodeSelectionConfig ) ( err error )
2019-08-27 13:37:42 +01:00
// AllPieceCounts returns a map of node IDs to piece counts from the db.
AllPieceCounts ( ctx context . Context ) ( pieceCounts map [ storj . NodeID ] int , err error )
// UpdatePieceCounts sets the piece count field for the given node IDs.
UpdatePieceCounts ( ctx context . Context , pieceCounts map [ storj . NodeID ] int ) ( err error )
2019-10-01 23:18:21 +01:00
// UpdateExitStatus is used to update a node's graceful exit status.
2019-10-29 20:22:20 +00:00
UpdateExitStatus ( ctx context . Context , request * ExitStatusRequest ) ( _ * NodeDossier , err error )
2019-10-01 23:18:21 +01:00
// GetExitingNodes returns nodes who have initiated a graceful exit, but have not completed it.
2019-10-24 17:24:42 +01:00
GetExitingNodes ( ctx context . Context ) ( exitingNodes [ ] * ExitStatus , err error )
2019-10-23 02:06:01 +01:00
// GetGracefulExitCompletedByTimeFrame returns nodes who have completed graceful exit within a time window (time window is around graceful exit completion).
GetGracefulExitCompletedByTimeFrame ( ctx context . Context , begin , end time . Time ) ( exitedNodes storj . NodeIDList , err error )
// GetGracefulExitIncompleteByTimeFrame returns nodes who have initiated, but not completed graceful exit within a time window (time window is around graceful exit initiation).
GetGracefulExitIncompleteByTimeFrame ( ctx context . Context , begin , end time . Time ) ( exitingNodes storj . NodeIDList , err error )
// GetExitStatus returns a node's graceful exit status.
2019-10-11 22:18:05 +01:00
GetExitStatus ( ctx context . Context , nodeID storj . NodeID ) ( exitStatus * ExitStatus , err error )
2019-11-06 21:38:52 +00:00
2020-03-06 22:04:23 +00:00
// GetNodesNetwork returns the /24 subnet for each storage node, order is not guaranteed.
GetNodesNetwork ( ctx context . Context , nodeIDs [ ] storj . NodeID ) ( nodeNets [ ] string , err error )
2019-12-30 17:10:24 +00:00
2020-01-03 19:11:47 +00:00
// DisqualifyNode disqualifies a storage node.
DisqualifyNode ( ctx context . Context , nodeID storj . NodeID ) ( err error )
2021-07-07 20:20:23 +01:00
2021-04-22 14:43:56 +01:00
// DQNodesLastSeenBefore disqualifies a limited number of nodes where last_contact_success < cutoff except those already disqualified
// or gracefully exited or where last_contact_success = '0001-01-01 00:00:00+00'.
2021-03-15 20:48:36 +00:00
DQNodesLastSeenBefore ( ctx context . Context , cutoff time . Time , limit int ) ( count int , err error )
2020-03-09 15:35:54 +00:00
2021-07-15 15:14:13 +01:00
// TestSuspendNodeUnknownAudit suspends a storage node for unknown audits.
TestSuspendNodeUnknownAudit ( ctx context . Context , nodeID storj . NodeID , suspendedAt time . Time ) ( err error )
// TestUnsuspendNodeUnknownAudit unsuspends a storage node for unknown audits.
TestUnsuspendNodeUnknownAudit ( ctx context . Context , nodeID storj . NodeID ) ( err error )
2020-07-08 15:28:49 +01:00
// TestVetNode directly sets a node's vetted_at timestamp to make testing easier
TestVetNode ( ctx context . Context , nodeID storj . NodeID ) ( vettedTime * time . Time , err error )
// TestUnvetNode directly sets a node's vetted_at timestamp to null to make testing easier
TestUnvetNode ( ctx context . Context , nodeID storj . NodeID ) ( err error )
2021-07-07 20:20:23 +01:00
// TestVetNode directly sets a node's offline_suspended timestamp to make testing easier
TestSuspendNodeOffline ( ctx context . Context , nodeID storj . NodeID , suspendedAt time . Time ) ( err error )
2021-02-18 15:33:49 +00:00
2021-02-18 16:29:28 +00:00
// IterateAllNodes will call cb on all known nodes (used in restore trash contexts).
2021-02-18 15:33:49 +00:00
IterateAllNodes ( context . Context , func ( context . Context , * SelectedNode ) error ) error
2021-03-01 20:04:00 +00:00
// IterateAllNodes will call cb on all known nodes (used for invoice generation).
IterateAllNodeDossiers ( context . Context , func ( context . Context , * NodeDossier ) error ) error
2019-01-15 16:08:45 +00:00
}
2020-07-16 15:18:02 +01:00
// NodeCheckInInfo contains all the info that will be updated when a node checkins.
2019-09-19 19:37:31 +01:00
type NodeCheckInInfo struct {
2021-10-29 18:44:44 +01:00
NodeID storj . NodeID
Address * pb . NodeAddress
LastNet string
LastIPPort string
IsUp bool
Operator * pb . NodeOperator
Capacity * pb . NodeCapacity
Version * pb . NodeVersion
CountryCode location . CountryCode
2019-09-19 19:37:31 +01:00
}
2020-06-16 13:16:55 +01:00
// InfoResponse holds the dossier details that were requested from the storage
// node itself.
type InfoResponse struct {
	Type     pb.NodeType      // kind of node reporting
	Operator *pb.NodeOperator // operator contact / wallet details
	Capacity *pb.NodeCapacity // reported capacity
	Version  *pb.NodeVersion  // reported software version
}
2019-03-23 08:06:11 +00:00
// FindStorageNodesRequest defines easy request parameters.
type FindStorageNodesRequest struct {
2021-05-11 09:49:26 +01:00
RequestedCount int
ExcludedIDs [ ] storj . NodeID
MinimumVersion string // semver or empty
AsOfSystemInterval time . Duration // only used for CRDB queries
2021-10-27 09:50:27 +01:00
Placement storj . PlacementConstraint
2019-03-23 08:06:11 +00:00
}
2020-07-16 15:18:02 +01:00
// NodeCriteria are the requirements for selecting nodes.
2019-03-23 08:06:11 +00:00
type NodeCriteria struct {
2021-05-11 09:49:26 +01:00
FreeDisk int64
ExcludedIDs [ ] storj . NodeID
ExcludedNetworks [ ] string // the /24 subnet IPv4 or /64 subnet IPv6 for nodes
MinimumVersion string // semver or empty
OnlineWindow time . Duration
DistinctIP bool
AsOfSystemInterval time . Duration // only used for CRDB queries
2019-03-23 08:06:11 +00:00
}
2021-06-17 15:01:21 +01:00
// ReputationStatus captures a node's current reputation state. Each timestamp
// is nil when the corresponding condition does not apply.
type ReputationStatus struct {
	Contained bool // TODO: check to see if this column is still used.

	Disqualified, UnknownAuditSuspended, OfflineSuspended, VettedAt *time.Time
}
2021-06-23 00:09:39 +01:00
// Equal reports whether status and value hold the same reputation state.
//
// Timestamps are compared with time.Time.Equal (same instant, independent of
// location). A nil timestamp only matches another nil timestamp.
func (status ReputationStatus) Equal(value ReputationStatus) bool {
	switch {
	case status.Contained != value.Contained:
		return false

	case (status.Disqualified == nil) != (value.Disqualified == nil):
		return false
	case status.Disqualified != nil && !status.Disqualified.Equal(*value.Disqualified):
		return false

	case (status.UnknownAuditSuspended == nil) != (value.UnknownAuditSuspended == nil):
		return false
	case status.UnknownAuditSuspended != nil && !status.UnknownAuditSuspended.Equal(*value.UnknownAuditSuspended):
		return false

	case (status.OfflineSuspended == nil) != (value.OfflineSuspended == nil):
		return false
	case status.OfflineSuspended != nil && !status.OfflineSuspended.Equal(*value.OfflineSuspended):
		return false

	case (status.VettedAt == nil) != (value.VettedAt == nil):
		return false
	case status.VettedAt != nil && !status.VettedAt.Equal(*value.VettedAt):
		return false
	}
	return true
}
2019-10-11 22:18:05 +01:00
// ExitStatus is used for reading graceful exit status.
type ExitStatus struct {
NodeID storj . NodeID
ExitInitiatedAt * time . Time
ExitLoopCompletedAt * time . Time
ExitFinishedAt * time . Time
2019-10-17 16:01:39 +01:00
ExitSuccess bool
2019-10-11 22:18:05 +01:00
}
2019-10-01 23:18:21 +01:00
// ExitStatusRequest is used to update a node's graceful exit status.
type ExitStatusRequest struct {
NodeID storj . NodeID
ExitInitiatedAt time . Time
ExitLoopCompletedAt time . Time
ExitFinishedAt time . Time
2019-10-17 16:01:39 +01:00
ExitSuccess bool
2019-10-01 23:18:21 +01:00
}
2020-07-16 15:18:02 +01:00
// NodeDossier is the complete info that the satellite tracks for a storage node.
2019-04-04 17:34:36 +01:00
type NodeDossier struct {
pb . Node
2020-06-10 17:11:25 +01:00
Type pb . NodeType
Operator pb . NodeOperator
Capacity pb . NodeCapacity
Reputation NodeStats
Version pb . NodeVersion
Contained bool
Disqualified * time . Time
UnknownAuditSuspended * time . Time
2020-08-26 21:26:10 +01:00
OfflineSuspended * time . Time
OfflineUnderReview * time . Time
2020-06-10 17:11:25 +01:00
PieceCount int64
ExitStatus ExitStatus
CreatedAt time . Time
LastNet string
LastIPPort string
2021-10-29 18:44:44 +01:00
CountryCode location . CountryCode
2019-04-04 17:34:36 +01:00
}
2019-03-29 08:53:43 +00:00
// NodeStats contains statistics about a node.
2019-03-25 22:25:09 +00:00
type NodeStats struct {
2021-11-08 20:51:04 +00:00
Latency90 int64
LastContactSuccess time . Time
LastContactFailure time . Time
OfflineUnderReview * time . Time
Status ReputationStatus
2019-03-25 22:25:09 +00:00
}
2020-07-16 15:18:02 +01:00
// NodeLastContact contains the ID, address, and timestamp.
2020-01-02 20:41:18 +00:00
type NodeLastContact struct {
2020-05-19 17:42:00 +01:00
URL storj . NodeURL
2020-03-06 22:04:23 +00:00
LastIPPort string
2020-01-02 20:41:18 +00:00
LastContactSuccess time . Time
LastContactFailure time . Time
}
2020-03-28 14:56:05 +00:00
// SelectedNode is used as a result for creating orders limits.
type SelectedNode struct {
2021-10-29 18:44:44 +01:00
ID storj . NodeID
Address * pb . NodeAddress
LastNet string
LastIPPort string
CountryCode location . CountryCode
2020-03-28 14:56:05 +00:00
}
2021-11-08 20:51:04 +00:00
// NodeReputation is the per-node result used when creating order limits for
// audits; it includes the node's current reputation state.
type NodeReputation struct {
	ID                  storj.NodeID
	Address             *pb.NodeAddress
	LastNet, LastIPPort string
	Reputation          ReputationStatus
}
2020-05-06 18:00:07 +01:00
// Clone returns a deep clone of the selected node.
//
// All fields are copied, including CountryCode. The address is copied into a
// fresh pb.NodeAddress (Transport and Address), so mutating the clone's address
// does not affect the original. A nil Address stays nil instead of panicking.
func (node *SelectedNode) Clone() *SelectedNode {
	clone := &SelectedNode{
		ID:          node.ID,
		LastNet:     node.LastNet,
		LastIPPort:  node.LastIPPort,
		CountryCode: node.CountryCode,
	}
	if node.Address != nil {
		clone.Address = &pb.NodeAddress{
			Transport: node.Address.Transport,
			Address:   node.Address.Address,
		}
	}
	return clone
}
2020-12-05 16:01:42 +00:00
// Service is used to store and handle node information.
2019-09-10 14:24:16 +01:00
//
// architecture: Service
2019-08-06 17:35:59 +01:00
type Service struct {
2021-01-28 14:33:53 +00:00
log * zap . Logger
db DB
config Config
2021-10-29 18:44:44 +01:00
GeoIP geoip . IPToCountry
2021-01-28 14:33:53 +00:00
UploadSelectionCache * UploadSelectionCache
DownloadSelectionCache * DownloadSelectionCache
2018-04-18 16:34:15 +01:00
}
2020-07-16 15:18:02 +01:00
// NewService returns a new Service.
2020-12-22 19:07:07 +00:00
func NewService ( log * zap . Logger , db DB , config Config ) ( * Service , error ) {
2021-10-29 18:44:44 +01:00
err := config . Node . AsOfSystemTime . isValid ( )
if err != nil {
2020-12-22 19:07:07 +00:00
return nil , err
}
2021-10-29 18:44:44 +01:00
var geoIP geoip . IPToCountry = geoip . NewMockIPToCountry ( config . GeoIP . MockCountries )
if config . GeoIP . DB != "" {
geoIP , err = geoip . OpenMaxmindDB ( config . GeoIP . DB )
if err != nil {
return nil , Error . Wrap ( err )
}
}
2019-08-06 17:35:59 +01:00
return & Service {
2019-07-31 18:21:06 +01:00
log : log ,
db : db ,
config : config ,
2021-01-28 14:33:53 +00:00
2021-10-29 18:44:44 +01:00
GeoIP : geoIP ,
2021-01-28 11:46:18 +00:00
UploadSelectionCache : NewUploadSelectionCache ( log , db ,
2020-04-24 17:11:04 +01:00
config . NodeSelectionCache . Staleness , config . Node ,
) ,
2021-01-28 14:33:53 +00:00
DownloadSelectionCache : NewDownloadSelectionCache ( log , db , DownloadSelectionCacheConfig {
Staleness : config . NodeSelectionCache . Staleness ,
OnlineWindow : config . Node . OnlineWindow ,
AsOfSystemTime : config . Node . AsOfSystemTime ,
} ) ,
2020-12-22 19:07:07 +00:00
} , nil
2018-12-20 13:57:54 +00:00
}
2020-07-16 15:18:02 +01:00
// Close closes resources.
2021-10-29 18:44:44 +01:00
func ( service * Service ) Close ( ) error {
return service . GeoIP . Close ( )
}
2019-01-18 13:54:08 +00:00
2019-08-06 17:35:59 +01:00
// Get looks up the provided nodeID from the overlay.
func ( service * Service ) Get ( ctx context . Context , nodeID storj . NodeID ) ( _ * NodeDossier , err error ) {
2019-03-23 08:06:11 +00:00
defer mon . Task ( ) ( & ctx ) ( & err )
2018-12-17 18:47:26 +00:00
if nodeID . IsZero ( ) {
return nil , ErrEmptyNode
}
2019-08-06 17:35:59 +01:00
return service . db . Get ( ctx , nodeID )
2018-04-18 16:34:15 +01:00
}
2020-03-30 14:32:02 +01:00
// GetOnlineNodesForGetDelete returns a map of nodes for the supplied nodeIDs.
func ( service * Service ) GetOnlineNodesForGetDelete ( ctx context . Context , nodeIDs [ ] storj . NodeID ) ( _ map [ storj . NodeID ] * SelectedNode , err error ) {
2020-03-13 18:01:48 +00:00
defer mon . Task ( ) ( & ctx ) ( & err )
2020-03-30 14:32:02 +01:00
return service . db . GetOnlineNodesForGetDelete ( ctx , nodeIDs , service . config . Node . OnlineWindow )
2020-03-13 18:01:48 +00:00
}
2021-11-08 20:51:04 +00:00
// GetOnlineNodesForAuditRepair returns a map of nodes for the supplied nodeIDs,
// together with their reputation, using the configured node online window.
func (service *Service) GetOnlineNodesForAuditRepair(ctx context.Context, nodeIDs []storj.NodeID) (_ map[storj.NodeID]*NodeReputation, err error) {
	defer mon.Task()(&ctx)(&err)

	window := service.config.Node.OnlineWindow
	return service.db.GetOnlineNodesForAuditRepair(ctx, nodeIDs, window)
}
2021-01-13 13:59:05 +00:00
// GetNodeIPs returns a map of node ip:port for the supplied nodeIDs.
func ( service * Service ) GetNodeIPs ( ctx context . Context , nodeIDs [ ] storj . NodeID ) ( _ map [ storj . NodeID ] string , err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
2021-01-28 15:02:34 +00:00
return service . DownloadSelectionCache . GetNodeIPs ( ctx , nodeIDs )
2021-01-13 13:59:05 +00:00
}
2019-04-23 23:45:50 +01:00
// IsOnline checks if a node is 'online' based on the collected statistics.
2019-08-06 17:35:59 +01:00
func ( service * Service ) IsOnline ( node * NodeDossier ) bool {
2019-11-15 22:43:06 +00:00
return time . Since ( node . Reputation . LastContactSuccess ) < service . config . Node . OnlineWindow
2019-04-23 23:45:50 +01:00
}
2020-05-06 14:05:31 +01:00
// FindStorageNodesForGracefulExit searches the overlay network for nodes that meet the provided requirements for graceful-exit requests.
func ( service * Service ) FindStorageNodesForGracefulExit ( ctx context . Context , req FindStorageNodesRequest ) ( _ [ ] * SelectedNode , err error ) {
2020-04-24 17:11:04 +01:00
defer mon . Task ( ) ( & ctx ) ( & err )
2021-10-25 11:14:59 +01:00
return service . UploadSelectionCache . GetNodes ( ctx , req )
2020-05-06 14:05:31 +01:00
}
// FindStorageNodesForUpload searches the overlay network for nodes that meet the provided requirements for upload.
//
// When enabled it uses the cache to select nodes.
// When the node selection from the cache fails, it falls back to the old implementation.
func ( service * Service ) FindStorageNodesForUpload ( ctx context . Context , req FindStorageNodesRequest ) ( _ [ ] * SelectedNode , err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
2020-12-22 19:07:07 +00:00
if service . config . Node . AsOfSystemTime . Enabled && service . config . Node . AsOfSystemTime . DefaultInterval < 0 {
2021-05-11 09:49:26 +01:00
req . AsOfSystemInterval = service . config . Node . AsOfSystemTime . DefaultInterval
2020-12-22 19:07:07 +00:00
}
2020-05-06 14:05:31 +01:00
if service . config . NodeSelectionCache . Disabled {
return service . FindStorageNodesWithPreferences ( ctx , req , & service . config . Node )
}
2021-01-28 11:46:18 +00:00
selectedNodes , err := service . UploadSelectionCache . GetNodes ( ctx , req )
2020-04-24 17:11:04 +01:00
if err != nil {
2021-10-27 09:50:27 +01:00
return selectedNodes , err
2020-04-24 17:11:04 +01:00
}
if len ( selectedNodes ) < req . RequestedCount {
2021-10-27 09:50:27 +01:00
excludedIDs := make ( [ ] string , 0 )
for _ , e := range req . ExcludedIDs {
excludedIDs = append ( excludedIDs , e . String ( ) )
}
service . log . Warn ( "Not enough nodes are available from Node Cache" ,
zap . String ( "minVersion" , req . MinimumVersion ) ,
zap . Strings ( "excludedIDs" , excludedIDs ) ,
zap . Duration ( "asOfSystemInterval" , req . AsOfSystemInterval ) ,
zap . Int ( "requested" , req . RequestedCount ) ,
zap . Int ( "available" , len ( selectedNodes ) ) ,
zap . Uint16 ( "placement" , uint16 ( req . Placement ) ) )
2020-04-24 17:11:04 +01:00
}
2021-10-27 09:50:27 +01:00
return selectedNodes , err
2020-04-24 17:11:04 +01:00
}
2020-05-06 14:05:31 +01:00
// FindStorageNodesWithPreferences searches the overlay network for nodes that meet the provided criteria.
//
// This does not use a cache.
2020-03-28 14:56:05 +00:00
func ( service * Service ) FindStorageNodesWithPreferences ( ctx context . Context , req FindStorageNodesRequest , preferences * NodeSelectionConfig ) ( nodes [ ] * SelectedNode , err error ) {
2019-03-23 08:06:11 +00:00
defer mon . Task ( ) ( & ctx ) ( & err )
2019-01-31 18:49:00 +00:00
// TODO: add sanity limits to requested node count
// TODO: add sanity limits to excluded nodes
2020-05-07 12:54:48 +01:00
totalNeededNodes := req . RequestedCount
2019-01-31 18:49:00 +00:00
2020-03-12 18:37:57 +00:00
excludedIDs := req . ExcludedIDs
// if distinctIP is enabled, keep track of the network
// to make sure we only select nodes from different networks
var excludedNetworks [ ] string
if preferences . DistinctIP && len ( excludedIDs ) > 0 {
excludedNetworks , err = service . db . GetNodesNetwork ( ctx , excludedIDs )
2019-11-06 21:38:52 +00:00
if err != nil {
return nil , Error . Wrap ( err )
}
}
2019-04-23 16:23:51 +01:00
newNodeCount := 0
2020-03-18 21:16:13 +00:00
if preferences . NewNodeFraction > 0 {
2020-04-09 16:19:44 +01:00
newNodeCount = int ( float64 ( totalNeededNodes ) * preferences . NewNodeFraction )
2019-04-23 16:23:51 +01:00
}
2019-05-07 15:44:47 +01:00
criteria := NodeCriteria {
2021-05-11 09:49:26 +01:00
FreeDisk : preferences . MinimumDiskSpace . Int64 ( ) ,
ExcludedIDs : excludedIDs ,
ExcludedNetworks : excludedNetworks ,
MinimumVersion : preferences . MinimumVersion ,
OnlineWindow : preferences . OnlineWindow ,
DistinctIP : preferences . DistinctIP ,
AsOfSystemInterval : req . AsOfSystemInterval ,
2019-05-07 15:44:47 +01:00
}
2020-04-09 16:19:44 +01:00
nodes , err = service . db . SelectStorageNodes ( ctx , totalNeededNodes , newNodeCount , & criteria )
2019-01-31 18:49:00 +00:00
if err != nil {
2019-08-06 17:35:59 +01:00
return nil , Error . Wrap ( err )
2019-01-31 18:49:00 +00:00
}
2020-04-09 16:19:44 +01:00
if len ( nodes ) < totalNeededNodes {
return nodes , ErrNotEnoughNodes . New ( "requested %d found %d; %+v " , totalNeededNodes , len ( nodes ) , criteria )
2019-01-31 18:49:00 +00:00
}
return nodes , nil
}
2020-07-16 15:18:02 +01:00
// KnownOffline filters a set of nodes to offline nodes.
2019-08-06 17:35:59 +01:00
func ( service * Service ) KnownOffline ( ctx context . Context , nodeIds storj . NodeIDList ) ( offlineNodes storj . NodeIDList , err error ) {
2019-06-18 23:22:14 +01:00
defer mon . Task ( ) ( & ctx ) ( & err )
criteria := & NodeCriteria {
2019-08-06 17:35:59 +01:00
OnlineWindow : service . config . Node . OnlineWindow ,
2019-06-18 23:22:14 +01:00
}
2019-08-06 17:35:59 +01:00
return service . db . KnownOffline ( ctx , criteria , nodeIds )
2019-06-18 23:22:14 +01:00
}
2019-05-10 20:05:42 +01:00
// KnownUnreliableOrOffline filters a set of nodes to unhealth or offlines node, independent of new.
2019-08-06 17:35:59 +01:00
func ( service * Service ) KnownUnreliableOrOffline ( ctx context . Context , nodeIds storj . NodeIDList ) ( badNodes storj . NodeIDList , err error ) {
2019-03-23 08:06:11 +00:00
defer mon . Task ( ) ( & ctx ) ( & err )
2019-05-01 14:45:52 +01:00
criteria := & NodeCriteria {
2019-08-06 17:35:59 +01:00
OnlineWindow : service . config . Node . OnlineWindow ,
2018-09-11 05:52:14 +01:00
}
2019-08-06 17:35:59 +01:00
return service . db . KnownUnreliableOrOffline ( ctx , criteria , nodeIds )
2018-09-11 05:52:14 +01:00
}
2019-12-16 13:45:13 +00:00
// KnownReliable filters a set of nodes to reliable (online and qualified)
// nodes, using the configured node online window.
func (service *Service) KnownReliable(ctx context.Context, nodeIDs storj.NodeIDList) (nodes []*pb.Node, err error) {
	defer mon.Task()(&ctx)(&err)

	window := service.config.Node.OnlineWindow
	return service.db.KnownReliable(ctx, window, nodeIDs)
}
2019-07-08 23:04:35 +01:00
// Reliable filters a set of nodes that are reliable, independent of new.
2019-08-06 17:35:59 +01:00
func ( service * Service ) Reliable ( ctx context . Context ) ( nodes storj . NodeIDList , err error ) {
2019-07-08 23:04:35 +01:00
defer mon . Task ( ) ( & ctx ) ( & err )
criteria := & NodeCriteria {
2019-08-06 17:35:59 +01:00
OnlineWindow : service . config . Node . OnlineWindow ,
2019-07-08 23:04:35 +01:00
}
2019-08-06 17:35:59 +01:00
return service . db . Reliable ( ctx , criteria )
2019-07-08 23:04:35 +01:00
}
2021-06-17 15:01:21 +01:00
// UpdateReputation updates the DB columns for any of the reputation fields.
func ( service * Service ) UpdateReputation ( ctx context . Context , id storj . NodeID , request * ReputationStatus ) ( err error ) {
2021-08-05 12:07:45 +01:00
defer mon . Task ( ) ( & ctx ) ( & err )
2021-06-17 15:01:21 +01:00
return service . db . UpdateReputation ( ctx , id , request )
}
2019-04-10 07:04:24 +01:00
// UpdateNodeInfo updates node dossier with info requested from the node itself like node type, email, wallet, capacity, and version.
2020-06-16 13:16:55 +01:00
func ( service * Service ) UpdateNodeInfo ( ctx context . Context , node storj . NodeID , nodeInfo * InfoResponse ) ( stats * NodeDossier , err error ) {
2019-03-25 22:25:09 +00:00
defer mon . Task ( ) ( & ctx ) ( & err )
2019-08-06 17:35:59 +01:00
return service . db . UpdateNodeInfo ( ctx , node , nodeInfo )
2019-03-25 22:25:09 +00:00
}
2021-08-02 18:48:55 +01:00
// UpdateCheckIn updates a single storagenode's check-in info if needed.
2021-08-05 14:07:14 +01:00
/ *
The check - in info is updated in the database if :
( 1 ) there is no previous entry ;
( 2 ) it has been too long since the last known entry ; or
( 3 ) the node hostname , IP address , port , wallet , sw version , or disk capacity
has changed .
Note that there can be a race between acquiring the previous entry and
performing the update , so if two updates happen at about the same time it is
not defined which one will end up in the database .
* /
2019-11-15 22:43:06 +00:00
func ( service * Service ) UpdateCheckIn ( ctx context . Context , node NodeCheckInInfo , timestamp time . Time ) ( err error ) {
2019-09-19 19:37:31 +01:00
defer mon . Task ( ) ( & ctx ) ( & err )
2021-10-29 18:44:44 +01:00
failureMeter := mon . Meter ( "geofencing_lookup_failed" )
2021-06-15 17:32:12 +01:00
oldInfo , err := service . Get ( ctx , node . NodeID )
if err != nil && ! ErrNodeNotFound . Has ( err ) {
return Error . New ( "failed to get node info from DB" )
}
if oldInfo == nil {
2021-10-29 18:44:44 +01:00
node . CountryCode , err = service . GeoIP . LookupISOCountryCode ( node . LastIPPort )
if err != nil {
failureMeter . Mark ( 1 )
service . log . Debug ( "failed to resolve country code for node" ,
zap . String ( "node address" , node . Address . Address ) ,
zap . Stringer ( "Node ID" , node . NodeID ) ,
zap . Error ( err ) )
}
2021-06-15 17:32:12 +01:00
return service . db . UpdateCheckIn ( ctx , node , timestamp , service . config . Node )
}
lastUp , lastDown := oldInfo . Reputation . LastContactSuccess , oldInfo . Reputation . LastContactFailure
lastContact := lastUp
if lastContact . Before ( lastDown ) {
lastContact = lastDown
}
dbStale := lastContact . Add ( service . config . NodeCheckInWaitPeriod ) . Before ( timestamp ) ||
( node . IsUp && lastUp . Before ( lastDown ) ) || ( ! node . IsUp && lastDown . Before ( lastUp ) )
addrChanged := ( ( node . Address == nil ) != ( oldInfo . Address == nil ) ) ||
( oldInfo . Address != nil && node . Address != nil && oldInfo . Address . Address != node . Address . Address )
walletChanged := ( node . Operator == nil && oldInfo . Operator . Wallet != "" ) ||
( node . Operator != nil && oldInfo . Operator . Wallet != node . Operator . Wallet )
verChanged := ( node . Version == nil && oldInfo . Version . Version != "" ) ||
( node . Version != nil && oldInfo . Version . Version != node . Version . Version )
spaceChanged := ( node . Capacity == nil && oldInfo . Capacity . FreeDisk != 0 ) ||
( node . Capacity != nil && node . Capacity . FreeDisk != oldInfo . Capacity . FreeDisk )
2021-10-29 18:44:44 +01:00
if oldInfo . CountryCode == location . CountryCode ( 0 ) || oldInfo . LastIPPort != node . LastIPPort {
node . CountryCode , err = service . GeoIP . LookupISOCountryCode ( node . LastIPPort )
if err != nil {
failureMeter . Mark ( 1 )
service . log . Debug ( "failed to resolve country code for node" ,
zap . String ( "node address" , node . Address . Address ) ,
zap . Stringer ( "Node ID" , node . NodeID ) ,
zap . Error ( err ) )
}
} else {
node . CountryCode = oldInfo . CountryCode
}
2021-06-15 17:32:12 +01:00
if dbStale || addrChanged || walletChanged || verChanged || spaceChanged ||
2021-10-29 18:44:44 +01:00
oldInfo . LastNet != node . LastNet || oldInfo . LastIPPort != node . LastIPPort ||
oldInfo . CountryCode != node . CountryCode {
2021-06-15 17:32:12 +01:00
return service . db . UpdateCheckIn ( ctx , node , timestamp , service . config . Node )
}
2021-08-02 18:48:55 +01:00
2021-06-15 17:32:12 +01:00
service . log . Debug ( "ignoring unnecessary check-in" ,
zap . String ( "node address" , node . Address . Address ) ,
zap . Stringer ( "Node ID" , node . NodeID ) )
2021-08-02 18:48:55 +01:00
mon . Event ( "unnecessary_node_check_in" )
2021-06-15 17:32:12 +01:00
return nil
2019-09-19 19:37:31 +01:00
}
2020-07-16 15:18:02 +01:00
// GetMissingPieces returns the list of offline nodes.
2020-12-14 14:29:48 +00:00
func ( service * Service ) GetMissingPieces ( ctx context . Context , pieces metabase . Pieces ) ( missingPieces [ ] uint16 , err error ) {
2019-06-04 12:36:27 +01:00
defer mon . Task ( ) ( & ctx ) ( & err )
2019-05-16 14:49:10 +01:00
var nodeIDs storj . NodeIDList
for _ , p := range pieces {
2020-12-14 14:29:48 +00:00
nodeIDs = append ( nodeIDs , p . StorageNode )
2019-05-16 14:49:10 +01:00
}
2019-08-06 17:35:59 +01:00
badNodeIDs , err := service . KnownUnreliableOrOffline ( ctx , nodeIDs )
2019-05-16 14:49:10 +01:00
if err != nil {
return nil , Error . New ( "error getting nodes %s" , err )
}
for _ , p := range pieces {
for _ , nodeID := range badNodeIDs {
2020-12-14 14:29:48 +00:00
if nodeID == p . StorageNode {
missingPieces = append ( missingPieces , p . Number )
2019-05-16 14:49:10 +01:00
}
}
}
return missingPieces , nil
}
2019-05-30 18:35:04 +01:00
2020-01-03 19:11:47 +00:00
// DisqualifyNode disqualifies a storage node by delegating to the database.
func (service *Service) DisqualifyNode(ctx context.Context, nodeID storj.NodeID) (err error) {
	defer mon.Task()(&ctx)(&err)

	db := service.db
	return db.DisqualifyNode(ctx, nodeID)
}
2020-07-16 15:18:02 +01:00
// ResolveIPAndNetwork resolves the target address and determines its IP and /24 subnet IPv4 or /64 subnet IPv6.
2020-03-12 18:37:57 +00:00
func ResolveIPAndNetwork ( ctx context . Context , target string ) ( ipPort , network string , err error ) {
2019-06-24 16:33:18 +01:00
defer mon . Task ( ) ( & ctx ) ( & err )
2020-03-06 22:04:23 +00:00
host , port , err := net . SplitHostPort ( target )
if err != nil {
return "" , "" , err
}
ipAddr , err := net . ResolveIPAddr ( "ip" , host )
2019-05-30 18:35:04 +01:00
if err != nil {
2020-03-06 22:04:23 +00:00
return "" , "" , err
2019-05-30 18:35:04 +01:00
}
2019-06-24 16:33:18 +01:00
// If addr can be converted to 4byte notation, it is an IPv4 address, else its an IPv6 address
2020-03-06 22:04:23 +00:00
if ipv4 := ipAddr . IP . To4 ( ) ; ipv4 != nil {
2020-10-13 13:47:55 +01:00
// Filter all IPv4 Addresses into /24 Subnet's
2019-06-24 16:33:18 +01:00
mask := net . CIDRMask ( 24 , 32 )
2020-03-06 22:04:23 +00:00
return net . JoinHostPort ( ipAddr . String ( ) , port ) , ipv4 . Mask ( mask ) . String ( ) , nil
2019-06-24 16:33:18 +01:00
}
2020-03-06 22:04:23 +00:00
if ipv6 := ipAddr . IP . To16 ( ) ; ipv6 != nil {
2020-10-13 13:47:55 +01:00
// Filter all IPv6 Addresses into /64 Subnet's
2019-06-24 16:33:18 +01:00
mask := net . CIDRMask ( 64 , 128 )
2020-03-06 22:04:23 +00:00
return net . JoinHostPort ( ipAddr . String ( ) , port ) , ipv6 . Mask ( mask ) . String ( ) , nil
2019-06-24 16:33:18 +01:00
}
2020-03-06 22:04:23 +00:00
return "" , "" , errors . New ( "unable to get network for address " + ipAddr . String ( ) )
2019-05-30 18:35:04 +01:00
}
2020-07-08 15:28:49 +01:00
// TestVetNode directly sets a node's vetted_at timestamp to make testing easier.
func ( service * Service ) TestVetNode ( ctx context . Context , nodeID storj . NodeID ) ( vettedTime * time . Time , err error ) {
vettedTime , err = service . db . TestVetNode ( ctx , nodeID )
service . log . Warn ( "node vetted" , zap . Stringer ( "node ID" , nodeID ) , zap . Stringer ( "vetted time" , vettedTime ) )
if err != nil {
service . log . Warn ( "error vetting node" , zap . Stringer ( "node ID" , nodeID ) )
return nil , err
}
2021-01-28 11:46:18 +00:00
err = service . UploadSelectionCache . Refresh ( ctx )
2020-07-08 15:28:49 +01:00
service . log . Warn ( "nodecache refresh err" , zap . Error ( err ) )
return vettedTime , err
}
// TestUnvetNode directly sets a node's vetted_at timestamp to null to make testing easier.
func ( service * Service ) TestUnvetNode ( ctx context . Context , nodeID storj . NodeID ) ( err error ) {
err = service . db . TestUnvetNode ( ctx , nodeID )
if err != nil {
service . log . Warn ( "error unvetting node" , zap . Stringer ( "node ID" , nodeID ) , zap . Error ( err ) )
return err
}
2021-01-28 11:46:18 +00:00
err = service . UploadSelectionCache . Refresh ( ctx )
2020-07-08 15:28:49 +01:00
service . log . Warn ( "nodecache refresh err" , zap . Error ( err ) )
return err
}