satellite/audit: Split Reverifier from Verifier

Add a new worker comparable to Verifier, called Reverifier; as the
name suggests, it will be used for reverifications, while Verifier
continues to handle initial verifications.

This allows distinct logging from the two classes, and lets us add
configuration that is specific to the Reverifier, such as the
reverification retry interval.

To go along with this, ReverifyQueue.GetNextJob now takes the retry
interval as a parameter instead of relying on a hard-coded constant.

This should have no impact on operational concerns.

Refs: https://github.com/storj/storj/issues/5251
Change-Id: Ie60d2d833bc5db8660bb463dd93c764bb40fc49c
paul cannon 2022-11-21 18:35:13 -06:00
parent 8777523255
commit 2617925f8d
7 changed files with 91 additions and 55 deletions
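
The split leans on Go struct embedding: as the diff below shows, Reverifier embeds *Verifier, so the shared verification machinery (metabase lookups, order limits, dialing) is promoted onto the new type, while the Reverifier carries its own logger, queue, and retry interval. A minimal, self-contained sketch of that pattern, using deliberately simplified stand-in types rather than the real satellite structs:

package main

import (
	"fmt"
	"time"
)

// Verifier stands in for the existing audit verifier and its shared machinery.
type Verifier struct{ name string }

// Dial represents behavior both workers need (dialer, metabase, order limits).
func (v *Verifier) Dial() string { return "dialing as " + v.name }

// Reverifier embeds *Verifier, so Dial is promoted onto it, while adding
// reverification-specific state (in the real code: its own logger, queue,
// and retry interval).
type Reverifier struct {
	*Verifier
	retryInterval time.Duration
}

func main() {
	v := &Verifier{name: "verifier"}
	r := &Reverifier{Verifier: v, retryInterval: 4 * time.Hour}
	fmt.Println(r.Dial())        // promoted from the embedded Verifier
	fmt.Println(r.retryInterval) // Reverifier-specific configuration
}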


@ -137,6 +137,7 @@ type Satellite struct {
Worker *audit.Worker
Chore *audit.Chore
Verifier *audit.Verifier
Reverifier *audit.Reverifier
Reporter audit.Reporter
}
@ -612,6 +613,7 @@ func createNewSystem(name string, log *zap.Logger, config satellite.Config, peer
system.Audit.Worker = peer.Audit.Worker
system.Audit.Chore = peer.Audit.Chore
system.Audit.Verifier = peer.Audit.Verifier
system.Audit.Reverifier = peer.Audit.Reverifier
system.Audit.Reporter = peer.Audit.Reporter
system.GarbageCollection.Sender = gcPeer.GarbageCollection.Sender


@ -5,6 +5,7 @@ package audit
import (
"context"
"time"
"github.com/zeebo/errs"
@ -30,7 +31,7 @@ type VerifyQueue interface {
// audit. (Or until we try too many times, and disqualify the node.)
type ReverifyQueue interface {
Insert(ctx context.Context, piece *PieceLocator) (err error)
GetNextJob(ctx context.Context) (job *ReverificationJob, err error)
GetNextJob(ctx context.Context, retryInterval time.Duration) (job *ReverificationJob, err error)
Remove(ctx context.Context, piece *PieceLocator) (wasDeleted bool, err error)
GetByNodeID(ctx context.Context, nodeID storj.NodeID) (audit *ReverificationJob, err error)
}
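
With the retry interval now supplied by the caller, a reverification worker might drain this queue along the lines of the following sketch. It relies on the audit.ErrEmptyQueue error class that appears later in this diff; the loop structure and function name are illustrative, not the satellite's actual worker code:

package example

import (
	"context"
	"time"

	"go.uber.org/zap"

	"storj.io/storj/satellite/audit"
)

// drainReverifyQueue is an illustrative helper, not the real audit worker.
func drainReverifyQueue(ctx context.Context, log *zap.Logger, queue audit.ReverifyQueue, reverifier *audit.Reverifier, retryInterval time.Duration) error {
	for {
		job, err := queue.GetNextJob(ctx, retryInterval)
		if err != nil {
			if audit.ErrEmptyQueue.Has(err) {
				return nil // nothing is eligible right now
			}
			return err
		}
		// ReverifyPiece reports whether the job still needs a later attempt.
		if keepInQueue := reverifier.ReverifyPiece(ctx, job.Locator); !keepInQueue {
			if _, err := queue.Remove(ctx, &job.Locator); err != nil {
				log.Warn("could not remove finished reverification job", zap.Error(err))
			}
		}
	}
}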


@ -44,6 +44,22 @@ type ReverificationJob struct {
LastAttempt time.Time
}
// Reverifier pulls jobs from the reverification queue and fulfills them
// by performing the requested reverifications.
//
// architecture: Worker
type Reverifier struct {
*Verifier
log *zap.Logger
db ReverifyQueue
// retryInterval defines a limit on how frequently we will retry
// reverification audits. At least this long should elapse between
// attempts.
retryInterval time.Duration
}
// Outcome enumerates the possible results of a piecewise audit.
//
// Note that it is very similar to reputation.AuditType, but it is
@ -77,19 +93,29 @@ const (
OutcomeUnknownError
)
// NewReverifier creates a Reverifier.
func NewReverifier(log *zap.Logger, verifier *Verifier, db ReverifyQueue, config Config) *Reverifier {
return &Reverifier{
log: log,
Verifier: verifier,
db: db,
retryInterval: config.ReverificationRetryInterval,
}
}
// ReverifyPiece acquires a piece from a single node and verifies its
// contents, its hash, and its order limit.
func (verifier *Verifier) ReverifyPiece(ctx context.Context, locator PieceLocator) (keepInQueue bool) {
func (reverifier *Reverifier) ReverifyPiece(ctx context.Context, locator PieceLocator) (keepInQueue bool) {
defer mon.Task()(&ctx)(nil)
logger := verifier.log.With(
logger := reverifier.log.With(
zap.Stringer("stream-id", locator.StreamID),
zap.Uint32("position-part", locator.Position.Part),
zap.Uint32("position-index", locator.Position.Index),
zap.Stringer("node-id", locator.NodeID),
zap.Int("piece-num", locator.PieceNum))
outcome, err := verifier.DoReverifyPiece(ctx, logger, locator)
outcome, err := reverifier.DoReverifyPiece(ctx, logger, locator)
if err != nil {
logger.Error("could not perform reverification due to error", zap.Error(err))
return true
@ -131,11 +157,11 @@ func (verifier *Verifier) ReverifyPiece(ctx context.Context, locator PieceLocato
// DoReverifyPiece acquires a piece from a single node and verifies its
// contents, its hash, and its order limit.
func (verifier *Verifier) DoReverifyPiece(ctx context.Context, logger *zap.Logger, locator PieceLocator) (outcome Outcome, err error) {
func (reverifier *Reverifier) DoReverifyPiece(ctx context.Context, logger *zap.Logger, locator PieceLocator) (outcome Outcome, err error) {
defer mon.Task()(&ctx)(&err)
// First, we must ensure that the specified node still holds the indicated piece.
segment, err := verifier.metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
segment, err := reverifier.metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
StreamID: locator.StreamID,
Position: locator.Position,
})
@ -146,7 +172,7 @@ func (verifier *Verifier) DoReverifyPiece(ctx context.Context, logger *zap.Logge
}
return OutcomeNotPerformed, Error.Wrap(err)
}
if segment.Expired(verifier.nowFn()) {
if segment.Expired(reverifier.nowFn()) {
logger.Debug("segment expired before ReverifyPiece")
return OutcomeNotNecessary, nil
}
@ -169,7 +195,7 @@ func (verifier *Verifier) DoReverifyPiece(ctx context.Context, logger *zap.Logge
pieceSize := eestream.CalcPieceSize(int64(segment.EncryptedSize), redundancy)
limit, piecePrivateKey, cachedNodeInfo, err := verifier.orders.CreateAuditPieceOrderLimit(ctx, locator.NodeID, uint16(locator.PieceNum), segment.RootPieceID, int32(pieceSize))
limit, piecePrivateKey, cachedNodeInfo, err := reverifier.orders.CreateAuditPieceOrderLimit(ctx, locator.NodeID, uint16(locator.PieceNum), segment.RootPieceID, int32(pieceSize))
if err != nil {
if overlay.ErrNodeDisqualified.Has(err) {
logger.Debug("ReverifyPiece: order limit not created (node is already disqualified)")
@ -186,7 +212,7 @@ func (verifier *Verifier) DoReverifyPiece(ctx context.Context, logger *zap.Logge
return OutcomeNotPerformed, Error.Wrap(err)
}
pieceData, pieceHash, pieceOriginalLimit, err := verifier.GetPiece(ctx, limit, piecePrivateKey, cachedNodeInfo.LastIPPort, int32(pieceSize))
pieceData, pieceHash, pieceOriginalLimit, err := reverifier.GetPiece(ctx, limit, piecePrivateKey, cachedNodeInfo.LastIPPort, int32(pieceSize))
if err != nil {
if rpc.Error.Has(err) {
if errs.Is(err, context.DeadlineExceeded) {
@ -203,7 +229,7 @@ func (verifier *Verifier) DoReverifyPiece(ctx context.Context, logger *zap.Logge
}
if errs2.IsRPC(err, rpcstatus.NotFound) {
// Fetch the segment metadata again and see if it has been altered in the interim
err := verifier.checkIfSegmentAltered(ctx, segment)
err := reverifier.checkIfSegmentAltered(ctx, segment)
if err != nil {
// if so, we skip this audit
logger.Debug("ReverifyPiece: audit source segment changed during reverification", zap.Error(err))
@ -250,7 +276,7 @@ func (verifier *Verifier) DoReverifyPiece(ctx context.Context, logger *zap.Logge
// check that the order limit and hash sent by the storagenode were
// correctly signed (order limit signed by this satellite, hash signed
// by the uplink public key in the order limit)
signer := signing.SigneeFromPeerIdentity(verifier.auditor)
signer := signing.SigneeFromPeerIdentity(reverifier.auditor)
if err := signing.VerifyOrderLimitSignature(ctx, signer, pieceOriginalLimit); err != nil {
return OutcomeFailure, nil
}
@ -260,7 +286,7 @@ func (verifier *Verifier) DoReverifyPiece(ctx context.Context, logger *zap.Logge
}
}
if err := verifier.checkIfSegmentAltered(ctx, segment); err != nil {
if err := reverifier.checkIfSegmentAltered(ctx, segment); err != nil {
logger.Debug("ReverifyPiece: audit source segment changed during reverification", zap.Error(err))
return OutcomeNotNecessary, nil
}
@ -273,15 +299,15 @@ func (verifier *Verifier) DoReverifyPiece(ctx context.Context, logger *zap.Logge
// GetPiece uses the piecestore client to download a piece (and the associated
// original OrderLimit and PieceHash) from a node.
func (verifier *Verifier) GetPiece(ctx context.Context, limit *pb.AddressedOrderLimit, piecePrivateKey storj.PiecePrivateKey, cachedIPAndPort string, pieceSize int32) (pieceData []byte, hash *pb.PieceHash, origLimit *pb.OrderLimit, err error) {
func (reverifier *Reverifier) GetPiece(ctx context.Context, limit *pb.AddressedOrderLimit, piecePrivateKey storj.PiecePrivateKey, cachedIPAndPort string, pieceSize int32) (pieceData []byte, hash *pb.PieceHash, origLimit *pb.OrderLimit, err error) {
defer mon.Task()(&ctx)(&err)
// determines number of seconds allotted for receiving data from a storage node
timedCtx := ctx
if verifier.minBytesPerSecond > 0 {
maxTransferTime := time.Duration(int64(time.Second) * int64(pieceSize) / verifier.minBytesPerSecond.Int64())
if maxTransferTime < verifier.minDownloadTimeout {
maxTransferTime = verifier.minDownloadTimeout
if reverifier.minBytesPerSecond > 0 {
maxTransferTime := time.Duration(int64(time.Second) * int64(pieceSize) / reverifier.minBytesPerSecond.Int64())
if maxTransferTime < reverifier.minDownloadTimeout {
maxTransferTime = reverifier.minDownloadTimeout
}
var cancel func()
timedCtx, cancel = context.WithTimeout(ctx, maxTransferTime)
@ -289,7 +315,7 @@ func (verifier *Verifier) GetPiece(ctx context.Context, limit *pb.AddressedOrder
}
targetNodeID := limit.GetLimit().StorageNodeId
log := verifier.log.With(zap.Stringer("node-id", targetNodeID), zap.Stringer("piece-id", limit.GetLimit().PieceId))
log := reverifier.log.With(zap.Stringer("node-id", targetNodeID), zap.Stringer("piece-id", limit.GetLimit().PieceId))
var ps *piecestore.Client
// if cached IP is given, try connecting there first
@ -298,7 +324,7 @@ func (verifier *Verifier) GetPiece(ctx context.Context, limit *pb.AddressedOrder
ID: targetNodeID,
Address: cachedIPAndPort,
}
ps, err = piecestore.Dial(timedCtx, verifier.dialer, nodeAddr, piecestore.DefaultConfig)
ps, err = piecestore.Dial(timedCtx, reverifier.dialer, nodeAddr, piecestore.DefaultConfig)
if err != nil {
log.Debug("failed to connect to audit target node at cached IP", zap.String("cached-ip-and-port", cachedIPAndPort), zap.Error(err))
}
@ -310,7 +336,7 @@ func (verifier *Verifier) GetPiece(ctx context.Context, limit *pb.AddressedOrder
ID: targetNodeID,
Address: limit.GetStorageNodeAddress().Address,
}
ps, err = piecestore.Dial(timedCtx, verifier.dialer, nodeAddr, piecestore.DefaultConfig)
ps, err = piecestore.Dial(timedCtx, reverifier.dialer, nodeAddr, piecestore.DefaultConfig)
if err != nil {
return nil, nil, nil, Error.Wrap(err)
}
@ -319,7 +345,7 @@ func (verifier *Verifier) GetPiece(ctx context.Context, limit *pb.AddressedOrder
defer func() {
err := ps.Close()
if err != nil {
log.Error("audit verifier failed to close conn to node", zap.Error(err))
log.Error("audit reverifier failed to close conn to node", zap.Error(err))
}
}()
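
The transfer-timeout arithmetic in GetPiece is carried over unchanged; only the receiver is renamed. For reference, the budget works out to pieceSize / minBytesPerSecond seconds of transfer time, clamped up from below by minDownloadTimeout. A small worked example with made-up numbers (not satellite defaults):

package main

import (
	"fmt"
	"time"
)

func main() {
	// Illustrative values only; the real ones come from satellite config.
	pieceSize := int64(2 * 1024 * 1024)    // 2 MiB piece
	minBytesPerSecond := int64(128 * 1024) // 128 KiB/s floor
	minDownloadTimeout := 5 * time.Minute

	// Same arithmetic as GetPiece: allow pieceSize/minBytesPerSecond seconds,
	// but never less than minDownloadTimeout.
	maxTransferTime := time.Duration(int64(time.Second) * pieceSize / minBytesPerSecond)
	if maxTransferTime < minDownloadTimeout {
		maxTransferTime = minDownloadTimeout
	}
	fmt.Println(maxTransferTime) // 16s raw for these numbers, so clamped up to 5m0s
}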


@ -38,7 +38,7 @@ func TestReverifyPiece(t *testing.T) {
// ensure ReverifyPiece tells us to remove the segment from the queue after a successful audit
for _, piece := range segment.Pieces {
keepInQueue := satellite.Audit.Verifier.ReverifyPiece(ctx, audit.PieceLocator{
keepInQueue := satellite.Audit.Reverifier.ReverifyPiece(ctx, audit.PieceLocator{
StreamID: segment.StreamID,
Position: segment.Position,
NodeID: piece.StorageNode,
@ -68,7 +68,7 @@ func TestDoReverifyPieceSucceeds(t *testing.T) {
// ensure DoReverifyPiece succeeds on the new pieces we uploaded
for _, piece := range segment.Pieces {
outcome, err := satellite.Audit.Verifier.DoReverifyPiece(ctx, planet.Log().Named("reverifier"), audit.PieceLocator{
outcome, err := satellite.Audit.Reverifier.DoReverifyPiece(ctx, planet.Log().Named("reverifier"), audit.PieceLocator{
StreamID: segment.StreamID,
Position: segment.Position,
NodeID: piece.StorageNode,
@ -103,7 +103,7 @@ func TestDoReverifyPieceWithNodeOffline(t *testing.T) {
require.NoError(t, planet.StopPeer(offlineNode))
// see what happens when DoReverifyPiece tries to hit that node
outcome, err := satellite.Audit.Verifier.DoReverifyPiece(ctx, planet.Log().Named("reverifier"), audit.PieceLocator{
outcome, err := satellite.Audit.Reverifier.DoReverifyPiece(ctx, planet.Log().Named("reverifier"), audit.PieceLocator{
StreamID: segment.StreamID,
Position: segment.Position,
NodeID: offlinePiece.StorageNode,
@ -139,7 +139,7 @@ func TestDoReverifyPieceWithPieceMissing(t *testing.T) {
require.NoError(t, err)
// see what happens when DoReverifyPiece tries to hit that node
outcome, err := satellite.Audit.Verifier.DoReverifyPiece(ctx, planet.Log().Named("reverifier"), audit.PieceLocator{
outcome, err := satellite.Audit.Reverifier.DoReverifyPiece(ctx, planet.Log().Named("reverifier"), audit.PieceLocator{
StreamID: segment.StreamID,
Position: segment.Position,
NodeID: missingPiece.StorageNode,
@ -175,7 +175,7 @@ func testReverifyRewrittenPiece(t *testing.T, mutator func(content []byte, heade
rewritePiece(t, ctx, node, satellite.ID(), pieceID, mutator)
outcome, err := satellite.Audit.Verifier.DoReverifyPiece(ctx, planet.Log().Named("reverifier"), audit.PieceLocator{
outcome, err := satellite.Audit.Reverifier.DoReverifyPiece(ctx, planet.Log().Named("reverifier"), audit.PieceLocator{
StreamID: segment.StreamID,
Position: segment.Position,
NodeID: pieceToRewrite.StorageNode,


@ -116,11 +116,13 @@ type Core struct {
}
Audit struct {
VerifyQueue audit.VerifyQueue
Worker *audit.Worker
Chore *audit.Chore
Verifier *audit.Verifier
Reporter audit.Reporter
VerifyQueue audit.VerifyQueue
ReverifyQueue audit.ReverifyQueue
Worker *audit.Worker
Chore *audit.Chore
Verifier *audit.Verifier
Reverifier *audit.Reverifier
Reporter audit.Reporter
}
ExpiredDeletion struct {
@ -417,6 +419,10 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB,
config.MinBytesPerSecond,
config.MinDownloadTimeout,
)
peer.Audit.Reverifier = audit.NewReverifier(log.Named("audit:reverifier"),
peer.Audit.Verifier,
peer.Audit.ReverifyQueue,
config)
peer.Audit.Reporter = audit.NewReporter(log.Named("audit:reporter"),
peer.Reputation.Service,


@ -16,13 +16,6 @@ import (
"storj.io/storj/satellite/satellitedb/dbx"
)
const (
// ReverifyRetryInterval defines a limit on how frequently we retry
// reverification audits. At least this long should elapse between
// attempts.
ReverifyRetryInterval = 4 * time.Hour
)
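
The constant's role moves into audit configuration: NewReverifier, earlier in this diff, reads config.ReverificationRetryInterval. A hedged sketch of what such a field could look like on the audit Config; only the field name comes from the diff, while the struct tag text and the 4h default (mirroring the removed constant) are assumptions:

package audit

import "time"

// Config excerpt (illustrative); the real audit.Config carries many more settings.
type Config struct {
	// ReverificationRetryInterval limits how frequently a claimed
	// reverification job may be retried by another worker.
	ReverificationRetryInterval time.Duration `help:"minimum time to wait before retrying a claimed reverification audit" default:"4h"`
}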
// reverifyQueue implements storj.io/storj/satellite/audit.ReverifyQueue.
type reverifyQueue struct {
db *satelliteDB
@ -48,9 +41,12 @@ func (rq *reverifyQueue) Insert(ctx context.Context, piece *audit.PieceLocator)
// GetNextJob retrieves a job from the queue. The job will be the
// job which has been in the queue the longest, except those which
// have already been claimed by another worker within the last
// ReverifyRetryInterval. If there are no such jobs, sql.ErrNoRows
// will be returned.
func (rq *reverifyQueue) GetNextJob(ctx context.Context) (job *audit.ReverificationJob, err error) {
// retryInterval. If there are no such jobs, an error wrapped by
// audit.ErrEmptyQueue will be returned.
//
// retryInterval is expected to be held to the same value for every
// call to GetNextJob() within a given satellite cluster.
func (rq *reverifyQueue) GetNextJob(ctx context.Context, retryInterval time.Duration) (job *audit.ReverificationJob, err error) {
defer mon.Task()(&ctx)(&err)
job = &audit.ReverificationJob{}
@ -70,7 +66,7 @@ func (rq *reverifyQueue) GetNextJob(ctx context.Context) (job *audit.Reverificat
AND ra.stream_id = next_entry.stream_id
AND ra.position = next_entry.position
RETURNING ra.node_id, ra.stream_id, ra.position, ra.piece_num, ra.inserted_at, ra.reverify_count
`, ReverifyRetryInterval.Microseconds()).Scan(
`, retryInterval.Microseconds()).Scan(
&job.Locator.NodeID,
&job.Locator.StreamID,
&job.Locator.Position,
@ -78,6 +74,9 @@ func (rq *reverifyQueue) GetNextJob(ctx context.Context) (job *audit.Reverificat
&job.InsertedAt,
&job.ReverifyCount,
)
if errors.Is(err, sql.ErrNoRows) {
return nil, audit.ErrEmptyQueue.Wrap(err)
}
return job, err
}


@ -5,7 +5,6 @@ package satellitedb_test
import (
"context"
"database/sql"
"testing"
"time"
@ -17,7 +16,6 @@ import (
"storj.io/storj/satellite"
"storj.io/storj/satellite/audit"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/satellitedb"
"storj.io/storj/satellite/satellitedb/satellitedbtest"
)
@ -33,6 +31,10 @@ func randomLocator() *audit.PieceLocator {
}
}
const (
retryInterval = 30 * time.Minute
)
func TestReverifyQueue(t *testing.T) {
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
reverifyQueue := db.ReverifyQueue()
@ -50,34 +52,34 @@ func TestReverifyQueue(t *testing.T) {
err = reverifyQueue.Insert(ctx, locator2)
require.NoError(t, err)
job1, err := reverifyQueue.GetNextJob(ctx)
job1, err := reverifyQueue.GetNextJob(ctx, retryInterval)
require.NoError(t, err)
require.Equal(t, *locator1, job1.Locator)
require.Equal(t, 1, job1.ReverifyCount)
require.EqualValues(t, 1, job1.ReverifyCount)
job2, err := reverifyQueue.GetNextJob(ctx)
job2, err := reverifyQueue.GetNextJob(ctx, retryInterval)
require.NoError(t, err)
require.Equal(t, *locator2, job2.Locator)
require.Equal(t, 1, job2.ReverifyCount)
require.EqualValues(t, 1, job2.ReverifyCount)
require.Truef(t, job1.InsertedAt.Before(job2.InsertedAt), "job1 [%s] should have an earlier insertion time than job2 [%s]", job1.InsertedAt, job2.InsertedAt)
_, err = reverifyQueue.GetNextJob(ctx)
require.Error(t, sql.ErrNoRows, err)
_, err = reverifyQueue.GetNextJob(ctx, retryInterval)
require.Truef(t, audit.ErrEmptyQueue.Has(err), "expected empty queue error, but got error %+v", err)
// pretend that ReverifyRetryInterval has elapsed
reverifyQueueTest := reverifyQueue.(interface {
TestingFudgeUpdateTime(ctx context.Context, piece *audit.PieceLocator, updateTime time.Time) error
})
err = reverifyQueueTest.TestingFudgeUpdateTime(ctx, locator1, time.Now().Add(-satellitedb.ReverifyRetryInterval))
err = reverifyQueueTest.TestingFudgeUpdateTime(ctx, locator1, time.Now().Add(-retryInterval))
require.NoError(t, err)
// job 1 should be eligible for a new worker to take over now (whatever
// worker acquired job 1 before is presumed to have died or timed out).
job3, err := reverifyQueue.GetNextJob(ctx)
job3, err := reverifyQueue.GetNextJob(ctx, retryInterval)
require.NoError(t, err)
require.Equal(t, *locator1, job3.Locator)
require.Equal(t, 2, job3.ReverifyCount)
require.EqualValues(t, 2, job3.ReverifyCount)
wasDeleted, err := reverifyQueue.Remove(ctx, locator1)
require.NoError(t, err)
@ -89,8 +91,8 @@ func TestReverifyQueue(t *testing.T) {
require.NoError(t, err)
require.False(t, wasDeleted)
_, err = reverifyQueue.GetNextJob(ctx)
require.Error(t, sql.ErrNoRows, err)
_, err = reverifyQueue.GetNextJob(ctx, retryInterval)
require.Truef(t, audit.ErrEmptyQueue.Has(err), "expected empty queue error, but got error %+v", err)
})
}