satellite/overlay: use node selection cache for uploads (#3859)

* satellite/overlay: use node selection cache for uploads

Change-Id: Ibd16cccee979d0544f2f4a01749af9f36f02a6ad

* fix config lock

Change-Id: Idd307e4dee8ab92749f1ec3f996419ea0af829fd

* start fixing tests

Change-Id: I207d373a3b2a2d9312c9e72fe9bd0b01e06ad6cf

* fix test, add some more

Change-Id: I82b99c2004fca2510965f9b389f87dd4474bc722

* change config name

Change-Id: I0c0f7fc726b2565dc3828cb723f5459a940f2a0b

* add benchmarks

Change-Id: I05fa25bff8d5b65f94d918556855b95163d002e9

* revert bench to put in different PR

Change-Id: I0f6942296895594768f19614bd7b2e3b9b106ade

* add staleness to benchmark

Change-Id: Ia80a310623d5a342afa6d835402170b531b0f870

* add cache config to testplanet

Change-Id: I39abdab8cc442694da543115a9e470b2a8a25dff

* have repair select old way

Change-Id: I25a938457d7d1bcf89fd15130cb6b0ac19585252

* lower testplanet config time

Change-Id: Ib56a2ed086c06bc6061388d15a10a2526a663af7

* fix test

Change-Id: I3868e9cacde2dfbf9c407afab04dc5fc2f286f69
Jess G, 2020-04-24 09:11:04 -07:00, committed by GitHub
parent 7a4dcd61f7
commit 825226c98e
GPG Key ID: 4AEE18F83AFDEB23 (no known key found for this signature in database)
10 changed files with 259 additions and 40 deletions

View File

@@ -337,6 +337,9 @@ func (planet *Planet) newSatellites(count int, satelliteDatabases satellitedbtes
AuditReputationDQ: 0.6,
SuspensionGracePeriod: time.Hour,
},
NodeSelectionCache: overlay.CacheConfig{
Staleness: 3 * time.Minute,
},
UpdateStatsBatchSize: 100,
},
Metainfo: metainfo.Config{
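
For context, Staleness bounds how old the cached node list may get before GetNodes refreshes it from the database. A minimal sketch of that check, under the assumption that the cache compares elapsed time against the window (needsRefresh is a hypothetical helper, not code from this commit):

package example

import "time"

// needsRefresh is a hypothetical illustration of how a staleness window like
// the 3*time.Minute above is typically applied; a negative staleness (as the
// tests further down use) makes this always true, forcing a refresh per call.
func needsRefresh(lastRefresh time.Time, staleness time.Duration) bool {
	return time.Since(lastRefresh) > staleness
}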

View File

@@ -170,7 +170,7 @@ func TestDisqualifiedNodesGetNoUpload(t *testing.T) {
request := overlay.FindStorageNodesRequest{
MinimumRequiredNodes: 4,
RequestedCount: 0,
RequestedCount: 4,
ExcludedIDs: nil,
MinimumVersion: "", // semver or empty
}

View File

@@ -359,6 +359,9 @@ func BenchmarkNodeSelection(b *testing.B) {
service := overlay.NewService(zap.NewNop(), overlaydb, overlay.Config{
Node: nodeSelectionConfig,
NodeSelectionCache: overlay.CacheConfig{
Staleness: time.Hour,
},
})
b.Run("FindStorageNodesWithPreference", func(b *testing.B) {
@@ -412,5 +415,31 @@ func BenchmarkNodeSelection(b *testing.B) {
require.NotEmpty(b, selected)
}
})
b.Run("NodeSelectionCacheGetNodes", func(b *testing.B) {
for i := 0; i < b.N; i++ {
selected, err := service.SelectionCache.GetNodes(ctx, overlay.FindStorageNodesRequest{
MinimumRequiredNodes: SelectCount,
RequestedCount: SelectCount,
ExcludedIDs: nil,
MinimumVersion: "v1.0.0",
})
require.NoError(b, err)
require.NotEmpty(b, selected)
}
})
b.Run("NodeSelectionCacheGetNodesExclusion", func(b *testing.B) {
for i := 0; i < b.N; i++ {
selected, err := service.SelectionCache.GetNodes(ctx, overlay.FindStorageNodesRequest{
MinimumRequiredNodes: SelectCount,
RequestedCount: SelectCount,
ExcludedIDs: excludedIDs,
MinimumVersion: "v1.0.0",
})
require.NoError(b, err)
require.NotEmpty(b, selected)
}
})
})
}
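
The two new sub-benchmarks can be run on their own with the standard Go tooling, e.g. go test -bench 'BenchmarkNodeSelection/NodeSelectionCache' (the package path and the test-database setup the benchmark requires are not shown in this view).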

View File

@@ -21,6 +21,7 @@ var (
// Config is a configuration for overlay service.
type Config struct {
Node NodeSelectionConfig
NodeSelectionCache CacheConfig
UpdateStatsBatchSize int `help:"number of update requests to process per transaction" default:"100"`
}
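
CacheConfig itself is outside this diff; inferred from the call sites above and from the config-lock entry at the bottom of this commit, it presumably looks like:

package overlay

import "time"

// CacheConfig is inferred, not copied from this commit: Staleness is the only
// field exercised here, and its help text and 3m0s default come from the
// satellite config lock at the end of this diff.
type CacheConfig struct {
	Staleness time.Duration `help:"how stale the node selection cache can be" default:"3m0s"`
}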

View File

@@ -123,7 +123,10 @@ func (cacheData *state) GetNodes(ctx context.Context, req FindStorageNodesReques
defer cacheData.mu.RUnlock()
// how many reputableNodes versus newNode nodes should be selected
totalcount := req.RequestedCount
totalcount := req.MinimumRequiredNodes
if totalcount <= 0 {
totalcount = req.RequestedCount
}
newNodeCount := int(float64(req.RequestedCount) * newNodeFraction)
var selectedNodeResults = []*SelectedNode{}
@@ -132,16 +135,17 @@ func (cacheData *state) GetNodes(ctx context.Context, req FindStorageNodesReques
// Get a random selection of new nodes out of the cache first so that if there aren't
// enough new nodes on the network, we can fall back to using reputable nodes instead
randomIndexes := rand.Perm(len(cacheData.newNodes))
nextNewNode:
for _, idx := range randomIndexes {
currNode := cacheData.newNodes[idx]
if _, ok := distinctNetworks[currNode.LastNet]; ok {
continue
}
for _, excludedID := range req.ExcludedIDs {
if excludedID == currNode.ID {
continue
continue nextNewNode
}
}
if _, ok := distinctNetworks[currNode.LastNet]; ok {
continue nextNewNode
}
selectedNodeResults = append(selectedNodeResults, currNode)
distinctNetworks[currNode.LastNet] = struct{}{}
@@ -151,20 +155,22 @@ func (cacheData *state) GetNodes(ctx context.Context, req FindStorageNodesReques
}
randomIndexes = rand.Perm(len(cacheData.reputableNodes))
nextReputableNode:
for _, idx := range randomIndexes {
currNode := cacheData.reputableNodes[idx]
// don't select a node if we've already selected another node from the same network
if _, ok := distinctNetworks[currNode.LastNet]; ok {
continue
}
// don't select a node listed in the excluded list
for _, excludedID := range req.ExcludedIDs {
if excludedID == currNode.ID {
continue
continue nextReputableNode
}
}
// don't select a node if we've already selected another node from the same network
if _, ok := distinctNetworks[currNode.LastNet]; ok {
continue nextReputableNode
}
selectedNodeResults = append(selectedNodeResults, currNode)
distinctNetworks[currNode.LastNet] = struct{}{}
if len(selectedNodeResults) >= totalcount {
@@ -173,9 +179,7 @@ func (cacheData *state) GetNodes(ctx context.Context, req FindStorageNodesReques
}
if len(selectedNodeResults) < totalcount {
return nil, Error.New("unable to select enough nodes from node selection cache, needed: %d, actual: %d",
totalcount, len(selectedNodeResults),
)
return selectedNodeResults, ErrNotEnoughNodes.New("requested from cache %d found %d", totalcount, len(selectedNodeResults))
}
return selectedNodeResults, nil
}
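
The new labels are the substance of this hunk: previously the continue inside the range req.ExcludedIDs loop only advanced that inner loop, so once it finished, the excluded node was still appended to the results. A standalone illustration of the fixed pattern (hypothetical data, not the committed code):

package main

import "fmt"

func main() {
	candidates := []string{"node-a", "node-b", "node-c"}
	excluded := []string{"node-b"}

nextCandidate:
	for _, c := range candidates {
		for _, e := range excluded {
			if c == e {
				// an unlabeled continue here would only skip to the next
				// excluded entry; the label skips the candidate itself
				continue nextCandidate
			}
		}
		fmt.Println("selected:", c) // prints node-a and node-c only
	}
}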

View File

@@ -62,7 +62,7 @@ func TestRefresh(t *testing.T) {
// add some nodes to the database
const nodeCount = 2
addNodesToNodesTable(ctx, t, db.OverlayCache(), nodeCount)
addNodesToNodesTable(ctx, t, db.OverlayCache(), nodeCount, false)
// confirm nodes are in the cache once
err = cache.Refresh(ctx)
@@ -73,7 +73,8 @@ })
})
}
func addNodesToNodesTable(ctx context.Context, t *testing.T, db overlay.DB, count int) {
func addNodesToNodesTable(ctx context.Context, t *testing.T, db overlay.DB, count int, makeReputable bool) []storj.NodeID {
var reputableIds = []storj.NodeID{}
for i := 0; i < count; i++ {
subnet := strconv.Itoa(i) + ".1.2"
addr := subnet + ".3:8080"
@@ -99,7 +100,20 @@ func addNodesToNodesTable(ctx context.Context, t *testing.T, db overlay.DB, coun
}
err := db.UpdateCheckIn(ctx, n, time.Now().UTC(), nodeCfg)
require.NoError(t, err)
// make the nodes with index above count/2 reputable (just under half)
if makeReputable && i > count/2 {
_, err = db.UpdateStats(ctx, &overlay.UpdateRequest{
NodeID: storj.NodeID{byte(i)},
IsUp: true,
AuditOutcome: overlay.AuditSuccess,
AuditLambda: 1, AuditWeight: 1, AuditDQ: 0.5,
})
require.NoError(t, err)
reputableIds = append(reputableIds, storj.NodeID{byte(i)})
}
}
return reputableIds
}
type mockdb struct {
@@ -182,7 +196,7 @@ func TestGetNode(t *testing.T) {
// add some nodes to the database
const nodeCount = 4
addNodesToNodesTable(ctx, t, db.OverlayCache(), nodeCount)
addNodesToNodesTable(ctx, t, db.OverlayCache(), nodeCount, false)
// confirm cache.GetNodes returns the correct nodes
selectedNodes, err := cache.GetNodes(ctx, overlay.FindStorageNodesRequest{RequestedCount: 2})
@@ -271,3 +285,54 @@ func TestGetNodeError(t *testing.T) {
_, err := cache.GetNodes(ctx, overlay.FindStorageNodesRequest{RequestedCount: 2})
require.Error(t, err)
}
func TestNewNodeFraction(t *testing.T) {
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
newNodeFraction := 0.2
var nodeCfg = overlay.NodeSelectionConfig{
AuditCount: 1,
UptimeCount: 1,
NewNodeFraction: newNodeFraction,
MinimumVersion: "v1.0.0",
OnlineWindow: 4 * time.Hour,
DistinctIP: true,
MinimumDiskSpace: 10 * memory.MiB,
}
cache := overlay.NewNodeSelectionCache(zap.NewNop(),
db.OverlayCache(),
lowStaleness,
nodeCfg,
)
// the cache should have no nodes to start
err := cache.Refresh(ctx)
require.NoError(t, err)
reputable, new := cache.Size()
require.Equal(t, 0, reputable)
require.Equal(t, 0, new)
// add some nodes to the database, some are reputable and some are new nodes
const nodeCount = 10
repIDs := addNodesToNodesTable(ctx, t, db.OverlayCache(), nodeCount, true)
// confirm nodes are in the cache once
err = cache.Refresh(ctx)
require.NoError(t, err)
reputable, new = cache.Size()
require.Equal(t, 6, new)
require.Equal(t, 4, reputable)
// select nodes and confirm correct new node fraction
n, err := cache.GetNodes(ctx, overlay.FindStorageNodesRequest{RequestedCount: 5})
require.NoError(t, err)
require.Equal(t, len(n), 5)
var reputableCount int
for _, id := range repIDs {
for _, node := range n {
if id == node.ID {
reputableCount++
}
}
}
require.Equal(t, len(n)-reputableCount, int(5*newNodeFraction))
})
}
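
The arithmetic behind the final assertion: with RequestedCount 5 and NewNodeFraction 0.2, int(5*0.2) = 1, so exactly one of the five selected nodes should be new and the other four reputable.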

View File

@@ -28,17 +28,24 @@ import (
)
func TestMinimumDiskSpace(t *testing.T) {
if runtime.GOOS == "darwin" {
t.Skip("Test does not work with macOS")
}
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 2, UplinkCount: 0,
Reconfigure: testplanet.Reconfigure{
UniqueIPCount: 2,
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
config.Overlay.Node.MinimumDiskSpace = 10 * memory.MB
config.Overlay.NodeSelectionCache.Staleness = -time.Hour
},
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
saOverlay := planet.Satellites[0].Overlay
nodeConfig := planet.Satellites[0].Config.Overlay.Node
node0 := planet.StorageNodes[0]
node0.Contact.Chore.Pause(ctx)
nodeDossier := node0.Local()
ident := node0.Identity
peer := rpcpeer.Peer{
@@ -63,13 +70,22 @@ func TestMinimumDiskSpace(t *testing.T) {
})
require.NoError(t, err)
// request 2 nodes, expect failure from not enough nodes
_, err = planet.Satellites[0].Overlay.Service.FindStorageNodes(ctx, overlay.FindStorageNodesRequest{
req := overlay.FindStorageNodesRequest{
MinimumRequiredNodes: 2,
RequestedCount: 2,
})
}
// request 2 nodes, expect failure from not enough nodes
n1, err := saOverlay.Service.FindStorageNodes(ctx, req)
require.Error(t, err)
require.True(t, overlay.ErrNotEnoughNodes.Has(err))
n2, err := saOverlay.Service.SelectionCache.GetNodes(ctx, req)
require.Error(t, err)
require.True(t, overlay.ErrNotEnoughNodes.Has(err))
require.Equal(t, len(n2), len(n1))
n3, err := saOverlay.Service.FindStorageNodesWithPreferences(ctx, req, &nodeConfig)
require.Error(t, err)
require.Equal(t, len(n3), len(n1))
// report disk space greater than minimum
_, err = planet.Satellites[0].Contact.Endpoint.CheckIn(peerCtx, &pb.CheckInRequest{
@@ -83,11 +99,15 @@ func TestMinimumDiskSpace(t *testing.T) {
require.NoError(t, err)
// request 2 nodes, expect success
_, err = planet.Satellites[0].Overlay.Service.FindStorageNodes(ctx, overlay.FindStorageNodesRequest{
MinimumRequiredNodes: 2,
RequestedCount: 2,
})
n1, err = planet.Satellites[0].Overlay.Service.FindStorageNodes(ctx, req)
require.NoError(t, err)
require.Equal(t, 2, len(n1))
n2, err = saOverlay.Service.FindStorageNodesWithPreferences(ctx, req, &nodeConfig)
require.NoError(t, err)
require.Equal(t, len(n1), len(n2))
n3, err = saOverlay.Service.SelectionCache.GetNodes(ctx, req)
require.NoError(t, err)
require.Equal(t, len(n1), len(n3))
})
}
@@ -127,6 +147,12 @@ func TestOffline(t *testing.T) {
func TestEnsureMinimumRequested(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 10, UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
UniqueIPCount: 5,
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
config.Overlay.Node.MinimumDiskSpace = 10 * memory.MB
},
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
@@ -170,10 +196,10 @@ func TestEnsureMinimumRequested(t *testing.T) {
requestedCount, newCount := 5, 1
newNodeFraction := float64(newCount) / float64(requestedCount)
preferences := testNodeSelectionConfig(1, newNodeFraction, false)
nodes, err := service.FindStorageNodesWithPreferences(ctx, overlay.FindStorageNodesRequest{
req := overlay.FindStorageNodesRequest{
RequestedCount: requestedCount,
}, &preferences)
}
nodes, err := service.FindStorageNodesWithPreferences(ctx, req, &preferences)
require.NoError(t, err)
require.Len(t, nodes, requestedCount)
require.Equal(t, requestedCount-newCount, countReputable(nodes))
@@ -183,13 +209,17 @@ func TestEnsureMinimumRequested(t *testing.T) {
requestedCount, newCount := 5, 5
newNodeFraction := float64(newCount) / float64(requestedCount)
preferences := testNodeSelectionConfig(1, newNodeFraction, false)
nodes, err := service.FindStorageNodesWithPreferences(ctx, overlay.FindStorageNodesRequest{
req := overlay.FindStorageNodesRequest{
RequestedCount: requestedCount,
}, &preferences)
}
nodes, err := service.FindStorageNodesWithPreferences(ctx, req, &preferences)
require.NoError(t, err)
require.Len(t, nodes, requestedCount)
require.Equal(t, 0, countReputable(nodes))
n2, err := service.SelectionCache.GetNodes(ctx, req)
require.NoError(t, err)
require.Equal(t, requestedCount, len(n2))
})
// update all of them to be reputable
@@ -209,6 +239,7 @@ func TestEnsureMinimumRequested(t *testing.T) {
requestedCount, newCount := 5, 1.0
newNodeFraction := newCount / float64(requestedCount)
preferences := testNodeSelectionConfig(1, newNodeFraction, false)
satellite.Config.Overlay.Node = testNodeSelectionConfig(1, newNodeFraction, false)
nodes, err := service.FindStorageNodesWithPreferences(ctx, overlay.FindStorageNodesRequest{
RequestedCount: requestedCount,
@@ -508,14 +539,37 @@ func TestFindStorageNodesDistinctNetworks(t *testing.T) {
require.NotEqual(t, nodes[0].LastIPPort, nodes[1].LastIPPort)
require.NotEqual(t, nodes[0].LastIPPort, excludedNodeAddr)
require.NotEqual(t, nodes[1].LastIPPort, excludedNodeAddr)
n2, err := satellite.Overlay.Service.SelectionCache.GetNodes(ctx, req)
require.NoError(t, err)
require.Len(t, n2, 2)
require.NotEqual(t, n2[0].LastIPPort, n2[1].LastIPPort)
require.NotEqual(t, n2[0].LastIPPort, excludedNodeAddr)
require.NotEqual(t, n2[1].LastIPPort, excludedNodeAddr)
n3, err := satellite.Overlay.Service.FindStorageNodesWithPreferences(ctx, req, &satellite.Config.Overlay.Node)
require.NoError(t, err)
require.Len(t, n3, 2)
require.NotEqual(t, n3[0].LastIPPort, n3[1].LastIPPort)
require.NotEqual(t, n3[0].LastIPPort, excludedNodeAddr)
require.NotEqual(t, n3[1].LastIPPort, excludedNodeAddr)
req = overlay.FindStorageNodesRequest{
MinimumRequiredNodes: 3,
RequestedCount: 3,
MinimumRequiredNodes: 4,
RequestedCount: 4,
ExcludedIDs: excludedNodes,
}
_, err = satellite.Overlay.Service.FindStorageNodes(ctx, req)
n, err := satellite.Overlay.Service.FindStorageNodes(ctx, req)
require.Error(t, err)
n1, err := satellite.Overlay.Service.FindStorageNodesWithPreferences(ctx, req, &satellite.Config.Overlay.Node)
require.Error(t, err)
require.Equal(t, len(n), len(n1))
n2, err = satellite.Overlay.Service.SelectionCache.GetNodes(ctx, req)
require.Error(t, err)
// GetNodes returns 1 more node than FindStorageNodesWithPreferences because the two paths query differently:
// FindStorageNodesWithPreferences looks up the IPs of the excluded node IDs and removes all of those IPs
// from the selection (which filters out any node on the same network as an excluded ID), whereas the
// selection cache only filters out the excluded IDs themselves at selection time, so it can still include
// a node that shares a network with an excluded ID.
require.Equal(t, len(n1)+1, len(n2))
})
}
@@ -564,6 +618,18 @@ func TestSelectNewStorageNodesExcludedIPs(t *testing.T) {
require.NotEqual(t, nodes[0].LastIPPort, nodes[1].LastIPPort)
require.NotEqual(t, nodes[0].LastIPPort, excludedNodeAddr)
require.NotEqual(t, nodes[1].LastIPPort, excludedNodeAddr)
n2, err := satellite.Overlay.Service.SelectionCache.GetNodes(ctx, req)
require.NoError(t, err)
require.Len(t, n2, 2)
require.NotEqual(t, n2[0].LastIPPort, n2[1].LastIPPort)
require.NotEqual(t, n2[0].LastIPPort, excludedNodeAddr)
require.NotEqual(t, n2[1].LastIPPort, excludedNodeAddr)
n3, err := satellite.Overlay.Service.FindStorageNodesWithPreferences(ctx, req, &satellite.Config.Overlay.Node)
require.NoError(t, err)
require.Len(t, n3, 2)
require.NotEqual(t, n3[0].LastIPPort, n3[1].LastIPPort)
require.NotEqual(t, n3[0].LastIPPort, excludedNodeAddr)
require.NotEqual(t, n3[1].LastIPPort, excludedNodeAddr)
})
}
@@ -689,3 +755,29 @@ func TestAddrtoNetwork_Conversion(t *testing.T) {
require.Equal(t, ipv6, resolvedIPPort)
require.NoError(t, err)
}
func TestCacheSelectionVsDBSelection(t *testing.T) {
if runtime.GOOS == "darwin" {
t.Skip("Test does not work with macOS")
}
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 5, UplinkCount: 0,
Reconfigure: testplanet.Reconfigure{
UniqueIPCount: 5,
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
planet.StorageNodes[0].Storage2.Monitor.Loop.Pause()
saOverlay := planet.Satellites[0].Overlay
nodeConfig := planet.Satellites[0].Config.Overlay.Node
req := overlay.FindStorageNodesRequest{RequestedCount: 5}
n1, err := saOverlay.Service.FindStorageNodes(ctx, req)
require.NoError(t, err)
n2, err := saOverlay.Service.SelectionCache.GetNodes(ctx, req)
require.NoError(t, err)
require.Equal(t, len(n2), len(n1))
n3, err := saOverlay.Service.FindStorageNodesWithPreferences(ctx, req, &nodeConfig)
require.NoError(t, err)
require.Equal(t, len(n3), len(n2))
})
}

View File

@@ -231,9 +231,10 @@ type SelectedNode struct {
//
// architecture: Service
type Service struct {
log *zap.Logger
db DB
config Config
log *zap.Logger
db DB
config Config
SelectionCache *NodeSelectionCache
}
// NewService returns a new Service
@@ -242,6 +243,9 @@ func NewService(log *zap.Logger, db DB, config Config) *Service {
log: log,
db: db,
config: config,
SelectionCache: NewNodeSelectionCache(log, db,
config.NodeSelectionCache.Staleness, config.Node,
),
}
}
@@ -276,12 +280,30 @@ func (service *Service) IsOnline(node *NodeDossier) bool {
return time.Since(node.Reputation.LastContactSuccess) < service.config.Node.OnlineWindow
}
// FindStorageNodes searches the overlay network for nodes that meet the provided requirements
func (service *Service) FindStorageNodes(ctx context.Context, req FindStorageNodesRequest) (_ []*SelectedNode, err error) {
// FindStorageNodesForRepair searches the overlay network for nodes that meet the provided requirements for repair.
// The main difference between this method and the normal FindStorageNodes is that here we filter out all nodes that
// share a subnet with any node in req.ExcludedIDs. This additional complexity is not needed for other callers.
func (service *Service) FindStorageNodesForRepair(ctx context.Context, req FindStorageNodesRequest) (_ []*SelectedNode, err error) {
defer mon.Task()(&ctx)(&err)
return service.FindStorageNodesWithPreferences(ctx, req, &service.config.Node)
}
// FindStorageNodes searches the overlay network for nodes that meet the provided requirements.
// It first consults the node selection cache; if the cache cannot supply enough nodes
// (which shouldn't typically happen), it falls back to selecting nodes from the nodes table.
func (service *Service) FindStorageNodes(ctx context.Context, req FindStorageNodesRequest) (_ []*SelectedNode, err error) {
defer mon.Task()(&ctx)(&err)
selectedNodes, err := service.SelectionCache.GetNodes(ctx, req)
if err != nil {
service.log.Warn("error selecting from node selection cache", zap.String("err", err.Error()))
}
if len(selectedNodes) < req.RequestedCount {
mon.Event("default_node_selection")
return service.FindStorageNodesWithPreferences(ctx, req, &service.config.Node)
}
return selectedNodes, nil
}
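
From a caller's perspective nothing changes: uploads keep calling FindStorageNodes, the cache-versus-database decision stays internal, and the default_node_selection event counts the fallbacks. A hypothetical call site, with illustrative request values:

package example

import (
	"context"

	"storj.io/storj/satellite/overlay"
)

// selectForUpload sketches the upload path after this commit: the selection
// cache is tried first and the nodes-table query runs only as a fallback.
func selectForUpload(ctx context.Context, service *overlay.Service) ([]*overlay.SelectedNode, error) {
	return service.FindStorageNodes(ctx, overlay.FindStorageNodesRequest{
		MinimumRequiredNodes: 2, // illustrative values, not from this commit
		RequestedCount:       4,
		MinimumVersion:       "v1.0.0",
	})
}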
// FindStorageNodesWithPreferences searches the overlay network for nodes that meet the provided criteria
func (service *Service) FindStorageNodesWithPreferences(ctx context.Context, req FindStorageNodesRequest, preferences *NodeSelectionConfig) (nodes []*SelectedNode, err error) {
defer mon.Task()(&ctx)(&err)

View File

@@ -204,7 +204,7 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, path storj.Path) (s
RequestedCount: requestCount,
ExcludedIDs: excludeNodeIDs,
}
newNodes, err := repairer.overlay.FindStorageNodes(ctx, request)
newNodes, err := repairer.overlay.FindStorageNodesForRepair(ctx, request)
if err != nil {
return false, overlayQueryError.Wrap(err)
}

View File

@@ -406,6 +406,9 @@ identity.key-path: /root/.local/share/storj/identity/satellite/identity.key
# how many orders to batch per transaction
# orders.settlement-batch-size: 250
# how stale the node selection cache can be
# overlay.node-selection-cache.staleness: 3m0s
# the number of times a node has been audited to not be considered a New Node
# overlay.node.audit-count: 100
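
Operators wanting a different trade-off between selection freshness and database load can override the commented default above in the satellite's config.yaml, e.g. overlay.node-selection-cache.staleness: 10m0s (assuming the usual satellite config binding; only the 3m0s default is part of this commit).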