storj/storagenode/vouchers/service.go
Cameron 5f096a3eab
Remove GetValid, add GetAll to vouchers DB (#2594)
* refactor GetValid to GetAll

* update design doc
2019-07-19 10:52:44 -04:00

174 lines
4.6 KiB
Go

// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package vouchers
import (
"context"
"time"
"github.com/zeebo/errs"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
monkit "gopkg.in/spacemonkeygo/monkit.v2"
"storj.io/storj/internal/sync2"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/storj"
"storj.io/storj/pkg/transport"
"storj.io/storj/storagenode/trust"
)
var (
// VoucherError represents errors with vouchers
VoucherError = errs.Class("voucher")
mon = monkit.Package()
)
// DB implements storing and retrieving vouchers
type DB interface {
// Put inserts or updates a voucher from a satellite
Put(context.Context, *pb.Voucher) error
// GetAll returns all vouchers in the table
GetAll(context.Context) ([]*pb.Voucher, error)
// NeedVoucher returns true if a voucher from a particular satellite is expired, about to expire, or does not exist
NeedVoucher(context.Context, storj.NodeID, time.Duration) (bool, error)
}
// Config defines configuration for requesting vouchers.
type Config struct {
Interval time.Duration `help:"interval between voucher service iterations" default:"168h0m0s"`
}
// Service is a service for requesting vouchers
type Service struct {
log *zap.Logger
transport transport.Client
vouchersdb DB
trust *trust.Pool
expirationBuffer time.Duration
Loop sync2.Cycle
}
// NewService creates a new voucher service
func NewService(log *zap.Logger, transport transport.Client, vouchersdb DB, trust *trust.Pool, interval, expirationBuffer time.Duration) *Service {
return &Service{
log: log,
transport: transport,
vouchersdb: vouchersdb,
trust: trust,
expirationBuffer: expirationBuffer,
Loop: *sync2.NewCycle(interval),
}
}
// Run sends requests to satellites for vouchers
func (service *Service) Run(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
return service.Loop.Run(ctx, service.RunOnce)
}
// RunOnce runs one iteration of the voucher request service
func (service *Service) RunOnce(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
service.log.Info("Checking vouchers")
trustedSatellites := service.trust.GetSatellites((ctx))
if len(trustedSatellites) == 0 {
service.log.Debug("No trusted satellites configured. No vouchers to request")
return nil
}
var group errgroup.Group
ctx, cancel := context.WithTimeout(ctx, time.Hour)
defer cancel()
for _, satellite := range trustedSatellites {
satellite := satellite
needVoucher, err := service.vouchersdb.NeedVoucher(ctx, satellite, service.expirationBuffer)
if err != nil {
service.log.Error("getting voucher status", zap.Error(err))
return nil
}
if needVoucher {
group.Go(func() error {
service.Request(ctx, satellite)
return nil
})
}
}
_ = group.Wait() // doesn't return errors
return nil
}
// Request makes a voucher request to a satellite
func (service *Service) Request(ctx context.Context, satelliteID storj.NodeID) {
service.log.Info("Requesting voucher", zap.String("satellite", satelliteID.String()))
err := service.request(ctx, satelliteID)
if err != nil {
service.log.Error("Error requesting voucher", zap.String("satellite", satelliteID.String()), zap.Error(err))
}
}
func (service *Service) request(ctx context.Context, satelliteID storj.NodeID) (err error) {
defer mon.Task()(&ctx)(&err)
address, err := service.trust.GetAddress(ctx, satelliteID)
if err != nil {
return err
}
conn, err := service.transport.DialNode(ctx, &pb.Node{
Id: satelliteID,
Address: &pb.NodeAddress{
Transport: pb.NodeTransport_TCP_TLS_GRPC,
Address: address,
},
})
if err != nil {
return VoucherError.New("unable to connect to the satellite: %v", err)
}
defer func() {
if cerr := conn.Close(); cerr != nil {
err = errs.Combine(err, VoucherError.New("failed to close connection: %v", err))
}
}()
resp, err := pb.NewVouchersClient(conn).Request(ctx, &pb.VoucherRequest{})
if err != nil {
return VoucherError.New("failed to start request: %v", err)
}
switch resp.GetStatus() {
case pb.VoucherResponse_REJECTED:
service.log.Info("Voucher request denied. Vetting process not yet complete")
case pb.VoucherResponse_ACCEPTED:
voucher := resp.GetVoucher()
if err := service.VerifyVoucher(ctx, satelliteID, voucher); err != nil {
return err
}
err = service.vouchersdb.Put(ctx, voucher)
if err != nil {
return err
}
service.log.Info("Voucher received", zap.String("satellite", voucher.SatelliteId.String()))
default:
service.log.Warn("Unknown voucher response status")
}
return err
}
// Close stops the voucher service
func (service *Service) Close() error {
service.Loop.Close()
return nil
}