From 16abf02b35bf348f1255a75309972323cff0035a Mon Sep 17 00:00:00 2001 From: Egon Elbre Date: Mon, 18 May 2020 18:10:17 +0300 Subject: [PATCH] satellite/{nodeselection,overlay}: use the new package Change-Id: I034fdbe578dec2e5c906aca82231cd3e56f26aeb --- satellite/nodeselection/state.go | 2 +- satellite/overlay/nodeselectioncache.go | 151 +++++++----------------- satellite/overlay/selection_test.go | 7 +- 3 files changed, 47 insertions(+), 113 deletions(-) diff --git a/satellite/nodeselection/state.go b/satellite/nodeselection/state.go index 94a9dda3d..0cdaa19a9 100644 --- a/satellite/nodeselection/state.go +++ b/satellite/nodeselection/state.go @@ -53,7 +53,7 @@ type Selector interface { } // NewState returns a state based on the input. -func NewState(newNodes, reputableNodes []*Node) *State { +func NewState(reputableNodes, newNodes []*Node) *State { state := &State{} state.netByID = map[storj.NodeID]string{} diff --git a/satellite/overlay/nodeselectioncache.go b/satellite/overlay/nodeselectioncache.go index 608cf96bb..e4ca3ef3a 100644 --- a/satellite/overlay/nodeselectioncache.go +++ b/satellite/overlay/nodeselectioncache.go @@ -5,11 +5,12 @@ package overlay import ( "context" - "math/rand" "sync" "time" "go.uber.org/zap" + + "storj.io/storj/satellite/nodeselection" ) // CacheDB implements the database for overlay node selection cache @@ -35,16 +36,9 @@ type NodeSelectionCache struct { selectionConfig NodeSelectionConfig staleness time.Duration - mu sync.RWMutex - data *state -} - -type state struct { + mu sync.RWMutex lastRefresh time.Time - - mu sync.RWMutex - reputableNodes []*SelectedNode - newNodes []*SelectedNode + state *nodeselection.State } // NewNodeSelectionCache creates a new cache that keeps a list of all the storage nodes that are qualified to store data @@ -54,7 +48,6 @@ func NewNodeSelectionCache(log *zap.Logger, db CacheDB, staleness time.Duration, db: db, staleness: staleness, selectionConfig: config, - data: &state{}, } } @@ -69,28 +62,26 @@ func (cache *NodeSelectionCache) Refresh(ctx context.Context) (err error) { // refresh calls out to the database and refreshes the cache with the most up-to-date // data from the nodes table, then sets time that the last refresh occurred so we know when // to refresh again in the future -func (cache *NodeSelectionCache) refresh(ctx context.Context) (cachData *state, err error) { +func (cache *NodeSelectionCache) refresh(ctx context.Context) (state *nodeselection.State, err error) { defer mon.Task()(&ctx)(&err) cache.mu.Lock() defer cache.mu.Unlock() - if cache.data != nil && time.Since(cache.data.lastRefresh) <= cache.staleness { - return cache.data, nil + if cache.state != nil && time.Since(cache.lastRefresh) <= cache.staleness { + return cache.state, nil } reputableNodes, newNodes, err := cache.db.SelectAllStorageNodesUpload(ctx, cache.selectionConfig) if err != nil { - return cache.data, err - } - cache.data = &state{ - lastRefresh: time.Now().UTC(), - reputableNodes: reputableNodes, - newNodes: newNodes, + return cache.state, err } + cache.lastRefresh = time.Now().UTC() + cache.state = nodeselection.NewState(convSelectedNodesToNodes(reputableNodes), convSelectedNodesToNodes(newNodes)) + mon.IntVal("refresh_cache_size_reputable").Observe(int64(len(reputableNodes))) mon.IntVal("refresh_cache_size_new").Observe(int64(len(newNodes))) - return cache.data, nil + return cache.state, nil } // GetNodes selects nodes from the cache that will be used to upload a file. @@ -100,107 +91,55 @@ func (cache *NodeSelectionCache) GetNodes(ctx context.Context, req FindStorageNo defer mon.Task()(&ctx)(&err) cache.mu.RLock() - cacheData := cache.data + lastRefresh := cache.lastRefresh + state := cache.state cache.mu.RUnlock() // if the cache is stale, then refresh it before we get nodes - if time.Since(cacheData.lastRefresh) > cache.staleness { - cacheData, err = cache.refresh(ctx) + if state == nil || time.Since(lastRefresh) > cache.staleness { + state, err = cache.refresh(ctx) if err != nil { return nil, err } } - return cacheData.GetNodes(ctx, req, cache.selectionConfig.NewNodeFraction, cache.selectionConfig.DistinctIP) -} - -// GetNodes selects nodes from the cache that will be used to upload a file. -// -// If there are new nodes in the cache, we will return a small fraction of those -// and then return mostly reputable nodes. -// -// Distinct determines whether the nodes have to be from distinct networks. -func (cacheData *state) GetNodes(ctx context.Context, req FindStorageNodesRequest, newNodeFraction float64, distinct bool) (_ []*SelectedNode, err error) { - defer mon.Task()(&ctx)(&err) - - cacheData.mu.RLock() - defer cacheData.mu.RUnlock() - - // how many reputableNodes versus newNode nodes should be selected - totalcount := req.RequestedCount - newNodeCount := int(float64(totalcount) * newNodeFraction) - - var selectedNodeResults = []*SelectedNode{} - var distinctNetworks = map[string]struct{}{} - - // Get a random selection of new nodes out of the cache first so that if there aren't - // enough new nodes on the network, we can fall back to using reputable nodes instead - randomIndexes := rand.Perm(len(cacheData.newNodes)) -nextNewNode: - for _, idx := range randomIndexes { - currNode := cacheData.newNodes[idx] - for _, excludedID := range req.ExcludedIDs { - if excludedID == currNode.ID { - continue nextNewNode - } - } - - if distinct { - // don't select a node if we've already selected another node from the same network - if _, ok := distinctNetworks[currNode.LastNet]; ok { - continue nextNewNode - } - distinctNetworks[currNode.LastNet] = struct{}{} - } - - selectedNodeResults = append(selectedNodeResults, currNode.Clone()) - if len(selectedNodeResults) >= newNodeCount { - break - } + selected, err := state.Select(ctx, nodeselection.Request{ + Count: req.RequestedCount, + NewFraction: cache.selectionConfig.NewNodeFraction, + Distinct: cache.selectionConfig.DistinctIP, + ExcludedIDs: req.ExcludedIDs, + }) + if nodeselection.ErrNotEnoughNodes.Has(err) { + err = ErrNotEnoughNodes.Wrap(err) } - randomIndexes = rand.Perm(len(cacheData.reputableNodes)) -nextReputableNode: - for _, idx := range randomIndexes { - currNode := cacheData.reputableNodes[idx] - - // don't select a node listed in the excluded list - for _, excludedID := range req.ExcludedIDs { - if excludedID == currNode.ID { - continue nextReputableNode - } - } - - if distinct { - // don't select a node if we've already selected another node from the same network - if _, ok := distinctNetworks[currNode.LastNet]; ok { - continue nextReputableNode - } - distinctNetworks[currNode.LastNet] = struct{}{} - } - - selectedNodeResults = append(selectedNodeResults, currNode.Clone()) - if len(selectedNodeResults) >= totalcount { - break - } - } - - if len(selectedNodeResults) < totalcount { - return selectedNodeResults, ErrNotEnoughNodes.New("requested from cache %d found %d", totalcount, len(selectedNodeResults)) - } - return selectedNodeResults, nil + return convNodesToSelectedNodes(selected), err } // Size returns how many reputable nodes and new nodes are in the cache func (cache *NodeSelectionCache) Size() (reputableNodeCount int, newNodeCount int) { cache.mu.RLock() - cacheData := cache.data + state := cache.state cache.mu.RUnlock() - return cacheData.size() + + if state == nil { + return 0, 0 + } + + stats := state.Stats() + return stats.Reputable, stats.New } -func (cacheData *state) size() (reputableNodeCount int, newNodeCount int) { - cacheData.mu.RLock() - defer cacheData.mu.RUnlock() - return len(cacheData.reputableNodes), len(cacheData.newNodes) +func convNodesToSelectedNodes(nodes []*nodeselection.Node) (xs []*SelectedNode) { + for _, n := range nodes { + xs = append(xs, (*SelectedNode)(n)) + } + return xs +} + +func convSelectedNodesToNodes(nodes []*SelectedNode) (xs []*nodeselection.Node) { + for _, n := range nodes { + xs = append(xs, (*nodeselection.Node)(n)) + } + return xs } diff --git a/satellite/overlay/selection_test.go b/satellite/overlay/selection_test.go index 228a8f7b1..ad893fa6b 100644 --- a/satellite/overlay/selection_test.go +++ b/satellite/overlay/selection_test.go @@ -573,12 +573,7 @@ func TestFindStorageNodesDistinctNetworks(t *testing.T) { require.Equal(t, len(n), len(n1)) n2, err = satellite.Overlay.Service.SelectionCache.GetNodes(ctx, req) require.Error(t, err) - // GetNodes returns 1 more node than FindStorageNodesWithPreferences because of the way the queries are... - // FindStorageNodesWithPreferences gets the IPs for the excludedNodeIDs and excludes all those IPs from the selection - // (which results in filtering out any node on the same network as a excludedNodeID), - // but the selection cache only filters IPs at time of selection which makes it so that it can include a node that shares a network - // with an exclueded ID - require.Equal(t, len(n1)+1, len(n2)) + require.Equal(t, len(n1), len(n2)) }) }