From 0eaf43120ba85a57bbd067a01275063999edc62a Mon Sep 17 00:00:00 2001
From: Michal Niewrzal
Date: Fri, 6 Oct 2023 11:53:48 +0200
Subject: [PATCH] satellite/repair/checker: optimize processing, part 3
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

ClassifySegmentPieces now uses a custom set implementation instead of a
map. As a side note: for the custom set I also checked an int8-based bit
set, but it didn't give better performance, so I used the simpler
implementation.

Benchmark results (compared against the part 2 optimization change):

name                             old time/op    new time/op    delta
RemoteSegment/healthy_segment-8    21.7µs ± 8%    15.4µs ±16%  -29.38%  (p=0.008 n=5+5)

name                             old alloc/op   new alloc/op   delta
RemoteSegment/healthy_segment-8    7.41kB ± 0%    1.87kB ± 0%  -74.83%  (p=0.000 n=5+4)

name                             old allocs/op  new allocs/op  delta
RemoteSegment/healthy_segment-8       150 ± 0%       130 ± 0%  -13.33%  (p=0.008 n=5+5)

Change-Id: I21feca9ec6ac0a2558ac5ce8894451c54f69e52d
---
 satellite/repair/checker/observer.go    |  22 ++--
 satellite/repair/classification.go      | 157 ++++++++++++++++--------
 satellite/repair/classification_test.go |  46 +++----
 satellite/repair/repairer/segments.go   |  81 ++++++------
 4 files changed, 188 insertions(+), 118 deletions(-)

diff --git a/satellite/repair/checker/observer.go b/satellite/repair/checker/observer.go
index 2a8fa6655..7a4fa2eb6 100644
--- a/satellite/repair/checker/observer.go
+++ b/satellite/repair/checker/observer.go
@@ -390,18 +390,18 @@ func (fork *observerFork) process(ctx context.Context, segment *rangedloop.Segme
 	piecesCheck := repair.ClassifySegmentPieces(segment.Pieces, selectedNodes, fork.excludedCountryCodes, fork.doPlacementCheck,
 		fork.doDeclumping, fork.placementRules(segment.Placement), fork.nodeIDs)
 
-	numHealthy := len(piecesCheck.Healthy)
+	numHealthy := piecesCheck.Healthy.Size()
 	segmentTotalCountIntVal.Observe(int64(len(pieces)))
 	stats.segmentStats.segmentTotalCount.Observe(int64(len(pieces)))
 	segmentHealthyCountIntVal.Observe(int64(numHealthy))
 	stats.segmentStats.segmentHealthyCount.Observe(int64(numHealthy))
-	segmentClumpedCountIntVal.Observe(int64(len(piecesCheck.Clumped)))
-	stats.segmentStats.segmentClumpedCount.Observe(int64(len(piecesCheck.Clumped)))
-	segmentExitingCountIntVal.Observe(int64(len(piecesCheck.Exiting)))
-	stats.segmentStats.segmentExitingCount.Observe(int64(len(piecesCheck.Exiting)))
-	segmentOffPlacementCountIntVal.Observe(int64(len(piecesCheck.OutOfPlacement)))
-	stats.segmentStats.segmentOffPlacementCount.Observe(int64(len(piecesCheck.OutOfPlacement)))
+	segmentClumpedCountIntVal.Observe(int64(piecesCheck.Clumped.Size()))
+	stats.segmentStats.segmentClumpedCount.Observe(int64(piecesCheck.Clumped.Size()))
+	segmentExitingCountIntVal.Observe(int64(piecesCheck.Exiting.Size()))
+	stats.segmentStats.segmentExitingCount.Observe(int64(piecesCheck.Exiting.Size()))
+	segmentOffPlacementCountIntVal.Observe(int64(piecesCheck.OutOfPlacement.Size()))
+	stats.segmentStats.segmentOffPlacementCount.Observe(int64(piecesCheck.OutOfPlacement.Size()))
 
 	segmentAge := time.Since(segment.CreatedAt)
 	segmentAgeIntVal.Observe(int64(segmentAge.Seconds()))
@@ -417,7 +417,7 @@ func (fork *observerFork) process(ctx context.Context, segment *rangedloop.Segme
 	// except for the case when the repair and success thresholds are the same (a case usually seen during testing).
 	// separate case is when we find pieces which are outside segment placement. in such case we are putting segment
 	// into queue right away.
-	if (numHealthy <= repairThreshold && numHealthy < successThreshold) || len(piecesCheck.ForcingRepair) > 0 {
+	if (numHealthy <= repairThreshold && numHealthy < successThreshold) || piecesCheck.ForcingRepair.Size() > 0 {
 		injuredSegmentHealthFloatVal.Observe(segmentHealth)
 		stats.segmentStats.injuredSegmentHealth.Observe(segmentHealth)
 		fork.totalStats.remoteSegmentsNeedingRepair++
@@ -440,7 +440,7 @@ func (fork *observerFork) process(ctx context.Context, segment *rangedloop.Segme
 		}
 
 		// monitor irreparable segments
-		if len(piecesCheck.Retrievable) < required {
+		if piecesCheck.Retrievable.Size() < required {
 			if !slices.Contains(fork.totalStats.objectsLost, segment.StreamID) {
 				fork.totalStats.objectsLost = append(fork.totalStats.objectsLost, segment.StreamID)
 			}
@@ -472,13 +472,13 @@ func (fork *observerFork) process(ctx context.Context, segment *rangedloop.Segme
 
 			var missingNodes []string
 			for _, piece := range pieces {
-				if _, isMissing := piecesCheck.Missing[piece.Number]; isMissing {
+				if piecesCheck.Missing.Contains(int(piece.Number)) {
 					missingNodes = append(missingNodes, piece.StorageNode.String())
 				}
 			}
 			fork.log.Warn("checker found irreparable segment", zap.String("Segment StreamID", segment.StreamID.String()),
 				zap.Int("Segment Position", int(segment.Position.Encode())), zap.Int("total pieces", len(pieces)),
 				zap.Int("min required", required), zap.String("unavailable node IDs", strings.Join(missingNodes, ",")))
-	} else if len(piecesCheck.Clumped) > 0 && len(piecesCheck.Healthy)+len(piecesCheck.Clumped) > repairThreshold && len(piecesCheck.ForcingRepair) == 0 {
+	} else if piecesCheck.Clumped.Size() > 0 && piecesCheck.Healthy.Size()+piecesCheck.Clumped.Size() > repairThreshold && piecesCheck.ForcingRepair.Size() == 0 {
 		// This segment is to be repaired because of clumping (it wouldn't need repair yet
 		// otherwise). Produce a brief report of where the clumping occurred so that we have
 		// a better understanding of the cause.
diff --git a/satellite/repair/classification.go b/satellite/repair/classification.go
index e2ce14727..b927a0b11 100644
--- a/satellite/repair/classification.go
+++ b/satellite/repair/classification.go
@@ -4,8 +4,6 @@
 package repair
 
 import (
-	"golang.org/x/exp/maps"
-
 	"storj.io/common/storj"
 	"storj.io/common/storj/location"
 	"storj.io/storj/satellite/metabase"
@@ -22,39 +20,39 @@ type PiecesCheckResult struct {
 
 	// Missing is a set of Piece Numbers which are to be considered as lost and irretrievable.
 	// (They reside on offline/disqualified/unknown nodes.)
-	Missing map[uint16]struct{}
+	Missing IntSet
 	// Retrievable contains all Piece Numbers that are retrievable; that is, all piece numbers
 	// from the segment that are NOT in Missing.
-	Retrievable map[uint16]struct{}
+	Retrievable IntSet
 	// Suspended is a set of Piece Numbers which reside on nodes which are suspended.
-	Suspended map[uint16]struct{}
+	Suspended IntSet
 	// Clumped is a set of Piece Numbers which are to be considered unhealthy because of IP
 	// clumping. (If DoDeclumping is disabled, this set will be empty.)
-	Clumped map[uint16]struct{}
+	Clumped IntSet
 	// Exiting is a set of Piece Numbers which are considered unhealthy because the node on
 	// which they reside has initiated graceful exit.
-	Exiting map[uint16]struct{}
+	Exiting IntSet
 	// OutOfPlacement is a set of Piece Numbers which are unhealthy because of placement rules.
 	// (If DoPlacementCheck is disabled, this set will be empty.)
-	OutOfPlacement map[uint16]struct{}
+	OutOfPlacement IntSet
 	// InExcludedCountry is a set of Piece Numbers which are unhealthy because they are in
 	// Excluded countries.
-	InExcludedCountry map[uint16]struct{}
+	InExcludedCountry IntSet
 	// ForcingRepair is the set of pieces which force a repair operation for this segment (that
 	// includes, currently, only pieces in OutOfPlacement).
-	ForcingRepair map[uint16]struct{}
+	ForcingRepair IntSet
 	// Unhealthy contains all Piece Numbers which are in Missing OR Suspended OR Clumped OR
 	// Exiting OR OutOfPlacement OR InExcludedCountry.
-	Unhealthy map[uint16]struct{}
+	Unhealthy IntSet
 	// UnhealthyRetrievable is the set of pieces that are "unhealthy-but-retrievable". That is,
 	// pieces that are in Unhealthy AND Retrievable.
-	UnhealthyRetrievable map[uint16]struct{}
+	UnhealthyRetrievable IntSet
 	// Healthy contains all Piece Numbers from the segment which are not in Unhealthy.
 	// (Equivalently: all Piece Numbers from the segment which are NOT in Missing OR
 	// Suspended OR Clumped OR Exiting OR OutOfPlacement OR InExcludedCountry).
-	Healthy map[uint16]struct{}
+	Healthy IntSet
 }
 
 // ClassifySegmentPieces classifies the pieces of a segment into the categories
@@ -64,12 +62,20 @@ func ClassifySegmentPieces(pieces metabase.Pieces, nodes []nodeselection.Selecte
 	doPlacementCheck, doDeclumping bool, filter nodeselection.NodeFilter, excludeNodeIDs []storj.NodeID) (result PiecesCheckResult) {
 	result.ExcludeNodeIDs = excludeNodeIDs
 
+	maxPieceNum := 0
+	for _, piece := range pieces {
+		if int(piece.Number) > maxPieceNum {
+			maxPieceNum = int(piece.Number)
+		}
+	}
+	maxPieceNum++
+
 	// check excluded countries and remove online nodes from missing pieces
-	result.Missing = make(map[uint16]struct{})
-	result.Suspended = make(map[uint16]struct{})
-	result.Exiting = make(map[uint16]struct{})
-	result.Retrievable = make(map[uint16]struct{})
-	result.InExcludedCountry = make(map[uint16]struct{})
+	result.Missing = NewIntSet(maxPieceNum)
+	result.Suspended = NewIntSet(maxPieceNum)
+	result.Exiting = NewIntSet(maxPieceNum)
+	result.Retrievable = NewIntSet(maxPieceNum)
+	result.InExcludedCountry = NewIntSet(maxPieceNum)
 	for index, nodeRecord := range nodes {
 		pieceNum := pieces[index].Number
@@ -80,21 +86,21 @@ func ClassifySegmentPieces(pieces metabase.Pieces, nodes []nodeselection.Selecte
 		if nodeRecord.ID.IsZero() || !nodeRecord.Online {
 			// node ID was not found, or the node is disqualified or exited,
 			// or it is offline
-			result.Missing[pieceNum] = struct{}{}
+			result.Missing.Include(int(pieceNum))
 		} else {
 			// node is expected to be online and receiving requests.
-			result.Retrievable[pieceNum] = struct{}{}
+			result.Retrievable.Include(int(pieceNum))
 		}
 
 		if nodeRecord.Suspended {
-			result.Suspended[pieceNum] = struct{}{}
+			result.Suspended.Include(int(pieceNum))
 		}
 		if nodeRecord.Exiting {
-			result.Exiting[pieceNum] = struct{}{}
+			result.Exiting.Include(int(pieceNum))
 		}
 
 		if _, excluded := excludedCountryCodes[nodeRecord.CountryCode]; excluded {
-			result.InExcludedCountry[pieceNum] = struct{}{}
+			result.InExcludedCountry.Include(int(pieceNum))
 		}
 	}
 
@@ -103,7 +109,7 @@ func ClassifySegmentPieces(pieces metabase.Pieces, nodes []nodeselection.Selecte
 	// to be considered retrievable but unhealthy.
 	lastNets := make(map[string]struct{}, len(pieces))
-	result.Clumped = make(map[uint16]struct{})
+	result.Clumped = NewIntSet(maxPieceNum)
 
 	collectClumpedPieces := func(onlineness bool) {
 		for index, nodeRecord := range nodes {
@@ -117,7 +123,7 @@ func ClassifySegmentPieces(pieces metabase.Pieces, nodes []nodeselection.Selecte
 			_, ok := lastNets[nodeRecord.LastNet]
 			if ok {
 				// this LastNet was already seen
-				result.Clumped[pieceNum] = struct{}{}
+				result.Clumped.Include(int(pieceNum))
 			} else {
 				// add to the list of seen LastNets
 				lastNets[nodeRecord.LastNet] = struct{}{}
@@ -133,7 +139,7 @@ func ClassifySegmentPieces(pieces metabase.Pieces, nodes []nodeselection.Selecte
 
 	if doPlacementCheck {
 		// mark all pieces that are out of placement.
-		result.OutOfPlacement = make(map[uint16]struct{})
+		result.OutOfPlacement = NewIntSet(maxPieceNum)
 		for index, nodeRecord := range nodes {
 			if nodeRecord.ID.IsZero() {
 				continue
@@ -142,37 +148,92 @@ func ClassifySegmentPieces(pieces metabase.Pieces, nodes []nodeselection.Selecte
 				continue
 			}
 			pieceNum := pieces[index].Number
-			result.OutOfPlacement[pieceNum] = struct{}{}
+			result.OutOfPlacement.Include(int(pieceNum))
 		}
 	}
 
 	// ForcingRepair = OutOfPlacement only, for now
-	result.ForcingRepair = make(map[uint16]struct{})
-	maps.Copy(result.ForcingRepair, result.OutOfPlacement)
+	result.ForcingRepair = copyIntSet(NewIntSet(maxPieceNum),
+		result.OutOfPlacement,
+	)
 
-	// Unhealthy = Missing OR Suspended OR Clumped OR OutOfPlacement OR InExcludedCountry
-	result.Unhealthy = make(map[uint16]struct{})
-	maps.Copy(result.Unhealthy, result.Missing)
-	maps.Copy(result.Unhealthy, result.Suspended)
-	maps.Copy(result.Unhealthy, result.Clumped)
-	maps.Copy(result.Unhealthy, result.Exiting)
-	maps.Copy(result.Unhealthy, result.OutOfPlacement)
-	maps.Copy(result.Unhealthy, result.InExcludedCountry)
+	// Unhealthy = Missing OR Suspended OR Clumped OR Exiting OR OutOfPlacement OR InExcludedCountry
+	result.Unhealthy = copyIntSet(NewIntSet(maxPieceNum),
+		result.Missing,
+		result.Suspended,
+		result.Clumped,
+		result.Exiting,
+		result.OutOfPlacement,
+		result.InExcludedCountry,
+	)
 
 	// UnhealthyRetrievable = Unhealthy AND Retrievable
-	result.UnhealthyRetrievable = make(map[uint16]struct{})
-	for pieceNum := range result.Unhealthy {
-		if _, isRetrievable := result.Retrievable[pieceNum]; isRetrievable {
-			result.UnhealthyRetrievable[pieceNum] = struct{}{}
+	// Healthy = NOT Unhealthy
+	result.UnhealthyRetrievable = NewIntSet(maxPieceNum)
+	result.Healthy = NewIntSet(maxPieceNum)
+	for _, piece := range pieces {
+		if !result.Unhealthy.Contains(int(piece.Number)) {
+			result.Healthy.Include(int(piece.Number))
+		} else if result.Retrievable.Contains(int(piece.Number)) {
+			result.UnhealthyRetrievable.Include(int(piece.Number))
 		}
 	}
 
-	// Healthy = NOT Unhealthy
-	result.Healthy = make(map[uint16]struct{})
-	for _, piece := range pieces {
-		if _, found := result.Unhealthy[piece.Number]; !found {
-			result.Healthy[piece.Number] = struct{}{}
-		}
-	}
 	return result
 }
+
+// copyIntSet unions the given source sets into destination and returns it.
+func copyIntSet(destination IntSet, sources ...IntSet) IntSet {
+	for element := 0; element < destination.Cap(); element++ {
+		for _, source := range sources {
+			if source.Contains(element) {
+				destination.Include(element)
+				break
+			}
+		}
+	}
+	return destination
+}
+
+// IntSet is a fixed-capacity set of int values in the range [0, Cap()).
+type IntSet struct {
+	bits []bool
+	size int
+}
+
+// NewIntSet creates a new IntSet that can hold values in the range [0, n).
+func NewIntSet(n int) IntSet {
+	return IntSet{
+		bits: make([]bool, n),
+	}
+}
+
+// Contains returns true if set includes int value.
+func (i IntSet) Contains(value int) bool {
+	if value < 0 || value >= len(i.bits) {
+		return false
+	}
+	return i.bits[value]
+}
+
+// Include includes int value into set.
+// Values outside the range [0, Cap()) are ignored, and including a value
+// that is already a member is a no-op, so Size stays accurate.
+func (i *IntSet) Include(value int) {
+	if value < 0 || value >= len(i.bits) || i.bits[value] {
+		return
+	}
+	i.bits[value] = true
+	i.size++
+}
+
+// Remove removes int value from set.
+// Removing a value that is not a member is a no-op, so Size stays accurate.
+func (i *IntSet) Remove(value int) {
+	if value < 0 || value >= len(i.bits) || !i.bits[value] {
+		return
+	}
+	i.bits[value] = false
+	i.size--
+}
+
+// Size returns size of set.
+func (i IntSet) Size() int {
+	return i.size
+}
+
+// Cap returns set capacity.
+func (i IntSet) Cap() int {
+	return len(i.bits)
+}
diff --git a/satellite/repair/classification_test.go b/satellite/repair/classification_test.go
index c0f8ba39a..d9cfee00e 100644
--- a/satellite/repair/classification_test.go
+++ b/satellite/repair/classification_test.go
@@ -45,10 +45,10 @@ func TestClassifySegmentPieces(t *testing.T) {
 		pieces := createPieces(selectedNodes, 0, 1, 2, 3, 4)
 		result := ClassifySegmentPieces(pieces, getNodes(selectedNodes, pieces), map[location.CountryCode]struct{}{}, true, false,
 			parsed.CreateFilters(0), piecesToNodeIDs(pieces))
 
-		require.Equal(t, 0, len(result.Missing))
-		require.Equal(t, 0, len(result.Clumped))
-		require.Equal(t, 0, len(result.OutOfPlacement))
-		require.Equal(t, 0, len(result.UnhealthyRetrievable))
+		require.Equal(t, 0, result.Missing.Size())
+		require.Equal(t, 0, result.Clumped.Size())
+		require.Equal(t, 0, result.OutOfPlacement.Size())
+		require.Equal(t, 0, result.UnhealthyRetrievable.Size())
 	})
 
 	t.Run("out of placement", func(t *testing.T) {
@@ -71,11 +71,11 @@ func TestClassifySegmentPieces(t *testing.T) {
 		pieces := createPieces(selectedNodes, 1, 2, 3, 4, 7, 8)
 		result := ClassifySegmentPieces(pieces, getNodes(selectedNodes, pieces), map[location.CountryCode]struct{}{}, true, false,
 			c.CreateFilters(10), piecesToNodeIDs(pieces))
 
-		require.Equal(t, 0, len(result.Missing))
-		require.Equal(t, 0, len(result.Clumped))
+		require.Equal(t, 0, result.Missing.Size())
+		require.Equal(t, 0, result.Clumped.Size())
 		// 1,2,3 are in Germany instead of GB
-		require.Equal(t, 3, len(result.OutOfPlacement))
-		require.Equal(t, 3, len(result.UnhealthyRetrievable))
+		require.Equal(t, 3, result.OutOfPlacement.Size())
+		require.Equal(t, 3, result.UnhealthyRetrievable.Size())
 	})
 
 	t.Run("out of placement and offline", func(t *testing.T) {
@@ -95,11 +95,11 @@ func TestClassifySegmentPieces(t *testing.T) {
 		result := ClassifySegmentPieces(pieces, getNodes(selectedNodes, pieces), map[location.CountryCode]struct{}{}, true, false,
 			c.CreateFilters(10), piecesToNodeIDs(pieces))
 
 		// offline nodes
-		require.Equal(t, 5, len(result.Missing))
-		require.Equal(t, 0, len(result.Clumped))
-		require.Equal(t, 10, len(result.OutOfPlacement))
-		require.Equal(t, 5, len(result.UnhealthyRetrievable))
-		numHealthy := len(pieces) - len(result.Missing) - len(result.UnhealthyRetrievable)
+		require.Equal(t, 5, result.Missing.Size())
+		require.Equal(t, 0, result.Clumped.Size())
+		require.Equal(t, 10, result.OutOfPlacement.Size())
+		require.Equal(t, 5, result.UnhealthyRetrievable.Size())
+		numHealthy := len(pieces) - result.Missing.Size() - result.UnhealthyRetrievable.Size()
 		require.Equal(t, 0, numHealthy)
 	})
 
@@ -118,11 +118,11 @@ func TestClassifySegmentPieces(t *testing.T) {
 		result := ClassifySegmentPieces(pieces, getNodes(selectedNodes, pieces), map[location.CountryCode]struct{}{}, true, true,
 			c.CreateFilters(0), piecesToNodeIDs(pieces))
 
 		// offline nodes
-		require.Equal(t, 2, len(result.Missing))
-		require.Equal(t, 3, len(result.Clumped))
-		require.Equal(t, 0, len(result.OutOfPlacement))
-		require.Equal(t, 2, len(result.UnhealthyRetrievable))
-		numHealthy := len(pieces) - len(result.Missing) - len(result.UnhealthyRetrievable)
+		require.Equal(t, 2, result.Missing.Size())
+		require.Equal(t, 3, result.Clumped.Size())
+		require.Equal(t, 0, result.OutOfPlacement.Size())
+		require.Equal(t, 2, result.UnhealthyRetrievable.Size())
+		numHealthy := len(pieces) - result.Missing.Size() - result.UnhealthyRetrievable.Size()
 		require.Equal(t, 3, numHealthy)
 	})
 
@@ -145,11 +145,11 @@ func TestClassifySegmentPieces(t *testing.T) {
 		result := ClassifySegmentPieces(pieces, getNodes(selectedNodes, pieces), map[location.CountryCode]struct{}{}, true, true,
 			c.CreateFilters(10), piecesToNodeIDs(pieces))
 
 		// offline nodes
-		require.Equal(t, 2, len(result.Missing))
-		require.Equal(t, 0, len(result.Clumped))
-		require.Equal(t, 0, len(result.OutOfPlacement))
-		require.Equal(t, 0, len(result.UnhealthyRetrievable))
-		numHealthy := len(pieces) - len(result.Missing) - len(result.UnhealthyRetrievable)
+		require.Equal(t, 2, result.Missing.Size())
+		require.Equal(t, 0, result.Clumped.Size())
+		require.Equal(t, 0, result.OutOfPlacement.Size())
+		require.Equal(t, 0, result.UnhealthyRetrievable.Size())
+		numHealthy := len(pieces) - result.Missing.Size() - result.UnhealthyRetrievable.Size()
 		require.Equal(t, 5, numHealthy)
 	})
 
diff --git a/satellite/repair/repairer/segments.go b/satellite/repair/repairer/segments.go
index e7678b353..19f80fed8 100644
--- a/satellite/repair/repairer/segments.go
+++ b/satellite/repair/repairer/segments.go
@@ -14,7 +14,6 @@ import (
 
 	"github.com/zeebo/errs"
 	"go.uber.org/zap"
-	"golang.org/x/exp/maps"
 
 	"storj.io/common/pb"
 	"storj.io/common/storj"
@@ -225,7 +224,7 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
 	piecesCheck := repair.ClassifySegmentPieces(pieces, selectedNodes, repairer.excludedCountryCodes, repairer.doPlacementCheck,
 		repairer.doDeclumping, repairer.placementRules(segment.Placement), allNodeIDs)
 
 	// irreparable segment
-	if len(piecesCheck.Retrievable) < int(segment.Redundancy.RequiredShares) {
+	if piecesCheck.Retrievable.Size() < int(segment.Redundancy.RequiredShares) {
 		mon.Counter("repairer_segments_below_min_req").Inc(1) //mon:locked
 		stats.repairerSegmentsBelowMinReq.Inc(1)
 		mon.Meter("repair_nodes_unavailable").Mark(1) //mon:locked
@@ -234,7 +233,7 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
 		repairer.log.Warn("irreparable segment",
 			zap.String("StreamID", queueSegment.StreamID.String()),
 			zap.Uint64("Position", queueSegment.Position.Encode()),
-			zap.Int("piecesAvailable", len(piecesCheck.Retrievable)),
+			zap.Int("piecesAvailable", piecesCheck.Retrievable.Size()),
 			zap.Int16("piecesRequired", segment.Redundancy.RequiredShares),
 		)
 		return false, nil
@@ -257,15 +256,15 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
 		repairThreshold = overrideValue
 	}
 
-	if len(piecesCheck.Healthy) > int(repairThreshold) {
+	if piecesCheck.Healthy.Size() > int(repairThreshold) {
 		// No repair is needed (note Healthy does not include pieces in ForcingRepair).
 
 		var dropPieces metabase.Pieces
-		if len(piecesCheck.ForcingRepair) > 0 {
+		if piecesCheck.ForcingRepair.Size() > 0 {
 			// No repair is needed, but remove forcing-repair pieces without a repair operation,
 			// as we will still be above the repair threshold.
 			for _, piece := range pieces {
-				if _, ok := piecesCheck.ForcingRepair[piece.Number]; ok {
+				if piecesCheck.ForcingRepair.Contains(int(piece.Number)) {
 					dropPieces = append(dropPieces, piece)
 				}
 			}
@@ -295,23 +294,23 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
 		mon.Meter("repair_unnecessary").Mark(1) //mon:locked
 		stats.repairUnnecessary.Mark(1)
-		repairer.log.Debug("segment above repair threshold", zap.Int("numHealthy", len(piecesCheck.Healthy)), zap.Int32("repairThreshold", repairThreshold),
-			zap.Int("numClumped", len(piecesCheck.Clumped)), zap.Int("numExiting", len(piecesCheck.Exiting)), zap.Int("numOffPieces", len(piecesCheck.OutOfPlacement)),
-			zap.Int("numExcluded", len(piecesCheck.InExcludedCountry)), zap.Int("droppedPieces", len(dropPieces)))
+		repairer.log.Debug("segment above repair threshold", zap.Int("numHealthy", piecesCheck.Healthy.Size()), zap.Int32("repairThreshold", repairThreshold),
+			zap.Int("numClumped", piecesCheck.Clumped.Size()), zap.Int("numExiting", piecesCheck.Exiting.Size()), zap.Int("numOffPieces", piecesCheck.OutOfPlacement.Size()),
+			zap.Int("numExcluded", piecesCheck.InExcludedCountry.Size()), zap.Int("droppedPieces", len(dropPieces)))
 		return true, nil
 	}
 
 	healthyRatioBeforeRepair := 0.0
 	if segment.Redundancy.TotalShares != 0 {
-		healthyRatioBeforeRepair = float64(len(piecesCheck.Healthy)) / float64(segment.Redundancy.TotalShares)
+		healthyRatioBeforeRepair = float64(piecesCheck.Healthy.Size()) / float64(segment.Redundancy.TotalShares)
 	}
 	mon.FloatVal("healthy_ratio_before_repair").Observe(healthyRatioBeforeRepair) //mon:locked
 	stats.healthyRatioBeforeRepair.Observe(healthyRatioBeforeRepair)
 
 	// Create the order limits for the GET_REPAIR action
-	retrievablePieces := make(metabase.Pieces, 0, len(piecesCheck.Retrievable))
+	retrievablePieces := make(metabase.Pieces, 0, piecesCheck.Retrievable.Size())
 	for _, piece := range pieces {
-		if _, found := piecesCheck.Retrievable[piece.Number]; found {
+		if piecesCheck.Retrievable.Contains(int(piece.Number)) {
 			retrievablePieces = append(retrievablePieces, piece)
 		}
 	}
@@ -338,11 +337,12 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
 	// call to CreateGetRepairOrderLimits. Add or remove them from the appropriate sets.
 	for _, piece := range retrievablePieces {
 		if getOrderLimits[piece.Number] == nil {
-			piecesCheck.Missing[piece.Number] = struct{}{}
-			piecesCheck.Unhealthy[piece.Number] = struct{}{}
-			delete(piecesCheck.Healthy, piece.Number)
-			delete(piecesCheck.Retrievable, piece.Number)
-			delete(piecesCheck.UnhealthyRetrievable, piece.Number)
+			piecesCheck.Missing.Include(int(piece.Number))
+			piecesCheck.Unhealthy.Include(int(piece.Number))
+
+			piecesCheck.Healthy.Remove(int(piece.Number))
+			piecesCheck.Retrievable.Remove(int(piece.Number))
+			piecesCheck.UnhealthyRetrievable.Remove(int(piece.Number))
 		}
 	}
@@ -352,9 +352,9 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
 		if totalNeeded > redundancy.TotalCount() {
 			totalNeeded = redundancy.TotalCount()
 		}
-		requestCount = totalNeeded - len(piecesCheck.Healthy)
+		requestCount = totalNeeded - piecesCheck.Healthy.Size()
 	}
-	minSuccessfulNeeded := redundancy.OptimalThreshold() - len(piecesCheck.Healthy)
+	minSuccessfulNeeded := redundancy.OptimalThreshold() - piecesCheck.Healthy.Size()
 
 	// Request Overlay for n-h new storage nodes
 	request := overlay.FindStorageNodesRequest{
@@ -372,13 +372,22 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
 	// pieces they have, as long as they are kept intact and retrievable).
 	maxToKeep := int(segment.Redundancy.TotalShares) - len(newNodes)
 	toKeep := map[uint16]struct{}{}
-	maps.Copy(toKeep, piecesCheck.Healthy)
-	for excludedNodeNum := range piecesCheck.InExcludedCountry {
-		if len(toKeep) >= maxToKeep {
-			break
+
+	// TODO: find a way to avoid these two loops
+	for _, piece := range pieces {
+		if piecesCheck.Healthy.Contains(int(piece.Number)) {
+			toKeep[piece.Number] = struct{}{}
 		}
-		toKeep[excludedNodeNum] = struct{}{}
 	}
+	for _, piece := range pieces {
+		if piecesCheck.InExcludedCountry.Contains(int(piece.Number)) {
+			if len(toKeep) >= maxToKeep {
+				break
+			}
+			toKeep[piece.Number] = struct{}{}
+		}
+	}
+
 	putLimits, putPrivateKey, err := repairer.orders.CreatePutRepairOrderLimits(ctx, segment, getOrderLimits, toKeep, newNodes)
 	if err != nil {
 		return false, orderLimitFailureError.New("could not create PUT_REPAIR order limits: %w", err)
 	}
@@ -551,7 +560,7 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
 
 	mon.Meter("repair_bytes_uploaded").Mark64(bytesRepaired) //mon:locked
 
-	healthyAfterRepair := len(piecesCheck.Healthy) + len(repairedPieces)
+	healthyAfterRepair := piecesCheck.Healthy.Size() + len(repairedPieces)
 	switch {
 	case healthyAfterRepair >= int(segment.Redundancy.OptimalShares):
 		mon.Meter("repair_success").Mark(1) //mon:locked
@@ -584,9 +593,9 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
 	// (Retrievable AND InExcludedCountry). Those, we allow to remain on the nodes as
 	// long as the nodes are keeping the pieces intact and available.
 	for _, piece := range pieces {
-		if _, isUnhealthy := piecesCheck.Unhealthy[piece.Number]; isUnhealthy {
-			_, retrievable := piecesCheck.Retrievable[piece.Number]
-			_, inExcludedCountry := piecesCheck.InExcludedCountry[piece.Number]
+		if piecesCheck.Unhealthy.Contains(int(piece.Number)) {
+			retrievable := piecesCheck.Retrievable.Contains(int(piece.Number))
+			inExcludedCountry := piecesCheck.InExcludedCountry.Contains(int(piece.Number))
 			if retrievable && inExcludedCountry {
 				continue
 			}
@@ -598,7 +607,7 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
 	// pieces. We want to do that wherever possible, except where doing so puts data in
 	// jeopardy.
 	for _, piece := range pieces {
-		if _, ok := piecesCheck.OutOfPlacement[piece.Number]; ok {
+		if piecesCheck.OutOfPlacement.Contains(int(piece.Number)) {
 			toRemove = append(toRemove, piece)
 		}
 	}
@@ -657,15 +666,15 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
 	repairer.log.Debug("repaired segment",
 		zap.Stringer("Stream ID", segment.StreamID),
 		zap.Uint64("Position", segment.Position.Encode()),
-		zap.Int("clumped pieces", len(piecesCheck.Clumped)),
-		zap.Int("exiting-node pieces", len(piecesCheck.Exiting)),
-		zap.Int("out of placement pieces", len(piecesCheck.OutOfPlacement)),
-		zap.Int("in excluded countries", len(piecesCheck.InExcludedCountry)),
-		zap.Int("missing pieces", len(piecesCheck.Missing)),
+		zap.Int("clumped pieces", piecesCheck.Clumped.Size()),
+		zap.Int("exiting-node pieces", piecesCheck.Exiting.Size()),
+		zap.Int("out of placement pieces", piecesCheck.OutOfPlacement.Size()),
+		zap.Int("in excluded countries", piecesCheck.InExcludedCountry.Size()),
+		zap.Int("missing pieces", piecesCheck.Missing.Size()),
 		zap.Int("removed pieces", len(toRemove)),
 		zap.Int("repaired pieces", len(repairedPieces)),
-		zap.Int("retrievable pieces", len(piecesCheck.Retrievable)),
-		zap.Int("healthy before repair", len(piecesCheck.Healthy)),
+		zap.Int("retrievable pieces", piecesCheck.Retrievable.Size()),
+		zap.Int("healthy before repair", piecesCheck.Healthy.Size()),
 		zap.Int("healthy after repair", healthyAfterRepair),
 		zap.Int("total before repair", len(piecesCheck.ExcludeNodeIDs)),
 		zap.Int("total after repair", len(newPieces)))
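
Note on Size bookkeeping: segments.go now calls Remove on sets that may not
contain the given piece (e.g. UnhealthyRetrievable), and Include on pieces
that may already be members (e.g. Unhealthy), so Size stays correct only
because Include and Remove are no-ops in those cases. A small test sketch
pinning that behavior down (illustrative only, not part of the diffstat
above; it would live in classification_test.go alongside the existing
tests):

func TestIntSetSize(t *testing.T) {
	set := NewIntSet(8)

	set.Include(3)
	set.Include(3) // re-including a member must not inflate Size
	require.Equal(t, 1, set.Size())

	set.Remove(5) // removing a non-member must not deflate Size
	require.Equal(t, 1, set.Size())

	set.Remove(3)
	require.Equal(t, 0, set.Size())
	require.False(t, set.Contains(3))
}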
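
For the record, the bit-set variant mentioned in the commit message looked
roughly like the sketch below (a reconstruction for illustration, not the
exact code that was benchmarked; it uses uint64 words where the measured
version used int8 words, which doesn't change the approach):

// bitSet packs membership into one bit per value instead of one bool
// (a full byte) per value.
type bitSet struct {
	words []uint64
	size  int
}

func newBitSet(n int) bitSet {
	// round capacity up to a whole number of 64-bit words
	return bitSet{words: make([]uint64, (n+63)/64)}
}

func (b bitSet) Contains(value int) bool {
	if value < 0 || value >= len(b.words)*64 {
		return false
	}
	return b.words[value/64]&(1<<uint(value%64)) != 0
}

func (b *bitSet) Include(value int) {
	if value < 0 || value >= len(b.words)*64 || b.Contains(value) {
		return
	}
	b.words[value/64] |= 1 << uint(value%64)
	b.size++
}

Every Contains/Include here pays for a divide, shift, and mask, while the
[]bool version is a single indexed load, and with at most a few hundred
pieces per segment the memory saved by bit packing is negligible — which is
presumably why the simpler []bool implementation won the benchmark.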