crashcollect: process for receiving panics

Added a crashcollect server for tracking panics
on the parent process and saving them to a file.

Change-Id: I7926f9a16594227a3262e05d216199b7c2857385
This commit is contained in:
Qweder93 2021-04-26 22:00:05 +03:00
parent 02460fcc4c
commit a5c1e4b4a5
7 changed files with 360 additions and 13 deletions

114
cmd/crashcollect/main.go Normal file
View File

@ -0,0 +1,114 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package main
import (
"fmt"
"os"
"path/filepath"
"github.com/spf13/cobra"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/common/fpath"
"storj.io/private/cfgstruct"
"storj.io/private/process"
"storj.io/storj/crashcollect"
)
// Config defines storj crash collect service configuration.
//
// It embeds crashcollect.Config, so every field of that struct is promoted
// onto this type and bound as part of the command configuration.
type Config struct {
crashcollect.Config
}
func main() {
rootCmd := &cobra.Command{
Use: "crashcollect",
Short: "Crash collect service",
}
var runCfg Config
var setupCfg Config
var confDir string
var identityDir string
defaultConfDir := fpath.ApplicationDir("storj", "crashcollect")
defaultIdentityDir := fpath.ApplicationDir("storj", "identity", "crashcollect")
cfgstruct.SetupFlag(zap.L(), rootCmd, &confDir, "config-dir", defaultConfDir, "main directory for storj crash collect service configuration")
cfgstruct.SetupFlag(zap.L(), rootCmd, &identityDir, "identity-dir", defaultIdentityDir, "main directory for storj crash collect service identity credentials")
defaults := cfgstruct.DefaultsFlag(rootCmd)
runCmd := RunCommand(&runCfg)
setupCmd := SetupCommand(confDir)
rootCmd.AddCommand(runCmd)
rootCmd.AddCommand(setupCmd)
process.Bind(setupCmd, &setupCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir), cfgstruct.SetupMode())
process.Bind(runCmd, &runCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
process.ExecCustomDebug(rootCmd)
}
// RunCommand creates command for running crash collect.
//
// The returned "run" command loads the identity described by runCfg, builds a
// crashcollect peer from the configuration, runs it and finally closes it.
// Errors from running and from closing the peer are combined into one.
func RunCommand(runCfg *Config) *cobra.Command {
	return &cobra.Command{
		Use:   "run",
		Short: "Run the storj crash collect service",
		RunE: func(cmd *cobra.Command, args []string) error {
			ctx, _ := process.Ctx(cmd)
			logger := zap.L()

			// The debug address comes from the process-level flag rather than the config file.
			runCfg.Debug.Address = *process.DebugAddrFlag

			fullIdentity, err := runCfg.Identity.Load()
			if err != nil {
				logger.Error("failed to load identity.", zap.Error(err))
				return errs.New("failed to load identity: %+v", err)
			}

			peer, err := crashcollect.New(logger, fullIdentity, runCfg.Config)
			if err != nil {
				return err
			}

			// Operands are evaluated left to right: Run returns first, then Close is called.
			return errs.Combine(peer.Run(ctx), peer.Close())
		},
	}
}
// SetupCommand creates command for creating config file for crash collect service.
//
// The returned "setup" command resolves confDir to an absolute path, refuses
// to continue when the directory is not reported as a valid setup directory,
// creates the directory with mode 0700 and writes config.yaml into it.
func SetupCommand(confDir string) *cobra.Command {
	return &cobra.Command{
		Use:         "setup",
		Short:       "Create config files",
		Annotations: map[string]string{"type": "setup"},
		RunE: func(cmd *cobra.Command, args []string) error {
			absDir, err := filepath.Abs(confDir)
			if err != nil {
				return err
			}

			// Only the boolean result is consulted; the error from the check is discarded.
			if ok, _ := fpath.IsValidSetupDir(absDir); !ok {
				return fmt.Errorf("storj crash collect service configuration already exists (%v)", absDir)
			}

			if err := os.MkdirAll(absDir, 0700); err != nil {
				return err
			}

			configPath := filepath.Join(absDir, "config.yaml")
			return process.SaveConfig(cmd, configPath)
		},
	}
}

View File

@ -0,0 +1,17 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package crash
import (
"time"
"storj.io/common/storj"
)
// Crash holds information about storagenode crash.
type Crash struct {
// ID is a node ID — presumably that of the storagenode that crashed; confirm against callers.
ID storj.NodeID
// CompressedPanic is the panic payload in compressed form.
// NOTE(review): presumably gzip, matching the .gz files the service writes — confirm.
CompressedPanic []byte
// CrashedAt is the time associated with the crash.
CrashedAt time.Time
}

View File

@ -0,0 +1,48 @@
// Copyright (C) 2021 Storj Labs, Inc.
// See LICENSE for copying information.
package crash
import (
"context"
"go.uber.org/zap"
"storj.io/common/identity"
"storj.io/common/rpc/rpcstatus"
"storj.io/storj/private/crashreportpb"
)
// ensures that Endpoint implements crashreportpb.DRPCCrashReportServer.
var _ crashreportpb.DRPCCrashReportServer = (*Endpoint)(nil)
// Endpoint is a drpc controller for receiving crashes.
type Endpoint struct {
// crashes is the service that received reports are handed to.
crashes *Service
// log records failures that occur while handling a report.
log *zap.Logger
}
// NewEndpoint is a constructor for Endpoint.
// It stores the given logger and crash service on the new Endpoint.
func NewEndpoint(log *zap.Logger, crashes *Service) *Endpoint {
	endpoint := new(Endpoint)
	endpoint.crashes = crashes
	endpoint.log = log
	return endpoint
}
// Report is a drpc endpoint for receiving crashes.
//
// The caller's peer identity is taken from ctx and its node ID, together with
// the gzipped panic from the request, is passed to the crash service. Both a
// missing identity and a failure to store the report are answered with an
// rpcstatus.Internal error carrying the underlying error text.
func (endpoint *Endpoint) Report(ctx context.Context, r *crashreportpb.ReportRequest) (*crashreportpb.ReportResponse, error) {
	sender, err := identity.PeerIdentityFromContext(ctx)
	if err != nil {
		return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
	}

	if storeErr := endpoint.crashes.Report(sender.ID, r.GzippedPanic); storeErr != nil {
		endpoint.log.Error("could not create file with panic", zap.Error(storeErr))
		return nil, rpcstatus.Error(rpcstatus.Internal, storeErr.Error())
	}

	return new(crashreportpb.ReportResponse), nil
}

View File

@ -0,0 +1,59 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package crash
import (
"fmt"
"os"
"path"
"time"
"github.com/zeebo/errs"
"storj.io/common/storj"
)
// Config contains configurable values for crash collect service.
type Config struct {
// StoringDir is the directory crash report files are created in; the service
// joins it with the generated file name. Its default is the empty string.
StoringDir string `help:"directory to store crash reports" default:""`
}
// Error is the default error class for the crash collect Service; the service
// wraps the errors it returns with it.
var Error = errs.Class("crashes service")
// Service exposes all crash-collect business logic.
//
// architecture: service
type Service struct {
// config holds the service settings, including the directory reports are written to.
config Config
}
// NewService is a constructor for Service.
// The given configuration is stored on the new Service as-is.
func NewService(config Config) *Service {
	service := new(Service)
	service.config = config
	return service
}
// Report receives report from crash-report client and saves it into .gz file.
//
// The file is created inside the configured StoringDir and is named
// "<nodeID>-<UTC timestamp in RFC3339>.gz". The gzippedPanic bytes are written
// to it unchanged.
//
// The returned error is wrapped in the Error class and covers creating the
// file, writing to it and closing it. The result is named so that the deferred
// Close can contribute its error to the value the caller actually receives.
//
// NOTE(review): the timestamp has second resolution, so two reports from the
// same node within one second map to the same file name and os.Create
// truncates the earlier one.
func (s *Service) Report(nodeID storj.NodeID, gzippedPanic []byte) (err error) {
	now := time.Now().UTC()
	filename := fmt.Sprintf("%s-%s.gz", nodeID.String(), now.Format(time.RFC3339))

	f, err := os.Create(path.Join(s.config.StoringDir, filename))
	if err != nil {
		return Error.Wrap(err)
	}
	defer func() {
		// Assigning to the named result makes a failed Close visible to the caller.
		err = errs.Combine(err, Error.Wrap(f.Close()))
	}()

	_, err = f.Write(gzippedPanic)
	if err != nil {
		return Error.Wrap(err)
	}

	return nil
}

108
crashcollect/peer.go Normal file
View File

@ -0,0 +1,108 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package crashcollect
import (
"context"
"errors"
"github.com/zeebo/errs"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"storj.io/common/identity"
"storj.io/common/peertls/tlsopts"
"storj.io/private/debug"
"storj.io/storj/crashcollect/crash"
"storj.io/storj/private/crashreportpb"
"storj.io/storj/private/server"
)
// Config is the global configuration for storj crash collect service.
type Config struct {
// Debug is the debug configuration.
Debug debug.Config
// Server is the configuration the peer's server is created from.
Server server.Config
// Crash is the configuration passed to the crash service.
Crash crash.Config
// Identity is the identity configuration.
Identity identity.Config
}
// Peer is the representation of a storj crash collect service.
//
// architecture: Peer
type Peer struct {
// Log is the logger the peer and its endpoint use.
Log *zap.Logger
// Config is the configuration the peer was constructed with.
Config Config
// Identity is the full identity the peer's TLS options are built from.
Identity *identity.FullIdentity
// Server is the server the crash report endpoint is registered on; it may be
// nil when construction did not get as far as creating it.
Server *server.Server
// Crash groups the crash reporting service and the endpoint exposing it.
Crash struct {
Service *crash.Service
Endpoint *crash.Endpoint
}
}
// New is a constructor for storj crash collect Peer.
//
// It wires the crash service and its endpoint, builds TLS options from the
// given identity, creates the server from config.Server and registers the
// crash report endpoint on the server's DRPC mux.
//
// On failure a nil peer and the error are returned. Once the server has been
// created, any later failure closes the partially constructed peer first, and
// the error from closing is combined with the original one.
func New(log *zap.Logger, full *identity.FullIdentity, config Config) (peer *Peer, err error) {
	peer = &Peer{
		Log:      log,
		Config:   config,
		Identity: full,
	}

	peer.Crash.Service = crash.NewService(peer.Config.Crash)
	peer.Crash.Endpoint = crash.NewEndpoint(peer.Log, peer.Crash.Service)

	tlsConfig := tlsopts.Config{
		UsePeerCAWhitelist: false,
		PeerIDVersions:     "0",
	}

	tlsOptions, err := tlsopts.NewOptions(peer.Identity, tlsConfig, nil)
	if err != nil {
		return nil, err
	}

	peer.Server, err = server.New(log.Named("server"), tlsOptions, config.Server)
	if err != nil {
		return nil, errs.Combine(err, peer.Close())
	}

	err = crashreportpb.DRPCRegisterCrashReport(peer.Server.DRPC(), peer.Crash.Endpoint)
	if err != nil {
		// The server already exists here, so close it instead of abandoning it.
		return nil, errs.Combine(err, peer.Close())
	}

	peer.Log.Info("id = ", zap.Any("", full.ID.String()))

	return peer, nil
}
// Run runs storj crash collect Peer api until it's either closed or it errors.
//
// The server is run inside an errgroup whose context is derived from ctx.
// A context.Canceled result from the server is treated as a clean shutdown
// and reported as nil.
func (peer *Peer) Run(ctx context.Context) error {
	workers, workerCtx := errgroup.WithContext(ctx)

	// start storj crash collect web api drpc server as a separate goroutine.
	workers.Go(func() error {
		serveErr := peer.Server.Run(workerCtx)
		return ignoreCancel(serveErr)
	})

	return workers.Wait()
}
// Close closes all the resources.
//
// Currently that is only the server; when no server has been created there is
// nothing to release and nil is returned.
func (peer *Peer) Close() error {
	if peer.Server == nil {
		return nil
	}
	return peer.Server.Close()
}
// ignoreCancel maps context.Canceled (including errors wrapping it) to nil and
// returns every other error, nil included, unchanged.
func ignoreCancel(err error) error {
	if !errors.Is(err, context.Canceled) {
		return err
	}
	return nil
}

View File

@ -22,7 +22,7 @@ var _ = math.Inf
const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package
type ReportRequest struct { type ReportRequest struct {
Crash []byte `protobuf:"bytes,1,opt,name=crash,proto3" json:"crash,omitempty"` GzippedPanic []byte `protobuf:"bytes,1,opt,name=gzipped_panic,json=gzippedPanic,proto3" json:"gzipped_panic,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"` XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"` XXX_sizecache int32 `json:"-"`
@ -52,9 +52,9 @@ func (m *ReportRequest) XXX_DiscardUnknown() {
var xxx_messageInfo_ReportRequest proto.InternalMessageInfo var xxx_messageInfo_ReportRequest proto.InternalMessageInfo
func (m *ReportRequest) GetCrash() []byte { func (m *ReportRequest) GetGzippedPanic() []byte {
if m != nil { if m != nil {
return m.Crash return m.GzippedPanic
} }
return nil return nil
} }
@ -97,15 +97,16 @@ func init() {
func init() { proto.RegisterFile("crashreport.proto", fileDescriptor_0c640f4432300a07) } func init() { proto.RegisterFile("crashreport.proto", fileDescriptor_0c640f4432300a07) }
var fileDescriptor_0c640f4432300a07 = []byte{ var fileDescriptor_0c640f4432300a07 = []byte{
// 149 bytes of a gzipped FileDescriptorProto // 166 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x12, 0x4c, 0x2e, 0x4a, 0x2c, 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x12, 0x4c, 0x2e, 0x4a, 0x2c,
0xce, 0x28, 0x4a, 0x2d, 0xc8, 0x2f, 0x2a, 0xd1, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x05, 0xce, 0x28, 0x4a, 0x2d, 0xc8, 0x2f, 0x2a, 0xd1, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x05,
0x0b, 0x29, 0xa9, 0x72, 0xf1, 0x06, 0x81, 0x85, 0x83, 0x52, 0x0b, 0x4b, 0x53, 0x8b, 0x4b, 0x84, 0x0b, 0x29, 0x99, 0x70, 0xf1, 0x06, 0x81, 0x85, 0x83, 0x52, 0x0b, 0x4b, 0x53, 0x8b, 0x4b, 0x84,
0x44, 0xb8, 0x20, 0x32, 0x12, 0x8c, 0x0a, 0x8c, 0x1a, 0x3c, 0x41, 0x50, 0x65, 0x02, 0x5c, 0x7c, 0x94, 0xb9, 0x78, 0xd3, 0xab, 0x32, 0x0b, 0x0a, 0x52, 0x53, 0xe2, 0x0b, 0x12, 0xf3, 0x32, 0x93,
0x30, 0x65, 0xc5, 0x05, 0xf9, 0x79, 0xc5, 0xa9, 0x46, 0x6e, 0x5c, 0xdc, 0xce, 0x20, 0x29, 0x88, 0x25, 0x18, 0x15, 0x18, 0x35, 0x78, 0x82, 0x78, 0xa0, 0x82, 0x01, 0x20, 0x31, 0x25, 0x01, 0x2e,
0xb0, 0x90, 0x39, 0x17, 0x1b, 0x94, 0x25, 0xa2, 0x07, 0xd6, 0xa2, 0x87, 0x62, 0xac, 0x94, 0x28, 0x3e, 0x98, 0xae, 0xe2, 0x82, 0xfc, 0xbc, 0xe2, 0x54, 0x23, 0x37, 0x2e, 0x6e, 0x67, 0x90, 0x81,
0x9a, 0x28, 0xc4, 0x14, 0x25, 0x06, 0x27, 0xb5, 0x28, 0x95, 0xe2, 0x92, 0xfc, 0xa2, 0x2c, 0xbd, 0x10, 0x61, 0x21, 0x73, 0x2e, 0x36, 0x28, 0x4b, 0x44, 0x0f, 0x6c, 0x91, 0x1e, 0x8a, 0x2d, 0x52,
0xcc, 0x7c, 0x7d, 0x30, 0x43, 0xbf, 0xa0, 0x28, 0xb3, 0x2c, 0xb1, 0x24, 0x55, 0x1f, 0xc9, 0xcd, 0xa2, 0x68, 0xa2, 0x10, 0x53, 0x94, 0x18, 0x9c, 0xd4, 0xa2, 0x54, 0x8a, 0x4b, 0xf2, 0x8b, 0xb2,
0x05, 0x49, 0x49, 0x6c, 0x60, 0x67, 0x1b, 0x03, 0x02, 0x00, 0x00, 0xff, 0xff, 0xdf, 0xed, 0xec, 0xf4, 0x32, 0xf3, 0xf5, 0xc1, 0x0c, 0xfd, 0x82, 0xa2, 0xcc, 0xb2, 0xc4, 0x92, 0x54, 0x7d, 0x24,
0x6f, 0xcb, 0x00, 0x00, 0x00, 0x2f, 0x14, 0x24, 0x25, 0xb1, 0x81, 0x7d, 0x61, 0x0c, 0x08, 0x00, 0x00, 0xff, 0xff, 0x5b, 0xac,
0xda, 0xa6, 0xda, 0x00, 0x00, 0x00,
} }

View File

@ -11,7 +11,7 @@ service CrashReport {
} }
message ReportRequest { message ReportRequest {
bytes crash = 1; bytes gzipped_panic = 1;
} }
message ReportResponse {} message ReportResponse {}