satellite/audit: add GetAllContainedNodes method to ReverifyQueue

We will be needing an infrequent chore to check which nodes are in the
reverify queue and synchronize that set with the 'contained' field in
the nodes db, since it is easily possible for them to get out of sync.
(We can't require that the reverification queue table be in the same
database as the nodes table, so maintaining consistency with SQL
transactions is out. Plus, even if they were in the same database, using
such SQL transactions to maintain consistency would be slow and
unwieldy.)

This commit adds a method to the class representing the reverify queue
in the database, allowing us to get the list of every node that has at
least one record in the reverification queue.

Refs: https://github.com/storj/storj/issues/5431
Change-Id: Idce2633b3d63f2645170365e5cdeb2ea749fa9cb
This commit is contained in:
paul cannon 2023-01-25 16:34:40 -06:00 committed by Storj Robot
parent 091ed29935
commit 7652a598d8
5 changed files with 61 additions and 0 deletions

View File

@ -29,4 +29,5 @@ type Containment interface {
Get(ctx context.Context, nodeID pb.NodeID) (*ReverificationJob, error) Get(ctx context.Context, nodeID pb.NodeID) (*ReverificationJob, error)
Insert(ctx context.Context, job *PieceLocator) error Insert(ctx context.Context, job *PieceLocator) error
Delete(ctx context.Context, job *PieceLocator) (wasDeleted, nodeStillContained bool, err error) Delete(ctx context.Context, job *PieceLocator) (wasDeleted, nodeStillContained bool, err error)
GetAllContainedNodes(ctx context.Context) ([]pb.NodeID, error)
} }

View File

@ -34,6 +34,7 @@ type ReverifyQueue interface {
GetNextJob(ctx context.Context, retryInterval time.Duration) (job *ReverificationJob, err error) GetNextJob(ctx context.Context, retryInterval time.Duration) (job *ReverificationJob, err error)
Remove(ctx context.Context, piece *PieceLocator) (wasDeleted bool, err error) Remove(ctx context.Context, piece *PieceLocator) (wasDeleted bool, err error)
GetByNodeID(ctx context.Context, nodeID storj.NodeID) (audit *ReverificationJob, err error) GetByNodeID(ctx context.Context, nodeID storj.NodeID) (audit *ReverificationJob, err error)
GetAllContainedNodes(ctx context.Context) ([]storj.NodeID, error)
} }
// ByStreamIDAndPosition allows sorting of a slice of segments by stream ID and position. // ByStreamIDAndPosition allows sorting of a slice of segments by stream ID and position.

View File

@ -56,3 +56,9 @@ func (containment *containment) Delete(ctx context.Context, pendingJob *audit.Pi
} }
return isDeleted, nodeStillContained, audit.ContainError.Wrap(err) return isDeleted, nodeStillContained, audit.ContainError.Wrap(err)
} }
func (containment *containment) GetAllContainedNodes(ctx context.Context) (nodes []pb.NodeID, err error) {
defer mon.Task()(&ctx)(&err)
return containment.reverifyQueue.GetAllContainedNodes(ctx)
}

View File

@ -9,6 +9,8 @@ import (
"errors" "errors"
"time" "time"
"github.com/zeebo/errs"
"storj.io/common/storj" "storj.io/common/storj"
"storj.io/common/uuid" "storj.io/common/uuid"
"storj.io/storj/satellite/audit" "storj.io/storj/satellite/audit"
@ -137,6 +139,32 @@ func (rq *reverifyQueue) GetByNodeID(ctx context.Context, nodeID storj.NodeID) (
return convertDBJob(ctx, pending) return convertDBJob(ctx, pending)
} }
func (rq *reverifyQueue) GetAllContainedNodes(ctx context.Context) (nodes []storj.NodeID, err error) {
defer mon.Task()(&ctx)(&err)
result, err := rq.db.QueryContext(ctx, `SELECT DISTINCT node_id FROM reverification_audits`)
if err != nil {
return nil, audit.ContainError.Wrap(err)
}
defer func() {
err = errs.Combine(err, audit.ContainError.Wrap(result.Close()))
}()
for result.Next() {
var nodeIDBytes []byte
if err := result.Scan(&nodeIDBytes); err != nil {
return nil, audit.ContainError.Wrap(err)
}
nodeID, err := storj.NodeIDFromBytes(nodeIDBytes)
if err != nil {
return nil, audit.ContainError.Wrap(err)
}
nodes = append(nodes, nodeID)
}
return nodes, audit.ContainError.Wrap(result.Err())
}
func convertDBJob(ctx context.Context, info *dbx.ReverificationAudits) (pendingJob *audit.ReverificationJob, err error) { func convertDBJob(ctx context.Context, info *dbx.ReverificationAudits) (pendingJob *audit.ReverificationJob, err error) {
defer mon.Task()(&ctx)(&err) defer mon.Task()(&ctx)(&err)
if info == nil { if info == nil {

View File

@ -5,11 +5,13 @@ package satellitedb_test
import ( import (
"context" "context"
"sort"
"testing" "testing"
"time" "time"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"storj.io/common/storj"
"storj.io/common/sync2" "storj.io/common/sync2"
"storj.io/common/testcontext" "storj.io/common/testcontext"
"storj.io/common/testrand" "storj.io/common/testrand"
@ -52,6 +54,8 @@ func TestReverifyQueue(t *testing.T) {
err = reverifyQueue.Insert(ctx, locator2) err = reverifyQueue.Insert(ctx, locator2)
require.NoError(t, err) require.NoError(t, err)
checkGetAllContainedNodes(ctx, t, reverifyQueue, locator1.NodeID, locator2.NodeID)
job1, err := reverifyQueue.GetNextJob(ctx, retryInterval) job1, err := reverifyQueue.GetNextJob(ctx, retryInterval)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, *locator1, job1.Locator) require.Equal(t, *locator1, job1.Locator)
@ -62,6 +66,8 @@ func TestReverifyQueue(t *testing.T) {
require.Equal(t, *locator2, job2.Locator) require.Equal(t, *locator2, job2.Locator)
require.EqualValues(t, 1, job2.ReverifyCount) require.EqualValues(t, 1, job2.ReverifyCount)
checkGetAllContainedNodes(ctx, t, reverifyQueue, locator1.NodeID, locator2.NodeID)
require.Truef(t, job1.InsertedAt.Before(job2.InsertedAt), "job1 [%s] should have an earlier insertion time than job2 [%s]", job1.InsertedAt, job2.InsertedAt) require.Truef(t, job1.InsertedAt.Before(job2.InsertedAt), "job1 [%s] should have an earlier insertion time than job2 [%s]", job1.InsertedAt, job2.InsertedAt)
_, err = reverifyQueue.GetNextJob(ctx, retryInterval) _, err = reverifyQueue.GetNextJob(ctx, retryInterval)
@ -81,15 +87,20 @@ func TestReverifyQueue(t *testing.T) {
require.Equal(t, *locator1, job3.Locator) require.Equal(t, *locator1, job3.Locator)
require.EqualValues(t, 2, job3.ReverifyCount) require.EqualValues(t, 2, job3.ReverifyCount)
checkGetAllContainedNodes(ctx, t, reverifyQueue, locator1.NodeID, locator2.NodeID)
wasDeleted, err := reverifyQueue.Remove(ctx, locator1) wasDeleted, err := reverifyQueue.Remove(ctx, locator1)
require.NoError(t, err) require.NoError(t, err)
require.True(t, wasDeleted) require.True(t, wasDeleted)
checkGetAllContainedNodes(ctx, t, reverifyQueue, locator2.NodeID)
wasDeleted, err = reverifyQueue.Remove(ctx, locator2) wasDeleted, err = reverifyQueue.Remove(ctx, locator2)
require.NoError(t, err) require.NoError(t, err)
require.True(t, wasDeleted) require.True(t, wasDeleted)
checkGetAllContainedNodes(ctx, t, reverifyQueue)
wasDeleted, err = reverifyQueue.Remove(ctx, locator1) wasDeleted, err = reverifyQueue.Remove(ctx, locator1)
require.NoError(t, err) require.NoError(t, err)
require.False(t, wasDeleted) require.False(t, wasDeleted)
checkGetAllContainedNodes(ctx, t, reverifyQueue)
_, err = reverifyQueue.GetNextJob(ctx, retryInterval) _, err = reverifyQueue.GetNextJob(ctx, retryInterval)
require.Truef(t, audit.ErrEmptyQueue.Has(err), "expected empty queue error, but got error %+v", err) require.Truef(t, audit.ErrEmptyQueue.Has(err), "expected empty queue error, but got error %+v", err)
@ -150,3 +161,17 @@ func TestReverifyQueueGetByNodeID(t *testing.T) {
require.Nil(t, job3) require.Nil(t, job3)
}) })
} }
// checkGetAllContainedNodes checks that the GetAllContainedNodes method works as expected
// in a particular situation.
func checkGetAllContainedNodes(ctx context.Context, t testing.TB, reverifyQueue audit.ReverifyQueue, expectedIDs ...storj.NodeID) {
containedNodes, err := reverifyQueue.GetAllContainedNodes(ctx)
require.NoError(t, err)
sort.Slice(containedNodes, func(i, j int) bool {
return containedNodes[i].Compare(containedNodes[j]) < 0
})
sort.Slice(expectedIDs, func(i, j int) bool {
return expectedIDs[i].Compare(expectedIDs[j]) < 0
})
require.Equal(t, expectedIDs, containedNodes)
}