satellite/repair: run chore which updates repair_queue stat

Change-Id: I18c9e79e700ac690c3aa78ee0df0cf6089acd2ac
This commit is contained in:
Márton Elek 2023-11-27 11:24:58 +01:00 committed by Storj Robot
parent 0590eadc17
commit 504c72f29d
6 changed files with 233 additions and 5 deletions

View File

@ -48,6 +48,7 @@ import (
"storj.io/storj/satellite/payments/billing" "storj.io/storj/satellite/payments/billing"
"storj.io/storj/satellite/payments/storjscan" "storj.io/storj/satellite/payments/storjscan"
"storj.io/storj/satellite/payments/stripe" "storj.io/storj/satellite/payments/stripe"
"storj.io/storj/satellite/repair/repairer"
"storj.io/storj/satellite/reputation" "storj.io/storj/satellite/reputation"
) )
@ -147,6 +148,10 @@ type Core struct {
GarbageCollection struct { GarbageCollection struct {
Sender *sender.Service Sender *sender.Service
} }
RepairQueueStat struct {
Chore *repairer.QueueStat
}
} }
// New creates a new satellite. // New creates a new satellite.
@ -247,13 +252,13 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB,
} }
} }
{ // setup overlay
placement, err := config.Placement.Parse() placement, err := config.Placement.Parse()
if err != nil { if err != nil {
return nil, err return nil, err
} }
{ // setup overlay
peer.Overlay.DB = peer.DB.OverlayCache() peer.Overlay.DB = peer.DB.OverlayCache()
peer.Overlay.Service, err = overlay.NewService(peer.Log.Named("overlay"), peer.Overlay.DB, peer.DB.NodeEvents(), placement.CreateFilters, config.Console.ExternalAddress, config.Console.SatelliteName, config.Overlay) peer.Overlay.Service, err = overlay.NewService(peer.Log.Named("overlay"), peer.Overlay.DB, peer.DB.NodeEvents(), placement.CreateFilters, config.Console.ExternalAddress, config.Console.SatelliteName, config.Overlay)
if err != nil { if err != nil {
@ -587,6 +592,17 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB,
}) })
} }
{
if config.RepairQueueCheck.Interval.Seconds() > 0 {
peer.RepairQueueStat.Chore = repairer.NewQueueStat(log, placement.SupportedPlacements(), db.RepairQueue(), config.RepairQueueCheck.Interval)
peer.Services.Add(lifecycle.Item{
Name: "queue-stat",
Run: peer.RepairQueueStat.Chore.Run,
})
}
}
{ // setup garbage collection { // setup garbage collection
peer.GarbageCollection.Sender = sender.NewService( peer.GarbageCollection.Sender = sender.NewService(
peer.Log.Named("gc-sender"), peer.Log.Named("gc-sender"),

View File

@ -187,6 +187,8 @@ type Config struct {
GarbageCollection sender.Config GarbageCollection sender.Config
GarbageCollectionBF bloomfilter.Config GarbageCollectionBF bloomfilter.Config
RepairQueueCheck repairer.QueueStatConfig
RangedLoop rangedloop.Config RangedLoop rangedloop.Config
ExpiredDeletion expireddeletion.Config ExpiredDeletion expireddeletion.Config

View File

@ -0,0 +1,125 @@
// Copyright (C) 2023 Storj Labs, Inc.
// See LICENSE for copying information.
package repairer
import (
"context"
"fmt"
"sync"
"time"
"github.com/spacemonkeygo/monkit/v3"
"go.uber.org/zap"
"storj.io/common/storj"
"storj.io/common/sync2"
"storj.io/storj/satellite/repair/queue"
)
// QueueStatConfig configures the queue checker chore.
//
// Note: this is intentionally not part of the repairer Config, as it is required by
// the chore on its own, and keeping it separate makes it possible to print out only
// the configs the chore needs (the full repair config is not required).
type QueueStatConfig struct {
	// Interval is how often the chore queries the repair queue for statistics.
	Interval time.Duration `help:"how frequently core should check the size of the repair queue" releaseDefault:"1h" devDefault:"1m0s" testDefault:"$TESTINTERVAL"`
}
// QueueStat is a chore that periodically caches repair queue statistics and
// exposes them as a monkit stat source.
type QueueStat struct {
	db  queue.RepairQueue
	log *zap.Logger

	// Loop drives the periodic refresh; exported so callers can trigger/pause it.
	Loop *sync2.Cycle

	// mu guards stats and updated.
	mu sync.Mutex
	// stats is the latest snapshot, keyed by key(placement, attempted).
	stats map[string]queue.Stat
	// updated is when stats was last refreshed.
	updated time.Time

	// placements is the full set of placements to report series for,
	// including those currently absent from the queue.
	placements []storj.PlacementConstraint
}
var _ monkit.StatSource = &QueueStat{}
// NewQueueStat creates a chore that periodically collects repair queue
// statistics and registers itself as a monkit stat source.
func NewQueueStat(log *zap.Logger, placements []storj.PlacementConstraint, db queue.RepairQueue, checkInterval time.Duration) *QueueStat {
	stat := &QueueStat{
		log:        log,
		db:         db,
		placements: placements,
		Loop:       sync2.NewCycle(checkInterval),
	}
	mon.Chain(stat)
	return stat
}
// Run starts the chore loop, refreshing the cached queue statistics on each
// cycle. Refresh errors are logged inside RunOnce and never stop the loop.
func (c *QueueStat) Run(ctx context.Context) (err error) {
	defer mon.Task()(&ctx)(&err)
	return c.Loop.Run(ctx, func(ctx context.Context) error {
		c.RunOnce(ctx)
		return nil
	})
}
// RunOnce refreshes the cached queue statistics.
//
// On a database error the previous snapshot (and its timestamp) is kept, so
// Stats keeps reporting the last successful observation until it expires;
// previously the cache was cleared and re-timestamped even on failure, which
// made a failing query look like a fresh, empty queue.
func (c *QueueStat) RunOnce(ctx context.Context) {
	stats, err := c.db.Stat(ctx)
	if err != nil {
		c.log.Error("couldn't get repair queue statistic", zap.Error(err))
		return
	}

	c.mu.Lock()
	defer c.mu.Unlock()
	c.stats = make(map[string]queue.Stat, len(stats))
	for _, stat := range stats {
		c.stats[key(stat.Placement, stat.MinAttemptedAt != nil)] = stat
	}
	c.updated = time.Now()
}
// Stats implements monkit.StatSource. It reports the cached queue statistics
// for every configured placement, for both attempted and not-yet-attempted
// buckets, so series exist (as zeroes) even when the queue has no entries.
func (c *QueueStat) Stats(cb func(key monkit.SeriesKey, field string, val float64)) {
	c.mu.Lock()
	defer c.mu.Unlock()

	if time.Since(c.updated) > 24*time.Hour {
		// stat is one day old (or never retrieved), not so interesting...
		return
	}
	for _, placement := range c.placements {
		for _, attempted := range []bool{false, true} {
			keyWithDefaultTags := monkit.NewSeriesKey("repair_queue").
				WithTags(
					monkit.NewSeriesTag("attempted", fmt.Sprintf("%v", attempted)),
					monkit.NewSeriesTag("placement", fmt.Sprintf("%d", placement)))

			k := key(placement, attempted)
			stat, found := c.stats[k]
			if !found {
				// No queue entries for this bucket: emit explicit zeroes so the
				// series still exists; age reflects how fresh this observation is.
				cb(keyWithDefaultTags, "count", 0)
				cb(keyWithDefaultTags, "age", time.Since(c.updated).Seconds())
				cb(keyWithDefaultTags, "since_oldest_inserted_sec", 0)
				cb(keyWithDefaultTags, "since_latest_inserted_sec", 0)
				cb(keyWithDefaultTags, "since_oldest_attempted_sec", 0)
				cb(keyWithDefaultTags, "since_latest_attempted_sec", 0)
				continue
			}

			cb(keyWithDefaultTags, "count", float64(stat.Count))
			cb(keyWithDefaultTags, "age", time.Since(c.updated).Seconds())
			cb(keyWithDefaultTags, "since_oldest_inserted_sec", time.Since(stat.MinInsertedAt).Seconds())
			cb(keyWithDefaultTags, "since_latest_inserted_sec", time.Since(stat.MaxInsertedAt).Seconds())
			// Attempt timestamps are pointers: nil means this bucket was never
			// attempted, reported as zero rather than a huge elapsed time.
			if stat.MinAttemptedAt != nil {
				cb(keyWithDefaultTags, "since_oldest_attempted_sec", time.Since(*stat.MinAttemptedAt).Seconds())
			} else {
				cb(keyWithDefaultTags, "since_oldest_attempted_sec", 0)
			}
			if stat.MaxAttemptedAt != nil {
				cb(keyWithDefaultTags, "since_latest_attempted_sec", time.Since(*stat.MaxAttemptedAt).Seconds())
			} else {
				cb(keyWithDefaultTags, "since_latest_attempted_sec", 0)
			}
		}
	}
}
// key builds the cache-map key for a (placement, attempted) statistics bucket.
func key(placement storj.PlacementConstraint, attempted bool) string {
	return fmt.Sprintf("%d-%t", placement, attempted)
}

View File

@ -0,0 +1,81 @@
// Copyright (C) 2023 Storj Labs, Inc.
// See LICENSE for copying information.
package repairer_test
import (
"fmt"
"sort"
"strings"
"testing"
"time"
"github.com/spacemonkeygo/monkit/v3"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
"storj.io/common/storj"
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/storj/satellite"
"storj.io/storj/satellite/repair/queue"
"storj.io/storj/satellite/repair/repairer"
"storj.io/storj/satellite/satellitedb/satellitedbtest"
)
func TestStatChore(t *testing.T) {
	satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
		// Seed the queue: one segment on placement 1, two on placement 2.
		segments := []*queue.InjuredSegment{
			{StreamID: testrand.UUID(), Placement: storj.PlacementConstraint(1)},
			{StreamID: testrand.UUID(), Placement: storj.PlacementConstraint(2)},
			{StreamID: testrand.UUID(), Placement: storj.PlacementConstraint(2)},
		}
		_, err := db.RepairQueue().InsertBatch(ctx, segments)
		require.NoError(t, err)

		chore := repairer.NewQueueStat(zaptest.NewLogger(t), []storj.PlacementConstraint{0, 1, 2}, db.RepairQueue(), 100*time.Hour)

		// collectMonkitStat flattens every repair_queue series into a
		// "tag=value,... field" -> value map for easy assertions.
		collectMonkitStat := func() map[string]float64 {
			values := map[string]float64{}
			monkit.Default.Stats(func(series monkit.SeriesKey, field string, val float64) {
				if series.Measurement != "repair_queue" {
					return
				}
				tags := series.Tags.All()
				names := make([]string, 0, len(tags))
				for name := range tags {
					names = append(names, name)
				}
				sort.Strings(names)
				pairs := make([]string, 0, len(names))
				for _, name := range names {
					pairs = append(pairs, fmt.Sprintf("%s=%s", name, tags[name]))
				}
				values[strings.Join(pairs, ",")+" "+field] = val
			})
			return values
		}

		// Before the first run the series should be absent (zero value).
		stat := collectMonkitStat()
		require.Zero(t, stat["attempted=false,placement=1,scope=storj.io/storj/satellite/repair/repairer count"])

		chore.RunOnce(ctx)

		stat = collectMonkitStat()
		require.Equal(t, float64(0), stat["attempted=false,placement=0,scope=storj.io/storj/satellite/repair/repairer count"])
		require.Equal(t, float64(1), stat["attempted=false,placement=1,scope=storj.io/storj/satellite/repair/repairer count"])
		require.Equal(t, float64(2), stat["attempted=false,placement=2,scope=storj.io/storj/satellite/repair/repairer count"])
	})
}

View File

@ -248,6 +248,7 @@ func NewRepairer(log *zap.Logger, full *identity.FullIdentity,
}) })
peer.Debug.Server.Panel.Add( peer.Debug.Server.Panel.Add(
debug.Cycle("Repair Worker", peer.Repairer.Loop)) debug.Cycle("Repair Worker", peer.Repairer.Loop))
} }
return peer, nil return peer, nil

View File

@ -1012,6 +1012,9 @@ identity.key-path: /root/.local/share/storj/identity/satellite/identity.key
# ratio where to consider processed count as suspicious # ratio where to consider processed count as suspicious
# ranged-loop.suspicious-processed-ratio: 0.03 # ranged-loop.suspicious-processed-ratio: 0.03
# how frequently core should check the size of the repair queue
# repair-queue-check.interval: 1h0m0s
# time limit for dialing storage node # time limit for dialing storage node
# repairer.dial-timeout: 5s # repairer.dial-timeout: 5s