2019-03-18 10:55:06 +00:00
|
|
|
// Copyright (C) 2019 Storj Labs, Inc.
|
|
|
|
// See LICENSE for copying information.
|
|
|
|
|
|
|
|
package orders
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
2019-03-21 13:24:26 +00:00
|
|
|
"io"
|
2019-03-18 10:55:06 +00:00
|
|
|
"time"
|
|
|
|
|
2019-06-04 13:31:39 +01:00
|
|
|
"github.com/zeebo/errs"
|
2019-03-18 10:55:06 +00:00
|
|
|
"go.uber.org/zap"
|
2019-03-21 13:24:26 +00:00
|
|
|
"golang.org/x/sync/errgroup"
|
2019-06-04 13:31:39 +01:00
|
|
|
monkit "gopkg.in/spacemonkeygo/monkit.v2"
|
2019-03-18 10:55:06 +00:00
|
|
|
|
2019-03-21 13:24:26 +00:00
|
|
|
"storj.io/storj/internal/sync2"
|
2019-03-18 10:55:06 +00:00
|
|
|
"storj.io/storj/pkg/identity"
|
|
|
|
"storj.io/storj/pkg/kademlia"
|
|
|
|
"storj.io/storj/pkg/pb"
|
2019-03-21 13:24:26 +00:00
|
|
|
"storj.io/storj/pkg/storj"
|
2019-03-18 10:55:06 +00:00
|
|
|
"storj.io/storj/pkg/transport"
|
|
|
|
)
|
|
|
|
|
2019-06-04 13:31:39 +01:00
|
|
|
var (
|
|
|
|
// OrderError represents errors with orders
|
|
|
|
OrderError = errs.Class("order")
|
|
|
|
|
|
|
|
mon = monkit.Package()
|
|
|
|
)
|
|
|
|
|
2019-03-18 10:55:06 +00:00
|
|
|
// Info contains full information about an order.
|
|
|
|
type Info struct {
|
2019-07-01 16:54:11 +01:00
|
|
|
Limit *pb.OrderLimit
|
|
|
|
Order *pb.Order
|
2019-03-18 10:55:06 +00:00
|
|
|
Uplink *identity.PeerIdentity
|
|
|
|
}
|
|
|
|
|
2019-03-21 13:24:26 +00:00
|
|
|
// ArchivedInfo contains full information about an archived order.
|
|
|
|
type ArchivedInfo struct {
|
2019-07-01 16:54:11 +01:00
|
|
|
Limit *pb.OrderLimit
|
|
|
|
Order *pb.Order
|
2019-03-21 13:24:26 +00:00
|
|
|
Uplink *identity.PeerIdentity
|
|
|
|
|
|
|
|
Status Status
|
|
|
|
ArchivedAt time.Time
|
|
|
|
}
|
|
|
|
|
|
|
|
// Status is the archival status of the order.
|
|
|
|
type Status byte
|
|
|
|
|
|
|
|
// Statuses for satellite responses.
|
|
|
|
const (
|
|
|
|
StatusUnsent Status = iota
|
|
|
|
StatusAccepted
|
|
|
|
StatusRejected
|
|
|
|
)
|
|
|
|
|
2019-03-18 10:55:06 +00:00
|
|
|
// DB implements storing orders for sending to the satellite.
|
|
|
|
type DB interface {
|
|
|
|
// Enqueue inserts order to the list of orders needing to be sent to the satellite.
|
|
|
|
Enqueue(ctx context.Context, info *Info) error
|
|
|
|
// ListUnsent returns orders that haven't been sent yet.
|
|
|
|
ListUnsent(ctx context.Context, limit int) ([]*Info, error)
|
2019-03-21 13:24:26 +00:00
|
|
|
// ListUnsentBySatellite returns orders that haven't been sent yet grouped by satellite.
|
|
|
|
ListUnsentBySatellite(ctx context.Context) (map[storj.NodeID][]*Info, error)
|
|
|
|
|
|
|
|
// Archive marks order as being handled.
|
|
|
|
Archive(ctx context.Context, satellite storj.NodeID, serial storj.SerialNumber, status Status) error
|
|
|
|
|
|
|
|
// ListArchived returns orders that have been sent.
|
|
|
|
ListArchived(ctx context.Context, limit int) ([]*ArchivedInfo, error)
|
2019-03-18 10:55:06 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// SenderConfig defines configuration for sending orders.
|
|
|
|
type SenderConfig struct {
|
2019-03-21 13:24:26 +00:00
|
|
|
Interval time.Duration `help:"duration between sending" default:"1h0m0s"`
|
|
|
|
Timeout time.Duration `help:"timeout for sending" default:"1h0m0s"`
|
2019-03-18 10:55:06 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Sender sends every interval unsent orders to the satellite.
|
|
|
|
type Sender struct {
|
|
|
|
log *zap.Logger
|
|
|
|
config SenderConfig
|
|
|
|
|
2019-03-21 13:24:26 +00:00
|
|
|
transport transport.Client
|
|
|
|
kademlia *kademlia.Kademlia
|
|
|
|
orders DB
|
|
|
|
|
|
|
|
Loop sync2.Cycle
|
2019-03-18 10:55:06 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// NewSender creates an order sender.
|
2019-03-21 13:24:26 +00:00
|
|
|
func NewSender(log *zap.Logger, transport transport.Client, kademlia *kademlia.Kademlia, orders DB, config SenderConfig) *Sender {
|
2019-03-18 10:55:06 +00:00
|
|
|
return &Sender{
|
2019-03-21 13:24:26 +00:00
|
|
|
log: log,
|
|
|
|
transport: transport,
|
|
|
|
kademlia: kademlia,
|
|
|
|
orders: orders,
|
2019-03-27 10:24:35 +00:00
|
|
|
config: config,
|
2019-03-21 13:24:26 +00:00
|
|
|
|
|
|
|
Loop: *sync2.NewCycle(config.Interval),
|
2019-03-18 10:55:06 +00:00
|
|
|
}
|
|
|
|
}
|
2019-03-21 13:24:26 +00:00
|
|
|
|
|
|
|
// Run sends orders on every interval to the appropriate satellites.
|
2019-06-04 13:31:39 +01:00
|
|
|
func (sender *Sender) Run(ctx context.Context) (err error) {
|
|
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
return sender.Loop.Run(ctx, sender.runOnce)
|
|
|
|
}
|
2019-03-21 13:24:26 +00:00
|
|
|
|
2019-06-04 13:31:39 +01:00
|
|
|
func (sender *Sender) runOnce(ctx context.Context) (err error) {
|
|
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
sender.log.Debug("sending")
|
2019-03-21 13:24:26 +00:00
|
|
|
|
2019-06-04 13:31:39 +01:00
|
|
|
ordersBySatellite, err := sender.orders.ListUnsentBySatellite(ctx)
|
|
|
|
if err != nil {
|
|
|
|
sender.log.Error("listing orders", zap.Error(err))
|
|
|
|
return nil
|
|
|
|
}
|
2019-03-21 13:24:26 +00:00
|
|
|
|
2019-06-04 13:31:39 +01:00
|
|
|
if len(ordersBySatellite) > 0 {
|
|
|
|
var group errgroup.Group
|
|
|
|
ctx, cancel := context.WithTimeout(ctx, sender.config.Timeout)
|
|
|
|
defer cancel()
|
2019-03-27 10:24:35 +00:00
|
|
|
|
2019-06-04 13:31:39 +01:00
|
|
|
for satelliteID, orders := range ordersBySatellite {
|
|
|
|
satelliteID, orders := satelliteID, orders
|
|
|
|
group.Go(func() error {
|
|
|
|
|
|
|
|
sender.Settle(ctx, satelliteID, orders)
|
|
|
|
return nil
|
|
|
|
})
|
2019-03-21 13:24:26 +00:00
|
|
|
}
|
2019-06-04 13:31:39 +01:00
|
|
|
_ = group.Wait() // doesn't return errors
|
|
|
|
} else {
|
|
|
|
sender.log.Debug("no orders to send")
|
|
|
|
}
|
2019-03-21 13:24:26 +00:00
|
|
|
|
2019-06-04 13:31:39 +01:00
|
|
|
return nil
|
2019-03-21 13:24:26 +00:00
|
|
|
}
|
|
|
|
|
2019-03-27 10:24:35 +00:00
|
|
|
// Settle uploads orders to the satellite.
|
2019-03-21 13:24:26 +00:00
|
|
|
func (sender *Sender) Settle(ctx context.Context, satelliteID storj.NodeID, orders []*Info) {
|
|
|
|
log := sender.log.Named(satelliteID.String())
|
2019-06-04 13:31:39 +01:00
|
|
|
err := sender.settle(ctx, log, satelliteID, orders)
|
|
|
|
if err != nil {
|
|
|
|
log.Error("failed to settle orders", zap.Error(err))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (sender *Sender) settle(ctx context.Context, log *zap.Logger, satelliteID storj.NodeID, orders []*Info) (err error) {
|
|
|
|
defer mon.Task()(&ctx)(&err)
|
2019-03-21 13:24:26 +00:00
|
|
|
|
|
|
|
log.Info("sending", zap.Int("count", len(orders)))
|
|
|
|
defer log.Info("finished")
|
|
|
|
|
|
|
|
satellite, err := sender.kademlia.FindNode(ctx, satelliteID)
|
|
|
|
if err != nil {
|
2019-06-04 13:31:39 +01:00
|
|
|
return OrderError.New("unable to find satellite on the network: %v", err)
|
2019-03-21 13:24:26 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
conn, err := sender.transport.DialNode(ctx, &satellite)
|
|
|
|
if err != nil {
|
2019-06-04 13:31:39 +01:00
|
|
|
return OrderError.New("unable to connect to the satellite: %v", err)
|
2019-03-21 13:24:26 +00:00
|
|
|
}
|
|
|
|
defer func() {
|
2019-06-04 13:31:39 +01:00
|
|
|
if cerr := conn.Close(); cerr != nil {
|
2019-07-04 11:34:23 +01:00
|
|
|
err = errs.Combine(err, OrderError.New("failed to close connection: %v", cerr))
|
2019-03-21 13:24:26 +00:00
|
|
|
}
|
|
|
|
}()
|
|
|
|
|
|
|
|
client, err := pb.NewOrdersClient(conn).Settlement(ctx)
|
|
|
|
if err != nil {
|
2019-06-04 13:31:39 +01:00
|
|
|
return OrderError.New("failed to start settlement: %v", err)
|
2019-03-21 13:24:26 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
var group errgroup.Group
|
|
|
|
group.Go(func() error {
|
|
|
|
for _, order := range orders {
|
|
|
|
err := client.Send(&pb.SettlementRequest{
|
|
|
|
Limit: order.Limit,
|
|
|
|
Order: order.Order,
|
|
|
|
})
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return client.CloseSend()
|
|
|
|
})
|
|
|
|
|
2019-06-04 13:31:39 +01:00
|
|
|
var errList errs.Group
|
|
|
|
errHandle := func(cls errs.Class, format string, args ...interface{}) {
|
|
|
|
log.Sugar().Errorf(format, args...)
|
|
|
|
errList.Add(cls.New(format, args...))
|
|
|
|
}
|
2019-03-21 13:24:26 +00:00
|
|
|
for {
|
|
|
|
response, err := client.Recv()
|
|
|
|
if err != nil {
|
|
|
|
if err == io.EOF {
|
|
|
|
break
|
|
|
|
}
|
|
|
|
|
2019-06-04 13:31:39 +01:00
|
|
|
errHandle(OrderError, "failed to receive response: %v", err)
|
2019-03-21 13:24:26 +00:00
|
|
|
break
|
|
|
|
}
|
|
|
|
|
|
|
|
switch response.Status {
|
|
|
|
case pb.SettlementResponse_ACCEPTED:
|
|
|
|
err = sender.orders.Archive(ctx, satelliteID, response.SerialNumber, StatusAccepted)
|
|
|
|
if err != nil {
|
2019-06-04 13:31:39 +01:00
|
|
|
errHandle(OrderError, "failed to archive order as accepted: serial: %v, %v", response.SerialNumber, err)
|
2019-03-21 13:24:26 +00:00
|
|
|
}
|
|
|
|
case pb.SettlementResponse_REJECTED:
|
|
|
|
err = sender.orders.Archive(ctx, satelliteID, response.SerialNumber, StatusRejected)
|
|
|
|
if err != nil {
|
2019-06-04 13:31:39 +01:00
|
|
|
errHandle(OrderError, "failed to archive order as rejected: serial: %v, %v", response.SerialNumber, err)
|
2019-03-21 13:24:26 +00:00
|
|
|
}
|
|
|
|
default:
|
2019-06-04 13:31:39 +01:00
|
|
|
errHandle(OrderError, "unexpected response: %v", response.Status)
|
2019-03-21 13:24:26 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if err := group.Wait(); err != nil {
|
2019-06-30 14:02:12 +01:00
|
|
|
errHandle(OrderError, "sending agreements returned an error: %v", err)
|
2019-03-21 13:24:26 +00:00
|
|
|
}
|
2019-06-04 13:31:39 +01:00
|
|
|
|
|
|
|
return errList.Err()
|
2019-03-21 13:24:26 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Close stops the sending service.
|
|
|
|
func (sender *Sender) Close() error {
|
2019-05-08 12:11:59 +01:00
|
|
|
sender.Loop.Close()
|
2019-03-21 13:24:26 +00:00
|
|
|
return nil
|
|
|
|
}
|