use Node ID for metrics ID (#1052)
This change removes automatic metrics reporting for everything going through process.Exec(), and re-adds metrics reporting for those commands which are expected to be long-lived. Other commands (which may have been intermittently sending metrics before this, if they ran unusually long) will no longer send any metrics. For commands where it makes sense, a node ID is used as the metrics ID.
This commit is contained in:
parent
a63abf8fab
commit
248ee6438f
@ -9,6 +9,7 @@ import (
|
||||
"path/filepath"
|
||||
|
||||
"github.com/spf13/cobra"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"storj.io/storj/internal/fpath"
|
||||
"storj.io/storj/pkg/cfgstruct"
|
||||
@ -71,7 +72,11 @@ func init() {
|
||||
}
|
||||
|
||||
func cmdRun(cmd *cobra.Command, args []string) (err error) {
|
||||
return cfg.Server.Run(process.Ctx(cmd), nil, cfg.Kademlia)
|
||||
ctx := process.Ctx(cmd)
|
||||
if err := process.InitMetricsWithCertPath(ctx, nil, cfg.Identity.CertPath); err != nil {
|
||||
zap.S().Errorf("Failed to initialize telemetry batcher: %+v", err)
|
||||
}
|
||||
return cfg.Server.Run(ctx, nil, cfg.Kademlia)
|
||||
}
|
||||
|
||||
func cmdSetup(cmd *cobra.Command, args []string) (err error) {
|
||||
|
@ -11,6 +11,7 @@ import (
|
||||
"github.com/alicebob/miniredis"
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/zeebo/errs"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"storj.io/storj/pkg/auth/grpcauth"
|
||||
"storj.io/storj/pkg/cfgstruct"
|
||||
@ -42,6 +43,10 @@ func cmdRun(cmd *cobra.Command, args []string) (err error) {
|
||||
ctx := process.Ctx(cmd)
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
if err := process.InitMetrics(ctx, nil, ""); err != nil {
|
||||
zap.S().Errorf("Failed to initialize telemetry batcher: %+v", err)
|
||||
}
|
||||
|
||||
errch := make(chan error, len(runCfg.StorageNodes)+2)
|
||||
// start mini redis
|
||||
m := miniredis.NewMiniRedis()
|
||||
|
@ -14,6 +14,7 @@ import (
|
||||
"github.com/gogo/protobuf/proto"
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/zeebo/errs"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"storj.io/storj/internal/fpath"
|
||||
"storj.io/storj/pkg/audit"
|
||||
@ -127,6 +128,9 @@ func cmdRun(cmd *cobra.Command, args []string) (err error) {
|
||||
if err != nil {
|
||||
return errs.New("Error creating tables for master database on satellite: %+v", err)
|
||||
}
|
||||
if err := process.InitMetricsWithCertPath(ctx, nil, runCfg.Identity.CertPath); err != nil {
|
||||
zap.S().Errorf("Failed to initialize telemetry batcher: %+v", err)
|
||||
}
|
||||
|
||||
//nolint ignoring context rules to not create cyclic dependency, will be removed later
|
||||
ctx = context.WithValue(ctx, "masterdb", database)
|
||||
|
@ -118,8 +118,12 @@ func cmdRun(cmd *cobra.Command, args []string) (err error) {
|
||||
} else {
|
||||
zap.S().Info("Operator wallet: ", operatorConfig.Wallet)
|
||||
}
|
||||
ctx := process.Ctx(cmd)
|
||||
if err := process.InitMetricsWithCertPath(ctx, nil, runCfg.Identity.CertPath); err != nil {
|
||||
zap.S().Error("Failed to initialize telemetry batcher:", err)
|
||||
}
|
||||
|
||||
return runCfg.Server.Run(process.Ctx(cmd), nil, runCfg.Kademlia, runCfg.Storage)
|
||||
return runCfg.Server.Run(ctx, nil, runCfg.Kademlia, runCfg.Storage)
|
||||
}
|
||||
|
||||
func cmdSetup(cmd *cobra.Command, args []string) (err error) {
|
||||
|
@ -51,6 +51,10 @@ func mountBucket(cmd *cobra.Command, args []string) (err error) {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := process.InitMetricsWithCertPath(ctx, nil, cfg.Identity.CertPath); err != nil {
|
||||
zap.S().Errorf("Failed to initialize telemetry batcher: %v", err)
|
||||
}
|
||||
|
||||
src, err := fpath.New(args[0])
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -8,6 +8,7 @@ import (
|
||||
"net"
|
||||
|
||||
"github.com/spf13/cobra"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"storj.io/storj/pkg/process"
|
||||
"storj.io/storj/pkg/storj"
|
||||
@ -51,6 +52,9 @@ func cmdRun(cmd *cobra.Command, args []string) (err error) {
|
||||
return fmt.Errorf("Failed to contact Satellite.\n"+
|
||||
"Perhaps your configuration is invalid?\n%s", err)
|
||||
}
|
||||
if err := process.InitMetricsWithCertPath(ctx, nil, cfg.Identity.CertPath); err != nil {
|
||||
zap.S().Errorf("Failed to initialize telemetry batcher: %v", err)
|
||||
}
|
||||
|
||||
return cfg.Run(process.Ctx(cmd))
|
||||
return cfg.Run(ctx)
|
||||
}
|
||||
|
@ -158,6 +158,27 @@ func PeerIdentityFromContext(ctx context.Context) (*PeerIdentity, error) {
|
||||
return PeerIdentityFromPeer(p)
|
||||
}
|
||||
|
||||
// NodeIDFromCertPath loads a node ID from a certificate file path
|
||||
func NodeIDFromCertPath(certPath string) (storj.NodeID, error) {
|
||||
certBytes, err := ioutil.ReadFile(certPath)
|
||||
if err != nil {
|
||||
return storj.NodeID{}, err
|
||||
}
|
||||
return NodeIDFromPEM(certBytes)
|
||||
}
|
||||
|
||||
// NodeIDFromPEM loads a node ID from certificate bytes
|
||||
func NodeIDFromPEM(pemBytes []byte) (storj.NodeID, error) {
|
||||
chain, err := DecodeAndParseChainPEM(pemBytes)
|
||||
if err != nil {
|
||||
return storj.NodeID{}, Error.New("invalid identity certificate")
|
||||
}
|
||||
if len(chain) < peertls.CAIndex+1 {
|
||||
return storj.NodeID{}, Error.New("no CA in identity certificate")
|
||||
}
|
||||
return NodeIDFromKey(chain[peertls.CAIndex].PublicKey)
|
||||
}
|
||||
|
||||
// NodeIDFromKey hashes a public key and creates a node ID from it
|
||||
func NodeIDFromKey(k crypto.PublicKey) (storj.NodeID, error) {
|
||||
if ek, ok := k.(*ecdsa.PublicKey); ok {
|
||||
|
@ -22,8 +22,6 @@ import (
|
||||
"github.com/spf13/viper"
|
||||
"go.uber.org/zap"
|
||||
monkit "gopkg.in/spacemonkeygo/monkit.v2"
|
||||
|
||||
"storj.io/storj/pkg/telemetry"
|
||||
)
|
||||
|
||||
// ExecuteWithConfig runs a Cobra command with the provided default config
|
||||
@ -203,12 +201,6 @@ func cleanup(cmd *cobra.Command) {
|
||||
logger.Sugar().Infof("Invalid configuration file value for key: %s", key)
|
||||
}
|
||||
|
||||
err = initMetrics(ctx, monkit.Default,
|
||||
telemetry.DefaultInstanceID())
|
||||
if err != nil {
|
||||
logger.Error("failed to configure telemetry", zap.Error(err))
|
||||
}
|
||||
|
||||
err = initDebug(logger, monkit.Default)
|
||||
if err != nil {
|
||||
logger.Error("failed to start debug endpoints", zap.Error(err))
|
||||
|
@ -11,9 +11,11 @@ import (
|
||||
|
||||
hw "github.com/jtolds/monkit-hw"
|
||||
"github.com/zeebo/admission/admproto"
|
||||
"go.uber.org/zap"
|
||||
"gopkg.in/spacemonkeygo/monkit.v2"
|
||||
"gopkg.in/spacemonkeygo/monkit.v2/environment"
|
||||
|
||||
"storj.io/storj/pkg/identity"
|
||||
"storj.io/storj/pkg/telemetry"
|
||||
)
|
||||
|
||||
@ -28,11 +30,19 @@ var (
|
||||
"application suffix")
|
||||
)
|
||||
|
||||
func initMetrics(ctx context.Context, r *monkit.Registry, instanceID string) (
|
||||
// InitMetrics initializes telemetry reporting. Makes a telemetry.Client and calls
|
||||
// its Run() method in a goroutine.
|
||||
func InitMetrics(ctx context.Context, r *monkit.Registry, instanceID string) (
|
||||
err error) {
|
||||
if *metricCollector == "" || *metricInterval == 0 {
|
||||
return Error.New("telemetry disabled")
|
||||
}
|
||||
if r == nil {
|
||||
r = monkit.Default
|
||||
}
|
||||
if instanceID == "" {
|
||||
instanceID = telemetry.DefaultInstanceID()
|
||||
}
|
||||
c, err := telemetry.NewClient(*metricCollector, telemetry.ClientOpts{
|
||||
Interval: *metricInterval,
|
||||
Application: *metricApp + *metricAppSuffix,
|
||||
@ -48,3 +58,17 @@ func initMetrics(ctx context.Context, r *monkit.Registry, instanceID string) (
|
||||
go c.Run(ctx)
|
||||
return nil
|
||||
}
|
||||
|
||||
// InitMetricsWithCertPath initializes telemetry reporting, using the node ID
|
||||
// corresponding to the given certificate as the telemetry instance ID.
|
||||
func InitMetricsWithCertPath(ctx context.Context, r *monkit.Registry, certPath string) error {
|
||||
var metricsID string
|
||||
nodeID, err := identity.NodeIDFromCertPath(certPath)
|
||||
if err != nil {
|
||||
zap.S().Errorf("Could not read identity for telemetry setup: %v", err)
|
||||
metricsID = "" // InitMetrics() will fill in a default value
|
||||
} else {
|
||||
metricsID = nodeID.String()
|
||||
}
|
||||
return InitMetrics(ctx, r, metricsID)
|
||||
}
|
||||
|
@ -103,6 +103,7 @@ func NewClient(remoteAddr string, opts ClientOpts) (rv *Client, err error) {
|
||||
|
||||
// Run calls Report roughly every Interval
|
||||
func (c *Client) Run(ctx context.Context) {
|
||||
zap.S().Debugf("Initialized telemetry batcher with id = %q", c.opts.InstanceId)
|
||||
for {
|
||||
time.Sleep(jitter(c.interval))
|
||||
if ctx.Err() != nil {
|
||||
|
Loading…
Reference in New Issue
Block a user