From 724aaab78db1fc3f34db4ca5d8c5d013bd86e172 Mon Sep 17 00:00:00 2001 From: Bryan White Date: Wed, 20 Mar 2019 09:30:42 +0100 Subject: [PATCH] fix kademlia bootstrap and getting peer identity from context (#1434) --- pkg/identity/identity.go | 10 ++++++++- pkg/kademlia/dialer.go | 45 ++++++++++++++++++++----------------- pkg/kademlia/dialer_test.go | 36 +++++++++++++++++++++++++---- pkg/kademlia/kademlia.go | 40 +++++++++++++-------------------- 4 files changed, 81 insertions(+), 50 deletions(-) diff --git a/pkg/identity/identity.go b/pkg/identity/identity.go index c18a7b2c3..aa524214a 100644 --- a/pkg/identity/identity.go +++ b/pkg/identity/identity.go @@ -181,7 +181,15 @@ func PeerIdentityFromCerts(leaf, ca *x509.Certificate, rest []*x509.Certificate) // PeerIdentityFromPeer loads a PeerIdentity from a peer connection func PeerIdentityFromPeer(peer *peer.Peer) (*PeerIdentity, error) { - tlsInfo := peer.AuthInfo.(credentials.TLSInfo) + if peer.AuthInfo == nil { + return nil, Error.New("peer AuthInfo is nil") + } + + tlsInfo, ok := peer.AuthInfo.(credentials.TLSInfo) + if !ok { + return nil, Error.New("peer AuthInfo is not credentials.TLSInfo") + } + c := tlsInfo.State.PeerCertificates if len(c) < 2 { return nil, Error.New("invalid certificate chain") diff --git a/pkg/kademlia/dialer.go b/pkg/kademlia/dialer.go index 23f33a39b..718f179b6 100644 --- a/pkg/kademlia/dialer.go +++ b/pkg/kademlia/dialer.go @@ -88,24 +88,8 @@ func (dialer *Dialer) PingNode(ctx context.Context, target pb.Node) (bool, error return err == nil, errs.Combine(err, conn.disconnect()) } -// PingAddress pings target by address (no node ID verification). -func (dialer *Dialer) PingAddress(ctx context.Context, address string, opts ...grpc.CallOption) (bool, error) { - if !dialer.limit.Lock() { - return false, context.Canceled - } - defer dialer.limit.Unlock() - - conn, err := dialer.dialAddress(ctx, address) - if err != nil { - return false, err - } - - _, err = conn.client.Ping(ctx, &pb.PingRequest{}, opts...) - return err == nil, errs.Combine(err, conn.disconnect()) -} - // FetchPeerIdentity connects to a node and returns its peer identity -func (dialer *Dialer) FetchPeerIdentity(ctx context.Context, target pb.Node) (pID *identity.PeerIdentity, err error) { +func (dialer *Dialer) FetchPeerIdentity(ctx context.Context, target pb.Node) (_ *identity.PeerIdentity, err error) { if !dialer.limit.Lock() { return nil, context.Canceled } @@ -120,9 +104,30 @@ func (dialer *Dialer) FetchPeerIdentity(ctx context.Context, target pb.Node) (pI }() p := &peer.Peer{} - pCall := grpc.Peer(p) - _, err = conn.client.Ping(ctx, &pb.PingRequest{}, pCall) - return identity.PeerIdentityFromPeer(p) + _, err = conn.client.Ping(ctx, &pb.PingRequest{}, grpc.Peer(p)) + ident, errFromPeer := identity.PeerIdentityFromPeer(p) + return ident, errs.Combine(err, errFromPeer) +} + +// FetchPeerIdentityUnverified connects to an address and returns its peer identity (no node ID verification). +func (dialer *Dialer) FetchPeerIdentityUnverified(ctx context.Context, address string, opts ...grpc.CallOption) (_ *identity.PeerIdentity, err error) { + if !dialer.limit.Lock() { + return nil, context.Canceled + } + defer dialer.limit.Unlock() + + conn, err := dialer.dialAddress(ctx, address) + if err != nil { + return nil, err + } + defer func() { + err = errs.Combine(err, conn.disconnect()) + }() + + p := &peer.Peer{} + _, err = conn.client.Ping(ctx, &pb.PingRequest{}, grpc.Peer(p)) + ident, errFromPeer := identity.PeerIdentityFromPeer(p) + return ident, errs.Combine(err, errFromPeer) } // FetchInfo connects to a node and returns its node info. diff --git a/pkg/kademlia/dialer_test.go b/pkg/kademlia/dialer_test.go index 6490eb1f6..e9ca6a8b0 100644 --- a/pkg/kademlia/dialer_test.go +++ b/pkg/kademlia/dialer_test.go @@ -34,6 +34,7 @@ func TestDialer(t *testing.T) { defer ctx.Check(dialer.Close) var group errgroup.Group + defer ctx.Check(group.Wait) for _, peer := range peers { peer := peer @@ -46,7 +47,36 @@ func TestDialer(t *testing.T) { return errs.Combine(pingErr, err) }) } + } + + { // FetchPeerIdentity: storage node fetches identity of the satellite + self := planet.StorageNodes[0] + + dialer := kademlia.NewDialer(zaptest.NewLogger(t), self.Transport) + defer ctx.Check(dialer.Close) + + var group errgroup.Group defer ctx.Check(group.Wait) + + group.Go(func() error { + ident, err := dialer.FetchPeerIdentity(ctx, planet.Satellites[0].Local()) + if err != nil { + return fmt.Errorf("failed to fetch peer identity") + } + if ident.ID != planet.Satellites[0].Local().Id { + return fmt.Errorf("fetched wrong identity") + } + + ident, err = dialer.FetchPeerIdentityUnverified(ctx, planet.Satellites[0].Addr()) + if err != nil { + return fmt.Errorf("failed to fetch peer identity from address") + } + if ident.ID != planet.Satellites[0].Local().Id { + return fmt.Errorf("fetched wrong identity from address") + } + + return nil + }) } { // Lookup: storage node query every node for everyone elese @@ -55,6 +85,7 @@ func TestDialer(t *testing.T) { defer ctx.Check(dialer.Close) var group errgroup.Group + defer ctx.Check(group.Wait) for _, peer := range peers { peer := peer @@ -82,8 +113,6 @@ func TestDialer(t *testing.T) { return nil }) } - - defer ctx.Check(group.Wait) } { // Lookup: storage node queries every node for missing storj.NodeID{} and storj.NodeID{255} @@ -97,6 +126,7 @@ func TestDialer(t *testing.T) { } var group errgroup.Group + defer ctx.Check(group.Wait) for _, target := range targets { target := target @@ -118,8 +148,6 @@ func TestDialer(t *testing.T) { }) } } - - defer ctx.Check(group.Wait) } }) } diff --git a/pkg/kademlia/kademlia.go b/pkg/kademlia/kademlia.go index 06a49d964..03e3abd53 100644 --- a/pkg/kademlia/kademlia.go +++ b/pkg/kademlia/kademlia.go @@ -11,8 +11,6 @@ import ( "github.com/zeebo/errs" "go.uber.org/zap" - "google.golang.org/grpc" - "google.golang.org/grpc/peer" "storj.io/storj/internal/sync2" "storj.io/storj/pkg/identity" @@ -121,47 +119,39 @@ func (k *Kademlia) Bootstrap(ctx context.Context) error { return nil } - var errs errs.Group + var errGroup errs.Group + var foundOnlineBootstrap bool for i, node := range k.bootstrapNodes { if ctx.Err() != nil { - errs.Add(ctx.Err()) - return errs.Err() + errGroup.Add(ctx.Err()) + return errGroup.Err() } - p := &peer.Peer{} - pCall := grpc.Peer(p) - _, err := k.dialer.PingAddress(ctx, node.Address.Address, pCall) + ident, err := k.dialer.FetchPeerIdentityUnverified(ctx, node.Address.Address) if err != nil { - errs.Add(err) - } - - ident, err := identity.PeerIdentityFromPeer(p) - if err != nil { - errs.Add(err) + errGroup.Add(err) + continue } k.routingTable.mutex.Lock() node.Id = ident.ID k.bootstrapNodes[i] = node k.routingTable.mutex.Unlock() - if err == nil { - // We have pinged successfully one bootstrap node. - // Clear any errors and break the cycle. - errs = nil - break - } - errs.Add(err) + foundOnlineBootstrap = true } - err := errs.Err() - if err != nil { - return err + + if !foundOnlineBootstrap { + err := errGroup.Err() + if err != nil { + return err + } } //find nodes most similar to self k.routingTable.mutex.Lock() id := k.routingTable.self.Id k.routingTable.mutex.Unlock() - _, err = k.lookup(ctx, id, true) + _, err := k.lookup(ctx, id, true) // TODO(dylan): We do not currently handle this last bit of behavior. // ```