satellite/gracefulexit: only allow one connection per node to graceful exit endpoint (#3357)

This commit is contained in:
Maximillian von Briesen 2019-10-29 13:23:17 -04:00 committed by Yingrong Zhao
parent 30a3205745
commit cd3d3850f9
2 changed files with 142 additions and 1 deletions

View File

@ -54,6 +54,7 @@ type Endpoint struct {
overlay *overlay.Service
metainfo *metainfo.Service
orders *orders.Service
connections *connectionsTracker
peerIdentities overlay.PeerIdentities
config Config
}
@ -113,6 +114,40 @@ func (pm *pendingMap) delete(pieceID storj.PieceID) {
delete(pm.data, pieceID)
}
// connectionsTracker for tracking ongoing connections on this api server
type connectionsTracker struct {
mu sync.RWMutex
data map[storj.NodeID]struct{}
}
// newConnectionsTracker creates a new connectionsTracker and instantiates the map.
func newConnectionsTracker() *connectionsTracker {
return &connectionsTracker{
data: make(map[storj.NodeID]struct{}),
}
}
// tryAdd adds to the map if the node ID is not already added
// it returns true if succeeded and false if already added.
func (pm *connectionsTracker) tryAdd(nodeID storj.NodeID) bool {
pm.mu.Lock()
defer pm.mu.Unlock()
if _, ok := pm.data[nodeID]; ok {
return false
}
pm.data[nodeID] = struct{}{}
return true
}
// delete deletes a node ID from the map.
func (pm *connectionsTracker) delete(nodeID storj.NodeID) {
pm.mu.Lock()
defer pm.mu.Unlock()
delete(pm.data, nodeID)
}
// DRPC returns a DRPC form of the endpoint.
func (endpoint *Endpoint) DRPC() pb.DRPCSatelliteGracefulExitServer {
return &drpcEndpoint{Endpoint: endpoint}
@ -130,13 +165,14 @@ func NewEndpoint(log *zap.Logger, signer signing.Signer, db DB, overlaydb overla
overlay: overlay,
metainfo: metainfo,
orders: orders,
connections: newConnectionsTracker(),
peerIdentities: peerIdentities,
config: config,
}
}
// Process is called by storage nodes to receive pieces to transfer to new nodes and get exit status.
func (endpoint *Endpoint) Process(stream pb.SatelliteGracefulExit_ProcessServer) error {
func (endpoint *Endpoint) Process(stream pb.SatelliteGracefulExit_ProcessServer) (err error) {
return endpoint.doProcess(stream)
}
@ -158,6 +194,14 @@ func (endpoint *Endpoint) doProcess(stream processStream) (err error) {
nodeID := peer.ID
endpoint.log.Debug("graceful exit process", zap.Stringer("node ID", nodeID))
// ensure that only one connection can be opened for a single node at a time
if !endpoint.connections.tryAdd(nodeID) {
return rpcstatus.Error(rpcstatus.PermissionDenied, "Only one concurrent connection allowed for graceful exit")
}
defer func() {
endpoint.connections.delete(nodeID)
}()
eofHandler := func(err error) error {
if err == io.EOF {
endpoint.log.Debug("received EOF when trying to receive messages from storage node", zap.Stringer("node ID", nodeID))

View File

@ -7,11 +7,13 @@ import (
"context"
"io"
"strconv"
"sync"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/zeebo/errs"
"golang.org/x/sync/errgroup"
"storj.io/storj/internal/errs2"
"storj.io/storj/internal/memory"
@ -130,6 +132,101 @@ func TestSuccess(t *testing.T) {
})
}
func TestConcurrentConnections(t *testing.T) {
successThreshold := 8
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1,
StorageNodeCount: successThreshold + 1,
UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
uplinkPeer := planet.Uplinks[0]
satellite := planet.Satellites[0]
satellite.GracefulExit.Chore.Loop.Pause()
rs := &uplink.RSConfig{
MinThreshold: 4,
RepairThreshold: 6,
SuccessThreshold: successThreshold,
MaxThreshold: successThreshold,
}
err := uplinkPeer.UploadWithConfig(ctx, satellite, rs, "testbucket", "test/path1", testrand.Bytes(5*memory.KiB))
require.NoError(t, err)
// check that there are no exiting nodes.
exitingNodeIDs, err := satellite.DB.OverlayCache().GetExitingNodes(ctx)
require.NoError(t, err)
require.Len(t, exitingNodeIDs, 0)
exitingNode, err := findNodeToExit(ctx, planet, 2)
require.NoError(t, err)
var group errgroup.Group
concurrentCalls := 4
var wg sync.WaitGroup
wg.Add(1)
for i := 0; i < concurrentCalls; i++ {
group.Go(func() (err error) {
// connect to satellite so we initiate the exit.
conn, err := exitingNode.Dialer.DialAddressID(ctx, satellite.Addr(), satellite.Identity.ID)
require.NoError(t, err)
defer func() {
err = errs.Combine(err, conn.Close())
}()
client := conn.SatelliteGracefulExitClient()
// wait for "main" call to begin
wg.Wait()
c, err := client.Process(ctx)
require.NoError(t, err)
_, err = c.Recv()
require.Error(t, err)
require.True(t, errs2.IsRPC(err, rpcstatus.PermissionDenied))
return nil
})
}
// connect to satellite so we initiate the exit ("main" call)
conn, err := exitingNode.Dialer.DialAddressID(ctx, satellite.Addr(), satellite.Identity.ID)
require.NoError(t, err)
defer func() {
err = errs.Combine(err, conn.Close())
}()
client := conn.SatelliteGracefulExitClient()
// this connection will immediately return since graceful exit has not been initiated yet
c, err := client.Process(ctx)
require.NoError(t, err)
response, err := c.Recv()
require.NoError(t, err)
switch response.GetMessage().(type) {
case *pb.SatelliteMessage_NotReady:
default:
t.FailNow()
}
// wait for initial loop to start so we have pieces to transfer
satellite.GracefulExit.Chore.Loop.TriggerWait()
// this connection should not close immediately, since there are pieces to transfer
c, err = client.Process(ctx)
require.NoError(t, err)
_, err = c.Recv()
require.NoError(t, err)
// start receiving from concurrent connections
wg.Done()
err = group.Wait()
require.NoError(t, err)
})
}
func TestInvalidStorageNodeSignature(t *testing.T) {
testTransfers(t, 1, func(ctx *testcontext.Context, nodeFullIDs map[storj.NodeID]*identity.FullIdentity, satellite *testplanet.SatelliteSystem, processClient exitProcessClient, exitingNode *storagenode.Peer, numPieces int) {
response, err := processClient.Recv()