9c67f62fe3
This table will be used as a queue for pieces that need to be reverified (a regular audit timed out on the owning node, so now that node is contained and we need to validate the piece before un-containing it). Refs: https://github.com/storj/storj/issues/5228 Change-Id: I5dcd26b6adced8674cbd81884c1543a61ea9d4c8
100 lines
3.1 KiB
Go
100 lines
3.1 KiB
Go
// Copyright (C) 2022 Storj Labs, Inc.
|
|
// See LICENSE for copying information.
|
|
|
|
package satellitedb_test
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/stretchr/testify/require"
|
|
|
|
"storj.io/common/sync2"
|
|
"storj.io/common/testcontext"
|
|
"storj.io/common/testrand"
|
|
"storj.io/storj/satellite"
|
|
"storj.io/storj/satellite/audit"
|
|
"storj.io/storj/satellite/metabase"
|
|
"storj.io/storj/satellite/satellitedb"
|
|
"storj.io/storj/satellite/satellitedb/satellitedbtest"
|
|
)
|
|
|
|
func TestReverifyQueue(t *testing.T) {
|
|
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
|
|
reverifyQueue := db.ReverifyQueue()
|
|
|
|
locator1 := audit.PieceLocator{
|
|
StreamID: testrand.UUID(),
|
|
Position: metabase.SegmentPosition{
|
|
Part: uint32(testrand.Intn(1 << 10)),
|
|
Index: uint32(testrand.Intn(1 << 20)),
|
|
},
|
|
NodeID: testrand.NodeID(),
|
|
PieceNum: testrand.Intn(1 << 10),
|
|
}
|
|
locator2 := audit.PieceLocator{
|
|
StreamID: testrand.UUID(),
|
|
Position: metabase.SegmentPosition{
|
|
Part: uint32(testrand.Intn(1 << 10)),
|
|
Index: uint32(testrand.Intn(1 << 20)),
|
|
},
|
|
NodeID: testrand.NodeID(),
|
|
PieceNum: testrand.Intn(1 << 10),
|
|
}
|
|
|
|
err := reverifyQueue.Insert(ctx, locator1)
|
|
require.NoError(t, err)
|
|
|
|
// Postgres/Cockroach have only microsecond time resolution, so make
|
|
// sure at least 1us has elapsed before inserting the second.
|
|
sync2.Sleep(ctx, time.Microsecond)
|
|
|
|
err = reverifyQueue.Insert(ctx, locator2)
|
|
require.NoError(t, err)
|
|
|
|
job1, err := reverifyQueue.GetNextJob(ctx)
|
|
require.NoError(t, err)
|
|
require.Equal(t, locator1, job1.Locator)
|
|
require.Equal(t, 1, job1.ReverifyCount)
|
|
|
|
job2, err := reverifyQueue.GetNextJob(ctx)
|
|
require.NoError(t, err)
|
|
require.Equal(t, locator2, job2.Locator)
|
|
require.Equal(t, 1, job2.ReverifyCount)
|
|
|
|
require.Truef(t, job1.InsertedAt.Before(job2.InsertedAt), "job1 [%s] should have an earlier insertion time than job2 [%s]", job1.InsertedAt, job2.InsertedAt)
|
|
|
|
_, err = reverifyQueue.GetNextJob(ctx)
|
|
require.Error(t, sql.ErrNoRows, err)
|
|
|
|
// pretend that ReverifyRetryInterval has elapsed
|
|
reverifyQueueTest := reverifyQueue.(interface {
|
|
TestingFudgeUpdateTime(ctx context.Context, piece audit.PieceLocator, updateTime time.Time) error
|
|
})
|
|
err = reverifyQueueTest.TestingFudgeUpdateTime(ctx, locator1, time.Now().Add(-satellitedb.ReverifyRetryInterval))
|
|
require.NoError(t, err)
|
|
|
|
// job 1 should be eligible for a new worker to take over now (whatever
|
|
// worker acquired job 1 before is presumed to have died or timed out).
|
|
job3, err := reverifyQueue.GetNextJob(ctx)
|
|
require.NoError(t, err)
|
|
require.Equal(t, locator1, job3.Locator)
|
|
require.Equal(t, 2, job3.ReverifyCount)
|
|
|
|
wasDeleted, err := reverifyQueue.Remove(ctx, locator1)
|
|
require.NoError(t, err)
|
|
require.True(t, wasDeleted)
|
|
wasDeleted, err = reverifyQueue.Remove(ctx, locator2)
|
|
require.NoError(t, err)
|
|
require.True(t, wasDeleted)
|
|
wasDeleted, err = reverifyQueue.Remove(ctx, locator1)
|
|
require.NoError(t, err)
|
|
require.False(t, wasDeleted)
|
|
|
|
_, err = reverifyQueue.GetNextJob(ctx)
|
|
require.Error(t, sql.ErrNoRows, err)
|
|
})
|
|
}
|