2019-03-27 10:24:35 +00:00
|
|
|
// Copyright (C) 2019 Storj Labs, Inc.
|
|
|
|
// See LICENSE for copying information.
|
|
|
|
|
|
|
|
package orders
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"io"
|
|
|
|
"time"
|
|
|
|
|
2019-06-25 16:58:42 +01:00
|
|
|
"github.com/skyrings/skyring-common/tools/uuid"
|
2019-03-27 10:24:35 +00:00
|
|
|
"github.com/zeebo/errs"
|
|
|
|
"go.uber.org/zap"
|
|
|
|
"google.golang.org/grpc/codes"
|
|
|
|
"google.golang.org/grpc/status"
|
2019-07-09 22:54:00 +01:00
|
|
|
"gopkg.in/spacemonkeygo/monkit.v2"
|
2019-03-27 10:24:35 +00:00
|
|
|
|
|
|
|
"storj.io/storj/pkg/identity"
|
|
|
|
"storj.io/storj/pkg/pb"
|
2019-07-28 06:55:36 +01:00
|
|
|
"storj.io/storj/pkg/signing"
|
2019-03-28 20:09:23 +00:00
|
|
|
"storj.io/storj/pkg/storj"
|
2019-03-27 10:24:35 +00:00
|
|
|
)
|
|
|
|
|
|
|
|
// DB implements saving order after receiving from storage node
|
|
|
|
type DB interface {
|
2019-03-28 20:09:23 +00:00
|
|
|
// CreateSerialInfo creates serial number entry in database
|
|
|
|
CreateSerialInfo(ctx context.Context, serialNumber storj.SerialNumber, bucketID []byte, limitExpiration time.Time) error
|
2019-04-01 21:14:58 +01:00
|
|
|
// UseSerialNumber creates serial number entry in database
|
|
|
|
UseSerialNumber(ctx context.Context, serialNumber storj.SerialNumber, storageNodeID storj.NodeID) ([]byte, error)
|
|
|
|
// UnuseSerialNumber removes pair serial number -> storage node id from database
|
|
|
|
UnuseSerialNumber(ctx context.Context, serialNumber storj.SerialNumber, storageNodeID storj.NodeID) error
|
|
|
|
|
|
|
|
// UpdateBucketBandwidthAllocation updates 'allocated' bandwidth for given bucket
|
2019-06-25 16:58:42 +01:00
|
|
|
UpdateBucketBandwidthAllocation(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, amount int64, intervalStart time.Time) error
|
2019-04-01 21:14:58 +01:00
|
|
|
// UpdateBucketBandwidthSettle updates 'settled' bandwidth for given bucket
|
2019-06-25 16:58:42 +01:00
|
|
|
UpdateBucketBandwidthSettle(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, amount int64, intervalStart time.Time) error
|
2019-04-01 21:14:58 +01:00
|
|
|
// UpdateBucketBandwidthInline updates 'inline' bandwidth for given bucket
|
2019-06-25 16:58:42 +01:00
|
|
|
UpdateBucketBandwidthInline(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, amount int64, intervalStart time.Time) error
|
2019-04-01 21:14:58 +01:00
|
|
|
|
2019-06-10 15:58:28 +01:00
|
|
|
// UpdateStoragenodeBandwidthAllocation updates 'allocated' bandwidth for given storage nodes
|
|
|
|
UpdateStoragenodeBandwidthAllocation(ctx context.Context, storageNodes []storj.NodeID, action pb.PieceAction, amount int64, intervalStart time.Time) error
|
2019-04-01 21:14:58 +01:00
|
|
|
// UpdateStoragenodeBandwidthSettle updates 'settled' bandwidth for given storage node
|
2019-04-04 16:20:59 +01:00
|
|
|
UpdateStoragenodeBandwidthSettle(ctx context.Context, storageNode storj.NodeID, action pb.PieceAction, amount int64, intervalStart time.Time) error
|
2019-04-01 21:14:58 +01:00
|
|
|
|
|
|
|
// GetBucketBandwidth gets total bucket bandwidth from period of time
|
2019-06-25 16:58:42 +01:00
|
|
|
GetBucketBandwidth(ctx context.Context, projectID uuid.UUID, bucketName []byte, from, to time.Time) (int64, error)
|
2019-04-01 21:14:58 +01:00
|
|
|
// GetStorageNodeBandwidth gets total storage node bandwidth from period of time
|
|
|
|
GetStorageNodeBandwidth(ctx context.Context, nodeID storj.NodeID, from, to time.Time) (int64, error)
|
2019-08-15 20:05:43 +01:00
|
|
|
|
|
|
|
// ProcessOrders takes a list of order requests and processes them in a batch
|
2019-08-19 14:36:11 +01:00
|
|
|
ProcessOrders(ctx context.Context, requests []*ProcessOrderRequest) (responses []*ProcessOrderResponse, err error)
|
2019-03-27 10:24:35 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
var (
|
|
|
|
// Error the default orders errs class
|
|
|
|
Error = errs.Class("orders error")
|
2019-04-04 15:42:01 +01:00
|
|
|
// ErrUsingSerialNumber error class for serial number
|
|
|
|
ErrUsingSerialNumber = errs.Class("serial number")
|
|
|
|
|
|
|
|
mon = monkit.Package()
|
2019-03-27 10:24:35 +00:00
|
|
|
)
|
|
|
|
|
2019-08-15 20:05:43 +01:00
|
|
|
// ProcessOrderRequest for batch order processing
|
|
|
|
type ProcessOrderRequest struct {
|
|
|
|
Order *pb.Order
|
|
|
|
OrderLimit *pb.OrderLimit
|
|
|
|
}
|
|
|
|
|
2019-08-19 14:36:11 +01:00
|
|
|
// ProcessOrderResponse for batch order processing responses
|
|
|
|
type ProcessOrderResponse struct {
|
|
|
|
SerialNumber storj.SerialNumber
|
|
|
|
Status pb.SettlementResponse_Status
|
|
|
|
}
|
|
|
|
|
2019-03-27 10:24:35 +00:00
|
|
|
// Endpoint for orders receiving
|
|
|
|
type Endpoint struct {
|
2019-08-15 20:05:43 +01:00
|
|
|
log *zap.Logger
|
|
|
|
satelliteSignee signing.Signee
|
|
|
|
DB DB
|
|
|
|
settlementBatchSize int
|
2019-03-27 10:24:35 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// NewEndpoint new orders receiving endpoint
|
2019-08-15 20:05:43 +01:00
|
|
|
func NewEndpoint(log *zap.Logger, satelliteSignee signing.Signee, db DB, settlementBatchSize int) *Endpoint {
|
2019-03-27 10:24:35 +00:00
|
|
|
return &Endpoint{
|
2019-08-15 20:05:43 +01:00
|
|
|
log: log,
|
|
|
|
satelliteSignee: satelliteSignee,
|
|
|
|
DB: db,
|
|
|
|
settlementBatchSize: settlementBatchSize,
|
2019-03-27 10:24:35 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-06-05 16:43:41 +01:00
|
|
|
func monitoredSettlementStreamReceive(ctx context.Context, stream pb.Orders_SettlementServer) (_ *pb.SettlementRequest, err error) {
|
|
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
return stream.Recv()
|
|
|
|
}
|
|
|
|
|
|
|
|
func monitoredSettlementStreamSend(ctx context.Context, stream pb.Orders_SettlementServer, resp *pb.SettlementResponse) (err error) {
|
|
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
switch resp.Status {
|
|
|
|
case pb.SettlementResponse_ACCEPTED:
|
|
|
|
mon.Event("settlement_response_accepted")
|
|
|
|
case pb.SettlementResponse_REJECTED:
|
|
|
|
mon.Event("settlement_response_rejected")
|
|
|
|
default:
|
|
|
|
mon.Event("settlement_response_unknown")
|
|
|
|
}
|
|
|
|
return stream.Send(resp)
|
|
|
|
}
|
|
|
|
|
2019-08-19 14:36:11 +01:00
|
|
|
// Settlement receives orders and handles them in batches
|
2019-03-27 10:24:35 +00:00
|
|
|
func (endpoint *Endpoint) Settlement(stream pb.Orders_SettlementServer) (err error) {
|
|
|
|
ctx := stream.Context()
|
|
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
|
|
|
|
peer, err := identity.PeerIdentityFromContext(ctx)
|
|
|
|
if err != nil {
|
|
|
|
return status.Error(codes.Unauthenticated, err.Error())
|
|
|
|
}
|
|
|
|
|
|
|
|
formatError := func(err error) error {
|
|
|
|
if err == io.EOF {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
return status.Error(codes.Unknown, err.Error())
|
|
|
|
}
|
|
|
|
|
2019-05-23 23:47:05 +01:00
|
|
|
log := endpoint.log.Named(peer.ID.String())
|
|
|
|
log.Debug("Settlement")
|
2019-08-15 20:05:43 +01:00
|
|
|
|
|
|
|
requests := make([]*ProcessOrderRequest, 0, endpoint.settlementBatchSize)
|
|
|
|
|
|
|
|
defer func() {
|
|
|
|
if len(requests) > 0 {
|
|
|
|
err = errs.Combine(err, endpoint.processOrders(ctx, stream, requests))
|
|
|
|
if err != nil {
|
|
|
|
err = formatError(err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
|
2019-03-27 10:24:35 +00:00
|
|
|
for {
|
2019-06-05 16:43:41 +01:00
|
|
|
request, err := monitoredSettlementStreamReceive(ctx, stream)
|
2019-03-27 10:24:35 +00:00
|
|
|
if err != nil {
|
|
|
|
return formatError(err)
|
|
|
|
}
|
|
|
|
|
|
|
|
if request == nil {
|
|
|
|
return status.Error(codes.InvalidArgument, "request missing")
|
|
|
|
}
|
|
|
|
if request.Limit == nil {
|
|
|
|
return status.Error(codes.InvalidArgument, "order limit missing")
|
|
|
|
}
|
|
|
|
if request.Order == nil {
|
|
|
|
return status.Error(codes.InvalidArgument, "order missing")
|
|
|
|
}
|
|
|
|
|
|
|
|
orderLimit := request.Limit
|
|
|
|
order := request.Order
|
|
|
|
|
|
|
|
if orderLimit.StorageNodeId != peer.ID {
|
|
|
|
return status.Error(codes.Unauthenticated, "only specified storage node can settle order")
|
|
|
|
}
|
|
|
|
|
2019-06-21 12:44:14 +01:00
|
|
|
rejectErr := func() error {
|
2019-08-12 15:41:34 +01:00
|
|
|
// satellite verifies that it signed the order limit
|
2019-06-05 14:47:01 +01:00
|
|
|
if err := signing.VerifyOrderLimitSignature(ctx, endpoint.satelliteSignee, orderLimit); err != nil {
|
2019-03-27 10:24:35 +00:00
|
|
|
return Error.New("unable to verify order limit")
|
|
|
|
}
|
|
|
|
|
2019-08-12 15:41:34 +01:00
|
|
|
// satellite verifies that the order signature matches pub key in order limit
|
|
|
|
if err := signing.VerifyUplinkOrderSignature(ctx, orderLimit.UplinkPublicKey, order); err != nil {
|
|
|
|
return Error.New("unable to verify order")
|
2019-03-27 10:24:35 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// TODO should this reject or just error ??
|
|
|
|
if orderLimit.SerialNumber != order.SerialNumber {
|
|
|
|
return Error.New("invalid serial number")
|
|
|
|
}
|
|
|
|
|
2019-07-09 22:54:00 +01:00
|
|
|
if orderLimit.OrderExpiration.Before(time.Now()) {
|
2019-03-27 10:24:35 +00:00
|
|
|
return Error.New("order limit expired")
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}()
|
2019-08-16 15:53:22 +01:00
|
|
|
if rejectErr != nil {
|
|
|
|
log.Debug("order limit/order verification failed", zap.Stringer("serial", orderLimit.SerialNumber), zap.Error(rejectErr))
|
2019-06-05 16:43:41 +01:00
|
|
|
err := monitoredSettlementStreamSend(ctx, stream, &pb.SettlementResponse{
|
2019-03-27 10:24:35 +00:00
|
|
|
SerialNumber: orderLimit.SerialNumber,
|
|
|
|
Status: pb.SettlementResponse_REJECTED,
|
|
|
|
})
|
|
|
|
if err != nil {
|
|
|
|
return formatError(err)
|
|
|
|
}
|
2019-06-12 16:00:29 +01:00
|
|
|
continue
|
2019-03-27 10:24:35 +00:00
|
|
|
}
|
|
|
|
|
2019-08-15 20:05:43 +01:00
|
|
|
requests = append(requests, &ProcessOrderRequest{Order: order, OrderLimit: orderLimit})
|
2019-04-01 21:14:58 +01:00
|
|
|
|
2019-08-15 20:05:43 +01:00
|
|
|
if len(requests) >= endpoint.settlementBatchSize {
|
|
|
|
err = endpoint.processOrders(ctx, stream, requests)
|
|
|
|
requests = requests[:0]
|
|
|
|
if err != nil {
|
|
|
|
return formatError(err)
|
2019-04-01 21:14:58 +01:00
|
|
|
}
|
|
|
|
}
|
2019-08-15 20:05:43 +01:00
|
|
|
}
|
|
|
|
}
|
2019-04-01 21:14:58 +01:00
|
|
|
|
2019-08-15 20:05:43 +01:00
|
|
|
func (endpoint *Endpoint) processOrders(ctx context.Context, stream pb.Orders_SettlementServer, requests []*ProcessOrderRequest) (err error) {
|
|
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
|
|
|
|
responses, err := endpoint.DB.ProcessOrders(ctx, requests)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2019-03-27 10:24:35 +00:00
|
|
|
|
2019-08-15 20:05:43 +01:00
|
|
|
for _, response := range responses {
|
2019-08-19 14:36:11 +01:00
|
|
|
r := &pb.SettlementResponse{
|
|
|
|
SerialNumber: response.SerialNumber,
|
|
|
|
Status: response.Status,
|
|
|
|
}
|
|
|
|
err = monitoredSettlementStreamSend(ctx, stream, r)
|
2019-03-27 10:24:35 +00:00
|
|
|
if err != nil {
|
2019-08-15 20:05:43 +01:00
|
|
|
return err
|
2019-03-27 10:24:35 +00:00
|
|
|
}
|
|
|
|
}
|
2019-08-15 20:05:43 +01:00
|
|
|
return nil
|
2019-03-27 10:24:35 +00:00
|
|
|
}
|