From 6c655d117b10e1248e9355d51162b3023c5f8109 Mon Sep 17 00:00:00 2001
From: Maximillian von Briesen
Date: Tue, 4 Dec 2018 15:18:26 -0500
Subject: [PATCH] Filter nodes by reputation and IP address (node selection)
 (#720)

* Pulls statdb stats into overlay cache whenever cache.Put() is called

* Updates overlay.FindStorageNodes()/overlayClient.Choose() to filter based on node stats

* Updates overlay.FindStorageNodes()/overlayClient.Choose() to exclude duplicate IP addresses
---
 cmd/overlay/cache.go          |   8 +-
 cmd/overlay/main.go           |   2 +-
 internal/testplanet/node.go   |  11 +-
 internal/testplanet/planet.go |   4 -
 pkg/overlay/cache.go          |  22 ++-
 pkg/overlay/cache_test.go     |  68 +++++---
 pkg/overlay/client.go         |  20 ++-
 pkg/overlay/client_test.go    | 319 ++++++++++++++--------------------
 pkg/overlay/server.go         |  39 +++--
 pkg/overlay/utils_test.go     |  41 -----
 pkg/pb/node.pb.go             |  23 ++-
 pkg/pb/node.proto             |  16 +-
 pkg/statdb/statdb.go          |  14 +-
 13 files changed, 279 insertions(+), 308 deletions(-)
 delete mode 100644 pkg/overlay/utils_test.go

diff --git a/cmd/overlay/cache.go b/cmd/overlay/cache.go
index e12c37e4f..93ed5b004 100644
--- a/cmd/overlay/cache.go
+++ b/cmd/overlay/cache.go
@@ -9,6 +9,7 @@ import (
 	"go.uber.org/zap"
 
 	"storj.io/storj/pkg/overlay"
+	"storj.io/storj/pkg/statdb"
 	"storj.io/storj/storage"
 	"storj.io/storj/storage/boltdb"
 	"storj.io/storj/storage/redis"
@@ -27,6 +28,7 @@ func (c cacheConfig) open() (*overlay.Cache, error) {
 	}
 
 	var db storage.KeyValueStore
+	var sdb *statdb.StatDB
 
 	switch dburl.Scheme {
 	case "bolt":
@@ -47,6 +49,10 @@ func (c cacheConfig) open() (*overlay.Cache, error) {
 	// add logger
 	db = storelogger.New(zap.L(), db)
 
+	sdb, err = statdb.NewStatDB("postgres", dburl.String(), zap.L())
+	if err != nil {
+		return nil, Error.New("statdb error: %s", err)
+	}
+
-	return overlay.NewOverlayCache(db, nil, nil), nil
+	return overlay.NewOverlayCache(db, nil, sdb), nil
 }
diff --git a/cmd/overlay/main.go b/cmd/overlay/main.go
index a9f3194e1..5143e2730 100644
--- a/cmd/overlay/main.go
+++ b/cmd/overlay/main.go
@@ -99,7 +99,7 @@ func cmdAdd(cmd *cobra.Command, args []string) (err error) {
 			zap.S().Error(err)
 		}
 		zap.S().Infof("adding node ID: %s; Address: %s", i, a)
-		err = c.Put(id, pb.Node{
+		err = c.Put(process.Ctx(cmd), id, pb.Node{
 			Id: id,
 			// TODO: NodeType is missing
 			Address: &pb.NodeAddress{
diff --git a/internal/testplanet/node.go b/internal/testplanet/node.go
index 8686fc247..25b0aa956 100644
--- a/internal/testplanet/node.go
+++ b/internal/testplanet/node.go
@@ -148,22 +148,17 @@ func (node *Node) initOverlay(planet *Planet) error {
 	if err != nil {
 		return utils.CombineErrors(err, routing.Close())
 	}
 	node.Kademlia = kad
-	node.Overlay = overlay.NewOverlayCache(teststore.New(), node.Kademlia, node.StatDB)
-
-	return nil
-}
-
-// initStatDB creates statdb for a given planet
-func (node *Node) initStatDB() error {
+
 	dbPath := fmt.Sprintf("file:memdb%d?mode=memory&cache=shared", rand.Int63())
 	sdb, err := statdb.NewStatDB("sqlite3", dbPath, zap.NewNop())
 	if err != nil {
 		return err
 	}
 	node.StatDB = sdb
+
+	node.Overlay = overlay.NewOverlayCache(teststore.New(), node.Kademlia, node.StatDB)
+
 	return nil
 }
diff --git a/internal/testplanet/planet.go b/internal/testplanet/planet.go
index 7685117a5..b35556075 100644
--- a/internal/testplanet/planet.go
+++ b/internal/testplanet/planet.go
@@ -83,10 +83,6 @@ func New(t zaptest.TestingT, satelliteCount, storageNodeCount, uplinkCount int)
 	}
 
 	for _, node := range planet.nodes {
-		err := node.initStatDB()
-		if err != nil {
-			return nil, utils.CombineErrors(err, planet.Shutdown())
-		}
-		err = node.initOverlay(planet)
+		err := node.initOverlay(planet)
 		if err != nil {
 			return nil, utils.CombineErrors(err, planet.Shutdown())
 		}
diff --git a/pkg/overlay/cache.go b/pkg/overlay/cache.go
index 0bd3e7a40..bf92b899e 100644
--- a/pkg/overlay/cache.go
+++ b/pkg/overlay/cache.go
@@ -12,6 +12,7 @@ import (
 	"storj.io/storj/pkg/dht"
 	"storj.io/storj/pkg/pb"
 	"storj.io/storj/pkg/statdb"
+	statproto "storj.io/storj/pkg/statdb/proto"
 	"storj.io/storj/pkg/storj"
 	"storj.io/storj/storage"
 )
@@ -89,13 +90,30 @@ func (o *Cache) GetAll(ctx context.Context, nodeIDs storj.NodeIDList) ([]*pb.Node, error) {
 }
 
 // Put adds a nodeID to the redis cache with a binary representation of proto defined Node
-func (o *Cache) Put(nodeID storj.NodeID, value pb.Node) error {
+func (o *Cache) Put(ctx context.Context, nodeID storj.NodeID, value pb.Node) error {
 	// If we get a Node without an ID (i.e. bootstrap node)
 	// we don't want to add to the routing table
 	if nodeID == (storj.NodeID{}) {
 		return nil
 	}
 
+	// get existing node rep, or create a new statdb node with 0 rep
+	res, err := o.StatDB.CreateEntryIfNotExists(ctx, &statproto.CreateEntryIfNotExistsRequest{
+		Node: &pb.Node{
+			Id: nodeID,
+		},
+	})
+	if err != nil {
+		return err
+	}
+	stats := res.Stats
+	value.Reputation = &pb.NodeStats{
+		AuditSuccessRatio: stats.AuditSuccessRatio,
+		AuditCount:        stats.AuditCount,
+		UptimeRatio:       stats.UptimeRatio,
+		UptimeCount:       stats.UptimeCount,
+	}
+
 	data, err := proto.Marshal(&value)
 	if err != nil {
 		return err
@@ -123,7 +141,7 @@ func (o *Cache) Refresh(ctx context.Context) error {
 	nodes := o.DHT.Seen()
 
 	for _, v := range nodes {
-		if err := o.Put(v.Id, *v); err != nil {
+		if err := o.Put(ctx, v.Id, *v); err != nil {
 			return err
 		}
 	}
diff --git a/pkg/overlay/cache_test.go b/pkg/overlay/cache_test.go
index 87480cdda..e31ce0e3f 100644
--- a/pkg/overlay/cache_test.go
+++ b/pkg/overlay/cache_test.go
@@ -13,7 +13,7 @@ import (
 	"storj.io/storj/internal/testplanet"
 	"storj.io/storj/internal/teststorj"
 	"storj.io/storj/pkg/overlay"
-	"storj.io/storj/pkg/pb"
+	"storj.io/storj/pkg/statdb"
 	"storj.io/storj/pkg/storj"
 	"storj.io/storj/storage"
 	"storj.io/storj/storage/boltdb"
@@ -29,15 +29,15 @@ var (
 	invalid2ID = teststorj.NodeIDFromString("invalid2")
 )
 
-func testCache(ctx context.Context, t *testing.T, store storage.KeyValueStore) {
-	cache := overlay.Cache{DB: store}
+func testCache(ctx context.Context, t *testing.T, store storage.KeyValueStore, sdb *statdb.StatDB) {
+	cache := overlay.Cache{DB: store, StatDB: sdb}
 
 	{ // Put
-		err := cache.Put(valid1ID, pb.Node{Address: &pb.NodeAddress{Transport: pb.NodeTransport_TCP_TLS_GRPC, Address: "127.0.0.1:9001"}})
+		err := cache.Put(ctx, valid1ID, *teststorj.MockNode("valid1"))
 		if err != nil {
 			t.Fatal(err)
 		}
-		err = cache.Put(valid2ID, pb.Node{Address: &pb.NodeAddress{Transport: pb.NodeTransport_TCP_TLS_GRPC, Address: "127.0.0.1:9002"}})
+		err = cache.Put(ctx, valid2ID, *teststorj.MockNode("valid2"))
 		if err != nil {
 			t.Fatal(err)
 		}
@@ -45,9 +45,8 @@ func testCache(ctx context.Context, t *testing.T, store storage.KeyValueStore) {
 
 	{ // Get
 		valid2, err := cache.Get(ctx, valid2ID)
-		if assert.NoError(t, err) {
-			assert.Equal(t, valid2.Address.Address, "127.0.0.1:9002")
-		}
+		assert.NoError(t, err)
+		assert.Equal(t, valid2.Id, valid2ID)
 
 		invalid2, err := cache.Get(ctx, invalid2ID)
 		assert.Error(t, err)
@@ -62,23 +61,20 @@ func testCache(ctx context.Context, t *testing.T, store storage.KeyValueStore) {
 
 	{ // GetAll
 		nodes, err := cache.GetAll(ctx, storj.NodeIDList{valid2ID, valid1ID, valid2ID})
-		if assert.NoError(t, err) {
-			assert.Equal(t, nodes[0].Address.Address, "127.0.0.1:9002")
-			assert.Equal(t, nodes[1].Address.Address, "127.0.0.1:9001")
-			assert.Equal(t, nodes[2].Address.Address, "127.0.0.1:9002")
-		}
+		assert.NoError(t, err)
+		assert.Equal(t, nodes[0].Id, valid2ID)
+		assert.Equal(t, nodes[1].Id, valid1ID)
+		assert.Equal(t, nodes[2].Id, valid2ID)
 
 		nodes, err = cache.GetAll(ctx, storj.NodeIDList{valid1ID, invalid1ID})
-		if assert.NoError(t, err) {
-			assert.Equal(t, nodes[0].Address.Address, "127.0.0.1:9001")
-			assert.Nil(t, nodes[1])
-		}
+		assert.NoError(t, err)
+		assert.Equal(t, nodes[0].Id, valid1ID)
+		assert.Nil(t, nodes[1])
 
 		nodes, err = cache.GetAll(ctx, make(storj.NodeIDList, 2))
-		if assert.NoError(t, err) {
-			assert.Nil(t, nodes[0])
-			assert.Nil(t, nodes[1])
-		}
+		assert.NoError(t, err)
+		assert.Nil(t, nodes[0])
+		assert.Nil(t, nodes[1])
 
 		_, err = cache.GetAll(ctx, storj.NodeIDList{})
 		assert.True(t, overlay.OverlayError.Has(err))
@@ -95,6 +91,14 @@ func TestCache_Redis(t *testing.T) {
 	ctx := testcontext.New(t)
 	defer ctx.Cleanup()
 
+	planet, err := testplanet.New(t, 1, 4, 0)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer ctx.Check(planet.Shutdown)
+	planet.Start(ctx)
+	sdb := planet.Satellites[0].StatDB
+
 	redisAddr, cleanup, err := redisserver.Start()
 	if err != nil {
 		t.Fatal(err)
@@ -107,27 +111,43 @@ func TestCache_Redis(t *testing.T) {
 	}
 	defer ctx.Check(store.Close)
 
-	testCache(ctx, t, store)
+	testCache(ctx, t, store, sdb)
 }
 
 func TestCache_Bolt(t *testing.T) {
 	ctx := testcontext.New(t)
 	defer ctx.Cleanup()
 
+	planet, err := testplanet.New(t, 1, 4, 0)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer ctx.Check(planet.Shutdown)
+	planet.Start(ctx)
+	sdb := planet.Satellites[0].StatDB
+
 	client, err := boltdb.New(ctx.File("overlay.db"), "overlay")
 	if err != nil {
 		t.Fatal(err)
 	}
 	defer ctx.Check(client.Close)
 
-	testCache(ctx, t, client)
+	testCache(ctx, t, client, sdb)
 }
 
 func TestCache_Store(t *testing.T) {
 	ctx := testcontext.New(t)
 	defer ctx.Cleanup()
 
-	testCache(ctx, t, teststore.New())
+	planet, err := testplanet.New(t, 1, 4, 0)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer ctx.Check(planet.Shutdown)
+	planet.Start(ctx)
+	sdb := planet.Satellites[0].StatDB
+
+	testCache(ctx, t, teststore.New(), sdb)
 }
 
 func TestCache_Refresh(t *testing.T) {
diff --git a/pkg/overlay/client.go b/pkg/overlay/client.go
index 0fadd31e3..e32f9bd02 100644
--- a/pkg/overlay/client.go
+++ b/pkg/overlay/client.go
@@ -39,9 +39,13 @@ type Overlay struct {
 
 // Options contains parameters for selecting nodes
 type Options struct {
-	Amount   int
-	Space    int64
-	Excluded storj.NodeIDList
+	Amount       int
+	Space        int64
+	Uptime       float64
+	UptimeCount  int64
+	AuditSuccess float64
+	AuditCount   int64
+	Excluded     storj.NodeIDList
 }
 
 // NewOverlayClient returns a new initialized Overlay Client
@@ -70,8 +74,14 @@ func (o *Overlay) Choose(ctx context.Context, op Options) ([]*pb.Node, error) {
 	// TODO(coyle): We will also need to communicate with the reputation service here
 	resp, err := o.client.FindStorageNodes(ctx, &pb.FindStorageNodesRequest{
 		Opts: &pb.OverlayOptions{
-			Amount:       int64(op.Amount),
-			Restrictions: &pb.NodeRestrictions{FreeDisk: op.Space},
+			Amount:       int64(op.Amount),
+			Restrictions: &pb.NodeRestrictions{FreeDisk: op.Space},
+			MinStats: &pb.NodeStats{
+				UptimeRatio:       op.Uptime,
+				UptimeCount:       op.UptimeCount,
+				AuditSuccessRatio: op.AuditSuccess,
+				AuditCount:        op.AuditCount,
+			},
 			ExcludedNodes: exIDs,
 		},
 	})
diff --git a/pkg/overlay/client_test.go b/pkg/overlay/client_test.go
index 225247cc7..0957f46e4 100644
--- a/pkg/overlay/client_test.go
+++ b/pkg/overlay/client_test.go
@@ -1,28 +1,23 @@
 // Copyright (C) 2018 Storj Labs, Inc.
 // See LICENSE for copying information.
 
-package overlay
+package overlay_test
 
 import (
-	"context"
-	"net"
 	"testing"
+	"time"
 
-	"github.com/gogo/protobuf/proto"
 	"github.com/stretchr/testify/assert"
-	"google.golang.org/grpc"
 
 	"storj.io/storj/internal/identity"
 	"storj.io/storj/internal/testcontext"
+	"storj.io/storj/internal/testplanet"
 	"storj.io/storj/internal/teststorj"
+	"storj.io/storj/pkg/overlay"
 	"storj.io/storj/pkg/pb"
 	"storj.io/storj/pkg/storj"
-	"storj.io/storj/storage"
-	"storj.io/storj/storage/teststore"
 )
 
-var fooID = teststorj.NodeIDFromString("foo")
-
 func TestNewOverlayClient(t *testing.T) {
 	ctx := testcontext.New(t)
 	defer ctx.Cleanup()
@@ -41,14 +36,12 @@ func TestNewOverlayClient(t *testing.T) {
 		identity, err := ca.NewIdentity()
 		assert.NoError(t, err)
 
-		oc, err := NewOverlayClient(identity, v.address)
+		oc, err := overlay.NewOverlayClient(identity, v.address)
 		assert.NoError(t, err)
 
 		assert.NotNil(t, oc)
-		overlay, ok := oc.(*Overlay)
+		_, ok := oc.(*overlay.Overlay)
 		assert.True(t, ok)
-		assert.NotEmpty(t, overlay.client)
-
 	}
 }
 
@@ -56,15 +49,29 @@ func TestChoose(t *testing.T) {
 	ctx := testcontext.New(t)
 	defer ctx.Cleanup()
 
+	planet, cleanup := getPlanet(ctx, t)
+	defer cleanup()
+	oc := getOverlayClient(t, planet)
+
 	cases := []struct {
-		limit    int
-		space    int64
-		allNodes []*pb.Node
-		excluded storj.NodeIDList
+		limit        int
+		space        int64
+		bandwidth    int64
+		uptime       float64
+		uptimeCount  int64
+		auditSuccess float64
+		auditCount   int64
+		allNodes     []*pb.Node
+		excluded     storj.NodeIDList
 	}{
 		{
-			limit: 4,
-			space: 0,
+			limit:        4,
+			space:        0,
+			bandwidth:    0,
+			uptime:       0,
+			uptimeCount:  0,
+			auditSuccess: 0,
+			auditCount:   0,
 			allNodes: func() []*pb.Node {
 				n1 := teststorj.MockNode("n1")
 				n2 := teststorj.MockNode("n2")
@@ -91,47 +98,31 @@ func TestChoose(t *testing.T) {
 	}
 
 	for _, v := range cases {
-		lis, err := net.Listen("tcp", "127.0.0.1:0")
+		newNodes, err := oc.Choose(ctx, overlay.Options{
+			Amount:       v.limit,
+			Space:        v.space,
+			Uptime:       v.uptime,
+			UptimeCount:  v.uptimeCount,
+			AuditSuccess: v.auditSuccess,
+			AuditCount:   v.auditCount,
+			Excluded:     v.excluded,
+		})
 		assert.NoError(t, err)
-		var listItems []storage.ListItem
-		for _, n := range v.allNodes {
-			data, err := proto.Marshal(n)
-			assert.NoError(t, err)
-			listItems = append(listItems, storage.ListItem{
-				Key:   n.Id.Bytes(),
-				Value: data,
-			})
+		excludedNodes := make(map[storj.NodeID]bool)
+		for _, e := range v.excluded {
+			excludedNodes[e] = true
 		}
+		assert.Len(t, newNodes, v.limit)
+		for _, n := range newNodes {
+			assert.NotContains(t, excludedNodes, n.Id)
+			assert.True(t, n.GetRestrictions().GetFreeDisk() >= v.space)
+			assert.True(t, n.GetRestrictions().GetFreeBandwidth() >= v.bandwidth)
+			assert.True(t, n.GetReputation().GetUptimeRatio() >= v.uptime)
+			assert.True(t, n.GetReputation().GetUptimeCount() >= v.uptimeCount)
+			assert.True(t, n.GetReputation().GetAuditSuccessRatio() >= v.auditSuccess)
+			assert.True(t, n.GetReputation().GetAuditCount() >= v.auditCount)
-
-		ca, err := testidentity.NewTestCA(ctx)
-		assert.NoError(t, err)
-		identity, err := ca.NewIdentity()
-		assert.NoError(t, err)
-
-		srv := NewMockServer(listItems, func() grpc.ServerOption {
-			opt, err := identity.ServerOption()
-			assert.NoError(t, err)
-			return opt
-		}())
-
-		go func() { assert.NoError(t, srv.Serve(lis)) }()
-		defer srv.Stop()
-
-		oc, err := NewOverlayClient(identity, lis.Addr().String())
-		assert.NoError(t, err)
-
-		assert.NotNil(t, oc)
-		overlay, ok := oc.(*Overlay)
-		assert.True(t, ok)
-		assert.NotEmpty(t, overlay.client)
-
-		newNodes, err := oc.Choose(ctx, Options{Amount: v.limit, Space: v.space, Excluded: v.excluded})
-		assert.NoError(t, err)
-		for _, new := range newNodes {
-			for _, ex := range v.excluded {
-				assert.NotEqual(t, ex.String(), new.Id)
-			}
-		}
+		}
 	}
 }
 
@@ -140,41 +131,34 @@ func TestLookup(t *testing.T) {
 	ctx := testcontext.New(t)
 	defer ctx.Cleanup()
 
+	planet, cleanup := getPlanet(ctx, t)
+	defer cleanup()
+	oc := getOverlayClient(t, planet)
+
+	nid1 := planet.StorageNodes[0].ID()
+
 	cases := []struct {
-		nodeID        storj.NodeID
-		expectedCalls int
+		nodeID    storj.NodeID
+		expectErr bool
 	}{
 		{
-			nodeID:        fooID,
-			expectedCalls: 1,
+			nodeID:    nid1,
+			expectErr: false,
+		},
+		{
+			nodeID:    teststorj.NodeIDFromString("n1"),
+			expectErr: true,
 		},
 	}
 
 	for _, v := range cases {
-		lis, err := net.Listen("tcp", "127.0.0.1:0")
-		assert.NoError(t, err)
-
-		srv, mock, err := newTestServer(ctx)
-		assert.NoError(t, err)
-		go func() { assert.NoError(t, srv.Serve(lis)) }()
-		defer srv.Stop()
-
-		ca, err := testidentity.NewTestCA(ctx)
-		assert.NoError(t, err)
-		identity, err := ca.NewIdentity()
-		assert.NoError(t, err)
-
-		oc, err := NewOverlayClient(identity, lis.Addr().String())
-		assert.NoError(t, err)
-
-		assert.NotNil(t, oc)
-		overlay, ok := oc.(*Overlay)
-		assert.True(t, ok)
-		assert.NotEmpty(t, overlay.client)
-
-		_, err = oc.Lookup(ctx, v.nodeID)
-		assert.NoError(t, err)
-		assert.Equal(t, mock.lookupCalled, v.expectedCalls)
+		n, err := oc.Lookup(ctx, v.nodeID)
+		if v.expectErr {
+			assert.Error(t, err)
+		} else {
+			assert.NoError(t, err)
+			assert.Equal(t, n.Id.String(), v.nodeID.String())
+		}
 	}
 }
 
@@ -183,40 +167,30 @@ func TestBulkLookup(t *testing.T) {
 	ctx := testcontext.New(t)
 	defer ctx.Cleanup()
 
+	planet, cleanup := getPlanet(ctx, t)
+	defer cleanup()
+	oc := getOverlayClient(t, planet)
+
+	nid1 := planet.StorageNodes[0].ID()
+	nid2 := planet.StorageNodes[1].ID()
+	nid3 := planet.StorageNodes[2].ID()
+
 	cases := []struct {
 		nodeIDs       storj.NodeIDList
 		expectedCalls int
 	}{
 		{
-			nodeIDs:       storj.NodeIDList{fooID, fooID, fooID},
+			nodeIDs:       storj.NodeIDList{nid1, nid2, nid3},
 			expectedCalls: 1,
 		},
 	}
 
 	for _, v := range cases {
-		lis, err := net.Listen("tcp", "127.0.0.1:0")
+		resNodes, err := oc.BulkLookup(ctx, v.nodeIDs)
 		assert.NoError(t, err)
-
-		srv, mock, err := newTestServer(ctx)
-		assert.NoError(t, err)
-		go func() { assert.NoError(t, srv.Serve(lis)) }()
-		defer srv.Stop()
-
-		ca, err := testidentity.NewTestCA(ctx)
-		assert.NoError(t, err)
-		identity, err := ca.NewIdentity()
-		assert.NoError(t, err)
-
-		oc, err := NewOverlayClient(identity, lis.Addr().String())
-		assert.NoError(t, err)
-
-		assert.NotNil(t, oc)
-		overlay, ok := oc.(*Overlay)
-		assert.True(t, ok)
-		assert.NotEmpty(t, overlay.client)
-
-		_, err = oc.BulkLookup(ctx, v.nodeIDs)
-		assert.NoError(t, err)
-		assert.Equal(t, mock.bulkLookupCalled, v.expectedCalls)
+		for i, n := range resNodes {
+			assert.Equal(t, n.Id, v.nodeIDs[i])
+		}
+		assert.Equal(t, len(resNodes), len(v.nodeIDs))
 	}
 }
 
@@ -224,33 +198,17 @@ func TestBulkLookupV2(t *testing.T) {
 	ctx := testcontext.New(t)
 	defer ctx.Cleanup()
 
-	lis, err := net.Listen("tcp", "127.0.0.1:0")
-	assert.NoError(t, err)
+	planet, cleanup := getPlanet(ctx, t)
+	defer cleanup()
+	oc := getOverlayClient(t, planet)
 
-	srv, s, err := newServer(ctx)
-
-	assert.NoError(t, err)
-	go func() { assert.NoError(t, srv.Serve(lis)) }()
-	defer srv.Stop()
-
-	ca, err := testidentity.NewTestCA(ctx)
-	assert.NoError(t, err)
-	identity, err := ca.NewIdentity()
-	assert.NoError(t, err)
-
-	oc, err := NewOverlayClient(identity, lis.Addr().String())
-	assert.NoError(t, err)
-
-	assert.NotNil(t, oc)
-	overlay, ok := oc.(*Overlay)
-	assert.True(t, ok)
-	assert.NotEmpty(t, overlay.client)
+	cache := planet.Satellites[0].Overlay
 
 	n1 := teststorj.MockNode("n1")
 	n2 := teststorj.MockNode("n2")
 	n3 := teststorj.MockNode("n3")
 	nodes := []*pb.Node{n1, n2, n3}
 	for _, n := range nodes {
-		assert.NoError(t, s.cache.Put(n.Id, *n))
+		assert.NoError(t, cache.Put(ctx, n.Id, *n))
 	}
 
 	nid1 := teststorj.NodeIDFromString("n1")
@@ -265,86 +223,61 @@ func TestBulkLookupV2(t *testing.T) {
 	}
 
 	{ // valid ids
-		ns, err := oc.BulkLookup(ctx, storj.NodeIDList{nid1, nid2, nid3})
+		idList := storj.NodeIDList{nid1, nid2, nid3}
+		ns, err := oc.BulkLookup(ctx, idList)
 		assert.NoError(t, err)
-		assert.Equal(t, nodes, ns)
+
+		for i, n := range ns {
+			assert.Equal(t, n.Id, idList[i])
+		}
 	}
 
 	{ // missing ids
-		ns, err := oc.BulkLookup(ctx, storj.NodeIDList{nid4, nid5})
+		idList := storj.NodeIDList{nid4, nid5}
+		ns, err := oc.BulkLookup(ctx, idList)
 		assert.NoError(t, err)
+
 		assert.Equal(t, []*pb.Node{nil, nil}, ns)
 	}
 
 	{ // different order and missing
-		ns, err := oc.BulkLookup(ctx, storj.NodeIDList{nid3, nid4, nid1, nid2, nid5})
+		idList := storj.NodeIDList{nid3, nid4, nid1, nid2, nid5}
+		ns, err := oc.BulkLookup(ctx, idList)
 		assert.NoError(t, err)
-		assert.Equal(t, []*pb.Node{n3, nil, n1, n2, nil}, ns)
+
+		expectedNodes := []*pb.Node{n3, nil, n1, n2, nil}
+		for i, n := range ns {
+			if n == nil {
+				assert.Nil(t, expectedNodes[i])
+			} else {
+				assert.Equal(t, n.Id, expectedNodes[i].Id)
+			}
+		}
 	}
 }
 
-func newServer(ctx context.Context) (*grpc.Server, *Server, error) {
-	ca, err := testidentity.NewTestCA(ctx)
+func getPlanet(ctx *testcontext.Context, t *testing.T) (planet *testplanet.Planet, f func()) {
+	planet, err := testplanet.New(t, 1, 4, 1)
 	if err != nil {
-		return nil, nil, err
+		t.Fatal(err)
 	}
-	identity, err := ca.NewIdentity()
+
+	planet.Start(ctx)
+	// we wait a couple of seconds for all the nodes to complete bootstrapping off the satellite
+	time.Sleep(2 * time.Second)
+
+	f = func() {
+		ctx.Check(planet.Shutdown)
+	}
+
+	return planet, f
+}
+
+func getOverlayClient(t *testing.T, planet *testplanet.Planet) (oc overlay.Client) {
+	oc, err := planet.Uplinks[0].DialOverlay(planet.Satellites[0])
 	if err != nil {
-		return nil, nil, err
-	}
-	identOpt, err := identity.ServerOption()
-	if err != nil {
-		return nil, nil, err
+		t.Fatal(err)
 	}
 
-	grpcServer := grpc.NewServer(identOpt)
-	s := &Server{cache: NewOverlayCache(teststore.New(), nil, nil)}
-
-	pb.RegisterOverlayServer(grpcServer, s)
-
-	return grpcServer, s, nil
-}
-
-func newTestServer(ctx context.Context) (*grpc.Server, *mockOverlayServer, error) {
-	ca, err := testidentity.NewTestCA(ctx)
-	if err != nil {
-		return nil, nil, err
-	}
-	identity, err := ca.NewIdentity()
-	if err != nil {
-		return nil, nil, err
-	}
-	identOpt, err := identity.ServerOption()
-	if err != nil {
-		return nil, nil, err
-	}
-
-	grpcServer := grpc.NewServer(identOpt)
-	mo := &mockOverlayServer{lookupCalled: 0, FindStorageNodesCalled: 0}
-
-	pb.RegisterOverlayServer(grpcServer, mo)
-
-	return grpcServer, mo, nil
-
-}
-
-type mockOverlayServer struct {
-	lookupCalled           int
-	bulkLookupCalled       int
-	FindStorageNodesCalled int
-}
-
-func (o *mockOverlayServer) Lookup(ctx context.Context, req *pb.LookupRequest) (*pb.LookupResponse, error) {
-	o.lookupCalled++
-	return &pb.LookupResponse{}, nil
-}
-
-func (o *mockOverlayServer) FindStorageNodes(ctx context.Context, req *pb.FindStorageNodesRequest) (*pb.FindStorageNodesResponse, error) {
-	o.FindStorageNodesCalled++
-	return &pb.FindStorageNodesResponse{}, nil
-}
-
-func (o *mockOverlayServer) BulkLookup(ctx context.Context, reqs *pb.LookupRequests) (*pb.LookupResponses, error) {
-	o.bulkLookupCalled++
-	return &pb.LookupResponses{}, nil
+	return oc
 }
diff --git a/pkg/overlay/server.go b/pkg/overlay/server.go
index 620c3da77..b158196a7 100644
--- a/pkg/overlay/server.go
+++ b/pkg/overlay/server.go
@@ -75,23 +75,32 @@ func (o *Server) FindStorageNodes(ctx context.Context, req *pb.FindStorageNodesRequest) (*pb.FindStorageNodesResponse, error) {
 	excluded := opts.ExcludedNodes
 	restrictions := opts.GetRestrictions()
-	restrictedBandwidth := restrictions.GetFreeBandwidth()
-	restrictedSpace := restrictions.GetFreeDisk()
+	reputation := opts.GetMinStats()
 
 	var startID storj.NodeID
 	result := []*pb.Node{}
 	for {
 		var nodes []*pb.Node
-		nodes, startID, err = o.populate(ctx, req.Start, maxNodes, restrictedBandwidth, restrictedSpace, excluded)
+		nodes, startID, err = o.populate(ctx, req.Start, maxNodes, restrictions, reputation, excluded)
 		if err != nil {
 			return nil, Error.Wrap(err)
 		}
-		if len(nodes) <= 0 {
+
+		resultNodes := []*pb.Node{}
+		usedAddrs := make(map[string]bool)
+		for _, n := range nodes {
+			addr := n.Address.GetAddress()
+			excluded = append(excluded, n.Id) // exclude all nodes on next iteration
+			if !usedAddrs[addr] {
+				resultNodes = append(resultNodes, n)
+				usedAddrs[addr] = true
+			}
+		}
+		if len(resultNodes) <= 0 {
 			break
 		}
-		result = append(result, nodes...)
+		result = append(result, resultNodes...)
 
 		if len(result) >= int(maxNodes) || startID == (storj.NodeID{}) {
 			break
@@ -132,7 +141,10 @@ func (o *Server) getNodes(ctx context.Context, keys storage.Keys) ([]*pb.Node, error) {
 
 }
 
-func (o *Server) populate(ctx context.Context, startID storj.NodeID, maxNodes, restrictedBandwidth, restrictedSpace int64, excluded storj.NodeIDList) ([]*pb.Node, storj.NodeID, error) {
+func (o *Server) populate(ctx context.Context, startID storj.NodeID, maxNodes int64,
+	minRestrictions *pb.NodeRestrictions, minReputation *pb.NodeStats,
+	excluded storj.NodeIDList) ([]*pb.Node, storj.NodeID, error) {
+
 	limit := int(maxNodes * 2)
 	keys, err := o.cache.DB.List(startID.Bytes(), limit)
 	if err != nil {
@@ -158,11 +170,16 @@ func (o *Server) populate(ctx context.Context, startID storj.NodeID, maxNodes, restrictedBandwidth, restrictedSpace int64, excluded storj.NodeIDList) ([]*pb.Node, storj.NodeID, error) {
 			continue
 		}
 
-		rest := v.GetRestrictions()
-		if rest.GetFreeBandwidth() < restrictedBandwidth || rest.GetFreeDisk() < restrictedSpace {
-			continue
-		}
-		if contains(excluded, v.Id) {
+		nodeRestrictions := v.GetRestrictions()
+		nodeReputation := v.GetReputation()
+
+		if nodeRestrictions.GetFreeBandwidth() < minRestrictions.GetFreeBandwidth() ||
+			nodeRestrictions.GetFreeDisk() < minRestrictions.GetFreeDisk() ||
+			nodeReputation.GetUptimeRatio() < minReputation.GetUptimeRatio() ||
+			nodeReputation.GetUptimeCount() < minReputation.GetUptimeCount() ||
+			nodeReputation.GetAuditSuccessRatio() < minReputation.GetAuditSuccessRatio() ||
+			nodeReputation.GetAuditCount() < minReputation.GetAuditCount() ||
+			contains(excluded, v.Id) {
 			continue
 		}
 		result = append(result, v)
diff --git a/pkg/overlay/utils_test.go b/pkg/overlay/utils_test.go
deleted file mode 100644
index fdc6fb36c..000000000
--- a/pkg/overlay/utils_test.go
+++ /dev/null
@@ -1,41 +0,0 @@
-// Copyright (C) 2018 Storj Labs, Inc.
-// See LICENSE for copying information.
-
-package overlay
-
-import (
-	"go.uber.org/zap"
-	"google.golang.org/grpc"
-	monkit "gopkg.in/spacemonkeygo/monkit.v2"
-
-	"storj.io/storj/pkg/kademlia"
-	"storj.io/storj/pkg/pb"
-	"storj.io/storj/storage"
-	"storj.io/storj/storage/teststore"
-)
-
-// NewMockServer provides a mock grpc server for testing
-func NewMockServer(items []storage.ListItem, opts ...grpc.ServerOption) *grpc.Server {
-	grpcServer := grpc.NewServer(opts...)
-
-	registry := monkit.Default
-
-	k := kademlia.NewMockKademlia()
-
-	c := &Cache{
-		DB:  teststore.New(),
-		DHT: k,
-	}
-
-	_ = storage.PutAll(c.DB, items...)
-
-	s := Server{
-		dht:     k,
-		cache:   c,
-		logger:  zap.NewNop(),
-		metrics: registry,
-	}
-	pb.RegisterOverlayServer(grpcServer, &s)
-
-	return grpcServer
-}
diff --git a/pkg/pb/node.pb.go b/pkg/pb/node.pb.go
index 4974b194c..08ddc922a 100644
--- a/pkg/pb/node.pb.go
+++ b/pkg/pb/node.pb.go
@@ -111,6 +111,7 @@ func (m *NodeRestrictions) GetFreeDisk() int64 {
 	return 0
 }
 
+// TODO move statdb.Update() stuff out of here
 // Node represents a node in the overlay network
 // Node is info for updating a single storage node, used in the Update rpc calls
 type Node struct {
@@ -118,13 +119,14 @@ type Node struct {
 	Address              *NodeAddress      `protobuf:"bytes,2,opt,name=address" json:"address,omitempty"`
 	Type                 NodeType          `protobuf:"varint,3,opt,name=type,proto3,enum=node.NodeType" json:"type,omitempty"`
 	Restrictions         *NodeRestrictions `protobuf:"bytes,4,opt,name=restrictions" json:"restrictions,omitempty"`
-	Metadata             *NodeMetadata     `protobuf:"bytes,5,opt,name=metadata" json:"metadata,omitempty"`
-	LatencyList          []int64           `protobuf:"varint,6,rep,packed,name=latency_list,json=latencyList" json:"latency_list,omitempty"`
-	AuditSuccess         bool              `protobuf:"varint,7,opt,name=audit_success,json=auditSuccess,proto3" json:"audit_success,omitempty"`
-	IsUp                 bool              `protobuf:"varint,8,opt,name=is_up,json=isUp,proto3" json:"is_up,omitempty"`
-	UpdateLatency        bool              `protobuf:"varint,9,opt,name=update_latency,json=updateLatency,proto3" json:"update_latency,omitempty"`
-	UpdateAuditSuccess   bool              `protobuf:"varint,10,opt,name=update_audit_success,json=updateAuditSuccess,proto3" json:"update_audit_success,omitempty"`
-	UpdateUptime         bool              `protobuf:"varint,11,opt,name=update_uptime,json=updateUptime,proto3" json:"update_uptime,omitempty"`
+	Reputation           *NodeStats        `protobuf:"bytes,5,opt,name=reputation" json:"reputation,omitempty"`
+	Metadata             *NodeMetadata     `protobuf:"bytes,6,opt,name=metadata" json:"metadata,omitempty"`
+	LatencyList          []int64           `protobuf:"varint,7,rep,packed,name=latency_list,json=latencyList" json:"latency_list,omitempty"`
+	AuditSuccess         bool              `protobuf:"varint,8,opt,name=audit_success,json=auditSuccess,proto3" json:"audit_success,omitempty"`
+	IsUp                 bool              `protobuf:"varint,9,opt,name=is_up,json=isUp,proto3" json:"is_up,omitempty"`
+	UpdateLatency        bool              `protobuf:"varint,10,opt,name=update_latency,json=updateLatency,proto3" json:"update_latency,omitempty"`
+	UpdateAuditSuccess   bool              `protobuf:"varint,11,opt,name=update_audit_success,json=updateAuditSuccess,proto3" json:"update_audit_success,omitempty"`
+	UpdateUptime         bool              `protobuf:"varint,12,opt,name=update_uptime,json=updateUptime,proto3" json:"update_uptime,omitempty"`
 	XXX_NoUnkeyedLiteral struct{}          `json:"-"`
 	XXX_unrecognized     []byte            `json:"-"`
 	XXX_sizecache        int32             `json:"-"`
@@ -175,6 +177,13 @@ func (m *Node) GetRestrictions() *NodeRestrictions {
 	return nil
 }
 
+func (m *Node) GetReputation() *NodeStats {
+	if m != nil {
+		return m.Reputation
+	}
+	return nil
+}
+
 func (m *Node) GetMetadata() *NodeMetadata {
 	if m != nil {
 		return m.Metadata
diff --git a/pkg/pb/node.proto b/pkg/pb/node.proto
index 8e14735b2..2f1482e59 100644
--- a/pkg/pb/node.proto
+++ b/pkg/pb/node.proto
@@ -14,6 +14,7 @@ message NodeRestrictions {
     int64 free_disk = 2;
 }
 
+// TODO move statdb.Update() stuff out of here
 // Node represents a node in the overlay network
 // Node is info for updating a single storage node, used in the Update rpc calls
 message Node {
@@ -21,13 +22,14 @@ message Node {
     NodeAddress address = 2;
     NodeType type = 3;
     NodeRestrictions restrictions = 4;
-    NodeMetadata metadata = 5;
-    repeated int64 latency_list = 6;
-    bool audit_success = 7;
-    bool is_up = 8;
-    bool update_latency = 9;
-    bool update_audit_success = 10;
-    bool update_uptime = 11;
+    NodeStats reputation = 5;
+    NodeMetadata metadata = 6;
+    repeated int64 latency_list = 7;
+    bool audit_success = 8;
+    bool is_up = 9;
+    bool update_latency = 10;
+    bool update_audit_success = 11;
+    bool update_uptime = 12;
 }
 
 // NodeType is an enum of possible node types
diff --git a/pkg/statdb/statdb.go b/pkg/statdb/statdb.go
index 0618dba5d..205312245 100644
--- a/pkg/statdb/statdb.go
+++ b/pkg/statdb/statdb.go
@@ -98,9 +98,10 @@ func (s *StatDB) Create(ctx context.Context, createReq *pb.CreateRequest) (resp *pb.CreateResponse, err error) {
 
 	nodeStats := &pb.NodeStats{
 		NodeId:            node.Id,
-		AuditCount:        dbNode.TotalAuditCount,
 		AuditSuccessRatio: dbNode.AuditSuccessRatio,
+		AuditCount:        dbNode.TotalAuditCount,
 		UptimeRatio:       dbNode.UptimeRatio,
+		UptimeCount:       dbNode.TotalUptimeCount,
 	}
 	return &pb.CreateResponse{
 		Stats: nodeStats,
@@ -118,9 +119,10 @@ func (s *StatDB) Get(ctx context.Context, getReq *pb.GetRequest) (resp *pb.GetResponse, err error) {
 
 	nodeStats := &pb.NodeStats{
 		NodeId:            getReq.NodeId,
-		AuditCount:        dbNode.TotalAuditCount,
 		AuditSuccessRatio: dbNode.AuditSuccessRatio,
+		AuditCount:        dbNode.TotalAuditCount,
 		UptimeRatio:       dbNode.UptimeRatio,
+		UptimeCount:       dbNode.TotalUptimeCount,
 	}
 	return &pb.GetResponse{
 		Stats: nodeStats,
@@ -246,7 +248,9 @@ func (s *StatDB) Update(ctx context.Context, updateReq *pb.UpdateRequest) (resp *pb.UpdateResponse, err error) {
 	nodeStats := &pb.NodeStats{
 		NodeId:            node.Id,
 		AuditSuccessRatio: dbNode.AuditSuccessRatio,
+		AuditCount:        dbNode.TotalAuditCount,
 		UptimeRatio:       dbNode.UptimeRatio,
+		UptimeCount:       dbNode.TotalUptimeCount,
 	}
 	return &pb.UpdateResponse{
 		Stats: nodeStats,
@@ -288,8 +292,9 @@ func (s *StatDB) UpdateUptime(ctx context.Context, updateReq *pb.UpdateUptimeRequest) (resp *pb.UpdateUptimeResponse, err error) {
 	nodeStats := &pb.NodeStats{
 		NodeId:            node.Id,
 		AuditSuccessRatio: dbNode.AuditSuccessRatio,
-		UptimeRatio:       dbNode.UptimeRatio,
 		AuditCount:        dbNode.TotalAuditCount,
+		UptimeRatio:       dbNode.UptimeRatio,
+		UptimeCount:       dbNode.TotalUptimeCount,
 	}
 	return &pb.UpdateUptimeResponse{
 		Stats: nodeStats,
@@ -331,8 +336,9 @@ func (s *StatDB) UpdateAuditSuccess(ctx context.Context, updateReq *pb.UpdateAuditSuccessRequest) (resp *pb.UpdateAuditSuccessResponse, err error) {
 	nodeStats := &pb.NodeStats{
 		NodeId:            node.Id,
 		AuditSuccessRatio: dbNode.AuditSuccessRatio,
-		UptimeRatio:       dbNode.UptimeRatio,
 		AuditCount:        dbNode.TotalAuditCount,
+		UptimeRatio:       dbNode.UptimeRatio,
+		UptimeCount:       dbNode.TotalUptimeCount,
 	}
 	return &pb.UpdateAuditSuccessResponse{
 		Stats: nodeStats,