storj/satellite/satellitedb/repairqueue.go
Jessica Grebenschikov 0649d2b930 satellite/repair: improve contention for injuredsegments table on CRDB
We migrated satelliteDB off of Postgres and over to CockroachDB (crdb), but contention on the injuredsegments table was far too high, so we had to roll back to Postgres for the repair queue. A couple of things contributed to this problem:
1) crdb doesn't support `FOR UPDATE SKIP LOCKED`
2) the original crdb Select query was doing 2 full table scans and not using any indexes
3) the SLC Satellite (where we were doing the migration) was running 48 repair worker processes, each of which runs up to 5 goroutines that all try to select out of the repair queue, and this was causing a ton of contention.

The changes in this PR should help to reduce that contention and improve performance on CRDB.
The changes include:
1) Use an update/set query instead of select/update to capitalize on the new `UPDATE` implicit row locking ability in CRDB.
- Details: As of CRDB v20.2.2, there is implicit row locking with update/set queries (contention reduction and performance gains are described in this blog post: https://www.cockroachlabs.com/blog/when-and-why-to-use-select-for-update-in-cockroachdb/).

2) Remove the `ORDER BY` clause, since it was causing a full table scan and also preventing the use of the row locking capability.
- While long term it is very important to `ORDER BY segment_health`, the change here is only supposed to be a temporary band-aid to get us migrated over to CRDB quickly. Since segment_health has been set to infinity for some time now (re: https://review.dev.storj.io/c/storj/storj/+/3224), it seems like it might be ok to continue not making use of it for the short term. However, long term this needs to be fixed with a redesign of the repair workers, possibly in the trusted delegated repair design (https://review.dev.storj.io/c/storj/storj/+/2602), something similar to what is recommended here on how to implement a queue on CRDB (https://dev.to/ajwerner/quick-and-easy-exactly-once-distributed-work-queues-using-serializable-transactions-jdp), or a migration to a RabbitMQ priority queue or something similar.

This PR's improved query uses the index to avoid full scans and locks the row it is going to update; CRDB retries for us if there are any lock errors.
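
For reference, a rough sketch of the old vs. new shape of the CRDB Select query (the "before" form is approximate, reconstructed from the description above; the exact new queries are in Select in the file below):

    -- before (approximate): update the row chosen by a subquery; the ORDER BY
    -- forced full scans and, without SKIP LOCKED, workers contended on the
    -- same rows
    UPDATE injuredsegments SET attempted = now() WHERE path = (
        SELECT path FROM injuredsegments
        WHERE attempted IS NULL OR attempted < now() - interval '6 hours'
        ORDER BY segment_health ASC, attempted NULLS FIRST LIMIT 1
    ) RETURNING data;

    -- after: a single UPDATE ... LIMIT takes the implicit row lock and returns
    -- the claimed row, with no ORDER BY and no full scan
    UPDATE injuredsegments SET attempted = now()
        WHERE attempted IS NULL OR attempted < now() - interval '6 hours'
        LIMIT 1
        RETURNING data;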

Change-Id: Id29faad2186627872fbeb0f31536c4f55f860f23
2020-12-10 09:51:26 -08:00

// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.

package satellitedb

import (
"context"
"database/sql"
"errors"
"time"
"github.com/zeebo/errs"
"storj.io/storj/private/dbutil"
"storj.io/storj/satellite/internalpb"
"storj.io/storj/satellite/satellitedb/dbx"
"storj.io/storj/storage"
)

// RepairQueueSelectLimit defines how many items can be selected at the same time.
const RepairQueueSelectLimit = 1000
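
// repairQueue provides access to the injuredsegments table, which backs the segment repair queue.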
type repairQueue struct {
db *satelliteDB
}
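
// Insert adds an injured segment to the queue, or updates its segment health and updated_at if it is already queued.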
func (r *repairQueue) Insert(ctx context.Context, seg *internalpb.InjuredSegment, segmentHealth float64) (alreadyInserted bool, err error) {
defer mon.Task()(&ctx)(&err)
// insert if not exists, or update healthy count if does exist
var query string
// we want to insert the segment if it is not in the queue, but update the number of healthy pieces if it already is in the queue
// we also want to know if the result was an insert or an update - this is the reasoning for the xmax section of the postgres query
// and the separate cockroach query (which the xmax trick does not work for)
switch r.db.implementation {
case dbutil.Postgres:
query = `
INSERT INTO injuredsegments
(
path, data, segment_health
)
VALUES (
$1, $2, $3
)
ON CONFLICT (path)
DO UPDATE
SET segment_health=$3, updated_at=current_timestamp
RETURNING (xmax != 0) AS alreadyInserted
`
case dbutil.Cockroach:
query = `
WITH updater AS (
UPDATE injuredsegments SET segment_health = $3, updated_at = current_timestamp WHERE path = $1
RETURNING *
)
INSERT INTO injuredsegments (path, data, segment_health)
SELECT $1, $2, $3
WHERE NOT EXISTS (SELECT * FROM updater)
RETURNING false
`
}
rows, err := r.db.QueryContext(ctx, query, seg.Path, seg, segmentHealth)
if err != nil {
return false, err
}
defer func() { err = errs.Combine(err, rows.Close()) }()
if !rows.Next() {
// cockroach query does not return anything if the segment is already in the queue
alreadyInserted = true
} else {
err = rows.Scan(&alreadyInserted)
if err != nil {
return false, err
}
}
return alreadyInserted, rows.Err()
}
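
// Select claims an injured segment for repair by setting attempted to now, skipping segments attempted within the last 6 hours.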
func (r *repairQueue) Select(ctx context.Context) (seg *internalpb.InjuredSegment, err error) {
defer mon.Task()(&ctx)(&err)
switch r.db.implementation {
case dbutil.Cockroach:
err = r.db.QueryRowContext(ctx, `
UPDATE injuredsegments SET attempted = now()
WHERE attempted IS NULL OR attempted < now() - interval '6 hours'
LIMIT 1
RETURNING data`).Scan(&seg)
case dbutil.Postgres:
err = r.db.QueryRowContext(ctx, `
UPDATE injuredsegments SET attempted = now() WHERE path = (
SELECT path FROM injuredsegments
WHERE attempted IS NULL OR attempted < now() - interval '6 hours'
ORDER BY segment_health ASC, attempted NULLS FIRST FOR UPDATE SKIP LOCKED LIMIT 1
) RETURNING data`).Scan(&seg)
default:
return seg, errs.New("invalid dbType: %v", r.db.implementation)
}
if errors.Is(err, sql.ErrNoRows) {
err = storage.ErrEmptyQueue.New("")
}
return seg, err
}
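
// Delete removes an injured segment from the queue.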
func (r *repairQueue) Delete(ctx context.Context, seg *internalpb.InjuredSegment) (err error) {
defer mon.Task()(&ctx)(&err)
_, err = r.db.ExecContext(ctx, r.db.Rebind(`DELETE FROM injuredsegments WHERE path = ?`), seg.Path)
return Error.Wrap(err)
}
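
// Clean removes segments that have not been updated since the given time and returns the number deleted.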
func (r *repairQueue) Clean(ctx context.Context, before time.Time) (deleted int64, err error) {
defer mon.Task()(&ctx)(&err)
n, err := r.db.Delete_Injuredsegment_By_UpdatedAt_Less(ctx, dbx.Injuredsegment_UpdatedAt(before))
return n, Error.Wrap(err)
}
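
// SelectN returns up to limit injured segments without claiming them; limit is capped at RepairQueueSelectLimit.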
func (r *repairQueue) SelectN(ctx context.Context, limit int) (segs []internalpb.InjuredSegment, err error) {
defer mon.Task()(&ctx)(&err)
if limit <= 0 || limit > RepairQueueSelectLimit {
limit = RepairQueueSelectLimit
}
// TODO: strictly enforce order-by or change tests
rows, err := r.db.QueryContext(ctx, r.db.Rebind(`SELECT data FROM injuredsegments LIMIT ?`), limit)
if err != nil {
return nil, Error.Wrap(err)
}
defer func() { err = errs.Combine(err, rows.Close()) }()
for rows.Next() {
var seg internalpb.InjuredSegment
err = rows.Scan(&seg)
if err != nil {
return segs, Error.Wrap(err)
}
segs = append(segs, seg)
}
return segs, Error.Wrap(rows.Err())
}
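
// Count returns the total number of injured segments in the queue.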
func (r *repairQueue) Count(ctx context.Context) (count int, err error) {
defer mon.Task()(&ctx)(&err)
// Count every segment regardless of how recently repair was last attempted
err = r.db.QueryRowContext(ctx, r.db.Rebind(`SELECT COUNT(*) as count FROM injuredsegments`)).Scan(&count)
return count, Error.Wrap(err)
}

// TestingSetAttemptedTime sets attempted time for a repairpath.
func (r *repairQueue) TestingSetAttemptedTime(ctx context.Context, repairpath []byte, t time.Time) (rowsAffected int64, err error) {
defer mon.Task()(&ctx)(&err)
res, err := r.db.ExecContext(ctx, r.db.Rebind(`UPDATE injuredsegments SET attempted = ? WHERE path = ?`), t, repairpath)
if err != nil {
return 0, Error.Wrap(err)
}
count, err := res.RowsAffected()
return count, Error.Wrap(err)
}
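
For context, a minimal sketch (not part of this file) of how a repair worker might drain this queue, assuming the same imports as above and a caller-supplied repair function; the names here are hypothetical:

    // drainQueue pulls injured segments one at a time until the queue is empty.
    func drainQueue(ctx context.Context, queue *repairQueue, repair func(context.Context, *internalpb.InjuredSegment) error) error {
        for {
            seg, err := queue.Select(ctx) // claims the segment by setting attempted = now()
            if storage.ErrEmptyQueue.Has(err) {
                return nil // nothing eligible for repair right now
            }
            if err != nil {
                return err
            }
            if err := repair(ctx, seg); err != nil {
                // leave the row queued; it becomes selectable again after 6 hours
                continue
            }
            if err := queue.Delete(ctx, seg); err != nil {
                return err
            }
        }
    }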