diff --git a/satellite/audit/disqualification_test.go b/satellite/audit/disqualification_test.go
index 5002af0b6..99012ae68 100644
--- a/satellite/audit/disqualification_test.go
+++ b/satellite/audit/disqualification_test.go
@@ -48,7 +48,7 @@ func TestDisqualificationTooManyFailedAudits(t *testing.T) {
 		satellitePeer = planet.Satellites[0]
 		nodeID        = planet.StorageNodes[0].ID()
 		report        = audit.Report{
-			Fails: storj.NodeIDList{nodeID},
+			Fails: metabase.Pieces{{StorageNode: nodeID}},
 		}
 	)
 	satellitePeer.Audit.Worker.Loop.Pause()
diff --git a/satellite/audit/reporter.go b/satellite/audit/reporter.go
index d487ac778..79df2b69d 100644
--- a/satellite/audit/reporter.go
+++ b/satellite/audit/reporter.go
@@ -11,6 +11,7 @@ import (
 	"go.uber.org/zap"
 
 	"storj.io/common/storj"
+	"storj.io/storj/satellite/metabase"
 	"storj.io/storj/satellite/overlay"
 	"storj.io/storj/satellite/reputation"
 )
@@ -22,6 +23,7 @@ type reporter struct {
 	log              *zap.Logger
 	reputations      *reputation.Service
 	overlay          *overlay.Service
+	metabase         *metabase.DB
 	containment      Containment
 	maxRetries       int
 	maxReverifyCount int32
@@ -40,8 +42,10 @@ type Reporter interface {
 // succeeded, failed, were offline, have pending audits, or failed for unknown
 // reasons and their current reputation status.
 type Report struct {
+	Segment *metabase.Segment
+
 	Successes       storj.NodeIDList
-	Fails           storj.NodeIDList
+	Fails           metabase.Pieces
 	Offlines        storj.NodeIDList
 	PendingAudits   []*ReverificationJob
 	Unknown         storj.NodeIDList
@@ -49,11 +53,12 @@ type Report struct {
 }
 
 // NewReporter instantiates a reporter.
-func NewReporter(log *zap.Logger, reputations *reputation.Service, overlay *overlay.Service, containment Containment, maxRetries int, maxReverifyCount int32) Reporter {
+func NewReporter(log *zap.Logger, reputations *reputation.Service, overlay *overlay.Service, metabase *metabase.DB, containment Containment, maxRetries int, maxReverifyCount int32) Reporter {
 	return &reporter{
 		log:              log,
 		reputations:      reputations,
 		overlay:          overlay,
+		metabase:         metabase,
 		containment:      containment,
 		maxRetries:       maxRetries,
 		maxReverifyCount: maxReverifyCount,
@@ -72,7 +77,11 @@ func (reporter *reporter) RecordAudits(ctx context.Context, req Report) {
 	offlines := req.Offlines
 	pendingAudits := req.PendingAudits
 
-	reporter.log.Debug("Reporting audits",
+	logger := reporter.log
+	if req.Segment != nil {
+		logger = logger.With(zap.Stringer("stream ID", req.Segment.StreamID), zap.Uint64("position", req.Segment.Position.Encode()))
+	}
+	logger.Debug("Reporting audits",
 		zap.Int("successes", len(successes)),
 		zap.Int("failures", len(fails)),
 		zap.Int("unknowns", len(unknowns)),
@@ -102,8 +111,8 @@ func (reporter *reporter) RecordAudits(ctx context.Context, req Report) {
 
 		successes, err = reporter.recordAuditStatus(ctx, successes, nodesReputation, reputation.AuditSuccess)
 		reportFailures(tries, "successful", err, successes, nil)
-		fails, err = reporter.recordAuditStatus(ctx, fails, nodesReputation, reputation.AuditFailure)
-		reportFailures(tries, "failed", err, fails, nil)
+		fails, err = reporter.recordFailedAudits(ctx, req.Segment, fails, nodesReputation)
+		reportFailures(tries, "failed", err, nil, nil)
 		unknowns, err = reporter.recordAuditStatus(ctx, unknowns, nodesReputation, reputation.AuditUnknown)
 		reportFailures(tries, "unknown", err, unknowns, nil)
 		offlines, err = reporter.recordAuditStatus(ctx, offlines, nodesReputation, reputation.AuditOffline)
@@ -124,7 +133,7 @@ func (reporter *reporter) recordAuditStatus(ctx context.Context, nodeIDs storj.N
 		err = reporter.reputations.ApplyAudit(ctx, nodeID, nodesReputation[nodeID], auditOutcome)
 		if err != nil {
 			failed = append(failed, nodeID)
-			errors.Add(Error.New("failed to record audit status %s in overlay for node %s: %w", auditOutcome.String(), nodeID.String(), err))
+			errors.Add(Error.New("failed to record audit status %s in overlay for node %s: %w", auditOutcome.String(), nodeID, err))
 		}
 	}
 	return failed, errors.Err()
@@ -182,6 +191,50 @@ func (reporter *reporter) recordPendingAudits(ctx context.Context, pendingAudits
 	return nil, nil
 }
 
+const maxPiecesToRemoveAtOnce = 6
+
+// recordFailedAudits performs reporting and response to hard-failed audits. Failed audits generally
+// mean the piece is gone. Remove the pieces from the relevant pointers so that the segment can be
+// repaired if appropriate, and so that we don't continually dock reputation for the same missing
+// piece(s).
+func (reporter *reporter) recordFailedAudits(ctx context.Context, segment *metabase.Segment, failures []metabase.Piece, nodesReputation map[storj.NodeID]overlay.ReputationStatus) (failedToRecord []metabase.Piece, err error) {
+	defer mon.Task()(&ctx)(&err)
+
+	piecesToRemove := make(metabase.Pieces, 0, len(failures))
+	var errors errs.Group
+	for _, f := range failures {
+		err = reporter.reputations.ApplyAudit(ctx, f.StorageNode, nodesReputation[f.StorageNode], reputation.AuditFailure)
+		if err != nil {
+			failedToRecord = append(failedToRecord, f)
+			errors.Add(Error.New("failed to record audit failure in overlay for node %s: %w", f.StorageNode, err))
+		}
+		piecesToRemove = append(piecesToRemove, f)
+	}
+	if segment != nil {
+		// Safety check. If, say, 30 pieces all started having audit failures at the same time, the
+		// problem is more likely with the audit system itself and not with the pieces.
+		if len(piecesToRemove) > maxPiecesToRemoveAtOnce {
+			reporter.log.Error("cowardly refusing to remove large number of pieces for failed audit",
+				zap.Int("piecesToRemove", len(piecesToRemove)),
+				zap.Int("threshold", maxPiecesToRemoveAtOnce))
+			return failedToRecord, errors.Err()
+		}
+		pieces, err := segment.Pieces.Remove(piecesToRemove)
+		if err != nil {
+			errors.Add(err)
+			return failedToRecord, errors.Err()
+		}
+		errors.Add(reporter.metabase.UpdateSegmentPieces(ctx, metabase.UpdateSegmentPieces{
+			StreamID:      segment.StreamID,
+			Position:      segment.Position,
+			OldPieces:     segment.Pieces,
+			NewRedundancy: segment.Redundancy,
+			NewPieces:     pieces,
+		}))
+	}
+	return failedToRecord, errors.Err()
+}
+
 func (reporter *reporter) ReportReverificationNeeded(ctx context.Context, piece *PieceLocator) (err error) {
 	defer mon.Task()(&ctx)(&err)
 
@@ -214,7 +267,26 @@ func (reporter *reporter) RecordReverificationResult(ctx context.Context, pendin
 		report.Successes = append(report.Successes, pendingJob.Locator.NodeID)
 		keepInQueue = false
 	case OutcomeFailure:
-		report.Fails = append(report.Fails, pendingJob.Locator.NodeID)
+		// We have to look up the segment metainfo and pass it on to RecordAudits so that
+		// the segment can be modified (removing this piece). We don't persist this
+		// information through the reverification queue.
+		segmentInfo, err := reporter.metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
+			StreamID: pendingJob.Locator.StreamID,
+			Position: pendingJob.Locator.Position,
+		})
+		if err != nil {
+			reporter.log.Error("could not look up segment after audit reverification",
+				zap.Stringer("stream ID", pendingJob.Locator.StreamID),
+				zap.Uint64("position", pendingJob.Locator.Position.Encode()),
+				zap.Error(err),
+			)
+		} else {
+			report.Segment = &segmentInfo
+		}
+		report.Fails = append(report.Fails, metabase.Piece{
+			StorageNode: pendingJob.Locator.NodeID,
+			Number:      uint16(pendingJob.Locator.PieceNum),
+		})
 		keepInQueue = false
 	case OutcomeTimedOut:
 		// This will get re-added to the reverification queue, but that is idempotent
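
Note: the heart of this patch is recordFailedAudits above. Hard audit failures now travel as metabase.Piece values (piece number plus storage node ID) rather than bare node IDs, because docking reputation only needs the node, while pruning the segment needs the piece number. The snippet below is a self-contained toy of what segment.Pieces.Remove(piecesToRemove) is assumed to do, namely a set difference keyed by piece number; the authoritative implementation lives in satellite/metabase, and plain strings stand in for storj.NodeID here:

	package main

	import "fmt"

	// piece is a toy stand-in for metabase.Piece.
	type piece struct {
		num  uint16
		node string
	}

	// removePieces mirrors the assumed semantics of Pieces.Remove: keep every
	// piece whose number is not listed for removal.
	func removePieces(have, toRemove []piece) []piece {
		drop := make(map[uint16]bool, len(toRemove))
		for _, p := range toRemove {
			drop[p.num] = true
		}
		kept := make([]piece, 0, len(have))
		for _, p := range have {
			if !drop[p.num] {
				kept = append(kept, p)
			}
		}
		return kept
	}

	func main() {
		segment := []piece{{0, "nodeA"}, {1, "nodeB"}, {2, "nodeC"}}
		failed := []piece{{1, "nodeB"}}
		fmt.Println(removePieces(segment, failed)) // [{0 nodeA} {2 nodeC}]
	}

Two related details: UpdateSegmentPieces receives both OldPieces and NewPieces, which reads like an optimistic-concurrency guard against clobbering a segment that changed between the audit and the report, and maxPiecesToRemoveAtOnce acts as a circuit breaker, on the theory that many simultaneous failures more plausibly indicate a bug in the audit system than that many independently lost pieces.
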
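
Note: the new end-to-end test only holds together because of the chosen redundancy. testplanet.ReconfigureRS(4, 5, 6, 6) stores 6 pieces per segment and requires 4 of them to decode, so removing the single audit-failed piece leaves 5, and the final Download is still expected to return the original data. A minimal check of that arithmetic:

	package main

	import "fmt"

	func main() {
		// ReconfigureRS(4, 5, 6, 6): 4 pieces required to decode, 6 stored in total.
		// Dropping the one failed piece leaves 5 >= 4, so the segment stays retrievable.
		const required, total, removed = 4, 6, 1
		fmt.Println("still retrievable:", total-removed >= required) // still retrievable: true
	}
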
diff --git a/satellite/audit/verifier.go b/satellite/audit/verifier.go
index 457e6d7ae..fda902d3c 100644
--- a/satellite/audit/verifier.go
+++ b/satellite/audit/verifier.go
@@ -130,7 +130,7 @@ func (verifier *Verifier) Verify(ctx context.Context, segment Segment, skip map[
 	}
 
 	var offlineNodes storj.NodeIDList
-	var failedNodes storj.NodeIDList
+	var failedNodes metabase.Pieces
 	var unknownNodes storj.NodeIDList
 	containedNodes := make(map[int]storj.NodeID)
 	sharesToAudit := make(map[int]Share)
@@ -206,7 +206,10 @@ func (verifier *Verifier) Verify(ctx context.Context, segment Segment, skip map[
 		case RequestFailure:
 			if errs2.IsRPC(share.Error, rpcstatus.NotFound) {
 				// missing share
-				failedNodes = append(failedNodes, share.NodeID)
+				failedNodes = append(failedNodes, metabase.Piece{
+					Number:      uint16(share.PieceNum),
+					StorageNode: share.NodeID,
+				})
 				errLogger.Info("Verify: piece not found (audit failed)")
 				continue
 			}
@@ -258,6 +261,7 @@ func (verifier *Verifier) Verify(ctx context.Context, segment Segment, skip map[
 		mon.Counter("could_not_verify_audit_shares").Inc(1) //mon:locked
 		verifier.log.Error("could not verify shares", zap.String("Segment", segmentInfoString(segment)), zap.Error(err))
 		return Report{
+			Segment:  &segmentInfo,
 			Fails:    failedNodes,
 			Offlines: offlineNodes,
 			Unknown:  unknownNodes,
@@ -268,7 +272,10 @@ func (verifier *Verifier) Verify(ctx context.Context, segment Segment, skip map[
 			verifier.log.Info("Verify: share data altered (audit failed)",
 				zap.Stringer("Node ID", shares[pieceNum].NodeID),
 				zap.String("Segment", segmentInfoString(segment)))
-			failedNodes = append(failedNodes, shares[pieceNum].NodeID)
+			failedNodes = append(failedNodes, metabase.Piece{
+				StorageNode: shares[pieceNum].NodeID,
+				Number:      uint16(pieceNum),
+			})
 		}
 
 	successNodes := getSuccessNodes(ctx, shares, failedNodes, offlineNodes, unknownNodes, containedNodes)
@@ -276,6 +283,7 @@ func (verifier *Verifier) Verify(ctx context.Context, segment Segment, skip map[
 	pendingAudits, err := createPendingAudits(ctx, containedNodes, segment)
 	if err != nil {
 		return Report{
+			Segment:   &segmentInfo,
 			Successes: successNodes,
 			Fails:     failedNodes,
 			Offlines:  offlineNodes,
@@ -284,6 +292,7 @@ func (verifier *Verifier) Verify(ctx context.Context, segment Segment, skip map[
 	}
 
 	return Report{
+		Segment:   &segmentInfo,
 		Successes: successNodes,
 		Fails:     failedNodes,
 		Offlines:  offlineNodes,
@@ -542,11 +551,11 @@ func getOfflineNodes(segment metabase.Segment, limits []*pb.AddressedOrderLimit,
 }
 
 // getSuccessNodes uses the failed nodes, offline nodes and contained nodes arrays to determine which nodes passed the audit.
-func getSuccessNodes(ctx context.Context, shares map[int]Share, failedNodes, offlineNodes, unknownNodes storj.NodeIDList, containedNodes map[int]storj.NodeID) (successNodes storj.NodeIDList) {
+func getSuccessNodes(ctx context.Context, shares map[int]Share, failedNodes metabase.Pieces, offlineNodes, unknownNodes storj.NodeIDList, containedNodes map[int]storj.NodeID) (successNodes storj.NodeIDList) {
 	defer mon.Task()(&ctx)(nil)
 	fails := make(map[storj.NodeID]bool)
 	for _, fail := range failedNodes {
-		fails[fail] = true
+		fails[fail.StorageNode] = true
 	}
 	for _, offline := range offlineNodes {
 		fails[offline] = true
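
Note: the verifier now records failures per piece, while getSuccessNodes still reasons per node, so the failed-piece list is projected down to a node-keyed exclusion set (the fails map above). A toy version of that projection, with strings standing in for storj.NodeID:

	package main

	import "fmt"

	// piece is a toy stand-in for metabase.Piece; failures carry both the
	// piece number and the node that held it.
	type piece struct {
		num  uint16
		node string
	}

	// successNodes keeps every share-holding node that is not excluded by a
	// per-piece failure, mirroring the shape of getSuccessNodes.
	func successNodes(shareNodes []string, failed []piece) (ok []string) {
		excluded := make(map[string]bool, len(failed))
		for _, f := range failed {
			excluded[f.node] = true
		}
		for _, n := range shareNodes {
			if !excluded[n] {
				ok = append(ok, n)
			}
		}
		return ok
	}

	func main() {
		nodes := []string{"nodeA", "nodeB", "nodeC"}
		fmt.Println(successNodes(nodes, []piece{{1, "nodeB"}})) // [nodeA nodeC]
	}
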
diff --git a/satellite/audit/verifier_test.go b/satellite/audit/verifier_test.go
index 17caaf4fe..2b6d78eb7 100644
--- a/satellite/audit/verifier_test.go
+++ b/satellite/audit/verifier_test.go
@@ -968,7 +968,15 @@ func TestVerifierModifiedSegmentFailsOnce(t *testing.T) {
 
 		assert.Len(t, report.Successes, origNumPieces-1)
 		require.Len(t, report.Fails, 1)
-		assert.Equal(t, report.Fails[0], piece.StorageNode)
+		assert.Equal(t, metabase.Piece{
+			StorageNode: piece.StorageNode,
+			Number:      piece.Number,
+		}, report.Fails[0])
+		require.NotNil(t, report.Segment)
+		assert.Equal(t, segment.StreamID, report.Segment.StreamID)
+		assert.Equal(t, segment.Position, report.Segment.Position)
+		assert.Equal(t, segment.Redundancy, report.Segment.Redundancy)
+		assert.Equal(t, segment.Pieces, report.Segment.Pieces)
 		assert.Len(t, report.Offlines, 0)
 		require.Len(t, report.PendingAudits, 0)
 	})
@@ -1196,7 +1204,15 @@ func TestAuditRepairedSegmentInExcludedCountries(t *testing.T) {
 		}, nil)
 		require.NoError(t, err)
 		require.Len(t, report.Fails, 1)
-		require.Equal(t, report.Fails[0], lastPiece.StorageNode)
+		require.Equal(t, metabase.Piece{
+			StorageNode: lastPiece.StorageNode,
+			Number:      lastPiece.Number,
+		}, report.Fails[0])
+		require.NotNil(t, report.Segment)
+		assert.Equal(t, segmentAfterRepair.StreamID, report.Segment.StreamID)
+		assert.Equal(t, segmentAfterRepair.Position, report.Segment.Position)
+		assert.Equal(t, segmentAfterRepair.Redundancy, report.Segment.Redundancy)
+		assert.Equal(t, segmentAfterRepair.Pieces, report.Segment.Pieces)
 	})
 }
 
diff --git a/satellite/auditor.go b/satellite/auditor.go
index e61b0ab51..af3ee06de 100644
--- a/satellite/auditor.go
+++ b/satellite/auditor.go
@@ -219,6 +219,7 @@ func NewAuditor(log *zap.Logger, full *identity.FullIdentity,
 			log.Named("reporter"),
 			peer.Reputation,
 			peer.Overlay,
+			metabaseDB,
 			containmentDB,
 			config.Audit.MaxRetriesStatDB,
 			int32(config.Audit.MaxReverifyCount))
diff --git a/satellite/repair/repairer/segments.go b/satellite/repair/repairer/segments.go
index 9e9e0086d..8a6d55001 100644
--- a/satellite/repair/repairer/segments.go
+++ b/satellite/repair/repairer/segments.go
@@ -530,6 +530,7 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
 	}
 
 	report := audit.Report{
+		Segment:         &segment,
 		NodesReputation: cachedNodesReputation,
 	}
 
@@ -537,7 +538,10 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
 		report.Successes = append(report.Successes, outcome.Piece.StorageNode)
 	}
 	for _, outcome := range piecesReport.Failed {
-		report.Fails = append(report.Fails, outcome.Piece.StorageNode)
+		report.Fails = append(report.Fails, metabase.Piece{
+			StorageNode: outcome.Piece.StorageNode,
+			Number:      outcome.Piece.Number,
+		})
 	}
 	for _, outcome := range piecesReport.Offline {
 		report.Offlines = append(report.Offlines, outcome.Piece.StorageNode)
diff --git a/satellite/repairer.go b/satellite/repairer.go
index 325f83d97..a327cff01 100644
--- a/satellite/repairer.go
+++ b/satellite/repairer.go
@@ -195,6 +195,7 @@ func NewRepairer(log *zap.Logger, full *identity.FullIdentity,
 			log.Named("reporter"),
 			peer.Reputation,
 			peer.Overlay,
+			metabaseDB,
 			containmentDB,
 			config.Audit.MaxRetriesStatDB,
 			int32(config.Audit.MaxReverifyCount))
diff --git a/satellite/reputation/suspension_test.go b/satellite/reputation/suspension_test.go
index b6d392bc6..5fec3f0b9 100644
--- a/satellite/reputation/suspension_test.go
+++ b/satellite/reputation/suspension_test.go
@@ -16,6 +16,7 @@ import (
 	"storj.io/storj/private/testplanet"
 	"storj.io/storj/satellite"
 	"storj.io/storj/satellite/audit"
+	"storj.io/storj/satellite/metabase"
 	"storj.io/storj/satellite/overlay"
 	"storj.io/storj/satellite/reputation"
 )
@@ -182,7 +183,7 @@ func TestAuditSuspendExceedGracePeriod(t *testing.T) {
 		// give one node a successful audit, one a failed audit, one an offline audit, and one an unknown audit
 		report := audit.Report{
 			Successes:       storj.NodeIDList{successNodeID},
-			Fails:           storj.NodeIDList{failNodeID},
+			Fails:           metabase.Pieces{{StorageNode: failNodeID}},
 			Offlines:        storj.NodeIDList{offlineNodeID},
 			Unknown:         storj.NodeIDList{unknownNodeID},
 			NodesReputation: nodesStatus,
@@ -248,7 +249,7 @@ func TestAuditSuspendDQDisabled(t *testing.T) {
 		// give one node a successful audit, one a failed audit, one an offline audit, and one an unknown audit
 		report := audit.Report{
 			Successes:       storj.NodeIDList{successNodeID},
-			Fails:           storj.NodeIDList{failNodeID},
+			Fails:           metabase.Pieces{{StorageNode: failNodeID}},
 			Offlines:        storj.NodeIDList{offlineNodeID},
 			Unknown:         storj.NodeIDList{unknownNodeID},
 			NodesReputation: nodesStatus,