satellite/satellitedb: use delete returning to query pending_serial_queue

This way we don't have to do two steps (select a batch, then delete it separately), and by using the ctid, Postgres is going to do two very efficient prefix scans.

Change-Id: Ia9d0546cdf0a1af67ceec9cd508d336a5fdcbdb9

Author: Jeff Wendling
Date:   2020-06-01 15:40:28 -06:00
Parent: b82d04e618
Commit: 2b3545c0c3

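Before the diff itself, here is a minimal, hypothetical standalone sketch of the pattern the commit message describes: dequeuing a batch in a single statement with DELETE ... RETURNING over ctid, instead of selecting a batch and deleting it afterwards. This sketch is not part of the change; the connection string is a placeholder and only a few of the queue columns are read.

	package main

	import (
		"database/sql"
		"fmt"
		"log"

		_ "github.com/lib/pq" // any Postgres driver works for this sketch
	)

	func main() {
		// Placeholder DSN; point it at a database that has a pending_serial_queue table.
		db, err := sql.Open("postgres", "postgres://localhost/example?sslmode=disable")
		if err != nil {
			log.Fatal(err)
		}
		defer db.Close()

		// One statement instead of "SELECT a batch, then DELETE it": the inner
		// SELECT picks up to $1 physical row IDs (ctid), and the DELETE removes
		// exactly those rows while RETURNING their columns to the caller.
		rows, err := db.Query(`
			DELETE FROM pending_serial_queue
			WHERE ctid = any (array(
				SELECT ctid FROM pending_serial_queue LIMIT $1
			))
			RETURNING storage_node_id, bucket_id, serial_number
		`, 100)
		if err != nil {
			log.Fatal(err)
		}
		defer rows.Close()

		for rows.Next() {
			var nodeID, bucketID, serialNumber []byte
			if err := rows.Scan(&nodeID, &bucketID, &serialNumber); err != nil {
				log.Fatal(err)
			}
			fmt.Printf("dequeued serial %x for node %x (bucket %x)\n", serialNumber, nodeID, bucketID)
		}
		if err := rows.Err(); err != nil {
			log.Fatal(err)
		}
	}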

@@ -17,6 +17,7 @@ import (
 	"storj.io/common/uuid"
 	"storj.io/storj/private/dbutil"
 	"storj.io/storj/private/dbutil/pgutil"
+	"storj.io/storj/private/tagsql"
 	"storj.io/storj/satellite/orders"
 	"storj.io/storj/satellite/satellitedb/dbx"
 )
@@ -609,71 +610,74 @@ type rawPendingSerial struct {
 }
 
 type ordersDBQueue struct {
-	db       *satelliteDB
-	log      *zap.Logger
-	produced []rawPendingSerial
+	impl dbutil.Implementation
+	log  *zap.Logger
+	tx   tagsql.Tx
 }
 
 func (db *ordersDB) WithQueue(ctx context.Context, cb func(ctx context.Context, queue orders.Queue) error) (err error) {
 	defer mon.Task()(&ctx)(&err)
 
-	queue := &ordersDBQueue{
-		db:  db.db,
-		log: db.db.log,
-	}
-
-	err = cb(ctx, queue)
-	if err != nil {
-		return errs.Wrap(err)
-	}
-
-	var nodeIDs, bucketIDs, serialNumbers [][]byte
-	for _, pending := range queue.produced {
-		nodeIDs = append(nodeIDs, pending.nodeID)
-		bucketIDs = append(bucketIDs, pending.bucketID)
-		serialNumbers = append(serialNumbers, pending.serialNumber)
-	}
-
-	_, err = db.db.ExecContext(ctx, `
-		DELETE FROM pending_serial_queue WHERE (
-			storage_node_id, bucket_id, serial_number
-		) IN (
-			SELECT
-				unnest($1::bytea[]),
-				unnest($2::bytea[]),
-				unnest($3::bytea[])
-		)
-	`,
-		pq.ByteaArray(nodeIDs),
-		pq.ByteaArray(bucketIDs),
-		pq.ByteaArray(serialNumbers))
-	if err != nil {
-		return Error.Wrap(err)
-	}
-
-	return nil
+	return Error.Wrap(db.db.WithTx(ctx, func(ctx context.Context, tx *dbx.Tx) error {
+		return cb(ctx, &ordersDBQueue{
+			impl: db.db.implementation,
+			log:  db.db.log,
+			tx:   tx.Tx,
+		})
+	}))
 }
 
 func (queue *ordersDBQueue) GetPendingSerialsBatch(ctx context.Context, size int) (pendingSerials []orders.PendingSerial, done bool, err error) {
 	defer mon.Task()(&ctx)(&err)
 
-	// TODO: this might end up being WORSE on cockroach because it does a hash-join after a
-	// full scan of the consumed_serials table, but it's massively better on postgres because
-	// it does an indexed anti-join. hopefully we can get rid of the entire serials system
-	// before it matters.
+	// TODO: no idea if this query makes sense on cockroach. it may do a terrible job with it.
+	// but it's blazing fast on postgres and that's where we have the problem! :D :D :D
 
-	rows, err := queue.db.Query(ctx, `
-		SELECT storage_node_id, bucket_id, serial_number, action, settled, expires_at,
-			coalesce((
-				SELECT 1
-				FROM consumed_serials
-				WHERE
-					consumed_serials.storage_node_id = pending_serial_queue.storage_node_id
-					AND consumed_serials.serial_number = pending_serial_queue.serial_number
-			), 0) as consumed
-		FROM pending_serial_queue
-		LIMIT $1
-	`, size)
+	var rows *sql.Rows
+	switch queue.impl {
+	case dbutil.Postgres:
+		rows, err = queue.tx.Query(ctx, `
+			DELETE
+			FROM pending_serial_queue
+			WHERE
+				ctid = any (array(
+					SELECT
+						ctid
+					FROM pending_serial_queue
+					LIMIT $1
+				))
+			RETURNING storage_node_id, bucket_id, serial_number, action, settled, expires_at, (
+				coalesce((
+					SELECT 1
+					FROM consumed_serials
+					WHERE
+						consumed_serials.storage_node_id = pending_serial_queue.storage_node_id
+						AND consumed_serials.serial_number = pending_serial_queue.serial_number
+				), 0))
+		`, size)
+	case dbutil.Cockroach:
+		rows, err = queue.tx.Query(ctx, `
+			DELETE
+			FROM pending_serial_queue
+			WHERE
+				(storage_node_id, bucket_id, serial_number) = any (array(
+					SELECT
+						(storage_node_id, bucket_id, serial_number)
+					FROM pending_serial_queue
+					LIMIT $1
+				))
+			RETURNING storage_node_id, bucket_id, serial_number, action, settled, expires_at, (
+				coalesce((
+					SELECT 1
+					FROM consumed_serials
+					WHERE
+						consumed_serials.storage_node_id = pending_serial_queue.storage_node_id
+						AND consumed_serials.serial_number = pending_serial_queue.serial_number
+				), 0))
+		`, size)
+	default:
+		return nil, false, Error.New("unhandled implementation")
+	}
 	if err != nil {
 		return nil, false, Error.Wrap(err)
 	}
@@ -697,7 +701,6 @@ func (queue *ordersDBQueue) GetPendingSerialsBatch(ctx context.Context, size int
 		return nil, false, Error.Wrap(err)
 	}
 
-	queue.produced = append(queue.produced, rawPending)
 	size--
 
 	if consumed != 0 {
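A note on why there are two query branches above: ctid is a PostgreSQL-specific physical row locator and, as far as I can tell, is not available on CockroachDB, which is presumably why the Cockroach branch matches on the (storage_node_id, bucket_id, serial_number) tuple instead. In both branches the batch is claimed and deleted in a single DELETE ... RETURNING statement inside the transaction opened by WithQueue, which is what lets the old queue.produced bookkeeping and the follow-up bulk DELETE be removed.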