satellite/satellitedb: change reverifyQueue.Insert()

It helps for the (*reverifyQueue).Insert() method to be idempotent (it
does not make sense for the same node to be under containment for the
same piece multiple times). This change allows for that, by adding an
`ON CONFLICT DO NOTHING` clause to the database query.

Refs: https://github.com/storj/storj/issues/5231
Change-Id: Id2839ee185d5396c0bc2f84ffad610df9786f6c7
This commit is contained in:
paul cannon 2022-11-22 16:08:08 -06:00 committed by Storj Robot
parent 1a8954daad
commit d8df75186b

View File

@ -23,19 +23,19 @@ type reverifyQueue struct {
var _ audit.ReverifyQueue = (*reverifyQueue)(nil)
// Insert adds a reverification job to the queue.
// Insert adds a reverification job to the queue. If there is already
// a matching job in the queue, nothing happens. (reverify_count is only
// incremented when a job is selected by GetNextJob.)
func (rq *reverifyQueue) Insert(ctx context.Context, piece *audit.PieceLocator) (err error) {
defer mon.Task()(&ctx)(&err)
_, err = rq.db.Create_ReverificationAudits(
ctx,
dbx.ReverificationAudits_NodeId(piece.NodeID[:]),
dbx.ReverificationAudits_StreamId(piece.StreamID[:]),
dbx.ReverificationAudits_Position(piece.Position.Encode()),
dbx.ReverificationAudits_PieceNum(piece.PieceNum),
dbx.ReverificationAudits_Create_Fields{},
)
return err
_, err = rq.db.DB.ExecContext(ctx, `
INSERT INTO reverification_audits ("node_id", "stream_id", "position", "piece_num")
VALUES ($1, $2, $3, $4)
ON CONFLICT ("node_id", "stream_id", "position") DO NOTHING
`, piece.NodeID[:], piece.StreamID[:], piece.Position.Encode(), piece.PieceNum)
return audit.ContainError.Wrap(err)
}
// GetNextJob retrieves a job from the queue. The job will be the