From 18d5caad7eab0352e6ec762261ae04ddb319385e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?M=C3=A1rton=20Elek?= Date: Mon, 18 Sep 2023 13:58:33 +0200 Subject: [PATCH] satellite/satellitedb: write/read placement information to/from repairqueue Change-Id: Ie58f129feae7898850905940f94643605dcf56ae --- satellite/repair/queue/insertbuffer_test.go | 5 +- satellite/repair/queue/queue.go | 3 + satellite/repair/queue/queue_test.go | 1 + satellite/satellitedb/repairqueue.go | 44 ++++++---- satellite/satellitedb/repairqueue_test.go | 97 +++++++++++++++++++++ 5 files changed, 130 insertions(+), 20 deletions(-) create mode 100644 satellite/satellitedb/repairqueue_test.go diff --git a/satellite/repair/queue/insertbuffer_test.go b/satellite/repair/queue/insertbuffer_test.go index 8303e8f58..c436afa27 100644 --- a/satellite/repair/queue/insertbuffer_test.go +++ b/satellite/repair/queue/insertbuffer_test.go @@ -8,6 +8,7 @@ import ( "github.com/stretchr/testify/require" + "storj.io/common/storj" "storj.io/common/testcontext" "storj.io/common/testrand" "storj.io/storj/satellite" @@ -99,12 +100,14 @@ func TestInsertBufferTwoUniqueObjects(t *testing.T) { } func createInjuredSegment() *queue.InjuredSegment { + index := uint32(testrand.Intn(1000)) return &queue.InjuredSegment{ StreamID: testrand.UUID(), Position: metabase.SegmentPosition{ Part: uint32(testrand.Intn(1000)), - Index: uint32(testrand.Intn(1000)), + Index: index, }, SegmentHealth: 10, + Placement: storj.PlacementConstraint(index % 3), } } diff --git a/satellite/repair/queue/queue.go b/satellite/repair/queue/queue.go index 8d1901c38..369e9134a 100644 --- a/satellite/repair/queue/queue.go +++ b/satellite/repair/queue/queue.go @@ -7,6 +7,7 @@ import ( "context" "time" + "storj.io/common/storj" "storj.io/common/uuid" "storj.io/storj/satellite/metabase" ) @@ -21,6 +22,8 @@ type InjuredSegment struct { AttemptedAt *time.Time UpdatedAt time.Time InsertedAt time.Time + + Placement storj.PlacementConstraint } // RepairQueue implements queueing for segments that need repairing. diff --git a/satellite/repair/queue/queue_test.go b/satellite/repair/queue/queue_test.go index d2476eb92..d499c89d9 100644 --- a/satellite/repair/queue/queue_test.go +++ b/satellite/repair/queue/queue_test.go @@ -77,6 +77,7 @@ func TestInsertBatchOfOne(t *testing.T) { require.Equal(t, writeSegments[0].StreamID, readSegments[0].StreamID) require.Equal(t, writeSegments[0].Position, readSegments[0].Position) require.Equal(t, writeSegments[0].SegmentHealth, readSegments[0].SegmentHealth) + require.Equal(t, writeSegments[0].Placement, readSegments[0].Placement) }) } diff --git a/satellite/satellitedb/repairqueue.go b/satellite/satellitedb/repairqueue.go index 580bc1127..dd45e7fd1 100644 --- a/satellite/satellitedb/repairqueue.go +++ b/satellite/satellitedb/repairqueue.go @@ -40,14 +40,14 @@ func (r *repairQueue) Insert(ctx context.Context, seg *queue.InjuredSegment) (al query = ` INSERT INTO repair_queue ( - stream_id, position, segment_health + stream_id, position, segment_health, placement ) VALUES ( - $1, $2, $3 + $1, $2, $3, $4 ) ON CONFLICT (stream_id, position) DO UPDATE - SET segment_health=$3, updated_at=current_timestamp + SET segment_health=$3, updated_at=current_timestamp, placement=$4 RETURNING (xmax != 0) AS alreadyInserted ` case dbutil.Cockroach: @@ -59,18 +59,18 @@ func (r *repairQueue) Insert(ctx context.Context, seg *queue.InjuredSegment) (al ) INSERT INTO repair_queue ( - stream_id, position, segment_health + stream_id, position, segment_health, placement ) VALUES ( - $1, $2, $3 + $1, $2, $3, $4 ) ON CONFLICT (stream_id, position) DO UPDATE - SET segment_health=$3, updated_at=current_timestamp + SET segment_health=$3, updated_at=current_timestamp, placement=$4 RETURNING (SELECT alreadyInserted FROM inserted) ` } - rows, err := r.db.QueryContext(ctx, query, seg.StreamID, seg.Position.Encode(), seg.SegmentHealth) + rows, err := r.db.QueryContext(ctx, query, seg.StreamID, seg.Position.Encode(), seg.SegmentHealth, seg.Placement) if err != nil { return false, err } @@ -108,12 +108,13 @@ func (r *repairQueue) InsertBatch( query = ` INSERT INTO repair_queue ( - stream_id, position, segment_health + stream_id, position, segment_health, placement ) VALUES ( UNNEST($1::BYTEA[]), UNNEST($2::INT8[]), - UNNEST($3::double precision[]) + UNNEST($3::double precision[]), + UNNEST($4::INT2[]) ) ON CONFLICT (stream_id, position) DO UPDATE @@ -127,19 +128,21 @@ func (r *repairQueue) InsertBatch( SELECT UNNEST($1::BYTEA[]) AS stream_id, UNNEST($2::INT8[]) AS position, - UNNEST($3::double precision[]) AS segment_health + UNNEST($3::double precision[]) AS segment_health, + UNNEST($4::INT2[]) AS placement ), do_insert AS ( INSERT INTO repair_queue ( - stream_id, position, segment_health + stream_id, position, segment_health, placement ) - SELECT stream_id, position, segment_health + SELECT stream_id, position, segment_health, placement FROM to_insert ON CONFLICT (stream_id, position) DO UPDATE SET segment_health=EXCLUDED.segment_health, - updated_at=current_timestamp + updated_at=current_timestamp, + placement=EXCLUDED.placement RETURNING false ) SELECT @@ -155,12 +158,14 @@ func (r *repairQueue) InsertBatch( StreamIDs []uuid.UUID Positions []int64 SegmentHealths []float64 + placements []int16 } for _, segment := range segments { insertData.StreamIDs = append(insertData.StreamIDs, segment.StreamID) insertData.Positions = append(insertData.Positions, int64(segment.Position.Encode())) insertData.SegmentHealths = append(insertData.SegmentHealths, segment.SegmentHealth) + insertData.placements = append(insertData.placements, int16(segment.Placement)) } rows, err := r.db.QueryContext( @@ -168,6 +173,7 @@ func (r *repairQueue) InsertBatch( pgutil.UUIDArray(insertData.StreamIDs), pgutil.Int8Array(insertData.Positions), pgutil.Float8Array(insertData.SegmentHealths), + pgutil.Int2Array(insertData.placements), ) if err != nil { @@ -204,18 +210,18 @@ func (r *repairQueue) Select(ctx context.Context) (seg *queue.InjuredSegment, er WHERE attempted_at IS NULL OR attempted_at < now() - interval '6 hours' ORDER BY segment_health ASC, attempted_at NULLS FIRST LIMIT 1 - RETURNING stream_id, position, attempted_at, updated_at, inserted_at, segment_health + RETURNING stream_id, position, attempted_at, updated_at, inserted_at, segment_health, placement `).Scan(&segment.StreamID, &segment.Position, &segment.AttemptedAt, - &segment.UpdatedAt, &segment.InsertedAt, &segment.SegmentHealth) + &segment.UpdatedAt, &segment.InsertedAt, &segment.SegmentHealth, &segment.Placement) case dbutil.Postgres: err = r.db.QueryRowContext(ctx, ` UPDATE repair_queue SET attempted_at = now() WHERE (stream_id, position) = ( SELECT stream_id, position FROM repair_queue WHERE attempted_at IS NULL OR attempted_at < now() - interval '6 hours' ORDER BY segment_health ASC, attempted_at NULLS FIRST FOR UPDATE SKIP LOCKED LIMIT 1 - ) RETURNING stream_id, position, attempted_at, updated_at, inserted_at, segment_health + ) RETURNING stream_id, position, attempted_at, updated_at, inserted_at, segment_health, placement `).Scan(&segment.StreamID, &segment.Position, &segment.AttemptedAt, - &segment.UpdatedAt, &segment.InsertedAt, &segment.SegmentHealth) + &segment.UpdatedAt, &segment.InsertedAt, &segment.SegmentHealth, &segment.Placement) default: return seg, errs.New("unhandled database: %v", r.db.impl) } @@ -247,7 +253,7 @@ func (r *repairQueue) SelectN(ctx context.Context, limit int) (segs []queue.Inju } // TODO: strictly enforce order-by or change tests rows, err := r.db.QueryContext(ctx, - r.db.Rebind(`SELECT stream_id, position, attempted_at, updated_at, segment_health + r.db.Rebind(`SELECT stream_id, position, attempted_at, updated_at, segment_health, placement FROM repair_queue LIMIT ?`), limit, ) if err != nil { @@ -258,7 +264,7 @@ func (r *repairQueue) SelectN(ctx context.Context, limit int) (segs []queue.Inju for rows.Next() { var seg queue.InjuredSegment err = rows.Scan(&seg.StreamID, &seg.Position, &seg.AttemptedAt, - &seg.UpdatedAt, &seg.SegmentHealth) + &seg.UpdatedAt, &seg.SegmentHealth, &seg.Placement) if err != nil { return segs, Error.Wrap(err) } diff --git a/satellite/satellitedb/repairqueue_test.go b/satellite/satellitedb/repairqueue_test.go new file mode 100644 index 000000000..e17e2e4c8 --- /dev/null +++ b/satellite/satellitedb/repairqueue_test.go @@ -0,0 +1,97 @@ +// Copyright (C) 2023 Storj Labs, Inc. +// See LICENSE for copying information. + +package satellitedb_test + +import ( + "sort" + "testing" + + "github.com/stretchr/testify/require" + + "storj.io/common/storj" + "storj.io/common/testcontext" + "storj.io/common/testrand" + "storj.io/storj/satellite" + "storj.io/storj/satellite/metabase" + "storj.io/storj/satellite/repair/queue" + "storj.io/storj/satellite/satellitedb/satellitedbtest" +) + +func TestRepairQueue(t *testing.T) { + testSegments := make([]*queue.InjuredSegment, 3) + for i := 0; i < len(testSegments); i++ { + testSegments[i] = &queue.InjuredSegment{ + StreamID: testrand.UUID(), + Position: metabase.SegmentPosition{ + Part: uint32(i), + Index: 2, + }, + SegmentHealth: 10, + Placement: storj.PlacementConstraint(i), + } + } + + satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) { + rq := db.RepairQueue() + + alreadyInserted, err := rq.Insert(ctx, testSegments[0]) + require.NoError(t, err) + require.False(t, alreadyInserted) + + sn, err := rq.SelectN(ctx, 1) + require.NoError(t, err) + require.Equal(t, testSegments[0].StreamID, sn[0].StreamID) + require.Equal(t, testSegments[0].Placement, sn[0].Placement) + require.Equal(t, testSegments[0].Position, sn[0].Position) + require.Equal(t, testSegments[0].SegmentHealth, sn[0].SegmentHealth) + + // upsert + alreadyInserted, err = rq.Insert(ctx, &queue.InjuredSegment{ + StreamID: testSegments[0].StreamID, + Position: testSegments[0].Position, + SegmentHealth: 12, + Placement: storj.PlacementConstraint(99), + }) + require.NoError(t, err) + require.True(t, alreadyInserted) + + rs1, err := rq.Select(ctx) + require.NoError(t, err) + require.Equal(t, testSegments[0].StreamID, rs1.StreamID) + require.Equal(t, storj.PlacementConstraint(99), rs1.Placement) + require.Equal(t, testSegments[0].Position, rs1.Position) + require.Equal(t, float64(12), rs1.SegmentHealth) + + // empty queue (one record, but that's already attempted) + _, err = rq.Select(ctx) + require.Error(t, err) + + // make sure it's really empty + err = rq.Delete(ctx, testSegments[0]) + require.NoError(t, err) + + // insert 2 new + newlyInserted, err := rq.InsertBatch(ctx, []*queue.InjuredSegment{ + testSegments[1], testSegments[2], + }) + require.NoError(t, err) + require.Len(t, newlyInserted, 2) + + // select2 (including attempted) + segments, err := rq.SelectN(ctx, 2) + require.NoError(t, err) + + sort.Slice(segments, func(i, j int) bool { + return segments[i].Position.Part < segments[j].Position.Part + }) + + for i := 0; i < 2; i++ { + require.Equal(t, testSegments[i+1].StreamID, segments[i].StreamID) + require.Equal(t, testSegments[i+1].Placement, segments[i].Placement) + require.Equal(t, testSegments[i+1].Position, segments[i].Position) + require.Equal(t, testSegments[i+1].SegmentHealth, segments[i].SegmentHealth) + } + + }) +}