diff --git a/satellite/repair/checker/observer_test.go b/satellite/repair/checker/observer_test.go index 110464d1b..75bc794fb 100644 --- a/satellite/repair/checker/observer_test.go +++ b/satellite/repair/checker/observer_test.go @@ -88,14 +88,14 @@ func TestIdentifyInjuredSegmentsObserver(t *testing.T) { // check that the unhealthy, non-expired segment was added to the queue // and that the expired segment was ignored - injuredSegment, err := repairQueue.Select(ctx) + injuredSegment, err := repairQueue.Select(ctx, nil, nil) require.NoError(t, err) err = repairQueue.Delete(ctx, injuredSegment) require.NoError(t, err) require.Equal(t, b0StreamID, injuredSegment.StreamID) - _, err = repairQueue.Select(ctx) + _, err = repairQueue.Select(ctx, nil, nil) require.Error(t, err) }) } @@ -165,7 +165,7 @@ func TestIdentifyIrreparableSegmentsObserver(t *testing.T) { // check that single irreparable segment was added repair queue repairQueue := planet.Satellites[0].DB.RepairQueue() - _, err = repairQueue.Select(ctx) + _, err = repairQueue.Select(ctx, nil, nil) require.NoError(t, err) count, err := repairQueue.Count(ctx) require.NoError(t, err) @@ -244,7 +244,7 @@ func TestObserver_CheckSegmentCopy(t *testing.T) { // check that repair queue has original segment and copied one as it has exactly the same metadata for _, segment := range segmentsAfterCopy { - injuredSegment, err := repairQueue.Select(ctx) + injuredSegment, err := repairQueue.Select(ctx, nil, nil) require.NoError(t, err) require.Equal(t, segment.StreamID, injuredSegment.StreamID) } @@ -666,7 +666,7 @@ func TestObserver_PlacementCheck(t *testing.T) { planet.Satellites[0].RangedLoop.RangedLoop.Service.Loop.TriggerWait() - injuredSegment, err := repairQueue.Select(ctx) + injuredSegment, err := repairQueue.Select(ctx, nil, nil) require.NoError(t, err) err = repairQueue.Delete(ctx, injuredSegment) require.NoError(t, err) diff --git a/satellite/repair/queue/mock.go b/satellite/repair/queue/mock.go index a2e886a5d..09c7bd951 100644 --- a/satellite/repair/queue/mock.go +++ b/satellite/repair/queue/mock.go @@ -7,6 +7,7 @@ import ( "context" "time" + "storj.io/common/storj" "storj.io/common/uuid" "storj.io/storj/satellite/metabase" ) @@ -42,7 +43,7 @@ func (m *MockRepairQueue) InsertBatch(ctx context.Context, segments []*InjuredSe } // Select implements RepairQueue. -func (m *MockRepairQueue) Select(ctx context.Context) (*InjuredSegment, error) { +func (m *MockRepairQueue) Select(context.Context, []storj.PlacementConstraint, []storj.PlacementConstraint) (*InjuredSegment, error) { panic("implement me") } diff --git a/satellite/repair/queue/queue.go b/satellite/repair/queue/queue.go index 369e9134a..730c84544 100644 --- a/satellite/repair/queue/queue.go +++ b/satellite/repair/queue/queue.go @@ -36,7 +36,7 @@ type RepairQueue interface { // InsertBatch adds multiple injured segments InsertBatch(ctx context.Context, segments []*InjuredSegment) (newlyInsertedSegments []*InjuredSegment, err error) // Select gets an injured segment. - Select(ctx context.Context) (*InjuredSegment, error) + Select(ctx context.Context, includedPlacements []storj.PlacementConstraint, excludedPlacements []storj.PlacementConstraint) (*InjuredSegment, error) // Delete removes an injured segment. Delete(ctx context.Context, s *InjuredSegment) error // Clean removes all segments last updated before a certain time diff --git a/satellite/repair/queue/queue2_test.go b/satellite/repair/queue/queue2_test.go index 7b47ca5e4..8ff7c5fb4 100644 --- a/satellite/repair/queue/queue2_test.go +++ b/satellite/repair/queue/queue2_test.go @@ -42,7 +42,7 @@ func TestUntilEmpty(t *testing.T) { // select segments until no more are returned, and we should get each one exactly once for { - injuredSeg, err := repairQueue.Select(ctx) + injuredSeg, err := repairQueue.Select(ctx, nil, nil) if err != nil { require.True(t, queue.ErrEmpty.Has(err)) break @@ -91,22 +91,22 @@ func TestOrder(t *testing.T) { } // segment with attempted = null should be selected first - injuredSeg, err := repairQueue.Select(ctx) + injuredSeg, err := repairQueue.Select(ctx, nil, nil) require.NoError(t, err) assert.Equal(t, nullID, injuredSeg.StreamID) // segment with attempted = 8 hours ago should be selected next - injuredSeg, err = repairQueue.Select(ctx) + injuredSeg, err = repairQueue.Select(ctx, nil, nil) require.NoError(t, err) assert.Equal(t, olderID, injuredSeg.StreamID) // segment with attempted = 7 hours ago should be selected next - injuredSeg, err = repairQueue.Select(ctx) + injuredSeg, err = repairQueue.Select(ctx, nil, nil) require.NoError(t, err) assert.Equal(t, oldID, injuredSeg.StreamID) // segment should be considered "empty" now - injuredSeg, err = repairQueue.Select(ctx) + injuredSeg, err = repairQueue.Select(ctx, nil, nil) assert.True(t, queue.ErrEmpty.Has(err)) assert.Nil(t, injuredSeg) }) @@ -197,13 +197,13 @@ func testorderHealthyPieces(t *testing.T, connStr string) { {'g'}, {'h'}, } { - injuredSeg, err := repairQueue.Select(ctx) + injuredSeg, err := repairQueue.Select(ctx, nil, nil) require.NoError(t, err) assert.Equal(t, nextID, injuredSeg.StreamID) } // queue should be considered "empty" now - injuredSeg, err := repairQueue.Select(ctx) + injuredSeg, err := repairQueue.Select(ctx, nil, nil) assert.True(t, queue.ErrEmpty.Has(err)) assert.Nil(t, injuredSeg) } @@ -247,13 +247,13 @@ func TestOrderOverwrite(t *testing.T) { segmentA, segmentB, } { - injuredSeg, err := repairQueue.Select(ctx) + injuredSeg, err := repairQueue.Select(ctx, nil, nil) require.NoError(t, err) assert.Equal(t, nextStreamID, injuredSeg.StreamID) } // queue should be considered "empty" now - injuredSeg, err := repairQueue.Select(ctx) + injuredSeg, err := repairQueue.Select(ctx, nil, nil) assert.True(t, queue.ErrEmpty.Has(err)) assert.Nil(t, injuredSeg) }) diff --git a/satellite/repair/queue/queue_test.go b/satellite/repair/queue/queue_test.go index d499c89d9..2739fe426 100644 --- a/satellite/repair/queue/queue_test.go +++ b/satellite/repair/queue/queue_test.go @@ -29,7 +29,7 @@ func TestInsertSelect(t *testing.T) { alreadyInserted, err := q.Insert(ctx, seg) require.NoError(t, err) require.False(t, alreadyInserted) - s, err := q.Select(ctx) + s, err := q.Select(ctx, nil, nil) require.NoError(t, err) err = q.Delete(ctx, s) require.NoError(t, err) @@ -133,7 +133,7 @@ func TestDequeueEmptyQueue(t *testing.T) { satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) { q := db.RepairQueue() - _, err := q.Select(ctx) + _, err := q.Select(ctx, nil, nil) require.Error(t, err) require.True(t, queue.ErrEmpty.Has(err), "error should of class EmptyQueue") }) @@ -161,7 +161,7 @@ func TestSequential(t *testing.T) { require.Len(t, list, N) for i := 0; i < N; i++ { - s, err := q.Select(ctx) + s, err := q.Select(ctx, nil, nil) require.NoError(t, err) err = q.Delete(ctx, s) require.NoError(t, err) @@ -203,7 +203,7 @@ func TestParallel(t *testing.T) { var remove errs2.Group for i := 0; i < N; i++ { remove.Go(func() error { - s, err := q.Select(ctx) + s, err := q.Select(ctx, nil, nil) if err != nil { return err } diff --git a/satellite/repair/repair_test.go b/satellite/repair/repair_test.go index 26f236e97..e606d90ef 100644 --- a/satellite/repair/repair_test.go +++ b/satellite/repair/repair_test.go @@ -3257,7 +3257,7 @@ func TestRepairClumpedPieces(t *testing.T) { _, err = satellite.RangedLoop.RangedLoop.Service.RunOnce(ctx) require.NoError(t, err) - injuredSegment, err := satellite.DB.RepairQueue().Select(ctx) + injuredSegment, err := satellite.DB.RepairQueue().Select(ctx, nil, nil) require.Error(t, err) if !queue.ErrEmpty.Has(err) { require.FailNow(t, "Should get ErrEmptyQueue, but got", err) diff --git a/satellite/repair/repairer/repairer.go b/satellite/repair/repairer/repairer.go index 53b700ed3..074a456af 100644 --- a/satellite/repair/repairer/repairer.go +++ b/satellite/repair/repairer/repairer.go @@ -138,7 +138,7 @@ func (service *Service) process(ctx context.Context) (err error) { // return from service.Run when queue fetch fails. ctx, cancel := context.WithTimeout(ctx, service.config.TotalTimeout) - seg, err := service.queue.Select(ctx) + seg, err := service.queue.Select(ctx, nil, nil) if err != nil { service.JobLimiter.Release(1) cancel() diff --git a/satellite/satellitedb/repairqueue.go b/satellite/satellitedb/repairqueue.go index dd45e7fd1..0305aee9b 100644 --- a/satellite/satellitedb/repairqueue.go +++ b/satellite/satellitedb/repairqueue.go @@ -7,10 +7,13 @@ import ( "context" "database/sql" "errors" + "fmt" + "strings" "time" "github.com/zeebo/errs" + "storj.io/common/storj" "storj.io/common/uuid" "storj.io/private/dbutil" "storj.io/private/dbutil/pgutil" @@ -199,15 +202,31 @@ func (r *repairQueue) InsertBatch( return newlyInsertedSegments, rows.Err() } -func (r *repairQueue) Select(ctx context.Context) (seg *queue.InjuredSegment, err error) { +func (r *repairQueue) Select(ctx context.Context, includedPlacements []storj.PlacementConstraint, excludedPlacements []storj.PlacementConstraint) (seg *queue.InjuredSegment, err error) { defer mon.Task()(&ctx)(&err) + restriction := "" + + placementsToString := func(placements []storj.PlacementConstraint) string { + var ps []string + for _, p := range placements { + ps = append(ps, fmt.Sprintf("%d", p)) + } + return strings.Join(ps, ",") + } + if len(includedPlacements) > 0 { + restriction += fmt.Sprintf(" AND placement IN (%s)", placementsToString(includedPlacements)) + } + + if len(excludedPlacements) > 0 { + restriction += fmt.Sprintf(" AND placement NOT IN (%s)", placementsToString(excludedPlacements)) + } segment := queue.InjuredSegment{} switch r.db.impl { case dbutil.Cockroach: err = r.db.QueryRowContext(ctx, ` UPDATE repair_queue SET attempted_at = now() - WHERE attempted_at IS NULL OR attempted_at < now() - interval '6 hours' + WHERE (attempted_at IS NULL OR attempted_at < now() - interval '6 hours') `+restriction+` ORDER BY segment_health ASC, attempted_at NULLS FIRST LIMIT 1 RETURNING stream_id, position, attempted_at, updated_at, inserted_at, segment_health, placement @@ -217,7 +236,7 @@ func (r *repairQueue) Select(ctx context.Context) (seg *queue.InjuredSegment, er err = r.db.QueryRowContext(ctx, ` UPDATE repair_queue SET attempted_at = now() WHERE (stream_id, position) = ( SELECT stream_id, position FROM repair_queue - WHERE attempted_at IS NULL OR attempted_at < now() - interval '6 hours' + WHERE (attempted_at IS NULL OR attempted_at < now() - interval '6 hours') `+restriction+` ORDER BY segment_health ASC, attempted_at NULLS FIRST FOR UPDATE SKIP LOCKED LIMIT 1 ) RETURNING stream_id, position, attempted_at, updated_at, inserted_at, segment_health, placement `).Scan(&segment.StreamID, &segment.Position, &segment.AttemptedAt, diff --git a/satellite/satellitedb/repairqueue_test.go b/satellite/satellitedb/repairqueue_test.go index e17e2e4c8..ca5afd964 100644 --- a/satellite/satellitedb/repairqueue_test.go +++ b/satellite/satellitedb/repairqueue_test.go @@ -56,7 +56,7 @@ func TestRepairQueue(t *testing.T) { require.NoError(t, err) require.True(t, alreadyInserted) - rs1, err := rq.Select(ctx) + rs1, err := rq.Select(ctx, nil, nil) require.NoError(t, err) require.Equal(t, testSegments[0].StreamID, rs1.StreamID) require.Equal(t, storj.PlacementConstraint(99), rs1.Placement) @@ -64,7 +64,7 @@ func TestRepairQueue(t *testing.T) { require.Equal(t, float64(12), rs1.SegmentHealth) // empty queue (one record, but that's already attempted) - _, err = rq.Select(ctx) + _, err = rq.Select(ctx, nil, nil) require.Error(t, err) // make sure it's really empty @@ -95,3 +95,55 @@ func TestRepairQueue(t *testing.T) { }) } + +func TestRepairQueue_PlacementRestrictions(t *testing.T) { + testSegments := make([]*queue.InjuredSegment, 40) + for i := 0; i < len(testSegments); i++ { + testSegments[i] = &queue.InjuredSegment{ + StreamID: testrand.UUID(), + Position: metabase.SegmentPosition{ + Part: uint32(i), + Index: 2, + }, + SegmentHealth: 10, + Placement: storj.PlacementConstraint(i % 10), + } + } + + satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) { + rq := db.RepairQueue() + + for i := 0; i < len(testSegments); i++ { + inserted, err := rq.Insert(ctx, testSegments[i]) + require.NoError(t, err) + require.False(t, inserted) + } + + // any random segment + _, err := rq.Select(ctx, nil, nil) + require.NoError(t, err) + + for i := 0; i < 2; i++ { + // placement constraint + selected, err := rq.Select(ctx, []storj.PlacementConstraint{1, 2}, nil) + require.NoError(t, err) + require.True(t, selected.Placement == 1 || selected.Placement == 2, "Expected placement 1 or 2 but was %d", selected.Placement) + + selected, err = rq.Select(ctx, []storj.PlacementConstraint{3, 4}, []storj.PlacementConstraint{3}) + require.NoError(t, err) + require.Equal(t, storj.PlacementConstraint(4), selected.Placement) + + selected, err = rq.Select(ctx, nil, []storj.PlacementConstraint{0, 1, 2, 3, 4}) + require.NoError(t, err) + require.True(t, selected.Placement > 4) + + selected, err = rq.Select(ctx, []storj.PlacementConstraint{9}, []storj.PlacementConstraint{1, 2, 3, 4}) + require.NoError(t, err) + require.Equal(t, storj.PlacementConstraint(9), selected.Placement) + + _, err = rq.Select(ctx, []storj.PlacementConstraint{11}, nil) + require.Error(t, err) + } + + }) +}