From 5ee1a00857f6b48952727735f751ba2a3a55cdbc Mon Sep 17 00:00:00 2001 From: Kaloyan Raev Date: Mon, 16 Dec 2019 15:45:13 +0200 Subject: [PATCH] satellite/overlay: filter reliable nodes from a list Adds the KnownReliable method to Overlay Service that filters all nodes from the given list to be only reliable nodes (online and qualified). The method return []*pb.Node of reliable nodes. The pb.Node values are ready for dialing. The first use case is when deleting an object to efficiently dial all reliable nodes holding a piece of that object and send them a delete request. Change-Id: I13e0a8666f3807c5c31ef1a1087476018a5d3acb --- satellite/overlay/service.go | 8 ++++ satellite/overlay/service_test.go | 64 +++++++++++++++++++++++++++ satellite/satellitedb/overlaycache.go | 36 +++++++++++++++ 3 files changed, 108 insertions(+) diff --git a/satellite/overlay/service.go b/satellite/overlay/service.go index b9c2624cd..e876eb9a5 100644 --- a/satellite/overlay/service.go +++ b/satellite/overlay/service.go @@ -47,6 +47,8 @@ type DB interface { KnownOffline(context.Context, *NodeCriteria, storj.NodeIDList) (storj.NodeIDList, error) // KnownUnreliableOrOffline filters a set of nodes to unhealth or offlines node, independent of new KnownUnreliableOrOffline(context.Context, *NodeCriteria, storj.NodeIDList) (storj.NodeIDList, error) + // KnownReliable filters a set of nodes to reliable (online and qualified) nodes. + KnownReliable(ctx context.Context, onlineWindow time.Duration, nodeIDs storj.NodeIDList) ([]*pb.Node, error) // Reliable returns all nodes that are reliable Reliable(context.Context, *NodeCriteria) (storj.NodeIDList, error) // Paginate will page through the database nodes @@ -339,6 +341,12 @@ func (service *Service) KnownUnreliableOrOffline(ctx context.Context, nodeIds st return service.db.KnownUnreliableOrOffline(ctx, criteria, nodeIds) } +// KnownReliable filters a set of nodes to reliable (online and qualified) nodes. +func (service *Service) KnownReliable(ctx context.Context, nodeIDs storj.NodeIDList) (nodes []*pb.Node, err error) { + defer mon.Task()(&ctx)(&err) + return service.db.KnownReliable(ctx, service.config.Node.OnlineWindow, nodeIDs) +} + // Reliable filters a set of nodes that are reliable, independent of new. func (service *Service) Reliable(ctx context.Context) (nodes storj.NodeIDList, err error) { defer mon.Task()(&ctx)(&err) diff --git a/satellite/overlay/service_test.go b/satellite/overlay/service_test.go index 8cad5c702..2f4488708 100644 --- a/satellite/overlay/service_test.go +++ b/satellite/overlay/service_test.go @@ -5,11 +5,13 @@ package overlay_test import ( "context" + "sort" "testing" "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.uber.org/zap" "go.uber.org/zap/zaptest" "storj.io/storj/pkg/pb" @@ -312,6 +314,68 @@ func TestNodeInfo(t *testing.T) { }) } +func TestKnownReliable(t *testing.T) { + onlineWindow := 500 * time.Millisecond + + testplanet.Run(t, testplanet.Config{ + SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1, + Reconfigure: testplanet.Reconfigure{ + Satellite: func(log *zap.Logger, index int, config *satellite.Config) { + config.Overlay.Node.OnlineWindow = onlineWindow + }, + }, + }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { + satellite := planet.Satellites[0] + service := satellite.Overlay.Service + + // Disqualify storage node #0 + stats, err := service.UpdateStats(ctx, &overlay.UpdateRequest{ + NodeID: planet.StorageNodes[0].ID(), + AuditSuccess: false, + }) + require.NoError(t, err) + require.NotNil(t, stats.Disqualified) + + // Stop storage node #1 + err = planet.StopPeer(planet.StorageNodes[1]) + require.NoError(t, err) + _, err = service.UpdateUptime(ctx, planet.StorageNodes[1].ID(), false) + require.NoError(t, err) + + // Sleep for the duration of the online window and check that storage node #1 is offline + time.Sleep(onlineWindow) + node, err := service.Get(ctx, planet.StorageNodes[1].ID()) + require.NoError(t, err) + require.False(t, service.IsOnline(node)) + + // Force storage node #2 and #3 ping the satellite, so they can maintain their online status + planet.StorageNodes[2].Contact.Chore.TriggerWait(ctx) + planet.StorageNodes[3].Contact.Chore.TriggerWait(ctx) + + // Check that only storage nodes #2 and #3 are reliable + result, err := service.KnownReliable(ctx, []storj.NodeID{ + planet.StorageNodes[0].ID(), + planet.StorageNodes[1].ID(), + planet.StorageNodes[2].ID(), + planet.StorageNodes[3].ID(), + }) + require.NoError(t, err) + require.Len(t, result, 2) + + // Sort the storage nodes for predictable checks + expectedReliable := []pb.Node{planet.StorageNodes[2].Local().Node, planet.StorageNodes[3].Local().Node} + sort.Slice(expectedReliable, func(i, j int) bool { return expectedReliable[i].Id.Less(expectedReliable[j].Id) }) + sort.Slice(result, func(i, j int) bool { return result[i].Id.Less(result[j].Id) }) + + // Assert the reliable nodes are the expected ones + for i, node := range result { + assert.Equal(t, expectedReliable[i].Id, node.Id) + assert.Equal(t, expectedReliable[i].Address, node.Address) + assert.NotNil(t, node.LastIp) + } + }) +} + func TestUpdateCheckIn(t *testing.T) { satellitedbtest.Run(t, func(t *testing.T, db satellite.DB) { ctx := testcontext.New(t) diff --git a/satellite/satellitedb/overlaycache.go b/satellite/satellitedb/overlaycache.go index 165a93419..4f0885594 100644 --- a/satellite/satellitedb/overlaycache.go +++ b/satellite/satellitedb/overlaycache.go @@ -384,6 +384,42 @@ func (cache *overlaycache) KnownUnreliableOrOffline(ctx context.Context, criteri return badNodes, nil } +// KnownReliable filters a set of nodes to reliable (online and qualified) nodes. +func (cache *overlaycache) KnownReliable(ctx context.Context, onlineWindow time.Duration, nodeIDs storj.NodeIDList) (nodes []*pb.Node, err error) { + defer mon.Task()(&ctx)(&err) + + if len(nodeIDs) == 0 { + return nil, Error.New("no ids provided") + } + + // get online nodes + rows, err := cache.db.Query(cache.db.Rebind(` + SELECT id, last_net, address, protocol FROM nodes + WHERE id = any($1::bytea[]) + AND disqualified IS NULL + AND last_contact_success > $2 + `), postgresNodeIDList(nodeIDs), time.Now().Add(-onlineWindow), + ) + if err != nil { + return nil, err + } + defer func() { err = errs.Combine(err, rows.Close()) }() + + for rows.Next() { + row := &dbx.Id_LastNet_Address_Protocol_Row{} + err = rows.Scan(&row.Id, &row.LastNet, &row.Address, &row.Protocol) + if err != nil { + return nil, err + } + node, err := convertDBNodeToPBNode(ctx, row) + if err != nil { + return nil, err + } + nodes = append(nodes, node) + } + return nodes, nil +} + // Reliable returns all reliable nodes. func (cache *overlaycache) Reliable(ctx context.Context, criteria *overlay.NodeCriteria) (nodes storj.NodeIDList, err error) { // get reliable and online nodes