storj/satellite/satellitedb/verifyqueue.go

106 lines
3.1 KiB
Go
Raw Normal View History

// Copyright (C) 2022 Storj Labs, Inc.
// See LICENSE for copying information.
package satellitedb
import (
"context"
"database/sql"
"errors"
"time"
"storj.io/private/dbutil"
"storj.io/private/dbutil/pgutil"
"storj.io/storj/satellite/audit"
)
const (
// VerifyRetryInterval defines a limit on how frequently we retry
// verification audits. At least this long should elapse between
// attempts.
VerifyRetryInterval = 4 * time.Hour
)
// verifyQueue implements storj.io/storj/satellite/audit.VerifyQueue.
type verifyQueue struct {
db *satelliteDB
}
var _ audit.VerifyQueue = (*verifyQueue)(nil)
func (vq *verifyQueue) Push(ctx context.Context, segments []audit.Segment) (err error) {
defer mon.Task()(&ctx)(&err)
streamIDSlice := make([][]byte, len(segments))
positionSlice := make([]int64, len(segments))
expirationSlice := make([]*time.Time, len(segments))
encryptedSizeSlice := make([]int32, len(segments))
for i, seg := range segments {
streamIDSlice[i] = seg.StreamID.Bytes()
positionSlice[i] = int64(seg.Position.Encode())
expirationSlice[i] = seg.ExpiresAt
encryptedSizeSlice[i] = seg.EncryptedSize
}
_, err = vq.db.DB.ExecContext(ctx, `
INSERT INTO verification_audits (stream_id, position, expires_at, encrypted_size)
SELECT unnest($1::bytea[]), unnest($2::int8[]), unnest($3::timestamptz[]), unnest($4::int4[])
`,
pgutil.ByteaArray(streamIDSlice),
pgutil.Int8Array(positionSlice),
pgutil.NullTimestampTZArray(expirationSlice),
pgutil.Int4Array(encryptedSizeSlice),
)
return Error.Wrap(err)
}
func (vq *verifyQueue) Next(ctx context.Context) (seg audit.Segment, err error) {
defer mon.Task()(&ctx)(&err)
var getQuery string
switch vq.db.impl {
case dbutil.Postgres:
getQuery = `
WITH next_row AS (
SELECT inserted_at, stream_id, position
FROM verification_audits
ORDER BY inserted_at, stream_id, position
FOR UPDATE SKIP LOCKED
LIMIT 1
)
DELETE FROM verification_audits v
USING next_row
WHERE v.inserted_at = next_row.inserted_at
AND v.stream_id = next_row.stream_id
AND v.position = next_row.position
RETURNING v.stream_id, v.position, v.expires_at, v.encrypted_size
`
case dbutil.Cockroach:
// Note: because Cockroach does not support SKIP LOCKED, this implementation
// is likely much less performant under any amount of contention.
getQuery = `
WITH next_row AS (
SELECT inserted_at, stream_id, position
FROM verification_audits
ORDER BY inserted_at, stream_id, position
FOR UPDATE
LIMIT 1
)
DELETE FROM verification_audits v
WHERE v.inserted_at = (SELECT inserted_at FROM next_row)
AND v.stream_id = (SELECT stream_id FROM next_row)
AND v.position = (SELECT position FROM next_row)
RETURNING v.stream_id, v.position, v.expires_at, v.encrypted_size
`
}
err = vq.db.DB.QueryRowContext(ctx, getQuery).Scan(&seg.StreamID, &seg.Position, &seg.ExpiresAt, &seg.EncryptedSize)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return audit.Segment{}, audit.ErrEmptyQueue.Wrap(err)
}
return audit.Segment{}, Error.Wrap(err)
}
return seg, nil
}