satellite/repair/{checker,queue}: add metric for new segments added to repair queue

* add monkit stat new_remote_segments_needing_repair, which reports the
number of new unhealthy segments in the repair queue since the previous
checker iteration

Change-Id: I2f10266006fdd6406ece50f4759b91382059dcc3
This commit is contained in:
Moby von Briesen 2020-05-22 15:54:05 -04:00 committed by Ivan Fraixedes
parent ed857f0fb4
commit 290c006a10
6 changed files with 103 additions and 50 deletions

View File

@ -57,6 +57,7 @@ storj.io/storj/satellite/repair/checker."checker_segment_age" IntVal
storj.io/storj/satellite/repair/checker."checker_segment_healthy_count" IntVal
storj.io/storj/satellite/repair/checker."checker_segment_time_until_irreparable" IntVal
storj.io/storj/satellite/repair/checker."checker_segment_total_count" IntVal
storj.io/storj/satellite/repair/checker."new_remote_segments_needing_repair" IntVal
storj.io/storj/satellite/repair/checker."remote_files_checked" IntVal
storj.io/storj/satellite/repair/checker."remote_files_lost" IntVal
storj.io/storj/satellite/repair/checker."remote_segments_checked" IntVal

View File

@ -39,12 +39,13 @@ type Config struct {
// durabilityStats contains remote segment information
type durabilityStats struct {
objectsChecked int64
remoteSegmentsChecked int64
remoteSegmentsNeedingRepair int64
remoteSegmentsLost int64
remoteSegmentsFailedToCheck int64
remoteSegmentInfo []string
objectsChecked int64
remoteSegmentsChecked int64
remoteSegmentsNeedingRepair int64
newRemoteSegmentsNeedingRepair int64
remoteSegmentsLost int64
remoteSegmentsFailedToCheck int64
remoteSegmentInfo []string
// remoteSegmentsOverThreshold[0]=# of healthy=rt+1, remoteSegmentsOverThreshold[1]=# of healthy=rt+2, etc...
remoteSegmentsOverThreshold [5]int64
}
@ -129,17 +130,18 @@ func (checker *Checker) IdentifyInjuredSegments(ctx context.Context) (err error)
return err
}
mon.IntVal("remote_files_checked").Observe(observer.monStats.objectsChecked) //locked
mon.IntVal("remote_segments_checked").Observe(observer.monStats.remoteSegmentsChecked) //locked
mon.IntVal("remote_segments_failed_to_check").Observe(observer.monStats.remoteSegmentsFailedToCheck) //locked
mon.IntVal("remote_segments_needing_repair").Observe(observer.monStats.remoteSegmentsNeedingRepair) //locked
mon.IntVal("remote_segments_lost").Observe(observer.monStats.remoteSegmentsLost) //locked
mon.IntVal("remote_files_lost").Observe(int64(len(observer.monStats.remoteSegmentInfo))) //locked
mon.IntVal("remote_segments_over_threshold_1").Observe(observer.monStats.remoteSegmentsOverThreshold[0]) //locked
mon.IntVal("remote_segments_over_threshold_2").Observe(observer.monStats.remoteSegmentsOverThreshold[1]) //locked
mon.IntVal("remote_segments_over_threshold_3").Observe(observer.monStats.remoteSegmentsOverThreshold[2]) //locked
mon.IntVal("remote_segments_over_threshold_4").Observe(observer.monStats.remoteSegmentsOverThreshold[3]) //locked
mon.IntVal("remote_segments_over_threshold_5").Observe(observer.monStats.remoteSegmentsOverThreshold[4]) //locked
mon.IntVal("remote_files_checked").Observe(observer.monStats.objectsChecked) //locked
mon.IntVal("remote_segments_checked").Observe(observer.monStats.remoteSegmentsChecked) //locked
mon.IntVal("remote_segments_failed_to_check").Observe(observer.monStats.remoteSegmentsFailedToCheck) //locked
mon.IntVal("remote_segments_needing_repair").Observe(observer.monStats.remoteSegmentsNeedingRepair) //locked
mon.IntVal("new_remote_segments_needing_repair").Observe(observer.monStats.newRemoteSegmentsNeedingRepair) //locked
mon.IntVal("remote_segments_lost").Observe(observer.monStats.remoteSegmentsLost) //locked
mon.IntVal("remote_files_lost").Observe(int64(len(observer.monStats.remoteSegmentInfo))) //locked
mon.IntVal("remote_segments_over_threshold_1").Observe(observer.monStats.remoteSegmentsOverThreshold[0]) //locked
mon.IntVal("remote_segments_over_threshold_2").Observe(observer.monStats.remoteSegmentsOverThreshold[1]) //locked
mon.IntVal("remote_segments_over_threshold_3").Observe(observer.monStats.remoteSegmentsOverThreshold[2]) //locked
mon.IntVal("remote_segments_over_threshold_4").Observe(observer.monStats.remoteSegmentsOverThreshold[3]) //locked
mon.IntVal("remote_segments_over_threshold_5").Observe(observer.monStats.remoteSegmentsOverThreshold[4]) //locked
allUnhealthy := observer.monStats.remoteSegmentsNeedingRepair + observer.monStats.remoteSegmentsFailedToCheck
allChecked := observer.monStats.remoteSegmentsChecked
@ -190,7 +192,7 @@ func (checker *Checker) updateIrreparableSegmentStatus(ctx context.Context, poin
// minimum required pieces in redundancy
// except for the case when the repair and success thresholds are the same (a case usually seen during testing)
if numHealthy >= redundancy.MinReq && numHealthy <= repairThreshold && numHealthy < redundancy.SuccessThreshold {
err = checker.repairQueue.Insert(ctx, &pb.InjuredSegment{
_, err = checker.repairQueue.Insert(ctx, &pb.InjuredSegment{
Path: []byte(path),
LostPieces: missingPieces,
InsertedTime: time.Now().UTC(),
@ -280,7 +282,7 @@ func (obs *checkerObserver) RemoteSegment(ctx context.Context, path metainfo.Sco
// except for the case when the repair and success thresholds are the same (a case usually seen during testing)
if numHealthy >= redundancy.MinReq && numHealthy <= repairThreshold && numHealthy < redundancy.SuccessThreshold {
obs.monStats.remoteSegmentsNeedingRepair++
err = obs.repairQueue.Insert(ctx, &pb.InjuredSegment{
alreadyInserted, err := obs.repairQueue.Insert(ctx, &pb.InjuredSegment{
Path: []byte(path.Raw),
LostPieces: missingPieces,
InsertedTime: time.Now().UTC(),
@ -290,6 +292,10 @@ func (obs *checkerObserver) RemoteSegment(ctx context.Context, path metainfo.Sco
return nil
}
if !alreadyInserted {
obs.monStats.newRemoteSegmentsNeedingRepair++
}
// Delete always returns nil both when something was deleted and when the element didn't exist
err = obs.irrdb.Delete(ctx, []byte(path.Raw))
if err != nil {

View File

@ -15,7 +15,7 @@ import (
// architecture: Database
type RepairQueue interface {
// Insert adds an injured segment.
Insert(ctx context.Context, s *pb.InjuredSegment, numHealthy int) error
Insert(ctx context.Context, s *pb.InjuredSegment, numHealthy int) (alreadyInserted bool, err error)
// Select gets an injured segment.
Select(ctx context.Context) (*pb.InjuredSegment, error)
// Delete removes an injured segment.

View File

@ -26,11 +26,12 @@ func TestUntilEmpty(t *testing.T) {
// insert a bunch of segments
pathsMap := make(map[string]int)
for i := 0; i < 100; i++ {
for i := 0; i < 20; i++ {
path := "/path/" + string(i)
injuredSeg := &pb.InjuredSegment{Path: []byte(path)}
err := repairQueue.Insert(ctx, injuredSeg, 10)
alreadyInserted, err := repairQueue.Insert(ctx, injuredSeg, 10)
require.NoError(t, err)
require.False(t, alreadyInserted)
pathsMap[path] = 0
}
@ -61,8 +62,9 @@ func TestOrder(t *testing.T) {
for _, path := range [][]byte{oldRepairPath, recentRepairPath, nullPath, olderRepairPath} {
injuredSeg := &pb.InjuredSegment{Path: path}
err := repairQueue.Insert(ctx, injuredSeg, 10)
alreadyInserted, err := repairQueue.Insert(ctx, injuredSeg, 10)
require.NoError(t, err)
require.False(t, alreadyInserted)
}
// TODO: remove dependency on *dbx.DB
@ -155,8 +157,9 @@ func TestOrderHealthyPieces(t *testing.T) {
for _, item := range injuredSegList {
// first, insert the injured segment
injuredSeg := &pb.InjuredSegment{Path: item.path}
err := repairQueue.Insert(ctx, injuredSeg, item.health)
alreadyInserted, err := repairQueue.Insert(ctx, injuredSeg, item.health)
require.NoError(t, err)
require.False(t, alreadyInserted)
// next, if applicable, update the "attempted at" timestamp
if !item.attempted.IsZero() {
@ -216,10 +219,15 @@ func TestOrderOverwrite(t *testing.T) {
{[]byte("path/b"), 9},
{[]byte("path/a"), 8},
}
for _, item := range injuredSegList {
for i, item := range injuredSegList {
injuredSeg := &pb.InjuredSegment{Path: item.path}
err := repairQueue.Insert(ctx, injuredSeg, item.health)
alreadyInserted, err := repairQueue.Insert(ctx, injuredSeg, item.health)
require.NoError(t, err)
if i == 2 {
require.True(t, alreadyInserted)
} else {
require.False(t, alreadyInserted)
}
}
for _, nextPath := range []string{
@ -244,12 +252,13 @@ func TestCount(t *testing.T) {
// insert a bunch of segments
pathsMap := make(map[string]int)
numSegments := 100
numSegments := 20
for i := 0; i < numSegments; i++ {
path := "/path/" + string(i)
injuredSeg := &pb.InjuredSegment{Path: []byte(path)}
err := repairQueue.Insert(ctx, injuredSeg, 10)
alreadyInserted, err := repairQueue.Insert(ctx, injuredSeg, 10)
require.NoError(t, err)
require.False(t, alreadyInserted)
pathsMap[path] = 0
}

View File

@ -26,8 +26,9 @@ func TestInsertSelect(t *testing.T) {
Path: []byte("abc"),
LostPieces: []int32{int32(1), int32(3)},
}
err := q.Insert(ctx, seg, 10)
alreadyInserted, err := q.Insert(ctx, seg, 10)
require.NoError(t, err)
require.False(t, alreadyInserted)
s, err := q.Select(ctx)
require.NoError(t, err)
err = q.Delete(ctx, s)
@ -44,10 +45,12 @@ func TestInsertDuplicate(t *testing.T) {
Path: []byte("abc"),
LostPieces: []int32{int32(1), int32(3)},
}
err := q.Insert(ctx, seg, 10)
alreadyInserted, err := q.Insert(ctx, seg, 10)
require.NoError(t, err)
err = q.Insert(ctx, seg, 10)
require.False(t, alreadyInserted)
alreadyInserted, err = q.Insert(ctx, seg, 10)
require.NoError(t, err)
require.True(t, alreadyInserted)
})
}
@ -65,15 +68,16 @@ func TestSequential(t *testing.T) {
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
q := db.RepairQueue()
const N = 100
const N = 20
var addSegs []*pb.InjuredSegment
for i := 0; i < N; i++ {
seg := &pb.InjuredSegment{
Path: []byte(strconv.Itoa(i)),
LostPieces: []int32{int32(i)},
}
err := q.Insert(ctx, seg, 10)
alreadyInserted, err := q.Insert(ctx, seg, 10)
require.NoError(t, err)
require.False(t, alreadyInserted)
addSegs = append(addSegs, seg)
}
@ -102,7 +106,7 @@ func TestSequential(t *testing.T) {
func TestParallel(t *testing.T) {
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
q := db.RepairQueue()
const N = 100
const N = 20
entries := make(chan *pb.InjuredSegment, N)
var inserts errs2.Group
@ -110,10 +114,11 @@ func TestParallel(t *testing.T) {
for i := 0; i < N; i++ {
i := i
inserts.Go(func() error {
return q.Insert(ctx, &pb.InjuredSegment{
_, err := q.Insert(ctx, &pb.InjuredSegment{
Path: []byte(strconv.Itoa(i)),
LostPieces: []int32{int32(i)},
}, 10)
return err
})
}
require.Empty(t, inserts.Wait(), "unexpected queue.Insert errors")

View File

@ -21,24 +21,56 @@ type repairQueue struct {
db *satelliteDB
}
func (r *repairQueue) Insert(ctx context.Context, seg *pb.InjuredSegment, numHealthy int) (err error) {
func (r *repairQueue) Insert(ctx context.Context, seg *pb.InjuredSegment, numHealthy int) (alreadyInserted bool, err error) {
defer mon.Task()(&ctx)(&err)
// insert if it does not exist, or update the healthy count if it does exist
query := `
INSERT INTO injuredsegments
(
path, data, num_healthy_pieces
)
VALUES (
$1, $2, $3
)
ON CONFLICT (path)
DO UPDATE
SET
num_healthy_pieces=$3
var query string
// we want to insert the segment if it is not in the queue, but update the number of healthy pieces if it already is in the queue
// we also want to know whether the result was an insert or an update - this is the reason for the xmax clause in the postgres query
// and for the separate cockroach query (for which the xmax trick does not work)
switch r.db.implementation {
case dbutil.Postgres:
query = `
INSERT INTO injuredsegments
(
path, data, num_healthy_pieces
)
VALUES (
$1, $2, $3
)
ON CONFLICT (path)
DO UPDATE
SET num_healthy_pieces=$3
RETURNING (xmax != 0) AS alreadyInserted
`
_, err = r.db.ExecContext(ctx, query, seg.Path, seg, numHealthy)
return err
case dbutil.Cockroach:
query = `
WITH updater AS (
UPDATE injuredsegments SET num_healthy_pieces = $3 WHERE path = $1
RETURNING *
)
INSERT INTO injuredsegments (path, data, num_healthy_pieces)
SELECT $1, $2, $3
WHERE NOT EXISTS (SELECT * FROM updater)
RETURNING false
`
}
rows, err := r.db.QueryContext(ctx, query, seg.Path, seg, numHealthy)
if err != nil {
return false, err
}
if !rows.Next() {
// cockroach query does not return anything if the segment is already in the queue
alreadyInserted = true
} else {
err = rows.Scan(&alreadyInserted)
if err != nil {
return false, err
}
}
return alreadyInserted, err
}
func (r *repairQueue) Select(ctx context.Context) (seg *pb.InjuredSegment, err error) {