diff --git a/satellite/durability/observer.go b/satellite/durability/observer.go index d3f344f04..0581e001f 100644 --- a/satellite/durability/observer.go +++ b/satellite/durability/observer.go @@ -75,12 +75,12 @@ type ReportConfig struct { type Report struct { healthStat map[string]*HealthStat busFactor HealthStat - classifiers []NodeClassifier + classifier NodeClassifier aliasMap *metabase.NodeAliasMap nodes map[storj.NodeID]*nodeselection.SelectedNode db overlay.DB metabaseDB *metabase.DB - reporter func(n time.Time, name string, stat *HealthStat) + reporter func(n time.Time, class string, value string, stat *HealthStat) reportThreshold int busFactorThreshold int asOfSystemInterval time.Duration @@ -90,17 +90,19 @@ type Report struct { className map[classID]string // contains the available classes for each node alias. - classified [][]classID + classified []classID maxPieceCount int + class string } // NewDurability creates the new instance. -func NewDurability(db overlay.DB, metabaseDB *metabase.DB, classifiers []NodeClassifier, maxPieceCount int, reportThreshold int, busFactorThreshold int, asOfSystemInterval time.Duration) *Report { +func NewDurability(db overlay.DB, metabaseDB *metabase.DB, class string, classifier NodeClassifier, maxPieceCount int, reportThreshold int, busFactorThreshold int, asOfSystemInterval time.Duration) *Report { return &Report{ + class: class, db: db, metabaseDB: metabaseDB, - classifiers: classifiers, + classifier: classifier, reportThreshold: reportThreshold, busFactorThreshold: busFactorThreshold, asOfSystemInterval: asOfSystemInterval, @@ -137,35 +139,33 @@ func (c *Report) resetStat() { } func (c *Report) classifyNodeAliases() { - c.classID = make(map[string]classID, len(c.classifiers)) - c.className = make(map[classID]string, len(c.classifiers)) + c.classID = make(map[string]classID) + c.className = make(map[classID]string) - c.classified = make([][]classID, c.aliasMap.Max()+1) + c.classID["unclassified"] = 1 + c.className[0] = "unclassified" + + c.classified = make([]classID, c.aliasMap.Max()+1) for _, node := range c.nodes { alias, ok := c.aliasMap.Alias(node.ID) if !ok { continue } - classes := make([]classID, len(c.classifiers)) - for i, group := range c.classifiers { - class := group(node) - id, ok := c.classID[class] - if !ok { - id = classID(len(c.classID)) - c.className[id] = class - c.classID[class] = id - } - classes[i] = id + class := c.classifier(node) + id, ok := c.classID[class] + if !ok { + id = classID(len(c.classID)) + c.className[id] = class + c.classID[class] = id } - c.classified[alias] = classes + c.classified[alias] = id } } // Fork implements rangedloop.Observer. func (c *Report) Fork(ctx context.Context) (rangedloop.Partial, error) { d := &ObserverFork{ - classifiers: c.classifiers, aliasMap: c.aliasMap, nodes: c.nodes, classifierCache: make([][]string, c.aliasMap.Max()+1), @@ -206,16 +206,21 @@ func (c *Report) Join(ctx context.Context, partial rangedloop.Partial) (err erro func (c *Report) Finish(ctx context.Context) error { reportTime := time.Now() for name, stat := range c.healthStat { - c.reporter(reportTime, name, stat) + c.reporter(reportTime, c.class, name, stat) } return nil } // TestChangeReporter modifies the reporter for unit tests. -func (c *Report) TestChangeReporter(r func(n time.Time, name string, stat *HealthStat)) { +func (c *Report) TestChangeReporter(r func(n time.Time, class string, name string, stat *HealthStat)) { c.reporter = r } +// GetClass return with the class instance name (like last_net or country). +func (c *Report) GetClass() string { + return c.class +} + // classID is a fork level short identifier for each class. type classID int32 @@ -225,7 +230,6 @@ type ObserverFork struct { healthStat []HealthStat busFactor HealthStat - classifiers []NodeClassifier aliasMap *metabase.NodeAliasMap nodes map[storj.NodeID]*nodeselection.SelectedNode classifierCache [][]string @@ -234,7 +238,7 @@ type ObserverFork struct { reportThreshold int busFactorThreshold int - classified [][]classID + classified []classID } // Process implements rangedloop.Partial. @@ -254,16 +258,15 @@ func (c *ObserverFork) Process(ctx context.Context, segments []rangedloop.Segmen continue } - classes := c.classified[piece.Alias] + class := c.classified[piece.Alias] // unavailable/offline nodes were not classified - if len(classes) > 0 { + if class > 0 { healthyPieceCount++ } - for _, class := range classes { - controlledByClass[class]++ - } + controlledByClass[class]++ + } busFactorGroups := c.busFactorCache @@ -307,9 +310,10 @@ func (c *ObserverFork) Process(ctx context.Context, segments []rangedloop.Segmen return nil } -func reportToEventkit(n time.Time, name string, stat *HealthStat) { +func reportToEventkit(n time.Time, class string, name string, stat *HealthStat) { ek.Event("durability", - eventkit.String("name", name), + eventkit.String("class", class), + eventkit.String("value", name), eventkit.String("exemplar", stat.Exemplar), eventkit.Timestamp("report_time", n), eventkit.Int64("min", int64(stat.Min())), diff --git a/satellite/durability/observer_integration_test.go b/satellite/durability/observer_integration_test.go index 157283dfb..90d6c47c2 100644 --- a/satellite/durability/observer_integration_test.go +++ b/satellite/durability/observer_integration_test.go @@ -64,17 +64,19 @@ func TestDurabilityIntegration(t *testing.T) { } result := make(map[string]*durability.HealthStat) - planet.Satellites[0].RangedLoop.DurabilityReport.Observer.TestChangeReporter(func(n time.Time, name string, stat *durability.HealthStat) { - result[name] = stat - }) + for _, observer := range planet.Satellites[0].RangedLoop.DurabilityReport.Observer { + observer.TestChangeReporter(func(n time.Time, class string, value string, stat *durability.HealthStat) { + result[value] = stat + }) + } rangedLoopService := planet.Satellites[0].RangedLoop.RangedLoop.Service _, err := rangedLoopService.RunOnce(ctx) require.Len(t, result, 15) // one or two pieces are controlled out of the 5-6 --> 3 or 4 pieces are available without HU nodes - require.True(t, result["c:HU"].Min() > 2) - require.True(t, result["c:HU"].Min() < 5) + require.True(t, result["HU"].Min() > 2) + require.True(t, result["HU"].Min() < 5) require.NoError(t, err) }) } diff --git a/satellite/durability/observer_test.go b/satellite/durability/observer_test.go index dfa960ffb..56049c5c7 100644 --- a/satellite/durability/observer_test.go +++ b/satellite/durability/observer_test.go @@ -64,10 +64,9 @@ func TestDurability(t *testing.T) { } ctx := testcontext.New(t) - c := NewDurability(nil, nil, []NodeClassifier{ - func(node *nodeselection.SelectedNode) string { - return "net:" + node.LastNet - }}, 110, 0, 0, 0) + c := NewDurability(nil, nil, "net", func(node *nodeselection.SelectedNode) string { + return node.LastNet + }, 110, 0, 0, 0) c.aliasMap = metabase.NewNodeAliasMap(aliases) for _, node := range storageNodes { @@ -98,11 +97,11 @@ func TestDurability(t *testing.T) { require.NoError(t, err) } - require.NotNil(t, c.healthStat["net:127.0.0.0"]) - require.Equal(t, 1, c.healthStat["net:127.0.0.0"].Min()) - require.Equal(t, segment1.StreamID.String()+"/0", c.healthStat["net:127.0.0.0"].Exemplar) - require.Equal(t, 2, c.healthStat["net:127.0.1.0"].Min()) - require.Equal(t, 3, c.healthStat["net:127.0.2.0"].Min()) + require.NotNil(t, c.healthStat["127.0.0.0"]) + require.Equal(t, 1, c.healthStat["127.0.0.0"].Min()) + require.Equal(t, segment1.StreamID.String()+"/0", c.healthStat["127.0.0.0"].Exemplar) + require.Equal(t, 2, c.healthStat["127.0.1.0"].Min()) + require.Equal(t, 3, c.healthStat["127.0.2.0"].Min()) // usually called with c.Start() c.resetStat() @@ -113,8 +112,7 @@ func TestDurability(t *testing.T) { require.NoError(t, err) // second run supposed to have zero stat. - require.Nil(t, c.healthStat["net:127.0.0.0"]) - + require.Nil(t, c.healthStat["127.0.0.0"]) } func TestDurabilityUnknownNode(t *testing.T) { @@ -132,10 +130,9 @@ func TestDurabilityUnknownNode(t *testing.T) { }) ctx := testcontext.New(t) - c := NewDurability(nil, nil, []NodeClassifier{ - func(node *nodeselection.SelectedNode) string { - return "net:" + node.LastNet - }}, 110, 0, 0, 0) + c := NewDurability(nil, nil, "net", func(node *nodeselection.SelectedNode) string { + return node.LastNet + }, 110, 0, 0, 0) c.aliasMap = metabase.NewNodeAliasMap(aliases) for _, node := range storageNodes { @@ -174,7 +171,7 @@ func TestDurabilityUnknownNode(t *testing.T) { err = c.Join(ctx, fork) require.NoError(t, err) // note: the newly created node (alias 9999) is not considered. - require.Equal(t, 0, c.healthStat["net:127.0.0.1"].Min()) + require.Equal(t, 0, c.healthStat["127.0.0.1"].Min()) } func TestBusFactor(t *testing.T) { @@ -182,7 +179,7 @@ func TestBusFactor(t *testing.T) { f := ObserverFork{} for i := 0; i < 100; i++ { - f.classified = append(f.classified, []classID{classID((i))}) + f.classified = append(f.classified, classID(i)) } f.controlledByClassCache = make([]int32, 100) f.busFactorCache = make([]int32, 300) @@ -295,11 +292,6 @@ func BenchmarkDurabilityProcess(b *testing.B) { aliasMap: aliases, nodes: nodeMap, classifierCache: make([][]string, aliases.Max()), - classifiers: []NodeClassifier{ - func(node *nodeselection.SelectedNode) string { - return "email:" + node.Email - }, - }, } b.ResetTimer() diff --git a/satellite/metabase/rangedloop/observerstats.go b/satellite/metabase/rangedloop/observerstats.go index 4320be6bd..44dc5c93e 100644 --- a/satellite/metabase/rangedloop/observerstats.go +++ b/satellite/metabase/rangedloop/observerstats.go @@ -19,7 +19,7 @@ var ( func sendObserverDurations(observerDurations []ObserverDuration) { for _, od := range observerDurations { ev.Event("rangedloop", - eventkit.String("observer", fmt.Sprintf("%T", od.Observer)), + eventkit.String("observer", observerName(od.Observer)), eventkit.Duration("duration", od.Duration)) } @@ -44,7 +44,7 @@ func (o *completedObserverStats) Stats(cb func(key monkit.SeriesKey, field strin // if there are no completed observers yet, no statistics will be sent for _, observerDuration := range o.observerDurations { key := monkit.NewSeriesKey("completed-observer-duration") - key = key.WithTag("observer", fmt.Sprintf("%T", observerDuration.Observer)) + key = key.WithTag("observer", observerName(observerDuration.Observer)) cb(key, "duration", observerDuration.Duration.Seconds()) } @@ -57,3 +57,16 @@ func (o *completedObserverStats) setObserverDurations(observerDurations []Observ o.observerDurations = observerDurations } + +type withClass interface { + GetClass() string +} + +func observerName(o Observer) string { + name := fmt.Sprintf("%T", o) + // durability observers are per class instances. + if dr, ok := o.(withClass); ok { + name += fmt.Sprintf("[%s]", dr.GetClass()) + } + return name +} diff --git a/satellite/rangedloop.go b/satellite/rangedloop.go index 4f0e699f5..088b51bae 100644 --- a/satellite/rangedloop.go +++ b/satellite/rangedloop.go @@ -73,7 +73,7 @@ type RangedLoop struct { } DurabilityReport struct { - Observer *durability.Report + Observer []*durability.Report } RangedLoop struct { @@ -147,20 +147,23 @@ func NewRangedLoop(log *zap.Logger, db DB, metabaseDB *metabase.DB, config *Conf } { // setup - peer.DurabilityReport.Observer = durability.NewDurability(db.OverlayCache(), metabaseDB, []durability.NodeClassifier{ - func(node *nodeselection.SelectedNode) string { - return "e:" + node.Email + classes := map[string]func(node *nodeselection.SelectedNode) string{ + "email": func(node *nodeselection.SelectedNode) string { + return node.Email }, - func(node *nodeselection.SelectedNode) string { - return "w:" + node.Wallet + "wallet": func(node *nodeselection.SelectedNode) string { + return node.Wallet }, - func(node *nodeselection.SelectedNode) string { - return "n:" + node.LastNet + "net": func(node *nodeselection.SelectedNode) string { + return node.LastNet }, - func(node *nodeselection.SelectedNode) string { - return "c:" + node.CountryCode.String() + "country": func(node *nodeselection.SelectedNode) string { + return node.CountryCode.String() }, - }, config.Metainfo.RS.Total, config.Metainfo.RS.Repair, config.Metainfo.RS.Repair-config.Metainfo.RS.Min, config.RangedLoop.AsOfSystemInterval) + } + for class, f := range classes { + peer.DurabilityReport.Observer = append(peer.DurabilityReport.Observer, durability.NewDurability(db.OverlayCache(), metabaseDB, class, f, config.Metainfo.RS.Total, config.Metainfo.RS.Repair, config.Metainfo.RS.Repair-config.Metainfo.RS.Min, config.RangedLoop.AsOfSystemInterval)) + } } { // setup overlay @@ -226,7 +229,9 @@ func NewRangedLoop(log *zap.Logger, db DB, metabaseDB *metabase.DB, config *Conf } if config.DurabilityReport.Enabled { - observers = append(observers, peer.DurabilityReport.Observer) + for _, observer := range peer.DurabilityReport.Observer { + observers = append(observers, observer) + } } segments := rangedloop.NewMetabaseRangeSplitter(metabaseDB, config.RangedLoop.AsOfSystemInterval, config.RangedLoop.BatchSize)