satellite/audit: fix containment mode (#3085)

* add a test to make sure we reverify the share recorded in the containment db rather than a share from the pointer passed into Reverify

* use pending audit information only when running reverify
Maximillian von Briesen 2019-09-18 19:45:15 -04:00 committed by littleskunk
parent 1c72e80e40
commit a4048fd529
2 changed files with 184 additions and 53 deletions
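
The heart of the fix: Reverify no longer takes the piece number from the pointer of the segment currently being audited; for each contained node it re-fetches the pointer stored at the pending audit's own path and looks up that node's piece number there. Below is a minimal, self-contained sketch of that lookup. The types are simplified stand-ins for the satellite's PendingAudit and RemotePiece, not the real definitions.

package main

import (
    "errors"
    "fmt"
)

type NodeID string

// PendingAudit is a simplified stand-in for the containment db record.
type PendingAudit struct {
    NodeID NodeID
    Path   string // segment path the node was originally audited on
}

// RemotePiece is a simplified stand-in for a piece entry in a segment pointer.
type RemotePiece struct {
    NodeID   NodeID
    PieceNum int32
}

// findPieceNum mirrors the loop added in this commit: scan the pieces of the
// pending segment and return the piece number held by the contained node.
func findPieceNum(pieces []RemotePiece, nodeID NodeID) (int32, error) {
    for _, piece := range pieces {
        if piece.NodeID == nodeID {
            return piece.PieceNum, nil
        }
    }
    return 0, errors.New("could not find node in pointer to audit")
}

func main() {
    pending := PendingAudit{NodeID: "node-a", Path: "bucket/path1"}
    // In the satellite this slice would come from metainfo.Get(ctx, pending.Path).
    pieces := []RemotePiece{{NodeID: "node-b", PieceNum: 0}, {NodeID: "node-a", PieceNum: 2}}

    pieceNum, err := findPieceNum(pieces, pending.NodeID)
    if err != nil {
        fmt.Println("skip reverification:", err)
        return
    }
    fmt.Println("reverify piece", pieceNum, "of segment", pending.Path)
}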


@@ -519,3 +519,113 @@ func TestReverifyModifiedSegment(t *testing.T) {
        require.True(t, audit.ErrContainedNotFound.Has(err))
    })
}

func TestReverifyDifferentShare(t *testing.T) {
    testplanet.Run(t, testplanet.Config{
        SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
    }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
        // - uploads random data to two files
        // - get a random stripe to audit from file 1
        // - creates one pending audit for a node holding a piece for that stripe
        // - the actual share is downloaded to make sure ExpectedShareHash is correct
        // - delete piece for file 1 from the selected node
        // - calls reverify on some stripe from file 2
        // - expects one storage node to be marked as a fail in the audit report
        // - (if file 2 is used during reverify, the node will pass the audit and the test should fail)

        satellite := planet.Satellites[0]
        audits := satellite.Audit
        queue := audits.Queue

        audits.Worker.Loop.Pause()

        ul := planet.Uplinks[0]
        testData1 := testrand.Bytes(8 * memory.KiB)
        testData2 := testrand.Bytes(8 * memory.KiB)

        err := ul.Upload(ctx, satellite, "testbucket", "test/path1", testData1)
        require.NoError(t, err)

        err = ul.Upload(ctx, satellite, "testbucket", "test/path2", testData2)
        require.NoError(t, err)

        audits.Chore.Loop.TriggerWait()
        path1, err := queue.Next()
        require.NoError(t, err)
        path2, err := queue.Next()
        require.NoError(t, err)
        require.NotEqual(t, path1, path2)

        pointer1, err := satellite.Metainfo.Service.Get(ctx, path1)
        require.NoError(t, err)
        pointer2, err := satellite.Metainfo.Service.Get(ctx, path2)
        require.NoError(t, err)

        // find a node that contains a piece for both files
        // save that node ID and the piece number associated with it for pointer1
        var selectedNode storj.NodeID
        var selectedPieceNum int32
        p1Nodes := make(map[storj.NodeID]int32)
        for _, piece := range pointer1.GetRemote().GetRemotePieces() {
            p1Nodes[piece.NodeId] = piece.PieceNum
        }
        for _, piece := range pointer2.GetRemote().GetRemotePieces() {
            pieceNum, ok := p1Nodes[piece.NodeId]
            if ok {
                selectedNode = piece.NodeId
                selectedPieceNum = pieceNum
                break
            }
        }
        require.NotEqual(t, selectedNode, storj.NodeID{})

        randomIndex, err := audit.GetRandomStripe(ctx, pointer1)
        require.NoError(t, err)

        orders := satellite.Orders.Service
        containment := satellite.DB.Containment()

        projects, err := satellite.DB.Console().Projects().GetAll(ctx)
        require.NoError(t, err)

        bucketID := []byte(storj.JoinPaths(projects[0].ID.String(), "testbucket"))
        shareSize := pointer1.GetRemote().GetRedundancy().GetErasureShareSize()

        rootPieceID := pointer1.GetRemote().RootPieceId
        limit, privateKey, err := orders.CreateAuditOrderLimit(ctx, bucketID, selectedNode, selectedPieceNum, rootPieceID, shareSize)
        require.NoError(t, err)

        share, err := audits.Verifier.GetShare(ctx, limit, privateKey, randomIndex, shareSize, int(selectedPieceNum))
        require.NoError(t, err)

        pending := &audit.PendingAudit{
            NodeID:            selectedNode,
            PieceID:           rootPieceID,
            StripeIndex:       randomIndex,
            ShareSize:         shareSize,
            ExpectedShareHash: pkcrypto.SHA256Hash(share.Data),
            ReverifyCount:     0,
            Path:              path1,
        }

        err = containment.IncrementPending(ctx, pending)
        require.NoError(t, err)

        // delete the piece for pointer1 from the selected node
        pieceID := pointer1.GetRemote().RootPieceId.Derive(selectedNode, selectedPieceNum)
        node := getStorageNode(planet, selectedNode)
        err = node.Storage2.Store.Delete(ctx, satellite.ID(), pieceID)
        require.NoError(t, err)

        // reverify with path 2. Since the selected node was put in containment for path1,
        // it should be audited for path1 and fail
        report, err := audits.Verifier.Reverify(ctx, path2)
        require.NoError(t, err)

        require.Len(t, report.Successes, 0)
        require.Len(t, report.Offlines, 0)
        require.Len(t, report.PendingAudits, 0)
        require.Len(t, report.Fails, 1)
        require.Equal(t, report.Fails[0], selectedNode)
    })
}
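
The test hinges on the expected-share hash stored in the containment db: it is the SHA-256 of the share downloaded from file 1, so when Reverify later re-audits the contained node it must fetch that same share again (not one from file 2) for the hashes to match. A minimal sketch of that comparison follows, using crypto/sha256 from the standard library as a stand-in for storj's pkcrypto.SHA256Hash.

package main

import (
    "bytes"
    "crypto/sha256"
    "fmt"
)

// shareMatches reports whether a freshly downloaded share hashes to the
// ExpectedShareHash that was stored when the pending audit was created.
func shareMatches(expectedShareHash, downloadedShare []byte) bool {
    downloadedHash := sha256.Sum256(downloadedShare)
    return bytes.Equal(downloadedHash[:], expectedShareHash)
}

func main() {
    original := []byte("share downloaded when the pending audit was created")
    expected := sha256.Sum256(original)

    // Same bytes again: the node would pass the reverification.
    fmt.Println(shareMatches(expected[:], original)) // true

    // The piece was deleted or altered: the node fails the audit.
    fmt.Println(shareMatches(expected[:], []byte("different data"))) // false
}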


@@ -340,41 +340,62 @@ func (verifier *Verifier) Reverify(ctx context.Context, path storj.Path) (report
         }
         containedInSegment++
-        go func(pending *PendingAudit, piece *pb.RemotePiece) {
-            limit, piecePrivateKey, err := verifier.orders.CreateAuditOrderLimit(ctx, createBucketID(path), pending.NodeID, piece.PieceNum, pending.PieceID, pending.ShareSize)
-            if err != nil {
-                if overlay.ErrNodeDisqualified.Has(err) {
-                    _, errDelete := verifier.containment.Delete(ctx, piece.NodeId)
-                    if errDelete != nil {
-                        verifier.log.Debug("Error deleting disqualified node from containment db", zap.String("Segment Path", path), zap.Stringer("Node ID", piece.NodeId), zap.Error(err))
-                        err = errs.Combine(err, errDelete)
-                    }
-                    ch <- result{nodeID: piece.NodeId, status: erred, err: err}
-                    verifier.log.Debug("Reverify: order limit not created (disqualified)", zap.String("Segment Path", path), zap.Stringer("Node ID", piece.NodeId))
-                    return
-                }
-                if overlay.ErrNodeOffline.Has(err) {
-                    ch <- result{nodeID: piece.NodeId, status: offline}
-                    verifier.log.Debug("Reverify: order limit not created (offline)", zap.String("Segment Path", path), zap.Stringer("Node ID", piece.NodeId))
-                    return
-                }
-                ch <- result{nodeID: piece.NodeId, status: erred, err: err}
-                verifier.log.Debug("Reverify: error creating order limit", zap.String("Segment Path", path), zap.Stringer("Node ID", piece.NodeId), zap.Error(err))
-                return
-            }
-            share, err := verifier.GetShare(ctx, limit, piecePrivateKey, pending.StripeIndex, pending.ShareSize, int(piece.PieceNum))
-            // check if the pending audit was deleted while downloading the share
-            _, getErr := verifier.containment.Get(ctx, piece.NodeId)
-            if getErr != nil {
-                if ErrContainedNotFound.Has(getErr) {
-                    ch <- result{nodeID: piece.NodeId, status: skipped}
-                    verifier.log.Debug("Reverify: pending audit deleted during reverification", zap.String("Segment Path", path), zap.Stringer("Node ID", piece.NodeId), zap.Error(getErr))
-                    return
-                }
-                ch <- result{nodeID: piece.NodeId, status: erred, err: getErr}
-                verifier.log.Debug("Reverify: error getting from containment db", zap.String("Segment Path", path), zap.Stringer("Node ID", piece.NodeId), zap.Error(getErr))
-                return
-            }
+        go func(pending *PendingAudit) {
+            // TODO perhaps we should save piece number as part of the pending audit so we do not need to use metainfo here
+            pendingPointer, err := verifier.metainfo.Get(ctx, pending.Path)
+            if err != nil {
+                ch <- result{nodeID: pending.NodeID, status: erred, err: err}
+                verifier.log.Debug("Reverify: error getting pending pointer from metainfo", zap.String("Segment Path", pending.Path), zap.Stringer("Node ID", pending.NodeID), zap.Error(err))
+                return
+            }
+            var pieceNum int32
+            found := false
+            for _, piece := range pendingPointer.GetRemote().GetRemotePieces() {
+                if piece.NodeId == pending.NodeID {
+                    pieceNum = piece.PieceNum
+                    found = true
+                }
+            }
+            if !found {
+                ch <- result{nodeID: pending.NodeID, status: erred, err: err}
+                verifier.log.Debug("Reverify: could not find node in pointer to audit", zap.String("Segment Path", pending.Path), zap.Stringer("Node ID", pending.NodeID))
+                return
+            }
+            limit, piecePrivateKey, err := verifier.orders.CreateAuditOrderLimit(ctx, createBucketID(pending.Path), pending.NodeID, pieceNum, pending.PieceID, pending.ShareSize)
+            if err != nil {
+                if overlay.ErrNodeDisqualified.Has(err) {
+                    _, errDelete := verifier.containment.Delete(ctx, pending.NodeID)
+                    if errDelete != nil {
+                        verifier.log.Debug("Error deleting disqualified node from containment db", zap.String("Segment Path", pending.Path), zap.Stringer("Node ID", pending.NodeID), zap.Error(err))
+                        err = errs.Combine(err, errDelete)
+                    }
+                    ch <- result{nodeID: pending.NodeID, status: erred, err: err}
+                    verifier.log.Debug("Reverify: order limit not created (disqualified)", zap.String("Segment Path", pending.Path), zap.Stringer("Node ID", pending.NodeID))
+                    return
+                }
+                if overlay.ErrNodeOffline.Has(err) {
+                    ch <- result{nodeID: pending.NodeID, status: offline}
+                    verifier.log.Debug("Reverify: order limit not created (offline)", zap.String("Segment Path", pending.Path), zap.Stringer("Node ID", pending.NodeID))
+                    return
+                }
+                ch <- result{nodeID: pending.NodeID, status: erred, err: err}
+                verifier.log.Debug("Reverify: error creating order limit", zap.String("Segment Path", pending.Path), zap.Stringer("Node ID", pending.NodeID), zap.Error(err))
+                return
+            }
+            share, err := verifier.GetShare(ctx, limit, piecePrivateKey, pending.StripeIndex, pending.ShareSize, int(pieceNum))
+            // check if the pending audit was deleted while downloading the share
+            _, getErr := verifier.containment.Get(ctx, pending.NodeID)
+            if getErr != nil {
+                if ErrContainedNotFound.Has(getErr) {
+                    ch <- result{nodeID: pending.NodeID, status: skipped}
+                    verifier.log.Debug("Reverify: pending audit deleted during reverification", zap.String("Segment Path", pending.Path), zap.Stringer("Node ID", pending.NodeID), zap.Error(getErr))
+                    return
+                }
+                ch <- result{nodeID: pending.NodeID, status: erred, err: getErr}
+                verifier.log.Debug("Reverify: error getting from containment db", zap.String("Segment Path", pending.Path), zap.Stringer("Node ID", pending.NodeID), zap.Error(getErr))
+                return
+            }
@@ -383,71 +404,71 @@ func (verifier *Verifier) Reverify(ctx context.Context, path storj.Path) (report
                 if transport.Error.Has(err) {
                     if errs.Is(err, context.DeadlineExceeded) {
                         // dial timeout
-                        ch <- result{nodeID: piece.NodeId, status: offline}
-                        verifier.log.Debug("Reverify: dial timeout (offline)", zap.String("Segment Path", path), zap.Stringer("Node ID", piece.NodeId), zap.Error(err))
+                        ch <- result{nodeID: pending.NodeID, status: offline}
+                        verifier.log.Debug("Reverify: dial timeout (offline)", zap.String("Segment Path", path), zap.Stringer("Node ID", pending.NodeID), zap.Error(err))
                         return
                     }
                     if errs2.IsRPC(err, codes.Unknown) {
                         // dial failed -- offline node
-                        verifier.log.Debug("Reverify: dial failed (offline)", zap.String("Segment Path", path), zap.Stringer("Node ID", piece.NodeId), zap.Error(err))
-                        ch <- result{nodeID: piece.NodeId, status: offline}
+                        verifier.log.Debug("Reverify: dial failed (offline)", zap.String("Segment Path", path), zap.Stringer("Node ID", pending.NodeID), zap.Error(err))
+                        ch <- result{nodeID: pending.NodeID, status: offline}
                         return
                     }
                     // unknown transport error
-                    ch <- result{nodeID: piece.NodeId, status: contained, pendingAudit: pending}
-                    verifier.log.Debug("Reverify: unknown transport error (contained)", zap.String("Segment Path", path), zap.Stringer("Node ID", piece.NodeId), zap.Error(err))
+                    ch <- result{nodeID: pending.NodeID, status: contained, pendingAudit: pending}
+                    verifier.log.Debug("Reverify: unknown transport error (contained)", zap.String("Segment Path", path), zap.Stringer("Node ID", pending.NodeID), zap.Error(err))
                     return
                 }
                 if errs2.IsRPC(err, codes.NotFound) {
                     // Get the original segment pointer in the metainfo
-                    oldPtr, err := verifier.checkIfSegmentAltered(ctx, pending.Path, pointer)
+                    oldPtr, err := verifier.checkIfSegmentAltered(ctx, pending.Path, pendingPointer)
                     if err != nil {
-                        ch <- result{nodeID: piece.NodeId, status: success}
-                        verifier.log.Debug("Reverify: audit source deleted before reverification", zap.String("Segment Path", path), zap.Stringer("Node ID", piece.NodeId), zap.Error(err))
+                        ch <- result{nodeID: pending.NodeID, status: success}
+                        verifier.log.Debug("Reverify: audit source deleted before reverification", zap.String("Segment Path", path), zap.Stringer("Node ID", pending.NodeID), zap.Error(err))
                         return
                     }
                     // remove failed audit pieces from the pointer so as to only penalize once for failed audits
                     err = verifier.removeFailedPieces(ctx, pending.Path, oldPtr, storj.NodeIDList{pending.NodeID})
                     if err != nil {
-                        verifier.log.Warn("Reverify: failed to delete failed pieces", zap.String("Segment Path", path), zap.Stringer("Node ID", piece.NodeId), zap.Error(err))
+                        verifier.log.Warn("Reverify: failed to delete failed pieces", zap.String("Segment Path", path), zap.Stringer("Node ID", pending.NodeID), zap.Error(err))
                     }
                     // missing share
-                    ch <- result{nodeID: piece.NodeId, status: failed}
-                    verifier.log.Debug("Reverify: piece not found (audit failed)", zap.String("Segment Path", path), zap.Stringer("Node ID", piece.NodeId), zap.Error(err))
+                    ch <- result{nodeID: pending.NodeID, status: failed}
+                    verifier.log.Debug("Reverify: piece not found (audit failed)", zap.String("Segment Path", path), zap.Stringer("Node ID", pending.NodeID), zap.Error(err))
                     return
                 }
                 if errs2.IsRPC(err, codes.DeadlineExceeded) {
                     // dial successful, but download timed out
-                    ch <- result{nodeID: piece.NodeId, status: contained, pendingAudit: pending}
-                    verifier.log.Debug("Reverify: download timeout (contained)", zap.String("Segment Path", path), zap.Stringer("Node ID", piece.NodeId), zap.Error(err))
+                    ch <- result{nodeID: pending.NodeID, status: contained, pendingAudit: pending}
+                    verifier.log.Debug("Reverify: download timeout (contained)", zap.String("Segment Path", path), zap.Stringer("Node ID", pending.NodeID), zap.Error(err))
                     return
                 }
                 // unknown error
-                ch <- result{nodeID: piece.NodeId, status: contained, pendingAudit: pending}
-                verifier.log.Debug("Reverify: unknown error (contained)", zap.String("Segment Path", path), zap.Stringer("Node ID", piece.NodeId), zap.Error(err))
+                ch <- result{nodeID: pending.NodeID, status: contained, pendingAudit: pending}
+                verifier.log.Debug("Reverify: unknown error (contained)", zap.String("Segment Path", path), zap.Stringer("Node ID", pending.NodeID), zap.Error(err))
                 return
             }
             downloadedHash := pkcrypto.SHA256Hash(share.Data)
             if bytes.Equal(downloadedHash, pending.ExpectedShareHash) {
-                ch <- result{nodeID: piece.NodeId, status: success}
-                verifier.log.Debug("Reverify: hashes match (audit success)", zap.String("Segment Path", path), zap.Stringer("Node ID", piece.NodeId))
+                ch <- result{nodeID: pending.NodeID, status: success}
+                verifier.log.Debug("Reverify: hashes match (audit success)", zap.String("Segment Path", path), zap.Stringer("Node ID", pending.NodeID))
             } else {
-                oldPtr, err := verifier.checkIfSegmentAltered(ctx, pending.Path, pointer)
+                oldPtr, err := verifier.checkIfSegmentAltered(ctx, pending.Path, pendingPointer)
                 if err != nil {
-                    ch <- result{nodeID: piece.NodeId, status: success}
-                    verifier.log.Debug("Reverify: audit source deleted before reverification", zap.String("Segment Path", path), zap.Stringer("Node ID", piece.NodeId), zap.Error(err))
+                    ch <- result{nodeID: pending.NodeID, status: success}
+                    verifier.log.Debug("Reverify: audit source deleted before reverification", zap.String("Segment Path", path), zap.Stringer("Node ID", pending.NodeID), zap.Error(err))
                     return
                 }
                 // remove failed audit pieces from the pointer so as to only penalize once for failed audits
                 err = verifier.removeFailedPieces(ctx, pending.Path, oldPtr, storj.NodeIDList{pending.NodeID})
                 if err != nil {
-                    verifier.log.Warn("Reverify: failed to delete failed pieces", zap.String("Segment Path", path), zap.Stringer("Node ID", piece.NodeId), zap.Error(err))
+                    verifier.log.Warn("Reverify: failed to delete failed pieces", zap.String("Segment Path", pending.Path), zap.Stringer("Node ID", pending.NodeID), zap.Error(err))
                 }
-                verifier.log.Debug("Reverify: hashes mismatch (audit failed)", zap.String("Segment Path", path), zap.Stringer("Node ID", piece.NodeId),
+                verifier.log.Debug("Reverify: hashes mismatch (audit failed)", zap.String("Segment Path", pending.Path), zap.Stringer("Node ID", pending.NodeID),
                     zap.Binary("expected hash", pending.ExpectedShareHash), zap.Binary("downloaded hash", downloadedHash))
-                ch <- result{nodeID: piece.NodeId, status: failed}
+                ch <- result{nodeID: pending.NodeID, status: failed}
             }
-        }(pending, piece)
+        }(pending)
     }
     report = &Report{}
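
For context, Reverify fans out one goroutine per contained node and collects each outcome from a channel into the report, which is why every branch above ends by sending exactly one result on ch. A simplified, self-contained sketch of that pattern follows; the names are illustrative stand-ins, not the satellite's API.

package main

import "fmt"

type status int

const (
    success status = iota
    offline
    failed
)

type result struct {
    nodeID string
    status status
}

func main() {
    contained := []string{"node-a", "node-b", "node-c"}
    ch := make(chan result, len(contained))

    // fan out: one goroutine per contained node, as Reverify does
    for _, id := range contained {
        go func(nodeID string) {
            // a real worker would create an order limit, download the share,
            // and compare its hash against the stored ExpectedShareHash
            ch <- result{nodeID: nodeID, status: failed}
        }(id)
    }

    // fan in: read exactly one result per contained node and build the report
    var fails []string
    for i := 0; i < len(contained); i++ {
        r := <-ch
        if r.status == failed {
            fails = append(fails, r.nodeID)
        }
    }
    fmt.Println("Fails:", fails)
}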