rearranges code in verifier.go (#1529)

Natalie Villasana 2019-03-20 06:54:37 -04:00 committed by Michal Niewrzal
parent b4b99fa979
commit e390605b81


@@ -64,121 +64,6 @@ func NewVerifier(log *zap.Logger, transport transport.Client, overlay *overlay.C
	return &Verifier{downloader: newDefaultDownloader(log, transport, overlay, id, minBytesPerSecond)}
}
// getShare uses the piece store client to download a share from a storage node
func (d *defaultDownloader) getShare(ctx context.Context, limit *pb.AddressedOrderLimit, stripeIndex int64, shareSize int32, pieceNum int) (share Share, err error) {
	defer mon.Task()(&ctx)(&err)
	bandwidthMsgSize := shareSize
	// determines the number of seconds allotted for receiving data from a storage node
	seconds := time.Duration(int32(time.Second) * bandwidthMsgSize / d.minBytesPerSecond.Int32())
	timedCtx, cancel := context.WithTimeout(ctx, seconds)
	defer cancel()
	storageNodeID := limit.GetLimit().StorageNodeId
	conn, err := d.transport.DialNode(timedCtx, &pb.Node{
		Id:      storageNodeID,
		Address: limit.GetStorageNodeAddress(),
		Type:    pb.NodeType_STORAGE,
	})
	if err != nil {
		return Share{}, err
	}
	ps := piecestore.NewClient(
		d.log.Named(storageNodeID.String()),
		signing.SignerFromFullIdentity(d.transport.Identity()),
		conn,
		piecestore.DefaultConfig,
	)
	offset := int64(shareSize) * stripeIndex
	downloader, err := ps.Download(timedCtx, limit.GetLimit(), offset, int64(shareSize))
	if err != nil {
		return Share{}, err
	}
	defer func() { err = errs.Combine(err, downloader.Close()) }()
	buf := make([]byte, shareSize)
	_, err = io.ReadFull(downloader, buf)
	if err != nil {
		return Share{}, err
	}
	return Share{
		Error:    nil,
		PieceNum: pieceNum,
		Data:     buf,
	}, nil
}
// DownloadShares downloads shares from the nodes where remote pieces are located
func (d *defaultDownloader) DownloadShares(ctx context.Context, limits []*pb.AddressedOrderLimit, stripeIndex int64, shareSize int32) (shares map[int]Share, nodes map[int]storj.NodeID, err error) {
	defer mon.Task()(&ctx)(&err)
	shares = make(map[int]Share, len(limits))
	nodes = make(map[int]storj.NodeID, len(limits))
	for i, limit := range limits {
		if limit == nil {
			continue
		}
		share, err := d.getShare(ctx, limit, stripeIndex, shareSize, i)
		if err != nil {
			share = Share{
				Error:    err,
				PieceNum: i,
				Data:     nil,
			}
		}
		shares[share.PieceNum] = share
		nodes[share.PieceNum] = limit.GetLimit().StorageNodeId
	}
	return shares, nodes, nil
}
func makeCopies(ctx context.Context, originals map[int]Share) (copies []infectious.Share, err error) {
	defer mon.Task()(&ctx)(&err)
	copies = make([]infectious.Share, 0, len(originals))
	for _, original := range originals {
		copies = append(copies, infectious.Share{
			Data:   append([]byte{}, original.Data...),
			Number: original.PieceNum})
	}
	return copies, nil
}
// auditShares takes the downloaded shares and uses infectious's Correct function to check that they
// haven't been altered. auditShares returns a slice containing the piece numbers of altered shares.
func auditShares(ctx context.Context, required, total int, originals map[int]Share) (pieceNums []int, err error) {
	defer mon.Task()(&ctx)(&err)
	f, err := infectious.NewFEC(required, total)
	if err != nil {
		return nil, err
	}
	copies, err := makeCopies(ctx, originals)
	if err != nil {
		return nil, err
	}
	err = f.Correct(copies)
	if err != nil {
		return nil, err
	}
	for _, share := range copies {
		if !bytes.Equal(originals[share.Number].Data, share.Data) {
			pieceNums = append(pieceNums, share.Number)
		}
	}
	return pieceNums, nil
}
// Verify downloads shares then verifies the data correctness at the given stripe
func (verifier *Verifier) Verify(ctx context.Context, stripe *Stripe) (verifiedNodes *RecordAuditsInfo, err error) {
	defer mon.Task()(&ctx)(&err)
@@ -237,6 +122,122 @@ func (verifier *Verifier) Verify(ctx context.Context, stripe *Stripe) (verifiedN
	}, nil
}
// DownloadShares downloads shares from the nodes where remote pieces are located
func (d *defaultDownloader) DownloadShares(ctx context.Context, limits []*pb.AddressedOrderLimit, stripeIndex int64, shareSize int32) (shares map[int]Share, nodes map[int]storj.NodeID, err error) {
	defer mon.Task()(&ctx)(&err)
	shares = make(map[int]Share, len(limits))
	nodes = make(map[int]storj.NodeID, len(limits))
	for i, limit := range limits {
		if limit == nil {
			continue
		}
		share, err := d.getShare(ctx, limit, stripeIndex, shareSize, i)
		if err != nil {
			share = Share{
				Error:    err,
				PieceNum: i,
				Data:     nil,
			}
		}
		shares[share.PieceNum] = share
		nodes[share.PieceNum] = limit.GetLimit().StorageNodeId
	}
	return shares, nodes, nil
}
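DownloadShares records a failed download inside the Share rather than returning early, so one unreachable node cannot abort the whole audit. A standalone sketch of that error-in-the-value pattern, assuming a simplified local share type and made-up errors rather than the package's own types:

package main

import (
	"errors"
	"fmt"
)

// share mirrors the audit Share shape for illustration only: the error from a
// failed download rides along in the value instead of stopping the loop.
type share struct {
	Error    error
	PieceNum int
	Data     []byte
}

func main() {
	shares := map[int]share{
		0: {PieceNum: 0, Data: []byte("ok")},
		1: {PieceNum: 1, Error: errors.New("dial timeout")},
	}
	for num, s := range shares {
		if s.Error != nil {
			fmt.Printf("piece %d failed: %v\n", num, s.Error)
			continue
		}
		fmt.Printf("piece %d downloaded %d bytes\n", num, len(s.Data))
	}
}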
// getShare uses the piece store client to download a share from a storage node
func (d *defaultDownloader) getShare(ctx context.Context, limit *pb.AddressedOrderLimit, stripeIndex int64, shareSize int32, pieceNum int) (share Share, err error) {
	defer mon.Task()(&ctx)(&err)
	bandwidthMsgSize := shareSize
	// determines the number of seconds allotted for receiving data from a storage node
	seconds := time.Duration(int32(time.Second) * bandwidthMsgSize / d.minBytesPerSecond.Int32())
	timedCtx, cancel := context.WithTimeout(ctx, seconds)
	defer cancel()
	storageNodeID := limit.GetLimit().StorageNodeId
	conn, err := d.transport.DialNode(timedCtx, &pb.Node{
		Id:      storageNodeID,
		Address: limit.GetStorageNodeAddress(),
		Type:    pb.NodeType_STORAGE,
	})
	if err != nil {
		return Share{}, err
	}
	ps := piecestore.NewClient(
		d.log.Named(storageNodeID.String()),
		signing.SignerFromFullIdentity(d.transport.Identity()),
		conn,
		piecestore.DefaultConfig,
	)
	offset := int64(shareSize) * stripeIndex
	downloader, err := ps.Download(timedCtx, limit.GetLimit(), offset, int64(shareSize))
	if err != nil {
		return Share{}, err
	}
	defer func() { err = errs.Combine(err, downloader.Close()) }()
	buf := make([]byte, shareSize)
	_, err = io.ReadFull(downloader, buf)
	if err != nil {
		return Share{}, err
	}
	return Share{
		Error:    nil,
		PieceNum: pieceNum,
		Data:     buf,
	}, nil
}
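The timeout above allots one second of dial-and-download time per minBytesPerSecond bytes of share data, so a stalled node is cut off instead of blocking the audit. A self-contained sketch of that ratio, with the arithmetic widened to int64 and an illustrative helper name that does not appear in this file:

package main

import (
	"fmt"
	"time"
)

// shareDownloadTimeout is an illustrative helper (not from this file): it
// allots one second per minBytesPerSecond bytes of share data, the same
// ratio getShare uses to bound the piecestore dial and download.
func shareDownloadTimeout(shareSize, minBytesPerSecond int32) time.Duration {
	// widen to int64 before multiplying so large share sizes cannot overflow
	return time.Duration(int64(time.Second) * int64(shareSize) / int64(minBytesPerSecond))
}

func main() {
	// e.g. a 1 MiB share with a 128 B/s floor is allotted 8192 seconds
	fmt.Println(shareDownloadTimeout(1<<20, 128))
}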
// auditShares takes the downloaded shares and uses infectious's Correct function to check that they
// haven't been altered. auditShares returns a slice containing the piece numbers of altered shares.
func auditShares(ctx context.Context, required, total int, originals map[int]Share) (pieceNums []int, err error) {
	defer mon.Task()(&ctx)(&err)
	f, err := infectious.NewFEC(required, total)
	if err != nil {
		return nil, err
	}
	copies, err := makeCopies(ctx, originals)
	if err != nil {
		return nil, err
	}
	err = f.Correct(copies)
	if err != nil {
		return nil, err
	}
	for _, share := range copies {
		if !bytes.Equal(originals[share.Number].Data, share.Data) {
			pieceNums = append(pieceNums, share.Number)
		}
	}
	return pieceNums, nil
}
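auditShares leans on the fact that Correct repairs the copies in place: any corrected copy that no longer matches the bytes a node returned must have been altered. A standalone toy of that pattern against the same infectious library, where the (2, 4) parameters, sample data, and variable names are illustrative assumptions rather than values from the audit code:

package main

import (
	"bytes"
	"fmt"

	"github.com/vivint/infectious"
)

func main() {
	// A toy (required=2, total=4) erasure code; real audits use the
	// redundancy scheme's actual parameters.
	f, err := infectious.NewFEC(2, 4)
	if err != nil {
		panic(err)
	}
	// Encode 4 bytes (a multiple of required) into 4 shares, deep-copying
	// because Encode reuses the buffer it hands to the callback.
	var downloaded []infectious.Share
	err = f.Encode([]byte("data"), func(s infectious.Share) {
		downloaded = append(downloaded, s.DeepCopy())
	})
	if err != nil {
		panic(err)
	}
	// Pretend one storage node returned altered bytes.
	downloaded[1].Data[0] ^= 0xff
	// Correct mutates what it is given, so work on copies and afterwards
	// diff each copy against the share as downloaded -- the same pattern
	// auditShares uses to spot altered pieces.
	asDownloaded := make(map[int][]byte, len(downloaded))
	copies := make([]infectious.Share, 0, len(downloaded))
	for _, s := range downloaded {
		asDownloaded[s.Number] = s.Data
		copies = append(copies, s.DeepCopy())
	}
	if err := f.Correct(copies); err != nil {
		panic(err)
	}
	for _, share := range copies {
		if !bytes.Equal(asDownloaded[share.Number], share.Data) {
			fmt.Println("altered share number:", share.Number)
		}
	}
}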
// makeCopies takes in a map of audit Shares and deep copies their data to a slice of infectious Shares
func makeCopies(ctx context.Context, originals map[int]Share) (copies []infectious.Share, err error) {
	defer mon.Task()(&ctx)(&err)
	copies = make([]infectious.Share, 0, len(originals))
	for _, original := range originals {
		copies = append(copies, infectious.Share{
			Data:   append([]byte{}, original.Data...),
			Number: original.PieceNum})
	}
	return copies, nil
}
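The append([]byte{}, original.Data...) above is what makes these genuine deep copies, so Correct can rewrite the copies inside auditShares without disturbing the downloaded data they are later compared against. A minimal illustration of that copy semantics, with made-up values:

package main

import "fmt"

func main() {
	// appending to an empty slice allocates fresh backing memory, so the
	// copy can be mutated (as Correct does) without touching the original
	original := []byte{1, 2, 3}
	copied := append([]byte{}, original...)
	copied[0] = 99
	fmt.Println(original[0], copied[0]) // prints "1 99"
}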
// getSuccessNodes uses the failed nodes and offline nodes arrays to determine which nodes passed the audit
func getSuccessNodes(ctx context.Context, nodes map[int]storj.NodeID, failedNodes, offlineNodes storj.NodeIDList) (successNodes storj.NodeIDList) {
	fails := make(map[storj.NodeID]bool)