From 29b96a666b0a65b6e600da7c47a1784464b83ead Mon Sep 17 00:00:00 2001 From: Jennifer Li Johnson Date: Fri, 27 Sep 2019 09:47:57 -0600 Subject: [PATCH] internal/testplanet: fix conn leak (#3132) --- internal/testplanet/planet_test.go | 2 +- .../kademliaclient/kademliaclient_test.go | 278 ------------------ storagenode/contact/contact_test.go | 16 + storagenode/contact/service.go | 17 +- 4 files changed, 25 insertions(+), 288 deletions(-) delete mode 100644 pkg/kademlia/kademliaclient/kademliaclient_test.go diff --git a/internal/testplanet/planet_test.go b/internal/testplanet/planet_test.go index bf75013c8..43167a9ce 100644 --- a/internal/testplanet/planet_test.go +++ b/internal/testplanet/planet_test.go @@ -44,6 +44,7 @@ func TestBasic(t *testing.T) { node := sn.Local() conn, err := sn.Dialer.DialNode(ctx, &satellite) require.NoError(t, err) + defer ctx.Check(conn.Close) _, err = conn.NodeClient().CheckIn(ctx, &pb.CheckInRequest{ Address: node.GetAddress().GetAddress(), Version: &node.Version, @@ -52,7 +53,6 @@ func TestBasic(t *testing.T) { }) require.NoError(t, err) } - } // wait a bit to see whether some failures occur time.Sleep(time.Second) diff --git a/pkg/kademlia/kademliaclient/kademliaclient_test.go b/pkg/kademlia/kademliaclient/kademliaclient_test.go deleted file mode 100644 index 8632c9a3b..000000000 --- a/pkg/kademlia/kademliaclient/kademliaclient_test.go +++ /dev/null @@ -1,278 +0,0 @@ -// Copyright (C) 2019 Storj Labs, Inc. -// See LICENSE for copying information. - -package kademliaclient_test - -//func TestDialer(t *testing.T) { -// ctx := testcontext.New(t) -// defer ctx.Cleanup() -// -// planet, err := testplanet.New(t, 1, 4, 3) -// require.NoError(t, err) -// defer ctx.Check(planet.Shutdown) -// -// planet.Start(ctx) -// -// expectedKademliaEntries := len(planet.Satellites) + len(planet.StorageNodes) -// -// // TODO: also use satellites -// peers := planet.StorageNodes -// -// { // PingNode: storage node pings all other storage nodes -// self := planet.StorageNodes[0] -// -// dialer := kademliaclient.NewDialer(zaptest.NewLogger(t), self.Transport) -// defer ctx.Check(dialer.Close) -// -// var group errgroup.Group -// defer ctx.Check(group.Wait) -// -// for _, peer := range peers { -// peer := peer -// group.Go(func() error { -// pinged, err := dialer.PingNode(ctx, peer.Local().Node) -// var pingErr error -// if !pinged { -// pingErr = fmt.Errorf("ping to %s should have succeeded", peer.ID()) -// } -// return errs.Combine(pingErr, err) -// }) -// } -// } -// -// { // FetchPeerIdentity: storage node fetches identity of the satellite -// self := planet.StorageNodes[0] -// -// dialer := kademliaclient.NewDialer(zaptest.NewLogger(t), self.Transport) -// defer ctx.Check(dialer.Close) -// -// var group errgroup.Group -// defer ctx.Check(group.Wait) -// -// group.Go(func() error { -// ident, err := dialer.FetchPeerIdentity(ctx, planet.Satellites[0].Local().Node) -// if err != nil { -// return fmt.Errorf("failed to fetch peer identity") -// } -// if ident.ID != planet.Satellites[0].Local().Id { -// return fmt.Errorf("fetched wrong identity") -// } -// -// ident, err = dialer.FetchPeerIdentityUnverified(ctx, planet.Satellites[0].Addr()) -// if err != nil { -// return fmt.Errorf("failed to fetch peer identity from address") -// } -// if ident.ID != planet.Satellites[0].Local().Id { -// return fmt.Errorf("fetched wrong identity from address") -// } -// -// return nil -// }) -// } -// -// { // Lookup: storage node query every node for everyone elese -// self := planet.StorageNodes[1] -// dialer := kademliaclient.NewDialer(zaptest.NewLogger(t), self.Transport) -// defer ctx.Check(dialer.Close) -// -// var group errgroup.Group -// defer ctx.Check(group.Wait) -// -// for _, peer := range peers { -// peer := peer -// group.Go(func() error { -// for _, target := range peers { -// errTag := fmt.Errorf("lookup peer:%s target:%s", peer.ID(), target.ID()) -// -// selfnode := self.Local().Node -// results, err := dialer.Lookup(ctx, &selfnode, peer.Local().Node, target.Local().Node.Id, self.Kademlia.RoutingTable.K()) -// if err != nil { -// return errs.Combine(errTag, err) -// } -// -// if containsResult(results, target.ID()) { -// continue -// } -// -// // with small network we expect to return everything -// if len(results) != expectedKademliaEntries { -// return errs.Combine(errTag, fmt.Errorf("expected %d got %d: %s", expectedKademliaEntries, len(results), pb.NodesToIDs(results))) -// } -// return nil -// } -// return nil -// }) -// } -// } -// -// { // Lookup: storage node queries every node for missing storj.NodeID{} and storj.NodeID{255} -// self := planet.StorageNodes[2] -// dialer := kademliaclient.NewDialer(zaptest.NewLogger(t), self.Transport) -// defer ctx.Check(dialer.Close) -// -// targets := []storj.NodeID{ -// {}, // empty target -// {255}, // non-empty -// } -// -// var group errgroup.Group -// defer ctx.Check(group.Wait) -// -// for _, target := range targets { -// target := target -// for _, peer := range peers { -// peer := peer -// group.Go(func() error { -// errTag := fmt.Errorf("invalid lookup peer:%s target:%s", peer.ID(), target) -// -// selfnode := self.Local().Node -// results, err := dialer.Lookup(ctx, &selfnode, peer.Local().Node, target, self.Kademlia.RoutingTable.K()) -// if err != nil { -// return errs.Combine(errTag, err) -// } -// -// // with small network we expect to return everything -// if len(results) != expectedKademliaEntries { -// return errs.Combine(errTag, fmt.Errorf("expected %d got %d: %s", expectedKademliaEntries, len(results), pb.NodesToIDs(results))) -// } -// return nil -// }) -// } -// } -// } -//} -// -//func TestSlowDialerHasTimeout(t *testing.T) { -// ctx := testcontext.New(t) -// defer ctx.Cleanup() -// -// planet, err := testplanet.New(t, 1, 4, 0) -// require.NoError(t, err) -// defer ctx.Check(planet.Shutdown) -// -// planet.Start(ctx) -// -// // TODO: also use satellites -// peers := planet.StorageNodes -// -// func() { // PingNode -// self := planet.StorageNodes[0] -// -// tlsOpts, err := tlsopts.NewOptions(self.Identity, tlsopts.Config{}, nil) -// require.NoError(t, err) -// -// self.Transport = transport.NewClientWithTimeouts(tlsOpts, transport.Timeouts{ -// Dial: 20 * time.Millisecond, -// }) -// -// network := &transport.SimulatedNetwork{ -// DialLatency: 200 * time.Second, -// BytesPerSecond: 1 * memory.KB, -// } -// -// slowClient := network.NewClient(self.Transport) -// require.NotNil(t, slowClient) -// -// dialer := kademliaclient.NewDialer(zaptest.NewLogger(t), slowClient) -// defer ctx.Check(dialer.Close) -// -// var group errgroup.Group -// defer ctx.Check(group.Wait) -// -// for _, peer := range peers { -// peer := peer -// group.Go(func() error { -// _, err := dialer.PingNode(ctx, peer.Local().Node) -// if !transport.Error.Has(err) || errs.Unwrap(err) != context.DeadlineExceeded { -// return errs.New("invalid error: %v", err) -// } -// return nil -// }) -// } -// }() -// -// func() { // FetchPeerIdentity -// self := planet.StorageNodes[1] -// -// tlsOpts, err := tlsopts.NewOptions(self.Identity, tlsopts.Config{}, nil) -// require.NoError(t, err) -// -// self.Transport = transport.NewClientWithTimeouts(tlsOpts, transport.Timeouts{ -// Dial: 20 * time.Millisecond, -// }) -// -// network := &transport.SimulatedNetwork{ -// DialLatency: 200 * time.Second, -// BytesPerSecond: 1 * memory.KB, -// } -// -// slowClient := network.NewClient(self.Transport) -// require.NotNil(t, slowClient) -// -// dialer := kademliaclient.NewDialer(zaptest.NewLogger(t), slowClient) -// defer ctx.Check(dialer.Close) -// -// var group errgroup.Group -// defer ctx.Check(group.Wait) -// -// group.Go(func() error { -// _, err := dialer.FetchPeerIdentity(ctx, planet.Satellites[0].Local().Node) -// if !transport.Error.Has(err) || errs.Unwrap(err) != context.DeadlineExceeded { -// return errs.New("invalid error: %v", err) -// } -// _, err = dialer.FetchPeerIdentityUnverified(ctx, planet.Satellites[0].Addr()) -// if !transport.Error.Has(err) || errs.Unwrap(err) != context.DeadlineExceeded { -// return errs.New("invalid error: %v", err) -// } -// return nil -// }) -// }() -// -// func() { // Lookup -// self := planet.StorageNodes[2] -// -// tlsOpts, err := tlsopts.NewOptions(self.Identity, tlsopts.Config{}, nil) -// require.NoError(t, err) -// -// self.Transport = transport.NewClientWithTimeouts(tlsOpts, transport.Timeouts{ -// Dial: 20 * time.Millisecond, -// }) -// -// network := &transport.SimulatedNetwork{ -// DialLatency: 200 * time.Second, -// BytesPerSecond: 1 * memory.KB, -// } -// -// slowClient := network.NewClient(self.Transport) -// require.NotNil(t, slowClient) -// -// dialer := kademliaclient.NewDialer(zaptest.NewLogger(t), slowClient) -// defer ctx.Check(dialer.Close) -// -// var group errgroup.Group -// defer ctx.Check(group.Wait) -// -// for _, peer := range peers { -// peer := peer -// group.Go(func() error { -// for _, target := range peers { -// selfnode := self.Local().Node -// _, err := dialer.Lookup(ctx, &selfnode, peer.Local().Node, target.Local().Node.Id, self.Kademlia.RoutingTable.K()) -// if !transport.Error.Has(err) || errs.Unwrap(err) != context.DeadlineExceeded { -// return errs.New("invalid error: %v (peer:%s target:%s)", err, peer.ID(), target.ID()) -// } -// } -// return nil -// }) -// } -// }() -//} -// -//func containsResult(nodes []*pb.Node, target storj.NodeID) bool { -// for _, node := range nodes { -// if node.Id == target { -// return true -// } -// } -// return false -//} diff --git a/storagenode/contact/contact_test.go b/storagenode/contact/contact_test.go index 02152329e..f903a94a7 100644 --- a/storagenode/contact/contact_test.go +++ b/storagenode/contact/contact_test.go @@ -7,6 +7,7 @@ import ( "testing" "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" "storj.io/storj/internal/errs2" "storj.io/storj/internal/testcontext" @@ -121,3 +122,18 @@ func TestRequestInfoEndpointUntrustedSatellite(t *testing.T) { require.True(t, errs2.IsRPC(err, rpcstatus.PermissionDenied)) }) } + +func TestLocalAndUpdateSelf(t *testing.T) { + testplanet.Run(t, testplanet.Config{ + SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 0, + }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { + node := planet.StorageNodes[0] + var group errgroup.Group + group.Go(func() error { + _ = node.Contact.Service.Local() + return nil + }) + node.Contact.Service.UpdateSelf(&pb.NodeCapacity{}) + _ = group.Wait() + }) +} diff --git a/storagenode/contact/service.go b/storagenode/contact/service.go index 197a1c0de..eda9bd2f5 100644 --- a/storagenode/contact/service.go +++ b/storagenode/contact/service.go @@ -34,30 +34,29 @@ type Config struct { type Service struct { log *zap.Logger - mutex *sync.Mutex - self *overlay.NodeDossier + mu sync.Mutex + self *overlay.NodeDossier } // NewService creates a new contact service func NewService(log *zap.Logger, self *overlay.NodeDossier) *Service { return &Service{ - log: log, - mutex: &sync.Mutex{}, - self: self, + log: log, + self: self, } } // Local returns the storagenode node-dossier func (service *Service) Local() overlay.NodeDossier { - service.mutex.Lock() - defer service.mutex.Unlock() + service.mu.Lock() + defer service.mu.Unlock() return *service.self } // UpdateSelf updates the local node with the capacity func (service *Service) UpdateSelf(capacity *pb.NodeCapacity) { - service.mutex.Lock() - defer service.mutex.Unlock() + service.mu.Lock() + defer service.mu.Unlock() if capacity != nil { service.self.Capacity = *capacity }