diff --git a/private/testplanet/satellite.go b/private/testplanet/satellite.go
index 99a22e407..2c3c7b716 100644
--- a/private/testplanet/satellite.go
+++ b/private/testplanet/satellite.go
@@ -55,6 +55,7 @@ import (
 	"storj.io/storj/satellite/nodestats"
 	"storj.io/storj/satellite/orders"
 	"storj.io/storj/satellite/overlay"
+	"storj.io/storj/satellite/overlay/offlinenodes"
 	"storj.io/storj/satellite/overlay/straynodes"
 	"storj.io/storj/satellite/repair/checker"
 	"storj.io/storj/satellite/repair/repairer"
@@ -90,9 +91,10 @@ type Satellite struct {
 	}
 
 	Overlay struct {
-		DB           overlay.DB
-		Service      *overlay.Service
-		DQStrayNodes *straynodes.Chore
+		DB                overlay.DB
+		Service           *overlay.Service
+		OfflineNodeEmails *offlinenodes.Chore
+		DQStrayNodes      *straynodes.Chore
 	}
 
 	NodeEvents struct {
@@ -580,6 +582,7 @@ func createNewSystem(name string, log *zap.Logger, config satellite.Config, peer
 
 	system.Overlay.DB = api.Overlay.DB
 	system.Overlay.Service = api.Overlay.Service
+	system.Overlay.OfflineNodeEmails = peer.Overlay.OfflineNodeEmails
 	system.Overlay.DQStrayNodes = peer.Overlay.DQStrayNodes
 
 	system.NodeEvents.DB = peer.NodeEvents.DB
diff --git a/satellite/core.go b/satellite/core.go
index 51c046fa3..70909c870 100644
--- a/satellite/core.go
+++ b/satellite/core.go
@@ -43,6 +43,7 @@ import (
 	"storj.io/storj/satellite/nodeevents"
 	"storj.io/storj/satellite/orders"
 	"storj.io/storj/satellite/overlay"
+	"storj.io/storj/satellite/overlay/offlinenodes"
 	"storj.io/storj/satellite/overlay/straynodes"
 	"storj.io/storj/satellite/payments"
 	"storj.io/storj/satellite/payments/billing"
@@ -83,9 +84,10 @@ type Core struct {
 	// services and endpoints
 
 	Overlay struct {
-		DB           overlay.DB
-		Service      *overlay.Service
-		DQStrayNodes *straynodes.Chore
+		DB                overlay.DB
+		Service           *overlay.Service
+		OfflineNodeEmails *offlinenodes.Chore
+		DQStrayNodes      *straynodes.Chore
 	}
 
 	NodeEvents struct {
@@ -269,6 +271,17 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB,
 			Close: peer.Overlay.Service.Close,
 		})
 
+		if config.Overlay.SendNodeEmails {
+			peer.Overlay.OfflineNodeEmails = offlinenodes.NewChore(log.Named("overlay:offline-node-emails"), peer.Mail.Service, peer.Overlay.Service, config.OfflineNodes)
+			peer.Services.Add(lifecycle.Item{
+				Name:  "overlay:offline-node-emails",
+				Run:   peer.Overlay.OfflineNodeEmails.Run,
+				Close: peer.Overlay.OfflineNodeEmails.Close,
+			})
+			peer.Debug.Server.Panel.Add(
+				debug.Cycle("Overlay Offline Node Emails", peer.Overlay.OfflineNodeEmails.Loop))
+		}
+
 		if config.StrayNodes.EnableDQ {
 			peer.Overlay.DQStrayNodes = straynodes.NewChore(peer.Log.Named("overlay:dq-stray-nodes"), peer.Overlay.Service, config.StrayNodes)
 			peer.Services.Add(lifecycle.Item{
diff --git a/satellite/overlay/offlinenodes/chore.go b/satellite/overlay/offlinenodes/chore.go
new file mode 100644
index 000000000..610ff120d
--- /dev/null
+++ b/satellite/overlay/offlinenodes/chore.go
@@ -0,0 +1,74 @@
+// Copyright (C) 2022 Storj Labs, Inc.
+// See LICENSE for copying information.
+
+package offlinenodes
+
+import (
+	"context"
+	"time"
+
+	"github.com/spacemonkeygo/monkit/v3"
+	"go.uber.org/zap"
+
+	"storj.io/common/sync2"
+	"storj.io/storj/satellite/mailservice"
+	"storj.io/storj/satellite/overlay"
+)
+
+var mon = monkit.Package()
+
+// Config contains configurable values for the offline nodes chore.
+type Config struct {
+	Interval  time.Duration `help:"how often to check for offline nodes and send them emails" default:"1h"`
+	Cooldown  time.Duration `help:"how long to wait between sending Node Offline emails" default:"24h"`
+	MaxEmails int           `help:"max number of offline emails to send a node operator until the node comes back online" default:"3"`
+	Limit     int           `help:"Max number of nodes to return in a single query. Chore will iterate until rows returned is less than limit" releaseDefault:"1000" devDefault:"1000"`
+}
+
+// Chore sends emails to offline nodes.
+type Chore struct {
+	log    *zap.Logger
+	mail   *mailservice.Service
+	cache  *overlay.Service
+	config Config
+	Loop   *sync2.Cycle
+}
+
+// NewChore creates a new offline nodes Chore.
+func NewChore(log *zap.Logger, mail *mailservice.Service, cache *overlay.Service, config Config) *Chore {
+	return &Chore{
+		log:    log,
+		mail:   mail,
+		cache:  cache,
+		config: config,
+		Loop:   sync2.NewCycle(config.Interval),
+	}
+}
+
+// Run runs the chore.
+func (chore *Chore) Run(ctx context.Context) (err error) {
+	defer mon.Task()(&ctx)(&err)
+	// multiply max emails by email cooldown to get cutoff for emails
+	// e.g. cooldown = 24h, maxEmails = 3
+	// after 72h the node should get 3 emails and no more.
+	cutoff := time.Duration(chore.config.Cooldown.Nanoseconds() * int64(chore.config.MaxEmails))
+	return chore.Loop.Run(ctx, func(ctx context.Context) error {
+		for {
+			count, err := chore.cache.InsertOfflineNodeEvents(ctx, chore.config.Cooldown, cutoff, chore.config.Limit)
+			if err != nil {
+				chore.log.Error("error inserting offline node events", zap.Error(err))
+				return nil
+			}
+			if count < chore.config.Limit {
+				break
+			}
+		}
+		return nil
+	})
+}
+
+// Close closes the chore.
+func (chore *Chore) Close() error {
+	chore.Loop.Close()
+	return nil
+}
diff --git a/satellite/overlay/offlinenodes/chore_test.go b/satellite/overlay/offlinenodes/chore_test.go
new file mode 100644
index 000000000..e0e933566
--- /dev/null
+++ b/satellite/overlay/offlinenodes/chore_test.go
@@ -0,0 +1,109 @@
+// Copyright (C) 2022 Storj Labs, Inc.
+// See LICENSE for copying information.
+
+package offlinenodes_test
+
+import (
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+	"go.uber.org/zap"
+
+	"storj.io/common/pb"
+	"storj.io/common/storj"
+	"storj.io/common/testcontext"
+	"storj.io/storj/private/testplanet"
+	"storj.io/storj/satellite"
+	"storj.io/storj/satellite/nodeevents"
+	"storj.io/storj/satellite/overlay"
+)
+
+func TestOfflineNodes(t *testing.T) {
+	testplanet.Run(t, testplanet.Config{
+		SatelliteCount: 1, StorageNodeCount: 2,
+		Reconfigure: testplanet.Reconfigure{
+			Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
+				config.Overlay.SendNodeEmails = true
+			},
+		},
+	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
+		offlineNode := planet.StorageNodes[0]
+		onlineNode := planet.StorageNodes[1]
+		sat := planet.Satellites[0]
+		offlineNode.Contact.Chore.Pause(ctx)
+		cache := planet.Satellites[0].Overlay.DB
+
+		offlineInfo, err := cache.Get(ctx, offlineNode.ID())
+		require.NoError(t, err)
+		require.Nil(t, offlineInfo.LastOfflineEmail)
+
+		checkInInfo := overlay.NodeCheckInInfo{
+			NodeID: offlineInfo.Id,
+			IsUp:   true,
+			Address: &pb.NodeAddress{
+				Address: "1.2.3.4",
+			},
+			Version: &pb.NodeVersion{
+				Version:    "v0.0.0",
+				CommitHash: "",
+				Timestamp:  time.Time{},
+				Release:    false,
+			},
+			Operator: &pb.NodeOperator{
+				Email: "offline@storj.test",
+			},
+		}
+
+		// offline node checks in 48 hours ago
+		require.NoError(t, sat.Overlay.DB.UpdateCheckIn(ctx, checkInInfo, time.Now().Add(-48*time.Hour), sat.Config.Overlay.Node))
+
+		// online node checks in now
+		checkInInfo.NodeID = onlineNode.ID()
+		require.NoError(t, sat.Overlay.DB.UpdateCheckIn(ctx, checkInInfo, time.Now(), sat.Config.Overlay.Node))
+
+		sat.Overlay.OfflineNodeEmails.Loop.TriggerWait()
+
+		// offline node gets an email
+		offlineInfo, err = cache.Get(ctx, offlineNode.ID())
+		require.NoError(t, err)
+		lastEmail := offlineInfo.LastOfflineEmail
+		require.NotNil(t, lastEmail)
+
+		ne, err := planet.Satellites[0].DB.NodeEvents().GetLatestByEmailAndEvent(ctx, offlineInfo.Operator.Email, nodeevents.Offline)
+		require.NoError(t, err)
+		require.Equal(t, offlineNode.ID(), ne.NodeID)
+		require.Equal(t, offlineInfo.Operator.Email, ne.Email)
+		require.Equal(t, nodeevents.Offline, ne.Event)
+
+		firstEventTime := ne.CreatedAt
+
+		// online node does not get an email
+		onlineInfo, err := cache.Get(ctx, onlineNode.ID())
+		require.NoError(t, err)
+		require.Nil(t, onlineInfo.LastOfflineEmail)
+
+		// run chore again and check that offline node does not get another email before cooldown has passed
+		sat.Overlay.OfflineNodeEmails.Loop.TriggerWait()
+
+		offlineInfo, err = cache.Get(ctx, offlineNode.ID())
+		require.NoError(t, err)
+		require.Equal(t, lastEmail, offlineInfo.LastOfflineEmail)
+
+		// change last_offline_email so that cooldown has passed and email should be sent again
+		require.NoError(t, cache.UpdateLastOfflineEmail(ctx, []storj.NodeID{offlineNode.ID()}, time.Now().Add(-48*time.Hour)))
+
+		sat.Overlay.OfflineNodeEmails.Loop.TriggerWait()
+
+		ne, err = planet.Satellites[0].DB.NodeEvents().GetLatestByEmailAndEvent(ctx, offlineInfo.Operator.Email, nodeevents.Offline)
+		require.NoError(t, err)
+		require.Equal(t, offlineNode.ID(), ne.NodeID)
+		require.Equal(t, offlineInfo.Operator.Email, ne.Email)
+		require.Equal(t, nodeevents.Offline, ne.Event)
+		require.True(t, firstEventTime.Before(ne.CreatedAt))
+
+		offlineInfo, err = cache.Get(ctx, offlineNode.ID())
+		require.NoError(t, err)
+		require.True(t, offlineInfo.LastOfflineEmail.After(*lastEmail))
+	})
+}
diff --git a/satellite/overlay/service.go b/satellite/overlay/service.go
index a8bc029bb..c84b4c228 100644
--- a/satellite/overlay/service.go
+++ b/satellite/overlay/service.go
@@ -520,6 +520,39 @@ func (service *Service) KnownUnreliableOrOffline(ctx context.Context, nodeIds st
 	return service.db.KnownUnreliableOrOffline(ctx, criteria, nodeIds)
 }
 
+// InsertOfflineNodeEvents inserts offline events into node events.
+func (service *Service) InsertOfflineNodeEvents(ctx context.Context, cooldown time.Duration, cutoff time.Duration, limit int) (count int, err error) {
+	defer mon.Task()(&ctx)(&err)
+
+	if !service.config.SendNodeEmails {
+		return 0, nil
+	}
+
+	nodes, err := service.db.GetOfflineNodesForEmail(ctx, service.config.Node.OnlineWindow, cutoff, cooldown, limit)
+	if err != nil {
+		return 0, err
+	}
+
+	count = len(nodes)
+
+	var successful storj.NodeIDList
+	for id, email := range nodes {
+		_, err = service.nodeEvents.Insert(ctx, email, id, nodeevents.Offline)
+		if err != nil {
+			service.log.Error("could not insert node offline into node events", zap.Error(err))
+		} else {
+			successful = append(successful, id)
+		}
+	}
+	if len(successful) > 0 {
+		err = service.db.UpdateLastOfflineEmail(ctx, successful, time.Now())
+		if err != nil {
+			return count, err
+		}
+	}
+	return count, err
+}
+
 // KnownReliableInExcludedCountries filters healthy nodes that are in excluded countries.
 func (service *Service) KnownReliableInExcludedCountries(ctx context.Context, nodeIds storj.NodeIDList) (reliableInExcluded storj.NodeIDList, err error) {
 	defer mon.Task()(&ctx)(&err)
diff --git a/satellite/peer.go b/satellite/peer.go
index 89f536b44..9c0399b9d 100644
--- a/satellite/peer.go
+++ b/satellite/peer.go
@@ -51,6 +51,7 @@ import (
 	"storj.io/storj/satellite/oidc"
 	"storj.io/storj/satellite/orders"
 	"storj.io/storj/satellite/overlay"
+	"storj.io/storj/satellite/overlay/offlinenodes"
 	"storj.io/storj/satellite/overlay/straynodes"
 	"storj.io/storj/satellite/payments/billing"
 	"storj.io/storj/satellite/payments/paymentsconfig"
@@ -143,10 +144,11 @@ type Config struct {
 
 	Admin admin.Config
 
-	Contact    contact.Config
-	Overlay    overlay.Config
-	NodeEvents nodeevents.Config
-	StrayNodes straynodes.Config
+	Contact      contact.Config
+	Overlay      overlay.Config
+	OfflineNodes offlinenodes.Config
+	NodeEvents   nodeevents.Config
+	StrayNodes   straynodes.Config
 
 	Metainfo metainfo.Config
 	Orders   orders.Config
diff --git a/scripts/testdata/satellite-config.yaml.lock b/scripts/testdata/satellite-config.yaml.lock
index c9d5698d5..afc9af724 100755
--- a/scripts/testdata/satellite-config.yaml.lock
+++ b/scripts/testdata/satellite-config.yaml.lock
@@ -676,6 +676,18 @@ identity.key-path: /root/.local/share/storj/identity/satellite/identity.key
 # how long the earliest instance of an event for a particular email should exist in the DB before it is selected
 # node-events.selection-wait-period: 5m0s
 
+# how long to wait between sending Node Offline emails
+# offline-nodes.cooldown: 24h0m0s
+
+# how often to check for offline nodes and send them emails
+# offline-nodes.interval: 1h0m0s
+
+# Max number of nodes to return in a single query. Chore will iterate until rows returned is less than limit
+# offline-nodes.limit: 1000
+
+# max number of offline emails to send a node operator until the node comes back online
+# offline-nodes.max-emails: 3
+
 # encryption keys to encrypt info in orders
 # orders.encryption-keys: ""
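
Note for reviewers: the overlay.DB methods that back this change, GetOfflineNodesForEmail and UpdateLastOfflineEmail, are defined outside this excerpt. Below is a minimal sketch of the interface additions implied by the call sites; the interface name, parameter names, and doc comments are assumptions, while the parameter types, their order, and the map[storj.NodeID]string result shape follow directly from Service.InsertOfflineNodeEvents (`for id, email := range nodes`) and the test's direct call to cache.UpdateLastOfflineEmail.

package overlay

import (
	"context"
	"time"

	"storj.io/common/storj"
)

// offlineEmailDB is a hypothetical name for a sketch of the methods this diff
// assumes exist on overlay.DB. Signatures are inferred from the call sites in
// this diff, not copied from the real interface.
type offlineEmailDB interface {
	// GetOfflineNodesForEmail returns nodes eligible for an offline email:
	// per the chore's cutoff comment, nodes last seen more than offlineWindow
	// but less than cutoff ago, and not emailed within cooldown. It returns
	// at most limit rows, keyed by node ID with the operator email as value.
	GetOfflineNodesForEmail(ctx context.Context, offlineWindow, cutoff, cooldown time.Duration, limit int) (map[storj.NodeID]string, error)

	// UpdateLastOfflineEmail sets the last offline email timestamp for the
	// given nodes, as exercised directly by the chore test above.
	UpdateLastOfflineEmail(ctx context.Context, nodeIDs storj.NodeIDList, timestamp time.Time) error
}

The cutoff/cooldown pair gives the chore its stop condition: with the defaults (cooldown 24h, max-emails 3, so cutoff 72h), a node that stays offline receives one email per day for three days and then no more until it comes back online.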