satellite/overlay: filter reliable nodes from a list

Adds the KnownReliable method to Overlay Service that filters all nodes
from the given list to be only reliable nodes (online and qualified).
The method return []*pb.Node of reliable nodes. The pb.Node values are
ready for dialing.

The first use case is when deleting an object to efficiently dial all
reliable nodes holding a piece of that object and send them a delete
request.

Change-Id: I13e0a8666f3807c5c31ef1a1087476018a5d3acb
This commit is contained in:
Kaloyan Raev 2019-12-16 15:45:13 +02:00
parent efa08d4081
commit 5ee1a00857
3 changed files with 108 additions and 0 deletions

View File

@ -47,6 +47,8 @@ type DB interface {
KnownOffline(context.Context, *NodeCriteria, storj.NodeIDList) (storj.NodeIDList, error) KnownOffline(context.Context, *NodeCriteria, storj.NodeIDList) (storj.NodeIDList, error)
// KnownUnreliableOrOffline filters a set of nodes to unhealth or offlines node, independent of new // KnownUnreliableOrOffline filters a set of nodes to unhealth or offlines node, independent of new
KnownUnreliableOrOffline(context.Context, *NodeCriteria, storj.NodeIDList) (storj.NodeIDList, error) KnownUnreliableOrOffline(context.Context, *NodeCriteria, storj.NodeIDList) (storj.NodeIDList, error)
// KnownReliable filters a set of nodes to reliable (online and qualified) nodes.
KnownReliable(ctx context.Context, onlineWindow time.Duration, nodeIDs storj.NodeIDList) ([]*pb.Node, error)
// Reliable returns all nodes that are reliable // Reliable returns all nodes that are reliable
Reliable(context.Context, *NodeCriteria) (storj.NodeIDList, error) Reliable(context.Context, *NodeCriteria) (storj.NodeIDList, error)
// Paginate will page through the database nodes // Paginate will page through the database nodes
@ -339,6 +341,12 @@ func (service *Service) KnownUnreliableOrOffline(ctx context.Context, nodeIds st
return service.db.KnownUnreliableOrOffline(ctx, criteria, nodeIds) return service.db.KnownUnreliableOrOffline(ctx, criteria, nodeIds)
} }
// KnownReliable filters a set of nodes to reliable (online and qualified) nodes.
func (service *Service) KnownReliable(ctx context.Context, nodeIDs storj.NodeIDList) (nodes []*pb.Node, err error) {
defer mon.Task()(&ctx)(&err)
return service.db.KnownReliable(ctx, service.config.Node.OnlineWindow, nodeIDs)
}
// Reliable filters a set of nodes that are reliable, independent of new. // Reliable filters a set of nodes that are reliable, independent of new.
func (service *Service) Reliable(ctx context.Context) (nodes storj.NodeIDList, err error) { func (service *Service) Reliable(ctx context.Context) (nodes storj.NodeIDList, err error) {
defer mon.Task()(&ctx)(&err) defer mon.Task()(&ctx)(&err)

View File

@ -5,11 +5,13 @@ package overlay_test
import ( import (
"context" "context"
"sort"
"testing" "testing"
"time" "time"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zaptest" "go.uber.org/zap/zaptest"
"storj.io/storj/pkg/pb" "storj.io/storj/pkg/pb"
@ -312,6 +314,68 @@ func TestNodeInfo(t *testing.T) {
}) })
} }
func TestKnownReliable(t *testing.T) {
onlineWindow := 500 * time.Millisecond
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
config.Overlay.Node.OnlineWindow = onlineWindow
},
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
service := satellite.Overlay.Service
// Disqualify storage node #0
stats, err := service.UpdateStats(ctx, &overlay.UpdateRequest{
NodeID: planet.StorageNodes[0].ID(),
AuditSuccess: false,
})
require.NoError(t, err)
require.NotNil(t, stats.Disqualified)
// Stop storage node #1
err = planet.StopPeer(planet.StorageNodes[1])
require.NoError(t, err)
_, err = service.UpdateUptime(ctx, planet.StorageNodes[1].ID(), false)
require.NoError(t, err)
// Sleep for the duration of the online window and check that storage node #1 is offline
time.Sleep(onlineWindow)
node, err := service.Get(ctx, planet.StorageNodes[1].ID())
require.NoError(t, err)
require.False(t, service.IsOnline(node))
// Force storage node #2 and #3 ping the satellite, so they can maintain their online status
planet.StorageNodes[2].Contact.Chore.TriggerWait(ctx)
planet.StorageNodes[3].Contact.Chore.TriggerWait(ctx)
// Check that only storage nodes #2 and #3 are reliable
result, err := service.KnownReliable(ctx, []storj.NodeID{
planet.StorageNodes[0].ID(),
planet.StorageNodes[1].ID(),
planet.StorageNodes[2].ID(),
planet.StorageNodes[3].ID(),
})
require.NoError(t, err)
require.Len(t, result, 2)
// Sort the storage nodes for predictable checks
expectedReliable := []pb.Node{planet.StorageNodes[2].Local().Node, planet.StorageNodes[3].Local().Node}
sort.Slice(expectedReliable, func(i, j int) bool { return expectedReliable[i].Id.Less(expectedReliable[j].Id) })
sort.Slice(result, func(i, j int) bool { return result[i].Id.Less(result[j].Id) })
// Assert the reliable nodes are the expected ones
for i, node := range result {
assert.Equal(t, expectedReliable[i].Id, node.Id)
assert.Equal(t, expectedReliable[i].Address, node.Address)
assert.NotNil(t, node.LastIp)
}
})
}
func TestUpdateCheckIn(t *testing.T) { func TestUpdateCheckIn(t *testing.T) {
satellitedbtest.Run(t, func(t *testing.T, db satellite.DB) { satellitedbtest.Run(t, func(t *testing.T, db satellite.DB) {
ctx := testcontext.New(t) ctx := testcontext.New(t)

View File

@ -384,6 +384,42 @@ func (cache *overlaycache) KnownUnreliableOrOffline(ctx context.Context, criteri
return badNodes, nil return badNodes, nil
} }
// KnownReliable filters a set of nodes to reliable (online and qualified) nodes.
func (cache *overlaycache) KnownReliable(ctx context.Context, onlineWindow time.Duration, nodeIDs storj.NodeIDList) (nodes []*pb.Node, err error) {
defer mon.Task()(&ctx)(&err)
if len(nodeIDs) == 0 {
return nil, Error.New("no ids provided")
}
// get online nodes
rows, err := cache.db.Query(cache.db.Rebind(`
SELECT id, last_net, address, protocol FROM nodes
WHERE id = any($1::bytea[])
AND disqualified IS NULL
AND last_contact_success > $2
`), postgresNodeIDList(nodeIDs), time.Now().Add(-onlineWindow),
)
if err != nil {
return nil, err
}
defer func() { err = errs.Combine(err, rows.Close()) }()
for rows.Next() {
row := &dbx.Id_LastNet_Address_Protocol_Row{}
err = rows.Scan(&row.Id, &row.LastNet, &row.Address, &row.Protocol)
if err != nil {
return nil, err
}
node, err := convertDBNodeToPBNode(ctx, row)
if err != nil {
return nil, err
}
nodes = append(nodes, node)
}
return nodes, nil
}
// Reliable returns all reliable nodes. // Reliable returns all reliable nodes.
func (cache *overlaycache) Reliable(ctx context.Context, criteria *overlay.NodeCriteria) (nodes storj.NodeIDList, err error) { func (cache *overlaycache) Reliable(ctx context.Context, criteria *overlay.NodeCriteria) (nodes storj.NodeIDList, err error) {
// get reliable and online nodes // get reliable and online nodes