From 7e7198649301d915d1e6100285ca56ffd05369ed Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?M=C3=A1rton=20Elek?= Date: Tue, 23 Aug 2022 12:28:41 +0200 Subject: [PATCH] storagenode: accept HTTP calls on public port, listening for monitoring requests Today each storagenode should have a port which is opened for the internet, and handles DRPC protocol calls. When we do a HTTP call on the DRPC endpoint, it hangs until a timeout. This patch changes the behavior: the main DRPC port of the storagenodes can accept HTTP requests and can be used to monitor the status of the node: * if returns with HTTP 200 only if the storagnode is healthy (not suspended / disqualified + online score > 0.9) * it CAN include information about the current status (per satellite). It's opt-in, you should configure it so. In this way it becomes extremely easy to monitor storagenodes with external uptime services. Note: this patch exposes some information which was not easily available before (especially the node status, and used satellites). I think it should be acceptable: * Until having more community satellites, all storagenodes are connected to the main Storj satellites. * With community satellites, it's good thing to have more transparency (easy way to check who is connected to which satellites) The implementation is based on this line: ``` http.Serve(NewPrefixedListener([]byte("GET / HT"), publicMux.Route("GET / HT")), p.public.http) ``` This line answers to the TCP requests with `GET / HT...` (GET HTTP request to the route), but puts back the removed prefix. Change-Id: I3700c7e24524850825ecdf75a4bcc3b4afcb3a74 --- private/server/prefix.go | 51 +++++++++++++++++ private/server/server.go | 37 +++++++++++- storagenode/healthcheck/config.go | 10 ++++ storagenode/healthcheck/endpoint.go | 46 +++++++++++++++ storagenode/healthcheck/service.go | 88 +++++++++++++++++++++++++++++ storagenode/peer.go | 18 ++++++ 6 files changed, 249 insertions(+), 1 deletion(-) create mode 100644 private/server/prefix.go create mode 100644 storagenode/healthcheck/config.go create mode 100644 storagenode/healthcheck/endpoint.go create mode 100644 storagenode/healthcheck/service.go diff --git a/private/server/prefix.go b/private/server/prefix.go new file mode 100644 index 000000000..69abbb918 --- /dev/null +++ b/private/server/prefix.go @@ -0,0 +1,51 @@ +// Copyright (C) 2022 Storj Labs, Inc. +// See LICENSE for copying information. + +package server + +import ( + "bytes" + "io" + "net" +) + +type prefixConn struct { + io.Reader + net.Conn +} + +func newPrefixConn(data []byte, conn net.Conn) *prefixConn { + return &prefixConn{ + Reader: io.MultiReader(bytes.NewReader(data), conn), + Conn: conn, + } +} + +func (pc *prefixConn) Read(p []byte) (n int, err error) { + return pc.Reader.Read(p) +} + +// PrefixedListener injects prefix bytes to the beginning of every new connection. +type PrefixedListener struct { + net.Listener + prefix []byte +} + +// NewPrefixedListener creates a new PrefixedListener. +func NewPrefixedListener(prefix []byte, listener net.Listener) net.Listener { + return &PrefixedListener{ + Listener: listener, + prefix: prefix, + } +} + +// Accept implements function of net.Listener. +func (p *PrefixedListener) Accept() (net.Conn, error) { + conn, err := p.Listener.Accept() + if err != nil { + return conn, err + } + return newPrefixConn(p.prefix, conn), nil +} + +var _ net.Listener = &PrefixedListener{} diff --git a/private/server/server.go b/private/server/server.go index 5a282d47f..6d9760a00 100644 --- a/private/server/server.go +++ b/private/server/server.go @@ -8,6 +8,7 @@ import ( "crypto/tls" "errors" "net" + "net/http" "os" "runtime" "sync" @@ -17,6 +18,7 @@ import ( "go.uber.org/zap" "golang.org/x/sync/errgroup" + "storj.io/common/errs2" "storj.io/common/identity" "storj.io/common/peertls/tlsopts" "storj.io/common/rpc" @@ -48,7 +50,10 @@ type public struct { disableQUIC bool drpc *drpcserver.Server - mux *drpcmux.Mux + // http fallback for the public endpoint + http http.HandlerFunc + + mux *drpcmux.Mux } type private struct { @@ -139,6 +144,11 @@ func (p *Server) Close() error { return nil } +// AddHTTPFallback adds http fallback to the drpc endpoint. +func (p *Server) AddHTTPFallback(httpHandler http.HandlerFunc) { + p.public.http = httpHandler +} + // Run will run the server and all of its services. func (p *Server) Run(ctx context.Context) (err error) { defer mon.Task()(&ctx)(&err) @@ -165,10 +175,15 @@ func (p *Server) Run(ctx context.Context) (err error) { var ( publicMux *drpcmigrate.ListenMux publicDRPCListener net.Listener + publicHTTPListener net.Listener ) if p.public.tcpListener != nil { publicMux = drpcmigrate.NewListenMux(p.public.tcpListener, len(drpcmigrate.DRPCHeader)) publicDRPCListener = tls.NewListener(publicMux.Route(drpcmigrate.DRPCHeader), p.tlsOptions.ServerTLSConfig()) + + if p.public.http != nil { + publicHTTPListener = NewPrefixedListener([]byte("GET / HT"), publicMux.Route("GET / HT")) + } } if p.public.udpConn != nil { @@ -228,6 +243,26 @@ func (p *Server) Run(ctx context.Context) (err error) { }) } + if publicHTTPListener != nil { + // this http server listens on the filtered messages of the incoming DRPC port, instead of a separated port + httpServer := http.Server{ + Handler: p.public.http, + } + + group.Go(func() error { + <-ctx.Done() + return httpServer.Shutdown(context.Background()) + }) + group.Go(func() error { + defer cancel() + err := httpServer.Serve(publicHTTPListener) + if errs2.IsCanceled(err) || errors.Is(err, http.ErrServerClosed) { + err = nil + } + return err + }) + } + group.Go(func() error { defer cancel() return p.private.drpc.Serve(ctx, privateDRPCListener) diff --git a/storagenode/healthcheck/config.go b/storagenode/healthcheck/config.go new file mode 100644 index 000000000..e6d366ba3 --- /dev/null +++ b/storagenode/healthcheck/config.go @@ -0,0 +1,10 @@ +// Copyright (C) 2022 Storj Labs, Inc. +// See LICENSE for copying information. + +package healthcheck + +// Config is the configuration for healthcheck service and endpoint. +type Config struct { + Details bool `user:"true" help:"Enable additional details about the satellite connections via the HTTP healthcheck." default:"false"` + Enabled bool `user:"true" help:"Provide health endpoint (including suspension/audit failures) on main public port, but HTTP protocol." default:"true"` +} diff --git a/storagenode/healthcheck/endpoint.go b/storagenode/healthcheck/endpoint.go new file mode 100644 index 000000000..195bea285 --- /dev/null +++ b/storagenode/healthcheck/endpoint.go @@ -0,0 +1,46 @@ +// Copyright (C) 2022 Storj Labs, Inc. +// See LICENSE for copying information. + +package healthcheck + +import ( + "encoding/json" + "net/http" +) + +// Endpoint handles HTTP request for health endpoint. +type Endpoint struct { + service *Service +} + +// NewEndpoint creates a new HTTP endpoint. +func NewEndpoint(service *Service) *Endpoint { + return &Endpoint{ + service: service, + } +} + +// HandleHTTP manages the HTTP conversion for the function call. +func (e *Endpoint) HandleHTTP(writer http.ResponseWriter, request *http.Request) { + health, err := e.service.GetHealth(request.Context()) + if err != nil { + writer.WriteHeader(http.StatusInternalServerError) + _, _ = writer.Write([]byte(err.Error())) + return + } + + out, err := json.MarshalIndent(health, "", " ") + if err != nil { + writer.WriteHeader(http.StatusInternalServerError) + _, _ = writer.Write([]byte(err.Error())) + return + } + + if health.AllHealthy { + writer.WriteHeader(http.StatusOK) + } else { + writer.WriteHeader(http.StatusServiceUnavailable) + } + + _, _ = writer.Write(out) +} diff --git a/storagenode/healthcheck/service.go b/storagenode/healthcheck/service.go new file mode 100644 index 000000000..ac0d2503a --- /dev/null +++ b/storagenode/healthcheck/service.go @@ -0,0 +1,88 @@ +// Copyright (C) 2022 Storj Labs, Inc. +// See LICENSE for copying information. + +package healthcheck + +import ( + "context" + "time" + + "github.com/spacemonkeygo/monkit/v3" + "github.com/zeebo/errs" + + "storj.io/common/storj" + "storj.io/storj/storagenode/reputation" +) + +var ( + // Err defines sno service error. + Err = errs.Class("healthcheck") + + mon = monkit.Package() +) + +// Service is handling storage node estimation payouts logic. +// +// architecture: Service +type Service struct { + reputationDB reputation.DB + serveDetails bool +} + +// NewService returns new instance of Service. +func NewService(reputationDB reputation.DB, serveDetails bool) *Service { + return &Service{ + reputationDB: reputationDB, + serveDetails: serveDetails, + } +} + +// Health represents the current status of the Storage ndoe. +type Health struct { + Statuses []SatelliteHealthStatus + Help string + AllHealthy bool +} + +// SatelliteHealthStatus is the health status reported by one satellite. +type SatelliteHealthStatus struct { + OnlineScore float64 + SatelliteID storj.NodeID + DisqualifiedAt *time.Time + SuspendedAt *time.Time +} + +// GetHealth retrieves current health status based on DB records. +func (s *Service) GetHealth(ctx context.Context) (h Health, err error) { + defer mon.Task()(&ctx)(&err) + stats, err := s.reputationDB.All(ctx) + + h.AllHealthy = true + + if err != nil { + return h, Err.Wrap(err) + } + for _, stat := range stats { + if stat.DisqualifiedAt != nil || stat.SuspendedAt != nil || stat.OnlineScore < 0.9 { + h.AllHealthy = false + } + + if s.serveDetails { + h.Statuses = append(h.Statuses, SatelliteHealthStatus{ + SatelliteID: stat.SatelliteID, + OnlineScore: stat.OnlineScore, + DisqualifiedAt: stat.DisqualifiedAt, + SuspendedAt: stat.SuspendedAt, + }) + } + } + + // sg is wrong if we didn't connect to any satellite + if len(stats) == 0 { + h.AllHealthy = false + } + + h.Help = "To access Storagenode services, please use DRPC protocol!" + + return h, nil +} diff --git a/storagenode/peer.go b/storagenode/peer.go index 832366ec4..fdc39e93c 100644 --- a/storagenode/peer.go +++ b/storagenode/peer.go @@ -40,6 +40,7 @@ import ( "storj.io/storj/storagenode/console/consoleserver" "storj.io/storj/storagenode/contact" "storj.io/storj/storagenode/gracefulexit" + "storj.io/storj/storagenode/healthcheck" "storj.io/storj/storagenode/inspector" "storj.io/storj/storagenode/internalpb" "storj.io/storj/storagenode/monitor" @@ -127,6 +128,8 @@ type Config struct { Console consoleserver.Config + Healthcheck healthcheck.Config + Version checker.Config Bandwidth bandwidth.Config @@ -212,6 +215,11 @@ type Peer struct { Service *checker.Service } + Healthcheck struct { + Service *healthcheck.Service + Endpoint *healthcheck.Endpoint + } + Debug struct { Listener net.Listener Server *debug.Server @@ -351,6 +359,12 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten }) } + { + + peer.Healthcheck.Service = healthcheck.NewService(peer.DB.Reputation(), config.Healthcheck.Details) + peer.Healthcheck.Endpoint = healthcheck.NewEndpoint(peer.Healthcheck.Service) + } + { // setup listener and server sc := config.Server @@ -366,6 +380,10 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten return nil, errs.Combine(err, peer.Close()) } + if config.Healthcheck.Enabled { + peer.Server.AddHTTPFallback(peer.Healthcheck.Endpoint.HandleHTTP) + } + peer.Servers.Add(lifecycle.Item{ Name: "server", Run: func(ctx context.Context) error {