satellite/durability: use single classifier per observer instance

the new bus_factor calculation doesn't make sense with different classes, as we have overlaps.

For example: it can detect a risk if we loose one country and one different subnet (with possible overlap).

It's better to calculate the stat and bus_factor per class (net, country, ...).

It also makes it easier to measure execution time per class.

Change-Id: I7d4d5f7cb811cd50c5831077b43e001908aab96b
This commit is contained in:
Márton Elek 2023-11-20 09:58:58 +01:00 committed by Storj Robot
parent 7c25e3733a
commit 0f4f1ddde8
5 changed files with 88 additions and 72 deletions

View File

@ -75,12 +75,12 @@ type ReportConfig struct {
type Report struct {
healthStat map[string]*HealthStat
busFactor HealthStat
classifiers []NodeClassifier
classifier NodeClassifier
aliasMap *metabase.NodeAliasMap
nodes map[storj.NodeID]*nodeselection.SelectedNode
db overlay.DB
metabaseDB *metabase.DB
reporter func(n time.Time, name string, stat *HealthStat)
reporter func(n time.Time, class string, value string, stat *HealthStat)
reportThreshold int
busFactorThreshold int
asOfSystemInterval time.Duration
@ -90,17 +90,19 @@ type Report struct {
className map[classID]string
// contains the available classes for each node alias.
classified [][]classID
classified []classID
maxPieceCount int
class string
}
// NewDurability creates the new instance.
func NewDurability(db overlay.DB, metabaseDB *metabase.DB, classifiers []NodeClassifier, maxPieceCount int, reportThreshold int, busFactorThreshold int, asOfSystemInterval time.Duration) *Report {
func NewDurability(db overlay.DB, metabaseDB *metabase.DB, class string, classifier NodeClassifier, maxPieceCount int, reportThreshold int, busFactorThreshold int, asOfSystemInterval time.Duration) *Report {
return &Report{
class: class,
db: db,
metabaseDB: metabaseDB,
classifiers: classifiers,
classifier: classifier,
reportThreshold: reportThreshold,
busFactorThreshold: busFactorThreshold,
asOfSystemInterval: asOfSystemInterval,
@ -137,35 +139,33 @@ func (c *Report) resetStat() {
}
func (c *Report) classifyNodeAliases() {
c.classID = make(map[string]classID, len(c.classifiers))
c.className = make(map[classID]string, len(c.classifiers))
c.classID = make(map[string]classID)
c.className = make(map[classID]string)
c.classified = make([][]classID, c.aliasMap.Max()+1)
c.classID["unclassified"] = 1
c.className[0] = "unclassified"
c.classified = make([]classID, c.aliasMap.Max()+1)
for _, node := range c.nodes {
alias, ok := c.aliasMap.Alias(node.ID)
if !ok {
continue
}
classes := make([]classID, len(c.classifiers))
for i, group := range c.classifiers {
class := group(node)
class := c.classifier(node)
id, ok := c.classID[class]
if !ok {
id = classID(len(c.classID))
c.className[id] = class
c.classID[class] = id
}
classes[i] = id
}
c.classified[alias] = classes
c.classified[alias] = id
}
}
// Fork implements rangedloop.Observer.
func (c *Report) Fork(ctx context.Context) (rangedloop.Partial, error) {
d := &ObserverFork{
classifiers: c.classifiers,
aliasMap: c.aliasMap,
nodes: c.nodes,
classifierCache: make([][]string, c.aliasMap.Max()+1),
@ -206,16 +206,21 @@ func (c *Report) Join(ctx context.Context, partial rangedloop.Partial) (err erro
func (c *Report) Finish(ctx context.Context) error {
reportTime := time.Now()
for name, stat := range c.healthStat {
c.reporter(reportTime, name, stat)
c.reporter(reportTime, c.class, name, stat)
}
return nil
}
// TestChangeReporter modifies the reporter for unit tests.
func (c *Report) TestChangeReporter(r func(n time.Time, name string, stat *HealthStat)) {
func (c *Report) TestChangeReporter(r func(n time.Time, class string, name string, stat *HealthStat)) {
c.reporter = r
}
// GetClass return with the class instance name (like last_net or country).
func (c *Report) GetClass() string {
return c.class
}
// classID is a fork level short identifier for each class.
type classID int32
@ -225,7 +230,6 @@ type ObserverFork struct {
healthStat []HealthStat
busFactor HealthStat
classifiers []NodeClassifier
aliasMap *metabase.NodeAliasMap
nodes map[storj.NodeID]*nodeselection.SelectedNode
classifierCache [][]string
@ -234,7 +238,7 @@ type ObserverFork struct {
reportThreshold int
busFactorThreshold int
classified [][]classID
classified []classID
}
// Process implements rangedloop.Partial.
@ -254,16 +258,15 @@ func (c *ObserverFork) Process(ctx context.Context, segments []rangedloop.Segmen
continue
}
classes := c.classified[piece.Alias]
class := c.classified[piece.Alias]
// unavailable/offline nodes were not classified
if len(classes) > 0 {
if class > 0 {
healthyPieceCount++
}
for _, class := range classes {
controlledByClass[class]++
}
}
busFactorGroups := c.busFactorCache
@ -307,9 +310,10 @@ func (c *ObserverFork) Process(ctx context.Context, segments []rangedloop.Segmen
return nil
}
func reportToEventkit(n time.Time, name string, stat *HealthStat) {
func reportToEventkit(n time.Time, class string, name string, stat *HealthStat) {
ek.Event("durability",
eventkit.String("name", name),
eventkit.String("class", class),
eventkit.String("value", name),
eventkit.String("exemplar", stat.Exemplar),
eventkit.Timestamp("report_time", n),
eventkit.Int64("min", int64(stat.Min())),

View File

@ -64,17 +64,19 @@ func TestDurabilityIntegration(t *testing.T) {
}
result := make(map[string]*durability.HealthStat)
planet.Satellites[0].RangedLoop.DurabilityReport.Observer.TestChangeReporter(func(n time.Time, name string, stat *durability.HealthStat) {
result[name] = stat
for _, observer := range planet.Satellites[0].RangedLoop.DurabilityReport.Observer {
observer.TestChangeReporter(func(n time.Time, class string, value string, stat *durability.HealthStat) {
result[value] = stat
})
}
rangedLoopService := planet.Satellites[0].RangedLoop.RangedLoop.Service
_, err := rangedLoopService.RunOnce(ctx)
require.Len(t, result, 15)
// one or two pieces are controlled out of the 5-6 --> 3 or 4 pieces are available without HU nodes
require.True(t, result["c:HU"].Min() > 2)
require.True(t, result["c:HU"].Min() < 5)
require.True(t, result["HU"].Min() > 2)
require.True(t, result["HU"].Min() < 5)
require.NoError(t, err)
})
}

View File

@ -64,10 +64,9 @@ func TestDurability(t *testing.T) {
}
ctx := testcontext.New(t)
c := NewDurability(nil, nil, []NodeClassifier{
func(node *nodeselection.SelectedNode) string {
return "net:" + node.LastNet
}}, 110, 0, 0, 0)
c := NewDurability(nil, nil, "net", func(node *nodeselection.SelectedNode) string {
return node.LastNet
}, 110, 0, 0, 0)
c.aliasMap = metabase.NewNodeAliasMap(aliases)
for _, node := range storageNodes {
@ -98,11 +97,11 @@ func TestDurability(t *testing.T) {
require.NoError(t, err)
}
require.NotNil(t, c.healthStat["net:127.0.0.0"])
require.Equal(t, 1, c.healthStat["net:127.0.0.0"].Min())
require.Equal(t, segment1.StreamID.String()+"/0", c.healthStat["net:127.0.0.0"].Exemplar)
require.Equal(t, 2, c.healthStat["net:127.0.1.0"].Min())
require.Equal(t, 3, c.healthStat["net:127.0.2.0"].Min())
require.NotNil(t, c.healthStat["127.0.0.0"])
require.Equal(t, 1, c.healthStat["127.0.0.0"].Min())
require.Equal(t, segment1.StreamID.String()+"/0", c.healthStat["127.0.0.0"].Exemplar)
require.Equal(t, 2, c.healthStat["127.0.1.0"].Min())
require.Equal(t, 3, c.healthStat["127.0.2.0"].Min())
// usually called with c.Start()
c.resetStat()
@ -113,8 +112,7 @@ func TestDurability(t *testing.T) {
require.NoError(t, err)
// second run supposed to have zero stat.
require.Nil(t, c.healthStat["net:127.0.0.0"])
require.Nil(t, c.healthStat["127.0.0.0"])
}
func TestDurabilityUnknownNode(t *testing.T) {
@ -132,10 +130,9 @@ func TestDurabilityUnknownNode(t *testing.T) {
})
ctx := testcontext.New(t)
c := NewDurability(nil, nil, []NodeClassifier{
func(node *nodeselection.SelectedNode) string {
return "net:" + node.LastNet
}}, 110, 0, 0, 0)
c := NewDurability(nil, nil, "net", func(node *nodeselection.SelectedNode) string {
return node.LastNet
}, 110, 0, 0, 0)
c.aliasMap = metabase.NewNodeAliasMap(aliases)
for _, node := range storageNodes {
@ -174,7 +171,7 @@ func TestDurabilityUnknownNode(t *testing.T) {
err = c.Join(ctx, fork)
require.NoError(t, err)
// note: the newly created node (alias 9999) is not considered.
require.Equal(t, 0, c.healthStat["net:127.0.0.1"].Min())
require.Equal(t, 0, c.healthStat["127.0.0.1"].Min())
}
func TestBusFactor(t *testing.T) {
@ -182,7 +179,7 @@ func TestBusFactor(t *testing.T) {
f := ObserverFork{}
for i := 0; i < 100; i++ {
f.classified = append(f.classified, []classID{classID((i))})
f.classified = append(f.classified, classID(i))
}
f.controlledByClassCache = make([]int32, 100)
f.busFactorCache = make([]int32, 300)
@ -295,11 +292,6 @@ func BenchmarkDurabilityProcess(b *testing.B) {
aliasMap: aliases,
nodes: nodeMap,
classifierCache: make([][]string, aliases.Max()),
classifiers: []NodeClassifier{
func(node *nodeselection.SelectedNode) string {
return "email:" + node.Email
},
},
}
b.ResetTimer()

View File

@ -19,7 +19,7 @@ var (
func sendObserverDurations(observerDurations []ObserverDuration) {
for _, od := range observerDurations {
ev.Event("rangedloop",
eventkit.String("observer", fmt.Sprintf("%T", od.Observer)),
eventkit.String("observer", observerName(od.Observer)),
eventkit.Duration("duration", od.Duration))
}
@ -44,7 +44,7 @@ func (o *completedObserverStats) Stats(cb func(key monkit.SeriesKey, field strin
// if there are no completed observers yet, no statistics will be sent
for _, observerDuration := range o.observerDurations {
key := monkit.NewSeriesKey("completed-observer-duration")
key = key.WithTag("observer", fmt.Sprintf("%T", observerDuration.Observer))
key = key.WithTag("observer", observerName(observerDuration.Observer))
cb(key, "duration", observerDuration.Duration.Seconds())
}
@ -57,3 +57,16 @@ func (o *completedObserverStats) setObserverDurations(observerDurations []Observ
o.observerDurations = observerDurations
}
type withClass interface {
GetClass() string
}
func observerName(o Observer) string {
name := fmt.Sprintf("%T", o)
// durability observers are per class instances.
if dr, ok := o.(withClass); ok {
name += fmt.Sprintf("[%s]", dr.GetClass())
}
return name
}

View File

@ -73,7 +73,7 @@ type RangedLoop struct {
}
DurabilityReport struct {
Observer *durability.Report
Observer []*durability.Report
}
RangedLoop struct {
@ -147,20 +147,23 @@ func NewRangedLoop(log *zap.Logger, db DB, metabaseDB *metabase.DB, config *Conf
}
{ // setup
peer.DurabilityReport.Observer = durability.NewDurability(db.OverlayCache(), metabaseDB, []durability.NodeClassifier{
func(node *nodeselection.SelectedNode) string {
return "e:" + node.Email
classes := map[string]func(node *nodeselection.SelectedNode) string{
"email": func(node *nodeselection.SelectedNode) string {
return node.Email
},
func(node *nodeselection.SelectedNode) string {
return "w:" + node.Wallet
"wallet": func(node *nodeselection.SelectedNode) string {
return node.Wallet
},
func(node *nodeselection.SelectedNode) string {
return "n:" + node.LastNet
"net": func(node *nodeselection.SelectedNode) string {
return node.LastNet
},
func(node *nodeselection.SelectedNode) string {
return "c:" + node.CountryCode.String()
"country": func(node *nodeselection.SelectedNode) string {
return node.CountryCode.String()
},
}, config.Metainfo.RS.Total, config.Metainfo.RS.Repair, config.Metainfo.RS.Repair-config.Metainfo.RS.Min, config.RangedLoop.AsOfSystemInterval)
}
for class, f := range classes {
peer.DurabilityReport.Observer = append(peer.DurabilityReport.Observer, durability.NewDurability(db.OverlayCache(), metabaseDB, class, f, config.Metainfo.RS.Total, config.Metainfo.RS.Repair, config.Metainfo.RS.Repair-config.Metainfo.RS.Min, config.RangedLoop.AsOfSystemInterval))
}
}
{ // setup overlay
@ -226,7 +229,9 @@ func NewRangedLoop(log *zap.Logger, db DB, metabaseDB *metabase.DB, config *Conf
}
if config.DurabilityReport.Enabled {
observers = append(observers, peer.DurabilityReport.Observer)
for _, observer := range peer.DurabilityReport.Observer {
observers = append(observers, observer)
}
}
segments := rangedloop.NewMetabaseRangeSplitter(metabaseDB, config.RangedLoop.AsOfSystemInterval, config.RangedLoop.BatchSize)