From 85fb964afefc151cb9b667a4eee5592252363e31 Mon Sep 17 00:00:00 2001 From: Egon Elbre Date: Wed, 13 Jan 2021 15:59:05 +0200 Subject: [PATCH] satellite/{metainfo,overlay}: improvements to GetObjectIPs * Deduplicate NodeID list prior to fetching IPs. * Use NodeSelectionCache for fetching reliable IPs. * Return number of segements, reliable pieces and all pieces. Change-Id: I13e679caab275488b4037624b840a4068dad9589 --- satellite/metainfo/metainfo.go | 34 ++++++++++++++++++------- satellite/nodeselection/state.go | 17 +++++++++++++ satellite/nodeselection/state_test.go | 24 +++++++++++++++++ satellite/overlay/nodeselectioncache.go | 20 +++++++++++++++ satellite/overlay/service.go | 6 +++++ 5 files changed, 92 insertions(+), 9 deletions(-) diff --git a/satellite/metainfo/metainfo.go b/satellite/metainfo/metainfo.go index 3b7f18a6d..eec74938f 100644 --- a/satellite/metainfo/metainfo.go +++ b/satellite/metainfo/metainfo.go @@ -1086,11 +1086,11 @@ func (endpoint *Endpoint) GetObjectIPs(ctx context.Context, req *pb.ObjectGetIPs return nil, err } - var nodeIDs []storj.NodeID + pieceCountByNodeID := map[storj.NodeID]int64{} addPointerToNodeIDs := func(pointer *pb.Pointer) { if pointer.Remote != nil { for _, piece := range pointer.Remote.RemotePieces { - nodeIDs = append(nodeIDs, piece.NodeId) + pieceCountByNodeID[piece.NodeId]++ } } } @@ -1129,20 +1129,36 @@ func (endpoint *Endpoint) GetObjectIPs(ctx context.Context, req *pb.ObjectGetIPs addPointerToNodeIDs(pointer) } - nodes, err := endpoint.overlay.GetOnlineNodesForGetDelete(ctx, nodeIDs) + nodeIDs := make([]storj.NodeID, 0, len(pieceCountByNodeID)) + for nodeID := range pieceCountByNodeID { + nodeIDs = append(nodeIDs, nodeID) + } + + nodeIPMap, err := endpoint.overlay.GetNodeIPs(ctx, nodeIDs) if err != nil { return nil, rpcstatus.Error(rpcstatus.Internal, err.Error()) } - resp = &pb.ObjectGetIPsResponse{} - for _, node := range nodes { - address := node.Address.GetAddress() - if address != "" { - resp.Ips = append(resp.Ips, []byte(address)) + nodeIPs := make([][]byte, 0, len(nodeIPMap)) + pieceCount := int64(0) + reliablePieceCount := int64(0) + for nodeID, count := range pieceCountByNodeID { + pieceCount += count + + ip, reliable := nodeIPMap[nodeID] + if !reliable { + continue } + nodeIPs = append(nodeIPs, []byte(ip)) + reliablePieceCount += count } - return resp, nil + return &pb.ObjectGetIPsResponse{ + Ips: nodeIPs, + SegmentCount: streamMeta.NumberOfSegments, + ReliablePieceCount: reliablePieceCount, + PieceCount: pieceCount, + }, nil } // BeginSegment begins segment uploading. diff --git a/satellite/nodeselection/state.go b/satellite/nodeselection/state.go index 0cdaa19a9..6813d8485 100644 --- a/satellite/nodeselection/state.go +++ b/satellite/nodeselection/state.go @@ -22,6 +22,8 @@ type State struct { stats Stats // netByID returns subnet based on storj.NodeID netByID map[storj.NodeID]string + // ipPortByID returns IP based on storj.NodeID + ipPortByID map[storj.NodeID]string // nonDistinct contains selectors for non-distinct selection. nonDistinct struct { Reputable SelectByID @@ -57,11 +59,14 @@ func NewState(reputableNodes, newNodes []*Node) *State { state := &State{} state.netByID = map[storj.NodeID]string{} + state.ipPortByID = map[storj.NodeID]string{} for _, node := range reputableNodes { state.netByID[node.ID] = node.LastNet + state.ipPortByID[node.ID] = node.LastIPPort } for _, node := range newNodes { state.netByID[node.ID] = node.LastNet + state.ipPortByID[node.ID] = node.LastIPPort } state.nonDistinct.Reputable = SelectByID(reputableNodes) @@ -135,6 +140,18 @@ func (state *State) Select(ctx context.Context, request Request) (_ []*Node, err return selected, nil } +// IPs returns node ip:port for nodes that are in state. +func (state *State) IPs(ctx context.Context, nodes []storj.NodeID) map[storj.NodeID]string { + defer mon.Task()(&ctx)(nil) + xs := make(map[storj.NodeID]string, len(nodes)) + for _, nodeID := range nodes { + if ip, exists := state.ipPortByID[nodeID]; exists { + xs[nodeID] = ip + } + } + return xs +} + // Stats returns state information. func (state *State) Stats() Stats { state.mu.RLock() diff --git a/satellite/nodeselection/state_test.go b/satellite/nodeselection/state_test.go index 5e861939b..1acf2aeee 100644 --- a/satellite/nodeselection/state_test.go +++ b/satellite/nodeselection/state_test.go @@ -203,3 +203,27 @@ next: return xs } + +func TestState_IPs(t *testing.T) { + ctx := testcontext.New(t) + defer ctx.Cleanup() + + reputableNodes := createRandomNodes(2, "1.0.1") + newNodes := createRandomNodes(2, "1.0.3") + + state := nodeselection.NewState(reputableNodes, newNodes) + + nodeIPs := state.IPs(ctx, nil) + require.Equal(t, map[storj.NodeID]string{}, nodeIPs) + + missing := storj.NodeID{} + nodeIPs = state.IPs(ctx, []storj.NodeID{ + reputableNodes[0].ID, + newNodes[1].ID, + missing, + }) + require.Equal(t, map[storj.NodeID]string{ + reputableNodes[0].ID: "1.0.1.0:8080", + newNodes[1].ID: "1.0.3.1:8080", + }, nodeIPs) +} diff --git a/satellite/overlay/nodeselectioncache.go b/satellite/overlay/nodeselectioncache.go index 30bfaa9a8..15deead53 100644 --- a/satellite/overlay/nodeselectioncache.go +++ b/satellite/overlay/nodeselectioncache.go @@ -118,6 +118,26 @@ func (cache *NodeSelectionCache) GetNodes(ctx context.Context, req FindStorageNo return convNodesToSelectedNodes(selected), err } +// GetNodeIPs gets the last node ip:port from the cache, refreshing when needed. +func (cache *NodeSelectionCache) GetNodeIPs(ctx context.Context, nodes []storj.NodeID) (_ map[storj.NodeID]string, err error) { + defer mon.Task()(&ctx)(&err) + + cache.mu.RLock() + lastRefresh := cache.lastRefresh + state := cache.state + cache.mu.RUnlock() + + // if the cache is stale, then refresh it before we get nodes + if state == nil || time.Since(lastRefresh) > cache.staleness { + state, err = cache.refresh(ctx) + if err != nil { + return nil, err + } + } + + return state.IPs(ctx, nodes), nil +} + // Size returns how many reputable nodes and new nodes are in the cache. func (cache *NodeSelectionCache) Size() (reputableNodeCount int, newNodeCount int) { cache.mu.RLock() diff --git a/satellite/overlay/service.go b/satellite/overlay/service.go index 4ba3ccc3f..b9a55bff1 100644 --- a/satellite/overlay/service.go +++ b/satellite/overlay/service.go @@ -313,6 +313,12 @@ func (service *Service) GetOnlineNodesForGetDelete(ctx context.Context, nodeIDs return service.db.GetOnlineNodesForGetDelete(ctx, nodeIDs, service.config.Node.OnlineWindow) } +// GetNodeIPs returns a map of node ip:port for the supplied nodeIDs. +func (service *Service) GetNodeIPs(ctx context.Context, nodeIDs []storj.NodeID) (_ map[storj.NodeID]string, err error) { + defer mon.Task()(&ctx)(&err) + return service.SelectionCache.GetNodeIPs(ctx, nodeIDs) +} + // IsOnline checks if a node is 'online' based on the collected statistics. func (service *Service) IsOnline(node *NodeDossier) bool { return time.Since(node.Reputation.LastContactSuccess) < service.config.Node.OnlineWindow