satellite: durability rangeloop observer for monitoring risks

Change-Id: I92805fcc6e7c1bbe0f42bbf849d22f9908fedadb
This commit is contained in:
Márton Elek 2023-10-04 15:22:35 +02:00 committed by Storj Robot
parent 863c96b771
commit db3578d9ba
8 changed files with 597 additions and 0 deletions

View File

@ -0,0 +1,5 @@
// Copyright (C) 2023 Storj Labs, Inc.
// See LICENSE for copying information.
// Package durability helps to keep segments in a healthy state by reporting risks.
package durability

View File

@ -0,0 +1,255 @@
// Copyright (C) 2023 Storj Labs, Inc.
// See LICENSE for copying information.
package durability
import (
// ek is the package-level eventkit scope; reportToEventkit emits the "durability" events through it.
var ek = eventkit.Package()
// HealthStat collects the availability conditions for one class (for example: nodes with the same owner).
//
// It tracks the smallest measurement seen so far. The zero value is ready to
// use and represents a stat which has not received any measurement yet.
type HealthStat struct {
	// because 0 means uninitialized, we store the min +1
	minPlusOne int
}

// Update updates the stat with one measurement: number of pieces which are available even without the nodes of the selected class.
// The measurement is kept only when it is the first one or lower than the current minimum.
func (h *HealthStat) Update(num int) {
	if h.minPlusOne == 0 || num < h.minPlusOne-1 {
		h.minPlusOne = num + 1
	}
}

// Merge can merge two stat to one, without losing information.
//
// After the call h holds the lower of the two minimums. An unused stat
// argument leaves h untouched, and an unused receiver adopts the value of
// stat (a plain "less than" comparison would never replace the
// uninitialized 0 of the receiver and would silently drop the measurement).
func (h *HealthStat) Merge(stat *HealthStat) {
	if stat.minPlusOne <= 0 {
		// nothing was measured on the other side
		return
	}
	if h.minPlusOne == 0 || stat.minPlusOne < h.minPlusOne {
		h.minPlusOne = stat.minPlusOne
	}
}

// Min returns the minimal number, or -1 when the stat was never updated.
func (h *HealthStat) Min() int {
	return h.minPlusOne - 1
}

// Unused returns true when stat is uninitialized (-1) and was not updated with any number.
func (h *HealthStat) Unused() bool {
	return h.minPlusOne == 0
}
// NodeClassifier identifies a risk class (for example an owner, or country) of the SelectedNode.
// The returned string is the name of the class: statistics are collected per returned name
// and the same name is handed to the reporter.
type NodeClassifier func(node *nodeselection.SelectedNode) string
// ReportConfig configures durability report.
type ReportConfig struct {
// Enabled decides whether the durability observer is added to the ranged loop observers (default: true).
Enabled bool `help:"whether to enable durability report (rangedloop observer)" default:"true"`
// Report is a calculator (rangedloop.Observer) which checks the availability of pieces without certain nodes.
// It can answer the following question:
// 1. losing a given group of nodes (all nodes of one country or all nodes of one owner)...
// 2. what will be the lowest number of healthy pieces, checking all the segments.
// Example: we have one segment where 80 pieces are stored, but 42 of them are in Germany.
// in this case this reporter will return 38 for the class "country:DE" (assuming all the other segments are more lucky).
type Report struct {
// healthStat contains the statistics joined from the forks, keyed by class name.
healthStat map[string]*HealthStat
// classifiers assign the class names to the nodes; they are shared with every fork.
classifiers []NodeClassifier
// aliasMap is the node alias map, loaded from the metabase in Start.
aliasMap *metabase.NodeAliasMap
// nodes are the nodes returned by the overlay in Start, keyed by node ID.
nodes map[storj.NodeID]*nodeselection.SelectedNode
db overlay.DB
metabaseDB *metabase.DB
// reporter receives the statistic of each class in Finish.
reporter func(name string, stat *HealthStat)
// reportThreshold is copied to every fork; when greater than zero, Process ignores values above it.
reportThreshold int
// asOfSystemInterval is passed to the overlay query in Start.
asOfSystemInterval time.Duration
// NewDurability creates the new instance.
// The node and statistic maps start empty and results are reported with reportToEventkit
// (which can be replaced with TestChangeReporter).
func NewDurability(db overlay.DB, metabaseDB *metabase.DB, classifiers []NodeClassifier, reportThreshold int, asOfSystemInterval time.Duration) *Report {
return &Report{
db: db,
metabaseDB: metabaseDB,
classifiers: classifiers,
reportThreshold: reportThreshold,
asOfSystemInterval: asOfSystemInterval,
nodes: make(map[storj.NodeID]*nodeselection.SelectedNode),
healthStat: make(map[string]*HealthStat),
reporter: reportToEventkit,
// Start implements rangedloop.Observer.
func (c *Report) Start(ctx context.Context, startTime time.Time) error {
nodes, err := c.db.GetParticipatingNodes(ctx, -12*time.Hour, c.asOfSystemInterval)
if err != nil {
return errs.Wrap(err)
c.nodes = map[storj.NodeID]*nodeselection.SelectedNode{}
for ix := range nodes {
c.nodes[nodes[ix].ID] = &nodes[ix]
aliasMap, err := c.metabaseDB.LatestNodesAliasMap(ctx)
if err != nil {
return errs.Wrap(err)
c.aliasMap = aliasMap
return nil
// Fork implements rangedloop.Observer.
// The returned ObserverFork shares the classifiers, the alias map, the node map and the
// report threshold of c; its statistics are not allocated here.
func (c *Report) Fork(ctx context.Context) (rangedloop.Partial, error) {
d := &ObserverFork{
classifiers: c.classifiers,
// allocated by classifyNodeAliases, when the number of classes is known.
healthStat: nil,
aliasMap: c.aliasMap,
nodes: c.nodes,
// one slot for every alias value up to Max().
classifierCache: make([][]string, c.aliasMap.Max()+1),
reportThreshold: c.reportThreshold,
return d, nil
// Join implements rangedloop.Observer.
// It copies the statistics of a fork to c.healthStat, translating the fork level
// class IDs back to class names.
func (c *Report) Join(ctx context.Context, partial rangedloop.Partial) (err error) {
defer mon.Task()(&ctx)(&err)
// unchecked type assertion: panics when partial is not an *ObserverFork.
fork := partial.(*ObserverFork)
// healthStat of the fork is a slice, the index is the classID.
for cid, stat := range fork.healthStat {
if stat.Unused() {
name := fork.className[classID(cid)]
existing, found := c.healthStat[name]
if !found {
// first value for this class name: store a copy, independent from the fork.
c.healthStat[name] = &HealthStat{
minPlusOne: stat.minPlusOne,
// NOTE(review): the handling of an already known class is not visible here —
// presumably existing is combined with stat using HealthStat.Merge; confirm.
} else {
return nil
// Finish implements rangedloop.Observer.
// It passes every collected class statistic to the reporter. The order of the calls
// is not defined, as it follows the iteration order of a map.
func (c *Report) Finish(ctx context.Context) error {
for name, stat := range c.healthStat {
c.reporter(name, stat)
return nil
// TestChangeReporter modifies the reporter for unit tests.
// The given function is called by Finish for every class, instead of the default reportToEventkit.
func (c *Report) TestChangeReporter(r func(name string, stat *HealthStat)) {
c.reporter = r
// classID is a fork level short identifier for each class.
// IDs are assigned sequentially from 0 by classifyNodeAliases, and they are used
// as the index of the per-class slices of the fork.
type classID int32
// ObserverFork is the durability calculator for each segment range.
type ObserverFork struct {
// map between classes (like "country:hu" and integer IDs)
classID map[string]classID
// className is the reverse of classID: it maps the integer IDs back to the class names.
className map[classID]string
// controlledByClassCache is the reusable, per-class counter of Process (indexed by classID).
controlledByClassCache []int32
// healthStat contains the statistic of each class, indexed by classID.
healthStat []HealthStat
classifiers []NodeClassifier
aliasMap *metabase.NodeAliasMap
nodes map[storj.NodeID]*nodeselection.SelectedNode
// classifierCache is allocated by Report.Fork; it is not read by any code visible in this file view.
classifierCache [][]string
// contains the available classes for each node alias.
classified [][]classID
// reportThreshold, when greater than zero, makes Process ignore the values above it.
reportThreshold int
// classifyNodeAliases executes all the classifiers for every known node and builds the
// lookup tables of the fork: class name <-> classID, and node alias -> class IDs.
// The per-class slices (healthStat, controlledByClassCache) are allocated at the end.
func (c *ObserverFork) classifyNodeAliases() {
c.classID = make(map[string]classID, len(c.classifiers))
c.className = make(map[classID]string, len(c.classifiers))
// indexed by node alias, therefore it needs a slot for every value up to Max().
c.classified = make([][]classID, c.aliasMap.Max()+1)
for _, node := range c.nodes {
alias, ok := c.aliasMap.Alias(node.ID)
if !ok {
// one class per classifier, in the order of the classifiers.
classes := make([]classID, len(c.classifiers))
for i, group := range c.classifiers {
class := group(node)
id, ok := c.classID[class]
if !ok {
// first occurrence of this class name: assign the next free ID.
id = classID(len(c.classID))
c.className[id] = class
c.classID[class] = id
classes[i] = id
c.classified[alias] = classes
// all the classes are known at this point, the per-class slices can be sized.
c.healthStat = make([]HealthStat, len(c.classID))
c.controlledByClassCache = make([]int32, len(c.classID))
// Process implements rangedloop.Partial.
func (c *ObserverFork) Process(ctx context.Context, segments []rangedloop.Segment) (err error) {
controlledByClass := c.controlledByClassCache
for i := range segments {
s := &segments[i]
healthyPieceCount := 0
for _, piece := range s.AliasPieces {
classes := c.classified[piece.Alias]
// unavailable/offline nodes were not classified
if len(classes) > 0 {
for _, class := range classes {
for classID, count := range controlledByClass {
if count == 0 {
// reset the value for the next iteration
controlledByClass[classID] = 0
diff := healthyPieceCount - int(count)
// if value is high, it's not a problem. faster to ignore it...
if c.reportThreshold > 0 && diff > c.reportThreshold {
return nil
// reportToEventkit is the default reporter: it emits one "durability" event
// with the class name ("name") and the minimum of the stat ("min").
func reportToEventkit(name string, stat *HealthStat) {
ek.Event("durability", eventkit.String("name", name), eventkit.Int64("min", int64(stat.Min())))
// compile-time checks: Report and ObserverFork implement the rangedloop interfaces.
var _ rangedloop.Observer = &Report{}
var _ rangedloop.Partial = &ObserverFork{}

View File

@ -0,0 +1,79 @@
// Copyright (C) 2023 Storj Labs, Inc.
// See LICENSE for copying information.
package durability_test
import (
// TestDurabilityIntegration uploads objects to a test network with 6 storage nodes,
// moves two of the nodes to Hungary, executes the ranged loop once and checks the
// statistic which is reported for the class "c:HU".
func TestDurabilityIntegration(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1,
StorageNodeCount: 6,
UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
config.Metainfo.RS.Min = 3
config.Metainfo.RS.Repair = 5
config.Metainfo.RS.Success = 5
config.Metainfo.RS.Total = 6
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
// upload object
project, err := planet.Uplinks[0].OpenProject(ctx, planet.Satellites[0])
require.NoError(t, err)
// NOTE(review): the upload steps use assert (not require), so the test
// continues after a failed upload and fails later with a less clear message.
_, err = project.CreateBucket(ctx, "bucket1")
assert.NoError(t, err)
for i := 0; i < 10; i++ {
object, err := project.UploadObject(ctx, "bucket1", fmt.Sprintf("key%d", i), nil)
assert.NoError(t, err)
_, err = object.Write(make([]byte, 10240))
assert.NoError(t, err)
err = object.Commit()
assert.NoError(t, err)
require.NoError(t, project.Close())
// we uploaded to 5 nodes, having 2 nodes in HU means that we control at least 1 piece, but max 2
require.NoError(t, planet.Satellites[0].Overlay.Service.TestNodeCountryCode(ctx, planet.StorageNodes[0].NodeURL().ID, location.Hungary.String()))
require.NoError(t, planet.Satellites[0].Overlay.Service.TestNodeCountryCode(ctx, planet.StorageNodes[1].NodeURL().ID, location.Hungary.String()))
// collect the reported statistics in a map instead of sending them to eventkit.
result := make(map[string]*durability.HealthStat)
planet.Satellites[0].RangedLoop.DurabilityReport.Observer.TestChangeReporter(func(name string, stat *durability.HealthStat) {
result[name] = stat
rangedLoopService := planet.Satellites[0].RangedLoop.RangedLoop.Service
// NOTE(review): err of RunOnce is checked only at the very end, after the
// results were already inspected; checking it right here would report a
// failed run directly.
_, err := rangedLoopService.RunOnce(ctx)
require.Len(t, result, 15)
// one or two pieces are controlled out of the 5-6 --> 3 or 4 pieces are available without HU nodes
require.True(t, result["c:HU"].Min() > 2)
require.True(t, result["c:HU"].Min() < 5)
require.NoError(t, err)

View File

@ -0,0 +1,217 @@
// Copyright (C) 2023 Storj Labs, Inc.
// See LICENSE for copying information.
package durability
import (
func TestDurability(t *testing.T) {
createUUID := func() uuid.UUID {
id, err := uuid.New()
require.NoError(t, err)
return id
var storageNodes []*nodeselection.SelectedNode
var aliases []metabase.NodeAliasEntry
for i := 0; i < 10; i++ {
node := &nodeselection.SelectedNode{
ID: testidentity.MustPregeneratedIdentity(i, storj.LatestIDVersion()).ID,
LastNet: fmt.Sprintf("127.0.%d.0", i%3),
storageNodes = append(storageNodes, node)
aliases = append(aliases, metabase.NodeAliasEntry{
ID: node.ID,
Alias: metabase.NodeAlias(i),
ctx := testcontext.New(t)
c := NewDurability(nil, nil, []NodeClassifier{
func(node *nodeselection.SelectedNode) string {
return "net:" + node.LastNet
}}, 0, 0)
c.aliasMap = metabase.NewNodeAliasMap(aliases)
for _, node := range storageNodes {
c.nodes[node.ID] = node
fork, err := c.Fork(ctx)
require.NoError(t, err)
// first batch
err = fork.Process(ctx, []rangedloop.Segment{
StreamID: createUUID(),
Position: metabase.SegmentPosition{
Part: 1,
Index: 1,
AliasPieces: pieces(storageNodes, 3, 6, 9, 1),
StreamID: createUUID(),
Position: metabase.SegmentPosition{
Part: 1,
Index: 1,
AliasPieces: pieces(storageNodes, 1, 2, 3, 4),
require.NoError(t, err)
err = c.Join(ctx, fork)
require.NoError(t, err)
// second batch
err = fork.Process(ctx, []rangedloop.Segment{
StreamID: createUUID(),
Position: metabase.SegmentPosition{
Part: 1,
Index: 1,
AliasPieces: pieces(storageNodes, 2, 3, 4, 7),
StreamID: createUUID(),
Position: metabase.SegmentPosition{
Part: 1,
Index: 1,
AliasPieces: pieces(storageNodes, 1, 2, 3, 4, 6, 7, 8),
require.NoError(t, err)
err = c.Join(ctx, fork)
require.NoError(t, err)
require.Equal(t, 1, c.healthStat["net:"].Min())
require.Equal(t, 2, c.healthStat["net:"].Min())
require.Equal(t, 3, c.healthStat["net:"].Min())
func pieces(nodes []*nodeselection.SelectedNode, ix (res metabase.AliasPieces) {
for n, i := range ix {
res = append(res, metabase.AliasPiece{
Number: uint16(n),
Alias: metabase.NodeAlias(i),
return res
// BenchmarkDurabilityProcess measures ObserverFork.Process with one batch of 2500
// generated segments (80 pieces each), using one classifier based on the email of the nodes.
// With -short only 10 nodes are generated instead of 20000.
func BenchmarkDurabilityProcess(b *testing.B) {
ctx := context.TODO()
rng := rand.New(rand.NewSource(0))
nodeNo := 20000
if testing.Short() {
nodeNo = 10
nodeMap := make(map[storj.NodeID]*nodeselection.SelectedNode)
var aliasToNode []*nodeselection.SelectedNode
var nodeAliases []metabase.NodeAliasEntry
// generating nodes and node aliases.
for i := 0; i < nodeNo; i++ {
id := testrand.NodeID()
node := &nodeselection.SelectedNode{
ID: id,
LastNet: "",
CountryCode: location.UnitedStates,
// NOTE(review): the format string is empty, so i%2 has no verb and Sprintf
// appends a "%!(EXTRA ...)" marker instead; the literal looks stripped — confirm
// the intended format (presumably two alternating email addresses).
Email: fmt.Sprintf("", i%2),
nodeMap[node.ID] = node
aliasToNode = append(aliasToNode, node)
nodeAliases = append(nodeAliases, metabase.NodeAliasEntry{
ID: node.ID,
Alias: metabase.NodeAlias(i),
aliases := metabase.NewNodeAliasMap(nodeAliases)
var segments []rangedloop.Segment
// create 2500 segments (usual observer loop batch size) with 80 pieces
for i := 0; i < 2500; i++ {
var id uuid.UUID
var pieces metabase.Pieces
var aliasPieces metabase.AliasPieces
for j := 0; j < 80; j++ {
// NOTE(review): this uses the global rand source, not the seeded rng from above,
// so the generated pieces may differ between runs; Intn(len-1) also never
// selects the last node — confirm that both are intended.
nodeIx := rand.Intn(len(aliasToNode) - 1)
pieces = append(pieces, metabase.Piece{
Number: uint16(j),
StorageNode: aliasToNode[nodeIx].ID,
aliasPieces = append(aliasPieces, metabase.AliasPiece{
Number: uint16(j),
Alias: metabase.NodeAlias(nodeIx),
segments = append(segments, rangedloop.Segment{
StreamID: id,
Position: metabase.SegmentPosition{
Part: 1,
Index: 1,
CreatedAt: time.Now(),
Pieces: pieces,
AliasPieces: aliasPieces,
d := ObserverFork{
aliasMap: aliases,
nodes: nodeMap,
// NOTE(review): Report.Fork allocates this with Max()+1 elements, here it is Max() — confirm.
classifierCache: make([][]string, aliases.Max()),
classifiers: []NodeClassifier{
func(node *nodeselection.SelectedNode) string {
return "email:" + node.Email
// NOTE(review): the setup above is part of the measured time, the timer is not reset before the loop.
for i := 0; i < b.N; i++ {
benchmarkProcess(ctx, b, d, segments)
// benchmarkProcess executes one Process call and fails the benchmark on error.
// The fork is received by value: the struct is copied for every call, but the
// maps and slices inside the copy still refer to the same underlying data.
func benchmarkProcess(ctx context.Context, b *testing.B, d ObserverFork, segments []rangedloop.Segment) {
err := d.Process(ctx, segments)
require.NoError(b, err)

View File

@ -0,0 +1,8 @@
// Copyright (C) 2023 Storj Labs, Inc.
// See LICENSE for copying information.
package durability
import ""
// mon is the monkit scope of the package; Report.Join uses it to instrument its calls.
var mon = monkit.Package()

View File

@ -42,6 +42,7 @@ import (
@ -221,6 +222,8 @@ type Config struct {
PieceTracker piecetracker.Config
DurabilityReport durability.ReportConfig
TagAuthorities string `help:"comma-separated paths of additional cert files, used to validate signed node tags"`

View File

@ -18,11 +18,13 @@ import (
@ -70,6 +72,10 @@ type RangedLoop struct {
Observer *piecetracker.Observer
DurabilityReport struct {
Observer *durability.Report
RangedLoop struct {
Service *rangedloop.Service
@ -140,6 +146,23 @@ func NewRangedLoop(log *zap.Logger, db DB, metabaseDB *metabase.DB, config *Conf
{ // setup
peer.DurabilityReport.Observer = durability.NewDurability(db.OverlayCache(), metabaseDB, []durability.NodeClassifier{
func(node *nodeselection.SelectedNode) string {
return "e:" + node.Email
func(node *nodeselection.SelectedNode) string {
return "w:" + node.Wallet
func(node *nodeselection.SelectedNode) string {
return "n:" + node.LastNet
func(node *nodeselection.SelectedNode) string {
return "c:" + node.CountryCode.String()
}, config.Metainfo.RS.Repair, config.RangedLoop.AsOfSystemInterval)
{ // setup overlay
placement, err := config.Placement.Parse()
if err != nil {
@ -202,6 +225,10 @@ func NewRangedLoop(log *zap.Logger, db DB, metabaseDB *metabase.DB, config *Conf
observers = append(observers, peer.PieceTracker.Observer)
if config.DurabilityReport.Enabled {
observers = append(observers, peer.DurabilityReport.Observer)
segments := rangedloop.NewMetabaseRangeSplitter(metabaseDB, config.RangedLoop.AsOfSystemInterval, config.RangedLoop.BatchSize)
peer.RangedLoop.Service = rangedloop.NewService(log.Named("rangedloop"), config.RangedLoop, segments, observers)

View File

@ -481,6 +481,9 @@ contact.external-address: ""
# If set, a path to write a process trace SVG to
# debug.trace-out: ""
# whether to enable durability report (rangedloop observer)
# durability-report.enabled: true
# how often to send reminders to users who need to verify their email
# email-reminders.chore-interval: 24h0m0s