diff --git a/satellite/repair/repairer/repairer.go b/satellite/repair/repairer/repairer.go
index 8ec1708db..75020ddbe 100644
--- a/satellite/repair/repairer/repairer.go
+++ b/satellite/repair/repairer/repairer.go
@@ -36,6 +36,7 @@ type Config struct {
 	ReputationUpdateEnabled bool `help:"whether the audit score of nodes should be updated as a part of repair" default:"false"`
 	UseRangedLoop           bool `help:"whether to enable repair checker observer with ranged loop" default:"true"`
 	DoDeclumping            bool `help:"repair pieces on the same network to other nodes" default:"false"`
+	DoPlacementCheck        bool `help:"repair pieces out of segment placement" default:"true"`
 }
 
 // Service contains the information needed to run the repair service.
diff --git a/satellite/repair/repairer/segments.go b/satellite/repair/repairer/segments.go
index e9e92109e..1c2451183 100644
--- a/satellite/repair/repairer/segments.go
+++ b/satellite/repair/repairer/segments.go
@@ -14,6 +14,8 @@ import (
 
 	"github.com/zeebo/errs"
 	"go.uber.org/zap"
+	"golang.org/x/exp/maps"
+	"golang.org/x/exp/slices"
 
 	"storj.io/common/pb"
 	"storj.io/common/storj"
@@ -87,6 +89,7 @@ type SegmentRepairer struct {
 
 	reputationUpdateEnabled bool
 	doDeclumping            bool
+	doPlacementCheck        bool
 
 	// multiplierOptimalThreshold is the value that multiplied by the optimal
 	// threshold results in the maximum limit of number of nodes to upload
@@ -96,6 +99,8 @@ type SegmentRepairer struct {
 	// repairOverrides is the set of values configured by the checker to override the repair threshold for various RS schemes.
 	repairOverrides checker.RepairOverridesMap
 
+	allNodeIDs []storj.NodeID
+
 	nowFn                            func() time.Time
 	OnTestingCheckSegmentAlteredHook func()
 	OnTestingPiecesReportHook        func(pieces FetchResultReport)
@@ -135,6 +140,7 @@ func NewSegmentRepairer(
 		reporter:                reporter,
 		reputationUpdateEnabled: config.ReputationUpdateEnabled,
 		doDeclumping:            config.DoDeclumping,
+		doPlacementCheck:        config.DoPlacementCheck,
 		nowFn:                   time.Now,
 	}
 
@@ -191,8 +197,21 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
 	mon.IntVal("repair_segment_size").Observe(int64(segment.EncryptedSize)) //mon:locked
 	stats.repairSegmentSize.Observe(int64(segment.EncryptedSize))
-	var excludeNodeIDs storj.NodeIDList
 	pieces := segment.Pieces
+
+	// reuse the allNodeIDs slice if it's large enough
+	if cap(repairer.allNodeIDs) < len(pieces) {
+		repairer.allNodeIDs = make([]storj.NodeID, len(pieces))
+	} else {
+		repairer.allNodeIDs = repairer.allNodeIDs[:len(pieces)]
+	}
+
+	for i, p := range pieces {
+		repairer.allNodeIDs[i] = p.StorageNode
+	}
+
+	excludeNodeIDs := repairer.allNodeIDs
+
 	missingPieces, err := repairer.overlay.GetMissingPieces(ctx, pieces)
 	if err != nil {
 		return false, overlayQueryError.New("error identifying missing pieces: %w", err)
 	}
@@ -203,11 +222,7 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
 	if repairer.doDeclumping {
 		// if multiple pieces are on the same last_net, keep only the first one. The rest are
 		// to be considered retrievable but unhealthy.
-		nodeIDs := make([]storj.NodeID, len(pieces))
-		for i, p := range pieces {
-			nodeIDs[i] = p.StorageNode
-		}
-		lastNets, err := repairer.overlay.GetNodesNetworkInOrder(ctx, nodeIDs)
+		lastNets, err := repairer.overlay.GetNodesNetworkInOrder(ctx, repairer.allNodeIDs)
 		if err != nil {
 			return false, metainfoGetError.Wrap(err)
 		}
@@ -216,11 +231,37 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
 		for _, clumpedPiece := range clumpedPieces {
 			clumpedPiecesSet[clumpedPiece.Number] = true
 		}
+	}
+	var outOfPlacementPieces metabase.Pieces
+	var outOfPlacementPiecesSet map[uint16]bool
+	if repairer.doPlacementCheck && segment.Placement != storj.EveryCountry {
+		var err error
+		outOfPlacementNodes, err := repairer.overlay.GetNodesOutOfPlacement(ctx, repairer.allNodeIDs, segment.Placement)
+		if err != nil {
+			return false, metainfoGetError.Wrap(err)
+		}
+
+		outOfPlacementPiecesSet = make(map[uint16]bool)
+		for _, piece := range pieces {
+			if slices.Contains(outOfPlacementNodes, piece.StorageNode) {
+				outOfPlacementPieces = append(outOfPlacementPieces, piece)
+				outOfPlacementPiecesSet[piece.Number] = true
+			}
+		}
+	}
+
+	numUnhealthyRetrievable := len(clumpedPieces) + len(outOfPlacementPieces)
+	if len(clumpedPieces) != 0 && len(outOfPlacementPieces) != 0 {
+		// clumped pieces and out of placement pieces may overlap, so count each piece number only once
+		unhealthyRetrievableSet := map[uint16]bool{}
+		maps.Copy(unhealthyRetrievableSet, clumpedPiecesSet)
+		maps.Copy(unhealthyRetrievableSet, outOfPlacementPiecesSet)
+		numUnhealthyRetrievable = len(unhealthyRetrievableSet)
 	}
 
 	numRetrievable := len(pieces) - len(missingPieces)
-	numHealthy := len(pieces) - len(missingPieces) - len(clumpedPieces)
+	numHealthy := len(pieces) - len(missingPieces) - numUnhealthyRetrievable
 	// irreparable segment
 	if numRetrievable < int(segment.Redundancy.RequiredShares) {
 		mon.Counter("repairer_segments_below_min_req").Inc(1) //mon:locked
@@ -263,9 +304,34 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
 
 	// repair not needed
 	if numHealthy-numHealthyInExcludedCountries > int(repairThreshold) {
+		// drop out of placement pieces without repairing, as the segment is still above the repair threshold
+		if len(outOfPlacementPieces) > 0 {
+			newPieces, err := segment.Pieces.Update(nil, outOfPlacementPieces)
+			if err != nil {
+				return false, metainfoPutError.Wrap(err)
+			}
+
+			err = repairer.metabase.UpdateSegmentPieces(ctx, metabase.UpdateSegmentPieces{
+				StreamID: segment.StreamID,
+				Position: segment.Position,
+
+				OldPieces:     segment.Pieces,
+				NewRedundancy: segment.Redundancy,
+				NewPieces:     newPieces,
+
+				NewRepairedAt: time.Now(),
+			})
+			if err != nil {
+				return false, metainfoPutError.Wrap(err)
+			}
+
+			mon.Meter("dropped_out_of_placement_pieces").Mark(len(outOfPlacementPieces))
+		}
+
 		mon.Meter("repair_unnecessary").Mark(1) //mon:locked
 		stats.repairUnnecessary.Mark(1)
-		repairer.log.Debug("segment above repair threshold", zap.Int("numHealthy", numHealthy), zap.Int32("repairThreshold", repairThreshold), zap.Int("numClumped", len(clumpedPieces)))
+		repairer.log.Debug("segment above repair threshold", zap.Int("numHealthy", numHealthy), zap.Int32("repairThreshold", repairThreshold),
+			zap.Int("numClumped", len(clumpedPieces)), zap.Int("numOffPieces", len(outOfPlacementPieces)))
 		return true, nil
 	}
 
@@ -282,14 +348,14 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
 	unhealthyPieces := make(map[metabase.Piece]struct{})
 	healthySet := make(map[int32]struct{})
 	// Populate retrievablePieces with all pieces from the segment except those correlating to indices in lostPieces.
-	// Populate unhealthyPieces with all pieces in lostPieces OR clumpedPieces.
+	// Populate unhealthyPieces with all pieces in lostPieces, clumpedPieces, or outOfPlacementPieces.
 	for _, piece := range pieces {
 		excludeNodeIDs = append(excludeNodeIDs, piece.StorageNode)
 		if lostPiecesSet[piece.Number] {
 			unhealthyPieces[piece] = struct{}{}
 		} else {
 			retrievablePieces = append(retrievablePieces, piece)
-			if clumpedPiecesSet[piece.Number] {
+			if clumpedPiecesSet[piece.Number] || outOfPlacementPiecesSet[piece.Number] {
 				unhealthyPieces[piece] = struct{}{}
 			} else {
 				healthySet[int32(piece.Number)] = struct{}{}
 			}
@@ -609,6 +675,7 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
 		zap.Stringer("Stream ID", segment.StreamID),
 		zap.Uint64("Position", segment.Position.Encode()),
 		zap.Int("clumped pieces", len(clumpedPieces)),
+		zap.Int("out of placement pieces", len(outOfPlacementPieces)),
 		zap.Int("in excluded countries", numHealthyInExcludedCountries),
 		zap.Int("removed pieces", len(toRemove)),
 		zap.Int("repaired pieces", len(repairedPieces)),
diff --git a/satellite/repair/repairer/segments_test.go b/satellite/repair/repairer/segments_test.go
new file mode 100644
index 000000000..5756ced04
--- /dev/null
+++ b/satellite/repair/repairer/segments_test.go
@@ -0,0 +1,204 @@
+// Copyright (C) 2019 Storj Labs, Inc.
+// See LICENSE for copying information.
+
+package repairer_test
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+	"go.uber.org/zap"
+
+	"storj.io/common/memory"
+	"storj.io/common/pb"
+	"storj.io/common/storj"
+	"storj.io/common/testcontext"
+	"storj.io/common/testrand"
+	"storj.io/storj/private/testplanet"
+	"storj.io/storj/satellite"
+	"storj.io/storj/satellite/buckets"
+	"storj.io/storj/satellite/metabase"
+	"storj.io/storj/satellite/overlay"
+	"storj.io/storj/satellite/repair/queue"
+)
+
+func TestSegmentRepairPlacement(t *testing.T) {
+	piecesCount := 4
+	testplanet.Run(t, testplanet.Config{
+		SatelliteCount: 1, StorageNodeCount: 8, UplinkCount: 1,
+		Reconfigure: testplanet.Reconfigure{
+			Satellite: testplanet.ReconfigureRS(1, 2, piecesCount, piecesCount),
+		},
+	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
+		require.NoError(t, planet.Uplinks[0].CreateBucket(ctx, planet.Satellites[0], "testbucket"))
+
+		_, err := planet.Satellites[0].API.Buckets.Service.UpdateBucket(ctx, buckets.Bucket{
+			ProjectID: planet.Uplinks[0].Projects[0].ID,
+			Name:      "testbucket",
+			Placement: storj.EU,
+		})
+		require.NoError(t, err)
+
+		for _, node := range planet.StorageNodes {
+			require.NoError(t, planet.Satellites[0].Overlay.Service.TestNodeCountryCode(ctx, node.ID(), "PL"))
+		}
+
+		err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "testbucket", "object", testrand.Bytes(5*memory.KiB))
+		require.NoError(t, err)
+
+		type testCase struct {
+			piecesOutOfPlacement int
+			piecesAfterRepair    int
+		}
+
+		for _, tc := range []testCase{
+			// all pieces/nodes are out of placement, repair download/upload should be triggered
+			{piecesOutOfPlacement: piecesCount, piecesAfterRepair: piecesCount},
+			// a few pieces/nodes are out of placement, repair download/upload should be triggered
+			{piecesOutOfPlacement: piecesCount - 2, piecesAfterRepair: piecesCount},
+			// a single piece/node is out of placement, NO repair download/upload is triggered; the piece is only removed from the segment
+			// because the segment is still above the repair threshold
+			{piecesOutOfPlacement: 1, piecesAfterRepair: piecesCount - 1},
+		} {
+			for _, node := range planet.StorageNodes {
+				require.NoError(t, planet.Satellites[0].Overlay.Service.TestNodeCountryCode(ctx, node.ID(), "PL"))
+			}
+
+			require.NoError(t, planet.Satellites[0].Repairer.Overlay.DownloadSelectionCache.Refresh(ctx))
+
+			segments, err := planet.Satellites[0].Metabase.DB.TestingAllSegments(ctx)
+			require.NoError(t, err)
+			require.Len(t, segments, 1)
+			require.Len(t, segments[0].Pieces, piecesCount)
+
+			for _, piece := range segments[0].Pieces[:tc.piecesOutOfPlacement] {
+				require.NoError(t, planet.Satellites[0].Overlay.Service.TestNodeCountryCode(ctx, piece.StorageNode, "US"))
+			}
+
+			// confirm that some pieces are out of placement
+			ok, err := allPiecesInPlacement(ctx, planet.Satellites[0].Overlay.Service, segments[0].Pieces, segments[0].Placement)
+			require.NoError(t, err)
+			require.False(t, ok)
+
+			require.NoError(t, planet.Satellites[0].Repairer.Overlay.DownloadSelectionCache.Refresh(ctx))
+
+			_, err = planet.Satellites[0].Repairer.SegmentRepairer.Repair(ctx, &queue.InjuredSegment{
+				StreamID: segments[0].StreamID,
+				Position: segments[0].Position,
+			})
+			require.NoError(t, err)
+
+			// confirm that all pieces have correct placement
+			segments, err = planet.Satellites[0].Metabase.DB.TestingAllSegments(ctx)
+			require.NoError(t, err)
+			require.Len(t, segments, 1)
+			require.NotNil(t, segments[0].RepairedAt)
+			require.Len(t, segments[0].Pieces, tc.piecesAfterRepair)
+
+			ok, err = allPiecesInPlacement(ctx, planet.Satellites[0].Overlay.Service, segments[0].Pieces, segments[0].Placement)
+			require.NoError(t, err)
+			require.True(t, ok)
+		}
+	})
+}
+
+func TestSegmentRepairPlacementAndClumped(t *testing.T) {
+	testplanet.Run(t, testplanet.Config{
+		SatelliteCount: 1, StorageNodeCount: 8, UplinkCount: 1,
+		Reconfigure: testplanet.Reconfigure{
+			Satellite: testplanet.Combine(
+				testplanet.ReconfigureRS(1, 2, 4, 4),
+				func(log *zap.Logger, index int, config *satellite.Config) {
+					config.Checker.DoDeclumping = true
+					config.Repairer.DoDeclumping = true
+				},
+			),
+		},
+	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
+		require.NoError(t, planet.Uplinks[0].CreateBucket(ctx, planet.Satellites[0], "testbucket"))
+
+		_, err := planet.Satellites[0].API.Buckets.Service.UpdateBucket(ctx, buckets.Bucket{
+			ProjectID: planet.Uplinks[0].Projects[0].ID,
+			Name:      "testbucket",
+			Placement: storj.EU,
+		})
+		require.NoError(t, err)
+
+		for _, node := range planet.StorageNodes {
+			require.NoError(t, planet.Satellites[0].Overlay.Service.TestNodeCountryCode(ctx, node.ID(), "PL"))
+		}
+
+		err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "testbucket", "object", testrand.Bytes(5*memory.KiB))
+		require.NoError(t, err)
+
+		for _, node := range planet.StorageNodes {
+			require.NoError(t, planet.Satellites[0].Overlay.Service.TestNodeCountryCode(ctx, node.ID(), "PL"))
+		}
+
+		require.NoError(t, planet.Satellites[0].Repairer.Overlay.DownloadSelectionCache.Refresh(ctx))
+
+		segments, err := planet.Satellites[0].Metabase.DB.TestingAllSegments(ctx)
+		require.NoError(t, err)
+		require.Len(t, segments, 1)
+		require.Len(t, segments[0].Pieces, 4)
+
+		// move all nodes out of the segment placement and put them into the same net so they are marked as clumped
+		node0 := planet.FindNode(segments[0].Pieces[0].StorageNode)
+		for _, piece := range segments[0].Pieces {
+			require.NoError(t, planet.Satellites[0].Overlay.Service.TestNodeCountryCode(ctx, piece.StorageNode, "US"))
+
+			local := node0.Contact.Service.Local()
+			checkInInfo := overlay.NodeCheckInInfo{
+				NodeID:     piece.StorageNode,
+				Address:    &pb.NodeAddress{Address: local.Address},
+				LastIPPort: local.Address,
+				LastNet:    node0.Contact.Service.Local().Address,
+				IsUp:       true,
+				Operator:   &local.Operator,
+				Capacity:   &local.Capacity,
+				Version:    &local.Version,
+			}
+			err = planet.Satellites[0].DB.OverlayCache().UpdateCheckIn(ctx, checkInInfo, time.Now().UTC(), overlay.NodeSelectionConfig{})
+			require.NoError(t, err)
+		}
+
+		// confirm that some pieces are out of placement
+		ok, err := allPiecesInPlacement(ctx, planet.Satellites[0].Overlay.Service, segments[0].Pieces, segments[0].Placement)
+		require.NoError(t, err)
+		require.False(t, ok)
+
+		require.NoError(t, planet.Satellites[0].Repairer.Overlay.DownloadSelectionCache.Refresh(ctx))
+
+		_, err = planet.Satellites[0].Repairer.SegmentRepairer.Repair(ctx, &queue.InjuredSegment{
+			StreamID: segments[0].StreamID,
+			Position: segments[0].Position,
+		})
+		require.NoError(t, err)
+
+		// confirm that all pieces have correct placement
+		segments, err = planet.Satellites[0].Metabase.DB.TestingAllSegments(ctx)
+		require.NoError(t, err)
+		require.Len(t, segments, 1)
+		require.NotNil(t, segments[0].RepairedAt)
+		require.Len(t, segments[0].Pieces, 4)
+
+		ok, err = allPiecesInPlacement(ctx, planet.Satellites[0].Overlay.Service, segments[0].Pieces, segments[0].Placement)
+		require.NoError(t, err)
+		require.True(t, ok)
+	})
+}
+
+func allPiecesInPlacement(ctx context.Context, overlayService *overlay.Service, pieces metabase.Pieces, placement storj.PlacementConstraint) (bool, error) {
+	for _, piece := range pieces {
+		nodeDossier, err := overlayService.Get(ctx, piece.StorageNode)
+		if err != nil {
+			return false, err
+		}
+		if !placement.AllowedCountry(nodeDossier.CountryCode) {
+			return false, nil
+		}
+	}
+	return true, nil
+}
diff --git a/scripts/testdata/satellite-config.yaml.lock b/scripts/testdata/satellite-config.yaml.lock
index 3e3ea3661..cd69fb87a 100755
--- a/scripts/testdata/satellite-config.yaml.lock
+++ b/scripts/testdata/satellite-config.yaml.lock
@@ -946,6 +946,9 @@ identity.key-path: /root/.local/share/storj/identity/satellite/identity.key
 # repair pieces on the same network to other nodes
 # repairer.do-declumping: false
 
+# repair pieces out of segment placement
+# repairer.do-placement-check: true
+
 # time limit for downloading pieces from a node for repair
 # repairer.download-timeout: 5m0s
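For reviewers, a minimal, self-contained sketch of the counting step this patch adds to segments.go, written with plain loops instead of the golang.org/x/exp/maps helper; the function and variable names below are illustrative stand-ins, not code from the patch:

package main

import "fmt"

// countUnhealthyRetrievable mirrors how numUnhealthyRetrievable is derived: a piece
// that is both clumped and out of placement must only be counted once, so the two
// piece-number sets are merged before taking the length.
func countUnhealthyRetrievable(clumpedPiecesSet, outOfPlacementPiecesSet map[uint16]bool) int {
	merged := make(map[uint16]bool, len(clumpedPiecesSet)+len(outOfPlacementPiecesSet))
	for number := range clumpedPiecesSet {
		merged[number] = true
	}
	for number := range outOfPlacementPiecesSet {
		merged[number] = true
	}
	return len(merged)
}

func main() {
	clumped := map[uint16]bool{1: true, 2: true}
	outOfPlacement := map[uint16]bool{2: true, 3: true}
	// piece 2 appears in both sets but is counted once, so this prints 3 rather than 4
	fmt.Println(countUnhealthyRetrievable(clumped, outOfPlacement))
}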