diff --git a/monkit.lock b/monkit.lock index f2c34322b..6fe21a88e 100644 --- a/monkit.lock +++ b/monkit.lock @@ -89,6 +89,8 @@ storj.io/storj/satellite/repair/repairer."time_for_repair" FloatVal storj.io/storj/satellite/repair/repairer."time_since_checker_queue" FloatVal storj.io/storj/satellite/satellitedb."audit_reputation_alpha" FloatVal storj.io/storj/satellite/satellitedb."audit_reputation_beta" FloatVal +storj.io/storj/satellite/satellitedb."unknown_audit_reputation_alpha" FloatVal +storj.io/storj/satellite/satellitedb."unknown_audit_reputation_beta" FloatVal storj.io/storj/storage/filestore."open_file_in_trash" Meter storj.io/storj/storagenode/contact."satellite_contact_request" Meter storj.io/storj/storagenode/gracefulexit."satellite_gracefulexit_request" Meter diff --git a/satellite/accounting/rollup/rollup_test.go b/satellite/accounting/rollup/rollup_test.go index 368c33459..f50b1a2ee 100644 --- a/satellite/accounting/rollup/rollup_test.go +++ b/satellite/accounting/rollup/rollup_test.go @@ -218,7 +218,7 @@ func dqNodes(ctx *testcontext.Context, planet *testplanet.Planet) (map[storj.Nod updateRequests = append(updateRequests, &overlay.UpdateRequest{ NodeID: n.ID(), IsUp: true, - AuditSuccess: false, + AuditOutcome: overlay.AuditFailure, }) } diff --git a/satellite/audit/disqualification_test.go b/satellite/audit/disqualification_test.go index 39431cd7f..55b5a3e73 100644 --- a/satellite/audit/disqualification_test.go +++ b/satellite/audit/disqualification_test.go @@ -227,7 +227,7 @@ func TestDisqualifiedNodeRemainsDisqualified(t *testing.T) { _, err = satellitePeer.DB.OverlayCache().BatchUpdateStats(ctx, []*overlay.UpdateRequest{{ NodeID: disqualifiedNode.ID(), IsUp: true, - AuditSuccess: true, + AuditOutcome: overlay.AuditSuccess, AuditLambda: 0, // forget about history AuditWeight: 1, AuditDQ: 0, // make sure new reputation scores are larger than the DQ thresholds diff --git a/satellite/audit/reporter.go b/satellite/audit/reporter.go index 
6ef7e3a53..778c232fc 100644 --- a/satellite/audit/reporter.go +++ b/satellite/audit/reporter.go @@ -54,12 +54,14 @@ func (reporter *Reporter) RecordAudits(ctx context.Context, req Report, path sto successes := req.Successes fails := req.Fails + unknowns := req.Unknown offlines := req.Offlines pendingAudits := req.PendingAudits reporter.log.Debug("Reporting audits", zap.Int("successes", len(successes)), zap.Int("failures", len(fails)), + zap.Int("unknowns", len(unknowns)), zap.Int("offlines", len(offlines)), zap.Int("pending", len(pendingAudits)), zap.Binary("Segment", []byte(path)), @@ -70,7 +72,7 @@ func (reporter *Reporter) RecordAudits(ctx context.Context, req Report, path sto tries := 0 for tries <= reporter.maxRetries { - if len(successes) == 0 && len(fails) == 0 && len(offlines) == 0 && len(pendingAudits) == 0 { + if len(successes) == 0 && len(fails) == 0 && len(unknowns) == 0 && len(offlines) == 0 && len(pendingAudits) == 0 { return Report{}, nil } @@ -88,6 +90,12 @@ func (reporter *Reporter) RecordAudits(ctx context.Context, req Report, path sto errlist.Add(err) } } + if len(unknowns) > 0 { + unknowns, err = reporter.recordAuditUnknownStatus(ctx, unknowns) + if err != nil { + errlist.Add(err) + } + } // We do not report offline nodes to the overlay at this time; see V3-3025. 
if len(offlines) > 0 && reportOfflineDuringAudit { offlines, err = reporter.recordOfflineStatus(ctx, offlines) @@ -111,13 +119,14 @@ func (reporter *Reporter) RecordAudits(ctx context.Context, req Report, path sto Successes: successes, Fails: fails, Offlines: offlines, + Unknown: unknowns, PendingAudits: pendingAudits, }, errs.Combine(Error.New("some nodes failed to be updated in overlay"), err) } return Report{}, nil } -// recordAuditFailStatus updates nodeIDs in overlay with isup=true, auditsuccess=false +// recordAuditFailStatus updates nodeIDs in overlay with isup=true, auditoutcome=fail func (reporter *Reporter) recordAuditFailStatus(ctx context.Context, failedAuditNodeIDs storj.NodeIDList) (failed storj.NodeIDList, err error) { defer mon.Task()(&ctx)(&err) @@ -126,16 +135,35 @@ func (reporter *Reporter) recordAuditFailStatus(ctx context.Context, failedAudit updateRequests[i] = &overlay.UpdateRequest{ NodeID: nodeID, IsUp: true, - AuditSuccess: false, + AuditOutcome: overlay.AuditFailure, } } - if len(updateRequests) > 0 { - failed, err = reporter.overlay.BatchUpdateStats(ctx, updateRequests) - if err != nil || len(failed) > 0 { - reporter.log.Debug("failed to record Failed Nodes ", zap.Strings("NodeIDs", failed.Strings())) - return failed, errs.Combine(Error.New("failed to record some audit fail statuses in overlay"), err) + failed, err = reporter.overlay.BatchUpdateStats(ctx, updateRequests) + if err != nil || len(failed) > 0 { + reporter.log.Debug("failed to record Failed Nodes ", zap.Strings("NodeIDs", failed.Strings())) + return failed, errs.Combine(Error.New("failed to record some audit fail statuses in overlay"), err) + } + return nil, nil +} + +// recordAuditUnknownStatus updates nodeIDs in overlay with isup=true, auditoutcome=unknown +func (reporter *Reporter) recordAuditUnknownStatus(ctx context.Context, unknownAuditNodeIDs storj.NodeIDList) (failed storj.NodeIDList, err error) { + defer mon.Task()(&ctx)(&err) + + updateRequests := 
make([]*overlay.UpdateRequest, len(unknownAuditNodeIDs)) + for i, nodeID := range unknownAuditNodeIDs { + updateRequests[i] = &overlay.UpdateRequest{ + NodeID: nodeID, + IsUp: true, + AuditOutcome: overlay.AuditUnknown, } } + + failed, err = reporter.overlay.BatchUpdateStats(ctx, updateRequests) + if err != nil || len(failed) > 0 { + reporter.log.Debug("failed to record Unknown Nodes ", zap.Strings("NodeIDs", failed.Strings())) + return failed, errs.Combine(Error.New("failed to record some audit unknown statuses in overlay"), err) + } return nil, nil } @@ -160,7 +188,7 @@ func (reporter *Reporter) recordOfflineStatus(ctx context.Context, offlineNodeID return nil, nil } -// recordAuditSuccessStatus updates nodeIDs in overlay with isup=true, auditsuccess=true +// recordAuditSuccessStatus updates nodeIDs in overlay with isup=true, auditoutcome=success func (reporter *Reporter) recordAuditSuccessStatus(ctx context.Context, successNodeIDs storj.NodeIDList) (failed storj.NodeIDList, err error) { defer mon.Task()(&ctx)(&err) @@ -169,16 +197,14 @@ func (reporter *Reporter) recordAuditSuccessStatus(ctx context.Context, successN updateRequests[i] = &overlay.UpdateRequest{ NodeID: nodeID, IsUp: true, - AuditSuccess: true, + AuditOutcome: overlay.AuditSuccess, } } - if len(updateRequests) > 0 { - failed, err = reporter.overlay.BatchUpdateStats(ctx, updateRequests) - if err != nil || len(failed) > 0 { - reporter.log.Debug("failed to record Success Nodes ", zap.Strings("NodeIDs", failed.Strings())) - return failed, errs.Combine(Error.New("failed to record some audit success statuses in overlay"), err) - } + failed, err = reporter.overlay.BatchUpdateStats(ctx, updateRequests) + if err != nil || len(failed) > 0 { + reporter.log.Debug("failed to record Success Nodes ", zap.Strings("NodeIDs", failed.Strings())) + return failed, errs.Combine(Error.New("failed to record some audit success statuses in overlay"), err) } return nil, nil } @@ -201,35 +227,33 @@ func (reporter *Reporter) 
recordPendingAudits(ctx context.Context, pendingAudits updateRequests = append(updateRequests, &overlay.UpdateRequest{ NodeID: pendingAudit.NodeID, IsUp: true, - AuditSuccess: false, + AuditOutcome: overlay.AuditFailure, }) } } - if len(updateRequests) > 0 { - failedBatch, err := reporter.overlay.BatchUpdateStats(ctx, updateRequests) - if err != nil { - errlist.Add(err) + failedBatch, err := reporter.overlay.BatchUpdateStats(ctx, updateRequests) + if err != nil { + errlist.Add(err) + } + if len(failedBatch) > 0 { + pendingMap := make(map[storj.NodeID]*PendingAudit) + for _, pendingAudit := range pendingAudits { + pendingMap[pendingAudit.NodeID] = pendingAudit } - if len(failedBatch) > 0 { - pendingMap := make(map[storj.NodeID]*PendingAudit) - for _, pendingAudit := range pendingAudits { - pendingMap[pendingAudit.NodeID] = pendingAudit - } - for _, nodeID := range failedBatch { - pending, ok := pendingMap[nodeID] - if ok { - failed = append(failed, pending) - } + for _, nodeID := range failedBatch { + pending, ok := pendingMap[nodeID] + if ok { + failed = append(failed, pending) } } + } - if len(failed) > 0 { - for _, v := range failed { - reporter.log.Debug("failed to record Pending Nodes ", zap.Stringer("NodeID", v.NodeID), zap.String("Path", v.Path)) - } - return failed, errs.Combine(Error.New("failed to record some pending audits"), errlist.Err()) + if len(failed) > 0 { + for _, v := range failed { + reporter.log.Debug("failed to record Pending Nodes ", zap.Stringer("NodeID", v.NodeID), zap.String("Path", v.Path)) } + return failed, errs.Combine(Error.New("failed to record some pending audits"), errlist.Err()) } return nil, nil } diff --git a/satellite/audit/reverify_test.go b/satellite/audit/reverify_test.go index f39fe3b75..9c73a6ac9 100644 --- a/satellite/audit/reverify_test.go +++ b/satellite/audit/reverify_test.go @@ -1258,13 +1258,12 @@ func TestReverifyUnknownError(t *testing.T) { require.Len(t, report.Unknown, 1) require.Equal(t, report.Unknown[0], 
badNode) - // TODO uncomment this stuff when suspension mode is implemented - //// record audit - //_, err = audits.Reporter.RecordAudits(ctx, report, path) - //require.NoError(t, err) - // - //// make sure that pending audit is removed by the reporter when audit is recorded - //_, err = containment.Get(ctx, pending.NodeID) - //require.True(t, audit.ErrContainedNotFound.Has(err)) + // record audit + _, err = audits.Reporter.RecordAudits(ctx, report, path) + require.NoError(t, err) + + // make sure that pending audit is removed by the reporter when audit is recorded + _, err = containment.Get(ctx, pending.NodeID) + require.True(t, audit.ErrContainedNotFound.Has(err)) }) } diff --git a/satellite/overlay/benchmark_test.go b/satellite/overlay/benchmark_test.go index 1adc3fcfb..91ae7cc32 100644 --- a/satellite/overlay/benchmark_test.go +++ b/satellite/overlay/benchmark_test.go @@ -77,9 +77,13 @@ func BenchmarkOverlay(b *testing.B) { b.Run("UpdateStats", func(b *testing.B) { for i := 0; i < b.N; i++ { id := all[i%len(all)] + outcome := overlay.AuditFailure + if i&1 == 0 { + outcome = overlay.AuditSuccess + } _, err := overlaydb.UpdateStats(ctx, &overlay.UpdateRequest{ NodeID: id, - AuditSuccess: i&1 == 0, + AuditOutcome: outcome, IsUp: i&2 == 0, }) require.NoError(b, err) @@ -90,9 +94,13 @@ func BenchmarkOverlay(b *testing.B) { var updateRequests []*overlay.UpdateRequest for i := 0; i < b.N; i++ { id := all[i%len(all)] + outcome := overlay.AuditFailure + if i&1 == 0 { + outcome = overlay.AuditSuccess + } updateRequests = append(updateRequests, &overlay.UpdateRequest{ NodeID: id, - AuditSuccess: i&1 == 0, + AuditOutcome: outcome, IsUp: i&2 == 0, }) diff --git a/satellite/overlay/selection_test.go b/satellite/overlay/selection_test.go index 0852e4f04..6b6e0c1b7 100644 --- a/satellite/overlay/selection_test.go +++ b/satellite/overlay/selection_test.go @@ -137,7 +137,7 @@ func TestNodeSelection(t *testing.T) { _, err := satellite.DB.OverlayCache().UpdateStats(ctx, 
&overlay.UpdateRequest{ NodeID: node.ID(), IsUp: true, - AuditSuccess: true, + AuditOutcome: overlay.AuditSuccess, AuditLambda: 1, AuditWeight: 1, AuditDQ: 0.5, }) require.NoError(t, err) @@ -162,7 +162,7 @@ func TestNodeSelectionWithBatch(t *testing.T) { _, err := satellite.DB.OverlayCache().BatchUpdateStats(ctx, []*overlay.UpdateRequest{{ NodeID: node.ID(), IsUp: true, - AuditSuccess: true, + AuditOutcome: overlay.AuditSuccess, AuditLambda: 1, AuditWeight: 1, AuditDQ: 0.5, }}, 1) require.NoError(t, err) @@ -282,7 +282,7 @@ func TestNodeSelectionGracefulExit(t *testing.T) { _, err := satellite.DB.OverlayCache().UpdateStats(ctx, &overlay.UpdateRequest{ NodeID: node.ID(), IsUp: true, - AuditSuccess: true, + AuditOutcome: overlay.AuditSuccess, AuditLambda: 1, AuditWeight: 1, AuditDQ: 0.5, }) require.NoError(t, err) @@ -485,7 +485,7 @@ func TestDistinctIPs(t *testing.T) { _, err := satellite.DB.OverlayCache().UpdateStats(ctx, &overlay.UpdateRequest{ NodeID: planet.StorageNodes[i].ID(), IsUp: true, - AuditSuccess: true, + AuditOutcome: overlay.AuditSuccess, AuditLambda: 1, AuditWeight: 1, AuditDQ: 0.5, @@ -513,7 +513,7 @@ func TestDistinctIPsWithBatch(t *testing.T) { _, err := satellite.DB.OverlayCache().BatchUpdateStats(ctx, []*overlay.UpdateRequest{{ NodeID: planet.StorageNodes[i].ID(), IsUp: true, - AuditSuccess: true, + AuditOutcome: overlay.AuditSuccess, AuditLambda: 1, AuditWeight: 1, AuditDQ: 0.5, diff --git a/satellite/overlay/service.go b/satellite/overlay/service.go index 9c1ff994a..9c8a451a2 100644 --- a/satellite/overlay/service.go +++ b/satellite/overlay/service.go @@ -94,6 +94,11 @@ type DB interface { // DisqualifyNode disqualifies a storage node. DisqualifyNode(ctx context.Context, nodeID storj.NodeID) (err error) + + // SuspendNode suspends a storage node. + SuspendNode(ctx context.Context, nodeID storj.NodeID, suspendedAt time.Time) (err error) + // UnsuspendNode unsuspends a storage node. 
+ UnsuspendNode(ctx context.Context, nodeID storj.NodeID) (err error) } // NodeCheckInInfo contains all the info that will be updated when a node checkins @@ -128,10 +133,22 @@ type NodeCriteria struct { DistinctIP bool } +// AuditType is an enum representing the outcome of a particular audit reported to the overlay. +type AuditType int + +const ( + // AuditSuccess represents a successful audit. + AuditSuccess AuditType = iota + // AuditFailure represents a failed audit. + AuditFailure + // AuditUnknown represents an audit that resulted in an unknown error from the node. + AuditUnknown +) + // UpdateRequest is used to update a node status. type UpdateRequest struct { NodeID storj.NodeID - AuditSuccess bool + AuditOutcome AuditType IsUp bool // n.b. these are set values from the satellite. // They are part of the UpdateRequest struct in order to be @@ -169,6 +186,7 @@ type NodeDossier struct { Version pb.NodeVersion Contained bool Disqualified *time.Time + Suspended *time.Time PieceCount int64 ExitStatus ExitStatus CreatedAt time.Time @@ -178,16 +196,19 @@ type NodeDossier struct { // NodeStats contains statistics about a node. 
type NodeStats struct { - Latency90 int64 - AuditSuccessCount int64 - AuditCount int64 - UptimeSuccessCount int64 - UptimeCount int64 - LastContactSuccess time.Time - LastContactFailure time.Time - AuditReputationAlpha float64 - AuditReputationBeta float64 - Disqualified *time.Time + Latency90 int64 + AuditSuccessCount int64 + AuditCount int64 + UptimeSuccessCount int64 + UptimeCount int64 + LastContactSuccess time.Time + LastContactFailure time.Time + AuditReputationAlpha float64 + AuditReputationBeta float64 + Disqualified *time.Time + UnknownAuditReputationAlpha float64 + UnknownAuditReputationBeta float64 + Suspended *time.Time } // NodeLastContact contains the ID, address, and timestamp diff --git a/satellite/overlay/service_test.go b/satellite/overlay/service_test.go index cf11fd38d..077975416 100644 --- a/satellite/overlay/service_test.go +++ b/satellite/overlay/service_test.go @@ -134,7 +134,7 @@ func testCache(ctx context.Context, t *testing.T, store overlay.DB) { stats, err := service.UpdateStats(ctx, &overlay.UpdateRequest{ NodeID: valid1ID, IsUp: true, - AuditSuccess: false, + AuditOutcome: overlay.AuditFailure, }) require.NoError(t, err) newAuditAlpha := 1 @@ -151,7 +151,7 @@ func testCache(ctx context.Context, t *testing.T, store overlay.DB) { _, err = service.BatchUpdateStats(ctx, []*overlay.UpdateRequest{{ NodeID: valid2ID, IsUp: false, - AuditSuccess: true, + AuditOutcome: overlay.AuditSuccess, }}) require.NoError(t, err) dossier, err := service.Get(ctx, valid2ID) @@ -197,7 +197,7 @@ func TestRandomizedSelection(t *testing.T) { _, err = cache.UpdateStats(ctx, &overlay.UpdateRequest{ NodeID: newID, IsUp: true, - AuditSuccess: true, + AuditOutcome: overlay.AuditSuccess, AuditLambda: 1, AuditWeight: 1, AuditDQ: 0.5, @@ -299,7 +299,7 @@ func TestKnownReliable(t *testing.T) { // Disqualify storage node #0 stats, err := service.UpdateStats(ctx, &overlay.UpdateRequest{ NodeID: planet.StorageNodes[0].ID(), - AuditSuccess: false, + AuditOutcome: 
overlay.AuditFailure, }) require.NoError(t, err) require.NotNil(t, stats.Disqualified) diff --git a/satellite/overlay/statdb_test.go b/satellite/overlay/statdb_test.go index 18fc0496e..ddf80969a 100644 --- a/satellite/overlay/statdb_test.go +++ b/satellite/overlay/statdb_test.go @@ -51,7 +51,7 @@ func testDatabase(ctx context.Context, t *testing.T, cache overlay.DB) { // update stats so node disqualification is triggered _, err = cache.UpdateStats(ctx, &overlay.UpdateRequest{ NodeID: tt.nodeID, - AuditSuccess: true, + AuditOutcome: overlay.AuditSuccess, IsUp: true, AuditLambda: 1, AuditWeight: 1, AuditDQ: 0.9, @@ -133,7 +133,7 @@ func testDatabase(ctx context.Context, t *testing.T, cache overlay.DB) { updateReq := &overlay.UpdateRequest{ NodeID: nodeID, - AuditSuccess: true, + AuditOutcome: overlay.AuditSuccess, IsUp: true, AuditLambda: 0.123, AuditWeight: 0.456, AuditDQ: 0, // don't disqualify for any reason @@ -149,7 +149,7 @@ func testDatabase(ctx context.Context, t *testing.T, cache overlay.DB) { auditAlpha = expectedAuditAlpha auditBeta = expectedAuditBeta - updateReq.AuditSuccess = false + updateReq.AuditOutcome = overlay.AuditFailure updateReq.IsUp = false stats, err = cache.UpdateStats(ctx, updateReq) require.NoError(t, err) diff --git a/satellite/overlay/suspension_test.go b/satellite/overlay/suspension_test.go new file mode 100644 index 000000000..c25f53e7b --- /dev/null +++ b/satellite/overlay/suspension_test.go @@ -0,0 +1,131 @@ +// Copyright (C) 2020 Storj Labs, Inc. +// See LICENSE for copying information. 
+
+package overlay_test
+
+import (
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+
+	"storj.io/common/testcontext"
+	"storj.io/storj/private/testplanet"
+	"storj.io/storj/satellite/overlay"
+)
+
+// TestSuspendBasic ensures that the overlay DB's SuspendNode sets a node's suspended time and that UnsuspendNode clears it again.
+func TestSuspendBasic(t *testing.T) {
+	testplanet.Run(t, testplanet.Config{
+		SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 0,
+	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
+		nodeID := planet.StorageNodes[0].ID()
+		oc := planet.Satellites[0].Overlay.DB
+
+		node, err := oc.Get(ctx, nodeID)
+		require.NoError(t, err)
+		require.Nil(t, node.Suspended)
+
+		timeToSuspend := time.Now().UTC().Truncate(time.Second) // whole seconds - presumably so the stored value compares equal after the database round trip (TODO confirm)
+		err = oc.SuspendNode(ctx, nodeID, timeToSuspend)
+		require.NoError(t, err)
+
+		node, err = oc.Get(ctx, nodeID)
+		require.NoError(t, err)
+		require.NotNil(t, node.Suspended)
+		require.True(t, node.Suspended.Equal(timeToSuspend))
+
+		err = oc.UnsuspendNode(ctx, nodeID)
+		require.NoError(t, err)
+
+		node, err = oc.Get(ctx, nodeID)
+		require.NoError(t, err)
+		require.Nil(t, node.Suspended)
+	})
+}
+
+// TestSuspendWithUpdateStats ensures that a node goes into suspension mode from getting enough unknown audits, and gets removed from suspension by getting enough successful audits.
+func TestSuspendWithUpdateStats(t *testing.T) {
+	testplanet.Run(t, testplanet.Config{
+		SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 0,
+	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
+		nodeID := planet.StorageNodes[0].ID()
+		oc := planet.Satellites[0].Overlay.Service
+
+		node, err := oc.Get(ctx, nodeID)
+		require.NoError(t, err)
+		require.Nil(t, node.Suspended)
+
+		testStartTime := time.Now()
+
+		// give node one unknown audit - bringing unknown audit rep to 0.5, and suspending node
+		_, err = oc.UpdateStats(ctx, &overlay.UpdateRequest{
+			NodeID:       nodeID,
+			AuditOutcome: overlay.AuditUnknown,
+			IsUp:         true,
+			AuditLambda:  1,
+			AuditWeight:  1,
+			AuditDQ:      0.6,
+		})
+		require.NoError(t, err)
+
+		node, err = oc.Get(ctx, nodeID)
+		require.NoError(t, err)
+		require.NotNil(t, node.Suspended)
+		require.True(t, node.Suspended.After(testStartTime))
+		// expect node is not disqualified and that normal audit alpha/beta remain unchanged
+		require.Nil(t, node.Disqualified)
+		require.EqualValues(t, 1, node.Reputation.AuditReputationAlpha) // expected value first: the int literal is converted to float64 instead of the float being truncated to int
+		require.EqualValues(t, 0, node.Reputation.AuditReputationBeta)
+
+		// give node two successful audits - bringing unknown audit rep to 0.75, and unsuspending node
+		for i := 0; i < 2; i++ {
+			_, err = oc.UpdateStats(ctx, &overlay.UpdateRequest{
+				NodeID:       nodeID,
+				AuditOutcome: overlay.AuditSuccess,
+				IsUp:         true,
+				AuditLambda:  1,
+				AuditWeight:  1,
+				AuditDQ:      0.6,
+			})
+			require.NoError(t, err)
+		}
+		node, err = oc.Get(ctx, nodeID)
+		require.NoError(t, err)
+		require.Nil(t, node.Suspended)
+	})
+}
+
+// TestSuspendFailedAudit ensures that a node is not suspended for a failed audit.
+func TestSuspendFailedAudit(t *testing.T) {
+	testplanet.Run(t, testplanet.Config{
+		SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 0,
+	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
+		nodeID := planet.StorageNodes[0].ID()
+		oc := planet.Satellites[0].Overlay.DB
+
+		node, err := oc.Get(ctx, nodeID)
+		require.NoError(t, err)
+		require.Nil(t, node.Disqualified)
+		require.Nil(t, node.Suspended)
+
+		// give node one failed audit - bringing audit rep to 0.5, and disqualifying node
+		// expect that suspended field and unknown audit reputation remain unchanged
+		_, err = oc.UpdateStats(ctx, &overlay.UpdateRequest{
+			NodeID:       nodeID,
+			AuditOutcome: overlay.AuditFailure,
+			IsUp:         true,
+			AuditLambda:  1,
+			AuditWeight:  1,
+			AuditDQ:      0.6,
+		})
+		require.NoError(t, err)
+
+		node, err = oc.Get(ctx, nodeID)
+		require.NoError(t, err)
+		require.NotNil(t, node.Disqualified)
+		require.Nil(t, node.Suspended)
+		require.EqualValues(t, 1, node.Reputation.UnknownAuditReputationAlpha) // expected value first: the int literal is converted to float64 instead of the float being truncated to int
+		require.EqualValues(t, 0, node.Reputation.UnknownAuditReputationBeta)
+	})
+}
diff --git a/satellite/repair/repairer/segments.go b/satellite/repair/repairer/segments.go
index 497b88024..65ab8d5cc 100644
--- a/satellite/repair/repairer/segments.go
+++ b/satellite/repair/repairer/segments.go
@@ -328,7 +328,7 @@ func (repairer *SegmentRepairer) updateAuditFailStatus(ctx context.Context, fail
 		updateRequests[i] = &overlay.UpdateRequest{
 			NodeID:       nodeID,
 			IsUp:         true,
-			AuditSuccess: false,
+			AuditOutcome: overlay.AuditFailure,
 		}
 	}
 	if len(updateRequests) > 0 {
diff --git a/satellite/satellitedb/overlaycache.go b/satellite/satellitedb/overlaycache.go
index 63970daf1..4e67c3a2f 100644
--- a/satellite/satellitedb/overlaycache.go
+++ b/satellite/satellitedb/overlaycache.go
@@ -802,6 +802,38 @@ func (cache *overlaycache) DisqualifyNode(ctx context.Context, nodeID storj.Node
 	return nil
 }

+// SuspendNode suspends a storage node.
+func (cache *overlaycache) SuspendNode(ctx context.Context, nodeID storj.NodeID, suspendedAt time.Time) (err error) {
+	defer mon.Task()(&ctx)(&err)
+	updateFields := dbx.Node_Update_Fields{}
+	updateFields.Suspended = dbx.Node_Suspended(suspendedAt.UTC())
+
+	dbNode, err := cache.db.Update_Node_By_Id(ctx, dbx.Node_Id(nodeID.Bytes()), updateFields)
+	if err != nil {
+		return err
+	}
+	if dbNode == nil {
+		return errs.New("unable to get node by ID: %v", nodeID)
+	}
+	return nil
+}
+
+// UnsuspendNode unsuspends a storage node by setting its suspended column to NULL.
+func (cache *overlaycache) UnsuspendNode(ctx context.Context, nodeID storj.NodeID) (err error) {
+	defer mon.Task()(&ctx)(&err)
+	updateFields := dbx.Node_Update_Fields{}
+	updateFields.Suspended = dbx.Node_Suspended_Null()
+
+	dbNode, err := cache.db.Update_Node_By_Id(ctx, dbx.Node_Id(nodeID.Bytes()), updateFields)
+	if err != nil {
+		return err
+	}
+	if dbNode == nil {
+		return errs.New("unable to get node by ID: %v", nodeID)
+	}
+	return nil
+}
+
 // AllPieceCounts returns a map of node IDs to piece counts from the db.
 // NB: a valid, partial piece map can be returned even if node ID parsing error(s) are returned.
 func (cache *overlaycache) AllPieceCounts(ctx context.Context) (_ map[storj.NodeID]int, err error) {
@@ -1126,6 +1158,7 @@ func convertDBNode(ctx context.Context, info *dbx.Node) (_ *overlay.NodeDossier,
 		},
 		Contained:    info.Contained,
 		Disqualified: info.Disqualified,
+		Suspended:    info.Suspended,
 		PieceCount:   info.PieceCount,
 		ExitStatus:   exitStatus,
 		CreatedAt:    info.CreatedAt,
@@ -1159,16 +1192,19 @@ func convertDBNodeToPBNode(ctx context.Context, info *dbx.Id_LastNet_LastIpPort_

 func getNodeStats(dbNode *dbx.Node) *overlay.NodeStats {
 	nodeStats := &overlay.NodeStats{
-		Latency90:            dbNode.Latency90,
-		AuditCount:           dbNode.TotalAuditCount,
-		AuditSuccessCount:    dbNode.AuditSuccessCount,
-		UptimeCount:          dbNode.TotalUptimeCount,
-		UptimeSuccessCount:   dbNode.UptimeSuccessCount,
-		LastContactSuccess:   dbNode.LastContactSuccess,
-		LastContactFailure:   dbNode.LastContactFailure,
-		AuditReputationAlpha: dbNode.AuditReputationAlpha,
-		AuditReputationBeta:  dbNode.AuditReputationBeta,
-		Disqualified:         dbNode.Disqualified,
+		Latency90:                   dbNode.Latency90,
+		AuditCount:                  dbNode.TotalAuditCount,
+		AuditSuccessCount:           dbNode.AuditSuccessCount,
+		UptimeCount:                 dbNode.TotalUptimeCount,
+		UptimeSuccessCount:          dbNode.UptimeSuccessCount,
+		LastContactSuccess:          dbNode.LastContactSuccess,
+		LastContactFailure:          dbNode.LastContactFailure,
+		AuditReputationAlpha:        dbNode.AuditReputationAlpha,
+		AuditReputationBeta:         dbNode.AuditReputationBeta,
+		Disqualified:                dbNode.Disqualified,
+		UnknownAuditReputationAlpha: dbNode.UnknownAuditReputationAlpha,
+		UnknownAuditReputationBeta:  dbNode.UnknownAuditReputationBeta,
+		Suspended:                   dbNode.Suspended,
 	}
 	return nodeStats
 }
@@ -1225,6 +1261,34 @@ func buildUpdateStatement(update updateNodeStats) string {
 		atLeastOne = true
 		sql += fmt.Sprintf("disqualified = '%v'", update.Disqualified.value.Format(time.RFC3339Nano))
 	}
+	// also persist the unknown-audit reputation here; populateUpdateFields already writes it for single-node updates
+	// NOTE(review): column names assumed to mirror the dbx field names - confirm against the schema
+	if update.UnknownAuditReputationAlpha.set {
+		if atLeastOne {
+			sql += ","
+		}
+		atLeastOne = true
+		sql += fmt.Sprintf("unknown_audit_reputation_alpha = %v", update.UnknownAuditReputationAlpha.value)
+	}
+	if update.UnknownAuditReputationBeta.set {
+		if atLeastOne {
+			sql += ","
+		}
+		atLeastOne = true
+		sql += fmt.Sprintf("unknown_audit_reputation_beta = %v", update.UnknownAuditReputationBeta.value)
+	}
+	if update.Suspended.set {
+		if atLeastOne {
+			sql += ","
+		}
+		atLeastOne = true
+		if update.Suspended.isNil {
+			// unsuspending must clear the column; formatting the zero time.Time would store a bogus timestamp
+			sql += "suspended = NULL"
+		} else {
+			sql += fmt.Sprintf("suspended = '%v'", update.Suspended.value.Format(time.RFC3339Nano))
+		}
+	}
 	if update.UptimeSuccessCount.set {
 		if atLeastOne {
 			sql += ","
@@ -1289,34 +1353,84 @@ type boolField struct {

 type timeField struct {
 	set   bool
+	isNil bool
 	value time.Time
 }

 type updateNodeStats struct {
-	NodeID               storj.NodeID
-	TotalAuditCount      int64Field
-	TotalUptimeCount     int64Field
-	AuditReputationAlpha float64Field
-	AuditReputationBeta  float64Field
-	Disqualified         timeField
-	UptimeSuccessCount   int64Field
-	LastContactSuccess   timeField
-	LastContactFailure   timeField
-	AuditSuccessCount    int64Field
-	Contained            boolField
+	NodeID                      storj.NodeID
+	TotalAuditCount             int64Field
+	TotalUptimeCount            int64Field
+	AuditReputationAlpha        float64Field
+	AuditReputationBeta         float64Field
+	Disqualified                timeField
+	UnknownAuditReputationAlpha float64Field
+	UnknownAuditReputationBeta  float64Field
+	Suspended                   timeField
+	UptimeSuccessCount          int64Field
+	LastContactSuccess          timeField
+	LastContactFailure          timeField
+	AuditSuccessCount           int64Field
+	Contained                   boolField
 }

 func populateUpdateNodeStats(dbNode *dbx.Node, updateReq *overlay.UpdateRequest) updateNodeStats {
-	auditAlpha, auditBeta, totalAuditCount := updateReputation(
-		updateReq.AuditSuccess,
-		dbNode.AuditReputationAlpha,
-		dbNode.AuditReputationBeta,
-		updateReq.AuditLambda,
-		updateReq.AuditWeight,
-		dbNode.TotalAuditCount,
-	)
-	mon.FloatVal("audit_reputation_alpha").Observe(auditAlpha) //locked
-	mon.FloatVal("audit_reputation_beta").Observe(auditBeta)   //locked
+	// there are three audit outcomes: success, failure, and unknown
+	// if a node fails enough audits, it gets disqualified
+	// if a node gets enough "unknown" audits, it gets put into suspension
+	// if a node gets enough successful audits, and is in suspension, it gets removed from suspension
+
+	auditAlpha := dbNode.AuditReputationAlpha
+	auditBeta := dbNode.AuditReputationBeta
+	unknownAuditAlpha := dbNode.UnknownAuditReputationAlpha
+	unknownAuditBeta := dbNode.UnknownAuditReputationBeta
+	totalAuditCount := dbNode.TotalAuditCount
+
+	switch updateReq.AuditOutcome {
+	case overlay.AuditSuccess:
+		// for a successful audit, increase reputation for normal *and* unknown audits
+		auditAlpha, auditBeta, totalAuditCount = updateReputation(
+			true,
+			auditAlpha,
+			auditBeta,
+			updateReq.AuditLambda,
+			updateReq.AuditWeight,
+			totalAuditCount,
+		)
+		unknownAuditAlpha, unknownAuditBeta, totalAuditCount = updateReputation(
+			true,
+			unknownAuditAlpha,
+			unknownAuditBeta,
+			updateReq.AuditLambda,
+			updateReq.AuditWeight,
+			totalAuditCount-1, // subtract one because this is still a single audit
+		)
+	case overlay.AuditFailure:
+		// for audit failure, only update normal alpha/beta
+		auditAlpha, auditBeta, totalAuditCount = updateReputation(
+			false,
+			auditAlpha,
+			auditBeta,
+			updateReq.AuditLambda,
+			updateReq.AuditWeight,
+			totalAuditCount,
+		)
+	case overlay.AuditUnknown:
+		// for audit unknown, only update unknown alpha/beta
+		unknownAuditAlpha, unknownAuditBeta, totalAuditCount = updateReputation(
+			false,
+			unknownAuditAlpha,
+			unknownAuditBeta,
+			updateReq.AuditLambda,
+			updateReq.AuditWeight,
+			totalAuditCount,
+		)
+
+	}
+	mon.FloatVal("audit_reputation_alpha").Observe(auditAlpha)                //locked
+	mon.FloatVal("audit_reputation_beta").Observe(auditBeta)                  //locked
+	mon.FloatVal("unknown_audit_reputation_alpha").Observe(unknownAuditAlpha) //locked
+	mon.FloatVal("unknown_audit_reputation_beta").Observe(unknownAuditBeta)   //locked

 	totalUptimeCount := dbNode.TotalUptimeCount
 	if updateReq.IsUp {
@@ -1324,11 +1438,13 @@ func populateUpdateNodeStats(dbNode *dbx.Node, updateReq *overlay.UpdateRequest)
 	}

 	updateFields := updateNodeStats{
-		NodeID:               updateReq.NodeID,
-		TotalAuditCount:      int64Field{set: true, value: totalAuditCount},
-		TotalUptimeCount:     int64Field{set: true, value: totalUptimeCount},
-		AuditReputationAlpha: float64Field{set: true, value: auditAlpha},
-		AuditReputationBeta:  float64Field{set: true, value: auditBeta},
+		NodeID:                      updateReq.NodeID,
+		TotalAuditCount:             int64Field{set: true, value: totalAuditCount},
+		TotalUptimeCount:            int64Field{set: true, value: totalUptimeCount},
+		AuditReputationAlpha:        float64Field{set: true, value: auditAlpha},
+		AuditReputationBeta:         float64Field{set: true, value: auditBeta},
+		UnknownAuditReputationAlpha: float64Field{set: true, value: unknownAuditAlpha},
+		UnknownAuditReputationBeta:  float64Field{set: true, value: unknownAuditBeta},
 	}

 	auditRep := auditAlpha / (auditAlpha + auditBeta)
@@ -1336,6 +1452,20 @@ func populateUpdateNodeStats(dbNode *dbx.Node, updateReq *overlay.UpdateRequest)
 		updateFields.Disqualified = timeField{set: true, value: time.Now().UTC()}
 	}

+	// if the unknown audit reputation is at or below the threshold the node should be suspended, otherwise not.
+	// only touch the column on a state change, so that a node that is already suspended keeps its original
+	// suspension time (the TODO below needs to know how long the node has been suspended).
+	unknownAuditRep := unknownAuditAlpha / (unknownAuditAlpha + unknownAuditBeta)
+	if unknownAuditRep <= updateReq.AuditDQ {
+		if dbNode.Suspended == nil {
+			updateFields.Suspended = timeField{set: true, value: time.Now().UTC()}
+		}
+	} else if dbNode.Suspended != nil {
+		updateFields.Suspended = timeField{set: true, isNil: true}
+	}
+
+	// TODO if node has been suspended for longer than threshold, and audit outcome is failure or unknown, disqualify node.
+
 	if updateReq.IsUp {
 		updateFields.UptimeSuccessCount = int64Field{set: true, value: dbNode.UptimeSuccessCount + 1}
 		updateFields.LastContactSuccess = timeField{set: true, value: time.Now()}
@@ -1343,7 +1473,7 @@ func populateUpdateNodeStats(dbNode *dbx.Node, updateReq *overlay.UpdateRequest)
 		updateFields.LastContactFailure = timeField{set: true, value: time.Now()}
 	}

-	if updateReq.AuditSuccess {
+	if updateReq.AuditOutcome == overlay.AuditSuccess {
 		updateFields.AuditSuccessCount = int64Field{set: true, value: dbNode.AuditSuccessCount + 1}
 	}

@@ -1372,6 +1502,19 @@ func populateUpdateFields(dbNode *dbx.Node, updateReq *overlay.UpdateRequest) db
 	if update.Disqualified.set {
 		updateFields.Disqualified = dbx.Node_Disqualified(update.Disqualified.value)
 	}
+	if update.UnknownAuditReputationAlpha.set {
+		updateFields.UnknownAuditReputationAlpha = dbx.Node_UnknownAuditReputationAlpha(update.UnknownAuditReputationAlpha.value)
+	}
+	if update.UnknownAuditReputationBeta.set {
+		updateFields.UnknownAuditReputationBeta = dbx.Node_UnknownAuditReputationBeta(update.UnknownAuditReputationBeta.value)
+	}
+	if update.Suspended.set {
+		if update.Suspended.isNil {
+			updateFields.Suspended = dbx.Node_Suspended_Null()
+		} else {
+			updateFields.Suspended = dbx.Node_Suspended(update.Suspended.value)
+		}
+	}
 	if update.UptimeSuccessCount.set {
 		updateFields.UptimeSuccessCount = dbx.Node_UptimeSuccessCount(update.UptimeSuccessCount.value)
 	}
@@ -1387,7 +1530,7 @@ func populateUpdateFields(dbNode *dbx.Node, updateReq *overlay.UpdateRequest) db
 	if update.Contained.set {
 		updateFields.Contained = dbx.Node_Contained(update.Contained.value)
 	}
-	if updateReq.AuditSuccess {
+	if updateReq.AuditOutcome == overlay.AuditSuccess {
 		updateFields.AuditSuccessCount = dbx.Node_AuditSuccessCount(dbNode.AuditSuccessCount + 1)
 	}
