From b612ded9af13fcabccf09dedd158e10162e79bdc Mon Sep 17 00:00:00 2001 From: paul cannon Date: Wed, 23 Nov 2022 10:37:39 -0600 Subject: [PATCH] satellite/audit: help performance of pushing to audit queue The audit chore will be pushing a large number of segments to be audited, and the db might choke on that large insert when under load. This change divides the insert up into batches, which can be sized however is optimal for the backing database. It also arranges for segments to be inserted in the order of the primary key, which helps performance on some systems. Refs: https://github.com/storj/storj/issues/5228 Change-Id: I941f580f690d681b80c86faf4abca2995e37135d --- satellite/audit/chore.go | 5 +- satellite/audit/queue.go | 24 +++++++- satellite/audit/worker.go | 9 +-- satellite/satellitedb/verifyqueue.go | 50 ++++++++++------ satellite/satellitedb/verifyqueue_test.go | 63 ++++++++++++--------- scripts/testdata/satellite-config.yaml.lock | 3 + 6 files changed, 104 insertions(+), 50 deletions(-) diff --git a/satellite/audit/chore.go b/satellite/audit/chore.go index 99e7954d9..8a6a723f7 100644 --- a/satellite/audit/chore.go +++ b/satellite/audit/chore.go @@ -30,6 +30,9 @@ type Chore struct { // NewChore instantiates Chore. func NewChore(log *zap.Logger, queue VerifyQueue, loop *segmentloop.Service, config Config) *Chore { + if config.VerificationPushBatchSize < 1 { + config.VerificationPushBatchSize = 1 + } return &Chore{ log: log, rand: rand.New(rand.NewSource(time.Now().Unix())), @@ -86,7 +89,7 @@ func (chore *Chore) Run(ctx context.Context) (err error) { } // Push new queue to queues struct so it can be fetched by worker. - return chore.queue.Push(ctx, newQueue) + return chore.queue.Push(ctx, newQueue, chore.config.VerificationPushBatchSize) }) } diff --git a/satellite/audit/queue.go b/satellite/audit/queue.go index 42fb09af1..d64ded522 100644 --- a/satellite/audit/queue.go +++ b/satellite/audit/queue.go @@ -136,7 +136,7 @@ func (queues *Queues) WaitForSwap(ctx context.Context) error { // stripe of data across all pieces in the segment and ensure that all pieces // conform to the same polynomial. type VerifyQueue interface { - Push(ctx context.Context, segments []Segment) (err error) + Push(ctx context.Context, segments []Segment, maxBatchSize int) (err error) Next(ctx context.Context) (Segment, error) } @@ -149,3 +149,25 @@ type ReverifyQueue interface { GetNextJob(ctx context.Context) (job ReverificationJob, err error) Remove(ctx context.Context, piece PieceLocator) (wasDeleted bool, err error) } + +// ByStreamIDAndPosition allows sorting of a slice of segments by stream ID and position. +type ByStreamIDAndPosition []Segment + +func (b ByStreamIDAndPosition) Len() int { + return len(b) +} + +func (b ByStreamIDAndPosition) Less(i, j int) bool { + comparison := b[i].StreamID.Compare(b[j].StreamID) + if comparison < 0 { + return true + } + if comparison > 0 { + return false + } + return b[i].Position.Less(b[j].Position) +} + +func (b ByStreamIDAndPosition) Swap(i, j int) { + b[i], b[j] = b[j], b[i] +} diff --git a/satellite/audit/worker.go b/satellite/audit/worker.go index 7bce3a9b3..6a65258d7 100644 --- a/satellite/audit/worker.go +++ b/satellite/audit/worker.go @@ -26,10 +26,11 @@ type Config struct { MinDownloadTimeout time.Duration `help:"the minimum duration for downloading a share from storage nodes before timing out" default:"5m0s" testDefault:"5s"` MaxReverifyCount int `help:"limit above which we consider an audit is failed" default:"3"` - ChoreInterval time.Duration `help:"how often to run the reservoir chore" releaseDefault:"24h" devDefault:"1m" testDefault:"$TESTINTERVAL"` - QueueInterval time.Duration `help:"how often to recheck an empty audit queue" releaseDefault:"1h" devDefault:"1m" testDefault:"$TESTINTERVAL"` - Slots int `help:"number of reservoir slots allotted for nodes, currently capped at 3" default:"3"` - WorkerConcurrency int `help:"number of workers to run audits on segments" default:"2"` + ChoreInterval time.Duration `help:"how often to run the reservoir chore" releaseDefault:"24h" devDefault:"1m" testDefault:"$TESTINTERVAL"` + QueueInterval time.Duration `help:"how often to recheck an empty audit queue" releaseDefault:"1h" devDefault:"1m" testDefault:"$TESTINTERVAL"` + Slots int `help:"number of reservoir slots allotted for nodes, currently capped at 3" default:"3"` + VerificationPushBatchSize int `help:"number of audit jobs to push at once to the verification queue" devDefault:"10" releaseDefault:"4096"` + WorkerConcurrency int `help:"number of workers to run audits on segments" default:"2"` ReverifyWorkerConcurrency int `help:"number of workers to run reverify audits on pieces" default:"2"` ReverificationRetryInterval time.Duration `help:"how long a single reverification job can take before it may be taken over by another worker" releaseDefault:"6h" devDefault:"10m"` diff --git a/satellite/satellitedb/verifyqueue.go b/satellite/satellitedb/verifyqueue.go index f241f2104..8e3561084 100644 --- a/satellite/satellitedb/verifyqueue.go +++ b/satellite/satellitedb/verifyqueue.go @@ -7,8 +7,10 @@ import ( "context" "database/sql" "errors" + "sort" "time" + "storj.io/common/uuid" "storj.io/private/dbutil" "storj.io/private/dbutil/pgutil" "storj.io/storj/satellite/audit" @@ -28,30 +30,42 @@ type verifyQueue struct { var _ audit.VerifyQueue = (*verifyQueue)(nil) -func (vq *verifyQueue) Push(ctx context.Context, segments []audit.Segment) (err error) { +func (vq *verifyQueue) Push(ctx context.Context, segments []audit.Segment, maxBatchSize int) (err error) { defer mon.Task()(&ctx)(&err) - streamIDSlice := make([][]byte, len(segments)) - positionSlice := make([]int64, len(segments)) - expirationSlice := make([]*time.Time, len(segments)) - encryptedSizeSlice := make([]int32, len(segments)) + streamIDSlice := make([]uuid.UUID, maxBatchSize) + positionSlice := make([]int64, maxBatchSize) + expirationSlice := make([]*time.Time, maxBatchSize) + encryptedSizeSlice := make([]int32, maxBatchSize) - for i, seg := range segments { - streamIDSlice[i] = seg.StreamID.Bytes() - positionSlice[i] = int64(seg.Position.Encode()) - expirationSlice[i] = seg.ExpiresAt - encryptedSizeSlice[i] = seg.EncryptedSize - } - _, err = vq.db.DB.ExecContext(ctx, ` + // sort segments in the order of the primary key before inserting, for performance reasons + sort.Sort(audit.ByStreamIDAndPosition(segments)) + + segmentsIndex := 0 + for segmentsIndex < len(segments) { + batchIndex := 0 + for batchIndex < maxBatchSize && segmentsIndex < len(segments) { + streamIDSlice[batchIndex] = segments[segmentsIndex].StreamID + positionSlice[batchIndex] = int64(segments[segmentsIndex].Position.Encode()) + expirationSlice[batchIndex] = segments[segmentsIndex].ExpiresAt + encryptedSizeSlice[batchIndex] = segments[segmentsIndex].EncryptedSize + batchIndex++ + segmentsIndex++ + } + _, err = vq.db.DB.ExecContext(ctx, ` INSERT INTO verification_audits (stream_id, position, expires_at, encrypted_size) SELECT unnest($1::bytea[]), unnest($2::int8[]), unnest($3::timestamptz[]), unnest($4::int4[]) `, - pgutil.ByteaArray(streamIDSlice), - pgutil.Int8Array(positionSlice), - pgutil.NullTimestampTZArray(expirationSlice), - pgutil.Int4Array(encryptedSizeSlice), - ) - return Error.Wrap(err) + pgutil.UUIDArray(streamIDSlice[:batchIndex]), + pgutil.Int8Array(positionSlice[:batchIndex]), + pgutil.NullTimestampTZArray(expirationSlice[:batchIndex]), + pgutil.Int4Array(encryptedSizeSlice[:batchIndex]), + ) + if err != nil { + return Error.Wrap(err) + } + } + return nil } func (vq *verifyQueue) Next(ctx context.Context) (seg audit.Segment, err error) { diff --git a/satellite/satellitedb/verifyqueue_test.go b/satellite/satellitedb/verifyqueue_test.go index 5eaeafab5..51edc7ab7 100644 --- a/satellite/satellitedb/verifyqueue_test.go +++ b/satellite/satellitedb/verifyqueue_test.go @@ -34,17 +34,17 @@ func TestVerifyQueueBasicUsage(t *testing.T) { segmentsToVerify[1].ExpiresAt = &expireTime // add these segments to the queue, 3 at a time - err := verifyQueue.Push(ctx, segmentsToVerify[0:3]) + err := verifyQueue.Push(ctx, segmentsToVerify[0:3], 3) require.NoError(t, err) - err = verifyQueue.Push(ctx, segmentsToVerify[3:6]) + err = verifyQueue.Push(ctx, segmentsToVerify[3:6], 3) require.NoError(t, err) // sort both sets of 3. segments inserted in the same call to Push // can't be differentiated by insertion time, so they are ordered in the // queue by (stream_id, position). We will sort our list here so that it // matches the order we expect to receive them from the queue. - sort.Sort(byStreamIDAndPosition(segmentsToVerify[0:3])) - sort.Sort(byStreamIDAndPosition(segmentsToVerify[3:6])) + sort.Sort(audit.ByStreamIDAndPosition(segmentsToVerify[0:3])) + sort.Sort(audit.ByStreamIDAndPosition(segmentsToVerify[3:6])) // Pop all segments from the queue and check for a match with the input. for _, expected := range segmentsToVerify { @@ -67,33 +67,12 @@ func TestVerifyQueueBasicUsage(t *testing.T) { }) } -type byStreamIDAndPosition []audit.Segment - -func (b byStreamIDAndPosition) Len() int { - return len(b) -} - -func (b byStreamIDAndPosition) Less(i, j int) bool { - comparison := b[i].StreamID.Compare(b[j].StreamID) - if comparison < 0 { - return true - } - if comparison > 0 { - return false - } - return b[i].Position.Less(b[j].Position) -} - -func (b byStreamIDAndPosition) Swap(i, j int) { - b[i], b[j] = b[j], b[i] -} - func TestVerifyQueueEmpty(t *testing.T) { satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) { verifyQueue := db.VerifyQueue() // insert empty list - err := verifyQueue.Push(ctx, []audit.Segment{}) + err := verifyQueue.Push(ctx, []audit.Segment{}, 1000) require.NoError(t, err) // read from empty queue @@ -103,3 +82,35 @@ func TestVerifyQueueEmpty(t *testing.T) { require.Equal(t, audit.Segment{}, popped) }) } + +func TestMultipleBatches(t *testing.T) { + satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) { + verifyQueue := db.VerifyQueue() + + // make a list of segments, and push them in batches + const ( + numSegments = 20 + batchSize = 5 + ) + segments := make([]audit.Segment, numSegments) + for i := range segments { + segments[i] = audit.Segment{ + StreamID: testrand.UUID(), + Position: metabase.SegmentPositionFromEncoded(rand.Uint64()), + EncryptedSize: rand.Int31(), + } + } + + err := verifyQueue.Push(ctx, segments, batchSize) + require.NoError(t, err) + + // verify that all segments made it to the db + sort.Sort(audit.ByStreamIDAndPosition(segments)) + + for i := range segments { + gotSegment, err := verifyQueue.Next(ctx) + require.NoError(t, err) + require.Equal(t, segments[i], gotSegment) + } + }) +} diff --git a/scripts/testdata/satellite-config.yaml.lock b/scripts/testdata/satellite-config.yaml.lock index 35210e38a..edadb9be5 100755 --- a/scripts/testdata/satellite-config.yaml.lock +++ b/scripts/testdata/satellite-config.yaml.lock @@ -58,6 +58,9 @@ # number of reservoir slots allotted for nodes, currently capped at 3 # audit.slots: 3 +# number of audit jobs to push at once to the verification queue +# audit.verification-push-batch-size: 4096 + # number of workers to run audits on segments # audit.worker-concurrency: 2