storj/satellite/satellitedb/containment.go
paul cannon 7652a598d8 satellite/audit: add GetAllContainedNodes method to ReverifyQueue
We will be needing an infrequent chore to check which nodes are in the
reverify queue and synchronize that set with the 'contained' field in
the nodes db, since it is easily possible for them to get out of sync.
(We can't require that the reverification queue table be in the same
database as the nodes table, so maintaining consistency with SQL
transactions is out. Plus, even if they were in the same database, using
such SQL transactions to maintain consistency would be slow and
unwieldy.)

This commit adds a method to the class representing the reverify queue
in the database, allowing us to get the list of every node that has at
least one record in the reverification queue.

Refs: https://github.com/storj/storj/issues/5431
Change-Id: Idce2633b3d63f2645170365e5cdeb2ea749fa9cb
2023-02-02 00:39:29 +00:00

65 lines
2.0 KiB
Go

// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package satellitedb
import (
"context"
"storj.io/common/pb"
"storj.io/storj/satellite/audit"
)
// containment is the satellitedb implementation of audit.Containment. It holds
// no state of its own; every method forwards to the wrapped reverify queue.
type containment struct {
	// reverifyQueue is the reverification queue that the methods delegate to.
	reverifyQueue audit.ReverifyQueue
}

// Compile-time check that *containment satisfies audit.Containment.
var _ audit.Containment = (*containment)(nil)
// Get looks up a pending reverification job for the node with the given id.
//
// A zero node ID is rejected with an audit.ContainError before the queue is
// consulted. Otherwise the job and error produced by the reverify queue's
// GetByNodeID are returned unchanged: when the node has several pending
// reverification audits an arbitrary one is returned, and when it has none the
// error is wrapped by audit.ErrContainedNotFound.
func (containment *containment) Get(ctx context.Context, id pb.NodeID) (_ *audit.ReverificationJob, err error) {
	defer mon.Task()(&ctx)(&err)

	if id.IsZero() {
		return nil, audit.ContainError.New("node ID empty")
	}

	job, err := containment.reverifyQueue.GetByNodeID(ctx, id)
	return job, err
}
// Insert adds pendingJob to the reverify queue as a new pending audit entry
// and returns whatever error the queue reports.
func (containment *containment) Insert(ctx context.Context, pendingJob *audit.PieceLocator) (err error) {
	defer mon.Task()(&ctx)(&err)

	err = containment.reverifyQueue.Insert(ctx, pendingJob)
	return err
}
// Delete removes a job from the reverification queue, whether because the job
// was successful or because the job is no longer necessary.
//
// Results:
//   - isDeleted reports whether the indicated job was actually removed; false
//     means there was no such job in the queue.
//   - nodeStillContained reports whether the node named by pendingJob.NodeID
//     still has a job in the queue after the removal. It is false only when
//     the follow-up lookup fails with audit.ErrContainedNotFound.
//   - err is wrapped in audit.ContainError. If the removal itself fails, both
//     booleans are false. If only the follow-up lookup fails (for a reason
//     other than "not found"), isDeleted still carries the removal result and
//     nodeStillContained is left true.
func (containment *containment) Delete(ctx context.Context, pendingJob *audit.PieceLocator) (isDeleted, nodeStillContained bool, err error) {
	defer mon.Task()(&ctx)(&err)

	queue := containment.reverifyQueue

	if isDeleted, err = queue.Remove(ctx, pendingJob); err != nil {
		return false, false, audit.ContainError.Wrap(err)
	}

	// Probe for any remaining job belonging to the same node. A "not found"
	// answer is the expected signal that the node is no longer contained, so
	// it is translated into nodeStillContained == false rather than an error.
	_, err = queue.GetByNodeID(ctx, pendingJob.NodeID)
	nodeStillContained = !audit.ErrContainedNotFound.Has(err)
	if !nodeStillContained {
		err = nil
	}

	return isDeleted, nodeStillContained, audit.ContainError.Wrap(err)
}
// GetAllContainedNodes returns the node IDs produced by the reverify queue's
// GetAllContainedNodes call, together with any error from that call.
func (containment *containment) GetAllContainedNodes(ctx context.Context) (nodes []pb.NodeID, err error) {
	defer mon.Task()(&ctx)(&err)

	nodes, err = containment.reverifyQueue.GetAllContainedNodes(ctx)
	return nodes, err
}