Cleanup overlay methods and names. (#914)

This commit is contained in:
Egon Elbre 2018-12-20 15:57:54 +02:00 committed by GitHub
parent e1c3f11cfa
commit d9b9ae6ffa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 86 additions and 106 deletions

View File

@ -30,5 +30,5 @@ func (c cacheConfig) open(ctx context.Context) (cache *overlay.Cache, dbClose fu
} }
} }
return overlay.NewOverlayCache(database.OverlayCache(), nil, database.StatDB()), dbClose, nil return overlay.NewCache(database.OverlayCache(), database.StatDB()), dbClose, nil
} }

View File

@ -50,13 +50,13 @@ func init() {
func cmdList(cmd *cobra.Command, args []string) (err error) { func cmdList(cmd *cobra.Command, args []string) (err error) {
ctx := process.Ctx(cmd) ctx := process.Ctx(cmd)
c, dbClose, err := cacheCfg.open(ctx) cache, dbClose, err := cacheCfg.open(ctx)
if err != nil { if err != nil {
return err return err
} }
defer dbClose() defer dbClose()
keys, err := c.DB.List(nil, 0) keys, err := cache.Inspect(ctx)
if err != nil { if err != nil {
return err return err
} }
@ -71,7 +71,7 @@ func cmdList(cmd *cobra.Command, args []string) (err error) {
fmt.Fprintln(w, "Node ID\t Address") fmt.Fprintln(w, "Node ID\t Address")
for _, id := range nodeIDs { for _, id := range nodeIDs {
n, err := c.Get(process.Ctx(cmd), id) n, err := cache.Get(process.Ctx(cmd), id)
if err != nil { if err != nil {
fmt.Fprintln(w, id.String(), "\t", "error getting value") fmt.Fprintln(w, id.String(), "\t", "error getting value")
} }
@ -97,7 +97,7 @@ func cmdAdd(cmd *cobra.Command, args []string) (err error) {
return errs.Wrap(err) return errs.Wrap(err)
} }
c, dbClose, err := cacheCfg.open(ctx) cache, dbClose, err := cacheCfg.open(ctx)
if err != nil { if err != nil {
return err return err
} }
@ -109,7 +109,7 @@ func cmdAdd(cmd *cobra.Command, args []string) (err error) {
zap.S().Error(err) zap.S().Error(err)
} }
fmt.Printf("adding node ID: %s; Address: %s", i, a) fmt.Printf("adding node ID: %s; Address: %s", i, a)
err = c.Put(process.Ctx(cmd), id, pb.Node{ err = cache.Put(process.Ctx(cmd), id, pb.Node{
Id: id, Id: id,
// TODO: NodeType is missing // TODO: NodeType is missing
Address: &pb.NodeAddress{ Address: &pb.NodeAddress{

View File

@ -179,7 +179,7 @@ func (node *Node) initOverlay(planet *Planet) error {
node.StatDB = node.Database.StatDB() node.StatDB = node.Database.StatDB()
node.Overlay = overlay.NewOverlayCache(teststore.New(), node.Kademlia, node.StatDB) node.Overlay = overlay.NewCache(teststore.New(), node.StatDB)
node.Discovery = discovery.NewDiscovery(node.Overlay, node.Kademlia, node.StatDB) node.Discovery = discovery.NewDiscovery(node.Overlay, node.Kademlia, node.StatDB)
return nil return nil

View File

@ -121,7 +121,7 @@ func New(t zaptest.TestingT, satelliteCount, storageNodeCount, uplinkCount int)
AuditCount: 0, AuditCount: 0,
} }
overlayServer := overlay.NewServer(node.Log.Named("overlay"), node.Overlay, node.Kademlia, ns) overlayServer := overlay.NewServer(node.Log.Named("overlay"), node.Overlay, ns)
pb.RegisterOverlayServer(node.Provider.GRPC(), overlayServer) pb.RegisterOverlayServer(node.Provider.GRPC(), overlayServer)
node.Dependencies = append(node.Dependencies, node.Dependencies = append(node.Dependencies,

View File

@ -16,7 +16,7 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
"google.golang.org/grpc" "google.golang.org/grpc"
"storj.io/storj/internal/identity" testidentity "storj.io/storj/internal/identity"
"storj.io/storj/internal/teststorj" "storj.io/storj/internal/teststorj"
"storj.io/storj/pkg/auth" "storj.io/storj/pkg/auth"
"storj.io/storj/pkg/overlay" "storj.io/storj/pkg/overlay"
@ -129,7 +129,7 @@ func TestAuditSegment(t *testing.T) {
db := teststore.New() db := teststore.New()
c := pointerdb.Config{MaxInlineSegmentSize: 8000} c := pointerdb.Config{MaxInlineSegmentSize: 8000}
cache := overlay.NewOverlayCache(teststore.New(), nil, nil) cache := overlay.NewCache(teststore.New(), nil)
pdbw := newPointerDBWrapper(pointerdb.NewServer(db, cache, zap.NewNop(), c, identity)) pdbw := newPointerDBWrapper(pointerdb.NewServer(db, cache, zap.NewNop(), c, identity))
pointers := pdbclient.New(pdbw) pointers := pdbclient.New(pdbw)

View File

@ -38,7 +38,7 @@ func (c Config) Run(ctx context.Context, server *provider.Provider) (err error)
if err != nil { if err != nil {
return err return err
} }
overlay, err := overlay.NewOverlayClient(identity, c.SatelliteAddr) overlay, err := overlay.NewClient(identity, c.SatelliteAddr)
if err != nil { if err != nil {
return err return err
} }

View File

@ -13,7 +13,7 @@ import (
"storj.io/storj/pkg/overlay" "storj.io/storj/pkg/overlay"
"storj.io/storj/pkg/pointerdb/pdbclient" "storj.io/storj/pkg/pointerdb/pdbclient"
"storj.io/storj/pkg/provider" "storj.io/storj/pkg/provider"
"storj.io/storj/pkg/storage/ec" ecclient "storj.io/storj/pkg/storage/ec"
"storj.io/storj/pkg/storage/segments" "storj.io/storj/pkg/storage/segments"
"storj.io/storj/storage/redis" "storj.io/storj/storage/redis"
) )
@ -63,7 +63,7 @@ func (c Config) getSegmentRepairer(ctx context.Context, identity *provider.FullI
defer mon.Task()(&ctx)(&err) defer mon.Task()(&ctx)(&err)
var oc overlay.Client var oc overlay.Client
oc, err = overlay.NewOverlayClient(identity, c.OverlayAddr) oc, err = overlay.NewClient(identity, c.OverlayAddr)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -40,7 +40,7 @@ type Server struct {
// CountNodes returns the number of nodes in the cache and in kademlia // CountNodes returns the number of nodes in the cache and in kademlia
func (srv *Server) CountNodes(ctx context.Context, req *pb.CountNodesRequest) (*pb.CountNodesResponse, error) { func (srv *Server) CountNodes(ctx context.Context, req *pb.CountNodesRequest) (*pb.CountNodesResponse, error) {
overlayKeys, err := srv.cache.DB.List(nil, 0) overlayKeys, err := srv.cache.Inspect(ctx)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -126,7 +126,7 @@ func (c Config) action(ctx context.Context, cliCtx *cli.Context, identity *provi
func (c Config) GetMetainfo(ctx context.Context, identity *provider.FullIdentity) (db storj.Metainfo, ss streams.Store, err error) { func (c Config) GetMetainfo(ctx context.Context, identity *provider.FullIdentity) (db storj.Metainfo, ss streams.Store, err error) {
defer mon.Task()(&ctx)(&err) defer mon.Task()(&ctx)(&err)
oc, err := overlay.NewOverlayClient(identity, c.Client.OverlayAddr) oc, err := overlay.NewClient(identity, c.Client.OverlayAddr)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }

View File

@ -9,7 +9,6 @@ import (
"github.com/gogo/protobuf/proto" "github.com/gogo/protobuf/proto"
"github.com/zeebo/errs" "github.com/zeebo/errs"
"storj.io/storj/pkg/dht"
"storj.io/storj/pkg/pb" "storj.io/storj/pkg/pb"
"storj.io/storj/pkg/statdb" "storj.io/storj/pkg/statdb"
"storj.io/storj/pkg/storj" "storj.io/storj/pkg/storj"
@ -35,23 +34,27 @@ var OverlayError = errs.Class("Overlay Error")
// Cache is used to store overlay data in Redis // Cache is used to store overlay data in Redis
type Cache struct { type Cache struct {
DB storage.KeyValueStore db storage.KeyValueStore
DHT dht.DHT statDB statdb.DB
StatDB statdb.DB
} }
// NewOverlayCache returns a new Cache // NewCache returns a new Cache
func NewOverlayCache(db storage.KeyValueStore, dht dht.DHT, sdb statdb.DB) *Cache { func NewCache(db storage.KeyValueStore, sdb statdb.DB) *Cache {
return &Cache{DB: db, DHT: dht, StatDB: sdb} return &Cache{db: db, statDB: sdb}
}
// Inspect lists limited number of items in the cache
func (cache *Cache) Inspect(ctx context.Context) (storage.Keys, error) {
return cache.db.List(nil, 0)
} }
// Get looks up the provided nodeID from the overlay cache // Get looks up the provided nodeID from the overlay cache
func (o *Cache) Get(ctx context.Context, nodeID storj.NodeID) (*pb.Node, error) { func (cache *Cache) Get(ctx context.Context, nodeID storj.NodeID) (*pb.Node, error) {
if nodeID.IsZero() { if nodeID.IsZero() {
return nil, ErrEmptyNode return nil, ErrEmptyNode
} }
b, err := o.DB.Get(nodeID.Bytes()) b, err := cache.db.Get(nodeID.Bytes())
if err != nil { if err != nil {
if storage.ErrKeyNotFound.Has(err) { if storage.ErrKeyNotFound.Has(err) {
return nil, ErrNodeNotFound return nil, ErrNodeNotFound
@ -70,7 +73,7 @@ func (o *Cache) Get(ctx context.Context, nodeID storj.NodeID) (*pb.Node, error)
} }
// GetAll looks up the provided nodeIDs from the overlay cache // GetAll looks up the provided nodeIDs from the overlay cache
func (o *Cache) GetAll(ctx context.Context, nodeIDs storj.NodeIDList) ([]*pb.Node, error) { func (cache *Cache) GetAll(ctx context.Context, nodeIDs storj.NodeIDList) ([]*pb.Node, error) {
if len(nodeIDs) == 0 { if len(nodeIDs) == 0 {
return nil, OverlayError.New("no nodeIDs provided") return nil, OverlayError.New("no nodeIDs provided")
} }
@ -78,7 +81,7 @@ func (o *Cache) GetAll(ctx context.Context, nodeIDs storj.NodeIDList) ([]*pb.Nod
for _, v := range nodeIDs { for _, v := range nodeIDs {
ks = append(ks, v.Bytes()) ks = append(ks, v.Bytes())
} }
vs, err := o.DB.GetAll(ks) vs, err := cache.db.GetAll(ks)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -99,7 +102,7 @@ func (o *Cache) GetAll(ctx context.Context, nodeIDs storj.NodeIDList) ([]*pb.Nod
} }
// Put adds a nodeID to the redis cache with a binary representation of proto defined Node // Put adds a nodeID to the redis cache with a binary representation of proto defined Node
func (o *Cache) Put(ctx context.Context, nodeID storj.NodeID, value pb.Node) error { func (cache *Cache) Put(ctx context.Context, nodeID storj.NodeID, value pb.Node) error {
// If we get a Node without an ID (i.e. bootstrap node) // If we get a Node without an ID (i.e. bootstrap node)
// we don't want to add to the routing tbale // we don't want to add to the routing tbale
if nodeID.IsZero() { if nodeID.IsZero() {
@ -107,7 +110,7 @@ func (o *Cache) Put(ctx context.Context, nodeID storj.NodeID, value pb.Node) err
} }
// get existing node rep, or create a new statdb node with 0 rep // get existing node rep, or create a new statdb node with 0 rep
stats, err := o.StatDB.CreateEntryIfNotExists(ctx, nodeID) stats, err := cache.statDB.CreateEntryIfNotExists(ctx, nodeID)
if err != nil { if err != nil {
return err return err
} }
@ -125,5 +128,5 @@ func (o *Cache) Put(ctx context.Context, nodeID storj.NodeID, value pb.Node) err
return err return err
} }
return o.DB.Put(nodeID.Bytes(), data) return cache.db.Put(nodeID.Bytes(), data)
} }

View File

@ -31,7 +31,7 @@ func testCache(ctx context.Context, t *testing.T, store storage.KeyValueStore, s
_, _ = rand.Read(valid2ID[:]) _, _ = rand.Read(valid2ID[:])
_, _ = rand.Read(missingID[:]) _, _ = rand.Read(missingID[:])
cache := overlay.Cache{DB: store, StatDB: sdb} cache := overlay.NewCache(store, sdb)
{ // Put { // Put
err := cache.Put(ctx, valid1ID, pb.Node{Id: valid1ID}) err := cache.Put(ctx, valid1ID, pb.Node{Id: valid1ID})

View File

@ -32,9 +32,9 @@ type Client interface {
BulkLookup(ctx context.Context, nodeIDs storj.NodeIDList) ([]*pb.Node, error) BulkLookup(ctx context.Context, nodeIDs storj.NodeIDList) ([]*pb.Node, error)
} }
// Overlay is the overlay concrete implementation of the client interface // client is the overlay concrete implementation of the client interface
type Overlay struct { type client struct {
client pb.OverlayClient conn pb.OverlayClient
} }
// Options contains parameters for selecting nodes // Options contains parameters for selecting nodes
@ -49,31 +49,31 @@ type Options struct {
Excluded storj.NodeIDList Excluded storj.NodeIDList
} }
// NewOverlayClient returns a new intialized Overlay Client // NewClient returns a new intialized Overlay Client
func NewOverlayClient(identity *provider.FullIdentity, address string) (Client, error) { func NewClient(identity *provider.FullIdentity, address string) (Client, error) {
tc := transport.NewClient(identity) tc := transport.NewClient(identity)
conn, err := tc.DialAddress(context.Background(), address) conn, err := tc.DialAddress(context.Background(), address)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &Overlay{ return &client{
client: pb.NewOverlayClient(conn), conn: pb.NewOverlayClient(conn),
}, nil }, nil
} }
// NewClientFrom returns a new overlay.Client from a connection // NewClientFrom returns a new overlay.Client from a connection
func NewClientFrom(conn pb.OverlayClient) Client { return &Overlay{conn} } func NewClientFrom(conn pb.OverlayClient) Client { return &client{conn} }
// a compiler trick to make sure *Overlay implements Client // a compiler trick to make sure *client implements Client
var _ Client = (*Overlay)(nil) var _ Client = (*client)(nil)
// Choose implements the client.Choose interface // Choose returns nodes based on Options
func (o *Overlay) Choose(ctx context.Context, op Options) ([]*pb.Node, error) { func (client *client) Choose(ctx context.Context, op Options) ([]*pb.Node, error) {
var exIDs storj.NodeIDList var exIDs storj.NodeIDList
exIDs = append(exIDs, op.Excluded...) exIDs = append(exIDs, op.Excluded...)
// TODO(coyle): We will also need to communicate with the reputation service here // TODO(coyle): We will also need to communicate with the reputation service here
resp, err := o.client.FindStorageNodes(ctx, &pb.FindStorageNodesRequest{ resp, err := client.conn.FindStorageNodes(ctx, &pb.FindStorageNodesRequest{
Opts: &pb.OverlayOptions{ Opts: &pb.OverlayOptions{
Amount: int64(op.Amount), Amount: int64(op.Amount),
Restrictions: &pb.NodeRestrictions{FreeDisk: op.Space, FreeBandwidth: op.Bandwidth}, Restrictions: &pb.NodeRestrictions{FreeDisk: op.Space, FreeBandwidth: op.Bandwidth},
@ -88,8 +88,8 @@ func (o *Overlay) Choose(ctx context.Context, op Options) ([]*pb.Node, error) {
} }
// Lookup provides a Node with the given ID // Lookup provides a Node with the given ID
func (o *Overlay) Lookup(ctx context.Context, nodeID storj.NodeID) (*pb.Node, error) { func (client *client) Lookup(ctx context.Context, nodeID storj.NodeID) (*pb.Node, error) {
resp, err := o.client.Lookup(ctx, &pb.LookupRequest{NodeId: nodeID}) resp, err := client.conn.Lookup(ctx, &pb.LookupRequest{NodeId: nodeID})
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -98,12 +98,12 @@ func (o *Overlay) Lookup(ctx context.Context, nodeID storj.NodeID) (*pb.Node, er
} }
// BulkLookup provides a list of Nodes with the given IDs // BulkLookup provides a list of Nodes with the given IDs
func (o *Overlay) BulkLookup(ctx context.Context, nodeIDs storj.NodeIDList) ([]*pb.Node, error) { func (client *client) BulkLookup(ctx context.Context, nodeIDs storj.NodeIDList) ([]*pb.Node, error) {
var reqs pb.LookupRequests var reqs pb.LookupRequests
for _, v := range nodeIDs { for _, v := range nodeIDs {
reqs.LookupRequest = append(reqs.LookupRequest, &pb.LookupRequest{NodeId: v}) reqs.LookupRequest = append(reqs.LookupRequest, &pb.LookupRequest{NodeId: v})
} }
resp, err := o.client.BulkLookup(ctx, &reqs) resp, err := client.conn.BulkLookup(ctx, &reqs)
if err != nil { if err != nil {
return nil, ClientError.Wrap(err) return nil, ClientError.Wrap(err)

View File

@ -17,7 +17,7 @@ import (
"storj.io/storj/pkg/storj" "storj.io/storj/pkg/storj"
) )
func TestNewOverlayClient(t *testing.T) { func TestNewClient(t *testing.T) {
ctx := testcontext.New(t) ctx := testcontext.New(t)
defer ctx.Cleanup() defer ctx.Cleanup()
@ -35,12 +35,10 @@ func TestNewOverlayClient(t *testing.T) {
identity, err := ca.NewIdentity() identity, err := ca.NewIdentity()
assert.NoError(t, err) assert.NoError(t, err)
oc, err := overlay.NewOverlayClient(identity, v.address) oc, err := overlay.NewClient(identity, v.address)
assert.NoError(t, err) assert.NoError(t, err)
assert.NotNil(t, oc) assert.NotNil(t, oc)
_, ok := oc.(*overlay.Overlay)
assert.True(t, ok)
} }
} }

View File

@ -12,7 +12,6 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
monkit "gopkg.in/spacemonkeygo/monkit.v2" monkit "gopkg.in/spacemonkeygo/monkit.v2"
"storj.io/storj/pkg/kademlia"
"storj.io/storj/pkg/pb" "storj.io/storj/pkg/pb"
"storj.io/storj/pkg/provider" "storj.io/storj/pkg/provider"
"storj.io/storj/pkg/statdb" "storj.io/storj/pkg/statdb"
@ -63,11 +62,6 @@ func (c Config) Run(ctx context.Context, server *provider.Provider) (
err error) { err error) {
defer mon.Task()(&ctx)(&err) defer mon.Task()(&ctx)(&err)
kad := kademlia.LoadFromContext(ctx)
if kad == nil {
return Error.New("programmer error: kademlia responsibility unstarted")
}
sdb, ok := ctx.Value("masterdb").(interface { sdb, ok := ctx.Value("masterdb").(interface {
StatDB() statdb.DB StatDB() statdb.DB
OverlayCache() storage.KeyValueStore OverlayCache() storage.KeyValueStore
@ -76,7 +70,7 @@ func (c Config) Run(ctx context.Context, server *provider.Provider) (
return Error.Wrap(errs.New("unable to get master db instance")) return Error.Wrap(errs.New("unable to get master db instance"))
} }
cache := NewOverlayCache(sdb.OverlayCache(), kad, sdb.StatDB()) cache := NewCache(sdb.OverlayCache(), sdb.StatDB())
ns := &pb.NodeStats{ ns := &pb.NodeStats{
UptimeCount: c.Node.UptimeCount, UptimeCount: c.Node.UptimeCount,
@ -85,7 +79,7 @@ func (c Config) Run(ctx context.Context, server *provider.Provider) (
AuditCount: c.Node.AuditCount, AuditCount: c.Node.AuditCount,
} }
srv := NewServer(zap.L(), cache, kad, ns) srv := NewServer(zap.L(), cache, ns)
pb.RegisterOverlayServer(server.GRPC(), srv) pb.RegisterOverlayServer(server.GRPC(), srv)
ctx2 := context.WithValue(ctx, ctxKeyOverlay, cache) ctx2 := context.WithValue(ctx, ctxKeyOverlay, cache)

View File

@ -7,24 +7,13 @@ import (
"testing" "testing"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"storj.io/storj/pkg/kademlia"
) )
func TestRun(t *testing.T) { func TestRun(t *testing.T) {
bctx := context.Background() ctx := context.Background()
kad := &kademlia.Kademlia{}
var kadKey kademlia.CtxKey
ctx := context.WithValue(bctx, kadKey, kad)
// run with nil
err := Config{}.Run(context.Background(), nil)
assert.Error(t, err)
assert.Equal(t, "overlay error: programmer error: kademlia responsibility unstarted", err.Error())
// run with nil, pass pointer to Kademlia in context // run with nil, pass pointer to Kademlia in context
err = Config{}.Run(ctx, nil) err := Config{}.Run(ctx, nil)
assert.Error(t, err) assert.Error(t, err)
assert.Equal(t, "overlay error: unable to get master db instance", err.Error()) assert.Equal(t, "overlay error: unable to get master db instance", err.Error())
} }

View File

@ -13,9 +13,8 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
"gopkg.in/spacemonkeygo/monkit.v2" monkit "gopkg.in/spacemonkeygo/monkit.v2"
"storj.io/storj/pkg/dht"
"storj.io/storj/pkg/pb" "storj.io/storj/pkg/pb"
"storj.io/storj/pkg/storj" "storj.io/storj/pkg/storj"
"storj.io/storj/storage" "storj.io/storj/storage"
@ -26,30 +25,28 @@ var ServerError = errs.Class("Server Error")
// Server implements our overlay RPC service // Server implements our overlay RPC service
type Server struct { type Server struct {
logger *zap.Logger log *zap.Logger
dht dht.DHT
cache *Cache cache *Cache
metrics *monkit.Registry metrics *monkit.Registry
nodeStats *pb.NodeStats nodeStats *pb.NodeStats
} }
// NewServer creates a new Overlay Server // NewServer creates a new Overlay Server
func NewServer(log *zap.Logger, cache *Cache, dht dht.DHT, nodeStats *pb.NodeStats) *Server { func NewServer(log *zap.Logger, cache *Cache, nodeStats *pb.NodeStats) *Server {
return &Server{ return &Server{
dht: dht,
cache: cache, cache: cache,
logger: log, log: log,
metrics: monkit.Default, metrics: monkit.Default,
nodeStats: nodeStats, nodeStats: nodeStats,
} }
} }
// Lookup finds the address of a node in our overlay network // Lookup finds the address of a node in our overlay network
func (o *Server) Lookup(ctx context.Context, req *pb.LookupRequest) (*pb.LookupResponse, error) { func (server *Server) Lookup(ctx context.Context, req *pb.LookupRequest) (*pb.LookupResponse, error) {
na, err := o.cache.Get(ctx, req.NodeId) na, err := server.cache.Get(ctx, req.NodeId)
if err != nil { if err != nil {
o.logger.Error("Error looking up node", zap.Error(err), zap.String("nodeID", req.NodeId.String())) server.log.Error("Error looking up node", zap.Error(err), zap.String("nodeID", req.NodeId.String()))
return nil, err return nil, err
} }
@ -59,8 +56,8 @@ func (o *Server) Lookup(ctx context.Context, req *pb.LookupRequest) (*pb.LookupR
} }
// BulkLookup finds the addresses of nodes in our overlay network // BulkLookup finds the addresses of nodes in our overlay network
func (o *Server) BulkLookup(ctx context.Context, reqs *pb.LookupRequests) (*pb.LookupResponses, error) { func (server *Server) BulkLookup(ctx context.Context, reqs *pb.LookupRequests) (*pb.LookupResponses, error) {
ns, err := o.cache.GetAll(ctx, lookupRequestsToNodeIDs(reqs)) ns, err := server.cache.GetAll(ctx, lookupRequestsToNodeIDs(reqs))
if err != nil { if err != nil {
return nil, ServerError.New("could not get nodes requested %s\n", err) return nil, ServerError.New("could not get nodes requested %s\n", err)
} }
@ -68,7 +65,7 @@ func (o *Server) BulkLookup(ctx context.Context, reqs *pb.LookupRequests) (*pb.L
} }
// FindStorageNodes searches the overlay network for nodes that meet the provided requirements // FindStorageNodes searches the overlay network for nodes that meet the provided requirements
func (o *Server) FindStorageNodes(ctx context.Context, req *pb.FindStorageNodesRequest) (resp *pb.FindStorageNodesResponse, err error) { func (server *Server) FindStorageNodes(ctx context.Context, req *pb.FindStorageNodesRequest) (resp *pb.FindStorageNodesResponse, err error) {
opts := req.GetOpts() opts := req.GetOpts()
maxNodes := req.GetMaxNodes() maxNodes := req.GetMaxNodes()
if maxNodes <= 0 { if maxNodes <= 0 {
@ -77,13 +74,13 @@ func (o *Server) FindStorageNodes(ctx context.Context, req *pb.FindStorageNodesR
excluded := opts.ExcludedNodes excluded := opts.ExcludedNodes
restrictions := opts.GetRestrictions() restrictions := opts.GetRestrictions()
reputation := o.nodeStats reputation := server.nodeStats
var startID storj.NodeID var startID storj.NodeID
result := []*pb.Node{} result := []*pb.Node{}
for { for {
var nodes []*pb.Node var nodes []*pb.Node
nodes, startID, err = o.populate(ctx, req.Start, maxNodes, restrictions, reputation, excluded) nodes, startID, err = server.populate(ctx, req.Start, maxNodes, restrictions, reputation, excluded)
if err != nil { if err != nil {
return nil, Error.Wrap(err) return nil, Error.Wrap(err)
} }
@ -123,8 +120,8 @@ func (o *Server) FindStorageNodes(ctx context.Context, req *pb.FindStorageNodesR
}, nil }, nil
} }
func (o *Server) getNodes(ctx context.Context, keys storage.Keys) ([]*pb.Node, error) { func (server *Server) getNodes(ctx context.Context, keys storage.Keys) ([]*pb.Node, error) {
values, err := o.cache.DB.GetAll(keys) values, err := server.cache.db.GetAll(keys)
if err != nil { if err != nil {
return nil, Error.Wrap(err) return nil, Error.Wrap(err)
} }
@ -143,27 +140,27 @@ func (o *Server) getNodes(ctx context.Context, keys storage.Keys) ([]*pb.Node, e
} }
func (o *Server) populate(ctx context.Context, startID storj.NodeID, maxNodes int64, func (server *Server) populate(ctx context.Context, startID storj.NodeID, maxNodes int64,
minRestrictions *pb.NodeRestrictions, minReputation *pb.NodeStats, minRestrictions *pb.NodeRestrictions, minReputation *pb.NodeStats,
excluded storj.NodeIDList) ([]*pb.Node, storj.NodeID, error) { excluded storj.NodeIDList) ([]*pb.Node, storj.NodeID, error) {
limit := int(maxNodes * 2) limit := int(maxNodes * 2)
keys, err := o.cache.DB.List(startID.Bytes(), limit) keys, err := server.cache.db.List(startID.Bytes(), limit)
if err != nil { if err != nil {
o.logger.Error("Error listing nodes", zap.Error(err)) server.log.Error("Error listing nodes", zap.Error(err))
return nil, storj.NodeID{}, Error.Wrap(err) return nil, storj.NodeID{}, Error.Wrap(err)
} }
if len(keys) <= 0 { if len(keys) <= 0 {
o.logger.Info("No Keys returned from List operation") server.log.Info("No Keys returned from List operation")
return []*pb.Node{}, startID, nil return []*pb.Node{}, startID, nil
} }
// TODO: should this be `var result []*pb.Node` ? // TODO: should this be `var result []*pb.Node` ?
result := []*pb.Node{} result := []*pb.Node{}
nodes, err := o.getNodes(ctx, keys) nodes, err := server.getNodes(ctx, keys)
if err != nil { if err != nil {
o.logger.Error("Error getting nodes", zap.Error(err)) server.log.Error("Error getting nodes", zap.Error(err))
return nil, storj.NodeID{}, Error.Wrap(err) return nil, storj.NodeID{}, Error.Wrap(err)
} }
@ -172,15 +169,15 @@ func (o *Server) populate(ctx context.Context, startID storj.NodeID, maxNodes in
continue continue
} }
nodeRestrictions := v.GetRestrictions() restrictions := v.GetRestrictions()
nodeReputation := v.GetReputation() reputation := v.GetReputation()
if nodeRestrictions.GetFreeBandwidth() < minRestrictions.GetFreeBandwidth() || if restrictions.GetFreeBandwidth() < minRestrictions.GetFreeBandwidth() ||
nodeRestrictions.GetFreeDisk() < minRestrictions.GetFreeDisk() || restrictions.GetFreeDisk() < minRestrictions.GetFreeDisk() ||
nodeReputation.GetUptimeRatio() < minReputation.GetUptimeRatio() || reputation.GetUptimeRatio() < minReputation.GetUptimeRatio() ||
nodeReputation.GetUptimeCount() < minReputation.GetUptimeCount() || reputation.GetUptimeCount() < minReputation.GetUptimeCount() ||
nodeReputation.GetAuditSuccessRatio() < minReputation.GetAuditSuccessRatio() || reputation.GetAuditSuccessRatio() < minReputation.GetAuditSuccessRatio() ||
nodeReputation.GetAuditCount() < minReputation.GetAuditCount() || reputation.GetAuditCount() < minReputation.GetAuditCount() ||
contains(excluded, v.Id) { contains(excluded, v.Id) {
continue continue
} }

View File

@ -16,7 +16,6 @@ import (
) )
func TestServer(t *testing.T) { func TestServer(t *testing.T) {
ctx := testcontext.New(t) ctx := testcontext.New(t)
defer ctx.Cleanup() defer ctx.Cleanup()
@ -31,7 +30,7 @@ func TestServer(t *testing.T) {
time.Sleep(2 * time.Second) time.Sleep(2 * time.Second)
satellite := planet.Satellites[0] satellite := planet.Satellites[0]
server := overlay.NewServer(satellite.Log.Named("overlay"), satellite.Overlay, satellite.Kademlia, &pb.NodeStats{}) server := overlay.NewServer(satellite.Log.Named("overlay"), satellite.Overlay, &pb.NodeStats{})
// TODO: handle cleanup // TODO: handle cleanup
{ // FindStorageNodes { // FindStorageNodes

View File

@ -38,7 +38,7 @@ type AgreementSender struct {
// Initialize the Agreement Sender // Initialize the Agreement Sender
func Initialize(DB *psdb.DB, identity *provider.FullIdentity) (*AgreementSender, error) { func Initialize(DB *psdb.DB, identity *provider.FullIdentity) (*AgreementSender, error) {
overlay, err := overlay.NewOverlayClient(identity, *defaultOverlayAddr) overlay, err := overlay.NewClient(identity, *defaultOverlayAddr)
if err != nil { if err != nil {
return nil, err return nil, err
} }