satellite/durability: add exemplar and report time to the reported results

Exemplars are representative elements for the stat. For example if a stat min is `30`, we can save one example with that value.

More details about the concept is here: https://grafana.com/docs/grafana/latest/fundamentals/exemplars/

In our context, which save the segment + position in case of min is updated, to make it easier to look after the segment in danger.

Change-Id: I19be482f1ddc7f1711e722c7b17480366d2c8312
This commit is contained in:
Márton Elek 2023-10-19 13:29:42 +02:00
parent 86decb1f44
commit 015cb94909
No known key found for this signature in database
3 changed files with 64 additions and 45 deletions

View File

@ -5,6 +5,7 @@ package durability
import (
"context"
"fmt"
"time"
"github.com/jtolio/eventkit"
@ -23,12 +24,15 @@ var ek = eventkit.Package()
type HealthStat struct {
// because 0 means uninitialized, we store the min +1
minPlusOne int
Exemplar string
}
// Update updates the stat with one measurement: number of pieces which are available even without the nodes of the selected class.
func (h *HealthStat) Update(num int) {
// Exemplar is one example identifier with such measurement. Useful to dig deeper, based on this one example.
func (h *HealthStat) Update(num int, exemplar string) {
if num < h.minPlusOne-1 || h.minPlusOne == 0 {
h.minPlusOne = num + 1
h.Exemplar = exemplar
}
}
@ -36,7 +40,9 @@ func (h *HealthStat) Update(num int) {
func (h *HealthStat) Merge(stat *HealthStat) {
if stat.minPlusOne < h.minPlusOne && stat.minPlusOne > 0 {
h.minPlusOne = stat.minPlusOne
h.Exemplar = stat.Exemplar
}
}
// Min returns the minimal number.
@ -72,7 +78,7 @@ type Report struct {
nodes map[storj.NodeID]*nodeselection.SelectedNode
db overlay.DB
metabaseDB *metabase.DB
reporter func(name string, stat *HealthStat)
reporter func(n time.Time, name string, stat *HealthStat)
reportThreshold int
asOfSystemInterval time.Duration
}
@ -136,6 +142,7 @@ func (c *Report) Join(ctx context.Context, partial rangedloop.Partial) (err erro
if !found {
c.healthStat[name] = &HealthStat{
minPlusOne: stat.minPlusOne,
Exemplar: stat.Exemplar,
}
} else {
existing.Merge(&stat)
@ -147,14 +154,15 @@ func (c *Report) Join(ctx context.Context, partial rangedloop.Partial) (err erro
// Finish implements rangedloop.Observer.
func (c *Report) Finish(ctx context.Context) error {
reportTime := time.Now()
for name, stat := range c.healthStat {
c.reporter(name, stat)
c.reporter(reportTime, name, stat)
}
return nil
}
// TestChangeReporter modifies the reporter for unit tests.
func (c *Report) TestChangeReporter(r func(name string, stat *HealthStat)) {
func (c *Report) TestChangeReporter(r func(n time.Time, name string, stat *HealthStat)) {
c.reporter = r
}
@ -213,6 +221,10 @@ func (c *ObserverFork) Process(ctx context.Context, segments []rangedloop.Segmen
controlledByClass := c.controlledByClassCache
for i := range segments {
s := &segments[i]
if s.Inline() {
continue
}
healthyPieceCount := 0
for _, piece := range s.AliasPieces {
if len(c.classified) <= int(piece.Alias) {
@ -233,6 +245,7 @@ func (c *ObserverFork) Process(ctx context.Context, segments []rangedloop.Segmen
}
}
streamLocation := fmt.Sprintf("%s/%d", s.StreamID, s.Position.Encode())
for classID, count := range controlledByClass {
if count == 0 {
continue
@ -243,18 +256,23 @@ func (c *ObserverFork) Process(ctx context.Context, segments []rangedloop.Segmen
diff := healthyPieceCount - int(count)
// if value is high, it's not a problem. faster to ignore it...
if c.reportThreshold > 0 && diff > c.reportThreshold {
continue
}
c.healthStat[classID].Update(diff)
c.healthStat[classID].Update(diff, streamLocation)
}
}
return nil
}
func reportToEventkit(name string, stat *HealthStat) {
ek.Event("durability", eventkit.String("name", name), eventkit.Int64("min", int64(stat.Min())))
func reportToEventkit(n time.Time, name string, stat *HealthStat) {
ek.Event("durability",
eventkit.String("name", name),
eventkit.String("exemplar", stat.Exemplar),
eventkit.Timestamp("report_time", n),
eventkit.Int64("min", int64(stat.Min())),
)
}
var _ rangedloop.Observer = &Report{}

View File

@ -6,6 +6,7 @@ package durability_test
import (
"fmt"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@ -63,7 +64,7 @@ func TestDurabilityIntegration(t *testing.T) {
}
result := make(map[string]*durability.HealthStat)
planet.Satellites[0].RangedLoop.DurabilityReport.Observer.TestChangeReporter(func(name string, stat *durability.HealthStat) {
planet.Satellites[0].RangedLoop.DurabilityReport.Observer.TestChangeReporter(func(n time.Time, name string, stat *durability.HealthStat) {
result[name] = stat
})

View File

@ -38,6 +38,31 @@ func TestDurability(t *testing.T) {
})
}
segment := func(nodes []*nodeselection.SelectedNode, ix ...int) (res rangedloop.Segment) {
var pieces metabase.AliasPieces
for n, i := range ix {
pieces = append(pieces, metabase.AliasPiece{
Number: uint16(n),
Alias: metabase.NodeAlias(i),
})
}
res.StreamID = testrand.UUID()
res.Position = metabase.SegmentPosition{
Part: 0,
Index: 0,
}
// it's not inline if non-default redundancy is set.
res.Redundancy = storj.RedundancyScheme{
ShareSize: 123,
}
res.AliasPieces = pieces
return res
}
ctx := testcontext.New(t)
c := NewDurability(nil, nil, []NodeClassifier{
func(node *nodeselection.SelectedNode) string {
@ -52,17 +77,11 @@ func TestDurability(t *testing.T) {
fork, err := c.Fork(ctx)
require.NoError(t, err)
segment1 := segment(storageNodes, 3, 6, 9, 1)
{
// first batch
err = fork.Process(ctx, []rangedloop.Segment{
{
StreamID: testrand.UUID(),
Position: metabase.SegmentPosition{
Part: 1,
Index: 1,
},
AliasPieces: pieces(storageNodes, 3, 6, 9, 1),
},
segment1,
})
require.NoError(t, err)
@ -73,22 +92,8 @@ func TestDurability(t *testing.T) {
{
// second batch
err = fork.Process(ctx, []rangedloop.Segment{
{
StreamID: testrand.UUID(),
Position: metabase.SegmentPosition{
Part: 1,
Index: 1,
},
AliasPieces: pieces(storageNodes, 2, 3, 4, 7),
},
{
StreamID: testrand.UUID(),
Position: metabase.SegmentPosition{
Part: 1,
Index: 1,
},
AliasPieces: pieces(storageNodes, 1, 2, 3, 4, 6, 7, 8),
},
segment(storageNodes, 2, 3, 4, 7),
segment(storageNodes, 1, 2, 3, 4, 6, 7, 8),
})
require.NoError(t, err)
@ -96,7 +101,9 @@ func TestDurability(t *testing.T) {
require.NoError(t, err)
}
require.NotNil(t, c.healthStat["net:127.0.0.0"])
require.Equal(t, 1, c.healthStat["net:127.0.0.0"].Min())
require.Equal(t, segment1.StreamID.String()+"/0", c.healthStat["net:127.0.0.0"].Exemplar)
require.Equal(t, 2, c.healthStat["net:127.0.1.0"].Min())
require.Equal(t, 3, c.healthStat["net:127.0.2.0"].Min())
}
@ -134,8 +141,11 @@ func TestDurabilityUnknownNode(t *testing.T) {
{
StreamID: testrand.UUID(),
Position: metabase.SegmentPosition{
Part: 1,
Index: 1,
Part: 0,
Index: 0,
},
Redundancy: storj.RedundancyScheme{
ShareSize: 123,
},
AliasPieces: metabase.AliasPieces{
metabase.AliasPiece{
@ -157,16 +167,6 @@ func TestDurabilityUnknownNode(t *testing.T) {
require.Equal(t, 0, c.healthStat["net:127.0.0.1"].Min())
}
func pieces(nodes []*nodeselection.SelectedNode, ix ...int) (res metabase.AliasPieces) {
for n, i := range ix {
res = append(res, metabase.AliasPiece{
Number: uint16(n),
Alias: metabase.NodeAlias(i),
})
}
return res
}
func BenchmarkDurabilityProcess(b *testing.B) {
ctx := context.TODO()