173 lines
4.8 KiB
Go
173 lines
4.8 KiB
Go
// Copyright (C) 2019 Storj Labs, Inc.
|
|
// See LICENSE for copying information.
|
|
|
|
package vouchers
|
|
|
|
import (
|
|
"context"
|
|
"time"
|
|
|
|
"github.com/zeebo/errs"
|
|
"go.uber.org/zap"
|
|
"golang.org/x/sync/errgroup"
|
|
monkit "gopkg.in/spacemonkeygo/monkit.v2"
|
|
|
|
"storj.io/storj/internal/sync2"
|
|
"storj.io/storj/pkg/kademlia"
|
|
"storj.io/storj/pkg/pb"
|
|
"storj.io/storj/pkg/storj"
|
|
"storj.io/storj/pkg/transport"
|
|
"storj.io/storj/storagenode/trust"
|
|
)
|
|
|
|
var (
|
|
// VoucherError represents errors with vouchers
|
|
VoucherError = errs.Class("voucher")
|
|
|
|
mon = monkit.Package()
|
|
)
|
|
|
|
// DB implements storing and retrieving vouchers
|
|
type DB interface {
|
|
// Put inserts or updates a voucher from a satellite
|
|
Put(context.Context, *pb.Voucher) error
|
|
// GetValid returns one valid voucher from the list of approved satellites
|
|
GetValid(context.Context, []storj.NodeID) (*pb.Voucher, error)
|
|
// NeedVoucher returns true if a voucher from a particular satellite is expired, about to expire, or does not exist
|
|
NeedVoucher(context.Context, storj.NodeID, time.Duration) (bool, error)
|
|
}
|
|
|
|
// Config defines configuration for requesting vouchers.
|
|
type Config struct {
|
|
Interval time.Duration `help:"interval between voucher service iterations" default:"168h0m0s"`
|
|
}
|
|
|
|
// Service is a service for requesting vouchers
|
|
type Service struct {
|
|
log *zap.Logger
|
|
|
|
kademlia *kademlia.Kademlia
|
|
transport transport.Client
|
|
|
|
vouchersdb DB
|
|
trust *trust.Pool
|
|
|
|
expirationBuffer time.Duration
|
|
|
|
Loop sync2.Cycle
|
|
}
|
|
|
|
// NewService creates a new voucher service
|
|
func NewService(log *zap.Logger, kad *kademlia.Kademlia, transport transport.Client, vouchersdb DB, trust *trust.Pool, interval, expirationBuffer time.Duration) *Service {
|
|
return &Service{
|
|
log: log,
|
|
kademlia: kad,
|
|
transport: transport,
|
|
vouchersdb: vouchersdb,
|
|
trust: trust,
|
|
expirationBuffer: expirationBuffer,
|
|
Loop: *sync2.NewCycle(interval),
|
|
}
|
|
}
|
|
|
|
// Run sends requests to satellites for vouchers
|
|
func (service *Service) Run(ctx context.Context) (err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
return service.Loop.Run(ctx, service.RunOnce)
|
|
}
|
|
|
|
// RunOnce runs one iteration of the voucher request service
|
|
func (service *Service) RunOnce(ctx context.Context) (err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
service.log.Info("Checking vouchers")
|
|
|
|
trustedSatellites := service.trust.GetSatellites((ctx))
|
|
|
|
if len(trustedSatellites) == 0 {
|
|
service.log.Debug("No trusted satellites configured. No vouchers to request")
|
|
return nil
|
|
}
|
|
|
|
var group errgroup.Group
|
|
ctx, cancel := context.WithTimeout(ctx, time.Hour)
|
|
defer cancel()
|
|
for _, satellite := range trustedSatellites {
|
|
satellite := satellite
|
|
needVoucher, err := service.vouchersdb.NeedVoucher(ctx, satellite, service.expirationBuffer)
|
|
if err != nil {
|
|
service.log.Error("getting voucher status", zap.Error(err))
|
|
return nil
|
|
}
|
|
if needVoucher {
|
|
group.Go(func() error {
|
|
service.Request(ctx, satellite)
|
|
return nil
|
|
})
|
|
}
|
|
}
|
|
_ = group.Wait() // doesn't return errors
|
|
|
|
return nil
|
|
}
|
|
|
|
// Request makes a voucher request to a satellite
|
|
func (service *Service) Request(ctx context.Context, satelliteID storj.NodeID) {
|
|
service.log.Info("Requesting voucher", zap.String("satellite", satelliteID.String()))
|
|
err := service.request(ctx, satelliteID)
|
|
if err != nil {
|
|
service.log.Error("Error requesting voucher", zap.String("satellite", satelliteID.String()), zap.Error(err))
|
|
}
|
|
}
|
|
|
|
func (service *Service) request(ctx context.Context, satelliteID storj.NodeID) (err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
satellite, err := service.kademlia.FindNode(ctx, satelliteID)
|
|
if err != nil {
|
|
return VoucherError.New("unable to find satellite on the network: %v", err)
|
|
}
|
|
|
|
conn, err := service.transport.DialNode(ctx, &satellite)
|
|
if err != nil {
|
|
return VoucherError.New("unable to connect to the satellite: %v", err)
|
|
}
|
|
defer func() {
|
|
if cerr := conn.Close(); cerr != nil {
|
|
err = errs.Combine(err, VoucherError.New("failed to close connection: %v", err))
|
|
}
|
|
}()
|
|
|
|
resp, err := pb.NewVouchersClient(conn).Request(ctx, &pb.VoucherRequest{})
|
|
if err != nil {
|
|
return VoucherError.New("failed to start request: %v", err)
|
|
}
|
|
|
|
// TODO: separate status for disqualified nodes?
|
|
switch resp.GetStatus() {
|
|
case pb.VoucherResponse_REJECTED:
|
|
service.log.Info("Voucher request denied. Vetting process not yet complete")
|
|
case pb.VoucherResponse_ACCEPTED:
|
|
voucher := resp.GetVoucher()
|
|
|
|
if err := service.VerifyVoucher(ctx, satelliteID, voucher); err != nil {
|
|
return err
|
|
}
|
|
|
|
err = service.vouchersdb.Put(ctx, voucher)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
service.log.Info("Voucher received", zap.String("satellite", voucher.SatelliteId.String()))
|
|
default:
|
|
service.log.Warn("Unknown voucher response status")
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
// Close stops the voucher service
|
|
func (service *Service) Close() error {
|
|
service.Loop.Close()
|
|
return nil
|
|
}
|