satellite: remove UptimeReputation configs from codebase

With the new storage node downtime tracking feature, we need to remove the
current uptime reputation configs: UptimeReputationAlpha, UptimeReputationBeta,
and UptimeReputationDQ. This is the first step toward removing the uptime
reputation columns from satellitedb.

Change-Id: Ie8fab13295dbf545e33aeda0c4306cda4ba54e36
Yingrong Zhao 2020-01-02 19:00:18 -05:00 committed by Yingrong Zhao
parent ebeee58001
commit 76ee8a1b4c
14 changed files with 178 additions and 470 deletions
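
For context, here is a minimal sketch, not code from this commit, of the beta-reputation update that the removed Uptime* knobs parameterized; it is reconstructed from the updateReputation call sites and disqualification checks deleted in this diff:

// Sketch of the uptime reputation update being removed: alpha and beta decay
// by the forgetting factor lambda, the latest result is weighted in, and a
// node is disqualified once alpha/(alpha+beta) falls to the DQ cut-off or
// below. The helper name is hypothetical.
func uptimeReputationSketch(isUp bool, alpha, beta, lambda, weight, dq float64) (newAlpha, newBeta float64, disqualified bool) {
	v := 0.0 // success indicator: 1 when the node is up, 0 when it is down
	if isUp {
		v = 1.0
	}
	newAlpha = lambda*alpha + weight*v
	newBeta = lambda*beta + weight*(1-v)
	disqualified = newAlpha/(newAlpha+newBeta) <= dq
	return newAlpha, newBeta, disqualified
}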


@ -287,20 +287,13 @@ func (planet *Planet) newSatellites(count int) ([]*SatelliteSystem, error) {
OnlineWindow: time.Minute,
DistinctIP: false,
AuditReputationRepairWeight: 1,
AuditReputationUplinkWeight: 1,
AuditReputationAlpha0: 1,
AuditReputationBeta0: 0,
AuditReputationLambda: 0.95,
AuditReputationWeight: 1,
AuditReputationDQ: 0.6,
UptimeReputationRepairWeight: 1,
UptimeReputationUplinkWeight: 1,
UptimeReputationAlpha0: 2,
UptimeReputationBeta0: 0,
UptimeReputationLambda: 0.99,
UptimeReputationWeight: 1,
UptimeReputationDQ: 0.6,
AuditReputationRepairWeight: 1,
AuditReputationUplinkWeight: 1,
AuditReputationAlpha0: 1,
AuditReputationBeta0: 0,
AuditReputationLambda: 0.95,
AuditReputationWeight: 1,
AuditReputationDQ: 0.6,
},
UpdateStatsBatchSize: 100,
},


@ -143,7 +143,8 @@ func TestDisqualifiedNodesGetNoDownload(t *testing.T) {
require.NoError(t, err)
disqualifiedNode := pointer.GetRemote().GetRemotePieces()[0].NodeId
disqualifyNode(t, ctx, satellitePeer, disqualifiedNode)
err = satellitePeer.DB.OverlayCache().DisqualifyNode(ctx, disqualifiedNode)
require.NoError(t, err)
limits, _, err := satellitePeer.Orders.Service.CreateGetOrderLimits(ctx, bucketID, pointer)
require.NoError(t, err)
@ -168,7 +169,8 @@ func TestDisqualifiedNodesGetNoUpload(t *testing.T) {
disqualifiedNode := planet.StorageNodes[0]
satellitePeer.Audit.Worker.Loop.Pause()
disqualifyNode(t, ctx, satellitePeer, disqualifiedNode.ID())
err := satellitePeer.DB.OverlayCache().DisqualifyNode(ctx, disqualifiedNode.ID())
require.NoError(t, err)
request := overlay.FindStorageNodesRequest{
MinimumRequiredNodes: 4,
@ -193,7 +195,7 @@ func TestDisqualifiedNodesGetNoUpload(t *testing.T) {
func TestDisqualifiedNodeRemainsDisqualified(t *testing.T) {
// - mark a node as disqualified
// - give it high uptime and audit rate
// - give it high audit rate
// - check that the node remains disqualified
testplanet.Run(t, testplanet.Config{
@ -203,7 +205,8 @@ func TestDisqualifiedNodeRemainsDisqualified(t *testing.T) {
satellitePeer.Audit.Worker.Loop.Pause()
disqualifiedNode := planet.StorageNodes[0]
disqualifyNode(t, ctx, satellitePeer, disqualifiedNode.ID())
err := satellitePeer.DB.OverlayCache().DisqualifyNode(ctx, disqualifiedNode.ID())
require.NoError(t, err)
info := overlay.NodeCheckInInfo{
NodeID: disqualifiedNode.ID(),
@ -218,12 +221,7 @@ func TestDisqualifiedNodeRemainsDisqualified(t *testing.T) {
Release: false,
},
}
config := overlay.NodeSelectionConfig{
UptimeReputationLambda: 0,
UptimeReputationWeight: 1,
UptimeReputationDQ: 0,
}
err := satellitePeer.DB.OverlayCache().UpdateCheckIn(ctx, info, time.Now().UTC(), config)
err = satellitePeer.DB.OverlayCache().UpdateCheckIn(ctx, info, time.Now().UTC(), overlay.NodeSelectionConfig{})
require.NoError(t, err)
assert.True(t, isDisqualified(t, ctx, satellitePeer, disqualifiedNode.ID()))
@ -235,9 +233,6 @@ func TestDisqualifiedNodeRemainsDisqualified(t *testing.T) {
AuditLambda: 0, // forget about history
AuditWeight: 1,
AuditDQ: 0, // make sure new reputation scores are larger than the DQ thresholds
UptimeLambda: 0, // forget about history
UptimeWeight: 1,
UptimeDQ: 0, // make sure new reputation scores are larger than the DQ thresholds
}}, 100)
require.NoError(t, err)
@ -251,26 +246,3 @@ func isDisqualified(t *testing.T, ctx *testcontext.Context, satellite *testplane
return node.Disqualified != nil
}
func disqualifyNode(t *testing.T, ctx *testcontext.Context, satellite *testplanet.SatelliteSystem, nodeID storj.NodeID) {
info := overlay.NodeCheckInInfo{
NodeID: nodeID,
IsUp: false,
Address: &pb.NodeAddress{
Address: "1.2.3.4",
},
Version: &pb.NodeVersion{
Version: "v0.0.0",
CommitHash: "",
Timestamp: time.Time{},
Release: false,
},
}
config := overlay.NodeSelectionConfig{
UptimeReputationLambda: 1,
UptimeReputationWeight: 1,
UptimeReputationDQ: 1,
}
err := satellite.DB.OverlayCache().UpdateCheckIn(ctx, info, time.Now().UTC(), config)
require.NoError(t, err)
assert.True(t, isDisqualified(t, ctx, satellite, nodeID))
}
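
These tests (and several below) replace reputation-driven disqualification with a direct OverlayCache.DisqualifyNode call. Its implementation is not shown in this diff, so the following is only a sketch of what such a helper plausibly looks like, reusing the dbx update pattern that appears in overlaycache.go further down:

// Hedged sketch, not from this commit: stamp the node's disqualified column
// directly instead of driving a reputation score below its DQ cut-off.
func (cache *overlaycache) DisqualifyNode(ctx context.Context, nodeID storj.NodeID) (err error) {
	defer mon.Task()(&ctx)(&err)
	updateFields := dbx.Node_Update_Fields{
		Disqualified: dbx.Node_Disqualified(time.Now().UTC()),
	}
	_, err = cache.db.Update_Node_By_Id(ctx, dbx.Node_Id(nodeID.Bytes()), updateFields)
	return err
}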


@ -34,16 +34,11 @@ func TestDetectionChore(t *testing.T) {
Version: &nodeDossier.Version,
}
config := overlay.NodeSelectionConfig{
UptimeReputationLambda: 0.99,
UptimeReputationWeight: 1.0,
UptimeReputationDQ: 0,
}
sixtyOneMinutes := 61 * time.Minute
{ // test node ping back success
// check-in 1 hour, 1 minute ago for that node
oldCheckinTime := time.Now().UTC().Add(-sixtyOneMinutes)
err := satellite.DB.OverlayCache().UpdateCheckIn(ctx, info, oldCheckinTime, config)
err := satellite.DB.OverlayCache().UpdateCheckIn(ctx, info, oldCheckinTime, overlay.NodeSelectionConfig{})
require.NoError(t, err)
// get successful nodes that haven't checked in within the hour. should return 1
@ -68,7 +63,7 @@ func TestDetectionChore(t *testing.T) {
{ // test node ping back failure
// check-in 1 hour, 1 minute ago for that node - again
oldCheckinTime := time.Now().UTC().Add(-sixtyOneMinutes)
err := satellite.DB.OverlayCache().UpdateCheckIn(ctx, info, oldCheckinTime, config)
err := satellite.DB.OverlayCache().UpdateCheckIn(ctx, info, oldCheckinTime, overlay.NodeSelectionConfig{})
require.NoError(t, err)
// close the node service so the ping back will fail


@ -46,7 +46,7 @@ type exitProcessClient interface {
}
func TestSuccess(t *testing.T) {
testTransfers(t, numObjects, func(ctx *testcontext.Context, nodeFullIDs map[storj.NodeID]*identity.FullIdentity, satellite *testplanet.SatelliteSystem, processClient exitProcessClient, exitingNode *storagenode.Peer, numPieces int) {
testTransfers(t, numObjects, func(t *testing.T, ctx *testcontext.Context, nodeFullIDs map[storj.NodeID]*identity.FullIdentity, satellite *testplanet.SatelliteSystem, processClient exitProcessClient, exitingNode *storagenode.Peer, numPieces int) {
var pieceID storj.PieceID
failedCount := 0
deletedCount := 0
@ -321,7 +321,7 @@ func TestRecvTimeout(t *testing.T) {
}
func TestInvalidStorageNodeSignature(t *testing.T) {
testTransfers(t, 1, func(ctx *testcontext.Context, nodeFullIDs map[storj.NodeID]*identity.FullIdentity, satellite *testplanet.SatelliteSystem, processClient exitProcessClient, exitingNode *storagenode.Peer, numPieces int) {
testTransfers(t, 1, func(t *testing.T, ctx *testcontext.Context, nodeFullIDs map[storj.NodeID]*identity.FullIdentity, satellite *testplanet.SatelliteSystem, processClient exitProcessClient, exitingNode *storagenode.Peer, numPieces int) {
response, err := processClient.Recv()
require.NoError(t, err)
@ -406,7 +406,8 @@ func TestExitDisqualifiedNodeFailOnStart(t *testing.T) {
satellite := planet.Satellites[0]
exitingNode := planet.StorageNodes[0]
disqualifyNode(t, ctx, satellite, exitingNode.ID())
err := satellite.DB.OverlayCache().DisqualifyNode(ctx, exitingNode.ID())
require.NoError(t, err)
conn, err := exitingNode.Dialer.DialAddressID(ctx, satellite.Addr(), satellite.Identity.ID)
require.NoError(t, err)
@ -431,21 +432,22 @@ func TestExitDisqualifiedNodeFailOnStart(t *testing.T) {
}
func TestExitDisqualifiedNodeFailEventually(t *testing.T) {
testTransfers(t, numObjects, func(ctx *testcontext.Context, nodeFullIDs map[storj.NodeID]*identity.FullIdentity, satellite *testplanet.SatelliteSystem, processClient exitProcessClient, exitingNode *storagenode.Peer, numPieces int) {
disqualifyNode(t, ctx, satellite, exitingNode.ID())
deletedCount := 0
testTransfers(t, numObjects, func(t *testing.T, ctx *testcontext.Context, nodeFullIDs map[storj.NodeID]*identity.FullIdentity, satellite *testplanet.SatelliteSystem, processClient exitProcessClient, exitingNode *storagenode.Peer, numPieces int) {
var disqualifiedError error
isDisqualified := false
for {
response, err := processClient.Recv()
if errs.Is(err, io.EOF) {
// Done
break
}
if deletedCount >= numPieces {
// when a disqualified node has finished transferring all pieces, it should receive an error
require.True(t, errs2.IsRPC(err, rpcstatus.FailedPrecondition))
if errs2.IsRPC(err, rpcstatus.FailedPrecondition) {
disqualifiedError = err
break
} else {
}
if !isDisqualified {
err := satellite.DB.OverlayCache().DisqualifyNode(ctx, exitingNode.ID())
require.NoError(t, err)
}
@ -495,18 +497,19 @@ func TestExitDisqualifiedNodeFailEventually(t *testing.T) {
err = processClient.Send(success)
require.NoError(t, err)
case *pb.SatelliteMessage_DeletePiece:
deletedCount++
continue
default:
t.FailNow()
}
}
// check that the exit has failed because the node has been disqualified
require.True(t, errs2.IsRPC(disqualifiedError, rpcstatus.FailedPrecondition))
// check that the exit has completed and we have the correct transferred/failed values
progress, err := satellite.DB.GracefulExit().GetProgress(ctx, exitingNode.ID())
require.NoError(t, err)
require.EqualValues(t, numPieces, progress.PiecesTransferred)
require.EqualValues(t, numPieces, deletedCount)
// disqualified node should fail graceful exit
exitStatus, err := satellite.Overlay.DB.GetExitStatus(ctx, exitingNode.ID())
@ -517,7 +520,7 @@ func TestExitDisqualifiedNodeFailEventually(t *testing.T) {
}
func TestFailureHashMismatch(t *testing.T) {
testTransfers(t, 1, func(ctx *testcontext.Context, nodeFullIDs map[storj.NodeID]*identity.FullIdentity, satellite *testplanet.SatelliteSystem, processClient exitProcessClient, exitingNode *storagenode.Peer, numPieces int) {
testTransfers(t, 1, func(t *testing.T, ctx *testcontext.Context, nodeFullIDs map[storj.NodeID]*identity.FullIdentity, satellite *testplanet.SatelliteSystem, processClient exitProcessClient, exitingNode *storagenode.Peer, numPieces int) {
response, err := processClient.Recv()
require.NoError(t, err)
@ -595,7 +598,7 @@ func TestFailureHashMismatch(t *testing.T) {
}
func TestFailureUnknownError(t *testing.T) {
testTransfers(t, 1, func(ctx *testcontext.Context, nodeFullIDs map[storj.NodeID]*identity.FullIdentity, satellite *testplanet.SatelliteSystem, processClient exitProcessClient, exitingNode *storagenode.Peer, numPieces int) {
testTransfers(t, 1, func(t *testing.T, ctx *testcontext.Context, nodeFullIDs map[storj.NodeID]*identity.FullIdentity, satellite *testplanet.SatelliteSystem, processClient exitProcessClient, exitingNode *storagenode.Peer, numPieces int) {
response, err := processClient.Recv()
require.NoError(t, err)
@ -636,7 +639,7 @@ func TestFailureUnknownError(t *testing.T) {
}
func TestFailureUplinkSignature(t *testing.T) {
testTransfers(t, 1, func(ctx *testcontext.Context, nodeFullIDs map[storj.NodeID]*identity.FullIdentity, satellite *testplanet.SatelliteSystem, processClient exitProcessClient, exitingNode *storagenode.Peer, numPieces int) {
testTransfers(t, 1, func(t *testing.T, ctx *testcontext.Context, nodeFullIDs map[storj.NodeID]*identity.FullIdentity, satellite *testplanet.SatelliteSystem, processClient exitProcessClient, exitingNode *storagenode.Peer, numPieces int) {
response, err := processClient.Recv()
require.NoError(t, err)
@ -716,7 +719,7 @@ func TestFailureUplinkSignature(t *testing.T) {
}
func TestSuccessPointerUpdate(t *testing.T) {
testTransfers(t, 1, func(ctx *testcontext.Context, nodeFullIDs map[storj.NodeID]*identity.FullIdentity, satellite *testplanet.SatelliteSystem, processClient exitProcessClient, exitingNode *storagenode.Peer, numPieces int) {
testTransfers(t, 1, func(t *testing.T, ctx *testcontext.Context, nodeFullIDs map[storj.NodeID]*identity.FullIdentity, satellite *testplanet.SatelliteSystem, processClient exitProcessClient, exitingNode *storagenode.Peer, numPieces int) {
var recNodeID storj.NodeID
response, err := processClient.Recv()
@ -813,7 +816,7 @@ func TestSuccessPointerUpdate(t *testing.T) {
}
func TestUpdatePointerFailure_DuplicatedNodeID(t *testing.T) {
testTransfers(t, 1, func(ctx *testcontext.Context, nodeFullIDs map[storj.NodeID]*identity.FullIdentity, satellite *testplanet.SatelliteSystem, processClient exitProcessClient, exitingNode *storagenode.Peer, numPieces int) {
testTransfers(t, 1, func(t *testing.T, ctx *testcontext.Context, nodeFullIDs map[storj.NodeID]*identity.FullIdentity, satellite *testplanet.SatelliteSystem, processClient exitProcessClient, exitingNode *storagenode.Peer, numPieces int) {
response, err := processClient.Recv()
require.NoError(t, err)
@ -1070,7 +1073,7 @@ func TestPointerChangedOrDeleted(t *testing.T) {
}
func TestFailureNotFoundPieceHashVerified(t *testing.T) {
testTransfers(t, 1, func(ctx *testcontext.Context, nodeFullIDs map[storj.NodeID]*identity.FullIdentity, satellite *testplanet.SatelliteSystem, processClient exitProcessClient, exitingNode *storagenode.Peer, numPieces int) {
testTransfers(t, 1, func(t *testing.T, ctx *testcontext.Context, nodeFullIDs map[storj.NodeID]*identity.FullIdentity, satellite *testplanet.SatelliteSystem, processClient exitProcessClient, exitingNode *storagenode.Peer, numPieces int) {
response, err := processClient.Recv()
require.NoError(t, err)
@ -1138,7 +1141,7 @@ func TestFailureNotFoundPieceHashVerified(t *testing.T) {
}
func TestFailureNotFoundPieceHashUnverified(t *testing.T) {
testTransfers(t, 1, func(ctx *testcontext.Context, nodeFullIDs map[storj.NodeID]*identity.FullIdentity, satellite *testplanet.SatelliteSystem, processClient exitProcessClient, exitingNode *storagenode.Peer, numPieces int) {
testTransfers(t, 1, func(t *testing.T, ctx *testcontext.Context, nodeFullIDs map[storj.NodeID]*identity.FullIdentity, satellite *testplanet.SatelliteSystem, processClient exitProcessClient, exitingNode *storagenode.Peer, numPieces int) {
// retrieve remote segment
keys, err := satellite.Metainfo.Database.List(ctx, nil, -1)
require.NoError(t, err)
@ -1430,7 +1433,7 @@ func TestIneligibleNodeAge(t *testing.T) {
})
}
func testTransfers(t *testing.T, objects int, verifier func(ctx *testcontext.Context, nodeFullIDs map[storj.NodeID]*identity.FullIdentity, satellite *testplanet.SatelliteSystem, processClient exitProcessClient, exitingNode *storagenode.Peer, numPieces int)) {
func testTransfers(t *testing.T, objects int, verifier func(t *testing.T, ctx *testcontext.Context, nodeFullIDs map[storj.NodeID]*identity.FullIdentity, satellite *testplanet.SatelliteSystem, processClient exitProcessClient, exitingNode *storagenode.Peer, numPieces int)) {
successThreshold := 4
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1,
@ -1508,7 +1511,7 @@ func testTransfers(t *testing.T, objects int, verifier func(ctx *testcontext.Con
require.NoError(t, err)
defer ctx.Check(c.CloseSend)
verifier(ctx, nodeFullIDs, satellite, c, exitingNode, len(incompleteTransfers))
verifier(t, ctx, nodeFullIDs, satellite, c, exitingNode, len(incompleteTransfers))
})
}
@ -1557,19 +1560,3 @@ func findNodeToExit(ctx context.Context, planet *testplanet.Planet, objects int)
return nil, nil
}
func disqualifyNode(t *testing.T, ctx *testcontext.Context, satellite *testplanet.SatelliteSystem, nodeID storj.NodeID) {
nodeStat, err := satellite.DB.OverlayCache().UpdateStats(ctx, &overlay.UpdateRequest{
NodeID: nodeID,
IsUp: true,
AuditSuccess: false,
AuditLambda: 0,
AuditWeight: 1,
AuditDQ: 0.5,
UptimeLambda: 1,
UptimeWeight: 1,
UptimeDQ: 0.5,
})
require.NoError(t, err)
require.NotNil(t, nodeStat.Disqualified)
}


@ -55,21 +55,14 @@ func (e *Endpoint) GetStats(ctx context.Context, req *pb.GetStatsRequest) (_ *pb
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
uptimeScore := calculateReputationScore(
node.Reputation.UptimeReputationAlpha,
node.Reputation.UptimeReputationBeta)
auditScore := calculateReputationScore(
node.Reputation.AuditReputationAlpha,
node.Reputation.AuditReputationBeta)
return &pb.GetStatsResponse{
UptimeCheck: &pb.ReputationStats{
TotalCount: node.Reputation.UptimeCount,
SuccessCount: node.Reputation.UptimeSuccessCount,
ReputationAlpha: node.Reputation.UptimeReputationAlpha,
ReputationBeta: node.Reputation.UptimeReputationBeta,
ReputationScore: uptimeScore,
TotalCount: node.Reputation.UptimeCount,
SuccessCount: node.Reputation.UptimeSuccessCount,
},
AuditCheck: &pb.ReputationStats{
TotalCount: node.Reputation.AuditCount,


@ -125,7 +125,7 @@ func BenchmarkOverlay(b *testing.B) {
b.Run("UpdateUptime", func(b *testing.B) {
for i := 0; i < b.N; i++ {
id := all[i%len(all)]
_, err := overlaydb.UpdateUptime(ctx, id, i&1 == 0, 1, 1, 0.5)
_, err := overlaydb.UpdateUptime(ctx, id, i&1 == 0)
require.NoError(b, err)
}
})
@ -156,11 +156,8 @@ func BenchmarkOverlay(b *testing.B) {
},
},
now,
overlay.NodeSelectionConfig{
UptimeReputationLambda: 0.99,
UptimeReputationWeight: 1.0,
UptimeReputationDQ: 0,
})
overlay.NodeSelectionConfig{},
)
require.NoError(b, err)
}
})


@ -32,18 +32,11 @@ type NodeSelectionConfig struct {
OnlineWindow time.Duration `help:"the amount of time without seeing a node before it's considered offline" default:"4h"`
DistinctIP bool `help:"require distinct IPs when choosing nodes for upload" releaseDefault:"true" devDefault:"false"`
AuditReputationRepairWeight float64 `help:"weight to apply to audit reputation for total repair reputation calculation" default:"1.0"`
AuditReputationUplinkWeight float64 `help:"weight to apply to audit reputation for total uplink reputation calculation" default:"1.0"`
AuditReputationAlpha0 float64 `help:"the initial shape 'alpha' used to calculate audit SNs reputation" default:"1.0"`
AuditReputationBeta0 float64 `help:"the initial shape 'beta' value used to calculate audit SNs reputation" default:"0.0"`
AuditReputationLambda float64 `help:"the forgetting factor used to calculate the audit SNs reputation" default:"0.95"`
AuditReputationWeight float64 `help:"the normalization weight used to calculate the audit SNs reputation" default:"1.0"`
AuditReputationDQ float64 `help:"the reputation cut-off for disqualifying SNs based on audit history" default:"0.6"`
UptimeReputationRepairWeight float64 `help:"weight to apply to uptime reputation for total repair reputation calculation" default:"1.0"`
UptimeReputationUplinkWeight float64 `help:"weight to apply to uptime reputation for total uplink reputation calculation" default:"1.0"`
UptimeReputationAlpha0 float64 `help:"the initial shape 'alpha' used to calculate uptime SNs reputation" default:"2.0"`
UptimeReputationBeta0 float64 `help:"the initial shape 'beta' value used to calculate uptime SNs reputation" default:"0.0"`
UptimeReputationLambda float64 `help:"the forgetting factor used to calculate the uptime SNs reputation" default:"0.99"`
UptimeReputationWeight float64 `help:"the normalization weight used to calculate the uptime SNs reputation" default:"1.0"`
UptimeReputationDQ float64 `help:"the reputation cut-off for disqualifying SNs based on uptime history" default:"0"`
AuditReputationRepairWeight float64 `help:"weight to apply to audit reputation for total repair reputation calculation" default:"1.0"`
AuditReputationUplinkWeight float64 `help:"weight to apply to audit reputation for total uplink reputation calculation" default:"1.0"`
AuditReputationAlpha0 float64 `help:"the initial shape 'alpha' used to calculate audit SNs reputation" default:"1.0"`
AuditReputationBeta0 float64 `help:"the initial shape 'beta' value used to calculate audit SNs reputation" default:"0.0"`
AuditReputationLambda float64 `help:"the forgetting factor used to calculate the audit SNs reputation" default:"0.95"`
AuditReputationWeight float64 `help:"the normalization weight used to calculate the audit SNs reputation" default:"1.0"`
AuditReputationDQ float64 `help:"the reputation cut-off for disqualifying SNs based on audit history" default:"0.6"`
}


@ -70,7 +70,6 @@ func TestNodeSelection(t *testing.T) {
IsUp: true,
AuditSuccess: true,
AuditLambda: 1, AuditWeight: 1, AuditDQ: 0.5,
UptimeLambda: 1, UptimeWeight: 1, UptimeDQ: 0.5,
})
require.NoError(t, err)
}
@ -96,7 +95,6 @@ func TestNodeSelectionWithBatch(t *testing.T) {
IsUp: true,
AuditSuccess: true,
AuditLambda: 1, AuditWeight: 1, AuditDQ: 0.5,
UptimeLambda: 1, UptimeWeight: 1, UptimeDQ: 0.5,
}}, 1)
require.NoError(t, err)
}
@ -219,7 +217,6 @@ func TestNodeSelectionGracefulExit(t *testing.T) {
IsUp: true,
AuditSuccess: true,
AuditLambda: 1, AuditWeight: 1, AuditDQ: 0.5,
UptimeLambda: 1, UptimeWeight: 1, UptimeDQ: 0.5,
})
require.NoError(t, err)
}
@ -379,9 +376,6 @@ func TestDistinctIPs(t *testing.T) {
AuditLambda: 1,
AuditWeight: 1,
AuditDQ: 0.5,
UptimeLambda: 1,
UptimeWeight: 1,
UptimeDQ: 0.5,
})
assert.NoError(t, err)
}
@ -410,9 +404,6 @@ func TestDistinctIPsWithBatch(t *testing.T) {
AuditLambda: 1,
AuditWeight: 1,
AuditDQ: 0.5,
UptimeLambda: 1,
UptimeWeight: 1,
UptimeDQ: 0.5,
}}, 1)
assert.NoError(t, err)
}


@ -64,7 +64,7 @@ type DB interface {
// UpdateNodeInfo updates node dossier with info requested from the node itself like node type, email, wallet, capacity, and version.
UpdateNodeInfo(ctx context.Context, node storj.NodeID, nodeInfo *pb.InfoResponse) (stats *NodeDossier, err error)
// UpdateUptime updates a single storagenode's uptime stats.
UpdateUptime(ctx context.Context, nodeID storj.NodeID, isUp bool, lambda, weight, uptimeDQ float64) (stats *NodeStats, err error)
UpdateUptime(ctx context.Context, nodeID storj.NodeID, isUp bool) (stats *NodeStats, err error)
// UpdateCheckIn updates a single storagenode's check-in stats.
UpdateCheckIn(ctx context.Context, node NodeCheckInInfo, timestamp time.Time, config NodeSelectionConfig) (err error)
@ -138,12 +138,9 @@ type UpdateRequest struct {
// n.b. these are set values from the satellite.
// They are part of the UpdateRequest struct in order to be
// more easily accessible in satellite/satellitedb/overlaycache.go.
AuditLambda float64
AuditWeight float64
AuditDQ float64
UptimeLambda float64
UptimeWeight float64
UptimeDQ float64
AuditLambda float64
AuditWeight float64
AuditDQ float64
}
// ExitStatus is used for reading graceful exit status.
@ -181,18 +178,16 @@ type NodeDossier struct {
// NodeStats contains statistics about a node.
type NodeStats struct {
Latency90 int64
AuditSuccessCount int64
AuditCount int64
UptimeSuccessCount int64
UptimeCount int64
LastContactSuccess time.Time
LastContactFailure time.Time
AuditReputationAlpha float64
UptimeReputationAlpha float64
AuditReputationBeta float64
UptimeReputationBeta float64
Disqualified *time.Time
Latency90 int64
AuditSuccessCount int64
AuditCount int64
UptimeSuccessCount int64
UptimeCount int64
LastContactSuccess time.Time
LastContactFailure time.Time
AuditReputationAlpha float64
AuditReputationBeta float64
Disqualified *time.Time
}
// NodeLastContact contains the ID, address, and timestamp
@ -404,9 +399,6 @@ func (service *Service) BatchUpdateStats(ctx context.Context, requests []*Update
request.AuditLambda = service.config.Node.AuditReputationLambda
request.AuditWeight = service.config.Node.AuditReputationWeight
request.AuditDQ = service.config.Node.AuditReputationDQ
request.UptimeLambda = service.config.Node.UptimeReputationLambda
request.UptimeWeight = service.config.Node.UptimeReputationWeight
request.UptimeDQ = service.config.Node.UptimeReputationDQ
}
return service.db.BatchUpdateStats(ctx, requests, service.config.UpdateStatsBatchSize)
}
@ -418,9 +410,6 @@ func (service *Service) UpdateStats(ctx context.Context, request *UpdateRequest)
request.AuditLambda = service.config.Node.AuditReputationLambda
request.AuditWeight = service.config.Node.AuditReputationWeight
request.AuditDQ = service.config.Node.AuditReputationDQ
request.UptimeLambda = service.config.Node.UptimeReputationLambda
request.UptimeWeight = service.config.Node.UptimeReputationWeight
request.UptimeDQ = service.config.Node.UptimeReputationDQ
return service.db.UpdateStats(ctx, request)
}
@ -434,11 +423,7 @@ func (service *Service) UpdateNodeInfo(ctx context.Context, node storj.NodeID, n
// UpdateUptime updates a single storagenode's uptime stats.
func (service *Service) UpdateUptime(ctx context.Context, nodeID storj.NodeID, isUp bool) (stats *NodeStats, err error) {
defer mon.Task()(&ctx)(&err)
lambda := service.config.Node.UptimeReputationLambda
weight := service.config.Node.UptimeReputationWeight
uptimeDQ := service.config.Node.UptimeReputationDQ
return service.db.UpdateUptime(ctx, nodeID, isUp, lambda, weight, uptimeDQ)
return service.db.UpdateUptime(ctx, nodeID, isUp)
}
// UpdateCheckIn updates a single storagenode's check-in info.
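
A brief usage sketch (assumed caller, not part of this diff) of the slimmed-down UpdateUptime: the call now only maintains uptime counters and last-contact timestamps, and disqualification is no longer a side effect:

// Sketch: record one uptime check; lambda, weight, and DQ no longer apply here.
stats, err := service.UpdateUptime(ctx, nodeID, true /* isUp */)
if err != nil {
	return err
}
_ = stats.UptimeSuccessCount // counters and LastContactSuccess still advance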


@ -45,20 +45,13 @@ func testNodeSelectionConfig(auditCount int64, newNodePercentage float64, distin
OnlineWindow: time.Hour,
DistinctIP: distinctIP,
AuditReputationRepairWeight: 1,
AuditReputationUplinkWeight: 1,
AuditReputationAlpha0: 1,
AuditReputationBeta0: 0,
AuditReputationLambda: 1,
AuditReputationWeight: 1,
AuditReputationDQ: 0.5,
UptimeReputationRepairWeight: 1,
UptimeReputationUplinkWeight: 1,
UptimeReputationAlpha0: 1,
UptimeReputationBeta0: 0,
UptimeReputationLambda: 1,
UptimeReputationWeight: 1,
UptimeReputationDQ: 0.5,
AuditReputationRepairWeight: 1,
AuditReputationUplinkWeight: 1,
AuditReputationAlpha0: 1,
AuditReputationBeta0: 0,
AuditReputationLambda: 1,
AuditReputationWeight: 1,
AuditReputationDQ: 0.5,
}
}
@ -83,7 +76,8 @@ func testCache(ctx context.Context, t *testing.T, store overlay.DB) {
err = service.Put(ctx, valid3ID, pb.Node{Id: valid3ID, Address: address})
require.NoError(t, err)
_, err = service.UpdateUptime(ctx, valid3ID, false)
// disqualify one node
err = service.DisqualifyNode(ctx, valid3ID)
require.NoError(t, err)
}
@ -138,8 +132,6 @@ func testCache(ctx context.Context, t *testing.T, store overlay.DB) {
require.EqualValues(t, valid1.Id, valid1ID)
require.EqualValues(t, valid1.Reputation.AuditReputationAlpha, nodeSelectionConfig.AuditReputationAlpha0)
require.EqualValues(t, valid1.Reputation.AuditReputationBeta, nodeSelectionConfig.AuditReputationBeta0)
require.EqualValues(t, valid1.Reputation.UptimeReputationAlpha, nodeSelectionConfig.UptimeReputationAlpha0)
require.EqualValues(t, valid1.Reputation.UptimeReputationBeta, nodeSelectionConfig.UptimeReputationBeta0)
require.Nil(t, valid1.Reputation.Disqualified)
stats, err := service.UpdateStats(ctx, &overlay.UpdateRequest{
@ -150,26 +142,13 @@ func testCache(ctx context.Context, t *testing.T, store overlay.DB) {
require.NoError(t, err)
newAuditAlpha := 1
newAuditBeta := 1
newUptimeAlpha := 2
newUptimeBeta := 0
require.EqualValues(t, stats.AuditReputationAlpha, newAuditAlpha)
require.EqualValues(t, stats.AuditReputationBeta, newAuditBeta)
require.EqualValues(t, stats.UptimeReputationAlpha, newUptimeAlpha)
require.EqualValues(t, stats.UptimeReputationBeta, newUptimeBeta)
require.NotNil(t, stats.Disqualified)
require.True(t, time.Now().UTC().Sub(*stats.Disqualified) < time.Minute)
stats, err = service.UpdateUptime(ctx, valid2ID, false)
err = service.DisqualifyNode(ctx, valid2ID)
require.NoError(t, err)
newUptimeAlpha = 1
newUptimeBeta = 1
require.EqualValues(t, stats.AuditReputationAlpha, nodeSelectionConfig.AuditReputationAlpha0)
require.EqualValues(t, stats.AuditReputationBeta, nodeSelectionConfig.AuditReputationBeta0)
require.EqualValues(t, stats.UptimeReputationAlpha, newUptimeAlpha)
require.EqualValues(t, stats.UptimeReputationBeta, newUptimeBeta)
require.NotNil(t, stats.Disqualified)
require.True(t, time.Now().UTC().Sub(*stats.Disqualified) < time.Minute)
dqTime := *stats.Disqualified
// should not update once already disqualified
_, err = service.BatchUpdateStats(ctx, []*overlay.UpdateRequest{{
@ -183,11 +162,7 @@ func testCache(ctx context.Context, t *testing.T, store overlay.DB) {
require.NoError(t, err)
require.EqualValues(t, dossier.Reputation.AuditReputationAlpha, nodeSelectionConfig.AuditReputationAlpha0)
require.EqualValues(t, dossier.Reputation.AuditReputationBeta, nodeSelectionConfig.AuditReputationBeta0)
require.EqualValues(t, dossier.Reputation.UptimeReputationAlpha, newUptimeAlpha)
require.EqualValues(t, dossier.Reputation.UptimeReputationBeta, newUptimeBeta)
require.NotNil(t, dossier.Disqualified)
require.Equal(t, *dossier.Disqualified, dqTime)
}
}
@ -207,10 +182,8 @@ func TestRandomizedSelection(t *testing.T) {
allIDs := make(storj.NodeIDList, totalNodes)
nodeCounts := make(map[storj.NodeID]int)
defaults := overlay.NodeSelectionConfig{
AuditReputationAlpha0: 1,
AuditReputationBeta0: 0,
UptimeReputationAlpha0: 1,
UptimeReputationBeta0: 0,
AuditReputationAlpha0: 1,
AuditReputationBeta0: 0,
}
// put nodes in cache
@ -233,9 +206,6 @@ func TestRandomizedSelection(t *testing.T) {
AuditLambda: 1,
AuditWeight: 1,
AuditDQ: 0.5,
UptimeLambda: 1,
UptimeWeight: 1,
UptimeDQ: 0.5,
})
require.NoError(t, err)
}
@ -425,9 +395,8 @@ func TestUpdateCheckIn(t *testing.T) {
FreeDisk: info.Capacity.GetFreeDisk(),
},
Reputation: overlay.NodeStats{
UptimeCount: 1,
UptimeSuccessCount: 1,
UptimeReputationAlpha: 1,
UptimeCount: 1,
UptimeSuccessCount: 1,
},
Version: pb.NodeVersion{
Version: "v0.0.0",
@ -440,11 +409,6 @@ func TestUpdateCheckIn(t *testing.T) {
PieceCount: 0,
ExitStatus: overlay.ExitStatus{NodeID: nodeID},
}
config := overlay.NodeSelectionConfig{
UptimeReputationLambda: 0.99,
UptimeReputationWeight: 1.0,
UptimeReputationDQ: 0,
}
// confirm the node doesn't exist in nodes table yet
_, err := db.OverlayCache().Get(ctx, nodeID)
@ -454,7 +418,7 @@ func TestUpdateCheckIn(t *testing.T) {
// check-in for that node id, which should add the node
// to the nodes tables in the database
startOfTest := time.Now().UTC()
err = db.OverlayCache().UpdateCheckIn(ctx, info, time.Now().UTC(), config)
err = db.OverlayCache().UpdateCheckIn(ctx, info, time.Now().UTC(), overlay.NodeSelectionConfig{})
require.NoError(t, err)
// confirm that the node is now in the nodes table with the
@ -492,7 +456,7 @@ func TestUpdateCheckIn(t *testing.T) {
}
// confirm that the updated node is in the nodes table with the
// correct updated fields set
err = db.OverlayCache().UpdateCheckIn(ctx, updatedInfo, time.Now().UTC(), config)
err = db.OverlayCache().UpdateCheckIn(ctx, updatedInfo, time.Now().UTC(), overlay.NodeSelectionConfig{})
require.NoError(t, err)
updatedNode, err := db.OverlayCache().Get(ctx, nodeID)
require.NoError(t, err)
@ -524,7 +488,7 @@ func TestUpdateCheckIn(t *testing.T) {
Release: false,
},
}
err = db.OverlayCache().UpdateCheckIn(ctx, updatedInfo2, time.Now().UTC(), config)
err = db.OverlayCache().UpdateCheckIn(ctx, updatedInfo2, time.Now().UTC(), overlay.NodeSelectionConfig{})
require.NoError(t, err)
updated2Node, err := db.OverlayCache().Get(ctx, nodeID)
require.NoError(t, err)
@ -541,10 +505,8 @@ func TestCache_DowntimeTracking(t *testing.T) {
cache := db.OverlayCache()
defaults := overlay.NodeSelectionConfig{
AuditReputationAlpha0: 1,
AuditReputationBeta0: 0,
UptimeReputationAlpha0: 1,
UptimeReputationBeta0: 0,
AuditReputationAlpha0: 1,
AuditReputationBeta0: 0,
}
totalNodes := 10
@ -565,12 +527,14 @@ func TestCache_DowntimeTracking(t *testing.T) {
// make half of the nodes (0, 2, 4, 6, 8) offline + not disqualified
if i%2 == 0 {
_, err := cache.UpdateUptime(ctx, newID, false, 1, 0, 0)
_, err := cache.UpdateUptime(ctx, newID, false)
require.NoError(t, err)
}
// make first node (0) offline + disqualified
if i == 0 {
_, err := cache.UpdateUptime(ctx, newID, false, 1, 0, 1)
_, err := cache.UpdateUptime(ctx, newID, false)
require.NoError(t, err)
err = cache.DisqualifyNode(ctx, newID)
require.NoError(t, err)
}
}
@ -603,22 +567,16 @@ func TestGetSuccesfulNodesNotCheckedInSince(t *testing.T) {
info1 := getNodeInfo(testrand.NodeID())
info2 := getNodeInfo(testrand.NodeID())
config := overlay.NodeSelectionConfig{
UptimeReputationLambda: 0.99,
UptimeReputationWeight: 1.0,
UptimeReputationDQ: 0,
}
{ // check-in the nodes, which should add them
twoHoursAgo := time.Now().UTC().Add(-2 * time.Hour)
err := db.OverlayCache().UpdateCheckIn(ctx, info1, twoHoursAgo, config)
err := db.OverlayCache().UpdateCheckIn(ctx, info1, twoHoursAgo, overlay.NodeSelectionConfig{})
require.NoError(t, err)
err = db.OverlayCache().UpdateCheckIn(ctx, info2, twoHoursAgo, config)
err = db.OverlayCache().UpdateCheckIn(ctx, info2, twoHoursAgo, overlay.NodeSelectionConfig{})
require.NoError(t, err)
// update uptime so that node 2 has a last contact failure > last contact success
_, err = db.OverlayCache().UpdateUptime(ctx, info2.NodeID, false, 0.99, 1.0, 0)
_, err = db.OverlayCache().UpdateUptime(ctx, info2.NodeID, false)
require.NoError(t, err)
// should just get 1 node
@ -630,7 +588,7 @@ func TestGetSuccesfulNodesNotCheckedInSince(t *testing.T) {
}
{ // check-in again with current time
err := db.OverlayCache().UpdateCheckIn(ctx, info1, time.Now().UTC(), config)
err := db.OverlayCache().UpdateCheckIn(ctx, info1, time.Now().UTC(), overlay.NodeSelectionConfig{})
require.NoError(t, err)
nodeLastContacts, err := db.OverlayCache().GetSuccesfulNodesNotCheckedInSince(ctx, time.Minute)


@ -37,21 +37,16 @@ func TestStatDB(t *testing.T) {
func testDatabase(ctx context.Context, t *testing.T, cache overlay.DB) {
{ // TestKnownUnreliableOrOffline
for _, tt := range []struct {
nodeID storj.NodeID
auditAlpha float64
auditBeta float64
uptimeAlpha float64
uptimeBeta float64
nodeID storj.NodeID
auditAlpha float64
auditBeta float64
}{
{storj.NodeID{1}, 20, 0, 20, 0}, // good reputations => good
{storj.NodeID{2}, 0, 20, 20, 0}, // bad audit rep, good uptime rep => bad
{storj.NodeID{3}, 20, 0, 0, 20}, // good audit rep, bad uptime rep => bad
{storj.NodeID{1}, 20, 0}, // good reputations => good
{storj.NodeID{2}, 0, 20}, // bad audit rep
} {
startingRep := overlay.NodeSelectionConfig{
AuditReputationAlpha0: tt.auditAlpha,
AuditReputationBeta0: tt.auditBeta,
UptimeReputationAlpha0: tt.uptimeAlpha,
UptimeReputationBeta0: tt.uptimeBeta,
AuditReputationAlpha0: tt.auditAlpha,
AuditReputationBeta0: tt.auditBeta,
}
err := cache.UpdateAddress(ctx, &pb.Node{Id: tt.nodeID}, startingRep)
@ -63,15 +58,14 @@ func testDatabase(ctx context.Context, t *testing.T, cache overlay.DB) {
AuditSuccess: true,
IsUp: true,
AuditLambda: 1, AuditWeight: 1,
UptimeLambda: 1, UptimeWeight: 1,
AuditDQ: 0.9, UptimeDQ: 0.9,
AuditDQ: 0.9,
})
require.NoError(t, err)
}
nodeIds := storj.NodeIDList{
storj.NodeID{1}, storj.NodeID{2},
storj.NodeID{3}, storj.NodeID{4},
storj.NodeID{3},
}
criteria := &overlay.NodeCriteria{OnlineWindow: time.Hour}
@ -79,9 +73,8 @@ func testDatabase(ctx context.Context, t *testing.T, cache overlay.DB) {
require.NoError(t, err)
require.Contains(t, invalid, storj.NodeID{2}) // bad audit
require.Contains(t, invalid, storj.NodeID{3}) // bad uptime
require.Contains(t, invalid, storj.NodeID{4}) // not in db
require.Len(t, invalid, 3)
require.Contains(t, invalid, storj.NodeID{3}) // not in db
require.Len(t, invalid, 2)
}
{ // TestUpdateOperator
@ -138,33 +131,24 @@ func testDatabase(ctx context.Context, t *testing.T, cache overlay.DB) {
auditAlpha := node.Reputation.AuditReputationAlpha
auditBeta := node.Reputation.AuditReputationBeta
uptimeAlpha := node.Reputation.UptimeReputationAlpha
uptimeBeta := node.Reputation.UptimeReputationBeta
updateReq := &overlay.UpdateRequest{
NodeID: nodeID,
AuditSuccess: true,
IsUp: true,
AuditLambda: 0.123, AuditWeight: 0.456,
UptimeLambda: 0.789, UptimeWeight: 0.876,
AuditDQ: 0, UptimeDQ: 0, // don't disqualify for any reason
AuditDQ: 0, // don't disqualify for any reason
}
stats, err := cache.UpdateStats(ctx, updateReq)
require.NoError(t, err)
expectedAuditAlpha := updateReq.AuditLambda*auditAlpha + updateReq.AuditWeight
expectedAuditBeta := updateReq.AuditLambda * auditBeta
expectedUptimeAlpha := updateReq.UptimeLambda*uptimeAlpha + updateReq.UptimeWeight
expectedUptimeBeta := updateReq.UptimeLambda * uptimeBeta
require.EqualValues(t, stats.AuditReputationAlpha, expectedAuditAlpha)
require.EqualValues(t, stats.AuditReputationBeta, expectedAuditBeta)
require.EqualValues(t, stats.UptimeReputationAlpha, expectedUptimeAlpha)
require.EqualValues(t, stats.UptimeReputationBeta, expectedUptimeBeta)
auditAlpha = expectedAuditAlpha
auditBeta = expectedAuditBeta
uptimeAlpha = expectedUptimeAlpha
uptimeBeta = expectedUptimeBeta
updateReq.AuditSuccess = false
updateReq.IsUp = false
@ -173,12 +157,8 @@ func testDatabase(ctx context.Context, t *testing.T, cache overlay.DB) {
expectedAuditAlpha = updateReq.AuditLambda * auditAlpha
expectedAuditBeta = updateReq.AuditLambda*auditBeta + updateReq.AuditWeight
expectedUptimeAlpha = updateReq.UptimeLambda * uptimeAlpha
expectedUptimeBeta = updateReq.UptimeLambda*uptimeBeta + updateReq.UptimeWeight
require.EqualValues(t, stats.AuditReputationAlpha, expectedAuditAlpha)
require.EqualValues(t, stats.AuditReputationBeta, expectedAuditBeta)
require.EqualValues(t, stats.UptimeReputationAlpha, expectedUptimeAlpha)
require.EqualValues(t, stats.UptimeReputationBeta, expectedUptimeBeta)
}
@ -186,14 +166,8 @@ func testDatabase(ctx context.Context, t *testing.T, cache overlay.DB) {
nodeID := storj.NodeID{1}
// get the existing node info that is stored in nodes table
node, err := cache.Get(ctx, nodeID)
_, err := cache.Get(ctx, nodeID)
require.NoError(t, err)
alpha := node.Reputation.UptimeReputationAlpha
beta := node.Reputation.UptimeReputationBeta
lambda := 0.789
weight := 0.876
dq := float64(0) // don't disqualify for any reason
info := overlay.NodeCheckInInfo{
NodeID: nodeID,
@ -208,37 +182,18 @@ func testDatabase(ctx context.Context, t *testing.T, cache overlay.DB) {
Release: false,
},
}
config := overlay.NodeSelectionConfig{
UptimeReputationLambda: lambda,
UptimeReputationWeight: weight,
UptimeReputationDQ: dq,
}
// update check-in when node is offline
err = cache.UpdateCheckIn(ctx, info, time.Now().UTC(), config)
err = cache.UpdateCheckIn(ctx, info, time.Now().UTC(), overlay.NodeSelectionConfig{})
require.NoError(t, err)
node, err = cache.Get(ctx, nodeID)
_, err = cache.Get(ctx, nodeID)
require.NoError(t, err)
expectedAlpha := lambda * alpha
expectedBeta := lambda*beta + weight
// confirm the reputation is updated correctly when node is offline
require.EqualValues(t, node.Reputation.UptimeReputationAlpha, expectedAlpha)
require.EqualValues(t, node.Reputation.UptimeReputationBeta, expectedBeta)
alpha = expectedAlpha
beta = expectedBeta
info.IsUp = true
// update check-in when node is online
err = cache.UpdateCheckIn(ctx, info, time.Now().UTC(), config)
err = cache.UpdateCheckIn(ctx, info, time.Now().UTC(), overlay.NodeSelectionConfig{})
require.NoError(t, err)
node, err = cache.Get(ctx, nodeID)
_, err = cache.Get(ctx, nodeID)
require.NoError(t, err)
expectedAlpha = lambda*alpha + weight
expectedBeta = lambda * beta
// confirm the reputation is updated correctly when node is online
require.EqualValues(t, node.Reputation.UptimeReputationAlpha, expectedAlpha)
require.EqualValues(t, node.Reputation.UptimeReputationBeta, expectedBeta)
}
}


@ -115,7 +115,8 @@ func TestDataRepair(t *testing.T) {
for _, node := range planet.StorageNodes {
if nodesToDisqualify[node.ID()] {
disqualifyNode(t, ctx, satellite, node.ID())
err := satellite.DB.OverlayCache().DisqualifyNode(ctx, node.ID())
require.NoError(t, err)
continue
}
if nodesToKill[node.ID()] {
@ -445,7 +446,9 @@ func TestRemoveDeletedSegmentFromQueue(t *testing.T) {
}
for nodeID := range nodesToDQ {
disqualifyNode(t, ctx, satellite, nodeID)
err := satellite.DB.OverlayCache().DisqualifyNode(ctx, nodeID)
require.NoError(t, err)
}
// trigger checker to add segment to repair queue
@ -526,7 +529,8 @@ func TestRemoveIrreparableSegmentFromQueue(t *testing.T) {
}
for nodeID := range nodesToDQ {
disqualifyNode(t, ctx, satellite, nodeID)
err := satellite.DB.OverlayCache().DisqualifyNode(ctx, nodeID)
require.NoError(t, err)
}
// trigger checker to add segment to repair queue
@ -537,7 +541,9 @@ func TestRemoveIrreparableSegmentFromQueue(t *testing.T) {
// Kill nodes so that online nodes < minimum threshold
// This will make the segment irreparable
for _, piece := range remotePieces {
disqualifyNode(t, ctx, satellite, piece.NodeId)
err := satellite.DB.OverlayCache().DisqualifyNode(ctx, piece.NodeId)
require.NoError(t, err)
}
// Verify that the segment is on the repair queue
@ -629,7 +635,9 @@ func TestRepairMultipleDisqualified(t *testing.T) {
for _, node := range planet.StorageNodes {
if nodesToDisqualify[node.ID()] {
disqualifyNode(t, ctx, satellite, node.ID())
err := satellite.DB.OverlayCache().DisqualifyNode(ctx, node.ID())
require.NoError(t, err)
}
}
@ -986,29 +994,6 @@ func TestDataRepairUploadLimit(t *testing.T) {
})
}
func isDisqualified(t *testing.T, ctx *testcontext.Context, satellite *testplanet.SatelliteSystem, nodeID storj.NodeID) bool {
node, err := satellite.Overlay.Service.Get(ctx, nodeID)
require.NoError(t, err)
return node.Disqualified != nil
}
func disqualifyNode(t *testing.T, ctx *testcontext.Context, satellite *testplanet.SatelliteSystem, nodeID storj.NodeID) {
_, err := satellite.DB.OverlayCache().UpdateStats(ctx, &overlay.UpdateRequest{
NodeID: nodeID,
IsUp: true,
AuditSuccess: false,
AuditLambda: 0,
AuditWeight: 1,
AuditDQ: 0.5,
UptimeLambda: 1,
UptimeWeight: 1,
UptimeDQ: 0.5,
})
require.NoError(t, err)
require.True(t, isDisqualified(t, ctx, satellite, nodeID))
}
// getRemoteSegment returns a remote pointer and its path from the satellite.
// nolint:golint
func getRemoteSegment(


@ -191,7 +191,7 @@ func (cache *overlaycache) queryNodes(ctx context.Context, excludedNodes []storj
rows, err = cache.db.Query(cache.db.Rebind(`SELECT id, type, address, last_net,
free_bandwidth, free_disk, total_audit_count, audit_success_count,
total_uptime_count, uptime_success_count, disqualified, audit_reputation_alpha,
audit_reputation_beta, uptime_reputation_alpha, uptime_reputation_beta
audit_reputation_beta
FROM nodes
`+safeQuery+safeExcludeNodes+`
ORDER BY RANDOM()
@ -209,7 +209,6 @@ func (cache *overlaycache) queryNodes(ctx context.Context, excludedNodes []storj
&dbNode.TotalAuditCount, &dbNode.AuditSuccessCount,
&dbNode.TotalUptimeCount, &dbNode.UptimeSuccessCount, &dbNode.Disqualified,
&dbNode.AuditReputationAlpha, &dbNode.AuditReputationBeta,
&dbNode.UptimeReputationAlpha, &dbNode.UptimeReputationBeta,
)
if err != nil {
return nil, err
@ -255,8 +254,7 @@ func (cache *overlaycache) queryNodesDistinct(ctx context.Context, excludedNodes
SELECT DISTINCT ON (last_net) last_net, -- choose at max 1 node from this IP or network
id, type, address, free_bandwidth, free_disk, total_audit_count,
audit_success_count, total_uptime_count, uptime_success_count,
audit_reputation_alpha, audit_reputation_beta, uptime_reputation_alpha,
uptime_reputation_beta
audit_reputation_alpha, audit_reputation_beta
FROM nodes
`+safeQuery+safeExcludeNodes+safeExcludeIPs+`
AND last_net <> '' -- don't try to IP-filter nodes with no known IP yet
@ -277,7 +275,6 @@ func (cache *overlaycache) queryNodesDistinct(ctx context.Context, excludedNodes
&dbNode.TotalAuditCount, &dbNode.AuditSuccessCount,
&dbNode.TotalUptimeCount, &dbNode.UptimeSuccessCount,
&dbNode.AuditReputationAlpha, &dbNode.AuditReputationBeta,
&dbNode.UptimeReputationAlpha, &dbNode.UptimeReputationBeta,
)
if err != nil {
return nil, err
@ -558,8 +555,9 @@ func (cache *overlaycache) UpdateAddress(ctx context.Context, info *pb.Node, def
dbx.Node_Contained(false),
dbx.Node_AuditReputationAlpha(defaults.AuditReputationAlpha0),
dbx.Node_AuditReputationBeta(defaults.AuditReputationBeta0),
dbx.Node_UptimeReputationAlpha(defaults.UptimeReputationAlpha0),
dbx.Node_UptimeReputationBeta(defaults.UptimeReputationBeta0),
//TODO: remove uptime reputation after finishing db migration
dbx.Node_UptimeReputationAlpha(0),
dbx.Node_UptimeReputationBeta(0),
dbx.Node_ExitSuccess(false),
dbx.Node_Create_Fields{
Disqualified: dbx.Node_Disqualified_Null(),
@ -741,7 +739,7 @@ func (cache *overlaycache) UpdateNodeInfo(ctx context.Context, nodeID storj.Node
}
// UpdateUptime updates a single storagenode's uptime stats in the db
func (cache *overlaycache) UpdateUptime(ctx context.Context, nodeID storj.NodeID, isUp bool, lambda, weight, uptimeDQ float64) (stats *overlay.NodeStats, err error) {
func (cache *overlaycache) UpdateUptime(ctx context.Context, nodeID storj.NodeID, isUp bool) (stats *overlay.NodeStats, err error) {
defer mon.Task()(&ctx)(&err)
var dbNode *dbx.Node
@ -756,30 +754,13 @@ func (cache *overlaycache) UpdateUptime(ctx context.Context, nodeID storj.NodeID
}
updateFields := dbx.Node_Update_Fields{}
uptimeAlpha, uptimeBeta, totalUptimeCount := updateReputation(
isUp,
dbNode.UptimeReputationAlpha,
dbNode.UptimeReputationBeta,
lambda,
weight,
dbNode.TotalUptimeCount,
)
mon.FloatVal("uptime_reputation_alpha").Observe(uptimeAlpha)
mon.FloatVal("uptime_reputation_beta").Observe(uptimeBeta)
updateFields.UptimeReputationAlpha = dbx.Node_UptimeReputationAlpha(uptimeAlpha)
updateFields.UptimeReputationBeta = dbx.Node_UptimeReputationBeta(uptimeBeta)
updateFields.TotalUptimeCount = dbx.Node_TotalUptimeCount(totalUptimeCount)
uptimeRep := uptimeAlpha / (uptimeAlpha + uptimeBeta)
if uptimeRep <= uptimeDQ {
updateFields.Disqualified = dbx.Node_Disqualified(time.Now().UTC())
}
totalUptimeCount := dbNode.TotalUptimeCount
lastContactSuccess := dbNode.LastContactSuccess
lastContactFailure := dbNode.LastContactFailure
mon.Meter("uptime_updates").Mark(1)
if isUp {
totalUptimeCount++
updateFields.UptimeSuccessCount = dbx.Node_UptimeSuccessCount(dbNode.UptimeSuccessCount + 1)
updateFields.LastContactSuccess = dbx.Node_LastContactSuccess(time.Now())
@ -806,6 +787,7 @@ func (cache *overlaycache) UpdateUptime(ctx context.Context, nodeID storj.NodeID
}
}
updateFields.TotalUptimeCount = dbx.Node_TotalUptimeCount(totalUptimeCount)
dbNode, err = tx.Update_Node_By_Id(ctx, dbx.Node_Id(nodeID.Bytes()), updateFields)
return err
})
@ -1188,18 +1170,16 @@ func convertDBNodeToPBNode(ctx context.Context, info *dbx.Id_LastNet_Address_Pro
func getNodeStats(dbNode *dbx.Node) *overlay.NodeStats {
nodeStats := &overlay.NodeStats{
Latency90: dbNode.Latency90,
AuditCount: dbNode.TotalAuditCount,
AuditSuccessCount: dbNode.AuditSuccessCount,
UptimeCount: dbNode.TotalUptimeCount,
UptimeSuccessCount: dbNode.UptimeSuccessCount,
LastContactSuccess: dbNode.LastContactSuccess,
LastContactFailure: dbNode.LastContactFailure,
AuditReputationAlpha: dbNode.AuditReputationAlpha,
AuditReputationBeta: dbNode.AuditReputationBeta,
UptimeReputationAlpha: dbNode.UptimeReputationAlpha,
UptimeReputationBeta: dbNode.UptimeReputationBeta,
Disqualified: dbNode.Disqualified,
Latency90: dbNode.Latency90,
AuditCount: dbNode.TotalAuditCount,
AuditSuccessCount: dbNode.AuditSuccessCount,
UptimeCount: dbNode.TotalUptimeCount,
UptimeSuccessCount: dbNode.UptimeSuccessCount,
LastContactSuccess: dbNode.LastContactSuccess,
LastContactFailure: dbNode.LastContactFailure,
AuditReputationAlpha: dbNode.AuditReputationAlpha,
AuditReputationBeta: dbNode.AuditReputationBeta,
Disqualified: dbNode.Disqualified,
}
return nodeStats
}
@ -1249,20 +1229,6 @@ func buildUpdateStatement(update updateNodeStats) string {
atLeastOne = true
sql += fmt.Sprintf("audit_reputation_beta = %v", update.AuditReputationBeta.value)
}
if update.UptimeReputationAlpha.set {
if atLeastOne {
sql += ","
}
atLeastOne = true
sql += fmt.Sprintf("uptime_reputation_alpha = %v", update.UptimeReputationAlpha.value)
}
if update.UptimeReputationBeta.set {
if atLeastOne {
sql += ","
}
atLeastOne = true
sql += fmt.Sprintf("uptime_reputation_beta = %v", update.UptimeReputationBeta.value)
}
if update.Disqualified.set {
if atLeastOne {
sql += ","
@ -1338,19 +1304,17 @@ type timeField struct {
}
type updateNodeStats struct {
NodeID storj.NodeID
TotalAuditCount int64Field
TotalUptimeCount int64Field
AuditReputationAlpha float64Field
AuditReputationBeta float64Field
UptimeReputationAlpha float64Field
UptimeReputationBeta float64Field
Disqualified timeField
UptimeSuccessCount int64Field
LastContactSuccess timeField
LastContactFailure timeField
AuditSuccessCount int64Field
Contained boolField
NodeID storj.NodeID
TotalAuditCount int64Field
TotalUptimeCount int64Field
AuditReputationAlpha float64Field
AuditReputationBeta float64Field
Disqualified timeField
UptimeSuccessCount int64Field
LastContactSuccess timeField
LastContactFailure timeField
AuditSuccessCount int64Field
Contained boolField
}
func populateUpdateNodeStats(dbNode *dbx.Node, updateReq *overlay.UpdateRequest) updateNodeStats {
@ -1365,25 +1329,17 @@ func populateUpdateNodeStats(dbNode *dbx.Node, updateReq *overlay.UpdateRequest)
mon.FloatVal("audit_reputation_alpha").Observe(auditAlpha) //locked
mon.FloatVal("audit_reputation_beta").Observe(auditBeta) //locked
uptimeAlpha, uptimeBeta, totalUptimeCount := updateReputation(
updateReq.IsUp,
dbNode.UptimeReputationAlpha,
dbNode.UptimeReputationBeta,
updateReq.UptimeLambda,
updateReq.UptimeWeight,
dbNode.TotalUptimeCount,
)
mon.FloatVal("uptime_reputation_alpha").Observe(uptimeAlpha)
mon.FloatVal("uptime_reputation_beta").Observe(uptimeBeta)
totalUptimeCount := dbNode.TotalUptimeCount
if updateReq.IsUp {
totalUptimeCount++
}
updateFields := updateNodeStats{
NodeID: updateReq.NodeID,
TotalAuditCount: int64Field{set: true, value: totalAuditCount},
TotalUptimeCount: int64Field{set: true, value: totalUptimeCount},
AuditReputationAlpha: float64Field{set: true, value: auditAlpha},
AuditReputationBeta: float64Field{set: true, value: auditBeta},
UptimeReputationAlpha: float64Field{set: true, value: uptimeAlpha},
UptimeReputationBeta: float64Field{set: true, value: uptimeBeta},
NodeID: updateReq.NodeID,
TotalAuditCount: int64Field{set: true, value: totalAuditCount},
TotalUptimeCount: int64Field{set: true, value: totalUptimeCount},
AuditReputationAlpha: float64Field{set: true, value: auditAlpha},
AuditReputationBeta: float64Field{set: true, value: auditBeta},
}
auditRep := auditAlpha / (auditAlpha + auditBeta)
@ -1391,13 +1347,6 @@ func populateUpdateNodeStats(dbNode *dbx.Node, updateReq *overlay.UpdateRequest)
updateFields.Disqualified = timeField{set: true, value: time.Now().UTC()}
}
uptimeRep := uptimeAlpha / (uptimeAlpha + uptimeBeta)
if uptimeRep <= updateReq.UptimeDQ {
// n.b. that this will overwrite the audit DQ timestamp
// if it has already been set.
updateFields.Disqualified = timeField{set: true, value: time.Now().UTC()}
}
if updateReq.IsUp {
updateFields.UptimeSuccessCount = int64Field{set: true, value: dbNode.UptimeSuccessCount + 1}
updateFields.LastContactSuccess = timeField{set: true, value: time.Now()}
@ -1431,12 +1380,6 @@ func populateUpdateFields(dbNode *dbx.Node, updateReq *overlay.UpdateRequest) db
if update.AuditReputationBeta.set {
updateFields.AuditReputationBeta = dbx.Node_AuditReputationBeta(update.AuditReputationBeta.value)
}
if update.UptimeReputationAlpha.set {
updateFields.UptimeReputationAlpha = dbx.Node_UptimeReputationAlpha(update.UptimeReputationAlpha.value)
}
if update.UptimeReputationBeta.set {
updateFields.UptimeReputationBeta = dbx.Node_UptimeReputationBeta(update.UptimeReputationBeta.value)
}
if update.Disqualified.set {
updateFields.Disqualified = dbx.Node_Disqualified(update.Disqualified.value)
}
@ -1470,14 +1413,6 @@ func (cache *overlaycache) UpdateCheckIn(ctx context.Context, node overlay.NodeC
return Error.New("error UpdateCheckIn: missing the storage node address")
}
// v is a single feedback value that allows us to update both alpha and beta
var v float64 = -1
if node.IsUp {
v = 1
}
uptimeReputationAlpha := config.UptimeReputationLambda*config.UptimeReputationAlpha0 + config.UptimeReputationWeight*(1+v)/2
uptimeReputationBeta := config.UptimeReputationLambda*config.UptimeReputationBeta0 + config.UptimeReputationWeight*(1-v)/2
semVer, err := version.NewSemVer(node.Version.GetVersion())
if err != nil {
return Error.New("unable to convert version to semVer")
@ -1488,24 +1423,24 @@ func (cache *overlaycache) UpdateCheckIn(ctx context.Context, node overlay.NodeC
(
id, address, last_net, protocol, type,
email, wallet, free_bandwidth, free_disk,
uptime_success_count, total_uptime_count,
uptime_success_count, total_uptime_count,
last_contact_success,
last_contact_failure,
audit_reputation_alpha, audit_reputation_beta, uptime_reputation_alpha, uptime_reputation_beta,
audit_reputation_alpha, audit_reputation_beta,
major, minor, patch, hash, timestamp, release
)
VALUES (
$1, $2, $3, $4, $5,
$6, $7, $8, $9,
$10::bool::int, 1,
CASE WHEN $10::bool IS TRUE THEN $24::timestamptz
CASE WHEN $10::bool IS TRUE THEN $19::timestamptz
ELSE '0001-01-01 00:00:00+00'::timestamptz
END,
CASE WHEN $10::bool IS FALSE THEN $24::timestamptz
CASE WHEN $10::bool IS FALSE THEN $19::timestamptz
ELSE '0001-01-01 00:00:00+00'::timestamptz
END,
$11, $12, $13, $14,
$18, $19, $20, $21, $22, $23
$11, $12,
$13, $14, $15, $16, $17, $18
)
ON CONFLICT (id)
DO UPDATE
@ -1517,24 +1452,16 @@ func (cache *overlaycache) UpdateCheckIn(ctx context.Context, node overlay.NodeC
wallet=$7,
free_bandwidth=$8,
free_disk=$9,
major=$18, minor=$19, patch=$20, hash=$21, timestamp=$22, release=$23,
major=$13, minor=$14, patch=$15, hash=$16, timestamp=$17, release=$18,
total_uptime_count=nodes.total_uptime_count+1,
uptime_reputation_alpha=$16::float*nodes.uptime_reputation_alpha + $17::float*$10::bool::int::float,
uptime_reputation_beta=$16::float*nodes.uptime_reputation_beta + $17::float*(NOT $10)::bool::int::float,
uptime_success_count = nodes.uptime_success_count + $10::bool::int,
last_contact_success = CASE WHEN $10::bool IS TRUE
THEN $24::timestamptz
THEN $19::timestamptz
ELSE nodes.last_contact_success
END,
last_contact_failure = CASE WHEN $10::bool IS FALSE
THEN $24::timestamptz
THEN $19::timestamptz
ELSE nodes.last_contact_failure
END,
-- this disqualified case statement resolves to:
-- when (new.uptime_reputation_alpha /(new.uptime_reputation_alpha + new.uptime_reputation_beta)) <= config.UptimeReputationDQ
disqualified = CASE WHEN (($16::float*nodes.uptime_reputation_alpha + $17::float*$10::bool::int::float) / (($16::float*nodes.uptime_reputation_alpha + $17::float*$10::bool::int::float) + ($16::float*nodes.uptime_reputation_beta + $17::float*(NOT $10)::bool::int::float))) <= $15 AND nodes.disqualified IS NULL
THEN $24::timestamptz
ELSE nodes.disqualified
END;
`
_, err = cache.db.ExecContext(ctx, query,
@ -1544,13 +1471,11 @@ func (cache *overlaycache) UpdateCheckIn(ctx context.Context, node overlay.NodeC
node.Operator.GetEmail(), node.Operator.GetWallet(), node.Capacity.GetFreeBandwidth(), node.Capacity.GetFreeDisk(),
// args $10
node.IsUp,
// args $11 - $14
config.AuditReputationAlpha0, config.AuditReputationBeta0, uptimeReputationAlpha, uptimeReputationBeta,
// args $15 - $17
config.UptimeReputationDQ, config.UptimeReputationLambda, config.UptimeReputationWeight,
// args $18 - $23
// args $11 - $12
config.AuditReputationAlpha0, config.AuditReputationBeta0,
// args $13 - $18
semVer.Major, semVer.Minor, semVer.Patch, node.Version.GetCommitHash(), node.Version.Timestamp, node.Version.GetRelease(),
// args $24
// args $19
timestamp,
)
if err != nil {


@ -328,27 +328,6 @@ identity.key-path: /root/.local/share/storj/identity/satellite/identity.key
# the number of times a node's uptime has been checked to not be considered a New Node
# overlay.node.uptime-count: 100
# the initial shape 'alpha' used to calculate uptime SNs reputation
# overlay.node.uptime-reputation-alpha0: 2
# the initial shape 'beta' value used to calculate uptime SNs reputation
# overlay.node.uptime-reputation-beta0: 0
# the reputation cut-off for disqualifying SNs based on uptime history
# overlay.node.uptime-reputation-dq: 0
# the forgetting factor used to calculate the uptime SNs reputation
# overlay.node.uptime-reputation-lambda: 0.99
# weight to apply to uptime reputation for total repair reputation calculation
# overlay.node.uptime-reputation-repair-weight: 1
# weight to apply to uptime reputation for total uplink reputation calculation
# overlay.node.uptime-reputation-uplink-weight: 1
# the normalization weight used to calculate the uptime SNs reputation
# overlay.node.uptime-reputation-weight: 1
# number of update requests to process per transaction
# overlay.update-stats-batch-size: 100