satellite/overlay: use UploadSelectionCache for GetNodesNetworkInOrder
The query for GetNodesNetworkInOrder is causing far too much load on the database. Since it is not critical that the repair checker have perfectly up-to-date node network information, we can use a cache instead. Change-Id: I07ad45bfdeb46529da093941a06c2da8a00ce878
This commit is contained in:
parent
e9fc4e9d0f
commit
75d10fe4fa
@ -324,6 +324,9 @@ func (system *Satellite) Run(ctx context.Context) (err error) {
|
||||
group.Go(func() error {
|
||||
return errs2.IgnoreCanceled(system.GCBF.Run(ctx))
|
||||
})
|
||||
group.Go(func() error {
|
||||
return errs2.IgnoreCanceled(system.RangedLoop.Run(ctx))
|
||||
})
|
||||
return group.Wait()
|
||||
}
|
||||
|
||||
|
@ -129,6 +129,22 @@ func (state *State) Select(ctx context.Context, request Request) (_ []*Node, err
|
||||
return selected, nil
|
||||
}
|
||||
|
||||
// GetNodesNetwork returns the cached network for each given node ID.
|
||||
func (state *State) GetNodesNetwork(ctx context.Context, nodeIDs []storj.NodeID) (nets []string) {
|
||||
defer mon.Task()(&ctx)(nil)
|
||||
|
||||
state.mu.RLock()
|
||||
defer state.mu.RUnlock()
|
||||
|
||||
nets = make([]string, len(nodeIDs))
|
||||
for i, nodeID := range nodeIDs {
|
||||
if net, ok := state.netByID[nodeID]; ok {
|
||||
nets[i] = net
|
||||
}
|
||||
}
|
||||
return nets
|
||||
}
|
||||
|
||||
// Stats returns state information.
|
||||
func (state *State) Stats() Stats {
|
||||
state.mu.RLock()
|
||||
|
@ -125,7 +125,7 @@ type DB interface {
|
||||
TestVetNode(ctx context.Context, nodeID storj.NodeID) (vettedTime *time.Time, err error)
|
||||
// TestUnvetNode directly sets a node's vetted_at timestamp to null to make testing easier
|
||||
TestUnvetNode(ctx context.Context, nodeID storj.NodeID) (err error)
|
||||
// TestVetNode directly sets a node's offline_suspended timestamp to make testing easier
|
||||
// TestSuspendNodeOffline directly sets a node's offline_suspended timestamp to make testing easier
|
||||
TestSuspendNodeOffline(ctx context.Context, nodeID storj.NodeID, suspendedAt time.Time) (err error)
|
||||
// TestNodeCountryCode sets node country code.
|
||||
TestNodeCountryCode(ctx context.Context, nodeID storj.NodeID, countryCode string) (err error)
|
||||
@ -430,7 +430,7 @@ func (service *Service) IsOnline(node *NodeDossier) bool {
|
||||
// requested node is not in the database, an empty string will be returned corresponding
|
||||
// to that node's last_net.
|
||||
func (service *Service) GetNodesNetworkInOrder(ctx context.Context, nodeIDs []storj.NodeID) (lastNets []string, err error) {
|
||||
return service.db.GetNodesNetworkInOrder(ctx, nodeIDs)
|
||||
return service.UploadSelectionCache.GetNodesNetwork(ctx, nodeIDs)
|
||||
}
|
||||
|
||||
// FindStorageNodesForGracefulExit searches the overlay network for nodes that meet the provided requirements for graceful-exit requests.
|
||||
@ -932,6 +932,11 @@ func (service *Service) TestNodeCountryCode(ctx context.Context, nodeID storj.No
|
||||
return nil
|
||||
}
|
||||
|
||||
// TestRefreshUploadSelectionCache refreshes the upload selection cache.
|
||||
func (service *Service) TestRefreshUploadSelectionCache(ctx context.Context) (err error) {
|
||||
return service.UploadSelectionCache.Refresh(ctx)
|
||||
}
|
||||
|
||||
func (service *Service) insertReputationNodeEvents(ctx context.Context, email string, id storj.NodeID, repEvents []nodeevents.Type) {
|
||||
defer mon.Task()(&ctx)(nil)
|
||||
|
||||
|
@ -10,6 +10,7 @@ import (
|
||||
"go.uber.org/zap"
|
||||
|
||||
"storj.io/common/pb"
|
||||
"storj.io/common/storj"
|
||||
"storj.io/common/sync2"
|
||||
"storj.io/storj/satellite/nodeselection/uploadselection"
|
||||
)
|
||||
@ -116,6 +117,15 @@ func (cache *UploadSelectionCache) Size(ctx context.Context) (reputableNodeCount
|
||||
return stats.Reputable, stats.New, nil
|
||||
}
|
||||
|
||||
// GetNodesNetwork returns the cached network for each given node ID.
|
||||
func (cache *UploadSelectionCache) GetNodesNetwork(ctx context.Context, nodeIDs []storj.NodeID) (nets []string, err error) {
|
||||
state, err := cache.cache.Get(ctx, time.Now())
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
return state.GetNodesNetwork(ctx, nodeIDs), nil
|
||||
}
|
||||
|
||||
func convNodesToSelectedNodes(nodes []*uploadselection.Node) (xs []*SelectedNode) {
|
||||
for _, n := range nodes {
|
||||
xs = append(xs, &SelectedNode{
|
||||
|
@ -302,6 +302,7 @@ func TestCleanRepairQueueObserver(t *testing.T) {
|
||||
}
|
||||
|
||||
require.NoError(t, observer.RefreshReliabilityCache(ctx))
|
||||
require.NoError(t, planet.Satellites[0].RangedLoop.Overlay.Service.TestRefreshUploadSelectionCache(ctx))
|
||||
|
||||
// check that repair queue is empty to avoid false positive
|
||||
count, err := repairQueue.Count(ctx)
|
||||
@ -323,6 +324,7 @@ func TestCleanRepairQueueObserver(t *testing.T) {
|
||||
}
|
||||
|
||||
require.NoError(t, observer.RefreshReliabilityCache(ctx))
|
||||
require.NoError(t, planet.Satellites[0].RangedLoop.Overlay.Service.TestRefreshUploadSelectionCache(ctx))
|
||||
|
||||
// The checker will not insert/update the now healthy segments causing
|
||||
// them to be removed from the queue at the end of the checker iteration
|
||||
|
@ -3279,6 +3279,8 @@ func TestRepairClumpedPieces(t *testing.T) {
|
||||
}
|
||||
err = satellite.DB.OverlayCache().UpdateCheckIn(ctx, checkInInfo, time.Now().UTC(), overlay.NodeSelectionConfig{})
|
||||
require.NoError(t, err)
|
||||
err = satellite.RangedLoop.Overlay.Service.TestRefreshUploadSelectionCache(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
// running repair checker again should put the segment into the repair queue
|
||||
_, err = satellite.RangedLoop.RangedLoop.Service.RunOnce(ctx)
|
||||
|
Loading…
Reference in New Issue
Block a user