Save hash of expected share from missing audited node (#2025)

Kaloyan Raev 2019-05-23 23:07:19 +03:00 committed by Natalie Villasana
parent b0ca12e31e
commit de8070730a
8 changed files with 171 additions and 60 deletions

View File

@ -30,10 +30,10 @@ var (
type PendingAudit struct {
NodeID storj.NodeID
PieceID storj.PieceID
StripeIndex uint32
ShareSize int64
StripeIndex int64
ShareSize int32
ExpectedShareHash []byte
ReverifyCount uint32
ReverifyCount int32
}
// Containment holds information about pending audits for contained nodes
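
The struct above is the record kept for a node that could not be immediately audited: instead of the share itself, only a hash of the share the node should be holding is stored, so a later reverification can recompute and compare it. A minimal sketch of how such an entry is filled, assuming package audit (the helper name and parameters are illustrative; the field values mirror createPendingAudits later in this commit):

func newPendingAudit(nodeID storj.NodeID, pieceID storj.PieceID, stripeIndex int64, shareSize int32, expectedShare []byte) *PendingAudit {
	return &PendingAudit{
		NodeID:      nodeID,
		PieceID:     pieceID,
		StripeIndex: stripeIndex,
		ShareSize:   shareSize,
		// Only the hash of the expected share is stored, never the share itself.
		ExpectedShareHash: pkcrypto.SHA256Hash(expectedShare),
		// ReverifyCount starts at zero; the containment DB increments it on each retry.
	}
}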

View File

@ -99,14 +99,14 @@ func TestContainIncrementPendingEntryExists(t *testing.T) {
// expect reverify count for an entry to be 0 after first IncrementPending call
pending, err := planet.Satellites[0].DB.Containment().Get(ctx, info1.NodeID)
require.NoError(t, err)
require.Equal(t, uint32(0), pending.ReverifyCount)
require.EqualValues(t, 0, pending.ReverifyCount)
// expect reverify count to be 1 after second IncrementPending call
err = planet.Satellites[0].DB.Containment().IncrementPending(ctx, info1)
require.NoError(t, err)
pending, err = planet.Satellites[0].DB.Containment().Get(ctx, info1.NodeID)
require.NoError(t, err)
require.Equal(t, uint32(1), pending.ReverifyCount)
require.EqualValues(t, 1, pending.ReverifyCount)
})
}
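
The test exercises only two methods of the containment DB; from these calls the relevant part of the Containment interface presumably looks roughly like this (the real interface may declare more methods):

type Containment interface {
	Get(ctx context.Context, nodeID storj.NodeID) (*PendingAudit, error)
	IncrementPending(ctx context.Context, pendingAudit *PendingAudit) error
}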

View File

@ -6,6 +6,8 @@ package audit
import (
"context"
"github.com/zeebo/errs"
"storj.io/storj/pkg/overlay"
"storj.io/storj/pkg/storj"
)
@ -16,8 +18,9 @@ type reporter interface {
// Reporter records audit reports in overlay and implements the reporter interface
type Reporter struct {
overlay *overlay.Cache
maxRetries int
overlay *overlay.Cache
containment Containment
maxRetries int
}
// RecordAuditsInfo is a struct containing arguments/return values for RecordAudits()
@ -25,11 +28,12 @@ type RecordAuditsInfo struct {
SuccessNodeIDs storj.NodeIDList
FailNodeIDs storj.NodeIDList
OfflineNodeIDs storj.NodeIDList
PendingAudits []*PendingAudit
}
// NewReporter instantiates a reporter
func NewReporter(overlay *overlay.Cache, maxRetries int) *Reporter {
return &Reporter{overlay: overlay, maxRetries: maxRetries}
func NewReporter(overlay *overlay.Cache, containment Containment, maxRetries int) *Reporter {
return &Reporter{overlay: overlay, containment: containment, maxRetries: maxRetries}
}
// RecordAudits saves failed audit details to overlay
@ -37,8 +41,9 @@ func (reporter *Reporter) RecordAudits(ctx context.Context, req *RecordAuditsInf
successNodeIDs := req.SuccessNodeIDs
failNodeIDs := req.FailNodeIDs
offlineNodeIDs := req.OfflineNodeIDs
pendingAudits := req.PendingAudits
var errNodeIDs storj.NodeIDList
var errlist errs.Group
retries := 0
for retries < reporter.maxRetries {
@ -46,43 +51,51 @@ func (reporter *Reporter) RecordAudits(ctx context.Context, req *RecordAuditsInf
return nil, nil
}
errNodeIDs = storj.NodeIDList{}
errlist = errs.Group{}
if len(successNodeIDs) > 0 {
successNodeIDs, err = reporter.recordAuditSuccessStatus(ctx, successNodeIDs)
if err != nil {
errNodeIDs = append(errNodeIDs, successNodeIDs...)
errlist.Add(err)
}
}
if len(failNodeIDs) > 0 {
failNodeIDs, err = reporter.recordAuditFailStatus(ctx, failNodeIDs)
if err != nil {
errNodeIDs = append(errNodeIDs, failNodeIDs...)
errlist.Add(err)
}
}
if len(offlineNodeIDs) > 0 {
offlineNodeIDs, err = reporter.recordOfflineStatus(ctx, offlineNodeIDs)
if err != nil {
errNodeIDs = append(errNodeIDs, offlineNodeIDs...)
errlist.Add(err)
}
}
if len(pendingAudits) > 0 {
pendingAudits, err = reporter.recordPendingAudits(ctx, pendingAudits)
if err != nil {
errlist.Add(err)
}
}
retries++
}
if retries >= reporter.maxRetries && len(errNodeIDs) > 0 {
err = errlist.Err()
if retries >= reporter.maxRetries && err != nil {
return &RecordAuditsInfo{
SuccessNodeIDs: successNodeIDs,
FailNodeIDs: failNodeIDs,
OfflineNodeIDs: offlineNodeIDs,
}, Error.New("some nodes failed to be updated in overlay")
PendingAudits: pendingAudits,
}, errs.Combine(Error.New("some nodes failed to be updated in overlay"), err)
}
return nil, nil
}
// recordAuditFailStatus updates nodeIDs in overlay with isup=true, auditsuccess=false
func (reporter *Reporter) recordAuditFailStatus(ctx context.Context, failedAuditNodeIDs storj.NodeIDList) (failed storj.NodeIDList, err error) {
failedIDs := storj.NodeIDList{}
var errlist errs.Group
for _, nodeID := range failedAuditNodeIDs {
_, err := reporter.overlay.UpdateStats(ctx, &overlay.UpdateRequest{
NodeID: nodeID,
@ -90,36 +103,35 @@ func (reporter *Reporter) recordAuditFailStatus(ctx context.Context, failedAudit
AuditSuccess: false,
})
if err != nil {
failedIDs = append(failedIDs, nodeID)
failed = append(failed, nodeID)
errlist.Add(err)
}
}
if len(failedIDs) > 0 {
return failedIDs, Error.New("failed to record some audit fail statuses in overlay")
if len(failed) > 0 {
return failed, errs.Combine(Error.New("failed to record some audit fail statuses in overlay"), errlist.Err())
}
return nil, nil
}
// recordOfflineStatus updates nodeIDs in overlay with isup=false
// TODO: offline nodes should maybe be marked as failing the audit in the future
func (reporter *Reporter) recordOfflineStatus(ctx context.Context, offlineNodeIDs storj.NodeIDList) (failed storj.NodeIDList, err error) {
failedIDs := storj.NodeIDList{}
var errlist errs.Group
for _, nodeID := range offlineNodeIDs {
_, err := reporter.overlay.UpdateUptime(ctx, nodeID, false)
if err != nil {
failedIDs = append(failedIDs, nodeID)
failed = append(failed, nodeID)
errlist.Add(err)
}
}
if len(failedIDs) > 0 {
return failedIDs, Error.New("failed to record some audit offline statuses in overlay")
if len(failed) > 0 {
return failed, errs.Combine(Error.New("failed to record some audit offline statuses in overlay"), errlist.Err())
}
return nil, nil
}
// recordAuditSuccessStatus updates nodeIDs in overlay with isup=true, auditsuccess=true
func (reporter *Reporter) recordAuditSuccessStatus(ctx context.Context, successNodeIDs storj.NodeIDList) (failed storj.NodeIDList, err error) {
failedIDs := storj.NodeIDList{}
var errlist errs.Group
for _, nodeID := range successNodeIDs {
_, err := reporter.overlay.UpdateStats(ctx, &overlay.UpdateRequest{
NodeID: nodeID,
@ -127,11 +139,28 @@ func (reporter *Reporter) recordAuditSuccessStatus(ctx context.Context, successN
AuditSuccess: true,
})
if err != nil {
failedIDs = append(failedIDs, nodeID)
failed = append(failed, nodeID)
errlist.Add(err)
}
}
if len(failedIDs) > 0 {
return failedIDs, Error.New("failed to record some audit success statuses in overlay")
if len(failed) > 0 {
return failed, errs.Combine(Error.New("failed to record some audit success statuses in overlay"), errlist.Err())
}
return nil, nil
}
// recordPendingAudits updates the containment status of nodes with pending audits
func (reporter *Reporter) recordPendingAudits(ctx context.Context, pendingAudits []*PendingAudit) (failed []*PendingAudit, err error) {
var errlist errs.Group
for _, pendingAudit := range pendingAudits {
err := reporter.containment.IncrementPending(ctx, pendingAudit)
if err != nil {
failed = append(failed, pendingAudit)
errlist.Add(err)
}
}
if len(failed) > 0 {
return failed, errs.Combine(Error.New("failed to record some pending audits"), errlist.Err())
}
return nil, nil
}
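
Pending audits now travel through the same bounded retry loop as the success, fail, and offline node lists, and each one ends up in containment.IncrementPending. A minimal usage sketch, assuming it sits in package audit (the helper name and the literal retry count are illustrative):

func recordWithPending(ctx context.Context, cache *overlay.Cache, containment Containment, pending []*PendingAudit) (*RecordAuditsInfo, error) {
	reporter := NewReporter(cache, containment, 3) // the satellite takes maxRetries from config.MaxRetriesStatDB

	// Anything not recorded within maxRetries attempts (node IDs and pending
	// audits alike) comes back in the returned RecordAuditsInfo together with
	// the combined error, so the caller can re-queue it instead of dropping it.
	return reporter.RecordAudits(ctx, &RecordAuditsInfo{PendingAudits: pending})
}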

View File

@ -43,13 +43,13 @@ type Service struct {
// NewService instantiates a Service with access to a Cursor and Verifier
func NewService(log *zap.Logger, config Config, metainfo *metainfo.Service,
orders *orders.Service, transport transport.Client, overlay *overlay.Cache,
identity *identity.FullIdentity) (service *Service, err error) {
containment Containment, identity *identity.FullIdentity) (service *Service, err error) {
return &Service{
log: log,
Cursor: NewCursor(metainfo),
Verifier: NewVerifier(log.Named("audit:verifier"), transport, overlay, orders, identity, config.MinBytesPerSecond),
Reporter: NewReporter(overlay, config.MaxRetriesStatDB),
Reporter: NewReporter(overlay, containment, config.MaxRetriesStatDB),
Loop: *sync2.NewCycle(config.Interval),
}, nil
@ -92,15 +92,15 @@ func (service *Service) process(ctx context.Context) error {
}
}
verifiedNodes, err := service.Verifier.Verify(ctx, stripe)
if err != nil {
return err
verifiedNodes, verifierErr := service.Verifier.Verify(ctx, stripe)
if verifierErr != nil && verifiedNodes == nil {
return verifierErr
}
// TODO(moby) we need to decide if we want to do something with nodes that the reporter failed to update
_, err = service.Reporter.RecordAudits(ctx, verifiedNodes)
if err != nil {
return err
_, reporterErr := service.Reporter.RecordAudits(ctx, verifiedNodes)
if reporterErr != nil {
return errs.Combine(verifierErr, reporterErr)
}
return nil
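
This hunk relies on Verify's new contract: it may return a partially filled RecordAuditsInfo together with an error, and the caller only aborts when there is nothing to report at all. A hedged sketch of that decision (the helper name is illustrative):

func handleVerifyResult(info *RecordAuditsInfo, err error) (report bool, fatal error) {
	if err != nil && info == nil {
		// Nothing usable came back, e.g. downloading the shares failed outright.
		return false, err
	}
	// Report whatever was collected; a non-nil err is combined with any reporter error.
	return true, nil
}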

View File

@ -9,6 +9,7 @@ import (
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
@ -77,8 +78,12 @@ func TestGetShareTimeout(t *testing.T) {
require.NoError(t, err)
}
_, err = verifier.Verify(ctx, stripe)
require.NoError(t, err)
verifiedNodes, err := verifier.Verify(ctx, stripe)
assert.Error(t, err)
assert.NotNil(t, verifiedNodes)
for i := 0; i < k; i++ {
assert.True(t, contains(verifiedNodes.OfflineNodeIDs, pieces[i].NodeId))
}
})
}
@ -90,3 +95,12 @@ func stopStorageNode(planet *testplanet.Planet, nodeID storj.NodeID) error {
}
return fmt.Errorf("no such node: %s", nodeID.String())
}
func contains(nodes storj.NodeIDList, nodeID storj.NodeID) bool {
for _, n := range nodes {
if n == nodeID {
return true
}
}
return false
}

View File

@ -19,6 +19,7 @@ import (
"storj.io/storj/pkg/identity"
"storj.io/storj/pkg/overlay"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/pkcrypto"
"storj.io/storj/pkg/storj"
"storj.io/storj/pkg/transport"
"storj.io/storj/satellite/orders"
@ -71,12 +72,14 @@ func (verifier *Verifier) Verify(ctx context.Context, stripe *Stripe) (verifiedN
var offlineNodes storj.NodeIDList
var failedNodes storj.NodeIDList
containedNodes := make(map[int]storj.NodeID)
sharesToAudit := make(map[int]Share)
for pieceNum, share := range shares {
if shares[pieceNum].Error != nil {
// TODO(kaloyan): we need to check whether this logic correctly distinguishes offline nodes from those that didn't respond.
if shares[pieceNum].Error == context.DeadlineExceeded || !transport.Error.Has(shares[pieceNum].Error) {
failedNodes = append(failedNodes, nodes[pieceNum])
containedNodes[pieceNum] = nodes[pieceNum]
} else {
offlineNodes = append(offlineNodes, nodes[pieceNum])
}
@ -90,27 +93,37 @@ func (verifier *Verifier) Verify(ctx context.Context, stripe *Stripe) (verifiedN
if len(sharesToAudit) < required {
return &RecordAuditsInfo{
SuccessNodeIDs: nil,
FailNodeIDs: failedNodes,
OfflineNodeIDs: offlineNodes,
}, nil
}, Error.New("not enough shares for successful audit: got %d, required %d", len(sharesToAudit), required)
}
pieceNums, err := auditShares(ctx, required, total, sharesToAudit)
pieceNums, correctedShares, err := auditShares(ctx, required, total, sharesToAudit)
if err != nil {
return nil, err
return &RecordAuditsInfo{
OfflineNodeIDs: offlineNodes,
}, err
}
for _, pieceNum := range pieceNums {
failedNodes = append(failedNodes, nodes[pieceNum])
}
successNodes := getSuccessNodes(ctx, nodes, failedNodes, offlineNodes)
successNodes := getSuccessNodes(ctx, nodes, failedNodes, offlineNodes, containedNodes)
pendingAudits, err := createPendingAudits(containedNodes, correctedShares, stripe)
if err != nil {
return &RecordAuditsInfo{
SuccessNodeIDs: successNodes,
FailNodeIDs: failedNodes,
OfflineNodeIDs: offlineNodes,
}, err
}
return &RecordAuditsInfo{
SuccessNodeIDs: successNodes,
FailNodeIDs: failedNodes,
OfflineNodeIDs: offlineNodes,
PendingAudits: pendingAudits,
}, nil
}
@ -204,22 +217,23 @@ func (verifier *Verifier) getShare(ctx context.Context, limit *pb.AddressedOrder
}
// auditShares takes the downloaded shares and uses infectious's Correct function to check that they
// haven't been altered. auditShares returns a slice containing the piece numbers of altered shares.
func auditShares(ctx context.Context, required, total int, originals map[int]Share) (pieceNums []int, err error) {
// haven't been altered. auditShares returns a slice containing the piece numbers of altered shares,
// and a slice of the corrected shares.
func auditShares(ctx context.Context, required, total int, originals map[int]Share) (pieceNums []int, corrected []infectious.Share, err error) {
defer mon.Task()(&ctx)(&err)
f, err := infectious.NewFEC(required, total)
if err != nil {
return nil, err
return nil, nil, err
}
copies, err := makeCopies(ctx, originals)
if err != nil {
return nil, err
return nil, nil, err
}
err = f.Correct(copies)
if err != nil {
return nil, err
return nil, nil, err
}
for _, share := range copies {
@ -227,7 +241,7 @@ func auditShares(ctx context.Context, required, total int, originals map[int]Sha
pieceNums = append(pieceNums, share.Number)
}
}
return pieceNums, nil
return pieceNums, copies, nil
}
// makeCopies takes in a map of audit Shares and deep copies their data to a slice of infectious Shares
@ -242,8 +256,8 @@ func makeCopies(ctx context.Context, originals map[int]Share) (copies []infectio
return copies, nil
}
// getSuccessNodes uses the failed nodes and offline nodes arrays to determine which nodes passed the audit
func getSuccessNodes(ctx context.Context, nodes map[int]storj.NodeID, failedNodes, offlineNodes storj.NodeIDList) (successNodes storj.NodeIDList) {
// getSuccessNodes uses the failed nodes, offline nodes and contained nodes arrays to determine which nodes passed the audit
func getSuccessNodes(ctx context.Context, nodes map[int]storj.NodeID, failedNodes, offlineNodes storj.NodeIDList, containedNodes map[int]storj.NodeID) (successNodes storj.NodeIDList) {
fails := make(map[storj.NodeID]bool)
for _, fail := range failedNodes {
fails[fail] = true
@ -251,6 +265,9 @@ func getSuccessNodes(ctx context.Context, nodes map[int]storj.NodeID, failedNode
for _, offline := range offlineNodes {
fails[offline] = true
}
for _, contained := range containedNodes {
fails[contained] = true
}
for _, node := range nodes {
if !fails[node] {
@ -269,3 +286,53 @@ func createBucketID(path storj.Path) []byte {
// project_id/bucket_name
return []byte(storj.JoinPaths(comps[0], comps[2]))
}
func createPendingAudits(containedNodes map[int]storj.NodeID, correctedShares []infectious.Share, stripe *Stripe) ([]*PendingAudit, error) {
if len(containedNodes) == 0 {
return nil, nil
}
redundancy := stripe.Segment.GetRemote().GetRedundancy()
required := int(redundancy.GetMinReq())
total := int(redundancy.GetTotal())
shareSize := redundancy.GetErasureShareSize()
fec, err := infectious.NewFEC(required, total)
if err != nil {
return nil, err
}
stripeData, err := rebuildStripe(fec, correctedShares, int(shareSize))
if err != nil {
return nil, err
}
var pendingAudits []*PendingAudit
for pieceNum, nodeID := range containedNodes {
share := make([]byte, shareSize)
err = fec.EncodeSingle(stripeData, share, pieceNum)
if err != nil {
return nil, err
}
pendingAudits = append(pendingAudits, &PendingAudit{
NodeID: nodeID,
PieceID: stripe.Segment.GetRemote().RootPieceId,
StripeIndex: stripe.Index,
ShareSize: shareSize,
ExpectedShareHash: pkcrypto.SHA256Hash(share),
})
}
return pendingAudits, nil
}
func rebuildStripe(fec *infectious.FEC, corrected []infectious.Share, shareSize int) ([]byte, error) {
stripe := make([]byte, fec.Required()*shareSize)
err := fec.Rebuild(corrected, func(share infectious.Share) {
copy(stripe[share.Number*shareSize:], share.Data)
})
if err != nil {
return nil, err
}
return stripe, nil
}
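
createPendingAudits re-derives, from the corrected shares, exactly the share each contained node should have been storing and saves only its SHA-256 hash. A self-contained sketch of that pipeline with toy parameters; it uses the infectious FEC package this file already relies on (imported from github.com/vivint/infectious at the time of this commit) and crypto/sha256 in place of pkcrypto.SHA256Hash:

package main

import (
	"crypto/sha256"
	"fmt"

	"github.com/vivint/infectious"
)

func main() {
	const required, total = 2, 4

	fec, err := infectious.NewFEC(required, total)
	if err != nil {
		panic(err)
	}

	// A stripe is required*shareSize bytes long: 32 bytes here, so shareSize is 16.
	stripe := make([]byte, 32)
	copy(stripe, "stripe under audit")
	shareSize := len(stripe) / required

	// Encode every share of the stripe, as the uplink originally did.
	var shares []infectious.Share
	if err := fec.Encode(stripe, func(s infectious.Share) {
		shares = append(shares, infectious.Share{Number: s.Number, Data: append([]byte(nil), s.Data...)})
	}); err != nil {
		panic(err)
	}

	// Suppose the node holding share #3 never answered and is now contained.
	// Rebuild the stripe from shares that did arrive; any `required` of them will do.
	received := []infectious.Share{shares[0], shares[2]}
	rebuilt := make([]byte, required*shareSize)
	if err := fec.Rebuild(received, func(s infectious.Share) {
		copy(rebuilt[s.Number*shareSize:], s.Data)
	}); err != nil {
		panic(err)
	}

	// Re-derive exactly the share node #3 should be holding and keep only its hash.
	expected := make([]byte, shareSize)
	if err := fec.EncodeSingle(rebuilt, expected, 3); err != nil {
		panic(err)
	}
	fmt.Printf("expected share hash for piece 3: %x\n", sha256.Sum256(expected))
}

In the commit itself, the stripe is rebuilt from the Correct()-ed shares of the nodes that did respond, so this hash can be computed even though the contained node returned nothing.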

View File

@ -409,6 +409,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, config *Config, ve
peer.Orders.Service,
peer.Transport,
peer.Overlay.Service,
peer.DB.Containment(),
peer.Identity,
)
if err != nil {

View File

@ -128,10 +128,10 @@ func convertDBPending(info *dbx.PendingAudits) (*audit.PendingAudit, error) {
pending := &audit.PendingAudit{
NodeID: nodeID,
PieceID: pieceID,
StripeIndex: uint32(info.StripeIndex),
ShareSize: info.ShareSize,
StripeIndex: info.StripeIndex,
ShareSize: int32(info.ShareSize),
ExpectedShareHash: info.ExpectedShareHash,
ReverifyCount: uint32(info.ReverifyCount),
ReverifyCount: int32(info.ReverifyCount),
}
return pending, nil
}