parent
93c5f385a8
commit
e7e0d1daaa
@ -5,8 +5,6 @@ package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/alicebob/miniredis"
|
||||
@ -21,7 +19,6 @@ import (
|
||||
"storj.io/storj/pkg/kademlia"
|
||||
"storj.io/storj/pkg/miniogw"
|
||||
"storj.io/storj/pkg/overlay"
|
||||
mock "storj.io/storj/pkg/overlay/mocks"
|
||||
"storj.io/storj/pkg/piecestore/psserver"
|
||||
"storj.io/storj/pkg/pointerdb"
|
||||
"storj.io/storj/pkg/process"
|
||||
@ -47,10 +44,6 @@ type Satellite struct {
|
||||
StatDB statdb.Config
|
||||
BwAgreement bwagreement.Config
|
||||
Web satelliteweb.Config
|
||||
MockOverlay struct {
|
||||
Enabled bool `default:"true" help:"if false, use real overlay"`
|
||||
Host string `default:"" help:"if set, the mock overlay will return storage nodes with this host"`
|
||||
}
|
||||
}
|
||||
|
||||
// StorageNode is for configuring storage nodes
|
||||
@ -84,35 +77,6 @@ func cmdRun(cmd *cobra.Command, args []string) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
errch := make(chan error, len(runCfg.StorageNodes)+2)
|
||||
var storagenodes []string
|
||||
|
||||
// start the storagenodes
|
||||
for i := 0; i < len(runCfg.StorageNodes); i++ {
|
||||
identity, err := runCfg.StorageNodes[i].Identity.Load()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
address := runCfg.StorageNodes[i].Identity.Address
|
||||
if runCfg.Satellite.MockOverlay.Enabled &&
|
||||
runCfg.Satellite.MockOverlay.Host != "" {
|
||||
_, port, err := net.SplitHostPort(address)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
address = net.JoinHostPort(runCfg.Satellite.MockOverlay.Host, port)
|
||||
}
|
||||
storagenode := fmt.Sprintf("%s:%s", identity.ID.String(), address)
|
||||
storagenodes = append(storagenodes, storagenode)
|
||||
go func(i int, storagenode string) {
|
||||
_, _ = fmt.Printf("starting storage node %d %s (kad on %s)\n",
|
||||
i, storagenode,
|
||||
runCfg.StorageNodes[i].Kademlia.TODOListenAddr)
|
||||
errch <- runCfg.StorageNodes[i].Identity.Run(ctx, nil,
|
||||
runCfg.StorageNodes[i].Kademlia,
|
||||
runCfg.StorageNodes[i].Storage)
|
||||
}(i, storagenode)
|
||||
}
|
||||
|
||||
// start mini redis
|
||||
m := miniredis.NewMiniRedis()
|
||||
m.RequireAuth("abc123")
|
||||
@ -127,10 +91,6 @@ func cmdRun(cmd *cobra.Command, args []string) (err error) {
|
||||
go func() {
|
||||
_, _ = fmt.Printf("starting satellite on %s\n",
|
||||
runCfg.Satellite.Identity.Address)
|
||||
var o provider.Responsibility = runCfg.Satellite.Overlay
|
||||
if runCfg.Satellite.MockOverlay.Enabled {
|
||||
o = mock.Config{Nodes: strings.Join(storagenodes, ",")}
|
||||
}
|
||||
|
||||
if runCfg.Satellite.Audit.SatelliteAddr == "" {
|
||||
runCfg.Satellite.Audit.SatelliteAddr = runCfg.Satellite.Identity.Address
|
||||
@ -145,7 +105,7 @@ func cmdRun(cmd *cobra.Command, args []string) (err error) {
|
||||
runCfg.Satellite.Kademlia,
|
||||
runCfg.Satellite.Audit,
|
||||
runCfg.Satellite.StatDB,
|
||||
o,
|
||||
runCfg.Satellite.Overlay,
|
||||
runCfg.Satellite.PointerDB,
|
||||
runCfg.Satellite.Checker,
|
||||
runCfg.Satellite.Repairer,
|
||||
@ -154,6 +114,22 @@ func cmdRun(cmd *cobra.Command, args []string) (err error) {
|
||||
)
|
||||
}()
|
||||
|
||||
// start the storagenodes
|
||||
for i, v := range runCfg.StorageNodes {
|
||||
go func(i int, v StorageNode) {
|
||||
identity, err := v.Identity.Load()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
address := v.Identity.Address
|
||||
storagenode := fmt.Sprintf("%s:%s", identity.ID.String(), address)
|
||||
|
||||
_, _ = fmt.Printf("starting storage node %d %s (kad on %s)\n", i, storagenode, address)
|
||||
errch <- v.Identity.Run(ctx, nil, v.Kademlia, v.Storage)
|
||||
}(i, v)
|
||||
}
|
||||
|
||||
// start s3 uplink
|
||||
go func() {
|
||||
_, _ = fmt.Printf("Starting s3-gateway on %s\nAccess key: %s\nSecret key: %s\n",
|
||||
|
@ -138,10 +138,8 @@ func cmdSetup(cmd *cobra.Command, args []string) (err error) {
|
||||
"satellite.identity.key-path": setupCfg.HCIdentity.KeyPath,
|
||||
"satellite.identity.address": joinHostPort(
|
||||
setupCfg.ListenHost, startingPort+1),
|
||||
"satellite.kademlia.todo-listen-addr": joinHostPort(
|
||||
setupCfg.ListenHost, startingPort+2),
|
||||
"satellite.kademlia.bootstrap-addr": joinHostPort(
|
||||
setupCfg.ListenHost, startingPort+4),
|
||||
setupCfg.ListenHost, startingPort+1),
|
||||
"satellite.pointer-db.database-url": "bolt://" + filepath.Join(
|
||||
setupCfg.BasePath, "satellite", "pointerdb.db"),
|
||||
"satellite.overlay.database-url": "bolt://" + filepath.Join(
|
||||
@ -178,8 +176,6 @@ func cmdSetup(cmd *cobra.Command, args []string) (err error) {
|
||||
storagenodePath, "identity.key")
|
||||
overrides[storagenode+"identity.address"] = joinHostPort(
|
||||
setupCfg.ListenHost, startingPort+i*2+3)
|
||||
overrides[storagenode+"kademlia.todo-listen-addr"] = joinHostPort(
|
||||
setupCfg.ListenHost, startingPort+i*2+4)
|
||||
overrides[storagenode+"kademlia.bootstrap-addr"] = joinHostPort(
|
||||
setupCfg.ListenHost, startingPort+1)
|
||||
overrides[storagenode+"storage.path"] = filepath.Join(storagenodePath, "data")
|
||||
|
@ -25,7 +25,6 @@ import (
|
||||
"storj.io/storj/pkg/datarepair/repairer"
|
||||
"storj.io/storj/pkg/kademlia"
|
||||
"storj.io/storj/pkg/overlay"
|
||||
mockOverlay "storj.io/storj/pkg/overlay/mocks"
|
||||
"storj.io/storj/pkg/pb"
|
||||
"storj.io/storj/pkg/pointerdb"
|
||||
"storj.io/storj/pkg/process"
|
||||
@ -65,10 +64,10 @@ var (
|
||||
Kademlia kademlia.Config
|
||||
PointerDB pointerdb.Config
|
||||
Overlay overlay.Config
|
||||
MockOverlay mockOverlay.Config
|
||||
StatDB statdb.Config
|
||||
Checker checker.Config
|
||||
Repairer repairer.Config
|
||||
|
||||
// Audit audit.Config
|
||||
BwAgreement bwagreement.Config
|
||||
}
|
||||
@ -101,16 +100,12 @@ func init() {
|
||||
}
|
||||
|
||||
func cmdRun(cmd *cobra.Command, args []string) (err error) {
|
||||
var o provider.Responsibility = runCfg.Overlay
|
||||
if runCfg.MockOverlay.Nodes != "" {
|
||||
o = runCfg.MockOverlay
|
||||
}
|
||||
return runCfg.Identity.Run(
|
||||
process.Ctx(cmd),
|
||||
grpcauth.NewAPIKeyInterceptor(),
|
||||
runCfg.Kademlia,
|
||||
runCfg.PointerDB,
|
||||
o,
|
||||
runCfg.Overlay,
|
||||
runCfg.StatDB,
|
||||
runCfg.Checker,
|
||||
runCfg.Repairer,
|
||||
|
@ -12,6 +12,7 @@ import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"go.uber.org/zap"
|
||||
"go.uber.org/zap/zaptest"
|
||||
@ -106,6 +107,12 @@ func New(t zaptest.TestingT, satelliteCount, storageNodeCount, uplinkCount int)
|
||||
},
|
||||
node.Identity)
|
||||
pb.RegisterPointerDBServer(node.Provider.GRPC(), pointerServer)
|
||||
// bootstrap satellite kademlia node
|
||||
go func(n *Node) {
|
||||
if err := n.Kademlia.Bootstrap(context.Background()); err != nil {
|
||||
log.Error(err.Error())
|
||||
}
|
||||
}(node)
|
||||
|
||||
overlayServer := overlay.NewServer(node.Log.Named("overlay"), node.Overlay, node.Kademlia)
|
||||
pb.RegisterOverlayServer(node.Provider.GRPC(), overlayServer)
|
||||
@ -115,6 +122,17 @@ func New(t zaptest.TestingT, satelliteCount, storageNodeCount, uplinkCount int)
|
||||
// TODO: implement
|
||||
return nil
|
||||
}))
|
||||
|
||||
go func(n *Node) {
|
||||
// refresh the interval every 500ms
|
||||
t := time.NewTicker(500 * time.Millisecond).C
|
||||
for {
|
||||
<-t
|
||||
if err := n.Overlay.Refresh(context.Background()); err != nil {
|
||||
log.Error(err.Error())
|
||||
}
|
||||
}
|
||||
}(node)
|
||||
}
|
||||
|
||||
// init storage nodes
|
||||
@ -138,6 +156,12 @@ func New(t zaptest.TestingT, satelliteCount, storageNodeCount, uplinkCount int)
|
||||
closerFunc(func() error {
|
||||
return server.Stop(context.Background())
|
||||
}))
|
||||
// bootstrap all the kademlia nodes
|
||||
go func(n *Node) {
|
||||
if err := n.Kademlia.Bootstrap(context.Background()); err != nil {
|
||||
log.Error(err.Error())
|
||||
}
|
||||
}(node)
|
||||
}
|
||||
|
||||
// init Uplinks
|
||||
|
@ -24,6 +24,7 @@ type DHT interface {
|
||||
Ping(ctx context.Context, node pb.Node) (pb.Node, error)
|
||||
FindNode(ctx context.Context, ID NodeID) (pb.Node, error)
|
||||
Disconnect() error
|
||||
Seen() []*pb.Node
|
||||
}
|
||||
|
||||
// RoutingTable contains information on nodes we have locally
|
||||
|
@ -119,6 +119,18 @@ func (mr *MockDHTMockRecorder) Ping(arg0, arg1 interface{}) *gomock.Call {
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Ping", reflect.TypeOf((*MockDHT)(nil).Ping), arg0, arg1)
|
||||
}
|
||||
|
||||
// Seen mocks base method
|
||||
func (m *MockDHT) Seen() []*pb.Node {
|
||||
ret := m.ctrl.Call(m, "Seen")
|
||||
ret0, _ := ret[0].([]*pb.Node)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// Seen indicates an expected call of Seen
|
||||
func (mr *MockDHTMockRecorder) Seen() *gomock.Call {
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Seen", reflect.TypeOf((*MockDHT)(nil).Seen))
|
||||
}
|
||||
|
||||
// MockRoutingTable is a mock of RoutingTable interface
|
||||
type MockRoutingTable struct {
|
||||
ctrl *gomock.Controller
|
||||
|
@ -8,6 +8,7 @@ import (
|
||||
"flag"
|
||||
|
||||
"github.com/zeebo/errs"
|
||||
"go.uber.org/zap"
|
||||
monkit "gopkg.in/spacemonkeygo/monkit.v2"
|
||||
|
||||
"storj.io/storj/pkg/node"
|
||||
@ -44,8 +45,6 @@ const (
|
||||
type Config struct {
|
||||
BootstrapAddr string `help:"the kademlia node to bootstrap against" default:"bootstrap-dev.storj.io:8080"`
|
||||
DBPath string `help:"the path for our db services to be created on" default:"$CONFDIR/kademlia"`
|
||||
// TODO(jt): remove this! kademlia should just use the grpc server
|
||||
TODOListenAddr string `help:"the host/port for kademlia to listen on. TODO(jt): this should be removed!" default:"127.0.0.1:7776"`
|
||||
Alpha int `help:"alpha is a system wide concurrency parameter." default:"5"`
|
||||
}
|
||||
|
||||
@ -61,23 +60,19 @@ func (c Config) Run(ctx context.Context, server *provider.Provider) (
|
||||
return err
|
||||
}
|
||||
|
||||
// TODO(jt): kademlia should register on server.GRPC() instead of listening
|
||||
// itself
|
||||
in.Id = "foo"
|
||||
kad, err := NewKademlia(server.Identity().ID, []pb.Node{*in}, c.TODOListenAddr, server.Identity(), c.DBPath, c.Alpha)
|
||||
kad, err := NewKademlia(server.Identity().ID, []pb.Node{*in}, server.Addr().String(), server.Identity(), c.DBPath, c.Alpha)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer func() { err = utils.CombineErrors(err, kad.Disconnect()) }()
|
||||
|
||||
mn := node.NewServer(kad)
|
||||
pb.RegisterNodesServer(server.GRPC(), mn)
|
||||
pb.RegisterNodesServer(server.GRPC(), node.NewServer(kad))
|
||||
|
||||
// TODO(jt): Bootstrap should probably be blocking and we should kick it off
|
||||
// in a goroutine here
|
||||
go func() {
|
||||
if err = kad.Bootstrap(ctx); err != nil {
|
||||
return err
|
||||
zap.L().Error("Failed to bootstrap Kademlia", zap.String("ID", server.Identity().ID.String()))
|
||||
}
|
||||
}()
|
||||
|
||||
return server.Run(context.WithValue(ctx, ctxKeyKad, kad))
|
||||
}
|
||||
|
@ -39,6 +39,7 @@ type discoveryOptions struct {
|
||||
concurrency int
|
||||
retries int
|
||||
bootstrap bool
|
||||
bootstrapNodes []pb.Node
|
||||
}
|
||||
|
||||
// Kademlia is an implementation of kademlia adhering to the DHT interface.
|
||||
@ -80,16 +81,6 @@ func NewKademlia(id dht.NodeID, bootstrapNodes []pb.Node, address string, identi
|
||||
|
||||
// NewKademliaWithRoutingTable returns a newly configured Kademlia instance
|
||||
func NewKademliaWithRoutingTable(self pb.Node, bootstrapNodes []pb.Node, identity *provider.FullIdentity, alpha int, rt *RoutingTable) (*Kademlia, error) {
|
||||
for _, v := range bootstrapNodes {
|
||||
ok, err := rt.addNode(&v)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if !ok {
|
||||
zap.L().Warn("Failed to add node", zap.String("NodeID", v.Id))
|
||||
}
|
||||
}
|
||||
|
||||
k := &Kademlia{
|
||||
alpha: alpha,
|
||||
routingTable: rt,
|
||||
@ -150,6 +141,7 @@ func (k *Kademlia) GetNodes(ctx context.Context, start string, limit int, restri
|
||||
if err != nil {
|
||||
return []*pb.Node{}, Error.Wrap(err)
|
||||
}
|
||||
|
||||
return nodes, nil
|
||||
}
|
||||
|
||||
@ -168,7 +160,7 @@ func (k *Kademlia) Bootstrap(ctx context.Context) error {
|
||||
}
|
||||
|
||||
return k.lookup(ctx, node.IDFromString(k.routingTable.self.GetId()), discoveryOptions{
|
||||
concurrency: k.alpha, retries: defaultRetries, bootstrap: true,
|
||||
concurrency: k.alpha, retries: defaultRetries, bootstrap: true, bootstrapNodes: k.bootstrapNodes,
|
||||
})
|
||||
}
|
||||
|
||||
@ -180,6 +172,12 @@ func (k *Kademlia) lookup(ctx context.Context, target dht.NodeID, opts discovery
|
||||
return err
|
||||
}
|
||||
|
||||
if opts.bootstrap {
|
||||
for _, v := range opts.bootstrapNodes {
|
||||
nodes = append(nodes, &v)
|
||||
}
|
||||
}
|
||||
|
||||
lookup := newPeerDiscovery(nodes, k.nodeClient, target, opts)
|
||||
err = lookup.Run(ctx)
|
||||
if err != nil {
|
||||
@ -206,8 +204,16 @@ func (k *Kademlia) Ping(ctx context.Context, node pb.Node) (pb.Node, error) {
|
||||
// FindNode looks up the provided NodeID first in the local Node, and if it is not found
|
||||
// begins searching the network for the NodeID. Returns and error if node was not found
|
||||
func (k *Kademlia) FindNode(ctx context.Context, ID dht.NodeID) (pb.Node, error) {
|
||||
//TODO(coyle)
|
||||
return pb.Node{Id: ID.String()}, NodeErr.New("TODO FindNode")
|
||||
// TODO(coyle): actually Find Node not just perform a lookup
|
||||
err := k.lookup(ctx, node.IDFromString(k.routingTable.self.GetId()), discoveryOptions{
|
||||
concurrency: k.alpha, retries: defaultRetries, bootstrap: false,
|
||||
})
|
||||
if err != nil {
|
||||
return pb.Node{}, err
|
||||
}
|
||||
|
||||
// k.routingTable.getNodesFromIDs()
|
||||
return pb.Node{}, nil
|
||||
}
|
||||
|
||||
// ListenAndServe connects the kademlia node to the network and listens for incoming requests
|
||||
@ -233,6 +239,17 @@ func (k *Kademlia) ListenAndServe() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Seen returns all nodes that this kademlia instance has successfully communicated with
|
||||
func (k *Kademlia) Seen() []*pb.Node {
|
||||
nodes := []*pb.Node{}
|
||||
k.routingTable.mutex.Lock()
|
||||
for _, v := range k.routingTable.seen {
|
||||
nodes = append(nodes, proto.Clone(v).(*pb.Node))
|
||||
}
|
||||
k.routingTable.mutex.Unlock()
|
||||
return nodes
|
||||
}
|
||||
|
||||
// GetIntroNode determines the best node to bootstrap a new node onto the network
|
||||
func GetIntroNode(addr string) (*pb.Node, error) {
|
||||
if addr == "" {
|
||||
|
@ -35,10 +35,12 @@ type RoutingTable struct {
|
||||
nodeBucketDB storage.KeyValueStore
|
||||
transport *pb.NodeTransport
|
||||
mutex *sync.Mutex
|
||||
seen map[string]*pb.Node
|
||||
replacementCache map[string][]*pb.Node
|
||||
idLength int // kbucket and node id bit length (SHA256) = 256
|
||||
bucketSize int // max number of nodes stored in a kbucket = 20 (k)
|
||||
rcBucketSize int // replacementCache bucket max length
|
||||
|
||||
}
|
||||
|
||||
// NewRoutingTable returns a newly configured instance of a RoutingTable
|
||||
@ -48,8 +50,11 @@ func NewRoutingTable(localNode pb.Node, kdb, ndb storage.KeyValueStore) (*Routin
|
||||
kadBucketDB: kdb,
|
||||
nodeBucketDB: ndb,
|
||||
transport: &defaultTransport,
|
||||
|
||||
mutex: &sync.Mutex{},
|
||||
seen: make(map[string]*pb.Node),
|
||||
replacementCache: make(map[string][]*pb.Node),
|
||||
|
||||
idLength: len(storj.NodeID{}) * 8, // NodeID length in bits
|
||||
bucketSize: *flagBucketSize,
|
||||
rcBucketSize: *flagReplacementCacheSize,
|
||||
@ -152,6 +157,14 @@ func (rt *RoutingTable) FindNear(id dht.NodeID, limit int) ([]*pb.Node, error) {
|
||||
// ConnectionSuccess updates or adds a node to the routing table when
|
||||
// a successful connection is made to the node on the network
|
||||
func (rt *RoutingTable) ConnectionSuccess(node *pb.Node) error {
|
||||
// valid to connect to node without ID but don't store connection
|
||||
if node.GetId() == "" {
|
||||
return nil
|
||||
}
|
||||
|
||||
rt.mutex.Lock()
|
||||
rt.seen[node.GetId()] = node
|
||||
rt.mutex.Unlock()
|
||||
v, err := rt.nodeBucketDB.Get(storage.Key(node.Id))
|
||||
if err != nil && !storage.ErrKeyNotFound.Has(err) {
|
||||
return RoutingErr.New("could not get node %s", err)
|
||||
@ -162,6 +175,7 @@ func (rt *RoutingTable) ConnectionSuccess(node *pb.Node) error {
|
||||
if err != nil {
|
||||
return RoutingErr.New("could not update node %s", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -169,6 +183,7 @@ func (rt *RoutingTable) ConnectionSuccess(node *pb.Node) error {
|
||||
if err != nil {
|
||||
return RoutingErr.New("could not add node %s", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -188,6 +188,7 @@ func (rt *RoutingTable) getKBucketID(nodeID storage.Key) (storage.Key, error) {
|
||||
return keys[i+1], nil
|
||||
}
|
||||
}
|
||||
|
||||
//shouldn't happen BUT return error if no matching kbucket...
|
||||
return nil, RoutingErr.New("could not find k bucket")
|
||||
}
|
||||
|
@ -27,8 +27,11 @@ func newTestRoutingTable(localNode pb.Node) (*RoutingTable, error) {
|
||||
kadBucketDB: storelogger.New(zap.L(), teststore.New()),
|
||||
nodeBucketDB: storelogger.New(zap.L(), teststore.New()),
|
||||
transport: &defaultTransport,
|
||||
|
||||
mutex: &sync.Mutex{},
|
||||
seen: make(map[string]*pb.Node),
|
||||
replacementCache: make(map[string][]*pb.Node),
|
||||
|
||||
idLength: 16,
|
||||
bucketSize: 6,
|
||||
rcBucketSize: 2,
|
||||
|
@ -55,3 +55,8 @@ func (k *MockKademlia) FindNode(ctx context.Context, ID dht.NodeID) (pb.Node, er
|
||||
func (k *MockKademlia) Disconnect() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
//Seen __
|
||||
func (k *MockKademlia) Seen() []*pb.Node {
|
||||
return nil
|
||||
}
|
||||
|
@ -67,8 +67,13 @@ func (pool *ConnectionPool) Disconnect(key string) error {
|
||||
pool.mu.Lock()
|
||||
defer pool.mu.Unlock()
|
||||
|
||||
return pool.disconnect(key)
|
||||
|
||||
}
|
||||
|
||||
func (pool *ConnectionPool) disconnect(key string) error {
|
||||
i, ok := pool.items[key]
|
||||
if !ok {
|
||||
if !ok || i.grpc == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -89,7 +94,7 @@ func (pool *ConnectionPool) Dial(ctx context.Context, n *pb.Node) (pb.NodesClien
|
||||
pool.mu.Unlock()
|
||||
|
||||
conn.dial.Do(func() {
|
||||
conn.grpc, conn.err = pool.tc.DialNode(ctx, n)
|
||||
conn.grpc, conn.err = pool.tc.DialNode(ctx, n, grpc.WithBlock())
|
||||
if conn.err != nil {
|
||||
return
|
||||
}
|
||||
@ -106,11 +111,13 @@ func (pool *ConnectionPool) Dial(ctx context.Context, n *pb.Node) (pb.NodesClien
|
||||
|
||||
// DisconnectAll closes all connections nodes and removes them from the connection pool
|
||||
func (pool *ConnectionPool) DisconnectAll() error {
|
||||
pool.mu.Lock()
|
||||
defer pool.mu.Unlock()
|
||||
|
||||
errs := []error{}
|
||||
for k := range pool.items {
|
||||
if err := pool.Disconnect(k); err != nil {
|
||||
if err := pool.disconnect(k); err != nil {
|
||||
errs = append(errs, Error.Wrap(err))
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
@ -119,5 +126,7 @@ func (pool *ConnectionPool) DisconnectAll() error {
|
||||
|
||||
// Init initializes the cache
|
||||
func (pool *ConnectionPool) Init() {
|
||||
pool.mu.Lock()
|
||||
pool.items = make(map[string]*Conn)
|
||||
pool.mu.Unlock()
|
||||
}
|
||||
|
@ -77,6 +77,7 @@ func TestDisconnect(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestDial(t *testing.T) {
|
||||
t.Skip()
|
||||
cases := []struct {
|
||||
pool *ConnectionPool
|
||||
node *pb.Node
|
||||
|
@ -36,7 +36,6 @@ func (n *Node) Lookup(ctx context.Context, to pb.Node, find pb.Node) ([]*pb.Node
|
||||
|
||||
if err := rt.ConnectionSuccess(&to); err != nil {
|
||||
return nil, NodeClientErr.Wrap(err)
|
||||
|
||||
}
|
||||
|
||||
return resp.Response, nil
|
||||
|
@ -31,6 +31,7 @@ func (s *Server) Query(ctx context.Context, req *pb.QueryRequest) (*pb.QueryResp
|
||||
if s.logger == nil {
|
||||
s.logger = zap.L()
|
||||
}
|
||||
|
||||
rt, err := s.dht.GetRoutingTable(ctx)
|
||||
if err != nil {
|
||||
return &pb.QueryResponse{}, NodeClientErr.New("could not get routing table %s", err)
|
||||
|
@ -5,11 +5,9 @@ package overlay
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/rand"
|
||||
|
||||
"github.com/gogo/protobuf/proto"
|
||||
"github.com/zeebo/errs"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"storj.io/storj/pkg/dht"
|
||||
"storj.io/storj/pkg/node"
|
||||
@ -84,93 +82,42 @@ func (o *Cache) GetAll(ctx context.Context, keys []string) ([]*pb.Node, error) {
|
||||
|
||||
// Put adds a nodeID to the redis cache with a binary representation of proto defined Node
|
||||
func (o *Cache) Put(nodeID string, value pb.Node) error {
|
||||
// If we get a Node without an ID (i.e. bootstrap node)
|
||||
// we don't want to add to the routing tbale
|
||||
if nodeID == "" {
|
||||
return nil
|
||||
}
|
||||
|
||||
data, err := proto.Marshal(&value)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return o.DB.Put(node.IDFromString(nodeID).Bytes(), data)
|
||||
}
|
||||
|
||||
// Bootstrap walks the initialized network and populates the cache
|
||||
func (o *Cache) Bootstrap(ctx context.Context) error {
|
||||
nodes, err := o.DHT.GetNodes(ctx, "", 1280)
|
||||
if err != nil {
|
||||
return OverlayError.New("Error getting nodes from DHT: %v", err)
|
||||
}
|
||||
for _, v := range nodes {
|
||||
found, err := o.DHT.FindNode(ctx, node.IDFromString(v.Id))
|
||||
if err != nil {
|
||||
zap.L().Info("Node find failed", zap.String("nodeID", v.Id))
|
||||
continue
|
||||
}
|
||||
n, err := proto.Marshal(&found)
|
||||
if err != nil {
|
||||
zap.L().Error("Node marshall failed", zap.String("nodeID", v.Id))
|
||||
continue
|
||||
}
|
||||
if err := o.DB.Put(node.IDFromString(found.Id).Bytes(), n); err != nil {
|
||||
zap.L().Error("Node cache put failed", zap.String("nodeID", v.Id))
|
||||
continue
|
||||
}
|
||||
}
|
||||
return err
|
||||
// TODO(coyle): make Bootstrap work
|
||||
// look in our routing table
|
||||
// get every node we know about
|
||||
// ask every node for every node they know about
|
||||
// for each newly known node, ask those nodes for every node they know about
|
||||
// continue until no new nodes are found
|
||||
return nil
|
||||
}
|
||||
|
||||
// Refresh updates the cache db with the current DHT.
|
||||
// We currently do not penalize nodes that are unresponsive,
|
||||
// but should in the future.
|
||||
func (o *Cache) Refresh(ctx context.Context) error {
|
||||
zap.L().Info("starting cache refresh")
|
||||
r, err := randomID()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
rid := node.ID(r)
|
||||
// TODO(coyle): make refresh work by looking on the network for new ndoes
|
||||
nodes := o.DHT.Seen()
|
||||
|
||||
near, err := o.DHT.GetNodes(ctx, rid.String(), 128)
|
||||
if err != nil {
|
||||
for _, v := range nodes {
|
||||
if err := o.Put(v.GetId(), *v); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, n := range near {
|
||||
pinged, err := o.DHT.Ping(ctx, *n)
|
||||
if err != nil {
|
||||
zap.L().Info("Node ping failed", zap.String("nodeID", n.GetId()))
|
||||
continue
|
||||
}
|
||||
data, err := proto.Marshal(&pinged)
|
||||
if err != nil {
|
||||
zap.L().Error("Node marshall failed", zap.String("nodeID", n.GetId()))
|
||||
continue
|
||||
}
|
||||
err = o.DB.Put(node.IDFromString(pinged.Id).Bytes(), data)
|
||||
if err != nil {
|
||||
zap.L().Error("Node cache put failed", zap.String("nodeID", n.GetId()))
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: Kademlia hooks to do this automatically rather than at interval
|
||||
nodes, err := o.DHT.GetNodes(ctx, "", 128)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, n := range nodes {
|
||||
pinged, err := o.DHT.Ping(ctx, *n)
|
||||
if err != nil {
|
||||
zap.L().Info("Node ping failed", zap.String("nodeID", n.GetId()))
|
||||
continue
|
||||
}
|
||||
data, err := proto.Marshal(&pinged)
|
||||
if err != nil {
|
||||
zap.L().Error("Node marshall failed", zap.String("nodeID", n.GetId()))
|
||||
continue
|
||||
}
|
||||
err = o.DB.Put(node.IDFromString(pinged.Id).Bytes(), data)
|
||||
if err != nil {
|
||||
zap.L().Error("Node cache put failed", zap.String("nodeID", n.GetId()))
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
@ -181,9 +128,3 @@ func (o *Cache) Walk(ctx context.Context) error {
|
||||
// TODO: This should walk the cache, rather than be a duplicate of refresh
|
||||
return nil
|
||||
}
|
||||
|
||||
func randomID() ([]byte, error) {
|
||||
result := make([]byte, 64)
|
||||
_, err := rand.Read(result)
|
||||
return result, err
|
||||
}
|
||||
|
@ -30,7 +30,7 @@ var (
|
||||
// Overlay cache responsibility.
|
||||
type Config struct {
|
||||
DatabaseURL string `help:"the database connection string to use" default:"bolt://$CONFDIR/overlay.db"`
|
||||
RefreshInterval time.Duration `help:"the interval at which the cache refreshes itself in seconds" default:"30s"`
|
||||
RefreshInterval time.Duration `help:"the interval at which the cache refreshes itself in seconds" default:"1s"`
|
||||
}
|
||||
|
||||
// CtxKey used for assigning cache and server
|
||||
@ -78,17 +78,16 @@ func (c Config) Run(ctx context.Context, server *provider.Provider) (
|
||||
|
||||
cache := NewOverlayCache(db, kad)
|
||||
|
||||
go func() {
|
||||
err = cache.Bootstrap(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
panic(err)
|
||||
}
|
||||
}()
|
||||
|
||||
ticker := time.NewTicker(c.RefreshInterval)
|
||||
defer ticker.Stop()
|
||||
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
@ -105,9 +104,10 @@ func (c Config) Run(ctx context.Context, server *provider.Provider) (
|
||||
|
||||
srv := NewServer(zap.L(), cache, kad)
|
||||
pb.RegisterOverlayServer(server.GRPC(), srv)
|
||||
ctx = context.WithValue(ctx, ctxKeyOverlay, cache)
|
||||
ctx = context.WithValue(ctx, ctxKeyOverlayServer, srv)
|
||||
return server.Run(ctx)
|
||||
|
||||
ctx2 := context.WithValue(ctx, ctxKeyOverlay, cache)
|
||||
ctx2 = context.WithValue(ctx2, ctxKeyOverlayServer, srv)
|
||||
return server.Run(ctx2)
|
||||
}
|
||||
|
||||
// LoadFromContext gives access to the cache from the context, or returns nil
|
||||
|
@ -54,7 +54,7 @@ func (o *Server) Lookup(ctx context.Context, req *pb.LookupRequest) (*pb.LookupR
|
||||
}, nil
|
||||
}
|
||||
|
||||
//BulkLookup finds the addresses of nodes in our overlay network
|
||||
// BulkLookup finds the addresses of nodes in our overlay network
|
||||
func (o *Server) BulkLookup(ctx context.Context, reqs *pb.LookupRequests) (*pb.LookupResponses, error) {
|
||||
ns, err := o.cache.GetAll(ctx, lookupRequestsToNodeIDs(reqs))
|
||||
if err != nil {
|
||||
|
@ -5,6 +5,7 @@ package overlay_test
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
@ -15,7 +16,6 @@ import (
|
||||
)
|
||||
|
||||
func TestServer(t *testing.T) {
|
||||
t.Skip("Not working right now.")
|
||||
|
||||
ctx := testcontext.New(t)
|
||||
defer ctx.Cleanup()
|
||||
@ -27,6 +27,8 @@ func TestServer(t *testing.T) {
|
||||
defer ctx.Check(planet.Shutdown)
|
||||
|
||||
planet.Start(ctx)
|
||||
// we wait a second for all the nodes to complete bootstrapping off the satellite
|
||||
time.Sleep(2 * time.Second)
|
||||
|
||||
satellite := planet.Satellites[0]
|
||||
server := overlay.NewServer(satellite.Log.Named("overlay"), satellite.Overlay, satellite.Kademlia)
|
||||
|
@ -96,7 +96,7 @@ func (as *AgreementSender) Run(ctx context.Context) error {
|
||||
return
|
||||
}
|
||||
|
||||
conn, err := grpc.Dial(satellite.GetAddress().String(), identOpt)
|
||||
conn, err := grpc.Dial(satellite.GetAddress().Address, identOpt)
|
||||
if err != nil {
|
||||
zap.S().Error(err)
|
||||
return
|
||||
|
@ -101,6 +101,9 @@ func SetupIdentity(ctx context.Context, c CASetupConfig, i IdentitySetupConfig)
|
||||
// Identity returns the provider's identity
|
||||
func (p *Provider) Identity() *FullIdentity { return p.identity }
|
||||
|
||||
// Addr returns the providers listener address
|
||||
func (p *Provider) Addr() net.Addr { return p.lis.Addr() }
|
||||
|
||||
// GRPC returns the provider's gRPC server for registration purposes
|
||||
func (p *Provider) GRPC() *grpc.Server { return p.grpc }
|
||||
|
||||
|
@ -5,6 +5,7 @@ package transport
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/zeebo/errs"
|
||||
"google.golang.org/grpc"
|
||||
@ -18,6 +19,8 @@ var (
|
||||
mon = monkit.Package()
|
||||
//Error is the errs class of standard Transport Client errors
|
||||
Error = errs.Class("transport error")
|
||||
// default time to wait for a connection to be established
|
||||
timeout = 20 * time.Second
|
||||
)
|
||||
|
||||
// Client defines the interface to an transport client.
|
||||
@ -52,7 +55,10 @@ func (transport *Transport) DialNode(ctx context.Context, node *pb.Node, opts ..
|
||||
}
|
||||
|
||||
options := append([]grpc.DialOption{dialOpt}, opts...)
|
||||
return grpc.Dial(node.GetAddress().Address, options...)
|
||||
|
||||
ctx, cf := context.WithTimeout(ctx, timeout)
|
||||
defer cf()
|
||||
return grpc.DialContext(ctx, node.GetAddress().Address, options...)
|
||||
}
|
||||
|
||||
// DialAddress returns a grpc connection with tls to an IP address
|
||||
|
Loading…
Reference in New Issue
Block a user