satellite:{downtime,overlay}: Implement offline node detection chore

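Adds a downtime detection chore to the satellite core. On each cycle the chore
asks the overlay for nodes whose last contact was successful but is older than
the configured detection interval, pings each of those nodes back through the
downtime service, and, when the ping-back fails, records the node's offline
time in the downtime tracking DB. The interval is exposed as the
downtime.detection-interval config value.
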
https://storjlabs.atlassian.net/browse/V3-3398

Change-Id: I598c3bad819026377d1d113c099dc9bba8b02742
Authored by Ethan on 2020-01-02 15:41:18 -05:00; committed by Ethan Adams
parent 5aac77c2a1
commit 05b406e992
13 changed files with 504 additions and 10 deletions

View File

@ -165,7 +165,8 @@ type SatelliteSystem struct {
}
DowntimeTracking struct {
Service *downtime.Service
DetectionChore *downtime.DetectionChore
Service *downtime.Service
}
}
@ -402,6 +403,9 @@ func (planet *Planet) newSatellites(count int) ([]*SatelliteSystem, error) {
Metrics: metrics.Config{
ChoreInterval: defaultInterval,
},
Downtime: downtime.Config{
DetectionInterval: defaultInterval,
},
}
if planet.ReferralManager != nil {
@ -519,6 +523,7 @@ func createNewSystem(log *zap.Logger, peer *satellite.Core, api *satellite.API,
system.Metrics.Chore = peer.Metrics.Chore
system.DowntimeTracking.DetectionChore = peer.DowntimeTracking.DetectionChore
system.DowntimeTracking.Service = peer.DowntimeTracking.Service
return system

View File

@ -117,7 +117,8 @@ type Core struct {
}
DowntimeTracking struct {
Service *downtime.Service
DetectionChore *downtime.DetectionChore
Service *downtime.Service
}
}
@ -366,9 +367,17 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, pointerDB metainfo
}
{ // setup downtime tracking
log.Debug("Starting downtime tracking service")
log.Debug("Starting downtime tracking")
peer.DowntimeTracking.Service = downtime.NewService(peer.Log.Named("downtime"), peer.Overlay.Service, peer.Contact.Service)
peer.DowntimeTracking.DetectionChore = downtime.NewDetectionChore(
peer.Log.Named("downtime:detection"),
config.Downtime,
peer.Overlay.Service,
peer.DowntimeTracking.Service,
peer.DB.DowntimeTracking(),
)
}
return peer, nil
@ -423,6 +432,9 @@ func (peer *Core) Run(ctx context.Context) (err error) {
return errs2.IgnoreCanceled(peer.Payments.Chore.Run(ctx))
})
}
group.Go(func() error {
return errs2.IgnoreCanceled(peer.DowntimeTracking.DetectionChore.Run(ctx))
})
return group.Wait()
}
@ -434,6 +446,10 @@ func (peer *Core) Close() error {
// TODO: ensure that Close can be called on nil-s that way this code won't need the checks.
// close servers, to avoid new connections to closing subsystems
if peer.DowntimeTracking.DetectionChore != nil {
errlist.Add(peer.DowntimeTracking.DetectionChore.Close())
}
if peer.Metrics.Chore != nil {
errlist.Add(peer.Metrics.Chore.Close())
}

View File

@ -0,0 +1,19 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package downtime
import (
"time"
"gopkg.in/spacemonkeygo/monkit.v2"
)
var (
mon = monkit.Package()
)
// Config for the chore
type Config struct {
DetectionInterval time.Duration `help:"how often to run the downtime detection chore." releaseDefault:"1h0s" devDefault:"30s"`
}

View File

@ -0,0 +1,89 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package downtime
import (
"context"
"time"
"go.uber.org/zap"
"storj.io/common/sync2"
"storj.io/storj/satellite/overlay"
)
// DetectionChore looks for nodes that have not checked in and tries to contact them.
//
// architecture: Chore
type DetectionChore struct {
log *zap.Logger
Loop sync2.Cycle
config Config
overlay *overlay.Service
service *Service
db DB
}
// NewDetectionChore instantiates DetectionChore.
func NewDetectionChore(log *zap.Logger, config Config, overlay *overlay.Service, service *Service, db DB) *DetectionChore {
return &DetectionChore{
log: log,
Loop: *sync2.NewCycle(config.DetectionInterval),
config: config,
overlay: overlay,
service: service,
db: db,
}
}
// Run starts the chore.
func (chore *DetectionChore) Run(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
return chore.Loop.Run(ctx, func(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
chore.log.Debug("checking for nodes that have not had a successful check-in within the interval.",
zap.Stringer("interval", chore.config.DetectionInterval))
nodeLastContacts, err := chore.overlay.GetSuccesfulNodesNotCheckedInSince(ctx, chore.config.DetectionInterval)
if err != nil {
chore.log.Error("error retrieving node addresses for downtime detection.", zap.Error(err))
return nil
}
chore.log.Debug("nodes that have had not had a successful check-in with the interval.",
zap.Stringer("interval", chore.config.DetectionInterval),
zap.Int("count", len(nodeLastContacts)))
for _, nodeLastContact := range nodeLastContacts {
success, err := chore.service.CheckAndUpdateNodeAvailability(ctx, nodeLastContact.ID, nodeLastContact.Address)
if err != nil {
chore.log.Error("error during downtime detection ping back.",
zap.Bool("success", success),
zap.Error(err))
continue
}
if !success {
now := time.Now().UTC()
duration := now.Sub(nodeLastContact.LastContactSuccess) - chore.config.DetectionInterval
err = chore.db.Add(ctx, nodeLastContact.ID, now, duration)
if err != nil {
chore.log.Error("error adding node seconds offline information.",
zap.Stringer("node ID", nodeLastContact.ID),
zap.Stringer("duration", duration),
zap.Error(err))
}
}
}
return nil
})
}
// Close closes the chore.
func (chore *DetectionChore) Close() error {
chore.Loop.Close()
return nil
}
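
A detail worth noting in the arithmetic above: when a ping-back fails, the recorded duration is now.Sub(LastContactSuccess) minus the detection interval, so the node is only charged for the time elapsed since it was last due to check in, not for the whole window since its last successful contact. A minimal standalone sketch of that computation (the values below are hypothetical, not taken from the change):

package main

import (
	"fmt"
	"time"
)

func main() {
	// Hypothetical inputs: the node's last successful check-in was 61 minutes
	// ago and the detection interval is one hour.
	detectionInterval := time.Hour
	lastContactSuccess := time.Now().UTC().Add(-61 * time.Minute)

	// Mirrors the chore's computation: offline time is counted from the
	// moment the node was due to check in (last success + interval), not
	// from the last success itself.
	now := time.Now().UTC()
	duration := now.Sub(lastContactSuccess) - detectionInterval

	fmt.Println(duration.Round(time.Second)) // about 1m0s, not 61m0s
}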

View File

@ -0,0 +1,93 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package downtime_test
import (
"testing"
"time"
"github.com/stretchr/testify/require"
"storj.io/common/testcontext"
"storj.io/storj/private/testplanet"
"storj.io/storj/satellite/overlay"
)
func TestDetectionChore(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 0,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
node := planet.StorageNodes[0]
nodeDossier := planet.StorageNodes[0].Local()
satellite := planet.Satellites[0]
node.Contact.Chore.Pause(ctx)
satellite.DowntimeTracking.DetectionChore.Loop.Pause()
// setup
info := overlay.NodeCheckInInfo{
NodeID: nodeDossier.Id,
IsUp: true,
Address: nodeDossier.Address,
Operator: &nodeDossier.Operator,
Version: &nodeDossier.Version,
}
config := overlay.NodeSelectionConfig{
UptimeReputationLambda: 0.99,
UptimeReputationWeight: 1.0,
UptimeReputationDQ: 0,
}
sixtyOneMinutes := 61 * time.Minute
{ // test node ping back success
// check-in 1 hour, 1 minute ago for that node
oldCheckinTime := time.Now().UTC().Add(-sixtyOneMinutes)
err := satellite.DB.OverlayCache().UpdateCheckIn(ctx, info, oldCheckinTime, config)
require.NoError(t, err)
// get successful nodes that haven't checked in within the hour. should return 1
nodeLastContacts, err := satellite.DB.OverlayCache().GetSuccesfulNodesNotCheckedInSince(ctx, time.Hour)
require.NoError(t, err)
require.Len(t, nodeLastContacts, 1)
require.Equal(t, oldCheckinTime.Truncate(time.Second), nodeLastContacts[0].LastContactSuccess.Truncate(time.Second)) // truncate to avoid flakiness
// run detection chore
satellite.DowntimeTracking.DetectionChore.Loop.TriggerWait()
nodeLastContacts, err = satellite.DB.OverlayCache().GetSuccesfulNodesNotCheckedInSince(ctx, time.Hour)
require.NoError(t, err)
require.Len(t, nodeLastContacts, 0)
// downtime duration should be 0 for the node
downtime, err := satellite.DB.DowntimeTracking().GetOfflineTime(ctx, node.ID(), time.Now().Add(-time.Hour), time.Now().Add(time.Hour))
require.NoError(t, err)
require.EqualValues(t, 0, downtime)
}
{ // test node ping back failure
// check-in 1 hour, 1 minute ago for that node - again
oldCheckinTime := time.Now().UTC().Add(-sixtyOneMinutes)
err := satellite.DB.OverlayCache().UpdateCheckIn(ctx, info, oldCheckinTime, config)
require.NoError(t, err)
// close the node service so the ping back will fail
err = node.Server.Close()
require.NoError(t, err)
// get successful nodes that haven't checked in within the hour. should return 1 - again
nodeLastContacts, err := satellite.DB.OverlayCache().GetSuccesfulNodesNotCheckedInSince(ctx, time.Hour)
require.NoError(t, err)
require.Len(t, nodeLastContacts, 1)
require.Equal(t, oldCheckinTime.Truncate(time.Second), nodeLastContacts[0].LastContactSuccess.Truncate(time.Second)) // truncate to avoid flakiness
// run detection chore - again
satellite.DowntimeTracking.DetectionChore.Loop.TriggerWait()
// downtime duration should be > 1hr
downtime, err := satellite.DB.DowntimeTracking().GetOfflineTime(ctx, node.ID(), time.Now().Add(-time.Hour), time.Now().Add(time.Hour))
require.NoError(t, err)
require.EqualValues(t, time.Hour, downtime.Truncate(time.Hour)) // truncate to avoid flakiness
}
})
}

View File

@ -33,6 +33,8 @@ func NewService(log *zap.Logger, overlay *overlay.Service, contact *contact.Serv
// CheckAndUpdateNodeAvailability tries to ping the supplied address and updates the uptime based on ping success or failure. Returns true if the ping and uptime updates are successful.
func (service *Service) CheckAndUpdateNodeAvailability(ctx context.Context, nodeID storj.NodeID, address string) (success bool, err error) {
defer mon.Task()(&ctx)(&err)
pingNodeSuccess, pingErrorMessage, err := service.contact.PingBack(ctx, address, nodeID)
if err != nil {
service.log.Error("error during downtime detection ping back.",

View File

@ -87,6 +87,8 @@ type DB interface {
// GetNodeIPs returns a list of IP addresses associated with given node IDs.
GetNodeIPs(ctx context.Context, nodeIDs []storj.NodeID) (nodeIPs []string, err error)
// GetSuccesfulNodesNotCheckedInSince returns all nodes whose last check-in was successful, but who haven't checked in within a given duration.
GetSuccesfulNodesNotCheckedInSince(ctx context.Context, duration time.Duration) (nodeAddresses []NodeLastContact, err error)
// GetOfflineNodesLimited returns a list of the first N offline nodes ordered by least recently contacted.
GetOfflineNodesLimited(ctx context.Context, limit int) ([]*pb.Node, error)
}
@ -190,6 +192,14 @@ type NodeStats struct {
Disqualified *time.Time
}
// NodeLastContact contains the node ID, address, and last contact success and failure timestamps
type NodeLastContact struct {
ID storj.NodeID
Address string
LastContactSuccess time.Time
LastContactFailure time.Time
}
// Service is used to store and handle node information
//
// architecture: Service
@ -434,6 +444,13 @@ func (service *Service) UpdateCheckIn(ctx context.Context, node NodeCheckInInfo,
return service.db.UpdateCheckIn(ctx, node, timestamp, service.config.Node)
}
// GetSuccesfulNodesNotCheckedInSince returns all nodes whose last check-in was successful, but who haven't checked in within a given duration.
func (service *Service) GetSuccesfulNodesNotCheckedInSince(ctx context.Context, duration time.Duration) (nodeLastContacts []NodeLastContact, err error) {
defer mon.Task()(&ctx)(&err)
return service.db.GetSuccesfulNodesNotCheckedInSince(ctx, duration)
}
// GetMissingPieces returns the list of offline nodes
func (service *Service) GetMissingPieces(ctx context.Context, pieces []*pb.RemotePiece) (missingPieces []int32, err error) {
defer mon.Task()(&ctx)(&err)

View File

@ -593,3 +593,69 @@ func TestCache_DowntimeTracking(t *testing.T) {
require.Equal(t, allIDs[4], nodes[1].Id)
})
}
func TestGetSuccesfulNodesNotCheckedInSince(t *testing.T) {
satellitedbtest.Run(t, func(t *testing.T, db satellite.DB) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
// setup
info1 := getNodeInfo(testrand.NodeID())
info2 := getNodeInfo(testrand.NodeID())
config := overlay.NodeSelectionConfig{
UptimeReputationLambda: 0.99,
UptimeReputationWeight: 1.0,
UptimeReputationDQ: 0,
}
{ // check-in the nodes, which should add them
twoHoursAgo := time.Now().UTC().Add(-2 * time.Hour)
err := db.OverlayCache().UpdateCheckIn(ctx, info1, twoHoursAgo, config)
require.NoError(t, err)
err = db.OverlayCache().UpdateCheckIn(ctx, info2, twoHoursAgo, config)
require.NoError(t, err)
// update uptime so that node 2 has a last contact failure > last contact success
_, err = db.OverlayCache().UpdateUptime(ctx, info2.NodeID, false, 0.99, 1.0, 0)
require.NoError(t, err)
// should just get 1 node
nodeLastContacts, err := db.OverlayCache().GetSuccesfulNodesNotCheckedInSince(ctx, time.Duration(0))
require.NoError(t, err)
require.Len(t, nodeLastContacts, 1)
require.Equal(t, twoHoursAgo.Truncate(time.Second), nodeLastContacts[0].LastContactSuccess.Truncate(time.Second))
require.True(t, nodeLastContacts[0].LastContactFailure.IsZero())
}
{ // check-in again with current time
err := db.OverlayCache().UpdateCheckIn(ctx, info1, time.Now().UTC(), config)
require.NoError(t, err)
nodeLastContacts, err := db.OverlayCache().GetSuccesfulNodesNotCheckedInSince(ctx, time.Minute)
require.NoError(t, err)
require.Len(t, nodeLastContacts, 0)
}
})
}
func getNodeInfo(nodeID storj.NodeID) overlay.NodeCheckInInfo {
return overlay.NodeCheckInInfo{
NodeID: nodeID,
IsUp: true,
Address: &pb.NodeAddress{
Address: "1.2.3.4",
},
Operator: &pb.NodeOperator{
Email: "test@email.com",
Wallet: "0x123",
},
Version: &pb.NodeVersion{
Version: "v0.0.0",
CommitHash: "",
Timestamp: time.Time{},
Release: false,
},
}
}

View File

@ -121,4 +121,6 @@ type Config struct {
GracefulExit gracefulexit.Config
Metrics metrics.Config
Downtime downtime.Config
}

View File

@ -217,6 +217,14 @@ read limitoffset (
orderby asc node.last_contact_failure
)
read all (
select node.id node.address node.last_contact_success node.last_contact_failure
where node.last_contact_success < ?
where node.last_contact_success > node.last_contact_failure
where node.disqualified = null
orderby asc node.last_contact_success
)
//--- repairqueue ---//
model injuredsegment (

View File

@ -6239,19 +6239,52 @@ func __sqlbundle_flattenSQL(x string) string {
type __sqlbundle_postgres struct{}
func (p __sqlbundle_postgres) Rebind(sql string) string {
type sqlParseState int
const (
sqlParseStart sqlParseState = iota
sqlParseInStringLiteral
sqlParseInQuotedIdentifier
sqlParseInComment
)
out := make([]byte, 0, len(sql)+10)
j := 1
state := sqlParseStart
for i := 0; i < len(sql); i++ {
ch := sql[i]
if ch != '?' {
out = append(out, ch)
continue
switch state {
case sqlParseStart:
switch ch {
case '?':
out = append(out, '$')
out = append(out, strconv.Itoa(j)...)
state = sqlParseStart
j++
continue
case '-':
if i+1 < len(sql) && sql[i+1] == '-' {
state = sqlParseInComment
}
case '"':
state = sqlParseInQuotedIdentifier
case '\'':
state = sqlParseInStringLiteral
}
case sqlParseInStringLiteral:
if ch == '\'' {
state = sqlParseStart
}
case sqlParseInQuotedIdentifier:
if ch == '"' {
state = sqlParseStart
}
case sqlParseInComment:
if ch == '\n' {
state = sqlParseStart
}
}
out = append(out, '$')
out = append(out, strconv.Itoa(j)...)
j++
out = append(out, ch)
}
return string(out)
@ -6265,6 +6298,62 @@ func (s __sqlbundle_sqlite3) Rebind(sql string) string {
return sql
}
// this type is specially named to match up with the name returned by the
// dialect impl in the sql package.
type __sqlbundle_cockroach struct{}
func (p __sqlbundle_cockroach) Rebind(sql string) string {
type sqlParseState int
const (
sqlParseStart sqlParseState = iota
sqlParseInStringLiteral
sqlParseInQuotedIdentifier
sqlParseInComment
)
out := make([]byte, 0, len(sql)+10)
j := 1
state := sqlParseStart
for i := 0; i < len(sql); i++ {
ch := sql[i]
switch state {
case sqlParseStart:
switch ch {
case '?':
out = append(out, '$')
out = append(out, strconv.Itoa(j)...)
state = sqlParseStart
j++
continue
case '-':
if i+1 < len(sql) && sql[i+1] == '-' {
state = sqlParseInComment
}
case '"':
state = sqlParseInQuotedIdentifier
case '\'':
state = sqlParseInStringLiteral
}
case sqlParseInStringLiteral:
if ch == '\'' {
state = sqlParseStart
}
case sqlParseInQuotedIdentifier:
if ch == '"' {
state = sqlParseStart
}
case sqlParseInComment:
if ch == '\n' {
state = sqlParseStart
}
}
out = append(out, ch)
}
return string(out)
}
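
The rewritten Rebind above (and its new cockroach twin) replaces the old blind '?' substitution with a small scanner, so question marks inside single-quoted string literals, double-quoted identifiers, and -- line comments are no longer turned into positional arguments. Below is a standalone sketch that adapts the same state machine purely to illustrate the behavior; the rebind helper and the sample query are illustrative only and not part of the change:

package main

import (
	"fmt"
	"strconv"
)

// rebind adapts the placeholder-rewriting state machine from the generated
// dialect code above: bare '?' becomes $1, $2, ..., while '?' inside string
// literals, quoted identifiers, and line comments is left untouched.
func rebind(sql string) string {
	type sqlParseState int
	const (
		sqlParseStart sqlParseState = iota
		sqlParseInStringLiteral
		sqlParseInQuotedIdentifier
		sqlParseInComment
	)

	out := make([]byte, 0, len(sql)+10)
	j := 1
	state := sqlParseStart
	for i := 0; i < len(sql); i++ {
		ch := sql[i]
		switch state {
		case sqlParseStart:
			switch ch {
			case '?':
				// numbered placeholder; skip appending the '?' itself
				out = append(out, '$')
				out = append(out, strconv.Itoa(j)...)
				j++
				continue
			case '-':
				if i+1 < len(sql) && sql[i+1] == '-' {
					state = sqlParseInComment
				}
			case '"':
				state = sqlParseInQuotedIdentifier
			case '\'':
				state = sqlParseInStringLiteral
			}
		case sqlParseInStringLiteral:
			if ch == '\'' {
				state = sqlParseStart
			}
		case sqlParseInQuotedIdentifier:
			if ch == '"' {
				state = sqlParseStart
			}
		case sqlParseInComment:
			if ch == '\n' {
				state = sqlParseStart
			}
		}
		out = append(out, ch)
	}
	return string(out)
}

func main() {
	// The bare '?' placeholders are rewritten; the ones inside the string
	// literal and the trailing comment are preserved.
	fmt.Println(rebind(`SELECT id FROM nodes WHERE address = ? AND wallet != '??' AND id = ? -- why?`))
	// Output: SELECT id FROM nodes WHERE address = $1 AND wallet != '??' AND id = $2 -- why?
}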
type __sqlbundle_Literal string
func (__sqlbundle_Literal) private() {}
@ -6347,6 +6436,13 @@ type CustomerId_Row struct {
CustomerId string
}
type Id_Address_LastContactSuccess_LastContactFailure_Row struct {
Id []byte
Address string
LastContactSuccess time.Time
LastContactFailure time.Time
}
type Id_LastNet_Address_Protocol_Row struct {
Id []byte
LastNet string
@ -7734,6 +7830,40 @@ func (obj *postgresImpl) Limited_Node_By_LastContactSuccess_Less_LastContactFail
}
func (obj *postgresImpl) All_Node_Id_Node_Address_Node_LastContactSuccess_Node_LastContactFailure_By_LastContactSuccess_Less_And_LastContactSuccess_Greater_LastContactFailure_And_Disqualified_Is_Null_OrderBy_Asc_LastContactSuccess(ctx context.Context,
node_last_contact_success_less Node_LastContactSuccess_Field) (
rows []*Id_Address_LastContactSuccess_LastContactFailure_Row, err error) {
defer mon.Task()(&ctx)(&err)
var __embed_stmt = __sqlbundle_Literal("SELECT nodes.id, nodes.address, nodes.last_contact_success, nodes.last_contact_failure FROM nodes WHERE nodes.last_contact_success < ? AND nodes.last_contact_success > nodes.last_contact_failure AND nodes.disqualified is NULL ORDER BY nodes.last_contact_success")
var __values []interface{}
__values = append(__values, node_last_contact_success_less.value())
var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt)
obj.logStmt(__stmt, __values...)
__rows, err := obj.driver.Query(__stmt, __values...)
if err != nil {
return nil, obj.makeErr(err)
}
defer __rows.Close()
for __rows.Next() {
row := &Id_Address_LastContactSuccess_LastContactFailure_Row{}
err = __rows.Scan(&row.Id, &row.Address, &row.LastContactSuccess, &row.LastContactFailure)
if err != nil {
return nil, obj.makeErr(err)
}
rows = append(rows, row)
}
if err := __rows.Err(); err != nil {
return nil, obj.makeErr(err)
}
return rows, nil
}
func (obj *postgresImpl) Get_User_By_NormalizedEmail_And_Status_Not_Number(ctx context.Context,
user_normalized_email User_NormalizedEmail_Field) (
user *User, err error) {
@ -11590,6 +11720,16 @@ func (rx *Rx) All_Node_Id(ctx context.Context) (
return tx.All_Node_Id(ctx)
}
func (rx *Rx) All_Node_Id_Node_Address_Node_LastContactSuccess_Node_LastContactFailure_By_LastContactSuccess_Less_And_LastContactSuccess_Greater_LastContactFailure_And_Disqualified_Is_Null_OrderBy_Asc_LastContactSuccess(ctx context.Context,
node_last_contact_success_less Node_LastContactSuccess_Field) (
rows []*Id_Address_LastContactSuccess_LastContactFailure_Row, err error) {
var tx *Tx
if tx, err = rx.getTx(ctx); err != nil {
return
}
return tx.All_Node_Id_Node_Address_Node_LastContactSuccess_Node_LastContactFailure_By_LastContactSuccess_Less_And_LastContactSuccess_Greater_LastContactFailure_And_Disqualified_Is_Null_OrderBy_Asc_LastContactSuccess(ctx, node_last_contact_success_less)
}
func (rx *Rx) All_Node_Id_Node_PieceCount_By_PieceCount_Not_Number(ctx context.Context) (
rows []*Id_PieceCount_Row, err error) {
var tx *Tx
@ -13127,6 +13267,10 @@ type Methods interface {
All_Node_Id(ctx context.Context) (
rows []*Id_Row, err error)
All_Node_Id_Node_Address_Node_LastContactSuccess_Node_LastContactFailure_By_LastContactSuccess_Less_And_LastContactSuccess_Greater_LastContactFailure_And_Disqualified_Is_Null_OrderBy_Asc_LastContactSuccess(ctx context.Context,
node_last_contact_success_less Node_LastContactSuccess_Field) (
rows []*Id_Address_LastContactSuccess_LastContactFailure_Row, err error)
All_Node_Id_Node_PieceCount_By_PieceCount_Not_Number(ctx context.Context) (
rows []*Id_PieceCount_Row, err error)

View File

@ -1022,6 +1022,36 @@ func (cache *overlaycache) UpdateExitStatus(ctx context.Context, request *overla
return convertDBNode(ctx, dbNode)
}
// GetSuccesfulNodesNotCheckedInSince returns all nodes whose last check-in was successful, but who haven't checked in within a given duration.
func (cache *overlaycache) GetSuccesfulNodesNotCheckedInSince(ctx context.Context, duration time.Duration) (nodeLastContacts []overlay.NodeLastContact, err error) {
// get successful nodes that have not checked in within the given duration
defer mon.Task()(&ctx)(&err)
dbxNodes, err := cache.db.DB.All_Node_Id_Node_Address_Node_LastContactSuccess_Node_LastContactFailure_By_LastContactSuccess_Less_And_LastContactSuccess_Greater_LastContactFailure_And_Disqualified_Is_Null_OrderBy_Asc_LastContactSuccess(
ctx, dbx.Node_LastContactSuccess(time.Now().UTC().Add(-duration)))
if err != nil {
return nil, Error.Wrap(err)
}
for _, node := range dbxNodes {
nodeID, err := storj.NodeIDFromBytes(node.Id)
if err != nil {
return nil, err
}
nodeLastContact := overlay.NodeLastContact{
ID: nodeID,
Address: node.Address,
LastContactSuccess: node.LastContactSuccess.UTC(),
LastContactFailure: node.LastContactFailure.UTC(),
}
nodeLastContacts = append(nodeLastContacts, nodeLastContact)
}
return nodeLastContacts, nil
}
func populateExitStatusFields(req *overlay.ExitStatusRequest) dbx.Node_Update_Fields {
dbxUpdateFields := dbx.Node_Update_Fields{}

View File

@ -97,6 +97,9 @@ contact.external-address: ""
# If set, a path to write a process trace SVG to
# debug.trace-out: ""
# how often to run the downtime detection chore.
# downtime.detection-interval: 1h0m0s
# the number of nodes to concurrently send garbage collection bloom filters to
# garbage-collection.concurrent-sends: 1