satellite/nodeselection: don't select nodes that haven't checked in for a while (#3567)

* satellite/nodeselection: don't select nodes that haven't checked in for a while

* change testplanet online window to one minute

* remove satellite reconfigure online window = 0 in repair tests

* pass timestamp into UpdateCheckIn

* change timestamp to timestamptz

* edit tests to set last_contact_success to 4 hours ago

* fix syntax error

* remove check for last_contact_success > last_contact_failure in IsOnline (see the sketch after this list)
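
The core of the change, in isolation: IsOnline and the node-selection queries now rely solely on the online window, and UpdateCheckIn takes the contact timestamp as an explicit parameter so callers (and tests) control it. Below is a minimal, self-contained Go sketch of the resulting online check — isOnline here is an illustrative standalone helper, not the real method, which operates on *overlay.NodeDossier as shown in the diff below:

package main

import (
	"fmt"
	"time"
)

// isOnline mirrors the simplified check: a node counts as online only if its
// last successful contact falls inside the online window. The old fallback
// (last success more recent than last failure) is gone.
func isOnline(lastContactSuccess time.Time, onlineWindow time.Duration) bool {
	return time.Since(lastContactSuccess) < onlineWindow
}

func main() {
	const onlineWindow = time.Minute // the new testplanet default in this diff

	fmt.Println(isOnline(time.Now().Add(-30*time.Second), onlineWindow)) // true — recent check-in
	fmt.Println(isOnline(time.Now().Add(-4*time.Hour), onlineWindow))    // false — stale, excluded from selection
}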
littleskunk 2019-11-15 23:43:06 +01:00 committed by GitHub
parent ecd2ef4a21
commit 8b3444e088
11 changed files with 64 additions and 76 deletions

View File

@ -263,7 +263,7 @@ func (planet *Planet) newSatellites(count int) ([]*SatelliteSystem, error) {
UptimeCount: 0,
AuditCount: 0,
NewNodePercentage: 0,
OnlineWindow: 0,
OnlineWindow: time.Minute,
DistinctIP: false,
AuditReputationRepairWeight: 1,

View File

@ -128,9 +128,9 @@ func TestDownloadWithSomeNodesOffline(t *testing.T) {
// mark node as offline in overlay
info := overlay.NodeCheckInInfo{
NodeID: node.ID(),
IsUp: false,
IsUp: true,
Address: &pb.NodeAddress{
Address: "1.2.3.4",
Address: node.Addr(),
},
Version: &pb.NodeVersion{
Version: "v0.0.0",
@ -139,12 +139,13 @@ func TestDownloadWithSomeNodesOffline(t *testing.T) {
Release: false,
},
}
err = satellite.Overlay.Service.UpdateCheckIn(ctx, info)
err = satellite.Overlay.Service.UpdateCheckIn(ctx, info, time.Now().UTC().Add(-4*time.Hour))
require.NoError(t, err)
}
}
// confirm that we marked the correct number of storage nodes as offline
nodes, err := satellite.Overlay.DB.SelectStorageNodes(ctx, len(planet.StorageNodes), &overlay.NodeCriteria{})
nodes, err := satellite.Overlay.Service.Reliable(ctx)
require.NoError(t, err)
require.Len(t, nodes, len(planet.StorageNodes)-len(nodesToKill))

View File

@ -223,7 +223,7 @@ func TestDisqualifiedNodeRemainsDisqualified(t *testing.T) {
UptimeReputationWeight: 1,
UptimeReputationDQ: 0,
}
err := satellitePeer.DB.OverlayCache().UpdateCheckIn(ctx, info, config)
err := satellitePeer.DB.OverlayCache().UpdateCheckIn(ctx, info, time.Now().UTC(), config)
require.NoError(t, err)
assert.True(t, isDisqualified(t, ctx, satellitePeer, disqualifiedNode.ID()))
@ -270,7 +270,7 @@ func disqualifyNode(t *testing.T, ctx *testcontext.Context, satellite *testplane
UptimeReputationWeight: 1,
UptimeReputationDQ: 1,
}
err := satellite.DB.OverlayCache().UpdateCheckIn(ctx, info, config)
err := satellite.DB.OverlayCache().UpdateCheckIn(ctx, info, time.Now().UTC(), config)
require.NoError(t, err)
assert.True(t, isDisqualified(t, ctx, satellite, nodeID))
}

View File

@ -6,6 +6,7 @@ package contact
import (
"context"
"fmt"
"time"
"github.com/zeebo/errs"
"go.uber.org/zap"
@ -86,7 +87,7 @@ func (endpoint *Endpoint) CheckIn(ctx context.Context, req *pb.CheckInRequest) (
Operator: req.Operator,
Version: req.Version,
}
err = endpoint.service.overlay.UpdateCheckIn(ctx, nodeInfo)
err = endpoint.service.overlay.UpdateCheckIn(ctx, nodeInfo, time.Now().UTC())
if err != nil {
endpoint.log.Info("failed to update check in", zap.String("node address", req.Address), zap.Stringer("Node ID", nodeID), zap.Error(err))
return nil, rpcstatus.Error(rpcstatus.Internal, Error.Wrap(err).Error())

View File

@ -154,11 +154,13 @@ func BenchmarkOverlay(b *testing.B) {
Timestamp: now,
Release: false,
},
}, overlay.NodeSelectionConfig{
UptimeReputationLambda: 0.99,
UptimeReputationWeight: 1.0,
UptimeReputationDQ: 0,
})
},
now,
overlay.NodeSelectionConfig{
UptimeReputationLambda: 0.99,
UptimeReputationWeight: 1.0,
UptimeReputationDQ: 0,
})
require.NoError(b, err)
}
})

View File

@ -64,7 +64,7 @@ type DB interface {
// UpdateUptime updates a single storagenode's uptime stats.
UpdateUptime(ctx context.Context, nodeID storj.NodeID, isUp bool, lambda, weight, uptimeDQ float64) (stats *NodeStats, err error)
// UpdateCheckIn updates a single storagenode's check-in stats.
UpdateCheckIn(ctx context.Context, node NodeCheckInInfo, config NodeSelectionConfig) (err error)
UpdateCheckIn(ctx context.Context, node NodeCheckInInfo, timestamp time.Time, config NodeSelectionConfig) (err error)
// AllPieceCounts returns a map of node IDs to piece counts from the db.
AllPieceCounts(ctx context.Context) (pieceCounts map[storj.NodeID]int, err error)
@ -236,8 +236,7 @@ func (service *Service) Get(ctx context.Context, nodeID storj.NodeID) (_ *NodeDo
// IsOnline checks if a node is 'online' based on the collected statistics.
func (service *Service) IsOnline(node *NodeDossier) bool {
return time.Since(node.Reputation.LastContactSuccess) < service.config.Node.OnlineWindow ||
node.Reputation.LastContactSuccess.After(node.Reputation.LastContactFailure)
return time.Since(node.Reputation.LastContactSuccess) < service.config.Node.OnlineWindow
}
// FindStorageNodes searches the overlay network for nodes that meet the provided requirements
@ -419,9 +418,9 @@ func (service *Service) UpdateUptime(ctx context.Context, nodeID storj.NodeID, i
}
// UpdateCheckIn updates a single storagenode's check-in info.
func (service *Service) UpdateCheckIn(ctx context.Context, node NodeCheckInInfo) (err error) {
func (service *Service) UpdateCheckIn(ctx context.Context, node NodeCheckInInfo, timestamp time.Time) (err error) {
defer mon.Task()(&ctx)(&err)
return service.db.UpdateCheckIn(ctx, node, service.config.Node)
return service.db.UpdateCheckIn(ctx, node, timestamp, service.config.Node)
}
// GetMissingPieces returns the list of offline nodes

View File

@ -390,7 +390,7 @@ func TestUpdateCheckIn(t *testing.T) {
// check-in for that node id, which should add the node
// to the nodes tables in the database
startOfTest := time.Now().UTC()
err = db.OverlayCache().UpdateCheckIn(ctx, info, config)
err = db.OverlayCache().UpdateCheckIn(ctx, info, time.Now().UTC(), config)
require.NoError(t, err)
// confirm that the node is now in the nodes table with the
@ -398,7 +398,7 @@ func TestUpdateCheckIn(t *testing.T) {
actualNode, err := db.OverlayCache().Get(ctx, nodeID)
require.NoError(t, err)
require.True(t, actualNode.Reputation.LastContactSuccess.After(startOfTest))
require.True(t, actualNode.Reputation.LastContactFailure.Equal(time.Time{}.UTC()))
require.True(t, actualNode.Reputation.LastContactFailure.UTC().Equal(time.Time{}.UTC()))
// we need to overwrite the times so that the deep equal considers them the same
expectedNode.Reputation.LastContactSuccess = actualNode.Reputation.LastContactSuccess
@ -428,7 +428,7 @@ func TestUpdateCheckIn(t *testing.T) {
}
// confirm that the updated node is in the nodes table with the
// correct updated fields set
err = db.OverlayCache().UpdateCheckIn(ctx, updatedInfo, config)
err = db.OverlayCache().UpdateCheckIn(ctx, updatedInfo, time.Now().UTC(), config)
require.NoError(t, err)
updatedNode, err := db.OverlayCache().Get(ctx, nodeID)
require.NoError(t, err)
@ -460,7 +460,7 @@ func TestUpdateCheckIn(t *testing.T) {
Release: false,
},
}
err = db.OverlayCache().UpdateCheckIn(ctx, updatedInfo2, config)
err = db.OverlayCache().UpdateCheckIn(ctx, updatedInfo2, time.Now().UTC(), config)
require.NoError(t, err)
updated2Node, err := db.OverlayCache().Get(ctx, nodeID)
require.NoError(t, err)

View File

@ -214,7 +214,7 @@ func testDatabase(ctx context.Context, t *testing.T, cache overlay.DB) {
UptimeReputationDQ: dq,
}
// update check-in when node is offline
err = cache.UpdateCheckIn(ctx, info, config)
err = cache.UpdateCheckIn(ctx, info, time.Now().UTC(), config)
require.NoError(t, err)
node, err = cache.Get(ctx, nodeID)
require.NoError(t, err)
@ -230,7 +230,7 @@ func testDatabase(ctx context.Context, t *testing.T, cache overlay.DB) {
info.IsUp = true
// update check-in when node is online
err = cache.UpdateCheckIn(ctx, info, config)
err = cache.UpdateCheckIn(ctx, info, time.Now().UTC(), config)
require.NoError(t, err)
node, err = cache.Get(ctx, nodeID)
require.NoError(t, err)

View File

@ -8,6 +8,7 @@ import (
"io"
"math"
"testing"
"time"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
@ -43,11 +44,11 @@ func TestDataRepair(t *testing.T) {
UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
config.Overlay.Node.OnlineWindow = 0
config.Repairer.MaxExcessRateOptimalThreshold = RepairMaxExcessRateOptimalThreshold
},
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
// first, upload some remote data
uplinkPeer := planet.Uplinks[0]
satellite := planet.Satellites[0]
@ -118,10 +119,7 @@ func TestDataRepair(t *testing.T) {
continue
}
if nodesToKill[node.ID()] {
err = planet.StopPeer(node)
require.NoError(t, err)
_, err = satellite.Overlay.Service.UpdateUptime(ctx, node.ID(), false)
require.NoError(t, err)
stopNodeByID(t, ctx, planet, node.ID())
}
}
@ -153,7 +151,6 @@ func TestDataRepair(t *testing.T) {
nodesToKillForMinThreshold--
}
}
// we should be able to download data without any of the original nodes
newData, err := uplinkPeer.Download(ctx, satellite, "testbucket", "test/path")
require.NoError(t, err)
@ -177,7 +174,6 @@ func TestCorruptDataRepair_Failed(t *testing.T) {
UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
config.Overlay.Node.OnlineWindow = 0
config.Repairer.MaxExcessRateOptimalThreshold = RepairMaxExcessRateOptimalThreshold
},
},
@ -239,10 +235,7 @@ func TestCorruptDataRepair_Failed(t *testing.T) {
corruptedNode = node
}
if nodesToKill[node.ID()] {
err = planet.StopPeer(node)
require.NoError(t, err)
_, err = satellite.Overlay.Service.UpdateUptime(ctx, node.ID(), false)
require.NoError(t, err)
stopNodeByID(t, ctx, planet, node.ID())
}
}
require.NotNil(t, corruptedNode)
@ -297,7 +290,6 @@ func TestCorruptDataRepair_Succeed(t *testing.T) {
UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
config.Overlay.Node.OnlineWindow = 0
config.Repairer.MaxExcessRateOptimalThreshold = RepairMaxExcessRateOptimalThreshold
},
},
@ -361,10 +353,7 @@ func TestCorruptDataRepair_Succeed(t *testing.T) {
corruptedNode = node
}
if nodesToKill[node.ID()] {
err = planet.StopPeer(node)
require.NoError(t, err)
_, err = satellite.Overlay.Service.UpdateUptime(ctx, node.ID(), false)
require.NoError(t, err)
stopNodeByID(t, ctx, planet, node.ID())
}
}
require.NotNil(t, corruptedNode)
@ -649,11 +638,7 @@ func TestRepairMultipleDisqualified(t *testing.T) {
// kill nodes kept alive to ensure repair worked
for _, node := range planet.StorageNodes {
if nodesToKeepAlive[node.ID()] {
err = planet.StopPeer(node)
require.NoError(t, err)
_, err = satellite.Overlay.Service.UpdateUptime(ctx, node.ID(), false)
require.NoError(t, err)
stopNodeByID(t, ctx, planet, node.ID())
}
}
@ -687,7 +672,6 @@ func TestDataRepairOverride_HigherLimit(t *testing.T) {
UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
config.Overlay.Node.OnlineWindow = 0
config.Checker.RepairOverride = repairOverride
},
},
@ -734,10 +718,7 @@ func TestDataRepairOverride_HigherLimit(t *testing.T) {
for _, node := range planet.StorageNodes {
if nodesToKill[node.ID()] {
err = planet.StopPeer(node)
require.NoError(t, err)
_, err = satellite.Overlay.Service.UpdateUptime(ctx, node.ID(), false)
require.NoError(t, err)
stopNodeByID(t, ctx, planet, node.ID())
}
}
@ -776,7 +757,6 @@ func TestDataRepairOverride_LowerLimit(t *testing.T) {
UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
config.Overlay.Node.OnlineWindow = 0
config.Checker.RepairOverride = repairOverride
},
},
@ -824,10 +804,7 @@ func TestDataRepairOverride_LowerLimit(t *testing.T) {
for _, node := range planet.StorageNodes {
if nodesToKill[node.ID()] {
err = planet.StopPeer(node)
require.NoError(t, err)
_, err = satellite.Overlay.Service.UpdateUptime(ctx, node.ID(), false)
require.NoError(t, err)
stopNodeByID(t, ctx, planet, node.ID())
}
}
@ -959,10 +936,7 @@ func TestDataRepairUploadLimit(t *testing.T) {
}
if len(killedNodes) < numNodesToKill {
err = planet.StopPeer(node)
require.NoError(t, err)
_, err = satellite.Overlay.Service.UpdateUptime(ctx, node.ID(), false)
require.NoError(t, err)
stopNodeByID(t, ctx, planet, node.ID())
killedNodes[node.ID()] = struct{}{}
}
@ -1062,7 +1036,19 @@ func stopNodeByID(t *testing.T, ctx context.Context, planet *testplanet.Planet,
require.NoError(t, err)
for _, satellite := range planet.Satellites {
_, err = satellite.Overlay.Service.UpdateUptime(ctx, node.ID(), false)
err = satellite.Overlay.Service.UpdateCheckIn(ctx, overlay.NodeCheckInInfo{
NodeID: node.ID(),
Address: &pb.NodeAddress{
Address: node.Addr(),
},
IsUp: true,
Version: &pb.NodeVersion{
Version: "v0.0.0",
CommitHash: "",
Timestamp: time.Time{},
Release: false,
},
}, time.Now().UTC().Add(-4*time.Hour))
require.NoError(t, err)
}

View File

@ -47,8 +47,7 @@ func (cache *overlaycache) SelectStorageNodes(ctx context.Context, count int, cr
AND free_disk >= ?
AND total_audit_count >= ?
AND total_uptime_count >= ?
AND (last_contact_success > ?
OR last_contact_success > last_contact_failure)`
AND last_contact_success > ?`
args := append(make([]interface{}, 0, 13),
nodeType, criteria.FreeBandwidth, criteria.FreeDisk, criteria.AuditCount,
criteria.UptimeCount, time.Now().Add(-criteria.OnlineWindow))
@ -103,8 +102,7 @@ func (cache *overlaycache) SelectNewStorageNodes(ctx context.Context, count int,
AND free_bandwidth >= ?
AND free_disk >= ?
AND (total_audit_count < ? OR total_uptime_count < ?)
AND (last_contact_success > ?
OR last_contact_success > last_contact_failure)`
AND last_contact_success > ?`
args := append(make([]interface{}, 0, 10),
nodeType, criteria.FreeBandwidth, criteria.FreeDisk, criteria.AuditCount, criteria.UptimeCount, time.Now().Add(-criteria.OnlineWindow))
@ -327,7 +325,7 @@ func (cache *overlaycache) KnownOffline(ctx context.Context, criteria *overlay.N
SELECT id FROM nodes
WHERE id = any($1::bytea[])
AND (
last_contact_success < last_contact_failure AND last_contact_success < $2
last_contact_success < $2
)
`), postgresNodeIDList(nodeIds), time.Now().Add(-criteria.OnlineWindow),
)
@ -361,7 +359,7 @@ func (cache *overlaycache) KnownUnreliableOrOffline(ctx context.Context, criteri
SELECT id FROM nodes
WHERE id = any($1::bytea[])
AND disqualified IS NULL
AND (last_contact_success > $2 OR last_contact_success > last_contact_failure)
AND last_contact_success > $2
`), postgresNodeIDList(nodeIds), time.Now().Add(-criteria.OnlineWindow),
)
if err != nil {
@ -392,7 +390,7 @@ func (cache *overlaycache) Reliable(ctx context.Context, criteria *overlay.NodeC
rows, err := cache.db.Query(cache.db.Rebind(`
SELECT id FROM nodes
WHERE disqualified IS NULL
AND (last_contact_success > ? OR last_contact_success > last_contact_failure)`),
AND last_contact_success > ?`),
time.Now().Add(-criteria.OnlineWindow))
if err != nil {
return nil, err
@ -1367,7 +1365,7 @@ func populateUpdateFields(dbNode *dbx.Node, updateReq *overlay.UpdateRequest) db
}
// UpdateCheckIn updates a single storagenode with info from when the the node last checked in.
func (cache *overlaycache) UpdateCheckIn(ctx context.Context, node overlay.NodeCheckInInfo, config overlay.NodeSelectionConfig) (err error) {
func (cache *overlaycache) UpdateCheckIn(ctx context.Context, node overlay.NodeCheckInInfo, timestamp time.Time, config overlay.NodeSelectionConfig) (err error) {
defer mon.Task()(&ctx)(&err)
if node.Address.GetAddress() == "" {
@ -1402,11 +1400,11 @@ func (cache *overlaycache) UpdateCheckIn(ctx context.Context, node overlay.NodeC
$1, $2, $3, $4, $5,
$6, $7, $8, $9,
$10::bool::int, 1,
CASE WHEN $10 IS TRUE THEN current_timestamp
ELSE '0001-01-01 00:00:00+00'
CASE WHEN $10 IS TRUE THEN $24::timestamptz
ELSE '0001-01-01 00:00:00+00'::timestamptz
END,
CASE WHEN $10 IS FALSE THEN current_timestamp
ELSE '0001-01-01 00:00:00+00'
CASE WHEN $10 IS FALSE THEN $24::timestamptz
ELSE '0001-01-01 00:00:00+00'::timestamptz
END,
$11, $12, $13, $14,
$18, $19, $20, $21, $22, $23
@ -1427,17 +1425,17 @@ func (cache *overlaycache) UpdateCheckIn(ctx context.Context, node overlay.NodeC
uptime_reputation_beta=$16::numeric*nodes.uptime_reputation_beta + $17::numeric*(NOT $10)::bool::int,
uptime_success_count = nodes.uptime_success_count + $10::bool::int,
last_contact_success = CASE WHEN $10 IS TRUE
THEN current_timestamp
THEN $24::timestamptz
ELSE nodes.last_contact_success
END,
last_contact_failure = CASE WHEN $10 IS FALSE
THEN current_timestamp
THEN $24::timestamptz
ELSE nodes.last_contact_failure
END,
-- this disqualified case statement resolves to:
-- when (new.uptime_reputation_alpha /(new.uptime_reputation_alpha + new.uptime_reputation_beta)) <= config.UptimeReputationDQ
disqualified = CASE WHEN (($16::numeric*nodes.uptime_reputation_alpha + $17::numeric*$10::bool::int) / (($16::numeric*nodes.uptime_reputation_alpha + $17::numeric*$10::bool::int) + ($16::numeric*nodes.uptime_reputation_beta + $17::numeric*(NOT $10)::bool::int))) <= $15 AND nodes.disqualified IS NULL
THEN current_timestamp
THEN $24::timestamptz
ELSE nodes.disqualified
END;
`
@ -1454,6 +1452,8 @@ func (cache *overlaycache) UpdateCheckIn(ctx context.Context, node overlay.NodeC
config.UptimeReputationDQ, config.UptimeReputationLambda, config.UptimeReputationWeight,
// args $18 - $23
semVer.Major, semVer.Minor, semVer.Patch, node.Version.GetCommitHash(), node.Version.Timestamp, node.Version.GetRelease(),
// args $24
timestamp,
)
if err != nil {
return Error.Wrap(err)

View File

@ -187,6 +187,5 @@ func CalcNeededNodes(rs storj.RedundancyScheme) int32 {
if needed > int32(rs.TotalShares) {
needed = int32(rs.TotalShares)
}
return needed
}