From 915f3952afb3863669ac87c262e133445134eb64 Mon Sep 17 00:00:00 2001
From: paul cannon
Date: Tue, 13 Dec 2022 14:40:15 -0600
Subject: [PATCH] satellite/repair: repair pieces on the same last_net

We avoid putting more than one piece of a segment on the same /24
network (or /64 for ipv6). However, it is possible for multiple pieces
of the same segment to move to the same network over time. Nodes can
change addresses, or segments could be uploaded with dev settings, etc.

We will call such pieces "clumped", as they are clumped into the same
net, and are much more likely to be lost or preserved together.

This change teaches the repair checker to recognize segments which have
clumped pieces, and put them in the repair queue. It also teaches the
repair worker to repair such segments (treating clumped pieces as
"retrievable but unhealthy"; i.e., they will be replaced on new nodes
if possible).

Refs: https://github.com/storj/storj/issues/5391
Change-Id: Iaa9e339fee8f80f4ad39895438e9f18606338908
---
 cmd/satellite/repair_segment.go            |  2 +-
 monkit.lock                                |  1 +
 satellite/orders/service.go                | 14 ++--
 satellite/overlay/service.go               | 11 ++-
 satellite/repair/checker/checker.go        | 25 ++++++-
 satellite/repair/checker/checkerstats.go   |  2 +
 satellite/repair/clumping.go               | 27 +++++++
 satellite/repair/clumping_test.go          | 64 +++++++++++++++++
 satellite/repair/repair_test.go            | 83 ++++++++++++++++++++++
 satellite/repair/repairer/segments.go      | 83 +++++++++++++++-------
 satellite/satellitedb/overlaycache.go      | 42 ++++++++---
 satellite/satellitedb/overlaycache_test.go | 77 ++++++++++++++++++++
 12 files changed, 387 insertions(+), 44 deletions(-)
 create mode 100644 satellite/repair/clumping.go
 create mode 100644 satellite/repair/clumping_test.go

diff --git a/cmd/satellite/repair_segment.go b/cmd/satellite/repair_segment.go
index 0691d165f..d70ade588 100644
--- a/cmd/satellite/repair_segment.go
+++ b/cmd/satellite/repair_segment.go
@@ -276,7 +276,7 @@ func reuploadSegment(ctx context.Context, log *zap.Logger, peer *satellite.Repai
 	optimalThresholdMultiplier := float64(1) // is this value fine?
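The "clumped" definition above is implemented by the new repair.FindClumpedPieces helper added in satellite/repair/clumping.go later in this patch; pieces[i] must line up with lastNets[i] as returned by the new GetNodesNetworkInOrder. A minimal standalone sketch of how a caller is expected to use it, assuming the storj.io/storj module is available (the piece numbers and last_net values here are invented for illustration):

```go
package main

import (
	"fmt"

	"storj.io/common/testrand"
	"storj.io/storj/satellite/metabase"
	"storj.io/storj/satellite/repair"
)

func main() {
	// Four pieces of one segment; pieces[i] corresponds to lastNets[i],
	// which is the shape GetNodesNetworkInOrder returns to the checker.
	pieces := metabase.Pieces{
		{Number: 0, StorageNode: testrand.NodeID()},
		{Number: 1, StorageNode: testrand.NodeID()},
		{Number: 2, StorageNode: testrand.NodeID()},
		{Number: 3, StorageNode: testrand.NodeID()},
	}
	// Pieces 1 and 3 have ended up on the same /24 as piece 0 (values invented).
	lastNets := []string{"10.1.2.0", "10.1.2.0", "10.9.9.0", "10.1.2.0"}

	clumped := repair.FindClumpedPieces(pieces, lastNets)

	// The first piece seen on a given last_net stays healthy; the rest are
	// clumped, so pieces 1 and 3 are returned here.
	fmt.Println("clumped piece numbers:")
	for _, p := range clumped {
		fmt.Println(p.Number)
	}
}
```

In both the checker and the repairer the result then counts against segment health, numHealthy = len(pieces) - len(missingPieces) - len(clumpedPieces), which is what pushes a clumped segment into the repair queue and lets the repair worker move those pieces to new nodes.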
numHealthyInExcludedCountries := 0 putLimits, putPrivateKey, err := peer.Orders.Service.CreatePutRepairOrderLimits(ctx, metabase.BucketLocation{}, segment, - make([]*pb.AddressedOrderLimit, len(newNodes)), newNodes, optimalThresholdMultiplier, numHealthyInExcludedCountries) + make([]*pb.AddressedOrderLimit, len(newNodes)), make(map[int32]struct{}), newNodes, optimalThresholdMultiplier, numHealthyInExcludedCountries) if err != nil { return errs.New("could not create PUT_REPAIR order limits: %w", err) } diff --git a/monkit.lock b/monkit.lock index 4ae296c54..1ef55c88e 100644 --- a/monkit.lock +++ b/monkit.lock @@ -91,6 +91,7 @@ storj.io/storj/satellite/metrics."total_remote_segments" IntVal storj.io/storj/satellite/orders."download_failed_not_enough_pieces_uplink" Meter storj.io/storj/satellite/repair/checker."checker_injured_segment_health" FloatVal storj.io/storj/satellite/repair/checker."checker_segment_age" IntVal +storj.io/storj/satellite/repair/checker."checker_segment_clumped_count" IntVal storj.io/storj/satellite/repair/checker."checker_segment_health" FloatVal storj.io/storj/satellite/repair/checker."checker_segment_healthy_count" IntVal storj.io/storj/satellite/repair/checker."checker_segment_time_until_irreparable" IntVal diff --git a/satellite/orders/service.go b/satellite/orders/service.go index be56c6b53..0dd3304ff 100644 --- a/satellite/orders/service.go +++ b/satellite/orders/service.go @@ -449,7 +449,7 @@ func (service *Service) CreateGetRepairOrderLimits(ctx context.Context, bucket m } // CreatePutRepairOrderLimits creates the order limits for uploading the repaired pieces of segment to newNodes. -func (service *Service) CreatePutRepairOrderLimits(ctx context.Context, bucket metabase.BucketLocation, segment metabase.Segment, getOrderLimits []*pb.AddressedOrderLimit, newNodes []*overlay.SelectedNode, optimalThresholdMultiplier float64, numPiecesInExcludedCountries int) (_ []*pb.AddressedOrderLimit, _ storj.PiecePrivateKey, err error) { +func (service *Service) CreatePutRepairOrderLimits(ctx context.Context, bucket metabase.BucketLocation, segment metabase.Segment, getOrderLimits []*pb.AddressedOrderLimit, healthySet map[int32]struct{}, newNodes []*overlay.SelectedNode, optimalThresholdMultiplier float64, numPiecesInExcludedCountries int) (_ []*pb.AddressedOrderLimit, _ storj.PiecePrivateKey, err error) { defer mon.Task()(&ctx)(&err) // Create the order limits for being used to upload the repaired pieces @@ -462,14 +462,14 @@ func (service *Service) CreatePutRepairOrderLimits(ctx context.Context, bucket m totalPiecesAfterRepair = totalPieces } - var numCurrentPieces int + var numRetrievablePieces int for _, o := range getOrderLimits { if o != nil { - numCurrentPieces++ + numRetrievablePieces++ } } - totalPiecesToRepair := totalPiecesAfterRepair - numCurrentPieces + totalPiecesToRepair := totalPiecesAfterRepair - len(healthySet) limits := make([]*pb.AddressedOrderLimit, totalPieces) @@ -485,7 +485,11 @@ func (service *Service) CreatePutRepairOrderLimits(ctx context.Context, bucket m var pieceNum int32 for _, node := range newNodes { - for int(pieceNum) < totalPieces && getOrderLimits[pieceNum] != nil { + for int(pieceNum) < totalPieces { + _, isHealthy := healthySet[pieceNum] + if !isHealthy { + break + } pieceNum++ } diff --git a/satellite/overlay/service.go b/satellite/overlay/service.go index 402f99633..a7c8674ae 100644 --- a/satellite/overlay/service.go +++ b/satellite/overlay/service.go @@ -99,8 +99,10 @@ type DB interface { // GetExitStatus returns a node's graceful exit 
status. GetExitStatus(ctx context.Context, nodeID storj.NodeID) (exitStatus *ExitStatus, err error) - // GetNodesNetwork returns the /24 subnet for each storage node, order is not guaranteed. + // GetNodesNetwork returns the last_net subnet for each storage node, order is not guaranteed. GetNodesNetwork(ctx context.Context, nodeIDs []storj.NodeID) (nodeNets []string, err error) + // GetNodesNetworkInOrder returns the last_net subnet for each storage node in order of the requested nodeIDs. + GetNodesNetworkInOrder(ctx context.Context, nodeIDs []storj.NodeID) (nodeNets []string, err error) // DisqualifyNode disqualifies a storage node. DisqualifyNode(ctx context.Context, nodeID storj.NodeID, disqualifiedAt time.Time, reason DisqualificationReason) (email string, err error) @@ -424,6 +426,13 @@ func (service *Service) IsOnline(node *NodeDossier) bool { return time.Since(node.Reputation.LastContactSuccess) < service.config.Node.OnlineWindow } +// GetNodesNetworkInOrder returns the /24 subnet for each storage node, in order. If a +// requested node is not in the database, an empty string will be returned corresponding +// to that node's last_net. +func (service *Service) GetNodesNetworkInOrder(ctx context.Context, nodeIDs []storj.NodeID) (lastNets []string, err error) { + return service.db.GetNodesNetworkInOrder(ctx, nodeIDs) +} + // FindStorageNodesForGracefulExit searches the overlay network for nodes that meet the provided requirements for graceful-exit requests. func (service *Service) FindStorageNodesForGracefulExit(ctx context.Context, req FindStorageNodesRequest) (_ []*SelectedNode, err error) { defer mon.Task()(&ctx)(&err) diff --git a/satellite/repair/checker/checker.go b/satellite/repair/checker/checker.go index a581842cd..2c4928518 100644 --- a/satellite/repair/checker/checker.go +++ b/satellite/repair/checker/checker.go @@ -39,6 +39,7 @@ type Checker struct { metabase *metabase.DB segmentLoop *segmentloop.Service nodestate *ReliabilityCache + overlayService *overlay.Service statsCollector *statsCollector repairOverrides RepairOverridesMap nodeFailureRate float64 @@ -55,6 +56,7 @@ func NewChecker(logger *zap.Logger, repairQueue queue.RepairQueue, metabase *met metabase: metabase, segmentLoop: segmentLoop, nodestate: NewReliabilityCache(overlay, config.ReliabilityCacheStaleness), + overlayService: overlay, statsCollector: newStatsCollector(), repairOverrides: config.RepairOverrides.GetMap(), nodeFailureRate: config.NodeFailureRate, @@ -117,6 +119,7 @@ func (checker *Checker) IdentifyInjuredSegments(ctx context.Context) (err error) observer := &checkerObserver{ repairQueue: checker.createInsertBuffer(), nodestate: checker.nodestate, + overlayService: checker.overlayService, statsCollector: checker.statsCollector, monStats: aggregateStats{}, repairOverrides: checker.repairOverrides, @@ -175,6 +178,7 @@ var _ segmentloop.Observer = (*checkerObserver)(nil) type checkerObserver struct { repairQueue *queue.InsertBuffer nodestate *ReliabilityCache + overlayService *overlay.Service statsCollector *statsCollector monStats aggregateStats // TODO(cam): once we verify statsCollector reports data correctly, remove this repairOverrides RepairOverridesMap @@ -190,6 +194,7 @@ func NewCheckerObserver(checker *Checker) segmentloop.Observer { return &checkerObserver{ repairQueue: checker.createInsertBuffer(), nodestate: checker.nodestate, + overlayService: checker.overlayService, statsCollector: checker.statsCollector, monStats: aggregateStats{}, repairOverrides: checker.repairOverrides, @@ -229,7 
+234,7 @@ func (obs *checkerObserver) LoopStarted(context.Context, segmentloop.LoopInfo) ( } func (obs *checkerObserver) RemoteSegment(ctx context.Context, segment *segmentloop.Segment) (err error) { - // we are expliticy not adding monitoring here as we are tracking loop observers separately + // we are explicitly not adding monitoring here as we are tracking loop observers separately // ignore segment if expired if segment.Expired(time.Now()) { @@ -270,11 +275,27 @@ func (obs *checkerObserver) RemoteSegment(ctx context.Context, segment *segmentl return errs.Combine(Error.New("error getting missing pieces"), err) } - numHealthy := len(pieces) - len(missingPieces) + // if multiple pieces are on the same last_net, keep only the first one. The rest are + // to be considered retrievable but unhealthy. + nodeIDs := make([]storj.NodeID, len(pieces)) + for i, p := range pieces { + nodeIDs[i] = p.StorageNode + } + lastNets, err := obs.overlayService.GetNodesNetworkInOrder(ctx, nodeIDs) + if err != nil { + obs.monStats.remoteSegmentsFailedToCheck++ + stats.iterationAggregates.remoteSegmentsFailedToCheck++ + return errs.Combine(Error.New("error determining node last_nets"), err) + } + clumpedPieces := repair.FindClumpedPieces(segment.Pieces, lastNets) + + numHealthy := len(pieces) - len(missingPieces) - len(clumpedPieces) mon.IntVal("checker_segment_total_count").Observe(int64(len(pieces))) //mon:locked stats.segmentTotalCount.Observe(int64(len(pieces))) mon.IntVal("checker_segment_healthy_count").Observe(int64(numHealthy)) //mon:locked stats.segmentHealthyCount.Observe(int64(numHealthy)) + mon.IntVal("checker_segment_clumped_count").Observe(int64(len(clumpedPieces))) //mon:locked + stats.segmentClumpedCount.Observe(int64(len(clumpedPieces))) segmentAge := time.Since(segment.CreatedAt) mon.IntVal("checker_segment_age").Observe(int64(segmentAge.Seconds())) //mon:locked diff --git a/satellite/repair/checker/checkerstats.go b/satellite/repair/checker/checkerstats.go index e97933e25..068bfa312 100644 --- a/satellite/repair/checker/checkerstats.go +++ b/satellite/repair/checker/checkerstats.go @@ -70,6 +70,7 @@ type stats struct { segmentsBelowMinReq *monkit.Counter segmentTotalCount *monkit.IntVal segmentHealthyCount *monkit.IntVal + segmentClumpedCount *monkit.IntVal segmentAge *monkit.IntVal segmentHealth *monkit.FloatVal injuredSegmentHealth *monkit.FloatVal @@ -125,6 +126,7 @@ func newStats(rs string) *stats { segmentsBelowMinReq: monkit.NewCounter(monkit.NewSeriesKey("tagged_repair_stats").WithTag("name", "checker_segments_below_min_req").WithTag("rs_scheme", rs)), segmentTotalCount: monkit.NewIntVal(monkit.NewSeriesKey("tagged_repair_stats").WithTag("name", "checker_segment_total_count").WithTag("rs_scheme", rs)), segmentHealthyCount: monkit.NewIntVal(monkit.NewSeriesKey("tagged_repair_stats").WithTag("name", "checker_segment_healthy_count").WithTag("rs_scheme", rs)), + segmentClumpedCount: monkit.NewIntVal(monkit.NewSeriesKey("tagged_repair_stats").WithTag("name", "checker_segment_clumped_count").WithTag("rs_scheme", rs)), segmentAge: monkit.NewIntVal(monkit.NewSeriesKey("tagged_repair_stats").WithTag("name", "checker_segment_age").WithTag("rs_scheme", rs)), segmentHealth: monkit.NewFloatVal(monkit.NewSeriesKey("tagged_repair_stats").WithTag("name", "checker_segment_health").WithTag("rs_scheme", rs)), injuredSegmentHealth: monkit.NewFloatVal(monkit.NewSeriesKey("tagged_repair_stats").WithTag("name", "checker_injured_segment_health").WithTag("rs_scheme", rs)), diff --git 
a/satellite/repair/clumping.go b/satellite/repair/clumping.go new file mode 100644 index 000000000..5fdddcbd1 --- /dev/null +++ b/satellite/repair/clumping.go @@ -0,0 +1,27 @@ +// Copyright (C) 2023 Storj Labs, Inc. +// See LICENSE for copying information. + +package repair + +import "storj.io/storj/satellite/metabase" + +// FindClumpedPieces finds pieces that are stored in the same last_net (i.e., the same /24 network +// in the IPv4 case). The first piece for a given last_net is fine, but any subsequent pieces in +// the same last_net will be returned as part of the 'clumped' list. +// +// lastNets must be a slice of the same length as pieces; lastNets[i] corresponds to pieces[i]. +func FindClumpedPieces(pieces metabase.Pieces, lastNets []string) (clumped metabase.Pieces) { + lastNetSet := make(map[string]struct{}) + for i, p := range pieces { + lastNet := lastNets[i] + _, ok := lastNetSet[lastNet] + if ok { + // this last_net was already seen + clumped = append(clumped, p) + } else { + // add this last_net to the set of seen nets + lastNetSet[lastNet] = struct{}{} + } + } + return clumped +} diff --git a/satellite/repair/clumping_test.go b/satellite/repair/clumping_test.go new file mode 100644 index 000000000..cb7c1a497 --- /dev/null +++ b/satellite/repair/clumping_test.go @@ -0,0 +1,64 @@ +// Copyright (C) 2023 Storj Labs, Inc. +// See LICENSE for copying information. + +package repair_test + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/require" + + "storj.io/common/testrand" + "storj.io/storj/satellite/metabase" + "storj.io/storj/satellite/repair" +) + +func TestFindClumpedPieces(t *testing.T) { + pieces := make(metabase.Pieces, 10) + for n := range pieces { + pieces[n] = metabase.Piece{ + Number: 0, + StorageNode: testrand.NodeID(), + } + } + + t.Run("all-separate-nets", func(t *testing.T) { + lastNets := make([]string, len(pieces)) + for n := range lastNets { + lastNets[n] = fmt.Sprintf("172.16.%d.0", n) + } + clumped := repair.FindClumpedPieces(pieces, lastNets) + require.Len(t, clumped, 0) + }) + + t.Run("one-clumped", func(t *testing.T) { + lastNets := make([]string, len(pieces)) + for n := range lastNets { + lastNets[n] = fmt.Sprintf("172.16.%d.0", n) + } + lastNets[len(lastNets)-1] = lastNets[0] + clumped := repair.FindClumpedPieces(pieces, lastNets) + require.Equal(t, metabase.Pieces{pieces[len(pieces)-1]}, clumped) + }) + + t.Run("all-clumped", func(t *testing.T) { + lastNets := make([]string, len(pieces)) + for n := range lastNets { + lastNets[n] = "172.16.41.0" + } + clumped := repair.FindClumpedPieces(pieces, lastNets) + require.Equal(t, pieces[1:], clumped) + }) + + t.Run("two-clumps", func(t *testing.T) { + lastNets := make([]string, len(pieces)) + for n := range lastNets { + lastNets[n] = fmt.Sprintf("172.16.%d.0", n) + lastNets[2] = lastNets[0] + lastNets[4] = lastNets[1] + } + clumped := repair.FindClumpedPieces(pieces, lastNets) + require.Equal(t, metabase.Pieces{pieces[2], pieces[4]}, clumped) + }) +} diff --git a/satellite/repair/repair_test.go b/satellite/repair/repair_test.go index 852fbe87d..58a164101 100644 --- a/satellite/repair/repair_test.go +++ b/satellite/repair/repair_test.go @@ -32,6 +32,7 @@ import ( "storj.io/storj/satellite/metabase" "storj.io/storj/satellite/overlay" "storj.io/storj/satellite/repair/checker" + "storj.io/storj/satellite/repair/queue" "storj.io/storj/satellite/repair/repairer" "storj.io/storj/satellite/reputation" "storj.io/storj/storagenode" @@ -1822,6 +1823,7 @@ func updateNodeCheckIn(ctx context.Context, overlayDB 
overlay.DB, node *testplan Address: local.Address, }, LastIPPort: local.Address, + LastNet: local.Address, IsUp: isUp, Operator: &local.Operator, Capacity: &local.Capacity, @@ -3205,3 +3207,84 @@ func TestSegmentInExcludedCountriesRepairIrreparable(t *testing.T) { func reputationRatio(info reputation.Info) float64 { return info.AuditReputationAlpha / (info.AuditReputationAlpha + info.AuditReputationBeta) } + +func TestRepairClumpedPieces(t *testing.T) { + // Test that if nodes change IPs such that multiple pieces of a segment + // reside in the same network, that segment will be considered unhealthy + // by the repair checker and it will be repaired by the repair worker. + testplanet.Run(t, testplanet.Config{ + SatelliteCount: 1, + StorageNodeCount: 6, + UplinkCount: 1, + Reconfigure: testplanet.Reconfigure{ + Satellite: testplanet.ReconfigureRS(2, 3, 4, 4), + }, + }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { + uplinkPeer := planet.Uplinks[0] + satellite := planet.Satellites[0] + // stop audit to prevent possible interactions i.e. repair timeout problems + satellite.Audit.Worker.Loop.Pause() + + satellite.Repair.Checker.Loop.Pause() + satellite.Repair.Repairer.Loop.Pause() + + var testData = testrand.Bytes(8 * memory.KiB) + // first, upload some remote data + err := uplinkPeer.Upload(ctx, satellite, "testbucket", "test/path", testData) + require.NoError(t, err) + + segment, _ := getRemoteSegment(ctx, t, satellite, uplinkPeer.Projects[0].ID, "testbucket") + remotePiecesBefore := segment.Pieces + + // that segment should be ignored by repair checker for now + satellite.Repair.Checker.Loop.TriggerWait() + injuredSegment, err := satellite.DB.RepairQueue().Select(ctx) + require.Error(t, err) + if !queue.ErrEmpty.Has(err) { + require.FailNow(t, "Should get ErrEmptyQueue, but got", err) + } + require.Nil(t, injuredSegment) + + // pieces list has not changed + segment, _ = getRemoteSegment(ctx, t, satellite, uplinkPeer.Projects[0].ID, "testbucket") + remotePiecesAfter := segment.Pieces + require.Equal(t, remotePiecesBefore, remotePiecesAfter) + + // now move the network of one storage node holding a piece, so that it's the same as another + node0 := planet.FindNode(remotePiecesAfter[0].StorageNode) + node1 := planet.FindNode(remotePiecesAfter[1].StorageNode) + + local := node0.Contact.Service.Local() + checkInInfo := overlay.NodeCheckInInfo{ + NodeID: node0.ID(), + Address: &pb.NodeAddress{Address: local.Address}, + LastIPPort: local.Address, + LastNet: node1.Contact.Service.Local().Address, + IsUp: true, + Operator: &local.Operator, + Capacity: &local.Capacity, + Version: &local.Version, + } + err = satellite.DB.OverlayCache().UpdateCheckIn(ctx, checkInInfo, time.Now().UTC(), overlay.NodeSelectionConfig{}) + require.NoError(t, err) + + // running repair checker again should put the segment into the repair queue + satellite.Repair.Checker.Loop.TriggerWait() + // and subsequently running the repair worker should pull that off the queue and repair it + satellite.Repair.Repairer.Loop.TriggerWait() + satellite.Repair.Repairer.WaitForPendingRepairs() + + // confirm that the segment now has exactly one piece on (node0 or node1) + // and still has the right number of pieces. 
+ segment, _ = getRemoteSegment(ctx, t, satellite, uplinkPeer.Projects[0].ID, "testbucket") + require.Len(t, segment.Pieces, 4) + foundOnFirstNetwork := 0 + for _, piece := range segment.Pieces { + if piece.StorageNode.Compare(node0.ID()) == 0 || piece.StorageNode.Compare(node1.ID()) == 0 { + foundOnFirstNetwork++ + } + } + require.Equalf(t, 1, foundOnFirstNetwork, + "%v should only include one of %s or %s", segment.Pieces, node0.ID(), node1.ID()) + }) +} diff --git a/satellite/repair/repairer/segments.go b/satellite/repair/repairer/segments.go index f21623f23..1d976ba5a 100644 --- a/satellite/repair/repairer/segments.go +++ b/satellite/repair/repairer/segments.go @@ -22,6 +22,7 @@ import ( "storj.io/storj/satellite/metabase" "storj.io/storj/satellite/orders" "storj.io/storj/satellite/overlay" + "storj.io/storj/satellite/repair" "storj.io/storj/satellite/repair/checker" "storj.io/storj/satellite/repair/queue" "storj.io/uplink/private/eestream" @@ -195,9 +196,26 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue return false, overlayQueryError.New("error identifying missing pieces: %w", err) } - numHealthy := len(pieces) - len(missingPieces) + // if multiple pieces are on the same last_net, keep only the first one. The rest are + // to be considered retrievable but unhealthy. + nodeIDs := make([]storj.NodeID, len(pieces)) + for i, p := range pieces { + nodeIDs[i] = p.StorageNode + } + lastNets, err := repairer.overlay.GetNodesNetworkInOrder(ctx, nodeIDs) + if err != nil { + return false, metainfoGetError.Wrap(err) + } + clumpedPieces := repair.FindClumpedPieces(segment.Pieces, lastNets) + clumpedPiecesSet := make(map[uint16]bool) + for _, clumpedPiece := range clumpedPieces { + clumpedPiecesSet[clumpedPiece.Number] = true + } + + numRetrievable := len(pieces) - len(missingPieces) + numHealthy := len(pieces) - len(missingPieces) - len(clumpedPieces) // irreparable segment - if numHealthy < int(segment.Redundancy.RequiredShares) { + if numRetrievable < int(segment.Redundancy.RequiredShares) { mon.Counter("repairer_segments_below_min_req").Inc(1) //mon:locked stats.repairerSegmentsBelowMinReq.Inc(1) mon.Meter("repair_nodes_unavailable").Mark(1) //mon:locked @@ -206,7 +224,7 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue repairer.log.Warn("irreparable segment", zap.String("StreamID", queueSegment.StreamID.String()), zap.Uint64("Position", queueSegment.Position.Encode()), - zap.Int("piecesAvailable", numHealthy), + zap.Int("piecesAvailable", numRetrievable), zap.Int16("piecesRequired", segment.Redundancy.RequiredShares), ) return false, nil @@ -253,19 +271,27 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue lostPiecesSet := sliceToSet(missingPieces) - var healthyPieces, unhealthyPieces metabase.Pieces - // Populate healthyPieces with all pieces from the segment except those correlating to indices in lostPieces + var retrievablePieces metabase.Pieces + unhealthyPieces := make(map[metabase.Piece]struct{}) + healthySet := make(map[int32]struct{}) + // Populate retrievablePieces with all pieces from the segment except those correlating to indices in lostPieces. + // Populate unhealthyPieces with all pieces in lostPieces OR clumpedPieces. 
for _, piece := range pieces { excludeNodeIDs = append(excludeNodeIDs, piece.StorageNode) - if !lostPiecesSet[piece.Number] { - healthyPieces = append(healthyPieces, piece) + if lostPiecesSet[piece.Number] { + unhealthyPieces[piece] = struct{}{} } else { - unhealthyPieces = append(unhealthyPieces, piece) + retrievablePieces = append(retrievablePieces, piece) + if clumpedPiecesSet[piece.Number] { + unhealthyPieces[piece] = struct{}{} + } else { + healthySet[int32(piece.Number)] = struct{}{} + } } } // Create the order limits for the GET_REPAIR action - getOrderLimits, getPrivateKey, cachedNodesInfo, err := repairer.orders.CreateGetRepairOrderLimits(ctx, metabase.BucketLocation{}, segment, healthyPieces) + getOrderLimits, getPrivateKey, cachedNodesInfo, err := repairer.orders.CreateGetRepairOrderLimits(ctx, metabase.BucketLocation{}, segment, retrievablePieces) if err != nil { if orders.ErrDownloadFailedNotEnoughPieces.Has(err) { mon.Counter("repairer_segments_below_min_req").Inc(1) //mon:locked @@ -276,7 +302,7 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue repairer.log.Warn("irreparable segment: too many nodes offline", zap.String("StreamID", queueSegment.StreamID.String()), zap.Uint64("Position", queueSegment.Position.Encode()), - zap.Int("piecesAvailable", len(healthyPieces)), + zap.Int("piecesAvailable", len(retrievablePieces)), zap.Int16("piecesRequired", segment.Redundancy.RequiredShares), zap.Error(err), ) @@ -284,24 +310,25 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue return false, orderLimitFailureError.New("could not create GET_REPAIR order limits: %w", err) } - // Double check for healthy pieces which became unhealthy inside CreateGetRepairOrderLimits - // Remove them from healthyPieces and add them to unhealthyPieces - var newHealthyPieces metabase.Pieces - for _, piece := range healthyPieces { + // Double check for retrievable pieces which became irretrievable inside CreateGetRepairOrderLimits + // Add them to unhealthyPieces. 
+ for _, piece := range retrievablePieces { if getOrderLimits[piece.Number] == nil { - unhealthyPieces = append(unhealthyPieces, piece) - } else { - newHealthyPieces = append(newHealthyPieces, piece) + unhealthyPieces[piece] = struct{}{} } } - healthyPieces = newHealthyPieces + numHealthy = len(healthySet) var requestCount int var minSuccessfulNeeded int { totalNeeded := math.Ceil(float64(redundancy.OptimalThreshold()) * repairer.multiplierOptimalThreshold) - requestCount = int(totalNeeded) - len(healthyPieces) + numHealthyInExcludedCountries - minSuccessfulNeeded = redundancy.OptimalThreshold() - len(healthyPieces) + numHealthyInExcludedCountries + requestCount = int(totalNeeded) + numHealthyInExcludedCountries + if requestCount > redundancy.TotalCount() { + requestCount = redundancy.TotalCount() + } + requestCount -= numHealthy + minSuccessfulNeeded = redundancy.OptimalThreshold() - numHealthy + numHealthyInExcludedCountries } // Request Overlay for n-h new storage nodes @@ -315,12 +342,12 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue } // Create the order limits for the PUT_REPAIR action - putLimits, putPrivateKey, err := repairer.orders.CreatePutRepairOrderLimits(ctx, metabase.BucketLocation{}, segment, getOrderLimits, newNodes, repairer.multiplierOptimalThreshold, numHealthyInExcludedCountries) + putLimits, putPrivateKey, err := repairer.orders.CreatePutRepairOrderLimits(ctx, metabase.BucketLocation{}, segment, getOrderLimits, healthySet, newNodes, repairer.multiplierOptimalThreshold, numHealthyInExcludedCountries) if err != nil { return false, orderLimitFailureError.New("could not create PUT_REPAIR order limits: %w", err) } - // Download the segment using just the healthy pieces + // Download the segment using just the retrievable pieces segmentReader, piecesReport, err := repairer.ec.Get(ctx, getOrderLimits, cachedNodesInfo, getPrivateKey, redundancy, int64(segment.EncryptedSize)) // ensure we get values, even if only zero values, so that redash can have an alert based on this @@ -483,7 +510,7 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue mon.Meter("repair_bytes_uploaded").Mark64(bytesRepaired) //mon:locked - healthyAfterRepair := len(healthyPieces) + len(repairedPieces) + healthyAfterRepair := numHealthy + len(repairedPieces) switch { case healthyAfterRepair <= int(segment.Redundancy.RepairShares): // Important: this indicates a failure to PUT enough pieces to the network to pass @@ -512,14 +539,16 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue var toRemove metabase.Pieces if healthyAfterRepair >= int(segment.Redundancy.OptimalShares) { // if full repair, remove all unhealthy pieces - toRemove = unhealthyPieces + for unhealthyPiece := range unhealthyPieces { + toRemove = append(toRemove, unhealthyPiece) + } } else { // if partial repair, leave unrepaired unhealthy pieces in the pointer - for _, piece := range unhealthyPieces { - if repairedMap[piece.Number] { + for unhealthyPiece := range unhealthyPieces { + if repairedMap[unhealthyPiece.Number] { // add only repaired pieces in the slice, unrepaired // unhealthy pieces are not removed from the pointer - toRemove = append(toRemove, piece) + toRemove = append(toRemove, unhealthyPiece) } } } diff --git a/satellite/satellitedb/overlaycache.go b/satellite/satellitedb/overlaycache.go index 272b57538..0afe1409f 100644 --- a/satellite/satellitedb/overlaycache.go +++ b/satellite/satellitedb/overlaycache.go @@ -178,10 +178,16 @@ 
func (cache *overlaycache) selectAllStorageNodesDownload(ctx context.Context, on return nodes, Error.Wrap(rows.Err()) } -// GetNodesNetwork returns the /24 subnet for each storage node, order is not guaranteed. +// GetNodesNetwork returns the /24 subnet for each storage node. Order is not guaranteed. +// If a requested node is not in the database, no corresponding last_net will be returned +// for that node. func (cache *overlaycache) GetNodesNetwork(ctx context.Context, nodeIDs []storj.NodeID) (nodeNets []string, err error) { + query := ` + SELECT last_net FROM nodes + WHERE id = any($1::bytea[]) + ` for { - nodeNets, err = cache.getNodesNetwork(ctx, nodeIDs) + nodeNets, err = cache.getNodesNetwork(ctx, nodeIDs, query) if err != nil { if cockroachutil.NeedsRetry(err) { continue @@ -194,15 +200,35 @@ func (cache *overlaycache) GetNodesNetwork(ctx context.Context, nodeIDs []storj. return nodeNets, err } -func (cache *overlaycache) getNodesNetwork(ctx context.Context, nodeIDs []storj.NodeID) (nodeNets []string, err error) { +// GetNodesNetworkInOrder returns the /24 subnet for each storage node, in order. If a +// requested node is not in the database, an empty string will be returned corresponding +// to that node's last_net. +func (cache *overlaycache) GetNodesNetworkInOrder(ctx context.Context, nodeIDs []storj.NodeID) (nodeNets []string, err error) { + query := ` + SELECT coalesce(n.last_net, '') + FROM unnest($1::bytea[]) WITH ORDINALITY AS input(node_id, ord) + LEFT OUTER JOIN nodes n ON input.node_id = n.id + ORDER BY input.ord + ` + for { + nodeNets, err = cache.getNodesNetwork(ctx, nodeIDs, query) + if err != nil { + if cockroachutil.NeedsRetry(err) { + continue + } + return nodeNets, err + } + break + } + + return nodeNets, err +} + +func (cache *overlaycache) getNodesNetwork(ctx context.Context, nodeIDs []storj.NodeID, query string) (nodeNets []string, err error) { defer mon.Task()(&ctx)(&err) var rows tagsql.Rows - rows, err = cache.db.Query(ctx, cache.db.Rebind(` - SELECT last_net FROM nodes - WHERE id = any($1::bytea[]) - `), pgutil.NodeIDArray(nodeIDs), - ) + rows, err = cache.db.Query(ctx, cache.db.Rebind(query), pgutil.NodeIDArray(nodeIDs)) if err != nil { return nil, err } diff --git a/satellite/satellitedb/overlaycache_test.go b/satellite/satellitedb/overlaycache_test.go index 6d1db68d8..8b5a3e074 100644 --- a/satellite/satellitedb/overlaycache_test.go +++ b/satellite/satellitedb/overlaycache_test.go @@ -5,6 +5,9 @@ package satellitedb_test import ( "context" + "encoding/binary" + "math/rand" + "net" "testing" "time" @@ -281,3 +284,77 @@ func assertContained(ctx context.Context, t testing.TB, cache overlay.DB, args . 
nodeID, n, expectedContainment, nodeInDB.Contained) } } + +func TestGetNodesNetwork(t *testing.T) { + satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) { + cache := db.OverlayCache() + const ( + distinctNetworks = 10 + netMask = 28 + nodesPerNetwork = 1 << (32 - netMask) + ) + mask := net.CIDRMask(netMask, 32) + + nodes := make([]storj.NodeID, distinctNetworks*nodesPerNetwork) + ips := make([]net.IP, len(nodes)) + lastNets := make([]string, len(nodes)) + setOfNets := make(map[string]struct{}) + + for n := range nodes { + nodes[n] = testrand.NodeID() + ips[n] = make(net.IP, 4) + binary.BigEndian.PutUint32(ips[n], uint32(n)) + lastNets[n] = ips[n].Mask(mask).String() + setOfNets[lastNets[n]] = struct{}{} + + checkInInfo := overlay.NodeCheckInInfo{ + IsUp: true, + Address: &pb.NodeAddress{Address: ips[n].String()}, + LastNet: lastNets[n], + Version: &pb.NodeVersion{Version: "v0.0.0"}, + NodeID: nodes[n], + } + err := cache.UpdateCheckIn(ctx, checkInInfo, time.Now().UTC(), overlay.NodeSelectionConfig{}) + require.NoError(t, err) + } + + t.Run("GetNodesNetwork", func(t *testing.T) { + gotLastNets, err := cache.GetNodesNetwork(ctx, nodes) + require.NoError(t, err) + require.Len(t, gotLastNets, len(nodes)) + gotLastNetsSet := make(map[string]struct{}) + for _, lastNet := range gotLastNets { + gotLastNetsSet[lastNet] = struct{}{} + } + require.Len(t, gotLastNetsSet, distinctNetworks) + for _, lastNet := range gotLastNets { + require.NotEmpty(t, lastNet) + delete(setOfNets, lastNet) + } + require.Empty(t, setOfNets) // indicates that all last_nets were seen in the result + }) + + t.Run("GetNodesNetworkInOrder", func(t *testing.T) { + nodesPlusOne := make([]storj.NodeID, len(nodes)+1) + copy(nodesPlusOne[:len(nodes)], nodes) + lastNetsPlusOne := make([]string, len(nodes)+1) + copy(lastNetsPlusOne[:len(nodes)], lastNets) + // add a node that the overlay cache doesn't know about + unknownNode := testrand.NodeID() + nodesPlusOne[len(nodes)] = unknownNode + lastNetsPlusOne[len(nodes)] = "" + + // shuffle the order of the requested nodes, so we know output is in the right order + rand.Shuffle(len(nodesPlusOne), func(i, j int) { + nodesPlusOne[i], nodesPlusOne[j] = nodesPlusOne[j], nodesPlusOne[i] + lastNetsPlusOne[i], lastNetsPlusOne[j] = lastNetsPlusOne[j], lastNetsPlusOne[i] + }) + + gotLastNets, err := cache.GetNodesNetworkInOrder(ctx, nodesPlusOne) + require.NoError(t, err) + require.Len(t, gotLastNets, len(nodes)+1) + + require.Equal(t, lastNetsPlusOne, gotLastNets) + }) + }) +}
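The ordered lookup in GetNodesNetworkInOrder comes from unnest($1::bytea[]) WITH ORDINALITY combined with a LEFT OUTER JOIN and coalesce, so results come back in the same order as the requested node IDs, with '' for IDs not present in the nodes table. A rough sketch of the same query shape outside the satellite codebase, using database/sql with github.com/lib/pq against a hypothetical nodes_example(id, last_net) table (the table, column, and connection-string names are assumptions, not part of this patch):

```go
package main

import (
	"database/sql"
	"fmt"
	"log"

	"github.com/lib/pq"
)

// lookupInOrder returns one value per requested key, in request order,
// with "" for keys that have no row -- the same shape GetNodesNetworkInOrder
// produces, but against a hypothetical nodes_example(id text, last_net text) table.
func lookupInOrder(db *sql.DB, keys []string) ([]string, error) {
	rows, err := db.Query(`
		SELECT coalesce(n.last_net, '')
		FROM unnest($1::text[]) WITH ORDINALITY AS input(node_id, ord)
		LEFT OUTER JOIN nodes_example n ON input.node_id = n.id
		ORDER BY input.ord
	`, pq.Array(keys))
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var out []string
	for rows.Next() {
		var net string
		if err := rows.Scan(&net); err != nil {
			return nil, err
		}
		out = append(out, net)
	}
	return out, rows.Err()
}

func main() {
	// Connection string is a placeholder for illustration only.
	db, err := sql.Open("postgres", "postgres://localhost/example?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	nets, err := lookupInOrder(db, []string{"node-a", "node-b", "unknown-node"})
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(nets) // unknown keys yield "" in their slot
}
```

The LEFT OUTER JOIN is what preserves a slot for unknown IDs; with a plain inner join those rows would drop out and the result could no longer be zipped index-for-index with the pieces slice, which is exactly the property the checker and repairer rely on.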