From 75d10fe4fa01fae3e904da1f0a4c8b3246def0cf Mon Sep 17 00:00:00 2001 From: paul cannon Date: Tue, 16 May 2023 09:08:52 -0500 Subject: [PATCH] satellite/overlay: use UploadSelectionCache for GetNodesNetworkInOrder The query for GetNodesNetworkInOrder is causing far too much load on the database. Since it is not critical that the repair checker have perfectly up-to-date node network information, we can use a cache instead. Change-Id: I07ad45bfdeb46529da093941a06c2da8a00ce878 --- private/testplanet/satellite.go | 3 +++ satellite/nodeselection/uploadselection/state.go | 16 ++++++++++++++++ satellite/overlay/service.go | 9 +++++++-- satellite/overlay/uploadselection.go | 10 ++++++++++ satellite/repair/checker/observer_test.go | 2 ++ satellite/repair/repair_test.go | 2 ++ 6 files changed, 40 insertions(+), 2 deletions(-) diff --git a/private/testplanet/satellite.go b/private/testplanet/satellite.go index 5a4511d56..f3a7898e6 100644 --- a/private/testplanet/satellite.go +++ b/private/testplanet/satellite.go @@ -324,6 +324,9 @@ func (system *Satellite) Run(ctx context.Context) (err error) { group.Go(func() error { return errs2.IgnoreCanceled(system.GCBF.Run(ctx)) }) + group.Go(func() error { + return errs2.IgnoreCanceled(system.RangedLoop.Run(ctx)) + }) return group.Wait() } diff --git a/satellite/nodeselection/uploadselection/state.go b/satellite/nodeselection/uploadselection/state.go index ad8857743..30da0efb7 100644 --- a/satellite/nodeselection/uploadselection/state.go +++ b/satellite/nodeselection/uploadselection/state.go @@ -129,6 +129,22 @@ func (state *State) Select(ctx context.Context, request Request) (_ []*Node, err return selected, nil } +// GetNodesNetwork returns the cached network for each given node ID. +func (state *State) GetNodesNetwork(ctx context.Context, nodeIDs []storj.NodeID) (nets []string) { + defer mon.Task()(&ctx)(nil) + + state.mu.RLock() + defer state.mu.RUnlock() + + nets = make([]string, len(nodeIDs)) + for i, nodeID := range nodeIDs { + if net, ok := state.netByID[nodeID]; ok { + nets[i] = net + } + } + return nets +} + // Stats returns state information. func (state *State) Stats() Stats { state.mu.RLock() diff --git a/satellite/overlay/service.go b/satellite/overlay/service.go index a7c8674ae..695a81c37 100644 --- a/satellite/overlay/service.go +++ b/satellite/overlay/service.go @@ -125,7 +125,7 @@ type DB interface { TestVetNode(ctx context.Context, nodeID storj.NodeID) (vettedTime *time.Time, err error) // TestUnvetNode directly sets a node's vetted_at timestamp to null to make testing easier TestUnvetNode(ctx context.Context, nodeID storj.NodeID) (err error) - // TestVetNode directly sets a node's offline_suspended timestamp to make testing easier + // TestSuspendNodeOffline directly sets a node's offline_suspended timestamp to make testing easier TestSuspendNodeOffline(ctx context.Context, nodeID storj.NodeID, suspendedAt time.Time) (err error) // TestNodeCountryCode sets node country code. TestNodeCountryCode(ctx context.Context, nodeID storj.NodeID, countryCode string) (err error) @@ -430,7 +430,7 @@ func (service *Service) IsOnline(node *NodeDossier) bool { // requested node is not in the database, an empty string will be returned corresponding // to that node's last_net. func (service *Service) GetNodesNetworkInOrder(ctx context.Context, nodeIDs []storj.NodeID) (lastNets []string, err error) { - return service.db.GetNodesNetworkInOrder(ctx, nodeIDs) + return service.UploadSelectionCache.GetNodesNetwork(ctx, nodeIDs) } // FindStorageNodesForGracefulExit searches the overlay network for nodes that meet the provided requirements for graceful-exit requests. @@ -932,6 +932,11 @@ func (service *Service) TestNodeCountryCode(ctx context.Context, nodeID storj.No return nil } +// TestRefreshUploadSelectionCache refreshes the upload selection cache. +func (service *Service) TestRefreshUploadSelectionCache(ctx context.Context) (err error) { + return service.UploadSelectionCache.Refresh(ctx) +} + func (service *Service) insertReputationNodeEvents(ctx context.Context, email string, id storj.NodeID, repEvents []nodeevents.Type) { defer mon.Task()(&ctx)(nil) diff --git a/satellite/overlay/uploadselection.go b/satellite/overlay/uploadselection.go index dec3ebb61..306696059 100644 --- a/satellite/overlay/uploadselection.go +++ b/satellite/overlay/uploadselection.go @@ -10,6 +10,7 @@ import ( "go.uber.org/zap" "storj.io/common/pb" + "storj.io/common/storj" "storj.io/common/sync2" "storj.io/storj/satellite/nodeselection/uploadselection" ) @@ -116,6 +117,15 @@ func (cache *UploadSelectionCache) Size(ctx context.Context) (reputableNodeCount return stats.Reputable, stats.New, nil } +// GetNodesNetwork returns the cached network for each given node ID. +func (cache *UploadSelectionCache) GetNodesNetwork(ctx context.Context, nodeIDs []storj.NodeID) (nets []string, err error) { + state, err := cache.cache.Get(ctx, time.Now()) + if err != nil { + return nil, Error.Wrap(err) + } + return state.GetNodesNetwork(ctx, nodeIDs), nil +} + func convNodesToSelectedNodes(nodes []*uploadselection.Node) (xs []*SelectedNode) { for _, n := range nodes { xs = append(xs, &SelectedNode{ diff --git a/satellite/repair/checker/observer_test.go b/satellite/repair/checker/observer_test.go index f527c3d12..d5117a47c 100644 --- a/satellite/repair/checker/observer_test.go +++ b/satellite/repair/checker/observer_test.go @@ -302,6 +302,7 @@ func TestCleanRepairQueueObserver(t *testing.T) { } require.NoError(t, observer.RefreshReliabilityCache(ctx)) + require.NoError(t, planet.Satellites[0].RangedLoop.Overlay.Service.TestRefreshUploadSelectionCache(ctx)) // check that repair queue is empty to avoid false positive count, err := repairQueue.Count(ctx) @@ -323,6 +324,7 @@ func TestCleanRepairQueueObserver(t *testing.T) { } require.NoError(t, observer.RefreshReliabilityCache(ctx)) + require.NoError(t, planet.Satellites[0].RangedLoop.Overlay.Service.TestRefreshUploadSelectionCache(ctx)) // The checker will not insert/update the now healthy segments causing // them to be removed from the queue at the end of the checker iteration diff --git a/satellite/repair/repair_test.go b/satellite/repair/repair_test.go index 5f3aeacc5..60ce1853d 100644 --- a/satellite/repair/repair_test.go +++ b/satellite/repair/repair_test.go @@ -3279,6 +3279,8 @@ func TestRepairClumpedPieces(t *testing.T) { } err = satellite.DB.OverlayCache().UpdateCheckIn(ctx, checkInInfo, time.Now().UTC(), overlay.NodeSelectionConfig{}) require.NoError(t, err) + err = satellite.RangedLoop.Overlay.Service.TestRefreshUploadSelectionCache(ctx) + require.NoError(t, err) // running repair checker again should put the segment into the repair queue _, err = satellite.RangedLoop.RangedLoop.Service.RunOnce(ctx)