storj/satellite/repair/queue/queue_test.go
Moby von Briesen 4e5a7f13c7 satellite/repair/queue: Prioritize selection of items off repair queue by segment health
Add a column to the repair queue table in the satellite db for healthy
piece count. When an item is selected from the repair queue, the least
durable segment that has not been attempted in the past hour should be
selected first. This prevents our repairer from getting stuck doing work
on segments that are close to the repair threshold while allowing
segments that are more unhealthy to degrade further.

The migration also clears the repair queue so that the migration runs
quickly and we can properly account for segment health in future repair
work.

We do not select items off the repair queue that have been attempted in
the past six hours. This was changed from one hour to allow us time to
try a wider variety of segments when the repair queue is very large.

Change-Id: Iaf183f1e5fd45cd792a52e3563a3e43a2b9f410b
2020-02-26 09:54:16 -05:00

158 lines
3.7 KiB
Go

// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package queue_test
import (
"sort"
"strconv"
"testing"
"github.com/stretchr/testify/require"
"storj.io/common/errs2"
"storj.io/common/pb"
"storj.io/common/testcontext"
"storj.io/storj/satellite"
"storj.io/storj/satellite/satellitedb/satellitedbtest"
"storj.io/storj/storage"
)
// TestInsertSelect inserts a single injured segment into the repair queue,
// selects it back, deletes it, and checks that the selected segment equals
// the one that was inserted.
func TestInsertSelect(t *testing.T) {
	satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
		repairQueue := db.RepairQueue()

		injured := &pb.InjuredSegment{
			Path:       []byte("abc"),
			LostPieces: []int32{1, 3},
		}

		// NOTE(review): the third argument is presumably the segment's healthy
		// piece count — confirm against the RepairQueue.Insert signature.
		require.NoError(t, repairQueue.Insert(ctx, injured, 10))

		selected, err := repairQueue.Select(ctx)
		require.NoError(t, err)

		// Delete first, then compare: the comparison only runs once the
		// segment has been removed without error.
		require.NoError(t, repairQueue.Delete(ctx, selected))
		require.True(t, pb.Equal(selected, injured))
	})
}
// TestInsertDuplicate inserts the same injured segment twice and requires
// that neither insert returns an error.
func TestInsertDuplicate(t *testing.T) {
	satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
		repairQueue := db.RepairQueue()

		injured := &pb.InjuredSegment{
			Path:       []byte("abc"),
			LostPieces: []int32{1, 3},
		}

		// Pass 0 is the initial insert; pass 1 re-inserts the identical segment.
		for pass := 0; pass < 2; pass++ {
			require.NoError(t, repairQueue.Insert(ctx, injured, 10))
		}
	})
}
// TestDequeueEmptyQueue calls Select on a repair queue into which this test
// has inserted nothing, and requires an error belonging to the
// storage.ErrEmptyQueue class.
func TestDequeueEmptyQueue(t *testing.T) {
	satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
		_, selectErr := db.RepairQueue().Select(ctx)
		require.Error(t, selectErr)

		// Any error is not enough; it has to be of the empty-queue class.
		isEmptyQueue := storage.ErrEmptyQueue.Has(selectErr)
		require.True(t, isEmptyQueue, "error should of class EmptyQueue")
	})
}
// TestSequential inserts N segments one after another, checks that SelectN
// returns all of them, and then drains the queue with N Select/Delete pairs,
// comparing every dequeued segment with the one originally inserted.
func TestSequential(t *testing.T) {
	satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
		repairQueue := db.RepairQueue()

		const N = 100

		var inserted []*pb.InjuredSegment
		for n := 0; n < N; n++ {
			// The single lost piece doubles as the segment's insertion index,
			// which the checks below use to find the matching original.
			injured := &pb.InjuredSegment{
				Path:       []byte(strconv.Itoa(n)),
				LostPieces: []int32{int32(n)},
			}
			require.NoError(t, repairQueue.Insert(ctx, injured, 10))
			inserted = append(inserted, injured)
		}

		batch, err := repairQueue.SelectN(ctx, N)
		require.NoError(t, err)
		require.Len(t, batch, N)

		// Order the batch by insertion index so it lines up with inserted.
		sort.SliceStable(batch, func(a, b int) bool {
			return batch[a].LostPieces[0] < batch[b].LostPieces[0]
		})
		for idx := range inserted {
			require.True(t, pb.Equal(inserted[idx], &batch[idx]))
		}

		// TODO: fix out of order issue
		for drained := 0; drained < N; drained++ {
			next, err := repairQueue.Select(ctx)
			require.NoError(t, err)
			require.NoError(t, repairQueue.Delete(ctx, next))

			// Look the original up by the dequeued segment's own index, so the
			// order in which Select hands segments out is not asserted here.
			original := inserted[next.LostPieces[0]]
			require.True(t, pb.Equal(original, next))
		}
	})
}
// TestParallel inserts N segments from concurrent goroutines, removes them
// again with N concurrent Select/Delete pairs, and verifies that the dequeued
// segments are exactly the N that were inserted.
func TestParallel(t *testing.T) {
	satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
		q := db.RepairQueue()
		const N = 100

		// Buffered to N so that no removing goroutine blocks on its send.
		entries := make(chan *pb.InjuredSegment, N)

		// Add to queue concurrently
		var inserts errs2.Group
		for i := 0; i < N; i++ {
			i := i // per-iteration copy for the closure below
			inserts.Go(func() error {
				return q.Insert(ctx, &pb.InjuredSegment{
					Path:       []byte(strconv.Itoa(i)),
					LostPieces: []int32{int32(i)},
				}, 10)
			})
		}
		require.Empty(t, inserts.Wait(), "unexpected queue.Insert errors")

		// Remove from queue concurrently
		var remove errs2.Group
		for i := 0; i < N; i++ {
			remove.Go(func() error {
				s, err := q.Select(ctx)
				if err != nil {
					return err
				}
				if err := q.Delete(ctx, s); err != nil {
					return err
				}
				entries <- s
				return nil
			})
		}
		require.Empty(t, remove.Wait(), "unexpected queue.Select/Delete errors")

		// All senders have finished, so closing lets the range below terminate.
		close(entries)

		items := make([]*pb.InjuredSegment, 0, N)
		for segment := range entries {
			items = append(items, segment)
		}

		// Fail with a clear message instead of an index-out-of-range panic if
		// fewer segments than expected were dequeued.
		require.Len(t, items, N)

		sort.Slice(items, func(i, k int) bool {
			return items[i].LostPieces[0] < items[k].LostPieces[0]
		})

		// check if the enqueued and dequeued elements match
		for i := 0; i < N; i++ {
			// testify expects (expected, actual); keeping that order makes a
			// failure report label the two values correctly.
			require.Equal(t, int32(i), items[i].LostPieces[0])
		}
	})
}