storj/pkg/kademlia/kademlia_planet_test.go

259 lines
7.3 KiB
Go

// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package kademlia_test
import (
"context"
"io"
"net"
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/zeebo/errs"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc/codes"
"storj.io/storj/internal/errs2"
"storj.io/storj/internal/memory"
"storj.io/storj/internal/testcontext"
"storj.io/storj/internal/testplanet"
"storj.io/storj/pkg/kademlia"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/peertls/tlsopts"
"storj.io/storj/pkg/transport"
"storj.io/storj/satellite"
"storj.io/storj/storagenode"
)
func TestFetchPeerIdentity(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 0,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
sat := planet.Satellites[0]
peerID, err := planet.StorageNodes[0].Kademlia.Service.FetchPeerIdentity(ctx, sat.ID())
require.NoError(t, err)
require.Equal(t, sat.ID(), peerID.ID)
require.True(t, sat.Identity.Leaf.Equal(peerID.Leaf))
require.True(t, sat.Identity.CA.Equal(peerID.CA))
})
}
func TestRequestInfo(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 0,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
node := planet.StorageNodes[0]
info, err := planet.Satellites[0].Kademlia.Service.FetchInfo(ctx, node.Local().Node)
require.NoError(t, err)
require.Equal(t, node.Local().Type, info.GetType())
require.Empty(t, cmp.Diff(node.Local().Operator, *info.GetOperator(), cmp.Comparer(pb.Equal)))
require.Empty(t, cmp.Diff(node.Local().Capacity, *info.GetCapacity(), cmp.Comparer(pb.Equal)))
require.Empty(t, cmp.Diff(node.Local().Version, *info.GetVersion(), cmp.Comparer(pb.Equal)))
})
}
func TestRequestInfoUntrusted(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 0,
Reconfigure: testplanet.Reconfigure{
StorageNode: func(index int, config *storagenode.Config) {
config.Storage.WhitelistedSatellites = nil
},
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
_, err := planet.Satellites[0].Kademlia.Service.FetchInfo(ctx, planet.StorageNodes[0].Local().Node)
require.Error(t, err)
assert.True(t, errs2.IsRPC(err, codes.PermissionDenied), "unexpected error: %+v", err)
})
}
func TestPingTimeout(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 0,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
self := planet.StorageNodes[0]
routingTable := self.Kademlia.RoutingTable
tlsOpts, err := tlsopts.NewOptions(self.Identity, tlsopts.Config{})
require.NoError(t, err)
self.Transport = transport.NewClientWithTimeouts(tlsOpts, transport.Timeouts{
Request: 1 * time.Millisecond,
})
network := &transport.SimulatedNetwork{
DialLatency: 300 * time.Second,
BytesPerSecond: 1 * memory.KB,
}
slowClient := network.NewClient(self.Transport)
require.NotNil(t, slowClient)
newService, err := kademlia.NewService(zaptest.NewLogger(t), slowClient, routingTable, kademlia.Config{})
require.NoError(t, err)
target := pb.Node{
Id: planet.StorageNodes[2].ID(),
Address: &pb.NodeAddress{
Transport: pb.NodeTransport_TCP_TLS_GRPC,
Address: planet.StorageNodes[2].Addr(),
},
}
_, err = newService.Ping(ctx, target)
require.Error(t, err, context.DeadlineExceeded)
require.True(t, kademlia.NodeErr.Has(err) && transport.Error.Has(err))
})
}
func TestBootstrapBackoffReconnect(t *testing.T) {
// TODO(nat): skipping because flakily erroring with "panic: planet took too long to shutdown"
// or kademlia_planet_test.go:139: dial tcp 127.0.0.1:40409: connect: connection refused
t.Skip("flaky")
ctx := testcontext.New(t)
defer ctx.Cleanup()
log := zaptest.NewLogger(t)
// This sets up an unreliable proxy server which will receive conns from
// storage nodes and the satellite, but drops the connections of the first
// `dropCount` number of connections to the bootstrap node (proxy.target).
// This should test that the Bootstrap function will retry a connection
// if it initially fails.
proxy, err := newBadProxy(log.Named("proxy"), "127.0.0.1:0", 4)
require.NoError(t, err)
planet, err := testplanet.NewCustom(log, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 0,
Reconfigure: testplanet.Reconfigure{
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
config.Kademlia.BootstrapAddr = proxy.listener.Addr().String()
},
StorageNode: func(index int, config *storagenode.Config) {
config.Kademlia.BootstrapAddr = proxy.listener.Addr().String()
config.Kademlia.BootstrapBackoffBase = 100 * time.Millisecond
config.Kademlia.BootstrapBackoffMax = 3 * time.Second
},
},
})
require.NoError(t, err)
// We set the bad proxy's "target" to the bootstrap node's addr
// (which was selected when the new custom planet was set up).
proxy.target = planet.Bootstrap.Addr()
var group errgroup.Group
group.Go(func() error { return proxy.run(ctx) })
defer ctx.Check(group.Wait)
defer ctx.Check(proxy.close)
planet.Start(ctx)
ctx.Check(planet.Shutdown)
}
type badProxy struct {
log *zap.Logger
target string
dropCount int
listener net.Listener
done chan struct{}
}
func newBadProxy(log *zap.Logger, addr string, dropCount int) (*badProxy, error) {
listener, err := net.Listen("tcp", addr)
if err != nil {
return nil, errs.Wrap(err)
}
return &badProxy{
log: log,
target: "",
dropCount: dropCount,
listener: listener,
done: make(chan struct{}),
}, nil
}
func (proxy *badProxy) close() error {
close(proxy.done)
return proxy.listener.Close()
}
func (proxy *badProxy) run(ctx context.Context) error {
var group errgroup.Group
group.Go(func() (err error) {
var connections errs2.Group
defer func() {
var errlist errs.Group
errlist.Add(err)
errlist.Add(connections.Wait()...)
err = errlist.Err()
}()
var conns int
for {
conn, err := proxy.listener.Accept()
if err != nil {
select {
case <-proxy.done:
return nil
default:
}
return errs.Wrap(err)
}
conns++
if conns < proxy.dropCount {
if err := conn.Close(); err != nil {
return errs.Wrap(err)
}
continue
}
connections.Go(func() error {
defer func() {
err = errs.Combine(err, conn.Close())
}()
targetConn, err := net.Dial("tcp", proxy.target)
if err != nil {
return err
}
defer func() { err = errs.Combine(err, targetConn.Close()) }()
var pipe errs2.Group
pipe.Go(func() error {
_, err := io.Copy(targetConn, conn)
// since planet is shutting down a forced close is to be expected
if err != nil {
proxy.log.Debug("copy error", zap.Error(err))
}
return nil
})
pipe.Go(func() error {
_, err := io.Copy(conn, targetConn)
// since planet is shutting down a forced close is to be expected
if err != nil {
proxy.log.Debug("copy error", zap.Error(err))
}
return nil
})
return errs.Combine(pipe.Wait()...)
})
}
})
return errs.Wrap(group.Wait())
}