satellite/repair/checker: remove segments loop parts

We are switching completely to the ranged loop.

https://github.com/storj/storj/issues/5368

Change-Id: I8583549973cd36aa0e0c482c20d7a75cb7568ab3
Michal Niewrzal, 2023-04-25 10:40:22 +02:00
commit 36e046375c (parent e8e6dd056a)
19 changed files with 376 additions and 1230 deletions
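In short: after this change the repair checker exists only as a ranged loop observer. A minimal sketch of the one remaining constructor (the helper function below is hypothetical; the checker.NewObserver signature is taken from the diff in this commit):

package example

import (
	"go.uber.org/zap"

	"storj.io/storj/satellite/overlay"
	"storj.io/storj/satellite/repair/checker"
	"storj.io/storj/satellite/repair/queue"
)

// newRepairObserver is a hypothetical helper showing the only way the repair
// checker is constructed after this change: checker.NewObserver returns a
// ranged loop observer; the old checker.NewChecker segments-loop chore is gone.
func newRepairObserver(log *zap.Logger, rq queue.RepairQueue, ov *overlay.Service, cfg checker.Config) *checker.Observer {
	return checker.NewObserver(log.Named("repair:checker"), rq, ov, cfg)
}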

go.mod (2 changes)
View File

@@ -50,6 +50,7 @@ require (
go.etcd.io/bbolt v1.3.5
go.uber.org/zap v1.16.0
golang.org/x/crypto v0.6.0
golang.org/x/exp v0.0.0-20221205204356-47842c84f3db
golang.org/x/net v0.6.0
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d
golang.org/x/sync v0.1.0
@@ -127,7 +128,6 @@ require (
go.opencensus.io v0.22.2 // indirect
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
golang.org/x/exp v0.0.0-20221205204356-47842c84f3db // indirect
golang.org/x/mod v0.6.0 // indirect
golang.org/x/tools v0.2.0 // indirect
google.golang.org/api v0.20.0 // indirect
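The golang.org/x/exp promotion from an indirect to a direct dependency corresponds to the checker now using golang.org/x/exp/slices in place of the hand-rolled containsStreamID helper (see the checker changes further down). A small illustration, with a hypothetical wrapper function; it assumes only that uuid.UUID remains a comparable array type:

package example

import (
	"golang.org/x/exp/slices"

	"storj.io/common/uuid"
)

// seenStreamID mirrors the replacement of containsStreamID with
// slices.Contains; uuid.UUID is comparable, so the generic helper applies
// directly to the objectsLost slice.
func seenStreamID(objectsLost []uuid.UUID, id uuid.UUID) bool {
	return slices.Contains(objectsLost, id)
}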

View File

@@ -57,7 +57,6 @@ import (
"storj.io/storj/satellite/overlay/offlinenodes"
"storj.io/storj/satellite/overlay/straynodes"
"storj.io/storj/satellite/payments/stripe"
"storj.io/storj/satellite/repair/checker"
"storj.io/storj/satellite/repair/repairer"
"storj.io/storj/satellite/reputation"
"storj.io/storj/satellite/satellitedb/satellitedbtest"
@@ -134,7 +133,6 @@ type Satellite struct {
}
Repair struct {
Checker *checker.Checker
Repairer *repairer.Service
}
@@ -629,7 +627,6 @@ func createNewSystem(name string, log *zap.Logger, config satellite.Config, peer
system.Orders.Service = api.Orders.Service
system.Orders.Chore = api.Orders.Chore
system.Repair.Checker = peer.Repair.Checker
system.Repair.Repairer = repairerPeer.Repairer
system.Audit.VerifyQueue = auditorPeer.Audit.VerifyQueue
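With the Checker field removed from testplanet's Satellite, tests can no longer pause or trigger a checker loop; the updated tests below instead stop the background ranged loop and run a single pass. A sketch of that pattern as a hypothetical helper (the RangedLoop/RunOnce accessors are taken from the test changes in this commit):

package example

import (
	"testing"

	"github.com/stretchr/testify/require"

	"storj.io/common/testcontext"
	"storj.io/storj/private/testplanet"
)

// triggerRepairChecker is a hypothetical test helper: it replaces the old
// satellite.Repair.Checker.Loop.TriggerWait() pattern with a single ranged
// loop pass, which now drives the repair checker observer.
func triggerRepairChecker(ctx *testcontext.Context, t *testing.T, sat *testplanet.Satellite) {
	// stop the background loop so the explicit pass below is the only one running
	sat.RangedLoop.RangedLoop.Service.Loop.Stop()
	_, err := sat.RangedLoop.RangedLoop.Service.RunOnce(ctx)
	require.NoError(t, err)
}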

View File

@@ -208,8 +208,8 @@ func TestBilling_AuditRepairTraffic(t *testing.T) {
)
satelliteSys := planet.Satellites[0]
satelliteSys.RangedLoop.RangedLoop.Service.Loop.Stop()
satelliteSys.Audit.Worker.Loop.Pause()
satelliteSys.Repair.Checker.Loop.Pause()
satelliteSys.Repair.Repairer.Loop.Pause()
// stop any async flushes because we want to be sure when some values are
// written to avoid races

View File

@@ -987,7 +987,7 @@ func TestAuditRepairedSegmentInExcludedCountries(t *testing.T) {
// stop audit to prevent possible interactions i.e. repair timeout problems
satellite.Audit.Worker.Loop.Pause()
satellite.Repair.Checker.Loop.Pause()
satellite.RangedLoop.RangedLoop.Service.Loop.Stop()
satellite.Repair.Repairer.Loop.Pause()
var testData = testrand.Bytes(8 * memory.KiB)
@@ -1014,10 +1014,9 @@ func TestAuditRepairedSegmentInExcludedCountries(t *testing.T) {
require.NoError(t, err)
}
// trigger checker to add segment to repair queue
satellite.Repair.Checker.Loop.Restart()
satellite.Repair.Checker.Loop.TriggerWait()
satellite.Repair.Checker.Loop.Pause()
// trigger repair checker with ranged loop to add segment to repair queue
_, err = satellite.RangedLoop.RangedLoop.Service.RunOnce(ctx)
require.NoError(t, err)
count, err := satellite.DB.RepairQueue().Count(ctx)
require.NoError(t, err)

View File

@@ -48,7 +48,6 @@ import (
"storj.io/storj/satellite/payments/billing"
"storj.io/storj/satellite/payments/storjscan"
"storj.io/storj/satellite/payments/stripe"
"storj.io/storj/satellite/repair/checker"
"storj.io/storj/satellite/reputation"
)
@@ -104,10 +103,6 @@ type Core struct {
Service *reputation.Service
}
Repair struct {
Checker *checker.Checker
}
Audit struct {
VerifyQueue audit.VerifyQueue
ReverifyQueue audit.ReverifyQueue
@@ -322,29 +317,6 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB,
})
}
{ // setup data repair
log := peer.Log.Named("repair:checker")
if config.Repairer.UseRangedLoop {
log.Info("using ranged loop")
} else {
peer.Repair.Checker = checker.NewChecker(
log,
peer.DB.RepairQueue(),
peer.Metainfo.Metabase,
peer.Metainfo.SegmentLoop,
peer.Overlay.Service,
config.Checker)
peer.Services.Add(lifecycle.Item{
Name: "repair:checker",
Run: peer.Repair.Checker.Run,
Close: peer.Repair.Checker.Close,
})
peer.Debug.Server.Panel.Add(
debug.Cycle("Repair Checker", peer.Repair.Checker.Loop))
}
}
{ // setup reputation
reputationDB := peer.DB.Reputation()
if config.Reputation.FlushInterval > 0 {
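The checker setup removed from the core peer above now exists only in the ranged loop peer, so repair detection happens only where the ranged loop runs. Roughly, and as shown in the ranged loop changes further down (sketch; the config field name is assumed to match the removed core setup above):

	peer.Repair.Observer = checker.NewObserver(
		peer.Log.Named("repair:checker"),
		peer.DB.RepairQueue(),
		peer.Overlay.Service,
		config.Checker, // assumed field name, mirroring the core setup removed above
	)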

View File

@@ -421,7 +421,7 @@ func TestAllInOne(t *testing.T) {
bfConfig,
satellite.DB.OverlayCache(),
),
checker.NewRangedLoopObserver(
checker.NewObserver(
log.Named("repair:checker"),
satellite.DB.RepairQueue(),
satellite.Overlay.Service,

View File

@@ -172,7 +172,6 @@ func TestEnsureMinimumRequested(t *testing.T) {
// pause chores that might update node data
satellite.RangedLoop.RangedLoop.Service.Loop.Stop()
satellite.Repair.Checker.Loop.Pause()
satellite.Repair.Repairer.Loop.Pause()
for _, node := range planet.StorageNodes {
node.Contact.Chore.Pause(ctx)

View File

@@ -393,7 +393,6 @@ func TestGetOnlineNodesForGetDelete(t *testing.T) {
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
// pause chores that might update node data
planet.Satellites[0].RangedLoop.RangedLoop.Service.Loop.Stop()
planet.Satellites[0].Repair.Checker.Loop.Pause()
planet.Satellites[0].Repair.Repairer.Loop.Pause()
for _, node := range planet.StorageNodes {
node.Contact.Chore.Pause(ctx)

View File

@@ -55,7 +55,7 @@ type RangedLoop struct {
}
Repair struct {
Observer rangedloop.Observer
Observer *checker.Observer
}
GracefulExit struct {
@@ -141,7 +141,7 @@ func NewRangedLoop(log *zap.Logger, db DB, metabaseDB *metabase.DB, config *Conf
}
{ // setup repair
peer.Repair.Observer = checker.NewRangedLoopObserver(
peer.Repair.Observer = checker.NewObserver(
peer.Log.Named("repair:checker"),
peer.DB.RepairQueue(),
peer.Overlay.Service,

View File

@@ -1,401 +0,0 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package checker
import (
"bytes"
"context"
"strings"
"time"
"github.com/spacemonkeygo/monkit/v3"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/common/errs2"
"storj.io/common/storj"
"storj.io/common/sync2"
"storj.io/common/uuid"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/metabase/segmentloop"
"storj.io/storj/satellite/overlay"
"storj.io/storj/satellite/repair"
"storj.io/storj/satellite/repair/queue"
)
// Error is a standard error class for this package.
var (
Error = errs.Class("repair checker")
mon = monkit.Package()
)
// Checker contains the information needed to do checks for missing pieces.
//
// architecture: Chore
type Checker struct {
logger *zap.Logger
repairQueue queue.RepairQueue
metabase *metabase.DB
segmentLoop *segmentloop.Service
nodestate *ReliabilityCache
overlayService *overlay.Service
statsCollector *statsCollector
repairOverrides RepairOverridesMap
nodeFailureRate float64
repairQueueBatchSize int
Loop *sync2.Cycle
}
// NewChecker creates a new instance of checker.
func NewChecker(logger *zap.Logger, repairQueue queue.RepairQueue, metabase *metabase.DB, segmentLoop *segmentloop.Service, overlay *overlay.Service, config Config) *Checker {
return &Checker{
logger: logger,
repairQueue: repairQueue,
metabase: metabase,
segmentLoop: segmentLoop,
nodestate: NewReliabilityCache(overlay, config.ReliabilityCacheStaleness),
overlayService: overlay,
statsCollector: newStatsCollector(),
repairOverrides: config.RepairOverrides.GetMap(),
nodeFailureRate: config.NodeFailureRate,
repairQueueBatchSize: config.RepairQueueInsertBatchSize,
Loop: sync2.NewCycle(config.Interval),
}
}
// Run the checker loop.
func (checker *Checker) Run(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
return checker.Loop.Run(ctx, checker.IdentifyInjuredSegments)
}
// getNodesEstimate updates the estimate of the total number of nodes. It is guaranteed
// to return a number greater than 0 when the error is nil.
//
// We can't calculate this upon first starting a Checker, because there may not be any
// nodes yet. We expect that there will be nodes before there are segments, though.
func (checker *Checker) getNodesEstimate(ctx context.Context) (int, error) {
// this should be safe to call frequently; it is an efficient caching lookup.
totalNumNodes, err := checker.nodestate.NumNodes(ctx)
if err != nil {
// We could proceed here by returning the last good value, or by returning a fallback
// constant estimate, like "20000", and we'd probably be fine, but it would be better
// not to have that happen silently for too long. Also, if we can't get this from the
// database, we probably can't modify the injured segments queue, so it won't help to
// proceed with this repair operation.
return 0, err
}
if totalNumNodes == 0 {
return 0, Error.New("segment health is meaningless: there are no nodes")
}
return totalNumNodes, nil
}
func (checker *Checker) createInsertBuffer() *queue.InsertBuffer {
return queue.NewInsertBuffer(checker.repairQueue, checker.repairQueueBatchSize)
}
// RefreshReliabilityCache forces refreshing node online status cache.
func (checker *Checker) RefreshReliabilityCache(ctx context.Context) error {
return checker.nodestate.Refresh(ctx)
}
// Close halts the Checker loop.
func (checker *Checker) Close() error {
checker.Loop.Close()
return nil
}
// IdentifyInjuredSegments checks for missing pieces off of the metainfo and overlay.
func (checker *Checker) IdentifyInjuredSegments(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
startTime := time.Now()
observer := &checkerObserver{
repairQueue: checker.createInsertBuffer(),
nodestate: checker.nodestate,
overlayService: checker.overlayService,
statsCollector: checker.statsCollector,
monStats: aggregateStats{},
repairOverrides: checker.repairOverrides,
nodeFailureRate: checker.nodeFailureRate,
getNodesEstimate: checker.getNodesEstimate,
log: checker.logger,
}
err = checker.segmentLoop.Join(ctx, observer)
if err != nil {
if !errs2.IsCanceled(err) {
checker.logger.Error("IdentifyInjuredSegments error", zap.Error(err))
}
return nil
}
err = observer.repairQueue.Flush(ctx)
if err != nil {
return Error.Wrap(err)
}
// remove all segments which were not seen as unhealthy by this checker iteration
healthyDeleted, err := checker.repairQueue.Clean(ctx, startTime)
if err != nil {
return Error.Wrap(err)
}
checker.statsCollector.collectAggregates()
mon.IntVal("remote_files_checked").Observe(observer.monStats.objectsChecked) //mon:locked
mon.IntVal("remote_segments_checked").Observe(observer.monStats.remoteSegmentsChecked) //mon:locked
mon.IntVal("remote_segments_failed_to_check").Observe(observer.monStats.remoteSegmentsFailedToCheck) //mon:locked
mon.IntVal("remote_segments_needing_repair").Observe(observer.monStats.remoteSegmentsNeedingRepair) //mon:locked
mon.IntVal("new_remote_segments_needing_repair").Observe(observer.monStats.newRemoteSegmentsNeedingRepair) //mon:locked
mon.IntVal("remote_segments_lost").Observe(observer.monStats.remoteSegmentsLost) //mon:locked
mon.IntVal("remote_files_lost").Observe(int64(len(observer.monStats.objectsLost))) //mon:locked
mon.IntVal("remote_segments_over_threshold_1").Observe(observer.monStats.remoteSegmentsOverThreshold[0]) //mon:locked
mon.IntVal("remote_segments_over_threshold_2").Observe(observer.monStats.remoteSegmentsOverThreshold[1]) //mon:locked
mon.IntVal("remote_segments_over_threshold_3").Observe(observer.monStats.remoteSegmentsOverThreshold[2]) //mon:locked
mon.IntVal("remote_segments_over_threshold_4").Observe(observer.monStats.remoteSegmentsOverThreshold[3]) //mon:locked
mon.IntVal("remote_segments_over_threshold_5").Observe(observer.monStats.remoteSegmentsOverThreshold[4]) //mon:locked
mon.IntVal("healthy_segments_removed_from_queue").Observe(healthyDeleted) //mon:locked
allUnhealthy := observer.monStats.remoteSegmentsNeedingRepair + observer.monStats.remoteSegmentsFailedToCheck
allChecked := observer.monStats.remoteSegmentsChecked
allHealthy := allChecked - allUnhealthy
mon.FloatVal("remote_segments_healthy_percentage").Observe(100 * float64(allHealthy) / float64(allChecked)) //mon:locked
return nil
}
var _ segmentloop.Observer = (*checkerObserver)(nil)
// checkerObserver implements the metainfo loop Observer interface.
//
// architecture: Observer
type checkerObserver struct {
repairQueue *queue.InsertBuffer
nodestate *ReliabilityCache
overlayService *overlay.Service
statsCollector *statsCollector
monStats aggregateStats // TODO(cam): once we verify statsCollector reports data correctly, remove this
repairOverrides RepairOverridesMap
nodeFailureRate float64
getNodesEstimate func(ctx context.Context) (int, error)
log *zap.Logger
lastStreamID uuid.UUID
}
// NewCheckerObserver creates new checker observer instance.
func NewCheckerObserver(checker *Checker) segmentloop.Observer {
return &checkerObserver{
repairQueue: checker.createInsertBuffer(),
nodestate: checker.nodestate,
overlayService: checker.overlayService,
statsCollector: checker.statsCollector,
monStats: aggregateStats{},
repairOverrides: checker.repairOverrides,
nodeFailureRate: checker.nodeFailureRate,
getNodesEstimate: checker.getNodesEstimate,
log: checker.logger,
}
}
// checks for a stream id in slice.
func containsStreamID(a []uuid.UUID, x uuid.UUID) bool {
for _, n := range a {
if bytes.Equal(x[:], n[:]) {
return true
}
}
return false
}
func (obs *checkerObserver) getStatsByRS(redundancy storj.RedundancyScheme) *stats {
rsString := getRSString(obs.loadRedundancy(redundancy))
return obs.statsCollector.getStatsByRS(rsString)
}
func (obs *checkerObserver) loadRedundancy(redundancy storj.RedundancyScheme) (int, int, int, int) {
repair := int(redundancy.RepairShares)
overrideValue := obs.repairOverrides.GetOverrideValue(redundancy)
if overrideValue != 0 {
repair = int(overrideValue)
}
return int(redundancy.RequiredShares), repair, int(redundancy.OptimalShares), int(redundancy.TotalShares)
}
// LoopStarted is called at each start of a loop.
func (obs *checkerObserver) LoopStarted(context.Context, segmentloop.LoopInfo) (err error) {
return nil
}
func (obs *checkerObserver) RemoteSegment(ctx context.Context, segment *segmentloop.Segment) (err error) {
// we are explicitly not adding monitoring here as we are tracking loop observers separately
// ignore segment if expired
if segment.Expired(time.Now()) {
return nil
}
stats := obs.getStatsByRS(segment.Redundancy)
if obs.lastStreamID.Compare(segment.StreamID) != 0 {
obs.lastStreamID = segment.StreamID
stats.iterationAggregates.objectsChecked++
obs.monStats.objectsChecked++
}
obs.monStats.remoteSegmentsChecked++
stats.iterationAggregates.remoteSegmentsChecked++
// ensure we get values, even if only zero values, so that redash can have an alert based on this
mon.Counter("checker_segments_below_min_req").Inc(0) //mon:locked
stats.segmentsBelowMinReq.Inc(0)
pieces := segment.Pieces
if len(pieces) == 0 {
obs.log.Debug("no pieces on remote segment")
return nil
}
totalNumNodes, err := obs.getNodesEstimate(ctx)
if err != nil {
return Error.New("could not get estimate of total number of nodes: %w", err)
}
missingPieces, err := obs.nodestate.MissingPieces(ctx, segment.CreatedAt, segment.Pieces)
if err != nil {
obs.monStats.remoteSegmentsFailedToCheck++
stats.iterationAggregates.remoteSegmentsFailedToCheck++
return errs.Combine(Error.New("error getting missing pieces"), err)
}
// if multiple pieces are on the same last_net, keep only the first one. The rest are
// to be considered retrievable but unhealthy.
nodeIDs := make([]storj.NodeID, len(pieces))
for i, p := range pieces {
nodeIDs[i] = p.StorageNode
}
lastNets, err := obs.overlayService.GetNodesNetworkInOrder(ctx, nodeIDs)
if err != nil {
obs.monStats.remoteSegmentsFailedToCheck++
stats.iterationAggregates.remoteSegmentsFailedToCheck++
return errs.Combine(Error.New("error determining node last_nets"), err)
}
clumpedPieces := repair.FindClumpedPieces(segment.Pieces, lastNets)
numHealthy := len(pieces) - len(missingPieces) - len(clumpedPieces)
mon.IntVal("checker_segment_total_count").Observe(int64(len(pieces))) //mon:locked
stats.segmentTotalCount.Observe(int64(len(pieces)))
mon.IntVal("checker_segment_healthy_count").Observe(int64(numHealthy)) //mon:locked
stats.segmentHealthyCount.Observe(int64(numHealthy))
mon.IntVal("checker_segment_clumped_count").Observe(int64(len(clumpedPieces))) //mon:locked
stats.segmentClumpedCount.Observe(int64(len(clumpedPieces)))
segmentAge := time.Since(segment.CreatedAt)
mon.IntVal("checker_segment_age").Observe(int64(segmentAge.Seconds())) //mon:locked
stats.segmentAge.Observe(int64(segmentAge.Seconds()))
required, repairThreshold, successThreshold, _ := obs.loadRedundancy(segment.Redundancy)
segmentHealth := repair.SegmentHealth(numHealthy, required, totalNumNodes, obs.nodeFailureRate)
mon.FloatVal("checker_segment_health").Observe(segmentHealth) //mon:locked
stats.segmentHealth.Observe(segmentHealth)
// we repair when the number of healthy pieces is less than or equal to the repair threshold and greater than or
// equal to the minimum required pieces in redundancy,
// except for the case when the repair and success thresholds are the same (a case usually seen during testing)
if numHealthy <= repairThreshold && numHealthy < successThreshold {
mon.FloatVal("checker_injured_segment_health").Observe(segmentHealth) //mon:locked
stats.injuredSegmentHealth.Observe(segmentHealth)
obs.monStats.remoteSegmentsNeedingRepair++
stats.iterationAggregates.remoteSegmentsNeedingRepair++
err := obs.repairQueue.Insert(ctx, &queue.InjuredSegment{
StreamID: segment.StreamID,
Position: segment.Position,
UpdatedAt: time.Now().UTC(),
SegmentHealth: segmentHealth,
}, func() {
// Counters are increased after the queue has determined
// that the segment wasn't already queued for repair.
obs.monStats.newRemoteSegmentsNeedingRepair++
stats.iterationAggregates.newRemoteSegmentsNeedingRepair++
})
if err != nil {
obs.log.Error("error adding injured segment to queue", zap.Error(err))
return nil
}
// monitor irreparable segments
if numHealthy < required {
if !containsStreamID(obs.monStats.objectsLost, segment.StreamID) {
obs.monStats.objectsLost = append(obs.monStats.objectsLost, segment.StreamID)
}
if !containsStreamID(stats.iterationAggregates.objectsLost, segment.StreamID) {
stats.iterationAggregates.objectsLost = append(stats.iterationAggregates.objectsLost, segment.StreamID)
}
repairedAt := time.Time{}
if segment.RepairedAt != nil {
repairedAt = *segment.RepairedAt
}
var segmentAge time.Duration
if segment.CreatedAt.Before(repairedAt) {
segmentAge = time.Since(repairedAt)
} else {
segmentAge = time.Since(segment.CreatedAt)
}
mon.IntVal("checker_segment_time_until_irreparable").Observe(int64(segmentAge.Seconds())) //mon:locked
stats.segmentTimeUntilIrreparable.Observe(int64(segmentAge.Seconds()))
obs.monStats.remoteSegmentsLost++
stats.iterationAggregates.remoteSegmentsLost++
mon.Counter("checker_segments_below_min_req").Inc(1) //mon:locked
stats.segmentsBelowMinReq.Inc(1)
var unhealthyNodes []string
for _, p := range missingPieces {
unhealthyNodes = append(unhealthyNodes, p.StorageNode.String())
}
obs.log.Warn("checker found irreparable segment", zap.String("Segment StreamID", segment.StreamID.String()), zap.Int("Segment Position",
int(segment.Position.Encode())), zap.Int("total pieces", len(pieces)), zap.Int("min required", required), zap.String("unhealthy node IDs", strings.Join(unhealthyNodes, ",")))
}
} else {
if numHealthy > repairThreshold && numHealthy <= (repairThreshold+len(obs.monStats.remoteSegmentsOverThreshold)) {
// record metrics for segments right above repair threshold
// numHealthy=repairThreshold+1 through numHealthy=repairThreshold+5
for i := range obs.monStats.remoteSegmentsOverThreshold {
if numHealthy == (repairThreshold + i + 1) {
obs.monStats.remoteSegmentsOverThreshold[i]++
break
}
}
}
if numHealthy > repairThreshold && numHealthy <= (repairThreshold+len(stats.iterationAggregates.remoteSegmentsOverThreshold)) {
// record metrics for segments right above repair threshold
// numHealthy=repairThreshold+1 through numHealthy=repairThreshold+5
for i := range stats.iterationAggregates.remoteSegmentsOverThreshold {
if numHealthy == (repairThreshold + i + 1) {
stats.iterationAggregates.remoteSegmentsOverThreshold[i]++
break
}
}
}
}
return nil
}
func (obs *checkerObserver) InlineSegment(ctx context.Context, segment *segmentloop.Segment) (err error) {
// inline segments are not repaired, but we would also like to count objects
// that have only inline segments as checked
if obs.lastStreamID.Compare(segment.StreamID) != 0 {
obs.monStats.objectsChecked++
}
return nil
}

View File

@@ -1,426 +0,0 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package checker_test
import (
"context"
"fmt"
"testing"
"time"
"github.com/stretchr/testify/require"
"storj.io/common/memory"
"storj.io/common/storj"
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/common/uuid"
"storj.io/storj/private/testplanet"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/metabase/segmentloop"
"storj.io/storj/satellite/repair/checker"
)
func TestIdentifyInjuredSegments(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
checker := planet.Satellites[0].Repair.Checker
repairQueue := planet.Satellites[0].DB.RepairQueue()
checker.Loop.Pause()
planet.Satellites[0].Repair.Repairer.Loop.Pause()
rs := storj.RedundancyScheme{
RequiredShares: 2,
RepairShares: 3,
OptimalShares: 4,
TotalShares: 5,
ShareSize: 256,
}
projectID := planet.Uplinks[0].Projects[0].ID
err := planet.Uplinks[0].CreateBucket(ctx, planet.Satellites[0], "test-bucket")
require.NoError(t, err)
expectedLocation := metabase.SegmentLocation{
ProjectID: projectID,
BucketName: "test-bucket",
}
// add some valid pointers
for x := 0; x < 10; x++ {
expectedLocation.ObjectKey = metabase.ObjectKey(fmt.Sprintf("a-%d", x))
insertSegment(ctx, t, planet, rs, expectedLocation, createPieces(planet, rs), nil)
}
// add pointer that needs repair
expectedLocation.ObjectKey = metabase.ObjectKey("b-0")
b0StreamID := insertSegment(ctx, t, planet, rs, expectedLocation, createLostPieces(planet, rs), nil)
// add pointer that is unhealthy, but is expired
expectedLocation.ObjectKey = metabase.ObjectKey("b-1")
expiresAt := time.Now().Add(-time.Hour)
insertSegment(ctx, t, planet, rs, expectedLocation, createLostPieces(planet, rs), &expiresAt)
// add some valid pointers
for x := 0; x < 10; x++ {
expectedLocation.ObjectKey = metabase.ObjectKey(fmt.Sprintf("c-%d", x))
insertSegment(ctx, t, planet, rs, expectedLocation, createPieces(planet, rs), nil)
}
checker.Loop.TriggerWait()
// check that the unhealthy, non-expired segment was added to the queue
// and that the expired segment was ignored
injuredSegment, err := repairQueue.Select(ctx)
require.NoError(t, err)
err = repairQueue.Delete(ctx, injuredSegment)
require.NoError(t, err)
require.Equal(t, b0StreamID, injuredSegment.StreamID)
_, err = repairQueue.Select(ctx)
require.Error(t, err)
})
}
func TestIdentifyIrreparableSegments(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 3, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
checker := planet.Satellites[0].Repair.Checker
checker.Loop.Stop()
const numberOfNodes = 10
pieces := make(metabase.Pieces, 0, numberOfNodes)
// use online nodes
for i, storagenode := range planet.StorageNodes {
pieces = append(pieces, metabase.Piece{
Number: uint16(i),
StorageNode: storagenode.ID(),
})
}
// simulate offline nodes
expectedLostPieces := make(map[int32]bool)
for i := len(pieces); i < numberOfNodes; i++ {
pieces = append(pieces, metabase.Piece{
Number: uint16(i),
StorageNode: storj.NodeID{byte(i)},
})
expectedLostPieces[int32(i)] = true
}
rs := storj.RedundancyScheme{
ShareSize: 256,
RequiredShares: 4,
RepairShares: 8,
OptimalShares: 9,
TotalShares: 10,
}
projectID := planet.Uplinks[0].Projects[0].ID
err := planet.Uplinks[0].CreateBucket(ctx, planet.Satellites[0], "test-bucket")
require.NoError(t, err)
expectedLocation := metabase.SegmentLocation{
ProjectID: projectID,
BucketName: "test-bucket",
}
// when the number of healthy pieces is less than the minimum required number of pieces in the redundancy scheme,
// the segment is considered irreparable but will still be put into the repair queue
expectedLocation.ObjectKey = "piece"
insertSegment(ctx, t, planet, rs, expectedLocation, pieces, nil)
expectedLocation.ObjectKey = "piece-expired"
expiresAt := time.Now().Add(-time.Hour)
insertSegment(ctx, t, planet, rs, expectedLocation, pieces, &expiresAt)
err = checker.IdentifyInjuredSegments(ctx)
require.NoError(t, err)
// check that the single irreparable segment was added to the repair queue
repairQueue := planet.Satellites[0].DB.RepairQueue()
_, err = repairQueue.Select(ctx)
require.NoError(t, err)
count, err := repairQueue.Count(ctx)
require.NoError(t, err)
require.Equal(t, 1, count)
// check irreparable once again but wait a second
time.Sleep(1 * time.Second)
err = checker.IdentifyInjuredSegments(ctx)
require.NoError(t, err)
expectedLocation.ObjectKey = "piece"
_, err = planet.Satellites[0].Metabase.DB.DeleteObjectExactVersion(ctx, metabase.DeleteObjectExactVersion{
ObjectLocation: expectedLocation.Object(),
Version: metabase.DefaultVersion,
})
require.NoError(t, err)
err = checker.IdentifyInjuredSegments(ctx)
require.NoError(t, err)
count, err = repairQueue.Count(ctx)
require.NoError(t, err)
require.Equal(t, 0, count)
})
}
func TestCleanRepairQueue(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
checker := planet.Satellites[0].Repair.Checker
repairQueue := planet.Satellites[0].DB.RepairQueue()
checker.Loop.Pause()
planet.Satellites[0].Repair.Repairer.Loop.Pause()
rs := storj.RedundancyScheme{
RequiredShares: 2,
RepairShares: 3,
OptimalShares: 4,
TotalShares: 5,
ShareSize: 256,
}
projectID := planet.Uplinks[0].Projects[0].ID
err := planet.Uplinks[0].CreateBucket(ctx, planet.Satellites[0], "test-bucket")
require.NoError(t, err)
expectedLocation := metabase.SegmentLocation{
ProjectID: projectID,
BucketName: "test-bucket",
}
healthyCount := 5
for i := 0; i < healthyCount; i++ {
expectedLocation.ObjectKey = metabase.ObjectKey(fmt.Sprintf("healthy-%d", i))
insertSegment(ctx, t, planet, rs, expectedLocation, createPieces(planet, rs), nil)
}
unhealthyCount := 5
unhealthyIDs := make(map[uuid.UUID]struct{})
for i := 0; i < unhealthyCount; i++ {
expectedLocation.ObjectKey = metabase.ObjectKey(fmt.Sprintf("unhealthy-%d", i))
unhealthyStreamID := insertSegment(ctx, t, planet, rs, expectedLocation, createLostPieces(planet, rs), nil)
unhealthyIDs[unhealthyStreamID] = struct{}{}
}
// suspend enough nodes to make healthy pointers unhealthy
for i := rs.RequiredShares; i < rs.OptimalShares; i++ {
require.NoError(t, planet.Satellites[0].Overlay.DB.TestSuspendNodeUnknownAudit(ctx, planet.StorageNodes[i].ID(), time.Now()))
}
require.NoError(t, planet.Satellites[0].Repair.Checker.RefreshReliabilityCache(ctx))
// check that repair queue is empty to avoid false positive
count, err := repairQueue.Count(ctx)
require.NoError(t, err)
require.Equal(t, 0, count)
checker.Loop.TriggerWait()
// check that the pointers were put into the repair queue
// and not cleaned up at the end of the checker iteration
count, err = repairQueue.Count(ctx)
require.NoError(t, err)
require.Equal(t, healthyCount+unhealthyCount, count)
// unsuspend nodes to make the previously healthy pointers healthy again
for i := rs.RequiredShares; i < rs.OptimalShares; i++ {
require.NoError(t, planet.Satellites[0].Overlay.DB.TestUnsuspendNodeUnknownAudit(ctx, planet.StorageNodes[i].ID()))
}
require.NoError(t, planet.Satellites[0].Repair.Checker.RefreshReliabilityCache(ctx))
// The checker will not insert/update the now healthy segments causing
// them to be removed from the queue at the end of the checker iteration
checker.Loop.TriggerWait()
// only unhealthy segments should remain
count, err = repairQueue.Count(ctx)
require.NoError(t, err)
require.Equal(t, unhealthyCount, count)
segs, err := repairQueue.SelectN(ctx, count)
require.NoError(t, err)
require.Equal(t, len(unhealthyIDs), len(segs))
for _, s := range segs {
_, ok := unhealthyIDs[s.StreamID]
require.True(t, ok)
}
})
}
func TestIgnoringCopiedSegments(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
Satellite: testplanet.ReconfigureRS(2, 3, 4, 4),
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
uplink := planet.Uplinks[0]
metabaseDB := satellite.Metabase.DB
checker := satellite.Repair.Checker
repairQueue := satellite.DB.RepairQueue()
checker.Loop.Pause()
satellite.Repair.Repairer.Loop.Pause()
err := uplink.CreateBucket(ctx, satellite, "test-bucket")
require.NoError(t, err)
testData := testrand.Bytes(8 * memory.KiB)
err = uplink.Upload(ctx, satellite, "testbucket", "test/path", testData)
require.NoError(t, err)
project, err := uplink.OpenProject(ctx, satellite)
require.NoError(t, err)
defer ctx.Check(project.Close)
segments, err := metabaseDB.TestingAllSegments(ctx)
require.NoError(t, err)
require.Len(t, segments, 1)
_, err = project.CopyObject(ctx, "testbucket", "test/path", "testbucket", "empty", nil)
require.NoError(t, err)
segmentsAfterCopy, err := metabaseDB.TestingAllSegments(ctx)
require.NoError(t, err)
require.Len(t, segmentsAfterCopy, 2)
err = planet.StopNodeAndUpdate(ctx, planet.FindNode(segments[0].Pieces[0].StorageNode))
require.NoError(t, err)
checker.Loop.TriggerWait()
// check that the injured segment in the repair queue has the same StreamID as the original segment.
injuredSegment, err := repairQueue.Select(ctx)
require.NoError(t, err)
require.Equal(t, segments[0].StreamID, injuredSegment.StreamID)
// check that repair queue has only original segment, and not copied one.
injuredSegments, err := repairQueue.Count(ctx)
require.NoError(t, err)
require.Equal(t, 1, injuredSegments)
})
}
func createPieces(planet *testplanet.Planet, rs storj.RedundancyScheme) metabase.Pieces {
pieces := make(metabase.Pieces, rs.OptimalShares)
for i := range pieces {
pieces[i] = metabase.Piece{
Number: uint16(i),
StorageNode: planet.StorageNodes[i].Identity.ID,
}
}
return pieces
}
func createLostPieces(planet *testplanet.Planet, rs storj.RedundancyScheme) metabase.Pieces {
pieces := make(metabase.Pieces, rs.OptimalShares)
for i := range pieces[:rs.RequiredShares] {
pieces[i] = metabase.Piece{
Number: uint16(i),
StorageNode: planet.StorageNodes[i].Identity.ID,
}
}
for i := rs.RequiredShares; i < rs.OptimalShares; i++ {
pieces[i] = metabase.Piece{
Number: uint16(i),
StorageNode: storj.NodeID{byte(0xFF)},
}
}
return pieces
}
func insertSegment(ctx context.Context, t *testing.T, planet *testplanet.Planet, rs storj.RedundancyScheme, location metabase.SegmentLocation, pieces metabase.Pieces, expiresAt *time.Time) uuid.UUID {
metabaseDB := planet.Satellites[0].Metabase.DB
obj := metabase.ObjectStream{
ProjectID: location.ProjectID,
BucketName: location.BucketName,
ObjectKey: location.ObjectKey,
Version: 1,
StreamID: testrand.UUID(),
}
_, err := metabaseDB.BeginObjectExactVersion(ctx, metabase.BeginObjectExactVersion{
ObjectStream: obj,
Encryption: storj.EncryptionParameters{
CipherSuite: storj.EncAESGCM,
BlockSize: 256,
},
ExpiresAt: expiresAt,
})
require.NoError(t, err)
rootPieceID := testrand.PieceID()
err = metabaseDB.BeginSegment(ctx, metabase.BeginSegment{
ObjectStream: obj,
RootPieceID: rootPieceID,
Pieces: pieces,
})
require.NoError(t, err)
err = metabaseDB.CommitSegment(ctx, metabase.CommitSegment{
ObjectStream: obj,
RootPieceID: rootPieceID,
Pieces: pieces,
EncryptedKey: testrand.Bytes(256),
EncryptedKeyNonce: testrand.Bytes(256),
PlainSize: 1,
EncryptedSize: 1,
Redundancy: rs,
ExpiresAt: expiresAt,
})
require.NoError(t, err)
_, err = metabaseDB.CommitObject(ctx, metabase.CommitObject{
ObjectStream: obj,
})
require.NoError(t, err)
return obj.StreamID
}
func BenchmarkRemoteSegment(b *testing.B) {
testplanet.Bench(b, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
}, func(b *testing.B, ctx *testcontext.Context, planet *testplanet.Planet) {
err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "testbucket", "object", testrand.Bytes(10*memory.KiB))
require.NoError(b, err)
observer := checker.NewCheckerObserver(planet.Satellites[0].Repair.Checker)
segments, err := planet.Satellites[0].Metabase.DB.TestingAllSegments(ctx)
require.NoError(b, err)
loopSegment := &segmentloop.Segment{
StreamID: segments[0].StreamID,
Position: segments[0].Position,
CreatedAt: segments[0].CreatedAt,
ExpiresAt: segments[0].ExpiresAt,
Redundancy: segments[0].Redundancy,
Pieces: segments[0].Pieces,
}
b.Run("healthy segment", func(b *testing.B) {
for i := 0; i < b.N; i++ {
err := observer.RemoteSegment(ctx, loopSegment)
if err != nil {
b.FailNow()
}
}
})
})
}

View File

@@ -1,184 +0,0 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package checker
import (
"fmt"
"github.com/spacemonkeygo/monkit/v3"
"storj.io/common/uuid"
)
// statsCollector holds a *stats for each redundancy scheme
// seen by the checker. These are chained into the monkit scope for
// monitoring as they are initialized.
type statsCollector struct {
stats map[string]*stats
}
func newStatsCollector() *statsCollector {
return &statsCollector{
stats: make(map[string]*stats),
}
}
func (collector *statsCollector) getStatsByRS(rs string) *stats {
stats, ok := collector.stats[rs]
if !ok {
stats = newStats(rs)
mon.Chain(stats)
collector.stats[rs] = stats
}
return stats
}
// collectAggregates transfers the iteration aggregates into the
// respective stats monkit metrics at the end of each checker iteration.
// iterationAggregates is then cleared.
func (collector *statsCollector) collectAggregates() {
for _, stats := range collector.stats {
stats.collectAggregates()
stats.iterationAggregates = new(aggregateStats)
}
}
// stats is used for collecting and reporting checker metrics.
//
// add any new metrics tagged with rs_scheme to this struct and set them
// in newStats.
type stats struct {
iterationAggregates *aggregateStats
objectsChecked *monkit.IntVal
remoteSegmentsChecked *monkit.IntVal
remoteSegmentsNeedingRepair *monkit.IntVal
newRemoteSegmentsNeedingRepair *monkit.IntVal
remoteSegmentsLost *monkit.IntVal
objectsLost *monkit.IntVal
remoteSegmentsFailedToCheck *monkit.IntVal
remoteSegmentsHealthyPercentage *monkit.FloatVal
// remoteSegmentsOverThreshold[0]=# of healthy=rt+1, remoteSegmentsOverThreshold[1]=# of healthy=rt+2, etc...
remoteSegmentsOverThreshold1 *monkit.IntVal
remoteSegmentsOverThreshold2 *monkit.IntVal
remoteSegmentsOverThreshold3 *monkit.IntVal
remoteSegmentsOverThreshold4 *monkit.IntVal
remoteSegmentsOverThreshold5 *monkit.IntVal
segmentsBelowMinReq *monkit.Counter
segmentTotalCount *monkit.IntVal
segmentHealthyCount *monkit.IntVal
segmentClumpedCount *monkit.IntVal
segmentAge *monkit.IntVal
segmentHealth *monkit.FloatVal
injuredSegmentHealth *monkit.FloatVal
segmentTimeUntilIrreparable *monkit.IntVal
}
// aggregateStats tallies data over the full checker iteration.
type aggregateStats struct {
objectsChecked int64
remoteSegmentsChecked int64
remoteSegmentsNeedingRepair int64
newRemoteSegmentsNeedingRepair int64
remoteSegmentsLost int64
remoteSegmentsFailedToCheck int64
objectsLost []uuid.UUID
// remoteSegmentsOverThreshold[0]=# of healthy=rt+1, remoteSegmentsOverThreshold[1]=# of healthy=rt+2, etc...
remoteSegmentsOverThreshold [5]int64
}
func (a *aggregateStats) combine(stats aggregateStats) {
a.objectsChecked += stats.objectsChecked
a.remoteSegmentsChecked += stats.remoteSegmentsChecked
a.remoteSegmentsNeedingRepair += stats.remoteSegmentsNeedingRepair
a.newRemoteSegmentsNeedingRepair += stats.newRemoteSegmentsNeedingRepair
a.remoteSegmentsLost += stats.remoteSegmentsLost
a.remoteSegmentsFailedToCheck += stats.remoteSegmentsFailedToCheck
a.objectsLost = append(a.objectsLost, stats.objectsLost...)
a.remoteSegmentsOverThreshold[0] += stats.remoteSegmentsOverThreshold[0]
a.remoteSegmentsOverThreshold[1] += stats.remoteSegmentsOverThreshold[1]
a.remoteSegmentsOverThreshold[2] += stats.remoteSegmentsOverThreshold[2]
a.remoteSegmentsOverThreshold[3] += stats.remoteSegmentsOverThreshold[3]
a.remoteSegmentsOverThreshold[4] += stats.remoteSegmentsOverThreshold[4]
}
func newStats(rs string) *stats {
return &stats{
iterationAggregates: new(aggregateStats),
objectsChecked: monkit.NewIntVal(monkit.NewSeriesKey("tagged_repair_stats").WithTag("name", "remote_objects_checked").WithTag("rs_scheme", rs)),
remoteSegmentsChecked: monkit.NewIntVal(monkit.NewSeriesKey("tagged_repair_stats").WithTag("name", "remote_segments_checked").WithTag("rs_scheme", rs)),
remoteSegmentsNeedingRepair: monkit.NewIntVal(monkit.NewSeriesKey("tagged_repair_stats").WithTag("name", "remote_segments_needing_repair").WithTag("rs_scheme", rs)),
newRemoteSegmentsNeedingRepair: monkit.NewIntVal(monkit.NewSeriesKey("tagged_repair_stats").WithTag("name", "new_remote_segments_needing_repair").WithTag("rs_scheme", rs)),
remoteSegmentsLost: monkit.NewIntVal(monkit.NewSeriesKey("tagged_repair_stats").WithTag("name", "remote_segments_lost").WithTag("rs_scheme", rs)),
objectsLost: monkit.NewIntVal(monkit.NewSeriesKey("tagged_repair_stats").WithTag("name", "objects_lost").WithTag("rs_scheme", rs)),
remoteSegmentsFailedToCheck: monkit.NewIntVal(monkit.NewSeriesKey("tagged_repair_stats").WithTag("name", "remote_segments_failed_to_check").WithTag("rs_scheme", rs)),
remoteSegmentsHealthyPercentage: monkit.NewFloatVal(monkit.NewSeriesKey("tagged_repair_stats").WithTag("name", "remote_segments_healthy_percentage").WithTag("rs_scheme", rs)),
remoteSegmentsOverThreshold1: monkit.NewIntVal(monkit.NewSeriesKey("tagged_repair_stats").WithTag("name", "remote_segments_over_threshold_1").WithTag("rs_scheme", rs)),
remoteSegmentsOverThreshold2: monkit.NewIntVal(monkit.NewSeriesKey("tagged_repair_stats").WithTag("name", "remote_segments_over_threshold_2").WithTag("rs_scheme", rs)),
remoteSegmentsOverThreshold3: monkit.NewIntVal(monkit.NewSeriesKey("tagged_repair_stats").WithTag("name", "remote_segments_over_threshold_3").WithTag("rs_scheme", rs)),
remoteSegmentsOverThreshold4: monkit.NewIntVal(monkit.NewSeriesKey("tagged_repair_stats").WithTag("name", "remote_segments_over_threshold_4").WithTag("rs_scheme", rs)),
remoteSegmentsOverThreshold5: monkit.NewIntVal(monkit.NewSeriesKey("tagged_repair_stats").WithTag("name", "remote_segments_over_threshold_5").WithTag("rs_scheme", rs)),
segmentsBelowMinReq: monkit.NewCounter(monkit.NewSeriesKey("tagged_repair_stats").WithTag("name", "checker_segments_below_min_req").WithTag("rs_scheme", rs)),
segmentTotalCount: monkit.NewIntVal(monkit.NewSeriesKey("tagged_repair_stats").WithTag("name", "checker_segment_total_count").WithTag("rs_scheme", rs)),
segmentHealthyCount: monkit.NewIntVal(monkit.NewSeriesKey("tagged_repair_stats").WithTag("name", "checker_segment_healthy_count").WithTag("rs_scheme", rs)),
segmentClumpedCount: monkit.NewIntVal(monkit.NewSeriesKey("tagged_repair_stats").WithTag("name", "checker_segment_clumped_count").WithTag("rs_scheme", rs)),
segmentAge: monkit.NewIntVal(monkit.NewSeriesKey("tagged_repair_stats").WithTag("name", "checker_segment_age").WithTag("rs_scheme", rs)),
segmentHealth: monkit.NewFloatVal(monkit.NewSeriesKey("tagged_repair_stats").WithTag("name", "checker_segment_health").WithTag("rs_scheme", rs)),
injuredSegmentHealth: monkit.NewFloatVal(monkit.NewSeriesKey("tagged_repair_stats").WithTag("name", "checker_injured_segment_health").WithTag("rs_scheme", rs)),
segmentTimeUntilIrreparable: monkit.NewIntVal(monkit.NewSeriesKey("tagged_repair_stats").WithTag("name", "checker_segment_time_until_irreparable").WithTag("rs_scheme", rs)),
}
}
func (stats *stats) collectAggregates() {
stats.objectsChecked.Observe(stats.iterationAggregates.objectsChecked)
stats.remoteSegmentsChecked.Observe(stats.iterationAggregates.remoteSegmentsChecked)
stats.remoteSegmentsNeedingRepair.Observe(stats.iterationAggregates.remoteSegmentsNeedingRepair)
stats.newRemoteSegmentsNeedingRepair.Observe(stats.iterationAggregates.newRemoteSegmentsNeedingRepair)
stats.remoteSegmentsLost.Observe(stats.iterationAggregates.remoteSegmentsLost)
stats.objectsLost.Observe(int64(len(stats.iterationAggregates.objectsLost)))
stats.remoteSegmentsFailedToCheck.Observe(stats.iterationAggregates.remoteSegmentsFailedToCheck)
stats.remoteSegmentsOverThreshold1.Observe(stats.iterationAggregates.remoteSegmentsOverThreshold[0])
stats.remoteSegmentsOverThreshold2.Observe(stats.iterationAggregates.remoteSegmentsOverThreshold[1])
stats.remoteSegmentsOverThreshold3.Observe(stats.iterationAggregates.remoteSegmentsOverThreshold[2])
stats.remoteSegmentsOverThreshold4.Observe(stats.iterationAggregates.remoteSegmentsOverThreshold[3])
stats.remoteSegmentsOverThreshold5.Observe(stats.iterationAggregates.remoteSegmentsOverThreshold[4])
allUnhealthy := stats.iterationAggregates.remoteSegmentsNeedingRepair + stats.iterationAggregates.remoteSegmentsFailedToCheck
allChecked := stats.iterationAggregates.remoteSegmentsChecked
allHealthy := allChecked - allUnhealthy
stats.remoteSegmentsHealthyPercentage.Observe(100 * float64(allHealthy) / float64(allChecked))
}
// Stats implements the monkit.StatSource interface.
func (stats *stats) Stats(cb func(key monkit.SeriesKey, field string, val float64)) {
stats.objectsChecked.Stats(cb)
stats.remoteSegmentsChecked.Stats(cb)
stats.remoteSegmentsNeedingRepair.Stats(cb)
stats.newRemoteSegmentsNeedingRepair.Stats(cb)
stats.remoteSegmentsLost.Stats(cb)
stats.objectsLost.Stats(cb)
stats.remoteSegmentsFailedToCheck.Stats(cb)
stats.remoteSegmentsOverThreshold1.Stats(cb)
stats.remoteSegmentsOverThreshold2.Stats(cb)
stats.remoteSegmentsOverThreshold3.Stats(cb)
stats.remoteSegmentsOverThreshold4.Stats(cb)
stats.remoteSegmentsOverThreshold5.Stats(cb)
stats.remoteSegmentsHealthyPercentage.Stats(cb)
stats.segmentsBelowMinReq.Stats(cb)
stats.segmentTotalCount.Stats(cb)
stats.segmentHealthyCount.Stats(cb)
stats.segmentAge.Stats(cb)
stats.segmentHealth.Stats(cb)
stats.injuredSegmentHealth.Stats(cb)
stats.segmentTimeUntilIrreparable.Stats(cb)
}
func getRSString(min, repair, success, total int) string {
return fmt.Sprintf("%d/%d/%d/%d", min, repair, success, total)
}

View File

@@ -0,0 +1,15 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package checker
import (
"github.com/spacemonkeygo/monkit/v3"
"github.com/zeebo/errs"
)
var (
// Error is a standard error class for this package.
Error = errs.Class("repair checker")
mon = monkit.Package()
)

View File

@@ -13,6 +13,7 @@ import (
"github.com/zeebo/errs"
"go.uber.org/zap"
"golang.org/x/exp/slices"
"storj.io/common/storj"
"storj.io/common/uuid"
@@ -23,15 +24,17 @@ import (
"storj.io/storj/satellite/repair/queue"
)
var _ rangedloop.Observer = (*RangedLoopObserver)(nil)
var _ rangedloop.Observer = (*Observer)(nil)
var _ rangedloop.Partial = (*observerFork)(nil)
// RangedLoopObserver implements the ranged loop Observer interface. Should be renamed to checkerObserver after rangedloop will replace segmentloop.
// Observer implements the ranged loop Observer interface.
//
// architecture: Observer
type RangedLoopObserver struct {
type Observer struct {
logger *zap.Logger
repairQueue queue.RepairQueue
nodestate *ReliabilityCache
overlayService *overlay.Service
repairOverrides RepairOverridesMap
nodeFailureRate float64
repairQueueBatchSize int
@@ -44,13 +47,14 @@ type RangedLoopObserver struct {
statsCollector map[string]*observerRSStats
}
// NewRangedLoopObserver creates new checker observer instance.
func NewRangedLoopObserver(logger *zap.Logger, repairQueue queue.RepairQueue, overlay *overlay.Service, config Config) rangedloop.Observer {
return &RangedLoopObserver{
// NewObserver creates new checker observer instance.
func NewObserver(logger *zap.Logger, repairQueue queue.RepairQueue, overlay *overlay.Service, config Config) *Observer {
return &Observer{
logger: logger,
repairQueue: repairQueue,
nodestate: NewReliabilityCache(overlay, config.ReliabilityCacheStaleness),
overlayService: overlay,
repairOverrides: config.RepairOverrides.GetMap(),
nodeFailureRate: config.NodeFailureRate,
repairQueueBatchSize: config.RepairQueueInsertBatchSize,
@@ -63,7 +67,7 @@ func NewRangedLoopObserver(logger *zap.Logger, repairQueue queue.RepairQueue, ov
//
// We can't calculate this upon first starting a Ranged Loop Observer, because there may not be any
// nodes yet. We expect that there will be nodes before there are segments, though.
func (observer *RangedLoopObserver) getNodesEstimate(ctx context.Context) (int, error) {
func (observer *Observer) getNodesEstimate(ctx context.Context) (int, error) {
// this should be safe to call frequently; it is an efficient caching lookup.
totalNumNodes, err := observer.nodestate.NumNodes(ctx)
if err != nil {
@@ -82,12 +86,12 @@ func (observer *RangedLoopObserver) getNodesEstimate(ctx context.Context) (int,
return totalNumNodes, nil
}
func (observer *RangedLoopObserver) createInsertBuffer() *queue.InsertBuffer {
func (observer *Observer) createInsertBuffer() *queue.InsertBuffer {
return queue.NewInsertBuffer(observer.repairQueue, observer.repairQueueBatchSize)
}
// TestingCompareInjuredSegmentIDs compares stream id of injured segment.
func (observer *RangedLoopObserver) TestingCompareInjuredSegmentIDs(ctx context.Context, streamIDs []uuid.UUID) error {
func (observer *Observer) TestingCompareInjuredSegmentIDs(ctx context.Context, streamIDs []uuid.UUID) error {
injuredSegments, err := observer.repairQueue.SelectN(ctx, 100)
if err != nil {
return err
@@ -114,7 +118,7 @@ func (observer *RangedLoopObserver) TestingCompareInjuredSegmentIDs(ctx context.
}
// Start starts parallel segments loop.
func (observer *RangedLoopObserver) Start(ctx context.Context, startTime time.Time) (err error) {
func (observer *Observer) Start(ctx context.Context, startTime time.Time) (err error) {
defer mon.Task()(&ctx)(&err)
observer.startTime = startTime
@@ -124,18 +128,18 @@ func (observer *RangedLoopObserver) Start(ctx context.Context, startTime time.Ti
}
// Fork creates a Partial to process a chunk of all the segments.
func (observer *RangedLoopObserver) Fork(ctx context.Context) (_ rangedloop.Partial, err error) {
func (observer *Observer) Fork(ctx context.Context) (_ rangedloop.Partial, err error) {
defer mon.Task()(&ctx)(&err)
return newRangedLoopCheckerPartial(observer), nil
return newObserverFork(observer), nil
}
// Join is called after the chunk for Partial is done.
// This gives the opportunity to merge the output like in a reduce step.
func (observer *RangedLoopObserver) Join(ctx context.Context, partial rangedloop.Partial) (err error) {
func (observer *Observer) Join(ctx context.Context, partial rangedloop.Partial) (err error) {
defer mon.Task()(&ctx)(&err)
repPartial, ok := partial.(*repairPartial)
repPartial, ok := partial.(*observerFork)
if !ok {
return Error.New("expected partial type %T but got %T", repPartial, partial)
}
@@ -154,7 +158,7 @@ func (observer *RangedLoopObserver) Join(ctx context.Context, partial rangedloop
}
// Finish is called after all segments are processed by all observers.
func (observer *RangedLoopObserver) Finish(ctx context.Context) (err error) {
func (observer *Observer) Finish(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
// remove all segments which were not seen as unhealthy by this checker iteration
@@ -186,13 +190,13 @@ func (observer *RangedLoopObserver) Finish(ctx context.Context) (err error) {
return nil
}
func (observer *RangedLoopObserver) collectAggregates() {
func (observer *Observer) collectAggregates() {
for _, stats := range observer.statsCollector {
stats.collectAggregates()
}
}
func (observer *RangedLoopObserver) getObserverStats(rsString string) *observerRSStats {
func (observer *Observer) getObserverStats(rsString string) *observerRSStats {
observer.mu.Lock()
defer observer.mu.Unlock()
@@ -207,16 +211,15 @@ func (observer *RangedLoopObserver) getObserverStats(rsString string) *observerR
}
// RefreshReliabilityCache forces refreshing node online status cache.
func (observer *RangedLoopObserver) RefreshReliabilityCache(ctx context.Context) error {
func (observer *Observer) RefreshReliabilityCache(ctx context.Context) error {
return observer.nodestate.Refresh(ctx)
}
// repairPartial implements the ranged loop Partial interface.
//
// architecture: Observer
type repairPartial struct {
// observerFork implements the ranged loop Partial interface.
type observerFork struct {
repairQueue *queue.InsertBuffer
nodestate *ReliabilityCache
overlayService *overlay.Service
rsStats map[string]*partialRSStats
repairOverrides RepairOverridesMap
nodeFailureRate float64
@@ -228,12 +231,13 @@ type repairPartial struct {
getObserverStats func(string) *observerRSStats
}
// newRangedLoopCheckerPartial creates new checker partial instance.
func newRangedLoopCheckerPartial(observer *RangedLoopObserver) rangedloop.Partial {
// newObserverFork creates new observer partial instance.
func newObserverFork(observer *Observer) rangedloop.Partial {
// we can only share thread-safe objects.
return &repairPartial{
return &observerFork{
repairQueue: observer.createInsertBuffer(),
nodestate: observer.nodestate,
overlayService: observer.overlayService,
rsStats: make(map[string]*partialRSStats),
repairOverrides: observer.repairOverrides,
nodeFailureRate: observer.nodeFailureRate,
@@ -243,27 +247,27 @@ func newRangedLoopCheckerPartial(observer *RangedLoopObserver) rangedloop.Partia
}
}
func (rp *repairPartial) getStatsByRS(redundancy storj.RedundancyScheme) *partialRSStats {
rsString := getRSString(rp.loadRedundancy(redundancy))
func (fork *observerFork) getStatsByRS(redundancy storj.RedundancyScheme) *partialRSStats {
rsString := getRSString(fork.loadRedundancy(redundancy))
stats, ok := rp.rsStats[rsString]
stats, ok := fork.rsStats[rsString]
if !ok {
observerStats := rp.getObserverStats(rsString)
observerStats := fork.getObserverStats(rsString)
rp.rsStats[rsString] = &partialRSStats{
fork.rsStats[rsString] = &partialRSStats{
iterationAggregates: aggregateStats{},
segmentStats: observerStats.segmentStats,
}
return rp.rsStats[rsString]
return fork.rsStats[rsString]
}
return stats
}
func (rp *repairPartial) loadRedundancy(redundancy storj.RedundancyScheme) (int, int, int, int) {
func (fork *observerFork) loadRedundancy(redundancy storj.RedundancyScheme) (int, int, int, int) {
repair := int(redundancy.RepairShares)
overrideValue := rp.repairOverrides.GetOverrideValue(redundancy)
overrideValue := fork.repairOverrides.GetOverrideValue(redundancy)
if overrideValue != 0 {
repair = int(overrideValue)
}
@@ -272,9 +276,9 @@ func (rp *repairPartial) loadRedundancy(redundancy storj.RedundancyScheme) (int,
}
// Process repair implementation of partial's Process.
func (rp *repairPartial) Process(ctx context.Context, segments []segmentloop.Segment) (err error) {
func (fork *observerFork) Process(ctx context.Context, segments []segmentloop.Segment) (err error) {
for _, segment := range segments {
if err := rp.process(ctx, &segment); err != nil {
if err := fork.process(ctx, &segment); err != nil {
return err
}
}
@@ -282,11 +286,11 @@ func (rp *repairPartial) Process(ctx context.Context, segments []segmentloop.Seg
return nil
}
func (rp *repairPartial) process(ctx context.Context, segment *segmentloop.Segment) (err error) {
func (fork *observerFork) process(ctx context.Context, segment *segmentloop.Segment) (err error) {
if segment.Inline() {
if rp.lastStreamID.Compare(segment.StreamID) != 0 {
rp.lastStreamID = segment.StreamID
rp.totalStats.objectsChecked++
if fork.lastStreamID.Compare(segment.StreamID) != 0 {
fork.lastStreamID = segment.StreamID
fork.totalStats.objectsChecked++
}
return nil
@@ -297,49 +301,65 @@ func (rp *repairPartial) process(ctx context.Context, segment *segmentloop.Segme
return nil
}
stats := rp.getStatsByRS(segment.Redundancy)
if rp.lastStreamID.Compare(segment.StreamID) != 0 {
rp.lastStreamID = segment.StreamID
stats := fork.getStatsByRS(segment.Redundancy)
if fork.lastStreamID.Compare(segment.StreamID) != 0 {
fork.lastStreamID = segment.StreamID
stats.iterationAggregates.objectsChecked++
rp.totalStats.objectsChecked++
fork.totalStats.objectsChecked++
}
rp.totalStats.remoteSegmentsChecked++
fork.totalStats.remoteSegmentsChecked++
stats.iterationAggregates.remoteSegmentsChecked++
// ensure we get values, even if only zero values, so that redash can have an alert based on this
mon.Counter("checker_segments_below_min_req").Inc(0) //mon:locked
pieces := segment.Pieces
if len(pieces) == 0 {
rp.log.Debug("no pieces on remote segment")
fork.log.Debug("no pieces on remote segment")
return nil
}
totalNumNodes, err := rp.getNodesEstimate(ctx)
totalNumNodes, err := fork.getNodesEstimate(ctx)
if err != nil {
return Error.New("could not get estimate of total number of nodes: %w", err)
}
missingPieces, err := rp.nodestate.MissingPieces(ctx, segment.CreatedAt, segment.Pieces)
missingPieces, err := fork.nodestate.MissingPieces(ctx, segment.CreatedAt, segment.Pieces)
if err != nil {
rp.totalStats.remoteSegmentsFailedToCheck++
fork.totalStats.remoteSegmentsFailedToCheck++
stats.iterationAggregates.remoteSegmentsFailedToCheck++
return Error.New("error getting missing pieces: %w", err)
}
numHealthy := len(pieces) - len(missingPieces)
// if multiple pieces are on the same last_net, keep only the first one. The rest are
// to be considered retrievable but unhealthy.
nodeIDs := make([]storj.NodeID, len(pieces))
for i, p := range pieces {
nodeIDs[i] = p.StorageNode
}
lastNets, err := fork.overlayService.GetNodesNetworkInOrder(ctx, nodeIDs)
if err != nil {
fork.totalStats.remoteSegmentsFailedToCheck++
stats.iterationAggregates.remoteSegmentsFailedToCheck++
return errs.Combine(Error.New("error determining node last_nets"), err)
}
clumpedPieces := repair.FindClumpedPieces(segment.Pieces, lastNets)
numHealthy := len(pieces) - len(missingPieces) - len(clumpedPieces)
mon.IntVal("checker_segment_total_count").Observe(int64(len(pieces))) //mon:locked
stats.segmentStats.segmentTotalCount.Observe(int64(len(pieces)))
mon.IntVal("checker_segment_healthy_count").Observe(int64(numHealthy)) //mon:locked
stats.segmentStats.segmentHealthyCount.Observe(int64(numHealthy))
mon.IntVal("checker_segment_clumped_count").Observe(int64(len(clumpedPieces))) //mon:locked
stats.segmentStats.segmentClumpedCount.Observe(int64(len(clumpedPieces)))
segmentAge := time.Since(segment.CreatedAt)
mon.IntVal("checker_segment_age").Observe(int64(segmentAge.Seconds())) //mon:locked
stats.segmentStats.segmentAge.Observe(int64(segmentAge.Seconds()))
required, repairThreshold, successThreshold, _ := rp.loadRedundancy(segment.Redundancy)
segmentHealth := repair.SegmentHealth(numHealthy, required, totalNumNodes, rp.nodeFailureRate)
required, repairThreshold, successThreshold, _ := fork.loadRedundancy(segment.Redundancy)
segmentHealth := repair.SegmentHealth(numHealthy, required, totalNumNodes, fork.nodeFailureRate)
mon.FloatVal("checker_segment_health").Observe(segmentHealth) //mon:locked
stats.segmentStats.segmentHealth.Observe(segmentHealth)
@@ -349,9 +369,9 @@ func (rp *repairPartial) process(ctx context.Context, segment *segmentloop.Segme
if numHealthy <= repairThreshold && numHealthy < successThreshold {
mon.FloatVal("checker_injured_segment_health").Observe(segmentHealth) //mon:locked
stats.segmentStats.injuredSegmentHealth.Observe(segmentHealth)
rp.totalStats.remoteSegmentsNeedingRepair++
fork.totalStats.remoteSegmentsNeedingRepair++
stats.iterationAggregates.remoteSegmentsNeedingRepair++
err := rp.repairQueue.Insert(ctx, &queue.InjuredSegment{
err := fork.repairQueue.Insert(ctx, &queue.InjuredSegment{
StreamID: segment.StreamID,
Position: segment.Position,
UpdatedAt: time.Now().UTC(),
@@ -359,21 +379,21 @@ func (rp *repairPartial) process(ctx context.Context, segment *segmentloop.Segme
}, func() {
// Counters are increased after the queue has determined
// that the segment wasn't already queued for repair.
rp.totalStats.newRemoteSegmentsNeedingRepair++
fork.totalStats.newRemoteSegmentsNeedingRepair++
stats.iterationAggregates.newRemoteSegmentsNeedingRepair++
})
if err != nil {
rp.log.Error("error adding injured segment to queue", zap.Error(err))
fork.log.Error("error adding injured segment to queue", zap.Error(err))
return nil
}
// monitor irreparable segments
if numHealthy < required {
if !containsStreamID(rp.totalStats.objectsLost, segment.StreamID) {
rp.totalStats.objectsLost = append(rp.totalStats.objectsLost, segment.StreamID)
if !slices.Contains(fork.totalStats.objectsLost, segment.StreamID) {
fork.totalStats.objectsLost = append(fork.totalStats.objectsLost, segment.StreamID)
}
if !containsStreamID(stats.iterationAggregates.objectsLost, segment.StreamID) {
if !slices.Contains(stats.iterationAggregates.objectsLost, segment.StreamID) {
stats.iterationAggregates.objectsLost = append(stats.iterationAggregates.objectsLost, segment.StreamID)
}
@ -392,7 +412,7 @@ func (rp *repairPartial) process(ctx context.Context, segment *segmentloop.Segme
mon.IntVal("checker_segment_time_until_irreparable").Observe(int64(segmentAge.Seconds())) //mon:locked
stats.segmentStats.segmentTimeUntilIrreparable.Observe(int64(segmentAge.Seconds()))
rp.totalStats.remoteSegmentsLost++
fork.totalStats.remoteSegmentsLost++
stats.iterationAggregates.remoteSegmentsLost++
mon.Counter("checker_segments_below_min_req").Inc(1) //mon:locked
@ -402,16 +422,16 @@ func (rp *repairPartial) process(ctx context.Context, segment *segmentloop.Segme
for _, p := range missingPieces {
unhealthyNodes = append(unhealthyNodes, p.StorageNode.String())
}
rp.log.Warn("checker found irreparable segment", zap.String("Segment StreamID", segment.StreamID.String()), zap.Int("Segment Position",
fork.log.Warn("checker found irreparable segment", zap.String("Segment StreamID", segment.StreamID.String()), zap.Int("Segment Position",
int(segment.Position.Encode())), zap.Int("total pieces", len(pieces)), zap.Int("min required", required), zap.String("unhealthy node IDs", strings.Join(unhealthyNodes, ",")))
}
} else {
if numHealthy > repairThreshold && numHealthy <= (repairThreshold+len(rp.totalStats.remoteSegmentsOverThreshold)) {
if numHealthy > repairThreshold && numHealthy <= (repairThreshold+len(fork.totalStats.remoteSegmentsOverThreshold)) {
// record metrics for segments right above repair threshold
// numHealthy=repairThreshold+1 through numHealthy=repairThreshold+5
for i := range rp.totalStats.remoteSegmentsOverThreshold {
for i := range fork.totalStats.remoteSegmentsOverThreshold {
if numHealthy == (repairThreshold + i + 1) {
rp.totalStats.remoteSegmentsOverThreshold[i]++
fork.totalStats.remoteSegmentsOverThreshold[i]++
break
}
}
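Condensing the branching above: a segment goes onto the repair queue when numHealthy is at or below the repair threshold and below the success threshold, and within that branch it is additionally counted as lost (irreparable) when numHealthy drops below the required count; it is still queued either way. A hedged restatement of just that predicate logic, with hypothetical helper names:
// needsRepair and isIrreparable restate the threshold checks above.
// An irreparable segment still satisfies needsRepair; it is merely
// counted as lost in addition. Illustration only.
func needsRepair(numHealthy, repairThreshold, successThreshold int) bool {
	return numHealthy <= repairThreshold && numHealthy < successThreshold
}

func isIrreparable(numHealthy, required int) bool {
	return numHealthy < required
}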


@ -4,9 +4,11 @@
package checker_test
import (
"context"
"fmt"
"reflect"
"sort"
"strconv"
"testing"
"time"
@ -22,6 +24,7 @@ import (
"storj.io/storj/satellite"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/metabase/rangedloop"
"storj.io/storj/satellite/metabase/segmentloop"
"storj.io/storj/satellite/repair/checker"
"storj.io/storj/satellite/repair/queue"
)
@ -261,7 +264,7 @@ func TestCleanRepairQueueObserver(t *testing.T) {
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
rangedLoopService := planet.Satellites[0].RangedLoop.RangedLoop.Service
repairQueue := planet.Satellites[0].DB.RepairQueue()
observer := planet.Satellites[0].RangedLoop.Repair.Observer.(*checker.RangedLoopObserver)
observer := planet.Satellites[0].RangedLoop.Repair.Observer
planet.Satellites[0].Repair.Repairer.Loop.Pause()
rs := storj.RedundancyScheme{
@ -459,3 +462,120 @@ func TestRepairObserver(t *testing.T) {
}
})
}
func createPieces(planet *testplanet.Planet, rs storj.RedundancyScheme) metabase.Pieces {
pieces := make(metabase.Pieces, rs.OptimalShares)
for i := range pieces {
pieces[i] = metabase.Piece{
Number: uint16(i),
StorageNode: planet.StorageNodes[i].Identity.ID,
}
}
return pieces
}
func createLostPieces(planet *testplanet.Planet, rs storj.RedundancyScheme) metabase.Pieces {
pieces := make(metabase.Pieces, rs.OptimalShares)
for i := range pieces[:rs.RequiredShares] {
pieces[i] = metabase.Piece{
Number: uint16(i),
StorageNode: planet.StorageNodes[i].Identity.ID,
}
}
for i := rs.RequiredShares; i < rs.OptimalShares; i++ {
pieces[i] = metabase.Piece{
Number: uint16(i),
StorageNode: storj.NodeID{byte(0xFF)},
}
}
return pieces
}
func insertSegment(ctx context.Context, t *testing.T, planet *testplanet.Planet, rs storj.RedundancyScheme, location metabase.SegmentLocation, pieces metabase.Pieces, expiresAt *time.Time) uuid.UUID {
metabaseDB := planet.Satellites[0].Metabase.DB
obj := metabase.ObjectStream{
ProjectID: location.ProjectID,
BucketName: location.BucketName,
ObjectKey: location.ObjectKey,
Version: 1,
StreamID: testrand.UUID(),
}
_, err := metabaseDB.BeginObjectExactVersion(ctx, metabase.BeginObjectExactVersion{
ObjectStream: obj,
Encryption: storj.EncryptionParameters{
CipherSuite: storj.EncAESGCM,
BlockSize: 256,
},
ExpiresAt: expiresAt,
})
require.NoError(t, err)
rootPieceID := testrand.PieceID()
err = metabaseDB.BeginSegment(ctx, metabase.BeginSegment{
ObjectStream: obj,
RootPieceID: rootPieceID,
Pieces: pieces,
})
require.NoError(t, err)
err = metabaseDB.CommitSegment(ctx, metabase.CommitSegment{
ObjectStream: obj,
RootPieceID: rootPieceID,
Pieces: pieces,
EncryptedKey: testrand.Bytes(256),
EncryptedKeyNonce: testrand.Bytes(256),
PlainSize: 1,
EncryptedSize: 1,
Redundancy: rs,
ExpiresAt: expiresAt,
})
require.NoError(t, err)
_, err = metabaseDB.CommitObject(ctx, metabase.CommitObject{
ObjectStream: obj,
})
require.NoError(t, err)
return obj.StreamID
}
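These helpers let repair tests commit objects whose segments point either at live storage nodes (createPieces) or at a mix of live and nonexistent nodes (createLostPieces), optionally with an expiration time. A hypothetical call site inside a testplanet test body might look like the fragment below; the redundancy numbers and object location are made up, and ctx, t, and planet are assumed to be in scope:
// illustrative fragment; values are arbitrary for the sketch
rs := storj.RedundancyScheme{
	RequiredShares: 2,
	RepairShares:   3,
	OptimalShares:  4,
	TotalShares:    4,
	ShareSize:      256,
}
location := metabase.SegmentLocation{
	ProjectID:  planet.Uplinks[0].Projects[0].ID,
	BucketName: "testbucket",
	ObjectKey:  metabase.ObjectKey("some/object/key"),
}
// commit a segment the checker should flag as needing repair
streamID := insertSegment(ctx, t, planet, rs, location, createLostPieces(planet, rs), nil)
_ = streamID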
func BenchmarkRemoteSegment(b *testing.B) {
testplanet.Bench(b, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
}, func(b *testing.B, ctx *testcontext.Context, planet *testplanet.Planet) {
for i := 0; i < 10; i++ {
err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "testbucket", "object"+strconv.Itoa(i), testrand.Bytes(10*memory.KiB))
require.NoError(b, err)
}
observer := checker.NewObserver(zap.NewNop(), planet.Satellites[0].DB.RepairQueue(),
planet.Satellites[0].Auditor.Overlay, planet.Satellites[0].Config.Checker)
segments, err := planet.Satellites[0].Metabase.DB.TestingAllSegments(ctx)
require.NoError(b, err)
loopSegments := []segmentloop.Segment{}
for _, segment := range segments {
loopSegments = append(loopSegments, segmentloop.Segment{
StreamID: segment.StreamID,
Position: segment.Position,
CreatedAt: segment.CreatedAt,
ExpiresAt: segment.ExpiresAt,
Redundancy: segment.Redundancy,
Pieces: segment.Pieces,
})
}
fork, err := observer.Fork(ctx)
require.NoError(b, err)
b.Run("healthy segment", func(b *testing.B) {
for i := 0; i < b.N; i++ {
_ = fork.Process(ctx, loopSegments)
}
})
})
}
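As a usage note, the benchmark should be runnable on its own with the standard Go tooling from the repository root, assuming the satellite test database environment is configured as for the other integration tests:
	go test -run '^$' -bench BenchmarkRemoteSegment ./satellite/repair/checker/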


@ -3,7 +3,13 @@
package checker
import "github.com/spacemonkeygo/monkit/v3"
import (
"fmt"
"github.com/spacemonkeygo/monkit/v3"
"storj.io/common/uuid"
)
type observerRSStats struct {
// iterationAggregates contains the aggregated counters across all partials.
@ -96,6 +102,7 @@ type segmentRSStats struct {
segmentsBelowMinReq *monkit.Counter
segmentTotalCount *monkit.IntVal
segmentHealthyCount *monkit.IntVal
segmentClumpedCount *monkit.IntVal
segmentAge *monkit.IntVal
segmentHealth *monkit.FloatVal
injuredSegmentHealth *monkit.FloatVal
@ -107,6 +114,7 @@ func newSegmentRSStats(rs string) *segmentRSStats {
segmentsBelowMinReq: monkit.NewCounter(monkit.NewSeriesKey("tagged_repair_stats").WithTag("name", "checker_segments_below_min_req").WithTag("rs_scheme", rs)),
segmentTotalCount: monkit.NewIntVal(monkit.NewSeriesKey("tagged_repair_stats").WithTag("name", "checker_segment_total_count").WithTag("rs_scheme", rs)),
segmentHealthyCount: monkit.NewIntVal(monkit.NewSeriesKey("tagged_repair_stats").WithTag("name", "checker_segment_healthy_count").WithTag("rs_scheme", rs)),
segmentClumpedCount: monkit.NewIntVal(monkit.NewSeriesKey("tagged_repair_stats").WithTag("name", "checker_segment_clumped_count").WithTag("rs_scheme", rs)),
segmentAge: monkit.NewIntVal(monkit.NewSeriesKey("tagged_repair_stats").WithTag("name", "checker_segment_age").WithTag("rs_scheme", rs)),
segmentHealth: monkit.NewFloatVal(monkit.NewSeriesKey("tagged_repair_stats").WithTag("name", "checker_segment_health").WithTag("rs_scheme", rs)),
injuredSegmentHealth: monkit.NewFloatVal(monkit.NewSeriesKey("tagged_repair_stats").WithTag("name", "checker_injured_segment_health").WithTag("rs_scheme", rs)),
@ -137,3 +145,37 @@ func (stats *observerRSStats) collectAggregates() {
// resetting iteration aggregates after loop run finished
stats.iterationAggregates = aggregateStats{}
}
// aggregateStats tallies data over the full checker iteration.
type aggregateStats struct {
objectsChecked int64
remoteSegmentsChecked int64
remoteSegmentsNeedingRepair int64
newRemoteSegmentsNeedingRepair int64
remoteSegmentsLost int64
remoteSegmentsFailedToCheck int64
objectsLost []uuid.UUID
// remoteSegmentsOverThreshold[0]=# of healthy=rt+1, remoteSegmentsOverThreshold[1]=# of healthy=rt+2, etc...
remoteSegmentsOverThreshold [5]int64
}
func (a *aggregateStats) combine(stats aggregateStats) {
a.objectsChecked += stats.objectsChecked
a.remoteSegmentsChecked += stats.remoteSegmentsChecked
a.remoteSegmentsNeedingRepair += stats.remoteSegmentsNeedingRepair
a.newRemoteSegmentsNeedingRepair += stats.newRemoteSegmentsNeedingRepair
a.remoteSegmentsLost += stats.remoteSegmentsLost
a.remoteSegmentsFailedToCheck += stats.remoteSegmentsFailedToCheck
a.objectsLost = append(a.objectsLost, stats.objectsLost...)
a.remoteSegmentsOverThreshold[0] += stats.remoteSegmentsOverThreshold[0]
a.remoteSegmentsOverThreshold[1] += stats.remoteSegmentsOverThreshold[1]
a.remoteSegmentsOverThreshold[2] += stats.remoteSegmentsOverThreshold[2]
a.remoteSegmentsOverThreshold[3] += stats.remoteSegmentsOverThreshold[3]
a.remoteSegmentsOverThreshold[4] += stats.remoteSegmentsOverThreshold[4]
}
func getRSString(min, repair, success, total int) string {
return fmt.Sprintf("%d/%d/%d/%d", min, repair, success, total)
}
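aggregateStats is the per-fork tally: each ranged-loop fork accumulates its own counts, and combine is presumably how the observer folds them back together when forks join, while getRSString is presumably the per-redundancy-scheme tag used to key the stats maps. A small example of the intended use, with hypothetical values; it is only valid inside the checker package since the types are unexported:
// hypothetical: fold two fork tallies into one total
var total aggregateStats
forkA := aggregateStats{remoteSegmentsChecked: 10, remoteSegmentsNeedingRepair: 2}
forkB := aggregateStats{remoteSegmentsChecked: 7, remoteSegmentsLost: 1}
total.combine(forkA)
total.combine(forkB)
// total.remoteSegmentsChecked == 17, remoteSegmentsNeedingRepair == 2, remoteSegmentsLost == 1

// getRSString formats the scheme numbers as a tag, e.g. "29/35/80/110"
_ = getRSString(29, 35, 80, 110)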


@ -85,7 +85,7 @@ func testDataRepair(t *testing.T, inMemoryRepair bool, hashAlgo pb.PieceHashAlgo
// stop audit to prevent possible interactions i.e. repair timeout problems
satellite.Audit.Worker.Loop.Pause()
satellite.Repair.Checker.Loop.Pause()
satellite.RangedLoop.RangedLoop.Service.Loop.Stop()
satellite.Repair.Repairer.Loop.Pause()
for _, storageNode := range planet.StorageNodes {
@ -150,9 +150,8 @@ func testDataRepair(t *testing.T, inMemoryRepair bool, hashAlgo pb.PieceHashAlgo
}
}
satellite.Repair.Checker.Loop.Restart()
satellite.Repair.Checker.Loop.TriggerWait()
satellite.Repair.Checker.Loop.Pause()
_, err = satellite.RangedLoop.RangedLoop.Service.RunOnce(ctx)
require.NoError(t, err)
satellite.Repair.Repairer.Loop.Restart()
satellite.Repair.Repairer.Loop.TriggerWait()
satellite.Repair.Repairer.Loop.Pause()
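This is the pattern that replaces the old checker-loop Restart/TriggerWait/Pause dance throughout the tests below: a single RunOnce of the shared ranged-loop service runs the repair observer (along with any other registered observers) over all segments and fills the repair queue. If a test wanted to avoid repeating the two lines, a tiny hypothetical helper, not part of this change, would do (assuming the context, testing, require, and testplanet imports):
// runRepairChecker performs one ranged-loop pass so the repair observer
// can populate the repair queue. Hypothetical test helper, shown only to
// summarize the pattern used throughout these tests.
func runRepairChecker(ctx context.Context, t *testing.T, satellite *testplanet.Satellite) {
	_, err := satellite.RangedLoop.RangedLoop.Service.RunOnce(ctx)
	require.NoError(t, err)
}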
@ -243,7 +242,7 @@ func TestDataRepairPendingObject(t *testing.T) {
// stop audit to prevent possible interactions i.e. repair timeout problems
satellite.Audit.Worker.Loop.Pause()
satellite.Repair.Checker.Loop.Pause()
satellite.RangedLoop.RangedLoop.Service.Loop.Stop()
satellite.Repair.Repairer.Loop.Pause()
testData := testrand.Bytes(8 * memory.KiB)
@ -317,9 +316,8 @@ func TestDataRepairPendingObject(t *testing.T) {
}
}
satellite.Repair.Checker.Loop.Restart()
satellite.Repair.Checker.Loop.TriggerWait()
satellite.Repair.Checker.Loop.Pause()
_, err = satellite.RangedLoop.RangedLoop.Service.RunOnce(ctx)
require.NoError(t, err)
satellite.Repair.Repairer.Loop.Restart()
satellite.Repair.Repairer.Loop.TriggerWait()
satellite.Repair.Repairer.Loop.Pause()
@ -386,7 +384,7 @@ func TestMinRequiredDataRepair(t *testing.T) {
// stop audit to prevent possible interactions i.e. repair timeout problems
satellite.Audit.Worker.Loop.Pause()
satellite.Repair.Checker.Loop.Pause()
satellite.RangedLoop.RangedLoop.Service.Loop.Stop()
satellite.Repair.Repairer.Loop.Pause()
var testData = testrand.Bytes(8 * memory.KiB)
@ -428,9 +426,9 @@ func TestMinRequiredDataRepair(t *testing.T) {
nodesReputation[nodeID] = *info
}
satellite.Repair.Checker.Loop.Restart()
satellite.Repair.Checker.Loop.TriggerWait()
satellite.Repair.Checker.Loop.Pause()
// trigger checker with ranged loop to add segment to repair queue
_, err = satellite.RangedLoop.RangedLoop.Service.RunOnce(ctx)
require.NoError(t, err)
satellite.Repair.Repairer.Loop.Restart()
satellite.Repair.Repairer.Loop.TriggerWait()
satellite.Repair.Repairer.Loop.Pause()
@ -496,7 +494,7 @@ func TestFailedDataRepair(t *testing.T) {
// stop audit to prevent possible interactions i.e. repair timeout problems
satellite.Audit.Worker.Loop.Pause()
satellite.Repair.Checker.Loop.Pause()
satellite.RangedLoop.RangedLoop.Service.Loop.Stop()
satellite.Repair.Repairer.Loop.Pause()
var testData = testrand.Bytes(8 * memory.KiB)
@ -551,9 +549,8 @@ func TestFailedDataRepair(t *testing.T) {
}
satellite.Repair.Repairer.TestingSetMinFailures(2) // expecting one erroring node, one offline node
satellite.Repair.Checker.Loop.Restart()
satellite.Repair.Checker.Loop.TriggerWait()
satellite.Repair.Checker.Loop.Pause()
_, err = satellite.RangedLoop.RangedLoop.Service.RunOnce(ctx)
require.NoError(t, err)
satellite.Repair.Repairer.Loop.Restart()
satellite.Repair.Repairer.Loop.TriggerWait()
satellite.Repair.Repairer.Loop.Pause()
@ -618,7 +615,7 @@ func TestOfflineNodeDataRepair(t *testing.T) {
// stop audit to prevent possible interactions i.e. repair timeout problems
satellite.Audit.Worker.Loop.Pause()
satellite.Repair.Checker.Loop.Pause()
satellite.RangedLoop.RangedLoop.Service.Loop.Stop()
satellite.Repair.Repairer.Loop.Pause()
var testData = testrand.Bytes(8 * memory.KiB)
@ -666,9 +663,9 @@ func TestOfflineNodeDataRepair(t *testing.T) {
}
satellite.Repair.Repairer.TestingSetMinFailures(1) // expect one offline node
satellite.Repair.Checker.Loop.Restart()
satellite.Repair.Checker.Loop.TriggerWait()
satellite.Repair.Checker.Loop.Pause()
// trigger checker with ranged loop to add segment to repair queue
_, err = satellite.RangedLoop.RangedLoop.Service.RunOnce(ctx)
require.NoError(t, err)
satellite.Repair.Repairer.Loop.Restart()
satellite.Repair.Repairer.Loop.TriggerWait()
satellite.Repair.Repairer.Loop.Pause()
@ -742,7 +739,7 @@ func TestUnknownErrorDataRepair(t *testing.T) {
// stop audit to prevent possible interactions i.e. repair timeout problems
satellite.Audit.Worker.Loop.Pause()
satellite.Repair.Checker.Loop.Pause()
satellite.RangedLoop.RangedLoop.Service.Loop.Stop()
satellite.Repair.Repairer.Loop.Pause()
var testData = testrand.Bytes(8 * memory.KiB)
@ -791,9 +788,8 @@ func TestUnknownErrorDataRepair(t *testing.T) {
}
satellite.Repair.Repairer.TestingSetMinFailures(1) // expecting one bad node
satellite.Repair.Checker.Loop.Restart()
satellite.Repair.Checker.Loop.TriggerWait()
satellite.Repair.Checker.Loop.Pause()
_, err = satellite.RangedLoop.RangedLoop.Service.RunOnce(ctx)
require.NoError(t, err)
satellite.Repair.Repairer.Loop.Restart()
satellite.Repair.Repairer.Loop.TriggerWait()
satellite.Repair.Repairer.Loop.Pause()
@ -865,7 +861,7 @@ func TestMissingPieceDataRepair_Succeed(t *testing.T) {
// stop audit to prevent possible interactions i.e. repair timeout problems
satellite.Audit.Worker.Loop.Pause()
satellite.Repair.Checker.Loop.Pause()
satellite.RangedLoop.RangedLoop.Service.Loop.Stop()
satellite.Repair.Repairer.Loop.Pause()
var testData = testrand.Bytes(8 * memory.KiB)
@ -912,9 +908,9 @@ func TestMissingPieceDataRepair_Succeed(t *testing.T) {
}
satellite.Repair.Repairer.TestingSetMinFailures(1) // expect one node to have a missing piece
satellite.Repair.Checker.Loop.Restart()
satellite.Repair.Checker.Loop.TriggerWait()
satellite.Repair.Checker.Loop.Pause()
// trigger checker with ranged loop to add segment to repair queue
_, err = satellite.RangedLoop.RangedLoop.Service.RunOnce(ctx)
require.NoError(t, err)
satellite.Repair.Repairer.Loop.Restart()
satellite.Repair.Repairer.Loop.TriggerWait()
satellite.Repair.Repairer.Loop.Pause()
@ -982,7 +978,7 @@ func TestMissingPieceDataRepair(t *testing.T) {
// stop audit to prevent possible interactions i.e. repair timeout problems
satellite.Audit.Worker.Loop.Pause()
satellite.Repair.Checker.Loop.Pause()
satellite.RangedLoop.RangedLoop.Service.Loop.Stop()
satellite.Repair.Repairer.Loop.Pause()
var testData = testrand.Bytes(8 * memory.KiB)
@ -1035,9 +1031,8 @@ func TestMissingPieceDataRepair(t *testing.T) {
}
satellite.Repair.Repairer.TestingSetMinFailures(1) // expect one missing piece
satellite.Repair.Checker.Loop.Restart()
satellite.Repair.Checker.Loop.TriggerWait()
satellite.Repair.Checker.Loop.Pause()
_, err = satellite.RangedLoop.RangedLoop.Service.RunOnce(ctx)
require.NoError(t, err)
satellite.Repair.Repairer.Loop.Restart()
satellite.Repair.Repairer.Loop.TriggerWait()
satellite.Repair.Repairer.Loop.Pause()
@ -1101,7 +1096,7 @@ func TestCorruptDataRepair_Succeed(t *testing.T) {
// stop audit to prevent possible interactions i.e. repair timeout problems
satellite.Audit.Worker.Loop.Pause()
satellite.Repair.Checker.Loop.Pause()
satellite.RangedLoop.RangedLoop.Service.Loop.Stop()
satellite.Repair.Repairer.Loop.Pause()
var testData = testrand.Bytes(8 * memory.KiB)
@ -1147,9 +1142,9 @@ func TestCorruptDataRepair_Succeed(t *testing.T) {
}
satellite.Repair.Repairer.TestingSetMinFailures(1) // expect one node with bad data
satellite.Repair.Checker.Loop.Restart()
satellite.Repair.Checker.Loop.TriggerWait()
satellite.Repair.Checker.Loop.Pause()
// trigger checker with ranged loop to add segment to repair queue
_, err = satellite.RangedLoop.RangedLoop.Service.RunOnce(ctx)
require.NoError(t, err)
satellite.Repair.Repairer.Loop.Restart()
satellite.Repair.Repairer.Loop.TriggerWait()
satellite.Repair.Repairer.Loop.Pause()
@ -1217,7 +1212,7 @@ func TestCorruptDataRepair_Failed(t *testing.T) {
// stop audit to prevent possible interactions i.e. repair timeout problems
satellite.Audit.Worker.Loop.Pause()
satellite.Repair.Checker.Loop.Pause()
satellite.RangedLoop.RangedLoop.Service.Loop.Stop()
satellite.Repair.Repairer.Loop.Pause()
var testData = testrand.Bytes(8 * memory.KiB)
@ -1269,9 +1264,8 @@ func TestCorruptDataRepair_Failed(t *testing.T) {
}
satellite.Repair.Repairer.TestingSetMinFailures(1) // expect one corrupted piece
satellite.Repair.Checker.Loop.Restart()
satellite.Repair.Checker.Loop.TriggerWait()
satellite.Repair.Checker.Loop.Pause()
_, err = satellite.RangedLoop.RangedLoop.Service.RunOnce(ctx)
require.NoError(t, err)
satellite.Repair.Repairer.Loop.Restart()
satellite.Repair.Repairer.Loop.TriggerWait()
satellite.Repair.Repairer.Loop.Pause()
@ -1325,7 +1319,7 @@ func TestRepairExpiredSegment(t *testing.T) {
satellite.Audit.Worker.Loop.Stop()
satellite.RangedLoop.RangedLoop.Service.Loop.Stop()
satellite.Repair.Checker.Loop.Pause()
satellite.RangedLoop.RangedLoop.Service.Loop.Stop()
satellite.Repair.Repairer.Loop.Pause()
testData := testrand.Bytes(8 * memory.KiB)
@ -1356,10 +1350,9 @@ func TestRepairExpiredSegment(t *testing.T) {
}
// trigger checker to add segment to repair queue
satellite.Repair.Checker.Loop.Restart()
satellite.Repair.Checker.Loop.TriggerWait()
satellite.Repair.Checker.Loop.Pause()
// trigger checker with ranged loop to add segment to repair queue
_, err = satellite.RangedLoop.RangedLoop.Service.RunOnce(ctx)
require.NoError(t, err)
// Verify that the segment is on the repair queue
count, err := satellite.DB.RepairQueue().Count(ctx)
@ -1405,7 +1398,7 @@ func TestRemoveDeletedSegmentFromQueue(t *testing.T) {
// stop audit to prevent possible interactions i.e. repair timeout problems
satellite.Audit.Worker.Loop.Stop()
satellite.Repair.Checker.Loop.Pause()
satellite.RangedLoop.RangedLoop.Service.Loop.Stop()
satellite.Repair.Repairer.Loop.Pause()
testData := testrand.Bytes(8 * memory.KiB)
@ -1437,9 +1430,8 @@ func TestRemoveDeletedSegmentFromQueue(t *testing.T) {
}
// trigger checker to add segment to repair queue
satellite.Repair.Checker.Loop.Restart()
satellite.Repair.Checker.Loop.TriggerWait()
satellite.Repair.Checker.Loop.Pause()
_, err = satellite.RangedLoop.RangedLoop.Service.RunOnce(ctx)
require.NoError(t, err)
// Delete segment from the satellite database
err = uplinkPeer.DeleteObject(ctx, satellite, "testbucket", "test/path")
@ -1493,7 +1485,7 @@ func TestSegmentDeletedDuringRepair(t *testing.T) {
// stop audit to prevent possible interactions i.e. repair timeout problems
satellite.Audit.Worker.Loop.Pause()
satellite.Repair.Checker.Loop.Pause()
satellite.RangedLoop.RangedLoop.Service.Loop.Stop()
satellite.Repair.Repairer.Loop.Pause()
var testData = testrand.Bytes(8 * memory.KiB)
@ -1521,9 +1513,8 @@ func TestSegmentDeletedDuringRepair(t *testing.T) {
require.Equal(t, 3, len(availableNodes))
// trigger checker to add segment to repair queue
satellite.Repair.Checker.Loop.Restart()
satellite.Repair.Checker.Loop.TriggerWait()
satellite.Repair.Checker.Loop.Pause()
_, err = satellite.RangedLoop.RangedLoop.Service.RunOnce(ctx)
require.NoError(t, err)
count, err := satellite.DB.RepairQueue().Count(ctx)
require.NoError(t, err)
@ -1587,7 +1578,7 @@ func TestSegmentModifiedDuringRepair(t *testing.T) {
// stop audit to prevent possible interactions i.e. repair timeout problems
satellite.Audit.Worker.Loop.Pause()
satellite.Repair.Checker.Loop.Pause()
satellite.RangedLoop.RangedLoop.Service.Loop.Stop()
satellite.Repair.Repairer.Loop.Pause()
var testData = testrand.Bytes(8 * memory.KiB)
@ -1614,10 +1605,9 @@ func TestSegmentModifiedDuringRepair(t *testing.T) {
}
require.Equal(t, 3, len(availableNodes))
// trigger checker to add segment to repair queue
satellite.Repair.Checker.Loop.Restart()
satellite.Repair.Checker.Loop.TriggerWait()
satellite.Repair.Checker.Loop.Pause()
// trigger checker with ranged loop to add segment to repair queue
_, err = satellite.RangedLoop.RangedLoop.Service.RunOnce(ctx)
require.NoError(t, err)
count, err := satellite.DB.RepairQueue().Count(ctx)
require.NoError(t, err)
@ -1679,7 +1669,7 @@ func TestIrreparableSegmentAccordingToOverlay(t *testing.T) {
// stop audit to prevent possible interactions i.e. repair timeout problems
satellite.Audit.Worker.Loop.Stop()
satellite.Repair.Checker.Loop.Pause()
satellite.RangedLoop.RangedLoop.Service.Loop.Stop()
satellite.Repair.Repairer.Loop.Pause()
testData := testrand.Bytes(8 * memory.KiB)
@ -1698,10 +1688,9 @@ func TestIrreparableSegmentAccordingToOverlay(t *testing.T) {
require.NoError(t, err)
}
// trigger checker to add segment to repair queue
satellite.Repair.Checker.Loop.Restart()
satellite.Repair.Checker.Loop.TriggerWait()
satellite.Repair.Checker.Loop.Pause()
// trigger checker with ranged loop to add segment to repair queue
_, err = satellite.RangedLoop.RangedLoop.Service.RunOnce(ctx)
require.NoError(t, err)
// Disqualify nodes so that online nodes < minimum threshold
// This will make the segment irreparable
@ -1750,7 +1739,7 @@ func TestIrreparableSegmentNodesOffline(t *testing.T) {
// stop audit to prevent possible interactions i.e. repair timeout problems
satellite.Audit.Worker.Loop.Stop()
satellite.Repair.Checker.Loop.Pause()
satellite.RangedLoop.RangedLoop.Service.Loop.Stop()
satellite.Repair.Repairer.Loop.Pause()
testData := testrand.Bytes(8 * memory.KiB)
@ -1775,10 +1764,9 @@ func TestIrreparableSegmentNodesOffline(t *testing.T) {
require.NoError(t, err)
}
// trigger checker to add segment to repair queue
satellite.Repair.Checker.Loop.Restart()
satellite.Repair.Checker.Loop.TriggerWait()
satellite.Repair.Checker.Loop.Pause()
// trigger checker with ranged loop to add segment to repair queue
_, err = satellite.RangedLoop.RangedLoop.Service.RunOnce(ctx)
require.NoError(t, err)
// Verify that the segment is on the repair queue
count, err := satellite.DB.RepairQueue().Count(ctx)
@ -1855,7 +1843,7 @@ func TestRepairMultipleDisqualifiedAndSuspended(t *testing.T) {
uplinkPeer := planet.Uplinks[0]
satellite := planet.Satellites[0]
satellite.Repair.Checker.Loop.Pause()
satellite.RangedLoop.RangedLoop.Service.Loop.Stop()
satellite.Repair.Repairer.Loop.Pause()
testData := testrand.Bytes(8 * memory.KiB)
@ -1900,10 +1888,11 @@ func TestRepairMultipleDisqualifiedAndSuspended(t *testing.T) {
nodesToKeepAlive[remotePieces[i].StorageNode] = true
}
err = satellite.Repair.Checker.RefreshReliabilityCache(ctx)
err = satellite.RangedLoop.Repair.Observer.RefreshReliabilityCache(ctx)
require.NoError(t, err)
satellite.Repair.Checker.Loop.TriggerWait()
_, err = satellite.RangedLoop.RangedLoop.Service.RunOnce(ctx)
require.NoError(t, err)
satellite.Repair.Repairer.Loop.TriggerWait()
satellite.Repair.Repairer.WaitForPendingRepairs()
@ -1963,7 +1952,7 @@ func TestDataRepairOverride_HigherLimit(t *testing.T) {
// stop audit to prevent possible interactions i.e. repair timeout problems
satellite.Audit.Worker.Loop.Pause()
satellite.Repair.Checker.Loop.Pause()
satellite.RangedLoop.RangedLoop.Service.Loop.Stop()
satellite.Repair.Repairer.Loop.Pause()
var testData = testrand.Bytes(8 * memory.KiB)
@ -2000,9 +1989,9 @@ func TestDataRepairOverride_HigherLimit(t *testing.T) {
}
}
satellite.Repair.Checker.Loop.Restart()
satellite.Repair.Checker.Loop.TriggerWait()
satellite.Repair.Checker.Loop.Pause()
// trigger checker with ranged loop to add segment to repair queue
_, err = satellite.RangedLoop.RangedLoop.Service.RunOnce(ctx)
require.NoError(t, err)
satellite.Repair.Repairer.Loop.Restart()
satellite.Repair.Repairer.Loop.TriggerWait()
satellite.Repair.Repairer.Loop.Pause()
@ -2050,7 +2039,7 @@ func TestDataRepairOverride_LowerLimit(t *testing.T) {
// stop audit to prevent possible interactions i.e. repair timeout problems
satellite.Audit.Worker.Loop.Pause()
satellite.Repair.Checker.Loop.Pause()
satellite.RangedLoop.RangedLoop.Service.Loop.Stop()
satellite.Repair.Repairer.Loop.Pause()
var testData = testrand.Bytes(8 * memory.KiB)
@ -2088,9 +2077,10 @@ func TestDataRepairOverride_LowerLimit(t *testing.T) {
}
}
satellite.Repair.Checker.Loop.Restart()
satellite.Repair.Checker.Loop.TriggerWait()
satellite.Repair.Checker.Loop.Pause()
// trigger checker with ranged loop to add segment to repair queue
_, err = satellite.RangedLoop.RangedLoop.Service.RunOnce(ctx)
require.NoError(t, err)
satellite.Repair.Repairer.Loop.Restart()
satellite.Repair.Repairer.Loop.TriggerWait()
satellite.Repair.Repairer.Loop.Pause()
@ -2115,9 +2105,10 @@ func TestDataRepairOverride_LowerLimit(t *testing.T) {
}
}
satellite.Repair.Checker.Loop.Restart()
satellite.Repair.Checker.Loop.TriggerWait()
satellite.Repair.Checker.Loop.Pause()
// trigger checker with ranged loop to add segment to repair queue
_, err = satellite.RangedLoop.RangedLoop.Service.RunOnce(ctx)
require.NoError(t, err)
satellite.Repair.Repairer.Loop.Restart()
satellite.Repair.Repairer.Loop.TriggerWait()
satellite.Repair.Repairer.Loop.Pause()
@ -2164,7 +2155,7 @@ func TestDataRepairUploadLimit(t *testing.T) {
satellite := planet.Satellites[0]
// stop audit to prevent possible interactions i.e. repair timeout problems
satellite.Audit.Worker.Loop.Pause()
satellite.Repair.Checker.Loop.Pause()
satellite.RangedLoop.RangedLoop.Service.Loop.Stop()
satellite.Repair.Repairer.Loop.Pause()
var (
@ -2220,9 +2211,9 @@ func TestDataRepairUploadLimit(t *testing.T) {
}
}
satellite.Repair.Checker.Loop.Restart()
satellite.Repair.Checker.Loop.TriggerWait()
satellite.Repair.Checker.Loop.Pause()
// trigger checker with ranged loop to add segment to repair queue
_, err = satellite.RangedLoop.RangedLoop.Service.RunOnce(ctx)
require.NoError(t, err)
satellite.Repair.Repairer.Loop.Restart()
satellite.Repair.Repairer.Loop.TriggerWait()
satellite.Repair.Repairer.Loop.Pause()
@ -2280,7 +2271,7 @@ func TestRepairGracefullyExited(t *testing.T) {
uplinkPeer := planet.Uplinks[0]
satellite := planet.Satellites[0]
satellite.Repair.Checker.Loop.Pause()
satellite.RangedLoop.RangedLoop.Service.Loop.Stop()
satellite.Repair.Repairer.Loop.Pause()
testData := testrand.Bytes(8 * memory.KiB)
@ -2319,10 +2310,11 @@ func TestRepairGracefullyExited(t *testing.T) {
nodesToKeepAlive[remotePieces[i].StorageNode] = true
}
err = satellite.Repair.Checker.RefreshReliabilityCache(ctx)
err = satellite.RangedLoop.Repair.Observer.RefreshReliabilityCache(ctx)
require.NoError(t, err)
satellite.Repair.Checker.Loop.TriggerWait()
_, err = satellite.RangedLoop.RangedLoop.Service.RunOnce(ctx)
require.NoError(t, err)
satellite.Repair.Repairer.Loop.TriggerWait()
satellite.Repair.Repairer.WaitForPendingRepairs()
@ -2469,7 +2461,7 @@ func TestECRepairerGet(t *testing.T) {
// stop audit to prevent possible interactions i.e. repair timeout problems
satellite.Audit.Worker.Loop.Pause()
satellite.Repair.Checker.Loop.Pause()
satellite.RangedLoop.RangedLoop.Service.Loop.Stop()
satellite.Repair.Repairer.Loop.Pause()
var testData = testrand.Bytes(8 * memory.KiB)
@ -2510,7 +2502,7 @@ func TestECRepairerGetCorrupted(t *testing.T) {
// stop audit to prevent possible interactions i.e. repair timeout problems
satellite.Audit.Worker.Loop.Pause()
satellite.Repair.Checker.Loop.Pause()
satellite.RangedLoop.RangedLoop.Service.Loop.Stop()
satellite.Repair.Repairer.Loop.Pause()
var testData = testrand.Bytes(8 * memory.KiB)
@ -2579,7 +2571,7 @@ func TestECRepairerGetMissingPiece(t *testing.T) {
// stop audit to prevent possible interactions i.e. repair timeout problems
satellite.Audit.Worker.Loop.Pause()
satellite.Repair.Checker.Loop.Pause()
satellite.RangedLoop.RangedLoop.Service.Loop.Stop()
satellite.Repair.Repairer.Loop.Pause()
var testData = testrand.Bytes(8 * memory.KiB)
@ -2649,7 +2641,7 @@ func TestECRepairerGetOffline(t *testing.T) {
// stop audit to prevent possible interactions i.e. repair timeout problems
satellite.Audit.Worker.Loop.Pause()
satellite.Repair.Checker.Loop.Pause()
satellite.RangedLoop.RangedLoop.Service.Loop.Stop()
satellite.Repair.Repairer.Loop.Pause()
var testData = testrand.Bytes(8 * memory.KiB)
@ -2719,7 +2711,7 @@ func TestECRepairerGetUnknown(t *testing.T) {
// stop audit to prevent possible interactions i.e. repair timeout problems
satellite.Audit.Worker.Loop.Pause()
satellite.Repair.Checker.Loop.Pause()
satellite.RangedLoop.RangedLoop.Service.Loop.Stop()
satellite.Repair.Repairer.Loop.Pause()
var testData = testrand.Bytes(8 * memory.KiB)
@ -2790,7 +2782,7 @@ func TestECRepairerGetFailure(t *testing.T) {
// stop audit to prevent possible interactions i.e. repair timeout problems
satellite.Audit.Worker.Loop.Pause()
satellite.Repair.Checker.Loop.Pause()
satellite.RangedLoop.RangedLoop.Service.Loop.Stop()
satellite.Repair.Repairer.Loop.Pause()
var testData = testrand.Bytes(8 * memory.KiB)
@ -3045,7 +3037,7 @@ func TestSegmentInExcludedCountriesRepair(t *testing.T) {
// stop audit to prevent possible interactions i.e. repair timeout problems
satellite.Audit.Worker.Loop.Pause()
satellite.Repair.Checker.Loop.Pause()
satellite.RangedLoop.RangedLoop.Service.Loop.Stop()
satellite.Repair.Repairer.Loop.Pause()
var testData = testrand.Bytes(8 * memory.KiB)
@ -3065,10 +3057,9 @@ func TestSegmentInExcludedCountriesRepair(t *testing.T) {
err = planet.StopNodeAndUpdate(ctx, planet.FindNode(remotePieces[2].StorageNode))
require.NoError(t, err)
// trigger checker to add segment to repair queue
satellite.Repair.Checker.Loop.Restart()
satellite.Repair.Checker.Loop.TriggerWait()
satellite.Repair.Checker.Loop.Pause()
// trigger checker with ranged loop to add segment to repair queue
_, err = satellite.RangedLoop.RangedLoop.Service.RunOnce(ctx)
require.NoError(t, err)
count, err := satellite.DB.RepairQueue().Count(ctx)
require.NoError(t, err)
@ -3138,7 +3129,7 @@ func TestSegmentInExcludedCountriesRepairIrreparable(t *testing.T) {
// stop audit to prevent possible interactions i.e. repair timeout problems
satellite.Audit.Worker.Loop.Pause()
satellite.Repair.Checker.Loop.Pause()
satellite.RangedLoop.RangedLoop.Service.Loop.Stop()
satellite.Repair.Repairer.Loop.Pause()
var testData = testrand.Bytes(8 * memory.KiB)
@ -3159,10 +3150,9 @@ func TestSegmentInExcludedCountriesRepairIrreparable(t *testing.T) {
err = planet.StopNodeAndUpdate(ctx, planet.FindNode(offlineNode))
require.NoError(t, err)
// trigger checker to add segment to repair queue
satellite.Repair.Checker.Loop.Restart()
satellite.Repair.Checker.Loop.TriggerWait()
satellite.Repair.Checker.Loop.Pause()
// trigger checker with ranged loop to add segment to repair queue
_, err = satellite.RangedLoop.RangedLoop.Service.RunOnce(ctx)
require.NoError(t, err)
count, err := satellite.DB.RepairQueue().Count(ctx)
require.NoError(t, err)
@ -3228,7 +3218,7 @@ func TestRepairClumpedPieces(t *testing.T) {
// stop audit to prevent possible interactions i.e. repair timeout problems
satellite.Audit.Worker.Loop.Pause()
satellite.Repair.Checker.Loop.Pause()
satellite.RangedLoop.RangedLoop.Service.Loop.Stop()
satellite.Repair.Repairer.Loop.Pause()
var testData = testrand.Bytes(8 * memory.KiB)
@ -3240,7 +3230,9 @@ func TestRepairClumpedPieces(t *testing.T) {
remotePiecesBefore := segment.Pieces
// that segment should be ignored by repair checker for now
satellite.Repair.Checker.Loop.TriggerWait()
_, err = satellite.RangedLoop.RangedLoop.Service.RunOnce(ctx)
require.NoError(t, err)
injuredSegment, err := satellite.DB.RepairQueue().Select(ctx)
require.Error(t, err)
if !queue.ErrEmpty.Has(err) {
@ -3272,7 +3264,9 @@ func TestRepairClumpedPieces(t *testing.T) {
require.NoError(t, err)
// running repair checker again should put the segment into the repair queue
satellite.Repair.Checker.Loop.TriggerWait()
_, err = satellite.RangedLoop.RangedLoop.Service.RunOnce(ctx)
require.NoError(t, err)
// and subsequently running the repair worker should pull that off the queue and repair it
satellite.Repair.Repairer.Loop.TriggerWait()
satellite.Repair.Repairer.WaitForPendingRepairs()


@ -34,7 +34,7 @@ type Config struct {
MaxExcessRateOptimalThreshold float64 `help:"ratio applied to the optimal threshold to calculate the excess of the maximum number of repaired pieces to upload" default:"0.05"`
InMemoryRepair bool `help:"whether to download pieces for repair in memory (true) or download to disk (false)" default:"false"`
ReputationUpdateEnabled bool `help:"whether the audit score of nodes should be updated as a part of repair" default:"false"`
UseRangedLoop bool `help:"whether to use ranged loop instead of segment loop" default:"false"`
UseRangedLoop bool `help:"whether to enable repair checker observer with ranged loop" default:"true"`
}
// Service contains the information needed to run the repair service.


@ -970,8 +970,8 @@ identity.key-path: /root/.local/share/storj/identity/satellite/identity.key
# time limit for an entire repair job, from queue pop to upload completion
# repairer.total-timeout: 45m0s
# whether to use ranged loop instead of segment loop
# repairer.use-ranged-loop: false
# whether to enable repair checker observer with ranged loop
# repairer.use-ranged-loop: true
# the number of times a node has been audited to not be considered a New Node
# reputation.audit-count: 100