satellite/overlay: fix data-race in node selection cache

GetNodes returned references to nodes held in the immutable cache state;
however, some callers modify the nodes they receive. Return clones instead.

Change-Id: I5be1866f95e0dbe062a6b6be60e29f2365c35faa
This commit is contained in:
Egon Elbre 2020-05-06 20:00:07 +03:00
parent 693f224ffd
commit 2955c50bc1
3 changed files with 86 additions and 12 deletions

View File

@ -148,7 +148,7 @@ nextNewNode:
continue nextNewNode
}
selectedNodeResults = append(selectedNodeResults, currNode)
selectedNodeResults = append(selectedNodeResults, currNode.Clone())
distinctNetworks[currNode.LastNet] = struct{}{}
if len(selectedNodeResults) >= newNodeCount {
break
@ -172,7 +172,7 @@ nextReputableNode:
continue nextReputableNode
}
selectedNodeResults = append(selectedNodeResults, currNode)
selectedNodeResults = append(selectedNodeResults, currNode.Clone())
distinctNetworks[currNode.LastNet] = struct{}{}
if len(selectedNodeResults) >= totalcount {
break

View File

@ -119,6 +119,8 @@ func addNodesToNodesTable(ctx context.Context, t *testing.T, db overlay.DB, coun
// mockdb is a test stand-in for the database handed to
// overlay.NewNodeSelectionCache. It counts how often it is queried and
// serves the preconfigured node lists.
type mockdb struct {
// mu is released via defer in SelectAllStorageNodesUpload.
mu sync.Mutex
// callCount is incremented on every SelectAllStorageNodesUpload call.
callCount int
// reputable holds the nodes whose clones are returned as the reputable result.
reputable []*overlay.SelectedNode
// new holds the nodes whose clones are returned as the new result.
new []*overlay.SelectedNode
}
func (m *mockdb) SelectAllStorageNodesUpload(ctx context.Context, selectionCfg overlay.NodeSelectionConfig) (reputable, new []*overlay.SelectedNode, err error) {
@ -126,8 +128,19 @@ func (m *mockdb) SelectAllStorageNodesUpload(ctx context.Context, selectionCfg o
defer m.mu.Unlock()
sync2.Sleep(ctx, 500*time.Millisecond)
m.callCount++
return []*overlay.SelectedNode{}, []*overlay.SelectedNode{}, nil
reputable = make([]*overlay.SelectedNode, len(m.reputable))
for i, n := range m.reputable {
reputable[i] = n.Clone()
}
new = make([]*overlay.SelectedNode, len(m.new))
for i, n := range m.new {
new[i] = n.Clone()
}
return reputable, new, nil
}
func TestRefreshConcurrent(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
@ -173,7 +186,7 @@ func TestRefreshConcurrent(t *testing.T) {
require.Equal(t, 2, mockDB.callCount)
}
func TestGetNode(t *testing.T) {
func TestGetNodes(t *testing.T) {
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
var nodeCfg = overlay.NodeSelectionConfig{
AuditCount: 0,
@ -214,13 +227,30 @@ func TestGetNode(t *testing.T) {
}
})
}
func TestGetNodeConcurrent(t *testing.T) {
func TestGetNodesConcurrent(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
reputableNodes := []*overlay.SelectedNode{{
ID: storj.NodeID{1},
Address: &pb.NodeAddress{Address: "127.0.0.9"},
LastNet: "127.0.0",
LastIPPort: "127.0.0.9:8000",
}}
newNodes := []*overlay.SelectedNode{{
ID: storj.NodeID{1},
Address: &pb.NodeAddress{Address: "127.0.0.10"},
LastNet: "127.0.0",
LastIPPort: "127.0.0.10:8000",
}}
// concurrent GetNodes with high staleness, where high staleness means the
// cache should only be refreshed the first time we call cache.GetNodes
mockDB := mockdb{}
mockDB := mockdb{
reputable: reputableNodes,
new: newNodes,
}
cache := overlay.NewNodeSelectionCache(zap.NewNop(),
&mockDB,
highStaleness,
@ -229,11 +259,25 @@ func TestGetNodeConcurrent(t *testing.T) {
var group errgroup.Group
group.Go(func() error {
_, err := cache.GetNodes(ctx, overlay.FindStorageNodesRequest{})
nodes, err := cache.GetNodes(ctx, overlay.FindStorageNodesRequest{
MinimumRequiredNodes: 1,
})
for i := range nodes {
nodes[i].ID = storj.NodeID{byte(i)}
nodes[i].Address.Address = "123.123.123.123"
}
nodes[0] = nil
return err
})
group.Go(func() error {
_, err := cache.GetNodes(ctx, overlay.FindStorageNodesRequest{})
nodes, err := cache.GetNodes(ctx, overlay.FindStorageNodesRequest{
MinimumRequiredNodes: 1,
})
for i := range nodes {
nodes[i].ID = storj.NodeID{byte(i)}
nodes[i].Address.Address = "123.123.123.123"
}
nodes[0] = nil
return err
})
err := group.Wait()
@ -243,7 +287,10 @@ func TestGetNodeConcurrent(t *testing.T) {
// concurrent get nodes with low staleness, where low staleness means that
// the cache will refresh each time cache.GetNodes is called
mockDB = mockdb{}
mockDB = mockdb{
reputable: reputableNodes,
new: newNodes,
}
cache = overlay.NewNodeSelectionCache(zap.NewNop(),
&mockDB,
lowStaleness,
@ -251,11 +298,25 @@ func TestGetNodeConcurrent(t *testing.T) {
)
group.Go(func() error {
_, err := cache.GetNodes(ctx, overlay.FindStorageNodesRequest{})
nodes, err := cache.GetNodes(ctx, overlay.FindStorageNodesRequest{
MinimumRequiredNodes: 1,
})
for i := range nodes {
nodes[i].ID = storj.NodeID{byte(i)}
nodes[i].Address.Address = "123.123.123.123"
}
nodes[0] = nil
return err
})
group.Go(func() error {
_, err := cache.GetNodes(ctx, overlay.FindStorageNodesRequest{})
nodes, err := cache.GetNodes(ctx, overlay.FindStorageNodesRequest{
MinimumRequiredNodes: 1,
})
for i := range nodes {
nodes[i].ID = storj.NodeID{byte(i)}
nodes[i].Address.Address = "123.123.123.123"
}
nodes[0] = nil
return err
})
err = group.Wait()
@ -264,7 +325,7 @@ func TestGetNodeConcurrent(t *testing.T) {
require.Equal(t, 2, mockDB.callCount)
}
func TestGetNodeError(t *testing.T) {
func TestGetNodesError(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()

View File

@ -226,6 +226,19 @@ type SelectedNode struct {
LastIPPort string
}
// Clone returns a deep clone of the selected node.
//
// The Address message is copied into a freshly allocated pb.NodeAddress
// rather than aliased, so callers may modify the clone's address without
// touching the original node. A nil receiver yields nil, and a nil Address
// stays nil in the clone instead of being dereferenced.
func (node *SelectedNode) Clone() *SelectedNode {
	if node == nil {
		return nil
	}

	clone := &SelectedNode{
		ID:         node.ID,
		LastNet:    node.LastNet,
		LastIPPort: node.LastIPPort,
	}
	// Address is a pointer: copy the pointed-to fields so the clone does not
	// share the message with the source, and skip the copy when it is unset.
	if node.Address != nil {
		clone.Address = &pb.NodeAddress{
			Transport: node.Address.Transport,
			Address:   node.Address.Address,
		}
	}
	return clone
}
// Service is used to store and handle node information
//
// architecture: Service