satellite/audit: fix audit panic (#3217)

commit 784ca1582a
parent 2e4d8630b9
Author: Maximillian von Briesen (committed by GitHub)
Date: 2019-10-09 10:06:58 -04:00
5 changed files with 32 additions and 38 deletions

@@ -49,7 +49,7 @@ func TestDisqualificationTooManyFailedAudits(t *testing.T) {
 	var (
 		satellitePeer = planet.Satellites[0]
 		nodeID        = planet.StorageNodes[0].ID()
-		report        = &audit.Report{
+		report        = audit.Report{
 			Fails: storj.NodeIDList{nodeID},
 		}
 	)
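
The single-character change above sets the pattern for the whole commit: audit.Report moves from pointer to value semantics, so the zero value Report{} replaces nil everywhere. A standalone sketch with simplified types (not code from this commit) of why that closes off a panic class:

	package main

	import "fmt"

	type Report struct {
		Fails []string
	}

	// pointerLen panics when r is nil, so every caller must remember a guard.
	func pointerLen(r *Report) int { return len(r.Fails) }

	// valueLen is always safe: a zero Report has nil slices, and len of a
	// nil slice is 0.
	func valueLen(r Report) int { return len(r.Fails) }

	func main() {
		fmt.Println(valueLen(Report{})) // 0
		// pointerLen(nil) would crash with a nil pointer dereference.
	}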

@@ -45,11 +45,8 @@ func NewReporter(log *zap.Logger, overlay *overlay.Service, containment Containment
 // RecordAudits saves audit results to overlay. When no error, it returns
 // nil for both return values, otherwise it returns the report with the fields
 // set to the values which have been saved and the error.
-func (reporter *Reporter) RecordAudits(ctx context.Context, req *Report) (_ *Report, err error) {
+func (reporter *Reporter) RecordAudits(ctx context.Context, req Report) (_ Report, err error) {
 	defer mon.Task()(&ctx)(&err)
-	if req == nil {
-		return nil, nil
-	}
 	successes := req.Successes
 	fails := req.Fails
@@ -68,7 +65,7 @@ func (reporter *Reporter) RecordAudits(ctx context.Context, req *Report) (_ *Report, err error) {
 	tries := 0
 	for tries <= reporter.maxRetries {
 		if len(successes) == 0 && len(fails) == 0 && len(offlines) == 0 && len(pendingAudits) == 0 {
-			return nil, nil
+			return Report{}, nil
 		}
 		errlist = errs.Group{}
@@ -103,14 +100,14 @@ func (reporter *Reporter) RecordAudits(ctx context.Context, req *Report) (_ *Report, err error) {
 	err = errlist.Err()
 	if tries >= reporter.maxRetries && err != nil {
-		return &Report{
+		return Report{
 			Successes:     successes,
 			Fails:         fails,
 			Offlines:      offlines,
 			PendingAudits: pendingAudits,
 		}, errs.Combine(Error.New("some nodes failed to be updated in overlay"), err)
 	}
-	return nil, nil
+	return Report{}, nil
 }

 // recordAuditFailStatus updates nodeIDs in overlay with isup=true, auditsuccess=false
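
With req passed by value, the req == nil guard is meaningless and is deleted, and every exit now returns Report{} or a partially saved Report instead of nil. A sketch of that retry contract (hypothetical helper names and simplified fields; the real method also tracks offlines and pending audits):

	package main

	import (
		"errors"
		"fmt"
	)

	type Report struct {
		Successes, Fails []string
	}

	// record retries until everything is saved or retries run out; on failure
	// it returns the unsaved remainder as a value, never a nil pointer.
	func record(req Report, maxRetries int, save func([]string) []string) (Report, error) {
		successes, fails := req.Successes, req.Fails
		for tries := 0; tries <= maxRetries; tries++ {
			if len(successes) == 0 && len(fails) == 0 {
				return Report{}, nil
			}
			// save returns the entries that still need saving.
			successes, fails = save(successes), save(fails)
		}
		if len(successes) != 0 || len(fails) != 0 {
			return Report{Successes: successes, Fails: fails},
				errors.New("some nodes failed to be updated in overlay")
		}
		return Report{}, nil
	}

	func main() {
		alwaysOK := func(ids []string) []string { return nil }
		left, err := record(Report{Fails: []string{"node-1"}}, 0, alwaysOK)
		fmt.Println(len(left.Fails), err) // 0 <nil>
	}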

@@ -39,7 +39,7 @@ func TestReportPendingAudits(t *testing.T) {
 	overlay := satellite.Overlay.Service
 	containment := satellite.DB.Containment()
-	failed, err := audits.Reporter.RecordAudits(ctx, &report)
+	failed, err := audits.Reporter.RecordAudits(ctx, report)
 	require.NoError(t, err)
 	assert.Zero(t, failed)
@@ -66,7 +66,7 @@ func TestRecordAuditsAtLeastOnce(t *testing.T) {
 	report := audit.Report{Successes: []storj.NodeID{nodeID}}
 	// expect RecordAudits to try recording at least once (maxRetries is set to 0)
-	failed, err := audits.Reporter.RecordAudits(ctx, &report)
+	failed, err := audits.Reporter.RecordAudits(ctx, report)
 	require.NoError(t, err)
 	require.Zero(t, failed)
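
The test assertions survive the change because testify's Zero accepts both shapes: before it asserted a nil *Report, now a deep-zero Report value. A small illustration (hypothetical test, simplified type):

	package audit_test

	import (
		"testing"

		"github.com/stretchr/testify/require"
	)

	type report struct{ Successes []string }

	func TestZeroReport(t *testing.T) {
		var byValue report
		require.Zero(t, byValue) // a zero struct passes
		var byPointer *report
		require.Zero(t, byPointer) // so does a nil pointer,
		// but only byValue.Successes is safe to range over afterwards
	}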

@@ -82,15 +82,15 @@ func NewVerifier(log *zap.Logger, metainfo *metainfo.Service, dialer rpc.Dialer,
 }

 // Verify downloads shares then verifies the data correctness at a random stripe.
-func (verifier *Verifier) Verify(ctx context.Context, path storj.Path, skip map[storj.NodeID]bool) (report *Report, err error) {
+func (verifier *Verifier) Verify(ctx context.Context, path storj.Path, skip map[storj.NodeID]bool) (report Report, err error) {
 	defer mon.Task()(&ctx)(&err)
 	pointer, err := verifier.metainfo.Get(ctx, path)
 	if err != nil {
 		if storage.ErrKeyNotFound.Has(err) {
-			return nil, ErrSegmentDeleted.New("%q", path)
+			return Report{}, ErrSegmentDeleted.New("%q", path)
 		}
-		return nil, err
+		return Report{}, err
 	}
 	defer func() {
@@ -103,7 +103,7 @@ func (verifier *Verifier) Verify(ctx context.Context, path storj.Path, skip map[storj.NodeID]bool) (report *Report, err error) {
 	randomIndex, err := GetRandomStripe(ctx, pointer)
 	if err != nil {
-		return nil, err
+		return Report{}, err
 	}
 	shareSize := pointer.GetRemote().GetRedundancy().GetErasureShareSize()
@@ -116,7 +116,7 @@ func (verifier *Verifier) Verify(ctx context.Context, path storj.Path, skip map[storj.NodeID]bool) (report *Report, err error) {
 	orderLimits, privateKey, err := verifier.orders.CreateAuditOrderLimits(ctx, bucketID, pointer, skip)
 	if err != nil {
-		return nil, err
+		return Report{}, err
 	}
 	// NOTE offlineNodes will include disqualified nodes because they aren't in
@@ -131,14 +131,14 @@ func (verifier *Verifier) Verify(ctx context.Context, path storj.Path, skip map[storj.NodeID]bool) (report *Report, err error) {
 	shares, err := verifier.DownloadShares(ctx, orderLimits, privateKey, randomIndex, shareSize)
 	if err != nil {
-		return &Report{
+		return Report{
 			Offlines: offlineNodes,
 		}, err
 	}
 	_, err = verifier.checkIfSegmentAltered(ctx, path, pointer)
 	if err != nil {
-		return &Report{
+		return Report{
 			Offlines: offlineNodes,
 		}, err
 	}
@@ -214,7 +214,7 @@ func (verifier *Verifier) Verify(ctx context.Context, path storj.Path, skip map[storj.NodeID]bool) (report *Report, err error) {
 	total := int(pointer.Remote.Redundancy.GetTotal())
 	if len(sharesToAudit) < required {
-		return &Report{
+		return Report{
 			Fails:    failedNodes,
 			Offlines: offlineNodes,
 		}, ErrNotEnoughShares.New("got %d, required %d", len(sharesToAudit), required)
@@ -222,7 +222,7 @@ func (verifier *Verifier) Verify(ctx context.Context, path storj.Path, skip map[storj.NodeID]bool) (report *Report, err error) {
 	pieceNums, correctedShares, err := auditShares(ctx, required, total, sharesToAudit)
 	if err != nil {
-		return &Report{
+		return Report{
 			Fails:    failedNodes,
 			Offlines: offlineNodes,
 		}, err
@@ -278,14 +278,14 @@ func (verifier *Verifier) Verify(ctx context.Context, path storj.Path, skip map[storj.NodeID]bool) (report *Report, err error) {
 	pendingAudits, err := createPendingAudits(ctx, containedNodes, correctedShares, pointer, randomIndex, path)
 	if err != nil {
-		return &Report{
+		return Report{
 			Successes: successNodes,
 			Fails:     failedNodes,
 			Offlines:  offlineNodes,
 		}, err
 	}
-	return &Report{
+	return Report{
 		Successes: successNodes,
 		Fails:     failedNodes,
 		Offlines:  offlineNodes,
@@ -331,7 +331,7 @@ func (verifier *Verifier) DownloadShares(ctx context.Context, limits []*pb.AddressedOrderLimit,
 }

 // Reverify reverifies the contained nodes in the stripe
-func (verifier *Verifier) Reverify(ctx context.Context, path storj.Path) (report *Report, err error) {
+func (verifier *Verifier) Reverify(ctx context.Context, path storj.Path) (report Report, err error) {
 	defer mon.Task()(&ctx)(&err)
 	// result status enum
@@ -354,9 +354,9 @@ func (verifier *Verifier) Reverify(ctx context.Context, path storj.Path) (report *Report, err error) {
 	pointer, err := verifier.metainfo.Get(ctx, path)
 	if err != nil {
 		if storage.ErrKeyNotFound.Has(err) {
-			return nil, ErrSegmentDeleted.New("%q", path)
+			return Report{}, ErrSegmentDeleted.New("%q", path)
 		}
-		return nil, err
+		return Report{}, err
 	}
 	pieceHashesVerified := make(map[storj.NodeID]bool)
@@ -560,7 +560,6 @@ func (verifier *Verifier) Reverify(ctx context.Context, path storj.Path) (report *Report, err error) {
 		}(pending)
 	}
-	report = &Report{}
 	for range pieces {
 		result := <-ch
 		switch result.status {
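
Verify and Reverify get the same mechanical rewrite on every error path: return nil, err becomes return Report{}, err, and partial results such as the offline list keep traveling alongside the error. Because report is a named value return, the deleted report = &Report{} initialization is redundant too; the zero value is ready to use. A minimal sketch of the pattern, with assumed simplified types:

	package main

	import (
		"errors"
		"fmt"
	)

	type Report struct{ Offlines []string }

	// verify uses a named value result, so even error paths hand the caller
	// a usable (possibly partial) Report instead of nil.
	func verify(fail bool) (report Report, err error) {
		if fail {
			// The partial result travels with the error.
			return Report{Offlines: []string{"node-1"}}, errors.New("download failed")
		}
		return report, nil // the named return starts as a ready-to-use zero value
	}

	func main() {
		r, err := verify(true)
		// Safe to inspect even though err != nil; with *Report this read
		// could be a nil pointer dereference.
		fmt.Println(len(r.Offlines), err)
	}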

@@ -118,19 +118,17 @@ func (worker *Worker) work(ctx context.Context, path storj.Path) error {
 	// Skip all reverified nodes in the next Verify step.
 	skip := make(map[storj.NodeID]bool)
-	if report != nil {
-		for _, nodeID := range report.Successes {
-			skip[nodeID] = true
-		}
-		for _, nodeID := range report.Offlines {
-			skip[nodeID] = true
-		}
-		for _, nodeID := range report.Fails {
-			skip[nodeID] = true
-		}
-		for _, pending := range report.PendingAudits {
-			skip[pending.NodeID] = true
-		}
-	}
+	for _, nodeID := range report.Successes {
+		skip[nodeID] = true
+	}
+	for _, nodeID := range report.Offlines {
+		skip[nodeID] = true
+	}
+	for _, nodeID := range report.Fails {
+		skip[nodeID] = true
+	}
+	for _, pending := range report.PendingAudits {
+		skip[pending.NodeID] = true
+	}
 	// Next, audit the remaining nodes that are not in containment mode.
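
The worker is where a forgotten guard hurts: it ranges over the report's slices to build the skip set, and with *Report any producer path returning nil would crash a consumer like this one (the diff does not pinpoint the panicking call site, but this is the class of bug being removed). The value type deletes the guard instead of auditing every caller: ranging over the zero Report's nil slices runs zero iterations. A runnable sketch with simplified types, not the satellite's code:

	package main

	import "fmt"

	type Report struct {
		Successes, Offlines, Fails []string
	}

	// buildSkip needs no nil guard: a zero Report's nil slices simply
	// produce zero loop iterations.
	func buildSkip(report Report) map[string]bool {
		skip := make(map[string]bool)
		for _, id := range report.Successes {
			skip[id] = true
		}
		for _, id := range report.Offlines {
			skip[id] = true
		}
		for _, id := range report.Fails {
			skip[id] = true
		}
		return skip
	}

	func main() {
		fmt.Println(len(buildSkip(Report{}))) // 0, and no panic
	}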