diff --git a/satellite/repair/queue/mock.go b/satellite/repair/queue/mock.go index 09c7bd951..cd5f58cf7 100644 --- a/satellite/repair/queue/mock.go +++ b/satellite/repair/queue/mock.go @@ -72,4 +72,9 @@ func (m *MockRepairQueue) TestingSetAttemptedTime(ctx context.Context, streamID panic("implement me") } +// Stat implements RepairQueue. +func (m *MockRepairQueue) Stat(ctx context.Context) ([]Stat, error) { + panic("implement me") +} + var _ RepairQueue = &MockRepairQueue{} diff --git a/satellite/repair/queue/queue.go b/satellite/repair/queue/queue.go index 730c84544..b4bb65a10 100644 --- a/satellite/repair/queue/queue.go +++ b/satellite/repair/queue/queue.go @@ -26,6 +26,18 @@ type InjuredSegment struct { Placement storj.PlacementConstraint } +// Stat contains information about a segment of repair queue. +type Stat struct { + Count int + Placement storj.PlacementConstraint + MaxInsertedAt time.Time + MinInsertedAt time.Time + MaxAttemptedAt *time.Time + MinAttemptedAt *time.Time + MinSegmentHealth float64 + MaxSegmentHealth float64 +} + // RepairQueue implements queueing for segments that need repairing. // Implementation can be found at satellite/satellitedb/repairqueue.go. // @@ -46,6 +58,9 @@ type RepairQueue interface { // Count counts the number of segments in the repair queue. Count(ctx context.Context) (count int, err error) + // Stat returns stat of the current queue state. + Stat(ctx context.Context) ([]Stat, error) + // TestingSetAttemptedTime sets attempted time for a segment. TestingSetAttemptedTime(ctx context.Context, streamID uuid.UUID, position metabase.SegmentPosition, t time.Time) (rowsAffected int64, err error) } diff --git a/satellite/satellitedb/repairqueue.go b/satellite/satellitedb/repairqueue.go index b3d4e7028..eb7c6bf59 100644 --- a/satellite/satellitedb/repairqueue.go +++ b/satellite/satellitedb/repairqueue.go @@ -30,6 +30,47 @@ type repairQueue struct { db *satelliteDB } +// Stat returns stat of the current queue state. +func (r *repairQueue) Stat(ctx context.Context) ([]queue.Stat, error) { + query := ` + select placement, + count(1), + max(inserted_at) as max_inserted_at, + min(inserted_at) as min_inserted_at, + max(attempted_at) as max_attempted_at, + min(attempted_at) as min_attempted_at, + max(segment_health) as max_health, + min(segment_health) as min_health + from repair_queue + group by placement, attempted_at is null` + + rows, err := r.db.QueryContext(ctx, query) + if err != nil { + return nil, err + } + defer func() { err = errs.Combine(err, rows.Close()) }() + + var res []queue.Stat + for rows.Next() { + var stat queue.Stat + err = rows.Scan( + &stat.Placement, + &stat.Count, + &stat.MaxInsertedAt, + &stat.MinInsertedAt, + &stat.MaxAttemptedAt, + &stat.MinAttemptedAt, + &stat.MaxSegmentHealth, + &stat.MinSegmentHealth, + ) + if err != nil { + return res, err + } + res = append(res, stat) + } + return res, rows.Err() +} + func (r *repairQueue) Insert(ctx context.Context, seg *queue.InjuredSegment) (alreadyInserted bool, err error) { defer mon.Task()(&ctx)(&err) // insert if not exists, or update healthy count if does exist diff --git a/satellite/satellitedb/repairqueue_test.go b/satellite/satellitedb/repairqueue_test.go index ca7dd9dfb..4f12d7d14 100644 --- a/satellite/satellitedb/repairqueue_test.go +++ b/satellite/satellitedb/repairqueue_test.go @@ -200,3 +200,40 @@ func TestRepairQueue_BatchInsert(t *testing.T) { }) } + +func TestRepairQueue_Stat(t *testing.T) { + satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) { + testSegments := make([]*queue.InjuredSegment, 20) + for i := 0; i < len(testSegments); i++ { + + placement := storj.PlacementConstraint(i % 5) + uuid := testrand.UUID() + uuid[0] = byte(placement) + + is := &queue.InjuredSegment{ + StreamID: uuid, + Position: metabase.SegmentPosition{ + Part: uint32(i), + Index: 2, + }, + SegmentHealth: 10, + Placement: placement, + } + testSegments[i] = is + } + rq := db.RepairQueue() + + _, err := rq.InsertBatch(ctx, testSegments) + require.NoError(t, err) + + _, err = rq.Select(ctx, nil, nil) + require.NoError(t, err) + + stat, err := rq.Stat(ctx) + require.NoError(t, err) + + // we have 5 placement, but one has both attempted and non-attempted entries + require.Len(t, stat, 6) + + }) +}