fix kademlia bootstrap and getting peer identity from context (#1434)
This commit is contained in:
parent
61ee04d363
commit
724aaab78d
@ -181,7 +181,15 @@ func PeerIdentityFromCerts(leaf, ca *x509.Certificate, rest []*x509.Certificate)
|
||||
|
||||
// PeerIdentityFromPeer loads a PeerIdentity from a peer connection
|
||||
func PeerIdentityFromPeer(peer *peer.Peer) (*PeerIdentity, error) {
|
||||
tlsInfo := peer.AuthInfo.(credentials.TLSInfo)
|
||||
if peer.AuthInfo == nil {
|
||||
return nil, Error.New("peer AuthInfo is nil")
|
||||
}
|
||||
|
||||
tlsInfo, ok := peer.AuthInfo.(credentials.TLSInfo)
|
||||
if !ok {
|
||||
return nil, Error.New("peer AuthInfo is not credentials.TLSInfo")
|
||||
}
|
||||
|
||||
c := tlsInfo.State.PeerCertificates
|
||||
if len(c) < 2 {
|
||||
return nil, Error.New("invalid certificate chain")
|
||||
|
@ -88,24 +88,8 @@ func (dialer *Dialer) PingNode(ctx context.Context, target pb.Node) (bool, error
|
||||
return err == nil, errs.Combine(err, conn.disconnect())
|
||||
}
|
||||
|
||||
// PingAddress pings target by address (no node ID verification).
|
||||
func (dialer *Dialer) PingAddress(ctx context.Context, address string, opts ...grpc.CallOption) (bool, error) {
|
||||
if !dialer.limit.Lock() {
|
||||
return false, context.Canceled
|
||||
}
|
||||
defer dialer.limit.Unlock()
|
||||
|
||||
conn, err := dialer.dialAddress(ctx, address)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
_, err = conn.client.Ping(ctx, &pb.PingRequest{}, opts...)
|
||||
return err == nil, errs.Combine(err, conn.disconnect())
|
||||
}
|
||||
|
||||
// FetchPeerIdentity connects to a node and returns its peer identity
|
||||
func (dialer *Dialer) FetchPeerIdentity(ctx context.Context, target pb.Node) (pID *identity.PeerIdentity, err error) {
|
||||
func (dialer *Dialer) FetchPeerIdentity(ctx context.Context, target pb.Node) (_ *identity.PeerIdentity, err error) {
|
||||
if !dialer.limit.Lock() {
|
||||
return nil, context.Canceled
|
||||
}
|
||||
@ -120,9 +104,30 @@ func (dialer *Dialer) FetchPeerIdentity(ctx context.Context, target pb.Node) (pI
|
||||
}()
|
||||
|
||||
p := &peer.Peer{}
|
||||
pCall := grpc.Peer(p)
|
||||
_, err = conn.client.Ping(ctx, &pb.PingRequest{}, pCall)
|
||||
return identity.PeerIdentityFromPeer(p)
|
||||
_, err = conn.client.Ping(ctx, &pb.PingRequest{}, grpc.Peer(p))
|
||||
ident, errFromPeer := identity.PeerIdentityFromPeer(p)
|
||||
return ident, errs.Combine(err, errFromPeer)
|
||||
}
|
||||
|
||||
// FetchPeerIdentityUnverified connects to an address and returns its peer identity (no node ID verification).
|
||||
func (dialer *Dialer) FetchPeerIdentityUnverified(ctx context.Context, address string, opts ...grpc.CallOption) (_ *identity.PeerIdentity, err error) {
|
||||
if !dialer.limit.Lock() {
|
||||
return nil, context.Canceled
|
||||
}
|
||||
defer dialer.limit.Unlock()
|
||||
|
||||
conn, err := dialer.dialAddress(ctx, address)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer func() {
|
||||
err = errs.Combine(err, conn.disconnect())
|
||||
}()
|
||||
|
||||
p := &peer.Peer{}
|
||||
_, err = conn.client.Ping(ctx, &pb.PingRequest{}, grpc.Peer(p))
|
||||
ident, errFromPeer := identity.PeerIdentityFromPeer(p)
|
||||
return ident, errs.Combine(err, errFromPeer)
|
||||
}
|
||||
|
||||
// FetchInfo connects to a node and returns its node info.
|
||||
|
@ -34,6 +34,7 @@ func TestDialer(t *testing.T) {
|
||||
defer ctx.Check(dialer.Close)
|
||||
|
||||
var group errgroup.Group
|
||||
defer ctx.Check(group.Wait)
|
||||
|
||||
for _, peer := range peers {
|
||||
peer := peer
|
||||
@ -46,7 +47,36 @@ func TestDialer(t *testing.T) {
|
||||
return errs.Combine(pingErr, err)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
{ // FetchPeerIdentity: storage node fetches identity of the satellite
|
||||
self := planet.StorageNodes[0]
|
||||
|
||||
dialer := kademlia.NewDialer(zaptest.NewLogger(t), self.Transport)
|
||||
defer ctx.Check(dialer.Close)
|
||||
|
||||
var group errgroup.Group
|
||||
defer ctx.Check(group.Wait)
|
||||
|
||||
group.Go(func() error {
|
||||
ident, err := dialer.FetchPeerIdentity(ctx, planet.Satellites[0].Local())
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to fetch peer identity")
|
||||
}
|
||||
if ident.ID != planet.Satellites[0].Local().Id {
|
||||
return fmt.Errorf("fetched wrong identity")
|
||||
}
|
||||
|
||||
ident, err = dialer.FetchPeerIdentityUnverified(ctx, planet.Satellites[0].Addr())
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to fetch peer identity from address")
|
||||
}
|
||||
if ident.ID != planet.Satellites[0].Local().Id {
|
||||
return fmt.Errorf("fetched wrong identity from address")
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
{ // Lookup: storage node query every node for everyone elese
|
||||
@ -55,6 +85,7 @@ func TestDialer(t *testing.T) {
|
||||
defer ctx.Check(dialer.Close)
|
||||
|
||||
var group errgroup.Group
|
||||
defer ctx.Check(group.Wait)
|
||||
|
||||
for _, peer := range peers {
|
||||
peer := peer
|
||||
@ -82,8 +113,6 @@ func TestDialer(t *testing.T) {
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
defer ctx.Check(group.Wait)
|
||||
}
|
||||
|
||||
{ // Lookup: storage node queries every node for missing storj.NodeID{} and storj.NodeID{255}
|
||||
@ -97,6 +126,7 @@ func TestDialer(t *testing.T) {
|
||||
}
|
||||
|
||||
var group errgroup.Group
|
||||
defer ctx.Check(group.Wait)
|
||||
|
||||
for _, target := range targets {
|
||||
target := target
|
||||
@ -118,8 +148,6 @@ func TestDialer(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
defer ctx.Check(group.Wait)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
@ -11,8 +11,6 @@ import (
|
||||
|
||||
"github.com/zeebo/errs"
|
||||
"go.uber.org/zap"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/peer"
|
||||
|
||||
"storj.io/storj/internal/sync2"
|
||||
"storj.io/storj/pkg/identity"
|
||||
@ -121,47 +119,39 @@ func (k *Kademlia) Bootstrap(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
var errs errs.Group
|
||||
var errGroup errs.Group
|
||||
var foundOnlineBootstrap bool
|
||||
for i, node := range k.bootstrapNodes {
|
||||
if ctx.Err() != nil {
|
||||
errs.Add(ctx.Err())
|
||||
return errs.Err()
|
||||
errGroup.Add(ctx.Err())
|
||||
return errGroup.Err()
|
||||
}
|
||||
|
||||
p := &peer.Peer{}
|
||||
pCall := grpc.Peer(p)
|
||||
_, err := k.dialer.PingAddress(ctx, node.Address.Address, pCall)
|
||||
ident, err := k.dialer.FetchPeerIdentityUnverified(ctx, node.Address.Address)
|
||||
if err != nil {
|
||||
errs.Add(err)
|
||||
}
|
||||
|
||||
ident, err := identity.PeerIdentityFromPeer(p)
|
||||
if err != nil {
|
||||
errs.Add(err)
|
||||
errGroup.Add(err)
|
||||
continue
|
||||
}
|
||||
|
||||
k.routingTable.mutex.Lock()
|
||||
node.Id = ident.ID
|
||||
k.bootstrapNodes[i] = node
|
||||
k.routingTable.mutex.Unlock()
|
||||
if err == nil {
|
||||
// We have pinged successfully one bootstrap node.
|
||||
// Clear any errors and break the cycle.
|
||||
errs = nil
|
||||
break
|
||||
foundOnlineBootstrap = true
|
||||
}
|
||||
errs.Add(err)
|
||||
}
|
||||
err := errs.Err()
|
||||
|
||||
if !foundOnlineBootstrap {
|
||||
err := errGroup.Err()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
//find nodes most similar to self
|
||||
k.routingTable.mutex.Lock()
|
||||
id := k.routingTable.self.Id
|
||||
k.routingTable.mutex.Unlock()
|
||||
_, err = k.lookup(ctx, id, true)
|
||||
_, err := k.lookup(ctx, id, true)
|
||||
|
||||
// TODO(dylan): We do not currently handle this last bit of behavior.
|
||||
// ```
|
||||
|
Loading…
Reference in New Issue
Block a user