satellite/durability: ignore information from new nodes
To get better performance, we pre-load all nodealias/node information at the beginning of the segment loop. It's possible that we receive a new node alias from the segment table what we are not fully aware of (yet). The easiest solution is just ignoring. New risks/threats can be detected by a new execution cycle. Change-Id: Ib54f7edc46eedbab6d13b4d651aaac1425994940
This commit is contained in:
parent
a63a69dfd9
commit
5c49ba1d85
@ -215,6 +215,12 @@ func (c *ObserverFork) Process(ctx context.Context, segments []rangedloop.Segmen
|
|||||||
s := &segments[i]
|
s := &segments[i]
|
||||||
healthyPieceCount := 0
|
healthyPieceCount := 0
|
||||||
for _, piece := range s.AliasPieces {
|
for _, piece := range s.AliasPieces {
|
||||||
|
if len(c.classified) <= int(piece.Alias) {
|
||||||
|
// this is a new node, but we can ignore it.
|
||||||
|
// will be included in the next execution cycle.
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
classes := c.classified[piece.Alias]
|
classes := c.classified[piece.Alias]
|
||||||
|
|
||||||
// unavailable/offline nodes were not classified
|
// unavailable/offline nodes were not classified
|
||||||
|
@ -24,13 +24,6 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func TestDurability(t *testing.T) {
|
func TestDurability(t *testing.T) {
|
||||||
createUUID := func() uuid.UUID {
|
|
||||||
id, err := uuid.New()
|
|
||||||
require.NoError(t, err)
|
|
||||||
return id
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
var storageNodes []*nodeselection.SelectedNode
|
var storageNodes []*nodeselection.SelectedNode
|
||||||
var aliases []metabase.NodeAliasEntry
|
var aliases []metabase.NodeAliasEntry
|
||||||
for i := 0; i < 10; i++ {
|
for i := 0; i < 10; i++ {
|
||||||
@ -63,21 +56,13 @@ func TestDurability(t *testing.T) {
|
|||||||
// first batch
|
// first batch
|
||||||
err = fork.Process(ctx, []rangedloop.Segment{
|
err = fork.Process(ctx, []rangedloop.Segment{
|
||||||
{
|
{
|
||||||
StreamID: createUUID(),
|
StreamID: testrand.UUID(),
|
||||||
Position: metabase.SegmentPosition{
|
Position: metabase.SegmentPosition{
|
||||||
Part: 1,
|
Part: 1,
|
||||||
Index: 1,
|
Index: 1,
|
||||||
},
|
},
|
||||||
AliasPieces: pieces(storageNodes, 3, 6, 9, 1),
|
AliasPieces: pieces(storageNodes, 3, 6, 9, 1),
|
||||||
},
|
},
|
||||||
{
|
|
||||||
StreamID: createUUID(),
|
|
||||||
Position: metabase.SegmentPosition{
|
|
||||||
Part: 1,
|
|
||||||
Index: 1,
|
|
||||||
},
|
|
||||||
AliasPieces: pieces(storageNodes, 1, 2, 3, 4),
|
|
||||||
},
|
|
||||||
})
|
})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
@ -89,7 +74,7 @@ func TestDurability(t *testing.T) {
|
|||||||
// second batch
|
// second batch
|
||||||
err = fork.Process(ctx, []rangedloop.Segment{
|
err = fork.Process(ctx, []rangedloop.Segment{
|
||||||
{
|
{
|
||||||
StreamID: createUUID(),
|
StreamID: testrand.UUID(),
|
||||||
Position: metabase.SegmentPosition{
|
Position: metabase.SegmentPosition{
|
||||||
Part: 1,
|
Part: 1,
|
||||||
Index: 1,
|
Index: 1,
|
||||||
@ -97,7 +82,7 @@ func TestDurability(t *testing.T) {
|
|||||||
AliasPieces: pieces(storageNodes, 2, 3, 4, 7),
|
AliasPieces: pieces(storageNodes, 2, 3, 4, 7),
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
StreamID: createUUID(),
|
StreamID: testrand.UUID(),
|
||||||
Position: metabase.SegmentPosition{
|
Position: metabase.SegmentPosition{
|
||||||
Part: 1,
|
Part: 1,
|
||||||
Index: 1,
|
Index: 1,
|
||||||
@ -116,6 +101,62 @@ func TestDurability(t *testing.T) {
|
|||||||
require.Equal(t, 3, c.healthStat["net:127.0.2.0"].Min())
|
require.Equal(t, 3, c.healthStat["net:127.0.2.0"].Min())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestDurabilityUnknownNode(t *testing.T) {
|
||||||
|
var storageNodes []*nodeselection.SelectedNode
|
||||||
|
var aliases []metabase.NodeAliasEntry
|
||||||
|
|
||||||
|
node := &nodeselection.SelectedNode{
|
||||||
|
ID: testidentity.MustPregeneratedIdentity(0, storj.LatestIDVersion()).ID,
|
||||||
|
LastNet: "127.0.0.1",
|
||||||
|
}
|
||||||
|
storageNodes = append(storageNodes, node)
|
||||||
|
aliases = append(aliases, metabase.NodeAliasEntry{
|
||||||
|
ID: node.ID,
|
||||||
|
Alias: metabase.NodeAlias(0),
|
||||||
|
})
|
||||||
|
|
||||||
|
ctx := testcontext.New(t)
|
||||||
|
c := NewDurability(nil, nil, []NodeClassifier{
|
||||||
|
func(node *nodeselection.SelectedNode) string {
|
||||||
|
return "net:" + node.LastNet
|
||||||
|
}}, 0, 0)
|
||||||
|
|
||||||
|
c.aliasMap = metabase.NewNodeAliasMap(aliases)
|
||||||
|
for _, node := range storageNodes {
|
||||||
|
c.nodes[node.ID] = node
|
||||||
|
}
|
||||||
|
|
||||||
|
fork, err := c.Fork(ctx)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// note: second piece points to an alias which was not preloaded (newly inserted).
|
||||||
|
err = fork.Process(ctx, []rangedloop.Segment{
|
||||||
|
{
|
||||||
|
StreamID: testrand.UUID(),
|
||||||
|
Position: metabase.SegmentPosition{
|
||||||
|
Part: 1,
|
||||||
|
Index: 1,
|
||||||
|
},
|
||||||
|
AliasPieces: metabase.AliasPieces{
|
||||||
|
metabase.AliasPiece{
|
||||||
|
Number: 1,
|
||||||
|
Alias: 0,
|
||||||
|
},
|
||||||
|
metabase.AliasPiece{
|
||||||
|
Number: 2,
|
||||||
|
Alias: 9999,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
err = c.Join(ctx, fork)
|
||||||
|
require.NoError(t, err)
|
||||||
|
// note: the newly created node (alias 9999) is not considered.
|
||||||
|
require.Equal(t, 0, c.healthStat["net:127.0.0.1"].Min())
|
||||||
|
}
|
||||||
|
|
||||||
func pieces(nodes []*nodeselection.SelectedNode, ix ...int) (res metabase.AliasPieces) {
|
func pieces(nodes []*nodeselection.SelectedNode, ix ...int) (res metabase.AliasPieces) {
|
||||||
for n, i := range ix {
|
for n, i := range ix {
|
||||||
res = append(res, metabase.AliasPiece{
|
res = append(res, metabase.AliasPiece{
|
||||||
|
Loading…
Reference in New Issue
Block a user