storj/pkg/provider/provider.go
Bryan White 5d20cf8829
Node Identity (#193)
* peertls: don't log errors for double close

understood that this part of the code is undergoing heavy change
right now, but just want to make sure this fix gets incorporated
somewhere

* git cleanup: node-id stuff

* cleanup

* rename identity_util.go

* wip `CertificateAuthority` refactor

* refactoring

* gitignore update

* wip

* Merge remote-tracking branch 'storj/doubleclose' into node-id3

* storj/doubleclose:
  peertls: don't log errors for double close

* add peertls tests & gomports

* wip:

+ refactor
+ style changes
+ cleanup
+ [wip] add version to CA and identity configs
+ [wip] heavy client setup

* refactor

* wip:

+ refactor
+ style changes
+ add `CAConfig.Load`
+ add `CAConfig.Save`

* wip:

+ add `LoadOrCreate` and `Create` to CA and Identity configs
+ add overwrite to CA and identity configs
+ heavy client setup
+ refactor
+ style changes
+ cleanup

* wip

* fixing things

* fixing things

* wip hc setup

* hc setup:

+ refactor
+ bugfixing

* improvements based on reveiw feedback

* goimports

* improvements:

+ responding to review feedback
+ refactor

* feedback-based improvements

* feedback-based improvements

* feedback-based improvements

* feedback-based improvements

* feedback-based improvements

* feedback-based improvements

* cleanup

* refactoring CA and Identity structs

* Merge branch 'master' into node-id3

* move version field to setup config structs for CA and identity

* fix typo

* responding to revieiw feedback

* responding to revieiw feedback

* responding to revieiw feedback

* responding to revieiw feedback

* responding to revieiw feedback

* responding to revieiw feedback

* Merge branch 'master' into node-id3

* fix gateway setup finally

* go imports

* fix `FullCertificateAuthority.GenerateIdentity`

* cleanup overlay tests

* bugfixing

* update ca/identity setup

* go imports

* fix peertls test copy/paste fail

* responding to review feedback

* setup tweaking

* update farmer setup
2018-08-13 10:39:45 +02:00

135 lines
3.3 KiB
Go

// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.
package provider
import (
"context"
"net"
"time"
"github.com/zeebo/errs"
"go.uber.org/zap"
"google.golang.org/grpc"
)
var (
ErrSetup = errs.Class("setup error")
)
// Responsibility represents a specific gRPC method collection to be registered
// on a shared gRPC server. PointerDB, OverlayCache, PieceStore, Kademlia,
// StatDB, etc. are all examples of Responsibilities.
type Responsibility interface {
Run(ctx context.Context, server *Provider) error
}
// Provider represents a bundle of responsibilities defined by a specific ID.
// Examples of providers are the heavy client, the farmer, and the gateway.
type Provider struct {
lis net.Listener
g *grpc.Server
next []Responsibility
identity *FullIdentity
}
// NewProvider creates a Provider out of an Identity, a net.Listener, and a set
// of responsibilities.
func NewProvider(identity *FullIdentity, lis net.Listener,
responsibilities ...Responsibility) (*Provider, error) {
// NB: talk to anyone with an identity
s, err := identity.ServerOption()
if err != nil {
return nil, err
}
return &Provider{
lis: lis,
g: grpc.NewServer(
grpc.StreamInterceptor(streamInterceptor),
grpc.UnaryInterceptor(unaryInterceptor),
s,
),
next: responsibilities,
identity: identity,
}, nil
}
// SetupIdentity ensures a CA and identity exist and returns a config overrides map
func SetupIdentity(ctx context.Context, c CASetupConfig, i IdentitySetupConfig) error {
if s := c.Stat(); s == NoCertNoKey || c.Overwrite {
t, err := time.ParseDuration(c.Timeout)
if err != nil {
return errs.Wrap(err)
}
ctx, _ = context.WithTimeout(ctx, t)
// Load or create a certificate authority
ca, err := c.Create(ctx, 4)
if err != nil {
return err
}
if s := i.Stat(); s == NoCertNoKey || i.Overwrite {
// Create identity from new CA
_, err = i.Create(ca)
if err != nil {
return err
}
return nil
} else {
return ErrSetup.New("identity file(s) exist: %s", s)
}
} else {
return ErrSetup.New("certificate authority file(s) exist: %s", s)
}
}
// Identity returns the provider's identity
func (p *Provider) Identity() *FullIdentity { return p.identity }
// GRPC returns the provider's gRPC server for registration purposes
func (p *Provider) GRPC() *grpc.Server { return p.g }
// Close shuts down the provider
func (p *Provider) Close() error {
p.g.GracefulStop()
return nil
}
// Run will run the provider and all of its responsibilities
func (p *Provider) Run(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
// are there any unstarted responsibilities? start those first. the
// responsibilities know to call Run again once they're ready.
if len(p.next) > 0 {
next := p.next[0]
p.next = p.next[1:]
return next.Run(ctx, p)
}
return p.g.Serve(p.lis)
}
func streamInterceptor(srv interface{}, ss grpc.ServerStream,
info *grpc.StreamServerInfo, handler grpc.StreamHandler) (err error) {
err = handler(srv, ss)
if err != nil {
zap.S().Errorf("%+v", err)
}
return err
}
func unaryInterceptor(ctx context.Context, req interface{},
info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{},
err error) {
resp, err = handler(ctx, req)
if err != nil {
zap.S().Errorf("%+v", err)
}
return resp, err
}