Clearer code for node selection (#1173)

This commit is contained in:
Egon Elbre 2019-01-31 20:49:00 +02:00 committed by Maximillian von Briesen
parent 5ae47c0de4
commit 0c366c1ed2
6 changed files with 299 additions and 372 deletions

View File

@ -30,13 +30,19 @@ var ErrNodeNotFound = errs.New("Node not found")
// ErrBucketNotFound is returned if a bucket is unable to be found in the routing table
var ErrBucketNotFound = errs.New("Bucket not found")
// ErrNotEnoughNodes is when selecting nodes failed with the given parameters
var ErrNotEnoughNodes = errs.Class("not enough nodes")
// OverlayError creates class of errors for stack traces
var OverlayError = errs.Class("Overlay Error")
// DB implements the database for overlay.Cache
type DB interface {
// FilterNodes looks up nodes based on reputation requirements
FilterNodes(ctx context.Context, filterNodesRequest *FilterNodesRequest) ([]*pb.Node, error)
// SelectNodes looks up nodes based on criteria
SelectNodes(ctx context.Context, count int, criteria *NodeCriteria) ([]*pb.Node, error)
// SelectNewNodes looks up nodes based on new node criteria
SelectNewNodes(ctx context.Context, count int, criteria *NewNodeCriteria) ([]*pb.Node, error)
// Get looks up the node by nodeID
Get(ctx context.Context, nodeID storj.NodeID) (*pb.Node, error)
// GetAll looks up nodes based on the ids from the overlay cache
@ -49,7 +55,7 @@ type DB interface {
Update(ctx context.Context, value *pb.Node) error
// Delete deletes node based on id
Delete(ctx context.Context, id storj.NodeID) error
//GetWalletAddress gets the node's wallet address
// GetWalletAddress gets the node's wallet address
GetWalletAddress(ctx context.Context, id storj.NodeID) (string, error)
}
@ -92,6 +98,74 @@ func (cache *Cache) Get(ctx context.Context, nodeID storj.NodeID) (*pb.Node, err
return cache.db.Get(ctx, nodeID)
}
// FindStorageNodes searches the overlay network for nodes that meet the provided criteria
func (cache *Cache) FindStorageNodes(ctx context.Context, req *pb.FindStorageNodesRequest, preferences *NodeSelectionConfig) ([]*pb.Node, error) {
// TODO: use a nicer struct for input
minimumRequiredNodes := int(req.GetMinNodes())
freeBandwidth := req.GetOpts().GetRestrictions().FreeBandwidth
freeDisk := req.GetOpts().GetRestrictions().FreeDisk
excludedNodes := req.GetOpts().ExcludedNodes
requestedCount := int(req.GetOpts().GetAmount())
// TODO: verify logic
// TODO: add sanity limits to requested node count
// TODO: add sanity limits to excluded nodes
reputableNodeCount := minimumRequiredNodes
if reputableNodeCount <= 0 {
reputableNodeCount = requestedCount
}
auditCount := preferences.AuditCount
if auditCount < preferences.NewNodeAuditThreshold {
auditCount = preferences.NewNodeAuditThreshold
}
reputableNodes, err := cache.db.SelectNodes(ctx, reputableNodeCount, &NodeCriteria{
Type: pb.NodeType_STORAGE,
FreeBandwidth: freeBandwidth,
FreeDisk: freeDisk,
AuditCount: auditCount,
AuditSuccessRatio: preferences.AuditSuccessRatio,
UptimeCount: preferences.UptimeCount,
UptimeSuccessRatio: preferences.UptimeRatio,
Excluded: excludedNodes,
})
if err != nil {
return nil, err
}
newNodeCount := int64(float64(reputableNodeCount) * preferences.NewNodePercentage)
newNodes, err := cache.db.SelectNewNodes(ctx, int(newNodeCount), &NewNodeCriteria{
Type: pb.NodeType_STORAGE,
FreeBandwidth: freeBandwidth,
FreeDisk: freeDisk,
AuditThreshold: preferences.NewNodeAuditThreshold,
Excluded: excludedNodes,
})
if err != nil {
return nil, err
}
nodes := []*pb.Node{}
nodes = append(nodes, newNodes...)
nodes = append(nodes, reputableNodes...)
if len(reputableNodes) < reputableNodeCount {
return nodes, ErrNotEnoughNodes.New("requested %d found %d", reputableNodeCount, len(reputableNodes))
}
return nodes, nil
}
// GetAll looks up the provided ids from the overlay cache
func (cache *Cache) GetAll(ctx context.Context, ids storj.NodeIDList) ([]*pb.Node, error) {
if len(ids) == 0 {

View File

@ -36,12 +36,13 @@ type LookupConfig struct {
// NodeSelectionConfig is a configuration struct to determine the minimum
// values for nodes to select
type NodeSelectionConfig struct {
UptimeRatio float64 `help:"a node's ratio of being up/online vs. down/offline" default:"0"`
UptimeCount int64 `help:"the number of times a node's uptime has been checked" default:"0"`
AuditSuccessRatio float64 `help:"a node's ratio of successful audits" default:"0"`
AuditCount int64 `help:"the number of times a node has been audited" default:"0"`
UptimeRatio float64 `help:"a node's ratio of being up/online vs. down/offline" default:"0"`
UptimeCount int64 `help:"the number of times a node's uptime has been checked" default:"0"`
AuditSuccessRatio float64 `help:"a node's ratio of successful audits" default:"0"`
AuditCount int64 `help:"the number of times a node has been audited" default:"0"`
NewNodeAuditThreshold int64 `help:"the number of audits a node must have to not be considered a New Node" default:"0"`
NewNodePercentage float64 `help:"the percentage of new nodes allowed per request" default:"0.05"`
NewNodePercentage float64 `help:"the percentage of new nodes allowed per request" default:"0.05"` // TODO: fix, this is not percentage, it's ratio
}
// ParseIDs converts the base58check encoded node ID strings from the config into node IDs

View File

@ -8,8 +8,6 @@ import (
"github.com/zeebo/errs"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
monkit "gopkg.in/spacemonkeygo/monkit.v2"
"storj.io/storj/pkg/pb"
@ -21,19 +19,19 @@ var ServerError = errs.Class("Server Error")
// Server implements our overlay RPC service
type Server struct {
log *zap.Logger
cache *Cache
metrics *monkit.Registry
nodeSelectionConfig *NodeSelectionConfig
log *zap.Logger
cache *Cache
metrics *monkit.Registry
preferences *NodeSelectionConfig
}
// NewServer creates a new Overlay Server
func NewServer(log *zap.Logger, cache *Cache, nodeSelectionConfig *NodeSelectionConfig) *Server {
func NewServer(log *zap.Logger, cache *Cache, preferences *NodeSelectionConfig) *Server {
return &Server{
cache: cache,
log: log,
metrics: monkit.Default,
nodeSelectionConfig: nodeSelectionConfig,
cache: cache,
log: log,
metrics: monkit.Default,
preferences: preferences,
}
}
@ -67,48 +65,47 @@ func (server *Server) BulkLookup(ctx context.Context, reqs *pb.LookupRequests) (
return nodesToLookupResponses(ns), nil
}
// FilterNodesRequest are the requirements for nodes from the overlay cache
type FilterNodesRequest struct {
MinReputation *pb.NodeStats
MinNodes int64
Opts *pb.OverlayOptions
NewNodePercentage float64
NewNodeAuditThreshold int64
// NodeCriteria are the requirements for selecting nodes
type NodeCriteria struct {
Type pb.NodeType
FreeBandwidth int64
FreeDisk int64
AuditCount int64
AuditSuccessRatio float64
UptimeCount int64
UptimeSuccessRatio float64
Excluded []storj.NodeID
}
// NewNodeCriteria are the requirement for selecting new nodes
type NewNodeCriteria struct {
Type pb.NodeType
FreeBandwidth int64
FreeDisk int64
AuditThreshold int64
Excluded []storj.NodeID
}
// FindStorageNodes searches the overlay network for nodes that meet the provided requirements
func (server *Server) FindStorageNodes(ctx context.Context, req *pb.FindStorageNodesRequest) (resp *pb.FindStorageNodesResponse, err error) {
defer mon.Task()(&ctx)(&err)
return server.FindStorageNodesWithPreferences(ctx, req, server.preferences)
}
minStats := &pb.NodeStats{
AuditCount: server.nodeSelectionConfig.AuditCount,
AuditSuccessRatio: server.nodeSelectionConfig.AuditSuccessRatio,
UptimeCount: server.nodeSelectionConfig.UptimeCount,
UptimeRatio: server.nodeSelectionConfig.UptimeRatio,
}
filterNodesReq := &FilterNodesRequest{
MinReputation: minStats,
MinNodes: req.GetMinNodes(),
Opts: req.GetOpts(),
NewNodePercentage: server.nodeSelectionConfig.NewNodePercentage,
NewNodeAuditThreshold: server.nodeSelectionConfig.NewNodeAuditThreshold,
}
foundNodes, err := server.cache.db.FilterNodes(ctx, filterNodesReq)
if err != nil {
stat, _ := status.FromError(err)
if stat.Code() == codes.ResourceExhausted {
return &pb.FindStorageNodesResponse{
Nodes: foundNodes,
}, err
}
return nil, Error.Wrap(err)
}
// FindStorageNodesWithPreferences searches the overlay network for nodes that meet the provided requirements
// exposed mainly for testing
func (server *Server) FindStorageNodesWithPreferences(ctx context.Context, req *pb.FindStorageNodesRequest, preferences *NodeSelectionConfig) (resp *pb.FindStorageNodesResponse, err error) {
// TODO: use better structs for find storage nodes
nodes, err := server.cache.FindStorageNodes(ctx, req, preferences)
return &pb.FindStorageNodesResponse{
Nodes: foundNodes,
}, nil
Nodes: nodes,
}, err
}
// lookupRequestsToNodeIDs returns the nodeIDs from the LookupRequests

View File

@ -4,19 +4,18 @@
package overlay_test
import (
"strconv"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/zeebo/errs"
"storj.io/storj/internal/testcontext"
"storj.io/storj/internal/testplanet"
"storj.io/storj/pkg/overlay"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/storj"
)
func TestServer(t *testing.T) {
@ -67,176 +66,164 @@ func TestServer(t *testing.T) {
}
}
func TestNewNodeFiltering(t *testing.T) {
func TestNodeSelection(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
var totalNodes int
totalNodes = 10
planet, err := testplanet.New(t, 1, totalNodes, 1)
if err != nil {
t.Fatal(err)
}
planet, err := testplanet.New(t, 1, 10, 0)
require.NoError(t, err)
planet.Start(ctx)
defer ctx.Check(planet.Shutdown)
// we wait a second for all the nodes to complete bootstrapping off the satellite
time.Sleep(5 * time.Second)
satellite := planet.Satellites[0]
// we wait a second for all the nodes to complete bootstrapping off the satellite
time.Sleep(2 * time.Second)
// This sets a reputable audit count for a certain number of nodes.
for i, node := range planet.StorageNodes {
for j := 0; j < i; j++ {
for k := 0; k < i; k++ {
_, err := satellite.DB.StatDB().UpdateAuditSuccess(ctx, node.ID(), true)
assert.NoError(t, err)
}
}
for _, tt := range []struct {
name string
newNodeAuditThreshold int64
newNodePercentage float64
requestedNodeAmt int64
expectedResultLength int
excludedAmt int
notEnoughRepNodes bool
}{
{
name: "case: all reputable nodes, only reputable nodes requested",
newNodeAuditThreshold: 0,
newNodePercentage: 0,
requestedNodeAmt: 5,
expectedResultLength: 5,
// ensure all storagenodes are in overlay service
for _, storageNode := range planet.StorageNodes {
err = satellite.Overlay.Service.Put(ctx, storageNode.ID(), storageNode.Local())
assert.NoError(t, err)
}
type test struct {
Preferences overlay.NodeSelectionConfig
ExcludeCount int
RequestCount int64
ExpectedCount int
ShouldFailWith *errs.Class
}
for i, tt := range []test{
{ // all reputable nodes, only reputable nodes requested
Preferences: overlay.NodeSelectionConfig{
NewNodeAuditThreshold: 0,
NewNodePercentage: 0,
},
RequestCount: 5,
ExpectedCount: 5,
},
{
name: "case: all reputable nodes, reputable and new nodes requested",
newNodeAuditThreshold: 0,
newNodePercentage: 1,
requestedNodeAmt: 5,
expectedResultLength: 5,
{ // all reputable nodes, reputable and new nodes requested
Preferences: overlay.NodeSelectionConfig{
NewNodeAuditThreshold: 0,
NewNodePercentage: 1,
},
RequestCount: 5,
ExpectedCount: 5,
},
{
name: "case: all reputable nodes except one, reputable and new nodes requested",
newNodeAuditThreshold: 1,
newNodePercentage: 1,
requestedNodeAmt: 5,
expectedResultLength: 6,
{ // all reputable nodes except one, reputable and new nodes requested
Preferences: overlay.NodeSelectionConfig{
NewNodeAuditThreshold: 1,
NewNodePercentage: 1,
},
RequestCount: 5,
ExpectedCount: 6,
},
{
name: "case: 50-50 reputable and new nodes, reputable and new nodes requested (new node % 1)",
newNodeAuditThreshold: 5,
newNodePercentage: 1,
requestedNodeAmt: 2,
expectedResultLength: 4,
{ // 50-50 reputable and new nodes, reputable and new nodes requested (new node ratio 1.0)
Preferences: overlay.NodeSelectionConfig{
NewNodeAuditThreshold: 5,
NewNodePercentage: 1,
},
RequestCount: 2,
ExpectedCount: 4,
},
{
name: "case: 50-50 reputable and new nodes, reputable and new nodes requested (new node % .5)",
newNodeAuditThreshold: 5,
newNodePercentage: 0.5,
requestedNodeAmt: 4,
expectedResultLength: 6,
{ // 50-50 reputable and new nodes, reputable and new nodes requested (new node ratio 0.5)
Preferences: overlay.NodeSelectionConfig{
NewNodeAuditThreshold: 5,
NewNodePercentage: 0.5,
},
RequestCount: 4,
ExpectedCount: 6,
},
{
name: "case: all new nodes except one, reputable and new nodes requested (happy path)",
newNodeAuditThreshold: 8,
newNodePercentage: 1,
requestedNodeAmt: 1,
expectedResultLength: 2,
{ // all new nodes except one, reputable and new nodes requested (happy path)
Preferences: overlay.NodeSelectionConfig{
NewNodeAuditThreshold: 8,
NewNodePercentage: 1,
},
RequestCount: 1,
ExpectedCount: 2,
},
{
name: "case: all new nodes except one, reputable and new nodes requested (not happy path)",
newNodeAuditThreshold: 9,
newNodePercentage: 1,
requestedNodeAmt: 2,
expectedResultLength: 3,
notEnoughRepNodes: true,
{ // all new nodes except one, reputable and new nodes requested (not happy path)
Preferences: overlay.NodeSelectionConfig{
NewNodeAuditThreshold: 9,
NewNodePercentage: 1,
},
RequestCount: 2,
ExpectedCount: 3,
ShouldFailWith: &overlay.ErrNotEnoughNodes,
},
{
name: "case: all new nodes, reputable and new nodes requested",
newNodeAuditThreshold: 50,
newNodePercentage: 1,
requestedNodeAmt: 2,
expectedResultLength: 2,
notEnoughRepNodes: true,
{ // all new nodes, reputable and new nodes requested
Preferences: overlay.NodeSelectionConfig{
NewNodeAuditThreshold: 50,
NewNodePercentage: 1,
},
RequestCount: 2,
ExpectedCount: 2,
ShouldFailWith: &overlay.ErrNotEnoughNodes,
},
{
name: "case: audit threshold edge case (1)",
newNodeAuditThreshold: 9,
newNodePercentage: 0,
requestedNodeAmt: 1,
expectedResultLength: 1,
{ // audit threshold edge case (1)
Preferences: overlay.NodeSelectionConfig{
NewNodeAuditThreshold: 9,
NewNodePercentage: 0,
},
RequestCount: 1,
ExpectedCount: 1,
},
{
name: "case: audit threshold edge case (2)",
newNodeAuditThreshold: 0,
newNodePercentage: 1,
requestedNodeAmt: 1,
expectedResultLength: 1,
{ // audit threshold edge case (2)
Preferences: overlay.NodeSelectionConfig{
NewNodeAuditThreshold: 0,
NewNodePercentage: 1,
},
RequestCount: 1,
ExpectedCount: 1,
},
{
name: "case: excluded node ids being excluded",
excludedAmt: 7,
newNodeAuditThreshold: 5,
newNodePercentage: 0,
requestedNodeAmt: 5,
expectedResultLength: 3,
notEnoughRepNodes: true,
{ // excluded node ids being excluded
Preferences: overlay.NodeSelectionConfig{
NewNodeAuditThreshold: 5,
NewNodePercentage: 0,
},
ExcludeCount: 7,
RequestCount: 5,
ExpectedCount: 3,
ShouldFailWith: &overlay.ErrNotEnoughNodes,
},
} {
t.Logf("#%2d. %+v", i, tt)
endpoint := planet.Satellites[0].Overlay.Endpoint
nodeSelectionConfig := &overlay.NodeSelectionConfig{
UptimeCount: 0,
UptimeRatio: 0,
AuditSuccessRatio: 0,
AuditCount: 0,
NewNodeAuditThreshold: tt.newNodeAuditThreshold,
NewNodePercentage: tt.newNodePercentage,
var excludedNodes []storj.NodeID
for _, storageNode := range planet.StorageNodes[:tt.ExcludeCount] {
excludedNodes = append(excludedNodes, storageNode.ID())
}
server := overlay.NewServer(satellite.Log.Named("overlay"), satellite.Overlay.Service, nodeSelectionConfig)
var excludedNodes []pb.NodeID
for i := range planet.StorageNodes {
address := "127.0.0.1:555" + strconv.Itoa(i)
n := &pb.Node{
Id: planet.StorageNodes[i].ID(),
Address: &pb.NodeAddress{Address: address},
}
if tt.excludedAmt != 0 && i < tt.excludedAmt {
excludedNodes = append(excludedNodes, n.Id)
}
err = satellite.Overlay.Service.Put(ctx, n.Id, *n)
assert.NoError(t, err, tt.name)
}
result, err := server.FindStorageNodes(ctx,
response, err := endpoint.FindStorageNodesWithPreferences(ctx,
&pb.FindStorageNodesRequest{
Opts: &pb.OverlayOptions{
Restrictions: &pb.NodeRestrictions{
FreeBandwidth: 0,
FreeDisk: 0,
},
Amount: tt.requestedNodeAmt,
Amount: tt.RequestCount,
ExcludedNodes: excludedNodes,
},
})
}, &tt.Preferences)
if tt.notEnoughRepNodes {
stat, ok := status.FromError(err)
assert.Equal(t, true, ok, tt.name)
assert.Equal(t, codes.ResourceExhausted, stat.Code(), tt.name)
t.Log(len(response.Nodes), err)
if tt.ShouldFailWith != nil {
assert.Error(t, err)
assert.True(t, tt.ShouldFailWith.Has(err))
} else {
assert.NoError(t, err, tt.name)
assert.NoError(t, err)
}
assert.Equal(t, tt.expectedResultLength, len(result.GetNodes()), tt.name)
assert.Equal(t, tt.ExpectedCount, len(response.Nodes))
}
}

View File

@ -459,13 +459,6 @@ type lockedOverlayCache struct {
db overlay.DB
}
// FilterNodes looks up nodes based on reputation requirements
func (m *lockedOverlayCache) FilterNodes(ctx context.Context, req *overlay.FilterNodesRequest) ([]*pb.Node, error) {
m.Lock()
defer m.Unlock()
return m.db.FilterNodes(ctx, req)
}
// Delete deletes node based on id
func (m *lockedOverlayCache) Delete(ctx context.Context, id storj.NodeID) error {
m.Lock()
@ -508,6 +501,20 @@ func (m *lockedOverlayCache) Paginate(ctx context.Context, offset int64, limit i
return m.db.Paginate(ctx, offset, limit)
}
// SelectNewNodes looks up nodes based on new node criteria
func (m *lockedOverlayCache) SelectNewNodes(ctx context.Context, count int, criteria *overlay.NewNodeCriteria) ([]*pb.Node, error) {
m.Lock()
defer m.Unlock()
return m.db.SelectNewNodes(ctx, count, criteria)
}
// SelectNodes looks up nodes based on criteria
func (m *lockedOverlayCache) SelectNodes(ctx context.Context, count int, criteria *overlay.NodeCriteria) ([]*pb.Node, error) {
m.Lock()
defer m.Unlock()
return m.db.SelectNodes(ctx, count, criteria)
}
// Update updates node information
func (m *lockedOverlayCache) Update(ctx context.Context, value *pb.Node) error {
m.Lock()

View File

@ -6,17 +6,13 @@ package satellitedb
import (
"context"
"database/sql"
"fmt"
"strings"
"github.com/zeebo/errs"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"storj.io/storj/pkg/overlay"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/storj"
"storj.io/storj/pkg/utils"
dbx "storj.io/storj/satellite/satellitedb/dbx"
"storj.io/storj/storage"
)
@ -27,100 +23,53 @@ type overlaycache struct {
db *dbx.DB
}
type getNodesRequest struct {
minReputation *pb.NodeStats
freeBandwidth int64
freeDisk int64
excluded []pb.NodeID
reputableNodeAmount int64
newNodeAmount int64
newNodeAuditThreshold int64
func (cache *overlaycache) SelectNodes(ctx context.Context, count int, criteria *overlay.NodeCriteria) ([]*pb.Node, error) {
return cache.queryFilteredNodes(ctx, criteria.Excluded, count, `
WHERE node_type == ? AND free_bandwidth >= ? AND free_disk >= ?
AND audit_count >= ?
AND audit_success_ratio >= ?
AND uptime_count >= ?
AND audit_uptime_ratio >= ?
`, int(criteria.Type), criteria.FreeBandwidth, criteria.FreeDisk,
criteria.AuditCount, criteria.AuditSuccessRatio, criteria.UptimeCount, criteria.UptimeSuccessRatio,
)
}
// FilterNodes looks up nodes based on reputation requirements
func (cache *overlaycache) FilterNodes(ctx context.Context, req *overlay.FilterNodesRequest) ([]*pb.Node, error) {
reputableNodeAmount := req.MinNodes
if reputableNodeAmount <= 0 {
reputableNodeAmount = req.Opts.GetAmount()
}
getReputableReq := &getNodesRequest{
minReputation: req.MinReputation,
freeBandwidth: req.Opts.GetRestrictions().FreeBandwidth,
freeDisk: req.Opts.GetRestrictions().FreeDisk,
excluded: req.Opts.ExcludedNodes,
reputableNodeAmount: reputableNodeAmount,
newNodeAuditThreshold: req.NewNodeAuditThreshold,
}
reputableNodes, err := cache.getReputableNodes(ctx, getReputableReq)
if err != nil {
return nil, err
}
newNodeAmount := int64(float64(reputableNodeAmount) * req.NewNodePercentage)
getNewReq := &getNodesRequest{
freeBandwidth: req.Opts.GetRestrictions().FreeBandwidth,
freeDisk: req.Opts.GetRestrictions().FreeDisk,
excluded: req.Opts.ExcludedNodes,
newNodeAmount: newNodeAmount,
newNodeAuditThreshold: req.NewNodeAuditThreshold,
}
newNodes, err := cache.getNewNodes(ctx, getNewReq)
if err != nil {
return nil, err
}
var allNodes []*pb.Node
allNodes = append(allNodes, reputableNodes...)
allNodes = append(allNodes, newNodes...)
if int64(len(reputableNodes)) < reputableNodeAmount {
err := status.Errorf(codes.ResourceExhausted, fmt.Sprintf("requested %d reputable nodes, only %d reputable nodes matched the criteria requested",
reputableNodeAmount, len(reputableNodes)))
return allNodes, err
}
return allNodes, nil
func (cache *overlaycache) SelectNewNodes(ctx context.Context, count int, criteria *overlay.NewNodeCriteria) ([]*pb.Node, error) {
return cache.queryFilteredNodes(ctx, criteria.Excluded, count, `
WHERE node_type == ? AND free_bandwidth >= ? AND free_disk >= ?
AND audit_count < ?
`, int(criteria.Type), criteria.FreeBandwidth, criteria.FreeDisk,
criteria.AuditThreshold,
)
}
func (cache *overlaycache) getReputableNodes(ctx context.Context, req *getNodesRequest) ([]*pb.Node, error) {
rows, err := cache.findReputableNodesQuery(ctx, req)
func (cache *overlaycache) queryFilteredNodes(ctx context.Context, excluded []storj.NodeID, count int, safeQuery string, args ...interface{}) (_ []*pb.Node, err error) {
if count == 0 {
return nil, nil
}
safeExcludeNodes := ""
if len(excluded) > 0 {
safeExcludeNodes = ` AND node_id NOT IN (?` + strings.Repeat(", ?", len(excluded)-1) + `)`
}
for _, id := range excluded {
args = append(args, id.Bytes())
}
args = append(args, count)
rows, err := cache.db.Query(cache.db.Rebind(`SELECT node_id,
node_type, address, free_bandwidth, free_disk, audit_success_ratio,
audit_uptime_ratio, audit_count, audit_success_count, uptime_count,
uptime_success_count
FROM overlay_cache_nodes
`+safeQuery+safeExcludeNodes+` LIMIT ?`), args...)
if err != nil {
return nil, err
}
defer func() {
err = utils.CombineErrors(err, rows.Close())
}()
defer func() { err = errs.Combine(err, rows.Close()) }()
reputableNodes, err := sqlRowsToNodes(rows)
if err != nil {
return nil, err
}
return reputableNodes, nil
}
func (cache *overlaycache) getNewNodes(ctx context.Context, req *getNodesRequest) ([]*pb.Node, error) {
rows, err := cache.findNewNodesQuery(ctx, req)
if err != nil {
return nil, err
}
defer func() {
err = utils.CombineErrors(err, rows.Close())
}()
newNodes, err := sqlRowsToNodes(rows)
if err != nil {
return nil, err
}
return newNodes, nil
}
func sqlRowsToNodes(rows *sql.Rows) (nodes []*pb.Node, err error) {
var nodes []*pb.Node
for rows.Next() {
overlayNode := &dbx.OverlayCacheNode{}
err = rows.Scan(&overlayNode.NodeId, &overlayNode.NodeType,
@ -138,96 +87,8 @@ func sqlRowsToNodes(rows *sql.Rows) (nodes []*pb.Node, err error) {
}
nodes = append(nodes, node)
}
return nodes, nil
}
func (cache *overlaycache) findReputableNodesQuery(ctx context.Context, req *getNodesRequest) (*sql.Rows, error) {
auditCount := req.minReputation.AuditCount
if req.newNodeAuditThreshold > auditCount {
auditCount = req.newNodeAuditThreshold
}
auditSuccessRatio := req.minReputation.AuditSuccessRatio
uptimeCount := req.minReputation.UptimeCount
uptimeRatio := req.minReputation.UptimeRatio
nodeAmt := req.reputableNodeAmount
var rows *sql.Rows
var err error
var nodeTypeStorage int32 = 2
args := make([]interface{}, len(req.excluded))
for i, id := range req.excluded {
args[i] = id.Bytes()
}
args = append(args, auditCount, auditSuccessRatio, uptimeCount, uptimeRatio,
req.freeBandwidth, req.freeDisk, nodeTypeStorage, nodeAmt)
// This queries for nodes whose audit counts are greater than or equal to
// the new node audit threshold and the minimum reputation audit count.
rows, err = cache.db.Query(`SELECT node_id,
node_type, address, free_bandwidth, free_disk, audit_success_ratio,
audit_uptime_ratio, audit_count, audit_success_count, uptime_count,
uptime_success_count
FROM overlay_cache_nodes
WHERE node_id NOT IN (`+strings.Join(sliceOfCopies("?", len(req.excluded)), ", ")+`)
AND audit_count >= ?
AND audit_success_ratio >= ?
AND uptime_count >= ?
AND audit_uptime_ratio >= ?
AND free_bandwidth >= ?
AND free_disk >= ?
AND node_type == ?
LIMIT ?
`,
args...)
if err != nil {
return nil, err
}
return rows, nil
}
func sliceOfCopies(val string, count int) []string {
slice := make([]string, count)
for i := range slice {
slice[i] = val
}
return slice
}
func (cache *overlaycache) findNewNodesQuery(ctx context.Context, req *getNodesRequest) (*sql.Rows, error) {
var rows *sql.Rows
var err error
var nodeTypeStorage int32 = 2
args := make([]interface{}, len(req.excluded))
for i, id := range req.excluded {
args[i] = id.Bytes()
}
args = append(args, req.newNodeAuditThreshold, req.freeBandwidth,
req.freeDisk, nodeTypeStorage, req.newNodeAmount)
rows, err = cache.db.Query(cache.db.Rebind(`SELECT node_id,
node_type, address, free_bandwidth, free_disk, audit_success_ratio,
audit_uptime_ratio, audit_count, audit_success_count, uptime_count,
uptime_success_count
FROM overlay_cache_nodes
WHERE node_id NOT IN (`+strings.Join(sliceOfCopies("?", len(req.excluded)), ", ")+`)
AND audit_count < ?
AND free_bandwidth >= ?
AND free_disk >= ?
AND node_type == ?
LIMIT ?
`),
args...)
if err != nil {
return nil, err
}
return rows, nil
return nodes, rows.Err()
}
// Get looks up the node by nodeID