storj/pkg/provider/provider.go

148 lines
3.7 KiB
Go

// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.
package provider
import (
"context"
"io"
"net"
"time"
"github.com/zeebo/errs"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"storj.io/storj/storage"
)
var (
// ErrSetup is returned when there's an error with setup
ErrSetup = errs.Class("setup error")
)
// Responsibility represents a specific gRPC method collection to be registered
// on a shared gRPC server. PointerDB, OverlayCache, PieceStore, Kademlia,
// StatDB, etc. are all examples of Responsibilities.
type Responsibility interface {
Run(ctx context.Context, server *Provider) error
}
// Provider represents a bundle of responsibilities defined by a specific ID.
// Examples of providers are the heavy client, the storagenode, and the gateway.
type Provider struct {
lis net.Listener
g *grpc.Server
next []Responsibility
identity *FullIdentity
}
// NewProvider creates a Provider out of an Identity, a net.Listener, and a set
// of responsibilities.
func NewProvider(identity *FullIdentity, lis net.Listener,
responsibilities ...Responsibility) (*Provider, error) {
// NB: talk to anyone with an identity
ident, err := identity.ServerOption()
if err != nil {
return nil, err
}
return &Provider{
lis: lis,
g: grpc.NewServer(
grpc.StreamInterceptor(streamInterceptor),
grpc.UnaryInterceptor(unaryInterceptor),
ident,
),
next: responsibilities,
identity: identity,
}, nil
}
// SetupIdentity ensures a CA and identity exist and returns a config overrides map
func SetupIdentity(ctx context.Context, c CASetupConfig, i IdentitySetupConfig) error {
if s := c.Status(); s != NoCertNoKey && !c.Overwrite {
return ErrSetup.New("certificate authority file(s) exist: %s", s)
}
t, err := time.ParseDuration(c.Timeout)
if err != nil {
return errs.Wrap(err)
}
ctx, cancel := context.WithTimeout(ctx, t)
defer cancel()
// Load or create a certificate authority
ca, err := c.Create(ctx)
if err != nil {
return err
}
if s := c.Status(); s != NoCertNoKey && !c.Overwrite {
return ErrSetup.New("identity file(s) exist: %s", s)
}
// Create identity from new CA
_, err = i.Create(ca)
return err
}
// Identity returns the provider's identity
func (p *Provider) Identity() *FullIdentity { return p.identity }
// GRPC returns the provider's gRPC server for registration purposes
func (p *Provider) GRPC() *grpc.Server { return p.g }
// Close shuts down the provider
func (p *Provider) Close() error {
p.g.GracefulStop()
return nil
}
// Run will run the provider and all of its responsibilities
func (p *Provider) Run(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
// are there any unstarted responsibilities? start those first. the
// responsibilities know to call Run again once they're ready.
if len(p.next) > 0 {
next := p.next[0]
p.next = p.next[1:]
return next.Run(ctx, p)
}
return p.g.Serve(p.lis)
}
func streamInterceptor(srv interface{}, ss grpc.ServerStream,
info *grpc.StreamServerInfo, handler grpc.StreamHandler) (err error) {
err = handler(srv, ss)
if err != nil {
// no zap errors for canceled or wrong file downloads
if storage.ErrKeyNotFound.Has(err) ||
status.Code(err) == codes.Canceled ||
status.Code(err) == codes.Unavailable ||
err == io.EOF {
return err
}
zap.S().Errorf("%+v", err)
}
return err
}
func unaryInterceptor(ctx context.Context, req interface{},
info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{},
err error) {
resp, err = handler(ctx, req)
if err != nil {
// no zap errors for wrong file downloads
if status.Code(err) == codes.NotFound {
return resp, err
}
zap.S().Errorf("%+v", err)
}
return resp, err
}