Filter nodes by reputation and IP address (node selection) (#720)

* Pulls statdb stats into overlay cache whenever cache.Put() is called
* Updates overlay.FindStorageNodes()/overlayClient.Choose() to filter based on node stats
* Updates overlay.FindStorageNodes()/overlayClient.Choose() to exclude duplicate IP addresses (a simplified sketch of the combined selection policy follows the diffstat below)
Maximillian von Briesen 2018-12-04 15:18:26 -05:00 committed by GitHub
parent 2ab15196d9
commit 6c655d117b
13 changed files with 279 additions and 308 deletions
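
As the commit message bullets describe, the new selection policy has two steps: reject candidates whose statdb reputation falls below the requested minimums, then admit at most one candidate per address. The standalone Go program below is only a simplified sketch of that idea; candidate and minStats are hypothetical stand-ins for pb.Node and pb.NodeStats, and the authoritative implementation is the overlay Server.FindStorageNodes/populate change further down in this diff.

package main

import "fmt"

// candidate and minStats are hypothetical stand-ins for pb.Node and pb.NodeStats.
type candidate struct {
    ID                string
    Address           string
    AuditSuccessRatio float64
    AuditCount        int64
    UptimeRatio       float64
    UptimeCount       int64
}

type minStats struct {
    AuditSuccessRatio float64
    AuditCount        int64
    UptimeRatio       float64
    UptimeCount       int64
}

// selectNodes keeps candidates whose reputation meets the minimums and
// admits at most one candidate per address, in input order.
func selectNodes(nodes []candidate, min minStats) []candidate {
    var result []candidate
    usedAddrs := make(map[string]bool)
    for _, n := range nodes {
        if n.AuditSuccessRatio < min.AuditSuccessRatio ||
            n.AuditCount < min.AuditCount ||
            n.UptimeRatio < min.UptimeRatio ||
            n.UptimeCount < min.UptimeCount {
            continue // reputation below the requested minimum
        }
        if usedAddrs[n.Address] {
            continue // an earlier candidate already claimed this address
        }
        usedAddrs[n.Address] = true
        result = append(result, n)
    }
    return result
}

func main() {
    nodes := []candidate{
        {ID: "n1", Address: "10.0.0.1:7777", AuditSuccessRatio: 0.95, AuditCount: 10, UptimeRatio: 0.99, UptimeCount: 20},
        {ID: "n2", Address: "10.0.0.1:7777", AuditSuccessRatio: 0.97, AuditCount: 12, UptimeRatio: 0.98, UptimeCount: 25}, // duplicate address
        {ID: "n3", Address: "10.0.0.2:7777", AuditSuccessRatio: 0.50, AuditCount: 3, UptimeRatio: 0.90, UptimeCount: 5},   // low reputation
        {ID: "n4", Address: "10.0.0.3:7777", AuditSuccessRatio: 0.99, AuditCount: 30, UptimeRatio: 0.99, UptimeCount: 40},
    }
    for _, n := range selectNodes(nodes, minStats{AuditSuccessRatio: 0.9, AuditCount: 5, UptimeRatio: 0.95, UptimeCount: 10}) {
        fmt.Println(n.ID, n.Address) // prints n1 and n4
    }
}

Note that, as the server diff below shows, the deduplication key is the node's full address string, which is what the commit message refers to as the IP address.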

View File

@@ -9,6 +9,7 @@ import (
     "go.uber.org/zap"
     "storj.io/storj/pkg/overlay"
+    "storj.io/storj/pkg/statdb"
     "storj.io/storj/storage"
     "storj.io/storj/storage/boltdb"
     "storj.io/storj/storage/redis"
@@ -27,6 +28,7 @@ func (c cacheConfig) open() (*overlay.Cache, error) {
     }
    var db storage.KeyValueStore
+    var sdb *statdb.StatDB
    switch dburl.Scheme {
    case "bolt":
@@ -47,6 +49,10 @@ func (c cacheConfig) open() (*overlay.Cache, error) {
    // add logger
    db = storelogger.New(zap.L(), db)
-    return overlay.NewOverlayCache(db, nil, nil), nil
+    sdb, err = statdb.NewStatDB("postgres", dburl.String(), zap.L())
+    if err != nil {
+        return nil, Error.New("statdb error: %s", err)
+    }
+    return overlay.NewOverlayCache(db, nil, sdb), nil
 }

View File

@@ -99,7 +99,7 @@ func cmdAdd(cmd *cobra.Command, args []string) (err error) {
            zap.S().Error(err)
        }
        zap.S().Infof("adding node ID: %s; Address: %s", i, a)
-        err = c.Put(id, pb.Node{
+        err = c.Put(process.Ctx(cmd), id, pb.Node{
            Id: id,
            // TODO: NodeType is missing
            Address: &pb.NodeAddress{

View File

@@ -148,22 +148,17 @@ func (node *Node) initOverlay(planet *Planet) error {
    if err != nil {
        return utils.CombineErrors(err, routing.Close())
    }
    node.Kademlia = kad
-    node.Overlay = overlay.NewOverlayCache(teststore.New(), node.Kademlia, node.StatDB)
-    return nil
-}
-
-// initStatDB creates statdb for a given planet
-func (node *Node) initStatDB() error {
    dbPath := fmt.Sprintf("file:memdb%d?mode=memory&cache=shared", rand.Int63())
    sdb, err := statdb.NewStatDB("sqlite3", dbPath, zap.NewNop())
    if err != nil {
        return err
    }
    node.StatDB = sdb
+    node.Overlay = overlay.NewOverlayCache(teststore.New(), node.Kademlia, node.StatDB)
    return nil
 }

View File

@@ -83,10 +83,6 @@ func New(t zaptest.TestingT, satelliteCount, storageNodeCount, uplinkCount int)
    }
    for _, node := range planet.nodes {
-        err := node.initStatDB()
-        if err != nil {
-            return nil, utils.CombineErrors(err, planet.Shutdown())
-        }
        err = node.initOverlay(planet)
        if err != nil {
            return nil, utils.CombineErrors(err, planet.Shutdown())

View File

@@ -12,6 +12,7 @@ import (
    "storj.io/storj/pkg/dht"
    "storj.io/storj/pkg/pb"
    "storj.io/storj/pkg/statdb"
+    statproto "storj.io/storj/pkg/statdb/proto"
    "storj.io/storj/pkg/storj"
    "storj.io/storj/storage"
 )
@@ -89,13 +90,30 @@ func (o *Cache) GetAll(ctx context.Context, nodeIDs storj.NodeIDList) ([]*pb.Nod
 }

 // Put adds a nodeID to the redis cache with a binary representation of proto defined Node
-func (o *Cache) Put(nodeID storj.NodeID, value pb.Node) error {
+func (o *Cache) Put(ctx context.Context, nodeID storj.NodeID, value pb.Node) error {
    // If we get a Node without an ID (i.e. bootstrap node)
    // we don't want to add to the routing tbale
    if nodeID == (storj.NodeID{}) {
        return nil
    }
+    // get existing node rep, or create a new statdb node with 0 rep
+    res, err := o.StatDB.CreateEntryIfNotExists(ctx, &statproto.CreateEntryIfNotExistsRequest{
+        Node: &pb.Node{
+            Id: nodeID,
+        },
+    })
+    if err != nil {
+        return err
+    }
+    stats := res.Stats
+    value.Reputation = &pb.NodeStats{
+        AuditSuccessRatio: stats.AuditSuccessRatio,
+        AuditCount: stats.AuditCount,
+        UptimeRatio: stats.UptimeRatio,
+        UptimeCount: stats.UptimeCount,
+    }
    data, err := proto.Marshal(&value)
    if err != nil {
        return err
@@ -123,7 +141,7 @@ func (o *Cache) Refresh(ctx context.Context) error {
    nodes := o.DHT.Seen()
    for _, v := range nodes {
-        if err := o.Put(v.Id, *v); err != nil {
+        if err := o.Put(ctx, v.Id, *v); err != nil {
            return err
        }
    }
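
A side effect of this change worth spelling out: once Put has run, the cached pb.Node carries a Reputation snapshot copied from statdb, so readers of the cache do not need a separate statdb round trip. The helper below is a hypothetical read-back sketch, not part of this diff; it assumes Cache.Get(ctx, id) returns the stored node, as the tests further down use it.

package example

import (
    "context"
    "fmt"

    "storj.io/storj/pkg/overlay"
    "storj.io/storj/pkg/storj"
)

// printReputation is an illustrative helper, not repository code.
func printReputation(ctx context.Context, cache *overlay.Cache, id storj.NodeID) error {
    node, err := cache.Get(ctx, id)
    if err != nil {
        return err
    }
    if rep := node.GetReputation(); rep != nil {
        // These fields were filled in by Put from statdb's CreateEntryIfNotExists response.
        fmt.Printf("audits: %d (ratio %.2f), uptime checks: %d (ratio %.2f)\n",
            rep.AuditCount, rep.AuditSuccessRatio, rep.UptimeCount, rep.UptimeRatio)
    }
    return nil
}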

View File

@@ -13,7 +13,7 @@ import (
    "storj.io/storj/internal/testplanet"
    "storj.io/storj/internal/teststorj"
    "storj.io/storj/pkg/overlay"
-    "storj.io/storj/pkg/pb"
+    "storj.io/storj/pkg/statdb"
    "storj.io/storj/pkg/storj"
    "storj.io/storj/storage"
    "storj.io/storj/storage/boltdb"
@@ -29,15 +29,15 @@ var (
    invalid2ID = teststorj.NodeIDFromString("invalid2")
 )

-func testCache(ctx context.Context, t *testing.T, store storage.KeyValueStore) {
-    cache := overlay.Cache{DB: store}
+func testCache(ctx context.Context, t *testing.T, store storage.KeyValueStore, sdb *statdb.StatDB) {
+    cache := overlay.Cache{DB: store, StatDB: sdb}
    { // Put
-        err := cache.Put(valid1ID, pb.Node{Address: &pb.NodeAddress{Transport: pb.NodeTransport_TCP_TLS_GRPC, Address: "127.0.0.1:9001"}})
+        err := cache.Put(ctx, valid1ID, *teststorj.MockNode("valid1"))
        if err != nil {
            t.Fatal(err)
        }
-        err = cache.Put(valid2ID, pb.Node{Address: &pb.NodeAddress{Transport: pb.NodeTransport_TCP_TLS_GRPC, Address: "127.0.0.1:9002"}})
+        err = cache.Put(ctx, valid2ID, *teststorj.MockNode("valid2"))
        if err != nil {
            t.Fatal(err)
        }
@@ -45,9 +45,8 @@ func testCache(ctx context.Context, t *testing.T, store storage.KeyValueStore) {
    { // Get
        valid2, err := cache.Get(ctx, valid2ID)
-        if assert.NoError(t, err) {
-            assert.Equal(t, valid2.Address.Address, "127.0.0.1:9002")
-        }
+        assert.NoError(t, err)
+        assert.Equal(t, valid2.Id, valid2ID)
        invalid2, err := cache.Get(ctx, invalid2ID)
        assert.Error(t, err)
@@ -62,23 +61,20 @@ func testCache(ctx context.Context, t *testing.T, store storage.KeyValueStore) {
    { // GetAll
        nodes, err := cache.GetAll(ctx, storj.NodeIDList{valid2ID, valid1ID, valid2ID})
-        if assert.NoError(t, err) {
-            assert.Equal(t, nodes[0].Address.Address, "127.0.0.1:9002")
-            assert.Equal(t, nodes[1].Address.Address, "127.0.0.1:9001")
-            assert.Equal(t, nodes[2].Address.Address, "127.0.0.1:9002")
-        }
+        assert.NoError(t, err)
+        assert.Equal(t, nodes[0].Id, valid2ID)
+        assert.Equal(t, nodes[1].Id, valid1ID)
+        assert.Equal(t, nodes[2].Id, valid2ID)
        nodes, err = cache.GetAll(ctx, storj.NodeIDList{valid1ID, invalid1ID})
-        if assert.NoError(t, err) {
-            assert.Equal(t, nodes[0].Address.Address, "127.0.0.1:9001")
-            assert.Nil(t, nodes[1])
-        }
+        assert.NoError(t, err)
+        assert.Equal(t, nodes[0].Id, valid1ID)
+        assert.Nil(t, nodes[1])
        nodes, err = cache.GetAll(ctx, make(storj.NodeIDList, 2))
-        if assert.NoError(t, err) {
-            assert.Nil(t, nodes[0])
-            assert.Nil(t, nodes[1])
-        }
+        assert.NoError(t, err)
+        assert.Nil(t, nodes[0])
+        assert.Nil(t, nodes[1])
        _, err = cache.GetAll(ctx, storj.NodeIDList{})
        assert.True(t, overlay.OverlayError.Has(err))
@@ -95,6 +91,14 @@ func TestCache_Redis(t *testing.T) {
    ctx := testcontext.New(t)
    defer ctx.Cleanup()
+    planet, err := testplanet.New(t, 1, 4, 0)
+    if err != nil {
+        t.Fatal(err)
+    }
+    defer ctx.Check(planet.Shutdown)
+    planet.Start(ctx)
+    sdb := planet.Satellites[0].StatDB
    redisAddr, cleanup, err := redisserver.Start()
    if err != nil {
        t.Fatal(err)
@@ -107,27 +111,43 @@ func TestCache_Redis(t *testing.T) {
    }
    defer ctx.Check(store.Close)
-    testCache(ctx, t, store)
+    testCache(ctx, t, store, sdb)
 }

 func TestCache_Bolt(t *testing.T) {
    ctx := testcontext.New(t)
    defer ctx.Cleanup()
+    planet, err := testplanet.New(t, 1, 4, 0)
+    if err != nil {
+        t.Fatal(err)
+    }
+    defer ctx.Check(planet.Shutdown)
+    planet.Start(ctx)
+    sdb := planet.Satellites[0].StatDB
    client, err := boltdb.New(ctx.File("overlay.db"), "overlay")
    if err != nil {
        t.Fatal(err)
    }
    defer ctx.Check(client.Close)
-    testCache(ctx, t, client)
+    testCache(ctx, t, client, sdb)
 }

 func TestCache_Store(t *testing.T) {
    ctx := testcontext.New(t)
    defer ctx.Cleanup()
-    testCache(ctx, t, teststore.New())
+    planet, err := testplanet.New(t, 1, 4, 0)
+    if err != nil {
+        t.Fatal(err)
+    }
+    defer ctx.Check(planet.Shutdown)
+    planet.Start(ctx)
+    sdb := planet.Satellites[0].StatDB
+    testCache(ctx, t, teststore.New(), sdb)
 }

 func TestCache_Refresh(t *testing.T) {

View File

@@ -41,6 +41,10 @@ type Overlay struct {
 type Options struct {
    Amount int
    Space int64
+    Uptime float64
+    UptimeCount int64
+    AuditSuccess float64
+    AuditCount int64
    Excluded storj.NodeIDList
 }
@@ -72,6 +76,12 @@ func (o *Overlay) Choose(ctx context.Context, op Options) ([]*pb.Node, error) {
        Opts: &pb.OverlayOptions{
            Amount: int64(op.Amount),
            Restrictions: &pb.NodeRestrictions{FreeDisk: op.Space},
+            MinStats: &pb.NodeStats{
+                UptimeRatio: op.Uptime,
+                UptimeCount: op.UptimeCount,
+                AuditSuccessRatio: op.AuditSuccess,
+                AuditCount: op.AuditCount,
+            },
            ExcludedNodes: exIDs,
        },
    })
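
For callers, the practical effect of the new Options fields is that Choose can now express minimum reputation requirements alongside the existing space restriction. The following is a hypothetical caller-side sketch, not part of this diff; it assumes an already-dialed overlay.Client, for example one obtained the way the tests below do via planet.Uplinks[0].DialOverlay(planet.Satellites[0]), and the threshold values are made up for illustration.

package example

import (
    "context"

    "storj.io/storj/pkg/overlay"
    "storj.io/storj/pkg/pb"
)

// chooseReliableNodes is an illustrative helper, not repository code.
func chooseReliableNodes(ctx context.Context, oc overlay.Client) ([]*pb.Node, error) {
    return oc.Choose(ctx, overlay.Options{
        Amount:       4,       // number of nodes to select
        Space:        1 << 30, // minimum free disk in bytes
        Uptime:       0.95,    // minimum uptime ratio
        UptimeCount:  10,      // minimum number of uptime checks
        AuditSuccess: 0.95,    // minimum audit success ratio
        AuditCount:   5,       // minimum number of audits
    })
}

Nodes whose NodeStats fall below any of these minimums are filtered out server-side, as the Server.FindStorageNodes/populate diff further down shows.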

View File

@@ -1,28 +1,23 @@
 // Copyright (C) 2018 Storj Labs, Inc.
 // See LICENSE for copying information.

-package overlay
+package overlay_test

 import (
-    "context"
-    "net"
    "testing"
+    "time"
-    "github.com/gogo/protobuf/proto"
    "github.com/stretchr/testify/assert"
-    "google.golang.org/grpc"
    "storj.io/storj/internal/identity"
    "storj.io/storj/internal/testcontext"
+    "storj.io/storj/internal/testplanet"
    "storj.io/storj/internal/teststorj"
+    "storj.io/storj/pkg/overlay"
    "storj.io/storj/pkg/pb"
    "storj.io/storj/pkg/storj"
-    "storj.io/storj/storage"
-    "storj.io/storj/storage/teststore"
 )

-var fooID = teststorj.NodeIDFromString("foo")

 func TestNewOverlayClient(t *testing.T) {
    ctx := testcontext.New(t)
    defer ctx.Cleanup()
@@ -41,14 +36,12 @@ func TestNewOverlayClient(t *testing.T) {
        identity, err := ca.NewIdentity()
        assert.NoError(t, err)
-        oc, err := NewOverlayClient(identity, v.address)
+        oc, err := overlay.NewOverlayClient(identity, v.address)
        assert.NoError(t, err)
        assert.NotNil(t, oc)
-        overlay, ok := oc.(*Overlay)
+        _, ok := oc.(*overlay.Overlay)
        assert.True(t, ok)
-        assert.NotEmpty(t, overlay.client)
    }
 }
@@ -56,15 +49,29 @@ func TestChoose(t *testing.T) {
    ctx := testcontext.New(t)
    defer ctx.Cleanup()
+    planet, cleanup := getPlanet(ctx, t)
+    defer cleanup()
+    oc := getOverlayClient(t, planet)
    cases := []struct {
        limit int
        space int64
+        bandwidth int64
+        uptime float64
+        uptimeCount int64
+        auditSuccess float64
+        auditCount int64
        allNodes []*pb.Node
        excluded storj.NodeIDList
    }{
        {
            limit: 4,
            space: 0,
+            bandwidth: 0,
+            uptime: 0,
+            uptimeCount: 0,
+            auditSuccess: 0,
+            auditCount: 0,
            allNodes: func() []*pb.Node {
                n1 := teststorj.MockNode("n1")
                n2 := teststorj.MockNode("n2")
@@ -91,47 +98,31 @@ func TestChoose(t *testing.T) {
    }
    for _, v := range cases {
-        lis, err := net.Listen("tcp", "127.0.0.1:0")
-        assert.NoError(t, err)
-        var listItems []storage.ListItem
-        for _, n := range v.allNodes {
-            data, err := proto.Marshal(n)
-            assert.NoError(t, err)
-            listItems = append(listItems, storage.ListItem{
-                Key: n.Id.Bytes(),
-                Value: data,
-            })
-        }
-        ca, err := testidentity.NewTestCA(ctx)
-        assert.NoError(t, err)
-        identity, err := ca.NewIdentity()
-        assert.NoError(t, err)
-        srv := NewMockServer(listItems, func() grpc.ServerOption {
-            opt, err := identity.ServerOption()
-            assert.NoError(t, err)
-            return opt
-        }())
-        go func() { assert.NoError(t, srv.Serve(lis)) }()
-        defer srv.Stop()
-        oc, err := NewOverlayClient(identity, lis.Addr().String())
-        assert.NoError(t, err)
-        assert.NotNil(t, oc)
-        overlay, ok := oc.(*Overlay)
-        assert.True(t, ok)
-        assert.NotEmpty(t, overlay.client)
-        newNodes, err := oc.Choose(ctx, Options{Amount: v.limit, Space: v.space, Excluded: v.excluded})
-        assert.NoError(t, err)
-        for _, new := range newNodes {
-            for _, ex := range v.excluded {
-                assert.NotEqual(t, ex.String(), new.Id)
-            }
-        }
+        newNodes, err := oc.Choose(ctx, overlay.Options{
+            Amount: v.limit,
+            Space: v.space,
+            Uptime: v.uptime,
+            UptimeCount: v.uptimeCount,
+            AuditSuccess: v.auditSuccess,
+            AuditCount: v.auditCount,
+            Excluded: v.excluded,
+        })
+        assert.NoError(t, err)
+        excludedNodes := make(map[storj.NodeID]bool)
+        for _, e := range v.excluded {
+            excludedNodes[e] = true
+        }
+        assert.Len(t, newNodes, v.limit)
+        for _, n := range newNodes {
+            assert.NotContains(t, excludedNodes, n.Id)
+            assert.True(t, n.GetRestrictions().GetFreeDisk() >= v.space)
+            assert.True(t, n.GetRestrictions().GetFreeBandwidth() >= v.bandwidth)
+            assert.True(t, n.GetReputation().GetUptimeRatio() >= v.uptime)
+            assert.True(t, n.GetReputation().GetUptimeCount() >= v.uptimeCount)
+            assert.True(t, n.GetReputation().GetAuditSuccessRatio() >= v.auditSuccess)
+            assert.True(t, n.GetReputation().GetAuditCount() >= v.auditCount)
+        }
    }
 }
@@ -140,41 +131,34 @@ func TestLookup(t *testing.T) {
    ctx := testcontext.New(t)
    defer ctx.Cleanup()
+    planet, cleanup := getPlanet(ctx, t)
+    defer cleanup()
+    oc := getOverlayClient(t, planet)
+    nid1 := planet.StorageNodes[0].ID()
    cases := []struct {
        nodeID storj.NodeID
-        expectedCalls int
+        expectErr bool
    }{
        {
-            nodeID: fooID,
-            expectedCalls: 1,
+            nodeID: nid1,
+            expectErr: false,
+        },
+        {
+            nodeID: teststorj.NodeIDFromString("n1"),
+            expectErr: true,
        },
    }
    for _, v := range cases {
-        lis, err := net.Listen("tcp", "127.0.0.1:0")
-        assert.NoError(t, err)
-        srv, mock, err := newTestServer(ctx)
-        assert.NoError(t, err)
-        go func() { assert.NoError(t, srv.Serve(lis)) }()
-        defer srv.Stop()
-        ca, err := testidentity.NewTestCA(ctx)
-        assert.NoError(t, err)
-        identity, err := ca.NewIdentity()
-        assert.NoError(t, err)
-        oc, err := NewOverlayClient(identity, lis.Addr().String())
-        assert.NoError(t, err)
-        assert.NotNil(t, oc)
-        overlay, ok := oc.(*Overlay)
-        assert.True(t, ok)
-        assert.NotEmpty(t, overlay.client)
-        _, err = oc.Lookup(ctx, v.nodeID)
-        assert.NoError(t, err)
-        assert.Equal(t, mock.lookupCalled, v.expectedCalls)
+        n, err := oc.Lookup(ctx, v.nodeID)
+        if v.expectErr {
+            assert.Error(t, err)
+        } else {
+            assert.NoError(t, err)
+            assert.Equal(t, n.Id.String(), v.nodeID.String())
+        }
    }
 }
@@ -183,40 +167,30 @@ func TestBulkLookup(t *testing.T) {
    ctx := testcontext.New(t)
    defer ctx.Cleanup()
+    planet, cleanup := getPlanet(ctx, t)
+    defer cleanup()
+    oc := getOverlayClient(t, planet)
+    nid1 := planet.StorageNodes[0].ID()
+    nid2 := planet.StorageNodes[1].ID()
+    nid3 := planet.StorageNodes[2].ID()
    cases := []struct {
        nodeIDs storj.NodeIDList
        expectedCalls int
    }{
        {
-            nodeIDs: storj.NodeIDList{fooID, fooID, fooID},
+            nodeIDs: storj.NodeIDList{nid1, nid2, nid3},
            expectedCalls: 1,
        },
    }
    for _, v := range cases {
-        lis, err := net.Listen("tcp", "127.0.0.1:0")
-        assert.NoError(t, err)
-        srv, mock, err := newTestServer(ctx)
-        assert.NoError(t, err)
-        go func() { assert.NoError(t, srv.Serve(lis)) }()
-        defer srv.Stop()
-        ca, err := testidentity.NewTestCA(ctx)
-        assert.NoError(t, err)
-        identity, err := ca.NewIdentity()
-        assert.NoError(t, err)
-        oc, err := NewOverlayClient(identity, lis.Addr().String())
-        assert.NoError(t, err)
-        assert.NotNil(t, oc)
-        overlay, ok := oc.(*Overlay)
-        assert.True(t, ok)
-        assert.NotEmpty(t, overlay.client)
-        _, err = oc.BulkLookup(ctx, v.nodeIDs)
-        assert.NoError(t, err)
-        assert.Equal(t, mock.bulkLookupCalled, v.expectedCalls)
+        resNodes, err := oc.BulkLookup(ctx, v.nodeIDs)
+        assert.NoError(t, err)
+        for i, n := range resNodes {
+            assert.Equal(t, n.Id, v.nodeIDs[i])
+        }
+        assert.Equal(t, len(resNodes), len(v.nodeIDs))
    }
 }
@@ -224,33 +198,17 @@ func TestBulkLookupV2(t *testing.T) {
    ctx := testcontext.New(t)
    defer ctx.Cleanup()
-    lis, err := net.Listen("tcp", "127.0.0.1:0")
-    assert.NoError(t, err)
-    srv, s, err := newServer(ctx)
-    assert.NoError(t, err)
-    go func() { assert.NoError(t, srv.Serve(lis)) }()
-    defer srv.Stop()
-    ca, err := testidentity.NewTestCA(ctx)
-    assert.NoError(t, err)
-    identity, err := ca.NewIdentity()
-    assert.NoError(t, err)
-    oc, err := NewOverlayClient(identity, lis.Addr().String())
-    assert.NoError(t, err)
-    assert.NotNil(t, oc)
-    overlay, ok := oc.(*Overlay)
-    assert.True(t, ok)
-    assert.NotEmpty(t, overlay.client)
+    planet, cleanup := getPlanet(ctx, t)
+    defer cleanup()
+    oc := getOverlayClient(t, planet)
+    cache := planet.Satellites[0].Overlay
    n1 := teststorj.MockNode("n1")
    n2 := teststorj.MockNode("n2")
    n3 := teststorj.MockNode("n3")
    nodes := []*pb.Node{n1, n2, n3}
    for _, n := range nodes {
-        assert.NoError(t, s.cache.Put(n.Id, *n))
+        assert.NoError(t, cache.Put(ctx, n.Id, *n))
    }
    nid1 := teststorj.NodeIDFromString("n1")
@@ -265,86 +223,61 @@ func TestBulkLookupV2(t *testing.T) {
    }
    { // valid ids
-        ns, err := oc.BulkLookup(ctx, storj.NodeIDList{nid1, nid2, nid3})
+        idList := storj.NodeIDList{nid1, nid2, nid3}
+        ns, err := oc.BulkLookup(ctx, idList)
        assert.NoError(t, err)
-        assert.Equal(t, nodes, ns)
+        for i, n := range ns {
+            assert.Equal(t, n.Id, idList[i])
+        }
    }
    { // missing ids
-        ns, err := oc.BulkLookup(ctx, storj.NodeIDList{nid4, nid5})
+        idList := storj.NodeIDList{nid4, nid5}
+        ns, err := oc.BulkLookup(ctx, idList)
        assert.NoError(t, err)
        assert.Equal(t, []*pb.Node{nil, nil}, ns)
    }
    { // different order and missing
-        ns, err := oc.BulkLookup(ctx, storj.NodeIDList{nid3, nid4, nid1, nid2, nid5})
+        idList := storj.NodeIDList{nid3, nid4, nid1, nid2, nid5}
+        ns, err := oc.BulkLookup(ctx, idList)
        assert.NoError(t, err)
-        assert.Equal(t, []*pb.Node{n3, nil, n1, n2, nil}, ns)
+        expectedNodes := []*pb.Node{n3, nil, n1, n2, nil}
+        for i, n := range ns {
+            if n == nil {
+                assert.Nil(t, expectedNodes[i])
+            } else {
+                assert.Equal(t, n.Id, expectedNodes[i].Id)
+            }
+        }
    }
 }

-func newServer(ctx context.Context) (*grpc.Server, *Server, error) {
-    ca, err := testidentity.NewTestCA(ctx)
-    if err != nil {
-        return nil, nil, err
-    }
-    identity, err := ca.NewIdentity()
-    if err != nil {
-        return nil, nil, err
-    }
-    identOpt, err := identity.ServerOption()
-    if err != nil {
-        return nil, nil, err
-    }
-    grpcServer := grpc.NewServer(identOpt)
-    s := &Server{cache: NewOverlayCache(teststore.New(), nil, nil)}
-    pb.RegisterOverlayServer(grpcServer, s)
-    return grpcServer, s, nil
-}
-
-func newTestServer(ctx context.Context) (*grpc.Server, *mockOverlayServer, error) {
-    ca, err := testidentity.NewTestCA(ctx)
-    if err != nil {
-        return nil, nil, err
-    }
-    identity, err := ca.NewIdentity()
-    if err != nil {
-        return nil, nil, err
-    }
-    identOpt, err := identity.ServerOption()
-    if err != nil {
-        return nil, nil, err
-    }
-    grpcServer := grpc.NewServer(identOpt)
-    mo := &mockOverlayServer{lookupCalled: 0, FindStorageNodesCalled: 0}
-    pb.RegisterOverlayServer(grpcServer, mo)
-    return grpcServer, mo, nil
-}
-
-type mockOverlayServer struct {
-    lookupCalled int
-    bulkLookupCalled int
-    FindStorageNodesCalled int
-}
-
-func (o *mockOverlayServer) Lookup(ctx context.Context, req *pb.LookupRequest) (*pb.LookupResponse, error) {
-    o.lookupCalled++
-    return &pb.LookupResponse{}, nil
-}
-
-func (o *mockOverlayServer) FindStorageNodes(ctx context.Context, req *pb.FindStorageNodesRequest) (*pb.FindStorageNodesResponse, error) {
-    o.FindStorageNodesCalled++
-    return &pb.FindStorageNodesResponse{}, nil
-}
-
-func (o *mockOverlayServer) BulkLookup(ctx context.Context, reqs *pb.LookupRequests) (*pb.LookupResponses, error) {
-    o.bulkLookupCalled++
-    return &pb.LookupResponses{}, nil
-}
+func getPlanet(ctx *testcontext.Context, t *testing.T) (planet *testplanet.Planet, f func()) {
+    planet, err := testplanet.New(t, 1, 4, 1)
+    if err != nil {
+        t.Fatal(err)
+    }
+    planet.Start(ctx)
+    // we wait a second for all the nodes to complete bootstrapping off the satellite
+    time.Sleep(2 * time.Second)
+    f = func() {
+        ctx.Check(planet.Shutdown)
+    }
+    return planet, f
+}
+
+func getOverlayClient(t *testing.T, planet *testplanet.Planet) (oc overlay.Client) {
+    oc, err := planet.Uplinks[0].DialOverlay(planet.Satellites[0])
+    if err != nil {
+        t.Fatal(err)
+    }
+    return oc
+}

View File

@@ -75,23 +75,32 @@ func (o *Server) FindStorageNodes(ctx context.Context, req *pb.FindStorageNodesR
    excluded := opts.ExcludedNodes
    restrictions := opts.GetRestrictions()
-    restrictedBandwidth := restrictions.GetFreeBandwidth()
-    restrictedSpace := restrictions.GetFreeDisk()
+    reputation := opts.GetMinStats()
    var startID storj.NodeID
    result := []*pb.Node{}
    for {
        var nodes []*pb.Node
-        nodes, startID, err = o.populate(ctx, req.Start, maxNodes, restrictedBandwidth, restrictedSpace, excluded)
+        nodes, startID, err = o.populate(ctx, req.Start, maxNodes, restrictions, reputation, excluded)
        if err != nil {
            return nil, Error.Wrap(err)
        }
-        if len(nodes) <= 0 {
+        resultNodes := []*pb.Node{}
+        usedAddrs := make(map[string]bool)
+        for _, n := range nodes {
+            addr := n.Address.GetAddress()
+            excluded = append(excluded, n.Id) // exclude all nodes on next iteration
+            if !usedAddrs[addr] {
+                resultNodes = append(resultNodes, n)
+                usedAddrs[addr] = true
+            }
+        }
+        if len(resultNodes) <= 0 {
            break
        }
-        result = append(result, nodes...)
+        result = append(result, resultNodes...)
        if len(result) >= int(maxNodes) || startID == (storj.NodeID{}) {
            break
@@ -132,7 +141,10 @@ func (o *Server) getNodes(ctx context.Context, keys storage.Keys) ([]*pb.Node, e
 }

-func (o *Server) populate(ctx context.Context, startID storj.NodeID, maxNodes, restrictedBandwidth, restrictedSpace int64, excluded storj.NodeIDList) ([]*pb.Node, storj.NodeID, error) {
+func (o *Server) populate(ctx context.Context, startID storj.NodeID, maxNodes int64,
+    minRestrictions *pb.NodeRestrictions, minReputation *pb.NodeStats,
+    excluded storj.NodeIDList) ([]*pb.Node, storj.NodeID, error) {
    limit := int(maxNodes * 2)
    keys, err := o.cache.DB.List(startID.Bytes(), limit)
    if err != nil {
@@ -158,11 +170,16 @@ func (o *Server) populate(ctx context.Context, startID storj.NodeID, maxNodes, r
            continue
        }
-        rest := v.GetRestrictions()
-        if rest.GetFreeBandwidth() < restrictedBandwidth || rest.GetFreeDisk() < restrictedSpace {
-            continue
-        }
-        if contains(excluded, v.Id) {
+        nodeRestrictions := v.GetRestrictions()
+        nodeReputation := v.GetReputation()
+        if nodeRestrictions.GetFreeBandwidth() < minRestrictions.GetFreeBandwidth() ||
+            nodeRestrictions.GetFreeDisk() < minRestrictions.GetFreeDisk() ||
+            nodeReputation.GetUptimeRatio() < minReputation.GetUptimeRatio() ||
+            nodeReputation.GetUptimeCount() < minReputation.GetUptimeCount() ||
+            nodeReputation.GetAuditSuccessRatio() < minReputation.GetAuditSuccessRatio() ||
+            nodeReputation.GetAuditCount() < minReputation.GetAuditCount() ||
+            contains(excluded, v.Id) {
            continue
        }
        result = append(result, v)

View File

@@ -1,41 +0,0 @@
-// Copyright (C) 2018 Storj Labs, Inc.
-// See LICENSE for copying information.
-
-package overlay
-
-import (
-    "go.uber.org/zap"
-    "google.golang.org/grpc"
-    monkit "gopkg.in/spacemonkeygo/monkit.v2"
-
-    "storj.io/storj/pkg/kademlia"
-    "storj.io/storj/pkg/pb"
-    "storj.io/storj/storage"
-    "storj.io/storj/storage/teststore"
-)
-
-// NewMockServer provides a mock grpc server for testing
-func NewMockServer(items []storage.ListItem, opts ...grpc.ServerOption) *grpc.Server {
-    grpcServer := grpc.NewServer(opts...)
-
-    registry := monkit.Default
-
-    k := kademlia.NewMockKademlia()
-
-    c := &Cache{
-        DB:  teststore.New(),
-        DHT: k,
-    }
-
-    _ = storage.PutAll(c.DB, items...)
-
-    s := Server{
-        dht:     k,
-        cache:   c,
-        logger:  zap.NewNop(),
-        metrics: registry,
-    }
-
-    pb.RegisterOverlayServer(grpcServer, &s)
-
-    return grpcServer
-}

View File

@@ -111,6 +111,7 @@ func (m *NodeRestrictions) GetFreeDisk() int64 {
    return 0
 }

+// TODO move statdb.Update() stuff out of here
 // Node represents a node in the overlay network
 // Node is info for a updating a single storagenode, used in the Update rpc calls
 type Node struct {
@@ -118,13 +119,14 @@ type Node struct {
    Address *NodeAddress `protobuf:"bytes,2,opt,name=address" json:"address,omitempty"`
    Type NodeType `protobuf:"varint,3,opt,name=type,proto3,enum=node.NodeType" json:"type,omitempty"`
    Restrictions *NodeRestrictions `protobuf:"bytes,4,opt,name=restrictions" json:"restrictions,omitempty"`
-    Metadata *NodeMetadata `protobuf:"bytes,5,opt,name=metadata" json:"metadata,omitempty"`
-    LatencyList []int64 `protobuf:"varint,6,rep,packed,name=latency_list,json=latencyList" json:"latency_list,omitempty"`
-    AuditSuccess bool `protobuf:"varint,7,opt,name=audit_success,json=auditSuccess,proto3" json:"audit_success,omitempty"`
-    IsUp bool `protobuf:"varint,8,opt,name=is_up,json=isUp,proto3" json:"is_up,omitempty"`
-    UpdateLatency bool `protobuf:"varint,9,opt,name=update_latency,json=updateLatency,proto3" json:"update_latency,omitempty"`
-    UpdateAuditSuccess bool `protobuf:"varint,10,opt,name=update_audit_success,json=updateAuditSuccess,proto3" json:"update_audit_success,omitempty"`
-    UpdateUptime bool `protobuf:"varint,11,opt,name=update_uptime,json=updateUptime,proto3" json:"update_uptime,omitempty"`
+    Reputation *NodeStats `protobuf:"bytes,5,opt,name=reputation" json:"reputation,omitempty"`
+    Metadata *NodeMetadata `protobuf:"bytes,6,opt,name=metadata" json:"metadata,omitempty"`
+    LatencyList []int64 `protobuf:"varint,7,rep,packed,name=latency_list,json=latencyList" json:"latency_list,omitempty"`
+    AuditSuccess bool `protobuf:"varint,8,opt,name=audit_success,json=auditSuccess,proto3" json:"audit_success,omitempty"`
+    IsUp bool `protobuf:"varint,9,opt,name=is_up,json=isUp,proto3" json:"is_up,omitempty"`
+    UpdateLatency bool `protobuf:"varint,10,opt,name=update_latency,json=updateLatency,proto3" json:"update_latency,omitempty"`
+    UpdateAuditSuccess bool `protobuf:"varint,11,opt,name=update_audit_success,json=updateAuditSuccess,proto3" json:"update_audit_success,omitempty"`
+    UpdateUptime bool `protobuf:"varint,12,opt,name=update_uptime,json=updateUptime,proto3" json:"update_uptime,omitempty"`
    XXX_NoUnkeyedLiteral struct{} `json:"-"`
    XXX_unrecognized []byte `json:"-"`
    XXX_sizecache int32 `json:"-"`
@@ -175,6 +177,13 @@ func (m *Node) GetRestrictions() *NodeRestrictions {
    return nil
 }

+func (m *Node) GetReputation() *NodeStats {
+    if m != nil {
+        return m.Reputation
+    }
+    return nil
+}
+
 func (m *Node) GetMetadata() *NodeMetadata {
    if m != nil {
        return m.Metadata

View File

@@ -14,6 +14,7 @@ message NodeRestrictions {
    int64 free_disk = 2;
 }

+// TODO move statdb.Update() stuff out of here
 // Node represents a node in the overlay network
 // Node is info for a updating a single storagenode, used in the Update rpc calls
 message Node {
@@ -21,13 +22,14 @@ message Node {
    NodeAddress address = 2;
    NodeType type = 3;
    NodeRestrictions restrictions = 4;
-    NodeMetadata metadata = 5;
-    repeated int64 latency_list = 6;
-    bool audit_success = 7;
-    bool is_up = 8;
-    bool update_latency = 9;
-    bool update_audit_success = 10;
-    bool update_uptime = 11;
+    NodeStats reputation = 5;
+    NodeMetadata metadata = 6;
+    repeated int64 latency_list = 7;
+    bool audit_success = 8;
+    bool is_up = 9;
+    bool update_latency = 10;
+    bool update_audit_success = 11;
+    bool update_uptime = 12;
 }

 // NodeType is an enum of possible node types

View File

@@ -98,9 +98,10 @@ func (s *StatDB) Create(ctx context.Context, createReq *pb.CreateRequest) (resp
    nodeStats := &pb.NodeStats{
        NodeId: node.Id,
+        AuditCount: dbNode.TotalAuditCount,
        AuditSuccessRatio: dbNode.AuditSuccessRatio,
-        AuditCount: dbNode.TotalAuditCount,
        UptimeRatio: dbNode.UptimeRatio,
+        UptimeCount: dbNode.TotalUptimeCount,
    }
    return &pb.CreateResponse{
        Stats: nodeStats,
@@ -118,9 +119,10 @@ func (s *StatDB) Get(ctx context.Context, getReq *pb.GetRequest) (resp *pb.GetRe
    nodeStats := &pb.NodeStats{
        NodeId: getReq.NodeId,
+        AuditCount: dbNode.TotalAuditCount,
        AuditSuccessRatio: dbNode.AuditSuccessRatio,
-        AuditCount: dbNode.TotalAuditCount,
        UptimeRatio: dbNode.UptimeRatio,
+        UptimeCount: dbNode.TotalUptimeCount,
    }
    return &pb.GetResponse{
        Stats: nodeStats,
@@ -246,7 +248,9 @@ func (s *StatDB) Update(ctx context.Context, updateReq *pb.UpdateRequest) (resp
    nodeStats := &pb.NodeStats{
        NodeId: node.Id,
        AuditSuccessRatio: dbNode.AuditSuccessRatio,
+        AuditCount: dbNode.TotalAuditCount,
        UptimeRatio: dbNode.UptimeRatio,
+        UptimeCount: dbNode.TotalUptimeCount,
    }
    return &pb.UpdateResponse{
        Stats: nodeStats,
@@ -288,8 +292,9 @@ func (s *StatDB) UpdateUptime(ctx context.Context, updateReq *pb.UpdateUptimeReq
    nodeStats := &pb.NodeStats{
        NodeId: node.Id,
        AuditSuccessRatio: dbNode.AuditSuccessRatio,
-        UptimeRatio: dbNode.UptimeRatio,
        AuditCount: dbNode.TotalAuditCount,
+        UptimeRatio: dbNode.UptimeRatio,
+        UptimeCount: dbNode.TotalUptimeCount,
    }
    return &pb.UpdateUptimeResponse{
        Stats: nodeStats,
@@ -331,8 +336,9 @@ func (s *StatDB) UpdateAuditSuccess(ctx context.Context, updateReq *pb.UpdateAud
    nodeStats := &pb.NodeStats{
        NodeId: node.Id,
        AuditSuccessRatio: dbNode.AuditSuccessRatio,
-        UptimeRatio: dbNode.UptimeRatio,
        AuditCount: dbNode.TotalAuditCount,
+        UptimeRatio: dbNode.UptimeRatio,
+        UptimeCount: dbNode.TotalUptimeCount,
    }
    return &pb.UpdateAuditSuccessResponse{
        Stats: nodeStats,