storj/satellite/satellitedb/verifyqueue.go
paul cannon b612ded9af satellite/audit: help performance of pushing to audit queue
The audit chore will be pushing a large number of segments to be
audited, and the db might choke on that large insert when under load.

This change divides the insert up into batches, which can be sized
however is optimal for the backing database. It also arranges for
segments to be inserted in the order of the primary key, which helps
performance on some systems.

Refs: https://github.com/storj/storj/issues/5228

Change-Id: I941f580f690d681b80c86faf4abca2995e37135d
2022-11-29 15:37:49 +00:00

120 lines
3.6 KiB
Go

// Copyright (C) 2022 Storj Labs, Inc.
// See LICENSE for copying information.
package satellitedb
import (
"context"
"database/sql"
"errors"
"sort"
"time"
"storj.io/common/uuid"
"storj.io/private/dbutil"
"storj.io/private/dbutil/pgutil"
"storj.io/storj/satellite/audit"
)
// VerifyRetryInterval is the minimum time that must pass between two
// attempts of the same verification audit; retries are never made more
// frequently than this.
const VerifyRetryInterval = 4 * time.Hour
// Compile-time proof that *verifyQueue satisfies audit.VerifyQueue.
var _ audit.VerifyQueue = (*verifyQueue)(nil)

// verifyQueue is the satellite database's implementation of
// storj.io/storj/satellite/audit.VerifyQueue, stored in the
// verification_audits table.
type verifyQueue struct {
	// db is the satellite database handle used for all queue queries.
	db *satelliteDB
}
// Push adds segments to the verification audit queue.
//
// Before inserting, segments is sorted in place by stream ID and position
// (the order of the table's primary key), because inserting in key order
// helps performance on some databases. Callers will observe the reordered
// slice.
//
// Rows are written with one INSERT statement per batch of at most
// maxBatchSize segments, reusing the same parameter buffers for every batch.
// Batches are executed one after another and Push returns on the first
// failure; batches inserted before the failing one are not rolled back here.
// maxBatchSize must be positive whenever segments is non-empty.
func (vq *verifyQueue) Push(ctx context.Context, segments []audit.Segment, maxBatchSize int) (err error) {
	defer mon.Task()(&ctx)(&err)

	if len(segments) == 0 {
		return nil
	}
	// With a non-positive batch size the loop could never make progress
	// through segments (and make would panic on a negative length).
	if maxBatchSize <= 0 {
		return Error.New("invalid batch size %d: must be positive", maxBatchSize)
	}
	// No batch can be larger than the whole input, so don't allocate more.
	if maxBatchSize > len(segments) {
		maxBatchSize = len(segments)
	}

	streamIDs := make([]uuid.UUID, maxBatchSize)
	positions := make([]int64, maxBatchSize)
	expirations := make([]*time.Time, maxBatchSize)
	encryptedSizes := make([]int32, maxBatchSize)

	// sort segments in the order of the primary key before inserting, for performance reasons
	sort.Sort(audit.ByStreamIDAndPosition(segments))

	for start := 0; start < len(segments); start += maxBatchSize {
		end := start + maxBatchSize
		if end > len(segments) {
			end = len(segments)
		}
		batch := segments[start:end]
		for i := range batch {
			streamIDs[i] = batch[i].StreamID
			positions[i] = int64(batch[i].Position.Encode())
			expirations[i] = batch[i].ExpiresAt
			encryptedSizes[i] = batch[i].EncryptedSize
		}
		n := len(batch)

		_, err = vq.db.DB.ExecContext(ctx, `
INSERT INTO verification_audits (stream_id, position, expires_at, encrypted_size)
SELECT unnest($1::bytea[]), unnest($2::int8[]), unnest($3::timestamptz[]), unnest($4::int4[])
`,
			pgutil.UUIDArray(streamIDs[:n]),
			pgutil.Int8Array(positions[:n]),
			pgutil.NullTimestampTZArray(expirations[:n]),
			pgutil.Int4Array(encryptedSizes[:n]),
		)
		if err != nil {
			return Error.Wrap(err)
		}
	}
	return nil
}
// Next removes one row from the verification audit queue and returns it as a
// segment. The row chosen is the first by (inserted_at, stream_id, position).
//
// On PostgreSQL the selecting query uses FOR UPDATE SKIP LOCKED, so rows
// already locked by a concurrent caller are skipped. CockroachDB does not
// support SKIP LOCKED, so that variant uses plain FOR UPDATE and is expected
// to perform worse under contention.
//
// When the queue is empty, the returned error is wrapped with
// audit.ErrEmptyQueue. Database implementations other than PostgreSQL and
// CockroachDB are rejected with an error instead of running an empty query.
func (vq *verifyQueue) Next(ctx context.Context) (seg audit.Segment, err error) {
	defer mon.Task()(&ctx)(&err)

	var getQuery string
	switch vq.db.impl {
	case dbutil.Postgres:
		getQuery = `
WITH next_row AS (
SELECT inserted_at, stream_id, position
FROM verification_audits
ORDER BY inserted_at, stream_id, position
FOR UPDATE SKIP LOCKED
LIMIT 1
)
DELETE FROM verification_audits v
USING next_row
WHERE v.inserted_at = next_row.inserted_at
AND v.stream_id = next_row.stream_id
AND v.position = next_row.position
RETURNING v.stream_id, v.position, v.expires_at, v.encrypted_size
`
	case dbutil.Cockroach:
		// Note: because Cockroach does not support SKIP LOCKED, this implementation
		// is likely much less performant under any amount of contention.
		getQuery = `
WITH next_row AS (
SELECT inserted_at, stream_id, position
FROM verification_audits
ORDER BY inserted_at, stream_id, position
FOR UPDATE
LIMIT 1
)
DELETE FROM verification_audits v
WHERE v.inserted_at = (SELECT inserted_at FROM next_row)
AND v.stream_id = (SELECT stream_id FROM next_row)
AND v.position = (SELECT position FROM next_row)
RETURNING v.stream_id, v.position, v.expires_at, v.encrypted_size
`
	default:
		// Without this, an empty query string would be sent to the database.
		return audit.Segment{}, Error.New("unsupported database: %v", vq.db.impl)
	}

	err = vq.db.DB.QueryRowContext(ctx, getQuery).Scan(&seg.StreamID, &seg.Position, &seg.ExpiresAt, &seg.EncryptedSize)
	if err != nil {
		if errors.Is(err, sql.ErrNoRows) {
			return audit.Segment{}, audit.ErrEmptyQueue.Wrap(err)
		}
		return audit.Segment{}, Error.Wrap(err)
	}
	return seg, nil
}