diff --git a/pkg/audit/containment.go b/pkg/audit/containment.go
index e263090ea..ea9a768ed 100644
--- a/pkg/audit/containment.go
+++ b/pkg/audit/containment.go
@@ -30,10 +30,10 @@ var (
 type PendingAudit struct {
     NodeID            storj.NodeID
     PieceID           storj.PieceID
-    StripeIndex       uint32
-    ShareSize         int64
+    StripeIndex       int64
+    ShareSize         int32
     ExpectedShareHash []byte
-    ReverifyCount     uint32
+    ReverifyCount     int32
 }
 
 // Containment holds information about pending audits for contained nodes
diff --git a/pkg/audit/containment_test.go b/pkg/audit/containment_test.go
index 57847b6ac..c917a9d22 100644
--- a/pkg/audit/containment_test.go
+++ b/pkg/audit/containment_test.go
@@ -99,14 +99,14 @@ func TestContainIncrementPendingEntryExists(t *testing.T) {
         // expect reverify count for an entry to be 0 after first IncrementPending call
         pending, err := planet.Satellites[0].DB.Containment().Get(ctx, info1.NodeID)
         require.NoError(t, err)
-        require.Equal(t, uint32(0), pending.ReverifyCount)
+        require.EqualValues(t, 0, pending.ReverifyCount)
 
         // expect reverify count to be 1 after second IncrementPending call
         err = planet.Satellites[0].DB.Containment().IncrementPending(ctx, info1)
         require.NoError(t, err)
         pending, err = planet.Satellites[0].DB.Containment().Get(ctx, info1.NodeID)
         require.NoError(t, err)
-        require.Equal(t, uint32(1), pending.ReverifyCount)
+        require.EqualValues(t, 1, pending.ReverifyCount)
     })
 }
 
diff --git a/pkg/audit/reporter.go b/pkg/audit/reporter.go
index a0f179f4a..1924e4c90 100644
--- a/pkg/audit/reporter.go
+++ b/pkg/audit/reporter.go
@@ -6,6 +6,8 @@ package audit
 import (
     "context"
 
+    "github.com/zeebo/errs"
+
     "storj.io/storj/pkg/overlay"
     "storj.io/storj/pkg/storj"
 )
@@ -16,8 +18,9 @@ type reporter interface {
 
 // Reporter records audit reports in overlay and implements the reporter interface
 type Reporter struct {
-    overlay    *overlay.Cache
-    maxRetries int
+    overlay     *overlay.Cache
+    containment Containment
+    maxRetries  int
 }
 
 // RecordAuditsInfo is a struct containing arguments/return values for RecordAudits()
@@ -25,11 +28,12 @@ type RecordAuditsInfo struct {
     SuccessNodeIDs storj.NodeIDList
     FailNodeIDs    storj.NodeIDList
     OfflineNodeIDs storj.NodeIDList
+    PendingAudits  []*PendingAudit
 }
 
 // NewReporter instantiates a reporter
-func NewReporter(overlay *overlay.Cache, maxRetries int) *Reporter {
-    return &Reporter{overlay: overlay, maxRetries: maxRetries}
+func NewReporter(overlay *overlay.Cache, containment Containment, maxRetries int) *Reporter {
+    return &Reporter{overlay: overlay, containment: containment, maxRetries: maxRetries}
 }
 
 // RecordAudits saves failed audit details to overlay
@@ -37,8 +41,9 @@ func (reporter *Reporter) RecordAudits(ctx context.Context, req *RecordAuditsInf
     successNodeIDs := req.SuccessNodeIDs
     failNodeIDs := req.FailNodeIDs
     offlineNodeIDs := req.OfflineNodeIDs
+    pendingAudits := req.PendingAudits
 
-    var errNodeIDs storj.NodeIDList
+    var errlist errs.Group
 
     retries := 0
     for retries < reporter.maxRetries {
@@ -46,43 +51,51 @@ func (reporter *Reporter) RecordAudits(ctx context.Context, req *RecordAuditsInf
             return nil, nil
         }
 
-        errNodeIDs = storj.NodeIDList{}
+        errlist = errs.Group{}
 
         if len(successNodeIDs) > 0 {
             successNodeIDs, err = reporter.recordAuditSuccessStatus(ctx, successNodeIDs)
             if err != nil {
-                errNodeIDs = append(errNodeIDs, successNodeIDs...)
+                errlist.Add(err)
             }
         }
         if len(failNodeIDs) > 0 {
             failNodeIDs, err = reporter.recordAuditFailStatus(ctx, failNodeIDs)
             if err != nil {
-                errNodeIDs = append(errNodeIDs, failNodeIDs...)
+                errlist.Add(err)
             }
         }
         if len(offlineNodeIDs) > 0 {
             offlineNodeIDs, err = reporter.recordOfflineStatus(ctx, offlineNodeIDs)
             if err != nil {
-                errNodeIDs = append(errNodeIDs, offlineNodeIDs...)
+                errlist.Add(err)
+            }
+        }
+        if len(pendingAudits) > 0 {
+            pendingAudits, err = reporter.recordPendingAudits(ctx, pendingAudits)
+            if err != nil {
+                errlist.Add(err)
             }
         }
 
         retries++
     }
-    if retries >= reporter.maxRetries && len(errNodeIDs) > 0 {
+
+    err = errlist.Err()
+    if retries >= reporter.maxRetries && err != nil {
         return &RecordAuditsInfo{
             SuccessNodeIDs: successNodeIDs,
             FailNodeIDs:    failNodeIDs,
             OfflineNodeIDs: offlineNodeIDs,
-        }, Error.New("some nodes failed to be updated in overlay")
+            PendingAudits:  pendingAudits,
+        }, errs.Combine(Error.New("some nodes failed to be updated in overlay"), err)
     }
     return nil, nil
 }
 
 // recordAuditFailStatus updates nodeIDs in overlay with isup=true, auditsuccess=false
 func (reporter *Reporter) recordAuditFailStatus(ctx context.Context, failedAuditNodeIDs storj.NodeIDList) (failed storj.NodeIDList, err error) {
-    failedIDs := storj.NodeIDList{}
-
+    var errlist errs.Group
     for _, nodeID := range failedAuditNodeIDs {
         _, err := reporter.overlay.UpdateStats(ctx, &overlay.UpdateRequest{
             NodeID:       nodeID,
@@ -90,36 +103,35 @@ func (reporter *Reporter) recordAuditFailStatus(ctx context.Context, failedAudit
             AuditSuccess: false,
         })
         if err != nil {
-            failedIDs = append(failedIDs, nodeID)
+            failed = append(failed, nodeID)
+            errlist.Add(err)
         }
     }
-    if len(failedIDs) > 0 {
-        return failedIDs, Error.New("failed to record some audit fail statuses in overlay")
+    if len(failed) > 0 {
+        return failed, errs.Combine(Error.New("failed to record some audit fail statuses in overlay"), errlist.Err())
     }
     return nil, nil
 }
 
 // recordOfflineStatus updates nodeIDs in overlay with isup=false
-// TODO: offline nodes should maybe be marked as failing the audit in the future
 func (reporter *Reporter) recordOfflineStatus(ctx context.Context, offlineNodeIDs storj.NodeIDList) (failed storj.NodeIDList, err error) {
-    failedIDs := storj.NodeIDList{}
-
+    var errlist errs.Group
     for _, nodeID := range offlineNodeIDs {
         _, err := reporter.overlay.UpdateUptime(ctx, nodeID, false)
         if err != nil {
-            failedIDs = append(failedIDs, nodeID)
+            failed = append(failed, nodeID)
+            errlist.Add(err)
         }
     }
-    if len(failedIDs) > 0 {
-        return failedIDs, Error.New("failed to record some audit offline statuses in overlay")
+    if len(failed) > 0 {
+        return failed, errs.Combine(Error.New("failed to record some audit offline statuses in overlay"), errlist.Err())
     }
     return nil, nil
 }
 
 // recordAuditSuccessStatus updates nodeIDs in overlay with isup=true, auditsuccess=true
 func (reporter *Reporter) recordAuditSuccessStatus(ctx context.Context, successNodeIDs storj.NodeIDList) (failed storj.NodeIDList, err error) {
-    failedIDs := storj.NodeIDList{}
-
+    var errlist errs.Group
     for _, nodeID := range successNodeIDs {
         _, err := reporter.overlay.UpdateStats(ctx, &overlay.UpdateRequest{
             NodeID:       nodeID,
@@ -127,11 +139,28 @@ func (reporter *Reporter) recordAuditSuccessStatus(ctx context.Context, successN
             AuditSuccess: true,
         })
         if err != nil {
-            failedIDs = append(failedIDs, nodeID)
+            failed = append(failed, nodeID)
+            errlist.Add(err)
         }
     }
-    if len(failedIDs) > 0 {
-        return failedIDs, Error.New("failed to record some audit success statuses in overlay")
+    if len(failed) > 0 {
+        return failed, errs.Combine(Error.New("failed to record some audit success statuses in overlay"), errlist.Err())
+    }
+    return nil, nil
+}
+
+// recordPendingAudits updates the containment status of nodes with pending audits
+func (reporter *Reporter) recordPendingAudits(ctx context.Context, pendingAudits []*PendingAudit) (failed []*PendingAudit, err error) {
+    var errlist errs.Group
+    for _, pendingAudit := range pendingAudits {
+        err := reporter.containment.IncrementPending(ctx, pendingAudit)
+        if err != nil {
+            failed = append(failed, pendingAudit)
+            errlist.Add(err)
+        }
+    }
+    if len(failed) > 0 {
+        return failed, errs.Combine(Error.New("failed to record some pending audits"), errlist.Err())
     }
     return nil, nil
 }
diff --git a/pkg/audit/service.go b/pkg/audit/service.go
index 29b820605..eb454ded6 100644
--- a/pkg/audit/service.go
+++ b/pkg/audit/service.go
@@ -43,13 +43,13 @@ type Service struct {
 // NewService instantiates a Service with access to a Cursor and Verifier
 func NewService(log *zap.Logger, config Config, metainfo *metainfo.Service,
     orders *orders.Service, transport transport.Client, overlay *overlay.Cache,
-    identity *identity.FullIdentity) (service *Service, err error) {
+    containment Containment, identity *identity.FullIdentity) (service *Service, err error) {
     return &Service{
         log: log,
 
         Cursor:   NewCursor(metainfo),
         Verifier: NewVerifier(log.Named("audit:verifier"), transport, overlay, orders, identity, config.MinBytesPerSecond),
-        Reporter: NewReporter(overlay, config.MaxRetriesStatDB),
+        Reporter: NewReporter(overlay, containment, config.MaxRetriesStatDB),
 
         Loop: *sync2.NewCycle(config.Interval),
     }, nil
@@ -92,15 +92,15 @@ func (service *Service) process(ctx context.Context) error {
         }
     }
 
-    verifiedNodes, err := service.Verifier.Verify(ctx, stripe)
-    if err != nil {
-        return err
+    verifiedNodes, verifierErr := service.Verifier.Verify(ctx, stripe)
+    if verifierErr != nil && verifiedNodes == nil {
+        return verifierErr
     }
 
     // TODO(moby) we need to decide if we want to do something with nodes that the reporter failed to update
-    _, err = service.Reporter.RecordAudits(ctx, verifiedNodes)
-    if err != nil {
-        return err
+    _, reporterErr := service.Reporter.RecordAudits(ctx, verifiedNodes)
+    if reporterErr != nil {
+        return errs.Combine(verifierErr, reporterErr)
     }
 
     return nil
diff --git a/pkg/audit/timeout_test.go b/pkg/audit/timeout_test.go
index 95006b268..41ecf329a 100644
--- a/pkg/audit/timeout_test.go
+++ b/pkg/audit/timeout_test.go
@@ -9,6 +9,7 @@ import (
     "testing"
     "time"
 
+    "github.com/stretchr/testify/assert"
     "github.com/stretchr/testify/require"
     "go.uber.org/zap"
 
@@ -77,8 +78,12 @@ func TestGetShareTimeout(t *testing.T) {
             require.NoError(t, err)
         }
 
-        _, err = verifier.Verify(ctx, stripe)
-        require.NoError(t, err)
+        verifiedNodes, err := verifier.Verify(ctx, stripe)
+        assert.Error(t, err)
+        assert.NotNil(t, verifiedNodes)
+        for i := 0; i < k; i++ {
+            assert.True(t, contains(verifiedNodes.OfflineNodeIDs, pieces[i].NodeId))
+        }
     })
 }
 
@@ -90,3 +95,12 @@ func stopStorageNode(planet *testplanet.Planet, nodeID storj.NodeID) error {
     }
     return fmt.Errorf("no such node: %s", nodeID.String())
 }
+
+func contains(nodes storj.NodeIDList, nodeID storj.NodeID) bool {
+    for _, n := range nodes {
+        if n == nodeID {
+            return true
+        }
+    }
+    return false
+}
diff --git a/pkg/audit/verifier.go b/pkg/audit/verifier.go
index ca9eea19b..a2efb5e7c 100644
--- a/pkg/audit/verifier.go
+++ b/pkg/audit/verifier.go
@@ -19,6 +19,7 @@ import (
     "storj.io/storj/pkg/identity"
     "storj.io/storj/pkg/overlay"
     "storj.io/storj/pkg/pb"
+    "storj.io/storj/pkg/pkcrypto"
     "storj.io/storj/pkg/storj"
     "storj.io/storj/pkg/transport"
     "storj.io/storj/satellite/orders"
@@ -71,12 +72,14 @@ func (verifier *Verifier) Verify(ctx context.Context, stripe *Stripe) (verifiedN
 
     var offlineNodes storj.NodeIDList
     var failedNodes storj.NodeIDList
+    containedNodes := make(map[int]storj.NodeID)
     sharesToAudit := make(map[int]Share)
 
     for pieceNum, share := range shares {
         if shares[pieceNum].Error != nil {
+            // TODO(kaloyan): we need to check the logic here if we correctly identify offline nodes from those that didn't respond.
             if shares[pieceNum].Error == context.DeadlineExceeded || !transport.Error.Has(shares[pieceNum].Error) {
-                failedNodes = append(failedNodes, nodes[pieceNum])
+                containedNodes[pieceNum] = nodes[pieceNum]
             } else {
                 offlineNodes = append(offlineNodes, nodes[pieceNum])
             }
@@ -90,27 +93,37 @@ func (verifier *Verifier) Verify(ctx context.Context, stripe *Stripe) (verifiedN
 
     if len(sharesToAudit) < required {
         return &RecordAuditsInfo{
-            SuccessNodeIDs: nil,
-            FailNodeIDs:    failedNodes,
             OfflineNodeIDs: offlineNodes,
-        }, nil
+        }, Error.New("not enough shares for successful audit: got %d, required %d", len(sharesToAudit), required)
     }
 
-    pieceNums, err := auditShares(ctx, required, total, sharesToAudit)
+    pieceNums, correctedShares, err := auditShares(ctx, required, total, sharesToAudit)
     if err != nil {
-        return nil, err
+        return &RecordAuditsInfo{
+            OfflineNodeIDs: offlineNodes,
+        }, err
     }
 
     for _, pieceNum := range pieceNums {
         failedNodes = append(failedNodes, nodes[pieceNum])
     }
 
-    successNodes := getSuccessNodes(ctx, nodes, failedNodes, offlineNodes)
+    successNodes := getSuccessNodes(ctx, nodes, failedNodes, offlineNodes, containedNodes)
+
+    pendingAudits, err := createPendingAudits(containedNodes, correctedShares, stripe)
+    if err != nil {
+        return &RecordAuditsInfo{
+            SuccessNodeIDs: successNodes,
+            FailNodeIDs:    failedNodes,
+            OfflineNodeIDs: offlineNodes,
+        }, err
+    }
 
     return &RecordAuditsInfo{
         SuccessNodeIDs: successNodes,
         FailNodeIDs:    failedNodes,
         OfflineNodeIDs: offlineNodes,
+        PendingAudits:  pendingAudits,
     }, nil
 }
 
@@ -204,22 +217,23 @@ func (verifier *Verifier) getShare(ctx context.Context, limit *pb.AddressedOrder
 }
 
 // auditShares takes the downloaded shares and uses infectious's Correct function to check that they
-// haven't been altered. auditShares returns a slice containing the piece numbers of altered shares.
-func auditShares(ctx context.Context, required, total int, originals map[int]Share) (pieceNums []int, err error) {
+// haven't been altered. auditShares returns a slice containing the piece numbers of altered shares,
+// and a slice of the corrected shares.
+func auditShares(ctx context.Context, required, total int, originals map[int]Share) (pieceNums []int, corrected []infectious.Share, err error) {
     defer mon.Task()(&ctx)(&err)
     f, err := infectious.NewFEC(required, total)
     if err != nil {
-        return nil, err
+        return nil, nil, err
     }
 
     copies, err := makeCopies(ctx, originals)
     if err != nil {
-        return nil, err
+        return nil, nil, err
     }
 
     err = f.Correct(copies)
     if err != nil {
-        return nil, err
+        return nil, nil, err
     }
 
     for _, share := range copies {
@@ -227,7 +241,7 @@ func auditShares(ctx context.Context, required, total int, originals map[int]Sha
             pieceNums = append(pieceNums, share.Number)
         }
     }
-    return pieceNums, nil
+    return pieceNums, copies, nil
 }
 
 // makeCopies takes in a map of audit Shares and deep copies their data to a slice of infectious Shares
@@ -242,8 +256,8 @@ func makeCopies(ctx context.Context, originals map[int]Share) (copies []infectio
     return copies, nil
 }
 
-// getSuccessNodes uses the failed nodes and offline nodes arrays to determine which nodes passed the audit
-func getSuccessNodes(ctx context.Context, nodes map[int]storj.NodeID, failedNodes, offlineNodes storj.NodeIDList) (successNodes storj.NodeIDList) {
+// getSuccessNodes uses the failed nodes, offline nodes and contained nodes arrays to determine which nodes passed the audit
+func getSuccessNodes(ctx context.Context, nodes map[int]storj.NodeID, failedNodes, offlineNodes storj.NodeIDList, containedNodes map[int]storj.NodeID) (successNodes storj.NodeIDList) {
     fails := make(map[storj.NodeID]bool)
     for _, fail := range failedNodes {
         fails[fail] = true
@@ -251,6 +265,9 @@ func getSuccessNodes(ctx context.Context, nodes map[int]storj.NodeID, failedNode
     for _, offline := range offlineNodes {
         fails[offline] = true
     }
+    for _, contained := range containedNodes {
+        fails[contained] = true
+    }
 
     for _, node := range nodes {
         if !fails[node] {
@@ -269,3 +286,53 @@ func createBucketID(path storj.Path) []byte {
     // project_id/bucket_name
     return []byte(storj.JoinPaths(comps[0], comps[2]))
 }
+
+// createPendingAudits builds a PendingAudit entry for every contained node by
+// rebuilding the stripe from the corrected shares, re-encoding the share each
+// contained node should hold, and recording the hash it is expected to return.
+func createPendingAudits(containedNodes map[int]storj.NodeID, correctedShares []infectious.Share, stripe *Stripe) ([]*PendingAudit, error) {
+    if len(containedNodes) == 0 {
+        return nil, nil
+    }
+
+    redundancy := stripe.Segment.GetRemote().GetRedundancy()
+    required := int(redundancy.GetMinReq())
+    total := int(redundancy.GetTotal())
+    shareSize := redundancy.GetErasureShareSize()
+
+    fec, err := infectious.NewFEC(required, total)
+    if err != nil {
+        return nil, err
+    }
+
+    stripeData, err := rebuildStripe(fec, correctedShares, int(shareSize))
+    if err != nil {
+        return nil, err
+    }
+
+    var pendingAudits []*PendingAudit
+    for pieceNum, nodeID := range containedNodes {
+        share := make([]byte, shareSize)
+        err = fec.EncodeSingle(stripeData, share, pieceNum)
+        if err != nil {
+            return nil, err
+        }
+        pendingAudits = append(pendingAudits, &PendingAudit{
+            NodeID:            nodeID,
+            PieceID:           stripe.Segment.GetRemote().RootPieceId,
+            StripeIndex:       stripe.Index,
+            ShareSize:         shareSize,
+            ExpectedShareHash: pkcrypto.SHA256Hash(share),
+        })
+    }
+
+    return pendingAudits, nil
+}
+
+func rebuildStripe(fec *infectious.FEC, corrected []infectious.Share, shareSize int) ([]byte, error) {
+    stripe := make([]byte, fec.Required()*shareSize)
+    err := fec.Rebuild(corrected, func(share infectious.Share) {
+        copy(stripe[share.Number*shareSize:], share.Data)
+    })
+    if err != nil {
+        return nil, err
+    }
+    return stripe, nil
+}
diff --git a/satellite/peer.go b/satellite/peer.go
index 12968efe1..dd5357a23 100644
--- a/satellite/peer.go
+++ b/satellite/peer.go
@@ -409,6 +409,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, config *Config, ve
             peer.Orders.Service,
             peer.Transport,
             peer.Overlay.Service,
+            peer.DB.Containment(),
             peer.Identity,
         )
         if err != nil {
diff --git a/satellite/satellitedb/containment.go b/satellite/satellitedb/containment.go
index 78c79762f..638f7b56d 100644
--- a/satellite/satellitedb/containment.go
+++ b/satellite/satellitedb/containment.go
@@ -128,10 +128,10 @@ func convertDBPending(info *dbx.PendingAudits) (*audit.PendingAudit, error) {
     pending := &audit.PendingAudit{
         NodeID:            nodeID,
         PieceID:           pieceID,
-        StripeIndex:       uint32(info.StripeIndex),
-        ShareSize:         info.ShareSize,
+        StripeIndex:       info.StripeIndex,
+        ShareSize:         int32(info.ShareSize),
         ExpectedShareHash: info.ExpectedShareHash,
-        ReverifyCount:     uint32(info.ReverifyCount),
+        ReverifyCount:     int32(info.ReverifyCount),
     }
     return pending, nil
 }