satellite/overlay: use DownloadSelectionCache for getting node IPs
Change-Id: Ib8f4eedb2bf465767050693a1e961b37a294ca06
This commit is contained in:
parent
54e01d37f9
commit
b7a0739219
@ -22,8 +22,6 @@ type State struct {
|
|||||||
stats Stats
|
stats Stats
|
||||||
// netByID returns subnet based on storj.NodeID
|
// netByID returns subnet based on storj.NodeID
|
||||||
netByID map[storj.NodeID]string
|
netByID map[storj.NodeID]string
|
||||||
// ipPortByID returns IP based on storj.NodeID
|
|
||||||
ipPortByID map[storj.NodeID]string
|
|
||||||
// nonDistinct contains selectors for non-distinct selection.
|
// nonDistinct contains selectors for non-distinct selection.
|
||||||
nonDistinct struct {
|
nonDistinct struct {
|
||||||
Reputable SelectByID
|
Reputable SelectByID
|
||||||
@ -59,14 +57,11 @@ func NewState(reputableNodes, newNodes []*Node) *State {
|
|||||||
state := &State{}
|
state := &State{}
|
||||||
|
|
||||||
state.netByID = map[storj.NodeID]string{}
|
state.netByID = map[storj.NodeID]string{}
|
||||||
state.ipPortByID = map[storj.NodeID]string{}
|
|
||||||
for _, node := range reputableNodes {
|
for _, node := range reputableNodes {
|
||||||
state.netByID[node.ID] = node.LastNet
|
state.netByID[node.ID] = node.LastNet
|
||||||
state.ipPortByID[node.ID] = node.LastIPPort
|
|
||||||
}
|
}
|
||||||
for _, node := range newNodes {
|
for _, node := range newNodes {
|
||||||
state.netByID[node.ID] = node.LastNet
|
state.netByID[node.ID] = node.LastNet
|
||||||
state.ipPortByID[node.ID] = node.LastIPPort
|
|
||||||
}
|
}
|
||||||
|
|
||||||
state.nonDistinct.Reputable = SelectByID(reputableNodes)
|
state.nonDistinct.Reputable = SelectByID(reputableNodes)
|
||||||
@ -140,18 +135,6 @@ func (state *State) Select(ctx context.Context, request Request) (_ []*Node, err
|
|||||||
return selected, nil
|
return selected, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// IPs returns node ip:port for nodes that are in state.
|
|
||||||
func (state *State) IPs(ctx context.Context, nodes []storj.NodeID) map[storj.NodeID]string {
|
|
||||||
defer mon.Task()(&ctx)(nil)
|
|
||||||
xs := make(map[storj.NodeID]string, len(nodes))
|
|
||||||
for _, nodeID := range nodes {
|
|
||||||
if ip, exists := state.ipPortByID[nodeID]; exists {
|
|
||||||
xs[nodeID] = ip
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return xs
|
|
||||||
}
|
|
||||||
|
|
||||||
// Stats returns state information.
|
// Stats returns state information.
|
||||||
func (state *State) Stats() Stats {
|
func (state *State) Stats() Stats {
|
||||||
state.mu.RLock()
|
state.mu.RLock()
|
||||||
|
@ -203,27 +203,3 @@ next:
|
|||||||
|
|
||||||
return xs
|
return xs
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestState_IPs(t *testing.T) {
|
|
||||||
ctx := testcontext.New(t)
|
|
||||||
defer ctx.Cleanup()
|
|
||||||
|
|
||||||
reputableNodes := createRandomNodes(2, "1.0.1")
|
|
||||||
newNodes := createRandomNodes(2, "1.0.3")
|
|
||||||
|
|
||||||
state := nodeselection.NewState(reputableNodes, newNodes)
|
|
||||||
|
|
||||||
nodeIPs := state.IPs(ctx, nil)
|
|
||||||
require.Equal(t, map[storj.NodeID]string{}, nodeIPs)
|
|
||||||
|
|
||||||
missing := storj.NodeID{}
|
|
||||||
nodeIPs = state.IPs(ctx, []storj.NodeID{
|
|
||||||
reputableNodes[0].ID,
|
|
||||||
newNodes[1].ID,
|
|
||||||
missing,
|
|
||||||
})
|
|
||||||
require.Equal(t, map[storj.NodeID]string{
|
|
||||||
reputableNodes[0].ID: "1.0.1.0:8080",
|
|
||||||
newNodes[1].ID: "1.0.3.1:8080",
|
|
||||||
}, nodeIPs)
|
|
||||||
}
|
|
||||||
|
@ -324,7 +324,7 @@ func (service *Service) GetOnlineNodesForGetDelete(ctx context.Context, nodeIDs
|
|||||||
// GetNodeIPs returns a map of node ip:port for the supplied nodeIDs.
|
// GetNodeIPs returns a map of node ip:port for the supplied nodeIDs.
|
||||||
func (service *Service) GetNodeIPs(ctx context.Context, nodeIDs []storj.NodeID) (_ map[storj.NodeID]string, err error) {
|
func (service *Service) GetNodeIPs(ctx context.Context, nodeIDs []storj.NodeID) (_ map[storj.NodeID]string, err error) {
|
||||||
defer mon.Task()(&ctx)(&err)
|
defer mon.Task()(&ctx)(&err)
|
||||||
return service.UploadSelectionCache.GetNodeIPs(ctx, nodeIDs)
|
return service.DownloadSelectionCache.GetNodeIPs(ctx, nodeIDs)
|
||||||
}
|
}
|
||||||
|
|
||||||
// IsOnline checks if a node is 'online' based on the collected statistics.
|
// IsOnline checks if a node is 'online' based on the collected statistics.
|
||||||
|
@ -118,26 +118,6 @@ func (cache *UploadSelectionCache) GetNodes(ctx context.Context, req FindStorage
|
|||||||
return convNodesToSelectedNodes(selected), err
|
return convNodesToSelectedNodes(selected), err
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetNodeIPs gets the last node ip:port from the cache, refreshing when needed.
|
|
||||||
func (cache *UploadSelectionCache) GetNodeIPs(ctx context.Context, nodes []storj.NodeID) (_ map[storj.NodeID]string, err error) {
|
|
||||||
defer mon.Task()(&ctx)(&err)
|
|
||||||
|
|
||||||
cache.mu.RLock()
|
|
||||||
lastRefresh := cache.lastRefresh
|
|
||||||
state := cache.state
|
|
||||||
cache.mu.RUnlock()
|
|
||||||
|
|
||||||
// if the cache is stale, then refresh it before we get nodes
|
|
||||||
if state == nil || time.Since(lastRefresh) > cache.staleness {
|
|
||||||
state, err = cache.refresh(ctx)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return state.IPs(ctx, nodes), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Size returns how many reputable nodes and new nodes are in the cache.
|
// Size returns how many reputable nodes and new nodes are in the cache.
|
||||||
func (cache *UploadSelectionCache) Size() (reputableNodeCount int, newNodeCount int) {
|
func (cache *UploadSelectionCache) Size() (reputableNodeCount int, newNodeCount int) {
|
||||||
cache.mu.RLock()
|
cache.mu.RLock()
|
||||||
|
Loading…
Reference in New Issue
Block a user