satellite/{audit,satellitedb}: release nodes from containment in Reverify rather than (Batch)UpdateStats
Until now, whenever audits were recorded we would try to delete the node from containment just in case it exists. Since we now want to treat segment repair downloads as audits, this would erroneously remove nodes from containment, as repair does not go through a Reverify step.

With this changeset, (Batch)UpdateStats will not remove nodes from containment. The Reverify method will remove all necessary nodes from containment.

Change-Id: Iabc9496293076dccba32ddfa028e92580b26167f
commit 53322bb0a7 (parent e76cbc9bd5)
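In outline, the diff below moves the containment-release decision into Reverify itself: each reverification goroutine now reports an explicit release flag alongside its status, and a single collector loop deletes released nodes from the pending_audits table. The following is a minimal, self-contained Go sketch of that pattern only; the nodeID type, the containment map, and the sample statuses are illustrative stand-ins, not the satellite's real APIs.

    package main

    import "fmt"

    // Illustrative stand-in for storj.NodeID.
    type nodeID string

    const (
        skipped = iota
        success
        failed
        contained
        unknown
        erred
    )

    // result mirrors the struct extended in this change: each reverification
    // goroutine states explicitly whether its node may leave containment.
    type result struct {
        node    nodeID
        status  int
        release bool
    }

    func main() {
        // Toy stand-in for the pending_audits (containment) table.
        inContainment := map[nodeID]bool{"node-a": true, "node-b": true}

        ch := make(chan result, 2)
        // A successful reverify releases the node; an offline node stays contained.
        ch <- result{node: "node-a", status: success, release: true}
        ch <- result{node: "node-b", status: skipped, release: false}
        close(ch)

        // Collector loop: releasing containment happens here, in Reverify,
        // never in (Batch)UpdateStats.
        for r := range ch {
            if r.release {
                delete(inContainment, r.node)
            }
        }
        fmt.Println(inContainment) // map[node-b:true]
    }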
@@ -114,6 +114,8 @@ func TestContainDelete(t *testing.T) {
     })
 }
 
+// UpdateStats used to remove nodes from containment. It doesn't anymore.
+// This is a sanity check.
 func TestContainUpdateStats(t *testing.T) {
     testplanet.Run(t, testplanet.Config{
         SatelliteCount: 1, StorageNodeCount: 1,
@@ -138,9 +140,8 @@ func TestContainUpdateStats(t *testing.T) {
         require.NoError(t, err)
         assert.False(t, node.Contained)
 
-        // get pending audit that doesn't exist
+        // get pending audit
         _, err = containment.Get(ctx, info1.NodeID)
-        assert.Error(t, err, audit.ErrContainedNotFound.New("%v", info1.NodeID))
-        assert.True(t, audit.ErrContainedNotFound.Has(err))
+        require.NoError(t, err)
     })
 }
@@ -100,11 +100,7 @@ func TestReverifySuccess(t *testing.T) {
         require.Len(t, report.Successes, 1)
         require.Equal(t, report.Successes[0], pieces[0].StorageNode)
 
-        // record audit
-        _, err = audits.Reporter.RecordAudits(ctx, report)
-        require.NoError(t, err)
-
-        // make sure that pending audit is removed by the reporter when audit is recorded
+        // make sure that pending audit is removed
         _, err = containment.Get(ctx, pending.NodeID)
         require.True(t, audit.ErrContainedNotFound.Has(err))
     })
@@ -189,11 +185,7 @@ func TestReverifyFailMissingShare(t *testing.T) {
         require.Len(t, report.Fails, 1)
         require.Equal(t, report.Fails[0], pieces[0].StorageNode)
 
-        // record audit
-        _, err = audits.Reporter.RecordAudits(ctx, report)
-        require.NoError(t, err)
-
-        // make sure that pending audit is removed by the reporter when audit is recorded
+        // make sure that pending audit is removed
         _, err = containment.Get(ctx, pending.NodeID)
         require.True(t, audit.ErrContainedNotFound.Has(err))
     })
@@ -263,11 +255,7 @@ func TestReverifyFailBadData(t *testing.T) {
         require.Len(t, report.Fails, 1)
         require.Equal(t, report.Fails[0], nodeID)
 
-        // record audit
-        _, err = audits.Reporter.RecordAudits(ctx, report)
-        require.NoError(t, err)
-
-        // make sure that pending audit is removed by the reporter when audit is recorded
+        // make sure that pending audit is removed
         containment := satellite.DB.Containment()
         _, err = containment.Get(ctx, pending.NodeID)
         require.True(t, audit.ErrContainedNotFound.Has(err))
@@ -340,9 +328,6 @@ func TestReverifyOffline(t *testing.T) {
         require.Len(t, report.Offlines, 1)
         require.Equal(t, report.Offlines[0], pieces[0].StorageNode)
 
-        _, err = audits.Reporter.RecordAudits(ctx, report)
-        require.NoError(t, err)
-
         // make sure that pending audit is not removed
         containment := satellite.DB.Containment()
         _, err = containment.Get(ctx, pending.NodeID)
@@ -440,9 +425,6 @@ func TestReverifyOfflineDialTimeout(t *testing.T) {
         require.Len(t, report.Offlines, 1)
         require.Equal(t, report.Offlines[0], pending.NodeID)
 
-        _, err = audits.Reporter.RecordAudits(ctx, report)
-        require.NoError(t, err)
-
         // make sure that pending audit is not removed
         containment := satellite.DB.Containment()
         _, err = containment.Get(ctx, pending.NodeID)
@@ -536,9 +518,6 @@ func TestReverifyDeletedSegment(t *testing.T) {
         assert.Empty(t, report.Successes)
         assert.Empty(t, report.PendingAudits)
 
-        _, err = audits.Reporter.RecordAudits(ctx, report)
-        require.NoError(t, err)
-
         // expect that the node was removed from containment since the segment it was contained for has been deleted
         _, err = containment.Get(ctx, nodeID)
         require.True(t, audit.ErrContainedNotFound.Has(err))
@@ -635,9 +614,6 @@ func TestReverifyModifiedSegment(t *testing.T) {
         assert.Empty(t, report.Successes)
         assert.Empty(t, report.PendingAudits)
 
-        _, err = audits.Reporter.RecordAudits(ctx, report)
-        require.NoError(t, err)
-
         // expect that the node was removed from containment since the segment it was contained for has been changed
         _, err = containment.Get(ctx, nodeID)
         require.True(t, audit.ErrContainedNotFound.Has(err))
@@ -726,9 +702,6 @@ func TestReverifyReplacedSegment(t *testing.T) {
         assert.Empty(t, report.Successes)
         assert.Empty(t, report.PendingAudits)
 
-        _, err = audits.Reporter.RecordAudits(ctx, report)
-        require.NoError(t, err)
-
         // expect that the node was removed from containment since the segment it was contained for has been changed
         _, err = containment.Get(ctx, nodeID)
         require.True(t, audit.ErrContainedNotFound.Has(err))
@@ -851,11 +824,7 @@ func TestReverifyDifferentShare(t *testing.T) {
         require.Len(t, report.Fails, 1)
         require.Equal(t, report.Fails[0], selectedNode)
 
-        // record audit
-        _, err = audits.Reporter.RecordAudits(ctx, report)
-        require.NoError(t, err)
-
-        // make sure that pending audit is removed by the reporter when audit is recorded
+        // make sure that pending audit is removed
         _, err = containment.Get(ctx, pending.NodeID)
         require.True(t, audit.ErrContainedNotFound.Has(err))
     })
@@ -892,9 +861,6 @@ func TestReverifyExpired1(t *testing.T) {
         report, err := audits.Verifier.Reverify(ctx, queueSegment)
         require.NoError(t, err)
 
-        _, err = audits.Reporter.RecordAudits(ctx, report)
-        require.NoError(t, err)
-
         assert.Len(t, report.Successes, 0)
         assert.Len(t, report.Fails, 0)
         assert.Len(t, report.Offlines, 0)
@@ -1016,9 +982,6 @@ func TestReverifyExpired2(t *testing.T) {
         require.Len(t, report.PendingAudits, 0)
         require.Len(t, report.Fails, 0)
 
-        _, err = audits.Reporter.RecordAudits(ctx, report)
-        require.NoError(t, err)
-
         // Reverify should remove the node from containment mode
         _, err = containment.Get(ctx, pending.NodeID)
         require.True(t, audit.ErrContainedNotFound.Has(err))
@@ -1200,11 +1163,7 @@ func TestReverifyUnknownError(t *testing.T) {
         require.Len(t, report.Unknown, 1)
         require.Equal(t, report.Unknown[0], badNode)
 
-        // record audit
-        _, err = audits.Reporter.RecordAudits(ctx, report)
-        require.NoError(t, err)
-
-        // make sure that pending audit is removed by the reporter when audit is recorded
+        // make sure that pending audit is removed
         _, err = containment.Get(ctx, pending.NodeID)
         require.True(t, audit.ErrContainedNotFound.Has(err))
     })
@@ -350,7 +350,6 @@ func (verifier *Verifier) Reverify(ctx context.Context, segment Segment) (report
         failed
         contained
         unknown
-        remove
         erred
     )
 
@@ -358,6 +357,7 @@ func (verifier *Verifier) Reverify(ctx context.Context, segment Segment) (report
         nodeID       storj.NodeID
         status       int
         pendingAudit *PendingAudit
+        release      bool
         err          error
     }
 
@@ -402,7 +402,7 @@ func (verifier *Verifier) Reverify(ctx context.Context, segment Segment) (report
         })
         if err != nil {
             if storj.ErrObjectNotFound.Has(err) {
-                ch <- result{nodeID: pending.NodeID, status: remove}
+                ch <- result{nodeID: pending.NodeID, status: skipped, release: true}
                 return
             }
 
@@ -413,7 +413,7 @@ func (verifier *Verifier) Reverify(ctx context.Context, segment Segment) (report
 
         if pendingObject.ExpiresAt != nil && !pendingObject.ExpiresAt.IsZero() && pendingObject.ExpiresAt.Before(verifier.nowFn()) {
             verifier.log.Debug("Reverify: segment already expired", zap.Stringer("Node ID", pending.NodeID))
-            ch <- result{nodeID: pending.NodeID, status: remove}
+            ch <- result{nodeID: pending.NodeID, status: skipped, release: true}
             return
         }
 
@@ -423,7 +423,7 @@ func (verifier *Verifier) Reverify(ctx context.Context, segment Segment) (report
         })
         if err != nil {
             if metabase.ErrSegmentNotFound.Has(err) {
-                ch <- result{nodeID: pending.NodeID, status: skipped}
+                ch <- result{nodeID: pending.NodeID, status: skipped, release: true}
                 return
             }
 
@@ -434,7 +434,7 @@ func (verifier *Verifier) Reverify(ctx context.Context, segment Segment) (report
 
         // TODO: is this check still necessary? If the segment was found by its StreamID and position, the RootPieceID should not had changed.
         if pendingSegmentInfo.RootPieceID != pending.PieceID {
-            ch <- result{nodeID: pending.NodeID, status: remove}
+            ch <- result{nodeID: pending.NodeID, status: skipped, release: true}
             return
         }
         var pieceNum uint16
@@ -446,27 +446,19 @@ func (verifier *Verifier) Reverify(ctx context.Context, segment Segment) (report
             }
         }
         if !found {
-            ch <- result{nodeID: pending.NodeID, status: remove}
+            ch <- result{nodeID: pending.NodeID, status: skipped, release: true}
             return
         }
 
         limit, piecePrivateKey, cachedIPAndPort, err := verifier.orders.CreateAuditOrderLimit(ctx, pending.Segment.Bucket(), pending.NodeID, pieceNum, pending.PieceID, pending.ShareSize)
         if err != nil {
             if overlay.ErrNodeDisqualified.Has(err) {
-                _, errDelete := verifier.containment.Delete(ctx, pending.NodeID)
-                if errDelete != nil {
-                    verifier.log.Debug("Error deleting disqualified node from containment db", zap.Stringer("Node ID", pending.NodeID), zap.Error(errDelete))
-                }
-                ch <- result{nodeID: pending.NodeID, status: erred, err: err}
+                ch <- result{nodeID: pending.NodeID, status: skipped, release: true}
                 verifier.log.Debug("Reverify: order limit not created (disqualified)", zap.Stringer("Node ID", pending.NodeID))
                 return
             }
             if overlay.ErrNodeFinishedGE.Has(err) {
-                _, errDelete := verifier.containment.Delete(ctx, pending.NodeID)
-                if errDelete != nil {
-                    verifier.log.Debug("Error deleting gracefully exited node from containment db", zap.Stringer("Node ID", pending.NodeID), zap.Error(errDelete))
-                }
-                ch <- result{nodeID: pending.NodeID, status: erred, err: err}
+                ch <- result{nodeID: pending.NodeID, status: skipped, release: true}
                 verifier.log.Debug("Reverify: order limit not created (completed graceful exit)", zap.Stringer("Node ID", pending.NodeID))
                 return
             }
@@ -511,7 +503,7 @@ func (verifier *Verifier) Reverify(ctx context.Context, segment Segment) (report
                 return
             }
             // unknown transport error
-            ch <- result{nodeID: pending.NodeID, status: unknown, pendingAudit: pending}
+            ch <- result{nodeID: pending.NodeID, status: unknown, pendingAudit: pending, release: true}
             verifier.log.Info("Reverify: unknown transport error (skipped)", zap.Stringer("Node ID", pending.NodeID), zap.Error(err))
             return
         }
@@ -519,12 +511,12 @@ func (verifier *Verifier) Reverify(ctx context.Context, segment Segment) (report
             // Get the original segment
             err := verifier.checkIfSegmentAltered(ctx, pending.Segment, pendingSegmentInfo)
             if err != nil {
-                ch <- result{nodeID: pending.NodeID, status: remove}
+                ch <- result{nodeID: pending.NodeID, status: skipped, release: true}
                 verifier.log.Debug("Reverify: audit source changed before reverification", zap.Stringer("Node ID", pending.NodeID), zap.Error(err))
                 return
             }
             // missing share
-            ch <- result{nodeID: pending.NodeID, status: failed}
+            ch <- result{nodeID: pending.NodeID, status: failed, release: true}
             verifier.log.Info("Reverify: piece not found (audit failed)", zap.Stringer("Node ID", pending.NodeID), zap.Error(err))
             return
         }
@@ -535,24 +527,24 @@ func (verifier *Verifier) Reverify(ctx context.Context, segment Segment) (report
                 return
             }
             // unknown error
-            ch <- result{nodeID: pending.NodeID, status: unknown, pendingAudit: pending}
+            ch <- result{nodeID: pending.NodeID, status: unknown, pendingAudit: pending, release: true}
             verifier.log.Info("Reverify: unknown error (skipped)", zap.Stringer("Node ID", pending.NodeID), zap.Error(err))
             return
         }
         downloadedHash := pkcrypto.SHA256Hash(share.Data)
         if bytes.Equal(downloadedHash, pending.ExpectedShareHash) {
-            ch <- result{nodeID: pending.NodeID, status: success}
+            ch <- result{nodeID: pending.NodeID, status: success, release: true}
             verifier.log.Info("Reverify: hashes match (audit success)", zap.Stringer("Node ID", pending.NodeID))
         } else {
             err := verifier.checkIfSegmentAltered(ctx, pending.Segment, pendingSegmentInfo)
             if err != nil {
-                ch <- result{nodeID: pending.NodeID, status: remove}
+                ch <- result{nodeID: pending.NodeID, status: skipped, release: true}
                 verifier.log.Debug("Reverify: audit source changed before reverification", zap.Stringer("Node ID", pending.NodeID), zap.Error(err))
                 return
             }
             verifier.log.Info("Reverify: hashes mismatch (audit failed)", zap.Stringer("Node ID", pending.NodeID),
                 zap.Binary("expected hash", pending.ExpectedShareHash), zap.Binary("downloaded hash", downloadedHash))
-            ch <- result{nodeID: pending.NodeID, status: failed}
+            ch <- result{nodeID: pending.NodeID, status: failed, release: true}
         }
     }(pending)
 }
@@ -571,14 +563,15 @@ func (verifier *Verifier) Reverify(ctx context.Context, segment Segment) (report
         report.PendingAudits = append(report.PendingAudits, result.pendingAudit)
     case unknown:
         report.Unknown = append(report.Unknown, result.nodeID)
-    case remove:
+    case erred:
+        err = errs.Combine(err, result.err)
+    default:
+    }
+    if result.release {
         _, errDelete := verifier.containment.Delete(ctx, result.nodeID)
         if errDelete != nil {
             verifier.log.Debug("Error deleting node from containment db", zap.Stringer("Node ID", result.nodeID), zap.Error(errDelete))
         }
-    case erred:
-        err = errs.Combine(err, result.err)
-    default:
-    }
+    }
 }
 
@@ -1262,11 +1262,6 @@ func buildUpdateStatement(update updateNodeStats, isUp bool) string {
 
     sql += fmt.Sprintf(" WHERE nodes.id = decode('%v', 'hex');\n", hexNodeID)
 
-    // only remove from containment if node is online
-    if isUp {
-        sql += fmt.Sprintf("DELETE FROM pending_audits WHERE pending_audits.node_id = decode('%v', 'hex');\n", hexNodeID)
-    }
-
     return sql
 }
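For context on this last hunk: buildUpdateStatement generates one SQL string per node, and before this change it appended a DELETE against pending_audits whenever the node was online, which is exactly how repair-triggered stat updates could release a contained node. Below is a minimal sketch of the function's shape after the change; the single SET column and the node ID are illustrative placeholders, not the real generated statement.

    package main

    import "fmt"

    // Sketch of buildUpdateStatement after this change. The SET clause and
    // hexNodeID value are placeholders; the real function assembles many
    // column assignments from the accumulated node stats.
    func buildUpdateStatement(hexNodeID string, isUp bool) string {
        sql := "UPDATE nodes SET total_audit_count = total_audit_count + 1"
        sql += fmt.Sprintf(" WHERE nodes.id = decode('%v', 'hex');\n", hexNodeID)
        // The removed branch appended, for online nodes only:
        //   DELETE FROM pending_audits WHERE pending_audits.node_id = ...;
        // so any recorded audit could release a contained node. With it gone,
        // only Reverify touches the pending_audits table.
        _ = isUp // still in the signature; no longer drives containment cleanup
        return sql
    }

    func main() {
        fmt.Print(buildUpdateStatement("aa07", true))
    }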