From 578724e9b153f7ad169200d67246eee2ee0af2f3 Mon Sep 17 00:00:00 2001
From: Michal Niewrzal
Date: Mon, 26 Jun 2023 11:54:52 +0200
Subject: [PATCH] satellite/repair/repairer: use KnownReliable to check
 segment pieces

At the moment the segment repairer skips offline nodes in checks such as
the clumped-pieces and out-of-placement checks. This change fixes that by
using the new version of the KnownReliable method, which returns both
online and offline nodes. The returned data is used to find the clumped
and out-of-placement pieces.

The segment repairer no longer uses the DownloadSelectionCache.

https://github.com/storj/storj/issues/5998

Change-Id: I236a1926e21f13df4cdedc91130352d37ff97e18
---
 satellite/repair/repairer/segments.go      | 191 +++++++++++++--------
 satellite/repair/repairer/segments_test.go | 131 +++++++++-----
 2 files changed, 204 insertions(+), 118 deletions(-)

diff --git a/satellite/repair/repairer/segments.go b/satellite/repair/repairer/segments.go
index 8a6d55001..7fca8195f 100644
--- a/satellite/repair/repairer/segments.go
+++ b/satellite/repair/repairer/segments.go
@@ -15,7 +15,6 @@ import (
     "github.com/zeebo/errs"
     "go.uber.org/zap"
     "golang.org/x/exp/maps"
-    "golang.org/x/exp/slices"

     "storj.io/common/pb"
     "storj.io/common/storj"
@@ -195,65 +194,15 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
     mon.IntVal("repair_segment_size").Observe(int64(segment.EncryptedSize)) //mon:locked
     stats.repairSegmentSize.Observe(int64(segment.EncryptedSize))

+    piecesCheck, err := repairer.classifySegmentPieces(ctx, segment)
+    if err != nil {
+        return false, err
+    }
+
     pieces := segment.Pieces
-    allNodeIDs := make([]storj.NodeID, len(pieces))
-    for i, p := range pieces {
-        allNodeIDs[i] = p.StorageNode
-    }
-
-    excludeNodeIDs := allNodeIDs
-
-    missingPieces, err := repairer.overlay.GetMissingPieces(ctx, pieces)
-    if err != nil {
-        return false, overlayQueryError.New("error identifying missing pieces: %w", err)
-    }
-
-    var clumpedPieces metabase.Pieces
-    var clumpedPiecesSet map[uint16]bool
-    if repairer.doDeclumping {
-        // if multiple pieces are on the same last_net, keep only the first one. The rest are
-        // to be considered retrievable but unhealthy.
-        lastNets, err := repairer.overlay.GetNodesNetworkInOrder(ctx, allNodeIDs)
-        if err != nil {
-            return false, metainfoGetError.Wrap(err)
-        }
-        clumpedPieces = repair.FindClumpedPieces(segment.Pieces, lastNets)
-        clumpedPiecesSet = make(map[uint16]bool)
-        for _, clumpedPiece := range clumpedPieces {
-            clumpedPiecesSet[clumpedPiece.Number] = true
-        }
-    }
-
-    var outOfPlacementPieces metabase.Pieces
-    var outOfPlacementPiecesSet map[uint16]bool
-    if repairer.doPlacementCheck && segment.Placement != storj.EveryCountry {
-        var err error
-        outOfPlacementNodes, err := repairer.overlay.GetNodesOutOfPlacement(ctx, allNodeIDs, segment.Placement)
-        if err != nil {
-            return false, metainfoGetError.Wrap(err)
-        }
-
-        outOfPlacementPiecesSet = make(map[uint16]bool)
-        for _, piece := range pieces {
-            if slices.Contains(outOfPlacementNodes, piece.StorageNode) {
-                outOfPlacementPieces = append(outOfPlacementPieces, piece)
-                outOfPlacementPiecesSet[piece.Number] = true
-            }
-        }
-    }
-
-    numUnhealthyRetrievable := len(clumpedPieces) + len(outOfPlacementPieces)
-    if len(clumpedPieces) != 0 && len(outOfPlacementPieces) != 0 {
-        // verify that some of clumped pieces and out of placement pieces are not the same
-        unhealthyRetrievableSet := map[uint16]bool{}
-        maps.Copy(unhealthyRetrievableSet, clumpedPiecesSet)
-        maps.Copy(unhealthyRetrievableSet, outOfPlacementPiecesSet)
-        numUnhealthyRetrievable = len(unhealthyRetrievableSet)
-    }
-
-    numRetrievable := len(pieces) - len(missingPieces)
-    numHealthy := len(pieces) - len(missingPieces) - numUnhealthyRetrievable
+    numRetrievable := len(pieces) - len(piecesCheck.MissingPiecesSet)
+    numHealthy := len(pieces) - len(piecesCheck.MissingPiecesSet) - piecesCheck.NumUnhealthyRetrievable
     // irreparable segment
     if numRetrievable < int(segment.Redundancy.RequiredShares) {
         mon.Counter("repairer_segments_below_min_req").Inc(1) //mon:locked
         stats.repairerSegmentsBelowMinReq.Inc(1)
@@ -297,7 +246,15 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
     // repair not needed
     if numHealthy-numHealthyInExcludedCountries > int(repairThreshold) {
         // remove pieces out of placement without repairing as we are above repair threshold
-        if len(outOfPlacementPieces) > 0 {
+        if len(piecesCheck.OutOfPlacementPiecesSet) > 0 {
+
+            var outOfPlacementPieces metabase.Pieces
+            for _, piece := range pieces {
+                if _, ok := piecesCheck.OutOfPlacementPiecesSet[piece.Number]; ok {
+                    outOfPlacementPieces = append(outOfPlacementPieces, piece)
+                }
+            }
+
             newPieces, err := segment.Pieces.Update(nil, outOfPlacementPieces)
             if err != nil {
                 return false, metainfoPutError.Wrap(err)
@@ -317,13 +274,13 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
                 return false, metainfoPutError.Wrap(err)
             }

-            mon.Meter("dropped_out_of_placement_pieces").Mark(len(outOfPlacementPieces))
+            mon.Meter("dropped_out_of_placement_pieces").Mark(len(piecesCheck.OutOfPlacementPiecesSet))
         }

         mon.Meter("repair_unnecessary").Mark(1) //mon:locked
         stats.repairUnnecessary.Mark(1)
         repairer.log.Debug("segment above repair threshold", zap.Int("numHealthy", numHealthy), zap.Int32("repairThreshold", repairThreshold),
-            zap.Int("numClumped", len(clumpedPieces)), zap.Int("numOffPieces", len(outOfPlacementPieces)))
+            zap.Int("numClumped", len(piecesCheck.ClumpedPiecesSet)), zap.Int("numOffPieces", len(piecesCheck.OutOfPlacementPiecesSet)))
         return true, nil
     }

@@ -334,7 +291,7 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
     mon.FloatVal("healthy_ratio_before_repair").Observe(healthyRatioBeforeRepair) //mon:locked
     stats.healthyRatioBeforeRepair.Observe(healthyRatioBeforeRepair)

-    lostPiecesSet := sliceToSet(missingPieces)
+    lostPiecesSet := piecesCheck.MissingPiecesSet

     var retrievablePieces metabase.Pieces
     unhealthyPieces := make(map[metabase.Piece]struct{})
@@ -342,12 +299,11 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
     // Populate retrievablePieces with all pieces from the segment except those correlating to indices in lostPieces.
     // Populate unhealthyPieces with all pieces in lostPieces, clumpedPieces or outOfPlacementPieces.
     for _, piece := range pieces {
-        excludeNodeIDs = append(excludeNodeIDs, piece.StorageNode)
         if lostPiecesSet[piece.Number] {
             unhealthyPieces[piece] = struct{}{}
         } else {
             retrievablePieces = append(retrievablePieces, piece)
-            if clumpedPiecesSet[piece.Number] || outOfPlacementPiecesSet[piece.Number] {
+            if piecesCheck.ClumpedPiecesSet[piece.Number] || piecesCheck.OutOfPlacementPiecesSet[piece.Number] {
                 unhealthyPieces[piece] = struct{}{}
             } else {
                 healthySet[int32(piece.Number)] = struct{}{}
@@ -399,7 +355,7 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
     // Request Overlay for n-h new storage nodes
     request := overlay.FindStorageNodesRequest{
         RequestedCount: requestCount,
-        ExcludedIDs:    excludeNodeIDs,
+        ExcludedIDs:    piecesCheck.ExcludeNodeIDs,
         Placement:      segment.Placement,
     }
     newNodes, err := repairer.overlay.FindStorageNodesForUpload(ctx, request)
@@ -671,8 +627,8 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
     repairer.log.Debug("repaired segment",
         zap.Stringer("Stream ID", segment.StreamID),
         zap.Uint64("Position", segment.Position.Encode()),
-        zap.Int("clumped pieces", len(clumpedPieces)),
-        zap.Int("out of placement pieces", len(outOfPlacementPieces)),
+        zap.Int("clumped pieces", len(piecesCheck.ClumpedPiecesSet)),
+        zap.Int("out of placement pieces", len(piecesCheck.OutOfPlacementPiecesSet)),
         zap.Int("in excluded countries", numHealthyInExcludedCountries),
         zap.Int("removed pieces", len(toRemove)),
         zap.Int("repaired pieces", len(repairedPieces)),
@@ -681,6 +637,98 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
     return true, nil
 }

+type piecesCheckResult struct {
+    ExcludeNodeIDs []storj.NodeID
+
+    MissingPiecesSet        map[uint16]bool
+    ClumpedPiecesSet        map[uint16]bool
+    OutOfPlacementPiecesSet map[uint16]bool
+
+    NumUnhealthyRetrievable int
+}
+
+func (repairer *SegmentRepairer) classifySegmentPieces(ctx context.Context, segment metabase.Segment) (result piecesCheckResult, err error) {
+    defer mon.Task()(&ctx)(&err)
+
+    pieces := segment.Pieces
+
+    allNodeIDs := make([]storj.NodeID, len(pieces))
+    nodeIDPieceMap := map[storj.NodeID]uint16{}
+    result.MissingPiecesSet = map[uint16]bool{}
+    for i, p := range pieces {
+        allNodeIDs[i] = p.StorageNode
+        nodeIDPieceMap[p.StorageNode] = p.Number
+        result.MissingPiecesSet[p.Number] = true
+    }
+
+    result.ExcludeNodeIDs = allNodeIDs
+
+    online, offline, err := repairer.overlay.KnownReliable(ctx, allNodeIDs)
+    if err != nil {
+        return piecesCheckResult{}, overlayQueryError.New("error identifying missing pieces: %w", err)
+    }
+
+    // remove online nodes from missing pieces
+    for _, onlineNode := range online {
+        pieceNum := nodeIDPieceMap[onlineNode.ID]
+        delete(result.MissingPiecesSet, pieceNum)
+    }
+
+    if repairer.doDeclumping {
+        // if multiple pieces are on the same last_net, keep only the first one. The rest are
+        // to be considered retrievable but unhealthy.
+        lastNets := make([]string, 0, len(allNodeIDs))
+
+        reliablePieces := metabase.Pieces{}
+
+        collectLastNets := func(reliable []overlay.SelectedNode) {
+            for _, node := range reliable {
+                pieceNum := nodeIDPieceMap[node.ID]
+                reliablePieces = append(reliablePieces, metabase.Piece{
+                    Number:      pieceNum,
+                    StorageNode: node.ID,
+                })
+                lastNets = append(lastNets, node.LastNet)
+            }
+        }
+        collectLastNets(online)
+        collectLastNets(offline)
+
+        clumpedPieces := repair.FindClumpedPieces(reliablePieces, lastNets)
+        result.ClumpedPiecesSet = map[uint16]bool{}
+        for _, clumpedPiece := range clumpedPieces {
+            result.ClumpedPiecesSet[clumpedPiece.Number] = true
+        }
+    }
+
+    if repairer.doPlacementCheck && segment.Placement != storj.EveryCountry {
+        result.OutOfPlacementPiecesSet = map[uint16]bool{}
+
+        checkPlacement := func(reliable []overlay.SelectedNode) {
+            for _, node := range reliable {
+                if segment.Placement.AllowedCountry(node.CountryCode) {
+                    continue
+                }
+
+                result.OutOfPlacementPiecesSet[nodeIDPieceMap[node.ID]] = true
+            }
+        }
+        checkPlacement(online)
+        checkPlacement(offline)
+    }
+
+    result.NumUnhealthyRetrievable = len(result.ClumpedPiecesSet) + len(result.OutOfPlacementPiecesSet)
+    if len(result.ClumpedPiecesSet) != 0 && len(result.OutOfPlacementPiecesSet) != 0 {
+        // verify that some of clumped pieces and out of placement pieces are not the same
+        unhealthyRetrievableSet := map[uint16]bool{}
+        maps.Copy(unhealthyRetrievableSet, result.ClumpedPiecesSet)
+        maps.Copy(unhealthyRetrievableSet, result.OutOfPlacementPiecesSet)
+        result.NumUnhealthyRetrievable = len(unhealthyRetrievableSet)
+    }
+
+    return result, nil
+}
+
 // checkIfSegmentAltered checks if oldSegment has been altered since it was selected for audit.
 func (repairer *SegmentRepairer) checkIfSegmentAltered(ctx context.Context, oldSegment metabase.Segment) (err error) {
     defer mon.Task()(&ctx)(&err)
@@ -797,15 +845,6 @@ func (repairer *SegmentRepairer) AdminFetchPieces(ctx context.Context, seg *meta
     return pieceInfos, nil
 }

-// sliceToSet converts the given slice to a set.
-func sliceToSet(slice []uint16) map[uint16]bool {
-    set := make(map[uint16]bool, len(slice))
-    for _, value := range slice {
-        set[value] = true
-    }
-    return set
-}
-
 // commaSeparatedArray concatenates an array into a comma-separated string,
 // lazily.
 type commaSeparatedArray []string
diff --git a/satellite/repair/repairer/segments_test.go b/satellite/repair/repairer/segments_test.go
index 5d504911f..50c676d67 100644
--- a/satellite/repair/repairer/segments_test.go
+++ b/satellite/repair/repairer/segments_test.go
@@ -5,6 +5,7 @@ package repairer_test

 import (
     "context"
+    "strconv"
     "testing"
     "time"
@@ -14,6 +15,7 @@ import (
     "storj.io/common/memory"
     "storj.io/common/pb"
     "storj.io/common/storj"
+    "storj.io/common/storj/location"
     "storj.io/common/testcontext"
     "storj.io/common/testrand"
     "storj.io/storj/private/testplanet"
@@ -27,13 +29,15 @@ import (
 func TestSegmentRepairPlacement(t *testing.T) {
     piecesCount := 4
     testplanet.Run(t, testplanet.Config{
-        SatelliteCount: 1, StorageNodeCount: 10, UplinkCount: 1,
+        SatelliteCount: 1, StorageNodeCount: 12, UplinkCount: 1,
         Reconfigure: testplanet.Reconfigure{
-            Satellite: testplanet.ReconfigureRS(1, 2, piecesCount, piecesCount),
+            Satellite: testplanet.ReconfigureRS(1, 1, piecesCount, piecesCount),
         },
     }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
         require.NoError(t, planet.Uplinks[0].CreateBucket(ctx, planet.Satellites[0], "testbucket"))

+        defaultLocation := location.Poland
+
         _, err := planet.Satellites[0].API.Buckets.Service.UpdateBucket(ctx, buckets.Bucket{
             ProjectID: planet.Uplinks[0].Projects[0].ID,
             Name:      "testbucket",
         })
         require.NoError(t, err)

-        for _, node := range planet.StorageNodes {
-            require.NoError(t, planet.Satellites[0].Overlay.Service.TestNodeCountryCode(ctx, node.ID(), "PL"))
-        }
-
-        err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "testbucket", "object", testrand.Bytes(5*memory.KiB))
-        require.NoError(t, err)
-
         type testCase struct {
             piecesOutOfPlacement int
             piecesAfterRepair    int
+
+            // how many from out of placement pieces should be also offline
+            piecesOutOfPlacementOffline int
         }

-        for _, tc := range []testCase{
+        for i, tc := range []testCase{
             // all pieces/nodes are out of placement, repair download/upload should be triggered
             {piecesOutOfPlacement: piecesCount, piecesAfterRepair: piecesCount},
+
+            // all pieces/nodes are out of placement, repair download/upload should be triggered, some pieces are offline
+            {piecesOutOfPlacement: piecesCount, piecesAfterRepair: piecesCount, piecesOutOfPlacementOffline: 1},
+            {piecesOutOfPlacement: piecesCount, piecesAfterRepair: piecesCount, piecesOutOfPlacementOffline: 2},
+
             // few pieces/nodes are out of placement, repair download/upload should be triggered
-            {piecesOutOfPlacement: piecesCount - 2, piecesAfterRepair: piecesCount},
+            {piecesOutOfPlacement: piecesCount - 1, piecesAfterRepair: piecesCount},
+            {piecesOutOfPlacement: piecesCount - 1, piecesAfterRepair: piecesCount, piecesOutOfPlacementOffline: 1},
+
             // single piece/node is out of placement, NO download/upload repair, we are only removing piece from segment
             // as segment is still above repair threshold
             {piecesOutOfPlacement: 1, piecesAfterRepair: piecesCount - 1},
+            {piecesOutOfPlacement: 1, piecesAfterRepair: piecesCount - 1, piecesOutOfPlacementOffline: 1},
+            {piecesOutOfPlacement: 1, piecesAfterRepair: piecesCount - 1, piecesOutOfPlacementOffline: 1},
         } {
-            for _, node := range planet.StorageNodes {
-                require.NoError(t, planet.Satellites[0].Overlay.Service.TestNodeCountryCode(ctx, node.ID(), "PL"))
-            }
+            t.Run("#"+strconv.Itoa(i), func(t *testing.T) {
+                for _, node := range planet.StorageNodes {
+                    require.NoError(t, planet.Satellites[0].Overlay.Service.TestNodeCountryCode(ctx, node.ID(), defaultLocation.String()))
+                }

-            require.NoError(t, planet.Satellites[0].Repairer.Overlay.DownloadSelectionCache.Refresh(ctx))
+                require.NoError(t, planet.Satellites[0].Repairer.Overlay.DownloadSelectionCache.Refresh(ctx))

-            segments, err := planet.Satellites[0].Metabase.DB.TestingAllSegments(ctx)
-            require.NoError(t, err)
-            require.Len(t, segments, 1)
-            require.Len(t, segments[0].Pieces, piecesCount)
+                expectedData := testrand.Bytes(5 * memory.KiB)
+                err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "testbucket", "object", expectedData)
+                require.NoError(t, err)

-            for _, piece := range segments[0].Pieces[:tc.piecesOutOfPlacement] {
-                require.NoError(t, planet.Satellites[0].Overlay.Service.TestNodeCountryCode(ctx, piece.StorageNode, "US"))
-            }
+                segments, err := planet.Satellites[0].Metabase.DB.TestingAllSegments(ctx)
+                require.NoError(t, err)
+                require.Len(t, segments, 1)
+                require.Len(t, segments[0].Pieces, piecesCount)

-            // confirm that some pieces are out of placement
-            ok, err := allPiecesInPlacement(ctx, planet.Satellites[0].Overlay.Service, segments[0].Pieces, segments[0].Placement)
-            require.NoError(t, err)
-            require.False(t, ok)
+                for index, piece := range segments[0].Pieces {
+                    // make node offline if needed
+                    require.NoError(t, updateNodeStatus(ctx, planet.Satellites[0], planet.FindNode(piece.StorageNode), index < tc.piecesOutOfPlacementOffline, defaultLocation))

-            require.NoError(t, planet.Satellites[0].Repairer.Overlay.DownloadSelectionCache.Refresh(ctx))
+                    if index < tc.piecesOutOfPlacement {
+                        require.NoError(t, planet.Satellites[0].Overlay.Service.TestNodeCountryCode(ctx, piece.StorageNode, "US"))
+                    }
+                }

-            _, err = planet.Satellites[0].Repairer.SegmentRepairer.Repair(ctx, &queue.InjuredSegment{
-                StreamID: segments[0].StreamID,
-                Position: segments[0].Position,
+                // confirm that some pieces are out of placement
+                ok, err := allPiecesInPlacement(ctx, planet.Satellites[0].Overlay.Service, segments[0].Pieces, segments[0].Placement)
+                require.NoError(t, err)
+                require.False(t, ok)
+
+                require.NoError(t, planet.Satellites[0].Repairer.Overlay.DownloadSelectionCache.Refresh(ctx))
+
+                _, err = planet.Satellites[0].Repairer.SegmentRepairer.Repair(ctx, &queue.InjuredSegment{
+                    StreamID: segments[0].StreamID,
+                    Position: segments[0].Position,
+                })
+                require.NoError(t, err)
+
+                // confirm that all pieces have correct placement
+                segments, err = planet.Satellites[0].Metabase.DB.TestingAllSegments(ctx)
+                require.NoError(t, err)
+                require.Len(t, segments, 1)
+                require.NotNil(t, segments[0].RepairedAt)
+                require.Len(t, segments[0].Pieces, tc.piecesAfterRepair)
+
+                ok, err = allPiecesInPlacement(ctx, planet.Satellites[0].Overlay.Service, segments[0].Pieces, segments[0].Placement)
+                require.NoError(t, err)
+                require.True(t, ok)
+
+                data, err := planet.Uplinks[0].Download(ctx, planet.Satellites[0], "testbucket", "object")
+                require.NoError(t, err)
+                require.Equal(t, expectedData, data)
             })
-            require.NoError(t, err)
-
-            // confirm that all pieces have correct placement
-            segments, err = planet.Satellites[0].Metabase.DB.TestingAllSegments(ctx)
-            require.NoError(t, err)
-            require.Len(t, segments, 1)
-            require.NotNil(t, segments[0].RepairedAt)
-            require.Len(t, segments[0].Pieces, tc.piecesAfterRepair)
-
-            ok, err = allPiecesInPlacement(ctx, planet.Satellites[0].Overlay.Service, segments[0].Pieces, segments[0].Placement)
-            require.NoError(t, err)
-            require.True(t, ok)
         }
     })
 }
@@ -248,3 +272,26 @@ func allPiecesInPlacement(ctx context.Context, overaly *overlay.Service, pieces
     }
     return true, nil
 }
+
+func updateNodeStatus(ctx context.Context, satellite *testplanet.Satellite, node *testplanet.StorageNode, offline bool, countryCode location.CountryCode) error {
+    timestamp := time.Now()
+    if offline {
+        timestamp = time.Now().Add(-4 * time.Hour)
+    }
+
+    return satellite.DB.OverlayCache().UpdateCheckIn(ctx, overlay.NodeCheckInInfo{
+        NodeID:  node.ID(),
+        Address: &pb.NodeAddress{Address: node.Addr()},
+        IsUp:    true,
+        Version: &pb.NodeVersion{
+            Version:    "v0.0.0",
+            CommitHash: "",
+            Timestamp:  time.Time{},
+            Release:    true,
+        },
+        Capacity: &pb.NodeCapacity{
+            FreeDisk: 1 * memory.GiB.Int64(),
+        },
+        CountryCode: countryCode,
+    }, timestamp, satellite.Config.Overlay.Node)
+}
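The core of the change is that KnownReliable now returns the segment's nodes split into online and offline slices, and classifySegmentPieces starts by presuming every piece missing, clearing only the pieces whose node comes back in the online result; offline-but-known nodes therefore still count as missing while remaining available to the clumping and placement checks. A minimal standalone sketch of that idea follows. It uses simplified stand-in types instead of the real storj.NodeID, metabase.Piece, and overlay.SelectedNode definitions, and it is an illustration of the technique rather than code from this patch.

// Illustrative sketch only, not part of the patch above.
package main

import "fmt"

type nodeID string

type piece struct {
    Number      uint16
    StorageNode nodeID
}

type selectedNode struct {
    ID      nodeID
    LastNet string // would feed the clumping check in the real code
}

// missingPieces marks every piece as missing and then clears the entries
// whose storage node came back in the "online" half of a KnownReliable-style
// lookup. The online slice is assumed to be a subset of the pieces' nodes,
// which holds when the lookup is queried with exactly those node IDs.
func missingPieces(pieces []piece, online []selectedNode) map[uint16]bool {
    nodeIDPieceMap := map[nodeID]uint16{}
    missing := map[uint16]bool{}
    for _, p := range pieces {
        nodeIDPieceMap[p.StorageNode] = p.Number
        missing[p.Number] = true
    }
    for _, n := range online {
        delete(missing, nodeIDPieceMap[n.ID])
    }
    return missing
}

func main() {
    pieces := []piece{{Number: 0, StorageNode: "a"}, {Number: 1, StorageNode: "b"}, {Number: 2, StorageNode: "c"}}
    online := []selectedNode{{ID: "a"}, {ID: "c"}}
    // Prints map[1:true]: only the piece held by the node that did not
    // come back online is treated as missing.
    fmt.Println(missingPieces(pieces, online))
}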