adds pingbackTimeout to kademlia endpoint (#1518)

This commit is contained in:
Natalie Villasana 2019-03-19 14:30:27 -04:00 committed by GitHub
parent c6f8d82978
commit 61ee04d363
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 45 additions and 24 deletions

View File

@ -7,6 +7,7 @@ import (
"context"
"net"
"net/http"
"time"
"github.com/zeebo/errs"
"go.uber.org/zap"
@ -130,13 +131,12 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, config Config) (*P
peer.Transport = peer.Transport.WithObservers(peer.Kademlia.RoutingTable)
// TODO: reduce number of arguments
peer.Kademlia.Service, err = kademlia.NewService(peer.Log.Named("kademlia"), self, config.BootstrapNodes(), peer.Transport, config.Alpha, peer.Kademlia.RoutingTable)
peer.Kademlia.Service, err = kademlia.NewService(peer.Log.Named("kademlia"), self, peer.Transport, peer.Kademlia.RoutingTable, config)
if err != nil {
return nil, errs.Combine(err, peer.Close())
}
peer.Kademlia.Endpoint = kademlia.NewEndpoint(peer.Log.Named("kademlia:endpoint"), peer.Kademlia.Service, peer.Kademlia.RoutingTable)
peer.Kademlia.Endpoint = kademlia.NewEndpoint(peer.Log.Named("kademlia:endpoint"), peer.Kademlia.Service, peer.Kademlia.RoutingTable, 60*time.Second)
pb.RegisterNodesServer(peer.Server.GRPC(), peer.Kademlia.Endpoint)
peer.Kademlia.Inspector = kademlia.NewInspector(peer.Kademlia.Service, peer.Identity)

View File

@ -6,6 +6,7 @@ package kademlia
import (
"context"
"sync/atomic"
"time"
"github.com/zeebo/errs"
"go.uber.org/zap"
@ -18,25 +19,30 @@ var EndpointError = errs.Class("kademlia endpoint error")
// Endpoint implements the kademlia Endpoints
type Endpoint struct {
log *zap.Logger
service *Kademlia
routingTable *RoutingTable
connected int32
log *zap.Logger
service *Kademlia
routingTable *RoutingTable
connected int32
pingbackTimeout time.Duration
}
// NewEndpoint returns a new kademlia endpoint
func NewEndpoint(log *zap.Logger, service *Kademlia, routingTable *RoutingTable) *Endpoint {
func NewEndpoint(log *zap.Logger, service *Kademlia, routingTable *RoutingTable, pingbackTimeout time.Duration) *Endpoint {
return &Endpoint{
service: service,
routingTable: routingTable,
log: log,
service: service,
routingTable: routingTable,
log: log,
pingbackTimeout: pingbackTimeout,
}
}
// Query is a node to node communication query
func (endpoint *Endpoint) Query(ctx context.Context, req *pb.QueryRequest) (*pb.QueryResponse, error) {
if req.GetPingback() {
endpoint.pingback(ctx, req.Sender)
timedCtx, cancel := context.WithTimeout(ctx, endpoint.pingbackTimeout)
defer cancel()
endpoint.pingback(timedCtx, req.Sender)
}
nodes, err := endpoint.routingTable.FindNear(req.Target.Id, int(req.Limit))

View File

@ -57,12 +57,12 @@ type Kademlia struct {
}
// NewService returns a newly configured Kademlia instance
func NewService(log *zap.Logger, self pb.Node, bootstrapNodes []pb.Node, transport transport.Client, alpha int, rt *RoutingTable) (*Kademlia, error) {
func NewService(log *zap.Logger, self pb.Node, transport transport.Client, rt *RoutingTable, config Config) (*Kademlia, error) {
k := &Kademlia{
log: log,
alpha: alpha,
alpha: config.Alpha,
routingTable: rt,
bootstrapNodes: bootstrapNodes,
bootstrapNodes: config.BootstrapNodes(),
dialer: NewDialer(log.Named("dialer"), transport),
refreshThreshold: int64(time.Minute),
}

View File

@ -162,7 +162,8 @@ func testNode(ctx *testcontext.Context, name string, t *testing.T, bn []pb.Node)
logger := zaptest.NewLogger(t)
k, err := newKademlia(logger, pb.NodeType_STORAGE, bn, lis.Addr().String(), nil, fid, ctx.Dir(name), defaultAlpha)
assert.NoError(t, err)
s := NewEndpoint(logger, k, k.routingTable)
s := NewEndpoint(logger, k, k.routingTable, 60*time.Second)
// new ident opts
serverOptions, err := tlsopts.NewOptions(fid, tlsopts.Config{})
@ -528,5 +529,15 @@ func newKademlia(log *zap.Logger, nodeType pb.NodeType, bootstrapNodes []pb.Node
}
transportClient := transport.NewClient(tlsOptions, rt)
return NewService(log, self, bootstrapNodes, transportClient, alpha, rt)
kadConfig := Config{
Alpha: alpha,
}
kad, err := NewService(log, self, transportClient, rt, kadConfig)
if err != nil {
return nil, err
}
kad.bootstrapNodes = bootstrapNodes
return kad, nil
}

View File

@ -12,6 +12,7 @@ import (
"net/smtp"
"os"
"path/filepath"
"time"
"github.com/zeebo/errs"
"go.uber.org/zap"
@ -273,13 +274,12 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, config *Config) (*
peer.Transport = peer.Transport.WithObservers(peer.Kademlia.RoutingTable)
}
// TODO: reduce number of arguments
peer.Kademlia.Service, err = kademlia.NewService(peer.Log.Named("kademlia"), self, config.BootstrapNodes(), peer.Transport, config.Alpha, peer.Kademlia.RoutingTable)
peer.Kademlia.Service, err = kademlia.NewService(peer.Log.Named("kademlia"), self, peer.Transport, peer.Kademlia.RoutingTable, config)
if err != nil {
return nil, errs.Combine(err, peer.Close())
}
peer.Kademlia.Endpoint = kademlia.NewEndpoint(peer.Log.Named("kademlia:endpoint"), peer.Kademlia.Service, peer.Kademlia.RoutingTable)
peer.Kademlia.Endpoint = kademlia.NewEndpoint(peer.Log.Named("kademlia:endpoint"), peer.Kademlia.Service, peer.Kademlia.RoutingTable, 60*time.Second)
pb.RegisterNodesServer(peer.Server.GRPC(), peer.Kademlia.Endpoint)
peer.Kademlia.Inspector = kademlia.NewInspector(peer.Kademlia.Service, peer.Identity)

View File

@ -62,7 +62,11 @@ func TestOrders(t *testing.T) {
})
require.NoError(t, err)
info := &orders.Info{limit, order, uplink.PeerIdentity()}
info := &orders.Info{
Limit: limit,
Order: order,
Uplink: uplink.PeerIdentity(),
}
// basic add
err = ordersdb.Enqueue(ctx, info)

View File

@ -5,6 +5,7 @@ package storagenode
import (
"context"
"time"
"github.com/zeebo/errs"
"go.uber.org/zap"
@ -158,13 +159,12 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, config Config) (*P
peer.Transport = peer.Transport.WithObservers(peer.Kademlia.RoutingTable)
// TODO: reduce number of arguments
peer.Kademlia.Service, err = kademlia.NewService(peer.Log.Named("kademlia"), self, config.BootstrapNodes(), peer.Transport, config.Alpha, peer.Kademlia.RoutingTable)
peer.Kademlia.Service, err = kademlia.NewService(peer.Log.Named("kademlia"), self, peer.Transport, peer.Kademlia.RoutingTable, config)
if err != nil {
return nil, errs.Combine(err, peer.Close())
}
peer.Kademlia.Endpoint = kademlia.NewEndpoint(peer.Log.Named("kademlia:endpoint"), peer.Kademlia.Service, peer.Kademlia.RoutingTable)
peer.Kademlia.Endpoint = kademlia.NewEndpoint(peer.Log.Named("kademlia:endpoint"), peer.Kademlia.Service, peer.Kademlia.RoutingTable, 60*time.Second)
pb.RegisterNodesServer(peer.Server.GRPC(), peer.Kademlia.Endpoint)
peer.Kademlia.Inspector = kademlia.NewInspector(peer.Kademlia.Service, peer.Identity)