storj/satellite/satellitedb/reverifyqueue_test.go
paul cannon 7652a598d8 satellite/audit: add GetAllContainedNodes method to ReverifyQueue
We will be needing an infrequent chore to check which nodes are in the
reverify queue and synchronize that set with the 'contained' field in
the nodes db, since it is easily possible for them to get out of sync.
(We can't require that the reverification queue table be in the same
database as the nodes table, so maintaining consistency with SQL
transactions is out. Plus, even if they were in the same database, using
such SQL transactions to maintain consistency would be slow and
unwieldy.)

This commit adds a method to the class representing the reverify queue
in the database, allowing us to get the list of every node that has at
least one record in the reverification queue.

Refs: https://github.com/storj/storj/issues/5431
Change-Id: Idce2633b3d63f2645170365e5cdeb2ea749fa9cb
2023-02-02 00:39:29 +00:00

178 lines
6.0 KiB
Go

// Copyright (C) 2022 Storj Labs, Inc.
// See LICENSE for copying information.
package satellitedb_test
import (
"context"
"sort"
"testing"
"time"
"github.com/stretchr/testify/require"
"storj.io/common/storj"
"storj.io/common/sync2"
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/storj/satellite"
"storj.io/storj/satellite/audit"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/satellitedb/satellitedbtest"
)
// randomLocator builds a piece locator whose stream ID, segment position,
// node ID and piece number are all drawn from the testrand helpers, so that
// separate calls yield independent queue entries for the tests below.
func randomLocator() *audit.PieceLocator {
	var loc audit.PieceLocator

	loc.StreamID = testrand.UUID()
	loc.Position = metabase.SegmentPosition{
		Part:  uint32(testrand.Intn(1 << 10)),
		Index: uint32(testrand.Intn(1 << 20)),
	}
	loc.NodeID = testrand.NodeID()
	loc.PieceNum = testrand.Intn(1 << 10)

	return &loc
}
// retryInterval is the retry interval handed to GetNextJob throughout these
// tests; the tests also back-date a job's update time by this amount to make
// it eligible to be picked up again.
const retryInterval = 30 * time.Minute
// TestReverifyQueue drives two jobs through the reverify queue: insertion,
// claiming via GetNextJob, re-claiming after the retry interval has (been made
// to appear to have) elapsed, and removal. After each step it also checks the
// set of node IDs reported by GetAllContainedNodes.
func TestReverifyQueue(t *testing.T) {
	satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
		queue := db.ReverifyQueue()

		first, second := randomLocator(), randomLocator()

		require.NoError(t, queue.Insert(ctx, first))

		// Postgres/Cockroach have only microsecond time resolution, so make
		// sure at least 1us has elapsed before inserting the second.
		sync2.Sleep(ctx, time.Microsecond)

		require.NoError(t, queue.Insert(ctx, second))

		checkGetAllContainedNodes(ctx, t, queue, first.NodeID, second.NodeID)

		// Jobs are expected to come out in insertion order, each on its
		// first reverification attempt.
		firstJob, err := queue.GetNextJob(ctx, retryInterval)
		require.NoError(t, err)
		require.Equal(t, *first, firstJob.Locator)
		require.EqualValues(t, 1, firstJob.ReverifyCount)

		secondJob, err := queue.GetNextJob(ctx, retryInterval)
		require.NoError(t, err)
		require.Equal(t, *second, secondJob.Locator)
		require.EqualValues(t, 1, secondJob.ReverifyCount)

		// Claiming a job does not take its node out of the contained set.
		checkGetAllContainedNodes(ctx, t, queue, first.NodeID, second.NodeID)

		require.Truef(t, firstJob.InsertedAt.Before(secondJob.InsertedAt), "job1 [%s] should have an earlier insertion time than job2 [%s]", firstJob.InsertedAt, secondJob.InsertedAt)

		// Both jobs were just claimed, so nothing is eligible right now.
		_, err = queue.GetNextJob(ctx, retryInterval)
		require.Truef(t, audit.ErrEmptyQueue.Has(err), "expected empty queue error, but got error %+v", err)

		// pretend that ReverifyRetryInterval has elapsed
		type updateTimeFudger interface {
			TestingFudgeUpdateTime(ctx context.Context, piece *audit.PieceLocator, updateTime time.Time) error
		}
		fudger := queue.(updateTimeFudger)
		require.NoError(t, fudger.TestingFudgeUpdateTime(ctx, first, time.Now().Add(-retryInterval)))

		// job 1 should be eligible for a new worker to take over now (whatever
		// worker acquired job 1 before is presumed to have died or timed out).
		retriedJob, err := queue.GetNextJob(ctx, retryInterval)
		require.NoError(t, err)
		require.Equal(t, *first, retriedJob.Locator)
		require.EqualValues(t, 2, retriedJob.ReverifyCount)

		checkGetAllContainedNodes(ctx, t, queue, first.NodeID, second.NodeID)

		// Removing a job drops its node from the contained set.
		removed, err := queue.Remove(ctx, first)
		require.NoError(t, err)
		require.True(t, removed)

		checkGetAllContainedNodes(ctx, t, queue, second.NodeID)

		removed, err = queue.Remove(ctx, second)
		require.NoError(t, err)
		require.True(t, removed)

		checkGetAllContainedNodes(ctx, t, queue)

		// Removing an already-removed job succeeds but reports that nothing
		// was deleted.
		removed, err = queue.Remove(ctx, first)
		require.NoError(t, err)
		require.False(t, removed)

		checkGetAllContainedNodes(ctx, t, queue)

		_, err = queue.GetNextJob(ctx, retryInterval)
		require.Truef(t, audit.ErrEmptyQueue.Has(err), "expected empty queue error, but got error %+v", err)
	})
}
// TestReverifyQueueGetByNodeID checks GetByNodeID against a queue holding
// three jobs, two of which share a node: a lookup for the shared node must
// return one of its two jobs, a lookup for the other node must return its
// single job, and a lookup for an unknown node must fail with
// ErrContainedNotFound and a nil job.
func TestReverifyQueueGetByNodeID(t *testing.T) {
	satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
		queue := db.ReverifyQueue()

		locA, locB, locC := randomLocator(), randomLocator(), randomLocator()

		// two jobs on the same node
		locC.NodeID = locA.NodeID

		require.NoError(t, queue.Insert(ctx, locA))

		// Postgres/Cockroach have only microsecond time resolution, so make
		// sure at least 1us has elapsed between inserts.
		sync2.Sleep(ctx, time.Microsecond)

		require.NoError(t, queue.Insert(ctx, locB))

		sync2.Sleep(ctx, time.Microsecond)

		require.NoError(t, queue.Insert(ctx, locC))

		sharedNodeJob, err := queue.GetByNodeID(ctx, locA.NodeID)
		require.NoError(t, err)

		// we got either locA or locC; pick whichever the stream ID says it
		// should be and compare every locator field against that one.
		want := locC
		if sharedNodeJob.Locator.StreamID == locA.StreamID {
			want = locA
		}
		require.Equal(t, want.StreamID, sharedNodeJob.Locator.StreamID)
		require.Equal(t, want.NodeID, sharedNodeJob.Locator.NodeID)
		require.Equal(t, want.PieceNum, sharedNodeJob.Locator.PieceNum)
		require.Equal(t, want.Position, sharedNodeJob.Locator.Position)

		singleNodeJob, err := queue.GetByNodeID(ctx, locB.NodeID)
		require.NoError(t, err)
		require.Equal(t, locB.StreamID, singleNodeJob.Locator.StreamID)
		require.Equal(t, locB.NodeID, singleNodeJob.Locator.NodeID)
		require.Equal(t, locB.PieceNum, singleNodeJob.Locator.PieceNum)
		require.Equal(t, locB.Position, singleNodeJob.Locator.Position)

		// ask for a nonexistent node ID
		missingJob, err := queue.GetByNodeID(ctx, testrand.NodeID())
		require.Error(t, err)
		require.Truef(t, audit.ErrContainedNotFound.Has(err), "expected ErrContainedNotFound error but got %+v", err)
		require.Nil(t, missingJob)
	})
}
// checkGetAllContainedNodes asserts that reverifyQueue.GetAllContainedNodes
// succeeds and returns exactly the node IDs given in expectedIDs, regardless
// of order.
//
// When no IDs are expected, the result only has to be empty: a nil slice and
// an empty non-nil slice are both accepted. The caller's expectedIDs are never
// reordered, so it is safe to pass an existing slice with `ids...`.
func checkGetAllContainedNodes(ctx context.Context, t testing.TB, reverifyQueue audit.ReverifyQueue, expectedIDs ...storj.NodeID) {
	// Attribute assertion failures to the calling test line, not this helper.
	t.Helper()

	containedNodes, err := reverifyQueue.GetAllContainedNodes(ctx)
	require.NoError(t, err)

	// require.Equal distinguishes nil from empty slices, which is not a
	// difference this check cares about.
	if len(expectedIDs) == 0 {
		require.Empty(t, containedNodes)
		return
	}

	// Sort a private copy so a slice passed as `ids...` is left untouched.
	wantIDs := append([]storj.NodeID(nil), expectedIDs...)
	sort.Slice(wantIDs, func(i, j int) bool {
		return wantIDs[i].Compare(wantIDs[j]) < 0
	})
	sort.Slice(containedNodes, func(i, j int) bool {
		return containedNodes[i].Compare(containedNodes[j]) < 0
	})

	require.Equal(t, wantIDs, containedNodes)
}