satellite/audit: fix checkIfSegmentAltered to detect segments that have changed during an audit

- Previously, checkIfSegmentAltered only detected segments that had been replaced,
  but we want to detect every change to a segment that occurs while an audit is being
  conducted (see the sketch below).
- Fixed a bug where nodes failing audits during reverify for non-piece-hash-verified
  segments were not being removed from containment mode.
- Filled in gaps in reverify testing to ensure nodes are properly removed from containment.
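
A minimal, dependency-free sketch of the idea behind the fix (illustrative only; the
real implementation is checkIfSegmentAltered in verifier.go below, which uses
metainfo.GetWithBytes and the errs classes ErrSegmentDeleted and ErrSegmentModified —
plain errors stand in for them here):

package main

import (
	"bytes"
	"errors"
	"fmt"
)

// Plain errors stand in for the satellite's errs classes in this sketch.
var (
	errSegmentDeleted  = errors.New("segment deleted during audit")
	errSegmentModified = errors.New("segment has been modified")
)

// segmentAltered: comparing only creation dates catches a replaced segment;
// comparing the serialized pointer bytes additionally catches any
// modification, e.g. a piece added or removed while the audit was running.
func segmentAltered(oldBytes, newBytes []byte, deleted bool) error {
	if deleted {
		return errSegmentDeleted
	}
	if !bytes.Equal(oldBytes, newBytes) {
		return errSegmentModified
	}
	return nil
}

func main() {
	old := []byte(`pieces{0,1,2,3}`)
	modified := []byte(`pieces{0,2,3}`) // a piece was removed mid-audit
	fmt.Println(segmentAltered(old, old, false))      // <nil>
	fmt.Println(segmentAltered(old, modified, false)) // segment has been modified
}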

Change-Id: Icd96d369278987200fd28581395725438972b292
Author: Jennifer Johnson, 2020-03-04 18:09:18 -05:00 (committed by Jennifer Li Johnson)
Parent: e6d452decd
Commit: 0d60c1a4b2
3 changed files with 217 additions and 32 deletions

View File

@@ -101,6 +101,14 @@ func TestReverifySuccess(t *testing.T) {
require.Len(t, report.PendingAudits, 0)
require.Len(t, report.Successes, 1)
require.Equal(t, report.Successes[0], pieces[0].NodeId)
// record audit
_, err = audits.Reporter.RecordAudits(ctx, report, path)
require.NoError(t, err)
// make sure that pending audit is removed by the reporter when audit is recorded
_, err = containment.Get(ctx, pending.NodeID)
require.True(t, audit.ErrContainedNotFound.Has(err))
})
}
@@ -183,6 +191,14 @@ func TestReverifyFailMissingShare(t *testing.T) {
require.Len(t, report.PendingAudits, 0)
require.Len(t, report.Fails, 1)
require.Equal(t, report.Fails[0], pieces[0].NodeId)
// record audit
_, err = audits.Reporter.RecordAudits(ctx, report, path)
require.NoError(t, err)
// make sure that pending audit is removed by the reporter when audit is recorded
_, err = containment.Get(ctx, pending.NodeID)
require.True(t, audit.ErrContainedNotFound.Has(err))
})
}
@@ -267,6 +283,10 @@ func TestReverifyFailMissingShareNotVerified(t *testing.T) {
require.Len(t, report.PendingAudits, 0)
// expect no failed audit
require.Len(t, report.Fails, 0)
// make sure that pending audit is removed
_, err = containment.Get(ctx, pending.NodeID)
require.True(t, audit.ErrContainedNotFound.Has(err))
})
}
@@ -329,6 +349,15 @@ func TestReverifyFailBadData(t *testing.T) {
require.Len(t, report.PendingAudits, 0)
require.Len(t, report.Fails, 1)
require.Equal(t, report.Fails[0], nodeID)
// record audit
_, err = audits.Reporter.RecordAudits(ctx, report, path)
require.NoError(t, err)
// make sure that pending audit is removed by the reporter when audit is recorded
containment := satellite.DB.Containment()
_, err = containment.Get(ctx, pending.NodeID)
require.True(t, audit.ErrContainedNotFound.Has(err))
})
}
@@ -393,6 +422,11 @@ func TestReverifyOffline(t *testing.T) {
require.Len(t, report.PendingAudits, 0)
require.Len(t, report.Offlines, 1)
require.Equal(t, report.Offlines[0], pieces[0].NodeId)
// make sure that pending audit is not removed
containment := satellite.DB.Containment()
_, err = containment.Get(ctx, pending.NodeID)
require.NoError(t, err)
})
}
@@ -479,6 +513,11 @@ func TestReverifyOfflineDialTimeout(t *testing.T) {
require.Len(t, report.PendingAudits, 0)
require.Len(t, report.Offlines, 1)
require.Equal(t, report.Offlines[0], pending.NodeID)
// make sure that pending audit is not removed
containment := satellite.DB.Containment()
_, err = containment.Get(ctx, pending.NodeID)
require.NoError(t, err)
})
}
@@ -570,6 +609,93 @@ func TestReverifyDeletedSegment(t *testing.T) {
}
func TestReverifyModifiedSegment(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
Satellite: testplanet.ReconfigureRS(1, 2, 4, 4),
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
// - uploads random data to a file on all nodes
// - creates a pending audit for a particular node in that file
// - removes a piece from the file so that the segment is modified
// - uploads a new file to all nodes and calls reverify on it
// - expects reverification to pass successfully and the storage node not to be in containment mode
satellite := planet.Satellites[0]
audits := satellite.Audit
queue := audits.Queue
metainfo := satellite.Metainfo.Service
audits.Worker.Loop.Pause()
ul := planet.Uplinks[0]
testData1 := testrand.Bytes(8 * memory.KiB)
err := ul.Upload(ctx, satellite, "testbucket", "test/path1", testData1)
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
pendingPath, err := queue.Next()
require.NoError(t, err)
pointer, err := satellite.Metainfo.Service.Get(ctx, pendingPath)
require.NoError(t, err)
randomIndex, err := audit.GetRandomStripe(ctx, pointer)
require.NoError(t, err)
nodeID := pointer.GetRemote().GetRemotePieces()[0].NodeId
pending := &audit.PendingAudit{
NodeID: nodeID,
PieceID: pointer.GetRemote().RootPieceId,
StripeIndex: randomIndex,
ShareSize: pointer.GetRemote().GetRedundancy().GetErasureShareSize(),
ExpectedShareHash: pkcrypto.SHA256Hash(nil),
ReverifyCount: 0,
Path: pendingPath,
}
containment := satellite.DB.Containment()
err = containment.IncrementPending(ctx, pending)
require.NoError(t, err)
// remove a piece from the file (a piece that the contained node isn't holding)
audits.Verifier.OnTestingCheckSegmentAlteredHook = func() {
pieceToRemove := pointer.Remote.RemotePieces[1]
_, err = metainfo.UpdatePieces(ctx, pendingPath, pointer, nil, []*pb.RemotePiece{pieceToRemove})
require.NoError(t, err)
}
// upload another file to call reverify on
testData2 := testrand.Bytes(8 * memory.KiB)
err = ul.Upload(ctx, satellite, "testbucket", "test/path2", testData2)
require.NoError(t, err)
// select the encrypted path that was not used for the pending audit
audits.Chore.Loop.TriggerWait()
path1, err := queue.Next()
require.NoError(t, err)
path2, err := queue.Next()
require.NoError(t, err)
reverifyPath := path1
if path1 == pendingPath {
reverifyPath = path2
}
// reverify the path that was not modified
report, err := audits.Verifier.Reverify(ctx, reverifyPath)
require.NoError(t, err)
assert.Empty(t, report.Fails)
assert.Empty(t, report.Successes)
assert.Empty(t, report.PendingAudits)
// expect that the node was removed from containment since the segment it was contained for has been changed
_, err = containment.Get(ctx, nodeID)
require.True(t, audit.ErrContainedNotFound.Has(err))
})
}
func TestReverifyReplacedSegment(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
@@ -763,6 +889,14 @@ func TestReverifyDifferentShare(t *testing.T) {
require.Len(t, report.PendingAudits, 0)
require.Len(t, report.Fails, 1)
require.Equal(t, report.Fails[0], selectedNode)
// record audit
_, err = audits.Reporter.RecordAudits(ctx, report, path2)
require.NoError(t, err)
// make sure that pending audit is removed by the reporter when audit is recorded
_, err = containment.Get(ctx, pending.NodeID)
require.True(t, audit.ErrContainedNotFound.Has(err))
})
}
@@ -1034,6 +1168,9 @@ func TestReverifySlowDownload(t *testing.T) {
require.Len(t, report.PendingAudits, 1)
require.Len(t, report.Unknown, 0)
require.Equal(t, report.PendingAudits[0].NodeID, slowNode)
_, err = containment.Get(ctx, slowNode)
require.NoError(t, err)
})
}
@@ -1120,5 +1257,14 @@ func TestReverifyUnknownError(t *testing.T) {
require.Len(t, report.PendingAudits, 0)
require.Len(t, report.Unknown, 1)
require.Equal(t, report.Unknown[0], badNode)
// TODO uncomment this stuff when suspension mode is implemented
//// record audit
//_, err = audits.Reporter.RecordAudits(ctx, report, path)
//require.NoError(t, err)
//
//// make sure that pending audit is removed by the reporter when audit is recorded
//_, err = containment.Get(ctx, pending.NodeID)
//require.True(t, audit.ErrContainedNotFound.Has(err))
})
}

View File

@@ -40,6 +40,8 @@ var (
ErrSegmentDeleted = errs.Class("segment deleted during audit")
// ErrSegmentExpired is the errs class used when a segment to audit has already expired.
ErrSegmentExpired = errs.Class("segment expired before audit")
// ErrSegmentModified is the errs class used when a segment has been changed in any way.
ErrSegmentModified = errs.Class("segment has been modified")
)
// Share represents required information about an audited share
@@ -145,7 +147,7 @@ func (verifier *Verifier) Verify(ctx context.Context, path storj.Path, skip map[
}, err
}
_, err = verifier.checkIfSegmentAltered(ctx, path, pointer)
err = verifier.checkIfSegmentAltered(ctx, path, pointer, pointerBytes)
if err != nil {
return Report{
Offlines: offlineNodes,
@@ -390,17 +392,28 @@ func (verifier *Verifier) Reverify(ctx context.Context, path storj.Path) (report
pieceHashesVerifiedMutex.Lock()
// for each node in Fails and PendingAudits, remove if piece hashes not verified for that segment
// if the piece hashes are not verified, remove the "failed" node from containment
newFails := storj.NodeIDList{}
newPendingAudits := []*PendingAudit{}
for _, id := range report.Fails {
if pieceHashesVerified[id] {
newFails = append(newFails, id)
} else {
_, errDelete := verifier.containment.Delete(ctx, id)
if errDelete != nil {
verifier.log.Debug("Error deleting node from containment db", zap.Stringer("Node ID", id), zap.Error(errDelete))
}
}
}
for _, pending := range report.PendingAudits {
if pieceHashesVerified[pending.NodeID] {
newPendingAudits = append(newPendingAudits, pending)
} else {
_, errDelete := verifier.containment.Delete(ctx, pending.NodeID)
if errDelete != nil {
verifier.log.Debug("Error deleting node from containment db", zap.Stringer("Node ID", pending.NodeID), zap.Error(errDelete))
}
}
}
@@ -431,11 +444,6 @@ func (verifier *Verifier) Reverify(ctx context.Context, path storj.Path) (report
pendingPointerBytes, pendingPointer, err := verifier.metainfo.GetWithBytes(ctx, pending.Path)
if err != nil {
if storj.ErrObjectNotFound.Has(err) {
// segment has been deleted since node was contained
_, errDelete := verifier.containment.Delete(ctx, pending.NodeID)
if errDelete != nil {
verifier.log.Debug("Error deleting node from containment db", zap.Binary("Segment", []byte(pending.Path)), zap.Stringer("Node ID", pending.NodeID), zap.Error(errDelete))
}
ch <- result{nodeID: pending.NodeID, status: skipped}
return
}
@@ -449,10 +457,6 @@ func (verifier *Verifier) Reverify(ctx context.Context, path storj.Path) (report
if errDelete != nil {
verifier.log.Debug("Reverify: error deleting expired segment", zap.Binary("Segment", []byte(pending.Path)), zap.Stringer("Node ID", pending.NodeID), zap.Error(errDelete))
}
_, errDelete = verifier.containment.Delete(ctx, pending.NodeID)
if errDelete != nil {
verifier.log.Debug("Error deleting node from containment db", zap.Binary("Segment", []byte(pending.Path)), zap.Stringer("Node ID", pending.NodeID), zap.Error(errDelete))
}
verifier.log.Debug("Reverify: segment already expired", zap.Binary("Segment", []byte(pending.Path)), zap.Stringer("Node ID", pending.NodeID))
ch <- result{nodeID: pending.NodeID, status: skipped}
return
@@ -464,11 +468,6 @@ func (verifier *Verifier) Reverify(ctx context.Context, path storj.Path) (report
pieceHashesVerifiedMutex.Unlock()
if pendingPointer.GetRemote().RootPieceId != pending.PieceID {
// segment has changed since initial containment
_, errDelete := verifier.containment.Delete(ctx, pending.NodeID)
if errDelete != nil {
verifier.log.Debug("Error deleting node from containment db", zap.Binary("Segment", []byte(pending.Path)), zap.Stringer("Node ID", pending.NodeID), zap.Error(errDelete))
}
ch <- result{nodeID: pending.NodeID, status: skipped}
return
}
@@ -481,11 +480,6 @@ func (verifier *Verifier) Reverify(ctx context.Context, path storj.Path) (report
}
}
if !found {
// node is no longer in pointer, so remove from containment
_, errDelete := verifier.containment.Delete(ctx, pending.NodeID)
if errDelete != nil {
verifier.log.Debug("Error deleting node from containment db", zap.Binary("Segment", []byte(pending.Path)), zap.Stringer("Node ID", pending.NodeID), zap.Error(errDelete))
}
ch <- result{nodeID: pending.NodeID, status: skipped}
return
}
@@ -548,10 +542,10 @@ func (verifier *Verifier) Reverify(ctx context.Context, path storj.Path) (report
}
if errs2.IsRPC(err, rpcstatus.NotFound) {
// Get the original segment pointer in the metainfo
_, err := verifier.checkIfSegmentAltered(ctx, pending.Path, pendingPointer)
err := verifier.checkIfSegmentAltered(ctx, pending.Path, pendingPointer, pendingPointerBytes)
if err != nil {
ch <- result{nodeID: pending.NodeID, status: success}
verifier.log.Debug("Reverify: audit source deleted before reverification", zap.Binary("Segment", []byte(pending.Path)), zap.Stringer("Node ID", pending.NodeID), zap.Error(err))
ch <- result{nodeID: pending.NodeID, status: skipped}
verifier.log.Debug("Reverify: audit source changed before reverification", zap.Binary("Segment", []byte(pending.Path)), zap.Stringer("Node ID", pending.NodeID), zap.Error(err))
return
}
// missing share
@@ -575,10 +569,10 @@ func (verifier *Verifier) Reverify(ctx context.Context, path storj.Path) (report
ch <- result{nodeID: pending.NodeID, status: success}
verifier.log.Debug("Reverify: hashes match (audit success)", zap.Binary("Segment", []byte(pending.Path)), zap.Stringer("Node ID", pending.NodeID))
} else {
_, err := verifier.checkIfSegmentAltered(ctx, pending.Path, pendingPointer)
err := verifier.checkIfSegmentAltered(ctx, pending.Path, pendingPointer, pendingPointerBytes)
if err != nil {
ch <- result{nodeID: pending.NodeID, status: success}
verifier.log.Debug("Reverify: audit source deleted before reverification", zap.Binary("Segment", []byte(pending.Path)), zap.Stringer("Node ID", pending.NodeID), zap.Error(err))
ch <- result{nodeID: pending.NodeID, status: skipped}
verifier.log.Debug("Reverify: audit source changed before reverification", zap.Binary("Segment", []byte(pending.Path)), zap.Stringer("Node ID", pending.NodeID), zap.Error(err))
return
}
verifier.log.Debug("Reverify: hashes mismatch (audit failed)", zap.Binary("Segment", []byte(pending.Path)), zap.Stringer("Node ID", pending.NodeID),
@@ -601,6 +595,11 @@ func (verifier *Verifier) Reverify(ctx context.Context, path storj.Path) (report
report.PendingAudits = append(report.PendingAudits, result.pendingAudit)
case unknown:
report.Unknown = append(report.Unknown, result.nodeID)
case skipped:
_, errDelete := verifier.containment.Delete(ctx, result.nodeID)
if errDelete != nil {
verifier.log.Debug("Error deleting node from containment db", zap.Stringer("Node ID", result.nodeID), zap.Error(errDelete))
}
case erred:
err = errs.Combine(err, result.err)
}
@@ -680,25 +679,29 @@ func (verifier *Verifier) GetShare(ctx context.Context, limit *pb.AddressedOrder
}
// checkIfSegmentAltered checks if path's pointer has been altered since path was selected.
func (verifier *Verifier) checkIfSegmentAltered(ctx context.Context, segmentPath string, oldPointer *pb.Pointer) (newPointer *pb.Pointer, err error) {
func (verifier *Verifier) checkIfSegmentAltered(ctx context.Context, segmentPath string, oldPointer *pb.Pointer, oldPointerBytes []byte) (err error) {
defer mon.Task()(&ctx)(&err)
if verifier.OnTestingCheckSegmentAlteredHook != nil {
verifier.OnTestingCheckSegmentAlteredHook()
}
newPointer, err = verifier.metainfo.Get(ctx, segmentPath)
newPointerBytes, newPointer, err := verifier.metainfo.GetWithBytes(ctx, segmentPath)
if err != nil {
if storj.ErrObjectNotFound.Has(err) {
return nil, ErrSegmentDeleted.New("%q", segmentPath)
return ErrSegmentDeleted.New("%q", segmentPath)
}
return nil, err
return err
}
if oldPointer != nil && oldPointer.CreationDate != newPointer.CreationDate {
return nil, ErrSegmentDeleted.New("%q", segmentPath)
return ErrSegmentDeleted.New("%q", segmentPath)
}
return newPointer, nil
if !bytes.Equal(oldPointerBytes, newPointerBytes) {
return ErrSegmentModified.New("%q", segmentPath)
}
return nil
}
// auditShares takes the downloaded shares and uses infectious's Correct function to check that they
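
For orientation, a hedged sketch of how a Reverify worker consumes the new
single-return-value check; handleAlteredSegment is a hypothetical helper name
(not part of this change), while result, skipped, PendingAudit, and
checkIfSegmentAltered are the package-internal names visible in the hunks above:

// handleAlteredSegment (hypothetical helper) shows the consolidated flow:
// when the pending segment was deleted or modified mid-audit, the node's
// result is reported as skipped, and the `case skipped:` branch added above
// removes the node from containment in one place.
func (verifier *Verifier) handleAlteredSegment(ctx context.Context, pending *PendingAudit, pointer *pb.Pointer, pointerBytes []byte, ch chan<- result) bool {
	err := verifier.checkIfSegmentAltered(ctx, pending.Path, pointer, pointerBytes)
	if err == nil {
		return false // segment unchanged; proceed with the reverify outcome
	}
	verifier.log.Debug("Reverify: segment altered before reverification",
		zap.Binary("Segment", []byte(pending.Path)), zap.Stringer("Node ID", pending.NodeID), zap.Error(err))
	ch <- result{nodeID: pending.NodeID, status: skipped}
	return true
}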

View File

@@ -654,6 +654,42 @@ func TestVerifierDeletedSegment(t *testing.T) {
}
func TestVerifierModifiedSegment(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
audits := satellite.Audit
queue := audits.Queue
metainfo := satellite.Metainfo.Service
audits.Worker.Loop.Pause()
ul := planet.Uplinks[0]
testData := testrand.Bytes(8 * memory.KiB)
err := ul.Upload(ctx, satellite, "testbucket", "test/path", testData)
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
path, err := queue.Next()
require.NoError(t, err)
audits.Verifier.OnTestingCheckSegmentAlteredHook = func() {
// remove one piece from the segment so that checkIfSegmentAltered fails
pointer, err := metainfo.Get(ctx, path)
require.NoError(t, err)
pieceToRemove := pointer.Remote.RemotePieces[0]
_, err = metainfo.UpdatePieces(ctx, path, pointer, nil, []*pb.RemotePiece{pieceToRemove})
require.NoError(t, err)
}
report, err := audits.Verifier.Verify(ctx, path, nil)
require.True(t, audit.ErrSegmentModified.Has(err))
assert.Empty(t, report)
})
}
func TestVerifierReplacedSegment(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {