satellite/repair/repairer: use KnownReliable to check segment pieces

At the moment segment repairer is skipping offline nodes in checks like
clumped pieces and off placement pieces. This change is fixing this
problem using new version of KnownReliable method. New method is
returning both online and offline nodes. Provided data can be used to
find clumped and off placement pieces.

We are not using DownloadSelectionCache anymore with segment repairer.

https://github.com/storj/storj/issues/5998

Change-Id: I236a1926e21f13df4cdedc91130352d37ff97e18
This commit is contained in:
Michal Niewrzal 2023-06-26 11:54:52 +02:00 committed by Storj Robot
parent ec780003f0
commit 578724e9b1
2 changed files with 204 additions and 118 deletions

View File

@ -15,7 +15,6 @@ import (
"github.com/zeebo/errs" "github.com/zeebo/errs"
"go.uber.org/zap" "go.uber.org/zap"
"golang.org/x/exp/maps" "golang.org/x/exp/maps"
"golang.org/x/exp/slices"
"storj.io/common/pb" "storj.io/common/pb"
"storj.io/common/storj" "storj.io/common/storj"
@ -195,65 +194,15 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
mon.IntVal("repair_segment_size").Observe(int64(segment.EncryptedSize)) //mon:locked mon.IntVal("repair_segment_size").Observe(int64(segment.EncryptedSize)) //mon:locked
stats.repairSegmentSize.Observe(int64(segment.EncryptedSize)) stats.repairSegmentSize.Observe(int64(segment.EncryptedSize))
piecesCheck, err := repairer.classifySegmentPieces(ctx, segment)
if err != nil {
return false, err
}
pieces := segment.Pieces pieces := segment.Pieces
allNodeIDs := make([]storj.NodeID, len(pieces)) numRetrievable := len(pieces) - len(piecesCheck.MissingPiecesSet)
for i, p := range pieces { numHealthy := len(pieces) - len(piecesCheck.MissingPiecesSet) - piecesCheck.NumUnhealthyRetrievable
allNodeIDs[i] = p.StorageNode
}
excludeNodeIDs := allNodeIDs
missingPieces, err := repairer.overlay.GetMissingPieces(ctx, pieces)
if err != nil {
return false, overlayQueryError.New("error identifying missing pieces: %w", err)
}
var clumpedPieces metabase.Pieces
var clumpedPiecesSet map[uint16]bool
if repairer.doDeclumping {
// if multiple pieces are on the same last_net, keep only the first one. The rest are
// to be considered retrievable but unhealthy.
lastNets, err := repairer.overlay.GetNodesNetworkInOrder(ctx, allNodeIDs)
if err != nil {
return false, metainfoGetError.Wrap(err)
}
clumpedPieces = repair.FindClumpedPieces(segment.Pieces, lastNets)
clumpedPiecesSet = make(map[uint16]bool)
for _, clumpedPiece := range clumpedPieces {
clumpedPiecesSet[clumpedPiece.Number] = true
}
}
var outOfPlacementPieces metabase.Pieces
var outOfPlacementPiecesSet map[uint16]bool
if repairer.doPlacementCheck && segment.Placement != storj.EveryCountry {
var err error
outOfPlacementNodes, err := repairer.overlay.GetNodesOutOfPlacement(ctx, allNodeIDs, segment.Placement)
if err != nil {
return false, metainfoGetError.Wrap(err)
}
outOfPlacementPiecesSet = make(map[uint16]bool)
for _, piece := range pieces {
if slices.Contains(outOfPlacementNodes, piece.StorageNode) {
outOfPlacementPieces = append(outOfPlacementPieces, piece)
outOfPlacementPiecesSet[piece.Number] = true
}
}
}
numUnhealthyRetrievable := len(clumpedPieces) + len(outOfPlacementPieces)
if len(clumpedPieces) != 0 && len(outOfPlacementPieces) != 0 {
// verify that some of clumped pieces and out of placement pieces are not the same
unhealthyRetrievableSet := map[uint16]bool{}
maps.Copy(unhealthyRetrievableSet, clumpedPiecesSet)
maps.Copy(unhealthyRetrievableSet, outOfPlacementPiecesSet)
numUnhealthyRetrievable = len(unhealthyRetrievableSet)
}
numRetrievable := len(pieces) - len(missingPieces)
numHealthy := len(pieces) - len(missingPieces) - numUnhealthyRetrievable
// irreparable segment // irreparable segment
if numRetrievable < int(segment.Redundancy.RequiredShares) { if numRetrievable < int(segment.Redundancy.RequiredShares) {
mon.Counter("repairer_segments_below_min_req").Inc(1) //mon:locked mon.Counter("repairer_segments_below_min_req").Inc(1) //mon:locked
@ -297,7 +246,15 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
// repair not needed // repair not needed
if numHealthy-numHealthyInExcludedCountries > int(repairThreshold) { if numHealthy-numHealthyInExcludedCountries > int(repairThreshold) {
// remove pieces out of placement without repairing as we are above repair threshold // remove pieces out of placement without repairing as we are above repair threshold
if len(outOfPlacementPieces) > 0 { if len(piecesCheck.OutOfPlacementPiecesSet) > 0 {
var outOfPlacementPieces metabase.Pieces
for _, piece := range pieces {
if _, ok := piecesCheck.OutOfPlacementPiecesSet[piece.Number]; ok {
outOfPlacementPieces = append(outOfPlacementPieces, piece)
}
}
newPieces, err := segment.Pieces.Update(nil, outOfPlacementPieces) newPieces, err := segment.Pieces.Update(nil, outOfPlacementPieces)
if err != nil { if err != nil {
return false, metainfoPutError.Wrap(err) return false, metainfoPutError.Wrap(err)
@ -317,13 +274,13 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
return false, metainfoPutError.Wrap(err) return false, metainfoPutError.Wrap(err)
} }
mon.Meter("dropped_out_of_placement_pieces").Mark(len(outOfPlacementPieces)) mon.Meter("dropped_out_of_placement_pieces").Mark(len(piecesCheck.OutOfPlacementPiecesSet))
} }
mon.Meter("repair_unnecessary").Mark(1) //mon:locked mon.Meter("repair_unnecessary").Mark(1) //mon:locked
stats.repairUnnecessary.Mark(1) stats.repairUnnecessary.Mark(1)
repairer.log.Debug("segment above repair threshold", zap.Int("numHealthy", numHealthy), zap.Int32("repairThreshold", repairThreshold), repairer.log.Debug("segment above repair threshold", zap.Int("numHealthy", numHealthy), zap.Int32("repairThreshold", repairThreshold),
zap.Int("numClumped", len(clumpedPieces)), zap.Int("numOffPieces", len(outOfPlacementPieces))) zap.Int("numClumped", len(piecesCheck.ClumpedPiecesSet)), zap.Int("numOffPieces", len(piecesCheck.OutOfPlacementPiecesSet)))
return true, nil return true, nil
} }
@ -334,7 +291,7 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
mon.FloatVal("healthy_ratio_before_repair").Observe(healthyRatioBeforeRepair) //mon:locked mon.FloatVal("healthy_ratio_before_repair").Observe(healthyRatioBeforeRepair) //mon:locked
stats.healthyRatioBeforeRepair.Observe(healthyRatioBeforeRepair) stats.healthyRatioBeforeRepair.Observe(healthyRatioBeforeRepair)
lostPiecesSet := sliceToSet(missingPieces) lostPiecesSet := piecesCheck.MissingPiecesSet
var retrievablePieces metabase.Pieces var retrievablePieces metabase.Pieces
unhealthyPieces := make(map[metabase.Piece]struct{}) unhealthyPieces := make(map[metabase.Piece]struct{})
@ -342,12 +299,11 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
// Populate retrievablePieces with all pieces from the segment except those correlating to indices in lostPieces. // Populate retrievablePieces with all pieces from the segment except those correlating to indices in lostPieces.
// Populate unhealthyPieces with all pieces in lostPieces, clumpedPieces or outOfPlacementPieces. // Populate unhealthyPieces with all pieces in lostPieces, clumpedPieces or outOfPlacementPieces.
for _, piece := range pieces { for _, piece := range pieces {
excludeNodeIDs = append(excludeNodeIDs, piece.StorageNode)
if lostPiecesSet[piece.Number] { if lostPiecesSet[piece.Number] {
unhealthyPieces[piece] = struct{}{} unhealthyPieces[piece] = struct{}{}
} else { } else {
retrievablePieces = append(retrievablePieces, piece) retrievablePieces = append(retrievablePieces, piece)
if clumpedPiecesSet[piece.Number] || outOfPlacementPiecesSet[piece.Number] { if piecesCheck.ClumpedPiecesSet[piece.Number] || piecesCheck.OutOfPlacementPiecesSet[piece.Number] {
unhealthyPieces[piece] = struct{}{} unhealthyPieces[piece] = struct{}{}
} else { } else {
healthySet[int32(piece.Number)] = struct{}{} healthySet[int32(piece.Number)] = struct{}{}
@ -399,7 +355,7 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
// Request Overlay for n-h new storage nodes // Request Overlay for n-h new storage nodes
request := overlay.FindStorageNodesRequest{ request := overlay.FindStorageNodesRequest{
RequestedCount: requestCount, RequestedCount: requestCount,
ExcludedIDs: excludeNodeIDs, ExcludedIDs: piecesCheck.ExcludeNodeIDs,
Placement: segment.Placement, Placement: segment.Placement,
} }
newNodes, err := repairer.overlay.FindStorageNodesForUpload(ctx, request) newNodes, err := repairer.overlay.FindStorageNodesForUpload(ctx, request)
@ -671,8 +627,8 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
repairer.log.Debug("repaired segment", repairer.log.Debug("repaired segment",
zap.Stringer("Stream ID", segment.StreamID), zap.Stringer("Stream ID", segment.StreamID),
zap.Uint64("Position", segment.Position.Encode()), zap.Uint64("Position", segment.Position.Encode()),
zap.Int("clumped pieces", len(clumpedPieces)), zap.Int("clumped pieces", len(piecesCheck.ClumpedPiecesSet)),
zap.Int("out of placement pieces", len(outOfPlacementPieces)), zap.Int("out of placement pieces", len(piecesCheck.OutOfPlacementPiecesSet)),
zap.Int("in excluded countries", numHealthyInExcludedCountries), zap.Int("in excluded countries", numHealthyInExcludedCountries),
zap.Int("removed pieces", len(toRemove)), zap.Int("removed pieces", len(toRemove)),
zap.Int("repaired pieces", len(repairedPieces)), zap.Int("repaired pieces", len(repairedPieces)),
@ -681,6 +637,98 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
return true, nil return true, nil
} }
type piecesCheckResult struct {
ExcludeNodeIDs []storj.NodeID
MissingPiecesSet map[uint16]bool
ClumpedPiecesSet map[uint16]bool
OutOfPlacementPiecesSet map[uint16]bool
NumUnhealthyRetrievable int
}
func (repairer *SegmentRepairer) classifySegmentPieces(ctx context.Context, segment metabase.Segment) (result piecesCheckResult, err error) {
defer mon.Task()(&ctx)(&err)
pieces := segment.Pieces
allNodeIDs := make([]storj.NodeID, len(pieces))
nodeIDPieceMap := map[storj.NodeID]uint16{}
result.MissingPiecesSet = map[uint16]bool{}
for i, p := range pieces {
allNodeIDs[i] = p.StorageNode
nodeIDPieceMap[p.StorageNode] = p.Number
result.MissingPiecesSet[p.Number] = true
}
result.ExcludeNodeIDs = allNodeIDs
online, offline, err := repairer.overlay.KnownReliable(ctx, allNodeIDs)
if err != nil {
return piecesCheckResult{}, overlayQueryError.New("error identifying missing pieces: %w", err)
}
// remove online nodes from missing pieces
for _, onlineNode := range online {
pieceNum := nodeIDPieceMap[onlineNode.ID]
delete(result.MissingPiecesSet, pieceNum)
}
if repairer.doDeclumping {
// if multiple pieces are on the same last_net, keep only the first one. The rest are
// to be considered retrievable but unhealthy.
lastNets := make([]string, 0, len(allNodeIDs))
reliablePieces := metabase.Pieces{}
collectLastNets := func(reliable []overlay.SelectedNode) {
for _, node := range reliable {
pieceNum := nodeIDPieceMap[node.ID]
reliablePieces = append(reliablePieces, metabase.Piece{
Number: pieceNum,
StorageNode: node.ID,
})
lastNets = append(lastNets, node.LastNet)
}
}
collectLastNets(online)
collectLastNets(offline)
clumpedPieces := repair.FindClumpedPieces(reliablePieces, lastNets)
result.ClumpedPiecesSet = map[uint16]bool{}
for _, clumpedPiece := range clumpedPieces {
result.ClumpedPiecesSet[clumpedPiece.Number] = true
}
}
if repairer.doPlacementCheck && segment.Placement != storj.EveryCountry {
result.OutOfPlacementPiecesSet = map[uint16]bool{}
checkPlacement := func(reliable []overlay.SelectedNode) {
for _, node := range reliable {
if segment.Placement.AllowedCountry(node.CountryCode) {
continue
}
result.OutOfPlacementPiecesSet[nodeIDPieceMap[node.ID]] = true
}
}
checkPlacement(online)
checkPlacement(offline)
}
result.NumUnhealthyRetrievable = len(result.ClumpedPiecesSet) + len(result.OutOfPlacementPiecesSet)
if len(result.ClumpedPiecesSet) != 0 && len(result.OutOfPlacementPiecesSet) != 0 {
// verify that some of clumped pieces and out of placement pieces are not the same
unhealthyRetrievableSet := map[uint16]bool{}
maps.Copy(unhealthyRetrievableSet, result.ClumpedPiecesSet)
maps.Copy(unhealthyRetrievableSet, result.OutOfPlacementPiecesSet)
result.NumUnhealthyRetrievable = len(unhealthyRetrievableSet)
}
return result, nil
}
// checkIfSegmentAltered checks if oldSegment has been altered since it was selected for audit. // checkIfSegmentAltered checks if oldSegment has been altered since it was selected for audit.
func (repairer *SegmentRepairer) checkIfSegmentAltered(ctx context.Context, oldSegment metabase.Segment) (err error) { func (repairer *SegmentRepairer) checkIfSegmentAltered(ctx context.Context, oldSegment metabase.Segment) (err error) {
defer mon.Task()(&ctx)(&err) defer mon.Task()(&ctx)(&err)
@ -797,15 +845,6 @@ func (repairer *SegmentRepairer) AdminFetchPieces(ctx context.Context, seg *meta
return pieceInfos, nil return pieceInfos, nil
} }
// sliceToSet converts the given slice to a set.
func sliceToSet(slice []uint16) map[uint16]bool {
set := make(map[uint16]bool, len(slice))
for _, value := range slice {
set[value] = true
}
return set
}
// commaSeparatedArray concatenates an array into a comma-separated string, // commaSeparatedArray concatenates an array into a comma-separated string,
// lazily. // lazily.
type commaSeparatedArray []string type commaSeparatedArray []string

View File

@ -5,6 +5,7 @@ package repairer_test
import ( import (
"context" "context"
"strconv"
"testing" "testing"
"time" "time"
@ -14,6 +15,7 @@ import (
"storj.io/common/memory" "storj.io/common/memory"
"storj.io/common/pb" "storj.io/common/pb"
"storj.io/common/storj" "storj.io/common/storj"
"storj.io/common/storj/location"
"storj.io/common/testcontext" "storj.io/common/testcontext"
"storj.io/common/testrand" "storj.io/common/testrand"
"storj.io/storj/private/testplanet" "storj.io/storj/private/testplanet"
@ -27,13 +29,15 @@ import (
func TestSegmentRepairPlacement(t *testing.T) { func TestSegmentRepairPlacement(t *testing.T) {
piecesCount := 4 piecesCount := 4
testplanet.Run(t, testplanet.Config{ testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 10, UplinkCount: 1, SatelliteCount: 1, StorageNodeCount: 12, UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{ Reconfigure: testplanet.Reconfigure{
Satellite: testplanet.ReconfigureRS(1, 2, piecesCount, piecesCount), Satellite: testplanet.ReconfigureRS(1, 1, piecesCount, piecesCount),
}, },
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
require.NoError(t, planet.Uplinks[0].CreateBucket(ctx, planet.Satellites[0], "testbucket")) require.NoError(t, planet.Uplinks[0].CreateBucket(ctx, planet.Satellites[0], "testbucket"))
defaultLocation := location.Poland
_, err := planet.Satellites[0].API.Buckets.Service.UpdateBucket(ctx, buckets.Bucket{ _, err := planet.Satellites[0].API.Buckets.Service.UpdateBucket(ctx, buckets.Bucket{
ProjectID: planet.Uplinks[0].Projects[0].ID, ProjectID: planet.Uplinks[0].Projects[0].ID,
Name: "testbucket", Name: "testbucket",
@ -41,41 +45,56 @@ func TestSegmentRepairPlacement(t *testing.T) {
}) })
require.NoError(t, err) require.NoError(t, err)
for _, node := range planet.StorageNodes {
require.NoError(t, planet.Satellites[0].Overlay.Service.TestNodeCountryCode(ctx, node.ID(), "PL"))
}
err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "testbucket", "object", testrand.Bytes(5*memory.KiB))
require.NoError(t, err)
type testCase struct { type testCase struct {
piecesOutOfPlacement int piecesOutOfPlacement int
piecesAfterRepair int piecesAfterRepair int
// how many from out of placement pieces should be also offline
piecesOutOfPlacementOffline int
} }
for _, tc := range []testCase{ for i, tc := range []testCase{
// all pieces/nodes are out of placement, repair download/upload should be triggered // all pieces/nodes are out of placement, repair download/upload should be triggered
{piecesOutOfPlacement: piecesCount, piecesAfterRepair: piecesCount}, {piecesOutOfPlacement: piecesCount, piecesAfterRepair: piecesCount},
// all pieces/nodes are out of placement, repair download/upload should be triggered, some pieces are offline
{piecesOutOfPlacement: piecesCount, piecesAfterRepair: piecesCount, piecesOutOfPlacementOffline: 1},
{piecesOutOfPlacement: piecesCount, piecesAfterRepair: piecesCount, piecesOutOfPlacementOffline: 2},
// few pieces/nodes are out of placement, repair download/upload should be triggered // few pieces/nodes are out of placement, repair download/upload should be triggered
{piecesOutOfPlacement: piecesCount - 2, piecesAfterRepair: piecesCount}, {piecesOutOfPlacement: piecesCount - 1, piecesAfterRepair: piecesCount},
{piecesOutOfPlacement: piecesCount - 1, piecesAfterRepair: piecesCount, piecesOutOfPlacementOffline: 1},
// single piece/node is out of placement, NO download/upload repair, we are only removing piece from segment // single piece/node is out of placement, NO download/upload repair, we are only removing piece from segment
// as segment is still above repair threshold // as segment is still above repair threshold
{piecesOutOfPlacement: 1, piecesAfterRepair: piecesCount - 1}, {piecesOutOfPlacement: 1, piecesAfterRepair: piecesCount - 1},
{piecesOutOfPlacement: 1, piecesAfterRepair: piecesCount - 1, piecesOutOfPlacementOffline: 1},
{piecesOutOfPlacement: 1, piecesAfterRepair: piecesCount - 1, piecesOutOfPlacementOffline: 1},
} { } {
t.Run("#"+strconv.Itoa(i), func(t *testing.T) {
for _, node := range planet.StorageNodes { for _, node := range planet.StorageNodes {
require.NoError(t, planet.Satellites[0].Overlay.Service.TestNodeCountryCode(ctx, node.ID(), "PL")) require.NoError(t, planet.Satellites[0].Overlay.Service.TestNodeCountryCode(ctx, node.ID(), defaultLocation.String()))
} }
require.NoError(t, planet.Satellites[0].Repairer.Overlay.DownloadSelectionCache.Refresh(ctx)) require.NoError(t, planet.Satellites[0].Repairer.Overlay.DownloadSelectionCache.Refresh(ctx))
expectedData := testrand.Bytes(5 * memory.KiB)
err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "testbucket", "object", expectedData)
require.NoError(t, err)
segments, err := planet.Satellites[0].Metabase.DB.TestingAllSegments(ctx) segments, err := planet.Satellites[0].Metabase.DB.TestingAllSegments(ctx)
require.NoError(t, err) require.NoError(t, err)
require.Len(t, segments, 1) require.Len(t, segments, 1)
require.Len(t, segments[0].Pieces, piecesCount) require.Len(t, segments[0].Pieces, piecesCount)
for _, piece := range segments[0].Pieces[:tc.piecesOutOfPlacement] { for index, piece := range segments[0].Pieces {
// make node offline if needed
require.NoError(t, updateNodeStatus(ctx, planet.Satellites[0], planet.FindNode(piece.StorageNode), index < tc.piecesOutOfPlacementOffline, defaultLocation))
if index < tc.piecesOutOfPlacement {
require.NoError(t, planet.Satellites[0].Overlay.Service.TestNodeCountryCode(ctx, piece.StorageNode, "US")) require.NoError(t, planet.Satellites[0].Overlay.Service.TestNodeCountryCode(ctx, piece.StorageNode, "US"))
} }
}
// confirm that some pieces are out of placement // confirm that some pieces are out of placement
ok, err := allPiecesInPlacement(ctx, planet.Satellites[0].Overlay.Service, segments[0].Pieces, segments[0].Placement) ok, err := allPiecesInPlacement(ctx, planet.Satellites[0].Overlay.Service, segments[0].Pieces, segments[0].Placement)
@ -100,6 +119,11 @@ func TestSegmentRepairPlacement(t *testing.T) {
ok, err = allPiecesInPlacement(ctx, planet.Satellites[0].Overlay.Service, segments[0].Pieces, segments[0].Placement) ok, err = allPiecesInPlacement(ctx, planet.Satellites[0].Overlay.Service, segments[0].Pieces, segments[0].Placement)
require.NoError(t, err) require.NoError(t, err)
require.True(t, ok) require.True(t, ok)
data, err := planet.Uplinks[0].Download(ctx, planet.Satellites[0], "testbucket", "object")
require.NoError(t, err)
require.Equal(t, expectedData, data)
})
} }
}) })
} }
@ -248,3 +272,26 @@ func allPiecesInPlacement(ctx context.Context, overaly *overlay.Service, pieces
} }
return true, nil return true, nil
} }
func updateNodeStatus(ctx context.Context, satellite *testplanet.Satellite, node *testplanet.StorageNode, offline bool, countryCode location.CountryCode) error {
timestamp := time.Now()
if offline {
timestamp = time.Now().Add(-4 * time.Hour)
}
return satellite.DB.OverlayCache().UpdateCheckIn(ctx, overlay.NodeCheckInInfo{
NodeID: node.ID(),
Address: &pb.NodeAddress{Address: node.Addr()},
IsUp: true,
Version: &pb.NodeVersion{
Version: "v0.0.0",
CommitHash: "",
Timestamp: time.Time{},
Release: true,
},
Capacity: &pb.NodeCapacity{
FreeDisk: 1 * memory.GiB.Int64(),
},
CountryCode: countryCode,
}, timestamp, satellite.Config.Overlay.Node)
}