diff --git a/cmd/bootstrap/main.go b/cmd/bootstrap/main.go index 45b371a1c..c2ddd6e97 100644 --- a/cmd/bootstrap/main.go +++ b/cmd/bootstrap/main.go @@ -9,6 +9,7 @@ import ( "path/filepath" "github.com/spf13/cobra" + "go.uber.org/zap" "storj.io/storj/internal/fpath" "storj.io/storj/pkg/cfgstruct" @@ -71,7 +72,11 @@ func init() { } func cmdRun(cmd *cobra.Command, args []string) (err error) { - return cfg.Server.Run(process.Ctx(cmd), nil, cfg.Kademlia) + ctx := process.Ctx(cmd) + if err := process.InitMetricsWithCertPath(ctx, nil, cfg.Identity.CertPath); err != nil { + zap.S().Errorf("Failed to initialize telemetry batcher: %+v", err) + } + return cfg.Server.Run(ctx, nil, cfg.Kademlia) } func cmdSetup(cmd *cobra.Command, args []string) (err error) { diff --git a/cmd/captplanet/run.go b/cmd/captplanet/run.go index 0447aa657..7e5389144 100644 --- a/cmd/captplanet/run.go +++ b/cmd/captplanet/run.go @@ -11,6 +11,7 @@ import ( "github.com/alicebob/miniredis" "github.com/spf13/cobra" "github.com/zeebo/errs" + "go.uber.org/zap" "storj.io/storj/pkg/auth/grpcauth" "storj.io/storj/pkg/cfgstruct" @@ -42,6 +43,10 @@ func cmdRun(cmd *cobra.Command, args []string) (err error) { ctx := process.Ctx(cmd) defer mon.Task()(&ctx)(&err) + if err := process.InitMetrics(ctx, nil, ""); err != nil { + zap.S().Errorf("Failed to initialize telemetry batcher: %+v", err) + } + errch := make(chan error, len(runCfg.StorageNodes)+2) // start mini redis m := miniredis.NewMiniRedis() diff --git a/cmd/satellite/main.go b/cmd/satellite/main.go index 54db7b04e..31721550e 100644 --- a/cmd/satellite/main.go +++ b/cmd/satellite/main.go @@ -14,6 +14,7 @@ import ( "github.com/gogo/protobuf/proto" "github.com/spf13/cobra" "github.com/zeebo/errs" + "go.uber.org/zap" "storj.io/storj/internal/fpath" "storj.io/storj/pkg/audit" @@ -127,6 +128,9 @@ func cmdRun(cmd *cobra.Command, args []string) (err error) { if err != nil { return errs.New("Error creating tables for master database on satellite: %+v", err) } + if err := process.InitMetricsWithCertPath(ctx, nil, runCfg.Identity.CertPath); err != nil { + zap.S().Errorf("Failed to initialize telemetry batcher: %+v", err) + } //nolint ignoring context rules to not create cyclic dependency, will be removed later ctx = context.WithValue(ctx, "masterdb", database) diff --git a/cmd/storagenode/main.go b/cmd/storagenode/main.go index 6001d52c1..ccac15d57 100644 --- a/cmd/storagenode/main.go +++ b/cmd/storagenode/main.go @@ -118,8 +118,12 @@ func cmdRun(cmd *cobra.Command, args []string) (err error) { } else { zap.S().Info("Operator wallet: ", operatorConfig.Wallet) } + ctx := process.Ctx(cmd) + if err := process.InitMetricsWithCertPath(ctx, nil, runCfg.Identity.CertPath); err != nil { + zap.S().Error("Failed to initialize telemetry batcher:", err) + } - return runCfg.Server.Run(process.Ctx(cmd), nil, runCfg.Kademlia, runCfg.Storage) + return runCfg.Server.Run(ctx, nil, runCfg.Kademlia, runCfg.Storage) } func cmdSetup(cmd *cobra.Command, args []string) (err error) { diff --git a/cmd/uplink/cmd/mount.go b/cmd/uplink/cmd/mount.go index abcf8f09f..f3bf5bcd7 100644 --- a/cmd/uplink/cmd/mount.go +++ b/cmd/uplink/cmd/mount.go @@ -51,6 +51,10 @@ func mountBucket(cmd *cobra.Command, args []string) (err error) { return err } + if err := process.InitMetricsWithCertPath(ctx, nil, cfg.Identity.CertPath); err != nil { + zap.S().Errorf("Failed to initialize telemetry batcher: %v", err) + } + src, err := fpath.New(args[0]) if err != nil { return err diff --git a/cmd/uplink/cmd/run.go b/cmd/uplink/cmd/run.go index 445ba4130..f50065be6 100644 --- a/cmd/uplink/cmd/run.go +++ b/cmd/uplink/cmd/run.go @@ -8,6 +8,7 @@ import ( "net" "github.com/spf13/cobra" + "go.uber.org/zap" "storj.io/storj/pkg/process" "storj.io/storj/pkg/storj" @@ -51,6 +52,9 @@ func cmdRun(cmd *cobra.Command, args []string) (err error) { return fmt.Errorf("Failed to contact Satellite.\n"+ "Perhaps your configuration is invalid?\n%s", err) } + if err := process.InitMetricsWithCertPath(ctx, nil, cfg.Identity.CertPath); err != nil { + zap.S().Errorf("Failed to initialize telemetry batcher: %v", err) + } - return cfg.Run(process.Ctx(cmd)) + return cfg.Run(ctx) } diff --git a/pkg/identity/identity.go b/pkg/identity/identity.go index 8222647a7..97e1cb535 100644 --- a/pkg/identity/identity.go +++ b/pkg/identity/identity.go @@ -158,6 +158,27 @@ func PeerIdentityFromContext(ctx context.Context) (*PeerIdentity, error) { return PeerIdentityFromPeer(p) } +// NodeIDFromCertPath loads a node ID from a certificate file path +func NodeIDFromCertPath(certPath string) (storj.NodeID, error) { + certBytes, err := ioutil.ReadFile(certPath) + if err != nil { + return storj.NodeID{}, err + } + return NodeIDFromPEM(certBytes) +} + +// NodeIDFromPEM loads a node ID from certificate bytes +func NodeIDFromPEM(pemBytes []byte) (storj.NodeID, error) { + chain, err := DecodeAndParseChainPEM(pemBytes) + if err != nil { + return storj.NodeID{}, Error.New("invalid identity certificate") + } + if len(chain) < peertls.CAIndex+1 { + return storj.NodeID{}, Error.New("no CA in identity certificate") + } + return NodeIDFromKey(chain[peertls.CAIndex].PublicKey) +} + // NodeIDFromKey hashes a public key and creates a node ID from it func NodeIDFromKey(k crypto.PublicKey) (storj.NodeID, error) { if ek, ok := k.(*ecdsa.PublicKey); ok { diff --git a/pkg/process/exec_conf.go b/pkg/process/exec_conf.go index d87618e2b..162d3366c 100644 --- a/pkg/process/exec_conf.go +++ b/pkg/process/exec_conf.go @@ -22,8 +22,6 @@ import ( "github.com/spf13/viper" "go.uber.org/zap" monkit "gopkg.in/spacemonkeygo/monkit.v2" - - "storj.io/storj/pkg/telemetry" ) // ExecuteWithConfig runs a Cobra command with the provided default config @@ -203,12 +201,6 @@ func cleanup(cmd *cobra.Command) { logger.Sugar().Infof("Invalid configuration file value for key: %s", key) } - err = initMetrics(ctx, monkit.Default, - telemetry.DefaultInstanceID()) - if err != nil { - logger.Error("failed to configure telemetry", zap.Error(err)) - } - err = initDebug(logger, monkit.Default) if err != nil { logger.Error("failed to start debug endpoints", zap.Error(err)) diff --git a/pkg/process/metrics.go b/pkg/process/metrics.go index 8462b7fc2..886fcba75 100644 --- a/pkg/process/metrics.go +++ b/pkg/process/metrics.go @@ -11,9 +11,11 @@ import ( hw "github.com/jtolds/monkit-hw" "github.com/zeebo/admission/admproto" + "go.uber.org/zap" "gopkg.in/spacemonkeygo/monkit.v2" "gopkg.in/spacemonkeygo/monkit.v2/environment" + "storj.io/storj/pkg/identity" "storj.io/storj/pkg/telemetry" ) @@ -28,11 +30,19 @@ var ( "application suffix") ) -func initMetrics(ctx context.Context, r *monkit.Registry, instanceID string) ( +// InitMetrics initializes telemetry reporting. Makes a telemetry.Client and calls +// its Run() method in a goroutine. +func InitMetrics(ctx context.Context, r *monkit.Registry, instanceID string) ( err error) { if *metricCollector == "" || *metricInterval == 0 { return Error.New("telemetry disabled") } + if r == nil { + r = monkit.Default + } + if instanceID == "" { + instanceID = telemetry.DefaultInstanceID() + } c, err := telemetry.NewClient(*metricCollector, telemetry.ClientOpts{ Interval: *metricInterval, Application: *metricApp + *metricAppSuffix, @@ -48,3 +58,17 @@ func initMetrics(ctx context.Context, r *monkit.Registry, instanceID string) ( go c.Run(ctx) return nil } + +// InitMetricsWithCertPath initializes telemetry reporting, using the node ID +// corresponding to the given certificate as the telemetry instance ID. +func InitMetricsWithCertPath(ctx context.Context, r *monkit.Registry, certPath string) error { + var metricsID string + nodeID, err := identity.NodeIDFromCertPath(certPath) + if err != nil { + zap.S().Errorf("Could not read identity for telemetry setup: %v", err) + metricsID = "" // InitMetrics() will fill in a default value + } else { + metricsID = nodeID.String() + } + return InitMetrics(ctx, r, metricsID) +} diff --git a/pkg/telemetry/client.go b/pkg/telemetry/client.go index 735a6d7b7..21dd20337 100644 --- a/pkg/telemetry/client.go +++ b/pkg/telemetry/client.go @@ -103,6 +103,7 @@ func NewClient(remoteAddr string, opts ClientOpts) (rv *Client, err error) { // Run calls Report roughly every Interval func (c *Client) Run(ctx context.Context) { + zap.S().Debugf("Initialized telemetry batcher with id = %q", c.opts.InstanceId) for { time.Sleep(jitter(c.interval)) if ctx.Err() != nil {