satellite/nodeevents: add Chore
Create NodeEvents Chore on satellite core to read nodeevents DB and notify node operators on node events. The chore sends notifications grouped by email and event type: it selects the oldest entry in nodeevents.DB and also any other event with the same email and event type, no matter how old it is. However, the oldest entry of a group must exist for a minimum amount of time before that group can be selected. This minimum amount of time is a configurable value: --node-events.selection-wait-period. This wait period allows us to combine events of the same type and same email address into a single email. Change-Id: I8b444aa324d2dae265cc27d9e9e85faef79195d8
This commit is contained in:
parent
3e0a4230a5
commit
57be07f60a
@ -51,6 +51,7 @@ import (
|
||||
"storj.io/storj/satellite/metainfo"
|
||||
"storj.io/storj/satellite/metainfo/expireddeletion"
|
||||
"storj.io/storj/satellite/metrics"
|
||||
"storj.io/storj/satellite/nodeevents"
|
||||
"storj.io/storj/satellite/nodestats"
|
||||
"storj.io/storj/satellite/orders"
|
||||
"storj.io/storj/satellite/overlay"
|
||||
@ -94,6 +95,12 @@ type Satellite struct {
|
||||
DQStrayNodes *straynodes.Chore
|
||||
}
|
||||
|
||||
NodeEvents struct {
|
||||
DB nodeevents.DB
|
||||
Notifier nodeevents.Notifier
|
||||
Chore *nodeevents.Chore
|
||||
}
|
||||
|
||||
Metainfo struct {
|
||||
// TODO remove when uplink will be adjusted to use Metabase.DB
|
||||
Metabase *metabase.DB
|
||||
@ -575,6 +582,10 @@ func createNewSystem(name string, log *zap.Logger, config satellite.Config, peer
|
||||
system.Overlay.Service = api.Overlay.Service
|
||||
system.Overlay.DQStrayNodes = peer.Overlay.DQStrayNodes
|
||||
|
||||
system.NodeEvents.DB = peer.NodeEvents.DB
|
||||
system.NodeEvents.Notifier = peer.NodeEvents.Notifier
|
||||
system.NodeEvents.Chore = peer.NodeEvents.Chore
|
||||
|
||||
system.Reputation.Service = peer.Reputation.Service
|
||||
|
||||
// system.Metainfo.Metabase = api.Metainfo.Metabase
|
||||
|
@ -40,6 +40,7 @@ import (
|
||||
"storj.io/storj/satellite/metabase/zombiedeletion"
|
||||
"storj.io/storj/satellite/metainfo/expireddeletion"
|
||||
"storj.io/storj/satellite/metrics"
|
||||
"storj.io/storj/satellite/nodeevents"
|
||||
"storj.io/storj/satellite/orders"
|
||||
"storj.io/storj/satellite/overlay"
|
||||
"storj.io/storj/satellite/overlay/straynodes"
|
||||
@ -87,6 +88,12 @@ type Core struct {
|
||||
DQStrayNodes *straynodes.Chore
|
||||
}
|
||||
|
||||
NodeEvents struct {
|
||||
DB nodeevents.DB
|
||||
Notifier nodeevents.Notifier
|
||||
Chore *nodeevents.Chore
|
||||
}
|
||||
|
||||
Metainfo struct {
|
||||
Metabase *metabase.DB
|
||||
SegmentLoop *segmentloop.Service
|
||||
@ -274,6 +281,19 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB,
|
||||
}
|
||||
}
|
||||
|
||||
{ // setup node events
|
||||
if config.Overlay.SendNodeEmails {
|
||||
peer.NodeEvents.Notifier = nodeevents.NewMockNotifier(log.Named("node events:mock notifier"))
|
||||
peer.NodeEvents.DB = peer.DB.NodeEvents()
|
||||
peer.NodeEvents.Chore = nodeevents.NewChore(peer.Log.Named("node events:chore"), peer.NodeEvents.DB, config.Console.SatelliteName, peer.NodeEvents.Notifier, config.NodeEvents)
|
||||
peer.Services.Add(lifecycle.Item{
|
||||
Name: "node-events:chore",
|
||||
Run: peer.NodeEvents.Chore.Run,
|
||||
Close: peer.NodeEvents.Chore.Close,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
{ // setup live accounting
|
||||
peer.LiveAccounting.Cache = liveAccounting
|
||||
}
|
||||
|
120
satellite/nodeevents/chore.go
Normal file
120
satellite/nodeevents/chore.go
Normal file
@ -0,0 +1,120 @@
|
||||
// Copyright (C) 2022 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package nodeevents
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/spacemonkeygo/monkit/v3"
|
||||
"github.com/zeebo/errs"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"storj.io/common/sync2"
|
||||
"storj.io/common/uuid"
|
||||
)
|
||||
|
||||
var (
|
||||
// Error is the standard error class for node events.
|
||||
Error = errs.Class("node events")
|
||||
mon = monkit.Package()
|
||||
)
|
||||
|
||||
// Config holds the tunable settings of the node events chore.
type Config struct {
	// Interval is the period handed to the chore's cycle: how long the chore
	// waits before looking at the node events DB again once it has nothing to do.
	Interval time.Duration `help:"how long to wait before checking the node events DB again if there is nothing to work on" default:"5m"`
	// SelectionWaitPeriod is subtracted from the current time to compute the
	// cutoff passed to the DB when selecting the next batch of events.
	SelectionWaitPeriod time.Duration `help:"how long the earliest instance of an event for a particular email should exist in the DB before it is selected" default:"5m"`
}
|
||||
|
||||
// Notifier notifies node operators about node events.
|
||||
type Notifier interface {
|
||||
// Notify notifies a node operator about an event that occurred on some of their nodes.
|
||||
Notify(ctx context.Context, satellite string, events []NodeEvent) (err error)
|
||||
}
|
||||
|
||||
// Chore is a chore that reads events from node events and sends emails.
|
||||
type Chore struct {
|
||||
log *zap.Logger
|
||||
db DB
|
||||
satellite string
|
||||
notifier Notifier
|
||||
config Config
|
||||
nowFn func() time.Time
|
||||
Loop *sync2.Cycle
|
||||
}
|
||||
|
||||
// NewChore is a constructor for Chore.
|
||||
func NewChore(log *zap.Logger, db DB, satellite string, notifier Notifier, config Config) *Chore {
|
||||
return &Chore{
|
||||
log: log,
|
||||
db: db,
|
||||
satellite: satellite,
|
||||
notifier: notifier,
|
||||
config: config,
|
||||
nowFn: time.Now,
|
||||
Loop: sync2.NewCycle(config.Interval),
|
||||
}
|
||||
}
|
||||
|
||||
// Run runs the chore.
|
||||
func (chore *Chore) Run(ctx context.Context) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
return chore.Loop.Run(ctx, chore.processWhileQueueHasItems)
|
||||
}
|
||||
|
||||
// processWhileQueueHasItems keeps calling process() until the DB is empty or something
|
||||
// else goes wrong in fetching from the queue.
|
||||
func (chore *Chore) processWhileQueueHasItems(ctx context.Context) error {
|
||||
for {
|
||||
if err := ctx.Err(); err != nil {
|
||||
return err
|
||||
}
|
||||
n, err := chore.process(ctx)
|
||||
if err != nil {
|
||||
chore.log.Error("process", zap.Error(Error.Wrap(err)))
|
||||
return nil
|
||||
}
|
||||
if n == 0 {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// process picks items from the DB, combines them into an email and sends it.
|
||||
func (chore *Chore) process(ctx context.Context) (n int, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
batch, err := chore.db.GetNextBatch(ctx, chore.nowFn().Add(-chore.config.SelectionWaitPeriod))
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
var rowIDs []uuid.UUID
|
||||
for _, event := range batch {
|
||||
rowIDs = append(rowIDs, event.ID)
|
||||
}
|
||||
|
||||
if err = chore.notifier.Notify(ctx, chore.satellite, batch); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
err = chore.db.UpdateEmailSent(ctx, rowIDs, chore.nowFn())
|
||||
return len(batch), err
|
||||
}
|
||||
|
||||
// SetNotifier sets the notifier on chore for testing.
|
||||
func (chore *Chore) SetNotifier(n Notifier) {
|
||||
chore.notifier = n
|
||||
}
|
||||
|
||||
// SetNow sets nowFn on chore for testing.
|
||||
func (chore *Chore) SetNow(f func() time.Time) {
|
||||
chore.nowFn = f
|
||||
}
|
||||
|
||||
// Close closes the chore.
|
||||
func (chore *Chore) Close() error {
|
||||
chore.Loop.Close()
|
||||
return nil
|
||||
}
|
101
satellite/nodeevents/chore_test.go
Normal file
101
satellite/nodeevents/chore_test.go
Normal file
@ -0,0 +1,101 @@
|
||||
// Copyright (C) 2022 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package nodeevents_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"storj.io/common/testcontext"
|
||||
"storj.io/storj/private/testplanet"
|
||||
"storj.io/storj/satellite"
|
||||
"storj.io/storj/satellite/nodeevents"
|
||||
"storj.io/storj/satellite/overlay"
|
||||
"storj.io/storj/storagenode"
|
||||
)
|
||||
|
||||
type TestNotifier struct {
|
||||
notifications map[string][]nodeevents.NodeEvent
|
||||
}
|
||||
|
||||
func (tn *TestNotifier) Notify(ctx context.Context, satellite string, events []nodeevents.NodeEvent) error {
|
||||
if len(events) == 0 {
|
||||
return nil
|
||||
}
|
||||
email := events[0].Email
|
||||
n := tn.notifications[email]
|
||||
n = append(n, events...)
|
||||
tn.notifications[email] = n
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestNodeEventsChore(t *testing.T) {
|
||||
testplanet.Run(t, testplanet.Config{
|
||||
SatelliteCount: 1, StorageNodeCount: 2, UplinkCount: 0,
|
||||
Reconfigure: testplanet.Reconfigure{
|
||||
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
|
||||
config.Overlay.SendNodeEmails = true
|
||||
config.NodeEvents.SelectionWaitPeriod = 5 * time.Minute
|
||||
},
|
||||
StorageNode: func(index int, config *storagenode.Config) {
|
||||
config.Operator.Email = "test@storj.test"
|
||||
},
|
||||
},
|
||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||
sat := planet.Satellites[0]
|
||||
node0 := planet.StorageNodes[0]
|
||||
node1 := planet.StorageNodes[1]
|
||||
// email was reconfigured to be the same for all nodes.
|
||||
email := node0.Config.Operator.Email
|
||||
|
||||
chore := sat.NodeEvents.Chore
|
||||
chore.Loop.Pause()
|
||||
|
||||
tn := &TestNotifier{
|
||||
notifications: make(map[string][]nodeevents.NodeEvent),
|
||||
}
|
||||
chore.SetNotifier(tn)
|
||||
|
||||
// First, test that chore does not notify because not enough time has elapsed since the oldest event of type Disqualified,
|
||||
// with this email, was inserted.
|
||||
//
|
||||
// DQ nodes. Should create a node events in nodeevents DB.
|
||||
require.NoError(t, sat.Overlay.Service.DisqualifyNode(ctx, node0.ID(), overlay.DisqualificationReasonUnknown))
|
||||
require.NoError(t, sat.Overlay.Service.DisqualifyNode(ctx, node1.ID(), overlay.DisqualificationReasonUnknown))
|
||||
|
||||
// Trigger chore and check that Notifier.Notify was NOT called with the node events.
|
||||
chore.Loop.TriggerWait()
|
||||
|
||||
events := tn.notifications[email]
|
||||
require.Empty(t, events)
|
||||
|
||||
// Now, set nowFn on chore to 5 minutes in the future to test that chore does notify for the events.
|
||||
futureTime := func() time.Time {
|
||||
return time.Now().Add(5 * time.Minute)
|
||||
}
|
||||
chore.SetNow(futureTime)
|
||||
|
||||
// Trigger chore and check that Notifier.Notify was called with the node events.
|
||||
chore.Loop.TriggerWait()
|
||||
|
||||
events = tn.notifications[email]
|
||||
require.Len(t, events, 2)
|
||||
var foundEvent1, foundEvent2 bool
|
||||
for _, e := range events {
|
||||
require.Equal(t, email, e.Email)
|
||||
require.Equal(t, nodeevents.Disqualified, e.Event)
|
||||
if e.NodeID == node0.ID() {
|
||||
foundEvent1 = true
|
||||
} else if e.NodeID == node1.ID() {
|
||||
foundEvent2 = true
|
||||
}
|
||||
}
|
||||
require.True(t, foundEvent1)
|
||||
require.True(t, foundEvent2)
|
||||
})
|
||||
}
|
67
satellite/nodeevents/mocknotifier.go
Normal file
67
satellite/nodeevents/mocknotifier.go
Normal file
@ -0,0 +1,67 @@
|
||||
// Copyright (C) 2022 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package nodeevents
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strings"
|
||||
|
||||
"github.com/zeebo/errs"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// MockNotifier implements the Notifier interface.
|
||||
type MockNotifier struct {
|
||||
log *zap.Logger
|
||||
}
|
||||
|
||||
// NewMockNotifier is a constructor for MockNotifier.
|
||||
func NewMockNotifier(log *zap.Logger) *MockNotifier {
|
||||
return &MockNotifier{
|
||||
log: log,
|
||||
}
|
||||
}
|
||||
|
||||
// Notify stores the events in the Notifications field so they can be checked.
|
||||
func (m *MockNotifier) Notify(ctx context.Context, satellite string, events []NodeEvent) (err error) {
|
||||
var nodeIDs string
|
||||
if len(events) == 0 {
|
||||
return nil
|
||||
}
|
||||
for _, e := range events {
|
||||
nodeIDs = nodeIDs + e.NodeID.String() + ","
|
||||
}
|
||||
nodeIDs = strings.TrimSuffix(nodeIDs, ",")
|
||||
|
||||
eventString, err := typeToString(events[0].Event)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
m.log.Info("node operator notified", zap.String("email", events[0].Email), zap.String("event", eventString), zap.String("node IDs", nodeIDs))
|
||||
return nil
|
||||
}
|
||||
|
||||
func typeToString(event Type) (desc string, err error) {
|
||||
switch event {
|
||||
case Online:
|
||||
desc = "online"
|
||||
case Offline:
|
||||
desc = "offline"
|
||||
case Disqualified:
|
||||
desc = "disqualified"
|
||||
case UnknownAuditSuspended:
|
||||
desc = "unknown audit suspended"
|
||||
case UnknownAuditUnsuspended:
|
||||
desc = "unknown audit unsuspended"
|
||||
case OfflineSuspended:
|
||||
desc = "offline suspended"
|
||||
case OfflineUnsuspended:
|
||||
desc = "offline unsuspended"
|
||||
case BelowMinVersion:
|
||||
desc = "below minimum version"
|
||||
default:
|
||||
err = errs.New("event type has no description")
|
||||
}
|
||||
return desc, err
|
||||
}
|
@ -145,6 +145,7 @@ type Config struct {
|
||||
|
||||
Contact contact.Config
|
||||
Overlay overlay.Config
|
||||
NodeEvents nodeevents.Config
|
||||
StrayNodes straynodes.Config
|
||||
|
||||
Metainfo metainfo.Config
|
||||
|
6
scripts/testdata/satellite-config.yaml.lock
vendored
6
scripts/testdata/satellite-config.yaml.lock
vendored
@ -670,6 +670,12 @@ identity.key-path: /root/.local/share/storj/identity/satellite/identity.key
|
||||
# path to log for oom notices
|
||||
# monkit.hw.oomlog: /var/log/kern.log
|
||||
|
||||
# how long to wait before checking the node events DB again if there is nothing to work on
|
||||
# node-events.interval: 5m0s
|
||||
|
||||
# how long the earliest instance of an event for a particular email should exist in the DB before it is selected
|
||||
# node-events.selection-wait-period: 5m0s
|
||||
|
||||
# encryption keys to encrypt info in orders
|
||||
# orders.encryption-keys: ""
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user