storj/satellite/satellitedb/repairqueue.go

// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.

package satellitedb

import (
"context"
"database/sql"
"errors"
"time"

"github.com/zeebo/errs"

"storj.io/common/uuid"
"storj.io/private/dbutil"
"storj.io/private/dbutil/pgutil"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/repair/queue"
"storj.io/storj/satellite/satellitedb/dbx"
)

// RepairQueueSelectLimit defines how many items can be selected at the same time.
const RepairQueueSelectLimit = 1000

// repairQueue implements storj.io/storj/satellite/repair/queue.RepairQueue.
type repairQueue struct {
db *satelliteDB
}
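
// Insert adds an injured segment to the repair queue, or refreshes its
// segment health and updated_at timestamp if the segment is already queued.
// The returned alreadyInserted flag reports whether the segment was already
// present.
//
// A minimal usage sketch, assuming an initialized repair queue rq and
// illustrative values for the segment fields:
//
//	alreadyInserted, err := rq.Insert(ctx, &queue.InjuredSegment{
//		StreamID:      streamID,
//		Position:      position,
//		SegmentHealth: health,
//	})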
func (r *repairQueue) Insert(ctx context.Context, seg *queue.InjuredSegment) (alreadyInserted bool, err error) {
defer mon.Task()(&ctx)(&err)
// Insert the segment if it is not yet in the queue; if it already is, update
// its segment health instead. We also need to know whether the result was an
// insert or an update, which is why the PostgreSQL query inspects xmax and why
// CockroachDB (where the xmax trick does not work) needs a separate query.
var query string
switch r.db.impl {
case dbutil.Postgres:
query = `
INSERT INTO repair_queue
(
stream_id, position, segment_health
)
VALUES (
$1, $2, $3
)
ON CONFLICT (stream_id, position)
DO UPDATE
SET segment_health=$3, updated_at=current_timestamp
RETURNING (xmax != 0) AS alreadyInserted
`
case dbutil.Cockroach:
// TODO: this is not an optimal solution, but CockroachDB is not used in production for the repair queue.
query = `
WITH inserted AS (
SELECT count(*) as alreadyInserted FROM repair_queue
WHERE stream_id = $1 AND position = $2
)
INSERT INTO repair_queue
(
stream_id, position, segment_health
)
VALUES (
$1, $2, $3
)
ON CONFLICT (stream_id, position)
DO UPDATE
SET segment_health=$3, updated_at=current_timestamp
RETURNING (SELECT alreadyInserted FROM inserted)
`
}
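// Run whichever variant applies; the result row (when one is returned) reports
// whether the segment was already queued.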
rows, err := r.db.QueryContext(ctx, query, seg.StreamID, seg.Position.Encode(), seg.SegmentHealth)
if err != nil {
return false, err
}
defer func() { err = errs.Combine(err, rows.Close()) }()
if !rows.Next() {
// cockroach query does not return anything if the segment is already in the queue
alreadyInserted = true
} else {
err = rows.Scan(&alreadyInserted)
if err != nil {
return false, err
}
}
return alreadyInserted, rows.Err()
}
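
// InsertBatch adds multiple injured segments to the repair queue in a single
// query, updating segment health for any segment that is already queued. It
// returns the subset of segments that were newly inserted.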
func (r *repairQueue) InsertBatch(
ctx context.Context,
segments []*queue.InjuredSegment,
) (newlyInsertedSegments []*queue.InjuredSegment, err error) {
defer mon.Task()(&ctx)(&err)
if len(segments) == 0 {
return nil, nil
}
// Insert each segment if it is not yet in the queue; if it already is, update
// its segment health instead. We also need to know whether each result was an
// insert or an update, which is why the PostgreSQL query inspects xmax and why
// CockroachDB (where the xmax trick does not work) needs a separate query.
var query string
switch r.db.impl {
case dbutil.Postgres:
query = `
INSERT INTO repair_queue
(
stream_id, position, segment_health
)
VALUES (
UNNEST($1::BYTEA[]),
UNNEST($2::INT8[]),
UNNEST($3::double precision[])
)
ON CONFLICT (stream_id, position)
DO UPDATE
SET segment_health=EXCLUDED.segment_health, updated_at=current_timestamp
RETURNING NOT(xmax != 0) AS newlyInserted
`
case dbutil.Cockroach:
// TODO: this is not an optimal solution, but CockroachDB is not used in production for the repair queue.
query = `
WITH to_insert AS (
SELECT
UNNEST($1::BYTEA[]) AS stream_id,
UNNEST($2::INT8[]) AS position,
UNNEST($3::double precision[]) AS segment_health
),
do_insert AS (
INSERT INTO repair_queue (
stream_id, position, segment_health
)
SELECT stream_id, position, segment_health
FROM to_insert
ON CONFLICT (stream_id, position)
DO UPDATE
SET
segment_health=EXCLUDED.segment_health,
updated_at=current_timestamp
RETURNING false
)
SELECT
(repair_queue.stream_id IS NULL) AS newlyInserted
FROM to_insert
LEFT JOIN repair_queue
ON to_insert.stream_id = repair_queue.stream_id
AND to_insert.position = repair_queue.position
`
}
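// Collect the segment fields into parallel slices so they can be passed as
// single array-valued parameters and expanded by UNNEST inside the query.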
var insertData struct {
StreamIDs []uuid.UUID
Positions []int64
SegmentHealths []float64
}
for _, segment := range segments {
insertData.StreamIDs = append(insertData.StreamIDs, segment.StreamID)
insertData.Positions = append(insertData.Positions, int64(segment.Position.Encode()))
insertData.SegmentHealths = append(insertData.SegmentHealths, segment.SegmentHealth)
}
rows, err := r.db.QueryContext(
ctx, query,
pgutil.UUIDArray(insertData.StreamIDs),
pgutil.Int8Array(insertData.Positions),
pgutil.Float8Array(insertData.SegmentHealths),
)
if err != nil {
return newlyInsertedSegments, err
}
defer func() { err = errs.Combine(err, rows.Close()) }()
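// The scan below assumes result rows come back in the same order as the input
// segments; the index i maps each row back to its segment.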
i := 0
for rows.Next() {
var isNewlyInserted bool
err = rows.Scan(&isNewlyInserted)
if err != nil {
return newlyInsertedSegments, err
}
if isNewlyInserted {
newlyInsertedSegments = append(newlyInsertedSegments, segments[i])
}
i++
}
return newlyInsertedSegments, rows.Err()
}
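
// Select claims the next injured segment to repair, preferring the lowest
// segment health and skipping segments attempted within the last six hours.
// It marks the claimed segment by setting attempted_at and returns
// queue.ErrEmpty when no eligible segment exists.
//
// A rough worker-loop sketch (illustrative handling only):
//
//	seg, err := rq.Select(ctx)
//	if queue.ErrEmpty.Has(err) {
//		return nil // nothing to repair right now
//	}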
func (r *repairQueue) Select(ctx context.Context) (seg *queue.InjuredSegment, err error) {
defer mon.Task()(&ctx)(&err)
segment := queue.InjuredSegment{}
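// Claim the segment and mark it attempted in a single statement so that
// concurrent workers do not pick the same row: PostgreSQL uses
// FOR UPDATE SKIP LOCKED, while CockroachDB relies on the implicit row
// locking of the UPDATE.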
switch r.db.impl {
case dbutil.Cockroach:
err = r.db.QueryRowContext(ctx, `
UPDATE repair_queue SET attempted_at = now()
WHERE attempted_at IS NULL OR attempted_at < now() - interval '6 hours'
ORDER BY segment_health ASC, attempted_at NULLS FIRST
LIMIT 1
RETURNING stream_id, position, attempted_at, updated_at, inserted_at, segment_health
`).Scan(&segment.StreamID, &segment.Position, &segment.AttemptedAt,
&segment.UpdatedAt, &segment.InsertedAt, &segment.SegmentHealth)
case dbutil.Postgres:
err = r.db.QueryRowContext(ctx, `
UPDATE repair_queue SET attempted_at = now() WHERE (stream_id, position) = (
SELECT stream_id, position FROM repair_queue
WHERE attempted_at IS NULL OR attempted_at < now() - interval '6 hours'
ORDER BY segment_health ASC, attempted_at NULLS FIRST FOR UPDATE SKIP LOCKED LIMIT 1
) RETURNING stream_id, position, attempted_at, updated_at, inserted_at, segment_health
`).Scan(&segment.StreamID, &segment.Position, &segment.AttemptedAt,
&segment.UpdatedAt, &segment.InsertedAt, &segment.SegmentHealth)
default:
return seg, errs.New("unhandled database: %v", r.db.impl)
}
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, queue.ErrEmpty.New("")
}
return nil, err
}
return &segment, err
}
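
// Delete removes an injured segment from the repair queue.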
func (r *repairQueue) Delete(ctx context.Context, seg *queue.InjuredSegment) (err error) {
defer mon.Task()(&ctx)(&err)
_, err = r.db.ExecContext(ctx, r.db.Rebind(`DELETE FROM repair_queue WHERE stream_id = ? AND position = ?`), seg.StreamID, seg.Position.Encode())
return Error.Wrap(err)
}
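
// Clean removes all repair queue entries that were last updated before the
// given time and returns the number of deleted rows.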
func (r *repairQueue) Clean(ctx context.Context, before time.Time) (deleted int64, err error) {
defer mon.Task()(&ctx)(&err)
n, err := r.db.Delete_RepairQueue_By_UpdatedAt_Less(ctx, dbx.RepairQueue_UpdatedAt(before))
return n, Error.Wrap(err)
}
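
// SelectN returns up to limit injured segments from the repair queue without
// claiming them; limits that are non-positive or above RepairQueueSelectLimit
// are replaced by RepairQueueSelectLimit.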
func (r *repairQueue) SelectN(ctx context.Context, limit int) (segs []queue.InjuredSegment, err error) {
defer mon.Task()(&ctx)(&err)
if limit <= 0 || limit > RepairQueueSelectLimit {
limit = RepairQueueSelectLimit
}
// TODO: strictly enforce order-by or change tests
rows, err := r.db.QueryContext(ctx,
r.db.Rebind(`SELECT stream_id, position, attempted_at, updated_at, segment_health
FROM repair_queue LIMIT ?`), limit,
)
if err != nil {
return nil, Error.Wrap(err)
}
defer func() { err = errs.Combine(err, rows.Close()) }()
for rows.Next() {
var seg queue.InjuredSegment
err = rows.Scan(&seg.StreamID, &seg.Position, &seg.AttemptedAt,
&seg.UpdatedAt, &seg.SegmentHealth)
if err != nil {
return segs, Error.Wrap(err)
}
segs = append(segs, seg)
}
return segs, Error.Wrap(rows.Err())
}
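
// Count returns the total number of segments in the repair queue.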
func (r *repairQueue) Count(ctx context.Context) (count int, err error) {
defer mon.Task()(&ctx)(&err)
// Count every segment regardless of how recently repair was last attempted
err = r.db.QueryRowContext(ctx, r.db.Rebind(`SELECT COUNT(*) as count FROM repair_queue`)).Scan(&count)
return count, Error.Wrap(err)
}

// TestingSetAttemptedTime sets the attempted_at time for a segment; it is
// intended for use in tests.
func (r *repairQueue) TestingSetAttemptedTime(ctx context.Context, streamID uuid.UUID,
position metabase.SegmentPosition, t time.Time) (rowsAffected int64, err error) {
defer mon.Task()(&ctx)(&err)
res, err := r.db.ExecContext(ctx,
r.db.Rebind(`UPDATE repair_queue SET attempted_at = ? WHERE stream_id = ? AND position = ?`),
t, streamID, position.Encode(),
)
if err != nil {
return 0, Error.Wrap(err)
}
count, err := res.RowsAffected()
return count, Error.Wrap(err)
}