storagenode: accept HTTP calls on public port, listening for monitoring requests

Today each storagenode should have a port which is opened for the internet, and handles DRPC protocol calls.

When we do a HTTP call on the DRPC endpoint, it hangs until a timeout.

This patch changes the behavior: the main DRPC port of the storagenodes can accept HTTP requests and can be used to monitor the status of the node:

 * if returns with HTTP 200 only if the storagnode is healthy (not suspended / disqualified + online score > 0.9)
 * it CAN include information about the current status (per satellite). It's opt-in, you should configure it so.

In this way it becomes extremely easy to monitor storagenodes with external uptime services.

Note: this patch exposes some information which was not easily available before (especially the node status, and used satellites). I think it should be acceptable:

 * Until having more community satellites, all storagenodes are connected to the main Storj satellites.
 * With community satellites, it's good thing to have more transparency (easy way to check who is connected to which satellites)

The implementation is based on this line:

```
http.Serve(NewPrefixedListener([]byte("GET / HT"), publicMux.Route("GET / HT")), p.public.http)
```

This line answers to the TCP requests with `GET / HT...` (GET HTTP request to the route), but puts back the removed prefix.

Change-Id: I3700c7e24524850825ecdf75a4bcc3b4afcb3a74
This commit is contained in:
Márton Elek 2022-08-23 12:28:41 +02:00 committed by Storj Robot
parent 07bbe7d340
commit 7e71986493
6 changed files with 249 additions and 1 deletions

51
private/server/prefix.go Normal file
View File

@ -0,0 +1,51 @@
// Copyright (C) 2022 Storj Labs, Inc.
// See LICENSE for copying information.
package server
import (
"bytes"
"io"
"net"
)
type prefixConn struct {
io.Reader
net.Conn
}
func newPrefixConn(data []byte, conn net.Conn) *prefixConn {
return &prefixConn{
Reader: io.MultiReader(bytes.NewReader(data), conn),
Conn: conn,
}
}
func (pc *prefixConn) Read(p []byte) (n int, err error) {
return pc.Reader.Read(p)
}
// PrefixedListener injects prefix bytes to the beginning of every new connection.
type PrefixedListener struct {
net.Listener
prefix []byte
}
// NewPrefixedListener creates a new PrefixedListener.
func NewPrefixedListener(prefix []byte, listener net.Listener) net.Listener {
return &PrefixedListener{
Listener: listener,
prefix: prefix,
}
}
// Accept implements function of net.Listener.
func (p *PrefixedListener) Accept() (net.Conn, error) {
conn, err := p.Listener.Accept()
if err != nil {
return conn, err
}
return newPrefixConn(p.prefix, conn), nil
}
var _ net.Listener = &PrefixedListener{}

View File

@ -8,6 +8,7 @@ import (
"crypto/tls" "crypto/tls"
"errors" "errors"
"net" "net"
"net/http"
"os" "os"
"runtime" "runtime"
"sync" "sync"
@ -17,6 +18,7 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
"storj.io/common/errs2"
"storj.io/common/identity" "storj.io/common/identity"
"storj.io/common/peertls/tlsopts" "storj.io/common/peertls/tlsopts"
"storj.io/common/rpc" "storj.io/common/rpc"
@ -48,6 +50,9 @@ type public struct {
disableQUIC bool disableQUIC bool
drpc *drpcserver.Server drpc *drpcserver.Server
// http fallback for the public endpoint
http http.HandlerFunc
mux *drpcmux.Mux mux *drpcmux.Mux
} }
@ -139,6 +144,11 @@ func (p *Server) Close() error {
return nil return nil
} }
// AddHTTPFallback adds http fallback to the drpc endpoint.
func (p *Server) AddHTTPFallback(httpHandler http.HandlerFunc) {
p.public.http = httpHandler
}
// Run will run the server and all of its services. // Run will run the server and all of its services.
func (p *Server) Run(ctx context.Context) (err error) { func (p *Server) Run(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err) defer mon.Task()(&ctx)(&err)
@ -165,10 +175,15 @@ func (p *Server) Run(ctx context.Context) (err error) {
var ( var (
publicMux *drpcmigrate.ListenMux publicMux *drpcmigrate.ListenMux
publicDRPCListener net.Listener publicDRPCListener net.Listener
publicHTTPListener net.Listener
) )
if p.public.tcpListener != nil { if p.public.tcpListener != nil {
publicMux = drpcmigrate.NewListenMux(p.public.tcpListener, len(drpcmigrate.DRPCHeader)) publicMux = drpcmigrate.NewListenMux(p.public.tcpListener, len(drpcmigrate.DRPCHeader))
publicDRPCListener = tls.NewListener(publicMux.Route(drpcmigrate.DRPCHeader), p.tlsOptions.ServerTLSConfig()) publicDRPCListener = tls.NewListener(publicMux.Route(drpcmigrate.DRPCHeader), p.tlsOptions.ServerTLSConfig())
if p.public.http != nil {
publicHTTPListener = NewPrefixedListener([]byte("GET / HT"), publicMux.Route("GET / HT"))
}
} }
if p.public.udpConn != nil { if p.public.udpConn != nil {
@ -228,6 +243,26 @@ func (p *Server) Run(ctx context.Context) (err error) {
}) })
} }
if publicHTTPListener != nil {
// this http server listens on the filtered messages of the incoming DRPC port, instead of a separated port
httpServer := http.Server{
Handler: p.public.http,
}
group.Go(func() error {
<-ctx.Done()
return httpServer.Shutdown(context.Background())
})
group.Go(func() error {
defer cancel()
err := httpServer.Serve(publicHTTPListener)
if errs2.IsCanceled(err) || errors.Is(err, http.ErrServerClosed) {
err = nil
}
return err
})
}
group.Go(func() error { group.Go(func() error {
defer cancel() defer cancel()
return p.private.drpc.Serve(ctx, privateDRPCListener) return p.private.drpc.Serve(ctx, privateDRPCListener)

View File

@ -0,0 +1,10 @@
// Copyright (C) 2022 Storj Labs, Inc.
// See LICENSE for copying information.
package healthcheck
// Config is the configuration for healthcheck service and endpoint.
type Config struct {
Details bool `user:"true" help:"Enable additional details about the satellite connections via the HTTP healthcheck." default:"false"`
Enabled bool `user:"true" help:"Provide health endpoint (including suspension/audit failures) on main public port, but HTTP protocol." default:"true"`
}

View File

@ -0,0 +1,46 @@
// Copyright (C) 2022 Storj Labs, Inc.
// See LICENSE for copying information.
package healthcheck
import (
"encoding/json"
"net/http"
)
// Endpoint handles HTTP request for health endpoint.
type Endpoint struct {
service *Service
}
// NewEndpoint creates a new HTTP endpoint.
func NewEndpoint(service *Service) *Endpoint {
return &Endpoint{
service: service,
}
}
// HandleHTTP manages the HTTP conversion for the function call.
func (e *Endpoint) HandleHTTP(writer http.ResponseWriter, request *http.Request) {
health, err := e.service.GetHealth(request.Context())
if err != nil {
writer.WriteHeader(http.StatusInternalServerError)
_, _ = writer.Write([]byte(err.Error()))
return
}
out, err := json.MarshalIndent(health, "", " ")
if err != nil {
writer.WriteHeader(http.StatusInternalServerError)
_, _ = writer.Write([]byte(err.Error()))
return
}
if health.AllHealthy {
writer.WriteHeader(http.StatusOK)
} else {
writer.WriteHeader(http.StatusServiceUnavailable)
}
_, _ = writer.Write(out)
}

View File

@ -0,0 +1,88 @@
// Copyright (C) 2022 Storj Labs, Inc.
// See LICENSE for copying information.
package healthcheck
import (
"context"
"time"
"github.com/spacemonkeygo/monkit/v3"
"github.com/zeebo/errs"
"storj.io/common/storj"
"storj.io/storj/storagenode/reputation"
)
var (
// Err defines sno service error.
Err = errs.Class("healthcheck")
mon = monkit.Package()
)
// Service is handling storage node estimation payouts logic.
//
// architecture: Service
type Service struct {
reputationDB reputation.DB
serveDetails bool
}
// NewService returns new instance of Service.
func NewService(reputationDB reputation.DB, serveDetails bool) *Service {
return &Service{
reputationDB: reputationDB,
serveDetails: serveDetails,
}
}
// Health represents the current status of the Storage ndoe.
type Health struct {
Statuses []SatelliteHealthStatus
Help string
AllHealthy bool
}
// SatelliteHealthStatus is the health status reported by one satellite.
type SatelliteHealthStatus struct {
OnlineScore float64
SatelliteID storj.NodeID
DisqualifiedAt *time.Time
SuspendedAt *time.Time
}
// GetHealth retrieves current health status based on DB records.
func (s *Service) GetHealth(ctx context.Context) (h Health, err error) {
defer mon.Task()(&ctx)(&err)
stats, err := s.reputationDB.All(ctx)
h.AllHealthy = true
if err != nil {
return h, Err.Wrap(err)
}
for _, stat := range stats {
if stat.DisqualifiedAt != nil || stat.SuspendedAt != nil || stat.OnlineScore < 0.9 {
h.AllHealthy = false
}
if s.serveDetails {
h.Statuses = append(h.Statuses, SatelliteHealthStatus{
SatelliteID: stat.SatelliteID,
OnlineScore: stat.OnlineScore,
DisqualifiedAt: stat.DisqualifiedAt,
SuspendedAt: stat.SuspendedAt,
})
}
}
// sg is wrong if we didn't connect to any satellite
if len(stats) == 0 {
h.AllHealthy = false
}
h.Help = "To access Storagenode services, please use DRPC protocol!"
return h, nil
}

View File

@ -40,6 +40,7 @@ import (
"storj.io/storj/storagenode/console/consoleserver" "storj.io/storj/storagenode/console/consoleserver"
"storj.io/storj/storagenode/contact" "storj.io/storj/storagenode/contact"
"storj.io/storj/storagenode/gracefulexit" "storj.io/storj/storagenode/gracefulexit"
"storj.io/storj/storagenode/healthcheck"
"storj.io/storj/storagenode/inspector" "storj.io/storj/storagenode/inspector"
"storj.io/storj/storagenode/internalpb" "storj.io/storj/storagenode/internalpb"
"storj.io/storj/storagenode/monitor" "storj.io/storj/storagenode/monitor"
@ -127,6 +128,8 @@ type Config struct {
Console consoleserver.Config Console consoleserver.Config
Healthcheck healthcheck.Config
Version checker.Config Version checker.Config
Bandwidth bandwidth.Config Bandwidth bandwidth.Config
@ -212,6 +215,11 @@ type Peer struct {
Service *checker.Service Service *checker.Service
} }
Healthcheck struct {
Service *healthcheck.Service
Endpoint *healthcheck.Endpoint
}
Debug struct { Debug struct {
Listener net.Listener Listener net.Listener
Server *debug.Server Server *debug.Server
@ -351,6 +359,12 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten
}) })
} }
{
peer.Healthcheck.Service = healthcheck.NewService(peer.DB.Reputation(), config.Healthcheck.Details)
peer.Healthcheck.Endpoint = healthcheck.NewEndpoint(peer.Healthcheck.Service)
}
{ // setup listener and server { // setup listener and server
sc := config.Server sc := config.Server
@ -366,6 +380,10 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten
return nil, errs.Combine(err, peer.Close()) return nil, errs.Combine(err, peer.Close())
} }
if config.Healthcheck.Enabled {
peer.Server.AddHTTPFallback(peer.Healthcheck.Endpoint.HandleHTTP)
}
peer.Servers.Add(lifecycle.Item{ peer.Servers.Add(lifecycle.Item{
Name: "server", Name: "server",
Run: func(ctx context.Context) error { Run: func(ctx context.Context) error {