storj/satellite/durability/observer.go
Márton Elek 5c49ba1d85 satellite/durability: ignore information from new nodes
To get better performance, we pre-load all nodealias/node information at the beginning of the segment loop.

It's possible that we receive a new node alias from the segment table what we are not fully aware of (yet).

The easiest solution is just ignoring. New risks/threats can be detected by a new execution cycle.

Change-Id: Ib54f7edc46eedbab6d13b4d651aaac1425994940
2023-10-19 16:38:39 +00:00

262 lines
7.4 KiB
Go

// Copyright (C) 2023 Storj Labs, Inc.
// See LICENSE for copying information.
package durability
import (
"context"
"time"
"github.com/jtolio/eventkit"
"github.com/zeebo/errs"
"storj.io/common/storj"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/metabase/rangedloop"
"storj.io/storj/satellite/nodeselection"
"storj.io/storj/satellite/overlay"
)
var ek = eventkit.Package()
// HealthStat collects the availability conditions for one class (for example: nodes with the same owner).
type HealthStat struct {
// because 0 means uninitialized, we store the min +1
minPlusOne int
}
// Update updates the stat with one measurement: number of pieces which are available even without the nodes of the selected class.
func (h *HealthStat) Update(num int) {
if num < h.minPlusOne-1 || h.minPlusOne == 0 {
h.minPlusOne = num + 1
}
}
// Merge can merge two stat to one, without losing information.
func (h *HealthStat) Merge(stat *HealthStat) {
if stat.minPlusOne < h.minPlusOne && stat.minPlusOne > 0 {
h.minPlusOne = stat.minPlusOne
}
}
// Min returns the minimal number.
func (h *HealthStat) Min() int {
return h.minPlusOne - 1
}
// Unused returns true when stat is uninitialized (-1) and was not updated with any number.
func (h *HealthStat) Unused() bool {
return h.minPlusOne == 0
}
// NodeClassifier identifies a risk class (for example an owner, or country) of the SelectedNode.
type NodeClassifier func(node *nodeselection.SelectedNode) string
// ReportConfig configures durability report.
type ReportConfig struct {
Enabled bool `help:"whether to enable durability report (rangedloop observer)" default:"true"`
}
// Report is a calculator (rangloop.Observer) which checks the availability of pieces without certain nodes.
// It can answer the following question:
// 1. loosing a given group of nodes (all nodes of one country or all nodes of one owner)...
// 2. what will be the lowest humber of healhty pieces, checking all the segments.
//
// Example: we have one segment where 80 pieces are stored, but 42 of them are in Germany.
//
// in this case this reporter will return 38 for the class "country:DE" (assuming all the other segments are more lucky).
type Report struct {
healthStat map[string]*HealthStat
classifiers []NodeClassifier
aliasMap *metabase.NodeAliasMap
nodes map[storj.NodeID]*nodeselection.SelectedNode
db overlay.DB
metabaseDB *metabase.DB
reporter func(name string, stat *HealthStat)
reportThreshold int
asOfSystemInterval time.Duration
}
// NewDurability creates the new instance.
func NewDurability(db overlay.DB, metabaseDB *metabase.DB, classifiers []NodeClassifier, reportThreshold int, asOfSystemInterval time.Duration) *Report {
return &Report{
db: db,
metabaseDB: metabaseDB,
classifiers: classifiers,
reportThreshold: reportThreshold,
asOfSystemInterval: asOfSystemInterval,
nodes: make(map[storj.NodeID]*nodeselection.SelectedNode),
healthStat: make(map[string]*HealthStat),
reporter: reportToEventkit,
}
}
// Start implements rangedloop.Observer.
func (c *Report) Start(ctx context.Context, startTime time.Time) error {
nodes, err := c.db.GetParticipatingNodes(ctx, -12*time.Hour, c.asOfSystemInterval)
if err != nil {
return errs.Wrap(err)
}
c.nodes = map[storj.NodeID]*nodeselection.SelectedNode{}
for ix := range nodes {
c.nodes[nodes[ix].ID] = &nodes[ix]
}
aliasMap, err := c.metabaseDB.LatestNodesAliasMap(ctx)
if err != nil {
return errs.Wrap(err)
}
c.aliasMap = aliasMap
return nil
}
// Fork implements rangedloop.Observer.
func (c *Report) Fork(ctx context.Context) (rangedloop.Partial, error) {
d := &ObserverFork{
classifiers: c.classifiers,
healthStat: nil,
aliasMap: c.aliasMap,
nodes: c.nodes,
classifierCache: make([][]string, c.aliasMap.Max()+1),
reportThreshold: c.reportThreshold,
}
d.classifyNodeAliases()
return d, nil
}
// Join implements rangedloop.Observer.
func (c *Report) Join(ctx context.Context, partial rangedloop.Partial) (err error) {
defer mon.Task()(&ctx)(&err)
fork := partial.(*ObserverFork)
for cid, stat := range fork.healthStat {
if stat.Unused() {
continue
}
name := fork.className[classID(cid)]
existing, found := c.healthStat[name]
if !found {
c.healthStat[name] = &HealthStat{
minPlusOne: stat.minPlusOne,
}
} else {
existing.Merge(&stat)
}
}
return nil
}
// Finish implements rangedloop.Observer.
func (c *Report) Finish(ctx context.Context) error {
for name, stat := range c.healthStat {
c.reporter(name, stat)
}
return nil
}
// TestChangeReporter modifies the reporter for unit tests.
func (c *Report) TestChangeReporter(r func(name string, stat *HealthStat)) {
c.reporter = r
}
// classID is a fork level short identifier for each class.
type classID int32
// ObserverFork is the durability calculator for each segment range.
type ObserverFork struct {
// map between classes (like "country:hu" and integer IDs)
classID map[string]classID
className map[classID]string
controlledByClassCache []int32
healthStat []HealthStat
classifiers []NodeClassifier
aliasMap *metabase.NodeAliasMap
nodes map[storj.NodeID]*nodeselection.SelectedNode
classifierCache [][]string
// contains the available classes for each node alias.
classified [][]classID
reportThreshold int
}
func (c *ObserverFork) classifyNodeAliases() {
c.classID = make(map[string]classID, len(c.classifiers))
c.className = make(map[classID]string, len(c.classifiers))
c.classified = make([][]classID, c.aliasMap.Max()+1)
for _, node := range c.nodes {
alias, ok := c.aliasMap.Alias(node.ID)
if !ok {
continue
}
classes := make([]classID, len(c.classifiers))
for i, group := range c.classifiers {
class := group(node)
id, ok := c.classID[class]
if !ok {
id = classID(len(c.classID))
c.className[id] = class
c.classID[class] = id
}
classes[i] = id
}
c.classified[alias] = classes
}
c.healthStat = make([]HealthStat, len(c.classID))
c.controlledByClassCache = make([]int32, len(c.classID))
}
// Process implements rangedloop.Partial.
func (c *ObserverFork) Process(ctx context.Context, segments []rangedloop.Segment) (err error) {
controlledByClass := c.controlledByClassCache
for i := range segments {
s := &segments[i]
healthyPieceCount := 0
for _, piece := range s.AliasPieces {
if len(c.classified) <= int(piece.Alias) {
// this is a new node, but we can ignore it.
// will be included in the next execution cycle.
continue
}
classes := c.classified[piece.Alias]
// unavailable/offline nodes were not classified
if len(classes) > 0 {
healthyPieceCount++
}
for _, class := range classes {
controlledByClass[class]++
}
}
for classID, count := range controlledByClass {
if count == 0 {
continue
}
// reset the value for the next iteration
controlledByClass[classID] = 0
diff := healthyPieceCount - int(count)
// if value is high, it's not a problem. faster to ignore it...
if c.reportThreshold > 0 && diff > c.reportThreshold {
continue
}
c.healthStat[classID].Update(diff)
}
}
return nil
}
func reportToEventkit(name string, stat *HealthStat) {
ek.Event("durability", eventkit.String("name", name), eventkit.Int64("min", int64(stat.Min())))
}
var _ rangedloop.Observer = &Report{}
var _ rangedloop.Partial = &ObserverFork{}