From 128b0a86e3a6ac6b93146f2ff73079c62a314b70 Mon Sep 17 00:00:00 2001
From: Michal Niewrzal
Date: Fri, 26 May 2023 14:22:15 +0200
Subject: [PATCH] satellite/repair/repairer: repair pieces out of placement

The segment repairer should take the segment's 'placement' field into
account and remove or repair pieces stored on nodes outside this
placement. If the segment is still above the repair threshold after
discounting the out-of-placement pieces, we only update the segment's
pieces to drop the problematic ones. Otherwise we perform a regular
repair.

https://github.com/storj/storj/issues/5896

Change-Id: I72b652aff2e6b20be3ac6dbfb1d32c2840ce3d59
---
 satellite/repair/repairer/repairer.go       |   1 +
 satellite/repair/repairer/segments.go       |  87 ++++++++-
 satellite/repair/repairer/segments_test.go  | 204 ++++++++++++++++++++
 scripts/testdata/satellite-config.yaml.lock |   3 +
 4 files changed, 285 insertions(+), 10 deletions(-)
 create mode 100644 satellite/repair/repairer/segments_test.go

diff --git a/satellite/repair/repairer/repairer.go b/satellite/repair/repairer/repairer.go
index 8ec1708db..75020ddbe 100644
--- a/satellite/repair/repairer/repairer.go
+++ b/satellite/repair/repairer/repairer.go
@@ -36,6 +36,7 @@ type Config struct {
 	ReputationUpdateEnabled bool `help:"whether the audit score of nodes should be updated as a part of repair" default:"false"`
 	UseRangedLoop bool `help:"whether to enable repair checker observer with ranged loop" default:"true"`
 	DoDeclumping bool `help:"repair pieces on the same network to other nodes" default:"false"`
+	DoPlacementCheck bool `help:"repair pieces out of segment placement" default:"true"`
 }
 
 // Service contains the information needed to run the repair service.
diff --git a/satellite/repair/repairer/segments.go b/satellite/repair/repairer/segments.go
index e9e92109e..1c2451183 100644
--- a/satellite/repair/repairer/segments.go
+++ b/satellite/repair/repairer/segments.go
@@ -14,6 +14,8 @@ import (
 	"github.com/zeebo/errs"
 	"go.uber.org/zap"
+	"golang.org/x/exp/maps"
+	"golang.org/x/exp/slices"
 
 	"storj.io/common/pb"
 	"storj.io/common/storj"
@@ -87,6 +89,7 @@ type SegmentRepairer struct {
 	reputationUpdateEnabled bool
 	doDeclumping bool
+	doPlacementCheck bool
 
 	// multiplierOptimalThreshold is the value that multiplied by the optimal
 	// threshold results in the maximum limit of number of nodes to upload
@@ -96,6 +99,8 @@
 	// repairOverrides is the set of values configured by the checker to override the repair threshold for various RS schemes.
 	repairOverrides checker.RepairOverridesMap
 
+	allNodeIDs []storj.NodeID
+
 	nowFn func() time.Time
 	OnTestingCheckSegmentAlteredHook func()
 	OnTestingPiecesReportHook func(pieces FetchResultReport)
@@ -135,6 +140,7 @@ func NewSegmentRepairer(
 		reporter: reporter,
 		reputationUpdateEnabled: config.ReputationUpdateEnabled,
 		doDeclumping: config.DoDeclumping,
+		doPlacementCheck: config.DoPlacementCheck,
 
 		nowFn: time.Now,
 	}
@@ -191,8 +197,21 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
 	mon.IntVal("repair_segment_size").Observe(int64(segment.EncryptedSize)) //mon:locked
 	stats.repairSegmentSize.Observe(int64(segment.EncryptedSize))
 
-	var excludeNodeIDs storj.NodeIDList
 	pieces := segment.Pieces
+
+	// reuse allNodeIDs slice if it's large enough
+	if cap(repairer.allNodeIDs) < len(pieces) {
+		repairer.allNodeIDs = make([]storj.NodeID, len(pieces))
+	} else {
+		repairer.allNodeIDs = repairer.allNodeIDs[:len(pieces)]
+	}
+
+	for i, p := range pieces {
+		repairer.allNodeIDs[i] = p.StorageNode
+	}
+
+	excludeNodeIDs := repairer.allNodeIDs
+
 	missingPieces, err := repairer.overlay.GetMissingPieces(ctx, pieces)
 	if err != nil {
 		return false, overlayQueryError.New("error identifying missing pieces: %w", err)
 	}
@@ -203,11 +222,7 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
 	if repairer.doDeclumping {
 		// if multiple pieces are on the same last_net, keep only the first one. The rest are
 		// to be considered retrievable but unhealthy.
-		nodeIDs := make([]storj.NodeID, len(pieces))
-		for i, p := range pieces {
-			nodeIDs[i] = p.StorageNode
-		}
-		lastNets, err := repairer.overlay.GetNodesNetworkInOrder(ctx, nodeIDs)
+		lastNets, err := repairer.overlay.GetNodesNetworkInOrder(ctx, repairer.allNodeIDs)
 		if err != nil {
 			return false, metainfoGetError.Wrap(err)
 		}
@@ -216,11 +231,37 @@
 		for _, clumpedPiece := range clumpedPieces {
 			clumpedPiecesSet[clumpedPiece.Number] = true
 		}
+	}
+
+	var outOfPlacementPieces metabase.Pieces
+	var outOfPlacementPiecesSet map[uint16]bool
+	if repairer.doPlacementCheck && segment.Placement != storj.EveryCountry {
+		var err error
+		outOfPlacementNodes, err := repairer.overlay.GetNodesOutOfPlacement(ctx, repairer.allNodeIDs, segment.Placement)
+		if err != nil {
+			return false, metainfoGetError.Wrap(err)
+		}
+
+		outOfPlacementPiecesSet = make(map[uint16]bool)
+		for _, piece := range pieces {
+			if slices.Contains(outOfPlacementNodes, piece.StorageNode) {
+				outOfPlacementPieces = append(outOfPlacementPieces, piece)
+				outOfPlacementPiecesSet[piece.Number] = true
+			}
+		}
+	}
+
+	numUnhealthyRetrievable := len(clumpedPieces) + len(outOfPlacementPieces)
+	if len(clumpedPieces) != 0 && len(outOfPlacementPieces) != 0 {
+		// a piece can be both clumped and out of placement; count the union so it is not counted twice
+		unhealthyRetrievableSet := map[uint16]bool{}
+		maps.Copy(unhealthyRetrievableSet, clumpedPiecesSet)
+		maps.Copy(unhealthyRetrievableSet, outOfPlacementPiecesSet)
+		numUnhealthyRetrievable = len(unhealthyRetrievableSet)
 	}
 
 	numRetrievable := len(pieces) - len(missingPieces)
-	numHealthy := len(pieces) - len(missingPieces) - len(clumpedPieces)
+	numHealthy := len(pieces) - len(missingPieces) - numUnhealthyRetrievable
 	// irreparable segment
 	if numRetrievable < int(segment.Redundancy.RequiredShares) {
 		mon.Counter("repairer_segments_below_min_req").Inc(1) //mon:locked
@@ -263,9 +304,34 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
 
 	// repair not needed
 	if numHealthy-numHealthyInExcludedCountries > int(repairThreshold) {
+		// remove pieces out of placement without repairing as we are above repair threshold
+		if len(outOfPlacementPieces) > 0 {
+			newPieces, err := segment.Pieces.Update(nil, outOfPlacementPieces)
+			if err != nil {
+				return false, metainfoPutError.Wrap(err)
+			}
+
+			err = repairer.metabase.UpdateSegmentPieces(ctx, metabase.UpdateSegmentPieces{
+				StreamID: segment.StreamID,
+				Position: segment.Position,
+
+				OldPieces: segment.Pieces,
+				NewRedundancy: segment.Redundancy,
+				NewPieces: newPieces,
+
+				NewRepairedAt: time.Now(),
+			})
+			if err != nil {
+				return false, metainfoPutError.Wrap(err)
+			}
+
+			mon.Meter("dropped_out_of_placement_pieces").Mark(len(outOfPlacementPieces))
+		}
+
 		mon.Meter("repair_unnecessary").Mark(1) //mon:locked
 		stats.repairUnnecessary.Mark(1)
-		repairer.log.Debug("segment above repair threshold", zap.Int("numHealthy", numHealthy), zap.Int32("repairThreshold", repairThreshold), zap.Int("numClumped", len(clumpedPieces)))
+		repairer.log.Debug("segment above repair threshold", zap.Int("numHealthy", numHealthy), zap.Int32("repairThreshold", repairThreshold),
+			zap.Int("numClumped", len(clumpedPieces)), zap.Int("numOffPieces", len(outOfPlacementPieces)))
 		return true, nil
 	}
 
@@ -282,14 +348,14 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
 	unhealthyPieces := make(map[metabase.Piece]struct{})
 	healthySet := make(map[int32]struct{})
 	// Populate retrievablePieces with all pieces from the segment except those correlating to indices in lostPieces.
-	// Populate unhealthyPieces with all pieces in lostPieces OR clumpedPieces.
+	// Populate unhealthyPieces with all pieces in lostPieces, clumpedPieces or outOfPlacementPieces.
 	for _, piece := range pieces {
 		excludeNodeIDs = append(excludeNodeIDs, piece.StorageNode)
 		if lostPiecesSet[piece.Number] {
 			unhealthyPieces[piece] = struct{}{}
 		} else {
 			retrievablePieces = append(retrievablePieces, piece)
-			if clumpedPiecesSet[piece.Number] {
+			if clumpedPiecesSet[piece.Number] || outOfPlacementPiecesSet[piece.Number] {
 				unhealthyPieces[piece] = struct{}{}
 			} else {
 				healthySet[int32(piece.Number)] = struct{}{}
 			}
@@ -609,6 +675,7 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
 		zap.Stringer("Stream ID", segment.StreamID),
 		zap.Uint64("Position", segment.Position.Encode()),
 		zap.Int("clumped pieces", len(clumpedPieces)),
+		zap.Int("out of placement pieces", len(outOfPlacementPieces)),
 		zap.Int("in excluded countries", numHealthyInExcludedCountries),
 		zap.Int("removed pieces", len(toRemove)),
 		zap.Int("repaired pieces", len(repairedPieces)),
diff --git a/satellite/repair/repairer/segments_test.go b/satellite/repair/repairer/segments_test.go
new file mode 100644
index 000000000..5756ced04
--- /dev/null
+++ b/satellite/repair/repairer/segments_test.go
@@ -0,0 +1,204 @@
+// Copyright (C) 2019 Storj Labs, Inc.
+// See LICENSE for copying information.
+
+package repairer_test
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+	"go.uber.org/zap"
+
+	"storj.io/common/memory"
+	"storj.io/common/pb"
+	"storj.io/common/storj"
+	"storj.io/common/testcontext"
+	"storj.io/common/testrand"
+	"storj.io/storj/private/testplanet"
+	"storj.io/storj/satellite"
+	"storj.io/storj/satellite/buckets"
+	"storj.io/storj/satellite/metabase"
+	"storj.io/storj/satellite/overlay"
+	"storj.io/storj/satellite/repair/queue"
+)
+
+func TestSegmentRepairPlacement(t *testing.T) {
+	piecesCount := 4
+	testplanet.Run(t, testplanet.Config{
+		SatelliteCount: 1, StorageNodeCount: 8, UplinkCount: 1,
+		Reconfigure: testplanet.Reconfigure{
+			Satellite: testplanet.ReconfigureRS(1, 2, piecesCount, piecesCount),
+		},
+	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
+		require.NoError(t, planet.Uplinks[0].CreateBucket(ctx, planet.Satellites[0], "testbucket"))
+
+		_, err := planet.Satellites[0].API.Buckets.Service.UpdateBucket(ctx, buckets.Bucket{
+			ProjectID: planet.Uplinks[0].Projects[0].ID,
+			Name: "testbucket",
+			Placement: storj.EU,
+		})
+		require.NoError(t, err)
+
+		for _, node := range planet.StorageNodes {
+			require.NoError(t, planet.Satellites[0].Overlay.Service.TestNodeCountryCode(ctx, node.ID(), "PL"))
+		}
+
+		err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "testbucket", "object", testrand.Bytes(5*memory.KiB))
+		require.NoError(t, err)
+
+		type testCase struct {
+			piecesOutOfPlacement int
+			piecesAfterRepair int
+		}
+
+		for _, tc := range []testCase{
+			// all pieces/nodes are out of placement, repair download/upload should be triggered
+			{piecesOutOfPlacement: piecesCount, piecesAfterRepair: piecesCount},
+			// a few pieces/nodes are out of placement, repair download/upload should be triggered
+			{piecesOutOfPlacement: piecesCount - 2, piecesAfterRepair: piecesCount},
+			// a single piece/node is out of placement, NO download/upload repair, we only remove the piece
+			// from the segment as the segment is still above the repair threshold
+			{piecesOutOfPlacement: 1, piecesAfterRepair: piecesCount - 1},
+		} {
+			for _, node := range planet.StorageNodes {
+				require.NoError(t, planet.Satellites[0].Overlay.Service.TestNodeCountryCode(ctx, node.ID(), "PL"))
+			}
+
+			require.NoError(t, planet.Satellites[0].Repairer.Overlay.DownloadSelectionCache.Refresh(ctx))
+
+			segments, err := planet.Satellites[0].Metabase.DB.TestingAllSegments(ctx)
+			require.NoError(t, err)
+			require.Len(t, segments, 1)
+			require.Len(t, segments[0].Pieces, piecesCount)
+
+			for _, piece := range segments[0].Pieces[:tc.piecesOutOfPlacement] {
+				require.NoError(t, planet.Satellites[0].Overlay.Service.TestNodeCountryCode(ctx, piece.StorageNode, "US"))
+			}
+
+			// confirm that some pieces are out of placement
+			ok, err := allPiecesInPlacement(ctx, planet.Satellites[0].Overlay.Service, segments[0].Pieces, segments[0].Placement)
+			require.NoError(t, err)
+			require.False(t, ok)
+
+			require.NoError(t, planet.Satellites[0].Repairer.Overlay.DownloadSelectionCache.Refresh(ctx))
+
+			_, err = planet.Satellites[0].Repairer.SegmentRepairer.Repair(ctx, &queue.InjuredSegment{
+				StreamID: segments[0].StreamID,
+				Position: segments[0].Position,
+			})
+			require.NoError(t, err)
+
+			// confirm that all pieces have correct placement
+			segments, err = planet.Satellites[0].Metabase.DB.TestingAllSegments(ctx)
+			require.NoError(t, err)
+			require.Len(t, segments, 1)
+			require.NotNil(t, segments[0].RepairedAt)
+			require.Len(t, segments[0].Pieces, tc.piecesAfterRepair)
+
+			ok, err = allPiecesInPlacement(ctx, planet.Satellites[0].Overlay.Service, segments[0].Pieces, segments[0].Placement)
+			require.NoError(t, err)
+			require.True(t, ok)
+		}
+	})
+}
+
+func TestSegmentRepairPlacementAndClumped(t *testing.T) {
+	testplanet.Run(t, testplanet.Config{
+		SatelliteCount: 1, StorageNodeCount: 8, UplinkCount: 1,
+		Reconfigure: testplanet.Reconfigure{
+			Satellite: testplanet.Combine(
+				testplanet.ReconfigureRS(1, 2, 4, 4),
+				func(log *zap.Logger, index int, config *satellite.Config) {
+					config.Checker.DoDeclumping = true
+					config.Repairer.DoDeclumping = true
+				},
+			),
+		},
+	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
+		require.NoError(t, planet.Uplinks[0].CreateBucket(ctx, planet.Satellites[0], "testbucket"))
+
+		_, err := planet.Satellites[0].API.Buckets.Service.UpdateBucket(ctx, buckets.Bucket{
+			ProjectID: planet.Uplinks[0].Projects[0].ID,
+			Name: "testbucket",
+			Placement: storj.EU,
+		})
+		require.NoError(t, err)
+
+		for _, node := range planet.StorageNodes {
+			require.NoError(t, planet.Satellites[0].Overlay.Service.TestNodeCountryCode(ctx, node.ID(), "PL"))
+		}
+
+		err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "testbucket", "object", testrand.Bytes(5*memory.KiB))
+		require.NoError(t, err)
+
+		for _, node := range planet.StorageNodes {
+			require.NoError(t, planet.Satellites[0].Overlay.Service.TestNodeCountryCode(ctx, node.ID(), "PL"))
+		}
+
+		require.NoError(t, planet.Satellites[0].Repairer.Overlay.DownloadSelectionCache.Refresh(ctx))
+
+		segments, err := planet.Satellites[0].Metabase.DB.TestingAllSegments(ctx)
+		require.NoError(t, err)
+		require.Len(t, segments, 1)
+		require.Len(t, segments[0].Pieces, 4)
+
+		// set nodes to the same placement/country and put all nodes into the same net to mark them as clumped
+		node0 := planet.FindNode(segments[0].Pieces[0].StorageNode)
+		for _, piece := range segments[0].Pieces {
+			require.NoError(t, planet.Satellites[0].Overlay.Service.TestNodeCountryCode(ctx, piece.StorageNode, "US"))
+
+			local := node0.Contact.Service.Local()
+			checkInInfo := overlay.NodeCheckInInfo{
+				NodeID: piece.StorageNode,
+				Address: &pb.NodeAddress{Address: local.Address},
+				LastIPPort: local.Address,
+				LastNet: node0.Contact.Service.Local().Address,
+				IsUp: true,
+				Operator: &local.Operator,
+				Capacity: &local.Capacity,
+				Version: &local.Version,
+			}
+			err = planet.Satellites[0].DB.OverlayCache().UpdateCheckIn(ctx, checkInInfo, time.Now().UTC(), overlay.NodeSelectionConfig{})
+			require.NoError(t, err)
+		}
+
+		// confirm that some pieces are out of placement
+		ok, err := allPiecesInPlacement(ctx, planet.Satellites[0].Overlay.Service, segments[0].Pieces, segments[0].Placement)
+		require.NoError(t, err)
+		require.False(t, ok)
+
+		require.NoError(t, planet.Satellites[0].Repairer.Overlay.DownloadSelectionCache.Refresh(ctx))
+
+		_, err = planet.Satellites[0].Repairer.SegmentRepairer.Repair(ctx, &queue.InjuredSegment{
+			StreamID: segments[0].StreamID,
+			Position: segments[0].Position,
+		})
+		require.NoError(t, err)
+
+		// confirm that all pieces have correct placement
+		segments, err = planet.Satellites[0].Metabase.DB.TestingAllSegments(ctx)
+		require.NoError(t, err)
+		require.Len(t, segments, 1)
+		require.NotNil(t, segments[0].RepairedAt)
+		require.Len(t, segments[0].Pieces, 4)
+
+		ok, err = allPiecesInPlacement(ctx, planet.Satellites[0].Overlay.Service, segments[0].Pieces, segments[0].Placement)
+		require.NoError(t, err)
+		require.True(t, ok)
+	})
+}
+
+func allPiecesInPlacement(ctx context.Context, overlayService *overlay.Service, pieces metabase.Pieces, placement storj.PlacementConstraint) (bool, error) {
+	for _, piece := range pieces {
+		nodeDossier, err := overlayService.Get(ctx, piece.StorageNode)
+		if err != nil {
+			return false, err
+		}
+		if !placement.AllowedCountry(nodeDossier.CountryCode) {
+			return false, nil
+		}
+	}
+	return true, nil
+}
diff --git a/scripts/testdata/satellite-config.yaml.lock b/scripts/testdata/satellite-config.yaml.lock
index 3e3ea3661..cd69fb87a 100755
--- a/scripts/testdata/satellite-config.yaml.lock
+++ b/scripts/testdata/satellite-config.yaml.lock
@@ -946,6 +946,9 @@ identity.key-path: /root/.local/share/storj/identity/satellite/identity.key
 # repair pieces on the same network to other nodes
 # repairer.do-declumping: false
 
+# repair pieces out of segment placement
+# repairer.do-placement-check: true
+
 # time limit for downloading pieces from a node for repair
 # repairer.download-timeout: 5m0s
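
The core of the change is the bookkeeping in Repair: clumped pieces and out-of-placement pieces are both treated as retrievable-but-unhealthy, a piece that falls into both sets is counted only once (via the maps.Copy union), and numHealthy shrinks accordingly. The standalone program below sketches that counting with simplified types — plain uint16 piece-number sets instead of metabase pieces, the excluded-countries adjustment omitted, and a hypothetical countUnhealthyRetrievable helper — so it illustrates the idea rather than reproducing the satellite code.

// Sketch: counting unhealthy-but-retrievable pieces (simplified, not the satellite code).
package main

import "fmt"

// countUnhealthyRetrievable merges the clumped and out-of-placement piece-number
// sets so a piece appearing in both is counted once, mirroring what the patch
// does with maps.Copy into a single unhealthyRetrievableSet.
func countUnhealthyRetrievable(clumped, outOfPlacement map[uint16]bool) int {
	union := make(map[uint16]bool, len(clumped)+len(outOfPlacement))
	for number := range clumped {
		union[number] = true
	}
	for number := range outOfPlacement {
		union[number] = true
	}
	return len(union)
}

func main() {
	clumped := map[uint16]bool{1: true, 2: true}
	outOfPlacement := map[uint16]bool{2: true, 3: true} // piece 2 is in both sets

	totalPieces, missingPieces, repairThreshold := 8, 1, 5

	numUnhealthyRetrievable := countUnhealthyRetrievable(clumped, outOfPlacement)
	numHealthy := totalPieces - missingPieces - numUnhealthyRetrievable

	fmt.Println(numUnhealthyRetrievable) // 3, not 4: piece 2 is not double counted
	fmt.Println(numHealthy)              // 4

	// Above the threshold the repairer would only drop the out-of-placement
	// pieces; at or below it (as here, 4 <= 5) it falls through to a regular repair.
	fmt.Println(numHealthy > repairThreshold) // false
}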
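
When the segment stays above the repair threshold, the patch avoids any data transfer: segment.Pieces.Update(nil, outOfPlacementPieces) produces a piece list without the offending entries and metabase.UpdateSegmentPieces writes it back. The sketch below shows only that filtering step, with a hypothetical local piece type and dropPieces helper standing in for the metabase types; the real update in the hunk above also carries the redundancy and the new repaired-at timestamp.

// Sketch: dropping out-of-placement pieces without a data transfer (simplified types).
package main

import "fmt"

// piece stands in for metabase.Piece: a piece number plus the node that holds it
// (a plain string here instead of storj.NodeID).
type piece struct {
	Number      uint16
	StorageNode string
}

// dropPieces returns pieces minus the entries in toRemove. This is the effect of
// segment.Pieces.Update(nil, outOfPlacementPieces) in the patch: the segment's
// piece list shrinks, but no piece data is downloaded or uploaded.
func dropPieces(pieces, toRemove []piece) []piece {
	remove := make(map[uint16]bool, len(toRemove))
	for _, p := range toRemove {
		remove[p.Number] = true
	}

	kept := make([]piece, 0, len(pieces))
	for _, p := range pieces {
		if !remove[p.Number] {
			kept = append(kept, p)
		}
	}
	return kept
}

func main() {
	pieces := []piece{{0, "nodeA"}, {1, "nodeB"}, {2, "nodeC"}, {3, "nodeD"}}
	outOfPlacement := []piece{{1, "nodeB"}} // nodeB sits outside the segment's placement

	fmt.Println(dropPieces(pieces, outOfPlacement))
	// [{0 nodeA} {2 nodeC} {3 nodeD}] -- matches the test case where a single
	// out-of-placement piece leaves piecesCount-1 pieces after "repair".
}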
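
A smaller detail is the new allNodeIDs scratch field on SegmentRepairer: Repair reuses one slice across calls, growing it only when a segment has more pieces than the current capacity. A minimal illustration of that cap/len pattern, using a hypothetical reuse helper and ints in place of storj.NodeID:

// Sketch: the scratch-slice reuse applied to repairer.allNodeIDs (ints for brevity).
package main

import "fmt"

// reuse grows buf only when its capacity is too small for n elements; otherwise it
// reslices the existing backing array, avoiding a per-call allocation.
func reuse(buf []int, n int) []int {
	if cap(buf) < n {
		buf = make([]int, n)
	} else {
		buf = buf[:n]
	}
	return buf
}

func main() {
	var scratch []int

	scratch = reuse(scratch, 4)             // first call allocates
	fmt.Println(len(scratch), cap(scratch)) // 4 4

	scratch = reuse(scratch, 2)             // later calls reuse the same backing array
	fmt.Println(len(scratch), cap(scratch)) // 2 4
}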
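
Operationally, the whole check sits behind the new repairer.do-placement-check flag (default true), next to the existing repairer.do-declumping knob, and it is skipped when a segment's placement is storj.EveryCountry, so segments without a geofencing constraint do not trigger the extra GetNodesOutOfPlacement overlay query.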