b612ded9af
The audit chore will be pushing a large number of segments to be audited, and the db might choke on that large insert when under load. This change divides the insert up into batches, which can be sized however is optimal for the backing database. It also arranges for segments to be inserted in the order of the primary key, which helps performance on some systems. Refs: https://github.com/storj/storj/issues/5228 Change-Id: I941f580f690d681b80c86faf4abca2995e37135d
117 lines
3.8 KiB
Go
117 lines
3.8 KiB
Go
// Copyright (C) 2022 Storj Labs, Inc.
|
|
// See LICENSE for copying information.
|
|
|
|
package satellitedb_test
|
|
|
|
import (
|
|
"math/rand"
|
|
"sort"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/stretchr/testify/require"
|
|
|
|
"storj.io/common/testcontext"
|
|
"storj.io/common/testrand"
|
|
"storj.io/storj/satellite"
|
|
"storj.io/storj/satellite/audit"
|
|
"storj.io/storj/satellite/metabase"
|
|
"storj.io/storj/satellite/satellitedb/satellitedbtest"
|
|
)
|
|
|
|
func TestVerifyQueueBasicUsage(t *testing.T) {
|
|
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
|
|
verifyQueue := db.VerifyQueue()
|
|
|
|
// generate random segment records
|
|
segmentsToVerify := make([]audit.Segment, 6)
|
|
for i := range segmentsToVerify {
|
|
segmentsToVerify[i].StreamID = testrand.UUID()
|
|
segmentsToVerify[i].Position = metabase.SegmentPositionFromEncoded(rand.Uint64())
|
|
segmentsToVerify[i].EncryptedSize = rand.Int31()
|
|
}
|
|
expireTime := time.Now().Add(24 * time.Hour).Truncate(time.Microsecond)
|
|
segmentsToVerify[1].ExpiresAt = &expireTime
|
|
|
|
// add these segments to the queue, 3 at a time
|
|
err := verifyQueue.Push(ctx, segmentsToVerify[0:3], 3)
|
|
require.NoError(t, err)
|
|
err = verifyQueue.Push(ctx, segmentsToVerify[3:6], 3)
|
|
require.NoError(t, err)
|
|
|
|
// sort both sets of 3. segments inserted in the same call to Push
|
|
// can't be differentiated by insertion time, so they are ordered in the
|
|
// queue by (stream_id, position). We will sort our list here so that it
|
|
// matches the order we expect to receive them from the queue.
|
|
sort.Sort(audit.ByStreamIDAndPosition(segmentsToVerify[0:3]))
|
|
sort.Sort(audit.ByStreamIDAndPosition(segmentsToVerify[3:6]))
|
|
|
|
// Pop all segments from the queue and check for a match with the input.
|
|
for _, expected := range segmentsToVerify {
|
|
popped, err := verifyQueue.Next(ctx)
|
|
require.NoError(t, err)
|
|
require.Equal(t, expected.StreamID, popped.StreamID)
|
|
require.Equal(t, expected.Position, popped.Position)
|
|
require.Equal(t, expected.ExpiresAt == nil, popped.ExpiresAt == nil)
|
|
if expected.ExpiresAt != nil {
|
|
require.Truef(t, expected.ExpiresAt.Equal(*popped.ExpiresAt), "expected %s but got %s", expected.ExpiresAt.Format(time.RFC3339), popped.ExpiresAt.Format(time.RFC3339))
|
|
}
|
|
require.Equal(t, expected.EncryptedSize, popped.EncryptedSize)
|
|
}
|
|
|
|
// Check that we got all segments.
|
|
popped, err := verifyQueue.Next(ctx)
|
|
require.Error(t, err)
|
|
require.Truef(t, audit.ErrEmptyQueue.Has(err), "unexpected error %v", err)
|
|
require.Equal(t, audit.Segment{}, popped)
|
|
})
|
|
}
|
|
|
|
func TestVerifyQueueEmpty(t *testing.T) {
|
|
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
|
|
verifyQueue := db.VerifyQueue()
|
|
|
|
// insert empty list
|
|
err := verifyQueue.Push(ctx, []audit.Segment{}, 1000)
|
|
require.NoError(t, err)
|
|
|
|
// read from empty queue
|
|
popped, err := verifyQueue.Next(ctx)
|
|
require.Error(t, err)
|
|
require.Truef(t, audit.ErrEmptyQueue.Has(err), "unexpected error %v", err)
|
|
require.Equal(t, audit.Segment{}, popped)
|
|
})
|
|
}
|
|
|
|
func TestMultipleBatches(t *testing.T) {
|
|
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
|
|
verifyQueue := db.VerifyQueue()
|
|
|
|
// make a list of segments, and push them in batches
|
|
const (
|
|
numSegments = 20
|
|
batchSize = 5
|
|
)
|
|
segments := make([]audit.Segment, numSegments)
|
|
for i := range segments {
|
|
segments[i] = audit.Segment{
|
|
StreamID: testrand.UUID(),
|
|
Position: metabase.SegmentPositionFromEncoded(rand.Uint64()),
|
|
EncryptedSize: rand.Int31(),
|
|
}
|
|
}
|
|
|
|
err := verifyQueue.Push(ctx, segments, batchSize)
|
|
require.NoError(t, err)
|
|
|
|
// verify that all segments made it to the db
|
|
sort.Sort(audit.ByStreamIDAndPosition(segments))
|
|
|
|
for i := range segments {
|
|
gotSegment, err := verifyQueue.Next(ctx)
|
|
require.NoError(t, err)
|
|
require.Equal(t, segments[i], gotSegment)
|
|
}
|
|
})
|
|
}
|