satellite/metrics: create a metrics chore (#3263)

* add metrics counter and chore

* updates metrics observer interval release default and dev default to 15min

* add more specific check for remote pointers

* add Counter field to metrics chore, add counter tests

* rm redundant ObjectCount suffix

* make pointer check easier to read

* change metrics.Config.Interval to ChoreInterval

* rm unneeded var

* fix comment

* update satellite config lock
This commit is contained in:
Natalie Villasana 2019-10-16 14:08:33 -04:00 committed by Yingrong Zhao
parent e5099f31f3
commit 855fca003d
6 changed files with 276 additions and 0 deletions

View File

@ -41,6 +41,7 @@ import (
"storj.io/storj/satellite/mailservice"
"storj.io/storj/satellite/marketingweb"
"storj.io/storj/satellite/metainfo"
"storj.io/storj/satellite/metrics"
"storj.io/storj/satellite/nodestats"
"storj.io/storj/satellite/orders"
"storj.io/storj/satellite/overlay"
@ -151,6 +152,10 @@ type SatelliteSystem struct {
Chore *gracefulexit.Chore
Endpoint *gracefulexit.Endpoint
}
Metrics struct {
Chore *metrics.Chore
}
}
// ID returns the ID of the Satellite system.
@ -359,6 +364,9 @@ func (planet *Planet) newSatellites(count int) ([]*SatelliteSystem, error) {
EndpointBatchSize: 100,
EndpointMaxFailures: 5,
},
Metrics: metrics.Config{
ChoreInterval: defaultInterval,
},
}
if planet.config.Reconfigure.Satellite != nil {
planet.config.Reconfigure.Satellite(log, i, &config)
@ -459,6 +467,9 @@ func createNewSystem(log *zap.Logger, peer *satellite.Peer, api *satellite.API)
system.GracefulExit.Chore = peer.GracefulExit.Chore
system.GracefulExit.Endpoint = api.GracefulExit.Endpoint
system.Metrics.Chore = peer.Metrics.Chore
return system
}

View File

@ -0,0 +1,76 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package metrics
import (
"context"
"time"
"github.com/zeebo/errs"
"go.uber.org/zap"
"gopkg.in/spacemonkeygo/monkit.v2"
"storj.io/storj/internal/sync2"
"storj.io/storj/satellite/metainfo"
)
var (
// Error defines the metrics chore errors class.
Error = errs.Class("metrics chore error")
mon = monkit.Package()
)
// Config contains configurable values for metrics collection.
type Config struct {
ChoreInterval time.Duration `help:"the time between each metrics chore run" releaseDefault:"15m" devDefault:"15m"`
}
// Chore implements the metrics chore.
//
// architecture: Chore
type Chore struct {
log *zap.Logger
config Config
Loop sync2.Cycle
metainfoLoop *metainfo.Loop
Counter *Counter
}
// NewChore creates a new instance of the metrics chore.
func NewChore(log *zap.Logger, config Config, loop *metainfo.Loop) *Chore {
return &Chore{
log: log,
config: config,
Loop: *sync2.NewCycle(config.ChoreInterval),
metainfoLoop: loop,
}
}
// Run starts the metrics chore.
func (chore *Chore) Run(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
return chore.Loop.Run(ctx, func(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
chore.Counter = NewCounter()
err = chore.metainfoLoop.Join(ctx, chore.Counter)
if err != nil {
chore.log.Error("error joining metainfoloop", zap.Error(err))
return nil
}
mon.IntVal("remote_dependent_object_count").Observe(chore.Counter.RemoteDependent)
mon.IntVal("inline_object_count").Observe(chore.Counter.Inline)
mon.IntVal("total_object_count").Observe(chore.Counter.Total)
return nil
})
}
// Close closes metrics chore.
func (chore *Chore) Close() error {
chore.Loop.Close()
return nil
}

View File

@ -0,0 +1,56 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package metrics
import (
"context"
"github.com/gogo/protobuf/proto"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/rpc/rpcstatus"
"storj.io/storj/satellite/metainfo"
)
// Counter implements the metainfo loop observer interface for data science metrics collection.
//
// architecture: Observer
type Counter struct {
RemoteDependent int64
Inline int64
Total int64
}
// NewCounter instantiates a new counter to be subscribed to the metainfo loop.
func NewCounter() *Counter {
return &Counter{}
}
// Object increments counts for inline objects and remote dependent objects.
func (counter *Counter) Object(ctx context.Context, path metainfo.ScopedPath, pointer *pb.Pointer) (err error) {
streamMeta := &pb.StreamMeta{}
err = proto.Unmarshal(pointer.Metadata, streamMeta)
if err != nil {
return rpcstatus.Error(rpcstatus.Internal, err.Error())
}
if streamMeta.NumberOfSegments == 1 && pointer.Type == pb.Pointer_INLINE {
counter.Inline++
} else {
counter.RemoteDependent++
}
counter.Total++
return nil
}
// RemoteSegment returns nil because counter does not interact with remote segments this way for now.
func (counter *Counter) RemoteSegment(ctx context.Context, path metainfo.ScopedPath, pointer *pb.Pointer) (err error) {
return nil
}
// InlineSegment returns nil because counter does not interact with inline segments this way for now.
func (counter *Counter) InlineSegment(ctx context.Context, path metainfo.ScopedPath, pointer *pb.Pointer) (err error) {
return nil
}

View File

@ -0,0 +1,108 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package metrics_test
import (
"testing"
"github.com/stretchr/testify/require"
"storj.io/storj/internal/memory"
"storj.io/storj/internal/testcontext"
"storj.io/storj/internal/testplanet"
"storj.io/storj/internal/testrand"
"storj.io/storj/uplink"
)
func TestCounterInlineAndRemote(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 5, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
ul := planet.Uplinks[0]
metricsChore := satellite.Metrics.Chore
metricsChore.Loop.Pause()
segmentSize := 8 * memory.KiB
// upload 2 inline files
for i := 0; i < 2; i++ {
testData := testrand.Bytes(segmentSize / 8)
path := "/some/inline/path/" + string(i)
err := ul.Upload(ctx, satellite, "bucket", path, testData)
require.NoError(t, err)
}
// upload 2 remote files with 1 segment
for i := 0; i < 2; i++ {
testData := testrand.Bytes(segmentSize)
path := "/some/remote/path/" + string(i)
err := ul.UploadWithConfig(ctx, satellite, &uplink.RSConfig{
MinThreshold: 3,
RepairThreshold: 4,
SuccessThreshold: 5,
MaxThreshold: 5,
}, "testbucket", path, testData)
require.NoError(t, err)
}
metricsChore.Loop.TriggerWait()
require.EqualValues(t, 2, metricsChore.Counter.Inline)
require.EqualValues(t, 2, metricsChore.Counter.RemoteDependent)
require.EqualValues(t, 4, metricsChore.Counter.Total)
})
}
func TestCounterInlineOnly(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 5, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
ul := planet.Uplinks[0]
metricsChore := satellite.Metrics.Chore
metricsChore.Loop.Pause()
// upload 2 inline files
for i := 0; i < 2; i++ {
testData := testrand.Bytes(memory.KiB)
path := "/some/inline/path/" + string(i)
err := ul.Upload(ctx, satellite, "bucket", path, testData)
require.NoError(t, err)
}
metricsChore.Loop.TriggerWait()
require.EqualValues(t, 2, metricsChore.Counter.Inline)
require.EqualValues(t, 0, metricsChore.Counter.RemoteDependent)
require.EqualValues(t, 2, metricsChore.Counter.Total)
})
}
func TestCounterRemoteOnly(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 5, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
ul := planet.Uplinks[0]
metricsChore := satellite.Metrics.Chore
metricsChore.Loop.Pause()
// upload 2 remote files with 1 segment
for i := 0; i < 2; i++ {
testData := testrand.Bytes(8 * memory.KiB)
path := "/some/remote/path/" + string(i)
err := ul.UploadWithConfig(ctx, satellite, &uplink.RSConfig{
MinThreshold: 3,
RepairThreshold: 4,
SuccessThreshold: 5,
MaxThreshold: 5,
}, "testbucket", path, testData)
require.NoError(t, err)
}
metricsChore.Loop.TriggerWait()
require.EqualValues(t, 0, metricsChore.Counter.Inline)
require.EqualValues(t, 2, metricsChore.Counter.RemoteDependent)
require.EqualValues(t, 2, metricsChore.Counter.Total)
})
}

View File

@ -45,6 +45,7 @@ import (
"storj.io/storj/satellite/mailservice/simulate"
"storj.io/storj/satellite/marketingweb"
"storj.io/storj/satellite/metainfo"
"storj.io/storj/satellite/metrics"
"storj.io/storj/satellite/nodestats"
"storj.io/storj/satellite/orders"
"storj.io/storj/satellite/overlay"
@ -134,6 +135,8 @@ type Config struct {
Version version.Config
GracefulExit gracefulexit.Config
Metrics metrics.Config
}
// Peer is the satellite
@ -237,6 +240,10 @@ type Peer struct {
Endpoint *gracefulexit.Endpoint
Chore *gracefulexit.Chore
}
Metrics struct {
Chore *metrics.Chore
}
}
// New creates a new satellite
@ -648,6 +655,14 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, pointerDB metainfo
pb.DRPCRegisterSatelliteGracefulExit(peer.Server.DRPC(), peer.GracefulExit.Endpoint.DRPC())
}
{ // setup metrics service
peer.Metrics.Chore = metrics.NewChore(
peer.Log.Named("metrics"),
config.Metrics,
peer.Metainfo.Loop,
)
}
return peer, nil
}
@ -704,6 +719,9 @@ func (peer *Peer) Run(ctx context.Context) (err error) {
group.Go(func() error {
return errs2.IgnoreCanceled(peer.GracefulExit.Chore.Run(ctx))
})
group.Go(func() error {
return errs2.IgnoreCanceled(peer.Metrics.Chore.Run(ctx))
})
return group.Wait()
}
@ -719,6 +737,10 @@ func (peer *Peer) Close() error {
errlist.Add(peer.Server.Close())
}
if peer.Metrics.Chore != nil {
errlist.Add(peer.Metrics.Chore.Close())
}
if peer.GracefulExit.Chore != nil {
errlist.Add(peer.GracefulExit.Chore.Close())
}

View File

@ -238,6 +238,9 @@ identity.key-path: /root/.local/share/storj/identity/satellite/identity.key
# application suffix
# metrics.app-suffix: -release
# the time between each metrics chore run
# metrics.chore-interval: 15m0s
# instance id prefix
# metrics.instance-prefix: ""