satellite/repair/checker: move checker to segment loop
Change-Id: I04b25e4fa14c822c9524586c25bde89db2a6cad9
commit b900f6b4f9
parent 8686267e06
@@ -272,7 +272,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB,
 		peer.Log.Named("repair:checker"),
 		peer.DB.RepairQueue(),
 		peer.Metainfo.Metabase,
-		peer.Metainfo.Loop,
+		peer.Metainfo.SegmentLoop,
 		peer.Overlay.Service,
 		config.Checker)
 	peer.Services.Add(lifecycle.Item{
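Note: the arguments above are the checker constructor's parameters; the diff does not show the call's left-hand side, so the sketch below only reconstructs the call itself, assuming it is checker.NewChecker (whose signature changes in a hunk further down).

	checker.NewChecker(
		peer.Log.Named("repair:checker"),
		peer.DB.RepairQueue(),
		peer.Metainfo.Metabase,
		peer.Metainfo.SegmentLoop, // was peer.Metainfo.Loop before this commit
		peer.Overlay.Service,
		config.Checker)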
@@ -4,6 +4,7 @@
 package checker
 
 import (
+	"bytes"
 	"context"
 	"time"
 
@@ -15,8 +16,9 @@ import (
 	"storj.io/common/pb"
 	"storj.io/common/storj"
 	"storj.io/common/sync2"
+	"storj.io/common/uuid"
 	"storj.io/storj/satellite/metabase"
-	"storj.io/storj/satellite/metabase/metaloop"
+	"storj.io/storj/satellite/metabase/segmentloop"
 	"storj.io/storj/satellite/overlay"
 	"storj.io/storj/satellite/repair"
 	"storj.io/storj/satellite/repair/queue"
@@ -35,7 +37,7 @@ type Checker struct {
 	logger          *zap.Logger
 	repairQueue     queue.RepairQueue
 	metabase        *metabase.DB
-	metaLoop        *metaloop.Service
+	segmentLoop     *segmentloop.Service
 	nodestate       *ReliabilityCache
 	statsCollector  *statsCollector
 	repairOverrides RepairOverridesMap
@@ -44,13 +46,13 @@ type Checker struct {
 }
 
 // NewChecker creates a new instance of checker.
-func NewChecker(logger *zap.Logger, repairQueue queue.RepairQueue, metabase *metabase.DB, metaLoop *metaloop.Service, overlay *overlay.Service, config Config) *Checker {
+func NewChecker(logger *zap.Logger, repairQueue queue.RepairQueue, metabase *metabase.DB, segmentLoop *segmentloop.Service, overlay *overlay.Service, config Config) *Checker {
 	return &Checker{
 		logger: logger,
 
 		repairQueue:     repairQueue,
 		metabase:        metabase,
-		metaLoop:        metaLoop,
+		segmentLoop:     segmentLoop,
 		nodestate:       NewReliabilityCache(overlay, config.ReliabilityCacheStaleness),
 		statsCollector:  newStatsCollector(),
 		repairOverrides: config.RepairOverrides.GetMap(),
@@ -116,7 +118,7 @@ func (checker *Checker) IdentifyInjuredSegments(ctx context.Context) (err error)
 		getNodesEstimate: checker.getNodesEstimate,
 		log:              checker.logger,
 	}
-	err = checker.metaLoop.Join(ctx, observer)
+	err = checker.segmentLoop.Join(ctx, observer)
 	if err != nil {
 		if !errs2.IsCanceled(err) {
 			checker.logger.Error("IdentifyInjuredSegments error", zap.Error(err))
@@ -138,7 +140,7 @@ func (checker *Checker) IdentifyInjuredSegments(ctx context.Context) (err error)
 	mon.IntVal("remote_segments_needing_repair").Observe(observer.monStats.remoteSegmentsNeedingRepair)        //mon:locked
 	mon.IntVal("new_remote_segments_needing_repair").Observe(observer.monStats.newRemoteSegmentsNeedingRepair) //mon:locked
 	mon.IntVal("remote_segments_lost").Observe(observer.monStats.remoteSegmentsLost)                           //mon:locked
-	mon.IntVal("remote_files_lost").Observe(int64(len(observer.monStats.remoteSegmentInfo)))                   //mon:locked
+	mon.IntVal("remote_files_lost").Observe(int64(len(observer.monStats.objectsLost)))                         //mon:locked
 	mon.IntVal("remote_segments_over_threshold_1").Observe(observer.monStats.remoteSegmentsOverThreshold[0])   //mon:locked
 	mon.IntVal("remote_segments_over_threshold_2").Observe(observer.monStats.remoteSegmentsOverThreshold[1])   //mon:locked
 	mon.IntVal("remote_segments_over_threshold_3").Observe(observer.monStats.remoteSegmentsOverThreshold[2])   //mon:locked
@@ -154,7 +156,7 @@ func (checker *Checker) IdentifyInjuredSegments(ctx context.Context) (err error)
 	return nil
 }
 
-var _ metaloop.Observer = (*checkerObserver)(nil)
+var _ segmentloop.Observer = (*checkerObserver)(nil)
 
 // checkerObserver implements the metainfo loop Observer interface.
 //
@@ -169,14 +171,13 @@ type checkerObserver struct {
 	getNodesEstimate func(ctx context.Context) (int, error)
 	log              *zap.Logger
 
-	// we need to delay counting objects to ensure they get associated with the correct redundancy only once
-	objectCounted bool
+	lastStreamID uuid.UUID
 }
 
-// checks for a object location in slice.
-func containsObjectLocation(a []metabase.ObjectLocation, x metabase.ObjectLocation) bool {
+// checks for a stream id in slice.
+func containsStreamID(a []uuid.UUID, x uuid.UUID) bool {
 	for _, n := range a {
-		if x == n {
+		if bytes.Equal(x[:], n[:]) {
 			return true
 		}
 	}
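Note on the new helper: storj.io/common/uuid.UUID is a 16-byte array, so bytes.Equal over x[:] and n[:] is a byte-wise equality check (plain == on the array values would behave the same; bytes.Equal just makes the intent explicit). A self-contained sketch of how the dedupe is used, illustrative only and not part of this commit:

	package main

	import (
		"bytes"
		"fmt"

		"storj.io/common/uuid"
	)

	// containsStreamID mirrors the helper introduced in this commit.
	func containsStreamID(a []uuid.UUID, x uuid.UUID) bool {
		for _, n := range a {
			if bytes.Equal(x[:], n[:]) {
				return true
			}
		}
		return false
	}

	func main() {
		id, _ := uuid.New() // error ignored for brevity
		var lost []uuid.UUID
		if !containsStreamID(lost, id) {
			lost = append(lost, id) // dedupe before appending, as RemoteSegment does below
		}
		fmt.Println(len(lost)) // 1
	}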
@@ -198,11 +199,11 @@ func (obs *checkerObserver) loadRedundancy(redundancy storj.RedundancyScheme) (i
 }
 
 // LoopStarted is called at each start of a loop.
-func (obs *checkerObserver) LoopStarted(context.Context, metaloop.LoopInfo) (err error) {
+func (obs *checkerObserver) LoopStarted(context.Context, segmentloop.LoopInfo) (err error) {
 	return nil
 }
 
-func (obs *checkerObserver) RemoteSegment(ctx context.Context, segment *metaloop.Segment) (err error) {
+func (obs *checkerObserver) RemoteSegment(ctx context.Context, segment *segmentloop.Segment) (err error) {
 	defer mon.Task()(&ctx)(&err)
 
 	// ignore segment if expired
@@ -212,8 +213,8 @@ func (obs *checkerObserver) RemoteSegment(ctx context.Context, segment *metaloop
 
 	stats := obs.getStatsByRS(segment.Redundancy)
 
-	if !obs.objectCounted {
-		obs.objectCounted = true
+	if obs.lastStreamID.Compare(segment.StreamID) != 0 {
+		obs.lastStreamID = segment.StreamID
 		stats.iterationAggregates.objectsChecked++
 	}
 
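Because the segment loop visits segments rather than objects, the observer no longer gets an Object callback for counting; instead RemoteSegment counts an object whenever the stream ID differs from the previous segment's, which assumes the loop yields a stream's segments contiguously. A stand-alone, simplified sketch of that counting pattern (types and names here are illustrative):

	package main

	import (
		"fmt"

		"storj.io/common/uuid"
	)

	type segment struct{ StreamID uuid.UUID }

	// countObjects counts distinct objects in a segment stream, assuming
	// segments of the same object (stream ID) arrive back to back.
	func countObjects(segments []segment) int {
		var last uuid.UUID // zero value never matches a real stream ID
		count := 0
		for _, s := range segments {
			if last.Compare(s.StreamID) != 0 {
				last = s.StreamID
				count++
			}
		}
		return count
	}

	func main() {
		a, _ := uuid.New()
		b, _ := uuid.New()
		fmt.Println(countObjects([]segment{{a}, {a}, {b}})) // 2
	}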
@@ -300,12 +301,11 @@ func (obs *checkerObserver) RemoteSegment(ctx context.Context, segment *metaloop
 
 	// monitor irreperable segments
 	if numHealthy < required {
-		lostSegInfo := segment.Location.Object()
-		if !containsObjectLocation(obs.monStats.remoteSegmentInfo, lostSegInfo) {
-			obs.monStats.remoteSegmentInfo = append(obs.monStats.remoteSegmentInfo, lostSegInfo)
+		if !containsStreamID(obs.monStats.objectsLost, segment.StreamID) {
+			obs.monStats.objectsLost = append(obs.monStats.objectsLost, segment.StreamID)
 		}
-		if !containsObjectLocation(stats.iterationAggregates.remoteSegmentInfo, lostSegInfo) {
-			stats.iterationAggregates.remoteSegmentInfo = append(stats.iterationAggregates.remoteSegmentInfo, lostSegInfo)
+		if !containsStreamID(stats.iterationAggregates.objectsLost, segment.StreamID) {
+			stats.iterationAggregates.objectsLost = append(stats.iterationAggregates.objectsLost, segment.StreamID)
 		}
 
 		var segmentAge time.Duration
@@ -349,34 +349,6 @@ func (obs *checkerObserver) RemoteSegment(ctx context.Context, segment *metaloop
 	return nil
 }
 
-func (obs *checkerObserver) Object(ctx context.Context, object *metaloop.Object) (err error) {
-	defer mon.Task()(&ctx)(&err)
-
-	obs.monStats.objectsChecked++
-
-	// TODO: check for expired objects
-
-	if object.SegmentCount == 0 {
-		stats := obs.getStatsByRS(storj.RedundancyScheme{})
-		stats.iterationAggregates.objectsChecked++
-		return nil
-	}
-	obs.objectCounted = false
-
-	return nil
-}
-
-func (obs *checkerObserver) InlineSegment(ctx context.Context, segment *metaloop.Segment) (err error) {
-	defer mon.Task()(&ctx)(&err)
-
-	// TODO: check for expired segments
-
-	if !obs.objectCounted {
-		// Note: this may give false stats when an object starts with a inline segment.
-		obs.objectCounted = true
-		stats := obs.getStatsByRS(storj.RedundancyScheme{})
-		stats.iterationAggregates.objectsChecked++
-	}
-
+func (obs *checkerObserver) InlineSegment(ctx context.Context, segment *segmentloop.Segment) (err error) {
 	return nil
 }
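With the move, the metaloop-specific Object handler and the objectCounted bookkeeping for inline segments disappear entirely; the checker only implements the callbacks shown above. The method set implied by this diff looks roughly like the following (a sketch inferred from the signatures here, not a copy of the real segmentloop.Observer definition):

	import (
		"context"

		"storj.io/storj/satellite/metabase/segmentloop"
	)

	// Inferred shape of the observer the checker now satisfies.
	type observer interface {
		LoopStarted(ctx context.Context, info segmentloop.LoopInfo) error
		RemoteSegment(ctx context.Context, segment *segmentloop.Segment) error
		InlineSegment(ctx context.Context, segment *segmentloop.Segment) error
	}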
@@ -49,21 +49,22 @@ func TestIdentifyInjuredSegments(t *testing.T) {
 		// add some valid pointers
 		for x := 0; x < 10; x++ {
 			expectedLocation.ObjectKey = metabase.ObjectKey(fmt.Sprintf("a-%d", x))
-			insertSegment(ctx, t, planet, rs, expectedLocation, createPieces(planet, rs), time.Time{})
+			insertSegment(ctx, t, planet, rs, expectedLocation, createPieces(planet, rs), nil)
 		}
 
 		// add pointer that needs repair
 		expectedLocation.ObjectKey = metabase.ObjectKey("b-0")
-		b0StreamID := insertSegment(ctx, t, planet, rs, expectedLocation, createLostPieces(planet, rs), time.Time{})
+		b0StreamID := insertSegment(ctx, t, planet, rs, expectedLocation, createLostPieces(planet, rs), nil)
 
 		// add pointer that is unhealthy, but is expired
 		expectedLocation.ObjectKey = metabase.ObjectKey("b-1")
-		insertSegment(ctx, t, planet, rs, expectedLocation, createLostPieces(planet, rs), time.Now().Add(-time.Hour))
+		expiresAt := time.Now().Add(-time.Hour)
+		insertSegment(ctx, t, planet, rs, expectedLocation, createLostPieces(planet, rs), &expiresAt)
 
 		// add some valid pointers
 		for x := 0; x < 10; x++ {
 			expectedLocation.ObjectKey = metabase.ObjectKey(fmt.Sprintf("c-%d", x))
-			insertSegment(ctx, t, planet, rs, expectedLocation, createPieces(planet, rs), time.Time{})
+			insertSegment(ctx, t, planet, rs, expectedLocation, createPieces(planet, rs), nil)
 		}
 
 		checker.Loop.TriggerWait()
@@ -130,10 +131,11 @@ func TestIdentifyIrreparableSegments(t *testing.T) {
 		// the piece is considered irreparable but also will be put into repair queue
 
 		expectedLocation.ObjectKey = "piece"
-		insertSegment(ctx, t, planet, rs, expectedLocation, pieces, time.Time{})
+		insertSegment(ctx, t, planet, rs, expectedLocation, pieces, nil)
 
 		expectedLocation.ObjectKey = "piece-expired"
-		insertSegment(ctx, t, planet, rs, expectedLocation, pieces, time.Now().Add(-time.Hour))
+		expiresAt := time.Now().Add(-time.Hour)
+		insertSegment(ctx, t, planet, rs, expectedLocation, pieces, &expiresAt)
 
 		err = checker.IdentifyInjuredSegments(ctx)
 		require.NoError(t, err)
@@ -196,13 +198,13 @@ func TestCleanRepairQueue(t *testing.T) {
 		healthyCount := 5
 		for i := 0; i < healthyCount; i++ {
 			expectedLocation.ObjectKey = metabase.ObjectKey(fmt.Sprintf("healthy-%d", i))
-			insertSegment(ctx, t, planet, rs, expectedLocation, createPieces(planet, rs), time.Time{})
+			insertSegment(ctx, t, planet, rs, expectedLocation, createPieces(planet, rs), nil)
 		}
 		unhealthyCount := 5
 		unhealthyIDs := make(map[uuid.UUID]struct{})
 		for i := 0; i < unhealthyCount; i++ {
 			expectedLocation.ObjectKey = metabase.ObjectKey(fmt.Sprintf("unhealthy-%d", i))
-			unhealthyStreamID := insertSegment(ctx, t, planet, rs, expectedLocation, createLostPieces(planet, rs), time.Time{})
+			unhealthyStreamID := insertSegment(ctx, t, planet, rs, expectedLocation, createLostPieces(planet, rs), nil)
 			unhealthyIDs[unhealthyStreamID] = struct{}{}
 		}
 
@@ -281,12 +283,7 @@ func createLostPieces(planet *testplanet.Planet, rs storj.RedundancyScheme) meta
 	return pieces
 }
 
-func insertSegment(ctx context.Context, t *testing.T, planet *testplanet.Planet, rs storj.RedundancyScheme, location metabase.SegmentLocation, pieces metabase.Pieces, expire time.Time) uuid.UUID {
-	var expiresAt *time.Time
-	if !expire.IsZero() {
-		expiresAt = &expire
-	}
-
+func insertSegment(ctx context.Context, t *testing.T, planet *testplanet.Planet, rs storj.RedundancyScheme, location metabase.SegmentLocation, pieces metabase.Pieces, expiresAt *time.Time) uuid.UUID {
 	metabaseDB := planet.Satellites[0].Metainfo.Metabase
 
 	obj := metabase.ObjectStream{
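The test helper now takes the expiry directly as *time.Time, which maps straight onto the ExpiresAt field added in the next hunk instead of converting a zero time.Time inside the helper. The two calling patterns, taken verbatim from the updated tests above:

	// no expiry
	insertSegment(ctx, t, planet, rs, expectedLocation, createPieces(planet, rs), nil)

	// already-expired segment
	expiresAt := time.Now().Add(-time.Hour)
	insertSegment(ctx, t, planet, rs, expectedLocation, createLostPieces(planet, rs), &expiresAt)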
@@ -324,6 +321,7 @@ func insertSegment(ctx context.Context, t *testing.T, planet *testplanet.Planet,
 		PlainSize:     1,
 		EncryptedSize: 1,
 		Redundancy:    rs,
+		ExpiresAt:     expiresAt,
 	})
 	require.NoError(t, err)
 
@@ -8,7 +8,7 @@ import (
 
 	"github.com/spacemonkeygo/monkit/v3"
 
-	"storj.io/storj/satellite/metabase"
+	"storj.io/common/uuid"
 )
 
 // statsCollector holds a *stats for each redundancy scheme
@@ -84,7 +84,7 @@ type aggregateStats struct {
 	newRemoteSegmentsNeedingRepair int64
 	remoteSegmentsLost             int64
 	remoteSegmentsFailedToCheck    int64
-	remoteSegmentInfo              []metabase.ObjectLocation
+	objectsLost                    []uuid.UUID
 
 	// remoteSegmentsOverThreshold[0]=# of healthy=rt+1, remoteSegmentsOverThreshold[1]=# of healthy=rt+2, etc...
 	remoteSegmentsOverThreshold [5]int64
@@ -122,7 +122,7 @@ func (stats *stats) collectAggregates() {
 	stats.remoteSegmentsNeedingRepair.Observe(stats.iterationAggregates.remoteSegmentsNeedingRepair)
 	stats.newRemoteSegmentsNeedingRepair.Observe(stats.iterationAggregates.newRemoteSegmentsNeedingRepair)
 	stats.remoteSegmentsLost.Observe(stats.iterationAggregates.remoteSegmentsLost)
-	stats.objectsLost.Observe(int64(len(stats.iterationAggregates.remoteSegmentInfo)))
+	stats.objectsLost.Observe(int64(len(stats.iterationAggregates.objectsLost)))
 	stats.remoteSegmentsFailedToCheck.Observe(stats.iterationAggregates.remoteSegmentsFailedToCheck)
 	stats.remoteSegmentsOverThreshold1.Observe(stats.iterationAggregates.remoteSegmentsOverThreshold[0])
 	stats.remoteSegmentsOverThreshold2.Observe(stats.iterationAggregates.remoteSegmentsOverThreshold[1])
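Taken together with the field change above, "lost objects" is now a deduplicated slice of stream IDs whose length feeds both the global remote_files_lost metric and the per-redundancy-scheme objectsLost gauge. A simplified sketch of that flow (recordLost and lostObjects are hypothetical helper names, not from this commit, and containsStreamID is the helper shown earlier in the diff):

	import "storj.io/common/uuid"

	// Simplified per-iteration aggregate: remember each lost object once.
	type aggregateStats struct {
		objectsLost []uuid.UUID
	}

	func (a *aggregateStats) recordLost(streamID uuid.UUID) {
		if !containsStreamID(a.objectsLost, streamID) {
			a.objectsLost = append(a.objectsLost, streamID)
		}
	}

	// lostObjects is the value observed by objectsLost / remote_files_lost.
	func (a *aggregateStats) lostObjects() int64 {
		return int64(len(a.objectsLost))
	}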