storj/pkg/overlay/server.go
Natalie Villasana 80727ae90b adds netstate pagination (#95)
* adds netstate rpc server pagination, mocks pagination in test/util.go

* updates ns client example, combines ns client and server test to netstate_test, adds pagination to bolt client

* better organizes netstate test calls

* wip breaking netstate test into smaller tests

* wip modularizing netstate tests

* adds some test panics

* wip netstate test attempts

* testing bug in netstate TestDeleteAuth

* wip fixes global variable problem, still issues with list

* wip fixes get request params and args

* fixes bug in path when using MakePointers helper fn

* updates mockdb list func, adds test, changes Limit to int

* fixes merge conflicts

* fixes broken tests from merge

* remove unnecessary PointerEntry struct

* removes error when Get returns nil value from boltdb

* breaks boltdb client tests into smaller tests

* renames AssertNoErr test helper to HandleErr

* adds StartingKey and Limit parameters to redis list func, adds beginning of redis tests

* adds helper func for mockdb List function

* if no starting key is provided for netstate List, the first value in storage will be used

* adds basic pagination for redis List function, adds tests

* adds list limit to call in overlay/server.go

* streamlines/fixes some nits from review

* removes use of obsolete EncryptedUnencryptedSize

* uses MockKeyValueStore instead of redis instance in redis client test

* changes test to expect nil returned for getting missing key

* remove error from `KeyValueStore#Get`

* fix bolt test

* Merge pull request #1 from bryanchriswhite/nat-pagination

remove error from `KeyValueStore#Get`

* adds Get returning error back to KeyValueStore interface and affected clients

* trying to appease travis: returns errors in Get calls in overlay/cache and cache_test

* handles redis get error when no key found
2018-06-29 16:06:25 -04:00

96 lines
2.2 KiB
Go

// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.
package overlay
import (
"context"
"sync"
"go.uber.org/zap"
"gopkg.in/spacemonkeygo/monkit.v2"
"storj.io/storj/pkg/dht"
proto "storj.io/storj/protos/overlay" // naming proto to avoid confusion with this package
"storj.io/storj/storage"
)
// maxNodes is the upper bound on how many keys FindStorageNodes resolves into
// nodes for a single response; longer key lists are truncated to this length.
const maxNodes = 40
// Server implements our overlay RPC service.
// The methods in this file resolve node addresses through cache and report
// failures through logger.
type Server struct {
// dht is the DHT handle held by the server.
// NOTE(review): not referenced by the methods visible in this file — confirm where it is used.
dht dht.DHT
// cache is queried by Lookup and getNodes for node addresses, and its DB is
// listed by FindStorageNodes to obtain candidate node keys.
cache *Cache
// logger receives an error entry whenever a cache lookup or listing fails.
logger *zap.Logger
// metrics is the monkit registry held by the server.
// NOTE(review): not referenced by the methods visible in this file — confirm where it is used.
metrics *monkit.Registry
}
// Lookup finds the address of a node in our overlay network.
//
// It asks the cache for the address stored under the requested node ID and
// returns it, together with that ID, wrapped in a LookupResponse. If the cache
// returns an error, the error is logged along with the node ID and returned
// unchanged with a nil response.
//
// The node ID is read once through the generated GetNodeID accessor instead of
// mixing the accessor with direct field access, so a nil request is handled as
// an empty node ID rather than causing a nil-pointer dereference.
func (o *Server) Lookup(ctx context.Context, req *proto.LookupRequest) (*proto.LookupResponse, error) {
	nodeID := req.GetNodeID()

	na, err := o.cache.Get(ctx, nodeID)
	if err != nil {
		o.logger.Error("Error looking up node", zap.Error(err), zap.String("nodeID", nodeID))
		return nil, err
	}

	return &proto.LookupResponse{
		Node: &proto.Node{
			Id:      nodeID,
			Address: na,
		},
	}, nil
}
// FindStorageNodes searches the overlay network for nodes that meet the provided requirements.
//
// It lists node keys from the cache's backing store with storage.Limit(10),
// truncates the listing to maxNodes entries if it is longer, resolves the
// remaining keys to nodes via getNodes and returns them in the response.
// A listing error is logged and returned unchanged with a nil response.
// The request itself is not inspected here.
func (o *Server) FindStorageNodes(ctx context.Context, req *proto.FindStorageNodesRequest) (*proto.FindStorageNodesResponse, error) {
	// NB: call FilterNodeReputation from node_reputation package to find nodes for storage

	// The first List argument is nil (presumably "no starting key" — confirm
	// against the storage package).
	nodeKeys, listErr := o.cache.DB.List(nil, storage.Limit(10))
	if listErr != nil {
		o.logger.Error("Error listing nodes", zap.Error(listErr))
		return nil, listErr
	}

	// TODO(coyle): determine if this is a set value or the user of the api will specify
	// NOTE(review): with a list limit of 10 and maxNodes at 40 this cap looks
	// unreachable — confirm which of the two values is the intended bound.
	if len(nodeKeys) > maxNodes {
		nodeKeys = nodeKeys[:maxNodes]
	}

	resp := &proto.FindStorageNodesResponse{}
	resp.Nodes = o.getNodes(ctx, nodeKeys)
	return resp, nil
}
// getNodes resolves each key to a node address through the cache, running the
// lookups concurrently (one goroutine per key), and returns the nodes that
// were resolved.
//
// A key whose lookup returns an error is logged together with its node ID and
// left out of the result, so the returned slice may be shorter than keys.
// Nodes are collected in the order the lookups complete, not in the order of
// keys. The returned slice is never nil.
func (o *Server) getNodes(ctx context.Context, keys storage.Keys) []*proto.Node {
	var wg sync.WaitGroup
	// Buffered to len(keys) so no sender can block; that is what makes it safe
	// to wait for every goroutine before draining the channel.
	found := make(chan *proto.Node, len(keys))

	wg.Add(len(keys))
	for _, key := range keys {
		// The ID is passed as an argument so each goroutine works on its own copy.
		go func(id string) {
			defer wg.Done()

			na, err := o.cache.Get(ctx, id)
			if err != nil {
				o.logger.Error("failed to get key from cache", zap.Error(err), zap.String("nodeID", id))
				return
			}
			found <- &proto.Node{Id: id, Address: na}
		}(key.String())
	}

	wg.Wait()
	close(found)

	// All senders are done, so len(found) is the exact number of resolved nodes.
	nodes := make([]*proto.Node, 0, len(found))
	for node := range found {
		nodes = append(nodes, node)
	}
	return nodes
}