// Copyright (C) 2022 Storj Labs, Inc. // See LICENSE for copying information. package satellitedb import ( "context" "database/sql" "errors" "sort" "time" "storj.io/common/uuid" "storj.io/private/dbutil" "storj.io/private/dbutil/pgutil" "storj.io/storj/satellite/audit" ) const ( // VerifyRetryInterval defines a limit on how frequently we retry // verification audits. At least this long should elapse between // attempts. VerifyRetryInterval = 4 * time.Hour ) // verifyQueue implements storj.io/storj/satellite/audit.VerifyQueue. type verifyQueue struct { db *satelliteDB } var _ audit.VerifyQueue = (*verifyQueue)(nil) func (vq *verifyQueue) Push(ctx context.Context, segments []audit.Segment, maxBatchSize int) (err error) { defer mon.Task()(&ctx)(&err) streamIDSlice := make([]uuid.UUID, maxBatchSize) positionSlice := make([]int64, maxBatchSize) expirationSlice := make([]*time.Time, maxBatchSize) encryptedSizeSlice := make([]int32, maxBatchSize) // sort segments in the order of the primary key before inserting, for performance reasons sort.Sort(audit.ByStreamIDAndPosition(segments)) segmentsIndex := 0 for segmentsIndex < len(segments) { batchIndex := 0 for batchIndex < maxBatchSize && segmentsIndex < len(segments) { streamIDSlice[batchIndex] = segments[segmentsIndex].StreamID positionSlice[batchIndex] = int64(segments[segmentsIndex].Position.Encode()) expirationSlice[batchIndex] = segments[segmentsIndex].ExpiresAt encryptedSizeSlice[batchIndex] = segments[segmentsIndex].EncryptedSize batchIndex++ segmentsIndex++ } _, err = vq.db.DB.ExecContext(ctx, ` INSERT INTO verification_audits (stream_id, position, expires_at, encrypted_size) SELECT unnest($1::bytea[]), unnest($2::int8[]), unnest($3::timestamptz[]), unnest($4::int4[]) `, pgutil.UUIDArray(streamIDSlice[:batchIndex]), pgutil.Int8Array(positionSlice[:batchIndex]), pgutil.NullTimestampTZArray(expirationSlice[:batchIndex]), pgutil.Int4Array(encryptedSizeSlice[:batchIndex]), ) if err != nil { return Error.Wrap(err) } } return nil } func (vq *verifyQueue) Next(ctx context.Context) (seg audit.Segment, err error) { defer mon.Task()(&ctx)(&err) var getQuery string switch vq.db.impl { case dbutil.Postgres: getQuery = ` WITH next_row AS ( SELECT inserted_at, stream_id, position FROM verification_audits ORDER BY inserted_at, stream_id, position FOR UPDATE SKIP LOCKED LIMIT 1 ) DELETE FROM verification_audits v USING next_row WHERE v.inserted_at = next_row.inserted_at AND v.stream_id = next_row.stream_id AND v.position = next_row.position RETURNING v.stream_id, v.position, v.expires_at, v.encrypted_size ` case dbutil.Cockroach: // Note: because Cockroach does not support SKIP LOCKED, this implementation // is likely much less performant under any amount of contention. getQuery = ` WITH next_row AS ( SELECT inserted_at, stream_id, position FROM verification_audits ORDER BY inserted_at, stream_id, position FOR UPDATE LIMIT 1 ) DELETE FROM verification_audits v WHERE v.inserted_at = (SELECT inserted_at FROM next_row) AND v.stream_id = (SELECT stream_id FROM next_row) AND v.position = (SELECT position FROM next_row) RETURNING v.stream_id, v.position, v.expires_at, v.encrypted_size ` } err = vq.db.DB.QueryRowContext(ctx, getQuery).Scan(&seg.StreamID, &seg.Position, &seg.ExpiresAt, &seg.EncryptedSize) if err != nil { if errors.Is(err, sql.ErrNoRows) { return audit.Segment{}, audit.ErrEmptyQueue.Wrap(err) } return audit.Segment{}, Error.Wrap(err) } return seg, nil }