satellite/metrics: remove code related to segments loop
We are switching completely to ranged loop. Change-Id: I32120ef496addebec2de088fd10d0c1d02313c68
This commit is contained in:
parent
78029ec3fb
commit
fbfe5aaad7
@ -51,7 +51,6 @@ import (
|
|||||||
"storj.io/storj/satellite/metabase/zombiedeletion"
|
"storj.io/storj/satellite/metabase/zombiedeletion"
|
||||||
"storj.io/storj/satellite/metainfo"
|
"storj.io/storj/satellite/metainfo"
|
||||||
"storj.io/storj/satellite/metainfo/expireddeletion"
|
"storj.io/storj/satellite/metainfo/expireddeletion"
|
||||||
"storj.io/storj/satellite/metrics"
|
|
||||||
"storj.io/storj/satellite/nodeevents"
|
"storj.io/storj/satellite/nodeevents"
|
||||||
"storj.io/storj/satellite/nodestats"
|
"storj.io/storj/satellite/nodestats"
|
||||||
"storj.io/storj/satellite/orders"
|
"storj.io/storj/satellite/orders"
|
||||||
@ -204,10 +203,6 @@ type Satellite struct {
|
|||||||
Chore *gracefulexit.Chore
|
Chore *gracefulexit.Chore
|
||||||
Endpoint *gracefulexit.Endpoint
|
Endpoint *gracefulexit.Endpoint
|
||||||
}
|
}
|
||||||
|
|
||||||
Metrics struct {
|
|
||||||
Chore *metrics.Chore
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Label returns name for debugger.
|
// Label returns name for debugger.
|
||||||
@ -671,8 +666,6 @@ func createNewSystem(name string, log *zap.Logger, config satellite.Config, peer
|
|||||||
system.GracefulExit.Chore = peer.GracefulExit.Chore
|
system.GracefulExit.Chore = peer.GracefulExit.Chore
|
||||||
system.GracefulExit.Endpoint = api.GracefulExit.Endpoint
|
system.GracefulExit.Endpoint = api.GracefulExit.Endpoint
|
||||||
|
|
||||||
system.Metrics.Chore = peer.Metrics.Chore
|
|
||||||
|
|
||||||
return system
|
return system
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -40,7 +40,6 @@ import (
|
|||||||
"storj.io/storj/satellite/metabase/segmentloop"
|
"storj.io/storj/satellite/metabase/segmentloop"
|
||||||
"storj.io/storj/satellite/metabase/zombiedeletion"
|
"storj.io/storj/satellite/metabase/zombiedeletion"
|
||||||
"storj.io/storj/satellite/metainfo/expireddeletion"
|
"storj.io/storj/satellite/metainfo/expireddeletion"
|
||||||
"storj.io/storj/satellite/metrics"
|
|
||||||
"storj.io/storj/satellite/nodeevents"
|
"storj.io/storj/satellite/nodeevents"
|
||||||
"storj.io/storj/satellite/overlay"
|
"storj.io/storj/satellite/overlay"
|
||||||
"storj.io/storj/satellite/overlay/offlinenodes"
|
"storj.io/storj/satellite/overlay/offlinenodes"
|
||||||
@ -149,10 +148,6 @@ type Core struct {
|
|||||||
GracefulExit struct {
|
GracefulExit struct {
|
||||||
Chore *gracefulexit.Chore
|
Chore *gracefulexit.Chore
|
||||||
}
|
}
|
||||||
|
|
||||||
Metrics struct {
|
|
||||||
Chore *metrics.Chore
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// New creates a new satellite.
|
// New creates a new satellite.
|
||||||
@ -631,26 +626,6 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
{ // setup metrics service
|
|
||||||
log := peer.Log.Named("metrics")
|
|
||||||
if config.Metrics.UseRangedLoop {
|
|
||||||
log.Info("using ranged loop")
|
|
||||||
} else {
|
|
||||||
peer.Metrics.Chore = metrics.NewChore(
|
|
||||||
log,
|
|
||||||
config.Metrics,
|
|
||||||
peer.Metainfo.SegmentLoop,
|
|
||||||
)
|
|
||||||
peer.Services.Add(lifecycle.Item{
|
|
||||||
Name: "metrics",
|
|
||||||
Run: peer.Metrics.Chore.Run,
|
|
||||||
Close: peer.Metrics.Chore.Close,
|
|
||||||
})
|
|
||||||
peer.Debug.Server.Panel.Add(
|
|
||||||
debug.Cycle("Metrics", peer.Metrics.Chore.Loop))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return peer, nil
|
return peer, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1,86 +0,0 @@
|
|||||||
// Copyright (C) 2019 Storj Labs, Inc.
|
|
||||||
// See LICENSE for copying information.
|
|
||||||
|
|
||||||
package metrics
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/spacemonkeygo/monkit/v3"
|
|
||||||
"github.com/zeebo/errs"
|
|
||||||
"go.uber.org/zap"
|
|
||||||
|
|
||||||
"storj.io/common/sync2"
|
|
||||||
"storj.io/storj/satellite/metabase/segmentloop"
|
|
||||||
)
|
|
||||||
|
|
||||||
var (
|
|
||||||
// Error defines the metrics chore errors class.
|
|
||||||
Error = errs.Class("metrics")
|
|
||||||
mon = monkit.Package()
|
|
||||||
)
|
|
||||||
|
|
||||||
// Config contains configurable values for metrics collection.
|
|
||||||
type Config struct {
|
|
||||||
UseRangedLoop bool `help:"whether to use ranged loop instead of segment loop" default:"false"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// Chore implements the metrics chore.
|
|
||||||
//
|
|
||||||
// architecture: Chore
|
|
||||||
type Chore struct {
|
|
||||||
log *zap.Logger
|
|
||||||
config Config
|
|
||||||
Loop *sync2.Cycle
|
|
||||||
segmentLoop *segmentloop.Service
|
|
||||||
Counter *Counter
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewChore creates a new instance of the metrics chore.
|
|
||||||
func NewChore(log *zap.Logger, config Config, loop *segmentloop.Service) *Chore {
|
|
||||||
return &Chore{
|
|
||||||
log: log,
|
|
||||||
config: config,
|
|
||||||
// This chore monitors segment loop, so it's fine to use very small cycle time.
|
|
||||||
Loop: sync2.NewCycle(time.Nanosecond),
|
|
||||||
segmentLoop: loop,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Run starts the metrics chore.
|
|
||||||
func (chore *Chore) Run(ctx context.Context) (err error) {
|
|
||||||
defer mon.Task()(&ctx)(&err)
|
|
||||||
|
|
||||||
return chore.Loop.Run(ctx, func(ctx context.Context) (err error) {
|
|
||||||
defer mon.Task()(&ctx)(&err)
|
|
||||||
|
|
||||||
chore.Counter = NewCounter()
|
|
||||||
|
|
||||||
err = chore.segmentLoop.Monitor(ctx, chore.Counter)
|
|
||||||
if err != nil {
|
|
||||||
chore.log.Error("error joining segment loop", zap.Error(err))
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
mon.IntVal("remote_dependent_object_count").Observe(chore.Counter.RemoteObjects)
|
|
||||||
mon.IntVal("inline_object_count").Observe(chore.Counter.InlineObjects)
|
|
||||||
|
|
||||||
mon.IntVal("total_inline_bytes").Observe(chore.Counter.TotalInlineBytes) //mon:locked
|
|
||||||
mon.IntVal("total_remote_bytes").Observe(chore.Counter.TotalRemoteBytes) //mon:locked
|
|
||||||
|
|
||||||
mon.IntVal("total_inline_segments").Observe(chore.Counter.TotalInlineSegments) //mon:locked
|
|
||||||
mon.IntVal("total_remote_segments").Observe(chore.Counter.TotalRemoteSegments) //mon:locked
|
|
||||||
|
|
||||||
// TODO move this metric to a place where objects are iterated e.g. tally
|
|
||||||
// or drop it completely as we can easily get this value with redash
|
|
||||||
// mon.IntVal("total_object_count").Observe(chore.Counter.ObjectCount)
|
|
||||||
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
// Close closes metrics chore.
|
|
||||||
func (chore *Chore) Close() error {
|
|
||||||
chore.Loop.Close()
|
|
||||||
return nil
|
|
||||||
}
|
|
@ -1,87 +0,0 @@
|
|||||||
// Copyright (C) 2019 Storj Labs, Inc.
|
|
||||||
// See LICENSE for copying information.
|
|
||||||
|
|
||||||
package metrics
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
|
|
||||||
"storj.io/common/uuid"
|
|
||||||
"storj.io/storj/satellite/metabase/segmentloop"
|
|
||||||
)
|
|
||||||
|
|
||||||
var (
|
|
||||||
remoteSegmentFunc = mon.Task()
|
|
||||||
inlineSegmentFunc = mon.Task()
|
|
||||||
)
|
|
||||||
|
|
||||||
// Counter implements the segment loop observer interface for data science metrics collection.
|
|
||||||
//
|
|
||||||
// architecture: Observer
|
|
||||||
type Counter struct {
|
|
||||||
// number of objects that has at least one remote segment
|
|
||||||
RemoteObjects int64
|
|
||||||
// number of objects that has all inline segments
|
|
||||||
InlineObjects int64
|
|
||||||
|
|
||||||
// encrypted size
|
|
||||||
TotalInlineBytes int64
|
|
||||||
// encrypted size
|
|
||||||
TotalRemoteBytes int64
|
|
||||||
|
|
||||||
TotalInlineSegments int64
|
|
||||||
TotalRemoteSegments int64
|
|
||||||
|
|
||||||
lastStreamID uuid.UUID
|
|
||||||
onlyInline bool
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewCounter instantiates a new counter to be subscribed to the metainfo loop.
|
|
||||||
func NewCounter() *Counter {
|
|
||||||
return &Counter{
|
|
||||||
onlyInline: true,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// LoopStarted is called at each start of a loop.
|
|
||||||
func (counter *Counter) LoopStarted(context.Context, segmentloop.LoopInfo) (err error) {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// RemoteSegment increments the count for objects with remote segments.
|
|
||||||
func (counter *Counter) RemoteSegment(ctx context.Context, segment *segmentloop.Segment) error {
|
|
||||||
defer remoteSegmentFunc(&ctx)(nil) // method always returns nil
|
|
||||||
|
|
||||||
counter.onlyInline = false
|
|
||||||
|
|
||||||
counter.TotalRemoteBytes += int64(segment.EncryptedSize)
|
|
||||||
counter.TotalRemoteSegments++
|
|
||||||
|
|
||||||
if counter.lastStreamID.Compare(segment.StreamID) != 0 {
|
|
||||||
counter.RemoteObjects++
|
|
||||||
|
|
||||||
counter.lastStreamID = segment.StreamID
|
|
||||||
counter.onlyInline = true
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// InlineSegment increments the count for inline objects.
|
|
||||||
func (counter *Counter) InlineSegment(ctx context.Context, segment *segmentloop.Segment) error {
|
|
||||||
defer inlineSegmentFunc(&ctx)(nil) // method always returns nil
|
|
||||||
|
|
||||||
counter.TotalInlineBytes += int64(segment.EncryptedSize)
|
|
||||||
counter.TotalInlineSegments++
|
|
||||||
|
|
||||||
if counter.lastStreamID.Compare(segment.StreamID) != 0 {
|
|
||||||
if counter.onlyInline {
|
|
||||||
counter.InlineObjects++
|
|
||||||
} else {
|
|
||||||
counter.RemoteObjects++
|
|
||||||
}
|
|
||||||
|
|
||||||
counter.lastStreamID = segment.StreamID
|
|
||||||
counter.onlyInline = true
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
@ -1,109 +0,0 @@
|
|||||||
// Copyright (C) 2019 Storj Labs, Inc.
|
|
||||||
// See LICENSE for copying information.
|
|
||||||
|
|
||||||
package metrics_test
|
|
||||||
|
|
||||||
import (
|
|
||||||
"strconv"
|
|
||||||
"testing"
|
|
||||||
|
|
||||||
"github.com/stretchr/testify/require"
|
|
||||||
|
|
||||||
"storj.io/common/memory"
|
|
||||||
"storj.io/common/testcontext"
|
|
||||||
"storj.io/common/testrand"
|
|
||||||
"storj.io/storj/private/testplanet"
|
|
||||||
"storj.io/storj/satellite/metrics"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestCounterInlineAndRemote(t *testing.T) {
|
|
||||||
testplanet.Run(t, testplanet.Config{
|
|
||||||
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
|
||||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
|
||||||
satellite := planet.Satellites[0]
|
|
||||||
ul := planet.Uplinks[0]
|
|
||||||
|
|
||||||
segmentSize := 8 * memory.KiB
|
|
||||||
|
|
||||||
// upload 2 inline files
|
|
||||||
for i := 0; i < 2; i++ {
|
|
||||||
testData := testrand.Bytes(segmentSize / 8)
|
|
||||||
path := "/some/inline/path/" + strconv.Itoa(i)
|
|
||||||
err := ul.Upload(ctx, satellite, "bucket", path, testData)
|
|
||||||
require.NoError(t, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// upload 2 remote files with 1 segment
|
|
||||||
for i := 0; i < 2; i++ {
|
|
||||||
testData := testrand.Bytes(segmentSize)
|
|
||||||
path := "/some/remote/path/" + strconv.Itoa(i)
|
|
||||||
err := ul.Upload(ctx, satellite, "testbucket", path, testData)
|
|
||||||
require.NoError(t, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
counter := metrics.NewCounter()
|
|
||||||
err := satellite.Metabase.SegmentLoop.Join(ctx, counter)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
require.EqualValues(t, 2, counter.InlineObjects)
|
|
||||||
require.EqualValues(t, 2, counter.RemoteObjects)
|
|
||||||
|
|
||||||
require.EqualValues(t, 2, counter.TotalInlineSegments)
|
|
||||||
require.EqualValues(t, 2, counter.TotalRemoteSegments)
|
|
||||||
// 2 inline segments * (1024 + encryption overhead)
|
|
||||||
require.EqualValues(t, 2080, counter.TotalInlineBytes)
|
|
||||||
// 2 remote segments * (8192 + encryption overhead)
|
|
||||||
require.EqualValues(t, 29696, counter.TotalRemoteBytes)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestCounterInlineOnly(t *testing.T) {
|
|
||||||
testplanet.Run(t, testplanet.Config{
|
|
||||||
SatelliteCount: 1, UplinkCount: 1,
|
|
||||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
|
||||||
satellite := planet.Satellites[0]
|
|
||||||
ul := planet.Uplinks[0]
|
|
||||||
|
|
||||||
// upload 2 inline files
|
|
||||||
for i := 0; i < 2; i++ {
|
|
||||||
testData := testrand.Bytes(memory.KiB)
|
|
||||||
path := "/some/inline/path/" + strconv.Itoa(i)
|
|
||||||
err := ul.Upload(ctx, satellite, "bucket", path, testData)
|
|
||||||
require.NoError(t, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
counter := metrics.NewCounter()
|
|
||||||
err := satellite.Metabase.SegmentLoop.Join(ctx, counter)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
require.EqualValues(t, 2, counter.InlineObjects)
|
|
||||||
require.EqualValues(t, 0, counter.RemoteObjects)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestCounterRemoteOnly(t *testing.T) {
|
|
||||||
testplanet.Run(t, testplanet.Config{
|
|
||||||
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
|
||||||
Reconfigure: testplanet.Reconfigure{
|
|
||||||
Satellite: testplanet.MaxSegmentSize(150 * memory.KiB),
|
|
||||||
},
|
|
||||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
|
||||||
satellite := planet.Satellites[0]
|
|
||||||
ul := planet.Uplinks[0]
|
|
||||||
|
|
||||||
// upload 2 remote files with multiple segments
|
|
||||||
for i := 0; i < 2; i++ {
|
|
||||||
testData := testrand.Bytes(300 * memory.KiB)
|
|
||||||
path := "/some/remote/path/" + strconv.Itoa(i)
|
|
||||||
err := ul.Upload(ctx, satellite, "testbucket", path, testData)
|
|
||||||
require.NoError(t, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
counter := metrics.NewCounter()
|
|
||||||
err := satellite.Metabase.SegmentLoop.Join(ctx, counter)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
require.EqualValues(t, 0, counter.InlineObjects)
|
|
||||||
require.EqualValues(t, 2, counter.RemoteObjects)
|
|
||||||
})
|
|
||||||
}
|
|
@ -7,11 +7,20 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/spacemonkeygo/monkit/v3"
|
||||||
|
"github.com/zeebo/errs"
|
||||||
|
|
||||||
"storj.io/common/uuid"
|
"storj.io/common/uuid"
|
||||||
"storj.io/storj/satellite/metabase/rangedloop"
|
"storj.io/storj/satellite/metabase/rangedloop"
|
||||||
"storj.io/storj/satellite/metabase/segmentloop"
|
"storj.io/storj/satellite/metabase/segmentloop"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
// Error defines the metrics chore errors class.
|
||||||
|
Error = errs.Class("metrics")
|
||||||
|
mon = monkit.Package()
|
||||||
|
)
|
||||||
|
|
||||||
// Observer implements the ranged segment loop observer interface for data
|
// Observer implements the ranged segment loop observer interface for data
|
||||||
// science metrics collection.
|
// science metrics collection.
|
||||||
type Observer struct {
|
type Observer struct {
|
||||||
|
@ -50,7 +50,6 @@ import (
|
|||||||
"storj.io/storj/satellite/metabase/zombiedeletion"
|
"storj.io/storj/satellite/metabase/zombiedeletion"
|
||||||
"storj.io/storj/satellite/metainfo"
|
"storj.io/storj/satellite/metainfo"
|
||||||
"storj.io/storj/satellite/metainfo/expireddeletion"
|
"storj.io/storj/satellite/metainfo/expireddeletion"
|
||||||
"storj.io/storj/satellite/metrics"
|
|
||||||
"storj.io/storj/satellite/nodeapiversion"
|
"storj.io/storj/satellite/nodeapiversion"
|
||||||
"storj.io/storj/satellite/nodeevents"
|
"storj.io/storj/satellite/nodeevents"
|
||||||
"storj.io/storj/satellite/oidc"
|
"storj.io/storj/satellite/oidc"
|
||||||
@ -209,8 +208,6 @@ type Config struct {
|
|||||||
|
|
||||||
GracefulExit gracefulexit.Config
|
GracefulExit gracefulexit.Config
|
||||||
|
|
||||||
Metrics metrics.Config
|
|
||||||
|
|
||||||
Compensation compensation.Config
|
Compensation compensation.Config
|
||||||
|
|
||||||
ProjectLimit accounting.ProjectLimitConfig
|
ProjectLimit accounting.ProjectLimitConfig
|
||||||
|
@ -156,16 +156,13 @@ func NewRangedLoop(log *zap.Logger, db DB, metabaseDB *metabase.DB, config *Conf
|
|||||||
{ // setup ranged loop
|
{ // setup ranged loop
|
||||||
observers := []rangedloop.Observer{
|
observers := []rangedloop.Observer{
|
||||||
rangedloop.NewLiveCountObserver(metabaseDB, config.RangedLoop.SuspiciousProcessedRatio, config.RangedLoop.AsOfSystemInterval),
|
rangedloop.NewLiveCountObserver(metabaseDB, config.RangedLoop.SuspiciousProcessedRatio, config.RangedLoop.AsOfSystemInterval),
|
||||||
|
peer.Metrics.Observer,
|
||||||
}
|
}
|
||||||
|
|
||||||
if config.Audit.UseRangedLoop {
|
if config.Audit.UseRangedLoop {
|
||||||
observers = append(observers, peer.Audit.Observer)
|
observers = append(observers, peer.Audit.Observer)
|
||||||
}
|
}
|
||||||
|
|
||||||
if config.Metrics.UseRangedLoop {
|
|
||||||
observers = append(observers, peer.Metrics.Observer)
|
|
||||||
}
|
|
||||||
|
|
||||||
if config.Tally.UseRangedLoop {
|
if config.Tally.UseRangedLoop {
|
||||||
observers = append(observers, peer.Accounting.NodeTallyObserver)
|
observers = append(observers, peer.Accounting.NodeTallyObserver)
|
||||||
}
|
}
|
||||||
|
3
scripts/testdata/satellite-config.yaml.lock
vendored
3
scripts/testdata/satellite-config.yaml.lock
vendored
@ -706,9 +706,6 @@ identity.key-path: /root/.local/share/storj/identity/satellite/identity.key
|
|||||||
# how frequently to send up telemetry. Ignored for certain applications.
|
# how frequently to send up telemetry. Ignored for certain applications.
|
||||||
# metrics.interval: 1m0s
|
# metrics.interval: 1m0s
|
||||||
|
|
||||||
# whether to use ranged loop instead of segment loop
|
|
||||||
# metrics.use-ranged-loop: false
|
|
||||||
|
|
||||||
# path to log for oom notices
|
# path to log for oom notices
|
||||||
# monkit.hw.oomlog: /var/log/kern.log
|
# monkit.hw.oomlog: /var/log/kern.log
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user