satellite/satellitedb: write/read placement information to/from repairqueue

Change-Id: Ie58f129feae7898850905940f94643605dcf56ae
This commit is contained in:
Márton Elek 2023-09-18 13:58:33 +02:00 committed by Storj Robot
parent 7d0b8f6f8c
commit 18d5caad7e
5 changed files with 130 additions and 20 deletions

View File

@ -8,6 +8,7 @@ import (
"github.com/stretchr/testify/require"
"storj.io/common/storj"
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/storj/satellite"
@ -99,12 +100,14 @@ func TestInsertBufferTwoUniqueObjects(t *testing.T) {
}
func createInjuredSegment() *queue.InjuredSegment {
index := uint32(testrand.Intn(1000))
return &queue.InjuredSegment{
StreamID: testrand.UUID(),
Position: metabase.SegmentPosition{
Part: uint32(testrand.Intn(1000)),
Index: uint32(testrand.Intn(1000)),
Index: index,
},
SegmentHealth: 10,
Placement: storj.PlacementConstraint(index % 3),
}
}

View File

@ -7,6 +7,7 @@ import (
"context"
"time"
"storj.io/common/storj"
"storj.io/common/uuid"
"storj.io/storj/satellite/metabase"
)
@ -21,6 +22,8 @@ type InjuredSegment struct {
AttemptedAt *time.Time
UpdatedAt time.Time
InsertedAt time.Time
Placement storj.PlacementConstraint
}
// RepairQueue implements queueing for segments that need repairing.

View File

@ -77,6 +77,7 @@ func TestInsertBatchOfOne(t *testing.T) {
require.Equal(t, writeSegments[0].StreamID, readSegments[0].StreamID)
require.Equal(t, writeSegments[0].Position, readSegments[0].Position)
require.Equal(t, writeSegments[0].SegmentHealth, readSegments[0].SegmentHealth)
require.Equal(t, writeSegments[0].Placement, readSegments[0].Placement)
})
}

View File

@ -40,14 +40,14 @@ func (r *repairQueue) Insert(ctx context.Context, seg *queue.InjuredSegment) (al
query = `
INSERT INTO repair_queue
(
stream_id, position, segment_health
stream_id, position, segment_health, placement
)
VALUES (
$1, $2, $3
$1, $2, $3, $4
)
ON CONFLICT (stream_id, position)
DO UPDATE
SET segment_health=$3, updated_at=current_timestamp
SET segment_health=$3, updated_at=current_timestamp, placement=$4
RETURNING (xmax != 0) AS alreadyInserted
`
case dbutil.Cockroach:
@ -59,18 +59,18 @@ func (r *repairQueue) Insert(ctx context.Context, seg *queue.InjuredSegment) (al
)
INSERT INTO repair_queue
(
stream_id, position, segment_health
stream_id, position, segment_health, placement
)
VALUES (
$1, $2, $3
$1, $2, $3, $4
)
ON CONFLICT (stream_id, position)
DO UPDATE
SET segment_health=$3, updated_at=current_timestamp
SET segment_health=$3, updated_at=current_timestamp, placement=$4
RETURNING (SELECT alreadyInserted FROM inserted)
`
}
rows, err := r.db.QueryContext(ctx, query, seg.StreamID, seg.Position.Encode(), seg.SegmentHealth)
rows, err := r.db.QueryContext(ctx, query, seg.StreamID, seg.Position.Encode(), seg.SegmentHealth, seg.Placement)
if err != nil {
return false, err
}
@ -108,12 +108,13 @@ func (r *repairQueue) InsertBatch(
query = `
INSERT INTO repair_queue
(
stream_id, position, segment_health
stream_id, position, segment_health, placement
)
VALUES (
UNNEST($1::BYTEA[]),
UNNEST($2::INT8[]),
UNNEST($3::double precision[])
UNNEST($3::double precision[]),
UNNEST($4::INT2[])
)
ON CONFLICT (stream_id, position)
DO UPDATE
@ -127,19 +128,21 @@ func (r *repairQueue) InsertBatch(
SELECT
UNNEST($1::BYTEA[]) AS stream_id,
UNNEST($2::INT8[]) AS position,
UNNEST($3::double precision[]) AS segment_health
UNNEST($3::double precision[]) AS segment_health,
UNNEST($4::INT2[]) AS placement
),
do_insert AS (
INSERT INTO repair_queue (
stream_id, position, segment_health
stream_id, position, segment_health, placement
)
SELECT stream_id, position, segment_health
SELECT stream_id, position, segment_health, placement
FROM to_insert
ON CONFLICT (stream_id, position)
DO UPDATE
SET
segment_health=EXCLUDED.segment_health,
updated_at=current_timestamp
updated_at=current_timestamp,
placement=EXCLUDED.placement
RETURNING false
)
SELECT
@ -155,12 +158,14 @@ func (r *repairQueue) InsertBatch(
StreamIDs []uuid.UUID
Positions []int64
SegmentHealths []float64
placements []int16
}
for _, segment := range segments {
insertData.StreamIDs = append(insertData.StreamIDs, segment.StreamID)
insertData.Positions = append(insertData.Positions, int64(segment.Position.Encode()))
insertData.SegmentHealths = append(insertData.SegmentHealths, segment.SegmentHealth)
insertData.placements = append(insertData.placements, int16(segment.Placement))
}
rows, err := r.db.QueryContext(
@ -168,6 +173,7 @@ func (r *repairQueue) InsertBatch(
pgutil.UUIDArray(insertData.StreamIDs),
pgutil.Int8Array(insertData.Positions),
pgutil.Float8Array(insertData.SegmentHealths),
pgutil.Int2Array(insertData.placements),
)
if err != nil {
@ -204,18 +210,18 @@ func (r *repairQueue) Select(ctx context.Context) (seg *queue.InjuredSegment, er
WHERE attempted_at IS NULL OR attempted_at < now() - interval '6 hours'
ORDER BY segment_health ASC, attempted_at NULLS FIRST
LIMIT 1
RETURNING stream_id, position, attempted_at, updated_at, inserted_at, segment_health
RETURNING stream_id, position, attempted_at, updated_at, inserted_at, segment_health, placement
`).Scan(&segment.StreamID, &segment.Position, &segment.AttemptedAt,
&segment.UpdatedAt, &segment.InsertedAt, &segment.SegmentHealth)
&segment.UpdatedAt, &segment.InsertedAt, &segment.SegmentHealth, &segment.Placement)
case dbutil.Postgres:
err = r.db.QueryRowContext(ctx, `
UPDATE repair_queue SET attempted_at = now() WHERE (stream_id, position) = (
SELECT stream_id, position FROM repair_queue
WHERE attempted_at IS NULL OR attempted_at < now() - interval '6 hours'
ORDER BY segment_health ASC, attempted_at NULLS FIRST FOR UPDATE SKIP LOCKED LIMIT 1
) RETURNING stream_id, position, attempted_at, updated_at, inserted_at, segment_health
) RETURNING stream_id, position, attempted_at, updated_at, inserted_at, segment_health, placement
`).Scan(&segment.StreamID, &segment.Position, &segment.AttemptedAt,
&segment.UpdatedAt, &segment.InsertedAt, &segment.SegmentHealth)
&segment.UpdatedAt, &segment.InsertedAt, &segment.SegmentHealth, &segment.Placement)
default:
return seg, errs.New("unhandled database: %v", r.db.impl)
}
@ -247,7 +253,7 @@ func (r *repairQueue) SelectN(ctx context.Context, limit int) (segs []queue.Inju
}
// TODO: strictly enforce order-by or change tests
rows, err := r.db.QueryContext(ctx,
r.db.Rebind(`SELECT stream_id, position, attempted_at, updated_at, segment_health
r.db.Rebind(`SELECT stream_id, position, attempted_at, updated_at, segment_health, placement
FROM repair_queue LIMIT ?`), limit,
)
if err != nil {
@ -258,7 +264,7 @@ func (r *repairQueue) SelectN(ctx context.Context, limit int) (segs []queue.Inju
for rows.Next() {
var seg queue.InjuredSegment
err = rows.Scan(&seg.StreamID, &seg.Position, &seg.AttemptedAt,
&seg.UpdatedAt, &seg.SegmentHealth)
&seg.UpdatedAt, &seg.SegmentHealth, &seg.Placement)
if err != nil {
return segs, Error.Wrap(err)
}

View File

@ -0,0 +1,97 @@
// Copyright (C) 2023 Storj Labs, Inc.
// See LICENSE for copying information.
package satellitedb_test
import (
"sort"
"testing"
"github.com/stretchr/testify/require"
"storj.io/common/storj"
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/storj/satellite"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/repair/queue"
"storj.io/storj/satellite/satellitedb/satellitedbtest"
)
func TestRepairQueue(t *testing.T) {
testSegments := make([]*queue.InjuredSegment, 3)
for i := 0; i < len(testSegments); i++ {
testSegments[i] = &queue.InjuredSegment{
StreamID: testrand.UUID(),
Position: metabase.SegmentPosition{
Part: uint32(i),
Index: 2,
},
SegmentHealth: 10,
Placement: storj.PlacementConstraint(i),
}
}
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
rq := db.RepairQueue()
alreadyInserted, err := rq.Insert(ctx, testSegments[0])
require.NoError(t, err)
require.False(t, alreadyInserted)
sn, err := rq.SelectN(ctx, 1)
require.NoError(t, err)
require.Equal(t, testSegments[0].StreamID, sn[0].StreamID)
require.Equal(t, testSegments[0].Placement, sn[0].Placement)
require.Equal(t, testSegments[0].Position, sn[0].Position)
require.Equal(t, testSegments[0].SegmentHealth, sn[0].SegmentHealth)
// upsert
alreadyInserted, err = rq.Insert(ctx, &queue.InjuredSegment{
StreamID: testSegments[0].StreamID,
Position: testSegments[0].Position,
SegmentHealth: 12,
Placement: storj.PlacementConstraint(99),
})
require.NoError(t, err)
require.True(t, alreadyInserted)
rs1, err := rq.Select(ctx)
require.NoError(t, err)
require.Equal(t, testSegments[0].StreamID, rs1.StreamID)
require.Equal(t, storj.PlacementConstraint(99), rs1.Placement)
require.Equal(t, testSegments[0].Position, rs1.Position)
require.Equal(t, float64(12), rs1.SegmentHealth)
// empty queue (one record, but that's already attempted)
_, err = rq.Select(ctx)
require.Error(t, err)
// make sure it's really empty
err = rq.Delete(ctx, testSegments[0])
require.NoError(t, err)
// insert 2 new
newlyInserted, err := rq.InsertBatch(ctx, []*queue.InjuredSegment{
testSegments[1], testSegments[2],
})
require.NoError(t, err)
require.Len(t, newlyInserted, 2)
// select2 (including attempted)
segments, err := rq.SelectN(ctx, 2)
require.NoError(t, err)
sort.Slice(segments, func(i, j int) bool {
return segments[i].Position.Part < segments[j].Position.Part
})
for i := 0; i < 2; i++ {
require.Equal(t, testSegments[i+1].StreamID, segments[i].StreamID)
require.Equal(t, testSegments[i+1].Placement, segments[i].Placement)
require.Equal(t, testSegments[i+1].Position, segments[i].Position)
require.Equal(t, testSegments[i+1].SegmentHealth, segments[i].SegmentHealth)
}
})
}