satellite/repair: stat method to the repair queue to return with statistics

Change-Id: I2e07b116df9b282978a794423bd38803e2778755
This commit is contained in:
Márton Elek 2023-11-20 13:15:40 +01:00 committed by Elek, Márton
parent 2d8cca49b2
commit f2eca99bde
4 changed files with 98 additions and 0 deletions

View File

@ -72,4 +72,9 @@ func (m *MockRepairQueue) TestingSetAttemptedTime(ctx context.Context, streamID
panic("implement me")
}
// Stat implements RepairQueue.
func (m *MockRepairQueue) Stat(ctx context.Context) ([]Stat, error) {
panic("implement me")
}
var _ RepairQueue = &MockRepairQueue{}

View File

@ -26,6 +26,18 @@ type InjuredSegment struct {
Placement storj.PlacementConstraint
}
// Stat contains information about a segment of repair queue.
type Stat struct {
Count int
Placement storj.PlacementConstraint
MaxInsertedAt time.Time
MinInsertedAt time.Time
MaxAttemptedAt *time.Time
MinAttemptedAt *time.Time
MinSegmentHealth float64
MaxSegmentHealth float64
}
// RepairQueue implements queueing for segments that need repairing.
// Implementation can be found at satellite/satellitedb/repairqueue.go.
//
@ -46,6 +58,9 @@ type RepairQueue interface {
// Count counts the number of segments in the repair queue.
Count(ctx context.Context) (count int, err error)
// Stat returns stat of the current queue state.
Stat(ctx context.Context) ([]Stat, error)
// TestingSetAttemptedTime sets attempted time for a segment.
TestingSetAttemptedTime(ctx context.Context, streamID uuid.UUID, position metabase.SegmentPosition, t time.Time) (rowsAffected int64, err error)
}

View File

@ -30,6 +30,47 @@ type repairQueue struct {
db *satelliteDB
}
// Stat returns stat of the current queue state.
func (r *repairQueue) Stat(ctx context.Context) ([]queue.Stat, error) {
query := `
select placement,
count(1),
max(inserted_at) as max_inserted_at,
min(inserted_at) as min_inserted_at,
max(attempted_at) as max_attempted_at,
min(attempted_at) as min_attempted_at,
max(segment_health) as max_health,
min(segment_health) as min_health
from repair_queue
group by placement, attempted_at is null`
rows, err := r.db.QueryContext(ctx, query)
if err != nil {
return nil, err
}
defer func() { err = errs.Combine(err, rows.Close()) }()
var res []queue.Stat
for rows.Next() {
var stat queue.Stat
err = rows.Scan(
&stat.Placement,
&stat.Count,
&stat.MaxInsertedAt,
&stat.MinInsertedAt,
&stat.MaxAttemptedAt,
&stat.MinAttemptedAt,
&stat.MaxSegmentHealth,
&stat.MinSegmentHealth,
)
if err != nil {
return res, err
}
res = append(res, stat)
}
return res, rows.Err()
}
func (r *repairQueue) Insert(ctx context.Context, seg *queue.InjuredSegment) (alreadyInserted bool, err error) {
defer mon.Task()(&ctx)(&err)
// insert if not exists, or update healthy count if does exist

View File

@ -200,3 +200,40 @@ func TestRepairQueue_BatchInsert(t *testing.T) {
})
}
func TestRepairQueue_Stat(t *testing.T) {
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
testSegments := make([]*queue.InjuredSegment, 20)
for i := 0; i < len(testSegments); i++ {
placement := storj.PlacementConstraint(i % 5)
uuid := testrand.UUID()
uuid[0] = byte(placement)
is := &queue.InjuredSegment{
StreamID: uuid,
Position: metabase.SegmentPosition{
Part: uint32(i),
Index: 2,
},
SegmentHealth: 10,
Placement: placement,
}
testSegments[i] = is
}
rq := db.RepairQueue()
_, err := rq.InsertBatch(ctx, testSegments)
require.NoError(t, err)
_, err = rq.Select(ctx, nil, nil)
require.NoError(t, err)
stat, err := rq.Stat(ctx)
require.NoError(t, err)
// we have 5 placement, but one has both attempted and non-attempted entries
require.Len(t, stat, 6)
})
}