internal/testplanet: fix conn leak (#3132)
This commit is contained in:
parent
02f68d68d6
commit
29b96a666b
@ -44,6 +44,7 @@ func TestBasic(t *testing.T) {
|
||||
node := sn.Local()
|
||||
conn, err := sn.Dialer.DialNode(ctx, &satellite)
|
||||
require.NoError(t, err)
|
||||
defer ctx.Check(conn.Close)
|
||||
_, err = conn.NodeClient().CheckIn(ctx, &pb.CheckInRequest{
|
||||
Address: node.GetAddress().GetAddress(),
|
||||
Version: &node.Version,
|
||||
@ -52,7 +53,6 @@ func TestBasic(t *testing.T) {
|
||||
})
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
}
|
||||
// wait a bit to see whether some failures occur
|
||||
time.Sleep(time.Second)
|
||||
|
@ -1,278 +0,0 @@
|
||||
// Copyright (C) 2019 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package kademliaclient_test
|
||||
|
||||
//func TestDialer(t *testing.T) {
|
||||
// ctx := testcontext.New(t)
|
||||
// defer ctx.Cleanup()
|
||||
//
|
||||
// planet, err := testplanet.New(t, 1, 4, 3)
|
||||
// require.NoError(t, err)
|
||||
// defer ctx.Check(planet.Shutdown)
|
||||
//
|
||||
// planet.Start(ctx)
|
||||
//
|
||||
// expectedKademliaEntries := len(planet.Satellites) + len(planet.StorageNodes)
|
||||
//
|
||||
// // TODO: also use satellites
|
||||
// peers := planet.StorageNodes
|
||||
//
|
||||
// { // PingNode: storage node pings all other storage nodes
|
||||
// self := planet.StorageNodes[0]
|
||||
//
|
||||
// dialer := kademliaclient.NewDialer(zaptest.NewLogger(t), self.Transport)
|
||||
// defer ctx.Check(dialer.Close)
|
||||
//
|
||||
// var group errgroup.Group
|
||||
// defer ctx.Check(group.Wait)
|
||||
//
|
||||
// for _, peer := range peers {
|
||||
// peer := peer
|
||||
// group.Go(func() error {
|
||||
// pinged, err := dialer.PingNode(ctx, peer.Local().Node)
|
||||
// var pingErr error
|
||||
// if !pinged {
|
||||
// pingErr = fmt.Errorf("ping to %s should have succeeded", peer.ID())
|
||||
// }
|
||||
// return errs.Combine(pingErr, err)
|
||||
// })
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// { // FetchPeerIdentity: storage node fetches identity of the satellite
|
||||
// self := planet.StorageNodes[0]
|
||||
//
|
||||
// dialer := kademliaclient.NewDialer(zaptest.NewLogger(t), self.Transport)
|
||||
// defer ctx.Check(dialer.Close)
|
||||
//
|
||||
// var group errgroup.Group
|
||||
// defer ctx.Check(group.Wait)
|
||||
//
|
||||
// group.Go(func() error {
|
||||
// ident, err := dialer.FetchPeerIdentity(ctx, planet.Satellites[0].Local().Node)
|
||||
// if err != nil {
|
||||
// return fmt.Errorf("failed to fetch peer identity")
|
||||
// }
|
||||
// if ident.ID != planet.Satellites[0].Local().Id {
|
||||
// return fmt.Errorf("fetched wrong identity")
|
||||
// }
|
||||
//
|
||||
// ident, err = dialer.FetchPeerIdentityUnverified(ctx, planet.Satellites[0].Addr())
|
||||
// if err != nil {
|
||||
// return fmt.Errorf("failed to fetch peer identity from address")
|
||||
// }
|
||||
// if ident.ID != planet.Satellites[0].Local().Id {
|
||||
// return fmt.Errorf("fetched wrong identity from address")
|
||||
// }
|
||||
//
|
||||
// return nil
|
||||
// })
|
||||
// }
|
||||
//
|
||||
// { // Lookup: storage node query every node for everyone elese
|
||||
// self := planet.StorageNodes[1]
|
||||
// dialer := kademliaclient.NewDialer(zaptest.NewLogger(t), self.Transport)
|
||||
// defer ctx.Check(dialer.Close)
|
||||
//
|
||||
// var group errgroup.Group
|
||||
// defer ctx.Check(group.Wait)
|
||||
//
|
||||
// for _, peer := range peers {
|
||||
// peer := peer
|
||||
// group.Go(func() error {
|
||||
// for _, target := range peers {
|
||||
// errTag := fmt.Errorf("lookup peer:%s target:%s", peer.ID(), target.ID())
|
||||
//
|
||||
// selfnode := self.Local().Node
|
||||
// results, err := dialer.Lookup(ctx, &selfnode, peer.Local().Node, target.Local().Node.Id, self.Kademlia.RoutingTable.K())
|
||||
// if err != nil {
|
||||
// return errs.Combine(errTag, err)
|
||||
// }
|
||||
//
|
||||
// if containsResult(results, target.ID()) {
|
||||
// continue
|
||||
// }
|
||||
//
|
||||
// // with small network we expect to return everything
|
||||
// if len(results) != expectedKademliaEntries {
|
||||
// return errs.Combine(errTag, fmt.Errorf("expected %d got %d: %s", expectedKademliaEntries, len(results), pb.NodesToIDs(results)))
|
||||
// }
|
||||
// return nil
|
||||
// }
|
||||
// return nil
|
||||
// })
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// { // Lookup: storage node queries every node for missing storj.NodeID{} and storj.NodeID{255}
|
||||
// self := planet.StorageNodes[2]
|
||||
// dialer := kademliaclient.NewDialer(zaptest.NewLogger(t), self.Transport)
|
||||
// defer ctx.Check(dialer.Close)
|
||||
//
|
||||
// targets := []storj.NodeID{
|
||||
// {}, // empty target
|
||||
// {255}, // non-empty
|
||||
// }
|
||||
//
|
||||
// var group errgroup.Group
|
||||
// defer ctx.Check(group.Wait)
|
||||
//
|
||||
// for _, target := range targets {
|
||||
// target := target
|
||||
// for _, peer := range peers {
|
||||
// peer := peer
|
||||
// group.Go(func() error {
|
||||
// errTag := fmt.Errorf("invalid lookup peer:%s target:%s", peer.ID(), target)
|
||||
//
|
||||
// selfnode := self.Local().Node
|
||||
// results, err := dialer.Lookup(ctx, &selfnode, peer.Local().Node, target, self.Kademlia.RoutingTable.K())
|
||||
// if err != nil {
|
||||
// return errs.Combine(errTag, err)
|
||||
// }
|
||||
//
|
||||
// // with small network we expect to return everything
|
||||
// if len(results) != expectedKademliaEntries {
|
||||
// return errs.Combine(errTag, fmt.Errorf("expected %d got %d: %s", expectedKademliaEntries, len(results), pb.NodesToIDs(results)))
|
||||
// }
|
||||
// return nil
|
||||
// })
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
//}
|
||||
//
|
||||
//func TestSlowDialerHasTimeout(t *testing.T) {
|
||||
// ctx := testcontext.New(t)
|
||||
// defer ctx.Cleanup()
|
||||
//
|
||||
// planet, err := testplanet.New(t, 1, 4, 0)
|
||||
// require.NoError(t, err)
|
||||
// defer ctx.Check(planet.Shutdown)
|
||||
//
|
||||
// planet.Start(ctx)
|
||||
//
|
||||
// // TODO: also use satellites
|
||||
// peers := planet.StorageNodes
|
||||
//
|
||||
// func() { // PingNode
|
||||
// self := planet.StorageNodes[0]
|
||||
//
|
||||
// tlsOpts, err := tlsopts.NewOptions(self.Identity, tlsopts.Config{}, nil)
|
||||
// require.NoError(t, err)
|
||||
//
|
||||
// self.Transport = transport.NewClientWithTimeouts(tlsOpts, transport.Timeouts{
|
||||
// Dial: 20 * time.Millisecond,
|
||||
// })
|
||||
//
|
||||
// network := &transport.SimulatedNetwork{
|
||||
// DialLatency: 200 * time.Second,
|
||||
// BytesPerSecond: 1 * memory.KB,
|
||||
// }
|
||||
//
|
||||
// slowClient := network.NewClient(self.Transport)
|
||||
// require.NotNil(t, slowClient)
|
||||
//
|
||||
// dialer := kademliaclient.NewDialer(zaptest.NewLogger(t), slowClient)
|
||||
// defer ctx.Check(dialer.Close)
|
||||
//
|
||||
// var group errgroup.Group
|
||||
// defer ctx.Check(group.Wait)
|
||||
//
|
||||
// for _, peer := range peers {
|
||||
// peer := peer
|
||||
// group.Go(func() error {
|
||||
// _, err := dialer.PingNode(ctx, peer.Local().Node)
|
||||
// if !transport.Error.Has(err) || errs.Unwrap(err) != context.DeadlineExceeded {
|
||||
// return errs.New("invalid error: %v", err)
|
||||
// }
|
||||
// return nil
|
||||
// })
|
||||
// }
|
||||
// }()
|
||||
//
|
||||
// func() { // FetchPeerIdentity
|
||||
// self := planet.StorageNodes[1]
|
||||
//
|
||||
// tlsOpts, err := tlsopts.NewOptions(self.Identity, tlsopts.Config{}, nil)
|
||||
// require.NoError(t, err)
|
||||
//
|
||||
// self.Transport = transport.NewClientWithTimeouts(tlsOpts, transport.Timeouts{
|
||||
// Dial: 20 * time.Millisecond,
|
||||
// })
|
||||
//
|
||||
// network := &transport.SimulatedNetwork{
|
||||
// DialLatency: 200 * time.Second,
|
||||
// BytesPerSecond: 1 * memory.KB,
|
||||
// }
|
||||
//
|
||||
// slowClient := network.NewClient(self.Transport)
|
||||
// require.NotNil(t, slowClient)
|
||||
//
|
||||
// dialer := kademliaclient.NewDialer(zaptest.NewLogger(t), slowClient)
|
||||
// defer ctx.Check(dialer.Close)
|
||||
//
|
||||
// var group errgroup.Group
|
||||
// defer ctx.Check(group.Wait)
|
||||
//
|
||||
// group.Go(func() error {
|
||||
// _, err := dialer.FetchPeerIdentity(ctx, planet.Satellites[0].Local().Node)
|
||||
// if !transport.Error.Has(err) || errs.Unwrap(err) != context.DeadlineExceeded {
|
||||
// return errs.New("invalid error: %v", err)
|
||||
// }
|
||||
// _, err = dialer.FetchPeerIdentityUnverified(ctx, planet.Satellites[0].Addr())
|
||||
// if !transport.Error.Has(err) || errs.Unwrap(err) != context.DeadlineExceeded {
|
||||
// return errs.New("invalid error: %v", err)
|
||||
// }
|
||||
// return nil
|
||||
// })
|
||||
// }()
|
||||
//
|
||||
// func() { // Lookup
|
||||
// self := planet.StorageNodes[2]
|
||||
//
|
||||
// tlsOpts, err := tlsopts.NewOptions(self.Identity, tlsopts.Config{}, nil)
|
||||
// require.NoError(t, err)
|
||||
//
|
||||
// self.Transport = transport.NewClientWithTimeouts(tlsOpts, transport.Timeouts{
|
||||
// Dial: 20 * time.Millisecond,
|
||||
// })
|
||||
//
|
||||
// network := &transport.SimulatedNetwork{
|
||||
// DialLatency: 200 * time.Second,
|
||||
// BytesPerSecond: 1 * memory.KB,
|
||||
// }
|
||||
//
|
||||
// slowClient := network.NewClient(self.Transport)
|
||||
// require.NotNil(t, slowClient)
|
||||
//
|
||||
// dialer := kademliaclient.NewDialer(zaptest.NewLogger(t), slowClient)
|
||||
// defer ctx.Check(dialer.Close)
|
||||
//
|
||||
// var group errgroup.Group
|
||||
// defer ctx.Check(group.Wait)
|
||||
//
|
||||
// for _, peer := range peers {
|
||||
// peer := peer
|
||||
// group.Go(func() error {
|
||||
// for _, target := range peers {
|
||||
// selfnode := self.Local().Node
|
||||
// _, err := dialer.Lookup(ctx, &selfnode, peer.Local().Node, target.Local().Node.Id, self.Kademlia.RoutingTable.K())
|
||||
// if !transport.Error.Has(err) || errs.Unwrap(err) != context.DeadlineExceeded {
|
||||
// return errs.New("invalid error: %v (peer:%s target:%s)", err, peer.ID(), target.ID())
|
||||
// }
|
||||
// }
|
||||
// return nil
|
||||
// })
|
||||
// }
|
||||
// }()
|
||||
//}
|
||||
//
|
||||
//func containsResult(nodes []*pb.Node, target storj.NodeID) bool {
|
||||
// for _, node := range nodes {
|
||||
// if node.Id == target {
|
||||
// return true
|
||||
// }
|
||||
// }
|
||||
// return false
|
||||
//}
|
@ -7,6 +7,7 @@ import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
"golang.org/x/sync/errgroup"
|
||||
|
||||
"storj.io/storj/internal/errs2"
|
||||
"storj.io/storj/internal/testcontext"
|
||||
@ -121,3 +122,18 @@ func TestRequestInfoEndpointUntrustedSatellite(t *testing.T) {
|
||||
require.True(t, errs2.IsRPC(err, rpcstatus.PermissionDenied))
|
||||
})
|
||||
}
|
||||
|
||||
func TestLocalAndUpdateSelf(t *testing.T) {
|
||||
testplanet.Run(t, testplanet.Config{
|
||||
SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 0,
|
||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||
node := planet.StorageNodes[0]
|
||||
var group errgroup.Group
|
||||
group.Go(func() error {
|
||||
_ = node.Contact.Service.Local()
|
||||
return nil
|
||||
})
|
||||
node.Contact.Service.UpdateSelf(&pb.NodeCapacity{})
|
||||
_ = group.Wait()
|
||||
})
|
||||
}
|
||||
|
@ -34,30 +34,29 @@ type Config struct {
|
||||
type Service struct {
|
||||
log *zap.Logger
|
||||
|
||||
mutex *sync.Mutex
|
||||
self *overlay.NodeDossier
|
||||
mu sync.Mutex
|
||||
self *overlay.NodeDossier
|
||||
}
|
||||
|
||||
// NewService creates a new contact service
|
||||
func NewService(log *zap.Logger, self *overlay.NodeDossier) *Service {
|
||||
return &Service{
|
||||
log: log,
|
||||
mutex: &sync.Mutex{},
|
||||
self: self,
|
||||
log: log,
|
||||
self: self,
|
||||
}
|
||||
}
|
||||
|
||||
// Local returns the storagenode node-dossier
|
||||
func (service *Service) Local() overlay.NodeDossier {
|
||||
service.mutex.Lock()
|
||||
defer service.mutex.Unlock()
|
||||
service.mu.Lock()
|
||||
defer service.mu.Unlock()
|
||||
return *service.self
|
||||
}
|
||||
|
||||
// UpdateSelf updates the local node with the capacity
|
||||
func (service *Service) UpdateSelf(capacity *pb.NodeCapacity) {
|
||||
service.mutex.Lock()
|
||||
defer service.mutex.Unlock()
|
||||
service.mu.Lock()
|
||||
defer service.mu.Unlock()
|
||||
if capacity != nil {
|
||||
service.self.Capacity = *capacity
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user