satellite/overlay: change Reliable and KnownReliable

to GetParticipatingNodes and GetNodes, respectively.

We now want these functions to include offline and suspended nodes as
well, so that we can force immediate repair when pieces are out of
placement or in excluded countries. With that change, the old names no
longer made sense.

Change-Id: Icbcbad43dbde0ca8cbc80a4d17a896bb89b078b7
Márton Elek 2023-08-21 13:59:54 +02:00 committed by Storj Robot
parent 6896241933
commit e2006d821c
15 changed files with 589 additions and 446 deletions
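
For reference, the call-site pattern changes as the first test diff below shows: callers that previously received only reliable nodes from Reliable/KnownReliable now receive offline and suspended nodes as well, and filter locally. A minimal sketch of that filtering (the helper name onlineOnly and the example package are illustrative, not part of this commit):

package example // illustrative only, not part of this commit

import "storj.io/storj/satellite/nodeselection"

// onlineOnly approximates the old "reliable and online" view on top of the
// broader result set now returned by GetParticipatingNodes/GetNodes, which
// also includes offline and suspended nodes.
func onlineOnly(nodes []nodeselection.SelectedNode) []nodeselection.SelectedNode {
	online := make([]nodeselection.SelectedNode, 0, len(nodes))
	for _, node := range nodes {
		if node.Online && !node.Suspended {
			online = append(online, node)
		}
	}
	return online
}

The updated TestDownloadWithSomeNodesOffline below applies the same kind of filter inline, keeping only nodes with Online set.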

View File

@ -27,6 +27,7 @@ import (
"storj.io/storj/private/revocation"
"storj.io/storj/private/server"
"storj.io/storj/private/testplanet"
"storj.io/storj/satellite/nodeselection"
"storj.io/uplink"
"storj.io/uplink/private/metaclient"
)
@ -105,8 +106,14 @@ func TestDownloadWithSomeNodesOffline(t *testing.T) {
}
// confirm that we marked the correct number of storage nodes as offline
online, _, err := satellite.Overlay.Service.Reliable(ctx)
allNodes, err := satellite.Overlay.Service.GetParticipatingNodes(ctx)
require.NoError(t, err)
online := make([]nodeselection.SelectedNode, 0, len(allNodes))
for _, node := range allNodes {
if node.Online {
online = append(online, node)
}
}
require.Len(t, online, len(planet.StorageNodes)-toKill)
// we should be able to download data without any of the original nodes

View File

@ -528,7 +528,6 @@ func (service *Service) CreatePutRepairOrderLimits(ctx context.Context, segment
func (service *Service) CreateGracefulExitPutOrderLimit(ctx context.Context, bucket metabase.BucketLocation, nodeID storj.NodeID, pieceNum int32, rootPieceID storj.PieceID, shareSize int32) (limit *pb.AddressedOrderLimit, _ storj.PiecePrivateKey, err error) {
defer mon.Task()(&ctx)(&err)
// should this use KnownReliable or similar?
node, err := service.overlay.Get(ctx, nodeID)
if err != nil {
return nil, storj.PiecePrivateKey{}, Error.Wrap(err)

View File

@ -64,12 +64,19 @@ func BenchmarkOverlay(b *testing.B) {
check = append(check, testrand.NodeID())
}
b.Run("KnownReliable", func(b *testing.B) {
b.Run("GetNodes", func(b *testing.B) {
onlineWindow := 1000 * time.Hour
for i := 0; i < b.N; i++ {
online, _, err := overlaydb.KnownReliable(ctx, check, onlineWindow, 0)
selectedNodes, err := overlaydb.GetNodes(ctx, check, onlineWindow, 0)
require.NoError(b, err)
require.Len(b, online, OnlineCount)
require.Len(b, selectedNodes, len(check))
foundOnline := 0
for _, n := range selectedNodes {
if n.Online {
foundOnline++
}
}
require.Equal(b, OnlineCount, foundOnline)
}
})

View File

@ -20,7 +20,6 @@ import (
"github.com/zeebo/errs"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
"golang.org/x/exp/slices"
"storj.io/common/identity/testidentity"
"storj.io/common/memory"
@ -118,38 +117,39 @@ func TestOnlineOffline(t *testing.T) {
satellite := planet.Satellites[0]
service := satellite.Overlay.Service
online, offline, err := service.KnownReliable(ctx, []storj.NodeID{
selectedNodes, err := service.GetNodes(ctx, []storj.NodeID{
planet.StorageNodes[0].ID(),
})
require.NoError(t, err)
require.Empty(t, offline)
require.Len(t, online, 1)
require.Len(t, selectedNodes, 1)
require.True(t, selectedNodes[0].Online)
online, offline, err = service.KnownReliable(ctx, []storj.NodeID{
selectedNodes, err = service.GetNodes(ctx, []storj.NodeID{
planet.StorageNodes[0].ID(),
planet.StorageNodes[1].ID(),
planet.StorageNodes[2].ID(),
})
require.NoError(t, err)
require.Empty(t, offline)
require.Len(t, online, 3)
require.Len(t, selectedNodes, 3)
for i := 0; i < 3; i++ {
require.True(t, selectedNodes[i].Online, i)
require.Equal(t, planet.StorageNodes[i].ID(), selectedNodes[i].ID, i)
}
unreliableNodeID := storj.NodeID{1, 2, 3, 4}
online, offline, err = service.KnownReliable(ctx, []storj.NodeID{
selectedNodes, err = service.GetNodes(ctx, []storj.NodeID{
planet.StorageNodes[0].ID(),
unreliableNodeID,
planet.StorageNodes[2].ID(),
})
require.NoError(t, err)
require.Empty(t, offline)
require.Len(t, online, 2)
require.False(t, slices.ContainsFunc(online, func(node nodeselection.SelectedNode) bool {
return node.ID == unreliableNodeID
}))
require.False(t, slices.ContainsFunc(offline, func(node nodeselection.SelectedNode) bool {
return node.ID == unreliableNodeID
}))
require.Len(t, selectedNodes, 3)
require.True(t, selectedNodes[0].Online)
require.False(t, selectedNodes[1].Online)
require.True(t, selectedNodes[2].Online)
require.Equal(t, planet.StorageNodes[0].ID(), selectedNodes[0].ID)
require.Equal(t, storj.NodeID{}, selectedNodes[1].ID)
require.Equal(t, planet.StorageNodes[2].ID(), selectedNodes[2].ID)
})
}

View File

@ -11,7 +11,6 @@ import (
"github.com/zeebo/errs"
"go.uber.org/zap"
"golang.org/x/exp/maps"
"storj.io/common/pb"
"storj.io/common/storj"
@ -19,7 +18,6 @@ import (
"storj.io/common/sync2"
"storj.io/private/version"
"storj.io/storj/satellite/geoip"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/nodeevents"
"storj.io/storj/satellite/nodeselection"
)
@ -62,10 +60,15 @@ type DB interface {
// Get looks up the node by nodeID
Get(ctx context.Context, nodeID storj.NodeID) (*NodeDossier, error)
// KnownReliable filters a set of nodes to reliable (online and qualified) nodes.
KnownReliable(ctx context.Context, nodeIDs storj.NodeIDList, onlineWindow, asOfSystemInterval time.Duration) (online []nodeselection.SelectedNode, offline []nodeselection.SelectedNode, err error)
// Reliable returns all nodes that are reliable (separated by whether they are currently online or offline).
Reliable(ctx context.Context, onlineWindow, asOfSystemInterval time.Duration) (online []nodeselection.SelectedNode, offline []nodeselection.SelectedNode, err error)
// GetNodes gets records for all specified nodes as of the given system interval. The
// onlineWindow is used to determine whether each node is marked as Online. The results are
// returned in a slice of the same length as the input nodeIDs, and each index of the returned
// list corresponds to the same index in nodeIDs. If a node is not known, or is disqualified
// or exited, the corresponding returned SelectedNode will have a zero value.
GetNodes(ctx context.Context, nodeIDs storj.NodeIDList, onlineWindow, asOfSystemInterval time.Duration) (_ []nodeselection.SelectedNode, err error)
// GetParticipatingNodes returns all known participating nodes (this includes all known nodes
// excluding nodes that have been disqualified or gracefully exited).
GetParticipatingNodes(ctx context.Context, onlineWindow, asOfSystemInterval time.Duration) (_ []nodeselection.SelectedNode, err error)
// UpdateReputation updates the DB columns for all reputation fields in ReputationStatus.
UpdateReputation(ctx context.Context, id storj.NodeID, request ReputationUpdate) error
// UpdateNodeInfo updates node dossier with info requested from the node itself like node type, email, wallet, capacity, and version.
@ -486,20 +489,25 @@ func (service *Service) InsertOfflineNodeEvents(ctx context.Context, cooldown ti
return count, err
}
// KnownReliable filters a set of nodes to reliable (online and qualified) nodes.
func (service *Service) KnownReliable(ctx context.Context, nodeIDs storj.NodeIDList) (onlineNodes []nodeselection.SelectedNode, offlineNodes []nodeselection.SelectedNode, err error) {
// GetNodes gets records for all specified nodes. The configured OnlineWindow is used to determine
// whether each node is marked as Online. The results are returned in a slice of the same length as
// the input nodeIDs, and each index of the returned list corresponds to the same index in nodeIDs.
// If a node is not known, or is disqualified or exited, the corresponding returned SelectedNode
// will have a zero value.
func (service *Service) GetNodes(ctx context.Context, nodeIDs storj.NodeIDList) (records []nodeselection.SelectedNode, err error) {
defer mon.Task()(&ctx)(&err)
// TODO add as of system time
return service.db.KnownReliable(ctx, nodeIDs, service.config.Node.OnlineWindow, 0)
return service.db.GetNodes(ctx, nodeIDs, service.config.Node.OnlineWindow, 0)
}
// Reliable returns all nodes that are reliable (separated by whether they are currently online or offline).
func (service *Service) Reliable(ctx context.Context) (online []nodeselection.SelectedNode, offline []nodeselection.SelectedNode, err error) {
// GetParticipatingNodes returns all known participating nodes (this includes all known nodes
// excluding nodes that have been disqualified or gracefully exited).
func (service *Service) GetParticipatingNodes(ctx context.Context) (records []nodeselection.SelectedNode, err error) {
defer mon.Task()(&ctx)(&err)
// TODO add as of system tim.
return service.db.Reliable(ctx, service.config.Node.OnlineWindow, 0)
// TODO add as of system time.
return service.db.GetParticipatingNodes(ctx, service.config.Node.OnlineWindow, 0)
}
// UpdateReputation updates the DB columns for any of the reputation fields.
@ -660,28 +668,6 @@ func (service *Service) UpdateCheckIn(ctx context.Context, node NodeCheckInInfo,
return nil
}
// GetMissingPieces returns the list of offline nodes and the corresponding pieces.
func (service *Service) GetMissingPieces(ctx context.Context, pieces metabase.Pieces) (missingPieces []uint16, err error) {
defer mon.Task()(&ctx)(&err)
// TODO this method will be removed completely in subsequent change
var nodeIDs storj.NodeIDList
missingPiecesMap := map[storj.NodeID]uint16{}
for _, p := range pieces {
nodeIDs = append(nodeIDs, p.StorageNode)
missingPiecesMap[p.StorageNode] = p.Number
}
onlineNodes, _, err := service.KnownReliable(ctx, nodeIDs)
if err != nil {
return nil, Error.New("error getting nodes %s", err)
}
for _, node := range onlineNodes {
delete(missingPiecesMap, node.ID)
}
return maps.Values(missingPiecesMap), nil
}
// DQNodesLastSeenBefore disqualifies nodes who have not been contacted since the cutoff time.
func (service *Service) DQNodesLastSeenBefore(ctx context.Context, cutoff time.Time, limit int) (count int, err error) {
defer mon.Task()(&ctx)(&err)
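
Since GetNodes now guarantees one result per requested ID, in input order, with a zero-value SelectedNode for unknown, disqualified, or exited nodes, callers can pair results positionally with their requests. A minimal sketch of that pattern (function and variable names are illustrative, not part of this commit):

package example // illustrative only, not part of this commit

import (
	"context"

	"storj.io/common/storj"
	"storj.io/storj/satellite/nodeselection"
	"storj.io/storj/satellite/overlay"
)

// lookupByID relies on the documented GetNodes contract: the returned slice
// has the same length and order as the input IDs, and entries for unknown,
// disqualified, or exited nodes are zero values.
func lookupByID(ctx context.Context, service *overlay.Service, ids storj.NodeIDList) (map[storj.NodeID]nodeselection.SelectedNode, error) {
	records, err := service.GetNodes(ctx, ids)
	if err != nil {
		return nil, err
	}
	byID := make(map[storj.NodeID]nodeselection.SelectedNode, len(ids))
	for i, rec := range records {
		if rec.ID == (storj.NodeID{}) {
			continue // no usable record for ids[i]
		}
		byID[ids[i]] = rec
	}
	return byID, nil
}

The segment repairer further down uses the same contract to line results up with segment pieces.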

View File

@ -6,7 +6,6 @@ package overlay_test
import (
"context"
"fmt"
"sort"
"testing"
"time"
@ -383,7 +382,7 @@ func TestNodeInfo(t *testing.T) {
})
}
func TestKnownReliable(t *testing.T) {
func TestGetNodes(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 6, UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
@ -428,8 +427,8 @@ func TestKnownReliable(t *testing.T) {
err = oc.TestSuspendNodeOffline(ctx, planet.StorageNodes[3].ID(), time.Now())
require.NoError(t, err)
// Check that only storage nodes #4 and #5 are reliable
online, _, err := service.KnownReliable(ctx, []storj.NodeID{
// Check that the results of GetNodes match expectations.
selectedNodes, err := service.GetNodes(ctx, []storj.NodeID{
planet.StorageNodes[0].ID(),
planet.StorageNodes[1].ID(),
planet.StorageNodes[2].ID(),
@ -438,20 +437,26 @@ func TestKnownReliable(t *testing.T) {
planet.StorageNodes[5].ID(),
})
require.NoError(t, err)
require.Len(t, online, 2)
require.Len(t, selectedNodes, 6)
require.False(t, selectedNodes[0].Online)
require.Zero(t, selectedNodes[0]) // node was disqualified
require.False(t, selectedNodes[1].Online)
require.False(t, selectedNodes[1].Suspended)
require.True(t, selectedNodes[2].Online)
require.True(t, selectedNodes[2].Suspended)
require.True(t, selectedNodes[3].Online)
require.True(t, selectedNodes[3].Suspended)
require.True(t, selectedNodes[4].Online)
require.False(t, selectedNodes[4].Suspended)
require.True(t, selectedNodes[5].Online)
require.False(t, selectedNodes[5].Suspended)
// Sort the storage nodes for predictable checks
expectedReliable := []storj.NodeURL{
planet.StorageNodes[4].NodeURL(),
planet.StorageNodes[5].NodeURL(),
}
sort.Slice(expectedReliable, func(i, j int) bool { return expectedReliable[i].ID.Less(expectedReliable[j].ID) })
sort.Slice(online, func(i, j int) bool { return online[i].ID.Less(online[j].ID) })
// Assert the reliable nodes are the expected ones
for i, node := range online {
assert.Equal(t, expectedReliable[i].ID, node.ID)
assert.Equal(t, expectedReliable[i].Address, node.Address.Address)
// Assert the returned nodes are the expected ones
for i, node := range selectedNodes {
if i == 0 {
continue
}
assert.Equal(t, planet.StorageNodes[i].ID(), node.ID)
}
})
}

View File

@ -11,7 +11,6 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/exp/slices"
"storj.io/common/pb"
"storj.io/common/storj"
@ -30,118 +29,151 @@ func TestStatDB(t *testing.T) {
}
func testDatabase(ctx context.Context, t *testing.T, cache overlay.DB) {
{ // Test KnownReliable and Reliable
for i, tt := range []struct {
nodeID storj.NodeID
unknownAuditSuspended bool
offlineSuspended bool
disqualified bool
offline bool
gracefullyexited bool
countryCode string
}{
{storj.NodeID{1}, false, false, false, false, false, "DE"}, // good
{storj.NodeID{2}, false, false, true, false, false, "DE"}, // disqualified
{storj.NodeID{3}, true, false, false, false, false, "DE"}, // unknown audit suspended
{storj.NodeID{4}, false, false, false, true, false, "DE"}, // offline
{storj.NodeID{5}, false, false, false, false, true, "DE"}, // gracefully exited
{storj.NodeID{6}, false, true, false, false, false, "DE"}, // offline suspended
{storj.NodeID{7}, false, false, false, false, false, "FR"}, // excluded country
{storj.NodeID{8}, false, false, false, false, false, ""}, // good
} {
addr := fmt.Sprintf("127.0.%d.0:8080", i)
lastNet := fmt.Sprintf("127.0.%d", i)
d := overlay.NodeCheckInInfo{
NodeID: tt.nodeID,
Address: &pb.NodeAddress{Address: addr},
LastIPPort: addr,
LastNet: lastNet,
Version: &pb.NodeVersion{Version: "v1.0.0"},
Capacity: &pb.NodeCapacity{},
IsUp: true,
CountryCode: location.ToCountryCode(tt.countryCode),
}
err := cache.UpdateCheckIn(ctx, d, time.Now().UTC(), overlay.NodeSelectionConfig{})
for i, tt := range []struct {
nodeID storj.NodeID
unknownAuditSuspended bool
offlineSuspended bool
disqualified bool
offline bool
gracefullyExited bool
countryCode string
}{
{storj.NodeID{1}, false, false, false, false, false, "DE"}, // good
{storj.NodeID{2}, false, false, true, false, false, "DE"}, // disqualified
{storj.NodeID{3}, true, false, false, false, false, "DE"}, // unknown audit suspended
{storj.NodeID{4}, false, false, false, true, false, "DE"}, // offline
{storj.NodeID{5}, false, false, false, false, true, "DE"}, // gracefully exited
{storj.NodeID{6}, false, true, false, false, false, "DE"}, // offline suspended
{storj.NodeID{7}, false, false, false, false, false, "FR"}, // excluded country
{storj.NodeID{8}, false, false, false, false, false, ""}, // good
} {
addr := fmt.Sprintf("127.0.%d.0:8080", i)
lastNet := fmt.Sprintf("127.0.%d", i)
d := overlay.NodeCheckInInfo{
NodeID: tt.nodeID,
Address: &pb.NodeAddress{Address: addr},
LastIPPort: addr,
LastNet: lastNet,
Version: &pb.NodeVersion{Version: "v1.0.0"},
Capacity: &pb.NodeCapacity{},
IsUp: true,
CountryCode: location.ToCountryCode(tt.countryCode),
}
err := cache.UpdateCheckIn(ctx, d, time.Now().UTC(), overlay.NodeSelectionConfig{})
require.NoError(t, err)
if tt.unknownAuditSuspended {
err = cache.TestSuspendNodeUnknownAudit(ctx, tt.nodeID, time.Now())
require.NoError(t, err)
if tt.unknownAuditSuspended {
err = cache.TestSuspendNodeUnknownAudit(ctx, tt.nodeID, time.Now())
require.NoError(t, err)
}
if tt.offlineSuspended {
err = cache.TestSuspendNodeOffline(ctx, tt.nodeID, time.Now())
require.NoError(t, err)
}
if tt.disqualified {
_, err = cache.DisqualifyNode(ctx, tt.nodeID, time.Now(), overlay.DisqualificationReasonUnknown)
require.NoError(t, err)
}
if tt.offline {
checkInInfo := getNodeInfo(tt.nodeID)
err = cache.UpdateCheckIn(ctx, checkInInfo, time.Now().Add(-2*time.Hour), overlay.NodeSelectionConfig{})
require.NoError(t, err)
}
if tt.gracefullyexited {
req := &overlay.ExitStatusRequest{
NodeID: tt.nodeID,
ExitInitiatedAt: time.Now(),
ExitLoopCompletedAt: time.Now(),
ExitFinishedAt: time.Now(),
}
_, err := cache.UpdateExitStatus(ctx, req)
require.NoError(t, err)
}
}
nodeIds := storj.NodeIDList{
storj.NodeID{1}, storj.NodeID{2},
storj.NodeID{3}, storj.NodeID{4},
storj.NodeID{5}, storj.NodeID{6},
storj.NodeID{7}, storj.NodeID{8},
storj.NodeID{9},
if tt.offlineSuspended {
err = cache.TestSuspendNodeOffline(ctx, tt.nodeID, time.Now())
require.NoError(t, err)
}
contains := func(nodeID storj.NodeID) func(node nodeselection.SelectedNode) bool {
return func(node nodeselection.SelectedNode) bool {
return node.ID == nodeID
if tt.disqualified {
_, err = cache.DisqualifyNode(ctx, tt.nodeID, time.Now(), overlay.DisqualificationReasonUnknown)
require.NoError(t, err)
}
if tt.offline {
checkInInfo := getNodeInfo(tt.nodeID)
checkInInfo.CountryCode = location.ToCountryCode(tt.countryCode)
err = cache.UpdateCheckIn(ctx, checkInInfo, time.Now().Add(-2*time.Hour), overlay.NodeSelectionConfig{})
require.NoError(t, err)
}
if tt.gracefullyExited {
req := &overlay.ExitStatusRequest{
NodeID: tt.nodeID,
ExitInitiatedAt: time.Now(),
ExitLoopCompletedAt: time.Now(),
ExitFinishedAt: time.Now(),
}
_, err := cache.UpdateExitStatus(ctx, req)
require.NoError(t, err)
}
online, offline, err := cache.KnownReliable(ctx, nodeIds, time.Hour, 0)
require.NoError(t, err)
// unrealiable nodes shouldn't be in results
require.False(t, slices.ContainsFunc(append(online, offline...), contains(storj.NodeID{2}))) // disqualified
require.False(t, slices.ContainsFunc(append(online, offline...), contains(storj.NodeID{3}))) // unknown audit suspended
require.False(t, slices.ContainsFunc(append(online, offline...), contains(storj.NodeID{5}))) // gracefully exited
require.False(t, slices.ContainsFunc(append(online, offline...), contains(storj.NodeID{6}))) // offline suspended
require.False(t, slices.ContainsFunc(append(online, offline...), contains(storj.NodeID{9}))) // not in db
require.True(t, slices.ContainsFunc(offline, contains(storj.NodeID{4}))) // offline
// KnownReliable is not excluding by country anymore
require.True(t, slices.ContainsFunc(online, contains(storj.NodeID{7}))) // excluded country
require.Len(t, append(online, offline...), 4)
online, offline, err = cache.Reliable(ctx, time.Hour, 0)
require.NoError(t, err)
require.False(t, slices.ContainsFunc(append(online, offline...), contains(storj.NodeID{2}))) // disqualified
require.False(t, slices.ContainsFunc(append(online, offline...), contains(storj.NodeID{3}))) // unknown audit suspended
require.False(t, slices.ContainsFunc(append(online, offline...), contains(storj.NodeID{5}))) // gracefully exited
require.False(t, slices.ContainsFunc(append(online, offline...), contains(storj.NodeID{6}))) // offline suspended
require.False(t, slices.ContainsFunc(append(online, offline...), contains(storj.NodeID{9}))) // not in db
require.True(t, slices.ContainsFunc(offline, contains(storj.NodeID{4}))) // offline
// Reliable is not excluding by country anymore
require.True(t, slices.ContainsFunc(online, contains(storj.NodeID{7}))) // excluded country
require.Len(t, append(online, offline...), 4)
}
{ // TestUpdateOperator
nodeIds := storj.NodeIDList{
storj.NodeID{1}, storj.NodeID{2},
storj.NodeID{3}, storj.NodeID{4},
storj.NodeID{5}, storj.NodeID{6},
storj.NodeID{7}, storj.NodeID{8},
storj.NodeID{9},
}
t.Run("GetNodes", func(t *testing.T) {
selectedNodes, err := cache.GetNodes(ctx, nodeIds, time.Hour, 0)
require.NoError(t, err)
require.Len(t, selectedNodes, len(nodeIds))
// disqualified/exited/unknown nodes should be returned as a zero-value SelectedNode in results
require.Zero(t, selectedNodes[1].ID) // #2 is disqualified
require.False(t, selectedNodes[1].Online)
require.Zero(t, selectedNodes[4].ID) // #5 gracefully exited
require.False(t, selectedNodes[4].Online)
require.Zero(t, selectedNodes[8].ID) // #9 is not in db
require.False(t, selectedNodes[8].Online)
require.Equal(t, nodeIds[0], selectedNodes[0].ID) // #1 is online
require.True(t, selectedNodes[0].Online)
require.Equal(t, "DE", selectedNodes[0].CountryCode.String())
require.Equal(t, nodeIds[2], selectedNodes[2].ID) // #3 is unknown-audit-suspended
require.True(t, selectedNodes[2].Online)
require.Equal(t, "DE", selectedNodes[2].CountryCode.String())
require.Equal(t, nodeIds[3], selectedNodes[3].ID) // #4 is offline
require.False(t, selectedNodes[3].Online)
require.Equal(t, "DE", selectedNodes[3].CountryCode.String())
require.Equal(t, nodeIds[5], selectedNodes[5].ID) // #6 is offline-suspended
require.True(t, selectedNodes[5].Online)
require.Equal(t, "DE", selectedNodes[5].CountryCode.String())
require.Equal(t, nodeIds[6], selectedNodes[6].ID) // #7 is in an excluded country
require.True(t, selectedNodes[6].Online)
require.Equal(t, "FR", selectedNodes[6].CountryCode.String())
require.Equal(t, nodeIds[7], selectedNodes[7].ID) // #8 is online but has no country code
require.True(t, selectedNodes[7].Online)
require.Equal(t, "", selectedNodes[7].CountryCode.String())
})
t.Run("GetParticipatingNodes", func(t *testing.T) {
allNodes, err := cache.GetParticipatingNodes(ctx, time.Hour, 0)
require.NoError(t, err)
expectOnline := func(t *testing.T, nodeList []nodeselection.SelectedNode, nodeID storj.NodeID, shouldBeOnline bool) {
for _, n := range nodeList {
if n.ID == nodeID {
if n.Online != shouldBeOnline {
require.Failf(t, "invalid Onlineness", "node %x was found in list, but Online=%v, whereas we expected Online=%v", n.ID[:], n.Online, shouldBeOnline)
}
return
}
}
require.Fail(t, "node not found in list", "node ID %x not found in list. list: %v", nodeID[:], nodeList)
}
expectOnline(t, allNodes, storj.NodeID{1}, true) // normal and online
expectOnline(t, allNodes, storj.NodeID{3}, true) // unknown audit suspended
expectOnline(t, allNodes, storj.NodeID{4}, false) // offline
expectOnline(t, allNodes, storj.NodeID{6}, true) // offline suspended
expectOnline(t, allNodes, storj.NodeID{7}, true) // excluded country
expectOnline(t, allNodes, storj.NodeID{8}, true) // normal and online, no country code
expectNotInList := func(t *testing.T, nodeList []nodeselection.SelectedNode, nodeID storj.NodeID) {
for index, n := range nodeList {
if n.ID == nodeID {
require.Failf(t, "not found in list", "node %x should not have been found in list, but it was found at index [%d].", nodeID[:], index)
}
}
}
expectNotInList(t, allNodes, storj.NodeID{2}) // disqualified
expectNotInList(t, allNodes, storj.NodeID{5}) // gracefully exited
expectNotInList(t, allNodes, storj.NodeID{9}) // not in db
require.Len(t, allNodes, 6)
})
t.Run("TestUpdateOperator", func(t *testing.T) {
nodeID := storj.NodeID{10}
addr := "127.0.1.0:8080"
lastNet := "127.0.1"
@ -214,9 +246,10 @@ func testDatabase(ctx context.Context, t *testing.T, cache overlay.DB) {
assert.Equal(t, "0x2222222222222222222222222222222222222222", updateWalletFeatures.Operator.Wallet)
assert.Equal(t, "def456@mail.test", updateWalletFeatures.Operator.Email)
assert.Equal(t, []string{"wallet_features_updated"}, updateWalletFeatures.Operator.WalletFeatures)
}
})
{ // test UpdateCheckIn updates the reputation correctly when the node is offline/online
// test UpdateCheckIn updates the reputation correctly when the node is offline/online
t.Run("UpdateCheckIn", func(t *testing.T) {
nodeID := storj.NodeID{1}
// get the existing node info that is stored in nodes table
@ -248,5 +281,5 @@ func testDatabase(ctx context.Context, t *testing.T, cache overlay.DB) {
require.NoError(t, err)
_, err = cache.Get(ctx, nodeID)
require.NoError(t, err)
}
})
}

View File

@ -205,7 +205,7 @@ func TestRefreshConcurrent(t *testing.T) {
require.True(t, 1 <= mockDB.callCount && mockDB.callCount <= 2, "calls %d", mockDB.callCount)
}
func TestGetNodes(t *testing.T) {
func TestSelectNodes(t *testing.T) {
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
var nodeSelectionConfig = overlay.NodeSelectionConfig{
NewNodeFraction: 0.2,
@ -768,6 +768,16 @@ func (m *mockdb) Get(ctx context.Context, nodeID storj.NodeID) (*overlay.NodeDos
panic("implement me")
}
// GetNodes satisfies nodeevents.DB interface.
func (m *mockdb) GetNodes(ctx context.Context, nodeIDs storj.NodeIDList, onlineWindow, asOfSystemInterval time.Duration) (_ []nodeselection.SelectedNode, err error) {
panic("implement me")
}
// GetParticipatingNodes satisfies nodeevents.DB interface.
func (m *mockdb) GetParticipatingNodes(ctx context.Context, onlineWindow, asOfSystemInterval time.Duration) (_ []nodeselection.SelectedNode, err error) {
panic("implement me")
}
// KnownReliable satisfies nodeevents.DB interface.
func (m *mockdb) KnownReliable(ctx context.Context, nodeIDs storj.NodeIDList, onlineWindow, asOfSystemInterval time.Duration) (online []nodeselection.SelectedNode, offline []nodeselection.SelectedNode, err error) {
panic("implement me")

View File

@ -57,12 +57,9 @@ func TestObserverForkProcess(t *testing.T) {
}
o.nodesCache.state.Store(&reliabilityState{
reliableAll: mapNodes(nodes, func(node nodeselection.SelectedNode) bool {
nodeByID: mapNodes(nodes, func(node nodeselection.SelectedNode) bool {
return true
}),
reliableOnline: mapNodes(nodes, func(node nodeselection.SelectedNode) bool {
return node.Online == true
}),
created: time.Now(),
})
return o

View File

@ -16,7 +16,7 @@ import (
"storj.io/storj/satellite/overlay"
)
// ReliabilityCache caches the reliable nodes for the specified staleness duration
// ReliabilityCache caches known nodes for the specified staleness duration
// and updates automatically from overlay.
//
// architecture: Service
@ -32,9 +32,8 @@ type ReliabilityCache struct {
// reliabilityState.
type reliabilityState struct {
reliableOnline map[storj.NodeID]nodeselection.SelectedNode
reliableAll map[storj.NodeID]nodeselection.SelectedNode
created time.Time
nodeByID map[storj.NodeID]nodeselection.SelectedNode
created time.Time
}
// NewReliabilityCache creates a new reliability checking cache.
@ -73,7 +72,7 @@ func (cache *ReliabilityCache) NumNodes(ctx context.Context) (numNodes int, err
return 0, err
}
return len(state.reliableOnline), nil
return len(state.nodeByID), nil
}
// MissingPieces returns piece indices that are unreliable with the given staleness period.
@ -84,8 +83,8 @@ func (cache *ReliabilityCache) MissingPieces(ctx context.Context, created time.T
}
var unreliable metabase.Pieces
for _, p := range pieces {
node, ok := state.reliableOnline[p.StorageNode]
if !ok {
node, ok := state.nodeByID[p.StorageNode]
if !ok || !node.Online || node.Suspended {
unreliable = append(unreliable, p)
} else if _, excluded := cache.excludedCountryCodes[node.CountryCode]; excluded {
unreliable = append(unreliable, p)
@ -109,7 +108,7 @@ func (cache *ReliabilityCache) OutOfPlacementPieces(ctx context.Context, created
var outOfPlacementPieces metabase.Pieces
nodeFilters := cache.placementRules(placement)
for _, p := range pieces {
if node, ok := state.reliableAll[p.StorageNode]; ok && !nodeFilters.Match(&node) {
if node, ok := state.nodeByID[p.StorageNode]; ok && !nodeFilters.Match(&node) {
outOfPlacementPieces = append(outOfPlacementPieces, p)
}
}
@ -118,8 +117,8 @@ func (cache *ReliabilityCache) OutOfPlacementPieces(ctx context.Context, created
}
// PiecesNodesLastNetsInOrder returns the /24 subnet for each piece storage node, in order. If a
// requested node is not in the database or it's unreliable, an empty string will be returned corresponding
// to that node's last_net.
// requested node is not in the database, an empty string will be returned corresponding to that
// node's last_net.
func (cache *ReliabilityCache) PiecesNodesLastNetsInOrder(ctx context.Context, created time.Time, pieces metabase.Pieces) (lastNets []string, err error) {
defer mon.Task()(&ctx)(nil)
@ -134,7 +133,7 @@ func (cache *ReliabilityCache) PiecesNodesLastNetsInOrder(ctx context.Context, c
lastNets = make([]string, len(pieces))
for i, piece := range pieces {
if node, ok := state.reliableAll[piece.StorageNode]; ok {
if node, ok := state.nodeByID[piece.StorageNode]; ok {
lastNets[i] = node.LastNet
}
}
@ -180,22 +179,17 @@ func (cache *ReliabilityCache) Refresh(ctx context.Context) (err error) {
func (cache *ReliabilityCache) refreshLocked(ctx context.Context) (_ *reliabilityState, err error) {
defer mon.Task()(&ctx)(&err)
online, offline, err := cache.overlay.Reliable(ctx)
selectedNodes, err := cache.overlay.GetParticipatingNodes(ctx)
if err != nil {
return nil, Error.Wrap(err)
}
state := &reliabilityState{
created: time.Now(),
reliableOnline: make(map[storj.NodeID]nodeselection.SelectedNode, len(online)),
reliableAll: make(map[storj.NodeID]nodeselection.SelectedNode, len(online)+len(offline)),
created: time.Now(),
nodeByID: make(map[storj.NodeID]nodeselection.SelectedNode, len(selectedNodes)),
}
for _, node := range online {
state.reliableOnline[node.ID] = node
state.reliableAll[node.ID] = node
}
for _, node := range offline {
state.reliableAll[node.ID] = node
for _, node := range selectedNodes {
state.nodeByID[node.ID] = node
}
cache.state.Store(state)
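
The cache now keeps a single nodeByID map and derives reliability at lookup time instead of maintaining separate online/all maps. A sketch of the rule MissingPieces applies per piece (names are illustrative; the excluded-country check from the real method is omitted):

package example // illustrative only, not part of this commit

import (
	"storj.io/common/storj"
	"storj.io/storj/satellite/nodeselection"
)

// pieceIsUnreliable mirrors the per-piece check in MissingPieces above: a
// piece counts as unreliable when its node is unknown to the cache, offline,
// or suspended. (The excluded-country check is left out of this sketch.)
func pieceIsUnreliable(nodeByID map[storj.NodeID]nodeselection.SelectedNode, id storj.NodeID) bool {
	node, ok := nodeByID[id]
	return !ok || !node.Online || node.Suspended
}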

View File

@ -60,13 +60,13 @@ func TestReliabilityCache_Concurrent(t *testing.T) {
type fakeOverlayDB struct{ overlay.DB }
type fakeNodeEvents struct{ nodeevents.DB }
func (fakeOverlayDB) Reliable(context.Context, time.Duration, time.Duration) ([]nodeselection.SelectedNode, []nodeselection.SelectedNode, error) {
func (fakeOverlayDB) GetParticipatingNodes(context.Context, time.Duration, time.Duration) ([]nodeselection.SelectedNode, error) {
return []nodeselection.SelectedNode{
{ID: testrand.NodeID()},
{ID: testrand.NodeID()},
{ID: testrand.NodeID()},
{ID: testrand.NodeID()},
}, nil, nil
{ID: testrand.NodeID(), Online: true},
{ID: testrand.NodeID(), Online: true},
{ID: testrand.NodeID(), Online: true},
{ID: testrand.NodeID(), Online: true},
}, nil
}
func TestReliabilityCache_OutOfPlacementPieces(t *testing.T) {

View File

@ -668,14 +668,14 @@ func (repairer *SegmentRepairer) classifySegmentPieces(ctx context.Context, segm
allNodeIDs[i] = piece.StorageNode
}
online, offline, err := repairer.overlay.KnownReliable(ctx, allNodeIDs)
selectedNodes, err := repairer.overlay.GetNodes(ctx, allNodeIDs)
if err != nil {
return piecesCheckResult{}, overlayQueryError.New("error identifying missing pieces: %w", err)
}
return repairer.classifySegmentPiecesWithNodes(ctx, segment, allNodeIDs, online, offline)
return repairer.classifySegmentPiecesWithNodes(ctx, segment, allNodeIDs, selectedNodes)
}
func (repairer *SegmentRepairer) classifySegmentPiecesWithNodes(ctx context.Context, segment metabase.Segment, allNodeIDs []storj.NodeID, online []nodeselection.SelectedNode, offline []nodeselection.SelectedNode) (result piecesCheckResult, err error) {
func (repairer *SegmentRepairer) classifySegmentPiecesWithNodes(ctx context.Context, segment metabase.Segment, allNodeIDs []storj.NodeID, selectedNodes []nodeselection.SelectedNode) (result piecesCheckResult, err error) {
pieces := segment.Pieces
nodeIDPieceMap := map[storj.NodeID]uint16{}
@ -688,22 +688,28 @@ func (repairer *SegmentRepairer) classifySegmentPiecesWithNodes(ctx context.Cont
result.ExcludeNodeIDs = allNodeIDs
if len(selectedNodes) != len(pieces) {
repairer.log.Error("GetNodes returned an invalid result", zap.Any("pieces", pieces), zap.Any("selectedNodes", selectedNodes), zap.Error(err))
return piecesCheckResult{}, overlayQueryError.New("GetNodes returned an invalid result")
}
nodeFilters := repairer.placementRules(segment.Placement)
// remove online nodes from missing pieces
for _, onlineNode := range online {
for _, node := range selectedNodes {
if !node.Online || node.Suspended {
continue
}
// count online nodes in excluded countries only if country is not excluded by segment
// placement, those nodes will be counted with out of placement check
if _, excluded := repairer.excludedCountryCodes[onlineNode.CountryCode]; excluded && nodeFilters.Match(&onlineNode) {
if _, excluded := repairer.excludedCountryCodes[node.CountryCode]; excluded && nodeFilters.Match(&node) {
result.NumHealthyInExcludedCountries++
}
pieceNum := nodeIDPieceMap[onlineNode.ID]
pieceNum := nodeIDPieceMap[node.ID]
delete(result.MissingPiecesSet, pieceNum)
}
nodeFilters = repairer.placementRules(segment.Placement)
if repairer.doDeclumping && !nodeselection.AllowSameSubnet(nodeFilters) {
// if multiple pieces are on the same last_net, keep only the first one. The rest are
// to be considered retrievable but unhealthy.
@ -711,8 +717,11 @@ func (repairer *SegmentRepairer) classifySegmentPiecesWithNodes(ctx context.Cont
reliablePieces := metabase.Pieces{}
collectLastNets := func(reliable []nodeselection.SelectedNode) {
for _, node := range reliable {
collectClumpedPieces := func(onlineness bool) {
for _, node := range selectedNodes {
if node.Online != onlineness {
continue
}
pieceNum := nodeIDPieceMap[node.ID]
reliablePieces = append(reliablePieces, metabase.Piece{
Number: pieceNum,
@ -721,8 +730,10 @@ func (repairer *SegmentRepairer) classifySegmentPiecesWithNodes(ctx context.Cont
lastNets = append(lastNets, node.LastNet)
}
}
collectLastNets(online)
collectLastNets(offline)
// go over online nodes first, so that if we have to remove clumped pieces, we prefer
// to remove offline ones over online ones.
collectClumpedPieces(true)
collectClumpedPieces(false)
clumpedPieces := repair.FindClumpedPieces(reliablePieces, lastNets)
result.ClumpedPiecesSet = map[uint16]bool{}
@ -734,17 +745,13 @@ func (repairer *SegmentRepairer) classifySegmentPiecesWithNodes(ctx context.Cont
result.OutOfPlacementPiecesSet = map[uint16]bool{}
if repairer.doPlacementCheck {
checkPlacement := func(reliable []nodeselection.SelectedNode) {
for _, node := range reliable {
if nodeFilters.Match(&node) {
continue
}
result.OutOfPlacementPiecesSet[nodeIDPieceMap[node.ID]] = true
for _, node := range selectedNodes {
if nodeFilters.Match(&node) {
continue
}
result.OutOfPlacementPiecesSet[nodeIDPieceMap[node.ID]] = true
}
checkPlacement(online)
checkPlacement(offline)
}
// verify that some of clumped pieces and out of placement pieces are not the same
@ -753,8 +760,10 @@ func (repairer *SegmentRepairer) classifySegmentPiecesWithNodes(ctx context.Cont
maps.Copy(unhealthyRetrievableSet, result.OutOfPlacementPiecesSet)
// offline nodes are not retrievable
for _, node := range offline {
delete(unhealthyRetrievableSet, nodeIDPieceMap[node.ID])
for _, node := range selectedNodes {
if !node.Online {
delete(unhealthyRetrievableSet, nodeIDPieceMap[node.ID])
}
}
result.NumUnhealthyRetrievable = len(unhealthyRetrievableSet)
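
The clumped-piece pass now walks online nodes before offline ones because only the first piece seen per last_net is kept; with online-first ordering, any pieces dropped as clumped tend to be the offline ones. A sketch of that keep-first-per-last_net rule (illustrative only; the real logic lives in repair.FindClumpedPieces):

package example // illustrative only, not part of this commit

import "storj.io/storj/satellite/metabase"

// firstPerLastNet keeps only the first piece seen for each last_net, treating
// later pieces on the same subnet as clumped. Ordering the input online-first,
// as classifySegmentPiecesWithNodes does above, makes the dropped duplicates
// fall on offline pieces where possible.
func firstPerLastNet(pieces metabase.Pieces, lastNets []string) metabase.Pieces {
	seen := make(map[string]bool, len(lastNets))
	kept := metabase.Pieces{}
	for i, piece := range pieces {
		if seen[lastNets[i]] {
			continue // clumped: shares a last_net with an earlier piece
		}
		seen[lastNets[i]] = true
		kept = append(kept, piece)
	}
	return kept
}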

View File

@ -8,6 +8,7 @@ import (
"testing"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
"storj.io/common/identity/testidentity"
"storj.io/common/storj"
@ -21,8 +22,21 @@ import (
func TestClassify(t *testing.T) {
ctx := testcontext.New(t)
getNodes := func(nodes []nodeselection.SelectedNode, pieces metabase.Pieces) (res []nodeselection.SelectedNode) {
for _, piece := range pieces {
for _, node := range nodes {
if node.ID == piece.StorageNode {
res = append(res, node)
break
}
}
}
return res
}
t.Run("all online", func(t *testing.T) {
var online, offline = generateNodes(5, func(ix int) bool {
var selectedNodes = generateNodes(5, func(ix int) bool {
return true
}, func(ix int, node *nodeselection.SelectedNode) {
@ -33,8 +47,8 @@ func TestClassify(t *testing.T) {
s := SegmentRepairer{
placementRules: c.CreateFilters,
}
pieces := createPieces(online, offline, 0, 1, 2, 3, 4)
result, err := s.classifySegmentPiecesWithNodes(ctx, metabase.Segment{Pieces: pieces}, allNodeIDs(pieces), online, offline)
pieces := createPieces(selectedNodes, 0, 1, 2, 3, 4)
result, err := s.classifySegmentPiecesWithNodes(ctx, metabase.Segment{Pieces: pieces}, allNodeIDs(pieces), selectedNodes)
require.NoError(t, err)
require.Equal(t, 0, len(result.MissingPiecesSet))
@ -44,10 +58,10 @@ func TestClassify(t *testing.T) {
})
t.Run("out of placement", func(t *testing.T) {
var online, offline = generateNodes(10, func(ix int) bool {
var selectedNodes = generateNodes(10, func(ix int) bool {
return true
}, func(ix int, node *nodeselection.SelectedNode) {
if ix > 4 {
if ix < 4 {
node.CountryCode = location.Germany
} else {
node.CountryCode = location.UnitedKingdom
@ -60,10 +74,11 @@ func TestClassify(t *testing.T) {
s := SegmentRepairer{
placementRules: c.CreateFilters,
doPlacementCheck: true,
log: zaptest.NewLogger(t),
}
pieces := createPieces(online, offline, 1, 2, 3, 4, 7, 8)
result, err := s.classifySegmentPiecesWithNodes(ctx, metabase.Segment{Pieces: pieces, Placement: 10}, allNodeIDs(pieces), online, offline)
pieces := createPieces(selectedNodes, 1, 2, 3, 4, 7, 8)
result, err := s.classifySegmentPiecesWithNodes(ctx, metabase.Segment{Pieces: pieces, Placement: 10}, allNodeIDs(pieces), getNodes(selectedNodes, pieces))
require.NoError(t, err)
require.Equal(t, 0, len(result.MissingPiecesSet))
@ -75,7 +90,7 @@ func TestClassify(t *testing.T) {
t.Run("out of placement and offline", func(t *testing.T) {
// all nodes are in wrong region and half of them are offline
var online, offline = generateNodes(10, func(ix int) bool {
var selectedNodes = generateNodes(10, func(ix int) bool {
return ix < 5
}, func(ix int, node *nodeselection.SelectedNode) {
node.CountryCode = location.Germany
@ -88,8 +103,8 @@ func TestClassify(t *testing.T) {
doPlacementCheck: true,
}
pieces := createPieces(online, offline, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
result, err := s.classifySegmentPiecesWithNodes(ctx, metabase.Segment{Pieces: pieces, Placement: 10}, allNodeIDs(pieces), online, offline)
pieces := createPieces(selectedNodes, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
result, err := s.classifySegmentPiecesWithNodes(ctx, metabase.Segment{Pieces: pieces, Placement: 10}, allNodeIDs(pieces), getNodes(selectedNodes, pieces))
require.NoError(t, err)
// offline nodes
@ -103,7 +118,7 @@ func TestClassify(t *testing.T) {
})
t.Run("normal declumping (subnet check)", func(t *testing.T) {
var online, offline = generateNodes(10, func(ix int) bool {
var selectedNodes = generateNodes(10, func(ix int) bool {
return ix < 5
}, func(ix int, node *nodeselection.SelectedNode) {
node.LastNet = fmt.Sprintf("127.0.%d.0", ix/2)
@ -113,16 +128,17 @@ func TestClassify(t *testing.T) {
s := SegmentRepairer{
placementRules: c.CreateFilters,
doDeclumping: true,
log: zaptest.NewLogger(t),
}
// first 5: online, 2 in each subnet --> healthy: one from (0,1) (2,3) (4), offline: (5,6) but 5 is in the same subnet as 6
pieces := createPieces(online, offline, 0, 1, 2, 3, 4, 5, 6)
result, err := s.classifySegmentPiecesWithNodes(ctx, metabase.Segment{Pieces: pieces}, allNodeIDs(pieces), online, offline)
pieces := createPieces(selectedNodes, 0, 1, 2, 3, 4, 5, 6)
result, err := s.classifySegmentPiecesWithNodes(ctx, metabase.Segment{Pieces: pieces}, allNodeIDs(pieces), getNodes(selectedNodes, pieces))
require.NoError(t, err)
// offline nodes
require.Equal(t, 2, len(result.MissingPiecesSet))
require.Equal(t, 4, len(result.ClumpedPiecesSet))
require.Equal(t, 3, len(result.ClumpedPiecesSet))
require.Equal(t, 0, len(result.OutOfPlacementPiecesSet))
require.Equal(t, 2, result.NumUnhealthyRetrievable)
numHealthy := len(pieces) - len(result.MissingPiecesSet) - result.NumUnhealthyRetrievable
@ -131,7 +147,7 @@ func TestClassify(t *testing.T) {
})
t.Run("declumping but with no subnet filter", func(t *testing.T) {
var online, offline = generateNodes(10, func(ix int) bool {
var selectedNodes = generateNodes(10, func(ix int) bool {
return ix < 5
}, func(ix int, node *nodeselection.SelectedNode) {
node.LastNet = fmt.Sprintf("127.0.%d.0", ix/2)
@ -147,8 +163,8 @@ func TestClassify(t *testing.T) {
}
// first 5: online, 2 in each subnet --> healthy: one from (0,1) (2,3) (4), offline: (5,6) but 5 is in the same subnet as 6
pieces := createPieces(online, offline, 0, 1, 2, 3, 4, 5, 6)
result, err := s.classifySegmentPiecesWithNodes(ctx, metabase.Segment{Pieces: pieces, Placement: 10}, allNodeIDs(pieces), online, offline)
pieces := createPieces(selectedNodes, 0, 1, 2, 3, 4, 5, 6)
result, err := s.classifySegmentPiecesWithNodes(ctx, metabase.Segment{Pieces: pieces, Placement: 10}, allNodeIDs(pieces), getNodes(selectedNodes, pieces))
require.NoError(t, err)
// offline nodes
@ -163,31 +179,25 @@ func TestClassify(t *testing.T) {
}
func generateNodes(num int, isOnline func(i int) bool, config func(ix int, node *nodeselection.SelectedNode)) (online []nodeselection.SelectedNode, offline []nodeselection.SelectedNode) {
func generateNodes(num int, isOnline func(i int) bool, config func(ix int, node *nodeselection.SelectedNode)) (selectedNodes []nodeselection.SelectedNode) {
for i := 0; i < num; i++ {
node := nodeselection.SelectedNode{
ID: testidentity.MustPregeneratedIdentity(i, storj.LatestIDVersion()).ID,
ID: testidentity.MustPregeneratedIdentity(i, storj.LatestIDVersion()).ID,
Online: isOnline(i),
}
config(i, &node)
if isOnline(i) {
online = append(online, node)
} else {
offline = append(offline, node)
}
selectedNodes = append(selectedNodes, node)
}
return
}
func createPieces(online []nodeselection.SelectedNode, offline []nodeselection.SelectedNode, indexes ...int) (res metabase.Pieces) {
func createPieces(selectedNodes []nodeselection.SelectedNode, indexes ...int) (res metabase.Pieces) {
for _, index := range indexes {
piece := metabase.Piece{
Number: uint16(index),
}
if len(online)-1 < index {
piece.StorageNode = offline[index-len(online)].ID
} else {
piece.StorageNode = online[index].ID
}
piece.StorageNode = selectedNodes[index].ID
res = append(res, piece)
}

View File

@ -394,47 +394,31 @@ func (cache *overlaycache) UpdateLastOfflineEmail(ctx context.Context, nodeIDs s
return err
}
// KnownReliable filters a set of nodes to reliable nodes. List is split into online and offline nodes.
func (cache *overlaycache) KnownReliable(ctx context.Context, nodeIDs storj.NodeIDList, onlineWindow, asOfSystemInterval time.Duration) ([]nodeselection.SelectedNode, []nodeselection.SelectedNode, error) {
var on, off []*nodeselection.SelectedNode
var err error
for {
on, off, err = cache.knownReliable(ctx, nodeIDs, onlineWindow, asOfSystemInterval)
if err != nil {
if cockroachutil.NeedsRetry(err) {
continue
}
return nil, nil, err
}
break
}
err = cache.addNodeTags(ctx, append(on, off...))
deref := func(nodes []*nodeselection.SelectedNode) []nodeselection.SelectedNode {
var res []nodeselection.SelectedNode
for _, node := range nodes {
res = append(res, *node)
}
return res
}
return deref(on), deref(off), err
}
func (cache *overlaycache) knownReliable(ctx context.Context, nodeIDs storj.NodeIDList, onlineWindow, asOfSystemInterval time.Duration) (online []*nodeselection.SelectedNode, offline []*nodeselection.SelectedNode, err error) {
// GetNodes gets records for all specified nodes as of the given system interval. The
// onlineWindow is used to determine whether each node is marked as Online. The results are
// returned in a slice of the same length as the input nodeIDs, and each index of the returned
// list corresponds to the same index in nodeIDs. If a node is not known, or is disqualified
// or exited, the corresponding returned SelectedNode will have a zero value.
func (cache *overlaycache) GetNodes(ctx context.Context, nodeIDs storj.NodeIDList, onlineWindow, asOfSystemInterval time.Duration) (records []nodeselection.SelectedNode, err error) {
defer mon.Task()(&ctx)(&err)
var nodes []*nodeselection.SelectedNode
if len(nodeIDs) == 0 {
return nil, nil, Error.New("no ids provided")
return nil, Error.New("no ids provided")
}
err = withRows(cache.db.Query(ctx, `
SELECT id, address, last_net, last_ip_port, country_code, last_contact_success > $2 as online, exit_initiated_at IS NOT NULL as exiting
FROM nodes
SELECT n.id, n.address, n.last_net, n.last_ip_port, n.country_code,
n.last_contact_success > $2 AS online,
(n.offline_suspended IS NOT NULL OR n.unknown_audit_suspended IS NOT NULL) AS suspended,
n.disqualified IS NOT NULL AS disqualified,
n.exit_initiated_at IS NOT NULL AS exiting,
n.exit_finished_at IS NOT NULL AS exited
FROM unnest($1::bytea[]) WITH ORDINALITY AS input(node_id, ordinal)
LEFT OUTER JOIN nodes n ON input.node_id = n.id
`+cache.db.impl.AsOfSystemInterval(asOfSystemInterval)+`
WHERE id = any($1::bytea[])
AND disqualified IS NULL
AND unknown_audit_suspended IS NULL
AND offline_suspended IS NULL
AND exit_finished_at IS NULL
ORDER BY input.ordinal
`, pgutil.NodeIDArray(nodeIDs), time.Now().Add(-onlineWindow),
))(func(rows tagsql.Rows) error {
for rows.Next() {
@ -443,53 +427,43 @@ func (cache *overlaycache) knownReliable(ctx context.Context, nodeIDs storj.Node
return err
}
if node.Online {
online = append(online, &node)
} else {
offline = append(offline, &node)
}
nodes = append(nodes, &node)
}
return nil
})
if err != nil {
return nil, Error.Wrap(err)
}
return online, offline, Error.Wrap(err)
err = cache.addNodeTags(ctx, nodes)
if err != nil {
return nil, Error.Wrap(err)
}
records = make([]nodeselection.SelectedNode, len(nodes))
for i := 0; i < len(nodes); i++ {
records[i] = *nodes[i]
}
return records, Error.Wrap(err)
}
// Reliable returns all nodes that are reliable, online and offline.
func (cache *overlaycache) Reliable(ctx context.Context, onlineWindow, asOfSystemInterval time.Duration) ([]nodeselection.SelectedNode, []nodeselection.SelectedNode, error) {
var on, off []*nodeselection.SelectedNode
var err error
for {
on, off, err = cache.reliable(ctx, onlineWindow, asOfSystemInterval)
if err != nil {
if cockroachutil.NeedsRetry(err) {
continue
}
return nil, nil, err
}
break
}
err = cache.addNodeTags(ctx, append(on, off...))
deref := func(nodes []*nodeselection.SelectedNode) []nodeselection.SelectedNode {
var res []nodeselection.SelectedNode
for _, node := range nodes {
res = append(res, *node)
}
return res
}
return deref(on), deref(off), err
}
func (cache *overlaycache) reliable(ctx context.Context, onlineWindow, asOfSystemInterval time.Duration) (online []*nodeselection.SelectedNode, offline []*nodeselection.SelectedNode, err error) {
// GetParticipatingNodes returns all known participating nodes (this includes all known nodes
// excluding nodes that have been disqualified or gracefully exited).
func (cache *overlaycache) GetParticipatingNodes(ctx context.Context, onlineWindow, asOfSystemInterval time.Duration) (records []nodeselection.SelectedNode, err error) {
defer mon.Task()(&ctx)(&err)
var nodes []*nodeselection.SelectedNode
err = withRows(cache.db.Query(ctx, `
SELECT id, address, last_net, last_ip_port, country_code, last_contact_success > $1 as online, exit_initiated_at IS NOT NULL as exiting
SELECT id, address, last_net, last_ip_port, country_code,
last_contact_success > $1 AS online,
(offline_suspended IS NOT NULL OR unknown_audit_suspended IS NOT NULL) AS suspended,
false AS disqualified,
exit_initiated_at IS NOT NULL AS exiting,
false AS exited
FROM nodes
`+cache.db.impl.AsOfSystemInterval(asOfSystemInterval)+`
WHERE disqualified IS NULL
AND unknown_audit_suspended IS NULL
AND offline_suspended IS NULL
AND exit_finished_at IS NULL
`, time.Now().Add(-onlineWindow),
))(func(rows tagsql.Rows) error {
@ -498,35 +472,82 @@ func (cache *overlaycache) reliable(ctx context.Context, onlineWindow, asOfSyste
if err != nil {
return err
}
if node.Online {
online = append(online, &node)
} else {
offline = append(offline, &node)
}
nodes = append(nodes, &node)
}
return nil
})
if err != nil {
return nil, Error.Wrap(err)
}
return online, offline, Error.Wrap(err)
err = cache.addNodeTags(ctx, nodes)
if err != nil {
return nil, Error.Wrap(err)
}
records = make([]nodeselection.SelectedNode, len(nodes))
for i := 0; i < len(nodes); i++ {
records[i] = *nodes[i]
}
return records, Error.Wrap(err)
}
// nullNodeID represents a NodeID that may be null.
type nullNodeID struct {
NodeID storj.NodeID
Valid bool
}
// Scan implements the sql.Scanner interface.
func (n *nullNodeID) Scan(value any) error {
if value == nil {
n.NodeID = storj.NodeID{}
n.Valid = false
return nil
}
err := n.NodeID.Scan(value)
if err != nil {
n.Valid = false
return err
}
n.Valid = true
return nil
}
func scanSelectedNode(rows tagsql.Rows) (nodeselection.SelectedNode, error) {
var node nodeselection.SelectedNode
node.Address = &pb.NodeAddress{}
var lastIPPort sql.NullString
err := rows.Scan(&node.ID, &node.Address.Address, &node.LastNet, &lastIPPort, &node.CountryCode, &node.Online, &node.Exiting)
var nodeID nullNodeID
var address, lastNet, lastIPPort, countryCode sql.NullString
var online, suspended, disqualified, exiting, exited sql.NullBool
err := rows.Scan(&nodeID, &address, &lastNet, &lastIPPort, &countryCode,
&online, &suspended, &disqualified, &exiting, &exited)
if err != nil {
return nodeselection.SelectedNode{}, err
}
// If node ID was null, no record was found for the specified ID. For our purposes
// here, we will treat that as equivalent to a node being DQ'd or exited.
if !nodeID.Valid {
// return an empty record
return nodeselection.SelectedNode{}, nil
}
// nodeID was valid, so from here on we assume all the other non-null fields are valid, per database constraints
if disqualified.Bool || exited.Bool {
return nodeselection.SelectedNode{}, nil
}
node.ID = nodeID.NodeID
node.Address.Address = address.String
node.LastNet = lastNet.String
if lastIPPort.Valid {
node.LastIPPort = lastIPPort.String
}
// node.Suspended is always false for now, but that will change in a coming
// commit; we need to include suspended nodes in return values from
// Reliable() and KnownReliable() (in case they are in excluded countries,
// are out of placement, are on clumped IP networks, etc).
if countryCode.Valid {
node.CountryCode = location.ToCountryCode(countryCode.String)
}
node.Online = online.Bool
node.Suspended = suspended.Bool
node.Exiting = exiting.Bool
return node, nil
}
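
The key query change behind the 1:1 ordering guarantee is unnesting the requested IDs WITH ORDINALITY and LEFT OUTER JOINing against nodes, so unknown IDs still produce a row (with NULL columns) in the right position. A trimmed sketch of that shape (column list reduced; not the literal query above):

package example // illustrative only, not part of this commit

// orderedLookupSQL sketches the GetNodes query shape: each requested ID keeps
// its ordinal position, unknown IDs yield NULL columns via the outer join, and
// ordering by the ordinal makes the result slice line up index-for-index with
// the input. The real query selects more columns and applies the configured
// AS OF SYSTEM TIME interval.
const orderedLookupSQL = `
	SELECT n.id, n.last_contact_success > $2 AS online
	FROM unnest($1::bytea[]) WITH ORDINALITY AS input(node_id, ordinal)
	LEFT OUTER JOIN nodes n ON input.node_id = n.id
	ORDER BY input.ordinal
`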

View File

@ -13,6 +13,7 @@ import (
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"storj.io/common/identity/testidentity"
@ -448,200 +449,264 @@ func TestOverlayCache_SelectAllStorageNodesDownloadUpload(t *testing.T) {
}
func TestOverlayCache_KnownReliable(t *testing.T) {
type nodeDisposition struct {
id storj.NodeID
address string
lastIPPort string
offlineInterval time.Duration
countryCode location.CountryCode
disqualified bool
auditSuspended bool
offlineSuspended bool
exiting bool
exited bool
}
func TestOverlayCache_GetNodes(t *testing.T) {
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
cache := db.OverlayCache()
allNodes := []nodeselection.SelectedNode{
addNode(ctx, t, cache, "online", "127.0.0.1", true, false, false, false, false),
addNode(ctx, t, cache, "offline", "127.0.0.2", false, false, false, false, false),
addNode(ctx, t, cache, "disqalified", "127.0.0.3", false, true, false, false, false),
addNode(ctx, t, cache, "audit-suspended", "127.0.0.4", false, false, true, false, false),
addNode(ctx, t, cache, "offline-suspended", "127.0.0.5", false, false, false, true, false),
addNode(ctx, t, cache, "exited", "127.0.0.6", false, false, false, false, true),
allNodes := []nodeDisposition{
addNode(ctx, t, cache, "online ", "127.0.0.1", time.Second, false, false, false, false, false),
addNode(ctx, t, cache, "offline ", "127.0.0.2", 2*time.Hour, false, false, false, false, false),
addNode(ctx, t, cache, "disqualified ", "127.0.0.3", 2*time.Hour, true, false, false, false, false),
addNode(ctx, t, cache, "audit-suspended ", "127.0.0.4", time.Second, false, true, false, false, false),
addNode(ctx, t, cache, "offline-suspended", "127.0.0.5", 2*time.Hour, false, false, true, false, false),
addNode(ctx, t, cache, "exiting ", "127.0.0.5", 2*time.Hour, false, false, false, true, false),
addNode(ctx, t, cache, "exited ", "127.0.0.6", 2*time.Hour, false, false, false, false, true),
}
ids := func(nodes ...nodeselection.SelectedNode) storj.NodeIDList {
nodeIds := storj.NodeIDList{}
for _, node := range nodes {
nodeIds = append(nodeIds, node.ID)
nodes := func(nodeNums ...int) []nodeDisposition {
nodeDisps := make([]nodeDisposition, len(nodeNums))
for i, nodeNum := range nodeNums {
nodeDisps[i] = allNodes[nodeNum]
}
return nodeIds
return nodeDisps
}
nodes := func(nodes ...nodeselection.SelectedNode) []nodeselection.SelectedNode {
return append([]nodeselection.SelectedNode{}, nodes...)
sNodes := func(nodes ...int) []nodeselection.SelectedNode {
selectedNodes := make([]nodeselection.SelectedNode, len(nodes))
for i, nodeNum := range nodes {
selectedNodes[i] = nodeDispositionToSelectedNode(allNodes[nodeNum], time.Hour)
}
return selectedNodes
}
type testCase struct {
IDs storj.NodeIDList
Online []nodeselection.SelectedNode
Offline []nodeselection.SelectedNode
QueryNodes []nodeDisposition
Online []nodeselection.SelectedNode
Offline []nodeselection.SelectedNode
}
shuffledNodeIDs := ids(allNodes...)
rand.Shuffle(len(shuffledNodeIDs), shuffledNodeIDs.Swap)
for _, tc := range []testCase{
for testNum, tc := range []testCase{
{
IDs: ids(allNodes[0], allNodes[1]),
Online: nodes(allNodes[0]),
Offline: nodes(allNodes[1]),
QueryNodes: nodes(0, 1),
Online: sNodes(0),
Offline: sNodes(1),
},
{
IDs: ids(allNodes[0]),
Online: nodes(allNodes[0]),
QueryNodes: nodes(0),
Online: sNodes(0),
},
{
IDs: ids(allNodes[1]),
Offline: nodes(allNodes[1]),
QueryNodes: nodes(1),
Offline: sNodes(1),
},
{ // only unreliable
IDs: ids(allNodes[2], allNodes[3], allNodes[4], allNodes[5]),
QueryNodes: nodes(2, 3, 4, 5),
Online: sNodes(3),
Offline: sNodes(4, 5),
},
{ // all nodes
IDs: ids(allNodes...),
Online: nodes(allNodes[0]),
Offline: nodes(allNodes[1]),
},
// all nodes but in shuffled order
{
IDs: shuffledNodeIDs,
Online: nodes(allNodes[0]),
Offline: nodes(allNodes[1]),
QueryNodes: allNodes,
Online: sNodes(0, 3),
Offline: sNodes(1, 4, 5),
},
// all nodes + one ID not from DB
{
IDs: append(ids(allNodes...), testrand.NodeID()),
Online: nodes(allNodes[0]),
Offline: nodes(allNodes[1]),
QueryNodes: append(allNodes, nodeDisposition{
id: testrand.NodeID(),
disqualified: true, // just so we expect a zero ID for this entry
}),
Online: sNodes(0, 3),
Offline: sNodes(1, 4, 5),
},
} {
online, offline, err := cache.KnownReliable(ctx, tc.IDs, 1*time.Hour, 0)
ids := make([]storj.NodeID, len(tc.QueryNodes))
for i := range tc.QueryNodes {
ids[i] = tc.QueryNodes[i].id
}
selectedNodes, err := cache.GetNodes(ctx, ids, 1*time.Hour, 0)
require.NoError(t, err)
require.ElementsMatch(t, tc.Online, online)
require.ElementsMatch(t, tc.Offline, offline)
require.Equal(t, len(tc.QueryNodes), len(selectedNodes))
var gotOnline []nodeselection.SelectedNode
var gotOffline []nodeselection.SelectedNode
for i, n := range selectedNodes {
if tc.QueryNodes[i].disqualified || tc.QueryNodes[i].exited {
assert.Zero(t, n, testNum, i)
} else {
assert.Equal(t, tc.QueryNodes[i].id, selectedNodes[i].ID, "%d:%d", testNum, i)
if n.Online {
gotOnline = append(gotOnline, n)
} else {
gotOffline = append(gotOffline, n)
}
}
}
assert.Equal(t, tc.Online, gotOnline)
assert.Equal(t, tc.Offline, gotOffline)
}
// test empty id list
_, _, err := cache.KnownReliable(ctx, storj.NodeIDList{}, 1*time.Hour, 0)
_, err := cache.GetNodes(ctx, storj.NodeIDList{}, 1*time.Hour, 0)
require.Error(t, err)
// test as of system time
_, _, err = cache.KnownReliable(ctx, ids(allNodes...), 1*time.Hour, -1*time.Microsecond)
allIDs := make([]storj.NodeID, len(allNodes))
for i := range allNodes {
allIDs[i] = allNodes[i].id
}
_, err = cache.GetNodes(ctx, allIDs, 1*time.Hour, -1*time.Microsecond)
require.NoError(t, err)
})
}
func TestOverlayCache_Reliable(t *testing.T) {
func TestOverlayCache_GetParticipatingNodes(t *testing.T) {
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
cache := db.OverlayCache()
allNodes := []nodeselection.SelectedNode{
addNode(ctx, t, cache, "online", "127.0.0.1", true, false, false, false, false),
addNode(ctx, t, cache, "offline", "127.0.0.2", false, false, false, false, false),
addNode(ctx, t, cache, "disqalified", "127.0.0.3", false, true, false, false, false),
addNode(ctx, t, cache, "audit-suspended", "127.0.0.4", false, false, true, false, false),
addNode(ctx, t, cache, "offline-suspended", "127.0.0.5", false, false, false, true, false),
addNode(ctx, t, cache, "exited", "127.0.0.6", false, false, false, false, true),
allNodes := []nodeDisposition{
addNode(ctx, t, cache, "online ", "127.0.0.1", time.Second, false, false, false, false, false),
addNode(ctx, t, cache, "offline ", "127.0.0.2", 2*time.Hour, false, false, false, false, false),
addNode(ctx, t, cache, "disqualified ", "127.0.0.3", 2*time.Hour, true, false, false, false, false),
addNode(ctx, t, cache, "audit-suspended ", "127.0.0.4", time.Second, false, true, false, false, false),
addNode(ctx, t, cache, "offline-suspended", "127.0.0.5", 2*time.Hour, false, false, true, false, false),
addNode(ctx, t, cache, "exiting ", "127.0.0.5", 2*time.Hour, false, false, false, true, false),
addNode(ctx, t, cache, "exited ", "127.0.0.6", 2*time.Hour, false, false, false, false, true),
}
type testCase struct {
OnlineWindow time.Duration
Online []nodeselection.SelectedNode
Offline []nodeselection.SelectedNode
Online []int
Offline []int
}
for i, tc := range []testCase{
{
OnlineWindow: 1 * time.Hour,
Online: []nodeselection.SelectedNode{allNodes[0]},
Offline: []nodeselection.SelectedNode{allNodes[1]},
Online: []int{0, 3},
Offline: []int{1, 4, 5},
},
{
OnlineWindow: 20 * time.Hour,
Online: []nodeselection.SelectedNode{allNodes[0], allNodes[1]},
Online: []int{0, 1, 3, 4, 5},
},
{
OnlineWindow: 1 * time.Microsecond,
Offline: []nodeselection.SelectedNode{allNodes[0], allNodes[1]},
Offline: []int{0, 1, 3, 4, 5},
},
} {
online, offline, err := cache.Reliable(ctx, tc.OnlineWindow, 0)
expectedNodes := make([]nodeselection.SelectedNode, 0, len(tc.Offline)+len(tc.Online))
for _, num := range tc.Online {
selectedNode := nodeDispositionToSelectedNode(allNodes[num], 0)
selectedNode.Online = true
expectedNodes = append(expectedNodes, selectedNode)
}
for _, num := range tc.Offline {
selectedNode := nodeDispositionToSelectedNode(allNodes[num], 0)
selectedNode.Online = false
expectedNodes = append(expectedNodes, selectedNode)
}
gotNodes, err := cache.GetParticipatingNodes(ctx, tc.OnlineWindow, 0)
require.NoError(t, err)
// make the .Online attribute match expectations for this OnlineWindow
for n := range tc.Online {
tc.Online[n].Online = true
}
for n := range tc.Offline {
tc.Offline[n].Online = false
}
require.ElementsMatch(t, tc.Online, online, "#%d", i)
require.ElementsMatch(t, tc.Offline, offline, "#%d", i)
require.ElementsMatch(t, expectedNodes, gotNodes, "#%d", i)
}
// test as of system time
_, _, err := cache.Reliable(ctx, 1*time.Hour, -1*time.Microsecond)
_, err := cache.GetParticipatingNodes(ctx, 1*time.Hour, -1*time.Microsecond)
require.NoError(t, err)
})
}
func addNode(ctx context.Context, t *testing.T, cache overlay.DB, address, lastIPPort string, online, disqualified, auditSuspended, offlineSuspended, exited bool) nodeselection.SelectedNode {
selectedNode := nodeselection.SelectedNode{
ID: testrand.NodeID(),
Address: &pb.NodeAddress{Address: address},
LastNet: lastIPPort,
LastIPPort: lastIPPort,
CountryCode: location.Poland,
Online: online,
func nodeDispositionToSelectedNode(disp nodeDisposition, onlineWindow time.Duration) nodeselection.SelectedNode {
if disp.exited || disp.disqualified {
return nodeselection.SelectedNode{}
}
return nodeselection.SelectedNode{
ID: disp.id,
Address: &pb.NodeAddress{Address: disp.address},
LastNet: disp.lastIPPort,
LastIPPort: disp.lastIPPort,
CountryCode: disp.countryCode,
Exiting: disp.exiting,
Suspended: disp.auditSuspended || disp.offlineSuspended,
Online: disp.offlineInterval <= onlineWindow,
}
}
func addNode(ctx context.Context, t *testing.T, cache overlay.DB, address, lastIPPort string, offlineInterval time.Duration, disqualified, auditSuspended, offlineSuspended, exiting, exited bool) nodeDisposition {
disp := nodeDisposition{
id: testrand.NodeID(),
address: address,
lastIPPort: lastIPPort,
offlineInterval: offlineInterval,
countryCode: location.Poland,
disqualified: disqualified,
auditSuspended: auditSuspended,
offlineSuspended: offlineSuspended,
exiting: exiting,
exited: exited,
}
checkInInfo := overlay.NodeCheckInInfo{
IsUp: true,
NodeID: selectedNode.ID,
Address: &pb.NodeAddress{Address: selectedNode.Address.Address},
LastIPPort: selectedNode.LastIPPort,
LastNet: selectedNode.LastNet,
CountryCode: selectedNode.CountryCode,
NodeID: disp.id,
Address: &pb.NodeAddress{Address: disp.address},
LastIPPort: disp.lastIPPort,
LastNet: disp.lastIPPort,
CountryCode: disp.countryCode,
Version: &pb.NodeVersion{Version: "v0.0.0"},
}
timestamp := time.Now().UTC()
if !online {
timestamp = time.Now().Add(-10 * time.Hour)
}
timestamp := time.Now().UTC().Add(-disp.offlineInterval)
err := cache.UpdateCheckIn(ctx, checkInInfo, timestamp, overlay.NodeSelectionConfig{})
require.NoError(t, err)
if disqualified {
_, err := cache.DisqualifyNode(ctx, selectedNode.ID, time.Now(), overlay.DisqualificationReasonAuditFailure)
_, err := cache.DisqualifyNode(ctx, disp.id, time.Now(), overlay.DisqualificationReasonAuditFailure)
require.NoError(t, err)
}
if auditSuspended {
require.NoError(t, cache.TestSuspendNodeUnknownAudit(ctx, selectedNode.ID, time.Now()))
selectedNode.Suspended = true
require.NoError(t, cache.TestSuspendNodeUnknownAudit(ctx, disp.id, time.Now()))
}
if offlineSuspended {
require.NoError(t, cache.TestSuspendNodeOffline(ctx, selectedNode.ID, time.Now()))
selectedNode.Suspended = true
require.NoError(t, cache.TestSuspendNodeOffline(ctx, disp.id, time.Now()))
}
if exiting {
now := time.Now()
_, err = cache.UpdateExitStatus(ctx, &overlay.ExitStatusRequest{
NodeID: disp.id,
ExitInitiatedAt: now,
})
require.NoError(t, err)
}
if exited {
now := time.Now()
_, err = cache.UpdateExitStatus(ctx, &overlay.ExitStatusRequest{
NodeID: selectedNode.ID,
NodeID: disp.id,
ExitInitiatedAt: now,
ExitLoopCompletedAt: now,
ExitFinishedAt: now,
ExitSuccess: true,
})
selectedNode.Exiting = true
require.NoError(t, err)
}
return selectedNode
return disp
}