diff --git a/monkit.lock b/monkit.lock
index bd3c98da8..76cbc2315 100644
--- a/monkit.lock
+++ b/monkit.lock
@@ -58,6 +58,7 @@ storj.io/storj/satellite/repair/checker."checker_segment_age" IntVal
 storj.io/storj/satellite/repair/checker."checker_segment_healthy_count" IntVal
 storj.io/storj/satellite/repair/checker."checker_segment_time_until_irreparable" IntVal
 storj.io/storj/satellite/repair/checker."checker_segment_total_count" IntVal
+storj.io/storj/satellite/repair/checker."healthy_segments_removed_from_queue" IntVal
 storj.io/storj/satellite/repair/checker."new_remote_segments_needing_repair" IntVal
 storj.io/storj/satellite/repair/checker."remote_files_checked" IntVal
 storj.io/storj/satellite/repair/checker."remote_files_lost" IntVal
diff --git a/satellite/repair/checker/checker.go b/satellite/repair/checker/checker.go
index 6c3b1cec8..e51c6bd21 100644
--- a/satellite/repair/checker/checker.go
+++ b/satellite/repair/checker/checker.go
@@ -114,6 +114,8 @@ func (checker *Checker) Close() error {
 func (checker *Checker) IdentifyInjuredSegments(ctx context.Context) (err error) {
 	defer mon.Task()(&ctx)(&err)
 
+	startTime := time.Now()
+
 	observer := &checkerObserver{
 		repairQueue: checker.repairQueue,
 		irrdb:       checker.irrdb,
@@ -130,6 +132,12 @@ func (checker *Checker) IdentifyInjuredSegments(ctx context.Context) (err error)
 		return err
 	}
 
+	// remove all segments which were not seen as unhealthy by this checker iteration
+	healthyDeleted, err := checker.repairQueue.Clean(ctx, startTime)
+	if err != nil {
+		return Error.Wrap(err)
+	}
+
 	mon.IntVal("remote_files_checked").Observe(observer.monStats.objectsChecked) //locked
 	mon.IntVal("remote_segments_checked").Observe(observer.monStats.remoteSegmentsChecked) //locked
 	mon.IntVal("remote_segments_failed_to_check").Observe(observer.monStats.remoteSegmentsFailedToCheck) //locked
@@ -142,6 +150,7 @@ func (checker *Checker) IdentifyInjuredSegments(ctx context.Context) (err error)
 	mon.IntVal("remote_segments_over_threshold_3").Observe(observer.monStats.remoteSegmentsOverThreshold[2]) //locked
 	mon.IntVal("remote_segments_over_threshold_4").Observe(observer.monStats.remoteSegmentsOverThreshold[3]) //locked
 	mon.IntVal("remote_segments_over_threshold_5").Observe(observer.monStats.remoteSegmentsOverThreshold[4]) //locked
+	mon.IntVal("healthy_segments_removed_from_queue").Observe(healthyDeleted) //locked
 
 	allUnhealthy := observer.monStats.remoteSegmentsNeedingRepair + observer.monStats.remoteSegmentsFailedToCheck
 	allChecked := observer.monStats.remoteSegmentsChecked
diff --git a/satellite/repair/checker/checker_test.go b/satellite/repair/checker/checker_test.go
index f398bc855..721a1df68 100644
--- a/satellite/repair/checker/checker_test.go
+++ b/satellite/repair/checker/checker_test.go
@@ -4,6 +4,7 @@
 package checker_test
 
 import (
+	"bytes"
 	"context"
 	"fmt"
 	"testing"
@@ -208,6 +209,81 @@ func TestIdentifyIrreparableSegments(t *testing.T) {
 	})
 }
 
+func TestCleanRepairQueue(t *testing.T) {
+	testplanet.Run(t, testplanet.Config{
+		SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 0,
+	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
+		checker := planet.Satellites[0].Repair.Checker
+		repairQueue := planet.Satellites[0].DB.RepairQueue()
+
+		checker.Loop.Pause()
+		planet.Satellites[0].Repair.Repairer.Loop.Pause()
+
+		rs := &pb.RedundancyScheme{
+			MinReq:           int32(2),
+			RepairThreshold:  int32(3),
+			SuccessThreshold: int32(4),
+			Total:            int32(4),
+			ErasureShareSize: int32(256),
+		}
+
+		projectID := testrand.UUID()
+		pointerPathPrefix := storj.JoinPaths(projectID.String(), "l", "bucket") + "/"
+
+		healthyCount := 5
+		for i := 0; i < healthyCount; i++ {
+			insertPointer(ctx, t, planet, rs, pointerPathPrefix+fmt.Sprintf("healthy-%d", i), false, time.Time{})
+		}
+		unhealthyCount := 5
+		for i := 0; i < unhealthyCount; i++ {
+			insertPointer(ctx, t, planet, rs, pointerPathPrefix+fmt.Sprintf("unhealthy-%d", i), true, time.Time{})
+		}
+
+		// suspend enough nodes to make healthy pointers unhealthy
+		for i := rs.MinReq; i < rs.SuccessThreshold; i++ {
+			require.NoError(t, planet.Satellites[0].Overlay.DB.SuspendNodeUnknownAudit(ctx, planet.StorageNodes[i].ID(), time.Now()))
+		}
+
+		require.NoError(t, planet.Satellites[0].Repair.Checker.RefreshReliabilityCache(ctx))
+
+		// check that repair queue is empty to avoid false positive
+		count, err := repairQueue.Count(ctx)
+		require.NoError(t, err)
+		require.Equal(t, 0, count)
+
+		checker.Loop.TriggerWait()
+
+		// check that the pointers were put into the repair queue
+		// and not cleaned up at the end of the checker iteration
+		count, err = repairQueue.Count(ctx)
+		require.NoError(t, err)
+		require.Equal(t, healthyCount+unhealthyCount, count)
+
+		// unsuspend nodes to make the previously healthy pointers healthy again
+		for i := rs.MinReq; i < rs.SuccessThreshold; i++ {
+			require.NoError(t, planet.Satellites[0].Overlay.DB.UnsuspendNodeUnknownAudit(ctx, planet.StorageNodes[i].ID()))
+		}
+
+		require.NoError(t, planet.Satellites[0].Repair.Checker.RefreshReliabilityCache(ctx))
+
+		// The checker will not insert/update the now healthy segments causing
+		// them to be removed from the queue at the end of the checker iteration
+		checker.Loop.TriggerWait()
+
+		// only unhealthy segments should remain
+		count, err = repairQueue.Count(ctx)
+		require.NoError(t, err)
+		require.Equal(t, unhealthyCount, count)
+
+		segs, err := repairQueue.SelectN(ctx, count)
+		require.NoError(t, err)
+
+		for _, s := range segs {
+			require.True(t, bytes.Contains(s.GetPath(), []byte("unhealthy")))
+		}
+	})
+}
+
 func insertPointer(ctx context.Context, t *testing.T, planet *testplanet.Planet, rs *pb.RedundancyScheme, pointerPath string, createLost bool, expire time.Time) {
 	pieces := make([]*pb.RemotePiece, rs.SuccessThreshold)
 	if !createLost {
diff --git a/satellite/repair/queue/queue.go b/satellite/repair/queue/queue.go
index e4ebf7df6..3119084fa 100644
--- a/satellite/repair/queue/queue.go
+++ b/satellite/repair/queue/queue.go
@@ -5,6 +5,7 @@ package queue
 
 import (
 	"context"
+	"time"
 
 	"storj.io/common/pb"
 )
@@ -20,6 +21,8 @@ type RepairQueue interface {
 	Select(ctx context.Context) (*pb.InjuredSegment, error)
 	// Delete removes an injured segment.
 	Delete(ctx context.Context, s *pb.InjuredSegment) error
+	// Clean removes all segments last updated before a certain time
+	Clean(ctx context.Context, before time.Time) (deleted int64, err error)
 	// SelectN lists limit amount of injured segments.
 	SelectN(ctx context.Context, limit int) ([]pb.InjuredSegment, error)
 	// Count counts the number of segments in the repair queue.
diff --git a/satellite/repair/queue/queue_test.go b/satellite/repair/queue/queue_test.go
index 1a52cef11..e25248cab 100644
--- a/satellite/repair/queue/queue_test.go
+++ b/satellite/repair/queue/queue_test.go
@@ -7,6 +7,7 @@ import (
 	"sort"
 	"strconv"
 	"testing"
+	"time"
 
 	"github.com/stretchr/testify/require"
 
@@ -160,3 +161,77 @@ func TestParallel(t *testing.T) {
 		}
 	})
 }
+
+func TestClean(t *testing.T) {
+	satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
+		q := db.RepairQueue()
+
+		seg1 := &pb.InjuredSegment{
+			Path:       []byte("seg1"),
+			LostPieces: []int32{int32(1), int32(3)},
+		}
+		seg2 := &pb.InjuredSegment{
+			Path:       []byte("seg2"),
+			LostPieces: []int32{int32(1), int32(3)},
+		}
+		seg3 := &pb.InjuredSegment{
+			Path:       []byte("seg3"),
+			LostPieces: []int32{int32(1), int32(3)},
+		}
+
+		timeBeforeInsert1 := time.Now()
+
+		numHealthy := 10
+		_, err := q.Insert(ctx, seg1, numHealthy)
+		require.NoError(t, err)
+
+		_, err = q.Insert(ctx, seg2, numHealthy)
+		require.NoError(t, err)
+
+		_, err = q.Insert(ctx, seg3, numHealthy)
+		require.NoError(t, err)
+
+		count, err := q.Count(ctx)
+		require.NoError(t, err)
+		require.Equal(t, 3, count)
+
+		d, err := q.Clean(ctx, timeBeforeInsert1)
+		require.NoError(t, err)
+		require.Equal(t, int64(0), d)
+
+		count, err = q.Count(ctx)
+		require.NoError(t, err)
+		require.Equal(t, 3, count)
+
+		timeBeforeInsert2 := time.Now()
+
+		// seg1 "becomes healthy", so do not update it
+		// seg2 stays at the same health
+		_, err = q.Insert(ctx, seg2, numHealthy)
+		require.NoError(t, err)
+
+		// seg3 has a lower health
+		_, err = q.Insert(ctx, seg3, numHealthy-1)
+		require.NoError(t, err)
+
+		count, err = q.Count(ctx)
+		require.NoError(t, err)
+		require.Equal(t, 3, count)
+
+		d, err = q.Clean(ctx, timeBeforeInsert2)
+		require.NoError(t, err)
+		require.Equal(t, int64(1), d)
+
+		count, err = q.Count(ctx)
+		require.NoError(t, err)
+		require.Equal(t, 2, count)
+
+		d, err = q.Clean(ctx, time.Now())
+		require.NoError(t, err)
+		require.Equal(t, int64(2), d)
+
+		count, err = q.Count(ctx)
+		require.NoError(t, err)
+		require.Equal(t, 0, count)
+	})
+}
diff --git a/satellite/satellitedb/repairqueue.go b/satellite/satellitedb/repairqueue.go
index 5794160c6..4c44fa21d 100644
--- a/satellite/satellitedb/repairqueue.go
+++ b/satellite/satellitedb/repairqueue.go
@@ -7,11 +7,13 @@ import (
 	"context"
 	"database/sql"
 	"errors"
+	"time"
 
 	"github.com/zeebo/errs"
 
 	"storj.io/common/pb"
 	"storj.io/storj/private/dbutil"
+	"storj.io/storj/satellite/satellitedb/dbx"
 	"storj.io/storj/storage"
 )
 
@@ -107,6 +109,12 @@ func (r *repairQueue) Delete(ctx context.Context, seg *pb.InjuredSegment) (err e
 	return Error.Wrap(err)
 }
 
+func (r *repairQueue) Clean(ctx context.Context, before time.Time) (deleted int64, err error) {
+	defer mon.Task()(&ctx)(&err)
+	n, err := r.db.Delete_Injuredsegment_By_UpdatedAt_Less(ctx, dbx.Injuredsegment_UpdatedAt(before))
+	return n, Error.Wrap(err)
+}
+
 func (r *repairQueue) SelectN(ctx context.Context, limit int) (segs []pb.InjuredSegment, err error) {
 	defer mon.Task()(&ctx)(&err)
 	if limit <= 0 || limit > RepairQueueSelectLimit {