2019-01-24 20:15:10 +00:00
// Copyright (C) 2019 Storj Labs, Inc.
2018-04-18 17:55:28 +01:00
// See LICENSE for copying information.
2018-06-13 19:22:32 +01:00
package overlay
2018-04-18 16:34:15 +01:00
import (
2018-05-16 19:47:59 +01:00
"context"
2019-01-15 16:08:45 +00:00
"errors"
2019-05-30 18:35:04 +01:00
"net"
2019-04-04 17:34:36 +01:00
"time"
2018-04-18 16:34:15 +01:00
2018-06-13 19:22:32 +01:00
"github.com/zeebo/errs"
2018-12-22 04:51:42 +00:00
"go.uber.org/zap"
2018-11-16 16:31:14 +00:00
2019-12-27 11:48:47 +00:00
"storj.io/common/pb"
"storj.io/common/storj"
2018-06-13 19:22:32 +01:00
"storj.io/storj/storage"
2018-04-18 16:34:15 +01:00
)
2020-07-16 15:18:02 +01:00
// ErrEmptyNode is returned when the nodeID is empty.
2018-12-17 18:47:26 +00:00
var ErrEmptyNode = errs . New ( "empty node ID" )
2020-07-16 15:18:02 +01:00
// ErrNodeNotFound is returned if a node does not exist in database.
2019-03-29 08:53:43 +00:00
var ErrNodeNotFound = errs . Class ( "node not found" )
2018-11-21 17:31:27 +00:00
2020-07-16 15:18:02 +01:00
// ErrNodeOffline is returned if a nodes is offline.
2019-05-27 12:13:47 +01:00
var ErrNodeOffline = errs . Class ( "node is offline" )
2020-07-16 15:18:02 +01:00
// ErrNodeDisqualified is returned if a nodes is disqualified.
2019-06-24 15:46:10 +01:00
var ErrNodeDisqualified = errs . Class ( "node is disqualified" )
2020-08-13 13:00:56 +01:00
// ErrNodeFinishedGE is returned if a node has finished graceful exit.
var ErrNodeFinishedGE = errs . Class ( "node finished graceful exit" )
2020-07-16 15:18:02 +01:00
// ErrNotEnoughNodes is when selecting nodes failed with the given parameters.
2019-01-31 18:49:00 +00:00
var ErrNotEnoughNodes = errs . Class ( "not enough nodes" )
2020-12-05 16:01:42 +00:00
// DB implements the database for overlay.Service.
2019-09-10 14:24:16 +01:00
//
// architecture: Database
2019-01-15 16:08:45 +00:00
type DB interface {
2020-03-30 14:32:02 +01:00
// GetOnlineNodesForGetDelete returns a map of nodes for the supplied nodeIDs
GetOnlineNodesForGetDelete ( ctx context . Context , nodeIDs [ ] storj . NodeID , onlineWindow time . Duration ) ( map [ storj . NodeID ] * SelectedNode , error )
2019-02-11 19:24:51 +00:00
// SelectStorageNodes looks up nodes based on criteria
2020-04-09 16:19:44 +01:00
SelectStorageNodes ( ctx context . Context , totalNeededNodes , newNodeCount int , criteria * NodeCriteria ) ( [ ] * SelectedNode , error )
2020-04-14 21:50:02 +01:00
// SelectAllStorageNodesUpload returns all nodes that qualify to store data, organized as reputable nodes and new nodes
SelectAllStorageNodesUpload ( ctx context . Context , selectionCfg NodeSelectionConfig ) ( reputable , new [ ] * SelectedNode , err error )
2019-01-15 16:08:45 +00:00
// Get looks up the node by nodeID
2019-04-04 17:34:36 +01:00
Get ( ctx context . Context , nodeID storj . NodeID ) ( * NodeDossier , error )
2019-06-18 23:22:14 +01:00
// KnownOffline filters a set of nodes to offline nodes
KnownOffline ( context . Context , * NodeCriteria , storj . NodeIDList ) ( storj . NodeIDList , error )
2019-05-01 14:45:52 +01:00
// KnownUnreliableOrOffline filters a set of nodes to unhealth or offlines node, independent of new
KnownUnreliableOrOffline ( context . Context , * NodeCriteria , storj . NodeIDList ) ( storj . NodeIDList , error )
2019-12-16 13:45:13 +00:00
// KnownReliable filters a set of nodes to reliable (online and qualified) nodes.
KnownReliable ( ctx context . Context , onlineWindow time . Duration , nodeIDs storj . NodeIDList ) ( [ ] * pb . Node , error )
2019-07-08 23:04:35 +01:00
// Reliable returns all nodes that are reliable
Reliable ( context . Context , * NodeCriteria ) ( storj . NodeIDList , error )
2020-08-26 21:26:10 +01:00
// BatchUpdateStats updates multiple storagenode's stats in one transaction.
BatchUpdateStats ( ctx context . Context , updateRequests [ ] * UpdateRequest , batchSize int , now time . Time ) ( failed storj . NodeIDList , err error )
2019-03-25 22:25:09 +00:00
// UpdateStats all parts of single storagenode's stats.
2020-08-26 21:26:10 +01:00
UpdateStats ( ctx context . Context , request * UpdateRequest , now time . Time ) ( stats * NodeStats , err error )
2019-04-10 07:04:24 +01:00
// UpdateNodeInfo updates node dossier with info requested from the node itself like node type, email, wallet, capacity, and version.
2020-06-16 13:16:55 +01:00
UpdateNodeInfo ( ctx context . Context , node storj . NodeID , nodeInfo * InfoResponse ) ( stats * NodeDossier , err error )
2019-09-19 19:37:31 +01:00
// UpdateCheckIn updates a single storagenode's check-in stats.
2019-11-15 22:43:06 +00:00
UpdateCheckIn ( ctx context . Context , node NodeCheckInInfo , timestamp time . Time , config NodeSelectionConfig ) ( err error )
2019-08-27 13:37:42 +01:00
// AllPieceCounts returns a map of node IDs to piece counts from the db.
AllPieceCounts ( ctx context . Context ) ( pieceCounts map [ storj . NodeID ] int , err error )
// UpdatePieceCounts sets the piece count field for the given node IDs.
UpdatePieceCounts ( ctx context . Context , pieceCounts map [ storj . NodeID ] int ) ( err error )
2019-10-01 23:18:21 +01:00
// UpdateExitStatus is used to update a node's graceful exit status.
2019-10-29 20:22:20 +00:00
UpdateExitStatus ( ctx context . Context , request * ExitStatusRequest ) ( _ * NodeDossier , err error )
2019-10-01 23:18:21 +01:00
// GetExitingNodes returns nodes who have initiated a graceful exit, but have not completed it.
2019-10-24 17:24:42 +01:00
GetExitingNodes ( ctx context . Context ) ( exitingNodes [ ] * ExitStatus , err error )
2019-10-23 02:06:01 +01:00
// GetGracefulExitCompletedByTimeFrame returns nodes who have completed graceful exit within a time window (time window is around graceful exit completion).
GetGracefulExitCompletedByTimeFrame ( ctx context . Context , begin , end time . Time ) ( exitedNodes storj . NodeIDList , err error )
// GetGracefulExitIncompleteByTimeFrame returns nodes who have initiated, but not completed graceful exit within a time window (time window is around graceful exit initiation).
GetGracefulExitIncompleteByTimeFrame ( ctx context . Context , begin , end time . Time ) ( exitingNodes storj . NodeIDList , err error )
// GetExitStatus returns a node's graceful exit status.
2019-10-11 22:18:05 +01:00
GetExitStatus ( ctx context . Context , nodeID storj . NodeID ) ( exitStatus * ExitStatus , err error )
2019-11-06 21:38:52 +00:00
2020-03-06 22:04:23 +00:00
// GetNodesNetwork returns the /24 subnet for each storage node, order is not guaranteed.
GetNodesNetwork ( ctx context . Context , nodeIDs [ ] storj . NodeID ) ( nodeNets [ ] string , err error )
2019-12-30 17:10:24 +00:00
2020-01-03 19:11:47 +00:00
// DisqualifyNode disqualifies a storage node.
DisqualifyNode ( ctx context . Context , nodeID storj . NodeID ) ( err error )
2020-12-31 18:43:13 +00:00
// DQNodesLastSeenBefore disqualifies all nodes where last_contact_success < cutoff.
DQNodesLastSeenBefore ( ctx context . Context , cutoff time . Time ) ( err error )
2020-03-09 15:35:54 +00:00
2020-06-10 17:11:25 +01:00
// SuspendNodeUnknownAudit suspends a storage node for unknown audits.
SuspendNodeUnknownAudit ( ctx context . Context , nodeID storj . NodeID , suspendedAt time . Time ) ( err error )
// UnsuspendNodeUnknownAudit unsuspends a storage node for unknown audits.
UnsuspendNodeUnknownAudit ( ctx context . Context , nodeID storj . NodeID ) ( err error )
2020-07-08 15:28:49 +01:00
// TestVetNode directly sets a node's vetted_at timestamp to make testing easier
TestVetNode ( ctx context . Context , nodeID storj . NodeID ) ( vettedTime * time . Time , err error )
// TestUnvetNode directly sets a node's vetted_at timestamp to null to make testing easier
TestUnvetNode ( ctx context . Context , nodeID storj . NodeID ) ( err error )
2020-12-21 17:26:07 +00:00
// AuditHistoryDB includes operations for interfacing with the audit history table.
AuditHistoryDB
2019-01-15 16:08:45 +00:00
}
2020-07-16 15:18:02 +01:00
// NodeCheckInInfo contains all the info that will be updated when a node checkins.
2019-09-19 19:37:31 +01:00
type NodeCheckInInfo struct {
2020-03-06 22:04:23 +00:00
NodeID storj . NodeID
Address * pb . NodeAddress
LastNet string
LastIPPort string
IsUp bool
Operator * pb . NodeOperator
Capacity * pb . NodeCapacity
Version * pb . NodeVersion
2019-09-19 19:37:31 +01:00
}
2020-06-16 13:16:55 +01:00
// InfoResponse carries the node dossier info that was requested from the
// storage node itself: its type, operator, capacity and version.
type InfoResponse struct {
	Type     pb.NodeType
	Operator *pb.NodeOperator
	Capacity *pb.NodeCapacity
	Version  *pb.NodeVersion
}
2019-03-23 08:06:11 +00:00
// FindStorageNodesRequest defines easy request parameters.
type FindStorageNodesRequest struct {
2020-12-22 19:07:07 +00:00
RequestedCount int
ExcludedIDs [ ] storj . NodeID
MinimumVersion string // semver or empty
AsOfSystemTimeInterval time . Duration // only used for CRDB queries
2019-03-23 08:06:11 +00:00
}
2020-07-16 15:18:02 +01:00
// NodeCriteria are the requirements for selecting nodes.
2019-03-23 08:06:11 +00:00
type NodeCriteria struct {
2020-12-22 19:07:07 +00:00
FreeDisk int64
ExcludedIDs [ ] storj . NodeID
ExcludedNetworks [ ] string // the /24 subnet IPv4 or /64 subnet IPv6 for nodes
MinimumVersion string // semver or empty
OnlineWindow time . Duration
DistinctIP bool
AsOfSystemTimeInterval time . Duration // only used for CRDB queries
2019-03-23 08:06:11 +00:00
}
2020-03-09 15:35:54 +00:00
// AuditType is an enum representing the outcome of a particular audit reported to the overlay.
type AuditType int

// Audit outcomes, numbered consecutively from zero in declaration order.
const (
	// AuditSuccess represents a successful audit.
	AuditSuccess AuditType = iota

	// AuditFailure represents a failed audit.
	AuditFailure

	// AuditUnknown represents an audit that resulted in an unknown error from the node.
	AuditUnknown

	// AuditOffline represents an audit where a node was offline.
	AuditOffline
)
2019-03-25 22:25:09 +00:00
// UpdateRequest is used to update a node status.
type UpdateRequest struct {
NodeID storj . NodeID
2020-03-09 15:35:54 +00:00
AuditOutcome AuditType
2019-06-20 14:56:04 +01:00
// n.b. these are set values from the satellite.
// They are part of the UpdateRequest struct in order to be
// more easily accessible in satellite/satellitedb/overlaycache.go.
2020-12-17 22:07:06 +00:00
AuditLambda float64
AuditWeight float64
AuditDQ float64
SuspensionGracePeriod time . Duration
SuspensionDQEnabled bool
AuditsRequiredForVetting int64
AuditHistory AuditHistoryConfig
2019-03-25 22:25:09 +00:00
}
2019-10-11 22:18:05 +01:00
// ExitStatus is used for reading graceful exit status.
type ExitStatus struct {
NodeID storj . NodeID
ExitInitiatedAt * time . Time
ExitLoopCompletedAt * time . Time
ExitFinishedAt * time . Time
2019-10-17 16:01:39 +01:00
ExitSuccess bool
2019-10-11 22:18:05 +01:00
}
2019-10-01 23:18:21 +01:00
// ExitStatusRequest is used to update a node's graceful exit status.
type ExitStatusRequest struct {
NodeID storj . NodeID
ExitInitiatedAt time . Time
ExitLoopCompletedAt time . Time
ExitFinishedAt time . Time
2019-10-17 16:01:39 +01:00
ExitSuccess bool
2019-10-01 23:18:21 +01:00
}
2020-07-16 15:18:02 +01:00
// NodeDossier is the complete info that the satellite tracks for a storage node.
2019-04-04 17:34:36 +01:00
type NodeDossier struct {
pb . Node
2020-06-10 17:11:25 +01:00
Type pb . NodeType
Operator pb . NodeOperator
Capacity pb . NodeCapacity
Reputation NodeStats
Version pb . NodeVersion
Contained bool
Disqualified * time . Time
UnknownAuditSuspended * time . Time
2020-08-26 21:26:10 +01:00
OfflineSuspended * time . Time
OfflineUnderReview * time . Time
2020-06-10 17:11:25 +01:00
PieceCount int64
ExitStatus ExitStatus
CreatedAt time . Time
LastNet string
LastIPPort string
2019-04-04 17:34:36 +01:00
}
2019-03-29 08:53:43 +00:00
// NodeStats contains statistics about a node.
type NodeStats struct {
	Latency90 int64
	VettedAt  *time.Time

	AuditSuccessCount, AuditCount int64

	LastContactSuccess, LastContactFailure time.Time

	AuditReputationAlpha, AuditReputationBeta float64

	Disqualified *time.Time

	UnknownAuditReputationAlpha, UnknownAuditReputationBeta float64

	UnknownAuditSuspended, OfflineUnderReview, OfflineSuspended *time.Time

	OnlineScore float64
}
2020-07-16 15:18:02 +01:00
// NodeLastContact contains the ID, address, and timestamp.
2020-01-02 20:41:18 +00:00
type NodeLastContact struct {
2020-05-19 17:42:00 +01:00
URL storj . NodeURL
2020-03-06 22:04:23 +00:00
LastIPPort string
2020-01-02 20:41:18 +00:00
LastContactSuccess time . Time
LastContactFailure time . Time
}
2020-03-28 14:56:05 +00:00
// SelectedNode is used as a result for creating order limits.
type SelectedNode struct {
	ID      storj.NodeID
	Address *pb.NodeAddress

	LastNet, LastIPPort string
}
2020-05-06 18:00:07 +01:00
// Clone returns a deep clone of the selected node.
//
// A nil receiver yields nil. A nil Address stays nil in the clone instead of
// being dereferenced. For a non-nil Address, only its Transport and Address
// fields are copied into a freshly allocated pb.NodeAddress.
func (node *SelectedNode) Clone() *SelectedNode {
	if node == nil {
		return nil
	}

	clone := &SelectedNode{
		ID:         node.ID,
		LastNet:    node.LastNet,
		LastIPPort: node.LastIPPort,
	}
	// Guard the nested pointer: a node without an address must not panic.
	if node.Address != nil {
		clone.Address = &pb.NodeAddress{
			Transport: node.Address.Transport,
			Address:   node.Address.Address,
		}
	}
	return clone
}
2020-12-05 16:01:42 +00:00
// Service is used to store and handle node information.
2019-09-10 14:24:16 +01:00
//
// architecture: Service
2019-08-06 17:35:59 +01:00
type Service struct {
2021-01-28 11:46:18 +00:00
log * zap . Logger
db DB
config Config
UploadSelectionCache * UploadSelectionCache
2018-04-18 16:34:15 +01:00
}
2020-07-16 15:18:02 +01:00
// NewService returns a new Service.
2020-12-22 19:07:07 +00:00
func NewService ( log * zap . Logger , db DB , config Config ) ( * Service , error ) {
if err := config . Node . AsOfSystemTime . isValid ( ) ; err != nil {
return nil , err
}
2019-08-06 17:35:59 +01:00
return & Service {
2019-07-31 18:21:06 +01:00
log : log ,
db : db ,
config : config ,
2021-01-28 11:46:18 +00:00
UploadSelectionCache : NewUploadSelectionCache ( log , db ,
2020-04-24 17:11:04 +01:00
config . NodeSelectionCache . Staleness , config . Node ,
) ,
2020-12-22 19:07:07 +00:00
} , nil
2018-12-20 13:57:54 +00:00
}
2020-07-16 15:18:02 +01:00
// Close closes resources.
2019-08-06 17:35:59 +01:00
func ( service * Service ) Close ( ) error { return nil }
2019-01-18 13:54:08 +00:00
2020-07-16 15:18:02 +01:00
// Inspect lists limited number of items in the cache.
2019-08-06 17:35:59 +01:00
func ( service * Service ) Inspect ( ctx context . Context ) ( _ storage . Keys , err error ) {
2019-06-04 12:36:27 +01:00
defer mon . Task ( ) ( & ctx ) ( & err )
2019-01-15 16:08:45 +00:00
// TODO: implement inspection tools
return nil , errors . New ( "not implemented" )
2018-06-13 19:22:32 +01:00
}
2019-08-06 17:35:59 +01:00
// Get looks up the provided nodeID from the overlay.
func ( service * Service ) Get ( ctx context . Context , nodeID storj . NodeID ) ( _ * NodeDossier , err error ) {
2019-03-23 08:06:11 +00:00
defer mon . Task ( ) ( & ctx ) ( & err )
2018-12-17 18:47:26 +00:00
if nodeID . IsZero ( ) {
return nil , ErrEmptyNode
}
2019-08-06 17:35:59 +01:00
return service . db . Get ( ctx , nodeID )
2018-04-18 16:34:15 +01:00
}
2020-03-30 14:32:02 +01:00
// GetOnlineNodesForGetDelete returns a map of nodes for the supplied nodeIDs.
func ( service * Service ) GetOnlineNodesForGetDelete ( ctx context . Context , nodeIDs [ ] storj . NodeID ) ( _ map [ storj . NodeID ] * SelectedNode , err error ) {
2020-03-13 18:01:48 +00:00
defer mon . Task ( ) ( & ctx ) ( & err )
2020-03-30 14:32:02 +01:00
return service . db . GetOnlineNodesForGetDelete ( ctx , nodeIDs , service . config . Node . OnlineWindow )
2020-03-13 18:01:48 +00:00
}
2021-01-13 13:59:05 +00:00
// GetNodeIPs returns a map of node ip:port for the supplied nodeIDs.
func ( service * Service ) GetNodeIPs ( ctx context . Context , nodeIDs [ ] storj . NodeID ) ( _ map [ storj . NodeID ] string , err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
2021-01-28 11:46:18 +00:00
return service . UploadSelectionCache . GetNodeIPs ( ctx , nodeIDs )
2021-01-13 13:59:05 +00:00
}
2019-04-23 23:45:50 +01:00
// IsOnline checks if a node is 'online' based on the collected statistics.
2019-08-06 17:35:59 +01:00
func ( service * Service ) IsOnline ( node * NodeDossier ) bool {
2019-11-15 22:43:06 +00:00
return time . Since ( node . Reputation . LastContactSuccess ) < service . config . Node . OnlineWindow
2019-04-23 23:45:50 +01:00
}
2020-05-06 14:05:31 +01:00
// FindStorageNodesForGracefulExit searches the overlay network for nodes that meet the provided requirements for graceful-exit requests.
//
// The main difference between this method and the normal FindStorageNodes is that here we avoid using the cache.
func ( service * Service ) FindStorageNodesForGracefulExit ( ctx context . Context , req FindStorageNodesRequest ) ( _ [ ] * SelectedNode , err error ) {
2020-04-24 17:11:04 +01:00
defer mon . Task ( ) ( & ctx ) ( & err )
2020-12-22 19:07:07 +00:00
if service . config . Node . AsOfSystemTime . Enabled && service . config . Node . AsOfSystemTime . DefaultInterval < 0 {
req . AsOfSystemTimeInterval = service . config . Node . AsOfSystemTime . DefaultInterval
}
2020-05-06 14:05:31 +01:00
return service . FindStorageNodesWithPreferences ( ctx , req , & service . config . Node )
}
// FindStorageNodesForUpload searches the overlay network for nodes that meet the provided requirements for upload.
//
// When enabled it uses the cache to select nodes.
// When the node selection from the cache fails, it falls back to the old implementation.
func ( service * Service ) FindStorageNodesForUpload ( ctx context . Context , req FindStorageNodesRequest ) ( _ [ ] * SelectedNode , err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
2020-12-22 19:07:07 +00:00
if service . config . Node . AsOfSystemTime . Enabled && service . config . Node . AsOfSystemTime . DefaultInterval < 0 {
req . AsOfSystemTimeInterval = service . config . Node . AsOfSystemTime . DefaultInterval
}
2020-05-06 14:05:31 +01:00
if service . config . NodeSelectionCache . Disabled {
return service . FindStorageNodesWithPreferences ( ctx , req , & service . config . Node )
}
2021-01-28 11:46:18 +00:00
selectedNodes , err := service . UploadSelectionCache . GetNodes ( ctx , req )
2020-04-24 17:11:04 +01:00
if err != nil {
service . log . Warn ( "error selecting from node selection cache" , zap . String ( "err" , err . Error ( ) ) )
}
2020-12-22 19:07:07 +00:00
2020-04-24 17:11:04 +01:00
if len ( selectedNodes ) < req . RequestedCount {
mon . Event ( "default_node_selection" )
return service . FindStorageNodesWithPreferences ( ctx , req , & service . config . Node )
}
return selectedNodes , nil
}
2020-05-06 14:05:31 +01:00
// FindStorageNodesWithPreferences searches the overlay network for nodes that meet the provided criteria.
//
// This does not use a cache.
2020-03-28 14:56:05 +00:00
func ( service * Service ) FindStorageNodesWithPreferences ( ctx context . Context , req FindStorageNodesRequest , preferences * NodeSelectionConfig ) ( nodes [ ] * SelectedNode , err error ) {
2019-03-23 08:06:11 +00:00
defer mon . Task ( ) ( & ctx ) ( & err )
2019-01-31 18:49:00 +00:00
// TODO: add sanity limits to requested node count
// TODO: add sanity limits to excluded nodes
2020-05-07 12:54:48 +01:00
totalNeededNodes := req . RequestedCount
2019-01-31 18:49:00 +00:00
2020-03-12 18:37:57 +00:00
excludedIDs := req . ExcludedIDs
// if distinctIP is enabled, keep track of the network
// to make sure we only select nodes from different networks
var excludedNetworks [ ] string
if preferences . DistinctIP && len ( excludedIDs ) > 0 {
excludedNetworks , err = service . db . GetNodesNetwork ( ctx , excludedIDs )
2019-11-06 21:38:52 +00:00
if err != nil {
return nil , Error . Wrap ( err )
}
}
2019-04-23 16:23:51 +01:00
newNodeCount := 0
2020-03-18 21:16:13 +00:00
if preferences . NewNodeFraction > 0 {
2020-04-09 16:19:44 +01:00
newNodeCount = int ( float64 ( totalNeededNodes ) * preferences . NewNodeFraction )
2019-04-23 16:23:51 +01:00
}
2019-05-07 15:44:47 +01:00
criteria := NodeCriteria {
2020-12-22 19:07:07 +00:00
FreeDisk : preferences . MinimumDiskSpace . Int64 ( ) ,
ExcludedIDs : excludedIDs ,
ExcludedNetworks : excludedNetworks ,
MinimumVersion : preferences . MinimumVersion ,
OnlineWindow : preferences . OnlineWindow ,
DistinctIP : preferences . DistinctIP ,
AsOfSystemTimeInterval : req . AsOfSystemTimeInterval ,
2019-05-07 15:44:47 +01:00
}
2020-04-09 16:19:44 +01:00
nodes , err = service . db . SelectStorageNodes ( ctx , totalNeededNodes , newNodeCount , & criteria )
2019-01-31 18:49:00 +00:00
if err != nil {
2019-08-06 17:35:59 +01:00
return nil , Error . Wrap ( err )
2019-01-31 18:49:00 +00:00
}
2020-04-09 16:19:44 +01:00
if len ( nodes ) < totalNeededNodes {
return nodes , ErrNotEnoughNodes . New ( "requested %d found %d; %+v " , totalNeededNodes , len ( nodes ) , criteria )
2019-01-31 18:49:00 +00:00
}
return nodes , nil
}
2020-07-16 15:18:02 +01:00
// KnownOffline filters a set of nodes to offline nodes.
2019-08-06 17:35:59 +01:00
func ( service * Service ) KnownOffline ( ctx context . Context , nodeIds storj . NodeIDList ) ( offlineNodes storj . NodeIDList , err error ) {
2019-06-18 23:22:14 +01:00
defer mon . Task ( ) ( & ctx ) ( & err )
criteria := & NodeCriteria {
2019-08-06 17:35:59 +01:00
OnlineWindow : service . config . Node . OnlineWindow ,
2019-06-18 23:22:14 +01:00
}
2019-08-06 17:35:59 +01:00
return service . db . KnownOffline ( ctx , criteria , nodeIds )
2019-06-18 23:22:14 +01:00
}
2019-05-10 20:05:42 +01:00
// KnownUnreliableOrOffline filters a set of nodes to unhealth or offlines node, independent of new.
2019-08-06 17:35:59 +01:00
func ( service * Service ) KnownUnreliableOrOffline ( ctx context . Context , nodeIds storj . NodeIDList ) ( badNodes storj . NodeIDList , err error ) {
2019-03-23 08:06:11 +00:00
defer mon . Task ( ) ( & ctx ) ( & err )
2019-05-01 14:45:52 +01:00
criteria := & NodeCriteria {
2019-08-06 17:35:59 +01:00
OnlineWindow : service . config . Node . OnlineWindow ,
2018-09-11 05:52:14 +01:00
}
2019-08-06 17:35:59 +01:00
return service . db . KnownUnreliableOrOffline ( ctx , criteria , nodeIds )
2018-09-11 05:52:14 +01:00
}
2019-12-16 13:45:13 +00:00
// KnownReliable filters a set of nodes to reliable (online and qualified) nodes.
//
// The configured online window is passed to the database.
func (service *Service) KnownReliable(ctx context.Context, nodeIDs storj.NodeIDList) (nodes []*pb.Node, err error) {
	defer mon.Task()(&ctx)(&err)

	onlineWindow := service.config.Node.OnlineWindow
	return service.db.KnownReliable(ctx, onlineWindow, nodeIDs)
}
2019-07-08 23:04:35 +01:00
// Reliable filters a set of nodes that are reliable, independent of new.
2019-08-06 17:35:59 +01:00
func ( service * Service ) Reliable ( ctx context . Context ) ( nodes storj . NodeIDList , err error ) {
2019-07-08 23:04:35 +01:00
defer mon . Task ( ) ( & ctx ) ( & err )
criteria := & NodeCriteria {
2019-08-06 17:35:59 +01:00
OnlineWindow : service . config . Node . OnlineWindow ,
2019-07-08 23:04:35 +01:00
}
2019-08-06 17:35:59 +01:00
return service . db . Reliable ( ctx , criteria )
2019-07-08 23:04:35 +01:00
}
2020-07-16 15:18:02 +01:00
// BatchUpdateStats updates multiple storagenode's stats in one transaction.
2019-08-06 17:35:59 +01:00
func ( service * Service ) BatchUpdateStats ( ctx context . Context , requests [ ] * UpdateRequest ) ( failed storj . NodeIDList , err error ) {
2019-07-31 18:21:06 +01:00
defer mon . Task ( ) ( & ctx ) ( & err )
for _ , request := range requests {
2019-08-06 17:35:59 +01:00
request . AuditLambda = service . config . Node . AuditReputationLambda
request . AuditWeight = service . config . Node . AuditReputationWeight
request . AuditDQ = service . config . Node . AuditReputationDQ
2020-04-14 17:49:45 +01:00
request . SuspensionGracePeriod = service . config . Node . SuspensionGracePeriod
2020-05-04 17:32:06 +01:00
request . SuspensionDQEnabled = service . config . Node . SuspensionDQEnabled
2020-05-20 20:57:53 +01:00
request . AuditsRequiredForVetting = service . config . Node . AuditCount
2020-08-26 21:26:10 +01:00
request . AuditHistory = service . config . AuditHistory
2019-07-31 18:21:06 +01:00
}
2020-08-26 21:26:10 +01:00
return service . db . BatchUpdateStats ( ctx , requests , service . config . UpdateStatsBatchSize , time . Now ( ) )
2019-07-31 18:21:06 +01:00
}
2019-03-25 22:25:09 +00:00
// UpdateStats all parts of single storagenode's stats.
2019-08-06 17:35:59 +01:00
func ( service * Service ) UpdateStats ( ctx context . Context , request * UpdateRequest ) ( stats * NodeStats , err error ) {
2019-03-25 22:25:09 +00:00
defer mon . Task ( ) ( & ctx ) ( & err )
2019-06-20 14:56:04 +01:00
2019-08-06 17:35:59 +01:00
request . AuditLambda = service . config . Node . AuditReputationLambda
request . AuditWeight = service . config . Node . AuditReputationWeight
request . AuditDQ = service . config . Node . AuditReputationDQ
2020-04-14 17:49:45 +01:00
request . SuspensionGracePeriod = service . config . Node . SuspensionGracePeriod
2020-05-04 17:32:06 +01:00
request . SuspensionDQEnabled = service . config . Node . SuspensionDQEnabled
2020-05-20 20:57:53 +01:00
request . AuditsRequiredForVetting = service . config . Node . AuditCount
2020-08-26 21:26:10 +01:00
request . AuditHistory = service . config . AuditHistory
2019-06-20 14:56:04 +01:00
2020-08-26 21:26:10 +01:00
return service . db . UpdateStats ( ctx , request , time . Now ( ) )
2019-03-25 22:25:09 +00:00
}
2019-04-10 07:04:24 +01:00
// UpdateNodeInfo updates node dossier with info requested from the node itself like node type, email, wallet, capacity, and version.
2020-06-16 13:16:55 +01:00
func ( service * Service ) UpdateNodeInfo ( ctx context . Context , node storj . NodeID , nodeInfo * InfoResponse ) ( stats * NodeDossier , err error ) {
2019-03-25 22:25:09 +00:00
defer mon . Task ( ) ( & ctx ) ( & err )
2019-08-06 17:35:59 +01:00
return service . db . UpdateNodeInfo ( ctx , node , nodeInfo )
2019-03-25 22:25:09 +00:00
}
2019-09-19 19:37:31 +01:00
// UpdateCheckIn updates a single storagenode's check-in info.
2019-11-15 22:43:06 +00:00
func ( service * Service ) UpdateCheckIn ( ctx context . Context , node NodeCheckInInfo , timestamp time . Time ) ( err error ) {
2019-09-19 19:37:31 +01:00
defer mon . Task ( ) ( & ctx ) ( & err )
2019-11-15 22:43:06 +00:00
return service . db . UpdateCheckIn ( ctx , node , timestamp , service . config . Node )
2019-09-19 19:37:31 +01:00
}
2020-07-16 15:18:02 +01:00
// GetMissingPieces returns the list of offline nodes.
2019-08-06 17:35:59 +01:00
func ( service * Service ) GetMissingPieces ( ctx context . Context , pieces [ ] * pb . RemotePiece ) ( missingPieces [ ] int32 , err error ) {
2019-06-04 12:36:27 +01:00
defer mon . Task ( ) ( & ctx ) ( & err )
2019-05-16 14:49:10 +01:00
var nodeIDs storj . NodeIDList
for _ , p := range pieces {
nodeIDs = append ( nodeIDs , p . NodeId )
}
2019-08-06 17:35:59 +01:00
badNodeIDs , err := service . KnownUnreliableOrOffline ( ctx , nodeIDs )
2019-05-16 14:49:10 +01:00
if err != nil {
return nil , Error . New ( "error getting nodes %s" , err )
}
for _ , p := range pieces {
for _ , nodeID := range badNodeIDs {
if nodeID == p . NodeId {
missingPieces = append ( missingPieces , p . GetPieceNum ( ) )
}
}
}
return missingPieces , nil
}
2019-05-30 18:35:04 +01:00
2020-01-03 19:11:47 +00:00
// DisqualifyNode disqualifies a storage node by delegating to the database.
func (service *Service) DisqualifyNode(ctx context.Context, nodeID storj.NodeID) (err error) {
	defer mon.Task()(&ctx)(&err)

	err = service.db.DisqualifyNode(ctx, nodeID)
	return err
}
2020-07-16 15:18:02 +01:00
// ResolveIPAndNetwork resolves the target address and determines its IP and /24 subnet IPv4 or /64 subnet IPv6.
2020-03-12 18:37:57 +00:00
func ResolveIPAndNetwork ( ctx context . Context , target string ) ( ipPort , network string , err error ) {
2019-06-24 16:33:18 +01:00
defer mon . Task ( ) ( & ctx ) ( & err )
2020-03-06 22:04:23 +00:00
host , port , err := net . SplitHostPort ( target )
if err != nil {
return "" , "" , err
}
ipAddr , err := net . ResolveIPAddr ( "ip" , host )
2019-05-30 18:35:04 +01:00
if err != nil {
2020-03-06 22:04:23 +00:00
return "" , "" , err
2019-05-30 18:35:04 +01:00
}
2019-06-24 16:33:18 +01:00
// If addr can be converted to 4byte notation, it is an IPv4 address, else its an IPv6 address
2020-03-06 22:04:23 +00:00
if ipv4 := ipAddr . IP . To4 ( ) ; ipv4 != nil {
2020-10-13 13:47:55 +01:00
// Filter all IPv4 Addresses into /24 Subnet's
2019-06-24 16:33:18 +01:00
mask := net . CIDRMask ( 24 , 32 )
2020-03-06 22:04:23 +00:00
return net . JoinHostPort ( ipAddr . String ( ) , port ) , ipv4 . Mask ( mask ) . String ( ) , nil
2019-06-24 16:33:18 +01:00
}
2020-03-06 22:04:23 +00:00
if ipv6 := ipAddr . IP . To16 ( ) ; ipv6 != nil {
2020-10-13 13:47:55 +01:00
// Filter all IPv6 Addresses into /64 Subnet's
2019-06-24 16:33:18 +01:00
mask := net . CIDRMask ( 64 , 128 )
2020-03-06 22:04:23 +00:00
return net . JoinHostPort ( ipAddr . String ( ) , port ) , ipv6 . Mask ( mask ) . String ( ) , nil
2019-06-24 16:33:18 +01:00
}
2020-03-06 22:04:23 +00:00
return "" , "" , errors . New ( "unable to get network for address " + ipAddr . String ( ) )
2019-05-30 18:35:04 +01:00
}
2020-07-08 15:28:49 +01:00
// TestVetNode directly sets a node's vetted_at timestamp to make testing easier.
func ( service * Service ) TestVetNode ( ctx context . Context , nodeID storj . NodeID ) ( vettedTime * time . Time , err error ) {
vettedTime , err = service . db . TestVetNode ( ctx , nodeID )
service . log . Warn ( "node vetted" , zap . Stringer ( "node ID" , nodeID ) , zap . Stringer ( "vetted time" , vettedTime ) )
if err != nil {
service . log . Warn ( "error vetting node" , zap . Stringer ( "node ID" , nodeID ) )
return nil , err
}
2021-01-28 11:46:18 +00:00
err = service . UploadSelectionCache . Refresh ( ctx )
2020-07-08 15:28:49 +01:00
service . log . Warn ( "nodecache refresh err" , zap . Error ( err ) )
return vettedTime , err
}
// TestUnvetNode directly sets a node's vetted_at timestamp to null to make testing easier.
func ( service * Service ) TestUnvetNode ( ctx context . Context , nodeID storj . NodeID ) ( err error ) {
err = service . db . TestUnvetNode ( ctx , nodeID )
if err != nil {
service . log . Warn ( "error unvetting node" , zap . Stringer ( "node ID" , nodeID ) , zap . Error ( err ) )
return err
}
2021-01-28 11:46:18 +00:00
err = service . UploadSelectionCache . Refresh ( ctx )
2020-07-08 15:28:49 +01:00
service . log . Warn ( "nodecache refresh err" , zap . Error ( err ) )
return err
}