satellite/durability: use process level classID cache (instead fork level)
Classifier of durability is sg. like "net:1.3.4.1" or "country:HU". To make the calculation faster we use arrays instead of maps, which means that we assign a uinique index to all of these strings (classes). As Egon suggested earlier, we can do this mapping only once (per process), not for each fork. Not a big deal performance-wise, as we have limited number of forks, which are initialized once per 5-10 hours, but the code is more readable and clean. Change-Id: Id081846b5d97dae8009aeeecbcc63cb713bed294
This commit is contained in:
parent
b6e4f4a02d
commit
23c592adeb
@ -81,6 +81,13 @@ type Report struct {
|
||||
reporter func(n time.Time, name string, stat *HealthStat)
|
||||
reportThreshold int
|
||||
asOfSystemInterval time.Duration
|
||||
|
||||
// map between classes (like "country:hu" and integer IDs)
|
||||
classID map[string]classID
|
||||
className map[classID]string
|
||||
|
||||
// contains the available classes for each node alias.
|
||||
classified [][]classID
|
||||
}
|
||||
|
||||
// NewDurability creates the new instance.
|
||||
@ -112,20 +119,48 @@ func (c *Report) Start(ctx context.Context, startTime time.Time) error {
|
||||
return errs.Wrap(err)
|
||||
}
|
||||
c.aliasMap = aliasMap
|
||||
c.classifyNodeAliases()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Report) classifyNodeAliases() {
|
||||
c.classID = make(map[string]classID, len(c.classifiers))
|
||||
c.className = make(map[classID]string, len(c.classifiers))
|
||||
|
||||
c.classified = make([][]classID, c.aliasMap.Max()+1)
|
||||
for _, node := range c.nodes {
|
||||
alias, ok := c.aliasMap.Alias(node.ID)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
classes := make([]classID, len(c.classifiers))
|
||||
for i, group := range c.classifiers {
|
||||
class := group(node)
|
||||
id, ok := c.classID[class]
|
||||
if !ok {
|
||||
id = classID(len(c.classID))
|
||||
c.className[id] = class
|
||||
c.classID[class] = id
|
||||
}
|
||||
classes[i] = id
|
||||
}
|
||||
c.classified[alias] = classes
|
||||
}
|
||||
}
|
||||
|
||||
// Fork implements rangedloop.Observer.
|
||||
func (c *Report) Fork(ctx context.Context) (rangedloop.Partial, error) {
|
||||
d := &ObserverFork{
|
||||
classifiers: c.classifiers,
|
||||
healthStat: nil,
|
||||
aliasMap: c.aliasMap,
|
||||
nodes: c.nodes,
|
||||
classifierCache: make([][]string, c.aliasMap.Max()+1),
|
||||
reportThreshold: c.reportThreshold,
|
||||
classifiers: c.classifiers,
|
||||
aliasMap: c.aliasMap,
|
||||
nodes: c.nodes,
|
||||
classifierCache: make([][]string, c.aliasMap.Max()+1),
|
||||
reportThreshold: c.reportThreshold,
|
||||
healthStat: make([]HealthStat, len(c.classID)),
|
||||
controlledByClassCache: make([]int32, len(c.classID)),
|
||||
classified: c.classified,
|
||||
}
|
||||
d.classifyNodeAliases()
|
||||
return d, nil
|
||||
}
|
||||
|
||||
@ -137,7 +172,7 @@ func (c *Report) Join(ctx context.Context, partial rangedloop.Partial) (err erro
|
||||
if stat.Unused() {
|
||||
continue
|
||||
}
|
||||
name := fork.className[classID(cid)]
|
||||
name := c.className[classID(cid)]
|
||||
existing, found := c.healthStat[name]
|
||||
if !found {
|
||||
c.healthStat[name] = &HealthStat{
|
||||
@ -171,10 +206,6 @@ type classID int32
|
||||
|
||||
// ObserverFork is the durability calculator for each segment range.
|
||||
type ObserverFork struct {
|
||||
// map between classes (like "country:hu" and integer IDs)
|
||||
classID map[string]classID
|
||||
className map[classID]string
|
||||
|
||||
controlledByClassCache []int32
|
||||
|
||||
healthStat []HealthStat
|
||||
@ -183,37 +214,9 @@ type ObserverFork struct {
|
||||
nodes map[storj.NodeID]*nodeselection.SelectedNode
|
||||
classifierCache [][]string
|
||||
|
||||
// contains the available classes for each node alias.
|
||||
classified [][]classID
|
||||
reportThreshold int
|
||||
}
|
||||
|
||||
func (c *ObserverFork) classifyNodeAliases() {
|
||||
c.classID = make(map[string]classID, len(c.classifiers))
|
||||
c.className = make(map[classID]string, len(c.classifiers))
|
||||
|
||||
c.classified = make([][]classID, c.aliasMap.Max()+1)
|
||||
for _, node := range c.nodes {
|
||||
alias, ok := c.aliasMap.Alias(node.ID)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
classes := make([]classID, len(c.classifiers))
|
||||
for i, group := range c.classifiers {
|
||||
class := group(node)
|
||||
id, ok := c.classID[class]
|
||||
if !ok {
|
||||
id = classID(len(c.classID))
|
||||
c.className[id] = class
|
||||
c.classID[class] = id
|
||||
}
|
||||
classes[i] = id
|
||||
}
|
||||
c.classified[alias] = classes
|
||||
}
|
||||
c.healthStat = make([]HealthStat, len(c.classID))
|
||||
c.controlledByClassCache = make([]int32, len(c.classID))
|
||||
classified [][]classID
|
||||
}
|
||||
|
||||
// Process implements rangedloop.Partial.
|
||||
|
@ -74,6 +74,8 @@ func TestDurability(t *testing.T) {
|
||||
c.nodes[node.ID] = node
|
||||
}
|
||||
|
||||
c.classifyNodeAliases()
|
||||
|
||||
fork, err := c.Fork(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
@ -85,11 +87,6 @@ func TestDurability(t *testing.T) {
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
err = c.Join(ctx, fork)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
{
|
||||
// second batch
|
||||
err = fork.Process(ctx, []rangedloop.Segment{
|
||||
segment(storageNodes, 2, 3, 4, 7),
|
||||
@ -133,6 +130,7 @@ func TestDurabilityUnknownNode(t *testing.T) {
|
||||
c.nodes[node.ID] = node
|
||||
}
|
||||
|
||||
c.classifyNodeAliases()
|
||||
fork, err := c.Fork(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
@ -244,7 +242,6 @@ func BenchmarkDurabilityProcess(b *testing.B) {
|
||||
},
|
||||
},
|
||||
}
|
||||
d.classifyNodeAliases()
|
||||
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
|
Loading…
Reference in New Issue
Block a user