diff --git a/satellite/overlay/benchmark_test.go b/satellite/overlay/benchmark_test.go index f5c929cd4..b2e81973c 100644 --- a/satellite/overlay/benchmark_test.go +++ b/satellite/overlay/benchmark_test.go @@ -64,14 +64,12 @@ func BenchmarkOverlay(b *testing.B) { check = append(check, testrand.NodeID()) } - b.Run("KnownUnreliableOrOffline", func(b *testing.B) { - criteria := &overlay.NodeCriteria{ - OnlineWindow: 1000 * time.Hour, - } + b.Run("KnownReliable", func(b *testing.B) { + onlineWindow := 1000 * time.Hour for i := 0; i < b.N; i++ { - badNodes, err := overlaydb.KnownUnreliableOrOffline(ctx, criteria, check) + online, _, err := overlaydb.KnownReliable(ctx, check, onlineWindow, 0) require.NoError(b, err) - require.Len(b, badNodes, OfflineCount) + require.Len(b, online, OnlineCount) } }) diff --git a/satellite/overlay/selection_test.go b/satellite/overlay/selection_test.go index 05cf74d2d..4e2ed7858 100644 --- a/satellite/overlay/selection_test.go +++ b/satellite/overlay/selection_test.go @@ -17,6 +17,7 @@ import ( "github.com/stretchr/testify/require" "github.com/zeebo/errs" "go.uber.org/zap" + "golang.org/x/exp/slices" "storj.io/common/memory" "storj.io/common/pb" @@ -113,36 +114,45 @@ func TestMinimumDiskSpace(t *testing.T) { }) } -func TestOffline(t *testing.T) { +func TestOnlineOffline(t *testing.T) { testplanet.Run(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { satellite := planet.Satellites[0] service := satellite.Overlay.Service - // TODO: handle cleanup - result, err := service.KnownUnreliableOrOffline(ctx, []storj.NodeID{ + online, offline, err := service.KnownReliable(ctx, []storj.NodeID{ planet.StorageNodes[0].ID(), }) require.NoError(t, err) - require.Empty(t, result) + require.Empty(t, offline) + require.Len(t, online, 1) - result, err = service.KnownUnreliableOrOffline(ctx, []storj.NodeID{ + online, offline, err = service.KnownReliable(ctx, []storj.NodeID{ planet.StorageNodes[0].ID(), planet.StorageNodes[1].ID(), planet.StorageNodes[2].ID(), }) require.NoError(t, err) - require.Empty(t, result) + require.Empty(t, offline) + require.Len(t, online, 3) - result, err = service.KnownUnreliableOrOffline(ctx, []storj.NodeID{ + unreliableNodeID := storj.NodeID{1, 2, 3, 4} + online, offline, err = service.KnownReliable(ctx, []storj.NodeID{ planet.StorageNodes[0].ID(), - {1, 2, 3, 4}, // note that this succeeds by design + unreliableNodeID, planet.StorageNodes[2].ID(), }) require.NoError(t, err) - require.Len(t, result, 1) - require.Equal(t, result[0], storj.NodeID{1, 2, 3, 4}) + require.Empty(t, offline) + require.Len(t, online, 2) + + require.False(t, slices.ContainsFunc(online, func(node overlay.SelectedNode) bool { + return node.ID == unreliableNodeID + })) + require.False(t, slices.ContainsFunc(offline, func(node overlay.SelectedNode) bool { + return node.ID == unreliableNodeID + })) }) } diff --git a/satellite/overlay/service.go b/satellite/overlay/service.go index 34f8a4b81..b26ad3128 100644 --- a/satellite/overlay/service.go +++ b/satellite/overlay/service.go @@ -11,6 +11,7 @@ import ( "github.com/zeebo/errs" "go.uber.org/zap" + "golang.org/x/exp/maps" "storj.io/common/pb" "storj.io/common/storj" @@ -60,12 +61,10 @@ type DB interface { // Get looks up the node by nodeID Get(ctx context.Context, nodeID storj.NodeID) (*NodeDossier, error) - // KnownUnreliableOrOffline filters a set of nodes to unhealth or offlines node, independent of new - 
KnownUnreliableOrOffline(context.Context, *NodeCriteria, storj.NodeIDList) (storj.NodeIDList, error) // KnownReliableInExcludedCountries filters healthy nodes that are in excluded countries. KnownReliableInExcludedCountries(context.Context, *NodeCriteria, storj.NodeIDList) (storj.NodeIDList, error) // KnownReliable filters a set of nodes to reliable (online and qualified) nodes. - KnownReliable(ctx context.Context, onlineWindow time.Duration, nodeIDs storj.NodeIDList) ([]*pb.Node, error) + KnownReliable(ctx context.Context, nodeIDs storj.NodeIDList, onlineWindow, asOfSystemInterval time.Duration) (online []SelectedNode, offline []SelectedNode, err error) // Reliable returns all nodes that are reliable Reliable(context.Context, *NodeCriteria) (storj.NodeIDList, error) // UpdateReputation updates the DB columns for all reputation fields in ReputationStatus. @@ -540,15 +539,6 @@ func (service *Service) FindStorageNodesWithPreferences(ctx context.Context, req return nodes, nil } -// KnownUnreliableOrOffline filters a set of nodes to unhealth or offlines node, independent of new. -func (service *Service) KnownUnreliableOrOffline(ctx context.Context, nodeIds storj.NodeIDList) (badNodes storj.NodeIDList, err error) { - defer mon.Task()(&ctx)(&err) - criteria := &NodeCriteria{ - OnlineWindow: service.config.Node.OnlineWindow, - } - return service.db.KnownUnreliableOrOffline(ctx, criteria, nodeIds) -} - // InsertOfflineNodeEvents inserts offline events into node events. func (service *Service) InsertOfflineNodeEvents(ctx context.Context, cooldown time.Duration, cutoff time.Duration, limit int) (count int, err error) { defer mon.Task()(&ctx)(&err) @@ -594,9 +584,11 @@ func (service *Service) KnownReliableInExcludedCountries(ctx context.Context, no } // KnownReliable filters a set of nodes to reliable (online and qualified) nodes. -func (service *Service) KnownReliable(ctx context.Context, nodeIDs storj.NodeIDList) (nodes []*pb.Node, err error) { +func (service *Service) KnownReliable(ctx context.Context, nodeIDs storj.NodeIDList) (onlineNodes []SelectedNode, offlineNodes []SelectedNode, err error) { defer mon.Task()(&ctx)(&err) - return service.db.KnownReliable(ctx, service.config.Node.OnlineWindow, nodeIDs) + + // TODO add as of system time + return service.db.KnownReliable(ctx, nodeIDs, service.config.Node.OnlineWindow, 0) } // Reliable filters a set of nodes that are reliable, independent of new. @@ -771,23 +763,23 @@ func (service *Service) UpdateCheckIn(ctx context.Context, node NodeCheckInInfo, // GetMissingPieces returns the list of offline nodes and the corresponding pieces. 
 func (service *Service) GetMissingPieces(ctx context.Context, pieces metabase.Pieces) (missingPieces []uint16, err error) {
 	defer mon.Task()(&ctx)(&err)
+
+	// TODO: this method will be removed completely in a subsequent change
 	var nodeIDs storj.NodeIDList
+	missingPiecesMap := map[storj.NodeID]uint16{}
 	for _, p := range pieces {
 		nodeIDs = append(nodeIDs, p.StorageNode)
+		missingPiecesMap[p.StorageNode] = p.Number
 	}
-	badNodeIDs, err := service.KnownUnreliableOrOffline(ctx, nodeIDs)
+	onlineNodes, _, err := service.KnownReliable(ctx, nodeIDs)
 	if err != nil {
 		return nil, Error.New("error getting nodes %s", err)
 	}
-	for _, p := range pieces {
-		for _, nodeID := range badNodeIDs {
-			if nodeID == p.StorageNode {
-				missingPieces = append(missingPieces, p.Number)
-			}
-		}
+	for _, node := range onlineNodes {
+		delete(missingPiecesMap, node.ID)
 	}
-	return missingPieces, nil
+	return maps.Values(missingPiecesMap), nil
 }
 
 // GetReliablePiecesInExcludedCountries returns the list of pieces held by nodes located in excluded countries.
diff --git a/satellite/overlay/service_test.go b/satellite/overlay/service_test.go
index 4abf984ab..cc84aea03 100644
--- a/satellite/overlay/service_test.go
+++ b/satellite/overlay/service_test.go
@@ -434,7 +434,7 @@ func TestKnownReliable(t *testing.T) {
 		require.NoError(t, err)
 
 		// Check that only storage nodes #4 and #5 are reliable
-		result, err := service.KnownReliable(ctx, []storj.NodeID{
+		online, _, err := service.KnownReliable(ctx, []storj.NodeID{
 			planet.StorageNodes[0].ID(),
 			planet.StorageNodes[1].ID(),
 			planet.StorageNodes[2].ID(),
@@ -443,7 +443,7 @@ func TestKnownReliable(t *testing.T) {
 			planet.StorageNodes[5].ID(),
 		})
 		require.NoError(t, err)
-		require.Len(t, result, 2)
+		require.Len(t, online, 2)
 
 		// Sort the storage nodes for predictable checks
 		expectedReliable := []storj.NodeURL{
@@ -451,11 +451,11 @@ func TestKnownReliable(t *testing.T) {
 			planet.StorageNodes[5].NodeURL(),
 		}
 		sort.Slice(expectedReliable, func(i, j int) bool { return expectedReliable[i].ID.Less(expectedReliable[j].ID) })
-		sort.Slice(result, func(i, j int) bool { return result[i].Id.Less(result[j].Id) })
+		sort.Slice(online, func(i, j int) bool { return online[i].ID.Less(online[j].ID) })
 
 		// Assert the reliable nodes are the expected ones
-		for i, node := range result {
-			assert.Equal(t, expectedReliable[i].ID, node.Id)
+		for i, node := range online {
+			assert.Equal(t, expectedReliable[i].ID, node.ID)
 			assert.Equal(t, expectedReliable[i].Address, node.Address.Address)
 		}
 	})
diff --git a/satellite/overlay/statdb_test.go b/satellite/overlay/statdb_test.go
index a22636e9b..b8561e3cb 100644
--- a/satellite/overlay/statdb_test.go
+++ b/satellite/overlay/statdb_test.go
@@ -11,6 +11,7 @@ import (
 
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
+	"golang.org/x/exp/slices"
 
 	"storj.io/common/pb"
 	"storj.io/common/storj"
@@ -25,13 +26,10 @@ func TestStatDB(t *testing.T) {
 	satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
 		testDatabase(ctx, t, db.OverlayCache())
 	})
-	satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
-		testDatabase(ctx, t, db.OverlayCache())
-	})
 }
 
 func testDatabase(ctx context.Context, t *testing.T, cache overlay.DB) {
-	{ // TestKnownUnreliableOrOffline and TestReliable
+	{ // Test KnownReliable and Reliable
 		for i, tt := range []struct {
 			nodeID                storj.NodeID
 			unknownAuditSuspended bool
@@ -108,16 +106,24 @@ func testDatabase(ctx context.Context, t *testing.T, cache overlay.DB) {
 			ExcludedCountries: []string{"FR", "BE"},
[]string{"FR", "BE"}, } - invalid, err := cache.KnownUnreliableOrOffline(ctx, criteria, nodeIds) + contains := func(nodeID storj.NodeID) func(node overlay.SelectedNode) bool { + return func(node overlay.SelectedNode) bool { + return node.ID == nodeID + } + } + + online, offline, err := cache.KnownReliable(ctx, nodeIds, criteria.OnlineWindow, criteria.AsOfSystemInterval) require.NoError(t, err) - require.Contains(t, invalid, storj.NodeID{2}) // disqualified - require.Contains(t, invalid, storj.NodeID{3}) // unknown audit suspended - require.Contains(t, invalid, storj.NodeID{4}) // offline - require.Contains(t, invalid, storj.NodeID{5}) // gracefully exited - require.Contains(t, invalid, storj.NodeID{6}) // offline suspended - require.Contains(t, invalid, storj.NodeID{9}) // not in db - require.Len(t, invalid, 6) + // unrealiable nodes shouldn't be in results + require.False(t, slices.ContainsFunc(append(online, offline...), contains(storj.NodeID{2}))) // disqualified + require.False(t, slices.ContainsFunc(append(online, offline...), contains(storj.NodeID{3}))) // unknown audit suspended + require.False(t, slices.ContainsFunc(append(online, offline...), contains(storj.NodeID{5}))) // gracefully exited + require.False(t, slices.ContainsFunc(append(online, offline...), contains(storj.NodeID{6}))) // offline suspended + require.False(t, slices.ContainsFunc(append(online, offline...), contains(storj.NodeID{9}))) // not in db + + require.True(t, slices.ContainsFunc(offline, contains(storj.NodeID{4}))) // offline + require.Len(t, append(online, offline...), 4) valid, err := cache.Reliable(ctx, criteria) require.NoError(t, err) @@ -239,6 +245,5 @@ func testDatabase(ctx context.Context, t *testing.T, cache overlay.DB) { require.NoError(t, err) _, err = cache.Get(ctx, nodeID) require.NoError(t, err) - } } diff --git a/satellite/satellitedb/overlaycache.go b/satellite/satellitedb/overlaycache.go index 42c9818c5..d0e879ed8 100644 --- a/satellite/satellitedb/overlaycache.go +++ b/satellite/satellitedb/overlaycache.go @@ -322,22 +322,6 @@ func (cache *overlaycache) getOnlineNodesForAuditRepair(ctx context.Context, nod return nodes, Error.Wrap(rows.Err()) } -// KnownUnreliableOrOffline filters a set of nodes to unreliable or offlines node, independent of new. -func (cache *overlaycache) KnownUnreliableOrOffline(ctx context.Context, criteria *overlay.NodeCriteria, nodeIDs storj.NodeIDList) (badNodes storj.NodeIDList, err error) { - for { - badNodes, err = cache.knownUnreliableOrOffline(ctx, criteria, nodeIDs) - if err != nil { - if cockroachutil.NeedsRetry(err) { - continue - } - return badNodes, err - } - break - } - - return badNodes, err -} - // GetOfflineNodesForEmail gets nodes that we want to send an email to. These are non-disqualified, non-exited nodes where // last_contact_success is between two points: the point where it is considered offline (offlineWindow), and the point where we don't want // to send more emails (cutoff). It also filters nodes where last_offline_email is too recent (cooldown). 
@@ -463,102 +447,64 @@ func (cache *overlaycache) knownReliableInExcludedCountries(ctx context.Context, return reliableInExcluded, Error.Wrap(rows.Err()) } -func (cache *overlaycache) knownUnreliableOrOffline(ctx context.Context, criteria *overlay.NodeCriteria, nodeIDs storj.NodeIDList) (badNodes storj.NodeIDList, err error) { - defer mon.Task()(&ctx)(&err) - - if len(nodeIDs) == 0 { - return nil, Error.New("no ids provided") - } - - // get reliable and online nodes - var rows tagsql.Rows - rows, err = cache.db.Query(ctx, cache.db.Rebind(` - SELECT id - FROM nodes - `+cache.db.impl.AsOfSystemInterval(criteria.AsOfSystemInterval)+` - WHERE id = any($1::bytea[]) - AND disqualified IS NULL - AND unknown_audit_suspended IS NULL - AND offline_suspended IS NULL - AND exit_finished_at IS NULL - AND last_contact_success > $2 - `), pgutil.NodeIDArray(nodeIDs), time.Now().Add(-criteria.OnlineWindow), - ) - if err != nil { - return nil, err - } - defer func() { err = errs.Combine(err, rows.Close()) }() - - goodNodes := make(map[storj.NodeID]struct{}, len(nodeIDs)) - for rows.Next() { - var id storj.NodeID - err = rows.Scan(&id) - if err != nil { - return nil, err - } - goodNodes[id] = struct{}{} - } - for _, id := range nodeIDs { - if _, ok := goodNodes[id]; !ok { - badNodes = append(badNodes, id) - } - } - return badNodes, Error.Wrap(rows.Err()) -} - -// KnownReliable filters a set of nodes to reliable (online and qualified) nodes. -func (cache *overlaycache) KnownReliable(ctx context.Context, onlineWindow time.Duration, nodeIDs storj.NodeIDList) (nodes []*pb.Node, err error) { +// KnownReliable filters a set of nodes to reliable nodes. List is split into online and offline nodes. +func (cache *overlaycache) KnownReliable(ctx context.Context, nodeIDs storj.NodeIDList, onlineWindow, asOfSystemInterval time.Duration) (online []overlay.SelectedNode, offline []overlay.SelectedNode, err error) { for { - nodes, err = cache.knownReliable(ctx, onlineWindow, nodeIDs) + online, offline, err = cache.knownReliable(ctx, nodeIDs, onlineWindow, asOfSystemInterval) if err != nil { if cockroachutil.NeedsRetry(err) { continue } - return nodes, err + return nil, nil, err } break } - return nodes, err + return online, offline, err } -func (cache *overlaycache) knownReliable(ctx context.Context, onlineWindow time.Duration, nodeIDs storj.NodeIDList) (nodes []*pb.Node, err error) { +func (cache *overlaycache) knownReliable(ctx context.Context, nodeIDs storj.NodeIDList, onlineWindow, asOfSystemInterval time.Duration) (online []overlay.SelectedNode, offline []overlay.SelectedNode, err error) { defer mon.Task()(&ctx)(&err) if len(nodeIDs) == 0 { - return nil, Error.New("no ids provided") + return nil, nil, Error.New("no ids provided") } - // get online nodes - rows, err := cache.db.Query(ctx, cache.db.Rebind(` - SELECT id, last_net, last_ip_port, address, protocol, noise_proto, noise_public_key, debounce_limit, features - FROM nodes - WHERE id = any($1::bytea[]) + err = withRows(cache.db.Query(ctx, ` + SELECT id, address, last_net, last_ip_port, country_code, last_contact_success > $2 as online + FROM nodes + `+cache.db.impl.AsOfSystemInterval(asOfSystemInterval)+` + WHERE id = any($1::bytea[]) AND disqualified IS NULL AND unknown_audit_suspended IS NULL AND offline_suspended IS NULL AND exit_finished_at IS NULL - AND last_contact_success > $2 - `), pgutil.NodeIDArray(nodeIDs), time.Now().Add(-onlineWindow), - ) - if err != nil { - return nil, err - } - defer func() { err = errs.Combine(err, rows.Close()) }() + `, 
pgutil.NodeIDArray(nodeIDs), time.Now().Add(-onlineWindow),
+	))(func(rows tagsql.Rows) error {
+		for rows.Next() {
+			var onlineNode bool
+			var node overlay.SelectedNode
+			node.Address = &pb.NodeAddress{}
+			var lastIPPort sql.NullString
+			err = rows.Scan(&node.ID, &node.Address.Address, &node.LastNet, &lastIPPort, &node.CountryCode, &onlineNode)
+			if err != nil {
+				return err
+			}
-	for rows.Next() {
-		row := &dbx.Node{}
-		err = rows.Scan(&row.Id, &row.LastNet, &row.LastIpPort, &row.Address, &row.Protocol, &row.NoiseProto, &row.NoisePublicKey, &row.DebounceLimit, &row.Features)
-		if err != nil {
-			return nil, err
+			if lastIPPort.Valid {
+				node.LastIPPort = lastIPPort.String
+			}
+
+			if onlineNode {
+				online = append(online, node)
+			} else {
+				offline = append(offline, node)
+			}
 		}
-		node, err := convertDBNode(ctx, row)
-		if err != nil {
-			return nil, err
-		}
-		nodes = append(nodes, &node.Node)
-	}
-	return nodes, Error.Wrap(rows.Err())
+		return nil
+	})
+
+	return online, offline, Error.Wrap(err)
 }
 
 // Reliable returns all reliable nodes.
diff --git a/satellite/satellitedb/overlaycache_test.go b/satellite/satellitedb/overlaycache_test.go
index e76fa21e2..ea1af0299 100644
--- a/satellite/satellitedb/overlaycache_test.go
+++ b/satellite/satellitedb/overlaycache_test.go
@@ -418,3 +418,139 @@ func TestOverlayCache_SelectAllStorageNodesDownloadUpload(t *testing.T) {
 	})
 }
+
+func TestOverlayCache_KnownReliable(t *testing.T) {
+	satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
+		cache := db.OverlayCache()
+
+		allNodes := []overlay.SelectedNode{
+			addNode(ctx, t, cache, "online", "127.0.0.1", true, false, false, false, false),
+			addNode(ctx, t, cache, "offline", "127.0.0.2", false, false, false, false, false),
+			addNode(ctx, t, cache, "disqualified", "127.0.0.3", false, true, false, false, false),
+			addNode(ctx, t, cache, "audit-suspended", "127.0.0.4", false, false, true, false, false),
+			addNode(ctx, t, cache, "offline-suspended", "127.0.0.5", false, false, false, true, false),
+			addNode(ctx, t, cache, "exited", "127.0.0.6", false, false, false, false, true),
+		}
+
+		ids := func(nodes ...overlay.SelectedNode) storj.NodeIDList {
+			nodeIds := storj.NodeIDList{}
+			for _, node := range nodes {
+				nodeIds = append(nodeIds, node.ID)
+			}
+			return nodeIds
+		}
+
+		nodes := func(nodes ...overlay.SelectedNode) []overlay.SelectedNode {
+			return append([]overlay.SelectedNode{}, nodes...)
+		}
+
+		type testCase struct {
+			IDs     storj.NodeIDList
+			Online  []overlay.SelectedNode
+			Offline []overlay.SelectedNode
+		}
+
+		shuffledNodeIDs := ids(allNodes...)
+		rand.Shuffle(len(shuffledNodeIDs), shuffledNodeIDs.Swap)
+
+		for _, tc := range []testCase{
+			{
+				IDs:     ids(allNodes[0], allNodes[1]),
+				Online:  nodes(allNodes[0]),
+				Offline: nodes(allNodes[1]),
+			},
+			{
+				IDs:    ids(allNodes[0]),
+				Online: nodes(allNodes[0]),
+			},
+			{
+				IDs:     ids(allNodes[1]),
+				Offline: nodes(allNodes[1]),
+			},
+			{ // only unreliable
+				IDs: ids(allNodes[2], allNodes[3], allNodes[4], allNodes[5]),
+			},
+
+			{ // all nodes
+				IDs:     ids(allNodes...),
+				Online:  nodes(allNodes[0]),
+				Offline: nodes(allNodes[1]),
+			},
+			// all nodes but in shuffled order
+			{
+				IDs:     shuffledNodeIDs,
+				Online:  nodes(allNodes[0]),
+				Offline: nodes(allNodes[1]),
+			},
+			// all nodes + one ID not from DB
+			{
+				IDs:     append(ids(allNodes...), testrand.NodeID()),
+				Online:  nodes(allNodes[0]),
+				Offline: nodes(allNodes[1]),
+			},
+		} {
+			online, offline, err := cache.KnownReliable(ctx, tc.IDs, 1*time.Hour, 0)
+			require.NoError(t, err)
+			require.ElementsMatch(t, tc.Online, online)
+			require.ElementsMatch(t, tc.Offline, offline)
+		}
+
+		_, _, err := cache.KnownReliable(ctx, storj.NodeIDList{}, 1*time.Hour, 0)
+		require.Error(t, err)
+	})
+}
+
+func addNode(ctx context.Context, t *testing.T, cache overlay.DB, address, lastIPPort string, online, disqualified, auditSuspended, offlineSuspended, exited bool) overlay.SelectedNode {
+	selectedNode := overlay.SelectedNode{
+		ID:          testrand.NodeID(),
+		Address:     &pb.NodeAddress{Address: address},
+		LastNet:     lastIPPort,
+		LastIPPort:  lastIPPort,
+		CountryCode: location.Poland,
+	}
+
+	checkInInfo := overlay.NodeCheckInInfo{
+		IsUp:        true,
+		NodeID:      selectedNode.ID,
+		Address:     &pb.NodeAddress{Address: selectedNode.Address.Address},
+		LastIPPort:  selectedNode.LastIPPort,
+		LastNet:     selectedNode.LastNet,
+		CountryCode: selectedNode.CountryCode,
+		Version:     &pb.NodeVersion{Version: "v0.0.0"},
+	}
+
+	timestamp := time.Now().UTC()
+	if !online {
+		timestamp = time.Now().Add(-10 * time.Hour)
+	}
+
+	err := cache.UpdateCheckIn(ctx, checkInInfo, timestamp, overlay.NodeSelectionConfig{})
+	require.NoError(t, err)
+
+	if disqualified {
+		_, err := cache.DisqualifyNode(ctx, selectedNode.ID, time.Now(), overlay.DisqualificationReasonAuditFailure)
+		require.NoError(t, err)
+	}
+
+	if auditSuspended {
+		require.NoError(t, cache.TestSuspendNodeUnknownAudit(ctx, selectedNode.ID, time.Now()))
+	}
+
+	if offlineSuspended {
+		require.NoError(t, cache.TestSuspendNodeOffline(ctx, selectedNode.ID, time.Now()))
+	}
+
+	if exited {
+		now := time.Now()
+		_, err = cache.UpdateExitStatus(ctx, &overlay.ExitStatusRequest{
+			NodeID:              selectedNode.ID,
+			ExitInitiatedAt:     now,
+			ExitLoopCompletedAt: now,
+			ExitFinishedAt:      now,
+			ExitSuccess:         true,
+		})
+		require.NoError(t, err)
+	}
+
+	return selectedNode
+}
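
Note for reviewers: a short sketch of how a caller is expected to consume the online/offline split returned by the new Service.KnownReliable. The classify helper is illustrative only and assumes the signatures introduced in this change; nodes that appear in neither returned slice were filtered out by the query as disqualified, suspended, exited, or unknown to the satellite.

// Sketch only: classify is a hypothetical caller of the new API.
package sketch

import (
	"context"
	"fmt"

	"storj.io/common/storj"
	"storj.io/storj/satellite/overlay"
)

func classify(ctx context.Context, service *overlay.Service, ids storj.NodeIDList) error {
	online, offline, err := service.KnownReliable(ctx, ids)
	if err != nil {
		return err
	}

	// Anything absent from both slices is unreliable: disqualified,
	// suspended, exited, or not known to the satellite at all.
	reliable := make(map[storj.NodeID]struct{}, len(online)+len(offline))
	for _, node := range online {
		reliable[node.ID] = struct{}{}
	}
	for _, node := range offline {
		reliable[node.ID] = struct{}{}
	}
	for _, id := range ids {
		if _, ok := reliable[id]; !ok {
			fmt.Println("unreliable:", id)
		}
	}
	fmt.Printf("online=%d offline=%d\n", len(online), len(offline))
	return nil
}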