diff --git a/cmd/captplanet/run.go b/cmd/captplanet/run.go index d076811ee..d7d0c4a5f 100644 --- a/cmd/captplanet/run.go +++ b/cmd/captplanet/run.go @@ -28,8 +28,8 @@ import ( "storj.io/storj/pkg/piecestore/psserver" "storj.io/storj/pkg/pointerdb" "storj.io/storj/pkg/process" - "storj.io/storj/pkg/provider" "storj.io/storj/pkg/satellite/satelliteweb" + "storj.io/storj/pkg/server" "storj.io/storj/pkg/utils" "storj.io/storj/satellite/satellitedb" ) @@ -40,7 +40,7 @@ const ( // Satellite is for configuring client type Satellite struct { - Identity provider.IdentityConfig + Server server.Config Kademlia kademlia.Config PointerDB pointerdb.Config Overlay overlay.Config @@ -58,7 +58,7 @@ type Satellite struct { // StorageNode is for configuring storage nodes type StorageNode struct { - Identity provider.IdentityConfig + Server server.Config Kademlia kademlia.Config Storage psserver.Config } @@ -66,7 +66,7 @@ type StorageNode struct { var ( runCmd = &cobra.Command{ Use: "run", - Short: "Run all providers", + Short: "Run all servers", RunE: cmdRun, } @@ -100,14 +100,14 @@ func cmdRun(cmd *cobra.Command, args []string) (err error) { // start satellite go func() { _, _ = fmt.Printf("Starting satellite on %s\n", - runCfg.Satellite.Identity.Server.Address) + runCfg.Satellite.Server.Address) if runCfg.Satellite.Audit.SatelliteAddr == "" { - runCfg.Satellite.Audit.SatelliteAddr = runCfg.Satellite.Identity.Server.Address + runCfg.Satellite.Audit.SatelliteAddr = runCfg.Satellite.Server.Address } if runCfg.Satellite.Web.SatelliteAddr == "" { - runCfg.Satellite.Web.SatelliteAddr = runCfg.Satellite.Identity.Server.Address + runCfg.Satellite.Web.SatelliteAddr = runCfg.Satellite.Server.Address } database, err := satellitedb.New(runCfg.Satellite.Database) @@ -126,7 +126,7 @@ func cmdRun(cmd *cobra.Command, args []string) (err error) { ctx = context.WithValue(ctx, "masterdb", database) // Run satellite - errch <- runCfg.Satellite.Identity.Run(ctx, + errch <- runCfg.Satellite.Server.Run(ctx, grpcauth.NewAPIKeyInterceptor(), runCfg.Satellite.Kademlia, runCfg.Satellite.Audit, @@ -152,23 +152,23 @@ func cmdRun(cmd *cobra.Command, args []string) (err error) { // start the storagenodes for i, v := range runCfg.StorageNodes { go func(i int, v StorageNode) { - identity, err := v.Identity.Load() + identity, err := v.Server.Identity.Load() if err != nil { return } - address := v.Identity.Server.Address + address := v.Server.Address storagenode := fmt.Sprintf("%s:%s", identity.ID.String(), address) _, _ = fmt.Printf("Starting storage node %d %s (kad on %s)\n", i, storagenode, address) - errch <- v.Identity.Run(ctx, nil, v.Kademlia, v.Storage) + errch <- v.Server.Run(ctx, nil, v.Kademlia, v.Storage) }(i, v) } // start s3 uplink go func() { _, _ = fmt.Printf("Starting s3-gateway on %s\nAccess key: %s\nSecret key: %s\n", - runCfg.Uplink.Identity.Server.Address, + runCfg.Uplink.Server.Address, runCfg.Uplink.Minio.AccessKey, runCfg.Uplink.Minio.SecretKey) errch <- runCfg.Uplink.Run(ctx) diff --git a/cmd/captplanet/setup.go b/cmd/captplanet/setup.go index ace10b678..da6ae2985 100644 --- a/cmd/captplanet/setup.go +++ b/cmd/captplanet/setup.go @@ -132,40 +132,37 @@ func cmdSetup(cmd *cobra.Command, args []string) (err error) { overlayAddr := joinHostPort(setupCfg.ListenHost, startingPort+1) overrides := map[string]interface{}{ - "satellite.identity.cert-path": setupCfg.SatelliteIdentity.CertPath, - "satellite.identity.key-path": setupCfg.SatelliteIdentity.KeyPath, - "satellite.identity.server.address": joinHostPort(setupCfg.ListenHost, startingPort+1), - "satellite.identity.server.revocation-dburl": "redis://127.0.0.1:6378?db=2&password=abc123", - "satellite.kademlia.bootstrap-addr": joinHostPort(setupCfg.ListenHost, startingPort+1), - "satellite.pointer-db.database-url": "bolt://" + filepath.Join(setupDir, "satellite", "pointerdb.db"), - "satellite.overlay.database-url": "bolt://" + filepath.Join(setupDir, "satellite", "overlay.db"), - "satellite.kademlia.alpha": 3, - "satellite.repairer.queue-address": "redis://127.0.0.1:6378?db=1&password=abc123", - "satellite.repairer.overlay-addr": overlayAddr, - "satellite.repairer.pointer-db-addr": joinHostPort(setupCfg.ListenHost, startingPort+1), - "satellite.repairer.api-key": setupCfg.APIKey, - "uplink.identity.cert-path": setupCfg.UplinkIdentity.CertPath, - "uplink.identity.key-path": setupCfg.UplinkIdentity.KeyPath, - "uplink.identity.server.address": joinHostPort(setupCfg.ListenHost, startingPort), - "uplink.identity.server.revocation-dburl": "redis://127.0.0.1:6378?db=2&password=abc123", - "uplink.client.overlay-addr": joinHostPort(setupCfg.ListenHost, startingPort+1), - "uplink.client.pointer-db-addr": joinHostPort(setupCfg.ListenHost, startingPort+1), - "uplink.minio.dir": filepath.Join(setupDir, "uplink", "minio"), - "uplink.enc.key": setupCfg.EncKey, - "uplink.client.api-key": setupCfg.APIKey, - "uplink.rs.min-threshold": 1 * len(runCfg.StorageNodes) / 5, - "uplink.rs.repair-threshold": 2 * len(runCfg.StorageNodes) / 5, - "uplink.rs.success-threshold": 3 * len(runCfg.StorageNodes) / 5, - "uplink.rs.max-threshold": 4 * len(runCfg.StorageNodes) / 5, - "kademlia.bucket-size": 4, - "kademlia.replacement-cache-size": 1, + "satellite.server.identity.cert-path": setupCfg.SatelliteIdentity.CertPath, + "satellite.server.identity.key-path": setupCfg.SatelliteIdentity.KeyPath, + "satellite.server.address": joinHostPort(setupCfg.ListenHost, startingPort+1), + "satellite.server.revocation-dburl": "redis://127.0.0.1:6378?db=2&password=abc123", + "satellite.kademlia.bootstrap-addr": joinHostPort(setupCfg.ListenHost, startingPort+1), + "satellite.pointer-db.database-url": "bolt://" + filepath.Join(setupDir, "satellite", "pointerdb.db"), + "satellite.kademlia.alpha": 3, + "satellite.repairer.overlay-addr": overlayAddr, + "satellite.repairer.pointer-db-addr": joinHostPort(setupCfg.ListenHost, startingPort+1), + "satellite.repairer.api-key": setupCfg.APIKey, + "uplink.identity.cert-path": setupCfg.UplinkIdentity.CertPath, + "uplink.identity.key-path": setupCfg.UplinkIdentity.KeyPath, + "uplink.server.address": joinHostPort(setupCfg.ListenHost, startingPort), + "uplink.client.overlay-addr": joinHostPort(setupCfg.ListenHost, startingPort+1), + "uplink.client.pointer-db-addr": joinHostPort(setupCfg.ListenHost, startingPort+1), + "uplink.minio.dir": filepath.Join(setupDir, "uplink", "minio"), + "uplink.enc.key": setupCfg.EncKey, + "uplink.client.api-key": setupCfg.APIKey, + "uplink.rs.min-threshold": 1 * len(runCfg.StorageNodes) / 5, + "uplink.rs.repair-threshold": 2 * len(runCfg.StorageNodes) / 5, + "uplink.rs.success-threshold": 3 * len(runCfg.StorageNodes) / 5, + "uplink.rs.max-threshold": 4 * len(runCfg.StorageNodes) / 5, + "kademlia.bucket-size": 4, + "kademlia.replacement-cache-size": 1, // TODO: this will eventually go away "pointer-db.auth.api-key": setupCfg.APIKey, // TODO: this is a source of bugs. this value should be pulled from // kademlia instead - "piecestore.agreementsender.overlay_addr": overlayAddr, + "piecestore.agreementsender.overlay-addr": overlayAddr, "log.development": true, "log.level": "debug", @@ -174,15 +171,13 @@ func cmdSetup(cmd *cobra.Command, args []string) (err error) { for i := 0; i < len(runCfg.StorageNodes); i++ { storagenodePath := filepath.Join(setupDir, fmt.Sprintf("f%d", i)) storagenode := fmt.Sprintf("storage-nodes.%02d.", i) - overrides[storagenode+"identity.cert-path"] = filepath.Join( + overrides[storagenode+"server.identity.cert-path"] = filepath.Join( storagenodePath, "identity.cert") - overrides[storagenode+"identity.key-path"] = filepath.Join( + overrides[storagenode+"server.identity.key-path"] = filepath.Join( storagenodePath, "identity.key") - overrides[storagenode+"identity.server.address"] = joinHostPort( + overrides[storagenode+"server.address"] = joinHostPort( setupCfg.ListenHost, startingPort+i*2+3) - overrides[storagenode+"identity.server.revocation-dburl"] = "bolt://" + filepath.Join( - storagenodePath, "revocations.db") - overrides[storagenode+"identity.server.revocation-dburl"] = "redis://127.0.0.1:6378?db=2&password=abc123" + overrides[storagenode+"server.revocation-dburl"] = "redis://127.0.0.1:6378?db=2&password=abc123" overrides[storagenode+"kademlia.bootstrap-addr"] = joinHostPort( setupCfg.ListenHost, startingPort+1) overrides[storagenode+"storage.path"] = filepath.Join(storagenodePath, "data") diff --git a/cmd/certificates/main.go b/cmd/certificates/main.go index d03b4808a..e68399a4d 100644 --- a/cmd/certificates/main.go +++ b/cmd/certificates/main.go @@ -20,8 +20,9 @@ import ( "storj.io/storj/internal/fpath" "storj.io/storj/pkg/certificates" "storj.io/storj/pkg/cfgstruct" + "storj.io/storj/pkg/identity" "storj.io/storj/pkg/process" - "storj.io/storj/pkg/provider" + "storj.io/storj/pkg/server" "storj.io/storj/pkg/utils" ) @@ -74,15 +75,15 @@ var ( setupCfg struct { // NB: cert and key paths overridden in setup - CA provider.CASetupConfig + CA identity.CASetupConfig // NB: cert and key paths overridden in setup - Identity provider.IdentitySetupConfig + Identity identity.SetupConfig certificates.CertSignerConfig } runCfg struct { CertSigner certificates.CertSignerConfig - Identity provider.IdentityConfig + Server server.Config } authCreateCfg struct { @@ -147,7 +148,7 @@ func cmdSetup(cmd *cobra.Command, args []string) error { setupCfg.Identity.CertPath = filepath.Join(setupDir, "identity.cert") setupCfg.Identity.KeyPath = filepath.Join(setupDir, "identity.key") - err = provider.SetupCA(process.Ctx(cmd), setupCfg.CA) + err = identity.SetupCA(process.Ctx(cmd), setupCfg.CA) if err != nil { return err } @@ -165,7 +166,7 @@ func cmdSetup(cmd *cobra.Command, args []string) error { func cmdRun(cmd *cobra.Command, args []string) error { ctx := process.Ctx(cmd) - return runCfg.Identity.Run(ctx, nil, runCfg.CertSigner) + return runCfg.Server.Run(ctx, nil, runCfg.CertSigner) } func cmdCreateAuth(cmd *cobra.Command, args []string) error { diff --git a/cmd/satellite/main.go b/cmd/satellite/main.go index c8f352467..0848101b5 100644 --- a/cmd/satellite/main.go +++ b/cmd/satellite/main.go @@ -23,12 +23,13 @@ import ( "storj.io/storj/pkg/datarepair/checker" "storj.io/storj/pkg/datarepair/repairer" "storj.io/storj/pkg/discovery" + "storj.io/storj/pkg/identity" "storj.io/storj/pkg/kademlia" "storj.io/storj/pkg/overlay" "storj.io/storj/pkg/pb" "storj.io/storj/pkg/pointerdb" "storj.io/storj/pkg/process" - "storj.io/storj/pkg/provider" + "storj.io/storj/pkg/server" "storj.io/storj/pkg/storj" "storj.io/storj/satellite/satellitedb" ) @@ -61,7 +62,7 @@ var ( } runCfg struct { - Identity provider.IdentityConfig + Server server.Config Kademlia kademlia.Config PointerDB pointerdb.Config Overlay overlay.Config @@ -73,8 +74,8 @@ var ( Discovery discovery.Config } setupCfg struct { - CA provider.CASetupConfig - Identity provider.IdentitySetupConfig + CA identity.CASetupConfig + Identity identity.SetupConfig Overwrite bool `default:"false" help:"whether to overwrite pre-existing configuration files"` } diagCfg struct { @@ -125,7 +126,7 @@ func cmdRun(cmd *cobra.Command, args []string) (err error) { //nolint ignoring context rules to not create cyclic dependency, will be removed later ctx = context.WithValue(ctx, "masterdb", database) - return runCfg.Identity.Run( + return runCfg.Server.Run( ctx, grpcauth.NewAPIKeyInterceptor(), runCfg.Kademlia, @@ -169,7 +170,7 @@ func cmdSetup(cmd *cobra.Command, args []string) (err error) { setupCfg.Identity.CertPath = filepath.Join(setupDir, "identity.cert") setupCfg.Identity.KeyPath = filepath.Join(setupDir, "identity.key") } - err = provider.SetupIdentity(process.Ctx(cmd), setupCfg.CA, setupCfg.Identity) + err = identity.SetupIdentity(process.Ctx(cmd), setupCfg.CA, setupCfg.Identity) if err != nil { return err } diff --git a/cmd/storagenode/main.go b/cmd/storagenode/main.go index 0a51eac88..2e847532a 100644 --- a/cmd/storagenode/main.go +++ b/cmd/storagenode/main.go @@ -18,12 +18,13 @@ import ( "storj.io/storj/internal/fpath" "storj.io/storj/pkg/cfgstruct" + "storj.io/storj/pkg/identity" "storj.io/storj/pkg/kademlia" "storj.io/storj/pkg/pb" "storj.io/storj/pkg/piecestore/psserver" "storj.io/storj/pkg/piecestore/psserver/psdb" "storj.io/storj/pkg/process" - "storj.io/storj/pkg/provider" + "storj.io/storj/pkg/server" "storj.io/storj/pkg/storj" ) @@ -50,13 +51,13 @@ var ( } runCfg struct { - Identity provider.IdentityConfig + Server server.Config Kademlia kademlia.Config Storage psserver.Config } setupCfg struct { - CA provider.CASetupConfig - Identity provider.IdentitySetupConfig + CA identity.CASetupConfig + Identity identity.SetupConfig Overwrite bool `default:"false" help:"whether to overwrite pre-existing configuration files"` } diagCfg struct { @@ -104,7 +105,7 @@ func cmdRun(cmd *cobra.Command, args []string) (err error) { zap.S().Info("Farmer wallet: ", farmerConfig.Wallet) } - return runCfg.Identity.Run(process.Ctx(cmd), nil, runCfg.Kademlia, runCfg.Storage) + return runCfg.Server.Run(process.Ctx(cmd), nil, runCfg.Kademlia, runCfg.Storage) } func cmdSetup(cmd *cobra.Command, args []string) (err error) { @@ -134,7 +135,7 @@ func cmdSetup(cmd *cobra.Command, args []string) (err error) { setupCfg.Identity.CertPath = filepath.Join(setupDir, "identity.cert") setupCfg.Identity.KeyPath = filepath.Join(setupDir, "identity.key") - err = provider.SetupIdentity(process.Ctx(cmd), setupCfg.CA, setupCfg.Identity) + err = identity.SetupIdentity(process.Ctx(cmd), setupCfg.CA, setupCfg.Identity) if err != nil { return err } diff --git a/cmd/uplink/cmd/run.go b/cmd/uplink/cmd/run.go index 42a2e1004..440100a47 100644 --- a/cmd/uplink/cmd/run.go +++ b/cmd/uplink/cmd/run.go @@ -26,7 +26,7 @@ func cmdRun(cmd *cobra.Command, args []string) (err error) { return fmt.Errorf("Invalid argument %#v. Try 'uplink run'", flagname) } - address := cfg.Identity.Server.Address + address := cfg.Server.Address host, port, err := net.SplitHostPort(address) if err != nil { return err diff --git a/pkg/provider/cert_authority_test.go b/pkg/identity/cert_authority_test.go similarity index 98% rename from pkg/provider/cert_authority_test.go rename to pkg/identity/cert_authority_test.go index 4356b6f6e..ec9ff2949 100644 --- a/pkg/provider/cert_authority_test.go +++ b/pkg/identity/cert_authority_test.go @@ -1,7 +1,7 @@ // Copyright (C) 2018 Storj Labs, Inc. // See LICENSE for copying information. -package provider +package identity import ( "context" diff --git a/pkg/provider/certificate_authority.go b/pkg/identity/certificate_authority.go similarity index 97% rename from pkg/provider/certificate_authority.go rename to pkg/identity/certificate_authority.go index d4ff1cea3..37e89283b 100644 --- a/pkg/provider/certificate_authority.go +++ b/pkg/identity/certificate_authority.go @@ -1,7 +1,7 @@ // Copyright (C) 2018 Storj Labs, Inc. // See LICENSE for copying information. -package provider +package identity import ( "bytes" @@ -154,7 +154,7 @@ func (pc PeerCAConfig) Load() (*PeerCertificateAuthority, error) { return nil, peertls.ErrNotExist.Wrap(err) } - chain, err := decodeAndParseChainPEM(chainPEM) + chain, err := DecodeAndParseChainPEM(chainPEM) if err != nil { return nil, errs.New("failed to load identity %#v: %v", pc.CertPath, err) @@ -175,7 +175,10 @@ func (pc PeerCAConfig) Load() (*PeerCertificateAuthority, error) { } // NewCA creates a new full identity with the given difficulty -func NewCA(ctx context.Context, opts NewCAOptions) (*FullCertificateAuthority, error) { +func NewCA(ctx context.Context, opts NewCAOptions) ( + rv *FullCertificateAuthority, err error) { + defer mon.Task()(&ctx)(&err) + if opts.Concurrency < 1 { opts.Concurrency = 1 } diff --git a/pkg/identity/common.go b/pkg/identity/common.go new file mode 100644 index 000000000..bb974bbb4 --- /dev/null +++ b/pkg/identity/common.go @@ -0,0 +1,16 @@ +// Copyright (C) 2019 Storj Labs, Inc. +// See LICENSE for copying information. + +package identity + +import ( + "github.com/zeebo/errs" + monkit "gopkg.in/spacemonkeygo/monkit.v2" +) + +var ( + mon = monkit.Package() + + // Error is a pkg/identity error + Error = errs.Class("pkg/identity error") +) diff --git a/pkg/provider/identity.go b/pkg/identity/identity.go similarity index 64% rename from pkg/provider/identity.go rename to pkg/identity/identity.go index d26f67a82..1551f0332 100644 --- a/pkg/provider/identity.go +++ b/pkg/identity/identity.go @@ -1,7 +1,7 @@ // Copyright (C) 2018 Storj Labs, Inc. // See LICENSE for copying information. -package provider +package identity import ( "bytes" @@ -10,11 +10,8 @@ import ( "crypto/tls" "crypto/x509" "io/ioutil" - "net" - "os" "github.com/zeebo/errs" - "go.uber.org/zap" "golang.org/x/crypto/sha3" "google.golang.org/grpc" "google.golang.org/grpc/credentials" @@ -52,59 +49,26 @@ type FullIdentity struct { Key crypto.PrivateKey } -// IdentitySetupConfig allows you to run a set of Responsibilities with the given +// SetupConfig allows you to run a set of Responsibilities with the given // identity. You can also just load an Identity from disk. -type IdentitySetupConfig struct { +type SetupConfig struct { CertPath string `help:"path to the certificate chain for this identity" default:"$CONFDIR/identity.cert"` KeyPath string `help:"path to the private key for this identity" default:"$CONFDIR/identity.key"` Overwrite bool `help:"if true, existing identity certs AND keys will overwritten for" default:"false"` Version string `help:"semantic version of identity storage format" default:"0"` - Server ServerConfig } -// IdentityConfig allows you to run a set of Responsibilities with the given +// Config allows you to run a set of Responsibilities with the given // identity. You can also just load an Identity from disk. -type IdentityConfig struct { +type Config struct { CertPath string `help:"path to the certificate chain for this identity" default:"$CONFDIR/identity.cert"` KeyPath string `help:"path to the private key for this identity" default:"$CONFDIR/identity.key"` - Server ServerConfig -} - -// ServerConfig holds server specific configuration parameters -type ServerConfig struct { - RevocationDBURL string `help:"url for revocation database (e.g. bolt://some.db OR redis://127.0.0.1:6378?db=2&password=abc123)" default:"bolt://$CONFDIR/revocations.db"` - PeerCAWhitelistPath string `help:"path to the CA cert whitelist (peer identities must be signed by one these to be verified)"` - Address string `help:"address to listen on" default:":7777"` - Extensions peertls.TLSExtConfig -} - -// ServerOptions holds config, identity, and peer verification function data for use with a grpc server. -type ServerOptions struct { - Config ServerConfig - Ident *FullIdentity - RevDB *peertls.RevocationDB - PCVFuncs []peertls.PeerCertVerificationFunc -} - -// NewServerOptions is a constructor for `serverOptions` given an identity and config -func NewServerOptions(i *FullIdentity, c ServerConfig) (*ServerOptions, error) { - serverOpts := &ServerOptions{ - Config: c, - Ident: i, - } - - err := c.configure(serverOpts) - if err != nil { - return nil, err - } - - return serverOpts, nil } // FullIdentityFromPEM loads a FullIdentity from a certificate chain and // private key PEM-encoded bytes func FullIdentityFromPEM(chainPEM, keyPEM []byte) (*FullIdentity, error) { - chain, err := decodeAndParseChainPEM(chainPEM) + chain, err := DecodeAndParseChainPEM(chainPEM) if err != nil { return nil, errs.Wrap(err) } @@ -213,18 +177,18 @@ func NewFullIdentity(ctx context.Context, difficulty uint16, concurrency uint) ( } // Status returns the status of the identity cert/key files for the config -func (is IdentitySetupConfig) Status() TLSFilesStatus { +func (is SetupConfig) Status() TLSFilesStatus { return statTLSFiles(is.CertPath, is.KeyPath) } // Create generates and saves a CA using the config -func (is IdentitySetupConfig) Create(ca *FullCertificateAuthority) (*FullIdentity, error) { +func (is SetupConfig) Create(ca *FullCertificateAuthority) (*FullIdentity, error) { fi, err := ca.NewIdentity() if err != nil { return nil, err } fi.CA = ca.Cert - ic := IdentityConfig{ + ic := Config{ CertPath: is.CertPath, KeyPath: is.KeyPath, } @@ -232,7 +196,7 @@ func (is IdentitySetupConfig) Create(ca *FullCertificateAuthority) (*FullIdentit } // Load loads a FullIdentity from the config -func (ic IdentityConfig) Load() (*FullIdentity, error) { +func (ic Config) Load() (*FullIdentity, error) { c, err := ioutil.ReadFile(ic.CertPath) if err != nil { return nil, peertls.ErrNotExist.Wrap(err) @@ -250,7 +214,7 @@ func (ic IdentityConfig) Load() (*FullIdentity, error) { } // Save saves a FullIdentity according to the config -func (ic IdentityConfig) Save(fi *FullIdentity) error { +func (ic Config) Save(fi *FullIdentity) error { var ( certData, keyData bytes.Buffer writeChainErr, writeChainDataErr, writeKeyErr, writeKeyDataErr error @@ -280,37 +244,6 @@ func (ic IdentityConfig) Save(fi *FullIdentity) error { ) } -// Run will run the given responsibilities with the configured identity. -func (ic IdentityConfig) Run(ctx context.Context, interceptor grpc.UnaryServerInterceptor, responsibilities ...Responsibility) (err error) { - defer mon.Task()(&ctx)(&err) - - ident, err := ic.Load() - if err != nil { - return err - } - - lis, err := net.Listen("tcp", ic.Server.Address) - if err != nil { - return err - } - defer func() { _ = lis.Close() }() - - opts, err := NewServerOptions(ident, ic.Server) - if err != nil { - return err - } - defer func() { err = utils.CombineErrors(err, opts.RevDB.Close()) }() - - s, err := NewProvider(opts, lis, interceptor, responsibilities...) - if err != nil { - return err - } - defer func() { _ = s.Close() }() - - zap.S().Infof("Node %s started", s.Identity().ID) - return s.Run(ctx) -} - // RestChainRaw returns the rest (excluding leaf and CA) of the certficate chain as a 2d byte slice func (fi *FullIdentity) RestChainRaw() [][]byte { var chain [][]byte @@ -369,77 +302,9 @@ func (fi *FullIdentity) DialOption(id storj.NodeID) (grpc.DialOption, error) { return grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)), nil } -// NewRevDB returns a new revocation database given the config -func (c ServerConfig) NewRevDB() (*peertls.RevocationDB, error) { - driver, source, err := utils.SplitDBURL(c.RevocationDBURL) - if err != nil { - return nil, peertls.ErrRevocationDB.Wrap(err) - } - - var db *peertls.RevocationDB - switch driver { - case "bolt": - db, err = peertls.NewRevocationDBBolt(source) - if err != nil { - return nil, peertls.ErrRevocationDB.Wrap(err) - } - zap.S().Info("Starting overlay cache with BoltDB") - case "redis": - db, err = peertls.NewRevocationDBRedis(c.RevocationDBURL) - if err != nil { - return nil, peertls.ErrRevocationDB.Wrap(err) - } - zap.S().Info("Starting overlay cache with Redis") - default: - return nil, peertls.ErrRevocationDB.New("database scheme not supported: %s", driver) - } - - return db, nil -} - -// configure adds peer certificate verification functions and revocation datase to the config. -func (c ServerConfig) configure(opts *ServerOptions) (err error) { - var pcvs []peertls.PeerCertVerificationFunc - parseOpts := peertls.ParseExtOptions{} - - if c.PeerCAWhitelistPath != "" { - caWhitelist, err := loadWhitelist(c.PeerCAWhitelistPath) - if err != nil { - return err - } - parseOpts.CAWhitelist = caWhitelist - pcvs = append(pcvs, peertls.VerifyCAWhitelist(caWhitelist)) - } - - if c.Extensions.Revocation { - opts.RevDB, err = c.NewRevDB() - if err != nil { - return err - } - pcvs = append(pcvs, peertls.VerifyUnrevokedChainFunc(opts.RevDB)) - } - - exts := peertls.ParseExtensions(c.Extensions, parseOpts) - pcvs = append(pcvs, exts.VerifyFunc()) - - // NB: remove nil elements - for i, f := range pcvs { - if f == nil { - copy(pcvs[i:], pcvs[i+1:]) - pcvs = pcvs[:len(pcvs)-1] - } - } - - opts.PCVFuncs = pcvs - return nil -} - -func (so *ServerOptions) grpcOpts() (grpc.ServerOption, error) { - return so.Ident.ServerOption(so.PCVFuncs...) -} - func verifyIdentity(id storj.NodeID) peertls.PeerCertVerificationFunc { - return func(_ [][]byte, parsedChains [][]*x509.Certificate) error { + return func(_ [][]byte, parsedChains [][]*x509.Certificate) (err error) { + defer mon.TaskNamed("verifyIdentity")(nil)(&err) if id == (storj.NodeID{}) { return nil } @@ -456,19 +321,3 @@ func verifyIdentity(id storj.NodeID) peertls.PeerCertVerificationFunc { return nil } } - -func loadWhitelist(path string) ([]*x509.Certificate, error) { - w, err := ioutil.ReadFile(path) - if err != nil && !os.IsNotExist(err) { - return nil, err - } - - var whitelist []*x509.Certificate - if w != nil { - whitelist, err = decodeAndParseChainPEM(w) - if err != nil { - return nil, errs.Wrap(err) - } - } - return whitelist, nil -} diff --git a/pkg/provider/identity_test.go b/pkg/identity/identity_test.go similarity index 67% rename from pkg/provider/identity_test.go rename to pkg/identity/identity_test.go index 82d146ba0..186d3c062 100644 --- a/pkg/provider/identity_test.go +++ b/pkg/identity/identity_test.go @@ -1,7 +1,7 @@ // Copyright (C) 2018 Storj Labs, Inc. // See LICENSE for copying information. -package provider_test +package identity_test import ( "bytes" @@ -9,17 +9,15 @@ import ( "crypto/ecdsa" "crypto/x509" "encoding/pem" - "io/ioutil" "os" - "reflect" "runtime" "testing" "github.com/stretchr/testify/assert" "storj.io/storj/internal/testcontext" + "storj.io/storj/pkg/identity" "storj.io/storj/pkg/peertls" - "storj.io/storj/pkg/provider" ) func TestPeerIdentityFromCertChain(t *testing.T) { @@ -41,7 +39,7 @@ func TestPeerIdentityFromCertChain(t *testing.T) { leafCert, err := peertls.NewCert(leafKey, caKey, leafTemplate, caTemplate) assert.NoError(t, err) - peerIdent, err := provider.PeerIdentityFromCerts(leafCert, caCert, nil) + peerIdent, err := identity.PeerIdentityFromCerts(leafCert, caCert, nil) assert.NoError(t, err) assert.Equal(t, caCert, peerIdent.CA) assert.Equal(t, leafCert, peerIdent.Leaf) @@ -85,18 +83,18 @@ func TestFullIdentityFromPEM(t *testing.T) { keyPEM := bytes.NewBuffer([]byte{}) assert.NoError(t, pem.Encode(keyPEM, peertls.NewKeyBlock(leafKeyBytes))) - fullIdent, err := provider.FullIdentityFromPEM(chainPEM.Bytes(), keyPEM.Bytes()) + fullIdent, err := identity.FullIdentityFromPEM(chainPEM.Bytes(), keyPEM.Bytes()) assert.NoError(t, err) assert.Equal(t, leafCert.Raw, fullIdent.Leaf.Raw) assert.Equal(t, caCert.Raw, fullIdent.CA.Raw) assert.Equal(t, leafKey, fullIdent.Key) } -func TestIdentityConfig_SaveIdentity(t *testing.T) { +func TestConfig_SaveIdentity(t *testing.T) { ctx := testcontext.New(t) defer ctx.Cleanup() - ic := &provider.IdentityConfig{ + ic := &identity.Config{ CertPath: ctx.File("chain.pem"), KeyPath: ctx.File("key.pem"), } @@ -145,7 +143,7 @@ func TestIdentityConfig_SaveIdentity(t *testing.T) { } func TestVerifyPeer(t *testing.T) { - ca, err := provider.NewCA(context.Background(), provider.NewCAOptions{ + ca, err := identity.NewCA(context.Background(), identity.NewCAOptions{ Difficulty: 12, Concurrency: 4, }) @@ -158,91 +156,7 @@ func TestVerifyPeer(t *testing.T) { assert.NoError(t, err) } -func TestNewServerOptions(t *testing.T) { - ctx := testcontext.New(t) - defer ctx.Cleanup() - - fi := pregeneratedIdentity(t) - - whitelistPath := ctx.File("whitelist.pem") - - chainData, err := peertls.ChainBytes(fi.CA) - assert.NoError(t, err) - - err = ioutil.WriteFile(whitelistPath, chainData, 0644) - assert.NoError(t, err) - - cases := []struct { - testID string - config provider.ServerConfig - pcvFuncsLen int - }{ - { - "default", - provider.ServerConfig{}, - 0, - }, { - "revocation processing", - provider.ServerConfig{ - RevocationDBURL: "bolt://" + ctx.File("revocation1.db"), - Extensions: peertls.TLSExtConfig{ - Revocation: true, - }, - }, - 2, - }, { - "ca whitelist verification", - provider.ServerConfig{ - PeerCAWhitelistPath: whitelistPath, - }, - 1, - }, { - "ca whitelist verification and whitelist signed leaf verification", - provider.ServerConfig{ - // NB: file doesn't actually exist - PeerCAWhitelistPath: whitelistPath, - Extensions: peertls.TLSExtConfig{ - WhitelistSignedLeaf: true, - }, - }, - 2, - }, { - "revocation processing and whitelist verification", - provider.ServerConfig{ - // NB: file doesn't actually exist - PeerCAWhitelistPath: whitelistPath, - RevocationDBURL: "bolt://" + ctx.File("revocation2.db"), - Extensions: peertls.TLSExtConfig{ - Revocation: true, - }, - }, - 3, - }, { - "revocation processing, whitelist, and signed leaf verification", - provider.ServerConfig{ - // NB: file doesn't actually exist - PeerCAWhitelistPath: whitelistPath, - RevocationDBURL: "bolt://" + ctx.File("revocation3.db"), - Extensions: peertls.TLSExtConfig{ - Revocation: true, - WhitelistSignedLeaf: true, - }, - }, - 3, - }, - } - - for _, c := range cases { - t.Log(c.testID) - opts, err := provider.NewServerOptions(fi, c.config) - assert.NoError(t, err) - assert.True(t, reflect.DeepEqual(fi, opts.Ident)) - assert.Equal(t, c.config, opts.Config) - assert.Len(t, opts.PCVFuncs, c.pcvFuncsLen) - } -} - -func pregeneratedIdentity(t *testing.T) *provider.FullIdentity { +func pregeneratedIdentity(t *testing.T) *identity.FullIdentity { const chain = `-----BEGIN CERTIFICATE----- MIIBQDCB56ADAgECAhB+u3d03qyW/ROgwy/ZsPccMAoGCCqGSM49BAMCMAAwIhgP MDAwMTAxMDEwMDAwMDBaGA8wMDAxMDEwMTAwMDAwMFowADBZMBMGByqGSM49AgEG @@ -268,7 +182,7 @@ AwEHoUQDQgAEoLy/0hs5deTXZunRumsMkiHpF0g8wAc58aXANmr7Mxx9tzoIYFnx 0YN4VDKdCtUJa29yA6TIz1MiIDUAcB5YCA== -----END EC PRIVATE KEY-----` - fi, err := provider.FullIdentityFromPEM([]byte(chain), []byte(key)) + fi, err := identity.FullIdentityFromPEM([]byte(chain), []byte(key)) assert.NoError(t, err) return fi diff --git a/pkg/identity/setup.go b/pkg/identity/setup.go new file mode 100644 index 000000000..963a04025 --- /dev/null +++ b/pkg/identity/setup.go @@ -0,0 +1,62 @@ +// Copyright (C) 2019 Storj Labs, Inc. +// See LICENSE for copying information. + +package identity + +import ( + "context" + "time" + + "github.com/zeebo/errs" +) + +var ( + // ErrSetup is returned when there's an error with setup + ErrSetup = errs.Class("setup error") +) + +// SetupIdentity ensures a CA and identity exist +func SetupIdentity(ctx context.Context, c CASetupConfig, i SetupConfig) error { + if s := c.Status(); s != NoCertNoKey && !c.Overwrite { + return ErrSetup.New("certificate authority file(s) exist: %s", s) + } + + t, err := time.ParseDuration(c.Timeout) + if err != nil { + return errs.Wrap(err) + } + ctx, cancel := context.WithTimeout(ctx, t) + defer cancel() + + // Create a new certificate authority + ca, err := c.Create(ctx) + if err != nil { + return err + } + + if s := i.Status(); s != NoCertNoKey && !i.Overwrite { + return ErrSetup.New("identity file(s) exist: %s", s) + } + + // Create identity from new CA + _, err = i.Create(ca) + return err +} + +// SetupCA ensures a CA exists +func SetupCA(ctx context.Context, c CASetupConfig) error { + if s := c.Status(); s != NoCertNoKey && !c.Overwrite { + return ErrSetup.New("certificate authority file(s) exist: %s", s) + } + + t, err := time.ParseDuration(c.Timeout) + if err != nil { + return errs.Wrap(err) + } + ctx, cancel := context.WithTimeout(ctx, t) + defer cancel() + + // Create a new certificate authority + _, err = c.Create(ctx) + return err +} diff --git a/pkg/provider/utils.go b/pkg/identity/utils.go similarity index 97% rename from pkg/provider/utils.go rename to pkg/identity/utils.go index 9604832af..6a9cc9db9 100644 --- a/pkg/provider/utils.go +++ b/pkg/identity/utils.go @@ -1,7 +1,7 @@ // Copyright (C) 2018 Storj Labs, Inc. // See LICENSE for copying information. -package provider +package identity import ( "context" @@ -45,7 +45,8 @@ type encodedChain struct { extensions [][][]byte } -func decodeAndParseChainPEM(PEMBytes []byte) ([]*x509.Certificate, error) { +// DecodeAndParseChainPEM parses a PEM chain +func DecodeAndParseChainPEM(PEMBytes []byte) ([]*x509.Certificate, error) { var ( encChain encodedChain blockErrs utils.ErrorGroup diff --git a/pkg/miniogw/config.go b/pkg/miniogw/config.go index 0d71a8fdf..d0d797704 100644 --- a/pkg/miniogw/config.go +++ b/pkg/miniogw/config.go @@ -13,6 +13,7 @@ import ( "go.uber.org/zap" "storj.io/storj/pkg/eestream" + "storj.io/storj/pkg/identity" "storj.io/storj/pkg/metainfo/kvmetainfo" "storj.io/storj/pkg/overlay" "storj.io/storj/pkg/pointerdb/pdbclient" @@ -64,10 +65,16 @@ type ClientConfig struct { SegmentSize int64 `help:"the size of a segment in bytes" default:"64000000"` } +// ServerConfig determines how minio listens for requests +type ServerConfig struct { + Address string `help:"address to serve S3 api over" default:"localhost:7777"` +} + // Config is a general miniogw configuration struct. This should be everything // one needs to start a minio gateway. type Config struct { - Identity provider.IdentityConfig + Identity identity.Config + Server ServerConfig Minio MinioConfig Client ClientConfig RS RSConfig @@ -106,7 +113,7 @@ func (c Config) Run(ctx context.Context) (err error) { } minio.Main([]string{"storj", "gateway", "storj", - "--address", c.Identity.Server.Address, "--config-dir", c.Minio.Dir, "--quiet"}) + "--address", c.Server.Address, "--config-dir", c.Minio.Dir, "--quiet"}) return Error.New("unexpected minio exit") } diff --git a/pkg/miniogw/integration_test.go b/pkg/miniogw/integration_test.go index 2abaf0b41..f6a5d53e1 100644 --- a/pkg/miniogw/integration_test.go +++ b/pkg/miniogw/integration_test.go @@ -17,7 +17,7 @@ import ( "go.uber.org/zap" "go.uber.org/zap/zaptest" - "storj.io/storj/internal/identity" + testidentity "storj.io/storj/internal/identity" "storj.io/storj/internal/s3client" "storj.io/storj/internal/testcontext" "storj.io/storj/internal/testplanet" @@ -48,7 +48,7 @@ func TestUploadDownload(t *testing.T) { gwCfg.Minio.Dir = ctx.Dir("minio") // addresses - gwCfg.Identity.Server.Address = "127.0.0.1:7777" + gwCfg.Server.Address = "127.0.0.1:7777" gwCfg.Client.OverlayAddr = planet.Satellites[0].Addr() gwCfg.Client.PointerDBAddr = planet.Satellites[0].Addr() @@ -84,7 +84,7 @@ func TestUploadDownload(t *testing.T) { time.Sleep(100 * time.Millisecond) client, err := s3client.NewMinio(s3client.Config{ - S3Gateway: gwCfg.Identity.Server.Address, + S3Gateway: gwCfg.Server.Address, Satellite: planet.Satellites[0].Addr(), AccessKey: gwCfg.Minio.AccessKey, SecretKey: gwCfg.Minio.SecretKey, @@ -123,7 +123,7 @@ func runGateway(ctx context.Context, c miniogw.Config, log *zap.Logger, identity // set gateway flags flags := flag.NewFlagSet("gateway", flag.ExitOnError) - flags.String("address", c.Identity.Server.Address, "") + flags.String("address", c.Server.Address, "") flags.String("config-dir", c.Minio.Dir, "") flags.Bool("quiet", true, "") diff --git a/pkg/peertls/extensions.go b/pkg/peertls/extensions.go index cc65fed78..846438d36 100644 --- a/pkg/peertls/extensions.go +++ b/pkg/peertls/extensions.go @@ -14,7 +14,9 @@ import ( "time" "github.com/zeebo/errs" + "go.uber.org/zap" + "storj.io/storj/pkg/utils" "storj.io/storj/storage" "storj.io/storj/storage/boltdb" "storj.io/storj/storage/redis" @@ -402,3 +404,31 @@ func verifyCAWhitelistSignedLeafFunc(caWhitelist []*x509.Certificate) extensionV return ErrVerifyCAWhitelist.New("leaf extension") } } + +// NewRevDB returns a new revocation database given the URL +func NewRevDB(revocationDBURL string) (*RevocationDB, error) { + driver, source, err := utils.SplitDBURL(revocationDBURL) + if err != nil { + return nil, ErrRevocationDB.Wrap(err) + } + + var db *RevocationDB + switch driver { + case "bolt": + db, err = NewRevocationDBBolt(source) + if err != nil { + return nil, ErrRevocationDB.Wrap(err) + } + zap.S().Info("Starting overlay cache with BoltDB") + case "redis": + db, err = NewRevocationDBRedis(revocationDBURL) + if err != nil { + return nil, ErrRevocationDB.Wrap(err) + } + zap.S().Info("Starting overlay cache with Redis") + default: + return nil, ErrRevocationDB.New("database scheme not supported: %s", driver) + } + + return db, nil +} diff --git a/pkg/provider/provider.go b/pkg/provider/provider.go deleted file mode 100644 index f8ac8145b..000000000 --- a/pkg/provider/provider.go +++ /dev/null @@ -1,180 +0,0 @@ -// Copyright (C) 2018 Storj Labs, Inc. -// See LICENSE for copying information. - -package provider - -import ( - "context" - "io" - "net" - "time" - - "github.com/zeebo/errs" - "go.uber.org/zap" - "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" - - "storj.io/storj/storage" -) - -var ( - // ErrSetup is returned when there's an error with setup - ErrSetup = errs.Class("setup error") -) - -// Responsibility represents a specific gRPC method collection to be registered -// on a shared gRPC server. PointerDB, OverlayCache, PieceStore, Kademlia, -// StatDB, etc. are all examples of Responsibilities. -type Responsibility interface { - Run(ctx context.Context, server *Provider) error -} - -// Provider represents a bundle of responsibilities defined by a specific ID. -// Examples of providers are the heavy client, the storagenode, and the gateway. -type Provider struct { - lis net.Listener - grpc *grpc.Server - next []Responsibility - identity *FullIdentity -} - -// NewProvider creates a Provider out of an Identity, a net.Listener, a UnaryInterceptorProvider and -// a set of responsibilities. -func NewProvider(opts *ServerOptions, lis net.Listener, interceptor grpc.UnaryServerInterceptor, - responsibilities ...Responsibility) (*Provider, error) { - grpcOpts, err := opts.grpcOpts() - if err != nil { - return nil, err - } - - unaryInterceptor := unaryInterceptor - if interceptor != nil { - unaryInterceptor = combineInterceptors(unaryInterceptor, interceptor) - } - - return &Provider{ - lis: lis, - grpc: grpc.NewServer( - grpc.StreamInterceptor(streamInterceptor), - grpc.UnaryInterceptor(unaryInterceptor), - grpcOpts, - ), - next: responsibilities, - identity: opts.Ident, - }, nil -} - -// SetupIdentity ensures a CA and identity exist -func SetupIdentity(ctx context.Context, c CASetupConfig, i IdentitySetupConfig) error { - if s := c.Status(); s != NoCertNoKey && !c.Overwrite { - return ErrSetup.New("certificate authority file(s) exist: %s", s) - } - - t, err := time.ParseDuration(c.Timeout) - if err != nil { - return errs.Wrap(err) - } - ctx, cancel := context.WithTimeout(ctx, t) - defer cancel() - - // Create a new certificate authority - ca, err := c.Create(ctx) - if err != nil { - return err - } - - if s := i.Status(); s != NoCertNoKey && !i.Overwrite { - return ErrSetup.New("identity file(s) exist: %s", s) - } - - // Create identity from new CA - _, err = i.Create(ca) - return err -} - -// SetupCA ensures a CA exists -func SetupCA(ctx context.Context, c CASetupConfig) error { - if s := c.Status(); s != NoCertNoKey && !c.Overwrite { - return ErrSetup.New("certificate authority file(s) exist: %s", s) - } - - t, err := time.ParseDuration(c.Timeout) - if err != nil { - return errs.Wrap(err) - } - ctx, cancel := context.WithTimeout(ctx, t) - defer cancel() - - // Create a new certificate authority - _, err = c.Create(ctx) - return err -} - -// Identity returns the provider's identity -func (p *Provider) Identity() *FullIdentity { return p.identity } - -// Addr returns the providers listener address -func (p *Provider) Addr() net.Addr { return p.lis.Addr() } - -// GRPC returns the provider's gRPC server for registration purposes -func (p *Provider) GRPC() *grpc.Server { return p.grpc } - -// Close shuts down the provider -func (p *Provider) Close() error { - p.grpc.GracefulStop() - return nil -} - -// Run will run the provider and all of its responsibilities -func (p *Provider) Run(ctx context.Context) (err error) { - defer mon.Task()(&ctx)(&err) - - // are there any unstarted responsibilities? start those first. the - // responsibilities know to call Run again once they're ready. - if len(p.next) > 0 { - next := p.next[0] - p.next = p.next[1:] - return next.Run(ctx, p) - } - - return p.grpc.Serve(p.lis) -} - -func streamInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) (err error) { - err = handler(srv, ss) - if err != nil { - // no zap errors for canceled or wrong file downloads - if storage.ErrKeyNotFound.Has(err) || - status.Code(err) == codes.Canceled || - status.Code(err) == codes.Unavailable || - err == io.EOF { - return err - } - zap.S().Errorf("%+v", err) - } - return err -} - -func unaryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, - err error) { - resp, err = handler(ctx, req) - if err != nil { - // no zap errors for wrong file downloads - if status.Code(err) == codes.NotFound { - return resp, err - } - zap.S().Errorf("%+v", err) - } - return resp, err -} - -func combineInterceptors(a, b grpc.UnaryServerInterceptor) grpc.UnaryServerInterceptor { - return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { - return a(ctx, req, info, func(actx context.Context, areq interface{}) (interface{}, error) { - return b(actx, areq, info, func(bctx context.Context, breq interface{}) (interface{}, error) { - return handler(bctx, breq) - }) - }) - } -} diff --git a/pkg/provider/transition.go b/pkg/provider/transition.go new file mode 100644 index 000000000..b79003acb --- /dev/null +++ b/pkg/provider/transition.go @@ -0,0 +1,57 @@ +// Copyright (C) 2019 Storj Labs, Inc. +// See LICENSE for copying information. + +package provider + +import ( + "storj.io/storj/pkg/identity" + "storj.io/storj/pkg/server" +) + +type ( + // Provider Transition: pkg/provider is going away. + Provider = server.Server + // ServerConfig Transition: pkg/provider is going away. + ServerConfig = server.Config + // FullIdentity Transition: pkg/provider is going away. + FullIdentity = identity.FullIdentity + // PeerIdentity Transition: pkg/provider is going away. + PeerIdentity = identity.PeerIdentity + // CASetupConfig Transition: pkg/provider is going away. + CASetupConfig = identity.CASetupConfig + // PeerCAConfig Transition: pkg/provider is going away. + PeerCAConfig = identity.PeerCAConfig + // FullCAConfig Transition: pkg/provider is going away. + FullCAConfig = identity.FullCAConfig + // IdentitySetupConfig Transition: pkg/provider is going away. + IdentitySetupConfig = identity.SetupConfig + // IdentityConfig Transition: pkg/provider is going away. + IdentityConfig = identity.Config + // NewCAOptions Transition: pkg/provider is going away. + NewCAOptions = identity.NewCAOptions + // FullCertificateAuthority Transition: pkg/provider is going away. + FullCertificateAuthority = identity.FullCertificateAuthority +) + +var ( + // NewServerOptions Transition: pkg/provider is going away. + NewServerOptions = server.NewOptions + // NewProvider Transition: pkg/provider is going away. + NewProvider = server.NewServer + // PeerIdentityFromContext Transition: pkg/provider is going away. + PeerIdentityFromContext = identity.PeerIdentityFromContext + // NewFullIdentity Transition: pkg/provider is going away. + NewFullIdentity = identity.NewFullIdentity + // SetupIdentity Transition: pkg/provider is going away. + SetupIdentity = identity.SetupIdentity + // SetupCA Transition: pkg/provider is going away. + SetupCA = identity.SetupCA + // NewCA Transition: pkg/provider is going away. + NewCA = identity.NewCA + // ErrSetup Transition: pkg/provider is going away. + ErrSetup = identity.ErrSetup + // NoCertNoKey Transition: pkg/provider is going away. + NoCertNoKey = identity.NoCertNoKey + // FullIdentityFromPEM Transition: pkg/provider is going away. + FullIdentityFromPEM = identity.FullIdentityFromPEM +) diff --git a/pkg/provider/common.go b/pkg/server/common.go similarity index 69% rename from pkg/provider/common.go rename to pkg/server/common.go index 953a62094..1ec4d0108 100644 --- a/pkg/provider/common.go +++ b/pkg/server/common.go @@ -1,7 +1,7 @@ // Copyright (C) 2018 Storj Labs, Inc. // See LICENSE for copying information. -package provider +package server import ( "github.com/zeebo/errs" @@ -11,6 +11,6 @@ import ( var ( mon = monkit.Package() - // Error is a provider error - Error = errs.Class("provider error") + // Error is a pkg/server error + Error = errs.Class("pkg/server error") ) diff --git a/pkg/server/config.go b/pkg/server/config.go new file mode 100644 index 000000000..cd03a4fe1 --- /dev/null +++ b/pkg/server/config.go @@ -0,0 +1,58 @@ +// Copyright (C) 2019 Storj Labs, Inc. +// See LICENSE for copying information. + +package server + +import ( + "context" + "net" + + "go.uber.org/zap" + "google.golang.org/grpc" + + "storj.io/storj/pkg/identity" + "storj.io/storj/pkg/peertls" + "storj.io/storj/pkg/utils" +) + +// Config holds server specific configuration parameters +type Config struct { + RevocationDBURL string `help:"url for revocation database (e.g. bolt://some.db OR redis://127.0.0.1:6378?db=2&password=abc123)" default:"bolt://$CONFDIR/revocations.db"` + PeerCAWhitelistPath string `help:"path to the CA cert whitelist (peer identities must be signed by one these to be verified)"` + Address string `help:"address to listen on" default:":7777"` + Extensions peertls.TLSExtConfig + + Identity identity.Config +} + +// Run will run the given responsibilities with the configured identity. +func (sc Config) Run(ctx context.Context, + interceptor grpc.UnaryServerInterceptor, services ...Service) (err error) { + defer mon.Task()(&ctx)(&err) + + ident, err := sc.Identity.Load() + if err != nil { + return err + } + + lis, err := net.Listen("tcp", sc.Address) + if err != nil { + return err + } + defer func() { _ = lis.Close() }() + + opts, err := NewOptions(ident, sc) + if err != nil { + return err + } + defer func() { err = utils.CombineErrors(err, opts.RevDB.Close()) }() + + s, err := NewServer(opts, lis, interceptor, services...) + if err != nil { + return err + } + defer func() { _ = s.Close() }() + + zap.S().Infof("Node %s started", s.Identity().ID) + return s.Run(ctx) +} diff --git a/pkg/server/interceptors.go b/pkg/server/interceptors.go new file mode 100644 index 000000000..1e0a5a84f --- /dev/null +++ b/pkg/server/interceptors.go @@ -0,0 +1,54 @@ +// Copyright (C) 2018 Storj Labs, Inc. +// See LICENSE for copying information. + +package server + +import ( + "context" + "io" + + "go.uber.org/zap" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + + "storj.io/storj/storage" +) + +func streamInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) (err error) { + err = handler(srv, ss) + if err != nil { + // no zap errors for canceled or wrong file downloads + if storage.ErrKeyNotFound.Has(err) || + status.Code(err) == codes.Canceled || + status.Code(err) == codes.Unavailable || + err == io.EOF { + return err + } + zap.S().Errorf("%+v", err) + } + return err +} + +func unaryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, + err error) { + resp, err = handler(ctx, req) + if err != nil { + // no zap errors for wrong file downloads + if status.Code(err) == codes.NotFound { + return resp, err + } + zap.S().Errorf("%+v", err) + } + return resp, err +} + +func combineInterceptors(a, b grpc.UnaryServerInterceptor) grpc.UnaryServerInterceptor { + return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { + return a(ctx, req, info, func(actx context.Context, areq interface{}) (interface{}, error) { + return b(actx, areq, info, func(bctx context.Context, breq interface{}) (interface{}, error) { + return handler(bctx, breq) + }) + }) + } +} diff --git a/pkg/server/options.go b/pkg/server/options.go new file mode 100644 index 000000000..50f5db95e --- /dev/null +++ b/pkg/server/options.go @@ -0,0 +1,96 @@ +// Copyright (C) 2019 Storj Labs, Inc. +// See LICENSE for copying information. + +package server + +import ( + "crypto/x509" + "io/ioutil" + "os" + + "google.golang.org/grpc" + + "storj.io/storj/pkg/identity" + "storj.io/storj/pkg/peertls" +) + +// Options holds config, identity, and peer verification function data for use with a grpc server. +type Options struct { + Config Config + Ident *identity.FullIdentity + RevDB *peertls.RevocationDB + PCVFuncs []peertls.PeerCertVerificationFunc +} + +// NewOptions is a constructor for `serverOptions` given an identity and config +func NewOptions(i *identity.FullIdentity, c Config) (*Options, error) { + opts := &Options{ + Config: c, + Ident: i, + } + + err := opts.configure(c) + if err != nil { + return nil, err + } + + return opts, nil +} + +func (opts *Options) grpcOpts() (grpc.ServerOption, error) { + return opts.Ident.ServerOption(opts.PCVFuncs...) +} + +// configure adds peer certificate verification functions and revocation +// database to the config. +func (opts *Options) configure(c Config) (err error) { + var pcvs []peertls.PeerCertVerificationFunc + parseOpts := peertls.ParseExtOptions{} + + if c.PeerCAWhitelistPath != "" { + caWhitelist, err := loadWhitelist(c.PeerCAWhitelistPath) + if err != nil { + return err + } + parseOpts.CAWhitelist = caWhitelist + pcvs = append(pcvs, peertls.VerifyCAWhitelist(caWhitelist)) + } + + if c.Extensions.Revocation { + opts.RevDB, err = peertls.NewRevDB(c.RevocationDBURL) + if err != nil { + return err + } + pcvs = append(pcvs, peertls.VerifyUnrevokedChainFunc(opts.RevDB)) + } + + exts := peertls.ParseExtensions(c.Extensions, parseOpts) + pcvs = append(pcvs, exts.VerifyFunc()) + + // NB: remove nil elements + for i, f := range pcvs { + if f == nil { + copy(pcvs[i:], pcvs[i+1:]) + pcvs = pcvs[:len(pcvs)-1] + } + } + + opts.PCVFuncs = pcvs + return nil +} + +func loadWhitelist(path string) ([]*x509.Certificate, error) { + w, err := ioutil.ReadFile(path) + if err != nil && !os.IsNotExist(err) { + return nil, err + } + + var whitelist []*x509.Certificate + if w != nil { + whitelist, err = identity.DecodeAndParseChainPEM(w) + if err != nil { + return nil, Error.Wrap(err) + } + } + return whitelist, nil +} diff --git a/pkg/server/options_test.go b/pkg/server/options_test.go new file mode 100644 index 000000000..b45f7106a --- /dev/null +++ b/pkg/server/options_test.go @@ -0,0 +1,133 @@ +// Copyright (C) 2018 Storj Labs, Inc. +// See LICENSE for copying information. + +package server_test + +import ( + "io/ioutil" + "reflect" + "testing" + + "github.com/stretchr/testify/assert" + + "storj.io/storj/internal/testcontext" + "storj.io/storj/pkg/identity" + "storj.io/storj/pkg/peertls" + "storj.io/storj/pkg/server" +) + +func TestNewOptions(t *testing.T) { + ctx := testcontext.New(t) + defer ctx.Cleanup() + + fi := pregeneratedIdentity(t) + + whitelistPath := ctx.File("whitelist.pem") + + chainData, err := peertls.ChainBytes(fi.CA) + assert.NoError(t, err) + + err = ioutil.WriteFile(whitelistPath, chainData, 0644) + assert.NoError(t, err) + + cases := []struct { + testID string + config server.Config + pcvFuncsLen int + }{ + { + "default", + server.Config{}, + 0, + }, { + "revocation processing", + server.Config{ + RevocationDBURL: "bolt://" + ctx.File("revocation1.db"), + Extensions: peertls.TLSExtConfig{ + Revocation: true, + }, + }, + 2, + }, { + "ca whitelist verification", + server.Config{ + PeerCAWhitelistPath: whitelistPath, + }, + 1, + }, { + "ca whitelist verification and whitelist signed leaf verification", + server.Config{ + // NB: file doesn't actually exist + PeerCAWhitelistPath: whitelistPath, + Extensions: peertls.TLSExtConfig{ + WhitelistSignedLeaf: true, + }, + }, + 2, + }, { + "revocation processing and whitelist verification", + server.Config{ + // NB: file doesn't actually exist + PeerCAWhitelistPath: whitelistPath, + RevocationDBURL: "bolt://" + ctx.File("revocation2.db"), + Extensions: peertls.TLSExtConfig{ + Revocation: true, + }, + }, + 3, + }, { + "revocation processing, whitelist, and signed leaf verification", + server.Config{ + // NB: file doesn't actually exist + PeerCAWhitelistPath: whitelistPath, + RevocationDBURL: "bolt://" + ctx.File("revocation3.db"), + Extensions: peertls.TLSExtConfig{ + Revocation: true, + WhitelistSignedLeaf: true, + }, + }, + 3, + }, + } + + for _, c := range cases { + t.Log(c.testID) + opts, err := server.NewOptions(fi, c.config) + assert.NoError(t, err) + assert.True(t, reflect.DeepEqual(fi, opts.Ident)) + assert.Equal(t, c.config, opts.Config) + assert.Len(t, opts.PCVFuncs, c.pcvFuncsLen) + } +} + +func pregeneratedIdentity(t *testing.T) *identity.FullIdentity { + const chain = `-----BEGIN CERTIFICATE----- +MIIBQDCB56ADAgECAhB+u3d03qyW/ROgwy/ZsPccMAoGCCqGSM49BAMCMAAwIhgP +MDAwMTAxMDEwMDAwMDBaGA8wMDAxMDEwMTAwMDAwMFowADBZMBMGByqGSM49AgEG +CCqGSM49AwEHA0IABIZrEPV/ExEkF0qUF0fJ3qSeGt5oFUX231v02NSUywcQ/Ve0 +v3nHbmcJdjWBis2AkfL25mYDVC25jLl4tylMKumjPzA9MA4GA1UdDwEB/wQEAwIF +oDAdBgNVHSUEFjAUBggrBgEFBQcDAQYIKwYBBQUHAwIwDAYDVR0TAQH/BAIwADAK +BggqhkjOPQQDAgNIADBFAiEA2ZvsR0ncw4mHRIg2Isavd+XVEoMo/etXQRAkDy9n +wyoCIDykUsqjshc9kCrXOvPSN8GuO2bNoLu5C7K1GlE/HI2X +-----END CERTIFICATE----- +-----BEGIN CERTIFICATE----- +MIIBODCB4KADAgECAhAOcvhKe5TWT44LqFfgA1f8MAoGCCqGSM49BAMCMAAwIhgP +MDAwMTAxMDEwMDAwMDBaGA8wMDAxMDEwMTAwMDAwMFowADBZMBMGByqGSM49AgEG +CCqGSM49AwEHA0IABIZrEPV/ExEkF0qUF0fJ3qSeGt5oFUX231v02NSUywcQ/Ve0 +v3nHbmcJdjWBis2AkfL25mYDVC25jLl4tylMKumjODA2MA4GA1UdDwEB/wQEAwIC +BDATBgNVHSUEDDAKBggrBgEFBQcDATAPBgNVHRMBAf8EBTADAQH/MAoGCCqGSM49 +BAMCA0cAMEQCIGAZfPT1qvlnkTacojTtP20ZWf6XbnSztJHIKlUw6AE+AiB5Vcjj +awRaC5l1KBPGqiKB0coVXDwhW+K70l326MPUcg== +-----END CERTIFICATE-----` + + const key = `-----BEGIN EC PRIVATE KEY----- +MHcCAQEEIKGjEetrxKrzl+AL1E5LXke+1ElyAdjAmr88/1Kx09+doAoGCCqGSM49 +AwEHoUQDQgAEoLy/0hs5deTXZunRumsMkiHpF0g8wAc58aXANmr7Mxx9tzoIYFnx +0YN4VDKdCtUJa29yA6TIz1MiIDUAcB5YCA== +-----END EC PRIVATE KEY-----` + + fi, err := identity.FullIdentityFromPEM([]byte(chain), []byte(key)) + assert.NoError(t, err) + + return fi +} diff --git a/pkg/server/server.go b/pkg/server/server.go new file mode 100644 index 000000000..7c3cb6fec --- /dev/null +++ b/pkg/server/server.go @@ -0,0 +1,86 @@ +// Copyright (C) 2019 Storj Labs, Inc. +// See LICENSE for copying information. + +package server + +import ( + "context" + "net" + + "google.golang.org/grpc" + + "storj.io/storj/pkg/identity" +) + +// Service represents a specific gRPC method collection to be registered +// on a shared gRPC server. PointerDB, OverlayCache, PieceStore, Kademlia, +// StatDB, etc. are all examples of services. +type Service interface { + Run(ctx context.Context, server *Server) error +} + +// Server represents a bundle of services defined by a specific ID. +// Examples of servers are the satellite, the storagenode, and the uplink. +type Server struct { + lis net.Listener + grpc *grpc.Server + next []Service + identity *identity.FullIdentity +} + +// NewServer creates a Server out of an Identity, a net.Listener, +// a UnaryServerInterceptor, and a set of services. +func NewServer(opts *Options, lis net.Listener, + interceptor grpc.UnaryServerInterceptor, services ...Service) ( + *Server, error) { + grpcOpts, err := opts.grpcOpts() + if err != nil { + return nil, err + } + + unaryInterceptor := unaryInterceptor + if interceptor != nil { + unaryInterceptor = combineInterceptors(unaryInterceptor, interceptor) + } + + return &Server{ + lis: lis, + grpc: grpc.NewServer( + grpc.StreamInterceptor(streamInterceptor), + grpc.UnaryInterceptor(unaryInterceptor), + grpcOpts, + ), + next: services, + identity: opts.Ident, + }, nil +} + +// Identity returns the server's identity +func (p *Server) Identity() *identity.FullIdentity { return p.identity } + +// Addr returns the server's listener address +func (p *Server) Addr() net.Addr { return p.lis.Addr() } + +// GRPC returns the server's gRPC handle for registration purposes +func (p *Server) GRPC() *grpc.Server { return p.grpc } + +// Close shuts down the server +func (p *Server) Close() error { + p.grpc.GracefulStop() + return nil +} + +// Run will run the server and all of its services +func (p *Server) Run(ctx context.Context) (err error) { + defer mon.Task()(&ctx)(&err) + + // are there any unstarted services? start those first. the + // services should know to call Run again once they're ready. + if len(p.next) > 0 { + next := p.next[0] + p.next = p.next[1:] + return next.Run(ctx, p) + } + + return p.grpc.Serve(p.lis) +}