diff --git a/satellite/audit/containment_test.go b/satellite/audit/containment_test.go
index 1e34273c6..ccd343045 100644
--- a/satellite/audit/containment_test.go
+++ b/satellite/audit/containment_test.go
@@ -14,6 +14,7 @@ import (
 	"storj.io/common/testrand"
 	"storj.io/storj/private/testplanet"
 	"storj.io/storj/satellite/audit"
+	"storj.io/storj/satellite/overlay"
 	"storj.io/storj/satellite/reputation"
 )
 
@@ -126,7 +127,7 @@ func TestContainUpdateStats(t *testing.T) {
 		require.NoError(t, err)
 
 		// update node stats
-		err = planet.Satellites[0].Reputation.Service.ApplyAudit(ctx, info1.NodeID, reputation.AuditSuccess)
+		err = planet.Satellites[0].Reputation.Service.ApplyAudit(ctx, info1.NodeID, overlay.ReputationStatus{}, reputation.AuditSuccess)
 		require.NoError(t, err)
 
 		// check contained flag set to false
diff --git a/satellite/audit/disqualification_test.go b/satellite/audit/disqualification_test.go
index ce36da18e..6f911750e 100644
--- a/satellite/audit/disqualification_test.go
+++ b/satellite/audit/disqualification_test.go
@@ -233,7 +233,9 @@ func TestDisqualifiedNodeRemainsDisqualified(t *testing.T) {
 		require.NoError(t, err)
 		assert.True(t, isDisqualified(t, ctx, satellitePeer, disqualifiedNode.ID()))
 
-		err = satellitePeer.Reputation.Service.ApplyAudit(ctx, disqualifiedNode.ID(), reputation.AuditSuccess)
+		node, err := satellitePeer.Overlay.Service.Get(ctx, disqualifiedNode.ID())
+		require.NoError(t, err)
+		err = satellitePeer.Reputation.Service.ApplyAudit(ctx, disqualifiedNode.ID(), overlay.ReputationStatus{Disqualified: node.Disqualified}, reputation.AuditSuccess)
 		require.NoError(t, err)
 		assert.True(t, isDisqualified(t, ctx, satellitePeer, disqualifiedNode.ID()))
 	})
diff --git a/satellite/audit/reporter.go b/satellite/audit/reporter.go
index 2344a40aa..4fc4bf5e0 100644
--- a/satellite/audit/reporter.go
+++ b/satellite/audit/reporter.go
@@ -10,6 +10,7 @@ import (
 	"go.uber.org/zap"
 
 	"storj.io/common/storj"
+	"storj.io/storj/satellite/overlay"
 	"storj.io/storj/satellite/reputation"
 )
 
@@ -28,13 +29,14 @@ type Reporter struct {
 // It records whether an audit is able to be completed, the total number of
 // pieces a given audit has conducted for, lists for nodes that
 // succeeded, failed, were offline, have pending audits, or failed for unknown
-// reasons.
+// reasons and their current reputation status.
 type Report struct {
-	Successes     storj.NodeIDList
-	Fails         storj.NodeIDList
-	Offlines      storj.NodeIDList
-	PendingAudits []*PendingAudit
-	Unknown       storj.NodeIDList
+	Successes       storj.NodeIDList
+	Fails           storj.NodeIDList
+	Offlines        storj.NodeIDList
+	PendingAudits   []*PendingAudit
+	Unknown         storj.NodeIDList
+	NodesReputation map[storj.NodeID]overlay.ReputationStatus
 }
 
 // NewReporter instantiates a reporter.
@@ -69,6 +71,7 @@ func (reporter *Reporter) RecordAudits(ctx context.Context, req Report) (_ Repor
 	)
 
 	var errlist errs.Group
+	nodesReputation := req.NodesReputation
 
 	tries := 0
 	for tries <= reporter.maxRetries {
@@ -79,31 +82,31 @@ func (reporter *Reporter) RecordAudits(ctx context.Context, req Report) (_ Repor
 		errlist = errs.Group{}
 
 		if len(successes) > 0 {
-			successes, err = reporter.recordAuditSuccessStatus(ctx, successes)
+			successes, err = reporter.recordAuditSuccessStatus(ctx, successes, nodesReputation)
 			if err != nil {
 				errlist.Add(err)
 			}
 		}
 		if len(fails) > 0 {
-			fails, err = reporter.recordAuditFailStatus(ctx, fails)
+			fails, err = reporter.recordAuditFailStatus(ctx, fails, nodesReputation)
 			if err != nil {
 				errlist.Add(err)
 			}
 		}
 		if len(unknowns) > 0 {
-			unknowns, err = reporter.recordAuditUnknownStatus(ctx, unknowns)
+			unknowns, err = reporter.recordAuditUnknownStatus(ctx, unknowns, nodesReputation)
 			if err != nil {
 				errlist.Add(err)
 			}
 		}
 		if len(offlines) > 0 {
-			offlines, err = reporter.recordOfflineStatus(ctx, offlines)
+			offlines, err = reporter.recordOfflineStatus(ctx, offlines, nodesReputation)
 			if err != nil {
 				errlist.Add(err)
 			}
 		}
 		if len(pendingAudits) > 0 {
-			pendingAudits, err = reporter.recordPendingAudits(ctx, pendingAudits)
+			pendingAudits, err = reporter.recordPendingAudits(ctx, pendingAudits, nodesReputation)
 			if err != nil {
 				errlist.Add(err)
 			}
@@ -125,68 +128,70 @@ func (reporter *Reporter) RecordAudits(ctx context.Context, req Report) (_ Repor
 	return Report{}, nil
 }
 
+// TODO record* methods can be consolidated to reduce code duplication
 // recordAuditFailStatus updates nodeIDs in overlay with isup=true, auditoutcome=fail.
-func (reporter *Reporter) recordAuditFailStatus(ctx context.Context, failedAuditNodeIDs storj.NodeIDList) (failed storj.NodeIDList, err error) {
+func (reporter *Reporter) recordAuditFailStatus(ctx context.Context, failedAuditNodeIDs storj.NodeIDList, nodesReputation map[storj.NodeID]overlay.ReputationStatus) (failed storj.NodeIDList, err error) {
 	defer mon.Task()(&ctx)(&err)
 
-	var errors error
+	var errors errs.Group
 	for _, nodeID := range failedAuditNodeIDs {
-		err = reporter.reputations.ApplyAudit(ctx, nodeID, reputation.AuditFailure)
+		err = reporter.reputations.ApplyAudit(ctx, nodeID, nodesReputation[nodeID], reputation.AuditFailure)
 		if err != nil {
 			failed = append(failed, nodeID)
-			errors = errs.Combine(Error.New("failed to record some audit fail statuses in overlay"), err)
+			errors.Add(errs.Combine(Error.New("failed to record audit fail status in overlay for node %s", nodeID.String()), err))
 		}
 	}
-	return failed, errors
+	return failed, errors.Err()
 }
 
 // recordAuditUnknownStatus updates nodeIDs in overlay with isup=true, auditoutcome=unknown.
-func (reporter *Reporter) recordAuditUnknownStatus(ctx context.Context, unknownAuditNodeIDs storj.NodeIDList) (failed storj.NodeIDList, err error) {
+func (reporter *Reporter) recordAuditUnknownStatus(ctx context.Context, unknownAuditNodeIDs storj.NodeIDList, nodesReputation map[storj.NodeID]overlay.ReputationStatus) (failed storj.NodeIDList, err error) {
 	defer mon.Task()(&ctx)(&err)
 
-	var errors error
+	var errors errs.Group
 	for _, nodeID := range unknownAuditNodeIDs {
-		err = reporter.reputations.ApplyAudit(ctx, nodeID, reputation.AuditUnknown)
+		err = reporter.reputations.ApplyAudit(ctx, nodeID, nodesReputation[nodeID], reputation.AuditUnknown)
 		if err != nil {
 			failed = append(failed, nodeID)
-			errors = errs.Combine(Error.New("failed to record some audit unknown statuses in overlay"), err)
+			errors.Add(errs.Combine(Error.New("failed to record audit unknown status in overlay for node %s", nodeID.String()), err))
 		}
 	}
-	return failed, errors
+	return failed, errors.Err()
 }
 
 // recordOfflineStatus updates nodeIDs in overlay with isup=false, auditoutcome=offline.
-func (reporter *Reporter) recordOfflineStatus(ctx context.Context, offlineNodeIDs storj.NodeIDList) (failed storj.NodeIDList, err error) {
+func (reporter *Reporter) recordOfflineStatus(ctx context.Context, offlineNodeIDs storj.NodeIDList, nodesReputation map[storj.NodeID]overlay.ReputationStatus) (failed storj.NodeIDList, err error) {
 	defer mon.Task()(&ctx)(&err)
 
-	var errors error
+	var errors errs.Group
 	for _, nodeID := range offlineNodeIDs {
-		err = reporter.reputations.ApplyAudit(ctx, nodeID, reputation.AuditOffline)
+		err = reporter.reputations.ApplyAudit(ctx, nodeID, nodesReputation[nodeID], reputation.AuditOffline)
 		if err != nil {
 			failed = append(failed, nodeID)
-			errors = errs.Combine(Error.New("failed to record some audit offline statuses in overlay"), err)
+			errors.Add(errs.Combine(Error.New("failed to record audit offline status in overlay for node %s", nodeID.String()), err))
 		}
 	}
-	return failed, errors
+	return failed, errors.Err()
 }
 
 // recordAuditSuccessStatus updates nodeIDs in overlay with isup=true, auditoutcome=success.
-func (reporter *Reporter) recordAuditSuccessStatus(ctx context.Context, successNodeIDs storj.NodeIDList) (failed storj.NodeIDList, err error) {
+func (reporter *Reporter) recordAuditSuccessStatus(ctx context.Context, successNodeIDs storj.NodeIDList, nodesReputation map[storj.NodeID]overlay.ReputationStatus) (failed storj.NodeIDList, err error) {
 	defer mon.Task()(&ctx)(&err)
 
-	var errors error
+	var errors errs.Group
 	for _, nodeID := range successNodeIDs {
-		err = reporter.reputations.ApplyAudit(ctx, nodeID, reputation.AuditSuccess)
+		err = reporter.reputations.ApplyAudit(ctx, nodeID, nodesReputation[nodeID], reputation.AuditSuccess)
 		if err != nil {
 			failed = append(failed, nodeID)
-			errors = errs.Combine(Error.New("failed to record some audit success statuses in overlay"), err)
+			errors.Add(errs.Combine(Error.New("failed to record audit success status in overlay for node %s", nodeID.String()), err))
 		}
 	}
-	return failed, errors
+	return failed, errors.Err()
 }
 
 // recordPendingAudits updates the containment status of nodes with pending audits.
-func (reporter *Reporter) recordPendingAudits(ctx context.Context, pendingAudits []*PendingAudit) (failed []*PendingAudit, err error) {
+func (reporter *Reporter) recordPendingAudits(ctx context.Context, pendingAudits []*PendingAudit, nodesReputation map[storj.NodeID]overlay.ReputationStatus) (failed []*PendingAudit, err error) {
 	defer mon.Task()(&ctx)(&err)
 
 	var errlist errs.Group
@@ -203,7 +208,7 @@ func (reporter *Reporter) recordPendingAudits(ctx context.Context, pendingAudits
 		} else {
 			// record failure -- max reverify count reached
 			reporter.log.Info("max reverify count reached (audit failed)", zap.Stringer("Node ID", pendingAudit.NodeID))
-			err = reporter.reputations.ApplyAudit(ctx, pendingAudit.NodeID, reputation.AuditFailure)
+			err = reporter.reputations.ApplyAudit(ctx, pendingAudit.NodeID, nodesReputation[pendingAudit.NodeID], reputation.AuditFailure)
 			if err != nil {
 				errlist.Add(err)
 				failed = append(failed, pendingAudit)
diff --git a/satellite/audit/reverify_test.go b/satellite/audit/reverify_test.go
index d65eb4dbf..3be8f0c01 100644
--- a/satellite/audit/reverify_test.go
+++ b/satellite/audit/reverify_test.go
@@ -72,10 +72,10 @@ func TestReverifySuccess(t *testing.T) {
 		pieces := segment.Pieces
 		rootPieceID := segment.RootPieceID
 
-		limit, privateKey, cachedIPAndPort, err := orders.CreateAuditOrderLimit(ctx, pieces[0].StorageNode, pieces[0].Number, rootPieceID, shareSize)
+		limit, privateKey, cachedNodeInfo, err := orders.CreateAuditOrderLimit(ctx, pieces[0].StorageNode, pieces[0].Number, rootPieceID, shareSize)
 		require.NoError(t, err)
 
-		share, err := audits.Verifier.GetShare(ctx, limit, privateKey, cachedIPAndPort, randomIndex, shareSize, int(pieces[0].Number))
+		share, err := audits.Verifier.GetShare(ctx, limit, privateKey, cachedNodeInfo.LastIPPort, randomIndex, shareSize, int(pieces[0].Number))
 		require.NoError(t, err)
 
 		pending := &audit.PendingAudit{
@@ -151,10 +151,10 @@ func TestReverifyFailMissingShare(t *testing.T) {
 		pieces := segment.Pieces
 		rootPieceID := segment.RootPieceID
 
-		limit, privateKey, cachedIPAndPort, err := orders.CreateAuditOrderLimit(ctx, pieces[0].StorageNode, pieces[0].Number, rootPieceID, shareSize)
+		limit, privateKey, cachedNodeInfo, err := orders.CreateAuditOrderLimit(ctx, pieces[0].StorageNode, pieces[0].Number, rootPieceID, shareSize)
 		require.NoError(t, err)
 
-		share, err := audits.Verifier.GetShare(ctx, limit, privateKey, cachedIPAndPort, randomIndex, shareSize, int(pieces[0].Number))
+		share, err := audits.Verifier.GetShare(ctx, limit, privateKey, cachedNodeInfo.LastIPPort, randomIndex, shareSize, int(pieces[0].Number))
 		require.NoError(t, err)
 
 		pending := &audit.PendingAudit{
@@ -796,10 +796,10 @@ func TestReverifyDifferentShare(t *testing.T) {
 		shareSize := segment1.Redundancy.ShareSize
 		rootPieceID := segment1.RootPieceID
 
-		limit, privateKey, cachedIPAndPort, err := orders.CreateAuditOrderLimit(ctx, selectedNode, selectedPieceNum, rootPieceID, shareSize)
+		limit, privateKey, cachedNodeInfo, err := orders.CreateAuditOrderLimit(ctx, selectedNode, selectedPieceNum, rootPieceID, shareSize)
 		require.NoError(t, err)
 
-		share, err := audits.Verifier.GetShare(ctx, limit, privateKey, cachedIPAndPort, randomIndex, shareSize, int(selectedPieceNum))
+		share, err := audits.Verifier.GetShare(ctx, limit, privateKey, cachedNodeInfo.LastIPPort, randomIndex, shareSize, int(selectedPieceNum))
 		require.NoError(t, err)
 
 		pending := &audit.PendingAudit{
@@ -955,10 +955,10 @@ func TestReverifyExpired2(t *testing.T) {
 		shareSize := segment1.Redundancy.ShareSize
 		rootPieceID := segment1.RootPieceID
 
-		limit, privateKey, cachedIPAndPort, err := orders.CreateAuditOrderLimit(ctx, selectedNode, selectedPieceNum, rootPieceID, shareSize)
+		limit, privateKey, cachedNodeInfo, err := orders.CreateAuditOrderLimit(ctx, selectedNode, selectedPieceNum, rootPieceID, shareSize)
 		require.NoError(t, err)
 
-		share, err := audits.Verifier.GetShare(ctx, limit, privateKey, cachedIPAndPort, randomIndex, shareSize, int(selectedPieceNum))
+		share, err := audits.Verifier.GetShare(ctx, limit, privateKey, cachedNodeInfo.LastIPPort, randomIndex, shareSize, int(selectedPieceNum))
 		require.NoError(t, err)
 
 		pending := &audit.PendingAudit{
@@ -1052,10 +1052,10 @@ func TestReverifySlowDownload(t *testing.T) {
 		shareSize := segment.Redundancy.ShareSize
 		rootPieceID := segment.RootPieceID
 
-		limit, privateKey, cachedIPAndPort, err := orders.CreateAuditOrderLimit(ctx, slowNode, slowPiece.Number, rootPieceID, shareSize)
+		limit, privateKey, cachedNodeInfo, err := orders.CreateAuditOrderLimit(ctx, slowNode, slowPiece.Number, rootPieceID, shareSize)
 		require.NoError(t, err)
 
-		share, err := audits.Verifier.GetShare(ctx, limit, privateKey, cachedIPAndPort, randomIndex, shareSize, int(slowPiece.Number))
+		share, err := audits.Verifier.GetShare(ctx, limit, privateKey, cachedNodeInfo.LastIPPort, randomIndex, shareSize, int(slowPiece.Number))
 		require.NoError(t, err)
 
 		pending := &audit.PendingAudit{
@@ -1141,10 +1141,10 @@ func TestReverifyUnknownError(t *testing.T) {
 		shareSize := segment.Redundancy.ShareSize
 		rootPieceID := segment.RootPieceID
 
-		limit, privateKey, cachedIPAndPort, err := orders.CreateAuditOrderLimit(ctx, badNode, badPiece.Number, rootPieceID, shareSize)
+		limit, privateKey, cachedNodeInfo, err := orders.CreateAuditOrderLimit(ctx, badNode, badPiece.Number, rootPieceID, shareSize)
 		require.NoError(t, err)
 
-		share, err := audits.Verifier.GetShare(ctx, limit, privateKey, cachedIPAndPort, randomIndex, shareSize, int(badPiece.Number))
+		share, err := audits.Verifier.GetShare(ctx, limit, privateKey, cachedNodeInfo.LastIPPort, randomIndex, shareSize, int(badPiece.Number))
 		require.NoError(t, err)
 
 		pending := &audit.PendingAudit{
@@ -1234,10 +1234,10 @@ func TestMaxReverifyCount(t *testing.T) {
 		shareSize := segment.Redundancy.ShareSize
 		rootPieceID := segment.RootPieceID
 
-		limit, privateKey, cachedIPAndPort, err := orders.CreateAuditOrderLimit(ctx, slowNode, slowPiece.Number, rootPieceID, shareSize)
+		limit, privateKey, cachedNodeInfo, err := orders.CreateAuditOrderLimit(ctx, slowNode, slowPiece.Number, rootPieceID, shareSize)
 		require.NoError(t, err)
 
-		share, err := audits.Verifier.GetShare(ctx, limit, privateKey, cachedIPAndPort, randomIndex, shareSize, int(slowPiece.Number))
+		share, err := audits.Verifier.GetShare(ctx, limit, privateKey, cachedNodeInfo.LastIPPort, randomIndex, shareSize, int(slowPiece.Number))
 		require.NoError(t, err)
 
 		pending := &audit.PendingAudit{
diff --git a/satellite/audit/verifier.go b/satellite/audit/verifier.go
index 3cb33077f..2b6dce10c 100644
--- a/satellite/audit/verifier.go
+++ b/satellite/audit/verifier.go
@@ -120,7 +120,7 @@ func (verifier *Verifier) Verify(ctx context.Context, segment Segment, skip map[
 	containedNodes := make(map[int]storj.NodeID)
 	sharesToAudit := make(map[int]Share)
 
-	orderLimits, privateKey, cachedIPsAndPorts, err := verifier.orders.CreateAuditOrderLimits(ctx, segmentInfo, skip)
+	orderLimits, privateKey, cachedNodesInfo, err := verifier.orders.CreateAuditOrderLimits(ctx, segmentInfo, skip)
 	if err != nil {
 		if orders.ErrDownloadFailedNotEnoughPieces.Has(err) {
mon.Counter("not_enough_shares_for_audit").Inc(1) @@ -128,6 +128,11 @@ func (verifier *Verifier) Verify(ctx context.Context, segment Segment, skip map[ } return Report{}, err } + cachedNodesReputation := make(map[storj.NodeID]overlay.ReputationStatus, len(cachedNodesInfo)) + for id, info := range cachedNodesInfo { + cachedNodesReputation[id] = info.Reputation + } + defer func() { report.NodesReputation = cachedNodesReputation }() // NOTE offlineNodes will include disqualified nodes because they aren't in // the skip list @@ -138,7 +143,7 @@ func (verifier *Verifier) Verify(ctx context.Context, segment Segment, skip map[ zap.String("Segment", segmentInfoString(segment))) } - shares, err := verifier.DownloadShares(ctx, orderLimits, privateKey, cachedIPsAndPorts, randomIndex, segmentInfo.Redundancy.ShareSize) + shares, err := verifier.DownloadShares(ctx, orderLimits, privateKey, cachedNodesInfo, randomIndex, segmentInfo.Redundancy.ShareSize) if err != nil { return Report{ Offlines: offlineNodes, @@ -289,7 +294,7 @@ func segmentInfoString(segment Segment) string { } // DownloadShares downloads shares from the nodes where remote pieces are located. -func (verifier *Verifier) DownloadShares(ctx context.Context, limits []*pb.AddressedOrderLimit, piecePrivateKey storj.PiecePrivateKey, cachedIPsAndPorts map[storj.NodeID]string, stripeIndex int32, shareSize int32) (shares map[int]Share, err error) { +func (verifier *Verifier) DownloadShares(ctx context.Context, limits []*pb.AddressedOrderLimit, piecePrivateKey storj.PiecePrivateKey, cachedNodesInfo map[storj.NodeID]overlay.NodeReputation, stripeIndex int32, shareSize int32) (shares map[int]Share, err error) { defer mon.Task()(&ctx)(&err) shares = make(map[int]Share, len(limits)) @@ -301,9 +306,14 @@ func (verifier *Verifier) DownloadShares(ctx context.Context, limits []*pb.Addre continue } - ip := cachedIPsAndPorts[limit.Limit.StorageNodeId] + var ipPort string + node, ok := cachedNodesInfo[limit.Limit.StorageNodeId] + if ok && node.LastIPPort != "" { + ipPort = node.LastIPPort + } + go func(i int, limit *pb.AddressedOrderLimit) { - share, err := verifier.GetShare(ctx, limit, piecePrivateKey, ip, stripeIndex, shareSize, i) + share, err := verifier.GetShare(ctx, limit, piecePrivateKey, ipPort, stripeIndex, shareSize, i) if err != nil { share = Share{ Error: err, @@ -345,6 +355,7 @@ func (verifier *Verifier) Reverify(ctx context.Context, segment Segment) (report nodeID storj.NodeID status int pendingAudit *PendingAudit + reputation overlay.ReputationStatus release bool err error } @@ -431,7 +442,7 @@ func (verifier *Verifier) Reverify(ctx context.Context, segment Segment) (report return } - limit, piecePrivateKey, cachedIPAndPort, err := verifier.orders.CreateAuditOrderLimit(ctx, pending.NodeID, pieceNum, pending.PieceID, pending.ShareSize) + limit, piecePrivateKey, cachedNodeInfo, err := verifier.orders.CreateAuditOrderLimit(ctx, pending.NodeID, pieceNum, pending.PieceID, pending.ShareSize) if err != nil { if overlay.ErrNodeDisqualified.Has(err) { ch <- result{nodeID: pending.NodeID, status: skipped, release: true} @@ -444,7 +455,7 @@ func (verifier *Verifier) Reverify(ctx context.Context, segment Segment) (report return } if overlay.ErrNodeOffline.Has(err) { - ch <- result{nodeID: pending.NodeID, status: offline} + ch <- result{nodeID: pending.NodeID, status: offline, reputation: cachedNodeInfo.Reputation} verifier.log.Debug("Reverify: order limit not created (offline)", zap.Stringer("Node ID", pending.NodeID)) return } @@ -453,7 +464,7 @@ func 
(verifier *Verifier) Reverify(ctx context.Context, segment Segment) (report return } - share, err := verifier.GetShare(ctx, limit, piecePrivateKey, cachedIPAndPort, pending.StripeIndex, pending.ShareSize, int(pieceNum)) + share, err := verifier.GetShare(ctx, limit, piecePrivateKey, cachedNodeInfo.LastIPPort, pending.StripeIndex, pending.ShareSize, int(pieceNum)) // check if the pending audit was deleted while downloading the share _, getErr := verifier.containment.Get(ctx, pending.NodeID) @@ -473,18 +484,18 @@ func (verifier *Verifier) Reverify(ctx context.Context, segment Segment) (report if rpc.Error.Has(err) { if errs.Is(err, context.DeadlineExceeded) { // dial timeout - ch <- result{nodeID: pending.NodeID, status: offline} + ch <- result{nodeID: pending.NodeID, status: offline, reputation: cachedNodeInfo.Reputation} verifier.log.Debug("Reverify: dial timeout (offline)", zap.Stringer("Node ID", pending.NodeID), zap.Error(err)) return } if errs2.IsRPC(err, rpcstatus.Unknown) { // dial failed -- offline node verifier.log.Debug("Reverify: dial failed (offline)", zap.Stringer("Node ID", pending.NodeID), zap.Error(err)) - ch <- result{nodeID: pending.NodeID, status: offline} + ch <- result{nodeID: pending.NodeID, status: offline, reputation: cachedNodeInfo.Reputation} return } // unknown transport error - ch <- result{nodeID: pending.NodeID, status: unknown, pendingAudit: pending, release: true} + ch <- result{nodeID: pending.NodeID, status: unknown, pendingAudit: pending, reputation: cachedNodeInfo.Reputation, release: true} verifier.log.Info("Reverify: unknown transport error (skipped)", zap.Stringer("Node ID", pending.NodeID), zap.Error(err)) return } @@ -497,24 +508,24 @@ func (verifier *Verifier) Reverify(ctx context.Context, segment Segment) (report return } // missing share - ch <- result{nodeID: pending.NodeID, status: failed, release: true} + ch <- result{nodeID: pending.NodeID, status: failed, reputation: cachedNodeInfo.Reputation, release: true} verifier.log.Info("Reverify: piece not found (audit failed)", zap.Stringer("Node ID", pending.NodeID), zap.Error(err)) return } if errs2.IsRPC(err, rpcstatus.DeadlineExceeded) { // dial successful, but download timed out - ch <- result{nodeID: pending.NodeID, status: contained, pendingAudit: pending} + ch <- result{nodeID: pending.NodeID, status: contained, pendingAudit: pending, reputation: cachedNodeInfo.Reputation} verifier.log.Info("Reverify: download timeout (contained)", zap.Stringer("Node ID", pending.NodeID), zap.Error(err)) return } // unknown error - ch <- result{nodeID: pending.NodeID, status: unknown, pendingAudit: pending, release: true} + ch <- result{nodeID: pending.NodeID, status: unknown, pendingAudit: pending, reputation: cachedNodeInfo.Reputation, release: true} verifier.log.Info("Reverify: unknown error (skipped)", zap.Stringer("Node ID", pending.NodeID), zap.Error(err)) return } downloadedHash := pkcrypto.SHA256Hash(share.Data) if bytes.Equal(downloadedHash, pending.ExpectedShareHash) { - ch <- result{nodeID: pending.NodeID, status: success, release: true} + ch <- result{nodeID: pending.NodeID, status: success, reputation: cachedNodeInfo.Reputation, release: true} verifier.log.Info("Reverify: hashes match (audit success)", zap.Stringer("Node ID", pending.NodeID)) } else { err := verifier.checkIfSegmentAltered(ctx, pendingSegment) @@ -525,13 +536,17 @@ func (verifier *Verifier) Reverify(ctx context.Context, segment Segment) (report } verifier.log.Info("Reverify: hashes mismatch (audit failed)", zap.Stringer("Node ID", 
pending.NodeID), zap.Binary("expected hash", pending.ExpectedShareHash), zap.Binary("downloaded hash", downloadedHash)) - ch <- result{nodeID: pending.NodeID, status: failed, release: true} + ch <- result{nodeID: pending.NodeID, status: failed, reputation: cachedNodeInfo.Reputation, release: true} } }(pending) } + reputations := make(map[storj.NodeID]overlay.ReputationStatus) for range pieces { result := <-ch + + reputations[result.nodeID] = result.reputation + switch result.status { case skipped: case success: @@ -555,6 +570,7 @@ func (verifier *Verifier) Reverify(ctx context.Context, segment Segment) (report } } } + report.NodesReputation = reputations mon.Meter("reverify_successes_global").Mark(len(report.Successes)) //mon:locked mon.Meter("reverify_offlines_global").Mark(len(report.Offlines)) //mon:locked diff --git a/satellite/audit/verifier_test.go b/satellite/audit/verifier_test.go index 70a9990da..bf4f3a35f 100644 --- a/satellite/audit/verifier_test.go +++ b/satellite/audit/verifier_test.go @@ -64,10 +64,10 @@ func TestDownloadSharesHappyPath(t *testing.T) { shareSize := segment.Redundancy.ShareSize - limits, privateKey, cachedIPsAndPorts, err := satellite.Orders.Service.CreateAuditOrderLimits(ctx, segment, nil) + limits, privateKey, cachedNodesInfo, err := satellite.Orders.Service.CreateAuditOrderLimits(ctx, segment, nil) require.NoError(t, err) - shares, err := audits.Verifier.DownloadShares(ctx, limits, privateKey, cachedIPsAndPorts, randomIndex, shareSize) + shares, err := audits.Verifier.DownloadShares(ctx, limits, privateKey, cachedNodesInfo, randomIndex, shareSize) require.NoError(t, err) for _, share := range shares { @@ -117,7 +117,7 @@ func TestDownloadSharesOfflineNode(t *testing.T) { shareSize := segment.Redundancy.ShareSize - limits, privateKey, cachedIPsAndPorts, err := satellite.Orders.Service.CreateAuditOrderLimits(ctx, segment, nil) + limits, privateKey, cachedNodesInfo, err := satellite.Orders.Service.CreateAuditOrderLimits(ctx, segment, nil) require.NoError(t, err) // stop the first node in the segment @@ -125,7 +125,7 @@ func TestDownloadSharesOfflineNode(t *testing.T) { err = planet.StopNodeAndUpdate(ctx, planet.FindNode(stoppedNodeID)) require.NoError(t, err) - shares, err := audits.Verifier.DownloadShares(ctx, limits, privateKey, cachedIPsAndPorts, randomIndex, shareSize) + shares, err := audits.Verifier.DownloadShares(ctx, limits, privateKey, cachedNodesInfo, randomIndex, shareSize) require.NoError(t, err) for _, share := range shares { @@ -182,10 +182,10 @@ func TestDownloadSharesMissingPiece(t *testing.T) { shareSize := segment.Redundancy.ShareSize - limits, privateKey, cachedIPsAndPorts, err := satellite.Orders.Service.CreateAuditOrderLimits(ctx, segment, nil) + limits, privateKey, cachedNodesInfo, err := satellite.Orders.Service.CreateAuditOrderLimits(ctx, segment, nil) require.NoError(t, err) - shares, err := audits.Verifier.DownloadShares(ctx, limits, privateKey, cachedIPsAndPorts, randomIndex, shareSize) + shares, err := audits.Verifier.DownloadShares(ctx, limits, privateKey, cachedNodesInfo, randomIndex, shareSize) require.NoError(t, err) for _, share := range shares { @@ -261,10 +261,10 @@ func TestDownloadSharesDialTimeout(t *testing.T) { shareSize := segment.Redundancy.ShareSize - limits, privateKey, cachedIPsAndPorts, err := satellite.Orders.Service.CreateAuditOrderLimits(ctx, segment, nil) + limits, privateKey, cachedNodesInfo, err := satellite.Orders.Service.CreateAuditOrderLimits(ctx, segment, nil) require.NoError(t, err) - shares, err := 
verifier.DownloadShares(ctx, limits, privateKey, cachedIPsAndPorts, randomIndex, shareSize) + shares, err := verifier.DownloadShares(ctx, limits, privateKey, cachedNodesInfo, randomIndex, shareSize) require.NoError(t, err) for _, share := range shares { @@ -336,14 +336,14 @@ func TestDownloadSharesDownloadTimeout(t *testing.T) { shareSize := segment.Redundancy.ShareSize - limits, privateKey, cachedIPsAndPorts, err := satellite.Orders.Service.CreateAuditOrderLimits(ctx, segment, nil) + limits, privateKey, cachedNodesInfo, err := satellite.Orders.Service.CreateAuditOrderLimits(ctx, segment, nil) require.NoError(t, err) // make downloads on storage node slower than the timeout on the satellite for downloading shares delay := 200 * time.Millisecond storageNodeDB.SetLatency(delay) - shares, err := verifier.DownloadShares(ctx, limits, privateKey, cachedIPsAndPorts, randomIndex, shareSize) + shares, err := verifier.DownloadShares(ctx, limits, privateKey, cachedNodesInfo, randomIndex, shareSize) require.NoError(t, err) require.Len(t, shares, 1) diff --git a/satellite/orders/service.go b/satellite/orders/service.go index cc54d9919..2060ee7fd 100644 --- a/satellite/orders/service.go +++ b/satellite/orders/service.go @@ -263,7 +263,7 @@ func (service *Service) CreatePutOrderLimits(ctx context.Context, bucket metabas } // CreateAuditOrderLimits creates the order limits for auditing the pieces of a segment. -func (service *Service) CreateAuditOrderLimits(ctx context.Context, segment metabase.Segment, skip map[storj.NodeID]bool) (_ []*pb.AddressedOrderLimit, _ storj.PiecePrivateKey, cachedIPsAndPorts map[storj.NodeID]string, err error) { +func (service *Service) CreateAuditOrderLimits(ctx context.Context, segment metabase.Segment, skip map[storj.NodeID]bool) (_ []*pb.AddressedOrderLimit, _ storj.PiecePrivateKey, cachedNodesInfo map[storj.NodeID]overlay.NodeReputation, err error) { defer mon.Task()(&ctx)(&err) nodeIDs := make([]storj.NodeID, len(segment.Pieces)) @@ -271,7 +271,7 @@ func (service *Service) CreateAuditOrderLimits(ctx context.Context, segment meta nodeIDs[i] = piece.StorageNode } - nodes, err := service.overlay.GetOnlineNodesForGetDelete(ctx, nodeIDs) + nodes, err := service.overlay.GetOnlineNodesForAuditRepair(ctx, nodeIDs) if err != nil { service.log.Debug("error getting nodes from overlay", zap.Error(err)) return nil, storj.PiecePrivateKey{}, nil, Error.Wrap(err) @@ -283,7 +283,7 @@ func (service *Service) CreateAuditOrderLimits(ctx context.Context, segment meta return nil, storj.PiecePrivateKey{}, nil, Error.Wrap(err) } - cachedIPsAndPorts = make(map[storj.NodeID]string) + cachedNodesInfo = make(map[storj.NodeID]overlay.NodeReputation) var nodeErrors errs.Group var limitsCount int16 limits := make([]*pb.AddressedOrderLimit, segment.Redundancy.TotalShares) @@ -298,9 +298,8 @@ func (service *Service) CreateAuditOrderLimits(ctx context.Context, segment meta } address := node.Address.Address - if node.LastIPPort != "" { - cachedIPsAndPorts[piece.StorageNode] = node.LastIPPort - } + cachedNodesInfo[piece.StorageNode] = *node + limit, err := signer.Sign(ctx, storj.NodeURL{ ID: piece.StorageNode, Address: address, @@ -318,31 +317,40 @@ func (service *Service) CreateAuditOrderLimits(ctx context.Context, segment meta return nil, storj.PiecePrivateKey{}, nil, errs.Combine(err, nodeErrors.Err()) } - return limits, signer.PrivateKey, cachedIPsAndPorts, nil + return limits, signer.PrivateKey, cachedNodesInfo, nil } // CreateAuditOrderLimit creates an order limit for auditing a single the piece from 
a segment. -func (service *Service) CreateAuditOrderLimit(ctx context.Context, nodeID storj.NodeID, pieceNum uint16, rootPieceID storj.PieceID, shareSize int32) (limit *pb.AddressedOrderLimit, _ storj.PiecePrivateKey, cachedIPAndPort string, err error) { +func (service *Service) CreateAuditOrderLimit(ctx context.Context, nodeID storj.NodeID, pieceNum uint16, rootPieceID storj.PieceID, shareSize int32) (limit *pb.AddressedOrderLimit, _ storj.PiecePrivateKey, nodeInfo *overlay.NodeReputation, err error) { // TODO reduce number of params ? defer mon.Task()(&ctx)(&err) node, err := service.overlay.Get(ctx, nodeID) if err != nil { - return nil, storj.PiecePrivateKey{}, "", Error.Wrap(err) + return nil, storj.PiecePrivateKey{}, nil, Error.Wrap(err) } + + nodeInfo = &overlay.NodeReputation{ + ID: nodeID, + Address: node.Address, + LastNet: node.LastNet, + LastIPPort: node.LastIPPort, + Reputation: node.Reputation.Status, + } + if node.Disqualified != nil { - return nil, storj.PiecePrivateKey{}, "", overlay.ErrNodeDisqualified.New("%v", nodeID) + return nil, storj.PiecePrivateKey{}, nodeInfo, overlay.ErrNodeDisqualified.New("%v", nodeID) } if node.ExitStatus.ExitFinishedAt != nil { - return nil, storj.PiecePrivateKey{}, "", overlay.ErrNodeFinishedGE.New("%v", nodeID) + return nil, storj.PiecePrivateKey{}, nodeInfo, overlay.ErrNodeFinishedGE.New("%v", nodeID) } if !service.overlay.IsOnline(node) { - return nil, storj.PiecePrivateKey{}, "", overlay.ErrNodeOffline.New("%v", nodeID) + return nil, storj.PiecePrivateKey{}, nodeInfo, overlay.ErrNodeOffline.New("%v", nodeID) } signer, err := NewSignerAudit(service, rootPieceID, time.Now(), int64(shareSize), metabase.BucketLocation{}) if err != nil { - return nil, storj.PiecePrivateKey{}, "", Error.Wrap(err) + return nil, storj.PiecePrivateKey{}, nodeInfo, Error.Wrap(err) } orderLimit, err := signer.Sign(ctx, storj.NodeURL{ @@ -350,10 +358,10 @@ func (service *Service) CreateAuditOrderLimit(ctx context.Context, nodeID storj. Address: node.Address.Address, }, int32(pieceNum)) if err != nil { - return nil, storj.PiecePrivateKey{}, "", Error.Wrap(err) + return nil, storj.PiecePrivateKey{}, nodeInfo, Error.Wrap(err) } - return orderLimit, signer.PrivateKey, node.LastIPPort, nil + return orderLimit, signer.PrivateKey, nodeInfo, nil } // CreateGetRepairOrderLimits creates the order limits for downloading the @@ -361,7 +369,7 @@ func (service *Service) CreateAuditOrderLimit(ctx context.Context, nodeID storj. // // The length of the returned orders slice is the total number of pieces of the // segment, setting to null the ones which don't correspond to a healthy piece. 
-func (service *Service) CreateGetRepairOrderLimits(ctx context.Context, bucket metabase.BucketLocation, segment metabase.Segment, healthy metabase.Pieces) (_ []*pb.AddressedOrderLimit, _ storj.PiecePrivateKey, cachedIPsAndPorts map[storj.NodeID]string, err error) {
+func (service *Service) CreateGetRepairOrderLimits(ctx context.Context, bucket metabase.BucketLocation, segment metabase.Segment, healthy metabase.Pieces) (_ []*pb.AddressedOrderLimit, _ storj.PiecePrivateKey, cachedNodesInfo map[storj.NodeID]overlay.NodeReputation, err error) {
 	defer mon.Task()(&ctx)(&err)
 
 	redundancy, err := eestream.NewRedundancyStrategyFromStorj(segment.Redundancy)
@@ -377,7 +385,7 @@ func (service *Service) CreateGetRepairOrderLimits(ctx context.Context, bucket m
 		nodeIDs[i] = piece.StorageNode
 	}
 
-	nodes, err := service.overlay.GetOnlineNodesForGetDelete(ctx, nodeIDs)
+	nodes, err := service.overlay.GetOnlineNodesForAuditRepair(ctx, nodeIDs)
 	if err != nil {
 		service.log.Debug("error getting nodes from overlay", zap.Error(err))
 		return nil, storj.PiecePrivateKey{}, nil, Error.Wrap(err)
@@ -388,7 +396,7 @@ func (service *Service) CreateGetRepairOrderLimits(ctx context.Context, bucket m
 		return nil, storj.PiecePrivateKey{}, nil, Error.Wrap(err)
 	}
 
-	cachedIPsAndPorts = make(map[storj.NodeID]string)
+	cachedNodesInfo = make(map[storj.NodeID]overlay.NodeReputation, len(healthy))
 	var nodeErrors errs.Group
 	var limitsCount int
 	limits := make([]*pb.AddressedOrderLimit, totalPieces)
@@ -399,9 +407,7 @@ func (service *Service) CreateGetRepairOrderLimits(ctx context.Context, bucket m
 			continue
 		}
 
-		if node.LastIPPort != "" {
-			cachedIPsAndPorts[piece.StorageNode] = node.LastIPPort
-		}
+		cachedNodesInfo[piece.StorageNode] = *node
 
 		limit, err := signer.Sign(ctx, storj.NodeURL{
 			ID: piece.StorageNode,
@@ -424,7 +430,7 @@ func (service *Service) CreateGetRepairOrderLimits(ctx context.Context, bucket m
 		return nil, storj.PiecePrivateKey{}, nil, Error.Wrap(err)
 	}
 
-	return limits, signer.PrivateKey, cachedIPsAndPorts, nil
+	return limits, signer.PrivateKey, cachedNodesInfo, nil
 }
 
 // CreatePutRepairOrderLimits creates the order limits for uploading the repaired pieces of segment to newNodes.
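Note (editorial, not part of the patch): the audit and repair order-limit calls above now return map[storj.NodeID]overlay.NodeReputation in place of the old map[storj.NodeID]string of cached IP:port values. Callers that need only the reputation side project it down to the map[storj.NodeID]overlay.ReputationStatus that audit.Report carries. A minimal sketch of that projection, mirroring the loops the patch adds inline in verifier.Verify and SegmentRepairer.Repair (the helper name reputationStatuses is hypothetical):

// reputationStatuses projects order-limit node info down to the reputation
// map that audit.Report.NodesReputation expects. Sketch only; in the patch
// this loop is written inline at each call site rather than as a helper.
func reputationStatuses(nodesInfo map[storj.NodeID]overlay.NodeReputation) map[storj.NodeID]overlay.ReputationStatus {
	statuses := make(map[storj.NodeID]overlay.ReputationStatus, len(nodesInfo))
	for id, info := range nodesInfo {
		statuses[id] = info.Reputation
	}
	return statuses
}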
diff --git a/satellite/overlay/db_test.go b/satellite/overlay/db_test.go
index 1c54ea7c8..2933f9790 100644
--- a/satellite/overlay/db_test.go
+++ b/satellite/overlay/db_test.go
@@ -89,12 +89,12 @@ func TestDQNodesLastSeenBefore(t *testing.T) {
 		n1Info, err = cache.Get(ctx, node1.ID())
 		require.NoError(t, err)
 		require.NotNil(t, n1Info.Disqualified)
-		require.Equal(t, n1DQTime, n1Info.Reputation.Disqualified)
+		require.Equal(t, n1DQTime, n1Info.Reputation.Status.Disqualified)
 
 		n2Info, err = cache.Get(ctx, node2.ID())
 		require.NoError(t, err)
 		require.NotNil(t, n2Info.Disqualified)
-		require.Equal(t, n2DQTime, n2Info.Reputation.Disqualified)
+		require.Equal(t, n2DQTime, n2Info.Reputation.Status.Disqualified)
 	})
 }
 
@@ -128,14 +128,14 @@ func TestDQStrayNodesFailedPingback(t *testing.T) {
 		d, err := oc.Get(ctx, testID)
 		require.NoError(t, err)
 		require.Equal(t, time.Time{}, d.Reputation.LastContactSuccess.UTC())
-		require.Nil(t, d.Reputation.Disqualified)
+		require.Nil(t, d.Reputation.Status.Disqualified)
 
 		sat.Overlay.DQStrayNodes.Loop.TriggerWait()
 
 		d, err = oc.Get(ctx, testID)
 		require.NoError(t, err)
 		require.Equal(t, time.Time{}, d.Reputation.LastContactSuccess.UTC())
-		require.Nil(t, d.Reputation.Disqualified)
+		require.Nil(t, d.Reputation.Status.Disqualified)
 	})
 }
diff --git a/satellite/overlay/selection_test.go b/satellite/overlay/selection_test.go
index bc899bc22..3e95003d1 100644
--- a/satellite/overlay/selection_test.go
+++ b/satellite/overlay/selection_test.go
@@ -191,7 +191,7 @@ func TestEnsureMinimumRequested(t *testing.T) {
 		for i := 0; i < 5; i++ {
 			node := planet.StorageNodes[i]
 			reputable[node.ID()] = true
-			err := repService.ApplyAudit(ctx, node.ID(), reputation.AuditSuccess)
+			err := repService.ApplyAudit(ctx, node.ID(), overlay.ReputationStatus{}, reputation.AuditSuccess)
 			require.NoError(t, err)
 		}
 
@@ -229,7 +229,7 @@ func TestEnsureMinimumRequested(t *testing.T) {
 		for i := 5; i < 10; i++ {
 			node := planet.StorageNodes[i]
 			reputable[node.ID()] = true
-			err := repService.ApplyAudit(ctx, node.ID(), reputation.AuditSuccess)
+			err := repService.ApplyAudit(ctx, node.ID(), overlay.ReputationStatus{}, reputation.AuditSuccess)
 			require.NoError(t, err)
 		}
 
@@ -397,7 +397,7 @@ func TestNodeSelectionGracefulExit(t *testing.T) {
 		// nodes at indices 0, 2, 4, 6, 8 are gracefully exiting
 		for i, node := range planet.StorageNodes {
 			for k := 0; k < i; k++ {
-				err := satellite.Reputation.Service.ApplyAudit(ctx, node.ID(), reputation.AuditSuccess)
+				err := satellite.Reputation.Service.ApplyAudit(ctx, node.ID(), overlay.ReputationStatus{}, reputation.AuditSuccess)
 				require.NoError(t, err)
 			}
 
@@ -630,7 +630,7 @@ func TestDistinctIPs(t *testing.T) {
 		satellite := planet.Satellites[0]
 		// Vets nodes[8] and nodes[9].
 		for i := 9; i > 7; i-- {
-			err := satellite.Reputation.Service.ApplyAudit(ctx, planet.StorageNodes[i].ID(), reputation.AuditSuccess)
+			err := satellite.Reputation.Service.ApplyAudit(ctx, planet.StorageNodes[i].ID(), overlay.ReputationStatus{}, reputation.AuditSuccess)
 			assert.NoError(t, err)
 		}
 		testDistinctIPs(t, ctx, planet)
@@ -657,7 +657,7 @@ func TestDistinctIPsWithBatch(t *testing.T) {
 		satellite := planet.Satellites[0]
 		// Vets nodes[8] and nodes[9].
 		for i := 9; i > 7; i-- {
-			err := satellite.Reputation.Service.ApplyAudit(ctx, planet.StorageNodes[i].ID(), reputation.AuditSuccess)
+			err := satellite.Reputation.Service.ApplyAudit(ctx, planet.StorageNodes[i].ID(), overlay.ReputationStatus{}, reputation.AuditSuccess)
 			assert.NoError(t, err)
 		}
 		testDistinctIPs(t, ctx, planet)
diff --git a/satellite/overlay/service.go b/satellite/overlay/service.go
index db2c22e32..63efe39ef 100644
--- a/satellite/overlay/service.go
+++ b/satellite/overlay/service.go
@@ -43,6 +43,10 @@ var ErrNotEnoughNodes = errs.Class("not enough nodes")
 type DB interface {
 	// GetOnlineNodesForGetDelete returns a map of nodes for the supplied nodeIDs
 	GetOnlineNodesForGetDelete(ctx context.Context, nodeIDs []storj.NodeID, onlineWindow time.Duration) (map[storj.NodeID]*SelectedNode, error)
+	// GetOnlineNodesForAuditRepair returns a map of nodes for the supplied nodeIDs.
+	// The return value contains necessary information to create orders as well as nodes'
+	// current reputation status.
+	GetOnlineNodesForAuditRepair(ctx context.Context, nodeIDs []storj.NodeID, onlineWindow time.Duration) (map[storj.NodeID]*NodeReputation, error)
 	// SelectStorageNodes looks up nodes based on criteria
 	SelectStorageNodes(ctx context.Context, totalNeededNodes, newNodeCount int, criteria *NodeCriteria) ([]*SelectedNode, error)
 	// SelectAllStorageNodesUpload returns all nodes that qualify to store data, organized as reputable nodes and new nodes
@@ -243,14 +247,11 @@ type NodeDossier struct {
 
 // NodeStats contains statistics about a node.
 type NodeStats struct {
-	Latency90             int64
-	LastContactSuccess    time.Time
-	LastContactFailure    time.Time
-	VettedAt              *time.Time
-	Disqualified          *time.Time
-	UnknownAuditSuspended *time.Time
-	OfflineUnderReview    *time.Time
-	OfflineSuspended      *time.Time
+	Latency90          int64
+	LastContactSuccess time.Time
+	LastContactFailure time.Time
+	OfflineUnderReview *time.Time
+	Status             ReputationStatus
 }
 
 // NodeLastContact contains the ID, address, and timestamp.
@@ -270,6 +271,15 @@ type SelectedNode struct {
 	CountryCode location.CountryCode
 }
 
+// NodeReputation is used as a result for creating order limits for audits.
+type NodeReputation struct {
+	ID         storj.NodeID
+	Address    *pb.NodeAddress
+	LastNet    string
+	LastIPPort string
+	Reputation ReputationStatus
+}
+
 // Clone returns a deep clone of the selected node.
 func (node *SelectedNode) Clone() *SelectedNode {
 	return &SelectedNode{
@@ -351,6 +361,13 @@ func (service *Service) GetOnlineNodesForGetDelete(ctx context.Context, nodeIDs
 	return service.db.GetOnlineNodesForGetDelete(ctx, nodeIDs, service.config.Node.OnlineWindow)
 }
 
+// GetOnlineNodesForAuditRepair returns a map of nodes for the supplied nodeIDs.
+func (service *Service) GetOnlineNodesForAuditRepair(ctx context.Context, nodeIDs []storj.NodeID) (_ map[storj.NodeID]*NodeReputation, err error) {
+	defer mon.Task()(&ctx)(&err)
+
+	return service.db.GetOnlineNodesForAuditRepair(ctx, nodeIDs, service.config.Node.OnlineWindow)
+}
+
 // GetNodeIPs returns a map of node ip:port for the supplied nodeIDs.
 func (service *Service) GetNodeIPs(ctx context.Context, nodeIDs []storj.NodeID) (_ map[storj.NodeID]string, err error) {
 	defer mon.Task()(&ctx)(&err)
diff --git a/satellite/overlay/service_test.go b/satellite/overlay/service_test.go
index dfd4ca1c9..66d5e6067 100644
--- a/satellite/overlay/service_test.go
+++ b/satellite/overlay/service_test.go
@@ -680,7 +680,7 @@ func TestUpdateReputation(t *testing.T) {
 	require.Nil(t, info.Disqualified)
 	require.Nil(t, info.UnknownAuditSuspended)
 	require.Nil(t, info.OfflineSuspended)
-	require.Nil(t, info.Reputation.VettedAt)
+	require.Nil(t, info.Reputation.Status.VettedAt)
 
 	t0 := time.Now().Truncate(time.Hour)
 	t1 := t0.Add(time.Hour)
@@ -701,7 +701,7 @@ func TestUpdateReputation(t *testing.T) {
 	require.Equal(t, reputationChange.Disqualified, info.Disqualified)
 	require.Equal(t, reputationChange.UnknownAuditSuspended, info.UnknownAuditSuspended)
 	require.Equal(t, reputationChange.OfflineSuspended, info.OfflineSuspended)
-	require.Equal(t, reputationChange.VettedAt, info.Reputation.VettedAt)
+	require.Equal(t, reputationChange.VettedAt, info.Reputation.Status.VettedAt)
 
 	reputationChange.Disqualified = &t0
 
@@ -766,7 +766,7 @@ func TestVetAndUnvetNode(t *testing.T) {
 	require.NoError(t, err)
 	dossier, err := service.Get(ctx, node.ID())
 	require.NoError(t, err)
-	require.Nil(t, dossier.Reputation.VettedAt)
+	require.Nil(t, dossier.Reputation.Status.VettedAt)
 
 	// vet again
 	vettedTime, err := service.TestVetNode(ctx, node.ID())
@@ -774,13 +774,13 @@ func TestVetAndUnvetNode(t *testing.T) {
 	require.NotNil(t, vettedTime)
 	dossier, err = service.Get(ctx, node.ID())
 	require.NoError(t, err)
-	require.NotNil(t, dossier.Reputation.VettedAt)
+	require.NotNil(t, dossier.Reputation.Status.VettedAt)
 
 	// unvet again
 	err = service.TestUnvetNode(ctx, node.ID())
 	require.NoError(t, err)
 	dossier, err = service.Get(ctx, node.ID())
 	require.NoError(t, err)
-	require.Nil(t, dossier.Reputation.VettedAt)
+	require.Nil(t, dossier.Reputation.Status.VettedAt)
 	})
 }
diff --git a/satellite/repair/repair_test.go b/satellite/repair/repair_test.go
index 2df40dcaf..4148dc0e2 100644
--- a/satellite/repair/repair_test.go
+++ b/satellite/repair/repair_test.go
@@ -2948,15 +2948,16 @@ func TestECRepairerGetDoesNameLookupIfNecessary(t *testing.T) {
 		require.NoError(t, err)
 		require.True(t, len(segment.Pieces) > 1)
 
-		limits, privateKey, _, err := testSatellite.Orders.Service.CreateGetRepairOrderLimits(ctx, metabase.BucketLocation{}, segment, segment.Pieces)
+		limits, privateKey, cachedNodesInfo, err := testSatellite.Orders.Service.CreateGetRepairOrderLimits(ctx, metabase.BucketLocation{}, segment, segment.Pieces)
 		require.NoError(t, err)
 
-		cachedIPsAndPorts := make(map[storj.NodeID]string)
 		for i, l := range limits {
 			if l == nil {
 				continue
 			}
-			cachedIPsAndPorts[l.Limit.StorageNodeId] = fmt.Sprintf("garbageXXX#:%d", i)
+			info := cachedNodesInfo[l.Limit.StorageNodeId]
+			info.LastIPPort = fmt.Sprintf("garbageXXX#:%d", i)
+			cachedNodesInfo[l.Limit.StorageNodeId] = info
 		}
 
 		mock := &mockConnector{}
@@ -2965,7 +2966,7 @@ func TestECRepairerGetDoesNameLookupIfNecessary(t *testing.T) {
 		redundancy, err := eestream.NewRedundancyStrategyFromStorj(segment.Redundancy)
 		require.NoError(t, err)
 
-		readCloser, pieces, err := ec.Get(ctx, limits, cachedIPsAndPorts, privateKey, redundancy, int64(segment.EncryptedSize))
+		readCloser, pieces, err := ec.Get(ctx, limits, cachedNodesInfo, privateKey, redundancy, int64(segment.EncryptedSize))
 		require.NoError(t, err)
 		require.Len(t, pieces.Failed, 0)
 		require.NotNil(t, readCloser)
@@ -2973,9 +2974,9 @@ func TestECRepairerGetDoesNameLookupIfNecessary(t *testing.T) {
 		// repair will only download minimum required
 		minReq := redundancy.RequiredCount()
 		var numDialed int
-		for _, ip := range cachedIPsAndPorts {
+		for _, info := range cachedNodesInfo {
 			for _, dialed := range mock.addressesDialed {
-				if dialed == ip {
+				if dialed == info.LastIPPort {
 					numDialed++
 					if numDialed == minReq {
 						break
@@ -3019,13 +3020,12 @@ func TestECRepairerGetPrefersCachedIPPort(t *testing.T) {
 		require.NoError(t, err)
 		require.True(t, len(segment.Pieces) > 1)
 
-		limits, privateKey, _, err := testSatellite.Orders.Service.CreateGetRepairOrderLimits(ctx, metabase.BucketLocation{}, segment, segment.Pieces)
+		limits, privateKey, cachedNodesInfo, err := testSatellite.Orders.Service.CreateGetRepairOrderLimits(ctx, metabase.BucketLocation{}, segment, segment.Pieces)
 		require.NoError(t, err)
 
 		// make it so that when the cached IP is dialed, we dial the "right" address,
 		// but when the "right" address is dialed (meaning it came from the OrderLimit,
 		// we dial something else!
-		cachedIPsAndPorts := make(map[storj.NodeID]string)
 		mock := &mockConnector{
 			dialInstead: make(map[string]string),
 		}
@@ -3034,11 +3034,13 @@ func TestECRepairerGetPrefersCachedIPPort(t *testing.T) {
 			if l == nil {
 				continue
 			}
-			ip := fmt.Sprintf("garbageXXX#:%d", i)
-			cachedIPsAndPorts[l.Limit.StorageNodeId] = ip
+
+			info := cachedNodesInfo[l.Limit.StorageNodeId]
+			info.LastIPPort = fmt.Sprintf("garbageXXX#:%d", i)
+			cachedNodesInfo[l.Limit.StorageNodeId] = info
 
 			address := l.StorageNodeAddress.Address
-			mock.dialInstead[ip] = address
+			mock.dialInstead[info.LastIPPort] = address
 			mock.dialInstead[address] = "utter.failure?!*"
 
 			realAddresses = append(realAddresses, address)
@@ -3049,16 +3051,16 @@ func TestECRepairerGetPrefersCachedIPPort(t *testing.T) {
 		redundancy, err := eestream.NewRedundancyStrategyFromStorj(segment.Redundancy)
 		require.NoError(t, err)
 
-		readCloser, pieces, err := ec.Get(ctx, limits, cachedIPsAndPorts, privateKey, redundancy, int64(segment.EncryptedSize))
+		readCloser, pieces, err := ec.Get(ctx, limits, cachedNodesInfo, privateKey, redundancy, int64(segment.EncryptedSize))
 		require.NoError(t, err)
 		require.Len(t, pieces.Failed, 0)
 		require.NotNil(t, readCloser)
 
 		// repair will only download minimum required.
 		minReq := redundancy.RequiredCount()
 		var numDialed int
-		for _, ip := range cachedIPsAndPorts {
+		for _, info := range cachedNodesInfo {
 			for _, dialed := range mock.addressesDialed {
-				if dialed == ip {
+				if dialed == info.LastIPPort {
 					numDialed++
 					if numDialed == minReq {
 						break
diff --git a/satellite/repair/repairer/ec.go b/satellite/repair/repairer/ec.go
index c5a6d0705..5a7e99ce0 100644
--- a/satellite/repair/repairer/ec.go
+++ b/satellite/repair/repairer/ec.go
@@ -29,6 +29,7 @@ import (
 	"storj.io/common/sync2"
 	"storj.io/storj/satellite/audit"
 	"storj.io/storj/satellite/metabase"
+	"storj.io/storj/satellite/overlay"
 	"storj.io/uplink/private/eestream"
 	"storj.io/uplink/private/piecestore"
 )
@@ -65,7 +66,7 @@ func (ec *ECRepairer) dialPiecestore(ctx context.Context, n storj.NodeURL) (*pie
 // After downloading a piece, the ECRepairer will verify the hash and original order limit for that piece.
 // If verification fails, another piece will be downloaded until we reach the minimum required or run out of order limits.
 // If piece hash verification fails, it will return all failed node IDs.
-func (ec *ECRepairer) Get(ctx context.Context, limits []*pb.AddressedOrderLimit, cachedIPsAndPorts map[storj.NodeID]string, privateKey storj.PiecePrivateKey, es eestream.ErasureScheme, dataSize int64) (_ io.ReadCloser, _ audit.Pieces, err error) {
+func (ec *ECRepairer) Get(ctx context.Context, limits []*pb.AddressedOrderLimit, cachedNodesInfo map[storj.NodeID]overlay.NodeReputation, privateKey storj.PiecePrivateKey, es eestream.ErasureScheme, dataSize int64) (_ io.ReadCloser, _ audit.Pieces, err error) {
 	defer mon.Task()(&ctx)(&err)
 
 	if len(limits) != es.TotalCount() {
@@ -123,11 +124,11 @@ func (ec *ECRepairer) Get(ctx context.Context, limits []*pb.AddressedOrderLimit,
 			inProgress++
 			cond.L.Unlock()
 
-			lastIPPort := cachedIPsAndPorts[limit.GetLimit().StorageNodeId]
+			info := cachedNodesInfo[limit.GetLimit().StorageNodeId]
 			address := limit.GetStorageNodeAddress().GetAddress()
 			var triedLastIPPort bool
-			if lastIPPort != "" && lastIPPort != address {
-				address = lastIPPort
+			if info.LastIPPort != "" && info.LastIPPort != address {
+				address = info.LastIPPort
 				triedLastIPPort = true
 			}
diff --git a/satellite/repair/repairer/segments.go b/satellite/repair/repairer/segments.go
index dc3a9b53e..646cf4d62 100644
--- a/satellite/repair/repairer/segments.go
+++ b/satellite/repair/repairer/segments.go
@@ -232,7 +232,7 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
 	}
 
 	// Create the order limits for the GET_REPAIR action
-	getOrderLimits, getPrivateKey, cachedIPsAndPorts, err := repairer.orders.CreateGetRepairOrderLimits(ctx, metabase.BucketLocation{}, segment, healthyPieces)
+	getOrderLimits, getPrivateKey, cachedNodesInfo, err := repairer.orders.CreateGetRepairOrderLimits(ctx, metabase.BucketLocation{}, segment, healthyPieces)
 	if err != nil {
 		if orders.ErrDownloadFailedNotEnoughPieces.Has(err) {
 			mon.Counter("repairer_segments_below_min_req").Inc(1) //mon:locked
@@ -286,7 +286,7 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
 	}
 
 	// Download the segment using just the healthy pieces
-	segmentReader, piecesReport, err := repairer.ec.Get(ctx, getOrderLimits, cachedIPsAndPorts, getPrivateKey, redundancy, int64(segment.EncryptedSize))
+	segmentReader, piecesReport, err := repairer.ec.Get(ctx, getOrderLimits, cachedNodesInfo, getPrivateKey, redundancy, int64(segment.EncryptedSize))
 
 	// ensure we get values, even if only zero values, so that redash can have an alert based on this
 	mon.Meter("repair_too_many_nodes_failed").Mark(0) //mon:locked
@@ -346,7 +346,15 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
 	defer func() { err = errs.Combine(err, segmentReader.Close()) }()
 
 	// only report audit result when segment can be successfully downloaded
-	var report audit.Report
+	cachedNodesReputation := make(map[storj.NodeID]overlay.ReputationStatus, len(cachedNodesInfo))
+	for id, info := range cachedNodesInfo {
+		cachedNodesReputation[id] = info.Reputation
+	}
+
+	report := audit.Report{
+		NodesReputation: cachedNodesReputation,
+	}
+
 	for _, piece := range piecesReport.Successful {
 		report.Successes = append(report.Successes, piece.StorageNode)
 	}
diff --git a/satellite/reputation/benchmark_test.go b/satellite/reputation/benchmark_test.go
index 93cf7ca3b..88598f7d2 100644
--- a/satellite/reputation/benchmark_test.go
+++ b/satellite/reputation/benchmark_test.go
@@ -36,7 +36,7 @@ func BenchmarkReputation(b *testing.B) {
 	b.Run("UpdateStatsSuccess", func(b *testing.B) {
 		for i := 0; i < b.N; i++ {
 			id := all[i%len(all)]
-			_, _, err := reputationdb.Update(ctx, reputation.UpdateRequest{
+			_, err := reputationdb.Update(ctx, reputation.UpdateRequest{
 				NodeID:       id,
 				AuditOutcome: reputation.AuditSuccess,
 				AuditHistory: testAuditHistoryConfig(),
@@ -48,7 +48,7 @@ func BenchmarkReputation(b *testing.B) {
 	b.Run("UpdateStatsFailure", func(b *testing.B) {
 		for i := 0; i < b.N; i++ {
 			id := all[i%len(all)]
-			_, _, err := reputationdb.Update(ctx, reputation.UpdateRequest{
+			_, err := reputationdb.Update(ctx, reputation.UpdateRequest{
 				NodeID:       id,
 				AuditOutcome: reputation.AuditFailure,
 				AuditHistory: testAuditHistoryConfig(),
@@ -60,7 +60,7 @@ func BenchmarkReputation(b *testing.B) {
 	b.Run("UpdateStatsUnknown", func(b *testing.B) {
 		for i := 0; i < b.N; i++ {
 			id := all[i%len(all)]
-			_, _, err := reputationdb.Update(ctx, reputation.UpdateRequest{
+			_, err := reputationdb.Update(ctx, reputation.UpdateRequest{
 				NodeID:       id,
 				AuditOutcome: reputation.AuditUnknown,
 				AuditHistory: testAuditHistoryConfig(),
@@ -72,7 +72,7 @@ func BenchmarkReputation(b *testing.B) {
 	b.Run("UpdateStatsOffline", func(b *testing.B) {
 		for i := 0; i < b.N; i++ {
 			id := all[i%len(all)]
-			_, _, err := reputationdb.Update(ctx, reputation.UpdateRequest{
+			_, err := reputationdb.Update(ctx, reputation.UpdateRequest{
 				NodeID:       id,
 				AuditOutcome: reputation.AuditOffline,
 				AuditHistory: testAuditHistoryConfig(),
diff --git a/satellite/reputation/db_test.go b/satellite/reputation/db_test.go
index 248dfa8c9..33422bc22 100644
--- a/satellite/reputation/db_test.go
+++ b/satellite/reputation/db_test.go
@@ -38,27 +38,24 @@ func TestUpdate(t *testing.T) {
 			AuditsRequiredForVetting: planet.Satellites[0].Config.Reputation.AuditCount,
 			AuditHistory:             testAuditHistoryConfig(),
 		}
 
-		nodeStats, changed, err := db.Update(ctx, updateReq, time.Now())
+		nodeStats, err := db.Update(ctx, updateReq, time.Now())
 		require.NoError(t, err)
 		assert.Nil(t, nodeStats.VettedAt)
-		require.False(t, changed)
 
 		// 2 audits -> vetted
 		updateReq.NodeID = node.ID()
 		updateReq.AuditOutcome = reputation.AuditOffline
-		nodeStats, changed, err = db.Update(ctx, updateReq, time.Now())
+		nodeStats, err = db.Update(ctx, updateReq, time.Now())
 		require.NoError(t, err)
 		assert.NotNil(t, nodeStats.VettedAt)
-		require.True(t, changed)
 
 		// Don't overwrite node's vetted_at timestamp
 		updateReq.NodeID = node.ID()
 		updateReq.AuditOutcome = reputation.AuditSuccess
-		nodeStats2, changed, err := db.Update(ctx, updateReq, time.Now())
+		nodeStats2, err := db.Update(ctx, updateReq, time.Now())
 		require.NoError(t, err)
 		assert.NotNil(t, nodeStats2.VettedAt)
 		assert.Equal(t, nodeStats.VettedAt, nodeStats2.VettedAt)
-		require.False(t, changed)
 	})
 }
diff --git a/satellite/reputation/service.go b/satellite/reputation/service.go
index 19727336d..27e8e2523 100644
--- a/satellite/reputation/service.go
+++ b/satellite/reputation/service.go
@@ -15,8 +15,7 @@ import (
 
 // DB is an interface for storing reputation data.
 type DB interface {
-	Update(ctx context.Context, request UpdateRequest, now time.Time) (_ *overlay.ReputationStatus, changed bool, err error)
-	SetNodeStatus(ctx context.Context, id storj.NodeID, status overlay.ReputationStatus) error
+	Update(ctx context.Context, request UpdateRequest, now time.Time) (_ *overlay.ReputationStatus, err error)
 	Get(ctx context.Context, nodeID storj.NodeID) (*Info, error)
 
 	// UnsuspendNodeUnknownAudit unsuspends a storage node for unknown audits.
@@ -67,10 +66,11 @@ func NewService(log *zap.Logger, overlay overlay.DB, db DB, config Config) *Serv
 }
 
 // ApplyAudit receives an audit result and applies it to the relevant node in DB.
-func (service *Service) ApplyAudit(ctx context.Context, nodeID storj.NodeID, result AuditType) (err error) {
+func (service *Service) ApplyAudit(ctx context.Context, nodeID storj.NodeID, reputation overlay.ReputationStatus, result AuditType) (err error) {
 	defer mon.Task()(&ctx)(&err)
 
-	statusUpdate, changed, err := service.db.Update(ctx, UpdateRequest{
+	now := time.Now()
+	statusUpdate, err := service.db.Update(ctx, UpdateRequest{
 		NodeID:       nodeID,
 		AuditOutcome: result,
@@ -81,12 +81,18 @@ func (service *Service) ApplyAudit(ctx context.Context, nodeID storj.NodeID, res
 		SuspensionDQEnabled:      service.config.SuspensionDQEnabled,
 		AuditsRequiredForVetting: service.config.AuditCount,
 		AuditHistory:             service.config.AuditHistory,
-	}, time.Now())
+	}, now)
 	if err != nil {
 		return err
 	}
 
-	if changed {
+	// only update node if its health status has changed, or it's a newly vetted
+	// node.
+	// this prevents the need to require caller of ApplyAudit() to always know
+	// the VettedAt time for a node.
+	// Due to inconsistencies in the precision of time.Now() on different platforms and databases, the time comparison
+	// for the VettedAt status is done using time values that are truncated to second precision.
+	if hasReputationChanged(*statusUpdate, reputation, now) {
 		err = service.overlay.UpdateReputation(ctx, nodeID, statusUpdate)
 		if err != nil {
 			return err
@@ -153,3 +159,30 @@ func (service *Service) TestUnsuspendNodeUnknownAudit(ctx context.Context, nodeI
 
 // Close closes resources.
 func (service *Service) Close() error { return nil }
+
+// hasReputationChanged determines if the current node reputation is different from the newly updated reputation. This
+// function will only consider the Disqualified, UnknownAuditSuspended and OfflineSuspended statuses for changes.
+func hasReputationChanged(updated, current overlay.ReputationStatus, now time.Time) bool {
+	if statusChanged(current.Disqualified, updated.Disqualified) ||
+		statusChanged(current.UnknownAuditSuspended, updated.UnknownAuditSuspended) ||
+		statusChanged(current.OfflineSuspended, updated.OfflineSuspended) {
+		return true
+	}
+	// check for newly vetted nodes.
+	// Due to inconsistencies in the precision of time.Now() on different platforms and databases, the time comparison
+	// for the VettedAt status is done using time values that are truncated to second precision.
+	if updated.VettedAt != nil && updated.VettedAt.Truncate(time.Second).Equal(now.Truncate(time.Second)) {
+		return true
+	}
+	return false
+}
+
+// statusChanged determines if the two given statuses are different.
+
+// statusChanged determines if the two given statuses are different.
+func statusChanged(s1, s2 *time.Time) bool {
+	if s1 == nil && s2 == nil {
+		return false
+	} else if s1 != nil && s2 != nil {
+		return !s1.Equal(*s2)
+	}
+	return true
+}
diff --git a/satellite/reputation/service_test.go b/satellite/reputation/service_test.go
index ea287c367..f551eb0f7 100644
--- a/satellite/reputation/service_test.go
+++ b/satellite/reputation/service_test.go
@@ -15,6 +15,7 @@ import (
 	"storj.io/common/testrand"
 	"storj.io/storj/private/testplanet"
 	"storj.io/storj/satellite"
+	"storj.io/storj/satellite/overlay"
 	"storj.io/storj/satellite/reputation"
 )

@@ -30,7 +31,7 @@ func TestConcurrentAudit(t *testing.T) {
 		n := 5
 		for i := 0; i < n; i++ {
 			group.Go(func() error {
-				err := planet.Satellites[0].Reputation.Service.ApplyAudit(ctx, planet.StorageNodes[0].ID(), reputation.AuditSuccess)
+				err := planet.Satellites[0].Reputation.Service.ApplyAudit(ctx, planet.StorageNodes[0].ID(), overlay.ReputationStatus{}, reputation.AuditSuccess)
 				return err
 			})
 		}
@@ -63,19 +64,37 @@ func TestApplyAudit(t *testing.T) {
 		require.NoError(t, err)
 		require.Zero(t, node.TotalAuditCount)

-		err = service.ApplyAudit(ctx, nodeID, reputation.AuditSuccess)
+		status := overlay.ReputationStatus{
+			Disqualified:          node.Disqualified,
+			UnknownAuditSuspended: node.UnknownAuditSuspended,
+			OfflineSuspended:      node.OfflineSuspended,
+			VettedAt:              node.VettedAt,
+		}
+		err = service.ApplyAudit(ctx, nodeID, status, reputation.AuditSuccess)
 		require.NoError(t, err)

 		node, err = service.Get(ctx, nodeID)
 		require.NoError(t, err)
 		auditAlpha := node.AuditReputationAlpha
 		auditBeta := node.AuditReputationBeta
+		status = overlay.ReputationStatus{
+			Disqualified:          node.Disqualified,
+			UnknownAuditSuspended: node.UnknownAuditSuspended,
+			OfflineSuspended:      node.OfflineSuspended,
+			VettedAt:              node.VettedAt,
+		}

-		err = service.ApplyAudit(ctx, nodeID, reputation.AuditSuccess)
+		err = service.ApplyAudit(ctx, nodeID, status, reputation.AuditSuccess)
 		require.NoError(t, err)

 		stats, err := service.Get(ctx, nodeID)
 		require.NoError(t, err)
+		status = overlay.ReputationStatus{
+			Disqualified:          stats.Disqualified,
+			UnknownAuditSuspended: stats.UnknownAuditSuspended,
+			OfflineSuspended:      stats.OfflineSuspended,
+			VettedAt:              stats.VettedAt,
+		}

 		expectedAuditAlpha := config.AuditLambda*auditAlpha + config.AuditWeight
 		expectedAuditBeta := config.AuditLambda * auditBeta
@@ -85,7 +104,7 @@ func TestApplyAudit(t *testing.T) {
 		auditAlpha = expectedAuditAlpha
 		auditBeta = expectedAuditBeta

-		err = service.ApplyAudit(ctx, nodeID, reputation.AuditFailure)
+		err = service.ApplyAudit(ctx, nodeID, status, reputation.AuditFailure)
 		require.NoError(t, err)

 		stats, err = service.Get(ctx, nodeID)
diff --git a/satellite/reputation/suspension_test.go b/satellite/reputation/suspension_test.go
index 4735119e1..e22152f98 100644
--- a/satellite/reputation/suspension_test.go
+++ b/satellite/reputation/suspension_test.go
@@ -16,6 +16,7 @@ import (
 	"storj.io/storj/private/testplanet"
 	"storj.io/storj/satellite"
 	"storj.io/storj/satellite/audit"
+	"storj.io/storj/satellite/overlay"
 	"storj.io/storj/satellite/reputation"
 )

@@ -71,7 +72,7 @@ func TestAuditSuspendWithUpdateStats(t *testing.T) {
 		testStartTime := time.Now()

 		// give node one unknown audit - bringing unknown audit rep to 0.5, and suspending node
-		err = repService.ApplyAudit(ctx, nodeID, reputation.AuditUnknown)
+		err = repService.ApplyAudit(ctx, nodeID, node.Reputation.Status, reputation.AuditUnknown)
 		require.NoError(t, err)

 		reputationInfo, err := repService.Get(ctx, nodeID)
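The snapshot-then-apply pattern these tests repeat is the whole calling convention in one place. Distilled (with `service` and a fetched `node` standing in for the test fixtures):

	status := overlay.ReputationStatus{
		Disqualified:          node.Disqualified,
		UnknownAuditSuspended: node.UnknownAuditSuspended,
		OfflineSuspended:      node.OfflineSuspended,
		VettedAt:              node.VettedAt,
	}
	// ApplyAudit diffs the post-update status against this snapshot and
	// only writes to the overlay when something material changed.
	if err := service.ApplyAudit(ctx, nodeID, status, reputation.AuditSuccess); err != nil {
		return err
	}

Passing overlay.ReputationStatus{} instead (as TestConcurrentAudit does) is safe but pessimistic: any non-nil status on the updated side then counts as a change and triggers an overlay write.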
@@ -91,7 +92,7 @@ func TestAuditSuspendWithUpdateStats(t *testing.T) {

 		// give node two successful audits - bringing unknown audit rep to 0.75, and unsuspending node
 		for i := 0; i < 2; i++ {
-			err = repService.ApplyAudit(ctx, nodeID, reputation.AuditSuccess)
+			err = repService.ApplyAudit(ctx, nodeID, node.Reputation.Status, reputation.AuditSuccess)
 			require.NoError(t, err)
 		}
 		node, err = oc.Get(ctx, nodeID)
@@ -116,7 +117,7 @@ func TestAuditSuspendFailedAudit(t *testing.T) {

 		// give node one failed audit - bringing audit rep to 0.5, and disqualifying node
 		// expect that suspended field and unknown audit reputation remain unchanged
-		err = repService.ApplyAudit(ctx, nodeID, reputation.AuditFailure)
+		err = repService.ApplyAudit(ctx, nodeID, node.Reputation.Status, reputation.AuditFailure)
 		require.NoError(t, err)

 		node, err = oc.Get(ctx, nodeID)
@@ -152,19 +153,27 @@ func TestAuditSuspendExceedGracePeriod(t *testing.T) {
 			require.NoError(t, err)
 		}

+		nodesStatus := make(map[storj.NodeID]overlay.ReputationStatus)
 		// no nodes should be disqualified
 		for _, node := range (storj.NodeIDList{successNodeID, failNodeID, offlineNodeID, unknownNodeID}) {
 			n, err := repService.Get(ctx, node)
 			require.NoError(t, err)
 			require.Nil(t, n.Disqualified)
+			nodesStatus[node] = overlay.ReputationStatus{
+				Disqualified:          n.Disqualified,
+				UnknownAuditSuspended: n.UnknownAuditSuspended,
+				OfflineSuspended:      n.OfflineSuspended,
+				VettedAt:              n.VettedAt,
+			}
 		}

 		// give one node a successful audit, one a failed audit, one an offline audit, and one an unknown audit
 		report := audit.Report{
-			Successes: storj.NodeIDList{successNodeID},
-			Fails:     storj.NodeIDList{failNodeID},
-			Offlines:  storj.NodeIDList{offlineNodeID},
-			Unknown:   storj.NodeIDList{unknownNodeID},
+			Successes:       storj.NodeIDList{successNodeID},
+			Fails:           storj.NodeIDList{failNodeID},
+			Offlines:        storj.NodeIDList{offlineNodeID},
+			Unknown:         storj.NodeIDList{unknownNodeID},
+			NodesReputation: nodesStatus,
 		}
 		auditService := planet.Satellites[0].Audit
 		_, err := auditService.Reporter.RecordAudits(ctx, report)
@@ -208,20 +217,29 @@ func TestAuditSuspendDQDisabled(t *testing.T) {
 			err := repService.TestSuspendNodeUnknownAudit(ctx, node, time.Now().Add(-2*time.Hour))
 			require.NoError(t, err)
 		}
+		nodesStatus := make(map[storj.NodeID]overlay.ReputationStatus)

 		// no nodes should be disqualified
 		for _, node := range (storj.NodeIDList{successNodeID, failNodeID, offlineNodeID, unknownNodeID}) {
 			n, err := oc.Get(ctx, node)
 			require.NoError(t, err)
 			require.Nil(t, n.Disqualified)
+			nodesStatus[node] = overlay.ReputationStatus{
+				Disqualified:          n.Disqualified,
+				UnknownAuditSuspended: n.UnknownAuditSuspended,
+				OfflineSuspended:      n.OfflineSuspended,
+				VettedAt:              n.Reputation.Status.VettedAt,
+			}
+
 		}

 		// give one node a successful audit, one a failed audit, one an offline audit, and one an unknown audit
 		report := audit.Report{
-			Successes: storj.NodeIDList{successNodeID},
-			Fails:     storj.NodeIDList{failNodeID},
-			Offlines:  storj.NodeIDList{offlineNodeID},
-			Unknown:   storj.NodeIDList{unknownNodeID},
+			Successes:       storj.NodeIDList{successNodeID},
+			Fails:           storj.NodeIDList{failNodeID},
+			Offlines:        storj.NodeIDList{offlineNodeID},
+			Unknown:         storj.NodeIDList{unknownNodeID},
+			NodesReputation: nodesStatus,
 		}
 		auditService := planet.Satellites[0].Audit
 		_, err := auditService.Reporter.RecordAudits(ctx, report)
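For a producer outside the tests, the NodesReputation map is what lets RecordAudits hand each node's snapshot to ApplyAudit without re-querying the overlay mid-run. A sketch of assembling a report (IDs and statuses are assumed values):

	nodesReputation := map[storj.NodeID]overlay.ReputationStatus{
		successID: {VettedAt: &vettedTime}, // assumed: already-vetted node
		failID:    {},                      // assumed: new node, nothing set yet
	}
	report := audit.Report{
		Successes:       storj.NodeIDList{successID},
		Fails:           storj.NodeIDList{failID},
		NodesReputation: nodesReputation,
	}
	_, err := reporter.RecordAudits(ctx, report)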
@@ -289,7 +307,7 @@ func TestOfflineAuditSuspensionDisabled(t *testing.T) {

 		// check that unsuspended node does not get suspended
 		for i := 0; i <= int(trackingPeriodLength/windowSize); i++ {
-			_, _, err = reputationdb.Update(ctx, req, currentWindow)
+			_, err = reputationdb.Update(ctx, req, currentWindow)
 			require.NoError(t, err)
 			currentWindow = currentWindow.Add(windowSize)
 		}
@@ -302,7 +320,7 @@ func TestOfflineAuditSuspensionDisabled(t *testing.T) {

 		// check that enabling flag suspends the node
 		req.AuditHistory.OfflineSuspensionEnabled = true
-		_, _, err = reputationdb.Update(ctx, req, currentWindow)
+		_, err = reputationdb.Update(ctx, req, currentWindow)
 		require.NoError(t, err)

 		reputationInfo, err = reputationdb.Get(ctx, nodeID)
@@ -313,7 +331,7 @@ func TestOfflineAuditSuspensionDisabled(t *testing.T) {

 		// check that disabling flag clears suspension and under review
 		req.AuditHistory.OfflineSuspensionEnabled = false
-		_, _, err = reputationdb.Update(ctx, req, currentWindow)
+		_, err = reputationdb.Update(ctx, req, currentWindow)
 		require.NoError(t, err)

 		reputationInfo, err = reputationdb.Get(ctx, nodeID)
@@ -371,7 +389,7 @@ func TestOfflineSuspend(t *testing.T) {
 		// give node an offline audit
 		// node's score is still 1 until its first window is complete
 		nextWindowTime := time.Now()
-		_, _, err = reputationdb.Update(ctx, updateReq, nextWindowTime)
+		_, err = reputationdb.Update(ctx, updateReq, nextWindowTime)
 		require.NoError(t, err)

 		reputationInfo, err := reputationdb.Get(ctx, nodeID)
@@ -385,7 +403,7 @@ func TestOfflineSuspend(t *testing.T) {
 		// node now has one full window, so its score should be 0
 		// should not be suspended or DQ since it only has 1 window out of 2 for tracking period
-		_, _, err = reputationdb.Update(ctx, updateReq, nextWindowTime)
+		_, err = reputationdb.Update(ctx, updateReq, nextWindowTime)
 		require.NoError(t, err)

 		reputationInfo, err = reputationdb.Get(ctx, nodeID)
@@ -485,7 +503,7 @@ func setOnlineScore(ctx context.Context, reqPtr reputation.UpdateRequest, desire
 		}
 		updateReq.AuditHistory.GracePeriod = gracePeriod

-		_, _, err = reputationdb.Update(ctx, updateReq, nextWindowTime)
+		_, err = reputationdb.Update(ctx, updateReq, nextWindowTime)
 		if err != nil {
 			return nextWindowTime, err
 		}
diff --git a/satellite/satellitedb/overlaycache.go b/satellite/satellitedb/overlaycache.go
index 142720111..7c5759f3c 100644
--- a/satellite/satellitedb/overlaycache.go
+++ b/satellite/satellitedb/overlaycache.go
@@ -287,6 +287,60 @@ func (cache *overlaycache) getOnlineNodesForGetDelete(ctx context.Context, nodeI
 	return nodes, Error.Wrap(rows.Err())
 }

+// GetOnlineNodesForAuditRepair returns a map of online, non-disqualified,
+// non-exited nodes with their reputation status for the supplied nodeIDs.
+func (cache *overlaycache) GetOnlineNodesForAuditRepair(ctx context.Context, nodeIDs []storj.NodeID, onlineWindow time.Duration) (nodes map[storj.NodeID]*overlay.NodeReputation, err error) {
+	for {
+		nodes, err = cache.getOnlineNodesForAuditRepair(ctx, nodeIDs, onlineWindow)
+		if err != nil {
+			if cockroachutil.NeedsRetry(err) {
+				continue
+			}
+			return nodes, err
+		}
+		break
+	}
+
+	return nodes, err
+}
+
+func (cache *overlaycache) getOnlineNodesForAuditRepair(ctx context.Context, nodeIDs []storj.NodeID, onlineWindow time.Duration) (_ map[storj.NodeID]*overlay.NodeReputation, err error) {
+	defer mon.Task()(&ctx)(&err)
+
+	var rows tagsql.Rows
+	rows, err = cache.db.Query(ctx, cache.db.Rebind(`
+		SELECT last_net, id, address, last_ip_port, vetted_at,
+			unknown_audit_suspended, offline_suspended
+		FROM nodes
+		WHERE id = any($1::bytea[])
+			AND disqualified IS NULL
+			AND exit_finished_at IS NULL
+			AND last_contact_success > $2
+	`), pgutil.NodeIDArray(nodeIDs), time.Now().Add(-onlineWindow))
+	if err != nil {
+		return nil, err
+	}
+	defer func() { err = errs.Combine(err, rows.Close()) }()
+
+	nodes := make(map[storj.NodeID]*overlay.NodeReputation)
+	for rows.Next() {
+		var node overlay.NodeReputation
+		node.Address = &pb.NodeAddress{Transport: pb.NodeTransport_TCP_TLS_GRPC}
+
+		var lastIPPort sql.NullString
+		err = rows.Scan(&node.LastNet, &node.ID, &node.Address.Address, &lastIPPort, &node.Reputation.VettedAt, &node.Reputation.UnknownAuditSuspended, &node.Reputation.OfflineSuspended)
+		if err != nil {
+			return nil, err
+		}
+		if lastIPPort.Valid {
+			node.LastIPPort = lastIPPort.String
+		}
+
+		nodes[node.ID] = &node
+	}
+
+	return nodes, Error.Wrap(rows.Err())
+}
+
 // KnownOffline filters a set of nodes to offline nodes.
 func (cache *overlaycache) KnownOffline(ctx context.Context, criteria *overlay.NodeCriteria, nodeIDs storj.NodeIDList) (offlineNodes storj.NodeIDList, err error) {
 	for {
@@ -967,14 +1021,16 @@ func decodeWalletFeatures(encoded string) []string {

 func getNodeStats(dbNode *dbx.Node) *overlay.NodeStats {
 	nodeStats := &overlay.NodeStats{
-		Latency90:             dbNode.Latency90,
-		VettedAt:              dbNode.VettedAt,
-		LastContactSuccess:    dbNode.LastContactSuccess,
-		LastContactFailure:    dbNode.LastContactFailure,
-		Disqualified:          dbNode.Disqualified,
-		UnknownAuditSuspended: dbNode.UnknownAuditSuspended,
-		OfflineUnderReview:    dbNode.UnderReview,
-		OfflineSuspended:      dbNode.OfflineSuspended,
+		Latency90:          dbNode.Latency90,
+		LastContactSuccess: dbNode.LastContactSuccess,
+		LastContactFailure: dbNode.LastContactFailure,
+		OfflineUnderReview: dbNode.UnderReview,
+		Status: overlay.ReputationStatus{
+			VettedAt:              dbNode.VettedAt,
+			Disqualified:          dbNode.Disqualified,
+			UnknownAuditSuspended: dbNode.UnknownAuditSuspended,
+			OfflineSuspended:      dbNode.OfflineSuspended,
+		},
 	}
 	return nodeStats
 }
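A likely consumption pattern for the new cache method, feeding straight into the NodesReputation map above (the four-hour online window is illustrative, and this assumes NodeReputation.Reputation is the overlay.ReputationStatus scanned here):

	nodes, err := cache.GetOnlineNodesForAuditRepair(ctx, nodeIDs, 4*time.Hour)
	if err != nil {
		return err
	}
	nodesReputation := make(map[storj.NodeID]overlay.ReputationStatus, len(nodes))
	for id, node := range nodes {
		// VettedAt and the suspension flags come back in one query,
		// so no second overlay round trip is needed per node.
		nodesReputation[id] = node.Reputation
	}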
diff --git a/satellite/satellitedb/reputations.go b/satellite/satellitedb/reputations.go
index e20ddd976..8d4c6a34f 100644
--- a/satellite/satellitedb/reputations.go
+++ b/satellite/satellitedb/reputations.go
@@ -34,26 +34,26 @@ type reputations struct {
 // 2. Depends on the result of the first step,
 // a. if existing row is returned, do compare-and-swap.
 // b. if no row found, insert a new row.
-func (reputations *reputations) Update(ctx context.Context, updateReq reputation.UpdateRequest, now time.Time) (_ *overlay.ReputationStatus, changed bool, err error) {
+func (reputations *reputations) Update(ctx context.Context, updateReq reputation.UpdateRequest, now time.Time) (_ *overlay.ReputationStatus, err error) {
 	defer mon.Task()(&ctx)(&err)

 	for {
 		// get existing reputation stats
 		dbNode, err := reputations.db.Get_Reputation_By_Id(ctx, dbx.Reputation_Id(updateReq.NodeID.Bytes()))
 		if err != nil && !errors.Is(err, sql.ErrNoRows) {
-			return nil, false, Error.Wrap(err)
+			return nil, Error.Wrap(err)
 		}

 		// if this is a new node, we will insert a new entry into the table
 		if dbNode == nil {
 			historyBytes, err := pb.Marshal(&internalpb.AuditHistory{})
 			if err != nil {
-				return nil, false, Error.Wrap(err)
+				return nil, Error.Wrap(err)
 			}

 			auditHistoryResponse, err := reputations.UpdateAuditHistory(ctx, historyBytes, updateReq, now)
 			if err != nil {
-				return nil, false, Error.Wrap(err)
+				return nil, Error.Wrap(err)
 			}

 			// set default reputation stats for new node
@@ -76,30 +76,23 @@ func (reputations *reputations) Update(ctx context.Context, updateReq reputation
 				continue
 			}

-			return nil, false, Error.Wrap(err)
+			return nil, Error.Wrap(err)
 		}

 		rep := getNodeStatus(stats)
-		return &rep, !rep.Equal(overlay.ReputationStatus{}), nil
+		return &rep, nil
 	}

-	// do not update reputation if node is disqualified
-	if dbNode.Disqualified != nil {
-		return nil, false, nil
-	}
-
-	oldStats := getNodeStatus(dbNode)
-
 	auditHistoryResponse, err := reputations.UpdateAuditHistory(ctx, dbNode.AuditHistory, updateReq, now)
 	if err != nil {
-		return nil, false, Error.Wrap(err)
+		return nil, Error.Wrap(err)
 	}

 	updateFields := reputations.populateUpdateFields(dbNode, updateReq, auditHistoryResponse, now)
 	oldAuditHistory := dbx.Reputation_AuditHistory(dbNode.AuditHistory)
 	dbNode, err = reputations.db.Update_Reputation_By_Id_And_AuditHistory(ctx, dbx.Reputation_Id(updateReq.NodeID.Bytes()), oldAuditHistory, updateFields)
 	if err != nil && !errors.Is(err, sql.ErrNoRows) {
-		return nil, false, Error.Wrap(err)
+		return nil, Error.Wrap(err)
 	}

 	// if update failed due to concurrent audit_history updates, we will try
@@ -110,30 +103,11 @@ func (reputations *reputations) Update(ctx context.Context, updateReq reputation
 	}

 	newStats := getNodeStatus(dbNode)
-	return &newStats, !newStats.Equal(oldStats), nil
+	return &newStats, nil
 }

-// SetNodeStatus updates node reputation status.
-func (reputations *reputations) SetNodeStatus(ctx context.Context, id storj.NodeID, status overlay.ReputationStatus) (err error) {
-	defer mon.Task()(&ctx)(&err)
-
-	updateFields := dbx.Reputation_Update_Fields{
-		Disqualified:          dbx.Reputation_Disqualified_Raw(status.Disqualified),
-		UnknownAuditSuspended: dbx.Reputation_UnknownAuditSuspended_Raw(status.UnknownAuditSuspended),
-		OfflineSuspended:      dbx.Reputation_OfflineSuspended_Raw(status.OfflineSuspended),
-		VettedAt:              dbx.Reputation_VettedAt_Raw(status.VettedAt),
-	}
-
-	_, err = reputations.db.Update_Reputation_By_Id(ctx, dbx.Reputation_Id(id.Bytes()), updateFields)
-	if err != nil {
-		return Error.Wrap(err)
-	}
-
-	return nil
-
-}
 func (reputations *reputations) Get(ctx context.Context, nodeID storj.NodeID) (*reputation.Info, error) {
 	res, err := reputations.db.Get_Reputation_By_Id(ctx, dbx.Reputation_Id(nodeID.Bytes()))
 	if err != nil {
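Finally, the compare-and-swap that the reputations.Update comment describes reduces to a read/conditional-write/retry loop. A simplified sketch with hypothetical helpers (readReputation, casUpdate) standing in for the dbx calls:

	for {
		// hypothetical: read the current row, including its audit_history
		row, err := readReputation(ctx, nodeID)
		if err != nil {
			return nil, err
		}
		// hypothetical: update only if audit_history still matches what we read
		newRow, err := casUpdate(ctx, nodeID, row.AuditHistory, updateFields)
		if err != nil {
			return nil, err
		}
		if newRow == nil {
			// lost the race to a concurrent audit; re-read and try again
			continue
		}
		rep := getNodeStatus(newRow)
		return &rep, nil
	}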