satellite/repair/checker: respect autoExcludeSubnet anntation in checker rangedloop

This patch is a oneliner: rangedloop checker should check the subnets only if it's not turned off with placement annotation.
(see in satellite/repair/checker/observer.go).

But I didn't find any unit test to cover that part, so I had to write one, and I prefered to write it as a unit test not an integration test, which requires a mock repair queue (observer_unit_test.go mock.go).

Because it's small change, I also included a small change: creating a elper method to check if AutoExcludeSubnet annotation is defined

Change-Id: I2666b937074ab57f603b356408ef108cd55bd6fd
This commit is contained in:
Márton Elek 2023-08-21 12:11:22 +02:00 committed by Storj Robot
parent 4ccce11893
commit 84ea80c1fd
7 changed files with 261 additions and 4 deletions

View File

@ -20,6 +20,11 @@ const (
AutoExcludeSubnetOFF = "off" AutoExcludeSubnetOFF = "off"
) )
// AllowSameSubnet is a short to check if Subnet exclusion is disabled == allow pick nodes from the same subnet.
func AllowSameSubnet(filter NodeFilter) bool {
return GetAnnotation(filter, AutoExcludeSubnet) == AutoExcludeSubnetOFF
}
// ErrNotEnoughNodes is when selecting nodes failed with the given parameters. // ErrNotEnoughNodes is when selecting nodes failed with the given parameters.
var ErrNotEnoughNodes = errs.Class("not enough nodes") var ErrNotEnoughNodes = errs.Class("not enough nodes")

View File

@ -97,7 +97,7 @@ func (cache *UploadSelectionCache) GetNodes(ctx context.Context, req FindStorage
} }
placementRules := cache.placementRules(req.Placement) placementRules := cache.placementRules(req.Placement)
useSubnetExclusion := nodeselection.GetAnnotation(placementRules, nodeselection.AutoExcludeSubnet) != nodeselection.AutoExcludeSubnetOFF useSubnetExclusion := !nodeselection.AllowSameSubnet(placementRules)
filters := nodeselection.NodeFilters{placementRules} filters := nodeselection.NodeFilters{placementRules}
if len(req.ExcludedIDs) > 0 { if len(req.ExcludedIDs) > 0 {

View File

@ -20,6 +20,7 @@ import (
"storj.io/common/uuid" "storj.io/common/uuid"
"storj.io/storj/satellite/metabase" "storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/metabase/rangedloop" "storj.io/storj/satellite/metabase/rangedloop"
"storj.io/storj/satellite/nodeselection"
"storj.io/storj/satellite/overlay" "storj.io/storj/satellite/overlay"
"storj.io/storj/satellite/repair" "storj.io/storj/satellite/repair"
"storj.io/storj/satellite/repair/queue" "storj.io/storj/satellite/repair/queue"
@ -342,7 +343,10 @@ func (fork *observerFork) process(ctx context.Context, segment *rangedloop.Segme
var clumpedPieces metabase.Pieces var clumpedPieces metabase.Pieces
var lastNets []string var lastNets []string
if fork.doDeclumping {
nodeFilter := fork.nodesCache.placementRules(segment.Placement)
if fork.doDeclumping && !nodeselection.AllowSameSubnet(nodeFilter) {
// if multiple pieces are on the same last_net, keep only the first one. The rest are // if multiple pieces are on the same last_net, keep only the first one. The rest are
// to be considered retrievable but unhealthy. // to be considered retrievable but unhealthy.
lastNets, err = fork.nodesCache.PiecesNodesLastNetsInOrder(ctx, segment.CreatedAt, pieces) lastNets, err = fork.nodesCache.PiecesNodesLastNetsInOrder(ctx, segment.CreatedAt, pieces)

View File

@ -0,0 +1,174 @@
// Copyright (C) 2023 Storj Labs, Inc.
// See LICENSE for copying information.
package checker
import (
"fmt"
"testing"
"time"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
"storj.io/common/identity/testidentity"
"storj.io/common/storj"
"storj.io/common/storj/location"
"storj.io/common/testcontext"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/metabase/rangedloop"
"storj.io/storj/satellite/nodeselection"
"storj.io/storj/satellite/overlay"
"storj.io/storj/satellite/repair/queue"
)
func TestObserverForkProcess(t *testing.T) {
nodes := func() (res []nodeselection.SelectedNode) {
for i := 0; i < 10; i++ {
res = append(res, nodeselection.SelectedNode{
ID: testidentity.MustPregeneratedIdentity(i, storj.LatestIDVersion()).ID,
Online: true,
CountryCode: location.Germany,
LastNet: "127.0.0.0",
})
}
return res
}()
mapNodes := func(nodes []nodeselection.SelectedNode, include func(node nodeselection.SelectedNode) bool) map[storj.NodeID]nodeselection.SelectedNode {
res := map[storj.NodeID]nodeselection.SelectedNode{}
for _, node := range nodes {
if include(node) {
res[node.ID] = node
}
}
return res
}
ctx := testcontext.New(t)
createDefaultObserver := func() *Observer {
o := &Observer{
statsCollector: make(map[string]*observerRSStats),
nodesCache: &ReliabilityCache{
staleness: time.Hour,
placementRules: overlay.NewPlacementRules().CreateFilters,
},
}
o.nodesCache.state.Store(&reliabilityState{
reliableAll: mapNodes(nodes, func(node nodeselection.SelectedNode) bool {
return true
}),
reliableOnline: mapNodes(nodes, func(node nodeselection.SelectedNode) bool {
return node.Online == true
}),
created: time.Now(),
})
return o
}
createFork := func(o *Observer, q queue.RepairQueue) *observerFork {
return &observerFork{
log: zaptest.NewLogger(t),
getObserverStats: o.getObserverStats,
rsStats: make(map[string]*partialRSStats),
doDeclumping: o.doDeclumping,
doPlacementCheck: o.doPlacementCheck,
getNodesEstimate: o.getNodesEstimate,
nodesCache: o.nodesCache,
repairQueue: queue.NewInsertBuffer(q, 1000),
}
}
createPieces := func(nodes []nodeselection.SelectedNode, selected ...int) metabase.Pieces {
pieces := make(metabase.Pieces, len(selected))
for ix, s := range selected {
pieces[ix] = metabase.Piece{
Number: uint16(ix),
StorageNode: nodes[s].ID,
}
}
return pieces
}
t.Run("all healthy", func(t *testing.T) {
o := createDefaultObserver()
q := queue.MockRepairQueue{}
fork := createFork(o, &q)
err := fork.process(ctx, &rangedloop.Segment{
Pieces: createPieces(nodes, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9),
Redundancy: storj.RedundancyScheme{
Algorithm: storj.ReedSolomon,
ShareSize: 256,
RepairShares: 4,
RequiredShares: 6,
OptimalShares: 8,
TotalShares: 10,
},
})
require.NoError(t, err)
err = fork.repairQueue.Flush(ctx)
require.NoError(t, err)
require.Len(t, q.Segments, 0)
})
t.Run("declumping", func(t *testing.T) {
o := createDefaultObserver()
o.doDeclumping = true
q := queue.MockRepairQueue{}
fork := createFork(o, &q)
err := fork.process(ctx, &rangedloop.Segment{
Pieces: createPieces(nodes, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9),
Redundancy: storj.RedundancyScheme{
Algorithm: storj.ReedSolomon,
ShareSize: 256,
RepairShares: 4,
RequiredShares: 6,
OptimalShares: 8,
TotalShares: 10,
},
})
require.NoError(t, err)
err = fork.repairQueue.Flush(ctx)
require.NoError(t, err)
// as all test nodes are in the same subnet...
require.Len(t, q.Segments, 1)
})
t.Run("declumping is ignored by annotation", func(t *testing.T) {
o := createDefaultObserver()
o.doDeclumping = true
placements := overlay.ConfigurablePlacementRule{}
require.NoError(t, placements.Set(fmt.Sprintf(`10:annotated(country("DE"),annotation("%s","%s"))`, nodeselection.AutoExcludeSubnet, nodeselection.AutoExcludeSubnetOFF)))
o.nodesCache.placementRules = placements.CreateFilters
q := queue.MockRepairQueue{}
fork := createFork(o, &q)
err := fork.process(ctx, &rangedloop.Segment{
Placement: 10,
Pieces: createPieces(nodes, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9),
Redundancy: storj.RedundancyScheme{
Algorithm: storj.ReedSolomon,
ShareSize: 256,
RepairShares: 4,
RequiredShares: 6,
OptimalShares: 8,
TotalShares: 10,
},
})
require.NoError(t, err)
err = fork.repairQueue.Flush(ctx)
require.NoError(t, err)
require.Len(t, q.Segments, 0)
})
}

View File

@ -0,0 +1,74 @@
// Copyright (C) 2023 Storj Labs, Inc.
// See LICENSE for copying information.
package queue
import (
"context"
"time"
"storj.io/common/uuid"
"storj.io/storj/satellite/metabase"
)
// MockRepairQueue helps testing RepairQueue.
type MockRepairQueue struct {
Segments []*InjuredSegment
}
// Insert implements RepairQueue.
func (m *MockRepairQueue) Insert(ctx context.Context, s *InjuredSegment) (alreadyInserted bool, err error) {
for _, alreadyAdded := range m.Segments {
if alreadyAdded.StreamID == s.StreamID {
return true, nil
}
}
m.Segments = append(m.Segments, s)
return false, nil
}
// InsertBatch implements RepairQueue.
func (m *MockRepairQueue) InsertBatch(ctx context.Context, segments []*InjuredSegment) (newlyInsertedSegments []*InjuredSegment, err error) {
for _, segment := range segments {
inserted, err := m.Insert(ctx, segment)
if err != nil {
return nil, err
}
if inserted {
newlyInsertedSegments = append(newlyInsertedSegments, segment)
}
}
return newlyInsertedSegments, nil
}
// Select implements RepairQueue.
func (m *MockRepairQueue) Select(ctx context.Context) (*InjuredSegment, error) {
panic("implement me")
}
// Delete implements RepairQueue.
func (m *MockRepairQueue) Delete(ctx context.Context, s *InjuredSegment) error {
panic("implement me")
}
// Clean implements RepairQueue.
func (m *MockRepairQueue) Clean(ctx context.Context, before time.Time) (deleted int64, err error) {
panic("implement me")
}
// SelectN implements RepairQueue.
func (m *MockRepairQueue) SelectN(ctx context.Context, limit int) ([]InjuredSegment, error) {
panic("implement me")
}
// Count implements RepairQueue.
func (m *MockRepairQueue) Count(ctx context.Context) (count int, err error) {
panic("implement me")
}
// TestingSetAttemptedTime implements RepairQueue.
func (m *MockRepairQueue) TestingSetAttemptedTime(ctx context.Context, streamID uuid.UUID, position metabase.SegmentPosition, t time.Time) (rowsAffected int64, err error) {
panic("implement me")
}
var _ RepairQueue = &MockRepairQueue{}

View File

@ -704,7 +704,7 @@ func (repairer *SegmentRepairer) classifySegmentPiecesWithNodes(ctx context.Cont
nodeFilters = repairer.placementRules(segment.Placement) nodeFilters = repairer.placementRules(segment.Placement)
if repairer.doDeclumping && nodeselection.GetAnnotation(nodeFilters, nodeselection.AutoExcludeSubnet) != nodeselection.AutoExcludeSubnetOFF { if repairer.doDeclumping && !nodeselection.AllowSameSubnet(nodeFilters) {
// if multiple pieces are on the same last_net, keep only the first one. The rest are // if multiple pieces are on the same last_net, keep only the first one. The rest are
// to be considered retrievable but unhealthy. // to be considered retrievable but unhealthy.
lastNets := make([]string, 0, len(allNodeIDs)) lastNets := make([]string, 0, len(allNodeIDs))

View File

@ -130,7 +130,7 @@ func TestClassify(t *testing.T) {
}) })
t.Run("decumpling but with no subnet filter", func(t *testing.T) { t.Run("declumping but with no subnet filter", func(t *testing.T) {
var online, offline = generateNodes(10, func(ix int) bool { var online, offline = generateNodes(10, func(ix int) bool {
return ix < 5 return ix < 5
}, func(ix int, node *nodeselection.SelectedNode) { }, func(ix int, node *nodeselection.SelectedNode) {