satellitedb/repair: update placement when InsertBatch updates records

Change-Id: If974ff5d57abbe5bd16ce4cb6643d8a12314fe12
This commit is contained in:
Márton Elek 2023-11-16 16:47:34 +01:00 committed by Storj Robot
parent e474b9728c
commit 257bdbac32
2 changed files with 67 additions and 14 deletions

View File

@ -121,7 +121,7 @@ func (r *repairQueue) InsertBatch(
)
ON CONFLICT (stream_id, position)
DO UPDATE
SET segment_health=EXCLUDED.segment_health, updated_at=current_timestamp
SET segment_health=EXCLUDED.segment_health, updated_at=current_timestamp, placement=EXCLUDED.placement
RETURNING NOT(xmax != 0) AS newlyInserted
`
case dbutil.Cockroach:

View File

@ -7,6 +7,7 @@ import (
"sort"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"storj.io/common/storj"
@ -97,6 +98,7 @@ func TestRepairQueue(t *testing.T) {
}
func TestRepairQueue_PlacementRestrictions(t *testing.T) {
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
testSegments := make([]*queue.InjuredSegment, 40)
for i := 0; i < len(testSegments); i++ {
testSegments[i] = &queue.InjuredSegment{
@ -110,7 +112,6 @@ func TestRepairQueue_PlacementRestrictions(t *testing.T) {
}
}
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
rq := db.RepairQueue()
for i := 0; i < len(testSegments); i++ {
@ -147,3 +148,55 @@ func TestRepairQueue_PlacementRestrictions(t *testing.T) {
})
}
func TestRepairQueue_BatchInsert(t *testing.T) {
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
testSegments := make([]*queue.InjuredSegment, 5)
for i := 0; i < len(testSegments); i++ {
placement := storj.PlacementConstraint(i % 10)
uuid := testrand.UUID()
uuid[0] = byte(placement)
testSegments[i] = &queue.InjuredSegment{
StreamID: uuid,
Position: metabase.SegmentPosition{
Part: uint32(i),
Index: 2,
},
SegmentHealth: 10,
Placement: placement,
}
}
// fresh inserts
rq := db.RepairQueue()
_, err := rq.InsertBatch(ctx, testSegments)
require.NoError(t, err)
for i := 0; i < len(testSegments); i++ {
segment, err := rq.Select(ctx, []storj.PlacementConstraint{storj.PlacementConstraint(i)}, nil)
require.NoError(t, err)
assert.Equal(t, storj.PlacementConstraint(segment.StreamID[0]), segment.Placement)
}
// fresh inserts again
_, err = rq.InsertBatch(ctx, testSegments)
require.NoError(t, err)
for _, ts := range testSegments {
ts.StreamID[0] = byte(ts.Placement + 1)
}
// this time placement is changed between inserts.
_, err = rq.InsertBatch(ctx, testSegments)
require.NoError(t, err)
for i := 0; i < len(testSegments); i++ {
segment, err := rq.Select(ctx, []storj.PlacementConstraint{storj.PlacementConstraint(i)}, nil)
require.NoError(t, err)
require.Equal(t, storj.PlacementConstraint(segment.StreamID[0]), segment.Placement+1)
}
})
}