storj/pkg/provider/provider.go
2018-08-02 15:52:16 -06:00

102 lines
2.6 KiB
Go

// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.
package provider
import (
"context"
"crypto/tls"
"net"
"go.uber.org/zap"
"google.golang.org/grpc"
"storj.io/storj/pkg/peertls"
)
// Responsibility represents a specific gRPC method collection to be registered
// on a shared gRPC server. PointerDB, OverlayCache, PieceStore, Kademlia,
// StatDB, etc. are all examples of Responsibilities.
type Responsibility interface {
Run(ctx context.Context, server *Provider) error
}
// Provider represents a bundle of responsibilities defined by a specific ID.
// Examples of providers are the heavy client, the farmer, and the gateway.
type Provider struct {
lis net.Listener
g *grpc.Server
next []Responsibility
identity *FullIdentity
}
// NewProvider creates a Provider out of an Identity, a net.Listener, and a set
// of responsibilities.
func NewProvider(identity *FullIdentity, lis net.Listener,
responsibilities ...Responsibility) (*Provider, error) {
return &Provider{
lis: lis,
g: grpc.NewServer(
grpc.StreamInterceptor(streamInterceptor),
grpc.UnaryInterceptor(unaryInterceptor),
),
next: responsibilities,
identity: identity,
}, nil
}
// Identity returns the provider's identity
func (p *Provider) Identity() *FullIdentity { return p.identity }
// GRPC returns the provider's gRPC server for registration purposes
func (p *Provider) GRPC() *grpc.Server { return p.g }
// Close shuts down the provider
func (p *Provider) Close() error {
p.g.GracefulStop()
return nil
}
// Run will run the provider and all of its responsibilities
func (p *Provider) Run(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
// are there any unstarted responsibilities? start those first. the
// responsibilities know to call Run again once they're ready.
if len(p.next) > 0 {
next := p.next[0]
p.next = p.next[1:]
return next.Run(ctx, p)
}
return p.g.Serve(p.lis)
}
// TLSConfig returns the provider's identity as a TLS Config
func (p *Provider) TLSConfig() *tls.Config {
// TODO(jt): get rid of tls.Certificate
return (&peertls.TLSFileOptions{
LeafCertificate: p.identity.todoCert,
}).NewTLSConfig(nil)
}
func streamInterceptor(srv interface{}, ss grpc.ServerStream,
info *grpc.StreamServerInfo, handler grpc.StreamHandler) (err error) {
err = handler(srv, ss)
if err != nil {
zap.S().Errorf("%+v", err)
}
return err
}
func unaryInterceptor(ctx context.Context, req interface{},
info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{},
err error) {
resp, err = handler(ctx, req)
if err != nil {
zap.S().Errorf("%+v", err)
}
return resp, err
}