satellite/repair/repairer: repair pieces out of placement

The segment repairer should take the segment's 'placement' field into
account and remove or repair pieces stored on nodes that are outside
this placement.

If, after accounting for pieces out of placement, the segment is still
above the repair threshold, we only update the segment's pieces list to
drop the problematic pieces. Otherwise we perform a regular repair.
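
In rough terms, the decision flow looks like the following minimal Go sketch (Piece, Segment, and planRepair are illustrative stand-ins, not the satellite's actual types or API):

package main

import "fmt"

// Piece is a stand-in for a single erasure-coded piece of a segment.
type Piece struct {
	Number     uint16
	Missing    bool // node no longer holds the piece
	OutOfPlace bool // node is outside the segment's placement
}

// Segment is a stand-in carrying only what the decision needs.
type Segment struct {
	Pieces          []Piece
	RepairThreshold int
}

// planRepair decides whether dropping out-of-placement pieces is enough
// or whether a full download/re-upload repair is required.
func planRepair(s Segment) string {
	healthy, outOfPlacement := 0, 0
	for _, p := range s.Pieces {
		switch {
		case p.Missing:
			// lost pieces are neither healthy nor retrievable
		case p.OutOfPlace:
			outOfPlacement++ // retrievable but unhealthy
		default:
			healthy++
		}
	}

	if healthy > s.RepairThreshold {
		if outOfPlacement > 0 {
			return "above threshold: only update segment pieces to drop out-of-placement pieces"
		}
		return "repair unnecessary"
	}
	return "at or below threshold: do a regular repair (download and re-upload)"
}

func main() {
	s := Segment{
		Pieces:          []Piece{{Number: 0}, {Number: 1}, {Number: 2}, {Number: 3, OutOfPlace: true}},
		RepairThreshold: 2,
	}
	fmt.Println(planRepair(s)) // above threshold: only update segment pieces ...
}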

https://github.com/storj/storj/issues/5896

Change-Id: I72b652aff2e6b20be3ac6dbfb1d32c2840ce3d59
Michal Niewrzal 2023-05-26 14:22:15 +02:00
parent 52cefb816c
commit 128b0a86e3
4 changed files with 285 additions and 10 deletions


@@ -36,6 +36,7 @@ type Config struct {
ReputationUpdateEnabled bool `help:"whether the audit score of nodes should be updated as a part of repair" default:"false"`
UseRangedLoop bool `help:"whether to enable repair checker observer with ranged loop" default:"true"`
DoDeclumping bool `help:"repair pieces on the same network to other nodes" default:"false"`
DoPlacementCheck bool `help:"repair pieces out of segment placement" default:"true"`
}
// Service contains the information needed to run the repair service.
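
The check ships enabled by default. An operator who needs the previous behavior could presumably switch it off in the satellite config file; a hypothetical snippet mirroring the sample config at the end of this change:

# repair pieces out of segment placement
repairer.do-placement-check: false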


@@ -14,6 +14,8 @@ import (
"github.com/zeebo/errs"
"go.uber.org/zap"
"golang.org/x/exp/maps"
"golang.org/x/exp/slices"
"storj.io/common/pb"
"storj.io/common/storj"
@@ -87,6 +89,7 @@ type SegmentRepairer struct {
reputationUpdateEnabled bool
doDeclumping bool
doPlacementCheck bool
// multiplierOptimalThreshold is the value that multiplied by the optimal
// threshold results in the maximum limit of number of nodes to upload
@@ -96,6 +99,8 @@ type SegmentRepairer struct {
// repairOverrides is the set of values configured by the checker to override the repair threshold for various RS schemes.
repairOverrides checker.RepairOverridesMap
allNodeIDs []storj.NodeID
nowFn func() time.Time
OnTestingCheckSegmentAlteredHook func()
OnTestingPiecesReportHook func(pieces FetchResultReport)
@@ -135,6 +140,7 @@ func NewSegmentRepairer(
reporter: reporter,
reputationUpdateEnabled: config.ReputationUpdateEnabled,
doDeclumping: config.DoDeclumping,
doPlacementCheck: config.DoPlacementCheck,
nowFn: time.Now,
}
@@ -191,8 +197,21 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
mon.IntVal("repair_segment_size").Observe(int64(segment.EncryptedSize)) //mon:locked
stats.repairSegmentSize.Observe(int64(segment.EncryptedSize))
var excludeNodeIDs storj.NodeIDList
pieces := segment.Pieces
// reuse the allNodeIDs slice if it's large enough
if cap(repairer.allNodeIDs) < len(pieces) {
repairer.allNodeIDs = make([]storj.NodeID, len(pieces))
} else {
repairer.allNodeIDs = repairer.allNodeIDs[:len(pieces)]
}
for i, p := range pieces {
repairer.allNodeIDs[i] = p.StorageNode
}
excludeNodeIDs := repairer.allNodeIDs
missingPieces, err := repairer.overlay.GetMissingPieces(ctx, pieces)
if err != nil {
return false, overlayQueryError.New("error identifying missing pieces: %w", err)
@@ -203,11 +222,7 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
if repairer.doDeclumping {
// if multiple pieces are on the same last_net, keep only the first one. The rest are
// to be considered retrievable but unhealthy.
nodeIDs := make([]storj.NodeID, len(pieces))
for i, p := range pieces {
nodeIDs[i] = p.StorageNode
}
lastNets, err := repairer.overlay.GetNodesNetworkInOrder(ctx, nodeIDs)
lastNets, err := repairer.overlay.GetNodesNetworkInOrder(ctx, repairer.allNodeIDs)
if err != nil {
return false, metainfoGetError.Wrap(err)
}
@@ -216,11 +231,37 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
for _, clumpedPiece := range clumpedPieces {
clumpedPiecesSet[clumpedPiece.Number] = true
}
}
var outOfPlacementPieces metabase.Pieces
var outOfPlacementPiecesSet map[uint16]bool
if repairer.doPlacementCheck && segment.Placement != storj.EveryCountry {
var err error
outOfPlacementNodes, err := repairer.overlay.GetNodesOutOfPlacement(ctx, repairer.allNodeIDs, segment.Placement)
if err != nil {
return false, metainfoGetError.Wrap(err)
}
outOfPlacementPiecesSet = make(map[uint16]bool)
for _, piece := range pieces {
if slices.Contains(outOfPlacementNodes, piece.StorageNode) {
outOfPlacementPieces = append(outOfPlacementPieces, piece)
outOfPlacementPiecesSet[piece.Number] = true
}
}
}
numUnhealthyRetrievable := len(clumpedPieces) + len(outOfPlacementPieces)
if len(clumpedPieces) != 0 && len(outOfPlacementPieces) != 0 {
// some clumped pieces may also be out of placement; count the union so they are not counted twice
unhealthyRetrievableSet := map[uint16]bool{}
maps.Copy(unhealthyRetrievableSet, clumpedPiecesSet)
maps.Copy(unhealthyRetrievableSet, outOfPlacementPiecesSet)
numUnhealthyRetrievable = len(unhealthyRetrievableSet)
}
numRetrievable := len(pieces) - len(missingPieces)
numHealthy := len(pieces) - len(missingPieces) - len(clumpedPieces)
numHealthy := len(pieces) - len(missingPieces) - numUnhealthyRetrievable
// irreparable segment
if numRetrievable < int(segment.Redundancy.RequiredShares) {
mon.Counter("repairer_segments_below_min_req").Inc(1) //mon:locked
@@ -263,9 +304,34 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
// repair not needed
if numHealthy-numHealthyInExcludedCountries > int(repairThreshold) {
// remove out-of-placement pieces without repairing, as we are above the repair threshold
if len(outOfPlacementPieces) > 0 {
newPieces, err := segment.Pieces.Update(nil, outOfPlacementPieces)
if err != nil {
return false, metainfoPutError.Wrap(err)
}
err = repairer.metabase.UpdateSegmentPieces(ctx, metabase.UpdateSegmentPieces{
StreamID: segment.StreamID,
Position: segment.Position,
OldPieces: segment.Pieces,
NewRedundancy: segment.Redundancy,
NewPieces: newPieces,
NewRepairedAt: time.Now(),
})
if err != nil {
return false, metainfoPutError.Wrap(err)
}
mon.Meter("dropped_out_of_placement_pieces").Mark(len(outOfPlacementPieces))
}
mon.Meter("repair_unnecessary").Mark(1) //mon:locked
stats.repairUnnecessary.Mark(1)
repairer.log.Debug("segment above repair threshold", zap.Int("numHealthy", numHealthy), zap.Int32("repairThreshold", repairThreshold), zap.Int("numClumped", len(clumpedPieces)))
repairer.log.Debug("segment above repair threshold", zap.Int("numHealthy", numHealthy), zap.Int32("repairThreshold", repairThreshold),
zap.Int("numClumped", len(clumpedPieces)), zap.Int("numOffPieces", len(outOfPlacementPieces)))
return true, nil
}
@@ -282,14 +348,14 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
unhealthyPieces := make(map[metabase.Piece]struct{})
healthySet := make(map[int32]struct{})
// Populate retrievablePieces with all pieces from the segment except those correlating to indices in lostPieces.
// Populate unhealthyPieces with all pieces in lostPieces OR clumpedPieces.
// Populate unhealthyPieces with all pieces in lostPieces, clumpedPieces or outOfPlacementPieces.
for _, piece := range pieces {
excludeNodeIDs = append(excludeNodeIDs, piece.StorageNode)
if lostPiecesSet[piece.Number] {
unhealthyPieces[piece] = struct{}{}
} else {
retrievablePieces = append(retrievablePieces, piece)
if clumpedPiecesSet[piece.Number] {
if clumpedPiecesSet[piece.Number] || outOfPlacementPiecesSet[piece.Number] {
unhealthyPieces[piece] = struct{}{}
} else {
healthySet[int32(piece.Number)] = struct{}{}
@@ -609,6 +675,7 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
zap.Stringer("Stream ID", segment.StreamID),
zap.Uint64("Position", segment.Position.Encode()),
zap.Int("clumped pieces", len(clumpedPieces)),
zap.Int("out of placement pieces", len(outOfPlacementPieces)),
zap.Int("in excluded countries", numHealthyInExcludedCountries),
zap.Int("removed pieces", len(toRemove)),
zap.Int("repaired pieces", len(repairedPieces)),


@@ -0,0 +1,204 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package repairer_test
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"storj.io/common/memory"
"storj.io/common/pb"
"storj.io/common/storj"
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/storj/private/testplanet"
"storj.io/storj/satellite"
"storj.io/storj/satellite/buckets"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/overlay"
"storj.io/storj/satellite/repair/queue"
)
func TestSegmentRepairPlacement(t *testing.T) {
piecesCount := 4
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 8, UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
Satellite: testplanet.ReconfigureRS(1, 2, piecesCount, piecesCount),
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
require.NoError(t, planet.Uplinks[0].CreateBucket(ctx, planet.Satellites[0], "testbucket"))
_, err := planet.Satellites[0].API.Buckets.Service.UpdateBucket(ctx, buckets.Bucket{
ProjectID: planet.Uplinks[0].Projects[0].ID,
Name: "testbucket",
Placement: storj.EU,
})
require.NoError(t, err)
for _, node := range planet.StorageNodes {
require.NoError(t, planet.Satellites[0].Overlay.Service.TestNodeCountryCode(ctx, node.ID(), "PL"))
}
err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "testbucket", "object", testrand.Bytes(5*memory.KiB))
require.NoError(t, err)
type testCase struct {
piecesOutOfPlacement int
piecesAfterRepair int
}
for _, tc := range []testCase{
// all pieces/nodes are out of placement, repair download/upload should be triggered
{piecesOutOfPlacement: piecesCount, piecesAfterRepair: piecesCount},
// a few pieces/nodes are out of placement, repair download/upload should be triggered
{piecesOutOfPlacement: piecesCount - 2, piecesAfterRepair: piecesCount},
// a single piece/node is out of placement, NO download/upload repair, we only remove the piece from the segment
// as the segment is still above the repair threshold
{piecesOutOfPlacement: 1, piecesAfterRepair: piecesCount - 1},
} {
for _, node := range planet.StorageNodes {
require.NoError(t, planet.Satellites[0].Overlay.Service.TestNodeCountryCode(ctx, node.ID(), "PL"))
}
require.NoError(t, planet.Satellites[0].Repairer.Overlay.DownloadSelectionCache.Refresh(ctx))
segments, err := planet.Satellites[0].Metabase.DB.TestingAllSegments(ctx)
require.NoError(t, err)
require.Len(t, segments, 1)
require.Len(t, segments[0].Pieces, piecesCount)
for _, piece := range segments[0].Pieces[:tc.piecesOutOfPlacement] {
require.NoError(t, planet.Satellites[0].Overlay.Service.TestNodeCountryCode(ctx, piece.StorageNode, "US"))
}
// confirm that some pieces are out of placement
ok, err := allPiecesInPlacement(ctx, planet.Satellites[0].Overlay.Service, segments[0].Pieces, segments[0].Placement)
require.NoError(t, err)
require.False(t, ok)
require.NoError(t, planet.Satellites[0].Repairer.Overlay.DownloadSelectionCache.Refresh(ctx))
_, err = planet.Satellites[0].Repairer.SegmentRepairer.Repair(ctx, &queue.InjuredSegment{
StreamID: segments[0].StreamID,
Position: segments[0].Position,
})
require.NoError(t, err)
// confirm that all pieces have correct placement
segments, err = planet.Satellites[0].Metabase.DB.TestingAllSegments(ctx)
require.NoError(t, err)
require.Len(t, segments, 1)
require.NotNil(t, segments[0].RepairedAt)
require.Len(t, segments[0].Pieces, tc.piecesAfterRepair)
ok, err = allPiecesInPlacement(ctx, planet.Satellites[0].Overlay.Service, segments[0].Pieces, segments[0].Placement)
require.NoError(t, err)
require.True(t, ok)
}
})
}
func TestSegmentRepairPlacementAndClumped(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 8, UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
Satellite: testplanet.Combine(
testplanet.ReconfigureRS(1, 2, 4, 4),
func(log *zap.Logger, index int, config *satellite.Config) {
config.Checker.DoDeclumping = true
config.Repairer.DoDeclumping = true
},
),
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
require.NoError(t, planet.Uplinks[0].CreateBucket(ctx, planet.Satellites[0], "testbucket"))
_, err := planet.Satellites[0].API.Buckets.Service.UpdateBucket(ctx, buckets.Bucket{
ProjectID: planet.Uplinks[0].Projects[0].ID,
Name: "testbucket",
Placement: storj.EU,
})
require.NoError(t, err)
for _, node := range planet.StorageNodes {
require.NoError(t, planet.Satellites[0].Overlay.Service.TestNodeCountryCode(ctx, node.ID(), "PL"))
}
err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "testbucket", "object", testrand.Bytes(5*memory.KiB))
require.NoError(t, err)
for _, node := range planet.StorageNodes {
require.NoError(t, planet.Satellites[0].Overlay.Service.TestNodeCountryCode(ctx, node.ID(), "PL"))
}
require.NoError(t, planet.Satellites[0].Repairer.Overlay.DownloadSelectionCache.Refresh(ctx))
segments, err := planet.Satellites[0].Metabase.DB.TestingAllSegments(ctx)
require.NoError(t, err)
require.Len(t, segments, 1)
require.Len(t, segments[0].Pieces, 4)
// set nodes to the same placement/country and put all nodes into the same net to mark them as clumped
node0 := planet.FindNode(segments[0].Pieces[0].StorageNode)
for _, piece := range segments[0].Pieces {
require.NoError(t, planet.Satellites[0].Overlay.Service.TestNodeCountryCode(ctx, piece.StorageNode, "US"))
local := node0.Contact.Service.Local()
checkInInfo := overlay.NodeCheckInInfo{
NodeID: piece.StorageNode,
Address: &pb.NodeAddress{Address: local.Address},
LastIPPort: local.Address,
LastNet: node0.Contact.Service.Local().Address,
IsUp: true,
Operator: &local.Operator,
Capacity: &local.Capacity,
Version: &local.Version,
}
err = planet.Satellites[0].DB.OverlayCache().UpdateCheckIn(ctx, checkInInfo, time.Now().UTC(), overlay.NodeSelectionConfig{})
require.NoError(t, err)
}
// confirm that some pieces are out of placement
ok, err := allPiecesInPlacement(ctx, planet.Satellites[0].Overlay.Service, segments[0].Pieces, segments[0].Placement)
require.NoError(t, err)
require.False(t, ok)
require.NoError(t, planet.Satellites[0].Repairer.Overlay.DownloadSelectionCache.Refresh(ctx))
_, err = planet.Satellites[0].Repairer.SegmentRepairer.Repair(ctx, &queue.InjuredSegment{
StreamID: segments[0].StreamID,
Position: segments[0].Position,
})
require.NoError(t, err)
// confirm that all pieces have correct placement
segments, err = planet.Satellites[0].Metabase.DB.TestingAllSegments(ctx)
require.NoError(t, err)
require.Len(t, segments, 1)
require.NotNil(t, segments[0].RepairedAt)
require.Len(t, segments[0].Pieces, 4)
ok, err = allPiecesInPlacement(ctx, planet.Satellites[0].Overlay.Service, segments[0].Pieces, segments[0].Placement)
require.NoError(t, err)
require.True(t, ok)
})
}
func allPiecesInPlacement(ctx context.Context, overlayService *overlay.Service, pieces metabase.Pieces, placement storj.PlacementConstraint) (bool, error) {
for _, piece := range pieces {
nodeDossier, err := overlayService.Get(ctx, piece.StorageNode)
if err != nil {
return false, err
}
if !placement.AllowedCountry(nodeDossier.CountryCode) {
return false, nil
}
}
return true, nil
}


@@ -946,6 +946,9 @@ identity.key-path: /root/.local/share/storj/identity/satellite/identity.key
# repair pieces on the same network to other nodes
# repairer.do-declumping: false
# repair pieces out of segment placement
# repairer.do-placement-check: true
# time limit for downloading pieces from a node for repair
# repairer.download-timeout: 5m0s