diff --git a/satellite/repair/repairer/segments.go b/satellite/repair/repairer/segments.go index 8a6d55001..7fca8195f 100644 --- a/satellite/repair/repairer/segments.go +++ b/satellite/repair/repairer/segments.go @@ -15,7 +15,6 @@ import ( "github.com/zeebo/errs" "go.uber.org/zap" "golang.org/x/exp/maps" - "golang.org/x/exp/slices" "storj.io/common/pb" "storj.io/common/storj" @@ -195,65 +194,15 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue mon.IntVal("repair_segment_size").Observe(int64(segment.EncryptedSize)) //mon:locked stats.repairSegmentSize.Observe(int64(segment.EncryptedSize)) + piecesCheck, err := repairer.classifySegmentPieces(ctx, segment) + if err != nil { + return false, err + } + pieces := segment.Pieces - allNodeIDs := make([]storj.NodeID, len(pieces)) - for i, p := range pieces { - allNodeIDs[i] = p.StorageNode - } - - excludeNodeIDs := allNodeIDs - - missingPieces, err := repairer.overlay.GetMissingPieces(ctx, pieces) - if err != nil { - return false, overlayQueryError.New("error identifying missing pieces: %w", err) - } - - var clumpedPieces metabase.Pieces - var clumpedPiecesSet map[uint16]bool - if repairer.doDeclumping { - // if multiple pieces are on the same last_net, keep only the first one. The rest are - // to be considered retrievable but unhealthy. - lastNets, err := repairer.overlay.GetNodesNetworkInOrder(ctx, allNodeIDs) - if err != nil { - return false, metainfoGetError.Wrap(err) - } - clumpedPieces = repair.FindClumpedPieces(segment.Pieces, lastNets) - clumpedPiecesSet = make(map[uint16]bool) - for _, clumpedPiece := range clumpedPieces { - clumpedPiecesSet[clumpedPiece.Number] = true - } - } - - var outOfPlacementPieces metabase.Pieces - var outOfPlacementPiecesSet map[uint16]bool - if repairer.doPlacementCheck && segment.Placement != storj.EveryCountry { - var err error - outOfPlacementNodes, err := repairer.overlay.GetNodesOutOfPlacement(ctx, allNodeIDs, segment.Placement) - if err != nil { - return false, metainfoGetError.Wrap(err) - } - - outOfPlacementPiecesSet = make(map[uint16]bool) - for _, piece := range pieces { - if slices.Contains(outOfPlacementNodes, piece.StorageNode) { - outOfPlacementPieces = append(outOfPlacementPieces, piece) - outOfPlacementPiecesSet[piece.Number] = true - } - } - } - - numUnhealthyRetrievable := len(clumpedPieces) + len(outOfPlacementPieces) - if len(clumpedPieces) != 0 && len(outOfPlacementPieces) != 0 { - // verify that some of clumped pieces and out of placement pieces are not the same - unhealthyRetrievableSet := map[uint16]bool{} - maps.Copy(unhealthyRetrievableSet, clumpedPiecesSet) - maps.Copy(unhealthyRetrievableSet, outOfPlacementPiecesSet) - numUnhealthyRetrievable = len(unhealthyRetrievableSet) - } - - numRetrievable := len(pieces) - len(missingPieces) - numHealthy := len(pieces) - len(missingPieces) - numUnhealthyRetrievable + numRetrievable := len(pieces) - len(piecesCheck.MissingPiecesSet) + numHealthy := len(pieces) - len(piecesCheck.MissingPiecesSet) - piecesCheck.NumUnhealthyRetrievable // irreparable segment if numRetrievable < int(segment.Redundancy.RequiredShares) { mon.Counter("repairer_segments_below_min_req").Inc(1) //mon:locked @@ -297,7 +246,15 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue // repair not needed if numHealthy-numHealthyInExcludedCountries > int(repairThreshold) { // remove pieces out of placement without repairing as we are above repair threshold - if len(outOfPlacementPieces) > 0 { + if 
len(piecesCheck.OutOfPlacementPiecesSet) > 0 { + + var outOfPlacementPieces metabase.Pieces + for _, piece := range pieces { + if _, ok := piecesCheck.OutOfPlacementPiecesSet[piece.Number]; ok { + outOfPlacementPieces = append(outOfPlacementPieces, piece) + } + } + newPieces, err := segment.Pieces.Update(nil, outOfPlacementPieces) if err != nil { return false, metainfoPutError.Wrap(err) @@ -317,13 +274,13 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue return false, metainfoPutError.Wrap(err) } - mon.Meter("dropped_out_of_placement_pieces").Mark(len(outOfPlacementPieces)) + mon.Meter("dropped_out_of_placement_pieces").Mark(len(piecesCheck.OutOfPlacementPiecesSet)) } mon.Meter("repair_unnecessary").Mark(1) //mon:locked stats.repairUnnecessary.Mark(1) repairer.log.Debug("segment above repair threshold", zap.Int("numHealthy", numHealthy), zap.Int32("repairThreshold", repairThreshold), - zap.Int("numClumped", len(clumpedPieces)), zap.Int("numOffPieces", len(outOfPlacementPieces))) + zap.Int("numClumped", len(piecesCheck.ClumpedPiecesSet)), zap.Int("numOffPieces", len(piecesCheck.OutOfPlacementPiecesSet))) return true, nil } @@ -334,7 +291,7 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue mon.FloatVal("healthy_ratio_before_repair").Observe(healthyRatioBeforeRepair) //mon:locked stats.healthyRatioBeforeRepair.Observe(healthyRatioBeforeRepair) - lostPiecesSet := sliceToSet(missingPieces) + lostPiecesSet := piecesCheck.MissingPiecesSet var retrievablePieces metabase.Pieces unhealthyPieces := make(map[metabase.Piece]struct{}) @@ -342,12 +299,11 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue // Populate retrievablePieces with all pieces from the segment except those correlating to indices in lostPieces. // Populate unhealthyPieces with all pieces in lostPieces, clumpedPieces or outOfPlacementPieces. 
for _, piece := range pieces { - excludeNodeIDs = append(excludeNodeIDs, piece.StorageNode) if lostPiecesSet[piece.Number] { unhealthyPieces[piece] = struct{}{} } else { retrievablePieces = append(retrievablePieces, piece) - if clumpedPiecesSet[piece.Number] || outOfPlacementPiecesSet[piece.Number] { + if piecesCheck.ClumpedPiecesSet[piece.Number] || piecesCheck.OutOfPlacementPiecesSet[piece.Number] { unhealthyPieces[piece] = struct{}{} } else { healthySet[int32(piece.Number)] = struct{}{} @@ -399,7 +355,7 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue // Request Overlay for n-h new storage nodes request := overlay.FindStorageNodesRequest{ RequestedCount: requestCount, - ExcludedIDs: excludeNodeIDs, + ExcludedIDs: piecesCheck.ExcludeNodeIDs, Placement: segment.Placement, } newNodes, err := repairer.overlay.FindStorageNodesForUpload(ctx, request) @@ -671,8 +627,8 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue repairer.log.Debug("repaired segment", zap.Stringer("Stream ID", segment.StreamID), zap.Uint64("Position", segment.Position.Encode()), - zap.Int("clumped pieces", len(clumpedPieces)), - zap.Int("out of placement pieces", len(outOfPlacementPieces)), + zap.Int("clumped pieces", len(piecesCheck.ClumpedPiecesSet)), + zap.Int("out of placement pieces", len(piecesCheck.OutOfPlacementPiecesSet)), zap.Int("in excluded countries", numHealthyInExcludedCountries), zap.Int("removed pieces", len(toRemove)), zap.Int("repaired pieces", len(repairedPieces)), @@ -681,6 +637,98 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue return true, nil } +type piecesCheckResult struct { + ExcludeNodeIDs []storj.NodeID + + MissingPiecesSet map[uint16]bool + ClumpedPiecesSet map[uint16]bool + OutOfPlacementPiecesSet map[uint16]bool + + NumUnhealthyRetrievable int +} + +func (repairer *SegmentRepairer) classifySegmentPieces(ctx context.Context, segment metabase.Segment) (result piecesCheckResult, err error) { + defer mon.Task()(&ctx)(&err) + + pieces := segment.Pieces + + allNodeIDs := make([]storj.NodeID, len(pieces)) + nodeIDPieceMap := map[storj.NodeID]uint16{} + result.MissingPiecesSet = map[uint16]bool{} + for i, p := range pieces { + allNodeIDs[i] = p.StorageNode + nodeIDPieceMap[p.StorageNode] = p.Number + result.MissingPiecesSet[p.Number] = true + } + + result.ExcludeNodeIDs = allNodeIDs + + online, offline, err := repairer.overlay.KnownReliable(ctx, allNodeIDs) + if err != nil { + return piecesCheckResult{}, overlayQueryError.New("error identifying missing pieces: %w", err) + } + + // remove online nodes from missing pieces + for _, onlineNode := range online { + pieceNum := nodeIDPieceMap[onlineNode.ID] + delete(result.MissingPiecesSet, pieceNum) + } + + if repairer.doDeclumping { + // if multiple pieces are on the same last_net, keep only the first one. The rest are + // to be considered retrievable but unhealthy. 
+ lastNets := make([]string, 0, len(allNodeIDs)) + + reliablePieces := metabase.Pieces{} + + collectLastNets := func(reliable []overlay.SelectedNode) { + for _, node := range reliable { + pieceNum := nodeIDPieceMap[node.ID] + reliablePieces = append(reliablePieces, metabase.Piece{ + Number: pieceNum, + StorageNode: node.ID, + }) + lastNets = append(lastNets, node.LastNet) + } + } + collectLastNets(online) + collectLastNets(offline) + + clumpedPieces := repair.FindClumpedPieces(reliablePieces, lastNets) + result.ClumpedPiecesSet = map[uint16]bool{} + for _, clumpedPiece := range clumpedPieces { + result.ClumpedPiecesSet[clumpedPiece.Number] = true + } + } + + if repairer.doPlacementCheck && segment.Placement != storj.EveryCountry { + result.OutOfPlacementPiecesSet = map[uint16]bool{} + + checkPlacement := func(reliable []overlay.SelectedNode) { + for _, node := range reliable { + if segment.Placement.AllowedCountry(node.CountryCode) { + continue + } + + result.OutOfPlacementPiecesSet[nodeIDPieceMap[node.ID]] = true + } + } + checkPlacement(online) + checkPlacement(offline) + } + + result.NumUnhealthyRetrievable = len(result.ClumpedPiecesSet) + len(result.OutOfPlacementPiecesSet) + if len(result.ClumpedPiecesSet) != 0 && len(result.OutOfPlacementPiecesSet) != 0 { + // verify that some of clumped pieces and out of placement pieces are not the same + unhealthyRetrievableSet := map[uint16]bool{} + maps.Copy(unhealthyRetrievableSet, result.ClumpedPiecesSet) + maps.Copy(unhealthyRetrievableSet, result.OutOfPlacementPiecesSet) + result.NumUnhealthyRetrievable = len(unhealthyRetrievableSet) + } + + return result, nil +} + // checkIfSegmentAltered checks if oldSegment has been altered since it was selected for audit. func (repairer *SegmentRepairer) checkIfSegmentAltered(ctx context.Context, oldSegment metabase.Segment) (err error) { defer mon.Task()(&ctx)(&err) @@ -797,15 +845,6 @@ func (repairer *SegmentRepairer) AdminFetchPieces(ctx context.Context, seg *meta return pieceInfos, nil } -// sliceToSet converts the given slice to a set. -func sliceToSet(slice []uint16) map[uint16]bool { - set := make(map[uint16]bool, len(slice)) - for _, value := range slice { - set[value] = true - } - return set -} - // commaSeparatedArray concatenates an array into a comma-separated string, // lazily. 
type commaSeparatedArray []string diff --git a/satellite/repair/repairer/segments_test.go b/satellite/repair/repairer/segments_test.go index 5d504911f..50c676d67 100644 --- a/satellite/repair/repairer/segments_test.go +++ b/satellite/repair/repairer/segments_test.go @@ -5,6 +5,7 @@ package repairer_test import ( "context" + "strconv" "testing" "time" @@ -14,6 +15,7 @@ import ( "storj.io/common/memory" "storj.io/common/pb" "storj.io/common/storj" + "storj.io/common/storj/location" "storj.io/common/testcontext" "storj.io/common/testrand" "storj.io/storj/private/testplanet" @@ -27,13 +29,15 @@ import ( func TestSegmentRepairPlacement(t *testing.T) { piecesCount := 4 testplanet.Run(t, testplanet.Config{ - SatelliteCount: 1, StorageNodeCount: 10, UplinkCount: 1, + SatelliteCount: 1, StorageNodeCount: 12, UplinkCount: 1, Reconfigure: testplanet.Reconfigure{ - Satellite: testplanet.ReconfigureRS(1, 2, piecesCount, piecesCount), + Satellite: testplanet.ReconfigureRS(1, 1, piecesCount, piecesCount), }, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { require.NoError(t, planet.Uplinks[0].CreateBucket(ctx, planet.Satellites[0], "testbucket")) + defaultLocation := location.Poland + _, err := planet.Satellites[0].API.Buckets.Service.UpdateBucket(ctx, buckets.Bucket{ ProjectID: planet.Uplinks[0].Projects[0].ID, Name: "testbucket", @@ -41,65 +45,85 @@ func TestSegmentRepairPlacement(t *testing.T) { }) require.NoError(t, err) - for _, node := range planet.StorageNodes { - require.NoError(t, planet.Satellites[0].Overlay.Service.TestNodeCountryCode(ctx, node.ID(), "PL")) - } - - err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "testbucket", "object", testrand.Bytes(5*memory.KiB)) - require.NoError(t, err) - type testCase struct { piecesOutOfPlacement int piecesAfterRepair int + + // how many from out of placement pieces should be also offline + piecesOutOfPlacementOffline int } - for _, tc := range []testCase{ + for i, tc := range []testCase{ // all pieces/nodes are out of placement, repair download/upload should be triggered {piecesOutOfPlacement: piecesCount, piecesAfterRepair: piecesCount}, + + // all pieces/nodes are out of placement, repair download/upload should be triggered, some pieces are offline + {piecesOutOfPlacement: piecesCount, piecesAfterRepair: piecesCount, piecesOutOfPlacementOffline: 1}, + {piecesOutOfPlacement: piecesCount, piecesAfterRepair: piecesCount, piecesOutOfPlacementOffline: 2}, + // few pieces/nodes are out of placement, repair download/upload should be triggered - {piecesOutOfPlacement: piecesCount - 2, piecesAfterRepair: piecesCount}, + {piecesOutOfPlacement: piecesCount - 1, piecesAfterRepair: piecesCount}, + {piecesOutOfPlacement: piecesCount - 1, piecesAfterRepair: piecesCount, piecesOutOfPlacementOffline: 1}, + // single piece/node is out of placement, NO download/upload repair, we are only removing piece from segment // as segment is still above repair threshold {piecesOutOfPlacement: 1, piecesAfterRepair: piecesCount - 1}, + {piecesOutOfPlacement: 1, piecesAfterRepair: piecesCount - 1, piecesOutOfPlacementOffline: 1}, + {piecesOutOfPlacement: 1, piecesAfterRepair: piecesCount - 1, piecesOutOfPlacementOffline: 1}, } { - for _, node := range planet.StorageNodes { - require.NoError(t, planet.Satellites[0].Overlay.Service.TestNodeCountryCode(ctx, node.ID(), "PL")) - } + t.Run("#"+strconv.Itoa(i), func(t *testing.T) { + for _, node := range planet.StorageNodes { + require.NoError(t, 
planet.Satellites[0].Overlay.Service.TestNodeCountryCode(ctx, node.ID(), defaultLocation.String())) + } - require.NoError(t, planet.Satellites[0].Repairer.Overlay.DownloadSelectionCache.Refresh(ctx)) + require.NoError(t, planet.Satellites[0].Repairer.Overlay.DownloadSelectionCache.Refresh(ctx)) - segments, err := planet.Satellites[0].Metabase.DB.TestingAllSegments(ctx) - require.NoError(t, err) - require.Len(t, segments, 1) - require.Len(t, segments[0].Pieces, piecesCount) + expectedData := testrand.Bytes(5 * memory.KiB) + err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "testbucket", "object", expectedData) + require.NoError(t, err) - for _, piece := range segments[0].Pieces[:tc.piecesOutOfPlacement] { - require.NoError(t, planet.Satellites[0].Overlay.Service.TestNodeCountryCode(ctx, piece.StorageNode, "US")) - } + segments, err := planet.Satellites[0].Metabase.DB.TestingAllSegments(ctx) + require.NoError(t, err) + require.Len(t, segments, 1) + require.Len(t, segments[0].Pieces, piecesCount) - // confirm that some pieces are out of placement - ok, err := allPiecesInPlacement(ctx, planet.Satellites[0].Overlay.Service, segments[0].Pieces, segments[0].Placement) - require.NoError(t, err) - require.False(t, ok) + for index, piece := range segments[0].Pieces { + // make node offline if needed + require.NoError(t, updateNodeStatus(ctx, planet.Satellites[0], planet.FindNode(piece.StorageNode), index < tc.piecesOutOfPlacementOffline, defaultLocation)) - require.NoError(t, planet.Satellites[0].Repairer.Overlay.DownloadSelectionCache.Refresh(ctx)) + if index < tc.piecesOutOfPlacement { + require.NoError(t, planet.Satellites[0].Overlay.Service.TestNodeCountryCode(ctx, piece.StorageNode, "US")) + } + } - _, err = planet.Satellites[0].Repairer.SegmentRepairer.Repair(ctx, &queue.InjuredSegment{ - StreamID: segments[0].StreamID, - Position: segments[0].Position, + // confirm that some pieces are out of placement + ok, err := allPiecesInPlacement(ctx, planet.Satellites[0].Overlay.Service, segments[0].Pieces, segments[0].Placement) + require.NoError(t, err) + require.False(t, ok) + + require.NoError(t, planet.Satellites[0].Repairer.Overlay.DownloadSelectionCache.Refresh(ctx)) + + _, err = planet.Satellites[0].Repairer.SegmentRepairer.Repair(ctx, &queue.InjuredSegment{ + StreamID: segments[0].StreamID, + Position: segments[0].Position, + }) + require.NoError(t, err) + + // confirm that all pieces have correct placement + segments, err = planet.Satellites[0].Metabase.DB.TestingAllSegments(ctx) + require.NoError(t, err) + require.Len(t, segments, 1) + require.NotNil(t, segments[0].RepairedAt) + require.Len(t, segments[0].Pieces, tc.piecesAfterRepair) + + ok, err = allPiecesInPlacement(ctx, planet.Satellites[0].Overlay.Service, segments[0].Pieces, segments[0].Placement) + require.NoError(t, err) + require.True(t, ok) + + data, err := planet.Uplinks[0].Download(ctx, planet.Satellites[0], "testbucket", "object") + require.NoError(t, err) + require.Equal(t, expectedData, data) }) - require.NoError(t, err) - - // confirm that all pieces have correct placement - segments, err = planet.Satellites[0].Metabase.DB.TestingAllSegments(ctx) - require.NoError(t, err) - require.Len(t, segments, 1) - require.NotNil(t, segments[0].RepairedAt) - require.Len(t, segments[0].Pieces, tc.piecesAfterRepair) - - ok, err = allPiecesInPlacement(ctx, planet.Satellites[0].Overlay.Service, segments[0].Pieces, segments[0].Placement) - require.NoError(t, err) - require.True(t, ok) } }) } @@ -248,3 +272,26 @@ func 
allPiecesInPlacement(ctx context.Context, overaly *overlay.Service, pieces } return true, nil } + +func updateNodeStatus(ctx context.Context, satellite *testplanet.Satellite, node *testplanet.StorageNode, offline bool, countryCode location.CountryCode) error { + timestamp := time.Now() + if offline { + timestamp = time.Now().Add(-4 * time.Hour) + } + + return satellite.DB.OverlayCache().UpdateCheckIn(ctx, overlay.NodeCheckInInfo{ + NodeID: node.ID(), + Address: &pb.NodeAddress{Address: node.Addr()}, + IsUp: true, + Version: &pb.NodeVersion{ + Version: "v0.0.0", + CommitHash: "", + Timestamp: time.Time{}, + Release: true, + }, + Capacity: &pb.NodeCapacity{ + FreeDisk: 1 * memory.GiB.Int64(), + }, + CountryCode: countryCode, + }, timestamp, satellite.Config.Overlay.Node) +}
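
Note: the sketch below is not part of the patch. It is a minimal, self-contained Go illustration of the two classification steps that the new classifySegmentPieces performs — building MissingPiecesSet by first marking every piece missing and then clearing the entries whose nodes KnownReliable reports as online, and computing NumUnhealthyRetrievable as the size of the union of the clumped and out-of-placement sets so a piece that falls into both is counted once. The piece struct and the string node IDs are simplified stand-ins for the metabase and overlay types, not identifiers from this change.

// Minimal sketch (simplified types; not code from this change).
package main

import (
	"fmt"

	"golang.org/x/exp/maps"
)

// piece is a stand-in for metabase.Piece; node IDs are plain strings here.
type piece struct {
	Number      uint16
	StorageNode string
}

func main() {
	pieces := []piece{{0, "a"}, {1, "b"}, {2, "c"}, {3, "d"}}

	// Step 1: start with every piece marked missing, then clear the entries
	// whose node the overlay reported as online (mirrors the KnownReliable
	// handling in classifySegmentPieces).
	online := []string{"a", "c"}
	nodeIDPieceMap := map[string]uint16{}
	missing := map[uint16]bool{}
	for _, p := range pieces {
		nodeIDPieceMap[p.StorageNode] = p.Number
		missing[p.Number] = true
	}
	for _, id := range online {
		delete(missing, nodeIDPieceMap[id])
	}
	fmt.Println("missing:", len(missing)) // 2 (the pieces held by "b" and "d")

	// Step 2: a piece may be both clumped and out of placement, so the
	// unhealthy-but-retrievable count is the size of the union of the two
	// sets, not the sum of their sizes.
	clumped := map[uint16]bool{1: true, 2: true}
	outOfPlacement := map[uint16]bool{1: true, 3: true}

	numUnhealthyRetrievable := len(clumped) + len(outOfPlacement)
	if len(clumped) != 0 && len(outOfPlacement) != 0 {
		union := map[uint16]bool{}
		maps.Copy(union, clumped)
		maps.Copy(union, outOfPlacement)
		numUnhealthyRetrievable = len(union)
	}
	fmt.Println("unhealthy retrievable:", numUnhealthyRetrievable) // 3, not 4
}

On the test side, the new updateNodeStatus helper simulates an offline node by recording a check-in four hours in the past rather than flagging the node down, which is what allows the offline test cases to exercise the KnownReliable online/offline split while the node's stored country code remains available for the placement check.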