2019-01-24 20:15:10 +00:00
|
|
|
// Copyright (C) 2019 Storj Labs, Inc.
|
2018-10-09 22:10:37 +01:00
|
|
|
// See LICENSE for copying information.
|
|
|
|
|
|
|
|
package audit
|
|
|
|
|
|
|
|
import (
|
|
|
|
"bytes"
|
|
|
|
"context"
|
|
|
|
"io"
|
2019-03-19 17:37:26 +00:00
|
|
|
"time"
|
2018-10-09 22:10:37 +01:00
|
|
|
|
|
|
|
"github.com/vivint/infectious"
|
2019-01-29 20:42:27 +00:00
|
|
|
"github.com/zeebo/errs"
|
2019-03-18 10:55:06 +00:00
|
|
|
"go.uber.org/zap"
|
2018-12-14 20:17:30 +00:00
|
|
|
monkit "gopkg.in/spacemonkeygo/monkit.v2"
|
2018-10-09 22:10:37 +01:00
|
|
|
|
2019-03-19 17:37:26 +00:00
|
|
|
"storj.io/storj/internal/memory"
|
2019-03-18 10:55:06 +00:00
|
|
|
"storj.io/storj/pkg/auth/signing"
|
2019-01-30 20:47:21 +00:00
|
|
|
"storj.io/storj/pkg/identity"
|
2018-10-09 22:10:37 +01:00
|
|
|
"storj.io/storj/pkg/overlay"
|
|
|
|
"storj.io/storj/pkg/pb"
|
2018-11-29 18:39:27 +00:00
|
|
|
"storj.io/storj/pkg/storj"
|
2018-10-09 22:10:37 +01:00
|
|
|
"storj.io/storj/pkg/transport"
|
2019-03-28 20:09:23 +00:00
|
|
|
"storj.io/storj/satellite/orders"
|
2019-03-18 10:55:06 +00:00
|
|
|
"storj.io/storj/uplink/piecestore"
|
2018-10-09 22:10:37 +01:00
|
|
|
)
|
|
|
|
|
2019-03-19 17:37:26 +00:00
|
|
|
var (
|
|
|
|
mon = monkit.Package()
|
|
|
|
)
|
2018-10-09 22:10:37 +01:00
|
|
|
|
2019-01-23 19:58:44 +00:00
|
|
|
// Share represents required information about an audited share: the outcome
// of downloading one erasure share from one storage node.
type Share struct {
	// Error is non-nil when downloading the share failed.
	Error error
	// PieceNum is the piece number the share belongs to; DownloadShares uses
	// the index of the node's order limit for this.
	PieceNum int
	// Data holds the downloaded share bytes. Shares produced by DownloadShares
	// with a non-nil Error have nil Data.
	Data []byte
}
|
|
|
|
|
2018-10-10 19:25:46 +01:00
|
|
|
// Verifier helps verify the correctness of a given stripe by downloading its
// erasure shares from storage nodes and checking them with error correction.
type Verifier struct {
	// log receives the verifier's own messages; getShare also hands a child
	// logger named after the storage node ID to each piecestore client.
	log *zap.Logger
	// orders creates the audit order limits used to download shares.
	orders *orders.Service
	// auditor is the identity passed to CreateAuditOrderLimits.
	auditor *identity.PeerIdentity
	// transport dials storage nodes; its identity also signs piecestore
	// requests.
	transport transport.Client
	// overlay is stored by NewVerifier but is not read by any code in this
	// file. NOTE(review): confirm whether it is still needed.
	overlay *overlay.Cache
	// minBytesPerSecond is the minimum transfer rate used to derive the
	// per-share download timeout; a non-positive value disables the timeout.
	minBytesPerSecond memory.Size
}
|
|
|
|
|
2018-10-10 19:25:46 +01:00
|
|
|
// NewVerifier creates a Verifier
|
2019-03-28 20:09:23 +00:00
|
|
|
func NewVerifier(log *zap.Logger, transport transport.Client, overlay *overlay.Cache, orders *orders.Service, id *identity.FullIdentity, minBytesPerSecond memory.Size) *Verifier {
|
2019-05-17 19:48:32 +01:00
|
|
|
return &Verifier{log: log, orders: orders, auditor: id.PeerIdentity(), transport: transport, overlay: overlay, minBytesPerSecond: minBytesPerSecond}
|
2018-10-09 22:10:37 +01:00
|
|
|
}
|
|
|
|
|
2019-03-20 10:54:37 +00:00
|
|
|
// Verify downloads shares then verifies the data correctness at the given stripe.
//
// Every node that was asked for a share is classified in the returned
// RecordAuditsInfo:
//   - offline: its download failed with an error of the transport error class
//     that is not exactly context.DeadlineExceeded;
//   - failed: its download failed with any other error, or error correction
//     found that its share had been altered;
//   - success: every remaining node.
//
// If fewer shares than the redundancy scheme's minimum were downloaded, the
// shares cannot be checked: only failed and offline nodes are reported and
// SuccessNodeIDs is nil. An error is returned when order limits cannot be
// created, when DownloadShares fails, or when error correction fails.
func (verifier *Verifier) Verify(ctx context.Context, stripe *Stripe) (verifiedNodes *RecordAuditsInfo, err error) {
	defer mon.Task()(&ctx)(&err)

	pointer := stripe.Segment
	// Read the redundancy scheme once, through the nil-safe protobuf getters.
	// Dereferencing pointer.Remote directly would panic for a segment without
	// remote data. With the getters such a segment yields zero values instead,
	// and the problem surfaces as an error from error correction.
	redundancy := pointer.GetRemote().GetRedundancy()
	shareSize := redundancy.GetErasureShareSize()
	bucketID := createBucketID(stripe.SegmentPath)

	orderLimits, err := verifier.orders.CreateAuditOrderLimits(ctx, verifier.auditor, bucketID, pointer)
	if err != nil {
		return nil, err
	}

	shares, nodes, err := verifier.DownloadShares(ctx, orderLimits, stripe.Index, shareSize)
	if err != nil {
		return nil, err
	}

	var offlineNodes, failedNodes storj.NodeIDList
	sharesToAudit := make(map[int]Share)
	for pieceNum, share := range shares {
		switch {
		case share.Error == nil:
			sharesToAudit[pieceNum] = share
		case share.Error == context.DeadlineExceeded || !transport.Error.Has(share.Error):
			// A timed-out download, or an error that did not come from the
			// transport layer, counts against the node.
			failedNodes = append(failedNodes, nodes[pieceNum])
		default:
			// Remaining transport-level errors are treated as the node being
			// unreachable, so it is reported as offline rather than failed.
			offlineNodes = append(offlineNodes, nodes[pieceNum])
		}
	}

	required := int(redundancy.GetMinReq())
	total := int(redundancy.GetTotal())

	if len(sharesToAudit) < required {
		// Not enough shares to run error correction; report only what is known.
		return &RecordAuditsInfo{
			SuccessNodeIDs: nil,
			FailNodeIDs:    failedNodes,
			OfflineNodeIDs: offlineNodes,
		}, nil
	}

	pieceNums, err := auditShares(ctx, required, total, sharesToAudit)
	if err != nil {
		return nil, err
	}
	for _, pieceNum := range pieceNums {
		failedNodes = append(failedNodes, nodes[pieceNum])
	}

	successNodes := getSuccessNodes(ctx, nodes, failedNodes, offlineNodes)

	return &RecordAuditsInfo{
		SuccessNodeIDs: successNodes,
		FailNodeIDs:    failedNodes,
		OfflineNodeIDs: offlineNodes,
	}, nil
}
|
|
|
|
|
2019-05-17 19:48:32 +01:00
|
|
|
// DownloadShares downloads shares from the nodes where remote pieces are
// located. Nodes are contacted one at a time, and nil entries in limits are
// skipped.
//
// Both returned maps are keyed by piece number, which is the index of the
// node's order limit within limits. A failed download does not make
// DownloadShares fail. It is recorded as a Share with Error set and nil Data,
// so the returned error is always nil.
func (verifier *Verifier) DownloadShares(ctx context.Context, limits []*pb.AddressedOrderLimit, stripeIndex int64, shareSize int32) (shares map[int]Share, nodes map[int]storj.NodeID, err error) {
	defer mon.Task()(&ctx)(&err)

	shares = make(map[int]Share, len(limits))
	nodes = make(map[int]storj.NodeID, len(limits))

	for pieceNum, limit := range limits {
		if limit == nil {
			continue
		}

		share, downloadErr := verifier.getShare(ctx, limit, stripeIndex, shareSize, pieceNum)
		if downloadErr != nil {
			// Replace whatever getShare returned with an error-only share.
			share = Share{Error: downloadErr, PieceNum: pieceNum, Data: nil}
		}

		shares[share.PieceNum] = share
		nodes[share.PieceNum] = limit.GetLimit().StorageNodeId
	}

	return shares, nodes, nil
}
|
|
|
|
|
2019-03-18 10:55:06 +00:00
|
|
|
// getShare uses a piecestore client over a freshly dialed connection to
// download a single erasure share from the storage node addressed by limit.
//
// The share is exactly shareSize bytes, starting at byte offset
// shareSize*stripeIndex. When verifier.minBytesPerSecond is positive, dialing
// and downloading are bounded by a timeout derived from that rate, with a floor
// of minTransferTime. Otherwise only ctx bounds them.
//
// On success the returned Share carries pieceNum and the downloaded data. An
// error from closing the downloader is combined into the returned error. An
// error from closing the piecestore client is only logged.
func (verifier *Verifier) getShare(ctx context.Context, limit *pb.AddressedOrderLimit, stripeIndex int64, shareSize int32, pieceNum int) (share Share, err error) {
	defer mon.Task()(&ctx)(&err)

	// minTransferTime is the smallest timeout ever applied. Presumably it leaves
	// room for dialing and connection setup even when the share is tiny.
	const minTransferTime = 5 * time.Second

	// Bound the time allotted for receiving data from the storage node: the
	// time needed at minBytesPerSecond, but never less than minTransferTime.
	timedCtx := ctx
	if verifier.minBytesPerSecond > 0 {
		maxTransferTime := time.Duration(int64(time.Second) * int64(shareSize) / verifier.minBytesPerSecond.Int64())
		if maxTransferTime < minTransferTime {
			maxTransferTime = minTransferTime
		}
		var cancel func()
		timedCtx, cancel = context.WithTimeout(ctx, maxTransferTime)
		defer cancel()
	}

	storageNodeID := limit.GetLimit().StorageNodeId

	conn, err := verifier.transport.DialNode(timedCtx, &pb.Node{
		Id:      storageNodeID,
		Address: limit.GetStorageNodeAddress(),
	})
	if err != nil {
		return Share{}, err
	}
	ps := piecestore.NewClient(
		verifier.log.Named(storageNodeID.String()),
		signing.SignerFromFullIdentity(verifier.transport.Identity()),
		conn,
		piecestore.DefaultConfig,
	)
	defer func() {
		// A close failure is logged rather than returned, so it does not
		// affect the share result. zap does not do printf-style formatting,
		// so the message is a plain string and details go in fields.
		if closeErr := ps.Close(); closeErr != nil {
			verifier.log.Error("audit verifier failed to close conn to node",
				zap.String("Node ID", storageNodeID.String()),
				zap.Error(closeErr))
		}
	}()

	offset := int64(shareSize) * stripeIndex

	downloader, err := ps.Download(timedCtx, limit.GetLimit(), offset, int64(shareSize))
	if err != nil {
		return Share{}, err
	}
	defer func() { err = errs.Combine(err, downloader.Close()) }()

	buf := make([]byte, shareSize)
	_, err = io.ReadFull(downloader, buf)
	if err != nil {
		return Share{}, err
	}

	return Share{
		Error:    nil,
		PieceNum: pieceNum,
		Data:     buf,
	}, nil
}
|
|
|
|
|
|
|
|
// auditShares takes the downloaded shares and uses infectious's Correct function to check that they
|
|
|
|
// haven't been altered. auditShares returns a slice containing the piece numbers of altered shares.
|
2019-01-23 19:58:44 +00:00
|
|
|
func auditShares(ctx context.Context, required, total int, originals map[int]Share) (pieceNums []int, err error) {
|
2018-10-09 22:10:37 +01:00
|
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
f, err := infectious.NewFEC(required, total)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2018-11-07 01:16:43 +00:00
|
|
|
|
2018-10-09 22:10:37 +01:00
|
|
|
copies, err := makeCopies(ctx, originals)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
err = f.Correct(copies)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2019-03-19 17:37:26 +00:00
|
|
|
|
2018-11-28 07:33:17 +00:00
|
|
|
for _, share := range copies {
|
|
|
|
if !bytes.Equal(originals[share.Number].Data, share.Data) {
|
2018-10-09 22:10:37 +01:00
|
|
|
pieceNums = append(pieceNums, share.Number)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return pieceNums, nil
|
|
|
|
}
|
|
|
|
|
2019-03-20 10:54:37 +00:00
|
|
|
// makeCopies takes in a map of audit Shares and deep copies their data to a slice of infectious Shares
|
|
|
|
func makeCopies(ctx context.Context, originals map[int]Share) (copies []infectious.Share, err error) {
|
2018-10-09 22:10:37 +01:00
|
|
|
defer mon.Task()(&ctx)(&err)
|
2019-03-20 10:54:37 +00:00
|
|
|
copies = make([]infectious.Share, 0, len(originals))
|
|
|
|
for _, original := range originals {
|
|
|
|
copies = append(copies, infectious.Share{
|
|
|
|
Data: append([]byte{}, original.Data...),
|
|
|
|
Number: original.PieceNum})
|
2018-10-16 18:40:34 +01:00
|
|
|
}
|
2019-03-20 10:54:37 +00:00
|
|
|
return copies, nil
|
2018-10-16 18:40:34 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
// getSuccessNodes uses the failed nodes and offline nodes arrays to determine which nodes passed the audit
|
2019-02-01 14:48:57 +00:00
|
|
|
func getSuccessNodes(ctx context.Context, nodes map[int]storj.NodeID, failedNodes, offlineNodes storj.NodeIDList) (successNodes storj.NodeIDList) {
|
2018-11-29 18:39:27 +00:00
|
|
|
fails := make(map[storj.NodeID]bool)
|
2018-10-16 18:40:34 +01:00
|
|
|
for _, fail := range failedNodes {
|
|
|
|
fails[fail] = true
|
2018-10-09 22:10:37 +01:00
|
|
|
}
|
2018-10-16 18:40:34 +01:00
|
|
|
for _, offline := range offlineNodes {
|
|
|
|
fails[offline] = true
|
|
|
|
}
|
|
|
|
|
|
|
|
for _, node := range nodes {
|
2019-02-01 14:48:57 +00:00
|
|
|
if !fails[node] {
|
|
|
|
successNodes = append(successNodes, node)
|
2018-10-16 18:40:34 +01:00
|
|
|
}
|
|
|
|
}
|
2019-02-01 14:48:57 +00:00
|
|
|
|
2018-10-16 18:40:34 +01:00
|
|
|
return successNodes
|
|
|
|
}
|
2019-03-28 20:09:23 +00:00
|
|
|
|
|
|
|
func createBucketID(path storj.Path) []byte {
|
|
|
|
comps := storj.SplitPath(path)
|
2019-04-01 21:14:58 +01:00
|
|
|
if len(comps) < 3 {
|
2019-03-28 20:09:23 +00:00
|
|
|
return nil
|
|
|
|
}
|
2019-04-01 21:14:58 +01:00
|
|
|
// project_id/bucket_name
|
|
|
|
return []byte(storj.JoinPaths(comps[0], comps[2]))
|
2019-03-28 20:09:23 +00:00
|
|
|
}
|