remove downloader from verifier (#1983)

* remove downloader from verifier
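This folds the downloader interface and its defaultDownloader implementation
into Verifier itself, making DownloadShares an exported method on Verifier.
The call site changes accordingly (both lines taken from the verifier.go
hunks below):

    // before: shares fetched through the internal downloader interface
    shares, nodes, err := verifier.downloader.DownloadShares(ctx, orderLimits, stripe.Index, shareSize)

    // after: DownloadShares is a method on Verifier itself
    shares, nodes, err := verifier.DownloadShares(ctx, orderLimits, stripe.Index, shareSize)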
Bill Thorp authored 2019-05-17 14:48:32 -04:00; committed by GitHub
parent 5d24af9b41
commit b23afb7aaa
3 changed files with 22 additions and 47 deletions

View File

@@ -1,11 +0,0 @@
-// Copyright (C) 2019 Storj Labs, Inc.
-// See LICENSE for copying information.
-
-package audit
-
-import (
-	"github.com/zeebo/errs"
-)
-
-// Error is the default audit errs class
-var Error = errs.Class("audit error")

View File

@@ -7,6 +7,7 @@ import (
 	"context"
 	"time"
 
+	"github.com/zeebo/errs"
 	"go.uber.org/zap"
 
 	"storj.io/storj/internal/memory"
@@ -18,6 +19,9 @@ import (
 	"storj.io/storj/satellite/orders"
 )
 
+// Error is the default audit errs class
+var Error = errs.Class("audit error")
+
 // Config contains configurable values for audit service
 type Config struct {
 	MaxRetriesStatDB int `help:"max number of times to attempt updating a statdb batch" default:"3"`
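The Error class relocated here from the deleted file keeps the usual
zeebo/errs semantics. A minimal sketch of how such a class is used; the
wrap site below is illustrative, not part of this commit:

    // Wrap tags an error with the "audit error" class; Error.Has(err)
    // later reports whether an error carries that class.
    func exampleWrap(err error) error {
    	if err != nil {
    		return Error.Wrap(err)
    	}
    	return nil
    }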

View File

@@ -38,34 +38,17 @@ type Share struct {
 
 // Verifier helps verify the correctness of a given stripe
 type Verifier struct {
-	orders     *orders.Service
-	auditor    *identity.PeerIdentity
-	downloader downloader
-}
-
-type downloader interface {
-	DownloadShares(ctx context.Context, limits []*pb.AddressedOrderLimit, stripeIndex int64, shareSize int32) (shares map[int]Share, nodes map[int]storj.NodeID, err error)
-}
-
-// defaultDownloader downloads shares from networked storage nodes
-type defaultDownloader struct {
-	log       *zap.Logger
-	transport transport.Client
-	overlay   *overlay.Cache
-	reporter
+	log               *zap.Logger
+	orders            *orders.Service
+	auditor           *identity.PeerIdentity
+	transport         transport.Client
+	overlay           *overlay.Cache
 	minBytesPerSecond memory.Size
 }
 
-// newDefaultDownloader creates a defaultDownloader
-func newDefaultDownloader(log *zap.Logger, transport transport.Client, overlay *overlay.Cache, id *identity.FullIdentity, minBytesPerSecond memory.Size) *defaultDownloader {
-	return &defaultDownloader{log: log, transport: transport, overlay: overlay, minBytesPerSecond: minBytesPerSecond}
-}
-
 // NewVerifier creates a Verifier
 func NewVerifier(log *zap.Logger, transport transport.Client, overlay *overlay.Cache, orders *orders.Service, id *identity.FullIdentity, minBytesPerSecond memory.Size) *Verifier {
-	return &Verifier{downloader: newDefaultDownloader(log, transport, overlay, id, minBytesPerSecond), orders: orders, auditor: id.PeerIdentity()}
+	return &Verifier{log: log, orders: orders, auditor: id.PeerIdentity(), transport: transport, overlay: overlay, minBytesPerSecond: minBytesPerSecond}
 }
 
 // Verify downloads shares then verifies the data correctness at the given stripe
@@ -81,7 +64,7 @@ func (verifier *Verifier) Verify(ctx context.Context, stripe *Stripe) (verifiedN
 		return nil, err
 	}
 
-	shares, nodes, err := verifier.downloader.DownloadShares(ctx, orderLimits, stripe.Index, shareSize)
+	shares, nodes, err := verifier.DownloadShares(ctx, orderLimits, stripe.Index, shareSize)
 	if err != nil {
 		return nil, err
 	}
@@ -92,8 +75,7 @@ func (verifier *Verifier) Verify(ctx context.Context, stripe *Stripe) (verifiedN
 	for pieceNum, share := range shares {
 		if shares[pieceNum].Error != nil {
-			if shares[pieceNum].Error == context.DeadlineExceeded ||
-				!transport.Error.Has(shares[pieceNum].Error) {
+			if shares[pieceNum].Error == context.DeadlineExceeded || !transport.Error.Has(shares[pieceNum].Error) {
 				failedNodes = append(failedNodes, nodes[pieceNum])
 			} else {
 				offlineNodes = append(offlineNodes, nodes[pieceNum])
 			}
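The reflowed condition encodes the audit's node classification: a deadline
expiration or any non-transport error counts the node as failed, while a
transport-level dial error only marks it offline. Restated as a hypothetical
helper (not part of the commit) for clarity:

    // isOffline mirrors the branch above: only transport errors that are
    // not deadline expirations indicate the node was unreachable.
    func isOffline(err error) bool {
    	return err != context.DeadlineExceeded && transport.Error.Has(err)
    }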
@@ -132,8 +114,8 @@ func (verifier *Verifier) Verify(ctx context.Context, stripe *Stripe) (verifiedN
 	}, nil
 }
 
-// Download Shares downloads shares from the nodes where remote pieces are located
-func (d *defaultDownloader) DownloadShares(ctx context.Context, limits []*pb.AddressedOrderLimit, stripeIndex int64, shareSize int32) (shares map[int]Share, nodes map[int]storj.NodeID, err error) {
+// DownloadShares downloads shares from the nodes where remote pieces are located
+func (verifier *Verifier) DownloadShares(ctx context.Context, limits []*pb.AddressedOrderLimit, stripeIndex int64, shareSize int32) (shares map[int]Share, nodes map[int]storj.NodeID, err error) {
 	defer mon.Task()(&ctx)(&err)
 
 	shares = make(map[int]Share, len(limits))
@@ -144,7 +126,7 @@ func (d *defaultDownloader) DownloadShares(ctx context.Context, limits []*pb.Add
 			continue
 		}
 
-		share, err := d.getShare(ctx, limit, stripeIndex, shareSize, i)
+		share, err := verifier.getShare(ctx, limit, stripeIndex, shareSize, i)
 		if err != nil {
 			share = Share{
 				Error: err,
@@ -161,15 +143,15 @@ func (d *defaultDownloader) DownloadShares(ctx context.Context, limits []*pb.Add
 }
 
 // getShare use piece store client to download shares from nodes
-func (d *defaultDownloader) getShare(ctx context.Context, limit *pb.AddressedOrderLimit, stripeIndex int64, shareSize int32, pieceNum int) (share Share, err error) {
+func (verifier *Verifier) getShare(ctx context.Context, limit *pb.AddressedOrderLimit, stripeIndex int64, shareSize int32, pieceNum int) (share Share, err error) {
 	defer mon.Task()(&ctx)(&err)
 
 	bandwidthMsgSize := shareSize
 
 	// determines number of seconds allotted for receiving data from a storage node
 	timedCtx := ctx
-	if d.minBytesPerSecond > 0 {
-		maxTransferTime := time.Duration(int64(time.Second) * int64(bandwidthMsgSize) / d.minBytesPerSecond.Int64())
+	if verifier.minBytesPerSecond > 0 {
+		maxTransferTime := time.Duration(int64(time.Second) * int64(bandwidthMsgSize) / verifier.minBytesPerSecond.Int64())
 		if maxTransferTime < (5 * time.Second) {
 			maxTransferTime = 5 * time.Second
 		}
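To make the transfer-time bound concrete, a worked example with illustrative
values (not from the commit), using the same memory and time packages the
file imports: a 1 MiB share at a 128 KiB/s minimum rate is allotted
1 MiB / 128 KiB/s = 8s, while anything computing to under 5s is clamped up
to the 5s floor:

    bandwidthMsgSize := int32(1 * memory.MiB) // 1048576 bytes
    minBytesPerSecond := 128 * memory.KiB     // 131072 bytes/sec

    // same arithmetic as getShare: nanoseconds = 1e9 * bytes / (bytes/sec)
    maxTransferTime := time.Duration(int64(time.Second) * int64(bandwidthMsgSize) / minBytesPerSecond.Int64())
    // maxTransferTime == 8 * time.Second; a 256 KiB share would compute
    // to 2s and be raised to the 5 * time.Second floor.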
@@ -180,7 +162,7 @@ func (d *defaultDownloader) getShare(ctx context.Context, limit *pb.AddressedOrd
 
 	storageNodeID := limit.GetLimit().StorageNodeId
 
-	conn, err := d.transport.DialNode(timedCtx, &pb.Node{
+	conn, err := verifier.transport.DialNode(timedCtx, &pb.Node{
 		Id:      storageNodeID,
 		Address: limit.GetStorageNodeAddress(),
 	})
@@ -188,15 +170,15 @@ func (d *defaultDownloader) getShare(ctx context.Context, limit *pb.AddressedOrd
 		return Share{}, err
 	}
 
 	ps := piecestore.NewClient(
-		d.log.Named(storageNodeID.String()),
-		signing.SignerFromFullIdentity(d.transport.Identity()),
+		verifier.log.Named(storageNodeID.String()),
+		signing.SignerFromFullIdentity(verifier.transport.Identity()),
 		conn,
 		piecestore.DefaultConfig,
 	)
 	defer func() {
 		err := ps.Close()
 		if err != nil {
-			d.log.Error("audit verifier failed to close conn to node: %+v", zap.Error(err))
+			verifier.log.Error("audit verifier failed to close conn to node: %+v", zap.Error(err))
 		}
 	}()
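With the downloader type gone, callers construct a Verifier and fetch shares
through the exported method directly. A minimal end-to-end sketch under the
wiring shown in this diff; the surrounding variables (log, transport,
overlay, orders, id, ctx, orderLimits, stripe, shareSize) are assumed to be
set up as elsewhere in the satellite:

    verifier := audit.NewVerifier(log, transport, overlay, orders, id, 128*memory.KiB)

    // shares is keyed by piece number; nodes maps the same keys to node IDs.
    shares, nodes, err := verifier.DownloadShares(ctx, orderLimits, stripe.Index, shareSize)
    if err != nil {
    	return err
    }
    for pieceNum := range shares {
    	_ = nodes[pieceNum] // the storage node that served this share
    }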