80727ae90b
* adds netstate rpc server pagination, mocks pagination in test/util.go * updates ns client example, combines ns client and server test to netstate_test, adds pagination to bolt client * better organizes netstate test calls * wip breaking netstate test into smaller tests * wip modularizing netstate tests * adds some test panics * wip netstate test attempts * testing bug in netstate TestDeleteAuth * wip fixes global variable problem, still issues with list * wip fixes get request params and args * fixes bug in path when using MakePointers helper fn * updates mockdb list func, adds test, changes Limit to int * fixes merge conflicts * fixes broken tests from merge * remove unnecessary PointerEntry struct * removes error when Get returns nil value from boltdb * breaks boltdb client tests into smaller tests * renames AssertNoErr test helper to HandleErr * adds StartingKey and Limit parameters to redis list func, adds beginning of redis tests * adds helper func for mockdb List function * if no starting key provided for netstate List, the first value in storage will be used * adds basic pagination for redis List function, adds tests * adds list limit to call in overlay/server.go * streamlines/fixes some nits from review * removes use of obsolete EncryptedUnencryptedSize * uses MockKeyValueStore instead of redis instance in redis client test * changes test to expect nil returned for getting missing key * remove error from `KeyValueStore#Get` * fix bolt test * Merge pull request #1 from bryanchriswhite/nat-pagination remove error from `KeyValueStore#Get` * adds Get returning error back to KeyValueStore interface and affected clients * trying to appease travis: returns errors in Get calls in overlay/cache and cache_test * handles redis get error when no key found
96 lines
2.2 KiB
Go
96 lines
2.2 KiB
Go
// Copyright (C) 2018 Storj Labs, Inc.
|
|
// See LICENSE for copying information.
|
|
|
|
package overlay
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
|
|
"go.uber.org/zap"
|
|
"gopkg.in/spacemonkeygo/monkit.v2"
|
|
"storj.io/storj/pkg/dht"
|
|
|
|
proto "storj.io/storj/protos/overlay" // naming proto to avoid confusion with this package
|
|
"storj.io/storj/storage"
|
|
)
|
|
|
|
const (
|
|
maxNodes = 40
|
|
)
|
|
|
|
// Server implements our overlay RPC service
|
|
type Server struct {
|
|
dht dht.DHT
|
|
cache *Cache
|
|
logger *zap.Logger
|
|
metrics *monkit.Registry
|
|
}
|
|
|
|
// Lookup finds the address of a node in our overlay network
|
|
func (o *Server) Lookup(ctx context.Context, req *proto.LookupRequest) (*proto.LookupResponse, error) {
|
|
na, err := o.cache.Get(ctx, req.NodeID)
|
|
|
|
if err != nil {
|
|
o.logger.Error("Error looking up node", zap.Error(err), zap.String("nodeID", req.NodeID))
|
|
return nil, err
|
|
}
|
|
|
|
return &proto.LookupResponse{
|
|
Node: &proto.Node{
|
|
Id: req.GetNodeID(),
|
|
Address: na,
|
|
},
|
|
}, nil
|
|
}
|
|
|
|
// FindStorageNodes searches the overlay network for nodes that meet the provided requirements
|
|
func (o *Server) FindStorageNodes(ctx context.Context, req *proto.FindStorageNodesRequest) (*proto.FindStorageNodesResponse, error) {
|
|
// NB: call FilterNodeReputation from node_reputation package to find nodes for storage
|
|
keys, err := o.cache.DB.List(nil, storage.Limit(10))
|
|
if err != nil {
|
|
o.logger.Error("Error listing nodes", zap.Error(err))
|
|
return nil, err
|
|
}
|
|
|
|
if len(keys) > maxNodes {
|
|
// TODO(coyle): determine if this is a set value or they user of the api will specify
|
|
keys = keys[:maxNodes]
|
|
}
|
|
|
|
nodes := o.getNodes(ctx, keys)
|
|
|
|
return &proto.FindStorageNodesResponse{
|
|
Nodes: nodes,
|
|
}, nil
|
|
}
|
|
|
|
func (o *Server) getNodes(ctx context.Context, keys storage.Keys) []*proto.Node {
|
|
wg := &sync.WaitGroup{}
|
|
ch := make(chan *proto.Node, len(keys))
|
|
|
|
wg.Add(len(keys))
|
|
for _, v := range keys {
|
|
go func(ch chan *proto.Node, id string) {
|
|
|
|
defer wg.Done()
|
|
na, err := o.cache.Get(ctx, id)
|
|
if err != nil {
|
|
o.logger.Error("failed to get key from cache", zap.Error(err))
|
|
return
|
|
}
|
|
|
|
ch <- &proto.Node{Id: id, Address: na}
|
|
}(ch, v.String())
|
|
}
|
|
|
|
wg.Wait()
|
|
close(ch)
|
|
nodes := []*proto.Node{}
|
|
for node := range ch {
|
|
nodes = append(nodes, node)
|
|
}
|
|
|
|
return nodes
|
|
}
|