diff --git a/monkit.lock b/monkit.lock index 5f91b52f9..d5e59639a 100644 --- a/monkit.lock +++ b/monkit.lock @@ -91,6 +91,7 @@ storj.io/storj/satellite/repair/repairer."segment_repair_count" IntVal storj.io/storj/satellite/repair/repairer."segment_time_until_repair" IntVal storj.io/storj/satellite/repair/repairer."time_for_repair" FloatVal storj.io/storj/satellite/repair/repairer."time_since_checker_queue" FloatVal +storj.io/storj/satellite/satellitedb."audit_online_score" FloatVal storj.io/storj/satellite/satellitedb."audit_reputation_alpha" FloatVal storj.io/storj/satellite/satellitedb."audit_reputation_beta" FloatVal storj.io/storj/satellite/satellitedb."nodetallies.totalsum" IntVal diff --git a/satellite/overlay/benchmark_test.go b/satellite/overlay/benchmark_test.go index 65da0ce11..f702d7ec4 100644 --- a/satellite/overlay/benchmark_test.go +++ b/satellite/overlay/benchmark_test.go @@ -102,7 +102,7 @@ func BenchmarkOverlay(b *testing.B) { NodeID: id, AuditOutcome: outcome, IsUp: i&2 == 0, - }) + }, testAuditHistoryConfig()) require.NoError(b, err) } }) @@ -122,7 +122,7 @@ func BenchmarkOverlay(b *testing.B) { }) } - _, err := overlaydb.BatchUpdateStats(ctx, updateRequests, 100) + _, err := overlaydb.BatchUpdateStats(ctx, updateRequests, 100, testAuditHistoryConfig()) require.NoError(b, err) }) @@ -277,7 +277,7 @@ func BenchmarkNodeSelection(b *testing.B) { AuditLambda: 1, AuditWeight: 1, AuditDQ: 0.5, - }) + }, testAuditHistoryConfig()) require.NoError(b, err) } diff --git a/satellite/overlay/nodeselectioncache_test.go b/satellite/overlay/nodeselectioncache_test.go index cfcf961a3..698bf25e8 100644 --- a/satellite/overlay/nodeselectioncache_test.go +++ b/satellite/overlay/nodeselectioncache_test.go @@ -109,7 +109,7 @@ func addNodesToNodesTable(ctx context.Context, t *testing.T, db overlay.DB, coun IsUp: true, AuditOutcome: overlay.AuditSuccess, AuditLambda: 1, AuditWeight: 1, AuditDQ: 0.5, - }) + }, testAuditHistoryConfig()) require.NoError(t, err) 
reputableIds = append(reputableIds, storj.NodeID{byte(i)}) } diff --git a/satellite/overlay/selection_test.go b/satellite/overlay/selection_test.go index 944d9e3d9..f51dc1f11 100644 --- a/satellite/overlay/selection_test.go +++ b/satellite/overlay/selection_test.go @@ -192,7 +192,7 @@ func TestEnsureMinimumRequested(t *testing.T) { IsUp: true, AuditOutcome: overlay.AuditSuccess, AuditLambda: 1, AuditWeight: 1, AuditDQ: 0.5, - }) + }, testAuditHistoryConfig()) require.NoError(t, err) } @@ -235,7 +235,7 @@ func TestEnsureMinimumRequested(t *testing.T) { IsUp: true, AuditOutcome: overlay.AuditSuccess, AuditLambda: 1, AuditWeight: 1, AuditDQ: 0.5, - }) + }, testAuditHistoryConfig()) require.NoError(t, err) } @@ -272,7 +272,7 @@ func TestNodeSelection(t *testing.T) { IsUp: true, AuditOutcome: overlay.AuditSuccess, AuditLambda: 1, AuditWeight: 1, AuditDQ: 0.5, - }) + }, testAuditHistoryConfig()) require.NoError(t, err) } } @@ -462,7 +462,7 @@ func TestNodeSelectionGracefulExit(t *testing.T) { IsUp: true, AuditOutcome: overlay.AuditSuccess, AuditLambda: 1, AuditWeight: 1, AuditDQ: 0.5, - }) + }, testAuditHistoryConfig()) require.NoError(t, err) } @@ -694,7 +694,7 @@ func TestDistinctIPs(t *testing.T) { AuditLambda: 1, AuditWeight: 1, AuditDQ: 0.5, - }) + }, testAuditHistoryConfig()) assert.NoError(t, err) } testDistinctIPs(t, ctx, planet) diff --git a/satellite/overlay/service.go b/satellite/overlay/service.go index c3609729c..acf100d1e 100644 --- a/satellite/overlay/service.go +++ b/satellite/overlay/service.go @@ -57,9 +57,9 @@ type DB interface { // Reliable returns all nodes that are reliable Reliable(context.Context, *NodeCriteria) (storj.NodeIDList, error) // BatchUpdateStats updates multiple storagenode's stats in one transaction - BatchUpdateStats(ctx context.Context, updateRequests []*UpdateRequest, batchSize int) (failed storj.NodeIDList, err error) + BatchUpdateStats(ctx context.Context, updateRequests []*UpdateRequest, batchSize int, auditHistoryConfig 
AuditHistoryConfig) (failed storj.NodeIDList, err error) // UpdateStats all parts of single storagenode's stats. - UpdateStats(ctx context.Context, request *UpdateRequest) (stats *NodeStats, err error) + UpdateStats(ctx context.Context, request *UpdateRequest, auditHistoryConfig AuditHistoryConfig) (stats *NodeStats, err error) // UpdateNodeInfo updates node dossier with info requested from the node itself like node type, email, wallet, capacity, and version. UpdateNodeInfo(ctx context.Context, node storj.NodeID, nodeInfo *InfoResponse) (stats *NodeDossier, err error) // UpdateUptime updates a single storagenode's uptime stats. @@ -430,7 +430,7 @@ func (service *Service) BatchUpdateStats(ctx context.Context, requests []*Update request.AuditsRequiredForVetting = service.config.Node.AuditCount request.UptimesRequiredForVetting = service.config.Node.UptimeCount } - return service.db.BatchUpdateStats(ctx, requests, service.config.UpdateStatsBatchSize) + return service.db.BatchUpdateStats(ctx, requests, service.config.UpdateStatsBatchSize, service.config.AuditHistory) } // UpdateStats all parts of single storagenode's stats. @@ -445,7 +445,7 @@ func (service *Service) UpdateStats(ctx context.Context, request *UpdateRequest) request.AuditsRequiredForVetting = service.config.Node.AuditCount request.UptimesRequiredForVetting = service.config.Node.UptimeCount - return service.db.UpdateStats(ctx, request) + return service.db.UpdateStats(ctx, request, service.config.AuditHistory) } // UpdateNodeInfo updates node dossier with info requested from the node itself like node type, email, wallet, capacity, and version. diff --git a/satellite/overlay/service_test.go b/satellite/overlay/service_test.go index 1257bb3fb..842e5c164 100644 --- a/satellite/overlay/service_test.go +++ b/satellite/overlay/service_test.go @@ -53,6 +53,16 @@ func testNodeSelectionConfig(auditCount int64, newNodeFraction float64, distinct } } +// testAuditHistoryConfig returns an AuditHistoryConfig with sensible test values. 
+func testAuditHistoryConfig() overlay.AuditHistoryConfig { + return overlay.AuditHistoryConfig{ + WindowSize: time.Hour, + TrackingPeriod: time.Hour, + GracePeriod: time.Hour, + OfflineThreshold: 0, + } +} + func testCache(ctx context.Context, t *testing.T, store overlay.DB) { valid1ID := testrand.NodeID() valid2ID := testrand.NodeID() @@ -62,7 +72,7 @@ func testCache(ctx context.Context, t *testing.T, store overlay.DB) { lastNet := "127.0.0" nodeSelectionConfig := testNodeSelectionConfig(0, 0, false) - serviceConfig := overlay.Config{Node: nodeSelectionConfig, UpdateStatsBatchSize: 100} + serviceConfig := overlay.Config{Node: nodeSelectionConfig, UpdateStatsBatchSize: 100, AuditHistory: testAuditHistoryConfig()} service := overlay.NewService(zaptest.NewLogger(t), store, serviceConfig) d := overlay.NodeCheckInInfo{ @@ -190,7 +200,7 @@ func TestRandomizedSelection(t *testing.T) { AuditLambda: 1, AuditWeight: 1, AuditDQ: 0.5, - }) + }, testAuditHistoryConfig()) require.NoError(t, err) } @@ -311,7 +321,7 @@ func TestRandomizedSelectionCache(t *testing.T) { AuditLambda: 1, AuditWeight: 1, AuditDQ: 0.5, - }) + }, testAuditHistoryConfig()) require.NoError(t, err) } @@ -768,7 +778,7 @@ func TestSuspendedSelection(t *testing.T) { AuditLambda: 1, AuditWeight: 1, AuditDQ: 0.5, - }) + }, testAuditHistoryConfig()) require.NoError(t, err) } diff --git a/satellite/overlay/statdb_test.go b/satellite/overlay/statdb_test.go index 9c69bb187..535da5ee0 100644 --- a/satellite/overlay/statdb_test.go +++ b/satellite/overlay/statdb_test.go @@ -173,7 +173,7 @@ func testDatabase(ctx context.Context, t *testing.T, cache overlay.DB) { AuditLambda: 0.123, AuditWeight: 0.456, AuditDQ: 0, // don't disqualify for any reason } - stats, err := cache.UpdateStats(ctx, updateReq) + stats, err := cache.UpdateStats(ctx, updateReq, testAuditHistoryConfig()) require.NoError(t, err) expectedAuditAlpha := updateReq.AuditLambda*auditAlpha + updateReq.AuditWeight @@ -186,7 +186,7 @@ func testDatabase(ctx 
context.Context, t *testing.T, cache overlay.DB) { updateReq.AuditOutcome = overlay.AuditFailure updateReq.IsUp = false - stats, err = cache.UpdateStats(ctx, updateReq) + stats, err = cache.UpdateStats(ctx, updateReq, testAuditHistoryConfig()) require.NoError(t, err) expectedAuditAlpha = updateReq.AuditLambda * auditAlpha diff --git a/satellite/overlay/suspension_test.go b/satellite/overlay/suspension_test.go index 8fde25385..a575f88af 100644 --- a/satellite/overlay/suspension_test.go +++ b/satellite/overlay/suspension_test.go @@ -124,7 +124,7 @@ func TestSuspendFailedAudit(t *testing.T) { AuditLambda: 1, AuditWeight: 1, AuditDQ: 0.6, - }) + }, testAuditHistoryConfig()) require.NoError(t, err) node, err = oc.Get(ctx, nodeID) diff --git a/satellite/satellitedb/overlaycache.go b/satellite/satellitedb/overlaycache.go index 0b99ce31d..612b440fd 100644 --- a/satellite/satellitedb/overlaycache.go +++ b/satellite/satellitedb/overlaycache.go @@ -326,7 +326,7 @@ func (cache *overlaycache) Reliable(ctx context.Context, criteria *overlay.NodeC } // BatchUpdateStats updates multiple storagenode's stats in one transaction. 
-func (cache *overlaycache) BatchUpdateStats(ctx context.Context, updateRequests []*overlay.UpdateRequest, batchSize int) (failed storj.NodeIDList, err error) { +func (cache *overlaycache) BatchUpdateStats(ctx context.Context, updateRequests []*overlay.UpdateRequest, batchSize int, auditHistoryConfig overlay.AuditHistoryConfig) (failed storj.NodeIDList, err error) { defer mon.Task()(&ctx)(&err) if len(updateRequests) == 0 { return failed, nil @@ -344,6 +344,7 @@ func (cache *overlaycache) BatchUpdateStats(ctx context.Context, updateRequests } } + auditTime := time.Now().UTC() doAppendAll := true err = cache.db.WithTx(ctx, func(ctx context.Context, tx *dbx.Tx) (err error) { _, err = tx.Tx.ExecContext(ctx, "SET TRANSACTION ISOLATION LEVEL SERIALIZABLE") @@ -367,7 +368,13 @@ func (cache *overlaycache) BatchUpdateStats(ctx context.Context, updateRequests continue } - updateNodeStats := cache.populateUpdateNodeStats(dbNode, updateReq) + onlineScore, err := cache.updateAuditHistoryWithTx(ctx, tx, updateReq.NodeID, auditTime, updateReq.IsUp, auditHistoryConfig) + if err != nil { + doAppendAll = false + return err + } + + updateNodeStats := cache.populateUpdateNodeStats(dbNode, updateReq, onlineScore) sql := buildUpdateStatement(updateNodeStats) @@ -416,7 +423,7 @@ func (cache *overlaycache) BatchUpdateStats(ctx context.Context, updateRequests } // UpdateStats a single storagenode's stats in the db. 
-func (cache *overlaycache) UpdateStats(ctx context.Context, updateReq *overlay.UpdateRequest) (stats *overlay.NodeStats, err error) { +func (cache *overlaycache) UpdateStats(ctx context.Context, updateReq *overlay.UpdateRequest, auditHistoryConfig overlay.AuditHistoryConfig) (stats *overlay.NodeStats, err error) { defer mon.Task()(&ctx)(&err) nodeID := updateReq.NodeID @@ -439,7 +446,12 @@ func (cache *overlaycache) UpdateStats(ctx context.Context, updateReq *overlay.U return nil } - updateFields := cache.populateUpdateFields(dbNode, updateReq) + onlineScore, err := cache.updateAuditHistoryWithTx(ctx, tx, updateReq.NodeID, time.Now().UTC(), updateReq.IsUp, auditHistoryConfig) + if err != nil { + return err + } + + updateFields := cache.populateUpdateFields(dbNode, updateReq, onlineScore) dbNode, err = tx.Update_Node_By_Id(ctx, dbx.Node_Id(nodeID.Bytes()), updateFields) if err != nil { return err @@ -1145,7 +1157,7 @@ type updateNodeStats struct { Contained boolField } -func (cache *overlaycache) populateUpdateNodeStats(dbNode *dbx.Node, updateReq *overlay.UpdateRequest) updateNodeStats { +func (cache *overlaycache) populateUpdateNodeStats(dbNode *dbx.Node, updateReq *overlay.UpdateRequest, auditOnlineScore float64) updateNodeStats { // there are three audit outcomes: success, failure, and unknown // if a node fails enough audits, it gets disqualified // if a node gets enough "unknown" audits, it gets put into suspension @@ -1206,6 +1218,7 @@ func (cache *overlaycache) populateUpdateNodeStats(dbNode *dbx.Node, updateReq * mon.FloatVal("audit_reputation_beta").Observe(auditBeta) //locked mon.FloatVal("unknown_audit_reputation_alpha").Observe(unknownAuditAlpha) //locked mon.FloatVal("unknown_audit_reputation_beta").Observe(unknownAuditBeta) //locked + mon.FloatVal("audit_online_score").Observe(auditOnlineScore) //locked totalUptimeCount := dbNode.TotalUptimeCount if updateReq.IsUp { @@ -1264,6 +1277,9 @@ func (cache *overlaycache) populateUpdateNodeStats(dbNode 
*dbx.Node, updateReq * updateFields.UnknownAuditSuspended = timeField{set: true, isNil: true} } + // TODO when we verify that auditOnlineScore is calculated as expected, add offline suspension + // TODO when we verify that offline suspension works as expected, add DQ from offline suspension + if updateReq.IsUp { updateFields.UptimeSuccessCount = int64Field{set: true, value: dbNode.UptimeSuccessCount + 1} updateFields.LastContactSuccess = timeField{set: true, value: time.Now()} @@ -1281,9 +1297,9 @@ func (cache *overlaycache) populateUpdateNodeStats(dbNode *dbx.Node, updateReq * return updateFields } -func (cache *overlaycache) populateUpdateFields(dbNode *dbx.Node, updateReq *overlay.UpdateRequest) dbx.Node_Update_Fields { +func (cache *overlaycache) populateUpdateFields(dbNode *dbx.Node, updateReq *overlay.UpdateRequest, auditOnlineScore float64) dbx.Node_Update_Fields { - update := cache.populateUpdateNodeStats(dbNode, updateReq) + update := cache.populateUpdateNodeStats(dbNode, updateReq, auditOnlineScore) updateFields := dbx.Node_Update_Fields{} if update.VettedAt.set { updateFields.VettedAt = dbx.Node_VettedAt(update.VettedAt.value) diff --git a/satellite/satellitedb/overlaycache_test.go b/satellite/satellitedb/overlaycache_test.go index 072f7da32..e7462deba 100644 --- a/satellite/satellitedb/overlaycache_test.go +++ b/satellite/satellitedb/overlaycache_test.go @@ -5,6 +5,7 @@ package satellitedb_test import ( "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -28,7 +29,7 @@ func TestUpdateStats(t *testing.T) { // nodeA: 1 audit, 2 uptime -> unvetted updateReq := &overlay.UpdateRequest{NodeID: nodeA.ID(), AuditOutcome: overlay.AuditFailure, IsUp: false, AuditsRequiredForVetting: numAudits, UptimesRequiredForVetting: numUptimes} - nodeStats, err := cache.UpdateStats(ctx, updateReq) + nodeStats, err := cache.UpdateStats(ctx, updateReq, testAuditHistoryConfig()) require.NoError(t, err) assert.Nil(t, 
nodeStats.VettedAt) assert.EqualValues(t, 1, nodeStats.AuditCount) @@ -36,7 +37,7 @@ func TestUpdateStats(t *testing.T) { // nodeA: 2 audits, 2 uptimes -> unvetted updateReq = &overlay.UpdateRequest{NodeID: nodeA.ID(), AuditOutcome: overlay.AuditFailure, IsUp: false, AuditsRequiredForVetting: numAudits, UptimesRequiredForVetting: numUptimes} - nodeStats, err = cache.UpdateStats(ctx, updateReq) + nodeStats, err = cache.UpdateStats(ctx, updateReq, testAuditHistoryConfig()) require.NoError(t, err) assert.Nil(t, nodeStats.VettedAt) assert.EqualValues(t, 2, nodeStats.AuditCount) @@ -44,7 +45,7 @@ func TestUpdateStats(t *testing.T) { // nodeA: 3 audits, 3 uptimes -> vetted updateReq = &overlay.UpdateRequest{NodeID: nodeA.ID(), AuditOutcome: overlay.AuditSuccess, IsUp: true, AuditsRequiredForVetting: numAudits, UptimesRequiredForVetting: numUptimes} - nodeStats, err = cache.UpdateStats(ctx, updateReq) + nodeStats, err = cache.UpdateStats(ctx, updateReq, testAuditHistoryConfig()) require.NoError(t, err) assert.NotNil(t, nodeStats.VettedAt) assert.EqualValues(t, 3, nodeStats.AuditCount) @@ -52,7 +53,7 @@ func TestUpdateStats(t *testing.T) { // nodeB: 1 audit, 3 uptimes -> unvetted updateReq = &overlay.UpdateRequest{NodeID: nodeB.ID(), AuditOutcome: overlay.AuditSuccess, IsUp: true, AuditsRequiredForVetting: numAudits, UptimesRequiredForVetting: numUptimes} - nodeStats, err = cache.UpdateStats(ctx, updateReq) + nodeStats, err = cache.UpdateStats(ctx, updateReq, testAuditHistoryConfig()) require.NoError(t, err) assert.Nil(t, nodeStats.VettedAt) assert.EqualValues(t, 1, nodeStats.AuditCount) @@ -60,7 +61,7 @@ func TestUpdateStats(t *testing.T) { // nodeB: 2 audits, 3 uptimes -> vetted updateReq = &overlay.UpdateRequest{NodeID: nodeB.ID(), AuditOutcome: overlay.AuditFailure, IsUp: false, AuditsRequiredForVetting: numAudits, UptimesRequiredForVetting: numUptimes} - nodeStats, err = cache.UpdateStats(ctx, updateReq) + nodeStats, err = cache.UpdateStats(ctx, updateReq, 
testAuditHistoryConfig()) require.NoError(t, err) assert.NotNil(t, nodeStats.VettedAt) assert.EqualValues(t, 2, nodeStats.AuditCount) @@ -68,7 +69,7 @@ func TestUpdateStats(t *testing.T) { // Don't overwrite node b's vetted_at timestamp updateReq = &overlay.UpdateRequest{NodeID: nodeB.ID(), AuditOutcome: overlay.AuditSuccess, IsUp: true, AuditsRequiredForVetting: numAudits, UptimesRequiredForVetting: numUptimes} - nodeStats2, err := cache.UpdateStats(ctx, updateReq) + nodeStats2, err := cache.UpdateStats(ctx, updateReq, testAuditHistoryConfig()) require.NoError(t, err) assert.NotNil(t, nodeStats2.VettedAt) assert.Equal(t, nodeStats.VettedAt, nodeStats2.VettedAt) @@ -94,7 +95,7 @@ func TestBatchUpdateStats(t *testing.T) { updateReqA := &overlay.UpdateRequest{NodeID: nodeA.ID(), AuditOutcome: overlay.AuditFailure, IsUp: false, AuditsRequiredForVetting: numAudits, UptimesRequiredForVetting: numUptimes} updateReqB := &overlay.UpdateRequest{NodeID: nodeB.ID(), AuditOutcome: overlay.AuditSuccess, IsUp: true, AuditsRequiredForVetting: numAudits, UptimesRequiredForVetting: numUptimes} updateReqs := []*overlay.UpdateRequest{updateReqA, updateReqB} - failed, err := cache.BatchUpdateStats(ctx, updateReqs, batchSize) + failed, err := cache.BatchUpdateStats(ctx, updateReqs, batchSize, testAuditHistoryConfig()) require.NoError(t, err) assert.Len(t, failed, 0) @@ -114,7 +115,7 @@ func TestBatchUpdateStats(t *testing.T) { updateReqA = &overlay.UpdateRequest{NodeID: nodeA.ID(), AuditOutcome: overlay.AuditFailure, IsUp: false, AuditsRequiredForVetting: numAudits, UptimesRequiredForVetting: numUptimes} updateReqB = &overlay.UpdateRequest{NodeID: nodeB.ID(), AuditOutcome: overlay.AuditFailure, IsUp: false, AuditsRequiredForVetting: numAudits, UptimesRequiredForVetting: numUptimes} updateReqs = []*overlay.UpdateRequest{updateReqA, updateReqB} - failed, err = cache.BatchUpdateStats(ctx, updateReqs, batchSize) + failed, err = cache.BatchUpdateStats(ctx, updateReqs, batchSize, 
testAuditHistoryConfig()) require.NoError(t, err) assert.Len(t, failed, 0) @@ -134,7 +135,7 @@ func TestBatchUpdateStats(t *testing.T) { updateReqA = &overlay.UpdateRequest{NodeID: nodeA.ID(), AuditOutcome: overlay.AuditSuccess, IsUp: true, AuditsRequiredForVetting: numAudits, UptimesRequiredForVetting: numUptimes} updateReqB = &overlay.UpdateRequest{NodeID: nodeB.ID(), AuditOutcome: overlay.AuditSuccess, IsUp: true, AuditsRequiredForVetting: numAudits, UptimesRequiredForVetting: numUptimes} updateReqs = []*overlay.UpdateRequest{updateReqA, updateReqB} - failed, err = cache.BatchUpdateStats(ctx, updateReqs, batchSize) + failed, err = cache.BatchUpdateStats(ctx, updateReqs, batchSize, testAuditHistoryConfig()) require.NoError(t, err) assert.Len(t, failed, 0) @@ -152,3 +153,13 @@ func TestBatchUpdateStats(t *testing.T) { assert.EqualValues(t, 4, nB2.Reputation.UptimeCount) }) } + +// testAuditHistoryConfig returns an AuditHistoryConfig with sensible test values. +func testAuditHistoryConfig() overlay.AuditHistoryConfig { + return overlay.AuditHistoryConfig{ + WindowSize: time.Hour, + TrackingPeriod: time.Hour, + GracePeriod: time.Hour, + OfflineThreshold: 0, + } +}