// Copyright (C) 2019 Storj Labs, Inc. // See LICENSE for copying information. package main import ( "context" "fmt" "os" "path/filepath" "sort" "text/tabwriter" "github.com/spf13/cobra" "github.com/zeebo/errs" "go.uber.org/zap" "storj.io/storj/internal/fpath" "storj.io/storj/pkg/cfgstruct" "storj.io/storj/pkg/pb" "storj.io/storj/pkg/process" "storj.io/storj/pkg/storj" "storj.io/storj/satellite" "storj.io/storj/satellite/satellitedb" ) // Satellite defines satellite configuration type Satellite struct { Database string `help:"satellite database connection string" default:"sqlite3://$CONFDIR/master.db"` satellite.Config } var ( rootCmd = &cobra.Command{ Use: "satellite", Short: "Satellite", } runCmd = &cobra.Command{ Use: "run", Short: "Run the satellite", RunE: cmdRun, } setupCmd = &cobra.Command{ Use: "setup", Short: "Create config files", RunE: cmdSetup, Annotations: map[string]string{"type": "setup"}, } diagCmd = &cobra.Command{ Use: "diag", Short: "Diagnostic Tool support", RunE: cmdDiag, } qdiagCmd = &cobra.Command{ Use: "qdiag", Short: "Repair Queue Diagnostic Tool support", RunE: cmdQDiag, } runCfg Satellite setupCfg Satellite diagCfg struct { Database string `help:"satellite database connection string" default:"sqlite3://$CONFDIR/master.db"` } qdiagCfg struct { Database string `help:"satellite database connection string" default:"sqlite3://$CONFDIR/master.db"` QListLimit int `help:"maximum segments that can be requested" default:"1000"` } defaultConfDir = fpath.ApplicationDir("storj", "satellite") // TODO: this path should be defined somewhere else defaultIdentityDir = fpath.ApplicationDir("storj", "identity", "satellite") confDir string identityDir string ) func init() { confDirParam := cfgstruct.FindConfigDirParam() if confDirParam != "" { defaultConfDir = confDirParam } identityDirParam := cfgstruct.FindIdentityDirParam() if identityDirParam != "" { defaultIdentityDir = identityDirParam } rootCmd.PersistentFlags().StringVar(&confDir, "config-dir", defaultConfDir, "main directory for satellite configuration") err := rootCmd.PersistentFlags().SetAnnotation("config-dir", "setup", []string{"true"}) if err != nil { zap.S().Error("Failed to set 'setup' annotation for 'config-dir'") } rootCmd.PersistentFlags().StringVar(&identityDir, "identity-dir", defaultIdentityDir, "main directory for satellite identity credentials") err = rootCmd.PersistentFlags().SetAnnotation("identity-dir", "setup", []string{"true"}) if err != nil { zap.S().Error("Failed to set 'setup' annotation for 'config-dir'") } rootCmd.AddCommand(runCmd) rootCmd.AddCommand(setupCmd) rootCmd.AddCommand(diagCmd) rootCmd.AddCommand(qdiagCmd) cfgstruct.Bind(runCmd.Flags(), &runCfg, cfgstruct.ConfDir(defaultConfDir), cfgstruct.IdentityDir(defaultIdentityDir)) cfgstruct.BindSetup(setupCmd.Flags(), &setupCfg, cfgstruct.ConfDir(defaultConfDir), cfgstruct.IdentityDir(defaultIdentityDir)) cfgstruct.Bind(diagCmd.Flags(), &diagCfg, cfgstruct.ConfDir(defaultConfDir), cfgstruct.IdentityDir(defaultIdentityDir)) cfgstruct.Bind(qdiagCmd.Flags(), &qdiagCfg, cfgstruct.ConfDir(defaultConfDir), cfgstruct.IdentityDir(defaultIdentityDir)) } func cmdRun(cmd *cobra.Command, args []string) (err error) { log := zap.L() identity, err := runCfg.Identity.Load() if err != nil { zap.S().Fatal(err) } ctx := process.Ctx(cmd) if err := process.InitMetricsWithCertPath(ctx, nil, runCfg.Identity.CertPath); err != nil { zap.S().Error("Failed to initialize telemetry batcher: ", err) } db, err := satellitedb.New(runCfg.Database) if err != nil { return errs.New("Error starting master database on satellite: %+v", err) } defer func() { err = errs.Combine(err, db.Close()) }() err = db.CreateTables() if err != nil { return errs.New("Error creating tables for master database on satellite: %+v", err) } peer, err := satellite.New(log, identity, db, &runCfg.Config) if err != nil { return err } runError := peer.Run(ctx) closeError := peer.Close() return errs.Combine(runError, closeError) } func cmdSetup(cmd *cobra.Command, args []string) (err error) { setupDir, err := filepath.Abs(confDir) if err != nil { return err } valid, _ := fpath.IsValidSetupDir(setupDir) if !valid { return fmt.Errorf("satellite configuration already exists (%v)", setupDir) } err = os.MkdirAll(setupDir, 0700) if err != nil { return err } return process.SaveConfigWithAllDefaults(cmd.Flags(), filepath.Join(setupDir, "config.yaml"), nil) } func cmdDiag(cmd *cobra.Command, args []string) (err error) { database, err := satellitedb.New(diagCfg.Database) if err != nil { return errs.New("error connecting to master database on satellite: %+v", err) } defer func() { err := database.Close() if err != nil { fmt.Printf("error closing connection to master database on satellite: %+v\n", err) } }() //get all bandwidth agreements rows already ordered baRows, err := database.BandwidthAgreement().GetAgreements(context.Background()) if err != nil { fmt.Printf("error reading satellite database %v: %v\n", diagCfg.Database, err) return err } // Agreement is a struct that contains a bandwidth agreement and the associated signature type UplinkSummary struct { TotalBytes int64 PutActionCount int64 GetActionCount int64 TotalTransactions int64 // additional attributes add here ... } // attributes per uplinkid summaries := make(map[storj.NodeID]*UplinkSummary) uplinkIDs := storj.NodeIDList{} for _, baRow := range baRows { // deserializing rbad you get payerbwallocation, total & storage node id rbad := baRow.Agreement pbad := rbad.PayerAllocation uplinkID := pbad.UplinkId summary, ok := summaries[uplinkID] if !ok { summaries[uplinkID] = &UplinkSummary{} uplinkIDs = append(uplinkIDs, uplinkID) summary = summaries[uplinkID] } // fill the summary info summary.TotalBytes += rbad.Total summary.TotalTransactions++ switch pbad.GetAction() { case pb.BandwidthAction_PUT: summary.PutActionCount++ case pb.BandwidthAction_GET: summary.GetActionCount++ } } // initialize the table header (fields) const padding = 3 w := tabwriter.NewWriter(os.Stdout, 0, 0, padding, ' ', tabwriter.AlignRight|tabwriter.Debug) fmt.Fprintln(w, "UplinkID\tTotal\t# Of Transactions\tPUT Action\tGET Action\t") // populate the row fields sort.Sort(uplinkIDs) for _, uplinkID := range uplinkIDs { summary := summaries[uplinkID] fmt.Fprint(w, uplinkID, "\t", summary.TotalBytes, "\t", summary.TotalTransactions, "\t", summary.PutActionCount, "\t", summary.GetActionCount, "\t\n") } // display the data return w.Flush() } func cmdQDiag(cmd *cobra.Command, args []string) (err error) { // open the master db database, err := satellitedb.New(qdiagCfg.Database) if err != nil { return errs.New("error connecting to master database on satellite: %+v", err) } defer func() { err := database.Close() if err != nil { fmt.Printf("error closing connection to master database on satellite: %+v\n", err) } }() list, err := database.RepairQueue().Peekqueue(context.Background(), qdiagCfg.QListLimit) if err != nil { return err } // initialize the table header (fields) const padding = 3 w := tabwriter.NewWriter(os.Stdout, 0, 0, padding, ' ', tabwriter.AlignRight|tabwriter.Debug) fmt.Fprintln(w, "Path\tLost Pieces\t") // populate the row fields for _, v := range list { fmt.Fprint(w, v.GetPath(), "\t", v.GetLostPieces(), "\t") } // display the data return w.Flush() } func main() { process.Exec(rootCmd) }