cmd/storagenode: refactor main.go

The cmd/storagenode/main.go is a big mess right now with so many
unneeded config structures initialized and shared by several
subcommands.

There are many instances where the config structure of one subcommand
is mistakenly used for another subcommand.

This changes is an attempt to clean up the main.go by moving the
subcommands to a separate `cmd_*.go` files with separate config structures
for each subcommand.

Resolves https://github.com/storj/storj/issues/5756

Change-Id: I85adf2439acba271c023c269739f7fa3c6d49f9d
This commit is contained in:
Clement Sam 2023-04-05 21:05:07 +00:00
parent 45a8ac7f57
commit 3cf89633e9
14 changed files with 737 additions and 480 deletions

View File

@ -0,0 +1,52 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package main
import (
"os"
"path/filepath"
"github.com/spf13/cobra"
"storj.io/common/fpath"
"storj.io/private/cfgstruct"
"storj.io/private/process"
)
func newConfigCmd(f *Factory) *cobra.Command {
var cfg setupCfg
cmd := &cobra.Command{
Use: "config",
Short: "Edit config files",
RunE: func(cmd *cobra.Command, args []string) error {
setupDir, err := filepath.Abs(f.ConfDir)
if err != nil {
return err
}
cfg.SetupDir = setupDir
return cmdConfig(cmd, &cfg)
},
Annotations: map[string]string{"type": "setup"},
}
process.Bind(cmd, &cfg, f.Defaults, cfgstruct.ConfDir(f.ConfDir), cfgstruct.IdentityDir(f.IdentityDir), cfgstruct.SetupMode())
return cmd
}
func cmdConfig(cmd *cobra.Command, cfg *setupCfg) (err error) {
setupDir, err := filepath.Abs(cfg.SetupDir)
if err != nil {
return err
}
// run setup if we can't access the config file
conf := filepath.Join(setupDir, "config.yaml")
if _, err := os.Stat(conf); err != nil {
return cmdSetup(cmd, cfg)
}
return fpath.EditFile(conf)
}

View File

@ -1,4 +1,4 @@
// Copyright (C) 2019 Storj Labs, Inc.
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package main
@ -17,8 +17,10 @@ import (
"github.com/spf13/cobra"
"go.uber.org/zap"
"storj.io/common/identity"
"storj.io/common/memory"
"storj.io/common/rpc"
"storj.io/private/cfgstruct"
"storj.io/private/process"
"storj.io/private/version"
"storj.io/storj/storagenode/internalpb"
@ -30,6 +32,29 @@ type dashboardClient struct {
conn *rpc.Conn
}
type dashboardCfg struct {
Address string `default:"127.0.0.1:7778" help:"address for dashboard service"`
UseColor bool `internal:"true"`
}
func newDashboardCmd(f *Factory) *cobra.Command {
var cfg dashboardCfg
cmd := &cobra.Command{
Use: "dashboard",
Short: "Run the dashboard",
RunE: func(cmd *cobra.Command, args []string) error {
cfg.UseColor = f.UseColor
return cmdDashboard(cmd, &cfg)
},
}
process.Bind(cmd, &cfg, cfgstruct.ConfDir(f.ConfDir), cfgstruct.IdentityDir(f.IdentityDir))
return cmd
}
func dialDashboardClient(ctx context.Context, address string) (*dashboardClient, error) {
conn, err := rpc.NewDefaultDialer(nil).DialAddressUnencrypted(ctx, address)
if err != nil {
@ -46,17 +71,20 @@ func (dash *dashboardClient) close() error {
return dash.conn.Close()
}
func cmdDashboard(cmd *cobra.Command, args []string) (err error) {
func cmdDashboard(cmd *cobra.Command, cfg *dashboardCfg) (err error) {
ctx, _ := process.Ctx(cmd)
ident, err := runCfg.Identity.Load()
// TDDO: move to dashboardCfg to allow setting CertPath and KeyPath
var identityCfg identity.Config
ident, err := identityCfg.Load()
if err != nil {
zap.L().Fatal("Failed to load identity.", zap.Error(err))
} else {
zap.L().Info("Identity loaded.", zap.Stringer("Node ID", ident.ID))
}
client, err := dialDashboardClient(ctx, dashboardCfg.Address)
client, err := dialDashboardClient(ctx, cfg.Address)
if err != nil {
return err
}
@ -72,7 +100,7 @@ func cmdDashboard(cmd *cobra.Command, args []string) (err error) {
return err
}
if err := printDashboard(data); err != nil {
if err := printDashboard(cfg, data); err != nil {
return err
}
@ -81,10 +109,10 @@ func cmdDashboard(cmd *cobra.Command, args []string) (err error) {
}
}
func printDashboard(data *internalpb.DashboardResponse) error {
func printDashboard(cfg *dashboardCfg, data *internalpb.DashboardResponse) error {
clearScreen()
var warnFlag bool
color.NoColor = !useColor
color.NoColor = !cfg.UseColor
heading := color.New(color.FgGreen, color.Bold)
_, _ = heading.Printf("\nStorage Node Dashboard ( Node Version: %s )\n", version.Build.Version.String())
@ -130,7 +158,7 @@ func printDashboard(data *internalpb.DashboardResponse) error {
w = tabwriter.NewWriter(color.Output, 0, 0, 1, ' ', 0)
// TODO: Get addresses from server data
fmt.Fprintf(w, "Internal\t%s\n", color.WhiteString(dashboardCfg.Address))
fmt.Fprintf(w, "Internal\t%s\n", color.WhiteString(cfg.Address))
fmt.Fprintf(w, "External\t%s\n", color.WhiteString(data.GetExternalAddress()))
// Disabling the Link to the Dashboard as its not working yet
// fmt.Fprintf(w, "Dashboard\t%s\n", color.WhiteString(data.GetDashboardAddress()))

103
cmd/storagenode/cmd_diag.go Normal file
View File

@ -0,0 +1,103 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package main
import (
"fmt"
"os"
"path/filepath"
"sort"
"text/tabwriter"
"time"
"github.com/spf13/cobra"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/common/memory"
"storj.io/common/storj"
"storj.io/private/cfgstruct"
"storj.io/private/process"
"storj.io/storj/storagenode"
"storj.io/storj/storagenode/storagenodedb"
)
type diagCfg struct {
storagenode.Config
DiagDir string `internal:"true"`
}
func newDiagCmd(f *Factory) *cobra.Command {
var diagCfg diagCfg
cmd := &cobra.Command{
Use: "diag",
Short: "Diagnostic Tool support",
RunE: func(cmd *cobra.Command, args []string) error {
diagDir, err := filepath.Abs(f.ConfDir)
if err != nil {
return err
}
diagCfg.DiagDir = diagDir
return cmdDiag(cmd, &diagCfg)
},
Annotations: map[string]string{"type": "helper"},
}
process.Bind(cmd, &diagCfg, f.Defaults, cfgstruct.ConfDir(f.ConfDir), cfgstruct.IdentityDir(f.IdentityDir))
return cmd
}
func cmdDiag(cmd *cobra.Command, cfg *diagCfg) (err error) {
ctx, _ := process.Ctx(cmd)
// check if the directory exists
_, err = os.Stat(cfg.DiagDir)
if err != nil {
fmt.Println("storage node directory doesn't exist", cfg.DiagDir)
return err
}
db, err := storagenodedb.OpenExisting(ctx, zap.L().Named("db"), cfg.DatabaseConfig())
if err != nil {
return errs.New("Error starting master database on storage node: %v", err)
}
defer func() {
err = errs.Combine(err, db.Close())
}()
summaries, err := db.Bandwidth().SummaryBySatellite(ctx, time.Time{}, time.Now())
if err != nil {
fmt.Printf("unable to get bandwidth summary: %v\n", err)
return err
}
satellites := storj.NodeIDList{}
for id := range summaries {
satellites = append(satellites, id)
}
sort.Sort(satellites)
w := tabwriter.NewWriter(os.Stdout, 0, 0, 3, ' ', tabwriter.AlignRight|tabwriter.Debug)
defer func() { err = errs.Combine(err, w.Flush()) }()
fmt.Fprint(w, "Satellite\tTotal\tPut\tGet\tDelete\tAudit Get\tRepair Get\tRepair Put\n")
for _, id := range satellites {
summary := summaries[id]
fmt.Fprintf(w, "%v\t%v\t%v\t%v\t%v\t%v\t%v\t%v\n",
id,
memory.Size(summary.Total()),
memory.Size(summary.Put),
memory.Size(summary.Get),
memory.Size(summary.Delete),
memory.Size(summary.GetAudit),
memory.Size(summary.GetRepair),
memory.Size(summary.PutRepair),
)
}
return nil
}

View File

@ -19,12 +19,53 @@ import (
"storj.io/common/memory"
"storj.io/common/rpc"
"storj.io/common/storj"
"storj.io/private/cfgstruct"
"storj.io/private/process"
"storj.io/storj/private/date"
"storj.io/storj/private/prompt"
"storj.io/storj/storagenode"
"storj.io/storj/storagenode/internalpb"
)
type gracefulExitCfg struct {
storagenode.Config
}
func newGracefulExitInitCmd(f *Factory) *cobra.Command {
var cfg gracefulExitCfg
cmd := &cobra.Command{
Use: "exit-satellite",
Short: "Initiate graceful exit",
Long: "Initiate gracefule exit.\n" +
"The command shows the list of the available satellites that can be exited " +
"and ask for choosing one.",
RunE: func(cmd *cobra.Command, args []string) error {
return cmdGracefulExitInit(cmd, &cfg)
},
Annotations: map[string]string{"type": "helper"},
}
process.Bind(cmd, &cfg, f.Defaults, cfgstruct.ConfDir(f.ConfDir))
return cmd
}
func newGracefulExitStatusCmd(f *Factory) *cobra.Command {
var cfg gracefulExitCfg
cmd := &cobra.Command{
Use: "exit-status",
Short: "Display graceful exit status",
RunE: func(cmd *cobra.Command, args []string) error {
return cmdGracefulExitStatus(cmd, &cfg)
},
Annotations: map[string]string{"type": "helper"},
}
process.Bind(cmd, &cfg, f.Defaults, cfgstruct.ConfDir(f.ConfDir))
return cmd
}
type gracefulExitClient struct {
conn *rpc.Conn
}
@ -62,10 +103,10 @@ func (client *gracefulExitClient) close() error {
return client.conn.Close()
}
func cmdGracefulExitInit(cmd *cobra.Command, args []string) error {
func cmdGracefulExitInit(cmd *cobra.Command, cfg *gracefulExitCfg) error {
ctx, _ := process.Ctx(cmd)
ident, err := runCfg.Identity.Load()
ident, err := cfg.Identity.Load()
if err != nil {
zap.L().Fatal("Failed to load identity.", zap.Error(err))
} else {
@ -81,7 +122,7 @@ func cmdGracefulExitInit(cmd *cobra.Command, args []string) error {
return nil
}
client, err := dialGracefulExitClient(ctx, diagCfg.Server.PrivateAddress)
client, err := dialGracefulExitClient(ctx, cfg.Server.PrivateAddress)
if err != nil {
return errs.Wrap(err)
}
@ -139,17 +180,17 @@ func cmdGracefulExitInit(cmd *cobra.Command, args []string) error {
return gracefulExitInit(ctx, satelliteIDs, w, client)
}
func cmdGracefulExitStatus(cmd *cobra.Command, args []string) (err error) {
func cmdGracefulExitStatus(cmd *cobra.Command, cfg *gracefulExitCfg) (err error) {
ctx, _ := process.Ctx(cmd)
ident, err := runCfg.Identity.Load()
ident, err := cfg.Identity.Load()
if err != nil {
zap.L().Fatal("Failed to load identity.", zap.Error(err))
} else {
zap.L().Info("Identity loaded.", zap.Stringer("Node ID", ident.ID))
}
client, err := dialGracefulExitClient(ctx, diagCfg.Server.PrivateAddress)
client, err := dialGracefulExitClient(ctx, cfg.Server.PrivateAddress)
if err != nil {
return errs.Wrap(err)
}

View File

@ -0,0 +1,68 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package main
import (
"fmt"
"github.com/spf13/cobra"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/private/cfgstruct"
"storj.io/private/process"
"storj.io/storj/storagenode"
"storj.io/storj/storagenode/apikeys"
"storj.io/storj/storagenode/storagenodedb"
)
type issueCfg struct {
storagenode.Config
}
func newIssueAPIKeyCmd(f *Factory) *cobra.Command {
var cfg issueCfg
cmd := &cobra.Command{
Use: "issue-apikey",
Short: "Issue a new api key",
RunE: func(cmd *cobra.Command, args []string) error {
return cmdIssue(cmd, &cfg)
},
}
process.Bind(cmd, &cfg, f.Defaults, cfgstruct.ConfDir(f.ConfDir), cfgstruct.IdentityDir(f.IdentityDir))
return cmd
}
func cmdIssue(cmd *cobra.Command, cfg *issueCfg) (err error) {
ctx, _ := process.Ctx(cmd)
ident, err := cfg.Identity.Load()
if err != nil {
zap.L().Fatal("Failed to load identity.", zap.Error(err))
} else {
zap.L().Info("Identity loaded.", zap.Stringer("Node ID", ident.ID))
}
db, err := storagenodedb.OpenExisting(ctx, zap.L().Named("db"), cfg.DatabaseConfig())
if err != nil {
return errs.New("Error starting master database on storage node: %v", err)
}
defer func() {
err = errs.Combine(err, db.Close())
}()
service := apikeys.NewService(db.APIKeys())
apiKey, err := service.Issue(ctx)
if err != nil {
return errs.New("Error while trying to issue new api key: %v", err)
}
fmt.Println(apiKey.Secret.String())
return
}

View File

@ -0,0 +1,112 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package main
import (
"encoding/json"
"fmt"
"github.com/spf13/cobra"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/private/cfgstruct"
"storj.io/private/process"
"storj.io/storj/multinode/nodes"
"storj.io/storj/storagenode"
"storj.io/storj/storagenode/apikeys"
"storj.io/storj/storagenode/storagenodedb"
)
type nodeInfoCfg struct {
storagenode.Config
JSON bool `default:"false" help:"print node info in JSON format"`
}
func newNodeInfoCmd(f *Factory) *cobra.Command {
var cfg nodeInfoCfg
cmd := &cobra.Command{
Use: "info",
Short: "Print storage node info",
Long: `Print storage node info.
--json should be specified to print output in JSON format.
It is expected that the JSON output will mostly be piped to 'multinode add -'.
WARNING: The output includes the api secret of the storagenode.
`,
RunE: func(cmd *cobra.Command, args []string) error {
return cmdInfo(cmd, &cfg)
},
Example: `
#=> print node info
$ storagenode info --config-dir '<path/to/config-dir>' --identity-dir '<path/to/identity-dir>'
#=> print output in JSON format
$ storagenode info --json --config-dir '<path/to/config-dir>' --identity-dir '<path/to/identity-dir>'
#=> add node to multinode dashboard
$ storagenode info --json --config-dir '<path/to/config-dir>' --identity-dir '<path/to/identity-dir>' | multinode add -
`,
Args: cobra.ExactArgs(0),
}
process.Bind(cmd, &cfg, f.Defaults, cfgstruct.ConfDir(f.ConfDir), cfgstruct.IdentityDir(f.IdentityDir))
return cmd
}
func cmdInfo(cmd *cobra.Command, cfg *nodeInfoCfg) (err error) {
ctx, _ := process.Ctx(cmd)
// TODO(clement): add support for getting info for all available storagenodes
identity, err := cfg.Identity.Load()
if err != nil {
zap.L().Fatal("Failed to load identity.", zap.Error(err))
} else {
zap.L().Info("Identity loaded.", zap.Stringer("Node ID", identity.ID))
}
db, err := storagenodedb.OpenExisting(ctx, zap.L().Named("db"), cfg.DatabaseConfig())
if err != nil {
return errs.New("error starting master database on storage node: %v", err)
}
defer func() {
err = errs.Combine(err, db.Close())
}()
service := apikeys.NewService(db.APIKeys())
apiKey, err := service.Issue(ctx)
if err != nil {
return errs.New("error while trying to issue new api key: %v", err)
}
if cfg.JSON {
node := nodes.Node{
ID: identity.ID,
APISecret: apiKey.Secret,
PublicAddress: cfg.Contact.ExternalAddress,
}
data, err := json.Marshal(node)
if err != nil {
return err
}
fmt.Println(string(data))
return nil
}
fmt.Printf(`
ID: %s
API Secret: %s
Public Address: %s
`, identity.ID, apiKey.Secret, cfg.Contact.ExternalAddress)
return nil
}

124
cmd/storagenode/cmd_run.go Normal file
View File

@ -0,0 +1,124 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package main
import (
"github.com/spf13/cobra"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/private/cfgstruct"
"storj.io/private/process"
"storj.io/private/version"
"storj.io/storj/private/revocation"
"storj.io/storj/storagenode"
"storj.io/storj/storagenode/storagenodedb"
)
// runCfg defines configuration for run command.
type runCfg struct {
StorageNodeFlags
}
// newRunCmd creates a new run command.
func newRunCmd(f *Factory) *cobra.Command {
var runCfg runCfg
cmd := &cobra.Command{
Use: "run",
Short: "Run the storagenode",
RunE: func(cmd *cobra.Command, args []string) error {
return cmdRun(cmd, &runCfg)
},
}
process.Bind(cmd, &runCfg, f.Defaults, cfgstruct.ConfDir(f.ConfDir), cfgstruct.IdentityDir(f.IdentityDir))
return cmd
}
func cmdRun(cmd *cobra.Command, cfg *runCfg) (err error) {
// inert constructors only ====
ctx, _ := process.Ctx(cmd)
log := zap.L()
cfg.Debug.Address = *process.DebugAddrFlag
mapDeprecatedConfigs(log, &cfg.StorageNodeFlags)
identity, err := cfg.Identity.Load()
if err != nil {
log.Error("Failed to load identity.", zap.Error(err))
return errs.New("Failed to load identity: %+v", err)
}
if err := cfg.Verify(log); err != nil {
log.Error("Invalid configuration.", zap.Error(err))
return err
}
db, err := storagenodedb.OpenExisting(ctx, log.Named("db"), cfg.DatabaseConfig())
if err != nil {
return errs.New("Error starting master database on storagenode: %+v", err)
}
defer func() {
err = errs.Combine(err, db.Close())
}()
revocationDB, err := revocation.OpenDBFromCfg(ctx, cfg.Server.Config)
if err != nil {
return errs.New("Error creating revocation database: %+v", err)
}
defer func() {
err = errs.Combine(err, revocationDB.Close())
}()
peer, err := storagenode.New(log, identity, db, revocationDB, cfg.Config, version.Build, process.AtomicLevel(cmd))
if err != nil {
return err
}
// okay, start doing stuff ====
_, err = peer.Version.Service.CheckVersion(ctx)
if err != nil {
return err
}
if err := process.InitMetricsWithCertPath(ctx, log, nil, cfg.Identity.CertPath); err != nil {
log.Warn("Failed to initialize telemetry batcher.", zap.Error(err))
}
err = db.MigrateToLatest(ctx)
if err != nil {
return errs.New("Error creating tables for master database on storagenode: %+v", err)
}
err = db.CheckVersion(ctx)
if err != nil {
return errs.New("Error checking version for storagenode database: %+v", err)
}
preflightEnabled, err := cmd.Flags().GetBool("preflight.database-check")
if err != nil {
return errs.New("Cannot retrieve preflight.database-check flag: %+v", err)
}
if preflightEnabled {
err = db.Preflight(ctx)
if err != nil {
return errs.New("Error during preflight check for storagenode databases: %+v", err)
}
}
if err := peer.Storage2.CacheService.Init(ctx); err != nil {
log.Error("Failed to initialize CacheService.", zap.Error(err))
}
runError := peer.Run(ctx)
closeError := peer.Close()
return errs.Combine(runError, closeError)
}

View File

@ -0,0 +1,105 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package main
import (
"fmt"
"os"
"path/filepath"
"github.com/spf13/cobra"
"go.uber.org/zap"
"storj.io/common/fpath"
"storj.io/private/cfgstruct"
"storj.io/private/process"
"storj.io/storj/storagenode/storagenodedb"
)
const (
defaultServerAddr = ":28967"
defaultPrivateServerAddr = "127.0.0.1:7778"
)
type setupCfg struct {
StorageNodeFlags
SetupDir string `internal:"true" help:"path to setup directory"`
}
func newSetupCmd(f *Factory) *cobra.Command {
var setupCfg setupCfg
cmd := &cobra.Command{
Use: "setup",
Short: "Create config files",
RunE: func(cmd *cobra.Command, args []string) error {
setupDir, err := filepath.Abs(f.ConfDir)
if err != nil {
return err
}
setupCfg.SetupDir = setupDir
return cmdSetup(cmd, &setupCfg)
},
Annotations: map[string]string{"type": "setup"},
}
process.Bind(cmd, &setupCfg, f.Defaults, cfgstruct.ConfDir(f.ConfDir), cfgstruct.IdentityDir(f.IdentityDir), cfgstruct.SetupMode())
return cmd
}
func cmdSetup(cmd *cobra.Command, cfg *setupCfg) (err error) {
ctx, _ := process.Ctx(cmd)
valid, _ := fpath.IsValidSetupDir(cfg.SetupDir)
if !valid {
return fmt.Errorf("storagenode configuration already exists (%v)", cfg.SetupDir)
}
identity, err := cfg.Identity.Load()
if err != nil {
return err
}
err = os.MkdirAll(cfg.SetupDir, 0700)
if err != nil {
return err
}
overrides := map[string]interface{}{
"log.level": "info",
}
serverAddress := cmd.Flag("server.address")
if !serverAddress.Changed {
overrides[serverAddress.Name] = defaultServerAddr
}
serverPrivateAddress := cmd.Flag("server.private-address")
if !serverPrivateAddress.Changed {
overrides[serverPrivateAddress.Name] = defaultPrivateServerAddr
}
configFile := filepath.Join(cfg.SetupDir, "config.yaml")
err = process.SaveConfig(cmd, configFile, process.SaveConfigWithOverrides(overrides))
if err != nil {
return err
}
if cfg.EditConf {
return fpath.EditFile(configFile)
}
// create db
db, err := storagenodedb.OpenNew(ctx, zap.L().Named("db"), cfg.DatabaseConfig())
if err != nil {
return err
}
if err := db.Pieces().CreateVerificationFile(ctx, identity.ID); err != nil {
return err
}
return db.Close()
}

View File

@ -29,7 +29,11 @@ type Deprecated struct {
}
// maps deprecated config values to new values if applicable.
func mapDeprecatedConfigs(log *zap.Logger) {
func mapDeprecatedConfigs(log *zap.Logger, cfg *StorageNodeFlags) {
if cfg == nil {
return
}
type migration struct {
newValue interface{}
newConfigString string
@ -38,27 +42,27 @@ func mapDeprecatedConfigs(log *zap.Logger) {
}
migrations := []migration{
{
newValue: &runCfg.Contact.ExternalAddress,
newValue: &cfg.Contact.ExternalAddress,
newConfigString: "contact.external-address",
oldValue: runCfg.Deprecated.Kademlia.ExternalAddress,
oldValue: cfg.Deprecated.Kademlia.ExternalAddress,
oldConfigString: "kademlia.external-address",
},
{
newValue: &runCfg.Operator.Wallet,
newValue: &cfg.Operator.Wallet,
newConfigString: "operator.wallet",
oldValue: runCfg.Deprecated.Kademlia.Operator.Wallet,
oldValue: cfg.Deprecated.Kademlia.Operator.Wallet,
oldConfigString: "kademlia.operator.wallet",
},
{
newValue: &runCfg.Operator.Email,
newValue: &cfg.Operator.Email,
newConfigString: "operator.email",
oldValue: runCfg.Deprecated.Kademlia.Operator.Email,
oldValue: cfg.Deprecated.Kademlia.Operator.Email,
oldConfigString: "kademlia.operator.email",
},
{
newValue: &runCfg.Config.Storage2.Monitor.VerifyDirReadableInterval,
newValue: &cfg.Config.Storage2.Monitor.VerifyDirReadableInterval,
newConfigString: "storage2.monitor.verify-dir-readable-interval",
oldValue: runCfg.Deprecated.Storage2.Monitor.VerifyDirInterval,
oldValue: cfg.Deprecated.Storage2.Monitor.VerifyDirInterval,
oldConfigString: "storage2.monitor.verify-dir-interval",
},
}

View File

@ -4,473 +4,21 @@
package main
import (
"encoding/json"
"fmt"
"os"
"path/filepath"
"sort"
"text/tabwriter"
"time"
"github.com/spf13/cobra"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/common/fpath"
"storj.io/common/memory"
"storj.io/common/storj"
"storj.io/private/cfgstruct"
"storj.io/private/process"
"storj.io/private/version"
"storj.io/storj/multinode/nodes"
"storj.io/storj/private/revocation"
_ "storj.io/storj/private/version" // This attaches version information during release builds.
"storj.io/storj/storagenode"
"storj.io/storj/storagenode/apikeys"
"storj.io/storj/storagenode/storagenodedb"
)
// StorageNodeFlags defines storage node configuration.
type StorageNodeFlags struct {
EditConf bool `default:"false" help:"open config in default editor"`
storagenode.Config
Deprecated
}
var (
rootCmd = &cobra.Command{
Use: "storagenode",
Short: "StorageNode",
}
runCmd = &cobra.Command{
Use: "run",
Short: "Run the storagenode",
RunE: cmdRun,
}
setupCmd = &cobra.Command{
Use: "setup",
Short: "Create config files",
RunE: cmdSetup,
Annotations: map[string]string{"type": "setup"},
}
configCmd = &cobra.Command{
Use: "config",
Short: "Edit config files",
RunE: cmdConfig,
Annotations: map[string]string{"type": "setup"},
}
diagCmd = &cobra.Command{
Use: "diag",
Short: "Diagnostic Tool support",
RunE: cmdDiag,
Annotations: map[string]string{"type": "helper"},
}
dashboardCmd = &cobra.Command{
Use: "dashboard",
Short: "Display a dashboard",
RunE: cmdDashboard,
Annotations: map[string]string{"type": "helper"},
}
gracefulExitInitCmd = &cobra.Command{
Use: "exit-satellite",
Short: "Initiate graceful exit",
Long: "Initiate gracefule exit.\n" +
"The command shows the list of the available satellites that can be exited " +
"and ask for choosing one.",
RunE: cmdGracefulExitInit,
Annotations: map[string]string{"type": "helper"},
}
gracefulExitStatusCmd = &cobra.Command{
Use: "exit-status",
Short: "Display graceful exit status",
RunE: cmdGracefulExitStatus,
Annotations: map[string]string{"type": "helper"},
}
issueAPITokenCmd = &cobra.Command{
Use: "issue-apikey",
Short: "Issue apikey for multinode",
RunE: cmdIssue,
}
nodeInfoCmd = &cobra.Command{
Use: "info",
Short: "Print storage node info",
Long: `Print storage node info.
--json should be specified to print output in JSON format.
It is expected that the JSON output will mostly be piped to 'multinode add -'.
WARNING: The output includes the api secret of the storagenode.
`,
RunE: cmdInfo,
Example: `
#=> print node info
$ storagenode info --config-dir '<path/to/config-dir>' --identity-dir '<path/to/identity-dir>'
#=> print output in JSON format
$ storagenode info --json --config-dir '<path/to/config-dir>' --identity-dir '<path/to/identity-dir>'
#=> add node to multinode dashboard
$ storagenode info --json --config-dir '<path/to/config-dir>' --identity-dir '<path/to/identity-dir>' | multinode add -
`,
Args: cobra.ExactArgs(0),
}
runCfg StorageNodeFlags
setupCfg StorageNodeFlags
diagCfg storagenode.Config
nodeInfoCfg struct {
storagenode.Config
JSON bool `default:"false" help:"print node info in JSON format"`
}
dashboardCfg struct {
Address string `default:"127.0.0.1:7778" help:"address for dashboard service"`
}
defaultDiagDir string
confDir string
identityDir string
useColor bool
)
const (
defaultServerAddr = ":28967"
defaultPrivateServerAddr = "127.0.0.1:7778"
)
func init() {
process.SetHardcodedApplicationName("storagenode")
defaultConfDir := fpath.ApplicationDir("storj", "storagenode")
defaultIdentityDir := fpath.ApplicationDir("storj", "identity", "storagenode")
defaultDiagDir = filepath.Join(defaultConfDir, "storage")
cfgstruct.SetupFlag(zap.L(), rootCmd, &confDir, "config-dir", defaultConfDir, "main directory for storagenode configuration")
cfgstruct.SetupFlag(zap.L(), rootCmd, &identityDir, "identity-dir", defaultIdentityDir, "main directory for storagenode identity credentials")
defaults := cfgstruct.DefaultsFlag(rootCmd)
rootCmd.PersistentFlags().BoolVar(&useColor, "color", false, "use color in user interface")
rootCmd.AddCommand(runCmd)
rootCmd.AddCommand(setupCmd)
rootCmd.AddCommand(configCmd)
rootCmd.AddCommand(diagCmd)
rootCmd.AddCommand(dashboardCmd)
rootCmd.AddCommand(gracefulExitInitCmd)
rootCmd.AddCommand(gracefulExitStatusCmd)
rootCmd.AddCommand(issueAPITokenCmd)
rootCmd.AddCommand(nodeInfoCmd)
process.Bind(runCmd, &runCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
process.Bind(setupCmd, &setupCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir), cfgstruct.SetupMode())
process.Bind(configCmd, &setupCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir), cfgstruct.SetupMode())
process.Bind(diagCmd, &diagCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
process.Bind(dashboardCmd, &dashboardCfg, defaults, cfgstruct.ConfDir(defaultDiagDir))
process.Bind(gracefulExitInitCmd, &diagCfg, defaults, cfgstruct.ConfDir(defaultDiagDir))
process.Bind(gracefulExitStatusCmd, &diagCfg, defaults, cfgstruct.ConfDir(defaultDiagDir))
process.Bind(issueAPITokenCmd, &diagCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
process.Bind(nodeInfoCmd, &nodeInfoCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
}
func cmdRun(cmd *cobra.Command, args []string) (err error) {
// inert constructors only ====
ctx, _ := process.Ctx(cmd)
log := zap.L()
runCfg.Debug.Address = *process.DebugAddrFlag
mapDeprecatedConfigs(log)
identity, err := runCfg.Identity.Load()
if err != nil {
log.Error("Failed to load identity.", zap.Error(err))
return errs.New("Failed to load identity: %+v", err)
}
if err := runCfg.Verify(log); err != nil {
log.Error("Invalid configuration.", zap.Error(err))
return err
}
db, err := storagenodedb.OpenExisting(ctx, log.Named("db"), runCfg.DatabaseConfig())
if err != nil {
return errs.New("Error starting master database on storagenode: %+v", err)
}
defer func() {
err = errs.Combine(err, db.Close())
}()
revocationDB, err := revocation.OpenDBFromCfg(ctx, runCfg.Server.Config)
if err != nil {
return errs.New("Error creating revocation database: %+v", err)
}
defer func() {
err = errs.Combine(err, revocationDB.Close())
}()
peer, err := storagenode.New(log, identity, db, revocationDB, runCfg.Config, version.Build, process.AtomicLevel(cmd))
if err != nil {
return err
}
// okay, start doing stuff ====
_, err = peer.Version.Service.CheckVersion(ctx)
if err != nil {
return err
}
if err := process.InitMetricsWithCertPath(ctx, log, nil, runCfg.Identity.CertPath); err != nil {
log.Warn("Failed to initialize telemetry batcher.", zap.Error(err))
}
err = db.MigrateToLatest(ctx)
if err != nil {
return errs.New("Error creating tables for master database on storagenode: %+v", err)
}
err = db.CheckVersion(ctx)
if err != nil {
return errs.New("Error checking version for storagenode database: %+v", err)
}
preflightEnabled, err := cmd.Flags().GetBool("preflight.database-check")
if err != nil {
return errs.New("Cannot retrieve preflight.database-check flag: %+v", err)
}
if preflightEnabled {
err = db.Preflight(ctx)
if err != nil {
return errs.New("Error during preflight check for storagenode databases: %+v", err)
}
}
if err := peer.Storage2.CacheService.Init(ctx); err != nil {
log.Error("Failed to initialize CacheService.", zap.Error(err))
}
runError := peer.Run(ctx)
closeError := peer.Close()
return errs.Combine(runError, closeError)
}
func cmdSetup(cmd *cobra.Command, args []string) (err error) {
ctx, _ := process.Ctx(cmd)
setupDir, err := filepath.Abs(confDir)
if err != nil {
return err
}
valid, _ := fpath.IsValidSetupDir(setupDir)
if !valid {
return fmt.Errorf("storagenode configuration already exists (%v)", setupDir)
}
identity, err := setupCfg.Identity.Load()
if err != nil {
return err
}
err = os.MkdirAll(setupDir, 0700)
if err != nil {
return err
}
overrides := map[string]interface{}{
"log.level": "info",
}
serverAddress := cmd.Flag("server.address")
if !serverAddress.Changed {
overrides[serverAddress.Name] = defaultServerAddr
}
serverPrivateAddress := cmd.Flag("server.private-address")
if !serverPrivateAddress.Changed {
overrides[serverPrivateAddress.Name] = defaultPrivateServerAddr
}
configFile := filepath.Join(setupDir, "config.yaml")
err = process.SaveConfig(cmd, configFile, process.SaveConfigWithOverrides(overrides))
if err != nil {
return err
}
if setupCfg.EditConf {
return fpath.EditFile(configFile)
}
// create db
db, err := storagenodedb.OpenNew(ctx, zap.L().Named("db"), setupCfg.DatabaseConfig())
if err != nil {
return err
}
if err := db.Pieces().CreateVerificationFile(ctx, identity.ID); err != nil {
return err
}
return db.Close()
}
func cmdConfig(cmd *cobra.Command, args []string) (err error) {
setupDir, err := filepath.Abs(confDir)
if err != nil {
return err
}
// run setup if we can't access the config file
conf := filepath.Join(setupDir, "config.yaml")
if _, err := os.Stat(conf); err != nil {
return cmdSetup(cmd, args)
}
return fpath.EditFile(conf)
}
func cmdIssue(cmd *cobra.Command, args []string) (err error) {
ctx, _ := process.Ctx(cmd)
ident, err := runCfg.Identity.Load()
if err != nil {
zap.L().Fatal("Failed to load identity.", zap.Error(err))
} else {
zap.L().Info("Identity loaded.", zap.Stringer("Node ID", ident.ID))
}
db, err := storagenodedb.OpenExisting(ctx, zap.L().Named("db"), diagCfg.DatabaseConfig())
if err != nil {
return errs.New("Error starting master database on storage node: %v", err)
}
defer func() {
err = errs.Combine(err, db.Close())
}()
service := apikeys.NewService(db.APIKeys())
apiKey, err := service.Issue(ctx)
if err != nil {
return errs.New("Error while trying to issue new api key: %v", err)
}
fmt.Println(apiKey.Secret.String())
return
}
func cmdInfo(cmd *cobra.Command, args []string) (err error) {
ctx, _ := process.Ctx(cmd)
// TODO(clement): add support for getting info for all available storagenodes
identity, err := nodeInfoCfg.Identity.Load()
if err != nil {
zap.L().Fatal("Failed to load identity.", zap.Error(err))
} else {
zap.L().Info("Identity loaded.", zap.Stringer("Node ID", identity.ID))
}
db, err := storagenodedb.OpenExisting(ctx, zap.L().Named("db"), nodeInfoCfg.DatabaseConfig())
if err != nil {
return errs.New("error starting master database on storage node: %v", err)
}
defer func() {
err = errs.Combine(err, db.Close())
}()
service := apikeys.NewService(db.APIKeys())
apiKey, err := service.Issue(ctx)
if err != nil {
return errs.New("error while trying to issue new api key: %v", err)
}
if nodeInfoCfg.JSON {
node := nodes.Node{
ID: identity.ID,
APISecret: apiKey.Secret,
PublicAddress: nodeInfoCfg.Contact.ExternalAddress,
}
data, err := json.Marshal(node)
if err != nil {
return err
}
fmt.Println(string(data))
return nil
}
fmt.Printf(`
ID: %s
API Secret: %s
Public Address: %s
`, identity.ID, apiKey.Secret, nodeInfoCfg.Contact.ExternalAddress)
return nil
}
func cmdDiag(cmd *cobra.Command, args []string) (err error) {
ctx, _ := process.Ctx(cmd)
diagDir, err := filepath.Abs(confDir)
if err != nil {
return err
}
// check if the directory exists
_, err = os.Stat(diagDir)
if err != nil {
fmt.Println("storage node directory doesn't exist", diagDir)
return err
}
db, err := storagenodedb.OpenExisting(ctx, zap.L().Named("db"), diagCfg.DatabaseConfig())
if err != nil {
return errs.New("Error starting master database on storage node: %v", err)
}
defer func() {
err = errs.Combine(err, db.Close())
}()
summaries, err := db.Bandwidth().SummaryBySatellite(ctx, time.Time{}, time.Now())
if err != nil {
fmt.Printf("unable to get bandwidth summary: %v\n", err)
return err
}
satellites := storj.NodeIDList{}
for id := range summaries {
satellites = append(satellites, id)
}
sort.Sort(satellites)
w := tabwriter.NewWriter(os.Stdout, 0, 0, 3, ' ', tabwriter.AlignRight|tabwriter.Debug)
defer func() { err = errs.Combine(err, w.Flush()) }()
fmt.Fprint(w, "Satellite\tTotal\tPut\tGet\tDelete\tAudit Get\tRepair Get\tRepair Put\n")
for _, id := range satellites {
summary := summaries[id]
fmt.Fprintf(w, "%v\t%v\t%v\t%v\t%v\t%v\t%v\t%v\n",
id,
memory.Size(summary.Total()),
memory.Size(summary.Put),
memory.Size(summary.Get),
memory.Size(summary.Delete),
memory.Size(summary.GetAudit),
memory.Size(summary.GetRepair),
memory.Size(summary.PutRepair),
)
}
return nil
}
func main() {
process.SetHardcodedApplicationName("storagenode")
if startAsService() {
return
}
rootCmd, _ := newRootCmd(true)
loggerFunc := func(logger *zap.Logger) *zap.Logger {
return logger.With(zap.String("Process", rootCmd.Use))
}

64
cmd/storagenode/root.go Normal file
View File

@ -0,0 +1,64 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package main
import (
"github.com/spf13/cobra"
"go.uber.org/zap"
"storj.io/common/fpath"
"storj.io/private/cfgstruct"
"storj.io/storj/storagenode"
)
// StorageNodeFlags defines storage node configuration.
type StorageNodeFlags struct {
EditConf bool `default:"false" help:"open config in default editor"`
storagenode.Config
Deprecated
}
// Factory contains default values for configuration flags.
type Factory struct {
Defaults cfgstruct.BindOpt
ConfDir string
IdentityDir string
UseColor bool
}
// newRootCmd creates a new root command.
func newRootCmd(setDefaults bool) (*cobra.Command, *Factory) {
cmd := &cobra.Command{
Use: "storagenode",
Short: "Storagenode",
}
factory := &Factory{}
if setDefaults {
defaultConfDir := fpath.ApplicationDir("storj", "storagenode")
defaultIdentityDir := fpath.ApplicationDir("storj", "identity", "storagenode")
cfgstruct.SetupFlag(zap.L(), cmd, &factory.ConfDir, "config-dir", defaultConfDir, "main directory for storagenode configuration")
cfgstruct.SetupFlag(zap.L(), cmd, &factory.IdentityDir, "identity-dir", defaultIdentityDir, "main directory for storagenode identity credentials")
cmd.PersistentFlags().BoolVar(&factory.UseColor, "color", false, "use color in user interface")
factory.Defaults = cfgstruct.DefaultsFlag(cmd)
}
cmd.AddCommand(
newConfigCmd(factory),
newSetupCmd(factory),
newDashboardCmd(factory),
newDiagCmd(factory),
newRunCmd(factory),
newNodeInfoCmd(factory),
newIssueAPIKeyCmd(factory),
newGracefulExitInitCmd(factory),
newGracefulExitStatusCmd(factory),
)
return cmd, factory
}

View File

@ -50,6 +50,7 @@ func TestMapDeprecatedConfigs(t *testing.T) {
expectedEmail: "newEmail",
},
}
runCfg := runCfg{}
for _, c := range cases {
testCase := c
t.Run(testCase.testID, func(t *testing.T) {
@ -59,7 +60,7 @@ func TestMapDeprecatedConfigs(t *testing.T) {
runCfg.Deprecated.Kademlia.Operator.Wallet = testCase.deprecatedWallet
runCfg.Operator.Email = testCase.newEmail
runCfg.Deprecated.Kademlia.Operator.Email = testCase.deprecatedEmail
mapDeprecatedConfigs(log)
mapDeprecatedConfigs(log, &runCfg.StorageNodeFlags)
require.Equal(t, testCase.expectedAddr, runCfg.Contact.ExternalAddress)
require.Equal(t, testCase.expectedWallet, runCfg.Operator.Wallet)
require.Equal(t, testCase.expectedEmail, runCfg.Operator.Email)

View File

@ -15,6 +15,7 @@ package main
import (
"os"
"github.com/spf13/cobra"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"golang.org/x/sys/windows/svc"
@ -22,6 +23,8 @@ import (
"storj.io/private/process"
)
var rootCmd, runCmd *cobra.Command
func startAsService() bool {
isService, err := svc.IsWindowsService()
if err != nil {
@ -40,6 +43,10 @@ func startAsService() bool {
return false
}
var factory *Factory
rootCmd, factory = newRootCmd(true)
runCmd = newRunCmd(factory)
// Initialize the Windows Service handler
err = svc.Run("storagenode", &service{})
if err != nil {