satellite/repair/checker: put segments with off-placement pieces into repair queue

When qualifying a segment for repair, the checker now looks at piece
locations; if any pieces are outside the segment's placement, it puts
the segment into the repair queue.

Fixes https://github.com/storj/storj/issues/5895

Change-Id: If0d941b30ad94c5ef02fb1a03c7f3d04a2df25c7
Michal Niewrzal 2023-05-30 16:44:36 +02:00
parent 1966340b2a
commit 337eb9be6a
6 changed files with 131 additions and 7 deletions
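
In essence, the checker's qualification rule now has two independent triggers: the existing health-threshold check and the new placement check. Below is a minimal sketch of that rule using the names from the observer diff further down; the helper function segmentNeedsRepair is illustrative only and is not part of the commit.

// Illustrative sketch, not part of the commit: the shape of the new
// qualification rule implemented in observerFork.process below.
func segmentNeedsRepair(numHealthy, repairThreshold, successThreshold, numPiecesOutOfPlacement int) bool {
	// existing rule: not enough healthy pieces (except when the repair and
	// success thresholds coincide, as in some test setups)
	if numHealthy <= repairThreshold && numHealthy < successThreshold {
		return true
	}
	// new rule: any piece stored outside the segment's placement sends
	// the segment straight into the repair queue
	return numPiecesOutOfPlacement > 0
}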

View File

@@ -91,6 +91,7 @@ storj.io/storj/satellite/repair/checker."checker_segment_age" IntVal
storj.io/storj/satellite/repair/checker."checker_segment_clumped_count" IntVal
storj.io/storj/satellite/repair/checker."checker_segment_health" FloatVal
storj.io/storj/satellite/repair/checker."checker_segment_healthy_count" IntVal
storj.io/storj/satellite/repair/checker."checker_segment_off_placement_count" IntVal
storj.io/storj/satellite/repair/checker."checker_segment_time_until_irreparable" IntVal
storj.io/storj/satellite/repair/checker."checker_segment_total_count" IntVal
storj.io/storj/satellite/repair/checker."checker_segments_below_min_req" Counter

View File

@@ -24,6 +24,7 @@ type Config struct {
NodeFailureRate float64 `help:"the probability of a single node going down within the next checker iteration" default:"0.00005435" `
RepairQueueInsertBatchSize int `help:"Number of damaged segments to buffer in-memory before flushing to the repair queue" default:"100" `
DoDeclumping bool `help:"Treat pieces on the same network as in need of repair" default:"false"`
DoPlacementCheck bool `help:"Treat pieces out of segment placement as in need of repair" default:"true"`
}
// RepairOverride is a configuration struct that contains an override repair

View File

@@ -40,6 +40,7 @@ type Observer struct {
nodeFailureRate float64
repairQueueBatchSize int
doDeclumping bool
doPlacementCheck bool
// the following are reset on each iteration
startTime time.Time
@@ -61,6 +62,7 @@ func NewObserver(logger *zap.Logger, repairQueue queue.RepairQueue, overlay *ove
nodeFailureRate: config.NodeFailureRate,
repairQueueBatchSize: config.RepairQueueInsertBatchSize,
doDeclumping: config.DoDeclumping,
doPlacementCheck: config.DoPlacementCheck,
statsCollector: make(map[string]*observerRSStats),
}
}
@@ -229,8 +231,10 @@ type observerFork struct {
getNodesEstimate func(ctx context.Context) (int, error)
log *zap.Logger
doDeclumping bool
doPlacementCheck bool
lastStreamID uuid.UUID
totalStats aggregateStats
allNodeIDs []storj.NodeID
getObserverStats func(string) *observerRSStats
}
@@ -248,6 +252,7 @@ func newObserverFork(observer *Observer) rangedloop.Partial {
getNodesEstimate: observer.getNodesEstimate,
log: observer.logger,
doDeclumping: observer.doDeclumping,
doPlacementCheck: observer.doPlacementCheck,
getObserverStats: observer.getObserverStats,
}
}
@@ -336,16 +341,23 @@ func (fork *observerFork) process(ctx context.Context, segment *rangedloop.Segme
return Error.New("error getting missing pieces: %w", err)
}
// reuse allNodeIDs slice if it's large enough
if cap(fork.allNodeIDs) < len(pieces) {
fork.allNodeIDs = make([]storj.NodeID, len(pieces))
} else {
fork.allNodeIDs = fork.allNodeIDs[:len(pieces)]
}
for i, p := range pieces {
fork.allNodeIDs[i] = p.StorageNode
}
var clumpedPieces metabase.Pieces
var lastNets []string
if fork.doDeclumping {
// if multiple pieces are on the same last_net, keep only the first one. The rest are
// to be considered retrievable but unhealthy.
nodeIDs := make([]storj.NodeID, len(pieces))
for i, p := range pieces {
nodeIDs[i] = p.StorageNode
}
lastNets, err = fork.overlayService.GetNodesNetworkInOrder(ctx, nodeIDs)
lastNets, err = fork.overlayService.GetNodesNetworkInOrder(ctx, fork.allNodeIDs)
if err != nil {
fork.totalStats.remoteSegmentsFailedToCheck++
stats.iterationAggregates.remoteSegmentsFailedToCheck++
@@ -354,6 +366,18 @@ func (fork *observerFork) process(ctx context.Context, segment *rangedloop.Segme
clumpedPieces = repair.FindClumpedPieces(segment.Pieces, lastNets)
}
numPiecesOutOfPlacement := 0
if fork.doPlacementCheck && segment.Placement != storj.EveryCountry {
outOfPlacementNodes, err := fork.overlayService.GetNodesOutOfPlacement(ctx, fork.allNodeIDs, segment.Placement)
if err != nil {
fork.totalStats.remoteSegmentsFailedToCheck++
stats.iterationAggregates.remoteSegmentsFailedToCheck++
return errs.Combine(Error.New("error determining nodes placement"), err)
}
numPiecesOutOfPlacement = len(outOfPlacementNodes)
}
numHealthy := len(pieces) - len(missingPieces) - len(clumpedPieces)
mon.IntVal("checker_segment_total_count").Observe(int64(len(pieces))) //mon:locked
stats.segmentStats.segmentTotalCount.Observe(int64(len(pieces)))
@@ -362,6 +386,8 @@ func (fork *observerFork) process(ctx context.Context, segment *rangedloop.Segme
stats.segmentStats.segmentHealthyCount.Observe(int64(numHealthy))
mon.IntVal("checker_segment_clumped_count").Observe(int64(len(clumpedPieces))) //mon:locked
stats.segmentStats.segmentClumpedCount.Observe(int64(len(clumpedPieces)))
mon.IntVal("checker_segment_off_placement_count").Observe(int64(numPiecesOutOfPlacement)) //mon:locked
stats.segmentStats.segmentOffPlacementCount.Observe(int64(numPiecesOutOfPlacement))
segmentAge := time.Since(segment.CreatedAt)
mon.IntVal("checker_segment_age").Observe(int64(segmentAge.Seconds())) //mon:locked
@@ -374,8 +400,10 @@ func (fork *observerFork) process(ctx context.Context, segment *rangedloop.Segme
// we repair when the number of healthy pieces is less than or equal to the repair threshold and is greater or equal to
// minimum required pieces in redundancy
// except for the case when the repair and success thresholds are the same (a case usually seen during testing)
if numHealthy <= repairThreshold && numHealthy < successThreshold {
// except for the case when the repair and success thresholds are the same (a case usually seen during testing).
// A separate case is when we find pieces that are outside the segment placement; in that case we put the segment
// into the queue right away.
if (numHealthy <= repairThreshold && numHealthy < successThreshold) || numPiecesOutOfPlacement > 0 {
mon.FloatVal("checker_injured_segment_health").Observe(segmentHealth) //mon:locked
stats.segmentStats.injuredSegmentHealth.Observe(segmentHealth)
fork.totalStats.remoteSegmentsNeedingRepair++
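
GetNodesOutOfPlacement returns the subset of the given nodes whose location violates the segment's placement constraint. A rough conceptual sketch of that filter is shown below; the function name nodesOutOfPlacement and the country-code map are hypothetical, the real query runs inside the overlay database, and the AllowedCountry check mirrors the test helper allPiecesInPlacement added further down.

// Rough conceptual sketch of the placement filter; not the actual
// overlay.GetNodesOutOfPlacement implementation.
// Assumed imports: "storj.io/common/storj", "storj.io/common/storj/location".
func nodesOutOfPlacement(countries map[storj.NodeID]location.CountryCode,
	nodeIDs []storj.NodeID, placement storj.PlacementConstraint) []storj.NodeID {
	var out []storj.NodeID
	for _, id := range nodeIDs {
		// a piece is out of placement when its node's country is not
		// allowed by the segment's placement constraint
		if !placement.AllowedCountry(countries[id]) {
			out = append(out, id)
		}
	}
	return out
}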

View File

@@ -22,8 +22,10 @@ import (
"storj.io/common/uuid"
"storj.io/storj/private/testplanet"
"storj.io/storj/satellite"
"storj.io/storj/satellite/buckets"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/metabase/rangedloop"
"storj.io/storj/satellite/overlay"
"storj.io/storj/satellite/repair/checker"
"storj.io/storj/satellite/repair/queue"
)
@@ -580,3 +582,90 @@ func BenchmarkRemoteSegment(b *testing.B) {
})
}
func TestObserver_PlacementCheck(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
Satellite: testplanet.ReconfigureRS(1, 2, 4, 4),
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
repairQueue := planet.Satellites[0].DB.RepairQueue()
require.NoError(t, planet.Uplinks[0].CreateBucket(ctx, planet.Satellites[0], "testbucket"))
_, err := planet.Satellites[0].API.Buckets.Service.UpdateBucket(ctx, buckets.Bucket{
ProjectID: planet.Uplinks[0].Projects[0].ID,
Name: "testbucket",
Placement: storj.EU,
})
require.NoError(t, err)
for _, node := range planet.StorageNodes {
require.NoError(t, planet.Satellites[0].Overlay.Service.TestNodeCountryCode(ctx, node.ID(), "PL"))
}
err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "testbucket", "object", testrand.Bytes(5*memory.KiB))
require.NoError(t, err)
type testCase struct {
piecesOutOfPlacement int
}
for _, tc := range []testCase{
// all pieces/nodes are out of placement
{piecesOutOfPlacement: 4},
// few pieces/nodes are out of placement
{piecesOutOfPlacement: 2},
} {
for _, node := range planet.StorageNodes {
require.NoError(t, planet.Satellites[0].Overlay.Service.TestNodeCountryCode(ctx, node.ID(), "PL"))
}
require.NoError(t, planet.Satellites[0].Repairer.Overlay.DownloadSelectionCache.Refresh(ctx))
segments, err := planet.Satellites[0].Metabase.DB.TestingAllSegments(ctx)
require.NoError(t, err)
require.Len(t, segments, 1)
require.Len(t, segments[0].Pieces, 4)
for _, piece := range segments[0].Pieces[:tc.piecesOutOfPlacement] {
require.NoError(t, planet.Satellites[0].Overlay.Service.TestNodeCountryCode(ctx, piece.StorageNode, "US"))
}
// confirm that some pieces are out of placement
ok, err := allPiecesInPlacement(ctx, planet.Satellites[0].Overlay.Service, segments[0].Pieces, segments[0].Placement)
require.NoError(t, err)
require.False(t, ok)
require.NoError(t, planet.Satellites[0].Repairer.Overlay.DownloadSelectionCache.Refresh(ctx))
_, err = planet.Satellites[0].RangedLoop.RangedLoop.Service.RunOnce(ctx)
require.NoError(t, err)
injuredSegment, err := repairQueue.Select(ctx)
require.NoError(t, err)
err = repairQueue.Delete(ctx, injuredSegment)
require.NoError(t, err)
require.Equal(t, segments[0].StreamID, injuredSegment.StreamID)
count, err := repairQueue.Count(ctx)
require.NoError(t, err)
require.Zero(t, count)
}
})
}
func allPiecesInPlacement(ctx context.Context, overlay *overlay.Service, pieces metabase.Pieces, placement storj.PlacementConstraint) (bool, error) {
for _, piece := range pieces {
nodeDossier, err := overlay.Get(ctx, piece.StorageNode)
if err != nil {
return false, err
}
if !placement.AllowedCountry(nodeDossier.CountryCode) {
return false, nil
}
}
return true, nil
}

View File

@@ -103,6 +103,7 @@ type segmentRSStats struct {
segmentTotalCount *monkit.IntVal
segmentHealthyCount *monkit.IntVal
segmentClumpedCount *monkit.IntVal
segmentOffPlacementCount *monkit.IntVal
segmentAge *monkit.IntVal
segmentHealth *monkit.FloatVal
injuredSegmentHealth *monkit.FloatVal
@@ -115,6 +116,7 @@ func newSegmentRSStats(rs string) *segmentRSStats {
segmentTotalCount: monkit.NewIntVal(monkit.NewSeriesKey("tagged_repair_stats").WithTag("name", "checker_segment_total_count").WithTag("rs_scheme", rs)),
segmentHealthyCount: monkit.NewIntVal(monkit.NewSeriesKey("tagged_repair_stats").WithTag("name", "checker_segment_healthy_count").WithTag("rs_scheme", rs)),
segmentClumpedCount: monkit.NewIntVal(monkit.NewSeriesKey("tagged_repair_stats").WithTag("name", "checker_segment_clumped_count").WithTag("rs_scheme", rs)),
segmentOffPlacementCount: monkit.NewIntVal(monkit.NewSeriesKey("tagged_repair_stats").WithTag("name", "checker_segment_off_placement_count").WithTag("rs_scheme", rs)),
segmentAge: monkit.NewIntVal(monkit.NewSeriesKey("tagged_repair_stats").WithTag("name", "checker_segment_age").WithTag("rs_scheme", rs)),
segmentHealth: monkit.NewFloatVal(monkit.NewSeriesKey("tagged_repair_stats").WithTag("name", "checker_segment_health").WithTag("rs_scheme", rs)),
injuredSegmentHealth: monkit.NewFloatVal(monkit.NewSeriesKey("tagged_repair_stats").WithTag("name", "checker_injured_segment_health").WithTag("rs_scheme", rs)),

View File

@@ -91,6 +91,9 @@
# Treat pieces on the same network as in need of repair
# checker.do-declumping: false
# Treat pieces out of segment placement as in need of repair
# checker.do-placement-check: true
# how frequently checker should check for bad segments
# checker.interval: 30s