satellite/overlay/offlinenodes: insert offline nodes into node events
Add a new chore to periodically insert nodes that are offline and have not received an offline email within a certain amount of time into node events.

Change-Id: I658b385bb777b0240c98092946a93d65bee94abc
parent ec777855e1
commit 87660bd9b3
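For reviewers, a minimal sketch of how the new chore is wired and driven, using only names introduced in this change. It mirrors the wiring added to the satellite core below; the mail and overlay services are assumed to be constructed elsewhere, and the config values are the defaults declared in the new offlinenodes.Config.

package example

import (
    "context"
    "time"

    "go.uber.org/zap"

    "storj.io/storj/satellite/mailservice"
    "storj.io/storj/satellite/overlay"
    "storj.io/storj/satellite/overlay/offlinenodes"
)

// runOfflineNodeEmails mirrors the wiring added in this commit: construct the
// chore with the default config and let it run until the context is canceled.
// The mail and overlay services are assumed to be initialized by the caller.
func runOfflineNodeEmails(ctx context.Context, log *zap.Logger, mail *mailservice.Service, cache *overlay.Service) error {
    chore := offlinenodes.NewChore(
        log.Named("overlay:offline-node-emails"),
        mail,
        cache,
        offlinenodes.Config{
            Interval:  time.Hour,      // how often the cycle fires
            Cooldown:  24 * time.Hour, // minimum gap between offline emails per node
            MaxEmails: 3,              // emails stop after Cooldown*MaxEmails offline
            Limit:     1000,           // page size for each InsertOfflineNodeEvents call
        },
    )
    defer func() { _ = chore.Close() }()
    return chore.Run(ctx)
}

In production the same Run/Close pair is registered as a lifecycle item (see core.go below); in tests the cycle is triggered directly via chore.Loop.TriggerWait().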
@@ -55,6 +55,7 @@ import (
 	"storj.io/storj/satellite/nodestats"
 	"storj.io/storj/satellite/orders"
 	"storj.io/storj/satellite/overlay"
+	"storj.io/storj/satellite/overlay/offlinenodes"
 	"storj.io/storj/satellite/overlay/straynodes"
 	"storj.io/storj/satellite/repair/checker"
 	"storj.io/storj/satellite/repair/repairer"
@@ -90,9 +91,10 @@ type Satellite struct {
 	}

 	Overlay struct {
-		DB           overlay.DB
-		Service      *overlay.Service
-		DQStrayNodes *straynodes.Chore
+		DB                overlay.DB
+		Service           *overlay.Service
+		OfflineNodeEmails *offlinenodes.Chore
+		DQStrayNodes      *straynodes.Chore
 	}

 	NodeEvents struct {
@@ -580,6 +582,7 @@ func createNewSystem(name string, log *zap.Logger, config satellite.Config, peer

 	system.Overlay.DB = api.Overlay.DB
 	system.Overlay.Service = api.Overlay.Service
+	system.Overlay.OfflineNodeEmails = peer.Overlay.OfflineNodeEmails
 	system.Overlay.DQStrayNodes = peer.Overlay.DQStrayNodes

 	system.NodeEvents.DB = peer.NodeEvents.DB
@@ -43,6 +43,7 @@ import (
 	"storj.io/storj/satellite/nodeevents"
 	"storj.io/storj/satellite/orders"
 	"storj.io/storj/satellite/overlay"
+	"storj.io/storj/satellite/overlay/offlinenodes"
 	"storj.io/storj/satellite/overlay/straynodes"
 	"storj.io/storj/satellite/payments"
 	"storj.io/storj/satellite/payments/billing"
@@ -83,9 +84,10 @@ type Core struct {

 	// services and endpoints
 	Overlay struct {
-		DB           overlay.DB
-		Service      *overlay.Service
-		DQStrayNodes *straynodes.Chore
+		DB                overlay.DB
+		Service           *overlay.Service
+		OfflineNodeEmails *offlinenodes.Chore
+		DQStrayNodes      *straynodes.Chore
 	}

 	NodeEvents struct {
@@ -269,6 +271,17 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB,
 			Close: peer.Overlay.Service.Close,
 		})

+		if config.Overlay.SendNodeEmails {
+			peer.Overlay.OfflineNodeEmails = offlinenodes.NewChore(log.Named("overlay:offline-node-emails"), peer.Mail.Service, peer.Overlay.Service, config.OfflineNodes)
+			peer.Services.Add(lifecycle.Item{
+				Name:  "overlay:offline-node-emails",
+				Run:   peer.Overlay.OfflineNodeEmails.Run,
+				Close: peer.Overlay.OfflineNodeEmails.Close,
+			})
+			peer.Debug.Server.Panel.Add(
+				debug.Cycle("Overlay Offline Node Emails", peer.Overlay.OfflineNodeEmails.Loop))
+		}
+
 		if config.StrayNodes.EnableDQ {
 			peer.Overlay.DQStrayNodes = straynodes.NewChore(peer.Log.Named("overlay:dq-stray-nodes"), peer.Overlay.Service, config.StrayNodes)
 			peer.Services.Add(lifecycle.Item{
satellite/overlay/offlinenodes/chore.go (new file, 74 lines)
@@ -0,0 +1,74 @@
// Copyright (C) 2022 Storj Labs, Inc.
// See LICENSE for copying information.

package offlinenodes

import (
	"context"
	"time"

	"github.com/spacemonkeygo/monkit/v3"
	"go.uber.org/zap"

	"storj.io/common/sync2"
	"storj.io/storj/satellite/mailservice"
	"storj.io/storj/satellite/overlay"
)

var mon = monkit.Package()

// Config contains configurable values for offline nodes chore.
type Config struct {
	Interval  time.Duration `help:"how often to check for offline nodes and send them emails" default:"1h"`
	Cooldown  time.Duration `help:"how long to wait between sending Node Offline emails" default:"24h"`
	MaxEmails int           `help:"max number of offline emails to send a node operator until the node comes back online" default:"3"`
	Limit     int           `help:"Max number of nodes to return in a single query. Chore will iterate until rows returned is less than limit" releaseDefault:"1000" devDefault:"1000"`
}

// Chore sends emails to offline nodes.
type Chore struct {
	log    *zap.Logger
	mail   *mailservice.Service
	cache  *overlay.Service
	config Config
	Loop   *sync2.Cycle
}

// NewChore creates a new offline nodes Chore.
func NewChore(log *zap.Logger, mail *mailservice.Service, cache *overlay.Service, config Config) *Chore {
	return &Chore{
		log:    log,
		mail:   mail,
		cache:  cache,
		config: config,
		Loop:   sync2.NewCycle(config.Interval),
	}
}

// Run runs the chore.
func (chore *Chore) Run(ctx context.Context) (err error) {
	defer mon.Task()(&ctx)(&err)
	// multiply max emails by email cooldown to get cutoff for emails
	// e.g. cooldown = 24h, maxEmails = 3
	// after 72h the node should get 3 emails and no more.
	cutoff := time.Duration(chore.config.Cooldown.Nanoseconds() * int64(chore.config.MaxEmails))
	return chore.Loop.Run(ctx, func(ctx context.Context) error {
		for {
			count, err := chore.cache.InsertOfflineNodeEvents(ctx, chore.config.Cooldown, cutoff, chore.config.Limit)
			if err != nil {
				chore.log.Error("error inserting offline node events", zap.Error(err))
				return nil
			}
			if count < chore.config.Limit {
				break
			}
		}
		return nil
	})
}

// Close closes chore.
func (chore *Chore) Close() error {
	chore.Loop.Close()
	return nil
}
satellite/overlay/offlinenodes/chore_test.go (new file, 109 lines)
@@ -0,0 +1,109 @@
// Copyright (C) 2022 Storj Labs, Inc.
// See LICENSE for copying information.

package offlinenodes_test

import (
	"testing"
	"time"

	"github.com/stretchr/testify/require"
	"go.uber.org/zap"

	"storj.io/common/pb"
	"storj.io/common/storj"
	"storj.io/common/testcontext"
	"storj.io/storj/private/testplanet"
	"storj.io/storj/satellite"
	"storj.io/storj/satellite/nodeevents"
	"storj.io/storj/satellite/overlay"
)

func TestOfflineNodes(t *testing.T) {
	testplanet.Run(t, testplanet.Config{
		SatelliteCount: 1, StorageNodeCount: 2,
		Reconfigure: testplanet.Reconfigure{
			Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
				config.Overlay.SendNodeEmails = true
			},
		},
	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
		offlineNode := planet.StorageNodes[0]
		onlineNode := planet.StorageNodes[1]
		sat := planet.Satellites[0]
		offlineNode.Contact.Chore.Pause(ctx)
		cache := planet.Satellites[0].Overlay.DB

		offlineInfo, err := cache.Get(ctx, offlineNode.ID())
		require.NoError(t, err)
		require.Nil(t, offlineInfo.LastOfflineEmail)

		checkInInfo := overlay.NodeCheckInInfo{
			NodeID: offlineInfo.Id,
			IsUp:   true,
			Address: &pb.NodeAddress{
				Address: "1.2.3.4",
			},
			Version: &pb.NodeVersion{
				Version:    "v0.0.0",
				CommitHash: "",
				Timestamp:  time.Time{},
				Release:    false,
			},
			Operator: &pb.NodeOperator{
				Email: "offline@storj.test",
			},
		}

		// offline node checks in 48 hours ago
		require.NoError(t, sat.Overlay.DB.UpdateCheckIn(ctx, checkInInfo, time.Now().Add(-48*time.Hour), sat.Config.Overlay.Node))

		// online node checks in now
		checkInInfo.NodeID = onlineNode.ID()
		require.NoError(t, sat.Overlay.DB.UpdateCheckIn(ctx, checkInInfo, time.Now(), sat.Config.Overlay.Node))

		sat.Overlay.OfflineNodeEmails.Loop.TriggerWait()

		// offline node gets an email
		offlineInfo, err = cache.Get(ctx, offlineNode.ID())
		require.NoError(t, err)
		lastEmail := offlineInfo.LastOfflineEmail
		require.NotNil(t, lastEmail)

		ne, err := planet.Satellites[0].DB.NodeEvents().GetLatestByEmailAndEvent(ctx, offlineInfo.Operator.Email, nodeevents.Offline)
		require.NoError(t, err)
		require.Equal(t, offlineNode.ID(), ne.NodeID)
		require.Equal(t, offlineInfo.Operator.Email, ne.Email)
		require.Equal(t, nodeevents.Offline, ne.Event)

		firstEventTime := ne.CreatedAt

		// online node does not get an email
		onlineInfo, err := cache.Get(ctx, onlineNode.ID())
		require.NoError(t, err)
		require.Nil(t, onlineInfo.LastOfflineEmail)

		// run chore again and check that offline node does not get another email before cooldown has passed
		sat.Overlay.OfflineNodeEmails.Loop.TriggerWait()

		offlineInfo, err = cache.Get(ctx, offlineNode.ID())
		require.NoError(t, err)
		require.Equal(t, lastEmail, offlineInfo.LastOfflineEmail)

		// change last_offline_email so that cooldown has passed and email should be sent again
		require.NoError(t, cache.UpdateLastOfflineEmail(ctx, []storj.NodeID{offlineNode.ID()}, time.Now().Add(-48*time.Hour)))

		sat.Overlay.OfflineNodeEmails.Loop.TriggerWait()

		ne, err = planet.Satellites[0].DB.NodeEvents().GetLatestByEmailAndEvent(ctx, offlineInfo.Operator.Email, nodeevents.Offline)
		require.NoError(t, err)
		require.Equal(t, offlineNode.ID(), ne.NodeID)
		require.Equal(t, offlineInfo.Operator.Email, ne.Email)
		require.Equal(t, nodeevents.Offline, ne.Event)
		require.True(t, firstEventTime.Before(ne.CreatedAt))

		offlineInfo, err = cache.Get(ctx, offlineNode.ID())
		require.NoError(t, err)
		require.True(t, offlineInfo.LastOfflineEmail.After(*lastEmail))
	})
}
@@ -520,6 +520,39 @@ func (service *Service) KnownUnreliableOrOffline(ctx context.Context, nodeIds st
 	return service.db.KnownUnreliableOrOffline(ctx, criteria, nodeIds)
 }

+// InsertOfflineNodeEvents inserts offline events into node events.
+func (service *Service) InsertOfflineNodeEvents(ctx context.Context, cooldown time.Duration, cutoff time.Duration, limit int) (count int, err error) {
+	defer mon.Task()(&ctx)(&err)
+
+	if !service.config.SendNodeEmails {
+		return 0, nil
+	}
+
+	nodes, err := service.db.GetOfflineNodesForEmail(ctx, service.config.Node.OnlineWindow, cutoff, cooldown, limit)
+	if err != nil {
+		return 0, err
+	}
+
+	count = len(nodes)
+
+	var successful storj.NodeIDList
+	for id, email := range nodes {
+		_, err = service.nodeEvents.Insert(ctx, email, id, nodeevents.Offline)
+		if err != nil {
+			service.log.Error("could not insert node offline into node events", zap.Error(err))
+		} else {
+			successful = append(successful, id)
+		}
+	}
+	if len(successful) > 0 {
+		err = service.db.UpdateLastOfflineEmail(ctx, successful, time.Now())
+		if err != nil {
+			return count, err
+		}
+	}
+	return count, err
+}
+
 // KnownReliableInExcludedCountries filters healthy nodes that are in excluded countries.
 func (service *Service) KnownReliableInExcludedCountries(ctx context.Context, nodeIds storj.NodeIDList) (reliableInExcluded storj.NodeIDList, err error) {
 	defer mon.Task()(&ctx)(&err)
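The two overlay DB methods called above, GetOfflineNodesForEmail and UpdateLastOfflineEmail, are not part of this hunk. A hedged sketch of the shapes the calling code appears to rely on, inferred only from the calls in InsertOfflineNodeEvents rather than copied from the actual overlay.DB interface:

package example

import (
    "context"
    "time"

    "storj.io/common/storj"
)

// offlineEmailDB is an illustrative subset of what overlay.DB would need to
// provide for InsertOfflineNodeEvents above. The method names match the calls
// in this diff, but the exact signatures here are an assumption.
type offlineEmailDB interface {
    // GetOfflineNodesForEmail presumably returns node ID -> operator email for
    // nodes that have been offline longer than onlineWindow but less than
    // cutoff, whose last offline email is older than cooldown, capped at limit.
    GetOfflineNodesForEmail(ctx context.Context, onlineWindow, cutoff, cooldown time.Duration, limit int) (map[storj.NodeID]string, error)
    // UpdateLastOfflineEmail records when an offline email was last sent for the given nodes.
    UpdateLastOfflineEmail(ctx context.Context, ids storj.NodeIDList, sentAt time.Time) error
}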
@@ -51,6 +51,7 @@ import (
 	"storj.io/storj/satellite/oidc"
 	"storj.io/storj/satellite/orders"
 	"storj.io/storj/satellite/overlay"
+	"storj.io/storj/satellite/overlay/offlinenodes"
 	"storj.io/storj/satellite/overlay/straynodes"
 	"storj.io/storj/satellite/payments/billing"
 	"storj.io/storj/satellite/payments/paymentsconfig"
@@ -143,10 +144,11 @@ type Config struct {

 	Admin admin.Config

-	Contact    contact.Config
-	Overlay    overlay.Config
-	NodeEvents nodeevents.Config
-	StrayNodes straynodes.Config
+	Contact      contact.Config
+	Overlay      overlay.Config
+	OfflineNodes offlinenodes.Config
+	NodeEvents   nodeevents.Config
+	StrayNodes   straynodes.Config

 	Metainfo metainfo.Config
 	Orders   orders.Config
scripts/testdata/satellite-config.yaml.lock (vendored, 12 additions)
@@ -676,6 +676,18 @@ identity.key-path: /root/.local/share/storj/identity/satellite/identity.key
 # how long the earliest instance of an event for a particular email should exist in the DB before it is selected
 # node-events.selection-wait-period: 5m0s

+# how long to wait between sending Node Offline emails
+# offline-nodes.cooldown: 24h0m0s
+
+# how often to check for offline nodes and send them emails
+# offline-nodes.interval: 1h0m0s
+
+# Max number of nodes to return in a single query. Chore will iterate until rows returned is less than limit
+# offline-nodes.limit: 1000
+
+# max number of offline emails to send a node operator until the node comes back online
+# offline-nodes.max-emails: 3
+
 # encryption keys to encrypt info in orders
 # orders.encryption-keys: ""
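A minimal sketch, following the Reconfigure pattern from chore_test.go above, of how a test could tighten these settings. The field names come from offlinenodes.Config and the satellite Config added in this change; the values are illustrative only, not recommendations.

package example

import (
    "time"

    "go.uber.org/zap"

    "storj.io/storj/private/testplanet"
    "storj.io/storj/satellite"
)

// reconfigureOfflineEmails shows how a testplanet test might enable the chore
// and shorten its timing so a single test run exercises several cycles.
var reconfigureOfflineEmails = testplanet.Reconfigure{
    Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
        config.Overlay.SendNodeEmails = true       // gate added in core.go
        config.OfflineNodes.Interval = time.Minute // check every minute
        config.OfflineNodes.Cooldown = 5 * time.Minute
        config.OfflineNodes.MaxEmails = 2
        config.OfflineNodes.Limit = 10
    },
}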