satellite/audit: Begin using piecewise reverifications

This commit pulls the big switch! We have been setting up piecewise
reverifications (the workers for which can be scaled independently of
the core) for several commits now, and this commit actually begins
making use of them.

The core of this commit is fairly small, but it requires changing the
semantics in all the tests that relate to reverifications, so it ends up
being a large change. The changes to the tests are mostly mechanical and
repetitive, though, so reviewers needn't worry much.

Refs: https://github.com/storj/storj/issues/5230
Change-Id: Ibb421cc021664fd6e0096ffdf5b402a69b2d6f18
Author: paul cannon
Date: 2022-11-22 17:18:01 -06:00
Committed by: Storj Robot
Parent: 9190a549ad
Commit: a66503b444
10 changed files with 250 additions and 1097 deletions
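For orientation before the diffs: the piecewise model replaces the old PendingAudit (which carried an expected share hash) with a single-piece locator that the independently scaled reverify workers can pick up later. Below is a minimal sketch of those two types as they are used in the diffs that follow; the exact field types are assumptions based on the surrounding satellite packages, not a verbatim copy of the satellite code.

package audit

import (
	"storj.io/common/storj"
	"storj.io/common/uuid"
	"storj.io/storj/satellite/metabase"
)

// PieceLocator identifies one piece on one node -- the only state a
// piecewise reverification needs in order to be retried later.
type PieceLocator struct {
	StreamID uuid.UUID
	Position metabase.SegmentPosition
	NodeID   storj.NodeID
	PieceNum int
}

// ReverificationJob is what Verify now reports for a contained node
// (Report.PieceAudits), replacing the old PendingAudit and its
// ExpectedShareHash.
type ReverificationJob struct {
	Locator       PieceLocator
	ReverifyCount int
}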

View File

@@ -32,17 +32,10 @@ storj.io/storj/satellite/audit."audit_unknown_percentage" FloatVal
 storj.io/storj/satellite/audit."audited_percentage" FloatVal
 storj.io/storj/satellite/audit."could_not_verify_audit_shares" Counter
 storj.io/storj/satellite/audit."not_enough_shares_for_audit" Counter
-storj.io/storj/satellite/audit."reverify_contained" IntVal
 storj.io/storj/satellite/audit."reverify_contained_global" Meter
-storj.io/storj/satellite/audit."reverify_contained_in_segment" IntVal
-storj.io/storj/satellite/audit."reverify_fails" IntVal
 storj.io/storj/satellite/audit."reverify_fails_global" Meter
-storj.io/storj/satellite/audit."reverify_offlines" IntVal
 storj.io/storj/satellite/audit."reverify_offlines_global" Meter
-storj.io/storj/satellite/audit."reverify_successes" IntVal
 storj.io/storj/satellite/audit."reverify_successes_global" Meter
-storj.io/storj/satellite/audit."reverify_total_in_segment" IntVal
-storj.io/storj/satellite/audit."reverify_unknown" IntVal
 storj.io/storj/satellite/audit."reverify_unknown_global" Meter
 storj.io/storj/satellite/audit."verify_shares_downloaded_successfully" IntVal
 storj.io/storj/satellite/console."create_user_attempt" Counter

View File

@@ -51,9 +51,6 @@ func TestAuditOrderLimit(t *testing.T) {
 require.NoError(t, err)
 require.False(t, queueSegment.StreamID.IsZero())
-_, err = audits.Verifier.Reverify(ctx, queueSegment)
-require.NoError(t, err)
 report, err := audits.Verifier.Verify(ctx, queueSegment, nil)
 require.NoError(t, err)

View File

@@ -39,10 +39,7 @@ func TestNewContainInsertAndGet(t *testing.T) {
 output, err := containment.Get(ctx, input.NodeID)
 require.NoError(t, err)
-assert.Equal(t, input.NodeID, output.Locator.NodeID)
-assert.Equal(t, input.StreamID, output.Locator.StreamID)
-assert.Equal(t, input.Position, output.Locator.Position)
-assert.Equal(t, input.PieceNum, output.Locator.PieceNum)
+assert.Equal(t, *input, output.Locator)
 assert.EqualValues(t, 0, output.ReverifyCount)
 nodeID1 := planet.StorageNodes[1].ID()
@@ -116,6 +113,7 @@ func TestNewContainDelete(t *testing.T) {
 if got.Locator != *info1 {
 require.Equal(t, *info2, got.Locator)
 }
+require.EqualValues(t, 0, got.ReverifyCount)
 // delete one of the pending reverifications
 wasDeleted, stillInContainment, err := containment.Delete(ctx, info2)

View File

@@ -116,8 +116,6 @@ func (reporter *reporter) RecordAudits(ctx context.Context, req Report) {
 reportFailures(tries, "unknown", err, unknowns, nil, nil)
 offlines, err = reporter.recordAuditStatus(ctx, offlines, nodesReputation, reputation.AuditOffline)
 reportFailures(tries, "offline", err, offlines, nil, nil)
-pendingAudits, err = reporter.recordPendingAudits(ctx, pendingAudits, nodesReputation)
-reportFailures(tries, "pending", err, nil, pendingAudits, nil)
 pieceAudits, err = reporter.recordPendingPieceAudits(ctx, pieceAudits, nodesReputation)
 reportFailures(tries, "pending", err, nil, nil, pieceAudits)
 }
@@ -193,49 +191,6 @@ func (reporter *reporter) recordPendingPieceAudits(ctx context.Context, pendingA
 return nil, nil
 }
-// recordPendingAudits updates the containment status of nodes with pending audits.
-func (reporter *reporter) recordPendingAudits(ctx context.Context, pendingAudits []*PendingAudit, nodesReputation map[storj.NodeID]overlay.ReputationStatus) (failed []*PendingAudit, err error) {
-defer mon.Task()(&ctx)(&err)
-var errlist errs.Group
-for _, pendingAudit := range pendingAudits {
-if pendingAudit.ReverifyCount < reporter.maxReverifyCount {
-err := reporter.containment.IncrementPending(ctx, pendingAudit)
-if err != nil {
-failed = append(failed, pendingAudit)
-errlist.Add(err)
-}
-reporter.log.Info("Audit pending",
-zap.Stringer("Piece ID", pendingAudit.PieceID),
-zap.Stringer("Node ID", pendingAudit.NodeID))
-} else {
-// record failure -- max reverify count reached
-reporter.log.Info("max reverify count reached (audit failed)", zap.Stringer("Node ID", pendingAudit.NodeID))
-err = reporter.reputations.ApplyAudit(ctx, pendingAudit.NodeID, nodesReputation[pendingAudit.NodeID], reputation.AuditFailure)
-if err != nil {
-errlist.Add(err)
-failed = append(failed, pendingAudit)
-} else {
-_, err = reporter.containment.Delete(ctx, pendingAudit.NodeID)
-if err != nil && !ErrContainedNotFound.Has(err) {
-errlist.Add(err)
-}
-}
-}
-}
-if len(failed) > 0 {
-for _, v := range failed {
-reporter.log.Debug("failed to record Pending Nodes ",
-zap.Stringer("NodeID", v.NodeID),
-zap.String("Segment StreamID", v.StreamID.String()),
-zap.Uint64("Segment Position", v.Position.Encode()))
-}
-return failed, errs.Combine(Error.New("failed to record some pending audits"), errlist.Err())
-}
-return nil, nil
-}
 func (reporter *reporter) ReportReverificationNeeded(ctx context.Context, piece *PieceLocator) (err error) {
 defer mon.Task()(&ctx)(&err)
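The replacement path for the deleted recordPendingAudits is ReportReverificationNeeded, whose signature appears just above. Here is a minimal usage sketch, assuming the package's exported Reporter interface and the segment/piece types used elsewhere in this commit; enqueueReverification is a hypothetical helper, not satellite code.

package example

import (
	"context"

	"storj.io/storj/satellite/audit"
	"storj.io/storj/satellite/metabase"
)

// enqueueReverification hands one contained piece to the reverification
// queue via the reporter, instead of reverifying it inline as the old
// Reverify loop did.
func enqueueReverification(ctx context.Context, reporter audit.Reporter, segment audit.Segment, piece metabase.Piece) error {
	return reporter.ReportReverificationNeeded(ctx, &audit.PieceLocator{
		StreamID: segment.StreamID,
		Position: segment.Position,
		NodeID:   piece.StorageNode,
		PieceNum: int(piece.Number),
	})
}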

View File

@@ -195,7 +195,7 @@ func (reverifier *Reverifier) DoReverifyPiece(ctx context.Context, logger *zap.L
 }
 if overlay.ErrNodeOffline.Has(err) {
 logger.Debug("ReverifyPiece: order limit not created (node considered offline)")
-return OutcomeNotPerformed, reputation, nil
+return OutcomeNodeOffline, reputation, nil
 }
 return OutcomeNotPerformed, reputation, Error.Wrap(err)
 }

File diff suppressed because it is too large

View File

@@ -20,7 +20,6 @@ import (
 "storj.io/common/identity"
 "storj.io/common/memory"
 "storj.io/common/pb"
-"storj.io/common/pkcrypto"
 "storj.io/common/rpc"
 "storj.io/common/rpc/rpcpool"
 "storj.io/common/rpc/rpcstatus"
@@ -259,7 +258,7 @@ func (verifier *Verifier) Verify(ctx context.Context, segment Segment, skip map[
 mon.Counter("could_not_verify_audit_shares").Inc(0) //mon:locked
 mon.Counter("audit_suspected_network_problem").Inc(0) //mon:locked
-pieceNums, correctedShares, err := auditShares(ctx, required, total, sharesToAudit)
+pieceNums, _, err := auditShares(ctx, required, total, sharesToAudit)
 if err != nil {
 mon.Counter("could_not_verify_audit_shares").Inc(1) //mon:locked
 verifier.log.Error("could not verify shares", zap.String("Segment", segmentInfoString(segment)), zap.Error(err))
@@ -279,7 +278,7 @@ func (verifier *Verifier) Verify(ctx context.Context, segment Segment, skip map[
 successNodes := getSuccessNodes(ctx, shares, failedNodes, offlineNodes, unknownNodes, containedNodes)
-pendingAudits, err := createPendingAudits(ctx, containedNodes, correctedShares, segment, segmentInfo, randomIndex)
+pendingAudits, err := createPendingAudits(ctx, containedNodes, segment)
 if err != nil {
 return Report{
 Successes: successNodes,
@@ -290,11 +289,11 @@ func (verifier *Verifier) Verify(ctx context.Context, segment Segment, skip map[
 }
 return Report{
 Successes: successNodes,
 Fails: failedNodes,
 Offlines: offlineNodes,
-PendingAudits: pendingAudits,
+PieceAudits: pendingAudits,
 Unknown: unknownNodes,
 }, nil
 }
@@ -374,260 +373,6 @@ func (verifier *Verifier) IdentifyContainedNodes(ctx context.Context, segment Se
 return skipList, nil
 }
-// Reverify reverifies the contained nodes in the stripe.
-func (verifier *Verifier) Reverify(ctx context.Context, segment Segment) (report Report, err error) {
-defer mon.Task()(&ctx)(&err)
-// result status enum
-const (
-skipped = iota
-success
-offline
-failed
-contained
-unknown
-erred
-)
-type result struct {
-nodeID storj.NodeID
-status int
-pendingAudit *PendingAudit
-reputation overlay.ReputationStatus
-release bool
-err error
-}
-if segment.Expired(verifier.nowFn()) {
-verifier.log.Debug("segment expired before Reverify")
-return Report{}, nil
-}
-segmentInfo, err := verifier.metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
-StreamID: segment.StreamID,
-Position: segment.Position,
-})
-if err != nil {
-if metabase.ErrSegmentNotFound.Has(err) {
-verifier.log.Debug("segment deleted before Reverify")
-return Report{}, nil
-}
-return Report{}, err
-}
-pieces := segmentInfo.Pieces
-ch := make(chan result, len(pieces))
-var containedInSegment int64
-for _, piece := range pieces {
-pending, err := verifier.containment.Get(ctx, piece.StorageNode)
-if err != nil {
-if ErrContainedNotFound.Has(err) {
-ch <- result{nodeID: piece.StorageNode, status: skipped}
-continue
-}
-ch <- result{nodeID: piece.StorageNode, status: erred, err: err}
-verifier.log.Debug("Reverify: error getting from containment db", zap.Stringer("Node ID", piece.StorageNode), zap.Error(err))
-continue
-}
-// TODO remove this when old entries with empty StreamID will be deleted
-if pending.StreamID.IsZero() {
-verifier.log.Debug("Reverify: skip pending audit with empty StreamID", zap.Stringer("Node ID", piece.StorageNode))
-ch <- result{nodeID: piece.StorageNode, status: skipped}
-continue
-}
-containedInSegment++
-go func(pending *PendingAudit) {
-pendingSegment, err := verifier.metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
-StreamID: pending.StreamID,
-Position: pending.Position,
-})
-if err != nil {
-if metabase.ErrSegmentNotFound.Has(err) {
-ch <- result{nodeID: pending.NodeID, status: skipped, release: true}
-return
-}
-ch <- result{nodeID: pending.NodeID, status: erred, err: err}
-verifier.log.Debug("Reverify: error getting pending segment from metabase", zap.Stringer("Node ID", pending.NodeID), zap.Error(err))
-return
-}
-if pendingSegment.Expired(verifier.nowFn()) {
-verifier.log.Debug("Reverify: segment already expired", zap.Stringer("Node ID", pending.NodeID))
-ch <- result{nodeID: pending.NodeID, status: skipped, release: true}
-return
-}
-// TODO: is this check still necessary? If the segment was found by its StreamID and position, the RootPieceID should not had changed.
-if pendingSegment.RootPieceID != pending.PieceID {
-ch <- result{nodeID: pending.NodeID, status: skipped, release: true}
-return
-}
-var pieceNum uint16
-found := false
-for _, piece := range pendingSegment.Pieces {
-if piece.StorageNode == pending.NodeID {
-pieceNum = piece.Number
-found = true
-}
-}
-if !found {
-ch <- result{nodeID: pending.NodeID, status: skipped, release: true}
-return
-}
-limit, piecePrivateKey, cachedNodeInfo, err := verifier.orders.CreateAuditOrderLimit(ctx, pending.NodeID, pieceNum, pending.PieceID, pending.ShareSize)
-if err != nil {
-if overlay.ErrNodeDisqualified.Has(err) {
-ch <- result{nodeID: pending.NodeID, status: skipped, release: true}
-verifier.log.Debug("Reverify: order limit not created (disqualified)", zap.Stringer("Node ID", pending.NodeID))
-return
-}
-if overlay.ErrNodeFinishedGE.Has(err) {
-ch <- result{nodeID: pending.NodeID, status: skipped, release: true}
-verifier.log.Debug("Reverify: order limit not created (completed graceful exit)", zap.Stringer("Node ID", pending.NodeID))
-return
-}
-if overlay.ErrNodeOffline.Has(err) {
-ch <- result{nodeID: pending.NodeID, status: offline, reputation: cachedNodeInfo.Reputation}
-verifier.log.Debug("Reverify: order limit not created (offline)", zap.Stringer("Node ID", pending.NodeID))
-return
-}
-ch <- result{nodeID: pending.NodeID, status: erred, err: err}
-verifier.log.Debug("Reverify: error creating order limit", zap.Stringer("Node ID", pending.NodeID), zap.Error(err))
-return
-}
-share, err := verifier.GetShare(ctx, limit, piecePrivateKey, cachedNodeInfo.LastIPPort, pending.StripeIndex, pending.ShareSize, int(pieceNum))
-// check if the pending audit was deleted while downloading the share
-_, getErr := verifier.containment.Get(ctx, pending.NodeID)
-if getErr != nil {
-if ErrContainedNotFound.Has(getErr) {
-ch <- result{nodeID: pending.NodeID, status: skipped}
-verifier.log.Debug("Reverify: pending audit deleted during reverification", zap.Stringer("Node ID", pending.NodeID), zap.Error(getErr))
-return
-}
-ch <- result{nodeID: pending.NodeID, status: erred, err: getErr}
-verifier.log.Debug("Reverify: error getting from containment db", zap.Stringer("Node ID", pending.NodeID), zap.Error(getErr))
-return
-}
-// analyze the error from GetShare
-if err != nil {
-if rpc.Error.Has(err) {
-if errs.Is(err, context.DeadlineExceeded) {
-// dial timeout
-ch <- result{nodeID: pending.NodeID, status: offline, reputation: cachedNodeInfo.Reputation}
-verifier.log.Debug("Reverify: dial timeout (offline)", zap.Stringer("Node ID", pending.NodeID), zap.Error(err))
-return
-}
-if errs2.IsRPC(err, rpcstatus.Unknown) {
-// dial failed -- offline node
-verifier.log.Debug("Reverify: dial failed (offline)", zap.Stringer("Node ID", pending.NodeID), zap.Error(err))
-ch <- result{nodeID: pending.NodeID, status: offline, reputation: cachedNodeInfo.Reputation}
-return
-}
-// unknown transport error
-ch <- result{nodeID: pending.NodeID, status: unknown, pendingAudit: pending, reputation: cachedNodeInfo.Reputation, release: true}
-verifier.log.Info("Reverify: unknown transport error (skipped)", zap.Stringer("Node ID", pending.NodeID), zap.Error(err))
-return
-}
-if errs2.IsRPC(err, rpcstatus.NotFound) {
-// Get the original segment
-err := verifier.checkIfSegmentAltered(ctx, pendingSegment)
-if err != nil {
-ch <- result{nodeID: pending.NodeID, status: skipped, release: true}
-verifier.log.Debug("Reverify: audit source changed before reverification", zap.Stringer("Node ID", pending.NodeID), zap.Error(err))
-return
-}
-// missing share
-ch <- result{nodeID: pending.NodeID, status: failed, reputation: cachedNodeInfo.Reputation, release: true}
-verifier.log.Info("Reverify: piece not found (audit failed)", zap.Stringer("Node ID", pending.NodeID), zap.Error(err))
-return
-}
-if errs2.IsRPC(err, rpcstatus.DeadlineExceeded) {
-// dial successful, but download timed out
-ch <- result{nodeID: pending.NodeID, status: contained, pendingAudit: pending, reputation: cachedNodeInfo.Reputation}
-verifier.log.Info("Reverify: download timeout (contained)", zap.Stringer("Node ID", pending.NodeID), zap.Error(err))
-return
-}
-// unknown error
-ch <- result{nodeID: pending.NodeID, status: unknown, pendingAudit: pending, reputation: cachedNodeInfo.Reputation, release: true}
-verifier.log.Info("Reverify: unknown error (skipped)", zap.Stringer("Node ID", pending.NodeID), zap.Error(err))
-return
-}
-downloadedHash := pkcrypto.SHA256Hash(share.Data)
-if bytes.Equal(downloadedHash, pending.ExpectedShareHash) {
-ch <- result{nodeID: pending.NodeID, status: success, reputation: cachedNodeInfo.Reputation, release: true}
-verifier.log.Info("Reverify: hashes match (audit success)", zap.Stringer("Node ID", pending.NodeID))
-} else {
-err := verifier.checkIfSegmentAltered(ctx, pendingSegment)
-if err != nil {
-ch <- result{nodeID: pending.NodeID, status: skipped, release: true}
-verifier.log.Debug("Reverify: audit source changed before reverification", zap.Stringer("Node ID", pending.NodeID), zap.Error(err))
-return
-}
-verifier.log.Info("Reverify: hashes mismatch (audit failed)", zap.Stringer("Node ID", pending.NodeID),
-zap.Binary("expected hash", pending.ExpectedShareHash), zap.Binary("downloaded hash", downloadedHash))
-ch <- result{nodeID: pending.NodeID, status: failed, reputation: cachedNodeInfo.Reputation, release: true}
-}
-}(pending)
-}
-reputations := make(map[storj.NodeID]overlay.ReputationStatus)
-for range pieces {
-result := <-ch
-reputations[result.nodeID] = result.reputation
-switch result.status {
-case skipped:
-case success:
-report.Successes = append(report.Successes, result.nodeID)
-case offline:
-report.Offlines = append(report.Offlines, result.nodeID)
-case failed:
-report.Fails = append(report.Fails, result.nodeID)
-case contained:
-report.PendingAudits = append(report.PendingAudits, result.pendingAudit)
-case unknown:
-report.Unknown = append(report.Unknown, result.nodeID)
-case erred:
-err = errs.Combine(err, result.err)
-default:
-}
-if result.release {
-_, errDelete := verifier.containment.Delete(ctx, result.nodeID)
-if errDelete != nil {
-verifier.log.Debug("Error deleting node from containment db", zap.Stringer("Node ID", result.nodeID), zap.Error(errDelete))
-}
-}
-}
-report.NodesReputation = reputations
-mon.Meter("reverify_successes_global").Mark(len(report.Successes)) //mon:locked
-mon.Meter("reverify_offlines_global").Mark(len(report.Offlines)) //mon:locked
-mon.Meter("reverify_fails_global").Mark(len(report.Fails)) //mon:locked
-mon.Meter("reverify_contained_global").Mark(len(report.PendingAudits)) //mon:locked
-mon.Meter("reverify_unknown_global").Mark(len(report.Unknown)) //mon:locked
-mon.IntVal("reverify_successes").Observe(int64(len(report.Successes))) //mon:locked
-mon.IntVal("reverify_offlines").Observe(int64(len(report.Offlines))) //mon:locked
-mon.IntVal("reverify_fails").Observe(int64(len(report.Fails))) //mon:locked
-mon.IntVal("reverify_contained").Observe(int64(len(report.PendingAudits))) //mon:locked
-mon.IntVal("reverify_unknown").Observe(int64(len(report.Unknown))) //mon:locked
-mon.IntVal("reverify_contained_in_segment").Observe(containedInSegment) //mon:locked
-mon.IntVal("reverify_total_in_segment").Observe(int64(len(pieces))) //mon:locked
-return report, err
-}
 // GetShare use piece store client to download shares from nodes.
 func (verifier *Verifier) GetShare(ctx context.Context, limit *pb.AddressedOrderLimit, piecePrivateKey storj.PiecePrivateKey, cachedIPAndPort string, stripeIndex, shareSize int32, pieceNum int) (share Share, err error) {
 defer mon.Task()(&ctx)(&err)
@@ -773,7 +518,7 @@ func makeCopies(ctx context.Context, originals map[int]Share) (copies []infectio
 return copies, nil
 }
-// getOfflines nodes returns these storage nodes from the segment which have no
+// getOfflineNodes returns those storage nodes from the segment which have no
 // order limit nor are skipped.
 func getOfflineNodes(segment metabase.Segment, limits []*pb.AddressedOrderLimit, skip map[storj.NodeID]bool) storj.NodeIDList {
 var offlines storj.NodeIDList
@@ -820,59 +565,28 @@ func getSuccessNodes(ctx context.Context, shares map[int]Share, failedNodes, off
 return successNodes
 }
-func createPendingAudits(ctx context.Context, containedNodes map[int]storj.NodeID, correctedShares []infectious.Share, segment Segment, segmentInfo metabase.Segment, randomIndex int32) (pending []*PendingAudit, err error) {
+func createPendingAudits(ctx context.Context, containedNodes map[int]storj.NodeID, segment Segment) (pending []*ReverificationJob, err error) {
 defer mon.Task()(&ctx)(&err)
 if len(containedNodes) == 0 {
 return nil, nil
 }
-required := int(segmentInfo.Redundancy.RequiredShares)
-total := int(segmentInfo.Redundancy.TotalShares)
-shareSize := segmentInfo.Redundancy.ShareSize
-fec, err := infectious.NewFEC(required, total)
-if err != nil {
-return nil, Error.Wrap(err)
-}
-stripeData, err := rebuildStripe(ctx, fec, correctedShares, int(shareSize))
-if err != nil {
-return nil, Error.Wrap(err)
-}
+pending = make([]*ReverificationJob, 0, len(containedNodes))
 for pieceNum, nodeID := range containedNodes {
-share := make([]byte, shareSize)
-err = fec.EncodeSingle(stripeData, share, pieceNum)
-if err != nil {
-return nil, Error.Wrap(err)
-}
-pending = append(pending, &PendingAudit{
-NodeID: nodeID,
-PieceID: segmentInfo.RootPieceID,
-StripeIndex: randomIndex,
-ShareSize: shareSize,
-ExpectedShareHash: pkcrypto.SHA256Hash(share),
-StreamID: segment.StreamID,
-Position: segment.Position,
+pending = append(pending, &ReverificationJob{
+Locator: PieceLocator{
+NodeID: nodeID,
+StreamID: segment.StreamID,
+Position: segment.Position,
+PieceNum: pieceNum,
+},
 })
 }
 return pending, nil
 }
-func rebuildStripe(ctx context.Context, fec *infectious.FEC, corrected []infectious.Share, shareSize int) (_ []byte, err error) {
-defer mon.Task()(&ctx)(&err)
-stripe := make([]byte, fec.Required()*shareSize)
-err = fec.Rebuild(corrected, func(share infectious.Share) {
-copy(stripe[share.Number*shareSize:], share.Data)
-})
-if err != nil {
-return nil, err
-}
-return stripe, nil
-}
 // GetRandomStripe takes a segment and returns a random stripe index within that segment.
 func GetRandomStripe(ctx context.Context, segment metabase.Segment) (index int32, err error) {
 defer mon.Task()(&ctx)(&err)

View File

@@ -877,8 +877,8 @@ func TestVerifierSlowDownload(t *testing.T) {
 assert.Len(t, report.Fails, 0)
 assert.Len(t, report.Offlines, 0)
 assert.Len(t, report.Unknown, 0)
-require.Len(t, report.PendingAudits, 1)
-assert.Equal(t, report.PendingAudits[0].NodeID, slowNode.ID())
+require.Len(t, report.PieceAudits, 1)
+assert.Equal(t, report.PieceAudits[0].Locator.NodeID, slowNode.ID())
 })
 }

View File

@@ -5,14 +5,12 @@ package audit
 import (
 "context"
-"math/rand"
 "testing"
 "github.com/stretchr/testify/assert"
 "github.com/stretchr/testify/require"
 "github.com/vivint/infectious"
-"storj.io/common/pkcrypto"
 "storj.io/common/storj"
 "storj.io/common/testrand"
 "storj.io/storj/satellite/metabase"
@@ -126,12 +124,14 @@ func TestCreatePendingAudits(t *testing.T) {
 testNodeID := testrand.NodeID()
 ctx := context.Background()
+const pieceNum = 1
 contained := make(map[int]storj.NodeID)
-contained[1] = testNodeID
+contained[pieceNum] = testNodeID
 segment := testSegment()
 segmentInfo := metabase.Segment{
 StreamID: segment.StreamID,
+Position: segment.Position,
 RootPieceID: testrand.PieceID(),
 Redundancy: storj.RedundancyScheme{
 Algorithm: storj.ReedSolomon,
@@ -140,16 +140,14 @@ func TestCreatePendingAudits(t *testing.T) {
 ShareSize: int32(len(shares[0].Data)),
 },
 }
-randomIndex := rand.Int31n(10)
-pending, err := createPendingAudits(ctx, contained, shares, segment, segmentInfo, randomIndex)
+pending, err := createPendingAudits(ctx, contained, segment)
 require.NoError(t, err)
 require.Equal(t, 1, len(pending))
-assert.Equal(t, testNodeID, pending[0].NodeID)
-assert.Equal(t, segmentInfo.RootPieceID, pending[0].PieceID)
-assert.Equal(t, randomIndex, pending[0].StripeIndex)
-assert.Equal(t, segmentInfo.Redundancy.ShareSize, pending[0].ShareSize)
-assert.Equal(t, pkcrypto.SHA256Hash(shares[1].Data), pending[0].ExpectedShareHash)
+assert.Equal(t, testNodeID, pending[0].Locator.NodeID)
+assert.Equal(t, segmentInfo.StreamID, pending[0].Locator.StreamID)
+assert.Equal(t, segmentInfo.Position, pending[0].Locator.Position)
+assert.Equal(t, pieceNum, pending[0].Locator.PieceNum)
 assert.EqualValues(t, 0, pending[0].ReverifyCount)
 }

View File

@@ -11,7 +11,6 @@ import (
 "go.uber.org/zap"
 "storj.io/common/memory"
-"storj.io/common/storj"
 "storj.io/common/sync2"
 "storj.io/storj/satellite/metabase"
 )
@@ -117,14 +116,12 @@ func (worker *Worker) work(ctx context.Context, segment Segment) (err error) {
 var errlist errs.Group
-// First, attempt to reverify nodes for this segment that are in containment mode.
-report, err := worker.verifier.Reverify(ctx, segment)
-if err != nil {
-errlist.Add(err)
-}
-worker.reporter.RecordAudits(ctx, report)
+// First, remove nodes that are contained. We do not (currently)
+// audit contained nodes for other pieces until we get an answer
+// for the contained audit. (I suspect this could change without
+// upsetting anything, but for now it's best to keep it the way
+// it was. -thepaul)
+skip, err := worker.verifier.IdentifyContainedNodes(ctx, segment)
 if err != nil {
 if metabase.ErrSegmentNotFound.Has(err) {
 // no need to add this error; Verify() will encounter it again
@@ -135,26 +132,8 @@ func (worker *Worker) work(ctx context.Context, segment Segment) (err error) {
 }
 }
-// Skip all reverified nodes in the next Verify step.
-skip := make(map[storj.NodeID]bool)
-for _, nodeID := range report.Successes {
-skip[nodeID] = true
-}
-for _, nodeID := range report.Offlines {
-skip[nodeID] = true
-}
-for _, nodeID := range report.Fails {
-skip[nodeID] = true
-}
-for _, pending := range report.PendingAudits {
-skip[pending.NodeID] = true
-}
-for _, nodeID := range report.Unknown {
-skip[nodeID] = true
-}
 // Next, audit the remaining nodes that are not in containment mode.
-report, err = worker.verifier.Verify(ctx, segment, skip)
+report, err := worker.verifier.Verify(ctx, segment, skip)
 if err != nil {
 errlist.Add(err)
 }