pkg/transport: require tls configuration for dialing (#1286)
* separate TLS options from server options (because we need them for dialing too) * stop creating transports in multiple places * ensure that we actually check revocation, whitelists, certificate signing, etc, for all connections.
This commit is contained in:
parent
ea28a9a581
commit
2a59679766
@ -15,6 +15,7 @@ import (
|
||||
"storj.io/storj/pkg/identity"
|
||||
"storj.io/storj/pkg/kademlia"
|
||||
"storj.io/storj/pkg/pb"
|
||||
"storj.io/storj/pkg/peertls/tlsopts"
|
||||
"storj.io/storj/pkg/server"
|
||||
"storj.io/storj/pkg/storj"
|
||||
"storj.io/storj/pkg/transport"
|
||||
@ -72,10 +73,9 @@ type Peer struct {
|
||||
// New creates a new Bootstrap Node.
|
||||
func New(log *zap.Logger, full *identity.FullIdentity, db DB, config Config) (*Peer, error) {
|
||||
peer := &Peer{
|
||||
Log: log,
|
||||
Identity: full,
|
||||
DB: db,
|
||||
Transport: transport.NewClient(full),
|
||||
Log: log,
|
||||
Identity: full,
|
||||
DB: db,
|
||||
}
|
||||
|
||||
var err error
|
||||
@ -88,11 +88,13 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, config Config) (*P
|
||||
|
||||
publicConfig := config.Server
|
||||
publicConfig.Address = peer.Public.Listener.Addr().String()
|
||||
publicOptions, err := server.NewOptions(peer.Identity, publicConfig)
|
||||
publicOptions, err := tlsopts.NewOptions(peer.Identity, publicConfig.Config)
|
||||
if err != nil {
|
||||
return nil, errs.Combine(err, peer.Close())
|
||||
}
|
||||
|
||||
peer.Transport = transport.NewClient(publicOptions)
|
||||
|
||||
peer.Public.Server, err = server.New(publicOptions, peer.Public.Listener, nil)
|
||||
if err != nil {
|
||||
return nil, errs.Combine(err, peer.Close())
|
||||
|
@ -20,6 +20,7 @@ import (
|
||||
|
||||
"storj.io/storj/pkg/identity"
|
||||
"storj.io/storj/pkg/pb"
|
||||
"storj.io/storj/pkg/peertls/tlsopts"
|
||||
"storj.io/storj/pkg/process"
|
||||
"storj.io/storj/pkg/storj"
|
||||
"storj.io/storj/pkg/transport"
|
||||
@ -124,12 +125,16 @@ func NewInspector(address, path string) (*Inspector, error) {
|
||||
CertPath: fmt.Sprintf("%s/identity.cert", path),
|
||||
KeyPath: fmt.Sprintf("%s/identity.key", path),
|
||||
}.Load()
|
||||
|
||||
if err != nil {
|
||||
return &Inspector{}, ErrIdentity.Wrap(err)
|
||||
return nil, ErrIdentity.Wrap(err)
|
||||
}
|
||||
|
||||
tc := transport.NewClient(id)
|
||||
tlsOpts, err := tlsopts.NewOptions(id, tlsopts.Config{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
tc := transport.NewClient(tlsOpts)
|
||||
conn, err := tc.DialAddress(ctx, address)
|
||||
if err != nil {
|
||||
return &Inspector{}, ErrInspectorDial.Wrap(err)
|
||||
|
@ -21,12 +21,14 @@ import (
|
||||
"storj.io/storj/internal/memory"
|
||||
"storj.io/storj/pkg/identity"
|
||||
"storj.io/storj/pkg/pb"
|
||||
"storj.io/storj/pkg/peertls/tlsopts"
|
||||
"storj.io/storj/pkg/piecestore/psclient"
|
||||
"storj.io/storj/pkg/process"
|
||||
"storj.io/storj/pkg/transport"
|
||||
)
|
||||
|
||||
func dashCmd(cmd *cobra.Command, args []string) (err error) {
|
||||
ctx := context.Background()
|
||||
ctx := process.Ctx(cmd)
|
||||
|
||||
ident, err := runCfg.Identity.Load()
|
||||
if err != nil {
|
||||
@ -35,7 +37,12 @@ func dashCmd(cmd *cobra.Command, args []string) (err error) {
|
||||
zap.S().Info("Node ID: ", ident.ID)
|
||||
}
|
||||
|
||||
tc := transport.NewClient(ident)
|
||||
tlsOpts, err := tlsopts.NewOptions(ident, tlsopts.Config{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
tc := transport.NewClient(tlsOpts)
|
||||
n := &pb.Node{
|
||||
Address: &pb.NodeAddress{
|
||||
Address: dashboardCfg.Address,
|
||||
|
@ -135,6 +135,9 @@ func newNetwork(flags *Flags) (*Processes, error) {
|
||||
"--kademlia.bootstrap-addr", bootstrap.Address,
|
||||
"--kademlia.operator.email", "bootstrap@example.com",
|
||||
"--kademlia.operator.wallet", "0x0123456789012345678901234567890123456789",
|
||||
|
||||
"--server.extensions.revocation=false",
|
||||
"--server.use-peer-ca-whitelist=false",
|
||||
},
|
||||
"run": {},
|
||||
})
|
||||
@ -165,6 +168,9 @@ func newNetwork(flags *Flags) (*Processes, error) {
|
||||
"--kademlia.bootstrap-addr", bootstrap.Address,
|
||||
"--repairer.overlay-addr", process.Address,
|
||||
"--repairer.pointer-db-addr", process.Address,
|
||||
|
||||
"--server.extensions.revocation=false",
|
||||
"--server.use-peer-ca-whitelist=false",
|
||||
},
|
||||
"run": {},
|
||||
})
|
||||
@ -202,6 +208,9 @@ func newNetwork(flags *Flags) (*Processes, error) {
|
||||
"--rs.repair-threshold", strconv.Itoa(2 * flags.StorageNodeCount / 5),
|
||||
"--rs.success-threshold", strconv.Itoa(3 * flags.StorageNodeCount / 5),
|
||||
"--rs.max-threshold", strconv.Itoa(4 * flags.StorageNodeCount / 5),
|
||||
|
||||
"--tls.extensions.revocation=false",
|
||||
"--tls.use-peer-ca-whitelist=false",
|
||||
},
|
||||
"run": {},
|
||||
})
|
||||
@ -279,6 +288,9 @@ func newNetwork(flags *Flags) (*Processes, error) {
|
||||
"--kademlia.bootstrap-addr", bootstrap.Address,
|
||||
"--kademlia.operator.email", fmt.Sprintf("storage%d@example.com", i),
|
||||
"--kademlia.operator.wallet", "0x0123456789012345678901234567890123456789",
|
||||
|
||||
"--server.extensions.revocation=false",
|
||||
"--server.use-peer-ca-whitelist=false",
|
||||
},
|
||||
"run": {},
|
||||
})
|
||||
|
@ -12,6 +12,7 @@ import (
|
||||
|
||||
"storj.io/storj/pkg/cfgstruct"
|
||||
"storj.io/storj/pkg/identity"
|
||||
"storj.io/storj/pkg/peertls/tlsopts"
|
||||
"storj.io/storj/pkg/storj"
|
||||
)
|
||||
|
||||
@ -32,10 +33,13 @@ func main() {
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
dialOption, err := identity.DialOption(storj.NodeID{})
|
||||
clientOptions, err := tlsopts.NewOptions(identity, tlsopts.Config{})
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
dialOption := clientOptions.DialOption(storj.NodeID{})
|
||||
|
||||
conn, err := grpc.Dial(*targetAddr, dialOption)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
|
@ -16,8 +16,10 @@ import (
|
||||
|
||||
"storj.io/storj/pkg/identity"
|
||||
"storj.io/storj/pkg/pb"
|
||||
"storj.io/storj/pkg/peertls/tlsopts"
|
||||
"storj.io/storj/pkg/pointerdb/pdbclient"
|
||||
"storj.io/storj/pkg/storage/meta"
|
||||
"storj.io/storj/pkg/transport"
|
||||
)
|
||||
|
||||
var (
|
||||
@ -41,8 +43,15 @@ func main() {
|
||||
logger.Error("Failed to create full identity: ", zap.Error(err))
|
||||
os.Exit(1)
|
||||
}
|
||||
clientOptions, err := tlsopts.NewOptions(identity, tlsopts.Config{})
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
transportClient := transport.NewClient(clientOptions)
|
||||
|
||||
APIKey := "abc123"
|
||||
client, err := pdbclient.NewClient(identity, pointerdbClientPort, APIKey)
|
||||
client, err := pdbclient.NewClient(transportClient, pointerdbClientPort, APIKey)
|
||||
|
||||
if err != nil {
|
||||
logger.Error("Failed to dial: ", zap.Error(err))
|
||||
|
@ -37,6 +37,7 @@ import (
|
||||
"storj.io/storj/pkg/overlay"
|
||||
"storj.io/storj/pkg/pb"
|
||||
"storj.io/storj/pkg/peertls"
|
||||
"storj.io/storj/pkg/peertls/tlsopts"
|
||||
"storj.io/storj/pkg/piecestore/psserver"
|
||||
"storj.io/storj/pkg/pointerdb"
|
||||
"storj.io/storj/pkg/server"
|
||||
@ -379,12 +380,14 @@ func (planet *Planet) newSatellites(count int) ([]*satellite.Peer, error) {
|
||||
|
||||
config := satellite.Config{
|
||||
Server: server.Config{
|
||||
Address: "127.0.0.1:0",
|
||||
RevocationDBURL: "bolt://" + filepath.Join(storageDir, "revocation.db"),
|
||||
UsePeerCAWhitelist: false, // TODO: enable
|
||||
Extensions: peertls.TLSExtConfig{
|
||||
Revocation: false,
|
||||
WhitelistSignedLeaf: false,
|
||||
Address: "127.0.0.1:0",
|
||||
Config: tlsopts.Config{
|
||||
RevocationDBURL: "bolt://" + filepath.Join(storageDir, "revocation.db"),
|
||||
UsePeerCAWhitelist: false, // TODO: enable
|
||||
Extensions: peertls.TLSExtConfig{
|
||||
Revocation: false,
|
||||
WhitelistSignedLeaf: false,
|
||||
},
|
||||
},
|
||||
},
|
||||
Kademlia: kademlia.Config{
|
||||
@ -506,12 +509,14 @@ func (planet *Planet) newStorageNodes(count int) ([]*storagenode.Peer, error) {
|
||||
|
||||
config := storagenode.Config{
|
||||
Server: server.Config{
|
||||
Address: "127.0.0.1:0",
|
||||
RevocationDBURL: "bolt://" + filepath.Join(storageDir, "revocation.db"),
|
||||
UsePeerCAWhitelist: false, // TODO: enable
|
||||
Extensions: peertls.TLSExtConfig{
|
||||
Revocation: false,
|
||||
WhitelistSignedLeaf: false,
|
||||
Address: "127.0.0.1:0",
|
||||
Config: tlsopts.Config{
|
||||
RevocationDBURL: "bolt://" + filepath.Join(storageDir, "revocation.db"),
|
||||
UsePeerCAWhitelist: false, // TODO: enable
|
||||
Extensions: peertls.TLSExtConfig{
|
||||
Revocation: false,
|
||||
WhitelistSignedLeaf: false,
|
||||
},
|
||||
},
|
||||
},
|
||||
Kademlia: kademlia.Config{
|
||||
@ -583,12 +588,14 @@ func (planet *Planet) newBootstrap() (peer *bootstrap.Peer, err error) {
|
||||
|
||||
config := bootstrap.Config{
|
||||
Server: server.Config{
|
||||
Address: "127.0.0.1:0",
|
||||
RevocationDBURL: "bolt://" + filepath.Join(dbDir, "revocation.db"),
|
||||
UsePeerCAWhitelist: false, // TODO: enable
|
||||
Extensions: peertls.TLSExtConfig{
|
||||
Revocation: false,
|
||||
WhitelistSignedLeaf: false,
|
||||
Address: "127.0.0.1:0",
|
||||
Config: tlsopts.Config{
|
||||
RevocationDBURL: "bolt://" + filepath.Join(dbDir, "revocation.db"),
|
||||
UsePeerCAWhitelist: false, // TODO: enable
|
||||
Extensions: peertls.TLSExtConfig{
|
||||
Revocation: false,
|
||||
WhitelistSignedLeaf: false,
|
||||
},
|
||||
},
|
||||
},
|
||||
Kademlia: kademlia.Config{
|
||||
|
@ -19,6 +19,7 @@ import (
|
||||
"storj.io/storj/pkg/identity"
|
||||
"storj.io/storj/pkg/overlay"
|
||||
"storj.io/storj/pkg/pb"
|
||||
"storj.io/storj/pkg/peertls/tlsopts"
|
||||
"storj.io/storj/pkg/pointerdb/pdbclient"
|
||||
"storj.io/storj/pkg/storage/streams"
|
||||
"storj.io/storj/pkg/storj"
|
||||
@ -46,6 +47,11 @@ func (planet *Planet) newUplink(name string, storageNodeCount int) (*Uplink, err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
tlsOpts, err := tlsopts.NewOptions(identity, tlsopts.Config{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
uplink := &Uplink{
|
||||
Log: planet.log.Named(name),
|
||||
Identity: identity,
|
||||
@ -54,7 +60,7 @@ func (planet *Planet) newUplink(name string, storageNodeCount int) (*Uplink, err
|
||||
|
||||
uplink.Log.Debug("id=" + identity.ID.String())
|
||||
|
||||
uplink.Transport = transport.NewClient(identity)
|
||||
uplink.Transport = transport.NewClient(tlsOpts)
|
||||
|
||||
uplink.Info = pb.Node{
|
||||
Id: uplink.Identity.ID,
|
||||
@ -120,17 +126,8 @@ func (uplink *Uplink) Shutdown() error { return nil }
|
||||
|
||||
// DialPointerDB dials destination with apikey and returns pointerdb Client
|
||||
func (uplink *Uplink) DialPointerDB(destination Peer, apikey string) (pdbclient.Client, error) {
|
||||
// TODO: use node.Transport instead of pdbclient.NewClient
|
||||
/*
|
||||
conn, err := node.Transport.DialNode(context.Background(), &destination.Info)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return piececlient.NewPSClient
|
||||
*/
|
||||
|
||||
// TODO: handle disconnect
|
||||
return pdbclient.NewClient(uplink.Identity, destination.Addr(), apikey)
|
||||
return pdbclient.NewClient(uplink.Transport, destination.Addr(), apikey)
|
||||
}
|
||||
|
||||
// DialOverlay dials destination and returns an overlay.Client
|
||||
@ -234,6 +231,10 @@ func (uplink *Uplink) getConfig(satellite *satellite.Peer) uplink.Config {
|
||||
config.RS.SuccessThreshold = 3 * uplink.StorageNodeCount / 5
|
||||
config.RS.MaxThreshold = 4 * uplink.StorageNodeCount / 5
|
||||
|
||||
config.TLS.UsePeerCAWhitelist = false
|
||||
config.TLS.Extensions.Revocation = false
|
||||
config.TLS.Extensions.WhitelistSignedLeaf = false
|
||||
|
||||
return config
|
||||
}
|
||||
|
||||
|
@ -125,8 +125,7 @@ func NewServer(log *zap.Logger, signer *identity.FullCertificateAuthority, authD
|
||||
}
|
||||
|
||||
// NewClient creates a new certificate signing grpc client
|
||||
func NewClient(ctx context.Context, ident *identity.FullIdentity, address string) (*Client, error) {
|
||||
tc := transport.NewClient(ident)
|
||||
func NewClient(ctx context.Context, tc transport.Client, address string) (*Client, error) {
|
||||
conn, err := tc.DialAddress(ctx, address)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -26,6 +26,7 @@ import (
|
||||
"storj.io/storj/internal/testplanet"
|
||||
"storj.io/storj/pkg/identity"
|
||||
"storj.io/storj/pkg/pb"
|
||||
"storj.io/storj/pkg/peertls/tlsopts"
|
||||
"storj.io/storj/pkg/pkcrypto"
|
||||
"storj.io/storj/pkg/server"
|
||||
"storj.io/storj/pkg/transport"
|
||||
@ -620,7 +621,7 @@ func TestCertificateSigner_Sign_E2E(t *testing.T) {
|
||||
require.NotNil(t, listener)
|
||||
|
||||
serverConfig := server.Config{Address: listener.Addr().String()}
|
||||
opts, err := server.NewOptions(serverIdent, serverConfig)
|
||||
opts, err := tlsopts.NewOptions(serverIdent, serverConfig.Config)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, opts)
|
||||
|
||||
@ -656,7 +657,11 @@ func TestCertificateSigner_Sign_E2E(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, clientIdent)
|
||||
|
||||
client, err := NewClient(ctx, clientIdent, listener.Addr().String())
|
||||
tlsOptions, err := tlsopts.NewOptions(clientIdent, tlsopts.Config{})
|
||||
require.NoError(t, err)
|
||||
clientTransport := transport.NewClient(tlsOptions)
|
||||
|
||||
client, err := NewClient(ctx, clientTransport, listener.Addr().String())
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, client)
|
||||
|
||||
@ -733,8 +738,12 @@ func TestNewClient(t *testing.T) {
|
||||
}
|
||||
})
|
||||
|
||||
tlsOptions, err := tlsopts.NewOptions(ident, tlsopts.Config{})
|
||||
require.NoError(t, err)
|
||||
clientTransport := transport.NewClient(tlsOptions)
|
||||
|
||||
t.Run("Basic", func(t *testing.T) {
|
||||
client, err := NewClient(ctx, ident, listener.Addr().String())
|
||||
client, err := NewClient(ctx, clientTransport, listener.Addr().String())
|
||||
assert.NoError(t, err)
|
||||
assert.NotNil(t, client)
|
||||
|
||||
@ -742,8 +751,7 @@ func TestNewClient(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("ClientFrom", func(t *testing.T) {
|
||||
tc := transport.NewClient(ident)
|
||||
conn, err := tc.DialAddress(ctx, listener.Addr().String())
|
||||
conn, err := clientTransport.DialAddress(ctx, listener.Addr().String())
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, conn)
|
||||
|
||||
|
@ -14,7 +14,9 @@ import (
|
||||
"storj.io/storj/pkg/identity"
|
||||
"storj.io/storj/pkg/pb"
|
||||
"storj.io/storj/pkg/peertls"
|
||||
"storj.io/storj/pkg/peertls/tlsopts"
|
||||
"storj.io/storj/pkg/server"
|
||||
"storj.io/storj/pkg/transport"
|
||||
"storj.io/storj/pkg/utils"
|
||||
"storj.io/storj/storage/boltdb"
|
||||
"storj.io/storj/storage/redis"
|
||||
@ -23,6 +25,7 @@ import (
|
||||
// CertClientConfig is a config struct for use with a certificate signing service client
|
||||
type CertClientConfig struct {
|
||||
Address string `help:"address of the certificate signing rpc service"`
|
||||
TLS tlsopts.Config
|
||||
}
|
||||
|
||||
// CertServerConfig is a config struct for use with a certificate signing service server
|
||||
@ -35,7 +38,11 @@ type CertServerConfig struct {
|
||||
|
||||
// Sign submits a certificate signing request given the config
|
||||
func (c CertClientConfig) Sign(ctx context.Context, ident *identity.FullIdentity, authToken string) ([][]byte, error) {
|
||||
client, err := NewClient(ctx, ident, c.Address)
|
||||
tlsOpts, err := tlsopts.NewOptions(ident, c.TLS)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
client, err := NewClient(ctx, transport.NewClient(tlsOpts), c.Address)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -8,11 +8,11 @@ import (
|
||||
"time"
|
||||
|
||||
"storj.io/storj/internal/memory"
|
||||
"storj.io/storj/pkg/identity"
|
||||
"storj.io/storj/pkg/overlay"
|
||||
"storj.io/storj/pkg/pointerdb/pdbclient"
|
||||
ecclient "storj.io/storj/pkg/storage/ec"
|
||||
"storj.io/storj/pkg/storage/segments"
|
||||
"storj.io/storj/pkg/transport"
|
||||
)
|
||||
|
||||
// Config contains configurable values for repairer
|
||||
@ -26,20 +26,20 @@ type Config struct {
|
||||
}
|
||||
|
||||
// GetSegmentRepairer creates a new segment repairer from storeConfig values
|
||||
func (c Config) GetSegmentRepairer(ctx context.Context, identity *identity.FullIdentity) (ss SegmentRepairer, err error) {
|
||||
func (c Config) GetSegmentRepairer(ctx context.Context, tc transport.Client) (ss SegmentRepairer, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
var oc overlay.Client
|
||||
oc, err = overlay.NewClientContext(ctx, identity, c.OverlayAddr)
|
||||
oc, err = overlay.NewClientContext(ctx, tc, c.OverlayAddr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
pdb, err := pdbclient.NewClientContext(ctx, identity, c.PointerDBAddr, c.APIKey)
|
||||
pdb, err := pdbclient.NewClientContext(ctx, tc, c.PointerDBAddr, c.APIKey)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ec := ecclient.NewClient(identity, c.MaxBufferMem.Int())
|
||||
ec := ecclient.NewClient(tc, c.MaxBufferMem.Int())
|
||||
return segments.NewSegmentRepairer(oc, ec, pdb), nil
|
||||
}
|
||||
|
@ -11,8 +11,8 @@ import (
|
||||
|
||||
"storj.io/storj/internal/sync2"
|
||||
"storj.io/storj/pkg/datarepair/queue"
|
||||
"storj.io/storj/pkg/identity"
|
||||
"storj.io/storj/pkg/storj"
|
||||
"storj.io/storj/pkg/transport"
|
||||
"storj.io/storj/storage"
|
||||
)
|
||||
|
||||
@ -23,22 +23,22 @@ type SegmentRepairer interface {
|
||||
|
||||
// Service contains the information needed to run the repair service
|
||||
type Service struct {
|
||||
queue queue.RepairQueue
|
||||
config *Config
|
||||
identity *identity.FullIdentity
|
||||
repairer SegmentRepairer
|
||||
limiter *sync2.Limiter
|
||||
ticker *time.Ticker
|
||||
queue queue.RepairQueue
|
||||
config *Config
|
||||
transport transport.Client
|
||||
repairer SegmentRepairer
|
||||
limiter *sync2.Limiter
|
||||
ticker *time.Ticker
|
||||
}
|
||||
|
||||
// NewService creates repairing service
|
||||
func NewService(queue queue.RepairQueue, config *Config, identity *identity.FullIdentity, interval time.Duration, concurrency int) *Service {
|
||||
func NewService(queue queue.RepairQueue, config *Config, transport transport.Client, interval time.Duration, concurrency int) *Service {
|
||||
return &Service{
|
||||
queue: queue,
|
||||
config: config,
|
||||
identity: identity,
|
||||
limiter: sync2.NewLimiter(concurrency),
|
||||
ticker: time.NewTicker(interval),
|
||||
queue: queue,
|
||||
config: config,
|
||||
transport: transport,
|
||||
limiter: sync2.NewLimiter(concurrency),
|
||||
ticker: time.NewTicker(interval),
|
||||
}
|
||||
}
|
||||
|
||||
@ -50,7 +50,7 @@ func (service *Service) Run(ctx context.Context) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
// TODO: close segment repairer, currently this leaks connections
|
||||
service.repairer, err = service.config.GetSegmentRepairer(ctx, service.identity)
|
||||
service.repairer, err = service.config.GetSegmentRepairer(ctx, service.transport)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -8,7 +8,6 @@ import (
|
||||
"context"
|
||||
"crypto"
|
||||
"crypto/sha256"
|
||||
"crypto/tls"
|
||||
"crypto/x509"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
@ -18,7 +17,6 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/zeebo/errs"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/credentials"
|
||||
"google.golang.org/grpc/peer"
|
||||
|
||||
@ -367,51 +365,6 @@ func (fi *FullIdentity) RestChainRaw() [][]byte {
|
||||
return chain
|
||||
}
|
||||
|
||||
// ServerOption returns a grpc `ServerOption` for incoming connections
|
||||
// to the node with this full identity
|
||||
func (fi *FullIdentity) ServerOption(pcvFuncs ...peertls.PeerCertVerificationFunc) (grpc.ServerOption, error) {
|
||||
c, err := peertls.TLSCert(fi.ChainRaw(), fi.Leaf, fi.Key)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
pcvFuncs = append(
|
||||
[]peertls.PeerCertVerificationFunc{peertls.VerifyPeerCertChains},
|
||||
pcvFuncs...,
|
||||
)
|
||||
tlsConfig := &tls.Config{
|
||||
Certificates: []tls.Certificate{*c},
|
||||
InsecureSkipVerify: true,
|
||||
ClientAuth: tls.RequireAnyClientCert,
|
||||
VerifyPeerCertificate: peertls.VerifyPeerFunc(
|
||||
pcvFuncs...,
|
||||
),
|
||||
}
|
||||
|
||||
return grpc.Creds(credentials.NewTLS(tlsConfig)), nil
|
||||
}
|
||||
|
||||
// DialOption returns a grpc `DialOption` for making outgoing connections
|
||||
// to the node with this peer identity
|
||||
// id is an optional id of the node we are dialing
|
||||
func (fi *FullIdentity) DialOption(id storj.NodeID) (grpc.DialOption, error) {
|
||||
c, err := peertls.TLSCert(fi.ChainRaw(), fi.Leaf, fi.Key)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
tlsConfig := &tls.Config{
|
||||
Certificates: []tls.Certificate{*c},
|
||||
InsecureSkipVerify: true,
|
||||
VerifyPeerCertificate: peertls.VerifyPeerFunc(
|
||||
peertls.VerifyPeerCertChains,
|
||||
verifyIdentity(id),
|
||||
),
|
||||
}
|
||||
|
||||
return grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)), nil
|
||||
}
|
||||
|
||||
// PeerIdentity converts a FullIdentity into a PeerIdentity
|
||||
func (fi *FullIdentity) PeerIdentity() *PeerIdentity {
|
||||
return &PeerIdentity{
|
||||
@ -422,26 +375,6 @@ func (fi *FullIdentity) PeerIdentity() *PeerIdentity {
|
||||
}
|
||||
}
|
||||
|
||||
func verifyIdentity(id storj.NodeID) peertls.PeerCertVerificationFunc {
|
||||
return func(_ [][]byte, parsedChains [][]*x509.Certificate) (err error) {
|
||||
defer mon.TaskNamed("verifyIdentity")(nil)(&err)
|
||||
if id == (storj.NodeID{}) {
|
||||
return nil
|
||||
}
|
||||
|
||||
peer, err := PeerIdentityFromCerts(parsedChains[0][0], parsedChains[0][1], parsedChains[0][2:])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if peer.ID.String() != id.String() {
|
||||
return Error.New("peer ID did not match requested ID")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func backupPath(path string) string {
|
||||
pathExt := filepath.Ext(path)
|
||||
base := strings.TrimSuffix(path, pathExt)
|
||||
|
@ -1,6 +1,5 @@
|
||||
// Copyright (C) 2019 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package kademlia
|
||||
|
||||
@ -28,6 +27,7 @@ import (
|
||||
"storj.io/storj/internal/teststorj"
|
||||
"storj.io/storj/pkg/identity"
|
||||
"storj.io/storj/pkg/pb"
|
||||
"storj.io/storj/pkg/peertls/tlsopts"
|
||||
"storj.io/storj/pkg/storj"
|
||||
"storj.io/storj/pkg/transport"
|
||||
"storj.io/storj/storage/teststore"
|
||||
@ -177,8 +177,10 @@ func testNode(t *testing.T, bn []pb.Node) (*Kademlia, *grpc.Server, func()) {
|
||||
assert.NoError(t, err)
|
||||
s := NewEndpoint(logger, k, k.GetRoutingTable().(*RoutingTable))
|
||||
// new ident opts
|
||||
identOpt, err := fid.ServerOption()
|
||||
assert.NoError(t, err)
|
||||
|
||||
serverOptions, err := tlsopts.NewOptions(fid, tlsopts.Config{})
|
||||
require.NoError(t, err)
|
||||
identOpt := serverOptions.ServerOption()
|
||||
|
||||
grpcServer := grpc.NewServer(identOpt)
|
||||
|
||||
@ -442,10 +444,13 @@ func startTestNodeServer(ctx context.Context) (*grpc.Server, *mockNodesServer, *
|
||||
if err != nil {
|
||||
return nil, nil, nil, ""
|
||||
}
|
||||
identOpt, err := identity.ServerOption()
|
||||
|
||||
serverOptions, err := tlsopts.NewOptions(identity, tlsopts.Config{})
|
||||
if err != nil {
|
||||
return nil, nil, nil, ""
|
||||
}
|
||||
identOpt := serverOptions.ServerOption()
|
||||
|
||||
grpcServer := grpc.NewServer(identOpt)
|
||||
mn := &mockNodesServer{queryCalled: 0}
|
||||
|
||||
@ -469,10 +474,12 @@ func newTestServer(ctx context.Context, nn []*pb.Node) (*grpc.Server, *mockNodes
|
||||
if err != nil {
|
||||
return nil, nil
|
||||
}
|
||||
identOpt, err := identity.ServerOption()
|
||||
serverOptions, err := tlsopts.NewOptions(identity, tlsopts.Config{})
|
||||
if err != nil {
|
||||
return nil, nil
|
||||
}
|
||||
identOpt := serverOptions.ServerOption()
|
||||
|
||||
grpcServer := grpc.NewServer(identOpt)
|
||||
mn := &mockNodesServer{queryCalled: 0}
|
||||
|
||||
@ -541,8 +548,14 @@ func newKademlia(log *zap.Logger, nodeType pb.NodeType, bootstrapNodes []pb.Node
|
||||
|
||||
rt, err := NewRoutingTable(log, self, teststore.New(), teststore.New(), nil)
|
||||
if err != nil {
|
||||
return nil, BootstrapErr.Wrap(err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return NewService(log, self, bootstrapNodes, transport.NewClient(identity, rt), alpha, rt)
|
||||
tlsOptions, err := tlsopts.NewOptions(identity, tlsopts.Config{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
transportClient := transport.NewClient(tlsOptions, rt)
|
||||
|
||||
return NewService(log, self, bootstrapNodes, transportClient, alpha, rt)
|
||||
}
|
||||
|
@ -372,7 +372,7 @@ func newMetainfoParts(planet *testplanet.Planet) (*kvmetainfo.DB, buckets.Store,
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
|
||||
ec := ecclient.NewClient(planet.Uplinks[0].Identity, 0)
|
||||
ec := ecclient.NewClient(planet.Uplinks[0].Transport, 0)
|
||||
fc, err := infectious.NewFEC(2, 4)
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
|
@ -696,7 +696,7 @@ func initEnv(planet *testplanet.Planet) (minio.ObjectLayer, storj.Metainfo, stre
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
|
||||
ec := ecclient.NewClient(planet.Uplinks[0].Identity, 0)
|
||||
ec := ecclient.NewClient(planet.Uplinks[0].Transport, 0)
|
||||
fc, err := infectious.NewFEC(2, 4)
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
|
@ -8,7 +8,6 @@ import (
|
||||
|
||||
"github.com/zeebo/errs"
|
||||
|
||||
"storj.io/storj/pkg/identity"
|
||||
"storj.io/storj/pkg/pb"
|
||||
"storj.io/storj/pkg/storj"
|
||||
"storj.io/storj/pkg/transport"
|
||||
@ -50,13 +49,12 @@ type Options struct {
|
||||
}
|
||||
|
||||
// NewClient returns a new intialized Overlay Client
|
||||
func NewClient(identity *identity.FullIdentity, address string) (Client, error) {
|
||||
return NewClientContext(context.TODO(), identity, address)
|
||||
func NewClient(tc transport.Client, address string) (Client, error) {
|
||||
return NewClientContext(context.TODO(), tc, address)
|
||||
}
|
||||
|
||||
// NewClientContext returns a new intialized Overlay Client
|
||||
func NewClientContext(ctx context.Context, identity *identity.FullIdentity, address string) (Client, error) {
|
||||
tc := transport.NewClient(identity, &Cache{}) // add overlay to transport client as observer
|
||||
func NewClientContext(ctx context.Context, tc transport.Client, address string) (Client, error) {
|
||||
conn, err := tc.DialAddress(ctx, address)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -1,7 +1,7 @@
|
||||
// Copyright (C) 2019 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package server
|
||||
package tlsopts
|
||||
|
||||
const (
|
||||
// DefaultPeerCAWhitelist includes the production Storj network CAs
|
16
pkg/peertls/tlsopts/config.go
Normal file
16
pkg/peertls/tlsopts/config.go
Normal file
@ -0,0 +1,16 @@
|
||||
// Copyright (C) 2019 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package tlsopts
|
||||
|
||||
import (
|
||||
"storj.io/storj/pkg/peertls"
|
||||
)
|
||||
|
||||
// Config holds tls configuration parameters
|
||||
type Config struct {
|
||||
RevocationDBURL string `default:"bolt://$CONFDIR/revocations.db" help:"url for revocation database (e.g. bolt://some.db OR redis://127.0.0.1:6378?db=2&password=abc123)"`
|
||||
PeerCAWhitelistPath string `help:"path to the CA cert whitelist (peer identities must be signed by one these to be verified). this will override the default peer whitelist"`
|
||||
UsePeerCAWhitelist bool `help:"if true, uses peer ca whitelist checking" default:"false"`
|
||||
Extensions peertls.TLSExtConfig
|
||||
}
|
@ -1,27 +1,36 @@
|
||||
// Copyright (C) 2019 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package server
|
||||
package tlsopts
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"io/ioutil"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
"github.com/zeebo/errs"
|
||||
monkit "gopkg.in/spacemonkeygo/monkit.v2"
|
||||
|
||||
"storj.io/storj/pkg/identity"
|
||||
"storj.io/storj/pkg/peertls"
|
||||
"storj.io/storj/pkg/pkcrypto"
|
||||
)
|
||||
|
||||
// Options holds config, identity, and peer verification function data for use with a grpc server.
|
||||
var (
|
||||
mon = monkit.Package()
|
||||
// Error is error for tlsopts
|
||||
Error = errs.Class("tlsopts error")
|
||||
)
|
||||
|
||||
// Options holds config, identity, and peer verification function data for use with tls.
|
||||
type Options struct {
|
||||
Config Config
|
||||
Ident *identity.FullIdentity
|
||||
RevDB *identity.RevocationDB
|
||||
PCVFuncs []peertls.PeerCertVerificationFunc
|
||||
Cert *tls.Certificate
|
||||
}
|
||||
|
||||
// NewOptions is a constructor for `serverOptions` given an identity and config
|
||||
// NewOptions is a constructor for `tls options` given an identity and config
|
||||
func NewOptions(i *identity.FullIdentity, c Config) (*Options, error) {
|
||||
opts := &Options{
|
||||
Config: c,
|
||||
@ -36,10 +45,6 @@ func NewOptions(i *identity.FullIdentity, c Config) (*Options, error) {
|
||||
return opts, nil
|
||||
}
|
||||
|
||||
func (opts *Options) grpcOpts() (grpc.ServerOption, error) {
|
||||
return opts.Ident.ServerOption(opts.PCVFuncs...)
|
||||
}
|
||||
|
||||
// configure adds peer certificate verification functions and revocation
|
||||
// database to the config.
|
||||
func (opts *Options) configure(c Config) (err error) {
|
||||
@ -82,5 +87,7 @@ func (opts *Options) configure(c Config) (err error) {
|
||||
}
|
||||
|
||||
opts.PCVFuncs = pcvs
|
||||
return nil
|
||||
|
||||
opts.Cert, err = peertls.TLSCert(opts.Ident.ChainRaw(), opts.Ident.Leaf, opts.Ident.Key)
|
||||
return err
|
||||
}
|
@ -1,7 +1,7 @@
|
||||
// Copyright (C) 2019 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package server_test
|
||||
package tlsopts_test
|
||||
|
||||
import (
|
||||
"io/ioutil"
|
||||
@ -13,7 +13,7 @@ import (
|
||||
"storj.io/storj/internal/testcontext"
|
||||
"storj.io/storj/pkg/identity"
|
||||
"storj.io/storj/pkg/peertls"
|
||||
"storj.io/storj/pkg/server"
|
||||
"storj.io/storj/pkg/peertls/tlsopts"
|
||||
)
|
||||
|
||||
func TestNewOptions(t *testing.T) {
|
||||
@ -32,16 +32,16 @@ func TestNewOptions(t *testing.T) {
|
||||
|
||||
cases := []struct {
|
||||
testID string
|
||||
config server.Config
|
||||
config tlsopts.Config
|
||||
pcvFuncsLen int
|
||||
}{
|
||||
{
|
||||
"default",
|
||||
server.Config{},
|
||||
tlsopts.Config{},
|
||||
0,
|
||||
}, {
|
||||
"revocation processing",
|
||||
server.Config{
|
||||
tlsopts.Config{
|
||||
RevocationDBURL: "bolt://" + ctx.File("revocation1.db"),
|
||||
Extensions: peertls.TLSExtConfig{
|
||||
Revocation: true,
|
||||
@ -50,14 +50,14 @@ func TestNewOptions(t *testing.T) {
|
||||
2,
|
||||
}, {
|
||||
"ca whitelist verification",
|
||||
server.Config{
|
||||
tlsopts.Config{
|
||||
PeerCAWhitelistPath: whitelistPath,
|
||||
UsePeerCAWhitelist: true,
|
||||
},
|
||||
1,
|
||||
}, {
|
||||
"ca whitelist verification and whitelist signed leaf verification",
|
||||
server.Config{
|
||||
tlsopts.Config{
|
||||
// NB: file doesn't actually exist
|
||||
PeerCAWhitelistPath: whitelistPath,
|
||||
UsePeerCAWhitelist: true,
|
||||
@ -68,7 +68,7 @@ func TestNewOptions(t *testing.T) {
|
||||
2,
|
||||
}, {
|
||||
"revocation processing and whitelist verification",
|
||||
server.Config{
|
||||
tlsopts.Config{
|
||||
// NB: file doesn't actually exist
|
||||
PeerCAWhitelistPath: whitelistPath,
|
||||
UsePeerCAWhitelist: true,
|
||||
@ -80,7 +80,7 @@ func TestNewOptions(t *testing.T) {
|
||||
3,
|
||||
}, {
|
||||
"revocation processing, whitelist, and signed leaf verification",
|
||||
server.Config{
|
||||
tlsopts.Config{
|
||||
// NB: file doesn't actually exist
|
||||
PeerCAWhitelistPath: whitelistPath,
|
||||
UsePeerCAWhitelist: true,
|
||||
@ -96,7 +96,7 @@ func TestNewOptions(t *testing.T) {
|
||||
|
||||
for _, c := range cases {
|
||||
t.Log(c.testID)
|
||||
opts, err := server.NewOptions(fi, c.config)
|
||||
opts, err := tlsopts.NewOptions(fi, c.config)
|
||||
assert.NoError(t, err)
|
||||
assert.True(t, reflect.DeepEqual(fi, opts.Ident))
|
||||
assert.Equal(t, c.config, opts.Config)
|
79
pkg/peertls/tlsopts/tls.go
Normal file
79
pkg/peertls/tlsopts/tls.go
Normal file
@ -0,0 +1,79 @@
|
||||
// Copyright (C) 2019 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package tlsopts
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"crypto/x509"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/credentials"
|
||||
|
||||
"storj.io/storj/pkg/identity"
|
||||
"storj.io/storj/pkg/peertls"
|
||||
"storj.io/storj/pkg/storj"
|
||||
)
|
||||
|
||||
// ServerOption returns a grpc `ServerOption` for incoming connections
|
||||
// to the node with this full identity
|
||||
func (opts *Options) ServerOption() grpc.ServerOption {
|
||||
pcvFuncs := append(
|
||||
[]peertls.PeerCertVerificationFunc{
|
||||
peertls.VerifyPeerCertChains,
|
||||
},
|
||||
opts.PCVFuncs...,
|
||||
)
|
||||
tlsConfig := &tls.Config{
|
||||
Certificates: []tls.Certificate{*opts.Cert},
|
||||
InsecureSkipVerify: true,
|
||||
ClientAuth: tls.RequireAnyClientCert,
|
||||
VerifyPeerCertificate: peertls.VerifyPeerFunc(
|
||||
pcvFuncs...,
|
||||
),
|
||||
}
|
||||
|
||||
return grpc.Creds(credentials.NewTLS(tlsConfig))
|
||||
}
|
||||
|
||||
// DialOption returns a grpc `DialOption` for making outgoing connections
|
||||
// to the node with this peer identity
|
||||
// id is an optional id of the node we are dialing
|
||||
func (opts *Options) DialOption(id storj.NodeID) grpc.DialOption {
|
||||
pcvFuncs := append(
|
||||
[]peertls.PeerCertVerificationFunc{
|
||||
peertls.VerifyPeerCertChains,
|
||||
verifyIdentity(id),
|
||||
},
|
||||
opts.PCVFuncs...,
|
||||
)
|
||||
tlsConfig := &tls.Config{
|
||||
Certificates: []tls.Certificate{*opts.Cert},
|
||||
InsecureSkipVerify: true,
|
||||
VerifyPeerCertificate: peertls.VerifyPeerFunc(
|
||||
pcvFuncs...,
|
||||
),
|
||||
}
|
||||
|
||||
return grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig))
|
||||
}
|
||||
|
||||
func verifyIdentity(id storj.NodeID) peertls.PeerCertVerificationFunc {
|
||||
return func(_ [][]byte, parsedChains [][]*x509.Certificate) (err error) {
|
||||
defer mon.TaskNamed("verifyIdentity")(nil)(&err)
|
||||
if id == (storj.NodeID{}) {
|
||||
return nil
|
||||
}
|
||||
|
||||
peer, err := identity.PeerIdentityFromCerts(parsedChains[0][0], parsedChains[0][1], parsedChains[0][2:])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if peer.ID.String() != id.String() {
|
||||
return Error.New("peer ID did not match requested ID")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
}
|
@ -10,7 +10,6 @@ import (
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/net/context"
|
||||
|
||||
"storj.io/storj/pkg/identity"
|
||||
"storj.io/storj/pkg/kademlia"
|
||||
"storj.io/storj/pkg/pb"
|
||||
"storj.io/storj/pkg/piecestore/psserver/psdb"
|
||||
@ -35,8 +34,8 @@ type AgreementSender struct { // TODO: rename to service
|
||||
// TODO: take transport instead of identity as argument
|
||||
|
||||
// New creates an Agreement Sender
|
||||
func New(log *zap.Logger, DB *psdb.DB, identity *identity.FullIdentity, kad *kademlia.Kademlia, checkInterval time.Duration) *AgreementSender {
|
||||
return &AgreementSender{DB: DB, log: log, transport: transport.NewClient(identity), kad: kad, checkInterval: checkInterval}
|
||||
func New(log *zap.Logger, DB *psdb.DB, tc transport.Client, kad *kademlia.Kademlia, checkInterval time.Duration) *AgreementSender {
|
||||
return &AgreementSender{DB: DB, log: log, transport: tc, kad: kad, checkInterval: checkInterval}
|
||||
}
|
||||
|
||||
// Run the agreement sender with a context to check for cancel
|
||||
|
@ -31,6 +31,7 @@ import (
|
||||
"storj.io/storj/pkg/bwagreement/testbwagreement"
|
||||
"storj.io/storj/pkg/identity"
|
||||
"storj.io/storj/pkg/pb"
|
||||
"storj.io/storj/pkg/peertls/tlsopts"
|
||||
pstore "storj.io/storj/pkg/piecestore"
|
||||
"storj.io/storj/pkg/piecestore/psserver/psdb"
|
||||
"storj.io/storj/pkg/server"
|
||||
@ -581,16 +582,18 @@ func NewTest(ctx context.Context, t *testing.T, snID, upID *identity.FullIdentit
|
||||
listener, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
require.NoError(t, err)
|
||||
publicConfig := server.Config{Address: "127.0.0.1:0"}
|
||||
publicOptions, err := server.NewOptions(snID, publicConfig)
|
||||
publicOptions, err := tlsopts.NewOptions(snID, publicConfig.Config)
|
||||
require.NoError(t, err)
|
||||
grpcServer, err := server.New(publicOptions, listener, nil)
|
||||
require.NoError(t, err)
|
||||
pb.RegisterPieceStoreRoutesServer(grpcServer.GRPC(), psServer)
|
||||
go func() { require.NoError(t, grpcServer.Run(ctx)) }()
|
||||
//init client
|
||||
co, err := upID.DialOption(storj.NodeID{})
|
||||
|
||||
tlsOptions, err := tlsopts.NewOptions(upID, tlsopts.Config{})
|
||||
require.NoError(t, err)
|
||||
conn, err := grpc.Dial(listener.Addr().String(), co)
|
||||
|
||||
conn, err := grpc.Dial(listener.Addr().String(), tlsOptions.DialOption(storj.NodeID{}))
|
||||
require.NoError(t, err)
|
||||
psClient := pb.NewPieceStoreRoutesClient(conn)
|
||||
//cleanup callback
|
||||
|
@ -14,7 +14,6 @@ import (
|
||||
monkit "gopkg.in/spacemonkeygo/monkit.v2"
|
||||
|
||||
"storj.io/storj/pkg/auth/grpcauth"
|
||||
"storj.io/storj/pkg/identity"
|
||||
"storj.io/storj/pkg/pb"
|
||||
"storj.io/storj/pkg/storj"
|
||||
"storj.io/storj/pkg/transport"
|
||||
@ -60,14 +59,13 @@ type Client interface {
|
||||
}
|
||||
|
||||
// NewClient initializes a new pointerdb client
|
||||
func NewClient(identity *identity.FullIdentity, address string, APIKey string) (*PointerDB, error) {
|
||||
return NewClientContext(context.TODO(), identity, address, APIKey)
|
||||
func NewClient(tc transport.Client, address string, APIKey string) (*PointerDB, error) {
|
||||
return NewClientContext(context.TODO(), tc, address, APIKey)
|
||||
}
|
||||
|
||||
// NewClientContext initializes a new pointerdb client
|
||||
func NewClientContext(ctx context.Context, identity *identity.FullIdentity, address string, APIKey string) (*PointerDB, error) {
|
||||
func NewClientContext(ctx context.Context, tc transport.Client, address string, APIKey string) (*PointerDB, error) {
|
||||
apiKeyInjector := grpcauth.NewAPIKeyInjector(APIKey)
|
||||
tc := transport.NewClient(identity)
|
||||
conn, err := tc.DialAddress(
|
||||
ctx,
|
||||
address,
|
||||
|
@ -11,17 +11,14 @@ import (
|
||||
"google.golang.org/grpc"
|
||||
|
||||
"storj.io/storj/pkg/identity"
|
||||
"storj.io/storj/pkg/peertls"
|
||||
"storj.io/storj/pkg/peertls/tlsopts"
|
||||
"storj.io/storj/pkg/utils"
|
||||
)
|
||||
|
||||
// Config holds server specific configuration parameters
|
||||
type Config struct {
|
||||
RevocationDBURL string `default:"bolt://$CONFDIR/revocations.db" help:"url for revocation database (e.g. bolt://some.db OR redis://127.0.0.1:6378?db=2&password=abc123)"`
|
||||
PeerCAWhitelistPath string `help:"path to the CA cert whitelist (peer identities must be signed by one these to be verified). this will override the default peer whitelist"`
|
||||
UsePeerCAWhitelist bool `help:"if true, uses peer ca whitelist checking" default:"false"`
|
||||
Address string `user:"true" help:"address to listen on" default:":7777"`
|
||||
Extensions peertls.TLSExtConfig
|
||||
tlsopts.Config
|
||||
Address string `user:"true" help:"address to listen on" default:":7777"`
|
||||
}
|
||||
|
||||
// Run will run the given responsibilities with the configured identity.
|
||||
@ -34,7 +31,7 @@ func (sc Config) Run(ctx context.Context, identity *identity.FullIdentity, inter
|
||||
}
|
||||
defer func() { _ = lis.Close() }()
|
||||
|
||||
opts, err := NewOptions(identity, sc)
|
||||
opts, err := tlsopts.NewOptions(identity, sc.Config)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -11,6 +11,7 @@ import (
|
||||
"google.golang.org/grpc"
|
||||
|
||||
"storj.io/storj/pkg/identity"
|
||||
"storj.io/storj/pkg/peertls/tlsopts"
|
||||
)
|
||||
|
||||
// Service represents a specific gRPC method collection to be registered
|
||||
@ -31,12 +32,7 @@ type Server struct {
|
||||
|
||||
// New creates a Server out of an Identity, a net.Listener,
|
||||
// a UnaryServerInterceptor, and a set of services.
|
||||
func New(opts *Options, lis net.Listener, interceptor grpc.UnaryServerInterceptor, services ...Service) (*Server, error) {
|
||||
grpcOpts, err := opts.grpcOpts()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
func New(opts *tlsopts.Options, lis net.Listener, interceptor grpc.UnaryServerInterceptor, services ...Service) (*Server, error) {
|
||||
unaryInterceptor := unaryInterceptor
|
||||
if interceptor != nil {
|
||||
unaryInterceptor = combineInterceptors(unaryInterceptor, interceptor)
|
||||
@ -47,7 +43,7 @@ func New(opts *Options, lis net.Listener, interceptor grpc.UnaryServerIntercepto
|
||||
grpc: grpc.NewServer(
|
||||
grpc.StreamInterceptor(streamInterceptor),
|
||||
grpc.UnaryInterceptor(unaryInterceptor),
|
||||
grpcOpts,
|
||||
opts.ServerOption(),
|
||||
),
|
||||
next: services,
|
||||
identity: opts.Ident,
|
||||
|
@ -16,7 +16,6 @@ import (
|
||||
monkit "gopkg.in/spacemonkeygo/monkit.v2"
|
||||
|
||||
"storj.io/storj/pkg/eestream"
|
||||
"storj.io/storj/pkg/identity"
|
||||
"storj.io/storj/pkg/pb"
|
||||
"storj.io/storj/pkg/piecestore/psclient"
|
||||
"storj.io/storj/pkg/ranger"
|
||||
@ -44,8 +43,7 @@ type ecClient struct {
|
||||
}
|
||||
|
||||
// NewClient from the given identity and max buffer memory
|
||||
func NewClient(identity *identity.FullIdentity, memoryLimit int) Client {
|
||||
tc := transport.NewClient(identity)
|
||||
func NewClient(tc transport.Client, memoryLimit int) Client {
|
||||
return &ecClient{
|
||||
transport: tc,
|
||||
memoryLimit: memoryLimit,
|
||||
|
@ -15,14 +15,15 @@ import (
|
||||
|
||||
"github.com/golang/mock/gomock"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/vivint/infectious"
|
||||
|
||||
"storj.io/storj/internal/teststorj"
|
||||
"storj.io/storj/pkg/eestream"
|
||||
"storj.io/storj/pkg/identity"
|
||||
"storj.io/storj/pkg/pb"
|
||||
"storj.io/storj/pkg/peertls/tlsopts"
|
||||
"storj.io/storj/pkg/piecestore/psclient"
|
||||
"storj.io/storj/pkg/pkcrypto"
|
||||
"storj.io/storj/pkg/ranger"
|
||||
"storj.io/storj/pkg/transport"
|
||||
)
|
||||
@ -45,24 +46,48 @@ var (
|
||||
)
|
||||
|
||||
func TestNewECClient(t *testing.T) {
|
||||
ident, err := identity.FullIdentityFromPEM([]byte(`-----BEGIN CERTIFICATE-----
|
||||
MIIBPzCB56ADAgECAhBkctCIgrE25/vSSXpUno5SMAoGCCqGSM49BAMCMAAwIhgP
|
||||
MDAwMTAxMDEwMDAwMDBaGA8wMDAxMDEwMTAwMDAwMFowADBZMBMGByqGSM49AgEG
|
||||
CCqGSM49AwEHA0IABFaIq+DPJfvMv8RwFXIpGGxLOHCbsvG8iMyAarv04l8QptPP
|
||||
nSEKiod+KGbhQ6pEJZ0eWEyDbkA9RsUG/axNX96jPzA9MA4GA1UdDwEB/wQEAwIF
|
||||
oDAdBgNVHSUEFjAUBggrBgEFBQcDAQYIKwYBBQUHAwIwDAYDVR0TAQH/BAIwADAK
|
||||
BggqhkjOPQQDAgNHADBEAiAc+6+oquoS0zcYrLd4rmoZC6uoh4ItQvH5phP0MK3b
|
||||
YAIgDznIZz/oeowiv+Ui6HZT7aclBvTGjrfHR7Uo7TeGFls=
|
||||
-----END CERTIFICATE-----
|
||||
-----BEGIN CERTIFICATE-----
|
||||
MIIBOjCB4KADAgECAhA7Yb8vONMfR8ri8DCmFP7hMAoGCCqGSM49BAMCMAAwIhgP
|
||||
MDAwMTAxMDEwMDAwMDBaGA8wMDAxMDEwMTAwMDAwMFowADBZMBMGByqGSM49AgEG
|
||||
CCqGSM49AwEHA0IABCqtWDMdx38NKcTW58up4SLn6d6f+E4jljovCp9YY4zVg2lk
|
||||
/GyDAb5tuB/WttbZUO7VUMSdYjpSH5sad8uff3+jODA2MA4GA1UdDwEB/wQEAwIC
|
||||
BDATBgNVHSUEDDAKBggrBgEFBQcDATAPBgNVHRMBAf8EBTADAQH/MAoGCCqGSM49
|
||||
BAMCA0kAMEYCIQDFCnJ5qV6KyN2AGD7exywI5ls7Jo3scBO8ekuXT2yNhQIhAK3W
|
||||
qYzzqaR5oPuEeRSitAbV69mNcKznpU21jCnnuSq9
|
||||
-----END CERTIFICATE-----
|
||||
`), []byte(`-----BEGIN EC PRIVATE KEY-----
|
||||
MHcCAQEEICvE+Bd39LJ3VVf/SBdkw/IPjyVmMWq8Sr7GuWzkfdpJoAoGCCqGSM49
|
||||
AwEHoUQDQgAEVoir4M8l+8y/xHAVcikYbEs4cJuy8byIzIBqu/TiXxCm08+dIQqK
|
||||
h34oZuFDqkQlnR5YTINuQD1GxQb9rE1f3g==
|
||||
-----END EC PRIVATE KEY-----`))
|
||||
require.NoError(t, err)
|
||||
|
||||
ctrl := gomock.NewController(t)
|
||||
defer ctrl.Finish()
|
||||
|
||||
mbm := 1234
|
||||
|
||||
privKey, err := pkcrypto.GeneratePrivateKey()
|
||||
assert.NoError(t, err)
|
||||
identity := &identity.FullIdentity{Key: privKey}
|
||||
ec := NewClient(identity, mbm)
|
||||
clientOptions, err := tlsopts.NewOptions(ident, tlsopts.Config{})
|
||||
require.NoError(t, err)
|
||||
|
||||
clientTransport := transport.NewClient(clientOptions)
|
||||
|
||||
ec := NewClient(clientTransport, mbm)
|
||||
assert.NotNil(t, ec)
|
||||
|
||||
ecc, ok := ec.(*ecClient)
|
||||
assert.True(t, ok)
|
||||
assert.NotNil(t, ecc.transport)
|
||||
assert.Equal(t, mbm, ecc.memoryLimit)
|
||||
|
||||
assert.NotNil(t, ecc.transport.Identity())
|
||||
assert.Equal(t, ecc.transport.Identity(), identity)
|
||||
}
|
||||
|
||||
func TestPut(t *testing.T) {
|
||||
|
@ -13,6 +13,7 @@ import (
|
||||
|
||||
"storj.io/storj/pkg/identity"
|
||||
"storj.io/storj/pkg/pb"
|
||||
"storj.io/storj/pkg/peertls/tlsopts"
|
||||
"storj.io/storj/pkg/storj"
|
||||
)
|
||||
|
||||
@ -41,14 +42,14 @@ type Client interface {
|
||||
|
||||
// Transport interface structure
|
||||
type Transport struct {
|
||||
identity *identity.FullIdentity
|
||||
tlsOpts *tlsopts.Options
|
||||
observers []Observer
|
||||
}
|
||||
|
||||
// NewClient returns a newly instantiated Transport Client
|
||||
func NewClient(identity *identity.FullIdentity, obs ...Observer) Client {
|
||||
func NewClient(tlsOpts *tlsopts.Options, obs ...Observer) Client {
|
||||
return &Transport{
|
||||
identity: identity,
|
||||
tlsOpts: tlsOpts,
|
||||
observers: obs,
|
||||
}
|
||||
}
|
||||
@ -64,11 +65,7 @@ func (transport *Transport) DialNode(ctx context.Context, node *pb.Node, opts ..
|
||||
}
|
||||
|
||||
// add ID of node we are wanting to connect to
|
||||
dialOpt, err := transport.identity.DialOption(node.Id)
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
|
||||
dialOpt := transport.tlsOpts.DialOption(node.Id)
|
||||
options := append([]grpc.DialOption{dialOpt, grpc.WithBlock(), grpc.FailOnNonTempDialError(true)}, opts...)
|
||||
|
||||
ctx, cf := context.WithTimeout(ctx, timeout)
|
||||
@ -92,12 +89,9 @@ func (transport *Transport) DialNode(ctx context.Context, node *pb.Node, opts ..
|
||||
func (transport *Transport) DialAddress(ctx context.Context, address string, opts ...grpc.DialOption) (conn *grpc.ClientConn, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
dialOpt, err := transport.identity.DialOption(storj.NodeID{})
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
|
||||
dialOpt := transport.tlsOpts.DialOption(storj.NodeID{})
|
||||
options := append([]grpc.DialOption{dialOpt, grpc.WithBlock(), grpc.FailOnNonTempDialError(true)}, opts...)
|
||||
|
||||
conn, err = grpc.DialContext(ctx, address, options...)
|
||||
if err == context.Canceled {
|
||||
return nil, err
|
||||
@ -107,12 +101,12 @@ func (transport *Transport) DialAddress(ctx context.Context, address string, opt
|
||||
|
||||
// Identity is a getter for the transport's identity
|
||||
func (transport *Transport) Identity() *identity.FullIdentity {
|
||||
return transport.identity
|
||||
return transport.tlsOpts.Ident
|
||||
}
|
||||
|
||||
// WithObservers returns a new transport including the listed observers.
|
||||
func (transport *Transport) WithObservers(obs ...Observer) *Transport {
|
||||
tr := &Transport{identity: transport.identity}
|
||||
tr := &Transport{tlsOpts: transport.tlsOpts}
|
||||
tr.observers = append(tr.observers, transport.observers...)
|
||||
tr.observers = append(tr.observers, obs...)
|
||||
return tr
|
||||
|
@ -1,5 +1,6 @@
|
||||
// Copyright (C) 2019 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package transport_test
|
||||
|
||||
import (
|
||||
@ -14,7 +15,6 @@ import (
|
||||
"storj.io/storj/internal/testplanet"
|
||||
"storj.io/storj/pkg/pb"
|
||||
"storj.io/storj/pkg/storj"
|
||||
"storj.io/storj/pkg/transport"
|
||||
)
|
||||
|
||||
func TestDialNode(t *testing.T) {
|
||||
@ -29,7 +29,7 @@ func TestDialNode(t *testing.T) {
|
||||
|
||||
planet.Start(ctx)
|
||||
|
||||
client := transport.NewClient(planet.StorageNodes[0].Identity)
|
||||
client := planet.StorageNodes[0].Transport
|
||||
|
||||
{ // DialNode with invalid targets
|
||||
targets := []*pb.Node{
|
||||
|
@ -32,6 +32,7 @@ import (
|
||||
"storj.io/storj/pkg/kademlia"
|
||||
"storj.io/storj/pkg/overlay"
|
||||
"storj.io/storj/pkg/pb"
|
||||
"storj.io/storj/pkg/peertls/tlsopts"
|
||||
"storj.io/storj/pkg/pointerdb"
|
||||
"storj.io/storj/pkg/server"
|
||||
"storj.io/storj/pkg/statdb"
|
||||
@ -172,10 +173,9 @@ type Peer struct {
|
||||
// New creates a new satellite
|
||||
func New(log *zap.Logger, full *identity.FullIdentity, db DB, config *Config) (*Peer, error) {
|
||||
peer := &Peer{
|
||||
Log: log,
|
||||
Identity: full,
|
||||
DB: db,
|
||||
Transport: transport.NewClient(full),
|
||||
Log: log,
|
||||
Identity: full,
|
||||
DB: db,
|
||||
}
|
||||
|
||||
var err error
|
||||
@ -188,11 +188,13 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, config *Config) (*
|
||||
|
||||
publicConfig := config.Server
|
||||
publicConfig.Address = peer.Public.Listener.Addr().String()
|
||||
publicOptions, err := server.NewOptions(peer.Identity, publicConfig)
|
||||
publicOptions, err := tlsopts.NewOptions(peer.Identity, publicConfig.Config)
|
||||
if err != nil {
|
||||
return nil, errs.Combine(err, peer.Close())
|
||||
}
|
||||
|
||||
peer.Transport = transport.NewClient(publicOptions)
|
||||
|
||||
peer.Public.Server, err = server.New(publicOptions, peer.Public.Listener, grpcauth.NewAPIKeyInterceptor())
|
||||
if err != nil {
|
||||
return nil, errs.Combine(err, peer.Close())
|
||||
@ -327,20 +329,17 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, config *Config) (*
|
||||
if config.Repairer.PointerDBAddr == "" {
|
||||
config.Repairer.PointerDBAddr = peer.Addr()
|
||||
}
|
||||
peer.Repair.Repairer = repairer.NewService(peer.DB.RepairQueue(), &config.Repairer, peer.Identity, config.Repairer.Interval, config.Repairer.MaxRepair)
|
||||
peer.Repair.Repairer = repairer.NewService(peer.DB.RepairQueue(), &config.Repairer, peer.Transport, config.Repairer.Interval, config.Repairer.MaxRepair)
|
||||
}
|
||||
|
||||
{ // setup audit
|
||||
config := config.Audit
|
||||
|
||||
// TODO: use common transport Client and close to avoid leak
|
||||
transportClient := transport.NewClient(peer.Identity)
|
||||
|
||||
peer.Audit.Service, err = audit.NewService(peer.Log.Named("audit"),
|
||||
peer.DB.StatDB(),
|
||||
config.Interval, config.MaxRetriesStatDB,
|
||||
peer.Metainfo.Service, peer.Metainfo.Allocation,
|
||||
transportClient, peer.Overlay.Service,
|
||||
peer.Transport, peer.Overlay.Service,
|
||||
peer.Identity,
|
||||
)
|
||||
if err != nil {
|
||||
|
@ -15,6 +15,7 @@ import (
|
||||
"storj.io/storj/pkg/identity"
|
||||
"storj.io/storj/pkg/kademlia"
|
||||
"storj.io/storj/pkg/pb"
|
||||
"storj.io/storj/pkg/peertls/tlsopts"
|
||||
pstore "storj.io/storj/pkg/piecestore"
|
||||
"storj.io/storj/pkg/piecestore/psserver"
|
||||
"storj.io/storj/pkg/piecestore/psserver/agreementsender"
|
||||
@ -90,10 +91,9 @@ type Peer struct {
|
||||
// New creates a new Storage Node.
|
||||
func New(log *zap.Logger, full *identity.FullIdentity, db DB, config Config) (*Peer, error) {
|
||||
peer := &Peer{
|
||||
Log: log,
|
||||
Identity: full,
|
||||
DB: db,
|
||||
Transport: transport.NewClient(full),
|
||||
Log: log,
|
||||
Identity: full,
|
||||
DB: db,
|
||||
}
|
||||
|
||||
var err error
|
||||
@ -106,11 +106,13 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, config Config) (*P
|
||||
|
||||
publicConfig := config.Server
|
||||
publicConfig.Address = peer.Public.Listener.Addr().String()
|
||||
publicOptions, err := server.NewOptions(peer.Identity, publicConfig)
|
||||
publicOptions, err := tlsopts.NewOptions(peer.Identity, publicConfig.Config)
|
||||
if err != nil {
|
||||
return nil, errs.Combine(err, peer.Close())
|
||||
}
|
||||
|
||||
peer.Transport = transport.NewClient(publicOptions)
|
||||
|
||||
peer.Public.Server, err = server.New(publicOptions, peer.Public.Listener, nil)
|
||||
if err != nil {
|
||||
return nil, errs.Combine(err, peer.Close())
|
||||
@ -178,7 +180,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, config Config) (*P
|
||||
config := config.Storage // TODO: separate config
|
||||
peer.Agreements.Sender = agreementsender.New(
|
||||
peer.Log.Named("agreements"),
|
||||
peer.DB.PSDB(), peer.Identity, peer.Kademlia.Service,
|
||||
peer.DB.PSDB(), peer.Transport, peer.Kademlia.Service,
|
||||
config.AgreementSenderCheckInterval,
|
||||
)
|
||||
}
|
||||
|
@ -16,12 +16,14 @@ import (
|
||||
"storj.io/storj/pkg/identity"
|
||||
"storj.io/storj/pkg/metainfo/kvmetainfo"
|
||||
"storj.io/storj/pkg/overlay"
|
||||
"storj.io/storj/pkg/peertls/tlsopts"
|
||||
"storj.io/storj/pkg/pointerdb/pdbclient"
|
||||
"storj.io/storj/pkg/storage/buckets"
|
||||
ecclient "storj.io/storj/pkg/storage/ec"
|
||||
"storj.io/storj/pkg/storage/segments"
|
||||
"storj.io/storj/pkg/storage/streams"
|
||||
"storj.io/storj/pkg/storj"
|
||||
"storj.io/storj/pkg/transport"
|
||||
)
|
||||
|
||||
// RSConfig is a configuration struct that keeps details about default
|
||||
@ -61,6 +63,7 @@ type Config struct {
|
||||
Client ClientConfig
|
||||
RS RSConfig
|
||||
Enc EncryptionConfig
|
||||
TLS tlsopts.Config
|
||||
}
|
||||
|
||||
var (
|
||||
@ -74,6 +77,12 @@ var (
|
||||
func (c Config) GetMetainfo(ctx context.Context, identity *identity.FullIdentity) (db storj.Metainfo, ss streams.Store, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
tlsOpts, err := tlsopts.NewOptions(identity, c.TLS)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
tc := transport.NewClient(tlsOpts)
|
||||
|
||||
if c.Client.OverlayAddr == "" || c.Client.PointerDBAddr == "" {
|
||||
var errlist errs.Group
|
||||
if c.Client.OverlayAddr == "" {
|
||||
@ -85,17 +94,17 @@ func (c Config) GetMetainfo(ctx context.Context, identity *identity.FullIdentity
|
||||
return nil, nil, errlist.Err()
|
||||
}
|
||||
|
||||
oc, err := overlay.NewClient(identity, c.Client.OverlayAddr)
|
||||
oc, err := overlay.NewClient(tc, c.Client.OverlayAddr)
|
||||
if err != nil {
|
||||
return nil, nil, Error.New("failed to connect to overlay: %v", err)
|
||||
}
|
||||
|
||||
pdb, err := pdbclient.NewClient(identity, c.Client.PointerDBAddr, c.Client.APIKey)
|
||||
pdb, err := pdbclient.NewClient(tc, c.Client.PointerDBAddr, c.Client.APIKey)
|
||||
if err != nil {
|
||||
return nil, nil, Error.New("failed to connect to pointer DB: %v", err)
|
||||
}
|
||||
|
||||
ec := ecclient.NewClient(identity, c.RS.MaxBufferMem.Int())
|
||||
ec := ecclient.NewClient(tc, c.RS.MaxBufferMem.Int())
|
||||
fc, err := infectious.NewFEC(c.RS.MinThreshold, c.RS.MaxThreshold)
|
||||
if err != nil {
|
||||
return nil, nil, Error.New("failed to create erasure coding client: %v", err)
|
||||
|
Loading…
Reference in New Issue
Block a user