satellite/repair: unify repair logic

The repair checker and the repair worker both need to determine which
pieces are healthy, which are retrievable, and which should be replaced,
but they have been doing so in different ways in different places, which
has been a source of bugs. The same term could carry subtly different
meanings in the two subsystems, causing considerable confusion.

With this change, the piece- and node-classification logic is
consolidated into one place within the satellite/repair package, so that
both subsystems can use it. This ought to make decision-making code more
concise and more readable.

The consolidated classification logic has been expanded to create more
sets, so that the decision-making code does not need to do as much
precalculation. It should now be clearer in comments and code that a
piece can belong to multiple sets arbitrarily (except where the
definition of the sets makes this logically impossible), and what the
precise meaning of each set is. These sets include Missing, Suspended,
Clumped, OutOfPlacement, InExcludedCountry, ForcingRepair,
UnhealthyRetrievable, Unhealthy, Retrievable, and Healthy.
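
For illustration, here is a minimal sketch of how a caller is meant to
consume the consolidated classification. The helper name, thresholds,
and variables are hypothetical; only the package paths, types, and the
ClassifySegmentPieces signature come from this change:

    package example

    import (
        "storj.io/common/storj/location"
        "storj.io/storj/satellite/metabase"
        "storj.io/storj/satellite/nodeselection"
        "storj.io/storj/satellite/repair"
    )

    // needsRepair is a hypothetical helper showing how the sets combine.
    func needsRepair(segment metabase.Segment, nodes []nodeselection.SelectedNode,
        excluded map[location.CountryCode]struct{}, filter nodeselection.NodeFilter,
        repairThreshold, successThreshold int) bool {
        // One pass classifies every piece; a piece may appear in several sets.
        result := repair.ClassifySegmentPieces(segment.Pieces, nodes, excluded,
            true /* doPlacementCheck */, true /* doDeclumping */, filter)

        // Set invariants (see satellite/repair/classification.go):
        //   Retrievable          = pieces NOT in Missing
        //   Unhealthy            = Missing OR Suspended OR Clumped OR
        //                          OutOfPlacement OR InExcludedCountry
        //   UnhealthyRetrievable = Unhealthy AND Retrievable
        //   Healthy              = pieces NOT in Unhealthy
        numHealthy := len(result.Healthy)

        // Repair when health drops to the repair threshold, or when any
        // piece forces a repair (currently: pieces out of placement).
        return (numHealthy <= repairThreshold && numHealthy < successThreshold) ||
            len(result.ForcingRepair) > 0
    }

Because every set is keyed by piece number, callers can combine the
sets with plain map lookups instead of recomputing counts.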

Some other side effects of this change:

* CreatePutRepairOrderLimits no longer needs to special-case excluded
  countries; it can just create as many order limits as requested (by
  way of len(newNodes)).
* The repair checker will now queue a segment for repair when there are
  any pieces out of placement. The code calls this "forcing a repair".
* The checker.ReliabilityCache is now accessed by way of a GetNodes()
  function similar to the one on the overlay. The classification methods
  like MissingPieces(), OutOfPlacementPieces(), and
  PiecesNodesLastNetsInOrder() are removed in favor of the
  classification logic in satellite/repair/classification.go. This
  means the reliability cache no longer needs access to the placement
  rules or excluded countries list.
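
As a rough sketch, the checker's per-segment flow now looks like this
(condensed from the observer changes below; error counting, stats, and
the surrounding function are omitted, and repairThreshold and
successThreshold stand in for the observer's local values):

    // Look up cached node records for the segment's pieces, in piece order.
    nodeIDs := make([]storj.NodeID, len(segment.Pieces))
    for i, piece := range segment.Pieces {
        nodeIDs[i] = piece.StorageNode
    }
    selectedNodes, err := fork.nodesCache.GetNodes(ctx, segment.CreatedAt, nodeIDs)
    if err != nil {
        return Error.New("error getting node information for pieces: %w", err)
    }

    // A single classification pass replaces the old MissingPieces(),
    // OutOfPlacementPieces(), and PiecesNodesLastNetsInOrder() calls.
    piecesCheck := repair.ClassifySegmentPieces(segment.Pieces, selectedNodes,
        fork.excludedCountryCodes, fork.doPlacementCheck, fork.doDeclumping,
        fork.placementRules(segment.Placement))

    // Any out-of-placement piece forces the segment into the repair queue,
    // even if it is otherwise healthy enough.
    if (len(piecesCheck.Healthy) <= repairThreshold && len(piecesCheck.Healthy) < successThreshold) ||
        len(piecesCheck.ForcingRepair) > 0 {
        // queue the segment for repair
    }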

Change-Id: I105109fb94ee126952f07d747c6e11131164fadb
paul cannon 2023-09-10 23:07:39 -05:00
parent c44e3d78d8
commit 1b8bd6c082
14 changed files with 484 additions and 600 deletions

View File

@ -281,10 +281,8 @@ func reuploadSegment(ctx context.Context, log *zap.Logger, peer *satellite.Repai
 return errs.New("not enough new nodes were found for repair: min %v got %v", redundancy.RepairThreshold(), len(newNodes))
 }
-optimalThresholdMultiplier := float64(1) // is this value fine?
-numHealthyInExcludedCountries := 0
 putLimits, putPrivateKey, err := peer.Orders.Service.CreatePutRepairOrderLimits(ctx, segment, make([]*pb.AddressedOrderLimit, len(newNodes)),
-make(map[int32]struct{}), newNodes, optimalThresholdMultiplier, numHealthyInExcludedCountries)
+make(map[uint16]struct{}), newNodes)
 if err != nil {
 return errs.New("could not create PUT_REPAIR order limits: %w", err)
 }

View File

@ -128,7 +128,6 @@ storj.io/storj/satellite/repair/repairer."repair_too_many_nodes_failed" Meter
 storj.io/storj/satellite/repair/repairer."repair_unnecessary" Meter
 storj.io/storj/satellite/repair/repairer."repairer_segments_below_min_req" Counter
 storj.io/storj/satellite/repair/repairer."segment_deleted_before_repair" Meter
-storj.io/storj/satellite/repair/repairer."segment_repair_count" IntVal
 storj.io/storj/satellite/repair/repairer."segment_time_until_repair" IntVal
 storj.io/storj/satellite/repair/repairer."time_for_repair" FloatVal
 storj.io/storj/satellite/repair/repairer."time_since_checker_queue" FloatVal

View File

@ -1106,6 +1106,7 @@ func TestAuditRepairedSegmentInExcludedCountries(t *testing.T) {
 Satellite: testplanet.Combine(
 func(log *zap.Logger, index int, config *satellite.Config) {
 config.Repairer.InMemoryRepair = true
+config.Repairer.MaxExcessRateOptimalThreshold = 0.0
 },
 testplanet.ReconfigureRS(3, 5, 8, 10),
 testplanet.RepairExcludedCountryCodes([]string{"FR", "BE"}),
@ -1138,7 +1139,8 @@ func TestAuditRepairedSegmentInExcludedCountries(t *testing.T) {
 nodesInExcluded = append(nodesInExcluded, remotePieces[i].StorageNode)
 }
-// make extra pieces after optimal bad
+// make extra pieces after optimal bad, so we know there are exactly OptimalShares
+// retrievable shares. numExcluded of them are in an excluded country.
 for i := int(segment.Redundancy.OptimalShares); i < len(remotePieces); i++ {
 err = planet.StopNodeAndUpdate(ctx, planet.FindNode(remotePieces[i].StorageNode))
 require.NoError(t, err)
@ -1167,17 +1169,29 @@ func TestAuditRepairedSegmentInExcludedCountries(t *testing.T) {
 require.NotEqual(t, segment.Pieces, segmentAfterRepair.Pieces)
 require.Equal(t, 10, len(segmentAfterRepair.Pieces))
-// check excluded area nodes still exist
-for i, n := range nodesInExcluded {
-var found bool
+// the number of nodes that should still be online holding intact pieces, not in
+// excluded countries
+expectHealthyNodes := int(segment.Redundancy.OptimalShares) - numExcluded
+// repair should make this many new pieces to get the segment up to OptimalShares
+// shares, not counting excluded-country nodes
+expectNewPieces := int(segment.Redundancy.OptimalShares) - expectHealthyNodes
+// so there should be this many pieces after repair, not counting excluded-country
+// nodes
+expectPiecesAfterRepair := expectHealthyNodes + expectNewPieces
+// so there should be this many excluded-country pieces left in the segment (we
+// couldn't keep all of them, or we would have had more than TotalShares pieces).
+expectRemainingExcluded := int(segment.Redundancy.TotalShares) - expectPiecesAfterRepair
+found := 0
+for _, nodeID := range nodesInExcluded {
 for _, p := range segmentAfterRepair.Pieces {
-if p.StorageNode == n {
-found = true
+if p.StorageNode == nodeID {
+found++
 break
 }
 }
-require.True(t, found, fmt.Sprintf("node %s not in segment, but should be\n", segmentAfterRepair.Pieces[i].StorageNode.String()))
 }
+require.Equal(t, expectRemainingExcluded, found, "found wrong number of excluded-country pieces after repair")
 nodesInPointer := make(map[storj.NodeID]bool)
 for _, n := range segmentAfterRepair.Pieces {
 // check for duplicates

View File

@ -5,7 +5,6 @@ package orders
 import (
 "context"
-"math"
 mathrand "math/rand"
 "sync"
 "time"
@ -459,18 +458,12 @@ func (service *Service) CreateGetRepairOrderLimits(ctx context.Context, segment
 }
 // CreatePutRepairOrderLimits creates the order limits for uploading the repaired pieces of segment to newNodes.
-func (service *Service) CreatePutRepairOrderLimits(ctx context.Context, segment metabase.Segment, getOrderLimits []*pb.AddressedOrderLimit, healthySet map[int32]struct{}, newNodes []*nodeselection.SelectedNode, optimalThresholdMultiplier float64, numPiecesInExcludedCountries int) (_ []*pb.AddressedOrderLimit, _ storj.PiecePrivateKey, err error) {
+func (service *Service) CreatePutRepairOrderLimits(ctx context.Context, segment metabase.Segment, getOrderLimits []*pb.AddressedOrderLimit, healthySet map[uint16]struct{}, newNodes []*nodeselection.SelectedNode) (_ []*pb.AddressedOrderLimit, _ storj.PiecePrivateKey, err error) {
 defer mon.Task()(&ctx)(&err)
 // Create the order limits for being used to upload the repaired pieces
 pieceSize := segment.PieceSize()
 totalPieces := int(segment.Redundancy.TotalShares)
-totalPiecesAfterRepair := int(math.Ceil(float64(segment.Redundancy.OptimalShares)*optimalThresholdMultiplier)) + numPiecesInExcludedCountries
-if totalPiecesAfterRepair > totalPieces {
-totalPiecesAfterRepair = totalPieces
-}
 var numRetrievablePieces int
 for _, o := range getOrderLimits {
@ -479,8 +472,6 @@ func (service *Service) CreatePutRepairOrderLimits(ctx context.Context, segment
 }
 }
-totalPiecesToRepair := totalPiecesAfterRepair - len(healthySet)
 limits := make([]*pb.AddressedOrderLimit, totalPieces)
 expirationDate := time.Time{}
@ -493,7 +484,7 @@ func (service *Service) CreatePutRepairOrderLimits(ctx context.Context, segment
 return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
 }
-var pieceNum int32
+var pieceNum uint16
 for _, node := range newNodes {
 for int(pieceNum) < totalPieces {
 _, isHealthy := healthySet[pieceNum]
@ -507,18 +498,13 @@ func (service *Service) CreatePutRepairOrderLimits(ctx context.Context, segment
 return nil, storj.PiecePrivateKey{}, Error.New("piece num greater than total pieces: %d >= %d", pieceNum, totalPieces)
 }
-limit, err := signer.Sign(ctx, resolveStorageNode_Selected(node, false), pieceNum)
+limit, err := signer.Sign(ctx, resolveStorageNode_Selected(node, false), int32(pieceNum))
 if err != nil {
 return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
 }
 limits[pieceNum] = limit
 pieceNum++
-totalPiecesToRepair--
-if totalPiecesToRepair == 0 {
-break
-}
 }
 return limits, signer.PrivateKey, nil

View File

@ -17,10 +17,9 @@ import (
"golang.org/x/exp/slices" "golang.org/x/exp/slices"
"storj.io/common/storj" "storj.io/common/storj"
"storj.io/common/storj/location"
"storj.io/common/uuid" "storj.io/common/uuid"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/metabase/rangedloop" "storj.io/storj/satellite/metabase/rangedloop"
"storj.io/storj/satellite/nodeselection"
"storj.io/storj/satellite/overlay" "storj.io/storj/satellite/overlay"
"storj.io/storj/satellite/repair" "storj.io/storj/satellite/repair"
"storj.io/storj/satellite/repair/queue" "storj.io/storj/satellite/repair/queue"
@ -40,8 +39,10 @@ type Observer struct {
repairOverrides RepairOverridesMap repairOverrides RepairOverridesMap
nodeFailureRate float64 nodeFailureRate float64
repairQueueBatchSize int repairQueueBatchSize int
excludedCountryCodes map[location.CountryCode]struct{}
doDeclumping bool doDeclumping bool
doPlacementCheck bool doPlacementCheck bool
placementRules overlay.PlacementRules
// the following are reset on each iteration // the following are reset on each iteration
startTime time.Time startTime time.Time
@ -53,17 +54,26 @@ type Observer struct {
// NewObserver creates new checker observer instance. // NewObserver creates new checker observer instance.
func NewObserver(logger *zap.Logger, repairQueue queue.RepairQueue, overlay *overlay.Service, placementRules overlay.PlacementRules, config Config) *Observer { func NewObserver(logger *zap.Logger, repairQueue queue.RepairQueue, overlay *overlay.Service, placementRules overlay.PlacementRules, config Config) *Observer {
excludedCountryCodes := make(map[location.CountryCode]struct{})
for _, countryCode := range config.RepairExcludedCountryCodes {
if cc := location.ToCountryCode(countryCode); cc != location.None {
excludedCountryCodes[cc] = struct{}{}
}
}
return &Observer{ return &Observer{
logger: logger, logger: logger,
repairQueue: repairQueue, repairQueue: repairQueue,
nodesCache: NewReliabilityCache(overlay, config.ReliabilityCacheStaleness, placementRules, config.RepairExcludedCountryCodes), nodesCache: NewReliabilityCache(overlay, config.ReliabilityCacheStaleness),
overlayService: overlay, overlayService: overlay,
repairOverrides: config.RepairOverrides.GetMap(), repairOverrides: config.RepairOverrides.GetMap(),
nodeFailureRate: config.NodeFailureRate, nodeFailureRate: config.NodeFailureRate,
repairQueueBatchSize: config.RepairQueueInsertBatchSize, repairQueueBatchSize: config.RepairQueueInsertBatchSize,
excludedCountryCodes: excludedCountryCodes,
doDeclumping: config.DoDeclumping, doDeclumping: config.DoDeclumping,
doPlacementCheck: config.DoPlacementCheck, doPlacementCheck: config.DoPlacementCheck,
placementRules: placementRules,
statsCollector: make(map[string]*observerRSStats), statsCollector: make(map[string]*observerRSStats),
} }
} }
@ -231,11 +241,15 @@ type observerFork struct {
nodeFailureRate float64 nodeFailureRate float64
getNodesEstimate func(ctx context.Context) (int, error) getNodesEstimate func(ctx context.Context) (int, error)
log *zap.Logger log *zap.Logger
doDeclumping bool
doPlacementCheck bool
lastStreamID uuid.UUID lastStreamID uuid.UUID
totalStats aggregateStats totalStats aggregateStats
// define from which countries nodes should be marked as offline
excludedCountryCodes map[location.CountryCode]struct{}
doDeclumping bool
doPlacementCheck bool
placementRules overlay.PlacementRules
getObserverStats func(string) *observerRSStats getObserverStats func(string) *observerRSStats
} }
@ -243,17 +257,19 @@ type observerFork struct {
func newObserverFork(observer *Observer) rangedloop.Partial { func newObserverFork(observer *Observer) rangedloop.Partial {
// we can only share thread-safe objects. // we can only share thread-safe objects.
return &observerFork{ return &observerFork{
repairQueue: observer.createInsertBuffer(), repairQueue: observer.createInsertBuffer(),
nodesCache: observer.nodesCache, nodesCache: observer.nodesCache,
overlayService: observer.overlayService, overlayService: observer.overlayService,
rsStats: make(map[string]*partialRSStats), rsStats: make(map[string]*partialRSStats),
repairOverrides: observer.repairOverrides, repairOverrides: observer.repairOverrides,
nodeFailureRate: observer.nodeFailureRate, nodeFailureRate: observer.nodeFailureRate,
getNodesEstimate: observer.getNodesEstimate, getNodesEstimate: observer.getNodesEstimate,
log: observer.logger, log: observer.logger,
doDeclumping: observer.doDeclumping, excludedCountryCodes: observer.excludedCountryCodes,
doPlacementCheck: observer.doPlacementCheck, doDeclumping: observer.doDeclumping,
getObserverStats: observer.getObserverStats, doPlacementCheck: observer.doPlacementCheck,
placementRules: observer.placementRules,
getObserverStats: observer.getObserverStats,
} }
} }
@ -334,52 +350,28 @@ func (fork *observerFork) process(ctx context.Context, segment *rangedloop.Segme
return Error.New("could not get estimate of total number of nodes: %w", err) return Error.New("could not get estimate of total number of nodes: %w", err)
} }
missingPieces, err := fork.nodesCache.MissingPieces(ctx, segment.CreatedAt, segment.Pieces) nodeIDs := make([]storj.NodeID, len(pieces))
for i, piece := range pieces {
nodeIDs[i] = piece.StorageNode
}
selectedNodes, err := fork.nodesCache.GetNodes(ctx, segment.CreatedAt, nodeIDs)
if err != nil { if err != nil {
fork.totalStats.remoteSegmentsFailedToCheck++ fork.totalStats.remoteSegmentsFailedToCheck++
stats.iterationAggregates.remoteSegmentsFailedToCheck++ stats.iterationAggregates.remoteSegmentsFailedToCheck++
return Error.New("error getting missing pieces: %w", err) return Error.New("error getting node information for pieces: %w", err)
} }
piecesCheck := repair.ClassifySegmentPieces(segment.Pieces, selectedNodes, fork.excludedCountryCodes, fork.doPlacementCheck, fork.doDeclumping, fork.placementRules(segment.Placement))
var clumpedPieces metabase.Pieces numHealthy := len(piecesCheck.Healthy)
var lastNets []string
nodeFilter := fork.nodesCache.placementRules(segment.Placement)
if fork.doDeclumping && !nodeselection.AllowSameSubnet(nodeFilter) {
// if multiple pieces are on the same last_net, keep only the first one. The rest are
// to be considered retrievable but unhealthy.
lastNets, err = fork.nodesCache.PiecesNodesLastNetsInOrder(ctx, segment.CreatedAt, pieces)
if err != nil {
fork.totalStats.remoteSegmentsFailedToCheck++
stats.iterationAggregates.remoteSegmentsFailedToCheck++
return errs.Combine(Error.New("error determining node last_nets"), err)
}
clumpedPieces = repair.FindClumpedPieces(segment.Pieces, lastNets)
}
numOutOfPlacementPieces := 0
if fork.doPlacementCheck {
outOfPlacementPieces, err := fork.nodesCache.OutOfPlacementPieces(ctx, segment.CreatedAt, segment.Pieces, segment.Placement)
if err != nil {
fork.totalStats.remoteSegmentsFailedToCheck++
stats.iterationAggregates.remoteSegmentsFailedToCheck++
return errs.Combine(Error.New("error determining nodes placement"), err)
}
numOutOfPlacementPieces = len(outOfPlacementPieces)
}
numHealthy := len(pieces) - len(missingPieces) - len(clumpedPieces)
mon.IntVal("checker_segment_total_count").Observe(int64(len(pieces))) //mon:locked mon.IntVal("checker_segment_total_count").Observe(int64(len(pieces))) //mon:locked
stats.segmentStats.segmentTotalCount.Observe(int64(len(pieces))) stats.segmentStats.segmentTotalCount.Observe(int64(len(pieces)))
mon.IntVal("checker_segment_healthy_count").Observe(int64(numHealthy)) //mon:locked mon.IntVal("checker_segment_healthy_count").Observe(int64(numHealthy)) //mon:locked
stats.segmentStats.segmentHealthyCount.Observe(int64(numHealthy)) stats.segmentStats.segmentHealthyCount.Observe(int64(numHealthy))
mon.IntVal("checker_segment_clumped_count").Observe(int64(len(clumpedPieces))) //mon:locked mon.IntVal("checker_segment_clumped_count").Observe(int64(len(piecesCheck.Clumped))) //mon:locked
stats.segmentStats.segmentClumpedCount.Observe(int64(len(clumpedPieces))) stats.segmentStats.segmentClumpedCount.Observe(int64(len(piecesCheck.Clumped)))
mon.IntVal("checker_segment_off_placement_count").Observe(int64(numOutOfPlacementPieces)) //mon:locked mon.IntVal("checker_segment_off_placement_count").Observe(int64(len(piecesCheck.OutOfPlacement))) //mon:locked
stats.segmentStats.segmentOffPlacementCount.Observe(int64(numOutOfPlacementPieces)) stats.segmentStats.segmentOffPlacementCount.Observe(int64(len(piecesCheck.OutOfPlacement)))
segmentAge := time.Since(segment.CreatedAt) segmentAge := time.Since(segment.CreatedAt)
mon.IntVal("checker_segment_age").Observe(int64(segmentAge.Seconds())) //mon:locked mon.IntVal("checker_segment_age").Observe(int64(segmentAge.Seconds())) //mon:locked
@ -395,7 +387,7 @@ func (fork *observerFork) process(ctx context.Context, segment *rangedloop.Segme
// except for the case when the repair and success thresholds are the same (a case usually seen during testing). // except for the case when the repair and success thresholds are the same (a case usually seen during testing).
// separate case is when we find pieces which are outside segment placement. in such case we are putting segment // separate case is when we find pieces which are outside segment placement. in such case we are putting segment
// into queue right away. // into queue right away.
if (numHealthy <= repairThreshold && numHealthy < successThreshold) || numOutOfPlacementPieces > 0 { if (numHealthy <= repairThreshold && numHealthy < successThreshold) || len(piecesCheck.ForcingRepair) > 0 {
mon.FloatVal("checker_injured_segment_health").Observe(segmentHealth) //mon:locked mon.FloatVal("checker_injured_segment_health").Observe(segmentHealth) //mon:locked
stats.segmentStats.injuredSegmentHealth.Observe(segmentHealth) stats.segmentStats.injuredSegmentHealth.Observe(segmentHealth)
fork.totalStats.remoteSegmentsNeedingRepair++ fork.totalStats.remoteSegmentsNeedingRepair++
@ -418,8 +410,7 @@ func (fork *observerFork) process(ctx context.Context, segment *rangedloop.Segme
} }
// monitor irreparable segments // monitor irreparable segments
numRetrievable := len(pieces) - len(missingPieces) if len(piecesCheck.Retrievable) < required {
if numRetrievable < required {
if !slices.Contains(fork.totalStats.objectsLost, segment.StreamID) { if !slices.Contains(fork.totalStats.objectsLost, segment.StreamID) {
fork.totalStats.objectsLost = append(fork.totalStats.objectsLost, segment.StreamID) fork.totalStats.objectsLost = append(fork.totalStats.objectsLost, segment.StreamID)
} }
@ -449,18 +440,24 @@ func (fork *observerFork) process(ctx context.Context, segment *rangedloop.Segme
mon.Counter("checker_segments_below_min_req").Inc(1) //mon:locked mon.Counter("checker_segments_below_min_req").Inc(1) //mon:locked
stats.segmentStats.segmentsBelowMinReq.Inc(1) stats.segmentStats.segmentsBelowMinReq.Inc(1)
var unhealthyNodes []string var missingNodes []string
for _, p := range missingPieces { for _, piece := range pieces {
unhealthyNodes = append(unhealthyNodes, p.StorageNode.String()) if _, isMissing := piecesCheck.Missing[piece.Number]; isMissing {
missingNodes = append(missingNodes, piece.StorageNode.String())
}
} }
fork.log.Warn("checker found irreparable segment", zap.String("Segment StreamID", segment.StreamID.String()), zap.Int("Segment Position", fork.log.Warn("checker found irreparable segment", zap.String("Segment StreamID", segment.StreamID.String()), zap.Int("Segment Position",
int(segment.Position.Encode())), zap.Int("total pieces", len(pieces)), zap.Int("min required", required), zap.String("unhealthy node IDs", strings.Join(unhealthyNodes, ","))) int(segment.Position.Encode())), zap.Int("total pieces", len(pieces)), zap.Int("min required", required), zap.String("unavailable node IDs", strings.Join(missingNodes, ",")))
} else if numRetrievable > repairThreshold { } else if len(piecesCheck.Clumped) > 0 && len(piecesCheck.Healthy)+len(piecesCheck.Clumped) > repairThreshold && len(piecesCheck.ForcingRepair) == 0 {
// This segment is to be repaired because of clumping (it wouldn't need repair yet // This segment is to be repaired because of clumping (it wouldn't need repair yet
// otherwise). Produce a brief report of where the clumping occurred so that we have // otherwise). Produce a brief report of where the clumping occurred so that we have
// a better understanding of the cause. // a better understanding of the cause.
lastNets := make([]string, len(pieces))
for i, node := range selectedNodes {
lastNets[i] = node.LastNet
}
clumpedNets := clumpingReport{lastNets: lastNets} clumpedNets := clumpingReport{lastNets: lastNets}
fork.log.Info("segment needs repair because of clumping", zap.Stringer("Segment StreamID", segment.StreamID), zap.Uint64("Segment Position", segment.Position.Encode()), zap.Int("total pieces", len(pieces)), zap.Int("min required", required), zap.Stringer("clumping", &clumpedNets)) fork.log.Info("segment needs repair only because of clumping", zap.Stringer("Segment StreamID", segment.StreamID), zap.Uint64("Segment Position", segment.Position.Encode()), zap.Int("total pieces", len(pieces)), zap.Int("min required", required), zap.Stringer("clumping", &clumpedNets))
} }
} else { } else {
if numHealthy > repairThreshold && numHealthy <= (repairThreshold+len(fork.totalStats.remoteSegmentsOverThreshold)) { if numHealthy > repairThreshold && numHealthy <= (repairThreshold+len(fork.totalStats.remoteSegmentsOverThreshold)) {

View File

@ -47,13 +47,16 @@ func TestObserverForkProcess(t *testing.T) {
 }
 ctx := testcontext.New(t)
+placementRules := overlay.ConfigurablePlacementRule{}
+parsed, err := placementRules.Parse()
+require.NoError(t, err)
 createDefaultObserver := func() *Observer {
 o := &Observer{
 statsCollector: make(map[string]*observerRSStats),
 nodesCache: &ReliabilityCache{
 staleness: time.Hour,
-placementRules: overlay.NewPlacementDefinitions().CreateFilters,
 },
+placementRules: parsed.CreateFilters,
 }
 o.nodesCache.state.Store(&reliabilityState{
@ -72,6 +75,7 @@ func TestObserverForkProcess(t *testing.T) {
 rsStats: make(map[string]*partialRSStats),
 doDeclumping: o.doDeclumping,
 doPlacementCheck: o.doPlacementCheck,
+placementRules: o.placementRules,
 getNodesEstimate: o.getNodesEstimate,
 nodesCache: o.nodesCache,
 repairQueue: queue.NewInsertBuffer(q, 1000),
@ -146,7 +150,7 @@ func TestObserverForkProcess(t *testing.T) {
 require.NoError(t, placements.Set(fmt.Sprintf(`10:annotated(country("DE"),annotation("%s","%s"))`, nodeselection.AutoExcludeSubnet, nodeselection.AutoExcludeSubnetOFF)))
 parsed, err := placements.Parse()
 require.NoError(t, err)
-o.nodesCache.placementRules = parsed.CreateFilters
+o.placementRules = parsed.CreateFilters
 q := queue.MockRepairQueue{}
 fork := createFork(o, &q)

View File

@ -10,8 +10,6 @@ import (
"time" "time"
"storj.io/common/storj" "storj.io/common/storj"
"storj.io/common/storj/location"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/nodeselection" "storj.io/storj/satellite/nodeselection"
"storj.io/storj/satellite/overlay" "storj.io/storj/satellite/overlay"
) )
@ -23,11 +21,8 @@ import (
type ReliabilityCache struct { type ReliabilityCache struct {
overlay *overlay.Service overlay *overlay.Service
staleness time.Duration staleness time.Duration
// define from which countries nodes should be marked as offline mu sync.Mutex
excludedCountryCodes map[location.CountryCode]struct{} state atomic.Value // contains immutable *reliabilityState
mu sync.Mutex
state atomic.Value // contains immutable *reliabilityState
placementRules overlay.PlacementRules
} }
// reliabilityState. // reliabilityState.
@ -37,19 +32,10 @@ type reliabilityState struct {
} }
// NewReliabilityCache creates a new reliability checking cache. // NewReliabilityCache creates a new reliability checking cache.
func NewReliabilityCache(overlay *overlay.Service, staleness time.Duration, placementRules overlay.PlacementRules, excludedCountries []string) *ReliabilityCache { func NewReliabilityCache(overlay *overlay.Service, staleness time.Duration) *ReliabilityCache {
excludedCountryCodes := make(map[location.CountryCode]struct{})
for _, countryCode := range excludedCountries {
if cc := location.ToCountryCode(countryCode); cc != location.None {
excludedCountryCodes[cc] = struct{}{}
}
}
return &ReliabilityCache{ return &ReliabilityCache{
overlay: overlay, overlay: overlay,
staleness: staleness, staleness: staleness,
placementRules: placementRules,
excludedCountryCodes: excludedCountryCodes,
} }
} }
@ -75,69 +61,20 @@ func (cache *ReliabilityCache) NumNodes(ctx context.Context) (numNodes int, err
return len(state.nodeByID), nil return len(state.nodeByID), nil
} }
// MissingPieces returns piece indices that are unreliable with the given staleness period. // GetNodes gets the cached SelectedNode records (valid as of the given time) for each of
func (cache *ReliabilityCache) MissingPieces(ctx context.Context, created time.Time, pieces metabase.Pieces) (_ metabase.Pieces, err error) { // the requested node IDs, and returns them in order. If a node is not in the reliability
state, err := cache.loadFast(ctx, created) // cache (that is, it is unknown or disqualified), an empty SelectedNode will be returned
// for the index corresponding to that node ID.
func (cache *ReliabilityCache) GetNodes(ctx context.Context, validUpTo time.Time, nodeIDs []storj.NodeID) ([]nodeselection.SelectedNode, error) {
state, err := cache.loadFast(ctx, validUpTo)
if err != nil { if err != nil {
return nil, err return nil, err
} }
var unreliable metabase.Pieces nodes := make([]nodeselection.SelectedNode, len(nodeIDs))
for _, p := range pieces { for i, nodeID := range nodeIDs {
node, ok := state.nodeByID[p.StorageNode] nodes[i] = state.nodeByID[nodeID]
if !ok || !node.Online || node.Suspended {
unreliable = append(unreliable, p)
} else if _, excluded := cache.excludedCountryCodes[node.CountryCode]; excluded {
unreliable = append(unreliable, p)
}
} }
return unreliable, nil return nodes, nil
}
// OutOfPlacementPieces checks which pieces are out of segment placement. Piece placement is defined by node location which is storing it.
func (cache *ReliabilityCache) OutOfPlacementPieces(ctx context.Context, created time.Time, pieces metabase.Pieces, placement storj.PlacementConstraint) (_ metabase.Pieces, err error) {
defer mon.Task()(&ctx)(nil)
if len(pieces) == 0 {
return metabase.Pieces{}, nil
}
state, err := cache.loadFast(ctx, created)
if err != nil {
return nil, err
}
var outOfPlacementPieces metabase.Pieces
nodeFilters := cache.placementRules(placement)
for _, p := range pieces {
if node, ok := state.nodeByID[p.StorageNode]; ok && !nodeFilters.Match(&node) {
outOfPlacementPieces = append(outOfPlacementPieces, p)
}
}
return outOfPlacementPieces, nil
}
// PiecesNodesLastNetsInOrder returns the /24 subnet for each piece storage node, in order. If a
// requested node is not in the database, an empty string will be returned corresponding to that
// node's last_net.
func (cache *ReliabilityCache) PiecesNodesLastNetsInOrder(ctx context.Context, created time.Time, pieces metabase.Pieces) (lastNets []string, err error) {
defer mon.Task()(&ctx)(nil)
if len(pieces) == 0 {
return []string{}, nil
}
state, err := cache.loadFast(ctx, created)
if err != nil {
return nil, err
}
lastNets = make([]string, len(pieces))
for i, piece := range pieces {
if node, ok := state.nodeByID[piece.StorageNode]; ok {
lastNets[i] = node.LastNet
}
}
return lastNets, nil
} }
func (cache *ReliabilityCache) loadFast(ctx context.Context, validUpTo time.Time) (_ *reliabilityState, err error) { func (cache *ReliabilityCache) loadFast(ctx context.Context, validUpTo time.Time) (_ *reliabilityState, err error) {

View File

@ -13,12 +13,8 @@ import (
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
"storj.io/common/storj" "storj.io/common/storj"
"storj.io/common/storj/location"
"storj.io/common/testcontext" "storj.io/common/testcontext"
"storj.io/common/testrand" "storj.io/common/testrand"
"storj.io/storj/private/testplanet"
"storj.io/storj/satellite"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/nodeevents" "storj.io/storj/satellite/nodeevents"
"storj.io/storj/satellite/nodeselection" "storj.io/storj/satellite/nodeselection"
"storj.io/storj/satellite/overlay" "storj.io/storj/satellite/overlay"
@ -40,13 +36,13 @@ func TestReliabilityCache_Concurrent(t *testing.T) {
ctx.Go(func() error { return overlayCache.Run(cacheCtx) }) ctx.Go(func() error { return overlayCache.Run(cacheCtx) })
defer ctx.Check(overlayCache.Close) defer ctx.Check(overlayCache.Close)
cache := checker.NewReliabilityCache(overlayCache, time.Millisecond, overlay.NewPlacementDefinitions().CreateFilters, []string{}) cache := checker.NewReliabilityCache(overlayCache, time.Millisecond)
var group errgroup.Group var group errgroup.Group
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
group.Go(func() error { group.Go(func() error {
for i := 0; i < 10000; i++ { for i := 0; i < 10000; i++ {
pieces := []metabase.Piece{{StorageNode: testrand.NodeID()}} nodeIDs := []storj.NodeID{testrand.NodeID()}
_, err := cache.MissingPieces(ctx, time.Now(), pieces) _, err := cache.GetNodes(ctx, time.Now(), nodeIDs)
if err != nil { if err != nil {
return err return err
} }
@ -68,59 +64,3 @@ func (fakeOverlayDB) GetParticipatingNodes(context.Context, time.Duration, time.
{ID: testrand.NodeID(), Online: true}, {ID: testrand.NodeID(), Online: true},
}, nil }, nil
} }
func TestReliabilityCache_OutOfPlacementPieces(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
config.Overlay.Node.AsOfSystemTime.Enabled = false
config.Overlay.Node.AsOfSystemTime.DefaultInterval = 0
},
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
overlayService := planet.Satellites[0].Overlay.Service
config := planet.Satellites[0].Config.Checker
rules := overlay.NewPlacementDefinitions()
rules.AddLegacyStaticRules()
cache := checker.NewReliabilityCache(overlayService, config.ReliabilityCacheStaleness, rules.CreateFilters, []string{})
nodesPlacement := func(location location.CountryCode, nodes ...*testplanet.StorageNode) {
for _, node := range nodes {
err := overlayService.TestNodeCountryCode(ctx, node.ID(), location.String())
require.NoError(t, err)
}
require.NoError(t, cache.Refresh(ctx))
}
allPieces := metabase.Pieces{
metabase.Piece{Number: 0, StorageNode: planet.StorageNodes[0].ID()},
metabase.Piece{Number: 1, StorageNode: planet.StorageNodes[1].ID()},
metabase.Piece{Number: 2, StorageNode: planet.StorageNodes[2].ID()},
metabase.Piece{Number: 3, StorageNode: planet.StorageNodes[3].ID()},
}
pieces, err := cache.OutOfPlacementPieces(ctx, time.Now().Add(-time.Hour), metabase.Pieces{}, storj.EU)
require.NoError(t, err)
require.Empty(t, pieces)
nodesPlacement(location.Poland, planet.StorageNodes...)
pieces, err = cache.OutOfPlacementPieces(ctx, time.Now().Add(-time.Hour), allPieces, storj.EU)
require.NoError(t, err)
require.Empty(t, pieces)
pieces, err = cache.OutOfPlacementPieces(ctx, time.Now().Add(-time.Hour), allPieces, storj.US)
require.NoError(t, err)
require.ElementsMatch(t, allPieces, pieces)
nodesPlacement(location.UnitedStates, planet.StorageNodes[:2]...)
pieces, err = cache.OutOfPlacementPieces(ctx, time.Now().Add(-time.Hour), allPieces, storj.EU)
require.NoError(t, err)
require.ElementsMatch(t, allPieces[:2], pieces)
pieces, err = cache.OutOfPlacementPieces(ctx, time.Now().Add(-time.Hour), allPieces, storj.US)
require.NoError(t, err)
require.ElementsMatch(t, allPieces[2:], pieces)
})
}

View File

@ -0,0 +1,172 @@
// Copyright (C) 2023 Storj Labs, Inc.
// See LICENSE for copying information.
package repair
import (
"golang.org/x/exp/maps"
"storj.io/common/storj"
"storj.io/common/storj/location"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/nodeselection"
)
// PiecesCheckResult contains all necessary aggregate information about the state of pieces in a
// segment. The node that should be holding each piece is evaluated to see if it is online and
// whether it is in a clumped IP network, in an excluded country, or out of placement for the
// segment.
type PiecesCheckResult struct {
// ExcludeNodeIDs is a list of all node IDs holding pieces of this segment.
ExcludeNodeIDs []storj.NodeID
// Missing is a set of Piece Numbers which are to be considered as lost and irretrievable.
// (They reside on offline/disqualified/unknown nodes.)
Missing map[uint16]struct{}
// Retrievable contains all Piece Numbers that are retrievable; that is, all piece numbers
// from the segment that are NOT in Missing.
Retrievable map[uint16]struct{}
// Suspended is a set of Piece Numbers which reside on nodes which are suspended.
Suspended map[uint16]struct{}
// Clumped is a set of Piece Numbers which are to be considered unhealthy because of IP
// clumping. (If DoDeclumping is disabled, this set will be empty.)
Clumped map[uint16]struct{}
// OutOfPlacement is a set of Piece Numbers which are unhealthy because of placement rules.
// (If DoPlacementCheck is disabled, this set will be empty.)
OutOfPlacement map[uint16]struct{}
// InExcludedCountry is a set of Piece Numbers which are unhealthy because they are in
// Excluded countries.
InExcludedCountry map[uint16]struct{}
// ForcingRepair is the set of pieces which force a repair operation for this segment (that
// includes, currently, only pieces in OutOfPlacement).
ForcingRepair map[uint16]struct{}
// Unhealthy contains all Piece Numbers which are in Missing OR Suspended OR Clumped OR
// OutOfPlacement OR InExcludedCountry.
Unhealthy map[uint16]struct{}
// UnhealthyRetrievable is the set of pieces that are "unhealthy-but-retrievable". That is,
// pieces that are in Unhealthy AND Retrievable.
UnhealthyRetrievable map[uint16]struct{}
// Healthy contains all Piece Numbers from the segment which are not in Unhealthy.
// (Equivalently: all Piece Numbers from the segment which are NOT in Missing OR
// Suspended OR Clumped OR OutOfPlacement OR InExcludedCountry).
Healthy map[uint16]struct{}
}
// ClassifySegmentPieces classifies the pieces of a segment into the categories
// represented by a PiecesCheckResult. Pieces may be put into multiple
// categories.
func ClassifySegmentPieces(pieces metabase.Pieces, nodes []nodeselection.SelectedNode, excludedCountryCodes map[location.CountryCode]struct{}, doPlacementCheck, doDeclumping bool, filter nodeselection.NodeFilter) (result PiecesCheckResult) {
result.ExcludeNodeIDs = make([]storj.NodeID, len(pieces))
for i, p := range pieces {
result.ExcludeNodeIDs[i] = p.StorageNode
}
// check excluded countries and remove online nodes from missing pieces
result.Missing = make(map[uint16]struct{})
result.Suspended = make(map[uint16]struct{})
result.Retrievable = make(map[uint16]struct{})
result.InExcludedCountry = make(map[uint16]struct{})
for index, nodeRecord := range nodes {
pieceNum := pieces[index].Number
if !nodeRecord.ID.IsZero() && pieces[index].StorageNode != nodeRecord.ID {
panic("wrong order")
}
if nodeRecord.ID.IsZero() || !nodeRecord.Online {
// node ID was not found, or the node is disqualified or exited,
// or it is offline
result.Missing[pieceNum] = struct{}{}
} else {
// node is expected to be online and receiving requests.
result.Retrievable[pieceNum] = struct{}{}
}
if nodeRecord.Suspended {
result.Suspended[pieceNum] = struct{}{}
}
if _, excluded := excludedCountryCodes[nodeRecord.CountryCode]; excluded {
result.InExcludedCountry[pieceNum] = struct{}{}
}
}
if doDeclumping && nodeselection.GetAnnotation(filter, nodeselection.AutoExcludeSubnet) != nodeselection.AutoExcludeSubnetOFF {
// if multiple pieces are on the same last_net, keep only the first one. The rest are
// to be considered retrievable but unhealthy.
lastNets := make(map[string]struct{}, len(pieces))
result.Clumped = make(map[uint16]struct{})
collectClumpedPieces := func(onlineness bool) {
for index, nodeRecord := range nodes {
if nodeRecord.Online != onlineness {
continue
}
if nodeRecord.LastNet == "" {
continue
}
pieceNum := pieces[index].Number
_, ok := lastNets[nodeRecord.LastNet]
if ok {
// this LastNet was already seen
result.Clumped[pieceNum] = struct{}{}
} else {
// add to the list of seen LastNets
lastNets[nodeRecord.LastNet] = struct{}{}
}
}
}
// go over online nodes first, so that if we have to remove clumped pieces, we prefer
// to remove offline ones over online ones.
collectClumpedPieces(true)
collectClumpedPieces(false)
}
if doPlacementCheck {
// mark all pieces that are out of placement.
result.OutOfPlacement = make(map[uint16]struct{})
for index, nodeRecord := range nodes {
if nodeRecord.ID.IsZero() {
continue
}
if filter.Match(&nodeRecord) {
continue
}
pieceNum := pieces[index].Number
result.OutOfPlacement[pieceNum] = struct{}{}
}
}
// ForcingRepair = OutOfPlacement only, for now
result.ForcingRepair = make(map[uint16]struct{})
maps.Copy(result.ForcingRepair, result.OutOfPlacement)
// Unhealthy = Missing OR Suspended OR Clumped OR OutOfPlacement OR InExcludedCountry
result.Unhealthy = make(map[uint16]struct{})
maps.Copy(result.Unhealthy, result.Missing)
maps.Copy(result.Unhealthy, result.Suspended)
maps.Copy(result.Unhealthy, result.Clumped)
maps.Copy(result.Unhealthy, result.OutOfPlacement)
maps.Copy(result.Unhealthy, result.InExcludedCountry)
// UnhealthyRetrievable = Unhealthy AND Retrievable
result.UnhealthyRetrievable = make(map[uint16]struct{})
for pieceNum := range result.Unhealthy {
if _, isRetrievable := result.Retrievable[pieceNum]; isRetrievable {
result.UnhealthyRetrievable[pieceNum] = struct{}{}
}
}
// Healthy = NOT Unhealthy
result.Healthy = make(map[uint16]struct{})
for _, piece := range pieces {
if _, found := result.Unhealthy[piece.Number]; !found {
result.Healthy[piece.Number] = struct{}{}
}
}
return result
}

View File

@ -1,27 +1,23 @@
// Copyright (C) 2023 Storj Labs, Inc. // Copyright (C) 2023 Storj Labs, Inc.
// See LICENSE for copying information. // See LICENSE for copying information.
package repairer package repair
import ( import (
"fmt" "fmt"
"testing" "testing"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
"storj.io/common/identity/testidentity" "storj.io/common/identity/testidentity"
"storj.io/common/storj" "storj.io/common/storj"
"storj.io/common/storj/location" "storj.io/common/storj/location"
"storj.io/common/testcontext"
"storj.io/storj/satellite/metabase" "storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/nodeselection" "storj.io/storj/satellite/nodeselection"
"storj.io/storj/satellite/overlay" "storj.io/storj/satellite/overlay"
) )
func TestClassify(t *testing.T) { func TestClassifySegmentPieces(t *testing.T) {
ctx := testcontext.New(t)
getNodes := func(nodes []nodeselection.SelectedNode, pieces metabase.Pieces) (res []nodeselection.SelectedNode) { getNodes := func(nodes []nodeselection.SelectedNode, pieces metabase.Pieces) (res []nodeselection.SelectedNode) {
for _, piece := range pieces { for _, piece := range pieces {
for _, node := range nodes { for _, node := range nodes {
@ -34,7 +30,6 @@ func TestClassify(t *testing.T) {
} }
return res return res
} }
t.Run("all online", func(t *testing.T) { t.Run("all online", func(t *testing.T) {
var selectedNodes = generateNodes(5, func(ix int) bool { var selectedNodes = generateNodes(5, func(ix int) bool {
return true return true
@ -44,17 +39,16 @@ func TestClassify(t *testing.T) {
c := &overlay.ConfigurablePlacementRule{} c := &overlay.ConfigurablePlacementRule{}
require.NoError(t, c.Set("")) require.NoError(t, c.Set(""))
s := SegmentRepairer{ parsed, err := c.Parse()
placementRules: overlay.NewPlacementDefinitions().CreateFilters,
}
pieces := createPieces(selectedNodes, 0, 1, 2, 3, 4)
result, err := s.classifySegmentPiecesWithNodes(ctx, metabase.Segment{Pieces: pieces}, allNodeIDs(pieces), selectedNodes)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, 0, len(result.MissingPiecesSet)) pieces := createPieces(selectedNodes, 0, 1, 2, 3, 4)
require.Equal(t, 0, len(result.ClumpedPiecesSet)) result := ClassifySegmentPieces(pieces, getNodes(selectedNodes, pieces), map[location.CountryCode]struct{}{}, true, false, parsed.CreateFilters(0))
require.Equal(t, 0, len(result.OutOfPlacementPiecesSet))
require.Equal(t, 0, result.NumUnhealthyRetrievable) require.Equal(t, 0, len(result.Missing))
require.Equal(t, 0, len(result.Clumped))
require.Equal(t, 0, len(result.OutOfPlacement))
require.Equal(t, 0, len(result.UnhealthyRetrievable))
}) })
t.Run("out of placement", func(t *testing.T) { t.Run("out of placement", func(t *testing.T) {
@ -74,21 +68,14 @@ func TestClassify(t *testing.T) {
}.Parse() }.Parse()
require.NoError(t, err) require.NoError(t, err)
s := SegmentRepairer{
placementRules: c.CreateFilters,
doPlacementCheck: true,
log: zaptest.NewLogger(t),
}
pieces := createPieces(selectedNodes, 1, 2, 3, 4, 7, 8) pieces := createPieces(selectedNodes, 1, 2, 3, 4, 7, 8)
result, err := s.classifySegmentPiecesWithNodes(ctx, metabase.Segment{Pieces: pieces, Placement: 10}, allNodeIDs(pieces), getNodes(selectedNodes, pieces)) result := ClassifySegmentPieces(pieces, getNodes(selectedNodes, pieces), map[location.CountryCode]struct{}{}, true, false, c.CreateFilters(10))
require.NoError(t, err)
require.Equal(t, 0, len(result.MissingPiecesSet)) require.Equal(t, 0, len(result.Missing))
require.Equal(t, 0, len(result.ClumpedPiecesSet)) require.Equal(t, 0, len(result.Clumped))
// 1,2,3 are in Germany instead of GB // 1,2,3 are in Germany instead of GB
require.Equal(t, 3, len(result.OutOfPlacementPiecesSet)) require.Equal(t, 3, len(result.OutOfPlacement))
require.Equal(t, 3, result.NumUnhealthyRetrievable) require.Equal(t, 3, len(result.UnhealthyRetrievable))
}) })
t.Run("out of placement and offline", func(t *testing.T) { t.Run("out of placement and offline", func(t *testing.T) {
@ -104,21 +91,15 @@ func TestClassify(t *testing.T) {
}.Parse() }.Parse()
require.NoError(t, err) require.NoError(t, err)
s := SegmentRepairer{
placementRules: c.CreateFilters,
doPlacementCheck: true,
}
pieces := createPieces(selectedNodes, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9) pieces := createPieces(selectedNodes, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
result, err := s.classifySegmentPiecesWithNodes(ctx, metabase.Segment{Pieces: pieces, Placement: 10}, allNodeIDs(pieces), getNodes(selectedNodes, pieces)) result := ClassifySegmentPieces(pieces, getNodes(selectedNodes, pieces), map[location.CountryCode]struct{}{}, true, false, c.CreateFilters(10))
require.NoError(t, err)
// offline nodes // offline nodes
require.Equal(t, 5, len(result.MissingPiecesSet)) require.Equal(t, 5, len(result.Missing))
require.Equal(t, 0, len(result.ClumpedPiecesSet)) require.Equal(t, 0, len(result.Clumped))
require.Equal(t, 10, len(result.OutOfPlacementPiecesSet)) require.Equal(t, 10, len(result.OutOfPlacement))
require.Equal(t, 5, result.NumUnhealthyRetrievable) require.Equal(t, 5, len(result.UnhealthyRetrievable))
numHealthy := len(pieces) - len(result.MissingPiecesSet) - result.NumUnhealthyRetrievable numHealthy := len(pieces) - len(result.Missing) - len(result.UnhealthyRetrievable)
require.Equal(t, 0, numHealthy) require.Equal(t, 0, numHealthy)
}) })
@ -131,23 +112,17 @@ func TestClassify(t *testing.T) {
}) })
c := overlay.NewPlacementDefinitions() c := overlay.NewPlacementDefinitions()
s := SegmentRepairer{
placementRules: c.CreateFilters,
doDeclumping: true,
log: zaptest.NewLogger(t),
}
// first 5: online, 2 in each subnet --> healthy: one from (0,1) (2,3) (4), offline: (5,6) but 5 is in the same subnet as 6 // first 5: online, 2 in each subnet --> healthy: one from (0,1) (2,3) (4), offline: (5,6) but 5 is in the same subnet as 6
pieces := createPieces(selectedNodes, 0, 1, 2, 3, 4, 5, 6) pieces := createPieces(selectedNodes, 0, 1, 2, 3, 4, 5, 6)
result, err := s.classifySegmentPiecesWithNodes(ctx, metabase.Segment{Pieces: pieces}, allNodeIDs(pieces), getNodes(selectedNodes, pieces)) result := ClassifySegmentPieces(pieces, getNodes(selectedNodes, pieces), map[location.CountryCode]struct{}{}, true, true, c.CreateFilters(0))
require.NoError(t, err)
// offline nodes // offline nodes
require.Equal(t, 2, len(result.MissingPiecesSet)) require.Equal(t, 2, len(result.Missing))
require.Equal(t, 3, len(result.ClumpedPiecesSet)) require.Equal(t, 3, len(result.Clumped))
require.Equal(t, 0, len(result.OutOfPlacementPiecesSet)) require.Equal(t, 0, len(result.OutOfPlacement))
require.Equal(t, 2, result.NumUnhealthyRetrievable) require.Equal(t, 2, len(result.UnhealthyRetrievable))
numHealthy := len(pieces) - len(result.MissingPiecesSet) - result.NumUnhealthyRetrievable numHealthy := len(pieces) - len(result.Missing) - len(result.UnhealthyRetrievable)
require.Equal(t, 3, numHealthy) require.Equal(t, 3, numHealthy)
}) })
@ -165,22 +140,16 @@ func TestClassify(t *testing.T) {
}.Parse() }.Parse()
require.NoError(t, err) require.NoError(t, err)
s := SegmentRepairer{
placementRules: c.CreateFilters,
doDeclumping: true,
}
// first 5: online, 2 in each subnet --> healthy: one from (0,1) (2,3) (4), offline: (5,6) but 5 is in the same subnet as 6 // first 5: online, 2 in each subnet --> healthy: one from (0,1) (2,3) (4), offline: (5,6) but 5 is in the same subnet as 6
pieces := createPieces(selectedNodes, 0, 1, 2, 3, 4, 5, 6) pieces := createPieces(selectedNodes, 0, 1, 2, 3, 4, 5, 6)
result, err := s.classifySegmentPiecesWithNodes(ctx, metabase.Segment{Pieces: pieces, Placement: 10}, allNodeIDs(pieces), getNodes(selectedNodes, pieces)) result := ClassifySegmentPieces(pieces, getNodes(selectedNodes, pieces), map[location.CountryCode]struct{}{}, true, true, c.CreateFilters(10))
require.NoError(t, err)
// offline nodes // offline nodes
require.Equal(t, 2, len(result.MissingPiecesSet)) require.Equal(t, 2, len(result.Missing))
require.Equal(t, 0, len(result.ClumpedPiecesSet)) require.Equal(t, 0, len(result.Clumped))
require.Equal(t, 0, len(result.OutOfPlacementPiecesSet)) require.Equal(t, 0, len(result.OutOfPlacement))
require.Equal(t, 0, result.NumUnhealthyRetrievable) require.Equal(t, 0, len(result.UnhealthyRetrievable))
numHealthy := len(pieces) - len(result.MissingPiecesSet) - result.NumUnhealthyRetrievable numHealthy := len(pieces) - len(result.Missing) - len(result.UnhealthyRetrievable)
require.Equal(t, 5, numHealthy) require.Equal(t, 5, numHealthy)
}) })
@ -205,16 +174,7 @@ func createPieces(selectedNodes []nodeselection.SelectedNode, indexes ...int) (r
Number: uint16(index), Number: uint16(index),
} }
piece.StorageNode = selectedNodes[index].ID piece.StorageNode = selectedNodes[index].ID
res = append(res, piece) res = append(res, piece)
} }
return return
} }
func allNodeIDs(pieces metabase.Pieces) (res []storj.NodeID) {
for _, piece := range pieces {
res = append(res, piece.StorageNode)
}
return res
}

View File

@ -3038,14 +3038,15 @@ func TestECRepairerGetPrefersCachedIPPort(t *testing.T) {
 func TestSegmentInExcludedCountriesRepair(t *testing.T) {
 testplanet.Run(t, testplanet.Config{
 SatelliteCount: 1,
-StorageNodeCount: 7,
+StorageNodeCount: 20,
 UplinkCount: 1,
 Reconfigure: testplanet.Reconfigure{
 Satellite: testplanet.Combine(
 func(log *zap.Logger, index int, config *satellite.Config) {
 config.Repairer.InMemoryRepair = true
+config.Repairer.MaxExcessRateOptimalThreshold = 0.0
 },
-testplanet.ReconfigureRS(2, 3, 4, 5),
+testplanet.ReconfigureRS(3, 5, 8, 10),
 testplanet.RepairExcludedCountryCodes([]string{"FR", "BE"}),
 ),
 },
@ -3066,14 +3067,22 @@ func TestSegmentInExcludedCountriesRepair(t *testing.T) {
 segment, _ := getRemoteSegment(ctx, t, satellite, planet.Uplinks[0].Projects[0].ID, "testbucket")
 remotePieces := segment.Pieces
-require.GreaterOrEqual(t, len(segment.Pieces), int(segment.Redundancy.RequiredShares))
-err = planet.Satellites[0].Overlay.Service.TestNodeCountryCode(ctx, remotePieces[1].StorageNode, "FR")
-require.NoError(t, err)
-nodeInExcluded := remotePieces[1].StorageNode
-// make one piece after optimal bad
-err = planet.StopNodeAndUpdate(ctx, planet.FindNode(remotePieces[2].StorageNode))
-require.NoError(t, err)
+require.GreaterOrEqual(t, len(segment.Pieces), int(segment.Redundancy.OptimalShares))
+numExcluded := 5
+var nodesInExcluded storj.NodeIDList
+for i := 0; i < numExcluded; i++ {
+err = planet.Satellites[0].Overlay.Service.TestNodeCountryCode(ctx, remotePieces[i].StorageNode, "FR")
+require.NoError(t, err)
+nodesInExcluded = append(nodesInExcluded, remotePieces[i].StorageNode)
+}
+// make extra pieces after the optimal threshold bad, so we know there are exactly
+// OptimalShares retrievable shares. numExcluded of them are in an excluded country.
+for i := int(segment.Redundancy.OptimalShares); i < len(remotePieces); i++ {
+err = planet.StopNodeAndUpdate(ctx, planet.FindNode(remotePieces[i].StorageNode))
+require.NoError(t, err)
+}
 // trigger checker with ranged loop to add segment to repair queue
 _, err = satellite.RangedLoop.RangedLoop.Service.RunOnce(ctx)
@ -3088,7 +3097,7 @@ func TestSegmentInExcludedCountriesRepair(t *testing.T) {
 satellite.Repair.Repairer.Loop.Pause()
 satellite.Repair.Repairer.WaitForPendingRepairs()
-// Verify that the segment was removed
+// Verify that the segment was removed from the repair queue
 count, err = satellite.DB.RepairQueue().Count(ctx)
 require.NoError(t, err)
 require.Zero(t, count)
@ -3098,15 +3107,30 @@ func TestSegmentInExcludedCountriesRepair(t *testing.T) {
 require.NotEqual(t, segment.Pieces, segmentAfterRepair.Pieces)
 require.GreaterOrEqual(t, len(segmentAfterRepair.Pieces), int(segmentAfterRepair.Redundancy.OptimalShares))
-// check excluded area node still exists
-var found bool
-for _, p := range segmentAfterRepair.Pieces {
-if p.StorageNode == nodeInExcluded {
-found = true
-break
-}
+// the number of nodes that should still be online holding intact pieces, not in
+// excluded countries
+expectHealthyNodes := int(segment.Redundancy.OptimalShares) - numExcluded
+// repair should create this many new pieces to get the segment up to OptimalShares
+// shares, not counting excluded-country nodes
+expectNewPieces := int(segment.Redundancy.OptimalShares) - expectHealthyNodes
+// so there should be this many pieces after repair, not counting excluded-country
+// nodes
+expectPiecesAfterRepair := expectHealthyNodes + expectNewPieces
+// so there should be this many excluded-country pieces left in the segment (we
+// couldn't keep all of them, or we would have had more than TotalShares pieces).
+expectRemainingExcluded := int(segment.Redundancy.TotalShares) - expectPiecesAfterRepair
+// check excluded area nodes are no longer being used
+var found int
+for _, nodeID := range nodesInExcluded {
+for _, p := range segmentAfterRepair.Pieces {
+if p.StorageNode == nodeID {
+found++
+break
+}
+}
 }
-require.True(t, found, fmt.Sprintf("node %s not in segment, but should be\n", segmentAfterRepair.Pieces[1].StorageNode.String()))
+require.Equal(t, found, expectRemainingExcluded, "found wrong number of excluded-country pieces after repair")
 nodesInPointer := make(map[storj.NodeID]bool)
 for _, n := range segmentAfterRepair.Pieces {
 // check for duplicates

View File

@ -497,6 +497,10 @@ func (ec *ECRepairer) Repair(ctx context.Context, limits []*pb.AddressedOrderLim
 successfulCount++
 if successfulCount >= int32(successfulNeeded) {
+// if this is logged more than once for a given repair operation, it is because
+// an upload succeeded right after we called cancel(), before that upload could
+// actually be canceled. So, successfulCount should increase by one with each
+// repeated logging.
 ec.log.Debug("Number of successful uploads met. Canceling the long tail...",
 zap.Int32("Successfully repaired", atomic.LoadInt32(&successfulCount)),
 )
@@ -574,10 +578,12 @@ func (ec *ECRepairer) putPiece(ctx, parent context.Context, limit *pb.AddressedO
 	// to slow connection. No error logging for this case.
 	if errors.Is(parent.Err(), context.Canceled) {
 		ec.log.Debug("Upload to node canceled by user",
-			zap.Stringer("Node ID", storageNodeID))
+			zap.Stringer("Node ID", storageNodeID),
+			zap.Stringer("Piece ID", pieceID))
 	} else {
 		ec.log.Debug("Node cut from upload due to slow connection",
-			zap.Stringer("Node ID", storageNodeID))
+			zap.Stringer("Node ID", storageNodeID),
+			zap.Stringer("Piece ID", pieceID))
 	}
 	// make sure context.Canceled is the primary error in the error chain
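The comment added above describes a benign race: an upload can finish after cancel() has been called but before it observes the canceled context, so the debug line may repeat. A minimal sketch of that long-tail pattern (illustrative only, not the ECRepairer implementation):

package main

import (
	"context"
	"fmt"
	"math/rand"
	"sync"
	"sync/atomic"
	"time"
)

func main() {
	const total, needed = 10, 4
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	var successful int32
	var wg sync.WaitGroup
	for i := 0; i < total; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			select {
			case <-time.After(time.Duration(rand.Intn(100)) * time.Millisecond): // stand-in for an upload
			case <-ctx.Done():
				return // cut from the long tail
			}
			if count := atomic.AddInt32(&successful, 1); count >= needed {
				// Can run more than once: an upload may finish right after cancel()
				// is called, before it observes the canceled context.
				fmt.Println("successful uploads met, canceling long tail; count =", count)
				cancel()
			}
		}()
	}
	wg.Wait()
}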


@@ -22,7 +22,6 @@ import (
 	"storj.io/common/sync2"
 	"storj.io/storj/satellite/audit"
 	"storj.io/storj/satellite/metabase"
-	"storj.io/storj/satellite/nodeselection"
 	"storj.io/storj/satellite/orders"
 	"storj.io/storj/satellite/overlay"
 	"storj.io/storj/satellite/repair"
@@ -209,17 +208,24 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
 	mon.IntVal("repair_segment_size").Observe(int64(segment.EncryptedSize)) //mon:locked
 	stats.repairSegmentSize.Observe(int64(segment.EncryptedSize))

-	piecesCheck, err := repairer.classifySegmentPieces(ctx, segment)
-	if err != nil {
-		return false, err
-	}
+	allNodeIDs := make([]storj.NodeID, len(segment.Pieces))
+	for i, p := range segment.Pieces {
+		allNodeIDs[i] = p.StorageNode
+	}
+	selectedNodes, err := repairer.overlay.GetNodes(ctx, allNodeIDs)
+	if err != nil {
+		return false, overlayQueryError.New("error identifying missing pieces: %w", err)
+	}
+	if len(selectedNodes) != len(segment.Pieces) {
+		repairer.log.Error("GetNodes returned an invalid result", zap.Any("pieces", segment.Pieces), zap.Any("selectedNodes", selectedNodes), zap.Error(err))
+		return false, overlayQueryError.New("GetNodes returned an invalid result")
+	}

 	pieces := segment.Pieces
-	numRetrievable := len(pieces) - len(piecesCheck.MissingPiecesSet)
-	numHealthy := len(pieces) - len(piecesCheck.MissingPiecesSet) - piecesCheck.NumUnhealthyRetrievable
+	piecesCheck := repair.ClassifySegmentPieces(pieces, selectedNodes, repairer.excludedCountryCodes, repairer.doPlacementCheck, repairer.doDeclumping, repairer.placementRules(segment.Placement))
+
 	// irreparable segment
-	if numRetrievable < int(segment.Redundancy.RequiredShares) {
+	if len(piecesCheck.Retrievable) < int(segment.Redundancy.RequiredShares) {
 		mon.Counter("repairer_segments_below_min_req").Inc(1) //mon:locked
 		stats.repairerSegmentsBelowMinReq.Inc(1)
 		mon.Meter("repair_nodes_unavailable").Mark(1) //mon:locked
@@ -228,7 +234,7 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
 		repairer.log.Warn("irreparable segment",
 			zap.String("StreamID", queueSegment.StreamID.String()),
 			zap.Uint64("Position", queueSegment.Position.Encode()),
-			zap.Int("piecesAvailable", numRetrievable),
+			zap.Int("piecesAvailable", len(piecesCheck.Retrievable)),
 			zap.Int16("piecesRequired", segment.Redundancy.RequiredShares),
 		)
 		return false, nil
@@ -251,75 +257,64 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
 		repairThreshold = overrideValue
 	}

-	// repair not needed
-	if numHealthy-piecesCheck.NumHealthyInExcludedCountries > int(repairThreshold) {
-		// remove pieces out of placement without repairing as we are above repair threshold
-		if len(piecesCheck.OutOfPlacementPiecesSet) > 0 {
-			var outOfPlacementPieces metabase.Pieces
-			for _, piece := range pieces {
-				if _, ok := piecesCheck.OutOfPlacementPiecesSet[piece.Number]; ok {
-					outOfPlacementPieces = append(outOfPlacementPieces, piece)
-				}
-			}
-
-			newPieces, err := segment.Pieces.Update(nil, outOfPlacementPieces)
-			if err != nil {
-				return false, metainfoPutError.Wrap(err)
-			}
-
-			err = repairer.metabase.UpdateSegmentPieces(ctx, metabase.UpdateSegmentPieces{
-				StreamID:      segment.StreamID,
-				Position:      segment.Position,
-				OldPieces:     segment.Pieces,
-				NewRedundancy: segment.Redundancy,
-				NewPieces:     newPieces,
-				NewRepairedAt: time.Now(),
-			})
-			if err != nil {
-				return false, metainfoPutError.Wrap(err)
-			}
-
-			mon.Meter("dropped_out_of_placement_pieces").Mark(len(piecesCheck.OutOfPlacementPiecesSet))
-		}
+	if len(piecesCheck.Healthy) > int(repairThreshold) {
+		// No repair is needed (note Healthy does not include pieces in ForcingRepair).
+		var dropPieces metabase.Pieces
+		if len(piecesCheck.ForcingRepair) > 0 {
+			// No repair is needed, but remove forcing-repair pieces without a repair operation,
+			// as we will still be above the repair threshold.
+			for _, piece := range pieces {
+				if _, ok := piecesCheck.ForcingRepair[piece.Number]; ok {
+					dropPieces = append(dropPieces, piece)
+				}
+			}
+			if len(dropPieces) > 0 {
+				newPieces, err := segment.Pieces.Update(nil, dropPieces)
+				if err != nil {
+					return false, metainfoPutError.Wrap(err)
+				}
+
+				err = repairer.metabase.UpdateSegmentPieces(ctx, metabase.UpdateSegmentPieces{
+					StreamID:      segment.StreamID,
+					Position:      segment.Position,
+					OldPieces:     segment.Pieces,
+					NewRedundancy: segment.Redundancy,
+					NewPieces:     newPieces,
+					NewRepairedAt: time.Now(),
+				})
+				if err != nil {
+					return false, metainfoPutError.Wrap(err)
+				}
+
+				mon.Meter("dropped_undesirable_pieces_without_repair").Mark(len(dropPieces))
+			}
+		}

 		mon.Meter("repair_unnecessary").Mark(1) //mon:locked
 		stats.repairUnnecessary.Mark(1)
-		repairer.log.Debug("segment above repair threshold", zap.Int("numHealthy", numHealthy), zap.Int32("repairThreshold", repairThreshold),
-			zap.Int("numClumped", len(piecesCheck.ClumpedPiecesSet)), zap.Int("numOffPieces", len(piecesCheck.OutOfPlacementPiecesSet)))
+		repairer.log.Debug("segment above repair threshold", zap.Int("numHealthy", len(piecesCheck.Healthy)), zap.Int32("repairThreshold", repairThreshold),
+			zap.Int("numClumped", len(piecesCheck.Clumped)), zap.Int("numOffPieces", len(piecesCheck.OutOfPlacement)),
+			zap.Int("numExcluded", len(piecesCheck.InExcludedCountry)), zap.Int("droppedPieces", len(dropPieces)))
 		return true, nil
 	}

 	healthyRatioBeforeRepair := 0.0
 	if segment.Redundancy.TotalShares != 0 {
-		healthyRatioBeforeRepair = float64(numHealthy) / float64(segment.Redundancy.TotalShares)
+		healthyRatioBeforeRepair = float64(len(piecesCheck.Healthy)) / float64(segment.Redundancy.TotalShares)
 	}
 	mon.FloatVal("healthy_ratio_before_repair").Observe(healthyRatioBeforeRepair) //mon:locked
 	stats.healthyRatioBeforeRepair.Observe(healthyRatioBeforeRepair)

-	lostPiecesSet := piecesCheck.MissingPiecesSet
-
-	var retrievablePieces metabase.Pieces
-	unhealthyPieces := make(map[metabase.Piece]struct{})
-	healthySet := make(map[int32]struct{})
-	// Populate retrievablePieces with all pieces from the segment except those correlating to indices in lostPieces.
-	// Populate unhealthyPieces with all pieces in lostPieces, clumpedPieces or outOfPlacementPieces.
+	// Create the order limits for the GET_REPAIR action
+	retrievablePieces := make(metabase.Pieces, 0, len(piecesCheck.Retrievable))
 	for _, piece := range pieces {
-		if lostPiecesSet[piece.Number] {
-			unhealthyPieces[piece] = struct{}{}
-		} else {
+		if _, found := piecesCheck.Retrievable[piece.Number]; found {
 			retrievablePieces = append(retrievablePieces, piece)
-			if piecesCheck.ClumpedPiecesSet[piece.Number] || piecesCheck.OutOfPlacementPiecesSet[piece.Number] {
-				unhealthyPieces[piece] = struct{}{}
-			} else {
-				healthySet[int32(piece.Number)] = struct{}{}
-			}
 		}
 	}

-	// Create the order limits for the GET_REPAIR action
 	getOrderLimits, getPrivateKey, cachedNodesInfo, err := repairer.orders.CreateGetRepairOrderLimits(ctx, segment, retrievablePieces)
 	if err != nil {
 		if orders.ErrDownloadFailedNotEnoughPieces.Has(err) {
@@ -339,26 +334,27 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
 		return false, orderLimitFailureError.New("could not create GET_REPAIR order limits: %w", err)
 	}

-	// Double check for retrievable pieces which became irretrievable inside CreateGetRepairOrderLimits
-	// Add them to unhealthyPieces.
+	// Double check for retrievable pieces which were recognized as irretrievable during the
+	// call to CreateGetRepairOrderLimits. Add or remove them from the appropriate sets.
 	for _, piece := range retrievablePieces {
 		if getOrderLimits[piece.Number] == nil {
-			unhealthyPieces[piece] = struct{}{}
+			piecesCheck.Missing[piece.Number] = struct{}{}
+			piecesCheck.Unhealthy[piece.Number] = struct{}{}
+			delete(piecesCheck.Healthy, piece.Number)
+			delete(piecesCheck.Retrievable, piece.Number)
+			delete(piecesCheck.UnhealthyRetrievable, piece.Number)
 		}
 	}
-	numHealthy = len(healthySet)

 	var requestCount int
-	var minSuccessfulNeeded int
 	{
-		totalNeeded := math.Ceil(float64(redundancy.OptimalThreshold()) * repairer.multiplierOptimalThreshold)
-		requestCount = int(totalNeeded) + piecesCheck.NumHealthyInExcludedCountries
-		if requestCount > redundancy.TotalCount() {
-			requestCount = redundancy.TotalCount()
+		totalNeeded := int(math.Ceil(float64(redundancy.OptimalThreshold()) * repairer.multiplierOptimalThreshold))
+		if totalNeeded > redundancy.TotalCount() {
+			totalNeeded = redundancy.TotalCount()
 		}
-		requestCount -= numHealthy
-		minSuccessfulNeeded = redundancy.OptimalThreshold() - numHealthy + piecesCheck.NumHealthyInExcludedCountries
+		requestCount = totalNeeded - len(piecesCheck.Healthy)
 	}
+	minSuccessfulNeeded := redundancy.OptimalThreshold() - len(piecesCheck.Healthy)

 	// Request Overlay for n-h new storage nodes
 	request := overlay.FindStorageNodesRequest{
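With the rewritten block above, the number of new nodes to request and the number of uploads that must succeed both follow directly from the healthy count. A worked example with made-up values (optimal threshold 10, total 12, multiplier 1.05, 6 healthy pieces); only the formulas come from the diff:

package main

import (
	"fmt"
	"math"
)

func main() {
	// Made-up values: optimal threshold 10, total 12, multiplier 1.05, 6 healthy pieces.
	optimalThreshold, totalCount := 10, 12
	multiplier := 1.05
	numHealthy := 6

	totalNeeded := int(math.Ceil(float64(optimalThreshold) * multiplier)) // 11
	if totalNeeded > totalCount {
		totalNeeded = totalCount
	}
	requestCount := totalNeeded - numHealthy             // ask the overlay for 5 new nodes
	minSuccessfulNeeded := optimalThreshold - numHealthy // 4 new uploads must succeed

	fmt.Println(requestCount, minSuccessfulNeeded)
}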
@@ -371,8 +367,19 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
 		return false, overlayQueryError.Wrap(err)
 	}

-	// Create the order limits for the PUT_REPAIR action
-	putLimits, putPrivateKey, err := repairer.orders.CreatePutRepairOrderLimits(ctx, segment, getOrderLimits, healthySet, newNodes, repairer.multiplierOptimalThreshold, piecesCheck.NumHealthyInExcludedCountries)
+	// Create the order limits for the PUT_REPAIR action. We want to keep pieces in Healthy
+	// as well as pieces in InExcludedCountry (our policy is to let those nodes keep the
+	// pieces they have, as long as they are kept intact and retrievable).
+	maxToKeep := int(segment.Redundancy.TotalShares) - len(newNodes)
+	toKeep := map[uint16]struct{}{}
+	maps.Copy(toKeep, piecesCheck.Healthy)
+	for excludedNodeNum := range piecesCheck.InExcludedCountry {
+		if len(toKeep) >= maxToKeep {
+			break
+		}
+		toKeep[excludedNodeNum] = struct{}{}
+	}
+	putLimits, putPrivateKey, err := repairer.orders.CreatePutRepairOrderLimits(ctx, segment, getOrderLimits, toKeep, newNodes)
 	if err != nil {
 		return false, orderLimitFailureError.New("could not create PUT_REPAIR order limits: %w", err)
 	}
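The cap in the loop above keeps the total piece count from exceeding TotalShares once the new nodes are added. With made-up numbers (TotalShares=10, 3 new nodes, 6 healthy pieces, 3 excluded-country pieces), the effect looks like this:

package main

import "fmt"

func main() {
	// Made-up values: TotalShares=10, 3 new nodes selected, pieces 1-6 healthy,
	// pieces 7-9 held in excluded countries.
	totalShares, newNodes := 10, 3
	healthy := []uint16{1, 2, 3, 4, 5, 6}
	inExcludedCountry := []uint16{7, 8, 9}

	maxToKeep := totalShares - newNodes // 7: keeping more would exceed TotalShares
	toKeep := map[uint16]struct{}{}
	for _, n := range healthy {
		toKeep[n] = struct{}{}
	}
	kept := 0
	for _, n := range inExcludedCountry {
		if len(toKeep) >= maxToKeep {
			break
		}
		toKeep[n] = struct{}{}
		kept++
	}
	fmt.Println(len(toKeep), "pieces kept in total,", kept, "of them excluded-country") // 7, 1
}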
@@ -544,8 +551,11 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
 	mon.Meter("repair_bytes_uploaded").Mark64(bytesRepaired) //mon:locked

-	healthyAfterRepair := numHealthy + len(repairedPieces)
+	healthyAfterRepair := len(piecesCheck.Healthy) + len(repairedPieces)
 	switch {
+	case healthyAfterRepair >= int(segment.Redundancy.OptimalShares):
+		mon.Meter("repair_success").Mark(1) //mon:locked
+		stats.repairSuccess.Mark(1)
 	case healthyAfterRepair <= int(segment.Redundancy.RepairShares):
 		// Important: this indicates a failure to PUT enough pieces to the network to pass
 		// the repair threshold, and _not_ a failure to reconstruct the segment. But we
@@ -554,12 +564,9 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
 		// not as healthy as we want it to be.
 		mon.Meter("repair_failed").Mark(1) //mon:locked
 		stats.repairFailed.Mark(1)
-	case healthyAfterRepair < int(segment.Redundancy.OptimalShares):
+	default:
 		mon.Meter("repair_partial").Mark(1) //mon:locked
 		stats.repairPartial.Mark(1)
-	default:
-		mon.Meter("repair_success").Mark(1) //mon:locked
-		stats.repairSuccess.Mark(1)
 	}

 	healthyRatioAfterRepair := 0.0
@@ -571,23 +578,44 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
 	stats.healthyRatioAfterRepair.Observe(healthyRatioAfterRepair)

 	var toRemove metabase.Pieces
-	if healthyAfterRepair >= int(segment.Redundancy.OptimalShares) {
-		// if full repair, remove all unhealthy pieces
-		for unhealthyPiece := range unhealthyPieces {
-			toRemove = append(toRemove, unhealthyPiece)
-		}
-	} else {
-		// if partial repair, leave unrepaired unhealthy pieces in the pointer
-		for unhealthyPiece := range unhealthyPieces {
-			if repairedMap[unhealthyPiece.Number] {
-				// add only repaired pieces in the slice, unrepaired
-				// unhealthy pieces are not removed from the pointer
-				toRemove = append(toRemove, unhealthyPiece)
-			}
-		}
-	}
+	switch {
+	case healthyAfterRepair >= int(segment.Redundancy.OptimalShares):
+		// Repair was fully successful; remove all unhealthy pieces except those in
+		// (Retrievable AND InExcludedCountry). Those, we allow to remain on the nodes as
+		// long as the nodes are keeping the pieces intact and available.
+		for _, piece := range pieces {
+			if _, isUnhealthy := piecesCheck.Unhealthy[piece.Number]; isUnhealthy {
+				_, retrievable := piecesCheck.Retrievable[piece.Number]
+				_, inExcludedCountry := piecesCheck.InExcludedCountry[piece.Number]
+				if retrievable && inExcludedCountry {
+					continue
+				}
+				toRemove = append(toRemove, piece)
+			}
+		}
+	case healthyAfterRepair > int(segment.Redundancy.RepairShares):
+		// Repair was successful enough that we still want to drop all out-of-placement
+		// pieces. We want to do that wherever possible, except where doing so puts data in
+		// jeopardy.
+		for _, piece := range pieces {
+			if _, ok := piecesCheck.OutOfPlacement[piece.Number]; ok {
+				toRemove = append(toRemove, piece)
+			}
+		}
+	default:
+		// Repair improved the health of the piece, but it is still at or below the
+		// repair threshold (not counting unhealthy-but-retrievable pieces). To be safe,
+		// we will keep unhealthy-but-retrievable pieces in the segment for now.
+	}
+
+	// in any case, we want to remove pieces for which we have replacements now.
+	for _, piece := range pieces {
+		if repairedMap[piece.Number] {
+			toRemove = append(toRemove, piece)
+		}
+	}

-	// add pieces that failed piece hashes verification to the removal list
+	// add pieces that failed piece hash verification to the removal list
 	for _, outcome := range piecesReport.Failed {
 		toRemove = append(toRemove, outcome.Piece)
 	}
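The switch above encodes three outcome bands for dropping pieces, beyond the pieces that now have replacements. Summarized as a small standalone helper (an illustrative sketch, not the shipped code):

package main

import "fmt"

// dropPolicy mirrors the three outcome bands above: it only describes which extra
// pieces get dropped; replaced pieces are always removed.
func dropPolicy(healthyAfterRepair, optimalShares, repairShares int) string {
	switch {
	case healthyAfterRepair >= optimalShares:
		return "drop all unhealthy pieces except retrievable ones in excluded countries"
	case healthyAfterRepair > repairShares:
		return "drop only out-of-placement pieces"
	default:
		return "keep unhealthy-but-retrievable pieces for safety"
	}
}

func main() {
	for _, healthy := range []int{10, 7, 5} {
		fmt.Printf("healthy=%d: %s\n", healthy, dropPolicy(healthy, 10, 5))
	}
}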
@@ -623,153 +651,26 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
 		segmentAge = time.Since(segment.CreatedAt)
 	}

-	// TODO what to do with RepairCount
-	var repairCount int64
-	// pointer.RepairCount++
-
 	mon.IntVal("segment_time_until_repair").Observe(int64(segmentAge.Seconds())) //mon:locked
 	stats.segmentTimeUntilRepair.Observe(int64(segmentAge.Seconds()))
-	mon.IntVal("segment_repair_count").Observe(repairCount) //mon:locked
-	stats.segmentRepairCount.Observe(repairCount)

 	repairer.log.Debug("repaired segment",
 		zap.Stringer("Stream ID", segment.StreamID),
 		zap.Uint64("Position", segment.Position.Encode()),
-		zap.Int("clumped pieces", len(piecesCheck.ClumpedPiecesSet)),
-		zap.Int("out of placement pieces", len(piecesCheck.OutOfPlacementPiecesSet)),
-		zap.Int("in excluded countries", piecesCheck.NumHealthyInExcludedCountries),
+		zap.Int("clumped pieces", len(piecesCheck.Clumped)),
+		zap.Int("out of placement pieces", len(piecesCheck.OutOfPlacement)),
+		zap.Int("in excluded countries", len(piecesCheck.InExcludedCountry)),
+		zap.Int("missing pieces", len(piecesCheck.Missing)),
 		zap.Int("removed pieces", len(toRemove)),
 		zap.Int("repaired pieces", len(repairedPieces)),
-		zap.Int("healthy before repair", numHealthy),
-		zap.Int("healthy after repair", healthyAfterRepair))
+		zap.Int("retrievable pieces", len(piecesCheck.Retrievable)),
+		zap.Int("healthy before repair", len(piecesCheck.Healthy)),
+		zap.Int("healthy after repair", healthyAfterRepair),
+		zap.Int("total before repair", len(piecesCheck.ExcludeNodeIDs)),
+		zap.Int("total after repair", len(newPieces)))
 	return true, nil
 }

-type piecesCheckResult struct {
-	ExcludeNodeIDs []storj.NodeID
-
-	MissingPiecesSet map[uint16]bool
-	ClumpedPiecesSet map[uint16]bool
-	// piece which are out of placement (both offline and online)
-	OutOfPlacementPiecesSet map[uint16]bool
-
-	NumUnhealthyRetrievable       int
-	NumHealthyInExcludedCountries int
-}
-
-func (repairer *SegmentRepairer) classifySegmentPieces(ctx context.Context, segment metabase.Segment) (result piecesCheckResult, err error) {
-	defer mon.Task()(&ctx)(&err)
-
-	pieces := segment.Pieces
-	allNodeIDs := make([]storj.NodeID, len(pieces))
-	for i, piece := range pieces {
-		allNodeIDs[i] = piece.StorageNode
-	}
-
-	selectedNodes, err := repairer.overlay.GetNodes(ctx, allNodeIDs)
-	if err != nil {
-		return piecesCheckResult{}, overlayQueryError.New("error identifying missing pieces: %w", err)
-	}
-
-	return repairer.classifySegmentPiecesWithNodes(ctx, segment, allNodeIDs, selectedNodes)
-}
-
-func (repairer *SegmentRepairer) classifySegmentPiecesWithNodes(ctx context.Context, segment metabase.Segment, allNodeIDs []storj.NodeID, selectedNodes []nodeselection.SelectedNode) (result piecesCheckResult, err error) {
-	pieces := segment.Pieces
-
-	nodeIDPieceMap := map[storj.NodeID]uint16{}
-	result.MissingPiecesSet = map[uint16]bool{}
-	for i, p := range pieces {
-		allNodeIDs[i] = p.StorageNode
-		nodeIDPieceMap[p.StorageNode] = p.Number
-		result.MissingPiecesSet[p.Number] = true
-	}
-
-	result.ExcludeNodeIDs = allNodeIDs
-
-	if len(selectedNodes) != len(pieces) {
-		repairer.log.Error("GetNodes returned an invalid result", zap.Any("pieces", pieces), zap.Any("selectedNodes", selectedNodes), zap.Error(err))
-		return piecesCheckResult{}, overlayQueryError.New("GetNodes returned an invalid result")
-	}
-
-	nodeFilters := repairer.placementRules(segment.Placement)
-
-	// remove online nodes from missing pieces
-	for _, node := range selectedNodes {
-		if !node.Online || node.Suspended {
-			continue
-		}
-		// count online nodes in excluded countries only if country is not excluded by segment
-		// placement, those nodes will be counted with out of placement check
-		if _, excluded := repairer.excludedCountryCodes[node.CountryCode]; excluded && nodeFilters.Match(&node) {
-			result.NumHealthyInExcludedCountries++
-		}
-
-		pieceNum := nodeIDPieceMap[node.ID]
-		delete(result.MissingPiecesSet, pieceNum)
-	}
-
-	if repairer.doDeclumping && !nodeselection.AllowSameSubnet(nodeFilters) {
-		// if multiple pieces are on the same last_net, keep only the first one. The rest are
-		// to be considered retrievable but unhealthy.
-		lastNets := make([]string, 0, len(allNodeIDs))
-		reliablePieces := metabase.Pieces{}
-
-		collectClumpedPieces := func(onlineness bool) {
-			for _, node := range selectedNodes {
-				if node.Online != onlineness {
-					continue
-				}
-				pieceNum := nodeIDPieceMap[node.ID]
-				reliablePieces = append(reliablePieces, metabase.Piece{
-					Number:      pieceNum,
-					StorageNode: node.ID,
-				})
-				lastNets = append(lastNets, node.LastNet)
-			}
-		}
-		// go over online nodes first, so that if we have to remove clumped pieces, we prefer
-		// to remove offline ones over online ones.
-		collectClumpedPieces(true)
-		collectClumpedPieces(false)
-
-		clumpedPieces := repair.FindClumpedPieces(reliablePieces, lastNets)
-		result.ClumpedPiecesSet = map[uint16]bool{}
-		for _, clumpedPiece := range clumpedPieces {
-			result.ClumpedPiecesSet[clumpedPiece.Number] = true
-		}
-	}
-
-	result.OutOfPlacementPiecesSet = map[uint16]bool{}
-	if repairer.doPlacementCheck {
-		for _, node := range selectedNodes {
-			if nodeFilters.Match(&node) {
-				continue
-			}
-			result.OutOfPlacementPiecesSet[nodeIDPieceMap[node.ID]] = true
-		}
-	}
-
-	// verify that some of clumped pieces and out of placement pieces are not the same
-	unhealthyRetrievableSet := map[uint16]bool{}
-	maps.Copy(unhealthyRetrievableSet, result.ClumpedPiecesSet)
-	maps.Copy(unhealthyRetrievableSet, result.OutOfPlacementPiecesSet)
-
-	// offline nodes are not retrievable
-	for _, node := range selectedNodes {
-		if !node.Online {
-			delete(unhealthyRetrievableSet, nodeIDPieceMap[node.ID])
-		}
-	}
-	result.NumUnhealthyRetrievable = len(unhealthyRetrievableSet)
-
-	return result, nil
-}
-
 // checkIfSegmentAltered checks if oldSegment has been altered since it was selected for audit.
 func (repairer *SegmentRepairer) checkIfSegmentAltered(ctx context.Context, oldSegment metabase.Segment) (err error) {
 	defer mon.Task()(&ctx)(&err)


@@ -429,60 +429,6 @@ func TestSegmentRepairPlacementNotEnoughNodes(t *testing.T) {
 	})
 }

-func TestSegmentRepairPlacementAndExcludedCountries(t *testing.T) {
-	testplanet.Run(t, testplanet.Config{
-		SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
-		Reconfigure: testplanet.Reconfigure{
-			Satellite: testplanet.Combine(
-				testplanet.ReconfigureRS(1, 2, 4, 4),
-				func(log *zap.Logger, index int, config *satellite.Config) {
-					config.Overlay.RepairExcludedCountryCodes = []string{"US"}
-				},
-			),
-		},
-	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
-		require.NoError(t, planet.Uplinks[0].CreateBucket(ctx, planet.Satellites[0], "testbucket"))
-
-		_, err := planet.Satellites[0].API.Buckets.Service.UpdateBucket(ctx, buckets.Bucket{
-			ProjectID: planet.Uplinks[0].Projects[0].ID,
-			Name:      "testbucket",
-			Placement: storj.EU,
-		})
-		require.NoError(t, err)
-
-		for _, node := range planet.StorageNodes {
-			require.NoError(t, planet.Satellites[0].Overlay.Service.TestNodeCountryCode(ctx, node.ID(), "PL"))
-		}
-
-		err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "testbucket", "object", testrand.Bytes(5*memory.KiB))
-		require.NoError(t, err)
-
-		segments, err := planet.Satellites[0].Metabase.DB.TestingAllSegments(ctx)
-		require.NoError(t, err)
-		require.Len(t, segments, 1)
-		require.Len(t, segments[0].Pieces, 4)
-
-		// change single node to location outside bucket placement and location which is part of RepairExcludedCountryCodes
-		require.NoError(t, planet.Satellites[0].Overlay.Service.TestNodeCountryCode(ctx, segments[0].Pieces[0].StorageNode, "US"))
-		require.NoError(t, planet.Satellites[0].Repairer.Overlay.DownloadSelectionCache.Refresh(ctx))
-
-		shouldDelete, err := planet.Satellites[0].Repairer.SegmentRepairer.Repair(ctx, &queue.InjuredSegment{
-			StreamID: segments[0].StreamID,
-			Position: segments[0].Position,
-		})
-		require.NoError(t, err)
-		require.True(t, shouldDelete)
-
-		// we are checking that repairer counted only single piece as out of placement and didn't count this piece
-		// also as from excluded country. That would cause full repair because repairer would count single pieces
-		// as unhealthy two times. Full repair would restore number of pieces to 4 but we just removed single pieces.
-		segmentsAfter, err := planet.Satellites[0].Metabase.DB.TestingAllSegments(ctx)
-		require.NoError(t, err)
-		require.ElementsMatch(t, segments[0].Pieces[1:], segmentsAfter[0].Pieces)
-	})
-}
-
 func piecesOnNodeByIndex(ctx context.Context, planet *testplanet.Planet, pieces metabase.Pieces, allowedIndexes []int) error {
 	findIndex := func(id storj.NodeID) int {