ad87d1de74
The easiest way to get node information WITH node tags is to execute two queries:

 1. select all nodes
 2. select all tags

and pair them up with a loop, using in-memory data structures. But this approach only works if we select all nodes, which is the case when we use the cache (upload, download, repair checker). The repair process selects only the required nodes, so for it this approach is suboptimal (a full table scan over all tags, even when we only need the tags of a few dozen nodes).

Possible solutions:

 1. Introduce a cache for repair (similar to the upload cache).
 2. Select both node and tag information with one query (a join).

This patch implements the second approach.

Note: repair itself is quite slow (10-20 seconds per segment to repair). With 15 seconds of execution time and 3 minutes of cache staleness, we would use the cache only 12 times per worker. We probably don't need a cache for now.

https://github.com/storj/storj/issues/6198

Change-Id: I0364d94306e9815a1c280b71e843b8f504e3d870
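For illustration, here is a minimal sketch of the join-based approach described above. It is written against a simplified, hypothetical schema (nodes and node_tags tables) rather than the actual satellitedb queries; the table and column names, the NodeTag/SelectedNode shapes, and the use of lib/pq array parameters are all assumptions, not the code this patch adds.

package example

import (
	"context"
	"database/sql"

	"github.com/lib/pq"
)

// NodeTag and SelectedNode are simplified stand-ins for the satellite's types.
type NodeTag struct {
	Name  string
	Value []byte
}

type SelectedNode struct {
	ID      string
	Address string
	Tags    []NodeTag
}

// getNodesWithTags fetches the requested nodes together with their tags in a
// single round trip. The LEFT JOIN keeps nodes that have no tags; for those
// rows the tag columns come back as NULL and are skipped.
func getNodesWithTags(ctx context.Context, db *sql.DB, ids []string) (map[string]*SelectedNode, error) {
	rows, err := db.QueryContext(ctx, `
		SELECT n.id, n.address, t.name, t.value
		FROM nodes n
		LEFT JOIN node_tags t ON t.node_id = n.id
		WHERE n.id = ANY($1)`, pq.Array(ids))
	if err != nil {
		return nil, err
	}
	defer func() { _ = rows.Close() }()

	nodes := make(map[string]*SelectedNode)
	for rows.Next() {
		var (
			id, address string
			tagName     sql.NullString
			tagValue    []byte
		)
		if err := rows.Scan(&id, &address, &tagName, &tagValue); err != nil {
			return nil, err
		}
		node, ok := nodes[id]
		if !ok {
			node = &SelectedNode{ID: id, Address: address}
			nodes[id] = node
		}
		if tagName.Valid {
			node.Tags = append(node.Tags, NodeTag{Name: tagName.String, Value: tagValue})
		}
	}
	return nodes, rows.Err()
}

The point is the same as in the patch: one join replaces the full tag scan plus in-memory pairing, so only the tags of the requested nodes are read.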
809 lines
25 KiB
Go
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.

package satellitedb_test

import (
	"context"
	"encoding/binary"
	"fmt"
	"math/rand"
	"net"
	"strconv"
	"strings"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"

	"storj.io/common/identity/testidentity"
	"storj.io/common/pb"
	"storj.io/common/storj"
	"storj.io/common/storj/location"
	"storj.io/common/testcontext"
	"storj.io/common/testrand"
	"storj.io/private/version"
	"storj.io/storj/private/teststorj"
	"storj.io/storj/satellite"
	"storj.io/storj/satellite/nodeselection"
	"storj.io/storj/satellite/overlay"
	"storj.io/storj/satellite/satellitedb/satellitedbtest"
)

func TestGetOfflineNodesForEmail(t *testing.T) {
	satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
		cache := db.OverlayCache()

		selectionCfg := overlay.NodeSelectionConfig{
			OnlineWindow: 4 * time.Hour,
		}

		offlineID := teststorj.NodeIDFromString("offlineNode")
		onlineID := teststorj.NodeIDFromString("onlineNode")
		disqualifiedID := teststorj.NodeIDFromString("dqNode")
		exitedID := teststorj.NodeIDFromString("exitedNode")
		offlineNoEmailID := teststorj.NodeIDFromString("noEmail")

		checkInInfo := overlay.NodeCheckInInfo{
			IsUp: true,
			Address: &pb.NodeAddress{
				Address: "1.2.3.4",
			},
			Version: &pb.NodeVersion{
				Version:    "v0.0.0",
				CommitHash: "",
				Timestamp:  time.Time{},
				Release:    false,
			},
			Operator: &pb.NodeOperator{
				Email: "offline@storj.test",
			},
		}

		now := time.Now()

		// offline node should be selected
		checkInInfo.NodeID = offlineID
		require.NoError(t, cache.UpdateCheckIn(ctx, checkInInfo, now.Add(-24*time.Hour), selectionCfg))

		// online node should not be selected
		checkInInfo.NodeID = onlineID
		require.NoError(t, cache.UpdateCheckIn(ctx, checkInInfo, now, selectionCfg))

		// disqualified node should not be selected
		checkInInfo.NodeID = disqualifiedID
		require.NoError(t, cache.UpdateCheckIn(ctx, checkInInfo, now.Add(-24*time.Hour), selectionCfg))
		_, err := cache.DisqualifyNode(ctx, disqualifiedID, now, overlay.DisqualificationReasonUnknown)
		require.NoError(t, err)

		// exited node should not be selected
		checkInInfo.NodeID = exitedID
		require.NoError(t, cache.UpdateCheckIn(ctx, checkInInfo, now.Add(-24*time.Hour), selectionCfg))
		_, err = cache.UpdateExitStatus(ctx, &overlay.ExitStatusRequest{
			NodeID:              exitedID,
			ExitInitiatedAt:     now,
			ExitLoopCompletedAt: now,
			ExitFinishedAt:      now,
			ExitSuccess:         true,
		})
		require.NoError(t, err)

		// node with no email should not be selected
		checkInInfo.NodeID = offlineNoEmailID
		checkInInfo.Operator.Email = ""
		require.NoError(t, cache.UpdateCheckIn(ctx, checkInInfo, time.Now().Add(-24*time.Hour), selectionCfg))

		nodes, err := cache.GetOfflineNodesForEmail(ctx, selectionCfg.OnlineWindow, 72*time.Hour, 24*time.Hour, 10)
		require.NoError(t, err)
		require.Equal(t, 1, len(nodes))
		require.NotEmpty(t, nodes[offlineID])

		// test cutoff causes node to not be selected
		nodes, err = cache.GetOfflineNodesForEmail(ctx, selectionCfg.OnlineWindow, time.Second, 24*time.Hour, 10)
		require.NoError(t, err)
		require.Empty(t, nodes)
	})
}

func TestUpdateLastOfflineEmail(t *testing.T) {
	satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
		cache := db.OverlayCache()

		selectionCfg := overlay.NodeSelectionConfig{
			OnlineWindow: 4 * time.Hour,
		}

		nodeID0 := teststorj.NodeIDFromString("testnode0")
		nodeID1 := teststorj.NodeIDFromString("testnode1")

		checkInInfo := overlay.NodeCheckInInfo{
			IsUp: true,
			Address: &pb.NodeAddress{
				Address: "1.2.3.4",
			},
			Version: &pb.NodeVersion{
				Version:    "v0.0.0",
				CommitHash: "",
				Timestamp:  time.Time{},
				Release:    false,
			},
			Operator: &pb.NodeOperator{
				Email: "test@storj.test",
			},
		}

		now := time.Now()
		checkInInfo.NodeID = nodeID0
		require.NoError(t, cache.UpdateCheckIn(ctx, checkInInfo, now, selectionCfg))
		checkInInfo.NodeID = nodeID1
		require.NoError(t, cache.UpdateCheckIn(ctx, checkInInfo, now, selectionCfg))
		require.NoError(t, cache.UpdateLastOfflineEmail(ctx, []storj.NodeID{nodeID0, nodeID1}, now))

		node0, err := cache.Get(ctx, nodeID0)
		require.NoError(t, err)
		require.Equal(t, now.Truncate(time.Second), node0.LastOfflineEmail.Truncate(time.Second))

		node1, err := cache.Get(ctx, nodeID1)
		require.NoError(t, err)
		require.Equal(t, now.Truncate(time.Second), node1.LastOfflineEmail.Truncate(time.Second))
	})
}

func TestSetNodeContained(t *testing.T) {
	satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
		cache := db.OverlayCache()

		nodeID := testrand.NodeID()
		checkInInfo := overlay.NodeCheckInInfo{
			IsUp: true,
			Address: &pb.NodeAddress{
				Address: "1.2.3.4",
			},
			Version: &pb.NodeVersion{
				Version:    "v0.0.0",
				CommitHash: "",
				Timestamp:  time.Time{},
				Release:    false,
			},
			Operator: &pb.NodeOperator{
				Email: "offline@storj.test",
			},
		}

		now := time.Now()

		// check the node in (as offline) so it exists in the cache
		checkInInfo.NodeID = nodeID
		require.NoError(t, cache.UpdateCheckIn(ctx, checkInInfo, now.Add(-24*time.Hour), overlay.NodeSelectionConfig{}))

		cacheInfo, err := cache.Get(ctx, nodeID)
		require.NoError(t, err)
		require.False(t, cacheInfo.Contained)

		err = cache.SetNodeContained(ctx, nodeID, true)
		require.NoError(t, err)

		cacheInfo, err = cache.Get(ctx, nodeID)
		require.NoError(t, err)
		require.True(t, cacheInfo.Contained)

		err = cache.SetNodeContained(ctx, nodeID, false)
		require.NoError(t, err)

		cacheInfo, err = cache.Get(ctx, nodeID)
		require.NoError(t, err)
		require.False(t, cacheInfo.Contained)
	})
}

func TestUpdateCheckInDirectUpdate(t *testing.T) {
	satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
		cache := db.OverlayCache()
		selectionCfg := overlay.NodeSelectionConfig{
			OnlineWindow: 4 * time.Hour,
		}
		nodeID := teststorj.NodeIDFromString("testnode0")
		checkInInfo := overlay.NodeCheckInInfo{
			IsUp: true,
			Address: &pb.NodeAddress{
				Address: "1.2.3.4",
			},
			Version: &pb.NodeVersion{
				Version:    "v0.0.0",
				CommitHash: "",
				Timestamp:  time.Time{},
				Release:    false,
			},
			Operator: &pb.NodeOperator{
				Email: "test@storj.test",
			},
		}
		now := time.Now().UTC()
		checkInInfo.NodeID = nodeID
		semVer, err := version.NewSemVer(checkInInfo.Version.Version)
		require.NoError(t, err)
		// node unknown - should not be updated by updateCheckInDirectUpdate
		updated, err := cache.TestUpdateCheckInDirectUpdate(ctx, checkInInfo, now, semVer, "encodedwalletfeature")
		require.NoError(t, err)
		require.False(t, updated)
		require.NoError(t, cache.UpdateCheckIn(ctx, checkInInfo, now, selectionCfg))
		updated, err = cache.TestUpdateCheckInDirectUpdate(ctx, checkInInfo, now.Add(6*time.Hour), semVer, "encodedwalletfeature")
		require.NoError(t, err)
		require.True(t, updated)
	})
}

func TestSetAllContainedNodes(t *testing.T) {
	satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
		cache := db.OverlayCache()

		node1 := testrand.NodeID()
		node2 := testrand.NodeID()
		node3 := testrand.NodeID()

		// put nodes with these IDs in the db
		for _, n := range []storj.NodeID{node1, node2, node3} {
			checkInInfo := overlay.NodeCheckInInfo{
				IsUp:    true,
				Address: &pb.NodeAddress{Address: "1.2.3.4"},
				Version: &pb.NodeVersion{Version: "v0.0.0"},
				NodeID:  n,
			}
			err := cache.UpdateCheckIn(ctx, checkInInfo, time.Now().UTC(), overlay.NodeSelectionConfig{})
			require.NoError(t, err)
		}
		// none of them should be contained
		assertContained(ctx, t, cache, node1, false, node2, false, node3, false)

		// Set node2 (only) to be contained
		err := cache.SetAllContainedNodes(ctx, []storj.NodeID{node2})
		require.NoError(t, err)
		assertContained(ctx, t, cache, node1, false, node2, true, node3, false)

		// Set node1 and node3 (only) to be contained
		err = cache.SetAllContainedNodes(ctx, []storj.NodeID{node1, node3})
		require.NoError(t, err)
		assertContained(ctx, t, cache, node1, true, node2, false, node3, true)

		// Set node1 (only) to be contained
		err = cache.SetAllContainedNodes(ctx, []storj.NodeID{node1})
		require.NoError(t, err)
		assertContained(ctx, t, cache, node1, true, node2, false, node3, false)

		// Set no nodes to be contained
		err = cache.SetAllContainedNodes(ctx, []storj.NodeID{})
		require.NoError(t, err)
		assertContained(ctx, t, cache, node1, false, node2, false, node3, false)
	})
}

// assertContained checks the Contained flag of each node in the overlay cache;
// args must be alternating (storj.NodeID, expected bool) pairs.
func assertContained(ctx context.Context, t testing.TB, cache overlay.DB, args ...interface{}) {
	require.Equal(t, 0, len(args)%2, "must be given an even number of args")
	for n := 0; n < len(args); n += 2 {
		nodeID := args[n].(storj.NodeID)
		expectedContainment := args[n+1].(bool)
		nodeInDB, err := cache.Get(ctx, nodeID)
		require.NoError(t, err)
		require.Equalf(t, expectedContainment, nodeInDB.Contained,
			"Expected nodeID %v (args[%d]) contained = %v, but got %v",
			nodeID, n, expectedContainment, nodeInDB.Contained)
	}
}

func TestGetNodesNetwork(t *testing.T) {
	satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
		cache := db.OverlayCache()
		const (
			distinctNetworks = 10
			netMask          = 28
			nodesPerNetwork  = 1 << (32 - netMask)
		)
		mask := net.CIDRMask(netMask, 32)

		nodes := make([]storj.NodeID, distinctNetworks*nodesPerNetwork)
		ips := make([]net.IP, len(nodes))
		lastNets := make([]string, len(nodes))
		setOfNets := make(map[string]struct{})

		for n := range nodes {
			nodes[n] = testrand.NodeID()
			ips[n] = make(net.IP, 4)
			binary.BigEndian.PutUint32(ips[n], uint32(n))
			lastNets[n] = ips[n].Mask(mask).String()
			setOfNets[lastNets[n]] = struct{}{}

			checkInInfo := overlay.NodeCheckInInfo{
				IsUp:    true,
				Address: &pb.NodeAddress{Address: ips[n].String()},
				LastNet: lastNets[n],
				Version: &pb.NodeVersion{Version: "v0.0.0"},
				NodeID:  nodes[n],
			}
			err := cache.UpdateCheckIn(ctx, checkInInfo, time.Now().UTC(), overlay.NodeSelectionConfig{})
			require.NoError(t, err)
		}

		t.Run("GetNodesNetwork", func(t *testing.T) {
			gotLastNets, err := cache.GetNodesNetwork(ctx, nodes)
			require.NoError(t, err)
			require.Len(t, gotLastNets, len(nodes))
			gotLastNetsSet := make(map[string]struct{})
			for _, lastNet := range gotLastNets {
				gotLastNetsSet[lastNet] = struct{}{}
			}
			require.Len(t, gotLastNetsSet, distinctNetworks)
			for _, lastNet := range gotLastNets {
				require.NotEmpty(t, lastNet)
				delete(setOfNets, lastNet)
			}
			require.Empty(t, setOfNets) // indicates that all last_nets were seen in the result
		})

		t.Run("GetNodesNetworkInOrder", func(t *testing.T) {
			nodesPlusOne := make([]storj.NodeID, len(nodes)+1)
			copy(nodesPlusOne[:len(nodes)], nodes)
			lastNetsPlusOne := make([]string, len(nodes)+1)
			copy(lastNetsPlusOne[:len(nodes)], lastNets)
			// add a node that the overlay cache doesn't know about
			unknownNode := testrand.NodeID()
			nodesPlusOne[len(nodes)] = unknownNode
			lastNetsPlusOne[len(nodes)] = ""

			// shuffle the order of the requested nodes, so we know output is in the right order
			rand.Shuffle(len(nodesPlusOne), func(i, j int) {
				nodesPlusOne[i], nodesPlusOne[j] = nodesPlusOne[j], nodesPlusOne[i]
				lastNetsPlusOne[i], lastNetsPlusOne[j] = lastNetsPlusOne[j], lastNetsPlusOne[i]
			})

			gotLastNets, err := cache.GetNodesNetworkInOrder(ctx, nodesPlusOne)
			require.NoError(t, err)
			require.Len(t, gotLastNets, len(nodes)+1)

			require.Equal(t, lastNetsPlusOne, gotLastNets)
		})
	})
}

func TestOverlayCache_SelectAllStorageNodesDownloadUpload(t *testing.T) {
	satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
		tagSigner := testidentity.MustPregeneratedIdentity(0, storj.LatestIDVersion())

		cache := db.OverlayCache()
		const netMask = 28
		mask := net.CIDRMask(netMask, 32)

		infos := make([]overlay.NodeCheckInInfo, 5)

		for n := range infos {
			id := testrand.NodeID()
			ip := net.IP{0, 0, 1, byte(n)}
			lastNet := ip.Mask(mask).String()

			infos[n] = overlay.NodeCheckInInfo{
				IsUp:        true,
				Address:     &pb.NodeAddress{Address: ip.String()},
				LastNet:     lastNet,
				LastIPPort:  "0.0.0.0:0",
				Version:     &pb.NodeVersion{Version: "v0.0.0"},
				NodeID:      id,
				CountryCode: location.Canada,
			}
			err := cache.UpdateCheckIn(ctx, infos[n], time.Now().UTC(), overlay.NodeSelectionConfig{})
			require.NoError(t, err)

			if n%2 == 0 {
				err = cache.UpdateNodeTags(ctx, nodeselection.NodeTags{
					nodeselection.NodeTag{
						NodeID:   id,
						SignedAt: time.Now(),
						Signer:   tagSigner.ID,
						Name:     "even",
						Value:    []byte{1},
					},
				})
				require.NoError(t, err)
			}
		}

		checkNodes := func(selectedNodes []*nodeselection.SelectedNode) {
			selectedNodesMap := map[storj.NodeID]*nodeselection.SelectedNode{}
			for _, node := range selectedNodes {
				selectedNodesMap[node.ID] = node
			}

			for _, info := range infos {
				selectedNode, ok := selectedNodesMap[info.NodeID]
				require.True(t, ok)

				require.Equal(t, info.NodeID, selectedNode.ID)
				require.Equal(t, info.Address, selectedNode.Address)
				require.Equal(t, info.CountryCode, selectedNode.CountryCode)
				require.Equal(t, info.LastIPPort, selectedNode.LastIPPort)
				require.Equal(t, info.LastNet, selectedNode.LastNet)
				segments := strings.Split(selectedNode.Address.Address, ".")
				origIndex, err := strconv.Atoi(segments[len(segments)-1])
				require.NoError(t, err)
				if origIndex%2 == 0 {
					require.Len(t, selectedNode.Tags, 1)
					require.Equal(t, "even", selectedNode.Tags[0].Name)
					require.Equal(t, []byte{1}, selectedNode.Tags[0].Value)
				} else {
					require.Len(t, selectedNode.Tags, 0)
				}
			}
		}

		selectedNodes, err := cache.SelectAllStorageNodesDownload(ctx, time.Minute, overlay.AsOfSystemTimeConfig{})
		require.NoError(t, err)

		checkNodes(selectedNodes)

		reputableNodes, newNodes, err := cache.SelectAllStorageNodesUpload(ctx, overlay.NodeSelectionConfig{
			OnlineWindow: time.Minute,
		})
		require.NoError(t, err)

		checkNodes(append(reputableNodes, newNodes...))
	})

}

type nodeDisposition struct {
	id               storj.NodeID
	address          string
	lastIPPort       string
	offlineInterval  time.Duration
	countryCode      location.CountryCode
	disqualified     bool
	auditSuspended   bool
	offlineSuspended bool
	exiting          bool
	exited           bool
}

func TestOverlayCache_GetNodes(t *testing.T) {
	satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
		cache := db.OverlayCache()

		allNodes := []nodeDisposition{
			addNode(ctx, t, cache, "online ", "127.0.0.1", time.Second, false, false, false, false, false),
			addNode(ctx, t, cache, "offline ", "127.0.0.2", 2*time.Hour, false, false, false, false, false),
			addNode(ctx, t, cache, "disqualified ", "127.0.0.3", 2*time.Hour, true, false, false, false, false),
			addNode(ctx, t, cache, "audit-suspended ", "127.0.0.4", time.Second, false, true, false, false, false),
			addNode(ctx, t, cache, "offline-suspended", "127.0.0.5", 2*time.Hour, false, false, true, false, false),
			addNode(ctx, t, cache, "exiting ", "127.0.0.5", 2*time.Hour, false, false, false, true, false),
			addNode(ctx, t, cache, "exited ", "127.0.0.6", 2*time.Hour, false, false, false, false, true),
		}

		nodes := func(nodeNums ...int) []nodeDisposition {
			nodeDisps := make([]nodeDisposition, len(nodeNums))
			for i, nodeNum := range nodeNums {
				nodeDisps[i] = allNodes[nodeNum]
			}
			return nodeDisps
		}

		sNodes := func(nodes ...int) []nodeselection.SelectedNode {
			selectedNodes := make([]nodeselection.SelectedNode, len(nodes))
			for i, nodeNum := range nodes {
				selectedNodes[i] = nodeDispositionToSelectedNode(allNodes[nodeNum], time.Hour)
			}
			return selectedNodes
		}

		type testCase struct {
			QueryNodes []nodeDisposition
			Online     []nodeselection.SelectedNode
			Offline    []nodeselection.SelectedNode
		}

		for testNum, tc := range []testCase{
			{
				QueryNodes: nodes(0, 1),
				Online:     sNodes(0),
				Offline:    sNodes(1),
			},
			{
				QueryNodes: nodes(0),
				Online:     sNodes(0),
			},
			{
				QueryNodes: nodes(1),
				Offline:    sNodes(1),
			},
			{ // only unreliable
				QueryNodes: nodes(2, 3, 4, 5),
				Online:     sNodes(3),
				Offline:    sNodes(4, 5),
			},

			{ // all nodes
				QueryNodes: allNodes,
				Online:     sNodes(0, 3),
				Offline:    sNodes(1, 4, 5),
			},
			// all nodes + one ID not from DB
			{
				QueryNodes: append(allNodes, nodeDisposition{
					id:           testrand.NodeID(),
					disqualified: true, // just so we expect a zero ID for this entry
				}),
				Online:  sNodes(0, 3),
				Offline: sNodes(1, 4, 5),
			},
		} {
			ids := make([]storj.NodeID, len(tc.QueryNodes))
			for i := range tc.QueryNodes {
				ids[i] = tc.QueryNodes[i].id
			}
			selectedNodes, err := cache.GetNodes(ctx, ids, 1*time.Hour, 0)
			require.NoError(t, err)
			require.Equal(t, len(tc.QueryNodes), len(selectedNodes))
			var gotOnline []nodeselection.SelectedNode
			var gotOffline []nodeselection.SelectedNode
			for i, n := range selectedNodes {
				if tc.QueryNodes[i].disqualified || tc.QueryNodes[i].exited {
					assert.Zero(t, n, testNum, i)
				} else {
					assert.Equal(t, tc.QueryNodes[i].id, selectedNodes[i].ID, "%d:%d", testNum, i)
					if n.Online {
						gotOnline = append(gotOnline, n)
					} else {
						gotOffline = append(gotOffline, n)
					}
				}
			}
			assert.Equal(t, tc.Online, gotOnline)
			assert.Equal(t, tc.Offline, gotOffline)
		}

		// test empty id list
		_, err := cache.GetNodes(ctx, storj.NodeIDList{}, 1*time.Hour, 0)
		require.Error(t, err)

		// test as of system time
		allIDs := make([]storj.NodeID, len(allNodes))
		for i := range allNodes {
			allIDs[i] = allNodes[i].id
		}
		_, err = cache.GetNodes(ctx, allIDs, 1*time.Hour, -1*time.Microsecond)
		require.NoError(t, err)
	})
}

func TestOverlayCache_GetParticipatingNodes(t *testing.T) {
	satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
		cache := db.OverlayCache()

		allNodes := []nodeDisposition{
			addNode(ctx, t, cache, "online ", "127.0.0.1", time.Second, false, false, false, false, false),
			addNode(ctx, t, cache, "offline ", "127.0.0.2", 2*time.Hour, false, false, false, false, false),
			addNode(ctx, t, cache, "disqualified ", "127.0.0.3", 2*time.Hour, true, false, false, false, false),
			addNode(ctx, t, cache, "audit-suspended ", "127.0.0.4", time.Second, false, true, false, false, false),
			addNode(ctx, t, cache, "offline-suspended", "127.0.0.5", 2*time.Hour, false, false, true, false, false),
			addNode(ctx, t, cache, "exiting ", "127.0.0.5", 2*time.Hour, false, false, false, true, false),
			addNode(ctx, t, cache, "exited ", "127.0.0.6", 2*time.Hour, false, false, false, false, true),
		}

		type testCase struct {
			OnlineWindow time.Duration
			Online       []int
			Offline      []int
		}

		for i, tc := range []testCase{
			{
				OnlineWindow: 1 * time.Hour,
				Online:       []int{0, 3},
				Offline:      []int{1, 4, 5},
			},
			{
				OnlineWindow: 20 * time.Hour,
				Online:       []int{0, 1, 3, 4, 5},
			},
			{
				OnlineWindow: 1 * time.Microsecond,
				Offline:      []int{0, 1, 3, 4, 5},
			},
		} {
			expectedNodes := make([]nodeselection.SelectedNode, 0, len(tc.Offline)+len(tc.Online))
			for _, num := range tc.Online {
				selectedNode := nodeDispositionToSelectedNode(allNodes[num], 0)
				selectedNode.Online = true
				expectedNodes = append(expectedNodes, selectedNode)
			}
			for _, num := range tc.Offline {
				selectedNode := nodeDispositionToSelectedNode(allNodes[num], 0)
				selectedNode.Online = false
				expectedNodes = append(expectedNodes, selectedNode)
			}
			gotNodes, err := cache.GetParticipatingNodes(ctx, tc.OnlineWindow, 0)
			require.NoError(t, err)
			require.ElementsMatch(t, expectedNodes, gotNodes, "#%d", i)
		}

		// test as of system time
		_, err := cache.GetParticipatingNodes(ctx, 1*time.Hour, -1*time.Microsecond)
		require.NoError(t, err)
	})
}

// nodeDispositionToSelectedNode builds the SelectedNode that the overlay cache
// is expected to return for a test node; disqualified and exited nodes map to
// the zero value.
func nodeDispositionToSelectedNode(disp nodeDisposition, onlineWindow time.Duration) nodeselection.SelectedNode {
	if disp.exited || disp.disqualified {
		return nodeselection.SelectedNode{}
	}
	return nodeselection.SelectedNode{
		ID:          disp.id,
		Address:     &pb.NodeAddress{Address: disp.address},
		LastNet:     disp.lastIPPort,
		LastIPPort:  disp.lastIPPort,
		CountryCode: disp.countryCode,
		Exiting:     disp.exiting,
		Suspended:   disp.auditSuspended || disp.offlineSuspended,
		Online:      disp.offlineInterval <= onlineWindow,
	}
}

// addNode checks a node with the given disposition (offline interval,
// disqualification, suspension, exit state) into the overlay cache and
// returns that disposition for later comparison.
func addNode(ctx context.Context, t *testing.T, cache overlay.DB, address, lastIPPort string, offlineInterval time.Duration, disqualified, auditSuspended, offlineSuspended, exiting, exited bool) nodeDisposition {
	disp := nodeDisposition{
		id:               testrand.NodeID(),
		address:          address,
		lastIPPort:       lastIPPort,
		offlineInterval:  offlineInterval,
		countryCode:      location.Poland,
		disqualified:     disqualified,
		auditSuspended:   auditSuspended,
		offlineSuspended: offlineSuspended,
		exiting:          exiting,
		exited:           exited,
	}

	checkInInfo := overlay.NodeCheckInInfo{
		IsUp:        true,
		NodeID:      disp.id,
		Address:     &pb.NodeAddress{Address: disp.address},
		LastIPPort:  disp.lastIPPort,
		LastNet:     disp.lastIPPort,
		CountryCode: disp.countryCode,
		Version:     &pb.NodeVersion{Version: "v0.0.0"},
	}

	timestamp := time.Now().UTC().Add(-disp.offlineInterval)

	err := cache.UpdateCheckIn(ctx, checkInInfo, timestamp, overlay.NodeSelectionConfig{})
	require.NoError(t, err)

	if disqualified {
		_, err := cache.DisqualifyNode(ctx, disp.id, time.Now(), overlay.DisqualificationReasonAuditFailure)
		require.NoError(t, err)
	}

	if auditSuspended {
		require.NoError(t, cache.TestSuspendNodeUnknownAudit(ctx, disp.id, time.Now()))
	}

	if offlineSuspended {
		require.NoError(t, cache.TestSuspendNodeOffline(ctx, disp.id, time.Now()))
	}

	if exiting {
		now := time.Now()
		_, err = cache.UpdateExitStatus(ctx, &overlay.ExitStatusRequest{
			NodeID:          disp.id,
			ExitInitiatedAt: now,
		})
		require.NoError(t, err)
	}

	if exited {
		now := time.Now()
		_, err = cache.UpdateExitStatus(ctx, &overlay.ExitStatusRequest{
			NodeID:              disp.id,
			ExitInitiatedAt:     now,
			ExitLoopCompletedAt: now,
			ExitFinishedAt:      now,
			ExitSuccess:         true,
		})
		require.NoError(t, err)
	}

	return disp
}

func TestOverlayCache_KnownReliableTagHandling(t *testing.T) {
	signer := testidentity.MustPregeneratedIdentity(0, storj.LatestIDVersion())

	satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {

		cache := db.OverlayCache()

		// GIVEN

		var ids []storj.NodeID
		for i := 0; i < 10; i++ {
			address := fmt.Sprintf("127.0.0.%d", i)
			checkInInfo := overlay.NodeCheckInInfo{
				IsUp:        true,
				NodeID:      testidentity.MustPregeneratedIdentity(i+1, storj.LatestIDVersion()).ID,
				Address:     &pb.NodeAddress{Address: address},
				LastIPPort:  address + ":1234",
				LastNet:     "127.0.0.0",
				CountryCode: location.Romania,
				Version:     &pb.NodeVersion{Version: "v0.0.0"},
			}

			ids = append(ids, checkInInfo.NodeID)

			checkin := time.Now().UTC()
			if i%2 == 0 {
				checkin = checkin.Add(-50 * time.Hour)
			}
			err := cache.UpdateCheckIn(ctx, checkInInfo, checkin, overlay.NodeSelectionConfig{})
			require.NoError(t, err)

			tags := nodeselection.NodeTags{}

			if i%2 == 0 {
				tags = append(tags, nodeselection.NodeTag{
					SignedAt: time.Now(),
					Signer:   signer.ID,
					NodeID:   checkInInfo.NodeID,
					Name:     "index",
					Value:    []byte{byte(i)},
				})
			}
			if i%4 == 0 {
				tags = append(tags, nodeselection.NodeTag{
					SignedAt: time.Now(),
					Signer:   signer.ID,
					NodeID:   checkInInfo.NodeID,
					Name:     "selected",
					Value:    []byte("true"),
				})
			}

			if len(tags) > 0 {
				require.NoError(t, err)
				err = cache.UpdateNodeTags(ctx, tags)
				require.NoError(t, err)
			}

		}

		// WHEN
		nodes, err := cache.GetNodes(ctx, ids, 1*time.Hour, 0)
		require.NoError(t, err)

		// THEN
		require.Len(t, nodes, 10)

		checkTag := func(tags nodeselection.NodeTags, name string, value []byte) {
			tag1, err := tags.FindBySignerAndName(signer.ID, name)
			require.NoError(t, err)
			require.Equal(t, name, tag1.Name)
			require.Equal(t, value, tag1.Value)
			require.Equal(t, signer.ID, tag1.Signer)
			require.True(t, time.Since(tag1.SignedAt) < 1*time.Hour)
		}

		for _, node := range nodes {
			ipParts := strings.Split(node.Address.Address, ".")
			ix, err := strconv.Atoi(ipParts[3])
			require.NoError(t, err)

			if ix%4 == 0 {
				require.Len(t, node.Tags, 2)
				checkTag(node.Tags, "selected", []byte("true"))
				checkTag(node.Tags, "index", []byte{byte(ix)})
			} else if ix%2 == 0 {
				checkTag(node.Tags, "index", []byte{byte(ix)})
				require.Len(t, node.Tags, 1)
			} else {
				require.Len(t, node.Tags, 0)
			}
		}
	})
}