4e5a7f13c7
Add a column to the repair queue table in the satellite db for healthy piece count. When an item is selected from the repair queue, the least durable segment that has not been attempted in the past hour should be selected first. This prevents our repairer from getting stuck doing work on segments that are close to the repair threshold while allowing segments that are more unhealthy to degrade further. The migration also clears the repair queue so that the migration runs quickly and we can properly account for segment health in future repair work. We do not select items off the repair queue that have been attempted in the past six hours. This was changed from on hour to allow us time to try a wider variety of segments when the repair queue is very large. Change-Id: Iaf183f1e5fd45cd792a52e3563a3e43a2b9f410b
108 lines
3.1 KiB
Go
108 lines
3.1 KiB
Go
// Copyright (C) 2019 Storj Labs, Inc.
|
|
// See LICENSE for copying information.
|
|
|
|
package satellitedb
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
|
|
"github.com/zeebo/errs"
|
|
|
|
"storj.io/common/pb"
|
|
"storj.io/storj/private/dbutil"
|
|
"storj.io/storj/storage"
|
|
)
|
|
|
|
// RepairQueueSelectLimit defines how many items can be selected at the same time.
|
|
const RepairQueueSelectLimit = 1000
|
|
|
|
type repairQueue struct {
|
|
db *satelliteDB
|
|
}
|
|
|
|
func (r *repairQueue) Insert(ctx context.Context, seg *pb.InjuredSegment, numHealthy int) (err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
// insert if not exists, or update healthy count if does exist
|
|
query := `
|
|
INSERT INTO injuredsegments
|
|
(
|
|
path, data, num_healthy_pieces
|
|
)
|
|
VALUES (
|
|
$1, $2, $3
|
|
)
|
|
ON CONFLICT (path)
|
|
DO UPDATE
|
|
SET
|
|
num_healthy_pieces=$3
|
|
`
|
|
_, err = r.db.ExecContext(ctx, query, seg.Path, seg, numHealthy)
|
|
return err
|
|
}
|
|
|
|
func (r *repairQueue) Select(ctx context.Context) (seg *pb.InjuredSegment, err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
switch r.db.implementation {
|
|
case dbutil.Cockroach:
|
|
err = r.db.QueryRowContext(ctx, `
|
|
UPDATE injuredsegments SET attempted = now() AT TIME ZONE 'UTC' WHERE path = (
|
|
SELECT path FROM injuredsegments
|
|
WHERE attempted IS NULL OR attempted < now() AT TIME ZONE 'UTC' - interval '6 hours'
|
|
ORDER BY num_healthy_pieces ASC, attempted LIMIT 1
|
|
) RETURNING data`).Scan(&seg)
|
|
case dbutil.Postgres:
|
|
err = r.db.QueryRowContext(ctx, `
|
|
UPDATE injuredsegments SET attempted = now() AT TIME ZONE 'UTC' WHERE path = (
|
|
SELECT path FROM injuredsegments
|
|
WHERE attempted IS NULL OR attempted < now() AT TIME ZONE 'UTC' - interval '6 hours'
|
|
ORDER BY num_healthy_pieces ASC, attempted NULLS FIRST FOR UPDATE SKIP LOCKED LIMIT 1
|
|
) RETURNING data`).Scan(&seg)
|
|
default:
|
|
return seg, errs.New("invalid dbType: %v", r.db.implementation)
|
|
}
|
|
if err == sql.ErrNoRows {
|
|
err = storage.ErrEmptyQueue.New("")
|
|
}
|
|
return seg, err
|
|
}
|
|
|
|
func (r *repairQueue) Delete(ctx context.Context, seg *pb.InjuredSegment) (err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
_, err = r.db.ExecContext(ctx, r.db.Rebind(`DELETE FROM injuredsegments WHERE path = ?`), seg.Path)
|
|
return Error.Wrap(err)
|
|
}
|
|
|
|
func (r *repairQueue) SelectN(ctx context.Context, limit int) (segs []pb.InjuredSegment, err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
if limit <= 0 || limit > RepairQueueSelectLimit {
|
|
limit = RepairQueueSelectLimit
|
|
}
|
|
//todo: strictly enforce order-by or change tests
|
|
rows, err := r.db.QueryContext(ctx, r.db.Rebind(`SELECT data FROM injuredsegments LIMIT ?`), limit)
|
|
if err != nil {
|
|
return nil, Error.Wrap(err)
|
|
}
|
|
defer func() { err = errs.Combine(err, rows.Close()) }()
|
|
|
|
for rows.Next() {
|
|
var seg pb.InjuredSegment
|
|
err = rows.Scan(&seg)
|
|
if err != nil {
|
|
return segs, Error.Wrap(err)
|
|
}
|
|
segs = append(segs, seg)
|
|
}
|
|
|
|
return segs, Error.Wrap(rows.Err())
|
|
}
|
|
|
|
func (r *repairQueue) Count(ctx context.Context) (count int, err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
// Count every segment regardless of how recently repair was last attempted
|
|
err = r.db.QueryRowContext(ctx, r.db.Rebind(`SELECT COUNT(*) as count FROM injuredsegments`)).Scan(&count)
|
|
|
|
return count, Error.Wrap(err)
|
|
}
|