diff --git a/monkit.lock b/monkit.lock index 102bf2d36..f26e92cef 100644 --- a/monkit.lock +++ b/monkit.lock @@ -57,6 +57,7 @@ storj.io/storj/satellite/repair/checker."checker_segment_age" IntVal storj.io/storj/satellite/repair/checker."checker_segment_healthy_count" IntVal storj.io/storj/satellite/repair/checker."checker_segment_time_until_irreparable" IntVal storj.io/storj/satellite/repair/checker."checker_segment_total_count" IntVal +storj.io/storj/satellite/repair/checker."new_remote_segments_needing_repair" IntVal storj.io/storj/satellite/repair/checker."remote_files_checked" IntVal storj.io/storj/satellite/repair/checker."remote_files_lost" IntVal storj.io/storj/satellite/repair/checker."remote_segments_checked" IntVal diff --git a/satellite/repair/checker/checker.go b/satellite/repair/checker/checker.go index 87b1f8b1e..15c5db9e8 100644 --- a/satellite/repair/checker/checker.go +++ b/satellite/repair/checker/checker.go @@ -39,12 +39,13 @@ type Config struct { // durabilityStats remote segment information type durabilityStats struct { - objectsChecked int64 - remoteSegmentsChecked int64 - remoteSegmentsNeedingRepair int64 - remoteSegmentsLost int64 - remoteSegmentsFailedToCheck int64 - remoteSegmentInfo []string + objectsChecked int64 + remoteSegmentsChecked int64 + remoteSegmentsNeedingRepair int64 + newRemoteSegmentsNeedingRepair int64 + remoteSegmentsLost int64 + remoteSegmentsFailedToCheck int64 + remoteSegmentInfo []string // remoteSegmentsOverThreshold[0]=# of healthy=rt+1, remoteSegmentsOverThreshold[1]=# of healthy=rt+2, etc... remoteSegmentsOverThreshold [5]int64 } @@ -129,17 +130,18 @@ func (checker *Checker) IdentifyInjuredSegments(ctx context.Context) (err error) return err } - mon.IntVal("remote_files_checked").Observe(observer.monStats.objectsChecked) //locked - mon.IntVal("remote_segments_checked").Observe(observer.monStats.remoteSegmentsChecked) //locked - mon.IntVal("remote_segments_failed_to_check").Observe(observer.monStats.remoteSegmentsFailedToCheck) //locked - mon.IntVal("remote_segments_needing_repair").Observe(observer.monStats.remoteSegmentsNeedingRepair) //locked - mon.IntVal("remote_segments_lost").Observe(observer.monStats.remoteSegmentsLost) //locked - mon.IntVal("remote_files_lost").Observe(int64(len(observer.monStats.remoteSegmentInfo))) //locked - mon.IntVal("remote_segments_over_threshold_1").Observe(observer.monStats.remoteSegmentsOverThreshold[0]) //locked - mon.IntVal("remote_segments_over_threshold_2").Observe(observer.monStats.remoteSegmentsOverThreshold[1]) //locked - mon.IntVal("remote_segments_over_threshold_3").Observe(observer.monStats.remoteSegmentsOverThreshold[2]) //locked - mon.IntVal("remote_segments_over_threshold_4").Observe(observer.monStats.remoteSegmentsOverThreshold[3]) //locked - mon.IntVal("remote_segments_over_threshold_5").Observe(observer.monStats.remoteSegmentsOverThreshold[4]) //locked + mon.IntVal("remote_files_checked").Observe(observer.monStats.objectsChecked) //locked + mon.IntVal("remote_segments_checked").Observe(observer.monStats.remoteSegmentsChecked) //locked + mon.IntVal("remote_segments_failed_to_check").Observe(observer.monStats.remoteSegmentsFailedToCheck) //locked + mon.IntVal("remote_segments_needing_repair").Observe(observer.monStats.remoteSegmentsNeedingRepair) //locked + mon.IntVal("new_remote_segments_needing_repair").Observe(observer.monStats.newRemoteSegmentsNeedingRepair) //locked + mon.IntVal("remote_segments_lost").Observe(observer.monStats.remoteSegmentsLost) //locked + mon.IntVal("remote_files_lost").Observe(int64(len(observer.monStats.remoteSegmentInfo))) //locked + mon.IntVal("remote_segments_over_threshold_1").Observe(observer.monStats.remoteSegmentsOverThreshold[0]) //locked + mon.IntVal("remote_segments_over_threshold_2").Observe(observer.monStats.remoteSegmentsOverThreshold[1]) //locked + mon.IntVal("remote_segments_over_threshold_3").Observe(observer.monStats.remoteSegmentsOverThreshold[2]) //locked + mon.IntVal("remote_segments_over_threshold_4").Observe(observer.monStats.remoteSegmentsOverThreshold[3]) //locked + mon.IntVal("remote_segments_over_threshold_5").Observe(observer.monStats.remoteSegmentsOverThreshold[4]) //locked allUnhealthy := observer.monStats.remoteSegmentsNeedingRepair + observer.monStats.remoteSegmentsFailedToCheck allChecked := observer.monStats.remoteSegmentsChecked @@ -190,7 +192,7 @@ func (checker *Checker) updateIrreparableSegmentStatus(ctx context.Context, poin // minimum required pieces in redundancy // except for the case when the repair and success thresholds are the same (a case usually seen during testing) if numHealthy >= redundancy.MinReq && numHealthy <= repairThreshold && numHealthy < redundancy.SuccessThreshold { - err = checker.repairQueue.Insert(ctx, &pb.InjuredSegment{ + _, err = checker.repairQueue.Insert(ctx, &pb.InjuredSegment{ Path: []byte(path), LostPieces: missingPieces, InsertedTime: time.Now().UTC(), @@ -280,7 +282,7 @@ func (obs *checkerObserver) RemoteSegment(ctx context.Context, path metainfo.Sco // except for the case when the repair and success thresholds are the same (a case usually seen during testing) if numHealthy >= redundancy.MinReq && numHealthy <= repairThreshold && numHealthy < redundancy.SuccessThreshold { obs.monStats.remoteSegmentsNeedingRepair++ - err = obs.repairQueue.Insert(ctx, &pb.InjuredSegment{ + alreadyInserted, err := obs.repairQueue.Insert(ctx, &pb.InjuredSegment{ Path: []byte(path.Raw), LostPieces: missingPieces, InsertedTime: time.Now().UTC(), @@ -290,6 +292,10 @@ func (obs *checkerObserver) RemoteSegment(ctx context.Context, path metainfo.Sco return nil } + if !alreadyInserted { + obs.monStats.newRemoteSegmentsNeedingRepair++ + } + // delete always returns nil when something was deleted and also when element didn't exists err = obs.irrdb.Delete(ctx, []byte(path.Raw)) if err != nil { diff --git a/satellite/repair/queue/queue.go b/satellite/repair/queue/queue.go index 66df9bd00..e4ebf7df6 100644 --- a/satellite/repair/queue/queue.go +++ b/satellite/repair/queue/queue.go @@ -15,7 +15,7 @@ import ( // architecture: Database type RepairQueue interface { // Insert adds an injured segment. - Insert(ctx context.Context, s *pb.InjuredSegment, numHealthy int) error + Insert(ctx context.Context, s *pb.InjuredSegment, numHealthy int) (alreadyInserted bool, err error) // Select gets an injured segment. Select(ctx context.Context) (*pb.InjuredSegment, error) // Delete removes an injured segment. diff --git a/satellite/repair/queue/queue2_test.go b/satellite/repair/queue/queue2_test.go index 4f729f37d..658624053 100644 --- a/satellite/repair/queue/queue2_test.go +++ b/satellite/repair/queue/queue2_test.go @@ -26,11 +26,12 @@ func TestUntilEmpty(t *testing.T) { // insert a bunch of segments pathsMap := make(map[string]int) - for i := 0; i < 100; i++ { + for i := 0; i < 20; i++ { path := "/path/" + string(i) injuredSeg := &pb.InjuredSegment{Path: []byte(path)} - err := repairQueue.Insert(ctx, injuredSeg, 10) + alreadyInserted, err := repairQueue.Insert(ctx, injuredSeg, 10) require.NoError(t, err) + require.False(t, alreadyInserted) pathsMap[path] = 0 } @@ -61,8 +62,9 @@ func TestOrder(t *testing.T) { for _, path := range [][]byte{oldRepairPath, recentRepairPath, nullPath, olderRepairPath} { injuredSeg := &pb.InjuredSegment{Path: path} - err := repairQueue.Insert(ctx, injuredSeg, 10) + alreadyInserted, err := repairQueue.Insert(ctx, injuredSeg, 10) require.NoError(t, err) + require.False(t, alreadyInserted) } // TODO: remove dependency on *dbx.DB @@ -155,8 +157,9 @@ func TestOrderHealthyPieces(t *testing.T) { for _, item := range injuredSegList { // first, insert the injured segment injuredSeg := &pb.InjuredSegment{Path: item.path} - err := repairQueue.Insert(ctx, injuredSeg, item.health) + alreadyInserted, err := repairQueue.Insert(ctx, injuredSeg, item.health) require.NoError(t, err) + require.False(t, alreadyInserted) // next, if applicable, update the "attempted at" timestamp if !item.attempted.IsZero() { @@ -216,10 +219,15 @@ func TestOrderOverwrite(t *testing.T) { {[]byte("path/b"), 9}, {[]byte("path/a"), 8}, } - for _, item := range injuredSegList { + for i, item := range injuredSegList { injuredSeg := &pb.InjuredSegment{Path: item.path} - err := repairQueue.Insert(ctx, injuredSeg, item.health) + alreadyInserted, err := repairQueue.Insert(ctx, injuredSeg, item.health) require.NoError(t, err) + if i == 2 { + require.True(t, alreadyInserted) + } else { + require.False(t, alreadyInserted) + } } for _, nextPath := range []string{ @@ -244,12 +252,13 @@ func TestCount(t *testing.T) { // insert a bunch of segments pathsMap := make(map[string]int) - numSegments := 100 + numSegments := 20 for i := 0; i < numSegments; i++ { path := "/path/" + string(i) injuredSeg := &pb.InjuredSegment{Path: []byte(path)} - err := repairQueue.Insert(ctx, injuredSeg, 10) + alreadyInserted, err := repairQueue.Insert(ctx, injuredSeg, 10) require.NoError(t, err) + require.False(t, alreadyInserted) pathsMap[path] = 0 } diff --git a/satellite/repair/queue/queue_test.go b/satellite/repair/queue/queue_test.go index ba666df6b..1a52cef11 100644 --- a/satellite/repair/queue/queue_test.go +++ b/satellite/repair/queue/queue_test.go @@ -26,8 +26,9 @@ func TestInsertSelect(t *testing.T) { Path: []byte("abc"), LostPieces: []int32{int32(1), int32(3)}, } - err := q.Insert(ctx, seg, 10) + alreadyInserted, err := q.Insert(ctx, seg, 10) require.NoError(t, err) + require.False(t, alreadyInserted) s, err := q.Select(ctx) require.NoError(t, err) err = q.Delete(ctx, s) @@ -44,10 +45,12 @@ func TestInsertDuplicate(t *testing.T) { Path: []byte("abc"), LostPieces: []int32{int32(1), int32(3)}, } - err := q.Insert(ctx, seg, 10) + alreadyInserted, err := q.Insert(ctx, seg, 10) require.NoError(t, err) - err = q.Insert(ctx, seg, 10) + require.False(t, alreadyInserted) + alreadyInserted, err = q.Insert(ctx, seg, 10) require.NoError(t, err) + require.True(t, alreadyInserted) }) } @@ -65,15 +68,16 @@ func TestSequential(t *testing.T) { satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) { q := db.RepairQueue() - const N = 100 + const N = 20 var addSegs []*pb.InjuredSegment for i := 0; i < N; i++ { seg := &pb.InjuredSegment{ Path: []byte(strconv.Itoa(i)), LostPieces: []int32{int32(i)}, } - err := q.Insert(ctx, seg, 10) + alreadyInserted, err := q.Insert(ctx, seg, 10) require.NoError(t, err) + require.False(t, alreadyInserted) addSegs = append(addSegs, seg) } @@ -102,7 +106,7 @@ func TestSequential(t *testing.T) { func TestParallel(t *testing.T) { satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) { q := db.RepairQueue() - const N = 100 + const N = 20 entries := make(chan *pb.InjuredSegment, N) var inserts errs2.Group @@ -110,10 +114,11 @@ func TestParallel(t *testing.T) { for i := 0; i < N; i++ { i := i inserts.Go(func() error { - return q.Insert(ctx, &pb.InjuredSegment{ + _, err := q.Insert(ctx, &pb.InjuredSegment{ Path: []byte(strconv.Itoa(i)), LostPieces: []int32{int32(i)}, }, 10) + return err }) } require.Empty(t, inserts.Wait(), "unexpected queue.Insert errors") diff --git a/satellite/satellitedb/repairqueue.go b/satellite/satellitedb/repairqueue.go index 43663d659..786248826 100644 --- a/satellite/satellitedb/repairqueue.go +++ b/satellite/satellitedb/repairqueue.go @@ -21,24 +21,56 @@ type repairQueue struct { db *satelliteDB } -func (r *repairQueue) Insert(ctx context.Context, seg *pb.InjuredSegment, numHealthy int) (err error) { +func (r *repairQueue) Insert(ctx context.Context, seg *pb.InjuredSegment, numHealthy int) (alreadyInserted bool, err error) { defer mon.Task()(&ctx)(&err) // insert if not exists, or update healthy count if does exist - query := ` - INSERT INTO injuredsegments - ( - path, data, num_healthy_pieces - ) - VALUES ( - $1, $2, $3 - ) - ON CONFLICT (path) - DO UPDATE - SET - num_healthy_pieces=$3 + var query string + + // we want to insert the segment if it is not in the queue, but update the number of healthy pieces if it already is in the queue + // we also want to know if the result was an insert or an update - this is the reasoning for the xmax section of the postgres query + // and the separate cockroach query (which the xmax trick does not work for) + switch r.db.implementation { + case dbutil.Postgres: + query = ` + INSERT INTO injuredsegments + ( + path, data, num_healthy_pieces + ) + VALUES ( + $1, $2, $3 + ) + ON CONFLICT (path) + DO UPDATE + SET num_healthy_pieces=$3 + RETURNING (xmax != 0) AS alreadyInserted ` - _, err = r.db.ExecContext(ctx, query, seg.Path, seg, numHealthy) - return err + case dbutil.Cockroach: + query = ` + WITH updater AS ( + UPDATE injuredsegments SET num_healthy_pieces = $3 WHERE path = $1 + RETURNING * + ) + INSERT INTO injuredsegments (path, data, num_healthy_pieces) + SELECT $1, $2, $3 + WHERE NOT EXISTS (SELECT * FROM updater) + RETURNING false + ` + } + rows, err := r.db.QueryContext(ctx, query, seg.Path, seg, numHealthy) + if err != nil { + return false, err + } + + if !rows.Next() { + // cockroach query does not return anything if the segment is already in the queue + alreadyInserted = true + } else { + err = rows.Scan(&alreadyInserted) + if err != nil { + return false, err + } + } + return alreadyInserted, err } func (r *repairQueue) Select(ctx context.Context) (seg *pb.InjuredSegment, err error) {