satellite/{audit,overlay,satellitedb}: implement unknown audit reputation and suspension

* change overlay.UpdateStats to allow a third audit outcome. Now it can
handle successful, failed, and unknown audits.
* when "unknown audit reputation"
(unknownAuditAlpha/(unknownAuditAlpha+unknownAuditBeta)) falls below the
DQ threshold, put node into suspension.
* when unknown audit reputation goes above the DQ threshold, remove node
from suspension.
* record unknown audits from audit reporter.
* add basic tests around unknown audits and suspension.

Change-Id: I125f06f3af52e8a29ba48dc19361821a9ff1daa1
This commit is contained in:
Moby von Briesen 2020-03-09 11:35:54 -04:00 committed by Jennifer Li Johnson
parent 52590197c2
commit 8b72181a1f
13 changed files with 413 additions and 110 deletions

View File

@ -89,6 +89,8 @@ storj.io/storj/satellite/repair/repairer."time_for_repair" FloatVal
storj.io/storj/satellite/repair/repairer."time_since_checker_queue" FloatVal
storj.io/storj/satellite/satellitedb."audit_reputation_alpha" FloatVal
storj.io/storj/satellite/satellitedb."audit_reputation_beta" FloatVal
storj.io/storj/satellite/satellitedb."unknown_audit_reputation_alpha" FloatVal
storj.io/storj/satellite/satellitedb."unknown_audit_reputation_beta" FloatVal
storj.io/storj/storage/filestore."open_file_in_trash" Meter
storj.io/storj/storagenode/contact."satellite_contact_request" Meter
storj.io/storj/storagenode/gracefulexit."satellite_gracefulexit_request" Meter

View File

@ -218,7 +218,7 @@ func dqNodes(ctx *testcontext.Context, planet *testplanet.Planet) (map[storj.Nod
updateRequests = append(updateRequests, &overlay.UpdateRequest{
NodeID: n.ID(),
IsUp: true,
AuditSuccess: false,
AuditOutcome: overlay.AuditFailure,
})
}

View File

@ -227,7 +227,7 @@ func TestDisqualifiedNodeRemainsDisqualified(t *testing.T) {
_, err = satellitePeer.DB.OverlayCache().BatchUpdateStats(ctx, []*overlay.UpdateRequest{{
NodeID: disqualifiedNode.ID(),
IsUp: true,
AuditSuccess: true,
AuditOutcome: overlay.AuditSuccess,
AuditLambda: 0, // forget about history
AuditWeight: 1,
AuditDQ: 0, // make sure new reputation scores are larger than the DQ thresholds

View File

@ -54,12 +54,14 @@ func (reporter *Reporter) RecordAudits(ctx context.Context, req Report, path sto
successes := req.Successes
fails := req.Fails
unknowns := req.Unknown
offlines := req.Offlines
pendingAudits := req.PendingAudits
reporter.log.Debug("Reporting audits",
zap.Int("successes", len(successes)),
zap.Int("failures", len(fails)),
zap.Int("unknowns", len(unknowns)),
zap.Int("offlines", len(offlines)),
zap.Int("pending", len(pendingAudits)),
zap.Binary("Segment", []byte(path)),
@ -70,7 +72,7 @@ func (reporter *Reporter) RecordAudits(ctx context.Context, req Report, path sto
tries := 0
for tries <= reporter.maxRetries {
if len(successes) == 0 && len(fails) == 0 && len(offlines) == 0 && len(pendingAudits) == 0 {
if len(successes) == 0 && len(fails) == 0 && len(unknowns) == 0 && len(offlines) == 0 && len(pendingAudits) == 0 {
return Report{}, nil
}
@ -88,6 +90,12 @@ func (reporter *Reporter) RecordAudits(ctx context.Context, req Report, path sto
errlist.Add(err)
}
}
if len(unknowns) > 0 {
unknowns, err = reporter.recordAuditUnknownStatus(ctx, unknowns)
if err != nil {
errlist.Add(err)
}
}
// We do not report offline nodes to the overlay at this time; see V3-3025.
if len(offlines) > 0 && reportOfflineDuringAudit {
offlines, err = reporter.recordOfflineStatus(ctx, offlines)
@ -111,13 +119,14 @@ func (reporter *Reporter) RecordAudits(ctx context.Context, req Report, path sto
Successes: successes,
Fails: fails,
Offlines: offlines,
Unknown: unknowns,
PendingAudits: pendingAudits,
}, errs.Combine(Error.New("some nodes failed to be updated in overlay"), err)
}
return Report{}, nil
}
// recordAuditFailStatus updates nodeIDs in overlay with isup=true, auditsuccess=false
// recordAuditFailStatus updates nodeIDs in overlay with isup=true, auditoutcome=fail
func (reporter *Reporter) recordAuditFailStatus(ctx context.Context, failedAuditNodeIDs storj.NodeIDList) (failed storj.NodeIDList, err error) {
defer mon.Task()(&ctx)(&err)
@ -126,16 +135,35 @@ func (reporter *Reporter) recordAuditFailStatus(ctx context.Context, failedAudit
updateRequests[i] = &overlay.UpdateRequest{
NodeID: nodeID,
IsUp: true,
AuditSuccess: false,
AuditOutcome: overlay.AuditFailure,
}
}
if len(updateRequests) > 0 {
failed, err = reporter.overlay.BatchUpdateStats(ctx, updateRequests)
if err != nil || len(failed) > 0 {
reporter.log.Debug("failed to record Failed Nodes ", zap.Strings("NodeIDs", failed.Strings()))
return failed, errs.Combine(Error.New("failed to record some audit fail statuses in overlay"), err)
failed, err = reporter.overlay.BatchUpdateStats(ctx, updateRequests)
if err != nil || len(failed) > 0 {
reporter.log.Debug("failed to record Failed Nodes ", zap.Strings("NodeIDs", failed.Strings()))
return failed, errs.Combine(Error.New("failed to record some audit fail statuses in overlay"), err)
}
return nil, nil
}
// recordAuditUnknownStatus updates nodeIDs in overlay with isup=true, auditoutcome=unknown
func (reporter *Reporter) recordAuditUnknownStatus(ctx context.Context, unknownAuditNodeIDs storj.NodeIDList) (failed storj.NodeIDList, err error) {
defer mon.Task()(&ctx)(&err)
updateRequests := make([]*overlay.UpdateRequest, len(unknownAuditNodeIDs))
for i, nodeID := range unknownAuditNodeIDs {
updateRequests[i] = &overlay.UpdateRequest{
NodeID: nodeID,
IsUp: true,
AuditOutcome: overlay.AuditUnknown,
}
}
failed, err = reporter.overlay.BatchUpdateStats(ctx, updateRequests)
if err != nil || len(failed) > 0 {
reporter.log.Debug("failed to record Unknown Nodes ", zap.Strings("NodeIDs", failed.Strings()))
return failed, errs.Combine(Error.New("failed to record some audit unknown statuses in overlay"), err)
}
return nil, nil
}
@ -160,7 +188,7 @@ func (reporter *Reporter) recordOfflineStatus(ctx context.Context, offlineNodeID
return nil, nil
}
// recordAuditSuccessStatus updates nodeIDs in overlay with isup=true, auditsuccess=true
// recordAuditSuccessStatus updates nodeIDs in overlay with isup=true, auditoutcome=success
func (reporter *Reporter) recordAuditSuccessStatus(ctx context.Context, successNodeIDs storj.NodeIDList) (failed storj.NodeIDList, err error) {
defer mon.Task()(&ctx)(&err)
@ -169,16 +197,14 @@ func (reporter *Reporter) recordAuditSuccessStatus(ctx context.Context, successN
updateRequests[i] = &overlay.UpdateRequest{
NodeID: nodeID,
IsUp: true,
AuditSuccess: true,
AuditOutcome: overlay.AuditSuccess,
}
}
if len(updateRequests) > 0 {
failed, err = reporter.overlay.BatchUpdateStats(ctx, updateRequests)
if err != nil || len(failed) > 0 {
reporter.log.Debug("failed to record Success Nodes ", zap.Strings("NodeIDs", failed.Strings()))
return failed, errs.Combine(Error.New("failed to record some audit success statuses in overlay"), err)
}
failed, err = reporter.overlay.BatchUpdateStats(ctx, updateRequests)
if err != nil || len(failed) > 0 {
reporter.log.Debug("failed to record Success Nodes ", zap.Strings("NodeIDs", failed.Strings()))
return failed, errs.Combine(Error.New("failed to record some audit success statuses in overlay"), err)
}
return nil, nil
}
@ -201,35 +227,33 @@ func (reporter *Reporter) recordPendingAudits(ctx context.Context, pendingAudits
updateRequests = append(updateRequests, &overlay.UpdateRequest{
NodeID: pendingAudit.NodeID,
IsUp: true,
AuditSuccess: false,
AuditOutcome: overlay.AuditFailure,
})
}
}
if len(updateRequests) > 0 {
failedBatch, err := reporter.overlay.BatchUpdateStats(ctx, updateRequests)
if err != nil {
errlist.Add(err)
failedBatch, err := reporter.overlay.BatchUpdateStats(ctx, updateRequests)
if err != nil {
errlist.Add(err)
}
if len(failedBatch) > 0 {
pendingMap := make(map[storj.NodeID]*PendingAudit)
for _, pendingAudit := range pendingAudits {
pendingMap[pendingAudit.NodeID] = pendingAudit
}
if len(failedBatch) > 0 {
pendingMap := make(map[storj.NodeID]*PendingAudit)
for _, pendingAudit := range pendingAudits {
pendingMap[pendingAudit.NodeID] = pendingAudit
}
for _, nodeID := range failedBatch {
pending, ok := pendingMap[nodeID]
if ok {
failed = append(failed, pending)
}
for _, nodeID := range failedBatch {
pending, ok := pendingMap[nodeID]
if ok {
failed = append(failed, pending)
}
}
}
if len(failed) > 0 {
for _, v := range failed {
reporter.log.Debug("failed to record Pending Nodes ", zap.Stringer("NodeID", v.NodeID), zap.String("Path", v.Path))
}
return failed, errs.Combine(Error.New("failed to record some pending audits"), errlist.Err())
if len(failed) > 0 {
for _, v := range failed {
reporter.log.Debug("failed to record Pending Nodes ", zap.Stringer("NodeID", v.NodeID), zap.String("Path", v.Path))
}
return failed, errs.Combine(Error.New("failed to record some pending audits"), errlist.Err())
}
return nil, nil
}

View File

@ -1258,13 +1258,12 @@ func TestReverifyUnknownError(t *testing.T) {
require.Len(t, report.Unknown, 1)
require.Equal(t, report.Unknown[0], badNode)
// TODO uncomment this stuff when suspension mode is implemented
//// record audit
//_, err = audits.Reporter.RecordAudits(ctx, report, path)
//require.NoError(t, err)
//
//// make sure that pending audit is removed by the reporter when audit is recorded
//_, err = containment.Get(ctx, pending.NodeID)
//require.True(t, audit.ErrContainedNotFound.Has(err))
// record audit
_, err = audits.Reporter.RecordAudits(ctx, report, path)
require.NoError(t, err)
// make sure that pending audit is removed by the reporter when audit is recorded
_, err = containment.Get(ctx, pending.NodeID)
require.True(t, audit.ErrContainedNotFound.Has(err))
})
}

View File

@ -77,9 +77,13 @@ func BenchmarkOverlay(b *testing.B) {
b.Run("UpdateStats", func(b *testing.B) {
for i := 0; i < b.N; i++ {
id := all[i%len(all)]
outcome := overlay.AuditFailure
if i&1 == 0 {
outcome = overlay.AuditSuccess
}
_, err := overlaydb.UpdateStats(ctx, &overlay.UpdateRequest{
NodeID: id,
AuditSuccess: i&1 == 0,
AuditOutcome: outcome,
IsUp: i&2 == 0,
})
require.NoError(b, err)
@ -90,9 +94,13 @@ func BenchmarkOverlay(b *testing.B) {
var updateRequests []*overlay.UpdateRequest
for i := 0; i < b.N; i++ {
id := all[i%len(all)]
outcome := overlay.AuditFailure
if i&1 == 0 {
outcome = overlay.AuditSuccess
}
updateRequests = append(updateRequests, &overlay.UpdateRequest{
NodeID: id,
AuditSuccess: i&1 == 0,
AuditOutcome: outcome,
IsUp: i&2 == 0,
})

View File

@ -137,7 +137,7 @@ func TestNodeSelection(t *testing.T) {
_, err := satellite.DB.OverlayCache().UpdateStats(ctx, &overlay.UpdateRequest{
NodeID: node.ID(),
IsUp: true,
AuditSuccess: true,
AuditOutcome: overlay.AuditSuccess,
AuditLambda: 1, AuditWeight: 1, AuditDQ: 0.5,
})
require.NoError(t, err)
@ -162,7 +162,7 @@ func TestNodeSelectionWithBatch(t *testing.T) {
_, err := satellite.DB.OverlayCache().BatchUpdateStats(ctx, []*overlay.UpdateRequest{{
NodeID: node.ID(),
IsUp: true,
AuditSuccess: true,
AuditOutcome: overlay.AuditSuccess,
AuditLambda: 1, AuditWeight: 1, AuditDQ: 0.5,
}}, 1)
require.NoError(t, err)
@ -282,7 +282,7 @@ func TestNodeSelectionGracefulExit(t *testing.T) {
_, err := satellite.DB.OverlayCache().UpdateStats(ctx, &overlay.UpdateRequest{
NodeID: node.ID(),
IsUp: true,
AuditSuccess: true,
AuditOutcome: overlay.AuditSuccess,
AuditLambda: 1, AuditWeight: 1, AuditDQ: 0.5,
})
require.NoError(t, err)
@ -485,7 +485,7 @@ func TestDistinctIPs(t *testing.T) {
_, err := satellite.DB.OverlayCache().UpdateStats(ctx, &overlay.UpdateRequest{
NodeID: planet.StorageNodes[i].ID(),
IsUp: true,
AuditSuccess: true,
AuditOutcome: overlay.AuditSuccess,
AuditLambda: 1,
AuditWeight: 1,
AuditDQ: 0.5,
@ -513,7 +513,7 @@ func TestDistinctIPsWithBatch(t *testing.T) {
_, err := satellite.DB.OverlayCache().BatchUpdateStats(ctx, []*overlay.UpdateRequest{{
NodeID: planet.StorageNodes[i].ID(),
IsUp: true,
AuditSuccess: true,
AuditOutcome: overlay.AuditSuccess,
AuditLambda: 1,
AuditWeight: 1,
AuditDQ: 0.5,

View File

@ -94,6 +94,11 @@ type DB interface {
// DisqualifyNode disqualifies a storage node.
DisqualifyNode(ctx context.Context, nodeID storj.NodeID) (err error)
// SuspendNode suspends a storage node.
SuspendNode(ctx context.Context, nodeID storj.NodeID, suspendedAt time.Time) (err error)
// UnsuspendNode unsuspends a storage node.
UnsuspendNode(ctx context.Context, nodeID storj.NodeID) (err error)
}
// NodeCheckInInfo contains all the info that will be updated when a node checkins
@ -128,10 +133,22 @@ type NodeCriteria struct {
DistinctIP bool
}
// AuditType is an enum representing the outcome of a particular audit reported to the overlay.
type AuditType int
const (
// AuditSuccess represents a successful audit.
AuditSuccess AuditType = iota
// AuditFailure represents a failed audit.
AuditFailure
// AuditUnknown represents an audit that resulted in an unknown error from the node.
AuditUnknown
)
// UpdateRequest is used to update a node status.
type UpdateRequest struct {
NodeID storj.NodeID
AuditSuccess bool
AuditOutcome AuditType
IsUp bool
// n.b. these are set values from the satellite.
// They are part of the UpdateRequest struct in order to be
@ -169,6 +186,7 @@ type NodeDossier struct {
Version pb.NodeVersion
Contained bool
Disqualified *time.Time
Suspended *time.Time
PieceCount int64
ExitStatus ExitStatus
CreatedAt time.Time
@ -178,16 +196,19 @@ type NodeDossier struct {
// NodeStats contains statistics about a node.
type NodeStats struct {
Latency90 int64
AuditSuccessCount int64
AuditCount int64
UptimeSuccessCount int64
UptimeCount int64
LastContactSuccess time.Time
LastContactFailure time.Time
AuditReputationAlpha float64
AuditReputationBeta float64
Disqualified *time.Time
Latency90 int64
AuditSuccessCount int64
AuditCount int64
UptimeSuccessCount int64
UptimeCount int64
LastContactSuccess time.Time
LastContactFailure time.Time
AuditReputationAlpha float64
AuditReputationBeta float64
Disqualified *time.Time
UnknownAuditReputationAlpha float64
UnknownAuditReputationBeta float64
Suspended *time.Time
}
// NodeLastContact contains the ID, address, and timestamp

View File

@ -134,7 +134,7 @@ func testCache(ctx context.Context, t *testing.T, store overlay.DB) {
stats, err := service.UpdateStats(ctx, &overlay.UpdateRequest{
NodeID: valid1ID,
IsUp: true,
AuditSuccess: false,
AuditOutcome: overlay.AuditFailure,
})
require.NoError(t, err)
newAuditAlpha := 1
@ -151,7 +151,7 @@ func testCache(ctx context.Context, t *testing.T, store overlay.DB) {
_, err = service.BatchUpdateStats(ctx, []*overlay.UpdateRequest{{
NodeID: valid2ID,
IsUp: false,
AuditSuccess: true,
AuditOutcome: overlay.AuditSuccess,
}})
require.NoError(t, err)
dossier, err := service.Get(ctx, valid2ID)
@ -197,7 +197,7 @@ func TestRandomizedSelection(t *testing.T) {
_, err = cache.UpdateStats(ctx, &overlay.UpdateRequest{
NodeID: newID,
IsUp: true,
AuditSuccess: true,
AuditOutcome: overlay.AuditSuccess,
AuditLambda: 1,
AuditWeight: 1,
AuditDQ: 0.5,
@ -299,7 +299,7 @@ func TestKnownReliable(t *testing.T) {
// Disqualify storage node #0
stats, err := service.UpdateStats(ctx, &overlay.UpdateRequest{
NodeID: planet.StorageNodes[0].ID(),
AuditSuccess: false,
AuditOutcome: overlay.AuditFailure,
})
require.NoError(t, err)
require.NotNil(t, stats.Disqualified)

View File

@ -51,7 +51,7 @@ func testDatabase(ctx context.Context, t *testing.T, cache overlay.DB) {
// update stats so node disqualification is triggered
_, err = cache.UpdateStats(ctx, &overlay.UpdateRequest{
NodeID: tt.nodeID,
AuditSuccess: true,
AuditOutcome: overlay.AuditSuccess,
IsUp: true,
AuditLambda: 1, AuditWeight: 1,
AuditDQ: 0.9,
@ -133,7 +133,7 @@ func testDatabase(ctx context.Context, t *testing.T, cache overlay.DB) {
updateReq := &overlay.UpdateRequest{
NodeID: nodeID,
AuditSuccess: true,
AuditOutcome: overlay.AuditSuccess,
IsUp: true,
AuditLambda: 0.123, AuditWeight: 0.456,
AuditDQ: 0, // don't disqualify for any reason
@ -149,7 +149,7 @@ func testDatabase(ctx context.Context, t *testing.T, cache overlay.DB) {
auditAlpha = expectedAuditAlpha
auditBeta = expectedAuditBeta
updateReq.AuditSuccess = false
updateReq.AuditOutcome = overlay.AuditFailure
updateReq.IsUp = false
stats, err = cache.UpdateStats(ctx, updateReq)
require.NoError(t, err)

View File

@ -0,0 +1,131 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package overlay_test
import (
"testing"
"time"
"github.com/stretchr/testify/require"
"storj.io/common/testcontext"
"storj.io/storj/private/testplanet"
"storj.io/storj/satellite/overlay"
)
// TestSuspendBasic ensures that we can suspend a node using overlayService.SuspendNode and that we can unsuspend a node using overlayservice.UnsuspendNode
func TestSuspendBasic(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 0,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
nodeID := planet.StorageNodes[0].ID()
oc := planet.Satellites[0].Overlay.DB
node, err := oc.Get(ctx, nodeID)
require.NoError(t, err)
require.Nil(t, node.Suspended)
timeToSuspend := time.Now().UTC().Truncate(time.Second)
err = oc.SuspendNode(ctx, nodeID, timeToSuspend)
require.NoError(t, err)
node, err = oc.Get(ctx, nodeID)
require.NoError(t, err)
require.NotNil(t, node.Suspended)
require.True(t, node.Suspended.Equal(timeToSuspend))
err = oc.UnsuspendNode(ctx, nodeID)
require.NoError(t, err)
node, err = oc.Get(ctx, nodeID)
require.NoError(t, err)
require.Nil(t, node.Suspended)
})
}
// TestSuspendWithUpdateStats ensures that a node goes into suspension node from getting enough unknown audits, and gets removed from getting enough successful audits.
func TestSuspendWithUpdateStats(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 0,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
nodeID := planet.StorageNodes[0].ID()
oc := planet.Satellites[0].Overlay.Service
node, err := oc.Get(ctx, nodeID)
require.NoError(t, err)
require.Nil(t, node.Suspended)
testStartTime := time.Now()
// give node one unknown audit - bringing unknown audit rep to 0.5, and suspending node
_, err = oc.UpdateStats(ctx, &overlay.UpdateRequest{
NodeID: nodeID,
AuditOutcome: overlay.AuditUnknown,
IsUp: true,
AuditLambda: 1,
AuditWeight: 1,
AuditDQ: 0.6,
})
require.NoError(t, err)
node, err = oc.Get(ctx, nodeID)
require.NoError(t, err)
require.NotNil(t, node.Suspended)
require.True(t, node.Suspended.After(testStartTime))
// expect node is not disqualified and that normal audit alpha/beta remain unchanged
require.Nil(t, node.Disqualified)
require.EqualValues(t, node.Reputation.AuditReputationAlpha, 1)
require.EqualValues(t, node.Reputation.AuditReputationBeta, 0)
// give node two successful audits - bringing unknown audit rep to 0.75, and unsuspending node
for i := 0; i < 2; i++ {
_, err = oc.UpdateStats(ctx, &overlay.UpdateRequest{
NodeID: nodeID,
AuditOutcome: overlay.AuditSuccess,
IsUp: true,
AuditLambda: 1,
AuditWeight: 1,
AuditDQ: 0.6,
})
require.NoError(t, err)
}
node, err = oc.Get(ctx, nodeID)
require.NoError(t, err)
require.Nil(t, node.Suspended)
})
}
// TestSuspendFailedAudit ensures that a node is not suspended for a failed audit.
func TestSuspendFailedAudit(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 0,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
nodeID := planet.StorageNodes[0].ID()
oc := planet.Satellites[0].Overlay.DB
node, err := oc.Get(ctx, nodeID)
require.NoError(t, err)
require.Nil(t, node.Disqualified)
require.Nil(t, node.Suspended)
// give node one failed audit - bringing audit rep to 0.5, and disqualifying node
// expect that suspended field and unknown audit reputation remain unchanged
_, err = oc.UpdateStats(ctx, &overlay.UpdateRequest{
NodeID: nodeID,
AuditOutcome: overlay.AuditFailure,
IsUp: true,
AuditLambda: 1,
AuditWeight: 1,
AuditDQ: 0.6,
})
require.NoError(t, err)
node, err = oc.Get(ctx, nodeID)
require.NoError(t, err)
require.NotNil(t, node.Disqualified)
require.Nil(t, node.Suspended)
require.EqualValues(t, node.Reputation.UnknownAuditReputationAlpha, 1)
require.EqualValues(t, node.Reputation.UnknownAuditReputationBeta, 0)
})
}

View File

@ -328,7 +328,7 @@ func (repairer *SegmentRepairer) updateAuditFailStatus(ctx context.Context, fail
updateRequests[i] = &overlay.UpdateRequest{
NodeID: nodeID,
IsUp: true,
AuditSuccess: false,
AuditOutcome: overlay.AuditFailure,
}
}
if len(updateRequests) > 0 {

View File

@ -802,6 +802,38 @@ func (cache *overlaycache) DisqualifyNode(ctx context.Context, nodeID storj.Node
return nil
}
// SuspendNode suspends a storage node.
func (cache *overlaycache) SuspendNode(ctx context.Context, nodeID storj.NodeID, suspendedAt time.Time) (err error) {
defer mon.Task()(&ctx)(&err)
updateFields := dbx.Node_Update_Fields{}
updateFields.Suspended = dbx.Node_Suspended(suspendedAt.UTC())
dbNode, err := cache.db.Update_Node_By_Id(ctx, dbx.Node_Id(nodeID.Bytes()), updateFields)
if err != nil {
return err
}
if dbNode == nil {
return errs.New("unable to get node by ID: %v", nodeID)
}
return nil
}
// UnsuspendNode unsuspends a storage node.
func (cache *overlaycache) UnsuspendNode(ctx context.Context, nodeID storj.NodeID) (err error) {
defer mon.Task()(&ctx)(&err)
updateFields := dbx.Node_Update_Fields{}
updateFields.Suspended = dbx.Node_Suspended_Null()
dbNode, err := cache.db.Update_Node_By_Id(ctx, dbx.Node_Id(nodeID.Bytes()), updateFields)
if err != nil {
return err
}
if dbNode == nil {
return errs.New("unable to get node by ID: %v", nodeID)
}
return nil
}
// AllPieceCounts returns a map of node IDs to piece counts from the db.
// NB: a valid, partial piece map can be returned even if node ID parsing error(s) are returned.
func (cache *overlaycache) AllPieceCounts(ctx context.Context) (_ map[storj.NodeID]int, err error) {
@ -1126,6 +1158,7 @@ func convertDBNode(ctx context.Context, info *dbx.Node) (_ *overlay.NodeDossier,
},
Contained: info.Contained,
Disqualified: info.Disqualified,
Suspended: info.Suspended,
PieceCount: info.PieceCount,
ExitStatus: exitStatus,
CreatedAt: info.CreatedAt,
@ -1159,16 +1192,19 @@ func convertDBNodeToPBNode(ctx context.Context, info *dbx.Id_LastNet_LastIpPort_
func getNodeStats(dbNode *dbx.Node) *overlay.NodeStats {
nodeStats := &overlay.NodeStats{
Latency90: dbNode.Latency90,
AuditCount: dbNode.TotalAuditCount,
AuditSuccessCount: dbNode.AuditSuccessCount,
UptimeCount: dbNode.TotalUptimeCount,
UptimeSuccessCount: dbNode.UptimeSuccessCount,
LastContactSuccess: dbNode.LastContactSuccess,
LastContactFailure: dbNode.LastContactFailure,
AuditReputationAlpha: dbNode.AuditReputationAlpha,
AuditReputationBeta: dbNode.AuditReputationBeta,
Disqualified: dbNode.Disqualified,
Latency90: dbNode.Latency90,
AuditCount: dbNode.TotalAuditCount,
AuditSuccessCount: dbNode.AuditSuccessCount,
UptimeCount: dbNode.TotalUptimeCount,
UptimeSuccessCount: dbNode.UptimeSuccessCount,
LastContactSuccess: dbNode.LastContactSuccess,
LastContactFailure: dbNode.LastContactFailure,
AuditReputationAlpha: dbNode.AuditReputationAlpha,
AuditReputationBeta: dbNode.AuditReputationBeta,
Disqualified: dbNode.Disqualified,
UnknownAuditReputationAlpha: dbNode.UnknownAuditReputationAlpha,
UnknownAuditReputationBeta: dbNode.UnknownAuditReputationBeta,
Suspended: dbNode.Suspended,
}
return nodeStats
}
@ -1225,6 +1261,13 @@ func buildUpdateStatement(update updateNodeStats) string {
atLeastOne = true
sql += fmt.Sprintf("disqualified = '%v'", update.Disqualified.value.Format(time.RFC3339Nano))
}
if update.Suspended.set {
if atLeastOne {
sql += ","
}
atLeastOne = true
sql += fmt.Sprintf("suspended = '%v'", update.Suspended.value.Format(time.RFC3339Nano))
}
if update.UptimeSuccessCount.set {
if atLeastOne {
sql += ","
@ -1289,34 +1332,84 @@ type boolField struct {
type timeField struct {
set bool
isNil bool
value time.Time
}
type updateNodeStats struct {
NodeID storj.NodeID
TotalAuditCount int64Field
TotalUptimeCount int64Field
AuditReputationAlpha float64Field
AuditReputationBeta float64Field
Disqualified timeField
UptimeSuccessCount int64Field
LastContactSuccess timeField
LastContactFailure timeField
AuditSuccessCount int64Field
Contained boolField
NodeID storj.NodeID
TotalAuditCount int64Field
TotalUptimeCount int64Field
AuditReputationAlpha float64Field
AuditReputationBeta float64Field
Disqualified timeField
UnknownAuditReputationAlpha float64Field
UnknownAuditReputationBeta float64Field
Suspended timeField
UptimeSuccessCount int64Field
LastContactSuccess timeField
LastContactFailure timeField
AuditSuccessCount int64Field
Contained boolField
}
func populateUpdateNodeStats(dbNode *dbx.Node, updateReq *overlay.UpdateRequest) updateNodeStats {
auditAlpha, auditBeta, totalAuditCount := updateReputation(
updateReq.AuditSuccess,
dbNode.AuditReputationAlpha,
dbNode.AuditReputationBeta,
updateReq.AuditLambda,
updateReq.AuditWeight,
dbNode.TotalAuditCount,
)
mon.FloatVal("audit_reputation_alpha").Observe(auditAlpha) //locked
mon.FloatVal("audit_reputation_beta").Observe(auditBeta) //locked
// there are three audit outcomes: success, failure, and unknown
// if a node fails enough audits, it gets disqualified
// if a node gets enough "unknown" audits, it gets put into suspension
// if a node gets enough successful audits, and is in suspension, it gets removed from suspension
auditAlpha := dbNode.AuditReputationAlpha
auditBeta := dbNode.AuditReputationBeta
unknownAuditAlpha := dbNode.UnknownAuditReputationAlpha
unknownAuditBeta := dbNode.UnknownAuditReputationBeta
totalAuditCount := dbNode.TotalAuditCount
switch updateReq.AuditOutcome {
case overlay.AuditSuccess:
// for a successful audit, increase reputation for normal *and* unknown audits
auditAlpha, auditBeta, totalAuditCount = updateReputation(
true,
auditAlpha,
auditBeta,
updateReq.AuditLambda,
updateReq.AuditWeight,
totalAuditCount,
)
unknownAuditAlpha, unknownAuditBeta, totalAuditCount = updateReputation(
true,
unknownAuditAlpha,
unknownAuditBeta,
updateReq.AuditLambda,
updateReq.AuditWeight,
totalAuditCount-1, // subtract one because this is still a single audit
)
case overlay.AuditFailure:
// for audit failure, only update normal alpha/beta
auditAlpha, auditBeta, totalAuditCount = updateReputation(
false,
auditAlpha,
auditBeta,
updateReq.AuditLambda,
updateReq.AuditWeight,
totalAuditCount,
)
case overlay.AuditUnknown:
// for audit unknown, only update unknown alpha/beta
unknownAuditAlpha, unknownAuditBeta, totalAuditCount = updateReputation(
false,
unknownAuditAlpha,
unknownAuditBeta,
updateReq.AuditLambda,
updateReq.AuditWeight,
totalAuditCount,
)
}
mon.FloatVal("audit_reputation_alpha").Observe(auditAlpha) //locked
mon.FloatVal("audit_reputation_beta").Observe(auditBeta) //locked
mon.FloatVal("unknown_audit_reputation_alpha").Observe(unknownAuditAlpha) //locked
mon.FloatVal("unknown_audit_reputation_beta").Observe(unknownAuditBeta) //locked
totalUptimeCount := dbNode.TotalUptimeCount
if updateReq.IsUp {
@ -1324,11 +1417,13 @@ func populateUpdateNodeStats(dbNode *dbx.Node, updateReq *overlay.UpdateRequest)
}
updateFields := updateNodeStats{
NodeID: updateReq.NodeID,
TotalAuditCount: int64Field{set: true, value: totalAuditCount},
TotalUptimeCount: int64Field{set: true, value: totalUptimeCount},
AuditReputationAlpha: float64Field{set: true, value: auditAlpha},
AuditReputationBeta: float64Field{set: true, value: auditBeta},
NodeID: updateReq.NodeID,
TotalAuditCount: int64Field{set: true, value: totalAuditCount},
TotalUptimeCount: int64Field{set: true, value: totalUptimeCount},
AuditReputationAlpha: float64Field{set: true, value: auditAlpha},
AuditReputationBeta: float64Field{set: true, value: auditBeta},
UnknownAuditReputationAlpha: float64Field{set: true, value: unknownAuditAlpha},
UnknownAuditReputationBeta: float64Field{set: true, value: unknownAuditBeta},
}
auditRep := auditAlpha / (auditAlpha + auditBeta)
@ -1336,6 +1431,16 @@ func populateUpdateNodeStats(dbNode *dbx.Node, updateReq *overlay.UpdateRequest)
updateFields.Disqualified = timeField{set: true, value: time.Now().UTC()}
}
// if unknown audit rep goes below threshold, suspend node. Otherwise unsuspend node.
unknownAuditRep := unknownAuditAlpha / (unknownAuditAlpha + unknownAuditBeta)
if unknownAuditRep <= updateReq.AuditDQ {
updateFields.Suspended = timeField{set: true, value: time.Now().UTC()}
} else {
updateFields.Suspended = timeField{set: true, isNil: true}
}
// TODO if node has been suspended for longer than threshold, and audit outcome is failure or unknown, disqualify node.
if updateReq.IsUp {
updateFields.UptimeSuccessCount = int64Field{set: true, value: dbNode.UptimeSuccessCount + 1}
updateFields.LastContactSuccess = timeField{set: true, value: time.Now()}
@ -1343,7 +1448,7 @@ func populateUpdateNodeStats(dbNode *dbx.Node, updateReq *overlay.UpdateRequest)
updateFields.LastContactFailure = timeField{set: true, value: time.Now()}
}
if updateReq.AuditSuccess {
if updateReq.AuditOutcome == overlay.AuditSuccess {
updateFields.AuditSuccessCount = int64Field{set: true, value: dbNode.AuditSuccessCount + 1}
}
@ -1372,6 +1477,19 @@ func populateUpdateFields(dbNode *dbx.Node, updateReq *overlay.UpdateRequest) db
if update.Disqualified.set {
updateFields.Disqualified = dbx.Node_Disqualified(update.Disqualified.value)
}
if update.UnknownAuditReputationAlpha.set {
updateFields.UnknownAuditReputationAlpha = dbx.Node_UnknownAuditReputationAlpha(update.UnknownAuditReputationAlpha.value)
}
if update.UnknownAuditReputationBeta.set {
updateFields.UnknownAuditReputationBeta = dbx.Node_UnknownAuditReputationBeta(update.UnknownAuditReputationBeta.value)
}
if update.Suspended.set {
if update.Suspended.isNil {
updateFields.Suspended = dbx.Node_Suspended_Null()
} else {
updateFields.Suspended = dbx.Node_Suspended(update.Suspended.value)
}
}
if update.UptimeSuccessCount.set {
updateFields.UptimeSuccessCount = dbx.Node_UptimeSuccessCount(update.UptimeSuccessCount.value)
}
@ -1387,7 +1505,7 @@ func populateUpdateFields(dbNode *dbx.Node, updateReq *overlay.UpdateRequest) db
if update.Contained.set {
updateFields.Contained = dbx.Node_Contained(update.Contained.value)
}
if updateReq.AuditSuccess {
if updateReq.AuditOutcome == overlay.AuditSuccess {
updateFields.AuditSuccessCount = dbx.Node_AuditSuccessCount(dbNode.AuditSuccessCount + 1)
}