satellite/gracefulexit: select new node filtered by Distinct IP (#3435)

Natalie Villasana 2019-11-06 16:38:52 -05:00 committed by Yingrong Zhao
parent aaead2a29e
commit 68a7790069
6 changed files with 100 additions and 5 deletions
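In short: when the satellite enforces the DistinctIP preference, node selection for graceful-exit transfers now also excludes the last-known IPs of the excluded nodes, so replacement nodes cannot share an IP network with the nodes being excluded from selection. Below is a minimal, self-contained sketch of that filtering idea; the `candidate` type and `filterDistinctIP` helper are hypothetical, and the real filtering happens inside the overlay selection queries rather than in memory.

```go
package sketch

// candidate is a hypothetical, stripped-down stand-in for an overlay node row.
type candidate struct {
	ID     string
	LastIP string // corresponds to the last_net column in the real schema
}

// filterDistinctIP drops candidates whose IP is excluded or already chosen,
// mirroring the DistinctIP behaviour this commit enforces during selection.
func filterDistinctIP(candidates []candidate, excludedIPs []string) []candidate {
	excluded := make(map[string]bool, len(excludedIPs))
	for _, ip := range excludedIPs {
		excluded[ip] = true
	}

	seen := make(map[string]bool)
	var kept []candidate
	for _, c := range candidates {
		if excluded[c.LastIP] || seen[c.LastIP] {
			continue
		}
		seen[c.LastIP] = true
		kept = append(kept, c)
	}
	return kept
}
```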

View File

@@ -21,7 +21,7 @@ type Reconfigure struct {
NewStorageNodeDB func(index int, db storagenode.DB, log *zap.Logger) (storagenode.DB, error)
StorageNode func(index int, config *storagenode.Config)
NewIPCount int
UniqueIPCount int
}
// DisablePeerCAWhitelist returns a `Reconfigure` that sets `UsePeerCAWhitelist` for

View File

@@ -135,7 +135,7 @@ func (planet *Planet) newStorageNodes(count int, whitelistedSatellites storj.Nod
planet.config.Reconfigure.StorageNode(i, &config)
}
newIPCount := planet.config.Reconfigure.NewIPCount
newIPCount := planet.config.Reconfigure.UniqueIPCount
if newIPCount > 0 {
if i >= count-newIPCount {
config.Server.Address = fmt.Sprintf("127.0.%d.1:0", i+1)

View File

@@ -267,6 +267,7 @@ func (endpoint *Endpoint) doProcess(stream processStream) (err error) {
return nil
}
// maps pieceIDs to pendingTransfers to keep track of ongoing piece transfer requests
pending := newPendingMap()
// these are used to synchronize the "incomplete transfer loop" with the main thread (storagenode receive loop)
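For context, the pending map referenced above tracks piece-transfer requests that are still awaiting a response from the storage node. A minimal sketch of such a structure follows; the field set and method names are hypothetical, and the real `newPendingMap` in `satellite/gracefulexit` carries more state.

```go
package gracefulexitsketch

import (
	"sync"

	"storj.io/storj/pkg/storj"
)

// pendingTransferSketch is a hypothetical, pared-down pending-transfer record.
type pendingTransferSketch struct {
	path            []byte
	originalPieceID storj.PieceID
}

// pendingMapSketch guards a pieceID-keyed map with a mutex so the incomplete
// transfer loop and the storagenode receive loop can share it safely.
type pendingMapSketch struct {
	mu   sync.Mutex
	data map[storj.PieceID]*pendingTransferSketch
}

func newPendingMapSketch() *pendingMapSketch {
	return &pendingMapSketch{data: make(map[storj.PieceID]*pendingTransferSketch)}
}

// put registers an in-flight transfer for a piece.
func (pm *pendingMapSketch) put(pieceID storj.PieceID, pt *pendingTransferSketch) {
	pm.mu.Lock()
	defer pm.mu.Unlock()
	pm.data[pieceID] = pt
}

// delete drops the entry once the storage node reports success or failure.
func (pm *pendingMapSketch) delete(pieceID storj.PieceID) {
	pm.mu.Lock()
	defer pm.mu.Unlock()
	delete(pm.data, pieceID)
}
```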

View File

@@ -5,16 +5,19 @@ package overlay_test
import (
"runtime"
"strings"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/zeebo/errs"
"go.uber.org/zap"

"storj.io/storj/internal/testcontext"
"storj.io/storj/internal/testplanet"
"storj.io/storj/pkg/storj"
"storj.io/storj/satellite"
"storj.io/storj/satellite/overlay"
)
@@ -301,6 +304,61 @@ func TestNodeSelectionGracefulExit(t *testing.T) {
})
}
func TestFindStorageNodesDistinctIPs(t *testing.T) {
if runtime.GOOS == "darwin" {
t.Skip("Test does not work with macOS")
}
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 5, UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
// will create 3 storage nodes with the same IP; the other 2 will have unique IPs
UniqueIPCount: 2,
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
config.Overlay.Node.DistinctIP = true
},
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
// select one of the nodes that shares an IP with others to exclude
var excludedNodes storj.NodeIDList
addrCounts := make(map[string]int)
var excludedNodeAddr string
for _, node := range planet.StorageNodes {
addrNoPort := strings.Split(node.Addr(), ":")[0]
if addrCounts[addrNoPort] > 0 && len(excludedNodes) == 0 {
excludedNodes = append(excludedNodes, node.ID())
break
}
addrCounts[addrNoPort]++
}
require.Len(t, excludedNodes, 1)
res, err := satellite.Overlay.Service.Get(ctx, excludedNodes[0])
require.NoError(t, err)
excludedNodeAddr = res.LastIp
req := overlay.FindStorageNodesRequest{
MinimumRequiredNodes: 2,
RequestedCount: 2,
ExcludedNodes: excludedNodes,
}
nodes, err := satellite.Overlay.Service.FindStorageNodes(ctx, req)
require.NoError(t, err)
require.Len(t, nodes, 2)
require.NotEqual(t, nodes[0].LastIp, nodes[1].LastIp)
require.NotEqual(t, nodes[0].LastIp, excludedNodeAddr)
require.NotEqual(t, nodes[1].LastIp, excludedNodeAddr)
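// requesting 3 nodes should fail: after excluding the shared IP, only 2 distinct IPs remain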
req = overlay.FindStorageNodesRequest{
MinimumRequiredNodes: 3,
RequestedCount: 3,
ExcludedNodes: excludedNodes,
}
_, err = satellite.Overlay.Service.FindStorageNodes(ctx, req)
require.Error(t, err)
})
}
func TestDistinctIPs(t *testing.T) {
if runtime.GOOS == "darwin" {
t.Skip("Test does not work with macOS")
@@ -308,7 +366,7 @@ func TestDistinctIPs(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 10, UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
NewIPCount: 3,
UniqueIPCount: 3,
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
@@ -338,7 +396,7 @@ func TestDistinctIPsWithBatch(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 10, UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
NewIPCount: 3,
UniqueIPCount: 3,
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]

View File

@@ -81,6 +81,9 @@ type DB interface {
GetGracefulExitIncompleteByTimeFrame(ctx context.Context, begin, end time.Time) (exitingNodes storj.NodeIDList, err error)
// GetExitStatus returns a node's graceful exit status.
GetExitStatus(ctx context.Context, nodeID storj.NodeID) (exitStatus *ExitStatus, err error)
// GetNodeIPs returns a list of IP addresses associated with the given node IDs.
GetNodeIPs(ctx context.Context, nodeIDs []storj.NodeID) (nodeIPs []string, err error)
}
// NodeCheckInInfo contains all the info that will be updated when a node checks in
@@ -255,6 +258,14 @@ func (service *Service) FindStorageNodesWithPreferences(ctx context.Context, req
}
excludedNodes := req.ExcludedNodes
// get and exclude IPs associated with excluded nodes if distinctIP is enabled
var excludedIPs []string
if preferences.DistinctIP && len(excludedNodes) > 0 {
excludedIPs, err = service.db.GetNodeIPs(ctx, excludedNodes)
if err != nil {
return nil, Error.Wrap(err)
}
}
newNodeCount := 0
if preferences.NewNodePercentage > 0 {
@@ -277,7 +288,6 @@ func (service *Service) FindStorageNodesWithPreferences(ctx context.Context, req
}
}
var excludedIPs []string
// add selected new nodes and their IPs to the excluded lists for reputable node selection
for _, newNode := range newNodes {
excludedNodes = append(excludedNodes, newNode.Id)
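The hunk above ends before showing how `excludedIPs` is consumed downstream. As a hedged illustration only, the sketch below shows one way an IP-exclusion clause could be appended to a selection query, assuming a `nodes` table with a `last_net` column and `?` placeholders that are later rebound for Postgres; the `appendIPExclusion` helper is hypothetical, and the actual query construction in `overlaycache` differs.

```go
package overlaysketch

// appendIPExclusion adds a NOT IN filter on last_net for every excluded IP
// and returns the expanded query together with its arguments.
func appendIPExclusion(query string, args []interface{}, excludedIPs []string) (string, []interface{}) {
	if len(excludedIPs) == 0 {
		return query, args
	}
	query += ` AND last_net NOT IN (`
	for i, ip := range excludedIPs {
		if i > 0 {
			query += `, `
		}
		query += `?`
		args = append(args, ip)
	}
	query += `)`
	return query, args
}
```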

View File

@@ -146,6 +146,32 @@ func (cache *overlaycache) SelectNewStorageNodes(ctx context.Context, count int,
return nodes, nil
}
// GetNodeIPs returns a list of node IP addresses. Warning: these node IP addresses might be returned out of order.
func (cache *overlaycache) GetNodeIPs(ctx context.Context, nodeIDs []storj.NodeID) (nodeIPs []string, err error) {
defer mon.Task()(&ctx)(&err)
var rows *sql.Rows
rows, err = cache.db.Query(cache.db.Rebind(`
SELECT last_net FROM nodes
WHERE id = any($1::bytea[])
`), postgresNodeIDList(nodeIDs),
)
if err != nil {
return nil, err
}
defer func() { err = errs.Combine(err, rows.Close()) }()
for rows.Next() {
var ip string
err = rows.Scan(&ip)
if err != nil {
return nil, err
}
nodeIPs = append(nodeIPs, ip)
}
return nodeIPs, nil
}
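Since the query above gives no ordering guarantee relative to the input node IDs, callers should treat the result as a set. A small usage sketch follows; the `ipSet` helper is hypothetical and not part of the commit.

```go
package overlaysketch

import (
	"context"

	"storj.io/storj/pkg/storj"
	"storj.io/storj/satellite/overlay"
)

// ipSet wraps GetNodeIPs and returns the addresses as a set, which is the
// safe way to consume them given that no ordering is guaranteed.
func ipSet(ctx context.Context, db overlay.DB, nodeIDs []storj.NodeID) (map[string]struct{}, error) {
	ips, err := db.GetNodeIPs(ctx, nodeIDs)
	if err != nil {
		return nil, err
	}
	set := make(map[string]struct{}, len(ips))
	for _, ip := range ips {
		set[ip] = struct{}{}
	}
	return set, nil
}
```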
func (cache *overlaycache) queryNodes(ctx context.Context, excludedNodes []storj.NodeID, count int, safeQuery string, args ...interface{}) (_ []*pb.Node, err error) {
defer mon.Task()(&ctx)(&err)