satellite/{metainfo,overlay}: improvements to GetObjectIPs

* Deduplicate NodeID list prior to fetching IPs.
* Use NodeSelectionCache for fetching reliable IPs.
* Return number of segements, reliable pieces and all pieces.

Change-Id: I13e679caab275488b4037624b840a4068dad9589
This commit is contained in:
Egon Elbre 2021-01-13 15:59:05 +02:00
parent 889d2eaaea
commit 85fb964afe
5 changed files with 92 additions and 9 deletions

View File

@ -1086,11 +1086,11 @@ func (endpoint *Endpoint) GetObjectIPs(ctx context.Context, req *pb.ObjectGetIPs
return nil, err
}
var nodeIDs []storj.NodeID
pieceCountByNodeID := map[storj.NodeID]int64{}
addPointerToNodeIDs := func(pointer *pb.Pointer) {
if pointer.Remote != nil {
for _, piece := range pointer.Remote.RemotePieces {
nodeIDs = append(nodeIDs, piece.NodeId)
pieceCountByNodeID[piece.NodeId]++
}
}
}
@ -1129,20 +1129,36 @@ func (endpoint *Endpoint) GetObjectIPs(ctx context.Context, req *pb.ObjectGetIPs
addPointerToNodeIDs(pointer)
}
nodes, err := endpoint.overlay.GetOnlineNodesForGetDelete(ctx, nodeIDs)
nodeIDs := make([]storj.NodeID, 0, len(pieceCountByNodeID))
for nodeID := range pieceCountByNodeID {
nodeIDs = append(nodeIDs, nodeID)
}
nodeIPMap, err := endpoint.overlay.GetNodeIPs(ctx, nodeIDs)
if err != nil {
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
resp = &pb.ObjectGetIPsResponse{}
for _, node := range nodes {
address := node.Address.GetAddress()
if address != "" {
resp.Ips = append(resp.Ips, []byte(address))
nodeIPs := make([][]byte, 0, len(nodeIPMap))
pieceCount := int64(0)
reliablePieceCount := int64(0)
for nodeID, count := range pieceCountByNodeID {
pieceCount += count
ip, reliable := nodeIPMap[nodeID]
if !reliable {
continue
}
nodeIPs = append(nodeIPs, []byte(ip))
reliablePieceCount += count
}
return resp, nil
return &pb.ObjectGetIPsResponse{
Ips: nodeIPs,
SegmentCount: streamMeta.NumberOfSegments,
ReliablePieceCount: reliablePieceCount,
PieceCount: pieceCount,
}, nil
}
// BeginSegment begins segment uploading.

View File

@ -22,6 +22,8 @@ type State struct {
stats Stats
// netByID returns subnet based on storj.NodeID
netByID map[storj.NodeID]string
// ipPortByID returns IP based on storj.NodeID
ipPortByID map[storj.NodeID]string
// nonDistinct contains selectors for non-distinct selection.
nonDistinct struct {
Reputable SelectByID
@ -57,11 +59,14 @@ func NewState(reputableNodes, newNodes []*Node) *State {
state := &State{}
state.netByID = map[storj.NodeID]string{}
state.ipPortByID = map[storj.NodeID]string{}
for _, node := range reputableNodes {
state.netByID[node.ID] = node.LastNet
state.ipPortByID[node.ID] = node.LastIPPort
}
for _, node := range newNodes {
state.netByID[node.ID] = node.LastNet
state.ipPortByID[node.ID] = node.LastIPPort
}
state.nonDistinct.Reputable = SelectByID(reputableNodes)
@ -135,6 +140,18 @@ func (state *State) Select(ctx context.Context, request Request) (_ []*Node, err
return selected, nil
}
// IPs returns node ip:port for nodes that are in state.
func (state *State) IPs(ctx context.Context, nodes []storj.NodeID) map[storj.NodeID]string {
defer mon.Task()(&ctx)(nil)
xs := make(map[storj.NodeID]string, len(nodes))
for _, nodeID := range nodes {
if ip, exists := state.ipPortByID[nodeID]; exists {
xs[nodeID] = ip
}
}
return xs
}
// Stats returns state information.
func (state *State) Stats() Stats {
state.mu.RLock()

View File

@ -203,3 +203,27 @@ next:
return xs
}
func TestState_IPs(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
reputableNodes := createRandomNodes(2, "1.0.1")
newNodes := createRandomNodes(2, "1.0.3")
state := nodeselection.NewState(reputableNodes, newNodes)
nodeIPs := state.IPs(ctx, nil)
require.Equal(t, map[storj.NodeID]string{}, nodeIPs)
missing := storj.NodeID{}
nodeIPs = state.IPs(ctx, []storj.NodeID{
reputableNodes[0].ID,
newNodes[1].ID,
missing,
})
require.Equal(t, map[storj.NodeID]string{
reputableNodes[0].ID: "1.0.1.0:8080",
newNodes[1].ID: "1.0.3.1:8080",
}, nodeIPs)
}

View File

@ -118,6 +118,26 @@ func (cache *NodeSelectionCache) GetNodes(ctx context.Context, req FindStorageNo
return convNodesToSelectedNodes(selected), err
}
// GetNodeIPs gets the last node ip:port from the cache, refreshing when needed.
func (cache *NodeSelectionCache) GetNodeIPs(ctx context.Context, nodes []storj.NodeID) (_ map[storj.NodeID]string, err error) {
defer mon.Task()(&ctx)(&err)
cache.mu.RLock()
lastRefresh := cache.lastRefresh
state := cache.state
cache.mu.RUnlock()
// if the cache is stale, then refresh it before we get nodes
if state == nil || time.Since(lastRefresh) > cache.staleness {
state, err = cache.refresh(ctx)
if err != nil {
return nil, err
}
}
return state.IPs(ctx, nodes), nil
}
// Size returns how many reputable nodes and new nodes are in the cache.
func (cache *NodeSelectionCache) Size() (reputableNodeCount int, newNodeCount int) {
cache.mu.RLock()

View File

@ -313,6 +313,12 @@ func (service *Service) GetOnlineNodesForGetDelete(ctx context.Context, nodeIDs
return service.db.GetOnlineNodesForGetDelete(ctx, nodeIDs, service.config.Node.OnlineWindow)
}
// GetNodeIPs returns a map of node ip:port for the supplied nodeIDs.
func (service *Service) GetNodeIPs(ctx context.Context, nodeIDs []storj.NodeID) (_ map[storj.NodeID]string, err error) {
defer mon.Task()(&ctx)(&err)
return service.SelectionCache.GetNodeIPs(ctx, nodeIDs)
}
// IsOnline checks if a node is 'online' based on the collected statistics.
func (service *Service) IsOnline(node *NodeDossier) bool {
return time.Since(node.Reputation.LastContactSuccess) < service.config.Node.OnlineWindow