ad327bedb1
* Satellite signs proofs * wip * remove direct apikey usage from pdbclient * adjusting unit tests * fix linter errors * unit tests * linter errors * remove usless interface * remove unused code * improve unit tests * signature generation * code review changes * code review comments * back to satellite-id signature generation * remove go-grpc-middlewar dependency * small step back * linter fixes * fix tests * packages reorganization * Move TestAPIKeyInjector to grpcauth package
163 lines
4.4 KiB
Go
163 lines
4.4 KiB
Go
// Copyright (C) 2018 Storj Labs, Inc.
|
|
// See LICENSE for copying information.
|
|
|
|
package provider
|
|
|
|
import (
|
|
"context"
|
|
"io"
|
|
"net"
|
|
"time"
|
|
|
|
"github.com/zeebo/errs"
|
|
"go.uber.org/zap"
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/codes"
|
|
"google.golang.org/grpc/status"
|
|
|
|
"storj.io/storj/storage"
|
|
)
|
|
|
|
var (
|
|
// ErrSetup is returned when there's an error with setup
|
|
ErrSetup = errs.Class("setup error")
|
|
)
|
|
|
|
// Responsibility represents a specific gRPC method collection to be registered
|
|
// on a shared gRPC server. PointerDB, OverlayCache, PieceStore, Kademlia,
|
|
// StatDB, etc. are all examples of Responsibilities.
|
|
type Responsibility interface {
|
|
Run(ctx context.Context, server *Provider) error
|
|
}
|
|
|
|
// Provider represents a bundle of responsibilities defined by a specific ID.
|
|
// Examples of providers are the heavy client, the storagenode, and the gateway.
|
|
type Provider struct {
|
|
lis net.Listener
|
|
g *grpc.Server
|
|
next []Responsibility
|
|
identity *FullIdentity
|
|
}
|
|
|
|
// NewProvider creates a Provider out of an Identity, a net.Listener, a UnaryInterceptorProvider and
|
|
// a set of responsibilities.
|
|
func NewProvider(identity *FullIdentity, lis net.Listener, interceptor grpc.UnaryServerInterceptor,
|
|
responsibilities ...Responsibility) (*Provider, error) {
|
|
// NB: talk to anyone with an identity
|
|
ident, err := identity.ServerOption()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
unaryInterceptor := unaryInterceptor
|
|
if interceptor != nil {
|
|
unaryInterceptor = combineInterceptors(unaryInterceptor, interceptor)
|
|
}
|
|
|
|
return &Provider{
|
|
lis: lis,
|
|
g: grpc.NewServer(
|
|
grpc.StreamInterceptor(streamInterceptor),
|
|
grpc.UnaryInterceptor(unaryInterceptor),
|
|
ident,
|
|
),
|
|
next: responsibilities,
|
|
identity: identity,
|
|
}, nil
|
|
}
|
|
|
|
// SetupIdentity ensures a CA and identity exist and returns a config overrides map
|
|
func SetupIdentity(ctx context.Context, c CASetupConfig, i IdentitySetupConfig) error {
|
|
if s := c.Status(); s != NoCertNoKey && !c.Overwrite {
|
|
return ErrSetup.New("certificate authority file(s) exist: %s", s)
|
|
}
|
|
|
|
t, err := time.ParseDuration(c.Timeout)
|
|
if err != nil {
|
|
return errs.Wrap(err)
|
|
}
|
|
ctx, cancel := context.WithTimeout(ctx, t)
|
|
defer cancel()
|
|
|
|
// Load or create a certificate authority
|
|
ca, err := c.Create(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if s := c.Status(); s != NoCertNoKey && !c.Overwrite {
|
|
return ErrSetup.New("identity file(s) exist: %s", s)
|
|
}
|
|
|
|
// Create identity from new CA
|
|
_, err = i.Create(ca)
|
|
return err
|
|
}
|
|
|
|
// Identity returns the provider's identity
|
|
func (p *Provider) Identity() *FullIdentity { return p.identity }
|
|
|
|
// GRPC returns the provider's gRPC server for registration purposes
|
|
func (p *Provider) GRPC() *grpc.Server { return p.g }
|
|
|
|
// Close shuts down the provider
|
|
func (p *Provider) Close() error {
|
|
p.g.GracefulStop()
|
|
return nil
|
|
}
|
|
|
|
// Run will run the provider and all of its responsibilities
|
|
func (p *Provider) Run(ctx context.Context) (err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
// are there any unstarted responsibilities? start those first. the
|
|
// responsibilities know to call Run again once they're ready.
|
|
if len(p.next) > 0 {
|
|
next := p.next[0]
|
|
p.next = p.next[1:]
|
|
return next.Run(ctx, p)
|
|
}
|
|
|
|
return p.g.Serve(p.lis)
|
|
}
|
|
|
|
func streamInterceptor(srv interface{}, ss grpc.ServerStream,
|
|
info *grpc.StreamServerInfo, handler grpc.StreamHandler) (err error) {
|
|
err = handler(srv, ss)
|
|
if err != nil {
|
|
// no zap errors for canceled or wrong file downloads
|
|
if storage.ErrKeyNotFound.Has(err) ||
|
|
status.Code(err) == codes.Canceled ||
|
|
status.Code(err) == codes.Unavailable ||
|
|
err == io.EOF {
|
|
return err
|
|
}
|
|
zap.S().Errorf("%+v", err)
|
|
}
|
|
return err
|
|
}
|
|
|
|
func unaryInterceptor(ctx context.Context, req interface{},
|
|
info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{},
|
|
err error) {
|
|
resp, err = handler(ctx, req)
|
|
if err != nil {
|
|
// no zap errors for wrong file downloads
|
|
if status.Code(err) == codes.NotFound {
|
|
return resp, err
|
|
}
|
|
zap.S().Errorf("%+v", err)
|
|
}
|
|
return resp, err
|
|
}
|
|
|
|
func combineInterceptors(a, b grpc.UnaryServerInterceptor) grpc.UnaryServerInterceptor {
|
|
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
|
|
return a(ctx, req, info, func(actx context.Context, areq interface{}) (interface{}, error) {
|
|
return b(actx, areq, info, func(bctx context.Context, breq interface{}) (interface{}, error) {
|
|
return handler(bctx, breq)
|
|
})
|
|
})
|
|
}
|
|
}
|