satellite/repair/queue: buffer batch insert

Implement a buffer for inserting repair items into the queue in a batch.

Part of https://github.com/storj/storj/issues/4727

Change-Id: I718472b2f2b1f4993c3d6f15c44923776407155a
This commit is contained in:
Erik van Velzen 2022-04-19 11:22:05 +02:00 committed by Fadila
parent 6175731166
commit 928375a67c
4 changed files with 180 additions and 71 deletions

View File

@ -122,7 +122,7 @@ var (
}
qdiagCmd = &cobra.Command{
Use: "qdiag",
Short: "Repair Queue Diagnostic Tool support",
Short: "Repair queue Diagnostic Tool support",
RunE: cmdQDiag,
}
reportsCmd = &cobra.Command{

View File

@ -9,35 +9,75 @@ import (
// InsertBuffer exposes a synchronous API to buffer a batch of segments
// and insert them at once. Not threadsafe. Call Flush() before discarding.
//
//lint:ignore U1000 unused skeleton code
type InsertBuffer struct {
queue RepairQueue //nolint
batchSize int //nolint
batch []*InjuredSegment //nolint
queue RepairQueue
batchSize int
batch []*InjuredSegment
// newInsertCallbacks contains callback called when the InjuredSegment
// is flushed to the queue and it is determined that it wasn't already queued for repair.
// This is made to collect metrics.
newInsertCallbacks map[*InjuredSegment]func() //nolint
newInsertCallbacks map[*InjuredSegment]func()
}
// NewInsertBuffer wraps a RepairQueue with buffer logic.
func NewInsertBuffer(
queue RepairQueue,
batchSize int,
) *InsertBuffer {
insertBuffer := InsertBuffer{
queue: queue,
batchSize: batchSize,
batch: make([]*InjuredSegment, 0, batchSize),
newInsertCallbacks: make(map[*InjuredSegment]func()),
}
return &insertBuffer
}
// Insert adds a segment to the batch of the next insert,
// and does a synchronous database insert when the batch size is reached.
// When it is determined that this segment is newly queued, firstInsertCallback is called.
// for the purpose of metrics.
//
//lint:ignore U1000 skeleton code
func (r *InsertBuffer) Insert(
ctx context.Context,
segment *InjuredSegment,
newInsertCallback func(),
) (err error) {
return err
r.batch = append(r.batch, segment)
r.newInsertCallbacks[segment] = newInsertCallback
if len(r.batch) < r.batchSize {
return nil
}
return r.Flush(ctx)
}
// Flush inserts the remaining segments into the database.
//
//lint:ignore U1000 skeleton code
func (r *InsertBuffer) Flush(ctx context.Context) (err error) {
return err
newlyInsertedSegments, err := r.queue.InsertBatch(ctx, r.batch)
if err != nil {
return err
}
for _, segment := range newlyInsertedSegments {
callback := r.newInsertCallbacks[segment]
if callback != nil {
callback()
}
}
r.clearInternals()
return nil
}
func (r *InsertBuffer) clearInternals() {
// make room for the next batch
r.batch = r.batch[:0]
for key := range r.newInsertCallbacks {
delete(r.newInsertCallbacks, key)
}
}

View File

@ -0,0 +1,110 @@
// Copyright (C) 2022 Storj Labs, Inc.
// See LICENSE for copying information.
package queue_test
import (
"testing"
"github.com/stretchr/testify/require"
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/storj/satellite"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/repair/queue"
"storj.io/storj/satellite/satellitedb/satellitedbtest"
)
func TestInsertBufferNoCallback(t *testing.T) {
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
repairQueue := db.RepairQueue()
insertBuffer := queue.NewInsertBuffer(repairQueue, 2)
segment1 := createInjuredSegment()
segment2 := createInjuredSegment()
segment3 := createInjuredSegment()
err := insertBuffer.Insert(ctx, segment1, nil)
require.NoError(t, err)
count, err := repairQueue.Count(ctx)
require.NoError(t, err)
require.Equal(t, 0, count)
err = insertBuffer.Insert(ctx, segment2, nil)
require.NoError(t, err)
count, err = repairQueue.Count(ctx)
require.NoError(t, err)
require.Equal(t, 2, count)
err = insertBuffer.Insert(ctx, segment1, nil)
require.NoError(t, err)
count, err = repairQueue.Count(ctx)
require.NoError(t, err)
require.Equal(t, 2, count)
err = insertBuffer.Insert(ctx, segment3, nil)
require.NoError(t, err)
count, err = repairQueue.Count(ctx)
require.NoError(t, err)
require.Equal(t, 3, count)
})
}
func TestInsertBufferSingleUniqueObject(t *testing.T) {
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
insertBuffer := queue.NewInsertBuffer(db.RepairQueue(), 1)
numUnique := 0
inc := func() {
numUnique++
}
segment1 := createInjuredSegment()
err := insertBuffer.Insert(ctx, segment1, inc)
require.NoError(t, err)
require.Equal(t, numUnique, 1)
err = insertBuffer.Insert(ctx, segment1, inc)
require.NoError(t, err)
require.Equal(t, numUnique, 1)
err = insertBuffer.Insert(ctx, segment1, inc)
require.NoError(t, err)
require.Equal(t, numUnique, 1)
})
}
func TestInsertBufferTwoUniqueObjects(t *testing.T) {
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
insertBuffer := queue.NewInsertBuffer(db.RepairQueue(), 1)
numUnique := 0
inc := func() {
numUnique++
}
segment1 := createInjuredSegment()
segment2 := createInjuredSegment()
err := insertBuffer.Insert(ctx, segment1, inc)
require.NoError(t, err)
require.Equal(t, numUnique, 1)
err = insertBuffer.Insert(ctx, segment2, inc)
require.NoError(t, err)
require.Equal(t, numUnique, 2)
})
}
func createInjuredSegment() *queue.InjuredSegment {
return &queue.InjuredSegment{
StreamID: testrand.UUID(),
Position: metabase.SegmentPosition{
Part: uint32(testrand.Intn(1000)),
Index: uint32(testrand.Intn(1000)),
},
SegmentHealth: 10,
}
}

View File

@ -15,7 +15,6 @@ import (
"storj.io/common/testrand"
"storj.io/common/uuid"
"storj.io/storj/satellite"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/repair/queue"
"storj.io/storj/satellite/satellitedb/satellitedbtest"
"storj.io/storj/storage"
@ -25,14 +24,9 @@ func TestInsertSelect(t *testing.T) {
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
q := db.RepairQueue()
seg := &queue.InjuredSegment{
StreamID: testrand.UUID(),
Position: metabase.SegmentPosition{
Part: uint32(testrand.Intn(1000)),
Index: uint32(testrand.Intn(1000)),
},
SegmentHealth: 0.4,
}
seg := createInjuredSegment()
seg.SegmentHealth = 0.4
alreadyInserted, err := q.Insert(ctx, seg)
require.NoError(t, err)
require.False(t, alreadyInserted)
@ -52,14 +46,7 @@ func TestInsertDuplicate(t *testing.T) {
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
q := db.RepairQueue()
seg := &queue.InjuredSegment{
StreamID: testrand.UUID(),
Position: metabase.SegmentPosition{
Part: uint32(testrand.Intn(1000)),
Index: uint32(testrand.Intn(1000)),
},
SegmentHealth: 10,
}
seg := createInjuredSegment()
alreadyInserted, err := q.Insert(ctx, seg)
require.NoError(t, err)
require.False(t, alreadyInserted)
@ -74,14 +61,7 @@ func TestInsertBatchOfOne(t *testing.T) {
q := db.RepairQueue()
writeSegments := []*queue.InjuredSegment{
{
StreamID: testrand.UUID(),
Position: metabase.SegmentPosition{
Part: uint32(testrand.Intn(1000)),
Index: uint32(testrand.Intn(1000)),
},
SegmentHealth: 10,
},
createInjuredSegment(),
}
newlyInserted, err := q.InsertBatch(ctx, writeSegments)
require.NoError(t, err)
@ -125,48 +105,27 @@ func TestInsertOverlappingBatches(t *testing.T) {
}
}
writeSegment1 := queue.InjuredSegment{
StreamID: testrand.UUID(),
Position: metabase.SegmentPosition{
Part: uint32(testrand.Intn(1000)),
Index: uint32(testrand.Intn(1000)),
},
SegmentHealth: 10,
}
writeSegment2 := queue.InjuredSegment{
StreamID: testrand.UUID(),
Position: metabase.SegmentPosition{
Part: uint32(testrand.Intn(1000)),
Index: uint32(testrand.Intn(1000)),
},
SegmentHealth: 10,
}
writeSegment3 := queue.InjuredSegment{
StreamID: testrand.UUID(),
Position: metabase.SegmentPosition{
Part: uint32(testrand.Intn(1000)),
Index: uint32(testrand.Intn(1000)),
},
SegmentHealth: 10,
}
writeSegment1 := createInjuredSegment()
writeSegment2 := createInjuredSegment()
writeSegment3 := createInjuredSegment()
newlyInserted, err := q.InsertBatch(ctx, []*queue.InjuredSegment{&writeSegment1, &writeSegment2})
newlyInserted, err := q.InsertBatch(ctx, []*queue.InjuredSegment{writeSegment1, writeSegment2})
require.NoError(t, err)
require.Len(t, newlyInserted, 2)
require.Equal(t, newlyInserted[0], &writeSegment1)
require.Equal(t, newlyInserted[1], &writeSegment2)
requireDbState([]queue.InjuredSegment{writeSegment1, writeSegment2})
require.Equal(t, newlyInserted[0], writeSegment1)
require.Equal(t, newlyInserted[1], writeSegment2)
requireDbState([]queue.InjuredSegment{*writeSegment1, *writeSegment2})
newlyInserted, err = q.InsertBatch(ctx, []*queue.InjuredSegment{&writeSegment2, &writeSegment3})
newlyInserted, err = q.InsertBatch(ctx, []*queue.InjuredSegment{writeSegment2, writeSegment3})
require.NoError(t, err)
require.Len(t, newlyInserted, 1)
require.Equal(t, newlyInserted[0], &writeSegment3)
requireDbState([]queue.InjuredSegment{writeSegment1, writeSegment2, writeSegment3})
require.Equal(t, newlyInserted[0], writeSegment3)
requireDbState([]queue.InjuredSegment{*writeSegment1, *writeSegment2, *writeSegment3})
newlyInserted, err = q.InsertBatch(ctx, []*queue.InjuredSegment{&writeSegment1, &writeSegment3})
newlyInserted, err = q.InsertBatch(ctx, []*queue.InjuredSegment{writeSegment1, writeSegment3})
require.NoError(t, err)
require.Len(t, newlyInserted, 0)
requireDbState([]queue.InjuredSegment{writeSegment1, writeSegment2, writeSegment3})
requireDbState([]queue.InjuredSegment{*writeSegment1, *writeSegment2, *writeSegment3})
})
}