From 22b1fe4e21e4a0ee122ec8f03106ce25cbc667a1 Mon Sep 17 00:00:00 2001 From: JT Olio Date: Wed, 30 May 2018 08:03:44 -0600 Subject: [PATCH] pkg/process: add pkg/telemetry plumbing (#47) * pkg/process: add pkg/telemetry plumbing * pkg/process: add debug endpoints * fix linting --- examples/metric-receiver/main.go | 35 ++++++++++++++++ examples/metric-sender/main.go | 21 ++++++++++ pkg/overlay/service.go | 3 ++ pkg/process/debug.go | 43 ++++++++++++++++++++ pkg/process/func.go | 27 +++++++++++++ pkg/process/metrics.go | 47 ++++++++++++++++++++++ pkg/process/process.go | 69 +++++++++++++++++++++++++++----- pkg/telemetry/client.go | 18 +-------- pkg/telemetry/utils.go | 22 ++++++++++ 9 files changed, 258 insertions(+), 27 deletions(-) create mode 100644 examples/metric-receiver/main.go create mode 100644 examples/metric-sender/main.go create mode 100644 pkg/process/debug.go create mode 100644 pkg/process/func.go create mode 100644 pkg/process/metrics.go diff --git a/examples/metric-receiver/main.go b/examples/metric-receiver/main.go new file mode 100644 index 000000000..13b956766 --- /dev/null +++ b/examples/metric-receiver/main.go @@ -0,0 +1,35 @@ +// Copyright (C) 2018 Storj Labs, Inc. +// See LICENSE for copying information. + +package main + +import ( + "context" + "flag" + "fmt" + + "storj.io/storj/pkg/process" + "storj.io/storj/pkg/telemetry" +) + +var ( + addr = flag.String("addr", ":9000", "address to listen for metrics on") +) + +func main() { + process.Must(process.Main(process.ServiceFunc(run))) +} + +func run(ctx context.Context) error { + s, err := telemetry.Listen(*addr) + if err != nil { + return err + } + defer s.Close() + fmt.Printf("listening on %s\n", s.Addr()) + return s.Serve(ctx, telemetry.HandlerFunc(handle)) +} + +func handle(application, instance string, key []byte, val float64) { + fmt.Printf("%s %s %s %v\n", application, instance, string(key), val) +} diff --git a/examples/metric-sender/main.go b/examples/metric-sender/main.go new file mode 100644 index 000000000..b8497a7ef --- /dev/null +++ b/examples/metric-sender/main.go @@ -0,0 +1,21 @@ +// Copyright (C) 2018 Storj Labs, Inc. +// See LICENSE for copying information. + +package main + +import ( + "context" + "flag" + + "storj.io/storj/pkg/process" +) + +func main() { + flag.Set("metrics.interval", "1s") + process.Must(process.Main(process.ServiceFunc(run))) +} + +func run(ctx context.Context) error { + // just go to sleep and let the background telemetry start sending + select {} +} diff --git a/pkg/overlay/service.go b/pkg/overlay/service.go index 421b1043f..2315ab93c 100644 --- a/pkg/overlay/service.go +++ b/pkg/overlay/service.go @@ -101,3 +101,6 @@ func (s *Service) SetMetricHandler(m *monkit.Registry) error { s.metrics = m return nil } + +// InstanceID implements Service.InstanceID +func (s *Service) InstanceID() string { return "" } diff --git a/pkg/process/debug.go b/pkg/process/debug.go new file mode 100644 index 000000000..c67bead82 --- /dev/null +++ b/pkg/process/debug.go @@ -0,0 +1,43 @@ +// Copyright (C) 2018 Storj Labs, Inc. +// See LICENSE for copying information. + +package process + +import ( + "context" + "net" + "net/http" + "net/http/pprof" + + "go.uber.org/zap" + monkit "gopkg.in/spacemonkeygo/monkit.v2" + "gopkg.in/spacemonkeygo/monkit.v2/present" +) + +func init() { + // zero out the http.DefaultServeMux net/http/pprof so unhelpfully + // side-effected. + *http.DefaultServeMux = http.ServeMux{} +} + +func initDebug(ctx context.Context, logger *zap.Logger, r *monkit.Registry) ( + err error) { + var mux http.ServeMux + mux.HandleFunc("/debug/pprof/", pprof.Index) + mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) + mux.HandleFunc("/debug/pprof/profile", pprof.Profile) + mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol) + mux.HandleFunc("/debug/pprof/trace", pprof.Trace) + mux.Handle("/mon/", http.StripPrefix("/mon", present.HTTP(r))) + ln, err := net.Listen("tcp", "localhost:0") + if err != nil { + return err + } + go func() { + err := (&http.Server{Handler: &mux}).Serve(ln) + if err != nil { + logger.Error("debug server died", zap.Error(err)) + } + }() + return nil +} diff --git a/pkg/process/func.go b/pkg/process/func.go new file mode 100644 index 000000000..f96f555bd --- /dev/null +++ b/pkg/process/func.go @@ -0,0 +1,27 @@ +// Copyright (C) 2018 Storj Labs, Inc. +// See LICENSE for copying information. + +package process + +import ( + "context" + + "go.uber.org/zap" + monkit "gopkg.in/spacemonkeygo/monkit.v2" +) + +// ServiceFunc allows one to implement a Service in terms of simply the Process +// method +type ServiceFunc func(context.Context) error + +// Process implements the Service interface and simply calls f +func (f ServiceFunc) Process(ctx context.Context) error { return f(ctx) } + +// SetLogger implements the Service interface but is a no-op +func (f ServiceFunc) SetLogger(*zap.Logger) error { return nil } + +// SetMetricHandler implements the Service interface but is a no-op +func (f ServiceFunc) SetMetricHandler(*monkit.Registry) error { return nil } + +// InstanceID implements the Service interface and expects default behavior +func (f ServiceFunc) InstanceID() string { return "" } diff --git a/pkg/process/metrics.go b/pkg/process/metrics.go new file mode 100644 index 000000000..4e9386351 --- /dev/null +++ b/pkg/process/metrics.go @@ -0,0 +1,47 @@ +// Copyright (C) 2018 Storj Labs, Inc. +// See LICENSE for copying information. + +package process + +import ( + "context" + "flag" + "os" + "path/filepath" + + "github.com/zeebo/admission/admproto" + "gopkg.in/spacemonkeygo/monkit.v2" + "gopkg.in/spacemonkeygo/monkit.v2/environment" + "storj.io/storj/pkg/telemetry" +) + +var ( + metricInterval = flag.Duration("metrics.interval", telemetry.DefaultInterval, + "how frequently to send up telemetry") + metricCollector = flag.String("metrics.addr", "collector.storj.io:9000", + "address to send telemetry to") + metricApp = flag.String("metrics.app", filepath.Base(os.Args[0]), + "application name for telemetry identification") + metricAppSuffix = flag.String("metrics.app_suffix", "-dev", + "application suffix") +) + +func initMetrics(ctx context.Context, r *monkit.Registry, instanceID string) ( + err error) { + if *metricCollector == "" || *metricInterval == 0 { + return Error.New("telemetry disabled") + } + c, err := telemetry.NewClient(*metricCollector, telemetry.ClientOpts{ + Interval: *metricInterval, + Application: *metricApp + *metricAppSuffix, + Instance: instanceID, + Registry: r, + FloatEncoding: admproto.Float32Encoding, + }) + if err != nil { + return err + } + environment.Register(r) + go c.Run(ctx) + return nil +} diff --git a/pkg/process/process.go b/pkg/process/process.go index dd6e99a06..f50dd7f4d 100644 --- a/pkg/process/process.go +++ b/pkg/process/process.go @@ -6,45 +6,92 @@ package process import ( "context" "flag" + "log" - "github.com/google/uuid" + "github.com/spacemonkeygo/flagfile" + "github.com/zeebo/errs" "go.uber.org/zap" monkit "gopkg.in/spacemonkeygo/monkit.v2" + "storj.io/storj/pkg/telemetry" "storj.io/storj/pkg/utils" ) +var ( + logDisposition = flag.String("log.disp", "prod", + "switch to 'dev' to get more output") + + // Error is a process error class + Error = errs.Class("ProcessError") +) + // ID is the type used to specify a ID key in the process context type ID string // Service defines the interface contract for all Storj services type Service interface { + // Process should run the program Process(context.Context) error + SetLogger(*zap.Logger) error SetMetricHandler(*monkit.Registry) error + + // InstanceID should return a server or process instance identifier that is + // stable across restarts, or the empty string to use the first non-nil + // MAC address + InstanceID() string } -var ( +const ( id ID = "SrvID" ) // Main initializes a new Service -func Main(s Service) error { - flag.Parse() - ctx := context.Background() - uid := uuid.New().String() +func Main(s Service) (err error) { + flagfile.Load() - logger, err := utils.NewLogger("", zap.Fields(zap.String("SrvID", uid))) + ctx := context.Background() + + instanceID := s.InstanceID() + if instanceID == "" { + instanceID = telemetry.DefaultInstanceID() + } + + ctx, cf := context.WithCancel(context.WithValue(ctx, id, instanceID)) + defer cf() + + registry := monkit.Default + scope := registry.ScopeNamed("process") + defer scope.TaskNamed("main")(&ctx)(&err) + + logger, err := utils.NewLogger(*logDisposition, + zap.Fields(zap.String(string(id), instanceID))) if err != nil { return err } defer logger.Sync() - - ctx, cf := context.WithCancel(context.WithValue(ctx, id, uid)) - defer cf() + defer zap.ReplaceGlobals(logger)() + defer zap.RedirectStdLog(logger)() s.SetLogger(logger) - s.SetMetricHandler(monkit.NewRegistry()) + s.SetMetricHandler(registry) + + err = initMetrics(ctx, registry, instanceID) + if err != nil { + logger.Error("failed to configure telemetry", zap.Error(err)) + } + + err = initDebug(ctx, logger, registry) + if err != nil { + logger.Error("failed to start debug endpoints", zap.Error(err)) + } return s.Process(ctx) } + +// Must can be used for default Main error handling +func Must(err error) { + if err != nil { + log.Fatal(err) + } +} diff --git a/pkg/telemetry/client.go b/pkg/telemetry/client.go index fd4f1235c..4d36e8e61 100644 --- a/pkg/telemetry/client.go +++ b/pkg/telemetry/client.go @@ -5,7 +5,6 @@ package telemetry import ( "context" - "net" "os" "time" @@ -37,7 +36,7 @@ type ClientOpts struct { Application string // Instance is a string that identifies this particular server. Could be a - // node id, but defaults to the first non-nil MAC address + // node id, but defaults to the result of DefaultInstanceId() Instance string // PacketSize controls how we fragment the data as it goes out in UDP @@ -74,20 +73,7 @@ func NewClient(remoteAddr string, opts ClientOpts) (rv *Client, err error) { } } if opts.Instance == "" { - // instance by default is the first non-nil mac address - ifaces, err := net.Interfaces() - if err != nil { - return nil, err - } - for _, iface := range ifaces { - if iface.HardwareAddr != nil { - opts.Instance = iface.HardwareAddr.String() - break - } - } - if opts.Instance == "" { - opts.Instance = "unknown" - } + opts.Instance = DefaultInstanceID() } if opts.Registry == nil { opts.Registry = monkit.Default diff --git a/pkg/telemetry/utils.go b/pkg/telemetry/utils.go index ed70acf80..fa32d9cf9 100644 --- a/pkg/telemetry/utils.go +++ b/pkg/telemetry/utils.go @@ -4,10 +4,16 @@ package telemetry import ( + "log" "math/rand" + "net" "time" ) +const ( + unknownInstanceID = "unknown" +) + func jitter(t time.Duration) time.Duration { nanos := rand.NormFloat64()*float64(t/4) + float64(t) if nanos <= 0 { @@ -15,3 +21,19 @@ func jitter(t time.Duration) time.Duration { } return time.Duration(nanos) } + +// DefaultInstanceID will return the first non-nil mac address if possible, +// unknown otherwise. +func DefaultInstanceID() string { + ifaces, err := net.Interfaces() + if err != nil { + log.Printf("failed to determine default instance id: %v", err) + return unknownInstanceID + } + for _, iface := range ifaces { + if iface.HardwareAddr != nil { + return iface.HardwareAddr.String() + } + } + return unknownInstanceID +}