satellite/audit: change signature of ReverifyPiece

First, adding a logger argument allows the caller to pass in a logger
already set up with whatever extra fields it wants.

Second, we return the Outcome instead of a simple boolean so that it
can be passed on to the Reporter later, which needs it to decide
whether to increase or decrease reputation.

Third, we collect the cached reputation information from the overlay
when creating the Orders and return it from ReverifyPiece. This will
allow the Reporter to determine which reputation-status fields need to
be updated, similarly to how we include a map of ReputationStatus
objects in an audit.Report.
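
For illustration only, here is a rough caller-side sketch of how the new
signature might be consumed. This is hypothetical code, not part of this
change: the package name, reverifyAndReport, and the commented-out
reporter call are made-up names, and the real worker/Reporter wiring is
left to follow-up work.

package worker // hypothetical package for this sketch

import (
	"context"

	"go.uber.org/zap"

	"storj.io/storj/satellite/audit"
)

// reverifyAndReport is a hypothetical caller of the new ReverifyPiece.
func reverifyAndReport(ctx context.Context, log *zap.Logger, reverifier *audit.Reverifier, locator audit.PieceLocator) (keepInQueue bool) {
	// The caller now builds the logger with whatever extra fields it wants.
	logger := log.With(
		zap.Stringer("stream-id", locator.StreamID),
		zap.Stringer("node-id", locator.NodeID),
		zap.Int("piece-num", locator.PieceNum))

	outcome, reputation := reverifier.ReverifyPiece(ctx, logger, &locator)

	// The Outcome plus the cached reputation snapshot would be handed to
	// the Reporter, which decides whether reputation goes up or down.
	// (RecordReverification is a placeholder name, not an existing API.)
	// reporter.RecordReverification(ctx, locator, outcome, reputation)
	_ = reputation

	// The old keepInQueue boolean can still be derived from the Outcome.
	switch outcome {
	case audit.OutcomeNotPerformed, audit.OutcomeTimedOut,
		audit.OutcomeNodeOffline, audit.OutcomeUnknownError:
		return true
	}
	return false
}

Collapsing the Outcome into a boolean only at the call site keeps the
Reporter able to see the full outcome instead of a pre-collapsed flag.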

Refs: https://github.com/storj/storj/issues/5251
Change-Id: I5700b9ce543d18b857b81e684323b2d21c498cd8
paul cannon 2022-11-22 12:23:43 -06:00 committed by Storj Robot
parent 378b8915c4
commit 35f60fd5eb
2 changed files with 48 additions and 63 deletions

View File

@@ -105,20 +105,13 @@ func NewReverifier(log *zap.Logger, verifier *Verifier, db ReverifyQueue, config
// ReverifyPiece acquires a piece from a single node and verifies its
// contents, its hash, and its order limit.
-func (reverifier *Reverifier) ReverifyPiece(ctx context.Context, locator PieceLocator) (keepInQueue bool) {
+func (reverifier *Reverifier) ReverifyPiece(ctx context.Context, logger *zap.Logger, locator *PieceLocator) (outcome Outcome, reputation overlay.ReputationStatus) {
defer mon.Task()(&ctx)(nil)
-logger := reverifier.log.With(
-zap.Stringer("stream-id", locator.StreamID),
-zap.Uint32("position-part", locator.Position.Part),
-zap.Uint32("position-index", locator.Position.Index),
-zap.Stringer("node-id", locator.NodeID),
-zap.Int("piece-num", locator.PieceNum))
-outcome, err := reverifier.DoReverifyPiece(ctx, logger, locator)
+outcome, reputation, err := reverifier.DoReverifyPiece(ctx, logger, locator)
if err != nil {
logger.Error("could not perform reverification due to error", zap.Error(err))
-return true
+return outcome, reputation
}
var (
@@ -129,22 +122,17 @@ func (reverifier *Reverifier) ReverifyPiece(ctx context.Context, locator PieceLo
unknown int
)
switch outcome {
-case OutcomeNotPerformed:
-keepInQueue = true
-case OutcomeNotNecessary:
+case OutcomeNotPerformed, OutcomeNotNecessary:
case OutcomeSuccess:
successes++
case OutcomeFailure:
fails++
case OutcomeTimedOut:
pending++
-keepInQueue = true
case OutcomeNodeOffline:
offlines++
-keepInQueue = true
case OutcomeUnknownError:
unknown++
-keepInQueue = true
}
mon.Meter("reverify_successes_global").Mark(successes) //mon:locked
mon.Meter("reverify_offlines_global").Mark(offlines) //mon:locked
@@ -152,12 +140,12 @@ func (reverifier *Reverifier) ReverifyPiece(ctx context.Context, locator PieceLo
mon.Meter("reverify_contained_global").Mark(pending) //mon:locked
mon.Meter("reverify_unknown_global").Mark(unknown) //mon:locked
-return keepInQueue
+return outcome, reputation
}
// DoReverifyPiece acquires a piece from a single node and verifies its
// contents, its hash, and its order limit.
-func (reverifier *Reverifier) DoReverifyPiece(ctx context.Context, logger *zap.Logger, locator PieceLocator) (outcome Outcome, err error) {
+func (reverifier *Reverifier) DoReverifyPiece(ctx context.Context, logger *zap.Logger, locator *PieceLocator) (outcome Outcome, reputation overlay.ReputationStatus, err error) {
defer mon.Task()(&ctx)(&err)
// First, we must ensure that the specified node still holds the indicated piece.
@@ -168,29 +156,29 @@ func (reverifier *Reverifier) DoReverifyPiece(ctx context.Context, logger *zap.L
if err != nil {
if metabase.ErrSegmentNotFound.Has(err) {
logger.Debug("segment no longer exists")
-return OutcomeNotNecessary, nil
+return OutcomeNotNecessary, reputation, nil
}
-return OutcomeNotPerformed, Error.Wrap(err)
+return OutcomeNotPerformed, reputation, Error.Wrap(err)
}
if segment.Expired(reverifier.nowFn()) {
logger.Debug("segment expired before ReverifyPiece")
-return OutcomeNotNecessary, nil
+return OutcomeNotNecessary, reputation, nil
}
piece, found := segment.Pieces.FindByNum(locator.PieceNum)
if !found || piece.StorageNode != locator.NodeID {
logger.Debug("piece is no longer held by the indicated node")
-return OutcomeNotNecessary, nil
+return OutcomeNotNecessary, reputation, nil
}
// TODO remove this when old entries with empty StreamID will be deleted
if locator.StreamID.IsZero() {
logger.Debug("ReverifyPiece: skip pending audit with empty StreamID")
-return OutcomeNotNecessary, nil
+return OutcomeNotNecessary, reputation, nil
}
redundancy, err := eestream.NewRedundancyStrategyFromStorj(segment.Redundancy)
if err != nil {
-return OutcomeNotPerformed, Error.Wrap(err)
+return OutcomeNotPerformed, reputation, Error.Wrap(err)
}
pieceSize := eestream.CalcPieceSize(int64(segment.EncryptedSize), redundancy)
@@ -199,33 +187,34 @@ func (reverifier *Reverifier) DoReverifyPiece(ctx context.Context, logger *zap.L
if err != nil {
if overlay.ErrNodeDisqualified.Has(err) {
logger.Debug("ReverifyPiece: order limit not created (node is already disqualified)")
-return OutcomeNotNecessary, nil
+return OutcomeNotNecessary, reputation, nil
}
if overlay.ErrNodeFinishedGE.Has(err) {
logger.Debug("ReverifyPiece: order limit not created (node has completed graceful exit)")
-return OutcomeNotNecessary, nil
+return OutcomeNotNecessary, reputation, nil
}
if overlay.ErrNodeOffline.Has(err) {
logger.Debug("ReverifyPiece: order limit not created (node considered offline)")
-return OutcomeNotPerformed, nil
+return OutcomeNotPerformed, reputation, nil
}
-return OutcomeNotPerformed, Error.Wrap(err)
+return OutcomeNotPerformed, reputation, Error.Wrap(err)
}
+reputation = cachedNodeInfo.Reputation
pieceData, pieceHash, pieceOriginalLimit, err := reverifier.GetPiece(ctx, limit, piecePrivateKey, cachedNodeInfo.LastIPPort, int32(pieceSize))
if err != nil {
if rpc.Error.Has(err) {
if errs.Is(err, context.DeadlineExceeded) {
// dial timeout
-return OutcomeTimedOut, nil
+return OutcomeTimedOut, reputation, nil
}
if errs2.IsRPC(err, rpcstatus.Unknown) {
// dial failed -- offline node
-return OutcomeNodeOffline, nil
+return OutcomeNodeOffline, reputation, nil
}
// unknown transport error
logger.Info("ReverifyPiece: unknown transport error", zap.Error(err))
-return OutcomeUnknownError, nil
+return OutcomeUnknownError, reputation, nil
}
if errs2.IsRPC(err, rpcstatus.NotFound) {
// Fetch the segment metadata again and see if it has been altered in the interim
@@ -233,30 +222,30 @@ func (reverifier *Reverifier) DoReverifyPiece(ctx context.Context, logger *zap.L
if err != nil {
// if so, we skip this audit
logger.Debug("ReverifyPiece: audit source segment changed during reverification", zap.Error(err))
-return OutcomeNotNecessary, nil
+return OutcomeNotNecessary, reputation, nil
}
// missing share
logger.Info("ReverifyPiece: audit failure; node indicates piece not found")
-return OutcomeFailure, nil
+return OutcomeFailure, reputation, nil
}
if errs2.IsRPC(err, rpcstatus.DeadlineExceeded) {
// dial successful, but download timed out
-return OutcomeTimedOut, nil
+return OutcomeTimedOut, reputation, nil
}
// unknown error
logger.Info("ReverifyPiece: unknown error from node", zap.Error(err))
-return OutcomeUnknownError, nil
+return OutcomeUnknownError, reputation, nil
}
// We have successfully acquired the piece from the node. Now, we must verify its contents.
if pieceHash == nil {
logger.Info("ReverifyPiece: audit failure; node did not send piece hash as requested")
-return OutcomeFailure, nil
+return OutcomeFailure, reputation, nil
}
if pieceOriginalLimit == nil {
logger.Info("ReverifyPiece: audit failure; node did not send original order limit as requested")
-return OutcomeFailure, nil
+return OutcomeFailure, reputation, nil
}
// check for the correct size
if int64(len(pieceData)) != pieceSize {
@@ -278,23 +267,23 @@ func (reverifier *Reverifier) DoReverifyPiece(ctx context.Context, logger *zap.L
// by the uplink public key in the order limit)
signer := signing.SigneeFromPeerIdentity(reverifier.auditor)
if err := signing.VerifyOrderLimitSignature(ctx, signer, pieceOriginalLimit); err != nil {
-return OutcomeFailure, nil
+return OutcomeFailure, reputation, nil
}
if err := signing.VerifyUplinkPieceHashSignature(ctx, pieceOriginalLimit.UplinkPublicKey, pieceHash); err != nil {
-return OutcomeFailure, nil
+return OutcomeFailure, reputation, nil
}
}
}
if err := reverifier.checkIfSegmentAltered(ctx, segment); err != nil {
logger.Debug("ReverifyPiece: audit source segment changed during reverification", zap.Error(err))
-return OutcomeNotNecessary, nil
+return OutcomeNotNecessary, reputation, nil
}
if outcome == OutcomeFailure {
-return OutcomeFailure, nil
+return OutcomeFailure, reputation, nil
}
-return OutcomeSuccess, nil
+return OutcomeSuccess, reputation, nil
}
// GetPiece uses the piecestore client to download a piece (and the associated

View File

@@ -38,18 +38,18 @@ func TestReverifyPiece(t *testing.T) {
// ensure ReverifyPiece tells us to remove the segment from the queue after a successful audit
for _, piece := range segment.Pieces {
-keepInQueue := satellite.Audit.Reverifier.ReverifyPiece(ctx, audit.PieceLocator{
+outcome, _ := satellite.Audit.Reverifier.ReverifyPiece(ctx, planet.Log().Named("reverifier"), &audit.PieceLocator{
StreamID: segment.StreamID,
Position: segment.Position,
NodeID: piece.StorageNode,
PieceNum: int(piece.Number),
})
-require.False(t, keepInQueue)
+require.Equal(t, audit.OutcomeSuccess, outcome)
}
})
}
-func TestDoReverifyPieceSucceeds(t *testing.T) {
+func TestReverifyPieceSucceeds(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1,
StorageNodeCount: 5,
@@ -66,21 +66,20 @@ func TestDoReverifyPieceSucceeds(t *testing.T) {
segment := uploadSomeData(t, ctx, planet)
-// ensure DoReverifyPiece succeeds on the new pieces we uploaded
+// ensure ReverifyPiece succeeds on the new pieces we uploaded
for _, piece := range segment.Pieces {
-outcome, err := satellite.Audit.Reverifier.DoReverifyPiece(ctx, planet.Log().Named("reverifier"), audit.PieceLocator{
+outcome, _ := satellite.Audit.Reverifier.ReverifyPiece(ctx, planet.Log().Named("reverifier"), &audit.PieceLocator{
StreamID: segment.StreamID,
Position: segment.Position,
NodeID: piece.StorageNode,
PieceNum: int(piece.Number),
})
-require.NoError(t, err)
require.Equal(t, audit.OutcomeSuccess, outcome)
}
})
}
-func TestDoReverifyPieceWithNodeOffline(t *testing.T) {
+func TestReverifyPieceWithNodeOffline(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1,
StorageNodeCount: 5,
@@ -102,19 +101,18 @@ func TestDoReverifyPieceWithNodeOffline(t *testing.T) {
require.NotNil(t, offlineNode)
require.NoError(t, planet.StopPeer(offlineNode))
-// see what happens when DoReverifyPiece tries to hit that node
-outcome, err := satellite.Audit.Reverifier.DoReverifyPiece(ctx, planet.Log().Named("reverifier"), audit.PieceLocator{
+// see what happens when ReverifyPiece tries to hit that node
+outcome, _ := satellite.Audit.Reverifier.ReverifyPiece(ctx, planet.Log().Named("reverifier"), &audit.PieceLocator{
StreamID: segment.StreamID,
Position: segment.Position,
NodeID: offlinePiece.StorageNode,
PieceNum: int(offlinePiece.Number),
})
-require.NoError(t, err)
require.Equal(t, audit.OutcomeNodeOffline, outcome)
})
}
-func TestDoReverifyPieceWithPieceMissing(t *testing.T) {
+func TestReverifyPieceWithPieceMissing(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1,
StorageNodeCount: 5,
@@ -138,14 +136,13 @@ func TestDoReverifyPieceWithPieceMissing(t *testing.T) {
err := missingPieceNode.Storage2.Store.Delete(ctx, satellite.ID(), missingPieceID)
require.NoError(t, err)
-// see what happens when DoReverifyPiece tries to hit that node
-outcome, err := satellite.Audit.Reverifier.DoReverifyPiece(ctx, planet.Log().Named("reverifier"), audit.PieceLocator{
+// see what happens when ReverifyPiece tries to hit that node
+outcome, _ := satellite.Audit.Reverifier.ReverifyPiece(ctx, planet.Log().Named("reverifier"), &audit.PieceLocator{
StreamID: segment.StreamID,
Position: segment.Position,
NodeID: missingPiece.StorageNode,
PieceNum: int(missingPiece.Number),
})
-require.NoError(t, err)
require.Equal(t, audit.OutcomeFailure, outcome)
})
}
@@ -175,13 +172,12 @@ func testReverifyRewrittenPiece(t *testing.T, mutator func(content []byte, heade
rewritePiece(t, ctx, node, satellite.ID(), pieceID, mutator)
-outcome, err := satellite.Audit.Reverifier.DoReverifyPiece(ctx, planet.Log().Named("reverifier"), audit.PieceLocator{
+outcome, _ := satellite.Audit.Reverifier.ReverifyPiece(ctx, planet.Log().Named("reverifier"), &audit.PieceLocator{
StreamID: segment.StreamID,
Position: segment.Position,
NodeID: pieceToRewrite.StorageNode,
PieceNum: int(pieceToRewrite.Number),
})
-require.NoError(t, err)
require.Equal(t, expectedOutcome, outcome)
})
}
@@ -190,34 +186,34 @@ func testReverifyRewrittenPiece(t *testing.T, mutator func(content []byte, heade
// with new contents or a new piece header. Since the api for dealing with the
// piece store may change over time, this test makes sure that we can even expect
// the rewriting trick to work.
-func TestDoReverifyPieceWithRewrittenPiece(t *testing.T) {
+func TestReverifyPieceWithRewrittenPiece(t *testing.T) {
testReverifyRewrittenPiece(t, func(content []byte, header *pb.PieceHeader) {
// don't change anything; just write back original contents
}, audit.OutcomeSuccess)
}
-func TestDoReverifyPieceWithCorruptedContent(t *testing.T) {
+func TestReverifyPieceWithCorruptedContent(t *testing.T) {
testReverifyRewrittenPiece(t, func(content []byte, header *pb.PieceHeader) {
// increment last byte of content
content[len(content)-1]++
}, audit.OutcomeFailure)
}
-func TestDoReverifyPieceWithCorruptedHash(t *testing.T) {
+func TestReverifyPieceWithCorruptedHash(t *testing.T) {
testReverifyRewrittenPiece(t, func(content []byte, header *pb.PieceHeader) {
// change last byte of hash
header.Hash[len(header.Hash)-1]++
}, audit.OutcomeFailure)
}
-func TestDoReverifyPieceWithInvalidHashSignature(t *testing.T) {
+func TestReverifyPieceWithInvalidHashSignature(t *testing.T) {
testReverifyRewrittenPiece(t, func(content []byte, header *pb.PieceHeader) {
// change last byte of signature on hash
header.Signature[len(header.Signature)-1]++
}, audit.OutcomeFailure)
}
-func TestDoReverifyPieceWithInvalidOrderLimitSignature(t *testing.T) {
+func TestReverifyPieceWithInvalidOrderLimitSignature(t *testing.T) {
testReverifyRewrittenPiece(t, func(content []byte, header *pb.PieceHeader) {
// change last byte of signature on order limit signature
header.OrderLimit.SatelliteSignature[len(header.OrderLimit.SatelliteSignature)-1]++