// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.

package audit

import (
	"bytes"
	"context"
	"io"
	"time"

	"github.com/vivint/infectious"
	"github.com/zeebo/errs"
	"go.uber.org/zap"
	"google.golang.org/grpc/codes"
	monkit "gopkg.in/spacemonkeygo/monkit.v2"

	"storj.io/storj/internal/errs2"
	"storj.io/storj/internal/memory"
	"storj.io/storj/pkg/identity"
	"storj.io/storj/pkg/pb"
	"storj.io/storj/pkg/pkcrypto"
	"storj.io/storj/pkg/storj"
	"storj.io/storj/pkg/transport"
	"storj.io/storj/satellite/metainfo"
	"storj.io/storj/satellite/orders"
	"storj.io/storj/satellite/overlay"
	"storj.io/storj/storage"
	"storj.io/storj/uplink/piecestore"
)

var (
	mon = monkit.Package()

	// ErrNotEnoughShares is the errs class for when not enough shares are available to do an audit.
	ErrNotEnoughShares = errs.Class("not enough shares for successful audit")
	// ErrSegmentDeleted is the errs class when the audited segment was deleted during the audit.
	ErrSegmentDeleted = errs.Class("segment deleted during audit")
)

// Share represents required information about an audited share.
type Share struct {
	Error    error
	PieceNum int
	NodeID   storj.NodeID
	Data     []byte
}

// Verifier helps verify the correctness of a given stripe.
type Verifier struct {
	log                *zap.Logger
	metainfo           *metainfo.Service
	orders             *orders.Service
	auditor            *identity.PeerIdentity
	transport          transport.Client
	overlay            *overlay.Service
	containment        Containment
	minBytesPerSecond  memory.Size
	minDownloadTimeout time.Duration
}

// NewVerifier creates a Verifier.
func NewVerifier(log *zap.Logger, metainfo *metainfo.Service, transport transport.Client, overlay *overlay.Service, containment Containment, orders *orders.Service, id *identity.FullIdentity, minBytesPerSecond memory.Size, minDownloadTimeout time.Duration) *Verifier {
	return &Verifier{
		log:                log,
		metainfo:           metainfo,
		orders:             orders,
		auditor:            id.PeerIdentity(),
		transport:          transport,
		overlay:            overlay,
		containment:        containment,
		minBytesPerSecond:  minBytesPerSecond,
		minDownloadTimeout: minDownloadTimeout,
	}
}
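
// A minimal construction sketch (hypothetical wiring; on a satellite these
// dependencies come from the peer setup, which is not shown in this file):
//
//	verifier := NewVerifier(log.Named("audit:verifier"), metainfoService,
//		transportClient, overlayService, containment, ordersService,
//		fullIdentity, 128*memory.B, 5*time.Minute)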

// Verify downloads shares then verifies the data correctness at the given stripe.
func (verifier *Verifier) Verify(ctx context.Context, stripe *Stripe, skip map[storj.NodeID]bool) (report *Report, err error) {
	defer mon.Task()(&ctx)(&err)

	pointer := stripe.Segment
	shareSize := pointer.GetRemote().GetRedundancy().GetErasureShareSize()
	bucketID := createBucketID(stripe.SegmentPath)

	var offlineNodes storj.NodeIDList
	var failedNodes storj.NodeIDList
	containedNodes := make(map[int]storj.NodeID)
	sharesToAudit := make(map[int]Share)

	orderLimits, privateKey, err := verifier.orders.CreateAuditOrderLimits(ctx, bucketID, pointer, skip)
	if err != nil {
		return nil, err
	}

	// NOTE: offlineNodes will include disqualified nodes because they aren't
	// in the skip list.
	offlineNodes = getOfflineNodes(stripe.Segment, orderLimits, skip)
	if len(offlineNodes) > 0 {
		verifier.log.Debug("Verify: order limits not created for some nodes (offline/disqualified)", zap.String("Segment Path", stripe.SegmentPath), zap.Strings("Node IDs", offlineNodes.Strings()))
	}

	shares, err := verifier.DownloadShares(ctx, orderLimits, privateKey, stripe.Index, shareSize)
	if err != nil {
		return &Report{
			Offlines: offlineNodes,
		}, err
	}

	_, err = verifier.checkIfSegmentDeleted(ctx, stripe.SegmentPath, stripe.Segment)
	if err != nil {
		return &Report{
			Offlines: offlineNodes,
		}, err
	}
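
	// Classify every downloaded share by its error, if any. The policy below,
	// in brief: dial failures and dial timeouts mark the node offline; a
	// missing piece is an audit failure; download timeouts and unrecognized
	// errors put the node into containment for later reverification.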
	for pieceNum, share := range shares {
		if share.Error == nil {
			// no error -- share downloaded successfully
			sharesToAudit[pieceNum] = share
			continue
		}
		if transport.Error.Has(share.Error) {
			if errs.Is(share.Error, context.DeadlineExceeded) {
				// dial timeout
				offlineNodes = append(offlineNodes, share.NodeID)
				verifier.log.Debug("Verify: dial timeout (offline)", zap.String("Segment Path", stripe.SegmentPath), zap.Stringer("Node ID", share.NodeID), zap.Error(share.Error))
				continue
			}
			if errs2.IsRPC(share.Error, codes.Unknown) {
				// dial failed -- offline node
				offlineNodes = append(offlineNodes, share.NodeID)
				verifier.log.Debug("Verify: dial failed (offline)", zap.String("Segment Path", stripe.SegmentPath), zap.Stringer("Node ID", share.NodeID), zap.Error(share.Error))
				continue
			}
			// unknown transport error
			containedNodes[pieceNum] = share.NodeID
			verifier.log.Debug("Verify: unknown transport error (contained)", zap.String("Segment Path", stripe.SegmentPath), zap.Stringer("Node ID", share.NodeID), zap.Error(share.Error))
			// continue so the share isn't classified (and logged) a second time below
			continue
		}

		if errs2.IsRPC(share.Error, codes.NotFound) {
			// missing share
			failedNodes = append(failedNodes, share.NodeID)
			verifier.log.Debug("Verify: piece not found (audit failed)", zap.String("Segment Path", stripe.SegmentPath), zap.Stringer("Node ID", share.NodeID), zap.Error(share.Error))
			continue
		}

		if errs2.IsRPC(share.Error, codes.DeadlineExceeded) {
			// dial successful, but download timed out
			containedNodes[pieceNum] = share.NodeID
			verifier.log.Debug("Verify: download timeout (contained)", zap.String("Segment Path", stripe.SegmentPath), zap.Stringer("Node ID", share.NodeID), zap.Error(share.Error))
			continue
		}

		// unknown error
		containedNodes[pieceNum] = share.NodeID
		verifier.log.Debug("Verify: unknown error (contained)", zap.String("Segment Path", stripe.SegmentPath), zap.Stringer("Node ID", share.NodeID), zap.Error(share.Error))
	}
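
	// Erasure decoding needs at least the minimum required number of shares,
	// so give up early (reporting what is already known) when fewer survived.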
	required := int(pointer.Remote.Redundancy.GetMinReq())
	total := int(pointer.Remote.Redundancy.GetTotal())
	if len(sharesToAudit) < required {
		return &Report{
			Fails:    failedNodes,
			Offlines: offlineNodes,
		}, ErrNotEnoughShares.New("got %d, required %d", len(sharesToAudit), required)
	}

	pieceNums, correctedShares, err := auditShares(ctx, required, total, sharesToAudit)
	if err != nil {
		return &Report{
			Fails:    failedNodes,
			Offlines: offlineNodes,
		}, err
	}

	for _, pieceNum := range pieceNums {
		failedNodes = append(failedNodes, shares[pieceNum].NodeID)
	}

	// remove failed audit pieces from the pointer so as to only penalize once for failed audits
	err = verifier.removeFailedPieces(ctx, stripe.SegmentPath, stripe.Segment, failedNodes)
	if err != nil {
		verifier.log.Warn("Verify: failed to delete failed pieces", zap.String("Segment Path", stripe.SegmentPath), zap.Error(err))
	}

	successNodes := getSuccessNodes(ctx, shares, failedNodes, offlineNodes, containedNodes)

	totalInPointer := len(stripe.Segment.GetRemote().GetRemotePieces())
	numOffline := len(offlineNodes)
	numSuccessful := len(successNodes)
	numFailed := len(failedNodes)
	numContained := len(containedNodes)
	totalAudited := numSuccessful + numFailed + numOffline + numContained
	auditedPercentage := float64(totalAudited) / float64(totalInPointer)
	offlinePercentage := float64(0)
	successfulPercentage := float64(0)
	failedPercentage := float64(0)
	containedPercentage := float64(0)
	if totalAudited > 0 {
		offlinePercentage = float64(numOffline) / float64(totalAudited)
		successfulPercentage = float64(numSuccessful) / float64(totalAudited)
		failedPercentage = float64(numFailed) / float64(totalAudited)
		containedPercentage = float64(numContained) / float64(totalAudited)
	}

	mon.Meter("audit_success_nodes_global").Mark(numSuccessful)
	mon.Meter("audit_fail_nodes_global").Mark(numFailed)
	mon.Meter("audit_offline_nodes_global").Mark(numOffline)
	mon.Meter("audit_contained_nodes_global").Mark(numContained)
	mon.Meter("audit_total_nodes_global").Mark(totalAudited)
	mon.Meter("audit_total_pointer_nodes_global").Mark(totalInPointer)

	mon.IntVal("audit_success_nodes").Observe(int64(numSuccessful))
	mon.IntVal("audit_fail_nodes").Observe(int64(numFailed))
	mon.IntVal("audit_offline_nodes").Observe(int64(numOffline))
	mon.IntVal("audit_contained_nodes").Observe(int64(numContained))
	mon.IntVal("audit_total_nodes").Observe(int64(totalAudited))
	mon.IntVal("audit_total_pointer_nodes").Observe(int64(totalInPointer))
	mon.FloatVal("audited_percentage").Observe(auditedPercentage)
	mon.FloatVal("audit_offline_percentage").Observe(offlinePercentage)
	mon.FloatVal("audit_successful_percentage").Observe(successfulPercentage)
	mon.FloatVal("audit_failed_percentage").Observe(failedPercentage)
	mon.FloatVal("audit_contained_percentage").Observe(containedPercentage)

	pendingAudits, err := createPendingAudits(ctx, containedNodes, correctedShares, stripe)
	if err != nil {
		return &Report{
			Successes: successNodes,
			Fails:     failedNodes,
			Offlines:  offlineNodes,
		}, err
	}

	return &Report{
		Successes:     successNodes,
		Fails:         failedNodes,
		Offlines:      offlineNodes,
		PendingAudits: pendingAudits,
	}, nil
}

// DownloadShares downloads shares from the nodes where remote pieces are located.
func (verifier *Verifier) DownloadShares(ctx context.Context, limits []*pb.AddressedOrderLimit, piecePrivateKey storj.PiecePrivateKey, stripeIndex int64, shareSize int32) (shares map[int]Share, err error) {
	defer mon.Task()(&ctx)(&err)

	shares = make(map[int]Share, len(limits))
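
	// Fan out one download per order limit and collect the results over a
	// buffered channel; a nil limit (no order created for that piece) is sent
	// through as a nil share so the collection loop below still receives
	// exactly one message per limit.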
	ch := make(chan *Share, len(limits))
	for i, limit := range limits {
		if limit == nil {
			ch <- nil
			continue
		}

		go func(i int, limit *pb.AddressedOrderLimit) {
			share, err := verifier.GetShare(ctx, limit, piecePrivateKey, stripeIndex, shareSize, i)
			if err != nil {
				share = Share{
					Error:    err,
					PieceNum: i,
					NodeID:   limit.GetLimit().StorageNodeId,
					Data:     nil,
				}
			}
			ch <- &share
		}(i, limit)
	}

	for range limits {
		share := <-ch
		if share != nil {
			shares[share.PieceNum] = *share
		}
	}

	return shares, nil
}

// Reverify reverifies the contained nodes in the stripe.
func (verifier *Verifier) Reverify(ctx context.Context, stripe *Stripe) (report *Report, err error) {
	defer mon.Task()(&ctx)(&err)

	// result status enum
	const (
		skipped = iota
		success
		offline
		failed
		contained
		erred
	)

	type result struct {
		nodeID       storj.NodeID
		status       int
		pendingAudit *PendingAudit
		err          error
	}
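
	// For every piece whose node has a pending audit in the containment db,
	// re-request the same share in its own goroutine and map the outcome onto
	// the status enum above; pieces without a pending audit are skipped.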
	pieces := stripe.Segment.GetRemote().GetRemotePieces()
	ch := make(chan result, len(pieces))
	var containedInSegment int64

	for _, piece := range pieces {
		pending, err := verifier.containment.Get(ctx, piece.NodeId)
		if err != nil {
			if ErrContainedNotFound.Has(err) {
				ch <- result{nodeID: piece.NodeId, status: skipped}
				continue
			}
			ch <- result{nodeID: piece.NodeId, status: erred, err: err}
			verifier.log.Debug("Reverify: error getting from containment db", zap.String("Segment Path", stripe.SegmentPath), zap.Stringer("Node ID", piece.NodeId), zap.Error(err))
			continue
		}
		containedInSegment++

		go func(pending *PendingAudit, piece *pb.RemotePiece) {
			limit, piecePrivateKey, err := verifier.orders.CreateAuditOrderLimit(ctx, createBucketID(stripe.SegmentPath), pending.NodeID, piece.PieceNum, pending.PieceID, pending.ShareSize)
			if err != nil {
				if overlay.ErrNodeDisqualified.Has(err) {
					_, errDelete := verifier.containment.Delete(ctx, piece.NodeId)
					if errDelete != nil {
						verifier.log.Debug("Error deleting disqualified node from containment db", zap.String("Segment Path", stripe.SegmentPath), zap.Stringer("Node ID", piece.NodeId), zap.Error(errDelete))
						err = errs.Combine(err, errDelete)
					}
					ch <- result{nodeID: piece.NodeId, status: erred, err: err}
					verifier.log.Debug("Reverify: order limit not created (disqualified)", zap.String("Segment Path", stripe.SegmentPath), zap.Stringer("Node ID", piece.NodeId))
					return
				}
				if overlay.ErrNodeOffline.Has(err) {
					ch <- result{nodeID: piece.NodeId, status: offline}
					verifier.log.Debug("Reverify: order limit not created (offline)", zap.String("Segment Path", stripe.SegmentPath), zap.Stringer("Node ID", piece.NodeId))
					return
				}
				ch <- result{nodeID: piece.NodeId, status: erred, err: err}
				verifier.log.Debug("Reverify: error creating order limit", zap.String("Segment Path", stripe.SegmentPath), zap.Stringer("Node ID", piece.NodeId), zap.Error(err))
				return
			}

			share, err := verifier.GetShare(ctx, limit, piecePrivateKey, pending.StripeIndex, pending.ShareSize, int(piece.PieceNum))

			// check if the pending audit was deleted while downloading the share
			_, getErr := verifier.containment.Get(ctx, piece.NodeId)
			if getErr != nil {
				if ErrContainedNotFound.Has(getErr) {
					ch <- result{nodeID: piece.NodeId, status: skipped}
					verifier.log.Debug("Reverify: pending audit deleted during reverification", zap.String("Segment Path", stripe.SegmentPath), zap.Stringer("Node ID", piece.NodeId), zap.Error(getErr))
					return
				}
				ch <- result{nodeID: piece.NodeId, status: erred, err: getErr}
				verifier.log.Debug("Reverify: error getting from containment db", zap.String("Segment Path", stripe.SegmentPath), zap.Stringer("Node ID", piece.NodeId), zap.Error(getErr))
				return
			}

			// analyze the error from GetShare
			if err != nil {
				if transport.Error.Has(err) {
					if errs.Is(err, context.DeadlineExceeded) {
						// dial timeout
						ch <- result{nodeID: piece.NodeId, status: offline}
						verifier.log.Debug("Reverify: dial timeout (offline)", zap.String("Segment Path", stripe.SegmentPath), zap.Stringer("Node ID", piece.NodeId), zap.Error(err))
						return
					}
					if errs2.IsRPC(err, codes.Unknown) {
						// dial failed -- offline node
						verifier.log.Debug("Reverify: dial failed (offline)", zap.String("Segment Path", stripe.SegmentPath), zap.Stringer("Node ID", piece.NodeId), zap.Error(err))
						ch <- result{nodeID: piece.NodeId, status: offline}
						return
					}
					// unknown transport error
					ch <- result{nodeID: piece.NodeId, status: contained, pendingAudit: pending}
					verifier.log.Debug("Reverify: unknown transport error (contained)", zap.String("Segment Path", stripe.SegmentPath), zap.Stringer("Node ID", piece.NodeId), zap.Error(err))
					return
				}
				if errs2.IsRPC(err, codes.NotFound) {
					// Get the original segment pointer in the metainfo
					oldPtr, err := verifier.checkIfSegmentDeleted(ctx, pending.Path, stripe.Segment)
					if err != nil {
						ch <- result{nodeID: piece.NodeId, status: success}
						verifier.log.Debug("Reverify: audit source deleted before reverification", zap.String("Segment Path", stripe.SegmentPath), zap.Stringer("Node ID", piece.NodeId), zap.Error(err))
						return
					}
					// remove failed audit pieces from the pointer so as to only penalize once for failed audits
					err = verifier.removeFailedPieces(ctx, pending.Path, oldPtr, storj.NodeIDList{pending.NodeID})
					if err != nil {
						verifier.log.Warn("Reverify: failed to delete failed pieces", zap.String("Segment Path", stripe.SegmentPath), zap.Stringer("Node ID", piece.NodeId), zap.Error(err))
					}
					// missing share
					ch <- result{nodeID: piece.NodeId, status: failed}
					verifier.log.Debug("Reverify: piece not found (audit failed)", zap.String("Segment Path", stripe.SegmentPath), zap.Stringer("Node ID", piece.NodeId), zap.Error(err))
					return
				}
				if errs2.IsRPC(err, codes.DeadlineExceeded) {
					// dial successful, but download timed out
					ch <- result{nodeID: piece.NodeId, status: contained, pendingAudit: pending}
					verifier.log.Debug("Reverify: download timeout (contained)", zap.String("Segment Path", stripe.SegmentPath), zap.Stringer("Node ID", piece.NodeId), zap.Error(err))
					return
				}
				// unknown error
				ch <- result{nodeID: piece.NodeId, status: contained, pendingAudit: pending}
				verifier.log.Debug("Reverify: unknown error (contained)", zap.String("Segment Path", stripe.SegmentPath), zap.Stringer("Node ID", piece.NodeId), zap.Error(err))
				return
			}
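
			// The share downloaded cleanly: compare its SHA-256 hash against
			// the hash recorded when the node was placed in containment; a
			// match clears the node, a mismatch is a failed audit.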
			downloadedHash := pkcrypto.SHA256Hash(share.Data)
			if bytes.Equal(downloadedHash, pending.ExpectedShareHash) {
				ch <- result{nodeID: piece.NodeId, status: success}
				verifier.log.Debug("Reverify: hashes match (audit success)", zap.String("Segment Path", stripe.SegmentPath), zap.Stringer("Node ID", piece.NodeId))
			} else {
				oldPtr, err := verifier.checkIfSegmentDeleted(ctx, pending.Path, nil)
				if err != nil {
					ch <- result{nodeID: piece.NodeId, status: success}
					verifier.log.Debug("Reverify: audit source deleted before reverification", zap.String("Segment Path", stripe.SegmentPath), zap.Stringer("Node ID", piece.NodeId), zap.Error(err))
					return
				}
				// remove failed audit pieces from the pointer so as to only penalize once for failed audits
				err = verifier.removeFailedPieces(ctx, pending.Path, oldPtr, storj.NodeIDList{pending.NodeID})
				if err != nil {
					verifier.log.Warn("Reverify: failed to delete failed pieces", zap.String("Segment Path", stripe.SegmentPath), zap.Stringer("Node ID", piece.NodeId), zap.Error(err))
				}
				verifier.log.Debug("Reverify: hashes mismatch (audit failed)", zap.String("Segment Path", stripe.SegmentPath), zap.Stringer("Node ID", piece.NodeId),
					zap.Binary("expected hash", pending.ExpectedShareHash), zap.Binary("downloaded hash", downloadedHash))
				ch <- result{nodeID: piece.NodeId, status: failed}
			}
		}(pending, piece)
	}

	report = &Report{}
	for range pieces {
		result := <-ch
		switch result.status {
		case success:
			report.Successes = append(report.Successes, result.nodeID)
		case offline:
			report.Offlines = append(report.Offlines, result.nodeID)
		case failed:
			report.Fails = append(report.Fails, result.nodeID)
		case contained:
			report.PendingAudits = append(report.PendingAudits, result.pendingAudit)
		case erred:
			err = errs.Combine(err, result.err)
		}
	}

	mon.Meter("reverify_successes_global").Mark(len(report.Successes))
	mon.Meter("reverify_offlines_global").Mark(len(report.Offlines))
	mon.Meter("reverify_fails_global").Mark(len(report.Fails))
	mon.Meter("reverify_contained_global").Mark(len(report.PendingAudits))

	mon.IntVal("reverify_successes").Observe(int64(len(report.Successes)))
	mon.IntVal("reverify_offlines").Observe(int64(len(report.Offlines)))
	mon.IntVal("reverify_fails").Observe(int64(len(report.Fails)))
	mon.IntVal("reverify_contained").Observe(int64(len(report.PendingAudits)))
	mon.IntVal("reverify_contained_in_segment").Observe(containedInSegment)
	mon.IntVal("reverify_total_in_segment").Observe(int64(len(pieces)))

	return report, err
}

// GetShare uses the piecestore client to download a share from a storage node.
func (verifier *Verifier) GetShare(ctx context.Context, limit *pb.AddressedOrderLimit, piecePrivateKey storj.PiecePrivateKey, stripeIndex int64, shareSize int32, pieceNum int) (share Share, err error) {
	defer mon.Task()(&ctx)(&err)

	bandwidthMsgSize := shareSize

	// determines number of seconds allotted for receiving data from a storage node
	timedCtx := ctx
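
	// The allotted time scales with the share size: for example, with
	// minBytesPerSecond = 128 B and a 256-byte share, maxTransferTime works
	// out to 2s, but it is never allowed below minDownloadTimeout.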
	if verifier.minBytesPerSecond > 0 {
		maxTransferTime := time.Duration(int64(time.Second) * int64(bandwidthMsgSize) / verifier.minBytesPerSecond.Int64())
		if maxTransferTime < verifier.minDownloadTimeout {
			maxTransferTime = verifier.minDownloadTimeout
		}
		var cancel func()
		timedCtx, cancel = context.WithTimeout(ctx, maxTransferTime)
		defer cancel()
	}

	storageNodeID := limit.GetLimit().StorageNodeId
	log := verifier.log.Named(storageNodeID.String())
	target := &pb.Node{Id: storageNodeID, Address: limit.GetStorageNodeAddress()}

	ps, err := piecestore.Dial(timedCtx, verifier.transport, target, log, piecestore.DefaultConfig)
	if err != nil {
		return Share{}, Error.Wrap(err)
	}
	defer func() {
		err := ps.Close()
		if err != nil {
			verifier.log.Error("audit verifier failed to close conn to node", zap.Error(err))
		}
	}()

	offset := int64(shareSize) * stripeIndex

	downloader, err := ps.Download(timedCtx, limit.GetLimit(), piecePrivateKey, offset, int64(shareSize))
	if err != nil {
		return Share{}, err
	}
	defer func() { err = errs.Combine(err, downloader.Close()) }()

	buf := make([]byte, shareSize)
	_, err = io.ReadFull(downloader, buf)
	if err != nil {
		return Share{}, err
	}

	return Share{
		Error:    nil,
		PieceNum: pieceNum,
		NodeID:   storageNodeID,
		Data:     buf,
	}, nil
}

// removeFailedPieces removes lost pieces from a pointer.
func (verifier *Verifier) removeFailedPieces(ctx context.Context, path string, pointer *pb.Pointer, failedNodes storj.NodeIDList) (err error) {
	defer mon.Task()(&ctx)(&err)

	if len(failedNodes) == 0 {
		return nil
	}

	var toRemove []*pb.RemotePiece
OUTER:
	for _, piece := range pointer.GetRemote().GetRemotePieces() {
		for _, failedNode := range failedNodes {
			if piece.NodeId == failedNode {
				toRemove = append(toRemove, piece)
				continue OUTER
			}
		}
	}

	// Update the segment pointer in the metainfo
	_, err = verifier.metainfo.UpdatePieces(ctx, path, pointer, nil, toRemove)
	return err
}

// checkIfSegmentDeleted checks if the stripe's pointer has been deleted or
// replaced since the stripe was selected, and returns the current pointer.
func (verifier *Verifier) checkIfSegmentDeleted(ctx context.Context, segmentPath string, oldPointer *pb.Pointer) (newPointer *pb.Pointer, err error) {
	defer mon.Task()(&ctx)(&err)

	newPointer, err = verifier.metainfo.Get(ctx, segmentPath)
	if err != nil {
		if storage.ErrKeyNotFound.Has(err) {
			return nil, ErrSegmentDeleted.New("%q", segmentPath)
		}
		return nil, err
	}
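
	// A changed creation date means the segment was deleted and re-uploaded
	// under the same path, so the audited data is gone even though the key
	// still exists.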
	if oldPointer != nil && oldPointer.CreationDate != newPointer.CreationDate {
		return nil, ErrSegmentDeleted.New("%q", segmentPath)
	}
	return newPointer, nil
}

// auditShares takes the downloaded shares and uses infectious's Correct function to check that they
// haven't been altered. auditShares returns a slice containing the piece numbers of altered shares,
// and a slice of the corrected shares.
func auditShares(ctx context.Context, required, total int, originals map[int]Share) (pieceNums []int, corrected []infectious.Share, err error) {
	defer mon.Task()(&ctx)(&err)

	f, err := infectious.NewFEC(required, total)
	if err != nil {
		return nil, nil, err
	}

	copies, err := makeCopies(ctx, originals)
	if err != nil {
		return nil, nil, err
	}
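
	// Correct repairs the copies in place using the Reed-Solomon code's error
	// correction (infectious implements Berlekamp-Welch); any copy whose data
	// changed during correction must have been altered by its node.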
	err = f.Correct(copies)
	if err != nil {
		return nil, nil, err
	}

	for _, share := range copies {
		if !bytes.Equal(originals[share.Number].Data, share.Data) {
			pieceNums = append(pieceNums, share.Number)
		}
	}

	return pieceNums, copies, nil
}

// makeCopies takes in a map of audit Shares and deep copies their data to a slice of infectious Shares.
func makeCopies(ctx context.Context, originals map[int]Share) (copies []infectious.Share, err error) {
	defer mon.Task()(&ctx)(&err)

	copies = make([]infectious.Share, 0, len(originals))
	for _, original := range originals {
		copies = append(copies, infectious.Share{
			Data:   append([]byte{}, original.Data...),
			Number: original.PieceNum})
	}
	return copies, nil
}

// getOfflineNodes returns those storage nodes from the pointer that have
// neither an order limit nor an entry in the skip list.
func getOfflineNodes(pointer *pb.Pointer, limits []*pb.AddressedOrderLimit, skip map[storj.NodeID]bool) storj.NodeIDList {
	var offlines storj.NodeIDList

	nodesWithLimit := make(map[storj.NodeID]bool, len(limits))
	for _, limit := range limits {
		if limit != nil {
			nodesWithLimit[limit.GetLimit().StorageNodeId] = true
		}
	}

	for _, piece := range pointer.GetRemote().GetRemotePieces() {
		if !nodesWithLimit[piece.NodeId] && !skip[piece.NodeId] {
			offlines = append(offlines, piece.NodeId)
		}
	}

	return offlines
}

// getSuccessNodes uses the failed nodes, offline nodes and contained nodes arrays to determine which nodes passed the audit.
func getSuccessNodes(ctx context.Context, shares map[int]Share, failedNodes, offlineNodes storj.NodeIDList, containedNodes map[int]storj.NodeID) (successNodes storj.NodeIDList) {
	defer mon.Task()(&ctx)(nil)

	fails := make(map[storj.NodeID]bool)
	for _, fail := range failedNodes {
		fails[fail] = true
	}
	for _, offline := range offlineNodes {
		fails[offline] = true
	}
	for _, contained := range containedNodes {
		fails[contained] = true
	}

	for _, share := range shares {
		if !fails[share.NodeID] {
			successNodes = append(successNodes, share.NodeID)
		}
	}

	return successNodes
}

func createBucketID(path storj.Path) []byte {
	comps := storj.SplitPath(path)
	if len(comps) < 3 {
		return nil
	}
	// project_id/bucket_name
	return []byte(storj.JoinPaths(comps[0], comps[2]))
}

func createPendingAudits(ctx context.Context, containedNodes map[int]storj.NodeID, correctedShares []infectious.Share, stripe *Stripe) (pending []*PendingAudit, err error) {
	defer mon.Task()(&ctx)(&err)

	if len(containedNodes) == 0 {
		return nil, nil
	}

	redundancy := stripe.Segment.GetRemote().GetRedundancy()
	required := int(redundancy.GetMinReq())
	total := int(redundancy.GetTotal())
	shareSize := redundancy.GetErasureShareSize()

	fec, err := infectious.NewFEC(required, total)
	if err != nil {
		return nil, Error.Wrap(err)
	}

	stripeData, err := rebuildStripe(ctx, fec, correctedShares, int(shareSize))
	if err != nil {
		return nil, Error.Wrap(err)
	}
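
	// For each contained node, re-encode the share its piece should contain
	// from the rebuilt stripe and store that share's hash; Reverify later
	// compares the node's actual download against this expected hash.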
	for pieceNum, nodeID := range containedNodes {
		share := make([]byte, shareSize)
		err = fec.EncodeSingle(stripeData, share, pieceNum)
		if err != nil {
			return nil, Error.Wrap(err)
		}
		pending = append(pending, &PendingAudit{
			NodeID:            nodeID,
			PieceID:           stripe.Segment.GetRemote().RootPieceId,
			StripeIndex:       stripe.Index,
			ShareSize:         shareSize,
			ExpectedShareHash: pkcrypto.SHA256Hash(share),
			Path:              stripe.SegmentPath,
		})
	}

	return pending, nil
}

func rebuildStripe(ctx context.Context, fec *infectious.FEC, corrected []infectious.Share, shareSize int) (_ []byte, err error) {
	defer mon.Task()(&ctx)(&err)

	stripe := make([]byte, fec.Required()*shareSize)
	err = fec.Rebuild(corrected, func(share infectious.Share) {
		copy(stripe[share.Number*shareSize:], share.Data)
	})
	if err != nil {
		return nil, err
	}
	return stripe, nil
}