satellite/overlay: refactor KnownReliable to be used with repairer

Currently we are using KnownUnreliableOrOffline to get missing pieces
for the segment repairer (GetMissingPieces). The issue is that the
repairer now looks at more than just missing pieces (clumped pieces and
pieces that are out of placement).

KnownReliable was refactored to return data (e.g. country, lastNet)
about all reliable nodes from the provided list. The result is split
into online and offline nodes. This way we will be able to use the
results of this method for all checks: missing pieces, clumped pieces,
and out-of-placement pieces.

This is the first part of the changes to handle the different kinds of
pieces in the segment repairer.
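
As a rough illustration (not part of this commit), a hypothetical
repairer-side helper could consume the new split result much like the
refactored GetMissingPieces below; classifyMissingPieces and its
surroundings are made up for this sketch:

// assumes imports: context, storj.io/common/storj,
// storj.io/storj/satellite/metabase, storj.io/storj/satellite/overlay
func classifyMissingPieces(ctx context.Context, service *overlay.Service, pieces metabase.Pieces) ([]uint16, error) {
	var nodeIDs storj.NodeIDList
	pieceNumbers := map[storj.NodeID]uint16{}
	for _, p := range pieces {
		nodeIDs = append(nodeIDs, p.StorageNode)
		pieceNumbers[p.StorageNode] = p.Number
	}

	// Both returned lists contain only reliable nodes; pieces on nodes
	// that do not come back as online are treated as missing. The online
	// list also carries LastNet and CountryCode, so the same call can
	// later feed the clumped and out-of-placement checks.
	online, _, err := service.KnownReliable(ctx, nodeIDs)
	if err != nil {
		return nil, err
	}
	for _, node := range online {
		delete(pieceNumbers, node.ID)
	}

	missing := make([]uint16, 0, len(pieceNumbers))
	for _, number := range pieceNumbers {
		missing = append(missing, number)
	}
	return missing, nil
}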

https://github.com/storj/storj/issues/5998

Change-Id: I6cbaf59cff9d6c4346ace75bb814ccd985c0e43e
Michal Niewrzal 2023-06-26 10:25:13 +02:00
parent 049953a7ce
commit 98f4f249b2
7 changed files with 233 additions and 146 deletions


@@ -64,14 +64,12 @@ func BenchmarkOverlay(b *testing.B) {
check = append(check, testrand.NodeID())
}
b.Run("KnownUnreliableOrOffline", func(b *testing.B) {
criteria := &overlay.NodeCriteria{
OnlineWindow: 1000 * time.Hour,
}
b.Run("KnownReliable", func(b *testing.B) {
onlineWindow := 1000 * time.Hour
for i := 0; i < b.N; i++ {
badNodes, err := overlaydb.KnownUnreliableOrOffline(ctx, criteria, check)
online, _, err := overlaydb.KnownReliable(ctx, check, onlineWindow, 0)
require.NoError(b, err)
require.Len(b, badNodes, OfflineCount)
require.Len(b, online, OnlineCount)
}
})


@@ -17,6 +17,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/zeebo/errs"
"go.uber.org/zap"
"golang.org/x/exp/slices"
"storj.io/common/memory"
"storj.io/common/pb"
@@ -113,36 +114,45 @@ func TestMinimumDiskSpace(t *testing.T) {
})
}
func TestOffline(t *testing.T) {
func TestOnlineOffline(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
service := satellite.Overlay.Service
// TODO: handle cleanup
result, err := service.KnownUnreliableOrOffline(ctx, []storj.NodeID{
online, offline, err := service.KnownReliable(ctx, []storj.NodeID{
planet.StorageNodes[0].ID(),
})
require.NoError(t, err)
require.Empty(t, result)
require.Empty(t, offline)
require.Len(t, online, 1)
result, err = service.KnownUnreliableOrOffline(ctx, []storj.NodeID{
online, offline, err = service.KnownReliable(ctx, []storj.NodeID{
planet.StorageNodes[0].ID(),
planet.StorageNodes[1].ID(),
planet.StorageNodes[2].ID(),
})
require.NoError(t, err)
require.Empty(t, result)
require.Empty(t, offline)
require.Len(t, online, 3)
result, err = service.KnownUnreliableOrOffline(ctx, []storj.NodeID{
unreliableNodeID := storj.NodeID{1, 2, 3, 4}
online, offline, err = service.KnownReliable(ctx, []storj.NodeID{
planet.StorageNodes[0].ID(),
{1, 2, 3, 4}, // note that this succeeds by design
unreliableNodeID,
planet.StorageNodes[2].ID(),
})
require.NoError(t, err)
require.Len(t, result, 1)
require.Equal(t, result[0], storj.NodeID{1, 2, 3, 4})
require.Empty(t, offline)
require.Len(t, online, 2)
require.False(t, slices.ContainsFunc(online, func(node overlay.SelectedNode) bool {
return node.ID == unreliableNodeID
}))
require.False(t, slices.ContainsFunc(offline, func(node overlay.SelectedNode) bool {
return node.ID == unreliableNodeID
}))
})
}


@@ -11,6 +11,7 @@ import (
"github.com/zeebo/errs"
"go.uber.org/zap"
"golang.org/x/exp/maps"
"storj.io/common/pb"
"storj.io/common/storj"
@@ -60,12 +61,10 @@ type DB interface {
// Get looks up the node by nodeID
Get(ctx context.Context, nodeID storj.NodeID) (*NodeDossier, error)
// KnownUnreliableOrOffline filters a set of nodes to unhealth or offlines node, independent of new
KnownUnreliableOrOffline(context.Context, *NodeCriteria, storj.NodeIDList) (storj.NodeIDList, error)
// KnownReliableInExcludedCountries filters healthy nodes that are in excluded countries.
KnownReliableInExcludedCountries(context.Context, *NodeCriteria, storj.NodeIDList) (storj.NodeIDList, error)
// KnownReliable filters a set of nodes to reliable (online and qualified) nodes.
KnownReliable(ctx context.Context, onlineWindow time.Duration, nodeIDs storj.NodeIDList) ([]*pb.Node, error)
KnownReliable(ctx context.Context, nodeIDs storj.NodeIDList, onlineWindow, asOfSystemInterval time.Duration) (online []SelectedNode, offline []SelectedNode, err error)
// Reliable returns all nodes that are reliable
Reliable(context.Context, *NodeCriteria) (storj.NodeIDList, error)
// UpdateReputation updates the DB columns for all reputation fields in ReputationStatus.
@@ -540,15 +539,6 @@ func (service *Service) FindStorageNodesWithPreferences(ctx context.Context, req
return nodes, nil
}
// KnownUnreliableOrOffline filters a set of nodes to unhealth or offlines node, independent of new.
func (service *Service) KnownUnreliableOrOffline(ctx context.Context, nodeIds storj.NodeIDList) (badNodes storj.NodeIDList, err error) {
defer mon.Task()(&ctx)(&err)
criteria := &NodeCriteria{
OnlineWindow: service.config.Node.OnlineWindow,
}
return service.db.KnownUnreliableOrOffline(ctx, criteria, nodeIds)
}
// InsertOfflineNodeEvents inserts offline events into node events.
func (service *Service) InsertOfflineNodeEvents(ctx context.Context, cooldown time.Duration, cutoff time.Duration, limit int) (count int, err error) {
defer mon.Task()(&ctx)(&err)
@@ -594,9 +584,11 @@ func (service *Service) KnownReliableInExcludedCountries(ctx context.Context, no
}
// KnownReliable filters a set of nodes to reliable (online and qualified) nodes.
func (service *Service) KnownReliable(ctx context.Context, nodeIDs storj.NodeIDList) (nodes []*pb.Node, err error) {
func (service *Service) KnownReliable(ctx context.Context, nodeIDs storj.NodeIDList) (onlineNodes []SelectedNode, offlineNodes []SelectedNode, err error) {
defer mon.Task()(&ctx)(&err)
return service.db.KnownReliable(ctx, service.config.Node.OnlineWindow, nodeIDs)
// TODO add as of system time
return service.db.KnownReliable(ctx, nodeIDs, service.config.Node.OnlineWindow, 0)
}
// Reliable filters a set of nodes that are reliable, independent of new.
@@ -771,23 +763,23 @@ func (service *Service) UpdateCheckIn(ctx context.Context, node NodeCheckInInfo,
// GetMissingPieces returns the list of offline nodes and the corresponding pieces.
func (service *Service) GetMissingPieces(ctx context.Context, pieces metabase.Pieces) (missingPieces []uint16, err error) {
defer mon.Task()(&ctx)(&err)
// TODO this method will be removed completely in subsequent change
var nodeIDs storj.NodeIDList
missingPiecesMap := map[storj.NodeID]uint16{}
for _, p := range pieces {
nodeIDs = append(nodeIDs, p.StorageNode)
missingPiecesMap[p.StorageNode] = p.Number
}
badNodeIDs, err := service.KnownUnreliableOrOffline(ctx, nodeIDs)
onlineNodes, _, err := service.KnownReliable(ctx, nodeIDs)
if err != nil {
return nil, Error.New("error getting nodes %s", err)
}
for _, p := range pieces {
for _, nodeID := range badNodeIDs {
if nodeID == p.StorageNode {
missingPieces = append(missingPieces, p.Number)
}
}
for _, node := range onlineNodes {
delete(missingPiecesMap, node.ID)
}
return missingPieces, nil
return maps.Values(missingPiecesMap), nil
}
// GetReliablePiecesInExcludedCountries returns the list of pieces held by nodes located in excluded countries.


@@ -434,7 +434,7 @@ func TestKnownReliable(t *testing.T) {
require.NoError(t, err)
// Check that only storage nodes #4 and #5 are reliable
result, err := service.KnownReliable(ctx, []storj.NodeID{
online, _, err := service.KnownReliable(ctx, []storj.NodeID{
planet.StorageNodes[0].ID(),
planet.StorageNodes[1].ID(),
planet.StorageNodes[2].ID(),
@@ -443,7 +443,7 @@ func TestKnownReliable(t *testing.T) {
planet.StorageNodes[5].ID(),
})
require.NoError(t, err)
require.Len(t, result, 2)
require.Len(t, online, 2)
// Sort the storage nodes for predictable checks
expectedReliable := []storj.NodeURL{
@@ -451,11 +451,11 @@ func TestKnownReliable(t *testing.T) {
planet.StorageNodes[5].NodeURL(),
}
sort.Slice(expectedReliable, func(i, j int) bool { return expectedReliable[i].ID.Less(expectedReliable[j].ID) })
sort.Slice(result, func(i, j int) bool { return result[i].Id.Less(result[j].Id) })
sort.Slice(online, func(i, j int) bool { return online[i].ID.Less(online[j].ID) })
// Assert the reliable nodes are the expected ones
for i, node := range result {
assert.Equal(t, expectedReliable[i].ID, node.Id)
for i, node := range online {
assert.Equal(t, expectedReliable[i].ID, node.ID)
assert.Equal(t, expectedReliable[i].Address, node.Address.Address)
}
})


@@ -11,6 +11,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/exp/slices"
"storj.io/common/pb"
"storj.io/common/storj"
@@ -25,13 +26,10 @@ func TestStatDB(t *testing.T) {
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
testDatabase(ctx, t, db.OverlayCache())
})
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
testDatabase(ctx, t, db.OverlayCache())
})
}
func testDatabase(ctx context.Context, t *testing.T, cache overlay.DB) {
{ // TestKnownUnreliableOrOffline and TestReliable
{ // Test KnownReliable and Reliable
for i, tt := range []struct {
nodeID storj.NodeID
unknownAuditSuspended bool
@@ -108,16 +106,24 @@ func testDatabase(ctx context.Context, t *testing.T, cache overlay.DB) {
ExcludedCountries: []string{"FR", "BE"},
}
invalid, err := cache.KnownUnreliableOrOffline(ctx, criteria, nodeIds)
contains := func(nodeID storj.NodeID) func(node overlay.SelectedNode) bool {
return func(node overlay.SelectedNode) bool {
return node.ID == nodeID
}
}
online, offline, err := cache.KnownReliable(ctx, nodeIds, criteria.OnlineWindow, criteria.AsOfSystemInterval)
require.NoError(t, err)
require.Contains(t, invalid, storj.NodeID{2}) // disqualified
require.Contains(t, invalid, storj.NodeID{3}) // unknown audit suspended
require.Contains(t, invalid, storj.NodeID{4}) // offline
require.Contains(t, invalid, storj.NodeID{5}) // gracefully exited
require.Contains(t, invalid, storj.NodeID{6}) // offline suspended
require.Contains(t, invalid, storj.NodeID{9}) // not in db
require.Len(t, invalid, 6)
// unreliable nodes shouldn't be in the results
require.False(t, slices.ContainsFunc(append(online, offline...), contains(storj.NodeID{2}))) // disqualified
require.False(t, slices.ContainsFunc(append(online, offline...), contains(storj.NodeID{3}))) // unknown audit suspended
require.False(t, slices.ContainsFunc(append(online, offline...), contains(storj.NodeID{5}))) // gracefully exited
require.False(t, slices.ContainsFunc(append(online, offline...), contains(storj.NodeID{6}))) // offline suspended
require.False(t, slices.ContainsFunc(append(online, offline...), contains(storj.NodeID{9}))) // not in db
require.True(t, slices.ContainsFunc(offline, contains(storj.NodeID{4}))) // offline
require.Len(t, append(online, offline...), 4)
valid, err := cache.Reliable(ctx, criteria)
require.NoError(t, err)
@@ -239,6 +245,5 @@ func testDatabase(ctx context.Context, t *testing.T, cache overlay.DB) {
require.NoError(t, err)
_, err = cache.Get(ctx, nodeID)
require.NoError(t, err)
}
}


@@ -322,22 +322,6 @@ func (cache *overlaycache) getOnlineNodesForAuditRepair(ctx context.Context, nod
return nodes, Error.Wrap(rows.Err())
}
// KnownUnreliableOrOffline filters a set of nodes to unreliable or offlines node, independent of new.
func (cache *overlaycache) KnownUnreliableOrOffline(ctx context.Context, criteria *overlay.NodeCriteria, nodeIDs storj.NodeIDList) (badNodes storj.NodeIDList, err error) {
for {
badNodes, err = cache.knownUnreliableOrOffline(ctx, criteria, nodeIDs)
if err != nil {
if cockroachutil.NeedsRetry(err) {
continue
}
return badNodes, err
}
break
}
return badNodes, err
}
// GetOfflineNodesForEmail gets nodes that we want to send an email to. These are non-disqualified, non-exited nodes where
// last_contact_success is between two points: the point where it is considered offline (offlineWindow), and the point where we don't want
// to send more emails (cutoff). It also filters nodes where last_offline_email is too recent (cooldown).
@@ -463,102 +447,64 @@ func (cache *overlaycache) knownReliableInExcludedCountries(ctx context.Context,
return reliableInExcluded, Error.Wrap(rows.Err())
}
func (cache *overlaycache) knownUnreliableOrOffline(ctx context.Context, criteria *overlay.NodeCriteria, nodeIDs storj.NodeIDList) (badNodes storj.NodeIDList, err error) {
defer mon.Task()(&ctx)(&err)
if len(nodeIDs) == 0 {
return nil, Error.New("no ids provided")
}
// get reliable and online nodes
var rows tagsql.Rows
rows, err = cache.db.Query(ctx, cache.db.Rebind(`
SELECT id
FROM nodes
`+cache.db.impl.AsOfSystemInterval(criteria.AsOfSystemInterval)+`
WHERE id = any($1::bytea[])
AND disqualified IS NULL
AND unknown_audit_suspended IS NULL
AND offline_suspended IS NULL
AND exit_finished_at IS NULL
AND last_contact_success > $2
`), pgutil.NodeIDArray(nodeIDs), time.Now().Add(-criteria.OnlineWindow),
)
if err != nil {
return nil, err
}
defer func() { err = errs.Combine(err, rows.Close()) }()
goodNodes := make(map[storj.NodeID]struct{}, len(nodeIDs))
for rows.Next() {
var id storj.NodeID
err = rows.Scan(&id)
if err != nil {
return nil, err
}
goodNodes[id] = struct{}{}
}
for _, id := range nodeIDs {
if _, ok := goodNodes[id]; !ok {
badNodes = append(badNodes, id)
}
}
return badNodes, Error.Wrap(rows.Err())
}
// KnownReliable filters a set of nodes to reliable (online and qualified) nodes.
func (cache *overlaycache) KnownReliable(ctx context.Context, onlineWindow time.Duration, nodeIDs storj.NodeIDList) (nodes []*pb.Node, err error) {
// KnownReliable filters a set of nodes to reliable nodes. List is split into online and offline nodes.
func (cache *overlaycache) KnownReliable(ctx context.Context, nodeIDs storj.NodeIDList, onlineWindow, asOfSystemInterval time.Duration) (online []overlay.SelectedNode, offline []overlay.SelectedNode, err error) {
for {
nodes, err = cache.knownReliable(ctx, onlineWindow, nodeIDs)
online, offline, err = cache.knownReliable(ctx, nodeIDs, onlineWindow, asOfSystemInterval)
if err != nil {
if cockroachutil.NeedsRetry(err) {
continue
}
return nodes, err
return nil, nil, err
}
break
}
return nodes, err
return online, offline, err
}
func (cache *overlaycache) knownReliable(ctx context.Context, onlineWindow time.Duration, nodeIDs storj.NodeIDList) (nodes []*pb.Node, err error) {
func (cache *overlaycache) knownReliable(ctx context.Context, nodeIDs storj.NodeIDList, onlineWindow, asOfSystemInterval time.Duration) (online []overlay.SelectedNode, offline []overlay.SelectedNode, err error) {
defer mon.Task()(&ctx)(&err)
if len(nodeIDs) == 0 {
return nil, Error.New("no ids provided")
return nil, nil, Error.New("no ids provided")
}
// get online nodes
rows, err := cache.db.Query(ctx, cache.db.Rebind(`
SELECT id, last_net, last_ip_port, address, protocol, noise_proto, noise_public_key, debounce_limit, features
FROM nodes
WHERE id = any($1::bytea[])
err = withRows(cache.db.Query(ctx, `
SELECT id, address, last_net, last_ip_port, country_code, last_contact_success > $2 as online
FROM nodes
`+cache.db.impl.AsOfSystemInterval(asOfSystemInterval)+`
WHERE id = any($1::bytea[])
AND disqualified IS NULL
AND unknown_audit_suspended IS NULL
AND offline_suspended IS NULL
AND exit_finished_at IS NULL
AND last_contact_success > $2
`), pgutil.NodeIDArray(nodeIDs), time.Now().Add(-onlineWindow),
)
if err != nil {
return nil, err
}
defer func() { err = errs.Combine(err, rows.Close()) }()
`, pgutil.NodeIDArray(nodeIDs), time.Now().Add(-onlineWindow),
))(func(rows tagsql.Rows) error {
for rows.Next() {
var onlineNode bool
var node overlay.SelectedNode
node.Address = &pb.NodeAddress{}
var lastIPPort sql.NullString
err = rows.Scan(&node.ID, &node.Address.Address, &node.LastNet, &lastIPPort, &node.CountryCode, &onlineNode)
if err != nil {
return err
}
for rows.Next() {
row := &dbx.Node{}
err = rows.Scan(&row.Id, &row.LastNet, &row.LastIpPort, &row.Address, &row.Protocol, &row.NoiseProto, &row.NoisePublicKey, &row.DebounceLimit, &row.Features)
if err != nil {
return nil, err
if lastIPPort.Valid {
node.LastIPPort = lastIPPort.String
}
if onlineNode {
online = append(online, node)
} else {
offline = append(offline, node)
}
}
node, err := convertDBNode(ctx, row)
if err != nil {
return nil, err
}
nodes = append(nodes, &node.Node)
}
return nodes, Error.Wrap(rows.Err())
return nil
})
return online, offline, Error.Wrap(err)
}
// Reliable returns all reliable nodes.


@@ -418,3 +418,139 @@ func TestOverlayCache_SelectAllStorageNodesDownloadUpload(t *testing.T) {
})
}
func TestOverlayCache_KnownReliable(t *testing.T) {
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
cache := db.OverlayCache()
allNodes := []overlay.SelectedNode{
addNode(ctx, t, cache, "online", "127.0.0.1", true, false, false, false, false),
addNode(ctx, t, cache, "offline", "127.0.0.2", false, false, false, false, false),
addNode(ctx, t, cache, "disqalified", "127.0.0.3", false, true, false, false, false),
addNode(ctx, t, cache, "audit-suspended", "127.0.0.4", false, false, true, false, false),
addNode(ctx, t, cache, "offline-suspended", "127.0.0.5", false, false, false, true, false),
addNode(ctx, t, cache, "exited", "127.0.0.6", false, false, false, false, true),
}
ids := func(nodes ...overlay.SelectedNode) storj.NodeIDList {
nodeIds := storj.NodeIDList{}
for _, node := range nodes {
nodeIds = append(nodeIds, node.ID)
}
return nodeIds
}
nodes := func(nodes ...overlay.SelectedNode) []overlay.SelectedNode {
return append([]overlay.SelectedNode{}, nodes...)
}
type testCase struct {
IDs storj.NodeIDList
Online []overlay.SelectedNode
Offline []overlay.SelectedNode
}
shuffledNodeIDs := ids(allNodes...)
rand.Shuffle(len(shuffledNodeIDs), shuffledNodeIDs.Swap)
for _, tc := range []testCase{
{
IDs: ids(allNodes[0], allNodes[1]),
Online: nodes(allNodes[0]),
Offline: nodes(allNodes[1]),
},
{
IDs: ids(allNodes[0]),
Online: nodes(allNodes[0]),
},
{
IDs: ids(allNodes[1]),
Offline: nodes(allNodes[1]),
},
{ // only unreliable
IDs: ids(allNodes[2], allNodes[3], allNodes[4], allNodes[5]),
},
{ // all nodes
IDs: ids(allNodes...),
Online: nodes(allNodes[0]),
Offline: nodes(allNodes[1]),
},
// all nodes but in shuffled order
{
IDs: shuffledNodeIDs,
Online: nodes(allNodes[0]),
Offline: nodes(allNodes[1]),
},
// all nodes + one ID not from DB
{
IDs: append(ids(allNodes...), testrand.NodeID()),
Online: nodes(allNodes[0]),
Offline: nodes(allNodes[1]),
},
} {
online, offline, err := cache.KnownReliable(ctx, tc.IDs, 1*time.Hour, 0)
require.NoError(t, err)
require.ElementsMatch(t, tc.Online, online)
require.ElementsMatch(t, tc.Offline, offline)
}
_, _, err := cache.KnownReliable(ctx, storj.NodeIDList{}, 1*time.Hour, 0)
require.Error(t, err)
})
}
func addNode(ctx context.Context, t *testing.T, cache overlay.DB, address, lastIPPort string, online, disqalified, auditSuspended, offlineSuspended, exited bool) overlay.SelectedNode {
selectedNode := overlay.SelectedNode{
ID: testrand.NodeID(),
Address: &pb.NodeAddress{Address: address},
LastNet: lastIPPort,
LastIPPort: lastIPPort,
CountryCode: location.Poland,
}
checkInInfo := overlay.NodeCheckInInfo{
IsUp: true,
NodeID: selectedNode.ID,
Address: &pb.NodeAddress{Address: selectedNode.Address.Address},
LastIPPort: selectedNode.LastIPPort,
LastNet: selectedNode.LastNet,
CountryCode: selectedNode.CountryCode,
Version: &pb.NodeVersion{Version: "v0.0.0"},
}
timestamp := time.Now().UTC()
if !online {
timestamp = time.Now().Add(-10 * time.Hour)
}
err := cache.UpdateCheckIn(ctx, checkInInfo, timestamp, overlay.NodeSelectionConfig{})
require.NoError(t, err)
if disqalified {
_, err := cache.DisqualifyNode(ctx, selectedNode.ID, time.Now(), overlay.DisqualificationReasonAuditFailure)
require.NoError(t, err)
}
if auditSuspended {
require.NoError(t, cache.TestSuspendNodeUnknownAudit(ctx, selectedNode.ID, time.Now()))
}
if offlineSuspended {
require.NoError(t, cache.TestSuspendNodeOffline(ctx, selectedNode.ID, time.Now()))
}
if exited {
now := time.Now()
_, err = cache.UpdateExitStatus(ctx, &overlay.ExitStatusRequest{
NodeID: selectedNode.ID,
ExitInitiatedAt: now,
ExitLoopCompletedAt: now,
ExitFinishedAt: now,
ExitSuccess: true,
})
require.NoError(t, err)
}
return selectedNode
}