private,satellite: add chore to dq stray nodes
Full scope: private/testplanet,satellite/{overlay,satellitedb} Description: In most cases, downtime tracking with audits will eventually lead to DQ for nodes who are unresponsive. However, if a stray node has no pieces, it will not be audited and will thus never be disqualified. This chore will check for nodes who have not successfully been contacted in some set time and DQ them. There are some new flags for toggling DQ of stray nodes and the timeframes for running the chore and how long nodes can go without contact. Change-Id: Ic9d41fdbf214736798925e728245180fb3c55615
This commit is contained in:
parent
2e34b631b1
commit
75d828200c
@ -56,6 +56,7 @@ import (
|
|||||||
"storj.io/storj/satellite/nodestats"
|
"storj.io/storj/satellite/nodestats"
|
||||||
"storj.io/storj/satellite/orders"
|
"storj.io/storj/satellite/orders"
|
||||||
"storj.io/storj/satellite/overlay"
|
"storj.io/storj/satellite/overlay"
|
||||||
|
"storj.io/storj/satellite/overlay/straynodes"
|
||||||
"storj.io/storj/satellite/payments/paymentsconfig"
|
"storj.io/storj/satellite/payments/paymentsconfig"
|
||||||
"storj.io/storj/satellite/payments/stripecoinpayments"
|
"storj.io/storj/satellite/payments/stripecoinpayments"
|
||||||
"storj.io/storj/satellite/repair/checker"
|
"storj.io/storj/satellite/repair/checker"
|
||||||
@ -92,9 +93,10 @@ type Satellite struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
Overlay struct {
|
Overlay struct {
|
||||||
DB overlay.DB
|
DB overlay.DB
|
||||||
Service *overlay.Service
|
Service *overlay.Service
|
||||||
Inspector *overlay.Inspector
|
Inspector *overlay.Inspector
|
||||||
|
DQStrayNodes *straynodes.Chore
|
||||||
}
|
}
|
||||||
|
|
||||||
Metainfo struct {
|
Metainfo struct {
|
||||||
@ -120,6 +122,7 @@ type Satellite struct {
|
|||||||
Repairer *repairer.Service
|
Repairer *repairer.Service
|
||||||
Inspector *irreparable.Inspector
|
Inspector *irreparable.Inspector
|
||||||
}
|
}
|
||||||
|
|
||||||
Audit struct {
|
Audit struct {
|
||||||
Queues *audit.Queues
|
Queues *audit.Queues
|
||||||
Worker *audit.Worker
|
Worker *audit.Worker
|
||||||
@ -439,6 +442,11 @@ func (planet *Planet) newSatellite(ctx context.Context, prefix string, index int
|
|||||||
OfflineThreshold: 0.6,
|
OfflineThreshold: 0.6,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
StrayNodes: straynodes.Config{
|
||||||
|
EnableDQ: true,
|
||||||
|
Interval: time.Minute,
|
||||||
|
MaxDurationWithoutContact: 30 * time.Second,
|
||||||
|
},
|
||||||
Metainfo: metainfo.Config{
|
Metainfo: metainfo.Config{
|
||||||
DatabaseURL: "", // not used
|
DatabaseURL: "", // not used
|
||||||
MinRemoteSegmentSize: 0, // TODO: fix tests to work with 1024
|
MinRemoteSegmentSize: 0, // TODO: fix tests to work with 1024
|
||||||
@ -696,6 +704,7 @@ func createNewSystem(name string, log *zap.Logger, config satellite.Config, peer
|
|||||||
system.Overlay.DB = api.Overlay.DB
|
system.Overlay.DB = api.Overlay.DB
|
||||||
system.Overlay.Service = api.Overlay.Service
|
system.Overlay.Service = api.Overlay.Service
|
||||||
system.Overlay.Inspector = api.Overlay.Inspector
|
system.Overlay.Inspector = api.Overlay.Inspector
|
||||||
|
system.Overlay.DQStrayNodes = peer.Overlay.DQStrayNodes
|
||||||
|
|
||||||
system.Metainfo.Database = api.Metainfo.Database
|
system.Metainfo.Database = api.Metainfo.Database
|
||||||
system.Metainfo.Service = peer.Metainfo.Service
|
system.Metainfo.Service = peer.Metainfo.Service
|
||||||
|
@ -38,6 +38,7 @@ import (
|
|||||||
"storj.io/storj/satellite/metrics"
|
"storj.io/storj/satellite/metrics"
|
||||||
"storj.io/storj/satellite/orders"
|
"storj.io/storj/satellite/orders"
|
||||||
"storj.io/storj/satellite/overlay"
|
"storj.io/storj/satellite/overlay"
|
||||||
|
"storj.io/storj/satellite/overlay/straynodes"
|
||||||
"storj.io/storj/satellite/payments"
|
"storj.io/storj/satellite/payments"
|
||||||
"storj.io/storj/satellite/payments/stripecoinpayments"
|
"storj.io/storj/satellite/payments/stripecoinpayments"
|
||||||
"storj.io/storj/satellite/repair/checker"
|
"storj.io/storj/satellite/repair/checker"
|
||||||
@ -73,8 +74,9 @@ type Core struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
Overlay struct {
|
Overlay struct {
|
||||||
DB overlay.DB
|
DB overlay.DB
|
||||||
Service *overlay.Service
|
Service *overlay.Service
|
||||||
|
DQStrayNodes *straynodes.Chore
|
||||||
}
|
}
|
||||||
|
|
||||||
Metainfo struct {
|
Metainfo struct {
|
||||||
@ -92,6 +94,7 @@ type Core struct {
|
|||||||
Repair struct {
|
Repair struct {
|
||||||
Checker *checker.Checker
|
Checker *checker.Checker
|
||||||
}
|
}
|
||||||
|
|
||||||
Audit struct {
|
Audit struct {
|
||||||
Queues *audit.Queues
|
Queues *audit.Queues
|
||||||
Worker *audit.Worker
|
Worker *audit.Worker
|
||||||
@ -229,6 +232,17 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB,
|
|||||||
Name: "overlay",
|
Name: "overlay",
|
||||||
Close: peer.Overlay.Service.Close,
|
Close: peer.Overlay.Service.Close,
|
||||||
})
|
})
|
||||||
|
|
||||||
|
if config.StrayNodes.EnableDQ {
|
||||||
|
peer.Overlay.DQStrayNodes = straynodes.NewChore(peer.Log.Named("overlay:dq-stray-nodes"), peer.Overlay.DB, config.StrayNodes)
|
||||||
|
peer.Services.Add(lifecycle.Item{
|
||||||
|
Name: "overlay:dq-stray-nodes",
|
||||||
|
Run: peer.Overlay.DQStrayNodes.Run,
|
||||||
|
Close: peer.Overlay.DQStrayNodes.Close,
|
||||||
|
})
|
||||||
|
peer.Debug.Server.Panel.Add(
|
||||||
|
debug.Cycle("Overlay DQ Stray Nodes", peer.Overlay.DQStrayNodes.Loop))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
{ // setup live accounting
|
{ // setup live accounting
|
||||||
|
@ -88,6 +88,8 @@ type DB interface {
|
|||||||
|
|
||||||
// DisqualifyNode disqualifies a storage node.
|
// DisqualifyNode disqualifies a storage node.
|
||||||
DisqualifyNode(ctx context.Context, nodeID storj.NodeID) (err error)
|
DisqualifyNode(ctx context.Context, nodeID storj.NodeID) (err error)
|
||||||
|
// DQNodesLastSeenBefore disqualifies all nodes where last_contact_success < cutoff.
|
||||||
|
DQNodesLastSeenBefore(ctx context.Context, cutoff time.Time) (err error)
|
||||||
|
|
||||||
// SuspendNodeUnknownAudit suspends a storage node for unknown audits.
|
// SuspendNodeUnknownAudit suspends a storage node for unknown audits.
|
||||||
SuspendNodeUnknownAudit(ctx context.Context, nodeID storj.NodeID, suspendedAt time.Time) (err error)
|
SuspendNodeUnknownAudit(ctx context.Context, nodeID storj.NodeID, suspendedAt time.Time) (err error)
|
||||||
|
61
satellite/overlay/straynodes/chore.go
Normal file
61
satellite/overlay/straynodes/chore.go
Normal file
@ -0,0 +1,61 @@
|
|||||||
|
// Copyright (C) 2020 Storj Labs, Inc.
|
||||||
|
// See LICENSE for copying information.
|
||||||
|
|
||||||
|
package straynodes
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/spacemonkeygo/monkit/v3"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
|
||||||
|
"storj.io/common/sync2"
|
||||||
|
"storj.io/storj/satellite/overlay"
|
||||||
|
)
|
||||||
|
|
||||||
|
var mon = monkit.Package()
|
||||||
|
|
||||||
|
// Config contains configurable values for stray nodes chore.
|
||||||
|
type Config struct {
|
||||||
|
EnableDQ bool `help:"whether nodes will be disqualified if they have not been contacted in some time" releaseDefault:"false" devDefault:"true"`
|
||||||
|
Interval time.Duration `help:"how often to check for and DQ stray nodes" releaseDefault:"168h" devDefault:"5m"`
|
||||||
|
MaxDurationWithoutContact time.Duration `help:"length of time a node can go without contacting satellite before being disqualified" releaseDefault:"720h" devDefault:"5m"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// Chore disqualifies stray nodes.
|
||||||
|
type Chore struct {
|
||||||
|
log *zap.Logger
|
||||||
|
cache overlay.DB
|
||||||
|
maxDurationWithoutContact time.Duration
|
||||||
|
Loop *sync2.Cycle
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewChore creates a new stray nodes Chore.
|
||||||
|
func NewChore(log *zap.Logger, cache overlay.DB, config Config) *Chore {
|
||||||
|
return &Chore{
|
||||||
|
log: log,
|
||||||
|
cache: cache,
|
||||||
|
maxDurationWithoutContact: config.MaxDurationWithoutContact,
|
||||||
|
Loop: sync2.NewCycle(config.Interval),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Run runs the chore.
|
||||||
|
func (chore *Chore) Run(ctx context.Context) (err error) {
|
||||||
|
defer mon.Task()(&ctx)(&err)
|
||||||
|
|
||||||
|
return chore.Loop.Run(ctx, func(ctx context.Context) error {
|
||||||
|
err := chore.cache.DQNodesLastSeenBefore(ctx, time.Now().UTC().Add(-chore.maxDurationWithoutContact))
|
||||||
|
if err != nil {
|
||||||
|
chore.log.Error("error disqualifying stray nodes", zap.Error(err))
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close closes chore.
|
||||||
|
func (chore *Chore) Close() error {
|
||||||
|
chore.Loop.Close()
|
||||||
|
return nil
|
||||||
|
}
|
68
satellite/overlay/straynodes/chore_test.go
Normal file
68
satellite/overlay/straynodes/chore_test.go
Normal file
@ -0,0 +1,68 @@
|
|||||||
|
// Copyright (C) 2020 Storj Labs, Inc.
|
||||||
|
// See LICENSE for copying information.
|
||||||
|
|
||||||
|
package straynodes_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
|
||||||
|
"storj.io/common/pb"
|
||||||
|
"storj.io/common/testcontext"
|
||||||
|
"storj.io/storj/private/testplanet"
|
||||||
|
"storj.io/storj/satellite"
|
||||||
|
"storj.io/storj/satellite/overlay"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestDQStrayNodes(t *testing.T) {
|
||||||
|
testplanet.Run(t, testplanet.Config{
|
||||||
|
SatelliteCount: 1, StorageNodeCount: 2,
|
||||||
|
Reconfigure: testplanet.Reconfigure{
|
||||||
|
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
|
||||||
|
config.StrayNodes.MaxDurationWithoutContact = 24 * time.Hour
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||||
|
strayNode := planet.StorageNodes[0]
|
||||||
|
liveNode := planet.StorageNodes[1]
|
||||||
|
sat := planet.Satellites[0]
|
||||||
|
strayNode.Contact.Chore.Pause(ctx)
|
||||||
|
sat.Overlay.DQStrayNodes.Loop.Pause()
|
||||||
|
|
||||||
|
cache := planet.Satellites[0].Overlay.DB
|
||||||
|
|
||||||
|
strayInfo, err := cache.Get(ctx, strayNode.ID())
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Nil(t, strayInfo.Disqualified)
|
||||||
|
|
||||||
|
checkInInfo := overlay.NodeCheckInInfo{
|
||||||
|
NodeID: strayNode.ID(),
|
||||||
|
IsUp: true,
|
||||||
|
Address: &pb.NodeAddress{
|
||||||
|
Address: "1.2.3.4",
|
||||||
|
},
|
||||||
|
Version: &pb.NodeVersion{
|
||||||
|
Version: "v0.0.0",
|
||||||
|
CommitHash: "",
|
||||||
|
Timestamp: time.Time{},
|
||||||
|
Release: false,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
// set strayNode last_contact_success to 48 hours ago
|
||||||
|
require.NoError(t, sat.Overlay.DB.UpdateCheckIn(ctx, checkInInfo, time.Now().Add(-48*time.Hour), sat.Config.Overlay.Node))
|
||||||
|
|
||||||
|
sat.Overlay.DQStrayNodes.Loop.TriggerWait()
|
||||||
|
|
||||||
|
strayInfo, err = cache.Get(ctx, strayNode.ID())
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NotNil(t, strayInfo.Disqualified)
|
||||||
|
|
||||||
|
liveInfo, err := cache.Get(ctx, liveNode.ID())
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Nil(t, liveInfo.Disqualified)
|
||||||
|
})
|
||||||
|
}
|
@ -36,6 +36,7 @@ import (
|
|||||||
"storj.io/storj/satellite/nodeapiversion"
|
"storj.io/storj/satellite/nodeapiversion"
|
||||||
"storj.io/storj/satellite/orders"
|
"storj.io/storj/satellite/orders"
|
||||||
"storj.io/storj/satellite/overlay"
|
"storj.io/storj/satellite/overlay"
|
||||||
|
"storj.io/storj/satellite/overlay/straynodes"
|
||||||
"storj.io/storj/satellite/payments/paymentsconfig"
|
"storj.io/storj/satellite/payments/paymentsconfig"
|
||||||
"storj.io/storj/satellite/payments/stripecoinpayments"
|
"storj.io/storj/satellite/payments/stripecoinpayments"
|
||||||
"storj.io/storj/satellite/referrals"
|
"storj.io/storj/satellite/referrals"
|
||||||
@ -114,8 +115,9 @@ type Config struct {
|
|||||||
|
|
||||||
Admin admin.Config
|
Admin admin.Config
|
||||||
|
|
||||||
Contact contact.Config
|
Contact contact.Config
|
||||||
Overlay overlay.Config
|
Overlay overlay.Config
|
||||||
|
StrayNodes straynodes.Config
|
||||||
|
|
||||||
Metainfo metainfo.Config
|
Metainfo metainfo.Config
|
||||||
Orders orders.Config
|
Orders orders.Config
|
||||||
|
@ -1565,6 +1565,14 @@ func (cache *overlaycache) populateUpdateFields(dbNode *dbx.Node, updateReq *ove
|
|||||||
return updateFields
|
return updateFields
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// DQNodesLastSeenBefore disqualifies all nodes where last_contact_success < cutoff.
|
||||||
|
func (cache *overlaycache) DQNodesLastSeenBefore(ctx context.Context, cutoff time.Time) (err error) {
|
||||||
|
defer mon.Task()(&ctx)(&err)
|
||||||
|
q := `UPDATE nodes SET disqualified = current_timestamp WHERE last_contact_success < $1;`
|
||||||
|
_, err = cache.db.ExecContext(ctx, q, cutoff)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
// UpdateCheckIn updates a single storagenode with info from when the the node last checked in.
|
// UpdateCheckIn updates a single storagenode with info from when the the node last checked in.
|
||||||
func (cache *overlaycache) UpdateCheckIn(ctx context.Context, node overlay.NodeCheckInInfo, timestamp time.Time, config overlay.NodeSelectionConfig) (err error) {
|
func (cache *overlaycache) UpdateCheckIn(ctx context.Context, node overlay.NodeCheckInInfo, timestamp time.Time, config overlay.NodeSelectionConfig) (err error) {
|
||||||
defer mon.Task()(&ctx)(&err)
|
defer mon.Task()(&ctx)(&err)
|
||||||
|
@ -15,6 +15,27 @@ import (
|
|||||||
"storj.io/storj/satellite/overlay"
|
"storj.io/storj/satellite/overlay"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func TestDQNodesLastSeenBefore(t *testing.T) {
|
||||||
|
testplanet.Run(t, testplanet.Config{
|
||||||
|
SatelliteCount: 1, StorageNodeCount: 1,
|
||||||
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||||
|
node := planet.StorageNodes[0]
|
||||||
|
node.Contact.Chore.Pause(ctx)
|
||||||
|
|
||||||
|
cache := planet.Satellites[0].Overlay.DB
|
||||||
|
|
||||||
|
info, err := cache.Get(ctx, node.ID())
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Nil(t, info.Disqualified)
|
||||||
|
|
||||||
|
require.NoError(t, cache.DQNodesLastSeenBefore(ctx, time.Now()))
|
||||||
|
|
||||||
|
info, err = cache.Get(ctx, node.ID())
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NotNil(t, info.Disqualified)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
func TestUpdateStats(t *testing.T) {
|
func TestUpdateStats(t *testing.T) {
|
||||||
testplanet.Run(t, testplanet.Config{
|
testplanet.Run(t, testplanet.Config{
|
||||||
SatelliteCount: 1, StorageNodeCount: 2,
|
SatelliteCount: 1, StorageNodeCount: 2,
|
||||||
|
9
scripts/testdata/satellite-config.yaml.lock
vendored
9
scripts/testdata/satellite-config.yaml.lock
vendored
@ -652,6 +652,15 @@ server.private-address: 127.0.0.1:7778
|
|||||||
# if true, uses peer ca whitelist checking
|
# if true, uses peer ca whitelist checking
|
||||||
# server.use-peer-ca-whitelist: true
|
# server.use-peer-ca-whitelist: true
|
||||||
|
|
||||||
|
# whether nodes will be disqualified if they have not been contacted in some time
|
||||||
|
# stray-nodes.enable-dq: false
|
||||||
|
|
||||||
|
# how often to check for and DQ stray nodes
|
||||||
|
# stray-nodes.interval: 168h0m0s
|
||||||
|
|
||||||
|
# length of time a node can go without contacting satellite before being disqualified
|
||||||
|
# stray-nodes.max-duration-without-contact: 720h0m0s
|
||||||
|
|
||||||
# how frequently the tally service should run
|
# how frequently the tally service should run
|
||||||
# tally.interval: 1h0m0s
|
# tally.interval: 1h0m0s
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user