diff --git a/satellite/repair/checker/observer.go b/satellite/repair/checker/observer.go index dce09e32e..03441e00e 100644 --- a/satellite/repair/checker/observer.go +++ b/satellite/repair/checker/observer.go @@ -5,6 +5,7 @@ package checker import ( "context" + "fmt" "reflect" "sort" "strings" @@ -17,6 +18,7 @@ import ( "storj.io/common/storj" "storj.io/common/uuid" + "storj.io/storj/satellite/metabase" "storj.io/storj/satellite/metabase/rangedloop" "storj.io/storj/satellite/overlay" "storj.io/storj/satellite/repair" @@ -387,7 +389,8 @@ func (fork *observerFork) process(ctx context.Context, segment *rangedloop.Segme } // monitor irreparable segments - if numHealthy < required { + numRetrievable := len(pieces) - len(missingPieces) + if numRetrievable < required { if !slices.Contains(fork.totalStats.objectsLost, segment.StreamID) { fork.totalStats.objectsLost = append(fork.totalStats.objectsLost, segment.StreamID) } @@ -423,6 +426,16 @@ func (fork *observerFork) process(ctx context.Context, segment *rangedloop.Segme } fork.log.Warn("checker found irreparable segment", zap.String("Segment StreamID", segment.StreamID.String()), zap.Int("Segment Position", int(segment.Position.Encode())), zap.Int("total pieces", len(pieces)), zap.Int("min required", required), zap.String("unhealthy node IDs", strings.Join(unhealthyNodes, ","))) + } else if numRetrievable > repairThreshold { + // This segment is to be repaired because of clumping (it wouldn't need repair yet + // otherwise). Produce a brief report of where the clumping occurred so that we have + // a better understanding of the cause. 
+			clumpedNets := clumpingReport{
+				clumpedPieces: clumpedPieces,
+				allPieces:     segment.Pieces,
+				lastNets:      lastNets,
+			}
+			fork.log.Info("segment needs repair because of clumping", zap.Stringer("Segment StreamID", segment.StreamID), zap.Uint64("Segment Position", segment.Position.Encode()), zap.Int("total pieces", len(pieces)), zap.Int("min required", required), zap.Stringer("clumping", &clumpedNets))
 		}
 	} else {
 		if numHealthy > repairThreshold && numHealthy <= (repairThreshold+len(fork.totalStats.remoteSegmentsOverThreshold)) {
@@ -450,3 +463,48 @@ func (fork *observerFork) process(ctx context.Context, segment *rangedloop.Segme
 
 	return nil
 }
+
+// clumpingReport carries the data needed to build a per-network summary of a
+// segment's clumped pieces. lastNets is indexed in parallel with allPieces:
+// lastNets[i] is reported as the network of allPieces[i].
+type clumpingReport struct {
+	clumpedPieces metabase.Pieces
+	allPieces     metabase.Pieces
+	lastNets      []string
+}
+
+// String produces the clumping report: a comma-separated list of "[net]: count"
+// entries ordered by network, so the same input always yields the same output.
+// In case the satellite isn't logging at the required level, we avoid doing the
+// work of building the report until String() is called.
+func (cr *clumpingReport) String() string {
+	clumpedNets := make(map[string]int)
+	for _, clumpedPiece := range cr.clumpedPieces {
+		// Pieces not found in allPieces are counted under the empty network.
+		lastNet := ""
+		for i, piece := range cr.allPieces {
+			if piece.Number == clumpedPiece.Number && piece.StorageNode.Compare(clumpedPiece.StorageNode) == 0 {
+				// String runs on the logging path; never panic if lastNets is
+				// shorter than allPieces.
+				if i < len(cr.lastNets) {
+					lastNet = cr.lastNets[i]
+				}
+				break
+			}
+		}
+		clumpedNets[lastNet]++
+	}
+
+	// Map iteration order is randomized, so sort the networks before formatting.
+	networks := make([]string, 0, len(clumpedNets))
+	for network := range clumpedNets {
+		networks = append(networks, network)
+	}
+	sort.Strings(networks)
+
+	counts := make([]string, 0, len(networks))
+	for _, network := range networks {
+		counts = append(counts, fmt.Sprintf("[%s]: %d", network, clumpedNets[network]))
+	}
+	return strings.Join(counts, ", ")
+}