satellite/audit: add mutex to pieceHashesVerified map (#3214)

* add mutex

* remove double send to ch

* lock mutex inside defer

* import sync
This commit is contained in:
Maximillian von Briesen 2019-10-08 17:01:32 -04:00 committed by Yingrong Zhao
parent f75893c1ba
commit 3a3d576d9b

View File

@ -8,6 +8,7 @@ import (
"context"
"io"
"math/rand"
"sync"
"time"
"github.com/vivint/infectious"
@ -359,7 +360,10 @@ func (verifier *Verifier) Reverify(ctx context.Context, path storj.Path) (report
}
pieceHashesVerified := make(map[storj.NodeID]bool)
pieceHashesVerifiedMutex := &sync.Mutex{}
defer func() {
pieceHashesVerifiedMutex.Lock()
// for each node in Fails and PendingAudits, remove if piece hashes not verified for that segment
newFails := storj.NodeIDList{}
newPendingAudits := []*PendingAudit{}
@ -377,6 +381,8 @@ func (verifier *Verifier) Reverify(ctx context.Context, path storj.Path) (report
report.Fails = newFails
report.PendingAudits = newPendingAudits
pieceHashesVerifiedMutex.Unlock()
}()
pieces := pointer.GetRemote().GetRemotePieces()
@ -415,7 +421,9 @@ func (verifier *Verifier) Reverify(ctx context.Context, path storj.Path) (report
}
// set whether piece hashes have been verified for this segment so we know whether to report a failed or pending audit for this node
pieceHashesVerifiedMutex.Lock()
pieceHashesVerified[pending.NodeID] = pendingPointer.PieceHashesVerified
pieceHashesVerifiedMutex.Unlock()
if pendingPointer.GetRemote().RootPieceId != pending.PieceID {
// segment has changed since initial containment
@ -436,7 +444,6 @@ func (verifier *Verifier) Reverify(ctx context.Context, path storj.Path) (report
}
if !found {
// node is no longer in pointer, so remove from containment
ch <- result{nodeID: pending.NodeID, status: erred, err: err}
_, errDelete := verifier.containment.Delete(ctx, pending.NodeID)
if errDelete != nil {
verifier.log.Debug("Error deleting node from containment db", zap.String("Segment Path", pending.Path), zap.Stringer("Node ID", pending.NodeID), zap.Error(errDelete))