satellite/repair: don't mark clumped segments as irreparable

Clumped segments (segments with multiple pieces on the same subnet) may
need repair, but the clumped pieces are considered retrievable and we
don't need to call such segments irreparable.

We do want to know where they're coming from, though, if we can, because
we are seeing more than expected.

Change-Id: I41863b243f4bb007ef8929191a3fde1562565ef9
This commit is contained in:
paul cannon 2023-05-17 10:45:09 -05:00 committed by Michal Niewrzal
parent 607b120116
commit 1f4f79b6b3

View File

@ -5,6 +5,7 @@ package checker
import (
"context"
"fmt"
"reflect"
"sort"
"strings"
@ -17,6 +18,7 @@ import (
"storj.io/common/storj"
"storj.io/common/uuid"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/metabase/rangedloop"
"storj.io/storj/satellite/overlay"
"storj.io/storj/satellite/repair"
@ -387,7 +389,8 @@ func (fork *observerFork) process(ctx context.Context, segment *rangedloop.Segme
}
// monitor irreparable segments
if numHealthy < required {
numRetrievable := len(pieces) - len(missingPieces)
if numRetrievable < required {
if !slices.Contains(fork.totalStats.objectsLost, segment.StreamID) {
fork.totalStats.objectsLost = append(fork.totalStats.objectsLost, segment.StreamID)
}
@ -423,6 +426,16 @@ func (fork *observerFork) process(ctx context.Context, segment *rangedloop.Segme
}
fork.log.Warn("checker found irreparable segment", zap.String("Segment StreamID", segment.StreamID.String()), zap.Int("Segment Position",
int(segment.Position.Encode())), zap.Int("total pieces", len(pieces)), zap.Int("min required", required), zap.String("unhealthy node IDs", strings.Join(unhealthyNodes, ",")))
} else if numRetrievable > repairThreshold {
// This segment is to be repaired because of clumping (it wouldn't need repair yet
// otherwise). Produce a brief report of where the clumping occurred so that we have
// a better understanding of the cause.
clumpedNets := clumpingReport{
clumpedPieces: clumpedPieces,
allPieces: segment.Pieces,
lastNets: lastNets,
}
fork.log.Info("segment needs repair because of clumping", zap.Stringer("Segment StreamID", segment.StreamID), zap.Uint64("Segment Position", segment.Position.Encode()), zap.Int("total pieces", len(pieces)), zap.Int("min required", required), zap.Stringer("clumping", &clumpedNets))
}
} else {
if numHealthy > repairThreshold && numHealthy <= (repairThreshold+len(fork.totalStats.remoteSegmentsOverThreshold)) {
@ -450,3 +463,30 @@ func (fork *observerFork) process(ctx context.Context, segment *rangedloop.Segme
return nil
}
type clumpingReport struct {
clumpedPieces metabase.Pieces
allPieces metabase.Pieces
lastNets []string
}
// String produces the clumping report. In case the satellite isn't logging at the required level,
// we avoid doing the work of building the report until String() is called.
func (cr *clumpingReport) String() string {
clumpedNets := make(map[string]int)
for _, clumpedPiece := range cr.clumpedPieces {
lastNet := ""
for i, piece := range cr.allPieces {
if piece.Number == clumpedPiece.Number && piece.StorageNode.Compare(clumpedPiece.StorageNode) == 0 {
lastNet = cr.lastNets[i]
break
}
}
clumpedNets[lastNet]++
}
counts := make([]string, 0, len(clumpedNets))
for clumpedNet, count := range clumpedNets {
counts = append(counts, fmt.Sprintf("[%s]: %d", clumpedNet, count))
}
return strings.Join(counts, ", ")
}