From 0e17b1018c87cc76a5195ff8b32606f117c09cfa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?M=C3=A1rton=20Elek?= Date: Tue, 1 Aug 2023 13:50:22 +0200 Subject: [PATCH] satellite/{nodeselection,overlay}: support annotations on node filters Change-Id: I844d8a25042750aae189175842113e2f052d5b17 --- satellite/nodeselection/filter.go | 35 ++++++++++++++ satellite/nodeselection/state.go | 51 ++++++++++++++------ satellite/overlay/downloadselection.go | 2 +- satellite/overlay/placement.go | 30 ++++++++---- satellite/overlay/placement_test.go | 12 +++++ satellite/overlay/uploadselection.go | 30 ++++++++++-- satellite/overlay/uploadselection_test.go | 59 +++++++++++++++++++++++ 7 files changed, 189 insertions(+), 30 deletions(-) diff --git a/satellite/nodeselection/filter.go b/satellite/nodeselection/filter.go index 518ac61b5..dc65c9230 100644 --- a/satellite/nodeselection/filter.go +++ b/satellite/nodeselection/filter.go @@ -15,6 +15,41 @@ type NodeFilter interface { MatchInclude(node *SelectedNode) bool } +// AnnotatedNodeFilter is just a NodeFilter with additional annotations. +type AnnotatedNodeFilter struct { + Filter NodeFilter + Annotations map[string]string +} + +// MatchInclude implements NodeFilter. +func (a AnnotatedNodeFilter) MatchInclude(node *SelectedNode) bool { + return a.Filter.MatchInclude(node) +} + +// WithAnnotation adds annotations to a NodeFilter. +func WithAnnotation(filter NodeFilter, name string, value string) NodeFilter { + if anf, ok := filter.(AnnotatedNodeFilter); ok { + anf.Annotations[name] = value + return anf + } + return AnnotatedNodeFilter{ + Filter: filter, + Annotations: map[string]string{ + name: value, + }, + } +} + +// GetAnnotation retrieves annotation from AnnotatedNodeFilter. 
+func GetAnnotation(filter NodeFilter, name string) string { + if annotated, ok := filter.(AnnotatedNodeFilter); ok { + return annotated.Annotations[name] + } + return "" +} + +var _ NodeFilter = AnnotatedNodeFilter{} + // NodeFilters is a collection of multiple node filters (all should vote with true). type NodeFilters []NodeFilter diff --git a/satellite/nodeselection/state.go b/satellite/nodeselection/state.go index 395159a51..6ff909150 100644 --- a/satellite/nodeselection/state.go +++ b/satellite/nodeselection/state.go @@ -21,17 +21,17 @@ type State struct { // netByID returns subnet based on storj.NodeID netByID map[storj.NodeID]string - // distinct contains selectors for distinct selection. - distinct struct { + + // byNetwork contains selectors for distinct selection. + byNetwork struct { Reputable SelectBySubnet New SelectBySubnet } -} -// Stats contains state information. -type Stats struct { - New int - Reputable int + byID struct { + Reputable SelectByID + New SelectByID + } } // Selector defines interface for selecting nodes. @@ -53,17 +53,32 @@ func NewState(reputableNodes, newNodes []*SelectedNode) *State { state.netByID[node.ID] = node.LastNet } - state.distinct.Reputable = SelectBySubnetFromNodes(reputableNodes) - state.distinct.New = SelectBySubnetFromNodes(newNodes) + state.byNetwork.Reputable = SelectBySubnetFromNodes(reputableNodes) + state.byNetwork.New = SelectBySubnetFromNodes(newNodes) + + state.byID.Reputable = SelectByID(reputableNodes) + state.byID.New = SelectByID(newNodes) return state } +// SelectionType defines how to select nodes randomly. +type SelectionType int8 + +const ( + // SelectionTypeByNetwork chooses subnets randomly, and one node from each subnet. + SelectionTypeByNetwork = iota + + // SelectionTypeByID chooses nodes randomly. + SelectionTypeByID +) + // Request contains arguments for State.Request. 
type Request struct { - Count int - NewFraction float64 - NodeFilters NodeFilters + Count int + NewFraction float64 + NodeFilters NodeFilters + SelectionType SelectionType } // Select selects requestedCount nodes where there will be newFraction nodes. @@ -81,8 +96,16 @@ func (state *State) Select(ctx context.Context, request Request) (_ []*SelectedN var reputableNodes Selector var newNodes Selector - reputableNodes = state.distinct.Reputable - newNodes = state.distinct.New + switch request.SelectionType { + case SelectionTypeByNetwork: + reputableNodes = state.byNetwork.Reputable + newNodes = state.byNetwork.New + case SelectionTypeByID: + reputableNodes = state.byID.Reputable + newNodes = state.byID.New + default: + return nil, errs.New("Unsupported selection type: %d", request.SelectionType) + } // Get a random selection of new nodes out of the cache first so that if there aren't // enough new nodes on the network, we can fall back to using reputable nodes instead. diff --git a/satellite/overlay/downloadselection.go b/satellite/overlay/downloadselection.go index 587b31a3c..765f2ca66 100644 --- a/satellite/overlay/downloadselection.go +++ b/satellite/overlay/downloadselection.go @@ -144,7 +144,7 @@ func (state *DownloadSelectionCacheState) IPs(nodes []storj.NodeID) map[storj.No } // FilteredIPs returns node ip:port for nodes that are in state. Results are filtered out.. 
-func (state *DownloadSelectionCacheState) FilteredIPs(nodes []storj.NodeID, filter nodeselection.NodeFilters) map[storj.NodeID]string { +func (state *DownloadSelectionCacheState) FilteredIPs(nodes []storj.NodeID, filter nodeselection.NodeFilter) map[storj.NodeID]string { xs := make(map[storj.NodeID]string, len(nodes)) for _, nodeID := range nodes { if n, exists := state.byID[nodeID]; exists && filter.MatchInclude(n) { diff --git a/satellite/overlay/placement.go b/satellite/overlay/placement.go index 514171cd5..7b01416e4 100644 --- a/satellite/overlay/placement.go +++ b/satellite/overlay/placement.go @@ -11,7 +11,6 @@ import ( "github.com/jtolio/mito" "github.com/spf13/pflag" "github.com/zeebo/errs" - "golang.org/x/exp/slices" "storj.io/common/storj" "storj.io/common/storj/location" @@ -19,11 +18,11 @@ import ( ) // PlacementRules can crate filter based on the placement identifier. -type PlacementRules func(constraint storj.PlacementConstraint) (filter nodeselection.NodeFilters) +type PlacementRules func(constraint storj.PlacementConstraint) (filter nodeselection.NodeFilter) // ConfigurablePlacementRule can include the placement definitions for each known identifier. type ConfigurablePlacementRule struct { - placements map[storj.PlacementConstraint]nodeselection.NodeFilters + placements map[storj.PlacementConstraint]nodeselection.NodeFilter } // String implements pflag.Value. @@ -42,7 +41,7 @@ func (d *ConfigurablePlacementRule) String() string { // Set implements pflag.Value. func (d *ConfigurablePlacementRule) Set(s string) error { if d.placements == nil { - d.placements = make(map[storj.PlacementConstraint]nodeselection.NodeFilters) + d.placements = make(map[storj.PlacementConstraint]nodeselection.NodeFilter) } d.AddLegacyStaticRules() return d.AddPlacementFromString(s) @@ -58,7 +57,7 @@ var _ pflag.Value = &ConfigurablePlacementRule{} // NewPlacementRules creates a fully initialized NewPlacementRules. 
func NewPlacementRules() *ConfigurablePlacementRule { return &ConfigurablePlacementRule{ - placements: map[storj.PlacementConstraint]nodeselection.NodeFilters{}, + placements: make(map[storj.PlacementConstraint]nodeselection.NodeFilter), } } @@ -72,8 +71,8 @@ func (d *ConfigurablePlacementRule) AddLegacyStaticRules() { } // AddPlacementRule registers a new placement. -func (d *ConfigurablePlacementRule) AddPlacementRule(id storj.PlacementConstraint, filters nodeselection.NodeFilters) { - d.placements[id] = filters +func (d *ConfigurablePlacementRule) AddPlacementRule(id storj.PlacementConstraint, filter nodeselection.NodeFilter) { + d.placements[id] = filter } // AddPlacementFromString parses placement definition form string representations from id:definition;id:definition;... @@ -116,6 +115,17 @@ func (d *ConfigurablePlacementRule) AddPlacementFromString(definitions string) e } return res, nil }, + "annotated": func(filter nodeselection.NodeFilter, kv map[string]string) (nodeselection.AnnotatedNodeFilter, error) { + return nodeselection.AnnotatedNodeFilter{ + Filter: filter, + Annotations: kv, + }, nil + }, + "annotation": func(key string, value string) (map[string]string, error) { + return map[string]string{ + key: value, + }, nil + }, } for _, definition := range strings.Split(definitions, ";") { definition = strings.TrimSpace(definition) @@ -132,18 +142,18 @@ func (d *ConfigurablePlacementRule) AddPlacementFromString(definitions string) e if err != nil { return errs.Wrap(err) } - d.placements[storj.PlacementConstraint(id)] = val.(nodeselection.NodeFilters) + d.placements[storj.PlacementConstraint(id)] = val.(nodeselection.NodeFilter) } return nil } // CreateFilters implements PlacementCondition. 
-func (d *ConfigurablePlacementRule) CreateFilters(constraint storj.PlacementConstraint) (filter nodeselection.NodeFilters) { +func (d *ConfigurablePlacementRule) CreateFilters(constraint storj.PlacementConstraint) (filter nodeselection.NodeFilter) { if constraint == storj.EveryCountry { return nodeselection.NodeFilters{} } if filters, found := d.placements[constraint]; found { - return slices.Clone(filters) + return filters } return nodeselection.NodeFilters{ nodeselection.ExcludeAllFilter{}, diff --git a/satellite/overlay/placement_test.go b/satellite/overlay/placement_test.go index 7f459ed1e..3312afca2 100644 --- a/satellite/overlay/placement_test.go +++ b/satellite/overlay/placement_test.go @@ -111,6 +111,18 @@ func TestPlacementFromString(t *testing.T) { CountryCode: location.Germany, })) + }) + t.Run("annotated", func(t *testing.T) { + p := NewPlacementRules() + err := p.AddPlacementFromString(`11:annotated(country("GB"),annotation("autoExcludeSubnet","off"))`) + require.NoError(t, err) + filters := p.placements[storj.PlacementConstraint(11)] + require.True(t, filters.MatchInclude(&nodeselection.SelectedNode{ + CountryCode: location.UnitedKingdom, + })) + + require.Equal(t, nodeselection.GetAnnotation(filters, "autoExcludeSubnet"), "off") + }) t.Run("legacy geofencing rules", func(t *testing.T) { diff --git a/satellite/overlay/uploadselection.go b/satellite/overlay/uploadselection.go index 120711de4..04dff4a5f 100644 --- a/satellite/overlay/uploadselection.go +++ b/satellite/overlay/uploadselection.go @@ -13,6 +13,14 @@ import ( "storj.io/storj/satellite/nodeselection" ) +const ( + // AutoExcludeSubnet is placement annotation key to turn off subnet restrictions. + AutoExcludeSubnet = "autoExcludeSubnet" + + // AutoExcludeSubnetOFF is the value of AutoExcludeSubnet to disable subnet restrictions. + AutoExcludeSubnetOFF = "off" +) + // UploadSelectionDB implements the database for upload selection cache. 
// // architecture: Database @@ -96,19 +104,31 @@ func (cache *UploadSelectionCache) GetNodes(ctx context.Context, req FindStorage return nil, Error.Wrap(err) } - filters := cache.placementRules(req.Placement) + placementRules := cache.placementRules(req.Placement) + useSubnetExclusion := nodeselection.GetAnnotation(placementRules, AutoExcludeSubnet) != AutoExcludeSubnetOFF + + filters := nodeselection.NodeFilters{placementRules} if len(req.ExcludedIDs) > 0 { - filters = append(filters, state.ExcludeNetworksBasedOnNodes(req.ExcludedIDs)) + if useSubnetExclusion { + filters = append(filters, state.ExcludeNetworksBasedOnNodes(req.ExcludedIDs)) + } else { + filters = append(filters, nodeselection.ExcludedIDs(req.ExcludedIDs)) + } } filters = append(filters, cache.defaultFilters) - filters = filters.WithAutoExcludeSubnets() - selected, err := state.Select(ctx, nodeselection.Request{ + selectionReq := nodeselection.Request{ Count: req.RequestedCount, NewFraction: cache.selectionConfig.NewNodeFraction, NodeFilters: filters, - }) + } + + if !useSubnetExclusion { + selectionReq.SelectionType = nodeselection.SelectionTypeByID + } + + selected, err := state.Select(ctx, selectionReq) if nodeselection.ErrNotEnoughNodes.Has(err) { err = ErrNotEnoughNodes.Wrap(err) } diff --git a/satellite/overlay/uploadselection_test.go b/satellite/overlay/uploadselection_test.go index 2d00bcbfa..5527a979b 100644 --- a/satellite/overlay/uploadselection_test.go +++ b/satellite/overlay/uploadselection_test.go @@ -215,6 +215,7 @@ func TestGetNodes(t *testing.T) { } placementRules := overlay.NewPlacementRules() placementRules.AddPlacementRule(storj.PlacementConstraint(5), nodeselection.NodeFilters{}.WithCountryFilter(location.NewSet(location.Germany))) + placementRules.AddPlacementRule(storj.PlacementConstraint(6), nodeselection.WithAnnotation(nodeselection.NodeFilters{}.WithCountryFilter(location.NewSet(location.Germany)), overlay.AutoExcludeSubnet, overlay.AutoExcludeSubnetOFF)) cache, err := 
overlay.NewUploadSelectionCache(zap.NewNop(), db.OverlayCache(), @@ -239,6 +240,7 @@ func TestGetNodes(t *testing.T) { t.Run("normal selection", func(t *testing.T) { t.Run("get 2", func(t *testing.T) { + t.Parallel() // confirm cache.GetNodes returns the correct nodes selectedNodes, err := cache.GetNodes(ctx, overlay.FindStorageNodesRequest{RequestedCount: 2}) require.NoError(t, err) @@ -253,6 +255,7 @@ func TestGetNodes(t *testing.T) { } }) t.Run("too much", func(t *testing.T) { + t.Parallel() // we have 5 subnets (1 new, 4 vetted), with two nodes in each _, err := cache.GetNodes(ctx, overlay.FindStorageNodesRequest{RequestedCount: 6}) require.Error(t, err) @@ -262,6 +265,7 @@ func TestGetNodes(t *testing.T) { t.Run("using country filter", func(t *testing.T) { t.Run("normal", func(t *testing.T) { + t.Parallel() selectedNodes, err := cache.GetNodes(ctx, overlay.FindStorageNodesRequest{ RequestedCount: 3, Placement: 5, @@ -270,6 +274,7 @@ func TestGetNodes(t *testing.T) { require.Len(t, selectedNodes, 3) }) t.Run("too much", func(t *testing.T) { + t.Parallel() _, err := cache.GetNodes(ctx, overlay.FindStorageNodesRequest{ RequestedCount: 4, Placement: 5, @@ -278,6 +283,60 @@ func TestGetNodes(t *testing.T) { }) }) + t.Run("using country without subnets", func(t *testing.T) { + t.Run("normal", func(t *testing.T) { + t.Parallel() + // it's possible to get 5 only because we don't use subnet exclusions. 
+ selectedNodes, err := cache.GetNodes(ctx, overlay.FindStorageNodesRequest{ + RequestedCount: 5, + Placement: 6, + }) + require.NoError(t, err) + require.Len(t, selectedNodes, 5) + }) + t.Run("too much", func(t *testing.T) { + t.Parallel() + _, err := cache.GetNodes(ctx, overlay.FindStorageNodesRequest{ + RequestedCount: 6, + Placement: 6, + }) + require.Error(t, err) + }) + }) + + t.Run("using country without subnets and exclusions", func(t *testing.T) { + // DE nodes: 0 (subnet:A), 2 (A), 4 (B), 6 (C), 8 (C, but not vetted) + // if everything works well, we can exclude 0, and get 3 (2,4,6) + // unless somebody removes the 2 (because it's in the same subnet as 0) + selectedNodes, err := cache.GetNodes(ctx, overlay.FindStorageNodesRequest{ + RequestedCount: 3, + Placement: 6, + ExcludedIDs: []storj.NodeID{ + nodeIds[0], + }, + }) + require.NoError(t, err) + require.Len(t, selectedNodes, 3) + }) + + t.Run("check subnet selection", func(t *testing.T) { + for i := 0; i < 10; i++ { + selectedNodes, err := cache.GetNodes(ctx, overlay.FindStorageNodesRequest{ + RequestedCount: 3, + Placement: 0, + }) + require.NoError(t, err) + + subnets := map[string]struct{}{} + for _, node := range selectedNodes { + subnets[node.LastNet] = struct{}{} + } + + require.Len(t, selectedNodes, 3) + require.Len(t, subnets, 3) + } + }) + }) }