satellite/repair: improve contention for injuredsegments table on CRDB
We migrated satelliteDB off of Postgres and over to CockroachDB (crdb), but there was way too high contention for the injuredsegments table so we had to rollback to Postgres for the repair queue. A couple of things contributed to this problem: 1) crdb doesn't support `FOR UPDATE SKIP LOCKED` 2) the original crdb Select query was doing 2 full table scans and not using any indexes 3) the SLC Satellite (where we were doing the migration) was running 48 repair worker processes, each of which run up to 5 goroutines which all are trying to select out of the repair queue and this was causing a ton of contention. The changes in this PR should help to reduce that contention and improve performance on CRDB. The changes include: 1) Use an update/set query instead of select/update to capitalize on the new `UPDATE` implicit row locking ability in CRDB. - Details: As of CRDB v20.2.2, there is implicit row locking with update/set queries (contention reduction and performance gains are described in this blog post: https://www.cockroachlabs.com/blog/when-and-why-to-use-select-for-update-in-cockroachdb/). 2) Remove the `ORDER BY` clause since this was causing a full table scan and also prevented the use of the row locking capability. - While long term it is very important to `ORDER BY segment_health`, the change here is only supposed to be a temporary bandaid to get us migrated over to CRDB quickly. Since segment_health has been set to infinity for some time now (re: https://review.dev.storj.io/c/storj/storj/+/3224), it seems like it might be ok to continue not making use of this for the short term. 
However, long term this needs to be fixed with a redesign of the repair workers, possibly in the trusted delegated repair design (https://review.dev.storj.io/c/storj/storj/+/2602) or something similar to what is recommended here on how to implement a queue on CRDB https://dev.to/ajwerner/quick-and-easy-exactly-once-distributed-work-queues-using-serializable-transactions-jdp, or migrate to RabbitMQ priority queue or something similar. This PR's improved query uses the index to avoid full scans and also locks the row it's going to update, and CRDB retries for us if there are any lock errors. Change-Id: Id29faad2186627872fbeb0f31536c4f55f860f23
This commit is contained in:
parent
c2a97aeb14
commit
0649d2b930
@ -11,10 +11,14 @@ import (
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/zap/zaptest"
|
||||
|
||||
"storj.io/common/testcontext"
|
||||
"storj.io/storj/private/dbutil/pgtest"
|
||||
"storj.io/storj/private/dbutil/tempdb"
|
||||
"storj.io/storj/satellite"
|
||||
"storj.io/storj/satellite/internalpb"
|
||||
"storj.io/storj/satellite/satellitedb"
|
||||
"storj.io/storj/satellite/satellitedb/satellitedbtest"
|
||||
"storj.io/storj/storage"
|
||||
)
|
||||
@ -102,83 +106,98 @@ func TestOrder(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
// TestOrderHealthyPieces ensures that we select in the correct order, accounting for segment health as well as last attempted repair time.
|
||||
// TestOrderHealthyPieces ensures that we select in the correct order, accounting for segment health as well as last attempted repair time. We only test on Postgres since Cockroach doesn't order by segment health due to performance.
|
||||
func TestOrderHealthyPieces(t *testing.T) {
|
||||
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
|
||||
repairQueue := db.RepairQueue()
|
||||
testorderHealthyPieces(t, pgtest.PickPostgres(t))
|
||||
}
|
||||
|
||||
// we insert (path, segmentHealth, lastAttempted) as follows:
|
||||
// ("path/a", 6, now-8h)
|
||||
// ("path/b", 7, now)
|
||||
// ("path/c", 8, null)
|
||||
// ("path/d", 9, null)
|
||||
// ("path/e", 9, now-7h)
|
||||
// ("path/f", 9, now-8h)
|
||||
// ("path/g", 10, null)
|
||||
// ("path/h", 10, now-8h)
|
||||
func testorderHealthyPieces(t *testing.T, connStr string) {
|
||||
ctx := testcontext.New(t)
|
||||
defer ctx.Cleanup()
|
||||
|
||||
// insert the 8 segments according to the plan above
|
||||
injuredSegList := []struct {
|
||||
path []byte
|
||||
segmentHealth float64
|
||||
attempted time.Time
|
||||
}{
|
||||
{[]byte("path/a"), 6, time.Now().Add(-8 * time.Hour)},
|
||||
{[]byte("path/b"), 7, time.Now()},
|
||||
{[]byte("path/c"), 8, time.Time{}},
|
||||
{[]byte("path/d"), 9, time.Time{}},
|
||||
{[]byte("path/e"), 9, time.Now().Add(-7 * time.Hour)},
|
||||
{[]byte("path/f"), 9, time.Now().Add(-8 * time.Hour)},
|
||||
{[]byte("path/g"), 10, time.Time{}},
|
||||
{[]byte("path/h"), 10, time.Now().Add(-8 * time.Hour)},
|
||||
}
|
||||
// shuffle list since select order should not depend on insert order
|
||||
rand.Seed(time.Now().UnixNano())
|
||||
rand.Shuffle(len(injuredSegList), func(i, j int) {
|
||||
injuredSegList[i], injuredSegList[j] = injuredSegList[j], injuredSegList[i]
|
||||
})
|
||||
for _, item := range injuredSegList {
|
||||
// first, insert the injured segment
|
||||
injuredSeg := &internalpb.InjuredSegment{Path: item.path}
|
||||
alreadyInserted, err := repairQueue.Insert(ctx, injuredSeg, item.segmentHealth)
|
||||
require.NoError(t, err)
|
||||
require.False(t, alreadyInserted)
|
||||
// create tempDB
|
||||
tempDB, err := tempdb.OpenUnique(ctx, connStr, "orderhealthy")
|
||||
require.NoError(t, err)
|
||||
defer func() { require.NoError(t, tempDB.Close()) }()
|
||||
|
||||
// next, if applicable, update the "attempted at" timestamp
|
||||
if !item.attempted.IsZero() {
|
||||
rowsAffected, err := db.RepairQueue().TestingSetAttemptedTime(ctx, item.path, item.attempted)
|
||||
require.NoError(t, err)
|
||||
require.EqualValues(t, 1, rowsAffected)
|
||||
}
|
||||
}
|
||||
// create a new satellitedb connection
|
||||
db, err := satellitedb.Open(ctx, zaptest.NewLogger(t), tempDB.ConnStr, satellitedb.Options{ApplicationName: "satellite-repair-test"})
|
||||
require.NoError(t, err)
|
||||
defer func() { require.NoError(t, db.Close()) }()
|
||||
require.NoError(t, db.MigrateToLatest(ctx))
|
||||
|
||||
// we expect segment health to be prioritized first
|
||||
// if segment health is equal, we expect the least recently attempted, with nulls first, to be prioritized first
|
||||
// (excluding segments that have been attempted in the past six hours)
|
||||
// we do not expect to see segments that have been attempted in the past hour
|
||||
// therefore, the order of selection should be:
|
||||
// "path/a", "path/c", "path/d", "path/f", "path/e", "path/g", "path/h"
|
||||
// "path/b" will not be selected because it was attempted recently
|
||||
repairQueue := db.RepairQueue()
|
||||
// we insert (path, segmentHealth, lastAttempted) as follows:
|
||||
// ("path/a", 6, now-8h)
|
||||
// ("path/b", 7, now)
|
||||
// ("path/c", 8, null)
|
||||
// ("path/d", 9, null)
|
||||
// ("path/e", 9, now-7h)
|
||||
// ("path/f", 9, now-8h)
|
||||
// ("path/g", 10, null)
|
||||
// ("path/h", 10, now-8h)
|
||||
|
||||
for _, nextPath := range []string{
|
||||
"path/a",
|
||||
"path/c",
|
||||
"path/d",
|
||||
"path/f",
|
||||
"path/e",
|
||||
"path/g",
|
||||
"path/h",
|
||||
} {
|
||||
injuredSeg, err := repairQueue.Select(ctx)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, nextPath, string(injuredSeg.Path))
|
||||
}
|
||||
|
||||
// queue should be considered "empty" now
|
||||
injuredSeg, err := repairQueue.Select(ctx)
|
||||
assert.True(t, storage.ErrEmptyQueue.Has(err))
|
||||
assert.Nil(t, injuredSeg)
|
||||
// insert the 8 segments according to the plan above
|
||||
injuredSegList := []struct {
|
||||
path []byte
|
||||
segmentHealth float64
|
||||
attempted time.Time
|
||||
}{
|
||||
{[]byte("path/a"), 6, time.Now().Add(-8 * time.Hour)},
|
||||
{[]byte("path/b"), 7, time.Now()},
|
||||
{[]byte("path/c"), 8, time.Time{}},
|
||||
{[]byte("path/d"), 9, time.Time{}},
|
||||
{[]byte("path/e"), 9, time.Now().Add(-7 * time.Hour)},
|
||||
{[]byte("path/f"), 9, time.Now().Add(-8 * time.Hour)},
|
||||
{[]byte("path/g"), 10, time.Time{}},
|
||||
{[]byte("path/h"), 10, time.Now().Add(-8 * time.Hour)},
|
||||
}
|
||||
// shuffle list since select order should not depend on insert order
|
||||
rand.Seed(time.Now().UnixNano())
|
||||
rand.Shuffle(len(injuredSegList), func(i, j int) {
|
||||
injuredSegList[i], injuredSegList[j] = injuredSegList[j], injuredSegList[i]
|
||||
})
|
||||
for _, item := range injuredSegList {
|
||||
// first, insert the injured segment
|
||||
injuredSeg := &internalpb.InjuredSegment{Path: item.path}
|
||||
alreadyInserted, err := repairQueue.Insert(ctx, injuredSeg, item.segmentHealth)
|
||||
require.NoError(t, err)
|
||||
require.False(t, alreadyInserted)
|
||||
|
||||
// next, if applicable, update the "attempted at" timestamp
|
||||
if !item.attempted.IsZero() {
|
||||
rowsAffected, err := db.RepairQueue().TestingSetAttemptedTime(ctx, item.path, item.attempted)
|
||||
require.NoError(t, err)
|
||||
require.EqualValues(t, 1, rowsAffected)
|
||||
}
|
||||
}
|
||||
|
||||
// we expect segment health to be prioritized first
|
||||
// if segment health is equal, we expect the least recently attempted, with nulls first, to be prioritized first
|
||||
// (excluding segments that have been attempted in the past six hours)
|
||||
// we do not expect to see segments that have been attempted in the past hour
|
||||
// therefore, the order of selection should be:
|
||||
// "path/a", "path/c", "path/d", "path/f", "path/e", "path/g", "path/h"
|
||||
// "path/b" will not be selected because it was attempted recently
|
||||
|
||||
for _, nextPath := range []string{
|
||||
"path/a",
|
||||
"path/c",
|
||||
"path/d",
|
||||
"path/f",
|
||||
"path/e",
|
||||
"path/g",
|
||||
"path/h",
|
||||
} {
|
||||
injuredSeg, err := repairQueue.Select(ctx)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, nextPath, string(injuredSeg.Path))
|
||||
}
|
||||
|
||||
// queue should be considered "empty" now
|
||||
injuredSeg, err := repairQueue.Select(ctx)
|
||||
assert.True(t, storage.ErrEmptyQueue.Has(err))
|
||||
assert.Nil(t, injuredSeg)
|
||||
}
|
||||
|
||||
// TestOrderOverwrite ensures that re-inserting the same segment with a lower health will properly adjust its prioritization.
|
||||
|
@ -79,14 +79,14 @@ func (r *repairQueue) Insert(ctx context.Context, seg *internalpb.InjuredSegment
|
||||
|
||||
func (r *repairQueue) Select(ctx context.Context) (seg *internalpb.InjuredSegment, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
switch r.db.implementation {
|
||||
case dbutil.Cockroach:
|
||||
err = r.db.QueryRowContext(ctx, `
|
||||
UPDATE injuredsegments SET attempted = now() WHERE path = (
|
||||
SELECT path FROM injuredsegments
|
||||
WHERE attempted IS NULL OR attempted < now() - interval '6 hours'
|
||||
ORDER BY segment_health ASC, attempted LIMIT 1
|
||||
) RETURNING data`).Scan(&seg)
|
||||
UPDATE injuredsegments SET attempted = now()
|
||||
WHERE attempted IS NULL OR attempted < now() - interval '6 hours'
|
||||
LIMIT 1
|
||||
RETURNING data`).Scan(&seg)
|
||||
case dbutil.Postgres:
|
||||
err = r.db.QueryRowContext(ctx, `
|
||||
UPDATE injuredsegments SET attempted = now() WHERE path = (
|
||||
|
Loading…
Reference in New Issue
Block a user