From 504c72f29df54be48745748f88ea207f7e0fc7ad Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?M=C3=A1rton=20Elek?=
Date: Mon, 27 Nov 2023 11:24:58 +0100
Subject: [PATCH] satellite/repair: run chore which updates repair_queue stat

Change-Id: I18c9e79e700ac690c3aa78ee0df0cf6089acd2ac
---
 satellite/core.go                            |  26 +++-
 satellite/peer.go                            |   2 +
 satellite/repair/repairer/queue_stat.go      | 125 +++++++++++++++++++
 satellite/repair/repairer/queue_stat_test.go |  81 ++++++++++++
 satellite/repairer.go                        |   1 +
 scripts/testdata/satellite-config.yaml.lock  |   3 +
 6 files changed, 233 insertions(+), 5 deletions(-)
 create mode 100644 satellite/repair/repairer/queue_stat.go
 create mode 100644 satellite/repair/repairer/queue_stat_test.go

diff --git a/satellite/core.go b/satellite/core.go
index 10de99f39..09ad3839c 100644
--- a/satellite/core.go
+++ b/satellite/core.go
@@ -48,6 +48,7 @@ import (
 	"storj.io/storj/satellite/payments/billing"
 	"storj.io/storj/satellite/payments/storjscan"
 	"storj.io/storj/satellite/payments/stripe"
+	"storj.io/storj/satellite/repair/repairer"
 	"storj.io/storj/satellite/reputation"
 )

@@ -147,6 +148,10 @@ type Core struct {
 	GarbageCollection struct {
 		Sender *sender.Service
 	}
+
+	RepairQueueStat struct {
+		Chore *repairer.QueueStat
+	}
 }

 // New creates a new satellite.
@@ -247,12 +252,12 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB,
 		}
 	}

-	{ // setup overlay
+	placement, err := config.Placement.Parse()
+	if err != nil {
+		return nil, err
+	}

-		placement, err := config.Placement.Parse()
-		if err != nil {
-			return nil, err
-		}
+	{ // setup overlay

 		peer.Overlay.DB = peer.DB.OverlayCache()
 		peer.Overlay.Service, err = overlay.NewService(peer.Log.Named("overlay"), peer.Overlay.DB, peer.DB.NodeEvents(), placement.CreateFilters, config.Console.ExternalAddress, config.Console.SatelliteName, config.Overlay)
@@ -587,6 +592,17 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB,
 		})
 	}

+	{ // setup repair queue statistics chore
+		if config.RepairQueueCheck.Interval.Seconds() > 0 {
+			peer.RepairQueueStat.Chore = repairer.NewQueueStat(log, placement.SupportedPlacements(), db.RepairQueue(), config.RepairQueueCheck.Interval)
+
+			peer.Services.Add(lifecycle.Item{
+				Name: "queue-stat",
+				Run:  peer.RepairQueueStat.Chore.Run,
+			})
+		}
+	}
+
 	{ // setup garbage collection
 		peer.GarbageCollection.Sender = sender.NewService(
 			peer.Log.Named("gc-sender"),
diff --git a/satellite/peer.go b/satellite/peer.go
index 2f093a86b..92b365820 100644
--- a/satellite/peer.go
+++ b/satellite/peer.go
@@ -187,6 +187,8 @@ type Config struct {
 	GarbageCollection   sender.Config
 	GarbageCollectionBF bloomfilter.Config

+	RepairQueueCheck repairer.QueueStatConfig
+
 	RangedLoop rangedloop.Config

 	ExpiredDeletion expireddeletion.Config
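The chore added in the next file reads per-placement aggregates through queue.RepairQueue.Stat. The queue.Stat definition itself is not part of this patch; purely as orientation, the fields the new code references imply a shape roughly like the sketch below (field types are approximate, inferred from usage, and may differ from the real definition in satellite/repair/queue):

	// Sketch only, not the actual definition in satellite/repair/queue:
	// inferred from the fields queue_stat.go reads.
	type Stat struct {
		Placement      storj.PlacementConstraint // placement this row aggregates
		Count          int                       // injured segments in the queue
		MinInsertedAt  time.Time                 // oldest insertion time
		MaxInsertedAt  time.Time                 // newest insertion time
		MinAttemptedAt *time.Time                // oldest repair attempt, nil if never attempted
		MaxAttemptedAt *time.Time                // newest repair attempt, nil if never attempted
	}

The chore keys its cache on both the placement and whether MinAttemptedAt is set, which suggests Stat(ctx) returns one row per (placement, attempted) pair.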
diff --git a/satellite/repair/repairer/queue_stat.go b/satellite/repair/repairer/queue_stat.go
new file mode 100644
index 000000000..a82fc40af
--- /dev/null
+++ b/satellite/repair/repairer/queue_stat.go
@@ -0,0 +1,125 @@
+// Copyright (C) 2023 Storj Labs, Inc.
+// See LICENSE for copying information.
+
+package repairer
+
+import (
+	"context"
+	"fmt"
+	"sync"
+	"time"
+
+	"github.com/spacemonkeygo/monkit/v3"
+	"go.uber.org/zap"
+
+	"storj.io/common/storj"
+	"storj.io/common/sync2"
+	"storj.io/storj/satellite/repair/queue"
+)
+
+// QueueStatConfig configures the queue stat chore.
+// Note: this is intentionally not part of Config. Only this chore needs it, and keeping it
+// separate makes it possible to print just the settings the chore requires (the full repair
+// config is not needed).
+type QueueStatConfig struct {
+	Interval time.Duration `help:"how frequently core should check the size of the repair queue" releaseDefault:"1h" devDefault:"1m0s" testDefault:"$TESTINTERVAL"`
+}
+
+// QueueStat is a chore which periodically refreshes repair queue statistics and exposes
+// them as a monkit stat source.
+type QueueStat struct {
+	db         queue.RepairQueue
+	log        *zap.Logger
+	Loop       *sync2.Cycle
+	mu         sync.Mutex
+	stats      map[string]queue.Stat
+	updated    time.Time
+	placements []storj.PlacementConstraint
+}
+
+var _ monkit.StatSource = &QueueStat{}
+
+// NewQueueStat creates a chore which periodically collects repair queue statistics.
+func NewQueueStat(log *zap.Logger, placements []storj.PlacementConstraint, db queue.RepairQueue, checkInterval time.Duration) *QueueStat {
+	chore := &QueueStat{
+		db:         db,
+		log:        log,
+		Loop:       sync2.NewCycle(checkInterval),
+		placements: placements,
+	}
+	mon.Chain(chore)
+	return chore
+}
+
+// Run refreshes the queue statistics on every cycle.
+func (c *QueueStat) Run(ctx context.Context) (err error) {
+	defer mon.Task()(&ctx)(&err)
+	return c.Loop.Run(ctx, func(ctx context.Context) error {
+		c.RunOnce(ctx)
+		return nil
+	})
+}
+
+// RunOnce refreshes the queue statistics.
+func (c *QueueStat) RunOnce(ctx context.Context) {
+	stats, err := c.db.Stat(ctx)
+	if err != nil {
+		c.log.Error("couldn't get repair queue statistic", zap.Error(err))
+		// keep the previously cached values instead of reporting zeros; the
+		// staleness check in Stats stops reporting if this keeps failing
+		return
+	}
+	c.mu.Lock()
+	c.stats = map[string]queue.Stat{}
+	for _, stat := range stats {
+		c.stats[key(stat.Placement, stat.MinAttemptedAt != nil)] = stat
+	}
+	c.updated = time.Now()
+	c.mu.Unlock()
+}
+// Stats implements the monkit.StatSource interface.
+func (c *QueueStat) Stats(cb func(key monkit.SeriesKey, field string, val float64)) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	if time.Since(c.updated) > 24*time.Hour {
+		// the cached stat is at least one day old (or was never retrieved), so
+		// it is better not to report it
+		return
+	}
+	for _, placement := range c.placements {
+		for _, attempted := range []bool{false, true} {
+			keyWithDefaultTags := monkit.NewSeriesKey("repair_queue").
+				WithTags(
+					monkit.NewSeriesTag("attempted", fmt.Sprintf("%v", attempted)),
+					monkit.NewSeriesTag("placement", fmt.Sprintf("%d", placement)))
+
+			k := key(placement, attempted)
+			stat, found := c.stats[k]
+			if !found {
+				cb(keyWithDefaultTags, "count", 0)
+				cb(keyWithDefaultTags, "age", time.Since(c.updated).Seconds())
+				cb(keyWithDefaultTags, "since_oldest_inserted_sec", 0)
+				cb(keyWithDefaultTags, "since_latest_inserted_sec", 0)
+				cb(keyWithDefaultTags, "since_oldest_attempted_sec", 0)
+				cb(keyWithDefaultTags, "since_latest_attempted_sec", 0)
+				continue
+			}
+
+			cb(keyWithDefaultTags, "count", float64(stat.Count))
+			cb(keyWithDefaultTags, "age", time.Since(c.updated).Seconds())
+			cb(keyWithDefaultTags, "since_oldest_inserted_sec", time.Since(stat.MinInsertedAt).Seconds())
+			cb(keyWithDefaultTags, "since_latest_inserted_sec", time.Since(stat.MaxInsertedAt).Seconds())
+			if stat.MinAttemptedAt != nil {
+				cb(keyWithDefaultTags, "since_oldest_attempted_sec", time.Since(*stat.MinAttemptedAt).Seconds())
+			} else {
+				cb(keyWithDefaultTags, "since_oldest_attempted_sec", 0)
+			}
+			if stat.MaxAttemptedAt != nil {
+				cb(keyWithDefaultTags, "since_latest_attempted_sec", time.Since(*stat.MaxAttemptedAt).Seconds())
+			} else {
+				cb(keyWithDefaultTags, "since_latest_attempted_sec", 0)
+			}
+		}
+	}
+}
+
+// key creates the cache key for one placement / attempted pair.
+func key(placement storj.PlacementConstraint, attempted bool) string {
+	return fmt.Sprintf("%d-%v", placement, attempted)
+}
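Because the chore is chained into the default monkit registry (mon.Chain in NewQueueStat), the values surface as a repair_queue measurement tagged with attempted and placement, plus the scope tag monkit adds on its own. A minimal sketch for dumping them, essentially what the test below does with extra bookkeeping:

	// Sketch: print every repair_queue series currently exposed through the
	// default monkit registry.
	monkit.Default.Stats(func(key monkit.SeriesKey, field string, val float64) {
		if key.Measurement == "repair_queue" {
			fmt.Printf("%v %s=%v\n", key, field, val)
		}
	})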
diff --git a/satellite/repair/repairer/queue_stat_test.go b/satellite/repair/repairer/queue_stat_test.go
new file mode 100644
index 000000000..644e651a6
--- /dev/null
+++ b/satellite/repair/repairer/queue_stat_test.go
@@ -0,0 +1,81 @@
+// Copyright (C) 2023 Storj Labs, Inc.
+// See LICENSE for copying information.
+
+package repairer_test
+
+import (
+	"fmt"
+	"sort"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/spacemonkeygo/monkit/v3"
+	"github.com/stretchr/testify/require"
+	"go.uber.org/zap/zaptest"
+
+	"storj.io/common/storj"
+	"storj.io/common/testcontext"
+	"storj.io/common/testrand"
+	"storj.io/storj/satellite"
+	"storj.io/storj/satellite/repair/queue"
+	"storj.io/storj/satellite/repair/repairer"
+	"storj.io/storj/satellite/satellitedb/satellitedbtest"
+)
+
+func TestStatChore(t *testing.T) {
+	satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
+		// one injured segment with placement 1, two with placement 2
+		_, err := db.RepairQueue().InsertBatch(ctx, []*queue.InjuredSegment{
+			{
+				StreamID:  testrand.UUID(),
+				Placement: storj.PlacementConstraint(1),
+			},
+			{
+				StreamID:  testrand.UUID(),
+				Placement: storj.PlacementConstraint(2),
+			},
+			{
+				StreamID:  testrand.UUID(),
+				Placement: storj.PlacementConstraint(2),
+			},
+		})
+		require.NoError(t, err)
+
+		// the long interval keeps the loop from firing during the test; stats
+		// are collected only through the explicit RunOnce call below
+		chore := repairer.NewQueueStat(zaptest.NewLogger(t), []storj.PlacementConstraint{0, 1, 2}, db.RepairQueue(), 100*time.Hour)
+
+		// collect all repair_queue series into a "tag1=a,tag2=b field" -> value map
+		collectMonkitStat := func() map[string]float64 {
+			monkitValues := map[string]float64{}
+			monkit.Default.Stats(func(key monkit.SeriesKey, field string, val float64) {
+				if key.Measurement != "repair_queue" {
+					return
+				}
+
+				tags := key.Tags.All()
+
+				var tagKeys []string
+				for t := range tags {
+					tagKeys = append(tagKeys, t)
+				}
+				sort.Strings(tagKeys)
+
+				var tagKeyValues []string
+				for _, k := range tagKeys {
+					tagKeyValues = append(tagKeyValues, fmt.Sprintf("%s=%s", k, tags[k]))
+				}
+
+				monkitValues[strings.Join(tagKeyValues, ",")+" "+field] = val
+			})
+			return monkitValues
+		}
+
+		stat := collectMonkitStat()
+		require.Zero(t, stat["attempted=false,placement=1,scope=storj.io/storj/satellite/repair/repairer count"])
+
+		chore.RunOnce(ctx)
+		stat = collectMonkitStat()
+
+		require.Equal(t, float64(0), stat["attempted=false,placement=0,scope=storj.io/storj/satellite/repair/repairer count"])
+		require.Equal(t, float64(1), stat["attempted=false,placement=1,scope=storj.io/storj/satellite/repair/repairer count"])
+		require.Equal(t, float64(2), stat["attempted=false,placement=2,scope=storj.io/storj/satellite/repair/repairer count"])
+	})
+}
diff --git a/satellite/repairer.go b/satellite/repairer.go
index b38e22026..e68835570 100644
--- a/satellite/repairer.go
+++ b/satellite/repairer.go
@@ -248,6 +248,7 @@ func NewRepairer(log *zap.Logger, full *identity.FullIdentity,
 		})
 		peer.Debug.Server.Panel.Add(
 			debug.Cycle("Repair Worker", peer.Repairer.Loop))
+
 	}

 	return peer, nil
diff --git a/scripts/testdata/satellite-config.yaml.lock b/scripts/testdata/satellite-config.yaml.lock
index 53ec93d53..2d5c7a472 100755
--- a/scripts/testdata/satellite-config.yaml.lock
+++ b/scripts/testdata/satellite-config.yaml.lock
@@ -1012,6 +1012,9 @@ identity.key-path: /root/.local/share/storj/identity/satellite/identity.key
 # ratio where to consider processed count as suspicious
 # ranged-loop.suspicious-processed-ratio: 0.03

+# how frequently core should check the size of the repair queue
+# repair-queue-check.interval: 1h0m0s
+
 # time limit for dialing storage node
 # repairer.dial-timeout: 5s
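The chore is registered only when repair-queue-check.interval is a positive duration (see the gate in core.go), so operators can disable it by setting the interval to 0. For local experimentation it can also be driven by hand, the same way the test does. A minimal sketch, assuming a zap logger and a queue.RepairQueue are already available (log and repairQueue here are placeholders, not part of the patch):

	// Sketch: collect repair queue stats once, outside the lifecycle machinery.
	ctx := context.Background()
	chore := repairer.NewQueueStat(log, []storj.PlacementConstraint{0, 1, 2}, repairQueue, time.Hour)
	chore.RunOnce(ctx) // fills the cache immediately; Run(ctx) would do the same once per interval

After RunOnce returns, the values are visible through any monkit consumer attached to the default registry, since NewQueueStat chains the chore into it.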