diff --git a/internal/testplanet/reconfigure.go b/internal/testplanet/reconfigure.go index b41c28139..9b34f7d44 100644 --- a/internal/testplanet/reconfigure.go +++ b/internal/testplanet/reconfigure.go @@ -4,6 +4,8 @@ package testplanet import ( + "time" + "go.uber.org/zap" "storj.io/storj/bootstrap" @@ -37,3 +39,11 @@ var DisablePeerCAWhitelist = Reconfigure{ config.Server.UsePeerCAWhitelist = false }, } + +// ShortenOnlineWindow is a `Reconfigure` that sets the NodeSelection +// OnlineWindow to 1 second, meaning a connection failure leads to marking the node as offline. +var ShortenOnlineWindow = Reconfigure{ + Satellite: func(log *zap.Logger, index int, config *satellite.Config) { + config.Overlay.Node.OnlineWindow = 1 * time.Second + }, +} diff --git a/internal/testplanet/satellite.go b/internal/testplanet/satellite.go index d7a07d790..ddc1cd65a 100644 --- a/internal/testplanet/satellite.go +++ b/internal/testplanet/satellite.go @@ -104,7 +104,7 @@ func (planet *Planet) newSatellites(count int) ([]*satellite.Peer, error) { UptimeCount: 0, AuditCount: 0, NewNodePercentage: 0, - OnlineWindow: time.Hour, + OnlineWindow: 0, DistinctIP: false, AuditReputationRepairWeight: 1, diff --git a/pkg/datarepair/datarepair_test.go b/pkg/datarepair/datarepair_test.go index 526ed339f..5a15253a6 100644 --- a/pkg/datarepair/datarepair_test.go +++ b/pkg/datarepair/datarepair_test.go @@ -7,6 +7,7 @@ import ( "testing" "github.com/stretchr/testify/require" + "go.uber.org/zap" "storj.io/storj/internal/memory" "storj.io/storj/internal/testcontext" @@ -31,6 +32,11 @@ func TestDataRepair(t *testing.T) { SatelliteCount: 1, StorageNodeCount: 12, UplinkCount: 1, + Reconfigure: testplanet.Reconfigure{ + Satellite: func(log *zap.Logger, index int, config *satellite.Config) { + config.Overlay.Node.OnlineWindow = 0 + }, + }, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { // first, upload some remote data ul := planet.Uplinks[0] diff --git 
a/pkg/overlay/cache.go b/pkg/overlay/cache.go index 1e9f1baee..c8d36eac5 100644 --- a/pkg/overlay/cache.go +++ b/pkg/overlay/cache.go @@ -177,7 +177,7 @@ func (cache *Cache) Get(ctx context.Context, nodeID storj.NodeID) (_ *NodeDossie // IsOnline checks if a node is 'online' based on the collected statistics. func (cache *Cache) IsOnline(node *NodeDossier) bool { - return time.Now().Sub(node.Reputation.LastContactSuccess) < cache.preferences.OnlineWindow && + return time.Now().Sub(node.Reputation.LastContactSuccess) < cache.preferences.OnlineWindow || node.Reputation.LastContactSuccess.After(node.Reputation.LastContactFailure) } diff --git a/satellite/satellitedb/overlaycache.go b/satellite/satellitedb/overlaycache.go index f3305db80..e6ab90952 100644 --- a/satellite/satellitedb/overlaycache.go +++ b/satellite/satellitedb/overlaycache.go @@ -10,9 +10,9 @@ import ( "time" "github.com/lib/pq" - "github.com/mattn/go-sqlite3" + sqlite3 "github.com/mattn/go-sqlite3" "github.com/zeebo/errs" - "gopkg.in/spacemonkeygo/monkit.v2" + monkit "gopkg.in/spacemonkeygo/monkit.v2" "storj.io/storj/internal/version" "storj.io/storj/pkg/overlay" @@ -44,8 +44,8 @@ func (cache *overlaycache) SelectStorageNodes(ctx context.Context, count int, cr AND free_disk >= ? AND total_audit_count >= ? AND total_uptime_count >= ? - AND last_contact_success > ? - AND last_contact_success > last_contact_failure` + AND (last_contact_success > ? + OR last_contact_success > last_contact_failure)` args := append(make([]interface{}, 0, 13), nodeType, criteria.FreeBandwidth, criteria.FreeDisk, criteria.AuditCount, criteria.UptimeCount, time.Now().Add(-criteria.OnlineWindow)) @@ -99,8 +99,8 @@ func (cache *overlaycache) SelectNewStorageNodes(ctx context.Context, count int, AND free_bandwidth >= ? AND free_disk >= ? AND (total_audit_count < ? OR total_uptime_count < ?) - AND last_contact_success > ? - AND last_contact_success > last_contact_failure` + AND (last_contact_success > ? 
+ OR last_contact_success > last_contact_failure)` args := append(make([]interface{}, 0, 10), nodeType, criteria.FreeBandwidth, criteria.FreeDisk, criteria.AuditCount, criteria.UptimeCount, time.Now().Add(-criteria.OnlineWindow)) @@ -160,8 +160,8 @@ func (cache *overlaycache) queryNodes(ctx context.Context, excludedNodes []storj args = append(args, count) var rows *sql.Rows - rows, err = cache.db.Query(cache.db.Rebind(`SELECT id, type, address, last_net, - free_bandwidth, free_disk, total_audit_count, audit_success_count, + rows, err = cache.db.Query(cache.db.Rebind(`SELECT id, type, address, last_net, + free_bandwidth, free_disk, total_audit_count, audit_success_count, total_uptime_count, uptime_success_count, disqualified, audit_reputation_alpha, audit_reputation_beta, uptime_reputation_alpha, uptime_reputation_beta FROM nodes @@ -235,8 +235,8 @@ func (cache *overlaycache) sqliteQueryNodesDistinct(ctx context.Context, exclude args = append(args, count) - rows, err := cache.db.Query(cache.db.Rebind(`SELECT id, type, address, last_net, - free_bandwidth, free_disk, total_audit_count, audit_success_count, + rows, err := cache.db.Query(cache.db.Rebind(`SELECT id, type, address, last_net, + free_bandwidth, free_disk, total_audit_count, audit_success_count, total_uptime_count, uptime_success_count, disqualified, audit_reputation_alpha, audit_reputation_beta, uptime_reputation_alpha, uptime_reputation_beta FROM (SELECT *, Row_number() OVER(PARTITION BY last_net ORDER BY RANDOM()) rn @@ -412,7 +412,7 @@ func (cache *overlaycache) KnownOffline(ctx context.Context, criteria *overlay.N SELECT id FROM nodes WHERE id IN (?`+strings.Repeat(", ?", len(nodeIds)-1)+`) AND ( - last_contact_success < last_contact_failure OR last_contact_success < ? + last_contact_success < last_contact_failure AND last_contact_success < ? ) `), args...) 
@@ -421,7 +421,7 @@ func (cache *overlaycache) KnownOffline(ctx context.Context, criteria *overlay.N SELECT id FROM nodes WHERE id = any($1::bytea[]) AND ( - last_contact_success < last_contact_failure OR last_contact_success < $2 + last_contact_success < last_contact_failure AND last_contact_success < $2 ) `, postgresNodeIDList(nodeIds), time.Now().Add(-criteria.OnlineWindow), ) @@ -469,7 +469,7 @@ func (cache *overlaycache) KnownUnreliableOrOffline(ctx context.Context, criteri SELECT id FROM nodes WHERE id IN (?`+strings.Repeat(", ?", len(nodeIds)-1)+`) AND disqualified IS NULL - AND last_contact_success > ? AND last_contact_success > last_contact_failure + AND (last_contact_success > ? OR last_contact_success > last_contact_failure) `), args...) case *pq.Driver: @@ -477,7 +477,7 @@ func (cache *overlaycache) KnownUnreliableOrOffline(ctx context.Context, criteri SELECT id FROM nodes WHERE id = any($1::bytea[]) AND disqualified IS NULL - AND last_contact_success > $2 AND last_contact_success > last_contact_failure + AND (last_contact_success > $2 OR last_contact_success > last_contact_failure) `, postgresNodeIDList(nodeIds), time.Now().Add(-criteria.OnlineWindow), ) default: @@ -514,7 +514,7 @@ func (cache *overlaycache) Reliable(ctx context.Context, criteria *overlay.NodeC rows, err := cache.db.Query(cache.db.Rebind(` SELECT id FROM nodes WHERE disqualified IS NULL - AND last_contact_success > ? AND last_contact_success > last_contact_failure`), + AND (last_contact_success > ? OR last_contact_success > last_contact_failure)`), time.Now().Add(-criteria.OnlineWindow)) if err != nil { return nil, err